123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561 |
- import os
- import logging
- import argparse
- import time
- import socket
- import random
- import requests
- import json
- import struct
- import multiprocessing
- from multiprocessing import Process, Queue
- import re
- from ipv4 import IPv4Address, IPv4Network
- import numpy as np
- from pypacker import psocket
- from pypacker import ppcap
- from pypacker.layer12.ethernet import Ethernet
- from pypacker.layer3 import ip
- from pypacker.layer4 import tcp
# Every log line is prefixed with its level and the emitting function.
logging.basicConfig(format="%(levelname)s (%(funcName)s): %(message)s")
logger = logging.getLogger("pra_framework")
logger.setLevel(logging.DEBUG)

# Directory of this file; FILE_NODES points to a node list stored next to it.
CURRENT_DIR = os.path.dirname(os.path.realpath(__file__))
FILE_NODES = "%s/simulated_nodes.txt" % CURRENT_DIR

# Kinds of scanner level feedback a simulated address can give.
RESPONSETYPE_NONE = 0
RESPONSETYPE_TCP_RST = 1
RESPONSETYPE_TCP_SYNACK = 2
RESPONSETYPE_ICMP = 4
# Feedback kinds which are answered with a crafted TCP segment.
RESPONSETYPES_TCP = set((RESPONSETYPE_TCP_RST, RESPONSETYPE_TCP_SYNACK))
# Short human readable names of the feedback kinds.
RESPONSE_TYPES_DESCR = dict(zip(
    (RESPONSETYPE_NONE, RESPONSETYPE_TCP_RST, RESPONSETYPE_TCP_SYNACK, RESPONSETYPE_ICMP),
    ("NONE", "RST", "SYNACK", "ICMP")))

# Pack an int into 4 big endian (network byte order) bytes, i.e. a packed IPv4 address.
packipv4 = struct.Struct(">I").pack
# Split a line of a tab separated file into its columns.
split_tab = re.compile(r"\t").split

# TLS client certificate, currently unused:
# cert = ("ssl/simulator/simulator_cert.pem", "ssl/simulator/simulator_key.pem")

# Silence urllib3 warnings (e.g. about unverified HTTPS requests: events are posted with verify=False).
requests.packages.urllib3.disable_warnings()
def store_packets(packets, filename="exception_packets.pcap"):
    """
    Write raw packets to a pcap file, e.g. packets which could not be parsed.

    packets -- iterable of raw packet bytes; a single bytes/bytearray object
               is accepted as well and written as one packet
    filename -- path of the pcap file to write
    """
    # A bare bytes object would otherwise be iterated byte by byte (yielding ints).
    if isinstance(packets, (bytes, bytearray)):
        packets = [packets]
    writer = ppcap.Writer(filename=filename)
    try:
        for bts in packets:
            writer.write(bts)
    finally:
        # close the file even if a write fails
        writer.close()
class MonitorSimulator(object):
    """
    Simulates monitor nodes on a host system which in turn send attack information to a central CIDS.

    The simulation can be done using a virtual interface:
        modprobe dummy (optional if already loaded)
        ip link set name eth10 dev dummy0
        ip link show eth10
        ifconfig eth10 txqueuelen 10000
    Optimization of /etc/sysctl.conf should be done.

    Every simulated ("listening") address is stored in _listening_ips as
    packed 4 byte IPv4 address -> [response type, is_monitor].
    Probes to a monitor are reported to TraCINg; probes to any listening
    address are answered on scanner level according to its response type.
    """

    def __init__(self,
                 # total amount of monitors (monitor_ips + random generated)
                 amount_monitors=10000,
                 cidr_bits=0,
                 # total amount of addresses which give scanner feedback
                 amount_non_monitors=10000,
                 # monitor IP addresses as ["1.2.3.4", ...]
                 monitor_ips=None,
                 seed=123456789,
                 url_tracing="https://localhost:443",
                 interface_name="lo",
                 buffer_size=500000,
                 # probability that a monitor returns a relevant scanner feedback (RST, SYN/ACK)
                 prop_mon_scannerfeedback=0.8,
                 prop_nwdrop=0.0,
                 file_blacklist=None):
        """
        Create the simulated addresses, the sockets and the (not yet started)
        collector and reaction processes.

        amount_monitors -- total amount of monitors (monitor_ips + randomly generated)
        cidr_bits -- prefix length of the simulated network; random addresses are
            drawn from [0, 0xFFFFFFFF >> cidr_bits)
        amount_non_monitors -- amount of additional addresses which only give scanner feedback
        monitor_ips -- explicit monitor addresses as ["1.2.3.4", ...] (default: none)
        seed -- seed for address and feedback type generation
        url_tracing -- URL attack events are POSTed to
        interface_name -- interface to read probes from and send feedback on
        buffer_size -- max amount of packets buffered between collectors and reaction processes
        prop_mon_scannerfeedback -- probability that a monitor gives scanner feedback
        prop_nwdrop -- probability for network drops
        file_blacklist -- optional file of networks (one per line, '#' starts a comment line)
            excluded from random address generation
        raises -- ValueError if more addresses are requested than the address range holds
        """
        if monitor_ips is None:
            monitor_ips = []
        logger.debug("monitors=%d, non monitors=%d, scanner feedback=%f, nw drop=%f",
                     amount_monitors, amount_non_monitors, prop_mon_scannerfeedback, prop_nwdrop)
        self._amount_monitors = amount_monitors
        self._amount_non_monitors = amount_non_monitors
        self._prop_mon_scannerfeedback = prop_mon_scannerfeedback
        # NOTE(review): _prop_nwdrop and _rand_nwloss are not used anywhere in this
        # class, so network drops are currently not simulated.
        self._prop_nwdrop = prop_nwdrop
        self._blacklist_nw_objs = []
        self._blacklist_int_sub8 = set()
        if file_blacklist is not None:
            self._blacklist_nw_objs, self._blacklist_int_sub8 = self.read_blacklist(file_blacklist)
        self._rand = random.Random()
        self._rand.seed(a=seed, version=1)
        self._rand_nwloss = random.Random()
        self._rand_nwloss.seed(a=seed, version=1)
        logger.debug("TraCINg url=%s, interface=%s", url_tracing, interface_name)
        logger.debug("Initiating Queue, size: %d", buffer_size)
        # raw packets handed from the collector processes to the reaction processes
        self._queue = Queue(maxsize=buffer_size)
        # indices 1 and 2 (RST, SYN/ACK) are the ones picked by _set_feedback_types
        self._responsetypes = [
            RESPONSETYPE_NONE,
            RESPONSETYPE_TCP_RST,
            RESPONSETYPE_TCP_SYNACK,
            RESPONSETYPE_ICMP]
        self._file_identified_monitors = "./output_dir_testing/identified_monitors.csv"
        # packed IPv4 address -> [response type, is_monitor]
        self._listening_ips = {}
        # TODO: adjust this for different subnets
        # this has to match _cidr_bits_stage1 in main_attack.py
        # cidr_bits=0 (/0) -> 0xFFFFFFFF
        self._host_addresses_mask = 0xFFFFFFFF >> cidr_bits
        logger.debug("Max host IP addresses (host mask): %X", self._host_addresses_mask)
        self._initiate_listening_ips(monitor_ips)
        logger.debug("total amount of listening addresses=%d (w/ + w/o feedback)", len(self._listening_ips))
        self._sockets_read = []
        self._sockets_write = []
        self._is_running = False
        # TODO: adjust this on other platforms
        self._read_processes_amount = 2
        self._read_processes = []
        # one read socket shared by all collector processes
        socket_read = psocket.SocketHndl(iface_name=interface_name,
                                         timeout=10,
                                         buffersize_recv=2 ** 29)
        self._sockets_read.append(socket_read)
        logger.debug("creating %d processes for reading packets", self._read_processes_amount)
        for cnt in range(self._read_processes_amount):
            proc = Process(target=self._packet_collect_cycler,
                           args=(cnt + 1, socket_read, self._queue, self._listening_ips))
            self._read_processes.append(proc)
        requests_session = requests.Session()
        self._attack_reaction_processes = []
        # TODO: adjust this on other platforms
        self._attack_reaction_processes_amount = 5
        logger.debug("creating %d processes for attack reaction", self._attack_reaction_processes_amount)
        for cnt in range(self._attack_reaction_processes_amount):
            # one write socket per reaction process
            socket_write = psocket.SocketHndl(iface_name=interface_name,
                                              timeout=60 * 60 * 24 * 7,
                                              buffersize_send=2 ** 29)
            self._sockets_write.append(socket_write)
            attack_reaction_process = Process(
                target=self._attack_reaction_cycler,
                args=(cnt + 1,
                      socket_write,
                      self._queue,
                      requests_session,
                      url_tracing,
                      self._listening_ips))
            self._attack_reaction_processes.append(attack_reaction_process)

    def start(self):
        """
        Start the reaction processes and, two seconds later, the collector
        processes. Does nothing if already running.
        """
        if self._is_running:
            return
        logger.debug("starting collection and reaction logic")
        self._is_running = True
        for process in self._attack_reaction_processes:
            process.start()
        # presumably gives the reaction processes time to come up before probes get queued
        time.sleep(2)
        for proc in self._read_processes:
            proc.start()

    def stop(self):
        """
        Close all sockets and the queue, then terminate and reap all worker
        processes. Does nothing if not running.
        """
        if not self._is_running:
            return
        self._is_running = False
        for sock in self._sockets_read + self._sockets_write:
            sock.close()
        self._queue.close()
        workers = self._read_processes + self._attack_reaction_processes
        # the workers loop forever: don't wait for them to finish, just terminate ...
        for proc in workers:
            proc.terminate()
        # ... and reap them so no zombie processes are left behind
        for proc in workers:
            proc.join(timeout=5)

    def _initiate_listening_ips(self, monitor_ips):
        """
        Initiate IP addresses which trigger alerts and/or give scanner feedback.

        monitor_ips -- monitor IPs to be added as "a.b.c.d"
        raises -- ValueError if more addresses are requested than the address range holds
        """
        # apply reaction on custom defined monitors
        if monitor_ips:
            logger.debug("listening IP addresses were given explicitly, adding %d", len(monitor_ips))
            for ip_mon in monitor_ips:
                self._listening_ips[IPv4Address(ip_str=ip_mon).packed] = [RESPONSETYPE_TCP_RST, True]
        # fill up with random monitors until amount_monitors addresses exist
        self._create_random_ips(self._amount_monitors, is_monitor_ip=True)
        logger.debug("total amount of monitors=%d", len(self._listening_ips))
        if self._amount_non_monitors > 0:
            target_amount = len(self._listening_ips) + self._amount_non_monitors
            logger.info("creating %d non monitor feedback IPs", self._amount_non_monitors)
            self._create_random_ips(target_amount, is_monitor_ip=False)
        amount_mon = sum(1 for resp_mon in self._listening_ips.values() if resp_mon[1])
        amount_nonmon = sum(1 for resp_mon in self._listening_ips.values() if not resp_mon[1])
        logger.info("sanity check: monitors=%d, non monitors=%d", amount_mon, amount_nonmon)
        # response types are (re)assigned for all addresses, explicit monitors included
        self._set_feedback_types()

    def check_matches(self):
        """
        Check if every match in ./output_dir_testing/identified_monitors.csv is
        really a monitor and log the amount of correct matches and false positives.

        The file is tab separated with one header line; the first column holds
        either a single address "a.b.c.d" or a network "a.b.c.d/x" whose hosts
        are checked one by one. Addresses listed more than once are counted once.
        """
        monitor_found = 0
        monitor_false_positive_no_monitor = 0
        monitor_false_positive_unknown = 0
        all_ips = set()
        with open(self._file_identified_monitors, "r") as fd:
            # skip header
            fd.readline()
            for line in fd:
                ip_str = split_tab(line)[0].strip()
                if not ip_str:
                    continue
                try:
                    if "/" in ip_str:
                        ips = IPv4Network(nw_ip_str_prefix=ip_str).hosts
                    else:
                        ips = [IPv4Address(ip_str=ip_str).packed]
                except TypeError:
                    continue
                except Exception:
                    logger.warning("something went wrong while checking IP address=%r", ip_str, exc_info=True)
                    break
                for ip_bytes in ips:
                    if ip_bytes in all_ips:
                        logger.warning("allready counted, duplicate? from file=%s, converted=%r",
                                       ip_str, IPv4Address(ip_bytes=ip_bytes))
                        # don't count the same address twice
                        continue
                    all_ips.add(ip_bytes)
                    resp_mon = self._listening_ips.get(ip_bytes)
                    if resp_mon is None:
                        monitor_false_positive_unknown += 1
                        if monitor_false_positive_unknown % 100 == 0:
                            logger.debug("%d: unknown to simulator: %r",
                                         monitor_false_positive_unknown, IPv4Address(ip_bytes=ip_bytes))
                    elif resp_mon[1] is True:
                        monitor_found += 1
                    else:
                        monitor_false_positive_no_monitor += 1
                        logger.debug("known to simulator but not a monitor: %r", IPv4Address(ip_bytes=ip_bytes))
        logger.info("correctly identified monitors: %d", monitor_found)
        logger.info("unknown to simulator (false positive): %d", monitor_false_positive_unknown)
        logger.info("found but not monitor (false positive): %d", monitor_false_positive_no_monitor)

    def _set_feedback_types(self):
        """
        Set the type of feedback given on scanner level for every listening address.

        Non monitors always give feedback, monitors with probability
        prop_mon_scannerfeedback. Feedback is RST or SYN/ACK (chosen at random),
        no feedback is RESPONSETYPE_NONE.
        """
        logger.debug("setting feedback types")
        randrange = self._rand.randrange
        rand_0_1 = self._rand.random
        responsetypes = self._responsetypes
        for resp_mon in self._listening_ips.values():
            feedback_prop = self._prop_mon_scannerfeedback if resp_mon[1] else 1.0
            is_feedback = rand_0_1() <= feedback_prop
            # index 1 or 2 -> RST or SYN/ACK; randrange is only drawn when feedback is given
            resp_mon[0] = responsetypes[randrange(1, 3)] if is_feedback else RESPONSETYPE_NONE

    @staticmethod
    def read_blacklist(filename):
        """
        Read blacklisted networks, one per line. Empty lines, lines starting
        with '#' and lines which can't be parsed are skipped.

        filename -- path of the blacklist file
        return -- (list of blacklisted IP network objects,
                   set of every first octet (/8 prefix as int) covered by these networks)
        """
        logger.debug("Reading blacklist from: %s", filename)
        addresses_nw_obj = []
        addresses_int_sub8 = set()
        total_addresses = 0
        with open(filename, "r") as fd:
            for line in fd:
                line = line.strip()
                if not line or line.startswith("#"):
                    continue
                try:
                    nw = IPv4Network(nw_ip_str_prefix=line)
                    num_addresses = nw.num_addresses
                    ip_int = nw.ip_int
                except Exception as ex:
                    # best effort: skip unparsable entries, but leave a trace
                    logger.debug("skipping blacklist entry %r: %r", line, ex)
                    continue
                total_addresses += num_addresses
                addresses_nw_obj.append(nw)
                # register every /8 the network spans (networks larger than /8 span several)
                host_bits = num_addresses - 1
                first_sub8 = (ip_int & ~host_bits) >> 24
                last_sub8 = (ip_int | host_bits) >> 24
                addresses_int_sub8.update(range(first_sub8, last_sub8 + 1))
        logger.debug("Blacklist address groups=%d, addresses=%d, sub8=%d",
                     len(addresses_nw_obj), total_addresses, len(addresses_int_sub8))
        return addresses_nw_obj, addresses_int_sub8

    def _create_random_ips(self, target_amount, is_monitor_ip=True):
        """
        Create random, evenly distributed IP addresses in [0, host mask),
        skipping blacklisted addresses and addresses which already exist.

        target_amount -- add addresses until _listening_ips has this size
        is_monitor_ip -- add addresses of type monitor if True, non monitor otherwise
        raises -- ValueError if target_amount exceeds the amount of drawable addresses
        """
        logger.debug("creating %d random IP addresses, monitor=%r", target_amount, is_monitor_ip)
        # randrange(0, mask) can produce at most mask distinct addresses: more would loop forever
        if target_amount > self._host_addresses_mask:
            raise ValueError("!!!! too many addresses to create! %d > %d" %
                             (target_amount, self._host_addresses_mask))
        randrange = self._rand.randrange
        listening_ips = self._listening_ips
        blacklist_int_sub8 = self._blacklist_int_sub8
        while len(listening_ips) < target_amount:
            ip_num = randrange(0, self._host_addresses_mask)
            # cheap /8 pre-check before testing the individual blacklisted networks
            if (ip_num >> 24) in blacklist_int_sub8 and self._is_blacklisted(ip_num):
                continue
            ip_bytes = packipv4(ip_num)
            # don't overwrite old values (e.g. turn a monitor into a non monitor)
            if ip_bytes in listening_ips:
                continue
            # response type is assigned later by _set_feedback_types
            listening_ips[ip_bytes] = [None, is_monitor_ip]

    def _is_blacklisted(self, ip_num):
        """
        Return True if ip_num (IPv4 address as int) lies inside one of the
        blacklisted networks.
        """
        for ip_nw in self._blacklist_nw_objs:
            # assumes CIDR networks, i.e. num_addresses is a power of two
            host_bits = ip_nw.num_addresses - 1
            # same network <=> equal once all host bits are set
            if (ip_num | host_bits) == (ip_nw.ip_int | host_bits):
                return True
        return False

    def _create_clustered_monitors(self, target_amount):
        """
        Create clustered monitor addresses: runs of sequential addresses with a
        pareto distributed length, separated by exponentially distributed gaps.
        Uses numpy's global random state, not the seeded self._rand.

        target_amount -- target amount of IP addresses for _listening_ips
        """
        logger.info("creating %d random clustered IPs", target_amount)
        # classical pareto by "scale=m=1"
        # s = np.random.pareto(a, 10000) + m
        # shape = a = 1.x, scale = 1 (not adjusted), location = 1 (min value 1)
        # mean = sum/length = shape * scale / (shape - 1)
        # -> a=2.1: mean=~1.9
        # -> a=1.1: mean=~10
        # -> a=1.01: mean=~100
        pareto = np.random.pareto
        pareto_shape = 1.01
        exponential = np.random.exponential
        # mean=scale
        exponential_scale = 200
        ip_max = self._host_addresses_mask - 10000
        if ip_max < 0:
            logger.warning("address range too small: %d<%d, will not create any monitors",
                           self._host_addresses_mask, 10000)
            return
        # highest address (exclusive) which still fits the host mask / 4 byte packing
        ip_end = self._host_addresses_mask + 1
        current_ip = 0
        while len(self._listening_ips) < target_amount and current_ip < ip_max:
            sequential_add = int(pareto(pareto_shape) + 1)
            # the pareto tail is heavy: clamp the run to the address range and the target amount
            for ip_int in range(current_ip, min(current_ip + sequential_add, ip_end)):
                if len(self._listening_ips) >= target_amount:
                    break
                self._listening_ips[packipv4(ip_int)] = [None, True]
            current_ip += sequential_add + int(exponential(scale=exponential_scale))
        if len(self._listening_ips) < target_amount:
            logger.warning("could not create enough monitors (to small range?) %d < %d",
                           len(self._listening_ips), target_amount)

    def _packet_collect_cycler(self, procnum, sockethndl, queue, ip_whitelist):
        """
        Collects packets and puts them into the Queue if the destination IP address
        matches an address in ip_whitelist. Runs until the process is terminated.

        procnum -- number of this collector, used for logging
        sockethndl -- A SocketHandler to read bytes from
        queue -- A queue to write bytes to if a destination IP address matches
        ip_whitelist -- A dictionary posing a whitelist (keys: packed IPv4 addresses)
        """
        logger.debug("starting listening process Nr. %d", procnum)
        psock_recv = sockethndl._socket_recv.recv
        queue_put = queue.put
        cnt = 0
        last_cnt = 0
        while True:
            try:
                # NOTE(review): at most 64 bytes are read; this covers Ethernet + IPv4 + TCP
                # headers without options, longer packets (e.g. SYNs with TCP options) get truncated
                bts = psock_recv(64)
            except socket.timeout:
                # idle: report progress if probes arrived since the last timeout
                if last_cnt != cnt:
                    logger.debug("collector %d: amount of probes to NW=%d", procnum, cnt)
                    last_cnt = cnt
                continue
            cnt += 1
            # IPv4 destination: 14 bytes Ethernet header + offset 16 inside the IPv4 header
            if bts[14 + 16: 14 + 16 + 4] in ip_whitelist:
                queue_put(bts)

    def _attack_reaction_cycler(self, procnum, socket_write, queue, requests_session, url_tracing, listening_ips):
        """
        Examines packets in the buffer and reacts on SYN-pings for
        specific target IPs. Possible reactions are: sending attack
        events to TraCINg (monitors only) and/or feedback on scanner level
        (RST or SYN/ACK, according to the address' response type).
        Runs until the process is terminated.

        procnum -- number of this reaction process, used for logging
        socket_write -- socket handler used to send scanner feedback
        queue -- queue to read raw packets from
        requests_session -- session used to POST attack events
        url_tracing -- URL attack events are POSTed to
        listening_ips -- packed IPv4 address -> [response type, is_monitor]
        """
        logger.debug("starting reaction process No. %d", procnum)
        # seconds to wait for TraCINg before giving up on an event
        tracing_timeout = 60
        cnt = 0
        while True:
            bts = queue.get()
            try:
                packet = Ethernet(bts)
            except Exception:
                logger.warning("could not parse received packet:\n%r", bts)
                # store as one packet: a bare bytes object would be iterated as ints
                store_packets([bts])
                continue
            p_ip = packet[ip.IP]
            # the collector only compares raw bytes: skip non IPv4 packets / unknown targets
            # instead of crashing this worker
            if p_ip is None or p_ip.dst not in listening_ips:
                continue
            resp_mon = listening_ips[p_ip.dst]
            # this is a monitor, send event to TraCINg
            if resp_mon[1]:
                cnt += 1
                try:
                    p_tcp = packet[tcp.TCP]
                    if cnt % 500 == 0:
                        logger.debug("%d> %d: monitor was scanned: %s:%d -> %s:%d",
                                     procnum, cnt, p_ip.src_s, p_tcp.sport, p_ip.dst_s, p_tcp.dport)
                    # Inform TraCINg about an attack.
                    post_data = json.dumps({
                        "sensor": {
                            "name": "monitorsimulation_" + p_ip.dst_s,
                            "type": "Honeypot"},
                        "src": {
                            "ip": p_ip.src_s,
                            "port": "%d" % p_tcp.sport},
                        "dst": {
                            "ip": p_ip.dst_s,
                            "port": "%d" % p_tcp.dport},
                        "type": 11,
                        "log": "Predefined Log",
                        "md5sum": "7867de13bf22a7f3e3559044053e33e7",
                        "date": ("%d" % time.time())
                    })
                    requests_session.post(url=url_tracing, data=post_data, verify=False, stream=False,
                                          timeout=tracing_timeout)
                except Exception as ex:
                    logger.warning("could not inform TraCINg: %r", ex)
            responsetype = resp_mon[0]
            if responsetype == RESPONSETYPE_NONE:
                continue
            try:
                if responsetype in RESPONSETYPES_TCP:
                    tcp_packet = packet[tcp.TCP]
                    # ack the probe's sequence number (wrapping at 32 bits)
                    tcp_packet.ack = (tcp_packet.seq + 1) & 0xFFFFFFFF
                    tcp_packet.seq = 12345
                    if responsetype == RESPONSETYPE_TCP_SYNACK:
                        tcp_packet.flags = tcp.TH_SYN | tcp.TH_ACK
                    else:
                        tcp_packet.flags = tcp.TH_RST
                    packet.reverse_all_address()
                    # NOTE(review): auto fields (presumably incl. the TCP checksum) are not
                    # recalculated although seq/ack/flags changed - confirm the scanner ignores this
                    socket_write.send(packet.bin(update_auto_fields=False))
                elif responsetype == RESPONSETYPE_ICMP:
                    # ICMP indicates unreachable hosts, ignore
                    pass
                else:
                    logger.warning("unknown response type for %s: %r", p_ip.dst_s, responsetype)
            except Exception as ex:
                logger.warning("could not send scanner feedback: %r", ex)
if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("-i", "--interface", help="Interface to listen on", default="eth10")
    parser.add_argument("-m", "--monitors", help="Amount of monitors to be simulated", type=int, default=1000)
    parser.add_argument("-r", "--cidrbits", help="CIDR bits of simulated network", type=int, default=0)
    parser.add_argument("-f", "--monitorfeedback", help="Probability for monitor feedback", type=float, default=0.9)
    parser.add_argument("-n", "--nonmonitors", help="Amount of non-monitors to be simulated", type=int, default=0)
    parser.add_argument("-d", "--nwdrop", help="Probability for network drops", type=float, default=0)
    parser.add_argument("-b", "--buffersize", help="Buffer to be used for storing received packets", type=int,
                        default=1000000)
    parser.add_argument("-s", "--seed", help="Seed to be used to distribute nodes", type=int, default=123456789)
    parser.add_argument("-u", "--url", help="HTTPS URL of TraCINg to send events to", default="https://localhost:443")
    parser.add_argument("--blacklist", help="File of networks (one per line) excluded from random address generation",
                        default=None)
    args = parser.parse_args()
    monitor_ips_init = []
    logger.info("amount of CPUs: %d", multiprocessing.cpu_count())
    monitorsimulator = MonitorSimulator(
        amount_monitors=args.monitors,
        cidr_bits=args.cidrbits,
        amount_non_monitors=args.nonmonitors,
        monitor_ips=monitor_ips_init,
        # --seed was parsed before but never handed to the simulator
        seed=args.seed,
        interface_name=args.interface,
        buffer_size=args.buffersize,
        url_tracing=args.url,
        prop_mon_scannerfeedback=args.monitorfeedback,
        prop_nwdrop=args.nwdrop,
        file_blacklist=args.blacklist)
    input("press enter to continue")
    monitorsimulator.start()
    try:
        print("")
        user_input = None
        while user_input != "quit":
            user_input = input("enter 'quit' to quit, 'check' to compare monitors to those in %s\n" %
                               monitorsimulator._file_identified_monitors)
            if user_input == "check":
                monitorsimulator.check_matches()
            else:
                logger.debug("%d", monitorsimulator._queue.qsize())
            print("")
    finally:
        # also stop on Ctrl+C / EOF: the worker processes are non-daemonic and
        # would otherwise keep running (multiprocessing joins them at interpreter exit)
        logger.debug("stopping simulation")
        monitorsimulator.stop()
|