/*
 * Decompiled with CFR 0.152.
 */
package com.aliyun.dts.subscribe.clients;

import com.aliyun.dts.subscribe.clients.ConsumerContext;
import com.aliyun.dts.subscribe.clients.DTSConsumer;
import com.aliyun.dts.subscribe.clients.check.CheckResult;
import com.aliyun.dts.subscribe.clients.check.DefaultCheckManager;
import com.aliyun.dts.subscribe.clients.check.SubscribeAuthChecker;
import com.aliyun.dts.subscribe.clients.check.SubscribeNetworkChecker;
import com.aliyun.dts.subscribe.clients.common.Checkpoint;
import com.aliyun.dts.subscribe.clients.common.RecordListener;
import com.aliyun.dts.subscribe.clients.common.Util;
import com.aliyun.dts.subscribe.clients.common.WorkThread;
import com.aliyun.dts.subscribe.clients.record.DefaultUserRecord;
import com.aliyun.dts.subscribe.clients.recordfetcher.KafkaRecordFetcher;
import com.aliyun.dts.subscribe.clients.recordgenerator.UserRecordGenerator;
import com.aliyun.dts.subscribe.clients.recordprocessor.EtlRecordProcessor;
import java.io.InputStream;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.log4j.PropertyConfigurator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Default {@link DTSConsumer} implementation.
 *
 * <p>{@link #start()} wires three workers together through two bounded queues:
 * a {@link KafkaRecordFetcher} is handed {@code toProcessRecords}, a
 * {@link UserRecordGenerator} is handed both {@code toProcessRecords} and
 * {@code defaultUserRecords}, and an {@link EtlRecordProcessor} is handed
 * {@code defaultUserRecords} together with the registered record listeners.
 * Each worker runs inside its own {@link WorkThread}.
 */
public class DefaultDTSConsumer
implements DTSConsumer {
    private static final Logger log = LoggerFactory.getLogger(DefaultDTSConsumer.class);

    /** Capacity of each of the two hand-off queues shared by the workers. */
    private static final int QUEUE_CAPACITY = 512;

    /** Argument passed to {@code Util.sleepMS} between two polls of the exit flag. */
    private static final long EXIT_POLL_INTERVAL_MS = 1000L;

    /** Classpath resource used to configure log4j. */
    private static final String LOG4J_RESOURCE = "log4j.properties";

    private final ConsumerContext consumerContext;
    private Map<String, RecordListener> recordListeners;
    private final LinkedBlockingQueue<ConsumerRecord> toProcessRecords;
    private final LinkedBlockingQueue<DefaultUserRecord> defaultUserRecords;
    private volatile boolean started = false;

    /**
     * Creates a consumer bound to the given context and allocates both bounded queues.
     *
     * @param consumerContext context handed to the checkers and to every worker
     */
    public DefaultDTSConsumer(ConsumerContext consumerContext) {
        this.consumerContext = consumerContext;
        this.toProcessRecords = new LinkedBlockingQueue<>(QUEUE_CAPACITY);
        this.defaultUserRecords = new LinkedBlockingQueue<>(QUEUE_CAPACITY);
    }

    /**
     * Runs the pre-flight {@link #check()}, starts the worker threads and then blocks the
     * calling thread until {@code consumerContext.isExited()} returns {@code true}, at which
     * point every worker is stopped.
     *
     * <p>The monitor of this instance is held for the whole run, so a concurrent second call
     * waits for the first one to finish and then fails with {@link IllegalStateException}.
     *
     * @throws IllegalStateException if this consumer has already been started
     */
    @Override
    public void start() {
        this.check();
        synchronized (this) {
            DefaultDTSConsumer.initLog4j();
            if (this.started) {
                throw new IllegalStateException("The client has already been started");
            }
            KafkaRecordFetcher recordFetcher = new KafkaRecordFetcher(this.consumerContext, this.toProcessRecords);
            // The generator reports checkpoints back to the fetcher through this callback.
            UserRecordGenerator userRecordGenerator = new UserRecordGenerator(this.consumerContext, this.toProcessRecords, this.defaultUserRecords, (tp, timestamp, offset, metadata) -> recordFetcher.setToCommitCheckpoint(new Checkpoint(tp, timestamp, offset, metadata)));
            EtlRecordProcessor etlRecordProcessor = new EtlRecordProcessor(this.consumerContext, this.defaultUserRecords, this.recordListeners);

            // Mark as started before any thread is launched so the flag reflects reality
            // for the entire lifetime of the workers, not only after they have been stopped.
            this.started = true;
            List<WorkThread> workers = DefaultDTSConsumer.startWorker(etlRecordProcessor, userRecordGenerator, recordFetcher);
            try {
                while (!this.consumerContext.isExited()) {
                    Util.sleepMS(EXIT_POLL_INTERVAL_MS);
                }
            } finally {
                // Stop the workers on every exit path; otherwise an exception raised while
                // waiting would leave the worker threads running.
                log.info("DTS Consumer: shutting down...");
                for (WorkThread worker : workers) {
                    worker.stop();
                }
            }
        }
    }

    /**
     * Wraps each worker in a {@link WorkThread} named after the worker's class and starts
     * them in the order processor, generator, fetcher.
     *
     * @return the started threads, in start order
     */
    private static List<WorkThread> startWorker(EtlRecordProcessor etlRecordProcessor, UserRecordGenerator userRecordGenerator, KafkaRecordFetcher recordGenerator) {
        List<WorkThread> workers = new LinkedList<WorkThread>();
        workers.add(new WorkThread<EtlRecordProcessor>(etlRecordProcessor, EtlRecordProcessor.class.getName()));
        workers.add(new WorkThread<UserRecordGenerator>(userRecordGenerator, UserRecordGenerator.class.getName()));
        workers.add(new WorkThread<KafkaRecordFetcher>(recordGenerator, KafkaRecordFetcher.class.getName()));
        for (WorkThread worker : workers) {
            worker.start();
        }
        return workers;
    }

    /**
     * Registers the listeners that are handed to the {@link EtlRecordProcessor} on
     * {@link #start()}. The map is stored by reference and replaces any earlier registration.
     *
     * @param recordListeners non-null, non-empty map of listeners; validated through
     *                        {@code Util.require}
     */
    @Override
    public void addRecordListeners(Map<String, RecordListener> recordListeners) {
        Util.require(null != recordListeners && !recordListeners.isEmpty(), "record listener required");
        recordListeners.keySet().forEach(name -> log.info("register record listener {}", name));
        this.recordListeners = recordListeners;
    }

    /**
     * Runs the network and authentication checkers against the consumer context and logs
     * the combined result.
     *
     * <p>NOTE(review): a failed check terminates the JVM via {@code System.exit(1)}. That is
     * harsh for library code, but it is kept because existing callers may rely on it.
     */
    @Override
    public void check() {
        DefaultCheckManager checkerManager = new DefaultCheckManager(this.consumerContext);
        checkerManager.addCheckItem(new SubscribeNetworkChecker(this.consumerContext.getBrokerUrl()));
        checkerManager.addCheckItem(new SubscribeAuthChecker(this.consumerContext));
        CheckResult checkResult = checkerManager.check();
        if (checkResult.isOk()) {
            log.info(checkResult.toString());
            return;
        }
        log.error(checkResult.toString());
        System.exit(1);
    }

    /**
     * Best-effort log4j configuration from the {@code log4j.properties} resource looked up
     * through the thread context class loader. A missing resource or a configuration failure
     * never aborts start-up; it is only reported through the logger.
     */
    private static void initLog4j() {
        try (InputStream log4jInput = Thread.currentThread().getContextClassLoader().getResourceAsStream(LOG4J_RESOURCE)) {
            if (log4jInput == null) {
                // Nothing to load: skip instead of passing null to the configurator.
                log.debug("{} not found on the classpath, skipping log4j configuration", LOG4J_RESOURCE);
                return;
            }
            PropertyConfigurator.configure(log4jInput);
        } catch (Exception e) {
            // Deliberately non-fatal, but no longer silent.
            log.warn("Failed to configure log4j from {}", LOG4J_RESOURCE, e);
        }
    }
}

