123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116 |
- #!/usr/bin/python3
- import sys, os
- import subprocess, shlex
- import time
- import unittest
- import scapy.all
- class PcapComparison(unittest.TestCase):
- ID2T_PATH = ".."
- ID2T_LOCATION = ID2T_PATH + "/" + "id2t"
- NUM_ITERATIONS = 3
- PCAP_ENVIRONMENT_VALUE = "ID2T_SRC_PCAP"
- SEED_ENVIRONMENT_VALUE = "ID2T_SEED"
- DEFAULT_PCAP = "resources/telnet-raw.pcap"
- DEFAULT_SEED = "42"
- OUTPUT_FILES_PREFIX_LINE = "Output files created:"
- def setUp(self):
- self.generated_files = []
- self.keep_files = []
- def test_determinism(self):
- input_pcap = os.environ.get(self.PCAP_ENVIRONMENT_VALUE, self.DEFAULT_PCAP)
- seed = os.environ.get(self.SEED_ENVIRONMENT_VALUE, self.DEFAULT_SEED)
- command_args = [self.ID2T_LOCATION, "-i", input_pcap, "--seed", seed, "-a", "MembersMgmtCommAttack"]
- command = " ".join(map(shlex.quote, command_args))
- self.print_warning("The command that gets executed is:", command)
- generated_pcap = None
- for i in range(self.NUM_ITERATIONS):
- retcode, output = subprocess.getstatusoutput(command)
- self.print_warning(output)
- self.assertEqual(retcode, 0, "For some reason id2t completed with an error")
- files = self.parse_files(output)
- self.generated_files.extend(files)
- pcap = self.find_pcap(files)
- if generated_pcap is not None:
- try: self.compare_pcaps(generated_pcap, pcap)
- except AssertionError as e:
- self.keep_files = [generated_pcap, pcap]
- raise e
- else:
- generated_pcap = pcap
- self.print_warning()
- time.sleep(1) # let some time pass between calls because files are based on the time
- def tearDown(self):
- self.print_warning("Cleaning up files generated by the test-calls...")
- for file in self.generated_files:
- if file in self.keep_files: continue
- self.print_warning(file)
- os.remove(self.ID2T_PATH + os.path.sep + file)
- self.print_warning("Done")
- self.print_warning("The following files have been kept: " + ", ".join(self.keep_files))
- def parse_files(self, program_output: str) -> "list[str]":
- lines = program_output.split(os.linesep)
- self.assertIn(self.OUTPUT_FILES_PREFIX_LINE, lines,
- "The magic string is not in the program output anymore, has the program output structure changed?")
- index = lines.index(self.OUTPUT_FILES_PREFIX_LINE)
- return lines[index + 1:]
- def find_pcap(self, files: "list[str]") -> str:
- return next(file for file in files if file.endswith(".pcap"))
- def compare_pcaps(self, one: str, other: str):
- packetsA = list(scapy.all.rdpcap(self.ID2T_PATH + "/" + one))
- packetsB = list(scapy.all.rdpcap(self.ID2T_PATH + "/" + other))
- self.assertEqual(len(packetsA), len(packetsB), "Both pcap's have to have the same amount of packets")
- for i in range(len(packetsA)):
- p, p2 = packetsA[i], packetsB[i]
- self.assertAlmostEqual(p.time, p2.time, "Packets no %i in the pcap's don't appear at the same time" % (i + 1))
- self.compare_packets(p, p2, i + 1)
- def compare_packets(self, p, p2, packet_number):
- if p == p2:
- return
- while type(p) != scapy.packet.NoPayload or type(p2) != scapy.packet.NoPayload:
- if type(p) != type(p2):
- self.fail("Packets %i are of incompatible types: %s and %s" % (packet_number, type(p).__name__, type(p2).__name__))
- for field in p.fields:
- if p.fields[field] != p2.fields[field]:
- packet_type = type(p).__name__
- v, v2 = p.fields[field], p2.fields[field]
- self.fail("Packets %i differ in field %s.%s: %s != %s" %
- (packet_number, packet_type, field, v, v2))
- p = p.payload
- p2 = p2.payload
- def print_warning(self, *text):
- print(*text, file=sys.stderr)
if __name__ == "__main__":
    # Run only the determinism check instead of relying on test discovery.
    suite = unittest.TestSuite()
    suite.addTest(PcapComparison("test_determinism"))
    result = unittest.TextTestRunner().run(suite)
    # Report the outcome through the exit status so that callers (shell
    # scripts, CI) can detect a failed run.
    sys.exit(0 if result.wasSuccessful() else 1)
|