CommunicationProcessor.py 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364
  1. from lea import Lea
  2. from Attack.MembersMgmtCommAttack import MessageType
  3. from Attack.MembersMgmtCommAttack import Message
  # Absolute tolerance for floating-point comparisons (used by greater_than).
  # Machine imprecision means that, e.g., a time difference of 0.1 s may be stored
  # as a value slightly greater than 0.1 s.
  # Original note: this value works for differences of 0.1, but not for smaller ones.
  EPS_TOLERANCE: float = 1e-13
  def greater_than(a: float, b: float, *, tolerance: "float | None" = None) -> bool:
      """
      A greater-than comparison designed to tolerate slight floating-point imprecision.

      :param a: left operand
      :param b: right operand
      :param tolerance: absolute margin by which a must exceed b; when omitted (None)
                        the module-level EPS_TOLERANCE is used
      :return: True if a > b by more than the tolerance, otherwise False
      """
      if tolerance is None:
          # resolved at call time so the module constant remains the single default
          tolerance = EPS_TOLERANCE
      return b - a < -tolerance
  class CommunicationProcessor():
      """
      Class to process parsed input CSV/XML data and retrieve a mapping or other information.

      Packets are dict-like objects read through the keys "Src", "Dst", "Type", "Time" and,
      optionally, "LineNumber". The sliding-window searches assume the packets are ordered
      by "Time" (NOTE(review): this is not checked here -- confirm the caller sorts them).
      """

      def __init__(self, packets: list, mtypes: dict, nat: bool):
          """
          :param packets: the parsed packets (see the class docstring for the keys read)
          :param mtypes: maps int(packet["Type"]) to the corresponding MessageType
          :param nat: NAT flag; selects the interval search variant and, when False, enables
                      the detection of external initiators in det_id_roles_and_msgs()
          """
          self.packets = packets
          self.mtypes = mtypes
          self.nat = nat

      def set_mapping(self, packets: list, mapped_ids: dict):
          """
          Set the selected mapping for this communication processor.
          Must be called before det_id_roles_and_msgs(), which reads self.local_init_ids.
          :param packets: all packets contained in the mapped time frame
          :param mapped_ids: the chosen IDs; set(mapped_ids) is stored, so for a dict its keys are used
          """
          self.packets = packets
          self.local_init_ids = set(mapped_ids)

      def find_interval_most_comm(self, number_ids: int, max_int_time: float):
          """
          Find the time interval(s) of at most max_int_time seconds with the most communication.
          Delegates to the NAT or the non-NAT variant depending on self.nat.
          :param number_ids: The number of initiator IDs that have to exist in the interval(s)
          :param max_int_time: The maximum time period of the interval
          :return: A list of dicts with the keys "IDs", "Start" and "End" (see the variants)
          """
          if self.nat:
              return self._find_interval_most_comm_nat(number_ids, max_int_time)
          return self._find_interval_most_comm_nonat(number_ids, max_int_time)

      def _find_interval_most_comm_nonat(self, number_ids: int, max_int_time: float):
          """
          Finds the time interval(s) of the given seconds with the most overall communication
          (i.e. requests and responses) that contain at least number_ids distinct initiators.
          :param number_ids: The number of initiator IDs that have to exist in the interval(s)
          :param max_int_time: The maximum time period of the interval
          :return: A list of dicts, one per interval, with the keys "IDs" (the sorted distinct
                   initiator IDs of the interval), "Start" and "End" (inclusive indices into
                   self.packets). Only intervals sharing the highest communication sum are kept.
          """
          from collections import deque

          packets = self.packets
          mtypes = self.mtypes
          num_packets = len(packets)
          idx_low, idx_high = 0, 0  # the indices spanning the interval
          cur_highest_sum = 0  # the highest communication sum seen so far
          # Initiator IDs of the current interval in order of appearance. There is one entry
          # per request/response, so len(ids) is the communication sum of the interval.
          ids = deque()
          possible_intervals = []  # intervals with cur_highest_sum communication and enough IDs

          # Sliding window: idx_high consumes packets; idx_low catches up whenever the
          # window would exceed max_int_time.
          while True:
              at_end = idx_high >= num_packets
              if not at_end:
                  cur_int_time = float(packets[idx_high]["Time"]) - float(packets[idx_low]["Time"])
              # Test at_end first: for an empty packet list cur_int_time is never assigned.
              if at_end or greater_than(cur_int_time, max_int_time):
                  interval_ids = set(ids)
                  # only intervals with enough distinct initiators qualify
                  if len(interval_ids) >= number_ids:
                      comm_sum = len(ids)
                      interval = {"IDs": sorted(interval_ids), "Start": idx_low, "End": idx_high - 1}
                      if comm_sum > cur_highest_sum:
                          # new maximum: discard the previously collected intervals
                          possible_intervals = [interval]
                          cur_highest_sum = comm_sum
                      elif comm_sum == cur_highest_sum:
                          possible_intervals.append(interval)
                  if at_end:
                      break
                  # let idx_low catch up so the interval fits into max_int_time again
                  while greater_than(cur_int_time, max_int_time):
                      old_mtype = mtypes[int(packets[idx_low]["Type"])]
                      # Mirror the consumption rule below exactly, so that the front of
                      # ids always belongs to the oldest request/response in the window.
                      if MessageType.is_request(old_mtype) or MessageType.is_response(old_mtype):
                          ids.popleft()
                      idx_low += 1
                      cur_int_time = float(packets[idx_high]["Time"]) - float(packets[idx_low]["Time"])
              # consume the new packet at idx_high
              cur_packet = packets[idx_high]
              cur_mtype = mtypes[int(cur_packet["Type"])]
              if MessageType.is_request(cur_mtype):
                  # the source of a request is the initiator
                  ids.append(cur_packet["Src"])
              elif MessageType.is_response(cur_mtype):
                  # the destination of a response is the initiator
                  ids.append(cur_packet["Dst"])
              idx_high += 1
          return possible_intervals

      def _find_interval_most_comm_nat(self, number_ids: int, max_int_time: float):
          """
          Finds the time interval(s) of the given seconds with the most communication
          (i.e. requests and responses) by the number_ids most communicative initiator IDs
          of the interval.
          :param number_ids: The number of initiator IDs that have to exist in the interval(s)
          :param max_int_time: The maximum time period of the interval
          :return: A list of dicts, one per interval, with the keys "IDs" (the picked initiator
                   IDs, most communicative first), "Start" and "End" (inclusive indices into
                   self.packets). Only intervals sharing the highest communication sum are kept.
          """
          packets = self.packets
          mtypes = self.mtypes
          num_packets = len(packets)
          idx_low, idx_high = 0, 0  # the indices spanning the interval
          cur_highest_sum = 0  # the highest communication sum seen so far
          # Initiator ID -> number of its requests/responses in the current interval.
          # Entries that drop to 0 are kept (and filtered out later) so that the dict's
          # insertion order, which breaks ties when picking IDs, stays stable.
          comm_amounts = {}
          possible_intervals = []  # intervals with cur_highest_sum communication and enough IDs

          def get_nez_comm_amounts():
              """
              :return: a dict of the initiator IDs with a non-zero communication amount
              """
              return {id_: amount for id_, amount in comm_amounts.items() if amount > 0}

          def change_comm_amounts(packet: dict, add: bool = True):
              """
              Adds 1 to (or, if add is False, subtracts 1 from) the communication amount of
              the packet's initiator: the source of a request, the destination of a response.
              Packets of other message types are ignored.
              :param packet: the packet to be processed, containing src and dst ID
              :param add: True to add the packet to the interval, False to remove it
              """
              mtype = mtypes[int(packet["Type"])]
              if MessageType.is_request(mtype):
                  initiator = packet["Src"]
              elif MessageType.is_response(mtype):
                  initiator = packet["Dst"]
              else:
                  return
              if initiator in comm_amounts:
                  comm_amounts[initiator] += 1 if add else -1
              elif add:
                  # first appearance of this initiator
                  comm_amounts[initiator] = 1

          def get_comm_amount_first_ids(nez_comm_amounts: dict):
              """
              Picks the number_ids IDs that communicate the most.
              :return: a tuple (picked IDs as a list, their summed communication amount)
              """
              # sorted() is stable (also with reverse=True), so ties keep dict insertion order
              ranked = sorted(nez_comm_amounts.items(), key=lambda item: item[1], reverse=True)
              # clamp at 0: a non-positive number_ids picks nothing
              picked = ranked[:max(number_ids, 0)]
              return [id_ for id_, _ in picked], sum(amount for _, amount in picked)

          # Sliding window: idx_high consumes packets; idx_low catches up whenever the
          # window would exceed max_int_time.
          while True:
              at_end = idx_high >= num_packets
              if not at_end:
                  cur_int_time = float(packets[idx_high]["Time"]) - float(packets[idx_low]["Time"])
              # Test at_end first: for an empty packet list cur_int_time is never assigned.
              if at_end or greater_than(cur_int_time, max_int_time):
                  nez_comm_amounts = get_nez_comm_amounts()
                  # only intervals with enough active initiators qualify
                  if len(nez_comm_amounts) >= number_ids:
                      picked_ids, comm_sum = get_comm_amount_first_ids(nez_comm_amounts)
                      interval = {"IDs": picked_ids, "Start": idx_low, "End": idx_high - 1}
                      if comm_sum > cur_highest_sum:
                          # new maximum: discard the previously collected intervals
                          possible_intervals = [interval]
                          cur_highest_sum = comm_sum
                      elif comm_sum == cur_highest_sum:
                          possible_intervals.append(interval)
                  if at_end:
                      break
                  # let idx_low catch up so the interval fits into max_int_time again
                  while greater_than(cur_int_time, max_int_time):
                      change_comm_amounts(packets[idx_low], add=False)
                      idx_low += 1
                      cur_int_time = float(packets[idx_high]["Time"]) - float(packets[idx_low]["Time"])
              # consume the new packet at idx_high
              change_comm_amounts(packets[idx_high])
              idx_high += 1
          return possible_intervals

      def det_id_roles_and_msgs(self):
          """
          Determine the role of every mapped ID. The role can be initiator, responder or both.
          On the side also connect corresponding messages together to quickly find out
          which reply belongs to which request and vice versa.
          Requires set_mapping() to have been called (reads self.local_init_ids).
          :return: a 4-tuple (local initiator IDs, external initiator IDs, responder IDs,
                   messages); the results are also stored on self
          """
          mtypes = self.mtypes
          local_init_ids = self.local_init_ids
          request_types = {MessageType.SALITY_HELLO, MessageType.SALITY_NL_REQUEST}
          reply_types = {MessageType.SALITY_HELLO_REPLY, MessageType.SALITY_NL_REPLY}
          respnd_ids = set()
          external_init_ids = set()
          # msgs: the filtered messages; msg_id is an increasing artificial primary key that
          # also equals the message's index in msgs
          msgs, msg_id = [], 0
          # "<requester>-<responder>" -> msg_id of the latest still unanswered request
          prev_reqs = {}

          for packet in self.packets:
              id_src, id_dst = packet["Src"], packet["Dst"]
              type_num, time = int(packet["Type"]), float(packet["Time"])
              lineno = packet.get("LineNumber", -1)
              # skip packets in which neither ID is mapped
              if id_src not in local_init_ids and id_dst not in local_init_ids:
                  continue
              msg_type = mtypes[type_num]

              if msg_type in request_types:
                  if not self.nat and id_dst in local_init_ids and id_src not in local_init_ids:
                      # an unmapped ID sends a request to a mapped one: external initiator
                      external_init_ids.add(id_src)
                  elif id_src not in local_init_ids:
                      continue
                  else:
                      respnd_ids.add(id_dst)
                  msg_str = "{0}-{1}".format(id_src, id_dst)
                  msgs.append(Message(msg_id, id_src, id_dst, msg_type, time, line_no=lineno))
                  prev_reqs[msg_str] = msg_id
                  msg_id += 1

              elif msg_type in reply_types:
                  if not self.nat and id_src in local_init_ids and id_dst not in local_init_ids:
                      # a mapped ID replies to an unmapped one: external initiator
                      external_init_ids.add(id_dst)
                  elif id_dst not in local_init_ids:
                      continue
                  else:
                      respnd_ids.add(id_src)
                  msg_str = "{0}-{1}".format(id_dst, id_src)
                  # find (and remove) the pending request this reply answers
                  refer_idx = prev_reqs.pop(msg_str, None)
                  if refer_idx is None:
                      # No pending request matches (e.g. the request lies before the mapped
                      # time frame or was already answered); the reply cannot be linked.
                      continue
                  msgs[refer_idx].refer_msg_id = msg_id
                  msgs.append(Message(msg_id, id_src, id_dst, msg_type, time, refer_idx, lineno))
                  msg_id += 1

              elif msg_type == MessageType.TIMEOUT and id_src in local_init_ids and not self.nat:
                  msg_str = "{0}-{1}".format(id_dst, id_src)
                  # a timeout is recorded as the reply matching the pending request, if any
                  refer_idx = prev_reqs.pop(msg_str, None)
                  if refer_idx is not None:
                      msgs[refer_idx].refer_msg_id = msg_id
                      if msgs[refer_idx].type == MessageType.SALITY_NL_REQUEST:
                          reply_type = MessageType.SALITY_NL_REPLY
                      else:
                          reply_type = MessageType.SALITY_HELLO_REPLY
                      msgs.append(Message(msg_id, id_src, id_dst, reply_type, time, refer_idx, lineno))
                      msg_id += 1

          # store the retrieved information in this object for later use
          self.respnd_ids = sorted(respnd_ids)
          self.external_init_ids = sorted(external_init_ids)
          self.messages = msgs
          return self.local_init_ids, self.external_init_ids, self.respnd_ids, self.messages

      def det_ext_and_local_ids(self, prob_rspnd_local: float):
          """
          Map the known IDs to a locality (i.e. local or external) considering the given probability.
          Local initiators stay local and external initiators stay external; every other
          responder is randomly assigned a locality.
          Requires det_id_roles_and_msgs() to have been called (reads self.external_init_ids
          and self.respnd_ids).
          :param prob_rspnd_local: the probability (between 0 and 1) that a responder is local
          :return: a tuple (local IDs, external IDs) of sets; also stored on self
          """
          local_ids = self.local_init_ids.copy()
          external_ids = set(self.external_init_ids)
          # set up probabilistic chooser
          rspnd_locality = Lea.fromValFreqsDict({"local": prob_rspnd_local*100, "external": (1-prob_rspnd_local)*100})
          # determine responder localities
          for id_ in self.respnd_ids:
              if id_ in local_ids or id_ in external_ids:
                  continue
              pos = rspnd_locality.random()
              if pos == "local":
                  local_ids.add(id_)
              elif pos == "external":
                  external_ids.add(id_)
          self.local_ids, self.external_ids = local_ids, external_ids
          return self.local_ids, self.external_ids
  304. external_ids.add(id_)
  305. self.local_ids, self.external_ids = local_ids, external_ids
  306. return self.local_ids, self.external_ids