/*
 * Decompiled with CFR 0.152.
 */
package com.aizuda.snailjob.server.job.task.support.generator.task;

import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.lang.Assert;
import com.aizuda.snailjob.common.core.enums.JobArgsTypeEnum;
import com.aizuda.snailjob.common.core.enums.JobTaskStatusEnum;
import com.aizuda.snailjob.common.core.enums.JobTaskTypeEnum;
import com.aizuda.snailjob.common.core.enums.MapReduceStageEnum;
import com.aizuda.snailjob.common.core.enums.StatusEnum;
import com.aizuda.snailjob.common.core.exception.SnailJobMapReduceException;
import com.aizuda.snailjob.common.core.model.JobArgsHolder;
import com.aizuda.snailjob.common.core.util.JsonUtil;
import com.aizuda.snailjob.common.core.util.StreamUtils;
import com.aizuda.snailjob.common.log.SnailJobLog;
import com.aizuda.snailjob.server.common.allocate.client.ClientLoadBalanceManager;
import com.aizuda.snailjob.server.common.dto.InstanceLiveInfo;
import com.aizuda.snailjob.server.common.dto.InstanceSelectCondition;
import com.aizuda.snailjob.server.common.dto.RegisterNodeInfo;
import com.aizuda.snailjob.server.common.exception.SnailJobServerException;
import com.aizuda.snailjob.server.common.handler.InstanceManager;
import com.aizuda.snailjob.server.common.triple.Pair;
import com.aizuda.snailjob.server.common.util.ClientInfoUtils;
import com.aizuda.snailjob.server.job.task.dto.MapReduceArgsStrDTO;
import com.aizuda.snailjob.server.job.task.support.JobTaskConverter;
import com.aizuda.snailjob.server.job.task.support.generator.task.AbstractJobTaskGenerator;
import com.aizuda.snailjob.server.job.task.support.generator.task.JobTaskGenerateContext;
import com.aizuda.snailjob.template.datasource.persistence.mapper.JobTaskMapper;
import com.aizuda.snailjob.template.datasource.persistence.po.JobTask;
import com.baomidou.mybatisplus.core.conditions.Wrapper;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.toolkit.support.SFunction;
import com.google.common.collect.Lists;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import lombok.Generated;
import org.springframework.stereotype.Component;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.TransactionCallback;
import org.springframework.transaction.support.TransactionCallbackWithoutResult;
import org.springframework.transaction.support.TransactionTemplate;

@Component
public class MapReduceTaskGenerator
extends AbstractJobTaskGenerator {
    private static final String MERGE_REDUCE_TASK = "MERGE_REDUCE_TASK";
    private static final String REDUCE_TASK = "REDUCE_TASK";
    private final JobTaskMapper jobTaskMapper;
    private final TransactionTemplate transactionTemplate;
    private final InstanceManager instanceManager;

    @Override
    public JobTaskTypeEnum getTaskInstanceType() {
        return JobTaskTypeEnum.MAP_REDUCE;
    }

    @Override
    protected List<JobTask> doGenerate(JobTaskGenerateContext context) {
        MapReduceStageEnum mapReduceStageEnum = MapReduceStageEnum.ofStage((Integer)context.getMrStage());
        Assert.notNull((Object)mapReduceStageEnum, () -> new SnailJobServerException("Map reduce stage is not existed"));
        switch (Objects.requireNonNull(mapReduceStageEnum)) {
            case MAP: {
                return this.createMapJobTasks(context);
            }
            case REDUCE: {
                return this.createReduceJobTasks(context);
            }
            case MERGE_REDUCE: {
                return this.createMergeReduceJobTasks(context);
            }
        }
        throw new SnailJobServerException("Map reduce stage is not existed");
    }

    private List<JobTask> createMergeReduceJobTasks(JobTaskGenerateContext context) {
        List jobTasks = this.jobTaskMapper.selectList((Wrapper)((LambdaQueryWrapper)((LambdaQueryWrapper)new LambdaQueryWrapper().select(new SFunction[]{JobTask::getResultMessage}).eq(JobTask::getTaskBatchId, (Object)context.getTaskBatchId())).eq(JobTask::getMrStage, (Object)MapReduceStageEnum.REDUCE.getStage())).eq(JobTask::getLeaf, (Object)StatusEnum.YES.getStatus()));
        MapReduceArgsStrDTO jobParams = this.getJobParams(context);
        Pair<String, Integer> clientInfo = this.getClientNodeInfo(context);
        JobTask jobTask = JobTaskConverter.INSTANCE.toJobTaskInstance(context);
        jobTask.setClientInfo((String)clientInfo.getKey());
        jobTask.setArgsType(context.getArgsType());
        JobArgsHolder jobArgsHolder = new JobArgsHolder();
        jobArgsHolder.setJobParams((Object)jobParams.getArgsStr());
        jobArgsHolder.setReduces((Object)StreamUtils.toList((Collection)jobTasks, JobTask::getResultMessage));
        jobTask.setArgsStr(JsonUtil.toJsonString((Object)jobArgsHolder));
        jobTask.setTaskStatus((Integer)clientInfo.getValue());
        jobTask.setResultMessage(Optional.ofNullable(jobTask.getResultMessage()).orElse(""));
        jobTask.setMrStage(Integer.valueOf(MapReduceStageEnum.MERGE_REDUCE.getStage()));
        jobTask.setTaskName(MERGE_REDUCE_TASK);
        Assert.isTrue((1 == this.jobTaskMapper.insert((Object)jobTask) ? 1 : 0) != 0, () -> new SnailJobServerException("Adding new task instance failed"));
        return Lists.newArrayList((Object[])new JobTask[]{jobTask});
    }

    private List<JobTask> createReduceJobTasks(final JobTaskGenerateContext context) {
        final MapReduceArgsStrDTO jobParams = this.getJobParams(context);
        int reduceParallel = Math.max(1, Optional.ofNullable(jobParams.getShardNum()).orElse(1));
        ArrayList<JobTask> jobTasks = this.jobTaskMapper.selectList((Wrapper)((LambdaQueryWrapper)((LambdaQueryWrapper)new LambdaQueryWrapper().select(new SFunction[]{JobTask::getResultMessage, JobTask::getId}).eq(JobTask::getTaskBatchId, (Object)context.getTaskBatchId())).eq(JobTask::getMrStage, (Object)MapReduceStageEnum.MAP.getStage())).eq(JobTask::getLeaf, (Object)StatusEnum.YES.getStatus()));
        if (CollUtil.isEmpty((Collection)jobTasks)) {
            return Lists.newArrayList();
        }
        List allMapJobTasks = StreamUtils.toList((Collection)jobTasks, JobTask::getResultMessage);
        final List<List<String>> partition = this.averageAlgorithm(allMapJobTasks, reduceParallel);
        final ArrayList<JobTask> finalJobTasks = jobTasks = new ArrayList<JobTask>(partition.size());
        this.transactionTemplate.execute((TransactionCallback)new TransactionCallbackWithoutResult(){

            protected void doInTransactionWithoutResult(TransactionStatus status) {
                for (int index = 0; index < partition.size(); ++index) {
                    Pair<String, Integer> clientInfo = MapReduceTaskGenerator.this.getClientNodeInfo(context);
                    JobTask jobTask = JobTaskConverter.INSTANCE.toJobTaskInstance(context);
                    jobTask.setClientInfo((String)clientInfo.getKey());
                    jobTask.setArgsType(context.getArgsType());
                    JobArgsHolder jobArgsHolder = new JobArgsHolder();
                    jobArgsHolder.setJobParams((Object)jobParams.getArgsStr());
                    jobArgsHolder.setMaps(partition.get(index));
                    jobTask.setArgsStr(JsonUtil.toJsonString((Object)jobArgsHolder));
                    jobTask.setTaskStatus((Integer)clientInfo.getValue());
                    jobTask.setResultMessage(Optional.ofNullable(jobTask.getResultMessage()).orElse(""));
                    jobTask.setMrStage(Integer.valueOf(MapReduceStageEnum.REDUCE.getStage()));
                    jobTask.setTaskName(MapReduceTaskGenerator.REDUCE_TASK);
                    jobTask.setParentId(Long.valueOf(0L));
                    jobTask.setRetryCount(Integer.valueOf(0));
                    jobTask.setLeaf(StatusEnum.YES.getStatus());
                    jobTask.setCreateDt(LocalDateTime.now());
                    jobTask.setUpdateDt(LocalDateTime.now());
                    finalJobTasks.add(jobTask);
                }
                MapReduceTaskGenerator.this.batchSaveJobTasks(finalJobTasks);
            }
        });
        return finalJobTasks;
    }

    private List<JobTask> createMapJobTasks(JobTaskGenerateContext context) {
        List<?> mapSubTask = context.getMapSubTask();
        if (CollUtil.isEmpty(mapSubTask)) {
            SnailJobLog.LOCAL.warn("Map sub task is empty. TaskBatchId:[{}]", new Object[]{context.getTaskBatchId()});
            return Lists.newArrayList();
        }
        MapReduceArgsStrDTO jobParams = this.getJobParams(context);
        JobTask parentJobTask = (JobTask)this.jobTaskMapper.selectOne((Wrapper)((LambdaQueryWrapper)new LambdaQueryWrapper().select(new SFunction[]{JobTask::getId}).eq(JobTask::getId, (Object)Optional.ofNullable(context.getParentId()).orElse(0L))).eq(JobTask::getLeaf, (Object)StatusEnum.YES.getStatus()));
        return (List)this.transactionTemplate.execute(status -> {
            ArrayList<JobTask> jobTasks = new ArrayList<JobTask>(mapSubTask.size());
            for (int index = 0; index < mapSubTask.size(); ++index) {
                Pair<String, Integer> clientInfo = this.getClientNodeInfo(context);
                JobTask jobTask = JobTaskConverter.INSTANCE.toJobTaskInstance(context);
                jobTask.setClientInfo((String)clientInfo.getKey());
                jobTask.setArgsType(context.getArgsType());
                JobArgsHolder jobArgsHolder = new JobArgsHolder();
                jobArgsHolder.setJobParams((Object)jobParams.getArgsStr());
                jobArgsHolder.setMaps(mapSubTask.get(index));
                jobTask.setArgsStr(JsonUtil.toJsonString((Object)jobArgsHolder));
                jobTask.setArgsType(JobArgsTypeEnum.JSON.getArgsType());
                jobTask.setTaskStatus((Integer)clientInfo.getValue());
                jobTask.setMrStage(Integer.valueOf(MapReduceStageEnum.MAP.getStage()));
                jobTask.setTaskName(context.getTaskName());
                jobTask.setLeaf(StatusEnum.YES.getStatus());
                jobTask.setParentId(Long.valueOf(Objects.isNull(context.getParentId()) ? 0L : context.getParentId()));
                jobTask.setRetryCount(Integer.valueOf(0));
                jobTask.setCreateDt(LocalDateTime.now());
                jobTask.setUpdateDt(LocalDateTime.now());
                jobTask.setResultMessage(Optional.ofNullable(jobTask.getResultMessage()).orElse(""));
                jobTasks.add(jobTask);
            }
            this.batchSaveJobTasks(jobTasks);
            if (Objects.nonNull(parentJobTask)) {
                JobTask parentJobTask1 = new JobTask();
                parentJobTask1.setId(context.getParentId());
                parentJobTask1.setLeaf(StatusEnum.NO.getStatus());
                Assert.isTrue((1 == this.jobTaskMapper.updateById((Object)parentJobTask1) ? 1 : 0) != 0, () -> new SnailJobMapReduceException("Updating parent node failed"));
            }
            return jobTasks;
        });
    }

    protected MapReduceArgsStrDTO getJobParams(JobTaskGenerateContext context) {
        try {
            return (MapReduceArgsStrDTO)JsonUtil.parseObject((String)context.getArgsStr(), MapReduceArgsStrDTO.class);
        }
        catch (Exception e) {
            SnailJobLog.LOCAL.error("map reduce args parse error. argsStr:[{}]", new Object[]{context.getArgsStr()});
            return new MapReduceArgsStrDTO();
        }
    }

    private Pair<String, Integer> getClientNodeInfo(JobTaskGenerateContext context) {
        InstanceSelectCondition condition = InstanceSelectCondition.builder().allocKey(String.valueOf(context.getJobId())).groupName(context.getGroupName()).namespaceId(context.getNamespaceId()).routeKey(Integer.valueOf(ClientLoadBalanceManager.AllocationAlgorithmEnum.ROUND.getType())).targetLabels(context.getLabels()).build();
        InstanceLiveInfo instance = this.instanceManager.getALiveInstanceByRouteKey(condition);
        String clientInfo = "";
        int JobTaskStatus = JobTaskStatusEnum.RUNNING.getStatus();
        if (Objects.isNull(instance)) {
            JobTaskStatus = JobTaskStatusEnum.CANCEL.getStatus();
        } else {
            clientInfo = ClientInfoUtils.generate((RegisterNodeInfo)instance.getNodeInfo());
        }
        return Pair.of((Object)clientInfo, (Object)JobTaskStatus);
    }

    private List<List<String>> averageAlgorithm(List<String> allMapJobTasks, int shard) {
        shard = Math.min(allMapJobTasks.size(), shard);
        int totalSize = allMapJobTasks.size();
        ArrayList<Integer> partitionSizes = new ArrayList<Integer>();
        int quotient = totalSize / shard;
        int remainder = totalSize % shard;
        for (int i = 0; i < shard; ++i) {
            partitionSizes.add(quotient + (i < remainder ? 1 : 0));
        }
        ArrayList<List<String>> partitions = new ArrayList<List<String>>();
        int currentIndex = 0;
        Iterator iterator = partitionSizes.iterator();
        while (iterator.hasNext()) {
            int size = (Integer)iterator.next();
            int endIndex = Math.min(currentIndex + size, totalSize);
            partitions.add(new ArrayList<String>(allMapJobTasks.subList(currentIndex, endIndex)));
            currentIndex = endIndex;
        }
        return partitions;
    }

    @Generated
    public MapReduceTaskGenerator(JobTaskMapper jobTaskMapper, TransactionTemplate transactionTemplate, InstanceManager instanceManager) {
        this.jobTaskMapper = jobTaskMapper;
        this.transactionTemplate = transactionTemplate;
        this.instanceManager = instanceManager;
    }
}

