group_handler.py

import logging
import time
import math
import threading
import struct
from queue import Queue, Empty
from collections import deque

logger = logging.getLogger("pra_framework")

pack_byte = struct.Struct(">B").pack
pack_4bytes = struct.Struct(">I").pack
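# Example values for the packers above (standard struct behavior):
# pack_byte(255) == b"\xff" and pack_4bytes(5) == b"\x00\x00\x00\x05" (big-endian).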
STATE_INACTIVE = 0
STATE_SUBGROUPING = 1
STATE_READING_SAVED_GROUPS = 2
STATE_FINISHED = 4

class GroupHandler(object):
    """
    Creates new groups asynchronously.

    Lifecycle:
    init_subgroup_creating() -> while group_handler.state == STATE_SUBGROUPING: get_next_subgroup()
    -queue empty-> init_reading() -> while group_handler.state == STATE_READING_SAVED_GROUPS: get_next_saved_subgroup()
    -> STATE_INACTIVE
    """
    def __init__(self, marker_value_amount, use_plus1, create_marker_callback):
        self._marker_value_amount = marker_value_amount
        self._state = STATE_INACTIVE
        self._use_plus1 = use_plus1
        self._create_marker_callback = create_marker_callback
        #self._group_queue = Queue(500000)
        self._group_queue_maxlen = 1000000
        self._group_queue = deque(maxlen=self._group_queue_maxlen)
        self._subgroup_create_thread = None
        self._identified_groups = []
        self._subgrouping_finished = True
        self._iteration = 0
        self._feedback_addresses = None
        self._addresscount = 0
    def _get_state(self):
        return self._state
    # One of STATE_INACTIVE, STATE_SUBGROUPING, STATE_READING_SAVED_GROUPS, STATE_FINISHED
    state = property(_get_state)

    def _get_queuesize(self):
        return len(self._group_queue)
    queuesize = property(_get_queuesize)

    def _get_identified_groups(self):
        return self._identified_groups
    identified_groups = property(_get_identified_groups)

    def _get_addresses_total(self):
        return self._addresscount
    # Total amount of addresses created for probing (single addresses in group intersections not removed)
    addresses_total = property(_get_addresses_total)
    def init_subgroup_creating(self, top_groups, subgroup_storage, feedback_addresses):
        """
        Initiate subgroup creating. If no groups are left for subgrouping, then
        all monitor IP addresses have been found and the attack is finished.

        top_groups -- groups to re-split
        subgroup_storage -- dict for storing groups
        feedback_addresses -- list of IPv4Address objects for re-clustering
        """
        logger.debug("group creating initiated, queue length (should be 0)=%d" % self.queuesize)

        if self._state != STATE_INACTIVE:
            logger.warning("state not inactive, have all groups been read? queue=%d" %
                           len(self._group_queue))
            return
        self._state = STATE_SUBGROUPING
        self._feedback_addresses = feedback_addresses
        self._iteration += 1
        # marker = [group, amount_subgroups]
        groups_to_resplit = {}
        # Responses of NON-identified groups, needed to distribute markers.
        responses_total = 0
        # Stop if we have x% non-identified addresses left.
        # This assumes low noise, otherwise we stop too early.
        addresses_fromnonidentified_total = 0

        # Count the total amount of responses from the report.
        # In iteration 1, groups have been created by responses at iteration 0.
        # Don't count identified groups.
        for marker, group in top_groups.items():
            if group.response_count < group.amount_addresses:
                responses_total += group.response_count
                addresses_fromnonidentified_total += group.amount_addresses
                # We only take non-identified groups (response count < addresses per group)
                # having at minimum 1 response.
                if group.response_count != 0:
                    #logger.debug("group to be re-split: %r" % group)
                    groups_to_resplit[marker] = [group, 0]
            elif group.response_count >= group.amount_addresses:
                # BE WARNED: no responses from other groups could also mean: missed responses
                # because of network errors etc. In this case the "+1" group is counted as identified.
                """
                logger.info("group was identified: %r, response count/total addresses = %d/%d" %
                            (group, group.response_count, group.amount_addresses))
                """
  95. """
  96. def show_hierarchy(group):
  97. gs = [group]
  98. _top_group = group.top_group
  99. while _top_group is not None:
  100. gs.append(_top_group)
  101. _top_group = _top_group.top_group
  102. p = ["%r" % g for g in gs]
  103. print(" -> ".join(p))
  104. show_hierarchy(group)
  105. """
                self._identified_groups.append([group, int(time.time()), self._iteration])

        # Stop if nearly all nodes have been identified. This avoids endless attack loops
        # caused by disappearing monitor nodes.
        # Add one to avoid division by zero.
        if responses_total / (addresses_fromnonidentified_total + 1) > 0.99:
            logger.info("fraction of identified monitors reached: responses/addresses (non identified) = %d/%d = %f" %
                        (responses_total, addresses_fromnonidentified_total,
                         (responses_total / addresses_fromnonidentified_total)))
            self._state = STATE_FINISHED
            return
        elif len(groups_to_resplit) == 0:
            logger.info("no groups left to create subgroups from, identified groups=%d" % len(self._identified_groups))
            self._state = STATE_FINISHED
            return
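        # Worked example for the stop criterion above: with 995 responses from
        # 1,000 non-identified addresses, 995 / (1000 + 1) ~= 0.994 > 0.99, so the
        # handler switches to STATE_FINISHED instead of re-splitting further.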
        self._subgroup_create_thread = threading.Thread(target=self._subgroups_create_cycler,
                                                        args=(groups_to_resplit,
                                                              subgroup_storage,
                                                              responses_total))
        logger.debug("starting subgroup creating using %d top groups" % len(groups_to_resplit))
        self._subgroup_create_thread.start()
    def get_next_subgroup(self):
        """
        return -- next group, or raises Empty if no group is currently available.
        The handler state switches to STATE_INACTIVE if no groups are left AND no more
        groups can be created (subgroup creating finished).
        """
        try:
            return self._group_queue.popleft()
        except IndexError:
            # Subgrouping finished and queue is empty: change state to inactive.
            if self._subgrouping_finished:
                logger.debug("!!! switching to STATE_INACTIVE")
                self._state = STATE_INACTIVE
            raise Empty
    def _subgroups_create_cycler(self, groups_to_resplit, subgroup_storage, responses_total):
        """
        Create new subgroups incrementally.

        groups_to_resplit -- dict containing groups to re-split and the amount of subgroups
            (initialized to 0): {marker: [Group(), 0], ...}
        subgroup_storage -- dictionary to store new subgroups
        responses_total -- sum of all responses for all subgroups
        """
        # Markers (amount of subgroups) are distributed based on the amount of responses
        # from the first stage:
        # amount per subgroup = (responses per top group) / (total responses) * (amount of markers)
        # More feedback = more markers.
        logger.debug("setting new marker frequencies per group")
        self._subgrouping_finished = False
        for marker in groups_to_resplit.keys():
            group = groups_to_resplit[marker][0]
            groups_to_resplit[marker][1] = math.floor((group.response_count / responses_total) * self._marker_value_amount)
            """
            logger.debug("group=%r: response count=%d, target split=%d, responses total=%d" % (group,
                         group.response_count,
                         groups_to_resplit[marker][1],
                         responses_total))
            """
            # Create one more subgroup which we don't use a marker for.
            if self._use_plus1:
                groups_to_resplit[marker][1] += 1
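        # Worked example for the distribution above (numbers assumed): a top group with
        # 50 of 200 total responses and 1024 available marker values gets
        # floor(50 / 200 * 1024) = 256 subgroups, 257 with the "+1" group enabled.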
        # At minimum 2 markers for every group; Robin Hood principle: take from the rich,
        # give to the poor.
        logger.debug("re-distributing marker frequencies (min 2 per group)")
        self._redistribute_marker_frequencies(groups_to_resplit)
        """
        logger.info("total responses/groups for resplit/marker values: %d/%d/%d" % (
            responses_total,
            len(groups_to_resplit),
            self._marker_value_amount
        ))
        """
        """
        if len(groups_to_resplit) < 20:
            logger.debug("groups to resplit:")
            for marker, group_freq in groups_to_resplit.items():
                logger.debug("%s -> %r (max subgroups to be created: %d)" % (marker, group_freq[0], group_freq[1]))
        """
        markervalue_int = 0
        # sanity counters for already stored markers
        plus1_cnt = 0
        plus1_cnt_connected = 0
        subgroup_cnt = 0

        if self._iteration == 1 and len(self._feedback_addresses) != 0:
            logger.debug("will use feedback addresses for subgrouping")

        group_miss = 0

        # Create subgroups for every top group in the current stage.
        for marker, group_freq in groups_to_resplit.items():
            # Skip the group if we don't have enough marker values; try again in the next iteration.
            if group_freq[1] < 2:
                # Set to 1 to avoid filtering it out in the next iteration.
                group_freq[0].response_count = 1
                subgroup_storage["GROUP_MISS_%d" % group_miss] = group_freq[0]
                group_miss += 1
                continue
            feedback_addr = []

            if self._iteration == 1:
                # The marker is actually the target IP address in the first iteration.
                try:
                    feedback_addr = self._feedback_addresses[marker]
                except KeyError:
                    # No scanner feedback address for this top group; this can happen sometimes:
                    # the monitor did not send anything on probes at scanner level.
                    #logger.warning("could not find feedback addresses for top group %r" % group_freq)
                    pass

            # Split into up to group_freq[1] subgroups (stop if single addresses are reached).
            # The "+1" subgroup is automatically created by create_subgroups.
            #logger.debug("creating max %d subgroups" % group_freq[1])
            # Creating 500,000 times 256 subgroups (/24 + 8) takes ~6 minutes.
            subgroups = group_freq[0].create_subgroups(group_freq[1],
                                                       ipv4_addresses=feedback_addr,
                                                       use_plus1=self._use_plus1)
            #logger.debug("subgroup amount for %r: %d" % (group_freq[0], len(subgroups)))
            if self._use_plus1:
                if not subgroups[-1].is_plus1:
                    logger.warning("????? +1 group missing!!!")
                else:
                    #logger.debug("---> top/+1 group=%r\t%r" % (group_freq[0], subgroups[-1]))
                    plus1_cnt += 1

            #logger.debug("first group to resplit is: %r" % subgroups[0])
            subgroup_cnt += len(subgroups)

            if len(subgroups) > group_freq[1]:
                logger.warning("group created more subgroups than it should: %d > %d" %
                               (len(subgroups), group_freq[1]))
            if len(subgroups) == 1:
                logger.warning("?????? just 1 subgroup? check this! This should have been an identified group, "
                               "parent/subgroup: %r" % subgroups)
            """
            logger.debug("creating subgroups for group=%r: marker start/end = %d/%d" % (group_freq[0],
                         group_freq[0].subgroup_start_marker_value_int,
                         group_freq[0].subgroup_end_marker_value_int))
            """
            for subgroup in subgroups:
                if not subgroup.is_plus1:
                    marker_bytes_subgroup = self._create_marker_callback(markervalue_int)
                    subgroup.marker_value_int = markervalue_int
                    subgroup.marker_bytes = marker_bytes_subgroup
                    subgroup_storage[markervalue_int] = subgroup
                    #if markervalue_int % 100 == 0:
                    #    logger.debug("encoded marker value: %d -> %r" % (markervalue_int, marker_bytes_subgroup))
                    markervalue_int += 1
                    self._addresscount += subgroup.amount_addresses
                else:
                    marker = b"PLUS1_SUBGROUP" + pack_4bytes(markervalue_int)
                    subgroup_storage[marker] = subgroup
                    group_freq[0].plus1_subgroup = subgroup
                    plus1_cnt_connected += 1

                # The subgroup knows its top group; the top group gets the subgroup to find it on response.
                subgroup.top_group = group_freq[0]

                # Store the group to be able to find it later on.
                if not subgroup.is_plus1:
                    while len(self._group_queue) >= self._group_queue_maxlen:
                        #logger.debug("group handler queue filled, waiting some seconds")
                        time.sleep(1)
                    self._group_queue.append(subgroup)

        logger.debug("finished creating all groups, total=%d, +1 subgroups=%d, +1 subgroups connected=%d, top groups=%d" %
                     (subgroup_cnt, plus1_cnt, plus1_cnt_connected, len(groups_to_resplit)))
        self._feedback_addresses.clear()
        self._subgrouping_finished = True
    @staticmethod
    def _redistribute_marker_frequencies(marker_groupfreq):
        """
        Redistribute markers so that every position has at minimum 2.

        Input:   {a: 1, b: 999, c: 0, ...}
        Updated: {a: 2, b: 996, c: 2, ...}

        marker_groupfreq -- {marker: [group, amount_subgroups]}, updated in place
        """
        for index in marker_groupfreq.keys():
            if marker_groupfreq[index][1] < 2:
                # Pick the marker with the highest frequency: {a:1, b:999, ...} -> b
                max_index = max(marker_groupfreq.items(), key=lambda x: x[1][1])[0]
                if marker_groupfreq[max_index][1] <= 2:
                    logger.warning("frequencies too low for re-distributing, not enough marker values? (try more than 8)")
                    break
                adding = 2 - marker_groupfreq[index][1]
                marker_groupfreq[index][1] += adding
                marker_groupfreq[max_index][1] -= adding
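    # Step-by-step trace of the docstring example above: a=1 needs 1 more, taken from
    # the maximum b (999 -> 998); c=0 needs 2 more, again taken from b (998 -> 996),
    # yielding {a: 2, b: 996, c: 2}.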
    @staticmethod
    def update_plus1_subgroups(top_groups):
        """
        Derive the amount of responses of the "+1" subgroup using the top group and the
        groups beneath it. For iteration index > 1 there has to be a "+1" group in any
        case. Top groups which are already identified get discarded.

        Example:
        Top group = [...] = 6 responses
        3 subgroups = [2][1]["+1" group] ... [4 (group completely identified -> discard)]
        Updated amount: [2][1][6 - (2+1) = 3]

        Behaviour on different response counts (implying +1):
        Top < sum(Sub): noise in sub or drop in top -> set +1 to 0
        Top >= sum(Sub): update +1 (can be noise or drop after all, but that should be
            filtered out in the next iterations)
        """
        logger.debug("updating +1 groups")

        for _, top_group in top_groups.items():
            # The +1 subgroup is None if the top group is an identified group; ignore it.
            if top_group.plus1_subgroup is None:
                continue
            response_sum_subgroups = sum([group.response_count for group in top_group.subgroups])
            """
            logger.debug("top group/responses top/responses subgroups/top -> sub: plus 1 count: %r / %r / %r / %r" %
                         (top_group, top_group.response_count, response_sum_subgroups,
                          top_group.plus1_subgroup.response_count))
            """
            if response_sum_subgroups > top_group.response_count:
                logger.warning("new response sum of subgroups greater than response count of top group"
                               " -> assuming noise in new responses OR packet drops in old, setting '+1' group to 0")
                top_group.plus1_subgroup.response_count = 0
            else:
                top_group.plus1_subgroup.response_count = top_group.response_count - response_sum_subgroups
                #if top_group.plus1_subgroup.response_count != 0:
                #    logger.debug("found a subgroup having response: %r=%d" %
                #                 (top_group.plus1_subgroup, top_group.plus1_subgroup.response_count))
    @staticmethod
    def remove_empty_groups(subgroups):
        """
        Remove all groups having a response count of 0.
        This has to be called AFTER updating the "+1" groups, otherwise non-empty
        "+1" groups could be removed by mistake.
        """
        logger.debug("removing empty groups")
        # We can't change a dict while traversing it! Store the keys to be deleted.
        keys_to_remove = [markervalue for markervalue, subgroup in subgroups.items()
                          if subgroup.response_count == 0]
        for key in keys_to_remove:
            del subgroups[key]
        logger.debug("removed empty groups=%d, remaining groups=%d" % (len(keys_to_remove), len(subgroups)))
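    # Sketch of the call order required by the docstring above (the names `top_groups`
    # and `subgroups` are assumptions about the caller), kept as a disabled block in
    # the style of the debug snippets above:
    """
    GroupHandler.update_plus1_subgroups(top_groups)   # derive "+1" counts first
    GroupHandler.remove_empty_groups(subgroups)       # then drop empty groups
    """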
    @staticmethod
    def cleanup_groups(topgroups):
        """
        Clean up groups that are no longer needed by breaking their references.

        topgroups -- dict of groups to be cleaned
        """
        logger.debug("cleaning up %d groups" % len(topgroups))
        for markervalue, topgroup_ref in topgroups.items():
            for subgroup in topgroup_ref.subgroups:
                subgroup.top_group = None
            topgroup_ref.plus1_subgroup = None
            topgroup_ref.subgroups.clear()
        topgroups.clear()