Pārlūkot izejas kodu

Merge branch 'sality_unittests' of stefan.schmidt/ID2T-toolkit into master

Carlos Garcia 6 gadi atpakaļ
vecāks
revīzija
95004f7fed

+ 9 - 7
code/Attack/SalityBotnet.py

@@ -1,20 +1,20 @@
 import logging
 
-from random import randint
+from random import randint, choice
 from scapy.utils import RawPcapReader
 from scapy.layers.inet import Ether
 
 from Attack import BaseAttack
 from Attack.AttackParameters import Parameter as Param
 from Attack.AttackParameters import ParameterTypes
-from ID2TLib.Utility import update_timestamp, get_interval_pps, handle_most_used_outputs
+import ID2TLib.Utility as Util
 
 logging.getLogger("scapy.runtime").setLevel(logging.ERROR)
 # noinspection PyPep8
 
 
 class SalityBotnet(BaseAttack.BaseAttack):
-    template_attack_pcap_path = "resources/sality_botnet.pcap"
+    template_attack_pcap_path = Util.RESOURCE_DIR + "/../resources/sality_botnet.pcap"
 
     def __init__(self):
         """
@@ -72,7 +72,7 @@ class SalityBotnet(BaseAttack.BaseAttack):
 
         # Pick a DNS server from the background traffic
         ip_dns_server = self.statistics.process_db_query("SELECT ipAddress FROM ip_protocols WHERE protocolName='DNS' AND protocolCount=(SELECT MAX(protocolCount) FROM ip_protocols WHERE protocolName='DNS');")
-        ip_dns_server = handle_most_used_outputs(ip_dns_server)
+        ip_dns_server = Util.handle_most_used_outputs(ip_dns_server)
         if not ip_dns_server or ip_source == ip_dns_server:
             ip_dns_server = self.statistics.get_random_ip_address()
         mac_dns_server = self.statistics.get_mac_address(ip_dns_server)
@@ -115,15 +115,17 @@ class SalityBotnet(BaseAttack.BaseAttack):
             if ip_pkt.getfieldval("ttl") not in ttl_map:
                 source_ttl = self.statistics.get_most_used_ttl(ip_pkt.getfieldval("src"))
                 if not source_ttl:
-                    source_ttl = self.statistics.process_db_query("SELECT ttlValue FROM ip_ttl ORDER BY RANDOM() LIMIT 1;")
+                    source_ttl = self.statistics.process_db_query("SELECT ttlValue FROM ip_ttl;")
+                    if isinstance(source_ttl, list):
+                        source_ttl = choice(source_ttl)
                 ttl_map[ip_pkt.getfieldval("ttl")] = source_ttl
             ip_pkt.setfieldval("ttl", ttl_map[ip_pkt.getfieldval("ttl")])
 
             new_pkt = (eth_frame / ip_pkt)
             new_pkt.time = timestamp_next_pkt
 
-            pps = max(get_interval_pps(complement_interval_pps, timestamp_next_pkt), 10)
-            timestamp_next_pkt = update_timestamp(timestamp_next_pkt, pps)
+            pps = max(Util.get_interval_pps(complement_interval_pps, timestamp_next_pkt), 10)
+            timestamp_next_pkt = Util.update_timestamp(timestamp_next_pkt, pps)
 
             packets.append(new_pkt)
 

+ 18 - 0
code/Test/test_SalityBotnet.py

@@ -0,0 +1,18 @@
+import unittest.mock as mock
+
+import Test.ID2TAttackTest as Test
+import ID2TLib.TestLibrary as Lib
+
+sha_botnet_basic = '72c537fba918154dbe937694d8da87260bebb05c0ad20802051fa80107c1efbe'
+sha_botnet_most_used_ip_in_list = '72c537fba918154dbe937694d8da87260bebb05c0ad20802051fa80107c1efbe'
+
+
+class UnitTestSalityBotnet(Test.ID2TAttackTest):
+
+    def test_botnet_basic(self):
+        self.checksum_test([['SalityBotnet']], sha_botnet_basic)
+
+    @mock.patch('ID2TLib.Statistics.Statistics.get_most_used_ip_address')
+    def test_botnet_most_used_ip(self, mock_most_used_ip_address):
+        mock_most_used_ip_address.return_value = Lib.test_pcap_ips[0]
+        self.checksum_test([['SalityBotnet']], sha_botnet_most_used_ip_in_list)

BIN
resources/sality_botnet.pcap