|
@@ -0,0 +1,154 @@
|
|
|
+#!/usr/bin/python3
|
|
|
+
|
|
|
+import os
|
|
|
+import sys
|
|
|
+import subprocess
|
|
|
+import shlex
|
|
|
+import time
|
|
|
+import unittest
|
|
|
+
|
|
|
+import scapy.all
|
|
|
+
|
|
|
+
|
|
|
+class PcapComparison(unittest.TestCase):
|
|
|
+ ID2T_DIRECTORY = ".."
|
|
|
+ ID2T_LOCATION = ID2T_DIRECTORY + "/" + "id2t"
|
|
|
+ NUM_ITERATIONS = 3
|
|
|
+
|
|
|
+ PCAP_ENVIRONMENT_VALUE = "ID2T_SRC_PCAP"
|
|
|
+ SEED_ENVIRONMENT_VALUE = "ID2T_SEED"
|
|
|
+
|
|
|
+ DEFAULT_PCAP = "resources/telnet-raw.pcap"
|
|
|
+ DEFAULT_SEED = "42"
|
|
|
+
|
|
|
+ OUTPUT_FILES_PREFIX_LINE = "Output files created: "
|
|
|
+
|
|
|
+ def __init__(self, *args, **kwargs):
|
|
|
+ unittest.TestCase.__init__(self, *args, **kwargs)
|
|
|
+
|
|
|
+ # params to call id2t with, as a list[list[str]]
|
|
|
+ # do a round of testing for each list[str] we get
|
|
|
+ # if none generate some params itself
|
|
|
+ self.id2t_params = None
|
|
|
+
|
|
|
+ def set_id2t_params(self, params: "list[list[str]]"):
|
|
|
+ self.id2t_params = params
|
|
|
+
|
|
|
+ def setUp(self):
|
|
|
+ self.generated_files = []
|
|
|
+ self.keep_files = []
|
|
|
+
|
|
|
+ def test_determinism(self):
|
|
|
+ input_pcap = os.environ.get(self.PCAP_ENVIRONMENT_VALUE, self.DEFAULT_PCAP)
|
|
|
+ seed = os.environ.get(self.SEED_ENVIRONMENT_VALUE, self.DEFAULT_SEED)
|
|
|
+
|
|
|
+ if self.id2t_params is None:
|
|
|
+ self.id2t_params = self.random_id2t_params()
|
|
|
+
|
|
|
+ for params in self.id2t_params:
|
|
|
+ self.do_test_round(input_pcap, seed, params)
|
|
|
+
|
|
|
+ def do_test_round(self, input_pcap, seed, additional_params):
|
|
|
+ command_args = [self.ID2T_LOCATION, "-i", input_pcap, "-S", seed, "-a", "MembersMgmtCommAttack"] + additional_params
|
|
|
+ command = " ".join(map(shlex.quote, command_args))
|
|
|
+ self.print_warning("The command that gets executed is:", command)
|
|
|
+
|
|
|
+ generated_pcap = None
|
|
|
+ for i in range(self.NUM_ITERATIONS):
|
|
|
+ retcode, output = subprocess.getstatusoutput(command)
|
|
|
+
|
|
|
+ self.print_warning(output)
|
|
|
+ self.assertEqual(retcode, 0, "For some reason id2t completed with an error")
|
|
|
+
|
|
|
+ files = self.parse_files(output)
|
|
|
+ self.generated_files.extend(files)
|
|
|
+
|
|
|
+ pcap = self.find_pcap(files)
|
|
|
+ if generated_pcap is not None:
|
|
|
+ try:
|
|
|
+ self.compare_pcaps(generated_pcap, pcap)
|
|
|
+ except AssertionError as e:
|
|
|
+ self.keep_files = [generated_pcap, pcap]
|
|
|
+ raise e
|
|
|
+ else:
|
|
|
+ generated_pcap = pcap
|
|
|
+
|
|
|
+ self.print_warning()
|
|
|
+ time.sleep(1) # let some time pass between calls because files are based on the time
|
|
|
+
|
|
|
+ def tearDown(self):
|
|
|
+ self.print_warning("Cleaning up files generated by the test-calls...")
|
|
|
+ for file in self.generated_files:
|
|
|
+ if file in self.keep_files: continue
|
|
|
+
|
|
|
+ self.print_warning(file)
|
|
|
+ os.remove(self.ID2T_DIRECTORY + os.path.sep + file)
|
|
|
+ self.print_warning("Done")
|
|
|
+ self.print_warning("The following files have been kept: " + ", ".join(self.keep_files))
|
|
|
+
|
|
|
+ def parse_files(self, program_output: str) -> "list[str]":
|
|
|
+ lines = program_output.split(os.linesep)
|
|
|
+
|
|
|
+ self.assertIn(self.OUTPUT_FILES_PREFIX_LINE, lines,
|
|
|
+ "The magic string is not in the program output anymore, has the program output structure changed?")
|
|
|
+ index = lines.index(self.OUTPUT_FILES_PREFIX_LINE)
|
|
|
+
|
|
|
+ return [file.strip() for file in lines[index + 1:]]
|
|
|
+
|
|
|
+ def find_pcap(self, files: "list[str]") -> str:
|
|
|
+ return next(file for file in files if file.endswith(".pcap"))
|
|
|
+
|
|
|
+ def compare_pcaps(self, one: str, other: str):
|
|
|
+ packetsA = list(scapy.all.rdpcap(self.ID2T_DIRECTORY + "/" + one))
|
|
|
+ packetsB = list(scapy.all.rdpcap(self.ID2T_DIRECTORY + "/" + other))
|
|
|
+
|
|
|
+ self.assertEqual(len(packetsA), len(packetsB), "Both pcap's have to have the same amount of packets")
|
|
|
+ for i in range(len(packetsA)):
|
|
|
+ p, p2 = packetsA[i], packetsB[i]
|
|
|
+
|
|
|
+ self.assertAlmostEqual(p.time, p2.time, "Packets no %i in the pcap's don't appear at the same time" % (i + 1))
|
|
|
+ self.compare_packets(p, p2, i + 1)
|
|
|
+
|
|
|
+ def compare_packets(self, p, p2, packet_number):
|
|
|
+ if p == p2:
|
|
|
+ return
|
|
|
+
|
|
|
+ while type(p) != scapy.packet.NoPayload or type(p2) != scapy.packet.NoPayload:
|
|
|
+ if type(p) != type(p2):
|
|
|
+ self.fail("Packets %i are of incompatible types: %s and %s" % (packet_number, type(p).__name__, type(p2).__name__))
|
|
|
+
|
|
|
+ for field in p.fields:
|
|
|
+ if p.fields[field] != p2.fields[field]:
|
|
|
+ packet_type = type(p).__name__
|
|
|
+ v, v2 = p.fields[field], p2.fields[field]
|
|
|
+
|
|
|
+ self.fail("Packets %i differ in field %s.%s: %s != %s" %
|
|
|
+ (packet_number, packet_type, field, v, v2))
|
|
|
+
|
|
|
+ p = p.payload
|
|
|
+ p2 = p2.payload
|
|
|
+
|
|
|
+ def print_warning(self, *text):
|
|
|
+ print(*text, file=sys.stderr)
|
|
|
+
|
|
|
+ def random_id2t_params(self):
|
|
|
+ param = lambda key, val: "-p%s=%s" % (str(key), str(val))
|
|
|
+
|
|
|
+ return [
|
|
|
+ []
|
|
|
+ ]
|
|
|
+
|
|
|
+
|
|
|
+if __name__ == "__main__":
|
|
|
+ import sys
|
|
|
+
|
|
|
+ # parameters for this program are interpreted as id2t-parameters
|
|
|
+ id2t_args = sys.argv[1:]
|
|
|
+ comparison = PcapComparison("test_determinism")
|
|
|
+ if id2t_args:
|
|
|
+ comparison.set_id2t_params([id2t_args])
|
|
|
+
|
|
|
+ suite = unittest.TestSuite()
|
|
|
+ suite.addTest(comparison)
|
|
|
+
|
|
|
+ unittest.TextTestRunner().run(suite)
|