PcapAddressOperations.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264
  1. from random import choice
  2. from Core import Statistics
  3. from ID2TLib.IPv4 import IPAddress
  4. is_ipv4 = IPAddress.is_ipv4
  5. class PcapAddressOperations():
  6. def __init__(self, statistics: Statistics, uncertain_ip_mult: int=3):
  7. """
  8. Initializes a pcap information extractor that uses the provided statistics for its operations.
  9. :param statistics: The statistics of the pcap file
  10. :param uncertain_ip_mult: the mutliplier to create new address space when the remaining observed space has been drained
  11. """
  12. self.statistics = statistics
  13. self.UNCERTAIN_IPSPACE_MULTIPLIER = uncertain_ip_mult
  14. stat_result = self.statistics.process_db_query("most_used(macAddress)", print_results=False)
  15. if isinstance(stat_result, list):
  16. self.probable_router_mac = choice(stat_result)
  17. else:
  18. self.probable_router_mac = stat_result
  19. self._init_ipaddress_ops()
  20. def get_probable_router_mac(self):
  21. """
  22. Returns the most probable router MAC address based on the most used MAC address in the statistics.
  23. :return: the MAC address
  24. """
  25. return self.probable_router_mac
  26. def pcap_contains_priv_ips(self):
  27. """
  28. Returns if the provided traffic contains private IPs.
  29. :return: True if the provided traffic contains private IPs, otherwise False
  30. """
  31. return self.contains_priv_ips
  32. def get_local_address_range(self):
  33. """
  34. Returns a tuple with the start and end of the observed local IP range.
  35. :return: The IP range as tuple
  36. """
  37. return str(self.min_local_ip), str(self.max_local_ip)
  38. def get_count_rem_local_ips(self):
  39. """
  40. Returns the number of local IPs in the pcap file that have not aldready been returned by get_existing_local_ips.
  41. :return: the not yet assigned local IPs
  42. """
  43. return len(self.remaining_local_ips)
  44. def get_existing_local_ips(self, count: int=1):
  45. """
  46. Returns the given number of local IPs that are existent in the pcap file.
  47. :param count: the number of local IPs to return
  48. :return: the chosen local IPs
  49. """
  50. if count > len(self.remaining_local_ips):
  51. print("Warning: There are no more {} local IPs in the .pcap file. Returning all remaining local IPs.".format(count))
  52. total = min(len(self.remaining_local_ips), count)
  53. retr_local_ips = []
  54. local_ips = self.remaining_local_ips
  55. for _ in range(0, total):
  56. random_local_ip = choice(sorted(local_ips))
  57. retr_local_ips.append(str(random_local_ip))
  58. local_ips.remove(random_local_ip)
  59. return retr_local_ips
  60. def get_new_local_ips(self, count: int=1):
  61. """
  62. Returns in the pcap not existent local IPs that are in proximity of the observed local IPs. IPs can be returned
  63. that are either between the minimum and maximum observed IP and are therefore considered certain
  64. or that are above the observed maximum address, are more likely to not belong to the local network
  65. and are therefore considered uncertain.
  66. :param count: the number of new local IPs to return
  67. :return: the newly created local IP addresses
  68. """
  69. # add more unused local ips to the pool, if needed
  70. while len(self.unused_local_ips) < count and self.expand_unused_local_ips() == True:
  71. pass
  72. unused_local_ips = self.unused_local_ips
  73. uncertain_local_ips = self.uncertain_local_ips
  74. count_certain = min(count, len(unused_local_ips))
  75. retr_local_ips = []
  76. for _ in range(0, count_certain):
  77. random_local_ip = choice(sorted(unused_local_ips))
  78. retr_local_ips.append(str(random_local_ip))
  79. unused_local_ips.remove(random_local_ip)
  80. # retrieve uncertain local ips
  81. if count_certain < count:
  82. count_uncertain = count - count_certain
  83. # check if new uncertain IPs have to be created
  84. if len(uncertain_local_ips) < count_uncertain:
  85. ipspace_multiplier = self.UNCERTAIN_IPSPACE_MULTIPLIER
  86. max_new_ip = self.max_uncertain_local_ip.to_int() + ipspace_multiplier * count_uncertain
  87. count_new_ips = max_new_ip - self.max_uncertain_local_ip.to_int()
  88. # create ipspace_multiplier * count_uncertain new uncertain local IP addresses
  89. last_gen_ip = None
  90. for i in range(1, count_new_ips + 1):
  91. ip = IPAddress.from_int(self.max_uncertain_local_ip.to_int() + i)
  92. # exclude the definite broadcast address
  93. if self.priv_ip_segment:
  94. if ip.to_int() >= self.priv_ip_segment.last_address().to_int():
  95. break
  96. uncertain_local_ips.add(ip)
  97. last_gen_ip = ip
  98. self.max_uncertain_local_ip = last_gen_ip
  99. # choose the uncertain IPs to return
  100. total_uncertain = min(count_uncertain, len(uncertain_local_ips))
  101. for _ in range(0, total_uncertain):
  102. random_local_ip = choice(sorted(uncertain_local_ips))
  103. retr_local_ips.append(str(random_local_ip))
  104. uncertain_local_ips.remove(random_local_ip)
  105. return retr_local_ips
  106. def get_existing_external_ips(self, count: int=1):
  107. """
  108. Returns the given number of external IPs that are existent in the pcap file.
  109. :param count: the number of external IPs to return
  110. :return: the chosen external IPs
  111. """
  112. if not (len(self.external_ips) > 0):
  113. print("Warning: .pcap does not contain any external ips.")
  114. return []
  115. total = min(len(self.remaining_external_ips), count)
  116. retr_external_ips = []
  117. external_ips = self.remaining_external_ips
  118. for _ in range(0, total):
  119. random_external_ip = choice(sorted(external_ips))
  120. retr_external_ips.append(str(random_external_ip))
  121. external_ips.remove(random_external_ip)
  122. return retr_external_ips
  123. def _init_ipaddress_ops(self):
  124. """
  125. Load and process data needed to perform functions on the IP addresses contained in the statistics
  126. """
  127. # retrieve local and external IPs
  128. all_ips_str = set(self.statistics.process_db_query("all(ipAddress)", print_results=False))
  129. # external_ips_str = set(self.statistics.process_db_query("ipAddress(macAddress=%s)" % self.get_probable_router_mac(), print_results=False)) # including router
  130. # local_ips_str = all_ips_str - external_ips_str
  131. external_ips = set()
  132. local_ips = set()
  133. all_ips = set()
  134. self.contains_priv_ips = False
  135. self.priv_ip_segment = None
  136. # convert IP strings to IPv4.IPAddress representation
  137. for ip in all_ips_str:
  138. if is_ipv4(ip):
  139. ip = IPAddress.parse(ip)
  140. # exclude local broadcast address and other special addresses
  141. if (not str(ip) == "255.255.255.255") and (not ip.is_localhost()) and (not ip.is_multicast()) and (
  142. not ip.is_reserved()) and (not ip.is_zero_conf()):
  143. all_ips.add(ip)
  144. for ip in all_ips:
  145. if ip.is_private():
  146. local_ips.add(ip)
  147. external_ips = all_ips - local_ips
  148. # save the certain unused local IPs of the network
  149. # to do that, divide the unused local Addressspace into chunks of (chunks_size) Addresses
  150. # initally only the first chunk will be used, but more chunks can be added to the pool of unused_local_ips if needed
  151. self.min_local_ip, self.max_local_ip = min(local_ips), max(local_ips)
  152. local_ip_range = (self.max_local_ip.to_int()) - (self.min_local_ip.to_int() + 1)
  153. if local_ip_range < 0:
  154. # for min,max pairs like (1,1), (1,2) there is no free address in between, but for (1,1) local_ip_range may be -1, because 1-(1+1)=-1
  155. local_ip_range = 0
  156. # chunk size can be adjusted if needed
  157. self.chunk_size = 200
  158. self.current_chunk = 1
  159. if local_ip_range < self.chunk_size:
  160. # there are not more than chunk_size unused IP Addresses to begin with
  161. self.chunks = 0
  162. self.chunk_remainder = local_ip_range
  163. else:
  164. # determine how many chunks of (chunk_size) Addresses there are and the save the remainder
  165. self.chunks = local_ip_range // self.chunk_size
  166. self.chunk_remainder = local_ip_range % self.chunk_size
  167. # add the first chunk of IP Addresses
  168. self.unused_local_ips = set()
  169. self.expand_unused_local_ips()
  170. # save the gathered information for efficient later use
  171. self.external_ips = frozenset(external_ips)
  172. self.remaining_external_ips = external_ips
  173. self.max_uncertain_local_ip = self.max_local_ip
  174. self.local_ips = frozenset(local_ips)
  175. # print("External IPS: " + str(external_ips))
  176. # print("LOCAL IPS: " + str(local_ips))
  177. self.remaining_local_ips = local_ips
  178. self.uncertain_local_ips = set()
  179. def expand_unused_local_ips(self):
  180. """
  181. expands the set of unused_local_ips by one chunk_size
  182. to illustrate this algorithm: suppose we have a chunksize of 100 and an Address space of 1 to 1000 (1 and 1000 are unused too), we then have 10 chunks
  183. every time this method is called, one chunk (100 Addresses) is added, each chunk starts at the base_address + the number of its chunk
  184. then, every chunk_amounth'th Address is added. Therefore for 10 chunks, every 10th address is added
  185. For the above example for the first, second and last call, we get the following IPs, respectively:
  186. first Call: 1+0, 1+10, 1+20, 1+30, ..., 1+990
  187. second Call: 2+0, 2+10, 2+20, 2+30, ..., 2+990
  188. ten'th Call: 10+0, 10+10, 10+20, 10+30, ..., 10+990
  189. :return: False if there are no more available unusd local IP Addresses, True otherwise
  190. """
  191. if self.current_chunk == self.chunks+1:
  192. # all chunks are used up, therefore add the remainder
  193. remainder_base_addr = self.min_local_ip.to_int() + self.chunks*self.chunk_size + 1
  194. for i in range(0,self.chunk_remainder):
  195. ip = IPAddress.from_int(remainder_base_addr + i)
  196. self.unused_local_ips.add(ip)
  197. self.current_chunk = self.current_chunk + 1
  198. return True
  199. elif self.current_chunk <= self.chunks:
  200. # add another chunk
  201. # choose IPs from the whole address space, that is available
  202. base_address = self.min_local_ip.to_int() + self.current_chunk
  203. for i in range(0,self.chunk_size):
  204. ip = IPAddress.from_int(base_address + i*self.chunks)
  205. self.unused_local_ips.add(ip)
  206. self.current_chunk = self.current_chunk + 1
  207. return True
  208. else:
  209. # no free IPs remaining
  210. return False