/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.connector.source.enumerator;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.flink.api.connector.source.SourceEvent;
import org.apache.flink.api.connector.source.SourceSplit;
import org.apache.flink.api.connector.source.SplitEnumerator;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.connector.source.TerminatingLogic;
import org.apache.flink.connector.source.enumerator.NoOpEnumState;
import org.apache.flink.connector.source.split.ValuesSourcePartitionSplit;
import org.apache.flink.table.connector.source.DynamicFilteringData;
import org.apache.flink.table.connector.source.DynamicFilteringEvent;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DynamicFilteringValuesSourceEnumerator
implements SplitEnumerator<ValuesSourcePartitionSplit, NoOpEnumState> {
    private static final Logger LOG = LoggerFactory.getLogger(DynamicFilteringValuesSourceEnumerator.class);
    private final SplitEnumeratorContext<ValuesSourcePartitionSplit> context;
    private final List<ValuesSourcePartitionSplit> allSplits;
    private final List<String> dynamicFilteringFields;
    private final TerminatingLogic terminatingLogic;
    private transient boolean receivedDynamicFilteringEvent;
    private transient List<ValuesSourcePartitionSplit> remainingSplits;

    public DynamicFilteringValuesSourceEnumerator(SplitEnumeratorContext<ValuesSourcePartitionSplit> context, TerminatingLogic terminatingLogic, List<ValuesSourcePartitionSplit> allSplits, List<String> dynamicFilteringFields) {
        this.context = context;
        this.allSplits = allSplits;
        this.terminatingLogic = terminatingLogic;
        this.dynamicFilteringFields = dynamicFilteringFields;
    }

    public void start() {
    }

    public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname) {
        if (!this.receivedDynamicFilteringEvent) {
            throw new IllegalStateException("DynamicFilteringEvent has not receive");
        }
        if (this.remainingSplits.isEmpty()) {
            if (this.terminatingLogic == TerminatingLogic.INFINITE) {
                this.context.assignSplit((SourceSplit)new ValuesSourcePartitionSplit(Collections.emptyMap(), TerminatingLogic.INFINITE), subtaskId);
            } else {
                this.context.signalNoMoreSplits(subtaskId);
                LOG.info("No more splits available for subtask {}", (Object)subtaskId);
            }
        } else {
            ValuesSourcePartitionSplit split = this.remainingSplits.remove(0);
            LOG.debug("Assigned split to subtask {} : {}", (Object)subtaskId, (Object)split);
            this.context.assignSplit((SourceSplit)split, subtaskId);
        }
    }

    public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
        if (sourceEvent instanceof DynamicFilteringEvent) {
            LOG.warn("Received DynamicFilteringEvent: {}", (Object)subtaskId);
            this.receivedDynamicFilteringEvent = true;
            DynamicFilteringData dynamicFilteringData = ((DynamicFilteringEvent)sourceEvent).getData();
            this.assignPartitions(dynamicFilteringData);
        } else {
            LOG.error("Received unrecognized event: {}", (Object)sourceEvent);
        }
    }

    private void assignPartitions(DynamicFilteringData data) {
        if (data.isFiltering()) {
            this.remainingSplits = new ArrayList<ValuesSourcePartitionSplit>();
            for (ValuesSourcePartitionSplit split : this.allSplits) {
                List<String> values = this.dynamicFilteringFields.stream().map(k -> split.getPartition().get(k)).collect(Collectors.toList());
                LOG.info("values: " + values);
                if (!data.contains((RowData)this.generateRowData(values, data.getRowType()))) continue;
                this.remainingSplits.add(split);
            }
        } else {
            this.remainingSplits = new ArrayList<ValuesSourcePartitionSplit>(this.allSplits);
        }
        LOG.info("remainingSplits: " + this.remainingSplits);
    }

    private GenericRowData generateRowData(List<String> partitionValues, RowType rowType) {
        Preconditions.checkArgument((partitionValues.size() == rowType.getFieldCount() ? 1 : 0) != 0);
        Object[] values = new Object[partitionValues.size()];
        block5: for (int i = 0; i < rowType.getFieldCount(); ++i) {
            switch (rowType.getTypeAt(i).getTypeRoot()) {
                case VARCHAR: {
                    values[i] = partitionValues.get(i);
                    continue block5;
                }
                case INTEGER: {
                    values[i] = Integer.valueOf(partitionValues.get(i));
                    continue block5;
                }
                case BIGINT: {
                    values[i] = Long.valueOf(partitionValues.get(i));
                    continue block5;
                }
                default: {
                    throw new UnsupportedOperationException(rowType.getTypeAt(i).getTypeRoot() + " is not supported.");
                }
            }
        }
        return GenericRowData.of((Object[])values);
    }

    public void addSplitsBack(List<ValuesSourcePartitionSplit> splits, int subtaskId) {
        this.remainingSplits.addAll(splits);
    }

    public void addReader(int subtaskId) {
    }

    public NoOpEnumState snapshotState(long checkpointId) throws Exception {
        return new NoOpEnumState();
    }

    public void close() throws IOException {
    }
}

