# CommunicationProcessor.py
from collections import deque

from lea import Lea

from Attack.MembersMgmtCommAttack import MessageType
from Attack.MembersMgmtCommAttack import Message
  4. # needed because of machine inprecision. E.g A time difference of 0.1s is stored as >0.1s
  5. EPS_TOLERANCE = 1e-13 # works for a difference of 0.1, no less
  6. def greater_than(a: float, b: float):
  7. """
  8. A greater than operator desgined to handle slight machine inprecision up to EPS_TOLERANCE.
  9. :return: True if a > b, otherwise False
  10. """
  11. return b - a < -EPS_TOLERANCE
  12. class CommunicationProcessor():
  13. """
  14. Class to process parsed input CSV/XML data and retrieve a mapping or other information.
  15. """
  16. def __init__(self, packets:list, mtypes:dict, nat:bool):
  17. """
  18. Creates an instance of CommunicationProcessor.
  19. :param packets: the list of abstract packets
  20. :param mtypes: a dict containing an int to EnumType mapping of MessageTypes
  21. :param nat: whether NAT is present in this network
  22. """
  23. self.packets = packets
  24. self.mtypes = mtypes
  25. self.nat = nat
  26. def set_mapping(self, packets: list, mapped_ids: dict):
  27. """
  28. Set the selected mapping for this communication processor.
  29. :param packets: all packets contained in the mapped time frame
  30. :param mapped_ids: the chosen IDs
  31. """
  32. self.packets = packets
  33. self.local_init_ids = set(mapped_ids)
  34. def find_interval_most_comm(self, number_ids: int, max_int_time: float):
  35. """
  36. Finds the time interval(s) of the given seconds with the most overall communication (i.e. requests and responses)
  37. that has at least number_ids communication initiators in it.
  38. :param number_ids: The number of initiator IDs that have to exist in the interval(s)
  39. :param max_int_time: The maximum time period of the interval
  40. :return: A list of triples, where each triple contains the initiator IDs, the start index and end index
  41. of the respective interval in that order. The indices are with respect to self.packets
  42. """
  43. # setup initial variables
  44. packets = self.packets
  45. mtypes = self.mtypes
  46. idx_low, idx_high = 0, 0 # the indices spanning the interval
  47. comm_sum = 0 # the communication sum of the current interval
  48. cur_highest_sum = 0 # the highest communication sum seen so far
  49. init_ids = [] # the initiator IDs seen in the current interval in order of appearance
  50. possible_intervals = [] # all intervals that have cur_highest_sum of communication and contain enough IDs
  51. # Iterate over all packets from start to finish and process the info of each packet.
  52. # Similar to a Sliding Window approach.
  53. while True:
  54. if idx_high < len(packets):
  55. cur_int_time = float(packets[idx_high]["Time"]) - float(packets[idx_low]["Time"])
  56. # if current interval time exceeds maximum time period, process information of the current interval
  57. if greater_than(cur_int_time, max_int_time) or idx_high >= len(packets):
  58. interval_ids = set(init_ids)
  59. # if the interval contains enough initiator IDs, add it to possible_intervals
  60. if len(interval_ids) >= number_ids:
  61. interval = {"IDs": sorted(interval_ids), "Start": idx_low, "End": idx_high-1}
  62. # reset possible intervals if new maximum of communication is found
  63. if comm_sum > cur_highest_sum:
  64. possible_intervals = [interval]
  65. cur_highest_sum = comm_sum
  66. # append otherwise
  67. elif comm_sum == cur_highest_sum:
  68. possible_intervals.append(interval)
  69. # stop if all packets have been processed
  70. if idx_high >= len(packets):
  71. break
  72. # let idx_low "catch up" so that the current interval time fits into the maximum time period again
  73. while greater_than(cur_int_time, max_int_time):
  74. cur_packet = packets[idx_low]
  75. # if message was no timeout, delete the first appearance of the initiator ID
  76. # of this packet from the initiator list and update comm_sum
  77. if mtypes[int(cur_packet["Type"])] != MessageType.TIMEOUT:
  78. comm_sum -= 1
  79. del init_ids[0]
  80. idx_low += 1
  81. cur_int_time = float(packets[idx_high]["Time"]) - float(packets[idx_low]["Time"])
  82. # consume the new packet at idx_high and process its information
  83. cur_packet = packets[idx_high]
  84. cur_mtype = mtypes[int(cur_packet["Type"])]
  85. # if message is request, add src to initiator list
  86. if MessageType.is_request(cur_mtype):
  87. init_ids.append(cur_packet["Src"])
  88. comm_sum += 1
  89. # if message is response, add dst to initiator list
  90. elif MessageType.is_response(cur_mtype):
  91. init_ids.append(cur_packet["Dst"])
  92. comm_sum += 1
  93. idx_high += 1
  94. return possible_intervals
  95. def det_id_roles_and_msgs(self):
  96. """
  97. Determine the role of every mapped ID. The role can be initiator, responder or both.
  98. On the side also connect corresponding messages together to quickly find out
  99. which reply belongs to which request and vice versa.
  100. :return: a triple as (initiator IDs, responder IDs, messages)
  101. """
  102. mtypes = self.mtypes
  103. # setup initial variables and their values
  104. respnd_ids = set()
  105. # msgs --> the filtered messages, msg_id --> an increasing ID to give every message an artificial primary key
  106. msgs, msg_id = [], 0
  107. # keep track of previous request to find connections
  108. prev_reqs = {}
  109. # used to determine whether a request has been seen yet, so that replies before the first request are skipped and do not throw an error by
  110. # accessing the empty dict prev_reqs (this is not a perfect solution, but it works most of the time)
  111. req_seen = False
  112. local_init_ids = self.local_init_ids
  113. external_init_ids = set()
  114. # process every packet individually
  115. for packet in self.packets:
  116. id_src, id_dst, msg_type, time = packet["Src"], packet["Dst"], int(packet["Type"]), float(packet["Time"])
  117. lineno = packet.get("LineNumber", -1)
  118. # if if either one of the IDs is not mapped, continue
  119. if (id_src not in local_init_ids) and (id_dst not in local_init_ids):
  120. continue
  121. # convert message type number to enum type
  122. msg_type = mtypes[msg_type]
  123. # process a request
  124. if msg_type in {MessageType.SALITY_HELLO, MessageType.SALITY_NL_REQUEST}:
  125. if not self.nat and id_dst in local_init_ids and id_src not in local_init_ids:
  126. external_init_ids.add(id_src)
  127. elif id_src not in local_init_ids:
  128. continue
  129. else:
  130. # process ID's role
  131. respnd_ids.add(id_dst)
  132. # convert the abstract message into a message object to handle it better
  133. msg_str = "{0}-{1}".format(id_src, id_dst)
  134. msg = Message(msg_id, id_src, id_dst, msg_type, time, line_no = lineno)
  135. msgs.append(msg)
  136. prev_reqs[msg_str] = msg_id
  137. msg_id += 1
  138. req_seen = True
  139. # process a reply
  140. elif msg_type in {MessageType.SALITY_HELLO_REPLY, MessageType.SALITY_NL_REPLY} and req_seen:
  141. if not self.nat and id_src in local_init_ids and id_dst not in local_init_ids:
  142. # process ID's role
  143. external_init_ids.add(id_dst)
  144. elif id_dst not in local_init_ids:
  145. continue
  146. else:
  147. # process ID's role
  148. respnd_ids.add(id_src)
  149. # convert the abstract message into a message object to handle it better
  150. msg_str = "{0}-{1}".format(id_dst, id_src)
  151. # find the request message ID for this response and set its reference index
  152. refer_idx = prev_reqs[msg_str]
  153. msgs[refer_idx].refer_msg_id = msg_id
  154. msg = Message(msg_id, id_src, id_dst, msg_type, time, refer_idx, lineno)
  155. msgs.append(msg)
  156. # remove the request to this response from storage
  157. del(prev_reqs[msg_str])
  158. msg_id += 1
  159. elif msg_type == MessageType.TIMEOUT and id_src in local_init_ids and not self.nat:
  160. # convert the abstract message into a message object to handle it better
  161. msg_str = "{0}-{1}".format(id_dst, id_src)
  162. # find the request message ID for this response and set its reference index
  163. refer_idx = prev_reqs.get(msg_str)
  164. if refer_idx is not None:
  165. msgs[refer_idx].refer_msg_id = msg_id
  166. if msgs[refer_idx].type == MessageType.SALITY_NL_REQUEST:
  167. msg = Message(msg_id, id_src, id_dst, MessageType.SALITY_NL_REPLY, time, refer_idx, lineno)
  168. else:
  169. msg = Message(msg_id, id_src, id_dst, MessageType.SALITY_HELLO_REPLY, time, refer_idx, lineno)
  170. msgs.append(msg)
  171. # remove the request to this response from storage
  172. del(prev_reqs[msg_str])
  173. msg_id += 1
  174. # store the retrieved information in this object for later use
  175. self.respnd_ids = sorted(respnd_ids)
  176. self.external_init_ids = sorted(external_init_ids)
  177. self.messages = msgs
  178. # return the retrieved information
  179. return self.local_init_ids, self.external_init_ids, self.respnd_ids, self.messages
  180. def det_ext_and_local_ids(self, prob_rspnd_local: int=0):
  181. """
  182. Map the given IDs to a locality (i.e. local or external} considering the given probabilities.
  183. :param comm_type: the type of communication (i.e. local, external or mixed)
  184. :param prob_rspnd_local: the probabilty that a responder is local
  185. """
  186. external_ids = set()
  187. local_ids = self.local_init_ids.copy()
  188. # set up probabilistic chooser
  189. rspnd_locality = Lea.fromValFreqsDict({"local": prob_rspnd_local*100, "external": (1-prob_rspnd_local)*100})
  190. for id_ in self.external_init_ids:
  191. external_ids.add(id_)
  192. # determine responder localities
  193. for id_ in self.respnd_ids:
  194. if id_ in local_ids or id_ in external_ids:
  195. continue
  196. pos = rspnd_locality.random()
  197. if pos == "local":
  198. local_ids.add(id_)
  199. elif pos == "external":
  200. external_ids.add(id_)
  201. self.local_ids, self.external_ids = local_ids, external_ids
  202. return self.local_ids, self.external_ids