Ports.py 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123
  1. import random
  2. # information taken from https://www.cymru.com/jtk/misc/ephemeralports.html
  3. class PortRanges:
  4. # dynamic ports as listed by RFC 6056
  5. DYNAMIC_PORTS = range(49152, 65536)
  6. LINUX = range(32768, 61001)
  7. FREEBSD = range(10000, 65536)
  8. APPLE_IOS = DYNAMIC_PORTS
  9. APPLE_OSX = DYNAMIC_PORTS
  10. WINDOWS_7 = DYNAMIC_PORTS
  11. WINDOWS_8 = DYNAMIC_PORTS
  12. WINDOWS_VISTA = DYNAMIC_PORTS
  13. WINDOWS_XP = range(1024, 5001)
  14. class PortSelectionStrategy:
  15. @staticmethod
  16. def sequential():
  17. counter = -1
  18. # that function will always return a one higher counter than before,
  19. # restarting from the start once it reached the highest value
  20. def select_port(port_range):
  21. global counter
  22. if counter == -1:
  23. counter = port_range.start
  24. port = counter
  25. counter += 1
  26. if counter == port_range.stop:
  27. counter = port_range.start
  28. return port
  29. return select_port
  30. @staticmethod
  31. def random(port_range):
  32. return random.randrange(port_range.start, port_range.stop)
  33. class PortSelector:
  34. def __init__(self, port_range, select_function):
  35. self.port_range = port_range
  36. self._select_port = select_function
  37. self.generated = []
  38. def select_port(self):
  39. if len(self.generated) == len(self.port_range):
  40. raise RuntimeError("All %i port numbers were already generated, no more can be generated" % len(self.port_range))
  41. while True:
  42. port = self._select_port(self.port_range)
  43. if port not in self.generated:
  44. self.generated.append(port)
  45. return port
  46. def is_port_in_use(self, port):
  47. return port in self.generated
  48. def undo_port_use(self, port):
  49. if port in self.generated:
  50. self.generated.remove(port)
  51. else:
  52. raise ValueError("Port %i is not in use and thus can not be undone" % port)
  53. def reduce_size(self, size):
  54. self.generated = self.generated[-size:]
  55. def clear(self):
  56. self.generated = []
  57. class ProtocolPortSelector:
  58. def __init__(self, port_range, select_tcp, select_udp = None):
  59. self.tcp = PortSelector(port_range, select_tcp)
  60. self.udp = PortSelector(port_range, select_udp or select_tcp)
  61. def get_tcp_generator(self):
  62. return self.tcp
  63. def get_udp_generator(self):
  64. return self.udp
  65. def select_port_tcp(self):
  66. return self.tcp.select_port()
  67. def select_port_udp(self):
  68. return self.udp.select_port()
  69. def is_port_in_use_tcp(self, port):
  70. return self.tcp.is_port_in_use(port)
  71. def is_port_in_use_udp(self, port):
  72. return self.udp.is_port_in_use(port)
  73. def __getattr__(self, attr):
  74. val = getattr(self.tcp, attr)
  75. if callable(val):
  76. tcp_meth = val
  77. udp_meth = getattr(self.udp, attr)
  78. def double_method(*args, **kwargs):
  79. return (tcp_meth(*args, **kwargs), udp_meth(*args, **kwargs))
  80. return double_method
  81. else:
  82. return (val, getattr(self.udp, attr))
  83. class PortSelectors:
  84. LINUX = ProtocolPortSelector(PortRanges.LINUX, PortSelectionStrategy.random)
  85. APPLE = ProtocolPortSelector(PortRanges.DYNAMIC_PORTS,
  86. PortSelectionStrategy.sequential(),
  87. PortSelectionStrategy.random())
  88. FREEBSD = ProtocolPortSelector(PortRanges.FREEBSD, PortSelectionStrategy.random)
  89. WINDOWS = ProtocolPortSelector(PortRanges.WINDOWS_7, PortSelectionStrategy.random) # the selection strategy is a guess as i can't find more info on it