123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170 |
- package client;
- import org.json.simple.JSONObject;
- import org.json.simple.parser.JSONParser;
- import org.json.simple.parser.ParseException;
- import java.io.*;
- import java.net.*;
- import java.util.*;
- import java.util.concurrent.ExecutionException;
- import java.util.concurrent.ExecutorService;
- import java.util.concurrent.Executors;
- import java.util.concurrent.Future;
- public class Client {
- private Socket socket;
- private final String proxyHostname = "proxyserver.com";
- private final InetAddress proxyAddress;
- private final int serverPort = 1234;
- private static final String dataPath = "/data/tweets-nov-2012.json.gz.out";
- private static final String logPath = "/data/logs/";
- private int lineCount = 0;
- private volatile int lineDone = 0;
- private static List<Long> ids = new ArrayList<>();
- private PrintWriter out;
- public static void main(String[] args) throws IOException, InterruptedException, ParseException, ExecutionException {
- Client c = new Client();
- c.run();
- }
- public Client() throws UnknownHostException {
- proxyAddress = InetAddress.getByName(proxyHostname);
- }
- void run() throws IOException, InterruptedException, ParseException, ExecutionException {
- System.out.println("Local Address: " + InetAddress.getLocalHost());
- // parsing dataset from path
- int lineTotal = 1150976;
- FileReader fr = new FileReader(dataPath);
- BufferedReader br= new BufferedReader(fr);
- JSONObject obj;
- String line;
- this.socket = new Socket(proxyAddress, serverPort);
- System.out.println("Connected to " + socket.getInetAddress().getHostAddress() + ":" + serverPort);
- out = new PrintWriter(this.socket.getOutputStream(), true);
- int readProgress = 0;
- long timeDiff = 0L;
- long startTime = System.currentTimeMillis();
- Timer timer = new Timer();
- ExecutorService executor = Executors.newSingleThreadExecutor();
- List<Future<?>> tasks = new ArrayList<>();
- while((line = br.readLine()) != null) {
- lineCount++;
- obj = (JSONObject) new JSONParser().parse(line);
- long timestamp = Long.parseLong(String.valueOf(obj.get("timestamp")));
- //get first timestamp
- if(lineCount == 1)
- timeDiff = startTime - timestamp * 1000;
- //schedule sending message
- long currentTime = System.currentTimeMillis();
- long delay = (timestamp * 1000 + timeDiff) - currentTime;
- if (delay < 0)
- delay = 0;
- //timer.schedule(new ClientThread(line), delay);
- tasks.add(executor.submit(new ClientThread(line)));
- //count progress
- if(readProgress != (int)(((double)lineCount/(double)lineTotal) * 100)) {
- readProgress = (int)(((double)lineCount/(double)lineTotal) * 100);
- System.out.println(readProgress + "% read");
- }
- //limit buffer
- if(lineCount > lineDone + 100)
- tasks.get(lineCount - 2).get();
- }
- // Always close files.
- br.close();
- fr.close();
- for(Future<?> task:tasks) {
- task.get();
- }
- executor.shutdown();
- socket.close();
- writeBoundNames();
- }
- private void writeBoundNames() throws IOException {
- File file = new File(logPath + "/boundnames.txt");
- if (file.exists())
- return;
- FileWriter fw = new FileWriter(file);
- BufferedWriter bw = new BufferedWriter(fw);
- for (int i=0; i<ids.size(); i++) {
- bw.write(ids.get(i) + "\t" + "client-" + i);
- if(i<ids.size() - 1)
- bw.newLine();
- }
- bw.close();
- }
- class ClientThread implements Runnable {
- private final Object lock = new Object();
- private long id;
- private long timestamp;
- private String data;
- private String clientName;
- ClientThread(String data) throws ParseException {
- JSONObject obj = (JSONObject) new JSONParser().parse(data);
- this.timestamp = Long.parseLong(String.valueOf(obj.get("timestamp")));
- this.id = Long.parseLong(String.valueOf(obj.get("user_id")));
- this.data = data;
- this.clientName = getClientName();
- }
- @Override
- public void run() {
- send();
- log();
- incrementCount();
- }
- private void send() {
- out.write(data + "\n");
- out.flush();
- System.out.println(this.clientName + " sends: " + data);
- }
- private void log() {
- try {
- File file = new File(logPath + "/clogs.txt");
- boolean exists = file.exists();
- if (!exists) {
- file.createNewFile();
- }
- FileWriter fw = new FileWriter(file, true);
- BufferedWriter bw = new BufferedWriter(fw);
- synchronized (lock) {
- if (exists)
- bw.newLine();
- bw.write(this.clientName + "\t" + socket.getInetAddress().getHostAddress() + "\t" + timestamp);
- bw.close();
- }
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
- private String getClientName() {
- int index = ids.indexOf(id);
- if (index == -1) {
- ids.add(id);
- index = ids.size() - 1;
- }
- return "client-" + index;
- }
- private synchronized void incrementCount() {
- lineDone++;
- }
- }
- }
|