Client.java 6.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205
  1. package client;
  2. import org.json.simple.JSONObject;
  3. import org.json.simple.parser.JSONParser;
  4. import org.json.simple.parser.ParseException;
  5. import java.io.*;
  6. import java.net.*;
  7. import java.util.*;
  8. import java.util.concurrent.ExecutionException;
  9. import java.util.concurrent.ExecutorService;
  10. import java.util.concurrent.Executors;
  11. import java.util.concurrent.Future;
  12. public class Client {
  13. private Socket socket;
  14. private final String proxyHostname = "proxyserver.com";
  15. private final InetAddress proxyAddress;
  16. private final int serverPort = 1234;
  17. private int lineCount = 0;
  18. private volatile int lineDone = 0;
  19. private static List<Long> ids = new ArrayList<>();
  20. private static Random rd = new Random();
  21. private static final int clientNum = 1000000;
  22. private static final int coverMessageNum = 0;//clientNum * 5;
  23. private static final int roundLength = 1800;
  24. private static final int roundNum = 30 * 24 * 3600 / roundLength;
  25. private static int coverMessageNumPerRound = coverMessageNum / roundNum;
  26. private static final String dataPath = "/data/sorted_tweets_random_" + clientNum + ".txt";
  27. private static final String logPath = "/data/logs/";
  28. private PrintWriter out;
  29. private FileWriter logFW;
  30. private BufferedWriter logBW;
  31. public static void main(String[] args) throws IOException, InterruptedException, ParseException, ExecutionException {
  32. Client c = new Client();
  33. c.run();
  34. }
  35. public Client() throws IOException {
  36. proxyAddress = InetAddress.getByName(proxyHostname);
  37. File logfile = new File(logPath + "/clogs.txt");
  38. boolean exists = logfile.exists();
  39. if (!exists) {
  40. logfile.createNewFile();
  41. }
  42. logFW = new FileWriter(logfile, true);
  43. logBW = new BufferedWriter(logFW);
  44. }
  45. void run() throws IOException, InterruptedException, ParseException, ExecutionException {
  46. System.out.println("Local Address: " + InetAddress.getLocalHost());
  47. // parsing dataset from path
  48. int lineTotal = 1150976;
  49. FileReader fr = new FileReader(dataPath);
  50. BufferedReader br= new BufferedReader(fr);
  51. JSONObject obj;
  52. String line;
  53. this.socket = new Socket(proxyAddress, serverPort);
  54. System.out.println("Connected to " + socket.getInetAddress().getHostAddress() + ":" + serverPort);
  55. out = new PrintWriter(this.socket.getOutputStream(), true);
  56. int readProgress = 0;
  57. long timeDiff = 0L;
  58. long startTime = System.currentTimeMillis();
  59. ExecutorService executor = Executors.newSingleThreadExecutor();
  60. List<Future<?>> tasks = new ArrayList<>();
  61. long roundStartTime = 1351742401;
  62. while((line = br.readLine()) != null) {
  63. lineCount++;
  64. obj = (JSONObject) new JSONParser().parse(line);
  65. long timestamp = Long.parseLong(String.valueOf(obj.get("timestamp")));
  66. //get first timestamp
  67. if(lineCount == 1) {
  68. timeDiff = startTime - timestamp * 1000;
  69. roundStartTime = timestamp;
  70. }
  71. if(timestamp - roundStartTime > roundLength) {
  72. roundStartTime = timestamp;
  73. coverMessageNumPerRound += coverMessageNum / roundNum;
  74. }
  75. tasks.add(executor.submit(new ClientThread(line)));
  76. //count progress
  77. if(readProgress != (int)(((double)lineCount/(double)lineTotal) * 100)) {
  78. readProgress = (int)(((double)lineCount/(double)lineTotal) * 100);
  79. System.out.println(readProgress + "% read");
  80. }
  81. //limit buffer
  82. if(lineCount > lineDone + 100)
  83. tasks.get(lineCount - 2).get();
  84. }
  85. out.write("end!");
  86. out.flush();
  87. // Always close files.
  88. br.close();
  89. fr.close();
  90. for(Future<?> task:tasks) {
  91. task.get();
  92. }
  93. executor.shutdown();
  94. socket.close();
  95. logBW.close();
  96. writeBoundNames();
  97. System.exit(0);
  98. }
  99. private void writeBoundNames() throws IOException {
  100. File file = new File(logPath + "/boundnames.txt");
  101. if (file.exists())
  102. return;
  103. FileWriter fw = new FileWriter(file);
  104. BufferedWriter bw = new BufferedWriter(fw);
  105. for (int i=0; i<ids.size(); i++) {
  106. bw.write(ids.get(i) + "\t" + "client-" + i);
  107. if(i<ids.size() - 1)
  108. bw.newLine();
  109. }
  110. bw.close();
  111. }
  112. class ClientThread implements Runnable {
  113. private final Object lock = new Object();
  114. private long id;
  115. private long timestamp;
  116. private String data;
  117. private String clientName;
  118. private int index;
  119. ClientThread(String data) throws ParseException {
  120. JSONObject obj = (JSONObject) new JSONParser().parse(data);
  121. this.timestamp = Long.parseLong(String.valueOf(obj.get("timestamp")));
  122. this.id = Long.parseLong(String.valueOf(obj.get("user_id")));
  123. this.data = data;
  124. this.clientName = getClientName();
  125. }
  126. @Override
  127. public void run() {
  128. send();
  129. log();
  130. incrementCount();
  131. }
  132. private void send() {
  133. out.write(data + "\n");
  134. out.flush();
  135. System.out.println(this.clientName + " sends: " + data);
  136. }
  137. private void log() {
  138. try {
  139. synchronized (lock) {
  140. if (lineDone != 0) {
  141. logBW.newLine();
  142. }
  143. logBW.write(this.clientName + "\t" + socket.getInetAddress().getHostAddress() + "\t" + timestamp);
  144. logCoverTraffic();
  145. }
  146. } catch (IOException e) {
  147. e.printStackTrace();
  148. }
  149. }
  150. private void logCoverTraffic() throws IOException {
  151. int messageNum = rd.nextInt(coverMessageNumPerRound + 1);
  152. coverMessageNumPerRound -= messageNum;
  153. for(int i = 0; i < messageNum; i++) {
  154. logBW.newLine();
  155. logBW.write(coverMessageGen());
  156. }
  157. }
  158. private String coverMessageGen() {
  159. int randomCoverClientID = rd.nextInt(clientNum);
  160. while(randomCoverClientID == this.index)
  161. randomCoverClientID = rd.nextInt(clientNum);
  162. return "client-" + randomCoverClientID + "\t" + socket.getInetAddress().getHostAddress() + "\t" + timestamp;
  163. }
  164. private String getClientName() {
  165. this.index = ids.indexOf(id);
  166. if (index == -1) {
  167. ids.add(id);
  168. index = ids.size() - 1;
  169. }
  170. return "client-" + index;
  171. }
  172. private synchronized void incrementCount() {
  173. lineDone++;
  174. }
  175. }
  176. }