TestUtil.py 5.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175
  1. #!/usr/bin/python3
  2. import scapy.packet
  3. import scapy.utils
  4. import shlex
  5. import subprocess
  6. import os
# You could also compare pcaps byte by byte or by hash, but this class tells
# you exactly where the pcaps differ.
  9. class PcapComparator:
  10. def compare_files(self, file: str, other_file: str):
  11. self.compare_captures(scapy.utils.rdpcap(file), scapy.utils.rdpcap(other_file))
  12. def compare_captures(self, packetsA, packetsB):
  13. if len(packetsA) != len(packetsB):
  14. self.fail("Both pcap's have to have the same amount of packets")
  15. for i in range(len(packetsA)):
  16. p, p2 = packetsA[i], packetsB[i]
  17. if abs(p.time - p2.time) > (10 ** -7):
  18. self.fail("Packets no %i in the pcap's don't appear at the same time" % (i + 1))
  19. self.compare_packets(p, p2, i + 1)
  20. def compare_packets(self, p: scapy.packet.BasePacket, p2: scapy.packet.BasePacket, packet_number: int):
  21. if p == p2:
  22. return
  23. while type(p) != scapy.packet.NoPayload or type(p2) != scapy.packet.NoPayload:
  24. if type(p) != type(p2):
  25. self.fail("Packets %i are of incompatible types: %s and %s" % (packet_number, type(p).__name__, type(p2).__name__))
  26. for field in p.fields:
  27. if p.fields[field] != p2.fields[field]:
  28. packet_type = type(p).__name__
  29. v, v2 = p.fields[field], p2.fields[field]
  30. self.fail("Packets %i differ in field %s.%s: %s != %s" %
  31. (packet_number, packet_type, field, v, v2))
  32. p = p.payload
  33. p2 = p2.payload
  34. def fail(self, message: str):
  35. raise AssertionError(message)
  36. class ID2TExecution:
  37. ID2T_PATH = ".."
  38. ID2T_LOCATION = ID2T_PATH + "/" + "id2t"
  39. OUTPUT_FILES_PREFIX_LINE = "Output files created:"
  40. def __init__(self, input_filename, id2t_path=ID2T_LOCATION, seed=None):
  41. self.input_file = input_filename
  42. self.seed = str(seed)
  43. self.id2t_path = id2t_path
  44. self.generated_files = [] # files generated by id2t
  45. self.keep_files = []
  46. self.return_code = None
  47. self.id2t_output = None
  48. def has_run(self):
  49. return self.return_code is not None
  50. def run(self, parameters):
  51. if self.has_run():
  52. raise RuntimeError("This instance has already run and can't do it again")
  53. command = self.get_run_command(parameters)
  54. return_code, output = subprocess.getstatusoutput(command)
  55. self.return_code = return_code
  56. self.id2t_output = output
  57. self.generated_files = self._parse_files(output)
  58. def get_run_command(self, parameters):
  59. command_args = [self.id2t_path, "-i", self.input_file]
  60. if self.seed is not None:
  61. command_args.extend(["-S", self.seed])
  62. command_args.extend(["-a", "MembersMgmtCommAttack"])
  63. command_args.extend(parameters)
  64. return " ".join(map(shlex.quote, command_args))
  65. def _parse_files(self, program_output: str) -> "list[str]":
  66. lines = program_output.split(os.linesep)
  67. if self.OUTPUT_FILES_PREFIX_LINE not in lines:
  68. raise AssertionError("The magic string is not in the program output anymore, has the program output structure changed?")
  69. index = lines.index(self.OUTPUT_FILES_PREFIX_LINE)
  70. next_empty_line_index = lines.index("", index) if "" in lines[index:] else len(lines)
  71. return lines[index + 1:next_empty_line_index]
  72. def get_pcap_filename(self):
  73. self._require_run()
  74. return self._find_pcap()
  75. def get_output(self):
  76. self._require_run()
  77. return self.id2t_output
  78. def get_return_code(self):
  79. self._require_run()
  80. return self.return_code
  81. def keep_file(self, file):
  82. self._require_run()
  83. if file not in self.generated_files:
  84. raise ValueError("%s is not generated by id2t" % file)
  85. if file not in self.keep_files:
  86. self.keep_files.append()
  87. def get_kept_files(self):
  88. self._require_run()
  89. return self.keep_files
  90. def get_generated_files(self):
  91. self._require_run()
  92. return self.generated_files
  93. def get_files_for_deletion(self):
  94. self._require_run()
  95. return [file for file in self.generated_files if file not in self.keep_files and not "No packets were injected." in file]
  96. def _find_pcap(self) -> str:
  97. for gen_file in self.generated_files:
  98. if "No packets were injected." in gen_file:
  99. return "No packets were injected."
  100. return next(file for file in self.generated_files if file.endswith(".pcap"))
  101. def _require_run(self):
  102. if not self.has_run():
  103. raise RuntimeError("You have to execute run() before you can call this method")
  104. def cleanup(self):
  105. if self.has_run():
  106. id2t_relative = os.path.dirname(self.id2t_path)
  107. for file in self.get_files_for_deletion():
  108. if "No packets were injected." in file:
  109. pass
  110. try:
  111. os.unlink(id2t_relative + "/" + file)
  112. except: pass
  113. def __del__(self):
  114. self.cleanup()
  115. if __name__ == "__main__":
  116. import sys
  117. if len(sys.argv) < 3:
  118. print("Usage: %s one.pcap other.pcap" % sys.argv[0])
  119. exit(0)
  120. try:
  121. PcapComparator().compare_files(sys.argv[1], sys.argv[2])
  122. print("The given pcaps are equal")
  123. except AssertionError as e:
  124. print("The given pcaps are not equal")
  125. print("Error message:", *e.args)
  126. exit(1)
  127. except Exception as e:
  128. print("During the comparison an unexpected error happened")
  129. print(type(e).__name__ + ":", *e.args)
  130. exit(1)