// Client.java (5.7 KB)

  package client;

  import java.io.*;
  import java.net.*;
  import java.nio.charset.StandardCharsets;
  import java.util.*;
  import java.util.concurrent.ExecutionException;
  import java.util.concurrent.ExecutorService;
  import java.util.concurrent.Executors;
  import java.util.concurrent.Future;
  import org.json.simple.JSONObject;
  import org.json.simple.parser.JSONParser;
  import org.json.simple.parser.ParseException;
  12. public class Client {
  13. private Socket socket;
  14. private final String proxyHostname = "proxyserver.com";
  15. private final InetAddress proxyAddress;
  16. private final int serverPort = 1234;
  17. private static final String dataPath = "/data/tweets-nov-2012.json.gz.out";
  18. private static final String logPath = "/data/logs/";
  19. private int lineCount = 0;
  20. private volatile int lineDone = 0;
  21. private static List<Long> ids = new ArrayList<>();
  22. private PrintWriter out;
  23. public static void main(String[] args) throws IOException, InterruptedException, ParseException, ExecutionException {
  24. Client c = new Client();
  25. c.run();
  26. }
  27. public Client() throws UnknownHostException {
  28. proxyAddress = InetAddress.getByName(proxyHostname);
  29. }
  30. void run() throws IOException, InterruptedException, ParseException, ExecutionException {
  31. System.out.println("Local Address: " + InetAddress.getLocalHost());
  32. // parsing dataset from path
  33. int lineTotal = 1150976;
  34. FileReader fr = new FileReader(dataPath);
  35. BufferedReader br= new BufferedReader(fr);
  36. JSONObject obj;
  37. String line;
  38. this.socket = new Socket(proxyAddress, serverPort);
  39. System.out.println("Connected to " + socket.getInetAddress().getHostAddress() + ":" + serverPort);
  40. out = new PrintWriter(this.socket.getOutputStream(), true);
  41. int readProgress = 0;
  42. long timeDiff = 0L;
  43. long startTime = System.currentTimeMillis();
  44. Timer timer = new Timer();
  45. ExecutorService executor = Executors.newSingleThreadExecutor();
  46. List<Future<?>> tasks = new ArrayList<>();
  47. while((line = br.readLine()) != null) {
  48. lineCount++;
  49. obj = (JSONObject) new JSONParser().parse(line);
  50. long timestamp = Long.parseLong(String.valueOf(obj.get("timestamp")));
  51. //get first timestamp
  52. if(lineCount == 1)
  53. timeDiff = startTime - timestamp * 1000;
  54. //schedule sending message
  55. long currentTime = System.currentTimeMillis();
  56. long delay = (timestamp * 1000 + timeDiff) - currentTime;
  57. if (delay < 0)
  58. delay = 0;
  59. //timer.schedule(new ClientThread(line), delay);
  60. tasks.add(executor.submit(new ClientThread(line)));
  61. //count progress
  62. if(readProgress != (int)(((double)lineCount/(double)lineTotal) * 100)) {
  63. readProgress = (int)(((double)lineCount/(double)lineTotal) * 100);
  64. System.out.println(readProgress + "% read");
  65. }
  66. //limit buffer
  67. if(lineCount > lineDone + 100)
  68. tasks.get(lineCount - 2).get();
  69. }
  70. // Always close files.
  71. br.close();
  72. fr.close();
  73. for(Future<?> task:tasks) {
  74. task.get();
  75. }
  76. executor.shutdown();
  77. socket.close();
  78. writeBoundNames();
  79. }
  80. private void writeBoundNames() throws IOException {
  81. File file = new File(logPath + "/boundnames.txt");
  82. if (file.exists())
  83. return;
  84. FileWriter fw = new FileWriter(file);
  85. BufferedWriter bw = new BufferedWriter(fw);
  86. for (int i=0; i<ids.size(); i++) {
  87. bw.write(ids.get(i) + "\t" + "client-" + i);
  88. if(i<ids.size() - 1)
  89. bw.newLine();
  90. }
  91. bw.close();
  92. }
  93. class ClientThread implements Runnable {
  94. private final Object lock = new Object();
  95. private long id;
  96. private long timestamp;
  97. private String data;
  98. private String clientName;
  99. ClientThread(String data) throws ParseException {
  100. JSONObject obj = (JSONObject) new JSONParser().parse(data);
  101. this.timestamp = Long.parseLong(String.valueOf(obj.get("timestamp")));
  102. this.id = Long.parseLong(String.valueOf(obj.get("user_id")));
  103. this.data = data;
  104. this.clientName = getClientName();
  105. }
  106. @Override
  107. public void run() {
  108. send();
  109. log();
  110. incrementCount();
  111. }
  112. private void send() {
  113. out.write(data + "\n");
  114. out.flush();
  115. System.out.println(this.clientName + " sends: " + data);
  116. }
  117. private void log() {
  118. try {
  119. File file = new File(logPath + "/clogs.txt");
  120. boolean exists = file.exists();
  121. if (!exists) {
  122. file.createNewFile();
  123. }
  124. FileWriter fw = new FileWriter(file, true);
  125. BufferedWriter bw = new BufferedWriter(fw);
  126. synchronized (lock) {
  127. if (exists)
  128. bw.newLine();
  129. bw.write(this.clientName + "\t" + socket.getInetAddress().getHostAddress() + "\t" + timestamp);
  130. bw.close();
  131. }
  132. } catch (IOException e) {
  133. e.printStackTrace();
  134. }
  135. }
  136. private String getClientName() {
  137. int index = ids.indexOf(id);
  138. if (index == -1) {
  139. ids.add(id);
  140. index = ids.size() - 1;
  141. }
  142. return "client-" + index;
  143. }
  144. private synchronized void incrementCount() {
  145. lineDone++;
  146. }
  147. }
  148. }