123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112 |
- """
- This is a highly performance optimized ipaddress like module.
- Subnet creating is approximatelly 50 times faster.
- """
- from utility import int_to_ip_str, ip_str_to_int
- import logging
- import struct
- import re
- split_slash = re.compile("/").split
- unpack_ipv4address = struct.Struct(">I").unpack
- pack_ipv4address = struct.Struct(">I").pack
- logging.basicConfig(format="%(levelname)s (%(funcName)s): %(message)s")
- logger = logging.getLogger("pra_framework")
- logger.setLevel(logging.DEBUG)
- class IPv4Address(object):
- def __init__(self, ip_int=None, ip_bytes=None, ip_str=None):
- if ip_int is not None:
- self._ip_int = ip_int
- elif ip_bytes is not None:
- self._ip_int = unpack_ipv4address(ip_bytes)[0]
- elif ip_str is not None:
- self._ip_int = ip_str_to_int(ip_str)
- packed = property(lambda self: pack_ipv4address(self._ip_int))
- compressed = property(lambda self: int_to_ip_str(self._ip_int))
- ip_int = property(lambda self: self._ip_int)
- def __repr__(self):
- return self.compressed
- class IPv4Network(object):
- def __init__(self, nw_ip_int=None, nw_ip_bytes=None, nw_ip_str=None, nw_ip_str_prefix=None, prefixlen=32):
- if nw_ip_int is not None:
- self._nw_ip_int = nw_ip_int
- elif nw_ip_bytes is not None:
- self._nw_ip_int = unpack_ipv4address(nw_ip_bytes)[0]
- elif nw_ip_str is not None:
- self._nw_ip_int = ip_str_to_int(nw_ip_str)
- elif nw_ip_str_prefix is not None:
- ip, prefix = split_slash(nw_ip_str_prefix)
- prefixlen = int(prefix)
- self._nw_ip_int = ip_str_to_int(ip)
- self._prefixlen = prefixlen
- prefixlen = property(lambda self: self._prefixlen)
- ip_int = property(lambda self: self._nw_ip_int)
- def subnets(self, prefixlen_diff):
- if (self._nw_ip_int << (self._prefixlen + prefixlen_diff) & 0xFFFFFFFF) != 0:
- raise Exception("Host bits are not empty: %d" %
- ((self._nw_ip_int << (self._prefixlen + prefixlen_diff)) & 0xFFFFFFFF))
- prefixlen_new = self._prefixlen + prefixlen_diff
- if prefixlen_new > 32:
- raise Exception("32 CIDR bits reached")
- #logger.debug("new subnets: %d, new prefix: %d" % (2**prefixlen_diff, prefixlen_new))
- nw_add = 2 ** (32 - prefixlen_new)
- #logger.debug("host bits: %d" % (32 - prefixlen_new))
- nw_int = self._nw_ip_int
- return [IPv4Network(nw_ip_int=nw_int + nw_add * x, prefixlen=prefixlen_new) for x in range(2 ** prefixlen_diff)]
- def _get_num_addresses(self):
- try:
- return self._host_addresses
- except AttributeError:
- host_addresses = 2 ** (32 - self._prefixlen)
- self._host_addresses = host_addresses
- return host_addresses
- num_addresses = property(_get_num_addresses)
- def _get_hosts(self):
- try:
- host_addresses = self._host_addresses
- except AttributeError:
- host_addresses = 2 ** (32 - self._prefixlen)
- self._host_addresses = host_addresses
- return [pack_ipv4address(ip) for ip in range(self._nw_ip_int, self._nw_ip_int + host_addresses)]
- # get hosts as packed bytes (including netowrk and broadcast address
- hosts = property(_get_hosts)
- def __repr__(self):
- return self.compressed
- def __contains__(self, other):
- """
- other -- IPv4Address object
- return -- True if other is contained in this network, False otherwise
- """
- #logger.debug("checking: %r <-> %r" % (self, other))
- try:
- host_addresses = self._host_addresses
- except AttributeError:
- host_addresses = 2 ** (32 - self._prefixlen)
- self._host_addresses = host_addresses
- try:
- #logger.debug("%d <= %d < %d" % (self._nw_ip_int, other._ip_int, (self._nw_ip_int + host_addresses)))
- return self._nw_ip_int <= other._ip_int < (self._nw_ip_int + host_addresses)
- except AttributeError:
- # only works on IPv4Address
- return False
- compressed = property(lambda self: int_to_ip_str(self._nw_ip_int) + "/%d" % self._prefixlen)
|