  1. """
  2. Module to handle report fetching.
  3. Requirements:
  4. - socketIO_client
  5. """
  6. import requests
  7. import time
  8. import re
  9. import logging
  10. import threading
  11. import os
  12. from collections import defaultdict
  13. split_tab = re.compile("\t").split
  14. split_newline = re.compile("\r?\n+").split
  15. pattern_html = re.compile("<.+>")
  16. from socketIO_client import SocketIO
  17. import utility
  18. STATE_INITIATED = 0 # state when ReportFetcher was initiated
  19. STATE_BEFORE_SCANNING = 1 # state when before_scanning() was called
  20. STATE_AFTER_SCANNING = 2 # state when after_scanning() was called
  21. logging.basicConfig(format="%(levelname)s (%(funcName)s): %(message)s")
  22. logger = logging.getLogger("pra_framework")
  23. logger.setLevel(logging.DEBUG)

class ReportFetcher(object):
	"""
	Handles report fetching on a CIDS. All attack events have to be added
	using add_entry(ip, port_src, port_dst) until the end of the method after_scanning().
	"""
	def __init__(self, add_entry_callback, filename_save):
		"""
		add_entry_callback -- callback used to count/reassemble markers.
			Signature: callback(ip_source (string), port_source (int), port_dest (int))
		filename_save -- filename to save report information to
		"""
		self._state = STATE_INITIATED
		self.add_entry = add_entry_callback
		self._filename_save = filename_save

	def before_scanning(self, *args):
		"""
		Initiate report fetching. This will be called before the actual scan starts.
		This can include:
		- Remembering the last report entry to skip unneeded entries
		- Initiating an HTTP connection to collect attack entries
		args -- any additional parameters (just for debugging purposes)
		"""
		if self._state not in [STATE_INITIATED, STATE_AFTER_SCANNING]:
			raise Exception("Wrong state: %d" % self._state)
		self._state = STATE_BEFORE_SCANNING

	def after_scanning(self):
		"""
		Execute all actions tied to finishing the report fetching. This will be called
		after a scan has finished.
		"""
		if self._state != STATE_BEFORE_SCANNING:
			raise Exception("Wrong state: %d" % self._state)
		self._state = STATE_AFTER_SCANNING
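
# A ReportFetcher subclass is driven around a scan roughly as follows
# (minimal sketch; the callback, subclass name and filename are hypothetical):
#
#	def count_marker(ip_src, port_src, port_dst):
#		print("event: %r %r -> %r" % (ip_src, port_src, port_dst))
#
#	fetcher = SomeReportFetcherSubclass(count_marker, "report.csv")
#	fetcher.before_scanning()
#	# ... perform the scan; the CIDS records the probes ...
#	fetcher.after_scanning()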

class TracingReportFetcher(ReportFetcher):
	"""
	Report fetcher implementation for the TraCINg CIDS.
	Note:
	There is a bug which makes TraCINg set ports 0 and 65535 to "" (empty value).
	"""
	def __init__(self, addentry_callback,
			filename_save,
			#tracing_domain="ssi.cased.de"):
			tracing_domain="localhost"):
		super().__init__(addentry_callback, filename_save)
		self._tracing_domain = tracing_domain
		self._wait_thread = None
		self._socketio_connection = None
		self._fd_savereport = None
	def before_scanning(self):
		super().before_scanning()
		logger.debug("starting to listen to TraCINg over Socket.IO, domain: %s" % self._tracing_domain)
		socketio_connection = SocketIO(self._tracing_domain, 80)
		self._socketio_connection = socketio_connection
		# check for the file before creating it
		add_header = not os.path.isfile(self._filename_save)
		self._fd_savereport = open(self._filename_save, "a")
		if add_header:
			# file did not yet exist, append header
			self._fd_savereport.write("date\tmonitor\ttype\tip_src\tip_dst\tport_src\tport_dst\n")
			self._fd_savereport.flush()

		def incident_callback(*incidents):
			# logger.debug("got INCIDENTS: %d" % len(incidents))
			# We get source and destination IP addresses, although "officially" only
			# information about the source/target city is exposed. As using this
			# information (IP address or city) would be far too easy, we leave it out.
			for incident in incidents:
				# take the external monitor IP if present
				monitor_ip = incident["external_ip"] if "external_ip" in incident else incident["dst"]["ip"]
				#logger.debug("Monitor IP extern/normal: %r/%r" % (monitor_ip, incident["dst"]["ip"]))
				#pprint.pprint(incident)
				# only react to events of type 10=Transport Layer, 11=Portscan
				port_src, port_dst = None, None
				try:
					if incident["type"] in [10, 11]:
						port_src = int(incident["src"]["port"])
						port_dst = int(incident["dst"]["port"])
						#logger.debug("adding entry for IP: %s, port src=%d, port dst=%d" % (monitor_ip, port_src, port_dst))
						self.add_entry(None, port_src, port_dst)
				except Exception as ex:
					logger.warning("problem on extracting TraCINg data: %r" % ex)
					logger.warning("ports: %r %r" % (port_src, port_dst))
				try:
					# date, monitor, type, ip src, ip dst, port src, port dst
					entry = "%s\t%s\t%s\t%s\t%s\t%s\t%s\n" % (
						incident["date"],
						incident["sensorname"],
						incident["type"],
						incident["src"]["ip"],
						monitor_ip,
						incident["src"]["port"],
						incident["dst"]["port"]
					)
					# logger.debug("new entry is: %r" % entry)
					#self._fd_savereport.write(entry)
					#self._fd_savereport.flush()
				except Exception as ex:
					logger.warning(ex)
				#logger.debug(self._fetched_ips)

		socketio_connection.on("markIncident", incident_callback)
		self._wait_thread = threading.Thread(target=socketio_connection.wait)
		# logger.debug("starting event handler thread")
		self._wait_thread.start()
		# just for safety: make sure we can handle incoming events
		time.sleep(1)
	def after_scanning(self):
		# make sure all events were caught
		logger.info("waiting to catch all events")
		#time.sleep(10)
		#time.sleep(30)
		#time.sleep(60)
		#time.sleep(180)
		#time.sleep(300)
		time.sleep(600)
		super().after_scanning()
		logger.debug("disconnecting Socket.IO")
		self._socketio_connection.disconnect()
		if self._fd_savereport is not None:
			self._fd_savereport.close()

PATTERN_TABSPLIT = re.compile("\t+")
PATTERN_NEWLINESPLIT = re.compile("\r?\n")

class DShieldReportFetcher(ReportFetcher):
	"""
	Report fetcher for the DShield CIDS.
	A report can be fetched using the following URL:
	https://www.dshield.org/livefeed.html?day=today&start=463960503000&count=1000
	Parameters:
	day - the day to fetch a report for
	start - ID of the attack event to start with
	count - amount of events to download
	TODO: DShield events don't seem to be stored in order, not even with daily accuracy.
	"""
	# day format: YYYY-MM-DD
	DSHIELD_URL_DAY_START_COUNT = "http://isc.sans.edu/livefeed.html?day=%s&start=%s&count=%d"
	FETCH_MAX_ENTRIES_PER_REQUEST = 30000
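	# Example of a formatted request URL (values illustrative only):
	# DSHIELD_URL_DAY_START_COUNT % ("2014-03-15", "1", FETCH_MAX_ENTRIES_PER_REQUEST)
	# -> "http://isc.sans.edu/livefeed.html?day=2014-03-15&start=1&count=30000"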
	def __init__(self, addentry_callback,
			filename_save,
			report_day=None,
			use_source_ip_filter=True,
			# reports seem to emerge after at most 6 hours; wait twice that to be safe
			sleeptime=60 * 60 * 12):
			#sleeptime=10):
		"""
		use_source_ip_filter -- skip everything not matching our external IP address.
		This implies NOT using spoofed addresses.
		"""
		super().__init__(addentry_callback, filename_save)
		self._event_id_start = None
		# only fetch that specific day
		self._report_day = report_day
		self._report_day_is_implicit = report_day is None
		self._use_source_ip_filter = use_source_ip_filter
		self._current_external_source_ip = None
		self._sleeptime = sleeptime
	def before_scanning(self):
		super().before_scanning()
		# DShield seems to auto-adjust the ID if it's too low for the specified date.
		# A too-high ID leads to an empty result.
		# initial start: set the initial event ID
		if self._event_id_start is None:
			current_event_id = "1"  # use this to fetch older reports
			if self._report_day is None:
				self._report_day = time.strftime("%Y-%m-%d")
				logger.debug("setting implicit report day: %s" % self._report_day)
				current_event_id = self._get_newest_id(self._report_day)
			logger.info("using initial ID: %s" % current_event_id)
			self._event_id_start = current_event_id
		if self._use_source_ip_filter:
			logger.debug("retrieving external IP address")
			try:
				self._current_external_source_ip = self.ip_to_dshield_ip(utility.get_external_ip())
				logger.debug("normalized DShield IP address: %s" % self._current_external_source_ip)
			except Exception:
				# no external IP available; non-IP-based filtering will be used
				pass

	@staticmethod
	def ip_to_dshield_ip(ip):
		"""
		Convert an IP address to a DShield-style IP address with zero-padded octets: "1.2.3.4" -> "001.002.003.004"
		"""
		ip_ints = ip.split(".")
		return "%03d.%03d.%03d.%03d" % (int(ip_ints[0]), int(ip_ints[1]), int(ip_ints[2]), int(ip_ints[3]))
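	# The padding matters because the source-IP column of a DShield report is
	# compared verbatim against this form; e.g. ip_to_dshield_ip("10.0.0.1")
	# returns "010.000.000.001".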

	def _get_newest_id(self, day):
		logger.debug("fetching newest event ID")
		event_id_to_return = 1
		event_id_to_test = 1
		event_id_step = 1000000
		report_full = ""
		# probe forward in big steps until the report comes back empty;
		# the last ID seen in a non-empty response is the freshest one
		while report_full is not None:
			logger.debug("testing ID: %r" % event_id_to_test)
			event_id_to_test += event_id_step
			report_full, lines, event_id_report = self._fetch_report(day, event_id_to_test, amount=50)
			if event_id_report is not None:
				#logger.debug("updating ids: %r/%r/%r" % (event_id_report, event_id_to_test, event_id_to_return))
				event_id_to_test = int(event_id_report)
				event_id_to_return = event_id_to_test
		logger.debug("got freshest ID: %s (+ max %d)" % (event_id_to_return, event_id_step))
		return event_id_to_return

	@staticmethod
	def _fetch_report(day, event_id, amount):
		"""
		Fetch one report chunk. Returns (raw_report_text, lines, last_event_id),
		or (None, None, None) if nothing (valid) could be fetched.
		"""
		if day is None:
			day = "today"
		url = DShieldReportFetcher.DSHIELD_URL_DAY_START_COUNT % (
			day,
			event_id,
			amount)
		try:
			request = requests.get(url)
		except Exception as ex:
			logger.warning("could not fetch: %s, connection problems? %r" % (url, ex))
			return None, None, None
		lines = split_newline(request.text)
		if len(lines) < 3:
			# the last part of the report should have 2 lines left (header, newline)
			logger.debug("no events left")
			return None, None, None
		elif len(lines) < amount - 1:
			# length is greater than or equal to 3 but less than requested:
			# sanity check whether this is a valid report
			logger.debug("sanity checking if this is a valid report")
			logger.debug("first/second line:\n%s\n%s" % (lines[0], lines[1]))
			search = pattern_html.search(request.text)
			if search is not None:
				logger.warning("response seems to contain HTML: %s, stopping fetch, current URL: %s" %
					(search.group(0), url))
				return None, None, None
		try:
			return request.text, lines, int(split_tab(lines[-2])[0])
		except ValueError:
			# last report ID could not be extracted
			return None, None, None
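	# Minimal usage sketch (illustrative values): the returned ID is parsed from
	# line [-2], as the final line of a report is empty, and is the ID to resume from.
	#
	#	text, lines, last_id = DShieldReportFetcher._fetch_report("2014-03-15", 1, 50)
	#	if text is not None:
	#		next_chunk = DShieldReportFetcher._fetch_report("2014-03-15", last_id, 50)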

	def after_scanning(self):
		super().after_scanning()
		days_to_check = [self._report_day]
		logger.info("waiting %d seconds as DShield reports don't appear instantly" % self._sleeptime)
		time.sleep(self._sleeptime)
		current_day = time.strftime("%Y-%m-%d")
		# assume at most 1 day has passed after calling before_scanning()
		if self._report_day != current_day and self._report_day_is_implicit:
			# assume that the attack time is less than two days
			logger.warning("initial day is not current day, at least 1 day has passed")
			days_to_check.append(current_day)
			# update to the newest day for the next possible iteration
			self._report_day = current_day
		current_event_id = self._event_id_start
		# this should be enough
		max_downloads = 1000
		#max_downloads = 10
		fd_fullsave = open(self._filename_save, "a")
		logger.debug("fetching the following days: %r" % days_to_check)
		for day in days_to_check:
			logger.debug("downloading all events starting from day=%s/ID=%s" % (day, current_event_id))
			amount_downloads = 0
			while amount_downloads < max_downloads:
				logger.info("requesting DShield report, day=%s, ID=%s, amount=%d" %
					(day, current_event_id, DShieldReportFetcher.FETCH_MAX_ENTRIES_PER_REQUEST))
				report_full, lines, current_event_id = self._fetch_report(day,
					current_event_id,
					DShieldReportFetcher.FETCH_MAX_ENTRIES_PER_REQUEST)
				if report_full is None:
					break
				fd_fullsave.write(report_full)
				cnt = 0
				# parse all events
				while cnt < len(lines) - 1:
					# this skips the header: start at 1
					cnt += 1
					columns = split_tab(lines[cnt])
					if len(columns) > 5:
						try:
							# skip every source IP address not matching our external IP address
							if self._current_external_source_ip is not None and self._current_external_source_ip != columns[3]:
								continue
							self.add_entry(columns[3], int(columns[4]), int(columns[5]))
						except Exception as ex:
							# these are mainly errors caused by missing data
							logger.warning("some error occurred, ip/sport/dport: %s/%s/%s" %
								(columns[3], columns[4], columns[5]))
							#logger.warning(ex)
							#if cnt < 10:
							#	logger.debug("decoded IP: %r" % columns[3])
					else:
						# event entry is too short, report seems to contain scrambled data
						#logger.warning("event entry is too short: %r, skipping" % lines[cnt])
						pass
				# resume from the last fetched ID (taken from line "-2", as "-1" is empty)
				self._event_id_start = current_event_id
				amount_downloads += 1
		logger.debug("all days fetched")
		fd_fullsave.close()

def fetch_dshield(filename_save, report_day=None):
	#fetched_entries = defaultdict(int)
	def addentry_callback(ip_src, port_src, port_dest):
		try:
			pass
			#entry = "%s__%d__%d" % (ip_src, int(port_src), int(port_dest))
			#fetched_entries[entry] += 1
		except:
			return None

	reportfetcher = DShieldReportFetcher(addentry_callback,
		filename_save,
		report_day=report_day,
		sleeptime=1)
	reportfetcher.before_scanning()
	reportfetcher.after_scanning()
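
# Example usage (hypothetical path, mirroring the __main__ block below):
# fetch_dshield("./raw_data_dshield/dshield_report_2014_03_15.csv", report_day="2014-03-15")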

def fetch_tracing(filename_save):
	def addentry_callback(ip_src, port_src, port_dest):
		try:
			pass
			#entry = "%s__%d__%d" % (ip_src, int(port_src), int(port_dest))
			#fetched_entries[entry] += 1
		except:
			return None

	reportfetcher = TracingReportFetcher(addentry_callback,
		filename_save)
	reportfetcher.before_scanning()
	#reportfetcher.after_scanning()

import queue

if __name__ == "__main__1":
	# deliberately disabled: rename to "__main__" to run the threaded DShield fetcher
	dir_report = "./raw_data_dshield/"
	queue_inout = queue.Queue(999999)
	year = 2014
	month = 3
	amount_parallel_fetches = 1
	dates = ["%d-%02d-%02d" % (year, month, day) for day in range(15, 16)]
	for datestr_in in dates:
		queue_inout.put(datestr_in)

	def fetch_cycler():
		while True:
			datestr = queue_inout.get(block=True)
			filename_save = dir_report + "dshield_report_" + datestr.replace("-", "_") + ".csv"
			logger.debug("file/day: %s/%s" % (filename_save, datestr))
			fetch_dshield(filename_save, report_day=datestr)

	for x in range(amount_parallel_fetches):
		thread = threading.Thread(target=fetch_cycler)
		thread.start()
	#filename_save = dir_report + "dshield_report_test.csv"
	#fetch_dshield(filename_save, report_day=None)

if __name__ == "__main__":
	dir_report = "./raw_data_tracing"
	fetch_tracing(dir_report + "/attackdata.csv")