Ports.py 9.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259
  1. import random, copy
  2. # information taken from https://www.cymru.com/jtk/misc/ephemeralports.html
  class PortRanges:
      """
      Ephemeral (source) port ranges, grouped by operating system.

      Every constant is a ``range`` object, so the upper bound (``stop``) is exclusive.
      The values were taken from https://www.cymru.com/jtk/misc/ephemeralports.html
      """

      # dynamic ports as listed by RFC 6056
      DYNAMIC_PORTS = range(49152, 65536)

      # systems that come with a range of their own
      WINDOWS_XP = range(1024, 5001)
      FREEBSD = range(10000, 65536)
      LINUX = range(32768, 61001)

      # systems that simply use the RFC 6056 dynamic range (aliases of the same object)
      APPLE_IOS = APPLE_OSX = DYNAMIC_PORTS
      WINDOWS_VISTA = WINDOWS_7 = WINDOWS_8 = DYNAMIC_PORTS
  14. # This class uses classes instead of functions so deepcloning works
  class PortSelectionStrategy:
      """
      Namespace for the available port-selection strategies.

      Every strategy is a class whose instances are called as
      ``strategy(port_range, port_selector)`` and return a port number. Classes are used
      instead of plain functions so a strategy, including its state, can be deep-copied.
      """

      class sequential:
          """Hand out the ports of a range one after another, starting over at the end."""

          def __init__(self):
              # -1 means "nothing handed out yet"; the first call starts at port_range.start
              self.counter = -1

          def __call__(self, port_range, *args):
              """
              Return a port one higher than the previous one, wrapping around to
              ``port_range.start`` once ``port_range.stop`` has been reached.

              :param port_range: the port range to choose from
              :param args: ignored
              :return: A port number
              """
              if self.counter == -1:
                  self.counter = port_range.start
              port = self.counter
              self.counter += 1
              # ">=" instead of "==" so the counter also wraps if a later call passes a
              # range that ends below the current counter
              if self.counter >= port_range.stop:
                  self.counter = port_range.start
              return port

      class random:
          """Pick a uniformly random port from the range; in-use ports are not considered."""

          def __call__(self, port_range, *args):
              """
              :param port_range: the port range to choose from
              :param args: ignored
              :return: A port number
              """
              # "random" is the module here: names defined in a class body are not
              # visible from inside method bodies, so this class does not shadow it
              return random.randrange(port_range.start, port_range.stop)

      class linux_kernel:
          """
          A port-selection-strategy oriented on the linux-kernel.
          The implementation follows https://github.com/torvalds/linux/blob/master/net/ipv4/inet_connection_sock.c#L173
          as much as possible when converting from one language to another (The newest file was used
          by the time of writing, make sure you select the correct one when following the link!)
          """

          def __call__(self, port_range: range, port_selector, *args):
              """
              This method is an attempt to map a C function to Python. The two gotos of the
              original are expressed as two nested ``while True`` loops, each placed where the
              original had a label to jump to.

              The lower half of the range is searched first (odd offsets, then even offsets),
              after that the upper half. Ranges spanning fewer than 4 ports are searched as a
              whole. Because the number of candidates is rounded down to an even count, the
              last port of an odd-sized span is never selected.

              :param port_range: the port range to choose from (``stop`` is exclusive)
              :param port_selector: object with ``is_port_in_use(port)`` telling which ports are taken
              :param args: Not used for now
              :return: A port number that ``port_selector`` reports as unused
              :raises ValueError: if no unused port could be found
              """
              # Which part of the range to scan. The original uses an enum of 0, 1 and 2.
              # None: use whole range, True: use lower half, False: use upper half
              attempt_half = True

              while True:  # label "other_half_scan" in the original
                  # Start from the full range on every pass (the original re-reads it at this
                  # label). Computing it once would make the second pass re-split the lower
                  # half, so the upper half would never be searched.
                  # range.stop is already exclusive, so the "high++" of the original, which
                  # converts an inclusive bound, must not be applied.
                  low, high = port_range.start, port_range.stop
                  if high - low < 4:
                      attempt_half = None
                  if attempt_half is not None:
                      # even-aligned midpoint: a quarter of the span, doubled
                      half = low + (((high - low) >> 2) << 1)
                      if attempt_half:
                          high = half
                      else:
                          low = half

                  remaining = high - low
                  if remaining > 1:
                      remaining &= ~1  # clear the lowest bit: round down to an even count
                  offset = random.randrange(0, remaining)
                  offset |= 1  # begin with the ports of the opposite parity of low

                  while True:  # label "other_parity_scan" in the original
                      port = low + offset
                      for _ in range(0, remaining, 2):
                          if port >= high:
                              port -= remaining  # wrap around inside [low, high)
                          if not port_selector.is_port_in_use(port):
                              return port
                          port += 2
                      offset -= 1
                      if offset & 1:
                          break  # both parities were scanned
                      # offset is even now: scan the other parity

                  if attempt_half:
                      # the lower half is exhausted, now try the upper half
                      attempt_half = False
                      continue
                  break  # the port-range is exhausted

              raise ValueError("Could not find suitable port")
  class PortSelector:
      """
      This class simulates a port-selection-process. Instances keep a list of port-numbers they generated so
      the same port-number will not be generated again.
      """

      def __init__(self, port_range, select_function):
          """
          Create a PortSelector given a range of ports to choose from and a function that chooses the next port

          :param port_range: a range-object containing the range of ports to choose from
          :param select_function: a callable that receives the port_range and this selector and returns a port
          :raises ValueError: if port_range is empty or not within the valid ports 1..65535
          """
          if len(port_range) == 0:
              raise ValueError("cannot choose from an empty range")
          # start must be a valid port, stop is exclusive and may therefore be 65536
          if port_range.start not in range(1, 65536) or port_range.stop not in range(1, 65536 + 1):
              raise ValueError("port_range is no subset of the valid port-range")
          self.port_range = port_range
          self._select_port = select_function
          # ports handed out so far, oldest first (reduce_size relies on this order)
          self.generated = []

      def select_port(self):
          """
          Ask the select-function for ports until it yields one that was not generated before.

          :return: the newly generated port number
          :raises RuntimeError: if every port of the range was already generated
          """
          # do this check to avoid endless loops
          if len(self.generated) == len(self.port_range):
              raise RuntimeError(
                  "All %i port numbers were already generated, no more can be generated" % len(self.port_range))
          while True:
              port = self._select_port(self.port_range, self)
              if port not in self.generated:
                  self.generated.append(port)
                  return port

      def is_port_in_use(self, port: int):
          """Return True if the port was generated by this selector and not undone since."""
          return port in self.generated

      def undo_port_use(self, port: int):
          """
          Mark a generated port as unused again.

          :raises ValueError: if the port is not currently marked as generated
          """
          try:
              self.generated.remove(port)
          except ValueError:
              raise ValueError("Port %i is not in use and thus can not be undone" % port) from None

      def reduce_size(self, size: int):
          """
          Reduce the list of already generated ports to the last <size> generated.
          If size is bigger than the number of generated ports nothing happens.

          :raises ValueError: if size is negative
          """
          if size < 0:
              raise ValueError("size must not be negative")
          # generated[-0:] would be the whole list, so a size of zero needs its own branch
          self.generated = self.generated[-size:] if size else []

      def clear(self):
          """
          Clear the list of generated ports. As of now this does not reset the state of the selection-function
          """
          self.generated = []

      def clone(self):
          """Return an independent deep copy of this selector."""
          return copy.deepcopy(self)
  class ProtocolPortSelector:
      """
      This class contains a method to select ports for udp and tcp. It generally consists of two port-selectors, one
      for tcp and one for udp. For convenience this class has a __getattr__-method to call methods on both selectors
      at once. E.g, clear() does not exist for ProtocolPortSelector but it does for PortSelector, therefore
      protocolPortSelector.clear() will call clear for both port-selectors.
      """

      def __init__(self, port_range, select_tcp, select_udp=None):
          """
          :param port_range: the range of ports both selectors choose from
          :param select_tcp: the select-function for tcp
          :param select_udp: the select-function for udp; defaults to select_tcp
          """
          self.tcp = PortSelector(port_range, select_tcp)
          # NOTE: without select_udp both selectors receive the very same select_tcp object
          self.udp = PortSelector(port_range, select_udp or select_tcp)

      def get_tcp_generator(self):
          """Return the port-selector used for tcp."""
          return self.tcp

      def get_udp_generator(self):
          """Return the port-selector used for udp."""
          return self.udp

      def select_port_tcp(self):
          """Select a new tcp port."""
          return self.tcp.select_port()

      def select_port_udp(self):
          """Select a new udp port."""
          return self.udp.select_port()

      def is_port_in_use_tcp(self, port):
          """Tell whether the tcp selector has the port in use."""
          return self.tcp.is_port_in_use(port)

      def is_port_in_use_udp(self, port):
          """Tell whether the udp selector has the port in use."""
          return self.udp.is_port_in_use(port)

      def clone(self):
          """Return a copy of the same type whose tcp and udp selectors are clones of the originals."""
          # create the instance without running __init__: the selectors are cloned, not rebuilt
          duplicate = object.__new__(type(self))
          duplicate.udp = self.udp.clone()
          duplicate.tcp = self.tcp.clone()
          return duplicate

      def __getattr__(self, attr):
          """
          Forward unknown attributes to both port-selectors.

          :return: for methods a function that calls the method on both selectors and returns a
                   (tcp, udp) tuple of the results, for plain values a (tcp, udp) tuple of the values
          :raises AttributeError: if the tcp selector has no such attribute, or if the selectors
                                  themselves are not set yet
          """
          # __getattr__ only runs when normal lookup failed. If that happens for tcp/udp the
          # instance is not initialised (e.g. while copy/pickle rebuild it); reading self.tcp
          # below would re-enter this method forever, so fail like a normal missing attribute.
          if attr in ("tcp", "udp"):
              raise AttributeError(
                  "%r object has no attribute %r" % (type(self).__name__, attr))
          val = getattr(self.tcp, attr)
          if callable(val):  # we probably got a method here
              tcp_meth = val
              udp_meth = getattr(self.udp, attr)

              def double_method(*args, **kwargs):
                  return (tcp_meth(*args, **kwargs), udp_meth(*args, **kwargs))

              return double_method  # calling this function will call the method for both port-selectors
          # we have found a simple value, return a tuple containing the attribute-value from both port-selectors
          return (val, getattr(self.udp, attr))
  class PortSelectors:
      """
      Ready-made port-selectors for some of the port-selection-strategies found in the wild.

      The selectors are shared class attributes. It is recommended to call .clone() to get a
      personal copy, otherwise two parts of your code might select ports on the same
      port-selector, which is something you might want to avoid.
      """

      LINUX = ProtocolPortSelector(
          port_range=PortRanges.LINUX,
          select_tcp=PortSelectionStrategy.random(),
      )
      # sequential for tcp, random for udp
      APPLE = ProtocolPortSelector(
          port_range=PortRanges.DYNAMIC_PORTS,
          select_tcp=PortSelectionStrategy.sequential(),
          select_udp=PortSelectionStrategy.random(),
      )
      FREEBSD = ProtocolPortSelector(
          port_range=PortRanges.FREEBSD,
          select_tcp=PortSelectionStrategy.random(),
      )
      # the selection strategy is a guess, no further information on it could be found
      WINDOWS = ProtocolPortSelector(
          port_range=PortRanges.WINDOWS_7,
          select_tcp=PortSelectionStrategy.random(),
      )