3
0
Преглед на файлове

Merge branch 'UT_pyparsing_final_SKIP_CI' of stefan.schmidt/ID2T-toolkit into master

Carlos Garcia преди 6 години
родител
ревизия
4479ed9057
променени са 5 файла, в които са добавени 371 реда и са изтрити 238 реда
  1. 140 0
      code/Test/test_NamedQueries.py
  2. 38 0
      code/Test/test_NestedNamedQueries.py
  3. 0 238
      code/Test/test_Queries.py
  4. 45 0
      code/Test/test_SQLQueries.py
  5. 148 0
      code/Test/test_internalQueries.py

+ 140 - 0
code/Test/test_NamedQueries.py

@@ -0,0 +1,140 @@
+import unittest
+import pyparsing
+
+import ID2TLib.TestLibrary as Lib
+import Core.Controller as Ctrl
+
+controller = Ctrl.Controller(pcap_file_path=Lib.test_pcap, do_extra_tests=False, non_verbose=True)
+controller.load_pcap_statistics(flag_write_file=False, flag_recalculate_stats=True, flag_print_statistics=False)
+
+ip_addresses = ["10.0.2.15",      "104.83.103.45",  "13.107.21.200",  "131.253.61.100", "172.217.23.142",
+                "172.217.23.174", "192.168.33.254", "204.79.197.200", "23.51.123.27",   "35.161.3.50",
+                "52.11.17.245",   "52.34.37.177",   "52.39.210.199",  "52.41.250.141",  "52.85.173.182",
+                "54.149.74.139",  "54.187.98.195",  "54.192.44.108",  "54.192.44.177",  "72.247.178.113",
+                "72.247.178.67",  "93.184.220.29"]
+
+allWinSize = [0,     822,   1330,  5082,  8192,  9900,  27060, 35657, 39917, 47030, 50782, 51310, 52202, 52740, 55062,
+              56492, 58520, 59950, 59980, 61380, 62788, 62810, 62811, 62906, 63056, 63076, 63086, 63151, 63261, 63350,
+              63370, 63400, 63409, 63456, 63516, 63547, 63552, 63572, 63603, 63628, 63655, 63663, 63675, 63686, 63706,
+              63839, 63842, 63886, 63893, 63917, 63954, 63963, 63982, 63991, 64000, 64005, 64088, 64110, 64148, 64165,
+              64177, 64189, 64194, 64198, 64209, 64230, 64240, 65535]
+
+leastUsedWinASize = [822,   1330,  5082,  9900,  27060, 35657, 39917, 47030, 50782, 51310, 52202, 52740, 55062, 56492,
+                     58520, 59950, 59980, 61380, 63056, 63963, 63982, 64000, 64005, 64198, 64230]
+
+allPort = [53,    80,    443,   49157, 49160, 49163, 49164, 49165, 49166, 49167, 49168, 49169, 49170, 49171, 49172,
+           49173, 49174, 49175, 49176, 49177, 49178, 49179, 49180, 49181, 49182, 49183, 49184, 49185, 49186, 49187,
+           49188, 49189, 49190, 49191, 49192, 49193, 49194, 49195, 49196, 49197, 49247, 49323, 49470, 49636, 49695,
+           49798, 49927, 49935, 49945, 50262, 50836, 50968, 51143, 51166, 51350, 51451, 51669, 51713, 52033, 52135,
+           52399, 52520, 52644, 52697, 52743, 52786, 52964, 52981, 53059, 53234, 53461, 53691, 53708, 53745, 53836,
+           54049, 54446, 54593, 54598, 54652, 54663, 54717, 54853, 54930, 55004, 55018, 55119, 55125, 55299, 55310,
+           55463, 55650, 55667, 55752, 55843, 55851, 56146, 56325, 56567, 56589, 56750, 57049, 57179, 57275, 57520,
+           57653, 57840, 57957, 57991, 58401, 58440, 58645, 58797, 58814, 58905, 58913, 58943, 59380, 59408, 59461,
+           59467, 59652, 59660, 59718, 59746, 59844, 60006, 60209, 60414, 60422, 60659, 60696, 60708, 60756, 60827,
+           60840, 61181, 61300, 61592, 61718, 61738, 61769, 61807, 62412, 62428, 62447, 62490, 62625, 62626, 62664,
+           63425, 64096, 64121, 64137, 64252, 64334, 64337, 64479, 64509, 64637, 64807, 64811, 65448, 65487]
+
+
+class UnitTestNamedQueries(unittest.TestCase):
+    def test_most_used_ipaddress(self):
+        self.assertEqual(controller.statistics.process_db_query('most_used(ipaddress)'), '10.0.2.15')
+
+    def test_most_used_macaddress(self):
+        self.assertEqual(controller.statistics.process_db_query('most_used(macaddress)'), '52:54:00:12:35:02')
+
+    def test_most_used_portnumber(self):
+        self.assertEqual(controller.statistics.process_db_query('most_used(portnumber)'), 443)
+
+    def test_most_used_protocolname(self):
+        self.assertEqual(controller.statistics.process_db_query('most_used(protocolname)'), 'IPv4')
+
+    def test_most_used_ttlvalue(self):
+        self.assertEqual(controller.statistics.process_db_query('most_used(ttlvalue)'), 64)
+
+    def test_most_used_mssvalue(self):
+        self.assertEqual(controller.statistics.process_db_query('most_used(mssvalue)'), 1460)
+
+    def test_most_used_winsize(self):
+        self.assertEqual(controller.statistics.process_db_query('most_used(winsize)'), 65535)
+
+    def test_most_used_ipclass(self):
+        self.assertEqual(controller.statistics.process_db_query('most_used(ipclass)'), 'A')
+
+    def test_least_used_ipaddress(self):
+        self.assertEqual(controller.statistics.process_db_query('least_used(ipaddress)'), '72.247.178.113')
+
+    def test_least_used_macaddress(self):
+        self.assertEqual(controller.statistics.process_db_query('least_used(macaddress)'), '08:00:27:a3:83:43')
+
+    def test_least_used_portnumber(self):
+        self.assertEqual(controller.statistics.process_db_query('least_used(portnumber)'), [58645, 59844])
+
+    def test_least_used_protocolname(self):
+        self.assertEqual(controller.statistics.process_db_query('least_used(protocolname)'), 'UDP')
+
+    def test_least_used_ttlvalue(self):
+        self.assertEqual(controller.statistics.process_db_query('least_used(ttlvalue)'), 255)
+
+    def test_least_used_mssvalue(self):
+        self.assertEqual(controller.statistics.process_db_query('least_used(mssvalue)'), 1460)
+
+    def least_used_winsize(self):
+        self.assertEqual(controller.statistics.process_db_query('least_used(winsize)'), leastUsedWinASize)
+
+    def test_least_used_ipclass(self):
+        self.assertEqual(controller.statistics.process_db_query('least_used(ipclass)'), ['A-private', 'C', 'C-private'])
+
+    def test_avg_pktsreceived(self):
+        self.assertEqual(controller.statistics.process_db_query('avg(pktsreceived)'), 90.36363636363636)
+
+    def test_avg_pktssent(self):
+        self.assertEqual(controller.statistics.process_db_query('avg(pktssent)'), 90.36363636363636)
+
+    def test_avg_kbytesreceived(self):
+        self.assertEqual(controller.statistics.process_db_query('avg(kbytesreceived)'), 30.289683948863637)
+
+    def test_avg_kbytessent(self):
+        self.assertEqual(controller.statistics.process_db_query('avg(kbytessent)'), 30.289683948863637)
+
+    def test_avg_ttlvalue(self):
+        self.assertEqual(controller.statistics.process_db_query('avg(ttlvalue)'), 75.08695652173913)
+
+    def test_avg_mss(self):
+        self.assertEqual(controller.statistics.process_db_query('avg(mss)'), 1460.0)
+
+    def test_avg_ipaddress(self):
+        with self.assertRaises(pyparsing.ParseException):
+            controller.statistics.process_db_query('avg(ipAddress)')
+
+    def test_all_ipaddress(self):
+        self.assertEqual(controller.statistics.process_db_query('all(ipaddress)'), ip_addresses)
+
+    def test_all_ttlvalue(self):
+        self.assertEqual(controller.statistics.process_db_query('all(ttlvalue)'), [64, 128, 255])
+
+    def test_all_mss(self):
+        self.assertEqual(controller.statistics.process_db_query('all(mss)'), 1460)
+
+    def test_all_macaddress(self):
+        self.assertEqual(controller.statistics.process_db_query('all(macaddress)'), ['08:00:27:a3:83:43',
+                                                                                     '52:54:00:12:35:02'])
+
+    def test_all_portnumber(self):
+        self.assertEqual(controller.statistics.process_db_query('all(portnumber)'), allPort)
+
+    def test_all_protocolname(self):
+        self.assertEqual(controller.statistics.process_db_query('all(protocolname)'), ['IPv4', 'TCP', 'UDP'])
+
+    def test_all_winsize(self):
+        self.assertEqual(controller.statistics.process_db_query('all(winSize)'),
+                         allWinSize)
+
+    def test_all_ipclass(self):
+        self.assertEqual(controller.statistics.process_db_query('all(ipClass)'),
+                         ['A', 'A-private', 'B', 'C', 'C-private'])
+
+    def test_is_query_named_query(self):
+        self.assertTrue(controller.statistics.is_query('least_used(ipaddress)'))
+
+    def test_is_query_no_string(self):
+        self.assertFalse(controller.statistics.is_query(42))

+ 38 - 0
code/Test/test_NestedNamedQueries.py

@@ -0,0 +1,38 @@
+import unittest
+import sqlite3
+import pyparsing
+
+import ID2TLib.TestLibrary as Lib
+import Core.Controller as Ctrl
+
+controller = Ctrl.Controller(pcap_file_path=Lib.test_pcap, do_extra_tests=False, non_verbose=True)
+controller.load_pcap_statistics(flag_write_file=False, flag_recalculate_stats=True, flag_print_statistics=False)
+
+
+class UnitTestNestedNamedQueries(unittest.TestCase):
+    def test_nested_query(self):
+        self.assertEqual(controller.statistics.process_db_query('macaddress(ipaddress in most_used(ipaddress))'),
+                         '08:00:27:a3:83:43')
+        self.assertEqual(controller.statistics.process_db_query('macaddress(ipaddress in least_used(ipaddress))'),
+                         '52:54:00:12:35:02')
+        self.assertEqual(controller.statistics.process_db_query('ipaddress(macaddress in least_used(macaddress))'),
+                         '10.0.2.15')
+        self.assertEqual(controller.statistics.process_db_query('ipaddress(macaddress in 08:00:27:a3:83:43)'),
+                         '10.0.2.15')
+        self.assertEqual(controller.statistics.process_db_query('ipaddress(macaddress in [08:00:27:a3:83:43])'),
+                         '10.0.2.15')
+        self.assertEqual(controller.statistics.process_db_query('ipaddress(macaddress in most_used(macaddress))'),
+                         ['104.83.103.45', '13.107.21.200', '131.253.61.100', '172.217.23.142', '172.217.23.174',
+                          '192.168.33.254', '204.79.197.200', '23.51.123.27', '35.161.3.50', '52.11.17.245',
+                          '52.34.37.177', '52.39.210.199', '52.41.250.141', '52.85.173.182', '54.149.74.139',
+                          '54.187.98.195', '54.192.44.108', '54.192.44.177', '72.247.178.113', '72.247.178.67',
+                          '93.184.220.29'])
+
+        # semantically incorrect query
+        with self.assertRaises(sqlite3.OperationalError):
+            controller.statistics.process_db_query('ipaddress(ipaddress in most_used(macaddress))')
+
+        # syntactically incorrect query
+        with self.assertRaises(pyparsing.ParseException):
+            controller.statistics.process_db_query('ipaddress(macaddress in '
+                                                   'most_used(macaddress(ipaddress in least_used(ipaddress))))')

+ 0 - 238
code/Test/test_Queries.py

@@ -1,238 +0,0 @@
-import random
-import unittest
-
-import Core.Controller as Ctrl
-import ID2TLib.TestLibrary as Test
-
-# TODO: improve coverage
-
-controller = Ctrl.Controller(pcap_file_path=Test.test_pcap, do_extra_tests=False, non_verbose=True)
-controller.load_pcap_statistics(flag_write_file=False, flag_recalculate_stats=True, flag_print_statistics=False)
-
-file_information = [('Pcap file path', Test.test_pcap),
-                    ('Total packet count', 1998, 'packets'),
-                    ("Recognized packets", 1988, "packets"),
-                    ("Unrecognized packets", 10, "PDUs"), ("% Recognized packets", 99.49949949949949, "%"),
-                    ("% Unrecognized packets", 0.5005005005005005, "%"),
-                    ("Last unknown PDU", '1970-01-01 01:07:39.604899'),
-                    ('Capture duration', '25.4294414520264', 'seconds'),
-                    ('Capture start', '\t1970-01-01 01:01:45.647675'),
-                    ('Capture end', '\t1970-01-01 01:08:10.102034')]
-
-file_statistics = [('Avg. packet rate', 78.57034301757812, 'packets/sec'), ('Avg. packet size', 0.0, 'kbytes'),
-                   ('Avg. packets sent', 90.0, 'packets'), ('Avg. bandwidth in', 9.5290, 'kbit/s'),
-                   ('Avg. bandwidth out', 9.5290, 'kbit/s')]
-
-ip_addresses = ["10.0.2.15", "104.83.103.45", "13.107.21.200", "131.253.61.100", "172.217.23.142",
-                "172.217.23.174", "192.168.33.254", "204.79.197.200", "23.51.123.27", "35.161.3.50",
-                "52.11.17.245", "52.34.37.177", "52.39.210.199", "52.41.250.141", "52.85.173.182",
-                "54.149.74.139", "54.187.98.195", "54.192.44.108", "54.192.44.177", "72.247.178.113",
-                "72.247.178.67", "93.184.220.29"]
-ports = [53, 80, 443, 49157, 49160, 49163, 49164, 49165, 49166, 49167, 49168, 49169, 49170, 49171, 49172, 49173, 49174,
-         49175, 49176, 49177, 49178, 49179, 49180, 49181, 49182, 49183, 49184, 49185, 49186, 49187, 49188, 49189, 49190,
-         49191, 49192, 49193, 49194, 49195, 49196, 49197, 49247, 49323, 49470, 49636, 49695, 49798, 49927, 49935, 49945,
-         50262, 50836, 50968, 51143, 51166, 51350, 51451, 51669, 51713, 52033, 52135, 52399, 52520, 52644, 52697, 52743,
-         52786, 52964, 52981, 53059, 53234, 53461, 53691, 53708, 53745, 53836, 54049, 54446, 54593, 54598, 54652, 54663,
-         54717, 54853, 54930, 55004, 55018, 55119, 55125, 55299, 55310, 55463, 55650, 55667, 55752, 55843, 55851, 56146,
-         56325, 56567, 56589, 56750, 57049, 57179, 57275, 57520, 57653, 57840, 57957, 57991, 58401, 58440, 58645, 58797,
-         58814, 58905, 58913, 58943, 59380, 59408, 59461, 59467, 59652, 59660, 59718, 59746, 59844, 60006, 60209, 60414,
-         60422, 60659, 60696, 60708, 60756, 60827, 60840, 61181, 61300, 61592, 61718, 61738, 61769, 61807, 62412, 62428,
-         62447, 62490, 62625, 62626, 62664, 63425, 64096, 64121, 64137, 64252, 64334, 64337, 64479, 64509, 64637, 64807,
-         64811, 65448, 65487]
-
-
-class TestQueries(unittest.TestCase):
-    def test_get_file_information(self):
-        self.assertEqual(controller.statistics.get_file_information(), file_information)
-
-    def test_get_general_file_statistics(self):
-        file_stats = controller.statistics.get_general_file_statistics()
-        file_stats[3] = ('Avg. bandwidth in', round(file_stats[3][1], 4), 'kbit/s')
-        file_stats[4] = ('Avg. bandwidth out', round(file_stats[4][1], 4), 'kbit/s')
-        self.assertEqual(file_stats, file_statistics)
-
-    def test_get_capture_duration(self):
-        self.assertEqual(controller.statistics.get_capture_duration(), '25.4294414520264')
-
-    def test_get_pcap_timestamp_start(self):
-        self.assertEqual(controller.statistics.get_pcap_timestamp_start(), '1970-01-01 01:01:45.647675')
-
-    def test_get_pcap_timestamp_end(self):
-        self.assertEqual(controller.statistics.get_pcap_timestamp_end(), '1970-01-01 01:08:10.102034')
-
-    def test_get_pps_sent_1(self):
-        self.assertEqual(controller.statistics.get_pps_sent(ip_address='72.247.178.67'), 0)
-
-    def test_get_pps_sent_2(self):
-        self.assertEqual(controller.statistics.get_pps_sent(ip_address='10.0.2.15'), 32)
-
-    def test_get_pps_received_1(self):
-        self.assertEqual(controller.statistics.get_pps_received(ip_address='72.247.178.67'), 0)
-
-    def test_get_pps_received_2(self):
-        self.assertEqual(controller.statistics.get_pps_received(ip_address='10.0.2.15'), 46)
-
-    def test_get_packet_count(self):
-        self.assertEqual(controller.statistics.get_packet_count(), 1998)
-
-    def test_get_most_used_ip_address(self):
-        self.assertEqual(controller.statistics.get_most_used_ip_address(), '10.0.2.15')
-
-    def test_get_ttl_distribution_1(self):
-        self.assertEqual(controller.statistics.get_ttl_distribution(ip_address='72.247.178.67'), {64: 5})
-
-    def test_get_ttl_distribution_2(self):
-        self.assertEqual(controller.statistics.get_ttl_distribution(ip_address='10.0.2.15'), {128: 817})
-
-    def test_get_mss_distribution_1(self):
-        self.assertEqual(controller.statistics.get_mss_distribution(ip_address='72.247.178.67'), {1460: 1})
-
-    def test_get_mss_distribution_2(self):
-        self.assertEqual(controller.statistics.get_mss_distribution(ip_address='10.0.2.15'), {1460: 36})
-
-    def test_get_win_distribution_1(self):
-        self.assertEqual(controller.statistics.get_win_distribution(ip_address='72.247.178.67'), {65535: 5})
-
-    def test_get_tos_distribution_1(self):
-        self.assertEqual(controller.statistics.get_tos_distribution(ip_address='72.247.178.67'), {0: 5})
-
-    def test_get_tos_distribution_2(self):
-        self.assertEqual(controller.statistics.get_tos_distribution(ip_address='10.0.2.15'), {0: 817})
-
-    def test_get_ip_address_count(self):
-        self.assertEqual(controller.statistics.get_ip_address_count(), 22)
-
-    def test_get_ip_addresses(self):
-        self.assertEqual(controller.statistics.get_ip_addresses(), ip_addresses)
-
-    def test_get_random_ip_address(self):
-        random.seed(5)
-        self.assertEqual(controller.statistics.get_random_ip_address(), '72.247.178.113')
-
-    def test_get_random_ip_address_count_2(self):
-        random.seed(5)
-        self.assertEqual(controller.statistics.get_random_ip_address(2), ['72.247.178.113', '23.51.123.27'])
-
-    def test_get_mac_address_1(self):
-        self.assertEqual(controller.statistics.get_mac_address(ip_address='72.247.178.67'), '52:54:00:12:35:02')
-
-    def test_get_mac_address_2(self):
-        self.assertEqual(controller.statistics.get_mac_address(ip_address='10.0.2.15'), '08:00:27:a3:83:43')
-
-    def test_get_most_used_mss(self):
-        self.assertEqual(controller.statistics.get_most_used_mss(ip_address='10.0.2.15'), 1460)
-
-    def test_get_most_used_ttl(self):
-        self.assertEqual(controller.statistics.get_most_used_ttl(ip_address='10.0.2.15'), 128)
-
-    def test_is_query_no_string(self):
-        self.assertFalse(controller.statistics.is_query(42))
-
-    def test_is_query_named_query(self):
-        self.assertTrue(controller.statistics.is_query('least_used(ipaddress)'))
-
-    def test_is_query_standard_query(self):
-        self.assertTrue(controller.statistics.is_query('SELECT * from ip_statistics'))
-
-    def test_calculate_standard_deviation(self):
-        self.assertEqual(controller.statistics.calculate_standard_deviation([1, 1, 2, 3, 5, 8, 13, 21]),
-                         6.609652033201143)
-
-    def test_calculate_entropy(self):
-        self.assertEqual(controller.statistics.calculate_entropy([1, 1, 2, 3, 5, 8, 13, 21]), 2.371389165297016)
-
-    def test_calculate_entropy_normalized(self):
-        self.assertEqual(controller.statistics.calculate_entropy([1, 1, 2, 3, 5, 8, 13, 21], normalized=True),
-                         (2.371389165297016, 0.7904630550990053))
-
-    def test_calculate_complement_packet_rates_1(self):
-        cpr = controller.statistics.calculate_complement_packet_rates(0)[0:9]
-        self.assertEqual(cpr, [(186.418564, 0), (186.418824, 0), (186.419346, 0), (186.445361, 0),
-                               (186.46954399999998, 0), (186.476234, 0), (186.477304, 0), (186.48606999999998, 0),
-                               (186.486761, 0)])
-
-    def test_calculate_complement_packet_rates_2(self):
-        cpr = controller.statistics.calculate_complement_packet_rates(42)[0:9]
-        self.assertEqual(cpr, [(186.418564, 41), (186.418824, 42), (186.419346, 42), (186.445361, 42),
-                               (186.46954399999998, 42), (186.476234, 42), (186.477304, 42), (186.48606999999998, 42),
-                               (186.486761, 42)])
-
-    # NAMED QUERY TESTS
-    def test_most_used_ipaddress(self):
-        self.assertEqual(controller.statistics.process_db_query('most_used(ipaddress)'), '10.0.2.15')
-
-    def test_most_used_macaddress(self):
-        self.assertEqual(controller.statistics.process_db_query('most_used(macaddress)'), '52:54:00:12:35:02')
-
-    def test_most_used_portnumber(self):
-        self.assertEqual(controller.statistics.process_db_query('most_used(portnumber)'), 443)
-
-    def test_most_used_protocolname(self):
-        self.assertEqual(controller.statistics.process_db_query('most_used(protocolname)'), 'IPv4')
-
-    def test_most_used_ttlvalue(self):
-        self.assertEqual(controller.statistics.process_db_query('most_used(ttlvalue)'), 64)
-
-    def test_most_used_mssvalue(self):
-        self.assertEqual(controller.statistics.process_db_query('most_used(mssvalue)'), 1460)
-
-    def test_most_used_winsize(self):
-        self.assertEqual(controller.statistics.process_db_query('most_used(winsize)'), 65535)
-
-    def test_most_used_ipclass(self):
-        self.assertEqual(controller.statistics.process_db_query('most_used(ipclass)'), 'A')
-
-    def test_least_used_ipaddress(self):
-        self.assertEqual(controller.statistics.process_db_query('least_used(ipaddress)'), '72.247.178.113')
-
-    def test_least_used_macaddress(self):
-        self.assertEqual(controller.statistics.process_db_query('least_used(macaddress)'), '08:00:27:a3:83:43')
-
-    def test_least_used_portnumber(self):
-        self.assertEqual(controller.statistics.process_db_query('least_used(portnumber)'), [58645, 59844])
-
-    def test_least_used_protocolname(self):
-        self.assertEqual(controller.statistics.process_db_query('least_used(protocolname)'), 'UDP')
-
-    def test_least_used_ttlvalue(self):
-        self.assertEqual(controller.statistics.process_db_query('least_used(ttlvalue)'), 255)
-
-    def test_avg_pktsreceived(self):
-        self.assertEqual(controller.statistics.process_db_query('avg(pktsreceived)'), 90.36363636363636)
-
-    def test_avg_pktssent(self):
-        self.assertEqual(controller.statistics.process_db_query('avg(pktssent)'), 90.36363636363636)
-
-    def test_avg_kbytesreceived(self):
-        self.assertEqual(controller.statistics.process_db_query('avg(kbytesreceived)'), 30.289683948863637)
-
-    def test_avg_kbytessent(self):
-        self.assertEqual(controller.statistics.process_db_query('avg(kbytessent)'), 30.289683948863637)
-
-    def test_avg_ttlvalue(self):
-        self.assertEqual(controller.statistics.process_db_query('avg(ttlvalue)'), 75.08695652173913)
-
-    def test_avg_mss(self):
-        self.assertEqual(controller.statistics.process_db_query('avg(mss)'), 1460.0)
-
-    def test_all_ipaddress(self):
-        self.assertEqual(controller.statistics.process_db_query('all(ipaddress)'), ip_addresses)
-
-    def test_all_ttlvalue(self):
-        self.assertEqual(controller.statistics.process_db_query('all(ttlvalue)'), [64, 128, 255])
-
-    def test_all_mss(self):
-        self.assertEqual(controller.statistics.process_db_query('all(mss)'), 1460)
-
-    def test_all_macaddress(self):
-        self.assertEqual(controller.statistics.process_db_query('all(macaddress)'), ['08:00:27:a3:83:43',
-                                                                                     '52:54:00:12:35:02'])
-
-    def test_all_portnumber(self):
-        self.assertEqual(controller.statistics.process_db_query('all(portnumber)'), ports)
-
-    def test_all_protocolname(self):
-        self.assertEqual(controller.statistics.process_db_query('all(protocolname)'), ['IPv4', 'TCP', 'UDP'])
-
-    def test_nested_query(self):
-        self.assertEqual(controller.statistics.process_db_query('macaddress(ipaddress in most_used(ipaddress))'), '08:00:27:a3:83:43')

+ 45 - 0
code/Test/test_SQLQueries.py

@@ -0,0 +1,45 @@
+import unittest
+import sqlite3
+
+import ID2TLib.TestLibrary as Lib
+import Core.Controller as Ctrl
+
+controller = Ctrl.Controller(pcap_file_path=Lib.test_pcap, do_extra_tests=False, non_verbose=True)
+controller.load_pcap_statistics(flag_write_file=False, flag_recalculate_stats=True, flag_print_statistics=False)
+
+
+class UnitTestSqlQueries(unittest.TestCase):
+    def test_apostrophe(self):
+        query = "Select ipAddress from ip_Statistics where pktsSent = '5'"
+        query2 = "Select ipAddress from ip_Statistics where pktsSent = 5"
+        self.assertEqual(controller.statistics.stats_db.process_db_query(query),
+                         controller.statistics.stats_db.process_db_query(query2))
+
+    def test_parenthesis(self):
+        query = "Select (ipAddress) from (ip_Statistics) where (pktsSent) = (2 + (3))"
+        self.assertEqual("72.247.178.67", controller.statistics.stats_db.process_db_query(query))
+
+    def test_noResult(self):
+        query = "Select ipAddress from ip_statistics where ipaddress = 'abc'"
+        self.assertEqual([], controller.statistics.stats_db.process_db_query(query))
+
+    def test_severalOperator(self):
+        query1 = "Select ipAddress from ip_Statistics where pktsSent = '5'"
+        query2 = "Select ipAddress from ip_Statistics where pktsSent < '5'"
+        query3 = "Select ipAddress from ip_Statistics where pktsSent <= '5'"
+        query4 = "Select ipAddress from ip_Statistics where pktsSent > '356'"
+        query5 = "Select ipAddress from ip_Statistics where pktsSent >= '356'"
+
+        self.assertEqual("72.247.178.67", controller.statistics.stats_db.process_db_query(query1))
+        self.assertEqual("72.247.178.113", controller.statistics.stats_db.process_db_query(query2))
+        self.assertEqual(["72.247.178.67", "72.247.178.113"], controller.statistics.stats_db.process_db_query(query3))
+        self.assertEqual("10.0.2.15", controller.statistics.stats_db.process_db_query(query4))
+        self.assertEqual(["10.0.2.15", "172.217.23.174"], controller.statistics.stats_db.process_db_query(query5))
+
+        # compare of tables with different dimension
+        with self.assertRaises(sqlite3.OperationalError):
+            controller.statistics.stats_db.process_db_query('Select ipAddress from ip_Statistics where pktsSent'
+                                                            '= (Select * from ip_Statistics)')
+
+    def test_is_query_standard_query(self):
+        self.assertTrue(controller.statistics.is_query('SELECT * from ip_statistics'))

+ 148 - 0
code/Test/test_internalQueries.py

@@ -0,0 +1,148 @@
+import unittest
+import random
+
+import ID2TLib.TestLibrary as Lib
+import Core.Controller as Ctrl
+
+controller = Ctrl.Controller(pcap_file_path=Lib.test_pcap, do_extra_tests=False, non_verbose=True)
+controller.load_pcap_statistics(flag_write_file=False, flag_recalculate_stats=True, flag_print_statistics=False)
+
+ipAddresses = ['10.0.2.15', '104.83.103.45', '13.107.21.200', '131.253.61.100', '172.217.23.142', '172.217.23.174',
+               '192.168.33.254', '204.79.197.200', '23.51.123.27', '35.161.3.50', '52.11.17.245', '52.34.37.177',
+               '52.39.210.199', '52.41.250.141', '52.85.173.182', '54.149.74.139', '54.187.98.195', '54.192.44.108',
+               '54.192.44.177', '72.247.178.113', '72.247.178.67', '93.184.220.29']
+
+
+class UnitTestInternalQueries(unittest.TestCase):
+    # FILE METAINFORMATION TESTS
+    def test_get_file_information(self):
+        self.assertEqual(controller.statistics.get_file_information(),
+                         [('Pcap file path', Lib.test_pcap),
+                          ('Total packet count', 1998, 'packets'),
+                          ("Recognized packets", 1988, "packets"),
+                          ("Unrecognized packets", 10, "PDUs"), ("% Recognized packets", 99.49949949949949, "%"),
+                          ("% Unrecognized packets", 0.5005005005005005, "%"),
+                          ("Last unknown PDU", '1970-01-01 01:07:39.604899'),
+                          ('Capture duration', '25.4294414520264', 'seconds'),
+                          ('Capture start', '\t1970-01-01 01:01:45.647675'),
+                          ('Capture end', '\t1970-01-01 01:08:10.102034')])
+
+    def test_get_packet_count(self):
+        self.assertEqual(controller.statistics.get_packet_count(), 1998)
+
+    def test_get_capture_duration(self):
+        self.assertEqual(controller.statistics.get_capture_duration(), '25.4294414520264')
+
+    def test_get_pcap_timestamp_start(self):
+        self.assertEqual(controller.statistics.get_pcap_timestamp_start(), '1970-01-01 01:01:45.647675')
+
+    def test_get_pcap_timestamp_end(self):
+        self.assertEqual(controller.statistics.get_pcap_timestamp_end(), '1970-01-01 01:08:10.102034')
+
+    # FIXME: This seems to be the only testcase where float values differ slightly between macOS and Linux
+    def test_get_general_file_statistics(self):
+        file_stats = controller.statistics.get_general_file_statistics()
+        self.assertEqual(file_stats[0], ('Avg. packet rate', 78.57034301757812, 'packets/sec'))
+        self.assertEqual(file_stats[1], ('Avg. packet size', 0.0, 'kbytes'))
+        self.assertEqual(file_stats[2], ('Avg. packets sent', 90.0, 'packets'))
+        self.assertEqual(file_stats[3][0], 'Avg. bandwidth in')
+        self.assertAlmostEqual(file_stats[3][1], 9.529013633728027, places=5)
+        self.assertEqual(file_stats[3][2], 'kbit/s')
+        self.assertEqual(file_stats[4][0], 'Avg. bandwidth out')
+        self.assertAlmostEqual(file_stats[4][1], 9.529013633728027, places=5)
+
+    # INTERNAL QUERY TESTS
+    def test_get_ip_address_count(self):
+        self.assertEqual(controller.statistics.get_ip_address_count(), 22)
+
+    def test_get_ip_addresses(self):
+        self.assertEqual(controller.statistics.get_ip_addresses(), ipAddresses)
+
+    def test_get_most_used_ip_address(self):
+        self.assertEqual(controller.statistics.get_most_used_ip_address(), '10.0.2.15')
+
+    def test_get_random_ip_address(self):
+        random.seed(5)
+        self.assertEqual(controller.statistics.get_random_ip_address(), '72.247.178.113')
+
+    def test_get_random_ip_address_count_2(self):
+        random.seed(5)
+        self.assertEqual(controller.statistics.get_random_ip_address(2), ['72.247.178.113', '23.51.123.27'])
+
+    def test_get_ip_address_from_mac(self):
+        self.assertEqual(controller.statistics.get_ip_address_from_mac('08:00:27:a3:83:43'), '10.0.2.15')
+
+    def test_get_mac_address_1(self):
+        self.assertEqual(controller.statistics.get_mac_address(ip_address='72.247.178.67'), '52:54:00:12:35:02')
+
+    def test_get_mac_address_2(self):
+        self.assertEqual(controller.statistics.get_mac_address(ip_address='10.0.2.15'), '08:00:27:a3:83:43')
+
+    def test_get_most_used_mss(self):
+        self.assertEqual(controller.statistics.get_most_used_mss(ip_address='10.0.2.15'), 1460)
+
+    def test_get_most_used_ttl(self):
+        self.assertEqual(controller.statistics.get_most_used_ttl(ip_address='10.0.2.15'), 128)
+
+    def test_get_pps_sent_1(self):
+        self.assertEqual(controller.statistics.get_pps_sent(ip_address='72.247.178.67'), 0)
+
+    def test_get_pps_sent_2(self):
+        self.assertEqual(controller.statistics.get_pps_sent(ip_address='10.0.2.15'), 32)
+
+    def test_get_pps_sent_wrong_input(self):
+        # wrong input parameter
+        with self.assertRaises(TypeError):
+            self.assertEqual(controller.statistics.get_pps_sent('08:00:27:a3:83:43'), 32)
+
+    def test_get_pps_received_1(self):
+        self.assertEqual(controller.statistics.get_pps_received(ip_address='72.247.178.67'), 0)
+
+    def test_get_pps_received_2(self):
+        self.assertEqual(controller.statistics.get_pps_received(ip_address='10.0.2.15'), 46)
+
+    def test_get_ttl_distribution_1(self):
+        self.assertEqual(controller.statistics.get_ttl_distribution(ip_address='72.247.178.67'), {64: 5})
+
+    def test_get_ttl_distribution_2(self):
+        self.assertEqual(controller.statistics.get_ttl_distribution(ip_address='10.0.2.15'), {128: 817})
+
+    def test_get_mss_distribution_1(self):
+        self.assertEqual(controller.statistics.get_mss_distribution(ip_address='72.247.178.67'), {1460: 1})
+
+    def test_get_mss_distribution_2(self):
+        self.assertEqual(controller.statistics.get_mss_distribution(ip_address='10.0.2.15'), {1460: 36})
+
+    def test_get_win_distribution_1(self):
+        self.assertEqual(controller.statistics.get_win_distribution(ip_address='72.247.178.67'), {65535: 5})
+
+    def test_get_tos_distribution_1(self):
+        self.assertEqual(controller.statistics.get_tos_distribution(ip_address='72.247.178.67'), {0: 5})
+
+    def test_get_tos_distribution_2(self):
+        self.assertEqual(controller.statistics.get_tos_distribution(ip_address='10.0.2.15'), {0: 817})
+
+    # INTERNAL HELPER-FUNCTION TESTS
+    def test_calculate_standard_deviation(self):
+        self.assertEqual(controller.statistics.calculate_standard_deviation([1, 1, 2, 3, 5, 8, 13, 21]),
+                         6.609652033201143)
+
+    def test_calculate_entropy(self):
+        self.assertEqual(controller.statistics.calculate_entropy([1, 1, 2, 3, 5, 8, 13, 21]), 2.371389165297016)
+
+    def test_calculate_entropy_normalized(self):
+        self.assertEqual(controller.statistics.calculate_entropy([1, 1, 2, 3, 5, 8, 13, 21], normalized=True),
+                         (2.371389165297016, 0.7904630550990053))
+
+    def test_calculate_complement_packet_rates_1(self):
+        cpr = controller.statistics.calculate_complement_packet_rates(0)[0:9]
+        self.assertEqual(cpr, [(186.418564, 0), (186.418824, 0), (186.419346, 0), (186.445361, 0),
+                               (186.46954399999998, 0), (186.476234, 0), (186.477304, 0), (186.48606999999998, 0),
+                               (186.486761, 0)])
+
+    def test_calculate_complement_packet_rates_2(self):
+        cpr = controller.statistics.calculate_complement_packet_rates(42)[0:9]
+        self.assertEqual(cpr, [(186.418564, 41), (186.418824, 42), (186.419346, 42), (186.445361, 42),
+                               (186.46954399999998, 42), (186.476234, 42), (186.477304, 42),
+                               (186.48606999999998, 42),
+                               (186.486761, 42)])