# -*- coding: utf-8 -*- import json import pandas as pd from datetime import datetime, timedelta from random import choice import hashlib import click from sqlalchemy import delete, func, select, update from . import app from clustering.kshape_filter import KShapeFilter from simulation.models import Cluster_result, Nym_info, User_info, db from clustering.util import Constants, cnt_cluster from clustering.update_ts import fill_ts class Clustering: def __init__(self): self.user_list = {} # user names self.time_series = pd.DataFrame() # time series dataset, update every round self.delay_user_list = [] def get_user(self, df): """Extract username from received dataframe""" users = df['user_id'].drop_duplicates().values.tolist() # users per round return users def update_user(self, user_list): """Add new user to user_list""" user = dict(zip(user_list, [0]*len(user_list))) self.user_list.update(user) def update_time_series(self, df): """Add new round of data to the time series""" self.time_series = df.combine_first(self.time_series).fillna(0) # fill NaN data for k-shape def rnd_time_series(self, df, users): """Filter out the time series of active users in this round""" rnd_df = df.loc[users] print(rnd_df.shape) return rnd_df def delay_user(self, df, model, cnt_cluster): """ Filter out users who in small cluster :param cnt_cluster: count the amount of users in each cluster :param model: cluster model """ cluster_s = cnt_cluster.loc[cnt_cluster['count'] <= Constants.CLUSTER_SIZE].index.tolist() new_df = pd.DataFrame() for i in cluster_s: filter_df = df[model.labels_ == i] new_df = pd.concat([filter_df, new_df], ignore_index=False) delay_user = new_df.index.tolist() if delay_user: print(f"{len(delay_user)} messages been delayed, join in next round") return delay_user def ts_cluster(self, data): users = self.get_user(data) rnd_users = self.delay_user_list + users print(f"Participate users: {len(users)} active, {len(self.delay_user_list)} delayed") df = fill_ts(data) self.update_time_series(df) rnd_df = self.rnd_time_series(self.time_series, rnd_users) ks, y_pred = KShapeFilter.k_shape(rnd_df, Constants.CLUSTER_NUM) cnt = cnt_cluster(ks) self.delay_user_list = self.delay_user(rnd_df, ks, cnt) def anonymity_simulation(date, hour, random=True): """ simulate anonymity attack, select one user randomly to update user_info :param date: date :param hour: hour :return: None """ print("attack:", date, hour) start_time = datetime.strptime(f"{date} {hour}", "%Y-%m-%d %H") # Two attack methods, specified anonymous attack and random anonymous attack # Specified, random=0 if random == False: attack_user_target = "537073336dcd41ff4f362d111888907c" # Random attack, random=1 else: res = db.session.execute( select(User_info.user_id) .where(User_info.timestamp <= start_time.strftime("%Y-%m-%d %H:%M:%S")) .group_by(User_info.user_id) ) user_list_json = list( map( lambda x: { "user_id": x, }, res.scalars().all(), ) ) user_list = [item["user_id"] for item in user_list_json] attack_user_target = choice(user_list) return attack_user_target def clustering(self, date, hour, with_attack=0, with_random=1): """clustering function""" print(date, hour) start_time = datetime.strptime(f"{date} {hour}", "%Y-%m-%d %H") end_time = start_time + timedelta(hours=1) res = db.session.execute( select(User_info) .where(User_info.timestamp >= start_time.strftime("%Y-%m-%d %H:%M:%S")) .where(User_info.timestamp < end_time.strftime("%Y-%m-%d %H:%M:%S")) ) # print(res.scalars().all()) user_list = list( map( lambda x: { "user_id": x.user_id, "timestamp": x.timestamp, }, res.scalars().all(), ) ) # print(user_list) print(f"{len(user_list)} users are selected") for user in user_list: user_id = user["user_id"] # user_id = hashlib.md5(int(user_id).to_bytes(8, 'big')).hexdigest() # For tweet dataset nym = hashlib.md5(user_id.encode("utf-8")).hexdigest() user_nym = Nym_info(timestamp=user["timestamp"], nym=nym) db.session.merge(user_nym) db.session.commit() k_filter = KShapeFilter(iter_hour=f"{date} {hour}") k_filter.load_data(user_list) k_filter.feature_extract() normal_user, abnormal_user = k_filter.train_predict() if with_attack == 1: target_user = self.anonymity_simulation(date, hour, random=with_random) if target_user not in normal_user: print(f"[Warning!] Anonymity Attack, user:{target_user}") normal_user.append(target_user) abnormal_user = [item for item in abnormal_user if item != target_user] # delete normal user from anonymity user list and record the length of the anonymity user list stmt = delete(Nym_info).where(Nym_info.nym.in_(normal_user)) db.session.execute(stmt) db.session.commit() else: # delete all the users which is not in normal user list stmt = delete(Nym_info).where( Nym_info.nym.notin_(normal_user) ) db.session.execute(stmt) db.session.commit() stmt = ( update(Nym_info) .where(Nym_info.nym.in_(abnormal_user)) ) print(stmt) result = db.session.execute(stmt) db.session.commit() nyms_count = db.session.query( func.count(Nym_info.nym) ).scalar() print(f"fictitious user count is: {nyms_count}") cluster_result = Cluster_result( timestamp=start_time, normal_user=json.dumps(normal_user), abnormal_user=json.dumps(abnormal_user), normal_user_count=len(normal_user), abnormal_user_count=len(abnormal_user), nyms_count=nyms_count, ) db.session.merge(cluster_result) db.session.commit() print(f"{len(abnormal_user)} abnormal users") return {"status": 1} @app.cli.command("clustering") @click.argument("date") @click.argument("hour") @click.option("-a", "--attack", required=True, type=int) @click.option("-r", "--random", required=True, type=int) def clustering_by_date_hour(self, date, hour, attack=0, random=1): print(date, hour) print(f"is attack?:{attack}") self.clustering(date, hour, with_attack=attack, with_random=random) return None