package server; import org.json.simple.JSONArray; import org.json.simple.JSONObject; import org.json.simple.parser.JSONParser; import org.json.simple.parser.ParseException; import java.io.*; import java.net.InetAddress; import java.net.ServerSocket; import java.net.Socket; import java.util.ArrayList; import java.util.List; import java.util.Random; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; public class TCPServer { public static void main(String[] args) throws Exception { TCPServer app = new TCPServer(); app.start(); } private ServerSocket server; private final String logPath = "/data/logs/"; private final long startTime = System.currentTimeMillis(); private final long startDataTime = 1351742401; private final long roundLength = 1800; private int round = 1; private File logfile; private FileWriter logFW; private BufferedWriter logBW; private Random rd = new Random(); private static final int delayRange = 0; private static List> lstOfLsts = new ArrayList<>(); private static volatile boolean switchRound = false; private int lineDone = 0; public TCPServer() throws Exception { this.server = new ServerSocket(2345, 1, InetAddress.getLocalHost()); System.out.println("\r\nRunning Server: " + "Host=" + getSocketAddress().getHostAddress() + " Port=" + getPort() + " Hostname=" + getSocketAddress().getHostName()); logfile = new File(logPath + "/slogs.txt"); boolean exists = logfile.exists(); if (!exists) { logfile.createNewFile(); } logFW = new FileWriter(logfile, true); logBW = new BufferedWriter(logFW); } void start() throws IOException { ExecutorService executor = Executors.newCachedThreadPool(); while (true) { //this will block until a connection is made Socket client = this.server.accept(); System.out.println("\r\nNew connection from " + client.getInetAddress().getHostAddress()); executor.execute(new ServerThread(client)); } } public InetAddress getSocketAddress() { return this.server.getInetAddress(); } public int getPort() { return this.server.getLocalPort(); } private class ServerThread implements Runnable { private Socket client; public ServerThread(Socket client) { this.client = client; } @Override public void run() { try { for(int i = 0; i < delayRange + 2; i++) lstOfLsts.add(new ArrayList<>()); listen(); this.client.close(); } catch (IOException | ParseException | ExecutionException | InterruptedException e) { e.printStackTrace(); } } void listen() throws IOException, ParseException, ExecutionException, InterruptedException { BufferedReader in = new BufferedReader(new InputStreamReader(client.getInputStream())); ExecutorService executor = Executors.newSingleThreadExecutor(); String data = null; List> tasks = new ArrayList<>(); while((data = in.readLine()) != null) { if(data.equals("end!")) { break; } //System.out.println("Message from " + client.getInetAddress().getHostAddress() + ": " + data); JSONObject obj = (JSONObject) new JSONParser().parse(data); long timestamp = Long.parseLong(String.valueOf(obj.get("timestamp"))); long user_id = Long.parseLong(String.valueOf(obj.get("user_id"))); JSONArray arr = (JSONArray) obj.get("hashtags"); String[] hashtags = new String[arr.size()]; for(int i=0; i task = executor.submit(new Logger(user_id, hashtags, timestamp, client.getInetAddress().getHostAddress())); tasks.add(task); if(switchRound) { for(Future t : tasks) t.get(); log(); switchRound = false; } } for(List l : lstOfLsts) for(String s : l) { logBW.write(s); logBW.newLine(); incrementCount(); } logBW.close(); System.out.println("listening done"); System.exit(0); } void log() throws IOException { List logs = lstOfLsts.get(0); for(int i = 0; i < logs.size(); i++) { String s = logs.get(i); logBW.write(s); logBW.newLine(); incrementCount(); } lstOfLsts.remove(logs); lstOfLsts.add(new ArrayList<>()); } private synchronized void incrementCount() { lineDone++; } } class Logger implements Runnable { private String[] hashtags; private long timestamp; private String senderAddress; private long id; Logger(long user_id, String[] hashtags, long timestamp, String senderAddress) { this.hashtags = hashtags; this.senderAddress = senderAddress; this.id = user_id; calculateTimeDiff(timestamp); } @Override public void run() { } String produceLogString() { String hashtagsString = ""; for(String s : hashtags) hashtagsString = hashtagsString + (s + " "); return timestamp + "\t" + id + "\t" + hashtagsString.trim(); } private void calculateTimeDiff(long stamp) { //long currentTime = System.currentTimeMillis(); //long diffCurrentTime = currentTime - startTime;//in milis //this.timestamp = startDataTime + diffCurrentTime/1000;// in sec long diffStartTime = stamp - startDataTime; int inRound = (int)(diffStartTime / roundLength) + 1; int rdOffset = rd.nextInt(delayRange + 1); int rdRound = rdOffset + inRound; this.timestamp = startDataTime + roundLength * rdRound; //add to q List targetLst; if(inRound > round) { round = inRound; switchRound = true; targetLst = lstOfLsts.get(rdOffset + 1); } else { targetLst = lstOfLsts.get(rdOffset); } targetLst.add(produceLogString()); } } }