test_pcap_comparator.py 5.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154
  1. #!/usr/bin/python3
  2. import os
  3. import sys
  4. import subprocess
  5. import shlex
  6. import time
  7. import unittest
  8. import scapy.all
  9. class PcapComparison(unittest.TestCase):
  10. ID2T_DIRECTORY = ".."
  11. ID2T_LOCATION = ID2T_DIRECTORY + "/" + "id2t"
  12. NUM_ITERATIONS = 3
  13. PCAP_ENVIRONMENT_VALUE = "ID2T_SRC_PCAP"
  14. SEED_ENVIRONMENT_VALUE = "ID2T_SEED"
  15. DEFAULT_PCAP = "resources/telnet-raw.pcap"
  16. DEFAULT_SEED = "42"
  17. OUTPUT_FILES_PREFIX_LINE = "Output files created: "
  18. def __init__(self, *args, **kwargs):
  19. unittest.TestCase.__init__(self, *args, **kwargs)
  20. # params to call id2t with, as a list[list[str]]
  21. # do a round of testing for each list[str] we get
  22. # if none generate some params itself
  23. self.id2t_params = None
  24. def set_id2t_params(self, params: "list[list[str]]"):
  25. self.id2t_params = params
  26. def setUp(self):
  27. self.generated_files = []
  28. self.keep_files = []
  29. def test_determinism(self):
  30. input_pcap = os.environ.get(self.PCAP_ENVIRONMENT_VALUE, self.DEFAULT_PCAP)
  31. seed = os.environ.get(self.SEED_ENVIRONMENT_VALUE, self.DEFAULT_SEED)
  32. if self.id2t_params is None:
  33. self.id2t_params = self.random_id2t_params()
  34. for params in self.id2t_params:
  35. self.do_test_round(input_pcap, seed, params)
  36. def do_test_round(self, input_pcap, seed, additional_params):
  37. command_args = [self.ID2T_LOCATION, "-i", input_pcap, "-S", seed, "-a", "MembersMgmtCommAttack"] + additional_params
  38. command = " ".join(map(shlex.quote, command_args))
  39. self.print_warning("The command that gets executed is:", command)
  40. generated_pcap = None
  41. for i in range(self.NUM_ITERATIONS):
  42. retcode, output = subprocess.getstatusoutput(command)
  43. self.print_warning(output)
  44. self.assertEqual(retcode, 0, "For some reason id2t completed with an error")
  45. files = self.parse_files(output)
  46. self.generated_files.extend(files)
  47. pcap = self.find_pcap(files)
  48. if generated_pcap is not None:
  49. try:
  50. self.compare_pcaps(generated_pcap, pcap)
  51. except AssertionError as e:
  52. self.keep_files = [generated_pcap, pcap]
  53. raise e
  54. else:
  55. generated_pcap = pcap
  56. self.print_warning()
  57. time.sleep(1) # let some time pass between calls because files are based on the time
  58. def tearDown(self):
  59. self.print_warning("Cleaning up files generated by the test-calls...")
  60. for file in self.generated_files:
  61. if file in self.keep_files: continue
  62. self.print_warning(file)
  63. os.remove(self.ID2T_DIRECTORY + os.path.sep + file)
  64. self.print_warning("Done")
  65. self.print_warning("The following files have been kept: " + ", ".join(self.keep_files))
  66. def parse_files(self, program_output: str) -> "list[str]":
  67. lines = program_output.split(os.linesep)
  68. self.assertIn(self.OUTPUT_FILES_PREFIX_LINE, lines,
  69. "The magic string is not in the program output anymore, has the program output structure changed?")
  70. index = lines.index(self.OUTPUT_FILES_PREFIX_LINE)
  71. return [file.strip() for file in lines[index + 1:]]
  72. def find_pcap(self, files: "list[str]") -> str:
  73. return next(file for file in files if file.endswith(".pcap"))
  74. def compare_pcaps(self, one: str, other: str):
  75. packetsA = list(scapy.all.rdpcap(self.ID2T_DIRECTORY + "/" + one))
  76. packetsB = list(scapy.all.rdpcap(self.ID2T_DIRECTORY + "/" + other))
  77. self.assertEqual(len(packetsA), len(packetsB), "Both pcap's have to have the same amount of packets")
  78. for i in range(len(packetsA)):
  79. p, p2 = packetsA[i], packetsB[i]
  80. self.assertAlmostEqual(p.time, p2.time, "Packets no %i in the pcap's don't appear at the same time" % (i + 1))
  81. self.compare_packets(p, p2, i + 1)
  82. def compare_packets(self, p, p2, packet_number):
  83. if p == p2:
  84. return
  85. while type(p) != scapy.packet.NoPayload or type(p2) != scapy.packet.NoPayload:
  86. if type(p) != type(p2):
  87. self.fail("Packets %i are of incompatible types: %s and %s" % (packet_number, type(p).__name__, type(p2).__name__))
  88. for field in p.fields:
  89. if p.fields[field] != p2.fields[field]:
  90. packet_type = type(p).__name__
  91. v, v2 = p.fields[field], p2.fields[field]
  92. self.fail("Packets %i differ in field %s.%s: %s != %s" %
  93. (packet_number, packet_type, field, v, v2))
  94. p = p.payload
  95. p2 = p2.payload
  96. def print_warning(self, *text):
  97. print(*text, file=sys.stderr)
  98. def random_id2t_params(self):
  99. param = lambda key, val: "-p%s=%s" % (str(key), str(val))
  100. return [
  101. []
  102. ]
  103. if __name__ == "__main__":
  104. import sys
  105. # parameters for this program are interpreted as id2t-parameters
  106. id2t_args = sys.argv[1:]
  107. comparison = PcapComparison("test_determinism")
  108. if id2t_args:
  109. comparison.set_id2t_params([id2t_args])
  110. suite = unittest.TestSuite()
  111. suite.addTest(comparison)
  112. unittest.TextTestRunner().run(suite)