from lea import Lea from Attack.MembersMgmtCommAttack import MessageType from Attack.MembersMgmtCommAttack import Message # needed because of machine inprecision. E.g A time difference of 0.1s is stored as >0.1s EPS_TOLERANCE = 1e-13 # works for a difference of 0.1, no less def greater_than(a: float, b: float): """ A greater than operator desgined to handle slight machine inprecision up to EPS_TOLERANCE. :return: True if a > b, otherwise False """ return b - a < -EPS_TOLERANCE class CommunicationProcessor(): """ Class to process parsed input CSV/XML data and retrieve a mapping or other information. """ def __init__(self, packets:list, mtypes:dict, nat:bool): self.packets = packets self.mtypes = mtypes self.nat = nat def set_mapping(self, packets: list, mapped_ids: dict): """ Set the selected mapping for this communication processor. :param packets: all packets contained in the mapped time frame :param mapped_ids: the chosen IDs """ self.packets = packets self.local_init_ids = set(mapped_ids) def find_interval_most_comm(self, number_ids: int, max_int_time: float): if self.nat: return self._find_interval_most_comm_nat(number_ids, max_int_time) else: return self._find_interval_most_comm_nonat(number_ids, max_int_time) def _find_interval_most_comm_nonat(self, number_ids: int, max_int_time: float): """ Finds the time interval(s) of the given seconds with the most overall communication (i.e. requests and responses) that has at least number_ids communication initiators in it. :param number_ids: The number of initiator IDs that have to exist in the interval(s) :param max_int_time: The maximum time period of the interval :return: A list of triples, where each triple contains the initiator IDs, the start index and end index of the respective interval in that order. The indices are with respect to self.packets """ # setup initial variables packets = self.packets mtypes = self.mtypes idx_low, idx_high = 0, 0 # the indices spanning the interval comm_sum = 0 # the communication sum of the current interval cur_highest_sum = 0 # the highest communication sum seen so far ids = [] # the initiator IDs seen in the current interval in order of appearance possible_intervals = [] # all intervals that have cur_highest_sum of communication and contain enough IDs # Iterate over all packets from start to finish and process the info of each packet. # Similar to a Sliding Window approach. while True: if idx_high < len(packets): cur_int_time = float(packets[idx_high]["Time"]) - float(packets[idx_low]["Time"]) # if current interval time exceeds maximum time period, process information of the current interval if greater_than(cur_int_time, max_int_time) or idx_high >= len(packets): interval_ids = set(ids) # if the interval contains enough initiator IDs, add it to possible_intervals if len(interval_ids) >= number_ids: interval = {"IDs": sorted(interval_ids), "Start": idx_low, "End": idx_high-1} # reset possible intervals if new maximum of communication is found if comm_sum > cur_highest_sum: possible_intervals = [interval] cur_highest_sum = comm_sum # append otherwise elif comm_sum == cur_highest_sum: possible_intervals.append(interval) # stop if all packets have been processed if idx_high >= len(packets): break # let idx_low "catch up" so that the current interval time fits into the maximum time period again while greater_than(cur_int_time, max_int_time): cur_packet = packets[idx_low] # if message was no timeout, delete the first appearance of the initiator ID # of this packet from the initiator list and update comm_sum if mtypes[int(cur_packet["Type"])] != MessageType.TIMEOUT: comm_sum -= 1 del ids[0] idx_low += 1 cur_int_time = float(packets[idx_high]["Time"]) - float(packets[idx_low]["Time"]) # consume the new packet at idx_high and process its information cur_packet = packets[idx_high] cur_mtype = mtypes[int(cur_packet["Type"])] # if message is request, add src to initiator list if MessageType.is_request(cur_mtype): ids.append(cur_packet["Src"]) comm_sum += 1 # if message is response, add dst to initiator list elif MessageType.is_response(cur_mtype): ids.append(cur_packet["Dst"]) comm_sum += 1 idx_high += 1 return possible_intervals def _find_interval_most_comm_nat(self, number_ids: int, max_int_time: float): """ Finds the time interval(s) of the given seconds with the most communication (i.e. requests and responses) by the most number_ids communicative initiator IDs of the interval. :param number_ids: The number of initiator IDs that have to exist in the interval(s) :param max_int_time: The maximum time period of the interval :return: A list of triples, where each triple contains the initiator IDs, the start index and the end index of the respective interval in that order. The indices are with respect to self.packets """ def get_nez_comm_amounts(): """ Filters out all comm_amounts that have 0 as value. :return: a dict with initiator IDs as keys and their non-zero communication amount as value """ nez_comm_amounts = dict() # Iterate over comm_amounts dict and add every entry # with non-zero comm_amount to new dict. for id_ in comm_amounts: amount = comm_amounts[id_] if amount > 0: nez_comm_amounts[id_] = amount return nez_comm_amounts def change_comm_amounts(packet: dict, add:bool=True): """ Changes the communication amount, stored in comm_amounts, of the initiating ID with respect to the packet specified by the given index. :param packet: the packet to be processed, containing src and dst ID :param add: If add is True, 1 is added to the communication amount of the IDs, otherwise 1 is subtracted """ change = 1 if add else -1 mtype = mtypes[int(packet["Type"])] id_src, id_dst = packet["Src"], packet["Dst"] # if message is request, src is initiator if MessageType.is_request(mtype): # if src exists in comm_amounts, add 1 to its amount if id_src in comm_amounts: comm_amounts[id_src] += change # else if op is add, add the ID with comm value 1 to comm_amounts elif change > 0: comm_amounts[id_src] = 1 # if message is response, dst is initiator elif MessageType.is_response(mtype): # if src exists in comm_amounts, add 1 to its amount if id_dst in comm_amounts: comm_amounts[id_dst] += change # else if op is add, add the ID with comm value 1 to comm_amounts elif change > 0: comm_amounts[id_dst] = 1 def get_comm_amount_first_ids(): """ Finds the number_ids IDs that communicate the most with respect to nez_comm_amounts :return: The picked IDs as a list and their summed message amount as a tuple like (IDs, sum). """ picked_ids = [] # the IDs that have been picked summed_comm_amount = 0 # the summed communication amount of all picked IDs # sort the comm amounts to easily access the IDs with the most communication sorted_comm_amounts = sorted(nez_comm_amounts.items(), key=lambda x: x[1], reverse=True) # iterate over the sorted communication amounts for id_, amount in sorted_comm_amounts: count_picked_ids = len(picked_ids) # if enough IDs have been found, stop if count_picked_ids >= number_ids: break # else pick this ID picked_ids.append(id_) summed_comm_amount += amount return picked_ids, summed_comm_amount # setup initial variables packets = self.packets mtypes = self.mtypes idx_low, idx_high = 0, 0 # the indices spanning the interval cur_highest_sum = 0 # the highest communication sum seen so far # a dict containing information about what initiator ID has communicated how much comm_amounts = {} # entry is a tuple of (ID, amount) possible_intervals = [] # all intervals that have cur_highest_sum of communication and contain enough IDs # Iterate over all packets from start to finish and process the info of each packet. # Similar to a Sliding Window approach. while True: if idx_high < len(packets): cur_int_time = float(packets[idx_high]["Time"]) - float(packets[idx_low]["Time"]) # if current interval time exceeds maximum time period, process information of the current interval if greater_than(cur_int_time, max_int_time) or idx_high >= len(packets): # filter out all IDs with a zero amount of communication for the current interval nez_comm_amounts = get_nez_comm_amounts() # if the interval contains enough initiator IDs, add it to possible_intervals if len(nez_comm_amounts) >= number_ids: # pick the most communicative IDs and store their sum of communication picked_ids, comm_sum = get_comm_amount_first_ids() interval = {"IDs": picked_ids, "Start": idx_low, "End": idx_high-1} # reset possible intervals if new maximum of communication is found if comm_sum > cur_highest_sum: possible_intervals = [interval] cur_highest_sum = comm_sum # append otherwise elif comm_sum == cur_highest_sum: possible_intervals.append(interval) # stop if all packets have been processed if idx_high >= len(packets): break # let idx_low "catch up" so that the current interval time fits into the maximum time period again while greater_than(cur_int_time, max_int_time): # adjust communication amounts to discard the earliest packet of the current interval change_comm_amounts(packets[idx_low], add=False) idx_low += 1 cur_int_time = float(packets[idx_high]["Time"]) - float(packets[idx_low]["Time"]) # consume the new packet at idx_high and process its information change_comm_amounts(packets[idx_high]) idx_high += 1 return possible_intervals def det_id_roles_and_msgs(self): """ Determine the role of every mapped ID. The role can be initiator, responder or both. On the side also connect corresponding messages together to quickly find out which reply belongs to which request and vice versa. :return: a triple as (initiator IDs, responder IDs, messages) """ mtypes = self.mtypes # setup initial variables and their values respnd_ids = set() # msgs --> the filtered messages, msg_id --> an increasing ID to give every message an artificial primary key msgs, msg_id = [], 0 # keep track of previous request to find connections prev_reqs = {} local_init_ids = self.local_init_ids external_init_ids = set() # process every packet individually for packet in self.packets: id_src, id_dst, msg_type, time = packet["Src"], packet["Dst"], int(packet["Type"]), float(packet["Time"]) lineno = packet.get("LineNumber", -1) # if if either one of the IDs is not mapped, continue if (id_src not in local_init_ids) and (id_dst not in local_init_ids): continue # convert message type number to enum type msg_type = mtypes[msg_type] # process a request if msg_type in {MessageType.SALITY_HELLO, MessageType.SALITY_NL_REQUEST}: if not self.nat and id_dst in local_init_ids and id_src not in local_init_ids: external_init_ids.add(id_src) elif id_src not in local_init_ids: continue else: # process ID's role respnd_ids.add(id_dst) # convert the abstract message into a message object to handle it better msg_str = "{0}-{1}".format(id_src, id_dst) msg = Message(msg_id, id_src, id_dst, msg_type, time, line_no = lineno) msgs.append(msg) prev_reqs[msg_str] = msg_id msg_id += 1 # process a reply elif msg_type in {MessageType.SALITY_HELLO_REPLY, MessageType.SALITY_NL_REPLY}: if not self.nat and id_src in local_init_ids and id_dst not in local_init_ids: # process ID's role external_init_ids.add(id_dst) elif id_dst not in local_init_ids: continue else: # process ID's role respnd_ids.add(id_src) # convert the abstract message into a message object to handle it better msg_str = "{0}-{1}".format(id_dst, id_src) # find the request message ID for this response and set its reference index refer_idx = prev_reqs[msg_str] msgs[refer_idx].refer_msg_id = msg_id msg = Message(msg_id, id_src, id_dst, msg_type, time, refer_idx, lineno) msgs.append(msg) # remove the request to this response from storage del(prev_reqs[msg_str]) msg_id += 1 elif msg_type == MessageType.TIMEOUT and id_src in local_init_ids and not self.nat: # convert the abstract message into a message object to handle it better msg_str = "{0}-{1}".format(id_dst, id_src) # find the request message ID for this response and set its reference index refer_idx = prev_reqs.get(msg_str) if refer_idx is not None: msgs[refer_idx].refer_msg_id = msg_id if msgs[refer_idx].type == MessageType.SALITY_NL_REQUEST: msg = Message(msg_id, id_src, id_dst, MessageType.SALITY_NL_REPLY, time, refer_idx, lineno) else: msg = Message(msg_id, id_src, id_dst, MessageType.SALITY_HELLO_REPLY, time, refer_idx, lineno) msgs.append(msg) # remove the request to this response from storage del(prev_reqs[msg_str]) msg_id += 1 # store the retrieved information in this object for later use self.respnd_ids = sorted(respnd_ids) self.external_init_ids = sorted(external_init_ids) self.messages = msgs # return the retrieved information return self.local_init_ids, self.external_init_ids, self.respnd_ids, self.messages def det_ext_and_local_ids(self, prob_rspnd_local: int): """ Map the given IDs to a locality (i.e. local or external} considering the given probabilities. :param comm_type: the type of communication (i.e. local, external or mixed) :param prob_rspnd_local: the probabilty that a responder is local """ external_ids = set() local_ids = self.local_init_ids.copy() # set up probabilistic chooser rspnd_locality = Lea.fromValFreqsDict({"local": prob_rspnd_local*100, "external": (1-prob_rspnd_local)*100}) for id_ in self.external_init_ids: external_ids.add(id_) # determine responder localities for id_ in self.respnd_ids: if id_ in local_ids or id_ in external_ids: continue pos = rspnd_locality.random() if pos == "local": local_ids.add(id_) elif pos == "external": external_ids.add(id_) self.local_ids, self.external_ids = local_ids, external_ids return self.local_ids, self.external_ids