123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133 |
- import random, copy
- # information taken from https://www.cymru.com/jtk/misc/ephemeralports.html
class PortRanges:
    """Ephemeral (source) port ranges, grouped by operating system.

    Every constant is a ``range`` object, so ``stop`` is exclusive: it is
    one past the highest port number belonging to the range.
    """

    # Dynamic ports as listed by RFC 6056.
    DYNAMIC_PORTS = range(49152, 65536)

    # Systems that have a range of their own.
    LINUX = range(32768, 61001)
    FREEBSD = range(10000, 65536)
    WINDOWS_XP = range(1024, 5001)

    # Systems that simply reuse the dynamic port range.
    APPLE_IOS = APPLE_OSX = DYNAMIC_PORTS
    WINDOWS_7 = WINDOWS_8 = WINDOWS_VISTA = DYNAMIC_PORTS
# The strategies below are callable classes rather than plain functions so that
# their state is duplicated by copy.deepcopy (functions are returned unchanged).
class PortSelectionStrategy:
    """Namespace for the callable strategies that pick a port from a range.

    Instances of the nested classes are called with a port range (any object
    exposing ``start`` and ``stop`` the way ``range`` does) and return one
    port number taken from it.
    """

    class sequential:
        """Hand out ports in ascending order, wrapping around at the end."""

        def __init__(self):
            # -1 means "no port handed out yet"; the first call replaces it
            # with the start of the range it is given.
            self.counter = -1

        def __call__(self, port_range):
            """Return the next port of *port_range*.

            Each call returns a port one higher than the previous call and
            restarts from ``port_range.start`` once ``port_range.stop`` is
            reached.
            """
            # Covers the initial -1 sentinel as well as a cursor left behind
            # by an earlier call that used a different range.
            if not (port_range.start <= self.counter < port_range.stop):
                self.counter = port_range.start

            port = self.counter

            self.counter += 1
            if self.counter >= port_range.stop:
                self.counter = port_range.start

            return port

    class random:
        """Pick a uniformly random port from the range on every call."""

        def __call__(self, port_range):
            """Return a random port with ``start <= port < stop``."""
            # Inside a method body the name `random` resolves to the imported
            # module, not to this class: class-body names are not part of the
            # scope of nested functions.
            return random.randrange(port_range.start, port_range.stop)
class PortSelector:
    """Hands out port numbers from a range without ever repeating one.

    The actual choice is delegated to *select_function*; this class only
    remembers which ports were already handed out and retries until the
    strategy produces a port that is still free.
    """

    def __init__(self, port_range, select_function):
        """Create a selector.

        :param port_range: sized collection of candidate ports; it is passed
            to *select_function* on every selection and its length bounds how
            many ports can be generated.
        :param select_function: callable taking the port range and returning
            a port number.
        """
        self.port_range = port_range

        self._select_port = select_function

        # Ports handed out so far, oldest first (reduce_size relies on order).
        self.generated = []

    def select_port(self):
        """Return a port that has not been handed out yet and remember it.

        :raises RuntimeError: if every port of the range is already in use.
        """
        # ">=" rather than "==" so the check still fires if `generated` was
        # grown beyond the range from the outside.
        if len(self.generated) >= len(self.port_range):
            raise RuntimeError("All %i port numbers were already generated, no more can be generated" % len(self.port_range))

        # Keep asking the strategy until it yields a port that is still free.
        while True:
            port = self._select_port(self.port_range)

            if port not in self.generated:
                self.generated.append(port)
                return port

    def is_port_in_use(self, port):
        """Return True if *port* was handed out and not undone since."""
        return port in self.generated

    def undo_port_use(self, port):
        """Mark *port* as free again.

        :raises ValueError: if *port* is not currently in use.
        """
        try:
            self.generated.remove(port)
        except ValueError:
            raise ValueError("Port %i is not in use and thus can not be undone" % port) from None

    def reduce_size(self, size):
        """Forget all but the *size* most recently generated ports.

        :raises ValueError: if *size* is negative.
        """
        if size < 0:
            raise ValueError("size must not be negative")

        # generated[-0:] would be the whole list, so zero needs its own branch.
        self.generated = self.generated[-size:] if size else []

    def clear(self):
        """Forget every generated port."""
        self.generated = []

    def clone(self):
        """Return an independent deep copy of this selector."""
        return copy.deepcopy(self)
class ProtocolPortSelector:
    """Pair of independent port selectors, one for TCP and one for UDP.

    Attributes and methods that this class does not define itself are
    forwarded to both selectors at once (see ``__getattr__``).
    """

    def __init__(self, port_range, select_tcp, select_udp = None):
        """Create the TCP and UDP selectors over the same *port_range*.

        :param port_range: range of candidate ports shared by both protocols.
        :param select_tcp: selection strategy used for TCP.
        :param select_udp: selection strategy used for UDP; falls back to
            *select_tcp* (the very same object) when omitted.
        """
        self.tcp = PortSelector(port_range, select_tcp)
        self.udp = PortSelector(port_range, select_udp or select_tcp)

    def get_tcp_generator(self):
        """Return the selector used for TCP ports."""
        return self.tcp

    def get_udp_generator(self):
        """Return the selector used for UDP ports."""
        return self.udp

    def select_port_tcp(self):
        """Select and return an unused TCP port."""
        return self.tcp.select_port()

    def select_port_udp(self):
        """Select and return an unused UDP port."""
        return self.udp.select_port()

    def is_port_in_use_tcp(self, port):
        """Return True if *port* is currently in use for TCP."""
        return self.tcp.is_port_in_use(port)

    def is_port_in_use_udp(self, port):
        """Return True if *port* is currently in use for UDP."""
        return self.udp.is_port_in_use(port)

    def clone(self):
        """Return a new selector pair whose TCP and UDP selectors are clones."""
        # __new__ skips __init__, which would build fresh selectors only for
        # them to be replaced right away.
        cls = type(self)
        clone = cls.__new__(cls)

        clone.udp = self.udp.clone()
        clone.tcp = self.tcp.clone()

        return clone

    def __getattr__(self, attr):
        """Forward unknown attributes to both selectors.

        A callable attribute yields a function that calls the TCP and then the
        UDP counterpart with the same arguments and returns both results as a
        ``(tcp, udp)`` tuple; any other attribute yields the ``(tcp, udp)``
        tuple of values directly.

        :raises AttributeError: if the TCP selector lacks the attribute, or if
            the selectors themselves have not been set on this instance.
        """
        # __getattr__ only runs after normal lookup failed, so being asked for
        # "tcp"/"udp" means the instance is not initialised (e.g. while it is
        # being rebuilt by copy/pickle). Reading self.tcp below would then
        # re-enter this method forever.
        if attr in ("tcp", "udp"):
            raise AttributeError(attr)

        val = getattr(self.tcp, attr)

        if callable(val):
            tcp_meth = val
            udp_meth = getattr(self.udp, attr)

            def double_method(*args, **kwargs):
                return (tcp_meth(*args, **kwargs), udp_meth(*args, **kwargs))

            return double_method
        else:
            return (val, getattr(self.udp, attr))
class PortSelectors:
    """Ready-made TCP/UDP port selectors for common operating systems.

    Each constant is a single shared ``ProtocolPortSelector`` instance that
    keeps track of the ports it handed out; use its ``clone()`` method to get
    an independent copy.
    """

    # The strategies must be passed as instances: a selector calls its
    # strategy with the port range, which on the bare class would try to
    # construct it with an argument instead of selecting a port.
    LINUX = ProtocolPortSelector(PortRanges.LINUX, PortSelectionStrategy.random())
    APPLE = ProtocolPortSelector(PortRanges.DYNAMIC_PORTS,
                                 PortSelectionStrategy.sequential(),
                                 PortSelectionStrategy.random())
    FREEBSD = ProtocolPortSelector(PortRanges.FREEBSD, PortSelectionStrategy.random())
    # The selection strategy is a guess, as no further information on it
    # could be found.
    WINDOWS = ProtocolPortSelector(PortRanges.WINDOWS_7, PortSelectionStrategy.random())
|