/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.planner.runtime.stream.table;

import java.io.Serializable;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.functions.AsyncScalarFunction;
import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.UserDefinedFunction;
import org.apache.flink.table.planner.runtime.utils.StreamingTestBase;
import org.apache.flink.types.Row;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

public class AsyncCalcITCase
extends StreamingTestBase {
    private TableEnvironment tEnv;

    @Override
    @BeforeEach
    public void before() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);
        this.tEnv = StreamTableEnvironment.create((StreamExecutionEnvironment)env, (EnvironmentSettings)EnvironmentSettings.inStreamingMode());
        this.tEnv.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_ASYNC_SCALAR_BUFFER_CAPACITY, (Object)2);
        this.tEnv.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_ASYNC_SCALAR_TIMEOUT, (Object)Duration.ofMinutes(1L));
    }

    @Test
    public void testSimpleTableSelect() {
        Table t1 = this.tEnv.fromValues(new Object[]{1, 2, 3}).as("f1", new String[0]);
        this.tEnv.createTemporaryView("t1", t1);
        this.tEnv.createTemporarySystemFunction("func", (UserDefinedFunction)new AsyncFunc());
        List<Row> results = this.executeSql("select func(f1) from t1");
        List<Row> expectedRows = Arrays.asList(Row.of((Object[])new Object[]{"val 1"}), Row.of((Object[])new Object[]{"val 2"}), Row.of((Object[])new Object[]{"val 3"}));
        Assertions.assertThat(results).containsSequence(expectedRows);
    }

    @Test
    public void testLiteralPlusTableSelect() {
        Table t1 = this.tEnv.fromValues(new Object[]{1, 2, 3}).as("f1", new String[0]);
        this.tEnv.createTemporaryView("t1", t1);
        this.tEnv.createTemporarySystemFunction("func", (UserDefinedFunction)new AsyncFunc());
        List<Row> results = this.executeSql("select 'foo', func(f1) from t1");
        List<Row> expectedRows = Arrays.asList(Row.of((Object[])new Object[]{"foo", "val 1"}), Row.of((Object[])new Object[]{"foo", "val 2"}), Row.of((Object[])new Object[]{"foo", "val 3"}));
        Assertions.assertThat(results).containsSequence(expectedRows);
    }

    @Test
    public void testFieldPlusTableSelect() {
        Table t1 = this.tEnv.fromValues(new Object[]{1, 2, 3}).as("f1", new String[0]);
        this.tEnv.createTemporaryView("t1", t1);
        this.tEnv.createTemporarySystemFunction("func", (UserDefinedFunction)new AsyncFunc());
        List<Row> results = this.executeSql("select f1, func(f1) from t1");
        List<Row> expectedRows = Arrays.asList(Row.of((Object[])new Object[]{1, "val 1"}), Row.of((Object[])new Object[]{2, "val 2"}), Row.of((Object[])new Object[]{3, "val 3"}));
        Assertions.assertThat(results).containsSequence(expectedRows);
    }

    @Test
    public void testTwoCalls() {
        Table t1 = this.tEnv.fromValues(new Object[]{1, 2, 3}).as("f1", new String[0]);
        this.tEnv.createTemporaryView("t1", t1);
        this.tEnv.createTemporarySystemFunction("func", (UserDefinedFunction)new AsyncFunc());
        List<Row> results = this.executeSql("select func(f1), func(f1) from t1");
        List<Row> expectedRows = Arrays.asList(Row.of((Object[])new Object[]{"val 1", "val 1"}), Row.of((Object[])new Object[]{"val 2", "val 2"}), Row.of((Object[])new Object[]{"val 3", "val 3"}));
        Assertions.assertThat(results).containsSequence(expectedRows);
    }

    @Test
    public void testThreeNestedCalls() {
        Table t1 = this.tEnv.fromValues(new Object[]{1, 2, 3}).as("f1", new String[0]);
        this.tEnv.createTemporaryView("t1", t1);
        this.tEnv.createTemporarySystemFunction("func", (UserDefinedFunction)new AsyncFuncAdd10());
        List<Row> results = this.executeSql("select func(func(f1)), func(func(func(f1))), func(f1) from t1");
        List<Row> expectedRows = Arrays.asList(Row.of((Object[])new Object[]{21, 31, 11}), Row.of((Object[])new Object[]{22, 32, 12}), Row.of((Object[])new Object[]{23, 33, 13}));
        Assertions.assertThat(results).containsSequence(expectedRows);
    }

    @Test
    public void testPassedToOtherUDF() {
        Table t1 = this.tEnv.fromValues(new Object[]{1, 2, 3}).as("f1", new String[0]);
        this.tEnv.createTemporaryView("t1", t1);
        this.tEnv.createTemporarySystemFunction("func", (UserDefinedFunction)new AsyncFunc());
        List<Row> results = this.executeSql("select Concat(func(f1), 'foo') from t1");
        List<Row> expectedRows = Arrays.asList(Row.of((Object[])new Object[]{"val 1foo"}), Row.of((Object[])new Object[]{"val 2foo"}), Row.of((Object[])new Object[]{"val 3foo"}));
        Assertions.assertThat(results).containsSequence(expectedRows);
    }

    @Test
    public void testJustCall() {
        Table t1 = this.tEnv.fromValues(new Object[]{1, 2, 3}).as("f1", new String[0]);
        this.tEnv.createTemporaryView("t1", t1);
        this.tEnv.createTemporarySystemFunction("func", (UserDefinedFunction)new AsyncFunc());
        List<Row> results = this.executeSql("select func(1)");
        List<Row> expectedRows = Collections.singletonList(Row.of((Object[])new Object[]{"val 1"}));
        Assertions.assertThat(results).containsSequence(expectedRows);
    }

    @Test
    public void testWhereConditionAndProjection() {
        Table t1 = this.tEnv.fromValues(new Object[]{1, 2, 3}).as("f1", new String[0]);
        this.tEnv.createTemporaryView("t1", t1);
        this.tEnv.createTemporarySystemFunction("func", (UserDefinedFunction)new AsyncFunc());
        List<Row> results = this.executeSql("select func(f1) from t1 where REGEXP(func(f1), 'val (2|3)')");
        List<Row> expectedRows = Arrays.asList(Row.of((Object[])new Object[]{"val 2"}), Row.of((Object[])new Object[]{"val 3"}));
        Assertions.assertThat(results).containsSequence(expectedRows);
    }

    @Test
    public void testFieldAccessAfter() {
        Table t1 = this.tEnv.fromValues(new Object[]{2}).as("f1", new String[0]);
        this.tEnv.createTemporaryView("t1", t1);
        this.tEnv.createTemporarySystemFunction("func", (UserDefinedFunction)new AsyncFuncRow());
        List<Row> results = this.executeSql("select func(f1).f0 from t1");
        List<Row> expectedRows = Collections.singletonList(Row.of((Object[])new Object[]{3}));
        Assertions.assertThat(results).containsSequence(expectedRows);
    }

    @Test
    public void testFieldOperand() {
        Table t1 = this.tEnv.fromValues(new Object[]{2}).as("f1", new String[0]);
        this.tEnv.createTemporaryView("t1", t1);
        this.tEnv.createTemporarySystemFunction("func", (UserDefinedFunction)new AsyncFuncRow());
        this.tEnv.createTemporarySystemFunction("func2", (UserDefinedFunction)new AsyncFuncAdd10());
        Table structs = this.tEnv.sqlQuery("select func(f1) from t1");
        this.tEnv.createTemporaryView("t2", structs);
        List<Row> results = this.executeSql("select func2(t2.f0) from t2");
        List<Row> expectedRows = Collections.singletonList(Row.of((Object[])new Object[]{13}));
        Assertions.assertThat(results).containsSequence(expectedRows);
    }

    @Test
    public void testOverload() {
        Table t1 = this.tEnv.fromValues(new Object[]{1}).as("f1", new String[0]);
        this.tEnv.createTemporaryView("t1", t1);
        this.tEnv.createTemporarySystemFunction("func", (UserDefinedFunction)new AsyncFuncOverload());
        List<Row> results = this.executeSql("select func(f1), func(cast(f1 as String)) from t1");
        List<Row> expectedRows = Collections.singletonList(Row.of((Object[])new Object[]{"int version 1", "string version 1"}));
        Assertions.assertThat(results).containsSequence(expectedRows);
    }

    @Test
    public void testMultiLayerGeneric() {
        Table t1 = this.tEnv.fromValues(new Object[]{1}).as("f1", new String[0]);
        this.tEnv.createTemporaryView("t1", t1);
        this.tEnv.createTemporarySystemFunction("func", (UserDefinedFunction)new LongAsyncFuncGeneric());
        List<Row> results = this.executeSql("select func(f1) from t1");
        List<Row> expectedRows = Collections.singletonList(Row.of((Object[])new Object[]{new Long[]{11L}}));
        Assertions.assertThat(results).containsSequence(expectedRows);
    }

    @Test
    public void testMultiLayerMoreGeneric() {
        Table t1 = this.tEnv.fromValues(new Object[]{1}).as("f1", new String[0]);
        this.tEnv.createTemporaryView("t1", t1);
        this.tEnv.createTemporarySystemFunction("func", (UserDefinedFunction)new LongAsyncFuncMoreGeneric());
        List<Row> results = this.executeSql("select func(f1) from t1");
        List<Row> expectedRows = Collections.singletonList(Row.of((Object[])new Object[]{new Long[]{11L}}));
        Assertions.assertThat(results).containsSequence(expectedRows);
    }

    @Test
    public void testFailures() {
        this.tEnv.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_ASYNC_SCALAR_BUFFER_CAPACITY, (Object)1);
        Table t1 = this.tEnv.fromValues(new Object[]{1}).as("f1", new String[0]);
        this.tEnv.createTemporaryView("t1", t1);
        AsyncFuncFail func = new AsyncFuncFail(2);
        this.tEnv.createTemporarySystemFunction("func", (UserDefinedFunction)func);
        List<Row> results = this.executeSql("select func(f1) from t1");
        List<Row> expectedRows = Collections.singletonList(Row.of((Object[])new Object[]{3}));
        Assertions.assertThat(results).containsSequence(expectedRows);
    }

    private List<Row> executeSql(String sql) {
        TableResult result = this.tEnv.executeSql(sql);
        ArrayList<Row> rows = new ArrayList<Row>();
        result.collect().forEachRemaining(rows::add);
        return rows;
    }

    public static class AsyncFuncBase
    extends AsyncScalarFunction {
        protected ScheduledExecutorService executor;

        public void open(FunctionContext context) {
            this.executor = Executors.newSingleThreadScheduledExecutor();
        }

        public void close() {
            if (null != this.executor && !this.executor.isShutdown()) {
                this.executor.shutdownNow();
            }
        }
    }

    public static class LongAsyncFuncMoreGeneric
    extends AsyncFuncMoreGeneric<CompletableFuture<Long[]>> {
        @Override
        void finish(CompletableFuture<Long[]> future, int param) {
            Long[] result = new Long[]{10L + (long)param};
            future.complete(result);
        }
    }

    public static abstract class AsyncFuncMoreGeneric<T>
    extends AsyncFuncBase {
        private static final long serialVersionUID = 3L;

        abstract void finish(T var1, int var2);

        public void eval(T future, Integer param) {
            this.executor.schedule(() -> this.finish(future, param), 10L, TimeUnit.MILLISECONDS);
        }
    }

    public static class LongAsyncFuncGeneric
    extends AsyncFuncGeneric<Long> {
        Long[] newT(int param) {
            Long[] result = new Long[]{10L + (long)param};
            return result;
        }
    }

    public static abstract class AsyncFuncGeneric<T>
    extends AsyncFuncBase {
        private static final long serialVersionUID = 3L;

        abstract T[] newT(int var1);

        public void eval(CompletableFuture<T[]> future, Integer param) {
            this.executor.schedule(() -> future.complete(this.newT(param)), 10L, TimeUnit.MILLISECONDS);
        }
    }

    public static class AsyncFuncFail
    extends AsyncFuncBase
    implements Serializable {
        private static final long serialVersionUID = 8996145425452974113L;
        private final int numFailures;
        private final AtomicInteger failures = new AtomicInteger(0);

        public AsyncFuncFail(int numFailures) {
            this.numFailures = numFailures;
        }

        public void eval(CompletableFuture<Integer> future, int ignoredA) {
            if (this.failures.getAndIncrement() < this.numFailures) {
                future.completeExceptionally(new RuntimeException("Error " + this.failures.get()));
                return;
            }
            future.complete(this.failures.get());
        }
    }

    public static class AsyncFuncRow
    extends AsyncScalarFunction {
        @DataTypeHint(value="ROW<f0 INT, f1 String>")
        public void eval(CompletableFuture<Row> future, int a) {
            future.complete(Row.of((Object[])new Object[]{a + 1, "" + a * a}));
        }
    }

    public static class AsyncFuncOverload
    extends AsyncFuncBase {
        private static final long serialVersionUID = 3L;

        public void eval(CompletableFuture<String> future, Integer param) {
            this.executor.schedule(() -> future.complete("int version " + param), 10L, TimeUnit.MILLISECONDS);
        }

        public void eval(CompletableFuture<String> future, String param) {
            this.executor.schedule(() -> future.complete("string version " + param), 10L, TimeUnit.MILLISECONDS);
        }
    }

    public static class AsyncFuncAdd10
    extends AsyncFuncBase {
        private static final long serialVersionUID = 2L;

        public void eval(CompletableFuture<Integer> future, Integer param) {
            this.executor.schedule(() -> future.complete(param + 10), 10L, TimeUnit.MILLISECONDS);
        }
    }

    public static class AsyncFunc
    extends AsyncFuncBase {
        private static final long serialVersionUID = 1L;

        public void eval(CompletableFuture<String> future, Integer param) {
            this.executor.schedule(() -> future.complete("val " + param), 10L, TimeUnit.MILLISECONDS);
        }
    }
}

