/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.planner.plan.nodes.exec.serde;

import java.io.IOException;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import javax.annotation.Nullable;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeConfig;
import org.apache.flink.table.planner.plan.nodes.exec.StateMetadata;
import org.apache.flink.table.planner.plan.nodes.exec.serde.JsonSerdeTestUtil;
import org.assertj.core.api.AbstractThrowableAssert;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.ObjectAssert;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.parallel.Execution;
import org.junit.jupiter.api.parallel.ExecutionMode;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.CsvSource;
import org.junit.jupiter.params.provider.MethodSource;

/**
 * Parameterized tests for {@link StateMetadata}.
 *
 * <p>Covers the JSON round trip, rejection of malformed JSON, the default metadata produced by
 * {@code getOneInputOperatorDefaultMeta} / {@code getMultiInputOperatorDefaultMeta}, and the
 * TTL values returned by {@code getStateTtlForOneInputOperator} /
 * {@code getStateTtlForMultiInputOperator}.
 *
 * <p>Every test starts from the {@link TableConfig} assigned in {@link #beforeEach()}; argument
 * providers supply a modifier that is applied to that config inside the test body.
 */
@Execution(value = ExecutionMode.CONCURRENT)
public class StateMetadataTest {

    /** Config under test; re-assigned from {@link TableConfig#getDefault()} before each test. */
    private TableConfig tableConfig;

    @BeforeEach
    public void beforeEach() {
        this.tableConfig = TableConfig.getDefault();
    }

    /**
     * Builds a {@link StateMetadata} from the CSV row and runs it through
     * {@link JsonSerdeTestUtil#testJsonRoundTrip}.
     *
     * @param index state index from the CSV row
     * @param ttl TTL in its string form (e.g. {@code "600000ms"}, {@code "10minute"})
     * @param name state name from the CSV row
     * @throws IOException propagated from the serde utility
     */
    @CsvSource(value = {"0,0hour,fooState", "1,600000ms,barState", "2,10minute,meowState"})
    @ParameterizedTest
    public void testStateMetadataSerde(int index, String ttl, String name) throws IOException {
        JsonSerdeTestUtil.testJsonRoundTrip(
                new StateMetadata(index, ttl, name), StateMetadata.class);
    }

    /**
     * Asserts that deserializing the given JSON into {@link StateMetadata} throws, and that the
     * failure message contains the expected fragment.
     *
     * @param malformedJson JSON text (left of the {@code |} delimiter in the CSV row)
     * @param expectedMsg fragment the thrown message must contain (right of the delimiter)
     */
    @CsvSource(
            value = {
                "{\"index\":0,\"name\":\"fooState\"}|state ttl should not be null",
                "{\"index\":-1,\"ttl\":\"3600000ms\",\"name\":\"barState\"}|state index should start from 0",
                "{\"ttl\":\"3600000ms\",\"index\":1}|state name should not be null"
            },
            delimiterString = "|")
    @ParameterizedTest
    public void testDeserializeFromMalformedJson(String malformedJson, String expectedMsg) {
        Assertions.assertThatThrownBy(
                        () ->
                                JsonSerdeTestUtil.toObject(
                                        JsonSerdeTestUtil.configuredSerdeContext(),
                                        malformedJson,
                                        StateMetadata.class))
                .hasMessageContaining(expectedMsg);
    }

    /**
     * Asserts that the one-input default metadata is a single entry with index 0, the expected
     * TTL and the given state name.
     *
     * @param configModifier applied to the table config before the call under test
     * @param stateTtlFromHint TTL hint in milliseconds passed to the call under test, may be
     *     {@code null}
     * @param expectedStateName state name passed in and expected back
     * @param expectedTtlMillis TTL (milliseconds) the resulting metadata must carry
     */
    @MethodSource(value = {"provideConfigForOneInput"})
    @ParameterizedTest
    public void testGetOneInputOperatorDefaultMeta(
            Consumer<TableConfig> configModifier,
            @Nullable Long stateTtlFromHint,
            String expectedStateName,
            long expectedTtlMillis) {
        configModifier.accept(this.tableConfig);

        List<StateMetadata> stateMetadataList =
                StateMetadata.getOneInputOperatorDefaultMeta(
                        stateTtlFromHint, this.tableConfig, expectedStateName);

        Assertions.assertThat(stateMetadataList).hasSize(1);
        Assertions.assertThat(stateMetadataList.get(0))
                .isEqualTo(
                        new StateMetadata(
                                0, Duration.ofMillis(expectedTtlMillis), expectedStateName));
    }

    /**
     * Asserts that the multi-input default metadata has one entry per state name and that entry
     * {@code i} carries index {@code i}, the i-th expected TTL and the i-th state name.
     *
     * @param configModifier applied to the table config before the call under test
     * @param stateTtlFromHint map passed as the hint argument of the call under test
     * @param expectedStateNameList state names passed in and expected back, in order
     * @param expectedTtlMillisList expected TTLs (milliseconds), positionally matching the names
     */
    @MethodSource(value = {"provideConfigForMultiInput"})
    @ParameterizedTest
    public void testGetMultiInputOperatorDefaultMeta(
            Consumer<TableConfig> configModifier,
            Map<Integer, Long> stateTtlFromHint,
            List<String> expectedStateNameList,
            List<Long> expectedTtlMillisList) {
        configModifier.accept(this.tableConfig);

        List<StateMetadata> stateMetadataList =
                StateMetadata.getMultiInputOperatorDefaultMeta(
                        stateTtlFromHint,
                        this.tableConfig,
                        expectedStateNameList.toArray(new String[0]));

        Assertions.assertThat(stateMetadataList).hasSameSizeAs(expectedStateNameList);
        for (int i = 0; i < stateMetadataList.size(); i++) {
            Assertions.assertThat(stateMetadataList.get(i))
                    .isEqualTo(
                            new StateMetadata(
                                    i,
                                    Duration.ofMillis(expectedTtlMillisList.get(i)),
                                    expectedStateNameList.get(i)));
        }
    }

    /**
     * Asserts the TTL returned by {@code getStateTtlForOneInputOperator} for the node config
     * produced by {@code configModifier} and the given metadata list.
     *
     * @param configModifier turns the table config into the {@link ExecNodeConfig} under test
     * @param stateMetadataList metadata passed to the call under test, may be {@code null}
     * @param expectedStateTtl expected return value
     */
    @MethodSource(value = {"provideStateMetaForOneInput"})
    @ParameterizedTest
    public void testGetStateTtlForOneInputOperator(
            Function<TableConfig, ExecNodeConfig> configModifier,
            @Nullable List<StateMetadata> stateMetadataList,
            long expectedStateTtl) {
        ExecNodeConfig nodeConfig = configModifier.apply(this.tableConfig);

        Assertions.assertThat(
                        StateMetadata.getStateTtlForOneInputOperator(
                                nodeConfig, stateMetadataList))
                .isEqualTo(expectedStateTtl);
    }

    /**
     * Asserts the TTL list returned by {@code getStateTtlForMultiInputOperator}; the number of
     * inputs passed to the call is the size of {@code expectedStateTtlList}.
     *
     * @param configModifier turns the table config into the {@link ExecNodeConfig} under test
     * @param stateMetadataList metadata passed to the call under test, may be {@code null}
     * @param expectedStateTtlList expected return value, compared element by element in order
     */
    @MethodSource(value = {"provideStateMetaForMultiInput"})
    @ParameterizedTest
    public void testGetStateTtlForMultiInputOperator(
            Function<TableConfig, ExecNodeConfig> configModifier,
            @Nullable List<StateMetadata> stateMetadataList,
            List<Long> expectedStateTtlList) {
        ExecNodeConfig nodeConfig = configModifier.apply(this.tableConfig);

        Assertions.assertThat(
                        StateMetadata.getStateTtlForMultiInputOperator(
                                nodeConfig, expectedStateTtlList.size(), stateMetadataList))
                .containsExactlyElementsOf(expectedStateTtlList);
    }

    /**
     * Asserts that {@code getStateTtlForMultiInputOperator} throws an
     * {@link IllegalArgumentException} containing the expected message for the given metadata.
     *
     * @param expectedInputNumOfOperator input count passed to the call under test
     * @param malformedStateMetadataList metadata expected to be rejected
     * @param expectedMessage fragment the exception message must contain
     */
    @MethodSource(value = {"provideMalformedStateMeta"})
    @ParameterizedTest
    public void testGetStateTtlFromInvalidStateMeta(
            int expectedInputNumOfOperator,
            List<StateMetadata> malformedStateMetadataList,
            String expectedMessage) {
        Assertions.assertThatThrownBy(
                        () ->
                                StateMetadata.getStateTtlForMultiInputOperator(
                                        ExecNodeConfig.ofTableConfig(this.tableConfig, true),
                                        expectedInputNumOfOperator,
                                        malformedStateMetadataList))
                .isInstanceOf(IllegalArgumentException.class)
                .hasMessageContaining(expectedMessage);
    }

    /** Arguments: config modifier, TTL hint (nullable), state name, expected TTL in millis. */
    public static Stream<Arguments> provideConfigForOneInput() {
        return Stream.of(
                Arguments.of(unchanged(), null, "fooState", 0L),
                Arguments.of(
                        withIdleStateRetention(Duration.ofMinutes(10L)),
                        null,
                        "barState",
                        600000L),
                Arguments.of(unchanged(), 100L, "bazState", 100L),
                Arguments.of(
                        withIdleStateRetention(Duration.ofMinutes(10L)),
                        200L,
                        "quxState",
                        200L));
    }

    /** Arguments: config modifier, TTL hint map, state names, expected TTLs in millis. */
    public static Stream<Arguments> provideConfigForMultiInput() {
        return Stream.of(
                Arguments.of(
                        unchanged(),
                        Collections.emptyMap(),
                        Arrays.asList("fooState", "barState"),
                        repeated(0L, 2L)),
                Arguments.of(
                        withIdleStateRetention(Duration.ofDays(1L)),
                        Collections.emptyMap(),
                        Arrays.asList("firstState", "secondState", "thirdState", "fourthState"),
                        repeated(86400000L, 4L)),
                Arguments.of(
                        withIdleStateRetention(Duration.ofDays(1L)),
                        Collections.singletonMap(0, 18000000L),
                        Arrays.asList("meowState", "purrState"),
                        Arrays.asList(18000000L, 86400000L)),
                Arguments.of(
                        unchanged(),
                        Collections.singletonMap(1, 172800000L),
                        Arrays.asList("leftState", "rightState"),
                        Arrays.asList(0L, 172800000L)));
    }

    /** Arguments: node-config factory, state metadata list (nullable), expected TTL. */
    public static Stream<Arguments> provideStateMetaForOneInput() {
        return Stream.of(
                Arguments.of(toNodeConfig(), null, 0L),
                Arguments.of(
                        toNodeConfig(Duration.ofDays(1L)), Collections.emptyList(), 86400000L),
                Arguments.of(
                        toNodeConfig(),
                        Collections.singletonList(
                                new StateMetadata(0, Duration.ofMillis(3600000L), "fooState")),
                        3600000L),
                Arguments.of(
                        toNodeConfig(Duration.ofDays(1L)),
                        Collections.singletonList(
                                new StateMetadata(0, Duration.ofMillis(172800000L), "barState")),
                        172800000L));
    }

    /** Arguments: node-config factory, state metadata list (nullable), expected TTL list. */
    public static Stream<Arguments> provideStateMetaForMultiInput() {
        return Stream.of(
                Arguments.of(toNodeConfig(), null, repeated(0L, 2L)),
                Arguments.of(
                        toNodeConfig(Duration.ofDays(1L)),
                        Collections.emptyList(),
                        repeated(86400000L, 3L)),
                Arguments.of(
                        toNodeConfig(),
                        Arrays.asList(
                                new StateMetadata(1, Duration.ofMillis(86400000L), "fooState"),
                                new StateMetadata(0, Duration.ofMillis(3600000L), "barState")),
                        Arrays.asList(3600000L, 86400000L)),
                Arguments.of(
                        toNodeConfig(Duration.ofMinutes(30L)),
                        Arrays.asList(
                                new StateMetadata(1, Duration.ofMillis(86400000L), "fooState"),
                                new StateMetadata(0, Duration.ofMillis(3600000L), "barState"),
                                new StateMetadata(2, Duration.ofMillis(3600000L), "meowState")),
                        Arrays.asList(3600000L, 86400000L, 3600000L)));
    }

    /** Arguments: operator input count, metadata list to reject, expected message fragment. */
    public static Stream<Arguments> provideMalformedStateMeta() {
        final String invalidIndexMessage =
                "The state index should not contain duplicates and start from 0 (inclusive) and monotonically increase to the input size (exclusive) of the operator.";
        return Stream.of(
                Arguments.of(
                        1,
                        Arrays.asList(
                                new StateMetadata(1, Duration.ofMillis(60000L), "fooState"),
                                new StateMetadata(3, Duration.ofMillis(3600000L), "barState")),
                        "Received 2 state meta for a OneInputStreamOperator."),
                Arguments.of(
                        2,
                        Collections.singletonList(
                                new StateMetadata(1, Duration.ofMillis(60000L), "fooState")),
                        "Received 1 state meta for a TwoInputStreamOperator."),
                Arguments.of(
                        3,
                        Collections.singletonList(
                                new StateMetadata(0, Duration.ofMillis(60000L), "fooState")),
                        "Received 1 state meta for a MultipleInputStreamOperator."),
                Arguments.of(
                        2,
                        Arrays.asList(
                                new StateMetadata(0, Duration.ofMillis(60000L), "fooState"),
                                new StateMetadata(0, Duration.ofMillis(3600000L), "barState")),
                        invalidIndexMessage),
                Arguments.of(
                        2,
                        Arrays.asList(
                                new StateMetadata(1, Duration.ofMillis(3600000L), "barState"),
                                new StateMetadata(3, Duration.ofMillis(3600000L), "barState")),
                        invalidIndexMessage));
    }

    // The helpers below give every lambda an explicit functional-interface target type; a bare
    // lambda placed directly among the Object varargs of Arguments.of(...) has none.

    /** Returns a modifier that leaves the table config untouched. */
    private static Consumer<TableConfig> unchanged() {
        return config -> {};
    }

    /** Returns a modifier that sets {@code IDLE_STATE_RETENTION} to {@code retention}. */
    private static Consumer<TableConfig> withIdleStateRetention(Duration retention) {
        return config -> config.set(ExecutionConfigOptions.IDLE_STATE_RETENTION, retention);
    }

    /** Returns a factory that wraps the table config via {@code ExecNodeConfig.ofTableConfig(config, true)}. */
    private static Function<TableConfig, ExecNodeConfig> toNodeConfig() {
        return config -> ExecNodeConfig.ofTableConfig(config, true);
    }

    /**
     * Returns a factory that first sets {@code IDLE_STATE_RETENTION} to {@code retention} and
     * then wraps the table config via {@code ExecNodeConfig.ofTableConfig(config, true)}.
     */
    private static Function<TableConfig, ExecNodeConfig> toNodeConfig(Duration retention) {
        return config -> {
            config.set(ExecutionConfigOptions.IDLE_STATE_RETENTION, retention);
            return ExecNodeConfig.ofTableConfig(config, true);
        };
    }

    /** Returns a list holding {@code value} repeated {@code times} times. */
    private static List<Long> repeated(long value, long times) {
        return Stream.generate(() -> value).limit(times).collect(Collectors.toList());
    }
}

