|
@@ -0,0 +1,123 @@
|
|
|
+import random
|
|
|
+
|
|
|
+# information taken from https://www.cymru.com/jtk/misc/ephemeralports.html
|
|
|
+class PortRanges:
|
|
|
+ # dynamic ports as listed by RFC 6056
|
|
|
+ DYNAMIC_PORTS = range(49152, 65536)
|
|
|
+
|
|
|
+ LINUX = range(32768, 61001)
|
|
|
+ FREEBSD = range(10000, 65536)
|
|
|
+
|
|
|
+ APPLE_IOS = DYNAMIC_PORTS
|
|
|
+ APPLE_OSX = DYNAMIC_PORTS
|
|
|
+
|
|
|
+ WINDOWS_7 = DYNAMIC_PORTS
|
|
|
+ WINDOWS_8 = DYNAMIC_PORTS
|
|
|
+ WINDOWS_VISTA = DYNAMIC_PORTS
|
|
|
+ WINDOWS_XP = range(1024, 5001)
|
|
|
+
|
|
|
+class PortSelectionStrategy:
|
|
|
+ @staticmethod
|
|
|
+ def sequential():
|
|
|
+ counter = -1
|
|
|
+
|
|
|
+ # that function will always return a one higher counter than before,
|
|
|
+ # restarting from the start once it reached the highest value
|
|
|
+ def select_port(port_range):
|
|
|
+ global counter
|
|
|
+ if counter == -1:
|
|
|
+ counter = port_range.start
|
|
|
+
|
|
|
+ port = counter
|
|
|
+
|
|
|
+ counter += 1
|
|
|
+ if counter == port_range.stop:
|
|
|
+ counter = port_range.start
|
|
|
+
|
|
|
+ return port
|
|
|
+
|
|
|
+ return select_port
|
|
|
+
|
|
|
+ @staticmethod
|
|
|
+ def random(port_range):
|
|
|
+ return random.randrange(port_range.start, port_range.stop)
|
|
|
+
|
|
|
+class PortSelector:
|
|
|
+ def __init__(self, port_range, select_function):
|
|
|
+ self.port_range = port_range
|
|
|
+
|
|
|
+ self._select_port = select_function
|
|
|
+
|
|
|
+ self.generated = []
|
|
|
+
|
|
|
+ def select_port(self):
|
|
|
+ if len(self.generated) == len(self.port_range):
|
|
|
+ raise RuntimeError("All %i port numbers were already generated, no more can be generated" % len(self.port_range))
|
|
|
+
|
|
|
+ while True:
|
|
|
+ port = self._select_port(self.port_range)
|
|
|
+
|
|
|
+ if port not in self.generated:
|
|
|
+ self.generated.append(port)
|
|
|
+ return port
|
|
|
+
|
|
|
+ def is_port_in_use(self, port):
|
|
|
+ return port in self.generated
|
|
|
+
|
|
|
+ def undo_port_use(self, port):
|
|
|
+ if port in self.generated:
|
|
|
+ self.generated.remove(port)
|
|
|
+ else:
|
|
|
+ raise ValueError("Port %i is not in use and thus can not be undone" % port)
|
|
|
+
|
|
|
+ def reduce_size(self, size):
|
|
|
+ self.generated = self.generated[-size:]
|
|
|
+
|
|
|
+ def clear(self):
|
|
|
+ self.generated = []
|
|
|
+
|
|
|
+class ProtocolPortSelector:
|
|
|
+ def __init__(self, port_range, select_tcp, select_udp = None):
|
|
|
+ self.tcp = PortSelector(port_range, select_tcp)
|
|
|
+ self.udp = PortSelector(port_range, select_udp or select_tcp)
|
|
|
+
|
|
|
+ def get_tcp_generator(self):
|
|
|
+ return self.tcp
|
|
|
+
|
|
|
+ def get_udp_generator(self):
|
|
|
+ return self.udp
|
|
|
+
|
|
|
+ def select_port_tcp(self):
|
|
|
+ return self.tcp.select_port()
|
|
|
+
|
|
|
+ def select_port_udp(self):
|
|
|
+ return self.udp.select_port()
|
|
|
+
|
|
|
+ def is_port_in_use_tcp(self, port):
|
|
|
+ return self.tcp.is_port_in_use(port)
|
|
|
+
|
|
|
+ def is_port_in_use_udp(self, port):
|
|
|
+ return self.udp.is_port_in_use(port)
|
|
|
+
|
|
|
+ def __getattr__(self, attr):
|
|
|
+ val = getattr(self.tcp, attr)
|
|
|
+
|
|
|
+ if callable(val):
|
|
|
+ tcp_meth = val
|
|
|
+ udp_meth = getattr(self.udp, attr)
|
|
|
+
|
|
|
+ def double_method(*args, **kwargs):
|
|
|
+ return (tcp_meth(*args, **kwargs), udp_meth(*args, **kwargs))
|
|
|
+
|
|
|
+ return double_method
|
|
|
+ else:
|
|
|
+ return (val, getattr(self.udp, attr))
|
|
|
+
|
|
|
+class PortSelectors:
|
|
|
+ LINUX = ProtocolPortSelector(PortRanges.LINUX, PortSelectionStrategy.random)
|
|
|
+ APPLE = ProtocolPortSelector(PortRanges.DYNAMIC_PORTS,
|
|
|
+ PortSelectionStrategy.sequential(),
|
|
|
+ PortSelectionStrategy.random())
|
|
|
+ FREEBSD = ProtocolPortSelector(PortRanges.FREEBSD, PortSelectionStrategy.random)
|
|
|
+ WINDOWS = ProtocolPortSelector(PortRanges.WINDOWS_7, PortSelectionStrategy.random) # the selection strategy is a guess as i can't find more info on it
|
|
|
+
|