import java.util.Arrays; import java.util.Collection; import java.util.HashMap; import java.util.LinkedList; import de.tu_darmstadt.tk.SmartHomeNetworkSim.core.Packet; import de.tu_darmstadt.tk.SmartHomeNetworkSim.core.Port; import de.tu_darmstadt.tk.SmartHomeNetworkSim.core.Protocol; import de.tu_darmstadt.tk.SmartHomeNetworkSim.core.SmartDevice; import de.tu_darmstadt.tk.SmartHomeNetworkSim.core.protocols.packets.MQTT_packet; import de.tu_darmstadt.tk.SmartHomeNetworkSim.core.util.Pair; /** * Implementation of the MQTT Protocol to generate packets for the simulation * * * @author Andreas T. Meyer-Berg */ public class MQTT_protocolProject1 implements Protocol { /** * Broker which collects and distributes messages */ private Port broker; /** * Publishers like sensors, which publish data */ private LinkedList pubs; /** * Subscriber which subscribe to different Topics */ private LinkedList subs; /** * Topics subscribed by each Port */ private HashMap> subbedTopics = new HashMap>(); /** * Devices that are Publisher and Subscriber and therefore send and receive * messages */ private LinkedList pubSubs; /** * Packets which are currently being generated or were generated after the * last generatePackets Call */ private LinkedList currentPackets = new LinkedList(); private LinkedList> deletedConnectionLinks = new LinkedList>(); private SmartDevice deletedBroker = new SmartDevice("DeletedBroker"); private Port deletedBrokerPort = new Port(deletedBroker,(short)-1); /** * Topics of the MQTT Broker */ private LinkedList topics = new LinkedList(); /** * Creates a new MQTT Protocol */ public MQTT_protocolProject1() { this.broker = null; initialize(); } /** * Creates a new MQTT Protocol * * @param broker broker of the protocol */ public MQTT_protocolProject1(Port broker) { this.broker = broker; initialize(); } /** * Initializes the different fields */ private void initialize() { topics.add("/home/temperatureHot"); topics.add("/home/doorOpen"); topics.add("/home/lightOn"); subs = new LinkedList(); pubs = new LinkedList(); pubSubs = new LinkedList(); } @Override public Collection generateNextPackets(Port port, long timestep, boolean packetLost) { /** * Packets which will be generated */ LinkedList returnPackets = new LinkedList(); //Update all Timestamps of previous created Packets for(Packet p: currentPackets){ p.setTimestamp(p.getTimestamp()+timestep); } //Add these packets to the return returnPackets.addAll(currentPackets); //remove packets from the old list currentPackets.clear(); //Clear deleted connections deletedConnectionLinks.clear(); //Return termination packets if(port==null)return returnPackets; /** * Update the lastTime the port was triggered */ port.setLastTrigger(timestep); /** * if port null, skip this step */ if(broker==null)return returnPackets; /** * Generate new Packets regarding to their class */ if(port==broker){ //Broker should not send new packages, without new messages //But could perform some Ping request to random participants Collection devices = getDevices(); devices.remove(port); Port dest = null; java.util.Iterator it = devices.iterator(); for(int i=0; i<(Math.random()*devices.size())&&it.hasNext();i++){ dest = it.next(); } if(dest != null){ returnPackets.add(new MQTT_packet(MQTT_packet.PINGREQ, timestep, port, dest)); if(dest.getStatus()!=Port.CLOSED) returnPackets.add(new MQTT_packet(MQTT_packet.PINGREQ, timestep, dest, port)); } }else if(subs.contains(port)||pubSubs.contains(port)){ //Subs can either subscribe to topics or unsubscribe /** * Topics, the SmartDevice is subscribed to */ LinkedList tops= subbedTopics.get(port); /** * Topic which should be subscribed or unsubscribed */ String newTopic; //Subscribe if no subscriptions so far or not subscribed to all, with a probability of70% if(tops.size()==0||(tops.size()!=subbedTopics.size()&&Math.random()<0.7)){ //Forced Subscribe newTopic = topics.get((int) Math.floor(Math.random()*topics.size())); returnPackets.add(new MQTT_packet(MQTT_packet.SUBSCRIBE, timestep, port, broker, "topic:"+newTopic)); if(broker.getStatus()!=Port.CLOSED){ timestep+=broker.getResponseTime(); returnPackets.add(new MQTT_packet(MQTT_packet.SUBACK, timestep, broker, port)); tops.add(newTopic); } } else if(tops.size()==subbedTopics.size()){ //Forced Unsubscribe newTopic = tops.get((int) Math.floor(Math.random()*tops.size())); returnPackets.add(new MQTT_packet(MQTT_packet.UNSUBSCRIBE, timestep, port, broker, "topic:"+newTopic)); if(broker.getStatus()!=Port.CLOSED){ timestep+=broker.getResponseTime(); returnPackets.add(new MQTT_packet(MQTT_packet.UNSUBACK, timestep, broker, port)); tops.remove(newTopic); } } if(pubSubs.contains(port)&&Math.random()<0.3){ //When also Pub, publish sometimes newTopic = topics.get((int) Math.floor(Math.random()*topics.size())); returnPackets.add(new MQTT_packet(MQTT_packet.SUBSCRIBE, timestep, port, broker, "Topic:"+newTopic)); if(broker.getStatus()!=Port.CLOSED){ timestep+=broker.getResponseTime(); returnPackets.add(new MQTT_packet(MQTT_packet.SUBACK, timestep, broker, port)); } } } if(pubs.contains(port)||pubSubs.contains(port)&&Math.random()<0.3){ String newTopic = topics.get((int) Math.floor(Math.random()*topics.size())); /** * Message to be published (Should be further specialized by a Sensor Device) */ String msg = "Topic:"+newTopic+":"+(Math.random()<0.5?"true":"false"); returnPackets.add(new MQTT_packet(MQTT_packet.PUBLISH, timestep, port, broker, msg)); //Response timestep+=broker.getResponseTime(); returnPackets.add(new MQTT_packet(MQTT_packet.PUBACK, timestep, broker, port)); //Publish to Subscribers //Should be be improved to just notify Subs that are subscribed to the topic if(broker.getStatus()!=Port.CLOSED){ for(Port p:subs){ if(!subbedTopics.get(p).contains(newTopic))continue;//Skip unsubbed ports timestep+=broker.getResponseTime(); returnPackets.add(new MQTT_packet(MQTT_packet.PUBLISH, timestep, broker, p, msg)); returnPackets.add(new MQTT_packet(MQTT_packet.PUBACK, timestep+p.getResponseTime(), p, broker)); } for(Port p:pubSubs){ if(!subbedTopics.get(p).contains(newTopic))continue;//skip unsubbed ports timestep+=broker.getResponseTime(); returnPackets.add(new MQTT_packet(MQTT_packet.PUBLISH, timestep, broker, p, msg)); returnPackets.add(new MQTT_packet(MQTT_packet.PUBACK, timestep+p.getResponseTime(), p, broker)); } } } if(Math.random() < 0.05 && port != broker){ returnPackets.add(new MQTT_packet(MQTT_packet.PINGREQ, timestep, port, broker)); if(port.getStatus()!=Port.CLOSED) returnPackets.add(new MQTT_packet(MQTT_packet.PINGREQ, timestep, broker, port)); } return returnPackets; } @Override public int getNumberOfRoles() { return 4; } @Override public String[] getRoles() { // PublisherSubscriber is Publisher as well as Subscriber return new String[] { "Broker", "PublisherSubscriber", "Publisher", "Subscriber" }; } @Override public Collection getDevicesWithRole(int role) { switch (role) { case 0: return new LinkedList(Arrays.asList(broker)); case 1: return pubSubs; case 2: return pubs; case 3: return subs; default: return null; } } @Override public boolean addDeviceOfRole(Port device, int role) { /* * First device has to be the Broker */ if (broker==null) { if (role == 0) { broker = device; updateBrokerOnDeletedConnections(null); return true; } else { return false; } } switch (role) { case 0: //Just one broker allowed. return false; case 1: pubSubs.add(device); subbedTopics.putIfAbsent(device, new LinkedList()); break; case 2: pubs.add(device); break; case 3: subs.add(device); subbedTopics.putIfAbsent(device, new LinkedList()); break; default: // invalid role return false; } // Create packets for the connecting Client currentPackets.add(new MQTT_packet(MQTT_packet.CONNECT, currentPackets.size()/2, device, broker)); currentPackets.add(new MQTT_packet(MQTT_packet.CONNACK, currentPackets.size()/2+1, broker, device)); return true; } @Override public void removeDevice(Port device) { /** * true if the device was removed */ boolean removedDevice=false; if (broker == device){ broker = null; deletedConnectionLinks.add(new Pair(new Port(device.getOwner(), (short)-1), deletedBrokerPort)); updateBrokerOnDeletedConnections(device); } removedDevice|=pubSubs.remove(device); removedDevice|=subs.remove(device); removedDevice|=pubs.remove(device); //Remove Port from topics and clear its list LinkedList oldTopics = subbedTopics.remove(device); if(oldTopics!=null)oldTopics.clear(); if(removedDevice){ if(broker == null){ deletedConnectionLinks.add(new Pair(deletedBrokerPort,device)); }else{ deletedConnectionLinks.add(new Pair(broker, device)); //If not the broker and device was removed -> disconnect currentPackets.add(new MQTT_packet(MQTT_packet.DISCONNECT, currentPackets.size()/2, device, broker)); } } } public void addTopic(String topic){ topics.add(topic); } @Override public String getName() { return "MQTT"; } @Override public int getRoleOfDevice(Port device){ if(device == null) return -1; if(device == broker) return 0; if(pubSubs.contains(device)) return 1; if(pubs.contains(device)) return 2; if(subs.contains(device)) return 3; return -1; } @Override public Collection getDevices(){ LinkedList returnDevices = new LinkedList(); if(broker!=null) returnDevices.add(broker); returnDevices.addAll(pubSubs); returnDevices.addAll(pubs); returnDevices.addAll(subs); return returnDevices; } @Override public byte getTopologyType() { return STAR; } @Override public Collection> getTopology(){ LinkedList> topology = new LinkedList>(); Port center = broker; calcDeletedBrokerPosition(); if(broker==null) center = new Port(deletedBroker, (short)-1); for(Port p: pubSubs) topology.add(new Pair(center, p)); for(Port p: pubs) topology.add(new Pair(center, p)); for(Port p: subs) topology.add(new Pair(center, p)); return topology; } @Override public Collection> getDeletedTopology() { calcDeletedBrokerPosition(); return deletedConnectionLinks; } /** * Calculate and update the position of the deleted Broker */ private void calcDeletedBrokerPosition(){ if(broker == null){ int x = 0, y = 0, noP = 0; for(Port p: getDevices()){ if(p!=null && p.getOwner()!=null){ x += p.getOwner().getX(); y += p.getOwner().getY(); noP++; } } if(noP==0) return; deletedBroker.setX(((int)(x*1.0)/noP)); deletedBroker.setY(((int)(y*1.0)/noP)); } } /** * Update the broker port on the deleted Connections * * @param device old broker */ private void updateBrokerOnDeletedConnections(Port device){ for(Pair p: deletedConnectionLinks){ if(broker == null){ if(p.getLeft() == device) p.setLeft(deletedBrokerPort); if(p.getRight() == device) p.setRight(deletedBrokerPort); }else{ if(p.getLeft() == deletedBrokerPort) p.setLeft(broker); if(p.getRight() == deletedBrokerPort) p.setRight(broker); } } } }