/*
 * Decompiled with CFR 0.152.
 */
package com.alipay.sofa.sofamq.client;

import com.alipay.sofa.sofamq.client.ConsumerImpl;
import com.alipay.sofa.sofamq.client.LdcSubMode;
import com.alipay.sofa.sofamq.client.MQUtil;
import com.alipay.sofa.sofamq.org.shade.apache.commons.lang3.StringUtils;
import com.alipay.sofa.sofamq.org.shade.apache.rocketmq.common.utils.ThreadUtils;
import io.openmessaging.api.AsyncGenericMessageListener;
import io.openmessaging.api.AsyncMessageListener;
import io.openmessaging.api.Consumer;
import io.openmessaging.api.GenericMessageListener;
import io.openmessaging.api.MessageListener;
import io.openmessaging.api.MessageSelector;
import io.openmessaging.api.OMSResponseStatus;
import io.openmessaging.api.exception.OMSRuntimeException;
import io.openmessaging.api.exception.OMSUnsupportException;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Consumer that fans its subscriptions out to one inner {@link ConsumerImpl} per
 * destination zone.
 *
 * <p>The set of destination zones is obtained from {@code getSiteDestZones(...)} when the
 * consumer is started and then re-read every {@value #SYNC_INTERVAL_SECONDS} seconds. A zone
 * that appears gets a new inner consumer with every recorded subscription replayed onto it;
 * a zone that disappears has its inner consumer shut down.
 *
 * <p>Subscription changes and the creation of inner consumers are serialized on this
 * instance's monitor, so a subscription is either replayed onto a new inner consumer or
 * applied to it directly, never lost in between.
 */
public class SiteConsumerImpl extends ConsumerImpl implements Consumer {
    private static final Logger LOGGER = LoggerFactory.getLogger(SiteConsumerImpl.class);

    /** Delay before the first periodic zone sync and the period between syncs. */
    private static final long SYNC_INTERVAL_SECONDS = 30L;

    /** Upper bound on how long {@link #shutdown()} waits for an in-flight zone sync. */
    private static final long SYNC_STOP_TIMEOUT_SECONDS = 5L;

    /**
     * Scheduler driving the periodic zone sync. It is owned by this instance: a scheduler
     * shared between instances would be terminated by the first {@link #shutdown()} and
     * would then reject the tasks of every other instance.
     */
    private final ScheduledExecutorService scheduler =
            ThreadUtils.newSingleThreadScheduledExecutor("SofamqSiteConsumerSync", false);

    private final boolean needConsumeFromProd;

    /** Inner consumers keyed by destination zone. */
    private final Map<String, Consumer> innerDestZoneConsumers = new ConcurrentHashMap<String, Consumer>();

    /** Latest subscription per topic, kept so it can be replayed onto consumers of new zones. */
    private final Map<String, Subscription> innerTopicListeners = new ConcurrentHashMap<String, Subscription>();

    /** Handle of the periodic sync task; written by {@link #start()}, read by {@link #shutdown()}. */
    private volatile ScheduledFuture<?> syncTask;

    /**
     * A recorded {@code subscribe} call that can be replayed against any consumer.
     *
     * <p>Each implementation captures the overload the caller selected at compile time, so
     * replaying never has to guess the overload from the runtime types of the arguments
     * (which fails for a {@code null} filter and is ambiguous for a listener implementing
     * several listener interfaces).
     */
    private interface Subscription {
        void applyTo(Consumer consumer);
    }

    /**
     * Creates the site consumer.
     *
     * @param properties consumer configuration; {@code siteMessageEnv} (default
     *                   {@code CURRENT}) and {@code ldcSubMode} are read here
     * @throws OMSUnsupportException    if {@code ldcSubMode} is set to a mode other than
     *                                  {@code DEFAULT} or {@code CZONE}
     * @throws IllegalArgumentException if {@code ldcSubMode} is not a valid
     *                                  {@link LdcSubMode} name
     */
    public SiteConsumerImpl(Properties properties) {
        super(properties);
        // equalsIgnoreCase is null-safe and a blank value can never equal "gray",
        // so no separate blank check is needed.
        boolean isGray = StringUtils.equalsIgnoreCase(System.getenv("ALIPAY_APP_ENV"), "gray");
        String msgEnv = properties.getProperty("siteMessageEnv", "CURRENT");
        this.needConsumeFromProd = isGray && StringUtils.equals(msgEnv, "ALL");

        String subMode = properties.getProperty("ldcSubMode");
        if (StringUtils.isNotBlank(subMode)) {
            LdcSubMode mode = LdcSubMode.valueOf(subMode);
            if (mode != LdcSubMode.DEFAULT && mode != LdcSubMode.CZONE) {
                throw new OMSUnsupportException(OMSResponseStatus.STATUS_1418.getStatusCode(),
                        String.format("Illegal sub mode [%s] for site consumer", subMode));
            }
        }
    }

    /**
     * Starts the parent consumer, performs one zone sync immediately and schedules the
     * periodic sync.
     */
    @Override
    public void start() {
        super.start();
        this.syncInnerDestZones();
        this.syncTask = this.scheduler.scheduleAtFixedRate(new Runnable() {

            @Override
            public void run() {
                SiteConsumerImpl.this.syncInnerDestZones();
            }
        }, SYNC_INTERVAL_SECONDS, SYNC_INTERVAL_SECONDS, TimeUnit.SECONDS);
    }

    /**
     * Runs one zone sync and logs any failure instead of propagating it; an exception
     * escaping a fixed-rate task would suppress all of its later executions.
     */
    private void syncInnerDestZones() {
        try {
            this.syncAndConnectDestZones();
        }
        catch (Throwable ex) {
            LOGGER.error("Failed to sync destination zones", ex);
        }
    }

    /**
     * Stops the periodic sync, shuts down the parent consumer and then every inner zone
     * consumer.
     */
    @Override
    public void shutdown() {
        // Stop the sync loop first so that no tick runs against a parent that is already
        // shut down and no inner consumer is registered after the map has been cleared.
        this.stopSyncLoop();
        super.shutdown();
        for (Consumer siteConsumer : this.innerDestZoneConsumers.values()) {
            if (siteConsumer == null) {
                continue;
            }
            siteConsumer.shutdown();
        }
        this.innerDestZoneConsumers.clear();
    }

    /**
     * Cancels the periodic task and waits, for a bounded time, until a sync that is
     * currently running has finished.
     *
     * <p>{@code Future.get()} is deliberately not used for the wait: on a cancelled future
     * it throws {@code CancellationException} immediately and therefore waits for nothing.
     */
    private void stopSyncLoop() {
        ScheduledFuture<?> task = this.syncTask;
        if (task != null) {
            task.cancel(true);
        }
        this.scheduler.shutdown();
        try {
            if (!this.scheduler.awaitTermination(SYNC_STOP_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
                LOGGER.warn("Timed out waiting for syncTask to stop");
            }
        }
        catch (InterruptedException ex) {
            // Preserve the interrupt for the caller and carry on with the shutdown.
            Thread.currentThread().interrupt();
            LOGGER.warn("Wait to cancel syncTask", (Throwable)ex);
        }
    }

    /**
     * Records the subscription for {@code topic}, replacing any earlier one, and applies it
     * to every inner consumer that currently exists. Callers must hold this instance's
     * monitor.
     */
    private void register(String topic, Subscription subscription) {
        this.innerTopicListeners.put(topic, subscription);
        for (Consumer siteConsumer : this.innerDestZoneConsumers.values()) {
            if (siteConsumer == null) {
                continue;
            }
            subscription.applyTo(siteConsumer);
        }
    }

    @Override
    public synchronized void subscribe(final String topic, final String subExpression, final MessageListener listener) {
        this.register(topic, new Subscription() {

            @Override
            public void applyTo(Consumer consumer) {
                consumer.subscribe(topic, subExpression, listener);
            }
        });
    }

    @Override
    public synchronized void subscribe(final String topic, final MessageSelector selector, final MessageListener listener) {
        this.register(topic, new Subscription() {

            @Override
            public void applyTo(Consumer consumer) {
                consumer.subscribe(topic, selector, listener);
            }
        });
    }

    @Override
    public synchronized <T> void subscribe(final String topic, final String subExpression, final GenericMessageListener<T> listener) {
        this.register(topic, new Subscription() {

            @Override
            public void applyTo(Consumer consumer) {
                consumer.subscribe(topic, subExpression, listener);
            }
        });
    }

    @Override
    public synchronized <T> void subscribe(final String topic, final MessageSelector selector, final GenericMessageListener<T> listener) {
        this.register(topic, new Subscription() {

            @Override
            public void applyTo(Consumer consumer) {
                consumer.subscribe(topic, selector, listener);
            }
        });
    }

    /**
     * Forgets the recorded subscription for {@code topic} and unsubscribes every inner
     * consumer from it.
     */
    @Override
    public synchronized void unsubscribe(String topic) {
        this.innerTopicListeners.remove(topic);
        for (Consumer siteConsumer : this.innerDestZoneConsumers.values()) {
            if (siteConsumer == null) {
                continue;
            }
            siteConsumer.unsubscribe(topic);
        }
    }

    @Override
    public synchronized void subscribe(final String topic, final String subExpression, final AsyncMessageListener listener) {
        this.register(topic, new Subscription() {

            @Override
            public void applyTo(Consumer consumer) {
                consumer.subscribe(topic, subExpression, listener);
            }
        });
    }

    @Override
    public synchronized void subscribe(final String topic, final MessageSelector selector, final AsyncMessageListener listener) {
        this.register(topic, new Subscription() {

            @Override
            public void applyTo(Consumer consumer) {
                consumer.subscribe(topic, selector, listener);
            }
        });
    }

    @Override
    public synchronized <T> void subscribe(final String topic, final String subExpression, final AsyncGenericMessageListener<T> listener) {
        this.register(topic, new Subscription() {

            @Override
            public void applyTo(Consumer consumer) {
                consumer.subscribe(topic, subExpression, listener);
            }
        });
    }

    @Override
    public synchronized <T> void subscribe(final String topic, final MessageSelector selector, final AsyncGenericMessageListener<T> listener) {
        this.register(topic, new Subscription() {

            @Override
            public void applyTo(Consumer consumer) {
                consumer.subscribe(topic, selector, listener);
            }
        });
    }

    /**
     * Reconciles the inner consumers with the current set of destination zones: connects
     * (or restarts) a consumer for every reported zone, then shuts down consumers whose zone
     * is no longer reported.
     *
     * <p>A failure for one zone is logged and does not prevent the other zones from being
     * processed.
     */
    private void syncAndConnectDestZones() throws Exception {
        Map<String, String> zoneEndpoints = this.getSiteDestZones(this.needConsumeFromProd);
        if (zoneEndpoints == null) {
            LOGGER.error("No destination zone found");
            return;
        }
        for (Map.Entry<String, String> zoneEntry : zoneEndpoints.entrySet()) {
            String zone = zoneEntry.getKey();
            try {
                this.connectZone(zone, zoneEntry.getValue());
            }
            catch (Exception ex) {
                LOGGER.warn("Failed to start inner zone consumer, zone={}", zone, ex);
            }
        }
        this.disconnectStaleZones(zoneEndpoints);
    }

    /**
     * Ensures a started inner consumer exists for {@code zone}: restarts an existing one
     * that is not started, otherwise creates, subscribes, starts and registers a new one.
     */
    private void connectZone(String zone, String endpoint) throws Exception {
        Consumer existing = this.innerDestZoneConsumers.get(zone);
        if (existing != null) {
            if (!existing.isStarted()) {
                existing.start();
            }
            return;
        }

        Properties zoneProps = new Properties();
        zoneProps.putAll((Map<?, ?>)this.properties);
        zoneProps.put("messageModel", "BROADCASTING");
        zoneProps.put("groupId", this.properties.getProperty("groupId"));
        zoneProps.put("cell", zone);
        MQUtil.replaceEndpoint(zoneProps, endpoint);

        ConsumerImpl zoneConsumer = new ConsumerImpl(zoneProps);
        boolean registered = false;
        try {
            zoneConsumer.switchSiteMessagePredicate(true);
            // Hold the monitor across replay + registration so that a concurrent
            // subscribe()/unsubscribe() is seen either by the replay or by the
            // registered consumer.
            synchronized (this) {
                for (Subscription subscription : this.innerTopicListeners.values()) {
                    subscription.applyTo(zoneConsumer);
                }
                zoneConsumer.start();
                // NOTE(review): this logs the complete property set; confirm it cannot
                // contain credentials.
                LOGGER.info("start inner zone consumer {} with props [{}]", zone, zoneProps);
                this.innerDestZoneConsumers.put(zone, zoneConsumer);
            }
            registered = true;
        }
        finally {
            if (!registered) {
                // The consumer never made it into the map, so nothing else will ever
                // release it; the next sync creates a fresh one.
                this.shutdownQuietly(zone, zoneConsumer);
            }
        }
    }

    /** Best-effort shutdown of a consumer that failed while being set up. */
    private void shutdownQuietly(String zone, Consumer consumer) {
        try {
            consumer.shutdown();
        }
        catch (Exception ex) {
            LOGGER.warn("Failed to shutdown inner zone consumer, zone={}", zone, ex);
        }
    }

    /** Removes and shuts down every inner consumer whose zone is absent from {@code zoneEndpoints}. */
    private void disconnectStaleZones(Map<String, String> zoneEndpoints) {
        // Removing while iterating is safe here: the map is a ConcurrentHashMap.
        for (String existedCell : this.innerDestZoneConsumers.keySet()) {
            if (zoneEndpoints.containsKey(existedCell)) {
                continue;
            }
            Consumer staleConsumer = this.innerDestZoneConsumers.remove(existedCell);
            if (staleConsumer != null) {
                staleConsumer.shutdown();
            }
            LOGGER.info("shutdown existed inner zone consumer {}", (Object)existedCell);
        }
    }

    /**
     * Returns the live map of inner consumers keyed by zone. This is the internal map
     * itself, not a copy: it changes as zones are synced.
     */
    public Map<String, Consumer> getInnerDestZoneConsumers() {
        return this.innerDestZoneConsumers;
    }
}

