/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.source.coordinator;

import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.InputStream;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import javax.annotation.Nullable;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.connector.source.ReaderInfo;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.api.connector.source.SourceSplit;
import org.apache.flink.api.connector.source.SplitEnumerator;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.runtime.source.coordinator.SourceCoordinatorContext;
import org.apache.flink.runtime.source.coordinator.SourceCoordinatorSerdeUtils;
import org.apache.flink.runtime.source.event.ReaderRegistrationEvent;
import org.apache.flink.runtime.source.event.SourceEventWrapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
public class SourceCoordinator<SplitT extends SourceSplit, EnumChkT>
implements OperatorCoordinator {
    private static final Logger LOG = LoggerFactory.getLogger(SourceCoordinator.class);
    private final String operatorName;
    private final ExecutorService coordinatorExecutor;
    private final Source<?, SplitT, EnumChkT> source;
    private final SimpleVersionedSerializer<EnumChkT> enumCheckpointSerializer;
    private final SimpleVersionedSerializer<SplitT> splitSerializer;
    private final SourceCoordinatorContext<SplitT> context;
    private SplitEnumerator<SplitT, EnumChkT> enumerator;
    private boolean started;

    public SourceCoordinator(String operatorName, ExecutorService coordinatorExecutor, Source<?, SplitT, EnumChkT> source, SourceCoordinatorContext<SplitT> context) {
        this.operatorName = operatorName;
        this.coordinatorExecutor = coordinatorExecutor;
        this.source = source;
        this.enumCheckpointSerializer = source.getEnumeratorCheckpointSerializer();
        this.splitSerializer = source.getSplitSerializer();
        this.context = context;
        this.enumerator = source.createEnumerator(context);
        this.started = false;
    }

    @Override
    public void start() throws Exception {
        LOG.info("Starting split enumerator for source {}.", (Object)this.operatorName);
        this.enumerator.start();
        this.started = true;
    }

    @Override
    public void close() throws Exception {
        LOG.info("Closing SourceCoordinator for source {}.", (Object)this.operatorName);
        boolean successfullyClosed = false;
        try {
            if (this.started) {
                this.context.close();
                this.enumerator.close();
            }
        }
        finally {
            this.coordinatorExecutor.shutdownNow();
            successfullyClosed = this.coordinatorExecutor.awaitTermination(10L, TimeUnit.SECONDS);
        }
        if (!successfullyClosed) {
            throw new TimeoutException("The source coordinator failed to close before timeout.");
        }
        LOG.info("Source coordinator for source {} closed.", (Object)this.operatorName);
    }

    @Override
    public void handleEventFromOperator(int subtask, OperatorEvent event) throws Exception {
        this.ensureStarted();
        this.coordinatorExecutor.execute(() -> {
            try {
                LOG.debug("Handling event from subtask {} of source {}: {}", new Object[]{subtask, this.operatorName, event});
                if (event instanceof SourceEventWrapper) {
                    this.enumerator.handleSourceEvent(subtask, ((SourceEventWrapper)event).getSourceEvent());
                } else if (event instanceof ReaderRegistrationEvent) {
                    this.handleReaderRegistrationEvent((ReaderRegistrationEvent)event);
                }
            }
            catch (Exception e) {
                LOG.error("Failing the job due to exception when handling operator event {} from subtask {} of source {}.", new Object[]{event, subtask, this.operatorName, e});
                this.context.failJob(e);
            }
        });
    }

    @Override
    public void subtaskFailed(int subtaskId, @Nullable Throwable reason) {
        this.ensureStarted();
        this.coordinatorExecutor.execute(() -> {
            try {
                LOG.info("Handling subtask {} failure of source {}.", (Object)subtaskId, (Object)this.operatorName);
                List<SplitT> splitsToAddBack = this.context.getAndRemoveUncheckpointedAssignment(subtaskId);
                this.context.unregisterSourceReader(subtaskId);
                LOG.debug("Adding {} back to the split enumerator of source {}.", splitsToAddBack, (Object)this.operatorName);
                this.enumerator.addSplitsBack(splitsToAddBack, subtaskId);
            }
            catch (Exception e) {
                LOG.error("Failing the job due to exception when handling subtask {} failure in source {}.", new Object[]{subtaskId, this.operatorName, e});
                this.context.failJob(e);
            }
        });
    }

    @Override
    public void checkpointCoordinator(long checkpointId, CompletableFuture<byte[]> result) throws Exception {
        this.ensureStarted();
        this.coordinatorExecutor.execute(() -> {
            try {
                LOG.debug("Taking a state snapshot on operator {} for checkpoint {}", (Object)this.operatorName, (Object)checkpointId);
                result.complete(this.toBytes(checkpointId));
            }
            catch (Exception e) {
                result.completeExceptionally(new CompletionException(String.format("Failed to checkpoint coordinator for source %s due to ", this.operatorName), e));
            }
        });
    }

    @Override
    public void checkpointComplete(long checkpointId) {
        this.ensureStarted();
        this.coordinatorExecutor.execute(() -> {
            try {
                LOG.info("Marking checkpoint {} as completed for source {}.", (Object)checkpointId, (Object)this.operatorName);
                this.context.onCheckpointComplete(checkpointId);
            }
            catch (Exception e) {
                LOG.error("Failing the job due to exception when completing the checkpoint {} for source {}.", new Object[]{checkpointId, this.operatorName, e});
                this.context.failJob(e);
            }
        });
    }

    @Override
    public void resetToCheckpoint(byte[] checkpointData) throws Exception {
        if (this.started) {
            throw new IllegalStateException(String.format("The coordinator for source %s has started. The source coordinator state can only be reset to a checkpoint before it starts.", this.operatorName));
        }
        LOG.info("Resetting coordinator of source {} from checkpoint.", (Object)this.operatorName);
        this.fromBytes(checkpointData);
    }

    @VisibleForTesting
    SplitEnumerator<SplitT, EnumChkT> getEnumerator() {
        return this.enumerator;
    }

    @VisibleForTesting
    SourceCoordinatorContext<SplitT> getContext() {
        return this.context;
    }

    /*
     * Exception decompiling
     */
    private byte[] toBytes(long checkpointId) throws Exception {
        /*
         * This method has failed to decompile.  When submitting a bug report, please provide this stack trace, and (if you hold appropriate legal rights) the relevant class file.
         * 
         * org.benf.cfr.reader.util.ConfusedCFRException: Started 2 blocks at once
         *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op04StructuredStatement.getStartingBlocks(Op04StructuredStatement.java:412)
         *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op04StructuredStatement.buildNestedBlocks(Op04StructuredStatement.java:487)
         *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op03SimpleStatement.createInitialStructuredBlock(Op03SimpleStatement.java:736)
         *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysisInner(CodeAnalyser.java:850)
         *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysisOrWrapFail(CodeAnalyser.java:278)
         *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysis(CodeAnalyser.java:201)
         *     at org.benf.cfr.reader.entities.attributes.AttributeCode.analyse(AttributeCode.java:94)
         *     at org.benf.cfr.reader.entities.Method.analyse(Method.java:531)
         *     at org.benf.cfr.reader.entities.ClassFile.analyseMid(ClassFile.java:1055)
         *     at org.benf.cfr.reader.entities.ClassFile.analyseTop(ClassFile.java:942)
         *     at org.benf.cfr.reader.Driver.doJarVersionTypes(Driver.java:257)
         *     at org.benf.cfr.reader.Driver.doJar(Driver.java:139)
         *     at org.benf.cfr.reader.CfrDriverImpl.analyse(CfrDriverImpl.java:76)
         *     at org.benf.cfr.reader.Main.main(Main.java:54)
         */
        throw new IllegalStateException("Decompilation failed");
    }

    private void fromBytes(byte[] bytes) throws Exception {
        try (ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
             DataInputViewStreamWrapper in = new DataInputViewStreamWrapper((InputStream)bais);){
            SourceCoordinatorSerdeUtils.readAndVerifyCoordinatorSerdeVersion((DataInputStream)in);
            int enumSerializerVersion = in.readInt();
            int serializedEnumChkptSize = in.readInt();
            byte[] serializedEnumChkpt = SourceCoordinatorSerdeUtils.readBytes((DataInputStream)in, serializedEnumChkptSize);
            Object enumChkpt = this.enumCheckpointSerializer.deserialize(enumSerializerVersion, serializedEnumChkpt);
            this.context.restoreState(this.splitSerializer, (DataInputStream)in);
            this.enumerator = this.source.restoreEnumerator(this.context, enumChkpt);
        }
    }

    private void handleReaderRegistrationEvent(ReaderRegistrationEvent event) {
        this.context.registerSourceReader(new ReaderInfo(event.subtaskId(), event.location()));
        this.enumerator.addReader(event.subtaskId());
    }

    private void ensureStarted() {
        if (!this.started) {
            throw new IllegalStateException("The coordinator has not started yet.");
        }
    }
}

