MQTT_protocolProject1.javaTest 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421
  1. import java.util.Arrays;
  2. import java.util.Collection;
  3. import java.util.HashMap;
  4. import java.util.LinkedList;
  5. import de.tu_darmstadt.tk.SmartHomeNetworkSim.core.Packet;
  6. import de.tu_darmstadt.tk.SmartHomeNetworkSim.core.Port;
  7. import de.tu_darmstadt.tk.SmartHomeNetworkSim.core.Protocol;
  8. import de.tu_darmstadt.tk.SmartHomeNetworkSim.core.SmartDevice;
  9. import de.tu_darmstadt.tk.SmartHomeNetworkSim.core.protocols.packets.MQTT_packet;
  10. import de.tu_darmstadt.tk.SmartHomeNetworkSim.core.util.Pair;
  11. /**
  12. * Implementation of the MQTT Protocol to generate packets for the simulation
  13. *
  14. *
  15. * @author Andreas T. Meyer-Berg
  16. */
  17. public class MQTT_protocolProject1 implements Protocol {
  18. /**
  19. * Broker which collects and distributes messages
  20. */
  21. private Port broker;
  22. /**
  23. * Publishers like sensors, which publish data
  24. */
  25. private LinkedList<Port> pubs;
  26. /**
  27. * Subscriber which subscribe to different Topics
  28. */
  29. private LinkedList<Port> subs;
  30. /**
  31. * Topics subscribed by each Port
  32. */
  33. private HashMap<Port,LinkedList<String>> subbedTopics = new HashMap<Port,LinkedList<String>>();
  34. /**
  35. * Devices that are Publisher and Subscriber and therefore send and receive
  36. * messages
  37. */
  38. private LinkedList<Port> pubSubs;
  39. /**
  40. * Packets which are currently being generated or were generated after the
  41. * last generatePackets Call
  42. */
  43. private LinkedList<Packet> currentPackets = new LinkedList<Packet>();
  44. private LinkedList<Pair<Port,Port>> deletedConnectionLinks = new LinkedList<Pair<Port,Port>>();
  45. private SmartDevice deletedBroker = new SmartDevice("DeletedBroker");
  46. private Port deletedBrokerPort = new Port(deletedBroker,(short)-1);
  47. /**
  48. * Topics of the MQTT Broker
  49. */
  50. private LinkedList<String> topics = new LinkedList<String>();
  51. /**
  52. * Creates a new MQTT Protocol
  53. */
  54. public MQTT_protocolProject1() {
  55. this.broker = null;
  56. initialize();
  57. }
  58. /**
  59. * Creates a new MQTT Protocol
  60. *
  61. * @param broker broker of the protocol
  62. */
  63. public MQTT_protocolProject1(Port broker) {
  64. this.broker = broker;
  65. initialize();
  66. }
  67. /**
  68. * Initializes the different fields
  69. */
  70. private void initialize() {
  71. topics.add("/home/temperatureHot");
  72. topics.add("/home/doorOpen");
  73. topics.add("/home/lightOn");
  74. subs = new LinkedList<Port>();
  75. pubs = new LinkedList<Port>();
  76. pubSubs = new LinkedList<Port>();
  77. }
  78. @Override
  79. public Collection<Packet> generateNextPackets(Port port, long timestep, boolean packetLost) {
  80. /**
  81. * Packets which will be generated
  82. */
  83. LinkedList<Packet> returnPackets = new LinkedList<Packet>();
  84. //Update all Timestamps of previous created Packets
  85. for(Packet p: currentPackets){
  86. p.setTimestamp(p.getTimestamp()+timestep);
  87. }
  88. //Add these packets to the return
  89. returnPackets.addAll(currentPackets);
  90. //remove packets from the old list
  91. currentPackets.clear();
  92. //Clear deleted connections
  93. deletedConnectionLinks.clear();
  94. //Return termination packets
  95. if(port==null)return returnPackets;
  96. /**
  97. * Update the lastTime the port was triggered
  98. */
  99. port.setLastTrigger(timestep);
  100. /**
  101. * if port null, skip this step
  102. */
  103. if(broker==null)return returnPackets;
  104. /**
  105. * Generate new Packets regarding to their class
  106. */
  107. if(port==broker){
  108. //Broker should not send new packages, without new messages
  109. //But could perform some Ping request to random participants
  110. Collection<Port> devices = getDevices();
  111. devices.remove(port);
  112. Port dest = null;
  113. java.util.Iterator<Port> it = devices.iterator();
  114. for(int i=0; i<(Math.random()*devices.size())&&it.hasNext();i++){
  115. dest = it.next();
  116. }
  117. if(dest != null){
  118. returnPackets.add(new MQTT_packet(MQTT_packet.PINGREQ, timestep, port, dest));
  119. if(dest.getStatus()!=Port.CLOSED)
  120. returnPackets.add(new MQTT_packet(MQTT_packet.PINGREQ, timestep, dest, port));
  121. }
  122. }else if(subs.contains(port)||pubSubs.contains(port)){
  123. //Subs can either subscribe to topics or unsubscribe
  124. /**
  125. * Topics, the SmartDevice is subscribed to
  126. */
  127. LinkedList<String> tops= subbedTopics.get(port);
  128. /**
  129. * Topic which should be subscribed or unsubscribed
  130. */
  131. String newTopic;
  132. //Subscribe if no subscriptions so far or not subscribed to all, with a probability of70%
  133. if(tops.size()==0||(tops.size()!=subbedTopics.size()&&Math.random()<0.7)){
  134. //Forced Subscribe
  135. newTopic = topics.get((int) Math.floor(Math.random()*topics.size()));
  136. returnPackets.add(new MQTT_packet(MQTT_packet.SUBSCRIBE, timestep, port, broker, "topic:"+newTopic));
  137. if(broker.getStatus()!=Port.CLOSED){
  138. timestep+=broker.getResponseTime();
  139. returnPackets.add(new MQTT_packet(MQTT_packet.SUBACK, timestep, broker, port));
  140. tops.add(newTopic);
  141. }
  142. }
  143. else if(tops.size()==subbedTopics.size()){
  144. //Forced Unsubscribe
  145. newTopic = tops.get((int) Math.floor(Math.random()*tops.size()));
  146. returnPackets.add(new MQTT_packet(MQTT_packet.UNSUBSCRIBE, timestep, port, broker, "topic:"+newTopic));
  147. if(broker.getStatus()!=Port.CLOSED){
  148. timestep+=broker.getResponseTime();
  149. returnPackets.add(new MQTT_packet(MQTT_packet.UNSUBACK, timestep, broker, port));
  150. tops.remove(newTopic);
  151. }
  152. }
  153. if(pubSubs.contains(port)&&Math.random()<0.3){
  154. //When also Pub, publish sometimes
  155. newTopic = topics.get((int) Math.floor(Math.random()*topics.size()));
  156. returnPackets.add(new MQTT_packet(MQTT_packet.SUBSCRIBE, timestep, port, broker, "Topic:"+newTopic));
  157. if(broker.getStatus()!=Port.CLOSED){
  158. timestep+=broker.getResponseTime();
  159. returnPackets.add(new MQTT_packet(MQTT_packet.SUBACK, timestep, broker, port));
  160. }
  161. }
  162. }
  163. if(pubs.contains(port)||pubSubs.contains(port)&&Math.random()<0.3){
  164. String newTopic = topics.get((int) Math.floor(Math.random()*topics.size()));
  165. /**
  166. * Message to be published (Should be further specialized by a Sensor Device)
  167. */
  168. String msg = "Topic:"+newTopic+":"+(Math.random()<0.5?"true":"false");
  169. returnPackets.add(new MQTT_packet(MQTT_packet.PUBLISH, timestep, port, broker, msg));
  170. //Response
  171. timestep+=broker.getResponseTime();
  172. returnPackets.add(new MQTT_packet(MQTT_packet.PUBACK, timestep, broker, port));
  173. //Publish to Subscribers
  174. //Should be be improved to just notify Subs that are subscribed to the topic
  175. if(broker.getStatus()!=Port.CLOSED){
  176. for(Port p:subs){
  177. if(!subbedTopics.get(p).contains(newTopic))continue;//Skip unsubbed ports
  178. timestep+=broker.getResponseTime();
  179. returnPackets.add(new MQTT_packet(MQTT_packet.PUBLISH, timestep, broker, p, msg));
  180. returnPackets.add(new MQTT_packet(MQTT_packet.PUBACK, timestep+p.getResponseTime(), p, broker));
  181. }
  182. for(Port p:pubSubs){
  183. if(!subbedTopics.get(p).contains(newTopic))continue;//skip unsubbed ports
  184. timestep+=broker.getResponseTime();
  185. returnPackets.add(new MQTT_packet(MQTT_packet.PUBLISH, timestep, broker, p, msg));
  186. returnPackets.add(new MQTT_packet(MQTT_packet.PUBACK, timestep+p.getResponseTime(), p, broker));
  187. }
  188. }
  189. }
  190. if(Math.random() < 0.05 && port != broker){
  191. returnPackets.add(new MQTT_packet(MQTT_packet.PINGREQ, timestep, port, broker));
  192. if(port.getStatus()!=Port.CLOSED)
  193. returnPackets.add(new MQTT_packet(MQTT_packet.PINGREQ, timestep, broker, port));
  194. }
  195. return returnPackets;
  196. }
  197. @Override
  198. public int getNumberOfRoles() {
  199. return 4;
  200. }
  201. @Override
  202. public String[] getRoles() {
  203. // PublisherSubscriber is Publisher as well as Subscriber
  204. return new String[] { "Broker", "PublisherSubscriber", "Publisher", "Subscriber" };
  205. }
  206. @Override
  207. public Collection<Port> getDevicesWithRole(int role) {
  208. switch (role) {
  209. case 0:
  210. return new LinkedList<Port>(Arrays.asList(broker));
  211. case 1:
  212. return pubSubs;
  213. case 2:
  214. return pubs;
  215. case 3:
  216. return subs;
  217. default:
  218. return null;
  219. }
  220. }
  221. @Override
  222. public boolean addDeviceOfRole(Port device, int role) {
  223. /*
  224. * First device has to be the Broker
  225. */
  226. if (broker==null) {
  227. if (role == 0) {
  228. broker = device;
  229. updateBrokerOnDeletedConnections(null);
  230. return true;
  231. } else {
  232. return false;
  233. }
  234. }
  235. switch (role) {
  236. case 0:
  237. //Just one broker allowed.
  238. return false;
  239. case 1:
  240. pubSubs.add(device);
  241. subbedTopics.putIfAbsent(device, new LinkedList<String>());
  242. break;
  243. case 2:
  244. pubs.add(device);
  245. break;
  246. case 3:
  247. subs.add(device);
  248. subbedTopics.putIfAbsent(device, new LinkedList<String>());
  249. break;
  250. default:
  251. // invalid role
  252. return false;
  253. }
  254. // Create packets for the connecting Client
  255. currentPackets.add(new MQTT_packet(MQTT_packet.CONNECT, currentPackets.size()/2, device, broker));
  256. currentPackets.add(new MQTT_packet(MQTT_packet.CONNACK, currentPackets.size()/2+1, broker, device));
  257. return true;
  258. }
  259. @Override
  260. public void removeDevice(Port device) {
  261. /**
  262. * true if the device was removed
  263. */
  264. boolean removedDevice=false;
  265. if (broker == device){
  266. broker = null;
  267. deletedConnectionLinks.add(new Pair<Port, Port>(new Port(device.getOwner(), (short)-1), deletedBrokerPort));
  268. updateBrokerOnDeletedConnections(device);
  269. }
  270. removedDevice|=pubSubs.remove(device);
  271. removedDevice|=subs.remove(device);
  272. removedDevice|=pubs.remove(device);
  273. //Remove Port from topics and clear its list
  274. LinkedList<String> oldTopics = subbedTopics.remove(device);
  275. if(oldTopics!=null)oldTopics.clear();
  276. if(removedDevice){
  277. if(broker == null){
  278. deletedConnectionLinks.add(new Pair<Port, Port>(deletedBrokerPort,device));
  279. }else{
  280. deletedConnectionLinks.add(new Pair<Port, Port>(broker, device));
  281. //If not the broker and device was removed -> disconnect
  282. currentPackets.add(new MQTT_packet(MQTT_packet.DISCONNECT, currentPackets.size()/2, device, broker));
  283. }
  284. }
  285. }
  286. public void addTopic(String topic){
  287. topics.add(topic);
  288. }
  289. @Override
  290. public String getName() {
  291. return "MQTT";
  292. }
  293. @Override
  294. public int getRoleOfDevice(Port device){
  295. if(device == null)
  296. return -1;
  297. if(device == broker)
  298. return 0;
  299. if(pubSubs.contains(device))
  300. return 1;
  301. if(pubs.contains(device))
  302. return 2;
  303. if(subs.contains(device))
  304. return 3;
  305. return -1;
  306. }
  307. @Override
  308. public Collection<Port> getDevices(){
  309. LinkedList<Port> returnDevices = new LinkedList<Port>();
  310. if(broker!=null)
  311. returnDevices.add(broker);
  312. returnDevices.addAll(pubSubs);
  313. returnDevices.addAll(pubs);
  314. returnDevices.addAll(subs);
  315. return returnDevices;
  316. }
  317. @Override
  318. public byte getTopologyType() {
  319. return STAR;
  320. }
  321. @Override
  322. public Collection<Pair<Port,Port>> getTopology(){
  323. LinkedList<Pair<Port,Port>> topology = new LinkedList<Pair<Port,Port>>();
  324. Port center = broker;
  325. calcDeletedBrokerPosition();
  326. if(broker==null)
  327. center = new Port(deletedBroker, (short)-1);
  328. for(Port p: pubSubs)
  329. topology.add(new Pair<Port, Port>(center, p));
  330. for(Port p: pubs)
  331. topology.add(new Pair<Port, Port>(center, p));
  332. for(Port p: subs)
  333. topology.add(new Pair<Port, Port>(center, p));
  334. return topology;
  335. }
  336. @Override
  337. public Collection<Pair<Port, Port>> getDeletedTopology() {
  338. calcDeletedBrokerPosition();
  339. return deletedConnectionLinks;
  340. }
  341. /**
  342. * Calculate and update the position of the deleted Broker
  343. */
  344. private void calcDeletedBrokerPosition(){
  345. if(broker == null){
  346. int x = 0, y = 0, noP = 0;
  347. for(Port p: getDevices()){
  348. if(p!=null && p.getOwner()!=null){
  349. x += p.getOwner().getX();
  350. y += p.getOwner().getY();
  351. noP++;
  352. }
  353. }
  354. if(noP==0)
  355. return;
  356. deletedBroker.setX(((int)(x*1.0)/noP));
  357. deletedBroker.setY(((int)(y*1.0)/noP));
  358. }
  359. }
  360. /**
  361. * Update the broker port on the deleted Connections
  362. *
  363. * @param device old broker
  364. */
  365. private void updateBrokerOnDeletedConnections(Port device){
  366. for(Pair<Port, Port> p: deletedConnectionLinks){
  367. if(broker == null){
  368. if(p.getLeft() == device)
  369. p.setLeft(deletedBrokerPort);
  370. if(p.getRight() == device)
  371. p.setRight(deletedBrokerPort);
  372. }else{
  373. if(p.getLeft() == deletedBrokerPort)
  374. p.setLeft(broker);
  375. if(p.getRight() == deletedBrokerPort)
  376. p.setRight(broker);
  377. }
  378. }
  379. }
  380. }