MQTT_protocol.java 19 KB


  1. package de.tu_darmstadt.tk.SmartHomeNetworkSim.core.protocols;
  2. import java.util.Arrays;
  3. import java.util.Collection;
  4. import java.util.HashMap;
  5. import java.util.Iterator;
  6. import java.util.LinkedList;
  7. import java.util.Random;
  8. import org.hamcrest.core.IsAnything;
  9. import de.tu_darmstadt.tk.SmartHomeNetworkSim.core.Packet;
  10. import de.tu_darmstadt.tk.SmartHomeNetworkSim.core.Port;
  11. import de.tu_darmstadt.tk.SmartHomeNetworkSim.core.Protocol;
  12. import de.tu_darmstadt.tk.SmartHomeNetworkSim.core.SimulationManager;
  13. import de.tu_darmstadt.tk.SmartHomeNetworkSim.core.SmartDevice;
  14. import de.tu_darmstadt.tk.SmartHomeNetworkSim.core.devices.BoolCollector;
  15. import de.tu_darmstadt.tk.SmartHomeNetworkSim.core.devices.BoolSensor;
  16. import de.tu_darmstadt.tk.SmartHomeNetworkSim.core.devices.FloatCollector;
  17. import de.tu_darmstadt.tk.SmartHomeNetworkSim.core.devices.FloatSensor;
  18. import de.tu_darmstadt.tk.SmartHomeNetworkSim.core.protocols.packets.MQTT_packet;
  19. import de.tu_darmstadt.tk.SmartHomeNetworkSim.core.protocols.packets.MQTTpublishPacket;
  20. import de.tu_darmstadt.tk.SmartHomeNetworkSim.core.scheduler.AbstractEvent;
  21. import de.tu_darmstadt.tk.SmartHomeNetworkSim.core.util.Pair;
  22. /**
  23. * Implementation of the MQTT Protocol to generate packets for the simulation
  24. *
  25. *
  26. * @author Andreas T. Meyer-Berg
  27. */
  28. public class MQTT_protocol implements Protocol {
  29. /**
  30. * Broker which collects and distributes messages
  31. */
  32. private Port broker;
  33. /**
  34. * Publishers like sensors, which publish data
  35. */
  36. private LinkedList<Port> pubs;
  37. /**
  38. * Subscriber which subscribe to different Topics
  39. */
  40. private LinkedList<Port> subs;
  41. /**
  42. * Topics subscribed by each Port
  43. */
  44. private HashMap<Port, LinkedList<String>> subbedTopics = new HashMap<Port, LinkedList<String>>();
  45. /**
  46. * Devices that are Publisher and Subscriber and therefore send and receive
  47. * messages
  48. */
  49. private LinkedList<Port> pubSubs;
  50. /**
  51. * Packets which are currently being generated or were generated after the
  52. * last generatePackets Call
  53. */
  54. private LinkedList<Packet> currentPackets = new LinkedList<Packet>();
  55. // For Visualization in case of deleting the broker
  56. private LinkedList<Pair<Port, Port>> deletedConnectionLinks = new LinkedList<Pair<Port, Port>>();
  57. private SmartDevice deletedBroker = new SmartDevice("DeletedBroker");
  58. private Port deletedBrokerPort = new Port(deletedBroker, (short) -1);
  59. /**
  60. * Topics of the MQTT Broker
  61. */
  62. // private LinkedList<String> topics = new LinkedList<String>();
  63. /**
  64. * Creates a new MQTT Protocol
  65. */
  66. public MQTT_protocol() {
  67. this.broker = null;
  68. initialize();
  69. }
  70. /**
  71. * Creates a new MQTT Protocol
  72. *
  73. * @param broker
  74. * broker of the protocol
  75. */
  76. public MQTT_protocol(Port broker) {
  77. this.broker = broker;
  78. initialize();
  79. }
  80. /**
  81. * Initializes the different fields
  82. */
  83. private void initialize() {
  84. subs = new LinkedList<Port>();
  85. pubs = new LinkedList<Port>();
  86. pubSubs = new LinkedList<Port>();
  87. }
  88. @Override
  89. public Collection<Packet> generateNextPackets(Port port, long timestep, boolean packetLost) {
  90. /**
  91. * Packets which will be generated
  92. */
  93. LinkedList<Packet> returnPackets = new LinkedList<Packet>();
  94. // Update all Timestamps of previous created Packets
  95. for (Packet p : currentPackets) {
  96. p.setTimestamp(p.getTimestamp() + timestep);
  97. }
  98. // Add these packets to the return
  99. returnPackets.addAll(currentPackets);
  100. // remove packets from the old list
  101. currentPackets.clear();
  102. // Clear deleted connections
  103. deletedConnectionLinks.clear();
  104. // Return termination packets
  105. if (port == null)
  106. return returnPackets;
  107. SmartDevice device = port.getOwner();
  108. if (device == null)
  109. return returnPackets;
  110. /**
  111. * Update the lastTime the port was triggered
  112. */
  113. port.setLastTrigger(timestep);
  114. /**
  115. * if port null, skip this step
  116. */
  117. if (broker == null)
  118. return returnPackets;
  119. /**
  120. * Generate new Packets regarding to their class
  121. */
  122. if (port == broker) {
  123. // Broker should not send new packages, without new messages
  124. // But could perform some Ping request to random participants
  125. Collection<Port> devices = getDevices();
  126. devices.remove(port);
  127. Port dest = null;
  128. Iterator<Port> it = devices.iterator();
  129. for (int i = 0; i < (Math.random() * devices.size()) && it.hasNext(); i++) {
  130. dest = it.next();
  131. }
  132. if (dest != null) {
  133. returnPackets.add(new MQTT_packet(MQTT_packet.PINGREQ, timestep, port, dest));
  134. /**
  135. * Delay to the destination
  136. */
  137. long delayBrokerDest = broker.getTransmissionDelayTo(dest);
  138. if (dest.getStatus() != Port.CLOSED && delayBrokerDest != Long.MAX_VALUE)
  139. returnPackets.add(new MQTT_packet(MQTT_packet.PINGRESP,
  140. timestep + delayBrokerDest + dest.getJitter() + dest.getResponseTime(), dest, port));
  141. }
  142. }
  143. if (subs.contains(port) || pubSubs.contains(port)) {
  144. // Subs can either subscribe to topics or unsubscribe
  145. /**
  146. * Topics, the SmartDevice is subscribed to
  147. */
  148. LinkedList<String> tops = subbedTopics.get(port);
  149. /**
  150. * Topic which should be subscribed to
  151. */
  152. String newTopic = null;
  153. /**
  154. * Check if FloatCollector & not subscribed
  155. */
  156. if (device instanceof FloatCollector) {
  157. FloatCollector fCollector = (FloatCollector) device;
  158. if (!tops.contains(fCollector.getFCinfoName()))
  159. newTopic = fCollector.getFCinfoName();
  160. }
  161. /**
  162. * Check if BoolCollector & not subscribed
  163. */
  164. if (device instanceof BoolCollector) {
  165. BoolCollector bCollector = (BoolCollector) device;
  166. if (!tops.contains(bCollector.getBCinfoName()))
  167. newTopic = bCollector.getBCinfoName();
  168. }
  169. // Subscribe if no subscriptions so far or not subscribed to all,
  170. // with a probability of70%
  171. if (newTopic != null) {
  172. /**
  173. * Subscribe Request
  174. */
  175. returnPackets.add(new MQTT_packet(MQTT_packet.SUBSCRIBE, timestep, port, broker, "topic:" + newTopic));
  176. /**
  177. * Delay to the broker
  178. */
  179. long delayPortToBroker = port.getTransmissionDelayTo(broker);
  180. if (broker.getStatus() != Port.CLOSED && delayPortToBroker != Long.MAX_VALUE) {
  181. timestep += broker.getResponseTime();
  182. returnPackets.add(new MQTT_packet(MQTT_packet.SUBACK, timestep + delayPortToBroker, broker, port));
  183. tops.add(newTopic);
  184. }
  185. } else {
  186. /**
  187. * Check if some topics should be unsubscribed (e.g., one topic
  188. * name change)
  189. */
  190. newTopic = null;
  191. LinkedList<String> oldTopics = new LinkedList<String>(tops);
  192. if (!oldTopics.isEmpty()) {
  193. /**
  194. * Check if FloatCollector & not subscribed
  195. */
  196. if (device instanceof FloatCollector) {
  197. FloatCollector fCollector = (FloatCollector) device;
  198. oldTopics.remove(fCollector.getFCinfoName());
  199. }
  200. /**
  201. * Check if BoolCollector & not subscribed
  202. */
  203. if (device instanceof BoolCollector) {
  204. BoolCollector bCollector = (BoolCollector) device;
  205. oldTopics.remove(bCollector.getBCinfoName());
  206. }
  207. if (!oldTopics.isEmpty()) {
  208. newTopic = oldTopics.getFirst();
  209. /**
  210. * Send Unsubscribe Packet
  211. */
  212. returnPackets.add(
  213. new MQTT_packet(MQTT_packet.UNSUBSCRIBE, timestep, port, broker, "topic:" + newTopic));
  214. /**
  215. * Delay to the broker
  216. */
  217. long delayPortToBroker = port.getTransmissionDelayTo(broker);
  218. /**
  219. * Ackknowledgement
  220. */
  221. if (broker.getStatus() != Port.CLOSED && delayPortToBroker != Long.MAX_VALUE) {
  222. timestep += broker.getResponseTime();
  223. returnPackets.add(
  224. new MQTT_packet(MQTT_packet.UNSUBACK, timestep + delayPortToBroker, broker, port));
  225. tops.remove(newTopic);
  226. }
  227. }
  228. }
  229. }
  230. }
  231. if (pubs.contains(port) || pubSubs.contains(port) && Math.random() < 0.3) {
  232. /**
  233. * Topic which should be published to
  234. */
  235. String newTopic = null;
  236. /**
  237. * Value which should be published
  238. */
  239. String newValue = null;
  240. boolean isBoolean = false;
  241. /**
  242. * True if value anomaly (value = -1)
  243. */
  244. boolean valueAnomaly = (device.getLabel()==(short)-1);
  245. /**
  246. * Check if FloatSensor
  247. */
  248. if (device instanceof FloatSensor) {
  249. FloatSensor fSensor = (FloatSensor) device;
  250. newTopic = fSensor.getFSinfoName();
  251. newValue = "" + fSensor.getFSval();
  252. }
  253. /**
  254. * Check if BoolSensor - if also FloatSensor: 50% chance of
  255. * overriding
  256. */
  257. if (device instanceof BoolSensor) {
  258. BoolSensor bSensor = (BoolSensor) device;
  259. if (newTopic == null || new Random().nextBoolean()) {
  260. newTopic = bSensor.getBSinfoName();
  261. newValue = "" + bSensor.getBSval();
  262. isBoolean = true;
  263. }
  264. }
  265. if (newTopic != null) {
  266. /**
  267. * Packet for publishing the new value
  268. */
  269. Packet pubPacket = null;
  270. if(isBoolean) {
  271. pubPacket = new MQTTpublishPacket(timestep, port, broker, newTopic, Boolean.parseBoolean(newValue));
  272. }else {
  273. pubPacket = new MQTTpublishPacket(timestep, port, broker, newTopic, Float.parseFloat(newValue));
  274. }
  275. if(valueAnomaly) {
  276. pubPacket.setLabel((short) 1);
  277. }
  278. /**
  279. * Send Packet
  280. */
  281. returnPackets.add(pubPacket);
  282. /**
  283. * Delay to the broker
  284. */
  285. long delayPortToBroker = port.getTransmissionDelayTo(broker);
  286. // Publish to Subscribers
  287. if (broker.getStatus() != Port.CLOSED && delayPortToBroker != Long.MAX_VALUE) {
  288. /**
  289. * Response/Acknowledgement
  290. */
  291. timestep += broker.getResponseTime() + delayPortToBroker;
  292. returnPackets.add(new MQTT_packet(MQTT_packet.PUBACK, timestep, broker, port));
  293. for (Port p : subs) {
  294. /**
  295. * Skip unsubcribed ports
  296. */
  297. if (!subbedTopics.get(p).contains(newTopic))
  298. continue;
  299. /**
  300. * Delay broker to subscriber
  301. */
  302. long delayBrokerToSub = broker.getTransmissionDelayTo(p);
  303. timestep += broker.getResponseTime();
  304. /**
  305. * Packet Broker -> Subscriber
  306. */
  307. if(isBoolean) {
  308. pubPacket = new MQTTpublishPacket(timestep, broker, p, newTopic, Boolean.parseBoolean(newValue));
  309. }else {
  310. pubPacket = new MQTTpublishPacket(timestep, broker, p, newTopic, Float.parseFloat(newValue));
  311. }
  312. if(valueAnomaly) {
  313. pubPacket.setLabel((short) 1);
  314. }
  315. returnPackets.add(pubPacket);
  316. if (p.getStatus() != Port.CLOSED && delayBrokerToSub != Long.MAX_VALUE) {
  317. returnPackets.add(
  318. new MQTT_packet(MQTT_packet.PUBACK, timestep + p.getResponseTime(), p, broker));
  319. // Update Collector
  320. if (device instanceof FloatSensor && p.getOwner() instanceof FloatCollector
  321. && newTopic.equals(((FloatSensor) device).getFSinfoName())) {
  322. /**
  323. * original Float Value -> if it might change during the two events
  324. */
  325. float oldValue = ((FloatSensor) device).getFSval();
  326. /**
  327. * Schedule Event to update the sensor
  328. * -> therefore multiple parallel interactions would be in the right order
  329. */
  330. SimulationManager.scheduleEvent(new AbstractEvent(timestep + p.getResponseTime()) {
  331. @Override
  332. public void simulateEvent(long time) {
  333. ((FloatCollector) p.getOwner()).setFCval(oldValue);
  334. }
  335. });
  336. } else if (device instanceof BoolSensor && p.getOwner() instanceof BoolCollector
  337. && newTopic.equals(((BoolSensor) device).getBSinfoName())) {
  338. /**
  339. * Original sensor value
  340. */
  341. boolean oldValue = ((BoolSensor) device).getBSval();
  342. /**
  343. * Schedule Event to update the sensor
  344. * -> therefore multiple parallel interactions would be in the right order
  345. */
  346. SimulationManager.scheduleEvent(new AbstractEvent(timestep + p.getResponseTime()) {
  347. @Override
  348. public void simulateEvent(long time) {
  349. ((BoolCollector) p.getOwner()).setBCval(oldValue);
  350. }
  351. });
  352. }
  353. }
  354. }
  355. for (Port p : pubSubs) {
  356. /**
  357. * Skip unsubscribed Ports
  358. */
  359. if (!subbedTopics.get(p).contains(newTopic) || p.getOwner() == null)
  360. continue;
  361. /**
  362. * Delay broker to subscriber
  363. */
  364. long delayBrokerToSub = broker.getTransmissionDelayTo(p);
  365. timestep += broker.getResponseTime();
  366. if(isBoolean) {
  367. pubPacket = new MQTTpublishPacket(timestep, broker, p, newTopic, Boolean.parseBoolean(newValue));
  368. }else {
  369. pubPacket = new MQTTpublishPacket(timestep, broker, p, newTopic, Float.parseFloat(newValue));
  370. }
  371. if(valueAnomaly) {
  372. pubPacket.setLabel((short) 1);
  373. }
  374. returnPackets.add(pubPacket);
  375. if (p.getStatus() != Port.CLOSED && delayBrokerToSub != Long.MAX_VALUE) {
  376. returnPackets.add(
  377. new MQTT_packet(MQTT_packet.PUBACK, timestep + p.getResponseTime(), p, broker));
  378. // Update Collector
  379. if (device instanceof FloatSensor && p.getOwner() instanceof FloatCollector
  380. && newTopic.equals(((FloatSensor) device).getFSinfoName())) {
  381. ((FloatCollector) p.getOwner()).setFCval(((FloatSensor) device).getFSval());
  382. } else if (device instanceof BoolSensor && p.getOwner() instanceof BoolCollector
  383. && newTopic.equals(((BoolSensor) device).getBSinfoName())) {
  384. ((BoolCollector) p.getOwner()).setBCval(((BoolSensor) device).getBSval());
  385. }
  386. }
  387. }
  388. }
  389. }
  390. }
  391. /**
  392. * Rare Ping request to broker
  393. */
  394. if (Math.random() < 0.05 && port != broker) {
  395. returnPackets.add(new MQTT_packet(MQTT_packet.PINGREQ, timestep, port, broker));
  396. /**
  397. * Delay broker to subscriber
  398. */
  399. long delayPortToBroker = port.getTransmissionDelayTo(broker);
  400. if (broker.getStatus() != Port.CLOSED && delayPortToBroker != Long.MAX_VALUE)
  401. returnPackets.add(new MQTT_packet(MQTT_packet.PINGRESP,
  402. timestep + delayPortToBroker + broker.getResponseTime(), broker, port));
  403. }
  404. return returnPackets;
  405. }
  406. @Override
  407. public int getNumberOfRoles() {
  408. return 4;
  409. }
  410. @Override
  411. public String[] getRoles() {
  412. // PublisherSubscriber is Publisher as well as Subscriber
  413. return new String[] { "Broker", "PublisherSubscriber", "Publisher", "Subscriber" };
  414. }
  415. @Override
  416. public Collection<Port> getDevicesWithRole(int role) {
  417. switch (role) {
  418. case 0:
  419. return new LinkedList<Port>(Arrays.asList(broker));
  420. case 1:
  421. return pubSubs;
  422. case 2:
  423. return pubs;
  424. case 3:
  425. return subs;
  426. default:
  427. return null;
  428. }
  429. }
  430. @Override
  431. public boolean addDeviceOfRole(Port device, int role) {
  432. /*
  433. * First device has to be the Broker
  434. */
  435. if (broker == null) {
  436. if (role == 0) {
  437. broker = device;
  438. updateBrokerOnDeletedConnections(null);
  439. return true;
  440. } else {
  441. return false;
  442. }
  443. }
  444. switch (role) {
  445. case 0:
  446. // Just one broker allowed.
  447. return false;
  448. case 1:
  449. pubSubs.add(device);
  450. subbedTopics.putIfAbsent(device, new LinkedList<String>());
  451. break;
  452. case 2:
  453. pubs.add(device);
  454. break;
  455. case 3:
  456. subs.add(device);
  457. subbedTopics.putIfAbsent(device, new LinkedList<String>());
  458. break;
  459. default:
  460. // invalid role
  461. return false;
  462. }
  463. // Create packets for the connecting Client
  464. currentPackets.add(new MQTT_packet(MQTT_packet.CONNECT, currentPackets.size() / 2, device, broker));
  465. currentPackets.add(new MQTT_packet(MQTT_packet.CONNACK, currentPackets.size() / 2 + 1, broker, device));
  466. return true;
  467. }
  468. @Override
  469. public void removeDevice(Port device) {
  470. /**
  471. * true if the device was removed
  472. */
  473. boolean removedDevice = false;
  474. if (broker == device) {
  475. broker = null;
  476. deletedConnectionLinks
  477. .add(new Pair<Port, Port>(new Port(device.getOwner(), (short) -1), deletedBrokerPort));
  478. updateBrokerOnDeletedConnections(device);
  479. }
  480. removedDevice |= pubSubs.remove(device);
  481. removedDevice |= subs.remove(device);
  482. removedDevice |= pubs.remove(device);
  483. // Remove Port from topics and clear its list
  484. LinkedList<String> oldTopics = subbedTopics.remove(device);
  485. if (oldTopics != null)
  486. oldTopics.clear();
  487. if (removedDevice) {
  488. if (broker == null) {
  489. deletedConnectionLinks.add(new Pair<Port, Port>(deletedBrokerPort, device));
  490. } else {
  491. deletedConnectionLinks.add(new Pair<Port, Port>(broker, device));
  492. // If not the broker and device was removed -> disconnect
  493. currentPackets.add(new MQTT_packet(MQTT_packet.DISCONNECT, currentPackets.size() / 2, device, broker));
  494. }
  495. }
  496. }
  497. @Override
  498. public String getName() {
  499. return "MQTT";
  500. }
  501. @Override
  502. public int getRoleOfDevice(Port device) {
  503. if (device == null)
  504. return -1;
  505. if (device == broker)
  506. return 0;
  507. if (pubSubs.contains(device))
  508. return 1;
  509. if (pubs.contains(device))
  510. return 2;
  511. if (subs.contains(device))
  512. return 3;
  513. return -1;
  514. }
  515. @Override
  516. public Collection<Port> getDevices() {
  517. LinkedList<Port> returnDevices = new LinkedList<Port>();
  518. if (broker != null)
  519. returnDevices.add(broker);
  520. returnDevices.addAll(pubSubs);
  521. returnDevices.addAll(pubs);
  522. returnDevices.addAll(subs);
  523. return returnDevices;
  524. }
  525. @Override
  526. public byte getTopologyType() {
  527. return STAR;
  528. }
  529. @Override
  530. public Collection<Pair<Port, Port>> getTopology() {
  531. LinkedList<Pair<Port, Port>> topology = new LinkedList<Pair<Port, Port>>();
  532. Port center = broker;
  533. calcDeletedBrokerPosition();
  534. if (broker == null)
  535. center = new Port(deletedBroker, (short) -1);
  536. for (Port p : pubSubs)
  537. topology.add(new Pair<Port, Port>(center, p));
  538. for (Port p : pubs)
  539. topology.add(new Pair<Port, Port>(center, p));
  540. for (Port p : subs)
  541. topology.add(new Pair<Port, Port>(center, p));
  542. return topology;
  543. }
  544. @Override
  545. public Collection<Pair<Port, Port>> getDeletedTopology() {
  546. calcDeletedBrokerPosition();
  547. return deletedConnectionLinks;
  548. }
  549. /**
  550. * Calculate and update the position of the deleted Broker
  551. */
  552. private void calcDeletedBrokerPosition() {
  553. if (broker == null) {
  554. int x = 0, y = 0, noP = 0;
  555. for (Port p : getDevices()) {
  556. if (p != null && p.getOwner() != null) {
  557. x += p.getOwner().getX();
  558. y += p.getOwner().getY();
  559. noP++;
  560. }
  561. }
  562. if (noP == 0)
  563. return;
  564. deletedBroker.setX(((int) (x * 1.0) / noP));
  565. deletedBroker.setY(((int) (y * 1.0) / noP));
  566. }
  567. }
  568. /**
  569. * Update the broker port on the deleted Connections
  570. *
  571. * @param device
  572. * old broker
  573. */
  574. private void updateBrokerOnDeletedConnections(Port device) {
  575. for (Pair<Port, Port> p : deletedConnectionLinks) {
  576. if (broker == null) {
  577. if (p.getLeft() == device)
  578. p.setLeft(deletedBrokerPort);
  579. if (p.getRight() == device)
  580. p.setRight(deletedBrokerPort);
  581. } else {
  582. if (p.getLeft() == deletedBrokerPort)
  583. p.setLeft(broker);
  584. if (p.getRight() == deletedBrokerPort)
  585. p.setRight(broker);
  586. }
  587. }
  588. }
  589. }