MQTT_protocol.java 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635
  1. package de.tu_darmstadt.tk.SmartHomeNetworkSim.core.protocols;
  2. import java.util.Arrays;
  3. import java.util.Collection;
  4. import java.util.HashMap;
  5. import java.util.Iterator;
  6. import java.util.LinkedList;
  7. import java.util.Random;
  8. import de.tu_darmstadt.tk.SmartHomeNetworkSim.core.Packet;
  9. import de.tu_darmstadt.tk.SmartHomeNetworkSim.core.Port;
  10. import de.tu_darmstadt.tk.SmartHomeNetworkSim.core.Protocol;
  11. import de.tu_darmstadt.tk.SmartHomeNetworkSim.core.SimulationManager;
  12. import de.tu_darmstadt.tk.SmartHomeNetworkSim.core.SmartDevice;
  13. import de.tu_darmstadt.tk.SmartHomeNetworkSim.core.devices.BoolCollector;
  14. import de.tu_darmstadt.tk.SmartHomeNetworkSim.core.devices.BoolSensor;
  15. import de.tu_darmstadt.tk.SmartHomeNetworkSim.core.devices.FloatCollector;
  16. import de.tu_darmstadt.tk.SmartHomeNetworkSim.core.devices.FloatSensor;
  17. import de.tu_darmstadt.tk.SmartHomeNetworkSim.core.protocols.packets.MQTT_packet;
  18. import de.tu_darmstadt.tk.SmartHomeNetworkSim.core.protocols.packets.MQTTpublishPacket;
  19. import de.tu_darmstadt.tk.SmartHomeNetworkSim.core.scheduler.AbstractEvent;
  20. import de.tu_darmstadt.tk.SmartHomeNetworkSim.core.util.Pair;
  21. /**
  22. * Implementation of the MQTT Protocol to generate packets for the simulation
  23. *
  24. *
  25. * @author Andreas T. Meyer-Berg
  26. */
  27. public class MQTT_protocol implements Protocol {
  28. /**
  29. * Broker which collects and distributes messages
  30. */
  31. private Port broker;
  32. /**
  33. * Publishers like sensors, which publish data
  34. */
  35. private LinkedList<Port> pubs;
  36. /**
  37. * Subscriber which subscribe to different Topics
  38. */
  39. private LinkedList<Port> subs;
  40. /**
  41. * Topics subscribed by each Port
  42. */
  43. private HashMap<Port, LinkedList<String>> subbedTopics = new HashMap<Port, LinkedList<String>>();
  44. /**
  45. * Devices that are Publisher and Subscriber and therefore send and receive
  46. * messages
  47. */
  48. private LinkedList<Port> pubSubs;
  49. /**
  50. * Packets which are currently being generated or were generated after the
  51. * last generatePackets Call
  52. */
  53. private LinkedList<Packet> currentPackets = new LinkedList<Packet>();
  54. // For Visualization in case of deleting the broker
  55. private LinkedList<Pair<Port, Port>> deletedConnectionLinks = new LinkedList<Pair<Port, Port>>();
  56. private SmartDevice deletedBroker = new SmartDevice("DeletedBroker");
  57. private Port deletedBrokerPort = new Port(deletedBroker, (short) -1);
  58. /**
  59. * Topics of the MQTT Broker
  60. */
  61. // private LinkedList<String> topics = new LinkedList<String>();
  /**
   * Creates a new MQTT protocol without a broker. The broker has to be
   * added later as the first device.
   */
  public MQTT_protocol() {
    // Same effect as assigning null and initializing the lists directly.
    this((Port) null);
  }
  /**
   * Creates a new MQTT protocol which uses the given port as broker.
   *
   * @param broker
   *            port of the broker of this protocol, may be {@code null}
   */
  public MQTT_protocol(Port broker) {
    initialize();
    this.broker = broker;
  }
  79. /**
  80. * Initializes the different fields
  81. */
  82. private void initialize() {
  83. subs = new LinkedList<Port>();
  84. pubs = new LinkedList<Port>();
  85. pubSubs = new LinkedList<Port>();
  86. }
  87. @Override
  88. public Collection<Packet> generateNextPackets(Port port, long timestep, boolean packetLost) {
  89. /**
  90. * Packets which will be generated
  91. */
  92. LinkedList<Packet> returnPackets = new LinkedList<Packet>();
  93. // Update all Timestamps of previous created Packets
  94. for (Packet p : currentPackets) {
  95. p.setTimestamp(p.getTimestamp() + timestep);
  96. }
  97. // Add these packets to the return
  98. returnPackets.addAll(currentPackets);
  99. // remove packets from the old list
  100. currentPackets.clear();
  101. // Clear deleted connections
  102. deletedConnectionLinks.clear();
  103. // Return termination packets
  104. if (port == null)
  105. return returnPackets;
  106. SmartDevice device = port.getOwner();
  107. if (device == null)
  108. return returnPackets;
  109. /**
  110. * Update the lastTime the port was triggered
  111. */
  112. port.setLastTrigger(timestep);
  113. /**
  114. * if port null, skip this step
  115. */
  116. if (broker == null)
  117. return returnPackets;
  118. /**
  119. * Generate new Packets regarding to their class
  120. */
  121. if (port == broker) {
  122. // Broker should not send new packages, without new messages
  123. // But could perform some Ping request to random participants
  124. Collection<Port> devices = getDevices();
  125. devices.remove(port);
  126. Port dest = null;
  127. Iterator<Port> it = devices.iterator();
  128. for (int i = 0; i < (Math.random() * devices.size()) && it.hasNext(); i++) {
  129. dest = it.next();
  130. }
  131. if (dest != null) {
  132. returnPackets.add(new MQTT_packet(MQTT_packet.PINGREQ, timestep, port, dest));
  133. /**
  134. * Delay to the destination
  135. */
  136. long delayBrokerDest = broker.getTransmissionDelayTo(dest);
  137. if (dest.getStatus() != Port.CLOSED && delayBrokerDest != Long.MAX_VALUE)
  138. returnPackets.add(new MQTT_packet(MQTT_packet.PINGRESP,
  139. timestep + delayBrokerDest + dest.getJitter() + dest.getResponseTime(), dest, port));
  140. }
  141. }
  142. if (subs.contains(port) || pubSubs.contains(port)) {
  143. // Subs can either subscribe to topics or unsubscribe
  144. /**
  145. * Topics, the SmartDevice is subscribed to
  146. */
  147. LinkedList<String> tops = subbedTopics.get(port);
  148. /**
  149. * Topic which should be subscribed to
  150. */
  151. String newTopic = null;
  152. /**
  153. * Check if FloatCollector & not subscribed
  154. */
  155. if (device instanceof FloatCollector) {
  156. FloatCollector fCollector = (FloatCollector) device;
  157. if (!tops.contains(fCollector.getFCinfoName()))
  158. newTopic = fCollector.getFCinfoName();
  159. }
  160. /**
  161. * Check if BoolCollector & not subscribed
  162. */
  163. if (device instanceof BoolCollector) {
  164. BoolCollector bCollector = (BoolCollector) device;
  165. if (!tops.contains(bCollector.getBCinfoName()))
  166. newTopic = bCollector.getBCinfoName();
  167. }
  168. // Subscribe if no subscriptions so far or not subscribed to all,
  169. // with a probability of70%
  170. if (newTopic != null) {
  171. /**
  172. * Subscribe Request
  173. */
  174. returnPackets.add(new MQTT_packet(MQTT_packet.SUBSCRIBE, timestep, port, broker, "topic:" + newTopic));
  175. /**
  176. * Delay to the broker
  177. */
  178. long delayPortToBroker = port.getTransmissionDelayTo(broker);
  179. if (broker.getStatus() != Port.CLOSED && delayPortToBroker != Long.MAX_VALUE) {
  180. timestep += broker.getResponseTime();
  181. returnPackets.add(new MQTT_packet(MQTT_packet.SUBACK, timestep + delayPortToBroker, broker, port));
  182. tops.add(newTopic);
  183. }
  184. } else {
  185. /**
  186. * Check if some topics should be unsubscribed (e.g., one topic
  187. * name change)
  188. */
  189. newTopic = null;
  190. LinkedList<String> oldTopics = new LinkedList<String>(tops);
  191. if (!oldTopics.isEmpty()) {
  192. /**
  193. * Check if FloatCollector & not subscribed
  194. */
  195. if (device instanceof FloatCollector) {
  196. FloatCollector fCollector = (FloatCollector) device;
  197. oldTopics.remove(fCollector.getFCinfoName());
  198. }
  199. /**
  200. * Check if BoolCollector & not subscribed
  201. */
  202. if (device instanceof BoolCollector) {
  203. BoolCollector bCollector = (BoolCollector) device;
  204. oldTopics.remove(bCollector.getBCinfoName());
  205. }
  206. if (!oldTopics.isEmpty()) {
  207. newTopic = oldTopics.getFirst();
  208. /**
  209. * Send Unsubscribe Packet
  210. */
  211. returnPackets.add(
  212. new MQTT_packet(MQTT_packet.UNSUBSCRIBE, timestep, port, broker, "topic:" + newTopic));
  213. /**
  214. * Delay to the broker
  215. */
  216. long delayPortToBroker = port.getTransmissionDelayTo(broker);
  217. /**
  218. * Ackknowledgement
  219. */
  220. if (broker.getStatus() != Port.CLOSED && delayPortToBroker != Long.MAX_VALUE) {
  221. timestep += broker.getResponseTime();
  222. returnPackets.add(
  223. new MQTT_packet(MQTT_packet.UNSUBACK, timestep + delayPortToBroker, broker, port));
  224. tops.remove(newTopic);
  225. }
  226. }
  227. }
  228. }
  229. }
  230. if (pubs.contains(port) || pubSubs.contains(port) && Math.random() < 0.3) {
  231. /**
  232. * Topic which should be published to
  233. */
  234. String newTopic = null;
  235. /**
  236. * Value which should be published
  237. */
  238. String newValue = null;
  239. boolean isBoolean = false;
  240. /**
  241. * True if value anomaly (value = -1)
  242. */
  243. boolean valueAnomaly = (device.getLabel()==(short)-1);
  244. /**
  245. * Check if FloatSensor
  246. */
  247. if (device instanceof FloatSensor) {
  248. FloatSensor fSensor = (FloatSensor) device;
  249. newTopic = fSensor.getFSinfoName();
  250. newValue = "" + fSensor.getFSval();
  251. }
  252. /**
  253. * Check if BoolSensor - if also FloatSensor: 50% chance of
  254. * overriding
  255. */
  256. if (device instanceof BoolSensor) {
  257. BoolSensor bSensor = (BoolSensor) device;
  258. if (newTopic == null || new Random().nextBoolean()) {
  259. newTopic = bSensor.getBSinfoName();
  260. newValue = "" + bSensor.getBSval();
  261. isBoolean = true;
  262. }
  263. }
  264. if (newTopic != null) {
  265. /**
  266. * Packet for publishing the new value
  267. */
  268. Packet pubPacket = null;
  269. if(isBoolean) {
  270. pubPacket = new MQTTpublishPacket(timestep, port, broker, newTopic, Boolean.parseBoolean(newValue));
  271. }else {
  272. pubPacket = new MQTTpublishPacket(timestep, port, broker, newTopic, Float.parseFloat(newValue));
  273. }
  274. if(valueAnomaly) {
  275. pubPacket.setLabel((short) 1);
  276. }
  277. /**
  278. * Send Packet
  279. */
  280. returnPackets.add(pubPacket);
  281. /**
  282. * Delay to the broker
  283. */
  284. long delayPortToBroker = port.getTransmissionDelayTo(broker);
  285. // Publish to Subscribers
  286. if (broker.getStatus() != Port.CLOSED && delayPortToBroker != Long.MAX_VALUE) {
  287. /**
  288. * Response/Acknowledgement
  289. */
  290. timestep += broker.getResponseTime() + delayPortToBroker;
  291. returnPackets.add(new MQTT_packet(MQTT_packet.PUBACK, timestep, broker, port));
  292. for (Port p : subs) {
  293. /**
  294. * Skip unsubcribed ports
  295. */
  296. if (!subbedTopics.get(p).contains(newTopic))
  297. continue;
  298. /**
  299. * Delay broker to subscriber
  300. */
  301. long delayBrokerToSub = broker.getTransmissionDelayTo(p);
  302. timestep += broker.getResponseTime();
  303. /**
  304. * Packet Broker -> Subscriber
  305. */
  306. if(isBoolean) {
  307. pubPacket = new MQTTpublishPacket(timestep, broker, p, newTopic, Boolean.parseBoolean(newValue));
  308. }else {
  309. pubPacket = new MQTTpublishPacket(timestep, broker, p, newTopic, Float.parseFloat(newValue));
  310. }
  311. if(valueAnomaly) {
  312. pubPacket.setLabel((short) 1);
  313. }
  314. returnPackets.add(pubPacket);
  315. if (p.getStatus() != Port.CLOSED && delayBrokerToSub != Long.MAX_VALUE) {
  316. returnPackets.add(
  317. new MQTT_packet(MQTT_packet.PUBACK, timestep + p.getResponseTime(), p, broker));
  318. // Update Collector
  319. if (device instanceof FloatSensor && p.getOwner() instanceof FloatCollector
  320. && newTopic.equals(((FloatSensor) device).getFSinfoName())) {
  321. /**
  322. * original Float Value -> if it might change during the two events
  323. */
  324. float oldValue = ((FloatSensor) device).getFSval();
  325. /**
  326. * Schedule Event to update the sensor
  327. * -> therefore multiple parallel interactions would be in the right order
  328. */
  329. SimulationManager.scheduleEvent(new AbstractEvent(timestep + p.getResponseTime()) {
  330. @Override
  331. public void simulateEvent(long time) {
  332. ((FloatCollector) p.getOwner()).setFCval(oldValue);
  333. }
  334. });
  335. } else if (device instanceof BoolSensor && p.getOwner() instanceof BoolCollector
  336. && newTopic.equals(((BoolSensor) device).getBSinfoName())) {
  337. /**
  338. * Original sensor value
  339. */
  340. boolean oldValue = ((BoolSensor) device).getBSval();
  341. /**
  342. * Schedule Event to update the sensor
  343. * -> therefore multiple parallel interactions would be in the right order
  344. */
  345. SimulationManager.scheduleEvent(new AbstractEvent(timestep + p.getResponseTime()) {
  346. @Override
  347. public void simulateEvent(long time) {
  348. ((BoolCollector) p.getOwner()).setBCval(oldValue);
  349. }
  350. });
  351. }
  352. }
  353. }
  354. for (Port p : pubSubs) {
  355. /**
  356. * Skip unsubscribed Ports
  357. */
  358. if (!subbedTopics.get(p).contains(newTopic) || p.getOwner() == null)
  359. continue;
  360. /**
  361. * Delay broker to subscriber
  362. */
  363. long delayBrokerToSub = broker.getTransmissionDelayTo(p);
  364. timestep += broker.getResponseTime();
  365. if(isBoolean) {
  366. pubPacket = new MQTTpublishPacket(timestep, broker, p, newTopic, Boolean.parseBoolean(newValue));
  367. }else {
  368. pubPacket = new MQTTpublishPacket(timestep, broker, p, newTopic, Float.parseFloat(newValue));
  369. }
  370. if(valueAnomaly) {
  371. pubPacket.setLabel((short) 1);
  372. }
  373. returnPackets.add(pubPacket);
  374. if (p.getStatus() != Port.CLOSED && delayBrokerToSub != Long.MAX_VALUE) {
  375. returnPackets.add(
  376. new MQTT_packet(MQTT_packet.PUBACK, timestep + p.getResponseTime(), p, broker));
  377. // Update Collector
  378. if (device instanceof FloatSensor && p.getOwner() instanceof FloatCollector
  379. && newTopic.equals(((FloatSensor) device).getFSinfoName())) {
  380. ((FloatCollector) p.getOwner()).setFCval(((FloatSensor) device).getFSval());
  381. } else if (device instanceof BoolSensor && p.getOwner() instanceof BoolCollector
  382. && newTopic.equals(((BoolSensor) device).getBSinfoName())) {
  383. ((BoolCollector) p.getOwner()).setBCval(((BoolSensor) device).getBSval());
  384. }
  385. }
  386. }
  387. }
  388. }
  389. }
  390. /**
  391. * Rare Ping request to broker
  392. */
  393. if (Math.random() < 0.05 && port != broker) {
  394. returnPackets.add(new MQTT_packet(MQTT_packet.PINGREQ, timestep, port, broker));
  395. /**
  396. * Delay broker to subscriber
  397. */
  398. long delayPortToBroker = port.getTransmissionDelayTo(broker);
  399. if (broker.getStatus() != Port.CLOSED && delayPortToBroker != Long.MAX_VALUE)
  400. returnPackets.add(new MQTT_packet(MQTT_packet.PINGRESP,
  401. timestep + delayPortToBroker + broker.getResponseTime(), broker, port));
  402. }
  403. return returnPackets;
  404. }
  405. @Override
  406. public int getNumberOfRoles() {
  407. return 4;
  408. }
  409. @Override
  410. public String[] getRoles() {
  411. // PublisherSubscriber is Publisher as well as Subscriber
  412. return new String[] { "Broker", "PublisherSubscriber", "Publisher", "Subscriber" };
  413. }
  414. @Override
  415. public Collection<Port> getDevicesWithRole(int role) {
  416. switch (role) {
  417. case 0:
  418. return new LinkedList<Port>(Arrays.asList(broker));
  419. case 1:
  420. return pubSubs;
  421. case 2:
  422. return pubs;
  423. case 3:
  424. return subs;
  425. default:
  426. return null;
  427. }
  428. }
  429. @Override
  430. public boolean addDeviceOfRole(Port device, int role) {
  431. /*
  432. * First device has to be the Broker
  433. */
  434. if (broker == null) {
  435. if (role == 0) {
  436. broker = device;
  437. updateBrokerOnDeletedConnections(null);
  438. return true;
  439. } else {
  440. return false;
  441. }
  442. }
  443. switch (role) {
  444. case 0:
  445. // Just one broker allowed.
  446. return false;
  447. case 1:
  448. pubSubs.add(device);
  449. subbedTopics.putIfAbsent(device, new LinkedList<String>());
  450. break;
  451. case 2:
  452. pubs.add(device);
  453. break;
  454. case 3:
  455. subs.add(device);
  456. subbedTopics.putIfAbsent(device, new LinkedList<String>());
  457. break;
  458. default:
  459. // invalid role
  460. return false;
  461. }
  462. // Create packets for the connecting Client
  463. currentPackets.add(new MQTT_packet(MQTT_packet.CONNECT, currentPackets.size() / 2, device, broker));
  464. currentPackets.add(new MQTT_packet(MQTT_packet.CONNACK, currentPackets.size() / 2 + 1, broker, device));
  465. return true;
  466. }
  467. @Override
  468. public void removeDevice(Port device) {
  469. /**
  470. * true if the device was removed
  471. */
  472. boolean removedDevice = false;
  473. if (broker == device) {
  474. broker = null;
  475. deletedConnectionLinks
  476. .add(new Pair<Port, Port>(new Port(device.getOwner(), (short) -1), deletedBrokerPort));
  477. updateBrokerOnDeletedConnections(device);
  478. }
  479. removedDevice |= pubSubs.remove(device);
  480. removedDevice |= subs.remove(device);
  481. removedDevice |= pubs.remove(device);
  482. // Remove Port from topics and clear its list
  483. LinkedList<String> oldTopics = subbedTopics.remove(device);
  484. if (oldTopics != null)
  485. oldTopics.clear();
  486. if (removedDevice) {
  487. if (broker == null) {
  488. deletedConnectionLinks.add(new Pair<Port, Port>(deletedBrokerPort, device));
  489. } else {
  490. deletedConnectionLinks.add(new Pair<Port, Port>(broker, device));
  491. // If not the broker and device was removed -> disconnect
  492. currentPackets.add(new MQTT_packet(MQTT_packet.DISCONNECT, currentPackets.size() / 2, device, broker));
  493. }
  494. }
  495. }
  496. @Override
  497. public String getName() {
  498. return "MQTT";
  499. }
  500. @Override
  501. public int getRoleOfDevice(Port device) {
  502. if (device == null)
  503. return -1;
  504. if (device == broker)
  505. return 0;
  506. if (pubSubs.contains(device))
  507. return 1;
  508. if (pubs.contains(device))
  509. return 2;
  510. if (subs.contains(device))
  511. return 3;
  512. return -1;
  513. }
  514. @Override
  515. public Collection<Port> getDevices() {
  516. LinkedList<Port> returnDevices = new LinkedList<Port>();
  517. if (broker != null)
  518. returnDevices.add(broker);
  519. returnDevices.addAll(pubSubs);
  520. returnDevices.addAll(pubs);
  521. returnDevices.addAll(subs);
  522. return returnDevices;
  523. }
  524. @Override
  525. public byte getTopologyType() {
  526. return STAR;
  527. }
  528. @Override
  529. public Collection<Pair<Port, Port>> getTopology() {
  530. LinkedList<Pair<Port, Port>> topology = new LinkedList<Pair<Port, Port>>();
  531. Port center = broker;
  532. calcDeletedBrokerPosition();
  533. if (broker == null)
  534. center = new Port(deletedBroker, (short) -1);
  535. for (Port p : pubSubs)
  536. topology.add(new Pair<Port, Port>(center, p));
  537. for (Port p : pubs)
  538. topology.add(new Pair<Port, Port>(center, p));
  539. for (Port p : subs)
  540. topology.add(new Pair<Port, Port>(center, p));
  541. return topology;
  542. }
  543. @Override
  544. public Collection<Pair<Port, Port>> getDeletedTopology() {
  545. calcDeletedBrokerPosition();
  546. return deletedConnectionLinks;
  547. }
  548. /**
  549. * Calculate and update the position of the deleted Broker
  550. */
  551. private void calcDeletedBrokerPosition() {
  552. if (broker == null) {
  553. int x = 0, y = 0, noP = 0;
  554. for (Port p : getDevices()) {
  555. if (p != null && p.getOwner() != null) {
  556. x += p.getOwner().getX();
  557. y += p.getOwner().getY();
  558. noP++;
  559. }
  560. }
  561. if (noP == 0)
  562. return;
  563. deletedBroker.setX(((int) (x * 1.0) / noP));
  564. deletedBroker.setY(((int) (y * 1.0) / noP));
  565. }
  566. }
  567. /**
  568. * Update the broker port on the deleted Connections
  569. *
  570. * @param device
  571. * old broker
  572. */
  573. private void updateBrokerOnDeletedConnections(Port device) {
  574. for (Pair<Port, Port> p : deletedConnectionLinks) {
  575. if (broker == null) {
  576. if (p.getLeft() == device)
  577. p.setLeft(deletedBrokerPort);
  578. if (p.getRight() == device)
  579. p.setRight(deletedBrokerPort);
  580. } else {
  581. if (p.getLeft() == deletedBrokerPort)
  582. p.setLeft(broker);
  583. if (p.getRight() == deletedBrokerPort)
  584. p.setRight(broker);
  585. }
  586. }
  587. }
  588. }