/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.ons.open.trace.core.dispatch.impl;

import com.alibaba.ons.open.trace.core.dispatch.AsyncAppender;
import com.alibaba.ons.open.trace.core.dispatch.AsyncDispatcher;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.client.log.ClientLogger;
import java.io.IOException;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import org.slf4j.Logger;

public class AsyncTraceDispatcher
extends AsyncDispatcher {
    private static final Logger clientlog = ClientLogger.getLog();
    private final Object[] entries;
    private final int queueSize;
    private final int indexMask;
    private final int notifyThreshold;
    private final int maxDelayTime = 20;
    private final ReentrantLock lock;
    private final Condition notEmpty;
    private AtomicLong putIndex;
    private AtomicLong discardCount;
    private AtomicLong takeIndex;
    private AsyncAppender appender;
    private String workerName;
    private Thread worker;
    private AtomicBoolean running;

    public AsyncTraceDispatcher(Properties properties) {
        int queueSize = Integer.parseInt(properties.getProperty("AsyncBufferSize", "2048"));
        this.queueSize = queueSize = 1 << 32 - Integer.numberOfLeadingZeros(queueSize - 1);
        this.entries = new Object[queueSize];
        this.indexMask = queueSize - 1;
        this.notifyThreshold = Integer.parseInt(properties.getProperty("WakeUpNum", "1"));
        this.putIndex = new AtomicLong(0L);
        this.discardCount = new AtomicLong(0L);
        this.takeIndex = new AtomicLong(0L);
        this.running = new AtomicBoolean(false);
        this.lock = new ReentrantLock(false);
        this.notEmpty = this.lock.newCondition();
    }

    @Override
    public void start(AsyncAppender appender, String workerName) {
        this.appender = appender;
        this.workerName = workerName;
        this.worker = new Thread((Runnable)new AsyncRunnable(), "MQ-AsyncDispatcher-Thread-" + workerName);
        this.worker.setDaemon(true);
        this.worker.start();
    }

    public int size() {
        return (int)(this.putIndex.get() - this.takeIndex.get());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public boolean append(Object ctx) {
        long size;
        long put;
        long qsize = this.queueSize;
        do {
            if ((size = (put = this.putIndex.get()) - this.takeIndex.get()) < qsize) continue;
            clientlog.info("msgtrace buffer is full,the loss count is" + this.discardCount.incrementAndGet() + "  " + ctx);
            return false;
        } while (!this.putIndex.compareAndSet(put, put + 1L));
        this.entries[(int)put & this.indexMask] = ctx;
        if (size >= (long)this.notifyThreshold && !this.running.get() && this.lock.tryLock()) {
            try {
                this.notEmpty.signal();
            }
            catch (Exception e) {
                clientlog.info("fail to signal notEmpty,maybe block!");
            }
            finally {
                this.lock.unlock();
            }
        }
        return true;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void flush() throws IOException {
        long end = System.currentTimeMillis() + 500L;
        while (this.size() > 0 && System.currentTimeMillis() <= end) {
            if (this.running.get()) {
                try {
                    Thread.sleep(1L);
                    continue;
                }
                catch (InterruptedException e) {
                    break;
                }
            }
            if (!this.lock.tryLock()) continue;
            try {
                this.notEmpty.signal();
            }
            catch (Exception e) {
                clientlog.info("fail to signal notEmpty,maybe block!");
            }
            finally {
                this.lock.unlock();
            }
        }
    }

    class AsyncRunnable
    implements Runnable {
        AsyncRunnable() {
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         * Enabled aggressive block sorting
         * Enabled unnecessary exception pruning
         * Enabled aggressive exception aggregation
         */
        @Override
        public void run() {
            AsyncTraceDispatcher parent = AsyncTraceDispatcher.this;
            int indexMask = parent.indexMask;
            int queueSize = parent.queueSize;
            String workerName = parent.workerName;
            Object[] entries = parent.entries;
            AtomicLong putIndex = parent.putIndex;
            AtomicLong takeIndex = parent.takeIndex;
            AtomicLong discardCount = parent.discardCount;
            AtomicBoolean running = parent.running;
            ReentrantLock lock = parent.lock;
            Condition notEmpty = parent.notEmpty;
            long outputSpan = TimeUnit.MINUTES.toMillis(1L);
            long lastOutputTime = System.currentTimeMillis();
            while (true) {
                try {
                    while (true) {
                        long now;
                        running.set(true);
                        long take = takeIndex.get();
                        long size = putIndex.get() - take;
                        if (size <= 0L) {
                            if (!lock.tryLock()) continue;
                            try {
                                running.set(false);
                                notEmpty.await(20L, TimeUnit.MILLISECONDS);
                            }
                            finally {
                                lock.unlock();
                            }
                            continue;
                        }
                        do {
                            int idx = (int)take & indexMask;
                            Object ctx = entries[idx];
                            while (ctx == null) {
                                Thread.yield();
                                ctx = entries[idx];
                            }
                            entries[idx] = null;
                            takeIndex.set(++take);
                            parent.appender.append(ctx);
                        } while (--size > 0L);
                        parent.appender.flush();
                        long discardNum = discardCount.get();
                        if (discardNum <= 0L || (now = System.currentTimeMillis()) - lastOutputTime <= outputSpan) continue;
                        discardNum = discardCount.get();
                        discardCount.lazySet(0L);
                        lastOutputTime = now;
                    }
                }
                catch (InterruptedException e) {
                    clientlog.info("[WARN] " + workerName + " async thread is iterrupted");
                }
                catch (Exception e) {
                    clientlog.info("[ERROR] Fail to async write log");
                    continue;
                }
                break;
            }
            running.set(false);
        }
    }
}

