123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181 |
- import os
- import sys
- import shutil
- from ID2TLib.AttackController import AttackController
- from ID2TLib.LabelManager import LabelManager
- from ID2TLib.PcapFile import PcapFile
- from ID2TLib.Statistics import Statistics
- from ID2TLib.AttackContext import AttackContext
- class Controller:
- def __init__(self, in_pcap_file_path: str, do_extra_tests: bool, out_pcap_file_path):
- """
- Creates a new Controller, acting as a central coordinator for the whole application.
- :param pcap_file_path:
- """
- # Fields
- self.pcap_src_path = in_pcap_file_path.strip()
- if out_pcap_file_path:
- self.pcap_out_path = out_pcap_file_path.strip()
- else:
- self.pcap_out_path = None
- self.pcap_dest_path = ''
- self.written_pcaps = []
- self.do_extra_tests = do_extra_tests
- # Initialize class instances
- print("Input file: %s" % self.pcap_src_path)
- self.pcap_file = PcapFile(self.pcap_src_path)
- self.label_manager = LabelManager(self.pcap_src_path)
- self.statistics = Statistics(self.pcap_file)
- self.statistics.do_extra_tests = self.do_extra_tests
- self.statisticsDB = self.statistics.get_statistics_database()
- self.attack_controller = AttackController(self.pcap_file, self.statistics, self.label_manager)
- def load_pcap_statistics(self, flag_write_file: bool, flag_recalculate_stats: bool, flag_print_statistics: bool):
- """
- Loads the PCAP statistics either from the database, if the statistics were calculated earlier, or calculates
- the statistics and creates a new database.
- :param flag_write_file: Writes the statistics to a file.
- :param flag_recalculate_stats: Forces the recalculation of statistics.
- :param flag_print_statistics: Prints the statistics on the terminal.
- :return: None
- """
- self.statistics.load_pcap_statistics(flag_write_file, flag_recalculate_stats, flag_print_statistics)
- def process_attacks(self, attacks_config: list):
- """
- Creates the attack based on the attack name and the attack parameters given in the attacks_config. The
- attacks_config is a list of attacks, e.g.
- [['PortscanAttack', 'ip.src="192.168.178.2",'dst.port=80'],['PortscanAttack', 'ip.src="10.10.10.2"]].
- Merges the individual temporary attack pcaps into one single pcap and merges this single pcap with the
- input dataset.
- :param attacks_config: A list of attacks with their attack parameters.
- """
- # context for the attack(s)
- context = AttackContext()
- # note if new xml file has been created by MembersMgmtCommAttack
- created_xml = None
- # load attacks sequentially
- for attack in attacks_config:
- # check if new xml file has been created by MembersMgmtCommAttack
- if attack[0] == "MembersMgmtCommAttack":
- for param in attack[1:]:
- key, value = param.split("=")
- if key == "file.csv":
- if os.path.isfile(value):
- created_xml, _ = os.path.splitext(value)
- created_xml += ".xml"
- break
- temp_attack_pcap = self.attack_controller.process_attack(attack[0], attack[1:], context)
- self.written_pcaps.append(temp_attack_pcap)
- # merge attack pcaps to get single attack pcap
- if len(self.written_pcaps) > 1:
- print("\nMerging temporary attack pcaps into single pcap file...", end=" ")
- sys.stdout.flush() # force python to print text immediately
- for i in range(0, len(self.written_pcaps) - 1):
- attacks_pcap = PcapFile(self.written_pcaps[i])
- attacks_pcap_path = attacks_pcap.merge_attack(self.written_pcaps[i + 1])
- os.remove(self.written_pcaps[i + 1]) # remove merged pcap
- print("done.")
- else:
- attacks_pcap_path = self.written_pcaps[0]
- # merge single attack pcap with all attacks into base pcap
- print("Merging base pcap with single attack pcap...", end=" ")
- sys.stdout.flush() # force python to print text immediately
- # cp merged PCAP to output path
- self.pcap_dest_path = self.pcap_file.merge_attack(attacks_pcap_path)
- if self.pcap_out_path:
- if not self.pcap_out_path.endswith(".pcap"):
- self.pcap_out_path += ".pcap"
- os.rename(self.pcap_dest_path, self.pcap_out_path)
- self.pcap_dest_path = self.pcap_out_path
- print("done.")
- # delete intermediate PCAP files
- print('Deleting intermediate attack pcap...', end=" ")
- sys.stdout.flush() # force python to print text immediately
- os.remove(attacks_pcap_path)
- print("done.")
- # write label file with attacks
- self.label_manager.write_label_file(self.pcap_dest_path)
- # if MembersMgmtCommAttack created an xml file, move it into input pcap directory
- if created_xml:
- pcap_dir = os.path.splitext(self.pcap_dest_path)[0]
- if "/" in pcap_dir:
- pcap_dir = "/".join(pcap_dir.split("/")[:-1])
- xml_name = os.path.splitext(created_xml)[0] + ".xml"
- if "/" in xml_name:
- xml_name = xml_name.split("/")[-1]
- new_xml_path = pcap_dir + "/" + xml_name
- os.rename(created_xml, new_xml_path)
- # pcap_base contains the name of the pcap-file without the ".pcap" extension
- pcap_base = os.path.splitext(self.pcap_dest_path)[0]
- for suffix, filename in context.get_allocated_files():
- print(filename, pcap_base + suffix)
- shutil.move(filename, pcap_base + suffix)
- context.reset()
- # print status message
- if created_xml:
- print('\nOutput files created: \n', self.pcap_dest_path, '\n', self.label_manager.label_file_path, '\n', new_xml_path)
- else:
- print('\nOutput files created: \n', self.pcap_dest_path, '\n', self.label_manager.label_file_path)
- def process_db_queries(self, query, print_results=False):
- """
- Processes a statistics database query. This can be a standard SQL query or a named query.
- :param query: The query as a string or multiple queries as a list of strings.
- :param print_results: Must be True if the results should be printed to terminal.
- :return: The query's result
- """
- print("Processing database query/queries...")
- if isinstance(query, list) or isinstance(query, tuple):
- for q in query:
- self.statisticsDB.process_db_query(q, print_results)
- else:
- self.statisticsDB.process_db_query(query, print_results)
- def enter_query_mode(self):
- """
- Enters into the query mode. This is a read-eval-print-loop, where the user can input named queries or SQL
- queries and the results are printed.
- """
- print("Entering into query mode...")
- print("Enter statement ending by ';' and press ENTER to send query. Exit by sending an empty query..")
- buffer = ""
- while True:
- line = input("> ")
- if line == "":
- break
- buffer += line
- import sqlite3
- if sqlite3.complete_statement(buffer):
- try:
- buffer = buffer.strip()
- self.statisticsDB.process_db_query(buffer, True)
- except sqlite3.Error as e:
- print("An error occurred:", e.args[0])
- buffer = ""
- def create_statistics_plot(self, params: str):
- """
- Plots the statistics to a file by using the given customization parameters.
- """
- if params is not None and params[0] is not None:
- params_dict = dict([z.split("=") for z in params])
- self.statistics.plot_statistics(format=params_dict['format'])
- else:
- self.statistics.plot_statistics()
|