123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170 |
- import random, copy
- # information taken from https://www.cymru.com/jtk/misc/ephemeralports.html
- class PortRanges:
- # dynamic ports as listed by RFC 6056
- DYNAMIC_PORTS = range(49152, 65536)
-
- LINUX = range(32768, 61001)
- FREEBSD = range(10000, 65536)
-
- APPLE_IOS = DYNAMIC_PORTS
- APPLE_OSX = DYNAMIC_PORTS
-
- WINDOWS_7 = DYNAMIC_PORTS
- WINDOWS_8 = DYNAMIC_PORTS
- WINDOWS_VISTA = DYNAMIC_PORTS
- WINDOWS_XP = range(1024, 5001)
- # This class uses classes instead of functions so deepcloning works
- class PortSelectionStrategy:
- class sequential:
- def __init__(self):
- self.counter = -1
-
- # that function will always return a one higher counter than before,
- # restarting from the start once it reached the highest value
- def __call__(port_range):
- if self.counter == -1:
- self.counter = port_range.start
-
- port = counter
-
- self.counter += 1
- if self.counter == port_range.stop:
- self.counter = port_range.start
-
- return port
- class random:
- def __call__(port_range):
- return random.randrange(port_range.start, port_range.stop)
- class PortSelector:
- """
- This class simulates a port-selection-process. Instances keep a list of port-numbers they generated so
- the same port-number will not be generated again.
- """
-
- def __init__(self, port_range, select_function):
- """
- Create a PortSelector given a range of ports to choose from and a function that chooses the next port
-
- :param port_range: a range-object containing the range of ports to choose from
- :param select_function: a function that receives the port_range and selects a port
- """
-
- if len(port_range) == 0:
- raise ValueError("cannot choose from an empty range")
- if x.start not in range(1, 65536) or x.stop not in range(1, 65536):
- raise ValueError("port_range is no subset of the valid port-range")
-
- self.port_range = port_range
-
- self._select_port = select_function
-
- self.generated = []
-
- def select_port(self):
- # do this check to avoid endless loops
- if len(self.generated) == len(self.port_range):
- raise RuntimeError("All %i port numbers were already generated, no more can be generated" % len(self.port_range))
-
- while True:
- port = self._select_port(self.port_range)
-
- if port not in self.generated:
- self.generated.append(port)
- return port
-
- def is_port_in_use(self, port: int):
- return port in self.generated
-
- def undo_port_use(self, port: int):
- if port in self.generated:
- self.generated.remove(port)
- else:
- raise ValueError("Port %i is not in use and thus can not be undone" % port)
-
- def reduce_size(self, size: int):
- """
- Reduce the list of already generated ports to the last <size> generated.
- If size if bigger than the number of generated ports nothing happens.
- """
- self.generated = self.generated[-size:]
-
- def clear(self):
- """
- Clear the list of generated ports. As of now this does not reset the state of the selection-function
- """
- self.generated = []
-
- def clone(self):
- return copy.deepcopy(self)
- class ProtocolPortSelector:
- """
- This class contains a method to select ports for udp and tcp. It generally consists of the port-selectors, one
- for tcp and one for udp. For convenience this class has a __getattr__-method to call methods on both selectors
- at once. E.g, clear() does not exist for ProtocolPortSelector but it does for PortSelector, therefore
- protocolPortSelector.clear() will call clear for both port-selectors.
- """
-
- def __init__(self, port_range, select_tcp, select_udp = None):
- self.tcp = PortSelector(port_range, select_tcp)
- self.udp = PortSelector(port_range, select_udp or select_tcp)
-
- def get_tcp_generator(self):
- return self.tcp
-
- def get_udp_generator(self):
- return self.udp
-
- def select_port_tcp(self):
- return self.tcp.select_port()
-
- def select_port_udp(self):
- return self.udp.select_port()
-
- def is_port_in_use_tcp(self, port):
- return self.tcp.is_port_in_use(port)
-
- def is_port_in_use_udp(self, port):
- return self.udp.is_port_in_use(port)
-
- def clone(self):
- class Tmp: pass
- clone = Tmp()
- clone.__class__ = type(self)
-
- clone.udp = self.udp.clone()
- clone.tcp = self.tcp.clone()
-
- return clone
-
- def __getattr__(self, attr):
- val = getattr(self.tcp, attr)
-
- if callable(val): # we proprably got a method here
- tcp_meth = val
- udp_meth = getattr(self.udp, attr)
-
- def double_method(*args, **kwargs):
- return (tcp_meth(*args, **kwargs), udp_meth(*args, **kwargs))
-
- return double_method # calling this function will call the method for both port-selectors
- else: # we have found a simple value, return a tuple containing the attribute-value from both port-selectors
- return (val, getattr(self.udp, attr))
- class PortSelectors:
- """
- To save some time this class contains some of the port-selection-strategies found in the wild. It is recommend to use
- .clone() to get your personal copy, otherwise two parts of your code might select ports on the same port-selector which
- is something your might want to avoid.
- """
- LINUX = ProtocolPortSelector(PortRanges.LINUX, PortSelectionStrategy.random)
- APPLE = ProtocolPortSelector(PortRanges.DYNAMIC_PORTS,
- PortSelectionStrategy.sequential(),
- PortSelectionStrategy.random())
- FREEBSD = ProtocolPortSelector(PortRanges.FREEBSD, PortSelectionStrategy.random)
- WINDOWS = ProtocolPortSelector(PortRanges.WINDOWS_7, PortSelectionStrategy.random) # the selection strategy is a guess as i can't find more info on it
|