// MQTT_protocol.java
  package de.tu_darmstadt.tk.SmartHomeNetworkSim.core.protocols;

  import java.util.Arrays;
  import java.util.Collection;
  import java.util.HashMap;
  import java.util.Iterator;
  import java.util.LinkedList;
  import java.util.List;
  import java.util.Map;
  import java.util.Random;

  import de.tu_darmstadt.tk.SmartHomeNetworkSim.core.Packet;
  import de.tu_darmstadt.tk.SmartHomeNetworkSim.core.Port;
  import de.tu_darmstadt.tk.SmartHomeNetworkSim.core.Protocol;
  import de.tu_darmstadt.tk.SmartHomeNetworkSim.core.SmartDevice;
  import de.tu_darmstadt.tk.SmartHomeNetworkSim.core.devices.BoolCollector;
  import de.tu_darmstadt.tk.SmartHomeNetworkSim.core.devices.BoolSensor;
  import de.tu_darmstadt.tk.SmartHomeNetworkSim.core.devices.FloatCollector;
  import de.tu_darmstadt.tk.SmartHomeNetworkSim.core.devices.FloatSensor;
  import de.tu_darmstadt.tk.SmartHomeNetworkSim.core.protocols.packets.MQTT_packet;
  import de.tu_darmstadt.tk.SmartHomeNetworkSim.core.util.Pair;
  18. /**
  19. * Implementation of the MQTT Protocol to generate packets for the simulation
  20. *
  21. *
  22. * @author Andreas T. Meyer-Berg
  23. */
  24. public class MQTT_protocol implements Protocol {
  25. /**
  26. * Broker which collects and distributes messages
  27. */
  28. private Port broker;
  29. /**
  30. * Publishers like sensors, which publish data
  31. */
  32. private LinkedList<Port> pubs;
  33. /**
  34. * Subscriber which subscribe to different Topics
  35. */
  36. private LinkedList<Port> subs;
  37. /**
  38. * Topics subscribed by each Port
  39. */
  40. private HashMap<Port, LinkedList<String>> subbedTopics = new HashMap<Port, LinkedList<String>>();
  41. /**
  42. * Devices that are Publisher and Subscriber and therefore send and receive
  43. * messages
  44. */
  45. private LinkedList<Port> pubSubs;
  46. /**
  47. * Packets which are currently being generated or were generated after the
  48. * last generatePackets Call
  49. */
  50. private LinkedList<Packet> currentPackets = new LinkedList<Packet>();
  51. // For Visualization in case of deleting the broker
  52. private LinkedList<Pair<Port, Port>> deletedConnectionLinks = new LinkedList<Pair<Port, Port>>();
  53. private SmartDevice deletedBroker = new SmartDevice("DeletedBroker");
  54. private Port deletedBrokerPort = new Port(deletedBroker, (short) -1);
  55. /**
  56. * Topics of the MQTT Broker
  57. */
  58. // private LinkedList<String> topics = new LinkedList<String>();
  59. /**
  60. * Creates a new MQTT Protocol
  61. */
  62. public MQTT_protocol() {
  63. this.broker = null;
  64. initialize();
  65. }
  66. /**
  67. * Creates a new MQTT Protocol
  68. *
  69. * @param broker
  70. * broker of the protocol
  71. */
  72. public MQTT_protocol(Port broker) {
  73. this.broker = broker;
  74. initialize();
  75. }
  76. /**
  77. * Initializes the different fields
  78. */
  79. private void initialize() {
  80. subs = new LinkedList<Port>();
  81. pubs = new LinkedList<Port>();
  82. pubSubs = new LinkedList<Port>();
  83. }
  84. @Override
  85. public Collection<Packet> generateNextPackets(Port port, long timestep, boolean packetLost) {
  86. /**
  87. * Packets which will be generated
  88. */
  89. LinkedList<Packet> returnPackets = new LinkedList<Packet>();
  90. // Update all Timestamps of previous created Packets
  91. for (Packet p : currentPackets) {
  92. p.setTimestamp(p.getTimestamp() + timestep);
  93. }
  94. // Add these packets to the return
  95. returnPackets.addAll(currentPackets);
  96. // remove packets from the old list
  97. currentPackets.clear();
  98. // Clear deleted connections
  99. deletedConnectionLinks.clear();
  100. // Return termination packets
  101. if (port == null)
  102. return returnPackets;
  103. SmartDevice device = port.getOwner();
  104. if (device == null)
  105. return returnPackets;
  106. /**
  107. * Update the lastTime the port was triggered
  108. */
  109. port.setLastTrigger(timestep);
  110. /**
  111. * if port null, skip this step
  112. */
  113. if (broker == null)
  114. return returnPackets;
  115. /**
  116. * Generate new Packets regarding to their class
  117. */
  118. if (port == broker) {
  119. // Broker should not send new packages, without new messages
  120. // But could perform some Ping request to random participants
  121. Collection<Port> devices = getDevices();
  122. devices.remove(port);
  123. Port dest = null;
  124. Iterator<Port> it = devices.iterator();
  125. for (int i = 0; i < (Math.random() * devices.size()) && it.hasNext(); i++) {
  126. dest = it.next();
  127. }
  128. if (dest != null) {
  129. returnPackets.add(new MQTT_packet(MQTT_packet.PINGREQ, timestep, port, dest));
  130. /**
  131. * Delay to the destination
  132. */
  133. long delayBrokerDest = broker.getTransmissionDelayTo(dest);
  134. if (dest.getStatus() != Port.CLOSED && delayBrokerDest != Long.MAX_VALUE)
  135. returnPackets.add(new MQTT_packet(MQTT_packet.PINGRESP,
  136. timestep + delayBrokerDest + dest.getJitter() + dest.getResponseTime(), dest, port));
  137. }
  138. }
  139. if (subs.contains(port) || pubSubs.contains(port)) {
  140. // Subs can either subscribe to topics or unsubscribe
  141. /**
  142. * Topics, the SmartDevice is subscribed to
  143. */
  144. LinkedList<String> tops = subbedTopics.get(port);
  145. /**
  146. * Topic which should be subscribed to
  147. */
  148. String newTopic = null;
  149. /**
  150. * Check if FloatCollector & not subscribed
  151. */
  152. if (device instanceof FloatCollector) {
  153. FloatCollector fCollector = (FloatCollector) device;
  154. if (!tops.contains(fCollector.getFCinfoName()))
  155. newTopic = fCollector.getFCinfoName();
  156. }
  157. /**
  158. * Check if BoolCollector & not subscribed
  159. */
  160. if (device instanceof BoolCollector) {
  161. BoolCollector bCollector = (BoolCollector) device;
  162. if (!tops.contains(bCollector.getBCinfoName()))
  163. newTopic = bCollector.getBCinfoName();
  164. }
  165. // Subscribe if no subscriptions so far or not subscribed to all,
  166. // with a probability of70%
  167. if (newTopic != null) {
  168. /**
  169. * Subscribe Request
  170. */
  171. returnPackets.add(new MQTT_packet(MQTT_packet.SUBSCRIBE, timestep, port, broker, "topic:" + newTopic));
  172. /**
  173. * Delay to the broker
  174. */
  175. long delayPortToBroker = port.getTransmissionDelayTo(broker);
  176. if (broker.getStatus() != Port.CLOSED && delayPortToBroker != Long.MAX_VALUE) {
  177. timestep += broker.getResponseTime();
  178. returnPackets.add(new MQTT_packet(MQTT_packet.SUBACK, timestep + delayPortToBroker, broker, port));
  179. tops.add(newTopic);
  180. }
  181. } else {
  182. /**
  183. * Check if some topics should be unsubscribed (e.g., one topic
  184. * name change)
  185. */
  186. newTopic = null;
  187. LinkedList<String> oldTopics = new LinkedList<String>(tops);
  188. if (!oldTopics.isEmpty()) {
  189. /**
  190. * Check if FloatCollector & not subscribed
  191. */
  192. if (device instanceof FloatCollector) {
  193. FloatCollector fCollector = (FloatCollector) device;
  194. oldTopics.remove(fCollector.getFCinfoName());
  195. }
  196. /**
  197. * Check if BoolCollector & not subscribed
  198. */
  199. if (device instanceof BoolCollector) {
  200. BoolCollector bCollector = (BoolCollector) device;
  201. oldTopics.remove(bCollector.getBCinfoName());
  202. }
  203. if (!oldTopics.isEmpty()) {
  204. newTopic = oldTopics.getFirst();
  205. /**
  206. * Send Unsubscribe Packet
  207. */
  208. returnPackets.add(
  209. new MQTT_packet(MQTT_packet.UNSUBSCRIBE, timestep, port, broker, "topic:" + newTopic));
  210. /**
  211. * Delay to the broker
  212. */
  213. long delayPortToBroker = port.getTransmissionDelayTo(broker);
  214. /**
  215. * Ackknowledgement
  216. */
  217. if (broker.getStatus() != Port.CLOSED && delayPortToBroker != Long.MAX_VALUE) {
  218. timestep += broker.getResponseTime();
  219. returnPackets.add(
  220. new MQTT_packet(MQTT_packet.UNSUBACK, timestep + delayPortToBroker, broker, port));
  221. tops.remove(newTopic);
  222. }
  223. }
  224. }
  225. }
  226. }
  227. if (pubs.contains(port) || pubSubs.contains(port) && Math.random() < 0.3) {
  228. /**
  229. * Topic which should be published to
  230. */
  231. String newTopic = null;
  232. /**
  233. * Value which should be published
  234. */
  235. String newValue = null;
  236. /**
  237. * Check if FloatSensor
  238. */
  239. if (device instanceof FloatSensor) {
  240. FloatSensor fSensor = (FloatSensor) device;
  241. newTopic = fSensor.getFSinfoName();
  242. newValue = "" + fSensor.getFSval();
  243. }
  244. /**
  245. * Check if BoolSensor - if also FloatSensor: 50% chance of
  246. * overriding
  247. */
  248. if (device instanceof BoolSensor) {
  249. BoolSensor bSensor = (BoolSensor) device;
  250. if (newTopic == null || new Random().nextBoolean()) {
  251. newTopic = bSensor.getBSinfoName();
  252. newValue = "" + bSensor.getBSval();
  253. }
  254. }
  255. if (newTopic != null) {
  256. /**
  257. * Message to be published
  258. */
  259. String msg = "Topic:" + newTopic + ":" + newValue;
  260. /**
  261. * Send Packet
  262. */
  263. returnPackets.add(new MQTT_packet(MQTT_packet.PUBLISH, timestep, port, broker, msg));
  264. /**
  265. * Delay to the broker
  266. */
  267. long delayPortToBroker = port.getTransmissionDelayTo(broker);
  268. // Publish to Subscribers
  269. if (broker.getStatus() != Port.CLOSED && delayPortToBroker != Long.MAX_VALUE) {
  270. /**
  271. * Response/Acknowledgement
  272. */
  273. timestep += broker.getResponseTime() + delayPortToBroker;
  274. returnPackets.add(new MQTT_packet(MQTT_packet.PUBACK, timestep, broker, port));
  275. for (Port p : subs) {
  276. /**
  277. * Skip unsubcribed ports
  278. */
  279. if (!subbedTopics.get(p).contains(newTopic))
  280. continue;
  281. /**
  282. * Delay broker to subscriber
  283. */
  284. long delayBrokerToSub = broker.getTransmissionDelayTo(p);
  285. timestep += broker.getResponseTime();
  286. returnPackets.add(new MQTT_packet(MQTT_packet.PUBLISH, timestep, broker, p, msg));
  287. if (p.getStatus() != Port.CLOSED && delayBrokerToSub != Long.MAX_VALUE) {
  288. returnPackets.add(
  289. new MQTT_packet(MQTT_packet.PUBACK, timestep + p.getResponseTime(), p, broker));
  290. // Update Collector
  291. if (device instanceof FloatSensor && p.getOwner() instanceof FloatCollector
  292. && newTopic.equals(((FloatSensor) device).getFSinfoName())) {
  293. ((FloatCollector) p.getOwner()).setFCval(((FloatSensor) device).getFSval());
  294. } else if (device instanceof BoolSensor && p.getOwner() instanceof BoolCollector
  295. && newTopic.equals(((BoolSensor) device).getBSinfoName())) {
  296. ((BoolCollector) p.getOwner()).setBCval(((BoolSensor) device).getBSval());
  297. }
  298. }
  299. }
  300. for (Port p : pubSubs) {
  301. /**
  302. * Skip unsubscribed Ports
  303. */
  304. if (!subbedTopics.get(p).contains(newTopic) || p.getOwner() == null)
  305. continue;
  306. /**
  307. * Delay broker to subscriber
  308. */
  309. long delayBrokerToSub = broker.getTransmissionDelayTo(p);
  310. timestep += broker.getResponseTime();
  311. returnPackets.add(new MQTT_packet(MQTT_packet.PUBLISH, timestep, broker, p, msg));
  312. if (p.getStatus() != Port.CLOSED && delayBrokerToSub != Long.MAX_VALUE) {
  313. returnPackets.add(
  314. new MQTT_packet(MQTT_packet.PUBACK, timestep + p.getResponseTime(), p, broker));
  315. // Update Collector
  316. if (device instanceof FloatSensor && p.getOwner() instanceof FloatCollector
  317. && newTopic.equals(((FloatSensor) device).getFSinfoName())) {
  318. ((FloatCollector) p.getOwner()).setFCval(((FloatSensor) device).getFSval());
  319. } else if (device instanceof BoolSensor && p.getOwner() instanceof BoolCollector
  320. && newTopic.equals(((BoolSensor) device).getBSinfoName())) {
  321. ((BoolCollector) p.getOwner()).setBCval(((BoolSensor) device).getBSval());
  322. }
  323. }
  324. }
  325. }
  326. }
  327. }
  328. /**
  329. * Rare Ping request to broker
  330. */
  331. if (Math.random() < 0.05 && port != broker) {
  332. returnPackets.add(new MQTT_packet(MQTT_packet.PINGREQ, timestep, port, broker));
  333. /**
  334. * Delay broker to subscriber
  335. */
  336. long delayPortToBroker = port.getTransmissionDelayTo(broker);
  337. if (broker.getStatus() != Port.CLOSED && delayPortToBroker != Long.MAX_VALUE)
  338. returnPackets.add(new MQTT_packet(MQTT_packet.PINGRESP,
  339. timestep + delayPortToBroker + broker.getResponseTime(), broker, port));
  340. }
  341. return returnPackets;
  342. }
  343. @Override
  344. public int getNumberOfRoles() {
  345. return 4;
  346. }
  347. @Override
  348. public String[] getRoles() {
  349. // PublisherSubscriber is Publisher as well as Subscriber
  350. return new String[] { "Broker", "PublisherSubscriber", "Publisher", "Subscriber" };
  351. }
  352. @Override
  353. public Collection<Port> getDevicesWithRole(int role) {
  354. switch (role) {
  355. case 0:
  356. return new LinkedList<Port>(Arrays.asList(broker));
  357. case 1:
  358. return pubSubs;
  359. case 2:
  360. return pubs;
  361. case 3:
  362. return subs;
  363. default:
  364. return null;
  365. }
  366. }
  367. @Override
  368. public boolean addDeviceOfRole(Port device, int role) {
  369. /*
  370. * First device has to be the Broker
  371. */
  372. if (broker == null) {
  373. if (role == 0) {
  374. broker = device;
  375. updateBrokerOnDeletedConnections(null);
  376. return true;
  377. } else {
  378. return false;
  379. }
  380. }
  381. switch (role) {
  382. case 0:
  383. // Just one broker allowed.
  384. return false;
  385. case 1:
  386. pubSubs.add(device);
  387. subbedTopics.putIfAbsent(device, new LinkedList<String>());
  388. break;
  389. case 2:
  390. pubs.add(device);
  391. break;
  392. case 3:
  393. subs.add(device);
  394. subbedTopics.putIfAbsent(device, new LinkedList<String>());
  395. break;
  396. default:
  397. // invalid role
  398. return false;
  399. }
  400. // Create packets for the connecting Client
  401. currentPackets.add(new MQTT_packet(MQTT_packet.CONNECT, currentPackets.size() / 2, device, broker));
  402. currentPackets.add(new MQTT_packet(MQTT_packet.CONNACK, currentPackets.size() / 2 + 1, broker, device));
  403. return true;
  404. }
  405. @Override
  406. public void removeDevice(Port device) {
  407. /**
  408. * true if the device was removed
  409. */
  410. boolean removedDevice = false;
  411. if (broker == device) {
  412. broker = null;
  413. deletedConnectionLinks
  414. .add(new Pair<Port, Port>(new Port(device.getOwner(), (short) -1), deletedBrokerPort));
  415. updateBrokerOnDeletedConnections(device);
  416. }
  417. removedDevice |= pubSubs.remove(device);
  418. removedDevice |= subs.remove(device);
  419. removedDevice |= pubs.remove(device);
  420. // Remove Port from topics and clear its list
  421. LinkedList<String> oldTopics = subbedTopics.remove(device);
  422. if (oldTopics != null)
  423. oldTopics.clear();
  424. if (removedDevice) {
  425. if (broker == null) {
  426. deletedConnectionLinks.add(new Pair<Port, Port>(deletedBrokerPort, device));
  427. } else {
  428. deletedConnectionLinks.add(new Pair<Port, Port>(broker, device));
  429. // If not the broker and device was removed -> disconnect
  430. currentPackets.add(new MQTT_packet(MQTT_packet.DISCONNECT, currentPackets.size() / 2, device, broker));
  431. }
  432. }
  433. }
  434. @Override
  435. public String getName() {
  436. return "MQTT";
  437. }
  438. @Override
  439. public int getRoleOfDevice(Port device) {
  440. if (device == null)
  441. return -1;
  442. if (device == broker)
  443. return 0;
  444. if (pubSubs.contains(device))
  445. return 1;
  446. if (pubs.contains(device))
  447. return 2;
  448. if (subs.contains(device))
  449. return 3;
  450. return -1;
  451. }
  452. @Override
  453. public Collection<Port> getDevices() {
  454. LinkedList<Port> returnDevices = new LinkedList<Port>();
  455. if (broker != null)
  456. returnDevices.add(broker);
  457. returnDevices.addAll(pubSubs);
  458. returnDevices.addAll(pubs);
  459. returnDevices.addAll(subs);
  460. return returnDevices;
  461. }
  462. @Override
  463. public byte getTopologyType() {
  464. return STAR;
  465. }
  466. @Override
  467. public Collection<Pair<Port, Port>> getTopology() {
  468. LinkedList<Pair<Port, Port>> topology = new LinkedList<Pair<Port, Port>>();
  469. Port center = broker;
  470. calcDeletedBrokerPosition();
  471. if (broker == null)
  472. center = new Port(deletedBroker, (short) -1);
  473. for (Port p : pubSubs)
  474. topology.add(new Pair<Port, Port>(center, p));
  475. for (Port p : pubs)
  476. topology.add(new Pair<Port, Port>(center, p));
  477. for (Port p : subs)
  478. topology.add(new Pair<Port, Port>(center, p));
  479. return topology;
  480. }
  481. @Override
  482. public Collection<Pair<Port, Port>> getDeletedTopology() {
  483. calcDeletedBrokerPosition();
  484. return deletedConnectionLinks;
  485. }
  486. /**
  487. * Calculate and update the position of the deleted Broker
  488. */
  489. private void calcDeletedBrokerPosition() {
  490. if (broker == null) {
  491. int x = 0, y = 0, noP = 0;
  492. for (Port p : getDevices()) {
  493. if (p != null && p.getOwner() != null) {
  494. x += p.getOwner().getX();
  495. y += p.getOwner().getY();
  496. noP++;
  497. }
  498. }
  499. if (noP == 0)
  500. return;
  501. deletedBroker.setX(((int) (x * 1.0) / noP));
  502. deletedBroker.setY(((int) (y * 1.0) / noP));
  503. }
  504. }
  505. /**
  506. * Update the broker port on the deleted Connections
  507. *
  508. * @param device
  509. * old broker
  510. */
  511. private void updateBrokerOnDeletedConnections(Port device) {
  512. for (Pair<Port, Port> p : deletedConnectionLinks) {
  513. if (broker == null) {
  514. if (p.getLeft() == device)
  515. p.setLeft(deletedBrokerPort);
  516. if (p.getRight() == device)
  517. p.setRight(deletedBrokerPort);
  518. } else {
  519. if (p.getLeft() == deletedBrokerPort)
  520. p.setLeft(broker);
  521. if (p.getRight() == deletedBrokerPort)
  522. p.setRight(broker);
  523. }
  524. }
  525. }
  526. }