package analyzer;

import analyzer.models.Client;
import analyzer.models.Hashtag;
import analyzer.models.Round;
import analyzer.models.User;

import java.io.IOException;
import java.util.*;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * This class calls the parsers running parallel, then performs intersection attacks from the loaded objects.
 *
 * <p>All attacks read the static collections exposed by {@code ServerLogParser} and
 * {@code ClientLogParser}; constructing an {@code Analyzer} runs both parsers and waits for them.
 */
public class Analyzer {

    /** Number of worker threads used for the parallel point calculations. */
    private static final int THREAD_POOL_SIZE = 8;

    /** Upper bound on the number of entries printed when no explicit top list was requested. */
    private static final int MAX_PRINTED_ENTRIES = 20;

    public static final int roundLength = 1800;
    public static final int clientNo = 1000000;
    public static final boolean coverTraffic = false;
    public static final int delay = 0;

    /** Sub-directory of the log folder: "ct" with cover traffic, "delay" with a delay, else the round length. */
    public static final String childPath = coverTraffic ? "ct" : delay > 0 ? "delay" : roundLength + "";

    /** Log-folder prefix; values seen so far: "top-" and "random_". */
    public static final String logType = "random_";

    public static final String logPath = "C:\\Users\\Admin\\Desktop\\Skripts\\Thesis\\Repo\\local\\vm-mount\\logs\\" + logType + clientNo + "\\" + childPath;
    public static String clientLogPath = logPath + "\\clogs.txt";
    public static String serverLogPath = logPath + "\\slogs.txt";

    // Hard-coded targets used by the trace* experiments below.
    private static final String toTraceHashtag = "InstantFollowBack";
    private static final String intersectingClientID = "client-1034";
    private static final long targetUserID = 398449825L;

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        // Alternative experiments: new Analyzer().traceHashtag() or traceClientHashtagLink().
        traceClientUserLink();
    }

    /**
     * Runs the server and client log parsers on two threads and blocks until both have finished.
     *
     * <p>An {@link IOException} from the server parser is only printed; the analyzer is still
     * constructed with whatever the parser loaded up to that point.
     *
     * @throws InterruptedException if the calling thread is interrupted while waiting for a parser
     */
    public Analyzer() throws InterruptedException {
        Thread slogs = new Thread(() -> {
            try {
                ServerLogParser.run();
            } catch (IOException e) {
                e.printStackTrace();
            }
        });
        Thread clogs = new Thread(ClientLogParser::run);

        slogs.start();
        clogs.start();
        slogs.join();
        clogs.join();

        //checkParsing();
        System.out.println("round length: " + Analyzer.roundLength);
    }

    /** Parses the logs, then runs {@link #intersectUsers} for the hard-coded client and prints the round number. */
    private static void traceClientUserLink() throws InterruptedException, ExecutionException {
        Analyzer privateInst = new Analyzer();
        int roundNum = privateInst.intersectUsers(lookupClient(intersectingClientID), 1, true);
        System.out.println("roundNum: " + roundNum);
    }

    /**
     * Parses the logs, intersects the hard-coded client against all hashtags, scores the best
     * candidates against that client and finally prints the hashtags of the hard-coded target user.
     */
    private static void traceClientHashtagLink() throws ExecutionException, InterruptedException {
        Analyzer privateInst = new Analyzer();
        Client client = lookupClient(intersectingClientID);

        List<Map.Entry<String, Integer>> intersection = privateInst.intersectHashtags(client, 1000, true);
        Hashtag[] hashtags = new Hashtag[intersection.size()];
        for (int i = 0; i < hashtags.length; i++) {
            hashtags[i] = ServerLogParser.hashtags.get(intersection.get(i).getKey());
        }
        System.out.println();

        List<Map.Entry<Hashtag, Integer>> results =
                privateInst.calculatePointsGivenHashtags(client, hashtags, 1, true);
        System.out.println("Result size: " + results.size());

        User targetUser = ServerLogParser.users.get(targetUserID);
        System.out.println();
        if (targetUser == null) {
            // Previously this dereferenced null; report the missing user instead.
            System.out.println("Target user " + targetUserID + " not found");
            return;
        }
        targetUser.getHashtags().forEach((k, v) -> System.out.println(k.getName()));
    }

    /**
     * Scores the hard-coded hashtag against every client, prints the traces of the hard-coded
     * client and hashtag, then intersects that client against all hashtags. Both steps are timed.
     *
     * @throws InterruptedException if interrupted while waiting for the point calculations
     * @throws ExecutionException if a point calculation fails
     * @throws NullPointerException if the hard-coded hashtag or client is not present in the parsed data
     */
    public void traceHashtag() throws InterruptedException, ExecutionException {
        Hashtag staticTargetHashtag = Objects.requireNonNull(
                ServerLogParser.hashtags.get(toTraceHashtag), "Unknown hashtag: " + toTraceHashtag);
        Client staticTargetClient = lookupClient(intersectingClientID);
        System.out.println(staticTargetHashtag.getName() + " size: " + staticTargetHashtag.getRoundMap().size());

        long startCalculatingMaxPoint = System.currentTimeMillis();
        calculatePointsAllClients(staticTargetHashtag, 0, true);
        long calculatingMaxPointTime = System.currentTimeMillis() - startCalculatingMaxPoint;
        System.out.println("CalculatingMaxPointTime: " + calculatingMaxPointTime + " ms");
        System.out.println();

        System.out.println(staticTargetClient.getId() + " " + staticTargetClient.postingTraces());
        //print traced hashtag
        System.out.println("Hashtag " + staticTargetHashtag.getName() + " " + staticTargetHashtag.hashtagTraces());

        long startIntersecting = System.currentTimeMillis();
        intersectHashtags(staticTargetClient, 0, true);
        long intersectingTime = System.currentTimeMillis() - startIntersecting;
        System.out.println("Intersecting Time: " + intersectingTime + " ms");
    }

    /**
     * This method calculates points between a given hashtag and all clients, and prints out these points if needed.
     *
     * @param hashtag hashtag to calculate points against all clients
     * @param returnNum number of returned highest point 2-tuples (client, point); entries tied with the
     *                  last selected one are included as well
     * @param printOut true if log print-out is needed (at most the 20 best clients are printed)
     * @return list of highest point 2-tuples (client, point), or {@code null} if {@code returnNum <= 0}
     * @throws InterruptedException if interrupted while waiting for the calculations
     * @throws ExecutionException if a calculation task fails
     */
    public List<Map.Entry<Client, Integer>> calculatePointsAllClients(Hashtag hashtag, int returnNum, boolean printOut)
            throws InterruptedException, ExecutionException {
        // Snapshot the clients so the tasks can write into the result map while we are still submitting.
        List<Client> clients = new ArrayList<>(ClientLogParser.clients.values());
        Hashtable<Client, Integer> clientPointsWithHashtag = new Hashtable<>(clients.size(), 1);
        List<PointCalculatorTask> jobs = new ArrayList<>(clients.size());
        for (Client c : clients) {
            clientPointsWithHashtag.put(c, 0);
            jobs.add(new PointCalculatorTask(c, hashtag, clientPointsWithHashtag, null));
        }
        runAll(jobs);

        long startSortTime = System.currentTimeMillis();
        List<Map.Entry<Client, Integer>> sortedList = sortByValueDescending(clientPointsWithHashtag);
        long sortTime = System.currentTimeMillis() - startSortTime;

        if (printOut) {
            System.out.println("Points for hashtag " + hashtag.getName());
            System.out.println("Sorting time : " + sortTime + " ms");
            System.out.println("Client-ID\tPoints");
            int maxPrintOut = Math.min(MAX_PRINTED_ENTRIES, sortedList.size());
            for (Map.Entry<Client, Integer> e : sortedList.subList(0, maxPrintOut)) {
                System.out.println(e.getKey().getId() + "\t" + e.getValue());
            }
        }
        return selectTop(sortedList, returnNum);
    }

    /**
     * This method calculates points between a client and a set of hashtags and prints out these points if needed.
     *
     * @param client client to calculate point against a set of hashtags
     * @param hashtags set of hashtags to calculate point against client
     * @param returnNum number of returned highest point 2-tuples (hashtag, point); entries tied with the
     *                  last selected one are included as well
     * @param printOut true if log print-out is needed; prints the returned list, or at most the 20 best
     *                 entries when no list is returned
     * @return list of highest point 2-tuples (hashtag, point), or {@code null} if {@code returnNum <= 0}
     * @throws InterruptedException if interrupted while waiting for the calculations
     * @throws ExecutionException if a calculation task fails
     */
    public List<Map.Entry<Hashtag, Integer>> calculatePointsGivenHashtags(Client client, Hashtag[] hashtags, int returnNum, boolean printOut)
            throws InterruptedException, ExecutionException {
        Hashtable<Hashtag, Integer> hashtagPointMap = new Hashtable<>(hashtags.length, 1);
        List<PointCalculatorTask> jobs = new ArrayList<>(hashtags.length);
        for (Hashtag h : hashtags) {
            hashtagPointMap.put(h, 0);
            jobs.add(new PointCalculatorTask(client, h, null, hashtagPointMap));
        }
        runAll(jobs);

        long startSortTime = System.currentTimeMillis();
        List<Map.Entry<Hashtag, Integer>> sortedList = sortByValueDescending(hashtagPointMap);
        long sortTime = System.currentTimeMillis() - startSortTime;

        List<Map.Entry<Hashtag, Integer>> returnList = selectTop(sortedList, returnNum);
        if (printOut) {
            // Printing no longer depends on which returnNum branch was taken.
            System.out.println("Points for Client " + client.getId());
            System.out.println("Sorting time : " + sortTime + " ms");
            System.out.println("Hashtag\tPoints");
            for (Map.Entry<Hashtag, Integer> e : entriesToPrint(returnList, sortedList)) {
                System.out.println(e.getKey().getName() + "\t" + e.getValue());
            }
        }
        return returnList;
    }

    /**
     * This method performs intersection attacks by calculating the correlation points from a given
     * targetClient against hashtags based on active rounds.
     *
     * <p>For every round of the client, each hashtag seen so far loses one point and each hashtag
     * present in that round gains two. A hashtag first seen in the i-th processed round (0-based)
     * starts at {@code -i}, so a final score equals rounds present minus rounds absent minus one.
     *
     * @param targetClient target client to perform the attack
     * @param returnNum number of returned 2-tuples of most relevant suspects; entries tied with the
     *                  last selected one are included as well
     * @param printOut true if log print-out is needed; prints the returned list, or at most the 20 best
     *                 entries when no list is returned
     * @return list of highest point 2-tuples (hashtag name, point), or {@code null} if {@code returnNum <= 0}
     */
    public List<Map.Entry<String, Integer>> intersectHashtags(Client targetClient, int returnNum, boolean printOut) {
        //mapping of all hashtags seen so far to their current score
        Map<String, Integer> results = new Hashtable<>();
        List<Round> targetRounds = new ArrayList<>(targetClient.getRounds().keySet());
        int targetNoRounds = targetRounds.size();

        for (int i = 0; i < targetNoRounds; i++) {
            results.replaceAll((name, score) -> score - 1);

            final int initialScore = -i;
            int targetRoundNo = targetRounds.get(i).getNo();
            for (Hashtag hashtag : ServerLogParser.rounds.get(targetRoundNo).getHashtags().keySet()) {
                // Known hashtag: undo the penalty above and add one point. New hashtag: start at -i.
                results.merge(hashtag.getName(), initialScore, (score, unused) -> score + 2);
            }
        }

        List<Map.Entry<String, Integer>> sortedList = sortByValueDescending(results);
        List<Map.Entry<String, Integer>> returnList = selectTop(sortedList, returnNum);

        if (printOut) {
            // Printing no longer depends on which returnNum branch was taken.
            System.out.print(targetClient.getId() + ": ");
            System.out.println("no of rounds of " + targetClient.getId() + ": " + targetNoRounds);
            for (Map.Entry<String, Integer> e : entriesToPrint(returnList, sortedList)) {
                System.out.print(e.getKey() + "|" + e.getValue() + " ");
            }
            System.out.println();
        }
        return returnList;
    }

    /**
     * This method performs intersection attacks by matching the behaviour of targetClient with all other clients.
     *
     * <p>Rounds are processed in ascending round number. In a round where the client is recorded,
     * users without an equal value for that round are dropped; in any other round, users recorded
     * in it are dropped. Processing stops once at most one user is left.
     *
     * @param targetClient target client to perform the attacks
     * @param returnNum number of returned 2-tuples of most relevant suspects (currently not used)
     * @param printOut true if log print-out is needed
     * @return number of the round at which exactly one suspect remained, or -1 if the attack did not
     *         end with exactly one suspect
     */
    public int intersectUsers(Client targetClient, int returnNum, boolean printOut) {
        int roundProcessed = -1;
        List<User> userList = new ArrayList<>(ServerLogParser.users.values());
        List<Round> roundList = new ArrayList<>(ServerLogParser.rounds.values());
        roundList.sort(Comparator.comparingInt(Round::getNo));

        for (Round r : roundList) {
            if (targetClient.getRounds().containsKey(r)) {
                userList.removeIf(u -> !u.getRounds().containsKey(r)
                        || !u.getRounds().get(r).equals(targetClient.getRounds().get(r)));
            } else {
                userList.removeIf(u -> u.getRounds().containsKey(r));
            }
            if (userList.size() <= 1) {
                roundProcessed = r.getNo();
                break;
            }
        }
        if (userList.size() != 1) {
            roundProcessed = -1;
        }

        if (printOut) {
            System.out.println("Suspect(s): ");
            userList.forEach(e -> System.out.println(e.getId()));
            System.out.println();
            System.out.println("rounds processed: " + roundProcessed);
            System.out.println();
            System.out.println();
        }
        return roundProcessed;
    }

    /**
     * This method checks the parsing processes, exits the JVM if there is an error.
     */
    private static void checkParsing() {
        if (ClientLogParser.clientRoundNo != ServerLogParser.serverRoundNo) {
            System.out.println("Parsing Error: Round numbers are not equals");
            System.exit(1);
        }
        if (ClientLogParser.clients.size() != ServerLogParser.users.size()) {
            System.out.println("Parsing Error: Client/User numbers are not equals");
            System.exit(1);
        }
    }

    /** Returns the parsed client with the given id, failing with a descriptive message if it is missing. */
    private static Client lookupClient(String clientId) {
        return Objects.requireNonNull(ClientLogParser.clients.get(clientId), "Unknown client: " + clientId);
    }

    /** Runs all jobs on a fixed-size pool, waits for each of them, and always releases the pool. */
    private static void runAll(List<? extends Runnable> jobs) throws InterruptedException, ExecutionException {
        ExecutorService executor = Executors.newFixedThreadPool(THREAD_POOL_SIZE);
        try {
            List<Future<?>> pending = new ArrayList<>(jobs.size());
            for (Runnable job : jobs) {
                pending.add(executor.submit(job));
            }
            for (Future<?> future : pending) {
                future.get();
            }
        } finally {
            // On success nothing is left to cancel; on failure this drops the queued jobs so the
            // non-daemon worker threads do not keep the JVM alive.
            executor.shutdownNow();
        }
    }

    /** Copies the entries of {@code points} into a list ordered by value, highest first (stable for ties). */
    private static <K> List<Map.Entry<K, Integer>> sortByValueDescending(Map<K, Integer> points) {
        List<Map.Entry<K, Integer>> sorted = new ArrayList<>(points.entrySet());
        sorted.sort((a, b) -> Integer.compare(b.getValue(), a.getValue()));
        return sorted;
    }

    /**
     * Picks the leading entries of a list sorted by descending value: at least {@code returnNum}
     * entries plus every further entry tied with the last one picked. Returns the list itself when
     * it has no more than {@code returnNum} entries, and {@code null} when {@code returnNum <= 0}.
     */
    private static <K> List<Map.Entry<K, Integer>> selectTop(List<Map.Entry<K, Integer>> sorted, int returnNum) {
        if (returnNum <= 0) {
            return null;
        }
        if (returnNum >= sorted.size()) {
            return sorted;
        }
        List<Map.Entry<K, Integer>> top = new ArrayList<>();
        int lastValue = sorted.get(0).getValue();
        for (Map.Entry<K, Integer> entry : sorted) {
            int value = entry.getValue();
            if (value != lastValue) {
                if (top.size() >= returnNum) {
                    break;
                }
                lastValue = value;
            }
            top.add(entry);
        }
        return top;
    }

    /** Chooses what to print: the selected entries if there are any, else a capped prefix of the sorted list. */
    private static <K> List<Map.Entry<K, Integer>> entriesToPrint(List<Map.Entry<K, Integer>> selected,
                                                                  List<Map.Entry<K, Integer>> sorted) {
        if (selected != null) {
            return selected;
        }
        return sorted.subList(0, Math.min(MAX_PRINTED_ENTRIES, sorted.size()));
    }

    /**
     * This class models the parallel execution of point calculations.
     *
     * <p>The point of a (client, hashtag) pair is the sum, over all rounds present in both round
     * maps, of the smaller of the two values stored for that round. The result is written to
     * whichever of the two target maps is non-null.
     */
    private static class PointCalculatorTask implements Runnable {
        private final Client client;
        private final Hashtag hashtag;
        private final Hashtable<Client, Integer> clientMap;
        private final Hashtable<Hashtag, Integer> hashtagMap;

        private PointCalculatorTask(Client c, Hashtag hashtag, Hashtable<Client, Integer> clientMap,
                                    Hashtable<Hashtag, Integer> hashtagMap) {
            this.client = c;
            this.hashtag = hashtag;
            this.clientMap = clientMap;
            this.hashtagMap = hashtagMap;
        }

        @Override
        public void run() {
            Map<Round, Integer> hashtagRounds = this.hashtag.getRoundMap();
            int points = 0;
            // Walk the client's rounds directly instead of copying the map for every task.
            for (Map.Entry<Round, Integer> clientRound : this.client.getRounds().entrySet()) {
                Integer hashtagCount = hashtagRounds.get(clientRound.getKey());
                if (hashtagCount != null) {
                    points += Math.min(clientRound.getValue(), hashtagCount);
                }
            }
            if (clientMap != null) {
                clientMap.put(this.client, points);
            }
            if (hashtagMap != null) {
                hashtagMap.put(this.hashtag, points);
            }
        }
    }
}