/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.network.partition;

import java.io.IOException;
import java.nio.ByteBuffer;
import javax.annotation.Nullable;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
import org.apache.flink.runtime.io.network.buffer.BufferCompressor;
import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
import org.apache.flink.runtime.io.network.buffer.BufferPool;
import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
import org.apache.flink.runtime.io.network.partition.ResultPartition;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.io.network.partition.ResultSubpartition;
import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
import org.apache.flink.runtime.metrics.TimerGauge;
import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.function.SupplierWithException;

public abstract class BufferWritingResultPartition
extends ResultPartition {
    protected final ResultSubpartition[] subpartitions;
    private final BufferBuilder[] unicastBufferBuilders;
    private BufferBuilder broadcastBufferBuilder;
    private TimerGauge backPressuredTimeMsPerSecond = new TimerGauge();

    public BufferWritingResultPartition(String owningTaskName, int partitionIndex, ResultPartitionID partitionId, ResultPartitionType partitionType, ResultSubpartition[] subpartitions, int numTargetKeyGroups, ResultPartitionManager partitionManager, @Nullable BufferCompressor bufferCompressor, SupplierWithException<BufferPool, IOException> bufferPoolFactory) {
        super(owningTaskName, partitionIndex, partitionId, partitionType, subpartitions.length, numTargetKeyGroups, partitionManager, bufferCompressor, bufferPoolFactory);
        this.subpartitions = (ResultSubpartition[])Preconditions.checkNotNull((Object)subpartitions);
        this.unicastBufferBuilders = new BufferBuilder[subpartitions.length];
    }

    @Override
    public void setup() throws IOException {
        super.setup();
        Preconditions.checkState((this.bufferPool.getNumberOfRequiredMemorySegments() >= this.getNumberOfSubpartitions() ? 1 : 0) != 0, (Object)"Bug in result partition setup logic: Buffer pool has not enough guaranteed buffers for this result partition.");
    }

    @Override
    public int getNumberOfQueuedBuffers() {
        int totalBuffers = 0;
        for (ResultSubpartition subpartition : this.subpartitions) {
            totalBuffers += subpartition.unsynchronizedGetNumberOfQueuedBuffers();
        }
        return totalBuffers;
    }

    @Override
    public int getNumberOfQueuedBuffers(int targetSubpartition) {
        Preconditions.checkArgument((targetSubpartition >= 0 && targetSubpartition < this.numSubpartitions ? 1 : 0) != 0);
        return this.subpartitions[targetSubpartition].unsynchronizedGetNumberOfQueuedBuffers();
    }

    protected void flushSubpartition(int targetSubpartition, boolean finishProducers) {
        if (finishProducers) {
            this.finishBroadcastBufferBuilder();
            this.finishUnicastBufferBuilder(targetSubpartition);
        }
        this.subpartitions[targetSubpartition].flush();
    }

    protected void flushAllSubpartitions(boolean finishProducers) {
        if (finishProducers) {
            this.finishBroadcastBufferBuilder();
            this.finishUnicastBufferBuilders();
        }
        for (ResultSubpartition subpartition : this.subpartitions) {
            subpartition.flush();
        }
    }

    @Override
    public void emitRecord(ByteBuffer record, int targetSubpartition) throws IOException {
        BufferBuilder buffer = this.appendUnicastDataForNewRecord(record, targetSubpartition);
        while (record.hasRemaining()) {
            this.finishUnicastBufferBuilder(targetSubpartition);
            buffer = this.appendUnicastDataForRecordContinuation(record, targetSubpartition);
        }
        if (buffer.isFull()) {
            this.finishUnicastBufferBuilder(targetSubpartition);
        }
    }

    @Override
    public void broadcastRecord(ByteBuffer record) throws IOException {
        BufferBuilder buffer = this.appendBroadcastDataForNewRecord(record);
        while (record.hasRemaining()) {
            this.finishBroadcastBufferBuilder();
            buffer = this.appendBroadcastDataForRecordContinuation(record);
        }
        if (buffer.isFull()) {
            this.finishBroadcastBufferBuilder();
        }
    }

    @Override
    public void broadcastEvent(AbstractEvent event, boolean isPriorityEvent) throws IOException {
        this.checkInProduceState();
        this.finishBroadcastBufferBuilder();
        this.finishUnicastBufferBuilders();
        try (BufferConsumer eventBufferConsumer = EventSerializer.toBufferConsumer(event, isPriorityEvent);){
            for (ResultSubpartition subpartition : this.subpartitions) {
                subpartition.add(eventBufferConsumer.copy(), 0);
            }
        }
    }

    @Override
    public void setMetricGroup(TaskIOMetricGroup metrics) {
        super.setMetricGroup(metrics);
        this.backPressuredTimeMsPerSecond = metrics.getBackPressuredTimePerSecond();
    }

    @Override
    public ResultSubpartitionView createSubpartitionView(int subpartitionIndex, BufferAvailabilityListener availabilityListener) throws IOException {
        Preconditions.checkElementIndex((int)subpartitionIndex, (int)this.numSubpartitions, (String)"Subpartition not found.");
        Preconditions.checkState((!this.isReleased() ? 1 : 0) != 0, (Object)"Partition released.");
        ResultSubpartition subpartition = this.subpartitions[subpartitionIndex];
        ResultSubpartitionView readView = subpartition.createReadView(availabilityListener);
        LOG.debug("Created {}", (Object)readView);
        return readView;
    }

    @Override
    public void finish() throws IOException {
        this.finishBroadcastBufferBuilder();
        this.finishUnicastBufferBuilders();
        for (ResultSubpartition subpartition : this.subpartitions) {
            subpartition.finish();
        }
        super.finish();
    }

    @Override
    protected void releaseInternal() {
        for (ResultSubpartition subpartition : this.subpartitions) {
            try {
                subpartition.release();
            }
            catch (Throwable t) {
                LOG.error("Error during release of result subpartition: " + t.getMessage(), t);
            }
        }
    }

    private BufferBuilder appendUnicastDataForNewRecord(ByteBuffer record, int targetSubpartition) throws IOException {
        if (targetSubpartition < 0 || targetSubpartition > this.unicastBufferBuilders.length) {
            throw new ArrayIndexOutOfBoundsException(targetSubpartition);
        }
        BufferBuilder buffer = this.unicastBufferBuilders[targetSubpartition];
        if (buffer == null) {
            buffer = this.requestNewUnicastBufferBuilder(targetSubpartition);
            this.subpartitions[targetSubpartition].add(buffer.createBufferConsumerFromBeginning(), 0);
        }
        buffer.appendAndCommit(record);
        return buffer;
    }

    private BufferBuilder appendUnicastDataForRecordContinuation(ByteBuffer remainingRecordBytes, int targetSubpartition) throws IOException {
        BufferBuilder buffer = this.requestNewUnicastBufferBuilder(targetSubpartition);
        int partialRecordBytes = buffer.appendAndCommit(remainingRecordBytes);
        this.subpartitions[targetSubpartition].add(buffer.createBufferConsumerFromBeginning(), partialRecordBytes);
        return buffer;
    }

    private BufferBuilder appendBroadcastDataForNewRecord(ByteBuffer record) throws IOException {
        BufferBuilder buffer = this.broadcastBufferBuilder;
        if (buffer == null) {
            buffer = this.requestNewBroadcastBufferBuilder();
            this.createBroadcastBufferConsumers(buffer, 0);
        }
        buffer.appendAndCommit(record);
        return buffer;
    }

    private BufferBuilder appendBroadcastDataForRecordContinuation(ByteBuffer remainingRecordBytes) throws IOException {
        BufferBuilder buffer = this.requestNewBroadcastBufferBuilder();
        int partialRecordBytes = buffer.appendAndCommit(remainingRecordBytes);
        this.createBroadcastBufferConsumers(buffer, partialRecordBytes);
        return buffer;
    }

    private void createBroadcastBufferConsumers(BufferBuilder buffer, int partialRecordBytes) throws IOException {
        try (BufferConsumer consumer = buffer.createBufferConsumerFromBeginning();){
            for (ResultSubpartition subpartition : this.subpartitions) {
                subpartition.add(consumer.copy(), partialRecordBytes);
            }
        }
    }

    private BufferBuilder requestNewUnicastBufferBuilder(int targetSubpartition) throws IOException {
        BufferBuilder bufferBuilder;
        this.checkInProduceState();
        this.ensureUnicastMode();
        this.unicastBufferBuilders[targetSubpartition] = bufferBuilder = this.requestNewBufferBuilderFromPool(targetSubpartition);
        return bufferBuilder;
    }

    private BufferBuilder requestNewBroadcastBufferBuilder() throws IOException {
        BufferBuilder bufferBuilder;
        this.checkInProduceState();
        this.ensureBroadcastMode();
        this.broadcastBufferBuilder = bufferBuilder = this.requestNewBufferBuilderFromPool(0);
        return bufferBuilder;
    }

    private BufferBuilder requestNewBufferBuilderFromPool(int targetSubpartition) throws IOException {
        BufferBuilder bufferBuilder = this.bufferPool.requestBufferBuilder(targetSubpartition);
        if (bufferBuilder != null) {
            return bufferBuilder;
        }
        this.backPressuredTimeMsPerSecond.markStart();
        try {
            bufferBuilder = this.bufferPool.requestBufferBuilderBlocking(targetSubpartition);
            this.backPressuredTimeMsPerSecond.markEnd();
            return bufferBuilder;
        }
        catch (InterruptedException e) {
            throw new IOException("Interrupted while waiting for buffer");
        }
    }

    private void finishUnicastBufferBuilder(int targetSubpartition) {
        BufferBuilder bufferBuilder = this.unicastBufferBuilders[targetSubpartition];
        if (bufferBuilder != null) {
            this.numBytesOut.inc((long)bufferBuilder.finish());
            this.numBuffersOut.inc();
            this.unicastBufferBuilders[targetSubpartition] = null;
        }
    }

    private void finishUnicastBufferBuilders() {
        for (int channelIndex = 0; channelIndex < this.numSubpartitions; ++channelIndex) {
            this.finishUnicastBufferBuilder(channelIndex);
        }
    }

    private void finishBroadcastBufferBuilder() {
        if (this.broadcastBufferBuilder != null) {
            this.numBytesOut.inc((long)(this.broadcastBufferBuilder.finish() * this.numSubpartitions));
            this.numBuffersOut.inc((long)this.numSubpartitions);
            this.broadcastBufferBuilder = null;
        }
    }

    private void ensureUnicastMode() {
        this.finishBroadcastBufferBuilder();
    }

    private void ensureBroadcastMode() {
        this.finishUnicastBufferBuilders();
    }

    @VisibleForTesting
    public TimerGauge getBackPressuredTimeMsPerSecond() {
        return this.backPressuredTimeMsPerSecond;
    }

    @VisibleForTesting
    public ResultSubpartition[] getAllPartitions() {
        return this.subpartitions;
    }
}

