123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594 |
- import unittest
- import logging
- import threading
- import time
- import sys
- import re
- import struct
- from struct import pack, unpack
- import ipaddress
- from ipaddress import IPv4Address as IPv4AddressIntern
- from ipaddress import IPv4Network as IPv4NetworkIntern
- import utility
- import scanner_wrapper
- from group import Group
- import attack_logic
- from main_monitor_simulator import MonitorSimulator
- from ipv4_address import IPv4Address, IPv4Network
- from pypacker.layer12 import ethernet
- from pypacker.layer3 import ip
- from pypacker.layer4 import tcp
- from pypacker.psocket import SocketHndl
- from pypacker.checksum import fletcher32
# Module-wide logger shared by all test cases below; DEBUG lets every record through.
logger = logging.getLogger("proberesponseattack")
logger.setLevel(logging.DEBUG)
# Root handler layout: "<LEVEL> (<calling function>): <message>".
logging.basicConfig(format="%(levelname)s (%(funcName)s): %(message)s")
class TestStaticMethods(unittest.TestCase):
    """Checks for the stateless helper functions in ``utility``."""

    def test_ip_to_str(self):
        """utility.int_to_ip_str() has to render integers as dotted-quad strings."""
        expectations = (
            (0, "0.0.0.0"),
            (1, "0.0.0.1"),
            (256, "0.0.1.0"),
            (512, "0.0.2.0"),
            (2 ** 31, "128.0.0.0"),
        )
        for number, dotted in expectations:
            rendered = utility.int_to_ip_str(number)
            self.assertEqual(dotted, rendered)

        # The base really is 265 here: int(265 ** 4 / 65536) == 75249, which is
        # exactly the address asserted below.
        stage1_groupsize = int((265 ** 4) / 65536)
        stage1_end = utility.int_to_ip_str(stage1_groupsize)
        print(stage1_end)
        self.assertEqual("0.1.37.241", stage1_end)
class TestScanner(unittest.TestCase):
    """Smoke tests which drive ``scanner_wrapper.ZmapWrapper`` directly."""

    def test_scanlocalhost(self):
        """Start a scan of 127.0.0.1 in the calling thread."""
        def ready_callback(obj):
            logger.warning("callback: scanner finished")

        # this should scan an empty target
        scanner = scanner_wrapper.ZmapWrapper(["127.0.0.1"],
            80,
            ip_source="127.0.0.1",
            ready_callback=ready_callback)
        scanner.start()

    def test_scanouter(self):
        """
        Start a scan in a background thread, poll get_rest_time() once per
        second for ten seconds and stop the scan afterwards.
        """
        def ready_callback(obj):
            logger.warning("callback: scanner finished")

        # this should scan an empty target
        scanner = scanner_wrapper.ZmapWrapper(["173.194.44.88/16"],
            80,
            ready_callback=ready_callback)
        threading.Thread(target=scanner.start).start()
        logger.debug("sleeping until stopping process")

        try:
            for _ in range(10):
                time.sleep(1)
                scanner.get_rest_time()
        finally:
            # Always stop the scan: if polling raises, the background thread
            # would otherwise keep the scan (and the test run) alive.
            logger.debug("now stopping scan")
            scanner.stop()
class ZmapWrapperTest(unittest.TestCase):
    """Starts a ZmapWrapper scan with a fixed marker configuration."""

    def test_send(self):
        """Create the wrapper for 192.168.178.0/24 on "lo" and start it."""
        def read_callback(obj):
            print("zmap has finished!")

        # Keyword arguments are collected first; insertion order equals the
        # order in which they are handed to the constructor.
        settings = dict(
            mac_gw="00:13:e8:63:f3:8f",
            mac_source=None,
            interface_name="lo",
            marker_encoding=7,
            markervalue=4294967296,
            markerbits_value=32,
            markerbits_checksum=32,
            target_addresses="192.168.178.0/24",
            ready_callback=read_callback,
        )
        scanner_wrapper.ZmapWrapper(**settings).start()
class ZmapEncoderTest(unittest.TestCase):
    """
    Sniffs the probes sent by ZmapWrapper and re-computes the fletcher32
    checksum of the marker that is encoded in them.
    """

    def _test_encode_decode_checksum_allmarker(self):
        """
        Marker spread over destination port, source IP and source port.

        Disabled: the leading underscore keeps the unittest loader from
        collecting this method.
        """
        # config: all marker (32 Bit), full checksum (32 Bit)
        wrapper = scanner_wrapper.ZmapWrapper(
            mac_gw="24:65:11:85:e9:ac",
            mac_source="00:13:e8:63:f3:8f",
            interface_name="lo",
            filename_blacklist_target_ip="./test_files/blacklist_minimal.conf",
            marker_encoding=7,
            markervalue=4294967294,
            #port_dst=12345,
            markerbits_value=32,
            markerbits_checksum=32,
            verbosity=3,
            target_addresses="127.0.0.1/32",
        )

        def start_wrapper_delayed():
            # Delay the scan so the sniffing loop below is already running.
            time.sleep(2)
            wrapper.start()

        scanthread = threading.Thread(target=start_wrapper_delayed)
        sockethandler = SocketHndl(timeout=5)
        found = False

        try:
            scanthread.start()

            for bts in sockethandler:
                pkt = ethernet.Ethernet(bts)
                logger.info("got packet")
                tcp_layer = pkt[tcp.TCP]
                ip_layer = pkt[ip.IP]

                # NOTE(review): assumes pypacker yields None for a layer that
                # is not present; unrelated traffic is skipped in that case.
                if tcp_layer is None or ip_layer is None:
                    continue

                dport = tcp_layer.dport
                ip_src = ip_layer.src
                sport = tcp_layer.sport

                # marker value 0xFFFFFFFE followed by its checksum:
                # dport=0xFFFF | ip_src=0xFFFE 0xFFFE | sport=0xFFFE
                if dport == 0xFFFF and ip_src == b"\xFF\xFE\xFF\xFE" and sport == 0xFFFE:
                    logger.info("found encoded packet")
                    # First 4 bytes (dport + upper half of the source IP) = marker value
                    ip_from_marker = pack(">H", dport) + ip_src[:2]
                    checksum_new = fletcher32(ip_from_marker, 2)
                    logger.debug(" checksum(%r) = %r" % (ip_from_marker, checksum_new))
                    # Last 4 bytes (lower half of the source IP + sport) = checksum
                    checksum_given = unpack(">I", ip_src[2:] + pack(">H", sport))[0]
                    logger.info("checksums: %r == %r ???" % (checksum_new, checksum_given))
                    self.assertEqual(checksum_new, checksum_given)
                    found = True
                    break
                else:
                    logger.debug("%0X %r %0X" % (dport, ip_src, sport))
        finally:
            # Release the sniffing socket even if an assertion above failed.
            sockethandler.close()

        self.assertTrue(found)

    def test_encode_decode_checksum_onlyports(self):
        """Marker encoded in destination and source port only."""
        # config: port marker: dport 16 Bit/sport 8 Bit (24 Bit), checksum (8 Bit)
        wrapper = scanner_wrapper.ZmapWrapper(
            mac_gw="24:65:11:85:e9:ac",
            mac_source="00:13:e8:63:f3:8f",
            filename_blacklist_target_ip="./test_files/blacklist_minimal.conf",
            interface_name="lo",
            marker_encoding=5,
            markervalue=4294967294,
            markerbits_value=24,
            markerbits_checksum=8,
            verbosity=3,
            target_addresses="127.0.0.1/32",
        )

        def start_wrapper_delayed():
            # Delay the scan so the sniffing loop below is already running.
            time.sleep(2)
            wrapper.start()

        scanthread = threading.Thread(target=start_wrapper_delayed)
        sockethandler = SocketHndl(timeout=5)
        found = False

        try:
            scanthread.start()

            for bts in sockethandler:
                pkt = ethernet.Ethernet(bts)
                tcp_layer = pkt[tcp.TCP]

                # NOTE(review): assumes pypacker yields None for a layer that
                # is not present; unrelated traffic is skipped in that case.
                if tcp_layer is None:
                    continue

                dport = tcp_layer.dport
                sport = tcp_layer.sport

                # Encoding via: destination + source port
                # marker value: 0xFFFFFFFE -3 Bytes-> 0xFFFFFF00 -> checksum: 0x[FF]00FF00
                # (only the most significant checksum byte is transmitted)
                # -> marker: FFFF FFFF = (dport, sport)
                if dport == 0xFFFF and sport == 0xFFFF:
                    logger.info("!!!!! found encoded packet")
                    # The low byte of sport carries the checksum, mask it out
                    marker_valuer = pack(">H", dport) + pack(">H", sport & 0xFF00)
                    checksum_new = fletcher32(marker_valuer, 2) >> 24
                    logger.debug("checksum(%r) = %r" % (marker_valuer, checksum_new))
                    checksum_given = sport & 0xFF
                    logger.info("checksums: %r == %r ???" % (checksum_new, checksum_given))
                    self.assertEqual(checksum_new, checksum_given)
                    found = True
                    break
                else:
                    logger.debug("dport/sport = %0X / %0X" % (dport, sport))
        finally:
            # Release the sniffing socket even if an assertion above failed.
            sockethandler.close()

        self.assertTrue(found)
# Packs an unsigned integer into 4 bytes, big endian.
pack_checksum_4bytes = struct.Struct(">I").pack


class ProbeResponseAttackLogicTest(unittest.TestCase):
    """Exercises several private helpers of ProbeResponseAttackLogic in sequence."""

    def test_multiple(self):
        """Checksum creation, report-to-marker conversion, marker creation, state save/load."""
        logger.debug("1")
        logic = attack_logic.ProbeResponseAttackLogic(
            markerbits_value=24,
            markerbits_checksum=8,
            base_dir_zmap="../zmap",
            base_dir_save="./test_files"
        )
        logic._read_scanner_feedback_addresses()

        # 3 bytes will be padded by b"\x00"
        raw_value = b"\01\02\03"
        generated = logic._create_checksum(raw_value)
        padded_value = b"\x00" + raw_value
        expected = pack_checksum_4bytes(fletcher32(padded_value, 2))
        self.assertEqual(generated, expected)

        logger.debug("2")
        report_marker = logic._report_values_to_marker("1.2.3.4", 65535, 1024)
        # expected layout as asserted here: 0x0400 (=1024) + address bytes + 0xFFFF (=65535)
        expected_marker = b"\x04\x00" + b"\01\x02\x03\x04" + b"\xFF\xFF"
        self.assertEqual(report_marker, expected_marker)

        logger.debug("3")
        created_marker = logic._create_marker(7593)
        # 7593 == 0x001da9
        value_part = b"\x00\x1d\xa9"
        self.assertEqual(created_marker[0:3], value_part)
        # only the last byte of the packed checksum is appended to the value
        full_checksum = pack_checksum_4bytes(fletcher32(b"\x00" + value_part, 2))
        checksum_part = full_checksum[-1:]
        self.assertEqual(created_marker, value_part + checksum_part)

        logger.debug("4")
        logic._save_state()
        logic._read_state()
class GroupTest(unittest.TestCase):
    """Subgroup creation of ``Group``."""

    def test_init(self):
        """create_subgroups(n) has to leave exactly n entries in group.subgroups."""
        network_bytes = b"\01\02\03\00"

        # (cidr_bits, amount of subgroups to create)
        for cidr_bits, amount in ((31, 2), (24, 5)):
            group = Group(ip_network_bytes=network_bytes, cidr_bits=cidr_bits)
            group.create_subgroups(amount)
            self.assertEqual(len(group.subgroups), amount)

        # 3 valid addresses in range 1.2.3.0/24 (9.9.9.9 lies outside of it)
        address_strings = ["1.2.3.4", "1.2.3.5", "1.2.3.6", "9.9.9.9"]
        ip_objs = []
        for ip_str in address_strings:
            ip_objs.append(IPv4Address(ip_str))

        # No assertion here: this only has to run through without raising.
        group = Group(ip_network_bytes=network_bytes, cidr_bits=24)
        group.create_subgroups(5, ipv4_addresses=ip_objs)
class ChecksumTest(unittest.TestCase):
    """Known-answer tests for pypacker's fletcher32()."""

    def test_fletcher32(self):
        """fletcher32(data, 2) of 4 input bytes has to match the reference values."""
        # Local packer: keeps this test independent of module-level helpers.
        pack_checksum = struct.Struct(">I").pack
        # (input bytes, expected checksum packed as 4 bytes big endian)
        vectors = (
            (b"\x00\x00\x00\x00", b"\xFF\xFF\xFF\xFF"),
            (b"\xFF\xFF\xFF\xFE", b"\xFF\xFE\xFF\xFE"),
            (b"\x00\x02\x03\x04", b"\x03\x08\x03\x06"),
        )

        for data, expected in vectors:
            # subTest reports every failing vector instead of only the first one
            with self.subTest(data=data):
                checksum_packed = pack_checksum(fletcher32(data, 2))
                self.assertEqual(checksum_packed, expected)

    def test_fletcher32_iterate(self):
        """Print the upper 4 checksum bits for a range of values (no assertions)."""
        print("starting to iterate")
        pack_4bytes = struct.Struct(">I").pack

        for x in range(10100, 10600):
            # shift left by 4 bits before packing the value
            value = pack_4bytes(x << 4)
            chk = fletcher32(value, 2)
            print("checksum(%s) = %d (%s)" % (value, chk >> 28, pack_4bytes(chk)))
class SimulatorTest(unittest.TestCase):
    """Placeholder test case: the simulator is tested manually."""

    def test_simulator(self):
        # Intentionally empty, there is nothing to check automatically.
        return None
class ZMapSinglePacket(unittest.TestCase):
    """Sends a probe carrying one fixed marker value to 127.0.0.1."""

    def test_zmap_single_packet(self):
        """
        Start a scan of 127.0.0.1/32 with markervalue 0x00020304.

        Reference values (checksum input -> checksum output):
        0x00000000 -> 0xFFFFFFFF
        0xFFFFFFFF -> 0xFFFFFFFF
        0xFFFFFFFE -> 0xFFFEFFFE
        0x00020304 -> 0x03080306
        """
        # Insertion order of the dict equals the keyword order of the call.
        settings = {
            "mac_gw": "24:65:11:85:e9:ac",
            "mac_source": "00:13:e8:63:f3:8f",
            "interface_name": "lo",
            "rate": 1,
            "marker_encoding": 7,
            "filename_blacklist_target_ip": "./test_files/blacklist_minimal.conf",
            # alternative marker value: 4294967294
            "markervalue": 0x00020304,
            "markerbits_value": 32,
            "markerbits_checksum": 32,
            "verbosity": 4,
            "disable_monitor": 1,
            "target_addresses": "127.0.0.1/32",
        }
        scanner = scanner_wrapper.ZmapWrapper(**settings)
        scanner.start()
class ZMapMassScan(unittest.TestCase):
    """Starts a scan of 1.0.0.0/8 via interface eth10 without a marker value."""

    def test_zmap_single_packet(self):
        """Configure the wrapper (markervalue=None, rate_mbit_per_s=500) and start it."""
        # Insertion order of the dict equals the keyword order of the call.
        settings = {
            "mac_gw": "9e:fd:12:61:48:09",
            # "mac_source" is deliberately not given
            "interface_name": "eth10",
            "rate_mbit_per_s": 500,
            "marker_encoding": 0,
            "filename_blacklist_target_ip": "./test_files/blacklist_minimal.conf",
            "markervalue": None,
            "markerbits_value": 32,
            "markerbits_checksum": 32,
            "verbosity": 0,
            "disable_monitor": 1,
            # alternative targets: ["1.2.3.4/32", "5.6.7.8/32"]
            "target_addresses": ["1.0.0.0/8"],
        }
        scanner = scanner_wrapper.ZmapWrapper(**settings)
        scanner.start()
- from pypacker.psocket import SocketHndl
class TestNativeProbe(unittest.TestCase):
    """
    Throughput benchmark for sending probes directly via pypacker.

    Scanning 17.000.000 addresses takes ~45 minutes on a 1,6GHz double core.
    Interpolated to a Quadcore with 3GHz this should give ~6 Minutes
    ((1/0.012287) * (1438961773.0532002 - 1438961740.421199)) / (60*2*4)
    """

    def test_send(self):
        """
        Send 256**3 - 500000 packets via interface eth10 and print the
        current rate (packets per second) every 10000 packets.
        """
        basepacket = ethernet.Ethernet(dst_s="00:01:02:03:04:05", src_s="00:01:02:03:04:05") +\
            ip.IP(src_s="1.1.1.1", dst_s="1.2.3.4") +\
            tcp.TCP(sport=50821)
        iterations = 256**3 - 500000
        ip_obj = basepacket.body_handler
        tcp_obj = ip_obj.body_handler
        # Ethernet + IP header stay constant, only the TCP part is re-assembled
        ether_ip_bytes = basepacket.header_bytes + ip_obj.header_bytes

        hndl = SocketHndl(iface_name="eth10")

        try:
            send = hndl.send
            start = time.time()

            for cnt in range(iterations):
                if cnt % 10000 == 0:
                    elapsed = time.time() - start
                    # the very first check can hit a zero time difference
                    rate = cnt / elapsed if elapsed > 0 else 0.0
                    print("%f" % rate)
                # fields are re-assigned per packet like a real probe would do
                tcp_obj.dport = 1234
                #ip_obj.src = b"\x00\x11\x22\x33"
                tcp_obj.sport = 1234
                send(ether_ip_bytes + tcp_obj.bin())
        finally:
            # the socket was never released before
            hndl.close()

        print(time.time())
class IPv4AddressTest(unittest.TestCase):
    """
    Compares the project's IPv4Address/IPv4Network with the classes of the
    standard library's ipaddress module (imported as *Intern).
    """

    def _assert_matches_stdlib(self, nws):
        """
        Compare project networks against their standard library counterparts.

        nws -- iterable of 5-element rows:
            (project network, stdlib network, prefixlen_diff for subnets(),
            project address contained in the project network,
            stdlib address contained in the stdlib network)
        """
        for nw_own, nw_std, prefixlen_diff, addr_own, addr_std in nws:
            logger.debug("%r <-> %r" % (nw_own, nw_std))
            self.assertEqual(nw_own.compressed, nw_std.compressed)
            self.assertTrue(addr_own in nw_own)
            self.assertTrue(addr_std in nw_std)

            sn1 = nw_own.subnets(prefixlen_diff=prefixlen_diff)
            sn2 = nw_std.subnets(prefixlen_diff=prefixlen_diff)
            cnt = 0

            # sn1 is accessed by index, sn2 is only iterated
            for cnt, sn in enumerate(sn2, start=1):
                self.assertEqual(sn1[cnt - 1].compressed, sn.compressed)
            self.assertEqual(cnt, len(sn1))

    def test_ipv4address(self):
        """Networks created from int, bytes and str have to behave like the stdlib ones."""
        nws = [
            (IPv4Network(nw_ip_int=0, prefixlen=30), IPv4NetworkIntern("0.0.0.0/30"), 1, IPv4Address(ip_int=3), IPv4AddressIntern("0.0.0.1")),
            (IPv4Network(nw_ip_int=0, prefixlen=30), IPv4NetworkIntern("0.0.0.0/30"), 2, IPv4Address(ip_int=2), IPv4AddressIntern("0.0.0.2")),
            (IPv4Network(nw_ip_int=0, prefixlen=31), IPv4NetworkIntern("0.0.0.0/31"), 1, IPv4Address(ip_int=1), IPv4AddressIntern("0.0.0.1")),
            (IPv4Network(nw_ip_int=2**20 + 2**21, prefixlen=15), IPv4NetworkIntern("0.48.0.0/15"), 5, IPv4Address(ip_int=2**20 + 2**21 + 1), IPv4AddressIntern("0.48.0.1")),
        ]
        self._assert_matches_stdlib(nws)
        print("----")

        nws = [
            (IPv4Network(nw_ip_bytes=b"\x00\x00\x00\x00", prefixlen=30), IPv4NetworkIntern("0.0.0.0/30"), 1, IPv4Address(ip_int=3), IPv4AddressIntern("0.0.0.1")),
            (IPv4Network(nw_ip_bytes=b"\x00\x00\x00\x00", prefixlen=30), IPv4NetworkIntern("0.0.0.0/30"), 2, IPv4Address(ip_bytes=b"\x00\x00\x00\x02"), IPv4AddressIntern("0.0.0.2")),
            (IPv4Network(nw_ip_bytes=b"\x00\x00\x00\x00", prefixlen=31), IPv4NetworkIntern("0.0.0.0/31"), 1, IPv4Address(ip_str="0.0.0.1"), IPv4AddressIntern("0.0.0.1")),
            (IPv4Network(nw_ip_int=2**20 + 2**21, prefixlen=15), IPv4NetworkIntern("0.48.0.0/15"), 5, IPv4Address(ip_int=2**20 + 2**21 + 1), IPv4AddressIntern("0.48.0.1")),
            (IPv4Network(nw_ip_str="1.2.3.128", prefixlen=31), IPv4NetworkIntern("1.2.3.128/31"), 1, IPv4Address(ip_str="1.2.3.129"), IPv4AddressIntern("1.2.3.129")),
        ]
        self._assert_matches_stdlib(nws)

        # hosts is expected to contain every address of the network (2^(32-prefixlen))
        self.assertEqual(len(IPv4Network(nw_ip_bytes=b"\x00\x00\x00\x00", prefixlen=30).hosts), 4)
        self.assertEqual(len(IPv4Network(nw_ip_bytes=b"\x00\x00\x00\x00", prefixlen=29).hosts), 8)

    def _test_ipv4address_containsin_nwset(self):
        """Disabled (underscore prefix): print whether an address is found in a set of subnets."""
        nwset = set(IPv4Network(nw_ip_str="1.0.0.0", prefixlen=30).subnets(2))
        ipv4addr = IPv4Address(ip_str="0.0.0.1")
        print(ipv4addr in nwset)

    def _test_ipv4address_performance(self):
        """Disabled (underscore prefix): time subnet creation, stdlib vs. project class."""
        repeats = 2000

        start = time.time()
        for _ in range(repeats):
            # iterate to force creation of every subnet
            for _subnet in IPv4NetworkIntern("1.2.3.0/24").subnets(5):
                pass
        print("time diff: %f" % (time.time() - start))

        start = time.time()
        for _ in range(repeats):
            for _subnet in IPv4Network(nw_ip_str="1.2.3.0", prefixlen=24).subnets(prefixlen_diff=5):
                pass
        print("time diff: %f (should be ~50 times faster)" % (time.time() - start))

    def _test_ipv4address_performance2(self):
        """Disabled (underscore prefix): repeatedly create subnets and a Group, printing progress."""
        repeats = 500000
        nw = IPv4Network(nw_ip_str="1.2.3.0", prefixlen=24)

        for x in range(repeats):
            if x % 10000 == 0:
                print(x/repeats)
            sn = nw.subnets(prefixlen_diff=8)
            # NOTE(review): the result of subnets() is handed over as
            # ip_network_object - confirm that this is intended.
            Group(ip_network_object=sn)

    def _test_ipv4address_memory(self):
        """Disabled (underscore prefix): keep 256**3 Group objects alive to observe memory usage."""
        groups = []
        total = 256**3

        for x in range(total):
            if x % 100000 == 0:
                print(x/total)
            nw = IPv4Network(nw_ip_int=256**3, prefixlen=24)
            # the list keeps a reference so the objects are not freed
            groups.append(Group(ip_network_object=nw))
- import hashlib
class DShieldNoiseTest(unittest.TestCase):
    """Measures how often DShield report data accidentally passes the marker checksum."""

    def test_noise(self):
        """
        Test noise resilience of various checksums

        Every line of the report is interpreted as marker built from
        destination port + source port (columns 5 and 4, tab separated).
        The upper 28 bits are taken as marker value, the lower 4 bits as
        checksum. A line counts as noise if a freshly generated checksum
        equals the checksum found in the line.
        """
        pack_port = struct.Struct(">H").pack
        pack_markervalue = struct.Struct(">I").pack
        unpack_4bytes = struct.Struct(">I").unpack
        markervalue_length = 28
        checksum_length = 4
        # lowest checksum_length bits set
        checksum_mask = (0xFFFFFFFFFFFFFFFF >> (64 - checksum_length))
        # Noise results on DShield data
        # Noise = amount of unique false values introduced, every unique marker value is
        # counted once as additional counts just increase the response count for a group
        #
        # fletcher32:
        # 4 Bits Checksum = ~1,3% Noise
        # 6 Bits Checksum = ~0,6% Noise
        # 600 MB DShield report data = ~6.450.129 events -> 1,3% = ~82.861 events
        # MD5:
        # similar results (see above)
        md5 = hashlib.md5

        def get_checksum(bts):
            """MD5 based alternative to fletcher32, see _create_checksum_bitlevel()."""
            return unpack_4bytes(md5(bts).digest()[:4])[0]

        def _report_values_to_marker(ip_source, port_src, port_dst):
            """
            Combines all markers given by a report in the correct order. Unused parts are left out.
            return -- markervalue (int), markerchecksum (int), marker (bytes)
            """
            bts = []
            if port_dst is not None:
                bts.append(pack_port(port_dst))
            if ip_source is not None:
                # 4 address bytes in network order
                bts.append(ipaddress.IPv4Address(ip_source).packed)
            if port_src is not None:
                bts.append(pack_port(port_src))
            bts = b"".join(bts)
            markervalue_and_checksum = int.from_bytes(bts, "big")
            # marker value: b"AAAA AAAA AAAA AABB" -> b"00AA AAAA AAAA AAAA"
            # checksum:     b"AAAA AAAA AAAA AABB" -> b"0000 0000 0000 00BB"
            # NOTE(review): 32 bits only fit the port-only marker used below
            marker_length = 32
            return (markervalue_and_checksum >> (marker_length - markervalue_length)),\
                markervalue_and_checksum & checksum_mask,\
                bts

        def _create_checksum_bitlevel(markervalue):
            """
            Create a checksum using value markervalue
            markervalue -- integer to create checksum from (non padded)
            return -- checksum as integer
            """
            marker_value_leftshifted = markervalue << checksum_length
            marker_value_bytes_forchecksum = pack_markervalue(marker_value_leftshifted)
            # MD5 variant: get_checksum(marker_value_bytes_forchecksum) >> (32 - checksum_length)
            return fletcher32(marker_value_bytes_forchecksum, 2) >> (32 - checksum_length)

        cnt_total = 0
        cnt_parsefail = 0
        # unique marker values whose checksum matched by accident
        matched_values = set()

        with open("./raw_data_dshield/dshield_report_2014_04_23.csv", "r") as fd:
            # the first line is skipped (column header)
            fd.readline()

            for line in fd:
                cnt_total += 1

                if cnt_total % 100000 == 0:
                    print("count=%d, matches=%d, ratio=%f parse fail=%d" %
                        (cnt_total, len(matched_values), (len(matched_values)/cnt_total), cnt_parsefail))
                columns = line.split("\t")

                try:
                    ip_source, port_src, port_dst = None, int(columns[4]), int(columns[5])
                except (IndexError, ValueError):
                    # too few columns or non-numeric port
                    cnt_parsefail += 1
                    continue

                marker_value_report_int, marker_checksum_report_int, marker_report = \
                    _report_values_to_marker(ip_source, port_src, port_dst)
                marker_checksum_gen = _create_checksum_bitlevel(marker_value_report_int)

                if marker_checksum_report_int == marker_checksum_gen:
                    matched_values.add(marker_value_report_int)

        # Summary uses the same counter as the progress output; a report
        # without data lines must not lead to a division by zero.
        cnt_matches = len(matched_values)
        ratio = cnt_matches/cnt_total if cnt_total else 0.0
        print("count=%d, matches=%d, ratio=%f parse fail=%d" % (cnt_total, cnt_matches, ratio, cnt_parsefail))
class TracingSimulator(unittest.TestCase):
    """
    Manual integration test against a running TraCINg instance.

    Note:
    Start TraCINg (node index.js), connect via the report fetcher
    (python reportfetcher.py, config for TraCINg), then execute the
    test (python tests.py).
    """

    def test_tracing_event(self):
        """Hand one TCP packet (1.1.1.1 -> 1.2.3.4) to MonitorSimulator._attack_reaction()."""
        simulator_options = dict(
            amount_monitors=1000,
            interface_name="eth10",
            buffer_size=10,
            url_tracing="https://localhost:443/",
        )
        monitorsimulator = MonitorSimulator(**simulator_options)

        # Build the packet layer by layer: Ethernet -> IP -> TCP
        pkt = ethernet.Ethernet() + ip.IP(src_s="1.1.1.1", dst_s="1.2.3.4")
        pkt = pkt + tcp.TCP()
        monitorsimulator._attack_reaction(pkt)
- import report_fetcher
class DShieldNormalizerTest(unittest.TestCase):
    """Prints the output of DShieldReportFetcher.ip_to_dshield_ip() (no assertions)."""

    def test_ipnormalizer(self):
        """Convert a fixed address and the external address of this host."""
        to_dshield_ip = report_fetcher.DShieldReportFetcher.ip_to_dshield_ip

        print(to_dshield_ip("1.2.3.4"))
        # looked up only after the first result has been printed
        external_ip = utility.get_external_ip()
        print(to_dshield_ip(external_ip))
suite = unittest.TestSuite()
loader = unittest.defaultTestLoader

# Test cases taking part in a run of this file. Most of them need zmap,
# special interfaces or external services, so they are enabled on demand
# by removing the comment sign.
_selected_cases = (
    # ZmapWrapperTest,
    # ZmapEncoderTest,
    # ProbeResponseAttackLogicTest,
    # GroupTest,
    # ChecksumTest,
    # DShieldNoiseTest,
    # ZMapSinglePacket,
    # ZMapMassScan,
    # TracingSimulator,
    # IPv4AddressTest,
    # TestNativeProbe,
    DShieldNormalizerTest,
)

for _case in _selected_cases:
    suite.addTests(loader.loadTestsFromTestCase(_case))

if __name__ == "__main__":
    logger.info("starting tests")
    # Only the hand-picked suite is executed, not unittest.main().
    runner = unittest.TextTestRunner()
    runner.run(suite)
|