/*
 * Decompiled with CFR 0.152.
 */
package com.aliyun.hitsdb.client.consumer;

import com.aliyun.hitsdb.client.Config;
import com.aliyun.hitsdb.client.consumer.BatchPutRunnable;
import com.aliyun.hitsdb.client.consumer.BatchPutThreadFactory;
import com.aliyun.hitsdb.client.consumer.Consumer;
import com.aliyun.hitsdb.client.consumer.MultiFieldBatchPutRunnable;
import com.aliyun.hitsdb.client.consumer.PointsCollectionPutRunnable;
import com.aliyun.hitsdb.client.http.HttpClient;
import com.aliyun.hitsdb.client.queue.DataQueue;
import com.aliyun.hitsdb.client.util.guava.RateLimiter;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DefaultBatchPutConsumer
implements Consumer {
    private static final Logger LOGGER = LoggerFactory.getLogger(DefaultBatchPutConsumer.class);
    private DataQueue dataQueue;
    private ExecutorService threadPool;
    private ExecutorService multiFieldThreadPool;
    private ExecutorService pointsThreadPool;
    private int batchPutConsumerThreadCount;
    private int multiFieldBatchPutConsumerThreadCount;
    private int pointsBatchPutConsumerThreadCount;
    private HttpClient httpclient;
    private Config config;
    private RateLimiter rateLimiter;
    private CountDownLatch countDownLatch;

    public DefaultBatchPutConsumer(DataQueue buffer, HttpClient httpclient, RateLimiter rateLimiter, Config config) {
        this.dataQueue = buffer;
        this.httpclient = httpclient;
        this.config = config;
        this.batchPutConsumerThreadCount = config.getBatchPutConsumerThreadCount();
        this.multiFieldBatchPutConsumerThreadCount = config.getMultiFieldBatchPutConsumerThreadCount();
        this.pointsBatchPutConsumerThreadCount = Math.max(this.batchPutConsumerThreadCount, this.multiFieldBatchPutConsumerThreadCount);
        this.rateLimiter = rateLimiter;
        if (this.batchPutConsumerThreadCount > 0) {
            this.threadPool = Executors.newFixedThreadPool(this.batchPutConsumerThreadCount, new BatchPutThreadFactory("batch-put-thread"));
        }
        if (this.multiFieldBatchPutConsumerThreadCount > 0) {
            this.multiFieldThreadPool = Executors.newFixedThreadPool(this.multiFieldBatchPutConsumerThreadCount, new BatchPutThreadFactory("multi-field-batch-put-thread"));
        }
        if (this.pointsBatchPutConsumerThreadCount > 0) {
            this.pointsThreadPool = Executors.newFixedThreadPool(this.pointsBatchPutConsumerThreadCount, new BatchPutThreadFactory("points-batch-put-thread"));
        }
        this.countDownLatch = new CountDownLatch(config.getBatchPutConsumerThreadCount() + config.getMultiFieldBatchPutConsumerThreadCount() + this.pointsBatchPutConsumerThreadCount);
    }

    @Override
    public void start() {
        int i;
        for (i = 0; i < this.batchPutConsumerThreadCount; ++i) {
            this.threadPool.submit(new BatchPutRunnable(this.dataQueue, this.httpclient, this.config, this.countDownLatch, this.rateLimiter));
        }
        for (i = 0; i < this.multiFieldBatchPutConsumerThreadCount; ++i) {
            this.multiFieldThreadPool.submit(new MultiFieldBatchPutRunnable(this.dataQueue, this.httpclient, this.config, this.countDownLatch, this.rateLimiter));
        }
        for (i = 0; i < this.pointsBatchPutConsumerThreadCount; ++i) {
            this.pointsThreadPool.submit(new PointsCollectionPutRunnable(this.dataQueue, this.httpclient, this.countDownLatch, this.config, this.rateLimiter));
        }
    }

    @Override
    public void stop() {
        this.stop(false);
    }

    @Override
    public void stop(boolean force) {
        if (force) {
            if (this.threadPool != null) {
                this.threadPool.shutdownNow();
            }
            if (this.multiFieldThreadPool != null) {
                this.multiFieldThreadPool.shutdownNow();
            }
            if (this.pointsThreadPool != null) {
                this.pointsThreadPool.shutdownNow();
            }
        } else {
            if (this.threadPool != null) {
                while (!this.threadPool.isShutdown() || !this.threadPool.isTerminated()) {
                    this.threadPool.shutdownNow();
                }
            }
            if (this.multiFieldThreadPool != null) {
                while (!this.multiFieldThreadPool.isShutdown() || !this.multiFieldThreadPool.isTerminated()) {
                    this.multiFieldThreadPool.shutdownNow();
                }
            }
            if (this.pointsThreadPool != null) {
                while (!this.pointsThreadPool.isShutdown() || !this.pointsThreadPool.isTerminated()) {
                    this.pointsThreadPool.shutdownNow();
                }
            }
            try {
                this.countDownLatch.await();
            }
            catch (InterruptedException e) {
                LOGGER.error("An error occurred waiting for the consumer thread to close", (Throwable)e);
            }
        }
        if (this.dataQueue != null) {
            this.dataQueue = null;
        }
    }
}

