/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.network.netty;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import org.apache.flink.runtime.io.network.TaskEventDispatcher;
import org.apache.flink.runtime.io.network.buffer.BufferPool;
import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
import org.apache.flink.runtime.io.network.netty.NettyMessage;
import org.apache.flink.runtime.io.network.netty.PartitionRequestQueue;
import org.apache.flink.runtime.io.network.netty.SequenceNumberingViewReader;
import org.apache.flink.runtime.io.network.partition.PartitionNotFoundException;
import org.apache.flink.runtime.io.network.partition.ResultPartitionProvider;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannelID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class PartitionRequestServerHandler
extends SimpleChannelInboundHandler<NettyMessage> {
    private static final Logger LOG = LoggerFactory.getLogger(PartitionRequestServerHandler.class);
    private final ResultPartitionProvider partitionProvider;
    private final TaskEventDispatcher taskEventDispatcher;
    private final PartitionRequestQueue outboundQueue;
    private final NetworkBufferPool networkBufferPool;
    private BufferPool bufferPool;

    PartitionRequestServerHandler(ResultPartitionProvider partitionProvider, TaskEventDispatcher taskEventDispatcher, PartitionRequestQueue outboundQueue, NetworkBufferPool networkBufferPool) {
        this.partitionProvider = partitionProvider;
        this.taskEventDispatcher = taskEventDispatcher;
        this.outboundQueue = outboundQueue;
        this.networkBufferPool = networkBufferPool;
    }

    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        super.channelRegistered(ctx);
        this.bufferPool = this.networkBufferPool.createBufferPool(1, false);
    }

    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
        super.channelUnregistered(ctx);
        if (this.bufferPool != null) {
            this.bufferPool.lazyDestroy();
        }
    }

    protected void channelRead0(ChannelHandlerContext ctx, NettyMessage msg) throws Exception {
        try {
            Class<?> msgClazz = msg.getClass();
            if (msgClazz == NettyMessage.PartitionRequest.class) {
                NettyMessage.PartitionRequest request = (NettyMessage.PartitionRequest)msg;
                LOG.debug("Read channel on {}: {}.", (Object)ctx.channel().localAddress(), (Object)request);
                try {
                    SequenceNumberingViewReader reader = new SequenceNumberingViewReader(request.receiverId, this.outboundQueue);
                    reader.requestSubpartitionView(this.partitionProvider, request.partitionId, request.queueIndex, this.bufferPool);
                }
                catch (PartitionNotFoundException notFound) {
                    this.respondWithError(ctx, notFound, request.receiverId);
                }
            } else if (msgClazz == NettyMessage.TaskEventRequest.class) {
                NettyMessage.TaskEventRequest request = (NettyMessage.TaskEventRequest)msg;
                if (!this.taskEventDispatcher.publish(request.partitionId, request.event)) {
                    this.respondWithError(ctx, new IllegalArgumentException("Task event receiver not found."), request.receiverId);
                }
            } else if (msgClazz == NettyMessage.CancelPartitionRequest.class) {
                NettyMessage.CancelPartitionRequest request = (NettyMessage.CancelPartitionRequest)msg;
                this.outboundQueue.cancel(request.receiverId);
            } else if (msgClazz == NettyMessage.CloseRequest.class) {
                this.outboundQueue.close();
            } else {
                LOG.warn("Received unexpected client request: {}", (Object)msg);
            }
        }
        catch (Throwable t) {
            this.respondWithError(ctx, t);
        }
    }

    private void respondWithError(ChannelHandlerContext ctx, Throwable error) {
        ctx.writeAndFlush((Object)new NettyMessage.ErrorResponse(error));
    }

    private void respondWithError(ChannelHandlerContext ctx, Throwable error, InputChannelID sourceId) {
        LOG.debug("Responding with error: {}.", error.getClass());
        ctx.writeAndFlush((Object)new NettyMessage.ErrorResponse(error, sourceId));
    }
}

