package com.tydic.nicc.mq.starter.autoconfigure;

import com.tydic.nicc.mq.starter.AbstractKKMqConsumer;
import com.tydic.nicc.mq.starter.KKMQMessageConverter;
import com.tydic.nicc.mq.starter.alimq.AliMqConsumerContainer;
import com.tydic.nicc.mq.starter.annotation.KKMqConsumer;
import com.tydic.nicc.mq.starter.api.KKMqConsumerListener;
import com.tydic.nicc.mq.starter.entity.eum.ConsumeMode;
import com.tydic.nicc.mq.starter.entity.eum.KKMqType;
import com.tydic.nicc.mq.starter.entity.eum.MessageModel;
import com.tydic.nicc.mq.starter.exception.KKMqClientException;
import com.tydic.nicc.mq.starter.exception.KKMqException;
import com.tydic.nicc.mq.starter.rocketmq.RocketMqConsumerContainer;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.aop.framework.AopProxyUtils;
import org.springframework.aop.scope.ScopedProxyUtils;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.SmartInitializingSingleton;
import org.springframework.beans.factory.config.BeanDefinitionCustomizer;
import org.springframework.beans.factory.support.BeanDefinitionValidationException;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.support.GenericApplicationContext;
import org.springframework.core.env.StandardEnvironment;
import org.springframework.util.StringUtils;

@EnableConfigurationProperties({KKMqProperties.class})
@Configuration
/* loaded from: input_file:com/tydic/nicc/mq/starter/autoconfigure/KKMqConsumerConfiguration.class */
public class KKMqConsumerConfiguration implements ApplicationContextAware, SmartInitializingSingleton {
    private static final Logger log = LoggerFactory.getLogger(KKMqConsumerConfiguration.class);
    private ConfigurableApplicationContext applicationContext;
    private AtomicLong rocketmqCounter = new AtomicLong(0);
    private AtomicLong alimqCounter = new AtomicLong(0);
    private StandardEnvironment environment;
    private KKMqProperties mqProperties;
    private KKMQMessageConverter kkmqMessageConverter;

    public KKMqConsumerConfiguration(KKMQMessageConverter kKMQMessageConverter, StandardEnvironment standardEnvironment, KKMqProperties kKMqProperties) {
        this.kkmqMessageConverter = kKMQMessageConverter;
        this.environment = standardEnvironment;
        this.mqProperties = kKMqProperties;
    }

    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.applicationContext = (ConfigurableApplicationContext) applicationContext;
    }

    public void afterSingletonsInstantiated() {
        validateProperties(this.mqProperties);
        ((Map) this.applicationContext.getBeansWithAnnotation(KKMqConsumer.class).entrySet().stream().filter(entry -> {
            return !ScopedProxyUtils.isScopedTarget((String) entry.getKey());
        }).collect(Collectors.toMap((v0) -> {
            return v0.getKey();
        }, (v0) -> {
            return v0.getValue();
        }))).forEach(this::registerContainer);
    }

    private void registerContainer(String str, Object obj) {
        KKMqConsumer kKMqConsumer = (KKMqConsumer) AopProxyUtils.ultimateTargetClass(obj).getAnnotation(KKMqConsumer.class);
        validate(kKMqConsumer);
        AbstractKKMqConsumer registerConsumerBean = registerConsumerBean(str, obj, kKMqConsumer, (GenericApplicationContext) this.applicationContext);
        if (registerConsumerBean.isRunning()) {
            return;
        }
        try {
            registerConsumerBean.start();
            log.info("KKMQ注册消费者:{}|{}", registerConsumerBean, obj.getClass().getName());
        } catch (Exception e) {
            log.error("KKMQ注册消费者-消费者创建异常:{}", registerConsumerBean, e);
            throw new KKMqException(e.getMessage());
        }
    }

    private AbstractKKMqConsumer registerConsumerBean(String str, Object obj, KKMqConsumer kKMqConsumer, GenericApplicationContext genericApplicationContext) {
        AbstractKKMqConsumer abstractKKMqConsumer;
        String format = String.format("%s_%s", AbstractKKMqConsumer.class.getName(), Long.valueOf(this.alimqCounter.incrementAndGet()));
        genericApplicationContext.registerBean(format, AbstractKKMqConsumer.class, () -> {
            return createConsumer(str, obj, kKMqConsumer);
        }, new BeanDefinitionCustomizer[0]);
        KKMqType valueOf = KKMqType.valueOf(this.mqProperties.getMqType());
        if (KKMqType.alirmq.equals(valueOf)) {
            abstractKKMqConsumer = (AbstractKKMqConsumer) genericApplicationContext.getBean(format, AliMqConsumerContainer.class);
        } else {
            if (!KKMqType.rocketmq.equals(valueOf)) {
                throw new KKMqException("KKMQ注册消费者-MQ类型配置错误:" + this.mqProperties.getMqType());
            }
            abstractKKMqConsumer = (AbstractKKMqConsumer) genericApplicationContext.getBean(format, RocketMqConsumerContainer.class);
        }
        return abstractKKMqConsumer;
    }

    private AbstractKKMqConsumer createConsumer(String str, Object obj, KKMqConsumer kKMqConsumer) {
        AbstractKKMqConsumer rocketMqConsumerContainer;
        KKMqType valueOf = KKMqType.valueOf(this.mqProperties.getMqType());
        if (!StringUtils.isEmpty(kKMqConsumer.mqType())) {
            valueOf = KKMqType.valueOf(kKMqConsumer.mqType());
        }
        if (KKMqType.alirmq.equals(valueOf)) {
            rocketMqConsumerContainer = new AliMqConsumerContainer(this.mqProperties);
        } else {
            if (!KKMqType.rocketmq.equals(valueOf)) {
                throw new KKMqException("KKMQ注册消费者-MQ类型配置错误:" + this.mqProperties.getMqType());
            }
            rocketMqConsumerContainer = new RocketMqConsumerContainer(this.mqProperties);
        }
        rocketMqConsumerContainer.setKKmqConsumerAnno(kKMqConsumer);
        rocketMqConsumerContainer.setName(str);
        rocketMqConsumerContainer.setConsumerListener((KKMqConsumerListener) obj);
        rocketMqConsumerContainer.setMessageConverter(this.kkmqMessageConverter.getMessageConverter());
        rocketMqConsumerContainer.initConsumer();
        return rocketMqConsumerContainer;
    }

    private void validateProperties(KKMqProperties kKMqProperties) {
        if (StringUtils.isEmpty(kKMqProperties.getNameServer())) {
            throw new RuntimeException("KKMQ配置校验异常:请配置[nameServer]");
        }
        if (StringUtils.isEmpty(kKMqProperties.getMqType())) {
            throw new RuntimeException("KKMQ配置校验异常:请配置类型[type]");
        }
        if (kKMqProperties.getProducer() == null) {
            throw new RuntimeException("KKMQ配置校验异常:请配置生产者[producer]");
        }
    }

    private void validate(KKMqConsumer kKMqConsumer) {
        if (kKMqConsumer.consumeMode() == ConsumeMode.ORDERLY && kKMqConsumer.messageModel() == MessageModel.BROADCASTING) {
            throw new BeanDefinitionValidationException("Bad annotation definition in @KKMqConsumer, messageModel BROADCASTING does not support ORDERLY message!");
        }
        if (!StringUtils.isEmpty(kKMqConsumer.mqType()) && !KKMqType.rocketmq.name().equals(kKMqConsumer.mqType()) && !KKMqType.alirmq.name().equals(kKMqConsumer.mqType())) {
            throw new KKMqClientException("KKMQ配置校验异常:@KKMqConsumer 注解 mqType 错误!");
        }
    }
}
