@@ -0,0 +1,393 @@
+from scapy.packet import Raw
+import numpy.random as random2
+import random
+import string
+from numpy.random import bytes
+from random import getrandbits
+from scapy.layers.inet import IP, Ether, UDP, TCP
+from scapy.packet import Raw
+from Attack.MembersMgmtCommAttack import MessageType
+from . import IPv4 as ip
+def add_padding(packet, bytes_padding = 0, user_padding=True, rnd = False):
+ '''
+ Adds padding to a packet with the given amount of bytes, but a maximum of 100 bytes.
+ :param packet: the packet that will be extended with the additional payload
+ :param bytes_padding: the amount of bytes that will be appended to the packet
+ :param user_padding: true, if the function add_padding is called from another class and the user
+ sets the padding manually
+ :param rnd: adds a random padding betwing 0 and bytes_adding, if true
+ :return: the initial packet, extended with the wanted amount of bytes of padding
+ '''
+ if(user_padding and bytes_padding > 100):
+ bytes_padding = 100
+ if (rnd is True):
+ r = int(round(bytes_padding / 4)) #sets bytes_padding to any number between 0 and bytes_padding
+ bytes_padding = random2.random_integers(0, r) * 4 #, that's dividable by 4
+ payload = generate_payload(bytes_padding)
+ packet[Raw].load += Raw(load=payload).load
+ return packet
+def equal_length(list_of_packets, length = 0, padding = 0):
+ '''
+ Equals the length of a given set of packets on the given length. If the given length is smaller than the largest
+ packet, all the other packets are extended to the largest packet's length.
+ :param list_of_packets: The given set of packet.
+ :param length: The length each packet should have.
+ :return: The set of extended packets.
+ '''
+ largest_packet = length
+ for packet in list_of_packets:
+ packet_length = len(packet)
+ if(packet_length > largest_packet):
+ largest_packet = packet_length
+ for packet in list_of_packets:
+ bytes_padding = largest_packet - len(packet)
+ if(bytes_padding > 0):
+ add_padding(packet, bytes_padding, False, False)
+ add_padding(packet, padding, False, True)
+ return list_of_packets
+def generate_payload(size:int=0):
+ """
+ Generates a payload of random bytes of the given amount
+ :param size: number of generated bytes
+ :return: the generated payload
+ """
+ payload = bytes(size)
+ return payload
+def gen_random_server_port(offset: int=2199):
+ """
+ Generates a valid random first and last character for a bots hostname
+ and computes a port from these two characters.
+ The default offset is chosen from a Sality implementation in 2011
+ """
+ firstLetter = random.choice(string.ascii_letters);
+ lastLetter = random.choice(string.ascii_letters + string.digits);
+ return (offset + ord(firstLetter) * ord(lastLetter));
+class MacAddressGenerator:
+ def __init__(self, include_broadcast_macs=False, include_virtual_macs=False):
+ self.broadcast = include_broadcast_macs
+ self.virtual = include_virtual_macs
+ self.generated = set()
+ def random_mac(self) -> str:
+ while True:
+ mac = self._random_mac()
+ if mac not in self.generated:
+ self.generated.add(mac)
+ return mac
+ def clear(self):
+ self.generated.clear()
+ def generates_broadcast_macs(self) -> bool:
+ return self.broadcast
+ def generates_virtual_macs(self) -> bool:
+ return self.virtual
+ def set_broadcast_generation(self, broadcast: bool):
+ self.broadcast = broadcast
+ def set_virtual_generation(self, virtual: bool):
+ self.virtual = virtual
+ def _random_mac(self) -> str:
+ mac_bytes = bytearray(getrandbits(8) for i in range(6))
+ if not self.broadcast:
+ mac_bytes[0] &= ~1 # clear the first bytes' first bit
+ if not self.virtual:
+ mac_bytes[0] &= ~2 # clear the first bytes' second bit
+ return ":".join("%02X" % b for b in mac_bytes)
+class PacketGenerator():
+ """
+ Creates packets, based on the set protocol
+ """
+ def __init__(self, protocol="udp"):
+ """
+ Creates a new Packet_Generator Object
+ :param protocol: the protocol of the packets to be created, udp or tcp
+ """
+ super(PacketGenerator, self).__init__()
+ self.protocol = protocol
+ def generate_packet(self, ip_src: str = "", ip_dst: str = "",
+ mac_src: str = "56:6D:D9:BC:70:1C",
+ mac_dst: str = "F4:2B:95:B3:0E:1A", port_src: int = 1337, port_dst: int = 6442, ttl: int = 64,
+ tcpflags: str = "S", payload: str = ""):
+ """
+ Creates a Packet with the specified Values for the current protocol
+ :param ip_src: the source IP address of the IP header
+ :param ip_dst the destination IP address of the IP header
+ :param mac_src: the source MAC address of the MAC header
+ :param mac_dst: the destination MAC address of the MAC header
+ :param port_src: the source port of the header
+ :param port_dst: the destination port of the header
+ :param ttl: the ttl Value of the packet
+ :param tcpflags: the TCP flags of the TCP header
+ :param payload: the payload of the packet
+ :return: the corresponding packet
+ """
+ if (self.protocol == "udp"):
+ packet = generate_udp_packet(ip_src=ip_src, ip_dst=ip_dst, mac_src=mac_src, mac_dst=mac_dst, ttl=ttl,
+ port_src=port_src, port_dst=port_dst, payload=payload)
+ elif (self.protocol == "tcp"):
+ packet = generate_tcp_packet(ip_src=ip_src, ip_dst=ip_dst, mac_src=mac_src, mac_dst=mac_dst, ttl=ttl,
+ port_src=port_src, port_dst=port_dst, tcpflags=tcpflags, payload=payload)
+ return packet
+ def generate_mmcom_packet(self, ip_src: str = "", ip_dst: str = "",
+ mac_src: str = "56:6D:D9:BC:70:1C",
+ mac_dst: str = "F4:2B:95:B3:0E:1A", port_src: int = 1337, port_dst: int = 6442,
+ tcpflags: str = "S", ttl: int = 64,
+ message_type: MessageType = MessageType.SALITY_HELLO, neighborlist_entries: int = 1):
+ """
+ Creates a Packet for Members-Management-Communication with the specified Values and the current protocol
+ :param ip_src: the source IP address of the IP header
+ :param ip_dst the destination IP address of the IP header
+ :param mac_src: the source MAC address of the MAC header
+ :param mac_dst: the destination MAC address of the MAC header
+ :param port_src: the source port of the header
+ :param port_dst: the destination port of the header
+ :param tcpflags: the TCP flags of the TCP header, if tcp is selected as protocol
+ :param ttl: the ttl Value of the packet
+ :param message_type: affects the size of the payload
+ :param neighborlist_entries: number of entries of a Neighbourlist-reply, affects the size of the payload
+ :return: the corresponding packet
+ """
+ # Determine length of the payload that has to be generated
+ if (message_type == MessageType.SALITY_HELLO):
+ payload_len = 0
+ elif (message_type == MessageType.SALITY_HELLO_REPLY):
+ payload_len = 22
+ elif (message_type == MessageType.SALITY_NL_REQUEST):
+ payload_len = 28
+ elif (message_type == MessageType.SALITY_NL_REPLY):
+ payload_len = 24 + 6 * neighborlist_entries
+ else:
+ payload_len = 0
+ payload = generate_payload(payload_len)
+ if (self.protocol == "udp"):
+ packet = generate_udp_packet(ip_src=ip_src, ip_dst=ip_dst, mac_src=mac_src, mac_dst=mac_dst, ttl=ttl,
+ port_src=port_src, port_dst=port_dst, payload=payload)
+ elif (self.protocol == "tcp"):
+ packet = generate_tcp_packet(ip_src=ip_src, ip_dst=ip_dst, mac_src=mac_src, mac_dst=mac_dst, ttl=ttl,
+ port_src=port_src, port_dst=port_dst, tcpflags=tcpflags, payload=payload)
+ else:
+ print("Error: unsupported protocol for generating Packets")
+ return packet
+def generate_tcp_packet(ip_src: str = "", ip_dst: str = "",
+ mac_src: str = "56:6D:D9:BC:70:1C", ttl: int = 64,
+ mac_dst: str = "F4:2B:95:B3:0E:1A", port_src: int = 1337, port_dst: int = 6442,
+ tcpflags: str = "S", payload: str = ""):
+ """
+ Builds a TCP packet with the values specified by the caller.
+ :param ip_src: the source IP address of the IP header
+ :param ip_dst the destination IP address of the IP header
+ :param mac_src: the source MAC address of the MAC header
+ :param ttl: the ttl value of the packet
+ :param mac_dst: the destination MAC address of the MAC header
+ :param port_src: the source port of the TCP header
+ :param port_dst: the destination port of the TCP header
+ :param tcpflags: the TCP flags of the TCP header
+ :param payload: the payload of the packet
+ :return: the corresponding TCP packet
+ """
+ ether = Ether(src=mac_src, dst=mac_dst)
+ ip = IP(src=ip_src, dst=ip_dst, ttl=ttl)
+ tcp = TCP(sport=port_src, dport=port_dst, flags=tcpflags)
+ packet = ether / ip / tcp / Raw(load=payload)
+ return packet
+def generate_udp_packet(ip_src: str = "", ip_dst: str = "",
+ mac_src: str = "56:6D:D9:BC:70:1C", ttl: int = 64,
+ mac_dst: str = "F4:2B:95:B3:0E:1A", port_src: int = 1337, port_dst: int = 6442,
+ payload: str = ""):
+ """
+ Builds an UDP packet with the values specified by the caller.
+ :param ip_src: the source IP address of the IP header
+ :param ip_dst the destination IP address of the IP header
+ :param mac_src: the source MAC address of the MAC header
+ :param ttl: the ttl value of the packet
+ :param mac_dst: the destination MAC address of the MAC header
+ :param port_src: the source port of the UDP header
+ :param port_dst: the destination port of the UDP header
+ :param payload: the payload of the packet
+ :return: the corresponding UDP packet
+ """
+ ether = Ether(src=mac_src, dst=mac_dst)
+ ip = IP(src=ip_src, dst=ip_dst, ttl=ttl)
+ udp = UDP(sport=port_src, dport=port_dst)
+ packet = ether / ip / udp / Raw(load=payload)
+ return packet
+class IPChooser:
+ def random_ip(self):
+ return ip.IPAddress.from_int(random.randrange(0, 1 << 32))
+ def size(self):
+ return 1 << 32
+ def __len__(self):
+ return self.size()
+class IPChooserByRange(IPChooser):
+ def __init__(self, ip_range):
+ self.range = ip_range
+ def random_ip(self):
+ start = int(self.range.first_address())
+ end = start + self.range.block_size()
+ return ip.IPAddress.from_int(random.randrange(start, end))
+ def size(self):
+ return self.range.block_size()
+class IPChooserByList(IPChooser):
+ def __init__(self, ips):
+ self.ips = list(ips)
+ if not self.ips:
+ raise ValueError("list of ips must not be empty")
+ def random_ip(self):
+ return random.choice(self.ips)
+ def size(self):
+ return len(self.ips)
+class IPGenerator:
+ def __init__(self, ip_chooser=IPChooser(), # include all ip-addresses by default (before the blacklist)
+ include_private_ips=False, include_localhost=False,
+ include_multicast=False, include_reserved=False,
+ include_link_local=False, blacklist=None):
+ self.blacklist = []
+ self.generated_ips = set()
+ if not include_private_ips:
+ for segment in ip.ReservedIPBlocks.PRIVATE_IP_SEGMENTS:
+ self.add_to_blacklist(segment)
+ if not include_localhost:
+ self.add_to_blacklist(ip.ReservedIPBlocks.LOCALHOST_SEGMENT)
+ if not include_multicast:
+ self.add_to_blacklist(ip.ReservedIPBlocks.MULTICAST_SEGMENT)
+ if not include_reserved:
+ self.add_to_blacklist(ip.ReservedIPBlocks.RESERVED_SEGMENT)
+ if not include_link_local:
+ self.add_to_blacklist(ip.ReservedIPBlocks.ZERO_CONF_SEGMENT)
+ if blacklist:
+ for segment in blacklist:
+ self.add_to_blacklist(segment)
+ self.chooser = ip_chooser
+ @staticmethod
+ def from_range(range, *args, **kwargs):
+ return IPGenerator(IPChooserByRange(range), *args, **kwargs)
+ def add_to_blacklist(self, ip_segment):
+ if isinstance(ip_segment, ip.IPAddressBlock):
+ self.blacklist.append(ip_segment)
+ else:
+ self.blacklist.append(ip.IPAddressBlock.parse(ip_segment))
+ def random_ip(self):
+ if len(self.generated_ips) == self.chooser.size():
+ raise ValueError("Exhausted the space of possible ip-addresses, no new unique ip-address can be generated")
+ while True:
+ random_ip = self.chooser.random_ip()
+ if not self._is_in_blacklist(random_ip) and random_ip not in self.generated_ips:
+ self.generated_ips.add(random_ip)
+ return str(random_ip)
+ def clear(self, clear_blacklist=True, clear_generated_ips=True):
+ if clear_blacklist: self.blacklist.clear()
+ if clear_generated_ips: self.generated_ips.clear()
+ def _is_in_blacklist(self, ip: ip.IPAddress):
+ return any(ip in block for block in self.blacklist)
+class MappingIPGenerator(IPGenerator):
+ def __init__(self, *args, **kwargs):
+ super().__init__(self, *args, **kwargs)
+ self.mapping = {}
+ def clear(self, clear_generated_ips=True, *args, **kwargs):
+ super().clear(self, clear_generated_ips=clear_generated_ips, *args, **kwargs)
+ if clear_generated_ips:
+ self.mapping = {}
+ def get_mapped_ip(self, key):
+ if key not in self.mapping:
+ self.mapping[key] = self.random_ip()
+ return self.mapping[key]
+ def __getitem__(self, item):
+ return self.get_mapped_ip(item)