StatsDatabase.py 22 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440
  1. import os.path
  2. import random as rnd
  3. import typing
  4. import sqlite3
  5. import sys
  6. # TODO: double check this import
  7. # does it complain because libpcapreader is not a .py?
  8. import ID2TLib.libpcapreader as pr
  9. import Core.QueryParser as qp
  10. import pyparsing as pp
  11. def dict_gen(curs: sqlite3.Cursor):
  12. """
  13. Generates a dictionary of a sqlite3.Cursor object by fetching the query's results.
  14. Taken from Python Essential Reference by David Beazley.
  15. """
  16. field_names = [d[0] for d in curs.description]
  17. while True:
  18. rows = curs.fetchmany()
  19. if not rows:
  20. return
  21. for row in rows:
  22. yield dict(zip(field_names, row))
  23. class QueryExecutionException(Exception):
  24. pass
  25. class StatsDatabase:
  26. def __init__(self, db_path: str):
  27. """
  28. Creates a new StatsDatabase.
  29. :param db_path: The path to the database file
  30. """
  31. self.query_parser = qp.QueryParser()
  32. self.existing_db = os.path.exists(db_path)
  33. self.database = sqlite3.connect(db_path)
  34. self.cursor = self.database.cursor()
  35. self.current_interval_statistics_table = ""
  36. # If DB not existing, create a new DB scheme
  37. if self.existing_db:
  38. if self.get_db_outdated():
  39. print('Statistics database outdated. Recreating database at: ', db_path)
  40. else:
  41. print('Located statistics database at: ', db_path)
  42. else:
  43. print('Statistics database not found. Creating new database at: ', db_path)
  44. def get_file_info(self):
  45. """
  46. Retrieves general file statistics from the database. This includes:
  47. - packetCount : Number of packets in the PCAP file
  48. - captureDuration : Duration of the packet capture in seconds
  49. - timestampFirstPacket : Timestamp of the first captured packet
  50. - timestampLastPacket : Timestamp of the last captured packet
  51. - avgPacketRate : Average packet rate
  52. - avgPacketSize : Average packet size
  53. - avgPacketsSentPerHost : Average number of packets sent per host
  54. - avgBandwidthIn : Average incoming bandwidth
  55. - avgBandwidthOut : Average outgoing bandwidth
  56. :return: a dictionary of keys (see above) and their respective values
  57. """
  58. return [r for r in dict_gen(
  59. self.cursor.execute('SELECT * FROM file_statistics'))][0]
  60. def get_db_exists(self):
  61. """
  62. :return: True if the database was already existent, otherwise False
  63. """
  64. return self.existing_db
  65. def get_db_outdated(self):
  66. """
  67. Retrieves the database version from the database and compares it to the version
  68. it should have to check whether the database is outdated and needs to be recreated.
  69. :return: True if the versions match, otherwise False
  70. """
  71. self.cursor.execute('PRAGMA user_version;')
  72. return self.cursor.fetchall()[0][0] != pr.pcap_processor.get_db_version()
  73. @staticmethod
  74. def _get_selector_keywords():
  75. """
  76. :return: a list of selector keywords
  77. """
  78. return ['most_used', 'least_used', 'avg', 'all']
  79. @staticmethod
  80. def _get_parametrized_selector_keywords():
  81. """
  82. :return: a list of parameterizable selector keywords
  83. """
  84. return ['ipaddress', 'macaddress']
  85. @staticmethod
  86. def _get_extractor_keywords():
  87. """
  88. :return: a list of extractor keywords
  89. """
  90. return ['random', 'first', 'last']
  91. def get_all_named_query_keywords(self):
  92. """
  93. :return: a list of all named query keywords, used to identify named queries
  94. """
  95. return (
  96. self._get_selector_keywords() + self._get_parametrized_selector_keywords() + self._get_extractor_keywords())
  97. @staticmethod
  98. def get_all_sql_query_keywords():
  99. """
  100. :return: a list of all supported SQL keywords, used to identify SQL queries
  101. """
  102. return ["select", "insert"]
  103. def process_user_defined_query(self, query_string: str, query_parameters: tuple = None):
  104. """
  105. Takes as input a SQL query query_string and optional a tuple of parameters which are marked by '?' in the query
  106. and later substituted.
  107. :param query_string: The query to execute
  108. :param query_parameters: The tuple of parameters to inject into the query
  109. :return: the results of the query
  110. """
  111. if query_parameters is not None:
  112. self.cursor.execute(query_string, query_parameters)
  113. else:
  114. self.cursor.execute(query_string)
  115. self.database.commit()
  116. return self.cursor.fetchall()
  117. def get_field_types(self, *table_names):
  118. """
  119. Creates a dictionary whose keys are the fields of the given table(s) and whose values are the appropriate field
  120. types, like TEXT for strings and REAL for float numbers.
  121. :param table_names: The name of table(s)
  122. :return: a dictionary of {field_name : field_type} for fields of all tables
  123. """
  124. dic = {}
  125. for table in table_names:
  126. self.cursor.execute("PRAGMA table_info('%s')" % table)
  127. results = self.cursor.fetchall()
  128. for field in results:
  129. dic[field[1].lower()] = field[2]
  130. return dic
  131. def get_current_interval_statistics_table(self):
  132. """
  133. :return: the current interval statistics table used for internal calculations
  134. """
  135. return self.current_interval_statistics_table
  136. def set_current_interval_statistics_table(self, current_interval: float=0.0):
  137. """
  138. Sets the current interval statistics table, which should be used for internal calculations.
  139. :param current_interval: the current interval, which should be used for internal calculations, in seconds
  140. """
  141. if current_interval == 0.0:
  142. table_name = self.process_db_query("SELECT name FROM interval_tables WHERE is_default=1")
  143. print(table_name)
  144. print("No user specified interval found. Using default interval: " +
  145. str(float(table_name[len("interval_statistics_"):])/1000000) + "s")
  146. else:
  147. self.current_interval_statistics_table = "interval_statistics_" + str(int(current_interval*1000000))
  148. print("User specified interval(s) found. Using first interval length given: " + str(current_interval) + "s")
  149. def named_query_parameterized(self, keyword: str, param_op_val: list):
  150. """
  151. Executes a parameterizable named query.
  152. :param keyword: The query to be executed, like ipaddress or macadress
  153. :param param_op_val: A list consisting of triples with (parameter, operator, value)
  154. :return: the results of the executed query
  155. """
  156. named_queries = {
  157. "ipaddress": "SELECT DISTINCT ip_statistics.ipAddress from ip_statistics INNER JOIN ip_mac, ip_ttl, "
  158. "ip_ports, ip_protocols ON ip_statistics.ipAddress=ip_mac.ipAddress AND "
  159. "ip_statistics.ipAddress=ip_ttl.ipAddress AND ip_statistics.ipAddress=ip_ports.ipAddress "
  160. "AND ip_statistics.ipAddress=ip_protocols.ipAddress WHERE ",
  161. "macaddress": "SELECT DISTINCT macAddress from ip_mac WHERE "}
  162. query = named_queries.get(keyword)
  163. field_types = self.get_field_types('ip_mac', 'ip_ttl', 'ip_ports', 'ip_protocols', 'ip_statistics', 'ip_mac')
  164. conditions = []
  165. for key, op, value in param_op_val:
  166. # Check whether the value is not a simple value, but another query (or list)
  167. if isinstance(value, pp.ParseResults):
  168. if value[0] == "list":
  169. # We have a list, cut the token off and use the remaining elements
  170. value = value[1:]
  171. # Lists can only be used with "in"
  172. if op is not "in":
  173. raise QueryExecutionException("List values require the usage of the 'in' operator!")
  174. else:
  175. # If we have another query instead of a direct value, execute and replace it
  176. rvalue = self._execute_query_list(value)
  177. # Do we have a comparison operator with a multiple-result query?
  178. if op is not "in" and value[0] in ['most_used', 'least_used', 'all', 'ipaddress_param',
  179. 'macaddress_param']:
  180. raise QueryExecutionException("The extractor '" + value[0] +
  181. "' may return more than one result!")
  182. # Make value contain a simple list with the results of the query
  183. value = map(lambda x: str(x[0]), rvalue)
  184. else:
  185. # Make sure value is a list now to simplify handling
  186. value = [value]
  187. # this makes sure that TEXT fields are queried by strings,
  188. # e.g. ipAddress=192.168.178.1 --is-converted-to--> ipAddress='192.168.178.1'
  189. if field_types.get(key) == 'TEXT':
  190. def ensure_string(x):
  191. if not str(x).startswith("'") and not str(x).startswith('"'):
  192. return "'" + x + "'"
  193. else:
  194. return x
  195. value = map(ensure_string, value)
  196. # If we have more than one value, join them together, separated by commas
  197. value = ",".join(map(str, value))
  198. # this replacement is required to remove ambiguity in SQL query
  199. if key == 'ipAddress':
  200. key = 'ip_mac.ipAddress'
  201. conditions.append(key + " " + op + " (" + str(value) + ")")
  202. where_clause = " AND ".join(conditions)
  203. query += where_clause
  204. self.cursor.execute(query)
  205. return self.cursor.fetchall()
  206. named_queries = {
  207. "most_used.ipaddress": "SELECT ipAddress FROM ip_statistics WHERE (pktsSent+pktsReceived) == "
  208. "(SELECT MAX(pktsSent+pktsReceived) from ip_statistics) ORDER BY ipAddress ASC",
  209. "most_used.macaddress": "SELECT macAddress FROM (SELECT macAddress, COUNT(*) as occ from ip_mac GROUP BY "
  210. "macAddress) WHERE occ=(SELECT COUNT(*) as occ from ip_mac GROUP BY macAddress "
  211. "ORDER BY occ DESC LIMIT 1) ORDER BY macAddress ASC",
  212. "most_used.portnumber": "SELECT portNumber FROM ip_ports GROUP BY portNumber HAVING COUNT(portNumber)="
  213. "(SELECT MAX(cntPort) from (SELECT portNumber, COUNT(portNumber) as cntPort FROM "
  214. "ip_ports GROUP BY portNumber)) ORDER BY portNumber ASC",
  215. "most_used.protocolname": "SELECT protocolName FROM ip_protocols GROUP BY protocolName HAVING "
  216. "COUNT(protocolCount)=(SELECT COUNT(protocolCount) as cnt FROM ip_protocols "
  217. "GROUP BY protocolName ORDER BY cnt DESC LIMIT 1) ORDER BY protocolName ASC",
  218. "most_used.ttlvalue": "SELECT ttlValue FROM (SELECT ttlValue, SUM(ttlCount) as occ FROM ip_ttl GROUP BY "
  219. "ttlValue) WHERE occ=(SELECT SUM(ttlCount) as occ FROM ip_ttl GROUP BY ttlValue "
  220. "ORDER BY occ DESC LIMIT 1) ORDER BY ttlValue ASC",
  221. "most_used.mssvalue": "SELECT mssValue FROM (SELECT mssValue, SUM(mssCount) as occ FROM tcp_mss GROUP BY "
  222. "mssValue) WHERE occ=(SELECT SUM(mssCount) as occ FROM tcp_mss GROUP BY mssValue "
  223. "ORDER BY occ DESC LIMIT 1) ORDER BY mssValue ASC",
  224. "most_used.winsize": "SELECT winSize FROM (SELECT winSize, SUM(winCount) as occ FROM tcp_win GROUP BY "
  225. "winSize) WHERE occ=(SELECT SUM(winCount) as occ FROM tcp_win GROUP BY winSize ORDER "
  226. "BY occ DESC LIMIT 1) ORDER BY winSize ASC",
  227. "most_used.ipclass": "SELECT ipClass FROM (SELECT ipClass, COUNT(*) as occ from ip_statistics GROUP BY "
  228. "ipClass ORDER BY occ DESC) WHERE occ=(SELECT COUNT(*) as occ from ip_statistics "
  229. "GROUP BY ipClass ORDER BY occ DESC LIMIT 1) ORDER BY ipClass ASC",
  230. "least_used.ipaddress": "SELECT ipAddress FROM ip_statistics WHERE (pktsSent+pktsReceived) == (SELECT "
  231. "MIN(pktsSent+pktsReceived) from ip_statistics) ORDER BY ipAddress ASC",
  232. "least_used.macaddress": "SELECT macAddress FROM (SELECT macAddress, COUNT(*) as occ from ip_mac GROUP "
  233. "BY macAddress) WHERE occ=(SELECT COUNT(*) as occ from ip_mac GROUP BY macAddress "
  234. "ORDER BY occ ASC LIMIT 1) ORDER BY macAddress ASC",
  235. "least_used.portnumber": "SELECT portNumber FROM ip_ports GROUP BY portNumber HAVING COUNT(portNumber)="
  236. "(SELECT MIN(cntPort) from (SELECT portNumber, COUNT(portNumber) as cntPort FROM "
  237. "ip_ports GROUP BY portNumber)) ORDER BY portNumber ASC",
  238. "least_used.protocolname": "SELECT protocolName FROM ip_protocols GROUP BY protocolName HAVING "
  239. "COUNT(protocolCount)=(SELECT COUNT(protocolCount) as cnt FROM ip_protocols "
  240. "GROUP BY protocolName ORDER BY cnt ASC LIMIT 1) ORDER BY protocolName ASC",
  241. "least_used.ttlvalue": "SELECT ttlValue FROM (SELECT ttlValue, SUM(ttlCount) as occ FROM ip_ttl GROUP BY "
  242. "ttlValue) WHERE occ=(SELECT SUM(ttlCount) as occ FROM ip_ttl GROUP BY ttlValue "
  243. "ORDER BY occ ASC LIMIT 1) ORDER BY ttlValue ASC",
  244. "least_used.mssvalue": "SELECT mssValue FROM (SELECT mssValue, SUM(mssCount) as occ FROM tcp_mss GROUP BY "
  245. "mssValue) WHERE occ=(SELECT SUM(mssCount) as occ FROM tcp_mss GROUP BY mssValue "
  246. "ORDER BY occ ASC LIMIT 1) ORDER BY mssValue ASC",
  247. "least_used.winsize": "SELECT winSize FROM (SELECT winSize, SUM(winCount) as occ FROM tcp_win GROUP BY "
  248. "winSize) WHERE occ=(SELECT SUM(winCount) as occ FROM tcp_win GROUP BY winSize "
  249. "ORDER BY occ ASC LIMIT 1) ORDER BY winSize ASC",
  250. "least_used.ipclass": "SELECT ipClass FROM (SELECT ipClass, COUNT(*) as occ from ip_statistics GROUP BY "
  251. "ipClass ORDER BY occ DESC) WHERE occ=(SELECT COUNT(*) as occ from ip_statistics "
  252. "GROUP BY ipClass ORDER BY occ ASC LIMIT 1) ORDER BY ipClass ASC",
  253. "avg.pktsreceived": "SELECT avg(pktsReceived) from ip_statistics",
  254. "avg.pktssent": "SELECT avg(pktsSent) from ip_statistics",
  255. "avg.kbytesreceived": "SELECT avg(kbytesReceived) from ip_statistics",
  256. "avg.kbytessent": "SELECT avg(kbytesSent) from ip_statistics",
  257. "avg.ttlvalue": "SELECT avg(ttlValue) from ip_ttl",
  258. "avg.mss": "SELECT avg(mssValue) from tcp_mss",
  259. "all.ipaddress": "SELECT ipAddress from ip_statistics ORDER BY ipAddress ASC",
  260. "all.ttlvalue": "SELECT DISTINCT ttlValue from ip_ttl ORDER BY ttlValue ASC",
  261. "all.mss": "SELECT DISTINCT mssValue from tcp_mss ORDER BY mssValue ASC",
  262. "all.macaddress": "SELECT DISTINCT macAddress from ip_mac ORDER BY macAddress ASC",
  263. "all.portnumber": "SELECT DISTINCT portNumber from ip_ports ORDER BY portNumber ASC",
  264. "all.protocolname": "SELECT DISTINCT protocolName from ip_protocols ORDER BY protocolName ASC",
  265. "all.winsize": "SELECT DISTINCT winSize FROM tcp_win ORDER BY winSize ASC",
  266. "all.ipclass": "SELECT DISTINCT ipClass FROM ip_statistics ORDER BY ipClass ASC"}
  267. def _execute_query_list(self, query_list):
  268. """
  269. Recursively executes a list of named queries. They are of the following form:
  270. ['macaddress_param', [['ipaddress', 'in', ['most_used', 'ipaddress']]]]
  271. :param query_list: The query statement list obtained from the query parser
  272. :return: The result of the query (either a single result or a list).
  273. """
  274. if query_list[0] == "random":
  275. return [rnd.choice(self._execute_query_list(query_list[1:]))]
  276. elif query_list[0] == "first":
  277. return [self._execute_query_list(query_list[1:])[0]]
  278. elif query_list[0] == "last":
  279. return [self._execute_query_list(query_list[1:])[-1]]
  280. elif query_list[0] == "macaddress_param":
  281. return self.named_query_parameterized("macaddress", query_list[1])
  282. elif query_list[0] == "ipaddress_param":
  283. return self.named_query_parameterized("ipaddress", query_list[1])
  284. else:
  285. query = self.named_queries.get(query_list[0] + "." + query_list[1])
  286. if query is None:
  287. raise QueryExecutionException("The requested query '" + query_list[0] + "(" + query_list[1] +
  288. ")' was not found in the internal query list!")
  289. self.cursor.execute(str(query))
  290. # TODO: fetch query on demand
  291. last_result = self.cursor.fetchall()
  292. return last_result
  293. def process_db_query(self, query_string_in: str, print_results=False, sql_query_parameters: tuple = None):
  294. """
  295. Processes a database query. This can either be a standard SQL query or a named query (predefined query).
  296. :param query_string_in: The string containing the query
  297. :param print_results: Indicated whether the results should be printed to terminal (True) or not (False)
  298. :param sql_query_parameters: Parameters for the SQL query (optional)
  299. :return: the results of the query
  300. """
  301. named_query_keywords = self.get_all_named_query_keywords()
  302. # Clean query_string
  303. query_string = query_string_in.lower().lstrip()
  304. # query_string is a user-defined SQL query
  305. result = None
  306. if sql_query_parameters is not None or query_string.startswith("select") or query_string.startswith("insert"):
  307. result = self.process_user_defined_query(query_string, sql_query_parameters)
  308. # query string is a named query -> parse it and pass it to statisticsDB
  309. elif any(k in query_string for k in named_query_keywords) and all(k in query_string for k in ['(', ')']):
  310. if query_string[-1] != ";":
  311. query_string += ";"
  312. query_list = self.query_parser.parse_query(query_string)
  313. result = self._execute_query_list(query_list)
  314. else:
  315. sys.stderr.write(
  316. "Query invalid. Only named queries and SQL SELECT/INSERT allowed. Please check the query's syntax!\n")
  317. return
  318. # If result is tuple/list with single element, extract value from list
  319. requires_extraction = (isinstance(result, list) or isinstance(result, tuple)) and len(result) == 1 and \
  320. (not isinstance(result[0], tuple) or len(result[0]) == 1)
  321. while requires_extraction:
  322. if isinstance(result, list) or isinstance(result, tuple):
  323. result = result[0]
  324. else:
  325. requires_extraction = False
  326. # If tuple of tuples or list of tuples, each consisting of single element is returned,
  327. # then convert it into list of values, because the returned column is clearly specified by the given query
  328. if (isinstance(result, tuple) or isinstance(result, list)) and all(len(val) == 1 for val in result):
  329. result = [c for c in result for c in c]
  330. # Print results if option print_results is True
  331. if print_results:
  332. if isinstance(result, list) and len(result) == 1:
  333. result = result[0]
  334. print("Query returned 1 record:\n")
  335. for i in range(0, len(result)):
  336. print(str(self.cursor.description[i][0]) + ": " + str(result[i]))
  337. else:
  338. self._print_query_results(query_string_in, result if isinstance(result, list) else [result])
  339. return result
  340. def process_interval_statistics_query(self, query_string_in: str):
  341. """
  342. :param query_string_in:
  343. :return:
  344. """
  345. if self.current_interval_statistics_table != "":
  346. table_name = self.current_interval_statistics_table
  347. else:
  348. table_name = self.process_db_query("SELECT name FROM interval_tables WHERE is_default=1")
  349. return self.process_user_defined_query(query_string_in % table_name)
  350. def _print_query_results(self, query_string_in: str, result: typing.List[typing.Union[str, float, int]]) -> None:
  351. """
  352. Prints the results of a query.
  353. Based on http://stackoverflow.com/a/20383011/3017719.
  354. :param query_string_in: The query the results belong to
  355. :param result: The list of query results
  356. """
  357. # Print number of results according to type of result
  358. if len(result) == 1:
  359. print("Query returned 1 record:\n")
  360. else:
  361. print("Query returned " + str(len(result)) + " records:\n")
  362. # Print query results
  363. if query_string_in.lstrip().upper().startswith(
  364. "SELECT") and result is not None and self.cursor.description is not None:
  365. widths = []
  366. columns = []
  367. tavnit = '|'
  368. separator = '+'
  369. for index, cd in enumerate(self.cursor.description):
  370. max_col_length = 0
  371. if len(result) > 0:
  372. max_col_length = max(list(map(lambda x:
  373. len(str(x[index] if len(self.cursor.description) > 1 else x)),
  374. result)))
  375. widths.append(max(len(cd[0]), max_col_length))
  376. columns.append(cd[0])
  377. for w in widths:
  378. tavnit += " %-" + "%ss |" % (w,)
  379. separator += '-' * w + '--+'
  380. print(separator)
  381. print(tavnit % tuple(columns))
  382. print(separator)
  383. for row in result:
  384. print(tavnit % row)
  385. print(separator)
  386. else:
  387. print(result)