3
0

AttackController.py 8.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213
  1. import importlib
  2. import sys
  3. import difflib
  4. import pkgutil
  5. import typing
  6. import Attack.AttackParameters as atkParam
  7. import Core.LabelManager as LabelManager
  8. import Core.Statistics as Statistics
  9. import ID2TLib.Label as Label
  10. import ID2TLib.PcapFile as PcapFile
  11. import ID2TLib.Utility as Util
  12. class AttackController:
  13. def __init__(self, pcap_file: PcapFile.PcapFile, statistics_class: Statistics, label_manager: LabelManager):
  14. """
  15. Creates a new AttackController. The controller manages the attack injection, including the PCAP writing.
  16. :param pcap_file: The source .pcap file to run the attack on.
  17. :param statistics_class: A Statistics Object.
  18. :param label_manager: A LabelManager Object.
  19. """
  20. self.statistics = statistics_class
  21. self.pcap_file = pcap_file
  22. self.label_mgr = label_manager
  23. self.current_attack = None
  24. self.added_attacks = []
  25. self.seed = None
  26. self.total_packets = 0
  27. def set_seed(self, seed: int) -> None:
  28. """
  29. Sets rng seed.
  30. :param seed: rng seed
  31. """
  32. self.seed = seed
  33. def get_seed(self) -> typing.Union[int, None]:
  34. """
  35. Gets rng seed.
  36. :return: The current rng seed
  37. """
  38. return self.seed
  39. @staticmethod
  40. def choose_attack(input_name):
  41. """"
  42. Finds the attack best matching to input_name
  43. :param input_name: The name of the attack the user put in
  44. :return: The best matching attack in case one was found
  45. """
  46. import Attack
  47. # Find all attacks, exclude some classes
  48. package = Attack
  49. available_attacks = []
  50. for _, name, __ in pkgutil.iter_modules(package.__path__):
  51. if name != 'BaseAttack' and name != 'AttackParameters':
  52. available_attacks.append(name)
  53. highest_sim = 0.0
  54. highest_sim_attack = None
  55. for attack in available_attacks:
  56. # Check if attack name is accurate
  57. if input_name == attack:
  58. return attack
  59. # Compares input with one of the available attacks
  60. # Makes comparison with lowercase version with generic endings removed
  61. # List of generic attack name endings can be found in ID2TLib.Utility
  62. counter_check = attack.lower()
  63. if not any(ending in input_name for ending in Util.generic_attack_names):
  64. counter_check = Util.remove_generic_ending(counter_check)
  65. similarity = difflib.SequenceMatcher(None, input_name.lower(), counter_check).ratio()
  66. # Exact match, return appropriate attack name
  67. if similarity == 1.0:
  68. return attack
  69. # Found more likely match
  70. if similarity > highest_sim:
  71. highest_sim = similarity
  72. highest_sim_attack = attack
  73. # Found no exactly matching attack name, print best match and exit
  74. if highest_sim >= 0.6:
  75. print('Found no attack of name ' + input_name + '. The closest match was ' + highest_sim_attack +
  76. '. Use ./id2t -l for a list of available attacks.')
  77. exit(1)
  78. # Found no reasonably matching attack name, recommend -l and exit
  79. else:
  80. print('Found no attack of name ' + input_name + ' or one similar to it.'
  81. ' Use ./id2t -l for an overview of available attacks.')
  82. exit(1)
  83. def create_attack(self, attack_name: str, seed=None):
  84. """
  85. Creates dynamically a new class instance based on the given attack_name.
  86. :param attack_name: The name of the attack, must correspond to the attack's class name.
  87. :param seed: random seed for param generation
  88. :return: None
  89. """
  90. attack_name = self.choose_attack(attack_name)
  91. print("\nCreating attack instance of \033[1m" + attack_name + "\033[0m")
  92. # Load attack class
  93. attack_module = importlib.import_module("Attack." + attack_name)
  94. attack_class = getattr(attack_module, attack_name)
  95. # Instantiate the desired attack
  96. self.current_attack = attack_class()
  97. # Initialize the parameters of the attack with defaults or user supplied values.
  98. self.current_attack.set_statistics(self.statistics)
  99. if seed is not None:
  100. self.current_attack.set_seed(seed=seed)
  101. self.current_attack.init_params()
  102. # Unset the user-specified-flag for all parameters set in init_params
  103. for k, v in self.current_attack.params.items():
  104. self.current_attack.params[k] = self.current_attack.ValuePair(v.value, False)
  105. # Record the attack
  106. self.added_attacks.append(self.current_attack)
  107. def process_attack(self, attack: str, params: str, time=False):
  108. """
  109. Takes as input the name of an attack (classname) and the attack parameters as string. Parses the string of
  110. attack parameters, creates the attack by writing the attack packets and returns the path of the written pcap.
  111. :param attack: The classname of the attack to injecect.
  112. :param params: The parameters for attack customization, see attack class for supported params.
  113. :param time: Measure packet generation time or not.
  114. :return: The file path to the created pcap file.
  115. """
  116. self.create_attack(attack, self.seed)
  117. print("Validating and adding attack parameters.")
  118. # Add attack parameters if provided
  119. params_dict = []
  120. if isinstance(params, list) and params:
  121. # Convert attack param list into dictionary
  122. for entry in params:
  123. params_dict.append(entry.split('='))
  124. params_dict = dict(params_dict)
  125. # Check if Parameter.INJECT_AT_TIMESTAMP and Parameter.INJECT_AFTER_PACKET are provided at the same time
  126. # if TRUE: delete Paramter.INJECT_AT_TIMESTAMP (lower priority) and use Parameter.INJECT_AFTER_PACKET
  127. if (atkParam.Parameter.INJECT_AFTER_PACKET.value in params_dict) and (
  128. atkParam.Parameter.INJECT_AT_TIMESTAMP.value in params_dict):
  129. print("CONFLICT: Parameters", atkParam.Parameter.INJECT_AT_TIMESTAMP.value, "and",
  130. atkParam.Parameter.INJECT_AFTER_PACKET.value,
  131. "given at the same time. Ignoring", atkParam.Parameter.INJECT_AT_TIMESTAMP.value, "and using",
  132. atkParam.Parameter.INJECT_AFTER_PACKET.value, "instead to derive the timestamp.")
  133. del params_dict[atkParam.Parameter.INJECT_AT_TIMESTAMP.value]
  134. # Extract attack_note parameter, if not provided returns an empty string
  135. key_attack_note = "attack.note"
  136. attack_note = params_dict.get(key_attack_note, "")
  137. params_dict.pop(key_attack_note, None) # delete entry if found, otherwise return an empty string
  138. # Pass paramters to attack controller
  139. self.set_params(params_dict)
  140. else:
  141. attack_note = "This attack used only (random) default parameters."
  142. # Write attack into pcap file
  143. print("Generating attack packets...", end=" ")
  144. sys.stdout.flush() # force python to print text immediately
  145. if time:
  146. self.current_attack.set_start_time()
  147. self.current_attack.generate_attack_packets()
  148. if time:
  149. self.current_attack.set_finish_time()
  150. duration = self.current_attack.get_packet_generation_time()
  151. self.total_packets, temp_attack_pcap_path = self.current_attack.generate_attack_pcap()
  152. print("done. (total: " + str(self.total_packets) + " pkts", end="")
  153. if time:
  154. print(" in ", duration, " seconds", end="")
  155. print(".)")
  156. # Store label into LabelManager
  157. label = Label.Label(attack, self.get_attack_start_utime(), self.get_attack_end_utime(),
  158. self.seed, self.current_attack.params, attack_note)
  159. self.label_mgr.add_labels(label)
  160. return temp_attack_pcap_path, duration
  161. def get_attack_start_utime(self):
  162. """
  163. :return: The start time (timestamp of first packet) of the attack as unix timestamp.
  164. """
  165. return self.current_attack.attack_start_utime
  166. def get_attack_end_utime(self):
  167. """
  168. :return: The end time (timestamp of last packet) of the attack as unix timestamp.
  169. """
  170. return self.current_attack.attack_end_utime
  171. def set_params(self, params: dict):
  172. """
  173. Sets the attack's parameters.
  174. :param params: The parameters in a dictionary: {parameter_name: parameter_value}
  175. :return: None
  176. """
  177. for param_key, param_value in params.items():
  178. self.current_attack.add_param_value(param_key, param_value)