123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401 |
- from scapy.packet import Raw
- import numpy.random as random2
- import random
- import string
- from numpy.random import bytes
- from random import getrandbits
- from scapy.layers.inet import IP, Ether, UDP, TCP
- from scapy.packet import Raw
- from ID2TLib.Botnet.Message import MessageType
- from . import IPv4 as ip
def add_padding(packet, bytes_padding:int = 0, user_padding:bool=True, rnd:bool = False):
    """
    Appends randomly generated padding bytes to the Raw payload of a packet.

    :param packet: the packet that will be extended with the additional payload. It is accessed via
                   ``packet[Raw]``, so it has to carry a Raw layer.
    :param bytes_padding: the amount of bytes that will be appended to the packet. Capped to 100,
                          if called by the user.
    :param user_padding: true, if add_padding is called by the user and not from within the code
    :param rnd: if true, a random multiple of 4 between 0 and bytes_padding is appended instead of
                exactly bytes_padding bytes
    :return: the initial packet, extended with the wanted amount of bytes of padding
    """
    if user_padding and bytes_padding > 100:
        bytes_padding = 100

    if rnd:
        # Floor division keeps the random amount within bytes_padding; rounding to the nearest
        # multiple of 4 could exceed it (e.g. 3 -> 4). A negative budget is treated as 0.
        max_words = max(bytes_padding, 0) // 4
        # randint excludes its upper bound, hence the +1. It replaces the deprecated
        # random_integers, which used an inclusive upper bound.
        bytes_padding = int(random2.randint(0, max_words + 1)) * 4

    payload = generate_payload(bytes_padding)
    packet[Raw].load += Raw(load=payload).load
    return packet
def equal_length(list_of_packets:list, length:int = 0, padding:int = 0, force_len:bool = False):
    """
    Pads the packets of a list so that they reach a common length.

    Without force_len the common length is the larger of length and the longest packet in the
    list. With force_len the common length is exactly length. Packets are only ever extended,
    never shortened. Every packet that had to be extended additionally receives a random
    amount of padding to create realism.

    :param list_of_packets: The given list of packets.
    :param length: The length each packet should have. Has no effect without force_len, if the
                   largest packet has more bytes than length.
    :param padding: The amount of bytes that can be appended at most by the second add, that
                    creates realism. (The called function randomly chooses a value between zero
                    and padding.)
    :param force_len: if true, length is used as the common length regardless of the packet sizes
    :return: The list of extended packets (the same list object that was passed in).
    """
    if force_len:
        target_length = length
    else:
        target_length = max([length] + [len(pkt) for pkt in list_of_packets])

    for pkt in list_of_packets:
        missing = target_length - len(pkt)
        if missing <= 0:
            continue
        # First bring the packet up to the common length ...
        add_padding(pkt, missing, False, False)
        # ... then add the random extra padding.
        # NOTE(review): assumes the random padding only applies to packets that were extended
        # (i.e. it is nested under the length check) - confirm against the original layout.
        add_padding(pkt, padding, False, True)

    return list_of_packets
def generate_payload(size:int=0):
    """
    Creates a payload consisting of random bytes.

    :param size: number of bytes to generate
    :return: the generated payload as a bytes object of the requested length
    """
    # Explicitly the numpy random generator, not the builtin bytes constructor.
    return random2.bytes(size)
def gen_random_server_port(offset: int=2199):
    """
    Derives a server port from two randomly drawn hostname characters.

    A random first character (an ASCII letter) and a random last character (an ASCII letter or
    digit) of a bot's hostname are drawn, and the product of their code points is added to the
    offset. The default offset is chosen from a Sality implementation in 2011.

    :param offset: port base for calculation
    :return: the generated server port
    """
    first_char = random.choice(string.ascii_letters)
    last_char = random.choice(string.ascii_letters + string.digits)
    port = offset + ord(first_char) * ord(last_char)
    return port
class MacAddressGenerator:
    """
    Generates random MAC addresses and never returns the same address twice until cleared.
    """

    def __init__(self, include_broadcast_macs=False, include_virtual_macs=False):
        """
        :param include_broadcast_macs: if false, the lowest bit of the first octet is always cleared
        :param include_virtual_macs: if false, the second-lowest bit of the first octet is always cleared
        """
        self.broadcast = include_broadcast_macs
        self.virtual = include_virtual_macs
        # All addresses handed out so far; used to guarantee uniqueness.
        self.generated = set()

    def random_mac(self) -> str:
        """
        Draws random addresses until one is found that has not been returned before.

        :return: a new, previously unseen MAC address
        """
        candidate = self._random_mac()
        while candidate in self.generated:
            candidate = self._random_mac()
        self.generated.add(candidate)
        return candidate

    def clear(self):
        """
        Forgets all previously generated addresses.
        """
        self.generated.clear()

    def generates_broadcast_macs(self) -> bool:
        """
        :return: the current broadcast setting
        """
        return self.broadcast

    def generates_virtual_macs(self) -> bool:
        """
        :return: the current virtual setting
        """
        return self.virtual

    def set_broadcast_generation(self, broadcast: bool):
        """
        :param broadcast: the new broadcast setting
        """
        self.broadcast = broadcast

    def set_virtual_generation(self, virtual: bool):
        """
        :param virtual: the new virtual setting
        """
        self.virtual = virtual

    def _random_mac(self) -> str:
        """
        Builds one random address without checking for uniqueness.

        :return: six colon-separated, upper-case, two-digit hex octets
        """
        octets = bytearray(getrandbits(8) for _ in range(6))

        # Collect the bits of the first octet that have to be zero under the current settings.
        cleared_bits = 0
        if not self.broadcast:
            cleared_bits |= 0x01
        if not self.virtual:
            cleared_bits |= 0x02
        octets[0] &= 0xFF ^ cleared_bits

        return ":".join("{:02X}".format(octet) for octet in octets)
class PacketGenerator():
    """
    Creates packets, based on the set protocol
    """

    def __init__(self, protocol="udp"):
        """
        Creates a new Packet_Generator Object

        :param protocol: the protocol of the packets to be created, udp or tcp
        """
        super().__init__()
        self.protocol = protocol

    def _build_packet(self, ip_src, ip_dst, mac_src, mac_dst, port_src, port_dst, ttl, tcpflags, payload):
        """
        Dispatches to the packet builder that matches the current protocol.

        :param tcpflags: only used when the protocol is tcp
        :return: the built packet
        :raises ValueError: if the current protocol is neither udp nor tcp
        """
        if self.protocol == "udp":
            return generate_udp_packet(ip_src=ip_src, ip_dst=ip_dst, mac_src=mac_src, mac_dst=mac_dst, ttl=ttl,
                                       port_src=port_src, port_dst=port_dst, payload=payload)
        if self.protocol == "tcp":
            return generate_tcp_packet(ip_src=ip_src, ip_dst=ip_dst, mac_src=mac_src, mac_dst=mac_dst, ttl=ttl,
                                       port_src=port_src, port_dst=port_dst, tcpflags=tcpflags, payload=payload)
        # Fail with a clear message instead of running into an unbound local variable.
        raise ValueError("Error: unsupported protocol for generating Packets")

    def generate_packet(self, ip_src: str = "192.168.64.32", ip_dst: str = "192.168.64.48",
                        mac_src: str = "56:6D:D9:BC:70:1C",
                        mac_dst: str = "F4:2B:95:B3:0E:1A", port_src: int = 1337, port_dst: int = 6442, ttl: int = 64,
                        tcpflags: str = "S", payload: str = ""):
        """
        Creates a Packet with the specified Values for the current protocol

        :param ip_src: the source IP address of the IP header
        :param ip_dst: the destination IP address of the IP header
        :param mac_src: the source MAC address of the MAC header
        :param mac_dst: the destination MAC address of the MAC header
        :param port_src: the source port of the header
        :param port_dst: the destination port of the header
        :param ttl: the ttl Value of the packet
        :param tcpflags: the TCP flags of the TCP header, if tcp is selected as protocol
        :param payload: the payload of the packet
        :return: the corresponding packet
        :raises ValueError: if the current protocol is neither udp nor tcp
        """
        return self._build_packet(ip_src=ip_src, ip_dst=ip_dst, mac_src=mac_src, mac_dst=mac_dst,
                                  port_src=port_src, port_dst=port_dst, ttl=ttl,
                                  tcpflags=tcpflags, payload=payload)

    def generate_mmcom_packet(self, ip_src: str = "192.168.64.32", ip_dst: str = "192.168.64.48",
                              mac_src: str = "56:6D:D9:BC:70:1C",
                              mac_dst: str = "F4:2B:95:B3:0E:1A", port_src: int = 1337, port_dst: int = 6442,
                              tcpflags: str = "S", ttl: int = 64,
                              message_type: MessageType = MessageType.SALITY_HELLO, neighborlist_entries: int = 1):
        """
        Creates a Packet for Members-Management-Communication with the specified Values and the current protocol

        The payload consists of random bytes; only its length depends on the message type.

        :param ip_src: the source IP address of the IP header
        :param ip_dst: the destination IP address of the IP header
        :param mac_src: the source MAC address of the MAC header
        :param mac_dst: the destination MAC address of the MAC header
        :param port_src: the source port of the header
        :param port_dst: the destination port of the header
        :param tcpflags: the TCP flags of the TCP header, if tcp is selected as protocol
        :param ttl: the ttl Value of the packet
        :param message_type: affects the size of the payload
        :param neighborlist_entries: number of entries of a Neighbourlist-reply, affects the size of the payload
        :return: the corresponding packet
        :raises ValueError: if the current protocol is neither udp nor tcp
        """
        if message_type == MessageType.SALITY_HELLO_REPLY:
            payload_len = 22
        elif message_type == MessageType.SALITY_NL_REQUEST:
            payload_len = 28
        elif message_type == MessageType.SALITY_NL_REPLY:
            # 24 fixed bytes plus 6 bytes per neighbour list entry
            payload_len = 24 + 6 * neighborlist_entries
        else:
            # SALITY_HELLO and every other message type carry no payload
            payload_len = 0

        payload = generate_payload(payload_len)
        return self._build_packet(ip_src=ip_src, ip_dst=ip_dst, mac_src=mac_src, mac_dst=mac_dst,
                                  port_src=port_src, port_dst=port_dst, ttl=ttl,
                                  tcpflags=tcpflags, payload=payload)
def generate_tcp_packet(ip_src: str = "192.168.64.32", ip_dst: str = "192.168.64.48",
                        mac_src: str = "56:6D:D9:BC:70:1C", ttl: int = 64,
                        mac_dst: str = "F4:2B:95:B3:0E:1A", port_src: int = 1337, port_dst: int = 6442,
                        tcpflags: str = "S", payload: str = ""):
    """
    Assembles an Ether / IP / TCP / Raw packet from the given header values.

    :param ip_src: the source IP address of the IP header
    :param ip_dst: the destination IP address of the IP header
    :param mac_src: the source MAC address of the MAC header
    :param ttl: the ttl value of the packet
    :param mac_dst: the destination MAC address of the MAC header
    :param port_src: the source port of the TCP header
    :param port_dst: the destination port of the TCP header
    :param tcpflags: the TCP flags of the TCP header
    :param payload: the payload of the packet
    :return: the corresponding TCP packet
    """
    link_layer = Ether(src=mac_src, dst=mac_dst)
    network_layer = IP(src=ip_src, dst=ip_dst, ttl=ttl)
    transport_layer = TCP(sport=port_src, dport=port_dst, flags=tcpflags)
    return link_layer / network_layer / transport_layer / Raw(load=payload)
def generate_udp_packet(ip_src: str = "192.168.64.32", ip_dst: str = "192.168.64.48",
                        mac_src: str = "56:6D:D9:BC:70:1C", ttl: int = 64,
                        mac_dst: str = "F4:2B:95:B3:0E:1A", port_src: int = 1337, port_dst: int = 6442,
                        payload: str = ""):
    """
    Assembles an Ether / IP / UDP / Raw packet from the given header values.

    :param ip_src: the source IP address of the IP header
    :param ip_dst: the destination IP address of the IP header
    :param mac_src: the source MAC address of the MAC header
    :param ttl: the ttl value of the packet
    :param mac_dst: the destination MAC address of the MAC header
    :param port_src: the source port of the UDP header
    :param port_dst: the destination port of the UDP header
    :param payload: the payload of the packet
    :return: the corresponding UDP packet
    """
    link_layer = Ether(src=mac_src, dst=mac_dst)
    network_layer = IP(src=ip_src, dst=ip_dst, ttl=ttl)
    transport_layer = UDP(sport=port_src, dport=port_dst)
    return link_layer / network_layer / transport_layer / Raw(load=payload)
class IPChooser:
    """
    Chooses random addresses from the whole 32-bit address space.
    """

    def random_ip(self):
        """
        :return: an address built from a uniformly drawn integer in [0, 2**32)
        """
        as_int = random.randrange(0, 2 ** 32)
        return ip.IPAddress.from_int(as_int)

    def size(self):
        """
        :return: the number of addresses this chooser can pick from
        """
        return 2 ** 32

    def __len__(self):
        # len(chooser) is an alias for chooser.size()
        return self.size()
class IPChooserByRange(IPChooser):
    """
    Chooses random addresses from a single address block.
    """

    def __init__(self, ip_range):
        """
        :param ip_range: the block to choose from; it has to provide first_address() and block_size()
        """
        self.range = ip_range

    def random_ip(self):
        """
        :return: an address built from a uniformly drawn integer in
                 [first address, first address + block size)
        """
        first = int(self.range.first_address())
        picked = random.randrange(first, first + self.range.block_size())
        return ip.IPAddress.from_int(picked)

    def size(self):
        """
        :return: the block size of the underlying range
        """
        return self.range.block_size()
class IPChooserByList(IPChooser):
    """
    Chooses random addresses from a fixed collection of addresses.
    """

    def __init__(self, ips):
        """
        :param ips: iterable of addresses to choose from; it is copied into a list
        :raises ValueError: if ips contains no elements
        """
        self.ips = list(ips)
        if len(self.ips) == 0:
            raise ValueError("list of ips must not be empty")

    def random_ip(self):
        """
        :return: a randomly picked element of the stored list
        """
        return random.choice(self.ips)

    def size(self):
        """
        :return: the number of stored addresses (duplicates are counted)
        """
        return len(self.ips)
class IPGenerator:
    """
    Generates unique random IP addresses that lie outside of a blacklist of address blocks.
    """

    def __init__(self, ip_chooser=IPChooser(),
                 include_private_ips=False, include_localhost=False,
                 include_multicast=False, include_reserved=False,
                 include_link_local=False, blacklist=None):
        """
        :param ip_chooser: the chooser that proposes candidate addresses
        :param include_private_ips: if false, the private segments are blacklisted
        :param include_localhost: if false, the localhost segment is blacklisted
        :param include_multicast: if false, the multicast segment is blacklisted
        :param include_reserved: if false, the reserved segment is blacklisted
        :param include_link_local: if false, the zero-conf segment is blacklisted
        :param blacklist: optional iterable of additional segments to blacklist
        """
        self.blacklist = []
        self.generated_ips = set()

        # Gather everything that has to be excluded, then register it in one pass.
        excluded_segments = []
        if not include_private_ips:
            excluded_segments.extend(ip.ReservedIPBlocks.PRIVATE_IP_SEGMENTS)
        if not include_localhost:
            excluded_segments.append(ip.ReservedIPBlocks.LOCALHOST_SEGMENT)
        if not include_multicast:
            excluded_segments.append(ip.ReservedIPBlocks.MULTICAST_SEGMENT)
        if not include_reserved:
            excluded_segments.append(ip.ReservedIPBlocks.RESERVED_SEGMENT)
        if not include_link_local:
            excluded_segments.append(ip.ReservedIPBlocks.ZERO_CONF_SEGMENT)
        if blacklist:
            excluded_segments.extend(blacklist)

        for segment in excluded_segments:
            self.add_to_blacklist(segment)

        self.chooser = ip_chooser

    @staticmethod
    def from_range(range, *args, **kwargs):
        """
        Creates an IPGenerator that chooses its addresses from the given range.

        :param range: the address block handed to IPChooserByRange
        :return: the new IPGenerator; remaining arguments are passed on to its constructor
        """
        range_chooser = IPChooserByRange(range)
        return IPGenerator(range_chooser, *args, **kwargs)

    def add_to_blacklist(self, ip_segment):
        """
        Adds a segment to the blacklist.

        :param ip_segment: an IPAddressBlock, or anything IPAddressBlock.parse accepts
        """
        if isinstance(ip_segment, ip.IPAddressBlock):
            block = ip_segment
        else:
            block = ip.IPAddressBlock.parse(ip_segment)
        self.blacklist.append(block)

    def random_ip(self):
        """
        Draws candidates from the chooser until one is neither blacklisted nor already generated.

        :return: the new address as a string
        :raises ValueError: if as many addresses were generated as the chooser reports as its size
        """
        if len(self.generated_ips) == self.chooser.size():
            raise ValueError("Exhausted the space of possible ip-addresses, no new unique ip-address can be generated")

        while True:
            candidate = self.chooser.random_ip()
            if self._is_in_blacklist(candidate) or candidate in self.generated_ips:
                continue
            self.generated_ips.add(candidate)
            return str(candidate)

    def clear(self, clear_blacklist=True, clear_generated_ips=True):
        """
        Resets the generator state.

        :param clear_blacklist: if true, the blacklist is emptied
        :param clear_generated_ips: if true, the record of generated addresses is emptied
        """
        if clear_blacklist:
            self.blacklist.clear()
        if clear_generated_ips:
            self.generated_ips.clear()

    def _is_in_blacklist(self, ip: ip.IPAddress):
        """
        :param ip: the address to check
        :return: True if any blacklisted block contains the address, False otherwise
        """
        for block in self.blacklist:
            if ip in block:
                return True
        return False
class MappingIPGenerator(IPGenerator):
    """
    An IPGenerator that remembers which address it generated for which key.
    """

    def __init__(self, *args, **kwargs):
        """
        Accepts the same arguments as IPGenerator and forwards them unchanged.
        """
        # super() already binds self. Passing it again made the instance its own ip_chooser
        # and shifted every caller-supplied argument by one position.
        super().__init__(*args, **kwargs)
        self.mapping = {}

    def clear(self, clear_generated_ips=True, *args, **kwargs):
        """
        Resets the generator state; the key mapping is dropped together with the generated addresses.

        :param clear_generated_ips: if true, the generated addresses and the key mapping are cleared
        :param args: further positional arguments for IPGenerator.clear. They are passed first,
                     so the first one maps to its clear_blacklist parameter.
        :param kwargs: further keyword arguments for IPGenerator.clear, e.g. clear_blacklist
        """
        # No explicit self here either: it was previously consumed as clear_blacklist, so the
        # blacklist was always wiped and clear_blacklist=... raised a TypeError.
        super().clear(*args, clear_generated_ips=clear_generated_ips, **kwargs)
        if clear_generated_ips:
            self.mapping = {}

    def get_mapped_ip(self, key):
        """
        Returns the address assigned to key, generating a new one on first use.

        :param key: any hashable object
        :return: the address mapped to key
        """
        if key not in self.mapping:
            self.mapping[key] = self.random_ip()
        return self.mapping[key]

    def __getitem__(self, item):
        # generator[key] is shorthand for generator.get_mapped_ip(key)
        return self.get_mapped_ip(item)
|