瀏覽代碼

Adds MQTT Ping packages & removes responses if dest port is closed

Andreas T. Meyer-Berg 6 年之前
父節點
當前提交
529dfe2d4f
共有 1 個文件被更改,包括 46 次插入20 次删除
  1. 46 20
      src/main/java/de/tu_darmstadt/tk/SmartHomeNetworkSim/core/protocols/MQTT_protocol.java

+ 46 - 20
src/main/java/de/tu_darmstadt/tk/SmartHomeNetworkSim/core/protocols/MQTT_protocol.java

@@ -126,6 +126,19 @@ public class MQTT_protocol implements Protocol {
 		 */
 		if(port==broker){
 			//Broker should not send new packages, without new messages
+			//But could perform some Ping request to random participants
+			Collection<Port> devices = getDevices();
+			devices.remove(port);
+			Port dest = null;
+			java.util.Iterator<Port> it = devices.iterator();
+			for(int i=0; i<(Math.random()*devices.size())&&it.hasNext();i++){
+				dest = it.next();
+			}
+			if(dest != null){
+				returnPackets.add(new MQTT_packet(MQTT_packet.PINGREQ, timestep, port, dest));
+				if(dest.getStatus()!=Port.CLOSED)
+					returnPackets.add(new MQTT_packet(MQTT_packet.PINGREQ, timestep, dest, port));
+			}
 		}else if(subs.contains(port)||pubSubs.contains(port)){
 			
 			//Subs can either subscribe to topics or unsubscribe
@@ -143,27 +156,32 @@ public class MQTT_protocol implements Protocol {
 				//Forced Subscribe
 				newTopic = topics.get((int) Math.floor(Math.random()*topics.size()));
 				returnPackets.add(new MQTT_packet(MQTT_packet.SUBSCRIBE, timestep, port, broker, "topic:"+newTopic));
-				timestep+=broker.getResponseTime();
-				returnPackets.add(new MQTT_packet(MQTT_packet.SUBACK, timestep, broker, port));
-				tops.add(newTopic);
+				if(broker.getStatus()!=Port.CLOSED){
+					timestep+=broker.getResponseTime();
+					returnPackets.add(new MQTT_packet(MQTT_packet.SUBACK, timestep, broker, port));
+					tops.add(newTopic);
+				}
 			}
 			else if(tops.size()==subbedTopics.size()){
 				//Forced Unsubscribe
 				newTopic = tops.get((int) Math.floor(Math.random()*tops.size()));
 				returnPackets.add(new MQTT_packet(MQTT_packet.UNSUBSCRIBE, timestep, port, broker, "topic:"+newTopic));
-				timestep+=broker.getResponseTime();
-				returnPackets.add(new MQTT_packet(MQTT_packet.UNSUBACK, timestep, broker, port));
-				tops.remove(newTopic);
+				if(broker.getStatus()!=Port.CLOSED){
+					timestep+=broker.getResponseTime();
+					returnPackets.add(new MQTT_packet(MQTT_packet.UNSUBACK, timestep, broker, port));
+					tops.remove(newTopic);
+				}
 			}
 			
 			if(pubSubs.contains(port)&&Math.random()<0.3){
 				//When also Pub, publish sometimes 
 				newTopic = topics.get((int) Math.floor(Math.random()*topics.size()));
 				returnPackets.add(new MQTT_packet(MQTT_packet.SUBSCRIBE, timestep, port, broker, "Topic:"+newTopic));
-				timestep+=broker.getResponseTime();
-				returnPackets.add(new MQTT_packet(MQTT_packet.SUBACK, timestep, broker, port));
+				if(broker.getStatus()!=Port.CLOSED){
+					timestep+=broker.getResponseTime();
+					returnPackets.add(new MQTT_packet(MQTT_packet.SUBACK, timestep, broker, port));
+				}
 			}
-			
 		}
 		if(pubs.contains(port)||pubSubs.contains(port)&&Math.random()<0.3){
 			String newTopic = topics.get((int) Math.floor(Math.random()*topics.size()));
@@ -178,19 +196,27 @@ public class MQTT_protocol implements Protocol {
 			
 			//Publish to Subscribers
 			//Should be be improved to just notify Subs that are subscribed to the topic
-			for(Port p:subs){
-				if(!subbedTopics.get(p).contains(newTopic))continue;//Skip unsubbed ports
-				timestep+=broker.getResponseTime();
-				returnPackets.add(new MQTT_packet(MQTT_packet.PUBLISH, timestep, broker, p, msg));
-				returnPackets.add(new MQTT_packet(MQTT_packet.PUBACK, timestep+p.getResponseTime(), p, broker));
-			}
-			for(Port p:pubSubs){
-				if(!subbedTopics.get(p).contains(newTopic))continue;//skip unsubbed ports
-				timestep+=broker.getResponseTime();
-				returnPackets.add(new MQTT_packet(MQTT_packet.PUBLISH, timestep, broker, p, msg));
-				returnPackets.add(new MQTT_packet(MQTT_packet.PUBACK, timestep+p.getResponseTime(), p, broker));
+			if(broker.getStatus()!=Port.CLOSED){
+				for(Port p:subs){
+					if(!subbedTopics.get(p).contains(newTopic))continue;//Skip unsubbed ports
+					timestep+=broker.getResponseTime();
+					returnPackets.add(new MQTT_packet(MQTT_packet.PUBLISH, timestep, broker, p, msg));
+					
+					returnPackets.add(new MQTT_packet(MQTT_packet.PUBACK, timestep+p.getResponseTime(), p, broker));
+				}
+				for(Port p:pubSubs){
+					if(!subbedTopics.get(p).contains(newTopic))continue;//skip unsubbed ports
+					timestep+=broker.getResponseTime();
+					returnPackets.add(new MQTT_packet(MQTT_packet.PUBLISH, timestep, broker, p, msg));
+					returnPackets.add(new MQTT_packet(MQTT_packet.PUBACK, timestep+p.getResponseTime(), p, broker));
+				}
 			}
 		}
+		if(Math.random() < 0.05 && port != broker){
+			returnPackets.add(new MQTT_packet(MQTT_packet.PINGREQ, timestep, port, broker));
+			if(port.getStatus()!=Port.CLOSED)
+				returnPackets.add(new MQTT_packet(MQTT_packet.PINGREQ, timestep, broker, port));
+		}
 		return returnPackets;
 	}