Node.py 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350
  1. from classes.Utilities import random_string, StructuredMessage, get_exponential_delay
  2. import math
  3. import numpy as np
  4. from classes.Packet import Packet
  5. from classes.Message import Message
  6. import random
  7. class Node(object):
  8. def __init__(self, env, conf, net=None, label=0, loggers=None, id=None):
  9. self.env = env
  10. self.conf = conf
  11. self.id = id or random_string(self.conf["misc"]["id_len"])
  12. self.net = net
  13. self.pkts_received = 0
  14. self.pkts_sent = 0
  15. self.avg_delay = 0.0 if self.conf["mixnodes"]["avg_delay"] == 0.0 else float(self.conf["mixnodes"]["avg_delay"])
  16. # State
  17. self.pool = {}
  18. self.inter_pkts = 0 #ctr which count how many new packets arrived since the last time a packet left
  19. self.probability_mass = None
  20. self.sender_estimates = None
  21. self.pool = {}
  22. self.mixlogging = False
  23. self.loggers = loggers if loggers else None
  24. (self.packet_logger, self.message_logger, self.entropy_logger) = self.loggers
  25. #State
  26. self.alive = True
  27. self.rate_sending = 1.0 / float(self.conf["clients"]["rate_sending"])
  28. self.rate_generating = float(self.conf["clients"][ "sim_add_buffer"]) # this specifies how often we put a real message into a buffer
  29. self.cover_traffic = self.conf["clients"]["cover_traffic"]
  30. self.cover_traffic_rate = 1.0 / float(self.conf["clients"]["cover_traffic_rate"])
  31. self.verbose = False
  32. self.pkt_buffer_out = []
  33. self.pkt_buffer_out_not_ack = {}
  34. self.label = label
  35. self.send_dummy_ACK = self.conf["clients"]["dummies_acks"]
  36. self.send_ACK = self.conf["clients"]["ACK"]
  37. self.num_received_packets = 0
  38. self.msg_buffer_in = {}
  39. self.start_logs = False
  40. self.batch_num = 0
  41. self.free_to_batch = True
  42. def start(self, dest):
  43. ''' Main client method; It sends packets out.
  44. It checks if there are any new packets in the outgoing buffer.
  45. If it finds any, it sends the first of them.
  46. If none are found, the client sends out a dummy
  47. packet (i.e., cover loop packet).
  48. '''
  49. delays = []
  50. while True:
  51. if self.alive:
  52. if delays == []:
  53. delays = list(np.random.exponential(self.rate_sending, 10000))
  54. delay = delays.pop()
  55. yield self.env.timeout(float(delay))
  56. if len(self.pkt_buffer_out) > 0: #If there is a packet to be send
  57. tmp_pkt = self.pkt_buffer_out.pop(0)
  58. self.send_packet(tmp_pkt)
  59. self.env.total_messages_sent += 1
  60. else:
  61. print("Dummy package being sent..")
  62. tmp_pkt = Packet.dummy(conf=self.conf, net=self.net, dest=self, sender=self) # sender_estimates[sender.label] = 1.0
  63. tmp_pkt.time_queued = self.env.now
  64. self.send_packet(tmp_pkt)
  65. self.env.total_messages_sent += 1
  66. else:
  67. break
  68. def start_no_dummy(self):
  69. delays = []
  70. while True:
  71. if self.alive:
  72. if delays == []:
  73. delays = list(np.random.exponential(self.rate_sending, 10000))
  74. delay = delays.pop()
  75. yield self.env.timeout(float(delay))
  76. if len(self.pkt_buffer_out) > 0: #If there is a packet to be sent
  77. tmp_pkt = self.pkt_buffer_out.pop(0)
  78. self.send_packet(tmp_pkt)
  79. self.env.total_messages_sent += 1
  80. else:
  81. break
  82. def start_loop_cover_traffc(self):
  83. ''' Function responsible for managing the independent Poisson stream
  84. of loop cover traffic.
  85. '''
  86. if self.cover_traffic:
  87. delays = []
  88. while True:
  89. if self.alive:
  90. if delays == []:
  91. delays = list(np.random.exponential(self.cover_traffic_rate, 10000))
  92. delay = delays.pop()
  93. yield self.env.timeout(float(delay))
  94. cover_loop_packet = Packet.dummy(conf=self.conf, net = self.net, dest=self, sender=self)
  95. cover_loop_packet.time_queued = self.env.now
  96. self.send_packet(cover_loop_packet)
  97. self.env.total_messages_sent += 1
  98. else:
  99. break
  100. else:
  101. pass
  102. def send_packet(self, packet):
  103. ''' Methods sends a packet into the network,
  104. and logs information about the sending.
  105. Keyword arguments:
  106. packet - an object of class Packet which is sent into the network.
  107. '''
  108. packet.time_sent = self.env.now
  109. #print(packet.time_sent)
  110. packet.current_node = -1 # If it's a retransmission this needs to be reset
  111. packet.times_transmitted += 1
  112. if packet.type == "REAL" and packet.message.time_sent is None:
  113. packet.message.time_sent = packet.time_sent
  114. self.env.process(self.net.forward_packet(packet))
  115. def process_batch_round(self):
  116. ''' Additional function if we want to simulate a batching technique.
  117. '''
  118. self.batch_num += 1
  119. batch = list(self.pool.keys())[:int(self.conf["mixnodes"]["batch_size"])]
  120. random.shuffle(batch)
  121. for pktid in batch:
  122. if pktid in self.pool.keys():
  123. pkt = self.pool[pktid]
  124. yield self.env.timeout(0.000386) # add some delay for packet cryptographinc processing
  125. self.forward_packet(pkt)
  126. self.free_to_batch = True
  127. return
  128. yield
  129. def process_packet(self, packet):
  130. ''' Function performs processing of the given packet and logs information
  131. about it and forwards it to the next destionation.
  132. While processing the packet, the function also calculates the probability
  133. that the given packet comes from a particular sender (target sender).
  134. Keyword arguments:
  135. packet - the packet which should be processed.
  136. '''
  137. # Check if this is the desired destination
  138. if self.id == packet.dest.id:
  139. self.env.process(self.process_received_packet(packet))
  140. else:
  141. self.pkts_received += 1
  142. self.add_pkt_in_pool(packet)
  143. if (self.net.type == "cascade" or self.net.type == "multi_cascade") and self.conf["mixnodes"]["batch"] == True:
  144. if len(self.pool) >= int(self.conf["mixnodes"]["batch_size"]) and self.free_to_batch == True:
  145. self.free_to_batch = False
  146. self.env.process(self.process_batch_round())
  147. else:
  148. delay = get_exponential_delay(self.avg_delay) if self.avg_delay != 0.0 else 0.0
  149. wait = delay + 0.000386 # add the time of processing the Sphinx packet (benchmarked using our Sphinx rust implementation).
  150. yield self.env.timeout(wait)
  151. if not packet.dropped: # It may get dropped if pool gets full, while waiting
  152. self.forward_packet(packet)
  153. else:
  154. pass
  155. def process_received_packet(self, packet):
  156. ''' 1. Processes the received packets and logs informatiomn about them.
  157. 2. If enabled, it sends an ACK packet to the sender.
  158. 3. Checks whether all the packets of particular message were received and logs the information about the reconstructed message.
  159. Keyword arguments:
  160. packet - the received packet.
  161. '''
  162. packet.time_delivered = self.env.now
  163. self.env.total_messages_received += 1
  164. # print(self.env.message_ctr)
  165. if packet.type == "REAL":
  166. self.num_received_packets += 1
  167. msg = packet.message
  168. if not msg.complete_receiving:
  169. msg.register_received_pkt(packet)
  170. self.msg_buffer_in[msg.id] = msg
  171. if self.conf["logging"]["enabled"] and self.packet_logger is not None and self.start_logs:
  172. self.packet_logger.info(StructuredMessage(metadata=("RCV_PKT_REAL", self.env.now, self.id, packet.id, packet.type, packet.msg_id, packet.time_queued, packet.time_sent, packet.time_delivered, packet.fragments, packet.sender_estimates[0], packet.sender_estimates[1], packet.sender_estimates[2], packet.real_sender.label, packet.route, packet.pool_logs)))
  173. if msg.complete_receiving:
  174. msg_transit_time = (msg.time_delivered - msg.time_sent)
  175. if self.conf["logging"]["enabled"] and self.message_logger is not None and self.start_logs:
  176. self.message_logger.info(StructuredMessage(metadata=("RCV_MSG", self.env.now, self.id, msg.id, len(msg.pkts), msg.time_queued, msg.time_sent, msg.time_delivered, msg_transit_time, len(msg.payload), msg.real_sender.label, msg.real_sender.id)))
  177. self.env.message_ctr -= 1
  178. # this part is used to stop the simulator at a time when all sent packets got delivered!
  179. if self.env.finished == True and self.env.message_ctr <= 0:
  180. #print('> The stop simulation condition happend.')
  181. #self.env.stop_sim_event.succeed()
  182. if not self.env.phase_transitions[0]:
  183. print("> Phase 1 condition happened")
  184. self.env.phase_transitions[0] = True
  185. self.env.stop_first_phase.succeed()
  186. else:
  187. print("> Phase 2 condition happened")
  188. self.env.phase_transitions[1] = True
  189. self.env.stop_sim_event.succeed()
  190. elif packet.type == "DUMMY":
  191. pass
  192. else:
  193. raise Exception("Packet type not recognised")
  194. return
  195. yield # self.env.timeout(0.0)
  196. def forward_packet(self, packet):
  197. if not self.probability_mass is None:
  198. packet.probability_mass = self.probability_mass.copy()
  199. if not self.sender_estimates is None:
  200. packet.sender_estimates = self.sender_estimates.copy()
  201. #If it has been dropped in the meantime, we just skip sending it.
  202. try:
  203. self.pool.pop(packet.id)
  204. except Exception as e:
  205. pass
  206. # If the pool dries out, then we start measurments from scratch
  207. if len(self.pool) == 0:
  208. self.sender_estimates = None
  209. self.probability_mass = None
  210. self.pkts_sent += 1
  211. # If this is the last mixnode, update the entropy taking into account probabilities
  212. # of packets leaving the network
  213. if self.id == packet.route[-2].id and self.mixlogging:
  214. self.update_entropy(packet)
  215. self.env.process(self.net.forward_packet(packet))
  216. def update_entropy(self, packet):
  217. for i, pr in enumerate(packet.probability_mass):
  218. if pr != 0.0:
  219. self.env.entropy[i] += -(float(pr) * math.log(float(pr), 2))
  220. def add_pkt_in_pool(self, packet):
  221. ''' Method adds incoming packet in mixnode pool and updates the vector
  222. of estimated probabilities, taking into account the new state of the pool.
  223. Keyword arguments:
  224. packet - the packet for which we update the probabilities vector
  225. '''
  226. self.inter_pkts += 1
  227. if self.probability_mass is None and self.sender_estimates is None:
  228. self.pool[packet.id] = packet
  229. self.probability_mass = packet.probability_mass.copy()
  230. self.sender_estimates = packet.sender_estimates.copy()
  231. else:
  232. dist_pm = self.probability_mass * len(self.pool) + packet.probability_mass
  233. dist_se = self.sender_estimates * len(self.pool) + packet.sender_estimates
  234. self.pool[packet.id] = packet # Add Packet in Pool
  235. dist_pm = dist_pm / float(len(self.pool))
  236. dist_se = dist_se / float(len(self.pool))
  237. self.probability_mass = dist_pm.copy()
  238. self.sender_estimates = dist_se.copy()
  239. def set_start_logs(self, time=0.0):
  240. yield self.env.timeout(time)
  241. self.start_logs = True
  242. if self.verbose:
  243. print("> Logs set on for Client %s." % self.id)
  244. def simulate_adding_packets_into_buffer(self, dest):
  245. # This function is used in the test mode
  246. ''' This method generates the actual 'real' messages for which we compute the entropy.
  247. The rate at which we generate this traffic is defined by rate_generating variable in
  248. the config file.
  249. Keyword arguments:
  250. dest - the destination of the message.
  251. '''
  252. i = 0
  253. # Note: if you want to send messages or larger size than a single packet this function must be updated
  254. while i < self.conf["misc"]["num_target_packets"]:
  255. yield self.env.timeout(float(self.rate_generating))
  256. msg = Message.random(conf=self.conf, net=self.net, sender=self, dest=dest) # New Message
  257. current_time = self.env.now
  258. msg.time_queued = current_time # The time when the message was created and placed into the queue
  259. for pkt in msg.pkts:
  260. pkt.time_queued = current_time
  261. pkt.probability_mass[i] = 1.0
  262. self.add_to_buffer(msg.pkts)
  263. self.env.message_ctr += 1
  264. i += 1
  265. self.env.finished = True
  266. def terminate(self, delay=0.0):
  267. ''' Function changes user's alive status to False after a particular delay
  268. Keyword argument:
  269. delayd (float) - time after the alice status should be switched to False.
  270. '''
  271. yield self.env.timeout(delay)
  272. self.alive = False
  273. print("Node %s terminated at time %s ." % (self.id, self.env.now))
  274. def add_to_buffer(self, packets):
  275. for pkt in packets:
  276. tmp_now = self.env.now
  277. pkt.time_queued = tmp_now
  278. self.pkt_buffer_out.append(pkt)
  279. def __repr__(self):
  280. return self.id