|
@@ -136,8 +136,12 @@ public class MQTT_protocol implements Protocol {
|
|
}
|
|
}
|
|
if(dest != null){
|
|
if(dest != null){
|
|
returnPackets.add(new MQTT_packet(MQTT_packet.PINGREQ, timestep, port, dest));
|
|
returnPackets.add(new MQTT_packet(MQTT_packet.PINGREQ, timestep, port, dest));
|
|
- if(dest.getStatus()!=Port.CLOSED)
|
|
|
|
- returnPackets.add(new MQTT_packet(MQTT_packet.PINGRESP, timestep, dest, port));
|
|
|
|
|
|
+ /**
|
|
|
|
+ * Delay to the destination
|
|
|
|
+ */
|
|
|
|
+ long delayBrokerDest = broker.getTransmissionDelayTo(dest);
|
|
|
|
+ if(dest.getStatus()!=Port.CLOSED && delayBrokerDest!=Long.MAX_VALUE)
|
|
|
|
+ returnPackets.add(new MQTT_packet(MQTT_packet.PINGRESP, timestep+delayBrokerDest+dest.getJitter()+dest.getResponseTime(), dest, port));
|
|
}
|
|
}
|
|
}
|
|
}
|
|
if(subs.contains(port)||pubSubs.contains(port)){
|
|
if(subs.contains(port)||pubSubs.contains(port)){
|
|
@@ -162,9 +166,14 @@ public class MQTT_protocol implements Protocol {
|
|
newTopic = available.get((int) Math.floor(Math.random()*available.size()));
|
|
newTopic = available.get((int) Math.floor(Math.random()*available.size()));
|
|
returnPackets.add(new MQTT_packet(MQTT_packet.SUBSCRIBE, timestep, port, broker, "topic:"+newTopic));
|
|
returnPackets.add(new MQTT_packet(MQTT_packet.SUBSCRIBE, timestep, port, broker, "topic:"+newTopic));
|
|
|
|
|
|
- if(broker.getStatus()!=Port.CLOSED){
|
|
|
|
|
|
+ /**
|
|
|
|
+ * Delay to the broker
|
|
|
|
+ */
|
|
|
|
+ long delayPortToBroker = port.getTransmissionDelayTo(broker);
|
|
|
|
+
|
|
|
|
+ if(broker.getStatus()!=Port.CLOSED && delayPortToBroker!=Long.MAX_VALUE){
|
|
timestep+=broker.getResponseTime();
|
|
timestep+=broker.getResponseTime();
|
|
- returnPackets.add(new MQTT_packet(MQTT_packet.SUBACK, timestep, broker, port));
|
|
|
|
|
|
+ returnPackets.add(new MQTT_packet(MQTT_packet.SUBACK, timestep+delayPortToBroker, broker, port));
|
|
tops.add(newTopic);
|
|
tops.add(newTopic);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
@@ -172,9 +181,14 @@ public class MQTT_protocol implements Protocol {
|
|
//Forced Unsubscribe
|
|
//Forced Unsubscribe
|
|
newTopic = tops.get((int) Math.floor(Math.random()*tops.size()));
|
|
newTopic = tops.get((int) Math.floor(Math.random()*tops.size()));
|
|
returnPackets.add(new MQTT_packet(MQTT_packet.UNSUBSCRIBE, timestep, port, broker, "topic:"+newTopic));
|
|
returnPackets.add(new MQTT_packet(MQTT_packet.UNSUBSCRIBE, timestep, port, broker, "topic:"+newTopic));
|
|
- if(broker.getStatus()!=Port.CLOSED){
|
|
|
|
|
|
+ /**
|
|
|
|
+ * Delay to the broker
|
|
|
|
+ */
|
|
|
|
+ long delayPortToBroker = port.getTransmissionDelayTo(broker);
|
|
|
|
+
|
|
|
|
+ if(broker.getStatus()!=Port.CLOSED && delayPortToBroker!=Long.MAX_VALUE){
|
|
timestep+=broker.getResponseTime();
|
|
timestep+=broker.getResponseTime();
|
|
- returnPackets.add(new MQTT_packet(MQTT_packet.UNSUBACK, timestep, broker, port));
|
|
|
|
|
|
+ returnPackets.add(new MQTT_packet(MQTT_packet.UNSUBACK, timestep+delayPortToBroker, broker, port));
|
|
tops.remove(newTopic);
|
|
tops.remove(newTopic);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
@@ -186,31 +200,50 @@ public class MQTT_protocol implements Protocol {
|
|
*/
|
|
*/
|
|
String msg = "Topic:"+newTopic+":"+(Math.random()<0.5?"true":"false");
|
|
String msg = "Topic:"+newTopic+":"+(Math.random()<0.5?"true":"false");
|
|
returnPackets.add(new MQTT_packet(MQTT_packet.PUBLISH, timestep, port, broker, msg));
|
|
returnPackets.add(new MQTT_packet(MQTT_packet.PUBLISH, timestep, port, broker, msg));
|
|
- //Response
|
|
|
|
- timestep+=broker.getResponseTime();
|
|
|
|
- returnPackets.add(new MQTT_packet(MQTT_packet.PUBACK, timestep, broker, port));
|
|
|
|
|
|
|
|
|
|
+ /**
|
|
|
|
+ * Delay to the broker
|
|
|
|
+ */
|
|
|
|
+ long delayPortToBroker = port.getTransmissionDelayTo(broker);
|
|
//Publish to Subscribers
|
|
//Publish to Subscribers
|
|
//Should be be improved to just notify Subs that are subscribed to the topic
|
|
//Should be be improved to just notify Subs that are subscribed to the topic
|
|
- if(broker.getStatus()!=Port.CLOSED){
|
|
|
|
|
|
+ if(broker.getStatus()!=Port.CLOSED && delayPortToBroker!=Long.MAX_VALUE){
|
|
|
|
+ //Response
|
|
|
|
+ timestep+=broker.getResponseTime()+delayPortToBroker;
|
|
|
|
+ returnPackets.add(new MQTT_packet(MQTT_packet.PUBACK, timestep, broker, port));
|
|
|
|
+
|
|
for(Port p:subs){
|
|
for(Port p:subs){
|
|
if(!subbedTopics.get(p).contains(newTopic))continue;//Skip unsubbed ports
|
|
if(!subbedTopics.get(p).contains(newTopic))continue;//Skip unsubbed ports
|
|
|
|
+ /**
|
|
|
|
+ * Delay broker to subscriber
|
|
|
|
+ */
|
|
|
|
+ long delayBrokerToSub = broker.getTransmissionDelayTo(p);
|
|
timestep+=broker.getResponseTime();
|
|
timestep+=broker.getResponseTime();
|
|
returnPackets.add(new MQTT_packet(MQTT_packet.PUBLISH, timestep, broker, p, msg));
|
|
returnPackets.add(new MQTT_packet(MQTT_packet.PUBLISH, timestep, broker, p, msg));
|
|
- returnPackets.add(new MQTT_packet(MQTT_packet.PUBACK, timestep+p.getResponseTime(), p, broker));
|
|
|
|
|
|
+ if(p.getStatus()!=Port.CLOSED && delayBrokerToSub != Long.MAX_VALUE)
|
|
|
|
+ returnPackets.add(new MQTT_packet(MQTT_packet.PUBACK, timestep+p.getResponseTime(), p, broker));
|
|
}
|
|
}
|
|
for(Port p:pubSubs){
|
|
for(Port p:pubSubs){
|
|
if(!subbedTopics.get(p).contains(newTopic))continue;//skip unsubbed ports
|
|
if(!subbedTopics.get(p).contains(newTopic))continue;//skip unsubbed ports
|
|
timestep+=broker.getResponseTime();
|
|
timestep+=broker.getResponseTime();
|
|
|
|
+ /**
|
|
|
|
+ * Delay broker to subscriber
|
|
|
|
+ */
|
|
|
|
+ long delayBrokerToSub = broker.getTransmissionDelayTo(p);
|
|
returnPackets.add(new MQTT_packet(MQTT_packet.PUBLISH, timestep, broker, p, msg));
|
|
returnPackets.add(new MQTT_packet(MQTT_packet.PUBLISH, timestep, broker, p, msg));
|
|
- returnPackets.add(new MQTT_packet(MQTT_packet.PUBACK, timestep+p.getResponseTime(), p, broker));
|
|
|
|
|
|
+ if(p.getStatus()!=Port.CLOSED && delayBrokerToSub != Long.MAX_VALUE)
|
|
|
|
+ returnPackets.add(new MQTT_packet(MQTT_packet.PUBACK, timestep+p.getResponseTime(), p, broker));
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
if(Math.random() < 0.05 && port != broker){
|
|
if(Math.random() < 0.05 && port != broker){
|
|
returnPackets.add(new MQTT_packet(MQTT_packet.PINGREQ, timestep, port, broker));
|
|
returnPackets.add(new MQTT_packet(MQTT_packet.PINGREQ, timestep, port, broker));
|
|
- if(port.getStatus()!=Port.CLOSED)
|
|
|
|
- returnPackets.add(new MQTT_packet(MQTT_packet.PINGRESP, timestep, broker, port));
|
|
|
|
|
|
+ /**
|
|
|
|
+ * Delay broker to subscriber
|
|
|
|
+ */
|
|
|
|
+ long delayPortToBroker = port.getTransmissionDelayTo(broker);
|
|
|
|
+ if(broker.getStatus()!=Port.CLOSED && delayPortToBroker != Long.MAX_VALUE)
|
|
|
|
+ returnPackets.add(new MQTT_packet(MQTT_packet.PINGRESP, timestep+delayPortToBroker+broker.getResponseTime(), broker, port));
|
|
}
|
|
}
|
|
return returnPackets;
|
|
return returnPackets;
|
|
}
|
|
}
|