import copy
import random


# information taken from https://www.cymru.com/jtk/misc/ephemeralports.html
class PortRanges:
    """Named port ranges, grouped by the operating system they are named after."""

    # dynamic ports as listed by RFC 6056
    DYNAMIC_PORTS = range(49152, 65536)

    LINUX = range(32768, 61001)
    FREEBSD = range(10000, 65536)

    APPLE_IOS = DYNAMIC_PORTS
    APPLE_OSX = DYNAMIC_PORTS

    WINDOWS_7 = DYNAMIC_PORTS
    WINDOWS_8 = DYNAMIC_PORTS
    WINDOWS_VISTA = DYNAMIC_PORTS
    WINDOWS_XP = range(1024, 5001)


# This class uses classes instead of functions so deepcloning works
class PortSelectionStrategy:
    """
    Namespace for the available port-selection strategies.

    Every strategy is a class whose instances are callable with a range object
    and return one port number out of that range.
    """

    class sequential:
        """Hand out the ports of a range one after another, wrapping around at the end."""

        def __init__(self):
            # -1 marks "no port handed out yet"; the first call replaces it
            # with the start of the range it is given
            self.counter = -1

        def __call__(self, port_range):
            """
            Return a port one higher than the one returned before, restarting
            from the start of the range once the highest value was reached.

            :param port_range: a range-object containing the ports to choose from
            :return: the selected port number
            """
            if self.counter == -1:
                self.counter = port_range.start

            port = self.counter
            self.counter += 1
            # stop is exclusive, so reaching it means the range is used up
            if self.counter >= port_range.stop:
                self.counter = port_range.start
            return port

    class random:
        """Pick a uniformly distributed random port out of a range."""

        def __call__(self, port_range):
            """
            :param port_range: a range-object containing the ports to choose from
            :return: the selected port number
            """
            # inside a method the name `random` resolves to the module, not to
            # this class (class-scope names are not visible in method bodies)
            return random.randrange(port_range.start, port_range.stop)


class PortSelector:
    """
    This class simulates a port-selection-process. Instances keep a list of port-numbers they generated so
    the same port-number will not be generated again.
    """

    def __init__(self, port_range, select_function):
        """
        Create a PortSelector given a range of ports to choose from and a function that chooses the next port

        :param port_range: a range-object containing the range of ports to choose from
        :param select_function: a function that receives the port_range and selects a port
        :raises ValueError: if port_range is empty or reaches outside of the ports 1 to 65535
        """
        if len(port_range) == 0:
            raise ValueError("cannot choose from an empty range")
        # stop is exclusive: a stop of 65536 still means the highest port is 65535
        if port_range.start < 1 or port_range.stop > 65536:
            raise ValueError("port_range is no subset of the valid port-range")

        self.port_range = port_range
        self._select_port = select_function
        self.generated = []

    def select_port(self):
        """
        Select a port that was not selected before.

        :return: the selected port number
        :raises RuntimeError: if every port of the range was already generated
        """
        # do this check to avoid endless loops
        if len(self.generated) >= len(self.port_range):
            raise RuntimeError("All %i port numbers were already generated, no more can be generated"
                               % len(self.port_range))

        # ask the strategy again until it yields a port that is still unused
        while True:
            port = self._select_port(self.port_range)
            if port not in self.generated:
                self.generated.append(port)
                return port

    def is_port_in_use(self, port: int):
        """Return True if the given port was generated and not undone since."""
        return port in self.generated

    def undo_port_use(self, port: int):
        """
        Mark a previously generated port as unused so it can be generated again.

        :raises ValueError: if the port is not in the list of generated ports
        """
        if port in self.generated:
            self.generated.remove(port)
        else:
            raise ValueError("Port %i is not in use and thus can not be undone" % port)

    def reduce_size(self, size: int):
        """
        Reduce the list of already generated ports to the last <size> generated.
        If size is bigger than the number of generated ports nothing happens.

        :param size: the number of most recently generated ports to keep
        :raises ValueError: if size is negative
        """
        if size < 0:
            raise ValueError("size must not be negative")
        # a plain [-size:] would keep everything for size == 0, as -0 == 0
        self.generated = self.generated[-size:] if size else []

    def clear(self):
        """
        Clear the list of generated ports. As of now this does not reset the state of the selection-function
        """
        self.generated = []

    def clone(self):
        """Return a deep copy of this selector."""
        return copy.deepcopy(self)


class ProtocolPortSelector:
    """
    This class contains a method to select ports for udp and tcp. It generally consists of the port-selectors, one
    for tcp and one for udp. For convenience this class has a __getattr__-method to call methods on both selectors
    at once. E.g, clear() does not exist for ProtocolPortSelector but it does for PortSelector, therefore
    protocolPortSelector.clear() will call clear for both port-selectors.

    If no separate udp-strategy is given, both port-selectors use the very same strategy object.
    """

    def __init__(self, port_range, select_tcp, select_udp=None):
        """
        :param port_range: a range-object containing the range of ports to choose from
        :param select_tcp: the selection-function used for tcp
        :param select_udp: the selection-function used for udp, defaults to select_tcp
        """
        if select_udp is None:
            select_udp = select_tcp
        self.tcp = PortSelector(port_range, select_tcp)
        self.udp = PortSelector(port_range, select_udp)

    def get_tcp_generator(self):
        """Return the PortSelector used for tcp."""
        return self.tcp

    def get_udp_generator(self):
        """Return the PortSelector used for udp."""
        return self.udp

    def select_port_tcp(self):
        """Select an unused tcp port."""
        return self.tcp.select_port()

    def select_port_udp(self):
        """Select an unused udp port."""
        return self.udp.select_port()

    def is_port_in_use_tcp(self, port):
        """Return True if the tcp-selector generated the given port."""
        return self.tcp.is_port_in_use(port)

    def is_port_in_use_udp(self, port):
        """Return True if the udp-selector generated the given port."""
        return self.udp.is_port_in_use(port)

    def clone(self):
        """Return a copy of this object holding clones of both port-selectors."""
        # create the object without running __init__, then attach the clones
        duplicate = type(self).__new__(type(self))
        duplicate.udp = self.udp.clone()
        duplicate.tcp = self.tcp.clone()
        return duplicate

    def __getattr__(self, attr):
        """
        Forward the lookup of unknown attributes to both port-selectors.

        :return: for methods a function that calls the method on both selectors and returns the tuple
                 (tcp_result, udp_result), for plain values the tuple (tcp_value, udp_value)
        :raises AttributeError: if the port-selectors do not have the attribute
        """
        # __getattr__ only runs when the normal lookup failed. If that happens for
        # tcp/udp themselves the object is not initialised yet (e.g. while it is
        # being copied) and forwarding would recurse endlessly. Special methods
        # are never forwarded either.
        if attr in ("tcp", "udp") or (attr.startswith("__") and attr.endswith("__")):
            raise AttributeError("%r object has no attribute %r" % (type(self).__name__, attr))

        val = getattr(self.tcp, attr)
        if callable(val):  # we probably got a method here
            tcp_meth = val
            udp_meth = getattr(self.udp, attr)

            def double_method(*args, **kwargs):
                return (tcp_meth(*args, **kwargs), udp_meth(*args, **kwargs))

            return double_method  # calling this function will call the method for both port-selectors
        else:
            # we have found a simple value, return a tuple containing the attribute-value from both port-selectors
            return (val, getattr(self.udp, attr))


class PortSelectors:
    """
    To save some time this class contains some of the port-selection-strategies found in the wild. It is recommended
    to use .clone() to get your personal copy, otherwise two parts of your code might select ports on the same
    port-selector which is something you might want to avoid.
    """
    # the strategies have to be passed as instances, the classes themselves can not be called with a port-range
    LINUX = ProtocolPortSelector(PortRanges.LINUX, PortSelectionStrategy.random())
    APPLE = ProtocolPortSelector(PortRanges.DYNAMIC_PORTS,
                                 PortSelectionStrategy.sequential(),
                                 PortSelectionStrategy.random())
    FREEBSD = ProtocolPortSelector(PortRanges.FREEBSD, PortSelectionStrategy.random())
    # the selection strategy is a guess as i can't find more info on it
    WINDOWS = ProtocolPortSelector(PortRanges.WINDOWS_7, PortSelectionStrategy.random())