123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657 |
- import os.path
- from xml.dom.minidom import *
class MessageMapping:
    """Track which messages have an associated packet and export that as XML.

    Each message is expected to expose ``msg_id``, ``line_no``, ``src`` and
    ``dst`` (both subscriptable with ``"ID"``), ``type`` (with a ``value``
    attribute) and ``time``.  Each packet is expected to expose ``time``.
    """

    # Element names used in the generated XML document.
    TAG_MAPPING_GROUP = "mappings"
    TAG_MAPPING = "mapping"

    # Attribute names used on each <mapping> element.
    ATTR_ID = "id"
    ATTR_LINENO = "line_number"
    ATTR_HAS_PACKET = "mapped"
    ATTR_PACKET_TIME = "packet_time"

    def __init__(self, messages):
        """Store *messages* and start with an empty message-id -> packet table."""
        self.messages = messages
        self.id_to_packet = {}

    def map_message(self, message, packet):
        """Record *packet* as the packet belonging to *message* (keyed by ``msg_id``)."""
        self.id_to_packet[message.msg_id] = packet

    def _message_to_element(self, doc, message):
        """Build the <mapping> element describing a single *message*."""
        mapping = doc.createElement(self.TAG_MAPPING)
        mapping.setAttribute(self.ATTR_ID, str(message.msg_id))
        mapping.setAttribute(self.ATTR_LINENO, str(message.line_no))
        mapping.setAttribute("Src", str(message.src["ID"]))
        mapping.setAttribute("Dst", str(message.dst["ID"]))
        mapping.setAttribute("Type", str(message.type.value))
        mapping.setAttribute("Time", str(message.time))

        packet = self.id_to_packet.get(message.msg_id)
        has_packet = packet is not None
        mapping.setAttribute(self.ATTR_HAS_PACKET, "true" if has_packet else "false")
        # Use the same "is not None" test as the flag above so that a packet
        # object which happens to be falsy still gets its time written out.
        if has_packet:
            mapping.setAttribute(self.ATTR_PACKET_TIME, str(packet.time))
        return mapping

    def to_xml(self):
        """Return a DOM ``Document`` with one <mapping> element per message.

        The root element is <mappings>; messages appear in the order of
        ``self.messages``.
        """
        doc = Document()
        mappings = doc.createElement(self.TAG_MAPPING_GROUP)
        doc.appendChild(mappings)
        for message in self.messages:
            mappings.appendChild(self._message_to_element(doc, message))
        return doc

    def write_to(self, buffer, close=True):
        """Write the pretty-printed XML to *buffer*.

        When *close* is true the buffer is closed afterwards, including when
        building or writing the XML raises.
        """
        try:
            buffer.write(self.to_xml().toprettyxml())
        finally:
            if close:
                buffer.close()

    def write_to_file(self, filename: str, *args, **kwargs):
        """Write the XML to *filename*, opened in ``"w"`` mode.

        Extra positional and keyword arguments are forwarded to ``open``
        after the mode argument.
        """
        with open(filename, "w", *args, **kwargs) as handle:
            # The ``with`` block owns the handle, so write_to must not close it.
            self.write_to(handle, close=False)

    def write_next_to_pcap_file(self, pcap_filename: str, mapping_ext="_mapping.xml", *args, **kwargs):
        """Write the XML beside *pcap_filename*.

        The target path is *pcap_filename* with its extension removed and
        *mapping_ext* appended; remaining arguments go to ``write_to_file``.
        """
        pcap_base = os.path.splitext(pcap_filename)[0]
        self.write_to_file(pcap_base + mapping_ext, *args, **kwargs)
|