AttackController.py 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136
  1. import importlib
  2. import tempfile
  3. from scapy.utils import PcapWriter
  4. import Attack
  5. from Attack.AttackParameters import Parameter
  6. from ID2TLib import LabelManager
  7. from ID2TLib import Statistics
  8. from ID2TLib.Label import Label
  9. from ID2TLib.PcapFile import PcapFile
  10. class AttackController:
  11. def __init__(self, pcap_file: PcapFile, statistics_class: Statistics, label_manager: LabelManager):
  12. """
  13. Creates a new AttackController. The controller manages the attack injection, including the PCAP writing.
  14. :param statistics_class:
  15. """
  16. self.statistics = statistics_class
  17. self.pcap_file = pcap_file
  18. self.label_mgr = label_manager
  19. self.available_attacks = Attack.__all__
  20. self.current_attack = None
  21. self.added_attacks = []
  22. self.written_pcaps = []
  23. # The PCAP where the attack should be injected into
  24. self.base_pcap = self.statistics.pcap_filepath
  25. def write_attack_pcap(self):
  26. """
  27. Writes the attack's packets into a PCAP file with a temporary filename.
  28. :return: The path of the written PCAP file.
  29. """
  30. packets = self.current_attack.get_packets()
  31. # Write packets into pcap file
  32. temp_pcap = tempfile.NamedTemporaryFile(delete=False)
  33. pktdump = PcapWriter(temp_pcap.name)
  34. pktdump.write(packets)
  35. # Store pcap path and close file objects
  36. pcap_path = temp_pcap.name
  37. pktdump.close()
  38. temp_pcap.close()
  39. return pcap_path
  40. def create_attack(self, attack_name: str):
  41. """
  42. Creates dynamically a new class instance based on the given attack_name.
  43. :param attack_name: The name of the attack, must correspond to the attack's class name.
  44. :return: None
  45. """
  46. print("\nCreating attack instance of \033[1m" + attack_name + "\033[0m")
  47. # Load attack class
  48. attack_module = importlib.import_module("Attack." + attack_name)
  49. attack_class = getattr(attack_module, attack_name)
  50. # Set current attack
  51. self.current_attack = attack_class(self.statistics, self.base_pcap)
  52. self.added_attacks.append(self.current_attack)
  53. def process_attack(self, attack: str, params: str):
  54. """
  55. :param attack:
  56. :param params:
  57. :return:
  58. """
  59. self.create_attack(attack)
  60. # Add attack parameters if provided
  61. print("Validating and adding attack parameters.")
  62. params_dict = []
  63. if params is not None:
  64. # Convert attack param list into dictionary
  65. for entry in params:
  66. params_dict.append(entry.split('='))
  67. params_dict = dict(params_dict)
  68. # Check if Parameter.INJECT_AT_TIMESTAMP and Parameter.INJECT_AFTER_PACKET are provided at the same time
  69. # if TRUE: delete Paramter.INJECT_AT_TIMESTAMP (lower priority) and use Parameter.INJECT_AFTER_PACKET
  70. if (Parameter.INJECT_AFTER_PACKET.value in params_dict) and (
  71. Parameter.INJECT_AT_TIMESTAMP.value in params_dict):
  72. print("CONFLICT: Parameters", Parameter.INJECT_AT_TIMESTAMP.value, "and",
  73. Parameter.INJECT_AFTER_PACKET.value,
  74. "given at the same time. Ignoring", Parameter.INJECT_AT_TIMESTAMP.value, "and using",
  75. Parameter.INJECT_AFTER_PACKET.value, "instead to derive the timestamp.")
  76. del params_dict[Parameter.INJECT_AT_TIMESTAMP.value]
  77. # Extract attack_note parameter, if not provided returns an empty string
  78. key_attack_note = "attack.note"
  79. attack_note = params_dict.get(key_attack_note, "")
  80. params_dict.pop(key_attack_note, None) # delete entry if found, otherwise return an empty string
  81. # Pass paramters to attack controller
  82. self.set_params(params_dict)
  83. else:
  84. attack_note = ""
  85. # Write attack into pcap file
  86. temp_attack_pcap_path = self.write_attack_pcap()
  87. # Merge attack with existing pcap
  88. pcap_dest_path = self.pcap_file.merge_attack(temp_attack_pcap_path)
  89. self.written_pcaps.append(pcap_dest_path)
  90. # Store label into LabelManager
  91. l = Label(attack, self.get_attack_start_utime(),
  92. self.get_attack_end_utime(), attack_note)
  93. self.label_mgr.add_labels(l)
  94. return pcap_dest_path
  95. def get_attack_start_utime(self):
  96. """
  97. :return: The start time (timestamp of first packet) of the attack as unix timestamp.
  98. """
  99. return self.current_attack.attack_start_utime
  100. def get_attack_end_utime(self):
  101. """
  102. :return: The end time (timestamp of last packet) of the attack as unix timestamp.
  103. """
  104. return self.current_attack.attack_end_utime
  105. def set_params(self, params: dict):
  106. """
  107. Sets the attack's parameters.
  108. :param params: The parameters in a dictionary: {parameter_name: parameter_value}
  109. :return: None
  110. """
  111. for param_key, param_value in params.items():
  112. self.current_attack.add_param_value(param_key, param_value)