/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.io;

import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.io.InputStatus;
import org.apache.flink.metrics.Counter;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.io.AbstractDataOutput;
import org.apache.flink.streaming.runtime.io.CheckpointedInputGate;
import org.apache.flink.streaming.runtime.io.PushingAsyncDataInput;
import org.apache.flink.streaming.runtime.io.StreamInputProcessor;
import org.apache.flink.streaming.runtime.io.StreamTaskInput;
import org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput;
import org.apache.flink.streaming.runtime.io.TwoInputSelectionHandler;
import org.apache.flink.streaming.runtime.metrics.WatermarkGauge;
import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve;
import org.apache.flink.streaming.runtime.streamstatus.StreamStatus;
import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer;
import org.apache.flink.streaming.runtime.tasks.OperatorChain;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.function.ThrowingConsumer;

/**
 * {@link StreamInputProcessor} that drives a {@link TwoInputStreamOperator} from two
 * {@link StreamTaskInput}s.
 *
 * <p>On every {@link #processInput()} call the processor asks the
 * {@link TwoInputSelectionHandler} which of the two inputs to read, emits the next element of
 * that input into the matching {@code DataOutput}, and tracks the resulting {@link InputStatus}
 * of each side. When an input reports {@link InputStatus#END_OF_INPUT} the operator chain is
 * notified through {@code endHeadOperatorInput}.
 *
 * <p>The class keeps plain mutable state without any synchronization; NOTE(review): presumably
 * it is only ever used from a single task thread - confirm against the caller.
 *
 * @param <IN1> element type of the first input
 * @param <IN2> element type of the second input
 */
@Internal
public final class StreamTwoInputProcessor<IN1, IN2> implements StreamInputProcessor {

    /**
     * Index value signalling that no input can currently be read. Mirrors the {@code -1} this
     * class compares the result of {@code selectNextInputIndex} against (presumably
     * {@code InputSelection.NONE_AVAILABLE} - TODO confirm).
     */
    private static final int NO_INPUT_AVAILABLE = -1;

    private final TwoInputSelectionHandler inputSelectionHandler;
    private final StreamTaskInput<IN1> input1;
    private final StreamTaskInput<IN2> input2;
    private final OperatorChain<?, ?> operatorChain;
    private final PushingAsyncDataInput.DataOutput<IN1> output1;
    private final PushingAsyncDataInput.DataOutput<IN2> output2;

    /** Status returned by the most recent {@code emitNext} call on the first input. */
    private InputStatus firstInputStatus = InputStatus.MORE_AVAILABLE;

    /** Status returned by the most recent {@code emitNext} call on the second input. */
    private InputStatus secondInputStatus = InputStatus.MORE_AVAILABLE;

    /** Last stream status emitted for the first input. */
    private StreamStatus firstStatus = StreamStatus.ACTIVE;

    /** Last stream status emitted for the second input. */
    private StreamStatus secondStatus = StreamStatus.ACTIVE;

    /**
     * Index (0 or 1) of the input read by the previous {@link #processInput()} call; handed to
     * the selection handler when choosing the next input. Starts at 1.
     */
    private int lastReadInputIndex = 1;

    /** Becomes {@code true} once the first-call preparation in {@link #processInput()} ran. */
    private boolean isPrepared;

    /**
     * Wires both inputs to the operator.
     *
     * @param checkpointedInputGates gates for input 1 (index 0) and input 2 (index 1)
     * @param inputSerializer1 serializer for records of the first input
     * @param inputSerializer2 serializer for records of the second input
     * @param ioManager I/O manager passed on to both network inputs
     * @param streamStatusMaintainer receives the combined stream status of both inputs
     * @param streamOperator operator that processes records, watermarks and latency markers
     * @param inputSelectionHandler decides which input is read next; must not be {@code null}
     * @param input1WatermarkGauge gauge updated with watermarks of the first input
     * @param input2WatermarkGauge gauge updated with watermarks of the second input
     * @param operatorChain chain notified when an input ends; must not be {@code null}
     * @param numRecordsIn counter incremented once per processed record
     */
    public StreamTwoInputProcessor(
            CheckpointedInputGate[] checkpointedInputGates,
            TypeSerializer<IN1> inputSerializer1,
            TypeSerializer<IN2> inputSerializer2,
            IOManager ioManager,
            StreamStatusMaintainer streamStatusMaintainer,
            TwoInputStreamOperator<IN1, IN2, ?> streamOperator,
            TwoInputSelectionHandler inputSelectionHandler,
            WatermarkGauge input1WatermarkGauge,
            WatermarkGauge input2WatermarkGauge,
            OperatorChain<?, ?> operatorChain,
            Counter numRecordsIn) {
        this.inputSelectionHandler = Preconditions.checkNotNull(inputSelectionHandler);

        this.output1 = new StreamTaskNetworkOutput<>(
                streamOperator,
                record -> processRecord1(record, streamOperator, numRecordsIn),
                streamStatusMaintainer,
                input1WatermarkGauge,
                0);
        this.output2 = new StreamTaskNetworkOutput<>(
                streamOperator,
                record -> processRecord2(record, streamOperator, numRecordsIn),
                streamStatusMaintainer,
                input2WatermarkGauge,
                1);

        CheckpointedInputGate firstGate = checkpointedInputGates[0];
        CheckpointedInputGate secondGate = checkpointedInputGates[1];
        this.input1 = new StreamTaskNetworkInput<>(
                firstGate,
                inputSerializer1,
                ioManager,
                new StatusWatermarkValve(firstGate.getNumberOfInputChannels(), this.output1),
                0);
        this.input2 = new StreamTaskNetworkInput<>(
                secondGate,
                inputSerializer2,
                ioManager,
                new StatusWatermarkValve(secondGate.getNumberOfInputChannels(), this.output2),
                1);

        this.operatorChain = Preconditions.checkNotNull(operatorChain);
    }

    /** Passes a record of the first input to the operator and runs the per-record follow-up. */
    private void processRecord1(
            StreamRecord<IN1> record,
            TwoInputStreamOperator<IN1, IN2, ?> streamOperator,
            Counter numRecordsIn) throws Exception {
        streamOperator.setKeyContextElement1(record);
        streamOperator.processElement1(record);
        postProcessRecord(numRecordsIn);
    }

    /** Passes a record of the second input to the operator and runs the per-record follow-up. */
    private void processRecord2(
            StreamRecord<IN2> record,
            TwoInputStreamOperator<IN1, IN2, ?> streamOperator,
            Counter numRecordsIn) throws Exception {
        streamOperator.setKeyContextElement2(record);
        streamOperator.processElement2(record);
        postProcessRecord(numRecordsIn);
    }

    /** Counts the record and asks the selection handler to refresh its selection. */
    private void postProcessRecord(Counter numRecordsIn) {
        numRecordsIn.inc();
        inputSelectionHandler.nextSelection();
    }

    /**
     * Returns a future for the availability of the currently selected input(s).
     *
     * @return the any-input future when both inputs are selected, otherwise the availability
     *     future of the single selected input
     */
    @Override
    public CompletableFuture<?> getAvailableFuture() {
        if (inputSelectionHandler.areAllInputsSelected()) {
            return isAnyInputAvailable();
        }
        StreamTaskInput<?> selectedInput =
                inputSelectionHandler.isFirstInputSelected() ? input1 : input2;
        return selectedInput.getAvailableFuture();
    }

    /**
     * Reads the next element from whichever input the selection handler picks.
     *
     * <p>The very first call also performs the initial selection; this preparation happens here
     * rather than in the constructor.
     *
     * @return {@link InputStatus#NOTHING_AVAILABLE} when no input could be selected, otherwise
     *     the combined status computed by {@link #getInputStatus()}
     * @throws Exception if reading, the operator, or the end-of-input notification fails
     */
    @Override
    public InputStatus processInput() throws Exception {
        final int readingInputIndex =
                isPrepared ? selectNextReadingInputIndex() : selectFirstReadingInputIndex();

        // Handle the sentinel on both paths. Guarding the prepared path with an assert only is
        // not enough: with assertions disabled a -1 would be treated as "second input", stored
        // in lastReadInputIndex and mapped to input id 0 when the input ends.
        if (readingInputIndex == NO_INPUT_AVAILABLE) {
            return InputStatus.NOTHING_AVAILABLE;
        }

        lastReadInputIndex = readingInputIndex;
        if (readingInputIndex == 0) {
            firstInputStatus = input1.emitNext(output1);
            checkFinished(firstInputStatus, lastReadInputIndex);
        } else {
            secondInputStatus = input2.emitNext(output2);
            checkFinished(secondInputStatus, lastReadInputIndex);
        }
        return getInputStatus();
    }

    /**
     * Prepares the snapshot on both inputs.
     *
     * @return a future that completes once the futures of both inputs have completed
     */
    @Override
    public CompletableFuture<Void> prepareSnapshot(
            ChannelStateWriter channelStateWriter, long checkpointId) throws IOException {
        return CompletableFuture.allOf(
                input1.prepareSnapshot(channelStateWriter, checkpointId),
                input2.prepareSnapshot(channelStateWriter, checkpointId));
    }

    /**
     * First-call variant of the input selection: refreshes the selection, marks the processor
     * as prepared and then selects like every later call.
     */
    private int selectFirstReadingInputIndex() throws IOException {
        inputSelectionHandler.nextSelection();
        isPrepared = true;
        return selectNextReadingInputIndex();
    }

    /**
     * If {@code status} is {@link InputStatus#END_OF_INPUT}, tells the operator chain that the
     * input ended and refreshes the selection.
     */
    private void checkFinished(InputStatus status, int inputIndex) throws Exception {
        if (status == InputStatus.END_OF_INPUT) {
            operatorChain.endHeadOperatorInput(getInputId(inputIndex));
            inputSelectionHandler.nextSelection();
        }
    }

    /**
     * Combines the two per-input statuses into the status reported to the caller.
     *
     * <ul>
     *   <li>both inputs ended: {@code END_OF_INPUT};
     *   <li>both inputs selected: {@code MORE_AVAILABLE} if either side has more, otherwise
     *       {@code NOTHING_AVAILABLE};
     *   <li>one input selected: that input's status, or the other input's status when the
     *       selected one has already ended.
     * </ul>
     */
    private InputStatus getInputStatus() {
        if (firstInputStatus == InputStatus.END_OF_INPUT
                && secondInputStatus == InputStatus.END_OF_INPUT) {
            return InputStatus.END_OF_INPUT;
        }

        if (inputSelectionHandler.areAllInputsSelected()) {
            boolean anyHasMore = firstInputStatus == InputStatus.MORE_AVAILABLE
                    || secondInputStatus == InputStatus.MORE_AVAILABLE;
            return anyHasMore ? InputStatus.MORE_AVAILABLE : InputStatus.NOTHING_AVAILABLE;
        }

        InputStatus selectedStatus =
                inputSelectionHandler.isFirstInputSelected() ? firstInputStatus : secondInputStatus;
        InputStatus otherStatus =
                inputSelectionHandler.isFirstInputSelected() ? secondInputStatus : firstInputStatus;
        return selectedStatus == InputStatus.END_OF_INPUT ? otherStatus : selectedStatus;
    }

    /**
     * Closes both inputs. The second input is closed even if closing the first one failed;
     * failures are merged via {@code ExceptionUtils.firstOrSuppressed} and rethrown afterwards.
     *
     * @throws IOException if closing either input failed
     */
    @Override
    public void close() throws IOException {
        IOException failure = null;
        try {
            input1.close();
        } catch (IOException e) {
            failure = ExceptionUtils.firstOrSuppressed(e, failure);
        }
        try {
            input2.close();
        } catch (IOException e) {
            failure = ExceptionUtils.firstOrSuppressed(e, failure);
        }
        if (failure != null) {
            throw failure;
        }
    }

    /**
     * Refreshes the availability flags, validates the selection and asks the handler for the
     * next input to read.
     *
     * @return 0 or 1 for the input to read, or {@code -1} when the handler selects none
     * @throws IOException if the only selected input has already finished
     */
    private int selectNextReadingInputIndex() throws IOException {
        updateAvailability();
        checkInputSelectionAgainstIsFinished();

        int readingInputIndex = inputSelectionHandler.selectNextInputIndex(lastReadInputIndex);
        if (readingInputIndex == NO_INPUT_AVAILABLE) {
            return NO_INPUT_AVAILABLE;
        }

        if (inputSelectionHandler.shouldSetAvailableForAnotherInput()) {
            // 1 - index is the index of the input that was not chosen.
            checkAndSetAvailable(1 - readingInputIndex);
        }
        return readingInputIndex;
    }

    /**
     * Fails when exactly one input is selected and that input has already reached its end,
     * because no further progress would be possible.
     */
    private void checkInputSelectionAgainstIsFinished() throws IOException {
        if (inputSelectionHandler.areAllInputsSelected()) {
            return;
        }
        if (inputSelectionHandler.isFirstInputSelected()
                && firstInputStatus == InputStatus.END_OF_INPUT) {
            throw new IOException("Can not make a progress: only first input is selected but it is already finished");
        }
        if (inputSelectionHandler.isSecondInputSelected()
                && secondInputStatus == InputStatus.END_OF_INPUT) {
            throw new IOException("Can not make a progress: only second input is selected but it is already finished");
        }
    }

    /** Refreshes the handler's availability flag of both inputs. */
    private void updateAvailability() {
        updateAvailability(firstInputStatus, input1);
        updateAvailability(secondInputStatus, input2);
    }

    /**
     * Marks {@code input} available when its last status was {@code MORE_AVAILABLE}, or when it
     * has not ended and reports itself approximately available; marks it unavailable otherwise.
     */
    private void updateAvailability(InputStatus status, StreamTaskInput<?> input) {
        boolean available = status == InputStatus.MORE_AVAILABLE
                || (status != InputStatus.END_OF_INPUT && input.isApproximatelyAvailable());
        if (available) {
            inputSelectionHandler.setAvailableInput(input.getInputIndex());
        } else {
            inputSelectionHandler.setUnavailableInput(input.getInputIndex());
        }
    }

    /**
     * Marks the given input available in the handler if it has not ended and currently reports
     * itself available. Never marks an input unavailable.
     */
    private void checkAndSetAvailable(int inputIndex) {
        InputStatus status = inputIndex == 0 ? firstInputStatus : secondInputStatus;
        if (status == InputStatus.END_OF_INPUT) {
            return;
        }
        if (getInput(inputIndex).isAvailable()) {
            inputSelectionHandler.setAvailableInput(inputIndex);
        }
    }

    /**
     * Availability future used while both inputs are selected: once one input has ended only
     * the other one counts; otherwise the result is already complete if either input is
     * approximately available, else it completes when either input's future completes.
     */
    private CompletableFuture<?> isAnyInputAvailable() {
        if (firstInputStatus == InputStatus.END_OF_INPUT) {
            return input2.getAvailableFuture();
        }
        if (secondInputStatus == InputStatus.END_OF_INPUT) {
            return input1.getAvailableFuture();
        }
        if (input1.isApproximatelyAvailable() || input2.isApproximatelyAvailable()) {
            return AVAILABLE;
        }
        return CompletableFuture.anyOf(input1.getAvailableFuture(), input2.getAvailableFuture());
    }

    /** Maps input index 0 to the first input and any other index to the second input. */
    private StreamTaskInput<?> getInput(int inputIndex) {
        return inputIndex == 0 ? input1 : input2;
    }

    /** Converts the 0-based input index into the 1-based id passed to the operator chain. */
    private int getInputId(int inputIndex) {
        return inputIndex + 1;
    }

    /**
     * {@code DataOutput} for one of the two inputs. It forwards records to the supplied
     * consumer, routes watermarks and latency markers to the operator method of its input, and
     * folds the stream status of both inputs into the shared {@link StreamStatusMaintainer}.
     *
     * <p>Deliberately a non-static inner class: it reads and writes the enclosing processor's
     * {@code firstStatus} and {@code secondStatus} fields.
     */
    private class StreamTaskNetworkOutput<T> extends AbstractDataOutput<T> {
        private final TwoInputStreamOperator<IN1, IN2, ?> operator;
        private final ThrowingConsumer<StreamRecord<T>, Exception> recordConsumer;
        private final WatermarkGauge inputWatermarkGauge;

        /** 0 for the first input, 1 for the second input. */
        private final int inputIndex;

        private StreamTaskNetworkOutput(
                TwoInputStreamOperator<IN1, IN2, ?> operator,
                ThrowingConsumer<StreamRecord<T>, Exception> recordConsumer,
                StreamStatusMaintainer streamStatusMaintainer,
                WatermarkGauge inputWatermarkGauge,
                int inputIndex) {
            super(streamStatusMaintainer);
            this.operator = Preconditions.checkNotNull(operator);
            this.recordConsumer = Preconditions.checkNotNull(recordConsumer);
            this.inputWatermarkGauge = Preconditions.checkNotNull(inputWatermarkGauge);
            this.inputIndex = inputIndex;
        }

        @Override
        public void emitRecord(StreamRecord<T> record) throws Exception {
            recordConsumer.accept(record);
        }

        /** Updates this input's watermark gauge, then forwards the watermark to the operator. */
        @Override
        public void emitWatermark(Watermark watermark) throws Exception {
            inputWatermarkGauge.setCurrentWatermark(watermark.getTimestamp());
            if (inputIndex == 0) {
                operator.processWatermark1(watermark);
            } else {
                operator.processWatermark2(watermark);
            }
        }

        /**
         * Records the new status of this input and updates the maintainer when it differs from
         * the maintainer's current status: an active status is applied right away, an idle one
         * only if the other input is idle as well.
         */
        @Override
        public void emitStreamStatus(StreamStatus streamStatus) {
            final StreamStatus otherInputStatus;
            if (inputIndex == 0) {
                StreamTwoInputProcessor.this.firstStatus = streamStatus;
                otherInputStatus = StreamTwoInputProcessor.this.secondStatus;
            } else {
                StreamTwoInputProcessor.this.secondStatus = streamStatus;
                otherInputStatus = StreamTwoInputProcessor.this.firstStatus;
            }

            if (streamStatus.equals(streamStatusMaintainer.getStreamStatus())) {
                return;
            }
            if (streamStatus.isActive()) {
                streamStatusMaintainer.toggleStreamStatus(StreamStatus.ACTIVE);
            } else if (otherInputStatus.isIdle()) {
                streamStatusMaintainer.toggleStreamStatus(StreamStatus.IDLE);
            }
        }

        @Override
        public void emitLatencyMarker(LatencyMarker latencyMarker) throws Exception {
            if (inputIndex == 0) {
                operator.processLatencyMarker1(latencyMarker);
            } else {
                operator.processLatencyMarker2(latencyMarker);
            }
        }
    }
}

