package org.objectweb.joram.mom.dest.amqp;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.ShutdownSignalException;
import com.rabbitmq.client.impl.LongString;
import fr.dyade.aaa.common.Daemon;
import fr.dyade.aaa.common.Debug;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Hashtable;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.objectweb.joram.mom.dest.AcquisitionDaemon;
import org.objectweb.joram.mom.dest.ReliableTransmitter;
import org.objectweb.joram.shared.messages.Message;
import org.objectweb.util.monolog.api.BasicLevel;
import org.objectweb.util.monolog.api.Logger;

/* loaded from: input_file:org/objectweb/joram/mom/dest/amqp/AmqpAcquisition.class */
public class AmqpAcquisition implements AcquisitionDaemon {
    private static final Logger logger = Debug.getLogger(AmqpAcquisition.class.getName());
    private static final String QUEUE_NAME_PROP = "amqp.QueueName";
    private static final String UPDATE_PERIOD_PROP = "amqp.ConnectionUpdatePeriod";
    private static final String ROUTING_PROP = "amqp.Routing";
    private static ConnectionUpdater connectionUpdater;
    private ReliableTransmitter transmitter;
    private Map<String, Channel> channels = new Hashtable();
    private List<String> connectionNames = null;
    private String amqpQueue = null;
    private volatile boolean closing = false;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/objectweb/joram/mom/dest/amqp/AmqpAcquisition$AmqpConsumer.class */
    public class AmqpConsumer extends DefaultConsumer {
        private String name;

        public AmqpConsumer(Channel channel, String str) {
            super(channel);
            this.name = str;
        }

        @Override // com.rabbitmq.client.DefaultConsumer, com.rabbitmq.client.Consumer
        public void handleDelivery(String str, Envelope envelope, AMQP.BasicProperties basicProperties, byte[] bArr) throws IOException {
            Message message = new Message();
            message.body = bArr;
            try {
                message.type = Byte.parseByte(basicProperties.getType());
            } catch (NumberFormatException e) {
                if (AmqpAcquisition.logger.isLoggable(BasicLevel.WARN)) {
                    AmqpAcquisition.logger.log(BasicLevel.WARN, "Message Type field could not be parsed.", e);
                }
                message.type = 5;
            }
            message.correlationId = basicProperties.getCorrelationId();
            Integer deliveryMode = basicProperties.getDeliveryMode();
            if (deliveryMode != null) {
                if (deliveryMode.intValue() == 1) {
                    message.persistent = false;
                } else if (deliveryMode.intValue() == 2) {
                    message.persistent = true;
                }
            }
            if (basicProperties.getPriority() != null) {
                message.priority = basicProperties.getPriority().intValue();
            }
            if (basicProperties.getTimestamp() != null) {
                message.timestamp = basicProperties.getTimestamp().getTime();
            }
            try {
                if (basicProperties.getExpiration() != null) {
                    message.expiration = Long.parseLong(basicProperties.getExpiration());
                }
            } catch (NumberFormatException e2) {
                if (AmqpAcquisition.logger.isLoggable(BasicLevel.WARN)) {
                    AmqpAcquisition.logger.log(BasicLevel.WARN, "Expiration field could not be parsed.", e2);
                }
            }
            if (basicProperties.getHeaders() != null) {
                for (Map.Entry<String, Object> entry : basicProperties.getHeaders().entrySet()) {
                    try {
                        if (entry.getValue() instanceof LongString) {
                            message.setProperty(entry.getKey(), entry.getValue().toString());
                        } else {
                            message.setProperty(entry.getKey(), entry.getValue());
                        }
                    } catch (ClassCastException e3) {
                        if (AmqpAcquisition.logger.isLoggable(BasicLevel.ERROR)) {
                            AmqpAcquisition.logger.log(BasicLevel.ERROR, "Property can't be mapped to JMS message property.", e3);
                        }
                    }
                }
            }
            if (AmqpAcquisition.logger.isLoggable(BasicLevel.DEBUG)) {
                AmqpAcquisition.logger.log(BasicLevel.DEBUG, this.name + ": New incoming message : " + message);
            }
            AmqpAcquisition.this.transmitter.transmit(message, basicProperties.getMessageId());
            getChannel().basicAck(envelope.getDeliveryTag(), false);
        }

        @Override // com.rabbitmq.client.DefaultConsumer, com.rabbitmq.client.Consumer
        public void handleShutdownSignal(String str, ShutdownSignalException shutdownSignalException) {
            if (AmqpAcquisition.logger.isLoggable(BasicLevel.DEBUG)) {
                AmqpAcquisition.logger.log(BasicLevel.DEBUG, this.name + ": Consumer error for connection " + getChannel().getConnection());
            }
            if (AmqpAcquisition.this.closing) {
                return;
            }
            AmqpAcquisition.this.channels.remove(this.name);
        }
    }

    /* loaded from: input_file:org/objectweb/joram/mom/dest/amqp/AmqpAcquisition$ConnectionUpdater.class */
    private static class ConnectionUpdater extends Daemon {
        private List<AmqpAcquisition> listeners;
        private long period;

        protected ConnectionUpdater(long j) {
            super("AMQP_ConnectionUpdater", AmqpAcquisition.logger);
            this.listeners = new ArrayList();
            setDaemon(false);
            this.period = j;
            if (this.logmon.isLoggable(BasicLevel.DEBUG)) {
                this.logmon.log(BasicLevel.DEBUG, "ConnectionUpdater<init>");
            }
        }

        public void run() {
            if (this.logmon.isLoggable(BasicLevel.DEBUG)) {
                this.logmon.log(BasicLevel.DEBUG, "run()");
            }
            boolean z = true;
            while (this.running) {
                try {
                    if (this.logmon.isLoggable(BasicLevel.DEBUG)) {
                        this.logmon.log(BasicLevel.DEBUG, "update connections in " + this.period + " ms");
                    }
                    this.canStop = true;
                    if (z) {
                        z = false;
                    } else {
                        try {
                            Thread.sleep(this.period);
                        } catch (InterruptedException e) {
                            if (this.logmon.isLoggable(BasicLevel.DEBUG)) {
                                this.logmon.log(BasicLevel.DEBUG, "Thread interrupted.");
                            }
                        }
                    }
                    this.canStop = false;
                    if (this.logmon.isLoggable(BasicLevel.DEBUG)) {
                        this.logmon.log(BasicLevel.DEBUG, "update connections");
                    }
                    List<LiveServerConnection> connections = AmqpConnectionService.getInstance().getConnections();
                    synchronized (this.listeners) {
                        if (this.listeners.size() == 0) {
                            stop();
                        }
                        Iterator<AmqpAcquisition> it = this.listeners.iterator();
                        while (it.hasNext()) {
                            it.next().updateConnections(connections);
                        }
                    }
                } finally {
                    finish();
                }
            }
        }

        public void shutdown() {
            interrupt();
        }

        public void close() {
        }

        protected void addUpdateListener(AmqpAcquisition amqpAcquisition) {
            synchronized (this.listeners) {
                this.listeners.add(amqpAcquisition);
                if (this.listeners.size() == 1) {
                    start();
                }
            }
            amqpAcquisition.updateConnections(AmqpConnectionService.getInstance().getConnections());
        }

        protected void removeUpdateListener(AmqpAcquisition amqpAcquisition) {
            synchronized (this.listeners) {
                this.listeners.remove(amqpAcquisition);
                if (this.listeners.size() == 0) {
                    stop();
                }
            }
        }
    }

    public void start(Properties properties, ReliableTransmitter reliableTransmitter) {
        this.transmitter = reliableTransmitter;
        this.amqpQueue = properties.getProperty(QUEUE_NAME_PROP);
        if (this.amqpQueue == null) {
            logger.log(BasicLevel.ERROR, "The amqp queue name property amqp.QueueName must be specified.");
        }
        long j = 5000;
        try {
            if (properties.containsKey(UPDATE_PERIOD_PROP)) {
                j = Long.parseLong(properties.getProperty(UPDATE_PERIOD_PROP));
            }
        } catch (NumberFormatException e) {
            logger.log(BasicLevel.ERROR, "Property amqp.ConnectionUpdatePeriodcould not be parsed properly, use default value.", e);
        }
        this.connectionNames = null;
        if (properties.containsKey(ROUTING_PROP)) {
            this.connectionNames = AmqpConnectionService.convertToList(properties.getProperty(ROUTING_PROP));
        }
        if (connectionUpdater == null) {
            connectionUpdater = new ConnectionUpdater(j);
        }
        connectionUpdater.addUpdateListener(this);
    }

    public void stop() {
        if (logger.isLoggable(BasicLevel.DEBUG)) {
            logger.log(BasicLevel.DEBUG, "Stop AmqpAcquisition.");
        }
        connectionUpdater.removeUpdateListener(this);
        this.closing = true;
        synchronized (this.channels) {
            Iterator<Channel> it = this.channels.values().iterator();
            while (it.hasNext()) {
                try {
                    it.next().close();
                } catch (IOException e) {
                    if (logger.isLoggable(BasicLevel.DEBUG)) {
                        logger.log(BasicLevel.DEBUG, "Error while stopping AmqpAcquisition.", e);
                    }
                }
            }
            this.channels.clear();
        }
    }

    public void updateConnections(List<LiveServerConnection> list) {
        for (LiveServerConnection liveServerConnection : list) {
            if (!this.channels.containsKey(liveServerConnection.getName()) && (this.connectionNames == null || this.connectionNames.contains(liveServerConnection.getName()))) {
                if (logger.isLoggable(BasicLevel.DEBUG)) {
                    logger.log(BasicLevel.DEBUG, "Creating a new consumer on queue " + this.amqpQueue + " for connection " + liveServerConnection.getName());
                }
                try {
                    Channel createChannel = liveServerConnection.getConnection().createChannel();
                    createChannel.queueDeclarePassive(this.amqpQueue);
                    createChannel.basicConsume(this.amqpQueue, false, new AmqpConsumer(createChannel, liveServerConnection.getName()));
                    this.channels.put(liveServerConnection.getName(), createChannel);
                } catch (Exception e) {
                    logger.log(BasicLevel.ERROR, "Error while starting consumer on connection: " + liveServerConnection.getName(), e);
                }
            }
        }
    }
}
