Browse Source

Merge branch 'UT_DDoS' into unittest_master

Jens Keim 6 years ago
parent
commit
5ca5b19930
5 changed files with 119 additions and 35 deletions
  1. 4 35
      code/Attack/DDoSAttack.py
  2. 4 0
      code/ID2TLib/Statistics.py
  3. 16 0
      code/ID2TLib/TestLibrary.py
  4. 45 0
      code/ID2TLib/Utility.py
  5. 50 0
      code/Test/test_DDoS.py

+ 4 - 35
code/Attack/DDoSAttack.py

@@ -10,7 +10,8 @@ from Attack import BaseAttack
 from Attack.AttackParameters import Parameter as Param
 from Attack.AttackParameters import ParameterTypes
 from ID2TLib.Utility import update_timestamp, get_interval_pps, get_nth_random_element, index_increment, \
-    handle_most_used_outputs
+    handle_most_used_outputs, get_attacker_config
+from ID2TLib.Utility import update_timestamp, get_interval_pps, get_nth_random_element
 
 logging.getLogger("scapy.runtime").setLevel(logging.ERROR)
 # noinspection PyPep8
@@ -77,37 +78,6 @@ class DDoSAttack(BaseAttack.BaseAttack):
         self.add_param_value(Param.VICTIM_BUFFER, randint(1000,10000))
 
     def generate_attack_pcap(self):
-        def get_attacker_config(ipAddress: str):
-            """
-            Returns the attacker configuration depending on the IP address, this includes the port for the next
-            attacking packet and the previously used (fixed) TTL value.
-            :param ipAddress: The IP address of the attacker
-            :return: A tuple consisting of (port, ttlValue)
-            """
-            # Determine port
-            port = attacker_port_mapping.get(ipAddress)
-            if port is not None:  # use next port
-                next_port = attacker_port_mapping.get(ipAddress) + 1
-                if next_port > (2 ** 16 - 1):
-                    next_port = 1
-            else:  # generate starting port
-                next_port = RandShort()
-            attacker_port_mapping[ipAddress] = next_port
-            # Determine TTL value
-            ttl = attacker_ttl_mapping.get(ipAddress)
-            if ttl is None:  # determine TTL value
-                is_invalid = True
-                pos = ip_source_list.index(ipAddress)
-                pos_max = len(gd)
-                while is_invalid:
-                    ttl = int(round(gd[pos]))
-                    if 0 < ttl < 256:  # validity check
-                        is_invalid = False
-                    else:
-                        pos = index_increment(pos, pos_max)
-                attacker_ttl_mapping[ipAddress] = ttl
-            # return port and TTL
-            return next_port, ttl
         BUFFER_SIZE = 1000
 
         # Determine source IP and MAC address
@@ -176,8 +146,7 @@ class DDoSAttack(BaseAttack.BaseAttack):
         attack_duration = self.get_param_value(Param.ATTACK_DURATION)
         pkts_num = int(pps * attack_duration)
 
-        source_win_sizes = self.statistics.process_db_query(
-                "SELECT DISTINCT winSize FROM tcp_win ORDER BY RANDOM() LIMIT "+str(pkts_num)+";")
+        source_win_sizes = self.statistics.get_rnd_win_size(pkts_num)
 
         destination_win_dist = self.statistics.get_win_distribution(ip_destination)
         if len(destination_win_dist) > 0:
@@ -212,7 +181,7 @@ class DDoSAttack(BaseAttack.BaseAttack):
                 # Select one IP address and its corresponding MAC address
                 (ip_source, mac_source) = get_nth_random_element(ip_source_list, mac_source_list)
                 # Determine source port
-                (port_source, ttl_value) = get_attacker_config(ip_source)
+                (port_source, ttl_value) = get_attacker_config(ip_source_list ,ip_source)
                 request_ether = Ether(dst=mac_destination, src=mac_source)
                 request_ip = IP(src=ip_source, dst=ip_destination, ttl=ttl_value)
                 # Random win size for each packet

+ 4 - 0
code/ID2TLib/Statistics.py

@@ -574,6 +574,10 @@ class Statistics:
         else:
             return None
 
+    def get_rnd_win_size(self, pkts_num):
+        return self.process_db_query(
+            "SELECT DISTINCT winSize FROM tcp_win ORDER BY RANDOM() LIMIT "+str(pkts_num)+";")
+
     def get_statistics_database(self):
         """
         :return: A reference to the statistics database object

+ 16 - 0
code/ID2TLib/TestLibrary.py

@@ -1,5 +1,6 @@
 import os
 import hashlib
+import random
 
 from definitions import ROOT_DIR
 
@@ -108,3 +109,18 @@ def rename_test_result_files(controller, caller_function: str, attack_sub_dir=Fa
 
     os.rename(controller.label_manager.label_file_path, result_labels_path)
     controller.label_manager.label_file_path = result_labels_path
+
+
+def get_win_size(pkts_num):
+    result = []
+    for i in range(0, pkts_num):
+        result.append(10)
+    return result
+
+
+def get_attacker_config(ip_source_list, ipAddress: str):
+    next_port = random.randint(0, 2 ** 16 - 1)
+    ttl = random.randint(1, 255)
+
+    return next_port, ttl
+

+ 45 - 0
code/ID2TLib/Utility.py

@@ -5,6 +5,8 @@ from os import urandom
 from datetime import datetime
 from calendar import timegm
 from lea import Lea
+from scipy.stats import gamma
+from scapy.layers.inet import RandShort
 
 platforms = {"win7", "win10", "winxp", "win8.1", "macos", "linux", "win8", "winvista", "winnt", "win2000"}
 platform_probability = {"win7": 48.43, "win10": 27.99, "winxp": 6.07, "win8.1": 6.07, "macos": 5.94, "linux": 3.38,
@@ -18,6 +20,9 @@ x86_pseudo_nops = {b'\x97', b'\x96', b'\x95', b'\x93', b'\x92', b'\x91', b'\x99'
                    b'\x5b', b'\x59', b'\x5f', b'\x5a', b'\x5e', b'\xd6'}
 forbidden_chars = [b'\x00', b'\x0a', b'\x0d']
 
+attacker_port_mapping = {}
+attacker_ttl_mapping = {}
+
 
 def update_timestamp(timestamp, pps, delay=0):
     """
@@ -231,6 +236,7 @@ def check_payload_len(payload_len: int, limit: int):
         print("\nCustom payload too long: ", payload_len, " bytes. Should be a maximum of ", limit, " bytes.")
         exit(1)
 
+
 def get_bytes_from_file(filepath):
     """
     Converts the content of a file into its byte representation
@@ -296,3 +302,42 @@ def handle_most_used_outputs(most_used_x):
         return most_used_x[0]
     else:
         return most_used_x
+
+
+def get_attacker_config(ip_source_list, ipAddress: str):
+    """
+    Returns the attacker configuration depending on the IP address, this includes the port for the next
+    attacking packet and the previously used (fixed) TTL value.
+    :param ip_source_list: List of source IPs
+    :param ipAddress: The IP address of the attacker
+    :return: A tuple consisting of (port, ttlValue)
+    """
+    # Gamma distribution parameters derived from MAWI 13.8G dataset
+    alpha, loc, beta = (2.3261710235, -0.188306914406, 44.4853123884)
+    gd = gamma.rvs(alpha, loc=loc, scale=beta, size=len(ip_source_list))
+
+    # Determine port
+    port = attacker_port_mapping.get(ipAddress)
+    if port is not None:  # use next port
+        next_port = attacker_port_mapping.get(ipAddress) + 1
+        if next_port > (2 ** 16 - 1):
+            next_port = 1
+    else:  # generate starting port
+        next_port = RandShort()
+    attacker_port_mapping[ipAddress] = next_port
+    # Determine TTL value
+    ttl = attacker_ttl_mapping.get(ipAddress)
+    if ttl is None:  # determine TTL value
+        is_invalid = True
+        pos = ip_source_list.index(ipAddress)
+        pos_max = len(gd)
+        while is_invalid:
+            ttl = int(round(gd[pos]))
+            if 0 < ttl < 256:  # validity check
+                is_invalid = False
+            else:
+                pos = index_increment(pos, pos_max)
+        attacker_ttl_mapping[ipAddress] = ttl
+    # return port and TTL
+    return next_port, ttl
+

+ 50 - 0
code/Test/test_DDoS.py

@@ -0,0 +1,50 @@
+import unittest
+import unittest.mock as mock
+
+import ID2TLib.Statistics as Statistics
+import Test.GenericTest as GenericTest
+import ID2TLib.TestLibrary as Lib
+
+sha_basic_ddos = '52c5df968818155f6aa143d4f7eed8fe4014df595d9e561cc9c898842790bbc8'
+sha_num_attackers_ddos = 'e4e1acf27cb87445be802db7a4cad31fcce1a14221b0c65af040d14c8f30b4d1'
+# FIXME: get hash for currently broken test
+sha_dest_mac_length_zero_ddos = ''
+sha_mss_none_ddos = '52c5df968818155f6aa143d4f7eed8fe4014df595d9e561cc9c898842790bbc8'
+
+"""
+Name                             Stmts   Miss  Cover   Missing
+--------------------------------------------------------------------------------------------
+Attack/DDoSAttack.py                124     7   94%    70, 105-106, 120, 123, 141, 187
+"""
+
+
+class UnitTestDDoS(GenericTest.GenericTest):
+
+    @mock.patch.object(Statistics.Statistics, 'get_rnd_win_size', side_effect=Lib.get_win_size)
+    @mock.patch('ID2TLib.Utility.get_attacker_config', side_effect=Lib.get_attacker_config)
+    def test_basic(self, mock_get_attacker_config, mock_get_rnd_win_size):
+        self.generic_test([['DDoSAttack']],
+                          sha_basic_ddos)
+
+    @mock.patch.object(Statistics.Statistics, 'get_rnd_win_size', side_effect=Lib.get_win_size)
+    @mock.patch('ID2TLib.Utility.get_attacker_config', side_effect=Lib.get_attacker_config)
+    def test_num_attackers(self, mock_get_attacker_config, mock_get_rnd_win_size):
+        self.generic_test([['DDoSAttack', 'attackers.count=5']],
+                          sha_num_attackers_ddos)
+
+    # FIXME: currently returns 'ERROR: 'NoneType' object has no attribute 'route'
+    #@mock.patch.object(Statistics.Statistics, 'get_rnd_win_size', side_effect=get_win_size)
+    #@mock.patch('ID2TLib.Utility.get_attacker_config', side_effect=get_attacker_config)
+    #@mock.patch('ID2TLib.Statistics.Statistics.get_mac_address', return_value='')
+    #def test_dest_mac_length_zero(self, mock_dest_mac, mock_get_attacker_config, mock_get_rnd_win_size):
+    #    self.generic_test([['DDoSAttack']], sha_dest_mac_length_zero_ddos)
+
+    @mock.patch.object(Statistics.Statistics, 'get_rnd_win_size', side_effect=Lib.get_win_size)
+    @mock.patch('ID2TLib.Utility.get_attacker_config', side_effect=Lib.get_attacker_config)
+    @mock.patch('ID2TLib.Statistics.Statistics.get_most_used_mss', return_value=None)
+    def test_mss_none(self, mock_mss, mock_get_attacker_config, mock_get_rnd_win_size):
+        self.generic_test([['DDoSAttack']], sha_mss_none_ddos)
+
+
+if __name__ == '__main__':
+    unittest.main()