3
0

test_determinism_mmcomm.py 6.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186
  1. #!/usr/bin/python3
  2. import sys, os
  3. import subprocess, shlex
  4. import time
  5. import unittest
  6. import random
  7. from Test.TestUtil import PcapComparator, ID2TExecution
  8. # this dictionary holds the generators (functions) for the parameters
  9. # that will be passed to the MembershipMgmtCommAttack
  10. # items need the parameter-name as key and a function that will be called
  11. # without parameters and returns a valid value for that parameter as value
  12. # WARNING: parameters will be passed via command line, make sure your values
  13. # get converted to string correctly
  14. _random_bool = lambda: random.random() < 0.5
  15. ID2T_PARAMETER_GENERATORS = {
  16. "bots.count": lambda: random.randint(1, 6),
  17. "hidden_mark": _random_bool,
  18. "interval.selection.end": lambda: random.randint(100, 1501), # values are taken from default trace
  19. "interval.selection.start": lambda: random.randint(0, 1401),
  20. "interval.selection.strategy": lambda: random.choice(["optimal", "custom", "random"]),
  21. "ip.reuse.external": lambda: random.uniform(0, 1),
  22. "ip.reuse.local": lambda: random.uniform(0, 1),
  23. "ip.reuse.total": lambda: random.uniform(0, 1),
  24. "multiport": _random_bool,
  25. "nat.present": _random_bool,
  26. "packet.padding": lambda: random.randint(0, 100),
  27. "packets.limit": lambda: random.randint(50, 250),
  28. "ttl.from.caida": _random_bool,
  29. }
  30. class PcapComparison(unittest.TestCase):
  31. ID2T_PATH = ".."
  32. ID2T_LOCATION = ID2T_PATH + "/" + "id2t"
  33. NUM_ITERATIONS_PER_PARAMS = 3
  34. NUM_ITERATIONS = 4
  35. PCAP_ENVIRONMENT_VALUE = "ID2T_SRC_PCAP"
  36. SEED_ENVIRONMENT_VALUE = "ID2T_SEED"
  37. DEFAULT_PCAP = "resources/test/Botnet/telnet-raw.pcap"
  38. DEFAULT_SEED = "42"
  39. VERBOSE = False
  40. def __init__(self, *args, **kwargs):
  41. unittest.TestCase.__init__(self, *args, **kwargs)
  42. # params to call id2t with, as a list[list[str]]
  43. # do a round of testing for each list[str] we get
  44. # if none generate some params itself
  45. self.id2t_params = None
  46. self.printed_newline = False
  47. def set_id2t_params(self, params: "list[list[str]]"):
  48. self.id2t_params = params
  49. def setUp(self):
  50. self.executions = []
  51. def test_determinism(self):
  52. self.print_warning("Conducting test for determinism of Membership Management Communication Attack:\n")
  53. input_pcap = os.environ.get(self.PCAP_ENVIRONMENT_VALUE, self.DEFAULT_PCAP)
  54. seed = os.environ.get(self.SEED_ENVIRONMENT_VALUE, None)
  55. if self.id2t_params is None:
  56. self.id2t_params = self.random_id2t_params()
  57. use_random_seeds = not bool(seed)
  58. for i, params in enumerate(self.id2t_params):
  59. self.print_warning("Test round %d:" % (i+1))
  60. self.print_warning("=================================")
  61. if use_random_seeds:
  62. seed = random.randint(0, 0x7FFFFFFF)
  63. self.do_test_round(input_pcap, seed, params)
  64. self.print_warning()
  65. def do_test_round(self, input_pcap, seed, additional_params):
  66. generated_pcap = None
  67. for i in range(self.NUM_ITERATIONS_PER_PARAMS):
  68. execution = ID2TExecution(input_pcap, seed=seed)
  69. self.print_warning("The command that gets executed is:", execution.get_run_command(additional_params))
  70. self.executions.append(execution)
  71. try:
  72. execution.run(additional_params)
  73. except AssertionError as e:
  74. self.print_warning(execution.get_output())
  75. self.assertEqual(execution.get_return_code(), 0, "For some reason id2t completed with an error")
  76. raise e
  77. self.print_warning(execution.get_output())
  78. pcap = execution.get_pcap_filename()
  79. if generated_pcap is not None:
  80. if "No packets were injected." in pcap or "No packets were injected." in generated_pcap:
  81. self.assertEqual(pcap, generated_pcap)
  82. else:
  83. try:
  84. self.compare_pcaps(generated_pcap, pcap)
  85. except AssertionError as e:
  86. execution.keep_file(pcap)
  87. for ex in self.executions:
  88. try:
  89. ex.keep_file(generated_pcap)
  90. except ValueError:
  91. pass
  92. raise e
  93. else:
  94. generated_pcap = pcap
  95. self.print_warning()
  96. time.sleep(1) # let some time pass between calls because files are based on the time
  97. def tearDown(self):
  98. self.print_warning("Cleaning up files generated by the test-calls...")
  99. for id2t_run in self.executions:
  100. for file in id2t_run.get_files_for_deletion():
  101. self.print_warning(file)
  102. id2t_run.cleanup()
  103. self.print_warning("Done")
  104. if any(e.get_kept_files() for e in self.executions):
  105. self.print_warning("The following files have been kept:")
  106. for e in self.executions:
  107. for file in e.get_kept_files():
  108. self.print_warning(file)
  109. def compare_pcaps(self, one: str, other: str):
  110. PcapComparator().compare_files(self.ID2T_PATH + "/" + one, self.ID2T_PATH + "/" + other)
  111. def print_warning(self, *text):
  112. if self.VERBOSE:
  113. if not self.printed_newline:
  114. print("\n", file=sys.stderr)
  115. self.printed_newline = True
  116. print(*text, file=sys.stderr)
  117. def random_id2t_params(self):
  118. """
  119. :return: A list of parameter-lists for id2t, useful if you want several
  120. iterations
  121. """
  122. param_list = []
  123. for i in range(self.NUM_ITERATIONS):
  124. param_list.append(self.random_id2t_param_set())
  125. return param_list
  126. def random_id2t_param_set(self):
  127. """
  128. Create a list of parameters to call the membersmgmtcommattack with
  129. :return: a list of command-line parameters
  130. """
  131. param = lambda key, val: "%s=%s" % (str(key), str(val))
  132. number_of_keys = min(random.randint(2, 5), len(ID2T_PARAMETER_GENERATORS))
  133. keys = random.sample(list(ID2T_PARAMETER_GENERATORS), number_of_keys)
  134. params = []
  135. for key in keys:
  136. generator = ID2T_PARAMETER_GENERATORS[key]
  137. params.append(param(key, generator()))
  138. return params
  139. if __name__ == "__main__":
  140. import sys
  141. # parameters for this program are interpreted as id2t-parameters
  142. id2t_args = sys.argv[1:]
  143. comparison = PcapComparison("test_determinism")
  144. if id2t_args: comparison.set_id2t_params([id2t_args])
  145. suite = unittest.TestSuite()
  146. suite.addTest(comparison)
  147. unittest.TextTestRunner().run(suite)