attack_logic.py 30 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747
  1. import os
  2. import logging
  3. import threading
  4. import re
  5. import pickle
  6. import os.path
  7. import time
  8. import math
  9. import struct
  10. import queue
  11. from multiprocessing import Process, SimpleQueue
  12. from group import Group
  13. import scanner_wrapper
  14. import group_handler
  15. import report_fetcher
  16. import ipv4
  17. from utility import ip_str_to_bytes
  18. from pypacker.checksum import fletcher32
  19. from pypacker.psocket import SocketHndl
  20. from pypacker.layer12 import ethernet
  21. from pypacker.layer3 import ip
  22. from pypacker.layer4 import tcp
# Pre-compiled regex split helpers, hoisted to module level for hot-path use.
split_equal = re.compile("=+").split
split_space = re.compile(" +").split
split_tab = re.compile("\t").split
split_comma = re.compile(",").split
split_slash = re.compile("/").split
# Pre-compiled struct (un)packers (big-endian) for ports and marker values.
unpack_port = struct.Struct(">H").unpack
pack_checksum = struct.Struct(">I").pack
pack_port = struct.Struct(">H").pack
pack_marker = struct.Struct(">Q").pack
pack_byte = struct.Struct(">B").pack
pack_markervalue = struct.Struct(">I").pack
unpack_marker = struct.Struct(">Q").unpack
# NOTE(review): pack_markervalue uses ">I" (4 bytes) but unpack_markervalue
# uses ">Q" (8 bytes) — looks inconsistent; confirm intended width before
# relying on pack/unpack round-trips.
unpack_markervalue = struct.Struct(">Q").unpack
# Shorthand aliases for the project's IPv4 helper classes.
IPv4Address = ipv4.IPv4Address
IPv4Network = ipv4.IPv4Network
# Bit flags selecting which packet fields carry marker bytes.
ENCODE_PORT_DST = 1
ENCODE_IP_SRC = 2
ENCODE_PORT_SRC = 4
logger = logging.getLogger("pra_framework")
class ProbeResponseAttackLogic(object):
	"""
	Implementation of the probe response attack using dedicated scanners for probing.
	Workflow:
	>> Stage 1:
	- Send probes without creating initial groups (saves state information) -> create Groups WHILE getting feedback
	- Get result and add Groups to _iterations[index_iteration]
	- Create groups: Check scanner feedback and re-group per sub-group based on it
	>> Stage 2:
	- Create Groups before scanning (first time: after stage 1/iteration 1)
	- Set amount of subgroups based on amount of responses
	- Continue scanning until no groups are left
	The destination address can be encoded into different parts of markers like
	source IP address, source port, destination port etc.
	Pre-filtering of noisy values:
	- Noisy marker values don't need to get pre-filtered as this problem can be effectively
	mitigated using a trade off between marker/checksum-bits.
	Detection mitigation:
	- Avoiding low-noise markers to avoid detection does not seem to be necessary
	as low noise markers like destination ports would appear with a low frequency
	-> sending 65535 (dst port) * (other marker) values will appear approximately
	"amount of sensors" * y times for every attack iterations, which is likely to be a
	low number for a specific port "p", which is hard to detect as statistic anomaly.
	"""
	# Default parameters to be set. This is more efficient/flexible
	# than explicitly setting them in __init__() via self._xyz...
	PROP_DEFAULTS = {
		# network options
		"interface_name": "eth0",
		"mac_source": None,
		"mac_gw": None,
		# source IP, mainly intended for native probes
		"ip_src": "1.1.1.1",
		"rate_kbit_per_s": 1000,
		# marker options (marker_encoding is a bitmask of the ENCODE_* flags)
		"marker_encoding": 0,
		"markerbits_value": 24,
		"markerbits_checksum": 8,
		# directory to save all data related to a attack cycle (trailing slash)
		"base_dir_save": "./",
		# directory which contains "src/zmap"
		"base_dir_zmap": "../zmap",
		"disable_monitor": 1,
		"verbosity": 3,
		"report_fetcher_classname": "TracingReportFetcher",
		# state info. Should only be changed by internal logic.
		# stage 1 or 2 (initial 0)
		#"_stage": 0,
		# index to _iterations where most recent groups get stored
		"_group_storeindex": 0,
		"use_feedback_ips": False,
		"use_plus1": False,
		"is_simulation": False,
		# debugging options
		# avoid scanning the whole network -> limit to a subnet (default is: scan whole IPv4 - blacklist)
		"_ip_stage1": "0.0.0.0",
		"_cidr_bits_stage1": 0
	}
	def __init__(self, **kwargs):
		"""
		Initialize attack state. Every key of PROP_DEFAULTS becomes an
		instance attribute, overridable via **kwargs.

		kwargs -- any key from PROP_DEFAULTS
		"""
		# set default values, will be overwritten by config variables
		# parameter hierarchy: default -> constructor
		for prop, default in ProbeResponseAttackLogic.PROP_DEFAULTS.items():
			setattr(self, prop, kwargs.get(prop, default))
			logger.debug("config: %s = %s" % (prop, kwargs.get(prop, default)))
		self._running = False
		# current scan processes
		self._current_scans = []
		# Files
		self._filename_state_groups = os.path.join(self.base_dir_save, "state_groups.bin")
		# contains information like: ip -> response type
		self._filename_scannerresponse_stage1 = os.path.join(self.base_dir_save, "scanner_response.csv")
		if self.is_simulation:
			self._filename_ip_blacklist = os.path.join(self.base_dir_zmap, "blacklist.conf_simulation")
			self._filename_ip_whitelist = None
		else:
			self._filename_ip_blacklist = os.path.join(self.base_dir_zmap, "blacklist.conf")
			#self._filename_ip_whitelist = os.path.join(self.base_dir_zmap, "whitelist.conf")
			self._filename_ip_whitelist = None
		self._filename_ip_blacklist_plus_feedback_ips = os.path.join(self.base_dir_save, "blacklist_plus_feedback_addr.conf")
		self._filename_full_report = os.path.join(self.base_dir_save, "report_fetcher_responses.csv")
		self._filename_identified_monitors = os.path.join(self.base_dir_save, "identified_monitors.csv")
		self._dirname_scanner_logs = self.base_dir_save
		# set default report Fetcher if anything goes wrong
		self._reportfetcher_class = report_fetcher.TracingReportFetcher
		self.set_report_fetcher_class(self.report_fetcher_classname)
		self._report_fetcher = None
		self.attack_thread = None
		# total marker width in bits and (rounded up) bytes
		self.marker_bits_total = self.markerbits_value + self.markerbits_checksum
		self.marker_bytes_total = math.ceil((self.markerbits_value + self.markerbits_checksum)/8)
		# amount of bits to shift marker value to make it 4 bytes from the left
		# bits=9: 0000 0000 0000 0000 0000 0001 1111 1111 -> left shift=23 -> 1111 1111 1000 0000....
		# NOTE: attribute name "leftshit" is a historical typo ("leftshift") kept for compatibility
		self.marker_value_leftshit = 32 - self.markerbits_value
		self.marker_value_bytes_amount = math.ceil(self.markerbits_value/8)
		self.marker_value_amount = 2 ** self.markerbits_value
		# By using pl1 groups we are saving/getting additional "amount of new subgroups" marker values
		logger.debug("total amount of marker VALUES: %d (plus1: %r)" % (self.marker_value_amount, self.use_plus1))
		logger.debug("initial scan: %r/%r" % (self._ip_stage1, self._cidr_bits_stage1))
		# Example: 5 marker values: _log_2(5)_ = 2 -> 2 Bits = 4 Subgroups
		self.cidr_bits_stage1 = math.floor(math.log(self.marker_value_amount, 2))
		self.marker_checksum_bytes_amount = math.ceil(self.markerbits_checksum / 8)
		# Groups which got/get scanned.
		# [[start, stop], {marker: Group}, total groups, total addresses]
		self._iterations = []
		self._iterations.append([[0, 0], {}, 0, 0])
		initial_group = Group(ip_network_object=IPv4Network(nw_ip_str=self._ip_stage1,
															prefixlen=self._cidr_bits_stage1),
							  response_count=0)
		self._root_group_name = b"ROOT_GROUP_PRA"
		self._iterations[0][1][self._root_group_name] = initial_group
		addresses_root = 2 ** (32 - self._cidr_bits_stage1)
		self._iterations[0][2] = min(self.marker_value_amount, addresses_root)
		self._iterations[0][3] = addresses_root
		self._group_storeindex = 0
		"""
		if self.markerbits_value % 8 != 0 or self.markerbits_checksum % 8 !=0:
			logger.warning("markerbits not multiple of 8, value/checksum = %r/%r" %
				(self.markerbits_value, self.markerbits_checksum))
		"""
		# native prober
		# TODO: adjust this on other platforms
		self._native_prober_amount = 5
		self._native_prober_sockets = []
		self._native_prober_conn_send = []
		self._native_prober_processes = []
		self._native_prober_conn_send_index = 0
		self._groupqueue = SimpleQueue()
		self._blacklist = set()
		# { top group marker value -> set(feedback_address1, feedback_address2, ...)}
		self._blacklist_toptoipv4obj = {}
		self._grouphandler = group_handler.GroupHandler(self.marker_value_amount,
														self.use_plus1,
														self._create_marker_bitlevel)
		self._initial_count = 0
	# Read-only accessors for internal state (no setters on purpose).
	iterations = property(lambda self: self._iterations)
	grouphandler = property(lambda self: self._grouphandler)
  177. def set_report_fetcher_class(self, classname):
  178. if self._running:
  179. return
  180. # get report fetcher by classname
  181. fetcher_module = __import__("report_fetcher")
  182. try:
  183. logger.debug("setting report fetcher class: %s" % classname)
  184. self._reportfetcher_class = getattr(fetcher_module, classname)
  185. except:
  186. raise Exception("could not load Report Fetcher class! Is it implemented in report_fetcher.py?")
  187. def _report_values_to_marker(self, ip_source, port_src, port_dst):
  188. """
  189. Combines all markers given by a report in the correct order. Unused parts are left out.
  190. return -- markervalue (int), markerchecksum (int), marker (bytes)
  191. """
  192. bts = []
  193. if self.marker_encoding & ENCODE_PORT_DST == ENCODE_PORT_DST:
  194. bts.append(pack_port(port_dst))
  195. if self.marker_encoding & ENCODE_IP_SRC == ENCODE_IP_SRC:
  196. bts.append(ip_str_to_bytes(ip_source))
  197. if self.marker_encoding & ENCODE_PORT_SRC == ENCODE_PORT_SRC:
  198. bts.append(pack_port(port_src))
  199. bts = b"".join(bts)
  200. #logger.debug("decoding: %s" % bts)
  201. markervalue_and_checksum = int.from_bytes(bts, "big")
  202. #logger.debug("report full marker: %s=%d" % (bts, markervalue_and_checksum))
  203. # marker value: b"AAAA AAAA AAAA AABB" -> b"00AA AAAA AAAA AAAA"
  204. # marker checksum: b"AAAA AAAA AAAA AABB" -> b"0000 0000 0000 00BB"
  205. marker_length = self.marker_bits_total
  206. return (markervalue_and_checksum >> (marker_length - self.markerbits_value)),\
  207. markervalue_and_checksum & (0xFFFFFFFFFFFFFFFF >> (64 - self.markerbits_checksum)),\
  208. bts
  209. def _create_checksum_bitlevel(self, markervalue):
  210. """
  211. Create a checksum using value markervalue
  212. markervalue -- integer to create checksum from (non padded)
  213. return -- checksum as integer
  214. """
  215. marker_value_leftshifted = markervalue << self.marker_value_leftshit
  216. marker_value_bytes_forchecksum = pack_markervalue(marker_value_leftshifted)
  217. #logger.debug("padded marker value before checksum: %s" % marker_padded)
  218. #return pack_checksum(fletcher32(marker_padded, len(marker_padded)/2)), marker_padded
  219. #checksum_int = fletcher32(marker_value_bytes_forchecksum, len(marker_value_bytes_forchecksum)/2) & 0xFFFFFFFF
  220. return fletcher32(marker_value_bytes_forchecksum, 2) >> (32 - self.markerbits_checksum)
  221. def _create_marker_bitlevel(self, marker_value_int_to_encode):
  222. """
  223. Create a new marker like [marker][checksum] from the given value_int.
  224. marker value:
  225. 291 -> b"00000123" (hex) -> b"000123" (marker value: eg 3 bytes)
  226. -> chk = checksum(b"000123" -> b"00012300") = b"abcdefgh"
  227. -> b"000123" (marker value) + b"abcdefgh"[:checksum_len] (marker checksum)
  228. Resulting marker (3 value bytes, 2 checksum bytes): b"000123abcdefgh" -> b"000123abcd"
  229. WARNING: this assumes self.marker_bits_total % 8 == 0 is True (otherwise shift resulting marker to the left)
  230. """
  231. checksum_int = self._create_checksum_bitlevel(marker_value_int_to_encode)
  232. # take marker value from left, cut off marker bits from right
  233. marker_int = (marker_value_int_to_encode << self.markerbits_checksum) | checksum_int
  234. #logger.debug("new marker value orig/marker value for checksum/checksum is = %s/%s/%s" %
  235. # (marker_value, marker_value_padded, marker_checksum))
  236. # INFO: if self.marker_bits_total is not divisible by 8 without rest: shift self.marker_bits_total % 8 to right
  237. return pack_marker(marker_int)[-self.marker_bytes_total:]
  238. def _read_scanner_feedback_addresses(self):
  239. """
  240. Read scanner feedback addresses gathered from stage 1. Additionally this updates
  241. _filename_ip_blacklist_plus_feedback_ips to contain all blacklisted IPs (including feedback ones).
  242. """
  243. if not os.path.exists(self._filename_scannerresponse_stage1):
  244. logger.warn("could not find response file at: %s" % self._filename_scannerresponse_stage1)
  245. return []
  246. logger.debug("reading feedback file")
  247. fd = open(self._filename_scannerresponse_stage1, "r")
  248. # skip header
  249. fd.readline()
  250. # File format of feedback file:
  251. # classification,saddr,daddr,daddr_inner_icmp,sport,dport,success
  252. # rst,1.0.0.0,1.1.1.1,(None),256,2,0
  253. # synack,1.0.0.1,1.1.1.1,(None),256,2,1
  254. self._blacklist.clear()
  255. self._blacklist_toptoipv4obj.clear()
  256. groups_stage1 = self._iterations[1][1]
  257. # found IP -> remove trailing/unused bits: (found IP) >> (IPv4 length - (marker value bits))
  258. bits_shift_ip = 32 - self.markerbits_value
  259. for line in fd:
  260. columns = split_comma(line)
  261. if len(columns) < 4:
  262. logger.warning("not enough columns in CSV file: %r" % columns)
  263. continue
  264. # CSV format: type,ip src (attack target), ip dst,ip extern (our extern ip),...
  265. responsetype = columns[0]
  266. # assume everything other than ICMP are potential monitor nodes
  267. if not responsetype.startswith("icmp"):
  268. address = columns[1]
  269. else:
  270. continue
  271. # assume ICMP unreachable indicates no monitor node
  272. try:
  273. # convert ip to marker value and check if its belongs to a found group
  274. # this is MUCH faster than checking every single group
  275. address_obj = IPv4Address(ip_str=address)
  276. # 1.2.3.4 -> 1.2 = marker value
  277. top_group_markervalue = address_obj.ip_int >> bits_shift_ip
  278. if top_group_markervalue not in groups_stage1:
  279. #logger.derbug("skipping IP not belonging in initial groups: %r" % address_obj)
  280. continue
  281. except Exception as ex:
  282. logger.warning("invalid IPv4 address in feedback file: %r" % address)
  283. print(ex)
  284. continue
  285. # this is a bit redundant but doesn't hurt
  286. if address_obj.packed not in self._blacklist:
  287. self._blacklist.add(address_obj.packed)
  288. else:
  289. # already saved
  290. # skip adding the address to the _blacklist_toptoipv4obj list
  291. continue
  292. try:
  293. self._blacklist_toptoipv4obj[top_group_markervalue].append(address_obj)
  294. except KeyError:
  295. self._blacklist_toptoipv4obj[top_group_markervalue] = [address_obj]
  296. fd.close()
  297. logger.debug("amount feedback addresses: %d" % len(self._blacklist))
  298. # update extended blacklist file
  299. logger.debug("updating extended blacklist file")
  300. fd_read = open(self._filename_ip_blacklist, "r")
  301. blacklist_standard = fd_read.read()
  302. fd_read.close()
  303. fd_write = open(self._filename_ip_blacklist_plus_feedback_ips, "w")
  304. fd_write.write(blacklist_standard)
  305. for top_group_markervalue, addresses in self._blacklist_toptoipv4obj.items():
  306. for addr in addresses:
  307. fd_write.write("%s/32\n" % addr.compressed)
  308. fd_write.close()
	def _addentry_callback(self, ip_source, port_src, port_dst):
		"""
		Callback called by reportfetcher if a new response was found.
		Decodes the marker from the response, verifies the checksum and
		increments (or, in stage 1, creates) the matching group's response count.
		ip_source -- IPv4 address as string
		port_src -- source port as int
		port_dst -- destination port as int
		"""
		marker_value_report_int, marker_checksum_report_int, marker_report = self._report_values_to_marker(ip_source,
																		port_src,
																		port_dst)
		marker_checksum_gen = self._create_checksum_bitlevel(marker_value_report_int)
		# compare via startswith: checksum could only be partially encoded
		# skip values having invalid checksums, ignore if no checksum is used
		if marker_checksum_report_int != marker_checksum_gen and self.markerbits_checksum != 0:
			return
		if self._group_storeindex == 1:
			# stage 1: groups are created on the fly from responses
			group_dict_current = self._iterations[1][1]
			if marker_value_report_int in group_dict_current:
				group_dict_current[marker_value_report_int].response_count += 1
			else:
				# create initial entries
				# marker value is actually (part of) our IP address in the first stage
				ip_int = marker_value_report_int << (32 - self.markerbits_value)
				try:
					ip_network = IPv4Network(nw_ip_int=ip_int, prefixlen=self.cidr_bits_stage1)
					newgroup = Group(ip_network_object=ip_network, response_count=1)
					self._iterations[0][1][self._root_group_name].add_subgroup(newgroup)
					group_dict_current[marker_value_report_int] = newgroup
				except Exception as ex:
					# checksum matched but the decoded address does not form a valid network
					logger.warning("could not create initial group (first stage), correct checksum but wrong network? ip=%r" % ip_int)
					print(ex)
		else:
			group_dict_current = self._iterations[self._group_storeindex][1]
			# add entries for iteration >= 2
			try:
				# group should be created until now
				# this can additionally filter out noise as groups are created incrementally
				subgroup = group_dict_current[marker_value_report_int]
				subgroup.response_count += 1
				# connect top group to subgroup if not yet known
				# sub groups not connected get deleted at the end of each iteration
				if subgroup.response_count == 1:
					# subgroup was not yet added, do it now
					subgroup.top_group.add_subgroup(subgroup)
			except KeyError:
				# checksum correct but unknown marker value -> treat as noise, do not count
				pass
  379. def _save_state(self):
  380. """
  381. Save the current group state using python pickle format.
  382. """
  383. logger.debug("saving state to: %s" % self._filename_state_groups)
  384. fd_state = open(self._filename_state_groups, "wb")
  385. pickle.dump(self._iterations, fd_state)
  386. fd_state.close()
  387. logger.debug("finished saving state")
	def start(self):
		"""
		Starts the PRA attack in a background thread.

		Order matters here: the report fetcher must exist and _running must be
		True before the attack thread starts, since _do_attack reads both.
		"""
		if self._running:
			logger.debug("can not start: attack is already running")
			return
		# init report fetcher
		self._report_fetcher = self._reportfetcher_class(self._addentry_callback,
								self._filename_full_report)
		self._running = True
		self.attack_thread = threading.Thread(target=self._do_attack)
		self.attack_thread.start()
  401. def stop(self):
  402. """
  403. Stops the PRA attack.
  404. """
  405. if not self._running:
  406. logger.warning("Scanner is not running -> nothing to stop")
  407. return
  408. self._running = False
  409. logger.debug("stopping any scan processes")
  410. for scanner in self._current_scans:
  411. scanner.stop()
  412. for sock in self._native_prober_sockets:
  413. sock.close()
  414. for proc in self._native_prober_processes:
  415. proc.terminate()
  416. addresses_found_amount = sum([gti[0].amount_addresses for gti in self._grouphandler.identified_groups])
  417. logger.info("found %d addresses, saving to: %s" % (addresses_found_amount, self._filename_identified_monitors))
  418. fd_write = open(self._filename_identified_monitors, "w")
  419. fd_write.write("address\ttimestamp\titeration\n")
  420. for group_timestamp_iteration in self._grouphandler.identified_groups:
  421. ts = group_timestamp_iteration[1]
  422. iteration = group_timestamp_iteration[2]
  423. for address in group_timestamp_iteration[0].addresses:
  424. line = "%s\t%d\t%d\n" % (address, ts, iteration)
  425. fd_write.write(line)
  426. #logger.debug(line.strip())
  427. fd_write.close()
  428. # disable to save space
  429. #self._save_state()
	def _start_native_prober(self):
		"""
		Initiate processes to probe using a native python implementation.

		Opens one raw send socket per prober process and hands it over together
		with the shared group queue and the feedback-IP blacklist.
		"""
		self._native_prober_conn_send.clear()
		for cnt in range(self._native_prober_amount):
			socket_hndl = SocketHndl(iface_name=self.interface_name, buffersize_send=2 ** 28)
			self._native_prober_sockets.append(socket_hndl)
			proc = Process(target=self._probe_native_cycler, args=(cnt,
									socket_hndl,
									self._groupqueue,
									self._blacklist,
									self.mac_source,
									self.mac_gw,
									self.ip_src,
									self.marker_encoding))
			self._native_prober_processes.append(proc)
			proc.start()
		logger.debug("waiting some seconds for processess to settle")
		time.sleep(1)
  450. def _probe_native_cycler(self, cnt, sockethndl, groupqueue, ip_blacklist,
  451. mac_src_s, mac_dst_s, ip_src_s, marker_encoding):
  452. """
  453. A native prober cycler to be used with processes.
  454. """
  455. logger.debug("starting probing process No.%d" % cnt)
  456. #basepacket = ethernet.Ethernet(dst_s=mac_dst_s, src_s=mac_src_s) +\
  457. basepacket = ethernet.Ethernet(dst_s=mac_dst_s, src_s=mac_src_s) +\
  458. ip.IP(src_s=ip_src_s, dst_s="1.2.3.4") +\
  459. tcp.TCP(sport=50821)
  460. #logger.debug("basepacket: %r" % basepacket)
  461. send = sockethndl.send
  462. ip_obj = basepacket.body_handler
  463. ip_obj_bin = ip_obj.bin
  464. tcp_obj = ip_obj.tcp
  465. ether_bytes = basepacket.header_bytes
  466. # initialize paket data
  467. basepacket.bin()
  468. queue_get = groupqueue.get
  469. while True:
  470. marker, addresses_bytes, is_cidr = queue_get()
  471. # blacklists are only used for CIDR groups, single adresses were created by feedback IPs
  472. #logger.debug("sending...")
  473. for ip_address in addresses_bytes:
  474. # single addresses or (cidr and not in blacklist)
  475. if is_cidr and ip_address in ip_blacklist:
  476. #logger.debug("not sending because in blacklist: addr=%r, CIDR=%r" % (ip_address, is_cidr))
  477. continue
  478. ip_obj.dst = ip_address
  479. bytes_used = 0
  480. #if cnt % 2000 == 0:
  481. # logger.debug("%d: placing marker: %r" % (self._group_storeindex, marker))
  482. # #time.sleep(0.2)
  483. #cnt += 1
  484. # this assumes that all marker types are fully used
  485. if marker_encoding & ENCODE_PORT_DST == ENCODE_PORT_DST:
  486. tcp_obj.dport = unpack_port(marker[: 2])[0]
  487. bytes_used += 2
  488. if marker_encoding & ENCODE_IP_SRC == ENCODE_IP_SRC:
  489. ip_obj.src = marker[bytes_used: bytes_used + 4][0]
  490. bytes_used += 4
  491. if marker_encoding & ENCODE_PORT_SRC == ENCODE_PORT_SRC:
  492. tcp_obj.sport = unpack_port(marker[bytes_used: bytes_used + 2])[0]
  493. # TODO: use update_auto_fields=False for faster packet creation
  494. send(ether_bytes + ip_obj_bin())
  495. #send(ether_bytes + ip_obj_bin(update_auto_fields=False))
	def _do_attack(self):
		"""
		Main attack loop to cycle through iterations until all monitors have been found.

		Per round: append a fresh iteration slot, let the group handler split
		last round's groups into subgroups (from round 2 on), probe every
		subgroup (native prober for small groups, ZMap otherwise), then update
		the per-round statistics and clean up.
		"""
		group_handler_obj = self._grouphandler
		while self._running:
			# [[start_ts, stop_ts], {marker: Group}, total groups, total addresses]
			self._iterations.append([[0, 0], {}, 0, 0])
			self._group_storeindex += 1
			self._iterations[self._group_storeindex][0] = [time.time(), 0]
			logger.info("new attack round! group store index: %d" % self._group_storeindex)
			# after initial full scan
			if self._group_storeindex >= 2:
				logger.info("initiating subgrouping using group handler, top groups=%d" %
					len(self._iterations[self._group_storeindex - 1][1]))
				if self.use_feedback_ips and self._group_storeindex == 2:
					self._read_scanner_feedback_addresses()
				# create subgroups from groups of last round
				group_handler_obj.init_subgroup_creating(self._iterations[self._group_storeindex - 1][1],
									self._iterations[self._group_storeindex][1],
									self._blacklist_toptoipv4obj)
				if group_handler_obj.state == group_handler.STATE_FINISHED:
					logger.info("no groups left to scan, all monitors found")
					self._iterations[self._group_storeindex][0][1] = time.time()
					break
				elif self._group_storeindex > 1:
					logger.debug("letting group handler create some groups in advance..")
					time.sleep(10)
			self._report_fetcher.before_scanning()
			# limit scanner feedback saving to first iteration
			filename_output_csv = self._filename_scannerresponse_stage1 if self._group_storeindex == 1 else None
			if self._group_storeindex == 2:
				# scanner feedback should have been read by now, start native probe which need the blacklist
				self._start_native_prober()
			cnt = 0
			# scan groups until handler has no groups left or we are initially scanning in stage 1
			while group_handler_obj.state != group_handler.STATE_INACTIVE or self._group_storeindex == 1:
				"""
				# Iteration 0: [g_r00t]
				# Iteration 1: [g1], [g2], ... <- auto-created by response (we did not create '+1'-groups)
				# Iteration 2: [g1, +1][g2, +1] <- updated by group handler
				"""
				group = None
				# this is skipped in the first iteration
				if self._group_storeindex >= 2:
					while group is None and group_handler_obj.state != group_handler.STATE_INACTIVE:
						try:
							# this blocks until a new group is available
							group = group_handler_obj.get_next_subgroup()
						except queue.Empty:
							# loop over timeouts until we got a group or state of grouphandler changes
							time.sleep(1)
							pass
					# no groups left, break scan loop
					if group is None:
						break
				else:
					# initial scan? -> take initial group
					group = self._iterations[0][1][self._root_group_name]
				cnt += 1
				if group.is_plus1:
					# "+1" groups are bookkeeping only -> nothing to send
					continue
				# group size is too small: send probes via native implementation.
				# using a ZMap would be too ineffective: Trade off between
				# cost of creating a scan process (ZMap) vs. native probe costs
				if (group.amount_addresses < 5000 or group.group_type == Group.GROUP_TYPE_SINGLE_ADDRESSES)\
					and self._group_storeindex != 1:
					if cnt % 100000 == 0:
						logger.info("adding %d address for native probing, cnt=%d, queue grouphandler=%d" %
							(group.amount_addresses, cnt, len(self._grouphandler._group_queue)))
					self._groupqueue.put([group.marker_bytes,
								group.addresses_single_bytes,
								group.group_type == Group.GROUP_TYPE_CIDR])
				else:
					blacklist = self._filename_ip_blacklist
					if self.use_feedback_ips and self._group_storeindex >= 2 and group.group_type == Group.GROUP_TYPE_CIDR:
						# use extended blacklist file for CIDR (avoid scanning of single IPs which
						# are already scanned by separate groups)
						blacklist = self._filename_ip_blacklist_plus_feedback_ips
					self._current_scans.clear()
					logger.debug("probing via zmap: max amount addresses=%r, group=%r, markervalue=%r" %
						(group.amount_addresses, group, group.marker_value_int))
					# -1 = no timeout
					scan_timeout = -1
					# group.markervalue: this is None in the first stage -> encode target address
					# checksum: ZMap module got his own implementation which is needed by stage 0
					# (create checksum of IP addresses)
					scanner = scanner_wrapper.ZmapWrapper(
						filename_output_csv=filename_output_csv,
						filename_blacklist_target_ip=blacklist,
						filename_whitelist_target_ip=self._filename_ip_whitelist,
						# TODO: comment in to save scanner logs, disables scanner feedback!
						#dirname_logs=self._dirname_scanner_logs,
						rate_kbit_per_s=self.rate_kbit_per_s,
						interface_name=self.interface_name,
						mac_source=self.mac_source,
						mac_gw=self.mac_gw,
						marker_encoding=self.marker_encoding,
						markervalue=group.marker_value_int,
						markerbits_value=self.markerbits_value,
						markerbits_checksum=self.markerbits_checksum,
						disable_monitor=self.disable_monitor,
						verbosity=3,
						dir_zmap=self.base_dir_zmap,
						target_addresses=group.addresses,
						fast_mode=False,
						scan_timeout=scan_timeout)
					self._current_scans.append(scanner)
					# blocks until the scan finished
					scanner.start()
					self._current_scans.clear()
				# we don't use the grouphandler so we have to break here
				if self._group_storeindex == 1:
					break
			# drain both probe queues before ending the round
			while self._grouphandler.queuesize > 0:
				logger.debug("waiting until all queued native probes have been processed, queue: %d" % self._grouphandler.queuesize)
				time.sleep(5)
			while not self._groupqueue.empty():
				logger.debug("waiting until group queue has been emptied")
				time.sleep(5)
			self._report_fetcher.after_scanning()
			self._iterations[self._group_storeindex][0][1] = time.time()
			logger.info("duration of round %d: %d seconds, groups (should be unchanged)=%d" % (
				self._group_storeindex,
				int(self._iterations[self._group_storeindex][0][1] - self._iterations[self._group_storeindex][0][0]),
				len(self._iterations[self._group_storeindex][1])))
			# root -> 1st stage -> 2nd stage
			if self._group_storeindex >= 2:
				if self.use_plus1:
					group_handler_obj.update_plus1_subgroups(self._iterations[self._group_storeindex - 1][1])
			# update amount of groups
			logger.debug("updating amount of groups")
			self._iterations[self._group_storeindex][2] = len(self._iterations[self._group_storeindex][1])
			# update total addresses
			logger.debug("updating total addresses")
			self._iterations[self._group_storeindex][3] = sum([group.amount_addresses
				for _, group in self._iterations[self._group_storeindex][1].items()])
			group_handler_obj.remove_empty_groups(self._iterations[self._group_storeindex][1])
			# now everything should be uptodate, cleanup groups of last round
			if self._group_storeindex >= 2:
				# TODO: activate if needed
				group_handler_obj.cleanup_groups(self._iterations[self._group_storeindex - 1][1])
		self.stop()
  654. def get_amount_of_probes(self):
  655. """
  656. Return the total amount of probes (or network packets) sent out.
  657. """
  658. # - Single feedback addresses could have been scanned: remove redundant counted single addresses
  659. # len([group1, group2]) -> len([singleaddr_group1, group1-singleaddr_group1, ...])
  660. return self._grouphandler.addresses_total - len(self._blacklist)