/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.filesystem;

import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.io.InputSplit;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.util.DataFormatConverters;
import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.table.runtime.typeutils.RowDataTypeInfo;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.types.Row;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class FileSystemLookupFunction<T extends InputSplit>
extends TableFunction<RowData> {
    private static final long serialVersionUID = 1L;
    private static final Logger LOG = LoggerFactory.getLogger(FileSystemLookupFunction.class);
    private static final int MAX_RETRIES = 3;
    private static final Duration RETRY_INTERVAL = Duration.ofSeconds(10L);
    private final InputFormat<RowData, T> inputFormat;
    private final String[] producedNames;
    private final DataType[] producedTypes;
    private final Duration cacheTTL;
    private final int[] lookupCols;
    private transient Map<Row, List<RowData>> cache;
    private transient long nextLoadTime;
    private transient TypeSerializer<RowData> serializer;
    private final DataFormatConverters.DataFormatConverter[] converters;

    public FileSystemLookupFunction(InputFormat<RowData, T> inputFormat, String[] lookupKeys, String[] producedNames, DataType[] producedTypes, Duration cacheTTL) {
        this.lookupCols = new int[lookupKeys.length];
        this.converters = new DataFormatConverters.DataFormatConverter[lookupKeys.length];
        Map<String, Integer> nameToIndex = IntStream.range(0, producedNames.length).boxed().collect(Collectors.toMap(i -> producedNames[i], i -> i));
        for (int i2 = 0; i2 < lookupKeys.length; ++i2) {
            Integer index = nameToIndex.get(lookupKeys[i2]);
            Preconditions.checkArgument((index != null ? 1 : 0) != 0, (String)"Lookup keys %s not selected", (Object[])new Object[]{Arrays.toString(lookupKeys)});
            this.converters[i2] = DataFormatConverters.getConverterForDataType(producedTypes[index]);
            this.lookupCols[i2] = index;
        }
        this.inputFormat = inputFormat;
        this.producedNames = producedNames;
        this.producedTypes = producedTypes;
        this.cacheTTL = cacheTTL;
    }

    public TypeInformation<RowData> getResultType() {
        return new RowDataTypeInfo((LogicalType[])Arrays.stream(this.producedTypes).map(DataType::getLogicalType).toArray(LogicalType[]::new), this.producedNames);
    }

    public void open(FunctionContext context) throws Exception {
        super.open(context);
        this.cache = new HashMap<Row, List<RowData>>();
        this.nextLoadTime = -1L;
        this.serializer = this.getResultType().createSerializer(new ExecutionConfig());
    }

    public void eval(Object ... values) {
        Preconditions.checkArgument((values.length == this.lookupCols.length ? 1 : 0) != 0, (Object)"Number of values and lookup keys mismatch");
        this.checkCacheReload();
        for (int i = 0; i < values.length; ++i) {
            values[i] = this.converters[i].toExternal(values[i]);
        }
        Row probeKey = Row.of((Object[])values);
        List<RowData> matchedRows = this.cache.get(probeKey);
        if (matchedRows != null) {
            for (RowData matchedRow : matchedRows) {
                this.collect(matchedRow);
            }
        }
    }

    @VisibleForTesting
    public Duration getCacheTTL() {
        return this.cacheTTL;
    }

    private void checkCacheReload() {
        if (this.nextLoadTime > System.currentTimeMillis()) {
            return;
        }
        if (this.nextLoadTime > 0L) {
            LOG.info("Lookup join cache has expired after {} minute(s), reloading", (Object)this.getCacheTTL().toMinutes());
        } else {
            LOG.info("Populating lookup join cache");
        }
        int numRetry = 0;
        while (true) {
            this.cache.clear();
            try {
                InputSplit[] inputSplits = this.inputFormat.createInputSplits(1);
                GenericRowData reuse = new GenericRowData(this.producedNames.length);
                long count = 0L;
                for (InputSplit split : inputSplits) {
                    this.inputFormat.open(split);
                    while (!this.inputFormat.reachedEnd()) {
                        RowData row = (RowData)this.inputFormat.nextRecord((Object)reuse);
                        ++count;
                        Row key = this.extractKey(row);
                        List rows = this.cache.computeIfAbsent(key, k -> new ArrayList());
                        rows.add(this.serializer.copy((Object)row));
                    }
                    this.inputFormat.close();
                }
                this.nextLoadTime = System.currentTimeMillis() + this.getCacheTTL().toMillis();
                LOG.info("Loaded {} row(s) into lookup join cache", (Object)count);
                return;
            }
            catch (IOException e) {
                if (numRetry >= 3) {
                    throw new FlinkRuntimeException(String.format("Failed to load table into cache after %d retries", numRetry), (Throwable)e);
                }
                long toSleep = (long)(++numRetry) * RETRY_INTERVAL.toMillis();
                LOG.warn(String.format("Failed to load table into cache, will retry in %d seconds", toSleep / 1000L), (Throwable)e);
                try {
                    Thread.sleep(toSleep);
                }
                catch (InterruptedException ex) {
                    LOG.warn("Interrupted while waiting to retry failed cache load, aborting");
                    throw new FlinkRuntimeException((Throwable)ex);
                }
            }
        }
    }

    private Row extractKey(RowData row) {
        Row key = new Row(this.lookupCols.length);
        for (int i = 0; i < this.lookupCols.length; ++i) {
            key.setField(i, this.converters[i].toExternal(row, this.lookupCols[i]));
        }
        return key;
    }
}

