Ports.py 3.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133
  1. import random, copy
  2. # information taken from https://www.cymru.com/jtk/misc/ephemeralports.html
  3. class PortRanges:
  4. # dynamic ports as listed by RFC 6056
  5. DYNAMIC_PORTS = range(49152, 65536)
  6. LINUX = range(32768, 61001)
  7. FREEBSD = range(10000, 65536)
  8. APPLE_IOS = DYNAMIC_PORTS
  9. APPLE_OSX = DYNAMIC_PORTS
  10. WINDOWS_7 = DYNAMIC_PORTS
  11. WINDOWS_8 = DYNAMIC_PORTS
  12. WINDOWS_VISTA = DYNAMIC_PORTS
  13. WINDOWS_XP = range(1024, 5001)
  14. # This class uses classes instead of functions so deepcloning works
  15. class PortSelectionStrategy:
  16. class sequential:
  17. def __init__(self):
  18. self.counter = -1
  19. # that function will always return a one higher counter than before,
  20. # restarting from the start once it reached the highest value
  21. def __call__(port_range):
  22. if self.counter == -1:
  23. self.counter = port_range.start
  24. port = counter
  25. self.counter += 1
  26. if self.counter == port_range.stop:
  27. self.counter = port_range.start
  28. return port
  29. class random:
  30. def __call__(port_range):
  31. return random.randrange(port_range.start, port_range.stop)
  32. class PortSelector:
  33. def __init__(self, port_range, select_function):
  34. self.port_range = port_range
  35. self._select_port = select_function
  36. self.generated = []
  37. def select_port(self):
  38. if len(self.generated) == len(self.port_range):
  39. raise RuntimeError("All %i port numbers were already generated, no more can be generated" % len(self.port_range))
  40. while True:
  41. port = self._select_port(self.port_range)
  42. if port not in self.generated:
  43. self.generated.append(port)
  44. return port
  45. def is_port_in_use(self, port):
  46. return port in self.generated
  47. def undo_port_use(self, port):
  48. if port in self.generated:
  49. self.generated.remove(port)
  50. else:
  51. raise ValueError("Port %i is not in use and thus can not be undone" % port)
  52. def reduce_size(self, size):
  53. self.generated = self.generated[-size:]
  54. def clear(self):
  55. self.generated = []
  56. def clone(self):
  57. return copy.deepcopy(self)
  58. class ProtocolPortSelector:
  59. def __init__(self, port_range, select_tcp, select_udp = None):
  60. self.tcp = PortSelector(port_range, select_tcp)
  61. self.udp = PortSelector(port_range, select_udp or select_tcp)
  62. def get_tcp_generator(self):
  63. return self.tcp
  64. def get_udp_generator(self):
  65. return self.udp
  66. def select_port_tcp(self):
  67. return self.tcp.select_port()
  68. def select_port_udp(self):
  69. return self.udp.select_port()
  70. def is_port_in_use_tcp(self, port):
  71. return self.tcp.is_port_in_use(port)
  72. def is_port_in_use_udp(self, port):
  73. return self.udp.is_port_in_use(port)
  74. def clone(self):
  75. class Tmp: pass
  76. clone = Tmp()
  77. clone.__class__ = type(self)
  78. clone.udp = self.udp.clone()
  79. clone.tcp = self.tcp.clone()
  80. return clone
  81. def __getattr__(self, attr):
  82. val = getattr(self.tcp, attr)
  83. if callable(val):
  84. tcp_meth = val
  85. udp_meth = getattr(self.udp, attr)
  86. def double_method(*args, **kwargs):
  87. return (tcp_meth(*args, **kwargs), udp_meth(*args, **kwargs))
  88. return double_method
  89. else:
  90. return (val, getattr(self.udp, attr))
  91. class PortSelectors:
  92. LINUX = ProtocolPortSelector(PortRanges.LINUX, PortSelectionStrategy.random)
  93. APPLE = ProtocolPortSelector(PortRanges.DYNAMIC_PORTS,
  94. PortSelectionStrategy.sequential(),
  95. PortSelectionStrategy.random())
  96. FREEBSD = ProtocolPortSelector(PortRanges.FREEBSD, PortSelectionStrategy.random)
  97. WINDOWS = ProtocolPortSelector(PortRanges.WINDOWS_7, PortSelectionStrategy.random) # the selection strategy is a guess as i can't find more info on it