- """
- Module to handle report fetching.
- Requirements:
- - socketIO_client
- """
- import requests
- import time
- import re
- import logging
- import threading
- import os
- from collections import defaultdict
- split_tab = re.compile("\t").split
- split_newline = re.compile("\r?\n+").split
- pattern_html = re.compile("<.+>")
- from socketIO_client import SocketIO
- import utility
- STATE_INITIATED = 0 # state when ReportFetcher was initiated
- STATE_BEFORE_SCANNING = 1 # state when before_scanning() was called
- STATE_AFTER_SCANNING = 2 # state when after_scanning() was called
- logging.basicConfig(format="%(levelname)s (%(funcName)s): %(message)s")
- logger = logging.getLogger("pra_framework")
- logger.setLevel(logging.DEBUG)


class ReportFetcher(object):
    """
    Handles report fetching on a CIDS. All attack events have to be added
    via add_entry(ip, port_src, port_dst) before after_scanning() returns.
    """
    def __init__(self, add_entry_callback, filename_save):
        """
        add_entry_callback -- callback used to count/reassemble markers.
            Method signature:
            callback(ip_source (string), port_source (int), port_dest (int))
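
            Illustrative example of a matching callback (hypothetical, for
            demonstration only):
                seen = []
                def my_callback(ip_source, port_source, port_dest):
                    seen.append((ip_source, port_source, port_dest))
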
        filename_save -- filename to save report information to
        """
        self._state = STATE_INITIATED
        self.add_entry = add_entry_callback
        self._filename_save = filename_save

    def before_scanning(self, *args):
        """
        Initiate report fetching. This will be called before the actual scan starts.
        This can include:
        - Remembering the last report entry in order to skip unneeded entries
        - Initiating an HTTP connection to collect attack entries
        args -- any additional parameters (just for debugging purposes)
        """
        if self._state not in [STATE_INITIATED, STATE_AFTER_SCANNING]:
            raise Exception("Wrong state: %d" % self._state)
        self._state = STATE_BEFORE_SCANNING

    def after_scanning(self):
        """
        Execute all actions tied to finishing the report fetching. This will be
        called after a scan has finished.
        """
        if self._state != STATE_BEFORE_SCANNING:
            raise Exception("Wrong state: %d" % self._state)
        self._state = STATE_AFTER_SCANNING


class TracingReportFetcher(ReportFetcher):
    """
    Report fetcher implementation for the TraCINg CIDS.
    Note:
    There is a bug in TraCINg that turns ports 0 and 65535 into "" (an empty value).
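
    Illustrative usage (assumes a TraCINg instance is reachable on the
    configured domain):
        fetcher = TracingReportFetcher(my_callback, "report.csv")
        fetcher.before_scanning()
        # ... perform the scan ...
        fetcher.after_scanning()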
- """
- def __init__(self, addentry_callback,
- filename_save,
- #tracing_domain="ssi.cased.de"):
- tracing_domain="localhost"):
- super().__init__(addentry_callback, filename_save)
- self._tracing_domain = tracing_domain
- self._wait_thread = None
- self._socketio_connection = None
- self._fd_savereport = None

    def before_scanning(self):
        super().before_scanning()
        logger.debug("starting to listen to TraCINg over Socket.IO, domain: %s" % self._tracing_domain)
        socketio_connection = SocketIO(self._tracing_domain, 80)
        self._socketio_connection = socketio_connection

        # check for the file before creating it
        add_header = not os.path.isfile(self._filename_save)
        self._fd_savereport = open(self._filename_save, "a")

        if add_header:
            # file did not exist yet, write the header first
            self._fd_savereport.write("date\tmonitor\ttype\tip_src\tip_dst\tport_src\tport_dst\n")
            self._fd_savereport.flush()
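        # Entries written by incident_callback below follow the same tab-separated
        # layout as this header; a sample line (values made up for illustration):
        # 2016-01-01 12:00	sensor1	10	1.2.3.4	5.6.7.8	1025	80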

        def incident_callback(*incidents):
            # logger.debug("got INCIDENTS: %d" % len(incidents))
            # We get source and destination IP addresses / "officially" only
            # information about the source/target city. As using this information
            # (IP address or city) would be far too easy, we leave it out.
            for incident in incidents:
                # take the external monitor IP if present
                monitor_ip = incident["external_ip"] if "external_ip" in incident else incident["dst"]["ip"]
                # logger.debug("Monitor IP extern/normal: %r/%r" % (monitor_ip, incident["dst"]["ip"]))

                # only react on events of type 10=Transport Layer, 11=Portscan
                port_src, port_dst = None, None

                try:
                    if incident["type"] in [10, 11]:
                        port_src = int(incident["src"]["port"])
                        port_dst = int(incident["dst"]["port"])
                        # logger.debug("adding entry for IP: %s, port src=%d, port dst=%d" % (monitor_ip, port_src, port_dst))
                        self.add_entry(None, port_src, port_dst)
                except Exception as ex:
                    logger.warning("problem on extracting TraCINg data: %r" % ex)
                    logger.warning("ports: %r %r" % (port_src, port_dst))

                try:
                    # columns: date, sensor name, type, ip src, monitor ip, port src, port dst
                    entry = "%s\t%s\t%s\t%s\t%s\t%s\t%s\n" % (
                        incident["date"],
                        incident["sensorname"],
                        incident["type"],
                        incident["src"]["ip"],
                        monitor_ip,
                        incident["src"]["port"],
                        incident["dst"]["port"])
                    # logger.debug("new entry is: %r" % entry)
                    self._fd_savereport.write(entry)
                    self._fd_savereport.flush()
                except Exception as ex:
                    logger.warning(ex)

        socketio_connection.on("markIncident", incident_callback)
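        # SocketIO.wait() blocks while dispatching incoming events, so it has to
        # run in a background thread to keep before_scanning() from blocking.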
        self._wait_thread = threading.Thread(target=socketio_connection.wait)
        # logger.debug("starting event handler thread")
        self._wait_thread.start()
        # just for safety: make sure we can handle incoming events
        time.sleep(1)

    def after_scanning(self):
        # wait so that late events are still caught
        logger.info("waiting to catch all events")
        time.sleep(600)

        super().after_scanning()
        logger.debug("disconnecting socket io")
        self._socketio_connection.disconnect()

        if self._fd_savereport is not None:
            self._fd_savereport.close()


class DShieldReportFetcher(ReportFetcher):
    """
    Report fetcher for the DShield CIDS.
    A report can be fetched using the following URL:
    https://www.dshield.org/livefeed.html?day=today&start=463960503000&count=1000
    Parameters:
    day - the day to fetch the report for
    start - ID of the attack event to start with
    count - number of events to download
    TODO: DShield events don't seem to be stored in order, not even with daily accuracy.
    """
    # day format: YYYY-MM-DD
    DSHIELD_URL_DAY_START_COUNT = "http://isc.sans.edu/livefeed.html?day=%s&start=%s&count=%d"
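    # Example of a fully formatted URL (values are illustrative):
    # DSHIELD_URL_DAY_START_COUNT % ("2014-03-15", "1", 30000)
    # -> "http://isc.sans.edu/livefeed.html?day=2014-03-15&start=1&count=30000"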
    FETCH_MAX_ENTRIES_PER_REQUEST = 30000

    def __init__(self, addentry_callback,
            filename_save,
            report_day=None,
            use_source_ip_filter=True,
            # reports seem to emerge after 6 hours at most, wait 12 to be safe
            sleeptime=60 * 60 * 12):
        """
        use_source_ip_filter -- skip everything not matching our external IP
            address. This implies NOT using spoofed addresses.
        """
        super().__init__(addentry_callback, filename_save)
        self._event_id_start = None
        # only fetch that specific day
        self._report_day = report_day
        self._report_day_is_implicit = report_day is None
        self._use_source_ip_filter = use_source_ip_filter
        self._current_external_source_ip = None
        self._sleeptime = sleeptime

    def before_scanning(self):
        super().before_scanning()
        # DShield seems to auto-adjust the ID if it is too low for the specified
        # date; a too-high ID leads to an empty result.
        # initial start: set the initial event ID
        if self._event_id_start is None:
            current_event_id = "1"  # use this to fetch older reports
            if self._report_day is None:
                self._report_day = time.strftime("%Y-%m-%d")
                logger.debug("setting implicit report day: %s" % self._report_day)
            current_event_id = self._get_newest_id(self._report_day)
            logger.info("using initial id: %s" % current_event_id)
            self._event_id_start = current_event_id

        if self._use_source_ip_filter:
            logger.debug("retrieving external IP address")
            try:
                self._current_external_source_ip = self.ip_to_dshield_ip(utility.get_external_ip())
                logger.debug("normalized DShield IP address: %s" % self._current_external_source_ip)
            except Exception:
                # no external IP could be determined, IP-based filtering is skipped
                pass

    @staticmethod
    def ip_to_dshield_ip(ip):
        """
        Convert an IP address to a DShield-style IP address with zero-padded
        octets: "1.2.3.4" -> "001.002.003.004"
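
        Doctest-style example:
        >>> DShieldReportFetcher.ip_to_dshield_ip("1.2.3.4")
        '001.002.003.004'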
- """
- ip_ints = ip.split(".")
- return "%03d.%03d.%03d.%03d" % (int(ip_ints[0]), int(ip_ints[1]), int(ip_ints[2]), int(ip_ints[3]))

    def _get_newest_id(self, day):
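        # Probing strategy: advance the candidate ID in large fixed steps and
        # fetch a small report for each probe. DShield auto-adjusts IDs that are
        # too low for the given day, so every non-empty probe yields the real ID
        # to continue from; an empty result means the newest ID has been passed.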
        logger.debug("fetching newest event ID")
        event_id_to_return = 1
        event_id_to_test = 1
        event_id_step = 1000000
        report_full = ""

        while report_full is not None:
            logger.debug("testing ID: %r" % event_id_to_test)
            event_id_to_test += event_id_step
            report_full, lines, event_id_report = self._fetch_report(day, event_id_to_test, amount=50)

            if event_id_report is not None:
                # logger.debug("updating ids: %r/%r/%r" % (event_id_report, event_id_to_test, event_id_to_return))
                event_id_to_test = int(event_id_report)
                event_id_to_return = event_id_to_test

        logger.debug("got freshest id: %s (+ max %d)" % (event_id_to_return, event_id_step))
        return event_id_to_return

    @staticmethod
    def _fetch_report(day, event_id, amount):
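        """
        Fetch one report chunk from DShield.
        day -- day to fetch the report for ("YYYY-MM-DD"), None means "today"
        event_id -- ID of the attack event to start with
        amount -- number of events to request
        Returns (report text, report lines, ID of the last event in the report),
        or (None, None, None) if no (valid) report could be fetched.
        """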
        if day is None:
            day = "today"
        url = DShieldReportFetcher.DSHIELD_URL_DAY_START_COUNT % (day, event_id, amount)

        try:
            response = requests.get(url)
        except Exception as ex:
            logger.warning("could not fetch: %s, connection problems? %r" % (url, ex))
            return None, None, None

        lines = split_newline(response.text)

        if len(lines) < 3:
            # the last part of the report should have 2 lines left (header, newline)
            logger.debug("no events left")
            return None, None, None
        elif len(lines) < amount - 1:
            # length is greater than or equal to 3
            logger.debug("sanity checking if this is a valid report")
            logger.debug("first/second line:\n%s\n%s" % (lines[0], lines[1]))
            search = pattern_html.search(response.text)

            if search is not None:
                logger.warning("response seems to contain HTML: %s, stopping to fetch, current url: %s" %
                    (search.group(0), url))
                return None, None, None

        try:
            # take line "-2" as line "-1" is empty
            return response.text, lines, int(split_tab(lines[-2])[0])
        except ValueError:
            # the ID of the last report entry could not be extracted
            return None, None, None

    def after_scanning(self):
        super().after_scanning()
        days_to_check = [self._report_day]
        logger.info("waiting %d seconds as DShield reports don't appear instantly" % self._sleeptime)
        time.sleep(self._sleeptime)
        current_day = time.strftime("%Y-%m-%d")

        # assume at most 1 day has passed since before_scanning() was called
        if self._report_day != current_day and self._report_day_is_implicit:
            # assume that the attack time is less than two days
            logger.warning("initial day is not current day, at least 1 day has passed")
            days_to_check.append(current_day)
            # update to the newest day for the next possible iteration
            self._report_day = current_day

        current_event_id = self._event_id_start
        # this should be enough
        max_downloads = 1000
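        # Download loop: fetch chunks of FETCH_MAX_ENTRIES_PER_REQUEST events per
        # request and continue from the last seen event ID until the day is
        # exhausted; max_downloads is only a safety bound for the loop.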
        fd_fullsave = open(self._filename_save, "a")
        logger.debug("fetching the following days: %r" % days_to_check)

        for day in days_to_check:
            logger.debug("downloading all events starting from day=%s/ID=%s" % (day, current_event_id))
            amount_downloads = 0

            while amount_downloads < max_downloads:
                logger.info("requesting DShield report, day=%s, ID=%s, amount=%d" %
                    (day, current_event_id, DShieldReportFetcher.FETCH_MAX_ENTRIES_PER_REQUEST))
                report_full, lines, event_id_last = self._fetch_report(day,
                    current_event_id,
                    DShieldReportFetcher.FETCH_MAX_ENTRIES_PER_REQUEST)

                if report_full is None:
                    break
                fd_fullsave.write(report_full)
                cnt = 0

                # parse all events
                while cnt < len(lines) - 1:
                    # this skips the header: start at 1
                    cnt += 1
                    columns = split_tab(lines[cnt])

                    if len(columns) > 5:
                        try:
                            # skip every source IP address not matching our external IP address
                            if self._current_external_source_ip is not None and self._current_external_source_ip != columns[3]:
                                continue
                            self.add_entry(columns[3], int(columns[4]), int(columns[5]))
                        except Exception:
                            # these are mainly errors caused by missing data
                            logger.warning("some error occurred, ip/sport/dport: %s/%s/%s" %
                                (columns[3], columns[4], columns[5]))
                    else:
                        # event entry is too short, report seems to contain scrambled data
                        # logger.warning("event entry is too short: %r, skipping" % lines[cnt])
                        pass

                # start the next request from the last fetched ID
                current_event_id = event_id_last
                self._event_id_start = current_event_id
                amount_downloads += 1

        logger.debug("all days fetched")
        fd_fullsave.close()


def fetch_dshield(filename_save, report_day=None):
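    """
    Convenience wrapper: fetch the DShield report for report_day (the current
    day if None) into filename_save. Entries are not counted, only saved.
    """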
    # fetched_entries = defaultdict(int)
    def addentry_callback(ip_src, port_src, port_dest):
        # entries are not counted here; uncomment the lines below to count them
        # entry = "%s__%d__%d" % (ip_src, int(port_src), int(port_dest))
        # fetched_entries[entry] += 1
        pass

    reportfetcher = DShieldReportFetcher(addentry_callback,
        filename_save,
        report_day=report_day,
        sleeptime=1)
    reportfetcher.before_scanning()
    reportfetcher.after_scanning()


def fetch_tracing(filename_save):
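    """
    Convenience wrapper: listen for TraCINg incidents and save them to
    filename_save. after_scanning() is not called here, so listening continues
    until the process is stopped.
    """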
    def addentry_callback(ip_src, port_src, port_dest):
        # entries are not counted here, they are only written to the report file
        pass

    reportfetcher = TracingReportFetcher(addentry_callback,
        filename_save)
    reportfetcher.before_scanning()
    # reportfetcher.after_scanning()


if __name__ == "__main__1":  # intentionally disabled ("__main__1" never matches): DShield batch fetching
    dir_report = "./raw_data_dshield/"
    queue_inout = queue.Queue(999999)
    year = 2014
    month = 3
    amount_parallel_fetches = 1
    dates = ["%d-%02d-%02d" % (year, month, day) for day in range(15, 16)]

    for datestr_in in dates:
        queue_inout.put(datestr_in)

    def fetch_cycler():
        while True:
            datestr = queue_inout.get(block=True)
            filename_save = dir_report + "dshield_report_" + datestr.replace("-", "_") + ".csv"
            logger.debug("file/day: %s/%s" % (filename_save, datestr))
            fetch_dshield(filename_save, report_day=datestr)

    for _ in range(amount_parallel_fetches):
        thread = threading.Thread(target=fetch_cycler)
        thread.start()

    # filename_save = dir_report + "dshield_report_test.csv"
    # fetch_dshield(filename_save, report_day=None)

if __name__ == "__main__":
    dir_report = "./raw_data_tracing"
    fetch_tracing(dir_report + "/attackdata.csv")