MQTT_protocol.java 18 KB


  1. package de.tu_darmstadt.tk.SmartHomeNetworkSim.core.protocols;
  2. import java.util.Arrays;
  3. import java.util.Collection;
  4. import java.util.HashMap;
  5. import java.util.Iterator;
  6. import java.util.LinkedList;
  7. import java.util.Random;
  8. import de.tu_darmstadt.tk.SmartHomeNetworkSim.core.Packet;
  9. import de.tu_darmstadt.tk.SmartHomeNetworkSim.core.Port;
  10. import de.tu_darmstadt.tk.SmartHomeNetworkSim.core.Protocol;
  11. import de.tu_darmstadt.tk.SmartHomeNetworkSim.core.SimulationManager;
  12. import de.tu_darmstadt.tk.SmartHomeNetworkSim.core.SmartDevice;
  13. import de.tu_darmstadt.tk.SmartHomeNetworkSim.core.devices.BoolCollector;
  14. import de.tu_darmstadt.tk.SmartHomeNetworkSim.core.devices.BoolSensor;
  15. import de.tu_darmstadt.tk.SmartHomeNetworkSim.core.devices.FloatCollector;
  16. import de.tu_darmstadt.tk.SmartHomeNetworkSim.core.devices.FloatSensor;
  17. import de.tu_darmstadt.tk.SmartHomeNetworkSim.core.protocols.packets.MQTT_packet;
  18. import de.tu_darmstadt.tk.SmartHomeNetworkSim.core.scheduler.AbstractEvent;
  19. import de.tu_darmstadt.tk.SmartHomeNetworkSim.core.util.Pair;
  20. /**
  21. * Implementation of the MQTT Protocol to generate packets for the simulation
  22. *
  23. *
  24. * @author Andreas T. Meyer-Berg
  25. */
  26. public class MQTT_protocol implements Protocol {
  27. /**
  28. * Broker which collects and distributes messages
  29. */
  30. private Port broker;
  31. /**
  32. * Publishers like sensors, which publish data
  33. */
  34. private LinkedList<Port> pubs;
  35. /**
  36. * Subscriber which subscribe to different Topics
  37. */
  38. private LinkedList<Port> subs;
  39. /**
  40. * Topics subscribed by each Port
  41. */
  42. private HashMap<Port, LinkedList<String>> subbedTopics = new HashMap<Port, LinkedList<String>>();
  43. /**
  44. * Devices that are Publisher and Subscriber and therefore send and receive
  45. * messages
  46. */
  47. private LinkedList<Port> pubSubs;
  48. /**
  49. * Packets which are currently being generated or were generated after the
  50. * last generatePackets Call
  51. */
  52. private LinkedList<Packet> currentPackets = new LinkedList<Packet>();
  53. // For Visualization in case of deleting the broker
  54. private LinkedList<Pair<Port, Port>> deletedConnectionLinks = new LinkedList<Pair<Port, Port>>();
  55. private SmartDevice deletedBroker = new SmartDevice("DeletedBroker");
  56. private Port deletedBrokerPort = new Port(deletedBroker, (short) -1);
  57. /**
  58. * Topics of the MQTT Broker
  59. */
  60. // private LinkedList<String> topics = new LinkedList<String>();
  61. /**
  62. * Creates a new MQTT Protocol
  63. */
  64. public MQTT_protocol() {
  65. this.broker = null;
  66. initialize();
  67. }
  68. /**
  69. * Creates a new MQTT Protocol
  70. *
  71. * @param broker
  72. * broker of the protocol
  73. */
  74. public MQTT_protocol(Port broker) {
  75. this.broker = broker;
  76. initialize();
  77. }
  78. /**
  79. * Initializes the different fields
  80. */
  81. private void initialize() {
  82. subs = new LinkedList<Port>();
  83. pubs = new LinkedList<Port>();
  84. pubSubs = new LinkedList<Port>();
  85. }
  86. @Override
  87. public Collection<Packet> generateNextPackets(Port port, long timestep, boolean packetLost) {
  88. /**
  89. * Packets which will be generated
  90. */
  91. LinkedList<Packet> returnPackets = new LinkedList<Packet>();
  92. // Update all Timestamps of previous created Packets
  93. for (Packet p : currentPackets) {
  94. p.setTimestamp(p.getTimestamp() + timestep);
  95. }
  96. // Add these packets to the return
  97. returnPackets.addAll(currentPackets);
  98. // remove packets from the old list
  99. currentPackets.clear();
  100. // Clear deleted connections
  101. deletedConnectionLinks.clear();
  102. // Return termination packets
  103. if (port == null)
  104. return returnPackets;
  105. SmartDevice device = port.getOwner();
  106. if (device == null)
  107. return returnPackets;
  108. /**
  109. * Update the lastTime the port was triggered
  110. */
  111. port.setLastTrigger(timestep);
  112. /**
  113. * if port null, skip this step
  114. */
  115. if (broker == null)
  116. return returnPackets;
  117. /**
  118. * Generate new Packets regarding to their class
  119. */
  120. if (port == broker) {
  121. // Broker should not send new packages, without new messages
  122. // But could perform some Ping request to random participants
  123. Collection<Port> devices = getDevices();
  124. devices.remove(port);
  125. Port dest = null;
  126. Iterator<Port> it = devices.iterator();
  127. for (int i = 0; i < (Math.random() * devices.size()) && it.hasNext(); i++) {
  128. dest = it.next();
  129. }
  130. if (dest != null) {
  131. returnPackets.add(new MQTT_packet(MQTT_packet.PINGREQ, timestep, port, dest));
  132. /**
  133. * Delay to the destination
  134. */
  135. long delayBrokerDest = broker.getTransmissionDelayTo(dest);
  136. if (dest.getStatus() != Port.CLOSED && delayBrokerDest != Long.MAX_VALUE)
  137. returnPackets.add(new MQTT_packet(MQTT_packet.PINGRESP,
  138. timestep + delayBrokerDest + dest.getJitter() + dest.getResponseTime(), dest, port));
  139. }
  140. }
  141. if (subs.contains(port) || pubSubs.contains(port)) {
  142. // Subs can either subscribe to topics or unsubscribe
  143. /**
  144. * Topics, the SmartDevice is subscribed to
  145. */
  146. LinkedList<String> tops = subbedTopics.get(port);
  147. /**
  148. * Topic which should be subscribed to
  149. */
  150. String newTopic = null;
  151. /**
  152. * Check if FloatCollector & not subscribed
  153. */
  154. if (device instanceof FloatCollector) {
  155. FloatCollector fCollector = (FloatCollector) device;
  156. if (!tops.contains(fCollector.getFCinfoName()))
  157. newTopic = fCollector.getFCinfoName();
  158. }
  159. /**
  160. * Check if BoolCollector & not subscribed
  161. */
  162. if (device instanceof BoolCollector) {
  163. BoolCollector bCollector = (BoolCollector) device;
  164. if (!tops.contains(bCollector.getBCinfoName()))
  165. newTopic = bCollector.getBCinfoName();
  166. }
  167. // Subscribe if no subscriptions so far or not subscribed to all,
  168. // with a probability of70%
  169. if (newTopic != null) {
  170. /**
  171. * Subscribe Request
  172. */
  173. returnPackets.add(new MQTT_packet(MQTT_packet.SUBSCRIBE, timestep, port, broker, "topic:" + newTopic));
  174. /**
  175. * Delay to the broker
  176. */
  177. long delayPortToBroker = port.getTransmissionDelayTo(broker);
  178. if (broker.getStatus() != Port.CLOSED && delayPortToBroker != Long.MAX_VALUE) {
  179. timestep += broker.getResponseTime();
  180. returnPackets.add(new MQTT_packet(MQTT_packet.SUBACK, timestep + delayPortToBroker, broker, port));
  181. tops.add(newTopic);
  182. }
  183. } else {
  184. /**
  185. * Check if some topics should be unsubscribed (e.g., one topic
  186. * name change)
  187. */
  188. newTopic = null;
  189. LinkedList<String> oldTopics = new LinkedList<String>(tops);
  190. if (!oldTopics.isEmpty()) {
  191. /**
  192. * Check if FloatCollector & not subscribed
  193. */
  194. if (device instanceof FloatCollector) {
  195. FloatCollector fCollector = (FloatCollector) device;
  196. oldTopics.remove(fCollector.getFCinfoName());
  197. }
  198. /**
  199. * Check if BoolCollector & not subscribed
  200. */
  201. if (device instanceof BoolCollector) {
  202. BoolCollector bCollector = (BoolCollector) device;
  203. oldTopics.remove(bCollector.getBCinfoName());
  204. }
  205. if (!oldTopics.isEmpty()) {
  206. newTopic = oldTopics.getFirst();
  207. /**
  208. * Send Unsubscribe Packet
  209. */
  210. returnPackets.add(
  211. new MQTT_packet(MQTT_packet.UNSUBSCRIBE, timestep, port, broker, "topic:" + newTopic));
  212. /**
  213. * Delay to the broker
  214. */
  215. long delayPortToBroker = port.getTransmissionDelayTo(broker);
  216. /**
  217. * Ackknowledgement
  218. */
  219. if (broker.getStatus() != Port.CLOSED && delayPortToBroker != Long.MAX_VALUE) {
  220. timestep += broker.getResponseTime();
  221. returnPackets.add(
  222. new MQTT_packet(MQTT_packet.UNSUBACK, timestep + delayPortToBroker, broker, port));
  223. tops.remove(newTopic);
  224. }
  225. }
  226. }
  227. }
  228. }
  229. if (pubs.contains(port) || pubSubs.contains(port) && Math.random() < 0.3) {
  230. /**
  231. * Topic which should be published to
  232. */
  233. String newTopic = null;
  234. /**
  235. * Value which should be published
  236. */
  237. String newValue = null;
  238. /**
  239. * Check if FloatSensor
  240. */
  241. if (device instanceof FloatSensor) {
  242. FloatSensor fSensor = (FloatSensor) device;
  243. newTopic = fSensor.getFSinfoName();
  244. newValue = "" + fSensor.getFSval();
  245. }
  246. /**
  247. * Check if BoolSensor - if also FloatSensor: 50% chance of
  248. * overriding
  249. */
  250. if (device instanceof BoolSensor) {
  251. BoolSensor bSensor = (BoolSensor) device;
  252. if (newTopic == null || new Random().nextBoolean()) {
  253. newTopic = bSensor.getBSinfoName();
  254. newValue = "" + bSensor.getBSval();
  255. }
  256. }
  257. if (newTopic != null) {
  258. /**
  259. * Message to be published
  260. */
  261. String msg = "Topic:" + newTopic + ":" + newValue;
  262. /**
  263. * Send Packet
  264. */
  265. returnPackets.add(new MQTT_packet(MQTT_packet.PUBLISH, timestep, port, broker, msg));
  266. /**
  267. * Delay to the broker
  268. */
  269. long delayPortToBroker = port.getTransmissionDelayTo(broker);
  270. // Publish to Subscribers
  271. if (broker.getStatus() != Port.CLOSED && delayPortToBroker != Long.MAX_VALUE) {
  272. /**
  273. * Response/Acknowledgement
  274. */
  275. timestep += broker.getResponseTime() + delayPortToBroker;
  276. returnPackets.add(new MQTT_packet(MQTT_packet.PUBACK, timestep, broker, port));
  277. for (Port p : subs) {
  278. /**
  279. * Skip unsubcribed ports
  280. */
  281. if (!subbedTopics.get(p).contains(newTopic))
  282. continue;
  283. /**
  284. * Delay broker to subscriber
  285. */
  286. long delayBrokerToSub = broker.getTransmissionDelayTo(p);
  287. timestep += broker.getResponseTime();
  288. returnPackets.add(new MQTT_packet(MQTT_packet.PUBLISH, timestep, broker, p, msg));
  289. if (p.getStatus() != Port.CLOSED && delayBrokerToSub != Long.MAX_VALUE) {
  290. returnPackets.add(
  291. new MQTT_packet(MQTT_packet.PUBACK, timestep + p.getResponseTime(), p, broker));
  292. // Update Collector
  293. if (device instanceof FloatSensor && p.getOwner() instanceof FloatCollector
  294. && newTopic.equals(((FloatSensor) device).getFSinfoName())) {
  295. /**
  296. * original Float Value -> if it might change during the two events
  297. */
  298. float oldValue = ((FloatSensor) device).getFSval();
  299. /**
  300. * Schedule Event to update the sensor
  301. * -> therefore multiple parallel interactions would be in the right order
  302. */
  303. SimulationManager.scheduleEvent(new AbstractEvent(timestep + p.getResponseTime()) {
  304. @Override
  305. public void simulateEvent(long time) {
  306. ((FloatCollector) p.getOwner()).setFCval(oldValue);
  307. }
  308. });
  309. } else if (device instanceof BoolSensor && p.getOwner() instanceof BoolCollector
  310. && newTopic.equals(((BoolSensor) device).getBSinfoName())) {
  311. /**
  312. * Original sensor value
  313. */
  314. boolean oldValue = ((BoolSensor) device).getBSval();
  315. /**
  316. * Schedule Event to update the sensor
  317. * -> therefore multiple parallel interactions would be in the right order
  318. */
  319. SimulationManager.scheduleEvent(new AbstractEvent(timestep + p.getResponseTime()) {
  320. @Override
  321. public void simulateEvent(long time) {
  322. ((BoolCollector) p.getOwner()).setBCval(oldValue);
  323. }
  324. });
  325. }
  326. }
  327. }
  328. for (Port p : pubSubs) {
  329. /**
  330. * Skip unsubscribed Ports
  331. */
  332. if (!subbedTopics.get(p).contains(newTopic) || p.getOwner() == null)
  333. continue;
  334. /**
  335. * Delay broker to subscriber
  336. */
  337. long delayBrokerToSub = broker.getTransmissionDelayTo(p);
  338. timestep += broker.getResponseTime();
  339. returnPackets.add(new MQTT_packet(MQTT_packet.PUBLISH, timestep, broker, p, msg));
  340. if (p.getStatus() != Port.CLOSED && delayBrokerToSub != Long.MAX_VALUE) {
  341. returnPackets.add(
  342. new MQTT_packet(MQTT_packet.PUBACK, timestep + p.getResponseTime(), p, broker));
  343. // Update Collector
  344. if (device instanceof FloatSensor && p.getOwner() instanceof FloatCollector
  345. && newTopic.equals(((FloatSensor) device).getFSinfoName())) {
  346. ((FloatCollector) p.getOwner()).setFCval(((FloatSensor) device).getFSval());
  347. } else if (device instanceof BoolSensor && p.getOwner() instanceof BoolCollector
  348. && newTopic.equals(((BoolSensor) device).getBSinfoName())) {
  349. ((BoolCollector) p.getOwner()).setBCval(((BoolSensor) device).getBSval());
  350. }
  351. }
  352. }
  353. }
  354. }
  355. }
  356. /**
  357. * Rare Ping request to broker
  358. */
  359. if (Math.random() < 0.05 && port != broker) {
  360. returnPackets.add(new MQTT_packet(MQTT_packet.PINGREQ, timestep, port, broker));
  361. /**
  362. * Delay broker to subscriber
  363. */
  364. long delayPortToBroker = port.getTransmissionDelayTo(broker);
  365. if (broker.getStatus() != Port.CLOSED && delayPortToBroker != Long.MAX_VALUE)
  366. returnPackets.add(new MQTT_packet(MQTT_packet.PINGRESP,
  367. timestep + delayPortToBroker + broker.getResponseTime(), broker, port));
  368. }
  369. return returnPackets;
  370. }
  371. @Override
  372. public int getNumberOfRoles() {
  373. return 4;
  374. }
  375. @Override
  376. public String[] getRoles() {
  377. // PublisherSubscriber is Publisher as well as Subscriber
  378. return new String[] { "Broker", "PublisherSubscriber", "Publisher", "Subscriber" };
  379. }
  380. @Override
  381. public Collection<Port> getDevicesWithRole(int role) {
  382. switch (role) {
  383. case 0:
  384. return new LinkedList<Port>(Arrays.asList(broker));
  385. case 1:
  386. return pubSubs;
  387. case 2:
  388. return pubs;
  389. case 3:
  390. return subs;
  391. default:
  392. return null;
  393. }
  394. }
  395. @Override
  396. public boolean addDeviceOfRole(Port device, int role) {
  397. /*
  398. * First device has to be the Broker
  399. */
  400. if (broker == null) {
  401. if (role == 0) {
  402. broker = device;
  403. updateBrokerOnDeletedConnections(null);
  404. return true;
  405. } else {
  406. return false;
  407. }
  408. }
  409. switch (role) {
  410. case 0:
  411. // Just one broker allowed.
  412. return false;
  413. case 1:
  414. pubSubs.add(device);
  415. subbedTopics.putIfAbsent(device, new LinkedList<String>());
  416. break;
  417. case 2:
  418. pubs.add(device);
  419. break;
  420. case 3:
  421. subs.add(device);
  422. subbedTopics.putIfAbsent(device, new LinkedList<String>());
  423. break;
  424. default:
  425. // invalid role
  426. return false;
  427. }
  428. // Create packets for the connecting Client
  429. currentPackets.add(new MQTT_packet(MQTT_packet.CONNECT, currentPackets.size() / 2, device, broker));
  430. currentPackets.add(new MQTT_packet(MQTT_packet.CONNACK, currentPackets.size() / 2 + 1, broker, device));
  431. return true;
  432. }
  433. @Override
  434. public void removeDevice(Port device) {
  435. /**
  436. * true if the device was removed
  437. */
  438. boolean removedDevice = false;
  439. if (broker == device) {
  440. broker = null;
  441. deletedConnectionLinks
  442. .add(new Pair<Port, Port>(new Port(device.getOwner(), (short) -1), deletedBrokerPort));
  443. updateBrokerOnDeletedConnections(device);
  444. }
  445. removedDevice |= pubSubs.remove(device);
  446. removedDevice |= subs.remove(device);
  447. removedDevice |= pubs.remove(device);
  448. // Remove Port from topics and clear its list
  449. LinkedList<String> oldTopics = subbedTopics.remove(device);
  450. if (oldTopics != null)
  451. oldTopics.clear();
  452. if (removedDevice) {
  453. if (broker == null) {
  454. deletedConnectionLinks.add(new Pair<Port, Port>(deletedBrokerPort, device));
  455. } else {
  456. deletedConnectionLinks.add(new Pair<Port, Port>(broker, device));
  457. // If not the broker and device was removed -> disconnect
  458. currentPackets.add(new MQTT_packet(MQTT_packet.DISCONNECT, currentPackets.size() / 2, device, broker));
  459. }
  460. }
  461. }
  462. @Override
  463. public String getName() {
  464. return "MQTT";
  465. }
  466. @Override
  467. public int getRoleOfDevice(Port device) {
  468. if (device == null)
  469. return -1;
  470. if (device == broker)
  471. return 0;
  472. if (pubSubs.contains(device))
  473. return 1;
  474. if (pubs.contains(device))
  475. return 2;
  476. if (subs.contains(device))
  477. return 3;
  478. return -1;
  479. }
  480. @Override
  481. public Collection<Port> getDevices() {
  482. LinkedList<Port> returnDevices = new LinkedList<Port>();
  483. if (broker != null)
  484. returnDevices.add(broker);
  485. returnDevices.addAll(pubSubs);
  486. returnDevices.addAll(pubs);
  487. returnDevices.addAll(subs);
  488. return returnDevices;
  489. }
  490. @Override
  491. public byte getTopologyType() {
  492. return STAR;
  493. }
  494. @Override
  495. public Collection<Pair<Port, Port>> getTopology() {
  496. LinkedList<Pair<Port, Port>> topology = new LinkedList<Pair<Port, Port>>();
  497. Port center = broker;
  498. calcDeletedBrokerPosition();
  499. if (broker == null)
  500. center = new Port(deletedBroker, (short) -1);
  501. for (Port p : pubSubs)
  502. topology.add(new Pair<Port, Port>(center, p));
  503. for (Port p : pubs)
  504. topology.add(new Pair<Port, Port>(center, p));
  505. for (Port p : subs)
  506. topology.add(new Pair<Port, Port>(center, p));
  507. return topology;
  508. }
  509. @Override
  510. public Collection<Pair<Port, Port>> getDeletedTopology() {
  511. calcDeletedBrokerPosition();
  512. return deletedConnectionLinks;
  513. }
  514. /**
  515. * Calculate and update the position of the deleted Broker
  516. */
  517. private void calcDeletedBrokerPosition() {
  518. if (broker == null) {
  519. int x = 0, y = 0, noP = 0;
  520. for (Port p : getDevices()) {
  521. if (p != null && p.getOwner() != null) {
  522. x += p.getOwner().getX();
  523. y += p.getOwner().getY();
  524. noP++;
  525. }
  526. }
  527. if (noP == 0)
  528. return;
  529. deletedBroker.setX(((int) (x * 1.0) / noP));
  530. deletedBroker.setY(((int) (y * 1.0) / noP));
  531. }
  532. }
  533. /**
  534. * Update the broker port on the deleted Connections
  535. *
  536. * @param device
  537. * old broker
  538. */
  539. private void updateBrokerOnDeletedConnections(Port device) {
  540. for (Pair<Port, Port> p : deletedConnectionLinks) {
  541. if (broker == null) {
  542. if (p.getLeft() == device)
  543. p.setLeft(deletedBrokerPort);
  544. if (p.getRight() == device)
  545. p.setRight(deletedBrokerPort);
  546. } else {
  547. if (p.getLeft() == deletedBrokerPort)
  548. p.setLeft(broker);
  549. if (p.getRight() == deletedBrokerPort)
  550. p.setRight(broker);
  551. }
  552. }
  553. }
  554. }