Переглянути джерело

refactor Unittests

refactor ID2TAttackTest Class
refactor imports
refactor resource paths
refactor TestLibrary
refactor list handling in DDoS
refactor test_dest_mac_length_zero in DDoS UT
move path constants to Utility
update documentation
rename test_is_port
remove current coverage
remove old test.pcap
use os.path.splitext for label filename instead of custom string manipulation function
fix platform dependent issue in get_rnd_win_size
fix abort testscript early with proper exit status when tests fail
fix broken DDoS Unittest
fix get_rnd_win_size
Jens Keim 6 роки тому
батько
коміт
7c5a53aee5

+ 1 - 0
build.sh

@@ -57,6 +57,7 @@ ID2T_DIR=\$(readlink -f \$0)
 SCRIPT_PATH=\${ID2T_DIR%/*}
 cd \$SCRIPT_PATH/code
 # Execute tests
+set -e
 PYTHONWARNINGS="ignore" coverage run --source=. -m unittest discover -s Test/ >/dev/null
 coverage html
 coverage report -m

+ 8 - 0
code/Attack/BaseAttack.py

@@ -144,10 +144,15 @@ class BaseAttack(metaclass=ABCMeta):
             """
             return num < 1 or num > 65535
 
+        if ports_input is None or ports_input is "":
+            return False
+
         if isinstance(ports_input, str):
             ports_input = ports_input.replace(' ', '').split(',')
         elif isinstance(ports_input, int):
             ports_input = [ports_input]
+        elif len(ports_input) is 0:
+            return False
 
         ports_output = []
 
@@ -249,6 +254,9 @@ class BaseAttack(metaclass=ABCMeta):
     #########################################
 
     def set_seed(self, seed: int):
+        """
+        :param seed: The random seed to be set.
+        """
         if isinstance(seed, int):
             random.seed(seed)
 

+ 1 - 4
code/Attack/DDoSAttack.py

@@ -57,10 +57,7 @@ class DDoSAttack(BaseAttack.BaseAttack):
         # attacker configuration
         num_attackers = randint(1, 16)
         # The most used IP class in background traffic
-        most_used_ip_class = self.statistics.process_db_query("most_used(ipClass)")
-        if isinstance(most_used_ip_class, list):
-            most_used_ip_class.sort()
-            most_used_ip_class = most_used_ip_class[0]
+        most_used_ip_class = handle_most_used_outputs(self.statistics.process_db_query("most_used(ipClass)"))
 
         self.add_param_value(Param.IP_SOURCE, self.generate_random_ipv4_address(most_used_ip_class, num_attackers))
         self.add_param_value(Param.MAC_SOURCE, self.generate_random_mac_address(num_attackers))

+ 19 - 20
code/Attack/EternalBlueExploit.py

@@ -1,24 +1,23 @@
 import logging
-
 from random import randint, uniform
+
 from lea import Lea
-from scapy.utils import RawPcapReader
 from scapy.layers.inet import Ether
+from scapy.utils import RawPcapReader
 
-from definitions import ROOT_DIR
 from Attack import BaseAttack
 from Attack.AttackParameters import Parameter as Param
 from Attack.AttackParameters import ParameterTypes
-from ID2TLib.Utility import update_timestamp, get_interval_pps, handle_most_used_outputs
 from ID2TLib.SMBLib import smb_port
+import ID2TLib.Utility as Util
 
 logging.getLogger("scapy.runtime").setLevel(logging.ERROR)
 # noinspection PyPep8
 
 
 class EternalBlueExploit(BaseAttack.BaseAttack):
-    template_scan_pcap_path = ROOT_DIR + "/../resources/Win7_eternalblue_scan.pcap"
-    template_attack_pcap_path = ROOT_DIR + "/../resources/Win7_eternalblue_exploit.pcap"
+    template_scan_pcap_path = Util.RESOURCE_DIR + "Win7_eternalblue_scan.pcap"
+    template_attack_pcap_path = Util.RESOURCE_DIR + "Win7_eternalblue_exploit.pcap"
     # Empirical values from Metasploit experiments
     minDefaultPort = 30000
     maxDefaultPort = 50000
@@ -109,14 +108,14 @@ class EternalBlueExploit(BaseAttack.BaseAttack):
             source_ttl_prob_dict = Lea.fromValFreqsDict(source_ttl_dist)
             source_ttl_value = source_ttl_prob_dict.random()
         else:
-            source_ttl_value = handle_most_used_outputs(self.statistics.process_db_query("most_used(ttlValue)"))
+            source_ttl_value = Util.handle_most_used_outputs(self.statistics.process_db_query("most_used(ttlValue)"))
 
         destination_ttl_dist = self.statistics.get_ttl_distribution(ip_destination)
         if len(destination_ttl_dist) > 0:
             destination_ttl_prob_dict = Lea.fromValFreqsDict(destination_ttl_dist)
             destination_ttl_value = destination_ttl_prob_dict.random()
         else:
-            destination_ttl_value = handle_most_used_outputs(self.statistics.process_db_query("most_used(ttlValue)"))
+            destination_ttl_value = Util.handle_most_used_outputs(self.statistics.process_db_query("most_used(ttlValue)"))
 
         # Set Window Size based on Window Size distribution of IP address
         source_win_dist = self.statistics.get_win_distribution(ip_source)
@@ -134,7 +133,7 @@ class EternalBlueExploit(BaseAttack.BaseAttack):
             destination_win_prob_dict = Lea.fromValFreqsDict(destination_win_dist)
 
         # Set MSS (Maximum Segment Size) based on MSS distribution of IP address
-        mss_value = handle_most_used_outputs(self.statistics.process_db_query("most_used(mssValue)"))
+        mss_value = Util.handle_most_used_outputs(self.statistics.process_db_query("most_used(mssValue)"))
         if not mss_value:
             mss_value = 1465
 
@@ -185,8 +184,8 @@ class EternalBlueExploit(BaseAttack.BaseAttack):
                 new_pkt = (eth_frame / ip_pkt / tcp_pkt)
                 new_pkt.time = timestamp_next_pkt
 
-                pps = max(get_interval_pps(complement_interval_pps, timestamp_next_pkt), 10)
-                timestamp_next_pkt = update_timestamp(timestamp_next_pkt, pps) + inter_arrival_times[pkt_num]#float(timeSteps.random())
+                pps = max(Util.get_interval_pps(complement_interval_pps, timestamp_next_pkt), 10)
+                timestamp_next_pkt = Util.update_timestamp(timestamp_next_pkt, pps) + inter_arrival_times[pkt_num]#float(timeSteps.random())
             # Reply
             else:
                 # Ether
@@ -213,7 +212,7 @@ class EternalBlueExploit(BaseAttack.BaseAttack):
                         tcp_pkt.setfieldval("options", tcp_options)
 
                 new_pkt = (eth_frame / ip_pkt / tcp_pkt)
-                timestamp_next_pkt = update_timestamp(timestamp_next_pkt, pps) + inter_arrival_times[pkt_num]#+ float(timeSteps.random())
+                timestamp_next_pkt = Util.update_timestamp(timestamp_next_pkt, pps) + inter_arrival_times[pkt_num]#+ float(timeSteps.random())
                 new_pkt.time = timestamp_next_pkt
 
             packets.append(new_pkt)
@@ -279,8 +278,8 @@ class EternalBlueExploit(BaseAttack.BaseAttack):
                         new_pkt = (eth_frame / ip_pkt / tcp_pkt)
                         new_pkt.time = timestamp_next_pkt
 
-                        pps = max(get_interval_pps(complement_interval_pps, timestamp_next_pkt), 10)
-                        timestamp_next_pkt = update_timestamp(timestamp_next_pkt, pps) + inter_arrival_times[pkt_num] #float(timeSteps.random())
+                        pps = max(Util.get_interval_pps(complement_interval_pps, timestamp_next_pkt), 10)
+                        timestamp_next_pkt = Util.update_timestamp(timestamp_next_pkt, pps) + inter_arrival_times[pkt_num] #float(timeSteps.random())
 
                     # Reply
                     else:
@@ -309,8 +308,8 @@ class EternalBlueExploit(BaseAttack.BaseAttack):
 
                         new_pkt = (eth_frame / ip_pkt / tcp_pkt)
 
-                        pps = max(get_interval_pps(complement_interval_pps, timestamp_next_pkt), 10)
-                        timestamp_next_pkt = update_timestamp(timestamp_next_pkt, pps) + inter_arrival_times[pkt_num]#float(timeSteps.random())
+                        pps = max(Util.get_interval_pps(complement_interval_pps, timestamp_next_pkt), 10)
+                        timestamp_next_pkt = Util.update_timestamp(timestamp_next_pkt, pps) + inter_arrival_times[pkt_num]#float(timeSteps.random())
 
                         new_pkt.time = timestamp_next_pkt
 
@@ -352,8 +351,8 @@ class EternalBlueExploit(BaseAttack.BaseAttack):
                         new_pkt = (eth_frame / ip_pkt / tcp_pkt)
                         new_pkt.time = timestamp_next_pkt
 
-                        pps = max(get_interval_pps(complement_interval_pps, timestamp_next_pkt), 10)
-                        timestamp_next_pkt = update_timestamp(timestamp_next_pkt, pps) + inter_arrival_times[pkt_num]# float(timeSteps.random())
+                        pps = max(Util.get_interval_pps(complement_interval_pps, timestamp_next_pkt), 10)
+                        timestamp_next_pkt = Util.update_timestamp(timestamp_next_pkt, pps) + inter_arrival_times[pkt_num]# float(timeSteps.random())
 
                     # Reply
                     else:
@@ -382,8 +381,8 @@ class EternalBlueExploit(BaseAttack.BaseAttack):
 
                         new_pkt = (eth_frame / ip_pkt / tcp_pkt)
 
-                        pps = max(get_interval_pps(complement_interval_pps, timestamp_next_pkt), 10)
-                        timestamp_next_pkt = update_timestamp(timestamp_next_pkt, pps) + inter_arrival_times[pkt_num]# float(timeSteps.random())
+                        pps = max(Util.get_interval_pps(complement_interval_pps, timestamp_next_pkt), 10)
+                        timestamp_next_pkt = Util.update_timestamp(timestamp_next_pkt, pps) + inter_arrival_times[pkt_num]# float(timeSteps.random())
 
                         new_pkt.time = timestamp_next_pkt
 

+ 10 - 11
code/Attack/JoomlaRegPrivExploit.py

@@ -1,22 +1,21 @@
 import logging
-
 from random import randint
+
 from lea import Lea
-from scapy.utils import RawPcapReader
 from scapy.layers.inet import Ether
+from scapy.utils import RawPcapReader
 
-from definitions import ROOT_DIR
 from Attack import BaseAttack
 from Attack.AttackParameters import Parameter as Param
 from Attack.AttackParameters import ParameterTypes
-from ID2TLib.Utility import update_timestamp, get_interval_pps, handle_most_used_outputs
+import ID2TLib.Utility as Util
 
 logging.getLogger("scapy.runtime").setLevel(logging.ERROR)
 # noinspection PyPep8
 
 
 class JoomlaRegPrivExploit(BaseAttack.BaseAttack):
-    template_attack_pcap_path = ROOT_DIR + "/../resources/joomla_registration_privesc.pcap"
+    template_attack_pcap_path = Util.RESOURCE_DIR + "joomla_registration_privesc.pcap"
     # HTTP port
     http_port = 80
     # Metasploit experiments show this range of ports
@@ -111,14 +110,14 @@ class JoomlaRegPrivExploit(BaseAttack.BaseAttack):
             source_ttl_prob_dict = Lea.fromValFreqsDict(source_ttl_dist)
             source_ttl_value = source_ttl_prob_dict.random()
         else:
-            source_ttl_value = handle_most_used_outputs(self.statistics.process_db_query("most_used(ttlValue)"))
+            source_ttl_value = Util.handle_most_used_outputs(self.statistics.process_db_query("most_used(ttlValue)"))
 
         destination_ttl_dist = self.statistics.get_ttl_distribution(ip_destination)
         if len(destination_ttl_dist) > 0:
             destination_ttl_prob_dict = Lea.fromValFreqsDict(destination_ttl_dist)
             destination_ttl_value = destination_ttl_prob_dict.random()
         else:
-            destination_ttl_value = handle_most_used_outputs(self.statistics.process_db_query("most_used(ttlValue)"))
+            destination_ttl_value = Util.handle_most_used_outputs(self.statistics.process_db_query("most_used(ttlValue)"))
 
         # Inject Joomla_registration_privesc
         # Read joomla_registration_privesc pcap file
@@ -186,8 +185,8 @@ class JoomlaRegPrivExploit(BaseAttack.BaseAttack):
                 new_pkt = (eth_frame / ip_pkt/ tcp_pkt / str_tcp_seg)
                 new_pkt.time = timestamp_next_pkt
 
-                pps = max(get_interval_pps(complement_interval_pps, timestamp_next_pkt), 10)
-                timestamp_next_pkt = update_timestamp(timestamp_next_pkt, pps) + float(timeSteps.random())
+                pps = max(Util.get_interval_pps(complement_interval_pps, timestamp_next_pkt), 10)
+                timestamp_next_pkt = Util.update_timestamp(timestamp_next_pkt, pps) + float(timeSteps.random())
 
             # Reply: Victim --> attacker
             else:
@@ -212,8 +211,8 @@ class JoomlaRegPrivExploit(BaseAttack.BaseAttack):
                     victim_seq += max(strLen, 1)
 
                 new_pkt = (eth_frame / ip_pkt / tcp_pkt / str_tcp_seg)
-                pps = max(get_interval_pps(complement_interval_pps, timestamp_next_pkt), 10)
-                timestamp_next_pkt = update_timestamp(timestamp_next_pkt, pps) + float(timeSteps.random())
+                pps = max(Util.get_interval_pps(complement_interval_pps, timestamp_next_pkt), 10)
+                timestamp_next_pkt = Util.update_timestamp(timestamp_next_pkt, pps) + float(timeSteps.random())
                 new_pkt.time = timestamp_next_pkt
 
             packets.append(new_pkt)

+ 15 - 16
code/Attack/PortscanAttack.py

@@ -1,15 +1,14 @@
-import logging
 import csv
-
+import logging
 from random import shuffle, randint, choice
+
 from lea import Lea
 from scapy.layers.inet import IP, Ether, TCP
 
-from definitions import ROOT_DIR
 from Attack import BaseAttack
 from Attack.AttackParameters import Parameter as Param
 from Attack.AttackParameters import ParameterTypes
-from ID2TLib.Utility import update_timestamp, get_interval_pps, handle_most_used_outputs
+import ID2TLib.Utility as Util
 
 logging.getLogger("scapy.runtime").setLevel(logging.ERROR)
 # noinspection PyPep8
@@ -88,7 +87,7 @@ class PortscanAttack(BaseAttack.BaseAttack):
         :return: Ports numbers to be used as default destination ports or default open ports in the port scan.
         """
         ports_dst = []
-        file = open(ROOT_DIR + '/../resources/nmap-services-tcp.csv', 'rt')
+        file = open(Util.RESOURCE_DIR + 'nmap-services-tcp.csv', 'rt')
         spamreader = csv.reader(file, delimiter=',')
         for count in range(ports_num):
             # escape first row (header)
@@ -167,13 +166,13 @@ class PortscanAttack(BaseAttack.BaseAttack):
             source_mss_prob_dict = Lea.fromValFreqsDict(source_mss_dist)
             source_mss_value = source_mss_prob_dict.random()
         else:
-            source_mss_value = handle_most_used_outputs(self.statistics.process_db_query("most_used(mssValue)"))
+            source_mss_value = Util.handle_most_used_outputs(self.statistics.process_db_query("most_used(mssValue)"))
         destination_mss_dist = self.statistics.get_mss_distribution(ip_destination)
         if len(destination_mss_dist) > 0:
             destination_mss_prob_dict = Lea.fromValFreqsDict(destination_mss_dist)
             destination_mss_value = destination_mss_prob_dict.random()
         else:
-            destination_mss_value = handle_most_used_outputs(self.statistics.process_db_query("most_used(mssValue)"))
+            destination_mss_value = Util.handle_most_used_outputs(self.statistics.process_db_query("most_used(mssValue)"))
 
         # Set TTL based on TTL distribution of IP address
         source_ttl_dist = self.statistics.get_ttl_distribution(ip_source)
@@ -181,13 +180,13 @@ class PortscanAttack(BaseAttack.BaseAttack):
             source_ttl_prob_dict = Lea.fromValFreqsDict(source_ttl_dist)
             source_ttl_value = source_ttl_prob_dict.random()
         else:
-            source_ttl_value = handle_most_used_outputs(self.statistics.process_db_query("most_used(ttlValue)"))
+            source_ttl_value = Util.handle_most_used_outputs(self.statistics.process_db_query("most_used(ttlValue)"))
         destination_ttl_dist = self.statistics.get_ttl_distribution(ip_destination)
         if len(destination_ttl_dist) > 0:
             destination_ttl_prob_dict = Lea.fromValFreqsDict(destination_ttl_dist)
             destination_ttl_value = destination_ttl_prob_dict.random()
         else:
-            destination_ttl_value = handle_most_used_outputs(self.statistics.process_db_query("most_used(ttlValue)"))
+            destination_ttl_value = Util.handle_most_used_outputs(self.statistics.process_db_query("most_used(ttlValue)"))
 
         # Set Window Size based on Window Size distribution of IP address
         source_win_dist = self.statistics.get_win_distribution(ip_source)
@@ -195,13 +194,13 @@ class PortscanAttack(BaseAttack.BaseAttack):
             source_win_prob_dict = Lea.fromValFreqsDict(source_win_dist)
             source_win_value = source_win_prob_dict.random()
         else:
-            source_win_value = handle_most_used_outputs(self.statistics.process_db_query("most_used(winSize)"))
+            source_win_value = Util.handle_most_used_outputs(self.statistics.process_db_query("most_used(winSize)"))
         destination_win_dist = self.statistics.get_win_distribution(ip_destination)
         if len(destination_win_dist) > 0:
             destination_win_prob_dict = Lea.fromValFreqsDict(destination_win_dist)
             destination_win_value = destination_win_prob_dict.random()
         else:
-            destination_win_value = handle_most_used_outputs(self.statistics.process_db_query("most_used(winSize)"))
+            destination_win_value = Util.handle_most_used_outputs(self.statistics.process_db_query("most_used(winSize)"))
 
         minDelay,maxDelay = self.get_reply_delay(ip_destination)
 
@@ -233,9 +232,9 @@ class PortscanAttack(BaseAttack.BaseAttack):
                                     options=[('MSS', destination_mss_value)])
                 reply = (reply_ether / reply_ip / reply_tcp)
 
-                timestamp_reply = update_timestamp(timestamp_next_pkt,pps,minDelay)
+                timestamp_reply = Util.update_timestamp(timestamp_next_pkt,pps,minDelay)
                 while (timestamp_reply <= timestamp_prv_reply):
-                    timestamp_reply = update_timestamp(timestamp_prv_reply,pps,minDelay)
+                    timestamp_reply = Util.update_timestamp(timestamp_prv_reply,pps,minDelay)
                 timestamp_prv_reply = timestamp_reply
 
                 reply.time = timestamp_reply
@@ -246,14 +245,14 @@ class PortscanAttack(BaseAttack.BaseAttack):
                 confirm_ip = request_ip
                 confirm_tcp = TCP(sport=sport, dport=dport, seq=1, window=0, flags='R')
                 confirm = (confirm_ether / confirm_ip / confirm_tcp)
-                timestamp_confirm = update_timestamp(timestamp_reply,pps,minDelay)
+                timestamp_confirm = Util.update_timestamp(timestamp_reply,pps,minDelay)
                 confirm.time = timestamp_confirm
                 packets.append(confirm)
 
                 # else: destination port is NOT OPEN -> no reply is sent by target
 
-            pps = max(get_interval_pps(complement_interval_pps, timestamp_next_pkt), 10)
-            timestamp_next_pkt = update_timestamp(timestamp_next_pkt, pps)
+            pps = max(Util.get_interval_pps(complement_interval_pps, timestamp_next_pkt), 10)
+            timestamp_next_pkt = Util.update_timestamp(timestamp_next_pkt, pps)
 
         # store end time of attack
         self.attack_end_utime = packets[-1].time

+ 11 - 12
code/Attack/SQLiAttack.py

@@ -2,21 +2,20 @@ import logging
 import random
 
 from lea import Lea
-from scapy.utils import RawPcapReader
 from scapy.layers.inet import Ether
-from ID2TLib.Utility import update_timestamp, get_interval_pps, handle_most_used_outputs
+from scapy.utils import RawPcapReader
 
-from definitions import ROOT_DIR
 from Attack import BaseAttack
 from Attack.AttackParameters import Parameter as Param
 from Attack.AttackParameters import ParameterTypes
+import ID2TLib.Utility as Util
 
 logging.getLogger("scapy.runtime").setLevel(logging.ERROR)
 # noinspection PyPep8
 
 
 class SQLiAttack(BaseAttack.BaseAttack):
-    template_attack_pcap_path = ROOT_DIR + "/../resources/ATutorSQLi.pcap"
+    template_attack_pcap_path = Util.RESOURCE_DIR + "ATutorSQLi.pcap"
     # HTTP port
     http_port = 80
     # Metasploit experiments show this range of ports
@@ -108,14 +107,14 @@ class SQLiAttack(BaseAttack.BaseAttack):
             source_ttl_prob_dict = Lea.fromValFreqsDict(source_ttl_dist)
             source_ttl_value = source_ttl_prob_dict.random()
         else:
-            source_ttl_value = handle_most_used_outputs(self.statistics.process_db_query("most_used(ttlValue)"))
+            source_ttl_value = Util.handle_most_used_outputs(self.statistics.process_db_query("most_used(ttlValue)"))
 
         destination_ttl_dist = self.statistics.get_ttl_distribution(ip_destination)
         if len(destination_ttl_dist) > 0:
             destination_ttl_prob_dict = Lea.fromValFreqsDict(destination_ttl_dist)
             destination_ttl_value = destination_ttl_prob_dict.random()
         else:
-            destination_ttl_value = handle_most_used_outputs(self.statistics.process_db_query("most_used(ttlValue)"))
+            destination_ttl_value = Util.handle_most_used_outputs(self.statistics.process_db_query("most_used(ttlValue)"))
 
         # Inject SQLi Attack
         # Read SQLi Attack pcap file
@@ -187,8 +186,8 @@ class SQLiAttack(BaseAttack.BaseAttack):
                     new_pkt = (eth_frame / ip_pkt/ tcp_pkt / str_tcp_seg)
                     new_pkt.time = timestamp_next_pkt
 
-                    pps = max(get_interval_pps(complement_interval_pps, timestamp_next_pkt), 10)
-                    timestamp_next_pkt = update_timestamp(timestamp_next_pkt, pps) + float(timeSteps.random())
+                    pps = max(Util.get_interval_pps(complement_interval_pps, timestamp_next_pkt), 10)
+                    timestamp_next_pkt = Util.update_timestamp(timestamp_next_pkt, pps) + float(timeSteps.random())
 
                 # Victim --> attacker
                 else:
@@ -213,7 +212,7 @@ class SQLiAttack(BaseAttack.BaseAttack):
                         victim_seq += max(strLen, 1)
 
                     new_pkt = (eth_frame / ip_pkt / tcp_pkt / str_tcp_seg)
-                    timestamp_next_pkt = update_timestamp(timestamp_next_pkt, pps) + float(timeSteps.random())
+                    timestamp_next_pkt = Util.update_timestamp(timestamp_next_pkt, pps) + float(timeSteps.random())
                     new_pkt.time = timestamp_next_pkt
 
             # The last connection
@@ -249,8 +248,8 @@ class SQLiAttack(BaseAttack.BaseAttack):
                     new_pkt = (eth_frame / ip_pkt / tcp_pkt / str_tcp_seg)
                     new_pkt.time = timestamp_next_pkt
 
-                    pps = max(get_interval_pps(complement_interval_pps, timestamp_next_pkt), 10)
-                    timestamp_next_pkt = update_timestamp(timestamp_next_pkt, pps) + float(timeSteps.random())
+                    pps = max(Util.get_interval_pps(complement_interval_pps, timestamp_next_pkt), 10)
+                    timestamp_next_pkt = Util.update_timestamp(timestamp_next_pkt, pps) + float(timeSteps.random())
 
                 # Victim --> attacker
                 else:
@@ -274,7 +273,7 @@ class SQLiAttack(BaseAttack.BaseAttack):
                         victim_seq += max(strLen, 1)
 
                     new_pkt = (eth_frame / ip_pkt / tcp_pkt / str_tcp_seg)
-                    timestamp_next_pkt = update_timestamp(timestamp_next_pkt, pps) + float(timeSteps.random())
+                    timestamp_next_pkt = Util.update_timestamp(timestamp_next_pkt, pps) + float(timeSteps.random())
                     new_pkt.time = timestamp_next_pkt
 
             packets.append(new_pkt)

+ 5 - 0
code/ID2TLib/AttackController.py

@@ -23,6 +23,11 @@ class AttackController:
         self.seed = None
 
     def set_seed(self, seed: int):
+        """
+        Sets global seed.
+
+        :param seed: random seed
+        """
         self.seed = seed
 
     def create_attack(self, attack_name: str, seed=None):

+ 2 - 3
code/ID2TLib/LabelManager.py

@@ -3,7 +3,6 @@ from datetime import datetime
 from xml.dom.minidom import *
 
 import ID2TLib.Label as Label
-import ID2TLib.Utility as Utility
 
 
 class LabelManager:
@@ -29,7 +28,7 @@ class LabelManager:
         self.labels = list()
 
         if filepath_pcap is not None:
-            self.label_file_path = Utility.rreplace(filepath_pcap, '.pcap', '_labels.xml', 1)
+            self.label_file_path = os.path.splitext(filepath_pcap)[0] + '_labels.xml'
             # only load labels if label file is existing
             if os.path.exists(self.label_file_path):
                 self.load_labels()
@@ -84,7 +83,7 @@ class LabelManager:
             return timestamp_root
 
         if filepath is not None:
-            self.label_file_path = Utility.rreplace(filepath, '.pcap', '_labels.xml', 1)
+            self.label_file_path = os.path.splitext(filepath)[0] + '_labels.xml'
 
         # Generate XML
         doc = Document()

+ 12 - 2
code/ID2TLib/Statistics.py

@@ -575,8 +575,18 @@ class Statistics:
             return None
 
     def get_rnd_win_size(self, pkts_num):
-        return self.process_db_query(
-            "SELECT DISTINCT winSize FROM tcp_win ORDER BY RANDOM() LIMIT "+str(pkts_num)+";")
+        """
+        :param pkts_num: maximum number of window sizes, that should be returned
+        :return: A list of randomly chosen window sizes with given length.
+        """
+        sql_return = self.process_db_query("SELECT DISTINCT winSize FROM tcp_win ORDER BY winsize ASC;")
+        if not isinstance(sql_return, list):
+            return [sql_return]
+        result = []
+        for i in range(0, min(pkts_num, len(sql_return))):
+            result.append(random.choice(sql_return))
+            sql_return.remove(result[i])
+        return result
 
     def get_statistics_database(self):
         """

+ 38 - 38
code/ID2TLib/TestLibrary.py

@@ -1,16 +1,16 @@
-import os
 import hashlib
+import os
 import random
 
-from definitions import ROOT_DIR
+import ID2TLib.Utility as Util
 
-test_resource_dir = ROOT_DIR + "/../resources/test"
-test_pcap = ROOT_DIR + "/../resources/test/reference_1998.pcap"
+test_resource_dir = Util.TEST_DIR
+test_pcap = Util.TEST_DIR + "reference_1998.pcap"
 test_pcap_ips = ["10.0.2.15", "52.85.173.182"]
 test_pcap_empty = []
 
 """
-helper functions for generic_test
+helper functions for ID2TAttackTest
 """
 
 
@@ -42,34 +42,6 @@ def clean_up(controller):
     os.remove(controller.label_manager.label_file_path)
 
 
-"""
-function patches for unittests
-"""
-
-
-def get_bytes(count, ignore):
-    """
-    unittest patch for get_rnd_bytes (ID2TLib.Utility.py)
-
-    :param count: count of requested bytes
-    :param ignore: <not used>
-    :return: a count of As
-    """
-    return b'A' * count
-
-
-def get_x86_nop(count, side_effect_free, char_filter):
-    """
-    unittest patch for get_rnd_x86_nop (ID2TLib.Utility.py)
-
-    :param count: count of requested nops
-    :param side_effect_free: <not used>
-    :param char_filter: <not used>
-    :return: a count of \x90
-    """
-    return b'\x90' * count
-
-
 def rename_test_result_files(controller, caller_function: str, attack_sub_dir=False, test_sub_dir=False):
     """
     :param controller: controller which created output files
@@ -111,14 +83,42 @@ def rename_test_result_files(controller, caller_function: str, attack_sub_dir=Fa
     controller.label_manager.label_file_path = result_labels_path
 
 
-def get_win_size(pkts_num):
-    result = []
-    for i in range(0, pkts_num):
-        result.append(10)
-    return result
+"""
+function patches for unittests
+"""
+
+
+def get_bytes(count, ignore):
+    """
+    unittest patch for get_rnd_bytes (ID2TLib.Utility.py)
+
+    :param count: count of requested bytes
+    :param ignore: <not used>
+    :return: a count of As
+    """
+    return b'A' * count
+
+
+def get_x86_nop(count, side_effect_free, char_filter):
+    """
+    unittest patch for get_rnd_x86_nop (ID2TLib.Utility.py)
+
+    :param count: count of requested nops
+    :param side_effect_free: <not used>
+    :param char_filter: <not used>
+    :return: a count of \x90
+    """
+    return b'\x90' * count
 
 
 def get_attacker_config(ip_source_list, ipAddress: str):
+    """
+    unittest patch for get_attacker_config (ID2TLib.Utility.py)
+
+    :param ip_source_list: List of source IPs
+    :param ipAddress: The IP address of the attacker
+    :return: A tuple consisting of (port, ttlValue)
+    """
     next_port = random.randint(0, 2 ** 16 - 1)
     ttl = random.randint(1, 255)
 

+ 7 - 12
code/ID2TLib/Utility.py

@@ -1,4 +1,5 @@
 import ipaddress
+import os
 
 from random import randint, uniform
 from os import urandom
@@ -8,6 +9,12 @@ from lea import Lea
 from scipy.stats import gamma
 from scapy.layers.inet import RandShort
 
+CODE_DIR = os.path.dirname(os.path.abspath(__file__)) + "/../"
+ROOT_DIR = CODE_DIR + "../"
+RESOURCE_DIR = ROOT_DIR + "resources/"
+TEST_DIR = RESOURCE_DIR + "test/"
+
+
 platforms = {"win7", "win10", "winxp", "win8.1", "macos", "linux", "win8", "winvista", "winnt", "win2000"}
 platform_probability = {"win7": 48.43, "win10": 27.99, "winxp": 6.07, "win8.1": 6.07, "macos": 5.94, "linux": 3.38,
                         "win8": 1.35, "winvista": 0.46, "winnt": 0.31}
@@ -340,15 +347,3 @@ def get_attacker_config(ip_source_list, ipAddress: str):
         attacker_ttl_mapping[ipAddress] = ttl
     # return port and TTL
     return next_port, ttl
-
-
-def rreplace(s, old, new, maxreplace):
-    """
-    Replaces occurences of a sub-string with a new sub-string, but, unlike replace(), it starts at the end
-    :param s: The string to search and replace from.
-    :param old: The old sub-string you wish to replace.
-    :param new: The new sub-string you wish to put in-place of the old one.
-    :param maxreplace: The maximum number of times you wish to replace the sub-string.
-    :return: The result of the reverse replace operation.
-    """
-    return new.join(s.rsplit(old, maxreplace))

+ 0 - 28
code/Test/GenericTest.py

@@ -1,28 +0,0 @@
-import unittest
-import inspect
-
-import ID2TLib.Controller as Ctrl
-import ID2TLib.TestLibrary as Lib
-
-
-class GenericTest(unittest.TestCase):
-
-    def generic_test(self, attack_args, sha_checksum, seed=5, cleanup=True, pcap=Lib.test_pcap, flag_write_file=False,
-                     flag_recalculate_stats=False, flag_print_statistics=False, attack_sub_dir=True, test_sub_dir=True):
-        # TODO: move seed to attacks
-        controller = Ctrl.Controller(pcap_file_path=pcap, do_extra_tests=False)
-        controller.load_pcap_statistics(flag_write_file, flag_recalculate_stats, flag_print_statistics)
-        controller.process_attacks(attack_args, [[seed]])
-
-        caller_function = inspect.stack()[1].function
-
-        try:
-            self.assertEqual(sha_checksum, Lib.get_sha256(controller.pcap_dest_path))
-        except self.failureException:
-            Lib.rename_test_result_files(controller, caller_function, attack_sub_dir, test_sub_dir)
-            raise
-
-        if cleanup:
-            Lib.clean_up(controller)
-        else:
-            Lib.rename_test_result_files(controller, caller_function, attack_sub_dir, test_sub_dir)

+ 46 - 0
code/Test/ID2TAttackTest.py

@@ -0,0 +1,46 @@
+import unittest
+import inspect
+
+import ID2TLib.Controller as Ctrl
+import ID2TLib.TestLibrary as Lib
+
+
+class ID2TAttackTest(unittest.TestCase):
+    """
+    Generic Test Class for ID2T attacks based on unittest.TestCase.
+    """
+
+    def checksum_test(self, attack_args, sha256_checksum, seed=5, cleanup=True, pcap=Lib.test_pcap,
+                      flag_write_file=False, flag_recalculate_stats=False, flag_print_statistics=False,
+                      attack_sub_dir=True, test_sub_dir=True):
+        """
+        Runs the attack against a given sha256 checksum.
+
+        :param attack_args: A list of attacks with their attack parameters (as defined in Controller.process_attacks).
+        :param sha256_checksum: The checksum to verify the result pcap.
+        :param seed: A random seed to keep random values static (care for count and order of random generation).
+        :param cleanup: Clean up attack output after testing.
+        :param pcap: The input pcap for the attack.
+        :param flag_write_file: Writes the statistics to a file.
+        :param flag_recalculate_stats: Forces the recalculation of statistics.
+        :param flag_print_statistics: Prints the statistics on the terminal.
+        :param attack_sub_dir: create sub-directory for each attack-class if True
+        :param test_sub_dir: create sub-directory for each test-function/case if True
+        """
+
+        controller = Ctrl.Controller(pcap_file_path=pcap, do_extra_tests=False)
+        controller.load_pcap_statistics(flag_write_file, flag_recalculate_stats, flag_print_statistics)
+        controller.process_attacks(attack_args, [[seed]])
+
+        caller_function = inspect.stack()[1].function
+
+        try:
+            self.assertEqual(sha256_checksum, Lib.get_sha256(controller.pcap_dest_path))
+        except self.failureException:
+            Lib.rename_test_result_files(controller, caller_function, attack_sub_dir, test_sub_dir)
+            raise
+
+        if cleanup:
+            Lib.clean_up(controller)
+        else:
+            Lib.rename_test_result_files(controller, caller_function, attack_sub_dir, test_sub_dir)

+ 65 - 61
code/Test/test_BaseAttack.py

@@ -1,178 +1,182 @@
 import unittest
-import Attack.BaseAttack as BaseAttack
+import Attack.BaseAttack as BA
+
+# TODO: improve coverage
 
 
 class TestBaseAttack(unittest.TestCase):
 
     def test_is_mac_address_valid(self):
-        self.assertTrue(BaseAttack.BaseAttack._is_mac_address("00:80:41:ae:fd:7e"))
+        self.assertTrue(BA.BaseAttack._is_mac_address("00:80:41:ae:fd:7e"))
 
     def test_is_mac_address_invalid(self):
-        self.assertFalse(BaseAttack.BaseAttack._is_mac_address("00:80:41:aec:fd:7e"))
+        self.assertFalse(BA.BaseAttack._is_mac_address("00:80:41:aec:fd:7e"))
 
     def test_is_mac_address_empty(self):
-        self.assertFalse(BaseAttack.BaseAttack._is_mac_address(""))
+        self.assertFalse(BA.BaseAttack._is_mac_address(""))
 
     def test_is_mac_address_minus_valid(self):
-        self.assertTrue(BaseAttack.BaseAttack._is_mac_address("00-80-41-ae-fd-7e"))
+        self.assertTrue(BA.BaseAttack._is_mac_address("00-80-41-ae-fd-7e"))
 
     def test_is_mac_address_minus_invalid(self):
-        self.assertFalse(BaseAttack.BaseAttack._is_mac_address("00-80-41-aec-fd-7e"))
+        self.assertFalse(BA.BaseAttack._is_mac_address("00-80-41-aec-fd-7e"))
 
     def test_is_mac_address_list_valid(self):
-        self.assertTrue(BaseAttack.BaseAttack._is_mac_address(["00:80:41:ae:fd:7e", "00-80-41-ae-fd-7e"]))
+        self.assertTrue(BA.BaseAttack._is_mac_address(["00:80:41:ae:fd:7e", "00-80-41-ae-fd-7e"]))
 
     def test_is_mac_address_list_invalid(self):
-        self.assertFalse(BaseAttack.BaseAttack._is_mac_address(["00:80:41:aec:fd:7e", "00-80-41-aec-fd-7e"]))
+        self.assertFalse(BA.BaseAttack._is_mac_address(["00:80:41:aec:fd:7e", "00-80-41-aec-fd-7e"]))
 
     def test_is_ip_address_empty(self):
-        self.assertFalse(BaseAttack.BaseAttack._is_ip_address("")[0])
+        self.assertFalse(BA.BaseAttack._is_ip_address("")[0])
 
     def test_is_ip_address_v4_valid(self):
-        self.assertTrue(BaseAttack.BaseAttack._is_ip_address("192.168.178.1")[0])
+        self.assertTrue(BA.BaseAttack._is_ip_address("192.168.178.1")[0])
 
     def test_is_ip_address_v4_invalid(self):
-        self.assertFalse(BaseAttack.BaseAttack._is_ip_address("192.1689.178.1")[0])
+        self.assertFalse(BA.BaseAttack._is_ip_address("192.1689.178.1")[0])
 
     def test_is_ip_address_v6_valid(self):
-        self.assertTrue(BaseAttack.BaseAttack._is_ip_address("2001:0db8:85a3:08d3:1319:8a2e:0370:7344")[0])
+        self.assertTrue(BA.BaseAttack._is_ip_address("2001:0db8:85a3:08d3:1319:8a2e:0370:7344")[0])
 
     def test_is_ip_address_v6_invalid(self):
-        self.assertFalse(BaseAttack.BaseAttack._is_ip_address("2001:0db8:85a3:08d3X:1319:8a2e:0370:7344")[0])
+        self.assertFalse(BA.BaseAttack._is_ip_address("2001:0db8:85a3:08d3X:1319:8a2e:0370:7344")[0])
 
     def test_is_ip_address_v6_shortened_valid(self):
-        self.assertTrue(BaseAttack.BaseAttack._is_ip_address("2001:0db8:85a3:08d3:1319::0370:7344")[0])
+        self.assertTrue(BA.BaseAttack._is_ip_address("2001:0db8:85a3:08d3:1319::0370:7344")[0])
 
     def test_is_ip_address_v6_shortened_invalid(self):
-        self.assertFalse(BaseAttack.BaseAttack._is_ip_address("2001::85a3:08d3X::8a2e:0370:7344")[0])
+        self.assertFalse(BA.BaseAttack._is_ip_address("2001::85a3:08d3X::8a2e:0370:7344")[0])
 
     def test_is_ip_address_list_valid(self):
-        self.assertTrue(BaseAttack.BaseAttack._is_ip_address(["192.168.178.1", "192.168.178.10"])[0])
+        self.assertTrue(BA.BaseAttack._is_ip_address(["192.168.178.1", "192.168.178.10"])[0])
 
     def test_is_ip_address_list_invalid(self):
-        self.assertFalse(BaseAttack.BaseAttack._is_ip_address(["192.1689.178.1", "192.168.178.10"])[0])
+        self.assertFalse(BA.BaseAttack._is_ip_address(["192.1689.178.1", "192.168.178.10"])[0])
 
     def test_is_ip_address_comma_list_valid(self):
-        self.assertTrue(BaseAttack.BaseAttack._is_ip_address("192.168.178.1,192.168.178.10")[0])
+        self.assertTrue(BA.BaseAttack._is_ip_address("192.168.178.1,192.168.178.10")[0])
 
     def test_is_ip_address_comma_list_invalid(self):
-        self.assertFalse(BaseAttack.BaseAttack._is_ip_address("192.168.178.1,192.1689.178.10")[0])
+        self.assertFalse(BA.BaseAttack._is_ip_address("192.168.178.1,192.1689.178.10")[0])
+
+    def test_is_port_none(self):
+        self.assertFalse(BA.BaseAttack._is_port(None))
 
-    #FIX?
-    #def test_is_port_empty(self):
-        #self.assertFalse(ba._is_port(""))
+    def test_is_port_empty(self):
+        self.assertFalse(BA.BaseAttack._is_port(""))
 
-    #def test_is_port_empty_list(self):
-        #self.assertFalse(ba._is_port([]))
+    def test_is_port_empty_list(self):
+        self.assertFalse(BA.BaseAttack._is_port([]))
 
     def test_is_port_valid(self):
-        self.assertTrue(BaseAttack.BaseAttack._is_port(5000))
+        self.assertTrue(BA.BaseAttack._is_port(5000))
 
     def test_is_port_invalid(self):
-        self.assertFalse(BaseAttack.BaseAttack._is_port(70000))
+        self.assertFalse(BA.BaseAttack._is_port(70000))
 
     def test_is_port_string_valid(self):
-        self.assertTrue(BaseAttack.BaseAttack._is_port("5000"))
+        self.assertTrue(BA.BaseAttack._is_port("5000"))
 
     def test_is_port_string_invalid(self):
-        self.assertFalse(BaseAttack.BaseAttack._is_port("70000"))
+        self.assertFalse(BA.BaseAttack._is_port("70000"))
 
     def test_is_port_string_comma_valid(self):
-        self.assertTrue(BaseAttack.BaseAttack._is_port("5000, 4000, 3000"))
+        self.assertTrue(BA.BaseAttack._is_port("5000, 4000, 3000"))
 
     def test_is_port_string_comma_ivalid(self):
-        self.assertFalse(BaseAttack.BaseAttack._is_port("5000, 70000, 3000"))
+        self.assertFalse(BA.BaseAttack._is_port("5000, 70000, 3000"))
 
     def test_is_port_valid_list(self):
-        self.assertTrue(BaseAttack.BaseAttack._is_port([5000, 4000, 3000]))
+        self.assertTrue(BA.BaseAttack._is_port([5000, 4000, 3000]))
 
     def test_is_port_invalid_list(self):
-        self.assertFalse(BaseAttack.BaseAttack._is_port([5000, 70000, 0]))
+        self.assertFalse(BA.BaseAttack._is_port([5000, 70000, 0]))
 
     def test_is_port_valid_string_list(self):
-        self.assertTrue(BaseAttack.BaseAttack._is_port(["5000", "4000", "3000"]))
+        self.assertTrue(BA.BaseAttack._is_port(["5000", "4000", "3000"]))
 
     def test_is_port_invalid_string_list(self):
-        self.assertFalse(BaseAttack.BaseAttack._is_port(["5000", "70000", "0"]))
+        self.assertFalse(BA.BaseAttack._is_port(["5000", "70000", "0"]))
 
     def test_is_port_range_valid(self):
-        self.assertTrue(BaseAttack.BaseAttack._is_port("3000-5000"))
+        self.assertTrue(BA.BaseAttack._is_port("3000-5000"))
 
     def test_is_port_range_invalid(self):
-        self.assertFalse(BaseAttack.BaseAttack._is_port("0-70000"))
+        self.assertFalse(BA.BaseAttack._is_port("0-70000"))
 
     def test_is_port_range_dots_valid(self):
-        self.assertTrue(BaseAttack.BaseAttack._is_port("3000...5000"))
+        self.assertTrue(BA.BaseAttack._is_port("3000...5000"))
 
     def test_is_port_range_dots_invalid(self):
-        self.assertFalse(BaseAttack.BaseAttack._is_port("0...70000"))
+        self.assertFalse(BA.BaseAttack._is_port("0...70000"))
 
     def test_is_port_range_list_valid(self):
-        self.assertTrue(BaseAttack.BaseAttack._is_port(["3000-5000", "6000-7000"]))
+        self.assertTrue(BA.BaseAttack._is_port(["3000-5000", "6000-7000"]))
 
     def test_is_port_range_list_invalid(self):
-        self.assertFalse(BaseAttack.BaseAttack._is_port(["0-70000", "6000-7000"]))
+        self.assertFalse(BA.BaseAttack._is_port(["0-70000", "6000-7000"]))
 
     def test_is_timestamp_valid(self):
-        self.assertTrue(BaseAttack.BaseAttack._is_timestamp("2018-01-25 23:54:00"))
+        self.assertTrue(BA.BaseAttack._is_timestamp("2018-01-25 23:54:00"))
 
     def test_is_timestamp_invalid(self):
-        self.assertFalse(BaseAttack.BaseAttack._is_timestamp("20-0100-125 23c:54x:00a"))
+        self.assertFalse(BA.BaseAttack._is_timestamp("20-0100-125 23c:54x:00a"))
 
     def test_is_boolean_invalid(self):
-        self.assertFalse(BaseAttack.BaseAttack._is_boolean("42")[0])
+        self.assertFalse(BA.BaseAttack._is_boolean("42")[0])
 
     def test_is_boolean_valid(self):
-        self.assertTrue(BaseAttack.BaseAttack._is_boolean(True))
-        self.assertTrue(BaseAttack.BaseAttack._is_boolean(False))
+        self.assertTrue(BA.BaseAttack._is_boolean(True))
+        self.assertTrue(BA.BaseAttack._is_boolean(False))
 
     def test_is_boolean_valid_strings(self):
         for value in {"y", "yes", "t", "true", "on", "1", "n", "no", "f", "false", "off", "0"}:
             with self.subTest(value=value):
-                self.assertTrue(BaseAttack.BaseAttack._is_boolean(value))
+                self.assertTrue(BA.BaseAttack._is_boolean(value))
 
     def test_is_float_valid(self):
-        self.assertTrue(BaseAttack.BaseAttack._is_float(50.67)[0])
+        self.assertTrue(BA.BaseAttack._is_float(50.67)[0])
 
     def test_is_float_invalid(self):
-        self.assertFalse(BaseAttack.BaseAttack._is_float("invalid")[0])
+        self.assertFalse(BA.BaseAttack._is_float("invalid")[0])
 
     def test_is_domain_valid(self):
-        self.assertTrue(BaseAttack.BaseAttack._is_domain("foo://example.com:8042/over/there?name=ferret"))
+        self.assertTrue(BA.BaseAttack._is_domain("foo://example.com:8042/over/there?name=ferret"))
 
     def test_is_domain_invalid(self):
-        self.assertFalse(BaseAttack.BaseAttack._is_domain("this is not a valid domain, I guess, maybe, let's find out."))
+        self.assertFalse(BA.BaseAttack._is_domain("this is not a valid domain, I guess, maybe, let's find out."))
 
     def test_is_valid_ipaddress_valid(self):
-        self.assertTrue(BaseAttack.BaseAttack.is_valid_ip_address(BaseAttack, "192.168.178.42"))
+        self.assertTrue(BA.BaseAttack.is_valid_ip_address(BA, "192.168.178.42"))
 
     def test_is_valid_ipaddress_invalid(self):
-        self.assertFalse(BaseAttack.BaseAttack.is_valid_ip_address(BaseAttack, "192.168.1789.42"))
+        self.assertFalse(BA.BaseAttack.is_valid_ip_address(BA, "192.168.1789.42"))
 
     def test_ip_src_dst_equal_check_equal(self):
         with self.assertRaises(SystemExit):
-            BaseAttack.BaseAttack.ip_src_dst_equal_check(BaseAttack, "192.168.178.42", "192.168.178.42")
+            BA.BaseAttack.ip_src_dst_equal_check(BA, "192.168.178.42", "192.168.178.42")
 
     def test_ip_src_dst_equal_check_unequal(self):
-        BaseAttack.BaseAttack.ip_src_dst_equal_check(BaseAttack, "192.168.178.42", "192.168.178.43")
+        BA.BaseAttack.ip_src_dst_equal_check(BA, "192.168.178.42", "192.168.178.43")
 
     def test_clean_whitespaces(self):
-        self.assertEqual("a\nb\rc\td\'e", BaseAttack.BaseAttack.clean_white_spaces(BaseAttack, "a\\nb\\rc\\td\\\'e"))
+        self.assertEqual("a\nb\rc\td\'e", BA.BaseAttack.clean_white_spaces(BA, "a\\nb\\rc\\td\\\'e"))
 
     def test_generate_random_ipv4_address(self):
-        ip_list = BaseAttack.BaseAttack.generate_random_ipv4_address("Unknown", 10)
+        ip_list = BA.BaseAttack.generate_random_ipv4_address("Unknown", 10)
         for ip in ip_list:
             with self.subTest(ip=ip):
-                self.assertTrue(BaseAttack.BaseAttack._is_ip_address(ip))
+                self.assertTrue(BA.BaseAttack._is_ip_address(ip))
 
     def test_generate_random_ipv6_address(self):
-        ip_list = BaseAttack.BaseAttack.generate_random_ipv6_address(10)
+        ip_list = BA.BaseAttack.generate_random_ipv6_address(10)
         for ip in ip_list:
             with self.subTest(ip=ip):
-                self.assertTrue(BaseAttack.BaseAttack._is_ip_address(ip))
+                self.assertTrue(BA.BaseAttack._is_ip_address(ip))
 
     def test_generate_random_mac_address(self):
-        mac_list = BaseAttack.BaseAttack.generate_random_mac_address(10)
+        mac_list = BA.BaseAttack.generate_random_mac_address(10)
         for mac in mac_list:
             with self.subTest(mac=mac):
-                self.assertTrue(BaseAttack.BaseAttack._is_mac_address(mac))
+                self.assertTrue(BA.BaseAttack._is_mac_address(mac))

+ 21 - 37
code/Test/test_DDoS.py

@@ -1,50 +1,34 @@
-import unittest
 import unittest.mock as mock
 
-import ID2TLib.Statistics as Statistics
-import Test.GenericTest as GenericTest
+import Test.ID2TAttackTest as Test
 import ID2TLib.TestLibrary as Lib
 
-sha_basic_ddos = '52c5df968818155f6aa143d4f7eed8fe4014df595d9e561cc9c898842790bbc8'
-sha_num_attackers_ddos = 'e4e1acf27cb87445be802db7a4cad31fcce1a14221b0c65af040d14c8f30b4d1'
-# FIXME: get hash for currently broken test
-sha_dest_mac_length_zero_ddos = ''
-sha_mss_none_ddos = '52c5df968818155f6aa143d4f7eed8fe4014df595d9e561cc9c898842790bbc8'
+sha_basic_ddos = '87c6c9cf4b496b84fecfd758c1d891ff06fe234dba2f421a5ab8bd7d6d9239a5'
+sha_num_attackers_ddos = 'cbbb9b55d03a0efde965bbb8c38f6ba8a9acbd605cb2f3ac22a6ed6e3958f8e9'
+sha_dest_mac_length_zero_ddos = 'acf1d108ab3d4e76636c6b58e08296126a74fcf3936377588376a79716fffd60'
+sha_mss_none_ddos = '87c6c9cf4b496b84fecfd758c1d891ff06fe234dba2f421a5ab8bd7d6d9239a5'
 
-"""
-Name                             Stmts   Miss  Cover   Missing
---------------------------------------------------------------------------------------------
-Attack/DDoSAttack.py                124     7   94%    70, 105-106, 120, 123, 141, 187
-"""
+# TODO: improve coverage
 
 
-class UnitTestDDoS(GenericTest.GenericTest):
+class UnitTestDDoS(Test.ID2TAttackTest):
 
-    @mock.patch.object(Statistics.Statistics, 'get_rnd_win_size', side_effect=Lib.get_win_size)
     @mock.patch('ID2TLib.Utility.get_attacker_config', side_effect=Lib.get_attacker_config)
-    def test_ddos_basic(self, mock_get_attacker_config, mock_get_rnd_win_size):
-        self.generic_test([['DDoSAttack']],
-                          sha_basic_ddos)
+    def test_ddos_basic(self, mock_get_attacker_config):
+        self.checksum_test([['DDoSAttack']],
+                           sha_basic_ddos)
 
-    @mock.patch.object(Statistics.Statistics, 'get_rnd_win_size', side_effect=Lib.get_win_size)
     @mock.patch('ID2TLib.Utility.get_attacker_config', side_effect=Lib.get_attacker_config)
-    def test_ddos_num_attackers(self, mock_get_attacker_config, mock_get_rnd_win_size):
-        self.generic_test([['DDoSAttack', 'attackers.count=5']],
-                          sha_num_attackers_ddos)
-
-    # FIXME: currently returns 'ERROR: 'NoneType' object has no attribute 'route'
-    #@mock.patch.object(Statistics.Statistics, 'get_rnd_win_size', side_effect=get_win_size)
-    #@mock.patch('ID2TLib.Utility.get_attacker_config', side_effect=get_attacker_config)
-    #@mock.patch('ID2TLib.Statistics.Statistics.get_mac_address', return_value='')
-    #def test_dest_mac_length_zero(self, mock_dest_mac, mock_get_attacker_config, mock_get_rnd_win_size):
-    #    self.generic_test([['DDoSAttack']], sha_dest_mac_length_zero_ddos)
-
-    @mock.patch.object(Statistics.Statistics, 'get_rnd_win_size', side_effect=Lib.get_win_size)
-    @mock.patch('ID2TLib.Utility.get_attacker_config', side_effect=Lib.get_attacker_config)
-    @mock.patch('ID2TLib.Statistics.Statistics.get_most_used_mss', return_value=None)
-    def test_ddos_mss_none(self, mock_mss, mock_get_attacker_config, mock_get_rnd_win_size):
-        self.generic_test([['DDoSAttack']], sha_mss_none_ddos)
+    def test_ddos_num_attackers(self, mock_get_attacker_config):
+        self.checksum_test([['DDoSAttack', 'attackers.count=5']],
+                           sha_num_attackers_ddos)
 
+    @mock.patch('ID2TLib.Utility.get_attacker_config', side_effect=Lib.get_attacker_config)
+    @mock.patch('ID2TLib.Statistics.Statistics.get_mac_address', return_value=[])
+    def test_ddos_dest_mac_length_zero(self, mock_dest_mac, mock_get_attacker_config):
+        self.checksum_test([['DDoSAttack']], sha_dest_mac_length_zero_ddos)
 
-if __name__ == '__main__':
-    unittest.main()
+    @mock.patch('ID2TLib.Utility.get_attacker_config', side_effect=Lib.get_attacker_config)
+    @mock.patch('ID2TLib.Statistics.Statistics.get_most_used_mss', return_value=None)
+    def test_ddos_mss_none(self, mock_mss, mock_get_attacker_config):
+        self.checksum_test([['DDoSAttack']], sha_mss_none_ddos)

+ 5 - 17
code/Test/test_EternalBlue.py

@@ -1,23 +1,11 @@
-import unittest
-
-import Test.GenericTest as GenTest
+import Test.ID2TAttackTest as Test
 
 sha_default = 'c707492a0493efcf46a569c91fe77685286402ddfdff3c79e64157b3324dc9f6'
 
-"""
-CURRENT COVERAGE
-Name                             Stmts   Miss  Cover   Missing (lines)
----------------------------------------------------------------------------
-Attack/EternalBlueExploit.py       246     10    96%   62, 72, 112, 119, 126-127, 133-134, 139, 266
-"""
-# TODO: get 100% coverage
-
-
-class UnitTestEternalBlue(GenTest.GenericTest):
+# TODO: improve coverage
 
-    def test_eternalblue_default(self):
-        self.generic_test([['EternalBlueExploit']], sha_default)
 
+class UnitTestEternalBlue(Test.ID2TAttackTest):
 
-if __name__ == '__main__':
-    unittest.main()
+    def test_eternal_blue_default(self):
+        self.checksum_test([['EternalBlueExploit']], sha_default)

+ 9 - 19
code/Test/test_FTPWinaXeExploit.py

@@ -1,8 +1,7 @@
-import unittest
 import unittest.mock as mock
 
 import ID2TLib.TestLibrary as Lib
-import Test.GenericTest as GenericTest
+import Test.ID2TAttackTest as Test
 
 sha_ftp_basic = 'ad9bc7b55c3b0365c0f02ae9b9b7aafdb43acbdd8c8c274d30cb286821e772cc'
 sha_ftp_mac = '388831100c907cfc6815bcc1869f30d937be29091dd8e54a734eb52f14a23f3c'
@@ -11,37 +10,32 @@ sha_not_empty_custom_payload_empty_file = '41186fc804fb2a8fb3605be3246a5246be927
 sha_empty_custom_payload_not_empty_file = 'b1f43c3147dd3684b1db4d7d370801f25de693b632b97a95b933a4d296094f31'
 sha_valid_ip = 'ad9bc7b55c3b0365c0f02ae9b9b7aafdb43acbdd8c8c274d30cb286821e772cc'
 
-"""
-Name                             Stmts   Miss  Cover   Missing
---------------------------------------------------------------------------------------------
-Attack/FTPWinaXeExploit.py         141     14    99%    67
-"""
-# TODO: get 100% coverage
+# TODO: improve coverage
 
 
-class UnitTestFTPWinaXeExploit(GenericTest.GenericTest):
+class UnitTestFTPWinaXeExploit(Test.ID2TAttackTest):
 
     @mock.patch('ID2TLib.Utility.get_rnd_bytes', side_effect=Lib.get_bytes)
     @mock.patch('ID2TLib.Utility.get_rnd_x86_nop', side_effect=Lib.get_x86_nop)
     def test_ftp_basic(self, mock_get_rnd_x86_nop, mock_get_rnd_bytes):
-        self.generic_test([['FTPWinaXeExploit']], sha_ftp_basic)
+        self.checksum_test([['FTPWinaXeExploit']], sha_ftp_basic)
 
     @mock.patch('ID2TLib.Utility.get_rnd_bytes', side_effect=Lib.get_bytes)
     @mock.patch('ID2TLib.Utility.get_rnd_x86_nop', side_effect=Lib.get_x86_nop)
     @mock.patch('ID2TLib.Statistics.Statistics.get_mac_address')
     def test_ftp_mac(self, mock_mac_address, mock_get_rnd_x86_nop, mock_get_rnd_bytes):
         mock_mac_address.return_value = Lib.test_pcap_empty
-        self.generic_test([['FTPWinaXeExploit']], sha_ftp_mac)
+        self.checksum_test([['FTPWinaXeExploit']], sha_ftp_mac)
 
     @mock.patch('ID2TLib.Utility.get_rnd_bytes', side_effect=Lib.get_bytes)
     @mock.patch('ID2TLib.Utility.get_rnd_x86_nop', side_effect=Lib.get_x86_nop)
     def test_ftp_random_ip_src(self, mock_get_rnd_x86_nop, mock_get_rnd_bytes):
-        self.generic_test([['FTPWinaXeExploit', 'ip.src.shuffle=1']], sha_ftp_random_ip_src)
+        self.checksum_test([['FTPWinaXeExploit', 'ip.src.shuffle=1']], sha_ftp_random_ip_src)
 
     @mock.patch('ID2TLib.Utility.get_rnd_bytes', side_effect=Lib.get_bytes)
     @mock.patch('ID2TLib.Utility.get_rnd_x86_nop', side_effect=Lib.get_x86_nop)
     def test_ftp_not_empty_custom_payload_empty_file(self, mock_get_rnd_x86_nop, mock_get_rnd_bytes):
-        self.generic_test([['FTPWinaXeExploit', 'custom.payload=1']], sha_not_empty_custom_payload_empty_file)
+        self.checksum_test([['FTPWinaXeExploit', 'custom.payload=1']], sha_not_empty_custom_payload_empty_file)
 
     @mock.patch('ID2TLib.Utility.get_rnd_bytes', side_effect=Lib.get_bytes)
     @mock.patch('ID2TLib.Utility.get_rnd_x86_nop', side_effect=Lib.get_x86_nop)
@@ -49,14 +43,10 @@ class UnitTestFTPWinaXeExploit(GenericTest.GenericTest):
     @mock.patch('ID2TLib.Utility.get_bytes_from_file', return_value=b'AAAAA')
     def test_ftp_empty_custom_payload_not_empty_file(self, mock_bytes_from_file, mock_payload_len, mock_get_rnd_x86_nop,
                                                      mock_get_rnd_bytes):
-        self.generic_test([['FTPWinaXeExploit', 'custom.payload.file=1']], sha_empty_custom_payload_not_empty_file)
+        self.checksum_test([['FTPWinaXeExploit', 'custom.payload.file=1']], sha_empty_custom_payload_not_empty_file)
 
     @mock.patch('ID2TLib.Utility.get_rnd_bytes', side_effect=Lib.get_bytes)
     @mock.patch('ID2TLib.Utility.get_rnd_x86_nop', side_effect=Lib.get_x86_nop)
     @mock.patch('Attack.BaseAttack.BaseAttack.is_valid_ip_address', return_values=[False, True])
     def test_ftp_invalid_ip(self, mock_valid_ip_check, mock_get_rnd_x86_nop, mock_get_rnd_bytes):
-        self.generic_test([['FTPWinaXeExploit']], sha_valid_ip)
-
-
-if __name__ == '__main__':
-    unittest.main()
+        self.checksum_test([['FTPWinaXeExploit']], sha_valid_ip)

+ 4 - 16
code/Test/test_Joomla.py

@@ -1,23 +1,11 @@
-import unittest
-
-import Test.GenericTest as GenericTest
+import Test.ID2TAttackTest as Test
 
 sha_default = 'a45bd543ae7416cdc5fd76c886f48990b43075753931683407686aac2cfbc111'
 
-"""
-CURRENT COVERAGE
-Name                             Stmts   Miss  Cover   Missing (lines)
----------------------------------------------------------------------------
-Attack/JoomlaRegPrivExploit.py     127      4    97%   62, 71, 116, 123
-"""
-# TODO: get 100% coverage
+# TODO: improve coverage
 
 
-class UnitTestJoomla(GenericTest.GenericTest):
+class UnitTestJoomla(Test.ID2TAttackTest):
 
     def test_joomla_default(self):
-        self.generic_test([['JoomlaRegPrivExploit']], sha_default)
-
-
-if __name__ == '__main__':
-    unittest.main()
+        self.checksum_test([['JoomlaRegPrivExploit']], sha_default)

+ 10 - 22
code/Test/test_PortscanAttack.py

@@ -1,8 +1,6 @@
-import unittest
 import unittest.mock as mock
 
-import ID2TLib.TestLibrary as Lib
-from Test.GenericTest import GenericTest
+import Test.ID2TAttackTest as Test
 
 sha_portscan_default = '6af539fb9f9a28f84a5c337a07dbdc1a11885c5c6de8f9a682bd74b89edc5130'
 sha_portscan_reverse_ports = '1c03342b7b94fdd1c9903d07237bc5239ebb7bd77a3dd137c9c378fa216c5382'
@@ -13,41 +11,31 @@ sha_portscan_ttl_value_zero = 'ff8cf15d8e59856e0c6e43d81fa40180ebf2127042f376217
 sha_portscan_win_value_zero = 'b2fcbf72190ac3bf12192d0d7ee8c09ef87adb0d94a2610615ca76d8b577bbfb'
 sha_portscan_ip_src_random = 'c3939f30a40fa6e2164cc91dc4a7e823ca409492d44508e3edfc9d24748af0e5'
 
-"""
-CURRENT COVERAGE
-Name                             Stmts   Miss  Cover   Missing (lines)
----------------------------------------------------------------------------
-Attack/PortscanAttack.py           146      6    96%   73, 108-109, 158, 211, 238
-"""
-# TODO: get 100% coverage
+# TODO: improve coverage
 
 
-class UnitTestPortscanAttack(GenericTest):
+class UnitTestPortscanAttack(Test.ID2TAttackTest):
 
     def test_portscan_default(self):
-        self.generic_test([['PortscanAttack']], sha_portscan_default)
+        self.checksum_test([['PortscanAttack']], sha_portscan_default)
 
     def test_portscan_reverse_ports(self):
-        self.generic_test([['PortscanAttack', 'port.dst.order-desc=1']], sha_portscan_reverse_ports)
+        self.checksum_test([['PortscanAttack', 'port.dst.order-desc=1']], sha_portscan_reverse_ports)
 
     def test_portscan_shuffle_dst_ports(self):
-        self.generic_test([['PortscanAttack', 'port.dst.shuffle=1']], sha_portscan_shuffle_dst_ports)
+        self.checksum_test([['PortscanAttack', 'port.dst.shuffle=1']], sha_portscan_shuffle_dst_ports)
 
     def test_portscan_shuffle_src_ports(self):
-        self.generic_test([['PortscanAttack', 'port.src.shuffle=1']], sha_portscan_shuffle_src_ports)
+        self.checksum_test([['PortscanAttack', 'port.src.shuffle=1']], sha_portscan_shuffle_src_ports)
 
     @mock.patch('ID2TLib.Statistics.Statistics.get_mss_distribution', return_value='')
     def test_portscan_mss_length_zero(self, mock_mss_dis):
-        self.generic_test([['PortscanAttack']], sha_portscan_mss_value_zero)
+        self.checksum_test([['PortscanAttack']], sha_portscan_mss_value_zero)
 
     @mock.patch('ID2TLib.Statistics.Statistics.get_ttl_distribution', return_value='')
     def test_portscan_ttl_length_zero(self, mock_ttl_dis):
-        self.generic_test([['PortscanAttack']], sha_portscan_ttl_value_zero)
+        self.checksum_test([['PortscanAttack']], sha_portscan_ttl_value_zero)
 
     @mock.patch('ID2TLib.Statistics.Statistics.get_win_distribution', return_value='')
     def test_portscan_win_length_zero(self, mock_win_dis):
-        self.generic_test([['PortscanAttack']], sha_portscan_win_value_zero)
-
-
-if __name__ == '__main__':
-    unittest.main()
+        self.checksum_test([['PortscanAttack']], sha_portscan_win_value_zero)

+ 7 - 11
code/Test/test_Queries.py

@@ -1,15 +1,15 @@
+import random
 import unittest
 
-import definitions as defs
-import random as rand
 import ID2TLib.Controller as Ctrl
+import ID2TLib.TestLibrary as Test
 
-pcap = defs.ROOT_DIR + "/../resources/test/reference_1998.pcap"
+# TODO: improve coverage
 
-controller = Ctrl.Controller(pcap_file_path=pcap, do_extra_tests=False)
+controller = Ctrl.Controller(pcap_file_path=Test.test_pcap, do_extra_tests=False)
 controller.load_pcap_statistics(flag_write_file=False, flag_recalculate_stats=True, flag_print_statistics=False)
 
-file_information = [('Pcap file', defs.ROOT_DIR + '/../resources/test/reference_1998.pcap'),
+file_information = [('Pcap file', Test.test_pcap),
                     ('Packets', 1998, 'packets'), ('Capture length', '25.4294414520264', 'seconds'),
                     ('Capture start', '1970-01-01 01:01:45.647675'), ('Capture end', '1970-01-01 01:08:10.102034')]
 
@@ -100,11 +100,11 @@ class TestQueries(unittest.TestCase):
         self.assertEqual(controller.statistics.get_ip_addresses(), ip_addresses)
 
     def test_get_random_ip_address(self):
-        rand.seed(5)
+        random.seed(5)
         self.assertEqual(controller.statistics.get_random_ip_address(), '72.247.178.113')
 
     def test_get_random_ip_address_count_2(self):
-        rand.seed(5)
+        random.seed(5)
         self.assertEqual(controller.statistics.get_random_ip_address(2), ['72.247.178.113', '23.51.123.27'])
 
     def test_get_mac_address_1(self):
@@ -226,7 +226,3 @@ class TestQueries(unittest.TestCase):
 
     def test_all_protocolname(self):
         self.assertEqual(controller.statistics.process_db_query('all(protocolname)'), ['IPv4', 'TCP', 'UDP'])
-
-
-if __name__ == '__main__':
-    unittest.main()

+ 0 - 2
code/Test/test_SMBLib.py

@@ -56,5 +56,3 @@ class TestSMBLib(unittest.TestCase):
     def test_invalid_smb_version(self):
         with self.assertRaises(SystemExit):
             SMBLib.invalid_smb_version("abc")
-
-

+ 8 - 21
code/Test/test_SMBLoris.py

@@ -1,43 +1,30 @@
-import unittest
-import unittest.mock as mock
-
 import ID2TLib.TestLibrary as Lib
-from Test.GenericTest import GenericTest
+import Test.ID2TAttackTest as Test
 
 sha_default = 'cbfb154a80546ebcf0a0d5128bcc42e4d69228c1d97ea4dda49ba156703b78c2'
 sha_one_attacker = 'a316ba1a667318ef4b8d1bf5ffee3f58dfcd0221b0cc3ab62dd967379217eb27'
 sha_sixteen_attackers = '08b17b360ee9be1657e7c437e5aef354dac374ceca3b4ee437c45c0d9d03a2ef'
 sha_ips_in_pcap = 'f299e4139780869d9f02c25ba00f1cad483a4f215d6aef4079b93f7f7e1de22a'
 
-"""
-CURRENT COVERAGE
-Name                             Stmts   Miss  Cover   Missing (lines)
----------------------------------------------------------------------------
-Attack/SMBLorisAttack.py           128      4    97%   67, 72, 149, 182
-"""
-# TODO: get 100% coverage
+# TODO: improve coverage
 
 
-class UnitTestSMBLoris(GenericTest):
+class UnitTestSMBLoris(Test.ID2TAttackTest):
 
     def test_smbloris_default(self):
-        self.generic_test([['SMBLorisAttack']], sha_default)
+        self.checksum_test([['SMBLorisAttack']], sha_default)
 
     def test_smbloris_one_attacker(self):
-        self.generic_test([['SMBLorisAttack', 'ip.src=192.168.1.240', 'ip.dst=192.168.1.210']], sha_one_attacker)
+        self.checksum_test([['SMBLorisAttack', 'ip.src=192.168.1.240', 'ip.dst=192.168.1.210']], sha_one_attacker)
 
     def test_smbloris_ips_in_pcap(self):
         ip_src = 'ip.src='+Lib.test_pcap_ips[0]
         ip_dst = 'ip.dst='+Lib.test_pcap_ips[1]
-        self.generic_test([['SMBLorisAttack', ip_src, ip_dst]], sha_ips_in_pcap)
+        self.checksum_test([['SMBLorisAttack', ip_src, ip_dst]], sha_ips_in_pcap)
 
     def test_smbloris_sixteen_attackers(self):
-        self.generic_test([['SMBLorisAttack', 'ip.dst=192.168.1.210', 'attackers.count=16']], sha_sixteen_attackers)
+        self.checksum_test([['SMBLorisAttack', 'ip.dst=192.168.1.210', 'attackers.count=16']], sha_sixteen_attackers)
 
     def test_smbloris_same_ip_src_dst(self):
         with self.assertRaises(SystemExit):
-            self.generic_test([['SMBLorisAttack', 'ip.src=192.168.1.240', 'ip.dst=192.168.1.240']], sha_default)
-
-
-if __name__ == '__main__':
-    unittest.main()
+            self.checksum_test([['SMBLorisAttack', 'ip.src=192.168.1.240', 'ip.dst=192.168.1.240']], sha_default)

+ 17 - 28
code/Test/test_SMBScan.py

@@ -1,7 +1,6 @@
-import unittest
 import unittest.mock as mock
 
-import Test.GenericTest as GenericTest
+import Test.ID2TAttackTest as Test
 
 sha_default = '213e194da7bc952cc093868c7450901b0fb93c7255d694eb37ea0b9b48bca65d'
 sha_one_victim_linux = '4928d421caaec8f2c4e5c5bb835b5521b705478779cbc8f343b77143a5a66995'
@@ -12,70 +11,60 @@ sha_dest_mac_only = '0814dadb666e0056ef5b3a572a4971f333376b61e602acb84cb99c85184
 sha_ip_src_shuffle = '6c0c9ccbedb631e4965ec36932276a1bd73b8a4aca5a5c46f01fd0a2800a064f'
 sha_smb2 = '8755a901295a90362d8041ecf1243a31fff582f5fe64555205625263c253476e'
 
-"""
-CURRENT COVERAGE
-Name                             Stmts   Miss  Cover   Missing (lines)
----------------------------------------------------------------------------
-Attack/SMBScanAttack.py            239      9    96%   65, 73-74, 82, 193, 210-211, 284-285
-"""
-# TODO: get 100% coverage
+# TODO: improve coverage
 
 
-class UnitTestSMBScan(GenericTest.GenericTest):
+class UnitTestSMBScan(Test.ID2TAttackTest):
 
     def test_smbscan_default(self):
         with mock.patch("ID2TLib.Utility.get_rnd_os", return_value="win7"):
-            self.generic_test([['SMBScanAttack']], sha_default)
+            self.checksum_test([['SMBScanAttack']], sha_default)
 
     def test_smbscan_one_victim_linux(self):
         with mock.patch("ID2TLib.Utility.get_rnd_os", return_value="linux"):
-            self.generic_test([['SMBScanAttack', 'ip.src=192.168.178.1', 'ip.dst=192.168.178.10']],
-                              sha_one_victim_linux)
+            self.checksum_test([['SMBScanAttack', 'ip.src=192.168.178.1', 'ip.dst=192.168.178.10']],
+                               sha_one_victim_linux)
 
     def test_smbscan_victim_range_winxp_hosting(self):
         with mock.patch("ID2TLib.Utility.get_rnd_os", return_value="winxp"):
-            self.generic_test([['SMBScanAttack', 'ip.src=192.168.178.1', 'ip.dst=192.168.178.5',
+            self.checksum_test([['SMBScanAttack', 'ip.src=192.168.178.1', 'ip.dst=192.168.178.5',
                                 'ip.dst.end=192.168.178.10', 'hosting.ip=192.168.178.5']],
-                              sha_victim_range_winxp_hosting)
+                               sha_victim_range_winxp_hosting)
 
     def test_smbscan_multiple_victims_macos(self):
         with mock.patch("ID2TLib.Utility.get_rnd_os", return_value="macos"):
-            self.generic_test([['SMBScanAttack', 'ip.src=192.168.178.1',
+            self.checksum_test([['SMBScanAttack', 'ip.src=192.168.178.1',
                                 'ip.dst=192.168.178.10,192.168.178.15,192.168.178.20',
                                 'hosting.ip=192.168.178.15,192.168.178.20']], sha_multiple_victims_macos)
 
     def test_smbscan_invalid_smb_version(self):
         with self.assertRaises(SystemExit):
-            self.generic_test([['SMBScanAttack', 'protocol.version=42']], 'somehash')
+            self.checksum_test([['SMBScanAttack', 'protocol.version=42']], 'somehash')
 
     def test_smbscan_invalid_smb_platform(self):
         with self.assertRaises(SystemExit):
-            self.generic_test([['SMBScanAttack', 'hosting.version=1337']], 'somehash')
+            self.checksum_test([['SMBScanAttack', 'hosting.version=1337']], 'somehash')
 
     def test_smbscan_port_shuffle(self):
         with mock.patch("ID2TLib.Utility.get_rnd_os", return_value="win7"):
-            self.generic_test([['SMBScanAttack', 'ip.src=192.168.178.1', 'ip.dst=192.168.178.5',
+            self.checksum_test([['SMBScanAttack', 'ip.src=192.168.178.1', 'ip.dst=192.168.178.5',
                                 'ip.dst.end=192.168.178.10', 'hosting.ip=192.168.178.5', 'port.src.shuffle=false']],
-                              sha_port_shuffle)
+                               sha_port_shuffle)
 
     def test_smbscan_dest_mac_only(self):
         with mock.patch("ID2TLib.Utility.get_rnd_os", return_value="win7"):
-            self.generic_test([['SMBScanAttack', 'ip.src=192.168.178.1',
+            self.checksum_test([['SMBScanAttack', 'ip.src=192.168.178.1',
                                 'mac.dst=00:0C:29:9C:70:64']], sha_dest_mac_only)
 
     def test_smbscan_src_ip_shuffle(self):
         with mock.patch("ID2TLib.Utility.get_rnd_os", return_value="win7"):
-            self.generic_test([['SMBScanAttack', 'ip.src=192.168.178.1', 'ip.dst=192.168.178.5',
+            self.checksum_test([['SMBScanAttack', 'ip.src=192.168.178.1', 'ip.dst=192.168.178.5',
                                 'ip.dst.end=192.168.178.10', 'hosting.ip=192.168.178.5', 'ip.src.shuffle=True']],
-                              sha_ip_src_shuffle)
+                               sha_ip_src_shuffle)
 
     def test_smbscan_smb2(self):
 
         with mock.patch("ID2TLib.Utility.get_rnd_os", return_value="linux"):
-            self.generic_test([['SMBScanAttack', 'ip.src=192.168.178.1', 'ip.dst=192.168.178.5',
+            self.checksum_test([['SMBScanAttack', 'ip.src=192.168.178.1', 'ip.dst=192.168.178.5',
                                 'ip.dst.end=192.168.178.10', 'hosting.ip=192.168.178.5', 'protocol.version=2.1',
                                 'hosting.version=2.1']], sha_smb2)
-
-
-if __name__ == '__main__':
-    unittest.main()

+ 4 - 16
code/Test/test_SQLi.py

@@ -1,23 +1,11 @@
-import unittest
-
-import Test.GenericTest as GenericTest
+import Test.ID2TAttackTest as Test
 
 sha_default = 'a130ecdaf5fd8c09ef8418d2dbe7bd68c54e922553eb9fa703df016115393a46'
 
-"""
-CURRENT COVERAGE
-Name                             Stmts   Miss  Cover   Missing (lines)
----------------------------------------------------------------------------
-Attack/SQLiAttack.py               159      5    97%   62, 71, 113, 120, 245
-"""
-# TODO: get 100% coverage
+# TODO: improve coverage
 
 
-class UnitTestSQLi(GenericTest.GenericTest):
+class UnitTestSQLi(Test.ID2TAttackTest):
 
     def test_sqli_default(self):
-        self.generic_test([['SQLiAttack']], sha_default)
-
-
-if __name__ == '__main__':
-    unittest.main()
+        self.checksum_test([['SQLiAttack']], sha_default)

+ 13 - 20
code/Test/test_Utility.py

@@ -1,8 +1,10 @@
 import unittest
 
-import ID2TLib.TestLibrary as TestLibrary
+import ID2TLib.TestLibrary as Lib
 import ID2TLib.Utility as Utility
 
+# TODO: improve coverage
+
 
 class TestUtility(unittest.TestCase):
 
@@ -38,6 +40,7 @@ class TestUtility(unittest.TestCase):
         results = [("A", 1), ("B", 2)]
         self.assertIn(Utility.get_nth_random_element(letters, numbers), results)
 
+    # TODO: ???
     #def test_get_nth_random_element_single_list(self):
         #letters = ["A", "B", "C"]
         #self.assertIn(Utility.get_nth_random_element(letters), letters)
@@ -108,7 +111,7 @@ class TestUtility(unittest.TestCase):
     def test_generate_source_port_from_platform_newwinmac_maxport(self):
         self.assertTrue(49152 <= Utility.generate_source_port_from_platform("win7", 65535) <= 65535)
 
-    # Test get_filetime_format????
+    # TODO: get_filetime_format Test
 
     def test_get_rnd_boot_time_invalid(self):
         with self.assertRaises(SystemExit):
@@ -173,26 +176,26 @@ class TestUtility(unittest.TestCase):
 
     def test_get_bytes_from_file_invalid_path(self):
         with self.assertRaises(SystemExit):
-            Utility.get_bytes_from_file(TestLibrary.test_resource_dir+"/NonExistingFile.txt")
+            Utility.get_bytes_from_file(Lib.test_resource_dir + "/NonExistingFile.txt")
 
     def test_get_bytes_from_file_invalid_header(self):
         with self.assertRaises(SystemExit):
-            Utility.get_bytes_from_file(TestLibrary.test_resource_dir+"/InvalidHeader.txt")
+            Utility.get_bytes_from_file(Lib.test_resource_dir + "/InvalidHeader.txt")
 
     def test_get_bytes_from_file_invalid_hexfile(self):
         with self.assertRaises(SystemExit):
-            Utility.get_bytes_from_file(TestLibrary.test_resource_dir+"/InvalidHexFile.txt")
+            Utility.get_bytes_from_file(Lib.test_resource_dir + "/InvalidHexFile.txt")
 
     def test_get_bytes_from_file_invalid_strfile(self):
         with self.assertRaises(SystemExit):
-            Utility.get_bytes_from_file(TestLibrary.test_resource_dir+"/InvalidStringFile.txt")
+            Utility.get_bytes_from_file(Lib.test_resource_dir + "/InvalidStringFile.txt")
 
     def test_get_bytes_from_file_str(self):
-        result = Utility.get_bytes_from_file(TestLibrary.test_resource_dir+"/StringTestFile.txt")
+        result = Utility.get_bytes_from_file(Lib.test_resource_dir + "/StringTestFile.txt")
         self.assertEqual(result, b'This is a string-test')
 
     def test_get_bytes_from_file_hex(self):
-        result = Utility.get_bytes_from_file(TestLibrary.test_resource_dir+"/HexTestFile.txt")
+        result = Utility.get_bytes_from_file(Lib.test_resource_dir + "/HexTestFile.txt")
         self.assertEqual(result, b'\xab\xcd\xef\xff\x10\xff\xaa\xab')
 
     def test_handle_most_used_outputs_empty(self):
@@ -214,18 +217,6 @@ class TestUtility(unittest.TestCase):
         test_input = [2, 4, 0, 1, 3]
         self.assertEqual(Utility.handle_most_used_outputs(test_input), 0)
 
-    def test_rreplace_not_in_string(self):
-        self.assertEqual(Utility.rreplace("testateststring", "nonexisting", "correct", 1), "testateststring")
-
-    def test_rreplace_zero(self):
-        self.assertEqual(Utility.rreplace("testateststring", "test", "correct", 0), "testateststring")
-
-    def test_rreplace_one(self):
-        self.assertEqual(Utility.rreplace("testateststring", "test", "correct", 1), "testacorrectstring")
-
-    def test_rreplace_two(self):
-        self.assertEqual(Utility.rreplace("testateststring", "test", "correct", 2), "correctacorrectstring")
-
     def test_check_payload_len_exceeded(self):
         with self.assertRaises(SystemExit):
             Utility.check_payload_len(10, 5)
@@ -235,3 +226,5 @@ class TestUtility(unittest.TestCase):
             Utility.check_payload_len(5, 10)
         except SystemExit:
             self.fail()
+
+    # TODO: get_attacker_config Tests

+ 0 - 3
code/definitions.py

@@ -1,3 +0,0 @@
-import os
-
-ROOT_DIR = os.path.dirname(os.path.abspath(__file__))

BIN
resources/test/test.pcap