/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.network.partition.consumer;

import java.io.IOException;
import java.util.List;
import java.util.Optional;
import javax.annotation.Nullable;
import javax.annotation.concurrent.NotThreadSafe;
import org.apache.flink.runtime.checkpoint.CheckpointException;
import org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.io.network.api.EventAnnouncement;
import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.util.CloseableIterator;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@NotThreadSafe
public final class ChannelStatePersister {
    private static final Logger LOG = LoggerFactory.getLogger(ChannelStatePersister.class);
    private final InputChannelInfo channelInfo;
    private CheckpointStatus checkpointStatus = CheckpointStatus.COMPLETED;
    private long lastSeenBarrier = -1L;
    private final ChannelStateWriter channelStateWriter;

    ChannelStatePersister(ChannelStateWriter channelStateWriter, InputChannelInfo channelInfo) {
        this.channelStateWriter = (ChannelStateWriter)Preconditions.checkNotNull((Object)channelStateWriter);
        this.channelInfo = (InputChannelInfo)Preconditions.checkNotNull((Object)channelInfo);
    }

    protected void startPersisting(long barrierId, List<Buffer> knownBuffers) throws CheckpointException {
        this.logEvent("startPersisting", barrierId);
        if (this.checkpointStatus == CheckpointStatus.BARRIER_RECEIVED && this.lastSeenBarrier > barrierId) {
            throw new CheckpointException(String.format("Barrier for newer checkpoint %d has already been received compared to the requested checkpoint %d", this.lastSeenBarrier, barrierId), CheckpointFailureReason.CHECKPOINT_SUBSUMED);
        }
        if (this.lastSeenBarrier < barrierId) {
            this.checkpointStatus = CheckpointStatus.BARRIER_PENDING;
            this.lastSeenBarrier = barrierId;
        }
        if (knownBuffers.size() > 0) {
            this.channelStateWriter.addInputData(barrierId, this.channelInfo, -2, (CloseableIterator<Buffer>)CloseableIterator.fromList(knownBuffers, Buffer::recycleBuffer));
        }
    }

    protected void stopPersisting(long id) {
        this.logEvent("stopPersisting", id);
        if (id >= this.lastSeenBarrier) {
            this.checkpointStatus = CheckpointStatus.COMPLETED;
            this.lastSeenBarrier = id;
        }
    }

    protected void maybePersist(Buffer buffer) {
        if (this.checkpointStatus == CheckpointStatus.BARRIER_PENDING && buffer.isBuffer()) {
            this.channelStateWriter.addInputData(this.lastSeenBarrier, this.channelInfo, -2, (CloseableIterator<Buffer>)CloseableIterator.ofElement((Object)buffer.retainBuffer(), Buffer::recycleBuffer));
        }
    }

    protected Optional<Long> checkForBarrier(Buffer buffer) throws IOException {
        EventAnnouncement announcement;
        AbstractEvent event = this.parseEvent(buffer);
        if (event instanceof CheckpointBarrier) {
            long expectedBarrierId;
            long barrierId = ((CheckpointBarrier)event).getId();
            long l = expectedBarrierId = this.checkpointStatus == CheckpointStatus.COMPLETED ? this.lastSeenBarrier + 1L : this.lastSeenBarrier;
            if (barrierId >= expectedBarrierId) {
                this.logEvent("found barrier", barrierId);
                this.checkpointStatus = CheckpointStatus.BARRIER_RECEIVED;
                this.lastSeenBarrier = barrierId;
                return Optional.of(this.lastSeenBarrier);
            }
            this.logEvent("ignoring barrier", barrierId);
        }
        if (event instanceof EventAnnouncement && (announcement = (EventAnnouncement)event).getAnnouncedEvent() instanceof CheckpointBarrier) {
            long barrierId = ((CheckpointBarrier)announcement.getAnnouncedEvent()).getId();
            this.logEvent("found announcement for barrier", barrierId);
            return Optional.of(barrierId);
        }
        return Optional.empty();
    }

    private void logEvent(String event, long barrierId) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("{} {}, lastSeenBarrier = {} ({}) @ {}", new Object[]{event, barrierId, this.lastSeenBarrier, this.checkpointStatus, this.channelInfo});
        }
    }

    @Nullable
    protected AbstractEvent parseEvent(Buffer buffer) throws IOException {
        if (buffer.isBuffer()) {
            return null;
        }
        AbstractEvent event = EventSerializer.fromBuffer(buffer, this.getClass().getClassLoader());
        buffer.setReaderIndex(0);
        return event;
    }

    protected boolean hasBarrierReceived() {
        return this.checkpointStatus == CheckpointStatus.BARRIER_RECEIVED;
    }

    public String toString() {
        return "ChannelStatePersister(lastSeenBarrier=" + this.lastSeenBarrier + " (" + (Object)((Object)this.checkpointStatus) + ")}";
    }

    private static enum CheckpointStatus {
        COMPLETED,
        BARRIER_PENDING,
        BARRIER_RECEIVED;

    }
}

