package org.ow2.joram.mom.amqp;

import fr.dyade.aaa.agent.Agent;
import fr.dyade.aaa.agent.AgentId;
import fr.dyade.aaa.agent.AgentServer;
import fr.dyade.aaa.agent.Channel;
import fr.dyade.aaa.agent.Notification;
import fr.dyade.aaa.common.Debug;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import org.objectweb.util.monolog.api.BasicLevel;
import org.objectweb.util.monolog.api.Logger;
import org.ow2.joram.mom.amqp.structures.Recover;

/* loaded from: input_file:org/ow2/joram/mom/amqp/AMQPAgent.class */
public class AMQPAgent extends Agent {
    private static final long serialVersionUID = 1;
    public static Logger logger = Debug.getLogger(AMQPAgent.class.getName());
    private static Map<Long, Long> lockers;
    private static Map<Long, Object> responses;
    public static StubAgentIn stubAgentIn;
    public static StubAgentOut stubAgentOut;

    public AMQPAgent() {
        super("AMQPAgent", true, AgentId.AMQPAgentStamp);
    }

    protected void agentInitialize(boolean z) throws Exception {
        if (logger.isLoggable(BasicLevel.DEBUG)) {
            logger.log(BasicLevel.DEBUG, "AMQPAgent.agentInitialize(" + z + ')');
        }
        super.agentInitialize(z);
        lockers = new HashMap();
        responses = new HashMap();
        stubAgentIn = new StubAgentIn();
        stubAgentOut = new StubAgentOut(60000L);
        if (z) {
            return;
        }
        sendRestart();
    }

    public void agentFinalize(boolean z) {
        super.agentFinalize(z);
    }

    public void react(AgentId agentId, Notification notification) throws Exception {
        if (logger.isLoggable(BasicLevel.DEBUG)) {
            logger.log(BasicLevel.DEBUG, "AMQPAgent.react(" + agentId + ',' + notification + ')');
        }
        setNoSave();
        if (notification instanceof AMQPRequestNot) {
            StubAgentIn.processRequest(agentId, ((AMQPRequestNot) notification).keyLock, ((AMQPRequestNot) notification).proxyId, ((AMQPRequestNot) notification).obj);
            return;
        }
        if (notification instanceof AMQPResponseNot) {
            StubAgentIn.processResponse(agentId, ((AMQPResponseNot) notification).keyLock, ((AMQPResponseNot) notification).obj);
        } else {
            if (!(notification instanceof RestartNot)) {
                super.react(agentId, notification);
                return;
            }
            short from = agentId.getFrom();
            Iterator<Proxy> it = Naming.getProxies().iterator();
            while (it.hasNext()) {
                it.next().cleanConsumers(from);
            }
            Iterator<Queue> it2 = Naming.getQueues().iterator();
            while (it2.hasNext()) {
                it2.next().cleanConsumers(from);
            }
        }
    }

    public static AgentId getAMQPId(short s) {
        return new AgentId(s, s, AgentId.AMQPAgentStamp);
    }

    public static void sendRequestTo(Object obj, short s, long j, Long l) {
        AMQPRequestNot aMQPRequestNot = new AMQPRequestNot();
        aMQPRequestNot.obj = obj;
        aMQPRequestNot.proxyId = j;
        aMQPRequestNot.keyLock = -1L;
        if (l != null) {
            lockers.put(l, l);
            aMQPRequestNot.keyLock = l.longValue();
        }
        Channel.sendTo(getAMQPId(s), aMQPRequestNot);
    }

    public static void putResponse(AgentId agentId, long j, Object obj) {
        if (logger.isLoggable(BasicLevel.DEBUG)) {
            logger.log(BasicLevel.DEBUG, "AMQPAgent.putResponse(" + j + ',' + obj + ')');
        }
        if (lockers.containsKey(Long.valueOf(j))) {
            responses.put(Long.valueOf(j), obj);
            Long remove = lockers.remove(Long.valueOf(j));
            synchronized (remove) {
                remove.notify();
            }
            return;
        }
        if (!(obj instanceof Message)) {
            logger.log(BasicLevel.ERROR, "!!!!!!! TODO recover? response", new Exception());
            return;
        }
        Message message = (Message) obj;
        AMQPRequestNot aMQPRequestNot = new AMQPRequestNot();
        ArrayList arrayList = new ArrayList();
        arrayList.add(Long.valueOf(message.queueMsgId));
        aMQPRequestNot.obj = new Recover(message.queueName, arrayList);
        Channel.sendTo(agentId, aMQPRequestNot);
        if (logger.isLoggable(BasicLevel.DEBUG)) {
            logger.log(BasicLevel.DEBUG, "AMQPAgent.putResponse recover queueMsgId = " + message.queueMsgId);
        }
    }

    public static Object getResponse(long j) {
        lockers.remove(Long.valueOf(j));
        return responses.remove(Long.valueOf(j));
    }

    private void sendRestart() {
        Enumeration serversIds = AgentServer.getServersIds();
        while (serversIds.hasMoreElements()) {
            Short sh = (Short) serversIds.nextElement();
            if (sh.shortValue() != AgentServer.getServerId()) {
                if (logger.isLoggable(BasicLevel.DEBUG)) {
                    logger.log(BasicLevel.DEBUG, "AMQPAgent.sendRestart notification to " + sh);
                }
                Channel.sendTo(getAMQPId(sh.shortValue()), new RestartNot());
            }
        }
    }
}
