/*
 * Decompiled with CFR 0.152.
 */
package com.espertech.esper.core.thread;

import com.espertech.esper.client.ConfigurationEngineDefaults;
import com.espertech.esper.core.service.EPRuntimeImpl;
import com.espertech.esper.core.service.EPServicesContext;
import com.espertech.esper.core.thread.EngineThreadFactory;
import com.espertech.esper.core.thread.InboundUnitRunnable;
import com.espertech.esper.core.thread.OutboundUnitRunnable;
import com.espertech.esper.core.thread.RouteUnitRunnable;
import com.espertech.esper.core.thread.ThreadingOption;
import com.espertech.esper.core.thread.ThreadingService;
import com.espertech.esper.core.thread.TimerUnit;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/**
 * {@link ThreadingService} implementation that owns up to four optional thread pools
 * (inbound, timer execution, route execution and outbound), each fed by its own
 * {@link BlockingQueue}.
 *
 * <p>Which pools are active is decided once, in the constructor, from the supplied threading
 * configuration and {@link ThreadingOption#isThreadingEnabled()}. Queues and pools are only
 * created by {@link #initThreading(EPServicesContext, EPRuntimeImpl)}; until then the queue and
 * pool getters return {@code null}.
 */
public class ThreadingServiceImpl implements ThreadingService {
    private static final Log log = LogFactory.getLog(ThreadingServiceImpl.class);

    /** Maximum time, in seconds, that {@link #stopPool} waits for a pool to terminate. */
    private static final long SHUTDOWN_TIMEOUT_SECONDS = 10L;

    private final ConfigurationEngineDefaults.Threading config;
    private final boolean isTimerThreading;
    private final boolean isInboundThreading;
    private final boolean isRouteThreading;
    private final boolean isOutboundThreading;
    private BlockingQueue<Runnable> timerQueue;
    private BlockingQueue<Runnable> inboundQueue;
    private BlockingQueue<Runnable> routeQueue;
    private BlockingQueue<Runnable> outboundQueue;
    private ThreadPoolExecutor timerThreadPool;
    private ThreadPoolExecutor inboundThreadPool;
    private ThreadPoolExecutor routeThreadPool;
    private ThreadPoolExecutor outboundThreadPool;

    /**
     * Captures the threading configuration and resolves which pools are enabled.
     *
     * <p>When {@link ThreadingOption#isThreadingEnabled()} returns {@code false} all four flags
     * are forced to {@code false} regardless of the configuration.
     *
     * @param threadingConfig threading configuration to read the pool flags, capacities and
     *                        thread counts from
     */
    public ThreadingServiceImpl(ConfigurationEngineDefaults.Threading threadingConfig) {
        this.config = threadingConfig;
        if (ThreadingOption.isThreadingEnabled()) {
            this.isTimerThreading = threadingConfig.isThreadPoolTimerExec();
            this.isInboundThreading = threadingConfig.isThreadPoolInbound();
            this.isRouteThreading = threadingConfig.isThreadPoolRouteExec();
            this.isOutboundThreading = threadingConfig.isThreadPoolOutbound();
        } else {
            this.isTimerThreading = false;
            this.isInboundThreading = false;
            this.isRouteThreading = false;
            this.isOutboundThreading = false;
        }
    }

    /** @return {@code true} if the route-execution pool is enabled */
    @Override
    public boolean isRouteThreading() {
        return this.isRouteThreading;
    }

    /** @return {@code true} if the inbound pool is enabled */
    @Override
    public boolean isInboundThreading() {
        return this.isInboundThreading;
    }

    /** @return {@code true} if the timer-execution pool is enabled */
    @Override
    public boolean isTimerThreading() {
        return this.isTimerThreading;
    }

    /** @return {@code true} if the outbound pool is enabled */
    @Override
    public boolean isOutboundThreading() {
        return this.isOutboundThreading;
    }

    /**
     * Creates the queue and starts the thread pool for every enabled threading kind.
     *
     * @param services supplies the engine URI used to name the pool thread groups
     * @param runtime  not used by this implementation
     */
    @Override
    public void initThreading(EPServicesContext services, EPRuntimeImpl runtime) {
        if (this.isInboundThreading) {
            this.inboundQueue = this.makeQueue(this.config.getThreadPoolInboundCapacity());
            this.inboundThreadPool = this.getThreadPool(services.getEngineURI(), "Inbound", this.inboundQueue, this.config.getThreadPoolInboundNumThreads());
        }
        if (this.isTimerThreading) {
            this.timerQueue = this.makeQueue(this.config.getThreadPoolTimerExecCapacity());
            this.timerThreadPool = this.getThreadPool(services.getEngineURI(), "TimerExec", this.timerQueue, this.config.getThreadPoolTimerExecNumThreads());
        }
        if (this.isRouteThreading) {
            this.routeQueue = this.makeQueue(this.config.getThreadPoolRouteExecCapacity());
            this.routeThreadPool = this.getThreadPool(services.getEngineURI(), "RouteExec", this.routeQueue, this.config.getThreadPoolRouteExecNumThreads());
        }
        if (this.isOutboundThreading) {
            this.outboundQueue = this.makeQueue(this.config.getThreadPoolOutboundCapacity());
            this.outboundThreadPool = this.getThreadPool(services.getEngineURI(), "Outbound", this.outboundQueue, this.config.getThreadPoolOutboundNumThreads());
        }
    }

    /**
     * Builds the work queue for a pool.
     *
     * @param capacity requested queue capacity; {@code null}, a non-positive value or
     *                 {@link Integer#MAX_VALUE} selects an unbounded queue
     * @return an unbounded {@link LinkedBlockingQueue}, or an {@link ArrayBlockingQueue} of the
     *         requested capacity
     */
    private BlockingQueue<Runnable> makeQueue(Integer capacity) {
        if (capacity == null || capacity <= 0 || capacity == Integer.MAX_VALUE) {
            return new LinkedBlockingQueue<Runnable>();
        }
        return new ArrayBlockingQueue<Runnable>(capacity);
    }

    /**
     * Places a route unit on the route queue, blocking while the queue is full.
     *
     * @param unit unit of work to enqueue
     */
    @Override
    public void submitRoute(RouteUnitRunnable unit) {
        enqueue(this.routeQueue, unit);
    }

    /**
     * Places an inbound unit on the inbound queue, blocking while the queue is full.
     *
     * @param unit unit of work to enqueue
     */
    @Override
    public void submitInbound(InboundUnitRunnable unit) {
        enqueue(this.inboundQueue, unit);
    }

    /**
     * Places an outbound unit on the outbound queue, blocking while the queue is full.
     *
     * @param unit unit of work to enqueue
     */
    @Override
    public void submitOutbound(OutboundUnitRunnable unit) {
        enqueue(this.outboundQueue, unit);
    }

    /**
     * Places a timer unit on the timer queue, blocking while the queue is full.
     *
     * @param unit unit of work to enqueue
     */
    @Override
    public void submitTimerWork(TimerUnit unit) {
        enqueue(this.timerQueue, unit);
    }

    /**
     * Shared implementation of the {@code submit*} methods.
     *
     * <p>If the calling thread is interrupted while waiting for queue space the unit is not
     * enqueued, the interruption is logged, and the thread's interrupt status is restored so
     * that the caller can still observe it.
     *
     * @param queue target queue
     * @param unit  unit of work to enqueue
     */
    private static void enqueue(BlockingQueue<Runnable> queue, Runnable unit) {
        try {
            queue.put(unit);
        } catch (InterruptedException e) {
            log.info("Submit interrupted:" + e);
            // put() cleared the interrupt flag when it threw; set it again rather than swallow it.
            Thread.currentThread().interrupt();
        }
    }

    /** @return the outbound queue, or {@code null} if it has not been created */
    @Override
    public BlockingQueue<Runnable> getOutboundQueue() {
        return this.outboundQueue;
    }

    /** @return the outbound pool, or {@code null} if not created or already destroyed */
    @Override
    public ThreadPoolExecutor getOutboundThreadPool() {
        return this.outboundThreadPool;
    }

    /** @return the route queue, or {@code null} if it has not been created */
    @Override
    public BlockingQueue<Runnable> getRouteQueue() {
        return this.routeQueue;
    }

    /** @return the route pool, or {@code null} if not created or already destroyed */
    @Override
    public ThreadPoolExecutor getRouteThreadPool() {
        return this.routeThreadPool;
    }

    /** @return the timer queue, or {@code null} if it has not been created */
    @Override
    public BlockingQueue<Runnable> getTimerQueue() {
        return this.timerQueue;
    }

    /** @return the timer pool, or {@code null} if not created or already destroyed */
    @Override
    public ThreadPoolExecutor getTimerThreadPool() {
        return this.timerThreadPool;
    }

    /** @return the inbound queue, or {@code null} if it has not been created */
    @Override
    public BlockingQueue<Runnable> getInboundQueue() {
        return this.inboundQueue;
    }

    /** @return the inbound pool, or {@code null} if not created or already destroyed */
    @Override
    public ThreadPoolExecutor getInboundThreadPool() {
        return this.inboundThreadPool;
    }

    /**
     * Shuts down every pool that was started (timer, route, outbound, then inbound) and clears
     * the pool references, so that a repeated call is a no-op. The queue references are kept.
     */
    @Override
    public synchronized void destroy() {
        if (this.timerThreadPool != null) {
            this.stopPool(this.timerThreadPool, this.timerQueue, "TimerExec");
        }
        if (this.routeThreadPool != null) {
            this.stopPool(this.routeThreadPool, this.routeQueue, "RouteExec");
        }
        if (this.outboundThreadPool != null) {
            this.stopPool(this.outboundThreadPool, this.outboundQueue, "Outbound");
        }
        if (this.inboundThreadPool != null) {
            this.stopPool(this.inboundThreadPool, this.inboundQueue, "Inbound");
        }
        this.timerThreadPool = null;
        this.routeThreadPool = null;
        this.outboundThreadPool = null;
        this.inboundThreadPool = null;
    }

    /**
     * Creates a fixed-size pool that consumes from the given queue and pre-starts all of its
     * threads.
     *
     * @param engineURI  engine URI used in the thread group name; {@code null} is treated as
     *                   {@code "default"}
     * @param name       pool name, used in the thread group name and in log output
     * @param queue      work queue the pool's threads take from
     * @param numThreads core and maximum pool size
     * @return the started pool
     */
    private ThreadPoolExecutor getThreadPool(String engineURI, String name, BlockingQueue<Runnable> queue, int numThreads) {
        if (log.isInfoEnabled()) {
            log.info("Starting pool " + name + " with " + numThreads + " threads");
        }
        if (engineURI == null) {
            engineURI = "default";
        }
        String threadGroupName = "com.espertech.esper." + engineURI + "-" + name;
        ThreadGroup threadGroup = new ThreadGroup(threadGroupName);
        // NOTE(review): the trailing 5 is presumably the thread priority (Thread.NORM_PRIORITY);
        // confirm against EngineThreadFactory.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(numThreads, numThreads, 1L, TimeUnit.SECONDS, queue, new EngineThreadFactory(engineURI, name, threadGroup, 5));
        // Start the workers eagerly: work is put on the queue directly (see the submit methods)
        // rather than handed to execute(), so nothing else would trigger thread creation.
        pool.prestartAllCoreThreads();
        return pool;
    }

    /**
     * Creates (but does not start) a thread for an event source, placed in its own thread group.
     *
     * @param engineURI  engine URI used in the thread group name; {@code null} is treated as
     *                   {@code "default"}
     * @param sourceName event source name used in the thread group name
     * @param runnable   code the thread will run
     * @return the new, unstarted thread
     */
    @Override
    public Thread makeEventSourceThread(String engineURI, String sourceName, Runnable runnable) {
        if (engineURI == null) {
            engineURI = "default";
        }
        String threadGroupName = "com.espertech.esper." + engineURI + "-source-" + sourceName;
        ThreadGroup threadGroup = new ThreadGroup(threadGroupName);
        return new Thread(threadGroup, runnable);
    }

    /**
     * Discards pending work, initiates an orderly shutdown of the pool and waits up to
     * {@link #SHUTDOWN_TIMEOUT_SECONDS} seconds for it to terminate.
     *
     * <p>If the wait is interrupted the error is logged and the thread's interrupt status is
     * restored. When {@link #destroy()} goes on to stop further pools, those pools still have
     * their queue cleared and {@code shutdown()} invoked; only the wait is cut short.
     *
     * @param threadPool pool to stop
     * @param queue      the pool's work queue; any units still queued are dropped
     * @param name       pool name for log output
     */
    private void stopPool(ThreadPoolExecutor threadPool, BlockingQueue<Runnable> queue, String name) {
        if (log.isInfoEnabled()) {
            log.info("Shutting down pool " + name);
        }
        queue.clear();
        threadPool.shutdown();
        try {
            boolean terminated = threadPool.awaitTermination(SHUTDOWN_TIMEOUT_SECONDS, TimeUnit.SECONDS);
            if (!terminated) {
                // Previously a timeout went unnoticed; surface it so lingering threads can be diagnosed.
                log.warn("Pool " + name + " did not terminate within " + SHUTDOWN_TIMEOUT_SECONDS + " seconds");
            }
        } catch (InterruptedException e) {
            log.error("Interruped awaiting termination", e);
            // awaitTermination() cleared the interrupt flag when it threw; set it again.
            Thread.currentThread().interrupt();
        }
    }
}

