/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.api.operators;

import java.io.IOException;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
import org.apache.flink.api.connector.source.ReaderOutput;
import org.apache.flink.api.connector.source.SourceEvent;
import org.apache.flink.api.connector.source.SourceReader;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.api.connector.source.SourceSplit;
import org.apache.flink.core.io.InputStatus;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
import org.apache.flink.runtime.operators.coordination.OperatorEventHandler;
import org.apache.flink.runtime.source.event.AddSplitEvent;
import org.apache.flink.runtime.source.event.ReaderRegistrationEvent;
import org.apache.flink.runtime.source.event.SourceEventWrapper;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.source.TimestampsAndWatermarks;
import org.apache.flink.streaming.api.operators.util.SimpleVersionedListState;
import org.apache.flink.streaming.runtime.io.PushingAsyncDataInput;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.util.CollectionUtil;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;

@Internal
public class SourceOperator<OUT, SplitT extends SourceSplit>
extends AbstractStreamOperator<OUT>
implements OperatorEventHandler,
PushingAsyncDataInput<OUT> {
    private static final long serialVersionUID = 1405537676017904695L;
    static final ListStateDescriptor<byte[]> SPLITS_STATE_DESC = new ListStateDescriptor("SourceReaderState", (TypeSerializer)BytePrimitiveArraySerializer.INSTANCE);
    private final Function<SourceReaderContext, SourceReader<OUT, SplitT>> readerFactory;
    private final SimpleVersionedSerializer<SplitT> splitSerializer;
    private final OperatorEventGateway operatorEventGateway;
    private final WatermarkStrategy<OUT> watermarkStrategy;
    private SourceReader<OUT, SplitT> sourceReader;
    private ReaderOutput<OUT> currentMainOutput;
    private PushingAsyncDataInput.DataOutput<OUT> lastInvokedOutput;
    private ListState<SplitT> readerState;
    private TimestampsAndWatermarks<OUT> eventTimeLogic;

    public SourceOperator(Function<SourceReaderContext, SourceReader<OUT, SplitT>> readerFactory, OperatorEventGateway operatorEventGateway, SimpleVersionedSerializer<SplitT> splitSerializer, WatermarkStrategy<OUT> watermarkStrategy, ProcessingTimeService timeService) {
        this.readerFactory = (Function)Preconditions.checkNotNull(readerFactory);
        this.operatorEventGateway = (OperatorEventGateway)Preconditions.checkNotNull((Object)operatorEventGateway);
        this.splitSerializer = (SimpleVersionedSerializer)Preconditions.checkNotNull(splitSerializer);
        this.watermarkStrategy = (WatermarkStrategy)Preconditions.checkNotNull(watermarkStrategy);
        this.processingTimeService = timeService;
    }

    @Override
    public void open() throws Exception {
        final MetricGroup metricGroup = this.getMetricGroup();
        SourceReaderContext context = new SourceReaderContext(){

            public MetricGroup metricGroup() {
                return metricGroup;
            }

            public void sendSourceEventToCoordinator(SourceEvent event) {
                SourceOperator.this.operatorEventGateway.sendEventToCoordinator((OperatorEvent)new SourceEventWrapper(event));
            }
        };
        this.eventTimeLogic = TimestampsAndWatermarks.createStreamingEventTimeLogic(this.watermarkStrategy, metricGroup, this.getProcessingTimeService(), this.getExecutionConfig().getAutoWatermarkInterval());
        this.sourceReader = this.readerFactory.apply(context);
        List splits = CollectionUtil.iterableToList((Iterable)((Iterable)this.readerState.get()));
        if (!splits.isEmpty()) {
            this.sourceReader.addSplits(splits);
        }
        this.sourceReader.start();
        this.registerReader();
        this.eventTimeLogic.startPeriodicWatermarkEmits();
    }

    @Override
    public void close() throws Exception {
        this.eventTimeLogic.stopPeriodicWatermarkEmits();
        super.close();
    }

    @Override
    public InputStatus emitNext(PushingAsyncDataInput.DataOutput<OUT> output) throws Exception {
        assert (this.lastInvokedOutput == output || this.lastInvokedOutput == null);
        if (this.currentMainOutput != null) {
            return this.sourceReader.pollNext(this.currentMainOutput);
        }
        this.currentMainOutput = this.eventTimeLogic.createMainOutput(output);
        this.lastInvokedOutput = output;
        return this.sourceReader.pollNext(this.currentMainOutput);
    }

    @Override
    public void snapshotState(StateSnapshotContext context) throws Exception {
        LOG.debug("Taking a snapshot for checkpoint {}", (Object)context.getCheckpointId());
        this.readerState.update(this.sourceReader.snapshotState());
    }

    public CompletableFuture<?> getAvailableFuture() {
        return this.sourceReader.isAvailable();
    }

    @Override
    public void initializeState(StateInitializationContext context) throws Exception {
        super.initializeState(context);
        ListState rawState = context.getOperatorStateStore().getListState(SPLITS_STATE_DESC);
        this.readerState = new SimpleVersionedListState<SplitT>((ListState<byte[]>)rawState, this.splitSerializer);
    }

    public void handleOperatorEvent(OperatorEvent event) {
        if (event instanceof AddSplitEvent) {
            try {
                this.sourceReader.addSplits(((AddSplitEvent)event).splits(this.splitSerializer));
            }
            catch (IOException e) {
                throw new FlinkRuntimeException("Failed to deserialize the splits.", (Throwable)e);
            }
        } else if (event instanceof SourceEventWrapper) {
            this.sourceReader.handleSourceEvents(((SourceEventWrapper)event).getSourceEvent());
        } else {
            throw new IllegalStateException("Received unexpected operator event " + event);
        }
    }

    private void registerReader() {
        this.operatorEventGateway.sendEventToCoordinator((OperatorEvent)new ReaderRegistrationEvent(this.getRuntimeContext().getIndexOfThisSubtask(), "UNKNOWN_LOCATION"));
    }

    @VisibleForTesting
    public SourceReader<OUT, SplitT> getSourceReader() {
        return this.sourceReader;
    }

    @VisibleForTesting
    ListState<SplitT> getReaderState() {
        return this.readerState;
    }
}

