Browse Source

update cover traffic and delay

Minh Tùng Trần 2 years ago
parent
commit
b7cf6af722

+ 8 - 0
code/.idea/.gitignore

@@ -0,0 +1,8 @@
+# Default ignored files
+/shelf/
+/workspace.xml
+# Datasource local storage ignored files
+/dataSources/
+/dataSources.local.xml
+# Editor-based HTTP Client requests
+/httpRequests/

+ 6 - 0
code/.idea/compiler.xml

@@ -0,0 +1,6 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project version="4">
+  <component name="CompilerConfiguration">
+    <bytecodeTargetLevel target="16" />
+  </component>
+</project>

+ 20 - 0
code/.idea/jarRepositories.xml

@@ -0,0 +1,20 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project version="4">
+  <component name="RemoteRepositoriesConfiguration">
+    <remote-repository>
+      <option name="id" value="central" />
+      <option name="name" value="Maven Central repository" />
+      <option name="url" value="https://repo1.maven.org/maven2" />
+    </remote-repository>
+    <remote-repository>
+      <option name="id" value="jboss.community" />
+      <option name="name" value="JBoss Community repository" />
+      <option name="url" value="https://repository.jboss.org/nexus/content/repositories/public/" />
+    </remote-repository>
+    <remote-repository>
+      <option name="id" value="MavenRepo" />
+      <option name="name" value="MavenRepo" />
+      <option name="url" value="https://repo.maven.apache.org/maven2/" />
+    </remote-repository>
+  </component>
+</project>

+ 65 - 0
code/.idea/libraries-with-intellij-classes.xml

@@ -0,0 +1,65 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project version="4">
+  <component name="libraries-with-intellij-classes">
+    <option name="intellijApiContainingLibraries">
+      <list>
+        <LibraryCoordinatesState>
+          <option name="artifactId" value="ideaIU" />
+          <option name="groupId" value="com.jetbrains.intellij.idea" />
+        </LibraryCoordinatesState>
+        <LibraryCoordinatesState>
+          <option name="artifactId" value="ideaIU" />
+          <option name="groupId" value="com.jetbrains" />
+        </LibraryCoordinatesState>
+        <LibraryCoordinatesState>
+          <option name="artifactId" value="ideaIC" />
+          <option name="groupId" value="com.jetbrains.intellij.idea" />
+        </LibraryCoordinatesState>
+        <LibraryCoordinatesState>
+          <option name="artifactId" value="ideaIC" />
+          <option name="groupId" value="com.jetbrains" />
+        </LibraryCoordinatesState>
+        <LibraryCoordinatesState>
+          <option name="artifactId" value="pycharmPY" />
+          <option name="groupId" value="com.jetbrains.intellij.pycharm" />
+        </LibraryCoordinatesState>
+        <LibraryCoordinatesState>
+          <option name="artifactId" value="pycharmPY" />
+          <option name="groupId" value="com.jetbrains" />
+        </LibraryCoordinatesState>
+        <LibraryCoordinatesState>
+          <option name="artifactId" value="pycharmPC" />
+          <option name="groupId" value="com.jetbrains.intellij.pycharm" />
+        </LibraryCoordinatesState>
+        <LibraryCoordinatesState>
+          <option name="artifactId" value="pycharmPC" />
+          <option name="groupId" value="com.jetbrains" />
+        </LibraryCoordinatesState>
+        <LibraryCoordinatesState>
+          <option name="artifactId" value="clion" />
+          <option name="groupId" value="com.jetbrains.intellij.clion" />
+        </LibraryCoordinatesState>
+        <LibraryCoordinatesState>
+          <option name="artifactId" value="clion" />
+          <option name="groupId" value="com.jetbrains" />
+        </LibraryCoordinatesState>
+        <LibraryCoordinatesState>
+          <option name="artifactId" value="riderRD" />
+          <option name="groupId" value="com.jetbrains.intellij.rider" />
+        </LibraryCoordinatesState>
+        <LibraryCoordinatesState>
+          <option name="artifactId" value="riderRD" />
+          <option name="groupId" value="com.jetbrains" />
+        </LibraryCoordinatesState>
+        <LibraryCoordinatesState>
+          <option name="artifactId" value="goland" />
+          <option name="groupId" value="com.jetbrains.intellij.goland" />
+        </LibraryCoordinatesState>
+        <LibraryCoordinatesState>
+          <option name="artifactId" value="goland" />
+          <option name="groupId" value="com.jetbrains" />
+        </LibraryCoordinatesState>
+      </list>
+    </option>
+  </component>
+</project>

+ 10 - 0
code/.idea/misc.xml

@@ -0,0 +1,10 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project version="4">
+  <component name="ExternalStorageConfigurationManager" enabled="true" />
+  <component name="FrameworkDetectionExcludesConfiguration">
+    <file type="web" url="file://$PROJECT_DIR$" />
+  </component>
+  <component name="ProjectRootManager" version="2" languageLevel="JDK_16" default="true" project-jdk-name="openjdk-16" project-jdk-type="JavaSDK">
+    <output url="file://$PROJECT_DIR$/out" />
+  </component>
+</project>

+ 8 - 0
code/.idea/modules.xml

@@ -0,0 +1,8 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project version="4">
+  <component name="ProjectModuleManager">
+    <modules>
+      <module fileurl="file://$PROJECT_DIR$/.idea/modules/client.iml" filepath="$PROJECT_DIR$/.idea/modules/client.iml" />
+    </modules>
+  </component>
+</project>

+ 118 - 0
code/.idea/remote-targets.xml

@@ -0,0 +1,118 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project version="4">
+  <component name="RemoteTargetsManager">
+    <targets>
+      <target name="socketclient" type="docker">
+        <config>
+          <option name="targetPlatform">
+            <TargetPlatform />
+          </option>
+          <option name="buildImageConfig">
+            <BuildImageConfig>
+              <option name="builtImageTag" value="socketclient" />
+              <option name="dockerFile" value="simulation/client/Dockerfile" />
+            </BuildImageConfig>
+          </option>
+        </config>
+        <ContributedStateBase type="JavaLanguageRuntime">
+          <config>
+            <option name="agentFolder">
+              <VolumeState>
+                <option name="targetSpecificBits">
+                  <map>
+                    <entry key="mountAsVolume" value="false" />
+                  </map>
+                </option>
+              </VolumeState>
+            </option>
+            <option name="classpathFolder">
+              <VolumeState>
+                <option name="targetSpecificBits">
+                  <map>
+                    <entry key="mountAsVolume" value="false" />
+                  </map>
+                </option>
+              </VolumeState>
+            </option>
+            <option name="homePath" value="/usr/java/openjdk-16" />
+            <option name="javaVersionString" value="16.0.1" />
+          </config>
+        </ContributedStateBase>
+      </target>
+      <target name="tcpserver" type="docker">
+        <config>
+          <option name="targetPlatform">
+            <TargetPlatform />
+          </option>
+          <option name="buildImageConfig">
+            <BuildImageConfig>
+              <option name="builtImageTag" value="tcpserver" />
+              <option name="dockerFile" value="simulation/server/Dockerfile" />
+            </BuildImageConfig>
+          </option>
+        </config>
+        <ContributedStateBase type="JavaLanguageRuntime">
+          <config>
+            <option name="agentFolder">
+              <VolumeState>
+                <option name="targetSpecificBits">
+                  <map>
+                    <entry key="mountAsVolume" value="false" />
+                  </map>
+                </option>
+              </VolumeState>
+            </option>
+            <option name="classpathFolder">
+              <VolumeState>
+                <option name="targetSpecificBits">
+                  <map>
+                    <entry key="mountAsVolume" value="false" />
+                  </map>
+                </option>
+              </VolumeState>
+            </option>
+            <option name="homePath" value="/usr/java/openjdk-16" />
+            <option name="javaVersionString" value="16.0.1" />
+          </config>
+        </ContributedStateBase>
+      </target>
+      <target name="proxyserver" type="docker">
+        <config>
+          <option name="targetPlatform">
+            <TargetPlatform />
+          </option>
+          <option name="buildImageConfig">
+            <BuildImageConfig>
+              <option name="builtImageTag" value="proxyserver" />
+              <option name="dockerFile" value="simulation/proxy/Dockerfile" />
+            </BuildImageConfig>
+          </option>
+        </config>
+        <ContributedStateBase type="JavaLanguageRuntime">
+          <config>
+            <option name="agentFolder">
+              <VolumeState>
+                <option name="targetSpecificBits">
+                  <map>
+                    <entry key="mountAsVolume" value="false" />
+                  </map>
+                </option>
+              </VolumeState>
+            </option>
+            <option name="classpathFolder">
+              <VolumeState>
+                <option name="targetSpecificBits">
+                  <map>
+                    <entry key="mountAsVolume" value="false" />
+                  </map>
+                </option>
+              </VolumeState>
+            </option>
+            <option name="homePath" value="/usr/java/openjdk-16" />
+            <option name="javaVersionString" value="16.0.1" />
+          </config>
+        </ContributedStateBase>
+      </target>
+    </targets>
+  </component>
+</project>

+ 58 - 23
code/simulation/client/src/main/java/client/Client.java

@@ -17,21 +17,40 @@ public class Client {
     private final String proxyHostname = "proxyserver.com";
     private final InetAddress proxyAddress;
     private final int serverPort = 1234;
-    private static final String dataPath = "/data/tweets-nov-2012.json.gz.out";
-    private static final String logPath = "/data/logs/";
     private int lineCount = 0;
     private volatile int lineDone = 0;
     private static List<Long> ids = new ArrayList<>();
+    private static Random rd = new Random();
+
+    private static final int clientNum = 1000000;
+    private static final int coverMessageNum = 0;//clientNum * 5;
+    private static final int roundLength = 1800;
+    private static final int roundNum = 30 * 24 * 3600 / roundLength;
+    private static int coverMessageNumPerRound = coverMessageNum / roundNum;
+
+    private static final String dataPath = "/data/sorted_tweets_random_" + clientNum + ".txt";
+    private static final String logPath = "/data/logs/";
 
     private PrintWriter out;
 
+    private FileWriter logFW;
+    private BufferedWriter logBW;
+
     public static void main(String[] args) throws IOException, InterruptedException, ParseException, ExecutionException {
         Client c = new Client();
         c.run();
     }
 
-    public Client() throws UnknownHostException {
+    public Client() throws IOException {
         proxyAddress = InetAddress.getByName(proxyHostname);
+
+        File logfile = new File(logPath + "/clogs.txt");
+        boolean exists = logfile.exists();
+        if (!exists) {
+            logfile.createNewFile();
+        }
+        logFW = new FileWriter(logfile, true);
+        logBW = new BufferedWriter(logFW);
     }
 
     void run() throws IOException, InterruptedException, ParseException, ExecutionException {
@@ -50,9 +69,9 @@ public class Client {
 
         long timeDiff = 0L;
         long startTime = System.currentTimeMillis();
-        Timer timer = new Timer();
         ExecutorService executor = Executors.newSingleThreadExecutor();
         List<Future<?>> tasks = new ArrayList<>();
+        long roundStartTime = 1351742401;
         while((line = br.readLine()) != null) {
             lineCount++;
 
@@ -60,15 +79,16 @@ public class Client {
             long timestamp = Long.parseLong(String.valueOf(obj.get("timestamp")));
 
             //get first timestamp
-            if(lineCount == 1)
+            if(lineCount == 1) {
                 timeDiff = startTime - timestamp * 1000;
+                roundStartTime = timestamp;
+            }
+
+            if(timestamp - roundStartTime > roundLength) {
+                roundStartTime = timestamp;
+                coverMessageNumPerRound += coverMessageNum / roundNum;
+            }
 
-            //schedule sending message
-            long currentTime = System.currentTimeMillis();
-            long delay = (timestamp * 1000 + timeDiff) - currentTime;
-            if (delay < 0)
-                delay = 0;
-            //timer.schedule(new ClientThread(line), delay);
             tasks.add(executor.submit(new ClientThread(line)));
             //count progress
             if(readProgress != (int)(((double)lineCount/(double)lineTotal) * 100)) {
@@ -80,6 +100,8 @@ public class Client {
             if(lineCount > lineDone + 100)
                 tasks.get(lineCount - 2).get();
         }
+        out.write("end!");
+        out.flush();
         // Always close files.
         br.close();
         fr.close();
@@ -88,7 +110,9 @@ public class Client {
         }
         executor.shutdown();
         socket.close();
+        logBW.close();
         writeBoundNames();
+        System.exit(0);
     }
 
     private void writeBoundNames() throws IOException {
@@ -112,6 +136,7 @@ public class Client {
         private long timestamp;
         private String data;
         private String clientName;
+        private int index;
 
         ClientThread(String data) throws ParseException {
             JSONObject obj = (JSONObject) new JSONParser().parse(data);
@@ -136,26 +161,36 @@ public class Client {
 
         private void log() {
             try {
-                File file = new File(logPath + "/clogs.txt");
-                boolean exists = file.exists();
-                if (!exists) {
-                    file.createNewFile();
-                }
-                FileWriter fw = new FileWriter(file, true);
-                BufferedWriter bw = new BufferedWriter(fw);
                 synchronized (lock) {
-                    if (exists)
-                        bw.newLine();
-                    bw.write(this.clientName + "\t" + socket.getInetAddress().getHostAddress() + "\t" + timestamp);
-                    bw.close();
+                    if (lineDone != 0) {
+                        logBW.newLine();
+                    }
+                    logBW.write(this.clientName + "\t" + socket.getInetAddress().getHostAddress() + "\t" + timestamp);
+                    logCoverTraffic();
                 }
             } catch (IOException e) {
                 e.printStackTrace();
             }
         }
 
+        private void logCoverTraffic() throws IOException {
+            int messageNum = rd.nextInt(coverMessageNumPerRound + 1);
+            coverMessageNumPerRound -= messageNum;
+            for(int i = 0; i < messageNum; i++) {
+                logBW.newLine();
+                logBW.write(coverMessageGen());
+            }
+        }
+
+        private String coverMessageGen() {
+            int randomCoverClientID = rd.nextInt(clientNum);
+            while(randomCoverClientID == this.index)
+                randomCoverClientID = rd.nextInt(clientNum);
+            return "client-" + randomCoverClientID + "\t" + socket.getInetAddress().getHostAddress() + "\t" + timestamp;
+        }
+
         private String getClientName() {
-            int index = ids.indexOf(id);
+            this.index = ids.indexOf(id);
             if (index == -1) {
                 ids.add(id);
                 index = ids.size() - 1;

+ 4 - 1
code/simulation/proxy/src/main/java/proxy/ProxyServer.java

@@ -13,7 +13,6 @@ import java.net.InetAddress;
 public class ProxyServer {
     public static List<String> MSGs = new ArrayList<>();
     private static final int PORT = 1234;
-    private static final int roundLength = 3600; //secs
     private ServerSocket server;
     private Forwarder f;
 
@@ -63,6 +62,7 @@ public class ProxyServer {
             try {
                 listen();
                 this.client.close();
+                System.exit(0);
             } catch (IOException e) {
                 e.printStackTrace();
             }
@@ -74,6 +74,9 @@ public class ProxyServer {
             while((data = in.readLine()) != null) {
                 //MSGs.add(data);
                 f.forwardDirectly(data);
+                if(data.equals("end!")) {
+                    break;
+                }
                 System.out.println(client.getInetAddress().getHostAddress() + ": " + data);
             }
         }

+ 87 - 31
code/simulation/server/src/main/java/server/TCPServer.java

@@ -11,6 +11,7 @@ import java.net.ServerSocket;
 import java.net.Socket;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Random;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -26,12 +27,32 @@ public class TCPServer {
     private final long startTime = System.currentTimeMillis();
     private final long startDataTime = 1351742401;
 
-    private final long roundLength = 7200;
-    private int round = 0;
+    private final long roundLength = 1800;
+    private int round = 1;
+
+    private File logfile;
+    private FileWriter logFW;
+    private BufferedWriter logBW;
+
+    private Random rd = new Random();
+
+    private static final int delayRange = 0;
+    private static List<List<String>> lstOfLsts = new ArrayList<>();
+    private static volatile boolean switchRound = false;
+
+    private int lineDone = 0;
 
     public TCPServer() throws Exception {
         this.server = new ServerSocket(2345, 1, InetAddress.getLocalHost());
         System.out.println("\r\nRunning Server: " + "Host=" + getSocketAddress().getHostAddress() + " Port=" + getPort() + " Hostname=" + getSocketAddress().getHostName());
+
+        logfile = new File(logPath + "/slogs.txt");
+        boolean exists = logfile.exists();
+        if (!exists) {
+            logfile.createNewFile();
+        }
+        logFW = new FileWriter(logfile, true);
+        logBW = new BufferedWriter(logFW);
     }
 
     void start() throws IOException {
@@ -62,6 +83,8 @@ public class TCPServer {
         @Override
         public void run() {
             try {
+                for(int i = 0; i < delayRange + 2; i++)
+                    lstOfLsts.add(new ArrayList<>());
                 listen();
                 this.client.close();
             } catch (IOException | ParseException | ExecutionException | InterruptedException e) {
@@ -75,7 +98,10 @@ public class TCPServer {
             String data = null;
             List<Future<?>> tasks = new ArrayList<>();
             while((data = in.readLine()) != null) {
-                System.out.println("Message from " + client.getInetAddress().getHostAddress() + ": " + data);
+                if(data.equals("end!")) {
+                    break;
+                }
+                //System.out.println("Message from " + client.getInetAddress().getHostAddress() + ": " + data);
 
                 JSONObject obj = (JSONObject) new JSONParser().parse(data);
                 long timestamp = Long.parseLong(String.valueOf(obj.get("timestamp")));
@@ -86,54 +112,69 @@ public class TCPServer {
                     hashtags[i]= String.valueOf(arr.get(i));
                 Future<?> task = executor.submit(new Logger(user_id, hashtags, timestamp, client.getInetAddress().getHostAddress()));
                 tasks.add(task);
+
+                if(switchRound) {
+                    for(Future t : tasks)
+                        t.get();
+                    log();
+                    switchRound = false;
+                }
             }
-            for(Future task : tasks)
-                task.get();
+            for(List<String> l : lstOfLsts)
+                for(String s : l) {
+                    logBW.write(s);
+                    logBW.newLine();
+                    incrementCount();
+                }
+            logBW.close();
             System.out.println("listening done");
+            System.exit(0);
+        }
+
+
+        void log() throws IOException {
+            List<String> logs = lstOfLsts.get(0);
+            for(int i = 0; i < logs.size(); i++) {
+                String s = logs.get(i);
+                logBW.write(s);
+                logBW.newLine();
+                incrementCount();
+            }
+            lstOfLsts.remove(logs);
+            lstOfLsts.add(new ArrayList<>());
+
+        }
+
+        private synchronized void incrementCount() {
+            lineDone++;
         }
     }
 
     class Logger implements Runnable {
-        private Object lock = new Object();
         private String[] hashtags;
         private long timestamp;
         private String senderAddress;
         private long id;
 
+
         Logger(long user_id, String[] hashtags, long timestamp, String senderAddress) {
             this.hashtags = hashtags;
-            calculateTimeDiff(timestamp);
             this.senderAddress = senderAddress;
             this.id = user_id;
+            calculateTimeDiff(timestamp);
         }
 
         @Override
         public void run() {
-            log();
         }
 
-        void log() {
-            try {
-                File file = new File(logPath + "/slogs.txt");
-                boolean exists = file.exists();
-                if (!exists) {
-                    file.createNewFile();
-                }
-                FileWriter fw = new FileWriter(file, true);
-                BufferedWriter bw = new BufferedWriter(fw);
-                synchronized (lock) {
-                    if (exists)
-                        bw.newLine();
-                    String hashtagsString = "";
-                    for(String s : hashtags)
-                        hashtagsString = hashtagsString + (s + " ");
-
-                    bw.write( timestamp + "\t" + id + "\t" + hashtagsString.trim());
-                    bw.close();
-                }
-            } catch (IOException e) {
-                e.printStackTrace();
-            }
+
+        String produceLogString() {
+            String hashtagsString = "";
+            for(String s : hashtags)
+                hashtagsString = hashtagsString + (s + " ");
+
+            return timestamp + "\t" + id + "\t" + hashtagsString.trim();
         }
 
         private void calculateTimeDiff(long stamp) {
@@ -142,7 +183,22 @@ public class TCPServer {
             //this.timestamp = startDataTime + diffCurrentTime/1000;// in sec
             long diffStartTime = stamp - startDataTime;
             int inRound = (int)(diffStartTime / roundLength) + 1;
-            this.timestamp = startDataTime + roundLength * inRound;
+            int rdOffset = rd.nextInt(delayRange + 1);
+
+            int rdRound = rdOffset + inRound;
+            this.timestamp = startDataTime + roundLength * rdRound;
+
+            //add to q
+            List<String> targetLst;
+            if(inRound > round) {
+                round = inRound;
+                switchRound = true;
+                targetLst = lstOfLsts.get(rdOffset + 1);
+            } else {
+                targetLst = lstOfLsts.get(rdOffset);
+            }
+            targetLst.add(produceLogString());
+
         }
     }
 }

+ 6 - 5
code/src/main/java/analyzer/Accuracy.java

@@ -27,7 +27,7 @@ public class Accuracy {
     private static int counter;
     private static int totalHashtags = 1000;
     private static int totalClients = 1000;
-    private static final int clientThreshold = 2;
+    private static final int clientThreshold = 5;
 
     private static volatile int correct = 0;
     private static volatile int averageRounds = 0;
@@ -65,7 +65,7 @@ public class Accuracy {
 
     //Step: set intersection -> calculate point ->
     private static void clientTraceAccuracyUserUnknown() throws InterruptedException, ExecutionException {
-        List<Client> clientList = createClientSet(totalClients, clientThreshold);
+        List<Client> clientList = createClientSet(totalClients);
         ExecutorService executor = Executors.newFixedThreadPool(threadPoolSize);
         List<Future<?>> tasks = new ArrayList<>();
 
@@ -86,7 +86,8 @@ public class Accuracy {
     }
 
     private static void clientTraceAccuracyUserKnown() throws InterruptedException, ExecutionException {
-        List<Client> clientList = createClientSet(totalClients, clientThreshold);
+        List<Client> clientList = createClientSet(totalClients);
+        System.out.println("clientList size " + clientList.size());
         ExecutorService executor = Executors.newFixedThreadPool(threadPoolSize);
         List<Future<?>> tasks = new ArrayList<>();
 
@@ -140,13 +141,13 @@ public class Accuracy {
         return false;
     }
 
-    private static List<Client> createClientSet(int num, int threshold) {
+    private static List<Client> createClientSet(int num) {
         List<Client> result = new ArrayList<>();
         for(Client c : ClientLogParser.clients.values()) {
             if(result.size() == num)
                 break;
             else {
-                if(c.getTotalPosts() >= threshold)
+                if(c.getTotalPosts() >= Accuracy.clientThreshold)
                     result.add(c);
             }
         }

+ 63 - 35
code/src/main/java/analyzer/Analyzer.java

@@ -19,22 +19,29 @@ import java.util.concurrent.atomic.AtomicInteger;
 public class Analyzer {
     private static final int threadPoolSize = 8;
 
-    public static final int roundLength = 3600;
+    public static final int roundLength = 1800;
     public static final int clientNo = 1000000;
 
+    public static final boolean coverTraffic = false;
+    public static final int delay = 0;
+    public static final String childPath = coverTraffic ? "ct" : delay > 0 ? "delay" : roundLength + "";
     public static final String logType = "random_";    //"top-" "random_"
-    public static final String logPath = "C:\\Users\\Admin\\Desktop\\Skripts\\Thesis\\Repo\\local\\vm-mount\\logs\\" + roundLength + "\\" + logType + clientNo;
-    public static final String clientLogPath = logPath + "\\clogs.txt";
+    //public static final String logPath = "C:\\Users\\Admin\\Desktop\\Skripts\\Thesis\\Repo\\local\\vm-mount\\logs\\" + logType + clientNo + "\\" + roundLength;
+    public static final String logPath = "C:\\Users\\Admin\\Desktop\\Skripts\\Thesis\\Repo\\local\\vm-mount\\logs\\" + logType + clientNo + "\\" + childPath;
+    public static String clientLogPath = logPath + "\\clogs.txt";
     public static String serverLogPath = logPath + "\\slogs.txt";
 
-    //public static final User toTraceUser = new User(306637085);
-    public static final String toTraceHashtag = "InstantFollowBack";
-    public static final String intersectingClientID = "client-6220";
+    //TEST
+    //private static final User toTraceUser = new User(306637085);
+    private static final String toTraceHashtag = "InstantFollowBack";
+    private static final String intersectingClientID = "client-1034";
+    private static final long targetUserID = 398449825L;
+
 
     public static void main(String[] args) throws InterruptedException, ExecutionException {
-        //new Analyzer().traceHashtag();
-        //new Analyzer().calculatePointsGivenHashtags(ClientLogParser.clients.get("client-6220"), new Hashtag[] {ServerLogParser.hashtags.get("InstantFollowBack"), ServerLogParser.hashtags.get("sales")}, 0, true);
-        new Analyzer().intersectUsers(ClientLogParser.clients.get("client-6219"), 0, true);
+        //a.traceHashtag();
+        //traceClientHashtagLink();
+        traceClientUserLink();
     }
 
     public Analyzer () throws InterruptedException {
@@ -45,17 +52,37 @@ public class Analyzer {
                 e.printStackTrace();
             }
         });
-        //Thread clogs = new Thread(ClientLogParser::run);
+        Thread clogs = new Thread(ClientLogParser::run);
         slogs.start();
-        //clogs.start();
+        clogs.start();
         slogs.join();
-        //clogs.join();
+        clogs.join();
         //checkParsing();
         System.out.println("round length: " + Analyzer.roundLength);
 
         //new ServerLogParser(serverLogPath).traceUser();
     }
 
+    private static void traceClientUserLink() throws InterruptedException, ExecutionException {
+        Analyzer privateInst = new Analyzer();
+        int roundNum = privateInst.intersectUsers(ClientLogParser.clients.get(intersectingClientID), 1, true);
+        System.out.println("roundNum: " + roundNum);
+    }
+
+    private static void traceClientHashtagLink() throws ExecutionException, InterruptedException {
+        Analyzer privateInst = new Analyzer();
+        List<Map.Entry<String, Integer>> intersection = privateInst.intersectHashtags(ClientLogParser.clients.get(intersectingClientID), 1000, true);
+        Hashtag[] hashtags = new Hashtag[intersection.size()];
+        for(int i = 0; i < hashtags.length; i++)
+            hashtags[i] = ServerLogParser.hashtags.get(intersection.get(i).getKey());
+        System.out.println();
+        List<Map.Entry<Hashtag, Integer>> results = privateInst.calculatePointsGivenHashtags(ClientLogParser.clients.get(intersectingClientID), hashtags, 1, true);
+        System.out.println("Result size: " + results.size());
+        User targetUser = ServerLogParser.users.get(targetUserID);
+        System.out.println();
+        targetUser.getHashtags().forEach((k, v) -> System.out.println(k.getName()));
+    }
+
     public void traceHashtag() throws InterruptedException, ExecutionException {
         Hashtag staticTargetHashtag = ServerLogParser.hashtags.get(toTraceHashtag);
         Client staticTargetClient = ClientLogParser.clients.get(intersectingClientID);
@@ -169,16 +196,7 @@ public class Analyzer {
         sortedList.sort(((o1, o2) -> o2.getValue().compareTo(o1.getValue())));
         long sortTime = System.currentTimeMillis() - startSortTime;
 
-        if(printOut) {
-            System.out.println("Points for Client " + client.getId());
-            System.out.println("Sorting time : " + sortTime + " ms");
-            System.out.println("Client-ID\tPoints");
-            int maxPrintOut = Math.min(20, sortedList.size());
-            for(int i=0; i < maxPrintOut; i++) {
-                Map.Entry<Hashtag, Integer> e = sortedList.get(i);
-                System.out.println(e.getKey().getName() + "\t" + e.getValue());
-            }
-        }
+
         if(returnNum > 0) {
             if(returnNum >= sortedList.size())
                 return sortedList;
@@ -196,6 +214,15 @@ public class Analyzer {
                             break;
                     }
                 }
+                if(printOut) {
+                    System.out.println("Points for Client " + client.getId());
+                    System.out.println("Sorting time : " + sortTime + " ms");
+                    System.out.println("Client-ID\tPoints");
+                    for(int i=0; i < returnList.size(); i++) {
+                        Map.Entry<Hashtag, Integer> e = returnList.get(i);
+                        System.out.println(e.getKey().getName() + "\t" + e.getValue());
+                    }
+                }
                 return returnList;
             }
         }
@@ -231,12 +258,7 @@ public class Analyzer {
         //results.entrySet().removeIf(e -> (e.getValue() < targetNoRound /2));
         List<Map.Entry<String, Integer>> sortedList = new ArrayList<>(results.entrySet());
         sortedList.sort(Map.Entry.comparingByValue((v1, v2) -> Integer.compare(v2, v1)));
-        if(printOut) {
-            System.out.print(targetClient.getId() + ": ");
-            System.out.println("no of rounds of " + targetClient.getId() +  ": " + targetNoRounds);
-            sortedList.forEach(e -> System.out.print(e.getKey() + "|" + e.getValue() + " "));
-            System.out.println();
-        }
+
 
         if(returnNum > 0) {
             if(returnNum >= sortedList.size())
@@ -255,6 +277,15 @@ public class Analyzer {
                             break;
                     }
                 }
+                if(printOut) {
+                    System.out.print(targetClient.getId() + ": ");
+                    System.out.println("no of rounds of " + targetClient.getId() +  ": " + targetNoRounds);
+                    for(int i = 0; i < returnList.size(); i++) {
+                        System.out.print(returnList.get(i).getKey() + "|" + returnList.get(i).getValue() + " ");
+                    }
+                    //sortedList.forEach(e -> System.out.print(e.getKey() + "|" + e.getValue() + " "));
+                    System.out.println();
+                }
                 return returnList;
             }
         }
@@ -288,10 +319,12 @@ public class Analyzer {
             roundProcessed = -1;
         }
         if(printOut) {
-            for(User u: userList)
-                System.out.print(u.getId() + " ");
+            System.out.println("Suspect(s): ");
+            userList.forEach(e -> System.out.println(e.getId()));
+            System.out.println();
             System.out.println("rounds processed: " + roundProcessed);
             System.out.println();
+            System.out.println();
         }
         return roundProcessed;
     }
@@ -300,11 +333,6 @@ public class Analyzer {
      * This method checks the parsing processes, exits the JVM if there is an error.
      */
     private static void checkParsing() {
-        if(ClientLogParser.clientLineCounter != ServerLogParser.serverLineCounter) {
-            System.out.println("Parsing Error: Line numbers are not equals");
-            System.exit(1);
-        }
-
         if(ClientLogParser.clientRoundNo != ServerLogParser.serverRoundNo) {
             System.out.println("Parsing Error: Round numbers are not equals");
             System.exit(1);

+ 13 - 19
code/src/main/java/analyzer/ClientLogParser.java

@@ -1,6 +1,7 @@
 package analyzer;
 
 import analyzer.models.Client;
+import analyzer.models.Round;
 
 import java.io.BufferedReader;
 import java.io.FileReader;
@@ -13,6 +14,7 @@ import java.util.*;
  */
 public class ClientLogParser{
 
+    private static long startRoundTime = 1351742401;
     public static volatile int clientRoundNo = 0;
     public static volatile boolean DONE = false;
 
@@ -35,27 +37,27 @@ public class ClientLogParser{
         while((line = br.readLine()) != null) {
             clientLineCounter++;
             roundLinesCount++;
-
             String[] elements = line.trim().split("\t");//client-id destination timestamp
             long timestamp = Long.parseLong(elements[2]);
-            String user_id = elements[0];
+            String client_id = elements[0];
             //get log start time
             if(clientLineCounter == 1){
                 clientRoundNo = 1;
             }
             //parse behind ServerLogParser
-            while (ClientLogParser.clientRoundNo >= ServerLogParser.serverRoundNo && !ServerLogParser.DONE)
-                Thread.sleep(10);
+            while (ClientLogParser.clientRoundNo >= ServerLogParser.serverRoundNo - 1 && !ServerLogParser.DONE)
+                Thread.onSpinWait();
 
             //next round
-            if(roundLinesCount == ServerLogParser.roundLineCounters.get(clientRoundNo - 1) + 1) {
+            if(timestamp >= startRoundTime + Analyzer.roundLength) {
+                //System.out.println(clientRoundNo + " " + roundLinesCount);
+                startRoundTime += Analyzer.roundLength;
                 clientRoundNo++;
                 roundLinesCount = 1;
             }
-            addUser(user_id, clientRoundNo);
-
-            if(ClientLogParser.clientLineCounter == ServerLogParser.serverLineCounter && ServerLogParser.DONE)
-                break;
+            addUser(client_id, clientRoundNo);
+            //if(ClientLogParser.clientLineCounter == ServerLogParser.serverLineCounter && ServerLogParser.DONE)
+            //    break;
         }
         //executor.shutdown();
         System.out.println("No of Rounds " + clientRoundNo);
@@ -72,15 +74,7 @@ public class ClientLogParser{
             c = new Client(id);
             clients.put(id, c);
         }
-        c.addRound(ServerLogParser.rounds.get(round));
-    }
-
-    private static void printOut() {
-        System.out.println("Users with hashtag " + Analyzer.toTraceHashtag + ": ");
-        System.out.println("client-ID\t" + "round:posts" );
-        for(Client c: clients.values()) {
-            System.out.print(c.getId() + "\t");
-            System.out.println(c.getRoundtoString());
-        }
+        Round r = ServerLogParser.rounds.get(round);
+        c.addRound(r);
     }
 }

+ 10 - 10
code/src/main/java/analyzer/Counter.java

@@ -15,16 +15,16 @@ public class Counter {
     private static String path = "C:\\Users\\Admin\\Desktop\\Skripts\\Thesis\\Repo\\local\\round_based_data_" + Analyzer.roundLength + ".txt";
 
     public static void main(String[] args) throws IOException, InterruptedException {
-        //countHashtag(10);
-        //System.out.println();
-        //countClient(10);
-        datasetStatistics();
+        countHashtag(10);
+        System.out.println();
+        countClient(10000);
+        //datasetStatistics();
         //trial();
     }
 
 
     public static void countHashtag(int num) throws IOException {
-        FileReader fr = new FileReader(path);
+        FileReader fr = new FileReader(Analyzer.serverLogPath);
         BufferedReader br= new BufferedReader(fr);
         String line;
         while((line = br.readLine()) != null) {
@@ -48,16 +48,16 @@ public class Counter {
     }
 
     public static void countClient(int num) throws IOException {
-        FileReader fr = new FileReader(path);
+        FileReader fr = new FileReader(Analyzer.clientLogPath);
         BufferedReader br= new BufferedReader(fr);
         String line;
         while((line = br.readLine()) != null) {
-            String[] elements = line.trim().split("\t");//timestamp user_id hashtags
+            String[] elements = line.trim().split("\t");//client-id destination timestamp
 
-            if (clientsCounter.containsKey(elements[1]))
-                clientsCounter.replace(elements[1], clientsCounter.get(elements[1]) + 1);
+            if (clientsCounter.containsKey(elements[0]))
+                clientsCounter.replace(elements[0], clientsCounter.get(elements[0]) + 1);
             else
-                clientsCounter.put(elements[1], 1);
+                clientsCounter.put(elements[0], 1);
 
         }
 

+ 6 - 16
code/src/main/java/analyzer/ServerLogParser.java

@@ -20,7 +20,7 @@ public class ServerLogParser {
 
     public static volatile int serverRoundNo = 0;
     public static volatile boolean DONE = false;
-    public static volatile List<Integer> roundLineCounters = new ArrayList<>();
+    //public static volatile List<Integer> roundLineCounters = new ArrayList<>();
     public static volatile int serverLineCounter = 0;
     public static volatile int lineTotal = 0;
 
@@ -62,9 +62,10 @@ public class ServerLogParser {
                 serverRoundNo = 1;
             }
             //next round
-            if(timestamp > startRoundTime + Analyzer.roundLength - 50) {
-                startRoundTime = timestamp;
-                roundLineCounters.add(roundLinesCount - 1);
+            if(timestamp >= startRoundTime + Analyzer.roundLength) {
+                //System.out.println(serverRoundNo+ " " + roundLinesCount);
+                startRoundTime += Analyzer.roundLength;
+                //roundLineCounters.add(roundLinesCount - 1);
                 serverRoundNo++;
                 roundLinesCount = 1;
             }
@@ -74,7 +75,7 @@ public class ServerLogParser {
             System.out.print("\r");
             System.out.print(progress + " %");
         }
-        roundLineCounters.add(roundLinesCount);
+        //roundLineCounters.add(roundLinesCount);
         System.out.println();
         System.out.println("No of Rounds " + serverRoundNo);
         System.out.println(users.size() + " users found");
@@ -119,15 +120,4 @@ public class ServerLogParser {
             h.addRound(r);
         }
     }
-
-    private static void printOut() {
-        System.out.println("Users with hashtag " + Analyzer.toTraceHashtag + ": ");
-        System.out.println("user_id\t\t" + "hashtags\t\t" + "rounds\t\t" + "total" );
-        for(User u: users.values()) {
-            System.out.print(u.getId() + "\t");
-            System.out.print(u.hashtagsToString());
-            System.out.print("\t" + u.getRounds());
-            System.out.println("\t" + u.totalPosts());
-        }
-    }
 }