/*
 * Decompiled with CFR 0.152.
 */
package com.alipay.sofa.sofamq.client.elastic;

import com.alipay.sofa.sofamq.client.ConsumerImpl;
import com.alipay.sofa.sofamq.client.MQUtil;
import com.alipay.sofa.sofamq.client.RateLimiterSetter;
import com.alipay.sofa.sofamq.org.shade.apache.rocketmq.common.utils.ThreadUtils;
import io.openmessaging.api.Consumer;
import io.openmessaging.api.GenericMessageListener;
import io.openmessaging.api.MessageListener;
import io.openmessaging.api.MessageSelector;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

public class ElasticConsumerImpl
extends ConsumerImpl
implements Consumer {
    private static final ScheduledExecutorService scheduler = ThreadUtils.newSingleThreadScheduledExecutor("SofaMQElasticMetaSync", false);
    private Map<String, ConsumerImpl> elasticConsumers = new ConcurrentHashMap<String, ConsumerImpl>();
    private Properties baseProperties;
    private Map<String, Object[]> subscription = new HashMap<String, Object[]>();
    private ScheduledFuture elasticSyncFuture;
    private Map<String, Integer> topic2Rate = new HashMap<String, Integer>();

    public ElasticConsumerImpl(Properties properties) {
        super(properties);
        this.baseProperties = properties;
    }

    @Override
    public void start() {
        super.start();
        if ("antfin".equals(this.baseProperties.getProperty("site")) && this.elasticSyncFuture == null) {
            this.elasticSyncFuture = scheduler.scheduleWithFixedDelay(new Runnable(){

                @Override
                public void run() {
                    try {
                        ElasticConsumerImpl.this.syncElasticCells();
                    }
                    catch (Exception e) {
                        LOGGER.warn("sync {} elastic rule fail", (Object)ElasticConsumerImpl.this.cell, (Object)e);
                    }
                }
            }, 0L, 30L, TimeUnit.SECONDS);
        }
    }

    @Override
    public synchronized void subscribe(String topic, String subExpression, MessageListener listener) {
        super.subscribe(topic, subExpression, listener);
        this.subscription.put(topic, new Object[]{subExpression, listener});
        for (ConsumerImpl consumer : this.elasticConsumers.values()) {
            consumer.subscribe(topic, subExpression, listener);
        }
    }

    @Override
    public synchronized void subscribe(String topic, MessageSelector selector, MessageListener listener) {
        super.subscribe(topic, selector, listener);
        this.subscription.put(topic, new Object[]{selector, listener});
        for (ConsumerImpl consumer : this.elasticConsumers.values()) {
            consumer.subscribe(topic, selector, listener);
        }
    }

    @Override
    public synchronized <T> void subscribe(String topic, String subExpression, GenericMessageListener<T> listener) {
        super.subscribe(topic, subExpression, listener);
        this.subscription.put(topic, new Object[]{subExpression, listener});
        for (ConsumerImpl consumer : this.elasticConsumers.values()) {
            consumer.subscribe(topic, subExpression, listener);
        }
    }

    @Override
    public synchronized <T> void subscribe(String topic, MessageSelector selector, GenericMessageListener<T> listener) {
        super.subscribe(topic, selector, listener);
        this.subscription.put(topic, new Object[]{selector, listener});
        for (ConsumerImpl consumer : this.elasticConsumers.values()) {
            consumer.subscribe(topic, selector, listener);
        }
    }

    @Override
    public synchronized void unsubscribe(String topic) {
        super.unsubscribe(topic);
        this.subscription.remove(topic);
        for (ConsumerImpl consumer : this.elasticConsumers.values()) {
            consumer.unsubscribe(topic);
        }
    }

    @Override
    public void shutdown() {
        super.shutdown();
        if (this.elasticSyncFuture != null) {
            this.elasticSyncFuture.cancel(true);
            try {
                this.elasticSyncFuture.get();
            }
            catch (Exception e) {
                LOGGER.warn("elasticSyncFuture wait ex", e);
            }
            for (ConsumerImpl consumer : this.elasticConsumers.values()) {
                consumer.shutdown();
            }
            this.elasticConsumers.clear();
        }
    }

    @Override
    public synchronized void setRate(String topic, int tps) {
        this.topic2Rate.put(topic, tps);
        int consumerCount = this.elasticConsumers.size() + 1;
        int unitTps = tps == 0 ? 0 : Math.max(1, tps / consumerCount);
        super.setRate(topic, unitTps);
        for (ConsumerImpl consumer : this.elasticConsumers.values()) {
            RateLimiterSetter.resetRate(consumer, topic, unitTps);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void syncElasticCells() throws Exception {
        Map<String, String> elasticCells = this.getElasticCells();
        if (elasticCells == null) {
            return;
        }
        for (String elasticCell : elasticCells.keySet()) {
            if (this.elasticConsumers.containsKey(elasticCell)) continue;
            Properties properties = new Properties();
            properties.putAll((Map<?, ?>)this.baseProperties);
            properties.put("cell", elasticCell);
            MQUtil.replaceEndpoint(properties, elasticCells.get(elasticCell));
            try {
                ConsumerImpl consumer = new ConsumerImpl(properties);
                ElasticConsumerImpl elasticConsumerImpl = this;
                synchronized (elasticConsumerImpl) {
                    for (Map.Entry<String, Object[]> entry : this.subscription.entrySet()) {
                        if (entry.getValue()[0] instanceof String && entry.getValue()[1] instanceof MessageListener) {
                            consumer.subscribe(entry.getKey(), (String)entry.getValue()[0], (MessageListener)entry.getValue()[1]);
                            continue;
                        }
                        if (entry.getValue()[0] instanceof MessageSelector && entry.getValue()[1] instanceof MessageListener) {
                            consumer.subscribe(entry.getKey(), (MessageSelector)entry.getValue()[0], (MessageListener)entry.getValue()[1]);
                            continue;
                        }
                        if (entry.getValue()[0] instanceof String && entry.getValue()[1] instanceof GenericMessageListener) {
                            consumer.subscribe(entry.getKey(), (String)entry.getValue()[0], (GenericMessageListener)entry.getValue()[1]);
                            continue;
                        }
                        consumer.subscribe(entry.getKey(), (MessageSelector)entry.getValue()[0], (GenericMessageListener)entry.getValue()[1]);
                    }
                    consumer.start();
                    LOGGER.info("start elastic {} consumer with props[{}]", (Object)elasticCell, (Object)properties);
                    this.elasticConsumers.put(elasticCell, consumer);
                }
                for (Map.Entry entry : this.topic2Rate.entrySet()) {
                    this.setRate((String)entry.getKey(), (Integer)entry.getValue());
                }
            }
            catch (Throwable e) {
                LOGGER.warn("start elastic consumer fail, props[{}]", (Object)properties);
            }
        }
        for (String cell : this.elasticConsumers.keySet()) {
            if (elasticCells.containsKey(cell)) continue;
            ConsumerImpl consumer = this.elasticConsumers.remove(cell);
            consumer.shutdown();
            LOGGER.info("shutdown elastic {} consumer", (Object)cell);
        }
    }
}

