/*
 * Decompiled with CFR 0.152.
 */
package org.apache.shenyu.plugin.logging.rocketmq;

import java.util.ArrayList;
import java.util.Objects;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.shenyu.common.utils.ThreadUtils;
import org.apache.shenyu.plugin.logging.rocketmq.LogCollector;
import org.apache.shenyu.plugin.logging.rocketmq.LogConsumeClient;
import org.apache.shenyu.plugin.logging.rocketmq.entity.ShenyuRequestLog;
import org.apache.shenyu.plugin.logging.rocketmq.utils.LogCollectConfigUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class AbstractLogCollector
implements LogCollector {
    private static final Logger LOG = LoggerFactory.getLogger(AbstractLogCollector.class);
    private int bufferSize;
    private BlockingQueue<ShenyuRequestLog> bufferQueue;
    private long lastPushTime;
    private final AtomicBoolean started = new AtomicBoolean(true);

    @Override
    public void start() {
        this.bufferSize = LogCollectConfigUtils.getGlobalLogConfig().getBufferQueueSize();
        this.bufferQueue = new LinkedBlockingDeque<ShenyuRequestLog>(this.bufferSize);
        ExecutorService threadExecutor = Executors.newSingleThreadExecutor();
        this.started.set(true);
        threadExecutor.execute(this::consume);
    }

    @Override
    public void collect(ShenyuRequestLog log) {
        if (Objects.isNull(log) || Objects.isNull(this.getLogConsumeClient())) {
            return;
        }
        if (this.bufferQueue.size() < this.bufferSize) {
            this.bufferQueue.add(log);
        }
    }

    private void consume() {
        while (this.started.get()) {
            int diffTimeMSForPush = 100;
            try {
                ArrayList<ShenyuRequestLog> logs = new ArrayList<ShenyuRequestLog>();
                int size = this.bufferQueue.size();
                long time = System.currentTimeMillis();
                long timeDiffMs = time - this.lastPushTime;
                int batchSize = 100;
                if (size >= batchSize || timeDiffMs > (long)diffTimeMSForPush) {
                    this.bufferQueue.drainTo(logs, batchSize);
                    LogConsumeClient logCollectClient = this.getLogConsumeClient();
                    if (logCollectClient != null) {
                        logCollectClient.consume(logs);
                    }
                    this.lastPushTime = time;
                    continue;
                }
                ThreadUtils.sleep((TimeUnit)TimeUnit.MILLISECONDS, (int)diffTimeMSForPush);
            }
            catch (Exception e) {
                LOG.error("DefaultLogCollector collect log error", (Throwable)e);
                ThreadUtils.sleep((TimeUnit)TimeUnit.MILLISECONDS, (int)diffTimeMSForPush);
            }
        }
    }

    protected abstract LogConsumeClient getLogConsumeClient();

    @Override
    public void close() throws Exception {
        this.started.set(false);
        LogConsumeClient logCollectClient = this.getLogConsumeClient();
        if (logCollectClient != null) {
            logCollectClient.close();
        }
    }
}

