from scapy.packet import Raw import numpy.random as random2 import random import string from numpy.random import bytes from random import getrandbits from scapy.layers.inet import IP, Ether, UDP, TCP from scapy.packet import Raw from Attack.MembersMgmtCommAttack import MessageType from . import IPv4 as ip '''PaddingGenerator ''' def add_padding(packet, bytes_padding = 0, user_padding=True, rnd = False): ''' Adds padding to a packet with the given amount of bytes, but a maximum of 100 bytes. :param packet: the packet that will be extended with the additional payload :param bytes_padding: the amount of bytes that will be appended to the packet :param user_padding: true, if the function add_padding is called from another class and the user sets the padding manually :param rnd: adds a random padding betwing 0 and bytes_adding, if true :return: the initial packet, extended with the wanted amount of bytes of padding ''' if(user_padding and bytes_padding > 100): bytes_padding = 100 if (rnd is True): r = int(round(bytes_padding / 4)) #sets bytes_padding to any number between 0 and bytes_padding bytes_padding = random2.random_integers(0, r) * 4 #, that's dividable by 4 payload = generate_payload(bytes_padding) packet[Raw].load += Raw(load=payload).load return packet def equal_length(list_of_packets, length = 0, padding = 0): ''' Equals the length of a given set of packets on the given length. If the given length is smaller than the largest packet, all the other packets are extended to the largest packet's length. :param list_of_packets: The given set of packet. :param length: The length each packet should have. :return: The set of extended packets. ''' largest_packet = length for packet in list_of_packets: packet_length = len(packet) if(packet_length > largest_packet): largest_packet = packet_length for packet in list_of_packets: bytes_padding = largest_packet - len(packet) if(bytes_padding > 0): add_padding(packet, bytes_padding, False, False) add_padding(packet, padding, False, True) return list_of_packets '''PayloadGenerator ''' def generate_payload(size:int=0): """ Generates a payload of random bytes of the given amount :param size: number of generated bytes :return: the generated payload """ payload = bytes(size) return payload '''PortGenerator ''' def gen_random_server_port(offset: int=2199): """ Generates a valid random first and last character for a bots hostname and computes a port from these two characters. The default offset is chosen from a Sality implementation in 2011 """ firstLetter = random.choice(string.ascii_letters); lastLetter = random.choice(string.ascii_letters + string.digits); return (offset + ord(firstLetter) * ord(lastLetter)); '''MacAddressGenerator ''' class MacAddressGenerator: def __init__(self, include_broadcast_macs=False, include_virtual_macs=False): self.broadcast = include_broadcast_macs self.virtual = include_virtual_macs self.generated = set() def random_mac(self) -> str: while True: mac = self._random_mac() if mac not in self.generated: self.generated.add(mac) return mac def clear(self): self.generated.clear() def generates_broadcast_macs(self) -> bool: return self.broadcast def generates_virtual_macs(self) -> bool: return self.virtual def set_broadcast_generation(self, broadcast: bool): self.broadcast = broadcast def set_virtual_generation(self, virtual: bool): self.virtual = virtual def _random_mac(self) -> str: mac_bytes = bytearray(getrandbits(8) for i in range(6)) if not self.broadcast: mac_bytes[0] &= ~1 # clear the first bytes' first bit if not self.virtual: mac_bytes[0] &= ~2 # clear the first bytes' second bit return ":".join("%02X" % b for b in mac_bytes) '''PacketGenerator ''' class PacketGenerator(): """ Creates packets, based on the set protocol """ def __init__(self, protocol="udp"): """ Creates a new Packet_Generator Object :param protocol: the protocol of the packets to be created, udp or tcp """ super(PacketGenerator, self).__init__() self.protocol = protocol def generate_packet(self, ip_src: str = "192.168.64.32", ip_dst: str = "192.168.64.48", mac_src: str = "56:6D:D9:BC:70:1C", mac_dst: str = "F4:2B:95:B3:0E:1A", port_src: int = 1337, port_dst: int = 6442, ttl: int = 64, tcpflags: str = "S", payload: str = ""): """ Creates a Packet with the specified Values for the current protocol :param ip_src: the source IP address of the IP header :param ip_dst the destination IP address of the IP header :param mac_src: the source MAC address of the MAC header :param mac_dst: the destination MAC address of the MAC header :param port_src: the source port of the header :param port_dst: the destination port of the header :param ttl: the ttl Value of the packet :param tcpflags: the TCP flags of the TCP header :param payload: the payload of the packet :return: the corresponding packet """ if (self.protocol == "udp"): packet = generate_udp_packet(ip_src=ip_src, ip_dst=ip_dst, mac_src=mac_src, mac_dst=mac_dst, ttl=ttl, port_src=port_src, port_dst=port_dst, payload=payload) elif (self.protocol == "tcp"): packet = generate_tcp_packet(ip_src=ip_src, ip_dst=ip_dst, mac_src=mac_src, mac_dst=mac_dst, ttl=ttl, port_src=port_src, port_dst=port_dst, tcpflags=tcpflags, payload=payload) return packet def generate_mmcom_packet(self, ip_src: str = "192.168.64.32", ip_dst: str = "192.168.64.48", mac_src: str = "56:6D:D9:BC:70:1C", mac_dst: str = "F4:2B:95:B3:0E:1A", port_src: int = 1337, port_dst: int = 6442, tcpflags: str = "S", ttl: int = 64, message_type: MessageType = MessageType.SALITY_HELLO, neighborlist_entries: int = 1): """ Creates a Packet for Members-Management-Communication with the specified Values and the current protocol :param ip_src: the source IP address of the IP header :param ip_dst the destination IP address of the IP header :param mac_src: the source MAC address of the MAC header :param mac_dst: the destination MAC address of the MAC header :param port_src: the source port of the header :param port_dst: the destination port of the header :param tcpflags: the TCP flags of the TCP header, if tcp is selected as protocol :param ttl: the ttl Value of the packet :param message_type: affects the size of the payload :param neighborlist_entries: number of entries of a Neighbourlist-reply, affects the size of the payload :return: the corresponding packet """ # Determine length of the payload that has to be generated if (message_type == MessageType.SALITY_HELLO): payload_len = 0 elif (message_type == MessageType.SALITY_HELLO_REPLY): payload_len = 22 elif (message_type == MessageType.SALITY_NL_REQUEST): payload_len = 28 elif (message_type == MessageType.SALITY_NL_REPLY): payload_len = 24 + 6 * neighborlist_entries else: payload_len = 0 payload = generate_payload(payload_len) if (self.protocol == "udp"): packet = generate_udp_packet(ip_src=ip_src, ip_dst=ip_dst, mac_src=mac_src, mac_dst=mac_dst, ttl=ttl, port_src=port_src, port_dst=port_dst, payload=payload) elif (self.protocol == "tcp"): packet = generate_tcp_packet(ip_src=ip_src, ip_dst=ip_dst, mac_src=mac_src, mac_dst=mac_dst, ttl=ttl, port_src=port_src, port_dst=port_dst, tcpflags=tcpflags, payload=payload) else: print("Error: unsupported protocol for generating Packets") return packet def generate_tcp_packet(ip_src: str = "192.168.64.32", ip_dst: str = "192.168.64.48", mac_src: str = "56:6D:D9:BC:70:1C", ttl: int = 64, mac_dst: str = "F4:2B:95:B3:0E:1A", port_src: int = 1337, port_dst: int = 6442, tcpflags: str = "S", payload: str = ""): """ Builds a TCP packet with the values specified by the caller. :param ip_src: the source IP address of the IP header :param ip_dst the destination IP address of the IP header :param mac_src: the source MAC address of the MAC header :param ttl: the ttl value of the packet :param mac_dst: the destination MAC address of the MAC header :param port_src: the source port of the TCP header :param port_dst: the destination port of the TCP header :param tcpflags: the TCP flags of the TCP header :param payload: the payload of the packet :return: the corresponding TCP packet """ ether = Ether(src=mac_src, dst=mac_dst) ip = IP(src=ip_src, dst=ip_dst, ttl=ttl) tcp = TCP(sport=port_src, dport=port_dst, flags=tcpflags) packet = ether / ip / tcp / Raw(load=payload) return packet def generate_udp_packet(ip_src: str = "192.168.64.32", ip_dst: str = "192.168.64.48", mac_src: str = "56:6D:D9:BC:70:1C", ttl: int = 64, mac_dst: str = "F4:2B:95:B3:0E:1A", port_src: int = 1337, port_dst: int = 6442, payload: str = ""): """ Builds an UDP packet with the values specified by the caller. :param ip_src: the source IP address of the IP header :param ip_dst the destination IP address of the IP header :param mac_src: the source MAC address of the MAC header :param ttl: the ttl value of the packet :param mac_dst: the destination MAC address of the MAC header :param port_src: the source port of the UDP header :param port_dst: the destination port of the UDP header :param payload: the payload of the packet :return: the corresponding UDP packet """ ether = Ether(src=mac_src, dst=mac_dst) ip = IP(src=ip_src, dst=ip_dst, ttl=ttl) udp = UDP(sport=port_src, dport=port_dst) packet = ether / ip / udp / Raw(load=payload) return packet '''IPGenerator ''' class IPChooser: def random_ip(self): return ip.IPAddress.from_int(random.randrange(0, 1 << 32)) def size(self): return 1 << 32 def __len__(self): return self.size() class IPChooserByRange(IPChooser): def __init__(self, ip_range): self.range = ip_range def random_ip(self): start = int(self.range.first_address()) end = start + self.range.block_size() return ip.IPAddress.from_int(random.randrange(start, end)) def size(self): return self.range.block_size() class IPChooserByList(IPChooser): def __init__(self, ips): self.ips = list(ips) if not self.ips: raise ValueError("list of ips must not be empty") def random_ip(self): return random.choice(self.ips) def size(self): return len(self.ips) class IPGenerator: def __init__(self, ip_chooser=IPChooser(), # include all ip-addresses by default (before the blacklist) include_private_ips=False, include_localhost=False, include_multicast=False, include_reserved=False, include_link_local=False, blacklist=None): self.blacklist = [] self.generated_ips = set() if not include_private_ips: for segment in ip.ReservedIPBlocks.PRIVATE_IP_SEGMENTS: self.add_to_blacklist(segment) if not include_localhost: self.add_to_blacklist(ip.ReservedIPBlocks.LOCALHOST_SEGMENT) if not include_multicast: self.add_to_blacklist(ip.ReservedIPBlocks.MULTICAST_SEGMENT) if not include_reserved: self.add_to_blacklist(ip.ReservedIPBlocks.RESERVED_SEGMENT) if not include_link_local: self.add_to_blacklist(ip.ReservedIPBlocks.ZERO_CONF_SEGMENT) if blacklist: for segment in blacklist: self.add_to_blacklist(segment) self.chooser = ip_chooser @staticmethod def from_range(range, *args, **kwargs): return IPGenerator(IPChooserByRange(range), *args, **kwargs) def add_to_blacklist(self, ip_segment): if isinstance(ip_segment, ip.IPAddressBlock): self.blacklist.append(ip_segment) else: self.blacklist.append(ip.IPAddressBlock.parse(ip_segment)) def random_ip(self): if len(self.generated_ips) == self.chooser.size(): raise ValueError("Exhausted the space of possible ip-addresses, no new unique ip-address can be generated") while True: random_ip = self.chooser.random_ip() if not self._is_in_blacklist(random_ip) and random_ip not in self.generated_ips: self.generated_ips.add(random_ip) return str(random_ip) def clear(self, clear_blacklist=True, clear_generated_ips=True): if clear_blacklist: self.blacklist.clear() if clear_generated_ips: self.generated_ips.clear() def _is_in_blacklist(self, ip: ip.IPAddress): return any(ip in block for block in self.blacklist) class MappingIPGenerator(IPGenerator): def __init__(self, *args, **kwargs): super().__init__(self, *args, **kwargs) self.mapping = {} def clear(self, clear_generated_ips=True, *args, **kwargs): super().clear(self, clear_generated_ips=clear_generated_ips, *args, **kwargs) if clear_generated_ips: self.mapping = {} def get_mapped_ip(self, key): if key not in self.mapping: self.mapping[key] = self.random_ip() return self.mapping[key] def __getitem__(self, item): return self.get_mapped_ip(item)