Browse Source

refactored attack recognition and related method, added tests

Jonathan Speth 6 years ago
parent
commit
352eb41658

+ 53 - 51
code/Core/AttackController.py

@@ -1,6 +1,7 @@
 import importlib
 import sys
 import difflib
+import pkgutil
 
 import Attack.AttackParameters as atkParam
 import Core.LabelManager as LabelManager
@@ -36,6 +37,56 @@ class AttackController:
         """
         self.seed = seed
 
+    @staticmethod
+    def choose_attack(input_name):
+        """"
+        Finds the attack best matching to input_name
+
+        :param input_name: The name of the attack the user put in
+        :return: The best matching attack in case one was found
+        """
+
+        import Attack
+
+        # Find all attacks, exclude some classes
+        package = Attack
+        available_attacks = []
+        for _, name, __ in pkgutil.iter_modules(package.__path__):
+            if name != 'BaseAttack' and name != 'AttackParameters':
+                available_attacks.append(name)
+
+        highest_sim = 0.0
+        highest_sim_attack = None
+        for attack in available_attacks:
+            # Check if attack name is accurate
+            if input_name == attack:
+                return attack
+            # Compares input with one of the available attacks
+            # Makes comparison with lowercase version with generic endings removed
+            # List of generic attack name endings can be found in ID2TLib.Utility
+            counter_check = attack.lower()
+            if not any(ending in input_name for ending in Util.generic_attack_names):
+                counter_check = Util.remove_generic_ending(counter_check)
+            similarity = difflib.SequenceMatcher(None, input_name.lower(), counter_check).ratio()
+            # Exact match, return appropriate attack name
+            if similarity == 1.0:
+                return attack
+            # Found more likely match
+            if similarity > highest_sim:
+                highest_sim = similarity
+                highest_sim_attack = attack
+
+        # Found no exactly matching attack name, print best match and exit
+        if highest_sim >= 0.6:
+            print('Found no attack of name ' + input_name + '. The closest match was ' + highest_sim_attack +
+                  '.  Use ./id2t -l for a list of available attacks.')
+            exit(1)
+        # Found no reasonably matching attack name, recommend -l and exit
+        else:
+            print('Found no attack of name ' + input_name + ' or one similar to it.'
+                                                            ' Use ./id2t -l for an overview of available attacks.')
+            exit(1)
+
     def create_attack(self, attack_name: str, seed=None):
         """
         Creates dynamically a new class instance based on the given attack_name.
@@ -44,57 +95,7 @@ class AttackController:
         :param seed: random seed for param generation
         :return: None
         """
-
-        def choose_attack(input_name):
-            """"
-            Finds the attack best matching to input_name
-
-            :param input_name: The name of the attack the user put in
-            :return: The best matching attack in case one was found
-            """
-
-            import pkgutil
-            import Attack
-
-            # Find all attacks, exclude some classes
-            package = Attack
-            available_attacks = []
-            for _, name, __ in pkgutil.iter_modules(package.__path__):
-                if name != 'BaseAttack' and name != 'AttackParameters':
-                    available_attacks.append(name)
-
-            highest_sim = 0.0
-            highest_sim_attack = None
-            for attack in available_attacks:
-                # Check if attack name is accurate
-                if input_name == attack:
-                    return attack
-                # Compares input with one of the available attacks
-                # Makes comparison with lowercase version with generic 'attack' and 'exploit' ending removed
-                counter_check = attack.lower()
-                if 'attack' not in input_name and 'exploit' not in input_name:
-                    counter_check = Util.rchop(attack.lower(), ('attack', 'exploit'))
-                similarity = difflib.SequenceMatcher(None, input_name.lower(), counter_check).ratio()
-                # Exact match, return appropriate attack name
-                if similarity == 1.0:
-                    return attack
-                # Found more likely match
-                if similarity > highest_sim:
-                    highest_sim = similarity
-                    highest_sim_attack = attack
-
-            # Found no exactly matching attack name, print best match and exit
-            if highest_sim >= 0.6:
-                print('Found no attack of name ' + input_name + '. The closest match was ' + highest_sim_attack +
-                      '.  Use ./id2t -l for a list of available attacks.')
-                exit(1)
-            # Found no reasonably matching attack name, recommend -l and exit
-            else:
-                print('Found no attack of name ' + input_name + ' or one similar to it.'
-                      ' Use ./id2t -l for an overview of available attacks.')
-                exit(1)
-
-        attack_name = choose_attack(attack_name)
+        attack_name = self.choose_attack(attack_name)
 
         print("\nCreating attack instance of \033[1m" + attack_name + "\033[0m")
         # Load attack class
@@ -174,6 +175,7 @@ class AttackController:
 
         return temp_attack_pcap_path, duration
 
+
     def get_attack_start_utime(self):
         """
         :return: The start time (timestamp of first packet) of the attack as unix timestamp.

+ 5 - 4
code/ID2TLib/Utility.py

@@ -31,6 +31,8 @@ forbidden_chars = [b'\x00', b'\x0a', b'\x0d']
 attacker_port_mapping = {}
 attacker_ttl_mapping = {}
 
+generic_attack_names = {"attack", "exploit"}
+
 
 def update_timestamp(timestamp, pps, delay=0):
     """
@@ -354,15 +356,14 @@ def get_attacker_config(ip_source_list, ip_address: str):
     return next_port, ttl
 
 
-def rchop(string, endings):
+def remove_generic_ending(string):
     """"
-    Returns the input string with it's ending cut off, in case it was part of 'endings'
+    Returns the input string with it's ending cut off, in case it was a generic one
 
     :param string: Input string
-    :param endings: List of possible endings to be cut off
     :return: Input string with ending cut off
     """
-    for end in endings:
+    for end in generic_attack_names:
         if string.endswith(end):
             return string[:-len(end)]
     return string

+ 21 - 0
code/Test/test_AttackController.py

@@ -0,0 +1,21 @@
+import unittest
+
+import Core.AttackController as atkCtrl
+
+
+class TestAttackController(unittest.TestCase):
+    def test_choose_attack_correct_name(self):
+        self.assertEqual(atkCtrl.AttackController.choose_attack("DDoSAttack"), "DDoSAttack")
+
+    def test_choose_attack_lower_case(self):
+        self.assertEqual(atkCtrl.AttackController.choose_attack("ddosattack"), "DDoSAttack")
+
+    def test_choose_attack_no_ending(self):
+        self.assertEqual(atkCtrl.AttackController.choose_attack("DDoS"), "DDoSAttack")
+
+    def test_choose_attack_lower_case_no_ending(self):
+        self.assertEqual(atkCtrl.AttackController.choose_attack("ddos"), "DDoSAttack")
+
+    def test_choose_attack_lower_case_invalid_name(self):
+        with self.assertRaises(SystemExit):
+            atkCtrl.AttackController.choose_attack("somewrongname")

+ 10 - 1
code/Test/test_Utility.py

@@ -226,4 +226,13 @@ class TestUtility(unittest.TestCase):
         except SystemExit:
             self.fail()
 
-            # TODO: get_attacker_config Tests
+    def test_remove_generic_ending_attack(self):
+        self.assertEqual(Utility.remove_generic_ending("someattack"), "some")
+
+    def test_remove_generic_ending_exploit(self):
+        self.assertEqual(Utility.remove_generic_ending("someexploit"), "some")
+
+    def test_remove_generic_ending_wrong_ending(self):
+        self.assertEqual(Utility.remove_generic_ending("somestuff"), "somestuff")
+
+    # TODO: get_attacker_config Tests