123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635 |
- package de.tu_darmstadt.tk.SmartHomeNetworkSim.core.protocols;
- import java.util.Arrays;
- import java.util.Collection;
- import java.util.HashMap;
- import java.util.Iterator;
- import java.util.LinkedList;
- import java.util.Random;
- import de.tu_darmstadt.tk.SmartHomeNetworkSim.core.Packet;
- import de.tu_darmstadt.tk.SmartHomeNetworkSim.core.Port;
- import de.tu_darmstadt.tk.SmartHomeNetworkSim.core.Protocol;
- import de.tu_darmstadt.tk.SmartHomeNetworkSim.core.SimulationManager;
- import de.tu_darmstadt.tk.SmartHomeNetworkSim.core.SmartDevice;
- import de.tu_darmstadt.tk.SmartHomeNetworkSim.core.devices.BoolCollector;
- import de.tu_darmstadt.tk.SmartHomeNetworkSim.core.devices.BoolSensor;
- import de.tu_darmstadt.tk.SmartHomeNetworkSim.core.devices.FloatCollector;
- import de.tu_darmstadt.tk.SmartHomeNetworkSim.core.devices.FloatSensor;
- import de.tu_darmstadt.tk.SmartHomeNetworkSim.core.protocols.packets.MQTT_packet;
- import de.tu_darmstadt.tk.SmartHomeNetworkSim.core.protocols.packets.MQTTpublishPacket;
- import de.tu_darmstadt.tk.SmartHomeNetworkSim.core.scheduler.AbstractEvent;
- import de.tu_darmstadt.tk.SmartHomeNetworkSim.core.util.Pair;
- /**
- * Implementation of the MQTT Protocol to generate packets for the simulation
- *
- *
- * @author Andreas T. Meyer-Berg
- */
- public class MQTT_protocol implements Protocol {
- /**
- * Broker which collects and distributes messages
- */
- private Port broker;
- /**
- * Publishers like sensors, which publish data
- */
- private LinkedList<Port> pubs;
- /**
- * Subscriber which subscribe to different Topics
- */
- private LinkedList<Port> subs;
- /**
- * Topics subscribed by each Port
- */
- private HashMap<Port, LinkedList<String>> subbedTopics = new HashMap<Port, LinkedList<String>>();
- /**
- * Devices that are Publisher and Subscriber and therefore send and receive
- * messages
- */
- private LinkedList<Port> pubSubs;
- /**
- * Packets which are currently being generated or were generated after the
- * last generatePackets Call
- */
- private LinkedList<Packet> currentPackets = new LinkedList<Packet>();
- // For Visualization in case of deleting the broker
- private LinkedList<Pair<Port, Port>> deletedConnectionLinks = new LinkedList<Pair<Port, Port>>();
- private SmartDevice deletedBroker = new SmartDevice("DeletedBroker");
- private Port deletedBrokerPort = new Port(deletedBroker, (short) -1);
- /**
- * Topics of the MQTT Broker
- */
- // private LinkedList<String> topics = new LinkedList<String>();
/**
 * Creates a new MQTT protocol without a broker. The broker has to be added
 * later as the first device (role 0).
 */
public MQTT_protocol() {
    // Same effect as setting the broker to null and initializing the lists
    this((Port) null);
}
/**
 * Creates a new MQTT protocol around the given broker.
 *
 * @param broker
 *            port of the broker of this protocol, may be {@code null}
 */
public MQTT_protocol(Port broker) {
    // initialize() only creates the role lists and does not touch the broker
    initialize();
    this.broker = broker;
}
- /**
- * Initializes the different fields
- */
- private void initialize() {
- subs = new LinkedList<Port>();
- pubs = new LinkedList<Port>();
- pubSubs = new LinkedList<Port>();
- }
- @Override
- public Collection<Packet> generateNextPackets(Port port, long timestep, boolean packetLost) {
- /**
- * Packets which will be generated
- */
- LinkedList<Packet> returnPackets = new LinkedList<Packet>();
- // Update all Timestamps of previous created Packets
- for (Packet p : currentPackets) {
- p.setTimestamp(p.getTimestamp() + timestep);
- }
- // Add these packets to the return
- returnPackets.addAll(currentPackets);
- // remove packets from the old list
- currentPackets.clear();
- // Clear deleted connections
- deletedConnectionLinks.clear();
- // Return termination packets
- if (port == null)
- return returnPackets;
- SmartDevice device = port.getOwner();
- if (device == null)
- return returnPackets;
- /**
- * Update the lastTime the port was triggered
- */
- port.setLastTrigger(timestep);
- /**
- * if port null, skip this step
- */
- if (broker == null)
- return returnPackets;
- /**
- * Generate new Packets regarding to their class
- */
- if (port == broker) {
- // Broker should not send new packages, without new messages
- // But could perform some Ping request to random participants
- Collection<Port> devices = getDevices();
- devices.remove(port);
- Port dest = null;
- Iterator<Port> it = devices.iterator();
- for (int i = 0; i < (Math.random() * devices.size()) && it.hasNext(); i++) {
- dest = it.next();
- }
- if (dest != null) {
- returnPackets.add(new MQTT_packet(MQTT_packet.PINGREQ, timestep, port, dest));
- /**
- * Delay to the destination
- */
- long delayBrokerDest = broker.getTransmissionDelayTo(dest);
- if (dest.getStatus() != Port.CLOSED && delayBrokerDest != Long.MAX_VALUE)
- returnPackets.add(new MQTT_packet(MQTT_packet.PINGRESP,
- timestep + delayBrokerDest + dest.getJitter() + dest.getResponseTime(), dest, port));
- }
- }
- if (subs.contains(port) || pubSubs.contains(port)) {
- // Subs can either subscribe to topics or unsubscribe
- /**
- * Topics, the SmartDevice is subscribed to
- */
- LinkedList<String> tops = subbedTopics.get(port);
- /**
- * Topic which should be subscribed to
- */
- String newTopic = null;
-
- /**
- * Check if FloatCollector & not subscribed
- */
- if (device instanceof FloatCollector) {
- FloatCollector fCollector = (FloatCollector) device;
- if (!tops.contains(fCollector.getFCinfoName()))
- newTopic = fCollector.getFCinfoName();
- }
- /**
- * Check if BoolCollector & not subscribed
- */
- if (device instanceof BoolCollector) {
- BoolCollector bCollector = (BoolCollector) device;
- if (!tops.contains(bCollector.getBCinfoName()))
- newTopic = bCollector.getBCinfoName();
- }
- // Subscribe if no subscriptions so far or not subscribed to all,
- // with a probability of70%
- if (newTopic != null) {
- /**
- * Subscribe Request
- */
- returnPackets.add(new MQTT_packet(MQTT_packet.SUBSCRIBE, timestep, port, broker, "topic:" + newTopic));
- /**
- * Delay to the broker
- */
- long delayPortToBroker = port.getTransmissionDelayTo(broker);
- if (broker.getStatus() != Port.CLOSED && delayPortToBroker != Long.MAX_VALUE) {
- timestep += broker.getResponseTime();
- returnPackets.add(new MQTT_packet(MQTT_packet.SUBACK, timestep + delayPortToBroker, broker, port));
- tops.add(newTopic);
- }
- } else {
- /**
- * Check if some topics should be unsubscribed (e.g., one topic
- * name change)
- */
- newTopic = null;
- LinkedList<String> oldTopics = new LinkedList<String>(tops);
- if (!oldTopics.isEmpty()) {
- /**
- * Check if FloatCollector & not subscribed
- */
- if (device instanceof FloatCollector) {
- FloatCollector fCollector = (FloatCollector) device;
- oldTopics.remove(fCollector.getFCinfoName());
- }
- /**
- * Check if BoolCollector & not subscribed
- */
- if (device instanceof BoolCollector) {
- BoolCollector bCollector = (BoolCollector) device;
- oldTopics.remove(bCollector.getBCinfoName());
- }
- if (!oldTopics.isEmpty()) {
- newTopic = oldTopics.getFirst();
- /**
- * Send Unsubscribe Packet
- */
- returnPackets.add(
- new MQTT_packet(MQTT_packet.UNSUBSCRIBE, timestep, port, broker, "topic:" + newTopic));
- /**
- * Delay to the broker
- */
- long delayPortToBroker = port.getTransmissionDelayTo(broker);
- /**
- * Ackknowledgement
- */
- if (broker.getStatus() != Port.CLOSED && delayPortToBroker != Long.MAX_VALUE) {
- timestep += broker.getResponseTime();
- returnPackets.add(
- new MQTT_packet(MQTT_packet.UNSUBACK, timestep + delayPortToBroker, broker, port));
- tops.remove(newTopic);
- }
- }
- }
- }
- }
- if (pubs.contains(port) || pubSubs.contains(port) && Math.random() < 0.3) {
- /**
- * Topic which should be published to
- */
- String newTopic = null;
- /**
- * Value which should be published
- */
- String newValue = null;
- boolean isBoolean = false;
- /**
- * True if value anomaly (value = -1)
- */
- boolean valueAnomaly = (device.getLabel()==(short)-1);
- /**
- * Check if FloatSensor
- */
- if (device instanceof FloatSensor) {
- FloatSensor fSensor = (FloatSensor) device;
- newTopic = fSensor.getFSinfoName();
- newValue = "" + fSensor.getFSval();
- }
- /**
- * Check if BoolSensor - if also FloatSensor: 50% chance of
- * overriding
- */
- if (device instanceof BoolSensor) {
- BoolSensor bSensor = (BoolSensor) device;
- if (newTopic == null || new Random().nextBoolean()) {
- newTopic = bSensor.getBSinfoName();
- newValue = "" + bSensor.getBSval();
- isBoolean = true;
- }
- }
- if (newTopic != null) {
- /**
- * Packet for publishing the new value
- */
- Packet pubPacket = null;
- if(isBoolean) {
- pubPacket = new MQTTpublishPacket(timestep, port, broker, newTopic, Boolean.parseBoolean(newValue));
- }else {
- pubPacket = new MQTTpublishPacket(timestep, port, broker, newTopic, Float.parseFloat(newValue));
- }
- if(valueAnomaly) {
- pubPacket.setLabel((short) 1);
- }
- /**
- * Send Packet
- */
- returnPackets.add(pubPacket);
- /**
- * Delay to the broker
- */
- long delayPortToBroker = port.getTransmissionDelayTo(broker);
- // Publish to Subscribers
- if (broker.getStatus() != Port.CLOSED && delayPortToBroker != Long.MAX_VALUE) {
- /**
- * Response/Acknowledgement
- */
- timestep += broker.getResponseTime() + delayPortToBroker;
- returnPackets.add(new MQTT_packet(MQTT_packet.PUBACK, timestep, broker, port));
- for (Port p : subs) {
- /**
- * Skip unsubcribed ports
- */
- if (!subbedTopics.get(p).contains(newTopic))
- continue;
- /**
- * Delay broker to subscriber
- */
- long delayBrokerToSub = broker.getTransmissionDelayTo(p);
- timestep += broker.getResponseTime();
- /**
- * Packet Broker -> Subscriber
- */
- if(isBoolean) {
- pubPacket = new MQTTpublishPacket(timestep, broker, p, newTopic, Boolean.parseBoolean(newValue));
- }else {
- pubPacket = new MQTTpublishPacket(timestep, broker, p, newTopic, Float.parseFloat(newValue));
- }
- if(valueAnomaly) {
- pubPacket.setLabel((short) 1);
- }
- returnPackets.add(pubPacket);
- if (p.getStatus() != Port.CLOSED && delayBrokerToSub != Long.MAX_VALUE) {
- returnPackets.add(
- new MQTT_packet(MQTT_packet.PUBACK, timestep + p.getResponseTime(), p, broker));
- // Update Collector
- if (device instanceof FloatSensor && p.getOwner() instanceof FloatCollector
- && newTopic.equals(((FloatSensor) device).getFSinfoName())) {
- /**
- * original Float Value -> if it might change during the two events
- */
- float oldValue = ((FloatSensor) device).getFSval();
- /**
- * Schedule Event to update the sensor
- * -> therefore multiple parallel interactions would be in the right order
- */
- SimulationManager.scheduleEvent(new AbstractEvent(timestep + p.getResponseTime()) {
- @Override
- public void simulateEvent(long time) {
- ((FloatCollector) p.getOwner()).setFCval(oldValue);
- }
- });
- } else if (device instanceof BoolSensor && p.getOwner() instanceof BoolCollector
- && newTopic.equals(((BoolSensor) device).getBSinfoName())) {
- /**
- * Original sensor value
- */
- boolean oldValue = ((BoolSensor) device).getBSval();
- /**
- * Schedule Event to update the sensor
- * -> therefore multiple parallel interactions would be in the right order
- */
- SimulationManager.scheduleEvent(new AbstractEvent(timestep + p.getResponseTime()) {
- @Override
- public void simulateEvent(long time) {
- ((BoolCollector) p.getOwner()).setBCval(oldValue);
- }
- });
- }
- }
- }
- for (Port p : pubSubs) {
- /**
- * Skip unsubscribed Ports
- */
- if (!subbedTopics.get(p).contains(newTopic) || p.getOwner() == null)
- continue;
- /**
- * Delay broker to subscriber
- */
- long delayBrokerToSub = broker.getTransmissionDelayTo(p);
- timestep += broker.getResponseTime();
-
- if(isBoolean) {
- pubPacket = new MQTTpublishPacket(timestep, broker, p, newTopic, Boolean.parseBoolean(newValue));
- }else {
- pubPacket = new MQTTpublishPacket(timestep, broker, p, newTopic, Float.parseFloat(newValue));
- }
- if(valueAnomaly) {
- pubPacket.setLabel((short) 1);
- }
- returnPackets.add(pubPacket);
- if (p.getStatus() != Port.CLOSED && delayBrokerToSub != Long.MAX_VALUE) {
- returnPackets.add(
- new MQTT_packet(MQTT_packet.PUBACK, timestep + p.getResponseTime(), p, broker));
- // Update Collector
- if (device instanceof FloatSensor && p.getOwner() instanceof FloatCollector
- && newTopic.equals(((FloatSensor) device).getFSinfoName())) {
- ((FloatCollector) p.getOwner()).setFCval(((FloatSensor) device).getFSval());
- } else if (device instanceof BoolSensor && p.getOwner() instanceof BoolCollector
- && newTopic.equals(((BoolSensor) device).getBSinfoName())) {
- ((BoolCollector) p.getOwner()).setBCval(((BoolSensor) device).getBSval());
- }
- }
- }
- }
- }
- }
- /**
- * Rare Ping request to broker
- */
- if (Math.random() < 0.05 && port != broker) {
- returnPackets.add(new MQTT_packet(MQTT_packet.PINGREQ, timestep, port, broker));
- /**
- * Delay broker to subscriber
- */
- long delayPortToBroker = port.getTransmissionDelayTo(broker);
- if (broker.getStatus() != Port.CLOSED && delayPortToBroker != Long.MAX_VALUE)
- returnPackets.add(new MQTT_packet(MQTT_packet.PINGRESP,
- timestep + delayPortToBroker + broker.getResponseTime(), broker, port));
- }
- return returnPackets;
- }
- @Override
- public int getNumberOfRoles() {
- return 4;
- }
- @Override
- public String[] getRoles() {
- // PublisherSubscriber is Publisher as well as Subscriber
- return new String[] { "Broker", "PublisherSubscriber", "Publisher", "Subscriber" };
- }
- @Override
- public Collection<Port> getDevicesWithRole(int role) {
- switch (role) {
- case 0:
- return new LinkedList<Port>(Arrays.asList(broker));
- case 1:
- return pubSubs;
- case 2:
- return pubs;
- case 3:
- return subs;
- default:
- return null;
- }
- }
- @Override
- public boolean addDeviceOfRole(Port device, int role) {
- /*
- * First device has to be the Broker
- */
- if (broker == null) {
- if (role == 0) {
- broker = device;
- updateBrokerOnDeletedConnections(null);
- return true;
- } else {
- return false;
- }
- }
- switch (role) {
- case 0:
- // Just one broker allowed.
- return false;
- case 1:
- pubSubs.add(device);
- subbedTopics.putIfAbsent(device, new LinkedList<String>());
- break;
- case 2:
- pubs.add(device);
- break;
- case 3:
- subs.add(device);
- subbedTopics.putIfAbsent(device, new LinkedList<String>());
- break;
- default:
- // invalid role
- return false;
- }
- // Create packets for the connecting Client
- currentPackets.add(new MQTT_packet(MQTT_packet.CONNECT, currentPackets.size() / 2, device, broker));
- currentPackets.add(new MQTT_packet(MQTT_packet.CONNACK, currentPackets.size() / 2 + 1, broker, device));
- return true;
- }
- @Override
- public void removeDevice(Port device) {
- /**
- * true if the device was removed
- */
- boolean removedDevice = false;
- if (broker == device) {
- broker = null;
- deletedConnectionLinks
- .add(new Pair<Port, Port>(new Port(device.getOwner(), (short) -1), deletedBrokerPort));
- updateBrokerOnDeletedConnections(device);
- }
- removedDevice |= pubSubs.remove(device);
- removedDevice |= subs.remove(device);
- removedDevice |= pubs.remove(device);
- // Remove Port from topics and clear its list
- LinkedList<String> oldTopics = subbedTopics.remove(device);
- if (oldTopics != null)
- oldTopics.clear();
- if (removedDevice) {
- if (broker == null) {
- deletedConnectionLinks.add(new Pair<Port, Port>(deletedBrokerPort, device));
- } else {
- deletedConnectionLinks.add(new Pair<Port, Port>(broker, device));
- // If not the broker and device was removed -> disconnect
- currentPackets.add(new MQTT_packet(MQTT_packet.DISCONNECT, currentPackets.size() / 2, device, broker));
- }
- }
- }
- @Override
- public String getName() {
- return "MQTT";
- }
- @Override
- public int getRoleOfDevice(Port device) {
- if (device == null)
- return -1;
- if (device == broker)
- return 0;
- if (pubSubs.contains(device))
- return 1;
- if (pubs.contains(device))
- return 2;
- if (subs.contains(device))
- return 3;
- return -1;
- }
- @Override
- public Collection<Port> getDevices() {
- LinkedList<Port> returnDevices = new LinkedList<Port>();
- if (broker != null)
- returnDevices.add(broker);
- returnDevices.addAll(pubSubs);
- returnDevices.addAll(pubs);
- returnDevices.addAll(subs);
- return returnDevices;
- }
- @Override
- public byte getTopologyType() {
- return STAR;
- }
- @Override
- public Collection<Pair<Port, Port>> getTopology() {
- LinkedList<Pair<Port, Port>> topology = new LinkedList<Pair<Port, Port>>();
- Port center = broker;
- calcDeletedBrokerPosition();
- if (broker == null)
- center = new Port(deletedBroker, (short) -1);
- for (Port p : pubSubs)
- topology.add(new Pair<Port, Port>(center, p));
- for (Port p : pubs)
- topology.add(new Pair<Port, Port>(center, p));
- for (Port p : subs)
- topology.add(new Pair<Port, Port>(center, p));
- return topology;
- }
- @Override
- public Collection<Pair<Port, Port>> getDeletedTopology() {
- calcDeletedBrokerPosition();
- return deletedConnectionLinks;
- }
- /**
- * Calculate and update the position of the deleted Broker
- */
- private void calcDeletedBrokerPosition() {
- if (broker == null) {
- int x = 0, y = 0, noP = 0;
- for (Port p : getDevices()) {
- if (p != null && p.getOwner() != null) {
- x += p.getOwner().getX();
- y += p.getOwner().getY();
- noP++;
- }
- }
- if (noP == 0)
- return;
- deletedBroker.setX(((int) (x * 1.0) / noP));
- deletedBroker.setY(((int) (y * 1.0) / noP));
- }
- }
- /**
- * Update the broker port on the deleted Connections
- *
- * @param device
- * old broker
- */
- private void updateBrokerOnDeletedConnections(Port device) {
- for (Pair<Port, Port> p : deletedConnectionLinks) {
- if (broker == null) {
- if (p.getLeft() == device)
- p.setLeft(deletedBrokerPort);
- if (p.getRight() == device)
- p.setRight(deletedBrokerPort);
- } else {
- if (p.getLeft() == deletedBrokerPort)
- p.setLeft(broker);
- if (p.getRight() == deletedBrokerPort)
- p.setRight(broker);
- }
- }
- }
- }
|