TestUtil.py 5.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167
  1. #!/usr/bin/python3
  2. import scapy.all
  3. import scapy.packet
  4. import shlex
  5. import subprocess
  6. import os
  7. # You could compare pcaps by byte or by hash too, but this class tells you
  8. # where exactly pcaps differ
  9. class PcapComparator:
  10. def compare_files(self, file: str, other_file: str):
  11. self.compare_captures(scapy.all.rdpcap(file), scapy.all.rdpcap(other_file))
  12. def compare_captures(self, packetsA, packetsB):
  13. if len(packetsA) != len(packetsB):
  14. self.fail("Both pcap's have to have the same amount of packets")
  15. for i in range(len(packetsA)):
  16. p, p2 = packetsA[i], packetsB[i]
  17. if abs(p.time - p2.time) > (10 ** -7):
  18. self.fail("Packets no %i in the pcap's don't appear at the same time" % (i + 1))
  19. self.compare_packets(p, p2, i + 1)
  20. def compare_packets(self, p: scapy.packet.BasePacket, p2: scapy.packet.BasePacket, packet_number: int):
  21. if p == p2:
  22. return
  23. while type(p) != scapy.packet.NoPayload or type(p2) != scapy.packet.NoPayload:
  24. if type(p) != type(p2):
  25. self.fail("Packets %i are of incompatible types: %s and %s" % (packet_number, type(p).__name__, type(p2).__name__))
  26. for field in p.fields:
  27. if p.fields[field] != p2.fields[field]:
  28. packet_type = type(p).__name__
  29. v, v2 = p.fields[field], p2.fields[field]
  30. self.fail("Packets %i differ in field %s.%s: %s != %s" %
  31. (packet_number, packet_type, field, v, v2))
  32. p = p.payload
  33. p2 = p2.payload
  34. def fail(self, message: str):
  35. raise AssertionError(message)
  36. class ID2TExecution:
  37. ID2T_PATH = ".."
  38. ID2T_LOCATION = ID2T_PATH + "/" + "id2t"
  39. OUTPUT_FILES_PREFIX_LINE = "Output files created:"
  40. def __init__(self, input_filename, id2t_path=ID2T_LOCATION, seed=None):
  41. self.input_file = input_filename
  42. self.seed = str(seed)
  43. self.id2t_path = id2t_path
  44. self.generated_files = [] # files generated by id2t
  45. self.keep_files = []
  46. self.return_code = None
  47. self.id2t_output = None
  48. def has_run(self):
  49. return self.return_code is not None
  50. def run(self, parameters):
  51. if self.has_run():
  52. raise RuntimeError("This instance has already run and can't do it again")
  53. command = self.get_run_command(parameters)
  54. return_code, output = subprocess.getstatusoutput(command)
  55. self.return_code = return_code
  56. self.id2t_output = output
  57. self.generated_files = self._parse_files(output)
  58. def get_run_command(self, parameters):
  59. command_args = [self.id2t_path, "-i", self.input_file]
  60. if self.seed is not None:
  61. command_args.extend(["--seed", self.seed])
  62. command_args.extend(["-a", "MembersMgmtCommAttack"])
  63. command_args.extend(parameters)
  64. return " ".join(map(shlex.quote, command_args))
  65. def _parse_files(self, program_output: str) -> "list[str]":
  66. lines = program_output.split(os.linesep)
  67. if self.OUTPUT_FILES_PREFIX_LINE not in lines:
  68. raise AssertionError("The magic string is not in the program output anymore, has the program output structure changed?")
  69. index = lines.index(self.OUTPUT_FILES_PREFIX_LINE)
  70. return lines[index + 1:]
  71. def get_pcap_filename(self):
  72. self._require_run()
  73. return self._find_pcap()
  74. def get_output(self):
  75. self._require_run()
  76. return self.id2t_output
  77. def get_return_code(self):
  78. self._require_run()
  79. return self.return_code
  80. def keep_file(self, file):
  81. self._require_run()
  82. if file not in self.generated_files:
  83. raise ValueError("%s is not generated by id2t" % file)
  84. if file not in self.keep_files:
  85. self.keep_files.append()
  86. def get_kept_files(self):
  87. self._require_run()
  88. return self.keep_files
  89. def get_generated_files(self):
  90. self._require_run()
  91. return self.generated_files
  92. def get_files_for_deletion(self):
  93. self._require_run()
  94. return [file for file in self.generated_files if file not in self.keep_files]
  95. def _find_pcap(self) -> str:
  96. return next(file for file in self.generated_files if file.endswith(".pcap"))
  97. def _require_run(self):
  98. if not self.has_run():
  99. raise RuntimeError("You have to execute run() before you can call this method")
  100. def cleanup(self):
  101. if self.has_run():
  102. id2t_relative = os.path.dirname(self.id2t_path)
  103. for file in self.get_files_for_deletion():
  104. try:
  105. os.unlink(id2t_relative + "/" + file)
  106. except: pass
  107. def __del__(self):
  108. self.cleanup()
  109. if __name__ == "__main__":
  110. import sys
  111. if len(sys.argv) < 3:
  112. print("Usage: %s one.pcap other.pcap" % sys.argv[0])
  113. exit(0)
  114. try:
  115. PcapComparator().compare_files(sys.argv[1], sys.argv[2])
  116. print("The given pcaps are equal")
  117. except AssertionError as e:
  118. print("The given pcaps are not equal")
  119. print("Error message:", *e.args)
  120. exit(1)
  121. except Exception as e:
  122. print("During the comparison an unexpected error happened")
  123. print(type(e).__name__ + ":", *e.args)
  124. exit(1)