/*
 * Decompiled with CFR 0.152.
 */
package com.alipay.sofa.registry.task.batcher;

import com.alipay.sofa.registry.log.Logger;
import com.alipay.sofa.registry.log.LoggerFactory;
import com.alipay.sofa.registry.task.batcher.AcceptorExecutor;
import com.alipay.sofa.registry.task.batcher.TaskHolder;
import com.alipay.sofa.registry.task.batcher.TaskProcessor;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;

/**
 * Owns a fixed set of daemon worker threads. Each worker repeatedly asks an
 * {@link AcceptorExecutor} for a work queue, takes one {@link TaskHolder} from it and hands the
 * held task to a {@link TaskProcessor}.
 *
 * <p>All workers created by one instance share a single shutdown flag; {@link #shutdown()} sets
 * that flag and interrupts every worker thread.
 *
 * @param <ID> task identifier type carried by {@link TaskHolder}
 * @param <T> task payload type handed to the {@link TaskProcessor}
 */
public class TaskExecutors<ID, T> {
    private static final Logger LOGGER = LoggerFactory.getLogger(TaskExecutors.class);
    private final AtomicBoolean isShutdown;
    private final List<Thread> workerThreads;

    /**
     * Creates and immediately starts {@code workerCount} daemon threads in the
     * {@code "serverTaskExecutors"} thread group.
     *
     * @param workerRunnableFactory builds the runnable for the worker with the given zero-based index
     * @param workerCount number of worker threads to start; no thread is started when it is not positive
     * @param isShutdown flag flipped by {@link #shutdown()}; the worker runnables built by
     *     {@link #createTaskExecutors} poll this same instance
     */
    TaskExecutors(Function<Integer, WorkerRunnable<ID, T>> workerRunnableFactory, int workerCount, AtomicBoolean isShutdown) {
        this.isShutdown = isShutdown;
        this.workerThreads = new ArrayList<>();
        ThreadGroup threadGroup = new ThreadGroup("serverTaskExecutors");
        for (int i = 0; i < workerCount; ++i) {
            WorkerRunnable<ID, T> runnable = workerRunnableFactory.apply(i);
            Thread workerThread = new Thread(threadGroup, runnable, runnable.getWorkerName());
            this.workerThreads.add(workerThread);
            workerThread.setDaemon(true);
            workerThread.start();
        }
    }

    /**
     * Builds a {@code TaskExecutors} whose workers are named {@code name + '-' + index} and share
     * one freshly created shutdown flag.
     *
     * @param name prefix used for the worker (and thread) names
     * @param workerCount number of worker threads to start
     * @param processor processor every worker passes its tasks to
     * @param acceptorExecutor source of work queues and sink for tasks that must be reprocessed
     * @return the started executor set
     */
    static <ID, T> TaskExecutors<ID, T> createTaskExecutors(String name, int workerCount, TaskProcessor<T> processor, AcceptorExecutor<ID, T> acceptorExecutor) {
        AtomicBoolean isShutdown = new AtomicBoolean();
        return new TaskExecutors<ID, T>(
                idx -> new WorkerRunnable<ID, T>(name + '-' + idx, isShutdown, processor, acceptorExecutor),
                workerCount,
                isShutdown);
    }

    /**
     * Sets the shutdown flag and interrupts every worker thread. Only the first call has any
     * effect; later calls find the flag already set and do nothing.
     */
    void shutdown() {
        if (this.isShutdown.compareAndSet(false, true)) {
            for (Thread workerThread : this.workerThreads) {
                workerThread.interrupt();
            }
        }
    }

    /**
     * Worker loop: fetch a queue from the acceptor, wait for a task, process it and act on the
     * processing result, until the shared shutdown flag is set.
     */
    static class WorkerRunnable<ID, T>
    implements Runnable {
        final String workerName;
        final AtomicBoolean isShutdown;
        final TaskProcessor<T> processor;
        final AcceptorExecutor<ID, T> acceptorExecutor;

        /**
         * @param workerName name reported by {@link #getWorkerName()} and used in log output
         * @param isShutdown flag checked between tasks and between queue polls
         * @param processor processor invoked with each task
         * @param acceptorExecutor provider of work queues and receiver of tasks to reprocess
         */
        WorkerRunnable(String workerName, AtomicBoolean isShutdown, TaskProcessor<T> processor, AcceptorExecutor<ID, T> acceptorExecutor) {
            this.workerName = workerName;
            this.isShutdown = isShutdown;
            this.processor = processor;
            this.acceptorExecutor = acceptorExecutor;
        }

        /** Returns the name this worker was created with. */
        String getWorkerName() {
            return this.workerName;
        }

        /**
         * Runs until the shutdown flag is set. A failure while handling one task is logged and the
         * loop continues with the next task.
         */
        @Override
        public void run() {
            try {
                while (!this.isShutdown.get()) {
                    try {
                        BlockingQueue<TaskHolder<ID, T>> workQueue = this.acceptorExecutor.requestWorkItem();
                        TaskHolder<ID, T> taskHolder;
                        // Poll with a timeout rather than blocking indefinitely so the shutdown
                        // flag is re-checked at least once per second while the queue is empty.
                        while ((taskHolder = workQueue.poll(1L, TimeUnit.SECONDS)) == null) {
                            if (this.isShutdown.get()) {
                                return;
                            }
                        }
                        TaskProcessor.ProcessingResult result = this.processor.process(taskHolder.getTask());
                        switch (result) {
                            case Success: {
                                break;
                            }
                            case Congestion:
                            case TransientError: {
                                // Both outcomes are handed back to the acceptor together with the result.
                                this.acceptorExecutor.reprocess(taskHolder, result);
                                break;
                            }
                            case PermanentError: {
                                LOGGER.warn("Discarding a task of {} due to permanent error", this.workerName);
                                break;
                            }
                            default: {
                                // Any other result needs no follow-up action.
                                break;
                            }
                        }
                    }
                    catch (InterruptedException ignored) {
                        // shutdown() sets the flag before interrupting, so the loop condition ends
                        // the worker. Restore the interrupt status in that case so whoever invoked
                        // run() can still observe it. A stray interrupt without shutdown is dropped
                        // on purpose: re-asserting it would make every following poll throw
                        // immediately and turn this loop into a busy spin.
                        if (this.isShutdown.get()) {
                            Thread.currentThread().interrupt();
                        }
                    }
                    catch (Throwable e) {
                        LOGGER.error("Single WorkerThread process error", e);
                    }
                }
            }
            catch (Throwable e) {
                // Last line of defence, e.g. if the per-task error logging itself fails.
                LOGGER.error("WorkerThread error", e);
            }
        }
    }
}

