import copy
import random


# Information taken from https://www.cymru.com/jtk/misc/ephemeralports.html
class PortRanges:
    """Ephemeral (source) port ranges, expressed as ``range`` objects."""

    # Dynamic ports as listed by RFC 6056.
    DYNAMIC_PORTS = range(49152, 65536)

    LINUX = range(32768, 61001)
    FREEBSD = range(10000, 65536)

    APPLE_IOS = DYNAMIC_PORTS
    APPLE_OSX = DYNAMIC_PORTS

    WINDOWS_7 = DYNAMIC_PORTS
    WINDOWS_8 = DYNAMIC_PORTS
    WINDOWS_VISTA = DYNAMIC_PORTS
    WINDOWS_XP = range(1024, 5001)


# The strategies are callable classes instead of plain functions so that
# deep-copying a selector also copies the strategy's state.
class PortSelectionStrategy:
    """Namespace for callables that pick one port out of a ``range``."""

    class sequential:
        """Hand out ports in ascending order, wrapping around at the end."""

        def __init__(self):
            # -1 means "not started yet"; the first call seeds the counter
            # with the start of the range it is given.
            self.counter = -1

        def __call__(self, port_range):
            """Return the next port of *port_range*.

            Every call returns a port one higher than the previous call and
            restarts from ``port_range.start`` once the end of the range has
            been handed out.
            """
            if self.counter == -1:
                self.counter = port_range.start

            port = self.counter
            self.counter += 1
            # ``>=`` rather than ``==`` so the counter still wraps if this
            # instance is later called with a range that ends earlier.
            if self.counter >= port_range.stop:
                self.counter = port_range.start
            return port

    class random:
        """Pick a random port from the range on every call."""

        def __call__(self, port_range):
            """Return a random port from *port_range*."""
            # Inside a method body ``random`` resolves to the module-level
            # ``random`` module, not to this nested class of the same name.
            return random.randrange(port_range.start, port_range.stop)


class PortSelector:
    """Select ports from a range without returning the same port twice."""

    def __init__(self, port_range, select_function):
        """Create a selector.

        :param port_range: ``range`` of ports that may be selected
        :param select_function: callable taking the port range and
            returning one candidate port
        """
        self.port_range = port_range
        self._select_port = select_function
        # Ports handed out so far, oldest first.
        self.generated = []

    def select_port(self):
        """Return a port of the range that is not yet in use.

        :raises RuntimeError: if every port of the range is already in use
        """
        if len(self.generated) == len(self.port_range):
            raise RuntimeError("All %i port numbers were already generated, no more can be generated" % len(self.port_range))

        # Ask the strategy again until it proposes a port that is still free.
        while True:
            port = self._select_port(self.port_range)
            if port not in self.generated:
                self.generated.append(port)
                return port

    def is_port_in_use(self, port):
        """Return True if *port* has been handed out and not undone."""
        return port in self.generated

    def undo_port_use(self, port):
        """Mark *port* as free again.

        :raises ValueError: if *port* is not currently in use
        """
        if port in self.generated:
            self.generated.remove(port)
        else:
            raise ValueError("Port %i is not in use and thus can not be undone" % port)

    def reduce_size(self, size):
        """Forget all but the *size* most recently generated ports."""
        # ``lst[-0:]`` is the whole list, so a size of 0 needs its own branch
        # to actually empty the history.
        self.generated = self.generated[-size:] if size else []

    def clear(self):
        """Forget every generated port."""
        self.generated = []

    def clone(self):
        """Return an independent deep copy of this selector."""
        return copy.deepcopy(self)


class ProtocolPortSelector:
    """Pair of independent port selectors, one for TCP and one for UDP.

    Attributes and methods that are not defined here are forwarded to both
    underlying selectors (see ``__getattr__``).
    """

    def __init__(self, port_range, select_tcp, select_udp=None):
        """Create the TCP and UDP selectors over the same *port_range*.

        :param port_range: ``range`` of ports that may be selected
        :param select_tcp: selection strategy for TCP ports
        :param select_udp: selection strategy for UDP ports; when omitted
            the very same *select_tcp* object is used for UDP as well, so
            any state it keeps is shared between both selectors
        """
        self.tcp = PortSelector(port_range, select_tcp)
        self.udp = PortSelector(port_range, select_udp or select_tcp)

    def get_tcp_generator(self):
        """Return the underlying TCP ``PortSelector``."""
        return self.tcp

    def get_udp_generator(self):
        """Return the underlying UDP ``PortSelector``."""
        return self.udp

    def select_port_tcp(self):
        """Return a TCP port that is not yet in use."""
        return self.tcp.select_port()

    def select_port_udp(self):
        """Return a UDP port that is not yet in use."""
        return self.udp.select_port()

    def is_port_in_use_tcp(self, port):
        """Return True if *port* is in use for TCP."""
        return self.tcp.is_port_in_use(port)

    def is_port_in_use_udp(self, port):
        """Return True if *port* is in use for UDP."""
        return self.udp.is_port_in_use(port)

    def clone(self):
        """Return a copy whose TCP and UDP selectors are cloned separately."""
        cls = type(self)
        # Bypass __init__: the clone takes over already-built selectors.
        clone = cls.__new__(cls)
        clone.udp = self.udp.clone()
        clone.tcp = self.tcp.clone()
        return clone

    def __getattr__(self, attr):
        """Forward unknown attributes to both underlying selectors.

        Methods are wrapped so that one call invokes the TCP and then the
        UDP method with the same arguments and returns both results as a
        ``(tcp, udp)`` tuple; plain attributes are returned as a
        ``(tcp_value, udp_value)`` tuple.
        """
        # __getattr__ only runs when normal lookup failed. If that happens
        # for ``tcp``/``udp`` themselves (e.g. on a half-initialised copy),
        # forwarding would look them up again and recurse without end.
        if attr in ("tcp", "udp"):
            raise AttributeError(attr)

        val = getattr(self.tcp, attr)
        if callable(val):
            tcp_meth = val
            udp_meth = getattr(self.udp, attr)

            def double_method(*args, **kwargs):
                return (tcp_meth(*args, **kwargs), udp_meth(*args, **kwargs))

            return double_method
        else:
            return (val, getattr(self.udp, attr))


class PortSelectors:
    """Ready-made selectors for common operating systems.

    These are shared class-level instances; call ``clone()`` to obtain a
    selector with its own independent state.
    """

    # Strategies must be passed as instances: the selector calls them with
    # the port range as the only argument.
    LINUX = ProtocolPortSelector(PortRanges.LINUX, PortSelectionStrategy.random())
    APPLE = ProtocolPortSelector(PortRanges.DYNAMIC_PORTS,
                                 PortSelectionStrategy.sequential(),
                                 PortSelectionStrategy.random())
    FREEBSD = ProtocolPortSelector(PortRanges.FREEBSD, PortSelectionStrategy.random())
    # The selection strategy is a guess as I can't find more info on it.
    WINDOWS = ProtocolPortSelector(PortRanges.WINDOWS_7, PortSelectionStrategy.random())