/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.state;

import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.state.FoldingState;
import org.apache.flink.api.common.state.FoldingStateDescriptor;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.MergingState;
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.state.State;
import org.apache.flink.api.common.state.StateBackend;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
import org.apache.flink.runtime.state.KeyGroupsStateHandle;
import org.apache.flink.runtime.state.KeyedStateBackend;
import org.apache.flink.runtime.state.KvState;
import org.apache.flink.runtime.state.Snapshotable;
import org.apache.flink.util.Preconditions;

/**
 * Base implementation of a {@link KeyedStateBackend} that keeps track of the current key and its
 * key group, caches the partitioned states that were created by name, and delegates the actual
 * creation of state objects to subclasses via the {@code create*State} methods.
 *
 * @param <K> type of the key by which state is partitioned
 */
public abstract class AbstractKeyedStateBackend<K>
        implements KeyedStateBackend<K>, Snapshotable<KeyGroupsStateHandle>, Closeable {

    /** Serializer for the key; checked to be non-null in the constructor. */
    protected final TypeSerializer<K> keySerializer;

    /** The key most recently passed to {@link #setCurrentKey(Object)}. */
    protected K currentKey;

    /** Key group computed for {@link #currentKey} in {@link #setCurrentKey(Object)}. */
    private int currentKeyGroup;

    /** States created so far, indexed by descriptor name. Lazily created, reset to null on dispose. */
    protected HashMap<String, KvState<?>> keyValueStatesByName;

    /** Name of the state returned by the last {@link #getPartitionedState} call (one-entry cache). */
    private String lastName;

    /** State returned by the last {@link #getPartitionedState} call (one-entry cache). */
    @SuppressWarnings("rawtypes")
    private KvState lastState;

    /** Number of key groups used when assigning a key to a key group. */
    protected final int numberOfKeyGroups;

    /** Key group range passed to the KvState registry when registering queryable state. */
    protected final KeyGroupRange keyGroupRange;

    /** Registry for queryable state; not null-checked in the constructor, so it may be null. */
    protected final TaskKvStateRegistry kvStateRegistry;

    /** Registry created in the constructor and closed by {@link #close()}. */
    protected CloseableRegistry cancelStreamRegistry;

    /** User code class loader; checked to be non-null in the constructor. */
    protected final ClassLoader userCodeClassLoader;

    /**
     * Creates the backend.
     *
     * @param kvStateRegistry registry used for queryable state, may be null
     * @param keySerializer serializer for the key, must not be null
     * @param userCodeClassLoader class loader for user code, must not be null
     * @param numberOfKeyGroups number of key groups used for key group assignment
     * @param keyGroupRange key group range of this backend, must not be null
     */
    public AbstractKeyedStateBackend(TaskKvStateRegistry kvStateRegistry, TypeSerializer<K> keySerializer, ClassLoader userCodeClassLoader, int numberOfKeyGroups, KeyGroupRange keyGroupRange) {
        this.kvStateRegistry = kvStateRegistry;
        this.keySerializer = Preconditions.checkNotNull(keySerializer);
        // A primitive int can never be null, so no null check is needed here.
        this.numberOfKeyGroups = numberOfKeyGroups;
        this.userCodeClassLoader = Preconditions.checkNotNull(userCodeClassLoader);
        this.keyGroupRange = Preconditions.checkNotNull(keyGroupRange);
        this.cancelStreamRegistry = new CloseableRegistry();
    }

    /**
     * Unregisters all states from the KvState registry (if one was supplied) and drops the
     * references to the cached states.
     */
    @Override
    public void dispose() {
        if (this.kvStateRegistry != null) {
            this.kvStateRegistry.unregisterAll();
        }
        this.lastName = null;
        this.lastState = null;
        this.keyValueStatesByName = null;
    }

    /**
     * Creates a {@link ValueState} for the given descriptor.
     *
     * @param namespaceSerializer serializer for the state namespace
     * @param stateDesc descriptor of the state to create
     * @param <N> type of the namespace
     * @param <T> type of the value held by the state
     * @return the created state
     * @throws Exception if the state cannot be created
     */
    protected abstract <N, T> ValueState<T> createValueState(TypeSerializer<N> namespaceSerializer, ValueStateDescriptor<T> stateDesc) throws Exception;

    /**
     * Creates a {@link ListState} for the given descriptor.
     *
     * @param namespaceSerializer serializer for the state namespace
     * @param stateDesc descriptor of the state to create
     * @param <N> type of the namespace
     * @param <T> type of the list elements
     * @return the created state
     * @throws Exception if the state cannot be created
     */
    protected abstract <N, T> ListState<T> createListState(TypeSerializer<N> namespaceSerializer, ListStateDescriptor<T> stateDesc) throws Exception;

    /**
     * Creates a {@link ReducingState} for the given descriptor.
     *
     * @param namespaceSerializer serializer for the state namespace
     * @param stateDesc descriptor of the state to create
     * @param <N> type of the namespace
     * @param <T> type of the values held by the state
     * @return the created state
     * @throws Exception if the state cannot be created
     */
    protected abstract <N, T> ReducingState<T> createReducingState(TypeSerializer<N> namespaceSerializer, ReducingStateDescriptor<T> stateDesc) throws Exception;

    /**
     * Creates a {@link FoldingState} for the given descriptor.
     *
     * @param namespaceSerializer serializer for the state namespace
     * @param stateDesc descriptor of the state to create
     * @param <N> type of the namespace
     * @param <T> type of the values added to the state
     * @param <ACC> type of the accumulated value
     * @return the created state
     * @throws Exception if the state cannot be created
     */
    protected abstract <N, T, ACC> FoldingState<T, ACC> createFoldingState(TypeSerializer<N> namespaceSerializer, FoldingStateDescriptor<T, ACC> stateDesc) throws Exception;

    /**
     * Sets the current key and recomputes the key group it is assigned to.
     *
     * @param newKey the new current key
     */
    @Override
    public void setCurrentKey(K newKey) {
        this.currentKey = newKey;
        this.currentKeyGroup = KeyGroupRangeAssignment.assignToKeyGroup(newKey, this.numberOfKeyGroups);
    }

    /** @return the serializer for the key */
    @Override
    public TypeSerializer<K> getKeySerializer() {
        return this.keySerializer;
    }

    /** @return the key most recently set via {@link #setCurrentKey(Object)} */
    @Override
    public K getCurrentKey() {
        return this.currentKey;
    }

    /** @return the key group computed for the current key */
    @Override
    public int getCurrentKeyGroupIndex() {
        return this.currentKeyGroup;
    }

    /** @return the number of key groups this backend was created with */
    @Override
    public int getNumberOfKeyGroups() {
        return this.numberOfKeyGroups;
    }

    /** @return the key group range this backend was created with */
    @Override
    public KeyGroupRange getKeyGroupRange() {
        return this.keyGroupRange;
    }

    /**
     * Returns the state for the given descriptor with its current namespace set to
     * {@code namespace}. States are looked up by descriptor name: first in a one-entry cache
     * holding the most recently returned state, then in the map of all created states. Only if
     * neither contains the name is a new state created through the {@code create*State} methods.
     * A newly created state whose descriptor is queryable is registered with the KvState
     * registry.
     *
     * @param namespace namespace to set on the returned state, must not be null
     * @param namespaceSerializer serializer for the namespace, must not be null
     * @param stateDescriptor descriptor identifying the state
     * @param <N> type of the namespace
     * @param <S> type of the state
     * @return the state for the descriptor, scoped to {@code namespace}
     * @throws IllegalStateException if a new queryable state is requested but no KvState
     *     registry was supplied
     * @throws Exception if the state cannot be created
     */
    @Override
    @SuppressWarnings({"rawtypes", "unchecked"})
    public <N, S extends State> S getPartitionedState(N namespace, final TypeSerializer<N> namespaceSerializer, StateDescriptor<S, ?> stateDescriptor) throws Exception {
        Preconditions.checkNotNull(namespace, "Namespace");
        Preconditions.checkNotNull(namespaceSerializer, "Namespace serializer");

        // keySerializer is final and null-checked in the constructor, so it needs no check here.

        if (!stateDescriptor.isSerializerInitialized()) {
            stateDescriptor.initializeSerializerUnlessSet(new ExecutionConfig());
        }
        if (this.keyValueStatesByName == null) {
            this.keyValueStatesByName = new HashMap<>();
        }

        final String stateName = stateDescriptor.getName();

        // Fast path: same state as in the previous call.
        if (this.lastName != null && this.lastName.equals(stateName)) {
            this.lastState.setCurrentNamespace(namespace);
            return (S) (State) this.lastState;
        }

        // State was created earlier; make it the most recently used one.
        final KvState<?> previous = this.keyValueStatesByName.get(stateName);
        if (previous != null) {
            this.lastState = previous;
            this.lastState.setCurrentNamespace(namespace);
            this.lastName = stateName;
            return (S) (State) previous;
        }

        // Fail before creating or caching anything. Otherwise a later call with the same
        // descriptor would silently return a queryable state that was never registered.
        final boolean queryable = stateDescriptor.isQueryable();
        if (queryable && this.kvStateRegistry == null) {
            throw new IllegalStateException("State backend has not been initialized for job.");
        }

        // Adapter that supplies the namespace serializer to the subclass factory methods.
        final S state = stateDescriptor.bind(new StateBackend() {

            @Override
            public <T> ValueState<T> createValueState(ValueStateDescriptor<T> stateDesc) throws Exception {
                return AbstractKeyedStateBackend.this.createValueState(namespaceSerializer, stateDesc);
            }

            @Override
            public <T> ListState<T> createListState(ListStateDescriptor<T> stateDesc) throws Exception {
                return AbstractKeyedStateBackend.this.createListState(namespaceSerializer, stateDesc);
            }

            @Override
            public <T> ReducingState<T> createReducingState(ReducingStateDescriptor<T> stateDesc) throws Exception {
                return AbstractKeyedStateBackend.this.createReducingState(namespaceSerializer, stateDesc);
            }

            @Override
            public <T, ACC> FoldingState<T, ACC> createFoldingState(FoldingStateDescriptor<T, ACC> stateDesc) throws Exception {
                return AbstractKeyedStateBackend.this.createFoldingState(namespaceSerializer, stateDesc);
            }
        });

        final KvState kvState = (KvState) state;
        this.keyValueStatesByName.put(stateName, kvState);
        this.lastName = stateName;
        this.lastState = kvState;
        kvState.setCurrentNamespace(namespace);

        if (queryable) {
            this.kvStateRegistry.registerKvState(this.keyGroupRange, stateDescriptor.getQueryableStateName(), kvState);
        }
        return state;
    }

    /**
     * Merges the state of all {@code sources} namespaces into the {@code target} namespace.
     * Each source namespace is read and then cleared; the combined result is added to the state
     * under the target namespace. Only {@link ReducingStateDescriptor} and
     * {@link ListStateDescriptor} are supported.
     *
     * @param target namespace that receives the merged result
     * @param sources namespaces whose contents are merged and cleared
     * @param namespaceSerializer serializer for the namespace
     * @param stateDescriptor descriptor identifying the state to merge
     * @param <N> type of the namespace
     * @param <S> type of the state
     * @throws RuntimeException if the descriptor is neither a reducing nor a list descriptor
     * @throws Exception if accessing the state or applying the reduce function fails
     */
    @Override
    @SuppressWarnings({"rawtypes", "unchecked"})
    public <N, S extends MergingState<?, ?>> void mergePartitionedStates(N target, Collection<N> sources, TypeSerializer<N> namespaceSerializer, StateDescriptor<S, ?> stateDescriptor) throws Exception {
        if (stateDescriptor instanceof ReducingStateDescriptor) {
            ReducingStateDescriptor reducingStateDescriptor = (ReducingStateDescriptor) stateDescriptor;
            ReduceFunction reduceFn = reducingStateDescriptor.getReduceFunction();
            ReducingState state = (ReducingState) this.getPartitionedState(target, namespaceSerializer, stateDescriptor);
            KvState kvState = (KvState) state;

            Object result = null;
            for (N source : sources) {
                kvState.setCurrentNamespace(source);
                Object sourceValue = state.get();
                if (result == null) {
                    // Reuse the value already read instead of querying the state a second time.
                    result = sourceValue;
                } else if (sourceValue != null) {
                    result = reduceFn.reduce(result, sourceValue);
                }
                state.clear();
            }

            kvState.setCurrentNamespace(target);
            if (result != null) {
                state.add(result);
            }
        } else if (stateDescriptor instanceof ListStateDescriptor) {
            ListState state = (ListState) this.getPartitionedState(target, namespaceSerializer, stateDescriptor);
            KvState kvState = (KvState) state;

            // Collect all source elements first, then append them under the target namespace.
            ArrayList<Object> result = new ArrayList<>();
            for (N source : sources) {
                kvState.setCurrentNamespace(source);
                Iterable sourceValue = (Iterable) state.get();
                if (sourceValue != null) {
                    for (Object o : sourceValue) {
                        result.add(o);
                    }
                }
                state.clear();
            }

            kvState.setCurrentNamespace(target);
            for (Object o : result) {
                state.add(o);
            }
        } else {
            throw new RuntimeException("Cannot merge states for " + stateDescriptor);
        }
    }

    /**
     * Closes the {@link #cancelStreamRegistry}.
     *
     * @throws IOException if closing the registry fails
     */
    @Override
    public void close() throws IOException {
        this.cancelStreamRegistry.close();
    }
}

