Browse Source

refactor imports in ID2TLib/Utility.py

refactor whitspaces in ID2TLib/Utility.py
deep refactoring ID2TLib/Utility.py
Jens Keim 6 years ago
parent
commit
aeb34bb03f
2 changed files with 55 additions and 70 deletions
  1. 55 60
      code/ID2TLib/Utility.py
  2. 0 10
      code/Test/test_Utility.py

+ 55 - 60
code/ID2TLib/Utility.py

@@ -1,22 +1,21 @@
+import calendar as cal
+import datetime as dt
 import ipaddress
 import os
+import random as rnd
 
-from random import randint, uniform, choice
-from os import urandom
-from datetime import datetime
-from calendar import timegm
 import matplotlib
+import scipy.stats as stats
+
 matplotlib.use('Agg', force=True)
-from lea import Lea
-from scipy.stats import gamma
-from scapy.layers.inet import RandShort
+import lea
+import scapy.layers.inet as inet
 
 CODE_DIR = os.path.dirname(os.path.abspath(__file__)) + "/../"
 ROOT_DIR = CODE_DIR + "../"
 RESOURCE_DIR = ROOT_DIR + "resources/"
 TEST_DIR = RESOURCE_DIR + "test/"
 
-
 platforms = {"win7", "win10", "winxp", "win8.1", "macos", "linux", "win8", "winvista", "winnt", "win2000"}
 platform_probability = {"win7": 48.43, "win10": 27.99, "winxp": 6.07, "win8.1": 6.07, "macos": 5.94, "linux": 3.38,
                         "win8": 1.35, "winvista": 0.46, "winnt": 0.31}
@@ -42,12 +41,12 @@ def update_timestamp(timestamp, pps, delay=0):
     if delay == 0:
         # Calculate request timestamp
         # To imitate the bursty behavior of traffic
-        randomdelay = Lea.fromValFreqsDict({1 / pps: 70, 2 / pps: 20, 5 / pps: 7, 10 / pps: 3})
-        return timestamp + uniform(1 / pps, randomdelay.random())
+        randomdelay = lea.Lea.fromValFreqsDict({1 / pps: 70, 2 / pps: 20, 5 / pps: 7, 10 / pps: 3})
+        return timestamp + rnd.uniform(1 / pps, randomdelay.random())
     else:
         # Calculate reply timestamp
-        randomdelay = Lea.fromValFreqsDict({2 * delay: 70, 3 * delay: 20, 5 * delay: 7, 10 * delay: 3})
-        return timestamp + uniform(1 / pps + delay, 1 / pps + randomdelay.random())
+        randomdelay = lea.Lea.fromValFreqsDict({2 * delay: 70, 3 * delay: 20, 5 * delay: 7, 10 * delay: 3})
+        return timestamp + rnd.uniform(1 / pps + delay, 1 / pps + randomdelay.random())
 
 
 def get_interval_pps(complement_interval_pps, timestamp):
@@ -60,9 +59,9 @@ def get_interval_pps(complement_interval_pps, timestamp):
     :return: the corresponding packet rate (pps) .
     """
     for row in complement_interval_pps:
-        if timestamp<=row[0]:
+        if timestamp <= row[0]:
             return row[1]
-    return complement_interval_pps[-1][1] # in case the timstamp > capture max timestamp
+    return complement_interval_pps[-1][1]  # in case the timstamp > capture max timestamp
 
 
 def get_nth_random_element(*element_list):
@@ -70,37 +69,31 @@ def get_nth_random_element(*element_list):
     Returns the n-th element of every list from an arbitrary number of given lists.
     For example, list1 contains IP addresses, list 2 contains MAC addresses. Use of this function ensures that
     the n-th IP address uses always the n-th MAC address.
+
     :param element_list: An arbitrary number of lists.
     :return: A tuple of the n-th element of every list.
     """
     if len(element_list) <= 0:
         return None
     elif len(element_list) == 1 and len(element_list[0]) > 0:
-        return choice(element_list[0])
+        return rnd.choice(element_list[0])
     else:
         range_max = min([len(x) for x in element_list])
         if range_max > 0:
             range_max -= 1
-            n = randint(0, range_max)
+            n = rnd.randint(0, range_max)
             return tuple(x[n] for x in element_list)
         else:
             return None
 
 
-def index_increment(number: int, max: int):
-            if number + 1 < max:
-                return number + 1
-            else:
-                return 0
-
-
 def get_rnd_os():
     """
     Chooses random platform over an operating system probability distribution
 
     :return: random platform as string
     """
-    os_dist = Lea.fromValFreqsDict(platform_probability)
+    os_dist = lea.Lea.fromValFreqsDict(platform_probability)
     return os_dist.random()
 
 
@@ -131,37 +124,37 @@ def get_ip_range(start_ip: str, end_ip: str):
     if start < end:
         while start <= end:
             ips.append(start.exploded)
-            start = start+1
+            start = start + 1
     elif start > end:
         while start >= end:
             ips.append(start.exploded)
-            start = start-1
+            start = start - 1
     else:
         ips.append(start_ip)
     return ips
 
 
-def generate_source_port_from_platform(platform: str, previousPort=0):
+def generate_source_port_from_platform(platform: str, previous_port=0):
     """
     Generates the next source port according to the TCP-port-selection strategy of the given platform
 
     :param platform: the platform for which to generate source ports
-    :param previousPort: the previously used/generated source port. Must be 0 if no port was generated before
+    :param previous_port: the previously used/generated source port. Must be 0 if no port was generated before
     :return: the next source port for the given platform
     """
     check_platform(platform)
     if platform in {"winnt", "winxp", "win2000"}:
-        if (previousPort == 0) or (previousPort + 1 > 5000):
-            return randint(1024, 5000)
+        if (previous_port == 0) or (previous_port + 1 > 5000):
+            return rnd.randint(1024, 5000)
         else:
-            return previousPort + 1
+            return previous_port + 1
     elif platform == "linux":
-        return randint(32768, 61000)
+        return rnd.randint(32768, 61000)
     else:
-        if (previousPort == 0) or (previousPort + 1 > 65535):
-            return randint(49152, 65535)
+        if (previous_port == 0) or (previous_port + 1 > 65535):
+            return rnd.randint(49152, 65535)
         else:
-            return previousPort + 1
+            return previous_port + 1
 
 
 def get_filetime_format(timestamp):
@@ -171,10 +164,10 @@ def get_filetime_format(timestamp):
     :param timestamp: a timestamp in seconds
     :return: MS FILETIME timestamp
     """
-    boot_datetime = datetime.fromtimestamp(timestamp)
+    boot_datetime = dt.datetime.fromtimestamp(timestamp)
     if boot_datetime.tzinfo is None or boot_datetime.tzinfo.utcoffset(boot_datetime) is None:
         boot_datetime = boot_datetime.replace(tzinfo=boot_datetime.tzname())
-    boot_filetime = 116444736000000000 + (timegm(boot_datetime.timetuple()) * 10000000)
+    boot_filetime = 116444736000000000 + (cal.timegm(boot_datetime.timetuple()) * 10000000)
     return boot_filetime + (boot_datetime.microsecond * 10)
 
 
@@ -188,15 +181,15 @@ def get_rnd_boot_time(timestamp, platform="winxp"):
     """
     check_platform(platform)
     if platform is "linux":
-        uptime_in_days = Lea.fromValFreqsDict({3: 50, 7: 25, 14: 12.5, 31: 6.25, 92: 3.125, 183: 1.5625,
-                                               365: 0.78125, 1461: 0.390625, 2922: 0.390625})
+        uptime_in_days = lea.Lea.fromValFreqsDict({3: 50, 7: 25, 14: 12.5, 31: 6.25, 92: 3.125, 183: 1.5625,
+                                                   365: 0.78125, 1461: 0.390625, 2922: 0.390625})
     elif platform is "macos":
-        uptime_in_days = Lea.fromValFreqsDict({7: 50, 14: 25, 31: 12.5, 92: 6.25, 183: 3.125, 365: 3.076171875,
-                                               1461: 0.048828125})
+        uptime_in_days = lea.Lea.fromValFreqsDict({7: 50, 14: 25, 31: 12.5, 92: 6.25, 183: 3.125, 365: 3.076171875,
+                                                   1461: 0.048828125})
     else:
-        uptime_in_days = Lea.fromValFreqsDict({3: 50, 7: 25, 14: 12.5, 31: 6.25, 92: 3.125, 183: 1.5625,
-                                               365: 0.78125, 1461: 0.78125})
-    timestamp -= randint(0, uptime_in_days.random()*86400)
+        uptime_in_days = lea.Lea.fromValFreqsDict({3: 50, 7: 25, 14: 12.5, 31: 6.25, 92: 3.125, 183: 1.5625,
+                                                   365: 0.78125, 1461: 0.78125})
+    timestamp -= rnd.randint(0, uptime_in_days.random() * 86400)
     return timestamp
 
 
@@ -216,10 +209,10 @@ def get_rnd_x86_nop(count=1, side_effect_free=False, char_filter=set()):
 
     if not isinstance(char_filter, set):
         char_filter = set(char_filter)
-    nops = list(nops-char_filter)
+    nops = list(nops - char_filter)
 
     for i in range(0, count):
-        result += nops[randint(0, len(nops) - 1)]
+        result += nops[rnd.randint(0, len(nops) - 1)]
     return result
 
 
@@ -235,9 +228,9 @@ def get_rnd_bytes(count=1, ignore=None):
         ignore = []
     result = b''
     for i in range(0, count):
-        char = urandom(1)
+        char = os.urandom(1)
         while char in ignore:
-            char = urandom(1)
+            char = os.urandom(1)
         result += char
     return result
 
@@ -245,6 +238,7 @@ def get_rnd_bytes(count=1, ignore=None):
 def check_payload_len(payload_len: int, limit: int):
     """
     Checks if the len of the payload exceeds a given limit
+
     :param payload_len: The length of the payload
     :param limit: The limit of the length of the payload which is allowed
     """
@@ -277,7 +271,7 @@ def get_bytes_from_file(filepath):
         content = file.read()
 
         if header == "hex":
-            content = content.replace(" ", "").replace("\n", "").replace("\\", "").replace("x", "").replace("\"", "")\
+            content = content.replace(" ", "").replace("\n", "").replace("\\", "").replace("x", "").replace("\"", "") \
                 .replace("'", "").replace("+", "").replace("\r", "")
             try:
                 result_bytes = bytes.fromhex(content)
@@ -321,39 +315,40 @@ def handle_most_used_outputs(most_used_x):
         return most_used_x
 
 
-def get_attacker_config(ip_source_list, ipAddress: str):
+def get_attacker_config(ip_source_list, ip_address: str):
     """
     Returns the attacker configuration depending on the IP address, this includes the port for the next
     attacking packet and the previously used (fixed) TTL value.
+
     :param ip_source_list: List of source IPs
-    :param ipAddress: The IP address of the attacker
+    :param ip_address: The IP address of the attacker
     :return: A tuple consisting of (port, ttlValue)
     """
     # Gamma distribution parameters derived from MAWI 13.8G dataset
     alpha, loc, beta = (2.3261710235, -0.188306914406, 44.4853123884)
-    gd = gamma.rvs(alpha, loc=loc, scale=beta, size=len(ip_source_list))
+    gd = stats.gamma.rvs(alpha, loc=loc, scale=beta, size=len(ip_source_list))
 
     # Determine port
-    port = attacker_port_mapping.get(ipAddress)
+    port = attacker_port_mapping.get(ip_address)
     if port is not None:  # use next port
-        next_port = attacker_port_mapping.get(ipAddress) + 1
+        next_port = attacker_port_mapping.get(ip_address) + 1
         if next_port > (2 ** 16 - 1):
             next_port = 1
     else:  # generate starting port
-        next_port = RandShort()
-    attacker_port_mapping[ipAddress] = next_port
+        next_port = inet.RandShort()
+    attacker_port_mapping[ip_address] = next_port
     # Determine TTL value
-    ttl = attacker_ttl_mapping.get(ipAddress)
+    ttl = attacker_ttl_mapping.get(ip_address)
     if ttl is None:  # determine TTL value
         is_invalid = True
-        pos = ip_source_list.index(ipAddress)
+        pos = ip_source_list.index(ip_address)
         pos_max = len(gd)
         while is_invalid:
             ttl = int(round(gd[pos]))
             if 0 < ttl < 256:  # validity check
                 is_invalid = False
             else:
-                pos = index_increment(pos, pos_max)
-        attacker_ttl_mapping[ipAddress] = ttl
+                pos = (pos + 1) % pos_max
+        attacker_ttl_mapping[ip_address] = ttl
     # return port and TTL
     return next_port, ttl

+ 0 - 10
code/Test/test_Utility.py

@@ -50,16 +50,6 @@ class TestUtility(unittest.TestCase):
     def test_get_nth_random_element_nothing(self):
         self.assertEqual(Utility.get_nth_random_element(), None)
 
-    def test_index_increment_not_max(self):
-        self.assertEqual(Utility.index_increment(5, 10), 6)
-
-    def test_index_increment_max(self):
-        self.assertEqual(Utility.index_increment(10, 10), 0)
-
-    # Correct?
-    def test_index_increment_max2(self):
-        self.assertEqual(Utility.index_increment(9, 10), 0)
-
     def test_get_rnd_os(self):
         self.assertIn(Utility.get_rnd_os(), Utility.platforms)