package eu.play_project.dcep.distributedetalis;

import com.hp.hpl.jena.sparql.core.DatasetGraph;
import com.hp.hpl.jena.sparql.core.DatasetGraphFactory;
import eu.play_project.dcep.constants.DcepConstants;
import eu.play_project.dcep.distributedetalis.api.EcConnectionManager;
import eu.play_project.dcep.distributedetalis.api.EcConnectionmanagerException;
import eu.play_project.dcep.distributedetalis.join.SelectResults;
import eu.play_project.dcep.distributedetalis.listeners.EcConnectionListenerRest;
import eu.play_project.dcep.distributedetalis.listeners.EcConnectionListenerWsn;
import eu.play_project.dcep.distributedetalis.persistence.Persistence;
import eu.play_project.dcep.distributedetalis.persistence.PersistenceException;
import eu.play_project.dcep.distributedetalis.persistence.Sqlite;
import eu.play_project.dcep.distributedetalis.utils.EventCloudHelpers;
import eu.play_project.play_commons.constants.Stream;
import eu.play_project.play_eventadapter.AbstractReceiverRest;
import eu.play_project.play_eventadapter.AbstractSenderRest;
import eu.play_project.play_platformservices.api.BdplQuery;
import fr.inria.eventcloud.api.CompoundEvent;
import fr.inria.eventcloud.api.PublishSubscribeConstants;
import fr.inria.eventcloud.api.Quadruple;
import java.io.ByteArrayOutputStream;
import java.io.Serializable;
import java.net.URI;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import javax.xml.namespace.QName;
import org.apache.jena.riot.RDFDataMgr;
import org.apache.jena.riot.RDFFormat;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.servlet.ServletHolder;
import org.glassfish.jersey.moxy.json.MoxyJsonFeature;
import org.glassfish.jersey.server.ResourceConfig;
import org.glassfish.jersey.servlet.ServletContainer;
import org.petalslink.dsb.commons.service.api.Service;
import org.petalslink.dsb.notification.service.NotificationConsumerService;
import org.petalslink.dsb.soap.CXFExposer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:eu/play_project/dcep/distributedetalis/EcConnectionManagerWsn.class */
public abstract class EcConnectionManagerWsn implements EcConnectionManager {
    // Active subscriptions keyed by topic; each value carries the subscription
    // id and how many registered patterns currently rely on it.
    private final Map<String, SubscriptionUsage> subscriptions = new HashMap<String, SubscriptionUsage>();
    private final Logger logger = LoggerFactory.getLogger(EcConnectionManagerWsn.class);
    // Set to true at the end of a successful init() and reset by destroy().
    protected boolean init = false;
    // Used to subscribe to / unsubscribe from topics and to list available topics.
    private AbstractReceiverRest rdfReceiver;
    // Used by publish() to send serialised complex events.
    private AbstractSenderRest rdfSender;
    // Handed to both notification listeners during init().
    private final DistributedEtalis dEtalis;
    private EcConnectionListenerWsn dsbListener;
    private EcConnectionListenerRest dsbRestListener;
    static final Properties constants = DcepConstants.getProperties();
    // Endpoint passed to the receiver when subscribing to a topic.
    private static final String SOAP_URI = constants.getProperty("dcep.notify.endpoint");
    // Only logged in this class as the externally reachable REST endpoint.
    private static final String REST_URI = constants.getProperty("dcep.notify.rest");
    // Exposed SOAP notification service; started in init(), stopped in destroy().
    private Service notifyReceiverSoap;
    // Jetty server hosting the REST notification listener.
    private Server notifyReceiverRest;
    // Store for subscription ids; created in init().
    private Persistence persistence;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:eu/play_project/dcep/distributedetalis/EcConnectionManagerWsn$SubscriptionUsage.class */
    /**
     * Pairs a subscription identifier with a usage counter.
     *
     * <p>The enclosing class stores one instance per topic, increments
     * {@link #usage} when the topic is requested again and decrements it when
     * a pattern using the topic is unregistered.
     *
     * <p>NOTE(review): this is a non-static inner class that is
     * {@code Serializable}, so each instance carries an implicit reference to
     * its enclosing object. Consider making it a static nested class; confirm
     * that nothing relies on the current serialised form first.
     */
    public class SubscriptionUsage implements Serializable {
        private static final long serialVersionUID = 100L;

        /** Identifier of the subscription, as supplied to the constructor. */
        public String sub;

        /** Usage counter; a freshly created instance counts as one usage. */
        public int usage;

        /**
         * Creates a usage record with a counter of 1.
         *
         * @param str the subscription identifier to remember
         */
        public SubscriptionUsage(String str) {
            this.usage = 1;
            this.sub = str;
        }
    }

    /**
     * Creates an uninitialised connection manager. No endpoints are started
     * and no subscriptions are made until {@link #init()} is called.
     *
     * @param distributedEtalis the engine instance that is later handed to the
     *                          SOAP and REST notification listeners
     */
    public EcConnectionManagerWsn(DistributedEtalis distributedEtalis) {
        this.dEtalis = distributedEtalis;
    }

    /**
     * Brings the connection manager into service. In order, this creates the
     * REST receiver and sender, exposes the SOAP notification endpoint,
     * starts the REST notification endpoint, logs the topics known to the
     * DSB, and removes subscriptions left over in the persistence store.
     *
     * <p>Each phase runs in its own try block so that a failure is reported
     * with the message of the phase that actually failed. Previously the try
     * blocks were nested, and the outermost handler re-wrapped every failure
     * as a SOAP listener error.
     *
     * @throws IllegalStateException        if this instance is already initialised
     * @throws EcConnectionmanagerException if one of the phases fails
     */
    public void init() throws EcConnectionmanagerException {
        if (this.init) {
            throw new IllegalStateException(getClass().getSimpleName() + " has ALREADY been initialized.");
        }
        this.logger.info("Initialising {}.", getClass().getSimpleName());
        this.rdfReceiver = new AbstractReceiverRest() {
        };
        this.rdfSender = new AbstractSenderRest(Stream.FacebookCepResults.getTopicQName()) {
        };

        startSoapListener();
        startRestListener();
        logDsbTopics();
        cleanStaleSubscriptions();

        // Only flag the manager as usable once every phase has succeeded.
        this.init = true;
    }

    /** Creates the WS-Notification listener and exposes it as a SOAP service. */
    private void startSoapListener() throws EcConnectionmanagerException {
        try {
            this.dsbListener = new EcConnectionListenerWsn(this.rdfReceiver);
            this.dsbListener.setDetalis(this.dEtalis);
            String wsnNamespace = "http://docs.oasis-open.org/wsn/bw-2";
            QName consumerName = new QName(wsnNamespace, "NotificationConsumer");
            QName serviceName = new QName(wsnNamespace, "NotificationConsumerService");
            QName portName = new QName(wsnNamespace, "NotificationConsumerPort");
            String localEndpoint = constants.getProperty("dcep.notify.endpoint.local");
            this.logger.info("Exposing SOAP notification endpoint at: {} which should be reachable at {}.", localEndpoint, SOAP_URI);
            this.notifyReceiverSoap = new CXFExposer().expose(new NotificationConsumerService(consumerName, serviceName, portName, "NotificationConsumerService.wsdl", localEndpoint, this.dsbListener));
            this.notifyReceiverSoap.start();
        } catch (Exception e) {
            throw new EcConnectionmanagerException("Error while starting DSB listener (SOAP service).", e);
        }
    }

    /** Creates the REST listener and serves it from an embedded Jetty server. */
    private void startRestListener() throws EcConnectionmanagerException {
        try {
            this.dsbRestListener = new EcConnectionListenerRest(this.rdfReceiver);
            this.dsbRestListener.setDetalis(this.dEtalis);
            ResourceConfig resourceConfig = new ResourceConfig().register(MoxyJsonFeature.class).register(this.dsbRestListener);
            String localEndpoint = constants.getProperty("dcep.notify.rest.local");
            this.logger.info("Exposing REST notification endpoint at: {} which should be reachable at {}.", localEndpoint, REST_URI);
            // Only the port of the configured local URI is used to bind the server.
            this.notifyReceiverRest = new Server(URI.create(localEndpoint).getPort());
            ServletContextHandler contextHandler = new ServletContextHandler();
            contextHandler.setContextPath("/");
            contextHandler.addServlet(new ServletHolder(new ServletContainer(resourceConfig)), "/");
            this.notifyReceiverRest.setHandler(contextHandler);
            this.notifyReceiverRest.start();
        } catch (Exception e) {
            throw new EcConnectionmanagerException("Error while starting DSB listener (REST service).", e);
        }
    }

    /** Logs every topic reported by the receiver, or a warning if there are none. */
    private void logDsbTopics() throws EcConnectionmanagerException {
        try {
            List<?> topics = this.rdfReceiver.getTopics();
            if (topics.isEmpty()) {
                this.logger.warn("No topics were found in DSB, possible misconfiguration of event adapters.");
            } else {
                for (Object topic : topics) {
                    this.logger.info("Topic on the DSB: {}", topic);
                }
            }
        } catch (Exception e) {
            throw new EcConnectionmanagerException("Error while checking the DSB.", e);
        }
    }

    /** Opens the persistence store and removes subscriptions recorded by an earlier run. */
    private void cleanStaleSubscriptions() throws EcConnectionmanagerException {
        try {
            this.persistence = new Sqlite();
            for (Sqlite.SubscriptionPerCloud stale : this.persistence.getSubscriptions()) {
                this.logger.info("Cleaning stale subscription from cloud {}: {}", stale.cloudId, stale.subscriptionId);
                try {
                    this.rdfReceiver.unsubscribe(stale.subscriptionId);
                } catch (Exception e) {
                    // Best effort: a failure here must not abort start-up, and the
                    // record is deleted below regardless.
                    this.logger.debug("Could not unsubscribe stale subscription {}: {}", stale.subscriptionId, e.getMessage());
                }
            }
            this.persistence.deleteAllSubscriptions();
        } catch (PersistenceException e) {
            throw new EcConnectionmanagerException(e.getMessage(), e);
        }
    }

    /**
     * Shuts the connection manager down: unsubscribes from all topics, clears
     * the in-memory and persisted subscription records, stops the SOAP and
     * REST notification endpoints and marks this instance as uninitialised.
     *
     * <p>Safe to call when {@link #init()} was never run or failed part-way:
     * components that were not created are skipped. The endpoints are stopped
     * even if unsubscribing or clearing the persistence store throws.
     */
    @Override // eu.play_project.dcep.distributedetalis.api.EcConnectionManager
    public void destroy() {
        this.logger.info("Terminating {}.", getClass().getSimpleName());
        try {
            this.logger.info("Unsubscribe from Topics");
            if (this.rdfReceiver != null) {
                this.rdfReceiver.unsubscribeAll();
            }
            this.subscriptions.clear();
            if (this.persistence != null) {
                this.persistence.deleteAllSubscriptions();
            }
        } finally {
            // Always release the listening endpoints, whatever happened above.
            if (this.notifyReceiverSoap != null) {
                this.notifyReceiverSoap.stop();
            }
            if (this.notifyReceiverRest != null) {
                try {
                    this.notifyReceiverRest.stop();
                } catch (Exception e) {
                    // Pass the exception itself so the stack trace is not lost.
                    this.logger.error("Exception while stoppping REST server. Nothing we can do now.", e);
                }
                this.notifyReceiverRest.destroy();
            }
            this.init = false;
        }
    }

    /**
     * Hook implemented by subclasses. Within this class it is invoked by
     * {@link #publish(CompoundEvent)} after the event has been sent through
     * the sender.
     *
     * @param compoundEvent the event being published
     * @param str           the cloud id that {@code publish} derived from the event
     */
    @Override // eu.play_project.dcep.distributedetalis.api.EcConnectionManager
    public abstract void putDataInCloud(CompoundEvent compoundEvent, String str);

    /**
     * Hook implemented by subclasses; it is not called from within this class.
     *
     * <p>NOTE(review): the parameter names were lost in decompilation.
     * Presumably they are a query and a cloud id, mirroring
     * {@link #putDataInCloud(CompoundEvent, String)} - confirm against the
     * {@code EcConnectionManager} interface.
     *
     * @throws EcConnectionmanagerException declared by the interface contract
     */
    @Override // eu.play_project.dcep.distributedetalis.api.EcConnectionManager
    public abstract SelectResults getDataFromCloud(String str, String str2) throws EcConnectionmanagerException;

    /**
     * Publishes a complex event: serialises it as TriG, sends it through the
     * sender addressed to the cloud id derived from the event, and then hands
     * it to {@link #putDataInCloud(CompoundEvent, String)}.
     *
     * <p>If no cloud id can be derived from the event, the event is logged
     * and discarded.
     *
     * @param compoundEvent the event to publish
     * @throws IllegalStateException if {@link #init()} has not completed
     */
    @Override // eu.play_project.dcep.distributedetalis.api.EcConnectionManager
    public void publish(CompoundEvent compoundEvent) {
        if (!this.init) {
            throw new IllegalStateException(getClass().getSimpleName() + " has not been initialized.");
        }
        String cloudId = EventCloudHelpers.getCloudId(compoundEvent);
        if (cloudId.isEmpty()) {
            this.logger.warn("DCEP Failed Exit Got empty cloud ID from event '{}', don't know which cloud to publish to. Discarding complex event.", compoundEvent.getGraph() + "#event");
            return;
        }
        ByteArrayOutputStream serialized = new ByteArrayOutputStream();
        RDFDataMgr.write(serialized, quadruplesToDatasetGraph(compoundEvent), RDFFormat.TRIG_BLOCKS);
        this.logger.info("DCEP Exit {} {}", compoundEvent.getGraph(), EventCloudHelpers.getMembers(compoundEvent));
        if (this.logger.isDebugEnabled()) {
            this.logger.debug("DCEP Complex Event:\n{}", compoundEvent.toString());
        }
        // RIOT writes UTF-8 to output streams, so decode with UTF-8 explicitly
        // rather than with the platform default charset.
        String payload = new String(serialized.toByteArray(), java.nio.charset.StandardCharsets.UTF_8);
        this.rdfSender.notify(payload, cloudId);
        putDataInCloud(compoundEvent, cloudId);
    }

    /**
     * Subscribes to every input stream named in the details of the given
     * query.
     *
     * @param bdplQuery the query whose input streams should be subscribed to
     * @throws IllegalStateException        if {@link #init()} has not completed
     * @throws EcConnectionmanagerException propagated from subscribing to a stream
     */
    @Override // eu.play_project.dcep.distributedetalis.api.EcConnectionManager
    public void registerEventPattern(BdplQuery bdplQuery) throws EcConnectionmanagerException {
        if (!this.init) {
            throw new IllegalStateException(getClass().getSimpleName() + " has not been initialized.");
        }
        for (String inputStream : bdplQuery.getDetails().getInputStreams()) {
            subscribe(inputStream);
        }
    }

    /**
     * Releases one usage of every input stream named in the details of the
     * given query. Streams with no recorded subscription are skipped.
     *
     * @param bdplQuery the query whose input streams are no longer needed
     */
    @Override // eu.play_project.dcep.distributedetalis.api.EcConnectionManager
    public void unregisterEventPattern(BdplQuery bdplQuery) {
        for (String topic : bdplQuery.getDetails().getInputStreams()) {
            SubscriptionUsage recorded = this.subscriptions.get(topic);
            if (recorded == null) {
                // Nothing was subscribed for this stream.
                continue;
            }
            unsubscribe(topic, recorded.sub);
        }
    }

    /**
     * Registers interest in a topic. The first request for a topic creates a
     * subscription through the receiver and records its id both in memory and
     * in the persistence store; later requests only bump the usage counter.
     *
     * @param str the topic to subscribe to
     * @throws IllegalStateException        if {@link #init()} has not completed
     * @throws EcConnectionmanagerException declared for callers; see the receiver's contract
     */
    private void subscribe(String str) throws EcConnectionmanagerException {
        if (!this.init) {
            throw new IllegalStateException(getClass().getSimpleName() + " has not been initialized.");
        }

        // The map never holds null values, so a null lookup means "no entry".
        SubscriptionUsage existing = this.subscriptions.get(str);
        if (existing != null) {
            this.logger.info("Still subscribed to topic {}.", str);
            existing.usage++;
            return;
        }

        this.logger.info("Subscribing to topic {}.", str);
        String subscriptionId = this.rdfReceiver.subscribe(str, SOAP_URI);
        this.subscriptions.put(str, new SubscriptionUsage(subscriptionId));
        this.persistence.storeSubscription(str, subscriptionId);
    }

    /**
     * Releases one usage of a topic. The subscription itself is cancelled,
     * and its in-memory record removed, only when the usage counter reaches
     * zero. Topics without a record are ignored.
     *
     * @param str  the topic being released
     * @param str2 the subscription id to cancel once the topic is unused
     * @throws IllegalStateException if {@link #init()} has not completed
     */
    private void unsubscribe(String str, String str2) {
        if (!this.init) {
            throw new IllegalStateException(getClass().getSimpleName() + " has not been initialized.");
        }

        // The map never holds null values, so a null lookup means "no entry".
        SubscriptionUsage recorded = this.subscriptions.get(str);
        if (recorded == null) {
            return;
        }

        recorded.usage--;
        if (recorded.usage == 0) {
            this.logger.info("Unsubscribing from topic {}.", str);
            this.rdfReceiver.unsubscribe(str2);
            this.subscriptions.remove(str);
        } else {
            this.logger.info("Still subscribed to topic {}.", str);
        }
    }

    /**
     * Copies the quadruples of an event into a new in-memory dataset graph,
     * leaving out any quadruple whose predicate equals
     * {@code PublishSubscribeConstants.EVENT_NB_QUADRUPLES_NODE}.
     *
     * @param compoundEvent the event whose quadruples are copied
     * @return a new dataset graph holding the remaining quadruples
     */
    private static DatasetGraph quadruplesToDatasetGraph(CompoundEvent compoundEvent) {
        DatasetGraph dataset = DatasetGraphFactory.createMem();
        Iterator<?> quadruples = compoundEvent.iterator();
        while (quadruples.hasNext()) {
            Quadruple quadruple = (Quadruple) quadruples.next();
            // Compare by value: an equal predicate node need not be the very
            // same object as the constant, so reference comparison with != could
            // let the bookkeeping quadruple through.
            if (!PublishSubscribeConstants.EVENT_NB_QUADRUPLES_NODE.equals(quadruple.getPredicate())) {
                dataset.add(quadruple.getGraph(), quadruple.getSubject(), quadruple.getPredicate(), quadruple.getObject());
            }
        }
        return dataset;
    }
}
