- from lea import Lea
- from Attack.MembersMgmtCommAttack import MessageType
- from Attack.MembersMgmtCommAttack import Message
- # needed because of machine imprecision; e.g. a time difference of 0.1s may be stored as slightly more than 0.1s
- EPS_TOLERANCE = 1e-13  # works for a difference of 0.1, no less
- def greater_than(a: float, b: float):
- """
- A greater-than operator designed to handle slight machine imprecision up to EPS_TOLERANCE.
- :return: True if a is greater than b by more than EPS_TOLERANCE, otherwise False
- """
- return b - a < -EPS_TOLERANCE
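- # Illustrative doctest-style sketch (not part of the original module): plain float comparison
- # flags 0.1 * 3 as greater than 0.3 because of rounding error, while greater_than() treats
- # the two values as equal within EPS_TOLERANCE.
- # >>> 0.1 * 3 > 0.3
- # True
- # >>> greater_than(0.1 * 3, 0.3)
- # False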
- class CommunicationProcessor:
- """
- Class to process parsed input CSV/XML data and retrieve a mapping or other information.
- """
- def __init__(self, packets: list, mtypes: dict, nat: bool):
- """Create a CommunicationProcessor for the given packets, message-type mapping and NAT flag."""
- self.packets = packets
- self.mtypes = mtypes
- self.nat = nat
- def set_mapping(self, packets: list, mapped_ids: dict):
- """
- Set the selected mapping for this communication processor.
- :param packets: all packets contained in the mapped time frame
- :param mapped_ids: the chosen IDs; their set is stored as the local initiator IDs
- """
- self.packets = packets
- self.local_init_ids = set(mapped_ids)
- def find_interval_most_comm(self, number_ids: int, max_int_time: float):
- """
- Finds the time interval(s) with the most communication, dispatching to the NAT or non-NAT variant.
- :return: A list of dicts, each containing the initiator IDs, start index and end index of one interval
- """
- if self.nat:
- return self._find_interval_most_comm_nat(number_ids, max_int_time)
- else:
- return self._find_interval_most_comm_nonat(number_ids, max_int_time)
- def _find_interval_most_comm_nonat(self, number_ids: int, max_int_time: float):
- """
- Finds the time interval(s) of at most max_int_time seconds with the most overall communication
- (i.e. requests and responses) that contain at least number_ids communication initiators.
- :param number_ids: The number of initiator IDs that have to exist in the interval(s)
- :param max_int_time: The maximum time period of the interval
- :return: A list of dicts, where each dict contains the initiator IDs ("IDs"), the start index ("Start")
- and the end index ("End") of the respective interval. The indices are with respect to self.packets
- """
- # setup initial variables
- packets = self.packets
- mtypes = self.mtypes
- idx_low, idx_high = 0, 0 # the indices spanning the interval
- comm_sum = 0 # the communication sum of the current interval
- cur_highest_sum = 0 # the highest communication sum seen so far
- ids = [] # the initiator IDs seen in the current interval in order of appearance
- possible_intervals = [] # all intervals that have cur_highest_sum of communication and contain enough IDs
- # Iterate over all packets from start to finish and process the info of each packet.
- # Similar to a Sliding Window approach.
- while True:
- if idx_high < len(packets):
- cur_int_time = float(packets[idx_high]["Time"]) - float(packets[idx_low]["Time"])
-
- # if all packets have been consumed or the current interval time exceeds the maximum time period,
- # process the information of the current interval
- if idx_high >= len(packets) or greater_than(cur_int_time, max_int_time):
- interval_ids = set(ids)
- # if the interval contains enough initiator IDs, add it to possible_intervals
- if len(interval_ids) >= number_ids:
- interval = {"IDs": sorted(interval_ids), "Start": idx_low, "End": idx_high-1}
- # reset possible intervals if new maximum of communication is found
- if comm_sum > cur_highest_sum:
- possible_intervals = [interval]
- cur_highest_sum = comm_sum
- # append otherwise
- elif comm_sum == cur_highest_sum:
- possible_intervals.append(interval)
- # stop if all packets have been processed
- if idx_high >= len(packets):
- break
- # let idx_low "catch up" so that the current interval time fits into the maximum time period again
- while greater_than(cur_int_time, max_int_time):
- cur_packet = packets[idx_low]
- # if the message was not a timeout, remove the earliest initiator ID of the interval
- # from the initiator list and decrement comm_sum accordingly
- if mtypes[int(cur_packet["Type"])] != MessageType.TIMEOUT:
- comm_sum -= 1
- del ids[0]
- idx_low += 1
- cur_int_time = float(packets[idx_high]["Time"]) - float(packets[idx_low]["Time"])
- # consume the new packet at idx_high and process its information
- cur_packet = packets[idx_high]
- cur_mtype = mtypes[int(cur_packet["Type"])]
- # if message is request, add src to initiator list
- if MessageType.is_request(cur_mtype):
- ids.append(cur_packet["Src"])
- comm_sum += 1
- # if message is response, add dst to initiator list
- elif MessageType.is_response(cur_mtype):
- ids.append(cur_packet["Dst"])
- comm_sum += 1
- idx_high += 1
- return possible_intervals
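- # Illustrative sketch of the structure returned above (hypothetical IDs and indices): two
- # intervals tied for the highest communication sum, each listing its initiator IDs and the
- # self.packets indices it spans.
- # [{"IDs": ["10.0.0.1", "10.0.0.2", "10.0.0.3"], "Start": 0, "End": 41},
- #  {"IDs": ["10.0.0.2", "10.0.0.3", "10.0.0.4"], "Start": 17, "End": 63}]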
- def _find_interval_most_comm_nat(self, number_ids: int, max_int_time: float):
- """
- Finds the time interval(s) of at most max_int_time seconds with the most communication (i.e. requests
- and responses) generated by the number_ids most communicative initiator IDs of the interval.
- :param number_ids: The number of initiator IDs that have to exist in the interval(s)
- :param max_int_time: The maximum time period of the interval
- :return: A list of dicts, where each dict contains the initiator IDs ("IDs"), the start index ("Start")
- and the end index ("End") of the respective interval. The indices are with respect to self.packets
- """
- def get_nez_comm_amounts():
- """
- Filters out all comm_amounts that have 0 as value.
- :return: a dict with initiator IDs as keys and their non-zero communication amount as value
- """
- nez_comm_amounts = dict()
- # Iterate over comm_amounts dict and add every entry
- # with non-zero comm_amount to new dict.
- for id_ in comm_amounts:
- amount = comm_amounts[id_]
- if amount > 0:
- nez_comm_amounts[id_] = amount
- return nez_comm_amounts
- def change_comm_amounts(packet: dict, add:bool=True):
- """
- Changes the communication amount, stored in comm_amounts, of the ID initiating the given packet.
- :param packet: the packet to be processed, containing src and dst ID
- :param add: If add is True, 1 is added to the communication amount of the IDs, otherwise 1 is subtracted
- """
- change = 1 if add else -1
- mtype = mtypes[int(packet["Type"])]
- id_src, id_dst = packet["Src"], packet["Dst"]
- # if message is request, src is initiator
- if MessageType.is_request(mtype):
- # if src exists in comm_amounts, apply the change to its amount
- if id_src in comm_amounts:
- comm_amounts[id_src] += change
- # else if op is add, add the ID with comm value 1 to comm_amounts
- elif change > 0:
- comm_amounts[id_src] = 1
- # if message is response, dst is initiator
- elif MessageType.is_response(mtype):
- # if dst exists in comm_amounts, apply the change to its amount
- if id_dst in comm_amounts:
- comm_amounts[id_dst] += change
- # else if op is add, add the ID with comm value 1 to comm_amounts
- elif change > 0:
- comm_amounts[id_dst] = 1
- def get_comm_amount_first_ids():
- """
- Finds the number_ids IDs that communicate the most with respect to nez_comm_amounts
- :return: The picked IDs as a list and their summed message amount as a tuple like (IDs, sum).
- """
- picked_ids = [] # the IDs that have been picked
- summed_comm_amount = 0 # the summed communication amount of all picked IDs
- # sort the comm amounts to easily access the IDs with the most communication
- sorted_comm_amounts = sorted(nez_comm_amounts.items(), key=lambda x: x[1], reverse=True)
- # iterate over the sorted communication amounts
- for id_, amount in sorted_comm_amounts:
- count_picked_ids = len(picked_ids)
- # if enough IDs have been found, stop
- if count_picked_ids >= number_ids:
- break
- # else pick this ID
- picked_ids.append(id_)
- summed_comm_amount += amount
- return picked_ids, summed_comm_amount
- # setup initial variables
- packets = self.packets
- mtypes = self.mtypes
- idx_low, idx_high = 0, 0 # the indices spanning the interval
- cur_highest_sum = 0 # the highest communication sum seen so far
- # a dict containing information about how much each initiator ID has communicated
- comm_amounts = {}  # maps initiator ID -> communication amount
- possible_intervals = [] # all intervals that have cur_highest_sum of communication and contain enough IDs
- # Iterate over all packets from start to finish and process the info of each packet.
- # Similar to a Sliding Window approach.
- while True:
- if idx_high < len(packets):
- cur_int_time = float(packets[idx_high]["Time"]) - float(packets[idx_low]["Time"])
-
- # if all packets have been consumed or the current interval time exceeds the maximum time period,
- # process the information of the current interval
- if idx_high >= len(packets) or greater_than(cur_int_time, max_int_time):
- # filter out all IDs with a zero amount of communication for the current interval
- nez_comm_amounts = get_nez_comm_amounts()
- # if the interval contains enough initiator IDs, add it to possible_intervals
- if len(nez_comm_amounts) >= number_ids:
- # pick the most communicative IDs and store their sum of communication
- picked_ids, comm_sum = get_comm_amount_first_ids()
- interval = {"IDs": picked_ids, "Start": idx_low, "End": idx_high-1}
- # reset possible intervals if new maximum of communication is found
- if comm_sum > cur_highest_sum:
- possible_intervals = [interval]
- cur_highest_sum = comm_sum
- # append otherwise
- elif comm_sum == cur_highest_sum:
- possible_intervals.append(interval)
- # stop if all packets have been processed
- if idx_high >= len(packets):
- break
- # let idx_low "catch up" so that the current interval time fits into the maximum time period again
- while greater_than(cur_int_time, max_int_time):
- # adjust communication amounts to discard the earliest packet of the current interval
- change_comm_amounts(packets[idx_low], add=False)
- idx_low += 1
- cur_int_time = float(packets[idx_high]["Time"]) - float(packets[idx_low]["Time"])
- # consume the new packet at idx_high and process its information
- change_comm_amounts(packets[idx_high])
- idx_high += 1
- return possible_intervals
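- # Illustrative sketch (hypothetical IDs and indices): the NAT variant returns the same structure,
- # but "IDs" holds only the number_ids most communicative initiators of the interval, e.g.
- # [{"IDs": ["10.0.0.7", "10.0.0.9"], "Start": 3, "End": 57}] for number_ids=2.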
- def det_id_roles_and_msgs(self):
- """
- Determine the role of every mapped ID. The role can be initiator, responder or both.
- Additionally, corresponding messages are linked together to quickly find out
- which reply belongs to which request and vice versa.
- :return: a 4-tuple as (local initiator IDs, external initiator IDs, responder IDs, messages)
- """
- mtypes = self.mtypes
- # setup initial variables and their values
- respnd_ids = set()
- # msgs --> the filtered messages, msg_id --> an increasing ID to give every message an artificial primary key
- msgs, msg_id = [], 0
- # keep track of previous request to find connections
- prev_reqs = {}
- local_init_ids = self.local_init_ids
- external_init_ids = set()
- # process every packet individually
- for packet in self.packets:
- id_src, id_dst, msg_type, time = packet["Src"], packet["Dst"], int(packet["Type"]), float(packet["Time"])
- lineno = packet.get("LineNumber", -1)
- # if neither of the two IDs is mapped, skip this packet
- if (id_src not in local_init_ids) and (id_dst not in local_init_ids):
- continue
- # convert message type number to enum type
- msg_type = mtypes[msg_type]
- # process a request
- if msg_type in {MessageType.SALITY_HELLO, MessageType.SALITY_NL_REQUEST}:
- if not self.nat and id_dst in local_init_ids and id_src not in local_init_ids:
- external_init_ids.add(id_src)
- elif id_src not in local_init_ids:
- continue
- else:
- # process ID's role
- respnd_ids.add(id_dst)
- # convert the abstract message into a message object to handle it better
- msg_str = "{0}-{1}".format(id_src, id_dst)
- msg = Message(msg_id, id_src, id_dst, msg_type, time, line_no=lineno)
- msgs.append(msg)
- prev_reqs[msg_str] = msg_id
- msg_id += 1
- # process a reply
- elif msg_type in {MessageType.SALITY_HELLO_REPLY, MessageType.SALITY_NL_REPLY}:
- if not self.nat and id_src in local_init_ids and id_dst not in local_init_ids:
- # process ID's role
- external_init_ids.add(id_dst)
- elif id_dst not in local_init_ids:
- continue
- else:
- # process ID's role
- respnd_ids.add(id_src)
- # convert the abstract message into a message object to handle it better
- msg_str = "{0}-{1}".format(id_dst, id_src)
- # find the request message ID for this response and set its reference index
- refer_idx = prev_reqs[msg_str]
- msgs[refer_idx].refer_msg_id = msg_id
- msg = Message(msg_id, id_src, id_dst, msg_type, time, refer_idx, lineno)
- msgs.append(msg)
- # remove the request to this response from storage
- del prev_reqs[msg_str]
- msg_id += 1
- elif msg_type == MessageType.TIMEOUT and id_src in local_init_ids and not self.nat:
- # convert the abstract message into a message object to handle it better
- msg_str = "{0}-{1}".format(id_dst, id_src)
- # find the request message ID for this response and set its reference index
- refer_idx = prev_reqs.get(msg_str)
- if refer_idx is not None:
- msgs[refer_idx].refer_msg_id = msg_id
- if msgs[refer_idx].type == MessageType.SALITY_NL_REQUEST:
- msg = Message(msg_id, id_src, id_dst, MessageType.SALITY_NL_REPLY, time, refer_idx, lineno)
- else:
- msg = Message(msg_id, id_src, id_dst, MessageType.SALITY_HELLO_REPLY, time, refer_idx, lineno)
- msgs.append(msg)
- # remove the request to this response from storage
- del prev_reqs[msg_str]
- msg_id += 1
- # store the retrieved information in this object for later use
- self.respnd_ids = sorted(respnd_ids)
- self.external_init_ids = sorted(external_init_ids)
- self.messages = msgs
- # return the retrieved information
- return self.local_init_ids, self.external_init_ids, self.respnd_ids, self.messages
- def det_ext_and_local_ids(self, prob_rspnd_local: float):
- """
- Map the given IDs to a locality (i.e. local or external) considering the given probability.
- :param prob_rspnd_local: the probability that a responder is local
- :return: a tuple as (local IDs, external IDs)
- """
- external_ids = set()
- local_ids = self.local_init_ids.copy()
-
- # set up probabilistic chooser
- rspnd_locality = Lea.fromValFreqsDict({"local": prob_rspnd_local*100, "external": (1-prob_rspnd_local)*100})
- # external initiator IDs are external by definition
- for id_ in self.external_init_ids:
- external_ids.add(id_)
- # determine responder localities
- for id_ in self.respnd_ids:
- if id_ in local_ids or id_ in external_ids:
- continue
-
- pos = rspnd_locality.random()
- if pos == "local":
- local_ids.add(id_)
- elif pos == "external":
- external_ids.add(id_)
- self.local_ids, self.external_ids = local_ids, external_ids
- return self.local_ids, self.external_ids
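- # Minimal usage sketch (assumptions: csv_packets is a parsed packet list with "Time", "Type",
- # "Src" and "Dst" fields and mtypes maps type numbers to MessageType values; both names are
- # hypothetical and not part of this module):
- #
- # proc = CommunicationProcessor(csv_packets, mtypes, nat=False)
- # intervals = proc.find_interval_most_comm(number_ids=3, max_int_time=10.0)
- # start, end = intervals[0]["Start"], intervals[0]["End"]
- # proc.set_mapping(csv_packets[start:end + 1], intervals[0]["IDs"])
- # local_init_ids, external_init_ids, respnd_ids, messages = proc.det_id_roles_and_msgs()
- # local_ids, external_ids = proc.det_ext_and_local_ids(prob_rspnd_local=0.5)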