/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.state.heap;

import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import javax.annotation.Nonnull;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
import org.apache.flink.runtime.state.AbstractKeyedStateBackendBuilder;
import org.apache.flink.runtime.state.BackendBuildingException;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.LocalRecoveryConfig;
import org.apache.flink.runtime.state.RestoreOperation;
import org.apache.flink.runtime.state.SavepointKeyedStateHandle;
import org.apache.flink.runtime.state.SnapshotExecutionType;
import org.apache.flink.runtime.state.StreamCompressionDecorator;
import org.apache.flink.runtime.state.heap.CopyOnWriteStateTable;
import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend;
import org.apache.flink.runtime.state.heap.HeapPriorityQueueSetFactory;
import org.apache.flink.runtime.state.heap.HeapPriorityQueueSnapshotRestoreWrapper;
import org.apache.flink.runtime.state.heap.HeapRestoreOperation;
import org.apache.flink.runtime.state.heap.HeapSavepointRestoreOperation;
import org.apache.flink.runtime.state.heap.HeapSnapshotStrategy;
import org.apache.flink.runtime.state.heap.InternalKeyContext;
import org.apache.flink.runtime.state.heap.InternalKeyContextImpl;
import org.apache.flink.runtime.state.heap.NestedMapsStateTable;
import org.apache.flink.runtime.state.heap.StateTable;
import org.apache.flink.runtime.state.heap.StateTableFactory;
import org.apache.flink.runtime.state.metrics.LatencyTrackingStateConfig;
import org.apache.flink.runtime.state.ttl.TtlTimeProvider;

/**
 * Builder that assembles a {@link HeapKeyedStateBackend}.
 *
 * <p>{@link #build()} creates the state registries, runs a restore operation over the
 * state handles held by the base builder, and then constructs the backend from the
 * resulting pieces.
 *
 * @param <K> type of the key handled by the backend being built
 */
public class HeapKeyedStateBackendBuilder<K> extends AbstractKeyedStateBackendBuilder<K> {

    /** Forwarded to the snapshot strategy and to the backend being built. */
    private final LocalRecoveryConfig localRecoveryConfig;

    /** Forwarded to the restore operation and to the backend being built. */
    private final HeapPriorityQueueSetFactory priorityQueueSetFactory;

    /** Selects the state table implementation and the snapshot execution type. */
    private final boolean asynchronousSnapshots;

    /**
     * Creates the builder. All arguments except {@code localRecoveryConfig},
     * {@code priorityQueueSetFactory} and {@code asynchronousSnapshots} are passed
     * straight to the {@link AbstractKeyedStateBackendBuilder} constructor.
     *
     * @param localRecoveryConfig kept for the snapshot strategy and the backend
     * @param priorityQueueSetFactory kept for the restore operation and the backend
     * @param asynchronousSnapshots {@code true} to use {@link CopyOnWriteStateTable} and
     *     {@link SnapshotExecutionType#ASYNCHRONOUS}; {@code false} to use
     *     {@link NestedMapsStateTable} and {@link SnapshotExecutionType#SYNCHRONOUS}
     */
    public HeapKeyedStateBackendBuilder(
            TaskKvStateRegistry kvStateRegistry,
            TypeSerializer<K> keySerializer,
            ClassLoader userCodeClassLoader,
            int numberOfKeyGroups,
            KeyGroupRange keyGroupRange,
            ExecutionConfig executionConfig,
            TtlTimeProvider ttlTimeProvider,
            LatencyTrackingStateConfig latencyTrackingStateConfig,
            @Nonnull Collection<KeyedStateHandle> stateHandles,
            StreamCompressionDecorator keyGroupCompressionDecorator,
            LocalRecoveryConfig localRecoveryConfig,
            HeapPriorityQueueSetFactory priorityQueueSetFactory,
            boolean asynchronousSnapshots,
            CloseableRegistry cancelStreamRegistry) {
        super(
                kvStateRegistry,
                keySerializer,
                userCodeClassLoader,
                numberOfKeyGroups,
                keyGroupRange,
                executionConfig,
                ttlTimeProvider,
                latencyTrackingStateConfig,
                stateHandles,
                keyGroupCompressionDecorator,
                cancelStreamRegistry);
        this.localRecoveryConfig = localRecoveryConfig;
        this.priorityQueueSetFactory = priorityQueueSetFactory;
        this.asynchronousSnapshots = asynchronousSnapshots;
    }

    /**
     * Builds the heap keyed state backend.
     *
     * <p>The same two registry maps are handed to the snapshot strategy, to the restore
     * operation and finally to the backend, so all three work on shared instances.
     *
     * @return the newly constructed backend
     * @throws BackendBuildingException if the restore operation fails
     */
    @Override
    public HeapKeyedStateBackend<K> build() throws BackendBuildingException {
        // Registry of key/value state tables, keyed by name.
        Map<String, StateTable<K, ?, ?>> registeredKVStates = new HashMap<>();
        // Registry of priority-queue states, keyed by name.
        Map<String, HeapPriorityQueueSnapshotRestoreWrapper<?>> registeredPQStates = new HashMap<>();

        // A fresh registry for the backend itself; the builder's own cancelStreamRegistry
        // is only used by the restore operation.
        CloseableRegistry cancelStreamRegistryForBackend = new CloseableRegistry();

        HeapSnapshotStrategy<K> snapshotStrategy =
                initSnapshotStrategy(registeredKVStates, registeredPQStates);
        InternalKeyContext<K> keyContext =
                new InternalKeyContextImpl<>(this.keyGroupRange, this.numberOfKeyGroups);

        final StateTableFactory<K> stateTableFactory;
        final SnapshotExecutionType snapshotExecutionType;
        if (this.asynchronousSnapshots) {
            stateTableFactory = CopyOnWriteStateTable::new;
            snapshotExecutionType = SnapshotExecutionType.ASYNCHRONOUS;
        } else {
            stateTableFactory = NestedMapsStateTable::new;
            snapshotExecutionType = SnapshotExecutionType.SYNCHRONOUS;
        }

        restoreState(registeredKVStates, registeredPQStates, keyContext, stateTableFactory);

        return new HeapKeyedStateBackend<>(
                this.kvStateRegistry,
                this.keySerializerProvider.currentSchemaSerializer(),
                this.userCodeClassLoader,
                this.executionConfig,
                this.ttlTimeProvider,
                this.latencyTrackingStateConfig,
                cancelStreamRegistryForBackend,
                this.keyGroupCompressionDecorator,
                registeredKVStates,
                registeredPQStates,
                this.localRecoveryConfig,
                this.priorityQueueSetFactory,
                snapshotStrategy,
                snapshotExecutionType,
                stateTableFactory,
                keyContext);
    }

    /**
     * Runs the restore operation matching the first restore state handle.
     *
     * <p>A {@link HeapSavepointRestoreOperation} is used when the first handle is a
     * {@link SavepointKeyedStateHandle}; in every other case (including an empty handle
     * collection) a {@link HeapRestoreOperation} is used.
     *
     * @param registeredKVStates key/value state registry passed to the restore operation
     * @param registeredPQStates priority-queue state registry passed to the restore operation
     * @param keyContext key context passed to the restore operation
     * @param stateTableFactory state table factory passed to the restore operation
     * @throws BackendBuildingException wrapping any exception thrown by {@code restore()}
     */
    private void restoreState(
            Map<String, StateTable<K, ?, ?>> registeredKVStates,
            Map<String, HeapPriorityQueueSnapshotRestoreWrapper<?>> registeredPQStates,
            InternalKeyContext<K> keyContext,
            StateTableFactory<K> stateTableFactory)
            throws BackendBuildingException {
        // Only the first handle decides which restore path is taken; null when there is none.
        final KeyedStateHandle firstHandle;
        if (this.restoreStateHandles.isEmpty()) {
            firstHandle = null;
        } else {
            firstHandle = (KeyedStateHandle) this.restoreStateHandles.iterator().next();
        }

        final RestoreOperation<Void> restoreOperation;
        if (firstHandle instanceof SavepointKeyedStateHandle) {
            restoreOperation =
                    new HeapSavepointRestoreOperation<K>(
                            this.restoreStateHandles,
                            this.keySerializerProvider,
                            this.userCodeClassLoader,
                            registeredKVStates,
                            registeredPQStates,
                            this.priorityQueueSetFactory,
                            this.keyGroupRange,
                            this.numberOfKeyGroups,
                            stateTableFactory,
                            keyContext);
        } else {
            restoreOperation =
                    new HeapRestoreOperation<K>(
                            this.restoreStateHandles,
                            this.keySerializerProvider,
                            this.userCodeClassLoader,
                            registeredKVStates,
                            registeredPQStates,
                            this.cancelStreamRegistry,
                            this.priorityQueueSetFactory,
                            this.keyGroupRange,
                            this.numberOfKeyGroups,
                            stateTableFactory,
                            keyContext);
        }

        try {
            restoreOperation.restore();
            this.logger.info("Finished to build heap keyed state-backend.");
        } catch (Exception e) {
            // Keep the original exception as the cause so the failure stays diagnosable.
            throw new BackendBuildingException("Failed when trying to restore heap backend", e);
        }
    }

    /**
     * Creates the snapshot strategy over the given state registries.
     *
     * @param registeredKVStates key/value state registry the strategy is constructed with
     * @param registeredPQStates priority-queue state registry the strategy is constructed with
     * @return a new {@link HeapSnapshotStrategy}
     */
    private HeapSnapshotStrategy<K> initSnapshotStrategy(
            Map<String, StateTable<K, ?, ?>> registeredKVStates,
            Map<String, HeapPriorityQueueSnapshotRestoreWrapper<?>> registeredPQStates) {
        return new HeapSnapshotStrategy<K>(
                registeredKVStates,
                registeredPQStates,
                this.keyGroupCompressionDecorator,
                this.localRecoveryConfig,
                this.keyGroupRange,
                this.keySerializerProvider,
                this.numberOfKeyGroups);
    }
}

