/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.io;

import java.io.IOException;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import javax.annotation.Nullable;
import org.apache.flink.annotation.Internal;
import org.apache.flink.runtime.io.PullingAsyncDataInput;
import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.streaming.runtime.io.BufferStorage;
import org.apache.flink.streaming.runtime.io.CheckpointBarrierAligner;
import org.apache.flink.streaming.runtime.io.CheckpointBarrierHandler;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
public class CheckpointedInputGate
implements PullingAsyncDataInput<BufferOrEvent> {
    private static final Logger LOG = LoggerFactory.getLogger(CheckpointedInputGate.class);
    private final CheckpointBarrierHandler barrierHandler;
    private final InputGate inputGate;
    private final int channelIndexOffset;
    private final BufferStorage bufferStorage;
    private boolean endOfInputGate;
    private boolean isFinished;

    public CheckpointedInputGate(InputGate inputGate, BufferStorage bufferStorage, String taskName, @Nullable AbstractInvokable toNotifyOnCheckpoint) {
        this(inputGate, bufferStorage, new CheckpointBarrierAligner(inputGate.getNumberOfInputChannels(), taskName, toNotifyOnCheckpoint));
    }

    public CheckpointedInputGate(InputGate inputGate, BufferStorage bufferStorage, CheckpointBarrierHandler barrierHandler) {
        this(inputGate, bufferStorage, barrierHandler, 0);
    }

    public CheckpointedInputGate(InputGate inputGate, BufferStorage bufferStorage, CheckpointBarrierHandler barrierHandler, int channelIndexOffset) {
        this.inputGate = inputGate;
        this.channelIndexOffset = channelIndexOffset;
        this.bufferStorage = (BufferStorage)Preconditions.checkNotNull((Object)bufferStorage);
        this.barrierHandler = barrierHandler;
    }

    public CompletableFuture<?> getAvailableFuture() {
        if (this.bufferStorage.isEmpty()) {
            return this.inputGate.getAvailableFuture();
        }
        return AVAILABLE;
    }

    public Optional<BufferOrEvent> pollNext() throws Exception {
        BufferOrEvent bufferOrEvent;
        Optional<BufferOrEvent> next;
        while (true) {
            if (this.bufferStorage.isEmpty()) {
                next = this.inputGate.pollNext();
            } else {
                next = this.bufferStorage.pollNext();
                if (!next.isPresent()) {
                    return this.pollNext();
                }
            }
            if (!next.isPresent()) {
                return this.handleEmptyBuffer();
            }
            bufferOrEvent = next.get();
            if (this.barrierHandler.isBlocked(this.offsetChannelIndex(bufferOrEvent.getChannelIndex()))) {
                this.bufferStorage.add(bufferOrEvent);
                if (!this.bufferStorage.isFull()) continue;
                this.barrierHandler.checkpointSizeLimitExceeded(this.bufferStorage.getMaxBufferedBytes());
                this.bufferStorage.rollOver();
                continue;
            }
            if (bufferOrEvent.isBuffer()) {
                return next;
            }
            if (bufferOrEvent.getEvent().getClass() == CheckpointBarrier.class) {
                CheckpointBarrier checkpointBarrier = (CheckpointBarrier)bufferOrEvent.getEvent();
                if (this.endOfInputGate || !this.barrierHandler.processBarrier(checkpointBarrier, this.offsetChannelIndex(bufferOrEvent.getChannelIndex()), this.bufferStorage.getPendingBytes())) continue;
                this.bufferStorage.rollOver();
                continue;
            }
            if (bufferOrEvent.getEvent().getClass() != CancelCheckpointMarker.class) break;
            if (!this.barrierHandler.processCancellationBarrier((CancelCheckpointMarker)bufferOrEvent.getEvent())) continue;
            this.bufferStorage.rollOver();
        }
        if (bufferOrEvent.getEvent().getClass() == EndOfPartitionEvent.class && this.barrierHandler.processEndOfPartition()) {
            this.bufferStorage.rollOver();
        }
        return next;
    }

    private int offsetChannelIndex(int channelIndex) {
        return channelIndex + this.channelIndexOffset;
    }

    private Optional<BufferOrEvent> handleEmptyBuffer() throws Exception {
        if (!this.inputGate.isFinished()) {
            return Optional.empty();
        }
        if (this.endOfInputGate) {
            this.isFinished = true;
            return Optional.empty();
        }
        this.endOfInputGate = true;
        this.barrierHandler.releaseBlocksAndResetBarriers();
        this.bufferStorage.rollOver();
        return this.pollNext();
    }

    public boolean isEmpty() {
        return this.bufferStorage.isEmpty();
    }

    public boolean isFinished() {
        return this.isFinished;
    }

    public void cleanup() throws IOException {
        this.bufferStorage.close();
    }

    public long getLatestCheckpointId() {
        return this.barrierHandler.getLatestCheckpointId();
    }

    public long getAlignmentDurationNanos() {
        return this.barrierHandler.getAlignmentDurationNanos();
    }

    public int getNumberOfInputChannels() {
        return this.inputGate.getNumberOfInputChannels();
    }

    public String toString() {
        return this.barrierHandler.toString();
    }
}

