test_Queries.py 9.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214
  1. import unittest
  2. from definitions import ROOT_DIR
  3. import ID2TLib.Controller as Ctrl
  4. pcap = ROOT_DIR + "/../resources/test/reference_1998.pcap"
  5. controller = Ctrl.Controller(pcap_file_path=pcap, do_extra_tests=False)
  6. controller.load_pcap_statistics(flag_write_file=False, flag_recalculate_stats=True, flag_print_statistics=False)
  7. file_information = [('Pcap file', ROOT_DIR + '/../resources/test/reference_1998.pcap'),
  8. ('Packets', 1998, 'packets'), ('Capture length', '25.4294414520264', 'seconds'),
  9. ('Capture start', '1970-01-01 01:01:45.647675'), ('Capture end', '1970-01-01 01:08:10.102034')]
  10. file_statistics = [('Avg. packet rate', 78.57034301757812, 'packets/sec'), ('Avg. packet size', 0.0, 'kbytes'),
  11. ('Avg. packets sent', 90.0, 'packets'), ('Avg. bandwidth in', 9.529012680053711, 'kbit/s'),
  12. ('Avg. bandwidth out', 9.529012680053711, 'kbit/s')]
  13. #FIXME: currently sorted ascending
  14. ip_addresses = ["10.0.2.15", "104.83.103.45", "13.107.21.200", "131.253.61.100","172.217.23.142",
  15. "172.217.23.174", "192.168.33.254", "204.79.197.200", "23.51.123.27", "35.161.3.50",
  16. "52.11.17.245", "52.34.37.177", "52.39.210.199", "52.41.250.141", "52.85.173.182",
  17. "54.149.74.139", "54.187.98.195", "54.192.44.108", "54.192.44.177", "72.247.178.113",
  18. "72.247.178.67", "93.184.220.29"]
  19. class TestQueries(unittest.TestCase):
  20. def test_get_file_information(self):
  21. self.assertEqual(controller.statistics.get_file_information(), file_information)
  22. def test_get_general_file_statistics(self):
  23. self.assertEqual(controller.statistics.get_general_file_statistics(), file_statistics)
  24. def test_get_capture_duration(self):
  25. self.assertEqual(controller.statistics.get_capture_duration(),'25.4294414520264')
  26. def test_get_pcap_timestamp_start(self):
  27. self.assertEqual(controller.statistics.get_pcap_timestamp_start(), '1970-01-01 01:01:45.647675')
  28. def test_get_pcap_timestamp_end(self):
  29. self.assertEqual(controller.statistics.get_pcap_timestamp_end(), '1970-01-01 01:08:10.102034')
  30. def test_get_pps_sent_1(self):
  31. self.assertEqual(controller.statistics.get_pps_sent(ip_address='72.247.178.67'), 0)
  32. def test_get_pps_sent_2(self):
  33. self.assertEqual(controller.statistics.get_pps_sent(ip_address='10.0.2.15'), 32)
  34. def test_get_pps_received_1(self):
  35. self.assertEqual(controller.statistics.get_pps_received(ip_address='72.247.178.67'), 0)
  36. def test_get_pps_received_2(self):
  37. self.assertEqual(controller.statistics.get_pps_received(ip_address='10.0.2.15'), 46)
  38. def test_get_packet_count(self):
  39. self.assertEqual(controller.statistics.get_packet_count(), 1998)
  40. def test_get_most_used_ip_address(self):
  41. self.assertEqual(controller.statistics.get_most_used_ip_address(), '10.0.2.15')
  42. def test_get_ttl_distribution_1(self):
  43. self.assertEqual(controller.statistics.get_ttl_distribution(ipAddress='72.247.178.67'), {64: 5})
  44. def test_get_ttl_distribution_2(self):
  45. self.assertEqual(controller.statistics.get_ttl_distribution(ipAddress='10.0.2.15'), {128: 817})
  46. def test_get_mss_distribution_1(self):
  47. self.assertEqual(controller.statistics.get_mss_distribution(ipAddress='72.247.178.67'), {1460: 1})
  48. def test_get_mss_distribution_2(self):
  49. self.assertEqual(controller.statistics.get_mss_distribution(ipAddress='10.0.2.15'), {1460: 36})
  50. def test_get_win_distribution_1(self):
  51. self.assertEqual(controller.statistics.get_win_distribution(ipAddress='72.247.178.67'), {65535: 5})
  52. # TODO: get win_distribution for this ip
  53. #def test_get_win_distribution_2(self):
  54. # self.assertEqual(controller.statistics.get_win_distribution(ipAddress='10.0.2.15'),'')
  55. def test_get_tos_distribution_1(self):
  56. self.assertEqual(controller.statistics.get_tos_distribution(ipAddress='72.247.178.67'), {0: 5})
  57. def test_get_tos_distribution_2(self):
  58. self.assertEqual(controller.statistics.get_tos_distribution(ipAddress='10.0.2.15'), {0: 817})
  59. def test_get_ip_address_count(self):
  60. self.assertEqual(controller.statistics.get_ip_address_count(), 22)
  61. def test_get_ip_addresses(self):
  62. self.assertEqual(controller.statistics.get_ip_addresses(), ip_addresses)
  63. #TODO: move random outside of query and use seed to test
  64. #def test_get_random_ip_address(self):
  65. # self.assertEqual(controller.statistics.get_random_ip_address(), '')
  66. def test_get_mac_address_1(self):
  67. self.assertEqual(controller.statistics.get_mac_address(ipAddress='72.247.178.67'), '52:54:00:12:35:02')
  68. def test_get_mac_address_2(self):
  69. self.assertEqual(controller.statistics.get_mac_address(ipAddress='10.0.2.15'), '08:00:27:a3:83:43')
  70. def test_get_most_used_mss(self):
  71. self.assertEqual(controller.statistics.get_most_used_mss(ipAddress='10.0.2.15'), 1460)
  72. def test_get_most_used_ttl(self):
  73. self.assertEqual(controller.statistics.get_most_used_ttl(ipAddress='10.0.2.15'), 128)
  74. def test_is_query_no_string(self):
  75. self.assertFalse(controller.statistics.is_query(42))
  76. def test_is_query_named_query(self):
  77. self.assertTrue(controller.statistics.is_query('least_used(ipaddress)'))
  78. def test_is_query_standard_query(self):
  79. self.assertTrue(controller.statistics.is_query('SELECT * from ip_statistics'))
  80. def test_calculate_standard_deviation(self):
  81. self.assertEqual(controller.statistics.calculate_standard_deviation([1,1,2,3,5,8,13,21]), 6.609652033201143)
  82. def test_calculate_entropy_unnormalized(self):
  83. self.assertEqual(controller.statistics.calculate_entropy([1,1,2,3,5,8,13,21]), 2.371389165297016)
  84. def test_calculate_entropy_normalized(self):
  85. self.assertEqual(controller.statistics.calculate_entropy([1, 1, 2, 3, 5, 8, 13, 21], normalized=True),
  86. (2.371389165297016, 0.7904630550990053))
  87. #TODO: get complement packet rates and a reasonable pps
  88. #def test_calculate_complement_packet_rates(self):
  89. # self.assertEqual(controller.statistics.calculate_complement_packet_rates(42), '')
  90. # NAMED QUERY TESTS
  91. def test_most_used_ipaddress(self):
  92. self.assertEqual(controller.statistics.process_db_query('most_used(ipaddress)'), '10.0.2.15')
  93. def test_most_used_macaddress(self):
  94. self.assertEqual(controller.statistics.process_db_query('most_used(macaddress)'), [('52:54:00:12:35:02', 21)])
  95. def test_most_used_portnumber(self):
  96. self.assertEqual(controller.statistics.process_db_query('most_used(portnumber)'), [(443, 28)])
  97. def test_most_used_protocolname(self):
  98. self.assertEqual(controller.statistics.process_db_query('most_used(protocolname)'), [('IPv4', 22)])
  99. def test_most_used_ttlvalue(self):
  100. self.assertEqual(controller.statistics.process_db_query('most_used(ttlvalue)'), 64)
  101. def test_most_used_mssvalue(self):
  102. self.assertEqual(controller.statistics.process_db_query('most_used(mssvalue)'), 1460)
  103. def test_most_used_winsize(self):
  104. self.assertEqual(controller.statistics.process_db_query('most_used(winsize)'), 65535)
  105. def test_most_used_ipclass(self):
  106. self.assertEqual(controller.statistics.process_db_query('most_used(ipclass)'), 'A')
  107. def test_least_used_ipaddress(self):
  108. self.assertEqual(controller.statistics.process_db_query('least_used(ipaddress)'), '72.247.178.113')
  109. def test_least_used_macaddress(self):
  110. self.assertEqual(controller.statistics.process_db_query('least_used(macaddress)'), [('08:00:27:a3:83:43', 1)])
  111. def test_least_used_portnumber(self):
  112. self.assertEqual(controller.statistics.process_db_query('least_used(portnumber)'), [(58645, 1), (59844, 1)])
  113. def test_least_used_protocolname(self):
  114. self.assertEqual(controller.statistics.process_db_query('least_used(protocolname)'), [('UDP', 2)])
  115. def test_least_used_ttlvalue(self):
  116. self.assertEqual(controller.statistics.process_db_query('least_used(ttlvalue)'), 255)
  117. def test_avg_pktsreceived(self):
  118. self.assertEqual(controller.statistics.process_db_query('avg(pktsreceived)'), 90.36363636363636)
  119. def test_avg_pktssent(self):
  120. self.assertEqual(controller.statistics.process_db_query('avg(pktssent)'), 90.36363636363636)
  121. def test_avg_kbytesreceived(self):
  122. self.assertEqual(controller.statistics.process_db_query('avg(kbytesreceived)'), 30.289683948863637)
  123. def test_avg_kbytessent(self):
  124. self.assertEqual(controller.statistics.process_db_query('avg(kbytessent)'), 30.289683948863637)
  125. def test_avg_ttlvalue(self):
  126. self.assertEqual(controller.statistics.process_db_query('avg(ttlvalue)'), 75.08695652173913)
  127. def test_avg_mss(self):
  128. self.assertEqual(controller.statistics.process_db_query('avg(mss)'), 1460.0)
  129. def test_all_ipaddress(self):
  130. self.assertEqual(controller.statistics.process_db_query('all(ipaddress)'), ip_addresses)
  131. def test_all_ttlvalue(self):
  132. self.assertEqual(controller.statistics.process_db_query('all(ttlvalue)'), [64, 128, 255])
  133. def test_all_mss(self):
  134. self.assertEqual(controller.statistics.process_db_query('all(mss)'), 1460)
  135. def test_all_macaddress(self):
  136. self.assertEqual(controller.statistics.process_db_query('all(macaddress)'), ['52:54:00:12:35:02', '08:00:27:a3:83:43'])
  137. # TODO: get list of ports
  138. #def test_all_portnumber(self):
  139. # self.assertEqual(controller.statistics.process_db_query('all(portnumber)'), '')
  140. def test_all_protocolname(self):
  141. self.assertEqual(controller.statistics.process_db_query('all(protocolname)'), ['IPv4', 'TCP', 'UDP'])
  142. if __name__ == '__main__':
  143. unittest.main()