3
0

test_Queries.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238
import random
import unittest

import Core.Controller as Ctrl
import ID2TLib.TestLibrary as Test
  5. # TODO: improve coverage
  6. controller = Ctrl.Controller(pcap_file_path=Test.test_pcap, do_extra_tests=False, non_verbose=True)
  7. controller.load_pcap_statistics(flag_write_file=False, flag_recalculate_stats=True, flag_print_statistics=False)
  8. file_information = [('Pcap file path', Test.test_pcap),
  9. ('Total packet count', 1998, 'packets'),
  10. ("Recognized packets", 1988, "packets"),
  11. ("Unrecognized packets", 10, "PDUs"), ("% Recognized packets", 99.49949949949949, "%"),
  12. ("% Unrecognized packets", 0.5005005005005005, "%"),
  13. ("Last unknown PDU", '1970-01-01 01:07:39.604899'),
  14. ('Capture duration', '25.4294414520264', 'seconds'),
  15. ('Capture start', '\t1970-01-01 01:01:45.647675'),
  16. ('Capture end', '\t1970-01-01 01:08:10.102034')]
  17. file_statistics = [('Avg. packet rate', 78.57034301757812, 'packets/sec'), ('Avg. packet size', 0.0, 'kbytes'),
  18. ('Avg. packets sent', 90.0, 'packets'), ('Avg. bandwidth in', 9.5290, 'kbit/s'),
  19. ('Avg. bandwidth out', 9.5290, 'kbit/s')]
  20. ip_addresses = ["10.0.2.15", "104.83.103.45", "13.107.21.200", "131.253.61.100", "172.217.23.142",
  21. "172.217.23.174", "192.168.33.254", "204.79.197.200", "23.51.123.27", "35.161.3.50",
  22. "52.11.17.245", "52.34.37.177", "52.39.210.199", "52.41.250.141", "52.85.173.182",
  23. "54.149.74.139", "54.187.98.195", "54.192.44.108", "54.192.44.177", "72.247.178.113",
  24. "72.247.178.67", "93.184.220.29"]
  25. ports = [53, 80, 443, 49157, 49160, 49163, 49164, 49165, 49166, 49167, 49168, 49169, 49170, 49171, 49172, 49173, 49174,
  26. 49175, 49176, 49177, 49178, 49179, 49180, 49181, 49182, 49183, 49184, 49185, 49186, 49187, 49188, 49189, 49190,
  27. 49191, 49192, 49193, 49194, 49195, 49196, 49197, 49247, 49323, 49470, 49636, 49695, 49798, 49927, 49935, 49945,
  28. 50262, 50836, 50968, 51143, 51166, 51350, 51451, 51669, 51713, 52033, 52135, 52399, 52520, 52644, 52697, 52743,
  29. 52786, 52964, 52981, 53059, 53234, 53461, 53691, 53708, 53745, 53836, 54049, 54446, 54593, 54598, 54652, 54663,
  30. 54717, 54853, 54930, 55004, 55018, 55119, 55125, 55299, 55310, 55463, 55650, 55667, 55752, 55843, 55851, 56146,
  31. 56325, 56567, 56589, 56750, 57049, 57179, 57275, 57520, 57653, 57840, 57957, 57991, 58401, 58440, 58645, 58797,
  32. 58814, 58905, 58913, 58943, 59380, 59408, 59461, 59467, 59652, 59660, 59718, 59746, 59844, 60006, 60209, 60414,
  33. 60422, 60659, 60696, 60708, 60756, 60827, 60840, 61181, 61300, 61592, 61718, 61738, 61769, 61807, 62412, 62428,
  34. 62447, 62490, 62625, 62626, 62664, 63425, 64096, 64121, 64137, 64252, 64334, 64337, 64479, 64509, 64637, 64807,
  35. 64811, 65448, 65487]
  36. class TestQueries(unittest.TestCase):
  37. def test_get_file_information(self):
  38. self.assertEqual(controller.statistics.get_file_information(), file_information)
  39. def test_get_general_file_statistics(self):
  40. file_stats = controller.statistics.get_general_file_statistics()
  41. file_stats[3] = ('Avg. bandwidth in', round(file_stats[3][1], 4), 'kbit/s')
  42. file_stats[4] = ('Avg. bandwidth out', round(file_stats[4][1], 4), 'kbit/s')
  43. self.assertEqual(file_stats, file_statistics)
  44. def test_get_capture_duration(self):
  45. self.assertEqual(controller.statistics.get_capture_duration(), '25.4294414520264')
  46. def test_get_pcap_timestamp_start(self):
  47. self.assertEqual(controller.statistics.get_pcap_timestamp_start(), '1970-01-01 01:01:45.647675')
  48. def test_get_pcap_timestamp_end(self):
  49. self.assertEqual(controller.statistics.get_pcap_timestamp_end(), '1970-01-01 01:08:10.102034')
  50. def test_get_pps_sent_1(self):
  51. self.assertEqual(controller.statistics.get_pps_sent(ip_address='72.247.178.67'), 0)
  52. def test_get_pps_sent_2(self):
  53. self.assertEqual(controller.statistics.get_pps_sent(ip_address='10.0.2.15'), 32)
  54. def test_get_pps_received_1(self):
  55. self.assertEqual(controller.statistics.get_pps_received(ip_address='72.247.178.67'), 0)
  56. def test_get_pps_received_2(self):
  57. self.assertEqual(controller.statistics.get_pps_received(ip_address='10.0.2.15'), 46)
  58. def test_get_packet_count(self):
  59. self.assertEqual(controller.statistics.get_packet_count(), 1998)
  60. def test_get_most_used_ip_address(self):
  61. self.assertEqual(controller.statistics.get_most_used_ip_address(), '10.0.2.15')
  62. def test_get_ttl_distribution_1(self):
  63. self.assertEqual(controller.statistics.get_ttl_distribution(ip_address='72.247.178.67'), {64: 5})
  64. def test_get_ttl_distribution_2(self):
  65. self.assertEqual(controller.statistics.get_ttl_distribution(ip_address='10.0.2.15'), {128: 817})
  66. def test_get_mss_distribution_1(self):
  67. self.assertEqual(controller.statistics.get_mss_distribution(ip_address='72.247.178.67'), {1460: 1})
  68. def test_get_mss_distribution_2(self):
  69. self.assertEqual(controller.statistics.get_mss_distribution(ip_address='10.0.2.15'), {1460: 36})
  70. def test_get_win_distribution_1(self):
  71. self.assertEqual(controller.statistics.get_win_distribution(ip_address='72.247.178.67'), {65535: 5})
  72. def test_get_tos_distribution_1(self):
  73. self.assertEqual(controller.statistics.get_tos_distribution(ip_address='72.247.178.67'), {0: 5})
  74. def test_get_tos_distribution_2(self):
  75. self.assertEqual(controller.statistics.get_tos_distribution(ip_address='10.0.2.15'), {0: 817})
  76. def test_get_ip_address_count(self):
  77. self.assertEqual(controller.statistics.get_ip_address_count(), 22)
  78. def test_get_ip_addresses(self):
  79. self.assertEqual(controller.statistics.get_ip_addresses(), ip_addresses)
  80. def test_get_random_ip_address(self):
  81. random.seed(5)
  82. self.assertEqual(controller.statistics.get_random_ip_address(), '72.247.178.113')
  83. def test_get_random_ip_address_count_2(self):
  84. random.seed(5)
  85. self.assertEqual(controller.statistics.get_random_ip_address(2), ['72.247.178.113', '23.51.123.27'])
  86. def test_get_mac_address_1(self):
  87. self.assertEqual(controller.statistics.get_mac_address(ip_address='72.247.178.67'), '52:54:00:12:35:02')
  88. def test_get_mac_address_2(self):
  89. self.assertEqual(controller.statistics.get_mac_address(ip_address='10.0.2.15'), '08:00:27:a3:83:43')
  90. def test_get_most_used_mss(self):
  91. self.assertEqual(controller.statistics.get_most_used_mss(ip_address='10.0.2.15'), 1460)
  92. def test_get_most_used_ttl(self):
  93. self.assertEqual(controller.statistics.get_most_used_ttl(ip_address='10.0.2.15'), 128)
  94. def test_is_query_no_string(self):
  95. self.assertFalse(controller.statistics.is_query(42))
  96. def test_is_query_named_query(self):
  97. self.assertTrue(controller.statistics.is_query('least_used(ipaddress)'))
  98. def test_is_query_standard_query(self):
  99. self.assertTrue(controller.statistics.is_query('SELECT * from ip_statistics'))
  100. def test_calculate_standard_deviation(self):
  101. self.assertEqual(controller.statistics.calculate_standard_deviation([1, 1, 2, 3, 5, 8, 13, 21]),
  102. 6.609652033201143)
  103. def test_calculate_entropy(self):
  104. self.assertEqual(controller.statistics.calculate_entropy([1, 1, 2, 3, 5, 8, 13, 21]), 2.371389165297016)
  105. def test_calculate_entropy_normalized(self):
  106. self.assertEqual(controller.statistics.calculate_entropy([1, 1, 2, 3, 5, 8, 13, 21], normalized=True),
  107. (2.371389165297016, 0.7904630550990053))
  108. def test_calculate_complement_packet_rates_1(self):
  109. cpr = controller.statistics.calculate_complement_packet_rates(0)[0:9]
  110. self.assertEqual(cpr, [(186.418564, 0), (186.418824, 0), (186.419346, 0), (186.445361, 0),
  111. (186.46954399999998, 0), (186.476234, 0), (186.477304, 0), (186.48606999999998, 0),
  112. (186.486761, 0)])
  113. def test_calculate_complement_packet_rates_2(self):
  114. cpr = controller.statistics.calculate_complement_packet_rates(42)[0:9]
  115. self.assertEqual(cpr, [(186.418564, 41), (186.418824, 42), (186.419346, 42), (186.445361, 42),
  116. (186.46954399999998, 42), (186.476234, 42), (186.477304, 42), (186.48606999999998, 42),
  117. (186.486761, 42)])
  118. # NAMED QUERY TESTS
  119. def test_most_used_ipaddress(self):
  120. self.assertEqual(controller.statistics.process_db_query('most_used(ipaddress)'), '10.0.2.15')
  121. def test_most_used_macaddress(self):
  122. self.assertEqual(controller.statistics.process_db_query('most_used(macaddress)'), '52:54:00:12:35:02')
  123. def test_most_used_portnumber(self):
  124. self.assertEqual(controller.statistics.process_db_query('most_used(portnumber)'), 443)
  125. def test_most_used_protocolname(self):
  126. self.assertEqual(controller.statistics.process_db_query('most_used(protocolname)'), 'IPv4')
  127. def test_most_used_ttlvalue(self):
  128. self.assertEqual(controller.statistics.process_db_query('most_used(ttlvalue)'), 64)
  129. def test_most_used_mssvalue(self):
  130. self.assertEqual(controller.statistics.process_db_query('most_used(mssvalue)'), 1460)
  131. def test_most_used_winsize(self):
  132. self.assertEqual(controller.statistics.process_db_query('most_used(winsize)'), 65535)
  133. def test_most_used_ipclass(self):
  134. self.assertEqual(controller.statistics.process_db_query('most_used(ipclass)'), 'A')
  135. def test_least_used_ipaddress(self):
  136. self.assertEqual(controller.statistics.process_db_query('least_used(ipaddress)'), '72.247.178.113')
  137. def test_least_used_macaddress(self):
  138. self.assertEqual(controller.statistics.process_db_query('least_used(macaddress)'), '08:00:27:a3:83:43')
  139. def test_least_used_portnumber(self):
  140. self.assertEqual(controller.statistics.process_db_query('least_used(portnumber)'), [58645, 59844])
  141. def test_least_used_protocolname(self):
  142. self.assertEqual(controller.statistics.process_db_query('least_used(protocolname)'), 'UDP')
  143. def test_least_used_ttlvalue(self):
  144. self.assertEqual(controller.statistics.process_db_query('least_used(ttlvalue)'), 255)
  145. def test_avg_pktsreceived(self):
  146. self.assertEqual(controller.statistics.process_db_query('avg(pktsreceived)'), 90.36363636363636)
  147. def test_avg_pktssent(self):
  148. self.assertEqual(controller.statistics.process_db_query('avg(pktssent)'), 90.36363636363636)
  149. def test_avg_kbytesreceived(self):
  150. self.assertEqual(controller.statistics.process_db_query('avg(kbytesreceived)'), 30.289683948863637)
  151. def test_avg_kbytessent(self):
  152. self.assertEqual(controller.statistics.process_db_query('avg(kbytessent)'), 30.289683948863637)
  153. def test_avg_ttlvalue(self):
  154. self.assertEqual(controller.statistics.process_db_query('avg(ttlvalue)'), 75.08695652173913)
  155. def test_avg_mss(self):
  156. self.assertEqual(controller.statistics.process_db_query('avg(mss)'), 1460.0)
  157. def test_all_ipaddress(self):
  158. self.assertEqual(controller.statistics.process_db_query('all(ipaddress)'), ip_addresses)
  159. def test_all_ttlvalue(self):
  160. self.assertEqual(controller.statistics.process_db_query('all(ttlvalue)'), [64, 128, 255])
  161. def test_all_mss(self):
  162. self.assertEqual(controller.statistics.process_db_query('all(mss)'), 1460)
  163. def test_all_macaddress(self):
  164. self.assertEqual(controller.statistics.process_db_query('all(macaddress)'), ['08:00:27:a3:83:43',
  165. '52:54:00:12:35:02'])
  166. def test_all_portnumber(self):
  167. self.assertEqual(controller.statistics.process_db_query('all(portnumber)'), ports)
  168. def test_all_protocolname(self):
  169. self.assertEqual(controller.statistics.process_db_query('all(protocolname)'), ['IPv4', 'TCP', 'UDP'])
  170. def test_nested_query(self):
  171. self.assertEqual(controller.statistics.process_db_query('macaddress(ipaddress in most_used(ipaddress))'), '08:00:27:a3:83:43')