12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727 |
- import os
- import logging
- import threading
- import re
- import pickle
- import os.path
- import time
- import math
- import struct
- import queue
- from multiprocessing import Process, SimpleQueue
- from group import Group
- import scanner_wrapper
- import group_handler
- import ipv4
- from utility import ip_str_to_bytes
- from pypacker.checksum import fletcher32
- from pypacker.psocket import SocketHndl
- from pypacker.layer12 import ethernet
- from pypacker.layer3 import ip
- from pypacker.layer4 import tcp
# Pre-bound regex split helpers (binding .split once avoids per-call lookups).
split_equal = re.compile("=+").split
split_space = re.compile(" +").split
split_tab = re.compile("\t").split
split_comma = re.compile(",").split
split_slash = re.compile("/").split
# Pre-compiled struct (un)packers for ports, markers and checksums.
unpack_port = struct.Struct(">H").unpack
pack_checksum = struct.Struct(">I").pack
pack_port = struct.Struct(">H").pack
pack_marker = struct.Struct(">Q").pack
pack_byte = struct.Struct(">B").pack
pack_markervalue = struct.Struct(">I").pack
unpack_marker = struct.Struct(">Q").unpack
# NOTE(review): pack_markervalue uses ">I" (4 bytes) while unpack_markervalue
# uses ">Q" (8 bytes) -- confirm this asymmetry is intended by the callers.
unpack_markervalue = struct.Struct(">Q").unpack
# local aliases for the project's IPv4 helper types
IPv4Address = ipv4.IPv4Address
IPv4Network = ipv4.IPv4Network
# bit flags selecting which packet fields carry marker bits
ENCODE_PORT_DST = 1
ENCODE_IP_SRC = 2
ENCODE_PORT_SRC = 4
logger = logging.getLogger("pra_framework")
class ProbeResponseAttackLogic(object):
    """
    Implementation of the probe response attack using dedicated scanner for probing.
    Workflow:
    >> Stage 1:
    - Send probes without creating initial groups (saves state information) -> create Groups WHILE scanning
    - Get result and add Groups to _iterations[index_iteration]
    - Create groups: Check scanner feedback and re-group per sub-group based on it
    >> Stage 2:
    - Create groups before scanning (first time: after stage 1/iteration 1)
    - Set amount of subgroups based on amount of responses
    - Continue scanning until no groups are left
    The destination address can be encoded into different parts of markers like
    source IP address, source port, destination port etc.
    Pre-filtering of noisy values:
    - Noisy marker values do not get pre-filtered as this problem can be effectively
    mitigated using a trade off between marker/checksum-bits.
    Detection mitigation:
    - Avoiding low-noise markers to avoid detection does not seem to be necessary
    as low noise markers like destination ports would appear with a low frequency
    -> sending 65535 (dst port) * (other marker) values will appear approximately
    "amount of sensors" * y times for every attack iterations, which is likely to be a
    low number for a specific port "p", which is hard to detect as statistic anomaly.
    """
    # Default configuration; every key becomes an instance attribute in
    # __init__ and can be overridden via constructor kwargs.
    PROP_DEFAULTS = {
        # network options
        "interface_name": "eth0",
        "mac_source": None,
        "mac_gw": None,
        # source IP, mainly intended for native probes
        "ip_src": "1.1.1.1",
        "rate_kbit_per_s": 1000,
        # marker options
        "marker_encoding": 0,
        "markerbits_value": 24,
        "markerbits_checksum": 8,
        # directory to save all data related to a attack cycle (trailing slash)
        "base_dir_save": "./",
        # directory which contains "src/zmap"
        "base_dir_zmap": "../zmap",
        "disable_monitor": 1,
        "verbosity": 3,
        "report_fetcher_classname": "TracingReportFetcher",
        # state info. Should only be changed by internal logic.
        # stage 1 or 2 (initial 0)
        #"_stage": 0,
        # index to _iterations where most recent groups get stored
        "_group_storeindex": 0,
        "use_feedback_ips": False,
        "use_plus1": False,
        # TODO: change for testing
        "is_simulation": False,
        # debugging options
        # avoid scanning the whole network -> limit to a subnet (default is: scan whole IPv4 - blacklist)
        "_ip_stage1": "0.0.0.0",
        "_cidr_bits_stage1": 0
    }
    def __init__(self, **kwargs):
        """
        Initialize attack configuration, file locations and iteration state.

        kwargs -- overrides for any key in PROP_DEFAULTS
        """
        # set default values, will be overwritten by config variables
        # parameter hierarchy: default -> constructor
        for prop, default in ProbeResponseAttackLogic.PROP_DEFAULTS.items():
            setattr(self, prop, kwargs.get(prop, default))
            logger.debug("config: %s = %s" % (prop, kwargs.get(prop, default)))
        self._running = False
        # current scan processes
        self._current_scans = []
        # Files
        self._filename_state_groups = os.path.join(self.base_dir_save, "state_groups.bin")
        # contains information like: ip -> response type
        self._filename_scannerresponse_stage1 = os.path.join(self.base_dir_save, "scanner_response.csv")
        if self.is_simulation:
            self._filename_ip_blacklist = os.path.join(self.base_dir_zmap, "blacklist.conf_simulation")
            self._filename_ip_whitelist = None
        else:
            self._filename_ip_blacklist = os.path.join(self.base_dir_zmap, "blacklist.conf")
            #self._filename_ip_whitelist = os.path.join(self.base_dir_zmap, "whitelist.conf")
            self._filename_ip_whitelist = None
        self._filename_ip_blacklist_plus_feedback_ips = os.path.join(self.base_dir_save, "blacklist_plus_feedback_addr.conf")
        self._filename_full_report = os.path.join(self.base_dir_save, "report_fetcher_responses.csv")
        self._filename_identified_monitors = os.path.join(self.base_dir_save, "identified_monitors.csv")
        self._dirname_scanner_logs = self.base_dir_save
        self.set_report_fetcher_class(self.report_fetcher_classname)
        self._report_fetcher = None
        self.attack_thread = None
        # marker geometry: total width = value bits + checksum bits
        self.marker_bits_total = self.markerbits_value + self.markerbits_checksum
        self.marker_bytes_total = math.ceil((self.markerbits_value + self.markerbits_checksum)/8)
        # amount of bits to shift marker value to make it 4 bytes from the left
        # bits=9: 0000 0000 0000 0000 0000 0001 1111 1111 -> left shift=23 -> 1111 1111 1000 0000....
        # NOTE(review): attribute name carries a typo ("leftshit") but other
        # methods reference it under this name -- do not rename in isolation.
        self.marker_value_leftshit = 32 - self.markerbits_value
        self.marker_value_bytes_amount = math.ceil(self.markerbits_value/8)
        self.marker_value_amount = 2 ** self.markerbits_value
        # By using pl1 groups we are saving/getting additional "amount of new subgroups" marker values
        logger.debug("total amount of marker VALUES: %d (plus1: %r)" % (self.marker_value_amount, self.use_plus1))
        logger.debug("initial scan: %r/%r" % (self._ip_stage1, self._cidr_bits_stage1))
        # Example: 5 marker values:, _log_2(5)_ = 2 -> 2 Bits = 4 Subgroups
        self.cidr_bits_stage1 = math.floor(math.log(self.marker_value_amount, 2))
        self.marker_checksum_bytes_amount = math.ceil(self.markerbits_checksum / 8)
        # Groups which got/get scanned.
        # [[start, stop], {marke: Group}, total groups, total addresses]
        self._iterations = []
        self._iterations.append([[0, 0], {}, 0, 0])
        initial_group = Group(ip_network_object=IPv4Network(nw_ip_str=self._ip_stage1,
                                                            prefixlen=self._cidr_bits_stage1),
                              response_count=0)
        self._root_group_name = b"ROOT_GROUP_PRA"
        self._iterations[0][1][self._root_group_name] = initial_group
        addresses_root = 2 ** (32 - self._cidr_bits_stage1)
        self._iterations[0][2] = min(self.marker_value_amount, addresses_root)
        self._iterations[0][3] = addresses_root
        self._group_storeindex = 0
        """
        if self.markerbits_value % 8 != 0 or self.markerbits_checksum % 8 !=0:
            logger.warning("markerbits not multiple of 8, value/checksum = %r/%r" %
                (self.markerbits_value, self.markerbits_checksum))
        """
        # native prober
        # TODO: adjust this on other platforms
        self._native_prober_amount = 5
        self._native_prober_sockets = []
        self._native_prober_conn_send = []
        self._native_prober_processes = []
        self._native_prober_conn_send_index = 0
        self._groupqueue = SimpleQueue()
        self._blacklist = set()
        # { top group marker value -> set(feedback_address1, feedback_address2, ...)}
        self._blacklist_toptoipv4obj = {}
        self._grouphandler = group_handler.GroupHandler(self.marker_value_amount,
                                                        self.use_plus1,
                                                        self._create_marker_bitlevel)
        self._initial_count = 0
- def set_report_fetcher_class(self, classname):
- if self._running:
- return
- # get report fetcher by classname
- fetcher_module = __import__("report_fetcher")
- try:
- logger.debug("setting report fetcher class: %s" % classname)
- self._reportfetcher_class = getattr(fetcher_module, classname)
- except:
- raise Exception("could not load Report Fetcher class! Is it implemented in report_fetcher.py?")
- def _report_values_to_marker(self, ip_source, port_src, port_dst):
- """
- Combines all markers given by a report in the correct order. Unused parts are left out.
- return -- markervalue (int), markerchecksum (int), marker (bytes)
- """
- bts = []
- if self.marker_encoding & ENCODE_PORT_DST == ENCODE_PORT_DST:
- bts.append(pack_port(port_dst))
- if self.marker_encoding & ENCODE_IP_SRC == ENCODE_IP_SRC:
- bts.append(ip_str_to_bytes(ip_source))
- if self.marker_encoding & ENCODE_PORT_SRC == ENCODE_PORT_SRC:
- bts.append(pack_port(port_src))
- bts = b"".join(bts)
- #logger.debug("decoding: %s" % bts)
- markervalue_and_checksum = int.from_bytes(bts, "big")
- #logger.debug("report full marker: %s=%d" % (bts, markervalue_and_checksum))
- # marker value: b"AAAA AAAA AAAA AABB" -> b"00AA AAAA AAAA AAAA"
- # marker checksum: b"AAAA AAAA AAAA AABB" -> b"0000 0000 0000 00BB"
- marker_length = self.marker_bits_total
- return (markervalue_and_checksum >> (marker_length - self.markerbits_value)),\
- markervalue_and_checksum & (0xFFFFFFFFFFFFFFFF >> (64 - self.markerbits_checksum)),\
- bts
- def _create_checksum_bitlevel(self, markervalue):
- """
- Create a checksum using value markervalue
- markervalue -- integer to create checksum from (non padded)
- return -- checksum as integer
- """
- marker_value_leftshifted = markervalue << self.marker_value_leftshit
- marker_value_bytes_forchecksum = pack_markervalue(marker_value_leftshifted)
- #logger.debug("padded marker value before checksum: %s" % marker_padded)
- #return pack_checksum(fletcher32(marker_padded, len(marker_padded)/2)), marker_padded
- #checksum_int = fletcher32(marker_value_bytes_forchecksum, len(marker_value_bytes_forchecksum)/2) & 0xFFFFFFFF
- return fletcher32(marker_value_bytes_forchecksum, 2) >> (32 - self.markerbits_checksum)
- def _create_marker_bitlevel(self, marker_value_int_to_encode):
- """
- Create a new marker like [marker][checksum] from the given value_int.
- marker value:
- 291 -> b"00000123" (hex) -> b"000123" (marker value: eg 3 bytes)
- -> chk = checksum(b"000123" -> b"00012300") = b"abcdefgh"
- -> b"000123" (marker value) + b"abcdefgh"[:checksum_len] (marker checksum)
- Resulting marker (3 value bytes, 2 checksum bytes): b"000123abcdefgh" -> b"000123abcd"
- WARNING: this assumes self.marker_bits_total % 8 == 0 is True (otherwise shift resulting marker to the left)
- """
- checksum_int = self._create_checksum_bitlevel(marker_value_int_to_encode)
- # take marker value from left, cut off marker bits from right
- marker_int = (marker_value_int_to_encode << self.markerbits_checksum) | checksum_int
- #logger.debug("new marker value orig/marker value for checksum/checksum is = %s/%s/%s" %
- # (marker_value, marker_value_padded, marker_checksum))
- # INFO: if self.marker_bits_total is not divisible by 8 without rest: shift self.marker_bits_total % 8 to right
- return pack_marker(marker_int)[-self.marker_bytes_total:]
- def _read_scanner_feedback_addresses(self):
- """
- Read scanner feedback addresses gathered from stage 1. Additionally this updates
- _filename_ip_blacklist_plus_feedback_ips to contain all blacklisted IPs (including feedback ones).
- """
- if not os.path.exists(self._filename_scannerresponse_stage1):
- logger.warn("could not find response file at: %s" % self._filename_scannerresponse_stage1)
- return []
- logger.debug("reading feedback file")
- fd = open(self._filename_scannerresponse_stage1, "r")
- # skip header
- fd.readline()
- # File format of feedback file:
- # classification,saddr,daddr,daddr_inner_icmp,sport,dport,success
- # rst,1.0.0.0,1.1.1.1,(None),256,2,0
- # synack,1.0.0.1,1.1.1.1,(None),256,2,1
- self._blacklist.clear()
- self._blacklist_toptoipv4obj.clear()
- groups_stage1 = self._iterations[1][1]
- # found IP -> remove trailing/unused bits: (found IP) >> (IPv4 length - (marker value bits))
- bits_shift_ip = 32 - self.markerbits_value
- for line in fd:
- columns = split_comma(line)
- if len(columns) < 4:
- logger.warning("not enough columns in CSV file: %r" % columns)
- continue
- # CSV format: type,ip src (attack target), ip dst,ip extern (our extern ip),...
- responsetype = columns[0]
- # assume everything other than ICMP are potential monitor nodes
- if not responsetype.startswith("icmp"):
- address = columns[1]
- else:
- continue
- # assume ICMP unreachable indicates no monitor node
- try:
- # convert ip to marker value and check if its belongs to a found group
- # this is MUCH faster than checking every single group
- address_obj = IPv4Address(ip_str=address)
- # 1.2.3.4 -> 1.2 = marker value
- top_group_markervalue = address_obj.ip_int >> bits_shift_ip
- if not top_group_markervalue in groups_stage1:
- #logger.derbug("skipping IP not belonging in initial groups: %r" % address_obj)
- continue
- except Exception as ex:
- logger.warning("invalid IPv4 address in feedback file: %r" % address)
- print(ex)
- continue
- # this is a bit redundant but doesn't hurt
- if address_obj.packed not in self._blacklist:
- self._blacklist.add(address_obj.packed)
- else:
- # already saved
- # skip adding the address to the _blacklist_toptoipv4obj list
- continue
- try:
- self._blacklist_toptoipv4obj[top_group_markervalue].append(address_obj)
- except KeyError:
- self._blacklist_toptoipv4obj[top_group_markervalue] = [address_obj]
- fd.close()
- logger.debug("amount feedback addresses: %d" % len(self._blacklist))
- # update extended blacklist file
- logger.debug("updating extended blacklist file")
- fd_read = open(self._filename_ip_blacklist, "r")
- blacklist_standard = fd_read.read()
- fd_read.close()
- fd_write = open(self._filename_ip_blacklist_plus_feedback_ips, "w")
- fd_write.write(blacklist_standard)
- for top_group_markervalue, addresses in self._blacklist_toptoipv4obj.items():
- for addr in addresses:
- fd_write.write("%s/32\n" % addr.compressed)
- fd_write.close()
    def _addentry_callback(self, ip_source, port_src, port_dst):
        """
        Callback called by reportfetcher if a new response was found.

        Verifies the marker checksum, then either counts the response for an
        existing group or (stage 1 only) creates the initial group for it.
        ip_source -- IPv4 address as string
        port_src -- source port as int
        port_dst -- destination port as int
        """
        marker_value_report_int, marker_checksum_report_int, marker_report = self._report_values_to_marker(ip_source,
            port_src,
            port_dst)
        #logger.debug("from report: marker value=%d, checksum=%d, full marker=%s" %
        #	(marker_value_report_int, marker_checksum_report_int, marker_report))
        marker_checksum_gen = self._create_checksum_bitlevel(marker_value_report_int)
        #logger.debug("comparing checksum: new/report = %s/%s" % (checksum_gen, marker_checksum_report))
        # compare via startswith: checksum could only be partially encoded
        # skip values having invalid checksums, ignore if no checksum is used
        if marker_checksum_report_int != marker_checksum_gen and self.markerbits_checksum != 0:
            #logger.debug("checksum didn't match (YOLO): %r!=%r" % (
            #	marker_checksum_report_int, marker_checksum_gen))
            return
        # logger.debug("checksum matched!!!")
        if self._group_storeindex == 1:
            # stage 1: groups are created on the fly from responses
            group_dict_current = self._iterations[1][1]
            if marker_value_report_int in group_dict_current:
                group_dict_current[marker_value_report_int].response_count += 1
                """
                logger.debug("incremented response count: group=%r, count=%d" %
                    (group_dict_current[marker_report], group_dict_current[marker_report].response_count))
                """
            else:
                # create initial entries
                # marker value is actually (part of) our IP address in the first stage
                ip_int = marker_value_report_int << (32 - self.markerbits_value)
                try:
                    ip_network = IPv4Network(nw_ip_int=ip_int, prefixlen=self.cidr_bits_stage1)
                    newgroup = Group(ip_network_object=ip_network, response_count=1)
                    """
                    self._initial_count += 1
                    if self._initial_count % 100 == 0:
                        logger.debug("%d: creating initial group based on report response: marker=%s, group=%r" %
                            (self._initial_count, marker_report, newgroup))
                    """
                    self._iterations[0][1][self._root_group_name].add_subgroup(newgroup)
                    group_dict_current[marker_value_report_int] = newgroup
                except Exception as ex:
                    logger.warning("could not create initial group (first stage), correct checksum but wrong network? ip=%r" % ip_int)
                    print(ex)
        else:
            group_dict_current = self._iterations[self._group_storeindex][1]
            # add entries for iteration >= 2
            try:
                # group should be created until now
                # this can additionally filter out noise as groups are created incrementally
                subgroup = group_dict_current[marker_value_report_int]
                #if random.random() > 0.5:
                #	logger.debug("marker value=%d" % marker_value_report_int)
                subgroup.response_count += 1
                # connect to top group to subgroup if not yet known
                # sub groups not connected get deleted at the end of each iteration
                # subgroup was not yet added, do it now
                if subgroup.response_count == 1:
                    subgroup.top_group.add_subgroup(subgroup)
            except KeyError:
                """
                logger.warning("checksum correct but marker value %r not found for response (total: %d), "
                    "marker=%r -> not counting" % (
                    marker_value_report_int, len(group_dict_current), marker_report))
                """
                #logger.warning("ip=%r, port src=%r, port dst=%r" % (ip_source, port_src, port_dst))
                pass
- def _save_state(self):
- """
- Save the current group state using python pickle format.
- """
- logger.debug("saving state to: %s" % self._filename_state_groups)
- fd_state = open(self._filename_state_groups, "wb")
- pickle.dump(self._iterations, fd_state)
- fd_state.close()
- logger.debug("finished saving state")
- def start(self):
- """
- Starts the PRA attack.
- """
- if self._running:
- logger.debug("can not start: attack is already running")
- return
- # init report fetcher
- self._report_fetcher = self._reportfetcher_class(self._addentry_callback,
- self._filename_full_report)
- self._running = True
- self.attack_thread = threading.Thread(target=self._do_attack)
- self.attack_thread.start()
- def stop(self):
- """
- Stops the PRA attack.
- """
- if not self._running:
- logger.warning("Scanner is not running -> nothing to stop")
- return
- self._running = False
- logger.debug("stopping any scan processes")
- for scanner in self._current_scans:
- scanner.stop()
- for sock in self._native_prober_sockets:
- sock.close()
- for proc in self._native_prober_processes:
- proc.terminate()
- addresses_found_amount = sum([gti[0].amount_addresses for gti in self._grouphandler.identified_groups])
- logger.info("found %d addresses, saving to: %s" % (addresses_found_amount, self._filename_identified_monitors))
- fd_write = open(self._filename_identified_monitors, "w")
- fd_write.write("address\ttimestamp\titeration\n")
- for group_timestamp_iteration in self._grouphandler.identified_groups:
- ts = group_timestamp_iteration[1]
- iteration = group_timestamp_iteration[2]
- for address in group_timestamp_iteration[0].addresses:
- line = "%s\t%d\t%d\n" % (address, ts, iteration)
- fd_write.write(line)
- #logger.debug(line.strip())
- fd_write.close()
- self._save_state()
    def _start_native_prober(self):
        """
        Initiate processes to probe using a native python implementation.

        Spawns self._native_prober_amount worker processes, each owning a raw
        send socket; work arrives via the shared self._groupqueue.
        """
        self._native_prober_conn_send.clear()
        for cnt in range(self._native_prober_amount):
            # one send socket per worker process
            socket_hndl = SocketHndl(iface_name=self.interface_name, buffersize_send=2 ** 28)
            self._native_prober_sockets.append(socket_hndl)
            proc = Process(target=self._probe_native_cycler, args=(cnt,
                socket_hndl,
                self._groupqueue,
                self._blacklist,
                self.mac_source,
                self.mac_gw,
                self.ip_src,
                self.marker_encoding))
            self._native_prober_processes.append(proc)
            proc.start()
        logger.debug("waiting some seconds for processess to settle")
        time.sleep(1)
- def _probe_native_cycler(self, cnt, sockethndl, groupqueue, ip_blacklist, mac_src_s, mac_dst_s, ip_src_s, marker_encoding):
- """
- A native prober cycler to be used with processes.
- """
- logger.debug("starting probing process No.%d" % cnt)
- #basepacket = ethernet.Ethernet(dst_s=mac_dst_s, src_s=mac_src_s) +\
- basepacket = ethernet.Ethernet(dst_s=mac_dst_s, src_s=mac_src_s) +\
- ip.IP(src_s=ip_src_s, dst_s="1.2.3.4") +\
- tcp.TCP(sport=50821)
- #logger.debug("basepacket: %r" % basepacket)
- send = sockethndl.send
- ip_obj = basepacket.body_handler
- ip_obj_bin = ip_obj.bin
- tcp_obj = ip_obj.tcp
- ether_bytes = basepacket.header_bytes
- # initialize paket data
- basepacket.bin()
- queue_get = groupqueue.get
- while True:
- marker, addresses_bytes, is_cidr = queue_get()
- # blacklists are only used for CIDR groups, single adresses were created by feedback IPs
- #logger.debug("sending...")
- for ip_address in addresses_bytes:
- # single addresses or (cidr and not in blacklist)
- if is_cidr and ip_address in ip_blacklist:
- #logger.debug("not sending because in blacklist: addr=%r, CIDR=%r" % (ip_address, is_cidr))
- continue
- ip_obj.dst = ip_address
- bytes_used = 0
- #if cnt % 2000 == 0:
- # logger.debug("%d: placing marker: %r" % (self._group_storeindex, marker))
- # #time.sleep(0.2)
- #cnt += 1
- # this assumes that all marker types are fully used
- if marker_encoding & ENCODE_PORT_DST == ENCODE_PORT_DST:
- tcp_obj.dport = unpack_port(marker[: 2])[0]
- bytes_used += 2
- if marker_encoding & ENCODE_IP_SRC == ENCODE_IP_SRC:
- ip_obj.src = marker[bytes_used: bytes_used + 4][0]
- bytes_used += 4
- if marker_encoding & ENCODE_PORT_SRC == ENCODE_PORT_SRC:
- tcp_obj.sport = unpack_port(marker[bytes_used: bytes_used + 2])[0]
- # TODO: use update_auto_fields=False for faster packet creation
- send(ether_bytes + ip_obj_bin())
- #send(ether_bytes + ip_obj_bin(update_auto_fields=False))
    def _do_attack(self):
        """
        Main attack loop to cycle through iterations until all monitors have been found.

        Runs inside self.attack_thread. Each pass appends a new iteration
        entry, (re)creates subgroups via the group handler, scans every group
        either natively (small groups) or via ZMap (large groups), then waits
        for the probe queues to drain before closing the round.
        """
        group_handler_obj = self._grouphandler
        while self._running:
            # per-round bookkeeping: [[start, stop], {marker: Group}, total groups, total addresses]
            self._iterations.append([[0, 0], {}, 0, 0])
            self._group_storeindex += 1
            self._iterations[self._group_storeindex][0] = [time.time(), 0]
            logger.info("new attack round! group store index: %d" % self._group_storeindex)
            # after initial full scan
            if self._group_storeindex >= 2:
                logger.info("initiating subgrouping using group handler, top groups=%d" % len(self._iterations[self._group_storeindex - 1][1]))
                if self.use_feedback_ips and self._group_storeindex == 2:
                    self._read_scanner_feedback_addresses()
                # create subgroups from groups of last round
                group_handler_obj.init_subgroup_creating(self._iterations[self._group_storeindex - 1][1],
                    self._iterations[self._group_storeindex][1],
                    self._blacklist_toptoipv4obj)
                if group_handler_obj.state == group_handler.STATE_FINISHED:
                    logger.info("no groups left to scan, all monitors found")
                    self._iterations[self._group_storeindex][0][1] = time.time()
                    break
                elif self._group_storeindex > 1:
                    logger.debug("letting group handler create some groups in advance..")
                    time.sleep(10)
            self._report_fetcher.before_scanning()
            # limit scanner feedback saving to first iteration
            filename_output_csv = self._filename_scannerresponse_stage1 if self._group_storeindex == 1 else None
            if self._group_storeindex == 2:
                # scanner feedback should have been read by now, start native probe which need the blacklist
                self._start_native_prober()
            cnt = 0
            # scan groups until handler has no groups left or we are initially scanning in stage 1
            while group_handler_obj.state != group_handler.STATE_INACTIVE or self._group_storeindex == 1:
                """
                # Iteration 0: [g_r00t]
                # Iteration 1: [g1], [g2], ... <- auto-created by response (we did not create '+1'-groups)
                # Iteration 2: [g1, +1][g2, +1] <- updated by group handler
                """
                group = None
                # this is skipped in the first iteration
                if self._group_storeindex >= 2:
                    while group is None and group_handler_obj.state != group_handler.STATE_INACTIVE:
                        try:
                            # this blocks until a new group is available
                            group = group_handler_obj.get_next_subgroup()
                        except queue.Empty:
                            # loop over timeouts until we got a group or state of grouphandler changes
                            time.sleep(1)
                            pass
                    # no groups left, break scan loop
                    if group is None:
                        break
                else:
                    # initial scan? -> take initial group
                    group = self._iterations[0][1][self._root_group_name]
                cnt += 1
                if group.is_plus1:
                    # "+1" groups carry no addresses of their own -> nothing to send
                    continue
                # group size is too small: send probes via native implementation.
                # using a ZMap would be too ineffective: Trade off between
                # cost of creating a scan process (ZMap) vs. native probe costs
                if (group.amount_addresses < 5000 or group.group_type == Group.GROUP_TYPE_SINGLE_ADDRESSES)\
                        and self._group_storeindex != 1:
                    if cnt % 100000 == 0:
                        logger.info("adding %d address for native probing, cnt=%d, queue grouphandler=%d" %
                            (group.amount_addresses, cnt, len(self._grouphandler._group_queue)))
                    self._groupqueue.put([group.marker_bytes,
                        group.addresses_single_bytes,
                        group.group_type == Group.GROUP_TYPE_CIDR])
                else:
                    blacklist = self._filename_ip_blacklist
                    if self.use_feedback_ips and self._group_storeindex >= 2 and group.group_type == Group.GROUP_TYPE_CIDR:
                        # use extended blacklist file for CIDR (avoid scanning of single IPs which
                        # are already scanned by separate groups)
                        blacklist = self._filename_ip_blacklist_plus_feedback_ips
                    self._current_scans.clear()
                    logger.debug("probing via zmap: max amount addresses=%r, group=%r, markervalue=%r" %
                        (group.amount_addresses, group, group.marker_value_int))
                    scan_timeout = -1
                    # TODO: just for testing: stop scanning after x seconds
                    """
                    if self._group_storeindex == 1:
                        scan_timeout = 60 * 60
                        #scan_timeout = 10
                        logger.debug("limiting scan to %d seconds" % scan_timeout)
                    """
                    # group.markervalue: this is None in the first stage -> encode target address
                    # checksum: ZMap module got his own implementation which is needed by stage 0
                    # (create checksum of IP addresses from whole IPv4 address range)
                    scanner = scanner_wrapper.ZmapWrapper(
                        filename_output_csv=filename_output_csv,
                        filename_blacklist_target_ip=blacklist,
                        filename_whitelist_target_ip=self._filename_ip_whitelist,
                        # TODO: comment in to save scanner logs, disables scanner feedback!
                        #dirname_logs=self._dirname_scanner_logs,
                        rate_kbit_per_s=self.rate_kbit_per_s,
                        interface_name=self.interface_name,
                        mac_source=self.mac_source,
                        mac_gw=self.mac_gw,
                        marker_encoding=self.marker_encoding,
                        markervalue=group.marker_value_int,
                        markerbits_value=self.markerbits_value,
                        markerbits_checksum=self.markerbits_checksum,
                        disable_monitor=self.disable_monitor,
                        verbosity=3,
                        dir_zmap=self.base_dir_zmap,
                        target_addresses=group.addresses,
                        fast_mode=False,
                        scan_timeout=scan_timeout)
                    self._current_scans.append(scanner)
                    scanner.start()
                    self._current_scans.clear()
                # we don't use the grouphandler so we have to break here
                if self._group_storeindex == 1:
                    break
            # drain both queues before closing this round
            while self._grouphandler.queuesize > 0:
                logger.debug("waiting until all queued native probes have been processed, queue: %d" % self._grouphandler.queuesize)
                time.sleep(5)
            while not self._groupqueue.empty():
                logger.debug("waiting until group queue has been emptied")
                time.sleep(5)
            self._report_fetcher.after_scanning()
            self._iterations[self._group_storeindex][0][1] = time.time()
            logger.info("duration of round %d: %d seconds, groups (should be unchanged)=%d" % (
                self._group_storeindex,
                int(self._iterations[self._group_storeindex][0][1] - self._iterations[self._group_storeindex][0][0]),
                len(self._iterations[self._group_storeindex][1])))
            if self._group_storeindex >= 2:
                if self.use_plus1:
                    group_handler_obj.update_plus1_subgroups(self._iterations[self._group_storeindex - 1][1])
                self._iterations[self._group_storeindex][2] = len(self._iterations[self._group_storeindex][1])
                self._iterations[self._group_storeindex][3] = sum([group.amount_addresses
                    for _,group in self._iterations[self._group_storeindex][1].items()])
                group_handler_obj.remove_empty_groups(self._iterations[self._group_storeindex][1])
        self.stop()
- def get_amount_of_probes(self):
- """
- Return the total amount of probes (or network packets) sent out.
- """
- # - Single feedback addresses could have been scanned: remove redundant counted single addresses
- # len([group1, group2]) -> len([singleaddr_group1, group1-singleaddr_group1, ...])
- return self._grouphandler.addresses_total - len(self._blacklist)
|