123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205 |
- package client;
import org.json.simple.JSONObject;
import org.json.simple.parser.JSONParser;
import org.json.simple.parser.ParseException;
import java.io.*;
import java.net.*;
import java.nio.charset.StandardCharsets;
import java.util.*;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
- public class Client {
- private Socket socket;
- private final String proxyHostname = "proxyserver.com";
- private final InetAddress proxyAddress;
- private final int serverPort = 1234;
- private int lineCount = 0;
- private volatile int lineDone = 0;
- private static List<Long> ids = new ArrayList<>();
- private static Random rd = new Random();
- private static final int clientNum = 1000000;
- private static final int coverMessageNum = 0;//clientNum * 5;
- private static final int roundLength = 1800;
- private static final int roundNum = 30 * 24 * 3600 / roundLength;
- private static int coverMessageNumPerRound = coverMessageNum / roundNum;
- private static final String dataPath = "/data/sorted_tweets_random_" + clientNum + ".txt";
- private static final String logPath = "/data/logs/";
- private PrintWriter out;
- private FileWriter logFW;
- private BufferedWriter logBW;
- public static void main(String[] args) throws IOException, InterruptedException, ParseException, ExecutionException {
- Client c = new Client();
- c.run();
- }
- public Client() throws IOException {
- proxyAddress = InetAddress.getByName(proxyHostname);
- File logfile = new File(logPath + "/clogs.txt");
- boolean exists = logfile.exists();
- if (!exists) {
- logfile.createNewFile();
- }
- logFW = new FileWriter(logfile, true);
- logBW = new BufferedWriter(logFW);
- }
- void run() throws IOException, InterruptedException, ParseException, ExecutionException {
- System.out.println("Local Address: " + InetAddress.getLocalHost());
- // parsing dataset from path
- int lineTotal = 1150976;
- FileReader fr = new FileReader(dataPath);
- BufferedReader br= new BufferedReader(fr);
- JSONObject obj;
- String line;
- this.socket = new Socket(proxyAddress, serverPort);
- System.out.println("Connected to " + socket.getInetAddress().getHostAddress() + ":" + serverPort);
- out = new PrintWriter(this.socket.getOutputStream(), true);
- int readProgress = 0;
- long timeDiff = 0L;
- long startTime = System.currentTimeMillis();
- ExecutorService executor = Executors.newSingleThreadExecutor();
- List<Future<?>> tasks = new ArrayList<>();
- long roundStartTime = 1351742401;
- while((line = br.readLine()) != null) {
- lineCount++;
- obj = (JSONObject) new JSONParser().parse(line);
- long timestamp = Long.parseLong(String.valueOf(obj.get("timestamp")));
- //get first timestamp
- if(lineCount == 1) {
- timeDiff = startTime - timestamp * 1000;
- roundStartTime = timestamp;
- }
- if(timestamp - roundStartTime > roundLength) {
- roundStartTime = timestamp;
- coverMessageNumPerRound += coverMessageNum / roundNum;
- }
- tasks.add(executor.submit(new ClientThread(line)));
- //count progress
- if(readProgress != (int)(((double)lineCount/(double)lineTotal) * 100)) {
- readProgress = (int)(((double)lineCount/(double)lineTotal) * 100);
- System.out.println(readProgress + "% read");
- }
- //limit buffer
- if(lineCount > lineDone + 100)
- tasks.get(lineCount - 2).get();
- }
- out.write("end!");
- out.flush();
- // Always close files.
- br.close();
- fr.close();
- for(Future<?> task:tasks) {
- task.get();
- }
- executor.shutdown();
- socket.close();
- logBW.close();
- writeBoundNames();
- System.exit(0);
- }
- private void writeBoundNames() throws IOException {
- File file = new File(logPath + "/boundnames.txt");
- if (file.exists())
- return;
- FileWriter fw = new FileWriter(file);
- BufferedWriter bw = new BufferedWriter(fw);
- for (int i=0; i<ids.size(); i++) {
- bw.write(ids.get(i) + "\t" + "client-" + i);
- if(i<ids.size() - 1)
- bw.newLine();
- }
- bw.close();
- }
- class ClientThread implements Runnable {
- private final Object lock = new Object();
- private long id;
- private long timestamp;
- private String data;
- private String clientName;
- private int index;
- ClientThread(String data) throws ParseException {
- JSONObject obj = (JSONObject) new JSONParser().parse(data);
- this.timestamp = Long.parseLong(String.valueOf(obj.get("timestamp")));
- this.id = Long.parseLong(String.valueOf(obj.get("user_id")));
- this.data = data;
- this.clientName = getClientName();
- }
- @Override
- public void run() {
- send();
- log();
- incrementCount();
- }
- private void send() {
- out.write(data + "\n");
- out.flush();
- System.out.println(this.clientName + " sends: " + data);
- }
- private void log() {
- try {
- synchronized (lock) {
- if (lineDone != 0) {
- logBW.newLine();
- }
- logBW.write(this.clientName + "\t" + socket.getInetAddress().getHostAddress() + "\t" + timestamp);
- logCoverTraffic();
- }
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
- private void logCoverTraffic() throws IOException {
- int messageNum = rd.nextInt(coverMessageNumPerRound + 1);
- coverMessageNumPerRound -= messageNum;
- for(int i = 0; i < messageNum; i++) {
- logBW.newLine();
- logBW.write(coverMessageGen());
- }
- }
- private String coverMessageGen() {
- int randomCoverClientID = rd.nextInt(clientNum);
- while(randomCoverClientID == this.index)
- randomCoverClientID = rd.nextInt(clientNum);
- return "client-" + randomCoverClientID + "\t" + socket.getInetAddress().getHostAddress() + "\t" + timestamp;
- }
- private String getClientName() {
- this.index = ids.indexOf(id);
- if (index == -1) {
- ids.add(id);
- index = ids.size() - 1;
- }
- return "client-" + index;
- }
- private synchronized void incrementCount() {
- lineDone++;
- }
- }
- }
|