package org.apache.dolphinscheduler.server.worker.runner;

import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.DelayQueue;
import java.util.stream.Stream;
import javax.annotation.Nullable;
import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContextCacheManager;
import org.apache.dolphinscheduler.server.worker.config.TaskExecuteThreadsFullPolicy;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.worker.metrics.WorkerServerMetrics;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

@Component
/* loaded from: input_file:org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThread.class */
public class WorkerManagerThread implements Runnable {
    private final WorkerExecService workerExecService;
    private final WorkerConfig workerConfig;
    private final int workerExecThreads;
    private final Logger logger = LoggerFactory.getLogger(WorkerManagerThread.class);
    private final ConcurrentHashMap<Integer, WorkerTaskExecuteRunnable> taskExecuteThreadMap = new ConcurrentHashMap<>();
    private final DelayQueue<WorkerDelayTaskExecuteRunnable> waitSubmitQueue = new DelayQueue<>();

    public WorkerManagerThread(WorkerConfig workerConfig) {
        this.workerConfig = workerConfig;
        this.workerExecThreads = workerConfig.getExecThreads();
        this.workerExecService = new WorkerExecService(ThreadUtils.newDaemonFixedThreadExecutor("Worker-Execute-Thread", workerConfig.getExecThreads()), this.taskExecuteThreadMap);
    }

    @Nullable
    public WorkerTaskExecuteRunnable getTaskExecuteThread(Integer num) {
        return this.taskExecuteThreadMap.get(num);
    }

    public int getWaitSubmitQueueSize() {
        return this.waitSubmitQueue.size();
    }

    public int getThreadPoolQueueSize() {
        return this.workerExecService.getThreadPoolQueueSize();
    }

    public void killTaskBeforeExecuteByInstanceId(Integer num) {
        Stream filter = this.waitSubmitQueue.stream().filter(workerDelayTaskExecuteRunnable -> {
            return workerDelayTaskExecuteRunnable.getTaskExecutionContext().getTaskInstanceId() == num.intValue();
        });
        DelayQueue<WorkerDelayTaskExecuteRunnable> delayQueue = this.waitSubmitQueue;
        Objects.requireNonNull(delayQueue);
        filter.forEach((v1) -> {
            r1.remove(v1);
        });
    }

    public boolean offer(WorkerDelayTaskExecuteRunnable workerDelayTaskExecuteRunnable) {
        if (this.workerConfig.getTaskExecuteThreadsFullPolicy() == TaskExecuteThreadsFullPolicy.CONTINUE) {
            return this.waitSubmitQueue.offer((DelayQueue<WorkerDelayTaskExecuteRunnable>) workerDelayTaskExecuteRunnable);
        }
        if (this.waitSubmitQueue.size() > this.workerExecThreads) {
            this.logger.warn("Wait submit queue is full, will retry submit task later");
            WorkerServerMetrics.incWorkerSubmitQueueIsFullCount();
            ThreadUtils.sleep(1000L);
            if (this.waitSubmitQueue.size() > this.workerExecThreads) {
                return false;
            }
        }
        return this.waitSubmitQueue.offer((DelayQueue<WorkerDelayTaskExecuteRunnable>) workerDelayTaskExecuteRunnable);
    }

    public void start() {
        this.logger.info("Worker manager thread starting");
        Thread thread = new Thread(this, getClass().getName());
        thread.setDaemon(true);
        thread.start();
        this.logger.info("Worker manager thread started");
    }

    @Override // java.lang.Runnable
    public void run() {
        Thread.currentThread().setName("Worker-Execute-Manager-Thread");
        while (!ServerLifeCycleManager.isStopped()) {
            try {
                if (!ServerLifeCycleManager.isRunning()) {
                    Thread.sleep(1000L);
                }
                if (getThreadPoolQueueSize() <= this.workerExecThreads) {
                    this.workerExecService.submit(this.waitSubmitQueue.take());
                } else {
                    WorkerServerMetrics.incWorkerOverloadCount();
                    this.logger.info("Exec queue is full, waiting submit queue {}, waiting exec queue size {}", Integer.valueOf(getWaitSubmitQueueSize()), Integer.valueOf(getThreadPoolQueueSize()));
                    ThreadUtils.sleep(1000L);
                }
            } catch (Exception e) {
                this.logger.error("An unexpected interrupt is happened, the exception will be ignored and this thread will continue to run", e);
            }
        }
    }

    public void clearTask() {
        this.waitSubmitQueue.clear();
        this.workerExecService.getTaskExecuteThreadMap().values().forEach(workerTaskExecuteRunnable -> {
            int taskInstanceId = workerTaskExecuteRunnable.getTaskExecutionContext().getTaskInstanceId();
            try {
                try {
                    workerTaskExecuteRunnable.cancelTask();
                    this.logger.info("Cancel the taskInstance in worker  {}", Integer.valueOf(taskInstanceId));
                    TaskExecutionContextCacheManager.removeByTaskInstanceId(Integer.valueOf(taskInstanceId));
                } catch (Exception e) {
                    this.logger.error("Cancel the taskInstance error {}", Integer.valueOf(taskInstanceId), e);
                    TaskExecutionContextCacheManager.removeByTaskInstanceId(Integer.valueOf(taskInstanceId));
                }
            } catch (Throwable th) {
                TaskExecutionContextCacheManager.removeByTaskInstanceId(Integer.valueOf(taskInstanceId));
                throw th;
            }
        });
        this.workerExecService.getTaskExecuteThreadMap().clear();
    }
}
