@@ -10,21 +10,21 @@ class CommunicationProcessor():
     Class to process parsed input CSV/XML data and retrieve a mapping or other information.
     """
 
-    def __init__(self, packets):
+    def __init__(self, packets: list, mtypes: dict):
        self.packets = packets
+        self.mtypes = mtypes
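+        # note: mtypes is assumed to map numeric packet types to MessageType
+        # enum members, e.g. {5: MessageType.SALITY_NL_REQUEST} (illustrative)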
 
-    def set_mapping(self, packets: list, mapped_ids: dict, id_comms:dict):
+
+    def set_mapping(self, packets: list, mapped_ids: dict):
        """
        Set the selected mapping for this communication processor.
 
        :param packets: all packets contained in the mapped time frame
        :param mapped_ids: the chosen IDs
-        :param id_comms: the communications between the mapped IDs within the mapped interval
        """
        self.packets = packets
-        self.ids = mapped_ids.keys()
-        self.id_comms = id_comms
-        self.indv_id_counts = mapped_ids
+        self.init_ids = set(mapped_ids.keys())
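+        # the mapping's keys are the IDs chosen as initiators; a set makes the
+        # frequent membership tests in det_id_roles_and_msgs cheap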
+
 
    def find_interval_with_most_comm(self, number_ids: int, max_int_time: float):
        """
@@ -36,17 +36,18 @@ class CommunicationProcessor():
        :return: A triple consisting of the IDs, as well as start and end idx with respect to the given packets.
        """
        packets = self.packets
+        mtypes = self.mtypes
 
-        def get_nez_msg_counts(msg_counts: dict):
+        def get_nez_comm_counts(comm_counts: dict):
            """
-            Filters out all msg_counts that have 0 as value
+            Filters out all comm_counts that have 0 as value
            """
-            nez_msg_counts = dict()
-            for msg in msg_counts.keys():
-                count = msg_counts[msg]
+            nez_comm_counts = dict()
+            for id_ in comm_counts.keys():
+                count = comm_counts[id_]
                if count > 0:
-                    nez_msg_counts[msg] = count
-            return nez_msg_counts
+                    nez_comm_counts[id_] = count
+            return nez_comm_counts
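+            # illustrative (hypothetical IDs):
+            #   get_nez_comm_counts({"A": 3, "B": 0}) --> {"A": 3}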
 
        def greater_than(a: float, b: float):
            """
@@ -55,102 +56,52 @@ class CommunicationProcessor():
            """
            return b - a < -EPS_TOLERANCE
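+            # illustrative: assuming EPS_TOLERANCE is around 1e-9 (its value is
+            # defined elsewhere), greater_than(0.1 + 0.2, 0.3) is False because
+            # the ~4e-17 float error stays below the tolerance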
 
-        def change_msg_counts(msg_counts: dict, idx: int, add=True):
+        def change_comm_counts(comm_counts: dict, idx: int, add=True):
            """
-            Changes the value of the message count of the message occuring in the packet specified by the given index.
-            Adds 1 if add is True and subtracts 1 otherwise.
+            Changes the communication count, stored in comm_counts, of the initiating ID with respect to the
+            packet specified by the given index. If add is True, 1 is added to the value, otherwise 1 is subtracted.
            """
            change = 1 if add else -1
+            mtype = mtypes[int(packets[idx]["Type"])]
            id_src, id_dst = packets[idx]["Src"], packets[idx]["Dst"]
-            src_to_dst = "{0}-{1}".format(id_src, id_dst)
-            dst_to_src = "{0}-{1}".format(id_dst, id_src)
-
-            if src_to_dst in msg_counts.keys():
-                msg_counts[src_to_dst] += change
-            elif dst_to_src in msg_counts.keys():
-                msg_counts[dst_to_src] += change
-            elif add:
-                msg_counts[src_to_dst] = 1
-
-        def count_ids_in_msg_counts(msg_counts: dict):
-            """
-            Counts all ids that are involved in messages with a non zero message count
-            """
-            ids = set()
-            for msg in msg_counts.keys():
-                src, dst = msg.split("-")
-                ids.add(dst)
-                ids.add(src)
-            return len(ids)
-
-        def get_msg_count_first_ids(msg_counts: list):
+            if mtype in {MessageType.SALITY_HELLO, MessageType.SALITY_NL_REQUEST}:
+                if id_src in comm_counts:
+                    comm_counts[id_src] += change
+                elif change > 0:
+                    comm_counts[id_src] = 1
+            elif mtype in {MessageType.SALITY_HELLO_REPLY, MessageType.SALITY_NL_REPLY}:
+                if id_dst in comm_counts:
+                    comm_counts[id_dst] += change
+                elif change > 0:
+                    comm_counts[id_dst] = 1
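+            # note: requests are attributed to their source ID and replies to
+            # their destination ID, so each count tracks the exchange's initiator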
+
+        def get_comm_count_first_ids(comm_counts: list):
            """
-            Finds the IDs that communicate among themselves the most with respect to the given message counts.
-            :param msg_counts: a sorted list of message counts where each entry is a tuple of key and value
-            :return: The picked IDs and their total message count as a tuple
+            Finds the IDs that initiate communication the most with respect to the given communication counts.
+            :param comm_counts: a sorted list of communication counts where each entry is a tuple of key and value
+            :return: The picked IDs and their total communication count as a tuple
            """
            # if order of most messages is important, use an additional list
-            picked_ids = set()
-            total_msg_count = 0
+            picked_ids = {}
+            total_comm_count = 0
 
            # iterate over every message count
-            for i, msg in enumerate(msg_counts):
+            for i, comm in enumerate(comm_counts):
                count_picked_ids = len(picked_ids)
-                id_one, id_two = msg[0].split("-")
 
                # if enough IDs have been found, stop
                if count_picked_ids >= number_ids:
                    break
 
-                # if two IDs can be added without exceeding the desired number of IDs, add them
-                if count_picked_ids - 2 <= number_ids:
-                    picked_ids.add(id_one)
-                    picked_ids.add(id_two)
-                    total_msg_count += msg[1]
-
-                # if there is only room for one more id to be added,
-                # find one that is already contained in the picked IDs
-                else:
-                    for j, msg in enumerate(msg_counts[i:]):
-                        id_one, id_two = msg[0].split("-")
-                        if id_one in picked_ids:
-                            picked_ids.add(id_two)
-                            total_msg_count += msg[1]
-                            break
-                        elif id_two in picked_ids:
-                            picked_ids.add(id_one)
-                            total_msg_count += msg[1]
-                            break
-                    break
-
-            return picked_ids, total_msg_count
-
-        def get_indv_id_counts_and_comms(picked_ids: dict, msg_counts: dict):
-            """
-            Retrieves the total mentions of one ID in the communication pattern
-            and all communication entries that include only picked IDs.
-            """
-            indv_id_counts = {}
-            id_comms = set()
-            for msg in msg_counts:
-                ids = msg.split("-")
-                if ids[0] in picked_ids and ids[1] in picked_ids:
-                    msg_other_dir = "{}-{}".format(ids[1], ids[0])
-                    if (not msg in id_comms) and (not msg_other_dir in id_comms):
-                        id_comms.add(msg)
+                picked_ids[comm[0]] = comm[1]
+                total_comm_count += comm[1]
 
-                    for id_ in ids:
-                        if id_ in indv_id_counts:
-                            indv_id_counts[id_] += msg_counts[msg]
-                        else:
-                            indv_id_counts[id_] = msg_counts[msg]
+            return picked_ids, total_comm_count
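+            # illustrative (hypothetical IDs): with number_ids = 2 and
+            # comm_counts = [("A", 5), ("B", 3), ("C", 1)], the loop stops
+            # after two picks and returns ({"A": 5, "B": 3}, 8)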
 
-            return indv_id_counts, id_comms
 
-
-        # first find all possible intervals that contain enough IDs that communicate among themselves
+        # first find all possible intervals that contain enough IDs that initiate communication
        idx_low, idx_high = 0, 0
-        msg_counts = dict()
+        comm_counts = dict()
        possible_intervals = []
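+        # idx_low and idx_high form a sliding window over the (assumed
+        # time-sorted) packets: idx_high consumes packets and bumps counts,
+        # and once the window spans more than max_int_time, idx_low catches
+        # up and reverts the counts of the packets it passes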
 
        # Iterate over all packets from start to finish and process the info of each packet
@@ -163,107 +114,71 @@ class CommunicationProcessor():
            # if current interval time exceeds time interval, save the message counts if appropriate, or stop if no more packets
            if greater_than(cur_int_time, max_int_time) or idx_high >= len(packets):
                # get all message counts for communications that took place in the current interval
-                nez_msg_counts = get_nez_msg_counts(msg_counts)
-
+                nez_comm_counts = get_nez_comm_counts(comm_counts)
                # if we have enough IDs as specified by the caller, mark as possible interval
-                if count_ids_in_msg_counts(nez_msg_counts) >= number_ids:
+                if len(nez_comm_counts) >= number_ids:
                    # possible_intervals.append((nez_msg_counts, packets[idx_low]["Time"], packets[idx_high-1]["Time"]))
-                    possible_intervals.append((nez_msg_counts, idx_low, idx_high - 1))
+                    possible_intervals.append((nez_comm_counts, idx_low, idx_high - 1))
 
                if idx_high >= len(packets):
                    break
 
                # let idx_low "catch up" so that the current interval time fits into the interval time specified by the caller
                while greater_than(cur_int_time, max_int_time):
-                    change_msg_counts(msg_counts, idx_low, add=False)
+                    change_comm_counts(comm_counts, idx_low, add=False)
                    idx_low += 1
                    cur_int_time = float(packets[idx_high]["Time"]) - float(packets[idx_low]["Time"])
 
            # consume the new packet at idx_high and process its information
-            change_msg_counts(msg_counts, idx_high)
+            change_comm_counts(comm_counts, idx_high)
            idx_high += 1
 
-
        # now find the interval in which as many IDs as specified communicate the most in the given time interval
        summed_intervals = []
-        sum_intervals_idxs = []
        cur_highest_sum = 0
 
-        # for every interval compute the sum of msg_counts of the first most communicative IDs and eventually find
+        # for every interval compute the sum of comm_counts of the first most communicative IDs and eventually find
        # the interval(s) with most communication and its IDs
        # on the side also store the communication count of the individual IDs
        for j, interval in enumerate(possible_intervals):
-            msg_counts = interval[0].items()
-            sorted_msg_counts = sorted(msg_counts, key=lambda x: x[1], reverse=True)
-            picked_ids, msg_sum = get_msg_count_first_ids(sorted_msg_counts)
-
-            if msg_sum == cur_highest_sum:
-                summed_intervals.append({"IDs": picked_ids, "MsgSum": msg_sum, "Start": interval[1], "End": interval[2]})
-                sum_intervals_idxs.append(j)
-            elif msg_sum > cur_highest_sum:
-                summed_intervals = []
-                sum_intervals_idxs = [j]
-                summed_intervals.append({"IDs": picked_ids, "MsgSum": msg_sum, "Start": interval[1], "End": interval[2]})
-                cur_highest_sum = msg_sum
+            comm_counts = interval[0].items()
+            sorted_comm_counts = sorted(comm_counts, key=lambda x: x[1], reverse=True)
+            picked_ids, comm_sum = get_comm_count_first_ids(sorted_comm_counts)
 
-        for j, interval in enumerate(summed_intervals):
-            idx = sum_intervals_idxs[j]
-            msg_counts_picked = possible_intervals[idx][0]
-            indv_id_counts, id_comms = get_indv_id_counts_and_comms(interval["IDs"], msg_counts_picked)
-            interval["IDs"] = indv_id_counts
-            interval["Comms"] = id_comms
+            if comm_sum == cur_highest_sum:
+                summed_intervals.append({"IDs": picked_ids, "CommSum": comm_sum, "Start": interval[1], "End": interval[2]})
+            elif comm_sum > cur_highest_sum:
+                summed_intervals = []
+                summed_intervals.append({"IDs": picked_ids, "CommSum": comm_sum, "Start": interval[1], "End": interval[2]})
+                cur_highest_sum = comm_sum
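+            # intervals that tie the current best sum accumulate, while a
+            # strictly higher sum clears the list, so all maxima are kept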
 
        return summed_intervals
 
 
-    def det_id_roles_and_msgs(self, mtypes: dict):
+    def det_id_roles_and_msgs(self):
        """
        Determine the role of every mapped ID. The role can be initiator, responder or both.
        On the side also connect corresponding messages together to quickly find out
        which reply belongs to which request and vice versa.
 
-        :param mtypes: a dict for fast number to enum type lookup of message types
-        :return: a 4-tuple as (initiator IDs, responder IDs, both IDs, messages)
+        :return: a triple as (initiator IDs, responder IDs, messages)
        """
 
+        mtypes = self.mtypes
        # setup initial variables and their values
-        init_ids, respnd_ids, both_ids = set(), set(), set()
+        respnd_ids = set()
        # msgs --> the filtered messages, msg_id --> an increasing ID to give every message an artificial primary key
        msgs, msg_id = [], 0
-        # kepp track of previous request to find connections
+        # keep track of previous request to find connections
        prev_reqs = {}
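+        # prev_reqs maps the "src-dst" string of each open request to its
+        # message ID, so an incoming reply can look up and link its request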
-        all_ids = self.ids
-        packets = self.packets
-
-        def process_initiator(id_: str):
-            """
-            Process the given ID as initiator and update the above sets accordingly.
-            """
-            if id_ in both_ids:
-                pass
-            elif not id_ in respnd_ids:
-                init_ids.add(id_)
-            elif id_ in respnd_ids:
-                respnd_ids.remove(id_)
-                both_ids.add(id_)
-
-        def process_responder(id_: str):
-            """
-            Process the given ID as responder and update the above sets accordingly.
-            """
-            if id_ in both_ids:
-                pass
-            elif not id_ in init_ids:
-                respnd_ids.add(id_)
-            elif id_ in init_ids:
-                init_ids.remove(id_)
-                both_ids.add(id_)
+        init_ids = self.init_ids
 
        # process every packet individually
-        for packet in packets:
+        for packet in self.packets:
            id_src, id_dst, msg_type, time = packet["Src"], packet["Dst"], int(packet["Type"]), float(packet["Time"])
-            # if if either one of the IDs is not mapped, continue
-            if (not id_src in all_ids) or (not id_dst in all_ids):
+            # if neither one of the IDs is a mapped initiator, continue
+            if (id_src not in init_ids) and (id_dst not in init_ids):
                continue
 
            # convert message type number to enum type
@@ -271,9 +186,10 @@ class CommunicationProcessor():
 
            # process a request
            if msg_type in {MessageType.SALITY_HELLO, MessageType.SALITY_NL_REQUEST}:
-                # process each ID's role
-                process_initiator(id_src)
-                process_responder(id_dst)
+                if id_src not in init_ids:
+                    continue
+                # process ID's role
+                respnd_ids.add(id_dst)
                # convert the abstract message into a message object to handle it better
                msg_str = "{0}-{1}".format(id_src, id_dst)
                msg = Message(msg_id, id_src, id_dst, msg_type, time)
@@ -282,9 +198,10 @@ class CommunicationProcessor():
 
            # process a reply
            elif msg_type in {MessageType.SALITY_HELLO_REPLY, MessageType.SALITY_NL_REPLY}:
-                # process each ID's role
-                process_initiator(id_dst)
-                process_responder(id_src)
+                if id_dst not in init_ids:
+                    continue
+                # process ID's role
+                respnd_ids.add(id_src)
                # convert the abstract message into a message object to handle it better
                msg_str = "{0}-{1}".format(id_dst, id_src)
                # find the request message ID for this response and set its reference index
@@ -301,113 +218,422 @@ class CommunicationProcessor():
                msg_id += 1
 
        # store the retrieved information in this object for later use
-        self.init_ids, self.respnd_ids, self.both_ids = sorted(init_ids), sorted(respnd_ids), sorted(both_ids)
+        self.respnd_ids = sorted(respnd_ids)
        self.messages = msgs
 
        # return the retrieved information
-        return init_ids, respnd_ids, both_ids, msgs
+        return self.init_ids, self.respnd_ids, msgs
 
 
-    def det_ext_and_local_ids(self, comm_type: str, prob_init_local: int, prob_rspnd_local: int):
+    def det_ext_and_local_ids(self, prob_rspnd_local: int):
        """
-        Map the given IDs to a locality (i.e. local or external} considering the given probabilities.
+        Map the given IDs to a locality (i.e. local or external) considering the given probability.
 
-        :param comm_type: the type of communication (i.e. local, external or mixed)
-        :param prob_init_local: the probabilty that an initiator ID is local
-        :param prob_rspnd_local: the probabilty that a responder is local
+        :param prob_rspnd_local: the probability that a responder is local
        """
-        init_ids, respnd_ids, both_ids = self.init_ids, self.respnd_ids, self.both_ids
-        id_comms = sorted(self.id_comms)
        external_ids = set()
-        local_ids = set()
-        ids = self.ids
-
-        def map_init_is_local(id_:str):
-            """
-            Map the given ID as local and handle its communication partners' locality
-            """
-            # loop over all communication entries
-
-            for id_comm in id_comms:
-                ids = id_comm.split("-")
-                other = ids[0] if id_ == ids[1] else ids[1]
-
-                # if id_comm does not contain the ID to be mapped, continue
-                if not (id_ == ids[0] or id_ == ids[1]):
-                    continue
-
-                # if other is already mapped, continue
-                if other in local_ids or other in external_ids:
-                    continue
-
-                # if comm_type is mixed, other ID can be local or external
-                if comm_type == "mixed":
-                    other_pos = mixed_respnd_is_local.random()
-                    if other_pos == "local":
-                        local_ids.add(other)
-                    elif other_pos == "external":
-                        external_ids.add(other)
-
-                # if comm_type is external, other ID must be external to fulfill type
-                # exlude initiators not to throw away too much communication
-                elif comm_type == "external":
-                    if not other in initiators:
-                        external_ids.add(other)
-
-        def map_init_is_external(id_: int):
-            """
-            Map the given ID as external and handle its communication partners' locality
-            """
-            for id_comm in id_comms:
-                ids = id_comm.split("-")
-                other = ids[0] if id_ == ids[1] else ids[1]
-
-                # if id_comm does not contain the ID to be mapped, continue
-                if not (id_ == ids[0] or id_ == ids[1]):
-                    continue
-
-                # if other is already mapped, continue
-                if other in local_ids or other in external_ids:
-                    continue
-
-                if not other in initiators:
-                    local_ids.add(other)
-
-
-        # if comm_type is local, map all IDs to local
-        if comm_type == "local":
-            local_ids = set(mapped_ids.keys())
-        else:
-            # set up probabilistic chooser
-            init_local_or_external = Lea.fromValFreqsDict({"local": prob_init_local*100, "external": (1-prob_init_local)*100})
-            mixed_respnd_is_local = Lea.fromValFreqsDict({"local": prob_rspnd_local*100, "external": (1-prob_rspnd_local)*100})
-
-            # assign IDs in 'both' local everytime for mixed?
-            # sort initiators by some order, to gain determinism
-            initiators = sorted(list(init_ids) + list(both_ids))
-            # sort by individual communication count to increase final communication count
-            # better to sort by highest count of 'shared' IDs in case of local comm_type?
-            initiators = sorted(initiators, key=lambda id_:self.indv_id_counts[id_], reverse=True)
-
-            for id_ in initiators:
-                pos = init_local_or_external.random()
-                if pos == "local":
-                    # if id_ has already been mapped differently, its communication partners still have to be mapped
-                    if id_ in external_ids:
-                        map_init_is_external(id_)
-                    # otherwise, map as chosen above
-                    else:
-                        local_ids.add(id_)
-                        map_init_is_local(id_)
-                elif pos == "external":
-                    # if id_ has already been mapped differently, its communication partners still have to be mapped
-                    if id_ in local_ids:
-                        map_init_is_local(id_)
-                    # otherwise, map as chosen above
-                    else:
-                        external_ids.add(id_)
-                        map_init_is_external(id_)
+        local_ids = self.init_ids.copy()
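+        # note: with this scheme every initiator is mapped as local; only the
+        # responder IDs are drawn from the locality distribution below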
+
+        # set up probabilistic chooser
+        rspnd_locality = Lea.fromValFreqsDict({"local": prob_rspnd_local*100, "external": (1-prob_rspnd_local)*100})
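+        # Lea.fromValFreqsDict treats the values as relative frequencies, so
+        # e.g. prob_rspnd_local = 0.7 yields "local" with 70:30 odds per .random()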
+
+        # determine responder localities
+        for id_ in self.respnd_ids:
+            if id_ in local_ids or id_ in external_ids:
+                continue
+
+            pos = rspnd_locality.random()
+            if pos == "local":
+                local_ids.add(id_)
+            elif pos == "external":
+                external_ids.add(id_)
 
        self.local_ids, self.external_ids = local_ids, external_ids
-        return local_ids, external_ids
-
+        return self.local_ids, self.external_ids
+
+
+
+    # def find_interval_with_most_comm(self, number_ids: int, max_int_time: float):
+    #     """
+    #     Finds a time interval of the given length in seconds where the given number of IDs communicate among themselves the most.
+
+    #     :param packets: The packets containing the communication
+    #     :param number_ids: The number of IDs that are to be considered
+    #     :param max_int_time: the maximum time in seconds an interval may span
+    #     :return: A triple consisting of the IDs, as well as start and end idx with respect to the given packets.
+    #     """
+    #     packets = self.packets
+
+    #     def get_nez_msg_counts(msg_counts: dict):
+    #         """
+    #         Filters out all msg_counts that have 0 as value
+    #         """
+    #         nez_msg_counts = dict()
+    #         for msg in msg_counts.keys():
+    #             count = msg_counts[msg]
+    #             if count > 0:
+    #                 nez_msg_counts[msg] = count
+    #         return nez_msg_counts
+
+    #     def greater_than(a: float, b: float):
+    #         """
+    #         A greater than operator designed to handle slight machine imprecision up to EPS_TOLERANCE.
+    #         :return: True if a > b, otherwise False
+    #         """
+    #         return b - a < -EPS_TOLERANCE
+
+    #     def change_msg_counts(msg_counts: dict, idx: int, add=True):
+    #         """
+    #         Changes the value of the message count of the message occurring in the packet specified by the given index.
+    #         Adds 1 if add is True and subtracts 1 otherwise.
+    #         """
+    #         change = 1 if add else -1
+    #         id_src, id_dst = packets[idx]["Src"], packets[idx]["Dst"]
+    #         src_to_dst = "{0}-{1}".format(id_src, id_dst)
+    #         dst_to_src = "{0}-{1}".format(id_dst, id_src)
+
+    #         if src_to_dst in msg_counts.keys():
+    #             msg_counts[src_to_dst] += change
+    #         elif dst_to_src in msg_counts.keys():
+    #             msg_counts[dst_to_src] += change
+    #         elif add:
+    #             msg_counts[src_to_dst] = 1
+
+    #     def count_ids_in_msg_counts(msg_counts: dict):
+    #         """
+    #         Counts all ids that are involved in messages with a non zero message count
+    #         """
+    #         ids = set()
+    #         for msg in msg_counts.keys():
+    #             src, dst = msg.split("-")
+    #             ids.add(dst)
+    #             ids.add(src)
+    #         return len(ids)
+
+    #     def get_msg_count_first_ids(msg_counts: list):
+    #         """
+    #         Finds the IDs that communicate among themselves the most with respect to the given message counts.
+    #         :param msg_counts: a sorted list of message counts where each entry is a tuple of key and value
+    #         :return: The picked IDs and their total message count as a tuple
+    #         """
+    #         # if order of most messages is important, use an additional list
+    #         picked_ids = set()
+    #         total_msg_count = 0
+
+    #         # iterate over every message count
+    #         for i, msg in enumerate(msg_counts):
+    #             count_picked_ids = len(picked_ids)
+    #             id_one, id_two = msg[0].split("-")
+
+    #             # if enough IDs have been found, stop
+    #             if count_picked_ids >= number_ids:
+    #                 break
+
+    #             # if two IDs can be added without exceeding the desired number of IDs, add them
+    #             if count_picked_ids - 2 <= number_ids:
+    #                 picked_ids.add(id_one)
+    #                 picked_ids.add(id_two)
+    #                 total_msg_count += msg[1]
+
+    #             # if there is only room for one more id to be added,
+    #             # find one that is already contained in the picked IDs
+    #             else:
+    #                 for j, msg in enumerate(msg_counts[i:]):
+    #                     id_one, id_two = msg[0].split("-")
+    #                     if id_one in picked_ids:
+    #                         picked_ids.add(id_two)
+    #                         total_msg_count += msg[1]
+    #                         break
+    #                     elif id_two in picked_ids:
+    #                         picked_ids.add(id_one)
+    #                         total_msg_count += msg[1]
+    #                         break
+    #                 break
+
+    #         return picked_ids, total_msg_count
+
+    #     def get_indv_id_counts_and_comms(picked_ids: dict, msg_counts: dict):
+    #         """
+    #         Retrieves the total mentions of one ID in the communication pattern
+    #         and all communication entries that include only picked IDs.
+    #         """
+    #         indv_id_counts = {}
+    #         id_comms = set()
+    #         for msg in msg_counts:
+    #             ids = msg.split("-")
+    #             if ids[0] in picked_ids and ids[1] in picked_ids:
+    #                 msg_other_dir = "{}-{}".format(ids[1], ids[0])
+    #                 if (not msg in id_comms) and (not msg_other_dir in id_comms):
+    #                     id_comms.add(msg)
+
+    #                 for id_ in ids:
+    #                     if id_ in indv_id_counts:
+    #                         indv_id_counts[id_] += msg_counts[msg]
+    #                     else:
+    #                         indv_id_counts[id_] = msg_counts[msg]
+
+    #         return indv_id_counts, id_comms
+
+
+    #     # first find all possible intervals that contain enough IDs that communicate among themselves
+    #     idx_low, idx_high = 0, 0
+    #     msg_counts = dict()
+    #     possible_intervals = []
+
+    #     # Iterate over all packets from start to finish and process the info of each packet
+    #     # If time of packet within time interval, update the message count for this communication
+    #     # If time of packet exceeds time interval, subtract from the message count for this communication
+    #     while True:
+    #         if idx_high < len(packets):
+    #             cur_int_time = float(packets[idx_high]["Time"]) - float(packets[idx_low]["Time"])
+
+    #         # if current interval time exceeds time interval, save the message counts if appropriate, or stop if no more packets
+    #         if greater_than(cur_int_time, max_int_time) or idx_high >= len(packets):
+    #             # get all message counts for communications that took place in the current interval
+    #             nez_msg_counts = get_nez_msg_counts(msg_counts)
+
+    #             # if we have enough IDs as specified by the caller, mark as possible interval
+    #             if count_ids_in_msg_counts(nez_msg_counts) >= number_ids:
+    #                 # possible_intervals.append((nez_msg_counts, packets[idx_low]["Time"], packets[idx_high-1]["Time"]))
+    #                 possible_intervals.append((nez_msg_counts, idx_low, idx_high - 1))
+
+    #             if idx_high >= len(packets):
+    #                 break
+
+    #             # let idx_low "catch up" so that the current interval time fits into the interval time specified by the caller
+    #             while greater_than(cur_int_time, max_int_time):
+    #                 change_msg_counts(msg_counts, idx_low, add=False)
+    #                 idx_low += 1
+    #                 cur_int_time = float(packets[idx_high]["Time"]) - float(packets[idx_low]["Time"])
+
+    #         # consume the new packet at idx_high and process its information
+    #         change_msg_counts(msg_counts, idx_high)
+    #         idx_high += 1
+
+
+    #     # now find the interval in which as many IDs as specified communicate the most in the given time interval
+    #     summed_intervals = []
+    #     sum_intervals_idxs = []
+    #     cur_highest_sum = 0
+
+    #     # for every interval compute the sum of msg_counts of the first most communicative IDs and eventually find
+    #     # the interval(s) with most communication and its IDs
+    #     # on the side also store the communication count of the individual IDs
+    #     for j, interval in enumerate(possible_intervals):
+    #         msg_counts = interval[0].items()
+    #         sorted_msg_counts = sorted(msg_counts, key=lambda x: x[1], reverse=True)
+    #         picked_ids, msg_sum = get_msg_count_first_ids(sorted_msg_counts)
+
+    #         if msg_sum == cur_highest_sum:
+    #             summed_intervals.append({"IDs": picked_ids, "MsgSum": msg_sum, "Start": interval[1], "End": interval[2]})
+    #             sum_intervals_idxs.append(j)
+    #         elif msg_sum > cur_highest_sum:
+    #             summed_intervals = []
+    #             sum_intervals_idxs = [j]
+    #             summed_intervals.append({"IDs": picked_ids, "MsgSum": msg_sum, "Start": interval[1], "End": interval[2]})
+    #             cur_highest_sum = msg_sum
+
+    #     for j, interval in enumerate(summed_intervals):
+    #         idx = sum_intervals_idxs[j]
+    #         msg_counts_picked = possible_intervals[idx][0]
+    #         indv_id_counts, id_comms = get_indv_id_counts_and_comms(interval["IDs"], msg_counts_picked)
+    #         interval["IDs"] = indv_id_counts
+    #         interval["Comms"] = id_comms
+
+    #     return summed_intervals
+
+    # def det_id_roles_and_msgs(self):
+    #     """
+    #     Determine the role of every mapped ID. The role can be initiator, responder or both.
+    #     On the side also connect corresponding messages together to quickly find out
+    #     which reply belongs to which request and vice versa.
+
+    #     :return: a 4-tuple as (initiator IDs, responder IDs, both IDs, messages)
+    #     """
+
+    #     mtypes = self.mtypes
+    #     # setup initial variables and their values
+    #     init_ids, respnd_ids, both_ids = set(), set(), set()
+    #     # msgs --> the filtered messages, msg_id --> an increasing ID to give every message an artificial primary key
+    #     msgs, msg_id = [], 0
+    #     # keep track of previous request to find connections
+    #     prev_reqs = {}
+    #     all_init_ids = self.init_ids
+    #     packets = self.packets
+
+    #     def process_initiator(id_: str):
+    #         """
+    #         Process the given ID as initiator and update the above sets accordingly.
+    #         """
+    #         if id_ in both_ids:
+    #             pass
+    #         elif not id_ in respnd_ids:
+    #             init_ids.add(id_)
+    #         elif id_ in respnd_ids:
+    #             respnd_ids.remove(id_)
+    #             both_ids.add(id_)
+
+    #     def process_responder(id_: str):
+    #         """
+    #         Process the given ID as responder and update the above sets accordingly.
+    #         """
+    #         if id_ in both_ids:
+    #             pass
+    #         elif not id_ in init_ids:
+    #             respnd_ids.add(id_)
+    #         elif id_ in init_ids:
+    #             init_ids.remove(id_)
+    #             both_ids.add(id_)
+
+    #     # process every packet individually
+    #     for packet in packets:
+    #         id_src, id_dst, msg_type, time = packet["Src"], packet["Dst"], int(packet["Type"]), float(packet["Time"])
+    #         # if either one of the IDs is not mapped, continue
+    #         if (not id_src in all_init_ids) or (not id_dst in all_init_ids):
+    #             continue
+
+    #         # convert message type number to enum type
+    #         msg_type = mtypes[msg_type]
+
+    #         # process a request
+    #         if msg_type in {MessageType.SALITY_HELLO, MessageType.SALITY_NL_REQUEST}:
+    #             # process each ID's role
+    #             process_initiator(id_src)
+    #             process_responder(id_dst)
+    #             # convert the abstract message into a message object to handle it better
+    #             msg_str = "{0}-{1}".format(id_src, id_dst)
+    #             msg = Message(msg_id, id_src, id_dst, msg_type, time)
+    #             msgs.append(msg)
+    #             prev_reqs[msg_str] = msg_id
+
+    #         # process a reply
+    #         elif msg_type in {MessageType.SALITY_HELLO_REPLY, MessageType.SALITY_NL_REPLY}:
+    #             # process each ID's role
+    #             process_initiator(id_dst)
+    #             process_responder(id_src)
+    #             # convert the abstract message into a message object to handle it better
+    #             msg_str = "{0}-{1}".format(id_dst, id_src)
+    #             # find the request message ID for this response and set its reference index
+    #             refer_idx = prev_reqs[msg_str]
+    #             msgs[refer_idx].refer_msg_id = msg_id
+    #             # print(msgs[refer_idx])
+    #             msg = Message(msg_id, id_src, id_dst, msg_type, time, refer_idx)
+    #             msgs.append(msg)
+    #             # remove the request to this response from storage
+    #             del(prev_reqs[msg_str])
+
+    #         # for message ID only count actual messages
+    #         if not msg_type == MessageType.TIMEOUT:
+    #             msg_id += 1
+
+    #     # store the retrieved information in this object for later use
+    #     self.init_ids, self.respnd_ids, self.both_ids = sorted(init_ids), sorted(respnd_ids), sorted(both_ids)
+    #     self.messages = msgs
+
+    #     # return the retrieved information
+    #     return init_ids, respnd_ids, both_ids, msgs
+
+
+    # def det_ext_and_local_ids(self, comm_type: str, prob_init_local: int, prob_rspnd_local: int):
+    #     """
+    #     Map the given IDs to a locality (i.e. local or external) considering the given probabilities.
+
+    #     :param comm_type: the type of communication (i.e. local, external or mixed)
+    #     :param prob_init_local: the probability that an initiator ID is local
+    #     :param prob_rspnd_local: the probability that a responder is local
+    #     """
+    #     init_ids, respnd_ids, both_ids = self.init_ids, self.respnd_ids, self.both_ids
+    #     id_comms = sorted(self.id_comms)
+    #     external_ids = set()
+    #     local_ids = set()
+    #     ids = self.init_ids
+
+    #     def map_init_is_local(id_:str):
+    #         """
+    #         Map the given ID as local and handle its communication partners' locality
+    #         """
+    #         # loop over all communication entries
+
+    #         for id_comm in id_comms:
+    #             ids = id_comm.split("-")
+    #             other = ids[0] if id_ == ids[1] else ids[1]
+
+    #             # if id_comm does not contain the ID to be mapped, continue
+    #             if not (id_ == ids[0] or id_ == ids[1]):
+    #                 continue
+
+    #             # if other is already mapped, continue
+    #             if other in local_ids or other in external_ids:
+    #                 continue
+
+    #             # if comm_type is mixed, other ID can be local or external
+    #             if comm_type == "mixed":
+    #                 other_pos = mixed_respnd_is_local.random()
+    #                 if other_pos == "local":
+    #                     local_ids.add(other)
+    #                 elif other_pos == "external":
+    #                     external_ids.add(other)
+
+    #             # if comm_type is external, other ID must be external to fulfill type
+    #             # exclude initiators not to throw away too much communication
+    #             elif comm_type == "external":
+    #                 if not other in initiators:
+    #                     external_ids.add(other)
+
+    #     def map_init_is_external(id_: int):
+    #         """
+    #         Map the given ID as external and handle its communication partners' locality
+    #         """
+    #         for id_comm in id_comms:
+    #             ids = id_comm.split("-")
+    #             other = ids[0] if id_ == ids[1] else ids[1]
+
+    #             # if id_comm does not contain the ID to be mapped, continue
+    #             if not (id_ == ids[0] or id_ == ids[1]):
+    #                 continue
+
+    #             # if other is already mapped, continue
+    #             if other in local_ids or other in external_ids:
+    #                 continue
+
+    #             if not other in initiators:
+    #                 local_ids.add(other)
+
+
+    #     # if comm_type is local, map all IDs to local
+    #     if comm_type == "local":
+    #         local_ids = set(mapped_ids.keys())
+    #     else:
+    #         # set up probabilistic chooser
+    #         init_local_or_external = Lea.fromValFreqsDict({"local": prob_init_local*100, "external": (1-prob_init_local)*100})
+    #         mixed_respnd_is_local = Lea.fromValFreqsDict({"local": prob_rspnd_local*100, "external": (1-prob_rspnd_local)*100})
+
+    #         # assign IDs in 'both' local every time for mixed?
+    #         # sort initiators by some order, to gain determinism
+    #         initiators = sorted(list(init_ids) + list(both_ids))
+    #         # sort by individual communication count to increase final communication count
+    #         # better to sort by highest count of 'shared' IDs in case of local comm_type?
+    #         initiators = sorted(initiators, key=lambda id_:self.indv_id_counts[id_], reverse=True)
+
+    #         for id_ in initiators:
+    #             pos = init_local_or_external.random()
+    #             if pos == "local":
+    #                 # if id_ has already been mapped differently, its communication partners still have to be mapped
+    #                 if id_ in external_ids:
+    #                     map_init_is_external(id_)
+    #                 # otherwise, map as chosen above
+    #                 else:
+    #                     local_ids.add(id_)
+    #                     map_init_is_local(id_)
+    #             elif pos == "external":
+    #                 # if id_ has already been mapped differently, its communication partners still have to be mapped
+    #                 if id_ in local_ids:
+    #                     map_init_is_local(id_)
+    #                 # otherwise, map as chosen above
+    #                 else:
+    #                     external_ids.add(id_)
+    #                     map_init_is_external(id_)
+
+    #     self.local_ids, self.external_ids = local_ids, external_ids
+    #     return