package tech.powerjob.server.core.workflow;

import com.alibaba.fastjson.JSON;
import java.util.Date;
import java.util.Iterator;
import java.util.Objects;
import java.util.Optional;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeanUtils;
import org.springframework.stereotype.Service;
import tech.powerjob.common.enums.InstanceStatus;
import tech.powerjob.common.enums.SwitchableStatus;
import tech.powerjob.common.enums.WorkflowInstanceStatus;
import tech.powerjob.common.enums.WorkflowNodeType;
import tech.powerjob.common.exception.PowerJobException;
import tech.powerjob.common.model.PEWorkflowDAG;
import tech.powerjob.common.response.WorkflowInstanceInfoDTO;
import tech.powerjob.server.common.utils.SpringUtils;
import tech.powerjob.server.core.instance.InstanceService;
import tech.powerjob.server.core.lock.UseCacheLock;
import tech.powerjob.server.core.workflow.algorithm.WorkflowDAGUtils;
import tech.powerjob.server.persistence.remote.model.WorkflowInfoDO;
import tech.powerjob.server.persistence.remote.model.WorkflowInstanceInfoDO;
import tech.powerjob.server.persistence.remote.repository.WorkflowInfoRepository;
import tech.powerjob.server.persistence.remote.repository.WorkflowInstanceInfoRepository;
import tech.powerjob.server.remote.server.redirector.DesignateServer;

@Service
/* loaded from: input_file:tech/powerjob/server/core/workflow/WorkflowInstanceService.class */
public class WorkflowInstanceService {
    private static final Logger log = LoggerFactory.getLogger(WorkflowInstanceService.class);
    private final InstanceService instanceService;
    private final WorkflowInstanceInfoRepository wfInstanceInfoRepository;
    private final WorkflowInstanceManager workflowInstanceManager;
    private final WorkflowInfoRepository workflowInfoRepository;

    public void stopWorkflowInstanceEntrance(Long l, Long l2) {
        WorkflowInstanceInfoDO fetchWfInstance = fetchWfInstance(l, l2);
        if (!WorkflowInstanceStatus.GENERALIZED_RUNNING_STATUS.contains(fetchWfInstance.getStatus())) {
            throw new PowerJobException("workflow instance already stopped");
        }
        if (fetchWfInstance.getParentWfInstanceId() != null) {
            ((WorkflowInstanceService) SpringUtils.getBean(getClass())).stopWorkflowInstance(fetchWfInstance.getParentWfInstanceId(), l2);
        } else {
            ((WorkflowInstanceService) SpringUtils.getBean(getClass())).stopWorkflowInstance(l, l2);
        }
    }

    @UseCacheLock(type = "processWfInstance", key = "#wfInstanceId", concurrencyLevel = 1024)
    @DesignateServer
    public void stopWorkflowInstance(Long l, Long l2) {
        WorkflowInstanceInfoDO fetchWfInstance = fetchWfInstance(l, l2);
        if (!WorkflowInstanceStatus.GENERALIZED_RUNNING_STATUS.contains(fetchWfInstance.getStatus())) {
            throw new PowerJobException("workflow instance already stopped");
        }
        PEWorkflowDAG pEWorkflowDAG = (PEWorkflowDAG) JSON.parseObject(fetchWfInstance.getDag(), PEWorkflowDAG.class);
        pEWorkflowDAG.getNodes().forEach(node -> {
            try {
                if (node.getInstanceId() != null && InstanceStatus.GENERALIZED_RUNNING_STATUS.contains(node.getStatus())) {
                    log.debug("[WfInstance-{}] instance({}) is running, try to stop it now.", l, node.getInstanceId());
                    node.setStatus(Integer.valueOf(InstanceStatus.STOPPED.getV()));
                    node.setResult("stopped by user");
                    if (Objects.equals(node.getNodeType(), Integer.valueOf(WorkflowNodeType.NESTED_WORKFLOW.getCode()))) {
                        stopWorkflowInstance(node.getInstanceId(), l2);
                    } else {
                        this.instanceService.stopInstance(l2, node.getInstanceId());
                    }
                }
            } catch (Exception e) {
                log.warn("[WfInstance-{}] stop instance({}) failed.", new Object[]{l, JSON.toJSONString(node), e});
            }
        });
        fetchWfInstance.setDag(JSON.toJSONString(pEWorkflowDAG));
        fetchWfInstance.setStatus(Integer.valueOf(WorkflowInstanceStatus.STOPPED.getV()));
        fetchWfInstance.setResult("stopped by user");
        fetchWfInstance.setGmtModified(new Date());
        this.wfInstanceInfoRepository.saveAndFlush(fetchWfInstance);
        log.info("[WfInstance-{}] stop workflow instance successfully~", l);
    }

    @UseCacheLock(type = "processWfInstance", key = "#wfInstanceId", concurrencyLevel = 1024)
    @DesignateServer
    public void retryWorkflowInstance(Long l, Long l2) {
        WorkflowInstanceInfoDO fetchWfInstance = fetchWfInstance(l, l2);
        if (WorkflowInstanceStatus.GENERALIZED_RUNNING_STATUS.contains(fetchWfInstance.getStatus())) {
            throw new PowerJobException("workflow instance is running");
        }
        if (fetchWfInstance.getStatus().intValue() == WorkflowInstanceStatus.SUCCEED.getV()) {
            throw new PowerJobException("workflow instance is already successful");
        }
        if ("can't find some job".equals(fetchWfInstance.getResult())) {
            throw new PowerJobException("you can't retry the workflow instance which is missing job info!");
        }
        try {
            PEWorkflowDAG pEWorkflowDAG = (PEWorkflowDAG) JSON.parseObject(fetchWfInstance.getDag(), PEWorkflowDAG.class);
            if (!WorkflowDAGUtils.valid(pEWorkflowDAG)) {
                throw new PowerJobException("invalid dag");
            }
            Optional findById = this.workflowInfoRepository.findById(fetchWfInstance.getWorkflowId());
            if (!findById.isPresent() || ((WorkflowInfoDO) findById.get()).getStatus().intValue() == SwitchableStatus.DISABLE.getV()) {
                throw new PowerJobException("you can't retry the workflow instance whose metadata is unavailable!");
            }
            WorkflowDAGUtils.resetRetryableNode(pEWorkflowDAG);
            fetchWfInstance.setDag(JSON.toJSONString(pEWorkflowDAG));
            fetchWfInstance.setStatus(Integer.valueOf(WorkflowInstanceStatus.WAITING.getV()));
            fetchWfInstance.setGmtModified(new Date());
            this.wfInstanceInfoRepository.saveAndFlush(fetchWfInstance);
            this.workflowInstanceManager.start((WorkflowInfoDO) findById.get(), l);
        } catch (Exception e) {
            throw new PowerJobException("you can't retry the workflow instance whose DAG is illegal!");
        }
    }

    public WorkflowInstanceInfoDTO fetchWorkflowInstanceInfo(Long l, Long l2) {
        WorkflowInstanceInfoDO fetchWfInstance = fetchWfInstance(l, l2);
        WorkflowInstanceInfoDTO workflowInstanceInfoDTO = new WorkflowInstanceInfoDTO();
        BeanUtils.copyProperties(fetchWfInstance, workflowInstanceInfoDTO);
        return workflowInstanceInfoDTO;
    }

    public WorkflowInstanceInfoDO fetchWfInstance(Long l, Long l2) {
        WorkflowInstanceInfoDO workflowInstanceInfoDO = (WorkflowInstanceInfoDO) this.wfInstanceInfoRepository.findByWfInstanceId(l).orElseThrow(() -> {
            return new IllegalArgumentException("can't find workflow instance by wfInstanceId: " + l);
        });
        if (Objects.equals(l2, workflowInstanceInfoDO.getAppId())) {
            return workflowInstanceInfoDO;
        }
        throw new PowerJobException("Permission Denied!");
    }

    @UseCacheLock(type = "processWfInstance", key = "#wfInstanceId", concurrencyLevel = 1024)
    @DesignateServer
    public void markNodeAsSuccess(Long l, Long l2, Long l3) {
        WorkflowInstanceInfoDO fetchWfInstance = fetchWfInstance(l2, l);
        if (WorkflowInstanceStatus.GENERALIZED_RUNNING_STATUS.contains(fetchWfInstance.getStatus())) {
            throw new PowerJobException("you can't mark the node in a running workflow!");
        }
        PEWorkflowDAG pEWorkflowDAG = (PEWorkflowDAG) JSON.parseObject(fetchWfInstance.getDag(), PEWorkflowDAG.class);
        PEWorkflowDAG.Node node = null;
        Iterator it = pEWorkflowDAG.getNodes().iterator();
        while (true) {
            if (!it.hasNext()) {
                break;
            }
            PEWorkflowDAG.Node node2 = (PEWorkflowDAG.Node) it.next();
            if (node2.getNodeId().equals(l3)) {
                node = node2;
                break;
            }
        }
        if (node == null) {
            throw new PowerJobException("can't find the node in current DAG!");
        }
        boolean z = node.getSkipWhenFailed() != null && node.getSkipWhenFailed().booleanValue();
        if (node.getInstanceId() == null || node.getStatus().intValue() != InstanceStatus.FAILED.getV() || z) {
            throw new PowerJobException("you can only mark the node which is failed and not allow to skip!");
        }
        node.setStatus(Integer.valueOf(InstanceStatus.SUCCEED.getV())).setResult("mark as successful node");
        fetchWfInstance.setDag(JSON.toJSONString(pEWorkflowDAG));
        this.wfInstanceInfoRepository.saveAndFlush(fetchWfInstance);
    }

    public WorkflowInstanceService(InstanceService instanceService, WorkflowInstanceInfoRepository workflowInstanceInfoRepository, WorkflowInstanceManager workflowInstanceManager, WorkflowInfoRepository workflowInfoRepository) {
        this.instanceService = instanceService;
        this.wfInstanceInfoRepository = workflowInstanceInfoRepository;
        this.workflowInstanceManager = workflowInstanceManager;
        this.workflowInfoRepository = workflowInfoRepository;
    }
}
