/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.planner.plan.nodes.exec.stream;

import java.sql.Timestamp;
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.TimeZone;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.functions.AsyncScalarFunction;
import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.test.program.SinkTestStep;
import org.apache.flink.table.test.program.SourceTestStep;
import org.apache.flink.table.test.program.TableTestProgram;
import org.apache.flink.types.Row;
import org.assertj.core.api.Assertions;

public class AsyncCalcTestPrograms {
    /**
     * Test program "async-calc-simple": a single async UDF call in the projection.
     *
     * <p>Registers {@link AsyncJavaFunc0} as temporary catalog function {@code udf1}, reads
     * {@code source_t (a INT)} and inserts {@code a, udf1(a)} into
     * {@code sink_t (a INT, a1 BIGINT)}. The row {@code 5} is produced both before and after
     * the restore, and the row {@code (5, 6)} is expected in the sink in both phases.
     */
    static final TableTestProgram ASYNC_CALC_UDF_SIMPLE = TableTestProgram.of((String)"async-calc-simple", (String)"validates async calc node with simple UDF").setupTemporaryCatalogFunction("udf1", AsyncJavaFunc0.class).setupTableSource(((SourceTestStep.Builder)SourceTestStep.newBuilder((String)"source_t").addSchema(new String[]{"a INT"})).producedBeforeRestore(new Row[]{Row.of((Object[])new Object[]{5})}).producedAfterRestore(new Row[]{Row.of((Object[])new Object[]{5})}).build()).setupTableSink(((SinkTestStep.Builder)SinkTestStep.newBuilder((String)"sink_t").addSchema(new String[]{"a INT", "a1 BIGINT"})).consumedBeforeRestore(new Row[]{Row.of((Object[])new Object[]{5, 6L})}).consumedAfterRestore(new Row[]{Row.of((Object[])new Object[]{5, 6L})}).build()).runSql("INSERT INTO sink_t SELECT a, udf1(a) FROM source_t").build();
    /**
     * Test program "async-calc-complex": several async UDFs mixed into projection and filter.
     *
     * <p>Function registrations:
     * <ul>
     *   <li>{@code udf1} = {@link AsyncJavaFunc0} (temporary catalog function)</li>
     *   <li>{@code udf2} = {@link AsyncJavaFunc1} (temporary catalog function)</li>
     *   <li>{@code udf3} = {@link AsyncJavaFunc2} (temporary system function)</li>
     *   <li>{@code udf4} = {@link AsyncUdfWithOpen} (temporary system function)</li>
     *   <li>{@code udf5} = {@link AsyncJavaFunc5} (catalog function)</li>
     * </ul>
     *
     * <p>The query projects {@code a}, {@code cast(a as VARCHAR)}, {@code b},
     * {@code udf2(b, b, d)}, {@code udf3(c, b)}, {@code udf4(substring(c, 1, 5))} and
     * {@code udf5(d, 1000)}, filtered by
     * {@code (udf1(a) > 0 or (a * b) < 100) and b > 10}. The same input row
     * {@code (5, 11, "hello world", 2023-12-16T01:01:01 + 123 ns)} is produced before and after
     * the restore, and the same output row is expected in both phases.
     *
     * <p>Note: the source schema is passed as one comma-separated string, whereas the sink
     * schema is passed as one string per column.
     */
    static final TableTestProgram ASYNC_CALC_UDF_COMPLEX = TableTestProgram.of((String)"async-calc-complex", (String)"validates calc node with complex UDFs").setupTemporaryCatalogFunction("udf1", AsyncJavaFunc0.class).setupTemporaryCatalogFunction("udf2", AsyncJavaFunc1.class).setupTemporarySystemFunction("udf3", AsyncJavaFunc2.class).setupTemporarySystemFunction("udf4", AsyncUdfWithOpen.class).setupCatalogFunction("udf5", AsyncJavaFunc5.class).setupTableSource(((SourceTestStep.Builder)SourceTestStep.newBuilder((String)"source_t").addSchema(new String[]{"a BIGINT, b INT NOT NULL, c VARCHAR, d TIMESTAMP(3)"})).producedBeforeRestore(new Row[]{Row.of((Object[])new Object[]{5L, 11, "hello world", LocalDateTime.of(2023, 12, 16, 1, 1, 1, 123)})}).producedAfterRestore(new Row[]{Row.of((Object[])new Object[]{5L, 11, "hello world", LocalDateTime.of(2023, 12, 16, 1, 1, 1, 123)})}).build()).setupTableSink(((SinkTestStep.Builder)SinkTestStep.newBuilder((String)"sink_t").addSchema(new String[]{"a BIGINT", "a1 VARCHAR", "b INT NOT NULL", "b1 VARCHAR", "c1 VARCHAR", "c2 VARCHAR", "d1 TIMESTAMP(3)"})).consumedBeforeRestore(new Row[]{Row.of((Object[])new Object[]{5L, "5", 11, "11 and 11 and 1702688461000", "hello world11", "$hello", LocalDateTime.of(2023, 12, 16, 1, 1, 0, 0)})}).consumedAfterRestore(new Row[]{Row.of((Object[])new Object[]{5L, "5", 11, "11 and 11 and 1702688461000", "hello world11", "$hello", LocalDateTime.of(2023, 12, 16, 1, 1, 0, 0)})}).build()).runSql("INSERT INTO sink_t SELECT a, cast(a as VARCHAR) as a1, b, udf2(b, b, d) as b1, udf3(c, b) as c1, udf4(substring(c, 1, 5)) as c2, udf5(d, 1000) as d1 from source_t where (udf1(a) > 0 or (a * b) < 100) and b > 10").build();
    /**
     * Test program "async-calc-nested": the same async UDF applied three times in a row.
     *
     * <p>Registers {@link AsyncJavaFunc0} as temporary catalog function {@code udf1} and inserts
     * {@code a, udf1(udf1(udf1(a)))} from {@code source_t (a INT)} into
     * {@code sink_t (a INT, a1 BIGINT)}. The row {@code 5} is produced before and after the
     * restore, and {@code (5, 8)} is expected in both phases.
     */
    static final TableTestProgram ASYNC_CALC_UDF_NESTED = TableTestProgram.of((String)"async-calc-nested", (String)"validates async calc node when chained through nested calls").setupTemporaryCatalogFunction("udf1", AsyncJavaFunc0.class).setupTableSource(((SourceTestStep.Builder)SourceTestStep.newBuilder((String)"source_t").addSchema(new String[]{"a INT"})).producedBeforeRestore(new Row[]{Row.of((Object[])new Object[]{5})}).producedAfterRestore(new Row[]{Row.of((Object[])new Object[]{5})}).build()).setupTableSink(((SinkTestStep.Builder)SinkTestStep.newBuilder((String)"sink_t").addSchema(new String[]{"a INT", "a1 BIGINT"})).consumedBeforeRestore(new Row[]{Row.of((Object[])new Object[]{5, 8L})}).consumedAfterRestore(new Row[]{Row.of((Object[])new Object[]{5, 8L})}).build()).runSql("INSERT INTO sink_t SELECT a, udf1(udf1(udf1(a))) FROM source_t").build();
    /**
     * Test program "async-calc-condition": the async UDF appears only in the WHERE clause.
     *
     * <p>Registers {@link AsyncJavaFunc0} as temporary catalog function {@code udf1} and runs
     * {@code SELECT a FROM source_t WHERE udf1(a) > 6}. Before the restore the source produces
     * {@code 5, 6, 4} and only {@code 6} is expected in the sink; after the restore it produces
     * {@code 7, 3} and only {@code 7} is expected.
     */
    static final TableTestProgram ASYNC_CALC_UDF_CONDITION = TableTestProgram.of((String)"async-calc-condition", (String)"validates async calc node with the udf written in the condition of the SQL query").setupTemporaryCatalogFunction("udf1", AsyncJavaFunc0.class).setupTableSource(((SourceTestStep.Builder)SourceTestStep.newBuilder((String)"source_t").addSchema(new String[]{"a INT"})).producedBeforeRestore(new Row[]{Row.of((Object[])new Object[]{5}), Row.of((Object[])new Object[]{6}), Row.of((Object[])new Object[]{4})}).producedAfterRestore(new Row[]{Row.of((Object[])new Object[]{7}), Row.of((Object[])new Object[]{3})}).build()).setupTableSink(((SinkTestStep.Builder)SinkTestStep.newBuilder((String)"sink_t").addSchema(new String[]{"a INT"})).consumedBeforeRestore(new Row[]{Row.of((Object[])new Object[]{6})}).consumedAfterRestore(new Row[]{Row.of((Object[])new Object[]{7})}).build()).runSql("INSERT INTO sink_t SELECT a FROM source_t WHERE udf1(a) > 6").build();
    /**
     * Test program "async-calc-failure-exception": an async UDF that throws for one input.
     *
     * <p>Configures the async scalar retry delay to 3000 ms, the maximum attempts to 3 and the
     * buffer capacity to 5, then registers {@link TwosFailFunction} as temporary catalog function
     * {@code udf1} and runs {@code SELECT udf1(a) FROM source_t}. Before the restore the source
     * produces {@code 1, 2} and only {@code 1} is expected in the sink; after the restore it
     * produces {@code 3} and both {@code 2} and {@code 3} are expected.
     *
     * <p>NOTE(review): row {@code 2} presumably shows up only after the restore because the
     * first call of {@link TwosFailFunction} with {@code 2} throws - confirm against the test
     * harness that drives this program.
     */
    static final TableTestProgram ASYNC_CALC_UDF_FAILURE_EXCEPTION = TableTestProgram.of((String)"async-calc-failure-exception", (String)"validates async calc node that fails some number of times and then recovers after restore").setupConfig(ExecutionConfigOptions.TABLE_EXEC_ASYNC_SCALAR_RETRY_DELAY, (Object)Duration.ofMillis(3000L)).setupConfig(ExecutionConfigOptions.TABLE_EXEC_ASYNC_SCALAR_MAX_ATTEMPTS, (Object)3).setupConfig(ExecutionConfigOptions.TABLE_EXEC_ASYNC_SCALAR_BUFFER_CAPACITY, (Object)5).setupTemporaryCatalogFunction("udf1", TwosFailFunction.class).setupTableSource(((SourceTestStep.Builder)SourceTestStep.newBuilder((String)"source_t").addSchema(new String[]{"a INT"})).producedBeforeRestore(new Row[]{Row.of((Object[])new Object[]{1}), Row.of((Object[])new Object[]{2})}).producedAfterRestore(new Row[]{Row.of((Object[])new Object[]{3})}).build()).setupTableSink(((SinkTestStep.Builder)SinkTestStep.newBuilder((String)"sink_t").addSchema(new String[]{"a INT"})).consumedBeforeRestore(new Row[]{Row.of((Object[])new Object[]{1})}).consumedAfterRestore(new Row[]{Row.of((Object[])new Object[]{2}), Row.of((Object[])new Object[]{3})}).build()).runSql("INSERT INTO sink_t SELECT udf1(a) FROM source_t").build();

    /**
     * Async scalar function that echoes its integer argument, except that the first
     * {@link #TOTAL_FAILURES} invocations with the value {@code 2} throw instead of completing
     * the future.
     */
    public static class TwosFailFunction
    extends AsyncScalarFunction {
        /** Number of invocations with input {@code 2} that throw before such calls succeed. */
        private static final int TOTAL_FAILURES = 1;
        /** Counts invocations of {@link #eval} whose input was {@code 2}; per function instance. */
        private final AtomicInteger calls = new AtomicInteger(0);

        /**
         * Completes {@code future} with {@code c}, unless {@code c} is {@code 2} and no more than
         * {@link #TOTAL_FAILURES} such calls have been seen on this instance.
         *
         * @param future future to complete with the unchanged input value
         * @param c input value; unboxed for the comparison, so {@code null} raises a
         *     {@link NullPointerException}
         * @throws RuntimeException with message {@code "Failure <n>"} for the n-th call with
         *     input {@code 2}, while {@code n <= TOTAL_FAILURES}
         */
        public void eval(CompletableFuture<Integer> future, Integer c) {
            if (c != 2) {
                future.complete(c);
                return;
            }
            // Capture the incremented value once so the threshold check and the error message
            // refer to the same attempt even if another thread bumps the counter in between.
            int attempt = this.calls.incrementAndGet();
            if (attempt > TOTAL_FAILURES) {
                future.complete(c);
                return;
            }
            throw new RuntimeException("Failure " + attempt);
        }
    }

    /**
     * Async scalar function that prefixes its argument with {@code "$"} and refuses to run
     * unless {@link #open(FunctionContext)} was invoked on the instance first.
     */
    public static class AsyncUdfWithOpen extends AsyncScalarFunction {
        /** Set by {@link #open(FunctionContext)}; transient, so it is not serialized. */
        private transient boolean opened;

        /**
         * Delegates to the superclass and then records that the function has been opened.
         *
         * @param context function context handed through to the superclass
         * @throws Exception whatever the superclass {@code open} throws
         */
        public void open(FunctionContext context) throws Exception {
            super.open(context);
            opened = true;
        }

        /**
         * Completes {@code future} with {@code "$" + c}.
         *
         * @param future future receiving the prefixed string
         * @param c value to prefix
         * @throws IllegalStateException if {@link #open(FunctionContext)} has not been called
         */
        public void eval(CompletableFuture<String> future, String c) {
            if (opened) {
                future.complete("$" + c);
                return;
            }
            throw new IllegalStateException("Open method is not called!");
        }
    }

    /**
     * Async scalar function that shifts a timestamp back by a millisecond offset and records,
     * in static flags, whether {@link #open(FunctionContext)} and {@link #close()} were called.
     */
    public static class AsyncJavaFunc5 extends AsyncScalarFunction {
        /** Set to {@code true} by {@link #open(FunctionContext)}; shared by all instances. */
        public static boolean openCalled = false;
        /** Set to {@code true} by {@link #close()}; shared by all instances. */
        public static boolean closeCalled = false;

        /**
         * Records that the function was opened.
         *
         * @param context unused
         */
        public void open(FunctionContext context) {
            AsyncJavaFunc5.openCalled = true;
        }

        /**
         * Completes {@code future} with {@code timestampData} moved back by {@code offset}
         * milliseconds, or with {@code null} when either argument is {@code null}.
         *
         * <p>Fails an AssertJ assertion first if {@link #openCalled} is still {@code false}.
         *
         * @param future future receiving the shifted timestamp or {@code null}
         * @param timestampData input timestamp; only its millisecond value is read
         * @param offset number of milliseconds to subtract
         */
        public void eval(@DataTypeHint("TIMESTAMP(3)") CompletableFuture<Timestamp> future, @DataTypeHint("TIMESTAMP(3)") TimestampData timestampData, Integer offset) {
            if (!AsyncJavaFunc5.openCalled) {
                Assertions.fail("Open was not called before run.");
            }
            if (timestampData == null || offset == null) {
                future.complete(null);
                return;
            }
            long shiftedMillis = timestampData.getMillisecond() - offset;
            // The JVM default zone's offset at the shifted instant is subtracted as well.
            // NOTE(review): presumably so the resulting java.sql.Timestamp shows the same
            // wall-clock fields as the SQL TIMESTAMP value - confirm.
            int zoneOffsetMillis = TimeZone.getDefault().getOffset(shiftedMillis);
            Timestamp result = new Timestamp(shiftedMillis - zoneOffsetMillis);
            future.complete(result);
        }

        /** Records that the function was closed. */
        public void close() {
            AsyncJavaFunc5.closeCalled = true;
        }
    }

    /**
     * Async scalar function that appends the product of a variable number of integers to a
     * string.
     */
    public static class AsyncJavaFunc2 extends AsyncScalarFunction {
        /**
         * Completes {@code future} with {@code s} followed by the product of all values in
         * {@code a}; with no values the product is {@code 1}.
         *
         * @param future future receiving the concatenated string
         * @param s prefix of the result
         * @param a factors to multiply; each element is unboxed, so a {@code null} element
         *     raises a {@link NullPointerException}
         */
        public void eval(CompletableFuture<String> future, String s, Integer ... a) {
            int product = 1;
            for (Integer factor : a) {
                product *= factor;
            }
            future.complete(s + product);
        }
    }

    /**
     * Async scalar function that joins two integers and a timestamp's millisecond value into
     * one string separated by {@code " and "}.
     */
    public static class AsyncJavaFunc1 extends AsyncScalarFunction {
        /**
         * Completes {@code future} with {@code "<a> and <b> and <millis>"}, where
         * {@code <millis>} is the millisecond value of {@code c}, or the text {@code null} when
         * {@code c} is {@code null}.
         *
         * @param future future receiving the joined string
         * @param a first value; may be {@code null}, which is rendered as {@code null}
         * @param b second value
         * @param c timestamp whose millisecond value is appended; may be {@code null}
         */
        public void eval(CompletableFuture<String> future, Integer a, int b, @DataTypeHint("TIMESTAMP(3)") TimestampData c) {
            Long millis = null;
            if (c != null) {
                millis = Long.valueOf(c.getMillisecond());
            }
            String joined = a + " and " + b + " and " + millis;
            future.complete(joined);
        }
    }

    /** Async scalar function that adds one to its argument. */
    public static class AsyncJavaFunc0 extends AsyncScalarFunction {
        /**
         * Completes {@code future} with {@code l + 1}.
         *
         * @param future future receiving the incremented value
         * @param l value to increment; unboxed, so {@code null} raises a
         *     {@link NullPointerException}
         */
        public void eval(CompletableFuture<Long> future, Long l) {
            long successor = l + 1L;
            future.complete(successor);
        }
    }
}

