123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207 |
- # -*- coding: utf-8 -*-
- import json
- import pandas as pd
- from datetime import datetime, timedelta
- from random import choice
- import hashlib
- import click
- from sqlalchemy import delete, func, select, update
- from . import app
- from .kshape_filter import KShapeFilter
- from .models import Cluster_result, Nym_info, User_info, db
- from .util import Constants, cnt_cluster
- from .update_ts import fill_ts
class Clustering:
    """Round-based k-Shape clustering of per-user time series, plus the hourly
    clustering / anonymity-attack simulation driver backed by the database."""

    # Fixed victim used by the "specified" anonymity-attack mode.
    DEFAULT_ATTACK_TARGET = "537073336dcd41ff4f362d111888907c"
    # Format of the "<date> <hour>" strings passed in by callers.
    _HOUR_FORMAT = "%Y-%m-%d %H"
    # Format used when comparing against User_info.timestamp in queries.
    _TS_FORMAT = "%Y-%m-%d %H:%M:%S"

    def __init__(self):
        # Known user ids, each mapped to 0 (see update_user).
        self.user_list = {}
        # Accumulated per-user time series (rows indexed by user); grows every round.
        self.time_series = pd.DataFrame()
        # Users held back by the previous round; they rejoin the next round.
        self.delay_user_list = []

    def get_user(self, df):
        """Return the distinct ``user_id`` values of *df* in first-seen order."""
        return df["user_id"].drop_duplicates().tolist()

    def update_user(self, user_list):
        """Register every id in *user_list* in ``self.user_list`` with value 0.

        Ids that are already present are reset to 0.
        """
        self.user_list.update(dict.fromkeys(user_list, 0))

    def update_time_series(self, df):
        """Merge a new round *df* into the accumulated time series.

        Values from *df* take priority over existing ones. Any remaining gaps
        are filled with 0 because k-Shape cannot handle NaN.
        """
        self.time_series = df.combine_first(self.time_series).fillna(0)

    def rnd_time_series(self, df, users):
        """Return the rows of *df* for *users*, each user once, in first-seen order.

        *users* may contain duplicates, e.g. a user who was delayed last round
        and is also active this round. They are collapsed so that no series is
        clustered twice.
        """
        unique_users = list(dict.fromkeys(users))
        rnd_df = df.loc[unique_users]
        print(rnd_df.shape)
        return rnd_df

    def delay_user(self, df, model, cnt_cluster, max_size=None):
        """Return the users (index labels of *df*) that fell into small clusters.

        :param df: time series that *model* was fitted on; row order must match
            ``model.labels_``
        :param model: fitted clustering model exposing ``labels_``
        :param cnt_cluster: DataFrame indexed by cluster label with a ``count``
            column giving the number of members per cluster
        :param max_size: clusters with ``count <= max_size`` are delayed;
            defaults to ``Constants.CLUSTER_SIZE``
        :return: list of delayed users, in the row order of *df*
        """
        if max_size is None:
            max_size = Constants.CLUSTER_SIZE
        small_clusters = cnt_cluster.loc[cnt_cluster["count"] <= max_size].index
        # One vectorised mask instead of concatenating one frame per cluster.
        in_small_cluster = pd.Index(model.labels_).isin(small_clusters)
        delayed = df.index[in_small_cluster].tolist()
        if delayed:
            print(f"{len(delayed)} messages been delayed, join in next round")
        return delayed

    def ts_cluster(self, data):
        """Run one clustering round over *data* plus last round's delayed users.

        ``self.delay_user_list`` is replaced by the users that land in small
        clusters this round.
        """
        users = self.get_user(data)
        rnd_users = self.delay_user_list + users
        print(f"Participate users: {len(users)} active, {len(self.delay_user_list)} delayed")
        # presumably fill_ts turns raw rows into per-user series — confirm in update_ts
        self.update_time_series(fill_ts(data))
        rnd_df = self.rnd_time_series(self.time_series, rnd_users)
        ks, _ = KShapeFilter.k_shape(rnd_df, Constants.CLUSTER_NUM)
        self.delay_user_list = self.delay_user(rnd_df, ks, cnt_cluster(ks))

    @classmethod
    def anonymity_simulation(cls, date, hour, random=True, target=None):
        """Pick the user targeted by a simulated anonymity attack.

        Two modes are supported:
        * specified (``random`` falsy): return *target*, or
          ``DEFAULT_ATTACK_TARGET`` when *target* is None;
        * random (``random`` truthy): pick uniformly among users that have a
          User_info row at or before ``date hour``.

        :param date: date string, ``%Y-%m-%d``
        :param hour: hour, ``%H``
        :param random: attack mode selector (0 = specified, 1 = random)
        :param target: victim for the specified mode
        :return: target user id, or None if random mode finds no candidates
        """
        print("attack:", date, hour)
        # Parsed before branching, so a bad date/hour is rejected in both modes.
        start_time = datetime.strptime(f"{date} {hour}", cls._HOUR_FORMAT)
        if not random:
            return cls.DEFAULT_ATTACK_TARGET if target is None else target
        res = db.session.execute(
            select(User_info.user_id)
            .where(User_info.timestamp <= start_time.strftime(cls._TS_FORMAT))
            .group_by(User_info.user_id)
        )
        candidates = res.scalars().all()
        if not candidates:
            print(f"[Warning!] no users at or before {start_time}, attack skipped")
            return None
        return choice(candidates)

    @classmethod
    def _fetch_messages(cls, start_time, end_time):
        """Return ``[{"user_id", "timestamp"}, ...]`` for User_info rows in [start, end)."""
        res = db.session.execute(
            select(User_info)
            .where(User_info.timestamp >= start_time.strftime(cls._TS_FORMAT))
            .where(User_info.timestamp < end_time.strftime(cls._TS_FORMAT))
        )
        return [
            {"user_id": row.user_id, "timestamp": row.timestamp}
            for row in res.scalars().all()
        ]

    @staticmethod
    def _record_nyms(messages):
        """Merge one Nym_info row (nym = md5 hex of user_id) per message, then commit."""
        for message in messages:
            # For the tweet dataset, user_id was first mapped via
            # hashlib.md5(int(user_id).to_bytes(8, 'big')).hexdigest().
            nym = hashlib.md5(message["user_id"].encode("utf-8")).hexdigest()
            db.session.merge(Nym_info(timestamp=message["timestamp"], nym=nym))
        db.session.commit()

    def clustering(self, date, hour, with_attack=0, with_random=1):
        """Cluster the messages of one hour and persist the outcome.

        Steps:
        1. Load the User_info rows in ``[date hour, +1h)``.
        2. Record one nym per message.
        3. Split users into normal/abnormal with KShapeFilter.
        4. Optionally simulate an anonymity attack.
        5. Prune Nym_info.
        6. Store a Cluster_result row.

        :param date: date string, ``%Y-%m-%d``
        :param hour: hour, ``%H``
        :param with_attack: 1 to simulate an anonymity attack
        :param with_random: forwarded to anonymity_simulation as ``random``
        :return: ``{"status": 1}``
        """
        print(date, hour)
        start_time = datetime.strptime(f"{date} {hour}", self._HOUR_FORMAT)
        end_time = start_time + timedelta(hours=1)

        user_list = self._fetch_messages(start_time, end_time)
        print(f"{len(user_list)} users are selected")
        self._record_nyms(user_list)

        k_filter = KShapeFilter(iter_hour=f"{date} {hour}")
        k_filter.load_data(user_list)
        k_filter.feature_extract()
        normal_user, abnormal_user = k_filter.train_predict()

        # NOTE(review): Nym_info.nym stores md5(user_id) (see _record_nyms) but is
        # compared against normal_user below. That only matches if train_predict
        # returns nyms rather than raw user ids — confirm.
        # NOTE(review): the source was unindented; this else is paired with
        # `with_attack == 1` — confirm against the original layout.
        if with_attack == 1:
            target_user = self.anonymity_simulation(date, hour, random=with_random)
            if target_user is not None and target_user not in normal_user:
                print(f"[Warning!] Anonymity Attack, user:{target_user}")
                # Move the target from abnormal_user into normal_user.
                normal_user.append(target_user)
                abnormal_user = [item for item in abnormal_user if item != target_user]
            # Delete normal users from the anonymity (nym) list.
            stmt = delete(Nym_info).where(Nym_info.nym.in_(normal_user))
        else:
            # Delete every nym that is not in the normal user list.
            stmt = delete(Nym_info).where(Nym_info.nym.notin_(normal_user))
        db.session.execute(stmt)
        db.session.commit()
        # TODO(review): nyms of abnormal users are left untouched. If they should
        # be flagged, issue update(Nym_info).where(...).values(...) with an
        # explicit SET clause.

        nyms_count = db.session.query(func.count(Nym_info.nym)).scalar()
        print(f"fictitious user count is: {nyms_count}")

        cluster_result = Cluster_result(
            timestamp=start_time,
            normal_user=json.dumps(normal_user),
            abnormal_user=json.dumps(abnormal_user),
            normal_user_count=len(normal_user),
            abnormal_user_count=len(abnormal_user),
            nyms_count=nyms_count,
        )
        db.session.merge(cluster_result)
        db.session.commit()
        print(f"{len(abnormal_user)} abnormal users")
        return {"status": 1}
@app.cli.command("clustering")
@click.argument("date")
@click.argument("hour")
@click.option(
    "-a", "--attack", default=0, show_default=True, type=int,
    help="1 to simulate an anonymity attack, 0 otherwise.",
)
@click.option(
    "-r", "--random", default=1, show_default=True, type=int,
    help="Attack mode: 1 picks a random target, 0 the fixed one.",
)
def clustering_by_date_hour(date, hour, attack=0, random=1):
    """CLI command ``clustering DATE HOUR [-a N] [-r N]`` on the Flask app CLI.

    Runs Clustering.clustering for the hour starting at ``DATE HOUR``
    (``%Y-%m-%d`` and ``%H``). Click calls this function with keyword
    arguments only, so it must not declare ``self``. A fresh Clustering
    instance is created per invocation.
    """
    print(date, hour)
    print(f"is attack?:{attack}")
    Clustering().clustering(date, hour, with_attack=attack, with_random=random)
|