/*
 * Decompiled with CFR 0.152.
 */
package alibaba.drcnet.connection;

import alibaba.drcnet.buffer.CacheBuff;
import alibaba.drcnet.config.DRCNetConfig;
import alibaba.drcnet.config.UserConfig;
import alibaba.drcnet.connection.Connection;
import alibaba.drcnet.reactor.DRCNetReactor;
import alibaba.drcnet.util.Constant;
import alibaba.drcnet.util.DRCNetMessageInfo;
import alibaba.drcnet.util.MessageBox;
import alibaba.drcnet.util.MessageV1;
import alibaba.drcnet.util.SyncState;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.timeout.ReadTimeoutHandler;
import io.netty.util.Attribute;
import io.netty.util.concurrent.GenericFutureListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SingleDecomressConnection
implements Connection {
    private static final Logger log = LoggerFactory.getLogger(SingleDecomressConnection.class);
    private Bootstrap bootstrap = null;
    private EventLoopGroup workerGroup = null;
    private UserConfig userConfig = null;
    private ChannelFuture channelFuture = null;
    private Channel channel = null;
    private String ip = null;
    private String port = null;
    private CacheBuff cacheBuff = null;
    private SyncState syncState = null;
    private DRCNetConfig drcnetConfig = null;
    private volatile boolean stopped = false;
    private byte[] headerBuff = new byte[4];
    private byte[] messageIDBuff = new byte[4];
    private byte[] bigmsgLenBuff = new byte[4];
    private byte isBigMsg = 0;
    private long msgLen = 0L;
    private long orgmsgLen = 0L;
    private volatile boolean syncOK = false;
    private int connectionTimeOut = 120;
    private DRCNetReactor reactor = null;
    MessageBox messageBox = null;

    public UserConfig getUserConfig() {
        return this.userConfig;
    }

    public void setUserConfig(UserConfig userConfig) {
        this.userConfig = userConfig;
    }

    @Override
    public void setSyncOK() {
        this.syncOK = true;
    }

    @Override
    public int writeData(DRCNetMessageInfo msgInfo) {
        return 0;
    }

    @Override
    public MessageBox getMessageBox() {
        return this.messageBox;
    }

    @Override
    public int readData(DRCNetMessageInfo msgInfo, boolean usingMessageId) {
        int ret = -1;
        while (!this.stopped && (ret = this.cacheBuff.readData(this.headerBuff, 4)) != 4) {
            if (ret == 0) {
                this.messageBox.Wait();
                continue;
            }
            log.error("read buf failed");
            return -1;
        }
        if (this.stopped) {
            return -1;
        }
        if (usingMessageId) {
            ret = this.cacheBuff.readData(this.messageIDBuff, 4);
            if (ret != 4) {
                log.error("read len buf failed");
                return -1;
            }
            msgInfo.messageID = MessageV1.getInt32(this.messageIDBuff, 0);
        }
        this.isBigMsg = this.headerBuff[0];
        this.msgLen = MessageV1.getHeaderLen(this.headerBuff);
        msgInfo.type = this.isBigMsg;
        if (!MessageV1.isBigMsg(this.isBigMsg)) {
            msgInfo.isBigMsg = false;
            msgInfo.orgLen = this.msgLen;
        } else {
            msgInfo.isBigMsg = true;
            ret = this.cacheBuff.readData(this.bigmsgLenBuff, 4);
            if (ret != 4) {
                log.error("read len buf failed");
                return -1;
            }
            msgInfo.orgLen = MessageV1.getInt32(this.bigmsgLenBuff, 0);
        }
        byte[] retBuf = new byte[(int)this.msgLen];
        msgInfo.bufLen = this.cacheBuff.readData(retBuf, (int)this.msgLen);
        msgInfo.buf = retBuf;
        return 0;
    }

    public int handleWriteData() {
        return 0;
    }

    public int handleReadData() {
        return 0;
    }

    @Override
    public int startConnection(String ip, String port, UserConfig userConfig, DRCNetConfig netConfig, SyncState syncState, int ConncitonTimeOut) {
        this.bootstrap = new Bootstrap();
        this.workerGroup = new NioEventLoopGroup();
        this.messageBox = new MessageBox();
        this.cacheBuff = new CacheBuff(this.messageBox);
        this.reactor = new DRCNetReactor();
        if (this.workerGroup == null || this.cacheBuff == null || this.headerBuff == null || this.reactor == null) {
            log.error("get work group failed");
            return -1;
        }
        this.userConfig = userConfig;
        if (ip == null || port == null || syncState == null || null == netConfig) {
            log.error("ip or port or syncState or netconfig missing");
            return -1;
        }
        this.syncState = syncState;
        this.drcnetConfig = netConfig;
        this.ip = ip;
        this.port = port;
        this.bootstrap.group(this.workerGroup);
        this.bootstrap.channel(NioSocketChannel.class);
        this.bootstrap.option(ChannelOption.SO_KEEPALIVE, (Object)true);
        this.bootstrap.option(ChannelOption.TCP_NODELAY, (Object)true);
        this.bootstrap.option(ChannelOption.ALLOCATOR, (Object)PooledByteBufAllocator.DEFAULT);
        this.connectionTimeOut = ConncitonTimeOut;
        log.warn("drcnet client read timeout: " + this.connectionTimeOut);
        this.bootstrap.handler((ChannelHandler)new ChannelInitializer<SocketChannel>(){

            protected void initChannel(SocketChannel ch) throws Exception {
                ch.pipeline().addLast("readTimeoutHandler", (ChannelHandler)new ReadTimeoutHandler(SingleDecomressConnection.this.connectionTimeOut));
                ch.pipeline().addLast(new ChannelHandler[]{SingleDecomressConnection.this.reactor});
            }
        });
        return this.connectDrcNet();
    }

    private int connectDrcNet() {
        if (this.ip == null || this.port == null) {
            log.error("connect error: ip or port missed");
            return -1;
        }
        this.channelFuture = this.bootstrap.connect(this.ip, Integer.parseInt(this.port));
        this.channel = this.channelFuture.channel();
        ChannelFuture closeFuture = this.channel.closeFuture();
        closeFuture.addListener((GenericFutureListener)new ChannelFutureListener(){

            public void operationComplete(ChannelFuture future) throws Exception {
                log.warn("connection closed");
                Thread.sleep(2000L);
                SingleDecomressConnection.this.stopConnection();
            }
        });
        Attribute attribute = this.channelFuture.channel().attr(Constant.configKey);
        attribute.set((Object)this.userConfig);
        attribute = this.channelFuture.channel().attr(Constant.cacheBuffer);
        attribute.set((Object)this.cacheBuff);
        attribute = this.channelFuture.channel().attr(Constant.syncState);
        attribute.set((Object)this.syncState);
        attribute = this.channelFuture.channel().attr(Constant.connection);
        attribute.set((Object)this);
        attribute = this.channelFuture.channel().attr(Constant.drcnetConfig);
        attribute.set((Object)this.drcnetConfig);
        this.reactor.setInitOK();
        this.channelFuture.syncUninterruptibly();
        int retryTime = 10;
        while (!this.syncOK && !this.stopped && 0 < retryTime--) {
            try {
                Thread.sleep(1000L);
            }
            catch (InterruptedException e) {
                log.error("thread interrupt");
            }
        }
        if (!this.syncOK || this.stopped) {
            log.error("drcnet error: sync with server timeout");
            return -1;
        }
        return 0;
    }

    @Override
    public void stopConnection() {
        try {
            this.channelFuture.channel().close();
            if (this.cacheBuff != null) {
                log.warn("drcnet stopping...,drcnet recv buffer is set stopped, ignore data from netty");
                this.cacheBuff.setStop();
            }
            this.channelFuture.channel().closeFuture().sync();
        }
        catch (Exception e) {
            log.error("close connection interrupted: " + e);
        }
        finally {
            try {
                this.workerGroup.shutdownGracefully();
            }
            catch (Exception e) {
                log.error("shutdown netty workgroup failed, cause " + e.toString());
            }
            log.warn("connection is stopped");
            this.stopped = true;
            this.messageBox.Signal();
        }
    }
}

