Browse Source

Adds MQTT subscribed topics and improves subscription/publish packets

Andreas T. Meyer-Berg 5 years ago
parent
commit
1f6e6a4b8f

+ 56 - 7
src/main/java/de/tu_darmstadt/tk/SmartHomeNetworkSim/core/protocols/MQTT_protocol.java

@@ -2,6 +2,7 @@ package de.tu_darmstadt.tk.SmartHomeNetworkSim.core.protocols;
 
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.LinkedList;
 
 import de.tu_darmstadt.tk.SmartHomeNetworkSim.core.Packet;
@@ -31,6 +32,11 @@ public class MQTT_protocol implements Protocol {
 	 * Subscriber which subscribe to different Topics
 	 */
 	private LinkedList<Port> subs;
+	
+	/**
+	 * Topics subscribed by each Port
+	 */
+	private HashMap<Port,LinkedList<String>> subbedTopics = new HashMap<Port,LinkedList<String>>();
 
 	/**
 	 * Devices that are Publisher and Subscriber and therefore send and receive
@@ -85,11 +91,51 @@ public class MQTT_protocol implements Protocol {
 		 */
 		if(port==broker){
 			//Broker should not send new packages, without new messages
-		}else if(pubs.contains(port)){
+		}else if(subs.contains(port)||pubSubs.contains(port)){
+			
+			//Subs can either subscribe to topics or unsubscribe
+			/**
+			 * Topics, the SmartDevice is subscribed to
+			 */
+			LinkedList<String> tops= subbedTopics.get(port);
+			
+			/**
+			 * Topic which should be subscribed or unsubscribed
+			 */
+			String newTopic;
+			//Subsribe if no subscriptions so far or not subscribed to all, with a probability of70%
+			if(tops.size()==0||(tops.size()!=subbedTopics.size()&&Math.random()<0.7)){
+				//Forced Subscribe
+				newTopic = topics.get((int) Math.floor(Math.random()*topics.size()));
+				returnPackets.add(new MQTT_packet(MQTT_packet.SUBSCRIBE, timestep, port, broker, "topic:"+newTopic));
+				timestep+=broker.getResponseTime();
+				returnPackets.add(new MQTT_packet(MQTT_packet.SUBACK, timestep, broker, port));
+				tops.add(newTopic);
+			}
+			else if(tops.size()==subbedTopics.size()){
+				//Forced Unsubscribe
+				newTopic = tops.get((int) Math.floor(Math.random()*tops.size()));
+				returnPackets.add(new MQTT_packet(MQTT_packet.UNSUBSCRIBE, timestep, port, broker, "topic:"+newTopic));
+				timestep+=broker.getResponseTime();
+				returnPackets.add(new MQTT_packet(MQTT_packet.UNSUBACK, timestep, broker, port));
+				tops.remove(newTopic);
+			}
+			
+			if(pubSubs.contains(port)&&Math.random()<0.3){
+				//When also Pub, publish sometimes 
+				newTopic = topics.get((int) Math.floor(Math.random()*topics.size()));
+				returnPackets.add(new MQTT_packet(MQTT_packet.SUBSCRIBE, timestep, port, broker, "Topic:"+newTopic));
+				timestep+=broker.getResponseTime();
+				returnPackets.add(new MQTT_packet(MQTT_packet.SUBACK, timestep, broker, port));
+			}
+			
+		}
+		if(pubs.contains(port)||pubSubs.contains(port)&&Math.random()<0.3){
+			String newTopic = topics.get((int) Math.floor(Math.random()*topics.size()));
 			/**
 			 * Message to be published (Should be further specialized by a Sensor Device) 
 			 */
-			String msg = "Topic: DoorState:"+(Math.random()<0.5?"true":"false");
+			String msg = "Topic:"+newTopic+":"+(Math.random()<0.5?"true":"false");
 			returnPackets.add(new MQTT_packet(MQTT_packet.PUBLISH, timestep, port, broker, msg));
 			//Response
 			timestep+=broker.getResponseTime();
@@ -98,21 +144,19 @@ public class MQTT_protocol implements Protocol {
 			//Publish to Subscribers
 			//Should be be improved to just notify Subs that are subscribed to the topic
 			for(Port p:subs){
+				if(!subbedTopics.get(p).contains(newTopic))continue;//Skip unsubbed ports
 				timestep+=broker.getResponseTime();
 				returnPackets.add(new MQTT_packet(MQTT_packet.PUBLISH, timestep, broker, p, msg));
 				returnPackets.add(new MQTT_packet(MQTT_packet.PUBACK, timestep+p.getResponseTime(), p, broker));
 			}
 			for(Port p:pubSubs){
+				if(!subbedTopics.get(p).contains(newTopic))continue;//skip unsubbed ports
 				timestep+=broker.getResponseTime();
 				returnPackets.add(new MQTT_packet(MQTT_packet.PUBLISH, timestep, broker, p, msg));
 				returnPackets.add(new MQTT_packet(MQTT_packet.PUBACK, timestep+p.getResponseTime(), p, broker));
 			}
-		}else if(subs.contains(port)){
-			returnPackets.add(new MQTT_packet(MQTT_packet.SUBSCRIBE, timestep, port, broker, "Topic: DoorState"));
-			timestep+=broker.getResponseTime();
-			returnPackets.add(new MQTT_packet(MQTT_packet.SUBACK, timestep, broker, port));
 		}
-		return null;
+		return returnPackets;
 	}
 
 	@Override
@@ -162,12 +206,15 @@ public class MQTT_protocol implements Protocol {
 			return false;
 		case 1:
 			pubSubs.add(device);
+
+			subbedTopics.putIfAbsent(device, new LinkedList<String>());
 			break;
 		case 2:
 			pubs.add(device);
 			break;
 		case 3:
 			subs.add(device);
+			subbedTopics.putIfAbsent(device, new LinkedList<String>());
 			break;
 		default:
 			// invalid role
@@ -191,6 +238,8 @@ public class MQTT_protocol implements Protocol {
 		removedDevice|=pubSubs.remove(device);
 		removedDevice|=subs.remove(device);
 		removedDevice|=pubs.remove(device);
+		//Remove Port from topics and clear its list
+		subbedTopics.remove(device).clear();
 		if(removedDevice && broker!=null){
 			//If not the broker and device was removed -> disconnect
 			currentPackets.add(new MQTT_packet(MQTT_packet.DISCONNECT, currentPackets.size()/2, device, broker));

+ 1 - 1
src/main/java/de/tu_darmstadt/tk/SmartHomeNetworkSim/core/protocols/packets/MQTT_packet.java

@@ -26,7 +26,7 @@ public class MQTT_packet extends Packet {
 	/**
 	 * Reserved (forbidden)
 	 */
-	public static final byte RESERVED = 0x00;
+	public static final byte RESERVED = (byte) 0x00;
 	/**
 	 * Client request to connect to Server (Client->Server)
 	 */

+ 0 - 1
src/main/java/de/tu_darmstadt/tk/SmartHomeNetworkSim/core/simpleImplementation/SimplePacket.java

@@ -2,7 +2,6 @@ package de.tu_darmstadt.tk.SmartHomeNetworkSim.core.simpleImplementation;
 
 import de.tu_darmstadt.tk.SmartHomeNetworkSim.core.Packet;
 import de.tu_darmstadt.tk.SmartHomeNetworkSim.core.Port;
-import de.tu_darmstadt.tk.SmartHomeNetworkSim.core.SmartDevice;
 
 /**
  * Simple dummy packet for testing purposes