/*
 * Decompiled with CFR 0.152.
 */
package com.taobao.hsf.remoting.netty.server;

import com.taobao.eagleeye.EagleEye;
import com.taobao.hsf.logger.LoggerInit;
import com.taobao.hsf.remoting.BaseRequest;
import com.taobao.hsf.remoting.BaseResponse;
import com.taobao.hsf.remoting.Connection;
import com.taobao.hsf.remoting.RemotingRuntimeInfoHolder;
import com.taobao.hsf.remoting.ResponseStatus;
import com.taobao.hsf.remoting.netty.NettyConnection;
import com.taobao.hsf.remoting.server.ServerHandler;
import com.taobao.hsf.util.InetAddressUtil;
import com.taobao.middleware.logger.Logger;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.internal.chmv8.ConcurrentHashMapV8;
import java.net.ConnectException;
import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;

@ChannelHandler.Sharable
public class NettyServerHandler
extends SimpleChannelInboundHandler<Object> {
    private static final Logger LOGGER = LoggerInit.LOGGER;
    private static final String ERROR_FORMATTOR = "[HSF-Provider-" + InetAddressUtil.getIP() + "] Error log: {0}";
    private static final String RECEIVE_MESSAGE_ERROR = "receive message error,only support BaseRequest";
    private final ConcurrentMap<Channel, NettyConnection> channels = new ConcurrentHashMapV8();

    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        if (cause instanceof ConnectException) {
            return;
        }
        LOGGER.warn("########## catch some exception:", new Object[]{cause});
    }

    protected void channelRead0(ChannelHandlerContext ctx, Object message) throws Exception {
        if (!(message instanceof BaseRequest)) {
            LOGGER.warn("##########receive message error,only support BaseRequest");
            throw new Exception(RECEIVE_MESSAGE_ERROR);
        }
        this.handleRequest(ctx, message);
    }

    public List<Channel> getChannels() {
        return new ArrayList<Channel>(this.channels.keySet());
    }

    public List<Connection> getConnections() {
        return new ArrayList<Connection>(this.channels.values());
    }

    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        RemotingRuntimeInfoHolder.getInstance().increaseCountConnectionsAsServer();
        Channel channel = ctx.channel();
        this.channels.putIfAbsent(channel, new NettyConnection(channel));
        super.channelActive(ctx);
        LoggerInit.LOGGER_CONN.info(">>>>>S>>>>>" + ctx.channel());
    }

    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        RemotingRuntimeInfoHolder.getInstance().decreaseCountConnectionsAsServer();
        ctx.channel().close();
        this.channels.remove(ctx.channel());
        super.channelInactive(ctx);
        LoggerInit.LOGGER_CONN.info("<<<<<S<<<<<" + ctx.channel());
    }

    private void handleRequest(ChannelHandlerContext ctx, Object message) {
        Connection conneciton = (Connection)this.channels.get(ctx.channel());
        conneciton.refreshLastReadTime(System.currentTimeMillis());
        BaseRequest request = (BaseRequest)message;
        ServerHandler<BaseRequest> serverhandler = request.getServerHandler();
        Executor executor = serverhandler.getExecutor(request);
        if (executor == null) {
            serverhandler.handleRequest(request, conneciton, System.currentTimeMillis());
        } else {
            try {
                executor.execute(new HandlerRunnable(conneciton, request, serverhandler));
            }
            catch (Throwable t) {
                BaseResponse responseWrapper = request.createErrorResponse(MessageFormat.format(ERROR_FORMATTOR, "HSF thread pool is full."));
                responseWrapper.setStatus(ResponseStatus.THREADPOOL_BUSY);
                conneciton.writeReponseToChannel(responseWrapper);
            }
        }
    }

    private static class HandlerRunnable
    implements Runnable {
        private final Connection connection;
        private final BaseRequest message;
        private final ServerHandler<BaseRequest> serverHandler;
        private final long dispatchTime = System.currentTimeMillis();

        public HandlerRunnable(Connection conneciton, BaseRequest message, ServerHandler<BaseRequest> serverHandler) {
            this.connection = conneciton;
            this.message = message;
            this.serverHandler = serverHandler;
        }

        @Override
        public void run() {
            EagleEye.requestSize((long)this.message.size());
            this.serverHandler.handleRequest(this.message, this.connection, this.dispatchTime);
        }
    }
}

