123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255 |
- import importlib
- import datetime as dt
- import os.path
- import xml.dom.minidom as minidom
- import pytz as pytz
- import ID2TLib.Label as Label
- import ID2TLib.TestLibrary as Lib
- class LabelManager:
- TAG_ROOT = 'labels'
- TAG_INPUT = 'input'
- TAG_OUTPUT = 'output'
- TAG_FILE_NAME = 'filename'
- TAG_FILE_HASH = 'sha256'
- TAG_ATTACK = 'attack'
- TAG_ATTACK_NAME = 'name'
- TAG_ATTACK_NOTE = 'note'
- TAG_ATTACK_SEED = 'seed'
- TAG_ATTACK_PACKETS = 'injected_packets'
- TAG_TIMESTAMP_START = 'timestamp_start'
- TAG_TIMESTAMP_END = 'timestamp_end'
- TAG_TIMESTAMP = 'timestamp'
- TAG_TIMESTAMP_HR = 'timestamp_hr'
- TAG_PARAMETERS = 'parameters'
- ATTR_VERSION = 'version_parser'
- ATTR_PARAM_USERSPECIFIED = 'user_specified'
- # update this attribute if XML scheme was modified
- ATTR_VERSION_VALUE = '0.3'
- def __init__(self, filepath_pcap=None):
- """
- Creates a new LabelManager for managing the attack's labels.
- :param filepath_pcap: The path to the PCAP file associated to the labels.
- """
- self.labels = list()
- self.filepath_input_pcap = filepath_pcap
- if filepath_pcap is not None:
- self.label_file_path = os.path.splitext(filepath_pcap)[0] + '_labels.xml'
- # only load labels if label file is existing
- if os.path.exists(self.label_file_path):
- self.load_labels()
- def add_labels(self, labels):
- """
- Adds a label to the internal list of labels.
- :param labels: The labels to be added
- """
- if isinstance(labels, list):
- self.labels = self.labels + [labels]
- elif isinstance(labels, tuple):
- for l in labels:
- self.labels.append(l)
- else:
- self.labels.append(labels)
- # sorts the labels ascending by their timestamp
- self.labels.sort()
- def write_label_file(self, filepath=None):
- """
- Writes previously added/loaded labels to a XML file. Uses the given filepath as destination path, if no path is
- given, uses the path in label_file_path.
- :param filepath: The path where the label file should be written to.
- """
- def get_subtree_fileinfo(xml_tag_root, filename) -> minidom.Element:
- """
- Creates the subtree for pcap file information (filename and hash).
- :return: The root node of the XML subtree
- """
- input_root = doc.createElement(xml_tag_root)
- file = doc.createElement(self.TAG_FILE_NAME)
- file.appendChild(doc.createTextNode(os.path.split(filename)[-1]))
- input_root.appendChild(file)
- hash_node = doc.createElement(self.TAG_FILE_HASH)
- hash_node.appendChild(doc.createTextNode(Lib.get_sha256(filename)))
- input_root.appendChild(hash_node)
- return input_root
- def get_subtree_timestamp(xml_tag_root, timestamp_entry):
- """
- Creates the subtree for a given timestamp, consisting of the unix time format (seconds) and a human-readable
- output.
- :param xml_tag_root: The tag name for the root of the subtree
- :param timestamp_entry: The timestamp as unix time
- :return: The root node of the XML subtree
- """
- timestamp_root = doc.createElement(xml_tag_root)
- # add timestamp in unix format
- timestamp = doc.createElement(self.TAG_TIMESTAMP)
- timestamp.appendChild(doc.createTextNode(str(timestamp_entry)))
- timestamp_root.appendChild(timestamp)
- # add timestamp in human-readable format
- timestamp_hr = doc.createElement(self.TAG_TIMESTAMP_HR)
- timestamp_hr_text = dt.datetime.fromtimestamp(timestamp_entry).astimezone(pytz.timezone('UTC')).strftime('%Y-%m-%d %H:%M:%S.%f')
- timestamp_hr.appendChild(doc.createTextNode(timestamp_hr_text))
- timestamp_root.appendChild(timestamp_hr)
- return timestamp_root
- def get_subtree_parameters(parameters):
- """
- Creates a subtree containing all parameters used to construct the attack
- :param parameters: The list of parameters used to run the attack
- :return: The root node of the XML subtree
- """
- parameters_root = doc.createElement(self.TAG_PARAMETERS)
- for param_key, param_value in parameters.items():
- param = doc.createElement(param_key.value)
- param.appendChild(doc.createTextNode(str(param_value.value)))
- param.setAttribute(self.ATTR_PARAM_USERSPECIFIED, str(param_value.user_specified))
- parameters_root.appendChild(param)
- return parameters_root
- if filepath is not None:
- self.label_file_path = os.path.splitext(filepath)[0] + '_labels.xml'
- # Generate XML
- doc = minidom.Document()
- node = doc.createElement(self.TAG_ROOT)
- node.setAttribute(self.ATTR_VERSION, self.ATTR_VERSION_VALUE)
- node.appendChild(get_subtree_fileinfo(self.TAG_INPUT, self.filepath_input_pcap))
- node.appendChild(get_subtree_fileinfo(self.TAG_OUTPUT, filepath))
- for label in self.labels:
- xml_tree = doc.createElement(self.TAG_ATTACK)
- # add attack to XML tree
- attack_name = doc.createElement(self.TAG_ATTACK_NAME)
- attack_name.appendChild(doc.createTextNode(str(label.attack_name)))
- xml_tree.appendChild(attack_name)
- attack_note = doc.createElement(self.TAG_ATTACK_NOTE)
- attack_note.appendChild(doc.createTextNode(str(label.attack_note)))
- xml_tree.appendChild(attack_note)
- attack_seed = doc.createElement(self.TAG_ATTACK_SEED)
- attack_seed.appendChild(doc.createTextNode(str(label.seed)))
- xml_tree.appendChild(attack_seed)
- injected_packets = doc.createElement(self.TAG_ATTACK_PACKETS)
- injected_packets.appendChild(doc.createTextNode(str(label.injected_packets)))
- xml_tree.appendChild(injected_packets)
- # add timestamp_start to XML tree
- xml_tree.appendChild(get_subtree_timestamp(self.TAG_TIMESTAMP_START, label.timestamp_start))
- # add timestamp_end to XML tree
- xml_tree.appendChild(get_subtree_timestamp(self.TAG_TIMESTAMP_END, label.timestamp_end))
- # add parameters to XML tree
- xml_tree.appendChild(get_subtree_parameters(label.parameters))
- node.appendChild(xml_tree)
- doc.appendChild(node)
- # Write XML to file
- file = open(self.label_file_path, 'w')
- file.write(doc.toprettyxml())
- file.close()
- def load_labels(self):
- """
- Loads the labels from an already existing label XML file located at label_file_path (set by constructor).
- """
- def get_value_from_node(node, tag_name, *child_number):
- """
- Returns the value located in the tag specified by tag_name from a given node. Walks therefor the
- node's children along as indicated by child_number, e.g., childNumber = (1,2,) first goes to the 1st child,
- and then to the 2nd child of the first child -> elem.childNodes[1].childNodes[2].
- """
- elem = node.getElementsByTagName(tag_name)
- if len(elem) == 1:
- elem = elem[0]
- for c in child_number:
- if len(elem.childNodes) > 0:
- elem = elem.childNodes[c]
- else:
- return ""
- return elem.data
- else:
- return ""
- print("Label file found. Loading labels...")
- try:
- dom = minidom.parse(self.label_file_path)
- except Exception:
- # TODO: more specific exception
- print('ERROR: Provided label file could not be parsed. Ignoring label file')
- return
- # Check if version of parser and version of file match
- version = dom.getElementsByTagName(self.TAG_ROOT)
- if len(version) > 0:
- version = version[0].getAttribute(self.ATTR_VERSION)
- if version == [] or not version == self.ATTR_VERSION_VALUE:
- print(
- "The file " + self.label_file_path + " was created by another version of ID2TLib.LabelManager. "
- "Ignoring label file.")
- self.input_filename = get_value_from_node(dom, self.TAG_INPUT, 1, 0)
- self.input_hash = get_value_from_node(dom, self.TAG_INPUT, 3, 0)
- self.output_filename = get_value_from_node(dom, self.TAG_OUTPUT, 1, 0)
- self.output_hash = get_value_from_node(dom, self.TAG_OUTPUT, 3, 0)
- # Parse attacks from XML file
- attacks = dom.getElementsByTagName(self.TAG_ATTACK)
- count_labels = 0
- for a in attacks:
- attack_name = get_value_from_node(a, self.TAG_ATTACK_NAME, 0)
- attack_note = get_value_from_node(a, self.TAG_ATTACK_NOTE, 0)
- timestamp_start = get_value_from_node(a, self.TAG_TIMESTAMP_START, 1, 0)
- timestamp_end = get_value_from_node(a, self.TAG_TIMESTAMP_END, 1, 0)
- attack_seed = get_value_from_node(a, self.TAG_ATTACK_SEED, 0)
- # Instantiate this attack to create a parameter list with the correct types
- attack_module = importlib.import_module("Attack." + attack_name)
- attack_class = getattr(attack_module, attack_name)
- attack = attack_class()
- # Loop through all parameters listed in the XML file
- param = a.getElementsByTagName(self.TAG_PARAMETERS)[0]
- for param in param.childNodes:
- # Skip empty text nodes returned by minidom
- if not isinstance(param, minidom.Text):
- import distutils.util
- param_name = param.tagName
- param_value = param.childNodes[0].nodeValue
- param_userspecified = bool(distutils.util.strtobool(param.getAttribute(self.ATTR_PARAM_USERSPECIFIED)))
- attack.add_param_value(param_name, param_value, param_userspecified)
- # Create the label from the data read
- label = Label.Label(attack_name, float(timestamp_start), float(timestamp_end), attack_seed, attack.params,
- attack_note)
- self.labels.append(label)
- count_labels += 1
- print("Read " + str(count_labels) + " label(s) successfully.")
|