#!/usr/bin/python3 import sys, os import subprocess, shlex import time import unittest import scapy.all class PcapComparison(unittest.TestCase): ID2T_PATH = ".." ID2T_LOCATION = ID2T_PATH + "/" + "id2t" NUM_ITERATIONS = 3 PCAP_ENVIRONMENT_VALUE = "ID2T_SRC_PCAP" SEED_ENVIRONMENT_VALUE = "ID2T_SEED" DEFAULT_PCAP = "resources/telnet-raw.pcap" DEFAULT_SEED = "42" OUTPUT_FILES_PREFIX_LINE = "Output files created:" def setUp(self): self.generated_files = [] self.keep_files = [] def test_determinism(self): input_pcap = os.environ.get(self.PCAP_ENVIRONMENT_VALUE, self.DEFAULT_PCAP) seed = os.environ.get(self.SEED_ENVIRONMENT_VALUE, self.DEFAULT_SEED) command_args = [self.ID2T_LOCATION, "-i", input_pcap, "--seed", seed, "-a", "MembersMgmtCommAttack"] command = " ".join(map(shlex.quote, command_args)) self.print_warning("The command that gets executed is:", command) generated_pcap = None for i in range(self.NUM_ITERATIONS): retcode, output = subprocess.getstatusoutput(command) self.print_warning(output) self.assertEqual(retcode, 0, "For some reason id2t completed with an error") files = self.parse_files(output) self.generated_files.extend(files) pcap = self.find_pcap(files) if generated_pcap is not None: try: self.compare_pcaps(generated_pcap, pcap) except AssertionError as e: self.keep_files = [generated_pcap, pcap] raise e else: generated_pcap = pcap self.print_warning() time.sleep(1) # let some time pass between calls because files are based on the time def tearDown(self): self.print_warning("Cleaning up files generated by the test-calls...") for file in self.generated_files: if file in self.keep_files: continue self.print_warning(file) os.remove(self.ID2T_PATH + os.path.sep + file) self.print_warning("Done") self.print_warning("The following files have been kept: " + ", ".join(self.keep_files)) def parse_files(self, program_output: str) -> "list[str]": lines = program_output.split(os.linesep) self.assertIn(self.OUTPUT_FILES_PREFIX_LINE, lines, "The magic string is not in the program output anymore, has the program output structure changed?") index = lines.index(self.OUTPUT_FILES_PREFIX_LINE) return lines[index + 1:] def find_pcap(self, files: "list[str]") -> str: return next(file for file in files if file.endswith(".pcap")) def compare_pcaps(self, one: str, other: str): packetsA = list(scapy.all.rdpcap(self.ID2T_PATH + "/" + one)) packetsB = list(scapy.all.rdpcap(self.ID2T_PATH + "/" + other)) self.assertEqual(len(packetsA), len(packetsB), "Both pcap's have to have the same amount of packets") for i in range(len(packetsA)): p, p2 = packetsA[i], packetsB[i] self.assertAlmostEqual(p.time, p2.time, "Packets no %i in the pcap's don't appear at the same time" % (i + 1)) self.compare_packets(p, p2, i + 1) def compare_packets(self, p, p2, packet_number): if p == p2: return while type(p) != scapy.packet.NoPayload or type(p2) != scapy.packet.NoPayload: if type(p) != type(p2): self.fail("Packets %i are of incompatible types: %s and %s" % (packet_number, type(p).__name__, type(p2).__name__)) for field in p.fields: if p.fields[field] != p2.fields[field]: packet_type = type(p).__name__ v, v2 = p.fields[field], p2.fields[field] self.fail("Packets %i differ in field %s.%s: %s != %s" % (packet_number, packet_type, field, v, v2)) p = p.payload p2 = p2.payload def print_warning(self, *text): print(*text, file=sys.stderr) if __name__ == "__main__": suite = unittest.TestSuite() suite.addTest(PcapComparison("test_determinism")) unittest.TextTestRunner().run(suite)