/*
 * Decompiled with CFR 0.152.
 */
package com.aizuda.snailjob.server.job.task.support.result.job;

import com.aizuda.snailjob.common.core.enums.JobTaskTypeEnum;
import com.aizuda.snailjob.common.core.enums.MapReduceStageEnum;
import com.aizuda.snailjob.common.log.SnailJobLog;
import com.aizuda.snailjob.server.common.pekko.ActorGenerator;
import com.aizuda.snailjob.server.job.task.dto.ReduceTaskDTO;
import com.aizuda.snailjob.server.job.task.support.JobTaskConverter;
import com.aizuda.snailjob.server.job.task.support.handler.WorkflowBatchHandler;
import com.aizuda.snailjob.server.job.task.support.result.job.AbstractJobExecutorResultHandler;
import com.aizuda.snailjob.server.job.task.support.result.job.JobExecutorResultContext;
import com.aizuda.snailjob.template.datasource.persistence.mapper.GroupConfigMapper;
import com.aizuda.snailjob.template.datasource.persistence.mapper.JobTaskBatchMapper;
import com.aizuda.snailjob.template.datasource.persistence.mapper.JobTaskMapper;
import com.aizuda.snailjob.template.datasource.persistence.po.JobTask;
import java.util.Objects;
import org.apache.pekko.actor.ActorRef;
import org.springframework.stereotype.Component;

/**
 * Result handler registered for {@link JobTaskTypeEnum#MAP_REDUCE} jobs.
 *
 * <p>On success it inspects the map-reduce stage of every task in the context and, when a
 * follow-up stage is due, sends a {@link ReduceTaskDTO} to the reduce actor. While the
 * context's {@code createReduceTask} flag is set, the status update and the workflow
 * hand-off inherited from {@link AbstractJobExecutorResultHandler} are skipped.
 */
@Component
public class MapReduceJobExecutorHandler extends AbstractJobExecutorResultHandler {

    /**
     * Passes all collaborators straight through to the parent handler.
     *
     * @param jobTaskMapper        forwarded to the superclass
     * @param jobTaskBatchMapper   forwarded to the superclass
     * @param workflowBatchHandler forwarded to the superclass
     * @param groupConfigMapper    forwarded to the superclass
     */
    public MapReduceJobExecutorHandler(JobTaskMapper jobTaskMapper, JobTaskBatchMapper jobTaskBatchMapper, WorkflowBatchHandler workflowBatchHandler, GroupConfigMapper groupConfigMapper) {
        super(jobTaskMapper, jobTaskBatchMapper, workflowBatchHandler, groupConfigMapper);
    }

    /**
     * @return always {@link JobTaskTypeEnum#MAP_REDUCE}
     */
    @Override
    public JobTaskTypeEnum getTaskInstanceType() {
        return JobTaskTypeEnum.MAP_REDUCE;
    }

    /**
     * Decides whether a follow-up reduce task is required (dispatching it if so) and
     * records the outcome on the context via {@code setCreateReduceTask}.
     *
     * @param context result context of the finished batch
     */
    @Override
    protected void doHandleSuccess(JobExecutorResultContext context) {
        boolean reduceTaskCreated = this.needReduceTask(context);
        context.setCreateReduceTask(reduceTaskCreated);
    }

    /** Intentionally empty: nothing extra is done here when the batch is stopped. */
    @Override
    protected void doHandleStop(JobExecutorResultContext context) {
    }

    /** Intentionally empty: nothing extra is done here when the batch fails. */
    @Override
    protected void doHandleFail(JobExecutorResultContext context) {
    }

    /**
     * Delegates to the parent implementation unless the context is flagged with
     * {@code createReduceTask}; in that case the parent is not called.
     *
     * @param context result context of the finished batch
     * @param status  status value forwarded to the parent implementation
     * @return {@code false} when the flag is set, otherwise whatever the parent returns
     */
    @Override
    protected boolean updateStatus(JobExecutorResultContext context, Integer status) {
        // Short-circuit keeps the parent call from running while the flag is set.
        return !context.isCreateReduceTask() && super.updateStatus(context, status);
    }

    /**
     * Delegates to the parent implementation unless the context is flagged with
     * {@code createReduceTask}.
     *
     * @param context result context of the finished batch
     */
    @Override
    protected void openNextWorkflowNode(JobExecutorResultContext context) {
        if (!context.isCreateReduceTask()) {
            super.openNextWorkflowNode(context);
        }
    }

    /**
     * Works out whether another map-reduce stage has to run and, if so, asks the reduce
     * actor to start it.
     *
     * @param context result context whose task list is inspected
     * @return {@code true} only when a follow-up stage was selected and the message was
     *         handed to the actor without an exception
     */
    private boolean needReduceTask(JobExecutorResultContext context) {
        Integer followUpStage = this.selectFollowUpStage(context);
        if (Objects.isNull(followUpStage)) {
            return false;
        }
        return this.dispatchReduceTask(context, followUpStage.intValue());
    }

    /**
     * Picks the stage that should run next, based on the stages of the context's tasks.
     *
     * <ul>
     *   <li>Any task already in the MERGE_REDUCE stage: no follow-up.</li>
     *   <li>More than one REDUCE-stage task: MERGE_REDUCE.</li>
     *   <li>Every task in the list is MAP-stage: REDUCE.</li>
     *   <li>Anything else: no follow-up.</li>
     * </ul>
     *
     * @param context result context whose task list is inspected
     * @return the stage value to start, or {@code null} when no follow-up is needed
     */
    private Integer selectFollowUpStage(JobExecutorResultContext context) {
        int mergeReduceStage = MapReduceStageEnum.MERGE_REDUCE.getStage();
        int reduceStage = MapReduceStageEnum.REDUCE.getStage();
        int mapStage = MapReduceStageEnum.MAP.getStage();

        int reduceTasks = 0;
        int mapTasks = 0;
        for (JobTask task : context.getJobTaskList()) {
            Integer taskStage = task.getMrStage();
            if (taskStage == null) {
                // Tasks without a stage are not counted, yet they still contribute to the
                // list size compared against below.
                continue;
            }
            int stage = taskStage.intValue();
            if (stage == mergeReduceStage) {
                return null;
            }
            if (stage == reduceStage) {
                reduceTasks++;
            } else if (stage == mapStage) {
                mapTasks++;
            }
        }

        if (reduceTasks > 1) {
            return mergeReduceStage;
        }
        if (mapTasks == context.getJobTaskList().size()) {
            return reduceStage;
        }
        return null;
    }

    /**
     * Builds the {@link ReduceTaskDTO} for the given stage and sends it to the reduce actor.
     * Any exception raised while building or sending is logged and reported as failure.
     *
     * @param context result context converted into the message payload
     * @param mrStage stage value stamped on the payload
     * @return {@code true} when the message was sent, {@code false} when an exception occurred
     */
    private boolean dispatchReduceTask(JobExecutorResultContext context, int mrStage) {
        try {
            ReduceTaskDTO payload = JobTaskConverter.INSTANCE.toReduceTaskDTO(context);
            payload.setMrStage(mrStage);
            ActorRef reduceActor = ActorGenerator.jobReduceActor();
            // The actor is passed as its own sender, as in the original call.
            reduceActor.tell(payload, reduceActor);
            return true;
        } catch (Exception e) {
            // Explicit array keeps the exact varargs call shape of the original.
            SnailJobLog.LOCAL.error("tell reduce actor error", new Object[]{e});
            return false;
        }
    }
}

