/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.planner.plan.nodes.exec;

import java.io.IOException;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.transformations.LegacySourceTransformation;
import org.apache.flink.streaming.api.transformations.WithBoundedness;
import org.apache.flink.table.api.CompiledPlan;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Expressions;
import org.apache.flink.table.api.PlanReference;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.TableDescriptor;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.api.internal.CompiledPlanUtils;
import org.apache.flink.table.expressions.Expression;
import org.apache.flink.table.planner.utils.JsonTestUtils;
import org.apache.flink.table.types.AbstractDataType;
import org.assertj.core.api.AbstractStringAssert;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.InstanceOfAssertFactories;
import org.assertj.core.api.ObjectAssert;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.parallel.Execution;
import org.junit.jupiter.api.parallel.ExecutionMode;

@Execution(value=ExecutionMode.CONCURRENT)
class TransformationsTest {
    TransformationsTest() {
    }

    @Test
    public void testLegacyBatchSource() {
        StreamTableEnvironment env = StreamTableEnvironment.create((StreamExecutionEnvironment)StreamExecutionEnvironment.getExecutionEnvironment(), (EnvironmentSettings)EnvironmentSettings.newInstance().inBatchMode().build());
        Table table = env.from(TableDescriptor.forConnector((String)"values").option("bounded", "true").schema(TransformationsTest.dummySchema()).build());
        LegacySourceTransformation<?> sourceTransform = TransformationsTest.toLegacySourceTransformation(env, table);
        TransformationsTest.assertBoundedness(Boundedness.BOUNDED, sourceTransform);
        Assertions.assertThat((boolean)sourceTransform.getOperator().emitsProgressiveWatermarks()).isFalse();
    }

    @Test
    public void testLegacyStreamSource() {
        StreamTableEnvironment env = StreamTableEnvironment.create((StreamExecutionEnvironment)StreamExecutionEnvironment.getExecutionEnvironment(), (EnvironmentSettings)EnvironmentSettings.newInstance().inStreamingMode().build());
        Table table = env.from(TableDescriptor.forConnector((String)"values").option("bounded", "false").schema(TransformationsTest.dummySchema()).build());
        LegacySourceTransformation<?> sourceTransform = TransformationsTest.toLegacySourceTransformation(env, table);
        TransformationsTest.assertBoundedness(Boundedness.CONTINUOUS_UNBOUNDED, sourceTransform);
        Assertions.assertThat((boolean)sourceTransform.getOperator().emitsProgressiveWatermarks()).isTrue();
    }

    @Test
    public void testLegacyBatchValues() {
        StreamTableEnvironment env = StreamTableEnvironment.create((StreamExecutionEnvironment)StreamExecutionEnvironment.getExecutionEnvironment(), (EnvironmentSettings)EnvironmentSettings.newInstance().inBatchMode().build());
        Table table = env.fromValues(new Object[]{1, 2, 3});
        LegacySourceTransformation<?> sourceTransform = TransformationsTest.toLegacySourceTransformation(env, table);
        TransformationsTest.assertBoundedness(Boundedness.BOUNDED, sourceTransform);
    }

    @Test
    public void testUidGeneration() {
        TransformationsTest.checkUids(c -> c.set(ExecutionConfigOptions.TABLE_EXEC_UID_GENERATION, (Object)ExecutionConfigOptions.UidGeneration.PLAN_ONLY), true, false);
        TransformationsTest.checkUids(c -> c.set(ExecutionConfigOptions.TABLE_EXEC_UID_GENERATION, (Object)ExecutionConfigOptions.UidGeneration.ALWAYS), true, true);
        TransformationsTest.checkUids(c -> c.set(ExecutionConfigOptions.TABLE_EXEC_UID_GENERATION, (Object)ExecutionConfigOptions.UidGeneration.DISABLED), false, false);
        TransformationsTest.checkUids(c -> {
            c.set(ExecutionConfigOptions.TABLE_EXEC_UID_GENERATION, (Object)ExecutionConfigOptions.UidGeneration.PLAN_ONLY);
            c.set(ExecutionConfigOptions.TABLE_EXEC_LEGACY_TRANSFORMATION_UIDS, (Object)true);
        }, false, false);
    }

    private static void checkUids(Consumer<TableConfig> config, boolean expectUidWithCompilation, boolean expectUidWithoutCompilation) {
        StreamTableEnvironment env = StreamTableEnvironment.create((StreamExecutionEnvironment)StreamExecutionEnvironment.getExecutionEnvironment(), (EnvironmentSettings)EnvironmentSettings.newInstance().inStreamingMode().build());
        config.accept(env.getConfig());
        env.createTemporaryTable("source_table", TableDescriptor.forConnector((String)"values").option("bounded", "true").schema(TransformationsTest.dummySchema()).build());
        env.createTemporaryTable("sink_table", TableDescriptor.forConnector((String)"values").schema(TransformationsTest.dummySchema()).build());
        Table table = env.from("source_table").select(new Expression[]{(Expression)Expressions.$((String)"i").abs()});
        CompiledPlan memoryPlan = table.insertInto("sink_table").compilePlan();
        List memoryUids = CompiledPlanUtils.toTransformations((TableEnvironment)env, memoryPlan).get(0).getTransitivePredecessors().stream().map(Transformation::getUid).collect(Collectors.toList());
        Assertions.assertThat(memoryUids).hasSize(3);
        if (expectUidWithCompilation) {
            Assertions.assertThat(memoryUids).allSatisfy(u -> {
                AbstractStringAssert cfr_ignored_0 = (AbstractStringAssert)Assertions.assertThat((String)u).isNotNull();
            });
        } else {
            Assertions.assertThat(memoryUids).allSatisfy(u -> Assertions.assertThat((String)u).isNull());
        }
        String jsonPlan = table.insertInto("sink_table").compilePlan().asJsonString();
        List jsonUids = CompiledPlanUtils.toTransformations((TableEnvironment)env, env.loadPlan(PlanReference.fromJsonString((String)jsonPlan))).get(0).getTransitivePredecessors().stream().map(Transformation::getUid).collect(Collectors.toList());
        Assertions.assertThat(jsonUids).hasSize(3);
        if (expectUidWithCompilation) {
            Assertions.assertThat(jsonUids).allSatisfy(u -> {
                AbstractStringAssert cfr_ignored_0 = (AbstractStringAssert)Assertions.assertThat((String)u).isNotNull();
            });
        } else {
            Assertions.assertThat(jsonUids).allSatisfy(u -> Assertions.assertThat((String)u).isNull());
        }
        List inlineUids = env.toChangelogStream(table).getTransformation().getTransitivePredecessors().stream().map(Transformation::getUid).collect(Collectors.toList());
        Assertions.assertThat(inlineUids).hasSize(3);
        if (expectUidWithoutCompilation) {
            Assertions.assertThat(inlineUids).allSatisfy(u -> {
                AbstractStringAssert cfr_ignored_0 = (AbstractStringAssert)Assertions.assertThat((String)u).isNotNull();
            });
        } else {
            Assertions.assertThat(inlineUids).allSatisfy(u -> Assertions.assertThat((String)u).isNull());
        }
    }

    @Test
    public void testUidDefaults() throws IOException {
        TransformationsTest.checkUidModification(config -> {}, json -> {}, env -> TransformationsTest.planFromCurrentFlinkVersion(env).asJsonString(), "\\d+_sink", "\\d+_constraint-validator", "\\d+_values");
    }

    @Test
    public void testUidFlink1_15() throws IOException {
        TransformationsTest.checkUidModification(config -> config.set(ExecutionConfigOptions.TABLE_EXEC_UID_FORMAT, (Object)"<id>_<type>_<version>_<transformation>"), json -> {}, env -> TransformationsTest.planFromFlink1_15(env).asJsonString(), "\\d+_stream-exec-sink_1_sink", "\\d+_stream-exec-sink_1_constraint-validator", "\\d+_stream-exec-values_1_values");
    }

    @Test
    public void testUidFlink1_18() throws IOException {
        TransformationsTest.checkUidModification(config -> config.set(ExecutionConfigOptions.TABLE_EXEC_UID_FORMAT, (Object)"<id>_<type>_<version>_<transformation>"), json -> {}, env -> TransformationsTest.planFromCurrentFlinkVersion(env).asJsonString(), "\\d+_stream-exec-sink_1_sink", "\\d+_stream-exec-sink_1_constraint-validator", "\\d+_stream-exec-values_1_values");
    }

    @Test
    public void testPerNodeCustomUid() throws IOException {
        TransformationsTest.checkUidModification(config -> {}, json -> JsonTestUtils.setExecNodeConfig(json, "stream-exec-sink_1", ExecutionConfigOptions.TABLE_EXEC_UID_FORMAT.key(), "my_custom_<transformation>_<id>"), env -> TransformationsTest.planFromCurrentFlinkVersion(env).asJsonString(), "my_custom_sink_\\d+", "my_custom_constraint-validator_\\d+", "\\d+_values");
    }

    private static void checkUidModification(Consumer<TableConfig> configModifier, Consumer<JsonNode> jsonModifier, Function<TableEnvironment, String> planGenerator, String ... expectedUidPatterns) throws IOException {
        TableEnvironment env = TableEnvironment.create((EnvironmentSettings)EnvironmentSettings.inStreamingMode());
        configModifier.accept(env.getConfig());
        JsonNode json = JsonTestUtils.readFromString(planGenerator.apply(env));
        jsonModifier.accept(json);
        List planUids = CompiledPlanUtils.toTransformations(env, env.loadPlan(PlanReference.fromJsonString((String)json.toString()))).get(0).getTransitivePredecessors().stream().map(Transformation::getUid).collect(Collectors.toList());
        Assertions.assertThat(planUids).hasSize(expectedUidPatterns.length);
        IntStream.range(0, expectedUidPatterns.length).forEach(i -> {
            AbstractStringAssert cfr_ignored_0 = (AbstractStringAssert)Assertions.assertThat((String)((String)planUids.get(i))).matches((CharSequence)expectedUidPatterns[i]);
        });
    }

    private static CompiledPlan planFromCurrentFlinkVersion(TableEnvironment env) {
        return env.fromValues(new Object[]{1, 2, 3}).insertInto(TableDescriptor.forConnector((String)"blackhole").build()).compilePlan();
    }

    private static CompiledPlan planFromFlink1_15(TableEnvironment env) {
        return env.loadPlan(PlanReference.fromResource((String)"/jsonplan/testUidFlink1_15.out"));
    }

    private static LegacySourceTransformation<?> toLegacySourceTransformation(StreamTableEnvironment env, Table table) {
        Transformation transform = env.toChangelogStream(table).getTransformation();
        while (transform.getInputs().size() == 1) {
            transform = (Transformation)transform.getInputs().get(0);
        }
        Assertions.assertThat((Object)transform).isInstanceOf(LegacySourceTransformation.class);
        return (LegacySourceTransformation)transform;
    }

    private static void assertBoundedness(Boundedness boundedness, Transformation<?> transform) {
        ((ObjectAssert)Assertions.assertThat(transform).asInstanceOf(InstanceOfAssertFactories.type(WithBoundedness.class))).extracting(WithBoundedness::getBoundedness).isEqualTo((Object)boundedness);
    }

    private static Schema dummySchema() {
        return Schema.newBuilder().column("i", (AbstractDataType)DataTypes.INT()).build();
    }
}

