""" This module starts the probe response attack UI. Use --help to show possible parameters """ import cmd import sys import argparse import logging import os import inspect import netifaces import time from report_fetcher import ReportFetcher from attack_logic import ProbeResponseAttackLogic is_simulation = True # Logging config logging.basicConfig(format="%(levelname)s (%(funcName)s): %(message)s") logger = logging.getLogger("pra_framework") logger.setLevel(logging.DEBUG) # TODO: change for testing if not is_simulation: dirname_save = time.strftime("attack_%Y_%m_%d__%H_%M_%S") + "/" else: dirname_save = "output_dir_testing" + "/" if not os.path.exists(dirname_save): logging.debug("creating new directory to save results: %s" % dirname_save) try: os.mkdir(dirname_save) except Exception as ex_createdir: print(ex_createdir) # Log to file, logging to console is already enabled fileHandler = logging.FileHandler(dirname_save + "/framework.log") logFormatter = logging.Formatter("%(asctime)s %(levelname)s (%(funcName)s): %(message)s", datefmt="%d/%m/%Y %I:%M:%S") fileHandler.setFormatter(logFormatter) logger.addHandler(fileHandler) class AttackUI(cmd.Cmd): """ Command line UI to control the attack framework. """ intro = """\n ____ ____ _____ ____ _ ____ _____ | _ \| _ \| ____| _ \ / \ | _ \| ____| | |_) | |_) | _| | |_) / _ \ | |_) | _| | __/| _ <| |___| __/ ___ \| _ <| |___ |_| |_| \_\_____|_| /_/ \_\_| \_\_____| ########################################################### # Welcome to PREPARE, the Probe REsPonse Attack fRamEwork # ########################################################### Type 'help' for available commands. Use tab for auto-completion. """ promt = ">" file = None def __init__(self, parameters): super().__init__() try: mac_iface = netifaces.ifaddresses(parameters.interface)[17][0]["addr"] ip_for_iface = netifaces.ifaddresses(parameters.interface)[2][0]["addr"] self.attacklogic = ProbeResponseAttackLogic(interface_name=parameters.interface, ip_src=ip_for_iface, mac_source=mac_iface, mac_gw=parameters.macgw, rate_kbit_per_s=parameters.rate, marker_encoding=parameters.encoding, markerbits_value=parameters.markervaluebits, markerbits_checksum=parameters.checksumbits, rate=parameters.rate, base_dir_save=dirname_save, base_dir_zmap="../zmap", report_fetcher_classname=parameters.reportfetcherclass, use_feedback_ips=parameters.usefeedbackips, use_plus1=parameters.useplus1, is_simulation=is_simulation, _ip_stage1="0.0.0.0", _cidr_bits_stage1=0) except Exception as ex: raise FileExistsError("could not initialize logic: %r" % ex) def do_start(self, _): """ Start the probe response attack """ logging.info("starting attack") try: self.attacklogic.start() except Exception as ex: print(ex) def do_stop(self, _): """ Stop the probe response attack """ logging.info("stopping attack") self.attacklogic.stop() def do_setreportfetcher(self, _): """ Set a new reportfetcher to fetch report events of the attacked CIDS. """ def check(clz_search): return inspect.isclass(clz_search) and issubclass(clz_search, ReportFetcher) and clz_search != ReportFetcher classes = inspect.getmembers(sys.modules["report_fetcher"], check) print("available classes (chose 0-%d):" % (len(classes) - 1)) for cnt, clz in enumerate(classes): print("%d: %s" % (cnt, clz[1].__name__)) try: index = int(input("\nNew report fetcher class: ")) self.attacklogic.set_report_fetcher_class(classes[index][0]) except: print("did not understand, won't change anything") def do_setrate(self, rate): """ Change the probe rate of the scanner (Kbit/s) """ try: rate = int(rate) logger.info("New attack rate: %d Kbit/s" % rate) self.attacklogic.rate_kbit_per_s = rate except: print("did not understand, won't change anything") def do_setverbosity(self, _): """ Change the frameworks log verbosity (0=WARNING, 1=INFO, 2=DEBUG) """ try: level = int(input("\nNew log level 0-2:")) levels = [logging.WARNING, logging.INFO, logging.DEBUG] logger.setLevel(levels[level]) except: print("did not understand, won't change anything") def do_stats(self, _): """ Show basic statistics about the attack. """ logging.info("Showing statistics:") logger.info("Attack durations, groups, addresses (1st = root, last = identified):") seconds_total = 0 identified_group_index = 0 try: for groupstore in self.attacklogic.iterations: start = groupstore[0][0] end = groupstore[0][1] diff = 0 if end == 0 else (end - start) seconds_total += diff groups = groupstore[2] addresses = groupstore[3] identified_groups = 0 identified_addr = 0 if identified_group_index > 0: try: # group store 2 (iteration) = group handler iteration 1 identified_groups = sum([1 for gti in self.attacklogic.grouphandler.identified_groups if gti[2] == identified_group_index]) identified_addr = sum([gti[0].amount_addresses for gti in self.attacklogic.grouphandler.identified_groups if gti[2] == identified_group_index]) except: pass identified_group_index += 1 logger.info("%d -> %d (diff: %d), groups: %d, addresses: %d, identified g: %d, identified a: %d" % (start, end, diff, groups, addresses, identified_groups, identified_addr)) logger.info("seconds total: %d" % seconds_total) logger.info("probes total: [first iteration] + %d" % self.attacklogic.get_amount_of_probes()) logger.info("identified groups: %d" % len(self.attacklogic.grouphandler.identified_groups)) logger.info("identified addresses: %d" % sum([gti[0].amount_addresses for gti in self.attacklogic.grouphandler.identified_groups])) except Exception as ex: logger.warning("try again on less activity") logger.warning(ex) def do_quit(self, _): """ Shutdown the attack framework. """ sys.exit(0) if __name__ == "__main__": parser = argparse.ArgumentParser() parser.add_argument("-i", "--interface", help="Interface to send on packets", required=True) parser.add_argument("-m", "--macgw", help="MAC address of gateway", required=True) parser.add_argument("-r", "--rate", type=int, help="Rate used to send packets (Kbit/s)", default=1000) parser.add_argument("-e", "--encoding", type=int, help="Encoding to be used. Single or bit-OR combination (1=dst port," "2=src IP, 4=src port)", default=5) # WARNING: setting marker values too low will make the attack impossible parser.add_argument("-b", "--markervaluebits", type=int, help="Amount of bits to be used for markervalue", default=28) parser.add_argument("-c", "--checksumbits", type=int, help="Amount of bits to be used for checksum", default=4) parser.add_argument("-f", "--reportfetcherclass", help="Name of report fetcher to be used" " (class name contained in report_fetcher.py)", default="TracingReportFetcher") parser.add_argument("-l", "--useplus1", type=bool, help="Use +1 groups", default=False) parser.add_argument("-g", "--usefeedbackips", type=bool, help="Use scanner feedback for group clustering", default=False) args = parser.parse_args() AttackUI(args).cmdloop()