# cluster.py
  1. # -*- coding: utf-8 -*-
  2. import json
  3. import pandas as pd
  4. from datetime import datetime, timedelta
  5. from random import choice
  6. import hashlib
  7. import click
  8. from sqlalchemy import delete, func, select, update
  9. from . import app
  10. from clustering.kshape_filter import KShapeFilter
  11. from simulation.models import Cluster_result, Nym_info, User_info, db
  12. from clustering.util import Constants, cnt_cluster
  13. from clustering.update_ts import fill_ts
  14. class Clustering:
  15. def __init__(self):
  16. self.user_list = {} # user names
  17. self.time_series = pd.DataFrame() # time series dataset, update every round
  18. self.delay_user_list = []
  19. def get_user(self, df):
  20. """Extract username from received dataframe"""
  21. users = df['user_id'].drop_duplicates().values.tolist() # users per round
  22. return users
  23. def update_user(self, user_list):
  24. """Add new user to user_list"""
  25. user = dict(zip(user_list, [0]*len(user_list)))
  26. self.user_list.update(user)
  27. def update_time_series(self, df):
  28. """Add new round of data to the time series"""
  29. self.time_series = df.combine_first(self.time_series).fillna(0) # fill NaN data for k-shape
  30. def rnd_time_series(self, df, users):
  31. """Filter out the time series of active users in this round"""
  32. rnd_df = df.loc[users]
  33. print(rnd_df.shape)
  34. return rnd_df
  35. def delay_user(self, df, model, cnt_cluster):
  36. """
  37. Filter out users who in small cluster
  38. :param cnt_cluster: count the amount of users in each cluster
  39. :param model: cluster model
  40. """
  41. cluster_s = cnt_cluster.loc[cnt_cluster['count'] <= Constants.CLUSTER_SIZE].index.tolist()
  42. new_df = pd.DataFrame()
  43. for i in cluster_s:
  44. filter_df = df[model.labels_ == i]
  45. new_df = pd.concat([filter_df, new_df], ignore_index=False)
  46. delay_user = new_df.index.tolist()
  47. if delay_user:
  48. print(f"{len(delay_user)} messages been delayed, join in next round")
  49. return delay_user
def ts_cluster(self, data):
    """Run one clustering round over *data* (this round's raw dataframe).

    Extracts this round's users, adds in users delayed from the previous
    round, folds the new data into the cumulative time series, clusters
    the active rows with k-shape, and records which users fell into
    undersized clusters so they are deferred to the next round.
    """
    users = self.get_user(data)
    # Users held back last round participate again this round.
    rnd_users = self.delay_user_list + users
    print(f"Participate users: {len(users)} active, {len(self.delay_user_list)} delayed")
    # presumably reshapes raw rows into a user-indexed time series — confirm in clustering.update_ts
    df = fill_ts(data)
    self.update_time_series(df)
    rnd_df = self.rnd_time_series(self.time_series, rnd_users)
    # NOTE(review): y_pred (predicted labels) is unused here; only the fitted model is consumed.
    ks, y_pred = KShapeFilter.k_shape(rnd_df, Constants.CLUSTER_NUM)
    cnt = cnt_cluster(ks)  # per-cluster membership counts (helper from clustering.util)
    # Members of small clusters are deferred to the next round.
    self.delay_user_list = self.delay_user(rnd_df, ks, cnt)
  60. def anonymity_simulation(date, hour, random=True):
  61. """
  62. simulate anonymity attack, select one user randomly to update user_info
  63. :param date: date
  64. :param hour: hour
  65. :return: None
  66. """
  67. print("attack:", date, hour)
  68. start_time = datetime.strptime(f"{date} {hour}", "%Y-%m-%d %H")
  69. # Two attack methods, specified anonymous attack and random anonymous attack
  70. # Specified, random=0
  71. if random == False:
  72. attack_user_target = "537073336dcd41ff4f362d111888907c"
  73. # Random attack, random=1
  74. else:
  75. res = db.session.execute(
  76. select(User_info.user_id)
  77. .where(User_info.timestamp <= start_time.strftime("%Y-%m-%d %H:%M:%S"))
  78. .group_by(User_info.user_id)
  79. )
  80. user_list_json = list(
  81. map(
  82. lambda x: {
  83. "user_id": x,
  84. },
  85. res.scalars().all(),
  86. )
  87. )
  88. user_list = [item["user_id"] for item in user_list_json]
  89. attack_user_target = choice(user_list)
  90. return attack_user_target
def clustering(self, date, hour, with_attack=0, with_random=1):
    """Run one hourly clustering round and persist the results.

    Loads users active in [date hour, date hour + 1h), registers an md5
    pseudonym for each in Nym_info, runs the k-shape filter to split users
    into normal/abnormal, optionally simulates an anonymity attack, prunes
    the pseudonym table, and stores a Cluster_result row.

    :param date: date string "YYYY-MM-DD"
    :param hour: hour string parseable by strptime's "%H"
    :param with_attack: 1 to simulate an anonymity attack, otherwise 0
    :param with_random: forwarded to anonymity_simulation as ``random``
    :return: {"status": 1} on completion
    """
    print(date, hour)
    start_time = datetime.strptime(f"{date} {hour}", "%Y-%m-%d %H")
    end_time = start_time + timedelta(hours=1)
    # All User_info rows whose timestamp falls inside this one-hour window.
    res = db.session.execute(
        select(User_info)
        .where(User_info.timestamp >= start_time.strftime("%Y-%m-%d %H:%M:%S"))
        .where(User_info.timestamp < end_time.strftime("%Y-%m-%d %H:%M:%S"))
    )
    user_list = list(
        map(
            lambda x: {
                "user_id": x.user_id,
                "timestamp": x.timestamp,
            },
            res.scalars().all(),
        )
    )
    print(f"{len(user_list)} users are selected")
    # Register a pseudonym (md5 of the user id) for each active user.
    for user in user_list:
        user_id = user["user_id"]
        # user_id = hashlib.md5(int(user_id).to_bytes(8, 'big')).hexdigest() # For tweet dataset
        nym = hashlib.md5(user_id.encode("utf-8")).hexdigest()
        user_nym = Nym_info(timestamp=user["timestamp"], nym=nym)
        # merge performs an insert-or-update keyed on the primary key.
        db.session.merge(user_nym)
    db.session.commit()
    # K-shape filter classifies this hour's users as normal vs abnormal.
    k_filter = KShapeFilter(iter_hour=f"{date} {hour}")
    k_filter.load_data(user_list)
    k_filter.feature_extract()
    normal_user, abnormal_user = k_filter.train_predict()
    if with_attack == 1:
        # Simulated attacker tracks one target user; force it into the
        # normal set so it survives the pruning below.
        target_user = self.anonymity_simulation(date, hour, random=with_random)
        if target_user not in normal_user:
            print(f"[Warning!] Anonymity Attack, user:{target_user}")
            normal_user.append(target_user)
            abnormal_user = [item for item in abnormal_user if item != target_user]
        # delete normal user from anonymity user list and record the length of the anonymity user list
        stmt = delete(Nym_info).where(Nym_info.nym.in_(normal_user))
        db.session.execute(stmt)
        db.session.commit()
    else:
        # delete all the users which is not in normal user list
        stmt = delete(Nym_info).where(
            Nym_info.nym.notin_(normal_user)
        )
        db.session.execute(stmt)
        db.session.commit()
        # NOTE(review): this update() carries no .values() clause, so
        # executing it should fail to compile in SQLAlchemy; moreover the
        # abnormal users' nyms were just deleted above, so the filter would
        # match nothing. Looks unfinished — confirm the intended values.
        # (Branch placement is also ambiguous in the original formatting.)
        stmt = (
            update(Nym_info)
            .where(Nym_info.nym.in_(abnormal_user))
        )
        print(stmt)
        result = db.session.execute(stmt)
        db.session.commit()
    # Remaining pseudonyms act as fictitious users for this round.
    nyms_count = db.session.query(
        func.count(Nym_info.nym)
    ).scalar()
    print(f"fictitious user count is: {nyms_count}")
    cluster_result = Cluster_result(
        timestamp=start_time,
        normal_user=json.dumps(normal_user),
        abnormal_user=json.dumps(abnormal_user),
        normal_user_count=len(normal_user),
        abnormal_user_count=len(abnormal_user),
        nyms_count=nyms_count,
    )
    db.session.merge(cluster_result)
    db.session.commit()
    print(f"{len(abnormal_user)} abnormal users")
    return {"status": 1}
  164. @app.cli.command("clustering")
  165. @click.argument("date")
  166. @click.argument("hour")
  167. @click.option("-a", "--attack", required=True, type=int)
  168. @click.option("-r", "--random", required=True, type=int)
  169. def clustering_by_date_hour(self, date, hour, attack=0, random=1):
  170. print(date, hour)
  171. print(f"is attack?:{attack}")
  172. self.clustering(date, hour, with_attack=attack, with_random=random)
  173. return None