import logging
import time
import math
import threading
import struct
from queue import Queue, Empty
from collections import deque

logger = logging.getLogger("pra_framework")

pack_byte = struct.Struct(">B").pack
pack_4bytes = struct.Struct(">I").pack

STATE_INACTIVE = 0
STATE_SUBGROUPING = 1
STATE_READING_SAVED_GROUPS = 2
STATE_FINISHED = 4


class GroupHandler(object):
    """
    Creates new groups asynchronously.

    Lifecycle:
    init_subgroup_creating() ->
        while group_handler.state == STATE_SUBGROUPING: get_next_subgroup()
    -queue empty->
    init_reading() ->
        while group_handler.state == STATE_READING_SAVED_GROUPS: get_next_saved_subgroup()
    -> STATE_INACTIVE
    """

    def __init__(self, marker_value_amount, use_plus1, create_marker_callback):
        self._marker_value_amount = marker_value_amount
        self._state = STATE_INACTIVE
        self._use_plus1 = use_plus1
        self._create_marker_callback = create_marker_callback
        #self._group_queue = Queue(500000)
        self._group_queue_maxlen = 1000000
        self._group_queue = deque(maxlen=self._group_queue_maxlen)
        self._subgroup_create_thread = None
        self._identified_groups = []
        self._subgrouping_finished = True
        self._iteration = 0
        self._feedback_addresses = None
        self._addresscount = 0

    def _get_state(self):
        return self._state

    # One of STATE_INACTIVE, STATE_FINISHED, STATE_SUBGROUPING
    state = property(_get_state)

    def _get_queuesize(self):
        return len(self._group_queue)

    queuesize = property(_get_queuesize)

    def _get_identified_groups(self):
        return self._identified_groups

    identified_groups = property(_get_identified_groups)

    def _get_addresses_total(self):
        return self._addresscount

    # Total amount of created addresses for probing (single addresses in group
    # intersections not removed).
    addresses_total = property(_get_addresses_total)

    def init_subgroup_creating(self, top_groups, subgroup_storage, feedback_addresses):
        """
        Initiate subgroup creation. If no groups are left for subgrouping, all
        monitor IP addresses have been found and the attack is finished.

        top_groups -- groups to re-split
        subgroup_storage -- dict for storing groups
        feedback_addresses -- list of IPv4Address objects for re-clustering
        """
        logger.debug("group creating initiated, queue length (should be 0)=%d" % self.queuesize)

        if self._state != STATE_INACTIVE:
            logger.warning("state not inactive, have all groups been read? queue=%d" % len(self._group_queue))
            return

        self._state = STATE_SUBGROUPING
        self._feedback_addresses = feedback_addresses
        self._iteration += 1
        # marker -> [group, amount_subgroups]
        groups_to_resplit = {}
        # responses of NON-identified groups, needed to distribute markers
        responses_total = 0
        # Stop if only x% non-identified addresses are left. This assumes low noise,
        # otherwise we stop too early.
        addresses_fromnonidentified_total = 0

        # Count the total amount of responses from the report. In iteration 1 the
        # groups have been created by the responses of iteration 0.
        # Don't count identified groups.
        for marker, group in top_groups.items():
            if group.response_count < group.amount_addresses:
                responses_total += group.response_count
                addresses_fromnonidentified_total += group.amount_addresses

                # We only take non-identified groups (response count < addresses per
                # group) having at minimum 1 response.
                if group.response_count != 0:
                    #logger.debug("group to be resplit: %r" % group)
                    groups_to_resplit[marker] = [group, 0]
            elif group.response_count >= group.amount_addresses:
                # BE WARNED: no responses to other groups could also mean: missed
                # responses because of network errors etc. In this case the "+1" group
                # is counted as identified.
                """
                logger.info("group was identified: %r, response count/total addresses = %d/%d" %
                    (group, group.response_count, group.amount_addresses))
                """
                """
                def show_hierarchy(group):
                    gs = [group]
                    _top_group = group.top_group

                    while _top_group is not None:
                        gs.append(_top_group)
                        _top_group = _top_group.top_group
                    p = ["%r" % g for g in gs]
                    print(" -> ".join(p))
                show_hierarchy(group)
                """
                self._identified_groups.append([group, int(time.time()), self._iteration])
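        # A worked example of the stopping criterion below, with hypothetical numbers:
        # responses_total=995 and addresses_fromnonidentified_total=1000 give
        # 995 / (1000 + 1) ~= 0.994 > 0.99, so the attack is treated as finished;
        # with 900 responses the ratio is ~0.899 and re-splitting continues.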
        # Stop if nearly all monitors have been identified. This avoids endless attack
        # loops caused by disappearing monitor nodes.
        # Add one to avoid division by zero.
        if responses_total / (addresses_fromnonidentified_total + 1) > 0.99:
            logger.info("fraction of identified monitors reached: responses/addresses (non identified) = %d/%d = %f" %
                (responses_total, addresses_fromnonidentified_total,
                responses_total / addresses_fromnonidentified_total))
            self._state = STATE_FINISHED
            return
        elif len(groups_to_resplit) == 0:
            logger.info("no groups left to create subgroups from, identified groups=%d" % len(self._identified_groups))
            self._state = STATE_FINISHED
            return

        self._subgroup_create_thread = threading.Thread(target=self._subgroups_create_cycler,
            args=(groups_to_resplit, subgroup_storage, responses_total))
        logger.debug("starting subgroup creating using %d top groups" % len(groups_to_resplit))
        self._subgroup_create_thread.start()

    def get_next_subgroup(self):
        """
        return -- the next group; raises Empty if no groups are currently available.
        The state switches to STATE_INACTIVE if no groups are left AND no more groups
        can be created (subgroup creating finished).
        """
        try:
            return self._group_queue.popleft()
        except IndexError:
            # subgrouping finished and queue is empty: change state to inactive
            if self._subgrouping_finished:
                logger.debug("!!! switching to STATE_INACTIVE")
                self._state = STATE_INACTIVE
            raise Empty
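    # A minimal sketch of the intended consumer loop, assuming a hypothetical probe()
    # routine (the real probing logic lives outside this class):
    #
    #   handler.init_subgroup_creating(top_groups, subgroup_storage, feedback_addresses)
    #   while handler.state == STATE_SUBGROUPING:
    #       try:
    #           probe(handler.get_next_subgroup())  # probe() is an assumption
    #       except Empty:
    #           time.sleep(0.1)  # queue drained, creator thread may still be filling it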
    def _subgroups_create_cycler(self, groups_to_resplit, subgroup_storage, responses_total):
        """
        Create new subgroups incrementally.

        groups_to_resplit -- dict of groups to re-split and the amount of subgroups
            to create (initialized to 0): {marker: [Group(), 0], ...}
        subgroup_storage -- dictionary to store new subgroups
        responses_total -- sum of all responses over all groups to re-split
        """
        # Markers (amount of subgroups) are distributed based on the amount of
        # responses from the first stage:
        # amount per subgroup = (responses per top group) / (total responses) * (amount of markers)
        # more feedback = more markers
        logger.debug("setting new marker frequencies per group")
        self._subgrouping_finished = False

        for marker in groups_to_resplit.keys():
            group = groups_to_resplit[marker][0]
            groups_to_resplit[marker][1] = math.floor((group.response_count / responses_total) * self._marker_value_amount)
            """
            logger.debug("group=%r: response count=%d, target split=%d, responses total=%d" %
                (group, group.response_count, groups_to_resplit[marker][1], responses_total))
            """
            # create one more subgroup which we don't use a marker for
            if self._use_plus1:
                groups_to_resplit[marker][1] += 1

        # At minimum 2 markers for every group; robin hood principle: take from the
        # rich, give to the poor.
        logger.debug("re-distributing marker frequencies (min 2 per group)")
        self._redistribute_marker_frequencies(groups_to_resplit)
        """
        logger.info("total responses/groups for resplit/marker values: %d/%d/%d" %
            (responses_total, len(groups_to_resplit), self._marker_value_amount))
        """
        """
        if len(groups_to_resplit) < 20:
            logger.debug("groups to resplit:")

            for marker, group_freq in groups_to_resplit.items():
                logger.debug("%s -> %r (max subgroups to be created: %d)" % (marker, group_freq[0], group_freq[1]))
        """
        markervalue_int = 0
        # sanity counters for "+1" subgroup creation
        plus1_cnt = 0
        plus1_cnt_connected = 0
        subgroup_cnt = 0

        if self._iteration == 1 and len(self._feedback_addresses) != 0:
            logger.debug("will use feedback addresses for subgrouping")

        group_miss = 0
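        # A worked example of the marker distribution above, with hypothetical numbers:
        # marker_value_amount=1000 and two top groups with 300 and 700 responses
        # (responses_total=1000) yield floor(0.3 * 1000)=300 and floor(0.7 * 1000)=700
        # markers; with use_plus1 each group gets one extra, unmarked "+1" subgroup.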
        # create subgroups for every top group in the current stage
        for marker, group_freq in groups_to_resplit.items():
            # skip group as we don't have enough marker values, try in next iteration
            if group_freq[1] < 2:
                # set to 1 to avoid out-filtering in the next iteration
                group_freq[0].response_count = 1
                subgroup_storage["GROUP_MISS_%d" % group_miss] = group_freq[0]
                group_miss += 1
                continue

            feedback_addr = []

            if self._iteration == 1:
                # marker is actually the target IP address in the first iteration
                try:
                    feedback_addr = self._feedback_addresses[marker]
                except KeyError:
                    # No scanner feedback address for the top group; this can happen:
                    # the monitor did not send anything on probes at scanner level.
                    #logger.warning("could not find feedback addresses for top group %r" % group_freq)
                    pass

            # Split into up to group_freq[1] subgroups (stop if single addresses are
            # reached). "+1" subgroups are automatically created by create_subgroups.
            #logger.debug("creating max %d subgroups" % group_freq[1])
            # creating 500,000 times 256 subgroups (/24 + 8) takes ~6 minutes
            subgroups = group_freq[0].create_subgroups(group_freq[1],
                ipv4_addresses=feedback_addr,
                use_plus1=self._use_plus1)
            #logger.debug("subgroup amount for %r: %d" % (group_freq[0], len(subgroups)))

            if self._use_plus1:
                if not subgroups[-1].is_plus1:
                    logger.warning("????? +1 group missing!!!")
                else:
                    #logger.debug("---> top/+1 group=%r\t%r" % (group_freq[0], subgroups[-1]))
                    plus1_cnt += 1

            #logger.debug("first group to resplit is: %r" % subgroups[0])
            subgroup_cnt += len(subgroups)

            if len(subgroups) > group_freq[1]:
                logger.warning("group created more subgroups than it should: %d > %d" % (len(subgroups), group_freq[1]))

            if len(subgroups) == 1:
                logger.warning("?????? just 1 subgroup? check this! This should have been an identified group, "
                    "parent/subgroup: %r" % subgroups)
            """
            logger.debug("creating subgroups for group=%r: marker start/end = %d/%d" %
                (group_freq[0], group_freq[0].subgroup_start_marker_value_int, group_freq[0].subgroup_end_marker_value_int))
            """
            for subgroup in subgroups:
                if not subgroup.is_plus1:
                    marker_bytes_subgroup = self._create_marker_callback(markervalue_int)
                    subgroup.marker_value_int = markervalue_int
                    subgroup.marker_bytes = marker_bytes_subgroup
                    subgroup_storage[markervalue_int] = subgroup
                    #if markervalue_int % 100 == 0:
                    #    logger.debug("encoded marker value: %d -> %r" % (markervalue_int, marker_bytes_subgroup))
                    markervalue_int += 1
                    self._addresscount += subgroup.amount_addresses
                else:
                    marker = b"PLUS1_SUBGROUP" + pack_4bytes(markervalue_int)
                    subgroup_storage[marker] = subgroup
                    group_freq[0].plus1_subgroup = subgroup
                    plus1_cnt_connected += 1

                # the subgroup knows its top group; the top group gets to know the
                # subgroup on response
                subgroup.top_group = group_freq[0]

                # store the group so it can be fetched later on
                if not subgroup.is_plus1:
                    while len(self._group_queue) >= self._group_queue_maxlen:
                        #logger.debug("group handler queue filled, waiting some seconds")
                        time.sleep(1)
                    self._group_queue.append(subgroup)

        logger.debug("finished creating all groups, total=%d, +1 subgroups=%d, +1 subgroups connected=%d, top groups=%d" %
            (subgroup_cnt, plus1_cnt, plus1_cnt_connected, len(groups_to_resplit)))
        self._feedback_addresses.clear()
        self._subgrouping_finished = True

    @staticmethod
    def _redistribute_marker_frequencies(marker_groupfreq):
        """
        Redistribute markers so that every position has at minimum 2.
        Input: {a:1, b:999, c:0, ...}
        Updated: {a:2, b:996, c:2, ...}

        marker_groupfreq -- {marker : [group, amount_subgroups]}, updated in place
        """
        for index in marker_groupfreq.keys():
            if marker_groupfreq[index][1] < 2:
                # max_value({a:1, b:999, ...}) -> l=[(b, 999), (a, 1), ...] -> l[0][0]
                max_index = sorted(marker_groupfreq.items(), key=lambda x: x[1][1], reverse=True)[0][0]

                if marker_groupfreq[max_index][1] <= 2:
                    logger.warning("frequencies too low for re-distributing, not enough marker values? (try more than 8)")
                    break

                adding = 2 - marker_groupfreq[index][1]
                marker_groupfreq[index][1] += adding
                marker_groupfreq[max_index][1] -= adding
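    # A minimal sketch of a create_marker_callback as consumed by
    # _subgroups_create_cycler (an assumption for illustration; the real callback is
    # injected via __init__ and may encode markers differently):
    #
    #   def create_marker(markervalue_int):
    #       # any injective int -> bytes encoding works, as long as the report side
    #       # can map the marker bytes back to the integer value
    #       return pack_4bytes(markervalue_int)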
""" logger.debug("updating +1 groups") for _, top_group in top_groups.items(): # +1 subgroup is None if Top group is an identified group, ignore if top_group.plus1_subgroup is None: continue response_sum_subgroups = sum([group.response_count for group in top_group.subgroups]) """ logger.debug("top group/responses top/reponses subgroups/top -> sub: plus 1 count: %r / %r / %r / %r" % (top_group, top_group.response_count, response_sum_subgroups, top_group.plus1_subgroup.response_count)) """ if response_sum_subgroups > top_group.response_count: logger.warning("new response sum of subgroups greater than response count of top group" "-> assuming noise in new responses OR packet drops in old, setting '+1'-group to 0") top_group.plus1_subgroup.response_count = 0 else: top_group.plus1_subgroup.response_count = top_group.response_count - response_sum_subgroups #if top_group.plus1_subgroup.response_count != 0: # logger.debug("found a subgroup having response: %r=%d" % # (top_group.plus1_subgroup, top_group.plus1_subgroup.response_count)) @staticmethod def remove_empty_groups(subgroups): """ Remove all groups having response count of 0. This has to be called AFTER update +1 groups or non empty +1 groups could be removed by mistake otherwise. """ logger.debug("removing empty groups") # we can't change a dict while traversing! store keys to be deleted keys_to_remove = [markervalue for markervalue, subgroup in subgroups.items() if subgroup.response_count == 0] for key in keys_to_remove: del subgroups[key] logger.debug("removed empty groups=%d, remaining groups=%d" % (len(keys_to_remove), len(subgroups))) @staticmethod def cleanup_groups(topgroups): """ Cleanup unneeded groups. groups -- dict of groups to be cleaned """ logger.debug("cleaning up %d groups" % len(topgroups)) for markervalue, topgroup_ref in topgroups.items(): for subgroup in topgroup_ref.subgroups: subgroup.top_group = None topgroup_ref.plus1_subgroup = None topgroup_ref.subgroups.clear() topgroups.clear()