/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.query.netty;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.stream.ChunkedWriteHandler;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import org.apache.flink.runtime.io.network.netty.NettyBufferPool;
import org.apache.flink.runtime.query.KvStateRegistry;
import org.apache.flink.runtime.query.KvStateServerAddress;
import org.apache.flink.runtime.query.netty.KvStateRequestStats;
import org.apache.flink.runtime.query.netty.KvStateServerHandler;
import org.apache.flink.shaded.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KvStateServer {
    private static final Logger LOG = LoggerFactory.getLogger(KvStateServer.class);
    private static final int LOW_WATER_MARK = 8192;
    private static final int HIGH_WATER_MARK = 32768;
    private final ServerBootstrap bootstrap;
    private final ExecutorService queryExecutor;
    private KvStateServerAddress serverAddress;

    public KvStateServer(InetAddress bindAddress, int bindPort, int numEventLoopThreads, int numQueryThreads, KvStateRegistry kvStateRegistry, KvStateRequestStats stats) {
        Preconditions.checkArgument((bindPort >= 0 && bindPort <= 65536 ? 1 : 0) != 0, (Object)("Port " + bindPort + " is out of valid port range (0-65536)."));
        Preconditions.checkArgument((numEventLoopThreads >= 1 ? 1 : 0) != 0, (Object)"Non-positive number of event loop threads.");
        Preconditions.checkArgument((numQueryThreads >= 1 ? 1 : 0) != 0, (Object)"Non-positive number of query threads.");
        Preconditions.checkNotNull((Object)kvStateRegistry, (String)"KvStateRegistry");
        Preconditions.checkNotNull((Object)stats, (String)"KvStateRequestStats");
        NettyBufferPool bufferPool = new NettyBufferPool(numEventLoopThreads);
        ThreadFactory threadFactory = new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Flink KvStateServer EventLoop Thread %d").build();
        NioEventLoopGroup nioGroup = new NioEventLoopGroup(numEventLoopThreads, threadFactory);
        this.queryExecutor = KvStateServer.createQueryExecutor(numQueryThreads);
        KvStateServerHandler serverHandler = new KvStateServerHandler(kvStateRegistry, this.queryExecutor, stats);
        this.bootstrap = ((ServerBootstrap)((ServerBootstrap)((ServerBootstrap)new ServerBootstrap().localAddress(bindAddress, bindPort)).group((EventLoopGroup)nioGroup).channel(NioServerSocketChannel.class)).option(ChannelOption.ALLOCATOR, (Object)bufferPool)).childOption(ChannelOption.ALLOCATOR, (Object)bufferPool).childOption(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, (Object)8192).childOption(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, (Object)32768).childHandler((ChannelHandler)new KvStateServerChannelInitializer(serverHandler));
    }

    public void start() throws InterruptedException {
        Channel channel = this.bootstrap.bind().sync().channel();
        InetSocketAddress localAddress = (InetSocketAddress)channel.localAddress();
        this.serverAddress = new KvStateServerAddress(localAddress.getAddress(), localAddress.getPort());
    }

    public KvStateServerAddress getAddress() {
        if (this.serverAddress == null) {
            throw new IllegalStateException("KvStateServer not started yet.");
        }
        return this.serverAddress;
    }

    public void shutDown() {
        EventLoopGroup group;
        if (this.bootstrap != null && (group = this.bootstrap.group()) != null) {
            group.shutdownGracefully(0L, 10L, TimeUnit.SECONDS);
        }
        if (this.queryExecutor != null) {
            this.queryExecutor.shutdown();
        }
        this.serverAddress = null;
    }

    private static ExecutorService createQueryExecutor(int numQueryThreads) {
        ThreadFactory threadFactory = new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Flink KvStateServer Query Thread %d").build();
        return Executors.newFixedThreadPool(numQueryThreads, threadFactory);
    }

    private static final class KvStateServerChannelInitializer
    extends ChannelInitializer<SocketChannel> {
        private final KvStateServerHandler sharedRequestHandler;

        public KvStateServerChannelInitializer(KvStateServerHandler sharedRequestHandler) {
            this.sharedRequestHandler = (KvStateServerHandler)((Object)Preconditions.checkNotNull((Object)((Object)sharedRequestHandler), (String)"Request handler"));
        }

        protected void initChannel(SocketChannel ch) throws Exception {
            ch.pipeline().addLast(new ChannelHandler[]{new ChunkedWriteHandler()}).addLast(new ChannelHandler[]{new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4)}).addLast(new ChannelHandler[]{this.sharedRequestHandler});
        }
    }
}

