package org.ow2.proactive.resourcemanager.frontend;

import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.log4j.Logger;
import org.objectweb.proactive.Body;
import org.objectweb.proactive.InitActive;
import org.objectweb.proactive.RunActive;
import org.objectweb.proactive.Service;
import org.objectweb.proactive.api.PAActiveObject;
import org.objectweb.proactive.core.ProActiveException;
import org.objectweb.proactive.core.UniqueID;
import org.objectweb.proactive.core.body.request.Request;
import org.objectweb.proactive.core.util.wrapper.BooleanWrapper;
import org.objectweb.proactive.extensions.annotation.ActiveObject;
import org.ow2.proactive.resourcemanager.authentication.Client;
import org.ow2.proactive.resourcemanager.common.RMConstants;
import org.ow2.proactive.resourcemanager.common.event.RMEvent;
import org.ow2.proactive.resourcemanager.common.event.RMEventType;
import org.ow2.proactive.resourcemanager.common.event.RMInitialState;
import org.ow2.proactive.resourcemanager.common.event.RMNodeEvent;
import org.ow2.proactive.resourcemanager.common.event.RMNodeSourceEvent;
import org.ow2.proactive.resourcemanager.core.RMCore;
import org.ow2.proactive.resourcemanager.core.history.NodeHistory;
import org.ow2.proactive.resourcemanager.core.jmx.RMJMXHelper;
import org.ow2.proactive.resourcemanager.core.properties.PAResourceManagerProperties;
import org.ow2.proactive.resourcemanager.exception.RMException;
import org.ow2.proactive.resourcemanager.utils.AtomicRMStatisticsHolder;

@ActiveObject
/* loaded from: input_file:org/ow2/proactive/resourcemanager/frontend/RMMonitoringImpl.class */
public class RMMonitoringImpl implements RMMonitoring, RMEventListener, InitActive, RunActive {
    private RMCore rmcore;
    private Map<UniqueID, EventDispatcher> dispatchers;
    private transient ExecutorService eventDispatcherThreadPool;
    private static final Logger logger = Logger.getLogger(RMMonitoringImpl.class);
    public static final AtomicRMStatisticsHolder rmStatistics = new AtomicRMStatisticsHolder();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/ow2/proactive/resourcemanager/frontend/RMMonitoringImpl$EventDispatcher.class */
    public class EventDispatcher implements Runnable {
        protected Client client;
        protected RMEventListener listener;
        protected LinkedList<RMEvent> events;
        protected List<RMEventType> eventTypes;
        protected AtomicBoolean inProcess = new AtomicBoolean(false);

        public EventDispatcher(Client client, RMEventListener rMEventListener, RMEventType[] rMEventTypeArr) {
            this.eventTypes = null;
            this.client = client;
            this.listener = rMEventListener;
            if (rMEventTypeArr != null && rMEventTypeArr.length > 0) {
                this.eventTypes = Arrays.asList(rMEventTypeArr);
            }
            this.events = new LinkedList<>();
        }

        @Override // java.lang.Runnable
        public void run() {
            int i = 0;
            long currentTimeMillis = System.currentTimeMillis();
            if (RMMonitoringImpl.logger.isDebugEnabled()) {
                RMMonitoringImpl.logger.debug("Initializing " + Thread.currentThread() + " for events delivery to client '" + this.client + "'");
            }
            while (true) {
                RMEvent rMEvent = null;
                synchronized (this.events) {
                    if (RMMonitoringImpl.logger.isDebugEnabled()) {
                        RMMonitoringImpl.logger.debug(this.events.size() + " pending events for the client '" + this.client + "'");
                    }
                    if (this.events.size() > 0) {
                        rMEvent = this.events.removeFirst();
                    } else {
                        this.inProcess.set(false);
                    }
                }
                if (rMEvent == null) {
                    break;
                }
                deliverEvent(rMEvent);
                i++;
            }
            if (RMMonitoringImpl.logger.isDebugEnabled()) {
                RMMonitoringImpl.logger.debug("Finnishing delivery in " + Thread.currentThread() + " to client '" + this.client + "'. " + i + " events were delivered in " + (System.currentTimeMillis() - currentTimeMillis) + " ms");
            }
        }

        private void deliverEvent(RMEvent rMEvent) {
            long currentTimeMillis = System.currentTimeMillis();
            if (RMMonitoringImpl.logger.isDebugEnabled()) {
                RMMonitoringImpl.logger.debug("Dispatching event '" + rMEvent.toString() + "' to client " + this.client);
            }
            try {
                if (rMEvent instanceof RMNodeEvent) {
                    this.listener.nodeEvent((RMNodeEvent) rMEvent);
                } else if (rMEvent instanceof RMNodeSourceEvent) {
                    this.listener.nodeSourceEvent((RMNodeSourceEvent) rMEvent);
                } else {
                    this.listener.rmEvent(rMEvent);
                }
                long currentTimeMillis2 = System.currentTimeMillis() - currentTimeMillis;
                if (RMMonitoringImpl.logger.isDebugEnabled()) {
                    RMMonitoringImpl.logger.debug("Event '" + rMEvent.toString() + "' has been delivered to client " + this.client + " in " + currentTimeMillis2 + " ms");
                }
            } catch (Exception e) {
                RMMonitoringImpl.logger.warn("Cannot send events to " + this.client, e);
                synchronized (RMMonitoringImpl.this.dispatchers) {
                    RMMonitoringImpl.this.dispatchers.remove(this.client.getId());
                    RMMonitoringImpl.logger.warn(this.client + " was removed from listeners");
                }
            }
        }

        public void queueEvent(RMEvent rMEvent) {
            synchronized (this.events) {
                if (this.eventTypes == null || this.eventTypes.contains(rMEvent.getEventType())) {
                    this.events.add(rMEvent);
                    if (!this.inProcess.get()) {
                        this.inProcess.set(true);
                        RMMonitoringImpl.this.eventDispatcherThreadPool.submit(this);
                    } else if (RMMonitoringImpl.logger.isDebugEnabled()) {
                        RMMonitoringImpl.logger.debug("Communication to the client " + this.client + " is in progress in one thread of the thread pool. Either events come too quick or the client is slow. Do not initiate connection from another thread.");
                    }
                }
            }
        }
    }

    /* loaded from: input_file:org/ow2/proactive/resourcemanager/frontend/RMMonitoringImpl$GroupEventDispatcher.class */
    private class GroupEventDispatcher extends EventDispatcher {
        public GroupEventDispatcher(Client client, RMEventListener rMEventListener, RMEventType[] rMEventTypeArr) {
            super(client, rMEventListener, rMEventTypeArr);
        }

        @Override // org.ow2.proactive.resourcemanager.frontend.RMMonitoringImpl.EventDispatcher, java.lang.Runnable
        public void run() {
            long currentTimeMillis = System.currentTimeMillis();
            if (RMMonitoringImpl.logger.isDebugEnabled()) {
                RMMonitoringImpl.logger.debug("Initializing " + Thread.currentThread() + " for events delivery to client '" + this.client + "'");
            }
            LinkedList linkedList = new LinkedList();
            synchronized (this.events) {
                if (RMMonitoringImpl.logger.isDebugEnabled()) {
                    RMMonitoringImpl.logger.debug(this.events.size() + " pending events for the client '" + this.client + "'");
                }
                if (this.events.size() > 0) {
                    linkedList.clear();
                    linkedList.addAll(this.events);
                    this.events.clear();
                }
                this.inProcess.set(false);
            }
            if (linkedList.size() <= 0) {
                RMMonitoringImpl.logger.debug("No events to deliver to client " + this.client);
            } else if (deliverEvents(linkedList) && RMMonitoringImpl.logger.isDebugEnabled()) {
                RMMonitoringImpl.logger.debug("Finnishing delivery in " + Thread.currentThread() + " to client '" + this.client + "'. " + linkedList.size() + " events were delivered in " + (System.currentTimeMillis() - currentTimeMillis) + " ms");
            }
        }

        private boolean deliverEvents(Collection<RMEvent> collection) {
            long currentTimeMillis = System.currentTimeMillis();
            if (RMMonitoringImpl.logger.isDebugEnabled()) {
                Iterator<RMEvent> it = collection.iterator();
                while (it.hasNext()) {
                    RMMonitoringImpl.logger.debug("Dispatching events '" + it.next().toString() + "' to client " + this.client);
                }
            }
            try {
                ((RMGroupEventListener) this.listener).notify(collection);
                long currentTimeMillis2 = System.currentTimeMillis() - currentTimeMillis;
                if (RMMonitoringImpl.logger.isDebugEnabled()) {
                    RMMonitoringImpl.logger.debug("Events has been delivered to client " + this.client + " in " + currentTimeMillis2 + " ms");
                }
                return true;
            } catch (Exception e) {
                RMMonitoringImpl.logger.warn("Cannot send events to " + this.client, e);
                synchronized (RMMonitoringImpl.this.dispatchers) {
                    RMMonitoringImpl.this.dispatchers.remove(this.client.getId());
                    RMMonitoringImpl.logger.warn(this.client + " was removed from listeners");
                    return false;
                }
            }
        }
    }

    public RMMonitoringImpl() {
    }

    public RMMonitoringImpl(RMCore rMCore) {
        this.dispatchers = new HashMap();
        this.rmcore = rMCore;
    }

    public void initActivity(Body body) {
        try {
            PAActiveObject.registerByName(PAActiveObject.getStubOnThis(), RMConstants.NAME_ACTIVE_OBJECT_RMMONITORING);
            this.eventDispatcherThreadPool = Executors.newFixedThreadPool(PAResourceManagerProperties.RM_MONITORING_MAX_THREAD_NUMBER.getValueAsInt());
        } catch (ProActiveException e) {
            logger.debug("Cannot register RMMonitoring. Aborting...", e);
            PAActiveObject.terminateActiveObject(true);
        }
    }

    public void runActivity(Body body) {
        Service service = new Service(body);
        while (body.isActive()) {
            Request blockingRemoveOldest = service.blockingRemoveOldest();
            if (blockingRemoveOldest != null) {
                try {
                    service.serve(blockingRemoveOldest);
                } catch (Throwable th) {
                    logger.error("Cannot serve request: " + blockingRemoveOldest, th);
                }
            }
        }
    }

    @Override // org.ow2.proactive.resourcemanager.frontend.RMMonitoring
    public RMInitialState addRMEventListener(RMEventListener rMEventListener, RMEventType... rMEventTypeArr) {
        Client client;
        UniqueID sourceBodyID = PAActiveObject.getContext().getCurrentRequest().getSourceBodyID();
        logger.debug("Adding the RM listner for " + sourceBodyID.shortString());
        synchronized (this.dispatchers) {
            synchronized (RMCore.clients) {
                client = RMCore.clients.get(sourceBodyID);
            }
            if (client == null) {
                throw new IllegalArgumentException("Unknown client " + sourceBodyID.shortString());
            }
            if (rMEventListener instanceof RMGroupEventListener) {
                this.dispatchers.put(sourceBodyID, new GroupEventDispatcher(client, rMEventListener, rMEventTypeArr));
            } else {
                this.dispatchers.put(sourceBodyID, new EventDispatcher(client, rMEventListener, rMEventTypeArr));
            }
        }
        return this.rmcore.getRMInitialState();
    }

    @Override // org.ow2.proactive.resourcemanager.frontend.RMMonitoring
    public void removeRMEventListener() throws RMException {
        UniqueID sourceBodyID = PAActiveObject.getContext().getCurrentRequest().getSourceBodyID();
        synchronized (this.dispatchers) {
            if (!this.dispatchers.containsKey(sourceBodyID)) {
                throw new RMException("Listener is unknown");
            }
            logger.debug("Removing the RM listner for " + sourceBodyID.shortString());
            this.dispatchers.remove(sourceBodyID);
        }
    }

    @Override // org.ow2.proactive.resourcemanager.frontend.RMMonitoring
    @Deprecated
    public boolean isAlive() {
        return true;
    }

    public void queueEvent(RMEvent rMEvent) {
        if (logger.isDebugEnabled()) {
            logger.debug(rMEvent.toString() + " event");
        }
        synchronized (this.dispatchers) {
            Iterator<EventDispatcher> it = this.dispatchers.values().iterator();
            while (it.hasNext()) {
                it.next().queueEvent(rMEvent);
            }
        }
    }

    @Override // org.ow2.proactive.resourcemanager.frontend.RMEventListener
    public void nodeEvent(RMNodeEvent rMNodeEvent) {
        rmStatistics.nodeEvent(rMNodeEvent);
        new NodeHistory(rMNodeEvent).save();
        queueEvent(rMNodeEvent);
    }

    @Override // org.ow2.proactive.resourcemanager.frontend.RMEventListener
    public void nodeSourceEvent(RMNodeSourceEvent rMNodeSourceEvent) {
        queueEvent(rMNodeSourceEvent);
    }

    @Override // org.ow2.proactive.resourcemanager.frontend.RMEventListener
    public void rmEvent(RMEvent rMEvent) {
        rmStatistics.rmEvent(rMEvent);
        queueEvent(rMEvent);
    }

    public BooleanWrapper shutdown() {
        rmEvent(new RMEvent(RMEventType.SHUTDOWN));
        PAActiveObject.terminateActiveObject(false);
        RMJMXHelper.getInstance().shutdown();
        this.eventDispatcherThreadPool.shutdown();
        try {
            this.eventDispatcherThreadPool.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            logger.warn("", e);
        }
        return new BooleanWrapper(true);
    }

    public Logger getLogger() {
        return logger;
    }

    @Override // org.ow2.proactive.resourcemanager.frontend.RMMonitoring
    public RMInitialState getState() {
        return this.rmcore.getRMInitialState();
    }
}
