test_NamedQueries.py 7.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141
  1. import unittest
  2. import pyparsing
  3. import ID2TLib.TestLibrary as Lib
  4. import Core.Controller as Ctrl
  5. controller = Ctrl.Controller(pcap_file_path=Lib.test_pcap, do_extra_tests=False, non_verbose=True)
  6. controller.load_pcap_statistics(flag_write_file=False, flag_recalculate_stats=True, flag_print_statistics=False,
  7. intervals=[], delete=True)
  8. ip_addresses = ["10.0.2.15", "104.83.103.45", "13.107.21.200", "131.253.61.100", "172.217.23.142",
  9. "172.217.23.174", "192.168.33.254", "204.79.197.200", "23.51.123.27", "35.161.3.50",
  10. "52.11.17.245", "52.34.37.177", "52.39.210.199", "52.41.250.141", "52.85.173.182",
  11. "54.149.74.139", "54.187.98.195", "54.192.44.108", "54.192.44.177", "72.247.178.113",
  12. "72.247.178.67", "93.184.220.29"]
  13. allWinSize = [0, 822, 1330, 5082, 8192, 9900, 27060, 35657, 39917, 47030, 50782, 51310, 52202, 52740, 55062,
  14. 56492, 58520, 59950, 59980, 61380, 62788, 62810, 62811, 62906, 63056, 63076, 63086, 63151, 63261, 63350,
  15. 63370, 63400, 63409, 63456, 63516, 63547, 63552, 63572, 63603, 63628, 63655, 63663, 63675, 63686, 63706,
  16. 63839, 63842, 63886, 63893, 63917, 63954, 63963, 63982, 63991, 64000, 64005, 64088, 64110, 64148, 64165,
  17. 64177, 64189, 64194, 64198, 64209, 64230, 64240, 65535]
  18. leastUsedWinASize = [822, 1330, 5082, 9900, 27060, 35657, 39917, 47030, 50782, 51310, 52202, 52740, 55062, 56492,
  19. 58520, 59950, 59980, 61380, 63056, 63963, 63982, 64000, 64005, 64198, 64230]
  20. allPort = [53, 80, 443, 49157, 49160, 49163, 49164, 49165, 49166, 49167, 49168, 49169, 49170, 49171, 49172,
  21. 49173, 49174, 49175, 49176, 49177, 49178, 49179, 49180, 49181, 49182, 49183, 49184, 49185, 49186, 49187,
  22. 49188, 49189, 49190, 49191, 49192, 49193, 49194, 49195, 49196, 49197, 49247, 49323, 49470, 49636, 49695,
  23. 49798, 49927, 49935, 49945, 50262, 50836, 50968, 51143, 51166, 51350, 51451, 51669, 51713, 52033, 52135,
  24. 52399, 52520, 52644, 52697, 52743, 52786, 52964, 52981, 53059, 53234, 53461, 53691, 53708, 53745, 53836,
  25. 54049, 54446, 54593, 54598, 54652, 54663, 54717, 54853, 54930, 55004, 55018, 55119, 55125, 55299, 55310,
  26. 55463, 55650, 55667, 55752, 55843, 55851, 56146, 56325, 56567, 56589, 56750, 57049, 57179, 57275, 57520,
  27. 57653, 57840, 57957, 57991, 58401, 58440, 58645, 58797, 58814, 58905, 58913, 58943, 59380, 59408, 59461,
  28. 59467, 59652, 59660, 59718, 59746, 59844, 60006, 60209, 60414, 60422, 60659, 60696, 60708, 60756, 60827,
  29. 60840, 61181, 61300, 61592, 61718, 61738, 61769, 61807, 62412, 62428, 62447, 62490, 62625, 62626, 62664,
  30. 63425, 64096, 64121, 64137, 64252, 64334, 64337, 64479, 64509, 64637, 64807, 64811, 65448, 65487]
  31. class UnitTestNamedQueries(unittest.TestCase):
  32. def test_most_used_ipaddress(self):
  33. self.assertEqual(controller.statistics.process_db_query('most_used(ipaddress)'), '10.0.2.15')
  34. def test_most_used_macaddress(self):
  35. self.assertEqual(controller.statistics.process_db_query('most_used(macaddress)'), '52:54:00:12:35:02')
  36. def test_most_used_portnumber(self):
  37. self.assertEqual(controller.statistics.process_db_query('most_used(portnumber)'), 443)
  38. def test_most_used_protocolname(self):
  39. self.assertEqual(controller.statistics.process_db_query('most_used(protocolname)'), 'IPv4')
  40. def test_most_used_ttlvalue(self):
  41. self.assertEqual(controller.statistics.process_db_query('most_used(ttlvalue)'), 64)
  42. def test_most_used_mssvalue(self):
  43. self.assertEqual(controller.statistics.process_db_query('most_used(mssvalue)'), 1460)
  44. def test_most_used_winsize(self):
  45. self.assertEqual(controller.statistics.process_db_query('most_used(winsize)'), 65535)
  46. def test_most_used_ipclass(self):
  47. self.assertEqual(controller.statistics.process_db_query('most_used(ipclass)'), 'A')
  48. def test_least_used_ipaddress(self):
  49. self.assertEqual(controller.statistics.process_db_query('least_used(ipaddress)'), '72.247.178.113')
  50. def test_least_used_macaddress(self):
  51. self.assertEqual(controller.statistics.process_db_query('least_used(macaddress)'), '08:00:27:a3:83:43')
  52. def test_least_used_portnumber(self):
  53. self.assertEqual(controller.statistics.process_db_query('least_used(portnumber)'), [58645, 59844])
  54. def test_least_used_protocolname(self):
  55. self.assertEqual(controller.statistics.process_db_query('least_used(protocolname)'), 'UDP')
  56. def test_least_used_ttlvalue(self):
  57. self.assertEqual(controller.statistics.process_db_query('least_used(ttlvalue)'), 255)
  58. def test_least_used_mssvalue(self):
  59. self.assertEqual(controller.statistics.process_db_query('least_used(mssvalue)'), 1460)
  60. def test_least_used_winsize(self):
  61. self.assertEqual(controller.statistics.process_db_query('least_used(winsize)'), leastUsedWinASize)
  62. def test_least_used_ipclass(self):
  63. self.assertEqual(controller.statistics.process_db_query('least_used(ipclass)'), ['A-private', 'C', 'C-private'])
  64. def test_avg_pktsreceived(self):
  65. self.assertEqual(controller.statistics.process_db_query('avg(pktsreceived)'), 90.36363636363636)
  66. def test_avg_pktssent(self):
  67. self.assertEqual(controller.statistics.process_db_query('avg(pktssent)'), 90.36363636363636)
  68. def test_avg_kbytesreceived(self):
  69. self.assertEqual(controller.statistics.process_db_query('avg(kbytesreceived)'), 30.289683948863637)
  70. def test_avg_kbytessent(self):
  71. self.assertEqual(controller.statistics.process_db_query('avg(kbytessent)'), 30.289683948863637)
  72. def test_avg_ttlvalue(self):
  73. self.assertEqual(controller.statistics.process_db_query('avg(ttlvalue)'), 75.08695652173913)
  74. def test_avg_mss(self):
  75. self.assertEqual(controller.statistics.process_db_query('avg(mss)'), 1460.0)
  76. def test_avg_ipaddress(self):
  77. with self.assertRaises(pyparsing.ParseException):
  78. controller.statistics.process_db_query('avg(ipAddress)')
  79. def test_all_ipaddress(self):
  80. self.assertEqual(controller.statistics.process_db_query('all(ipaddress)'), ip_addresses)
  81. def test_all_ttlvalue(self):
  82. self.assertEqual(controller.statistics.process_db_query('all(ttlvalue)'), [64, 128, 255])
  83. def test_all_mss(self):
  84. self.assertEqual(controller.statistics.process_db_query('all(mss)'), 1460)
  85. def test_all_macaddress(self):
  86. self.assertEqual(controller.statistics.process_db_query('all(macaddress)'), ['08:00:27:a3:83:43',
  87. '52:54:00:12:35:02'])
  88. def test_all_portnumber(self):
  89. self.assertEqual(controller.statistics.process_db_query('all(portnumber)'), allPort)
  90. def test_all_protocolname(self):
  91. self.assertEqual(controller.statistics.process_db_query('all(protocolname)'), ['IPv4', 'TCP', 'UDP'])
  92. def test_all_winsize(self):
  93. self.assertEqual(controller.statistics.process_db_query('all(winSize)'),
  94. allWinSize)
  95. def test_all_ipclass(self):
  96. self.assertEqual(controller.statistics.process_db_query('all(ipClass)'),
  97. ['A', 'A-private', 'B', 'C', 'C-private'])
  98. def test_is_query_named_query(self):
  99. self.assertTrue(controller.statistics.is_query('least_used(ipaddress)'))
  100. def test_is_query_no_string(self):
  101. self.assertFalse(controller.statistics.is_query(42))