package de.tu_darmstadt.tk.SmartHomeNetworkSim.core.protocols;

import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Random;

import org.hamcrest.core.IsAnything;

import de.tu_darmstadt.tk.SmartHomeNetworkSim.core.Packet;
import de.tu_darmstadt.tk.SmartHomeNetworkSim.core.Port;
import de.tu_darmstadt.tk.SmartHomeNetworkSim.core.Protocol;
import de.tu_darmstadt.tk.SmartHomeNetworkSim.core.SimulationManager;
import de.tu_darmstadt.tk.SmartHomeNetworkSim.core.SmartDevice;
import de.tu_darmstadt.tk.SmartHomeNetworkSim.core.devices.BoolCollector;
import de.tu_darmstadt.tk.SmartHomeNetworkSim.core.devices.BoolSensor;
import de.tu_darmstadt.tk.SmartHomeNetworkSim.core.devices.FloatCollector;
import de.tu_darmstadt.tk.SmartHomeNetworkSim.core.devices.FloatSensor;
import de.tu_darmstadt.tk.SmartHomeNetworkSim.core.protocols.packets.MQTT_packet;
import de.tu_darmstadt.tk.SmartHomeNetworkSim.core.protocols.packets.MQTTpublishPacket;
import de.tu_darmstadt.tk.SmartHomeNetworkSim.core.scheduler.AbstractEvent;
import de.tu_darmstadt.tk.SmartHomeNetworkSim.core.util.Pair;

/**
 * Implementation of the MQTT Protocol to generate packets for the simulation.
 * <p>
 * The protocol has a star topology: one broker port in the center and three
 * kinds of clients around it (publishers, subscribers and devices which are
 * both). No client can be added before a broker has been set.
 *
 * @author Andreas T. Meyer-Berg
 */
public class MQTT_protocol implements Protocol {

    /**
     * Broker which collects and distributes messages
     */
    private Port broker;

    /**
     * Publishers like sensors, which publish data
     */
    private LinkedList<Port> pubs;

    /**
     * Subscribers which subscribe to different topics
     */
    private LinkedList<Port> subs;

    /**
     * Topics subscribed by each Port
     */
    private HashMap<Port, LinkedList<String>> subbedTopics = new HashMap<Port, LinkedList<String>>();

    /**
     * Devices that are Publisher and Subscriber and therefore send and receive
     * messages
     */
    private LinkedList<Port> pubSubs;

    /**
     * Packets which are currently being generated or were generated after the
     * last generateNextPackets call
     */
    private LinkedList<Packet> currentPackets = new LinkedList<Packet>();

    /**
     * Connections which were deleted since the last generateNextPackets call
     * (for visualization in case of deleting the broker)
     */
    private LinkedList<Pair<Port, Port>> deletedConnectionLinks = new LinkedList<Pair<Port, Port>>();

    /**
     * Placeholder device which stands in for a deleted broker
     */
    private SmartDevice deletedBroker = new SmartDevice("DeletedBroker");

    /**
     * Placeholder port of the deleted broker
     */
    private Port deletedBrokerPort = new Port(deletedBroker, (short) -1);

    /**
     * Creates a new MQTT Protocol without a broker
     */
    public MQTT_protocol() {
        this.broker = null;
        initialize();
    }

    /**
     * Creates a new MQTT Protocol
     *
     * @param broker
     *            broker of the protocol
     */
    public MQTT_protocol(Port broker) {
        this.broker = broker;
        initialize();
    }

    /**
     * Initializes the different client lists
     */
    private void initialize() {
        subs = new LinkedList<Port>();
        pubs = new LinkedList<Port>();
        pubSubs = new LinkedList<Port>();
    }

    /**
     * Generates the packets for the given port. Packets queued by
     * {@link #addDeviceOfRole(Port, int)} and {@link #removeDevice(Port)} are
     * always returned first, with their timestamps shifted by
     * {@code timestep}; the deleted connection links are cleared on every
     * call.
     * <p>
     * Note: {@code timestep} is advanced locally whenever a broker response is
     * generated, so packets of later sections of the same call get later
     * timestamps.
     *
     * @param port
     *            port which is triggered, may be null
     * @param timestep
     *            time the port is triggered
     * @param packetLost
     *            not evaluated by this implementation
     * @return the generated packets, never null
     */
    @Override
    public Collection<Packet> generateNextPackets(Port port, long timestep, boolean packetLost) {
        // Packets which will be returned
        LinkedList<Packet> returnPackets = new LinkedList<Packet>();

        // Shift all previously created packets to the current time and hand
        // them over
        for (Packet p : currentPackets) {
            p.setTimestamp(p.getTimestamp() + timestep);
        }
        returnPackets.addAll(currentPackets);
        currentPackets.clear();

        // Clear deleted connections
        deletedConnectionLinks.clear();

        // Without a port or owner only the queued packets are returned
        if (port == null)
            return returnPackets;
        SmartDevice device = port.getOwner();
        if (device == null)
            return returnPackets;

        // Update the last time the port was triggered
        port.setLastTrigger(timestep);

        // Without a broker no new packets can be generated
        if (broker == null)
            return returnPackets;

        /*
         * Broker: does not send new messages on its own, but may ping a
         * randomly chosen participant
         */
        if (port == broker) {
            Collection<Port> devices = getDevices();
            devices.remove(port);
            Port dest = null;
            Iterator<Port> it = devices.iterator();
            for (int i = 0; i < (Math.random() * devices.size()) && it.hasNext(); i++) {
                dest = it.next();
            }
            if (dest != null) {
                returnPackets.add(new MQTT_packet(MQTT_packet.PINGREQ, timestep, port, dest));
                // Delay to the destination
                long delayBrokerDest = broker.getTransmissionDelayTo(dest);
                if (dest.getStatus() != Port.CLOSED && delayBrokerDest != Long.MAX_VALUE)
                    returnPackets.add(new MQTT_packet(MQTT_packet.PINGRESP,
                            timestep + delayBrokerDest + dest.getJitter() + dest.getResponseTime(), dest, port));
            }
        }

        /*
         * Subscribers: subscribe to the topic of their collector, or
         * unsubscribe from topics which are no longer needed
         */
        if (subs.contains(port) || pubSubs.contains(port)) {
            // Topics the port is subscribed to (created on demand, so an
            // inconsistent map cannot cause a NullPointerException)
            LinkedList<String> tops = subbedTopics.get(port);
            if (tops == null) {
                tops = new LinkedList<String>();
                subbedTopics.put(port, tops);
            }
            // Topic which should be subscribed to
            String newTopic = null;

            // FloatCollector which is not subscribed to its topic
            if (device instanceof FloatCollector) {
                FloatCollector fCollector = (FloatCollector) device;
                if (!tops.contains(fCollector.getFCinfoName()))
                    newTopic = fCollector.getFCinfoName();
            }
            // BoolCollector which is not subscribed to its topic (takes
            // precedence if both apply)
            if (device instanceof BoolCollector) {
                BoolCollector bCollector = (BoolCollector) device;
                if (!tops.contains(bCollector.getBCinfoName()))
                    newTopic = bCollector.getBCinfoName();
            }

            if (newTopic != null) {
                // Subscribe request
                returnPackets.add(
                        new MQTT_packet(MQTT_packet.SUBSCRIBE, timestep, port, broker, "topic:" + newTopic));
                // Delay to the broker
                long delayPortToBroker = port.getTransmissionDelayTo(broker);
                if (broker.getStatus() != Port.CLOSED && delayPortToBroker != Long.MAX_VALUE) {
                    timestep += broker.getResponseTime();
                    returnPackets
                            .add(new MQTT_packet(MQTT_packet.SUBACK, timestep + delayPortToBroker, broker, port));
                    tops.add(newTopic);
                }
            } else {
                // Check if some topic should be unsubscribed (e.g., after a
                // topic name change): every topic which is not the current
                // collector topic is outdated
                LinkedList<String> oldTopics = new LinkedList<String>(tops);
                if (!oldTopics.isEmpty()) {
                    if (device instanceof FloatCollector) {
                        FloatCollector fCollector = (FloatCollector) device;
                        oldTopics.remove(fCollector.getFCinfoName());
                    }
                    if (device instanceof BoolCollector) {
                        BoolCollector bCollector = (BoolCollector) device;
                        oldTopics.remove(bCollector.getBCinfoName());
                    }
                    if (!oldTopics.isEmpty()) {
                        newTopic = oldTopics.getFirst();
                        // Unsubscribe request
                        returnPackets.add(new MQTT_packet(MQTT_packet.UNSUBSCRIBE, timestep, port, broker,
                                "topic:" + newTopic));
                        // Delay to the broker
                        long delayPortToBroker = port.getTransmissionDelayTo(broker);
                        // Acknowledgement
                        if (broker.getStatus() != Port.CLOSED && delayPortToBroker != Long.MAX_VALUE) {
                            timestep += broker.getResponseTime();
                            returnPackets.add(new MQTT_packet(MQTT_packet.UNSUBACK, timestep + delayPortToBroker,
                                    broker, port));
                            tops.remove(newTopic);
                        }
                    }
                }
            }
        }

        /*
         * Publishers always publish; publisher-subscribers only with a
         * probability of 30% (this is the evaluation order of the original
         * unparenthesized condition, made explicit)
         */
        if (pubs.contains(port) || (pubSubs.contains(port) && Math.random() < 0.3)) {
            // Topic which should be published to
            String newTopic = null;
            // Value which should be published
            String newValue = null;
            boolean isBoolean = false;
            // True if the device label is -1; the published packets are then
            // labeled with 1
            boolean valueAnomaly = (device.getLabel() == (short) -1);

            if (device instanceof FloatSensor) {
                FloatSensor fSensor = (FloatSensor) device;
                newTopic = fSensor.getFSinfoName();
                newValue = "" + fSensor.getFSval();
            }
            // BoolSensor - if also FloatSensor: 50% chance of overriding
            if (device instanceof BoolSensor) {
                BoolSensor bSensor = (BoolSensor) device;
                if (newTopic == null || new Random().nextBoolean()) {
                    newTopic = bSensor.getBSinfoName();
                    newValue = "" + bSensor.getBSval();
                    isBoolean = true;
                }
            }

            if (newTopic != null) {
                // Publish packet: port -> broker
                returnPackets.add(createPublishPacket(timestep, port, broker, newTopic, newValue, isBoolean,
                        valueAnomaly));

                // Delay to the broker
                long delayPortToBroker = port.getTransmissionDelayTo(broker);

                // Acknowledge and forward to the subscribers
                if (broker.getStatus() != Port.CLOSED && delayPortToBroker != Long.MAX_VALUE) {
                    timestep += broker.getResponseTime() + delayPortToBroker;
                    returnPackets.add(new MQTT_packet(MQTT_packet.PUBACK, timestep, broker, port));

                    for (Port sub : subs) {
                        // Skip unsubscribed ports
                        LinkedList<String> subTopics = subbedTopics.get(sub);
                        if (subTopics == null || !subTopics.contains(newTopic))
                            continue;

                        // Delay broker to subscriber
                        long delayBrokerToSub = broker.getTransmissionDelayTo(sub);
                        timestep += broker.getResponseTime();

                        // Packet broker -> subscriber
                        returnPackets.add(createPublishPacket(timestep, broker, sub, newTopic, newValue, isBoolean,
                                valueAnomaly));

                        if (sub.getStatus() != Port.CLOSED && delayBrokerToSub != Long.MAX_VALUE) {
                            returnPackets.add(new MQTT_packet(MQTT_packet.PUBACK,
                                    timestep + sub.getResponseTime(), sub, broker));

                            // Update the collector via a scheduled event, so
                            // multiple parallel interactions stay in the
                            // right order
                            if (device instanceof FloatSensor && sub.getOwner() instanceof FloatCollector
                                    && newTopic.equals(((FloatSensor) device).getFSinfoName())) {
                                // Value at publish time - the sensor might
                                // change before the event is simulated
                                final float oldValue = ((FloatSensor) device).getFSval();
                                final Port receiver = sub;
                                SimulationManager
                                        .scheduleEvent(new AbstractEvent(timestep + sub.getResponseTime()) {
                                            @Override
                                            public void simulateEvent(long time) {
                                                ((FloatCollector) receiver.getOwner()).setFCval(oldValue);
                                            }
                                        });
                            } else if (device instanceof BoolSensor && sub.getOwner() instanceof BoolCollector
                                    && newTopic.equals(((BoolSensor) device).getBSinfoName())) {
                                // Value at publish time
                                final boolean oldValue = ((BoolSensor) device).getBSval();
                                final Port receiver = sub;
                                SimulationManager
                                        .scheduleEvent(new AbstractEvent(timestep + sub.getResponseTime()) {
                                            @Override
                                            public void simulateEvent(long time) {
                                                ((BoolCollector) receiver.getOwner()).setBCval(oldValue);
                                            }
                                        });
                            }
                        }
                    }

                    for (Port pubSub : pubSubs) {
                        // Skip unsubscribed ports and ports without owner
                        LinkedList<String> subTopics = subbedTopics.get(pubSub);
                        if (subTopics == null || !subTopics.contains(newTopic) || pubSub.getOwner() == null)
                            continue;

                        // Delay broker to subscriber
                        long delayBrokerToSub = broker.getTransmissionDelayTo(pubSub);
                        timestep += broker.getResponseTime();

                        // Packet broker -> subscriber
                        returnPackets.add(createPublishPacket(timestep, broker, pubSub, newTopic, newValue,
                                isBoolean, valueAnomaly));

                        if (pubSub.getStatus() != Port.CLOSED && delayBrokerToSub != Long.MAX_VALUE) {
                            returnPackets.add(new MQTT_packet(MQTT_packet.PUBACK,
                                    timestep + pubSub.getResponseTime(), pubSub, broker));

                            // Update the collector immediately (unlike plain
                            // subscribers, no event is scheduled here)
                            if (device instanceof FloatSensor && pubSub.getOwner() instanceof FloatCollector
                                    && newTopic.equals(((FloatSensor) device).getFSinfoName())) {
                                ((FloatCollector) pubSub.getOwner()).setFCval(((FloatSensor) device).getFSval());
                            } else if (device instanceof BoolSensor && pubSub.getOwner() instanceof BoolCollector
                                    && newTopic.equals(((BoolSensor) device).getBSinfoName())) {
                                ((BoolCollector) pubSub.getOwner()).setBCval(((BoolSensor) device).getBSval());
                            }
                        }
                    }
                }
            }
        }

        /*
         * Rare ping request of a client to the broker (5% chance)
         */
        if (Math.random() < 0.05 && port != broker) {
            returnPackets.add(new MQTT_packet(MQTT_packet.PINGREQ, timestep, port, broker));
            // Delay port to broker
            long delayPortToBroker = port.getTransmissionDelayTo(broker);
            if (broker.getStatus() != Port.CLOSED && delayPortToBroker != Long.MAX_VALUE)
                returnPackets.add(new MQTT_packet(MQTT_packet.PINGRESP,
                        timestep + delayPortToBroker + broker.getResponseTime(), broker, port));
        }
        return returnPackets;
    }

    /**
     * Creates a publish packet for the given value and labels it with 1 if the
     * value is an anomaly.
     *
     * @param timestamp
     *            timestamp of the packet
     * @param source
     *            sending port
     * @param destination
     *            receiving port
     * @param topic
     *            topic which is published to
     * @param value
     *            textual value, parsed as boolean or float
     * @param isBoolean
     *            true if the value is a boolean, false if it is a float
     * @param valueAnomaly
     *            true if the packet should be labeled with 1
     * @return the new publish packet
     */
    private Packet createPublishPacket(long timestamp, Port source, Port destination, String topic, String value,
            boolean isBoolean, boolean valueAnomaly) {
        Packet pubPacket;
        if (isBoolean) {
            pubPacket = new MQTTpublishPacket(timestamp, source, destination, topic, Boolean.parseBoolean(value));
        } else {
            pubPacket = new MQTTpublishPacket(timestamp, source, destination, topic, Float.parseFloat(value));
        }
        if (valueAnomaly) {
            pubPacket.setLabel((short) 1);
        }
        return pubPacket;
    }

    @Override
    public int getNumberOfRoles() {
        return 4;
    }

    @Override
    public String[] getRoles() {
        // PublisherSubscriber is Publisher as well as Subscriber
        return new String[] { "Broker", "PublisherSubscriber", "Publisher", "Subscriber" };
    }

    /**
     * Returns the ports of the given role (index into {@link #getRoles()}).
     * For the broker role a new list is returned, which is empty if no broker
     * is set; for the client roles the internal lists are returned.
     *
     * @param role
     *            index of the role
     * @return ports of the role, or null if the role is invalid
     */
    @Override
    public Collection<Port> getDevicesWithRole(int role) {
        switch (role) {
        case 0:
            // Do not return a list containing null if there is no broker
            if (broker == null)
                return new LinkedList<Port>();
            return new LinkedList<Port>(Arrays.asList(broker));
        case 1:
            return pubSubs;
        case 2:
            return pubs;
        case 3:
            return subs;
        default:
            return null;
        }
    }

    /**
     * Adds the port with the given role. As long as no broker exists, only a
     * broker (role 0) is accepted; afterwards no further broker is accepted.
     * For every added client a CONNECT and a CONNACK packet are queued.
     *
     * @param device
     *            port which should be added
     * @param role
     *            index of the role
     * @return true if the port was added
     */
    @Override
    public boolean addDeviceOfRole(Port device, int role) {
        // First device has to be the broker
        if (broker == null) {
            if (role == 0) {
                broker = device;
                updateBrokerOnDeletedConnections(null);
                return true;
            } else {
                return false;
            }
        }
        switch (role) {
        case 0:
            // Just one broker allowed.
            return false;
        case 1:
            pubSubs.add(device);
            subbedTopics.putIfAbsent(device, new LinkedList<String>());
            break;
        case 2:
            pubs.add(device);
            break;
        case 3:
            subs.add(device);
            subbedTopics.putIfAbsent(device, new LinkedList<String>());
            break;
        default:
            // invalid role
            return false;
        }
        // Create packets for the connecting client
        currentPackets.add(new MQTT_packet(MQTT_packet.CONNECT, currentPackets.size() / 2, device, broker));
        currentPackets.add(new MQTT_packet(MQTT_packet.CONNACK, currentPackets.size() / 2 + 1, broker, device));
        return true;
    }

    /**
     * Removes the port from the protocol. Removing the broker leaves the
     * clients in place and records a deleted link to the placeholder broker;
     * removing a client queues a DISCONNECT packet if a broker exists.
     *
     * @param device
     *            port which should be removed, null is ignored
     */
    @Override
    public void removeDevice(Port device) {
        // Nothing to remove - also avoids a NullPointerException in the
        // broker branch below if no broker is set
        if (device == null)
            return;

        // true if the device was removed as a client
        boolean removedDevice = false;
        if (broker == device) {
            broker = null;
            deletedConnectionLinks
                    .add(new Pair<Port, Port>(new Port(device.getOwner(), (short) -1), deletedBrokerPort));
            updateBrokerOnDeletedConnections(device);
        }
        removedDevice |= pubSubs.remove(device);
        removedDevice |= subs.remove(device);
        removedDevice |= pubs.remove(device);

        // Remove port from topics and clear its list
        LinkedList<String> oldTopics = subbedTopics.remove(device);
        if (oldTopics != null)
            oldTopics.clear();

        if (removedDevice) {
            if (broker == null) {
                deletedConnectionLinks.add(new Pair<Port, Port>(deletedBrokerPort, device));
            } else {
                deletedConnectionLinks.add(new Pair<Port, Port>(broker, device));
                // If not the broker and device was removed -> disconnect
                currentPackets
                        .add(new MQTT_packet(MQTT_packet.DISCONNECT, currentPackets.size() / 2, device, broker));
            }
        }
    }

    @Override
    public String getName() {
        return "MQTT";
    }

    /**
     * Returns the role index of the port. If a port is part of several lists,
     * the first match in the order broker, publisher-subscriber, publisher,
     * subscriber wins.
     *
     * @param device
     *            port which should be checked
     * @return role index, or -1 if the port is null or not part of the
     *         protocol
     */
    @Override
    public int getRoleOfDevice(Port device) {
        if (device == null)
            return -1;
        if (device == broker)
            return 0;
        if (pubSubs.contains(device))
            return 1;
        if (pubs.contains(device))
            return 2;
        if (subs.contains(device))
            return 3;
        return -1;
    }

    /**
     * Returns a new list of all ports: the broker (if set), followed by the
     * publisher-subscribers, publishers and subscribers.
     *
     * @return new list containing all ports of the protocol
     */
    @Override
    public Collection<Port> getDevices() {
        LinkedList<Port> returnDevices = new LinkedList<Port>();
        if (broker != null)
            returnDevices.add(broker);
        returnDevices.addAll(pubSubs);
        returnDevices.addAll(pubs);
        returnDevices.addAll(subs);
        return returnDevices;
    }

    @Override
    public byte getTopologyType() {
        return STAR;
    }

    /**
     * Returns one connection from the center to each client. The center is the
     * broker, or a placeholder port of the deleted broker if no broker is set.
     *
     * @return new list of connections
     */
    @Override
    public Collection<Pair<Port, Port>> getTopology() {
        LinkedList<Pair<Port, Port>> topology = new LinkedList<Pair<Port, Port>>();
        Port center = broker;
        calcDeletedBrokerPosition();
        if (broker == null)
            center = new Port(deletedBroker, (short) -1);
        for (Port p : pubSubs)
            topology.add(new Pair<Port, Port>(center, p));
        for (Port p : pubs)
            topology.add(new Pair<Port, Port>(center, p));
        for (Port p : subs)
            topology.add(new Pair<Port, Port>(center, p));
        return topology;
    }

    /**
     * Returns the internal list of connections which were deleted since the
     * last generateNextPackets call.
     *
     * @return deleted connections
     */
    @Override
    public Collection<Pair<Port, Port>> getDeletedTopology() {
        calcDeletedBrokerPosition();
        return deletedConnectionLinks;
    }

    /**
     * Calculate and update the position of the deleted broker: if no broker is
     * set, the placeholder is moved to the (integer) average position of all
     * ports which have an owner.
     */
    private void calcDeletedBrokerPosition() {
        if (broker == null) {
            int x = 0, y = 0, noP = 0;
            for (Port p : getDevices()) {
                if (p != null && p.getOwner() != null) {
                    x += p.getOwner().getX();
                    y += p.getOwner().getY();
                    noP++;
                }
            }
            if (noP == 0)
                return;
            // Integer division, same result as the former
            // "(int) (x * 1.0) / noP"
            deletedBroker.setX(x / noP);
            deletedBroker.setY(y / noP);
        }
    }

    /**
     * Update the broker port on the deleted connections: without a broker,
     * references to the old broker port are replaced by the placeholder port;
     * with a broker, the placeholder port is replaced by the broker.
     *
     * @param device
     *            old broker
     */
    private void updateBrokerOnDeletedConnections(Port device) {
        for (Pair<Port, Port> p : deletedConnectionLinks) {
            if (broker == null) {
                if (p.getLeft() == device)
                    p.setLeft(deletedBrokerPort);
                if (p.getRight() == device)
                    p.setRight(deletedBrokerPort);
            } else {
                if (p.getLeft() == deletedBrokerPort)
                    p.setLeft(broker);
                if (p.getRight() == deletedBrokerPort)
                    p.setRight(broker);
            }
        }
    }
}