/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.dts.client.service;

import com.alibaba.dts.client.executor.grid.queue.receive.TaskReceiveHandler;
import com.alibaba.dts.client.executor.job.context.ClientContextImpl;
import com.alibaba.dts.client.store.access.TaskSnapshotAccess;
import com.alibaba.dts.common.context.InvocationContext;
import com.alibaba.dts.common.domain.ExecutableTask;
import com.alibaba.dts.common.domain.remoting.RemoteMachine;
import com.alibaba.dts.common.domain.result.Result;
import com.alibaba.dts.common.domain.result.ResultCode;
import com.alibaba.dts.common.domain.store.ExecutionCounter;
import com.alibaba.dts.common.domain.store.TaskSnapshot;
import com.alibaba.dts.common.exception.AccessException;
import com.alibaba.dts.common.logger.SchedulerXLoggerFactory;
import com.alibaba.dts.common.logger.innerlog.Logger;
import com.alibaba.dts.common.service.NodeServerService;
import com.alibaba.dts.common.util.NamedThreadFactory;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.springframework.util.CollectionUtils;

public class NodeServerServiceImpl
implements NodeServerService {
    private static final Logger logger = SchedulerXLoggerFactory.getLogger(NodeServerServiceImpl.class);
    private final ConcurrentHashMap<String, AtomicInteger> logTable = new ConcurrentHashMap();
    private ClientContextImpl clientContext;
    private BlockingQueue<ExecutableTask> taskReceiveBuffer;
    TaskReceiveHandler taskReceiveHandler;
    private int bufferSize;
    private ExecutorService executor = Executors.newFixedThreadPool(1);
    private ConcurrentHashMap<Long, ExecutorService> executorServiceMap = new ConcurrentHashMap();
    private TaskSnapshotAccess taskSnapshotDao;
    private final ConcurrentHashMap<String, RemoteMachine> connectToNodes = new ConcurrentHashMap();
    private final ConcurrentHashMap<String, RemoteMachine> connectFromNodes = new ConcurrentHashMap();

    public NodeServerServiceImpl(ClientContextImpl clientContext) {
        this.clientContext = clientContext;
    }

    public void init() {
        this.bufferSize = this.clientContext.nodeConfig.getReceiveBufferSize();
        this.taskSnapshotDao = this.clientContext.getStore().getTaskSnapshotDao();
        this.taskReceiveBuffer = new ArrayBlockingQueue<ExecutableTask>(this.bufferSize);
        this.taskReceiveHandler = new TaskReceiveHandler(this.clientContext, this.executorServiceMap);
        this.taskReceiveHandler.listen(this.taskReceiveBuffer);
    }

    @Override
    public Result<Boolean> connect() {
        return new Result<Boolean>(true, ResultCode.SUCCESS);
    }

    @Override
    public Result<Boolean> receiveTasks(ExecutableTask executableTask) {
        try {
            boolean publishEventResult;
            Long jobInstanceId = executableTask.getJobInstanceSnapshot().getId();
            if (this.clientContext.getGridTaskSender().isInterruptedInstance(jobInstanceId)) {
                logger.warn("[NodeServerService]: receiveTasks force interrupt:,jobId:" + executableTask.getJob().getId() + ",jobInstanceId:" + executableTask.getJobInstanceSnapshot().getId() + ",total tasks:" + executableTask.getTaskSnapshotList().size());
                return new Result<Boolean>(true, ResultCode.FAILURE);
            }
            ExecutorService executorService = this.executorServiceMap.get(jobInstanceId);
            if (executorService == null) {
                ExecutorService executorServiceExist;
                int consumerThreads = this.clientContext.getClientConfig().getConsumerThreads();
                Map<String, Integer> consumerThreadsMap = this.clientContext.getClientConfig().getConsumerThreadsMap();
                if (!CollectionUtils.isEmpty(consumerThreadsMap) && consumerThreadsMap.get(executableTask.getJob().getJobProcessor()) != null) {
                    consumerThreads = this.clientContext.getClientConfig().checkConsumerThreads(consumerThreadsMap.get(executableTask.getJob().getJobProcessor()));
                }
                if ((executorServiceExist = this.executorServiceMap.putIfAbsent(jobInstanceId, executorService = Executors.newFixedThreadPool(consumerThreads, new NamedThreadFactory("SchedulerX-Grid-Task-Processor_" + executableTask.getJob().getId() + "_" + jobInstanceId + "-")))) != null) {
                    executorService.shutdownNow();
                }
            }
            if (publishEventResult = this.taskReceiveBuffer.offer(executableTask)) {
                return new Result<Boolean>(true, ResultCode.SUCCESS);
            }
            return new Result<Boolean>(false, ResultCode.NODE_RECEIVE_QUEUE_NOT_AVAILABLE);
        }
        catch (Exception e) {
            logger.error("Job\u63a5\u6536\u9519\u8bef, {}", (Object)executableTask, (Object)e);
            return new Result<Boolean>(false);
        }
    }

    @Override
    public Result<Boolean> acknowledge(TaskSnapshot taskSnapshot) {
        try {
            String taskName;
            ExecutionCounter executionCounter;
            ConcurrentHashMap<String, ExecutionCounter> executionCounterMapByTaskNameExist;
            String receiveNode;
            ConcurrentHashMap<String, ExecutionCounter> executionCounterMapByTaskName;
            int rst;
            int deletedCount;
            RemoteMachine remoteMachine = InvocationContext.acquireRemoteMachine();
            if (taskSnapshot.getStatus() == 3 || taskSnapshot.getStatus() == 4 && taskSnapshot.getRetryCount() <= 0 ? (deletedCount = this.taskSnapshotDao.delete(taskSnapshot)) <= 0 : (rst = this.taskSnapshotDao.taskSnapshotAck(taskSnapshot)) <= 0) {
                return new Result<Boolean>(true);
            }
            int status = taskSnapshot.getStatus();
            Long jobInstanceId = taskSnapshot.getJobInstanceId();
            ConcurrentHashMap<String, ConcurrentHashMap<String, ExecutionCounter>> executionCounterMapByReceiveNode = this.clientContext.getExecutionCounterTable().get(jobInstanceId);
            if (executionCounterMapByReceiveNode == null) {
                executionCounterMapByReceiveNode = new ConcurrentHashMap();
                ConcurrentHashMap<String, ConcurrentHashMap<String, ExecutionCounter>> executionCounterMapByReceiveNodeExist = this.clientContext.getExecutionCounterTable().putIfAbsent(jobInstanceId, executionCounterMapByReceiveNode);
                if (executionCounterMapByReceiveNodeExist != null) {
                    executionCounterMapByReceiveNode = executionCounterMapByReceiveNodeExist;
                }
            }
            if ((executionCounterMapByTaskName = executionCounterMapByReceiveNode.get(receiveNode = remoteMachine.getRemoteAddress().substring(0, remoteMachine.getRemoteAddress().indexOf(58) + 1) + remoteMachine.getNodeListenPort())) == null && (executionCounterMapByTaskNameExist = executionCounterMapByReceiveNode.putIfAbsent(receiveNode, executionCounterMapByTaskName = new ConcurrentHashMap())) != null) {
                executionCounterMapByTaskName = executionCounterMapByTaskNameExist;
            }
            if ((executionCounter = executionCounterMapByTaskName.get(taskName = taskSnapshot.getTaskName())) == null) {
                executionCounter = new ExecutionCounter();
                executionCounter.setReceiveNode(receiveNode);
                executionCounter.setTaskName(taskName);
                ExecutionCounter executionCounterExist = executionCounterMapByTaskName.putIfAbsent(taskName, executionCounter);
                if (executionCounterExist != null) {
                    executionCounter = executionCounterExist;
                }
            }
            if (status == 3) {
                executionCounter.getSuccessCounter().getAndIncrement();
                executionCounter.getQueuedCounter().decrementAndGet();
            } else {
                executionCounter.getFailCounter().getAndIncrement();
                executionCounter.getQueuedCounter().decrementAndGet();
            }
            return new Result<Boolean>(true);
        }
        catch (Throwable t) {
            logger.error("Task snapshot ack failed, {}", (Object)taskSnapshot, (Object)t);
            return new Result<Boolean>(false);
        }
    }

    public boolean stopTask(long jobId, long jobInstanceId) {
        try {
            this.clientContext.getGridTaskSender().addInterruptedJobInstance(jobInstanceId);
            ExecutorService executorService = this.executorServiceMap.get(jobInstanceId);
            if (executorService != null) {
                this.executorServiceMap.remove(jobInstanceId);
                executorService.shutdown();
            }
            this.clientContext.getGridTaskSender().clearInsertBuffer(jobInstanceId);
            this.clientContext.getExecutor().doGridJobCleanTask(jobInstanceId);
            this.fixDispatchedTasksStatus(jobInstanceId);
            return true;
        }
        catch (Throwable throwable) {
            logger.error("failed to stop job, id = {}, jonInstanceId = {}", jobId, jobInstanceId, throwable);
            return false;
        }
    }

    private void fixDispatchedTasksStatus(final long jobInstanceId) throws AccessException {
        new Thread(new Runnable(){

            @Override
            public void run() {
                boolean stopFlag = false;
                while (!stopFlag) {
                    try {
                        TimeUnit.SECONDS.sleep(10L);
                        long count = NodeServerServiceImpl.this.clientContext.getStore().getTaskSnapshotDao().deleteByJobInstanceId(jobInstanceId);
                        while (count > 0L) {
                            count = NodeServerServiceImpl.this.clientContext.getStore().getTaskSnapshotDao().deleteByJobInstanceId(jobInstanceId);
                        }
                        stopFlag = true;
                    }
                    catch (Throwable e) {
                        logger.error("deleteByJobInstanceId error", e);
                        stopFlag = false;
                    }
                }
            }
        }).start();
    }

    public ConcurrentHashMap<Long, ExecutorService> getExecutorServiceMap() {
        return this.executorServiceMap;
    }
}

