/*
 * Decompiled with CFR 0.152.
 */
package com.alipay.sofa.sofamq.spring;

import com.alipay.sofa.sofamq.api.MessageConsumer;
import com.alipay.sofa.sofamq.com.shade.google.common.base.Objects;
import com.alipay.sofa.sofamq.org.shade.apache.commons.lang3.StringUtils;
import com.alipay.sofa.sofamq.spring.AbstractComponent;
import com.alipay.sofa.sofamq.spring.AsyncMessageProcessor;
import com.alipay.sofa.sofamq.spring.MessageProcessor;
import io.openmessaging.api.Action;
import io.openmessaging.api.Admin;
import io.openmessaging.api.AsyncConsumeContext;
import io.openmessaging.api.AsyncGenericMessageListener;
import io.openmessaging.api.Consumer;
import io.openmessaging.api.GenericMessage;
import io.openmessaging.api.GenericMessageListener;
import io.openmessaging.api.MessageConsumeContext;
import io.openmessaging.api.MessageSelector;
import io.openmessaging.api.OMS;
import io.openmessaging.api.OMSResponseStatus;
import io.openmessaging.api.exception.OMSUnsupportException;
import io.openmessaging.api.order.GenericMessageOrderListener;
import io.openmessaging.api.order.OrderAction;
import io.openmessaging.api.order.OrderConsumer;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Properties;
import java.util.Set;

public class ConsumerManager
extends AbstractComponent {
    private Set<String> subscriptions = new HashSet<String>();
    private Map<String, MessageConsumer> baseConfigs = new HashMap<String, MessageConsumer>();
    private Map<String, Admin> consumers = new HashMap<String, Admin>();
    private String endpoint;
    private String schemaRegistryUrl;

    public ConsumerManager(String endpoint, String schemaRegistryUrl) {
        this.endpoint = endpoint;
        this.schemaRegistryUrl = schemaRegistryUrl;
    }

    public synchronized void registerMessageProcessor(MessageConsumer messageConsumer, final Class<?> payload, final MessageProcessor messageProcessor) {
        this.checkSubscription(messageConsumer);
        Admin consumer = this.getMessageConsumer(messageConsumer);
        MessageSelector selector = this.processMessageSelector(messageConsumer);
        if (StringUtils.isBlank(messageConsumer.topic())) {
            throw new IllegalArgumentException(String.format("MessageConsumer[%s]'s topic is empty", messageConsumer));
        }
        if (!messageConsumer.orderly()) {
            ((Consumer)consumer).subscribe(messageConsumer.topic(), selector, new GenericMessageListener(){

                @Override
                public Class payloadClass() {
                    return payload;
                }

                public Action consume(GenericMessage message, MessageConsumeContext context) {
                    return messageProcessor.process(message, context) ? Action.CommitMessage : Action.ReconsumeLater;
                }
            });
        } else {
            ((OrderConsumer)consumer).subscribe(messageConsumer.topic(), selector, new GenericMessageOrderListener(){

                @Override
                public Class payloadClass() {
                    return payload;
                }

                public OrderAction consume(GenericMessage message, MessageConsumeContext context) {
                    return messageProcessor.process(message, context) ? OrderAction.Success : OrderAction.Suspend;
                }
            });
        }
    }

    public synchronized void registerAsyncMessageProcessor(MessageConsumer messageConsumer, final Class<?> payload, final AsyncMessageProcessor<?> messageProcessor) {
        if (messageConsumer.orderly()) {
            throw new OMSUnsupportException(OMSResponseStatus.STATUS_1101.getStatusCode(), "Unsupported async type of message listener for order consumer");
        }
        this.checkSubscription(messageConsumer);
        Admin consumer = this.getMessageConsumer(messageConsumer);
        MessageSelector selector = this.processMessageSelector(messageConsumer);
        ((Consumer)consumer).subscribe(messageConsumer.topic(), selector, new AsyncGenericMessageListener(){

            @Override
            public Class payloadClass() {
                return payload;
            }

            public void consume(GenericMessage message, AsyncConsumeContext context) {
                messageProcessor.process(message, context);
            }
        });
    }

    private void checkSubscription(MessageConsumer messageConsumer) {
        String group = messageConsumer.group();
        String subscription = group + ":" + messageConsumer.topic();
        if (this.subscriptions.contains(subscription)) {
            throw new IllegalArgumentException(String.format("Consumer[%s] subscribe topic[%s] multiple times", group, messageConsumer.topic()));
        }
        this.subscriptions.add(subscription);
    }

    public Admin getMessageConsumer(MessageConsumer messageConsumer) {
        String group = messageConsumer.group();
        if (this.baseConfigs.containsKey(messageConsumer.group())) {
            MessageConsumer base = this.baseConfigs.get(messageConsumer.group());
            if (!Objects.equal(base.messageModel(), messageConsumer.messageModel())) {
                throw new IllegalArgumentException(String.format("Consumer[%s] set with different message model", group));
            }
            if (base.orderly() != messageConsumer.orderly()) {
                throw new IllegalArgumentException(String.format("Consumer[%s] can only be non-orderly or orderly", group));
            }
        }
        this.baseConfigs.put(group, messageConsumer);
        Admin consumer = this.consumers.get(group);
        if (consumer == null) {
            Properties properties = new Properties();
            String messageModel = messageConsumer.messageModel();
            properties.setProperty("groupId", messageConsumer.group());
            properties.setProperty("messageModel", messageConsumer.messageModel());
            properties.setProperty("ldcSubMode", messageConsumer.ldcSubMode());
            properties.setProperty("consumeThreadNums", Integer.toString(messageConsumer.consumeThreadNumber()));
            properties.setProperty("suspendTimeMills", Long.toString(messageConsumer.suspendTimeMills()));
            properties.setProperty("maxRetryTimes", Integer.toString(messageConsumer.maxRetryTimes()));
            properties.setProperty("consumeTimeout", Integer.toString(messageConsumer.consumeTimeoutMinutes()));
            properties.setProperty("maxCachedMessageAmount", Integer.toString(messageConsumer.maxCachedMessageAmount()));
            properties.setProperty("maxCachedMessageSizeInMiB", Integer.toString(messageConsumer.maxCachedMessageSizeInMiB()));
            properties.setProperty("enableOrderlyConsumeAccelerator", Boolean.toString(messageConsumer.enableOrderlyConsumeAccelerator()));
            properties.setProperty("elastic", Boolean.toString(messageConsumer.elastic()));
            if (StringUtils.equals(messageModel, "SITE_BROADCASTING")) {
                properties.setProperty("siteMessageEnv", messageConsumer.siteMessageEnv());
            }
            consumer = this.newConsumer(properties, messageConsumer.orderly());
            this.consumers.put(group, consumer);
        }
        return consumer;
    }

    final MessageSelector processMessageSelector(MessageConsumer messageConsumer) {
        MessageSelector selector = "TAG".equals(messageConsumer.filterType()) ? MessageSelector.byTag(messageConsumer.filter()) : MessageSelector.bySql(messageConsumer.filter());
        return selector;
    }

    protected Admin newConsumer(Properties properties, boolean orderly) {
        if (orderly) {
            return OMS.builder().driver("sofamq").endpoint(this.endpoint).schemaRegistryUrl(this.schemaRegistryUrl).build().createOrderedConsumer(properties);
        }
        return OMS.builder().driver("sofamq").endpoint(this.endpoint).schemaRegistryUrl(this.schemaRegistryUrl).build().createConsumer(properties);
    }

    @Override
    protected void start0() {
        for (Admin consumer : this.consumers.values()) {
            consumer.start();
        }
    }

    @Override
    protected void stop0() {
        for (Admin consumer : this.consumers.values()) {
            consumer.shutdown();
        }
    }
}

