# Controller.py (6.2 KB)
import os
import shutil
import sqlite3
import sys

from ID2TLib.AttackController import AttackController
from ID2TLib.LabelManager import LabelManager
from ID2TLib.PcapFile import PcapFile
from ID2TLib.Statistics import Statistics
  7. class Controller:
  8. def __init__(self, in_pcap_file_path: str, do_extra_tests: bool, out_pcap_file_path):
  9. """
  10. Creates a new Controller, acting as a central coordinator for the whole application.
  11. :param pcap_file_path:
  12. """
  13. # Fields
  14. self.pcap_src_path = in_pcap_file_path.strip()
  15. if out_pcap_file_path:
  16. self.pcap_out_path = out_pcap_file_path.strip()
  17. else:
  18. self.pcap_out_path = None
  19. self.pcap_dest_path = ''
  20. self.written_pcaps = []
  21. self.do_extra_tests = do_extra_tests
  22. # Initialize class instances
  23. print("Input file: %s" % self.pcap_src_path)
  24. self.pcap_file = PcapFile(self.pcap_src_path)
  25. self.label_manager = LabelManager(self.pcap_src_path)
  26. self.statistics = Statistics(self.pcap_file)
  27. self.statistics.do_extra_tests = self.do_extra_tests
  28. self.statisticsDB = self.statistics.get_statistics_database()
  29. self.attack_controller = AttackController(self.pcap_file, self.statistics, self.label_manager)
  30. def load_pcap_statistics(self, flag_write_file: bool, flag_recalculate_stats: bool, flag_print_statistics: bool):
  31. """
  32. Loads the PCAP statistics either from the database, if the statistics were calculated earlier, or calculates
  33. the statistics and creates a new database.
  34. :param flag_write_file: Writes the statistics to a file.
  35. :param flag_recalculate_stats: Forces the recalculation of statistics.
  36. :param flag_print_statistics: Prints the statistics on the terminal.
  37. :return: None
  38. """
  39. self.statistics.load_pcap_statistics(flag_write_file, flag_recalculate_stats, flag_print_statistics)
  40. def process_attacks(self, attacks_config: list):
  41. """
  42. Creates the attack based on the attack name and the attack parameters given in the attacks_config. The
  43. attacks_config is a list of attacks, e.g.
  44. [['PortscanAttack', 'ip.src="192.168.178.2",'dst.port=80'],['PortscanAttack', 'ip.src="10.10.10.2"]].
  45. Merges the individual temporary attack pcaps into one single pcap and merges this single pcap with the
  46. input dataset.
  47. :param attacks_config: A list of attacks with their attack parameters.
  48. """
  49. # load attacks sequentially
  50. for attack in attacks_config:
  51. temp_attack_pcap = self.attack_controller.process_attack(attack[0], attack[1:])
  52. self.written_pcaps.append(temp_attack_pcap)
  53. # merge attack pcaps to get single attack pcap
  54. if len(self.written_pcaps) > 1:
  55. print("\nMerging temporary attack pcaps into single pcap file...", end=" ")
  56. sys.stdout.flush() # force python to print text immediately
  57. for i in range(0, len(self.written_pcaps) - 1):
  58. attacks_pcap = PcapFile(self.written_pcaps[i])
  59. attacks_pcap_path = attacks_pcap.merge_attack(self.written_pcaps[i + 1])
  60. os.remove(self.written_pcaps[i + 1]) # remove merged pcap
  61. print("done.")
  62. else:
  63. attacks_pcap_path = self.written_pcaps[0]
  64. # merge single attack pcap with all attacks into base pcap
  65. print("Merging base pcap with single attack pcap...", end=" ")
  66. sys.stdout.flush() # force python to print text immediately
  67. self.pcap_dest_path = self.pcap_file.merge_attack(attacks_pcap_path)
  68. if self.pcap_out_path:
  69. if not self.pcap_out_path.endswith(".pcap"):
  70. self.pcap_out_path += ".pcap"
  71. os.rename(self.pcap_dest_path, self.pcap_out_path)
  72. self.pcap_dest_path = self.pcap_out_path
  73. print("done.")
  74. # delete intermediate PCAP files
  75. print('Deleting intermediate attack pcap...', end=" ")
  76. sys.stdout.flush() # force python to print text immediately
  77. os.remove(attacks_pcap_path)
  78. print("done.")
  79. # write label file with attacks
  80. self.label_manager.write_label_file(self.pcap_dest_path)
  81. # print status message
  82. print('\nOutput files created: \n', self.pcap_dest_path, '\n', self.label_manager.label_file_path)
  83. def process_db_queries(self, query, print_results=False):
  84. """
  85. Processes a statistics database query. This can be a standard SQL query or a named query.
  86. :param query: The query as a string or multiple queries as a list of strings.
  87. :param print_results: Must be True if the results should be printed to terminal.
  88. :return: The query's result
  89. """
  90. print("Processing database query/queries...")
  91. if isinstance(query, list) or isinstance(query, tuple):
  92. for q in query:
  93. self.statisticsDB.process_db_query(q, print_results)
  94. else:
  95. self.statisticsDB.process_db_query(query, print_results)
  96. def enter_query_mode(self):
  97. """
  98. Enters into the query mode. This is a read-eval-print-loop, where the user can input named queries or SQL
  99. queries and the results are printed.
  100. """
  101. print("Entering into query mode...")
  102. print("Enter statement ending by ';' and press ENTER to send query. Exit by sending an empty query..")
  103. buffer = ""
  104. while True:
  105. line = input("> ")
  106. if line == "":
  107. break
  108. buffer += line
  109. import sqlite3
  110. if sqlite3.complete_statement(buffer):
  111. try:
  112. buffer = buffer.strip()
  113. self.statisticsDB.process_db_query(buffer, True)
  114. except sqlite3.Error as e:
  115. print("An error occurred:", e.args[0])
  116. buffer = ""
  117. def create_statistics_plot(self, params: str):
  118. """
  119. Plots the statistics to a file by using the given customization parameters.
  120. """
  121. if params is not None and params[0] is not None:
  122. params_dict = dict([z.split("=") for z in params])
  123. self.statistics.plot_statistics(format=params_dict['format'])
  124. else:
  125. self.statistics.plot_statistics()