radiotap.py 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191
  1. """Radiotap"""
  2. from pypacker import pypacker, triggerlist
  3. import struct
  4. import logging
  logger = logging.getLogger("pypacker")

  # handler id under which the encapsulated IEEE 802.11 layer is looked up
  RTAP_TYPE_80211 = 0

  # Ref: http://www.radiotap.org
  # Fields Ref: http://www.radiotap.org/defined-fields/all

  # Presence masks, listed in order of appearance. The constants are written
  # for the 32 bit present word interpreted as big endian: the first byte of
  # the word ends up in the most significant byte of each value.

  # first byte of the present word
  TSFT_MASK = 0x01000000
  FLAGS_MASK = 0x02000000
  RATE_MASK = 0x04000000
  CHANNEL_MASK = 0x08000000
  FHSS_MASK = 0x10000000
  DB_ANT_SIG_MASK = 0x20000000
  DB_ANT_NOISE_MASK = 0x40000000
  LOCK_QUAL_MASK = 0x80000000

  # second byte of the present word
  TX_ATTN_MASK = 0x00010000
  DB_TX_ATTN_MASK = 0x00020000
  DBM_TX_POWER_MASK = 0x00040000
  ANTENNA_MASK = 0x00080000
  ANT_SIG_MASK = 0x00100000
  ANT_NOISE_MASK = 0x00200000
  RX_FLAGS_MASK = 0x00400000

  # third byte of the present word
  CHANNELPLUS_MASK = 0x00000400
  HT_MASK = 0x00000800
  AMPDU_MASK = 0x00001000
  VHT_MASK = 0x00002000

  # 7 bits reserved

  # fourth byte of the present word: namespace / extension markers
  RT_NS_NEXT_MASK = 0x00000020
  VENDOR_NS_NEXT = 0x00000040
  EXT_MASK = 0x00000080
  # Lookup table: presence mask -> (length in bytes, required alignment).
  # CHANNELPLUS_MASK, RT_NS_NEXT_MASK, VENDOR_NS_NEXT and EXT_MASK have no
  # entry here.
  RADIO_FIELDS = {
      TSFT_MASK: (8, 8),
      FLAGS_MASK: (1, 1),
      RATE_MASK: (1, 1),
      CHANNEL_MASK: (4, 2),  # channel + flags
      FHSS_MASK: (2, 1),  # fhss + pattern
      DB_ANT_SIG_MASK: (1, 1),
      DB_ANT_NOISE_MASK: (1, 1),
      LOCK_QUAL_MASK: (2, 2),
      TX_ATTN_MASK: (2, 2),
      DB_TX_ATTN_MASK: (2, 2),
      DBM_TX_POWER_MASK: (1, 1),
      ANTENNA_MASK: (1, 1),
      ANT_SIG_MASK: (1, 1),
      ANT_NOISE_MASK: (1, 1),
      RX_FLAGS_MASK: (2, 2),
      HT_MASK: (3, 1),
      AMPDU_MASK: (8, 4),
      VHT_MASK: (12, 2),
  }
  # Presence masks in the order their fields appear behind the fixed header.
  RADIO_FIELDS_MASKS = [
      TSFT_MASK, FLAGS_MASK, RATE_MASK,
      CHANNEL_MASK,  # channel + flags
      FHSS_MASK,  # fhss + pattern
      DB_ANT_SIG_MASK, DB_ANT_NOISE_MASK, LOCK_QUAL_MASK,
      TX_ATTN_MASK, DB_TX_ATTN_MASK, DBM_TX_POWER_MASK,
      ANTENNA_MASK, ANT_SIG_MASK, ANT_NOISE_MASK,
      RX_FLAGS_MASK, HT_MASK, AMPDU_MASK, VHT_MASK,
  ]
  class FlagTriggerList(triggerlist.TriggerList):
      """TriggerList holding (XXX_MASK, value) tuples of the radiotap fields."""
      # no __init__ needed: we just add tuples

      def _pack(self):
          """
          return -- the values (second element) of all stored tuples, concatenated
          """
          return b"".join(entry[1] for entry in self)
  def get_channelinfo(channel_bytes):
      """
      Decode the first four bytes as two little endian unsigned 16 bit values.

      channel_bytes -- at least 4 bytes: channel followed by channel flags
      return -- [channel_mhz, channel_flags]
      """
      mhz, chan_flags = struct.unpack("<HH", channel_bytes[0:4])
      return [mhz, chan_flags]
  class Radiotap(pypacker.Packet):
      """
      Radiotap header: version, pad, len and present_flags, followed by the
      fields announced in present_flags. A trailing frame check sequence is
      kept apart from the header in `fcs` and re-appended by bin().
      """
      __hdr__ = (
          ("version", "B", 0),
          ("pad", "B", 0),
          # NOTE(review): 0x0800 is presumably a header length of 8 in
          # swapped byte order -- confirm against pypacker's default byte order
          ("len", "H", 0x0800),
          ("present_flags", "I", 0),
          ("flags", None, FlagTriggerList)  # stores: (XXX_MASK, value)
      )

      # handle frame check sequence
      def __get_fcs(self):
          """return -- the stored FCS bytes, b"" if none has been set"""
          try:
              return self._fcs
          except AttributeError:
              return b""

      def __set_fcs(self, fcs):
          """fcs -- bytes appended behind the packet by bin()"""
          self._fcs = fcs

      fcs = property(__get_fcs, __set_fcs)

      def _dissect(self, buf):
          """
          Collect the fields announced by the present word into `flags` and
          detect a trailing frame check sequence.

          buf -- raw bytes of the packet, beginning with the version byte
          """
          # read big endian so the word compares against the XXX_MASK constants
          present = struct.unpack(">I", buf[4:8])[0]
          fcs_present = False
          # fields start right behind version, pad, len and present_flags
          # (1 + 1 + 2 + 4 bytes)
          off = 8
          # NOTE(review): assumes the `flags` TriggerList declared in __hdr__ is
          # reachable as self.flags while dissecting -- confirm for the
          # pypacker version in use
          fields = self.flags

          # only masks listed in RADIO_FIELDS_MASKS are handled; a set bit
          # without an entry there is skipped and not accounted for in `off`
          for mask in RADIO_FIELDS_MASKS:
              if present & mask == 0:
                  continue

              size, align = RADIO_FIELDS[mask]
              # check alignment
              mod = off % align
              if mod != 0:
                  # enlarge size by alignment: the padding stays part of the
                  # stored value
                  size += align - mod

              value = buf[off: off + size]

              # FCS present? (bit 0x10 of the one byte flags field)
              if mask == FLAGS_MASK and struct.unpack(">B", value)[0] & 0x10 != 0:
                  logger.debug("fcs found")
                  fcs_present = True

              fields.append((mask, value))
              off += size

          pos_end = len(buf)

          if fcs_present:
              # the last 4 bytes are the FCS, not part of the payload
              self._fcs = buf[-4:]
              pos_end = -4
          # NOTE(review): the visible listing stops here. pos_end is
          # presumably the end index of the payload handed to the upper
          # layer; that hand-off and the return value expected by
          # pypacker.Packet are not shown -- TODO confirm and complete.

      def bin(self, update_auto_fields=True):
          """
          Custom bin(): handle FCS.

          update_auto_fields -- passed through to pypacker.Packet.bin()
          return -- the bytes of pypacker.Packet.bin() followed by the FCS
          """
          return pypacker.Packet.bin(self, update_auto_fields=update_auto_fields) + self.fcs
  # load handler: register IEEE80211 for Radiotap under RTAP_TYPE_80211
  from pypacker.layer12 import ieee80211

  pypacker.Packet.load_handler(Radiotap, {RTAP_TYPE_80211: ieee80211.IEEE80211})
  147. )