/*
 * Decompiled with CFR 0.152.
 */
package com.taobao.hsf.remoting.netty.client;

import com.ali.com.google.common.collect.Maps;
import com.taobao.hsf.domain.HSFResponse;
import com.taobao.hsf.logger.LoggerInit;
import com.taobao.hsf.remoting.BaseRequest;
import com.taobao.hsf.remoting.BaseResponse;
import com.taobao.hsf.remoting.RemotingRuntimeInfoHolder;
import com.taobao.hsf.remoting.ResponseStatus;
import com.taobao.hsf.remoting.client.BaseRequestWrapperInterface;
import com.taobao.hsf.remoting.client.Client;
import com.taobao.hsf.remoting.client.RequestOfCleanInvalidCallback;
import com.taobao.hsf.remoting.client.RequestWrapperOfCallback;
import com.taobao.hsf.remoting.client.RequestWrapperOfFuture;
import com.taobao.hsf.remoting.client.SendCallBackListener;
import com.taobao.hsf.remoting.netty.NettySharedHolder;
import com.taobao.hsf.remoting.netty.client.NettyClientFactory;
import com.taobao.middleware.logger.Logger;
import io.netty.channel.Channel;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
import io.netty.util.concurrent.GenericFutureListener;
import java.net.ConnectException;
import java.util.Map;
import java.util.concurrent.TimeUnit;

public class NettyClientHandler
extends ChannelDuplexHandler {
    private static final String ERROR_TO_WRITE_CHANNEL = "Send request error when write to channel";
    private static final Logger LOGGER = LoggerInit.LOGGER;
    private final NettyClientFactory factory;
    private final Map<Long, BaseRequestWrapperInterface> mappers = Maps.newHashMap();

    public NettyClientHandler(NettyClientFactory factory) {
        this.factory = factory;
    }

    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (msg instanceof BaseResponse) {
            Client cientByChannel;
            BaseResponse response = (BaseResponse)msg;
            BaseRequestWrapperInterface object = this.mappers.remove(response.getRequestID());
            if (object != null) {
                object.onResponse(response);
            } else if (response.getRequestID() == -1L && response.getStatus() == ResponseStatus.SERVER_CLOSING && (cientByChannel = this.factory.getCientByChannel(ctx.channel())) != null) {
                cientByChannel.disableOut();
            }
        } else {
            LOGGER.warn("##########receive message error,only support BaseResponse");
            throw new Exception("receive message error,only support BaseResponse");
        }
    }

    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        if (msg instanceof RequestWrapperOfFuture) {
            this.sendRequestWithFuture(ctx, promise, (RequestWrapperOfFuture)msg);
        } else if (msg instanceof RequestWrapperOfCallback) {
            this.sendRequestWithCallback(ctx, promise, (RequestWrapperOfCallback)msg);
        } else if (msg instanceof RequestOfCleanInvalidCallback) {
            this.cleanupInvalidCallBack(ctx.channel(), ((RequestOfCleanInvalidCallback)msg).getJobStartTime());
        }
    }

    private void cleanupInvalidCallBack(Channel channel, long jobStartTime) {
        if (channel.isActive()) {
            for (BaseRequestWrapperInterface callback : this.mappers.values()) {
                if ((long)callback.getRequest().getTimeout() >= jobStartTime - callback.getStartTime()) continue;
                callback.cleanupInvalidCallBack();
            }
        } else {
            channel.close();
        }
    }

    private void sendRequestWithFuture(final ChannelHandlerContext ctx, ChannelPromise promise, final RequestWrapperOfFuture requestWrapperOfFuture) {
        ChannelFuture writeFuture = ctx.writeAndFlush((Object)requestWrapperOfFuture.getRequest(), promise);
        writeFuture.addListener((GenericFutureListener)new ChannelFutureListener(){

            public void operationComplete(ChannelFuture channelFuture) throws Exception {
                if (channelFuture.isSuccess()) {
                    NettyClientHandler.this.mappers.put(requestWrapperOfFuture.getRequest().getRequestID(), requestWrapperOfFuture);
                } else {
                    BaseResponse response = NettyClientHandler.this.handleExecptionCondition2(ctx, requestWrapperOfFuture.getRequest(), channelFuture);
                    requestWrapperOfFuture.set(response);
                    if (!ctx.channel().isActive()) {
                        NettyClientHandler.this.factory.remove(ctx.channel());
                    }
                }
            }
        });
    }

    private void sendRequestWithCallback(final ChannelHandlerContext ctx, ChannelPromise promise, final RequestWrapperOfCallback requestWrapperOfCallback) {
        final BaseRequest request = requestWrapperOfCallback.getRequest();
        this.startTiming(request, ctx);
        ChannelFuture writeFuture = ctx.writeAndFlush((Object)request, promise);
        writeFuture.addListener((GenericFutureListener)new ChannelFutureListener(){

            public void operationComplete(ChannelFuture channelFuture) throws Exception {
                if (channelFuture.isSuccess()) {
                    NettyClientHandler.this.mappers.put(request.getRequestID(), requestWrapperOfCallback);
                } else {
                    final HSFResponse response = NettyClientHandler.this.handleExecptionCondition(ctx, request, channelFuture);
                    final SendCallBackListener listener = requestWrapperOfCallback.getListener();
                    if (listener.getExecutor() != null) {
                        listener.getExecutor().execute(new Runnable(){

                            @Override
                            public void run() {
                                listener.onResponse(response);
                            }
                        });
                    } else {
                        listener.onResponse(response);
                    }
                }
            }
        });
    }

    private HSFResponse handleExecptionCondition(ChannelHandlerContext ctx, BaseRequest request, ChannelFuture channelFuture) {
        LOGGER.error("", "##########Send request error when write to channel" + ctx.channel(), channelFuture.cause());
        HSFResponse response = new HSFResponse();
        response.setClientErrorMsg(ERROR_TO_WRITE_CHANNEL);
        return response;
    }

    private BaseResponse handleExecptionCondition2(ChannelHandlerContext ctx, BaseRequest request, ChannelFuture channelFuture) {
        LOGGER.error("", "##########Send request error when write to channel" + ctx.channel(), channelFuture.cause());
        BaseResponse response = request.createErrorResponse(ERROR_TO_WRITE_CHANNEL);
        response.setStatus(ResponseStatus.COMM_ERROR);
        return response;
    }

    private void startTiming(final BaseRequest request, final ChannelHandlerContext ctx) {
        NettySharedHolder.timer.newTimeout(new TimerTask(){

            public void run(Timeout timeoutObject) throws Exception {
                ctx.channel().eventLoop().submit(new Runnable(){

                    @Override
                    public void run() {
                        RequestWrapperOfCallback callBackWrapper = (RequestWrapperOfCallback)NettyClientHandler.this.mappers.remove(request.getRequestID());
                        if (callBackWrapper != null) {
                            String errorInfo = "invalid call because of timeout:" + request.getTimeout();
                            LOGGER.warn("##########" + errorInfo);
                            HSFResponse response = new HSFResponse();
                            response.setClientErrorMsg(errorInfo);
                            callBackWrapper.getListener().onResponse(response);
                        }
                    }
                });
            }
        }, (long)request.getTimeout(), TimeUnit.MILLISECONDS);
    }

    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        if (cause instanceof ConnectException) {
            return;
        }
        LOGGER.warn("##########catch some exception:" + ctx.channel(), new Object[]{cause});
    }

    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        RemotingRuntimeInfoHolder.getInstance().increaseCountConnectionsAsClient();
        super.channelActive(ctx);
        LoggerInit.LOGGER_CONN.info(">>>>>C>>>>>" + ctx.channel());
    }

    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        LOGGER.info("##########connection closed: " + this.factory.getCientByChannel(ctx.channel()));
        RemotingRuntimeInfoHolder.getInstance().decreaseCountConnectionsAsClient();
        this.removeAllRequestCallBackWhenClose();
        this.factory.remove(ctx.channel());
        super.channelInactive(ctx);
        LoggerInit.LOGGER_CONN.info("<<<<<C<<<<<" + ctx.channel());
    }

    private void removeAllRequestCallBackWhenClose() {
        for (Map.Entry<Long, BaseRequestWrapperInterface> entry : this.mappers.entrySet()) {
            entry.getValue().removeAllRequestCallBackWhenClose();
        }
    }
}

