/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hbase.ipc;

import com.google.protobuf.Message;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.lang.ArrayUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.ipc.CallRunner;
import org.apache.hadoop.hbase.ipc.PriorityFunction;
import org.apache.hadoop.hbase.ipc.RpcExecutor;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos;
import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos;
import org.apache.hadoop.hbase.util.ReflectionUtils;

@InterfaceAudience.LimitedPrivate(value={"Coprocesssor", "Phoenix"})
@InterfaceStability.Evolving
public class RWQueueRpcExecutor
extends RpcExecutor {
    private static final Log LOG = LogFactory.getLog(RWQueueRpcExecutor.class);
    public static final String CALL_QUEUE_READ_SHARE_CONF_KEY = "hbase.ipc.server.callqueue.read.ratio";
    public static final String CALL_QUEUE_SCAN_SHARE_CONF_KEY = "hbase.ipc.server.callqueue.scan.ratio";
    private final RpcExecutor.QueueBalancer writeBalancer;
    private final RpcExecutor.QueueBalancer readBalancer;
    private final RpcExecutor.QueueBalancer scanBalancer;
    private final int writeHandlersCount;
    private final int readHandlersCount;
    private final int scanHandlersCount;
    private final int numWriteQueues;
    private final int numReadQueues;
    private final int numScanQueues;
    private final AtomicInteger activeWriteHandlerCount = new AtomicInteger(0);
    private final AtomicInteger activeReadHandlerCount = new AtomicInteger(0);
    private final AtomicInteger activeScanHandlerCount = new AtomicInteger(0);

    public RWQueueRpcExecutor(String name, int handlerCount, int maxQueueLength, PriorityFunction priority, Configuration conf, Abortable abortable) {
        super(name, handlerCount, maxQueueLength, priority, conf, abortable);
        float callqReadShare = conf.getFloat(CALL_QUEUE_READ_SHARE_CONF_KEY, 0.0f);
        float callqScanShare = conf.getFloat(CALL_QUEUE_SCAN_SHARE_CONF_KEY, 0.0f);
        this.numWriteQueues = RWQueueRpcExecutor.calcNumWriters(this.numCallQueues, callqReadShare);
        this.writeHandlersCount = Math.max(this.numWriteQueues, RWQueueRpcExecutor.calcNumWriters(handlerCount, callqReadShare));
        int readQueues = RWQueueRpcExecutor.calcNumReaders(this.numCallQueues, callqReadShare);
        int readHandlers = Math.max(readQueues, RWQueueRpcExecutor.calcNumReaders(handlerCount, callqReadShare));
        int scanQueues = Math.max(0, (int)Math.floor((float)readQueues * callqScanShare));
        int scanHandlers = Math.max(0, (int)Math.floor((float)readHandlers * callqScanShare));
        if (readQueues - scanQueues > 0) {
            readQueues -= scanQueues;
            readHandlers -= scanHandlers;
        } else {
            scanQueues = 0;
            scanHandlers = 0;
        }
        this.numReadQueues = readQueues;
        this.readHandlersCount = readHandlers;
        this.numScanQueues = scanQueues;
        this.scanHandlersCount = scanHandlers;
        this.writeBalancer = RWQueueRpcExecutor.getBalancer(this.numWriteQueues);
        this.readBalancer = RWQueueRpcExecutor.getBalancer(this.numReadQueues);
        this.scanBalancer = this.numScanQueues > 0 ? RWQueueRpcExecutor.getBalancer(this.numScanQueues) : null;
        this.initializeQueues(this.numWriteQueues);
        this.initializeQueues(this.numReadQueues);
        this.initializeQueues(this.numScanQueues);
        LOG.info((Object)(this.getName() + " writeQueues=" + this.numWriteQueues + " writeHandlers=" + this.writeHandlersCount + " readQueues=" + this.numReadQueues + " readHandlers=" + this.readHandlersCount + " scanQueues=" + this.numScanQueues + " scanHandlers=" + this.scanHandlersCount));
    }

    @Deprecated
    public RWQueueRpcExecutor(String name, int handlerCount, int numQueues, float readShare, int maxQueueLength, Configuration conf, Abortable abortable) {
        this(name, handlerCount, numQueues, readShare, (float)maxQueueLength, 0, conf, abortable, LinkedBlockingQueue.class, new Object[0]);
    }

    @Deprecated
    public RWQueueRpcExecutor(String name, int handlerCount, int numQueues, float readShare, float scanShare, int maxQueueLength) {
        this(name, handlerCount, numQueues, readShare, scanShare, maxQueueLength, null, null);
    }

    @Deprecated
    public RWQueueRpcExecutor(String name, int handlerCount, int numQueues, float readShare, float scanShare, int maxQueueLength, Configuration conf, Abortable abortable) {
        this(name, handlerCount, numQueues, readShare, scanShare, maxQueueLength, conf, abortable, LinkedBlockingQueue.class, new Object[0]);
    }

    @Deprecated
    public RWQueueRpcExecutor(String name, int handlerCount, int numQueues, float readShare, int maxQueueLength, Configuration conf, Abortable abortable, Class<? extends BlockingQueue> readQueueClass, Object ... readQueueInitArgs) {
        this(name, handlerCount, numQueues, readShare, 0.0f, maxQueueLength, conf, abortable, readQueueClass, readQueueInitArgs);
    }

    @Deprecated
    public RWQueueRpcExecutor(String name, int handlerCount, int numQueues, float readShare, float scanShare, int maxQueueLength, Configuration conf, Abortable abortable, Class<? extends BlockingQueue> readQueueClass, Object ... readQueueInitArgs) {
        this(name, RWQueueRpcExecutor.calcNumWriters(handlerCount, readShare), RWQueueRpcExecutor.calcNumReaders(handlerCount, readShare), RWQueueRpcExecutor.calcNumWriters(numQueues, readShare), RWQueueRpcExecutor.calcNumReaders(numQueues, readShare), scanShare, LinkedBlockingQueue.class, new Object[]{maxQueueLength}, readQueueClass, ArrayUtils.addAll((Object[])new Object[]{maxQueueLength}, (Object[])readQueueInitArgs));
    }

    @Deprecated
    public RWQueueRpcExecutor(String name, int handlerCount, int numQueues, float readShare, float scanShare, Class<? extends BlockingQueue> writeQueueClass, Object[] writeQueueInitArgs, Class<? extends BlockingQueue> readQueueClass, Object[] readQueueInitArgs) {
        this(name, RWQueueRpcExecutor.calcNumWriters(handlerCount, readShare), RWQueueRpcExecutor.calcNumReaders(handlerCount, readShare), RWQueueRpcExecutor.calcNumWriters(numQueues, readShare), RWQueueRpcExecutor.calcNumReaders(numQueues, readShare), scanShare, writeQueueClass, writeQueueInitArgs, readQueueClass, readQueueInitArgs);
    }

    @Deprecated
    public RWQueueRpcExecutor(String name, int writeHandlers, int readHandlers, int numWriteQueues, int numReadQueues, Class<? extends BlockingQueue> writeQueueClass, Object[] writeQueueInitArgs, Class<? extends BlockingQueue> readQueueClass, Object[] readQueueInitArgs) {
        this(name, writeHandlers, readHandlers, numWriteQueues, numReadQueues, 0.0f, writeQueueClass, writeQueueInitArgs, readQueueClass, readQueueInitArgs);
    }

    @Deprecated
    public RWQueueRpcExecutor(String name, int writeHandlers, int readHandlers, int numWriteQueues, int numReadQueues, float scanShare, Class<? extends BlockingQueue> writeQueueClass, Object[] writeQueueInitArgs, Class<? extends BlockingQueue> readQueueClass, Object[] readQueueInitArgs) {
        super(name, Math.max(writeHandlers, numWriteQueues) + Math.max(readHandlers, numReadQueues), numWriteQueues + numReadQueues);
        int i;
        this.writeHandlersCount = Math.max(writeHandlers, numWriteQueues);
        this.numWriteQueues = numWriteQueues;
        int numScanQueues = Math.max(0, (int)Math.floor((float)numReadQueues * scanShare));
        int scanHandlers = Math.max(0, (int)Math.floor((float)readHandlers * scanShare));
        if (numReadQueues - numScanQueues > 0) {
            numReadQueues -= numScanQueues;
            readHandlers -= scanHandlers;
        } else {
            numScanQueues = 0;
            scanHandlers = 0;
        }
        this.readHandlersCount = Math.max(readHandlers, numReadQueues);
        this.scanHandlersCount = Math.max(scanHandlers, numScanQueues);
        this.numReadQueues = numReadQueues;
        this.numScanQueues = numScanQueues;
        this.writeBalancer = RWQueueRpcExecutor.getBalancer(numWriteQueues);
        this.readBalancer = RWQueueRpcExecutor.getBalancer(numReadQueues);
        this.scanBalancer = numScanQueues > 0 ? RWQueueRpcExecutor.getBalancer(numScanQueues) : null;
        LOG.info((Object)(name + " writeQueues=" + numWriteQueues + " writeHandlers=" + this.writeHandlersCount + " readQueues=" + numReadQueues + " readHandlers=" + this.readHandlersCount + " scanQueues=" + numScanQueues + " scanHandlers=" + this.scanHandlersCount));
        if (writeQueueInitArgs.length > 0) {
            this.currentQueueLimit = (Integer)writeQueueInitArgs[0];
            writeQueueInitArgs[0] = Math.max((Integer)writeQueueInitArgs[0], 250);
        }
        for (i = 0; i < numWriteQueues; ++i) {
            this.queues.add((BlockingQueue)ReflectionUtils.newInstance(writeQueueClass, (Object[])writeQueueInitArgs));
        }
        if (readQueueInitArgs.length > 0) {
            this.currentQueueLimit = (Integer)readQueueInitArgs[0];
            readQueueInitArgs[0] = Math.max((Integer)readQueueInitArgs[0], 250);
        }
        for (i = 0; i < numReadQueues + numScanQueues; ++i) {
            this.queues.add((BlockingQueue)ReflectionUtils.newInstance(readQueueClass, (Object[])readQueueInitArgs));
        }
    }

    @Override
    protected int computeNumCallQueues(int handlerCount, float callQueuesHandlersFactor) {
        return Math.max(2, Math.round((float)handlerCount * callQueuesHandlersFactor));
    }

    @Override
    protected void startHandlers(int port) {
        this.startHandlers(".write", this.writeHandlersCount, this.queues, 0, this.numWriteQueues, port, this.activeWriteHandlerCount);
        this.startHandlers(".read", this.readHandlersCount, this.queues, this.numWriteQueues, this.numReadQueues, port, this.activeReadHandlerCount);
        if (this.numScanQueues > 0) {
            this.startHandlers(".scan", this.scanHandlersCount, this.queues, this.numWriteQueues + this.numReadQueues, this.numScanQueues, port, this.activeScanHandlerCount);
        }
    }

    @Override
    public boolean dispatch(CallRunner callTask) throws InterruptedException {
        RpcServer.Call call = callTask.getCall();
        int queueIndex = this.isWriteRequest(call.getHeader(), call.param) ? this.writeBalancer.getNextQueue() : (this.numScanQueues > 0 && this.isScanRequest(call.getHeader(), call.param) ? this.numWriteQueues + this.numReadQueues + this.scanBalancer.getNextQueue() : this.numWriteQueues + this.readBalancer.getNextQueue());
        BlockingQueue queue = (BlockingQueue)this.queues.get(queueIndex);
        if (queue.size() >= this.currentQueueLimit) {
            return false;
        }
        return queue.offer(callTask);
    }

    @Override
    public int getWriteQueueLength() {
        int length = 0;
        for (int i = 0; i < this.numWriteQueues; ++i) {
            length += ((BlockingQueue)this.queues.get(i)).size();
        }
        return length;
    }

    @Override
    public int getReadQueueLength() {
        int length = 0;
        for (int i = this.numWriteQueues; i < this.numWriteQueues + this.numReadQueues; ++i) {
            length += ((BlockingQueue)this.queues.get(i)).size();
        }
        return length;
    }

    @Override
    public int getScanQueueLength() {
        int length = 0;
        for (int i = this.numWriteQueues + this.numReadQueues; i < this.numWriteQueues + this.numReadQueues + this.numScanQueues; ++i) {
            length += ((BlockingQueue)this.queues.get(i)).size();
        }
        return length;
    }

    @Override
    public int getActiveHandlerCount() {
        return this.activeWriteHandlerCount.get() + this.activeReadHandlerCount.get() + this.activeScanHandlerCount.get();
    }

    @Override
    public int getActiveWriteHandlerCount() {
        return this.activeWriteHandlerCount.get();
    }

    @Override
    public int getActiveReadHandlerCount() {
        return this.activeReadHandlerCount.get();
    }

    @Override
    public int getActiveScanHandlerCount() {
        return this.activeScanHandlerCount.get();
    }

    private boolean isWriteRequest(RPCProtos.RequestHeader header, Message param) {
        if (param instanceof ClientProtos.MultiRequest) {
            ClientProtos.MultiRequest multi = (ClientProtos.MultiRequest)param;
            for (ClientProtos.RegionAction regionAction : multi.getRegionActionList()) {
                for (ClientProtos.Action action : regionAction.getActionList()) {
                    if (!action.hasMutation()) continue;
                    return true;
                }
            }
        }
        if (param instanceof ClientProtos.MutateRequest) {
            return true;
        }
        if (param instanceof RegionServerStatusProtos.ReportRegionStateTransitionRequest) {
            return true;
        }
        if (param instanceof RegionServerStatusProtos.RegionServerStartupRequest) {
            return true;
        }
        return param instanceof RegionServerStatusProtos.RegionServerReportRequest;
    }

    private boolean isScanRequest(RPCProtos.RequestHeader header, Message param) {
        if (param instanceof ClientProtos.ScanRequest) {
            ClientProtos.ScanRequest request = (ClientProtos.ScanRequest)param;
            return request.hasScannerId();
        }
        return false;
    }

    private static int calcNumWriters(int count, float readShare) {
        return Math.max(1, count - Math.max(1, Math.round((float)count * readShare)));
    }

    private static int calcNumReaders(int count, float readShare) {
        return count - RWQueueRpcExecutor.calcNumWriters(count, readShare);
    }
}

