package com.aliyun.dms.subscribe.clients;

import com.aliyun.dts.subscribe.clients.ConsumerContext;
import com.aliyun.dts.subscribe.clients.DTSConsumer;
import com.aliyun.dts.subscribe.clients.common.Checkpoint;
import com.aliyun.dts.subscribe.clients.common.RecordListener;
import com.aliyun.dts.subscribe.clients.metastore.MetaStore;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/aliyun/dms/subscribe/clients/DefaultDistributedDTSConsumer.class */
public class DefaultDistributedDTSConsumer implements DistributedDTSConsumer {
    private static final Logger LOG = LoggerFactory.getLogger(DefaultDistributedDTSConsumer.class);
    private ThreadPoolExecutor executor;
    private List<DTSConsumer> dtsConsumers = new ArrayList();
    private int corePoolSize = 8;
    private int maximumPoolSize = 8;
    private volatile boolean isClosePoolExecutor = false;

    public void addDTSConsumer(DTSConsumer dTSConsumer) {
        this.dtsConsumers.add(dTSConsumer);
    }

    public void init(Map<String, String> map, DBMapper dBMapper, String str, Map<String, String> map2, String str2, String str3, ConsumerContext.ConsumerSubscribeMode consumerSubscribeMode, boolean z, MetaStore<Checkpoint> metaStore, Map<String, RecordListener> map3) {
        init(map, dBMapper, str, map2, str2, str3, consumerSubscribeMode, z, metaStore, map3, new Properties());
    }

    public void init(Map<String, String> map, DBMapper dBMapper, String str, Map<String, String> map2, String str2, String str3, ConsumerContext.ConsumerSubscribeMode consumerSubscribeMode, boolean z, MetaStore<Checkpoint> metaStore, Map<String, RecordListener> map3, Properties properties) {
        this.executor = new ThreadPoolExecutor(this.corePoolSize, this.maximumPoolSize, 60000L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue());
        for (Map.Entry<String, String> entry : map.entrySet()) {
            ConsumerContext consumerContext = new ConsumerContext(dBMapper, str, entry.getKey(), map2.get(entry.getKey()), str2, str3, entry.getValue(), consumerSubscribeMode, properties);
            consumerContext.setUserRegisteredStore(metaStore);
            consumerContext.setForceUseCheckpoint(z);
            DTSConsumerWithDBMapping dTSConsumerWithDBMapping = new DTSConsumerWithDBMapping(consumerContext);
            dTSConsumerWithDBMapping.addRecordListeners(map3);
            addDTSConsumer(dTSConsumerWithDBMapping);
        }
    }

    @Override // com.aliyun.dms.subscribe.clients.DistributedDTSConsumer
    public void start() {
        for (DTSConsumer dTSConsumer : this.dtsConsumers) {
            try {
                ThreadPoolExecutor threadPoolExecutor = this.executor;
                dTSConsumer.getClass();
                threadPoolExecutor.submit(dTSConsumer::start);
            } catch (Exception e) {
                LOG.error("error starting consumer:" + e);
                shutdownGracefully(10L, TimeUnit.SECONDS);
            }
        }
    }

    public void shutdownGracefully(long j, TimeUnit timeUnit) {
        this.executor.shutdown();
        try {
            try {
                if (!this.executor.awaitTermination(j, timeUnit)) {
                    this.executor.shutdownNow();
                }
            } catch (InterruptedException e) {
                this.executor.shutdownNow();
                Thread.currentThread().interrupt();
                this.isClosePoolExecutor = true;
            }
        } finally {
            this.isClosePoolExecutor = true;
        }
    }

    @Override // com.aliyun.dms.subscribe.clients.DistributedDTSConsumer
    public void addRecordListeners(Map<String, RecordListener> map) {
        Iterator<DTSConsumer> it = this.dtsConsumers.iterator();
        while (it.hasNext()) {
            it.next().addRecordListeners(map);
        }
    }
}
