|
@@ -3,12 +3,18 @@ package de.tu_darmstadt.tk.SmartHomeNetworkSim.core.protocols;
|
|
import java.util.Arrays;
|
|
import java.util.Arrays;
|
|
import java.util.Collection;
|
|
import java.util.Collection;
|
|
import java.util.HashMap;
|
|
import java.util.HashMap;
|
|
|
|
+import java.util.Iterator;
|
|
import java.util.LinkedList;
|
|
import java.util.LinkedList;
|
|
|
|
+import java.util.Random;
|
|
|
|
|
|
import de.tu_darmstadt.tk.SmartHomeNetworkSim.core.Packet;
|
|
import de.tu_darmstadt.tk.SmartHomeNetworkSim.core.Packet;
|
|
import de.tu_darmstadt.tk.SmartHomeNetworkSim.core.Port;
|
|
import de.tu_darmstadt.tk.SmartHomeNetworkSim.core.Port;
|
|
import de.tu_darmstadt.tk.SmartHomeNetworkSim.core.Protocol;
|
|
import de.tu_darmstadt.tk.SmartHomeNetworkSim.core.Protocol;
|
|
import de.tu_darmstadt.tk.SmartHomeNetworkSim.core.SmartDevice;
|
|
import de.tu_darmstadt.tk.SmartHomeNetworkSim.core.SmartDevice;
|
|
|
|
+import de.tu_darmstadt.tk.SmartHomeNetworkSim.core.devices.BoolCollector;
|
|
|
|
+import de.tu_darmstadt.tk.SmartHomeNetworkSim.core.devices.BoolSensor;
|
|
|
|
+import de.tu_darmstadt.tk.SmartHomeNetworkSim.core.devices.FloatCollector;
|
|
|
|
+import de.tu_darmstadt.tk.SmartHomeNetworkSim.core.devices.FloatSensor;
|
|
import de.tu_darmstadt.tk.SmartHomeNetworkSim.core.protocols.packets.MQTT_packet;
|
|
import de.tu_darmstadt.tk.SmartHomeNetworkSim.core.protocols.packets.MQTT_packet;
|
|
import de.tu_darmstadt.tk.SmartHomeNetworkSim.core.util.Pair;
|
|
import de.tu_darmstadt.tk.SmartHomeNetworkSim.core.util.Pair;
|
|
|
|
|
|
@@ -34,11 +40,11 @@ public class MQTT_protocol implements Protocol {
|
|
* Subscriber which subscribe to different Topics
|
|
* Subscriber which subscribe to different Topics
|
|
*/
|
|
*/
|
|
private LinkedList<Port> subs;
|
|
private LinkedList<Port> subs;
|
|
-
|
|
|
|
|
|
+
|
|
/**
|
|
/**
|
|
* Topics subscribed by each Port
|
|
* Topics subscribed by each Port
|
|
*/
|
|
*/
|
|
- private HashMap<Port,LinkedList<String>> subbedTopics = new HashMap<Port,LinkedList<String>>();
|
|
|
|
|
|
+ private HashMap<Port, LinkedList<String>> subbedTopics = new HashMap<Port, LinkedList<String>>();
|
|
|
|
|
|
/**
|
|
/**
|
|
* Devices that are Publisher and Subscriber and therefore send and receive
|
|
* Devices that are Publisher and Subscriber and therefore send and receive
|
|
@@ -51,67 +57,69 @@ public class MQTT_protocol implements Protocol {
|
|
* last generatePackets Call
|
|
* last generatePackets Call
|
|
*/
|
|
*/
|
|
private LinkedList<Packet> currentPackets = new LinkedList<Packet>();
|
|
private LinkedList<Packet> currentPackets = new LinkedList<Packet>();
|
|
-
|
|
|
|
- private LinkedList<Pair<Port,Port>> deletedConnectionLinks = new LinkedList<Pair<Port,Port>>();
|
|
|
|
|
|
+
|
|
|
|
+ // For Visualization in case of deleting the broker
|
|
|
|
+ private LinkedList<Pair<Port, Port>> deletedConnectionLinks = new LinkedList<Pair<Port, Port>>();
|
|
private SmartDevice deletedBroker = new SmartDevice("DeletedBroker");
|
|
private SmartDevice deletedBroker = new SmartDevice("DeletedBroker");
|
|
- private Port deletedBrokerPort = new Port(deletedBroker,(short)-1);
|
|
|
|
-
|
|
|
|
|
|
+ private Port deletedBrokerPort = new Port(deletedBroker, (short) -1);
|
|
|
|
+
|
|
/**
|
|
/**
|
|
* Topics of the MQTT Broker
|
|
* Topics of the MQTT Broker
|
|
*/
|
|
*/
|
|
- private LinkedList<String> topics = new LinkedList<String>();
|
|
|
|
|
|
+ // private LinkedList<String> topics = new LinkedList<String>();
|
|
|
|
|
|
/**
|
|
/**
|
|
* Creates a new MQTT Protocol
|
|
* Creates a new MQTT Protocol
|
|
*/
|
|
*/
|
|
public MQTT_protocol() {
|
|
public MQTT_protocol() {
|
|
this.broker = null;
|
|
this.broker = null;
|
|
- initialize();
|
|
|
|
-
|
|
|
|
|
|
+ initialize();
|
|
|
|
+
|
|
}
|
|
}
|
|
-
|
|
|
|
|
|
+
|
|
/**
|
|
/**
|
|
* Creates a new MQTT Protocol
|
|
* Creates a new MQTT Protocol
|
|
*
|
|
*
|
|
- * @param broker broker of the protocol
|
|
|
|
|
|
+ * @param broker
|
|
|
|
+ * broker of the protocol
|
|
*/
|
|
*/
|
|
public MQTT_protocol(Port broker) {
|
|
public MQTT_protocol(Port broker) {
|
|
this.broker = broker;
|
|
this.broker = broker;
|
|
initialize();
|
|
initialize();
|
|
-
|
|
|
|
|
|
+
|
|
}
|
|
}
|
|
-
|
|
|
|
|
|
+
|
|
/**
|
|
/**
|
|
* Initializes the different fields
|
|
* Initializes the different fields
|
|
*/
|
|
*/
|
|
private void initialize() {
|
|
private void initialize() {
|
|
- topics.add("/home/temperatureHot");
|
|
|
|
- topics.add("/home/doorOpen");
|
|
|
|
- topics.add("/home/lightOn");
|
|
|
|
- subs = new LinkedList<Port>();
|
|
|
|
- pubs = new LinkedList<Port>();
|
|
|
|
- pubSubs = new LinkedList<Port>();
|
|
|
|
|
|
+ subs = new LinkedList<Port>();
|
|
|
|
+ pubs = new LinkedList<Port>();
|
|
|
|
+ pubSubs = new LinkedList<Port>();
|
|
}
|
|
}
|
|
-
|
|
|
|
|
|
+
|
|
@Override
|
|
@Override
|
|
public Collection<Packet> generateNextPackets(Port port, long timestep, boolean packetLost) {
|
|
public Collection<Packet> generateNextPackets(Port port, long timestep, boolean packetLost) {
|
|
/**
|
|
/**
|
|
* Packets which will be generated
|
|
* Packets which will be generated
|
|
*/
|
|
*/
|
|
LinkedList<Packet> returnPackets = new LinkedList<Packet>();
|
|
LinkedList<Packet> returnPackets = new LinkedList<Packet>();
|
|
- //Update all Timestamps of previous created Packets
|
|
|
|
- for(Packet p: currentPackets){
|
|
|
|
- p.setTimestamp(p.getTimestamp()+timestep);
|
|
|
|
|
|
+ // Update all Timestamps of previous created Packets
|
|
|
|
+ for (Packet p : currentPackets) {
|
|
|
|
+ p.setTimestamp(p.getTimestamp() + timestep);
|
|
}
|
|
}
|
|
- //Add these packets to the return
|
|
|
|
|
|
+ // Add these packets to the return
|
|
returnPackets.addAll(currentPackets);
|
|
returnPackets.addAll(currentPackets);
|
|
- //remove packets from the old list
|
|
|
|
|
|
+ // remove packets from the old list
|
|
currentPackets.clear();
|
|
currentPackets.clear();
|
|
- //Clear deleted connections
|
|
|
|
|
|
+ // Clear deleted connections
|
|
deletedConnectionLinks.clear();
|
|
deletedConnectionLinks.clear();
|
|
- //Return termination packets
|
|
|
|
- if(port==null)return returnPackets;
|
|
|
|
-
|
|
|
|
|
|
+ // Return termination packets
|
|
|
|
+ if (port == null)
|
|
|
|
+ return returnPackets;
|
|
|
|
+ SmartDevice device = port.getOwner();
|
|
|
|
+ if (device == null)
|
|
|
|
+ return returnPackets;
|
|
/**
|
|
/**
|
|
* Update the lastTime the port was triggered
|
|
* Update the lastTime the port was triggered
|
|
*/
|
|
*/
|
|
@@ -119,131 +127,249 @@ public class MQTT_protocol implements Protocol {
|
|
/**
|
|
/**
|
|
* if port null, skip this step
|
|
* if port null, skip this step
|
|
*/
|
|
*/
|
|
- if(broker==null)return returnPackets;
|
|
|
|
-
|
|
|
|
|
|
+ if (broker == null)
|
|
|
|
+ return returnPackets;
|
|
|
|
+
|
|
/**
|
|
/**
|
|
* Generate new Packets regarding to their class
|
|
* Generate new Packets regarding to their class
|
|
*/
|
|
*/
|
|
- if(port==broker){
|
|
|
|
- //Broker should not send new packages, without new messages
|
|
|
|
- //But could perform some Ping request to random participants
|
|
|
|
|
|
+ if (port == broker) {
|
|
|
|
+ // Broker should not send new packages, without new messages
|
|
|
|
+ // But could perform some Ping request to random participants
|
|
Collection<Port> devices = getDevices();
|
|
Collection<Port> devices = getDevices();
|
|
devices.remove(port);
|
|
devices.remove(port);
|
|
Port dest = null;
|
|
Port dest = null;
|
|
- java.util.Iterator<Port> it = devices.iterator();
|
|
|
|
- for(int i=0; i<(Math.random()*devices.size())&&it.hasNext();i++){
|
|
|
|
|
|
+ Iterator<Port> it = devices.iterator();
|
|
|
|
+ for (int i = 0; i < (Math.random() * devices.size()) && it.hasNext(); i++) {
|
|
dest = it.next();
|
|
dest = it.next();
|
|
}
|
|
}
|
|
- if(dest != null){
|
|
|
|
|
|
+ if (dest != null) {
|
|
returnPackets.add(new MQTT_packet(MQTT_packet.PINGREQ, timestep, port, dest));
|
|
returnPackets.add(new MQTT_packet(MQTT_packet.PINGREQ, timestep, port, dest));
|
|
/**
|
|
/**
|
|
* Delay to the destination
|
|
* Delay to the destination
|
|
*/
|
|
*/
|
|
long delayBrokerDest = broker.getTransmissionDelayTo(dest);
|
|
long delayBrokerDest = broker.getTransmissionDelayTo(dest);
|
|
- if(dest.getStatus()!=Port.CLOSED && delayBrokerDest!=Long.MAX_VALUE)
|
|
|
|
- returnPackets.add(new MQTT_packet(MQTT_packet.PINGRESP, timestep+delayBrokerDest+dest.getJitter()+dest.getResponseTime(), dest, port));
|
|
|
|
|
|
+ if (dest.getStatus() != Port.CLOSED && delayBrokerDest != Long.MAX_VALUE)
|
|
|
|
+ returnPackets.add(new MQTT_packet(MQTT_packet.PINGRESP,
|
|
|
|
+ timestep + delayBrokerDest + dest.getJitter() + dest.getResponseTime(), dest, port));
|
|
}
|
|
}
|
|
}
|
|
}
|
|
- if(subs.contains(port)||pubSubs.contains(port)){
|
|
|
|
-
|
|
|
|
- //Subs can either subscribe to topics or unsubscribe
|
|
|
|
|
|
+ if (subs.contains(port) || pubSubs.contains(port)) {
|
|
|
|
+ // Subs can either subscribe to topics or unsubscribe
|
|
/**
|
|
/**
|
|
* Topics, the SmartDevice is subscribed to
|
|
* Topics, the SmartDevice is subscribed to
|
|
*/
|
|
*/
|
|
- LinkedList<String> tops= subbedTopics.get(port);
|
|
|
|
-
|
|
|
|
|
|
+ LinkedList<String> tops = subbedTopics.get(port);
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * Topic which should be subscribed to
|
|
|
|
+ */
|
|
|
|
+ String newTopic = null;
|
|
|
|
+
|
|
/**
|
|
/**
|
|
- * Topic which should be subscribed or unsubscribed
|
|
|
|
|
|
+ * Check if FloatCollector & not subscribed
|
|
*/
|
|
*/
|
|
- String newTopic;
|
|
|
|
- //Subscribe if no subscriptions so far or not subscribed to all, with a probability of70%
|
|
|
|
- if(tops.size()==0||(tops.size()!=topics.size()&&Math.random()<0.7)){
|
|
|
|
- //Forced Subscribe
|
|
|
|
- LinkedList<String> available = new LinkedList<String>();
|
|
|
|
- available.addAll(topics);
|
|
|
|
- available.removeAll(tops);
|
|
|
|
-
|
|
|
|
- newTopic = available.get((int) Math.floor(Math.random()*available.size()));
|
|
|
|
- returnPackets.add(new MQTT_packet(MQTT_packet.SUBSCRIBE, timestep, port, broker, "topic:"+newTopic));
|
|
|
|
-
|
|
|
|
|
|
+ if (device instanceof FloatCollector) {
|
|
|
|
+ FloatCollector fCollector = (FloatCollector) device;
|
|
|
|
+ if (!tops.contains(fCollector.getFCinfoName()))
|
|
|
|
+ newTopic = fCollector.getFCinfoName();
|
|
|
|
+ }
|
|
|
|
+ /**
|
|
|
|
+ * Check if BoolCollector & not subscribed
|
|
|
|
+ */
|
|
|
|
+ if (device instanceof BoolCollector) {
|
|
|
|
+ BoolCollector bCollector = (BoolCollector) device;
|
|
|
|
+ if (!tops.contains(bCollector.getBCinfoName()))
|
|
|
|
+ newTopic = bCollector.getBCinfoName();
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ // Subscribe if no subscriptions so far or not subscribed to all,
|
|
|
|
+ // with a probability of70%
|
|
|
|
+ if (newTopic != null) {
|
|
|
|
+ /**
|
|
|
|
+ * Subscribe Request
|
|
|
|
+ */
|
|
|
|
+ returnPackets.add(new MQTT_packet(MQTT_packet.SUBSCRIBE, timestep, port, broker, "topic:" + newTopic));
|
|
|
|
+
|
|
/**
|
|
/**
|
|
* Delay to the broker
|
|
* Delay to the broker
|
|
*/
|
|
*/
|
|
long delayPortToBroker = port.getTransmissionDelayTo(broker);
|
|
long delayPortToBroker = port.getTransmissionDelayTo(broker);
|
|
-
|
|
|
|
- if(broker.getStatus()!=Port.CLOSED && delayPortToBroker!=Long.MAX_VALUE){
|
|
|
|
- timestep+=broker.getResponseTime();
|
|
|
|
- returnPackets.add(new MQTT_packet(MQTT_packet.SUBACK, timestep+delayPortToBroker, broker, port));
|
|
|
|
|
|
+
|
|
|
|
+ if (broker.getStatus() != Port.CLOSED && delayPortToBroker != Long.MAX_VALUE) {
|
|
|
|
+ timestep += broker.getResponseTime();
|
|
|
|
+ returnPackets.add(new MQTT_packet(MQTT_packet.SUBACK, timestep + delayPortToBroker, broker, port));
|
|
tops.add(newTopic);
|
|
tops.add(newTopic);
|
|
}
|
|
}
|
|
- }
|
|
|
|
- else if(tops.size()==topics.size()){
|
|
|
|
- //Forced Unsubscribe
|
|
|
|
- newTopic = tops.get((int) Math.floor(Math.random()*tops.size()));
|
|
|
|
- returnPackets.add(new MQTT_packet(MQTT_packet.UNSUBSCRIBE, timestep, port, broker, "topic:"+newTopic));
|
|
|
|
|
|
+ } else {
|
|
/**
|
|
/**
|
|
- * Delay to the broker
|
|
|
|
|
|
+ * Check if some topics should be unsubscribed (e.g., one topic
|
|
|
|
+ * name change)
|
|
*/
|
|
*/
|
|
- long delayPortToBroker = port.getTransmissionDelayTo(broker);
|
|
|
|
-
|
|
|
|
- if(broker.getStatus()!=Port.CLOSED && delayPortToBroker!=Long.MAX_VALUE){
|
|
|
|
- timestep+=broker.getResponseTime();
|
|
|
|
- returnPackets.add(new MQTT_packet(MQTT_packet.UNSUBACK, timestep+delayPortToBroker, broker, port));
|
|
|
|
- tops.remove(newTopic);
|
|
|
|
|
|
+ newTopic = null;
|
|
|
|
+ LinkedList<String> oldTopics = new LinkedList<String>(tops);
|
|
|
|
+ if (!oldTopics.isEmpty()) {
|
|
|
|
+ /**
|
|
|
|
+ * Check if FloatCollector & not subscribed
|
|
|
|
+ */
|
|
|
|
+ if (device instanceof FloatCollector) {
|
|
|
|
+ FloatCollector fCollector = (FloatCollector) device;
|
|
|
|
+ oldTopics.remove(fCollector.getFCinfoName());
|
|
|
|
+ }
|
|
|
|
+ /**
|
|
|
|
+ * Check if BoolCollector & not subscribed
|
|
|
|
+ */
|
|
|
|
+ if (device instanceof BoolCollector) {
|
|
|
|
+ BoolCollector bCollector = (BoolCollector) device;
|
|
|
|
+ oldTopics.remove(bCollector.getBCinfoName());
|
|
|
|
+ }
|
|
|
|
+ if (!oldTopics.isEmpty()) {
|
|
|
|
+ newTopic = oldTopics.getFirst();
|
|
|
|
+ /**
|
|
|
|
+ * Send Unsubscribe Packet
|
|
|
|
+ */
|
|
|
|
+ returnPackets.add(
|
|
|
|
+ new MQTT_packet(MQTT_packet.UNSUBSCRIBE, timestep, port, broker, "topic:" + newTopic));
|
|
|
|
+ /**
|
|
|
|
+ * Delay to the broker
|
|
|
|
+ */
|
|
|
|
+ long delayPortToBroker = port.getTransmissionDelayTo(broker);
|
|
|
|
+ /**
|
|
|
|
+ * Ackknowledgement
|
|
|
|
+ */
|
|
|
|
+ if (broker.getStatus() != Port.CLOSED && delayPortToBroker != Long.MAX_VALUE) {
|
|
|
|
+ timestep += broker.getResponseTime();
|
|
|
|
+ returnPackets.add(
|
|
|
|
+ new MQTT_packet(MQTT_packet.UNSUBACK, timestep + delayPortToBroker, broker, port));
|
|
|
|
+ tops.remove(newTopic);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
- if(pubs.contains(port)||pubSubs.contains(port)&&Math.random()<0.3){
|
|
|
|
- String newTopic = topics.get((int) Math.floor(Math.random()*topics.size()));
|
|
|
|
|
|
+
|
|
|
|
+ if (pubs.contains(port) || pubSubs.contains(port) && Math.random() < 0.3) {
|
|
/**
|
|
/**
|
|
- * Message to be published (Should be further specialized by a Sensor Device)
|
|
|
|
|
|
+ * Topic which should be published to
|
|
*/
|
|
*/
|
|
- String msg = "Topic:"+newTopic+":"+(Math.random()<0.5?"true":"false");
|
|
|
|
- returnPackets.add(new MQTT_packet(MQTT_packet.PUBLISH, timestep, port, broker, msg));
|
|
|
|
-
|
|
|
|
|
|
+ String newTopic = null;
|
|
/**
|
|
/**
|
|
- * Delay to the broker
|
|
|
|
|
|
+ * Value which should be published
|
|
*/
|
|
*/
|
|
- long delayPortToBroker = port.getTransmissionDelayTo(broker);
|
|
|
|
- //Publish to Subscribers
|
|
|
|
- //Should be be improved to just notify Subs that are subscribed to the topic
|
|
|
|
- if(broker.getStatus()!=Port.CLOSED && delayPortToBroker!=Long.MAX_VALUE){
|
|
|
|
- //Response
|
|
|
|
- timestep+=broker.getResponseTime()+delayPortToBroker;
|
|
|
|
- returnPackets.add(new MQTT_packet(MQTT_packet.PUBACK, timestep, broker, port));
|
|
|
|
-
|
|
|
|
- for(Port p:subs){
|
|
|
|
- if(!subbedTopics.get(p).contains(newTopic))continue;//Skip unsubbed ports
|
|
|
|
- /**
|
|
|
|
- * Delay broker to subscriber
|
|
|
|
- */
|
|
|
|
- long delayBrokerToSub = broker.getTransmissionDelayTo(p);
|
|
|
|
- timestep+=broker.getResponseTime();
|
|
|
|
- returnPackets.add(new MQTT_packet(MQTT_packet.PUBLISH, timestep, broker, p, msg));
|
|
|
|
- if(p.getStatus()!=Port.CLOSED && delayBrokerToSub != Long.MAX_VALUE)
|
|
|
|
- returnPackets.add(new MQTT_packet(MQTT_packet.PUBACK, timestep+p.getResponseTime(), p, broker));
|
|
|
|
|
|
+ String newValue = null;
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * Check if FloatSensor
|
|
|
|
+ */
|
|
|
|
+ if (device instanceof FloatSensor) {
|
|
|
|
+ FloatSensor fSensor = (FloatSensor) device;
|
|
|
|
+ newTopic = fSensor.getFSinfoName();
|
|
|
|
+ newValue = "" + fSensor.getFSval();
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * Check if BoolSensor - if also FloatSensor: 50% chance of
|
|
|
|
+ * overriding
|
|
|
|
+ */
|
|
|
|
+ if (device instanceof BoolSensor) {
|
|
|
|
+ BoolSensor bSensor = (BoolSensor) device;
|
|
|
|
+ if (newTopic == null || new Random().nextBoolean()) {
|
|
|
|
+ newTopic = bSensor.getBSinfoName();
|
|
|
|
+ newValue = "" + bSensor.getBSval();
|
|
}
|
|
}
|
|
- for(Port p:pubSubs){
|
|
|
|
- if(!subbedTopics.get(p).contains(newTopic))continue;//skip unsubbed ports
|
|
|
|
- timestep+=broker.getResponseTime();
|
|
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ if (newTopic != null) {
|
|
|
|
+ /**
|
|
|
|
+ * Message to be published
|
|
|
|
+ */
|
|
|
|
+ String msg = "Topic:" + newTopic + ":" + newValue;
|
|
|
|
+ /**
|
|
|
|
+ * Send Packet
|
|
|
|
+ */
|
|
|
|
+ returnPackets.add(new MQTT_packet(MQTT_packet.PUBLISH, timestep, port, broker, msg));
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * Delay to the broker
|
|
|
|
+ */
|
|
|
|
+ long delayPortToBroker = port.getTransmissionDelayTo(broker);
|
|
|
|
+
|
|
|
|
+ // Publish to Subscribers
|
|
|
|
+ if (broker.getStatus() != Port.CLOSED && delayPortToBroker != Long.MAX_VALUE) {
|
|
/**
|
|
/**
|
|
- * Delay broker to subscriber
|
|
|
|
|
|
+ * Response/Acknowledgement
|
|
*/
|
|
*/
|
|
- long delayBrokerToSub = broker.getTransmissionDelayTo(p);
|
|
|
|
- returnPackets.add(new MQTT_packet(MQTT_packet.PUBLISH, timestep, broker, p, msg));
|
|
|
|
- if(p.getStatus()!=Port.CLOSED && delayBrokerToSub != Long.MAX_VALUE)
|
|
|
|
- returnPackets.add(new MQTT_packet(MQTT_packet.PUBACK, timestep+p.getResponseTime(), p, broker));
|
|
|
|
|
|
+ timestep += broker.getResponseTime() + delayPortToBroker;
|
|
|
|
+ returnPackets.add(new MQTT_packet(MQTT_packet.PUBACK, timestep, broker, port));
|
|
|
|
+
|
|
|
|
+ for (Port p : subs) {
|
|
|
|
+ /**
|
|
|
|
+ * Skip unsubcribed ports
|
|
|
|
+ */
|
|
|
|
+ if (!subbedTopics.get(p).contains(newTopic))
|
|
|
|
+ continue;
|
|
|
|
+ /**
|
|
|
|
+ * Delay broker to subscriber
|
|
|
|
+ */
|
|
|
|
+ long delayBrokerToSub = broker.getTransmissionDelayTo(p);
|
|
|
|
+ timestep += broker.getResponseTime();
|
|
|
|
+
|
|
|
|
+ returnPackets.add(new MQTT_packet(MQTT_packet.PUBLISH, timestep, broker, p, msg));
|
|
|
|
+ if (p.getStatus() != Port.CLOSED && delayBrokerToSub != Long.MAX_VALUE) {
|
|
|
|
+ returnPackets.add(
|
|
|
|
+ new MQTT_packet(MQTT_packet.PUBACK, timestep + p.getResponseTime(), p, broker));
|
|
|
|
+ // Update Collector
|
|
|
|
+ if (device instanceof FloatSensor && p.getOwner() instanceof FloatCollector
|
|
|
|
+ && newTopic.equals(((FloatSensor) device).getFSinfoName())) {
|
|
|
|
+ ((FloatCollector) p.getOwner()).setFCval(((FloatSensor) device).getFSval());
|
|
|
|
+ } else if (device instanceof BoolSensor && p.getOwner() instanceof BoolCollector
|
|
|
|
+ && newTopic.equals(((BoolSensor) device).getBSinfoName())) {
|
|
|
|
+ ((BoolCollector) p.getOwner()).setBCval(((BoolSensor) device).getBSval());
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ for (Port p : pubSubs) {
|
|
|
|
+ /**
|
|
|
|
+ * Skip unsubscribed Ports
|
|
|
|
+ */
|
|
|
|
+ if (!subbedTopics.get(p).contains(newTopic) || p.getOwner() == null)
|
|
|
|
+ continue;
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * Delay broker to subscriber
|
|
|
|
+ */
|
|
|
|
+ long delayBrokerToSub = broker.getTransmissionDelayTo(p);
|
|
|
|
+ timestep += broker.getResponseTime();
|
|
|
|
+
|
|
|
|
+ returnPackets.add(new MQTT_packet(MQTT_packet.PUBLISH, timestep, broker, p, msg));
|
|
|
|
+ if (p.getStatus() != Port.CLOSED && delayBrokerToSub != Long.MAX_VALUE) {
|
|
|
|
+ returnPackets.add(
|
|
|
|
+ new MQTT_packet(MQTT_packet.PUBACK, timestep + p.getResponseTime(), p, broker));
|
|
|
|
+ // Update Collector
|
|
|
|
+ if (device instanceof FloatSensor && p.getOwner() instanceof FloatCollector
|
|
|
|
+ && newTopic.equals(((FloatSensor) device).getFSinfoName())) {
|
|
|
|
+ ((FloatCollector) p.getOwner()).setFCval(((FloatSensor) device).getFSval());
|
|
|
|
+ } else if (device instanceof BoolSensor && p.getOwner() instanceof BoolCollector
|
|
|
|
+ && newTopic.equals(((BoolSensor) device).getBSinfoName())) {
|
|
|
|
+ ((BoolCollector) p.getOwner()).setBCval(((BoolSensor) device).getBSval());
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ }
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
- if(Math.random() < 0.05 && port != broker){
|
|
|
|
|
|
+ /**
|
|
|
|
+ * Rare Ping request to broker
|
|
|
|
+ */
|
|
|
|
+ if (Math.random() < 0.05 && port != broker) {
|
|
returnPackets.add(new MQTT_packet(MQTT_packet.PINGREQ, timestep, port, broker));
|
|
returnPackets.add(new MQTT_packet(MQTT_packet.PINGREQ, timestep, port, broker));
|
|
/**
|
|
/**
|
|
* Delay broker to subscriber
|
|
* Delay broker to subscriber
|
|
*/
|
|
*/
|
|
long delayPortToBroker = port.getTransmissionDelayTo(broker);
|
|
long delayPortToBroker = port.getTransmissionDelayTo(broker);
|
|
- if(broker.getStatus()!=Port.CLOSED && delayPortToBroker != Long.MAX_VALUE)
|
|
|
|
- returnPackets.add(new MQTT_packet(MQTT_packet.PINGRESP, timestep+delayPortToBroker+broker.getResponseTime(), broker, port));
|
|
|
|
|
|
+ if (broker.getStatus() != Port.CLOSED && delayPortToBroker != Long.MAX_VALUE)
|
|
|
|
+ returnPackets.add(new MQTT_packet(MQTT_packet.PINGRESP,
|
|
|
|
+ timestep + delayPortToBroker + broker.getResponseTime(), broker, port));
|
|
}
|
|
}
|
|
return returnPackets;
|
|
return returnPackets;
|
|
}
|
|
}
|
|
@@ -279,9 +405,9 @@ public class MQTT_protocol implements Protocol {
|
|
@Override
|
|
@Override
|
|
public boolean addDeviceOfRole(Port device, int role) {
|
|
public boolean addDeviceOfRole(Port device, int role) {
|
|
/*
|
|
/*
|
|
- * First device has to be the Broker
|
|
|
|
|
|
+ * First device has to be the Broker
|
|
*/
|
|
*/
|
|
- if (broker==null) {
|
|
|
|
|
|
+ if (broker == null) {
|
|
if (role == 0) {
|
|
if (role == 0) {
|
|
broker = device;
|
|
broker = device;
|
|
updateBrokerOnDeletedConnections(null);
|
|
updateBrokerOnDeletedConnections(null);
|
|
@@ -292,7 +418,7 @@ public class MQTT_protocol implements Protocol {
|
|
}
|
|
}
|
|
switch (role) {
|
|
switch (role) {
|
|
case 0:
|
|
case 0:
|
|
- //Just one broker allowed.
|
|
|
|
|
|
+ // Just one broker allowed.
|
|
return false;
|
|
return false;
|
|
case 1:
|
|
case 1:
|
|
pubSubs.add(device);
|
|
pubSubs.add(device);
|
|
@@ -310,8 +436,8 @@ public class MQTT_protocol implements Protocol {
|
|
return false;
|
|
return false;
|
|
}
|
|
}
|
|
// Create packets for the connecting Client
|
|
// Create packets for the connecting Client
|
|
- currentPackets.add(new MQTT_packet(MQTT_packet.CONNECT, currentPackets.size()/2, device, broker));
|
|
|
|
- currentPackets.add(new MQTT_packet(MQTT_packet.CONNACK, currentPackets.size()/2+1, broker, device));
|
|
|
|
|
|
+ currentPackets.add(new MQTT_packet(MQTT_packet.CONNECT, currentPackets.size() / 2, device, broker));
|
|
|
|
+ currentPackets.add(new MQTT_packet(MQTT_packet.CONNACK, currentPackets.size() / 2 + 1, broker, device));
|
|
return true;
|
|
return true;
|
|
}
|
|
}
|
|
|
|
|
|
@@ -320,81 +446,79 @@ public class MQTT_protocol implements Protocol {
|
|
/**
|
|
/**
|
|
* true if the device was removed
|
|
* true if the device was removed
|
|
*/
|
|
*/
|
|
- boolean removedDevice=false;
|
|
|
|
- if (broker == device){
|
|
|
|
|
|
+ boolean removedDevice = false;
|
|
|
|
+ if (broker == device) {
|
|
broker = null;
|
|
broker = null;
|
|
- deletedConnectionLinks.add(new Pair<Port, Port>(new Port(device.getOwner(), (short)-1), deletedBrokerPort));
|
|
|
|
|
|
+ deletedConnectionLinks
|
|
|
|
+ .add(new Pair<Port, Port>(new Port(device.getOwner(), (short) -1), deletedBrokerPort));
|
|
updateBrokerOnDeletedConnections(device);
|
|
updateBrokerOnDeletedConnections(device);
|
|
}
|
|
}
|
|
- removedDevice|=pubSubs.remove(device);
|
|
|
|
- removedDevice|=subs.remove(device);
|
|
|
|
- removedDevice|=pubs.remove(device);
|
|
|
|
- //Remove Port from topics and clear its list
|
|
|
|
|
|
+ removedDevice |= pubSubs.remove(device);
|
|
|
|
+ removedDevice |= subs.remove(device);
|
|
|
|
+ removedDevice |= pubs.remove(device);
|
|
|
|
+ // Remove Port from topics and clear its list
|
|
LinkedList<String> oldTopics = subbedTopics.remove(device);
|
|
LinkedList<String> oldTopics = subbedTopics.remove(device);
|
|
- if(oldTopics!=null)oldTopics.clear();
|
|
|
|
- if(removedDevice){
|
|
|
|
- if(broker == null){
|
|
|
|
- deletedConnectionLinks.add(new Pair<Port, Port>(deletedBrokerPort,device));
|
|
|
|
- }else{
|
|
|
|
|
|
+ if (oldTopics != null)
|
|
|
|
+ oldTopics.clear();
|
|
|
|
+ if (removedDevice) {
|
|
|
|
+ if (broker == null) {
|
|
|
|
+ deletedConnectionLinks.add(new Pair<Port, Port>(deletedBrokerPort, device));
|
|
|
|
+ } else {
|
|
deletedConnectionLinks.add(new Pair<Port, Port>(broker, device));
|
|
deletedConnectionLinks.add(new Pair<Port, Port>(broker, device));
|
|
- //If not the broker and device was removed -> disconnect
|
|
|
|
- currentPackets.add(new MQTT_packet(MQTT_packet.DISCONNECT, currentPackets.size()/2, device, broker));
|
|
|
|
|
|
+ // If not the broker and device was removed -> disconnect
|
|
|
|
+ currentPackets.add(new MQTT_packet(MQTT_packet.DISCONNECT, currentPackets.size() / 2, device, broker));
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
-
|
|
|
|
- public void addTopic(String topic){
|
|
|
|
- topics.add(topic);
|
|
|
|
- }
|
|
|
|
|
|
|
|
@Override
|
|
@Override
|
|
public String getName() {
|
|
public String getName() {
|
|
return "MQTT";
|
|
return "MQTT";
|
|
}
|
|
}
|
|
-
|
|
|
|
|
|
+
|
|
@Override
|
|
@Override
|
|
- public int getRoleOfDevice(Port device){
|
|
|
|
- if(device == null)
|
|
|
|
|
|
+ public int getRoleOfDevice(Port device) {
|
|
|
|
+ if (device == null)
|
|
return -1;
|
|
return -1;
|
|
- if(device == broker)
|
|
|
|
|
|
+ if (device == broker)
|
|
return 0;
|
|
return 0;
|
|
- if(pubSubs.contains(device))
|
|
|
|
|
|
+ if (pubSubs.contains(device))
|
|
return 1;
|
|
return 1;
|
|
- if(pubs.contains(device))
|
|
|
|
|
|
+ if (pubs.contains(device))
|
|
return 2;
|
|
return 2;
|
|
- if(subs.contains(device))
|
|
|
|
|
|
+ if (subs.contains(device))
|
|
return 3;
|
|
return 3;
|
|
return -1;
|
|
return -1;
|
|
}
|
|
}
|
|
-
|
|
|
|
|
|
+
|
|
@Override
|
|
@Override
|
|
- public Collection<Port> getDevices(){
|
|
|
|
|
|
+ public Collection<Port> getDevices() {
|
|
LinkedList<Port> returnDevices = new LinkedList<Port>();
|
|
LinkedList<Port> returnDevices = new LinkedList<Port>();
|
|
- if(broker!=null)
|
|
|
|
|
|
+ if (broker != null)
|
|
returnDevices.add(broker);
|
|
returnDevices.add(broker);
|
|
returnDevices.addAll(pubSubs);
|
|
returnDevices.addAll(pubSubs);
|
|
returnDevices.addAll(pubs);
|
|
returnDevices.addAll(pubs);
|
|
returnDevices.addAll(subs);
|
|
returnDevices.addAll(subs);
|
|
return returnDevices;
|
|
return returnDevices;
|
|
}
|
|
}
|
|
-
|
|
|
|
|
|
+
|
|
@Override
|
|
@Override
|
|
public byte getTopologyType() {
|
|
public byte getTopologyType() {
|
|
return STAR;
|
|
return STAR;
|
|
}
|
|
}
|
|
-
|
|
|
|
|
|
+
|
|
@Override
|
|
@Override
|
|
- public Collection<Pair<Port,Port>> getTopology(){
|
|
|
|
- LinkedList<Pair<Port,Port>> topology = new LinkedList<Pair<Port,Port>>();
|
|
|
|
|
|
+ public Collection<Pair<Port, Port>> getTopology() {
|
|
|
|
+ LinkedList<Pair<Port, Port>> topology = new LinkedList<Pair<Port, Port>>();
|
|
Port center = broker;
|
|
Port center = broker;
|
|
calcDeletedBrokerPosition();
|
|
calcDeletedBrokerPosition();
|
|
- if(broker==null)
|
|
|
|
- center = new Port(deletedBroker, (short)-1);
|
|
|
|
- for(Port p: pubSubs)
|
|
|
|
|
|
+ if (broker == null)
|
|
|
|
+ center = new Port(deletedBroker, (short) -1);
|
|
|
|
+ for (Port p : pubSubs)
|
|
topology.add(new Pair<Port, Port>(center, p));
|
|
topology.add(new Pair<Port, Port>(center, p));
|
|
- for(Port p: pubs)
|
|
|
|
|
|
+ for (Port p : pubs)
|
|
topology.add(new Pair<Port, Port>(center, p));
|
|
topology.add(new Pair<Port, Port>(center, p));
|
|
- for(Port p: subs)
|
|
|
|
|
|
+ for (Port p : subs)
|
|
topology.add(new Pair<Port, Port>(center, p));
|
|
topology.add(new Pair<Port, Port>(center, p));
|
|
return topology;
|
|
return topology;
|
|
}
|
|
}
|
|
@@ -404,45 +528,46 @@ public class MQTT_protocol implements Protocol {
|
|
calcDeletedBrokerPosition();
|
|
calcDeletedBrokerPosition();
|
|
return deletedConnectionLinks;
|
|
return deletedConnectionLinks;
|
|
}
|
|
}
|
|
-
|
|
|
|
|
|
+
|
|
/**
|
|
/**
|
|
* Calculate and update the position of the deleted Broker
|
|
* Calculate and update the position of the deleted Broker
|
|
*/
|
|
*/
|
|
- private void calcDeletedBrokerPosition(){
|
|
|
|
- if(broker == null){
|
|
|
|
|
|
+ private void calcDeletedBrokerPosition() {
|
|
|
|
+ if (broker == null) {
|
|
int x = 0, y = 0, noP = 0;
|
|
int x = 0, y = 0, noP = 0;
|
|
- for(Port p: getDevices()){
|
|
|
|
- if(p!=null && p.getOwner()!=null){
|
|
|
|
|
|
+ for (Port p : getDevices()) {
|
|
|
|
+ if (p != null && p.getOwner() != null) {
|
|
x += p.getOwner().getX();
|
|
x += p.getOwner().getX();
|
|
y += p.getOwner().getY();
|
|
y += p.getOwner().getY();
|
|
noP++;
|
|
noP++;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
- if(noP==0)
|
|
|
|
|
|
+ if (noP == 0)
|
|
return;
|
|
return;
|
|
- deletedBroker.setX(((int)(x*1.0)/noP));
|
|
|
|
- deletedBroker.setY(((int)(y*1.0)/noP));
|
|
|
|
|
|
+ deletedBroker.setX(((int) (x * 1.0) / noP));
|
|
|
|
+ deletedBroker.setY(((int) (y * 1.0) / noP));
|
|
}
|
|
}
|
|
}
|
|
}
|
|
-
|
|
|
|
|
|
+
|
|
/**
|
|
/**
|
|
* Update the broker port on the deleted Connections
|
|
* Update the broker port on the deleted Connections
|
|
*
|
|
*
|
|
- * @param device old broker
|
|
|
|
|
|
+ * @param device
|
|
|
|
+ * old broker
|
|
*/
|
|
*/
|
|
- private void updateBrokerOnDeletedConnections(Port device){
|
|
|
|
- for(Pair<Port, Port> p: deletedConnectionLinks){
|
|
|
|
- if(broker == null){
|
|
|
|
- if(p.getLeft() == device)
|
|
|
|
|
|
+ private void updateBrokerOnDeletedConnections(Port device) {
|
|
|
|
+ for (Pair<Port, Port> p : deletedConnectionLinks) {
|
|
|
|
+ if (broker == null) {
|
|
|
|
+ if (p.getLeft() == device)
|
|
p.setLeft(deletedBrokerPort);
|
|
p.setLeft(deletedBrokerPort);
|
|
- if(p.getRight() == device)
|
|
|
|
|
|
+ if (p.getRight() == device)
|
|
p.setRight(deletedBrokerPort);
|
|
p.setRight(deletedBrokerPort);
|
|
- }else{
|
|
|
|
- if(p.getLeft() == deletedBrokerPort)
|
|
|
|
|
|
+ } else {
|
|
|
|
+ if (p.getLeft() == deletedBrokerPort)
|
|
p.setLeft(broker);
|
|
p.setLeft(broker);
|
|
- if(p.getRight() == deletedBrokerPort)
|
|
|
|
|
|
+ if (p.getRight() == deletedBrokerPort)
|
|
p.setRight(broker);
|
|
p.setRight(broker);
|
|
-
|
|
|
|
|
|
+
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|