/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.dts.client.executor.grid.queue.send;

import com.alibaba.dts.client.executor.grid.queue.TaskEvent;
import com.alibaba.dts.client.executor.grid.queue.send.SendManager;
import com.alibaba.dts.client.executor.job.context.ClientContextImpl;
import com.alibaba.dts.client.executor.job.context.JobContextImpl;
import com.alibaba.dts.client.store.access.TaskSnapshotAccess;
import com.alibaba.dts.common.context.InvocationContext;
import com.alibaba.dts.common.domain.ExecutableTask;
import com.alibaba.dts.common.domain.remoting.RemoteMachine;
import com.alibaba.dts.common.domain.result.Result;
import com.alibaba.dts.common.domain.result.ResultCode;
import com.alibaba.dts.common.domain.store.ExecutionCounter;
import com.alibaba.dts.common.domain.store.TaskSnapshot;
import com.alibaba.dts.common.logger.SchedulerXLoggerFactory;
import com.alibaba.dts.common.logger.innerlog.Logger;
import com.alibaba.dts.common.service.NodeServerService;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

/**
 * Worker that drains the send queue of a {@link SendManager} and hands every queued
 * {@link ExecutableTask} to its target machine through the client's {@link NodeServerService}.
 *
 * <p>For each event taken from the queue the worker:
 * <ol>
 *   <li>skips the event when the job instance is reported as interrupted by the send manager;</li>
 *   <li>calls {@code receiveTasks} on the node server service for the target machine;</li>
 *   <li>on {@link ResultCode#SUCCESS} records the receive node in the task snapshot store and bumps
 *       the total/queued execution counters for every task snapshot that was sent;</li>
 *   <li>when no result was obtained, or the result code is
 *       {@link ResultCode#NODE_RECEIVE_QUEUE_NOT_AVAILABLE}, schedules the tasks for re-dispatch.</li>
 * </ol>
 */
public class TaskSender implements Runnable {
    private static final Logger logger = SchedulerXLoggerFactory.getLogger(TaskSender.class);

    /** Timeout set on the target machine before every send (presumably milliseconds - confirm). */
    private static final long SEND_TIMEOUT = 30000L;

    private final ClientContextImpl clientContext;
    private final SendManager sendManager;
    private final TaskSnapshotAccess taskSnapshotDao;

    /**
     * @param clientContext client context providing the store, node configuration, node server
     *                      service, execution counter table and grid task sender
     * @param sendManager   owner of the send queue this worker consumes
     */
    public TaskSender(ClientContextImpl clientContext, SendManager sendManager) {
        this.sendManager = sendManager;
        this.clientContext = clientContext;
        this.taskSnapshotDao = clientContext.getStore().getTaskSnapshotDao();
    }

    /**
     * Consumes the send queue forever. Any failure while handling one event is logged and the loop
     * moves on to the next event, so a single bad event can never stop the sender.
     */
    @Override
    public void run() {
        BlockingQueue<TaskEvent> sendQueue = this.sendManager.getSendQueue();
        while (true) {
            try {
                this.processEvent(sendQueue.take());
            } catch (Throwable e) {
                // NOTE(review): an InterruptedException thrown by take() also lands here and the
                // loop keeps running, i.e. this worker does not stop on interruption.
                logger.error("[TaskSender] process error,", e);
            }
        }
    }

    /**
     * Handles one queued event: validates it, sends it and post-processes the send result.
     *
     * @param taskEvent event taken from the send queue
     */
    private void processEvent(TaskEvent taskEvent) {
        ExecutableTask executableTask = taskEvent.getExecutableTask();
        // Check for null before touching the task; dereferencing it first would turn the intended
        // warning into a NullPointerException.
        if (executableTask == null) {
            logger.warn("[TaskSender] executableTask is nulltargetMachine:" + taskEvent.getTargetMachine());
            return;
        }
        long jobInstanceId = executableTask.getJobInstanceSnapshot().getId();
        if (this.sendManager.isInterruptedJobInstance(jobInstanceId)) {
            logger.debug("job instance interrupted, jobId=" + executableTask.getJob().getId() + ", jobInstanceId=" + jobInstanceId);
            return;
        }

        Result<Boolean> result = this.send(taskEvent, executableTask);
        try {
            if (result == null || result.getResultCode() == ResultCode.NODE_RECEIVE_QUEUE_NOT_AVAILABLE) {
                // Send failed outright or the receiver's queue is not available: dispatch again.
                this.reSendTasks(taskEvent);
            } else if (result.getResultCode() == ResultCode.SUCCESS) {
                this.recordSuccessfulSend(taskEvent, executableTask, jobInstanceId);
            }
            // Any other result code is neither recorded nor retried.
        } catch (Throwable e) {
            logger.error("[TaskSender] process updateStatusBatch error,", e);
        }
    }

    /**
     * Sends the task to the event's target machine.
     *
     * @return the result returned by {@code receiveTasks}, or {@code null} when the call threw
     */
    private Result<Boolean> send(TaskEvent taskEvent, ExecutableTask executableTask) {
        RemoteMachine remoteMachine = taskEvent.getTargetMachine();
        remoteMachine.setTimeout(SEND_TIMEOUT);
        InvocationContext.setRemoteMachine(remoteMachine);
        NodeServerService serverService = this.clientContext.getNodeServerService();
        executableTask.setSendNodeAddress(this.clientContext.getNodeConfig().getLocalAddress());
        Result<Boolean> result = null;
        try {
            result = serverService.receiveTasks(executableTask);
            logger.debug("[TaskSender] send task,targetMachine:" + taskEvent.getTargetMachine() + ",result:" + result + ",jobID:" + executableTask.getJob().getId() + ",jobInstanceID:" + executableTask.getJobInstanceSnapshot().getId() + ",compensation:" + executableTask.isCompensation() + ",total tasks:" + executableTask.getTaskSnapshotList().size());
        } catch (Throwable throwable) {
            logger.error("[TaskSender]:task send error,,jobID:" + executableTask.getJob().getId() + ",jobInstanceID:" + executableTask.getJobInstanceSnapshot().getId(), throwable);
        }
        return result;
    }

    /**
     * Persists the receive node for the sent task snapshots (and, for non-compensation tasks, asks
     * the store to move them from init to queue status), then increments the total and queued
     * counters kept per job instance, receive node and task name.
     */
    private void recordSuccessfulSend(TaskEvent taskEvent, ExecutableTask executableTask, long jobInstanceId) {
        String receiveNode = taskEvent.getTargetMachine().getRemoteAddress();
        this.taskSnapshotDao.updateReceiveNodeBatch(executableTask.getTaskSnapshotList(), receiveNode);
        if (!executableTask.isCompensation()) {
            this.taskSnapshotDao.updateStatus2QueueIfStatusIsInitBatch(executableTask.getTaskSnapshotList());
        }

        // Get-or-create the per-job-instance map; putIfAbsent resolves races with other threads.
        ConcurrentHashMap<String, ConcurrentHashMap<String, ExecutionCounter>> countersByReceiveNode = this.clientContext.getExecutionCounterTable().get(jobInstanceId);
        if (countersByReceiveNode == null) {
            countersByReceiveNode = new ConcurrentHashMap<String, ConcurrentHashMap<String, ExecutionCounter>>();
            ConcurrentHashMap<String, ConcurrentHashMap<String, ExecutionCounter>> existing = this.clientContext.getExecutionCounterTable().putIfAbsent(jobInstanceId, countersByReceiveNode);
            if (existing != null) {
                countersByReceiveNode = existing;
            }
        }

        // Same get-or-create step for the per-receive-node map.
        ConcurrentHashMap<String, ExecutionCounter> countersByTaskName = countersByReceiveNode.get(receiveNode);
        if (countersByTaskName == null) {
            countersByTaskName = new ConcurrentHashMap<String, ExecutionCounter>();
            ConcurrentHashMap<String, ExecutionCounter> existing = countersByReceiveNode.putIfAbsent(receiveNode, countersByTaskName);
            if (existing != null) {
                countersByTaskName = existing;
            }
        }

        List<TaskSnapshot> taskSnapshots = executableTask.getTaskSnapshotList();
        if (taskSnapshots == null || taskSnapshots.isEmpty()) {
            // Nothing to count for this event. Only this event is skipped; the sender loop in
            // run() must keep consuming the queue.
            return;
        }
        for (TaskSnapshot taskSnapshot : taskSnapshots) {
            String taskName = taskSnapshot.getTaskName();
            ExecutionCounter executionCounter = countersByTaskName.get(taskName);
            if (executionCounter == null) {
                executionCounter = new ExecutionCounter();
                executionCounter.setReceiveNode(receiveNode);
                executionCounter.setTaskName(taskName);
                ExecutionCounter existing = countersByTaskName.putIfAbsent(taskName, executionCounter);
                if (existing != null) {
                    executionCounter = existing;
                }
            }
            executionCounter.getTotalCounter().getAndIncrement();
            executionCounter.getQueuedCounter().getAndIncrement();
        }
    }

    /**
     * Sums the queued counter of every {@link ExecutionCounter} in the client's execution counter
     * table, across all job instances, receive nodes and task names.
     *
     * @return the sum of all queued counters
     */
    private long getCountQueued() {
        long countQueued = 0L;
        ConcurrentHashMap<Long, ConcurrentHashMap<String, ConcurrentHashMap<String, ExecutionCounter>>> mapByJobInstanceId = this.clientContext.getExecutionCounterTable();
        for (ConcurrentHashMap<String, ConcurrentHashMap<String, ExecutionCounter>> mapByReceiveNode : mapByJobInstanceId.values()) {
            for (ConcurrentHashMap<String, ExecutionCounter> mapByTaskName : mapByReceiveNode.values()) {
                for (ExecutionCounter executionCounter : mapByTaskName.values()) {
                    countQueued += executionCounter.getQueuedCounter().get();
                }
            }
        }
        return countQueued;
    }

    /**
     * Submits a job to the grid task sender's re-send executor that keeps calling
     * {@code dispatchRetryTaskList} once per second until the dispatch reports success or
     * {@link ResultCode#TASK_SEND_INTERRUPT}. Does nothing but log when the grid task sender reports
     * the job instance as interrupted.
     *
     * @param taskEvent the event whose tasks could not be delivered to their target machine
     */
    private void reSendTasks(TaskEvent taskEvent) {
        final ExecutableTask executableTask = taskEvent.getExecutableTask();
        if (this.clientContext.getGridTaskSender().isInterruptedInstance(executableTask.getJobInstanceSnapshot().getId())) {
            logger.warn("[TaskSender]: reSendTasks force interrupt:,jobId:" + executableTask.getJob().getId() + ",jobInstanceId:" + executableTask.getJobInstanceSnapshot().getId() + ",total tasks:" + executableTask.getTaskSnapshotList().size());
            return;
        }
        final JobContextImpl jobContext = new JobContextImpl();
        jobContext.setJob(executableTask.getJob());
        jobContext.setJobInstanceSnapshot(executableTask.getJobInstanceSnapshot());
        this.clientContext.getGridTaskSender().getReSendExecutorService().submit(new Runnable() {

            @Override
            public void run() {
                while (true) {
                    Result<Boolean> dispatchResult = TaskSender.this.clientContext.getGridTaskSender().dispatchRetryTaskList(executableTask.getTaskSnapshotList(), jobContext);
                    // A result without data counts as "not dispatched" instead of failing with a
                    // NullPointerException that would silently end the retry job.
                    if (dispatchResult != null
                            && (Boolean.TRUE.equals(dispatchResult.getData())
                                    || dispatchResult.getResultCode() == ResultCode.TASK_SEND_INTERRUPT)) {
                        return;
                    }
                    try {
                        TimeUnit.SECONDS.sleep(1L);
                    } catch (InterruptedException interruptedException) {
                        // Restore the interrupt status and stop retrying so that the executor can
                        // actually cancel this job; looping on would spin on an interrupted thread.
                        Thread.currentThread().interrupt();
                        logger.warn("[TaskSender] retry send tasks interrupted, jobId:" + executableTask.getJob().getId() + " ,jobInstanceId:" + executableTask.getJobInstanceSnapshot().getId());
                        return;
                    }
                }
            }
        });
        logger.warn("[TaskSender] retry send tasks, previous receiveNodeAddress: " + taskEvent.getTargetMachine().getRemoteAddress() + ", jobId:" + executableTask.getJob().getId() + " ,jobInstanceId:" + executableTask.getJobInstanceSnapshot().getId() + " ,total tasks:" + executableTask.getTaskSnapshotList().size());
    }
}

