Browse Source

add generic temporal efficiency test

add FIXME for SQLiAttack (giant fluctuation in runtime)
Jens Keim 6 years ago
parent
commit
64d486bd3a
3 changed files with 73 additions and 47 deletions
  1. 2 2
      code/ID2TLib/AttackController.py
  2. 50 6
      code/Test/ID2TAttackTest.py
  3. 21 39
      code/Test/efficiency_testing.py

+ 2 - 2
code/ID2TLib/AttackController.py

@@ -100,8 +100,8 @@ class AttackController:
         if time:
             self.current_attack.set_finish_time()
         duration = self.current_attack.get_packet_generation_time()
-        total_packets, temp_attack_pcap_path = self.current_attack.generate_attack_pcap()
-        print("done. (total: " + str(total_packets) + " pkts", end="")
+        self.total_packets, temp_attack_pcap_path = self.current_attack.generate_attack_pcap()
+        print("done. (total: " + str(self.total_packets) + " pkts", end="")
         if time:
             print(" in ", duration, " seconds", end="")
         print(".)")

+ 50 - 6
code/Test/ID2TAttackTest.py

@@ -30,20 +30,64 @@ class ID2TAttackTest(unittest.TestCase):
         :param time: Measure time for packet generation.
         """
 
-        self.controller = Ctrl.Controller(pcap_file_path=pcap, do_extra_tests=False)
-        self.controller.load_pcap_statistics(flag_write_file, flag_recalculate_stats, flag_print_statistics)
-        self.controller.process_attacks(attack_args, [[seed]], time)
+        controller = Ctrl.Controller(pcap_file_path=pcap, do_extra_tests=False)
+        controller.load_pcap_statistics(flag_write_file, flag_recalculate_stats, flag_print_statistics)
+
+        controller.process_attacks(attack_args, [[seed]], time)
 
         caller_function = inspect.stack()[1].function
 
         try:
-            self.assertEqual(sha256_checksum, Lib.get_sha256(self.controller.pcap_dest_path))
+            self.assertEqual(sha256_checksum, Lib.get_sha256(controller.pcap_dest_path))
         except self.failureException:
-            Lib.rename_test_result_files(self.controller, caller_function, attack_sub_dir, test_sub_dir)
+            Lib.rename_test_result_files(controller, caller_function, attack_sub_dir, test_sub_dir)
             raise
 
         if cleanup:
-            Lib.clean_up(self.controller)
+            Lib.clean_up(controller)
+        else:
+            Lib.rename_test_result_files(controller, caller_function, attack_sub_dir, test_sub_dir)
+
+    def temporal_efficiency_test(self, attack_args, time_limit=15, factor=1, seed=None, cleanup=True, pcap=Lib.test_pcap,
+                                 flag_write_file=False, flag_recalculate_stats=False, flag_print_statistics=False,
+                                 attack_sub_dir=True, test_sub_dir=True):
+        """
+        Runs the attack with given aruments and monitors time efficiency.
+
+        :param attack_args: A list of attacks with their attack parameters (as defined in Controller.process_attacks).
+        :param time_limit: The given time limit in seconds.
+        :param factor: A factor to scale the generation time (e.g. only 7 pkts generated -> *10000/7 for 15 seconds).
+        :param seed: A random seed to keep random values static (care for count and order of random generation).
+        :param cleanup: Clean up attack output after testing.
+        :param pcap: The input pcap for the attack.
+        :param flag_write_file: Writes the statistics to a file.
+        :param flag_recalculate_stats: Forces the recalculation of statistics.
+        :param flag_print_statistics: Prints the statistics on the terminal.
+        :param attack_sub_dir: create sub-directory for each attack-class if True
+        :param test_sub_dir: create sub-directory for each test-function/case if True
+        """
+
+        controller = Ctrl.Controller(pcap_file_path=pcap, do_extra_tests=False)
+        controller.load_pcap_statistics(flag_write_file, flag_recalculate_stats, flag_print_statistics)
+
+        if seed is None:
+            controller.process_attacks(attack_args, time=True)
+        else:
+            controller.process_attacks(attack_args, [[seed]], time=True)
+
+        duration = controller.durations[0]*factor/controller.attack_controller.total_packets
+        print(attack_args[0][0] + ' needs ' + str(duration) + ' seconds to generate ' + str(factor) + ' packets.')
+
+        caller_function = inspect.stack()[1].function
+
+        try:
+            self.assertLessEqual(duration, time_limit)
+        except self.failureException:
+            Lib.rename_test_result_files(controller, caller_function, attack_sub_dir, test_sub_dir)
+            raise
+
+        if cleanup:
+            Lib.clean_up(controller)
         else:
             Lib.rename_test_result_files(self.controller, caller_function, attack_sub_dir, test_sub_dir)
 

+ 21 - 39
code/Test/efficiency_testing.py

@@ -1,68 +1,50 @@
 import unittest.mock as mock
 
-import ID2TLib.Utility as Util
 import ID2TLib.TestLibrary as Lib
 import Test.ID2TAttackTest as Test
 
-sha_SMBLoris_10_000 = '1433f4f69e9311ca7f64920f94992a8e8fbd045433fc0143dc47dfd25a6a02c1'
-sha_SMBLoris_100_000 = '4891720093ba32f9794431ee16815931b1866bccac58b8ef750b669742875fb0'
-sha_SMBScan_10_000 = '311cbfc28859597ce7ff58b1bdc8f0ddd733f33ab2bd83a4a7579edadce736aa'
-sha_SMBScan_100_000 = '9e55f4c2f035ec52701eabed32757f427627ce5ccf53e30bfc084680a4bf49a2'
-sha_SMBScan_hosting_10_000 = '14f92d19535332bf523e94bcb2038309844abedaa848ca7195a343256adba5f3'
-sha_SMBScan_hosting_100_000 = '029d064de82122202b6ae53d4efff2ea3318ded73a2987cb16e1e74606532766'
-sha_FTPExploit = '75290f0135b13b9d570a484fc7c674b80921a9311cd1229243ea8c547d8c08f0'
-sha_Portscan_open = '2e1deda7b36fb39705dd44dcb2350cca547cfb5049a16fb383c522dbe7e1d4e9'
-sha_Portscan_close = '3a62f594b9cd31bf7b8c455d1bb5cff3ec2beb044f6da39b066317910f10be66'
-sha_SQLi = '40ab01ef72491dcbcc3d8302b578abb5062397e9cd10d81f87aa6b5fff9f3b69'
-
 
 class EfficiencyTests(Test.ID2TAttackTest):
 
     def test_SMBLoris_10_000(self):
-        self.checksum_test([['SMBLorisAttack', 'attackers.count=30', 'packets.per-second=7.7']], sha_SMBLoris_10_000,
-                           time=True)
-        self.assertLessEqual(self.controller.durations[0], 15)
+        self.temporal_efficiency_test([['SMBLorisAttack', 'attackers.count=30', 'packets.per-second=8.0']],
+                                      time_limit=15, factor=10000)
 
     def test_SMBLoris_100_000(self):
-        self.checksum_test([['SMBLorisAttack', 'attackers.count=30', 'packets.per-second=95']], sha_SMBLoris_100_000,
-                           time=True)
-        self.assertLessEqual(self.controller.durations[0], 150)
+        self.temporal_efficiency_test([['SMBLorisAttack', 'attackers.count=30', 'packets.per-second=98']],
+                                      time_limit=150, factor=100000)
 
     def test_SMBScan_10_000(self):
-        self.checksum_test([['SMBScanAttack', 'ip.src=192.168.178.1', 'ip.dst=192.168.178.10-192.168.197.145']],
-                           sha_SMBScan_10_000, time=True)
-        self.assertLessEqual(self.controller.durations[0], 15)
+        self.temporal_efficiency_test([['SMBScanAttack', 'ip.src=192.168.178.1',
+                                        'ip.dst=192.168.178.10-192.168.197.145']], time_limit=15, factor=10000)
 
     def test_SMBScan_100_000(self):
-        self.checksum_test([['SMBScanAttack', 'ip.src=192.168.178.1', 'ip.dst=192.168.0.1-192.168.195.81']],
-                           sha_SMBScan_100_000, time=True)
-        self.assertLessEqual(self.controller.durations[0], 150)
+        self.temporal_efficiency_test([['SMBScanAttack', 'ip.src=192.168.178.1', 'ip.dst=192.168.0.1-192.168.195.76']],
+                                      time_limit=150, factor=100000)
 
     def test_SMBScan_hosting_10_000(self):
-        self.checksum_test([['SMBScanAttack', 'ip.src=192.168.178.1', 'ip.dst=192.168.178.10-192.168.181.241',
-                             'hosting.ip=192.168.178.10-192.168.181.241']], sha_SMBScan_hosting_10_000, time=True)
-        self.assertLessEqual(self.controller.durations[0], 15)
+        self.temporal_efficiency_test([['SMBScanAttack', 'ip.src=192.168.178.1',
+                                        'ip.dst=192.168.178.10-192.168.181.241',
+                                        'hosting.ip=192.168.178.10-192.168.181.241']], time_limit=15, factor=10000)
 
     def test_SMBScan_hosting_100_000(self):
-        self.checksum_test([['SMBScanAttack', 'ip.src=192.168.178.1', 'ip.dst=192.168.178.10-192.168.217.25',
-                             'hosting.ip=192.168.178.10-192.168.217.25']], sha_SMBScan_hosting_100_000, time=True)
-        self.assertLessEqual(self.controller.durations[0], 150)
+        self.temporal_efficiency_test([['SMBScanAttack', 'ip.src=192.168.178.1', 'ip.dst=192.168.178.10-192.168.217.25',
+                                        'hosting.ip=192.168.178.10-192.168.217.25']], time_limit=150, factor=100000)
 
     @mock.patch('ID2TLib.Utility.get_rnd_bytes', side_effect=Lib.get_bytes)
     @mock.patch('ID2TLib.Utility.get_rnd_x86_nop', side_effect=Lib.get_x86_nop)
     def test_FTPExploit(self, mock_get_rnd_x86_nop, mock_get_rnd_bytes):
-        self.checksum_test([['FTPWinaXeExploit', 'ip.src=192.168.178.1', 'ip.dst=192.168.178.10']],
-                           sha_FTPExploit, time=True)
-        self.assertLessEqual(self.controller.durations[0]*10000/7, 15)
+        self.temporal_efficiency_test([['FTPWinaXeExploit', 'ip.src=192.168.178.1', 'ip.dst=192.168.178.10']],
+                                      time_limit=15, factor=10000)
 
     def test_PortscanAttack_open(self):
-        self.checksum_test([['PortscanAttack', 'ip.src=192.168.178.1', 'port.open=80']], sha_Portscan_open, time=True)
-        self.assertLessEqual(self.controller.durations[0]*10000/1002, 15)
+        self.temporal_efficiency_test([['PortscanAttack', 'ip.src=192.168.178.1', 'port.open=80']], time_limit=15,
+                                      factor=10000)
 
     def test_PortscanAttack_close(self):
-        self.checksum_test([['PortscanAttack', 'ip.src=192.168.178.1', 'port.open=20']], sha_Portscan_close, time=True)
-        self.assertLessEqual(self.controller.durations[0]*10, 15)
+        self.temporal_efficiency_test([['PortscanAttack', 'ip.src=192.168.178.1', 'port.open=20']], time_limit=15,
+                                      factor=10000)
 
     def test_sqli_default(self):
-        self.checksum_test([['SQLiAttack', 'ip.dst=192.168.0.1']], sha_SQLi)
-        self.assertLessEqual(self.controller.durations[0]*10000/6423, 15)
+        # FIXME: sometimes it takes 15.34028493521018 instead of the normal 7.150923313737726 seconds
+        self.temporal_efficiency_test([['SQLiAttack', 'ip.dst=192.168.0.1']], time_limit=15, factor=10000)