import logging
import time
import math
import threading
import struct
from queue import Queue, Empty
from collections import deque

logger = logging.getLogger("pra_framework")

pack_byte = struct.Struct(">B").pack
pack_4bytes = struct.Struct(">I").pack
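# e.g. pack_4bytes(1) == b"\x00\x00\x00\x01" (4-byte big-endian unsigned int)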

STATE_INACTIVE = 0
STATE_SUBGROUPING = 1
STATE_READING_SAVED_GROUPS = 2
STATE_FINISHED = 4


class GroupHandler(object):
    """
    Creates new groups asynchronously.

    Lifecycle:
    init_subgroup_creating() -> while group_handler.state == STATE_SUBGROUPING: get_next_subgroup()
    -queue empty-> init_reading() -> while group_handler.state == STATE_READING_SAVED_GROUPS: get_next_saved_subgroup()
    -> STATE_INACTIVE
    """

    def __init__(self, marker_value_amount,
                 use_plus1,
                 create_marker_callback):
        self._marker_value_amount = marker_value_amount
        self._state = STATE_INACTIVE
        self._use_plus1 = use_plus1
        self._create_marker_callback = create_marker_callback
        #self._group_queue = Queue(500000)
        self._group_queue_maxlen = 1000000
        self._group_queue = deque(maxlen=self._group_queue_maxlen)
        self._subgroup_create_thread = None
        self._identified_groups = []
        self._subgrouping_finished = True
        self._iteration = 0
        self._feedback_addresses = None
        self._addresscount = 0

    def _get_state(self):
        return self._state
    # one of STATE_INACTIVE, STATE_SUBGROUPING, STATE_READING_SAVED_GROUPS, STATE_FINISHED
    state = property(_get_state)

    def _get_queuesize(self):
        return len(self._group_queue)
    queuesize = property(_get_queuesize)

    def _get_identified_groups(self):
        return self._identified_groups
    identified_groups = property(_get_identified_groups)

    def _get_addresses_total(self):
        return self._addresscount
    # total amount of addresses created for probing (single addresses in group intersections not removed)
    addresses_total = property(_get_addresses_total)

    def init_subgroup_creating(self, top_groups, subgroup_storage, feedback_addresses):
        """
        Initiate subgroup creating. If no groups are left for subgrouping, all
        monitor IP addresses have been found and the attack is finished.

        top_groups -- groups to re-split
        subgroup_storage -- dict for storing the new subgroups
        feedback_addresses -- list of IPv4Address objects for re-clustering
        """
        logger.debug("group creating initiated, queue length (should be 0)=%d" % self.queuesize)

        if self._state != STATE_INACTIVE:
            logger.warning("state is not inactive, have all groups been read? queue=%d" %
                           len(self._group_queue))
            return

        self._state = STATE_SUBGROUPING
        self._feedback_addresses = feedback_addresses
        self._iteration += 1
        # marker -> [group, amount_subgroups]
        groups_to_resplit = {}
        # responses of NON-identified groups, needed to distribute the markers
        responses_total = 0
        # Stop if only x% non-identified addresses are left.
        # This assumes low noise, otherwise we stop too early.
        addresses_fromnonidentified_total = 0

        # Count the total amount of responses from the report.
        # In iteration 1 the groups have been created by the responses of iteration 0.
        # Don't count identified groups.
        for marker, group in top_groups.items():
            if group.response_count < group.amount_addresses:
                responses_total += group.response_count
                addresses_fromnonidentified_total += group.amount_addresses

                # Only take non-identified groups (response count < addresses per group)
                # having at minimum 1 response.
                if group.response_count != 0:
                    #logger.debug("group to be resplit: %r" % group)
                    groups_to_resplit[marker] = [group, 0]
            elif group.response_count >= group.amount_addresses:
                # BE WARNED: no responses for other groups could also mean: missed responses
                # because of network errors etc. In this case the "+1" group is counted as identified.
                """
                logger.info("group was identified: %r, response count/total addresses = %d/%d" %
                            (group, group.response_count, group.amount_addresses))
                """
                """
                def show_hierarchy(group):
                    gs = [group]
                    _top_group = group.top_group

                    while _top_group is not None:
                        gs.append(_top_group)
                        _top_group = _top_group.top_group
                    p = ["%r" % g for g in gs]
                    print(" -> ".join(p))
                show_hierarchy(group)
                """
                self._identified_groups.append([group, int(time.time()), self._iteration])

        # Stop if nearly all monitor nodes have been identified; this avoids endless
        # attack loops caused by disappearing monitor nodes.
        # Add one to avoid division by zero.
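        # e.g. 996 responses from 1000 non-identified addresses:
        # 996 / 1001 ~= 0.995 > 0.99 -> finished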
        fraction_identified = responses_total / (addresses_fromnonidentified_total + 1)

        if fraction_identified > 0.99:
            logger.info("fraction of identified monitors reached: responses/addresses (non identified) = %d/%d = %f" %
                        (responses_total, addresses_fromnonidentified_total, fraction_identified))
            self._state = STATE_FINISHED
            return
        elif len(groups_to_resplit) == 0:
            logger.info("no groups left to create subgroups from, identified groups=%d" % len(self._identified_groups))
            self._state = STATE_FINISHED
            return

        self._subgroup_create_thread = threading.Thread(target=self._subgroups_create_cycler,
                                                        args=(groups_to_resplit,
                                                              subgroup_storage,
                                                              responses_total))
        logger.debug("starting subgroup creating using %d top groups" % len(groups_to_resplit))
        self._subgroup_create_thread.start()

    def get_next_subgroup(self):
        """
        return -- the next group; raises Empty if no group is currently available.
        The state switches to STATE_INACTIVE if no groups are left AND no more
        groups can be created (subgroup creating finished).
        """
        try:
            return self._group_queue.popleft()
        except IndexError:
            # subgrouping finished and queue is empty: change state to inactive
            if self._subgrouping_finished:
                logger.debug("switching to STATE_INACTIVE")
                self._state = STATE_INACTIVE
            raise Empty

    def _subgroups_create_cycler(self, groups_to_resplit, subgroup_storage, responses_total):
        """
        Create new subgroups incrementally.

        groups_to_resplit -- dict of groups to re-split plus the amount of subgroups
            to create, initialized to 0: {marker: [Group(), 0], ...}
        subgroup_storage -- dictionary to store the new subgroups
        responses_total -- sum of all responses over all groups to re-split
        """
        # Marker values (= amount of subgroups) are distributed based on the amount
        # of responses from the first stage:
        # amount per top group = (responses per top group) / (total responses) * (amount of marker values)
        # more feedback = more marker values
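        # e.g. a top group with 50 of 200 total responses and 1024 available
        # marker values gets floor(50/200 * 1024) = 256 subgroups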
- logger.debug("setting new marker frequencies per group")
- self._subgrouping_finished = False
- for marker in groups_to_resplit.keys():
- group = groups_to_resplit[marker][0]
- groups_to_resplit[marker][1] = math.floor((group.response_count / responses_total) * self._marker_value_amount)
- """
- logger.debug("group=%r: response count=%d, target split=%d, responses total=%d" % (group,
- group.response_count,
- groups_to_resplit[marker][1],
- responses_total))
- """
            # create one more subgroup which we don't use a marker for
            if self._use_plus1:
                groups_to_resplit[marker][1] += 1

        # at minimum 2 markers for every group; robin hood principle: take from the rich, give to the poor
        logger.debug("re-distributing marker frequencies (min 2 per group)")
        self._redistribute_marker_frequencies(groups_to_resplit)
- """
- logger.info("total responses/groups for resplit/marker values: %d/%d/%d" % (
- responses_total,
- len(groups_to_resplit),
- self._marker_value_amount
- )
- )
- """
- """
- if len(groups_to_resplit) < 20:
- logger.debug("groups to resplit:")
- for marker, group_freq in groups_to_resplit.items():
- logger.debug("%s -> %r (max subgroups to be created: %d)" % (marker, group_freq[0], group_freq[1]))
- """
        markervalue_int = 0
        # sanity-check counters for the created groups
        plus1_cnt = 0
        plus1_cnt_connected = 0
        subgroup_cnt = 0

        if self._iteration == 1 and len(self._feedback_addresses) != 0:
            logger.debug("will use feedback addresses for subgrouping")

        group_miss = 0

        # create subgroups for every top group in the current stage
        for marker, group_freq in groups_to_resplit.items():
            # skip the group if we don't have enough marker values, retry in the next iteration
            if group_freq[1] < 2:
                # set to 1 to avoid filtering it out in the next iteration
                group_freq[0].response_count = 1
                subgroup_storage["GROUP_MISS_%d" % group_miss] = group_freq[0]
                group_miss += 1
                continue

            feedback_addr = []

            if self._iteration == 1:
                # the marker is actually the target IP address in the first iteration
                try:
                    feedback_addr = self._feedback_addresses[marker]
                except KeyError:
                    # No scanner feedback address for this top group; this can happen:
                    # the monitor did not send anything upon probes on scanner level.
                    #logger.warning("could not find feedback addresses for top group %r" % group_freq)
                    pass

            # Split into up to group_freq[1] subgroups (stop if single addresses are reached).
            # The "+1" subgroup is created automatically by create_subgroups.
            #logger.debug("creating max %d subgroups" % group_freq[1])
            # creating 500,000 times 256 subgroups (/24 + 8) takes ~6 minutes
            subgroups = group_freq[0].create_subgroups(
                group_freq[1],
                ipv4_addresses=feedback_addr,
                use_plus1=self._use_plus1)
- #logger.debug("subgroup amount for %r: %d" % (group_freq[0], len(subgroups)))
- if self._use_plus1:
- if not subgroups[-1].is_plus1:
- logger.warning("????? +1 group missing!!!")
- else:
- #logger.debug("---> top/+1 group=%r\t%r" % (group_freq[0], subgroups[-1]))
- plus1_cnt += 1
- #logger.debug("first group to resplit is: %r" % subgroups[0])
- subgroup_cnt += len(subgroups)
- if len(subgroups) > group_freq[1]:
- logger.warning("group created more subgroubs than it should: %d > %d" %
- (len(subgroups), group_freq[1]))
- if len(subgroups) == 1:
- logger.warning("?????? just 1 subgroup? check this! This should have been an identified group, "
- "parent/subgroup: %r" % subgroups)
- """
- logger.debug("creating subgroups for group=%r: marker start/end = %d/%d" % (group_freq[0],
- group_freq[0].subgroup_start_marker_value_int,
- group_freq[0].subgroup_end_marker_value_int))
- """
            for subgroup in subgroups:
                if not subgroup.is_plus1:
                    marker_bytes_subgroup = self._create_marker_callback(markervalue_int)
                    subgroup.marker_value_int = markervalue_int
                    subgroup.marker_bytes = marker_bytes_subgroup
                    subgroup_storage[markervalue_int] = subgroup
                    #if markervalue_int % 100 == 0:
                    #    logger.debug("encoded marker value: %d -> %r" % (markervalue_int, marker_bytes_subgroup))
                    markervalue_int += 1
                    self._addresscount += subgroup.amount_addresses
                else:
                    marker = b"PLUS1_SUBGROUP" + pack_4bytes(markervalue_int)
                    subgroup_storage[marker] = subgroup
                    group_freq[0].plus1_subgroup = subgroup
                    plus1_cnt_connected += 1

                # the subgroup knows its top group; the top group gets to know the subgroup on response
                subgroup.top_group = group_freq[0]

                # store the group so it can be found later on
                if not subgroup.is_plus1:
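                    # Note: a full deque(maxlen=N) silently discards entries from the
                    # opposite end on append, so the producer throttles itself here
                    # instead of overwriting unread groups.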
                    while len(self._group_queue) >= self._group_queue_maxlen:
                        #logger.debug("group handler queue filled, waiting some seconds")
                        time.sleep(1)
                    self._group_queue.append(subgroup)

        logger.debug("finished creating all groups, total=%d, +1 subgroups=%d, +1 subgroups connected=%d, top groups=%d" %
                     (subgroup_cnt, plus1_cnt, plus1_cnt_connected, len(groups_to_resplit)))
        self._feedback_addresses.clear()
        self._subgrouping_finished = True

    @staticmethod
    def _redistribute_marker_frequencies(marker_groupfreq):
        """
        Redistribute the markers to have at minimum 2 for every position.
        Input:   {a: 1, b: 999, c: 0, ...}
        Updated: {a: 2, b: 996, c: 2, ...}

        marker_groupfreq -- {marker: [group, amount_subgroups]}, updated in place
            so every position holds at minimum 2 subgroups
        """
        for index in marker_groupfreq.keys():
            if marker_groupfreq[index][1] < 2:
                # key with the highest frequency:
                # {a: [g, 1], b: [g, 999], ...} -> [(b, [g, 999]), (a, [g, 1]), ...] -> b
                max_index = sorted(marker_groupfreq.items(), key=lambda x: x[1][1], reverse=True)[0][0]

                if marker_groupfreq[max_index][1] <= 2:
                    logger.warning("frequencies too low for re-distributing, not enough marker values? (try more than 8)")
                    break

                adding = 2 - marker_groupfreq[index][1]
                marker_groupfreq[index][1] += adding
                marker_groupfreq[max_index][1] -= adding

    @staticmethod
    def update_plus1_subgroups(top_groups):
        """
        Derive the amount of responses of the "+1" subgroup from the top group and
        the groups beneath it. For iteration index > 1 there has to be a "+1" group
        in any case. Top groups which are already identified get discarded.

        Example:
        Top group = [...] = 6 responses
        3 subgroups = [2][1]["+1" group] ... [4 (group completely identified -> discard)]
        Updated amount: [2][1][6 - (2+1) = 3]

        Behaviour on different response counts (implying +1):
        Top < sum(Sub):  noise in sub or drops in top -> set +1 to 0
        Top >= sum(Sub): update +1 (can still be noise or drops, but that should get
                         filtered in the next iterations)

        top_groups -- the groups created BEFORE the most recently created groups.
        """
- logger.debug("updating +1 groups")
- for _, top_group in top_groups.items():
- # +1 subgroup is None if Top group is an identified group, ignore
- if top_group.plus1_subgroup is None:
- continue
- response_sum_subgroups = sum([group.response_count for group in top_group.subgroups])
- """
- logger.debug("top group/responses top/reponses subgroups/top -> sub: plus 1 count: %r / %r / %r / %r" %
- (top_group, top_group.response_count, response_sum_subgroups, top_group.plus1_subgroup.response_count))
- """
- if response_sum_subgroups > top_group.response_count:
- logger.warning("new response sum of subgroups greater than response count of top group"
- "-> assuming noise in new responses OR packet drops in old, setting '+1'-group to 0")
- top_group.plus1_subgroup.response_count = 0
- else:
- top_group.plus1_subgroup.response_count = top_group.response_count - response_sum_subgroups
- #if top_group.plus1_subgroup.response_count != 0:
- # logger.debug("found a subgroup having response: %r=%d" %
- # (top_group.plus1_subgroup, top_group.plus1_subgroup.response_count))

    @staticmethod
    def remove_empty_groups(subgroups):
        """
        Remove all groups having a response count of 0.
        This has to be called AFTER updating the +1 groups, otherwise non-empty
        +1 groups could be removed by mistake.
        """
        logger.debug("removing empty groups")
        # we can't change a dict while traversing it! store the keys to be deleted
        keys_to_remove = [markervalue for markervalue, subgroup in subgroups.items()
                          if subgroup.response_count == 0]

        for key in keys_to_remove:
            del subgroups[key]
        logger.debug("removed empty groups=%d, remaining groups=%d" % (len(keys_to_remove), len(subgroups)))

    @staticmethod
    def cleanup_groups(topgroups):
        """
        Clean up groups which are no longer needed by breaking their references.

        topgroups -- dict of groups to be cleaned
        """
        logger.debug("cleaning up %d groups" % len(topgroups))

        for markervalue, topgroup_ref in topgroups.items():
            for subgroup in topgroup_ref.subgroups:
                subgroup.top_group = None
            topgroup_ref.plus1_subgroup = None
            topgroup_ref.subgroups.clear()
        topgroups.clear()