123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250 |
- import struct
- class MetaPacket(type):
- """
- This Metaclass is a more efficient way of setting attributes than using __init__.
- This is done by reading name, format and default value out of __hdr__ in every subclass.
- This configuration is set one time when loading the module (not at instatiation).
- Attributes can be normally accessed using "obj.field" notation.
- General note: Callflaw is: __new__ (loading module) -> __init__ (initiate class)
- CAUTION:
- - List et al are _SHARED_ among all instantiated classes! A copy is needed on changes to them
- - New protocols: don't use header fields having same name as methods in Packet class
- """
- def __new__(cls, clsname, clsbases, clsdict):
- # Using properties will slow down access to header fields but it's needed:
- # This way we get informed about get-access more efficiently than using
- # __getattribute__ (slow access for header fields vs. slow access
- # for ALL class fields).
- def get_setter(varname, is_field_type_simple=True, is_field_static=True):
- """
- varname -- name of the variable to set the property for
- is_field_type_simple -- get property for simple static or dynamic type if True, else TriggerList
- is_field_static -- if is_field_type_simple is True: get static type (int, fixed size bytes, ...),
- else dynamic (format "xs") which can change in format (eg DNS names)
- return -- set-property for simple types or triggerlist
- """
- varname_shadowed = "_%s" % varname
- def setfield_simple(obj, value):
- """
- Unpack field ondemand
- """
- if obj._unpacked is not None and not obj._unpacked:
- # obj._unpacked = None means: dissect not yet finished
- obj._unpack()
- if value is None and obj.__getattribute__(varname_shadowed + "_active"):
- object.__setattr__(obj, varname_shadowed + "_active", False)
- obj._header_format_changed = True
- # logger.debug("deactivating field: %s" % varname_shadowed)
- elif value is not None and not obj.__getattribute__(varname_shadowed + "_active"):
- object.__setattr__(obj, varname_shadowed + "_active", True)
- obj._header_format_changed = True
- # logger.debug("activating field: %s" % varname_shadowed)
- if not is_field_static and value is not None:
- # simple dynamic field
- format_new = "%ds" % len(value)
- # logger.debug(">>> changing format for dynamic field: %r / %s / %s" % (obj.__class__, varname_shadowed, format_new))
- object.__setattr__(obj, varname_shadowed + "_format", format_new)
- obj._header_format_changed = True
- object.__setattr__(obj, varname_shadowed, value)
- obj._header_changed = True
- obj._notify_changelistener()
- def setfield_triggerlist(obj, value):
- """
- Clear list and add value as only value.
- value -- Packet, bytes (single or as list)
- """
- tl = obj.__getattribute__(varname_shadowed)
- if type(tl) is list:
- # we need to create the original TriggerList in order to unpack correctly
- # _triggerlistName = [b"bytes", callback] or
- # _triggerlistName = [b"", callback] (default initiation)
- # logger.debug(">>> initiating TriggerList")
- tl = obj._header_fields_dyn_dict[varname_shadowed](obj, dissect_callback=tl[1], buffer=tl[0])
- object.__setattr__(obj, varname_shadowed, tl)
- # this will trigger unpacking
- del tl[:]
- # TriggerList: avoid overwriting dynamic fields eg when using keyword constructor Class(key=val)
- if type(value) is list:
- tl.extend(value)
- else:
- tl.append(value)
- obj._header_changed = True
- obj._notify_changelistener()
- if is_field_type_simple:
- return setfield_simple
- else:
- return setfield_triggerlist
- def get_getter(varname, is_field_type_simple=True):
- """
- varname -- name of the variable to set the property for
- is_field_type_simple -- get property for simple static or dynamic type if True, else TriggerList
- return -- get-property for simple type or triggerlist
- """
- varname_shadowed = "_%s" % varname
- def getfield_simple(obj):
- """
- Unpack field ondemand
- """
- # logger.debug("getting value for simple field: %s" % varname_shadowed)
- if obj._unpacked is not None and not obj._unpacked:
- obj._unpack()
- # logger.debug("now returning value")
- return obj.__getattribute__(varname_shadowed)
- def getfield_triggerlist(obj):
- tl = obj.__getattribute__(varname_shadowed)
- # logger.debug(">>> getting Triggerlist: %r" % tl)
- if type(tl) is list:
- # _triggerlistName = [b"bytes", callback] or
- # _triggerlistName = [b"", callback] (default initiation)
- tl = obj._header_fields_dyn_dict[varname_shadowed](obj, dissect_callback=tl[1], buffer=tl[0])
- object.__setattr__(obj, varname_shadowed, tl)
- return tl
- if is_field_type_simple:
- return getfield_simple
- else:
- return getfield_triggerlist
- t = type.__new__(cls, clsname, clsbases, clsdict)
- # dictionary of TriggerLists: name -> TriggerListClass
- t._header_fields_dyn_dict = {}
- # get header-infos from subclass: [("name", "format", value), ...]
- hdrs = getattr(t, "__hdr__", None)
- # cache header for performance reasons, will be set to bytes later on
- t._header_cached = []
- # all header names
- t._header_field_names = []
- t._header_format_order = getattr(t, "__byte_order__", ">")
- # all header formats including byte order
- header_fmt = [t._header_format_order]
- if hdrs is not None:
- # every header var will get two additional values set:
- # var_active = indicates if header is active
- # var_format = indicates the header format
- # logger.debug("loading meta for: %s, st: %s" % (clsname, st))
- for hdr in hdrs:
- shadowed_name = "_%s" % hdr[0]
- t._header_field_names.append(shadowed_name)
- setattr(t, shadowed_name + "_active", True)
- # remember header format
- # t._header_field_infos[shadowed_name] = [True, hdr[1]]
- is_field_type_simple = False
- is_field_static = True
- if hdr[1] is not None or (hdr[2] is None or type(hdr[2]) == bytes):
- # simple static or simple dynamic type
- # we got one of: ("name", format, ???) = static or
- # ("name", None, ???) = dynamic
- # -> Format given = static, Format None = dynamic
- is_field_type_simple = True
- if hdr[1] is None:
- # assume simple dynamic field
- is_field_static = False
- setattr(t, shadowed_name + "_format", hdr[1])
- if is_field_type_simple:
- fmt = hdr[1]
- if hdr[2] is not None:
- # value given: field is active
- if fmt is None:
- # dynamic field
- fmt = "%ds" % len(hdr[2])
- setattr(t, shadowed_name + "_format", fmt)
- header_fmt.append(fmt)
- t._header_cached.append(hdr[2])
- """
- if fmt is not None:
- header_fmt.append(fmt)
- t._header_cached.append(hdr[2])
- """
- # logger.debug("--------> field is active: %r" % hdr[0])
- else:
- setattr(t, shadowed_name + "_active", False)
- # only simple fields can get deactivated
- setattr(t, shadowed_name + "_active", True if hdr[2] is not None else False)
- # set initial value via shadowed variable: _varname <- varname [optional in subclass: <- varname_s]
- # setting/getting value is done via properties.
- # logger.debug("init simple type: %s=%r" % (shadowed_name, hdr[2]))
- setattr(t, shadowed_name, hdr[2])
- setattr(t, hdr[0], property(
- get_getter(hdr[0], is_field_type_simple=True),
- get_setter(hdr[0], is_field_type_simple=True, is_field_static=is_field_static)
- )
- )
- else:
- # assume TriggerList
- # Triggerlists don't have initial default values (and can't get deactivated) TODO?
- t._header_fields_dyn_dict[shadowed_name] = hdr[2]
- # initial value of TiggerLists is: values to init empty list
- setattr(t, shadowed_name, [b"", None])
- setattr(t, hdr[0], property(
- get_getter(hdr[0], is_field_type_simple=False),
- get_setter(hdr[0], is_field_type_simple=False, is_field_static=is_field_static)
- )
- )
- # format and value needed for correct length in _unpack()
- header_fmt.append("0s")
- t._header_cached.append(b"")
- # logger.debug("<<<<")
- # logger.debug(">>> translated header names: %s/%r" % (clsname, t._header_name_translate))
- # current format as string
- t._header_format = struct.Struct("".join(header_fmt))
- # header size can be assigened by __init__() directly or given by _header_format.size
- t._header_len = t._header_format.size
- # track changes to header format (changes to simple dynamic fields or TriggerList)
- t._header_format_changed = False
- # cached header, return this if nothing changed
- t._header_cached = t._header_format.pack(*t._header_cached)
- # logger.debug("formatstring is: %s" % header_fmt)
- # body as raw byte string (None if handler is present)
- t._body_bytes = b""
- # name of the attribute which holds the object representing the body aka the body handler
- t._bodytypename = None
- # next lower layer: a = b + c -> b will be lower layer for c
- t._lower_layer = None
- # track changes to header values: This is needed for layers like TCP for
- # checksum-recalculation. Set to "True" on changes to header/body values, set to False on "bin()"
- # track changes to header values
- t._header_changed = False
- # track changes to body value like [None | bytes | body-handler] -> [None | bytes | body-handler]
- t._body_changed = False
- # objects which get notified on changes on header or body (shared)
- # TODO: use sets here
- t._changelistener = []
- # lazy handler data: [name, class, bytes]
- t._lazy_handler_data = None
- # indicates the most top layer until which should be unpacked (vs. lazy dissecting = just next upper layer)
- t._target_unpack_clz = None
- # inicates if static header values got already unpacked
- # [True|False] = Status after dissect, None = pre-dissect (not unpacked)
- t._unpacked = None
- # indicates if this packet contains fragmented data saved as body bytes
- t._fragmented = False
- t._dissect_error = False
- return t
|