123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204 |
- package server;
- import org.json.simple.JSONArray;
- import org.json.simple.JSONObject;
- import org.json.simple.parser.JSONParser;
- import org.json.simple.parser.ParseException;
- import java.io.*;
- import java.net.InetAddress;
- import java.net.ServerSocket;
- import java.net.Socket;
- import java.util.ArrayList;
- import java.util.List;
- import java.util.Random;
- import java.util.concurrent.ExecutionException;
- import java.util.concurrent.ExecutorService;
- import java.util.concurrent.Executors;
- import java.util.concurrent.Future;
- public class TCPServer {
- public static void main(String[] args) throws Exception {
- TCPServer app = new TCPServer();
- app.start();
- }
- private ServerSocket server;
- private final String logPath = "/data/logs/";
- private final long startTime = System.currentTimeMillis();
- private final long startDataTime = 1351742401;
- private final long roundLength = 1800;
- private int round = 1;
- private File logfile;
- private FileWriter logFW;
- private BufferedWriter logBW;
- private Random rd = new Random();
- private static final int delayRange = 0;
- private static List<List<String>> lstOfLsts = new ArrayList<>();
- private static volatile boolean switchRound = false;
- private int lineDone = 0;
- public TCPServer() throws Exception {
- this.server = new ServerSocket(2345, 1, InetAddress.getLocalHost());
- System.out.println("\r\nRunning Server: " + "Host=" + getSocketAddress().getHostAddress() + " Port=" + getPort() + " Hostname=" + getSocketAddress().getHostName());
- logfile = new File(logPath + "/slogs.txt");
- boolean exists = logfile.exists();
- if (!exists) {
- logfile.createNewFile();
- }
- logFW = new FileWriter(logfile, true);
- logBW = new BufferedWriter(logFW);
- }
- void start() throws IOException {
- ExecutorService executor = Executors.newCachedThreadPool();
- while (true) {
- //this will block until a connection is made
- Socket client = this.server.accept();
- System.out.println("\r\nNew connection from " + client.getInetAddress().getHostAddress());
- executor.execute(new ServerThread(client));
- }
- }
- public InetAddress getSocketAddress() {
- return this.server.getInetAddress();
- }
- public int getPort() {
- return this.server.getLocalPort();
- }
- private class ServerThread implements Runnable {
- private Socket client;
- public ServerThread(Socket client) {
- this.client = client;
- }
- @Override
- public void run() {
- try {
- for(int i = 0; i < delayRange + 2; i++)
- lstOfLsts.add(new ArrayList<>());
- listen();
- this.client.close();
- } catch (IOException | ParseException | ExecutionException | InterruptedException e) {
- e.printStackTrace();
- }
- }
- void listen() throws IOException, ParseException, ExecutionException, InterruptedException {
- BufferedReader in = new BufferedReader(new InputStreamReader(client.getInputStream()));
- ExecutorService executor = Executors.newSingleThreadExecutor();
- String data = null;
- List<Future<?>> tasks = new ArrayList<>();
- while((data = in.readLine()) != null) {
- if(data.equals("end!")) {
- break;
- }
- //System.out.println("Message from " + client.getInetAddress().getHostAddress() + ": " + data);
- JSONObject obj = (JSONObject) new JSONParser().parse(data);
- long timestamp = Long.parseLong(String.valueOf(obj.get("timestamp")));
- long user_id = Long.parseLong(String.valueOf(obj.get("user_id")));
- JSONArray arr = (JSONArray) obj.get("hashtags");
- String[] hashtags = new String[arr.size()];
- for(int i=0; i<arr.size(); i++)
- hashtags[i]= String.valueOf(arr.get(i));
- Future<?> task = executor.submit(new Logger(user_id, hashtags, timestamp, client.getInetAddress().getHostAddress()));
- tasks.add(task);
- if(switchRound) {
- for(Future t : tasks)
- t.get();
- log();
- switchRound = false;
- }
- }
- for(List<String> l : lstOfLsts)
- for(String s : l) {
- logBW.write(s);
- logBW.newLine();
- incrementCount();
- }
- logBW.close();
- System.out.println("listening done");
- System.exit(0);
- }
- void log() throws IOException {
- List<String> logs = lstOfLsts.get(0);
- for(int i = 0; i < logs.size(); i++) {
- String s = logs.get(i);
- logBW.write(s);
- logBW.newLine();
- incrementCount();
- }
- lstOfLsts.remove(logs);
- lstOfLsts.add(new ArrayList<>());
- }
- private synchronized void incrementCount() {
- lineDone++;
- }
- }
- class Logger implements Runnable {
- private String[] hashtags;
- private long timestamp;
- private String senderAddress;
- private long id;
- Logger(long user_id, String[] hashtags, long timestamp, String senderAddress) {
- this.hashtags = hashtags;
- this.senderAddress = senderAddress;
- this.id = user_id;
- calculateTimeDiff(timestamp);
- }
- @Override
- public void run() {
- }
- String produceLogString() {
- String hashtagsString = "";
- for(String s : hashtags)
- hashtagsString = hashtagsString + (s + " ");
- return timestamp + "\t" + id + "\t" + hashtagsString.trim();
- }
- private void calculateTimeDiff(long stamp) {
- //long currentTime = System.currentTimeMillis();
- //long diffCurrentTime = currentTime - startTime;//in milis
- //this.timestamp = startDataTime + diffCurrentTime/1000;// in sec
- long diffStartTime = stamp - startDataTime;
- int inRound = (int)(diffStartTime / roundLength) + 1;
- int rdOffset = rd.nextInt(delayRange + 1);
- int rdRound = rdOffset + inRound;
- this.timestamp = startDataTime + roundLength * rdRound;
- //add to q
- List<String> targetLst;
- if(inRound > round) {
- round = inRound;
- switchRound = true;
- targetLst = lstOfLsts.get(rdOffset + 1);
- } else {
- targetLst = lstOfLsts.get(rdOffset);
- }
- targetLst.add(produceLogString());
- }
- }
- }
|