PcapAddressOperations.py 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256
  1. from random import choice
  2. from ID2TLib import Statistics
  3. from ID2TLib.IPv4 import IPAddress
  4. is_ipv4 = IPAddress.is_ipv4
  5. class PcapAddressOperations():
  6. def __init__(self, statistics: Statistics, uncertain_ip_mult: int=3):
  7. """
  8. Initializes a pcap information extractor that uses the provided statistics for its operations.
  9. :param statistics: The statistics of the pcap file
  10. :param uncertain_ip_mult: the mutliplier to create new address space when the remaining observed space has been drained
  11. """
  12. self.statistics = statistics
  13. self.UNCERTAIN_IPSPACE_MULTIPLIER = uncertain_ip_mult
  14. self._init_ipaddress_ops()
  15. def get_probable_router_mac(self):
  16. """
  17. Returns the most probable router MAC address based on the most used MAC address in the statistics.
  18. """
  19. self.probable_router_mac, count = self.statistics.process_db_query("most_used(macAddress)", print_results=False)[0]
  20. return self.probable_router_mac # and count as a measure of certainty?
  21. def pcap_contains_priv_ips(self):
  22. """
  23. Returns True if the provided traffic contains private IPs, otherwise False.
  24. """
  25. return self.contains_priv_ips
  26. def get_priv_address_range(self):
  27. """
  28. Returns a tuple with the start and end of the observed private IP range.
  29. """
  30. # better way to handle error?
  31. if not self.pcap_contains_priv_ips():
  32. print("Error: .pcap does not contain any private ips.")
  33. return -1, -1
  34. return str(self.min_priv_ip), str(self.max_priv_ip)
  35. def get_count_rem_priv_ips(self):
  36. """
  37. Returns the number of private IPs in the pcap file that have not aldready been returned by get_existing_priv_ips.
  38. """
  39. return len(self.remaining_priv_ips)
  40. def get_existing_priv_ips(self, count: int=1):
  41. """
  42. Returns the given number of private IPs that are existent in the pcap file.
  43. :param count: the number of IPs to return
  44. :return: the chosen private IPs
  45. """
  46. # reasonable to include this?
  47. if not self.pcap_contains_priv_ips():
  48. print("Warning: .pcap does not contain any private ips.")
  49. return []
  50. if count > len(self.remaining_priv_ips):
  51. print("Warning: There are no more {} private IPs in the .pcap file. Returning all remaining private IPs.".format(count))
  52. total = min(len(self.remaining_priv_ips), count)
  53. retr_priv_ips = []
  54. priv_ips = self.remaining_priv_ips
  55. for _ in range(0, total):
  56. random_priv_ip = choice(tuple(priv_ips))
  57. retr_priv_ips.append(str(random_priv_ip))
  58. priv_ips.remove(random_priv_ip)
  59. # if count == 1:
  60. # return retr_priv_ips[0]
  61. return retr_priv_ips
  62. # also use IPs below minimum observed IP?
  63. # offset for later, start at x after minimum? e.g. start at 192.168.0.100
  64. # exclude the last IP of an IP segment because its broadcast?
  65. def get_new_priv_ips(self, count: int=1):
  66. """
  67. Returns in the pcap not existent private IPs that match the used segment. IPs can be returned
  68. that are either between the minimum and maximum observed IP and are therefore considered certain
  69. or that are above the observed maximum address, are more likely to not belong to the network using the
  70. private IP segment and are therefore considered uncertain.
  71. :param count: the number of IPs to return
  72. :return: the newly created private IP addresses
  73. """
  74. if not self.pcap_contains_priv_ips():
  75. print("Error: .pcap does not contain any private ips.")
  76. return []
  77. unused_priv_ips = self.unused_priv_ips
  78. uncertain_priv_ips = self.uncertain_priv_ips
  79. # warning reasonable?
  80. if count > len(unused_priv_ips):
  81. print("Warning: there are no {0} unused certain priv IPs in the .pcap file.\n \
  82. Returning {1} certain and {2} uncertain priv IPs.".format(count, len(unused_priv_ips), count-len(unused_priv_ips)))
  83. count_certain = min(count, len(unused_priv_ips))
  84. retr_priv_ips = []
  85. for _ in range(0, count_certain):
  86. random_priv_ip = choice(tuple(unused_priv_ips))
  87. retr_priv_ips.append(str(random_priv_ip))
  88. unused_priv_ips.remove(random_priv_ip)
  89. # retrieve uncertain priv ips
  90. if count_certain < count:
  91. count_uncertain = count - count_certain
  92. # check if new uncertain IPs have to be created
  93. if len(uncertain_priv_ips) < count_uncertain:
  94. ipspace_multiplier = self.UNCERTAIN_IPSPACE_MULTIPLIER
  95. max_new_ip = self.max_uncertain_priv_ip.to_int() + ipspace_multiplier * count_uncertain
  96. # adjust IP space multiplier and prevent broadcast address of private segment from being chosen as new IP
  97. while max_new_ip >= self.priv_ip_segment.last_address().to_int():
  98. max_new_ip -= 1
  99. count_new_ips = max_new_ip - self.max_uncertain_priv_ip.to_int()
  100. if count_new_ips < count_uncertain:
  101. print("Error: Cannot generate enough new private IPs because they would exceed the maximum private segment IP. Returning {}.".format(count_new_ips))
  102. # create ipspace_multiplier * count_uncertain new uncertain IP addresses
  103. last_gen_ip = None
  104. for i in range(1, count_new_ips + 1):
  105. ip = IPAddress.from_int(self.max_uncertain_priv_ip.to_int() + i)
  106. # exclude the broadcast address
  107. if ip.to_int() >= self.priv_ip_segment.last_address().to_int():
  108. break
  109. uncertain_priv_ips.add(ip)
  110. last_gen_ip = ip
  111. self.max_uncertain_priv_ip = last_gen_ip
  112. # choose the uncertain IPs to return
  113. total_uncertain = min(count_uncertain, len(uncertain_priv_ips))
  114. for _ in range(0, total_uncertain):
  115. random_priv_ip = choice(tuple(uncertain_priv_ips))
  116. retr_priv_ips.append(str(random_priv_ip))
  117. uncertain_priv_ips.remove(random_priv_ip)
  118. # if count == 1:
  119. # return retr_priv_ips[0]
  120. return retr_priv_ips
  121. def get_existing_external_ips(self, count: int=1):
  122. """
  123. Returns the given number of private IPs that are existent in the pcap file.
  124. :param count: the number of IPs to return
  125. :return: the chosen private IPs
  126. """
  127. # reasonable to include this?
  128. if not (len(self.public_ips) > 0):
  129. print("Warning: .pcap does not contain any public ips.")
  130. return []
  131. if count > len(self.remaining_public_ips):
  132. print("Warning: There are no {} public IPs in the .pcap file. Returning all existing public IPs.".format(count))
  133. total = min(len(self.remaining_public_ips), count)
  134. retr_public_ips = []
  135. public_ips = self.remaining_public_ips
  136. for _ in range(0, total):
  137. random_public_ip = choice(tuple(public_ips))
  138. retr_public_ips.append(str(random_public_ip))
  139. public_ips.remove(random_public_ip)
  140. # if count == 1:
  141. # return retr_public_ips[0]
  142. return retr_public_ips
  143. def _init_ipaddress_ops(self):
  144. """
  145. Load and process data needed to perform functions on the IP addresses contained in the statistics
  146. """
  147. all_ips = self.statistics.process_db_query("all(ipAddress)", print_results=False)
  148. # Prepare private address operations and on the side save all external IPs
  149. # find the private IP segment in use
  150. public_ips = set()
  151. priv_ip_segment = None
  152. self.contains_priv_ips = False
  153. first_priv_ip = None
  154. first_priv_ip_idx = -1
  155. # for that iterate over all IPs until the first private IP is found
  156. for i, ip in enumerate(all_ips):
  157. if not is_ipv4(ip):
  158. continue
  159. ip = IPAddress.parse(ip)
  160. if ip.is_private():
  161. priv_ip_segment = ip.get_private_segment()
  162. first_priv_ip_idx = i
  163. first_priv_ip = ip
  164. self.contains_priv_ips = True
  165. break
  166. # new function in IPv4 to shorten this?
  167. elif (not ip.is_localhost()) and (not ip.is_localhost()) and (not ip.is_multicast()) and (not ip.is_reserved()) and (not ip.is_zero_conf()):
  168. public_ips.add(ip)
  169. if not self.contains_priv_ips:
  170. #print("The Pcap File does not contain any private IPs")
  171. return
  172. # get minimum and maximum seen private IP. The addresses in-bewteen are considered
  173. # as certain to be part of the network the pcap traffic is from
  174. min_priv_ip, max_priv_ip = first_priv_ip, first_priv_ip
  175. priv_ips = {first_priv_ip}
  176. for ip in all_ips[first_priv_ip_idx+1:]:
  177. if not is_ipv4(ip):
  178. continue
  179. ip = IPAddress.parse(ip)
  180. if ip in priv_ip_segment:
  181. priv_ips.add(ip)
  182. if ip > max_priv_ip:
  183. max_priv_ip = ip
  184. elif ip < min_priv_ip:
  185. min_priv_ip = ip
  186. # new function in IPv4 to shorten this?
  187. elif (not ip.is_private()) and (not ip.is_localhost()) and (not ip.is_localhost()) and (not ip.is_multicast()) and (not ip.is_reserved()) and (not ip.is_zero_conf()):
  188. public_ips.add(ip)
  189. # save the certain unused priv IPs of the network
  190. unused_priv_ips = set()
  191. for i in range (min_priv_ip.to_int() + 1, max_priv_ip.to_int()):
  192. ip = IPAddress.from_int(i)
  193. if not ip in priv_ips:
  194. unused_priv_ips.add(ip)
  195. # save the gathered information for efficient later use
  196. self.public_ips = frozenset(public_ips)
  197. self.remaining_public_ips = public_ips
  198. self.min_priv_ip, self.max_priv_ip = min_priv_ip, max_priv_ip
  199. self.max_uncertain_priv_ip = max_priv_ip
  200. self.priv_ips = frozenset(priv_ips)
  201. self.remaining_priv_ips = priv_ips
  202. self.unused_priv_ips = unused_priv_ips
  203. self.generated_uncertain_ips = set()
  204. self.uncertain_priv_ips = set()
  205. self.priv_ip_segment = priv_ip_segment