/*
 * Decompiled with CFR 0.152.
 */
package io.seata.core.rpc.netty;

import io.netty.channel.Channel;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.util.concurrent.EventExecutorGroup;
import io.seata.common.exception.FrameworkErrorCode;
import io.seata.common.exception.FrameworkException;
import io.seata.common.thread.NamedThreadFactory;
import io.seata.common.util.CollectionUtils;
import io.seata.common.util.NetUtil;
import io.seata.common.util.StringUtils;
import io.seata.core.protocol.AbstractMessage;
import io.seata.core.protocol.HeartbeatMessage;
import io.seata.core.protocol.MergeMessage;
import io.seata.core.protocol.MergedWarpMessage;
import io.seata.core.protocol.MessageFuture;
import io.seata.core.protocol.RpcMessage;
import io.seata.core.protocol.transaction.AbstractGlobalEndRequest;
import io.seata.core.protocol.transaction.BranchRegisterRequest;
import io.seata.core.protocol.transaction.BranchReportRequest;
import io.seata.core.protocol.transaction.GlobalBeginRequest;
import io.seata.core.rpc.RemotingClient;
import io.seata.core.rpc.TransactionMessageHandler;
import io.seata.core.rpc.netty.AbstractNettyRemoting;
import io.seata.core.rpc.netty.NettyClientBootstrap;
import io.seata.core.rpc.netty.NettyClientChannelManager;
import io.seata.core.rpc.netty.NettyClientConfig;
import io.seata.core.rpc.netty.NettyPoolKey;
import io.seata.core.rpc.netty.NettyPoolableFactory;
import io.seata.core.rpc.processor.Pair;
import io.seata.core.rpc.processor.RemotingProcessor;
import io.seata.discovery.loadbalance.LoadBalanceFactory;
import io.seata.discovery.registry.RegistryFactory;
import java.lang.reflect.Field;
import java.net.InetSocketAddress;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class AbstractNettyRemotingClient
extends AbstractNettyRemoting
implements RemotingClient {
    private static final Logger LOGGER = LoggerFactory.getLogger(AbstractNettyRemotingClient.class);

    /** Prefix put in front of every merged message id in the debug dump. */
    private static final String MSG_ID_PREFIX = "msgId:";
    /** Prefix put in front of every pending future id in the debug dump. */
    private static final String FUTURES_PREFIX = "futures:";
    /** Separator appended after each id in the debug dump. */
    private static final String SINGLE_LOG_POSTFIX = ";";
    /** Longest time, in milliseconds, the merged-send worker sleeps between two sweeps. */
    private static final int MAX_MERGE_SEND_MILLS = 1;
    /** Separator between the merge thread prefix and the transaction role name. */
    private static final String THREAD_PREFIX_SPLIT_CHAR = "_";
    /** Number of threads (core and max) of the merged-send executor. */
    private static final int MAX_MERGE_SEND_THREAD = 1;
    /** Keep-alive time, in milliseconds, of the merged-send executor. */
    private static final long KEEP_ALIVE_TIME = Integer.MAX_VALUE;
    /** Initial delay, in milliseconds, before the first scheduled reconnect. */
    private static final long SCHEDULE_DELAY_MILLS = 60000L;
    /** Interval, in milliseconds, between two scheduled reconnects. */
    private static final long SCHEDULE_INTERVAL_MILLS = 10000L;
    /** Name prefix of the merged-send worker thread. */
    private static final String MERGE_THREAD_PREFIX = "rpcMergeMessageSend";

    /** Monitor the merged-send worker waits on; senders notify it after queuing a request. */
    protected final Object mergeLock = new Object();
    /** Merge messages that were sent asynchronously, keyed by RPC message id. */
    protected final Map<Integer, MergeMessage> mergeMsgMap = new ConcurrentHashMap<>();
    /** Requests waiting to be merged and sent, one queue per server address. */
    protected final ConcurrentHashMap<String, BlockingQueue<RpcMessage>> basketMap = new ConcurrentHashMap<>();
    private final NettyClientBootstrap clientBootstrap;
    private NettyClientChannelManager clientChannelManager;
    private final NettyPoolKey.TransactionRole transactionRole;
    /** Executor running the merged-send worker; stays {@code null} unless batch sending is enabled in {@code init()}. */
    private ExecutorService mergeSendExecutorService;
    private TransactionMessageHandler transactionMessageHandler;
    // NOTE(review): not read anywhere in this class; presumably maintained by subclasses - confirm.
    protected volatile boolean enableClientBatchSendRequest;

    /**
     * Starts the client.
     *
     * <p>Schedules a periodic reconnect of the transaction service group, starts the
     * merged-send worker when batch sending is enabled, runs the parent initialization
     * and finally starts the Netty client bootstrap.
     */
    @Override
    public void init() {
        timerExecutor.scheduleAtFixedRate(
            () -> clientChannelManager.reconnect(getTransactionServiceGroup()),
            SCHEDULE_DELAY_MILLS,
            SCHEDULE_INTERVAL_MILLS,
            TimeUnit.MILLISECONDS);

        if (isEnableClientBatchSendRequest()) {
            // Single dedicated thread that keeps draining the per-address baskets.
            mergeSendExecutorService = new ThreadPoolExecutor(
                MAX_MERGE_SEND_THREAD,
                MAX_MERGE_SEND_THREAD,
                KEEP_ALIVE_TIME,
                TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(),
                new NamedThreadFactory(getThreadPrefix(), MAX_MERGE_SEND_THREAD));
            mergeSendExecutorService.submit(new MergedSendRunnable());
        }

        super.init();
        clientBootstrap.start();
    }

    public AbstractNettyRemotingClient(NettyClientConfig nettyClientConfig, EventExecutorGroup eventExecutorGroup, ThreadPoolExecutor messageExecutor, NettyPoolKey.TransactionRole transactionRole) {
        super(messageExecutor);
        this.transactionRole = transactionRole;
        this.clientBootstrap = new NettyClientBootstrap(nettyClientConfig, eventExecutorGroup, transactionRole);
        this.clientBootstrap.setChannelHandlers(new ChannelHandler[]{new ClientHandler()});
        this.clientChannelManager = new NettyClientChannelManager(new NettyPoolableFactory(this, this.clientBootstrap), this.getPoolKeyFunction(), nettyClientConfig);
    }

    /**
     * Sends a request to a load-balanced server and waits for the response.
     *
     * <p>When batch sending is enabled the request is queued in the per-address basket and
     * sent by the merged-send worker; otherwise it is sent directly on an acquired channel.
     *
     * @param msg the request body
     * @return the response, or {@code null} when the request could not be queued
     * @throws TimeoutException when no response arrives within the RPC request timeout
     */
    @Override
    public Object sendSyncRequest(Object msg) throws TimeoutException {
        String serverAddress = loadBalance(getTransactionServiceGroup(), msg);
        long timeoutMillis = getRpcRequestTimeout();
        RpcMessage rpcMessage = buildRequestMessage(msg, (byte)0);

        if (!isEnableClientBatchSendRequest()) {
            Channel channel = clientChannelManager.acquireChannel(serverAddress);
            return super.sendSync(channel, rpcMessage, timeoutMillis);
        }

        // Register the future before queuing so a fast response always finds it.
        MessageFuture messageFuture = new MessageFuture();
        messageFuture.setRequestMessage(rpcMessage);
        messageFuture.setTimeout(timeoutMillis);
        futures.put(rpcMessage.getId(), messageFuture);

        BlockingQueue<RpcMessage> basket = CollectionUtils.computeIfAbsent(basketMap, serverAddress, key -> new LinkedBlockingQueue<>());
        if (!basket.offer(rpcMessage)) {
            // The request will never be sent, so nobody will ever complete this future: drop it.
            futures.remove(rpcMessage.getId());
            LOGGER.error("put message into basketMap offer failed, serverAddress:{},rpcMessage:{}", (Object)serverAddress, (Object)rpcMessage);
            return null;
        }
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("offer message: {}", rpcMessage.getBody());
        }

        // Wake the merged-send worker unless it is already in the middle of a sweep.
        if (!isSending) {
            synchronized (mergeLock) {
                mergeLock.notifyAll();
            }
        }

        try {
            return messageFuture.get(timeoutMillis, TimeUnit.MILLISECONDS);
        }
        catch (Exception exx) {
            LOGGER.error("wait response error:{},ip:{},request:{}", exx.getMessage(), serverAddress, rpcMessage.getBody());
            if (exx instanceof InterruptedException) {
                // Keep the interrupt visible to the caller instead of silently clearing it.
                Thread.currentThread().interrupt();
            }
            if (exx instanceof TimeoutException) {
                throw (TimeoutException)exx;
            }
            throw new RuntimeException(exx);
        }
    }

    /**
     * Sends a request on the given channel and waits for the response.
     *
     * @param channel the channel to send on; when {@code null} nothing is sent
     * @param msg     the request body
     * @return the response, or {@code null} when {@code channel} is {@code null}
     * @throws TimeoutException when no response arrives within the RPC request timeout
     */
    @Override
    public Object sendSyncRequest(Channel channel, Object msg) throws TimeoutException {
        if (channel != null) {
            return super.sendSync(channel, buildRequestMessage(msg, (byte)0), getRpcRequestTimeout());
        }
        LOGGER.warn("sendSyncRequest nothing, caused by null channel.");
        return null;
    }

    /**
     * Sends a request on the given channel without waiting for a response.
     *
     * <p>Heartbeat messages are built with message type 3, everything else with type 2.
     * A {@link MergeMessage} body is remembered in {@code mergeMsgMap} under the message id.
     *
     * @param channel the channel to send on; when {@code null} nothing is sent
     * @param msg     the request body
     */
    @Override
    public void sendAsyncRequest(Channel channel, Object msg) {
        if (channel == null) {
            LOGGER.warn("sendAsyncRequest nothing, caused by null channel.");
            return;
        }
        byte messageType = msg instanceof HeartbeatMessage ? (byte)3 : (byte)2;
        RpcMessage rpcMessage = buildRequestMessage(msg, messageType);
        Object body = rpcMessage.getBody();
        if (body instanceof MergeMessage) {
            mergeMsgMap.put(rpcMessage.getId(), (MergeMessage)body);
        }
        super.sendAsync(channel, rpcMessage);
    }

    /**
     * Builds a response for {@code rpcMessage} (message type 1) and sends it asynchronously
     * on a channel acquired for {@code serverAddress}.
     *
     * @param serverAddress address of the server the response goes to
     * @param rpcMessage    the request being answered
     * @param msg           the response body
     */
    @Override
    public void sendAsyncResponse(String serverAddress, RpcMessage rpcMessage, Object msg) {
        RpcMessage response = buildResponseMessage(rpcMessage, msg, (byte)1);
        Channel target = clientChannelManager.acquireChannel(serverAddress);
        super.sendAsync(target, response);
    }

    /**
     * Registers a processor together with the executor it is paired with for a request code.
     *
     * @param requestCode the request code used as key in the processor table
     * @param processor   the processor for that code
     * @param executor    the executor paired with the processor
     */
    @Override
    public void registerProcessor(int requestCode, RemotingProcessor processor, ExecutorService executor) {
        processorTable.put(requestCode, new Pair<>(processor, executor));
    }

    /**
     * Delegates destruction of the channel to the channel manager.
     *
     * @param serverAddress address the channel is connected to
     * @param channel       the channel to destroy
     */
    @Override
    public void destroyChannel(String serverAddress, Channel channel) {
        clientChannelManager.destroyChannel(serverAddress, channel);
    }

    /**
     * Shuts down the Netty bootstrap and the merged-send executor (if it was started),
     * then runs the parent clean-up.
     */
    @Override
    public void destroy() {
        clientBootstrap.shutdown();
        if (mergeSendExecutorService != null) {
            // The merged-send task loops forever, and shutdown() only rejects new tasks without
            // interrupting a running one. shutdownNow() delivers the interrupt the worker needs
            // in order to be able to stop.
            mergeSendExecutorService.shutdownNow();
        }
        super.destroy();
    }

    /**
     * Sets the transaction message handler held by this client.
     *
     * @param transactionMessageHandler the handler to store
     */
    public void setTransactionMessageHandler(TransactionMessageHandler transactionMessageHandler) {
        this.transactionMessageHandler = transactionMessageHandler;
    }

    /**
     * Returns the transaction message handler held by this client.
     *
     * @return the stored handler; {@code null} until one has been set
     */
    public TransactionMessageHandler getTransactionMessageHandler() {
        return transactionMessageHandler;
    }

    /**
     * Returns the channel manager created in the constructor.
     *
     * @return the client channel manager
     */
    public NettyClientChannelManager getClientChannelManager() {
        return clientChannelManager;
    }

    /**
     * Picks a server address for the given message out of the alive servers of a
     * transaction service group.
     *
     * @param transactionServiceGroup the service group to look up in the registry
     * @param msg                     the message about to be sent; passed on to {@link #doSelect}
     * @return the selected address in string form
     * @throws FrameworkException with {@code NoAvailableService} when the lookup or selection
     *                            fails or yields no address
     */
    protected String loadBalance(String transactionServiceGroup, Object msg) {
        InetSocketAddress address = null;
        try {
            List<InetSocketAddress> inetSocketAddressList = RegistryFactory.getInstance().aliveLookup(transactionServiceGroup);
            address = doSelect(inetSocketAddressList, msg);
        }
        catch (Exception ex) {
            // Log the throwable itself so the stack trace of the lookup failure is not lost.
            LOGGER.error(ex.getMessage(), ex);
        }
        if (address == null) {
            throw new FrameworkException(FrameworkErrorCode.NoAvailableService);
        }
        return NetUtil.toStringAddress(address);
    }

    /**
     * Selects one address from the candidate list.
     *
     * @param list candidate addresses; may be {@code null} or empty
     * @param msg  the message about to be sent; its xid is handed to the load balancer
     * @return {@code null} for an empty list, the only element for a singleton list,
     *         otherwise the load balancer's choice
     * @throws Exception when the load balancer fails
     */
    protected InetSocketAddress doSelect(List<InetSocketAddress> list, Object msg) throws Exception {
        if (!CollectionUtils.isNotEmpty(list)) {
            return null;
        }
        if (list.size() <= 1) {
            return list.get(0);
        }
        // Only consult the load balancer when there is an actual choice to make.
        return LoadBalanceFactory.getInstance().select(list, getXid(msg));
    }

    /**
     * Derives the key handed to the load balancer for a message.
     *
     * <p>Known request types supply their xid (or, for {@link GlobalBeginRequest}, the
     * transaction name). Any other message is probed reflectively for a field named
     * {@code xid} declared on its own class. When nothing usable is found a random
     * non-negative number is returned instead.
     *
     * @param msg the message about to be sent; may be {@code null}
     * @return a non-blank key
     */
    protected String getXid(Object msg) {
        String xid = "";
        if (msg instanceof AbstractGlobalEndRequest) {
            xid = ((AbstractGlobalEndRequest)msg).getXid();
        } else if (msg instanceof GlobalBeginRequest) {
            xid = ((GlobalBeginRequest)msg).getTransactionName();
        } else if (msg instanceof BranchRegisterRequest) {
            xid = ((BranchRegisterRequest)msg).getXid();
        } else if (msg instanceof BranchReportRequest) {
            xid = ((BranchReportRequest)msg).getXid();
        } else if (msg != null) {
            try {
                Field field = msg.getClass().getDeclaredField("xid");
                // Without this, reading a non-public field always throws and the xid is never used.
                field.setAccessible(true);
                Object value = field.get(msg);
                // A null field must not turn into the non-blank literal "null".
                if (value != null) {
                    xid = String.valueOf(value);
                }
            }
            catch (Exception ignored) {
                // No readable xid field on this message type: fall back to the random key below.
            }
        }
        return StringUtils.isBlank(xid) ? String.valueOf(ThreadLocalRandom.current().nextLong(Long.MAX_VALUE)) : xid;
    }

    /**
     * Builds the name prefix of the merged-send thread from the merge prefix, the split
     * character and the transaction role name.
     *
     * @return the thread name prefix, e.g. {@code rpcMergeMessageSend_<ROLE>}
     */
    private String getThreadPrefix() {
        return MERGE_THREAD_PREFIX + THREAD_PREFIX_SPLIT_CHAR + transactionRole.name();
    }

    /**
     * Supplies the function handed to the channel manager in the constructor.
     *
     * @return a function producing a {@link NettyPoolKey} from a string
     *         (presumably the server address - confirm against NettyClientChannelManager)
     */
    protected abstract Function<String, NettyPoolKey> getPoolKeyFunction();

    /**
     * Supplies the transaction service group used for the scheduled reconnect and for
     * load balancing of synchronous requests.
     *
     * @return the transaction service group name
     */
    protected abstract String getTransactionServiceGroup();

    /**
     * Tells whether requests are queued and sent merged by a background worker instead of
     * being sent one by one.
     *
     * @return {@code true} when batch sending is enabled
     */
    protected abstract boolean isEnableClientBatchSendRequest();

    /**
     * Supplies the timeout applied to synchronous requests.
     *
     * @return the timeout in milliseconds
     */
    protected abstract long getRpcRequestTimeout();

    @ChannelHandler.Sharable
    class ClientHandler
    extends ChannelDuplexHandler {
        ClientHandler() {
        }

        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            if (!(msg instanceof RpcMessage)) {
                return;
            }
            AbstractNettyRemotingClient.this.processMessage(ctx, (RpcMessage)msg);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void channelWritabilityChanged(ChannelHandlerContext ctx) {
            Object object = AbstractNettyRemotingClient.this.lock;
            synchronized (object) {
                if (ctx.channel().isWritable()) {
                    AbstractNettyRemotingClient.this.lock.notifyAll();
                }
            }
            ctx.fireChannelWritabilityChanged();
        }

        public void channelInactive(ChannelHandlerContext ctx) throws Exception {
            if (AbstractNettyRemotingClient.this.messageExecutor.isShutdown()) {
                return;
            }
            if (LOGGER.isInfoEnabled()) {
                LOGGER.info("channel inactive: {}", (Object)ctx.channel());
            }
            AbstractNettyRemotingClient.this.clientChannelManager.releaseChannel(ctx.channel(), NetUtil.toStringAddress(ctx.channel().remoteAddress()));
            super.channelInactive(ctx);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
            if (evt instanceof IdleStateEvent) {
                IdleStateEvent idleStateEvent = (IdleStateEvent)evt;
                if (idleStateEvent.state() == IdleState.READER_IDLE) {
                    if (LOGGER.isInfoEnabled()) {
                        LOGGER.info("channel {} read idle.", (Object)ctx.channel());
                    }
                    try {
                        String serverAddress = NetUtil.toStringAddress(ctx.channel().remoteAddress());
                        AbstractNettyRemotingClient.this.clientChannelManager.invalidateObject(serverAddress, ctx.channel());
                    }
                    catch (Exception exx) {
                        LOGGER.error(exx.getMessage());
                    }
                    finally {
                        AbstractNettyRemotingClient.this.clientChannelManager.releaseChannel(ctx.channel(), AbstractNettyRemotingClient.this.getAddressFromContext(ctx));
                    }
                }
                if (idleStateEvent == IdleStateEvent.WRITER_IDLE_STATE_EVENT) {
                    try {
                        if (LOGGER.isDebugEnabled()) {
                            LOGGER.debug("will send ping msg,channel {}", (Object)ctx.channel());
                        }
                        AbstractNettyRemotingClient.this.sendAsyncRequest(ctx.channel(), HeartbeatMessage.PING);
                    }
                    catch (Throwable throwable) {
                        LOGGER.error("send request error: {}", (Object)throwable.getMessage(), (Object)throwable);
                    }
                }
            }
        }

        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            LOGGER.error(FrameworkErrorCode.ExceptionCaught.getErrCode(), (Object)(NetUtil.toStringAddress(ctx.channel().remoteAddress()) + "connect exception. " + cause.getMessage()), (Object)cause);
            AbstractNettyRemotingClient.this.clientChannelManager.releaseChannel(ctx.channel(), AbstractNettyRemotingClient.this.getAddressFromChannel(ctx.channel()));
            if (LOGGER.isInfoEnabled()) {
                LOGGER.info("remove exception rm channel:{}", (Object)ctx.channel());
            }
            super.exceptionCaught(ctx, cause);
        }

        public void close(ChannelHandlerContext ctx, ChannelPromise future) throws Exception {
            if (LOGGER.isInfoEnabled()) {
                LOGGER.info(ctx + " will closed");
            }
            super.close(ctx, future);
        }
    }

    /**
     * Background task that repeatedly drains every per-address basket into one
     * {@link MergedWarpMessage} and sends it asynchronously. It runs until its thread is
     * interrupted.
     */
    private class MergedSendRunnable
    implements Runnable {
        private MergedSendRunnable() {
        }

        /**
         * Waits briefly on {@code mergeLock} (or until a sender notifies), then sweeps all
         * baskets. An interrupt ends the loop so the worker thread can terminate.
         */
        @Override
        public void run() {
            while (!Thread.currentThread().isInterrupted()) {
                synchronized (mergeLock) {
                    try {
                        mergeLock.wait(MAX_MERGE_SEND_MILLS);
                    }
                    catch (InterruptedException interrupted) {
                        // Restore the flag and stop: swallowing it made the worker unstoppable.
                        Thread.currentThread().interrupt();
                        break;
                    }
                }
                isSending = true;
                try {
                    basketMap.forEach(this::sendMerged);
                }
                finally {
                    // Always reset, otherwise senders would stop notifying after a failed sweep.
                    isSending = false;
                }
            }
        }

        /**
         * Drains one basket into a merged message and sends it to the given address. When
         * sending fails with a {@link FrameworkException}, every pending future of the
         * merged requests is completed with an error.
         */
        private void sendMerged(String address, BlockingQueue<RpcMessage> basket) {
            if (basket.isEmpty()) {
                return;
            }
            MergedWarpMessage mergeMessage = new MergedWarpMessage();
            RpcMessage queued;
            while ((queued = basket.poll()) != null) {
                mergeMessage.msgs.add((AbstractMessage)queued.getBody());
                mergeMessage.msgIds.add(queued.getId());
            }
            if (mergeMessage.msgIds.size() > 1) {
                printMergeMessageLog(mergeMessage);
            }
            Channel sendChannel = null;
            try {
                sendChannel = clientChannelManager.acquireChannel(address);
                sendAsyncRequest(sendChannel, mergeMessage);
            }
            catch (FrameworkException e) {
                if (e.getErrcode() == FrameworkErrorCode.ChannelIsNotWritable && sendChannel != null) {
                    destroyChannel(address, sendChannel);
                }
                // Fail the waiting callers right away instead of letting them run into the timeout.
                for (Integer msgId : mergeMessage.msgIds) {
                    MessageFuture messageFuture = futures.remove(msgId);
                    if (messageFuture == null) {
                        continue;
                    }
                    messageFuture.setResultMessage(new RuntimeException(String.format("%s is unreachable", address), e));
                }
                LOGGER.error("client merge call failed: {}", (Object)e.getMessage(), (Object)e);
            }
        }

        /**
         * Dumps the merged messages, their ids and the ids of all pending futures to the
         * debug log.
         */
        private void printMergeMessageLog(MergedWarpMessage mergeMessage) {
            if (!LOGGER.isDebugEnabled()) {
                return;
            }
            LOGGER.debug("merge msg size:{}", (Object)mergeMessage.msgIds.size());
            for (AbstractMessage cm : mergeMessage.msgs) {
                LOGGER.debug(cm.toString());
            }
            StringBuilder sb = new StringBuilder();
            for (Integer msgId : mergeMessage.msgIds) {
                sb.append(MSG_ID_PREFIX).append(msgId).append(SINGLE_LOG_POSTFIX);
            }
            sb.append("\n");
            for (Integer futureId : futures.keySet()) {
                sb.append(FUTURES_PREFIX).append(futureId).append(SINGLE_LOG_POSTFIX);
            }
            LOGGER.debug(sb.toString());
        }
    }
}

