/*
 * Decompiled with CFR 0.152.
 */
package com.github.ltsopensource.jobtracker.support;

import com.github.ltsopensource.core.commons.utils.CollectionUtils;
import com.github.ltsopensource.core.commons.utils.DotLogUtils;
import com.github.ltsopensource.core.commons.utils.Holder;
import com.github.ltsopensource.core.constant.Constants;
import com.github.ltsopensource.core.exception.RemotingSendException;
import com.github.ltsopensource.core.exception.RequestTimeoutException;
import com.github.ltsopensource.core.factory.NamedThreadFactory;
import com.github.ltsopensource.core.json.JSON;
import com.github.ltsopensource.core.logger.Logger;
import com.github.ltsopensource.core.logger.LoggerFactory;
import com.github.ltsopensource.core.protocol.JobProtos;
import com.github.ltsopensource.core.protocol.command.JobPullRequest;
import com.github.ltsopensource.core.protocol.command.JobPushRequest;
import com.github.ltsopensource.core.protocol.command.JobPushResponse;
import com.github.ltsopensource.core.remoting.RemotingServerDelegate;
import com.github.ltsopensource.core.support.JobDomainConverter;
import com.github.ltsopensource.jobtracker.domain.JobTrackerAppContext;
import com.github.ltsopensource.jobtracker.domain.TaskTrackerNode;
import com.github.ltsopensource.jobtracker.monitor.JobTrackerMStatReporter;
import com.github.ltsopensource.jobtracker.sender.JobPushResult;
import com.github.ltsopensource.jobtracker.sender.JobSender;
import com.github.ltsopensource.queue.domain.JobPo;
import com.github.ltsopensource.remoting.AsyncCallback;
import com.github.ltsopensource.remoting.ResponseFuture;
import com.github.ltsopensource.remoting.protocol.RemotingCommand;
import com.github.ltsopensource.store.jdbc.exception.DupEntryException;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * Pushes executable jobs to a TaskTracker in response to a {@link JobPullRequest}.
 *
 * <p>A pull request is handled asynchronously: the TaskTracker's available thread
 * count is refreshed, split into batches of at most {@code jobPushBatchSize} jobs,
 * and each batch is sent on a separate pusher thread. At most one push round runs
 * per TaskTracker identity at a time; pull requests that arrive while a round is
 * in progress are dropped.
 */
public class JobPusher {
    private static final Logger LOGGER = LoggerFactory.getLogger(JobPusher.class);
    /** Batch size used when "lts.job.tracker.push.batch.size" is not configured. */
    private static final int DEFAULT_JOB_PUSH_BATCH_SIZE = 10;
    /** Maximum time to wait for the TaskTracker to answer a single push request. */
    private static final long PUSH_RESPONSE_TIMEOUT_MILLIS = 60000L;

    private final JobTrackerAppContext appContext;
    /** Runs one task per incoming pull request (see {@link #push(JobPullRequest)}). */
    private final ExecutorService executorService;
    /** Runs the individual batch sends of a push round. */
    private final ExecutorService pushExecutorService;
    private final JobTrackerMStatReporter stat;
    private final RemotingServerDelegate remotingServer;
    private final int jobPushBatchSize;
    /** Per-TaskTracker "push round in progress" flags, keyed by node identity. */
    private final ConcurrentHashMap<String, AtomicBoolean> pushingFlags = new ConcurrentHashMap<String, AtomicBoolean>();

    /**
     * Creates the pusher and its two thread pools from the given application context.
     *
     * @param appContext job tracker context supplying configuration, the stat
     *                   reporter and the remoting server
     */
    public JobPusher(JobTrackerAppContext appContext) {
        this.appContext = appContext;
        this.executorService = Executors.newFixedThreadPool(Constants.AVAILABLE_PROCESSOR * 5, new NamedThreadFactory(JobPusher.class.getSimpleName() + "-Executor", true));
        int processorSize = appContext.getConfig().getParameter("lts.job.tracker.pusher.thread.num", Constants.DEFAULT_JOB_TRACKER_PUSHER_THREAD_NUM);
        this.pushExecutorService = Executors.newFixedThreadPool(processorSize, new NamedThreadFactory(JobPusher.class.getSimpleName() + "-AsyncPusher", true));
        this.stat = (JobTrackerMStatReporter) appContext.getMStatReporter();
        this.remotingServer = appContext.getRemotingServer();
        // A batch size below 1 would make the batch computation in push0 divide by
        // zero or build a latch with a negative count, so clamp it.
        this.jobPushBatchSize = Math.max(1, appContext.getConfig().getParameter("lts.job.tracker.push.batch.size", DEFAULT_JOB_PUSH_BATCH_SIZE));
    }

    /**
     * Schedules an asynchronous push round for the TaskTracker that sent {@code request}.
     * Any exception raised while pushing is logged and not propagated to the caller.
     *
     * @param request the pull request received from a TaskTracker
     */
    public void push(final JobPullRequest request) {
        this.executorService.submit(new Runnable() {

            @Override
            public void run() {
                try {
                    push0(request);
                } catch (Exception e) {
                    LOGGER.error("Job push failed!", e);
                }
            }
        });
    }

    /**
     * Returns the "pushing" flag for the given node, creating it on first use.
     * Concurrent callers for the same identity always receive the same instance.
     */
    private AtomicBoolean getPushingFlag(TaskTrackerNode taskTrackerNode) {
        String identity = taskTrackerNode.getIdentity();
        AtomicBoolean flag = this.pushingFlags.get(identity);
        if (flag != null) {
            return flag;
        }
        AtomicBoolean created = new AtomicBoolean(false);
        AtomicBoolean raced = this.pushingFlags.putIfAbsent(identity, created);
        return raced != null ? raced : created;
    }

    /**
     * Performs one push round: refreshes the node's available thread count, then
     * sends up to that many jobs in batches and waits for all batches to finish.
     *
     * @throws RuntimeException wrapping {@link InterruptedException} if the calling
     *                          thread is interrupted while waiting for the batches
     */
    private void push0(JobPullRequest request) {
        String nodeGroup = request.getNodeGroup();
        String identity = request.getIdentity();
        this.appContext.getTaskTrackerManager().updateTaskTrackerAvailableThreads(nodeGroup, identity, request.getAvailableThreads(), request.getTimestamp());
        final TaskTrackerNode taskTrackerNode = this.appContext.getTaskTrackerManager().getTaskTrackerNode(nodeGroup, identity);
        if (taskTrackerNode == null) {
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("taskTrackerNodeGroup:{}, taskTrackerIdentity:{} , didn't have node.", nodeGroup, identity);
            }
            return;
        }
        int availableThread = taskTrackerNode.getAvailableThread().get();
        if (availableThread <= 0) {
            return;
        }
        AtomicBoolean pushingFlag = this.getPushingFlag(taskTrackerNode);
        if (!pushingFlag.compareAndSet(false, true)) {
            // Another round for this TaskTracker is still running; drop this request.
            return;
        }
        try {
            int batchSize = this.jobPushBatchSize;
            // Ceiling division without risking overflow of (availableThread + batchSize).
            int batchCount = availableThread / batchSize + (availableThread % batchSize == 0 ? 0 : 1);
            final CountDownLatch latch = new CountDownLatch(batchCount);
            int remaining = availableThread;
            for (int i = 0; i < batchCount; ++i) {
                // Every batch is full except possibly the last one.
                final int size = Math.min(batchSize, remaining);
                remaining -= size;
                this.pushExecutorService.execute(new Runnable() {

                    @Override
                    public void run() {
                        try {
                            send(JobPusher.this.remotingServer, size, taskTrackerNode);
                        } catch (Throwable t) {
                            LOGGER.error("Error on Push Job to {}", taskTrackerNode, t);
                        } finally {
                            latch.countDown();
                        }
                    }
                });
            }
            try {
                latch.await();
            } catch (InterruptedException e) {
                // Restore the interrupt status so code further up the stack can see it.
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
            DotLogUtils.dot("taskTrackerNodeGroup:{}, taskTrackerIdentity:{} , pushing finished. batchTimes:{}, size:{}", nodeGroup, identity, batchCount, availableThread);
        } finally {
            // Only the thread that won the compareAndSet above reaches this point.
            pushingFlag.set(false);
        }
    }

    /**
     * Asks the {@link JobSender} to hand up to {@code size} jobs to a push invoker
     * that sends them to the given TaskTracker.
     *
     * @return the value carried by the sender's result, cast to {@link JobPushResult}
     */
    private JobPushResult send(final RemotingServerDelegate remotingServer, int size, final TaskTrackerNode taskTrackerNode) {
        String nodeGroup = taskTrackerNode.getNodeGroup();
        String identity = taskTrackerNode.getIdentity();
        JobSender.SendResult sendResult = this.appContext.getJobSender().send(nodeGroup, identity, size, new JobSender.SendInvoker() {

            @Override
            public JobSender.SendResult invoke(List<JobPo> jobPos) {
                return pushJobs(remotingServer, taskTrackerNode, jobPos);
            }
        });
        return (JobPushResult) sendResult.getReturnValue();
    }

    /**
     * Sends one batch of jobs to the TaskTracker and waits (bounded by
     * {@link #PUSH_RESPONSE_TIMEOUT_MILLIS}) for its response.
     *
     * @return a successful result when the response was handled as accepted;
     *         otherwise a {@code SENT_ERROR} result
     * @throws RequestTimeoutException if interrupted while waiting for the response
     */
    private JobSender.SendResult pushJobs(RemotingServerDelegate remotingServer, TaskTrackerNode taskTrackerNode, final List<JobPo> jobPos) {
        final String nodeGroup = taskTrackerNode.getNodeGroup();
        final String identity = taskTrackerNode.getIdentity();

        JobPushRequest body = this.appContext.getCommandBodyWrapper().wrapper(new JobPushRequest());
        body.setJobMetaList(JobDomainConverter.convert(jobPos));
        RemotingCommand commandRequest = RemotingCommand.createRequestCommand(JobProtos.RequestCode.PUSH_JOB.code(), body);

        final Holder<Boolean> pushSuccess = new Holder<Boolean>(false);
        final CountDownLatch latch = new CountDownLatch(1);
        try {
            remotingServer.invokeAsync(taskTrackerNode.getChannel().getChannel(), commandRequest, new AsyncCallback() {

                @Override
                public void operationComplete(ResponseFuture responseFuture) {
                    try {
                        if (handlePushResponse(responseFuture.getResponseCommand(), jobPos, nodeGroup, identity)) {
                            pushSuccess.set(true);
                        }
                    } finally {
                        latch.countDown();
                    }
                }
            });
        } catch (RemotingSendException e) {
            // Serialize with toJSONString, as the success log does for the same list.
            // NOTE(review): the jobs are not resumed on this path — confirm that
            // JobSender or a background checker recovers them.
            LOGGER.error("Remoting send error, jobPos={}", JSON.toJSONString(jobPos), e);
            return new JobSender.SendResult(false, JobPushResult.SENT_ERROR);
        }

        boolean responded;
        try {
            responded = latch.await(PUSH_RESPONSE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            // Restore the interrupt status before translating the exception.
            Thread.currentThread().interrupt();
            throw new RequestTimeoutException(e);
        }
        if (!responded) {
            // NOTE(review): a response arriving after this point still runs the
            // callback (and its stat update) although the jobs are resumed below.
            LOGGER.warn("Job push timed out after " + PUSH_RESPONSE_TIMEOUT_MILLIS + "ms, nodeGroup=" + nodeGroup + ", identity=" + identity);
        }

        if (pushSuccess.get().booleanValue()) {
            return new JobSender.SendResult(true, JobPushResult.SUCCESS);
        }
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("Job push failed! nodeGroup=" + nodeGroup + ", identity=" + identity + ", jobs=" + JSON.toJSONString(jobPos));
        }
        for (JobPo jobPo : jobPos) {
            this.resumeJob(jobPo);
        }
        return new JobSender.SendResult(false, JobPushResult.SENT_ERROR);
    }

    /**
     * Interprets the TaskTracker's response to a push request and updates the
     * pushed-job statistics accordingly.
     *
     * <p>With {@code JOB_PUSH_SUCCESS} all jobs count as pushed. With
     * {@code NO_AVAILABLE_JOB_RUNNER} the jobs listed as failed in the response are
     * resumed and only the remainder counts as pushed. Any other code, or a missing
     * response, is treated as a failed push.
     *
     * @return {@code true} if the push is to be reported as successful
     */
    private boolean handlePushResponse(RemotingCommand responseCommand, List<JobPo> jobPos, String nodeGroup, String identity) {
        if (responseCommand == null) {
            LOGGER.warn("Job push failed! response command is null!");
            return false;
        }
        int code = responseCommand.getCode();
        if (code == JobProtos.ResponseCode.JOB_PUSH_SUCCESS.code()) {
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("Job push success! nodeGroup=" + nodeGroup + ", identity=" + identity + ", jobList=" + JSON.toJSONString(jobPos));
            }
            this.stat.incPushJobNum(jobPos.size());
            return true;
        }
        if (code != JobProtos.ResponseCode.NO_AVAILABLE_JOB_RUNNER.code()) {
            return false;
        }
        JobPushResponse jobPushResponse = (JobPushResponse) responseCommand.getBody();
        if (jobPushResponse == null || !CollectionUtils.isNotEmpty(jobPushResponse.getFailedJobIds())) {
            this.stat.incPushJobNum(jobPos.size());
            return true;
        }
        for (String failedJobId : jobPushResponse.getFailedJobIds()) {
            for (JobPo jobPo : jobPos) {
                if (failedJobId.equals(jobPo.getJobId())) {
                    this.resumeJob(jobPo);
                    // Only the first job with a matching id is resumed.
                    break;
                }
            }
        }
        this.stat.incPushJobNum(jobPos.size() - jobPushResponse.getFailedJobIds().size());
        return true;
    }

    /**
     * Moves a job that could not be pushed back from the executing queue to the
     * executable queue.
     *
     * <p>The job is first added to the executable queue flagged as running; if that
     * succeeds it is resumed after its entry in the executing queue has been removed.
     * If the executable queue already holds the job, only the executing-queue entry
     * is removed.
     */
    private void resumeJob(JobPo jobPo) {
        boolean needResume = true;
        try {
            jobPo.setIsRunning(true);
            this.appContext.getExecutableJobQueue().add(jobPo);
        } catch (DupEntryException e) {
            LOGGER.warn("ExecutableJobQueue already exist:" + JSON.toJSONString(jobPo));
            needResume = false;
        }
        this.appContext.getExecutingJobQueue().remove(jobPo.getJobId());
        if (needResume) {
            this.appContext.getExecutableJobQueue().resume(jobPo);
        }
    }
}

