/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.io;

import java.io.IOException;
import java.util.ArrayDeque;
import javax.annotation.Nullable;
import org.apache.flink.annotation.Internal;
import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
import org.apache.flink.streaming.runtime.io.AbstractBufferStorage;
import org.apache.flink.streaming.runtime.io.BufferOrEventSequence;

/**
 * Buffer storage that keeps the added {@link BufferOrEvent} elements in an in-memory
 * {@link ArrayDeque} until they are rolled over into a {@link BufferOrEventSequence}.
 *
 * <p>Every added element is accounted for with a fixed {@code pageSize} number of bytes,
 * regardless of whether it carries a buffer or an event.
 */
@Internal
public class CachedBufferStorage extends AbstractBufferStorage {
    /** Number of bytes each added element is accounted for. */
    private final int pageSize;

    /** Accounted bytes of the elements currently held in {@link #currentBuffers}. */
    private long bytesBlocked;

    /** Elements added since the last roll-over; replaced by a fresh deque on each roll-over. */
    private ArrayDeque<BufferOrEvent> currentBuffers;

    /**
     * Creates a storage with the given page size, passing {@code -1} as the maximum number of
     * buffered bytes and {@code "Unknown"} as the task name to the superclass.
     * NOTE(review): {@code -1} presumably means "no limit" -- confirm against
     * {@code AbstractBufferStorage}.
     *
     * @param pageSize number of bytes each added element is accounted for
     */
    public CachedBufferStorage(int pageSize) {
        this(pageSize, -1L, "Unknown");
    }

    /**
     * Creates a storage with the given page size.
     *
     * @param pageSize number of bytes each added element is accounted for
     * @param maxBufferedBytes forwarded unchanged to the superclass constructor
     * @param taskName forwarded unchanged to the superclass constructor
     */
    public CachedBufferStorage(int pageSize, long maxBufferedBytes, String taskName) {
        super(maxBufferedBytes, taskName);
        this.pageSize = pageSize;
        this.currentBuffers = new ArrayDeque<>();
    }

    /**
     * Appends the element to the current deque and increases the pending byte count by one page.
     *
     * @param boe the buffer or event to store
     */
    @Override
    public void add(BufferOrEvent boe) {
        this.bytesBlocked += (long) this.pageSize;
        this.currentBuffers.add(boe);
    }

    /**
     * Identical to {@link #rollOverWithoutReusingResources()}.
     *
     * @return the rolled-over sequence, or {@code null} if no bytes are pending
     */
    @Override
    public BufferOrEventSequence rollOverReusingResources() {
        return this.rollOverWithoutReusingResources();
    }

    /**
     * Hands the currently held elements over to a new sequence and starts collecting into a
     * fresh, empty deque with a pending byte count of zero.
     *
     * @return a sequence over the elements added since the last roll-over, or {@code null} if
     *     the pending byte count is zero
     */
    @Override
    public BufferOrEventSequence rollOverWithoutReusingResources() {
        if (this.bytesBlocked == 0L) {
            return null;
        }
        CachedBufferOrEventSequence currentSequence =
                new CachedBufferOrEventSequence(this.currentBuffers, this.bytesBlocked);
        // The sequence now owns the old deque; continue with a new one.
        this.currentBuffers = new ArrayDeque<>();
        this.bytesBlocked = 0L;
        return currentSequence;
    }

    /**
     * Recycles every buffer that has not been rolled over yet, resets the pending byte count and
     * then closes the superclass.
     *
     * @throws IOException if thrown by the superclass {@code close()}
     */
    @Override
    public void close() throws IOException {
        try {
            recycleAll(this.currentBuffers);
            // The deque is empty now, so the accounting must not keep reporting the old bytes.
            this.bytesBlocked = 0L;
        } finally {
            // Always close the superclass, even if recycling one of the buffers failed.
            super.close();
        }
    }

    /**
     * @return the accounted bytes of the elements added since the last roll-over
     */
    @Override
    public long getPendingBytes() {
        return this.bytesBlocked;
    }

    /** Drains the deque, recycling the buffer of every element that carries one. */
    private static void recycleAll(ArrayDeque<BufferOrEvent> buffers) {
        BufferOrEvent boe;
        while ((boe = buffers.poll()) != null) {
            if (boe.isBuffer()) {
                boe.getBuffer().recycleBuffer();
            }
        }
    }

    /**
     * A {@link BufferOrEventSequence} backed by the in-memory deque handed over on roll-over.
     * Elements are removed from the deque as they are read.
     */
    public static class CachedBufferOrEventSequence implements BufferOrEventSequence {
        /** Remaining elements of this sequence. */
        private final ArrayDeque<BufferOrEvent> queuedBuffers;

        /** Accounted size in bytes, fixed at construction time. */
        private final long size;

        /**
         * @param buffers the elements of the sequence; the deque is used directly, not copied
         * @param size the accounted size of the sequence in bytes
         */
        CachedBufferOrEventSequence(ArrayDeque<BufferOrEvent> buffers, long size) {
            this.queuedBuffers = buffers;
            this.size = size;
        }

        /** Does nothing; the elements are already available in memory. */
        @Override
        public void open() {
        }

        /**
         * Removes and returns the next element.
         *
         * @return the next element, or {@code null} if the sequence is exhausted
         */
        @Override
        @Nullable
        public BufferOrEvent getNext() {
            return this.queuedBuffers.poll();
        }

        /** Drains the remaining elements, recycling the buffer of each one that carries one. */
        @Override
        public void cleanup() {
            recycleAll(this.queuedBuffers);
        }

        /**
         * @return the accounted size passed at construction; it does not shrink as elements are
         *     consumed
         */
        @Override
        public long size() {
            return this.size;
        }
    }
}

