package org.objectweb.joram.mom.dest;

import fr.dyade.aaa.agent.AgentId;
import fr.dyade.aaa.common.Debug;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import org.objectweb.joram.mom.notifications.ClientMessages;
import org.objectweb.joram.mom.notifications.WakeUpNot;
import org.objectweb.joram.mom.util.DMQManager;
import org.objectweb.joram.shared.excepts.MessageValueException;
import org.objectweb.joram.shared.excepts.RequestException;
import org.objectweb.joram.shared.messages.ConversionHelper;
import org.objectweb.joram.shared.messages.Message;
import org.objectweb.util.monolog.api.BasicLevel;
import org.objectweb.util.monolog.api.Logger;

/* loaded from: input_file:joram-mom-core-5.8.0.jar:org/objectweb/joram/mom/dest/DistributionQueue.class */
public class DistributionQueue extends Queue {
    public static Logger logger = Debug.getLogger(DistributionQueue.class.getName());
    public static final long DEFAULT_PERIOD = 1000;
    public static final String BATCH_DISTRIBUTION_OPTION = "distribution.batch";
    public static final String ASYNC_DISTRIBUTION_OPTION = "distribution.async";
    private static final long serialVersionUID = 1;
    private transient DistributionModule distributionModule;
    private transient DistributionDaemon distributionDaemon;
    private String distributionClassName;
    private boolean batchDistribution;
    private boolean isAsyncDistribution;
    private Properties properties;

    public DistributionQueue() {
        this.fixed = true;
    }

    @Override // org.objectweb.joram.mom.dest.Destination
    public void setProperties(Properties properties, boolean z) throws Exception {
        super.setProperties(properties, z);
        if (logger.isLoggable(BasicLevel.DEBUG)) {
            logger.log(BasicLevel.DEBUG, "DistributionQueue.setProperties prop = " + properties);
        }
        this.properties = properties;
        this.batchDistribution = false;
        this.isAsyncDistribution = false;
        if (properties != null) {
            if (properties.containsKey(BATCH_DISTRIBUTION_OPTION)) {
                try {
                    this.batchDistribution = ConversionHelper.toBoolean(properties.get(BATCH_DISTRIBUTION_OPTION));
                } catch (MessageValueException e) {
                    logger.log(BasicLevel.ERROR, "DistributionModule: can't parse batch option.", e);
                }
            }
            this.isAsyncDistribution = isAsyncDistribution(properties);
            if (this.isAsyncDistribution) {
                this.batchDistribution = true;
            }
        }
        if (z) {
            if (properties != null) {
                this.distributionClassName = properties.getProperty(DistributionModule.CLASS_NAME);
                properties.remove(DistributionModule.CLASS_NAME);
            }
            if (this.distributionClassName == null) {
                throw new RequestException("Distribution class name not found: distribution.className property must be set on queue creation.");
            }
            try {
                Class.forName(this.distributionClassName).getConstructor(new Class[0]);
                return;
            } catch (Exception e2) {
                logger.log(BasicLevel.ERROR, "DistributionQueue: error with distribution class.", e2);
                throw new RequestException(e2.getMessage());
            }
        }
        this.distributionModule.setProperties(properties, z);
        if (this.distributionDaemon == null && this.isAsyncDistribution) {
            this.distributionDaemon = new DistributionDaemon(this.distributionModule.getDistributionHandler(), getName(), this);
            this.distributionDaemon.start();
            return;
        }
        if (this.distributionDaemon == null || this.isAsyncDistribution) {
            return;
        }
        this.distributionDaemon.close();
        this.distributionDaemon = null;
        if (properties.containsKey(BATCH_DISTRIBUTION_OPTION)) {
            try {
                this.batchDistribution = ConversionHelper.toBoolean(properties.get(BATCH_DISTRIBUTION_OPTION));
            } catch (MessageValueException e3) {
                logger.log(BasicLevel.ERROR, "DistributionModule: can't parse batch option.", e3);
            }
        }
    }

    private boolean isAsyncDistribution(Properties properties) {
        if (!properties.containsKey(ASYNC_DISTRIBUTION_OPTION)) {
            return false;
        }
        try {
            return ConversionHelper.toBoolean(properties.get(ASYNC_DISTRIBUTION_OPTION));
        } catch (MessageValueException e) {
            logger.log(BasicLevel.ERROR, "DistributionModule: can't parse DaemonDistribution option.", e);
            return false;
        }
    }

    @Override // org.objectweb.joram.mom.dest.Queue, org.objectweb.joram.mom.dest.Destination
    public void initialize(boolean z) {
        super.initialize(z);
        if (this.distributionModule == null) {
            this.distributionModule = new DistributionModule(this.distributionClassName, this.properties, z);
        }
        if (this.properties != null) {
            this.isAsyncDistribution = isAsyncDistribution(this.properties);
        }
        if (this.distributionDaemon == null && this.isAsyncDistribution) {
            this.distributionDaemon = new DistributionDaemon(this.distributionModule.getDistributionHandler(), getName(), this);
            this.distributionDaemon.start();
        }
    }

    @Override // org.objectweb.joram.mom.dest.Destination, fr.dyade.aaa.agent.Agent
    public void agentFinalize(boolean z) {
        super.agentFinalize(z);
        if (this.distributionModule != null) {
            this.distributionModule.close();
        }
        if (this.distributionDaemon != null) {
            this.distributionDaemon.close();
        }
    }

    @Override // org.objectweb.joram.mom.dest.Destination
    public ClientMessages preProcess(AgentId agentId, ClientMessages clientMessages) {
        if (logger.isLoggable(BasicLevel.DEBUG)) {
            logger.log(BasicLevel.DEBUG, "DistributionQueue.preProcess(" + agentId + ", " + clientMessages + ')');
        }
        if (!this.batchDistribution && this.messages.size() > 0) {
            return clientMessages;
        }
        List messages = clientMessages.getMessages();
        Iterator it = messages.iterator();
        while (it.hasNext()) {
            Message message = (Message) it.next();
            try {
                this.distributionModule.processMessage(message);
                this.nbMsgsDeliverSinceCreation += serialVersionUID;
                it.remove();
            } catch (Exception e) {
                if (this.isAsyncDistribution) {
                    if (this.distributionDaemon != null) {
                        this.distributionDaemon.push(message);
                    } else if (logger.isLoggable(BasicLevel.WARN)) {
                        logger.log(BasicLevel.WARN, "DistributionQueue.preProcess: distribution distributionDaemon = null but we are in async distribution mode.", e);
                    }
                } else if (logger.isLoggable(BasicLevel.WARN)) {
                    logger.log(BasicLevel.WARN, "DistributionQueue.preProcess: distribution error.", e);
                }
                if (!this.batchDistribution) {
                    break;
                }
            }
        }
        if (messages.size() > 0) {
            return clientMessages;
        }
        return null;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.objectweb.joram.mom.dest.Destination
    public void postProcess(ClientMessages clientMessages) {
        super.postProcess(clientMessages);
        if (this.distributionDaemon != null) {
            if (logger.isLoggable(BasicLevel.DEBUG)) {
                logger.log(BasicLevel.DEBUG, "DistributionQueue.postProcess(...)");
            }
            List<String> ackList = this.distributionDaemon.getAckList();
            if (ackList != null) {
                for (String str : ackList) {
                    while (true) {
                        if (0 < this.messages.size()) {
                            org.objectweb.joram.mom.messages.Message message = (org.objectweb.joram.mom.messages.Message) this.messages.get(0);
                            if (str.equals(message.getId())) {
                                this.messages.remove(0);
                                message.delete();
                                if (logger.isLoggable(BasicLevel.DEBUG)) {
                                    logger.log(BasicLevel.DEBUG, "DistributionQueue.postProcess removes " + str);
                                }
                            }
                        }
                    }
                }
            }
        }
    }

    @Override // org.objectweb.joram.mom.dest.Queue, fr.dyade.aaa.agent.Agent, fr.dyade.aaa.agent.AgentMBean
    public String toString() {
        return "DistributionQueue:" + getId().toString();
    }

    @Override // org.objectweb.joram.mom.dest.Queue, org.objectweb.joram.mom.dest.Destination
    public void wakeUpNot(WakeUpNot wakeUpNot) {
        List<String> ackList;
        if (logger.isLoggable(BasicLevel.DEBUG) && !this.isAsyncDistribution) {
            logger.log(BasicLevel.DEBUG, "DistributionQueue.wakeUpNot(" + wakeUpNot + ')');
        }
        super.wakeUpNot(wakeUpNot);
        if (this.distributionDaemon != null && (ackList = this.distributionDaemon.getAckList()) != null) {
            for (String str : ackList) {
                while (true) {
                    if (0 < this.messages.size()) {
                        org.objectweb.joram.mom.messages.Message message = (org.objectweb.joram.mom.messages.Message) this.messages.get(0);
                        if (str.equals(message.getId())) {
                            this.messages.remove(0);
                            message.delete();
                            if (logger.isLoggable(BasicLevel.DEBUG)) {
                                logger.log(BasicLevel.DEBUG, "DistributionQueue.wakeUpNot removes " + str);
                            }
                        }
                    }
                }
            }
        }
        Iterator it = this.messages.iterator();
        while (it.hasNext()) {
            org.objectweb.joram.mom.messages.Message message2 = (org.objectweb.joram.mom.messages.Message) it.next();
            try {
                this.distributionModule.processMessage(message2.getFullMessage());
                this.nbMsgsDeliverSinceCreation += serialVersionUID;
                it.remove();
                message2.delete();
            } catch (Exception e) {
                if (logger.isLoggable(BasicLevel.DEBUG) && !this.isAsyncDistribution) {
                    logger.log(BasicLevel.DEBUG, "DistributionQueue.wakeUpNot redelivery number " + message2.getDeliveryCount() + " failed.", e);
                } else if (logger.isLoggable(BasicLevel.DEBUG)) {
                    logger.log(BasicLevel.DEBUG, "DistributionQueue.wakeUpNot redelivery " + message2.getId() + " number " + message2.getDeliveryCount());
                }
                if (this.distributionDaemon == null) {
                    message2.incDeliveryCount();
                }
                if (isUndeliverable(message2)) {
                    if (logger.isLoggable(BasicLevel.DEBUG)) {
                        logger.log(BasicLevel.DEBUG, "Message can't be delivered, send to DMQ.");
                    }
                    it.remove();
                    message2.delete();
                    DMQManager dMQManager = new DMQManager(this.dmqId, getId());
                    this.nbMsgsSentToDMQSinceCreation += serialVersionUID;
                    dMQManager.addDeadMessage(message2.getFullMessage(), (short) 2);
                    dMQManager.sendToDMQ();
                } else {
                    if (this.distributionDaemon != null) {
                        synchronized (this.distributionDaemon) {
                            this.distributionDaemon.notify();
                        }
                    }
                    if (logger.isLoggable(BasicLevel.DEBUG) && this.distributionDaemon != null) {
                        logger.log(BasicLevel.DEBUG, "DistributionQueue.wakeUpNot distributionDaemon = " + this.distributionDaemon + ", distributionDaemon.isEmpty() = " + this.distributionDaemon.isEmpty());
                    }
                    if (this.distributionDaemon != null) {
                        if (!this.distributionDaemon.isEmpty()) {
                            return;
                        } else {
                            this.distributionDaemon.push(message2.getFullMessage());
                        }
                    }
                    if (!this.batchDistribution) {
                        return;
                    }
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.objectweb.joram.mom.dest.Destination
    public void processSetRight(AgentId agentId, int i) throws RequestException {
        if (i == READ) {
            throw new RequestException("A distribution queue can't be set readable.");
        }
        super.processSetRight(agentId, i);
    }
}
