123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421 |
- import java.util.Arrays;
- import java.util.Collection;
- import java.util.HashMap;
- import java.util.LinkedList;
- import de.tu_darmstadt.tk.SmartHomeNetworkSim.core.Packet;
- import de.tu_darmstadt.tk.SmartHomeNetworkSim.core.Port;
- import de.tu_darmstadt.tk.SmartHomeNetworkSim.core.Protocol;
- import de.tu_darmstadt.tk.SmartHomeNetworkSim.core.SmartDevice;
- import de.tu_darmstadt.tk.SmartHomeNetworkSim.core.protocols.packets.MQTT_packet;
- import de.tu_darmstadt.tk.SmartHomeNetworkSim.core.util.Pair;
- /**
- * Implementation of the MQTT Protocol to generate packets for the simulation
- *
- *
- * @author Andreas T. Meyer-Berg
- */
- public class MQTT_protocolProject1 implements Protocol {
- /**
- * Broker which collects and distributes messages
- */
- private Port broker;
- /**
- * Publishers like sensors, which publish data
- */
- private LinkedList<Port> pubs;
- /**
- * Subscriber which subscribe to different Topics
- */
- private LinkedList<Port> subs;
-
- /**
- * Topics subscribed by each Port
- */
- private HashMap<Port,LinkedList<String>> subbedTopics = new HashMap<Port,LinkedList<String>>();
- /**
- * Devices that are Publisher and Subscriber and therefore send and receive
- * messages
- */
- private LinkedList<Port> pubSubs;
- /**
- * Packets which are currently being generated or were generated after the
- * last generatePackets Call
- */
- private LinkedList<Packet> currentPackets = new LinkedList<Packet>();
-
- private LinkedList<Pair<Port,Port>> deletedConnectionLinks = new LinkedList<Pair<Port,Port>>();
- private SmartDevice deletedBroker = new SmartDevice("DeletedBroker");
- private Port deletedBrokerPort = new Port(deletedBroker,(short)-1);
-
- /**
- * Topics of the MQTT Broker
- */
- private LinkedList<String> topics = new LinkedList<String>();
- /**
- * Creates a new MQTT Protocol
- */
- public MQTT_protocolProject1() {
- this.broker = null;
- initialize();
-
- }
-
- /**
- * Creates a new MQTT Protocol
- *
- * @param broker broker of the protocol
- */
- public MQTT_protocolProject1(Port broker) {
- this.broker = broker;
- initialize();
-
- }
-
- /**
- * Initializes the different fields
- */
- private void initialize() {
- topics.add("/home/temperatureHot");
- topics.add("/home/doorOpen");
- topics.add("/home/lightOn");
- subs = new LinkedList<Port>();
- pubs = new LinkedList<Port>();
- pubSubs = new LinkedList<Port>();
- }
-
- @Override
- public Collection<Packet> generateNextPakets(Port port, long timestep, boolean packetLost) {
- /**
- * Packets which will be generated
- */
- LinkedList<Packet> returnPackets = new LinkedList<Packet>();
- //Update all Timestamps of previous created Packets
- for(Packet p: currentPackets){
- p.setTimestamp(p.getTimestamp()+timestep);
- }
- //Add these packets to the return
- returnPackets.addAll(currentPackets);
- //remove packets from the old list
- currentPackets.clear();
- //Clear deleted connections
- deletedConnectionLinks.clear();
- //Return termination packets
- if(port==null)return returnPackets;
-
- /**
- * Update the lastTime the port was triggered
- */
- port.setLastTrigger(timestep);
- /**
- * if port null, skip this step
- */
- if(broker==null)return returnPackets;
-
- /**
- * Generate new Packets regarding to their class
- */
- if(port==broker){
- //Broker should not send new packages, without new messages
- //But could perform some Ping request to random participants
- Collection<Port> devices = getDevices();
- devices.remove(port);
- Port dest = null;
- java.util.Iterator<Port> it = devices.iterator();
- for(int i=0; i<(Math.random()*devices.size())&&it.hasNext();i++){
- dest = it.next();
- }
- if(dest != null){
- returnPackets.add(new MQTT_packet(MQTT_packet.PINGREQ, timestep, port, dest));
- if(dest.getStatus()!=Port.CLOSED)
- returnPackets.add(new MQTT_packet(MQTT_packet.PINGREQ, timestep, dest, port));
- }
- }else if(subs.contains(port)||pubSubs.contains(port)){
-
- //Subs can either subscribe to topics or unsubscribe
- /**
- * Topics, the SmartDevice is subscribed to
- */
- LinkedList<String> tops= subbedTopics.get(port);
-
- /**
- * Topic which should be subscribed or unsubscribed
- */
- String newTopic;
- //Subscribe if no subscriptions so far or not subscribed to all, with a probability of70%
- if(tops.size()==0||(tops.size()!=subbedTopics.size()&&Math.random()<0.7)){
- //Forced Subscribe
- newTopic = topics.get((int) Math.floor(Math.random()*topics.size()));
- returnPackets.add(new MQTT_packet(MQTT_packet.SUBSCRIBE, timestep, port, broker, "topic:"+newTopic));
- if(broker.getStatus()!=Port.CLOSED){
- timestep+=broker.getResponseTime();
- returnPackets.add(new MQTT_packet(MQTT_packet.SUBACK, timestep, broker, port));
- tops.add(newTopic);
- }
- }
- else if(tops.size()==subbedTopics.size()){
- //Forced Unsubscribe
- newTopic = tops.get((int) Math.floor(Math.random()*tops.size()));
- returnPackets.add(new MQTT_packet(MQTT_packet.UNSUBSCRIBE, timestep, port, broker, "topic:"+newTopic));
- if(broker.getStatus()!=Port.CLOSED){
- timestep+=broker.getResponseTime();
- returnPackets.add(new MQTT_packet(MQTT_packet.UNSUBACK, timestep, broker, port));
- tops.remove(newTopic);
- }
- }
-
- if(pubSubs.contains(port)&&Math.random()<0.3){
- //When also Pub, publish sometimes
- newTopic = topics.get((int) Math.floor(Math.random()*topics.size()));
- returnPackets.add(new MQTT_packet(MQTT_packet.SUBSCRIBE, timestep, port, broker, "Topic:"+newTopic));
- if(broker.getStatus()!=Port.CLOSED){
- timestep+=broker.getResponseTime();
- returnPackets.add(new MQTT_packet(MQTT_packet.SUBACK, timestep, broker, port));
- }
- }
- }
- if(pubs.contains(port)||pubSubs.contains(port)&&Math.random()<0.3){
- String newTopic = topics.get((int) Math.floor(Math.random()*topics.size()));
- /**
- * Message to be published (Should be further specialized by a Sensor Device)
- */
- String msg = "Topic:"+newTopic+":"+(Math.random()<0.5?"true":"false");
- returnPackets.add(new MQTT_packet(MQTT_packet.PUBLISH, timestep, port, broker, msg));
- //Response
- timestep+=broker.getResponseTime();
- returnPackets.add(new MQTT_packet(MQTT_packet.PUBACK, timestep, broker, port));
-
- //Publish to Subscribers
- //Should be be improved to just notify Subs that are subscribed to the topic
- if(broker.getStatus()!=Port.CLOSED){
- for(Port p:subs){
- if(!subbedTopics.get(p).contains(newTopic))continue;//Skip unsubbed ports
- timestep+=broker.getResponseTime();
- returnPackets.add(new MQTT_packet(MQTT_packet.PUBLISH, timestep, broker, p, msg));
-
- returnPackets.add(new MQTT_packet(MQTT_packet.PUBACK, timestep+p.getResponseTime(), p, broker));
- }
- for(Port p:pubSubs){
- if(!subbedTopics.get(p).contains(newTopic))continue;//skip unsubbed ports
- timestep+=broker.getResponseTime();
- returnPackets.add(new MQTT_packet(MQTT_packet.PUBLISH, timestep, broker, p, msg));
- returnPackets.add(new MQTT_packet(MQTT_packet.PUBACK, timestep+p.getResponseTime(), p, broker));
- }
- }
- }
- if(Math.random() < 0.05 && port != broker){
- returnPackets.add(new MQTT_packet(MQTT_packet.PINGREQ, timestep, port, broker));
- if(port.getStatus()!=Port.CLOSED)
- returnPackets.add(new MQTT_packet(MQTT_packet.PINGREQ, timestep, broker, port));
- }
- return returnPackets;
- }
- @Override
- public int getNumberOfRoles() {
- return 4;
- }
- @Override
- public String[] getRoles() {
- // PublisherSubscriber is Publisher as well as Subscriber
- return new String[] { "Broker", "PublisherSubscriber", "Publisher", "Subscriber" };
- }
- @Override
- public Collection<Port> getDevicesWithRole(int role) {
- switch (role) {
- case 0:
- return new LinkedList<Port>(Arrays.asList(broker));
- case 1:
- return pubSubs;
- case 2:
- return pubs;
- case 3:
- return subs;
- default:
- return null;
- }
- }
- @Override
- public boolean addDeviceOfRole(Port device, int role) {
- /*
- * First device has to be the Broker
- */
- if (broker==null) {
- if (role == 0) {
- broker = device;
- updateBrokerOnDeletedConnections(null);
- return true;
- } else {
- return false;
- }
- }
- switch (role) {
- case 0:
- //Just one broker allowed.
- return false;
- case 1:
- pubSubs.add(device);
- subbedTopics.putIfAbsent(device, new LinkedList<String>());
- break;
- case 2:
- pubs.add(device);
- break;
- case 3:
- subs.add(device);
- subbedTopics.putIfAbsent(device, new LinkedList<String>());
- break;
- default:
- // invalid role
- return false;
- }
- // Create packets for the connecting Client
- currentPackets.add(new MQTT_packet(MQTT_packet.CONNECT, currentPackets.size()/2, device, broker));
- currentPackets.add(new MQTT_packet(MQTT_packet.CONNACK, currentPackets.size()/2+1, broker, device));
- return true;
- }
- @Override
- public void removeDevice(Port device) {
- /**
- * true if the device was removed
- */
- boolean removedDevice=false;
- if (broker == device){
- broker = null;
- deletedConnectionLinks.add(new Pair<Port, Port>(new Port(device.getOwner(), (short)-1), deletedBrokerPort));
- updateBrokerOnDeletedConnections(device);
- }
- removedDevice|=pubSubs.remove(device);
- removedDevice|=subs.remove(device);
- removedDevice|=pubs.remove(device);
- //Remove Port from topics and clear its list
- LinkedList<String> oldTopics = subbedTopics.remove(device);
- if(oldTopics!=null)oldTopics.clear();
- if(removedDevice){
- if(broker == null){
- deletedConnectionLinks.add(new Pair<Port, Port>(deletedBrokerPort,device));
- }else{
- deletedConnectionLinks.add(new Pair<Port, Port>(broker, device));
- //If not the broker and device was removed -> disconnect
- currentPackets.add(new MQTT_packet(MQTT_packet.DISCONNECT, currentPackets.size()/2, device, broker));
- }
- }
- }
-
- public void addTopic(String topic){
- topics.add(topic);
- }
- @Override
- public String getName() {
- return "MQTT";
- }
-
- @Override
- public int getRoleOfDevice(Port device){
- if(device == null)
- return -1;
- if(device == broker)
- return 0;
- if(pubSubs.contains(device))
- return 1;
- if(pubs.contains(device))
- return 2;
- if(subs.contains(device))
- return 3;
- return -1;
- }
-
- @Override
- public Collection<Port> getDevices(){
- LinkedList<Port> returnDevices = new LinkedList<Port>();
- if(broker!=null)
- returnDevices.add(broker);
- returnDevices.addAll(pubSubs);
- returnDevices.addAll(pubs);
- returnDevices.addAll(subs);
- return returnDevices;
- }
-
- @Override
- public byte getTopologyType() {
- return STAR;
- }
-
- @Override
- public Collection<Pair<Port,Port>> getTopology(){
- LinkedList<Pair<Port,Port>> topology = new LinkedList<Pair<Port,Port>>();
- Port center = broker;
- calcDeletedBrokerPosition();
- if(broker==null)
- center = new Port(deletedBroker, (short)-1);
- for(Port p: pubSubs)
- topology.add(new Pair<Port, Port>(center, p));
- for(Port p: pubs)
- topology.add(new Pair<Port, Port>(center, p));
- for(Port p: subs)
- topology.add(new Pair<Port, Port>(center, p));
- return topology;
- }
- @Override
- public Collection<Pair<Port, Port>> getDeletedTopology() {
- calcDeletedBrokerPosition();
- return deletedConnectionLinks;
- }
-
- /**
- * Calculate and update the position of the deleted Broker
- */
- private void calcDeletedBrokerPosition(){
- if(broker == null){
- int x = 0, y = 0, noP = 0;
- for(Port p: getDevices()){
- if(p!=null && p.getOwner()!=null){
- x += p.getOwner().getX();
- y += p.getOwner().getY();
- noP++;
- }
- }
- if(noP==0)
- return;
- deletedBroker.setX(((int)(x*1.0)/noP));
- deletedBroker.setY(((int)(y*1.0)/noP));
- }
- }
-
- /**
- * Update the broker port on the deleted Connections
- *
- * @param device old broker
- */
- private void updateBrokerOnDeletedConnections(Port device){
- for(Pair<Port, Port> p: deletedConnectionLinks){
- if(broker == null){
- if(p.getLeft() == device)
- p.setLeft(deletedBrokerPort);
- if(p.getRight() == device)
- p.setRight(deletedBrokerPort);
- }else{
- if(p.getLeft() == deletedBrokerPort)
- p.setLeft(broker);
- if(p.getRight() == deletedBrokerPort)
- p.setRight(broker);
-
- }
- }
- }
- }
|