ipv4.py 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112
  1. """
  2. This is a highly performance-optimized, ipaddress-like module.
  3. Subnet creation is approximately 50 times faster.
  4. """
  5. from utility import int_to_ip_str, ip_str_to_int
  6. import logging
  7. import struct
  8. import re
  9. split_slash = re.compile("/").split
  10. unpack_ipv4address = struct.Struct(">I").unpack
  11. pack_ipv4address = struct.Struct(">I").pack
  12. logging.basicConfig(format="%(levelname)s (%(funcName)s): %(message)s")
  13. logger = logging.getLogger("pra_framework")
  14. logger.setLevel(logging.DEBUG)
  15. class IPv4Address(object):
  16. def __init__(self, ip_int=None, ip_bytes=None, ip_str=None):
  17. if ip_int is not None:
  18. self._ip_int = ip_int
  19. elif ip_bytes is not None:
  20. self._ip_int = unpack_ipv4address(ip_bytes)[0]
  21. elif ip_str is not None:
  22. self._ip_int = ip_str_to_int(ip_str)
  23. packed = property(lambda self: pack_ipv4address(self._ip_int))
  24. compressed = property(lambda self: int_to_ip_str(self._ip_int))
  25. ip_int = property(lambda self: self._ip_int)
  26. def __repr__(self):
  27. return self.compressed
  28. class IPv4Network(object):
  29. def __init__(self, nw_ip_int=None, nw_ip_bytes=None, nw_ip_str=None, nw_ip_str_prefix=None, prefixlen=32):
  30. if nw_ip_int is not None:
  31. self._nw_ip_int = nw_ip_int
  32. elif nw_ip_bytes is not None:
  33. self._nw_ip_int = unpack_ipv4address(nw_ip_bytes)[0]
  34. elif nw_ip_str is not None:
  35. self._nw_ip_int = ip_str_to_int(nw_ip_str)
  36. elif nw_ip_str_prefix is not None:
  37. ip, prefix = split_slash(nw_ip_str_prefix)
  38. prefixlen = int(prefix)
  39. self._nw_ip_int = ip_str_to_int(ip)
  40. self._prefixlen = prefixlen
  41. prefixlen = property(lambda self: self._prefixlen)
  42. ip_int = property(lambda self: self._nw_ip_int)
  43. def subnets(self, prefixlen_diff):
  44. if (self._nw_ip_int << (self._prefixlen + prefixlen_diff) & 0xFFFFFFFF) != 0:
  45. raise Exception("Host bits are not empty: %d" %
  46. ((self._nw_ip_int << (self._prefixlen + prefixlen_diff)) & 0xFFFFFFFF))
  47. prefixlen_new = self._prefixlen + prefixlen_diff
  48. if prefixlen_new > 32:
  49. raise Exception("32 CIDR bits reached")
  50. #logger.debug("new subnets: %d, new prefix: %d" % (2**prefixlen_diff, prefixlen_new))
  51. nw_add = 2 ** (32 - prefixlen_new)
  52. #logger.debug("host bits: %d" % (32 - prefixlen_new))
  53. nw_int = self._nw_ip_int
  54. return [IPv4Network(nw_ip_int=nw_int + nw_add * x, prefixlen=prefixlen_new) for x in range(2 ** prefixlen_diff)]
  55. def _get_num_addresses(self):
  56. try:
  57. return self._host_addresses
  58. except AttributeError:
  59. host_addresses = 2 ** (32 - self._prefixlen)
  60. self._host_addresses = host_addresses
  61. return host_addresses
  62. num_addresses = property(_get_num_addresses)
  63. def _get_hosts(self):
  64. try:
  65. host_addresses = self._host_addresses
  66. except AttributeError:
  67. host_addresses = 2 ** (32 - self._prefixlen)
  68. self._host_addresses = host_addresses
  69. return [pack_ipv4address(ip) for ip in range(self._nw_ip_int, self._nw_ip_int + host_addresses)]
  70. # get hosts as packed bytes (including netowrk and broadcast address
  71. hosts = property(_get_hosts)
  72. def __repr__(self):
  73. return self.compressed
  74. def __contains__(self, other):
  75. """
  76. other -- IPv4Address object
  77. return -- True if other is contained in this network, False otherwise
  78. """
  79. #logger.debug("checking: %r <-> %r" % (self, other))
  80. try:
  81. host_addresses = self._host_addresses
  82. except AttributeError:
  83. host_addresses = 2 ** (32 - self._prefixlen)
  84. self._host_addresses = host_addresses
  85. try:
  86. #logger.debug("%d <= %d < %d" % (self._nw_ip_int, other._ip_int, (self._nw_ip_int + host_addresses)))
  87. return self._nw_ip_int <= other._ip_int < (self._nw_ip_int + host_addresses)
  88. except AttributeError:
  89. # only works on IPv4Address
  90. return False
  91. compressed = property(lambda self: int_to_ip_str(self._nw_ip_int) + "/%d" % self._prefixlen)