/*
 * Decompiled with CFR 0.152.
 */
package org.apache.dolphinscheduler.server.worker.runner;

import com.baomidou.mybatisplus.core.toolkit.CollectionUtils;
import com.google.common.base.Strings;
import java.io.File;
import java.io.IOException;
import java.nio.file.NoSuchFileException;
import java.util.Collection;
import java.util.Date;
import java.util.List;
import java.util.Objects;
import javax.annotation.Nullable;
import lombok.NonNull;
import org.apache.commons.io.FileUtils;
import org.apache.dolphinscheduler.common.enums.WarningType;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.plugin.task.api.AbstractTask;
import org.apache.dolphinscheduler.plugin.task.api.TaskCallBack;
import org.apache.dolphinscheduler.plugin.task.api.TaskChannel;
import org.apache.dolphinscheduler.plugin.task.api.TaskException;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContextCacheManager;
import org.apache.dolphinscheduler.plugin.task.api.TaskPluginException;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
import org.apache.dolphinscheduler.plugin.task.api.model.TaskAlertInfo;
import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.worker.rpc.WorkerMessageSender;
import org.apache.dolphinscheduler.server.worker.runner.TaskCallbackImpl;
import org.apache.dolphinscheduler.server.worker.utils.TaskExecutionCheckerUtils;
import org.apache.dolphinscheduler.service.alert.AlertClientService;
import org.apache.dolphinscheduler.service.storage.StorageOperate;
import org.apache.dolphinscheduler.service.task.TaskPluginManager;
import org.apache.dolphinscheduler.service.utils.CommonUtils;
import org.apache.dolphinscheduler.service.utils.LoggerUtils;
import org.apache.dolphinscheduler.service.utils.ProcessUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Template for running a single task instance on the worker.
 *
 * <p>{@link #run()} drives the fixed sequence: {@link #initializeTask()}, {@link #beforeExecute()},
 * {@link #executeTask(TaskCallBack)} (supplied by the subclass) and {@link #afterExecute()}. Any
 * {@link Throwable} raised along the way is routed to {@link #afterThrowing(Throwable)}, which
 * reports {@link TaskExecutionStatus#FAILURE} to the master.
 */
public abstract class WorkerTaskExecuteRunnable
implements Runnable {

    /** Value of {@code taskExecutionContext.getDryRun()} that {@link #run()} treats as dry-run mode. */
    private static final int DRY_RUN_ENABLED = 1;

    protected final Logger logger =
            LoggerFactory.getLogger(String.format("TaskLogLogger-%s", WorkerTaskExecuteRunnable.class));
    protected final TaskExecutionContext taskExecutionContext;
    protected final WorkerConfig workerConfig;
    protected final String masterAddress;
    protected final WorkerMessageSender workerMessageSender;
    protected final AlertClientService alertClientService;
    protected final TaskPluginManager taskPluginManager;
    /** May be {@code null}; passed as-is to the resource download step in {@link #beforeExecute()}. */
    @Nullable
    protected final StorageOperate storageOperate;
    /** The task plugin instance; {@code null} until {@link #beforeExecute()} has created it. */
    @Nullable
    protected AbstractTask task;

    /**
     * Stores the collaborators and assigns the task log name on the execution context.
     *
     * @param taskExecutionContext context of the task instance to run, must not be {@code null}
     * @param workerConfig         worker configuration, must not be {@code null}
     * @param masterAddress        address the status messages are sent to, must not be {@code null}
     * @param workerMessageSender  sender used for status messages, must not be {@code null}
     * @param alertClientService   client used to send alerts, must not be {@code null}
     * @param taskPluginManager    source of the task channels, must not be {@code null}
     * @param storageOperate       storage used when downloading resources, may be {@code null}
     * @throws NullPointerException if any argument other than {@code storageOperate} is {@code null}
     */
    protected WorkerTaskExecuteRunnable(@NonNull TaskExecutionContext taskExecutionContext, @NonNull WorkerConfig workerConfig, @NonNull String masterAddress, @NonNull WorkerMessageSender workerMessageSender, @NonNull AlertClientService alertClientService, @NonNull TaskPluginManager taskPluginManager, @Nullable StorageOperate storageOperate) {
        // Checks run in parameter order so the first null argument is the one reported.
        this.taskExecutionContext = Objects.requireNonNull(taskExecutionContext, "taskExecutionContext is marked non-null but is null");
        this.workerConfig = Objects.requireNonNull(workerConfig, "workerConfig is marked non-null but is null");
        this.masterAddress = Objects.requireNonNull(masterAddress, "masterAddress is marked non-null but is null");
        this.workerMessageSender = Objects.requireNonNull(workerMessageSender, "workerMessageSender is marked non-null but is null");
        this.alertClientService = Objects.requireNonNull(alertClientService, "alertClientService is marked non-null but is null");
        this.taskPluginManager = Objects.requireNonNull(taskPluginManager, "taskPluginManager is marked non-null but is null");
        this.storageOperate = storageOperate;

        String taskLogName = LoggerUtils.buildTaskId(
                taskExecutionContext.getFirstSubmitTime(),
                taskExecutionContext.getProcessDefineCode(),
                taskExecutionContext.getProcessDefineVersion(),
                taskExecutionContext.getProcessInstanceId(),
                taskExecutionContext.getTaskInstanceId());
        taskExecutionContext.setTaskLogName(taskLogName);
        this.logger.info("Set task logger name: {}", taskLogName);
    }

    /**
     * Runs the actual task; invoked by {@link #run()} between {@link #beforeExecute()} and
     * {@link #afterExecute()}.
     *
     * @param var1 callback built by {@link #run()} from the message sender and master address
     */
    protected abstract void executeTask(TaskCallBack var1);

    /**
     * Post-processing after {@link #executeTask(TaskCallBack)} returned normally: sends an alert if
     * the task asks for one, reports the result to the master, removes the context from the worker
     * cache and clears the execute path.
     *
     * @throws TaskException if no task instance has been created
     */
    protected void afterExecute() throws TaskException {
        if (task == null) {
            throw new TaskException("The current task instance is null");
        }
        sendAlertIfNeeded();
        sendTaskResult();
        TaskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId());
        logger.info("Remove the current task execute context from worker cache");
        clearTaskExecPathIfNeeded();
    }

    /**
     * Failure path of {@link #run()}: cancels the task, removes the context from the worker cache,
     * marks the context as {@link TaskExecutionStatus#FAILURE} with the current time as end time and
     * sends the result to the master.
     *
     * @param throwable the failure that triggered this path; not inspected by this implementation
     * @throws TaskException declared for overriding implementations
     */
    protected void afterThrowing(Throwable throwable) throws TaskException {
        cancelTask();
        TaskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId());
        taskExecutionContext.setCurrentExecutionStatus(TaskExecutionStatus.FAILURE);
        taskExecutionContext.setEndTime(new Date());
        workerMessageSender.sendMessageWithRetry(taskExecutionContext, masterAddress, CommandType.TASK_EXECUTE_RESULT);
        logger.info("Get a exception when execute the task, will send the task execute result to master, the current task execute result is {}", TaskExecutionStatus.FAILURE);
    }

    /**
     * Cancels the task if one has been created, then cancels the applications whose ids are found
     * in the task log file. Failures are logged and never propagated.
     */
    public void cancelTask() {
        if (task == null) {
            return;
        }
        try {
            task.cancel();
            List<String> appIds = LogUtils.getAppIdsFromLogFile(taskExecutionContext.getLogPath());
            if (CollectionUtils.isNotEmpty(appIds)) {
                ProcessUtils.cancelApplication(appIds, logger, taskExecutionContext.getTenantCode(), taskExecutionContext.getExecutePath());
            }
        } catch (Exception e) {
            logger.error("Task execute failed and cancel the application failed, this will not affect the taskInstance status, but you need to check manual", e);
        }
    }

    /**
     * Executes the task life cycle on the calling thread. The thread is renamed to the task log
     * name and the workflow/task instance ids are put into the MDC for the duration of the call.
     * In dry-run mode the task is reported as {@link TaskExecutionStatus#SUCCESS} right after
     * initialization and nothing else is executed.
     */
    @Override
    public void run() {
        try {
            Thread.currentThread().setName(taskExecutionContext.getTaskLogName());
            LoggerUtils.setWorkflowAndTaskInstanceIDMDC(taskExecutionContext.getProcessInstanceId(), taskExecutionContext.getTaskInstanceId());
            logger.info("Begin to pulling task");

            initializeTask();

            if (DRY_RUN_ENABLED == taskExecutionContext.getDryRun()) {
                taskExecutionContext.setCurrentExecutionStatus(TaskExecutionStatus.SUCCESS);
                taskExecutionContext.setEndTime(new Date());
                TaskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId());
                workerMessageSender.sendMessageWithRetry(taskExecutionContext, masterAddress, CommandType.TASK_EXECUTE_RESULT);
                logger.info("The current execute mode is dry run, will stop the subsequent process and set the taskInstance status to success");
                return;
            }

            beforeExecute();

            TaskCallbackImpl taskCallBack = TaskCallbackImpl.builder()
                    .workerMessageSender(workerMessageSender)
                    .masterAddress(masterAddress)
                    .build();
            executeTask(taskCallBack);

            afterExecute();
        } catch (Throwable ex) {
            logger.error("Task execute failed, due to meet an exception", ex);
            afterThrowing(ex);
        } finally {
            // Always drop the MDC entries, including on the dry-run early return.
            LoggerUtils.removeWorkflowAndTaskInstanceIdMDC();
        }
    }

    /**
     * Fills the execution context with the start time (now), the system env file path and the task
     * app id, which is formatted as {@code <processInstanceId>_<taskInstanceId>}.
     */
    protected void initializeTask() {
        logger.info("Begin to initialize task");

        Date taskStartTime = new Date();
        taskExecutionContext.setStartTime(taskStartTime);
        logger.info("Set task startTime: {}", taskStartTime);

        String systemEnvPath = CommonUtils.getSystemEnvPath();
        taskExecutionContext.setEnvFile(systemEnvPath);
        logger.info("Set task envFile: {}", systemEnvPath);

        String taskAppId = String.format("%s_%s", taskExecutionContext.getProcessInstanceId(), taskExecutionContext.getTaskInstanceId());
        taskExecutionContext.setTaskAppId(taskAppId);
        logger.info("Set task appId: {}", taskAppId);

        logger.info("End initialize task");
    }

    /**
     * Prepares execution: reports {@link TaskExecutionStatus#RUNNING_EXECUTION} to the master, runs
     * the tenant, local path and resource checks, then creates and initializes the task through the
     * task channel registered for the context's task type and hands it the context's var pool.
     *
     * @throws TaskPluginException if no task channel is registered for the task type, or the
     *                             channel returns {@code null} when asked to create the task
     */
    protected void beforeExecute() {
        taskExecutionContext.setCurrentExecutionStatus(TaskExecutionStatus.RUNNING_EXECUTION);
        workerMessageSender.sendMessageWithRetry(taskExecutionContext, masterAddress, CommandType.TASK_EXECUTE_RUNNING);
        logger.info("Set task status to {}", TaskExecutionStatus.RUNNING_EXECUTION);

        TaskExecutionCheckerUtils.checkTenantExist(workerConfig, taskExecutionContext);
        logger.info("TenantCode:{} check success", taskExecutionContext.getTenantCode());

        TaskExecutionCheckerUtils.createProcessLocalPathIfAbsent(taskExecutionContext);
        logger.info("ProcessExecDir:{} check success", taskExecutionContext.getExecutePath());

        TaskExecutionCheckerUtils.downloadResourcesIfNeeded(storageOperate, taskExecutionContext, logger);
        logger.info("Resources:{} check success", (Object) taskExecutionContext.getResources());

        TaskChannel taskChannel = (TaskChannel) taskPluginManager.getTaskChannelMap().get(taskExecutionContext.getTaskType());
        if (taskChannel == null) {
            throw new TaskPluginException(String.format("%s task plugin not found, please check config file.", taskExecutionContext.getTaskType()));
        }

        task = taskChannel.createTask(taskExecutionContext);
        if (task == null) {
            throw new TaskPluginException(String.format("%s task is null, please check the task plugin is correct", taskExecutionContext.getTaskType()));
        }
        logger.info("Task plugin: {} create success", taskExecutionContext.getTaskType());

        task.init();
        logger.info("Success initialized task plugin instance success");

        task.getParameters().setVarPool(taskExecutionContext.getVarPool());
        logger.info("Success set taskVarPool: {}", taskExecutionContext.getVarPool());
    }

    /**
     * Sends an alert through the alert client when the task reports that it needs one. The warning
     * strategy is {@link WarningType#SUCCESS} when the task exit status is
     * {@link TaskExecutionStatus#SUCCESS} and {@link WarningType#FAILURE} otherwise.
     *
     * <p>If the task provides no alert info or no alert group id, the alert is skipped with a
     * warning; this method runs before the result is reported, so an exception here would turn a
     * finished task into a failed one.
     */
    protected void sendAlertIfNeeded() {
        if (!task.getNeedAlert()) {
            return;
        }
        logger.info("The current task need to send alert, begin to send alert");

        TaskExecutionStatus status = task.getExitStatus();
        TaskAlertInfo taskAlertInfo = task.getTaskAlertInfo();
        if (taskAlertInfo == null || taskAlertInfo.getAlertGroupId() == null) {
            logger.warn("The current task need to send alert, but the task alert info or alert group id is null, will skip sending alert");
            return;
        }

        int strategy;
        if (status == TaskExecutionStatus.SUCCESS) {
            strategy = WarningType.SUCCESS.getCode();
        } else {
            strategy = WarningType.FAILURE.getCode();
        }
        alertClientService.sendAlert(taskAlertInfo.getAlertGroupId().intValue(), taskAlertInfo.getTitle(), taskAlertInfo.getContent(), strategy);
        logger.info("Success send alert");
    }

    /**
     * Copies the task outcome (exit status, process id, app ids, JSON-serialized var pool) and the
     * current time as end time into the execution context and sends it to the master.
     */
    protected void sendTaskResult() {
        taskExecutionContext.setCurrentExecutionStatus(task.getExitStatus());
        taskExecutionContext.setEndTime(new Date());
        taskExecutionContext.setProcessId(task.getProcessId());
        taskExecutionContext.setAppIds(task.getAppIds());
        taskExecutionContext.setVarPool(JSONUtils.toJsonString(task.getParameters().getVarPool()));
        workerMessageSender.sendMessageWithRetry(taskExecutionContext, masterAddress, CommandType.TASK_EXECUTE_RESULT);
        logger.info("Send task execute result to master, the current task status: {}", taskExecutionContext.getCurrentExecutionStatus());
    }

    /**
     * Deletes the task execute directory unless develop mode is active. Empty paths and the root
     * path {@code "/"} are never deleted. Deletion is best effort: failures are logged and never
     * propagated, because the task result has already been reported when this runs.
     */
    protected void clearTaskExecPathIfNeeded() {
        String execLocalPath = taskExecutionContext.getExecutePath();
        if (CommonUtils.isDevelopMode()) {
            logger.info("The current execute mode is develop mode, will not clear the task execute file: {}", execLocalPath);
            return;
        }

        logger.info("The current execute mode isn't develop mode, will clear the task execute file: {}", execLocalPath);
        if (Strings.isNullOrEmpty(execLocalPath)) {
            logger.warn("The task execute file is {} no need to clear", taskExecutionContext.getTaskName());
            return;
        }
        if ("/".equals(execLocalPath)) {
            logger.warn("The task execute file is '/', direct deletion is not allowed");
            return;
        }

        try {
            FileUtils.deleteDirectory(new File(execLocalPath));
            logger.info("Success clear the task execute file: {}", execLocalPath);
        } catch (IOException | RuntimeException e) {
            // RuntimeException is caught as well: letting it escape would reach run()'s catch block
            // and report FAILURE for a task whose result was already sent to the master.
            // A directory that is already gone is not worth reporting.
            if (!(e instanceof NoSuchFileException)) {
                logger.error("Delete task execute file: {} failed, this will not affect the task status, but you need to clear this manually", execLocalPath, e);
            }
        }
    }

    /**
     * @return the execution context this runnable was created with, never {@code null}
     */
    @NonNull
    public TaskExecutionContext getTaskExecutionContext() {
        return taskExecutionContext;
    }

    /**
     * @return the task plugin instance, or {@code null} if {@link #beforeExecute()} has not created
     *         it yet
     */
    @Nullable
    public AbstractTask getTask() {
        return task;
    }
}

