MQTT_protocolDeepPackageTest.java 12 KB


  1. package packageTest.deepPackage.reallyDeepPackage;
  2. import java.util.Arrays;
  3. import java.util.Collection;
  4. import java.util.HashMap;
  5. import java.util.LinkedList;
  6. import de.tu_darmstadt.tk.SmartHomeNetworkSim.core.Packet;
  7. import de.tu_darmstadt.tk.SmartHomeNetworkSim.core.Port;
  8. import de.tu_darmstadt.tk.SmartHomeNetworkSim.core.Protocol;
  9. import de.tu_darmstadt.tk.SmartHomeNetworkSim.core.SmartDevice;
  10. import de.tu_darmstadt.tk.SmartHomeNetworkSim.core.protocols.packets.MQTT_packet;
  11. import de.tu_darmstadt.tk.SmartHomeNetworkSim.core.util.Pair;
  12. /**
  13. * Implementation of the MQTT Protocol to generate packets for the simulation
  14. *
  15. *
  16. * @author Andreas T. Meyer-Berg
  17. */
  18. public class MQTT_protocolDeepPackageTest implements Protocol {
  19. /**
  20. * Broker which collects and distributes messages
  21. */
  22. private Port broker;
  23. /**
  24. * Publishers like sensors, which publish data
  25. */
  26. private LinkedList<Port> pubs;
  27. /**
  28. * Subscriber which subscribe to different Topics
  29. */
  30. private LinkedList<Port> subs;
  31. /**
  32. * Topics subscribed by each Port
  33. */
  34. private HashMap<Port,LinkedList<String>> subbedTopics = new HashMap<Port,LinkedList<String>>();
  35. /**
  36. * Devices that are Publisher and Subscriber and therefore send and receive
  37. * messages
  38. */
  39. private LinkedList<Port> pubSubs;
  40. /**
  41. * Packets which are currently being generated or were generated after the
  42. * last generatePackets Call
  43. */
  44. private LinkedList<Packet> currentPackets = new LinkedList<Packet>();
  45. private LinkedList<Pair<Port,Port>> deletedConnectionLinks = new LinkedList<Pair<Port,Port>>();
  46. private SmartDevice deletedBroker = new SmartDevice("DeletedBroker");
  47. private Port deletedBrokerPort = new Port(deletedBroker,(short)-1);
  48. /**
  49. * Topics of the MQTT Broker
  50. */
  51. private LinkedList<String> topics = new LinkedList<String>();
  52. /**
  53. * Creates a new MQTT Protocol
  54. */
  55. public MQTT_protocolDeepPackageTest() {
  56. this.broker = null;
  57. initialize();
  58. }
  59. /**
  60. * Creates a new MQTT Protocol
  61. *
  62. * @param broker broker of the protocol
  63. */
  64. public MQTT_protocolDeepPackageTest(Port broker) {
  65. this.broker = broker;
  66. initialize();
  67. }
  68. /**
  69. * Initializes the different fields
  70. */
  71. private void initialize() {
  72. topics.add("/home/temperatureHot");
  73. topics.add("/home/doorOpen");
  74. topics.add("/home/lightOn");
  75. subs = new LinkedList<Port>();
  76. pubs = new LinkedList<Port>();
  77. pubSubs = new LinkedList<Port>();
  78. }
  79. @Override
  80. public Collection<Packet> generateNextPakets(Port port, long timestep, boolean packetLost) {
  81. /**
  82. * Packets which will be generated
  83. */
  84. LinkedList<Packet> returnPackets = new LinkedList<Packet>();
  85. //Update all Timestamps of previous created Packets
  86. for(Packet p: currentPackets){
  87. p.setTimestamp(p.getTimestamp()+timestep);
  88. }
  89. //Add these packets to the return
  90. returnPackets.addAll(currentPackets);
  91. //remove packets from the old list
  92. currentPackets.clear();
  93. //Clear deleted connections
  94. deletedConnectionLinks.clear();
  95. //Return termination packets
  96. if(port==null)return returnPackets;
  97. /**
  98. * Update the lastTime the port was triggered
  99. */
  100. port.setLastTrigger(timestep);
  101. /**
  102. * if port null, skip this step
  103. */
  104. if(broker==null)return returnPackets;
  105. /**
  106. * Generate new Packets regarding to their class
  107. */
  108. if(port==broker){
  109. //Broker should not send new packages, without new messages
  110. //But could perform some Ping request to random participants
  111. Collection<Port> devices = getDevices();
  112. devices.remove(port);
  113. Port dest = null;
  114. java.util.Iterator<Port> it = devices.iterator();
  115. for(int i=0; i<(Math.random()*devices.size())&&it.hasNext();i++){
  116. dest = it.next();
  117. }
  118. if(dest != null){
  119. returnPackets.add(new MQTT_packet(MQTT_packet.PINGREQ, timestep, port, dest));
  120. if(dest.getStatus()!=Port.CLOSED)
  121. returnPackets.add(new MQTT_packet(MQTT_packet.PINGREQ, timestep, dest, port));
  122. }
  123. }else if(subs.contains(port)||pubSubs.contains(port)){
  124. //Subs can either subscribe to topics or unsubscribe
  125. /**
  126. * Topics, the SmartDevice is subscribed to
  127. */
  128. LinkedList<String> tops= subbedTopics.get(port);
  129. /**
  130. * Topic which should be subscribed or unsubscribed
  131. */
  132. String newTopic;
  133. //Subscribe if no subscriptions so far or not subscribed to all, with a probability of70%
  134. if(tops.size()==0||(tops.size()!=subbedTopics.size()&&Math.random()<0.7)){
  135. //Forced Subscribe
  136. newTopic = topics.get((int) Math.floor(Math.random()*topics.size()));
  137. returnPackets.add(new MQTT_packet(MQTT_packet.SUBSCRIBE, timestep, port, broker, "topic:"+newTopic));
  138. if(broker.getStatus()!=Port.CLOSED){
  139. timestep+=broker.getResponseTime();
  140. returnPackets.add(new MQTT_packet(MQTT_packet.SUBACK, timestep, broker, port));
  141. tops.add(newTopic);
  142. }
  143. }
  144. else if(tops.size()==subbedTopics.size()){
  145. //Forced Unsubscribe
  146. newTopic = tops.get((int) Math.floor(Math.random()*tops.size()));
  147. returnPackets.add(new MQTT_packet(MQTT_packet.UNSUBSCRIBE, timestep, port, broker, "topic:"+newTopic));
  148. if(broker.getStatus()!=Port.CLOSED){
  149. timestep+=broker.getResponseTime();
  150. returnPackets.add(new MQTT_packet(MQTT_packet.UNSUBACK, timestep, broker, port));
  151. tops.remove(newTopic);
  152. }
  153. }
  154. if(pubSubs.contains(port)&&Math.random()<0.3){
  155. //When also Pub, publish sometimes
  156. newTopic = topics.get((int) Math.floor(Math.random()*topics.size()));
  157. returnPackets.add(new MQTT_packet(MQTT_packet.SUBSCRIBE, timestep, port, broker, "Topic:"+newTopic));
  158. if(broker.getStatus()!=Port.CLOSED){
  159. timestep+=broker.getResponseTime();
  160. returnPackets.add(new MQTT_packet(MQTT_packet.SUBACK, timestep, broker, port));
  161. }
  162. }
  163. }
  164. if(pubs.contains(port)||pubSubs.contains(port)&&Math.random()<0.3){
  165. String newTopic = topics.get((int) Math.floor(Math.random()*topics.size()));
  166. /**
  167. * Message to be published (Should be further specialized by a Sensor Device)
  168. */
  169. String msg = "Topic:"+newTopic+":"+(Math.random()<0.5?"true":"false");
  170. returnPackets.add(new MQTT_packet(MQTT_packet.PUBLISH, timestep, port, broker, msg));
  171. //Response
  172. timestep+=broker.getResponseTime();
  173. returnPackets.add(new MQTT_packet(MQTT_packet.PUBACK, timestep, broker, port));
  174. //Publish to Subscribers
  175. //Should be be improved to just notify Subs that are subscribed to the topic
  176. if(broker.getStatus()!=Port.CLOSED){
  177. for(Port p:subs){
  178. if(!subbedTopics.get(p).contains(newTopic))continue;//Skip unsubbed ports
  179. timestep+=broker.getResponseTime();
  180. returnPackets.add(new MQTT_packet(MQTT_packet.PUBLISH, timestep, broker, p, msg));
  181. returnPackets.add(new MQTT_packet(MQTT_packet.PUBACK, timestep+p.getResponseTime(), p, broker));
  182. }
  183. for(Port p:pubSubs){
  184. if(!subbedTopics.get(p).contains(newTopic))continue;//skip unsubbed ports
  185. timestep+=broker.getResponseTime();
  186. returnPackets.add(new MQTT_packet(MQTT_packet.PUBLISH, timestep, broker, p, msg));
  187. returnPackets.add(new MQTT_packet(MQTT_packet.PUBACK, timestep+p.getResponseTime(), p, broker));
  188. }
  189. }
  190. }
  191. if(Math.random() < 0.05 && port != broker){
  192. returnPackets.add(new MQTT_packet(MQTT_packet.PINGREQ, timestep, port, broker));
  193. if(port.getStatus()!=Port.CLOSED)
  194. returnPackets.add(new MQTT_packet(MQTT_packet.PINGREQ, timestep, broker, port));
  195. }
  196. return returnPackets;
  197. }
  198. @Override
  199. public int getNumberOfRoles() {
  200. return 4;
  201. }
  202. @Override
  203. public String[] getRoles() {
  204. // PublisherSubscriber is Publisher as well as Subscriber
  205. return new String[] { "Broker", "PublisherSubscriber", "Publisher", "Subscriber" };
  206. }
  207. @Override
  208. public Collection<Port> getDevicesWithRole(int role) {
  209. switch (role) {
  210. case 0:
  211. return new LinkedList<Port>(Arrays.asList(broker));
  212. case 1:
  213. return pubSubs;
  214. case 2:
  215. return pubs;
  216. case 3:
  217. return subs;
  218. default:
  219. return null;
  220. }
  221. }
  222. @Override
  223. public boolean addDeviceOfRole(Port device, int role) {
  224. /*
  225. * First device has to be the Broker
  226. */
  227. if (broker==null) {
  228. if (role == 0) {
  229. broker = device;
  230. updateBrokerOnDeletedConnections(null);
  231. return true;
  232. } else {
  233. return false;
  234. }
  235. }
  236. switch (role) {
  237. case 0:
  238. //Just one broker allowed.
  239. return false;
  240. case 1:
  241. pubSubs.add(device);
  242. subbedTopics.putIfAbsent(device, new LinkedList<String>());
  243. break;
  244. case 2:
  245. pubs.add(device);
  246. break;
  247. case 3:
  248. subs.add(device);
  249. subbedTopics.putIfAbsent(device, new LinkedList<String>());
  250. break;
  251. default:
  252. // invalid role
  253. return false;
  254. }
  255. // Create packets for the connecting Client
  256. currentPackets.add(new MQTT_packet(MQTT_packet.CONNECT, currentPackets.size()/2, device, broker));
  257. currentPackets.add(new MQTT_packet(MQTT_packet.CONNACK, currentPackets.size()/2+1, broker, device));
  258. return true;
  259. }
  260. @Override
  261. public void removeDevice(Port device) {
  262. /**
  263. * true if the device was removed
  264. */
  265. boolean removedDevice=false;
  266. if (broker == device){
  267. broker = null;
  268. deletedConnectionLinks.add(new Pair<Port, Port>(new Port(device.getOwner(), (short)-1), deletedBrokerPort));
  269. updateBrokerOnDeletedConnections(device);
  270. }
  271. removedDevice|=pubSubs.remove(device);
  272. removedDevice|=subs.remove(device);
  273. removedDevice|=pubs.remove(device);
  274. //Remove Port from topics and clear its list
  275. LinkedList<String> oldTopics = subbedTopics.remove(device);
  276. if(oldTopics!=null)oldTopics.clear();
  277. if(removedDevice){
  278. if(broker == null){
  279. deletedConnectionLinks.add(new Pair<Port, Port>(deletedBrokerPort,device));
  280. }else{
  281. deletedConnectionLinks.add(new Pair<Port, Port>(broker, device));
  282. //If not the broker and device was removed -> disconnect
  283. currentPackets.add(new MQTT_packet(MQTT_packet.DISCONNECT, currentPackets.size()/2, device, broker));
  284. }
  285. }
  286. }
  287. public void addTopic(String topic){
  288. topics.add(topic);
  289. }
  290. @Override
  291. public String getName() {
  292. return "MQTT";
  293. }
  294. @Override
  295. public int getRoleOfDevice(Port device){
  296. if(device == null)
  297. return -1;
  298. if(device == broker)
  299. return 0;
  300. if(pubSubs.contains(device))
  301. return 1;
  302. if(pubs.contains(device))
  303. return 2;
  304. if(subs.contains(device))
  305. return 3;
  306. return -1;
  307. }
  308. @Override
  309. public Collection<Port> getDevices(){
  310. LinkedList<Port> returnDevices = new LinkedList<Port>();
  311. if(broker!=null)
  312. returnDevices.add(broker);
  313. returnDevices.addAll(pubSubs);
  314. returnDevices.addAll(pubs);
  315. returnDevices.addAll(subs);
  316. return returnDevices;
  317. }
  318. @Override
  319. public byte getTopologyType() {
  320. return STAR;
  321. }
  322. @Override
  323. public Collection<Pair<Port,Port>> getTopology(){
  324. LinkedList<Pair<Port,Port>> topology = new LinkedList<Pair<Port,Port>>();
  325. Port center = broker;
  326. calcDeletedBrokerPosition();
  327. if(broker==null)
  328. center = new Port(deletedBroker, (short)-1);
  329. for(Port p: pubSubs)
  330. topology.add(new Pair<Port, Port>(center, p));
  331. for(Port p: pubs)
  332. topology.add(new Pair<Port, Port>(center, p));
  333. for(Port p: subs)
  334. topology.add(new Pair<Port, Port>(center, p));
  335. return topology;
  336. }
  337. @Override
  338. public Collection<Pair<Port, Port>> getDeletedTopology() {
  339. calcDeletedBrokerPosition();
  340. return deletedConnectionLinks;
  341. }
  342. /**
  343. * Calculate and update the position of the deleted Broker
  344. */
  345. private void calcDeletedBrokerPosition(){
  346. if(broker == null){
  347. int x = 0, y = 0, noP = 0;
  348. for(Port p: getDevices()){
  349. if(p!=null && p.getOwner()!=null){
  350. x += p.getOwner().getX();
  351. y += p.getOwner().getY();
  352. noP++;
  353. }
  354. }
  355. if(noP==0)
  356. return;
  357. deletedBroker.setX(((int)(x*1.0)/noP));
  358. deletedBroker.setY(((int)(y*1.0)/noP));
  359. }
  360. }
  361. /**
  362. * Update the broker port on the deleted Connections
  363. *
  364. * @param device old broker
  365. */
  366. private void updateBrokerOnDeletedConnections(Port device){
  367. for(Pair<Port, Port> p: deletedConnectionLinks){
  368. if(broker == null){
  369. if(p.getLeft() == device)
  370. p.setLeft(deletedBrokerPort);
  371. if(p.getRight() == device)
  372. p.setRight(deletedBrokerPort);
  373. }else{
  374. if(p.getLeft() == deletedBrokerPort)
  375. p.setLeft(broker);
  376. if(p.getRight() == deletedBrokerPort)
  377. p.setRight(broker);
  378. }
  379. }
  380. }
  381. }