123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259 |
- import random, copy
- # information taken from https://www.cymru.com/jtk/misc/ephemeralports.html
- class PortRanges:
- # dynamic ports as listed by RFC 6056
- DYNAMIC_PORTS = range(49152, 65536)
- LINUX = range(32768, 61001)
- FREEBSD = range(10000, 65536)
- APPLE_IOS = DYNAMIC_PORTS
- APPLE_OSX = DYNAMIC_PORTS
- WINDOWS_7 = DYNAMIC_PORTS
- WINDOWS_8 = DYNAMIC_PORTS
- WINDOWS_VISTA = DYNAMIC_PORTS
- WINDOWS_XP = range(1024, 5001)
- # This class uses classes instead of functions so deepcloning works
- class PortSelectionStrategy:
- class sequential:
- def __init__(self):
- self.counter = -1
- # that function will always return a one higher counter than before,
- # restarting from the start once it reached the highest value
- def __call__(self, port_range, *args):
- if self.counter == -1:
- self.counter = port_range.start
- port = self.counter
- self.counter += 1
- if self.counter == port_range.stop:
- self.counter = port_range.start
- return port
- class random:
- def __call__(self, port_range, *args):
- return random.randrange(port_range.start, port_range.stop)
- class linux_kernel:
- """
- A port-selectioin-strategy oriented on the linux-kernel
- The implementation follows https://github.com/torvalds/linux/blob/master/net/ipv4/inet_connection_sock.c#L173
- as much as possible when converting from one language to another (The newest file was used
- by the time of writing, make sure you select the correct one when following the link!)
- """
- def __call__(self, port_range: range, port_selector, *args):
- """
- This method is an attempt to map a c-function to python. To solve the goto-problem
- while-true's have been added. Both of the while-true's are placed where the original
- had a label to jump to. break's and continue's are set to preserve the original
- control flow. Another method could have been used to rewrite the c-code, however this
- was chosen to preserve the similarity between this and the original
- :param port_range: the port range to choose from
- :param port_selector: the port selector that tells which ports are in use
- :param args: Not used for now
- :return: A port number
- """
- port = 0
- low, high = port_range.start, port_range.stop
- # this var tells us if we should use the upper or lower port-range-half, or the whole range if
- # this var is None. The original was an enum of the values 0, 1 and 2. But I think an Optional[bool]
- # is more clear
- # None: use whole range, True: use lower half, False: use upper half
- attempt_half = True
- high += 1 # line 186 in the original file
- while True:
- if high - low < 4:
- attempt_half = None
- if attempt_half is not None:
- # appearently a fast method to find a number close to the real half
- # unless the difference between high and low is 4 (see above, note the 2-shift below)
- # this does not work
- half = low + (((high - low) >> 2) << 1)
- if attempt_half:
- high = half
- else:
- low = half
- remaining = high - low
- if remaining > 1:
- remaining &= ~1 # flip the 1-bit
- offset = random.randrange(0, remaining)
- offset |= 1;
- attempt_half_before = attempt_half # slight hack to keep track of change
- while True:
- port = low + offset
- for i in range(0, remaining, 2):
- if port >= high:
- port -= remaining
- if port_selector.is_port_in_use(port):
- port += 2
- continue
- return port
- offset -= 1
- if not (offset & 1):
- continue
- if attempt_half:
- attempt_half = False
- break
- if attempt_half_before: # we still got ports to search, attemp_half was just set to False
- continue
- if not attempt_half: # the port-range is exhausted
- break
- raise ValueError("Could not find suitable port")
- class PortSelector:
- """
- This class simulates a port-selection-process. Instances keep a list of port-numbers they generated so
- the same port-number will not be generated again.
- """
- def __init__(self, port_range, select_function):
- """
- Create a PortSelector given a range of ports to choose from and a function that chooses the next port
-
- :param port_range: a range-object containing the range of ports to choose from
- :param select_function: a function that receives the port_range and selects a port
- """
- if len(port_range) == 0:
- raise ValueError("cannot choose from an empty range")
- if port_range.start not in range(1, 65536) or port_range.stop not in range(1, 65536 + 1):
- raise ValueError("port_range is no subset of the valid port-range")
- self.port_range = port_range
- self._select_port = select_function
- self.generated = []
- def select_port(self):
- # do this check to avoid endless loops
- if len(self.generated) == len(self.port_range):
- raise RuntimeError(
- "All %i port numbers were already generated, no more can be generated" % len(self.port_range))
- while True:
- port = self._select_port(self.port_range, self)
- if port not in self.generated:
- self.generated.append(port)
- return port
- def is_port_in_use(self, port: int):
- return port in self.generated
- def undo_port_use(self, port: int):
- if port in self.generated:
- self.generated.remove(port)
- else:
- raise ValueError("Port %i is not in use and thus can not be undone" % port)
- def reduce_size(self, size: int):
- """
- Reduce the list of already generated ports to the last <size> generated.
- If size if bigger than the number of generated ports nothing happens.
- """
- self.generated = self.generated[-size:]
- def clear(self):
- """
- Clear the list of generated ports. As of now this does not reset the state of the selection-function
- """
- self.generated = []
- def clone(self):
- return copy.deepcopy(self)
- class ProtocolPortSelector:
- """
- This class contains a method to select ports for udp and tcp. It generally consists of the port-selectors, one
- for tcp and one for udp. For convenience this class has a __getattr__-method to call methods on both selectors
- at once. E.g, clear() does not exist for ProtocolPortSelector but it does for PortSelector, therefore
- protocolPortSelector.clear() will call clear for both port-selectors.
- """
- def __init__(self, port_range, select_tcp, select_udp=None):
- self.tcp = PortSelector(port_range, select_tcp)
- self.udp = PortSelector(port_range, select_udp or select_tcp)
- def get_tcp_generator(self):
- return self.tcp
- def get_udp_generator(self):
- return self.udp
- def select_port_tcp(self):
- return self.tcp.select_port()
- def select_port_udp(self):
- return self.udp.select_port()
- def is_port_in_use_tcp(self, port):
- return self.tcp.is_port_in_use(port)
- def is_port_in_use_udp(self, port):
- return self.udp.is_port_in_use(port)
- def clone(self):
- class Tmp: pass
- clone = Tmp()
- clone.__class__ = type(self)
- clone.udp = self.udp.clone()
- clone.tcp = self.tcp.clone()
- return clone
- def __getattr__(self, attr):
- val = getattr(self.tcp, attr)
- if callable(val): # we proprably got a method here
- tcp_meth = val
- udp_meth = getattr(self.udp, attr)
- def double_method(*args, **kwargs):
- return (tcp_meth(*args, **kwargs), udp_meth(*args, **kwargs))
- return double_method # calling this function will call the method for both port-selectors
- else: # we have found a simple value, return a tuple containing the attribute-value from both port-selectors
- return (val, getattr(self.udp, attr))
- class PortSelectors:
- """
- To save some time this class contains some of the port-selection-strategies found in the wild. It is recommend to use
- .clone() to get your personal copy, otherwise two parts of your code might select ports on the same port-selector which
- is something your might want to avoid.
- """
- LINUX = ProtocolPortSelector(PortRanges.LINUX, PortSelectionStrategy.random())
- APPLE = ProtocolPortSelector(PortRanges.DYNAMIC_PORTS,
- PortSelectionStrategy.sequential(),
- PortSelectionStrategy.random())
- FREEBSD = ProtocolPortSelector(PortRanges.FREEBSD, PortSelectionStrategy.random())
- WINDOWS = ProtocolPortSelector(PortRanges.WINDOWS_7,
- PortSelectionStrategy.random()) # the selection strategy is a guess as i can't find more info on it
|