package com.tairanchina.csp.dew.core.cluster.test;

import com.tairanchina.csp.dew.core.cluster.Cluster;
import com.tairanchina.csp.dew.core.cluster.ClusterMQ;
import java.util.ArrayList;
import java.util.concurrent.CountDownLatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/tairanchina/csp/dew/core/cluster/test/ClusterMQTest.class */
public class ClusterMQTest {
    private static final Logger logger;
    static final /* synthetic */ boolean $assertionsDisabled;

    public void test(ClusterMQ clusterMQ) throws InterruptedException {
        testPubSub(clusterMQ);
        testReqResp(clusterMQ);
        testHA(clusterMQ);
    }

    private void testPubSub(ClusterMQ clusterMQ) throws InterruptedException {
        CountDownLatch countDownLatch = new CountDownLatch(40);
        new Thread(() -> {
            clusterMQ.subscribe("test_pub_sub", str -> {
                if (!$assertionsDisabled && !str.contains("msg")) {
                    throw new AssertionError();
                }
                logger.info("subscribe instance 1: pub_sub>>" + str);
                countDownLatch.countDown();
            });
        }).start();
        new Thread(() -> {
            clusterMQ.subscribe("test_pub_sub", str -> {
                if (!$assertionsDisabled && !str.contains("msg")) {
                    throw new AssertionError();
                }
                logger.info("subscribe instance 2: pub_sub>>" + str);
                countDownLatch.countDown();
            });
        }).start();
        Thread.sleep(1000L);
        for (int i = 0; i < 10; i++) {
            clusterMQ.publish("test_pub_sub", "msgA" + i);
            clusterMQ.publish("test_pub_sub", "msgB" + i);
        }
        countDownLatch.await();
    }

    private void testReqResp(ClusterMQ clusterMQ) throws InterruptedException {
        CountDownLatch countDownLatch = new CountDownLatch(20);
        ArrayList arrayList = new ArrayList();
        new Thread(() -> {
            clusterMQ.response("test_rep_resp", str -> {
                if (arrayList.contains(str)) {
                    if (!$assertionsDisabled) {
                        throw new AssertionError();
                    }
                } else {
                    arrayList.add(str);
                    logger.info("response instance 1: req_resp>>" + str);
                    countDownLatch.countDown();
                }
            });
        }).start();
        new Thread(() -> {
            clusterMQ.response("test_rep_resp", str -> {
                if (arrayList.contains(str)) {
                    if (!$assertionsDisabled) {
                        throw new AssertionError();
                    }
                } else {
                    arrayList.add(str);
                    logger.info("response instance 2: req_resp>>" + str);
                    countDownLatch.countDown();
                }
            });
        }).start();
        Thread.sleep(1000L);
        for (int i = 0; i < 10; i++) {
            clusterMQ.request("test_rep_resp", "msgA" + i);
            clusterMQ.request("test_rep_resp", "msgB" + i);
        }
        countDownLatch.await();
    }

    private void testHA(ClusterMQ clusterMQ) throws InterruptedException {
        Cluster.ha();
        CountDownLatch countDownLatch = new CountDownLatch(1);
        Thread thread = new Thread(() -> {
            clusterMQ.subscribe("test_ha", str -> {
                logger.info("subscribe instance: pub_sub_ha>>" + str);
                countDownLatch.countDown();
                if (countDownLatch.getCount() == 0) {
                    throw new RuntimeException("Mock Some Error");
                }
            });
        });
        thread.start();
        Thread.sleep(1000L);
        clusterMQ.publish("test_ha", "ha_msgA");
        countDownLatch.await();
        thread.stop();
        CountDownLatch countDownLatch2 = new CountDownLatch(2);
        new Thread(() -> {
            clusterMQ.subscribe("test_ha", str -> {
                logger.info("subscribe new instance: pub_sub_ha>>" + str);
                countDownLatch2.countDown();
            });
        }).start();
        Thread.sleep(1000L);
        clusterMQ.publish("test_ha", "ha_msgB");
        countDownLatch2.await();
    }

    static {
        $assertionsDisabled = !ClusterMQTest.class.desiredAssertionStatus();
        logger = LoggerFactory.getLogger(ClusterMQTest.class);
    }
}
