package org.jgroups.tests;

import java.util.Date;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import junit.textui.TestRunner;
import org.hsqldb.Types;
import org.jgroups.Address;
import org.jgroups.Channel;
import org.jgroups.ChannelException;
import org.jgroups.Global;
import org.jgroups.JChannel;
import org.jgroups.Message;
import org.jgroups.ReceiverAdapter;
import org.jgroups.protocols.TP;
import org.jgroups.protocols.pbcast.GMS;
import org.jgroups.stack.ProtocolStack;
import org.jgroups.util.Promise;
import org.jgroups.util.Util;

/* loaded from: input_file:jgroups-2.6.3.GA.jar:org/jgroups/tests/MessageBundlingTest.class */
public class MessageBundlingTest extends ChannelTestBase {
    private JChannel ch1;
    private JChannel ch2;
    private MyReceiver r2;
    private static final long LATENCY = 1500;
    private static final long SLEEP = 5000;
    private static final boolean BUNDLING = true;
    private static final int MAX_BYTES = 20000;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:jgroups-2.6.3.GA.jar:org/jgroups/tests/MessageBundlingTest$MyReceiver.class */
    public static class MyReceiver extends ReceiverAdapter {
        private final List<Long> times;
        private int num_expected_msgs;
        private Promise<Integer> promise;

        private MyReceiver() {
            this.times = new LinkedList();
        }

        public List<Long> getTimes() {
            return this.times;
        }

        public void setNumExpectedMesssages(int i) {
            this.num_expected_msgs = i;
        }

        public void setPromise(Promise<Integer> promise) {
            this.promise = promise;
        }

        public int size() {
            return this.times.size();
        }

        @Override // org.jgroups.ReceiverAdapter, org.jgroups.MessageListener
        public void receive(Message message) {
            this.times.add(new Long(System.currentTimeMillis()));
            System.out.println("<<< received message from " + message.getSrc() + " at " + new Date());
            if (this.times.size() < this.num_expected_msgs || this.promise == null) {
                return;
            }
            this.promise.setResult(Integer.valueOf(this.times.size()));
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:jgroups-2.6.3.GA.jar:org/jgroups/tests/MessageBundlingTest$NullReceiver.class */
    public static class NullReceiver extends ReceiverAdapter {
        private NullReceiver() {
        }

        @Override // org.jgroups.ReceiverAdapter, org.jgroups.MessageListener
        public void receive(Message message) {
        }
    }

    /* loaded from: input_file:jgroups-2.6.3.GA.jar:org/jgroups/tests/MessageBundlingTest$SimpleReceiver.class */
    private static class SimpleReceiver extends ReceiverAdapter {
        long start;

        private SimpleReceiver() {
            this.start = System.currentTimeMillis();
        }

        @Override // org.jgroups.ReceiverAdapter, org.jgroups.MessageListener
        public void receive(Message message) {
            System.out.println("<<< received message from " + message.getSrc() + " at " + new Date() + ", latency=" + (System.currentTimeMillis() - this.start) + " ms");
        }
    }

    @Override // org.jgroups.tests.ChannelTestBase
    public void setUp() throws Exception {
        super.setUp();
    }

    @Override // org.jgroups.tests.ChannelTestBase
    public void tearDown() throws Exception {
        closeChannel(this.ch2);
        closeChannel(this.ch1);
        super.tearDown();
    }

    @Override // org.jgroups.tests.ChannelTestBase
    protected boolean useBlocking() {
        return false;
    }

    private void prepareChannels() throws Exception, ChannelException {
        this.ch1 = createChannel();
        setBundling(this.ch1, true, MAX_BYTES, LATENCY);
        setLoopback(this.ch1, false);
        this.ch1.setReceiver(new NullReceiver());
        this.ch1.connect("x");
        this.ch2 = createChannel();
        setBundling(this.ch2, true, MAX_BYTES, LATENCY);
        setLoopback(this.ch2, false);
        this.r2 = new MyReceiver();
        this.ch2.setReceiver(this.r2);
        this.ch2.connect("x");
        assertEquals(2, this.ch2.getView().size());
    }

    public void testLatencyWithoutMessageBundling() throws Exception {
        prepareChannels();
        Message message = new Message();
        setBundling(this.ch1, false, MAX_BYTES, 30L);
        this.r2.setNumExpectedMesssages(1);
        Promise<Integer> promise = new Promise<>();
        this.r2.setPromise(promise);
        long currentTimeMillis = System.currentTimeMillis();
        this.ch1.send(message);
        System.out.println(">>> sent message at " + new Date());
        promise.getResult(SLEEP);
        List<Long> times = this.r2.getTimes();
        assertEquals(1, times.size());
        long longValue = times.get(0).longValue() - currentTimeMillis;
        System.out.println("latency: " + longValue + " ms");
        assertTrue("latency (" + longValue + "ms) should be less than " + LATENCY + " ms", longValue <= LATENCY);
    }

    public void testLatencyWithMessageBundling() throws Exception {
        prepareChannels();
        Message message = new Message();
        this.r2.setNumExpectedMesssages(1);
        Promise<Integer> promise = new Promise<>();
        this.r2.setPromise(promise);
        long currentTimeMillis = System.currentTimeMillis();
        this.ch1.send(message);
        System.out.println(">>> sent message at " + new Date());
        promise.getResult(SLEEP);
        List<Long> times = this.r2.getTimes();
        assertEquals(1, times.size());
        long longValue = times.get(0).longValue() - currentTimeMillis;
        System.out.println("latency: " + longValue + " ms");
        assertTrue("latency (" + longValue + "ms) should be more than the bundling timeout (" + LATENCY + "ms), but less than 2 times the LATENCY (" + Global.THREADPOOL_SHUTDOWN_WAIT_TIME + ")", longValue >= LATENCY && longValue <= Global.THREADPOOL_SHUTDOWN_WAIT_TIME);
    }

    public void testLatencyWithMessageBundlingAndLoopback() throws Exception {
        prepareChannels();
        Message message = new Message();
        setLoopback(this.ch1, true);
        setLoopback(this.ch2, true);
        this.r2.setNumExpectedMesssages(1);
        Promise<Integer> promise = new Promise<>();
        this.r2.setPromise(promise);
        long currentTimeMillis = System.currentTimeMillis();
        System.out.println(">>> sending message at " + new Date());
        this.ch1.send(message);
        promise.getResult(SLEEP);
        List<Long> times = this.r2.getTimes();
        assertEquals(1, times.size());
        long longValue = times.get(0).longValue() - currentTimeMillis;
        System.out.println("latency: " + longValue + " ms");
        assertTrue("latency (" + longValue + "ms) should be more than the bundling timeout (" + LATENCY + "ms), but less than 2 times the LATENCY (" + Global.THREADPOOL_SHUTDOWN_WAIT_TIME + ")", longValue >= LATENCY && longValue <= Global.THREADPOOL_SHUTDOWN_WAIT_TIME);
    }

    public void testLatencyWithMessageBundlingAndMaxBytes() throws Exception {
        prepareChannels();
        setLoopback(this.ch1, true);
        setLoopback(this.ch2, true);
        this.r2.setNumExpectedMesssages(10);
        Promise<Integer> promise = new Promise<>();
        this.r2.setPromise(promise);
        Util.sleep(Global.THREADPOOL_SHUTDOWN_WAIT_TIME);
        System.out.println(">>> sending 10 messages at " + new Date());
        for (int i = 0; i < 10; i++) {
            this.ch1.send(new Message((Address) null, (Address) null, new byte[Types.JAVA_OBJECT]));
        }
        promise.getResult(SLEEP);
        List<Long> times = this.r2.getTimes();
        assertEquals(10, times.size());
        Iterator<Long> it = times.iterator();
        while (it.hasNext()) {
            System.out.println(it.next());
        }
    }

    public void testSimple() throws Exception {
        prepareChannels();
        Message message = new Message();
        this.ch2.setReceiver(new SimpleReceiver());
        this.ch1.send(message);
        System.out.println(">>> sent message at " + new Date());
        Util.sleep(SLEEP);
    }

    private void setLoopback(JChannel jChannel, boolean z) {
        ((TP) jChannel.getProtocolStack().getProtocols().lastElement()).setLoopback(z);
    }

    private void setBundling(JChannel jChannel, boolean z, int i, long j) {
        ProtocolStack protocolStack = jChannel.getProtocolStack();
        TP tp = (TP) protocolStack.getProtocols().lastElement();
        tp.setEnableBundling(z);
        tp.setMaxBundleSize(i);
        tp.setMaxBundleTimeout(j);
        tp.setEnable_unicast_bundling(false);
        if (z) {
            GMS gms = (GMS) protocolStack.findProtocol(GMS.name);
            gms.setViewAckCollectionTimeout(Global.THREADPOOL_SHUTDOWN_WAIT_TIME);
            gms.setJoinTimeout(Global.THREADPOOL_SHUTDOWN_WAIT_TIME);
        }
    }

    private void closeChannel(Channel channel) {
        if (channel != null) {
            if (channel.isOpen() || channel.isConnected()) {
                channel.close();
            }
        }
    }

    public static void main(String[] strArr) {
        TestRunner.main(new String[]{MessageBundlingTest.class.getName()});
    }
}
