""" This is a highly performance optimized ipaddress like module. Subnet creating is approximatelly 50 times faster. """ from utility import int_to_ip_str, ip_str_to_int import logging import struct import re split_slash = re.compile("/").split unpack_ipv4address = struct.Struct(">I").unpack pack_ipv4address = struct.Struct(">I").pack logging.basicConfig(format="%(levelname)s (%(funcName)s): %(message)s") logger = logging.getLogger("pra_framework") logger.setLevel(logging.DEBUG) class IPv4Address(object): def __init__(self, ip_int=None, ip_bytes=None, ip_str=None): if ip_int is not None: self._ip_int = ip_int elif ip_bytes is not None: self._ip_int = unpack_ipv4address(ip_bytes)[0] elif ip_str is not None: self._ip_int = ip_str_to_int(ip_str) packed = property(lambda self: pack_ipv4address(self._ip_int)) compressed = property(lambda self: int_to_ip_str(self._ip_int)) ip_int = property(lambda self: self._ip_int) def __repr__(self): return self.compressed class IPv4Network(object): def __init__(self, nw_ip_int=None, nw_ip_bytes=None, nw_ip_str=None, nw_ip_str_prefix=None, prefixlen=32): if nw_ip_int is not None: self._nw_ip_int = nw_ip_int elif nw_ip_bytes is not None: self._nw_ip_int = unpack_ipv4address(nw_ip_bytes)[0] elif nw_ip_str is not None: self._nw_ip_int = ip_str_to_int(nw_ip_str) elif nw_ip_str_prefix is not None: ip, prefix = split_slash(nw_ip_str_prefix) prefixlen = int(prefix) self._nw_ip_int = ip_str_to_int(ip) self._prefixlen = prefixlen prefixlen = property(lambda self: self._prefixlen) ip_int = property(lambda self: self._nw_ip_int) def subnets(self, prefixlen_diff): if (self._nw_ip_int << (self._prefixlen + prefixlen_diff) & 0xFFFFFFFF) != 0: raise Exception("Host bits are not empty: %d" % ((self._nw_ip_int << (self._prefixlen + prefixlen_diff)) & 0xFFFFFFFF)) prefixlen_new = self._prefixlen + prefixlen_diff if prefixlen_new > 32: raise Exception("32 CIDR bits reached") #logger.debug("new subnets: %d, new prefix: %d" % (2**prefixlen_diff, prefixlen_new)) nw_add = 2 ** (32 - prefixlen_new) #logger.debug("host bits: %d" % (32 - prefixlen_new)) nw_int = self._nw_ip_int return [IPv4Network(nw_ip_int=nw_int + nw_add * x, prefixlen=prefixlen_new) for x in range(2 ** prefixlen_diff)] def _get_num_addresses(self): try: return self._host_addresses except AttributeError: host_addresses = 2 ** (32 - self._prefixlen) self._host_addresses = host_addresses return host_addresses num_addresses = property(_get_num_addresses) def _get_hosts(self): try: host_addresses = self._host_addresses except AttributeError: host_addresses = 2 ** (32 - self._prefixlen) self._host_addresses = host_addresses return [pack_ipv4address(ip) for ip in range(self._nw_ip_int, self._nw_ip_int + host_addresses)] # get hosts as packed bytes (including netowrk and broadcast address hosts = property(_get_hosts) def __repr__(self): return self.compressed def __contains__(self, other): """ other -- IPv4Address object return -- True if other is contained in this network, False otherwise """ #logger.debug("checking: %r <-> %r" % (self, other)) try: host_addresses = self._host_addresses except AttributeError: host_addresses = 2 ** (32 - self._prefixlen) self._host_addresses = host_addresses try: #logger.debug("%d <= %d < %d" % (self._nw_ip_int, other._ip_int, (self._nw_ip_int + host_addresses))) return self._nw_ip_int <= other._ip_int < (self._nw_ip_int + host_addresses) except AttributeError: # only works on IPv4Address return False compressed = property(lambda self: int_to_ip_str(self._nw_ip_int) + "/%d" % self._prefixlen)