/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.planner.runtime.stream.sql;

import java.io.File;
import java.io.IOException;
import java.io.Serializable;
import java.net.URI;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.util.FiniteTestSource;
import org.apache.flink.table.planner.runtime.utils.StreamingTestBase;
import org.apache.flink.test.junit5.MiniClusterExtension;
import org.apache.flink.testutils.junit.utils.TempDirUtils;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;
import org.apache.flink.util.CollectionUtil;
import org.assertj.core.api.AbstractFileAssert;
import org.assertj.core.api.AbstractStringAssert;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.api.extension.RegisterExtension;

/**
 * Base class for integration tests that copy the rows of the temporary view {@code my_table}
 * into a {@code sink_table} created by the subclass, and then verify two things:
 *
 * <ul>
 *   <li>reading {@code sink_table} back yields exactly the expected rows, and
 *   <li>every non-hidden file in the output directory has a name starting with
 *       {@code compacted-} (plus a {@code _SUCCESS} marker for partitioned output).
 * </ul>
 *
 * <p>Subclasses supply the DDL for the sink table via {@link #createTable(String)} and
 * {@link #createPartitionTable(String)}, and the partition column name via
 * {@link #partitionField()}.
 */
@Timeout(value = 90L, unit = TimeUnit.SECONDS)
public abstract class CompactionITCaseBase extends StreamingTestBase {

    /** Number of distinct rows fed into {@code my_table}. */
    private static final int ROW_COUNT = 100;

    /**
     * Number of distinct values taken by columns {@code b} and {@code c} ({@code a % 10}); this is
     * also the number of partition directories {@link #testPartition()} expects.
     */
    private static final int PARTITION_COUNT = 10;

    /** Orders rows by their first field (column {@code a}), which is always an {@code Integer}. */
    private static final Comparator<Row> BY_FIRST_FIELD =
            Comparator.comparingInt(row -> (Integer) row.getField(0));

    @RegisterExtension
    private static final MiniClusterExtension MINI_CLUSTER_RESOURCE =
            new MiniClusterExtension(
                    new MiniClusterResourceConfiguration.Builder()
                            .setNumberTaskManagers(1)
                            .setNumberSlotsPerTaskManager(4)
                            .build());

    /** URI string of the fresh output directory created for each test. */
    private String resultPath;

    /** Rows expected when reading {@code sink_table} back, sorted by column {@code a}. */
    private List<Row> expectedRows;

    /**
     * Creates a fresh output directory, configures the environment (parallelism 3, checkpoint
     * interval 100 ms) and registers the input rows as the temporary view {@code my_table}.
     *
     * @throws IOException if the output directory cannot be created
     */
    @BeforeEach
    protected void init() throws IOException {
        resultPath = TempDirUtils.newFolder(tempFolder()).toURI().toString();

        env().setParallelism(3);
        env().enableCheckpointing(100L);

        List<Row> rows = new ArrayList<>(ROW_COUNT);
        for (int i = 0; i < ROW_COUNT; ++i) {
            String bucket = String.valueOf(i % PARTITION_COUNT);
            rows.add(Row.of(i, bucket, bucket));
        }

        // Every input row is expected twice in the sink.
        // NOTE(review): presumably because FiniteTestSource emits its elements twice — confirm
        // against FiniteTestSource.
        expectedRows = new ArrayList<>(2 * rows.size());
        expectedRows.addAll(rows);
        expectedRows.addAll(rows);
        expectedRows.sort(BY_FIRST_FIELD);

        // The explicit FilterFunction cast selects the Java-functional-interface overload of
        // filter(); the pass-through filter runs with parallelism 3.
        org.apache.flink.streaming.api.scala.DataStream<Row> stream =
                new org.apache.flink.streaming.api.scala.DataStream<>(
                                env().getJavaEnv()
                                        .addSource(
                                                new FiniteTestSource<>(rows),
                                                new RowTypeInfo(
                                                        new TypeInformation[] {
                                                            Types.INT, Types.STRING, Types.STRING
                                                        },
                                                        new String[] {"a", "b", "c"})))
                        .filter((FilterFunction<Row>) value -> true)
                        .setParallelism(3);
        tEnv().createTemporaryView("my_table", stream);
    }

    /** Returns the name of the partition column used in partition directory names. */
    protected abstract String partitionField();

    /**
     * Creates the non-partitioned {@code sink_table}.
     *
     * @param var1 URI string of the directory the table should write to
     */
    protected abstract void createTable(String var1);

    /**
     * Creates the partitioned {@code sink_table}.
     *
     * @param var1 URI string of the directory the table should write to
     */
    protected abstract void createPartitionTable(String var1);

    @TestTemplate
    void testSingleParallelism() throws Exception {
        innerTestNonPartition(1);
    }

    @TestTemplate
    void testNonPartition() throws Exception {
        innerTestNonPartition(3);
    }

    /**
     * Writes {@code my_table} into a non-partitioned sink using the given sink parallelism, then
     * checks the rows read back and the names of the produced files.
     *
     * @param parallelism value passed as the {@code sink.parallelism} table option
     * @throws Exception if job execution or result collection fails
     */
    void innerTestNonPartition(int parallelism) throws Exception {
        createTable(resultPath);
        String sql =
                String.format(
                        "insert into sink_table /*+ OPTIONS('sink.parallelism' = '%d') */ select * from my_table",
                        parallelism);
        tEnv().executeSql(sql).await();

        assertIterator(tEnv().executeSql("select * from sink_table").collect());
        assertFiles(new File(URI.create(resultPath)).listFiles(), false);
    }

    /**
     * Writes {@code my_table} into a partitioned sink and checks the rows read back, the number of
     * partition directories, and the files inside each partition (including {@code _SUCCESS}).
     *
     * @throws Exception if job execution or result collection fails
     */
    @TestTemplate
    void testPartition() throws Exception {
        createPartitionTable(resultPath);
        tEnv().executeSql("insert into sink_table select * from my_table").await();

        assertIterator(tEnv().executeSql("select * from sink_table").collect());

        File path = new File(URI.create(resultPath));
        Assertions.assertThat((Object[]) path.listFiles()).hasSize(PARTITION_COUNT);
        for (int i = 0; i < PARTITION_COUNT; ++i) {
            File partition = new File(path, partitionField() + "=" + i);
            assertFiles(partition.listFiles(), true);
        }
    }

    /**
     * Drains the iterator, sorts the rows by column {@code a} and compares them with
     * {@link #expectedRows}. The iterator is always closed, even if draining it fails.
     *
     * @param iterator result iterator to consume and close
     * @throws Exception if closing the iterator fails
     */
    private void assertIterator(CloseableIterator<Row> iterator) throws Exception {
        List<Row> result;
        // try-with-resources guarantees the iterator is released on every path.
        try (CloseableIterator<Row> rows = iterator) {
            result = CollectionUtil.iteratorToList(rows);
        }
        result.sort(BY_FIRST_FIELD);
        Assertions.assertThat(result).isEqualTo(expectedRows);
    }

    /**
     * Asserts that every non-hidden file is named {@code compacted-*}; when
     * {@code containSuccess} is set, a {@code _SUCCESS} file is tolerated and required.
     *
     * @param files directory listing to check; must not be {@code null}
     * @param containSuccess whether a {@code _SUCCESS} file must be present in the listing
     */
    private void assertFiles(File[] files, boolean containSuccess) {
        // File.listFiles() returns null when the path is missing or not a directory; fail with a
        // readable message instead of a NullPointerException in the loop below.
        Assertions.assertThat(files)
                .as("Directory listing is null: the output directory is missing or not a directory")
                .isNotNull();

        File successFile = null;
        for (File file : files) {
            if (file.isHidden()) {
                continue;
            }
            if (containSuccess && file.getName().equals("_SUCCESS")) {
                successFile = file;
                continue;
            }
            Assertions.assertThat(file.getName()).as(file.getName()).startsWith("compacted-");
        }
        if (containSuccess) {
            Assertions.assertThat(successFile).as("Should contains success file").isNotNull();
        }
    }
}

