TestUtil.py 6.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187
  1. #!/usr/bin/python3
  2. import scapy.main
  3. # The scapy.layers.l2 import below is needed, otherwise scapy emits warnings. When reading a
  4. # pcap, scapy will not find link-layer type 1 (Ethernet) because it has not been loaded at that
  5. # time. To circumvent this, we explicitly load the Ethernet layer here.
  6. # For the curious, the exact warning message is:
  7. # "RawPcapReader: unknown LL type [%i]/[%#x]. Using Raw packets" % the_missing_ll_number
  8. # If the same problem happens with other link-layer types, feel free to load ALL available
  9. # layers with the following line.
  10. # import scapy.layers.all
  11. import scapy.layers.l2
  12. import scapy.packet
  13. import scapy.utils
  14. import shlex
  15. import subprocess
  16. import os
  17. # You could compare pcaps by byte or by hash too, but this class tells you
  18. # where exactly pcaps differ
  19. class PcapComparator:
  20. def compare_files(self, file: str, other_file: str):
  21. self.compare_captures(scapy.utils.rdpcap(file), scapy.utils.rdpcap(other_file))
  22. def compare_captures(self, packetsA, packetsB):
  23. if len(packetsA) != len(packetsB):
  24. self.fail("Both pcaps have to have the same amount of packets")
  25. for i in range(len(packetsA)):
  26. p, p2 = packetsA[i], packetsB[i]
  27. if abs(p.time - p2.time) > (10 ** -7):
  28. self.fail("Packets no %i in the pcaps don't appear at the same time" % (i + 1))
  29. self.compare_packets(p, p2, i + 1)
  30. def compare_packets(self, p: scapy.packet.BasePacket, p2: scapy.packet.BasePacket, packet_number: int):
  31. if p == p2:
  32. return
  33. while type(p) != scapy.packet.NoPayload or type(p2) != scapy.packet.NoPayload:
  34. if type(p) != type(p2):
  35. self.fail("Packets %i are of incompatible types: %s and %s" % (packet_number, type(p).__name__, type(p2).__name__))
  36. for field in p.fields:
  37. if p.fields[field] != p2.fields[field]:
  38. packet_type = type(p).__name__
  39. v, v2 = p.fields[field], p2.fields[field]
  40. self.fail("Packets %i differ in field %s.%s: %s != %s" %
  41. (packet_number, packet_type, field, v, v2))
  42. p = p.payload
  43. p2 = p2.payload
  44. def fail(self, message: str):
  45. raise AssertionError(message)
  46. class ID2TExecution:
  47. ID2T_PATH = ".."
  48. ID2T_LOCATION = ID2T_PATH + "/" + "id2t"
  49. OUTPUT_FILES_PREFIX_LINE = "Output files created:"
  50. def __init__(self, input_filename, id2t_path=ID2T_LOCATION, seed=None):
  51. self.input_file = input_filename
  52. self.seed = str(seed)
  53. self.id2t_path = id2t_path
  54. self.generated_files = [] # files generated by id2t
  55. self.keep_files = []
  56. self.return_code = None
  57. self.id2t_output = None
  58. def has_run(self):
  59. return self.return_code is not None
  60. def run(self, parameters):
  61. if self.has_run():
  62. raise RuntimeError("This instance has already run and can't do it again")
  63. command = self.get_run_command(parameters)
  64. return_code, output = subprocess.getstatusoutput(command)
  65. self.return_code = return_code
  66. self.id2t_output = output
  67. self.generated_files = self._parse_files(output)
  68. def get_run_command(self, parameters):
  69. command_args = [self.id2t_path, "-i", self.input_file]
  70. if self.seed is not None:
  71. command_args.extend(["-S", self.seed])
  72. command_args.extend(["-a", "MembersMgmtCommAttack"])
  73. command_args.extend(parameters)
  74. return " ".join(map(shlex.quote, command_args))
  75. def _parse_files(self, program_output: str) -> "list[str]":
  76. lines = program_output.split(os.linesep)
  77. if self.OUTPUT_FILES_PREFIX_LINE not in lines:
  78. raise AssertionError("The magic string is not in the program output anymore, has the program output structure changed?")
  79. index = lines.index(self.OUTPUT_FILES_PREFIX_LINE)
  80. next_empty_line_index = lines.index("", index) if "" in lines[index:] else len(lines)
  81. return lines[index + 1:next_empty_line_index]
  82. def get_pcap_filename(self):
  83. self._require_run()
  84. return self._find_pcap()
  85. def get_output(self):
  86. self._require_run()
  87. return self.id2t_output
  88. def get_return_code(self):
  89. self._require_run()
  90. return self.return_code
  91. def keep_file(self, file):
  92. self._require_run()
  93. if file not in self.generated_files:
  94. raise ValueError("%s is not generated by id2t" % file)
  95. if file not in self.keep_files:
  96. self.keep_files.append(file)
  97. def get_kept_files(self):
  98. self._require_run()
  99. return self.keep_files
  100. def get_generated_files(self):
  101. self._require_run()
  102. return self.generated_files
  103. def get_files_for_deletion(self):
  104. self._require_run()
  105. return [file for file in self.generated_files if file not in self.keep_files and not "No packets were injected." in file]
  106. def _find_pcap(self) -> str:
  107. for gen_file in self.generated_files:
  108. if "No packets were injected." in gen_file:
  109. return "No packets were injected."
  110. return next(file for file in self.generated_files if file.endswith(".pcap"))
  111. def _require_run(self):
  112. if not self.has_run():
  113. raise RuntimeError("You have to execute run() before you can call this method")
  114. def cleanup(self):
  115. if self.has_run():
  116. id2t_relative = os.path.dirname(self.id2t_path)
  117. for file in self.get_files_for_deletion():
  118. if "No packets were injected." in file:
  119. pass
  120. try:
  121. os.unlink(id2t_relative + "/" + file)
  122. except: pass
  123. def __del__(self):
  124. self.cleanup()
  125. if __name__ == "__main__":
  126. import sys
  127. if len(sys.argv) < 3:
  128. print("Usage: %s one.pcap other.pcap" % sys.argv[0])
  129. exit(0)
  130. try:
  131. PcapComparator().compare_files(sys.argv[1], sys.argv[2])
  132. print("The given pcaps are equal")
  133. except AssertionError as e:
  134. print("The given pcaps are not equal")
  135. print("Error message:", *e.args)
  136. exit(1)
  137. except Exception as e:
  138. print("During the comparison an unexpected error happened")
  139. print(type(e).__name__ + ":", *e.args)
  140. exit(1)