package org.ow2.joram.mom.amqp;

import fr.dyade.aaa.agent.AgentServer;
import fr.dyade.aaa.common.Daemon;
import fr.dyade.aaa.common.Debug;
import fr.dyade.aaa.common.StoppedQueueException;
import fr.dyade.aaa.util.Transaction;
import fr.dyade.aaa.util.management.MXWrapper;
import java.io.IOException;
import java.rmi.AlreadyBoundException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.objectweb.util.monolog.api.BasicLevel;
import org.objectweb.util.monolog.api.Logger;
import org.ow2.joram.mom.amqp.ChannelContext;
import org.ow2.joram.mom.amqp.exceptions.AMQPException;
import org.ow2.joram.mom.amqp.exceptions.AccessRefusedException;
import org.ow2.joram.mom.amqp.exceptions.ChannelException;
import org.ow2.joram.mom.amqp.exceptions.CommandInvalidException;
import org.ow2.joram.mom.amqp.exceptions.InternalErrorException;
import org.ow2.joram.mom.amqp.exceptions.NotAllowedException;
import org.ow2.joram.mom.amqp.exceptions.NotFoundException;
import org.ow2.joram.mom.amqp.exceptions.NotImplementedException;
import org.ow2.joram.mom.amqp.exceptions.PreconditionFailedException;
import org.ow2.joram.mom.amqp.exceptions.ResourceLockedException;
import org.ow2.joram.mom.amqp.exceptions.SyntaxErrorException;
import org.ow2.joram.mom.amqp.exceptions.TransactionException;
import org.ow2.joram.mom.amqp.marshalling.AMQP;
import org.ow2.joram.mom.amqp.marshalling.AbstractMarshallingMethod;
import org.ow2.joram.mom.amqp.structures.Ack;
import org.ow2.joram.mom.amqp.structures.Cancel;
import org.ow2.joram.mom.amqp.structures.Deliver;
import org.ow2.joram.mom.amqp.structures.GetDeliveries;
import org.ow2.joram.mom.amqp.structures.GetResponse;
import org.ow2.joram.mom.amqp.structures.Recover;
import org.ow2.joram.mom.amqp.structures.Returned;

/* loaded from: input_file:org/ow2/joram/mom/amqp/Proxy.class */
public class Proxy implements DeliveryListener, ProxyMBean {
    public static final String PREFIX_PX = "AMQPPx";
    private ProxyName name;
    private fr.dyade.aaa.common.Queue queueIn;
    private java.util.Queue queueOut;
    private NetServerIn netServerIn;
    private Transaction transaction;
    private Set<QueueShell> exclusiveQueues = new HashSet();
    private Map<Integer, ChannelContext> channelContexts = new HashMap();
    public static Logger logger = Debug.getLogger(Proxy.class.getName());
    private static volatile long proxyId = 0;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/ow2/joram/mom/amqp/Proxy$NetServerIn.class */
    public final class NetServerIn extends Daemon {
        protected NetServerIn(String str) {
            super(str);
        }

        public void run() {
            if (Proxy.logger.isLoggable(BasicLevel.DEBUG)) {
                Proxy.logger.log(BasicLevel.DEBUG, "Proxy.run()");
            }
            while (this.running) {
                try {
                    this.canStop = true;
                    Object andPop = Proxy.this.queueIn.getAndPop();
                    this.canStop = false;
                    if (Proxy.logger.isLoggable(BasicLevel.DEBUG)) {
                        Proxy.logger.log(BasicLevel.DEBUG, "Proxy: object on queue : " + andPop.getClass().getName());
                    }
                    if (andPop instanceof AbstractMarshallingMethod) {
                        AbstractMarshallingMethod abstractMarshallingMethod = (AbstractMarshallingMethod) andPop;
                        try {
                            Proxy.this.doProcessMethod(abstractMarshallingMethod);
                        } catch (AMQPException e) {
                            if (Proxy.logger.isLoggable(BasicLevel.DEBUG)) {
                                Proxy.logger.log(BasicLevel.DEBUG, "Proxy: AMQP error: " + e.getMessage());
                            }
                            Proxy.this.throwException(e, abstractMarshallingMethod.channelNumber, abstractMarshallingMethod.getClassId(), abstractMarshallingMethod.getMethodId());
                        }
                    } else if (andPop instanceof PublishRequest) {
                        PublishRequest publishRequest = (PublishRequest) andPop;
                        try {
                            Proxy.this.basicPublish(publishRequest);
                        } catch (AMQPException e2) {
                            if (Proxy.logger.isLoggable(BasicLevel.DEBUG)) {
                                Proxy.logger.log(BasicLevel.DEBUG, "Proxy: AMQP error: " + e2.getMessage());
                            }
                            Proxy.this.throwException(e2, publishRequest.channel, publishRequest.getPublish().getClassId(), publishRequest.getPublish().getMethodId());
                        }
                    } else if (andPop instanceof GetDeliveries) {
                        Proxy.this.getDeliveries((GetDeliveries) andPop);
                    } else if (Proxy.logger.isLoggable(BasicLevel.ERROR)) {
                        Proxy.logger.log(BasicLevel.ERROR, "Proxy: UNEXPECTED OBJECT CLASS: " + andPop.getClass().getName());
                    }
                    try {
                        Proxy.this.commitTx();
                    } catch (TransactionException e3) {
                        if (andPop instanceof AbstractMarshallingMethod) {
                            AbstractMarshallingMethod abstractMarshallingMethod2 = (AbstractMarshallingMethod) andPop;
                            Proxy.this.throwException(e3, abstractMarshallingMethod2.channelNumber, abstractMarshallingMethod2.getClassId(), abstractMarshallingMethod2.getMethodId());
                        } else if (andPop instanceof PublishRequest) {
                            PublishRequest publishRequest2 = (PublishRequest) andPop;
                            Proxy.this.throwException(e3, publishRequest2.channel, publishRequest2.getPublish().getClassId(), publishRequest2.getPublish().getMethodId());
                        }
                    }
                } catch (InterruptedException e4) {
                    if (Proxy.logger.isLoggable(BasicLevel.ERROR)) {
                        Proxy.logger.log(BasicLevel.ERROR, "Proxy: error ", e4);
                    }
                } catch (Exception e5) {
                    if (Proxy.logger.isLoggable(BasicLevel.ERROR)) {
                        Proxy.logger.log(BasicLevel.ERROR, "Proxy: error ", e5);
                    }
                }
            }
        }

        protected void close() {
        }

        protected void shutdown() {
            Proxy.this.queueIn.close();
        }
    }

    private static synchronized long getNextProxyId() {
        long j = proxyId;
        proxyId = j + 1;
        return j;
    }

    public Proxy(fr.dyade.aaa.common.Queue queue, java.util.Queue queue2) throws IOException {
        this.queueIn = null;
        this.queueOut = null;
        this.transaction = null;
        if (AgentServer.getTransaction().isPersistent()) {
            loadProxyId();
        }
        this.name = new ProxyName(AgentServer.getServerId(), getNextProxyId());
        if (AgentServer.getTransaction().isPersistent()) {
            saveProxyId();
        }
        if (logger.isLoggable(BasicLevel.DEBUG)) {
            logger.log(BasicLevel.DEBUG, "<Proxy> = " + this.name);
        }
        try {
            Naming.bindProxy(this.name, this);
        } catch (AlreadyBoundException e) {
            if (logger.isLoggable(BasicLevel.ERROR)) {
                logger.log(BasicLevel.ERROR, "Name already bound, should never happen.", e);
            }
        }
        this.queueIn = queue;
        this.queueOut = queue2;
        this.netServerIn = new NetServerIn(this.name.toString());
        this.transaction = AgentServer.getTransaction();
    }

    public void loadProxyId() throws IOException {
        try {
            Long l = (Long) AgentServer.getTransaction().load(PREFIX_PX);
            if (l != null) {
                proxyId = l.longValue();
            }
        } catch (ClassNotFoundException e) {
            if (logger.isLoggable(BasicLevel.ERROR)) {
                logger.log(BasicLevel.ERROR, "loadProxyId", e);
            }
        }
    }

    public void saveProxyId() throws IOException {
        AgentServer.getTransaction().create(new Long(proxyId), PREFIX_PX);
        AgentServer.getTransaction().begin();
        AgentServer.getTransaction().commit(true);
    }

    protected void doProcessMethod(AbstractMarshallingMethod abstractMarshallingMethod) throws AMQPException {
        if (logger.isLoggable(BasicLevel.DEBUG)) {
            logger.log(BasicLevel.DEBUG, "+ doProcess marshallingMethod = " + abstractMarshallingMethod);
        }
        int i = abstractMarshallingMethod.channelNumber;
        switch (abstractMarshallingMethod.getClassId()) {
            case 10:
                if (abstractMarshallingMethod.getMethodId() != AMQP.Connection.Close.INDEX) {
                    throw new IllegalStateException();
                }
                connectionClose();
                try {
                    send(new AMQP.Connection.CloseOk());
                    return;
                } catch (StoppedQueueException e) {
                    return;
                }
            case 20:
                if (abstractMarshallingMethod.getMethodId() != 40) {
                    throw new IllegalStateException();
                }
                channelClose(i);
                AMQP.Channel.CloseOk closeOk = new AMQP.Channel.CloseOk();
                closeOk.channelNumber = i;
                send(closeOk);
                return;
            case 40:
                switch (abstractMarshallingMethod.getMethodId()) {
                    case 10:
                        AMQP.Exchange.Declare declare = (AMQP.Exchange.Declare) abstractMarshallingMethod;
                        exchangeDeclare(declare);
                        if (declare.noWait) {
                            return;
                        }
                        AMQP.Exchange.DeclareOk declareOk = new AMQP.Exchange.DeclareOk();
                        declareOk.channelNumber = i;
                        send(declareOk);
                        return;
                    case 20:
                        AMQP.Exchange.Delete delete = (AMQP.Exchange.Delete) abstractMarshallingMethod;
                        exchangeDelete(delete);
                        if (delete.noWait) {
                            return;
                        }
                        AMQP.Exchange.DeleteOk deleteOk = new AMQP.Exchange.DeleteOk();
                        deleteOk.channelNumber = i;
                        send(deleteOk);
                        return;
                    default:
                        return;
                }
            case 50:
                switch (abstractMarshallingMethod.getMethodId()) {
                    case 10:
                        AMQP.Queue.Declare declare2 = (AMQP.Queue.Declare) abstractMarshallingMethod;
                        AMQP.Queue.DeclareOk queueDeclare = queueDeclare(declare2);
                        if (declare2.noWait) {
                            return;
                        }
                        queueDeclare.channelNumber = i;
                        send(queueDeclare);
                        return;
                    case 20:
                        AMQP.Queue.Bind bind = (AMQP.Queue.Bind) abstractMarshallingMethod;
                        queueBind(bind);
                        if (bind.noWait) {
                            return;
                        }
                        AMQP.Queue.BindOk bindOk = new AMQP.Queue.BindOk();
                        bindOk.channelNumber = i;
                        send(bindOk);
                        return;
                    case 30:
                        AMQP.Queue.Purge purge = (AMQP.Queue.Purge) abstractMarshallingMethod;
                        AMQP.Queue.PurgeOk queuePurge = queuePurge(purge);
                        if (purge.noWait) {
                            return;
                        }
                        queuePurge.channelNumber = i;
                        send(queuePurge);
                        return;
                    case 40:
                        AMQP.Queue.Delete delete2 = (AMQP.Queue.Delete) abstractMarshallingMethod;
                        AMQP.Queue.DeleteOk queueDelete = queueDelete(delete2);
                        if (delete2.noWait) {
                            return;
                        }
                        queueDelete.channelNumber = i;
                        send(queueDelete);
                        return;
                    case 50:
                        queueUnbind((AMQP.Queue.Unbind) abstractMarshallingMethod);
                        AMQP.Queue.UnbindOk unbindOk = new AMQP.Queue.UnbindOk();
                        unbindOk.channelNumber = i;
                        send(unbindOk);
                        return;
                    default:
                        return;
                }
            case 60:
                switch (abstractMarshallingMethod.getMethodId()) {
                    case 10:
                        basicQoS((AMQP.Basic.Qos) abstractMarshallingMethod);
                        AMQP.Basic.QosOk qosOk = new AMQP.Basic.QosOk();
                        qosOk.channelNumber = i;
                        send(qosOk);
                        return;
                    case 20:
                        AMQP.Basic.Consume consume = (AMQP.Basic.Consume) abstractMarshallingMethod;
                        if (logger.isLoggable(BasicLevel.DEBUG)) {
                            logger.log(BasicLevel.DEBUG, "consume = " + consume);
                        }
                        basicConsume(consume);
                        return;
                    case 30:
                        AMQP.Basic.Cancel cancel = (AMQP.Basic.Cancel) abstractMarshallingMethod;
                        if (logger.isLoggable(BasicLevel.DEBUG)) {
                            logger.log(BasicLevel.DEBUG, "cancel consumerTag = " + cancel.consumerTag + " nowait = " + cancel.noWait);
                        }
                        basicCancel(cancel.consumerTag, i);
                        if (cancel.noWait) {
                            return;
                        }
                        AMQP.Basic.CancelOk cancelOk = new AMQP.Basic.CancelOk(cancel.consumerTag);
                        cancelOk.channelNumber = i;
                        send(cancelOk);
                        return;
                    case AMQP.Basic.Get.INDEX /* 70 */:
                        GetResponse basicGet = basicGet((AMQP.Basic.Get) abstractMarshallingMethod);
                        if (basicGet != null) {
                            basicGet.getOk.channelNumber = i;
                            send(basicGet);
                            return;
                        } else {
                            AMQP.Basic.GetEmpty getEmpty = new AMQP.Basic.GetEmpty();
                            getEmpty.channelNumber = i;
                            send(getEmpty);
                            return;
                        }
                    case AMQP.Basic.Ack.INDEX /* 80 */:
                        AMQP.Basic.Ack ack = (AMQP.Basic.Ack) abstractMarshallingMethod;
                        if (logger.isLoggable(BasicLevel.DEBUG)) {
                            logger.log(BasicLevel.DEBUG, "ACK = " + ack);
                        }
                        basicAck(ack);
                        return;
                    case 90:
                        basicReject((AMQP.Basic.Reject) abstractMarshallingMethod);
                        return;
                    case AMQP.Basic.RecoverAsync.INDEX /* 100 */:
                        basicRecover(((AMQP.Basic.RecoverAsync) abstractMarshallingMethod).requeue, i);
                        return;
                    case AMQP.Basic.Recover.INDEX /* 110 */:
                        basicRecover(((AMQP.Basic.Recover) abstractMarshallingMethod).requeue, i);
                        AMQP.Basic.RecoverOk recoverOk = new AMQP.Basic.RecoverOk();
                        recoverOk.channelNumber = i;
                        send(recoverOk);
                        return;
                    default:
                        return;
                }
            case 90:
                switch (abstractMarshallingMethod.getMethodId()) {
                    case 10:
                        getContext(i).transacted = true;
                        AMQP.Tx.SelectOk selectOk = new AMQP.Tx.SelectOk();
                        selectOk.channelNumber = i;
                        send(selectOk);
                        return;
                    case 20:
                        txCommit(i);
                        AMQP.Tx.CommitOk commitOk = new AMQP.Tx.CommitOk();
                        commitOk.channelNumber = i;
                        send(commitOk);
                        return;
                    case 30:
                        txRollback(i);
                        AMQP.Tx.RollbackOk rollbackOk = new AMQP.Tx.RollbackOk();
                        rollbackOk.channelNumber = i;
                        send(rollbackOk);
                        return;
                    default:
                        return;
                }
            default:
                return;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void throwException(AMQPException aMQPException, int i, int i2, int i3) throws AMQPException {
        if (!(aMQPException instanceof ChannelException)) {
            connectionClose();
            send(new AMQP.Connection.Close(aMQPException.getCode(), aMQPException.getMessage(), i2, i3));
        } else {
            channelClose(i);
            AMQP.Channel.Close close = new AMQP.Channel.Close(aMQPException.getCode(), aMQPException.getMessage(), i2, i3);
            close.channelNumber = i;
            send(close);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void commitTx() throws TransactionException {
        try {
            if (logger.isLoggable(BasicLevel.DEBUG)) {
                logger.log(BasicLevel.DEBUG, "Proxy.commitTx: phase = " + this.transaction.getPhaseInfo());
            }
            this.transaction.begin();
            this.transaction.commit(true);
        } catch (IOException e) {
            if (logger.isLoggable(BasicLevel.ERROR)) {
                logger.log(BasicLevel.ERROR, "Proxy.commitTx() ERROR::", e);
            }
            throw new TransactionException(e.getMessage());
        }
    }

    private ChannelContext getContext(int i) {
        ChannelContext channelContext = this.channelContexts.get(Integer.valueOf(i));
        if (channelContext == null) {
            channelContext = new ChannelContext();
            this.channelContexts.put(new Integer(i), channelContext);
        }
        return channelContext;
    }

    public synchronized void cleanConsumers(short s) throws AMQPException {
        if (logger.isLoggable(BasicLevel.DEBUG)) {
            logger.log(BasicLevel.DEBUG, "Proxy.cleanConsumers(" + ((int) s) + ')');
        }
        for (Integer num : this.channelContexts.keySet()) {
            int intValue = num.intValue();
            ChannelContext channelContext = this.channelContexts.get(num);
            if (channelContext.consumerQueues != null) {
                for (QueueShell queueShell : channelContext.consumerQueues.values()) {
                    if (!queueShell.islocal() && Naming.resolveServerId(queueShell.getName()) == s) {
                        throwException(new InternalErrorException("server " + ((int) s) + " restart."), intValue, -1, -1);
                    }
                }
            }
        }
    }

    public void basicAck(AMQP.Basic.Ack ack) throws PreconditionFailedException {
        if (logger.isLoggable(BasicLevel.DEBUG)) {
            logger.log(BasicLevel.DEBUG, "Proxy.basicAck(" + ack.deliveryTag + ", " + ack.channelNumber + ')');
        }
        ChannelContext context = getContext(ack.channelNumber);
        Iterator<ChannelContext.Delivery> it = context.deliveriesToAck.iterator();
        if (ack.multiple) {
            HashMap hashMap = new HashMap();
            while (it.hasNext()) {
                ChannelContext.Delivery next = it.next();
                if (next.deliveryTag > ack.deliveryTag && ack.deliveryTag != 0) {
                    if (next.deliveryTag > ack.deliveryTag) {
                        break;
                    }
                } else {
                    if (context.transacted) {
                        if (!next.waitingCommit) {
                            next.waitingCommit = true;
                        }
                    }
                    List list = (List) hashMap.get(next.queue);
                    if (list == null) {
                        list = new ArrayList();
                        hashMap.put(next.queue, list);
                    }
                    list.add(new Long(next.queueMsgId));
                    if (!context.transacted) {
                        it.remove();
                    }
                }
            }
            if (hashMap.size() == 0) {
                throw new PreconditionFailedException("Acknowledgement error: invalid tag '" + ack.deliveryTag + "'.");
            }
            if (context.transacted) {
                return;
            }
            for (QueueShell queueShell : hashMap.keySet()) {
                if (queueShell.islocal()) {
                    queueShell.getReference().ackMessages((List) hashMap.get(queueShell));
                } else {
                    StubAgentOut.asyncSend(new Ack(queueShell.getName(), (List) hashMap.get(queueShell)), Naming.resolveServerId(queueShell.getName()));
                }
            }
            if (context.prefetchCount != 0) {
                this.queueIn.push(new GetDeliveries(null, ack.channelNumber));
                return;
            }
            return;
        }
        while (true) {
            if (!it.hasNext()) {
                break;
            }
            ChannelContext.Delivery next2 = it.next();
            if (next2.deliveryTag == ack.deliveryTag) {
                if (!context.transacted) {
                    ArrayList arrayList = new ArrayList(1);
                    arrayList.add(new Long(next2.queueMsgId));
                    it.remove();
                    if (next2.queue.islocal()) {
                        next2.queue.getReference().ackMessages(arrayList);
                    } else {
                        StubAgentOut.asyncSend(new Ack(next2.queue.getName(), arrayList), Naming.resolveServerId(next2.queue.getName()));
                    }
                    if (context.prefetchCount != 0) {
                        this.queueIn.push(new GetDeliveries(null, ack.channelNumber));
                        return;
                    }
                    return;
                }
                if (!next2.waitingCommit) {
                    next2.waitingCommit = true;
                    return;
                }
            }
        }
        throw new PreconditionFailedException("Acknowledgement error: invalid tag '" + ack.deliveryTag + "'.");
    }

    public void basicCancel(String str, int i) throws AMQPException, ResourceLockedException {
        if (logger.isLoggable(BasicLevel.DEBUG)) {
            logger.log(BasicLevel.DEBUG, "Proxy.basicCancel(" + str + ", " + i + ')');
        }
        QueueShell remove = getContext(i).consumerQueues.remove(str);
        if (remove == null) {
            return;
        }
        doCancel(str, i, remove);
    }

    private void doCancel(String str, int i, QueueShell queueShell) throws ResourceLockedException, NotFoundException, PreconditionFailedException, AMQPException {
        if (!queueShell.islocal()) {
            Boolean bool = (Boolean) StubAgentOut.syncSend(new Cancel(str, queueShell.getName(), i), Naming.resolveServerId(queueShell.getName()), this.name.proxyId);
            if (bool == null || bool.booleanValue()) {
                cleanQueueContext(i, queueShell);
                return;
            }
            return;
        }
        Queue reference = queueShell.getReference();
        if (reference != null) {
            reference.cancel(str, i, this.name.serverId, this.name.proxyId);
            if (reference.getConsumerCount() == 0 && reference.isAutodelete()) {
                if (logger.isLoggable(BasicLevel.DEBUG)) {
                    logger.log(BasicLevel.DEBUG, "Proxy: no more consumers -> autodelete");
                }
                AMQP.Queue.Delete delete = new AMQP.Queue.Delete(0, reference.getName(), true, false, true);
                delete.channelNumber = i;
                queueDelete(delete);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void getDeliveries(GetDeliveries getDeliveries) {
        if (logger.isLoggable(BasicLevel.DEBUG)) {
            logger.log(BasicLevel.DEBUG, "Proxy.getDeliveries(" + getDeliveries.consumerTag + ')');
        }
        ChannelContext channelContext = this.channelContexts.get(Integer.valueOf(getDeliveries.channelId));
        if (channelContext == null) {
            return;
        }
        int size = channelContext.prefetchCount > 0 ? channelContext.prefetchCount - channelContext.deliveriesToAck.size() : -1;
        if (size == 0) {
            return;
        }
        if (getDeliveries.consumerTag != null) {
            QueueShell queueShell = channelContext.consumerQueues.get(getDeliveries.consumerTag);
            if (queueShell != null) {
                doGetDeliveries(getDeliveries.consumerTag, getDeliveries.channelId, size, queueShell.getReference());
                return;
            }
            return;
        }
        String[] strArr = (String[]) channelContext.consumerQueues.keySet().toArray(new String[channelContext.consumerQueues.size()]);
        for (int i = 0; i < strArr.length && size != 0; i++) {
            String str = strArr[i];
            size -= doGetDeliveries(str, getDeliveries.channelId, size, channelContext.consumerQueues.get(str).getReference());
        }
    }

    private int doGetDeliveries(String str, int i, int i2, Queue queue) {
        List<Deliver> deliveries = queue.getDeliveries(str, i, i2, this.name.serverId, this.name.proxyId);
        if (logger.isLoggable(BasicLevel.DEBUG)) {
            logger.log(BasicLevel.DEBUG, "Proxy.doGetDeliveries(" + deliveries + ')');
        }
        if (deliveries == null) {
            return 0;
        }
        Iterator<Deliver> it = deliveries.iterator();
        while (it.hasNext()) {
            send(it.next(), new QueueShell(queue));
        }
        return deliveries.size();
    }

    public void basicConsume(AMQP.Basic.Consume consume) throws NotFoundException, NotAllowedException, AMQPException, AccessRefusedException, ResourceLockedException {
        QueueShell queueShell;
        if (logger.isLoggable(BasicLevel.DEBUG)) {
            logger.log(BasicLevel.DEBUG, "Proxy.basicConsume(" + consume + ')');
        }
        String str = consume.queue;
        if (str == null || str.equals(IExchange.DEFAULT_EXCHANGE_NAME)) {
            throw new NotAllowedException("Consuming from unspecified queue.");
        }
        ChannelContext context = getContext(consume.channelNumber);
        String str2 = consume.consumerTag;
        if (str2.equals(IExchange.DEFAULT_EXCHANGE_NAME)) {
            context.consumerTagCounter++;
            str2 = "genTag-" + context.consumerTagCounter;
        }
        if (context.consumerQueues.get(str2) != null) {
            throw new NotAllowedException("Consume request failed due to non-unique tag: '" + str2 + "'.");
        }
        if (Naming.isLocal(str)) {
            StubLocal.basicConsume(this, consume.queue, str2, consume.exclusive, consume.noAck, consume.noLocal, consume.channelNumber, this.name.serverId, this.name.proxyId);
            queueShell = new QueueShell(Naming.lookupQueue(str));
        } else {
            queueShell = new QueueShell(consume.queue);
            consume.consumerTag = str2;
            StubAgentOut.asyncSend(consume, Naming.resolveServerId(consume.queue), this.name.proxyId);
        }
        if (!consume.noWait) {
            AMQP.Basic.ConsumeOk consumeOk = new AMQP.Basic.ConsumeOk(str2);
            consumeOk.channelNumber = consume.channelNumber;
            send(consumeOk);
        }
        context.consumerQueues.put(str2, queueShell);
        this.queueIn.push(new GetDeliveries(str2, consume.channelNumber));
    }

    public GetResponse basicGet(AMQP.Basic.Get get) throws NotFoundException, AMQPException, SyntaxErrorException, ResourceLockedException {
        Message message;
        QueueShell queueShell;
        if (logger.isLoggable(BasicLevel.DEBUG)) {
            logger.log(BasicLevel.DEBUG, "Proxy.basicGet(" + get + ')');
        }
        String str = get.queue;
        ChannelContext context = getContext(get.channelNumber);
        if (str.equals(IExchange.DEFAULT_EXCHANGE_NAME)) {
            str = context.lastQueueCreated;
            if (str == null) {
                throw new SyntaxErrorException("No queue previously declared on the channel.");
            }
        }
        if (Naming.isLocal(str)) {
            message = StubLocal.basicGet(str, get.noAck, this.name.serverId, this.name.proxyId);
            queueShell = new QueueShell(Naming.lookupQueue(str));
        } else {
            get.queue = str;
            message = (Message) StubAgentOut.syncSend(get, Naming.resolveServerId(get.queue));
            queueShell = new QueueShell(get.queue);
        }
        if (message == null) {
            return null;
        }
        long nextDeliveryTag = context.nextDeliveryTag();
        AMQP.Basic.GetOk getOk = new AMQP.Basic.GetOk(nextDeliveryTag, message.redelivered, message.exchange, message.routingKey, message.queueSize);
        if (!get.noAck) {
            context.deliveriesToAck.add(new ChannelContext.Delivery(nextDeliveryTag, message.queueMsgId, queueShell));
        }
        return new GetResponse(getOk, message.properties, message.body);
    }

    public void basicPublish(PublishRequest publishRequest) throws NotFoundException, TransactionException {
        if (logger.isLoggable(BasicLevel.DEBUG)) {
            logger.log(BasicLevel.DEBUG, "Proxy.basicPublish(" + publishRequest + ')');
        }
        ChannelContext context = getContext(publishRequest.channel);
        if (context.transacted) {
            if (Naming.lookupExchange(publishRequest.getPublish().exchange) == null) {
                throw new NotFoundException("Can't publish on an unknwon exchange: '" + publishRequest.getPublish().exchange + "'.");
            }
            context.pubToCommit.add(publishRequest);
        } else {
            if (!Naming.isLocal(publishRequest.getPublish().exchange)) {
                StubAgentOut.asyncSend(publishRequest, Naming.resolveServerId(publishRequest.getPublish().exchange), this.name.proxyId);
                return;
            }
            try {
                AgentServer.getTransaction().begin();
                try {
                    StubLocal.basicPublish(publishRequest, this.name.serverId, this.name.proxyId);
                } catch (AMQPException e) {
                    AMQP.Basic.Return r0 = new AMQP.Basic.Return(e.getCode(), e.getMessage(), publishRequest.getPublish().exchange, publishRequest.getPublish().routingKey);
                    r0.channelNumber = publishRequest.channel;
                    send(new Returned(r0, publishRequest.getHeader(), publishRequest.getBody()));
                }
                try {
                    AgentServer.getTransaction().commit(true);
                } catch (IOException e2) {
                    throw new TransactionException(e2.getMessage());
                }
            } catch (IOException e3) {
                throw new TransactionException(e3.getMessage());
            }
        }
    }

    public void basicRecover(boolean z, int i) throws TransactionException {
        if (logger.isLoggable(BasicLevel.DEBUG)) {
            logger.log(BasicLevel.DEBUG, "Proxy.basicRecover(" + z + ", " + i + ')');
        }
        ChannelContext context = getContext(i);
        HashMap hashMap = new HashMap();
        Iterator<ChannelContext.Delivery> it = context.deliveriesToAck.iterator();
        while (it.hasNext()) {
            ChannelContext.Delivery next = it.next();
            if (!next.waitingCommit) {
                List list = (List) hashMap.get(next.queue);
                if (list == null) {
                    list = new ArrayList();
                    hashMap.put(next.queue, list);
                }
                list.add(Long.valueOf(next.queueMsgId));
                it.remove();
            }
        }
        if (hashMap.size() > 0 && context.prefetchCount != 0) {
            this.queueIn.push(new GetDeliveries(null, i));
        }
        for (QueueShell queueShell : hashMap.keySet()) {
            if (queueShell.islocal()) {
                queueShell.getReference().recoverMessages((List) hashMap.get(queueShell));
            } else {
                StubAgentOut.asyncSend(new Recover(queueShell.getName(), (List) hashMap.get(queueShell)), Naming.resolveServerId(queueShell.getName()));
            }
        }
    }

    public void basicReject(AMQP.Basic.Reject reject) throws TransactionException, PreconditionFailedException {
        if (logger.isLoggable(BasicLevel.DEBUG)) {
            logger.log(BasicLevel.DEBUG, "Proxy.basicReject(" + reject + ')');
        }
        Iterator<ChannelContext.Delivery> it = getContext(reject.channelNumber).deliveriesToAck.iterator();
        while (it.hasNext()) {
            ChannelContext.Delivery next = it.next();
            QueueShell queueShell = next.queue;
            if (next.deliveryTag == reject.deliveryTag) {
                ArrayList arrayList = new ArrayList(1);
                arrayList.add(Long.valueOf(next.queueMsgId));
                if (reject.requeue) {
                    if (queueShell.islocal()) {
                        queueShell.getReference().recoverMessages(arrayList);
                    } else {
                        StubAgentOut.asyncSend(new Recover(queueShell.getName(), arrayList), Naming.resolveServerId(queueShell.getName()));
                    }
                } else if (queueShell.islocal()) {
                    queueShell.getReference().ackMessages(arrayList);
                } else {
                    StubAgentOut.asyncSend(new Ack(queueShell.getName(), arrayList), Naming.resolveServerId(queueShell.getName()));
                }
                it.remove();
                return;
            }
            if (next.deliveryTag > reject.deliveryTag) {
                break;
            }
        }
        throw new PreconditionFailedException("Reject error: invalid tag '" + reject.deliveryTag + "'.");
    }

    public void basicQoS(AMQP.Basic.Qos qos) throws NotImplementedException {
        if (qos.global) {
            throw new NotImplementedException("Global Qos prefetch not implemented.");
        }
        if (qos.prefetchSize != 0) {
            throw new NotImplementedException("Qos prefetch size currently not implemented.");
        }
        ChannelContext context = getContext(qos.channelNumber);
        if (qos.prefetchCount > context.prefetchCount || qos.prefetchCount == 0) {
            this.queueIn.push(new GetDeliveries(null, qos.channelNumber));
        }
        context.prefetchCount = qos.prefetchCount;
    }

    public void channelClose(int i) throws AMQPException {
        if (logger.isLoggable(BasicLevel.DEBUG)) {
            logger.log(BasicLevel.DEBUG, "Proxy.channelClose(" + i + ")");
        }
        ChannelContext channelContext = this.channelContexts.get(Integer.valueOf(i));
        if (channelContext != null) {
            Iterator<Map.Entry<String, QueueShell>> it = channelContext.consumerQueues.entrySet().iterator();
            while (it.hasNext()) {
                try {
                    Map.Entry<String, QueueShell> next = it.next();
                    it.remove();
                    doCancel(next.getKey(), i, next.getValue());
                } catch (ResourceLockedException e) {
                }
            }
            if (channelContext.transacted) {
                txRollback(i);
            }
        }
        basicRecover(true, i);
        this.channelContexts.remove(Integer.valueOf(i));
    }

    public void connectionClose() {
        if (logger.isLoggable(BasicLevel.DEBUG)) {
            logger.log(BasicLevel.DEBUG, "Proxy.connectionClose()");
        }
        Integer[] numArr = (Integer[]) this.channelContexts.keySet().toArray(new Integer[this.channelContexts.size()]);
        for (int i = 0; i < numArr.length; i++) {
            try {
                channelClose(numArr[i].intValue());
            } catch (AMQPException e) {
                if (logger.isLoggable(BasicLevel.WARN)) {
                    logger.log(BasicLevel.WARN, "Error while cleaning channel " + numArr[i], e);
                }
            }
        }
        for (QueueShell queueShell : (QueueShell[]) this.exclusiveQueues.toArray(new QueueShell[this.exclusiveQueues.size()])) {
            try {
                if (queueShell.islocal()) {
                    queueDelete(new AMQP.Queue.Delete(0, queueShell.getReference().getName(), false, false, true));
                } else {
                    queueDelete(new AMQP.Queue.Delete(0, queueShell.getName(), false, false, true));
                }
            } catch (AMQPException e2) {
                if (logger.isLoggable(BasicLevel.WARN)) {
                    logger.log(BasicLevel.WARN, "Error while cleaning exclusive queue " + queueShell, e2);
                }
            }
        }
        this.exclusiveQueues.clear();
        stop();
    }

    public void exchangeDeclare(AMQP.Exchange.Declare declare) throws CommandInvalidException, NotAllowedException, NotFoundException, AMQPException {
        if (logger.isLoggable(BasicLevel.DEBUG)) {
            logger.log(BasicLevel.DEBUG, "Proxy.exchangeDeclare(" + declare + ')');
        }
        if (Naming.isLocal(declare.exchange)) {
            StubLocal.exchangeDeclare(declare.exchange, declare.type, declare.durable, declare.passive);
        } else {
            StubAgentOut.syncSend(declare, Naming.resolveServerId(declare.exchange));
        }
    }

    public void exchangeDelete(AMQP.Exchange.Delete delete) throws NotFoundException, PreconditionFailedException, AMQPException {
        if (logger.isLoggable(BasicLevel.DEBUG)) {
            logger.log(BasicLevel.DEBUG, "Proxy.exchangeDelete(" + delete + ')');
        }
        if (Naming.isLocal(delete.exchange)) {
            StubLocal.exchangeDelete(delete.exchange, delete.ifUnused);
        } else {
            StubAgentOut.syncSend(delete, Naming.resolveServerId(delete.exchange));
        }
    }

    public void queueBind(AMQP.Queue.Bind bind) throws NotFoundException, SyntaxErrorException, ResourceLockedException, AMQPException {
        if (logger.isLoggable(BasicLevel.DEBUG)) {
            logger.log(BasicLevel.DEBUG, "Proxy.queueBind(" + bind + ')');
        }
        String str = bind.queue;
        if (str.equals(IExchange.DEFAULT_EXCHANGE_NAME)) {
            str = getContext(bind.channelNumber).lastQueueCreated;
            if (str == null) {
                throw new SyntaxErrorException("No queue previously declared on the channel.");
            }
            if (bind.routingKey.equals(IExchange.DEFAULT_EXCHANGE_NAME)) {
                bind.routingKey = str;
            }
        }
        if (Naming.isLocal(bind.exchange)) {
            StubLocal.queueBind(str, bind.exchange, bind.routingKey, bind.arguments, this.name.serverId, this.name.proxyId);
        } else {
            bind.queue = Naming.getGlobalName(str);
            StubAgentOut.syncSend(bind, Naming.resolveServerId(bind.exchange));
        }
    }

    public AMQP.Queue.DeclareOk queueDeclare(AMQP.Queue.Declare declare) throws NotFoundException, ResourceLockedException, AMQPException {
        AMQP.Queue.DeclareOk declareOk;
        QueueShell queueShell;
        if (logger.isLoggable(BasicLevel.DEBUG)) {
            logger.log(BasicLevel.DEBUG, "Proxy.queueDeclare(" + declare + ')');
        }
        if (Naming.isLocal(declare.queue)) {
            declareOk = StubLocal.queueDeclare(declare.queue, declare.passive, declare.durable, declare.autoDelete, declare.exclusive, this.name.serverId, this.name.proxyId);
            queueShell = new QueueShell(Naming.lookupQueue(declareOk.queue));
        } else {
            declareOk = (AMQP.Queue.DeclareOk) StubAgentOut.syncSend(declare, Naming.resolveServerId(declare.queue));
            queueShell = new QueueShell(declareOk.queue);
        }
        if (!declare.passive && declare.exclusive) {
            this.exclusiveQueues.add(queueShell);
        }
        getContext(declare.channelNumber).lastQueueCreated = declareOk.queue;
        return declareOk;
    }

    public AMQP.Queue.DeleteOk queueDelete(AMQP.Queue.Delete delete) throws NotFoundException, PreconditionFailedException, ResourceLockedException, AMQPException {
        QueueShell queueShell;
        AMQP.Queue.DeleteOk deleteOk;
        if (logger.isLoggable(BasicLevel.DEBUG)) {
            logger.log(BasicLevel.DEBUG, "Proxy.queueDelete(" + delete + ')');
        }
        if (Naming.isLocal(delete.queue)) {
            queueShell = new QueueShell(Naming.lookupQueue(delete.queue));
            deleteOk = new AMQP.Queue.DeleteOk(StubLocal.queueDelete(delete.queue, delete.ifUnused, delete.ifEmpty, this.name.serverId, this.name.proxyId));
        } else {
            queueShell = new QueueShell(delete.queue);
            deleteOk = (AMQP.Queue.DeleteOk) StubAgentOut.syncSend(delete, Naming.resolveServerId(delete.queue));
        }
        cleanQueueContext(delete.channelNumber, queueShell);
        return deleteOk;
    }

    private void cleanQueueContext(int i, QueueShell queueShell) {
        ChannelContext context = getContext(i);
        Iterator<ChannelContext.Delivery> it = context.deliveriesToAck.iterator();
        while (it.hasNext()) {
            if (it.next().queue.equals(queueShell)) {
                it.remove();
            }
        }
        this.exclusiveQueues.remove(queueShell);
        Iterator<QueueShell> it2 = context.consumerQueues.values().iterator();
        while (it2.hasNext()) {
            if (it2.next().equals(queueShell)) {
                it2.remove();
            }
        }
    }

    public AMQP.Queue.PurgeOk queuePurge(AMQP.Queue.Purge purge) throws NotFoundException, NotAllowedException, ResourceLockedException, SyntaxErrorException, AMQPException {
        if (logger.isLoggable(BasicLevel.DEBUG)) {
            logger.log(BasicLevel.DEBUG, "Proxy.queuepurge(" + purge + ')');
        }
        if (purge.queue == null || purge.queue.equals(IExchange.DEFAULT_EXCHANGE_NAME)) {
            throw new NotAllowedException("Purging unspecified queue.");
        }
        return Naming.isLocal(purge.queue) ? new AMQP.Queue.PurgeOk(StubLocal.queuePurge(purge.queue, this.name.serverId, this.name.proxyId)) : (AMQP.Queue.PurgeOk) StubAgentOut.syncSend(purge, Naming.resolveServerId(purge.queue));
    }

    public void queueUnbind(AMQP.Queue.Unbind unbind) throws NotFoundException, AMQPException {
        if (logger.isLoggable(BasicLevel.DEBUG)) {
            logger.log(BasicLevel.DEBUG, "Proxy.queueUnbind(" + unbind + ')');
        }
        if (Naming.isLocal(unbind.exchange)) {
            StubLocal.queueUnbind(unbind.exchange, unbind.queue, unbind.routingKey, unbind.arguments, this.name.serverId, this.name.proxyId);
        } else {
            unbind.queue = Naming.getGlobalName(unbind.queue);
            StubAgentOut.syncSend(unbind, Naming.resolveServerId(unbind.exchange));
        }
    }

    public void txCommit(int i) throws PreconditionFailedException, TransactionException {
        ChannelContext context = getContext(i);
        if (!context.transacted) {
            throw new PreconditionFailedException("Can't commit a non-transacted channel.");
        }
        try {
            AgentServer.getTransaction().begin();
            for (PublishRequest publishRequest : context.pubToCommit) {
                if (Naming.isLocal(publishRequest.getPublish().exchange)) {
                    try {
                        StubLocal.basicPublish(publishRequest, this.name.serverId, this.name.proxyId);
                    } catch (AMQPException e) {
                        AMQP.Basic.Return r0 = new AMQP.Basic.Return(e.getCode(), e.getMessage(), publishRequest.getPublish().exchange, publishRequest.getPublish().routingKey);
                        r0.channelNumber = publishRequest.channel;
                        send(new Returned(r0, publishRequest.getHeader(), publishRequest.getBody()));
                    }
                } else {
                    StubAgentOut.asyncSend(publishRequest, Naming.resolveServerId(publishRequest.getPublish().exchange), this.name.proxyId);
                }
            }
            context.pubToCommit.clear();
            HashMap hashMap = new HashMap();
            Iterator<ChannelContext.Delivery> it = context.deliveriesToAck.iterator();
            while (it.hasNext()) {
                ChannelContext.Delivery next = it.next();
                if (next.waitingCommit) {
                    List list = (List) hashMap.get(next.queue);
                    if (list == null) {
                        list = new ArrayList();
                        hashMap.put(next.queue, list);
                    }
                    list.add(new Long(next.queueMsgId));
                    it.remove();
                }
            }
            for (QueueShell queueShell : hashMap.keySet()) {
                if (queueShell.islocal()) {
                    queueShell.getReference().ackMessages((List) hashMap.get(queueShell));
                } else {
                    StubAgentOut.asyncSend(new Ack(queueShell.getName(), (List) hashMap.get(queueShell)), Naming.resolveServerId(queueShell.getName()));
                }
            }
            try {
                AgentServer.getTransaction().commit(true);
                if (context.prefetchCount != 0) {
                    this.queueIn.push(new GetDeliveries(null, i));
                }
            } catch (IOException e2) {
                throw new TransactionException(e2.getMessage());
            }
        } catch (IOException e3) {
            throw new TransactionException(e3.getMessage());
        }
    }

    public void txRollback(int i) throws PreconditionFailedException {
        ChannelContext context = getContext(i);
        if (!context.transacted) {
            throw new PreconditionFailedException("Can't rollback a non-transacted channel.");
        }
        context.pubToCommit.clear();
        Iterator<ChannelContext.Delivery> it = context.deliveriesToAck.iterator();
        while (it.hasNext()) {
            it.next().waitingCommit = false;
        }
    }

    @Override // org.ow2.joram.mom.amqp.DeliveryListener
    public boolean deliver(String str, int i, Queue queue, short s, long j) {
        ChannelContext context = getContext(i);
        if (context.prefetchCount > 0 && context.deliveriesToAck.size() == context.prefetchCount) {
            return false;
        }
        this.queueIn.push(new GetDeliveries(str, i));
        return true;
    }

    public void send(AbstractMarshallingMethod abstractMarshallingMethod) {
        if (logger.isLoggable(BasicLevel.DEBUG)) {
            logger.log(BasicLevel.DEBUG, "Proxy.send(" + abstractMarshallingMethod + ")");
        }
        this.queueOut.add(abstractMarshallingMethod);
    }

    public void send(GetResponse getResponse) {
        this.queueOut.add(getResponse);
    }

    public void send(Deliver deliver, QueueShell queueShell) {
        if (logger.isLoggable(BasicLevel.DEBUG)) {
            logger.log(BasicLevel.DEBUG, "Proxy.send(" + deliver.msgId + ", " + queueShell + ')');
        }
        ChannelContext context = getContext(deliver.deliver.channelNumber);
        long nextDeliveryTag = context.nextDeliveryTag();
        long j = deliver.deliver.deliveryTag;
        deliver.deliver.deliveryTag = nextDeliveryTag;
        if (!deliver.noAck) {
            context.deliveriesToAck.add(new ChannelContext.Delivery(nextDeliveryTag, j, queueShell));
        }
        try {
            this.queueOut.add(deliver);
        } catch (Exception e) {
            if (logger.isLoggable(BasicLevel.ERROR)) {
                logger.log(BasicLevel.ERROR, "Proxy.send ERROR", e);
            }
        }
    }

    public void send(Returned returned) {
        this.queueOut.add(returned);
    }

    public void stop() {
        this.netServerIn.stop();
        try {
            MXWrapper.unregisterMBean("AMQP", "type=Proxy,name=" + this.name);
        } catch (Exception e) {
            logger.log(BasicLevel.DEBUG, "Error unregistering MBean.", e);
        }
    }

    public void start() {
        this.netServerIn.start();
        try {
            MXWrapper.registerMBean(this, "AMQP", "type=Proxy,name=" + this.name);
        } catch (Exception e) {
            logger.log(BasicLevel.DEBUG, "Error registering MBean.", e);
        }
    }

    @Override // org.ow2.joram.mom.amqp.ProxyMBean
    public int getQueueInSize() {
        return this.queueIn.size();
    }

    @Override // org.ow2.joram.mom.amqp.ProxyMBean
    public int getQueueOutSize() {
        return this.queueOut.size();
    }

    @Override // org.ow2.joram.mom.amqp.ProxyMBean
    public Set<String> getExclusiveQueues() {
        HashSet hashSet = new HashSet(this.exclusiveQueues.size());
        Iterator<QueueShell> it = this.exclusiveQueues.iterator();
        while (it.hasNext()) {
            hashSet.add(it.next().getName());
        }
        return hashSet;
    }

    @Override // org.ow2.joram.mom.amqp.ProxyMBean
    public Integer[] getOpenedChannels() {
        return (Integer[]) this.channelContexts.keySet().toArray(new Integer[this.channelContexts.size()]);
    }
}
