SimpleMQTT.java 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450
  1. package de.tu_darmstadt.tk.SmartHomeNetworkSim.core.simpleImplementation;
  2. import java.util.Arrays;
  3. import java.util.Collection;
  4. import java.util.HashMap;
  5. import java.util.LinkedList;
  6. import de.tu_darmstadt.tk.SmartHomeNetworkSim.core.Packet;
  7. import de.tu_darmstadt.tk.SmartHomeNetworkSim.core.Port;
  8. import de.tu_darmstadt.tk.SmartHomeNetworkSim.core.Protocol;
  9. import de.tu_darmstadt.tk.SmartHomeNetworkSim.core.SmartDevice;
  10. import de.tu_darmstadt.tk.SmartHomeNetworkSim.core.protocols.packets.MQTT_packet;
  11. import de.tu_darmstadt.tk.SmartHomeNetworkSim.core.util.Pair;
  12. /**
  13. * SimpleImplementation of the MQTT Protocol to generate packets for the simulation, it generates random Publish/Subscribe packets for one of three topics with random boolean values
  14. *
  15. *
  16. * @author Andreas T. Meyer-Berg
  17. */
  18. public class SimpleMQTT implements Protocol {
  19. /**
  20. * Broker which collects and distributes messages
  21. */
  22. private Port broker;
  23. /**
  24. * Publishers like sensors, which publish data
  25. */
  26. private LinkedList<Port> pubs;
  27. /**
  28. * Subscriber which subscribe to different Topics
  29. */
  30. private LinkedList<Port> subs;
  31. /**
  32. * Topics subscribed by each Port
  33. */
  34. private HashMap<Port,LinkedList<String>> subbedTopics = new HashMap<Port,LinkedList<String>>();
  35. /**
  36. * Devices that are Publisher and Subscriber and therefore send and receive
  37. * messages
  38. */
  39. private LinkedList<Port> pubSubs;
  40. /**
  41. * Packets which are currently being generated or were generated after the
  42. * last generatePackets Call
  43. */
  44. private LinkedList<Packet> currentPackets = new LinkedList<Packet>();
  45. private LinkedList<Pair<Port,Port>> deletedConnectionLinks = new LinkedList<Pair<Port,Port>>();
  46. private SmartDevice deletedBroker = new SmartDevice("DeletedBroker");
  47. private Port deletedBrokerPort = new Port(deletedBroker,(short)-1);
  48. /**
  49. * Topics of the MQTT Broker
  50. */
  51. private LinkedList<String> topics = new LinkedList<String>();
  52. /**
  53. * Creates a new MQTT Protocol
  54. */
  55. public SimpleMQTT() {
  56. this.broker = null;
  57. initialize();
  58. }
  59. /**
  60. * Creates a new MQTT Protocol
  61. *
  62. * @param broker broker of the protocol
  63. */
  64. public SimpleMQTT(Port broker) {
  65. this.broker = broker;
  66. initialize();
  67. }
  68. /**
  69. * Initializes the different fields
  70. */
  71. private void initialize() {
  72. topics.add("/home/temperatureHot");
  73. topics.add("/home/doorOpen");
  74. topics.add("/home/lightOn");
  75. subs = new LinkedList<Port>();
  76. pubs = new LinkedList<Port>();
  77. pubSubs = new LinkedList<Port>();
  78. }
  79. @Override
  80. public Collection<Packet> generateNextPackets(Port port, long timestep, boolean packetLost) {
  81. /**
  82. * Packets which will be generated
  83. */
  84. LinkedList<Packet> returnPackets = new LinkedList<Packet>();
  85. //Update all Timestamps of previous created Packets
  86. for(Packet p: currentPackets){
  87. p.setTimestamp(p.getTimestamp()+timestep);
  88. }
  89. //Add these packets to the return
  90. returnPackets.addAll(currentPackets);
  91. //remove packets from the old list
  92. currentPackets.clear();
  93. //Clear deleted connections
  94. deletedConnectionLinks.clear();
  95. //Return termination packets
  96. if(port==null)return returnPackets;
  97. /**
  98. * Update the lastTime the port was triggered
  99. */
  100. port.setLastTrigger(timestep);
  101. /**
  102. * if port null, skip this step
  103. */
  104. if(broker==null)return returnPackets;
  105. /**
  106. * Generate new Packets regarding to their class
  107. */
  108. if(port==broker){
  109. //Broker should not send new packages, without new messages
  110. //But could perform some Ping request to random participants
  111. Collection<Port> devices = getDevices();
  112. devices.remove(port);
  113. Port dest = null;
  114. java.util.Iterator<Port> it = devices.iterator();
  115. for(int i=0; i<(Math.random()*devices.size())&&it.hasNext();i++){
  116. dest = it.next();
  117. }
  118. if(dest != null){
  119. returnPackets.add(new MQTT_packet(MQTT_packet.PINGREQ, timestep, port, dest));
  120. /**
  121. * Delay to the destination
  122. */
  123. long delayBrokerDest = broker.getTransmissionDelayTo(dest);
  124. if(dest.getStatus()!=Port.CLOSED && delayBrokerDest!=Long.MAX_VALUE)
  125. returnPackets.add(new MQTT_packet(MQTT_packet.PINGRESP, timestep+delayBrokerDest+dest.getJitter()+dest.getResponseTime(), dest, port));
  126. }
  127. }
  128. if(subs.contains(port)||pubSubs.contains(port)){
  129. //Subs can either subscribe to topics or unsubscribe
  130. /**
  131. * Topics, the SmartDevice is subscribed to
  132. */
  133. LinkedList<String> tops= subbedTopics.get(port);
  134. /**
  135. * Topic which should be subscribed or unsubscribed
  136. */
  137. String newTopic;
  138. //Subscribe if no subscriptions so far or not subscribed to all, with a probability of70%
  139. if(tops.size()==0||(tops.size()!=topics.size()&&Math.random()<0.7)){
  140. //Forced Subscribe
  141. LinkedList<String> available = new LinkedList<String>();
  142. available.addAll(topics);
  143. available.removeAll(tops);
  144. newTopic = available.get((int) Math.floor(Math.random()*available.size()));
  145. returnPackets.add(new MQTT_packet(MQTT_packet.SUBSCRIBE, timestep, port, broker, "topic:"+newTopic));
  146. /**
  147. * Delay to the broker
  148. */
  149. long delayPortToBroker = port.getTransmissionDelayTo(broker);
  150. if(broker.getStatus()!=Port.CLOSED && delayPortToBroker!=Long.MAX_VALUE){
  151. timestep+=broker.getResponseTime();
  152. returnPackets.add(new MQTT_packet(MQTT_packet.SUBACK, timestep+delayPortToBroker, broker, port));
  153. tops.add(newTopic);
  154. }
  155. }
  156. else if(tops.size()==topics.size()){
  157. //Forced Unsubscribe
  158. newTopic = tops.get((int) Math.floor(Math.random()*tops.size()));
  159. returnPackets.add(new MQTT_packet(MQTT_packet.UNSUBSCRIBE, timestep, port, broker, "topic:"+newTopic));
  160. /**
  161. * Delay to the broker
  162. */
  163. long delayPortToBroker = port.getTransmissionDelayTo(broker);
  164. if(broker.getStatus()!=Port.CLOSED && delayPortToBroker!=Long.MAX_VALUE){
  165. timestep+=broker.getResponseTime();
  166. returnPackets.add(new MQTT_packet(MQTT_packet.UNSUBACK, timestep+delayPortToBroker, broker, port));
  167. tops.remove(newTopic);
  168. }
  169. }
  170. }
  171. if(pubs.contains(port)||pubSubs.contains(port)&&Math.random()<0.3){
  172. String newTopic = topics.get((int) Math.floor(Math.random()*topics.size()));
  173. /**
  174. * Message to be published (Should be further specialized by a Sensor Device)
  175. */
  176. String msg = "Topic:"+newTopic+":"+(Math.random()<0.5?"true":"false");
  177. returnPackets.add(new MQTT_packet(MQTT_packet.PUBLISH, timestep, port, broker, msg));
  178. /**
  179. * Delay to the broker
  180. */
  181. long delayPortToBroker = port.getTransmissionDelayTo(broker);
  182. //Publish to Subscribers
  183. //Should be be improved to just notify Subs that are subscribed to the topic
  184. if(broker.getStatus()!=Port.CLOSED && delayPortToBroker!=Long.MAX_VALUE){
  185. //Response
  186. timestep+=broker.getResponseTime()+delayPortToBroker;
  187. returnPackets.add(new MQTT_packet(MQTT_packet.PUBACK, timestep, broker, port));
  188. for(Port p:subs){
  189. if(!subbedTopics.get(p).contains(newTopic))continue;//Skip unsubbed ports
  190. /**
  191. * Delay broker to subscriber
  192. */
  193. long delayBrokerToSub = broker.getTransmissionDelayTo(p);
  194. timestep+=broker.getResponseTime();
  195. returnPackets.add(new MQTT_packet(MQTT_packet.PUBLISH, timestep, broker, p, msg));
  196. if(p.getStatus()!=Port.CLOSED && delayBrokerToSub != Long.MAX_VALUE)
  197. returnPackets.add(new MQTT_packet(MQTT_packet.PUBACK, timestep+p.getResponseTime(), p, broker));
  198. }
  199. for(Port p:pubSubs){
  200. if(!subbedTopics.get(p).contains(newTopic))continue;//skip unsubbed ports
  201. timestep+=broker.getResponseTime();
  202. /**
  203. * Delay broker to subscriber
  204. */
  205. long delayBrokerToSub = broker.getTransmissionDelayTo(p);
  206. returnPackets.add(new MQTT_packet(MQTT_packet.PUBLISH, timestep, broker, p, msg));
  207. if(p.getStatus()!=Port.CLOSED && delayBrokerToSub != Long.MAX_VALUE)
  208. returnPackets.add(new MQTT_packet(MQTT_packet.PUBACK, timestep+p.getResponseTime(), p, broker));
  209. }
  210. }
  211. }
  212. if(Math.random() < 0.05 && port != broker){
  213. returnPackets.add(new MQTT_packet(MQTT_packet.PINGREQ, timestep, port, broker));
  214. /**
  215. * Delay broker to subscriber
  216. */
  217. long delayPortToBroker = port.getTransmissionDelayTo(broker);
  218. if(broker.getStatus()!=Port.CLOSED && delayPortToBroker != Long.MAX_VALUE)
  219. returnPackets.add(new MQTT_packet(MQTT_packet.PINGRESP, timestep+delayPortToBroker+broker.getResponseTime(), broker, port));
  220. }
  221. return returnPackets;
  222. }
  223. @Override
  224. public int getNumberOfRoles() {
  225. return 4;
  226. }
  227. @Override
  228. public String[] getRoles() {
  229. // PublisherSubscriber is Publisher as well as Subscriber
  230. return new String[] { "Broker", "PublisherSubscriber", "Publisher", "Subscriber" };
  231. }
  232. @Override
  233. public Collection<Port> getDevicesWithRole(int role) {
  234. switch (role) {
  235. case 0:
  236. return new LinkedList<Port>(Arrays.asList(broker));
  237. case 1:
  238. return pubSubs;
  239. case 2:
  240. return pubs;
  241. case 3:
  242. return subs;
  243. default:
  244. return null;
  245. }
  246. }
  247. @Override
  248. public boolean addDeviceOfRole(Port device, int role) {
  249. /*
  250. * First device has to be the Broker
  251. */
  252. if (broker==null) {
  253. if (role == 0) {
  254. broker = device;
  255. updateBrokerOnDeletedConnections(null);
  256. return true;
  257. } else {
  258. return false;
  259. }
  260. }
  261. switch (role) {
  262. case 0:
  263. //Just one broker allowed.
  264. return false;
  265. case 1:
  266. pubSubs.add(device);
  267. subbedTopics.putIfAbsent(device, new LinkedList<String>());
  268. break;
  269. case 2:
  270. pubs.add(device);
  271. break;
  272. case 3:
  273. subs.add(device);
  274. subbedTopics.putIfAbsent(device, new LinkedList<String>());
  275. break;
  276. default:
  277. // invalid role
  278. return false;
  279. }
  280. // Create packets for the connecting Client
  281. currentPackets.add(new MQTT_packet(MQTT_packet.CONNECT, currentPackets.size()/2, device, broker));
  282. currentPackets.add(new MQTT_packet(MQTT_packet.CONNACK, currentPackets.size()/2+1, broker, device));
  283. return true;
  284. }
  285. @Override
  286. public void removeDevice(Port device) {
  287. /**
  288. * true if the device was removed
  289. */
  290. boolean removedDevice=false;
  291. if (broker == device){
  292. broker = null;
  293. deletedConnectionLinks.add(new Pair<Port, Port>(new Port(device.getOwner(), (short)-1), deletedBrokerPort));
  294. updateBrokerOnDeletedConnections(device);
  295. }
  296. removedDevice|=pubSubs.remove(device);
  297. removedDevice|=subs.remove(device);
  298. removedDevice|=pubs.remove(device);
  299. //Remove Port from topics and clear its list
  300. LinkedList<String> oldTopics = subbedTopics.remove(device);
  301. if(oldTopics!=null)oldTopics.clear();
  302. if(removedDevice){
  303. if(broker == null){
  304. deletedConnectionLinks.add(new Pair<Port, Port>(deletedBrokerPort,device));
  305. }else{
  306. deletedConnectionLinks.add(new Pair<Port, Port>(broker, device));
  307. //If not the broker and device was removed -> disconnect
  308. currentPackets.add(new MQTT_packet(MQTT_packet.DISCONNECT, currentPackets.size()/2, device, broker));
  309. }
  310. }
  311. }
  312. public void addTopic(String topic){
  313. topics.add(topic);
  314. }
  315. @Override
  316. public String getName() {
  317. return "SimpleMQTT";
  318. }
  319. @Override
  320. public int getRoleOfDevice(Port device){
  321. if(device == null)
  322. return -1;
  323. if(device == broker)
  324. return 0;
  325. if(pubSubs.contains(device))
  326. return 1;
  327. if(pubs.contains(device))
  328. return 2;
  329. if(subs.contains(device))
  330. return 3;
  331. return -1;
  332. }
  333. @Override
  334. public Collection<Port> getDevices(){
  335. LinkedList<Port> returnDevices = new LinkedList<Port>();
  336. if(broker!=null)
  337. returnDevices.add(broker);
  338. returnDevices.addAll(pubSubs);
  339. returnDevices.addAll(pubs);
  340. returnDevices.addAll(subs);
  341. return returnDevices;
  342. }
  343. @Override
  344. public byte getTopologyType() {
  345. return STAR;
  346. }
  347. @Override
  348. public Collection<Pair<Port,Port>> getTopology(){
  349. LinkedList<Pair<Port,Port>> topology = new LinkedList<Pair<Port,Port>>();
  350. Port center = broker;
  351. calcDeletedBrokerPosition();
  352. if(broker==null)
  353. center = new Port(deletedBroker, (short)-1);
  354. for(Port p: pubSubs)
  355. topology.add(new Pair<Port, Port>(center, p));
  356. for(Port p: pubs)
  357. topology.add(new Pair<Port, Port>(center, p));
  358. for(Port p: subs)
  359. topology.add(new Pair<Port, Port>(center, p));
  360. return topology;
  361. }
  362. @Override
  363. public Collection<Pair<Port, Port>> getDeletedTopology() {
  364. calcDeletedBrokerPosition();
  365. return deletedConnectionLinks;
  366. }
  367. /**
  368. * Calculate and update the position of the deleted Broker
  369. */
  370. private void calcDeletedBrokerPosition(){
  371. if(broker == null){
  372. int x = 0, y = 0, noP = 0;
  373. for(Port p: getDevices()){
  374. if(p!=null && p.getOwner()!=null){
  375. x += p.getOwner().getX();
  376. y += p.getOwner().getY();
  377. noP++;
  378. }
  379. }
  380. if(noP==0)
  381. return;
  382. deletedBroker.setX(((int)(x*1.0)/noP));
  383. deletedBroker.setY(((int)(y*1.0)/noP));
  384. }
  385. }
  386. /**
  387. * Update the broker port on the deleted Connections
  388. *
  389. * @param device old broker
  390. */
  391. private void updateBrokerOnDeletedConnections(Port device){
  392. for(Pair<Port, Port> p: deletedConnectionLinks){
  393. if(broker == null){
  394. if(p.getLeft() == device)
  395. p.setLeft(deletedBrokerPort);
  396. if(p.getRight() == device)
  397. p.setRight(deletedBrokerPort);
  398. }else{
  399. if(p.getLeft() == deletedBrokerPort)
  400. p.setLeft(broker);
  401. if(p.getRight() == deletedBrokerPort)
  402. p.setRight(broker);
  403. }
  404. }
  405. }
  406. }