package com.tydic.nicc.mq.starter.redismq;

import com.tydic.nicc.mq.starter.AbstractKKMqConsumer;
import com.tydic.nicc.mq.starter.KKMsgBuilder;
import com.tydic.nicc.mq.starter.autoconfigure.KKMqProperties;
import com.tydic.nicc.mq.starter.entity.KKMqMsg;
import com.tydic.nicc.mq.starter.exception.KKMqClientException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;

/* loaded from: input_file:com/tydic/nicc/mq/starter/redismq/RedisMqConsumerContainer.class */
/**
 * KKMQ consumer backed by Redis publish/subscribe.
 *
 * <p>The consumer registers itself (through a {@link MessageListenerAdapter}) on a
 * {@link RedisMessageListenerContainer} for the pattern returned by {@code getTopic()},
 * converts every incoming Redis {@link Message} to a {@link KKMqMsg} and hands it to
 * {@code handleMessage}.
 */
public class RedisMqConsumerContainer extends AbstractKKMqConsumer implements MessageListener {
    private static final Logger log = LoggerFactory.getLogger(RedisMqConsumerContainer.class);

    private final RedisMessageListenerContainer container;

    /**
     * Adapter registered on {@link #container} by {@link #initConsumer()}; kept so that
     * {@link #destroy()} can unregister exactly that listener. {@code null} while nothing
     * is registered.
     */
    private MessageListenerAdapter listenerAdapter;

    /**
     * Creates a consumer that will subscribe through the given listener container.
     *
     * @param redisMessageListenerContainer container the listener is registered on
     * @param kKMqProperties                starter properties, passed to the superclass
     */
    public RedisMqConsumerContainer(RedisMessageListenerContainer redisMessageListenerContainer, KKMqProperties kKMqProperties) {
        super(kKMqProperties);
        this.container = redisMessageListenerContainer;
    }

    /**
     * Marks this consumer as running.
     *
     * @throws IllegalStateException if the consumer is already running, or if the
     *                               running flag cannot be set
     */
    @Override
    public void start() {
        if (isRunning()) {
            throw new IllegalStateException("container already running. " + this);
        }
        try {
            setRunning(true);
        } catch (Exception e) {
            // This is the Redis consumer; the message must not claim a RocketMQ failure.
            throw new IllegalStateException("Failed to start Redis MQ consumer", e);
        }
    }

    /**
     * Stops this consumer: clears the running flag and unregisters the listener that
     * {@link #initConsumer()} added, so the shared container no longer delivers
     * messages to a destroyed consumer. Safe to call when nothing was registered.
     */
    @Override
    public void destroy() {
        setRunning(false);
        MessageListenerAdapter registered = this.listenerAdapter;
        if (registered == null) {
            return;
        }
        this.listenerAdapter = null;
        try {
            this.container.removeMessageListener(registered);
        } catch (RuntimeException e) {
            // Best-effort cleanup: a failure to unsubscribe must not abort shutdown.
            log.warn("KKMQ consumer failed to remove Redis listener for topic: {}", getTopic(), e);
        }
    }

    /**
     * Resolves the message type and then subscribes this consumer to the pattern
     * returned by {@code getTopic()}.
     *
     * @throws KKMqClientException declared by the overridden method
     */
    @Override
    public void initConsumer() throws KKMqClientException {
        log.debug("KKMQ消费者-初始化消费者:{}", getTopic());
        // Resolve the message type before subscribing: once the listener is registered a
        // message may be delivered, and a parse failure must not leave a listener behind.
        parseMessageTypeAndParameter();
        MessageListenerAdapter adapter = new MessageListenerAdapter(this, "onMessage");
        this.container.addMessageListener(adapter, new PatternTopic(getTopic()));
        this.listenerAdapter = adapter;
    }

    /**
     * Handles one Redis pub/sub message: converts it to a {@link KKMqMsg}, passes it to
     * {@code handleMessage} and, at trace level, logs the elapsed handling time.
     *
     * @param message the received Redis message
     * @param bArr    the pattern bytes supplied by the listener container
     */
    @Override
    public void onMessage(Message message, byte[] bArr) {
        if (log.isTraceEnabled()) {
            log.trace("KKMQ消费者-收到消息:{}", message);
        }
        KKMqMsg mqMsg = KKMsgBuilder.convertRedisMsg(message, bArr);
        long startMillis = System.currentTimeMillis();
        handleMessage(mqMsg);
        long elapsedMillis = System.currentTimeMillis() - startMillis;
        if (log.isTraceEnabled()) {
            log.trace("KKMQ消费者-消息处理耗时:{}", elapsedMillis);
        }
    }
}
