/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.api.operators;

import java.io.Closeable;
import java.io.IOException;
import java.util.Optional;
import javax.annotation.Nullable;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.state.KeyedStateStore;
import org.apache.flink.api.common.state.OperatorStateStore;
import org.apache.flink.api.common.state.State;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.runtime.checkpoint.CheckpointException;
import org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.DefaultKeyedStateStore;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupStatePartitionStreamProvider;
import org.apache.flink.runtime.state.KeyedStateBackend;
import org.apache.flink.runtime.state.OperatorStateBackend;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateInitializationContextImpl;
import org.apache.flink.runtime.state.StatePartitionStreamProvider;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl;
import org.apache.flink.shaded.guava18.com.google.common.io.Closer;
import org.apache.flink.streaming.api.operators.InternalTimeServiceManager;
import org.apache.flink.streaming.api.operators.OperatorSnapshotFutures;
import org.apache.flink.streaming.api.operators.StreamOperatorStateContext;
import org.apache.flink.util.CloseableIterable;
import org.apache.flink.util.IOUtils;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
public class StreamOperatorStateHandler {
    protected static final Logger LOG = LoggerFactory.getLogger(StreamOperatorStateHandler.class);
    @Nullable
    private final AbstractKeyedStateBackend<?> keyedStateBackend;
    private final CloseableRegistry closeableRegistry;
    @Nullable
    private final DefaultKeyedStateStore keyedStateStore;
    private final OperatorStateBackend operatorStateBackend;
    private final StreamOperatorStateContext context;

    public StreamOperatorStateHandler(StreamOperatorStateContext context, ExecutionConfig executionConfig, CloseableRegistry closeableRegistry) {
        this.context = context;
        this.operatorStateBackend = context.operatorStateBackend();
        this.keyedStateBackend = context.keyedStateBackend();
        this.closeableRegistry = closeableRegistry;
        this.keyedStateStore = this.keyedStateBackend != null ? new DefaultKeyedStateStore(this.keyedStateBackend, executionConfig) : null;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void initializeOperatorState(CheckpointedStreamOperator streamOperator) throws Exception {
        CloseableIterable<KeyGroupStatePartitionStreamProvider> keyedStateInputs = this.context.rawKeyedStateInputs();
        CloseableIterable<StatePartitionStreamProvider> operatorStateInputs = this.context.rawOperatorStateInputs();
        try {
            StateInitializationContextImpl initializationContext = new StateInitializationContextImpl(this.context.isRestored(), (OperatorStateStore)this.operatorStateBackend, (KeyedStateStore)this.keyedStateStore, keyedStateInputs, operatorStateInputs);
            streamOperator.initializeState((StateInitializationContext)initializationContext);
        }
        finally {
            StreamOperatorStateHandler.closeFromRegistry(operatorStateInputs, this.closeableRegistry);
            StreamOperatorStateHandler.closeFromRegistry(keyedStateInputs, this.closeableRegistry);
        }
    }

    private static void closeFromRegistry(Closeable closeable, CloseableRegistry registry) {
        if (registry.unregisterCloseable(closeable)) {
            IOUtils.closeQuietly((AutoCloseable)closeable);
        }
    }

    public void dispose() throws Exception {
        try (Closer closer = Closer.create();){
            if (this.closeableRegistry.unregisterCloseable((Closeable)this.operatorStateBackend)) {
                closer.register((Closeable)this.operatorStateBackend);
            }
            if (this.closeableRegistry.unregisterCloseable(this.keyedStateBackend)) {
                closer.register(this.keyedStateBackend);
            }
            if (this.operatorStateBackend != null) {
                closer.register(() -> this.operatorStateBackend.dispose());
            }
            if (this.keyedStateBackend != null) {
                closer.register(() -> this.keyedStateBackend.dispose());
            }
        }
    }

    public OperatorSnapshotFutures snapshotState(CheckpointedStreamOperator streamOperator, Optional<InternalTimeServiceManager<?>> timeServiceManager, String operatorName, long checkpointId, long timestamp, CheckpointOptions checkpointOptions, CheckpointStreamFactory factory) throws CheckpointException {
        KeyGroupRange keyGroupRange = null != this.keyedStateBackend ? this.keyedStateBackend.getKeyGroupRange() : KeyGroupRange.EMPTY_KEY_GROUP_RANGE;
        OperatorSnapshotFutures snapshotInProgress = new OperatorSnapshotFutures();
        StateSnapshotContextSynchronousImpl snapshotContext = new StateSnapshotContextSynchronousImpl(checkpointId, timestamp, factory, keyGroupRange, this.closeableRegistry);
        this.snapshotState(streamOperator, timeServiceManager, operatorName, checkpointId, timestamp, checkpointOptions, factory, snapshotInProgress, snapshotContext);
        return snapshotInProgress;
    }

    @VisibleForTesting
    void snapshotState(CheckpointedStreamOperator streamOperator, Optional<InternalTimeServiceManager<?>> timeServiceManager, String operatorName, long checkpointId, long timestamp, CheckpointOptions checkpointOptions, CheckpointStreamFactory factory, OperatorSnapshotFutures snapshotInProgress, StateSnapshotContextSynchronousImpl snapshotContext) throws CheckpointException {
        try {
            if (timeServiceManager.isPresent()) {
                Preconditions.checkState((this.keyedStateBackend != null ? 1 : 0) != 0, (Object)"keyedStateBackend should be available with timeServiceManager");
                timeServiceManager.get().snapshotState((KeyedStateBackend<?>)this.keyedStateBackend, (StateSnapshotContext)snapshotContext, operatorName);
            }
            streamOperator.snapshotState((StateSnapshotContext)snapshotContext);
            snapshotInProgress.setKeyedStateRawFuture(snapshotContext.getKeyedStateStreamFuture());
            snapshotInProgress.setOperatorStateRawFuture(snapshotContext.getOperatorStateStreamFuture());
            if (null != this.operatorStateBackend) {
                snapshotInProgress.setOperatorStateManagedFuture(this.operatorStateBackend.snapshot(checkpointId, timestamp, factory, checkpointOptions));
            }
            if (null != this.keyedStateBackend) {
                snapshotInProgress.setKeyedStateManagedFuture(this.keyedStateBackend.snapshot(checkpointId, timestamp, factory, checkpointOptions));
            }
        }
        catch (Exception snapshotException) {
            try {
                snapshotInProgress.cancel();
            }
            catch (Exception e) {
                snapshotException.addSuppressed(e);
            }
            String snapshotFailMessage = "Could not complete snapshot " + checkpointId + " for operator " + operatorName + ".";
            try {
                snapshotContext.closeExceptionally();
            }
            catch (IOException e) {
                snapshotException.addSuppressed(e);
            }
            throw new CheckpointException(snapshotFailMessage, CheckpointFailureReason.CHECKPOINT_DECLINED, (Throwable)snapshotException);
        }
    }

    public void notifyCheckpointComplete(long checkpointId) throws Exception {
        if (this.keyedStateBackend != null) {
            this.keyedStateBackend.notifyCheckpointComplete(checkpointId);
        }
    }

    public void notifyCheckpointAborted(long checkpointId) throws Exception {
        if (this.keyedStateBackend != null) {
            this.keyedStateBackend.notifyCheckpointAborted(checkpointId);
        }
    }

    public <K> KeyedStateBackend<K> getKeyedStateBackend() {
        return this.keyedStateBackend;
    }

    public OperatorStateBackend getOperatorStateBackend() {
        return this.operatorStateBackend;
    }

    public <N, S extends State, T> S getOrCreateKeyedState(TypeSerializer<N> namespaceSerializer, StateDescriptor<S, T> stateDescriptor) throws Exception {
        if (this.keyedStateStore != null) {
            return (S)this.keyedStateBackend.getOrCreateKeyedState(namespaceSerializer, stateDescriptor);
        }
        throw new IllegalStateException("Cannot create partitioned state. The keyed state backend has not been set.This indicates that the operator is not partitioned/keyed.");
    }

    protected <S extends State, N> S getPartitionedState(N namespace, TypeSerializer<N> namespaceSerializer, StateDescriptor<S, ?> stateDescriptor) throws Exception {
        if (this.keyedStateStore != null) {
            return (S)this.keyedStateBackend.getPartitionedState(namespace, namespaceSerializer, stateDescriptor);
        }
        throw new RuntimeException("Cannot create partitioned state. The keyed state backend has not been set. This indicates that the operator is not partitioned/keyed.");
    }

    public void setCurrentKey(Object key) {
        if (this.keyedStateBackend != null) {
            try {
                AbstractKeyedStateBackend<?> rawBackend = this.keyedStateBackend;
                rawBackend.setCurrentKey(key);
            }
            catch (Exception e) {
                throw new RuntimeException("Exception occurred while setting the current key context.", e);
            }
        }
    }

    public Object getCurrentKey() {
        if (this.keyedStateBackend != null) {
            return this.keyedStateBackend.getCurrentKey();
        }
        throw new UnsupportedOperationException("Key can only be retrieved on KeyedStream.");
    }

    public Optional<KeyedStateStore> getKeyedStateStore() {
        return Optional.ofNullable(this.keyedStateStore);
    }

    public static interface CheckpointedStreamOperator {
        public void initializeState(StateInitializationContext var1) throws Exception;

        public void snapshotState(StateSnapshotContext var1) throws Exception;
    }
}

