|
@@ -230,7 +230,6 @@ class BaseAttack(metaclass=ABCMeta):
|
|
|
except ValueError:
|
|
|
return False, value
|
|
|
|
|
|
-
|
|
|
@staticmethod
|
|
|
def _is_domain(val: str):
|
|
|
"""
|
|
@@ -383,114 +382,6 @@ class BaseAttack(metaclass=ABCMeta):
|
|
|
|
|
|
return destination
|
|
|
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
- @staticmethod
|
|
|
- def generate_random_ipv4_address(ipClass, n: int = 1):
|
|
|
- """
|
|
|
- Generates n random IPv4 addresses.
|
|
|
- :param n: The number of IP addresses to be generated
|
|
|
- :return: A single IP address, or if n>1, a list of IP addresses
|
|
|
- """
|
|
|
-
|
|
|
- def is_invalid(ipAddress: ipaddress.IPv4Address):
|
|
|
- return ipAddress.is_multicast or ipAddress.is_unspecified or ipAddress.is_loopback or \
|
|
|
- ipAddress.is_link_local or ipAddress.is_reserved or ipAddress.is_private
|
|
|
-
|
|
|
-
|
|
|
- def generate_address(ipClass):
|
|
|
- if ipClass == "Unknown":
|
|
|
- return ipaddress.IPv4Address(random.randint(0, 2 ** 32 - 1))
|
|
|
- else:
|
|
|
-
|
|
|
- if "private" in ipClass:
|
|
|
- ipClass = ipClass[0]
|
|
|
- ipClassesByte1 = {"A": {1,126}, "B": {128,191}, "C":{192, 223}, "D":{224, 239}, "E":{240, 254}}
|
|
|
- temp = list(ipClassesByte1[ipClass])
|
|
|
- minB1 = temp[0]
|
|
|
- maxB1 = temp[1]
|
|
|
- b1 = random.randint(minB1, maxB1)
|
|
|
- b2 = random.randint(1, 255)
|
|
|
- b3 = random.randint(1, 255)
|
|
|
- b4 = random.randint(1, 255)
|
|
|
-
|
|
|
- ipAddress = ipaddress.IPv4Address(str(b1) +"."+ str(b2) + "." + str(b3) + "." + str(b4))
|
|
|
-
|
|
|
- return ipAddress
|
|
|
-
|
|
|
-
|
|
|
- ip_addresses = []
|
|
|
- for i in range(0, n):
|
|
|
- address = generate_address(ipClass)
|
|
|
- while is_invalid(address):
|
|
|
- address = generate_address(ipClass)
|
|
|
- ip_addresses.append(str(address))
|
|
|
-
|
|
|
- if n == 1:
|
|
|
- return ip_addresses[0]
|
|
|
- else:
|
|
|
- return ip_addresses
|
|
|
-
|
|
|
- @staticmethod
|
|
|
- def generate_random_ipv6_address(n: int = 1):
|
|
|
- """
|
|
|
- Generates n random IPv6 addresses.
|
|
|
- :param n: The number of IP addresses to be generated
|
|
|
- :return: A single IP address, or if n>1, a list of IP addresses
|
|
|
- """
|
|
|
-
|
|
|
- def is_invalid(ipAddress: ipaddress.IPv6Address):
|
|
|
- return ipAddress.is_multicast or ipAddress.is_unspecified or ipAddress.is_loopback or \
|
|
|
- ipAddress.is_link_local or ipAddress.is_private or ipAddress.is_reserved
|
|
|
-
|
|
|
- def generate_address():
|
|
|
- return ipaddress.IPv6Address(random.randint(0, 2 ** 128 - 1))
|
|
|
-
|
|
|
- ip_addresses = []
|
|
|
- for i in range(0, n):
|
|
|
- address = generate_address()
|
|
|
- while is_invalid(address):
|
|
|
- address = generate_address()
|
|
|
- ip_addresses.append(str(address))
|
|
|
-
|
|
|
- if n == 1:
|
|
|
- return ip_addresses[0]
|
|
|
- else:
|
|
|
- return ip_addresses
|
|
|
-
|
|
|
- @staticmethod
|
|
|
- def generate_random_mac_address(n: int = 1):
|
|
|
- """
|
|
|
- Generates n random MAC addresses.
|
|
|
- :param n: The number of MAC addresses to be generated.
|
|
|
- :return: A single MAC addres, or if n>1, a list of MAC addresses
|
|
|
- """
|
|
|
-
|
|
|
- def is_invalid(address: str):
|
|
|
- first_octet = int(address[0:2], 16)
|
|
|
- is_multicast_address = bool(first_octet & 0b01)
|
|
|
- is_locally_administered = bool(first_octet & 0b10)
|
|
|
- return is_multicast_address or is_locally_administered
|
|
|
-
|
|
|
- def generate_address():
|
|
|
- mac = [random.randint(0x00, 0xff) for i in range(0, 6)]
|
|
|
- return ':'.join(map(lambda x: "%02x" % x, mac))
|
|
|
-
|
|
|
- mac_addresses = []
|
|
|
- for i in range(0, n):
|
|
|
- address = generate_address()
|
|
|
- while is_invalid(address):
|
|
|
- address = generate_address()
|
|
|
- mac_addresses.append(address)
|
|
|
-
|
|
|
- if n == 1:
|
|
|
- return mac_addresses[0]
|
|
|
- else:
|
|
|
- return mac_addresses
|
|
|
-
|
|
|
-
|
|
|
def get_reply_delay(self, ip_dst):
|
|
|
"""
|
|
|
Gets the minimum and the maximum reply delay for all the connections of a specific IP.
|
|
@@ -513,7 +404,6 @@ class BaseAttack(metaclass=ABCMeta):
|
|
|
maxDelay = int(maxDelay) * 10 ** -6
|
|
|
return minDelay, maxDelay
|
|
|
|
|
|
-
|
|
|
def packetsToConvs(self,exploit_raw_packets):
|
|
|
"""
|
|
|
Classifies a bunch of packets to conversations groups. A conversation is a set of packets go between host A (IP,port)
|
|
@@ -588,6 +478,11 @@ class BaseAttack(metaclass=ABCMeta):
|
|
|
|
|
|
|
|
|
def get_inter_arrival_time_dist(self, packets):
|
|
|
+ """
|
|
|
+ Gets the inter-arrival time distribution of a set of packets.
|
|
|
+
|
|
|
+ :param packets: the packets to extract their inter-arrival time.
|
|
|
+ """
|
|
|
timeSteps = []
|
|
|
prvsPktTime = 0
|
|
|
for index, pkt in enumerate(packets):
|
|
@@ -607,17 +502,138 @@ class BaseAttack(metaclass=ABCMeta):
|
|
|
return dict
|
|
|
|
|
|
def clean_white_spaces(self, str):
|
|
|
+ """
|
|
|
+ Delete extra backslash from white spaces. This function is used to process the payload of packets.
|
|
|
+
|
|
|
+ :param str: the payload to be processed.
|
|
|
+ """
|
|
|
str = str.replace("\\n", "\n")
|
|
|
str = str.replace("\\r", "\r")
|
|
|
str = str.replace("\\t", "\t")
|
|
|
str = str.replace("\\\'", "\'")
|
|
|
return str
|
|
|
|
|
|
- def modify_payload(self,str_tcp_seg, orig_target_uri, target_uri, orig_ip_dst, target_host):
|
|
|
+ def modify_http_header(self,str_tcp_seg, orig_target_uri, target_uri, orig_ip_dst, target_host):
|
|
|
+ """
|
|
|
+ Substitute the URI and HOST in a HTTP header with new values.
|
|
|
+
|
|
|
+ :param str_tcp_seg: the payload to be processed.
|
|
|
+ :param orig_target_uri: old URI
|
|
|
+ :param target_uri: new URI
|
|
|
+ :param orig_ip_dst: old host
|
|
|
+ :param target_host: new host
|
|
|
+ """
|
|
|
if len(str_tcp_seg) > 0:
|
|
|
|
|
|
str_tcp_seg = str_tcp_seg[2:-1]
|
|
|
str_tcp_seg = str_tcp_seg.replace(orig_target_uri, target_uri)
|
|
|
str_tcp_seg = str_tcp_seg.replace(orig_ip_dst, target_host)
|
|
|
str_tcp_seg = self.clean_white_spaces(str_tcp_seg)
|
|
|
- return str_tcp_seg
|
|
|
+ return str_tcp_seg
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+ @staticmethod
|
|
|
+ def generate_random_ipv4_address(ipClass, n: int = 1):
|
|
|
+ """
|
|
|
+ Generates n random IPv4 addresses.
|
|
|
+ :param n: The number of IP addresses to be generated
|
|
|
+ :return: A single IP address, or if n>1, a list of IP addresses
|
|
|
+ """
|
|
|
+
|
|
|
+ def is_invalid(ipAddress: ipaddress.IPv4Address):
|
|
|
+ return ipAddress.is_multicast or ipAddress.is_unspecified or ipAddress.is_loopback or \
|
|
|
+ ipAddress.is_link_local or ipAddress.is_reserved or ipAddress.is_private
|
|
|
+
|
|
|
+
|
|
|
+ def generate_address(ipClass):
|
|
|
+ if ipClass == "Unknown":
|
|
|
+ return ipaddress.IPv4Address(random.randint(0, 2 ** 32 - 1))
|
|
|
+ else:
|
|
|
+
|
|
|
+ if "private" in ipClass:
|
|
|
+ ipClass = ipClass[0]
|
|
|
+ ipClassesByte1 = {"A": {1,126}, "B": {128,191}, "C":{192, 223}, "D":{224, 239}, "E":{240, 254}}
|
|
|
+ temp = list(ipClassesByte1[ipClass])
|
|
|
+ minB1 = temp[0]
|
|
|
+ maxB1 = temp[1]
|
|
|
+ b1 = random.randint(minB1, maxB1)
|
|
|
+ b2 = random.randint(1, 255)
|
|
|
+ b3 = random.randint(1, 255)
|
|
|
+ b4 = random.randint(1, 255)
|
|
|
+
|
|
|
+ ipAddress = ipaddress.IPv4Address(str(b1) +"."+ str(b2) + "." + str(b3) + "." + str(b4))
|
|
|
+
|
|
|
+ return ipAddress
|
|
|
+
|
|
|
+ ip_addresses = []
|
|
|
+ for i in range(0, n):
|
|
|
+ address = generate_address(ipClass)
|
|
|
+ while is_invalid(address):
|
|
|
+ address = generate_address(ipClass)
|
|
|
+ ip_addresses.append(str(address))
|
|
|
+
|
|
|
+ if n == 1:
|
|
|
+ return ip_addresses[0]
|
|
|
+ else:
|
|
|
+ return ip_addresses
|
|
|
+
|
|
|
+ @staticmethod
|
|
|
+ def generate_random_ipv6_address(n: int = 1):
|
|
|
+ """
|
|
|
+ Generates n random IPv6 addresses.
|
|
|
+ :param n: The number of IP addresses to be generated
|
|
|
+ :return: A single IP address, or if n>1, a list of IP addresses
|
|
|
+ """
|
|
|
+
|
|
|
+ def is_invalid(ipAddress: ipaddress.IPv6Address):
|
|
|
+ return ipAddress.is_multicast or ipAddress.is_unspecified or ipAddress.is_loopback or \
|
|
|
+ ipAddress.is_link_local or ipAddress.is_private or ipAddress.is_reserved
|
|
|
+
|
|
|
+ def generate_address():
|
|
|
+ return ipaddress.IPv6Address(random.randint(0, 2 ** 128 - 1))
|
|
|
+
|
|
|
+ ip_addresses = []
|
|
|
+ for i in range(0, n):
|
|
|
+ address = generate_address()
|
|
|
+ while is_invalid(address):
|
|
|
+ address = generate_address()
|
|
|
+ ip_addresses.append(str(address))
|
|
|
+
|
|
|
+ if n == 1:
|
|
|
+ return ip_addresses[0]
|
|
|
+ else:
|
|
|
+ return ip_addresses
|
|
|
+
|
|
|
+ @staticmethod
|
|
|
+ def generate_random_mac_address(n: int = 1):
|
|
|
+ """
|
|
|
+ Generates n random MAC addresses.
|
|
|
+ :param n: The number of MAC addresses to be generated.
|
|
|
+ :return: A single MAC addres, or if n>1, a list of MAC addresses
|
|
|
+ """
|
|
|
+
|
|
|
+ def is_invalid(address: str):
|
|
|
+ first_octet = int(address[0:2], 16)
|
|
|
+ is_multicast_address = bool(first_octet & 0b01)
|
|
|
+ is_locally_administered = bool(first_octet & 0b10)
|
|
|
+ return is_multicast_address or is_locally_administered
|
|
|
+
|
|
|
+ def generate_address():
|
|
|
+ mac = [random.randint(0x00, 0xff) for i in range(0, 6)]
|
|
|
+ return ':'.join(map(lambda x: "%02x" % x, mac))
|
|
|
+
|
|
|
+ mac_addresses = []
|
|
|
+ for i in range(0, n):
|
|
|
+ address = generate_address()
|
|
|
+ while is_invalid(address):
|
|
|
+ address = generate_address()
|
|
|
+ mac_addresses.append(address)
|
|
|
+
|
|
|
+ if n == 1:
|
|
|
+ return mac_addresses[0]
|
|
|
+ else:
|
|
|
+ return mac_addresses
|