IPGenerator.py 3.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109
  1. import random
  2. from .. import IPv4 as ip
  3. class IPChooser:
  4. def random_ip(self) -> ip.IPAddress:
  5. return ip.IPAddress.from_int(random.randrange(0, 1 << 32))
  6. def size(self) -> int:
  7. return 1 << 32
  8. def __len__(self) -> int:
  9. return self.size()
  10. class IPChooserByRange(IPChooser):
  11. def __init__(self, ip_range: ip.IPAddressBlock) -> "IPChooserByRange":
  12. self.range = ip_range
  13. def random_ip(self) -> ip.IPAddress:
  14. start = int(self.range.first_address())
  15. end = start + self.range.block_size()
  16. return ip.IPAddress.from_int(random.randrange(start, end))
  17. def size(self) -> int:
  18. return self.range.block_size()
  19. class IPChooserByList(IPChooser):
  20. def __init__(self, ips: "list[ip.IPAddress]") -> "IPChooserByList":
  21. self.ips = list(ips)
  22. if not self.ips:
  23. raise ValueError("list of ips must not be empty")
  24. def random_ip(self) -> ip.IPAddress:
  25. return random.choice(self.ips)
  26. def size(self) -> int:
  27. return len(self.ips)
  28. class IPGenerator:
  29. def __init__(self, ip_chooser = IPChooser(), # include all ip-addresses by default (before the blacklist)
  30. include_private_ips = False, include_localhost = False,
  31. include_multicast = False, include_reserved = False,
  32. include_link_local = False, blacklist = None) -> "IPGenerator":
  33. self.blacklist = []
  34. self.generated_ips = set()
  35. if not include_private_ips:
  36. for segment in ip.ReservedIPBlocks.PRIVATE_IP_SEGMENTS:
  37. self.add_to_blacklist(segment)
  38. if not include_localhost:
  39. self.add_to_blacklist(ip.ReservedIPBlocks.LOCALHOST_SEGMENT)
  40. if not include_multicast:
  41. self.add_to_blacklist(ip.ReservedIPBlocks.MULTICAST_SEGMENT)
  42. if not include_reserved:
  43. self.add_to_blacklist(ip.ReservedIPBlocks.RESERVED_SEGMENT)
  44. if not include_link_local:
  45. self.add_to_blacklist(ip.ReservedIPBlocks.ZERO_CONF_SEGMENT)
  46. if blacklist:
  47. for segment in blacklist:
  48. self.add_to_blacklist(segment)
  49. self.chooser = ip_chooser
  50. @staticmethod
  51. def from_range(range: ip.IPAddressBlock, *args, **kwargs) -> "IPGenerator":
  52. return IPGenerator(IPChooserByRange(range), *args, **kwargs)
  53. def add_to_blacklist(self, ip_segment: "Union[ip.IPAddressBlock, str]"):
  54. if isinstance(ip_segment, ip.IPAddressBlock):
  55. self.blacklist.append(ip_segment)
  56. else:
  57. self.blacklist.append(ip.IPAddressBlock.parse(ip_segment))
  58. def random_ip(self) -> ip.IPAddress:
  59. if len(self.generated_ips) == self.chooser.size():
  60. raise ValueError("Exhausted the space of possible ip-addresses, no new unique ip-address can be generated")
  61. while True:
  62. random_ip = self.chooser.random_ip()
  63. if not self._is_in_blacklist(random_ip) and random_ip not in self.generated_ips:
  64. self.generated_ips.add(random_ip)
  65. return str(random_ip)
  66. def clear(self, clear_blacklist = True, clear_generated_ips = True):
  67. if clear_blacklist: self.blacklist.clear()
  68. if clear_generated_ips: self.generated_ips.clear()
  69. def _is_in_blacklist(self, ip: ip.IPAddress) -> bool:
  70. return any(ip in block for block in self.blacklist)
  71. class MappingIPGenerator(IPGenerator):
  72. def __init__(self, *args, **kwargs) -> "MappingIPGenerator":
  73. super().__init__(self, *args, **kwargs)
  74. self.mapping = {}
  75. def clear(self, clear_generated_ips = True, *args, **kwargs):
  76. super().clear(self, clear_generated_ips = clear_generated_ips, *args, **kwargs)
  77. if clear_generated_ips:
  78. self.mapping = {}
  79. def get_mapped_ip(self, key) -> ip.IPAddress:
  80. if key not in self.mapping:
  81. self.mapping[key] = self.random_ip()
  82. return self.mapping[key]
  83. def __getitem__(self, item) -> ip.IPAddress:
  84. return self.get_mapped_ip(item)