""" Module to handle report fetching. Requirements: - socketIO_client """ import requests import time import re import logging import threading import os from collections import defaultdict split_tab = re.compile("\t").split split_newline = re.compile("\r?\n+").split pattern_html = re.compile("<.+>") from socketIO_client import SocketIO import utility STATE_INITIATED = 0 # state when ReportFetcher was initiated STATE_BEFORE_SCANNING = 1 # state when before_scanning() was called STATE_AFTER_SCANNING = 2 # state when after_scanning() was called logging.basicConfig(format="%(levelname)s (%(funcName)s): %(message)s") logger = logging.getLogger("pra_framework") logger.setLevel(logging.DEBUG) class ReportFetcher(object): """ Handles report fetching on a CIDS. All attack events have to be added using add_entry(ip, port_src, port_dst) until the end of the method after_scanning(). """ def __init__(self, add_entry_callback, filename_save): """ add_entry_callback -- callback to be used to count/reassemble markers. method-signature: callback(ip_source (string), port_source (int), port_dest(int)) filename_save -- filename to save report information """ self._state = STATE_INITIATED self.add_entry = add_entry_callback self._filename_save = filename_save def before_scanning(self, *args): """ Initiate report fetching. This will be called before the actual scan starts. This can include: - Remember last report entry to skip unneeded entries - Initiate HTTP connection to collect attack entries args -- any additional parameters (just for debugging purposes) """ if self._state not in [STATE_INITIATED, STATE_AFTER_SCANNING]: raise Exception("Wrong state: %d" % self._state) self._state = STATE_BEFORE_SCANNING def after_scanning(self): """ Execute all actions tied to finishing the report fetching. This will be called after a scan has finished. """ if self._state != STATE_BEFORE_SCANNING: raise Exception("Wrong state: %d" % self._state) self._state = STATE_AFTER_SCANNING class TracingReportFetcher(ReportFetcher): """ Report fetcher implementation for TraCINg CIDS. Note: There is a bug which lets TraCINg setting port 0 and 65535 to "" (empty value). """ def __init__(self, addentry_callback, filename_save, #tracing_domain="ssi.cased.de"): tracing_domain="localhost"): super().__init__(addentry_callback, filename_save) self._tracing_domain = tracing_domain self._wait_thread = None self._socketio_connection = None self._fd_savereport = None def before_scanning(self): super().before_scanning() logger.debug("starting to listen to TraCINg over Socket.IO, domain: %s" % self._tracing_domain) socketio_connection = SocketIO(self._tracing_domain, 80) self._socketio_connection = socketio_connection # check for file before creating it add_header = not os.path.isfile(self._filename_save) self._fd_savereport = open(self._filename_save, "a") if add_header: # file did not yet exist, append header self._fd_savereport.write("date\tmonitor\ttype\tip_src\tip_dst\tport_src\tport_dst\n") self._fd_savereport.flush() def incident_callback(*incidents): # logger.debug("got INCIDENTS: %d" % len(incidents)) # we've got source and destination IP adress / "officially" only information # about source/target city. As using these information (IP address or city) # would be far too easy we leave those information out. for incident in incidents: # take external monitor ip if present monitor_ip = incident["external_ip"] if "external_ip" in incident else incident["dst"]["ip"] #logger.debug("Monitor IP extern/normal: %r/%r" % (monitor_ip, incident["dst"]["ip"])) #pprint.pprint(incident) # just react on events 10=Transport Layer, 11=Portscan try: if incident["type"] in [10, 11]: port_src = int(incident["src"]["port"]) port_dst = int(incident["dst"]["port"]) #logger.debug("adding entry for IP: %s, port src=%d, port dst=%d" % (monitor_ip, port_src, port_dst)) self.add_entry(None, port_src, port_dst) except Exception as ex: logger.warning("problem on extracting TraCINg data: %r" % ex) try: # date, sensor, ip src, ip dst, port src, port dst entry = "%s\t%s\t%s\t%s\t%s\t%s\t%s\n" % ( incident["date"], incident["sensorname"], incident["type"], incident["src"]["ip"], monitor_ip, incident["src"]["port"], incident["dst"]["port"] ) # logger.debug("new entry is: %r" % entry) #self._fd_savereport.write(entry) #self._fd_savereport.flush() except Exception as ex: logger.warning(ex) #logger.debug(self._fetched_ips) socketio_connection.on("markIncident", incident_callback) self._wait_thread = threading.Thread(target=socketio_connection.wait) # logger.debug("starting event handler thread") self._wait_thread.start() # jsut for savety: make sure we can handle incoming events time.sleep(1) def after_scanning(self): # make sure all events got catched logger.info("waiting to catch all events") #time.sleep(10) #time.sleep(30) #time.sleep(60) #time.sleep(180) #time.sleep(300) time.sleep(600) super().after_scanning() logger.debug("disconnecting socket io") self._socketio_connection.disconnect() if self._fd_savereport is not None: self._fd_savereport.close() PATTERN_TABSPLIT = re.compile("\t+") PATTERN_NEWLINESPLIT = re.compile("\r?\n") class DShieldReportFetcher(ReportFetcher): """ Report Fetcher for the DShield CIDS. Report can be fetched using the following URL: https://www.dshield.org/livefeed.html?day=today&start=463960503000&count=1000 Parameters: day - The day to fetch a report for start - if of an attack event to start with count - amount of events to download TODO: DShield events doensn't seem to be placed in order, not even on a daily accuracy. """ # day format: YYYY-MM-DD DSHIELD_URL_DAY_START_COUNT = "http://isc.sans.edu/livefeed.html?day=%s&start=%s&count=%d" FETCH_MAX_ENTRIES_PER_REQUEST = 30000 def __init__(self, addentry_callback, filename_save, report_day=None, use_source_ip_filter=True, # reports seem to emerge after max 6 hours sleeptime=60 * 60 * 12): #sleeptime=10): """ use_source_ip_filter -- Skip everything not matching our external IP address. This implies NOT using spoofed addresses """ super().__init__(addentry_callback, filename_save) self._event_id_start = None # only fetch that specific day self._report_day = report_day self._report_day_is_implicit = True if report_day is None else False self._use_source_ip_filter = use_source_ip_filter self._current_external_source_ip = None self._sleeptime = sleeptime def before_scanning(self): super().before_scanning() # DShield seems to auto adjust the id if it's too low for the specified date. # A too high id lead to an empty result. # initial starting: set initial event id if self._event_id_start is None: current_event_id = "1" # use this to fetch older reports if self._report_day is None: self._report_day = time.strftime("%Y-%m-%d") logger.debug("setting implicit report day: %s" % self._report_day) current_event_id = self._get_newest_id(self._report_day) logger.info("using initial id: %s" % current_event_id) self._event_id_start = current_event_id if self._use_source_ip_filter: logger.debug("retrieving external IP address") try: self._current_external_source_ip = self.ip_to_dshield_ip(utility.get_external_ip()) logger.debug("normalized DShield IP address: %s" % self._current_external_source_ip) except Exception: # no external IP set, non IP based filtering will be used pass @staticmethod def ip_to_dshield_ip(ip): """ Convert an IP address to an DShield style IP address having padded zeros: "1.2.3.4" -> "001.002.003.004" """ ip_ints = ip.split(".") return "%03d.%03d.%03d.%03d" % (int(ip_ints[0]), int(ip_ints[1]), int(ip_ints[2]), int(ip_ints[3])) def _get_newest_id(self, day): logger.debug("fetching newest event ID") event_id_to_return = 1 event_id_to_test = 1 event_id_step = 1000000 report_full = "" while report_full is not None: logger.debug("testing ID: %r" % event_id_to_test) event_id_to_test += event_id_step report_full, lines, event_id_report = self._fetch_report(day, event_id_to_test, amount=50) if event_id_report is not None: #logger.debug("updating ids: %r/%r/%r" % (event_id_report, event_id_to_test,event_id_to_return )) event_id_to_test = int(event_id_report) event_id_to_return = event_id_to_test logger.debug("got freshest id: %s (+ max %d)" % (event_id_to_return, event_id_step)) return event_id_to_return @staticmethod def _fetch_report(day, event_id, amount): if day is None: day = "today" url = DShieldReportFetcher.DSHIELD_URL_DAY_START_COUNT % ( day, event_id, DShieldReportFetcher.FETCH_MAX_ENTRIES_PER_REQUEST) try: request = requests.get(url) except Exception as e: logger.warning("could not fetch: %s, connection problems? %r" % (url, e)) return None, None, None lines = split_newline(request.text) if len(lines) < 3: # the last part of the report should have 2 lines left (header, newline) logger.debug("No events left") return None, None, None elif len(lines) < amount - 1: # length is greater/equal 3 logger.debug("sanity checking if this is a valid report") logger.debug("first/second line:\n%s\n%s" % (lines[0], lines[1])) search = pattern_html.search(request.text) if search is not None: logger.warning("reaponse seem to contain html: %s, stopping to fetch, current url: %s" % (search.group(0), url)) return None, None, None try: return request.text, lines, int(split_tab(lines[-2])[0]) except ValueError: # last report ID could not be extracted return None, None, None def after_scanning(self): super().after_scanning() days_to_check = [self._report_day] logger.info("waiting %d seconds as DShield reports don't appear instantly" % self._sleeptime) time.sleep(self._sleeptime) current_day = time.strftime("%Y-%m-%d") # assume max 1 day has passed after calling before_scanning() if self._report_day != current_day and self._report_day_is_implicit: # assume that attack time is less than two days logger.warn("initial day is not current day, min 1 day has been passed") days_to_check.append(current_day) # update to newest day for next possible iteration self._report_day = current_day current_event_id = self._event_id_start # this should be enough max_downloads = 1000 #max_downloads = 10 fd_fullsave = open(self._filename_save, "a") logger.debug("fetching the following days: %r" % days_to_check) for day in days_to_check: logger.debug("downloading all events starting from day=%s/ID=%s" % (day, current_event_id)) amount_downloads = 0 while amount_downloads < max_downloads: logger.info("requesting DShield report, day=%s, ID=%s, amount=%d" % (day, current_event_id, DShieldReportFetcher.FETCH_MAX_ENTRIES_PER_REQUEST)) report_full, lines, current_event_id = self._fetch_report(day, current_event_id, DShieldReportFetcher.FETCH_MAX_ENTRIES_PER_REQUEST) if report_full is None: break fd_fullsave.write(report_full) cnt = 0 # parse all events while cnt < len(lines) - 1: # this skip header: start at 1 cnt += 1 columns = split_tab(lines[cnt]) if len(columns) > 5: try: # skip every source IP address not matching our external IP address if self._current_external_source_ip is not None and self._current_external_source_ip != columns[3]: continue self.add_entry(columns[3], int(columns[4]), int(columns[5])) except Exception: pass # these are mainly errors based on missing data logger.warning("some error occured, ip/sport/dport: %s/%s/%s" % (columns[3], columns[5], columns[4])) #logger.warning(ex) #if cnt < 10: # logger.debug("decoded IP: %r" % columns[3]) else: pass # event entry is too short, report seems to contain scrambled data #logger.warning("event entry is too short: %r, skipping" % lines[cnt]) # start from last fetched ID, take line "-2" as "-1" is empty amount_downloads += 1 self._event_id_start = current_event_id logger.debug("all days fetched") fd_fullsave.close() def fetch_dshield(filename_save, report_day=None): #fetched_entries = defaultdict(int) def addentry_callback(ip_src, port_src, port_dest): try: pass #entry = "%s__%d__%d" % (ip_src, int(port_src), int(port_dest)) #fetched_entries[entry] += 1 except: return None reportfetcher = DShieldReportFetcher(addentry_callback, filename_save, report_day=report_day, sleeptime=1) reportfetcher.before_scanning() reportfetcher.after_scanning() def fetch_tracing(filename_save): def addentry_callback(ip_src, port_src, port_dest): try: pass #entry = "%s__%d__%d" % (ip_src, int(port_src), int(port_dest)) #fetched_entries[entry] += 1 except: return None reportfetcher = TracingReportFetcher(addentry_callback, filename_save) reportfetcher.before_scanning() #reportfetcher.after_scanning() import queue if __name__ == "__main__1": dir_report = "./raw_data_dshield/" queue_inout = queue.Queue(999999) year = 2014 month = 3 amount_parallel_fetches = 1 dates = ["%d-%02d-%02d" % (year, month, day) for day in range(15, 16)] for datestr_in in dates: queue_inout.put(datestr_in) def fetch_cycler(): while True: datestr = queue_inout.get(block=True) filename_save = dir_report + "dshield_report_" + datestr.replace("-", "_") + ".csv" logger.debug("file/day: %s/%s" % (filename_save, datestr)) fetch_dshield(filename_save, report_day=datestr) for x in range(amount_parallel_fetches): thread = threading.Thread(target=fetch_cycler) thread.start() #filename_save = dir_report + "dshield_report_test.csv" #fetch_dshield(filename_save, report_day=None) if __name__ == "__main__": dir_report = "./raw_data_tracing" fetch_tracing(dir_report + "/attackdata.csv")