ソースを参照

Merge branch 'feature/ip-refactoring' into develop

Denis Waßmann 7 年 前
コミット
8e68fdc0fe
3 ファイル変更405 行追加66 行削除
  1. 17 66
      code/ID2TLib/IPGenerator.py
  2. 177 0
      code/ID2TLib/IPv4.py
  3. 211 0
      code/ID2TLib/PcapAddressOperations.py

+ 17 - 66
code/ID2TLib/IPGenerator.py

@@ -1,28 +1,7 @@
 import random
-import re
+from . import IPv4 as ip
 
 class IPGenerator:
-	# see wikipedia
-	PRIVATE_IP_SEGMENTS = [
-		"10.0.0.0/8",
-		"172.16.0.0/12",
-		"192.168.0.0/16"
-	]
-	
-	LOCALHOST_SEGMENT = "127.0.0.0/8"
-	
-	MULTICAST_SEGMENT = "224.0.0.0/4" # class D segment
-	RESERVED_SEGMENT  = "240.0.0.0/4" # class E segment
-	
-	ZERO_CONF_SEGMENT = "169.254.0.0/16" # link local segment
-	
-	# a number between 0 and 255, no leading zeros
-	_IP_NUMBER_REGEXP = r"(25[0-5]|2[0-4]\d|1?[1-9]?\d)"
-	# 4 numbers between 0 and 255, joined together with dots
-	IP_REGEXP = r"{0}\.{0}\.{0}\.{0}".format(_IP_NUMBER_REGEXP)
-	# an ip address with an optional cidr-suffix
-	CIDR_REGEXP = IP_REGEXP + r"(\/(3[0-2]|[12]?\d)|)?"
-	
 	def __init__(self, include_private_ips = False, include_localhost = False,
 			include_multicast = False, include_reserved = False,
 			include_link_local = False, blacklist = None):
@@ -30,68 +9,40 @@ class IPGenerator:
 		self.generated_ips = set()
 		
 		if not include_private_ips:
-			for segment in self.PRIVATE_IP_SEGMENTS:
+			for segment in ip.ReservedIPBlocks.PRIVATE_IP_SEGMENTS:
 				self.add_to_blacklist(segment)
 		if not include_localhost:
-			self.add_to_blacklist(self.LOCALHOST_SEGMENT)
+			self.add_to_blacklist(ip.ReservedIPBlocks.LOCALHOST_SEGMENT)
 		if not include_multicast:
-			self.add_to_blacklist(self.MULTICAST_SEGMENT)
+			self.add_to_blacklist(ip.ReservedIPBlocks.MULTICAST_SEGMENT)
 		if not include_reserved:
-			self.add_to_blacklist(self.RESERVED_SEGMENT)
+			self.add_to_blacklist(ip.ReservedIPBlocks.RESERVED_SEGMENT)
 		if not include_link_local:
-			self.add_to_blacklist(self.ZERO_CONF_SEGMENT)
+			self.add_to_blacklist(ip.ReservedIPBlocks.ZERO_CONF_SEGMENT)
 		if blacklist:
 			for segment in blacklist:
 				self.add_to_blacklist(segment)	
 	
-	def add_to_blacklist(self, ip_segment: str):
-		self.blacklist.append(self._parse_cidr(ip_segment))
+	def add_to_blacklist(self, ip_segment):
+		if isinstance(ip_segment, ip.IPAddressBlock):
+			self.blacklist.append(ip_segment)
+		else:
+			self.blacklist.append(ip.IPAddressBlock.parse(ip_segment))
 	
 	def random_ip(self):
 		while True:
-			ip = random.randrange(0, 1 << 32)
+			random_ip = ip.IPAddress.from_int(random.randrange(0, 1 << 32))
 			
-			if not self._is_in_blacklist(ip) and ip not in self.generated_ips:
-				self.generated_ips.add(ip)
-				return self._ip_to_str(ip)
+			if not self._is_in_blacklist(random_ip) and random_ip not in self.generated_ips:
+				self.generated_ips.add(random_ip)
+				return str(random_ip)
 	
 	def clear(self, clear_blacklist = True, clear_generated_ips = True):
 		if clear_blacklist: self.blacklist.clear()
 		if clear_generated_ips: self.generated_ips.clear()
 	
-	# parses a str in cidr-notation and returns a tuple with 2 elements
-	# the first element is the ip-address as int, the second one is the cidr-suffix as int
-	def _parse_cidr(self, ip_segment: str):
-		match = re.match("^" + IPGenerator.CIDR_REGEXP + "$", ip_segment)
-		if not match:
-			raise ValueError("%s is no ip in cidr-notation" % ip_segment)
-		
-		ip = [int(match.group(i)) for i in range(1, 5)]
-		suffix = 32 if not match.group(6) else int(match.group(6))
-		
-		numeric_ip = (ip[0] << 24) | (ip[1] << 16) | (ip[2] << 8) | ip[3]
-		
-		return (numeric_ip & self._netmask(suffix), suffix)
-	
-	def _is_in_blacklist(self, ip: int):
-		for black_ip, cidr in self.blacklist:
-			if (ip & self._netmask(cidr)) == black_ip:
-				return True
-		
-		return False
-	
-	def _netmask(self, suffix: int):
-		ones = lambda x: (1 << x) - 1
-		
-		return ones(32) ^ ones(32 - suffix)
-	
-	def _ip_to_str(self, ip: int):
-		return "%i.%i.%i.%i" % (
-			ip >> 24,
-			(ip >> 16) & 255,
-			(ip >> 8) & 255,
-			ip & 255
-		)
+	def _is_in_blacklist(self, ip: ip.IPAddress):
+		return any(ip in block for block in self.blacklist)
 
 class MappingIPGenerator(IPGenerator):
 	def __init__(self, *args, **kwargs):

+ 177 - 0
code/ID2TLib/IPv4.py

@@ -0,0 +1,177 @@
+import re
+
+class IPAddress:
+	# a number between 0 and 255, no leading zeros
+	_IP_NUMBER_REGEXP = r"(25[0-5]|2[0-4]\d|1?[1-9]?\d)"
+	# 4 numbers between 0 and 255, joined together with dots
+	IP_REGEXP = r"{0}\.{0}\.{0}\.{0}".format(_IP_NUMBER_REGEXP)
+	
+	def __init__(self, intlist):
+		if not isinstance(intlist, list) or not all(isinstance(n, int) for n in intlist):
+			raise TypeError("The first constructor argument must be an list of ints")
+		if not len(intlist) == 4 or not all(0 <= n <= 255 for n in intlist):
+			raise ValueError("The integer list must contain 4 ints in range of 0 and 255, like an ip-address")
+		
+		self.ipnum = int.from_bytes(bytes(intlist), "big")
+	
+	@staticmethod
+	def parse(ip: str):
+		match = re.match("^" + IPAddress.IP_REGEXP + "$", ip)
+		if not match:
+			raise ValueError("%s is no ipv4-address" % ip)
+		
+		numbers = [int(match.group(i)) for i in range(1, 5)]
+		return IPAddress(numbers)
+	
+	@staticmethod
+	def from_int(numeric: int):
+		if numeric not in range(1 << 32):
+			raise ValueError("numeric value must be in uint-range")
+		
+		return IPAddress(list(numeric.to_bytes(4, "big")))
+	
+	@staticmethod
+	def is_ipv4(ip: str):
+		match = re.match("^" + IPAddress.IP_REGEXP + "$", ip)
+		return True if match else False
+
+	def to_int(self):
+		return self.ipnum
+	
+	def is_private(self):
+		return ReservedIPBlocks.is_private(self)
+	
+	def get_private_segment(self):
+		return ReservedIPBlocks.get_private_segment(self)
+
+	def is_localhost(self):
+		return ReservedIPBlocks.is_localhost(self)
+	
+	def is_multicast(self):
+		return ReservedIPBlocks.is_multicast(self)
+	
+	def is_reserved(self):
+		return ReservedIPBlocks.is_reserved(self)
+	
+	def is_zero_conf(self):
+		return ReservedIPBlocks.is_zero_conf(self)
+	
+	def _tuple(self):
+		return tuple(self.ipnum.to_bytes(4, "big"))
+	
+	def __repr__(self):
+		return "IPAddress([%i, %i, %i, %i])" % self._tuple()
+	
+	def __str__(self):
+		return "%i.%i.%i.%i" % self._tuple()
+	
+	def __hash__(self):
+		return self.ipnum
+	
+	def __eq__(self, other):
+		if other is None:
+			return False
+		
+		return isinstance(other, IPAddress) and self.ipnum == other.ipnum
+	
+	def __lt__(self, other):
+		if other is None:
+			raise TypeError("Cannot compare to None")
+		if not isinstance(other, IPAddress):
+			raise NotImplemented # maybe other can compare to self
+		
+		return self.ipnum < other.ipnum
+
+class IPAddressBlock:
+	CIDR_REGEXP = IPAddress.IP_REGEXP + r"(\/(3[0-2]|[12]?\d)|)?"
+	
+	def __init__(self, ip, netmask = 32):
+		if isinstance(ip, str):
+			ip = IPAddress.parse(ip)
+		elif isinstance(ip, list):
+			ip = IPAddress(ip)
+		
+		if not 1 <= netmask <= 32:
+			raise ValueError("netmask must lie between 1 and 32")
+		
+		self.ipnum = ip.to_int() & self._bitmask(netmask)
+		self.netmask = netmask
+		self.last_ipnum = self.ipnum + self.block_size() - 1
+
+	@staticmethod
+	def parse(cidr: str):
+		match = re.match("^" + IPAddressBlock.CIDR_REGEXP + "$", cidr)
+		if not match:
+			raise ValueError("%s is no valid cidr-notation" % cidr)
+		
+		ip = [int(match.group(i)) for i in range(1, 5)]
+		suffix = 32 if not match.group(6) else int(match.group(6))
+		
+		return IPAddressBlock(ip, suffix)
+	
+	def block_size(self):
+		return 2 ** (32 - self.netmask)
+	
+	def first_address(self):
+		return IPAddress.from_int(self.ipnum)
+
+	def last_address(self):
+		return IPAddress.from_int(self.last_ipnum)
+
+	def _bitmask(self, netmask):
+		ones = lambda x: (1 << x) - 1
+		
+		return ones(32) ^ ones(32 - netmask)
+	
+	def __repr__(self):
+		return "IPAddressBlock(%s, %i)" % (repr(IPAddress.from_int(self.ipnum)), self.netmask)
+	
+	def __self__(self):
+		return IPAddress.from_int(self.ipnum) + "/" + str(self.netmask)
+	
+	def __contains__(self, ip):
+		return (ip.to_int() & self._bitmask(self.netmask)) == self.ipnum
+
+class ReservedIPBlocks:
+	PRIVATE_IP_SEGMENTS = [
+		IPAddressBlock.parse(block)
+		for block in
+		("10.0.0.0/8", "172.16.0.0/12", "192.168.0.0/16")
+	]
+	
+	LOCALHOST_SEGMENT = IPAddressBlock.parse("127.0.0.0/8")
+	
+	MULTICAST_SEGMENT = IPAddressBlock.parse("224.0.0.0/4")
+	RESERVED_SEGMENT = IPAddressBlock.parse("240.0.0.0/4")
+	
+	ZERO_CONF_SEGMENT = IPAddressBlock.parse("169.254.0.0/16")
+	
+	@staticmethod
+	def is_private(ip):
+		return any(ip in block for block in ReservedIPBlocks.PRIVATE_IP_SEGMENTS)
+	
+	@staticmethod
+	def get_private_segment(ip):
+		if not ReservedIPBlocks.is_private(ip):
+			raise ValueError("%s is not part of a private IP segment" % ip)
+
+		for block in ReservedIPBlocks.PRIVATE_IP_SEGMENTS:
+			if ip in block:
+				return block
+
+	@staticmethod
+	def is_localhost(ip):
+		return ip in ReservedIPBlocks.LOCALHOST_SEGMENT
+	
+	@staticmethod
+	def is_multicast(ip):
+		return ip in ReservedIPBlocks.MULTICAST_SEGMENT
+	
+	@staticmethod
+	def is_reserved(ip):
+		return ip in ReservedIPBlocks.RESERVED_SEGMENT
+	
+	@staticmethod
+	def is_zero_conf(ip):
+		return ip in ReservedIPBlocks.ZERO_CONF_SEGMENT
+

+ 211 - 0
code/ID2TLib/PcapAddressOperations.py

@@ -0,0 +1,211 @@
+from random import choice
+
+from ID2TLib import Statistics
+from ID2TLib.IPv4 import IPAddress
+
+is_ipv4 = IPAddress.is_ipv4
+
+class PcapAddressOperations():
+
+    def __init__(self, statistics: Statistics, uncertain_ip_mult: int=3):
+        """
+        Initializes a pcap information extractor that uses the provided statistics for its operations.
+
+        :param statistics: The statistics of the pcap file
+        :param uncertain_ip_mult: the mutliplier to create new address space when the remaining observed space has been drained
+        """
+        self.statistics = statistics
+        self.UNCERTAIN_IPSPACE_MULTIPLIER = uncertain_ip_mult
+        self._init_ipaddress_ops()
+
+    def get_probable_router_mac(self):
+        """
+        Returns the most probable router MAC address based on the most used MAC address in the statistics.
+        """
+        self.probable_router_mac, count = self.statistics.process_db_query("most_used(macAddress)", print_results=False)[0]
+        return self.probable_router_mac     # and count as a measure of certainty?
+
+    def pcap_contains_priv_ips(self):
+        """
+        Returns True if the provided traffic contains private IPs, otherwise False.
+        """
+        return self.contains_priv_ips
+
+    def get_priv_address_range(self):
+        """
+        Returns a tuple with the start and end of the observed private IP range.
+        """
+        # better way to handle error?
+        if not self.pcap_contains_priv_ips():
+            print("Error: .pcap does not contain any private ips.")
+            return -1, -1
+        return str(self.min_priv_ip), str(self.max_priv_ip)
+
+    def get_count_rem_priv_ips(self):
+        """
+        Returns the number of private IPs in the pcap file that have not aldready been returned by get_existing_priv_ips.
+        """
+        return len(self.remaining_priv_ips)
+
+    def get_existing_priv_ips(self, count: int=1):
+        """
+        Returns the given number of private IPs that are existent in the pcap file.
+
+        :param count: the number of IPs to return
+        :return: the chosen private IPs
+        """
+
+        # reasonable to include this?
+        if not self.pcap_contains_priv_ips():
+            print("Warning: .pcap does not contain any private ips.")
+            return []
+
+        if count > len(self.priv_ips):
+            print("Warning: There are no {} priv IPs in the .pcap file. Returning all existing priv IPs.".format(count))
+
+        total = min(len(self.priv_ips), count)
+
+        retr_priv_ips = []
+        priv_ips = self.remaining_priv_ips
+        for _ in range(0, total):
+            random_priv_ip = choice(tuple(priv_ips))
+            retr_priv_ips.append(str(random_priv_ip))
+            priv_ips.remove(random_priv_ip)
+
+        return retr_priv_ips
+
+    # also use IPs below minimum observed IP?
+    # offset for later, start at x after minimum? e.g. start at 192.168.0.100
+    # exclude the last IP of an IP segment because its broadcast?
+    def get_new_priv_ips(self, count: int=1):
+        """
+        Returns in the pcap not existent private IPs that match the used segment. IPs can be returned
+        that are either between the minimum and maximum observed IP and are therefore considered certain
+        or that are above the observed maximum address, are more likely to not belong to the network using the
+        private IP segment and are therefore considered uncertain.
+
+        :param count: the number of IPs to return
+        :return: the newly created private IP addresses
+        """
+
+        if not self.pcap_contains_priv_ips():
+            print("Error: .pcap does not contain any private ips.")
+            return []
+
+        unused_priv_ips = self.unused_priv_ips
+        uncertain_priv_ips = self.uncertain_priv_ips
+
+        # warning reasonable?
+        if count > len(unused_priv_ips):
+            print("Warning: there are no {0} unused certain priv IPs in the .pcap file.\n \
+                Returning {1} certain and {2} uncertain priv IPs.".format(count, len(unused_priv_ips), count-len(unused_priv_ips)))
+
+        count_certain = min(count, len(unused_priv_ips))
+    
+        retr_priv_ips = []
+        for _ in range(0, count_certain):
+            random_priv_ip = choice(tuple(unused_priv_ips))
+            retr_priv_ips.append(str(random_priv_ip))
+            unused_priv_ips.remove(random_priv_ip)
+
+        # retrieve uncertain priv ips
+        if count_certain < count:
+            count_uncertain = count - count_certain
+
+            # check if new uncertain IPs have to be created
+            if len(uncertain_priv_ips) < count_uncertain:
+                ipspace_multiplier = self.UNCERTAIN_IPSPACE_MULTIPLIER
+
+                max_new_ip = self.max_uncertain_priv_ip.to_int() + ipspace_multiplier * count_uncertain
+                # adjust IP space multiplier and prevent broadcast address of private segment from being chosen as new IP
+                while max_new_ip >= self.priv_ip_segment.last_address().to_int():
+                    max_new_ip -= 1
+            
+                count_new_ips = max_new_ip - self.max_uncertain_priv_ip.to_int()
+                if count_new_ips < count_uncertain:
+                    print("Error: Cannot generate enough new private IPs because they would exceed the maximum private segment IP. Returning {}.".format(count_new_ips))
+
+                # create ipspace_multiplier * count_uncertain new uncertain IP addresses
+                last_gen_ip = None
+                for i in range(1, count_new_ips + 1):
+                    ip = IPAddress.from_int(self.max_uncertain_priv_ip.to_int() + i)
+                    # exclude the broadcast address
+                    if ip.to_int() >= self.priv_ip_segment.last_address().to_int():
+                        break
+                    uncertain_priv_ips.add(ip)
+                    last_gen_ip = ip
+                self.max_uncertain_priv_ip = last_gen_ip
+
+            # choose the uncertain IPs to return
+            total_uncertain = min(count_uncertain, len(uncertain_priv_ips))
+            for _ in range(0, total_uncertain):
+                random_priv_ip = choice(tuple(uncertain_priv_ips))
+                retr_priv_ips.append(str(random_priv_ip))
+                uncertain_priv_ips.remove(random_priv_ip)
+
+        return retr_priv_ips
+
+    def _init_ipaddress_ops(self):
+        """
+        Load and process data needed to perform functions on the IP addresses contained in the statistics
+        """
+
+        all_ips = self.statistics.process_db_query("all(ipAddress)", print_results=False)
+
+        # find the private IP segment in use
+        priv_ip_segment = None
+        self.contains_priv_ips = False
+        first_priv_ip = None
+        first_priv_ip_idx = -1
+
+        # for that iterate over all IPs until the first private IP is found
+        for i, ip in enumerate(all_ips):
+            if not is_ipv4(ip): 
+                continue
+            ip = IPAddress.parse(ip)
+
+            if ip.is_private():
+                priv_ip_segment = ip.get_private_segment()
+                first_priv_ip_idx = i
+                first_priv_ip = ip
+                self.contains_priv_ips = True
+                break
+
+        if not self.contains_priv_ips:
+            #print("The Pcap File does not contain any private IPs")
+            return
+
+        # get minimum and maximum seen private IP. The addresses in-bewteen are considered 
+        # as certain to be part of the network the pcap traffic is from
+        min_priv_ip, max_priv_ip = first_priv_ip, first_priv_ip
+        priv_ips = {first_priv_ip}
+        for ip in all_ips[first_priv_ip_idx+1:]:
+            if not is_ipv4(ip): 
+                continue
+            ip = IPAddress.parse(ip)
+
+            if ip in priv_ip_segment:
+                priv_ips.add(ip)
+                if ip > max_priv_ip:
+                    max_priv_ip = ip
+                elif ip < min_priv_ip:
+                    min_priv_ip = ip
+
+        # save the certain unused priv IPs of the network
+        unused_priv_ips = set()
+        for i in range (min_priv_ip.to_int() + 1, max_priv_ip.to_int()):
+            ip = IPAddress.from_int(i)
+            if not ip in priv_ips:
+                unused_priv_ips.add(ip)
+
+        # save the gathered information for efficient later use
+        self.min_priv_ip, self.max_priv_ip = min_priv_ip, max_priv_ip
+        self.max_uncertain_priv_ip = max_priv_ip
+        self.priv_ips = frozenset(priv_ips)
+        self.remaining_priv_ips = priv_ips
+        self.unused_priv_ips = unused_priv_ips
+        self.generated_uncertain_ips = set()
+        self.uncertain_priv_ips = set()
+        self.priv_ip_segment = priv_ip_segment
+
+