فهرست منبع

Merge branch 'merge_tests' of stefan.schmidt/ID2T-toolkit into master

Carlos Garcia 6 سال پیش
والد
کامیت
197a8a4312

+ 1 - 82
code/Attack/EternalBlueExploit.py

@@ -137,90 +137,9 @@ class EternalBlueExploit(BaseAttack.BaseAttack):
         if not mss_value:
             mss_value = 1465
 
-        # Scan (MS17) for EternalBlue
-        # Read Win7_eternalblue_scan pcap file
-        orig_ip_dst = None
-        exploit_raw_packets = RawPcapReader(self.template_scan_pcap_path)
-        inter_arrival_times = self.get_inter_arrival_time(exploit_raw_packets)
-        exploit_raw_packets.close()
-        exploit_raw_packets = RawPcapReader(self.template_scan_pcap_path)
-
-        source_origin_wins, destination_origin_wins = {}, {}
-
-        for pkt_num, pkt in enumerate(exploit_raw_packets):
-            eth_frame = Ether(pkt[0])
-            ip_pkt = eth_frame.payload
-            tcp_pkt = ip_pkt.payload
-
-            if pkt_num == 0:
-                if tcp_pkt.getfieldval("dport") == smb_port:
-                    orig_ip_dst = ip_pkt.getfieldval("dst") # victim IP
-
-            # Request
-            if ip_pkt.getfieldval("dst") == orig_ip_dst: # victim IP
-                # Ether
-                eth_frame.setfieldval("src", mac_source)
-                eth_frame.setfieldval("dst", mac_destination)
-                # IP
-                ip_pkt.setfieldval("src", ip_source)
-                ip_pkt.setfieldval("dst", ip_destination)
-                ip_pkt.setfieldval("ttl", source_ttl_value)
-                # TCP
-                tcp_pkt.setfieldval("sport",port_source)
-                tcp_pkt.setfieldval("dport",port_destination)
-                ## Window Size (mapping)
-                source_origin_win = tcp_pkt.getfieldval("window")
-                if source_origin_win not in source_origin_wins:
-                    source_origin_wins[source_origin_win] = source_win_prob_dict.random()
-                new_win = source_origin_wins[source_origin_win]
-                tcp_pkt.setfieldval("window", new_win)
-                ## MSS
-                tcp_options = tcp_pkt.getfieldval("options")
-                if tcp_options:
-                    if tcp_options[0][0] == "MSS":
-                        tcp_options [0] = ("MSS",mss_value)
-                        tcp_pkt.setfieldval("options", tcp_options)
-
-                new_pkt = (eth_frame / ip_pkt / tcp_pkt)
-                new_pkt.time = timestamp_next_pkt
-
-                pps = max(Util.get_interval_pps(complement_interval_pps, timestamp_next_pkt), 10)
-                timestamp_next_pkt = Util.update_timestamp(timestamp_next_pkt, pps) + inter_arrival_times[pkt_num]#float(timeSteps.random())
-            # Reply
-            else:
-                # Ether
-                eth_frame.setfieldval("src", mac_destination)
-                eth_frame.setfieldval("dst", mac_source)
-                # IP
-                ip_pkt.setfieldval("src", ip_destination)
-                ip_pkt.setfieldval("dst", ip_source)
-                ip_pkt.setfieldval("ttl", destination_ttl_value)
-                # TCP
-                tcp_pkt.setfieldval("dport", port_source)
-                tcp_pkt.setfieldval("sport",port_destination)
-                ## Window Size
-                destination_origin_win = tcp_pkt.getfieldval("window")
-                if destination_origin_win not in destination_origin_wins:
-                    destination_origin_wins[destination_origin_win] = destination_win_prob_dict.random()
-                new_win = destination_origin_wins[destination_origin_win]
-                tcp_pkt.setfieldval("window", new_win)
-                ## MSS
-                tcp_options = tcp_pkt.getfieldval("options")
-                if tcp_options:
-                    if tcp_options[0][0] == "MSS":
-                        tcp_options[0] = ("MSS", mss_value)
-                        tcp_pkt.setfieldval("options", tcp_options)
-
-                new_pkt = (eth_frame / ip_pkt / tcp_pkt)
-                timestamp_next_pkt = Util.update_timestamp(timestamp_next_pkt, pps) + inter_arrival_times[pkt_num]#+ float(timeSteps.random())
-                new_pkt.time = timestamp_next_pkt
-
-            packets.append(new_pkt)
-
-
         # Inject EternalBlue exploit packets
         # Read Win7_eternalblue_exploit pcap file
-        exploit_raw_packets.close()
+        source_origin_wins, destination_origin_wins = {}, {}
         exploit_raw_packets = RawPcapReader(self.template_attack_pcap_path)
 
         port_source = randint(self.minDefaultPort,self.maxDefaultPort) # experiments show this range of ports

+ 233 - 0
code/Attack/MS17ScanAttack.py

@@ -0,0 +1,233 @@
+import logging
+from random import randint, uniform
+
+from lea import Lea
+from scapy.layers.inet import Ether
+from scapy.utils import RawPcapReader
+
+from Attack import BaseAttack
+from Attack.AttackParameters import Parameter as Param
+from Attack.AttackParameters import ParameterTypes
+from ID2TLib.SMBLib import smb_port
+import ID2TLib.Utility as Util
+
+logging.getLogger("scapy.runtime").setLevel(logging.ERROR)
+
+
+# noinspection PyPep8
+
+
+class MS17ScanAttack(BaseAttack.BaseAttack):
+    template_scan_pcap_path = Util.RESOURCE_DIR + "Win7_eternalblue_scan.pcap"
+    # Empirical values from Metasploit experiments
+    minDefaultPort = 30000
+    maxDefaultPort = 50000
+    last_conn_dst_port = 4444
+
+    def __init__(self):
+        """
+        Creates a new instance of the EternalBlue Exploit.
+
+        """
+        # Initialize attack
+        super(MS17ScanAttack, self).__init__("MS17ScanAttack", "Injects a MS17 scan'",
+                                                 "Scanning/Probing")
+
+        # Define allowed parameters and their type
+        self.supported_params.update({
+            Param.MAC_SOURCE: ParameterTypes.TYPE_MAC_ADDRESS,
+            Param.IP_SOURCE: ParameterTypes.TYPE_IP_ADDRESS,
+            Param.PORT_SOURCE: ParameterTypes.TYPE_PORT,
+            Param.MAC_DESTINATION: ParameterTypes.TYPE_MAC_ADDRESS,
+            Param.IP_DESTINATION: ParameterTypes.TYPE_IP_ADDRESS,
+            Param.PORT_DESTINATION: ParameterTypes.TYPE_PORT,
+            Param.INJECT_AT_TIMESTAMP: ParameterTypes.TYPE_FLOAT,
+            Param.INJECT_AFTER_PACKET: ParameterTypes.TYPE_PACKET_POSITION,
+            Param.PACKETS_PER_SECOND: ParameterTypes.TYPE_FLOAT
+        })
+
+    def init_params(self):
+        """
+        Initialize the parameters of this attack using the user supplied command line parameters.
+        Use the provided statistics to calculate default parameters and to process user
+        supplied queries.
+        """
+        # PARAMETERS: initialize with default utilsvalues
+        # (values are overwritten if user specifies them)
+        # Attacker configuration
+        most_used_ip_address = self.statistics.get_most_used_ip_address()
+        random_ip_address = self.statistics.get_random_ip_address()
+        while random_ip_address == most_used_ip_address:
+            random_ip_address = self.statistics.get_random_ip_address()
+        self.add_param_value(Param.IP_SOURCE, random_ip_address)
+        self.add_param_value(Param.MAC_SOURCE, self.statistics.get_mac_address(random_ip_address))
+        self.add_param_value(Param.PORT_SOURCE, randint(self.minDefaultPort, self.maxDefaultPort))
+
+        # Victim configuration
+        self.add_param_value(Param.IP_DESTINATION, most_used_ip_address)
+        destination_mac = self.statistics.get_mac_address(most_used_ip_address)
+        if isinstance(destination_mac, list) and len(destination_mac) == 0:
+            destination_mac = self.generate_random_mac_address()
+        self.add_param_value(Param.MAC_DESTINATION, destination_mac)
+        self.add_param_value(Param.PORT_DESTINATION, smb_port)
+
+        # Attack configuration
+        self.add_param_value(Param.PACKETS_PER_SECOND,
+                             (self.statistics.get_pps_sent(most_used_ip_address) +
+                              self.statistics.get_pps_received(most_used_ip_address)) / 2)
+        self.add_param_value(Param.INJECT_AFTER_PACKET, randint(0, self.statistics.get_packet_count()))
+
+    def generate_attack_pcap(self):
+
+        # Timestamp
+        timestamp_next_pkt = self.get_param_value(Param.INJECT_AT_TIMESTAMP)
+        pps = self.get_param_value(Param.PACKETS_PER_SECOND)
+
+        # calculate complement packet rates of BG traffic per interval
+        complement_interval_pps = self.statistics.calculate_complement_packet_rates(pps)
+
+        # Initialize parameters
+        packets = []
+        mac_source = self.get_param_value(Param.MAC_SOURCE)
+        ip_source = self.get_param_value(Param.IP_SOURCE)
+        port_source = self.get_param_value(Param.PORT_SOURCE)
+        mac_destination = self.get_param_value(Param.MAC_DESTINATION)
+        ip_destination = self.get_param_value(Param.IP_DESTINATION)
+        port_destination = self.get_param_value(Param.PORT_DESTINATION)
+
+        # Check ip.src == ip.dst
+        self.ip_src_dst_equal_check(ip_source, ip_destination)
+
+        path_attack_pcap = None
+
+        # Set TTL based on TTL distribution of IP address
+        source_ttl_dist = self.statistics.get_ttl_distribution(ip_source)
+        if len(source_ttl_dist) > 0:
+            source_ttl_prob_dict = Lea.fromValFreqsDict(source_ttl_dist)
+            source_ttl_value = source_ttl_prob_dict.random()
+        else:
+            source_ttl_value = Util.handle_most_used_outputs(self.statistics.process_db_query("most_used(ttlValue)"))
+
+        destination_ttl_dist = self.statistics.get_ttl_distribution(ip_destination)
+        if len(destination_ttl_dist) > 0:
+            destination_ttl_prob_dict = Lea.fromValFreqsDict(destination_ttl_dist)
+            destination_ttl_value = destination_ttl_prob_dict.random()
+        else:
+            destination_ttl_value = Util.handle_most_used_outputs(
+                self.statistics.process_db_query("most_used(ttlValue)"))
+
+        # Set Window Size based on Window Size distribution of IP address
+        source_win_dist = self.statistics.get_win_distribution(ip_source)
+        if len(source_win_dist) > 0:
+            source_win_prob_dict = Lea.fromValFreqsDict(source_win_dist)
+        else:
+            source_win_dist = self.statistics.get_win_distribution(self.statistics.get_most_used_ip_address())
+            source_win_prob_dict = Lea.fromValFreqsDict(source_win_dist)
+
+        destination_win_dist = self.statistics.get_win_distribution(ip_destination)
+        if len(destination_win_dist) > 0:
+            destination_win_prob_dict = Lea.fromValFreqsDict(destination_win_dist)
+        else:
+            destination_win_dist = self.statistics.get_win_distribution(self.statistics.get_most_used_ip_address())
+            destination_win_prob_dict = Lea.fromValFreqsDict(destination_win_dist)
+
+        # Set MSS (Maximum Segment Size) based on MSS distribution of IP address
+        mss_value = Util.handle_most_used_outputs(self.statistics.process_db_query("most_used(mssValue)"))
+        if not mss_value:
+            mss_value = 1465
+
+        # Scan (MS17)
+        # Read Win7_eternalblue_scan pcap file
+        orig_ip_dst = None
+        exploit_raw_packets = RawPcapReader(self.template_scan_pcap_path)
+        inter_arrival_times = self.get_inter_arrival_time(exploit_raw_packets)
+        exploit_raw_packets.close()
+        exploit_raw_packets = RawPcapReader(self.template_scan_pcap_path)
+
+        source_origin_wins, destination_origin_wins = {}, {}
+
+        for pkt_num, pkt in enumerate(exploit_raw_packets):
+            eth_frame = Ether(pkt[0])
+            ip_pkt = eth_frame.payload
+            tcp_pkt = ip_pkt.payload
+
+            if pkt_num == 0:
+                if tcp_pkt.getfieldval("dport") == smb_port:
+                    orig_ip_dst = ip_pkt.getfieldval("dst")  # victim IP
+
+            # Request
+            if ip_pkt.getfieldval("dst") == orig_ip_dst:  # victim IP
+                # Ether
+                eth_frame.setfieldval("src", mac_source)
+                eth_frame.setfieldval("dst", mac_destination)
+                # IP
+                ip_pkt.setfieldval("src", ip_source)
+                ip_pkt.setfieldval("dst", ip_destination)
+                ip_pkt.setfieldval("ttl", source_ttl_value)
+                # TCP
+                tcp_pkt.setfieldval("sport", port_source)
+                tcp_pkt.setfieldval("dport", port_destination)
+                ## Window Size (mapping)
+                source_origin_win = tcp_pkt.getfieldval("window")
+                if source_origin_win not in source_origin_wins:
+                    source_origin_wins[source_origin_win] = source_win_prob_dict.random()
+                new_win = source_origin_wins[source_origin_win]
+                tcp_pkt.setfieldval("window", new_win)
+                ## MSS
+                tcp_options = tcp_pkt.getfieldval("options")
+                if tcp_options:
+                    if tcp_options[0][0] == "MSS":
+                        tcp_options[0] = ("MSS", mss_value)
+                        tcp_pkt.setfieldval("options", tcp_options)
+
+                new_pkt = (eth_frame / ip_pkt / tcp_pkt)
+                new_pkt.time = timestamp_next_pkt
+
+                pps = max(Util.get_interval_pps(complement_interval_pps, timestamp_next_pkt), 10)
+                timestamp_next_pkt = Util.update_timestamp(timestamp_next_pkt, pps) + inter_arrival_times[
+                    pkt_num]  # float(timeSteps.random())
+            # Reply
+            else:
+                # Ether
+                eth_frame.setfieldval("src", mac_destination)
+                eth_frame.setfieldval("dst", mac_source)
+                # IP
+                ip_pkt.setfieldval("src", ip_destination)
+                ip_pkt.setfieldval("dst", ip_source)
+                ip_pkt.setfieldval("ttl", destination_ttl_value)
+                # TCP
+                tcp_pkt.setfieldval("dport", port_source)
+                tcp_pkt.setfieldval("sport", port_destination)
+                ## Window Size
+                destination_origin_win = tcp_pkt.getfieldval("window")
+                if destination_origin_win not in destination_origin_wins:
+                    destination_origin_wins[destination_origin_win] = destination_win_prob_dict.random()
+                new_win = destination_origin_wins[destination_origin_win]
+                tcp_pkt.setfieldval("window", new_win)
+                ## MSS
+                tcp_options = tcp_pkt.getfieldval("options")
+                if tcp_options:
+                    if tcp_options[0][0] == "MSS":
+                        tcp_options[0] = ("MSS", mss_value)
+                        tcp_pkt.setfieldval("options", tcp_options)
+
+                new_pkt = (eth_frame / ip_pkt / tcp_pkt)
+                timestamp_next_pkt = Util.update_timestamp(timestamp_next_pkt, pps) + inter_arrival_times[
+                    pkt_num]  # + float(timeSteps.random())
+                new_pkt.time = timestamp_next_pkt
+
+            packets.append(new_pkt)
+
+        exploit_raw_packets.close()
+
+        # Store timestamp of first packet (for attack label)
+        self.attack_start_utime = packets[0].time
+        self.attack_end_utime = packets[-1].time
+
+        if len(packets) > 0:
+            packets = sorted(packets, key=lambda pkt: pkt.time)
+            path_attack_pcap = self.write_attack_pcap(packets, True, path_attack_pcap)
+
+        # return packets sorted by packet time_sec_start
+        # pkt_num+1: because pkt_num starts at 0
+        return pkt_num + 1, path_attack_pcap

+ 13 - 5
code/ID2TLib/Utility.py

@@ -1,7 +1,7 @@
 import ipaddress
 import os
 
-from random import randint, uniform
+from random import randint, uniform, choice
 from os import urandom
 from datetime import datetime
 from calendar import timegm
@@ -71,10 +71,18 @@ def get_nth_random_element(*element_list):
     :param element_list: An arbitrary number of lists.
     :return: A tuple of the n-th element of every list.
     """
-    range_max = min([len(x) for x in element_list])
-    if range_max > 0: range_max -= 1
-    n = randint(0, range_max)
-    return tuple(x[n] for x in element_list)
+    if len(element_list) <= 0:
+        return None
+    elif len(element_list) == 1 and len(element_list[0]) > 0:
+        return choice(element_list[0])
+    else:
+        range_max = min([len(x) for x in element_list])
+        if range_max > 0:
+            range_max -= 1
+            n = randint(0, range_max)
+            return tuple(x[n] for x in element_list)
+        else:
+            return None
 
 
 def index_increment(number: int, max: int):

+ 61 - 0
code/Test/ID2TAttackTest.py

@@ -3,6 +3,7 @@ import inspect
 
 import ID2TLib.Controller as Ctrl
 import ID2TLib.TestLibrary as Lib
+import scapy.utils as pcr
 
 
 class ID2TAttackTest(unittest.TestCase):
@@ -44,3 +45,63 @@ class ID2TAttackTest(unittest.TestCase):
             Lib.clean_up(controller)
         else:
             Lib.rename_test_result_files(controller, caller_function, attack_sub_dir, test_sub_dir)
+
+    def order_test(self, attack_args, seed=None, cleanup=True, pcap=Lib.test_pcap,
+                   flag_write_file=False, flag_recalculate_stats=False, flag_print_statistics=False,
+                   attack_sub_dir=True, test_sub_dir=True):
+        """
+        Checks if the result of an attack includes all packets in correct order.
+
+        :param attack_args: A list of attacks with their attack parameters (as defined in Controller.process_attacks).
+        :param seed: A random seed to keep random values static (care for count and order of random generation).
+        :param cleanup: Clean up attack output after testing.
+        :param pcap: The input pcap for the attack.
+        :param flag_write_file: Writes the statistics to a file.
+        :param flag_recalculate_stats: Forces the recalculation of statistics.
+        :param flag_print_statistics: Prints the statistics on the terminal.
+        :param attack_sub_dir: create sub-directory for each attack-class if True
+        :param test_sub_dir: create sub-directory for each test-function/case if True
+        """
+
+        controller = Ctrl.Controller(pcap_file_path=pcap, do_extra_tests=False)
+        controller.load_pcap_statistics(flag_write_file, flag_recalculate_stats, flag_print_statistics)
+        controller.process_attacks(attack_args, [[seed]])
+
+        caller_function = inspect.stack()[1].function
+
+        try:
+            path = controller.pcap_dest_path
+            file = pcr.RawPcapReader(path)
+            packet_a = file.read_packet()
+            packet_b = file.read_packet()
+            i = 0
+
+            while packet_b is not None:
+
+                time_a = packet_a[2][0:2]
+                time_b = packet_b[2][0:2]
+
+                if time_a[0] > time_b[0]:
+                    file.close()
+                    self.fail("Packet order incorrect at: " + str(i+1) + "-" + str(i+2) +
+                              ". Current time: " + str(time_a) + " Next time: " + str(time_b))
+                elif time_a[0] == time_b[0]:
+                    if time_a[1] > time_b[1]:
+                        file.close()
+                        self.fail("Packet order incorrect at: " + str(i + 1) + "-" + str(i + 2) +
+                                  ". Current time: " + str(time_a) + " Next time: " + str(time_b))
+
+                packet_a = packet_b
+                packet_b = file.read_packet()
+                i += 1
+
+            file.close()
+
+        except self.failureException:
+            Lib.rename_test_result_files(controller, caller_function, attack_sub_dir, test_sub_dir)
+            raise
+
+        if cleanup:
+            Lib.clean_up(controller)
+        else:
+            Lib.rename_test_result_files(controller, caller_function, attack_sub_dir, test_sub_dir)

+ 3 - 0
code/Test/test_DDoS.py

@@ -32,3 +32,6 @@ class UnitTestDDoS(Test.ID2TAttackTest):
     @mock.patch('ID2TLib.Statistics.Statistics.get_most_used_mss', return_value=None)
     def test_ddos_mss_none(self, mock_mss, mock_get_attacker_config):
         self.checksum_test([['DDoSAttack']], sha_mss_none_ddos)
+
+    def test_ddos_order(self):
+        self.order_test([['DDoSAttack', 'attackers.count=2']])

+ 17 - 2
code/Test/test_EternalBlue.py

@@ -1,11 +1,26 @@
 import Test.ID2TAttackTest as Test
+import ID2TLib.TestLibrary as Lib
 
-sha_default = 'c707492a0493efcf46a569c91fe77685286402ddfdff3c79e64157b3324dc9f6'
+sha_default = '0ea04ea0ac61092aee244d56b2efd2e48056b9006c530e708f46b3cb2a9c314b'
+sha_ips_not_in_pcap = '03b7d1d2b0c9999aa607ce9ef7186c5f352d2330145a0f9774109d0f21c03aea'
+sha_multiple_params = '1f97161c38c2d586a7aedafe265747401317ecd6f1747af5216bb41af7b3aaf8'
 
 # TODO: improve coverage
 
 
 class UnitTestEternalBlue(Test.ID2TAttackTest):
 
-    def test_eternal_blue_default(self):
+    def test_eternalblue_default(self):
         self.checksum_test([['EternalBlueExploit']], sha_default)
+
+    def test_eternalblue_ips_not_in_pcap(self):
+        self.checksum_test([['EternalBlueExploit', 'ip.src=1.1.1.1', 'ip.dst=2.2.2.2']], sha_ips_not_in_pcap)
+
+    def test_eternalblue_multiple_params(self):
+        ip_src = 'ip.src='+Lib.test_pcap_ips[0]
+        ip_dst = 'ip.dst='+Lib.test_pcap_ips[1]
+        self.checksum_test([['EternalBlueExploit', ip_src, ip_dst, 'mac.src=00:0C:21:1C:60:61',
+                             'mac.dst=04:0C:32:2C:63:62', 'port.src=1337', 'port.dst=42']], sha_multiple_params)
+
+    def test_eternalblue_order(self):
+        self.order_test([['EternalBlueExploit']])

+ 3 - 0
code/Test/test_FTPWinaXeExploit.py

@@ -50,3 +50,6 @@ class UnitTestFTPWinaXeExploit(Test.ID2TAttackTest):
     @mock.patch('Attack.BaseAttack.BaseAttack.is_valid_ip_address', return_values=[False, True])
     def test_ftp_invalid_ip(self, mock_valid_ip_check, mock_get_rnd_x86_nop, mock_get_rnd_bytes):
         self.checksum_test([['FTPWinaXeExploit']], sha_valid_ip)
+
+    def test_ftp_order(self):
+        self.order_test([['FTPWinaXeExploit']])

+ 16 - 0
code/Test/test_Joomla.py

@@ -1,6 +1,9 @@
 import Test.ID2TAttackTest as Test
+import ID2TLib.TestLibrary as Lib
 
 sha_default = 'a45bd543ae7416cdc5fd76c886f48990b43075753931683407686aac2cfbc111'
+sha_ips_not_in_pcap = 'bb3926cea75624124777422b68de8f1e699b3219e279f5a9bcd789ed837aa349'
+sha_multiple_params = '6a39bafde84f30c63389c35ba24446d5aabb8e8942ee3a34974556211d6091d8'
 
 # TODO: improve coverage
 
@@ -9,3 +12,16 @@ class UnitTestJoomla(Test.ID2TAttackTest):
 
     def test_joomla_default(self):
         self.checksum_test([['JoomlaRegPrivExploit']], sha_default)
+
+    def test_joomla_ips_not_in_pcap(self):
+        self.checksum_test([['JoomlaRegPrivExploit', 'ip.src=1.1.1.1', 'ip.dst=2.2.2.2']], sha_ips_not_in_pcap)
+
+    def test_joomla_multiple_params(self):
+        ip_src = 'ip.src='+Lib.test_pcap_ips[0]
+        ip_dst = 'ip.dst='+Lib.test_pcap_ips[1]
+        self.checksum_test([['JoomlaRegPrivExploit', ip_src, ip_dst, 'mac.src=00:0C:21:1C:60:61',
+                             'mac.dst=04:0C:32:2C:63:62', 'port.dst=42',
+                             'target.host=www.ihopethisisnotarealwebsite.com']], sha_multiple_params)
+
+    def test_joomla_order(self):
+        self.order_test([['JoomlaRegPrivExploit']])

+ 24 - 0
code/Test/test_MS17ScanAttack.py

@@ -0,0 +1,24 @@
+import Test.ID2TAttackTest as Test
+import ID2TLib.TestLibrary as Lib
+
+sha_default = '7251523bec9294756ac7ced1ad8b3c53625fdad8648b86915c8a4699300ce46a'
+sha_ips_not_in_pcap = '6d150cf267fba423b5dabe44b36bee37b0d626c15041131a1f01a81f36ea3dfd'
+sha_multiple_params = '765f71390a75827fc362d55c07a2d46d74c6b918b767ae1da2706247adb60919'
+
+
+class UnitTestMS17Scan(Test.ID2TAttackTest):
+
+    def test_MS17Scan_default(self):
+        self.checksum_test([['MS17ScanAttack']], sha_default)
+
+    def test_MS17Scan_ips_not_in_pcap(self):
+        self.checksum_test([['MS17ScanAttack', 'ip.src=1.1.1.1', 'ip.dst=2.2.2.2']], sha_ips_not_in_pcap)
+
+    def test_MS17Scan_multiple_params(self):
+        ip_src = 'ip.src='+Lib.test_pcap_ips[0]
+        ip_dst = 'ip.dst='+Lib.test_pcap_ips[1]
+        self.checksum_test([['MS17ScanAttack', ip_src, ip_dst, 'mac.src=00:0C:21:1C:60:61',
+                             'mac.dst=04:0C:32:2C:63:62', 'port.src=1337', 'port.dst=42']], sha_multiple_params)
+
+    def test_MS17Scan_order(self):
+        self.order_test([['MS17ScanAttack']])

+ 5 - 15
code/Test/test_PortscanAttack.py

@@ -1,15 +1,11 @@
-import unittest.mock as mock
-
 import Test.ID2TAttackTest as Test
 
 sha_portscan_default = '6af539fb9f9a28f84a5c337a07dbdc1a11885c5c6de8f9a682bd74b89edc5130'
 sha_portscan_reverse_ports = '1c03342b7b94fdd1c9903d07237bc5239ebb7bd77a3dd137c9c378fa216c5382'
 sha_portscan_shuffle_dst_ports = '40485e47766438425900b787c4cda4ad1b5cd0d233b80f38bd45b5a88b70a797'
 sha_portscan_shuffle_src_ports = '48578b45e18bdbdc0a9f3f4cec160ccb58839250348ec4d3ec44c1b15da248de'
-sha_portscan_mss_value_zero = '8d32476a89262b78118a68867fff1d45c81f8ffb4970201f9d5ee3dfd94ba58a'
-sha_portscan_ttl_value_zero = 'ff8cf15d8e59856e0c6e43d81fa40180ebf2127042f376217cc2a20e4f21726e'
-sha_portscan_win_value_zero = 'b2fcbf72190ac3bf12192d0d7ee8c09ef87adb0d94a2610615ca76d8b577bbfb'
 sha_portscan_ip_src_random = 'c3939f30a40fa6e2164cc91dc4a7e823ca409492d44508e3edfc9d24748af0e5'
+sha_portscan_ips_not_in_pcap = '7f0f65beb8398fc1abe65b0819b6e3a5ce143fd8c9eafb2d5498b84f21cec9e1'
 
 # TODO: improve coverage
 
@@ -28,14 +24,8 @@ class UnitTestPortscanAttack(Test.ID2TAttackTest):
     def test_portscan_shuffle_src_ports(self):
         self.checksum_test([['PortscanAttack', 'port.src.shuffle=1']], sha_portscan_shuffle_src_ports)
 
-    @mock.patch('ID2TLib.Statistics.Statistics.get_mss_distribution', return_value='')
-    def test_portscan_mss_length_zero(self, mock_mss_dis):
-        self.checksum_test([['PortscanAttack']], sha_portscan_mss_value_zero)
-
-    @mock.patch('ID2TLib.Statistics.Statistics.get_ttl_distribution', return_value='')
-    def test_portscan_ttl_length_zero(self, mock_ttl_dis):
-        self.checksum_test([['PortscanAttack']], sha_portscan_ttl_value_zero)
+    def test_portscan_ips_not_in_pcap(self):
+        self.checksum_test([['PortscanAttack', 'ip.src=1.1.1.1', 'ip.dst=2.2.2.2']], sha_portscan_ips_not_in_pcap)
 
-    @mock.patch('ID2TLib.Statistics.Statistics.get_win_distribution', return_value='')
-    def test_portscan_win_length_zero(self, mock_win_dis):
-        self.checksum_test([['PortscanAttack']], sha_portscan_win_value_zero)
+    def test_portscan_order(self):
+        self.order_test([['PortscanAttack']])

+ 3 - 0
code/Test/test_SMBLoris.py

@@ -28,3 +28,6 @@ class UnitTestSMBLoris(Test.ID2TAttackTest):
     def test_smbloris_same_ip_src_dst(self):
         with self.assertRaises(SystemExit):
             self.checksum_test([['SMBLorisAttack', 'ip.src=192.168.1.240', 'ip.dst=192.168.1.240']], sha_default)
+
+    def test_smbloris_order(self):
+        self.order_test([['SMBLorisAttack', 'attackers.count=1']])

+ 3 - 0
code/Test/test_SMBScan.py

@@ -68,3 +68,6 @@ class UnitTestSMBScan(Test.ID2TAttackTest):
             self.checksum_test([['SMBScanAttack', 'ip.src=192.168.178.1', 'ip.dst=192.168.178.5',
                                 'ip.dst.end=192.168.178.10', 'hosting.ip=192.168.178.5', 'protocol.version=2.1',
                                 'hosting.version=2.1']], sha_smb2)
+
+    def test_smbscan_order(self):
+        self.order_test([['SMBScanAttack']])

+ 15 - 0
code/Test/test_SQLi.py

@@ -1,6 +1,9 @@
 import Test.ID2TAttackTest as Test
+import ID2TLib.TestLibrary as Lib
 
 sha_default = 'a130ecdaf5fd8c09ef8418d2dbe7bd68c54e922553eb9fa703df016115393a46'
+sha_ips_not_in_pcap = 'b3174ab4b7573c317c3e87b35e14eb38d99cf33613d76cfd77b0c30cbf1f1fa2'
+sha_multiple_params = 'aac4d2015e2af52dfefc0f76fcbfca664e3420d07af8b574803f56aae70222c5'
 
 # TODO: improve coverage
 
@@ -9,3 +12,15 @@ class UnitTestSQLi(Test.ID2TAttackTest):
 
     def test_sqli_default(self):
         self.checksum_test([['SQLiAttack']], sha_default)
+
+    def test_sqli_ips_not_in_pcap(self):
+        self.checksum_test([['SQLiAttack', 'ip.src=1.1.1.1', 'ip.dst=2.2.2.2']], sha_ips_not_in_pcap)
+
+    def test_sqli_multiple_params(self):
+        ip_src = 'ip.src='+Lib.test_pcap_ips[0]
+        ip_dst = 'ip.dst='+Lib.test_pcap_ips[1]
+        self.checksum_test([['SQLiAttack', ip_src, ip_dst, 'mac.src=00:0C:21:1C:60:61',
+                             'mac.dst=04:0C:32:2C:63:62', 'port.dst=42',
+                             'target.host=www.ihopethisisnotarealwebsite.com']], sha_multiple_params)
+    def test_sqli_order(self):
+        self.order_test([['SQLiAttack']])

+ 14 - 5
code/Test/test_Utility.py

@@ -27,7 +27,6 @@ class TestUtility(unittest.TestCase):
         cipps = [(5, 1), (10, 2), (15, 3)]
         self.assertEqual(Utility.get_interval_pps(cipps, 30), 3)
 
-    # Errors if empty list and result bad if only one list
     def test_get_nth_random_element_equal_no(self):
         letters = ["A", "B", "C"]
         numbers = [1, 2, 3]
@@ -40,10 +39,16 @@ class TestUtility(unittest.TestCase):
         results = [("A", 1), ("B", 2)]
         self.assertIn(Utility.get_nth_random_element(letters, numbers), results)
 
-    # TODO: ???
-    #def test_get_nth_random_element_single_list(self):
-        #letters = ["A", "B", "C"]
-        #self.assertIn(Utility.get_nth_random_element(letters), letters)
+    def test_get_nth_random_element_single_list(self):
+        letters = ["A", "B", "C"]
+        self.assertIn(Utility.get_nth_random_element(letters), letters)
+
+    def test_get_nth_random_element_empty_list(self):
+        letters = ["A", "B", "C"]
+        self.assertEqual(Utility.get_nth_random_element(letters, []), None)
+
+    def test_get_nth_random_element_nothing(self):
+        self.assertEqual(Utility.get_nth_random_element(), None)
 
     def test_index_increment_not_max(self):
         self.assertEqual(Utility.index_increment(5, 10), 6)
@@ -136,6 +141,7 @@ class TestUtility(unittest.TestCase):
         for byte in result:
             if byte.to_bytes(1, "little") not in Utility.x86_nops and byte.to_bytes(1, "little") not in Utility.x86_pseudo_nops:
                 correct = False
+                break
         self.assertTrue(correct)
 
     def test_get_rnd_x86_nop_without_sideeffects(self):
@@ -144,6 +150,7 @@ class TestUtility(unittest.TestCase):
         for byte in result:
             if byte.to_bytes(1, "little") in Utility.x86_pseudo_nops:
                 correct = False
+                break
         self.assertTrue(correct)
 
     def test_get_rnd_x86_nop_filter(self):
@@ -152,6 +159,7 @@ class TestUtility(unittest.TestCase):
         for byte in result:
             if byte.to_bytes(1, "little") in Utility.x86_nops:
                 correct = False
+                break
         self.assertTrue(correct)
 
     def test_get_rnd_x86_nop_single_filter(self):
@@ -160,6 +168,7 @@ class TestUtility(unittest.TestCase):
         for byte in result:
             if byte.to_bytes(1, "little") == b'\x20':
                 correct = False
+                break
         self.assertTrue(correct)
 
     def test_get_rnd_bytes_number(self):