package c.a.b.batch.service;

import com.alibaba.fastjson.JSONObject;
import com.ohaotian.plugin.cache.CacheClient;
import com.ohaotian.plugin.mq.proxy.DefaultProxyMessageConfig;
import com.ohaotian.plugin.mq.proxy.ProxyMessage;
import com.ohaotian.plugin.mq.proxy.ProxyMessageConsumer;
import com.ohaotian.plugin.mq.proxy.ProxyMessageProducer;
import com.ohaotian.plugin.mq.proxy.status.ProxyConsumerStatus;
import java.util.ArrayList;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;

/* loaded from: input_file:c/a/b/batch/service/AbstractConsumerServ.class */
public abstract class AbstractConsumerServ extends DefaultProxyMessageConfig implements ProxyMessageConsumer, ApplicationContextAware {
    private static final Logger log = LoggerFactory.getLogger(AbstractConsumerServ.class);
    ProxyMessageProducer proxyMessageProducer;

    @Autowired
    CacheClient cacheClient;

    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.proxyMessageProducer = (ProxyMessageProducer) applicationContext.getBeansOfType(ProxyMessageProducer.class).get("producer");
    }

    public void distributeData(MqMessageBO mqMessageBO) {
        log.info("获取数据开始...");
        getDataList(mqMessageBO);
        if (null != mqMessageBO.getBusiList() && mqMessageBO.getBusiList().size() > 0) {
            log.info("获取到业务数据");
            int dealNumByOne = mqMessageBO.getDealNumByOne();
            int size = mqMessageBO.getBusiList().size();
            log.info("总量为：" + size);
            log.info("单次处理：" + dealNumByOne);
            mqMessageBO.setAmount(Integer.valueOf(size));
            mqMessageBO.setStatus(MqConstantBO.ZSK_TASK_BATCH_STATUS_2);
            mqMessageBO.setCurrentSuccess(0);
            mqMessageBO.setCurrentFailed(0);
            int i = 0;
            int i2 = 0;
            while (true) {
                int i3 = i2;
                if (i3 >= size) {
                    break;
                }
                MqMessageBO mqMessageBO2 = new MqMessageBO();
                BeanUtils.copyProperties(mqMessageBO, mqMessageBO2, new String[]{"busiList"});
                ArrayList arrayList = new ArrayList();
                for (int i4 = 0; i4 < dealNumByOne; i4++) {
                    arrayList.add(mqMessageBO.getBusiList().get(i));
                    i++;
                    if (i == size) {
                        break;
                    }
                }
                mqMessageBO2.setCurrentAmount(Integer.valueOf(arrayList.size()));
                mqMessageBO2.setBusiList(arrayList);
                log.info("本次处理条数：" + arrayList.size());
                sendMsg(mqMessageBO2, MqConstantBO.BATCH_PROCESSING);
                i2 = i3 + dealNumByOne;
            }
        } else {
            log.info("获取业务数据为空，直接发送回执消息...");
            sendMsg(mqMessageBO, MqConstantBO.RECEIPT_TAG);
        }
        log.info("获取数据结束...");
    }

    public void sendMsg(MqMessageBO mqMessageBO, String str) {
        String jSONString = JSONObject.toJSONString(mqMessageBO);
        log.info("发送开始...");
        log.debug("消息内容：" + jSONString);
        this.proxyMessageProducer.send(new ProxyMessage(mqMessageBO.getTopic(), str, jSONString));
        log.info("发送结束...");
    }

    public abstract void getDataList(MqMessageBO mqMessageBO);

    public void invokeTaskByBatch(MqMessageBO mqMessageBO) {
        invokeBatch(mqMessageBO);
        mqMessageBO.getCurrentSuccess().intValue();
        mqMessageBO.getCurrentFailed().intValue();
        int intValue = mqMessageBO.getCurrentAmount().intValue();
        if (this.cacheClient.incrExpireTimeBy(MqConstantBO.COUNT_DOING + mqMessageBO.getBatchNo(), intValue, MqConstantBO.REDIS_TIMEOUT).longValue() == mqMessageBO.getAmount().intValue()) {
            sendMsg(mqMessageBO, MqConstantBO.RECEIPT_TAG);
        }
    }

    public abstract void invokeBatch(MqMessageBO mqMessageBO);

    public void receiptTask(MqMessageBO mqMessageBO) {
        log.info("回执方法处理开始...");
        log.info("回执方法处理结束...");
    }

    public ProxyConsumerStatus onMessage(ProxyMessage proxyMessage) {
        try {
            String tag = proxyMessage.getTag();
            MqMessageBO mqMessageBO = (MqMessageBO) JSONObject.parseObject(proxyMessage.getContent(), MqMessageBO.class);
            boolean z = -1;
            switch (tag.hashCode()) {
                case -898974458:
                    if (tag.equals(MqConstantBO.BATCH_PROCESSING)) {
                        z = true;
                        break;
                    }
                    break;
                case -739043975:
                    if (tag.equals(MqConstantBO.INIT_TAG)) {
                        z = false;
                        break;
                    }
                    break;
                case 1954086191:
                    if (tag.equals(MqConstantBO.RECEIPT_TAG)) {
                        z = 2;
                        break;
                    }
                    break;
            }
            switch (z) {
                case false:
                    distributeData(mqMessageBO);
                    break;
                case true:
                    invokeTaskByBatch(mqMessageBO);
                    break;
                case true:
                    receiptTask(mqMessageBO);
                    break;
            }
        } catch (Exception e) {
            e.printStackTrace();
            log.error("批次消息处理失败:" + e);
            try {
                execError(null);
            } catch (Exception e2) {
                e2.printStackTrace();
                log.error("异常处理失败:" + e2);
            }
        }
        return ProxyConsumerStatus.CONSUME_SUCCESS;
    }

    public void execError(MqMessageBO mqMessageBO) {
        log.info("错误处理...");
    }
}
