package org.apache.rocketmq.streams.core.running;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.Properties;
import java.util.Set;
import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.streams.core.common.Constant;
import org.apache.rocketmq.streams.core.exception.RStreamsException;
import org.apache.rocketmq.streams.core.metadata.StreamConfig;
import org.apache.rocketmq.streams.core.state.RocketMQStore;
import org.apache.rocketmq.streams.core.state.RocksDBStore;
import org.apache.rocketmq.streams.core.state.StateStore;
import org.apache.rocketmq.streams.core.topology.TopologyBuilder;
import org.apache.rocketmq.streams.core.util.RocketMQUtil;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/rocketmq/streams/core/running/WorkerThread.class */
/**
 * A thread that owns a single {@link PlanetaryEngine} and drives its lifecycle:
 * {@link #run()} starts the engine, executes its processing loop, and always stops
 * the engine when the loop ends or fails. {@link #shutdown()} requests a stop from
 * another thread.
 */
public class WorkerThread extends Thread {
    private static final Logger logger = LoggerFactory.getLogger(WorkerThread.class);
    private final TopologyBuilder topologyBuilder;
    private final PlanetaryEngine<?, ?> planetaryEngine;
    private final Properties properties;

    /**
     * Bundles the RocketMQ clients and the state store used by this worker and
     * manages their start/stop order.
     *
     * <p>This is a non-static inner class on purpose: {@link #createShuffleTopic()}
     * reads the enclosing worker's {@code topologyBuilder}.
     */
    class PlanetaryEngine<K, V> {
        private final DefaultLitePullConsumer unionConsumer;
        private final DefaultMQProducer producer;
        private final DefaultMQAdminExt mqAdmin;
        private final StateStore stateStore;
        private final MessageQueueListenerWrapper wrapper;
        // Written by stop(), possibly from a thread other than the worker itself.
        private volatile boolean stop = false;

        /**
         * Stores the collaborators and registers a recover handler on the listener
         * wrapper that delegates to {@link StateStore#recover}.
         *
         * <p>The handler never throws: it returns {@code null} on success and the
         * caught {@link Throwable} (after logging it) on failure.
         */
        public PlanetaryEngine(DefaultLitePullConsumer defaultLitePullConsumer, DefaultMQProducer defaultMQProducer, StateStore stateStore, DefaultMQAdminExt defaultMQAdminExt, MessageQueueListenerWrapper messageQueueListenerWrapper) {
            this.unionConsumer = defaultLitePullConsumer;
            this.producer = defaultMQProducer;
            this.mqAdmin = defaultMQAdminExt;
            this.stateStore = stateStore;
            this.wrapper = messageQueueListenerWrapper;
            // The meaning of the two arguments is defined by StateStore.recover; they are passed through unchanged.
            this.wrapper.setRecoverHandler((firstSet, secondSet) -> {
                try {
                    this.stateStore.recover(firstSet, secondSet);
                    return null;
                } catch (Throwable th) {
                    WorkerThread.logger.error("recover error.", th);
                    return th;
                }
            });
        }

        /**
         * Creates the shuffle topics, then starts the consumer and the producer,
         * and finally initialises the state store, in that order.
         *
         * @throws Throwable if any of the steps fails; later steps are not attempted
         */
        void start() throws Throwable {
            createShuffleTopic();
            this.unionConsumer.start();
            this.producer.start();
            this.stateStore.init();
        }

        /* JADX WARN: Removed duplicated region for block: B:43:0x01fb A[LOOP:2: B:41:0x01f2->B:43:0x01fb, LOOP_END] */
        /*
            Code decompiled incorrectly, please refer to instructions dump.
            To view partially-correct add '--show-bad-code' argument
        */
        // NOTE(review): this is a decompiler placeholder, not the real processing loop.
        // As written it unconditionally throws, so run() will always take its failure
        // path. The original body must be restored from the upstream sources.
        void runInLoop() throws java.lang.Throwable {
            /*
                Method dump skipped, instructions count: 554
                To view this dump add '--comments-level debug' option
            */
            throw new UnsupportedOperationException("Method not decompiled: org.apache.rocketmq.streams.core.running.WorkerThread.PlanetaryEngine.runInLoop():void");
        }

        /**
         * Creates a static topic for every source topic of the topology whose name
         * ends with {@link Constant#SHUFFLE_TOPIC_SUFFIX}, using
         * {@link StreamConfig#SHUFFLE_TOPIC_QUEUE_NUM} as the queue count.
         *
         * @throws Throwable if topic creation fails; remaining topics are not attempted
         */
        void createShuffleTopic() throws Throwable {
            Set<String> sourceTopics = WorkerThread.this.topologyBuilder.getSourceTopic();

            // Select first, create afterwards, so the source set is fully iterated
            // before any admin call is made.
            ArrayList<String> shuffleTopics = new ArrayList<>();
            for (String topic : sourceTopics) {
                if (topic.endsWith(Constant.SHUFFLE_TOPIC_SUFFIX)) {
                    shuffleTopics.add(topic);
                }
            }

            for (String shuffleTopic : shuffleTopics) {
                RocketMQUtil.createStaticTopic(this.mqAdmin, shuffleTopic, StreamConfig.SHUFFLE_TOPIC_QUEUE_NUM.intValue());
            }
        }

        /**
         * Raises the stop flag and releases the state store, consumer, producer and
         * admin client.
         *
         * <p>Each resource is released in its own try/catch so that a failure in one
         * does not prevent the others from being shut down. Failures are logged and
         * never propagated. This method may be invoked more than once (both
         * {@link WorkerThread#shutdown()} and {@link WorkerThread#run()} call it).
         */
        public void stop() {
            this.stop = true;
            try {
                this.stateStore.close();
            } catch (Throwable th) {
                WorkerThread.logger.error("error when stop engin.", th);
            }
            try {
                this.unionConsumer.shutdown();
            } catch (Throwable th) {
                WorkerThread.logger.error("error when stop engin.", th);
            }
            try {
                this.producer.shutdown();
            } catch (Throwable th) {
                WorkerThread.logger.error("error when stop engin.", th);
            }
            try {
                this.mqAdmin.shutdown();
            } catch (Throwable th) {
                WorkerThread.logger.error("error when stop engin.", th);
            }
        }
    }

    /**
     * Builds the worker and wires up its engine.
     *
     * <p>The consumer/producer group name is the topology's job id, an underscore,
     * and {@link StreamConfig#ROCKETMQ_STREAMS_CONSUMER_GROUP}. The name server
     * address is read from the {@code rocketmq.namesrv.addr} property. The pull
     * consumer's message queue listener is replaced by a
     * {@link MessageQueueListenerWrapper} around the listener it had, and the state
     * store is a {@link RocketMQStore} backed by a new {@link RocksDBStore}.
     *
     * @param str             thread name
     * @param topologyBuilder topology supplying the job id and source topics
     * @param properties      configuration; must contain {@code rocketmq.namesrv.addr}
     * @throws MQClientException if a RocketMQ client cannot be created
     */
    public WorkerThread(String str, TopologyBuilder topologyBuilder, Properties properties) throws MQClientException {
        super(str);
        this.topologyBuilder = topologyBuilder;
        this.properties = properties;

        String groupName = topologyBuilder.getJobId() + "_" + StreamConfig.ROCKETMQ_STREAMS_CONSUMER_GROUP;
        RocketMQClient rocketMQClient = new RocketMQClient(properties.getProperty("rocketmq.namesrv.addr"));

        DefaultLitePullConsumer pullConsumer = rocketMQClient.pullConsumer(groupName, topologyBuilder.getSourceTopic());
        MessageQueueListenerWrapper listenerWrapper = new MessageQueueListenerWrapper(pullConsumer.getMessageQueueListener(), topologyBuilder);
        pullConsumer.setMessageQueueListener(listenerWrapper);

        DefaultMQProducer producer = rocketMQClient.producer(groupName);
        DefaultMQAdminExt mqAdmin = rocketMQClient.getMQAdmin();
        StateStore stateStore = new RocketMQStore(producer, new RocksDBStore(), mqAdmin, this.properties);

        this.planetaryEngine = new PlanetaryEngine<>(pullConsumer, producer, stateStore, mqAdmin, listenerWrapper);
    }

    /**
     * Starts the engine and runs its loop until it returns or fails. The engine is
     * stopped exactly once on the way out, whether the loop ended normally or not.
     *
     * @throws RStreamsException wrapping whatever {@code start()} or
     *                           {@code runInLoop()} threw
     */
    @Override // java.lang.Thread, java.lang.Runnable
    public void run() {
        try {
            this.planetaryEngine.start();
            this.planetaryEngine.runInLoop();
        } catch (Throwable th) {
            logger.error("worker thread=[{}], error.", getName(), th);
            // run() cannot declare checked exceptions, so the failure is rethrown
            // unchecked with the cause preserved.
            // NOTE(review): assumes RStreamsException is unchecked and has a
            // (Throwable) constructor — confirm against its declaration.
            throw new RStreamsException(th);
        } finally {
            logger.info("worker thread=[{}], engin stopped.", getName());
            this.planetaryEngine.stop();
        }
    }

    /**
     * Requests the engine to stop and releases its resources; intended to be called
     * from a thread other than this worker.
     */
    public void shutdown() {
        this.planetaryEngine.stop();
    }
}
