import unittest import logging import threading import time import sys import re import struct from struct import pack, unpack import ipaddress from ipaddress import IPv4Address as IPv4AddressIntern from ipaddress import IPv4Network as IPv4NetworkIntern import utility import scanner_wrapper from group import Group import attack_logic from main_monitor_simulator import MonitorSimulator from ipv4_address import IPv4Address, IPv4Network from pypacker.layer12 import ethernet from pypacker.layer3 import ip from pypacker.layer4 import tcp from pypacker.psocket import SocketHndl from pypacker.checksum import fletcher32 logging.basicConfig(format="%(levelname)s (%(funcName)s): %(message)s") logger = logging.getLogger("proberesponseattack") logger.setLevel(logging.DEBUG) class TestStaticMethods(unittest.TestCase): def test_ip_to_str(self): ip_zero = utility.int_to_ip_str(0) self.assertEqual("0.0.0.0", ip_zero) ip_one = utility.int_to_ip_str(1) self.assertEqual("0.0.0.1", ip_one) ip256 = utility.int_to_ip_str(256) self.assertEqual("0.0.1.0", ip256) ip512 = utility.int_to_ip_str(512) self.assertEqual("0.0.2.0", ip512) ipmax = utility.int_to_ip_str(2 ** 31) self.assertEqual("128.0.0.0", ipmax) groupsize_stage1 = int((265 ** 4) / 65536) ipend1 = utility.int_to_ip_str(groupsize_stage1) print(ipend1) self.assertEqual("0.1.37.241", ipend1) class TestScanner(unittest.TestCase): def test_scanlocalhost(self): def ready_callback(obj): logger.warning("callback: scanner finished") # this should scan an empty target scanner = scanner_wrapper.ZmapWrapper(["127.0.0.1"], 80, ip_source="127.0.0.1", ready_callback=ready_callback) scanner.start() def test_scanouter(self): def ready_callback(obj): logger.warning("callback: scanner finished") # this should scan an empty target scanner = scanner_wrapper.ZmapWrapper(["173.194.44.88/16"], 80, ready_callback=ready_callback) threading.Thread(target=scanner.start).start() logger.debug("sleeping until stopping process") for x in range(10): time.sleep(1) scanner.get_rest_time() logger.debug("now stopping scan") scanner.stop() class ZmapWrapperTest(unittest.TestCase): def test_send(self): def read_callback(obj): print("zmap has finished!") wrapper = scanner_wrapper.ZmapWrapper(mac_gw="00:13:e8:63:f3:8f", mac_source=None, interface_name="lo", marker_encoding=7, markervalue=4294967296, markerbits_value=32, markerbits_checksum=32, target_addresses="192.168.178.0/24", ready_callback=read_callback) wrapper.start() class ZmapEncoderTest(unittest.TestCase): def _test_encode_decode_checksum_allmarker(self): # config: all marker (32 Bit), full checksum (32 Bit) wrapper = scanner_wrapper.ZmapWrapper( mac_gw="24:65:11:85:e9:ac", mac_source="00:13:e8:63:f3:8f", interface_name="lo", filename_blacklist_target_ip="./test_files/blacklist_minimal.conf", marker_encoding=7, markervalue=4294967294, #port_dst=12345, markerbits_value=32, markerbits_checksum=32, verbosity=3, target_addresses="127.0.0.1/32", ) def start_wrapper_delayed(): time.sleep(2) wrapper.start() scanthread = threading.Thread(target=start_wrapper_delayed) sockethandler = SocketHndl(timeout=5) scanthread.start() found = False for bts in sockethandler: pkt = ethernet.Ethernet(bts) logger.info("got packet") dport = pkt[tcp.TCP].dport ip_src = pkt[ip.IP].src sport = pkt[tcp.TCP].sport #print(fletcher32(b"\xFF\xFF\xFF\xFE", 2)) # 0xFFFF FFFEFFFE FFFE if dport == 0xFFFF and ip_src == b"\xFF\xFE\xFF\xFE" and sport == 0xFFFE: logger.info("found encoded packet") ip_from_marker = pack(">H", dport) + ip_src[:2] checksum_new = fletcher32(ip_from_marker, 2) logger.debug(" checksum(%r) = %r" % (ip_from_marker, checksum_new)) checksum_given = unpack(">I", ip_src[2:] + pack(">H", sport))[0] logger.info("checksums: %r == %r ???" % (checksum_new, checksum_given)) self.assertEqual(checksum_new, checksum_given) found = True break else: logger.debug("%0X %r %0X" % (dport, ip_src, sport)) self.assertTrue(found) sockethandler.close() def test_encode_decode_checksum_onlyports(self): # config: port marker: dport 16 Bit/sport 8 Bit (24 Bit), checksum (8 Bit) wrapper = scanner_wrapper.ZmapWrapper( mac_gw="24:65:11:85:e9:ac", mac_source="00:13:e8:63:f3:8f", filename_blacklist_target_ip="./test_files/blacklist_minimal.conf", interface_name="lo", marker_encoding=5, markervalue=4294967294, markerbits_value=24, markerbits_checksum=8, verbosity=3, target_addresses="127.0.0.1/32", ) def start_wrapper_delayed(): time.sleep(2) wrapper.start() scanthread = threading.Thread(target=start_wrapper_delayed) sockethandler = SocketHndl(timeout=5) scanthread.start() found = False for bts in sockethandler: pkt = ethernet.Ethernet(bts) dport = pkt[tcp.TCP].dport sport = pkt[tcp.TCP].sport # Encoding via: destination + source port # marker value: 0xFFFFFFFE -3 Bytes-> 0xFFFFFF00 -> checksum: 0xFF]00FF00 # -> marker: FFFF FFFF = (dport, sport) if dport == 0xFFFF and sport == 0xFFFF: logger.info("!!!!! found encoded packet") marker_valuer = pack(">H", dport) + pack(">H", sport & 0xFF00) checksum_new = fletcher32(marker_valuer, 2) >> 24 logger.debug("checksum(%r) = %r" % (marker_valuer, checksum_new)) checksum_given = sport & 0xFF logger.info("checksums: %r == %r ???" % (checksum_new, checksum_given)) self.assertEqual(checksum_new, checksum_given) found = True break else: logger.debug("dport/sport = %0X / %0X" % (dport, sport)) self.assertTrue(found) sockethandler.close() pack_checksum_4bytes = struct.Struct(">I").pack class ProbeResponseAttackLogicTest(unittest.TestCase): def test_multiple(self): logger.debug("1") logic = attack_logic.ProbeResponseAttackLogic( markerbits_value=24, markerbits_checksum=8, base_dir_zmap="../zmap", base_dir_save="./test_files" ) logic._read_scanner_feedback_addresses() # 3 bytes will be padded by b"\x00" bts = b"\01\02\03" checksum_gen = logic._create_checksum(bts) checksum_correct = pack_checksum_4bytes(fletcher32(b"\x00" + bts, 2)) self.assertEqual(checksum_gen, checksum_correct) # logger.debug("2") marker = logic._report_values_to_marker("1.2.3.4", 65535, 1024) self.assertEqual(marker, b"\x04\x00" + b"\01\x02\x03\x04" + b"\xFF\xFF") # logger.debug("3") marker = logic._create_marker(7593) marker_value = b"\x00\x1d\xa9" self.assertEqual(marker[0:3], marker_value) checksum = pack_checksum_4bytes(fletcher32(b"\x00" + marker_value, 2))[-1:] self.assertEqual(marker, marker_value + checksum) logger.debug("4") logic._save_state() logic._read_state() class GroupTest(unittest.TestCase): def test_init(self): group = Group(ip_network_bytes=b"\01\02\03\00", cidr_bits=31) group.create_subgroups(2) self.assertEqual(len(group.subgroups), 2) group = Group(ip_network_bytes=b"\01\02\03\00", cidr_bits=24) group.create_subgroups(5) self.assertEqual(len(group.subgroups), 5) # 3 valid addresses in range 1.2.3.0/24 ip_objs = [IPv4Address(ip_str) for ip_str in ["1.2.3.4", "1.2.3.5", "1.2.3.6", "9.9.9.9"] ] group = Group(ip_network_bytes=b"\01\02\03\00", cidr_bits=24) group.create_subgroups(5, ipv4_addresses=ip_objs) class ChecksumTest(unittest.TestCase): def test_fletcher32(self): pack_checksum = struct.Struct(">I").pack checksum = fletcher32(b"\x00\x00\x00\x00", 2) checksum_packed = pack_checksum_4bytes(checksum) self.assertEqual(checksum_packed, b"\xFF\xFF\xFF\xFF") checksum = fletcher32(b"\xFF\xFF\xFF\xFE", 2) checksum_packed = pack_checksum_4bytes(checksum) self.assertEqual(checksum_packed, b"\xFF\xFE\xFF\xFE") checksum = fletcher32(b"\x00\x02\x03\x04", 2) checksum_packed = pack_checksum_4bytes(checksum) self.assertEqual(checksum_packed, b"\x03\x08\x03\x06") def test_fletcher32_iterate(self): print("starting to iterate") pack_4bytes = struct.Struct(">I").pack for x in range(10100, 10600): value = pack_4bytes(x << 4) chk = fletcher32(value, 2) print("checksum(%s) = %d (%s)" % (value, chk >> 28, pack_4bytes(chk))) class SimulatorTest(unittest.TestCase): """The simulator is tested manually""" def test_simulator(self): pass class ZMapSinglePacket(unittest.TestCase): def test_zmap_single_packet(self): """ Input/Output for checksum 0x00000000 0xFFFFFFFF 0xFFFFFFFF 0xFFFFFFFF 0xFFFFFFFE 0xFFFEFFFE 0x00020304 0x03080306 """ wrapper = scanner_wrapper.ZmapWrapper( mac_gw="24:65:11:85:e9:ac", mac_source="00:13:e8:63:f3:8f", interface_name="lo", rate=1, marker_encoding=7, filename_blacklist_target_ip="./test_files/blacklist_minimal.conf", #markervalue=4294967294, markervalue=0x00020304, markerbits_value=32, markerbits_checksum=32, verbosity=4, disable_monitor=1, target_addresses="127.0.0.1/32", ) wrapper.start() class ZMapMassScan(unittest.TestCase): def test_zmap_single_packet(self): wrapper = scanner_wrapper.ZmapWrapper( mac_gw="9e:fd:12:61:48:09", #mac_source="", interface_name="eth10", rate_mbit_per_s=500, marker_encoding=0, filename_blacklist_target_ip="./test_files/blacklist_minimal.conf", markervalue=None, markerbits_value=32, markerbits_checksum=32, verbosity=0, disable_monitor=1, #target_addresses=["1.2.3.4/32", "5.6.7.8/32"] target_addresses=["1.0.0.0/8"] ) wrapper.start() from pypacker.psocket import SocketHndl class TestNativeProbe(unittest.TestCase): """ Scanning 17.000.000 addresses takes ~45 minutes on a 1,6GHz double core. Interpolated to a Quadcore with 3GHz this should give ~6 Minutes ((1/0.012287) * (1438961773.0532002 - 1438961740.421199)) / (60*2*4) """ def test_send(self): basepacket = ethernet.Ethernet(dst_s="00:01:02:03:04:05", src_s="00:01:02:03:04:05") +\ ip.IP(src_s="1.1.1.1", dst_s="1.2.3.4") +\ tcp.TCP(sport=50821) hndl = SocketHndl(iface_name="eth10") send = hndl.send iterations = 256**3 - 500000 ip_obj = basepacket.body_handler tcp_obj = ip_obj.body_handler basepacket_bin = basepacket.bin ether_ip_bytes = basepacket.header_bytes + ip_obj.header_bytes start = time.time() for cnt in range(iterations): if cnt % 10000 == 0: print("%f" % (cnt/(time.time()-start))) tcp_obj.dport = 1234 #ip_obj.src = b"\x00\x11\x22\x33" tcp_obj.sport = 1234 send(ether_ip_bytes + tcp_obj.bin()) cnt += 1 print(time.time()) class IPv4AddressTest(unittest.TestCase): def test_ipv4address(self): nws = [[IPv4Network(nw_ip_int=0, prefixlen=30), IPv4NetworkIntern("0.0.0.0/30"), 1, IPv4Address(ip_int=3), IPv4AddressIntern("0.0.0.1")], [IPv4Network(nw_ip_int=0, prefixlen=30), IPv4NetworkIntern("0.0.0.0/30"), 2, IPv4Address(ip_int=2), IPv4AddressIntern("0.0.0.2")], [IPv4Network(nw_ip_int=0, prefixlen=31), IPv4NetworkIntern("0.0.0.0/31"), 1, IPv4Address(ip_int=1), IPv4AddressIntern("0.0.0.1")], [IPv4Network(nw_ip_int=2**20 + 2**21, prefixlen=15), IPv4NetworkIntern("0.48.0.0/15"), 5, IPv4Address(ip_int=2**20 + 2**21 + 1), IPv4AddressIntern("0.48.0.1")] ] for nw in nws: logger.debug("%r <-> %r" % (nw[0], nw[1])) #logger.debug("%r <-> %r" % (len(nw[0]), len(l))) self.assertEqual(nw[0].compressed, nw[1].compressed) self.assertTrue(nw[3] in nw[0]) self.assertTrue(nw[4] in nw[1]) sn1 = nw[0].subnets(prefixlen_diff=nw[2]) sn2 = nw[1].subnets(prefixlen_diff=nw[2]) cnt = 0 for sn in sn2: self.assertEqual(sn1[cnt].compressed, sn.compressed) cnt += 1 self.assertEqual(cnt, len(sn1)) print("----") nws = [[IPv4Network(nw_ip_bytes=b"\x00\x00\x00\x00", prefixlen=30), IPv4NetworkIntern("0.0.0.0/30"), 1, IPv4Address(ip_int=3), IPv4AddressIntern("0.0.0.1")], [IPv4Network(nw_ip_bytes=b"\x00\x00\x00\x00", prefixlen=30), IPv4NetworkIntern("0.0.0.0/30"), 2, IPv4Address(ip_bytes=b"\x00\x00\x00\x02"), IPv4AddressIntern("0.0.0.2")], [IPv4Network(nw_ip_bytes=b"\x00\x00\x00\x00", prefixlen=31), IPv4NetworkIntern("0.0.0.0/31"), 1, IPv4Address(ip_str="0.0.0.1"), IPv4AddressIntern("0.0.0.1")], [IPv4Network(nw_ip_int=2**20 + 2**21, prefixlen=15), IPv4NetworkIntern("0.48.0.0/15"), 5, IPv4Address(ip_int=2**20 + 2**21 + 1), IPv4AddressIntern("0.48.0.1")], [IPv4Network(nw_ip_str="1.2.3.128", prefixlen=31), IPv4NetworkIntern("1.2.3.128/31"), 1, IPv4Address(ip_str="1.2.3.129"), IPv4AddressIntern("1.2.3.129")] ] for nw in nws: logger.debug("%r <-> %r" % (nw[0], nw[1])) #logger.debug("%r <-> %r" % (len(nw[0]), len(l))) self.assertEqual(nw[0].compressed, nw[1].compressed) self.assertTrue(nw[3] in nw[0]) self.assertTrue(nw[4] in nw[1]) sn1 = nw[0].subnets(prefixlen_diff=nw[2]) sn2 = nw[1].subnets(prefixlen_diff=nw[2]) cnt = 0 for sn in sn2: self.assertEqual(sn1[cnt].compressed, sn.compressed) cnt += 1 self.assertEqual(cnt, len(sn1)) self.assertEqual(len(IPv4Network(nw_ip_bytes=b"\x00\x00\x00\x00", prefixlen=30).hosts), 4) self.assertEqual(len(IPv4Network(nw_ip_bytes=b"\x00\x00\x00\x00", prefixlen=29).hosts), 8) #print("hosts: %r" % IPv4Network(nw_ip_bytes=b"\x00\x02\x00\x00", prefixlen=29).hosts) def _test_ipv4address_containsin_nwset(self): nwset = set(IPv4Network(nw_ip_str="1.0.0.0", prefixlen=30).subnets(2)) ipv4addr = IPv4Address(ip_str="0.0.0.1") print(ipv4addr in nwset) def _test_ipv4address_performance(self): repeats = 2000 start = time.time() for x in range(repeats): sn = IPv4NetworkIntern("1.2.3.0/24").subnets(5) for x in sn: y = x print("time diff: %f" % (time.time() - start)) start = time.time() for x in range(repeats): sn = IPv4Network(nw_ip_str="1.2.3.0", prefixlen=24).subnets(prefixlen_diff=5) for x in sn: y = x print("time diff: %f (should be ~50 times faster)" % (time.time() - start)) def _test_ipv4address_performance2(self): repeats = 500000 start = time.time() nw = IPv4Network(nw_ip_str="1.2.3.0", prefixlen=24) for x in range(repeats): if x % 10000 == 0: print(x/repeats) sn = nw.subnets(prefixlen_diff=8) g = Group(ip_network_object=sn) def _test_ipv4address_memory(self): l = [] total = 256**3 for x in range(total): if x % 100000 == 0: print(x/total) nw = IPv4Network(nw_ip_int=256**3, prefixlen=24) g = Group(ip_network_object=nw) l.append(g) import hashlib class DShieldNoiseTest(unittest.TestCase): def test_noise(self): """ Test noise resilience of various checksums """ pack_port = struct.Struct(">H").pack split_tab = re.compile("\t").split #split_newline = re.compile("\r?\n").split pack_markervalue = struct.Struct(">I").pack unpack_4bytes = struct.Struct(">I").unpack markervalue_length = 28 # XXXX checksum_length = 4 # XXXX checksum_mask = (0xFFFFFFFFFFFFFFFF >> (64 - checksum_length)) # Noise results on DShield data # Noise = amount of unique false values introduced, every unique marker value is counted once as # additional coutns just increase the response count for a group # # fletcher32: # 4 Bits Checksum = ~1,3% Noise # 6 Bits Checksum = ~0,6% Noise # 600 MB DShield report data = ~6.450.129 events -> 1,3% = ~82.861 events # MD5: # similar results (see above) md5 = hashlib.md5 def get_checksum(bts): return unpack_4bytes(md5(bts).digest()[:4])[0] def _report_values_to_marker(ip_source, port_src, port_dst): """ Combines all markers given by a report in the correct order. Unused parts are left out. return -- markervalue (int), markerchecksum (int), marker (bytes) """ bts = [] if port_dst is not None: bts.append(pack_port(port_dst)) if ip_source is not None: bts.append(ip_str_to_bytes(ip_source)) if port_src is not None: bts.append(pack_port(port_src)) bts = b"".join(bts) markervalue_and_checksum = int.from_bytes(bts, "big") #logger.debug("report full marker: %s=%d" % (bts, markervalue_and_checksum)) # marker value: b"AAAA AAAA AAAA AABB" -> b"00AA AAAA AAAA AAAA" # marker value: b"AAAA AAAA AAAA AABB" -> b"0000 0000 0000 00BB" marker_length = 32 return (markervalue_and_checksum >> (marker_length - markervalue_length)),\ markervalue_and_checksum & checksum_mask,\ bts def _create_checksum_bitlevel(markervalue): """ Create a checksum using value markervalue markervalue -- integer to create checksum from (non padded) return -- checksum as integer """ marker_value_leftshifted = markervalue << checksum_length marker_value_bytes_forchecksum = pack_markervalue(marker_value_leftshifted) #logger.debug("padded marker value before checksum: %s" % marker_padded) #return pack_checksum(fletcher32(marker_padded, len(marker_padded)/2)), marker_padded #checksum_int = fletcher32(marker_value_bytes_forchecksum, len(marker_value_bytes_forchecksum)/2) & 0xFFFFFFFF return fletcher32(marker_value_bytes_forchecksum, 2) >> (32 - checksum_length) #return get_checksum(marker_value_bytes_forchecksum) >> (32 - checksum_length) fd = open("./raw_data_dshield/dshield_report_2014_04_23.csv", "r") firstline = fd.readline() #print("skipping: %s" % firstline) cnt_total = 0 cnt_matches = 0 cnt_parsefail = 0 parse_match = {} for line in fd: cnt_total += 1 if cnt_total % 100000 == 0: print("count=%d, matches=%d, ratio=%f parse fail=%d" % (cnt_total, len(parse_match), (len(parse_match)/cnt_total), cnt_parsefail)) columns = split_tab(line) #print(columns) try: ip_source, port_src, port_dst = None, int(columns[4]), int(columns[5]) except: cnt_parsefail += 1 continue marker_value_report_int, marker_checksum_report_int, marker_report = _report_values_to_marker(ip_source, port_src, port_dst) marker_checksum_gen = _create_checksum_bitlevel(marker_value_report_int) #time.sleep(1) #print("checking checksum: %d <-> %d" % (marker_checksum_report_int, marker_checksum_gen)) if marker_checksum_report_int == marker_checksum_gen: #print("there was a match!!!!") parse_match[marker_value_report_int] = 1 print("count=%d, matches=%d, ratio=%f parse fail=%d" % (cnt_total, cnt_matches, (cnt_matches/cnt_total), cnt_parsefail)) fd.close() class TracingSimulator(unittest.TestCase): """ Note: Test this by starting TraCINg (node index.js) -> connect via report fetcher (python reportfetcher.py, config for TraCINg) and execute test (python tests.py) """ def test_tracing_event(self): monitorsimulator = MonitorSimulator( amount_monitors=1000, interface_name="eth10", buffer_size=10, url_tracing="https://localhost:443/") pkt = ethernet.Ethernet() + ip.IP(src_s="1.1.1.1", dst_s="1.2.3.4") + tcp.TCP() monitorsimulator._attack_reaction(pkt) import report_fetcher class DShieldNormalizerTest(unittest.TestCase): def test_ipnormalizer(self): print(report_fetcher.DShieldReportFetcher.ip_to_dshield_ip("1.2.3.4")) print(report_fetcher.DShieldReportFetcher.ip_to_dshield_ip(utility.get_external_ip())) suite = unittest.TestSuite() loader = unittest.defaultTestLoader # suite.addTests(loader.loadTestsFromTestCase(ZmapWrapperTest)); # suite.addTests(loader.loadTestsFromTestCase(ZmapEncoderTest)) # suite.addTests(loader.loadTestsFromTestCase(ProbeResponseAttackLogicTest)) # suite.addTests(loader.loadTestsFromTestCase(GroupTest)) # suite.addTests(loader.loadTestsFromTestCase(ChecksumTest)) #suite.addTests(loader.loadTestsFromTestCase(DShieldNoiseTest)) # suite.addTests(loader.loadTestsFromTestCase(ZMapSinglePacket)) #suite.addTests(loader.loadTestsFromTestCase(ZMapMassScan)) #suite.addTests(loader.loadTestsFromTestCase(TracingSimulator)) #suite.addTests(loader.loadTestsFromTestCase(IPv4AddressTest)) #suite.addTests(loader.loadTestsFromTestCase(TestNativeProbe)) suite.addTests(loader.loadTestsFromTestCase(DShieldNormalizerTest)) if __name__ == '__main__': logger.info("starting tests") # unittest.main() unittest.TextTestRunner().run(suite)