Sfoglia il codice sorgente

Fixed test-case and added a determinism-checking test-case (which does its job because it fails right now)

Denis Waßmann 6 anni fa
parent
commit
5054679743

+ 2 - 2
code/ID2TLib/OldLibs/IPGenerator.py

@@ -1,5 +1,5 @@
 import random
-from code.ID2TLib import IPv4 as ip
+from .. import IPv4 as ip
 
 
 class IPChooser:
@@ -25,7 +25,7 @@ class IPChooserByRange(IPChooser):
 		return self.range.block_size()
 
 class IPChooserByList(IPChooser):
-	def __init__(self, ips: list[ip.IPAddress]) -> "IPChooserByList":
+	def __init__(self, ips: "list[ip.IPAddress]") -> "IPChooserByList":
 		self.ips = list(ips)
 		if not self.ips:
 			raise ValueError("list of ips must not be empty")

BIN
resources/telnet-raw.pcap


+ 1 - 1
test/test_ipgenerator.py

@@ -1,4 +1,4 @@
-from ID2TLib.IPGenerator import IPGenerator
+from ID2TLib.OldLibs.IPGenerator import IPGenerator
 import unittest
 
 class IPGeneratorTestCase(unittest.TestCase):

+ 116 - 0
test/test_pcap_comparator.py

@@ -0,0 +1,116 @@
+#!/usr/bin/python3
+
+import sys, os
+import subprocess, shlex
+import time
+import unittest
+
+import scapy.all
+
+class PcapComparison(unittest.TestCase):
+    ID2T_PATH = ".."
+    ID2T_LOCATION = ID2T_PATH + "/" + "id2t"
+    NUM_ITERATIONS = 3
+
+    PCAP_ENVIRONMENT_VALUE = "ID2T_SRC_PCAP"
+    SEED_ENVIRONMENT_VALUE = "ID2T_SEED"
+
+    DEFAULT_PCAP = "resources/telnet-raw.pcap"
+    DEFAULT_SEED = "42"
+
+    OUTPUT_FILES_PREFIX_LINE = "Output files created:"
+
+    def setUp(self):
+        self.generated_files = []
+        self.keep_files = []
+
+    def test_determinism(self):
+        input_pcap = os.environ.get(self.PCAP_ENVIRONMENT_VALUE, self.DEFAULT_PCAP)
+        seed = os.environ.get(self.SEED_ENVIRONMENT_VALUE, self.DEFAULT_SEED)
+
+        command_args = [self.ID2T_LOCATION, "-i", input_pcap, "--seed", seed, "-a", "MembersMgmtCommAttack"]
+        command = " ".join(map(shlex.quote, command_args))
+        self.print_warning("The command that gets executed is:", command)
+
+        generated_pcap = None
+        for i in range(self.NUM_ITERATIONS):
+            retcode, output = subprocess.getstatusoutput(command)
+
+            self.print_warning(output)
+            self.assertEqual(retcode, 0, "For some reason id2t completed with an error")
+
+            files = self.parse_files(output)
+            self.generated_files.extend(files)
+
+            pcap = self.find_pcap(files)
+            if generated_pcap is not None:
+                try: self.compare_pcaps(generated_pcap, pcap)
+                except AssertionError as e:
+                    self.keep_files = [generated_pcap, pcap]
+                    raise e
+            else:
+                generated_pcap = pcap
+
+            self.print_warning()
+            time.sleep(1) # let some time pass between calls because files are based on the time
+
+    def tearDown(self):
+        self.print_warning("Cleaning up files generated by the test-calls...")
+        for file in self.generated_files:
+            if file in self.keep_files: continue
+
+            self.print_warning(file)
+            os.remove(self.ID2T_PATH + os.path.sep + file)
+        self.print_warning("Done")
+        self.print_warning("The following files have been kept: " + ", ".join(self.keep_files))
+
+    def parse_files(self, program_output: str) -> "list[str]":
+        lines = program_output.split(os.linesep)
+
+        self.assertIn(self.OUTPUT_FILES_PREFIX_LINE, lines,
+                "The magic string is not in the program output anymore, has the program output structure changed?")
+        index = lines.index(self.OUTPUT_FILES_PREFIX_LINE)
+
+        return lines[index + 1:]
+
+    def find_pcap(self, files: "list[str]") -> str:
+        return next(file for file in files if file.endswith(".pcap"))
+
+    def compare_pcaps(self, one: str, other: str):
+        packetsA = list(scapy.all.rdpcap(self.ID2T_PATH + "/" + one))
+        packetsB = list(scapy.all.rdpcap(self.ID2T_PATH + "/" + other))
+
+        self.assertEqual(len(packetsA), len(packetsB), "Both pcap's have to have the same amount of packets")
+        for i in range(len(packetsA)):
+            p, p2 = packetsA[i], packetsB[i]
+
+            self.assertAlmostEqual(p.time, p2.time, "Packets no %i in the pcap's don't appear at the same time" % (i + 1))
+            self.compare_packets(p, p2, i + 1)
+
+    def compare_packets(self, p, p2, packet_number):
+        if p == p2:
+            return
+
+        while type(p) != scapy.packet.NoPayload or type(p2) != scapy.packet.NoPayload:
+            if type(p) != type(p2):
+                self.fail("Packets %i are of incompatible types: %s and %s" % (packet_number, type(p).__name__, type(p2).__name__))
+
+            for field in p.fields:
+                if p.fields[field] != p2.fields[field]:
+                    packet_type = type(p).__name__
+                    v, v2 = p.fields[field], p2.fields[field]
+
+                    self.fail("Packets %i differ in field %s.%s: %s != %s" %
+                                (packet_number, packet_type, field, v, v2))
+
+            p = p.payload
+            p2 = p2.payload
+
+    def print_warning(self, *text):
+        print(*text, file=sys.stderr)
+
+if __name__ == "__main__":
+    suite = unittest.TestSuite()
+    suite.addTest(PcapComparison("test_determinism"))
+
+    unittest.TextTestRunner().run(suite)