test_pcap_comparator.py 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171
  1. #!/usr/bin/python3
  2. import sys, os
  3. import subprocess, shlex
  4. import time
  5. import unittest
  6. import random
  7. import scapy.all
  8. from TestUtil import PcapComparator
  9. # this dictionary holds the generators (functions) for the parameters
  10. # that will be passed to the MembershipMgmtCommAttack
  11. # items need the parameter-name as key and a function that will be called
  12. # without parameters and returns a valid value for that parameter as value
  13. # WARNING: parameters will be passed via command line, make sure your values
  14. # get converted to string correctly
  15. _random_bool = lambda: random.random() < 0.5
  16. ID2T_PARAMETER_GENERATORS = {
  17. "bots.count": lambda: random.randint(3, 6),
  18. # "file.csv":,
  19. # "file.xml":,
  20. "hidden_mark": _random_bool,
  21. # "interval.selection.end":,
  22. # "interval.selection.start":,
  23. # "interval.selection.strategy":,
  24. # "ip.reuse.external":,
  25. # "ip.reuse.local":,
  26. # "ip.reuse.total":,
  27. "multiport": _random_bool,
  28. "nat.present": _random_bool,
  29. "packet.padding": lambda: random.randint(0, 100),
  30. "packets.limit": lambda: random.randint(50, 150),
  31. "packets.per-second": lambda: random.randint(1000, 2000) / 100,
  32. "ttl.from.caida": _random_bool,
  33. }
  34. class PcapComparison(unittest.TestCase):
  35. ID2T_PATH = ".."
  36. ID2T_LOCATION = ID2T_PATH + "/" + "id2t"
  37. NUM_ITERATIONS_PER_PARAMS = 3
  38. NUM_ITERATIONS = 5
  39. PCAP_ENVIRONMENT_VALUE = "ID2T_SRC_PCAP"
  40. SEED_ENVIRONMENT_VALUE = "ID2T_SEED"
  41. DEFAULT_PCAP = "resources/test/telnet-raw.pcap"
  42. DEFAULT_SEED = "42"
  43. OUTPUT_FILES_PREFIX_LINE = "Output files created:"
  44. def __init__(self, *args, **kwargs):
  45. unittest.TestCase.__init__(self, *args, **kwargs)
  46. # params to call id2t with, as a list[list[str]]
  47. # do a round of testing for each list[str] we get
  48. # if none generate some params itself
  49. self.id2t_params = None
  50. def set_id2t_params(self, params: "list[list[str]]"):
  51. self.id2t_params = params
  52. def setUp(self):
  53. self.generated_files = []
  54. self.keep_files = []
  55. def test_determinism(self):
  56. input_pcap = os.environ.get(self.PCAP_ENVIRONMENT_VALUE, self.DEFAULT_PCAP)
  57. seed = os.environ.get(self.SEED_ENVIRONMENT_VALUE, self.DEFAULT_SEED)
  58. if self.id2t_params is None:
  59. self.id2t_params = self.random_id2t_params()
  60. for params in self.id2t_params:
  61. self.do_test_round(input_pcap, seed, params)
  62. def do_test_round(self, input_pcap, seed, additional_params):
  63. command_args = [self.ID2T_LOCATION, "-i", input_pcap, "--seed", seed, "-a", "MembersMgmtCommAttack"] + additional_params
  64. command = " ".join(map(shlex.quote, command_args))
  65. self.print_warning("The command that gets executed is:", command)
  66. generated_pcap = None
  67. for i in range(self.NUM_ITERATIONS_PER_PARAMS):
  68. retcode, output = subprocess.getstatusoutput(command)
  69. self.print_warning(output)
  70. self.assertEqual(retcode, 0, "For some reason id2t completed with an error")
  71. files = self.parse_files(output)
  72. self.generated_files.extend(files)
  73. pcap = self.find_pcap(files)
  74. if generated_pcap is not None:
  75. try:
  76. self.compare_pcaps(generated_pcap, pcap)
  77. except AssertionError as e:
  78. self.keep_files = [generated_pcap, pcap]
  79. raise e
  80. else:
  81. generated_pcap = pcap
  82. self.print_warning()
  83. time.sleep(1) # let some time pass between calls because files are based on the time
  84. def tearDown(self):
  85. self.print_warning("Cleaning up files generated by the test-calls...")
  86. for file in self.generated_files:
  87. if file in self.keep_files: continue
  88. self.print_warning(file)
  89. os.remove(self.ID2T_PATH + os.path.sep + file)
  90. self.print_warning("Done")
  91. self.print_warning("The following files have been kept: " + ", ".join(self.keep_files))
  92. def parse_files(self, program_output: str) -> "list[str]":
  93. lines = program_output.split(os.linesep)
  94. self.assertIn(self.OUTPUT_FILES_PREFIX_LINE, lines,
  95. "The magic string is not in the program output anymore, has the program output structure changed?")
  96. index = lines.index(self.OUTPUT_FILES_PREFIX_LINE)
  97. return lines[index + 1:]
  98. def find_pcap(self, files: "list[str]") -> str:
  99. return next(file for file in files if file.endswith(".pcap"))
  100. def compare_pcaps(self, one: str, other: str):
  101. PcapComparator().compare_files(self.ID2T_PATH + "/" + one, self.ID2T_PATH + "/" + other)
  102. def print_warning(self, *text):
  103. print(*text, file=sys.stderr)
  104. def random_id2t_params(self):
  105. """
  106. :return: A list of parameter-lists for id2t, useful if you want several
  107. iterations
  108. """
  109. param_list = []
  110. for i in range(self.NUM_ITERATIONS):
  111. param_list.append(self.random_id2t_param_set())
  112. return param_list
  113. def random_id2t_param_set(self):
  114. """
  115. Create a list of parameters to call the membersmgmtcommattack with
  116. :return: a list of command-line parameters
  117. """
  118. param = lambda key, val: "%s=%s" % (str(key), str(val))
  119. number_of_keys = min(random.randint(2, 5), len(ID2T_PARAMETER_GENERATORS))
  120. keys = random.sample(list(ID2T_PARAMETER_GENERATORS), number_of_keys)
  121. params = []
  122. for key in keys:
  123. generator = ID2T_PARAMETER_GENERATORS[key]
  124. params.append(param(key, generator()))
  125. return params
  126. if __name__ == "__main__":
  127. import sys
  128. # parameters for this program are interpreted as id2t-parameters
  129. id2t_args = sys.argv[1:]
  130. comparison = PcapComparison("test_determinism")
  131. if id2t_args: comparison.set_id2t_params([id2t_args])
  132. suite = unittest.TestSuite()
  133. suite.addTest(comparison)
  134. unittest.TextTestRunner().run(suite)