/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.dts.client.executor.grid.queue.send;

import com.alibaba.dts.client.executor.grid.queue.TaskEvent;
import com.alibaba.dts.client.executor.grid.queue.send.MergingTaskGroup;
import com.alibaba.dts.client.executor.grid.queue.send.TaskMergeMonitor;
import com.alibaba.dts.client.executor.grid.queue.send.TaskMerger;
import com.alibaba.dts.client.executor.grid.queue.send.TaskRouter;
import com.alibaba.dts.client.executor.grid.queue.send.TaskSender;
import com.alibaba.dts.client.executor.job.context.ClientContextImpl;
import com.alibaba.dts.common.domain.remoting.RemoteMachine;
import com.alibaba.dts.common.exception.InitException;
import com.alibaba.dts.common.logger.SchedulerXLoggerFactory;
import com.alibaba.dts.common.logger.innerlog.Logger;
import com.alibaba.dts.common.util.NamedThreadFactory;
import java.util.Date;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class SendManager {
    private static final Logger logger = SchedulerXLoggerFactory.getLogger(SendManager.class);
    private AtomicBoolean readyForSend = new AtomicBoolean(false);
    private ClientContextImpl clientContext;
    private int routeQueueSize = 0x800000;
    private int mergeQueueSize = 0x800000;
    private int sendQueueSize = 0x800000;
    private int routeThreadCount = 4;
    private int mergeThreadCount = 1;
    private int sendThreadCount = 16;
    private BlockingQueue<TaskEvent> routeQueue;
    private BlockingQueue<TaskEvent> mergeQueue;
    private BlockingQueue<TaskEvent> sendQueue;
    private ConcurrentHashMap<String, ConcurrentHashMap<Long, MergingTaskGroup>> mergingTaskGroupMapByTargetMachine = new ConcurrentHashMap();
    private ConcurrentHashMap<String, ConcurrentHashMap<Long, MergingTaskGroup>> mergingTaskGroupMapByTargetMachineCompensation = new ConcurrentHashMap();
    private ConcurrentHashMap<Long, Object> interruptedJobInstanceMap = new ConcurrentHashMap();
    private ConcurrentHashMap<Long, List<RemoteMachine>> machinesByJob = new ConcurrentHashMap();

    public void init(ClientContextImpl clientContext) throws InitException {
        try {
            this.clientContext = clientContext;
            this.routeQueue = new ArrayBlockingQueue<TaskEvent>(this.routeQueueSize);
            this.mergeQueue = new ArrayBlockingQueue<TaskEvent>(this.mergeQueueSize);
            this.sendQueue = new ArrayBlockingQueue<TaskEvent>(this.sendQueueSize);
            ExecutorService routeThreadPool = Executors.newFixedThreadPool(this.routeThreadCount, new NamedThreadFactory("SchedulerX-Task-Route-"));
            for (int i = 0; i < this.routeThreadCount; ++i) {
                TaskRouter taskRouter = new TaskRouter(clientContext, this);
                routeThreadPool.submit(taskRouter);
            }
            ExecutorService mergeThreadPool = Executors.newFixedThreadPool(this.mergeThreadCount, new NamedThreadFactory("SchedulerX-Task-Merge-"));
            for (int i = 0; i < this.mergeThreadCount; ++i) {
                mergeThreadPool.submit(new TaskMerger(this));
            }
            ExecutorService sendThreadPool = Executors.newFixedThreadPool(this.sendThreadCount, new NamedThreadFactory("SchedulerX-Task-Send-"));
            for (int i = 0; i < this.sendThreadCount; ++i) {
                sendThreadPool.submit(new TaskSender(clientContext, this));
            }
            Executors.newScheduledThreadPool(1, new NamedThreadFactory("SchedulerX-TaskMergeMonitor -")).scheduleAtFixedRate(new TaskMergeMonitor(this), 0L, 2L, TimeUnit.SECONDS);
        }
        catch (Throwable throwable) {
            throw new InitException("failed to init SendManager", throwable);
        }
    }

    public void putTasksToRouteQueue(List<TaskEvent> taskEvents, long jobInstanceId) {
        if (this.isInterruptedJobInstance(jobInstanceId)) {
            return;
        }
        for (TaskEvent taskEvent : taskEvents) {
            this.putSingleTaskToRouteQueue(taskEvent);
        }
    }

    public void putSingleTaskToRouteQueue(TaskEvent taskEvent) {
        try {
            this.routeQueue.put(taskEvent);
        }
        catch (InterruptedException interruptedException) {
            // empty catch block
        }
    }

    public void resetRoutesMachines(long jobId, List<RemoteMachine> machines) {
        this.machinesByJob.put(jobId, machines);
    }

    public boolean isInterruptedJobInstance(long jobInstanceId) {
        return this.interruptedJobInstanceMap.containsKey(jobInstanceId);
    }

    public void addInterruptedInstance(long instanceId) {
        try {
            this.interruptedJobInstanceMap.put(instanceId, new Date());
            logger.info("[SendManager]: addInterruptedInstance instanceId:" + instanceId);
        }
        catch (Throwable e) {
            logger.error("[SendManager]: addInterruptedInstance error,instanceId:" + instanceId, e);
        }
    }

    public void removeInterruptedJobInstance(long instanceId) {
        try {
            this.interruptedJobInstanceMap.remove(instanceId);
            logger.info("[SendManager]: removeInterceptInstance instanceId:" + instanceId);
        }
        catch (Throwable e) {
            logger.error("[SendManager]: removeInterceptInstance error,instanceId:" + instanceId, e);
        }
    }

    public void clearMergingTaskGroupMap(long jobInstanceId) {
        try {
            for (ConcurrentHashMap<Long, MergingTaskGroup> remoteMachineMapByJobInstanceId : this.mergingTaskGroupMapByTargetMachine.values()) {
                remoteMachineMapByJobInstanceId.remove(jobInstanceId);
            }
            for (ConcurrentHashMap<Long, MergingTaskGroup> remoteMachineMapByJobInstanceId : this.mergingTaskGroupMapByTargetMachineCompensation.values()) {
                remoteMachineMapByJobInstanceId.remove(jobInstanceId);
            }
        }
        catch (Throwable throwable) {
            logger.error("faild to clearMergingTaskGroupMap, jobInstanceId=" + jobInstanceId, throwable);
        }
    }

    public BlockingQueue<TaskEvent> getRouteQueue() {
        return this.routeQueue;
    }

    public BlockingQueue<TaskEvent> getMergeQueue() {
        return this.mergeQueue;
    }

    public BlockingQueue<TaskEvent> getSendQueue() {
        return this.sendQueue;
    }

    public ConcurrentHashMap<Long, List<RemoteMachine>> getMachinesByJob() {
        return this.machinesByJob;
    }

    public ConcurrentHashMap<Long, Object> getInterruptedJobInstanceMap() {
        return this.interruptedJobInstanceMap;
    }

    public ConcurrentHashMap<String, ConcurrentHashMap<Long, MergingTaskGroup>> getMergingTaskGroupMapByTargetMachine() {
        return this.mergingTaskGroupMapByTargetMachine;
    }

    public ConcurrentHashMap<String, ConcurrentHashMap<Long, MergingTaskGroup>> getMergingTaskGroupMapByTargetMachineCompensation() {
        return this.mergingTaskGroupMapByTargetMachineCompensation;
    }

    public AtomicBoolean getReadyForSend() {
        return this.readyForSend;
    }

    public ClientContextImpl getClientContext() {
        return this.clientContext;
    }

    public void setRouteQueueSize(int routeQueueSize) {
        this.routeQueueSize = routeQueueSize;
    }

    public void setMergeQueueSize(int mergeQueueSize) {
        this.mergeQueueSize = mergeQueueSize;
    }

    public void setSendQueueSize(int sendQueueSize) {
        this.sendQueueSize = sendQueueSize;
    }
}

