/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.operators.coordination;

import java.util.concurrent.CompletableFuture;
import javax.annotation.Nullable;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.runtime.operators.coordination.TaskNotRunningException;

public class RecreateOnResetOperatorCoordinator
implements OperatorCoordinator {
    private final Provider provider;
    private QuiesceableContext quiesceableContext;
    private OperatorCoordinator coordinator;
    private boolean started;

    private RecreateOnResetOperatorCoordinator(QuiesceableContext context, Provider provider) {
        this.quiesceableContext = context;
        this.provider = provider;
        this.coordinator = provider.getCoordinator(context);
        this.started = false;
    }

    @Override
    public void start() throws Exception {
        this.coordinator.start();
        this.started = true;
    }

    @Override
    public void close() throws Exception {
        this.coordinator.close();
    }

    @Override
    public void handleEventFromOperator(int subtask, OperatorEvent event) throws Exception {
        this.coordinator.handleEventFromOperator(subtask, event);
    }

    @Override
    public void subtaskFailed(int subtask, @Nullable Throwable reason) {
        this.coordinator.subtaskFailed(subtask, reason);
    }

    @Override
    public void checkpointCoordinator(long checkpointId, CompletableFuture<byte[]> resultFuture) throws Exception {
        this.coordinator.checkpointCoordinator(checkpointId, resultFuture);
    }

    @Override
    public void checkpointComplete(long checkpointId) {
        this.coordinator.checkpointComplete(checkpointId);
    }

    @Override
    public void resetToCheckpoint(byte[] checkpointData) throws Exception {
        this.quiesceableContext.quiesce();
        this.coordinator.close();
        this.quiesceableContext = new QuiesceableContext(this.quiesceableContext.getContext());
        this.coordinator = this.provider.getCoordinator(this.quiesceableContext);
        this.coordinator.resetToCheckpoint(checkpointData);
        if (this.started) {
            this.coordinator.start();
        }
    }

    @VisibleForTesting
    public OperatorCoordinator getInternalCoordinator() {
        return this.coordinator;
    }

    @VisibleForTesting
    QuiesceableContext getQuiesceableContext() {
        return this.quiesceableContext;
    }

    @VisibleForTesting
    static class QuiesceableContext
    implements OperatorCoordinator.Context {
        private final OperatorCoordinator.Context context;
        private volatile boolean quiesced;

        QuiesceableContext(OperatorCoordinator.Context context) {
            this.context = context;
            this.quiesced = false;
        }

        @Override
        public OperatorID getOperatorId() {
            return this.context.getOperatorId();
        }

        @Override
        public synchronized CompletableFuture<Acknowledge> sendEvent(OperatorEvent evt, int targetSubtask) throws TaskNotRunningException {
            if (this.quiesced) {
                return CompletableFuture.completedFuture(Acknowledge.get());
            }
            return this.context.sendEvent(evt, targetSubtask);
        }

        @Override
        public synchronized void failJob(Throwable cause) {
            if (this.quiesced) {
                return;
            }
            this.context.failJob(cause);
        }

        @Override
        public int currentParallelism() {
            return this.context.currentParallelism();
        }

        @VisibleForTesting
        synchronized void quiesce() {
            this.quiesced = true;
        }

        @VisibleForTesting
        boolean isQuiesced() {
            return this.quiesced;
        }

        private OperatorCoordinator.Context getContext() {
            return this.context;
        }
    }

    public static abstract class Provider
    implements OperatorCoordinator.Provider {
        private static final long serialVersionUID = 3002837631612629071L;
        private final OperatorID operatorID;

        public Provider(OperatorID operatorID) {
            this.operatorID = operatorID;
        }

        @Override
        public OperatorID getOperatorId() {
            return this.operatorID;
        }

        @Override
        public OperatorCoordinator create(OperatorCoordinator.Context context) {
            QuiesceableContext quiesceableContext = new QuiesceableContext(context);
            return new RecreateOnResetOperatorCoordinator(quiesceableContext, this);
        }

        protected abstract OperatorCoordinator getCoordinator(OperatorCoordinator.Context var1);
    }
}

