123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521 |
- """
- Network traffic visualizer framework using Graph Tool and GTK+
- Requirements:
- graph-tool (>=v2.2.31)
- gtk+ libraries (>=3.12.2)
- Graph tool sources:
- http://graph-tool.skewed.de/download
- http://graph-tool.skewed.de/static/doc/quickstart.html
- Installation Note:
- Compiling Graph tool needs a whole lot of memory (>4GiB). On a 32Bit system you
- are encouraged to use the newest gcc compiler in order to minimize memory
- usage (gcc v4.8.3 worked on graph-tool v2.2.31)
- """
- import logging
- import time
- import random
- import threading
- logger = logging.getLogger("pypacker")
- try:
- import graph_tool
- from graph_tool import Graph, Vertex, Edge, GraphView
- from graph_tool.draw import arf_layout, sfdp_layout, fruchterman_reingold_layout, radial_tree_layout, GraphWindow
- from gi.repository import Gtk, GObject
- except Exception as e:
- logger.warning("Could not find graph-tool and/or Gtk+ libs which are needed for visualizer")
- logger.exception(e)
# Callables invoked (without arguments) whenever "t" is pressed in a graph window.
_key_listener = []


def key_press_event(self, widget, event):
    r"""Handle a key press; meant to be installed as ``GraphWidget.key_press_event``.

    The key is identified by ``event.keyval`` (the values below are the
    lowercase ASCII codes of the keys):

    r (114) -- fit the graph to the window and redraw
    s (115) -- reset the layout
    a (97)  -- apply the current transform
    p (112) -- toggle picking mode
    z (122) -- fit the window to the picked vertices, if any
    t (116) -- call every callback in ``_key_listener``, then apply the
               transform and reset the layout

    self -- the widget instance the handler is bound to
    widget -- unused
    event -- key event, only ``event.keyval`` is read
    return -- always True
    """
    key = event.keyval

    if key == 114:
        self.fit_to_window()
        self.regenerate_surface(timeout=50)
        self.queue_draw()
    elif key == 115:
        self.reset_layout()
    elif key == 97:
        self.apply_transform()
    elif key == 112:
        if self.picked is False:
            self.init_picked()
        else:
            # leave picking mode and drop the current selection
            self.picked = False
            self.selected.fa = False
            self.vertex_matrix = None
            self.queue_draw()
    elif key == 0x7a:
        # "PropertyMap" is not among the names this module imports from
        # graph_tool, so a bare reference raised NameError as soon as "z"
        # was pressed. Resolve it via the imported package instead.
        if isinstance(self.picked, graph_tool.PropertyMap):
            picked_view = GraphView(self.g, vfilt=self.picked)
            self.fit_to_window(g=picked_view)
            self.regenerate_surface(timeout=50)
            self.queue_draw()
    elif key == 116:
        # let the listeners change their state first, then re-layout
        for listener in _key_listener:
            listener()
        self.apply_transform()
        self.reset_layout()
    return True
# Replace graph-tool's GraphWidget key handler with key_press_event defined
# above. It adds the "t" key: call the registered listeners, then apply the
# transform and reset the layout (press "t" to reorder nodes effectively).
# NOTE(review): the guarded import at the top only logs a failure, so this
# line raises NameError if graph_tool could not be imported -- confirm intended.
graph_tool.draw.gtk_draw.GraphWidget.key_press_event = key_press_event
def config_cb_default(packet, vertex_src, vertex_dst, edge, vertexprop_dict, edgeprop_dict):
    """
    Fallback configuration callback: label vertices "N" and the edge "E".

    packet -- the packet being visualized (unused here)
    vertex_src -- source vertex, always labeled
    vertex_dst -- destination vertex or None
    edge -- edge between source and destination, labeled only if a destination is given
    vertexprop_dict -- vertex properties by name, "text" is written
    edgeprop_dict -- edge properties by name, "text" is written
    """
    vertexprop_dict["text"][vertex_src] = "N"

    # without a destination there is nothing else to label
    if vertex_dst is None:
        return

    vertexprop_dict["text"][vertex_dst] = "N"
    edgeprop_dict["text"][edge] = "E"
def __getattr__autocreate(self, name):
    """
    Create unknown attributes on first access, based on the name's suffix.

    Meant to be installed as __getattr__, which Python only calls when normal
    attribute lookup fails. The last two characters of the name select the
    initial value:
    _n = 0
    _s = ""
    _b = False
    _y = b""
    _l = []
    _d = {}
    _e = set()

    The value is stored on the object, so later accesses find it through
    normal lookup and do not end up here again.

    name -- name of the missing attribute
    return -- the newly stored default value
    raises AttributeError -- if the name has none of the suffixes above
    """
    # Built per call on purpose: every auto-created attribute gets its own
    # list/dict/set instead of sharing one mutable default.
    suffix_defaults = {"_n": 0, "_s": "", "_b": False, "_y": b"", "_l": [], "_d": {}, "_e": set()}

    try:
        value = suffix_defaults[name[-2:]]
    except KeyError:
        # Only an unknown suffix means "no such attribute"; name the attribute
        # so the error is usable (hasattr()/getattr() defaults still work).
        raise AttributeError(
            "%r object has no attribute %r" % (type(self).__name__, name)
        ) from None

    object.__setattr__(self, name, value)
    return value
# Allow auto creation of variables for convenience: reading an unknown
# attribute whose name ends in a known suffix (eg "_n", "_l") creates it with
# a default value instead of failing.
# This comes in handy when implementing "config_cb"
# NOTE(review): raises NameError if the guarded graph_tool import above failed.
Vertex.__getattr__ = __getattr__autocreate
Edge.__getattr__ = __getattr__autocreate
class Visualizer(object):
    """
    Visualizer using graph-tool to visualize nodes in a network.
    Note: xxx_cb is a callback returning the values stated in the descriptions.
    Lifecycle: stopped - started (paused <-> running) - stopped (terminated)

    iterable -- an object which is iterable and returns packets, raises StopIteration on EOF
    src_dst_cb -- returns list like [source, destination] or [source, None] eg ["127.0.0.1", "127.0.0.2"].
        Destination can be set to None eg for broadcast-packets. If both are not None an edge will be
        added automatically. Source/destination must uniquely identify a node.
        Packets are skipped if source is None.
        Callback-structure: fct(packet)
    config_cb -- updates a dict representing the current node-config
        Callback-structure: fct(packet, vertex_src, vertex_dst, edge, vertex_props, edge_props).
        In order to store additional data (like number of total packets from this source) save
        them in the vertex-object itself like "vertex_src.my_data=123".
    additional_vertexprops -- additional vertex properties to be added via [name, format, defaultvalue]
        Property definitions: http://graph-tool.skewed.de/static/doc/draw.html
    additional_edgeprops -- additional properties to be added via [name, format, defaultvalue]
        Property definitions: http://graph-tool.skewed.de/static/doc/draw.html
    node_timeout -- timeout until node vanishes in seconds, default is 10
        (removal of timed out nodes is currently disabled, see _packet_read_loop)
    update_interval -- currently unused, the window is redrawn on a fixed timer
    """

    # TODO: more default properties
    # Entries are [name, graph-tool value type, default value].
    DEFAULT_PROPERTIES_VERTEX = [
        ["text", "string", "NODE!!!"],
        ["size", "int", 50],
        ["shape", "string", "circle"],
        ["color", "vector<float>", [0, 0, 0, 0.0]],
        ["fill_color", "vector<float>", [1, 1, 1, 0.0]],
        ["halo", "bool", False],
        ["halo_color", "vector<float>", [1, 0, 0, 0.4]],
    ]
    DEFAULT_PROPERTIES_EDGE = [
        ["text", "string", "EDGE!!!"],
        ["color", "vector<float>", [0, 0, 0, 1]],
        ["dash_style", "vector<float>", []],
    ]

    def __init__(self, iterable,
            src_dst_cb,
            config_cb=config_cb_default,
            node_timeout=10,
            update_interval=1,
            additional_vertexprops=None,
            additional_edgeprops=None):
        # given params
        self._iterable = iterable
        self._src_dst_cb = src_dst_cb
        self._config_cb = config_cb
        self._node_timeout = node_timeout

        # worker threads, both are started by start()
        self._graphics_start_thread = threading.Thread(target=self._start_graphics)
        self._packet_update_thread = threading.Thread(target=self._packet_read_loop)
        # Set while the packet reader may proceed, cleared while paused.
        # An Event (instead of a counting semaphore) can not accumulate
        # permits, so repeated resume() calls never defeat a later pause().
        self._resume_event = threading.Event()
        # removing vertices/edges in parallel makes trouble, synchronize update and graphics thread
        self._cleanup_sema = threading.Semaphore(value=0)
        self._want_cleanup = False
        self._cleanup_vertices = []
        # temporarily paused
        self._is_paused = True
        self._is_stopped = True
        # is visualizer definitely terminated?
        self._is_terminated = False
        self._last_cleanup = time.time()

        # dict: unique name (src) -> vertex object
        self._vertices_dict = {}
        # dict: unique name -> {unique name -> edge object}, names in sorted order
        self._edges_dict = {}
        # name : property object
        self._vertex_properties = {}
        # name : default value
        self._vertex_properties_defaultvalues = {}
        # unique name -> time of the last packet seen for this vertex
        self._vertex_livetime = {}
        # name : property object
        self._edge_properties = {}
        # name : default value
        self._edge_properties_defaultvalues = {}

        self._init_graphwindow(additional_vertexprops, additional_edgeprops)
        # add reset-callback listener called when pressing "t"
        # only 1 instance allowed, old one will be removed
        _key_listener.clear()
        _key_listener.append(self._reset_positions)

    def _init_graphwindow(self, additional_vertexprops=None, additional_edgeprops=None):
        """
        Create graph, properties, layout and the window showing them.

        additional_vertexprops -- extra vertex properties as [name, format, defaultvalue] or None
        additional_edgeprops -- extra edge properties as [name, format, defaultvalue] or None
        """
        self._graph = Graph(prune=True, directed=False)
        # load properties
        self._positions = self._graph.new_vertex_property("vector<float>")

        # None instead of a shared mutable default; list() also accepts tuples
        vertex_props = Visualizer.DEFAULT_PROPERTIES_VERTEX + list(additional_vertexprops or [])
        edge_props = Visualizer.DEFAULT_PROPERTIES_EDGE + list(additional_edgeprops or [])

        for prop in vertex_props:
            self._add_property(True, prop)
        for prop in edge_props:
            self._add_property(False, prop)

        pos_layout = sfdp_layout(self._graph, K=10, verbose=True, pos=self._positions)

        self._graphwindow = GraphWindow(
            self._graph,
            pos=pos_layout,
            # TODO: make this dynamic
            geometry=(400, 300),
            vertex_font_size=10,
            vertex_pen_width=1,
            vprops=self._vertex_properties,
            edge_font_size=10,
            edge_pen_width=1,
            eprops=self._edge_properties)

        # minimum 1 vertex on graph (avoid bug in graph-tool which leads to division by zero)
        # TODO: remove
        logger.debug("adding initial vertices")
        self._update_vertices("A", "B")
        self._update_vertices("B", "A")

    def _add_property(self, for_vertex, property_config):
        """
        Add a new property to be modified.

        for_vertex -- add vertex property if True, else add an edge property
        property_config -- property description as list: ["name", "type_description", default_value]
        """
        logger.debug("adding property (%s): %r", "vertex" if for_vertex else "edge", property_config)
        name, type_description, default_value = property_config[0], property_config[1], property_config[2]

        if for_vertex:
            self._vertex_properties[name] = self._graph.new_vertex_property(type_description)
            self._vertex_properties_defaultvalues[name] = default_value
        else:
            self._edge_properties[name] = self._graph.new_edge_property(type_description)
            self._edge_properties_defaultvalues[name] = default_value

    def _cleanup_graph(self, current_time):
        """
        Remove vertices (+ attached edges) which are too old.

        The vertices are handed over to the graphics thread (_update_graphics)
        which removes them from the graph; this method blocks until that is
        done and then drops the local bookkeeping.

        current_time -- timestamp compared against the last update of every vertex
        """
        logger.debug("cleaning up graph")
        expired = [
            name for name, last_update in self._vertex_livetime.items()
            if current_time - last_update > self._node_timeout
        ]

        if not expired:
            # nothing to remove: don't block on the graphics thread for no reason
            return

        for name in expired:
            vertex = self._vertices_dict[name]
            logger.debug("vertex to remove: %r", vertex)
            # edges in graph should be removed automatically
            self._cleanup_vertices.append(vertex)

        self._want_cleanup = True
        # wait until graphics thread has removed vertices
        logger.debug("waiting until vertices are removed")
        self._cleanup_sema.acquire()

        for name in expired:
            del self._vertex_livetime[name]
            del self._vertices_dict[name]
            # vertex can be placed as _edges_dict[name][...] or _edges_dict[...][name]
            self._edges_dict.pop(name, None)
            for neighbours in self._edges_dict.values():
                neighbours.pop(name, None)
        logger.debug("finished removing local vertices")

    def _add_vertex(self):
        """
        Place a new vertex at a random position.

        return -- the newly added vertex
        """
        # random position in start
        # TODO: width/height
        # The global RNG is no longer re-seeded from the clock: that clobbered
        # any seed set by the application and is not needed for random values.
        x = random.randint(10, 100)
        y = random.randint(10, 100)
        vertex = self._graph.add_vertex()
        self._positions[vertex] = (x, y)
        return vertex

    def _reset_positions(self):
        """
        Put all vertices in a close distance in order to reorder them fast afterwards.
        """
        for vertex in self._vertices_dict.values():
            self._positions[vertex] = (random.randint(1, 10), random.randint(1, 10))

    def _update_vertices(self, src, dst=None):
        """
        Add new vertex identified by src (and dst + edge between them) if not already present.

        src -- unique source string
        dst -- unique destination string or None
        return -- vertex_source, vertex_dest, edge where vertex_dest and edge can be None
        """
        # vertices created by this call, they get their default property values below
        new_vertices = []

        try:
            vertex_src = self._vertices_dict[src]
        except KeyError:
            vertex_src = self._add_vertex()
            self._vertices_dict[src] = vertex_src
            new_vertices.append(vertex_src)

        vertex_dst = None
        edge_src_dst = None

        if dst is not None:
            # initiate dst and add edge between src<->dst
            try:
                vertex_dst = self._vertices_dict[dst]
            except KeyError:
                vertex_dst = self._add_vertex()
                self._vertices_dict[dst] = vertex_dst
                new_vertices.append(vertex_dst)

            # the graph is undirected: store A<->B and B<->A under the same sorted key
            name_a, name_b = sorted([src, dst])

            try:
                edge_src_dst = self._edges_dict[name_a][name_b]
            except KeyError:
                # TODO: don't add second edge but update arrows (both directions)
                edge_src_dst = self._graph.add_edge(vertex_src, vertex_dst)
                self._edges_dict.setdefault(name_a, {})[name_b] = edge_src_dst

                # set default property values for edge
                for prop_name, default_value in self._edge_properties_defaultvalues.items():
                    self._edge_properties[prop_name][edge_src_dst] = default_value

        for vertex in new_vertices:
            # set default property values for vertices
            for prop_name, default_value in self._vertex_properties_defaultvalues.items():
                self._vertex_properties[prop_name][vertex] = default_value

        return vertex_src, vertex_dst, edge_src_dst

    def _packet_read_loop(self):
        """
        Read packets from _iterable and update graph data until StopIteration
        is thrown by it or Visualizer is stopped.
        Take packets (pkt) instead of eg raw bytes for _src_dst_cb and _config_cb:
        avoid unneeded reparsing.
        """
        for pkt in self._iterable:
            # blocks while paused, returns at once while running or after stop()
            self._resume_event.wait()

            if self._is_stopped:
                break
            # analyze packet and update graph
            src, dst = self._src_dst_cb(pkt)

            if src is None:
                continue

            vertex_src, vertex_dst, edge = self._update_vertices(src, dst)
            self._config_cb(pkt,
                vertex_src,
                vertex_dst,
                edge,
                self._vertex_properties,
                self._edge_properties)

            # cleanup logic: remember when every vertex was seen last
            current_time = time.time()

            if dst is not None:
                self._vertex_livetime[dst] = current_time
            self._vertex_livetime[src] = current_time

            if current_time - self._last_cleanup > self._node_timeout:
                # TODO: temporarily disabled
                # self._cleanup_graph(current_time)
                self._last_cleanup = current_time

        logger.debug("finished iterating packets")

    def _update_graphics(self):
        """
        Timer callback of the graphics thread: remove vertices queued by
        _cleanup_graph (if any) and redraw the graph.

        return -- always True (presumably to keep the timer installed by
            _start_graphics running -- TODO confirm against GObject docs)
        """
        if self._want_cleanup:
            # NOTE(review): vertices are removed one by one; confirm against the
            # graph-tool docs that the remaining descriptors stay valid.
            for vertex in self._cleanup_vertices:
                logger.debug("removing vertex: %r", vertex)
                # remove in/out-edges
                self._graph.clear_vertex(vertex)
                # nomen est omen
                self._graph.remove_vertex(vertex)
            self._cleanup_vertices.clear()
            self._want_cleanup = False
            # wake up _cleanup_graph which waits for the removal
            self._cleanup_sema.release()

        self._graphwindow.graph.regenerate_surface(lazy=False)
        self._graphwindow.graph.queue_draw()
        return True

    def _start_graphics(self):
        """
        Show the window and run the Gtk main loop until the window is closed,
        then stop the visualizer.
        """
        logger.debug("initiating graphics")
        GObject.timeout_add(150, self._update_graphics)
        self._graphwindow.connect("delete_event", Gtk.main_quit)
        self._graphwindow.show_all()
        Gtk.main()
        logger.debug("window was closed...stopping")
        # no graphics = window was closed = nothing to be done anymore
        self.stop()

    def start(self):
        """
        Start graphics and packet reading. Does nothing once terminated.
        """
        if self._is_terminated:
            return

        logger.debug("starting visualizer")
        self._is_stopped = False
        self._is_paused = False
        self._resume_event.set()
        self._graphics_start_thread.start()
        self._packet_update_thread.start()

    def pause(self):
        """
        Pause packet processing until resume() is called. Does nothing while stopped.
        """
        if self._is_stopped:
            return

        logger.debug("pausing visualizer")
        self._is_paused = True
        self._resume_event.clear()

    def resume(self):
        """
        Continue packet processing after pause(). Does nothing while stopped.
        """
        if self._is_stopped:
            return

        logger.debug("resuming visualizer")
        self._is_paused = False
        self._resume_event.set()

    def stop(self):
        """
        Terminate the visualizer, it can not be started again afterwards.
        """
        if self._is_terminated:
            return

        logger.debug("stopping visualizer")
        self._is_terminated = True
        # Mark as stopped BEFORE waking up the packet reader: otherwise it
        # could pass its stop-check and process one more packet.
        self._is_stopped = True
        self._is_paused = False
        # unlock locked packet-reader
        self._resume_event.set()
|