  1. """
  2. Module to handle report fetching.
  3. Requirements:
  4. - socketIO_client
  5. """
  6. import requests
  7. import time
  8. import re
  9. import logging
  10. import threading
  11. import os
  12. from collections import defaultdict
  13. split_tab = re.compile("\t").split
  14. split_newline = re.compile("\r?\n+").split
  15. pattern_html = re.compile("<.+>")
  16. from socketIO_client import SocketIO
  17. import utility
  18. STATE_INITIATED = 0 # state when ReportFetcher was initiated
  19. STATE_BEFORE_SCANNING = 1 # state when before_scanning() was called
  20. STATE_AFTER_SCANNING = 2 # state when after_scanning() was called
  21. logging.basicConfig(format="%(levelname)s (%(funcName)s): %(message)s")
  22. logger = logging.getLogger("pra_framework")
  23. logger.setLevel(logging.DEBUG)


class ReportFetcher(object):
    """
    Handles report fetching on a CIDS. All attack events have to be added
    via add_entry(ip, port_src, port_dst) before the end of after_scanning().
    """
    def __init__(self, add_entry_callback, filename_save):
        """
        add_entry_callback -- callback used to count/reassemble markers.
            Signature: callback(ip_source (string), port_source (int), port_dest (int))
        filename_save -- filename to save report information to
        """
        self._state = STATE_INITIATED
        self.add_entry = add_entry_callback
        self._filename_save = filename_save

    def before_scanning(self, *args):
        """
        Initiate report fetching. This will be called before the actual scan starts.
        This can include:
        - Remembering the last report entry to skip unneeded entries
        - Initiating an HTTP connection to collect attack entries
        args -- any additional parameters (just for debugging purposes)
        """
        if self._state not in [STATE_INITIATED, STATE_AFTER_SCANNING]:
            raise Exception("Wrong state: %d" % self._state)
        self._state = STATE_BEFORE_SCANNING

    def after_scanning(self):
        """
        Execute all actions tied to finishing the report fetching. This will be
        called after a scan has finished.
        """
        if self._state != STATE_BEFORE_SCANNING:
            raise Exception("Wrong state: %d" % self._state)
        self._state = STATE_AFTER_SCANNING
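

# A minimal usage sketch (illustrative, not part of the framework API): the
# surrounding framework is expected to call before_scanning(), run the scan,
# then call after_scanning(). All names below are hypothetical.
def _example_count_entries(filename_save="/tmp/example_report.csv"):
    seen = defaultdict(int)

    def count_entry(ip_source, port_source, port_dest):
        # count how often each (ip, sport, dport) marker was reported
        seen[(ip_source, port_source, port_dest)] += 1

    fetcher = ReportFetcher(count_entry, filename_save)
    fetcher.before_scanning()
    # ...scan runs here; a concrete subclass would feed events to count_entry()...
    fetcher.after_scanning()
    return seen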


class TracingReportFetcher(ReportFetcher):
    """
    Report fetcher implementation for the TraCINg CIDS.
    Note:
    There is a bug that makes TraCINg render ports 0 and 65535 as "" (empty value).
    """
    def __init__(self, addentry_callback,
                 filename_save,
                 #tracing_domain="ssi.cased.de"):
                 tracing_domain="localhost"):
        super().__init__(addentry_callback, filename_save)
        self._tracing_domain = tracing_domain
        self._wait_thread = None
        self._socketio_connection = None
        self._fd_savereport = None

    def before_scanning(self):
        super().before_scanning()
        logger.debug("starting to listen to TraCINg over Socket.IO, domain: %s" % self._tracing_domain)
        socketio_connection = SocketIO(self._tracing_domain, 80)
        self._socketio_connection = socketio_connection
        # check for the file before creating it
        add_header = not os.path.isfile(self._filename_save)
        self._fd_savereport = open(self._filename_save, "a")
        if add_header:
            # file did not yet exist, append header
            self._fd_savereport.write("date\tmonitor\ttype\tip_src\tip_dst\tport_src\tport_dst\n")
            self._fd_savereport.flush()

        def incident_callback(*incidents):
            # logger.debug("got INCIDENTS: %d" % len(incidents))
            # We get the source and destination IP addresses / "officially" only
            # information about the source/target city. As using this information
            # (IP address or city) would be far too easy, we leave it out.
            for incident in incidents:
                # take the external monitor IP if present
                monitor_ip = incident["external_ip"] if "external_ip" in incident else incident["dst"]["ip"]
                #logger.debug("Monitor IP extern/normal: %r/%r" % (monitor_ip, incident["dst"]["ip"]))
                #pprint.pprint(incident)
                # only react to events of type 10=Transport Layer, 11=Portscan
                try:
                    if incident["type"] in [10, 11]:
                        port_src = int(incident["src"]["port"])
                        port_dst = int(incident["dst"]["port"])
                        #logger.debug("adding entry for IP: %s, port src=%d, port dst=%d" % (monitor_ip, port_src, port_dst))
                        self.add_entry(None, port_src, port_dst)
                except Exception as ex:
                    logger.warning("problem extracting TraCINg data: %r" % ex)
                try:
                    # date, monitor, type, ip_src, ip_dst, port_src, port_dst
                    entry = "%s\t%s\t%s\t%s\t%s\t%s\t%s\n" % (
                        incident["date"],
                        incident["sensorname"],
                        incident["type"],
                        incident["src"]["ip"],
                        monitor_ip,
                        incident["src"]["port"],
                        incident["dst"]["port"]
                    )
                    # logger.debug("new entry is: %r" % entry)
                    self._fd_savereport.write(entry)
                    self._fd_savereport.flush()
                except Exception as ex:
                    logger.warning(ex)
            #logger.debug(self._fetched_ips)

        socketio_connection.on("markIncident", incident_callback)
        self._wait_thread = threading.Thread(target=socketio_connection.wait)
        # logger.debug("starting event handler thread")
        self._wait_thread.start()
        # just for safety: give the event handler thread a moment to start
        time.sleep(1)

    def after_scanning(self):
        # make sure all events were caught
        logger.info("waiting to catch all events")
        #time.sleep(10)
        #time.sleep(30)
        #time.sleep(60)
        #time.sleep(180)
        #time.sleep(300)
        time.sleep(600)
        super().after_scanning()
        logger.debug("disconnecting socket io")
        self._socketio_connection.disconnect()
        if self._fd_savereport is not None:
            self._fd_savereport.close()


PATTERN_TABSPLIT = re.compile("\t+")
PATTERN_NEWLINESPLIT = re.compile("\r?\n")


class DShieldReportFetcher(ReportFetcher):
    """
    Report fetcher for the DShield CIDS.
    A report can be fetched using the following URL:
    https://www.dshield.org/livefeed.html?day=today&start=463960503000&count=1000
    Parameters:
    day - the day to fetch a report for
    start - ID of the attack event to start with
    count - number of events to download
    TODO: DShield events don't seem to be stored in order, not even at daily granularity.
    """
    # day format: YYYY-MM-DD
    DSHIELD_URL_DAY_START_COUNT = "http://isc.sans.edu/livefeed.html?day=%s&start=%s&count=%d"
    FETCH_MAX_ENTRIES_PER_REQUEST = 30000
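    # For illustration, a filled-in request URL (example values only):
    # http://isc.sans.edu/livefeed.html?day=2014-03-15&start=1&count=30000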

    def __init__(self, addentry_callback,
                 filename_save,
                 report_day=None,
                 use_source_ip_filter=True,
                 # reports seem to emerge after at most 6 hours
                 sleeptime=60 * 60 * 12):
                 #sleeptime=10):
        """
        use_source_ip_filter -- skip everything not matching our external IP address.
            This implies NOT using spoofed addresses.
        """
        super().__init__(addentry_callback, filename_save)
        self._event_id_start = None
        # only fetch that specific day
        self._report_day = report_day
        self._report_day_is_implicit = report_day is None
        self._use_source_ip_filter = use_source_ip_filter
        self._current_external_source_ip = None
        self._sleeptime = sleeptime

    def before_scanning(self):
        super().before_scanning()
        # DShield seems to auto-adjust the ID if it's too low for the specified date.
        # A too-high ID leads to an empty result.
        # initial start: set the initial event ID
        if self._event_id_start is None:
            current_event_id = "1"  # use this to fetch older reports
            if self._report_day is None:
                self._report_day = time.strftime("%Y-%m-%d")
                logger.debug("setting implicit report day: %s" % self._report_day)
                current_event_id = self._get_newest_id(self._report_day)
            logger.info("using initial id: %s" % current_event_id)
            self._event_id_start = current_event_id
        if self._use_source_ip_filter:
            logger.debug("retrieving external IP address")
            try:
                self._current_external_source_ip = self.ip_to_dshield_ip(utility.get_external_ip())
                logger.debug("normalized DShield IP address: %s" % self._current_external_source_ip)
            except Exception:
                # external IP could not be determined; no IP-based filtering will be used
                pass

    @staticmethod
    def ip_to_dshield_ip(ip):
        """
        Convert an IP address to a DShield-style IP address with zero-padded
        octets: "1.2.3.4" -> "001.002.003.004"
        """
        ip_ints = ip.split(".")
        return "%03d.%03d.%03d.%03d" % (int(ip_ints[0]), int(ip_ints[1]), int(ip_ints[2]), int(ip_ints[3]))
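
    # _get_newest_id() probes forward in steps of 1,000,000 event IDs until a
    # fetch comes back empty, re-syncing to the last event ID actually seen
    # after each successful fetch, and returns that last seen ID.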
    def _get_newest_id(self, day):
        logger.debug("fetching newest event ID")
        event_id_to_return = 1
        event_id_to_test = 1
        event_id_step = 1000000
        report_full = ""
        while report_full is not None:
            logger.debug("testing ID: %r" % event_id_to_test)
            event_id_to_test += event_id_step
            report_full, lines, event_id_report = self._fetch_report(day, event_id_to_test, amount=50)
            if event_id_report is not None:
                #logger.debug("updating ids: %r/%r/%r" % (event_id_report, event_id_to_test, event_id_to_return))
                event_id_to_test = int(event_id_report)
                event_id_to_return = event_id_to_test
        logger.debug("got freshest id: %s (+ max %d)" % (event_id_to_return, event_id_step))
        return event_id_to_return

    @staticmethod
    def _fetch_report(day, event_id, amount):
        if day is None:
            day = "today"
        url = DShieldReportFetcher.DSHIELD_URL_DAY_START_COUNT % (
            day,
            event_id,
            amount)
        try:
            request = requests.get(url)
        except Exception as e:
            logger.warning("could not fetch: %s, connection problems? %r" % (url, e))
            return None, None, None
        lines = split_newline(request.text)
        if len(lines) < 3:
            # the last part of the report should have 2 lines left (header, newline)
            logger.debug("no events left")
            return None, None, None
        elif len(lines) < amount - 1:
            # length is greater than or equal to 3
            logger.debug("sanity checking whether this is a valid report")
            logger.debug("first/second line:\n%s\n%s" % (lines[0], lines[1]))
            search = pattern_html.search(request.text)
            if search is not None:
                logger.warning("response seems to contain HTML: %s, stopping fetch, current url: %s" %
                               (search.group(0), url))
                return None, None, None
        try:
            # take line "-2" as "-1" is empty; column 0 holds the newest event ID
            return request.text, lines, int(split_tab(lines[-2])[0])
        except ValueError:
            # the last report ID could not be extracted
            return None, None, None
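
    # Layout of a fetched report line, as far as the parsing in after_scanning()
    # relies on it (inferred from the column accesses there, not an official
    # DShield spec): column 0 = event ID, column 3 = source IP (zero-padded),
    # column 4 = source port, column 5 = destination port.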

    def after_scanning(self):
        super().after_scanning()
        days_to_check = [self._report_day]
        logger.info("waiting %d seconds as DShield reports don't appear instantly" % self._sleeptime)
        time.sleep(self._sleeptime)
        current_day = time.strftime("%Y-%m-%d")
        # assume at most 1 day has passed since calling before_scanning()
        if self._report_day != current_day and self._report_day_is_implicit:
            # assume that the attack time is less than two days
            logger.warning("initial day is not the current day, at least 1 day has passed")
            days_to_check.append(current_day)
            # update to the newest day for the next possible iteration
            self._report_day = current_day
        current_event_id = self._event_id_start
        # this should be enough
        max_downloads = 1000
        #max_downloads = 10
        fd_fullsave = open(self._filename_save, "a")
        logger.debug("fetching the following days: %r" % days_to_check)
        for day in days_to_check:
            logger.debug("downloading all events starting from day=%s/ID=%s" % (day, current_event_id))
            amount_downloads = 0
            while amount_downloads < max_downloads:
                logger.info("requesting DShield report, day=%s, ID=%s, amount=%d" %
                            (day, current_event_id, DShieldReportFetcher.FETCH_MAX_ENTRIES_PER_REQUEST))
                report_full, lines, current_event_id = self._fetch_report(day,
                                                                          current_event_id,
                                                                          DShieldReportFetcher.FETCH_MAX_ENTRIES_PER_REQUEST)
                if report_full is None:
                    break
                fd_fullsave.write(report_full)
                cnt = 0
                # parse all events
                while cnt < len(lines) - 1:
                    # skip the header: start at line 1
                    cnt += 1
                    columns = split_tab(lines[cnt])
                    if len(columns) > 5:
                        try:
                            # skip every source IP address not matching our external IP address
                            if self._current_external_source_ip is not None and self._current_external_source_ip != columns[3]:
                                continue
                            self.add_entry(columns[3], int(columns[4]), int(columns[5]))
                        except Exception as ex:
                            # these are mainly errors based on missing data
                            logger.warning("some error occurred, ip/sport/dport: %s/%s/%s" %
                                           (columns[3], columns[4], columns[5]))
                            #logger.warning(ex)
                        #if cnt < 10:
                        #    logger.debug("decoded IP: %r" % columns[3])
                    else:
                        # event entry is too short; the report seems to contain scrambled data
                        #logger.warning("event entry is too short: %r, skipping" % lines[cnt])
                        pass
                # the next request starts from the last ID fetched by _fetch_report()
                amount_downloads += 1
            self._event_id_start = current_event_id
        logger.debug("all days fetched")
        fd_fullsave.close()


def fetch_dshield(filename_save, report_day=None):
    #fetched_entries = defaultdict(int)
    def addentry_callback(ip_src, port_src, port_dest):
        try:
            pass
            #entry = "%s__%d__%d" % (ip_src, int(port_src), int(port_dest))
            #fetched_entries[entry] += 1
        except Exception:
            return None

    reportfetcher = DShieldReportFetcher(addentry_callback,
                                         filename_save,
                                         report_day=report_day,
                                         sleeptime=1)
    reportfetcher.before_scanning()
    reportfetcher.after_scanning()


def fetch_tracing(filename_save):
    def addentry_callback(ip_src, port_src, port_dest):
        try:
            pass
            #entry = "%s__%d__%d" % (ip_src, int(port_src), int(port_dest))
            #fetched_entries[entry] += 1
        except Exception:
            return None

    reportfetcher = TracingReportFetcher(addentry_callback,
                                         filename_save)
    reportfetcher.before_scanning()
    #reportfetcher.after_scanning()


if __name__ == "__main__1":
    # disabled: rename "__main__1" to "__main__" to run the DShield bulk download
    dir_report = "./raw_data_dshield/"
    queue_inout = queue.Queue(999999)
    year = 2014
    month = 3
    amount_parallel_fetches = 1
    dates = ["%d-%02d-%02d" % (year, month, day) for day in range(15, 16)]
    for datestr_in in dates:
        queue_inout.put(datestr_in)

    def fetch_cycler():
        while True:
            datestr = queue_inout.get(block=True)
            filename_save = dir_report + "dshield_report_" + datestr.replace("-", "_") + ".csv"
            logger.debug("file/day: %s/%s" % (filename_save, datestr))
            fetch_dshield(filename_save, report_day=datestr)

    for x in range(amount_parallel_fetches):
        thread = threading.Thread(target=fetch_cycler)
        thread.start()
    #filename_save = dir_report + "dshield_report_test.csv"
    #fetch_dshield(filename_save, report_day=None)

if __name__ == "__main__":
    dir_report = "./raw_data_tracing"
    fetch_tracing(dir_report + "/attackdata.csv")