+import json
+import pandas as pd
+from datetime import datetime, timedelta
+from random import choice
+import hashlib
+import click
+from sqlalchemy import delete, func, select, update
+from . import app
+from .kshape_filter import KShapeFilter
+from .models import Cluster_result, Nym_info, User_info, db
+from .util import Constants, cnt_cluster
+from .update_ts import fill_ts
+class Clustering:
+ def __init__(self):
+ self.user_list = {}
+ self.time_series = pd.DataFrame()
+ self.delay_user_list = []
+ def get_user(self, df):
+ """Extract username from received dataframe"""
+ users = df['user_id'].drop_duplicates().values.tolist()
+ return users
+ def update_user(self, user_list):
+ """Add new user to user_list"""
+ user = dict(zip(user_list, [0]*len(user_list)))
+ self.user_list.update(user)
+ def update_time_series(self, df):
+ """Add new round of data to the time series"""
+ self.time_series = df.combine_first(self.time_series).fillna(0)
+ def rnd_time_series(self, df, users):
+ """Filter out the time series of active users in this round"""
+ rnd_df = df.loc[users]
+ print(rnd_df.shape)
+ return rnd_df
+ def delay_user(self, df, model, cnt_cluster):
+ """
+ Filter out users who in small cluster
+ :param cnt_cluster: count the amount of users in each cluster
+ :param model: cluster model
+ """
+ cluster_s = cnt_cluster.loc[cnt_cluster['count'] <= Constants.CLUSTER_SIZE].index.tolist()
+ new_df = pd.DataFrame()
+ for i in cluster_s:
+ filter_df = df[model.labels_ == i]
+ new_df = pd.concat([filter_df, new_df], ignore_index=False)
+ delay_user = new_df.index.tolist()
+ if delay_user:
+ print(f"{len(delay_user)} messages been delayed, join in next round")
+ return delay_user
+ def ts_cluster(self, data):
+ users = self.get_user(data)
+ rnd_users = self.delay_user_list + users
+ print(f"Participate users: {len(users)} active, {len(self.delay_user_list)} delayed")
+ df = fill_ts(data)
+ self.update_time_series(df)
+ rnd_df = self.rnd_time_series(self.time_series, rnd_users)
+ ks, y_pred = KShapeFilter.k_shape(rnd_df, Constants.CLUSTER_NUM)
+ cnt = cnt_cluster(ks)
+ self.delay_user_list = self.delay_user(rnd_df, ks, cnt)
+ def anonymity_simulation(date, hour, random=True):
+ """
+ simulate anonymity attack, select one user randomly to update user_info
+ :param date: date
+ :param hour: hour
+ :return: None
+ """
+ print("attack:", date, hour)
+ start_time = datetime.strptime(f"{date} {hour}", "%Y-%m-%d %H")
+ if random == False:
+ attack_user_target = "537073336dcd41ff4f362d111888907c"
+ else:
+ res = db.session.execute(
+ select(User_info.user_id)
+ .where(User_info.timestamp <= start_time.strftime("%Y-%m-%d %H:%M:%S"))
+ .group_by(User_info.user_id)
+ )
+ user_list_json = list(
+ map(
+ lambda x: {
+ "user_id": x,
+ },
+ res.scalars().all(),
+ )
+ )
+ user_list = [item["user_id"] for item in user_list_json]
+ attack_user_target = choice(user_list)
+ return attack_user_target
+ def clustering(self, date, hour, with_attack=0, with_random=1):
+ """clustering function"""
+ print(date, hour)
+ start_time = datetime.strptime(f"{date} {hour}", "%Y-%m-%d %H")
+ end_time = start_time + timedelta(hours=1)
+ res = db.session.execute(
+ select(User_info)
+ .where(User_info.timestamp >= start_time.strftime("%Y-%m-%d %H:%M:%S"))
+ .where(User_info.timestamp < end_time.strftime("%Y-%m-%d %H:%M:%S"))
+ )
+ user_list = list(
+ map(
+ lambda x: {
+ "user_id": x.user_id,
+ "timestamp": x.timestamp,
+ },
+ res.scalars().all(),
+ )
+ )
+ print(f"{len(user_list)} users are selected")
+ for user in user_list:
+ user_id = user["user_id"]
+ nym = hashlib.md5(user_id.encode("utf-8")).hexdigest()
+ user_nym = Nym_info(timestamp=user["timestamp"], nym=nym)
+ db.session.merge(user_nym)
+ db.session.commit()
+ k_filter = KShapeFilter(iter_hour=f"{date} {hour}")
+ k_filter.load_data(user_list)
+ k_filter.feature_extract()
+ normal_user, abnormal_user = k_filter.train_predict()
+ if with_attack == 1:
+ target_user = self.anonymity_simulation(date, hour, random=with_random)
+ if target_user not in normal_user:
+ print(f"[Warning!] Anonymity Attack, user:{target_user}")
+ normal_user.append(target_user)
+ abnormal_user = [item for item in abnormal_user if item != target_user]
+ stmt = delete(Nym_info).where(Nym_info.nym.in_(normal_user))
+ db.session.execute(stmt)
+ db.session.commit()
+ else:
+ stmt = delete(Nym_info).where(
+ Nym_info.nym.notin_(normal_user)
+ )
+ db.session.execute(stmt)
+ db.session.commit()
+ stmt = (
+ update(Nym_info)
+ .where(Nym_info.nym.in_(abnormal_user))
+ )
+ print(stmt)
+ result = db.session.execute(stmt)
+ db.session.commit()
+ nyms_count = db.session.query(
+ func.count(Nym_info.nym)
+ ).scalar()
+ print(f"fictitious user count is: {nyms_count}")
+ cluster_result = Cluster_result(
+ timestamp=start_time,
+ normal_user=json.dumps(normal_user),
+ abnormal_user=json.dumps(abnormal_user),
+ normal_user_count=len(normal_user),
+ abnormal_user_count=len(abnormal_user),
+ nyms_count=nyms_count,
+ )
+ db.session.merge(cluster_result)
+ db.session.commit()
+ print(f"{len(abnormal_user)} abnormal users")
+ return {"status": 1}
+ @app.cli.command("clustering")
+ @click.argument("date")
+ @click.argument("hour")
+ @click.option("-a", "--attack", required=True, type=int)
+ @click.option("-r", "--random", required=True, type=int)
+ def clustering_by_date_hour(self, date, hour, attack=0, random=1):
+ print(date, hour)
+ print(f"is attack?:{attack}")
+ self.clustering(date, hour, with_attack=attack, with_random=random)
+ return None