/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.planner.runtime.stream.table;

import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import java.math.BigDecimal;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import org.apache.flink.table.planner.runtime.utils.StreamingTestBase;
import org.apache.flink.table.types.AbstractDataType;
import org.apache.flink.table.types.DataType;
import org.apache.flink.types.Row;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

/**
 * Integration tests for tables declared with {@code 'connector' = 'print'}.
 *
 * <p>Each test replaces {@link System#out} and {@link System#err} with in-memory streams, runs an
 * insert into a {@code print_t} table, and then compares the captured text against the expected
 * printed lines.
 */
class PrintConnectorITCase extends StreamingTestBase {
    private final PrintStream originalSystemOut = System.out;
    private final PrintStream originalSystemErr = System.err;
    private final ByteArrayOutputStream arrayOutputStream = new ByteArrayOutputStream();
    private final ByteArrayOutputStream arrayErrorStream = new ByteArrayOutputStream();

    /** Redirects standard output and standard error into the in-memory capture buffers. */
    @BeforeEach
    void setUp() {
        System.setOut(new PrintStream(this.arrayOutputStream));
        System.setErr(new PrintStream(this.arrayErrorStream));
    }

    /** Closes the capturing streams (never the original ones) and restores the originals. */
    @AfterEach
    void tearDown() {
        // Only close streams installed by setUp; closing the JVM's real streams would
        // silence every later test.
        if (System.out != this.originalSystemOut) {
            System.out.close();
        }
        if (System.err != this.originalSystemErr) {
            System.err.close();
        }
        System.setOut(this.originalSystemOut);
        System.setErr(this.originalSystemErr);
    }

    /** Prints a row covering many column types to standard output. */
    @Test
    void testTypes() throws Exception {
        this.test(false);
    }

    /** Same as {@link #testTypes()} but with {@code 'standard-error'='true'}. */
    @Test
    void testStandardError() throws Exception {
        this.test(true);
    }

    /** Inserts a single row into a print table declared with {@code 'sink.parallelism' = '2'}. */
    @Test
    void testWithParallelism() throws Exception {
        this.tEnv()
                .executeSql(
                        "create table print_t ("
                                + "f0 int,"
                                + "f1 double"
                                + ") with ("
                                + "'connector' = 'print',"
                                + "'print-identifier' = 'test_print',"
                                + "'sink.parallelism' = '2',"
                                + "'standard-error'='false')");
        DataType type = this.tEnv().from("print_t").getResolvedSchema().toPhysicalRowDataType();
        Row row = Row.of(1, 1.1);
        this.tEnv().fromValues(type, Collections.singleton(row)).executeInsert("print_t").await();

        this.assertStdOutIsOneOf("test_print:1> +I[1, 1.1]", "test_print:2> +I[1, 1.1]");
    }

    /** Inserts into a partitioned print table with both partition columns given statically. */
    @Test
    void testWithPartitionedTableAll() throws Exception {
        this.createPartitionedTable();
        this.tEnv().executeSql("INSERT INTO print_t PARTITION (f0=1,f1=1.1) SELECT 'n1'").await();

        this.assertStdOutIsOneOf(
                "test_print:f0=1:f1=1.1:1> +I[1, 1.1, n1]",
                "test_print:f0=1:f1=1.1:2> +I[1, 1.1, n1]");
    }

    /** Inserts into a partitioned print table with only {@code f0} given statically. */
    @Test
    void testWithPartitionedTablePart() throws Exception {
        this.createPartitionedTable();
        this.tEnv().executeSql("INSERT INTO print_t PARTITION (f0=1) SELECT 1.1, 'n1'").await();

        this.assertStdOutIsOneOf(
                "test_print:f0=1:1> +I[1, 1.1, n1]", "test_print:f0=1:2> +I[1, 1.1, n1]");
    }

    /** Declares {@code print_t} partitioned by {@code (f0, f1)} with sink parallelism 2. */
    private void createPartitionedTable() {
        this.tEnv()
                .executeSql(
                        "create table print_t ("
                                + "f0 int,"
                                + "f1 double,"
                                + "f2 string"
                                + ") PARTITIONED BY (f0, f1) with ("
                                + "'connector' = 'print',"
                                + "'print-identifier' = 'test_print',"
                                + "'sink.parallelism' = '2',"
                                + "'standard-error'='false')");
    }

    /**
     * Asserts that captured standard output is exactly one of the two given lines followed by a
     * newline.
     *
     * <p>The two alternatives differ only in the numeric suffix ({@code 1} or {@code 2}) of the
     * printed prefix; presumably this is the index of the sink subtask that received the single
     * row, which is not fixed when the sink parallelism is 2.
     *
     * @param expectedLine1 first accepted line, without the trailing newline
     * @param expectedLine2 second accepted line, without the trailing newline
     */
    private void assertStdOutIsOneOf(String expectedLine1, String expectedLine2) {
        // Asserting on the string itself (rather than on a pre-computed boolean) makes a
        // failure report the text that was actually captured.
        String actualOutput = this.arrayOutputStream.toString();
        Assertions.assertThat(actualOutput).isIn(expectedLine1 + "\n", expectedLine2 + "\n");
    }

    /**
     * Creates a print table with twelve columns of different types, inserts the same row twice
     * and checks that two identical lines were captured on the selected stream.
     *
     * @param standardError value of the table's {@code 'standard-error'} option; also selects
     *     which captured stream (error or output) is checked
     * @throws Exception if awaiting the insert job fails
     */
    private void test(boolean standardError) throws Exception {
        this.tEnv()
                .executeSql(
                        String.format(
                                "create table print_t ("
                                        + "f0 int,"
                                        + "f1 double,"
                                        + "f2 decimal(5, 2),"
                                        + "f3 boolean,"
                                        + "f4 string,"
                                        + "f5 date,"
                                        + "f6 time,"
                                        + "f7 timestamp,"
                                        + "f8 bytes,"
                                        + "f9 array<int>,"
                                        + "f10 map<int, int>,"
                                        + "f11 row<n0 int, n1 string>"
                                        + ") with ("
                                        + "'connector' = 'print',"
                                        + "'print-identifier' = '%s',"
                                        + "'standard-error'='%b')",
                                "test_print",
                                standardError));
        DataType type = this.tEnv().from("print_t").getResolvedSchema().toPhysicalRowDataType();

        HashMap<Integer, Integer> mapData = new HashMap<>();
        mapData.put(1, 1);
        mapData.put(2, 2);

        Row row =
                Row.of(
                        1,
                        1.1,
                        BigDecimal.valueOf(1.11),
                        false,
                        "f4",
                        LocalDate.of(2020, 11, 5),
                        LocalTime.of(12, 22, 35),
                        LocalDateTime.of(2020, 11, 5, 12, 22, 35),
                        new byte[] {1, 2, 3},
                        new int[] {11, 22, 33},
                        mapData,
                        Row.of(1, "1"));
        this.tEnv().fromValues(type, Arrays.asList(row, row)).executeInsert("print_t").await();

        String expectedLine =
                "test_print> +I[1, 1.1, 1.11, false, f4, 2020-11-05, 12:22:35, 2020-11-05T12:22:35, [1, 2, 3], [11, 22, 33], {1=1, 2=2}, +I[1, 1]]";
        String actualOutput =
                standardError
                        ? this.arrayErrorStream.toString()
                        : this.arrayOutputStream.toString();
        Assertions.assertThat(actualOutput).isEqualTo(expectedLine + "\n" + expectedLine + "\n");
    }
}

