# AttackController.py
import importlib
import sys

from Attack.AttackParameters import Parameter
from ID2TLib import LabelManager
from ID2TLib import Statistics
from ID2TLib.Label import Label
from ID2TLib.PcapFile import PcapFile

  8. class AttackController:
  9. def __init__(self, pcap_file: PcapFile, statistics_class: Statistics, label_manager: LabelManager):
  10. """
  11. Creates a new AttackController. The controller manages the attack injection, including the PCAP writing.
  12. :param statistics_class:
  13. """
  14. self.statistics = statistics_class
  15. self.pcap_file = pcap_file
  16. self.label_mgr = label_manager
  17. self.current_attack = None
  18. self.added_attacks = []
  19. self.seed = None
  20. def set_seed(self, seed: int):
  21. self.seed = seed
  22. def create_attack(self, attack_name: str, seed=None):
  23. """
  24. Creates dynamically a new class instance based on the given attack_name.
  25. :param attack_name: The name of the attack, must correspond to the attack's class name.
  26. :param seed: random seed for param generation
  27. :return: None
  28. """
  29. print("\nCreating attack instance of \033[1m" + attack_name + "\033[0m")
  30. # Load attack class
  31. attack_module = importlib.import_module("Attack." + attack_name)
  32. attack_class = getattr(attack_module, attack_name)
  33. # Instantiate the desired attack
  34. self.current_attack = attack_class()
  35. # Initialize the parameters of the attack with defaults or user supplied values.
  36. self.current_attack.set_statistics(self.statistics)
  37. if seed is not None:
  38. self.current_attack.set_seed(seed=seed)
  39. self.current_attack.init_params()
  40. # Record the attack
  41. self.added_attacks.append(self.current_attack)
  42. def process_attack(self, attack: str, params: str):
  43. """
  44. Takes as input the name of an attack (classname) and the attack parameters as string. Parses the string of
  45. attack parameters, creates the attack by writing the attack packets and returns the path of the written pcap.
  46. :param attack: The classname of the attack to injecect.
  47. :param params: The parameters for attack customization, see attack class for supported params.
  48. :return: The file path to the created pcap file.
  49. """
  50. self.create_attack(attack, self.seed)
  51. print("Validating and adding attack parameters.")
  52. # Add attack parameters if provided
  53. params_dict = []
  54. if isinstance(params, list) and params:
  55. # Convert attack param list into dictionary
  56. for entry in params:
  57. params_dict.append(entry.split('='))
  58. params_dict = dict(params_dict)
  59. # Check if Parameter.INJECT_AT_TIMESTAMP and Parameter.INJECT_AFTER_PACKET are provided at the same time
  60. # if TRUE: delete Paramter.INJECT_AT_TIMESTAMP (lower priority) and use Parameter.INJECT_AFTER_PACKET
  61. if (Parameter.INJECT_AFTER_PACKET.value in params_dict) and (
  62. Parameter.INJECT_AT_TIMESTAMP.value in params_dict):
  63. print("CONFLICT: Parameters", Parameter.INJECT_AT_TIMESTAMP.value, "and",
  64. Parameter.INJECT_AFTER_PACKET.value,
  65. "given at the same time. Ignoring", Parameter.INJECT_AT_TIMESTAMP.value, "and using",
  66. Parameter.INJECT_AFTER_PACKET.value, "instead to derive the timestamp.")
  67. del params_dict[Parameter.INJECT_AT_TIMESTAMP.value]
  68. # Extract attack_note parameter, if not provided returns an empty string
  69. key_attack_note = "attack.note"
  70. attack_note = params_dict.get(key_attack_note, "")
  71. params_dict.pop(key_attack_note, None) # delete entry if found, otherwise return an empty string
  72. # Pass paramters to attack controller
  73. self.set_params(params_dict)
  74. else:
  75. attack_note = "This attack used only (random) default parameters."
  76. # Write attack into pcap file
  77. print("Generating attack packets...", end=" ")
  78. sys.stdout.flush() # force python to print text immediately
  79. total_packets, temp_attack_pcap_path = self.current_attack.generate_attack_pcap()
  80. print("done. (total: " + str(total_packets) + " pkts.)")
  81. # Store label into LabelManager
  82. l = Label(attack, self.get_attack_start_utime(),
  83. self.get_attack_end_utime(), attack_note)
  84. self.label_mgr.add_labels(l)
  85. return temp_attack_pcap_path
  86. def get_attack_start_utime(self):
  87. """
  88. :return: The start time (timestamp of first packet) of the attack as unix timestamp.
  89. """
  90. return self.current_attack.attack_start_utime
  91. def get_attack_end_utime(self):
  92. """
  93. :return: The end time (timestamp of last packet) of the attack as unix timestamp.
  94. """
  95. return self.current_attack.attack_end_utime
  96. def set_params(self, params: dict):
  97. """
  98. Sets the attack's parameters.
  99. :param params: The parameters in a dictionary: {parameter_name: parameter_value}
  100. :return: None
  101. """
  102. for param_key, param_value in params.items():
  103. self.current_attack.add_param_value(param_key, param_value)