|
@@ -21,22 +21,98 @@ public class MQTT_protocol implements Protocol {
|
|
* Broker which collects and distributes messages
|
|
* Broker which collects and distributes messages
|
|
*/
|
|
*/
|
|
private Port broker;
|
|
private Port broker;
|
|
|
|
+
|
|
/**
|
|
/**
|
|
* Publishers like sensors, which publish data
|
|
* Publishers like sensors, which publish data
|
|
*/
|
|
*/
|
|
private LinkedList<Port> pubs;
|
|
private LinkedList<Port> pubs;
|
|
|
|
+
|
|
/**
|
|
/**
|
|
* Subscriber which subscribe to different Topics
|
|
* Subscriber which subscribe to different Topics
|
|
*/
|
|
*/
|
|
private LinkedList<Port> subs;
|
|
private LinkedList<Port> subs;
|
|
|
|
+
|
|
/**
|
|
/**
|
|
- * Devices that are Publisher and Subscriber and therefore send and receive messages
|
|
|
|
|
|
+ * Devices that are Publisher and Subscriber and therefore send and receive
|
|
|
|
+ * messages
|
|
*/
|
|
*/
|
|
private LinkedList<Port> pubSubs;
|
|
private LinkedList<Port> pubSubs;
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * Packets which are currently being generated or were generated after the
|
|
|
|
+ * last generatePackets Call
|
|
|
|
+ */
|
|
|
|
+ private LinkedList<Packet> currentPackets = new LinkedList<Packet>();
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * Topics of the MQTT Broker
|
|
|
|
+ */
|
|
|
|
+ private LinkedList<String> topics = new LinkedList<String>();
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * Creates a new MQTT Protocol
|
|
|
|
+ *
|
|
|
|
+ * @param broker broker of the protocol
|
|
|
|
+ */
|
|
|
|
+ public MQTT_protocol(Port broker) {
|
|
|
|
+ this.broker = broker;
|
|
|
|
+ topics.add("temperature");
|
|
|
|
+ topics.add("doorState");
|
|
|
|
+ }
|
|
|
|
+
|
|
@Override
|
|
@Override
|
|
- public Packet generateNextPaket(Port port, long timestep, boolean packetLost) {
|
|
|
|
|
|
+ public Collection<Packet> generateNextPakets(Port port, long timestep, boolean packetLost) {
|
|
|
|
+ /**
|
|
|
|
+ * Packets which will be generated
|
|
|
|
+ */
|
|
|
|
+ LinkedList<Packet> returnPackets = new LinkedList<Packet>();
|
|
|
|
+ //Update all Timestamps of previous created Packets
|
|
|
|
+ for(Packet p: currentPackets){
|
|
|
|
+ p.setTimestamp(p.getTimestamp()+timestep);
|
|
|
|
+ }
|
|
|
|
+ //Add these packets to the return
|
|
|
|
+ returnPackets.addAll(currentPackets);
|
|
|
|
+ //remove packets from the old list
|
|
|
|
+ currentPackets.clear();
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * Update the lastTime the port was triggered
|
|
|
|
+ */
|
|
port.setLastTrigger(timestep);
|
|
port.setLastTrigger(timestep);
|
|
- return new MQTT_packet(timestep);
|
|
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * Generate new Packets regarding to their class
|
|
|
|
+ */
|
|
|
|
+ if(port==broker){
|
|
|
|
+ //Broker should not send new packages, without new messages
|
|
|
|
+ }else if(pubs.contains(port)){
|
|
|
|
+ /**
|
|
|
|
+ * Message to be published (Should be further specialized by a Sensor Device)
|
|
|
|
+ */
|
|
|
|
+ String msg = "Topic: DoorState:"+(Math.random()<0.5?"true":"false");
|
|
|
|
+ returnPackets.add(new MQTT_packet(MQTT_packet.PUBLISH, timestep, port, broker, msg));
|
|
|
|
+ //Response
|
|
|
|
+ timestep+=broker.getResponseTime();
|
|
|
|
+ returnPackets.add(new MQTT_packet(MQTT_packet.PUBACK, timestep, broker, port));
|
|
|
|
+
|
|
|
|
+ //Publish to Subscribers
|
|
|
|
+ //Should be be improved to just notify Subs that are subscribed to the topic
|
|
|
|
+ for(Port p:subs){
|
|
|
|
+ timestep+=broker.getResponseTime();
|
|
|
|
+ returnPackets.add(new MQTT_packet(MQTT_packet.PUBLISH, timestep, broker, p, msg));
|
|
|
|
+ returnPackets.add(new MQTT_packet(MQTT_packet.PUBACK, timestep+p.getResponseTime(), p, broker));
|
|
|
|
+ }
|
|
|
|
+ for(Port p:pubSubs){
|
|
|
|
+ timestep+=broker.getResponseTime();
|
|
|
|
+ returnPackets.add(new MQTT_packet(MQTT_packet.PUBLISH, timestep, broker, p, msg));
|
|
|
|
+ returnPackets.add(new MQTT_packet(MQTT_packet.PUBACK, timestep+p.getResponseTime(), p, broker));
|
|
|
|
+ }
|
|
|
|
+ }else if(subs.contains(port)){
|
|
|
|
+ returnPackets.add(new MQTT_packet(MQTT_packet.SUBSCRIBE, timestep, port, broker, "Topic: DoorState"));
|
|
|
|
+ timestep+=broker.getResponseTime();
|
|
|
|
+ returnPackets.add(new MQTT_packet(MQTT_packet.SUBACK, timestep, broker, port));
|
|
|
|
+ }
|
|
|
|
+ return null;
|
|
}
|
|
}
|
|
|
|
|
|
@Override
|
|
@Override
|
|
@@ -46,8 +122,8 @@ public class MQTT_protocol implements Protocol {
|
|
|
|
|
|
@Override
|
|
@Override
|
|
public String[] getRoles() {
|
|
public String[] getRoles() {
|
|
- //PublisherSubscriber is Publisher as well as Subscriber
|
|
|
|
- return new String[]{"Broker", "PublisherSubsriber","Publisher","Subscriber"};
|
|
|
|
|
|
+ // PublisherSubscriber is Publisher as well as Subscriber
|
|
|
|
+ return new String[] { "Broker", "PublisherSubsriber", "Publisher", "Subscriber" };
|
|
}
|
|
}
|
|
|
|
|
|
@Override
|
|
@Override
|
|
@@ -69,33 +145,60 @@ public class MQTT_protocol implements Protocol {
|
|
|
|
|
|
@Override
|
|
@Override
|
|
public boolean addDeviceOfRole(Port device, int role) {
|
|
public boolean addDeviceOfRole(Port device, int role) {
|
|
- switch (role) {
|
|
|
|
- case 0:
|
|
|
|
- if(broker != null)
|
|
|
|
- return false;
|
|
|
|
- else{
|
|
|
|
|
|
+ /*
|
|
|
|
+ * First device has to be the Broker
|
|
|
|
+ */
|
|
|
|
+ if (broker==null) {
|
|
|
|
+ if (role == 0) {
|
|
broker = device;
|
|
broker = device;
|
|
|
|
+ return true;
|
|
|
|
+ } else {
|
|
|
|
+ return false;
|
|
}
|
|
}
|
|
- break;
|
|
|
|
|
|
+ }
|
|
|
|
+ switch (role) {
|
|
|
|
+ case 0:
|
|
|
|
+ //Just one broker allowed.
|
|
|
|
+ return false;
|
|
case 1:
|
|
case 1:
|
|
pubSubs.add(device);
|
|
pubSubs.add(device);
|
|
|
|
+ break;
|
|
case 2:
|
|
case 2:
|
|
pubs.add(device);
|
|
pubs.add(device);
|
|
|
|
+ break;
|
|
case 3:
|
|
case 3:
|
|
subs.add(device);
|
|
subs.add(device);
|
|
|
|
+ break;
|
|
default:
|
|
default:
|
|
|
|
+ // invalid role
|
|
return false;
|
|
return false;
|
|
}
|
|
}
|
|
|
|
+ // Create packets for the connecting Client
|
|
|
|
+ currentPackets.add(new MQTT_packet(MQTT_packet.CONNECT, currentPackets.size()/2, device, broker));
|
|
|
|
+ currentPackets.add(new MQTT_packet(MQTT_packet.CONNACK, currentPackets.size()/2+1, broker, device));
|
|
return true;
|
|
return true;
|
|
}
|
|
}
|
|
|
|
|
|
@Override
|
|
@Override
|
|
public void removeDevice(Port device) {
|
|
public void removeDevice(Port device) {
|
|
- if(broker == device)
|
|
|
|
|
|
+ /**
|
|
|
|
+ * true if the device was removed
|
|
|
|
+ */
|
|
|
|
+ boolean removedDevice=false;
|
|
|
|
+ if (broker == device){
|
|
broker = null;
|
|
broker = null;
|
|
- pubSubs.remove(device);
|
|
|
|
- subs.remove(device);
|
|
|
|
- pubs.remove(device);
|
|
|
|
|
|
+ }
|
|
|
|
+ removedDevice|=pubSubs.remove(device);
|
|
|
|
+ removedDevice|=subs.remove(device);
|
|
|
|
+ removedDevice|=pubs.remove(device);
|
|
|
|
+ if(removedDevice && broker!=null){
|
|
|
|
+ //If not the broker and device was removed -> disconnect
|
|
|
|
+ currentPackets.add(new MQTT_packet(MQTT_packet.DISCONNECT, currentPackets.size()/2, device, broker));
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ public void addTopic(String topic){
|
|
|
|
+ topics.add(topic);
|
|
}
|
|
}
|
|
|
|
|
|
}
|
|
}
|