MQTT_protocol.java 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647
  1. package de.tu_darmstadt.tk.SmartHomeNetworkSim.core.protocols;
  2. import java.util.Arrays;
  3. import java.util.Collection;
  4. import java.util.HashMap;
  5. import java.util.Iterator;
  6. import java.util.LinkedList;
  7. import java.util.Random;
  8. import de.tu_darmstadt.tk.SmartHomeNetworkSim.core.Packet;
  9. import de.tu_darmstadt.tk.SmartHomeNetworkSim.core.Port;
  10. import de.tu_darmstadt.tk.SmartHomeNetworkSim.core.Protocol;
  11. import de.tu_darmstadt.tk.SmartHomeNetworkSim.core.SimulationManager;
  12. import de.tu_darmstadt.tk.SmartHomeNetworkSim.core.SmartDevice;
  13. import de.tu_darmstadt.tk.SmartHomeNetworkSim.core.devices.BoolCollector;
  14. import de.tu_darmstadt.tk.SmartHomeNetworkSim.core.devices.BoolSensor;
  15. import de.tu_darmstadt.tk.SmartHomeNetworkSim.core.devices.FloatCollector;
  16. import de.tu_darmstadt.tk.SmartHomeNetworkSim.core.devices.FloatSensor;
  17. import de.tu_darmstadt.tk.SmartHomeNetworkSim.core.devices.SensorLabel;
  18. import de.tu_darmstadt.tk.SmartHomeNetworkSim.core.protocols.packets.MQTT_packet;
  19. import de.tu_darmstadt.tk.SmartHomeNetworkSim.core.protocols.packets.MQTTpublishPacket;
  20. import de.tu_darmstadt.tk.SmartHomeNetworkSim.core.scheduler.AbstractEvent;
  21. import de.tu_darmstadt.tk.SmartHomeNetworkSim.core.util.Pair;
  22. /**
  23. * Implementation of the MQTT Protocol to generate packets for the simulation
  24. *
  25. *
  26. * @author Andreas T. Meyer-Berg
  27. */
  28. public class MQTT_protocol implements Protocol {
  29. /**
  30. * Broker which collects and distributes messages
  31. */
  32. private Port broker;
  33. /**
  34. * Publishers like sensors, which publish data
  35. */
  36. private LinkedList<Port> pubs;
  37. /**
  38. * Subscriber which subscribe to different Topics
  39. */
  40. private LinkedList<Port> subs;
  41. /**
  42. * Topics subscribed by each Port
  43. */
  44. private HashMap<Port, LinkedList<String>> subbedTopics = new HashMap<Port, LinkedList<String>>();
  45. /**
  46. * Devices that are Publisher and Subscriber and therefore send and receive
  47. * messages
  48. */
  49. private LinkedList<Port> pubSubs;
  50. /**
  51. * Packets which are currently being generated or were generated after the
  52. * last generatePackets Call
  53. */
  54. private LinkedList<Packet> currentPackets = new LinkedList<Packet>();
  55. // For Visualization in case of deleting the broker
  56. private LinkedList<Pair<Port, Port>> deletedConnectionLinks = new LinkedList<Pair<Port, Port>>();
  57. private SmartDevice deletedBroker = new SmartDevice("DeletedBroker");
  58. private Port deletedBrokerPort = new Port(deletedBroker, (short) -1);
  59. /**
  60. * Topics of the MQTT Broker
  61. */
  62. // private LinkedList<String> topics = new LinkedList<String>();
  63. /**
  64. * Creates a new MQTT Protocol
  65. */
  66. public MQTT_protocol() {
  67. this.broker = null;
  68. initialize();
  69. }
  70. /**
  71. * Creates a new MQTT Protocol
  72. *
  73. * @param broker
  74. * broker of the protocol
  75. */
  76. public MQTT_protocol(Port broker) {
  77. this.broker = broker;
  78. initialize();
  79. }
  80. /**
  81. * Initializes the different fields
  82. */
  83. private void initialize() {
  84. subs = new LinkedList<Port>();
  85. pubs = new LinkedList<Port>();
  86. pubSubs = new LinkedList<Port>();
  87. }
  88. @Override
  89. public Collection<Packet> generateNextPackets(Port port, long timestep, boolean packetLost) {
  90. /**
  91. * Packets which will be generated
  92. */
  93. LinkedList<Packet> returnPackets = new LinkedList<Packet>();
  94. // Update all Timestamps of previous created Packets
  95. for (Packet p : currentPackets) {
  96. p.setTimestamp(p.getTimestamp() + timestep);
  97. }
  98. // Add these packets to the return
  99. returnPackets.addAll(currentPackets);
  100. // remove packets from the old list
  101. currentPackets.clear();
  102. // Clear deleted connections
  103. deletedConnectionLinks.clear();
  104. // Return termination packets
  105. if (port == null)
  106. return returnPackets;
  107. SmartDevice device = port.getOwner();
  108. if (device == null)
  109. return returnPackets;
  110. /**
  111. * Update the lastTime the port was triggered
  112. */
  113. port.setLastTrigger(timestep);
  114. /**
  115. * if port null, skip this step
  116. */
  117. if (broker == null)
  118. return returnPackets;
  119. /**
  120. * Generate new Packets regarding to their class
  121. */
  122. if (port == broker) {
  123. // Broker should not send new packages, without new messages
  124. // But could perform some Ping request to random participants
  125. Collection<Port> devices = getDevices();
  126. devices.remove(port);
  127. Port dest = null;
  128. Iterator<Port> it = devices.iterator();
  129. for (int i = 0; i < (Math.random() * devices.size()) && it.hasNext(); i++) {
  130. dest = it.next();
  131. }
  132. if (dest != null) {
  133. returnPackets.add(new MQTT_packet(MQTT_packet.PINGREQ, timestep, port, dest));
  134. /**
  135. * Delay to the destination
  136. */
  137. long delayBrokerDest = broker.getTransmissionDelayTo(dest);
  138. if (dest.getStatus() != Port.CLOSED && delayBrokerDest != Long.MAX_VALUE)
  139. returnPackets.add(new MQTT_packet(MQTT_packet.PINGRESP,
  140. timestep + delayBrokerDest + dest.getJitter() + dest.getResponseTime(), dest, port));
  141. }
  142. }
  143. if (subs.contains(port) || pubSubs.contains(port)) {
  144. // Subs can either subscribe to topics or unsubscribe
  145. /**
  146. * Topics, the SmartDevice is subscribed to
  147. */
  148. LinkedList<String> tops = subbedTopics.get(port);
  149. /**
  150. * Topic which should be subscribed to
  151. */
  152. String newTopic = null;
  153. /**
  154. * Check if FloatCollector & not subscribed
  155. */
  156. if (device instanceof FloatCollector) {
  157. FloatCollector fCollector = (FloatCollector) device;
  158. if (!tops.contains(fCollector.getFCinfoName()))
  159. newTopic = fCollector.getFCinfoName();
  160. }
  161. /**
  162. * Check if BoolCollector & not subscribed
  163. */
  164. if (device instanceof BoolCollector) {
  165. BoolCollector bCollector = (BoolCollector) device;
  166. if (!tops.contains(bCollector.getBCinfoName()))
  167. newTopic = bCollector.getBCinfoName();
  168. }
  169. // Subscribe if no subscriptions so far or not subscribed to all,
  170. // with a probability of70%
  171. if (newTopic != null) {
  172. /**
  173. * Subscribe Request
  174. */
  175. returnPackets.add(new MQTT_packet(MQTT_packet.SUBSCRIBE, timestep, port, broker, "topic:" + newTopic));
  176. /**
  177. * Delay to the broker
  178. */
  179. long delayPortToBroker = port.getTransmissionDelayTo(broker);
  180. if (broker.getStatus() != Port.CLOSED && delayPortToBroker != Long.MAX_VALUE) {
  181. timestep += broker.getResponseTime();
  182. returnPackets.add(new MQTT_packet(MQTT_packet.SUBACK, timestep + delayPortToBroker, broker, port));
  183. tops.add(newTopic);
  184. }
  185. } else {
  186. /**
  187. * Check if some topics should be unsubscribed (e.g., one topic
  188. * name change)
  189. */
  190. newTopic = null;
  191. LinkedList<String> oldTopics = new LinkedList<String>(tops);
  192. if (!oldTopics.isEmpty()) {
  193. /**
  194. * Check if FloatCollector & not subscribed
  195. */
  196. if (device instanceof FloatCollector) {
  197. FloatCollector fCollector = (FloatCollector) device;
  198. oldTopics.remove(fCollector.getFCinfoName());
  199. }
  200. /**
  201. * Check if BoolCollector & not subscribed
  202. */
  203. if (device instanceof BoolCollector) {
  204. BoolCollector bCollector = (BoolCollector) device;
  205. oldTopics.remove(bCollector.getBCinfoName());
  206. }
  207. if (!oldTopics.isEmpty()) {
  208. newTopic = oldTopics.getFirst();
  209. /**
  210. * Send Unsubscribe Packet
  211. */
  212. returnPackets.add(
  213. new MQTT_packet(MQTT_packet.UNSUBSCRIBE, timestep, port, broker, "topic:" + newTopic));
  214. /**
  215. * Delay to the broker
  216. */
  217. long delayPortToBroker = port.getTransmissionDelayTo(broker);
  218. /**
  219. * Ackknowledgement
  220. */
  221. if (broker.getStatus() != Port.CLOSED && delayPortToBroker != Long.MAX_VALUE) {
  222. timestep += broker.getResponseTime();
  223. returnPackets.add(
  224. new MQTT_packet(MQTT_packet.UNSUBACK, timestep + delayPortToBroker, broker, port));
  225. tops.remove(newTopic);
  226. }
  227. }
  228. }
  229. }
  230. }
  231. if (pubs.contains(port) || pubSubs.contains(port) && Math.random() < 0.3) {
  232. /**
  233. * Topic which should be published to
  234. */
  235. String newTopic = null;
  236. /**
  237. * Value which should be published
  238. */
  239. String newValue = null;
  240. /**
  241. * Value of the connected Sensor (as label)
  242. */
  243. String newSensorValue = null;
  244. boolean isBoolean = false;
  245. /**
  246. * True if value anomaly (value = -1)
  247. */
  248. boolean valueAnomaly = (device.getLabel()==(short)-1);
  249. /**
  250. * Check if FloatSensor
  251. */
  252. if (device instanceof FloatSensor) {
  253. FloatSensor fSensor = (FloatSensor) device;
  254. newTopic = fSensor.getFSinfoName();
  255. newValue = "" + fSensor.getFSval();
  256. }
  257. /**
  258. * Check if BoolSensor - if also FloatSensor: 50% chance of
  259. * overriding
  260. */
  261. if (device instanceof BoolSensor) {
  262. BoolSensor bSensor = (BoolSensor) device;
  263. if (newTopic == null || new Random().nextBoolean()) {
  264. newTopic = bSensor.getBSinfoName();
  265. newValue = "" + bSensor.getBSval();
  266. isBoolean = true;
  267. }
  268. }
  269. if(device instanceof SensorLabel) {
  270. newSensorValue = ((SensorLabel)device).getSensorLabel();
  271. }else {
  272. newSensorValue=newValue;
  273. }
  274. if (newTopic != null) {
  275. /**
  276. * Packet for publishing the new value
  277. */
  278. Packet pubPacket = null;
  279. if(isBoolean) {
  280. pubPacket = new MQTTpublishPacket(timestep, port, broker, newTopic, Boolean.parseBoolean(newValue), Boolean.parseBoolean(newSensorValue));
  281. }else {
  282. pubPacket = new MQTTpublishPacket(timestep, port, broker, newTopic, Float.parseFloat(newValue), Float.parseFloat(newSensorValue));
  283. }
  284. if(valueAnomaly) {
  285. pubPacket.setLabel((short) 1);
  286. }
  287. /**
  288. * Send Packet
  289. */
  290. returnPackets.add(pubPacket);
  291. /**
  292. * Delay to the broker
  293. */
  294. long delayPortToBroker = port.getTransmissionDelayTo(broker);
  295. // Publish to Subscribers
  296. if (broker.getStatus() != Port.CLOSED && delayPortToBroker != Long.MAX_VALUE) {
  297. /**
  298. * Response/Acknowledgement
  299. */
  300. timestep += broker.getResponseTime() + delayPortToBroker;
  301. returnPackets.add(new MQTT_packet(MQTT_packet.PUBACK, timestep, broker, port));
  302. for (Port p : subs) {
  303. /**
  304. * Skip unsubcribed ports
  305. */
  306. if (!subbedTopics.get(p).contains(newTopic))
  307. continue;
  308. /**
  309. * Delay broker to subscriber
  310. */
  311. long delayBrokerToSub = broker.getTransmissionDelayTo(p);
  312. timestep += broker.getResponseTime();
  313. /**
  314. * Packet Broker -> Subscriber
  315. */
  316. if(isBoolean) {
  317. pubPacket = new MQTTpublishPacket(timestep, broker, p, newTopic, Boolean.parseBoolean(newValue), Boolean.parseBoolean(newSensorValue));
  318. }else {
  319. pubPacket = new MQTTpublishPacket(timestep, broker, p, newTopic, Float.parseFloat(newValue), Float.parseFloat(newSensorValue));
  320. }
  321. if(valueAnomaly) {
  322. pubPacket.setLabel((short) 1);
  323. }
  324. returnPackets.add(pubPacket);
  325. if (p.getStatus() != Port.CLOSED && delayBrokerToSub != Long.MAX_VALUE) {
  326. returnPackets.add(
  327. new MQTT_packet(MQTT_packet.PUBACK, timestep + p.getResponseTime(), p, broker));
  328. // Update Collector
  329. if (device instanceof FloatSensor && p.getOwner() instanceof FloatCollector
  330. && newTopic.equals(((FloatSensor) device).getFSinfoName())) {
  331. /**
  332. * original Float Value -> if it might change during the two events
  333. */
  334. float oldValue = ((FloatSensor) device).getFSval();
  335. /**
  336. * Schedule Event to update the sensor
  337. * -> therefore multiple parallel interactions would be in the right order
  338. */
  339. SimulationManager.scheduleEvent(new AbstractEvent(timestep + p.getResponseTime()) {
  340. @Override
  341. public void simulateEvent(long time) {
  342. ((FloatCollector) p.getOwner()).setFCval(oldValue);
  343. }
  344. });
  345. } else if (device instanceof BoolSensor && p.getOwner() instanceof BoolCollector
  346. && newTopic.equals(((BoolSensor) device).getBSinfoName())) {
  347. /**
  348. * Original sensor value
  349. */
  350. boolean oldValue = ((BoolSensor) device).getBSval();
  351. /**
  352. * Schedule Event to update the sensor
  353. * -> therefore multiple parallel interactions would be in the right order
  354. */
  355. SimulationManager.scheduleEvent(new AbstractEvent(timestep + p.getResponseTime()) {
  356. @Override
  357. public void simulateEvent(long time) {
  358. ((BoolCollector) p.getOwner()).setBCval(oldValue);
  359. }
  360. });
  361. }
  362. }
  363. }
  364. for (Port p : pubSubs) {
  365. /**
  366. * Skip unsubscribed Ports
  367. */
  368. if (!subbedTopics.get(p).contains(newTopic) || p.getOwner() == null)
  369. continue;
  370. /**
  371. * Delay broker to subscriber
  372. */
  373. long delayBrokerToSub = broker.getTransmissionDelayTo(p);
  374. timestep += broker.getResponseTime();
  375. if(isBoolean) {
  376. pubPacket = new MQTTpublishPacket(timestep, broker, p, newTopic, Boolean.parseBoolean(newValue), Boolean.parseBoolean(newSensorValue));
  377. }else {
  378. pubPacket = new MQTTpublishPacket(timestep, broker, p, newTopic, Float.parseFloat(newValue), Float.parseFloat(newSensorValue));
  379. }
  380. if(valueAnomaly) {
  381. pubPacket.setLabel((short) 1);
  382. }
  383. returnPackets.add(pubPacket);
  384. if (p.getStatus() != Port.CLOSED && delayBrokerToSub != Long.MAX_VALUE) {
  385. returnPackets.add(
  386. new MQTT_packet(MQTT_packet.PUBACK, timestep + p.getResponseTime(), p, broker));
  387. // Update Collector
  388. if (device instanceof FloatSensor && p.getOwner() instanceof FloatCollector
  389. && newTopic.equals(((FloatSensor) device).getFSinfoName())) {
  390. ((FloatCollector) p.getOwner()).setFCval(((FloatSensor) device).getFSval());
  391. } else if (device instanceof BoolSensor && p.getOwner() instanceof BoolCollector
  392. && newTopic.equals(((BoolSensor) device).getBSinfoName())) {
  393. ((BoolCollector) p.getOwner()).setBCval(((BoolSensor) device).getBSval());
  394. }
  395. }
  396. }
  397. }
  398. }
  399. }
  400. /**
  401. * Rare Ping request to broker
  402. */
  403. if (Math.random() < 0.05 && port != broker) {
  404. returnPackets.add(new MQTT_packet(MQTT_packet.PINGREQ, timestep, port, broker));
  405. /**
  406. * Delay broker to subscriber
  407. */
  408. long delayPortToBroker = port.getTransmissionDelayTo(broker);
  409. if (broker.getStatus() != Port.CLOSED && delayPortToBroker != Long.MAX_VALUE)
  410. returnPackets.add(new MQTT_packet(MQTT_packet.PINGRESP,
  411. timestep + delayPortToBroker + broker.getResponseTime(), broker, port));
  412. }
  413. return returnPackets;
  414. }
  415. @Override
  416. public int getNumberOfRoles() {
  417. return 4;
  418. }
  419. @Override
  420. public String[] getRoles() {
  421. // PublisherSubscriber is Publisher as well as Subscriber
  422. return new String[] { "Broker", "PublisherSubscriber", "Publisher", "Subscriber" };
  423. }
  424. @Override
  425. public Collection<Port> getDevicesWithRole(int role) {
  426. switch (role) {
  427. case 0:
  428. return new LinkedList<Port>(Arrays.asList(broker));
  429. case 1:
  430. return pubSubs;
  431. case 2:
  432. return pubs;
  433. case 3:
  434. return subs;
  435. default:
  436. return null;
  437. }
  438. }
  439. @Override
  440. public boolean addDeviceOfRole(Port device, int role) {
  441. /*
  442. * First device has to be the Broker
  443. */
  444. if (broker == null) {
  445. if (role == 0) {
  446. broker = device;
  447. updateBrokerOnDeletedConnections(null);
  448. return true;
  449. } else {
  450. return false;
  451. }
  452. }
  453. switch (role) {
  454. case 0:
  455. // Just one broker allowed.
  456. return false;
  457. case 1:
  458. pubSubs.add(device);
  459. subbedTopics.putIfAbsent(device, new LinkedList<String>());
  460. break;
  461. case 2:
  462. pubs.add(device);
  463. break;
  464. case 3:
  465. subs.add(device);
  466. subbedTopics.putIfAbsent(device, new LinkedList<String>());
  467. break;
  468. default:
  469. // invalid role
  470. return false;
  471. }
  472. // Create packets for the connecting Client
  473. currentPackets.add(new MQTT_packet(MQTT_packet.CONNECT, currentPackets.size() / 2, device, broker));
  474. currentPackets.add(new MQTT_packet(MQTT_packet.CONNACK, currentPackets.size() / 2 + 1, broker, device));
  475. return true;
  476. }
  477. @Override
  478. public void removeDevice(Port device) {
  479. /**
  480. * true if the device was removed
  481. */
  482. boolean removedDevice = false;
  483. if (broker == device) {
  484. broker = null;
  485. deletedConnectionLinks
  486. .add(new Pair<Port, Port>(new Port(device.getOwner(), (short) -1), deletedBrokerPort));
  487. updateBrokerOnDeletedConnections(device);
  488. }
  489. removedDevice |= pubSubs.remove(device);
  490. removedDevice |= subs.remove(device);
  491. removedDevice |= pubs.remove(device);
  492. // Remove Port from topics and clear its list
  493. LinkedList<String> oldTopics = subbedTopics.remove(device);
  494. if (oldTopics != null)
  495. oldTopics.clear();
  496. if (removedDevice) {
  497. if (broker == null) {
  498. deletedConnectionLinks.add(new Pair<Port, Port>(deletedBrokerPort, device));
  499. } else {
  500. deletedConnectionLinks.add(new Pair<Port, Port>(broker, device));
  501. // If not the broker and device was removed -> disconnect
  502. currentPackets.add(new MQTT_packet(MQTT_packet.DISCONNECT, currentPackets.size() / 2, device, broker));
  503. }
  504. }
  505. }
  506. @Override
  507. public String getName() {
  508. return "MQTT";
  509. }
  510. @Override
  511. public int getRoleOfDevice(Port device) {
  512. if (device == null)
  513. return -1;
  514. if (device == broker)
  515. return 0;
  516. if (pubSubs.contains(device))
  517. return 1;
  518. if (pubs.contains(device))
  519. return 2;
  520. if (subs.contains(device))
  521. return 3;
  522. return -1;
  523. }
  524. @Override
  525. public Collection<Port> getDevices() {
  526. LinkedList<Port> returnDevices = new LinkedList<Port>();
  527. if (broker != null)
  528. returnDevices.add(broker);
  529. returnDevices.addAll(pubSubs);
  530. returnDevices.addAll(pubs);
  531. returnDevices.addAll(subs);
  532. return returnDevices;
  533. }
  534. @Override
  535. public byte getTopologyType() {
  536. return STAR;
  537. }
  538. @Override
  539. public Collection<Pair<Port, Port>> getTopology() {
  540. LinkedList<Pair<Port, Port>> topology = new LinkedList<Pair<Port, Port>>();
  541. Port center = broker;
  542. calcDeletedBrokerPosition();
  543. if (broker == null)
  544. center = new Port(deletedBroker, (short) -1);
  545. for (Port p : pubSubs)
  546. topology.add(new Pair<Port, Port>(center, p));
  547. for (Port p : pubs)
  548. topology.add(new Pair<Port, Port>(center, p));
  549. for (Port p : subs)
  550. topology.add(new Pair<Port, Port>(center, p));
  551. return topology;
  552. }
  553. @Override
  554. public Collection<Pair<Port, Port>> getDeletedTopology() {
  555. calcDeletedBrokerPosition();
  556. return deletedConnectionLinks;
  557. }
  558. /**
  559. * Calculate and update the position of the deleted Broker
  560. */
  561. private void calcDeletedBrokerPosition() {
  562. if (broker == null) {
  563. int x = 0, y = 0, noP = 0;
  564. for (Port p : getDevices()) {
  565. if (p != null && p.getOwner() != null) {
  566. x += p.getOwner().getX();
  567. y += p.getOwner().getY();
  568. noP++;
  569. }
  570. }
  571. if (noP == 0)
  572. return;
  573. deletedBroker.setX(((int) (x * 1.0) / noP));
  574. deletedBroker.setY(((int) (y * 1.0) / noP));
  575. }
  576. }
  577. /**
  578. * Update the broker port on the deleted Connections
  579. *
  580. * @param device
  581. * old broker
  582. */
  583. private void updateBrokerOnDeletedConnections(Port device) {
  584. for (Pair<Port, Port> p : deletedConnectionLinks) {
  585. if (broker == null) {
  586. if (p.getLeft() == device)
  587. p.setLeft(deletedBrokerPort);
  588. if (p.getRight() == device)
  589. p.setRight(deletedBrokerPort);
  590. } else {
  591. if (p.getLeft() == deletedBrokerPort)
  592. p.setLeft(broker);
  593. if (p.getRight() == deletedBrokerPort)
  594. p.setRight(broker);
  595. }
  596. }
  597. }
  598. }