/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.api.operators.source;

import java.time.Duration;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.ScheduledFuture;
import javax.annotation.Nullable;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.eventtime.TimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkGenerator;
import org.apache.flink.api.common.eventtime.WatermarkGeneratorSupplier;
import org.apache.flink.api.common.eventtime.WatermarkOutput;
import org.apache.flink.api.common.eventtime.WatermarkOutputMultiplexer;
import org.apache.flink.api.connector.source.ReaderOutput;
import org.apache.flink.api.connector.source.SourceOutput;
import org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks;
import org.apache.flink.streaming.api.operators.source.TimestampsAndWatermarks;
import org.apache.flink.streaming.api.operators.source.WatermarkToDataOutput;
import org.apache.flink.streaming.runtime.io.PushingAsyncDataInput;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.util.Preconditions;

@Internal
public class StreamingTimestampsAndWatermarks<T>
implements TimestampsAndWatermarks<T> {
    private final TimestampAssigner<T> timestampAssigner;
    private final WatermarkGeneratorSupplier<T> watermarksFactory;
    private final WatermarkGeneratorSupplier.Context watermarksContext;
    private final ProcessingTimeService timeService;
    private final long periodicWatermarkInterval;
    @Nullable
    private SplitLocalOutputs<T> currentPerSplitOutputs;
    @Nullable
    private StreamingReaderOutput<T> currentMainOutput;
    @Nullable
    private ScheduledFuture<?> periodicEmitHandle;

    public StreamingTimestampsAndWatermarks(TimestampAssigner<T> timestampAssigner, WatermarkGeneratorSupplier<T> watermarksFactory, WatermarkGeneratorSupplier.Context watermarksContext, ProcessingTimeService timeService, Duration periodicWatermarkInterval) {
        long periodicWatermarkIntervalMillis;
        this.timestampAssigner = timestampAssigner;
        this.watermarksFactory = watermarksFactory;
        this.watermarksContext = watermarksContext;
        this.timeService = timeService;
        try {
            periodicWatermarkIntervalMillis = periodicWatermarkInterval.toMillis();
        }
        catch (ArithmeticException ignored) {
            periodicWatermarkIntervalMillis = Long.MAX_VALUE;
        }
        this.periodicWatermarkInterval = periodicWatermarkIntervalMillis;
    }

    @Override
    public ReaderOutput<T> createMainOutput(PushingAsyncDataInput.DataOutput<T> output) {
        Preconditions.checkState((this.currentMainOutput == null && this.currentPerSplitOutputs == null ? 1 : 0) != 0, (Object)"already created a main output");
        WatermarkToDataOutput watermarkOutput = new WatermarkToDataOutput(output);
        WatermarkGenerator watermarkGenerator = this.watermarksFactory.createWatermarkGenerator(this.watermarksContext);
        this.currentPerSplitOutputs = new SplitLocalOutputs(output, watermarkOutput, this.timestampAssigner, this.watermarksFactory, this.watermarksContext);
        this.currentMainOutput = new StreamingReaderOutput<T>(output, (WatermarkOutput)watermarkOutput, this.timestampAssigner, watermarkGenerator, this.currentPerSplitOutputs);
        return this.currentMainOutput;
    }

    @Override
    public void startPeriodicWatermarkEmits() {
        Preconditions.checkState((this.periodicEmitHandle == null ? 1 : 0) != 0, (Object)"periodic emitter already started");
        if (this.periodicWatermarkInterval == 0L) {
            return;
        }
        this.periodicEmitHandle = this.timeService.scheduleWithFixedDelay(this::triggerPeriodicEmit, this.periodicWatermarkInterval, this.periodicWatermarkInterval);
    }

    @Override
    public void stopPeriodicWatermarkEmits() {
        if (this.periodicEmitHandle != null) {
            this.periodicEmitHandle.cancel(false);
            this.periodicEmitHandle = null;
        }
    }

    void triggerPeriodicEmit(long wallClockTimestamp) {
        if (this.currentPerSplitOutputs != null) {
            this.currentPerSplitOutputs.emitPeriodicWatermark();
        }
        if (this.currentMainOutput != null) {
            this.currentMainOutput.emitPeriodicWatermark();
        }
    }

    private static final class SplitLocalOutputs<T> {
        private final WatermarkOutputMultiplexer watermarkMultiplexer;
        private final Map<String, SourceOutputWithWatermarks<T>> localOutputs;
        private final PushingAsyncDataInput.DataOutput<T> recordOutput;
        private final TimestampAssigner<T> timestampAssigner;
        private final WatermarkGeneratorSupplier<T> watermarksFactory;
        private final WatermarkGeneratorSupplier.Context watermarkContext;

        private SplitLocalOutputs(PushingAsyncDataInput.DataOutput<T> recordOutput, WatermarkOutput watermarkOutput, TimestampAssigner<T> timestampAssigner, WatermarkGeneratorSupplier<T> watermarksFactory, WatermarkGeneratorSupplier.Context watermarkContext) {
            this.recordOutput = recordOutput;
            this.timestampAssigner = timestampAssigner;
            this.watermarksFactory = watermarksFactory;
            this.watermarkContext = watermarkContext;
            this.watermarkMultiplexer = new WatermarkOutputMultiplexer(watermarkOutput);
            this.localOutputs = new LinkedHashMap<String, SourceOutputWithWatermarks<T>>();
        }

        SourceOutput<T> createOutputForSplit(String splitId) {
            SourceOutputWithWatermarks<T> previous = this.localOutputs.get(splitId);
            if (previous != null) {
                return previous;
            }
            this.watermarkMultiplexer.registerNewOutput(splitId);
            WatermarkOutput onEventOutput = this.watermarkMultiplexer.getImmediateOutput(splitId);
            WatermarkOutput periodicOutput = this.watermarkMultiplexer.getDeferredOutput(splitId);
            WatermarkGenerator watermarks = this.watermarksFactory.createWatermarkGenerator(this.watermarkContext);
            SourceOutputWithWatermarks<T> localOutput = SourceOutputWithWatermarks.createWithSeparateOutputs(this.recordOutput, onEventOutput, periodicOutput, this.timestampAssigner, watermarks);
            this.localOutputs.put(splitId, localOutput);
            return localOutput;
        }

        void releaseOutputForSplit(String splitId) {
            this.localOutputs.remove(splitId);
            this.watermarkMultiplexer.unregisterOutput(splitId);
        }

        void emitPeriodicWatermark() {
            for (SourceOutputWithWatermarks<T> output : this.localOutputs.values()) {
                output.emitPeriodicWatermark();
            }
            this.watermarkMultiplexer.onPeriodicEmit();
        }
    }

    private static final class StreamingReaderOutput<T>
    extends SourceOutputWithWatermarks<T>
    implements ReaderOutput<T> {
        private final SplitLocalOutputs<T> splitLocalOutputs;

        StreamingReaderOutput(PushingAsyncDataInput.DataOutput<T> output, WatermarkOutput watermarkOutput, TimestampAssigner<T> timestampAssigner, WatermarkGenerator<T> watermarkGenerator, SplitLocalOutputs<T> splitLocalOutputs) {
            super(output, watermarkOutput, watermarkOutput, timestampAssigner, watermarkGenerator);
            this.splitLocalOutputs = splitLocalOutputs;
        }

        public SourceOutput<T> createOutputForSplit(String splitId) {
            return this.splitLocalOutputs.createOutputForSplit(splitId);
        }

        public void releaseOutputForSplit(String splitId) {
            this.splitLocalOutputs.releaseOutputForSplit(splitId);
        }
    }
}

