/*
 * Decompiled with CFR 0.152.
 */
package com.aizuda.snailjob.client.common.window;

import com.aizuda.snailjob.common.core.window.Listener;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReferenceArray;
import java.util.function.Supplier;
import lombok.Generated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Batches items into time-bounded windows kept in a fixed-size ring and hands
 * every batch to the registered {@link Listener}s.
 *
 * <p>A window's content is emitted when any of the following happens:
 * <ul>
 *   <li>the timer scheduled at window creation fires (never sooner than 100 ms);</li>
 *   <li>an item is added to an already existing window and the queue size has
 *       reached {@code totalThreshold};</li>
 *   <li>an item is added to a window whose timer has already fired;</li>
 *   <li>{@link #shutdown()} is called, or the JVM shutdown hook registered by the
 *       constructor runs.</li>
 * </ul>
 *
 * <p>{@link #add(Object)} coordinates through atomics and compare-and-set on the
 * ring slots rather than locks. Note that {@link #shutdown()} only flushes data;
 * it does not stop the scheduler.
 *
 * @param <T> type of the items collected by the window
 */
public class SlidingRingWindow<T> {
    @Generated
    private static final Logger log = LoggerFactory.getLogger(SlidingRingWindow.class);
    /** Ring of window slots; a slot is replaced via compare-and-set. */
    private final AtomicReferenceArray<Window<T>> ringArray;
    /** Life span of a single window. */
    private final Duration duration;
    /** Queue size at which an add to an existing window triggers an emit. */
    private final Integer totalThreshold;
    /** Receivers of every emitted batch. */
    private final List<Listener<T>> listener;
    /** Sequence of the current window; its ring slot is {@code sequence % ringSize}. */
    private final AtomicLong sequencer = new AtomicLong();
    /** Runs the delayed emit of every window. */
    private final ScheduledExecutorService scheduler;
    private static final int DEFAULT_RING_SIZE = 4;
    private static final int DEFAULT_SCHEDULER_THREADS = 3;

    /**
     * Creates a window with the default ring size (4) and a private
     * three-thread scheduler.
     *
     * @param duration       life span of each window
     * @param totalThreshold queue size that triggers an early emit
     * @param listener       receivers of the emitted batches
     */
    public SlidingRingWindow(Duration duration, Integer totalThreshold, List<Listener<T>> listener) {
        this(DEFAULT_RING_SIZE, duration, totalThreshold, listener,
                new ScheduledThreadPoolExecutor(DEFAULT_SCHEDULER_THREADS));
    }

    /**
     * Creates a window with the given ring size and a private three-thread scheduler.
     *
     * @param ringSize       number of slots in the ring
     * @param duration       life span of each window
     * @param totalThreshold queue size that triggers an early emit
     * @param listener       receivers of the emitted batches
     */
    public SlidingRingWindow(int ringSize, Duration duration, Integer totalThreshold, List<Listener<T>> listener) {
        this(ringSize, duration, totalThreshold, listener,
                new ScheduledThreadPoolExecutor(DEFAULT_SCHEDULER_THREADS));
    }

    /**
     * Creates a window and registers a JVM shutdown hook that flushes every slot.
     *
     * @param ringSize       number of slots in the ring
     * @param duration       life span of each window
     * @param totalThreshold queue size that triggers an early emit
     * @param listener       receivers of the emitted batches
     * @param scheduler      executor used for the delayed emit of each window
     */
    public SlidingRingWindow(int ringSize, Duration duration, Integer totalThreshold, List<Listener<T>> listener, ScheduledExecutorService scheduler) {
        this.duration = duration;
        this.totalThreshold = totalThreshold;
        this.ringArray = new AtomicReferenceArray<>(ringSize);
        this.listener = listener;
        this.scheduler = scheduler;
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            log.info("JVM is about to exit, emitting data in the Window");
            this.emitAll();
        }));
    }

    /**
     * Emits whatever is currently buffered in every ring slot.
     * The scheduler is left running.
     */
    public void shutdown() {
        log.info("Sliding window is about to exit, emitting data in the Window");
        this.emitAll();
    }

    /**
     * Adds one item to the current window, creating or rolling the window when
     * there is none yet or the current one has expired.
     *
     * @param data the item to buffer
     */
    public void add(T data) {
        long now = System.currentTimeMillis();
        Window<T> currentWindow;
        while (true) {
            long currentSequence = this.sequencer.get();
            currentWindow = this.getWindow(currentSequence);

            if (currentWindow == null) {
                // Slot is still empty: try to install the first window for this sequence.
                Window<T> created = this.createNewWindow(currentSequence, null, now);
                if (created != null) {
                    created.getQueue().add(data);
                    this.scheduleWindowEmit(created, now);
                    return;
                }
                // Lost the race against another thread; re-read the slot.
                continue;
            }

            if (!currentWindow.isOutWindow(now)) {
                // Current window is still open: fall through to the plain append below.
                break;
            }

            // Current window expired: try to install a fresh one in the next slot.
            long nextSequence = currentSequence + 1L;
            Window<T> nextWindow = this.getWindow(nextSequence);
            if (nextWindow == null || nextWindow.isOutWindow(now)) {
                Window<T> created = this.createNewWindow(nextSequence, nextWindow, now);
                if (created != null) {
                    // Only the thread that won the CAS advances the sequence.
                    this.sequencer.incrementAndGet();
                    created.getQueue().add(data);
                    this.scheduleWindowEmit(created, now);
                    return;
                }
            }
            // Another thread is rolling the window; retry until the sequencer advances.
        }

        ConcurrentLinkedQueue<T> queue = currentWindow.getQueue();
        queue.add(data);
        // NOTE(review): ConcurrentLinkedQueue.size() traverses the queue, so this check
        // is O(n) per add; a separate counter would be needed to make it constant-time.
        if (queue.size() >= this.totalThreshold || currentWindow.getEmitStatus()) {
            this.emit(currentWindow);
        }
    }

    /**
     * Schedules the timed emit of {@code window}: after the delay the window is
     * flagged as emitted and drained to the listeners.
     */
    private void scheduleWindowEmit(Window<T> window, long now) {
        long delayMillis = Math.max(window.windowTime - now, 100L);
        this.scheduler.schedule(() -> {
            window.setEmitted();
            this.emit(window);
        }, delayMillis, TimeUnit.MILLISECONDS);
    }

    /** Emits the buffered content of every ring slot. */
    private void emitAll() {
        for (int i = 0; i < this.ringArray.length(); ++i) {
            this.emit(this.getWindow(i));
        }
    }

    /**
     * Drains {@code window} and passes the batch to the listeners. Does nothing for
     * a {@code null} window or an empty batch; listener failures are logged, not rethrown.
     */
    private void emit(Window<T> window) {
        if (Objects.isNull(window)) {
            return;
        }
        List<T> drainList = window.drain();
        if (drainList.isEmpty()) {
            return;
        }
        try {
            this.listener.forEach(consumer -> consumer.handler(drainList));
        }
        catch (Throwable e) {
            log.error("sliding window emit is error", e);
        }
    }

    /**
     * Tries to replace {@code oldWindow} in the slot of {@code sequence} with a new
     * window expiring at {@code now + duration}.
     *
     * @return the installed window, or {@code null} when the compare-and-set failed
     */
    private Window<T> createNewWindow(long sequence, Window<T> oldWindow, long now) {
        int index = this.getIndex(sequence);
        Window<T> newWindow = new Window<T>(now + this.duration.toMillis(), ConcurrentLinkedQueue::new);
        if (this.ringArray.compareAndSet(index, oldWindow, newWindow)) {
            return newWindow;
        }
        return null;
    }

    /** Returns the window stored in the slot of {@code sequence}, possibly {@code null}. */
    private Window<T> getWindow(long sequence) {
        return this.ringArray.get(this.getIndex(sequence));
    }

    /** Maps a sequence number onto a ring slot index. */
    private int getIndex(long sequence) {
        return (int)(sequence % (long)this.ringArray.length());
    }

    /**
     * One slot of the ring: a lazily created queue plus its expiry time and the
     * flag set once the timed emit has fired.
     */
    private static final class Window<T> {
        /** Epoch millis after which the window counts as expired. */
        private final long windowTime;
        private final AtomicBoolean emitStatus = new AtomicBoolean(false);
        private final Supplier<ConcurrentLinkedQueue<T>> dataQueueSupplier;
        private volatile ConcurrentLinkedQueue<T> dataQueue;
        /** Work-in-progress counter that lets only one caller drain at a time. */
        private final AtomicInteger wip = new AtomicInteger();

        private Window(long windowTime, Supplier<ConcurrentLinkedQueue<T>> dataQueueSupplier) {
            this.windowTime = windowTime;
            this.dataQueueSupplier = dataQueueSupplier;
        }

        /** Marks the window as having had its timed emit. */
        public void setEmitted() {
            this.emitStatus.set(true);
        }

        /** @return {@code true} once {@link #setEmitted()} has been called */
        public boolean getEmitStatus() {
            return this.emitStatus.get();
        }

        /** @return {@code true} when {@code now} is past the window's expiry time */
        public boolean isOutWindow(long now) {
            return this.windowTime < now;
        }

        /**
         * Returns the window's queue, creating it from the supplier on first use.
         */
        public ConcurrentLinkedQueue<T> getQueue() {
            ConcurrentLinkedQueue<T> queue = this.dataQueue;
            if (queue == null) {
                synchronized (this) {
                    queue = this.dataQueue;
                    if (queue == null) {
                        queue = this.dataQueueSupplier.get();
                        this.dataQueue = queue;
                    }
                }
            }
            return queue;
        }

        /**
         * Removes and returns everything currently queued. When another caller is
         * already draining, this call returns an empty list and the active drainer
         * makes one more pass on its behalf.
         *
         * @return the drained items, or an empty list
         */
        public List<T> drain() {
            // Go through getQueue(): the window may be drained (shutdown, JVM hook)
            // before the first add has created the queue, and the raw field is null then.
            ConcurrentLinkedQueue<T> queue = this.getQueue();
            if (this.wip.getAndIncrement() != 0) {
                return List.of();
            }
            List<T> list = new ArrayList<>();
            int missed = 1;
            while (true) {
                T element;
                while ((element = queue.poll()) != null) {
                    list.add(element);
                }
                // Subtract the requests served so far; a non-zero remainder means other
                // callers asked for a drain meanwhile, so take another pass.
                missed = this.wip.addAndGet(-missed);
                if (missed == 0) {
                    return list;
                }
            }
        }
    }
}

