|
@@ -139,7 +139,8 @@ public class MQTT_protocol implements Protocol {
|
|
if(dest.getStatus()!=Port.CLOSED)
|
|
if(dest.getStatus()!=Port.CLOSED)
|
|
returnPackets.add(new MQTT_packet(MQTT_packet.PINGREQ, timestep, dest, port));
|
|
returnPackets.add(new MQTT_packet(MQTT_packet.PINGREQ, timestep, dest, port));
|
|
}
|
|
}
|
|
- }else if(subs.contains(port)||pubSubs.contains(port)){
|
|
|
|
|
|
+ }
|
|
|
|
+ if(subs.contains(port)||pubSubs.contains(port)){
|
|
|
|
|
|
//Subs can either subscribe to topics or unsubscribe
|
|
//Subs can either subscribe to topics or unsubscribe
|
|
/**
|
|
/**
|
|
@@ -152,17 +153,22 @@ public class MQTT_protocol implements Protocol {
|
|
*/
|
|
*/
|
|
String newTopic;
|
|
String newTopic;
|
|
//Subscribe if no subscriptions so far or not subscribed to all, with a probability of70%
|
|
//Subscribe if no subscriptions so far or not subscribed to all, with a probability of70%
|
|
- if(tops.size()==0||(tops.size()!=subbedTopics.size()&&Math.random()<0.7)){
|
|
|
|
|
|
+ if(tops.size()==0||(tops.size()!=topics.size()&&Math.random()<0.7)){
|
|
//Forced Subscribe
|
|
//Forced Subscribe
|
|
- newTopic = topics.get((int) Math.floor(Math.random()*topics.size()));
|
|
|
|
|
|
+ LinkedList<String> available = new LinkedList<String>();
|
|
|
|
+ available.addAll(topics);
|
|
|
|
+ available.removeAll(tops);
|
|
|
|
+
|
|
|
|
+ newTopic = available.get((int) Math.floor(Math.random()*available.size()));
|
|
returnPackets.add(new MQTT_packet(MQTT_packet.SUBSCRIBE, timestep, port, broker, "topic:"+newTopic));
|
|
returnPackets.add(new MQTT_packet(MQTT_packet.SUBSCRIBE, timestep, port, broker, "topic:"+newTopic));
|
|
|
|
+
|
|
if(broker.getStatus()!=Port.CLOSED){
|
|
if(broker.getStatus()!=Port.CLOSED){
|
|
timestep+=broker.getResponseTime();
|
|
timestep+=broker.getResponseTime();
|
|
returnPackets.add(new MQTT_packet(MQTT_packet.SUBACK, timestep, broker, port));
|
|
returnPackets.add(new MQTT_packet(MQTT_packet.SUBACK, timestep, broker, port));
|
|
tops.add(newTopic);
|
|
tops.add(newTopic);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
- else if(tops.size()==subbedTopics.size()){
|
|
|
|
|
|
+ else if(tops.size()==topics.size()){
|
|
//Forced Unsubscribe
|
|
//Forced Unsubscribe
|
|
newTopic = tops.get((int) Math.floor(Math.random()*tops.size()));
|
|
newTopic = tops.get((int) Math.floor(Math.random()*tops.size()));
|
|
returnPackets.add(new MQTT_packet(MQTT_packet.UNSUBSCRIBE, timestep, port, broker, "topic:"+newTopic));
|
|
returnPackets.add(new MQTT_packet(MQTT_packet.UNSUBSCRIBE, timestep, port, broker, "topic:"+newTopic));
|
|
@@ -172,16 +178,6 @@ public class MQTT_protocol implements Protocol {
|
|
tops.remove(newTopic);
|
|
tops.remove(newTopic);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
-
|
|
|
|
- if(pubSubs.contains(port)&&Math.random()<0.3){
|
|
|
|
- //When also Pub, publish sometimes
|
|
|
|
- newTopic = topics.get((int) Math.floor(Math.random()*topics.size()));
|
|
|
|
- returnPackets.add(new MQTT_packet(MQTT_packet.SUBSCRIBE, timestep, port, broker, "Topic:"+newTopic));
|
|
|
|
- if(broker.getStatus()!=Port.CLOSED){
|
|
|
|
- timestep+=broker.getResponseTime();
|
|
|
|
- returnPackets.add(new MQTT_packet(MQTT_packet.SUBACK, timestep, broker, port));
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
}
|
|
}
|
|
if(pubs.contains(port)||pubSubs.contains(port)&&Math.random()<0.3){
|
|
if(pubs.contains(port)||pubSubs.contains(port)&&Math.random()<0.3){
|
|
String newTopic = topics.get((int) Math.floor(Math.random()*topics.size()));
|
|
String newTopic = topics.get((int) Math.floor(Math.random()*topics.size()));
|
|
@@ -201,7 +197,6 @@ public class MQTT_protocol implements Protocol {
|
|
if(!subbedTopics.get(p).contains(newTopic))continue;//Skip unsubbed ports
|
|
if(!subbedTopics.get(p).contains(newTopic))continue;//Skip unsubbed ports
|
|
timestep+=broker.getResponseTime();
|
|
timestep+=broker.getResponseTime();
|
|
returnPackets.add(new MQTT_packet(MQTT_packet.PUBLISH, timestep, broker, p, msg));
|
|
returnPackets.add(new MQTT_packet(MQTT_packet.PUBLISH, timestep, broker, p, msg));
|
|
-
|
|
|
|
returnPackets.add(new MQTT_packet(MQTT_packet.PUBACK, timestep+p.getResponseTime(), p, broker));
|
|
returnPackets.add(new MQTT_packet(MQTT_packet.PUBACK, timestep+p.getResponseTime(), p, broker));
|
|
}
|
|
}
|
|
for(Port p:pubSubs){
|
|
for(Port p:pubSubs){
|