package org.objectweb.joram.mom.dest.amqp;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.AlreadyClosedException;
import com.rabbitmq.client.Channel;
import fr.dyade.aaa.common.Debug;
import java.io.IOException;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.objectweb.joram.mom.dest.DistributionHandler;
import org.objectweb.joram.shared.messages.Message;
import org.objectweb.util.monolog.api.BasicLevel;
import org.objectweb.util.monolog.api.Logger;

/* loaded from: input_file:org/objectweb/joram/mom/dest/amqp/AmqpDistribution.class */
public class AmqpDistribution implements DistributionHandler {
    private static final Logger logger = Debug.getLogger(AmqpDistribution.class.getName());
    private static final String QUEUE_NAME_PROP = "amqp.QueueName";
    private static final String QUEUE_PASSIVE_PROP = "amqp.Queue.DeclarePassive";
    private static final String QUEUE_EXCLUSIVE_PROP = "amqp.Queue.DeclareExclusive";
    private static final String QUEUE_DURABLE_PROP = "amqp.Queue.DeclareDurable";
    private static final String QUEUE_AUTODELETE_PROP = "amqp.Queue.DeclareAutoDelete";
    private static final String UPDATE_PERIOD_PROP = "amqp.ConnectionUpdatePeriod";
    private static final String ROUTING_PROP = "amqp.Routing";
    private LinkedHashMap<String, Channel> channels = new LinkedHashMap<>(16, 0.75f, true);
    private List<String> connectionNames = null;
    private String amqpQueue = null;
    private boolean amqpQueuePassive = true;
    private boolean amqpQueueExclusive = true;
    private boolean amqpQueueDurable = true;
    private boolean amqpQueueAutoDelete = true;
    private long lastUpdate = 0;
    private long updatePeriod = 5000;

    public void init(Properties properties, boolean z) {
        this.amqpQueue = properties.getProperty(QUEUE_NAME_PROP);
        if (this.amqpQueue == null) {
            logger.log(BasicLevel.ERROR, "The amqp queue name property amqp.QueueName must be specified.");
        }
        this.amqpQueuePassive = Boolean.parseBoolean(properties.getProperty(QUEUE_PASSIVE_PROP, "true"));
        this.amqpQueueExclusive = Boolean.parseBoolean(properties.getProperty(QUEUE_EXCLUSIVE_PROP, "false"));
        this.amqpQueueDurable = Boolean.parseBoolean(properties.getProperty(QUEUE_DURABLE_PROP, "true"));
        this.amqpQueueAutoDelete = Boolean.parseBoolean(properties.getProperty(QUEUE_AUTODELETE_PROP, "false"));
        try {
            if (properties.containsKey(UPDATE_PERIOD_PROP)) {
                this.updatePeriod = Long.parseLong(properties.getProperty(UPDATE_PERIOD_PROP));
            }
        } catch (NumberFormatException e) {
            logger.log(BasicLevel.ERROR, "Property amqp.ConnectionUpdatePeriodcould not be parsed properly, use default value.", e);
        }
        if (properties.containsKey(ROUTING_PROP)) {
            this.connectionNames = AmqpConnectionService.convertToList(properties.getProperty(ROUTING_PROP));
        }
    }

    public void distribute(Message message) throws Exception {
        List<String> list = this.connectionNames;
        AMQP.BasicProperties basicProperties = new AMQP.BasicProperties();
        if (message.persistent) {
            basicProperties.setDeliveryMode(2);
        } else {
            basicProperties.setDeliveryMode(1);
        }
        basicProperties.setCorrelationId(message.correlationId);
        basicProperties.setPriority(Integer.valueOf(message.priority));
        basicProperties.setTimestamp(new Date(message.timestamp));
        basicProperties.setMessageId(message.id);
        basicProperties.setType(String.valueOf(message.type));
        basicProperties.setExpiration(String.valueOf(message.expiration));
        if (message.properties != null) {
            HashMap hashMap = new HashMap();
            message.properties.copyInto(hashMap);
            basicProperties.setHeaders(hashMap);
            Object obj = message.properties.get(ROUTING_PROP);
            if (obj != null && (obj instanceof String)) {
                list = AmqpConnectionService.convertToList((String) obj);
            }
        }
        long currentTimeMillis = System.currentTimeMillis();
        if (currentTimeMillis - this.lastUpdate > this.updatePeriod) {
            if (logger.isLoggable(BasicLevel.DEBUG)) {
                logger.log(BasicLevel.DEBUG, "Updating channels.");
            }
            for (LiveServerConnection liveServerConnection : AmqpConnectionService.getInstance().getConnections()) {
                if (!this.channels.containsKey(liveServerConnection.getName())) {
                    if (logger.isLoggable(BasicLevel.DEBUG)) {
                        logger.log(BasicLevel.DEBUG, liveServerConnection.getName() + ": New channel available for distribution.");
                    }
                    try {
                        Channel createChannel = liveServerConnection.getConnection().createChannel();
                        if (this.amqpQueuePassive) {
                            createChannel.queueDeclarePassive(this.amqpQueue);
                        } else {
                            createChannel.queueDeclare(this.amqpQueue, this.amqpQueueDurable, this.amqpQueueExclusive, this.amqpQueueAutoDelete, null);
                        }
                        this.channels.put(liveServerConnection.getName(), createChannel);
                    } catch (IOException e) {
                        if (logger.isLoggable(BasicLevel.DEBUG)) {
                            logger.log(BasicLevel.DEBUG, "Channel is not usable.", e);
                        }
                    }
                }
            }
            this.lastUpdate = currentTimeMillis;
        }
        Iterator<Map.Entry<String, Channel>> it = this.channels.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Channel> next = it.next();
            try {
                Channel value = next.getValue();
                String key = next.getKey();
                if (!value.isOpen()) {
                    it.remove();
                } else if (list == null || list.contains(key)) {
                    if (logger.isLoggable(BasicLevel.DEBUG)) {
                        logger.log(BasicLevel.DEBUG, "Sending message on " + key);
                    }
                    value.basicPublish("", this.amqpQueue, basicProperties, message.getBody());
                    this.channels.get(key);
                    return;
                }
            } catch (AlreadyClosedException e2) {
                if (logger.isLoggable(BasicLevel.DEBUG)) {
                    logger.log(BasicLevel.DEBUG, "Channel is not usable, remove from table.", e2);
                }
                it.remove();
            } catch (IOException e3) {
                if (logger.isLoggable(BasicLevel.DEBUG)) {
                    logger.log(BasicLevel.DEBUG, "Channel is not usable, remove from table.", e3);
                }
                it.remove();
            }
        }
        throw new Exception("Message could not be sent, no usable channel found.");
    }

    public void close() {
        Iterator<Channel> it = this.channels.values().iterator();
        while (it.hasNext()) {
            try {
                it.next().close();
            } catch (IOException e) {
                if (logger.isLoggable(BasicLevel.DEBUG)) {
                    logger.log(BasicLevel.DEBUG, "Error while stopping AmqpDistribution.", e);
                }
            }
        }
        this.channels.clear();
    }
}
