CommunicationProcessor.py 10.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217
  1. from lea import Lea
  2. from random import randrange
  3. from Attack.MembersMgmtCommAttack import MessageType
  4. from Attack.MembersMgmtCommAttack import Message
  5. # needed because of machine inprecision. E.g A time difference of 0.1s is stored as >0.1s
  6. EPS_TOLERANCE = 1e-13 # works for a difference of 0.1, no less
  7. def greater_than(a: float, b: float):
  8. """
  9. A greater than operator desgined to handle slight machine inprecision up to EPS_TOLERANCE.
  10. :return: True if a > b, otherwise False
  11. """
  12. return b - a < -EPS_TOLERANCE
  13. class CommunicationProcessor():
  14. """
  15. Class to process parsed input CSV/XML data and retrieve a mapping or other information.
  16. """
  17. def __init__(self, mtypes:dict, nat:bool):
  18. """
  19. Creates an instance of CommunicationProcessor.
  20. :param packets: the list of abstract packets
  21. :param mtypes: a dict containing an int to EnumType mapping of MessageTypes
  22. :param nat: whether NAT is present in this network
  23. """
  24. self.packets = []
  25. self.mtypes = mtypes
  26. self.nat = nat
  27. def set_mapping(self, packets: list, mapped_ids: dict):
  28. """
  29. Set the selected mapping for this communication processor.
  30. :param packets: all packets contained in the mapped time frame
  31. :param mapped_ids: the chosen IDs
  32. """
  33. self.packets = packets
  34. self.local_init_ids = set(mapped_ids)
  35. def get_comm_interval(self, cpp_comm_proc, strategy: str, number_ids: int, max_int_time: int, start_idx: int, end_idx: int):
  36. """
  37. Finds a communication interval with respect to the given strategy. The interval is maximum of the given seconds
  38. and has at least number_ids communicating initiators in it.
  39. :param cpp_comm_proc: An instance of the C++ communication processor that stores all the input messages and
  40. is responsible for retrieving the interval(s)
  41. :param strategy: The selection strategy (i.e. random, optimal, custom)
  42. :param number_ids: The number of initiator IDs that have to exist in the interval(s)
  43. :param max_int_time: The maximum time period of the interval
  44. :param start_idx: The message index the interval should start at (None if not specified)
  45. :param end_idx: The message index the interval should stop at (inclusive) (None if not specified)
  46. :return: A dict representing the communication interval. It contains the initiator IDs,
  47. the start index and end index of the respective interval. The respective keys
  48. are {IDs, Start, End}. If no interval is found, an empty dict is returned.
  49. """
  50. if strategy == "random":
  51. # try finding not-empty interval 5 times
  52. for i in range(5):
  53. start_idx = randrange(0, cpp_comm_proc.get_message_count())
  54. interval = cpp_comm_proc.find_interval_from_startidx(start_idx, number_ids, max_int_time)
  55. if interval and interval["IDs"]:
  56. return interval
  57. return {}
  58. elif strategy == "optimal":
  59. intervals = cpp_comm_proc.find_optimal_interval(number_ids, max_int_time)
  60. if not intervals:
  61. return {}
  62. else:
  63. for i in range(5):
  64. interval = intervals[randrange(0, len(intervals))]
  65. if interval and interval["IDs"]:
  66. return interval
  67. return {}
  68. elif strategy == "custom":
  69. if start_idx is None:
  70. print("Custom strategy was selected, but no (valid) start index was specified.")
  71. print("Because of this, a random interval is selected.")
  72. start_idx = randrange(0, cpp_comm_proc.get_message_count())
  73. elif end_idx is not None:
  74. ids = cpp_comm_proc.get_interval_init_ids(start_idx, end_idx)
  75. if not ids:
  76. return {}
  77. return {"IDs": ids, "Start": start_idx, "End": end_idx}
  78. interval = cpp_comm_proc.find_interval_from_startidx(start_idx, number_ids, max_int_time)
  79. if not interval or not interval["IDs"]:
  80. return {}
  81. return interval
  82. def det_id_roles_and_msgs(self):
  83. """
  84. Determine the role of every mapped ID. The role can be initiator, responder or both.
  85. On the side also connect corresponding messages together to quickly find out
  86. which reply belongs to which request and vice versa.
  87. :return: the selected messages
  88. """
  89. mtypes = self.mtypes
  90. # setup initial variables and their values
  91. respnd_ids = set()
  92. # msgs --> the filtered messages, msg_id --> an increasing ID to give every message an artificial primary key
  93. msgs, msg_id = [], 0
  94. # keep track of previous request to find connections
  95. prev_reqs = {}
  96. # used to determine whether a request has been seen yet, so that replies before the first request are skipped and do not throw an error by
  97. # accessing the empty dict prev_reqs (this is not a perfect solution, but it works most of the time)
  98. req_seen = False
  99. local_init_ids = self.local_init_ids
  100. external_init_ids = set()
  101. # process every packet individually
  102. for packet in self.packets:
  103. id_src, id_dst, msg_type, time = packet["Src"], packet["Dst"], int(packet["Type"]), float(packet["Time"])
  104. lineno = packet.get("LineNumber", -1)
  105. # if if either one of the IDs is not mapped, continue
  106. if (id_src not in local_init_ids) and (id_dst not in local_init_ids):
  107. continue
  108. # convert message type number to enum type
  109. msg_type = mtypes[msg_type]
  110. # process a request
  111. if msg_type in {MessageType.SALITY_HELLO, MessageType.SALITY_NL_REQUEST}:
  112. if not self.nat and id_dst in local_init_ids and id_src not in local_init_ids:
  113. external_init_ids.add(id_src)
  114. elif id_src not in local_init_ids:
  115. continue
  116. else:
  117. # process ID's role
  118. respnd_ids.add(id_dst)
  119. # convert the abstract message into a message object to handle it better
  120. msg_str = "{0}-{1}".format(id_src, id_dst)
  121. msg = Message(msg_id, id_src, id_dst, msg_type, time, line_no = lineno)
  122. msgs.append(msg)
  123. prev_reqs[msg_str] = msg_id
  124. msg_id += 1
  125. req_seen = True
  126. # process a reply
  127. elif msg_type in {MessageType.SALITY_HELLO_REPLY, MessageType.SALITY_NL_REPLY} and req_seen:
  128. if not self.nat and id_src in local_init_ids and id_dst not in local_init_ids:
  129. # process ID's role
  130. external_init_ids.add(id_dst)
  131. elif id_dst not in local_init_ids:
  132. continue
  133. else:
  134. # process ID's role
  135. respnd_ids.add(id_src)
  136. # convert the abstract message into a message object to handle it better
  137. msg_str = "{0}-{1}".format(id_dst, id_src)
  138. # find the request message ID for this response and set its reference index
  139. refer_idx = prev_reqs[msg_str]
  140. msgs[refer_idx].refer_msg_id = msg_id
  141. msg = Message(msg_id, id_src, id_dst, msg_type, time, refer_idx, lineno)
  142. msgs.append(msg)
  143. # remove the request to this response from storage
  144. del(prev_reqs[msg_str])
  145. msg_id += 1
  146. elif msg_type == MessageType.TIMEOUT and id_src in local_init_ids and not self.nat:
  147. # convert the abstract message into a message object to handle it better
  148. msg_str = "{0}-{1}".format(id_dst, id_src)
  149. # find the request message ID for this response and set its reference index
  150. refer_idx = prev_reqs.get(msg_str)
  151. if refer_idx is not None:
  152. msgs[refer_idx].refer_msg_id = msg_id
  153. if msgs[refer_idx].type == MessageType.SALITY_NL_REQUEST:
  154. msg = Message(msg_id, id_src, id_dst, MessageType.SALITY_NL_REPLY, time, refer_idx, lineno)
  155. else:
  156. msg = Message(msg_id, id_src, id_dst, MessageType.SALITY_HELLO_REPLY, time, refer_idx, lineno)
  157. msgs.append(msg)
  158. # remove the request to this response from storage
  159. del(prev_reqs[msg_str])
  160. msg_id += 1
  161. # store the retrieved information in this object for later use
  162. self.respnd_ids = sorted(respnd_ids)
  163. self.external_init_ids = sorted(external_init_ids)
  164. self.messages = msgs
  165. # return the selected messages
  166. return self.messages
  167. def det_ext_and_local_ids(self, prob_rspnd_local: int=0):
  168. """
  169. Map the given IDs to a locality (i.e. local or external} considering the given probabilities.
  170. :param comm_type: the type of communication (i.e. local, external or mixed)
  171. :param prob_rspnd_local: the probabilty that a responder is local
  172. """
  173. external_ids = set()
  174. local_ids = self.local_init_ids.copy()
  175. # set up probabilistic chooser
  176. rspnd_locality = Lea.fromValFreqsDict({"local": prob_rspnd_local*100, "external": (1-prob_rspnd_local)*100})
  177. for id_ in self.external_init_ids:
  178. external_ids.add(id_)
  179. # determine responder localities
  180. for id_ in self.respnd_ids:
  181. if id_ in local_ids or id_ in external_ids:
  182. continue
  183. pos = rspnd_locality.random()
  184. if pos == "local":
  185. local_ids.add(id_)
  186. elif pos == "external":
  187. external_ids.add(id_)
  188. self.local_ids, self.external_ids = local_ids, external_ids
  189. return self.local_ids, self.external_ids