package client; import org.json.simple.JSONObject; import org.json.simple.parser.JSONParser; import org.json.simple.parser.ParseException; import java.io.*; import java.net.*; import java.util.*; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; public class Client { private Socket socket; private final String proxyHostname = "proxyserver.com"; private final InetAddress proxyAddress; private final int serverPort = 1234; private int lineCount = 0; private volatile int lineDone = 0; private static List ids = new ArrayList<>(); private static Random rd = new Random(); private static final int clientNum = 1000000; private static final int coverMessageNum = 0;//clientNum * 5; private static final int roundLength = 1800; private static final int roundNum = 30 * 24 * 3600 / roundLength; private static int coverMessageNumPerRound = coverMessageNum / roundNum; private static final String dataPath = "/data/sorted_tweets_random_" + clientNum + ".txt"; private static final String logPath = "/data/logs/"; private PrintWriter out; private FileWriter logFW; private BufferedWriter logBW; public static void main(String[] args) throws IOException, InterruptedException, ParseException, ExecutionException { Client c = new Client(); c.run(); } public Client() throws IOException { proxyAddress = InetAddress.getByName(proxyHostname); File logfile = new File(logPath + "/clogs.txt"); boolean exists = logfile.exists(); if (!exists) { logfile.createNewFile(); } logFW = new FileWriter(logfile, true); logBW = new BufferedWriter(logFW); } void run() throws IOException, InterruptedException, ParseException, ExecutionException { System.out.println("Local Address: " + InetAddress.getLocalHost()); // parsing dataset from path int lineTotal = 1150976; FileReader fr = new FileReader(dataPath); BufferedReader br= new BufferedReader(fr); JSONObject obj; String line; this.socket = new Socket(proxyAddress, serverPort); System.out.println("Connected to " + socket.getInetAddress().getHostAddress() + ":" + serverPort); out = new PrintWriter(this.socket.getOutputStream(), true); int readProgress = 0; long timeDiff = 0L; long startTime = System.currentTimeMillis(); ExecutorService executor = Executors.newSingleThreadExecutor(); List> tasks = new ArrayList<>(); long roundStartTime = 1351742401; while((line = br.readLine()) != null) { lineCount++; obj = (JSONObject) new JSONParser().parse(line); long timestamp = Long.parseLong(String.valueOf(obj.get("timestamp"))); //get first timestamp if(lineCount == 1) { timeDiff = startTime - timestamp * 1000; roundStartTime = timestamp; } if(timestamp - roundStartTime > roundLength) { roundStartTime = timestamp; coverMessageNumPerRound += coverMessageNum / roundNum; } tasks.add(executor.submit(new ClientThread(line))); //count progress if(readProgress != (int)(((double)lineCount/(double)lineTotal) * 100)) { readProgress = (int)(((double)lineCount/(double)lineTotal) * 100); System.out.println(readProgress + "% read"); } //limit buffer if(lineCount > lineDone + 100) tasks.get(lineCount - 2).get(); } out.write("end!"); out.flush(); // Always close files. br.close(); fr.close(); for(Future task:tasks) { task.get(); } executor.shutdown(); socket.close(); logBW.close(); writeBoundNames(); System.exit(0); } private void writeBoundNames() throws IOException { File file = new File(logPath + "/boundnames.txt"); if (file.exists()) return; FileWriter fw = new FileWriter(file); BufferedWriter bw = new BufferedWriter(fw); for (int i=0; i