/*
 * Decompiled with CFR 0.152.
 */
package com.alipay.sofa.sofamq.client.elastic;

import com.alipay.sofa.sofamq.client.BatchConsumerImpl;
import com.alipay.sofa.sofamq.client.MQUtil;
import com.alipay.sofa.sofamq.org.shade.apache.rocketmq.common.utils.ThreadUtils;
import io.openmessaging.api.batch.BatchMessageListener;
import io.openmessaging.api.batch.GenericBatchMessageListener;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

/**
 * A {@link BatchConsumerImpl} that mirrors its subscriptions onto one additional
 * {@link BatchConsumerImpl} per "elastic cell".
 *
 * <p>When the {@code site} property equals {@code antfin}, {@link #start()} schedules a task on a
 * shared single-thread scheduler that calls {@code getElasticCells()} every 30 seconds, starts a
 * child consumer for every cell that does not have one yet, and shuts down child consumers whose
 * cell is no longer reported.
 *
 * <p>Thread-safety: the recorded subscriptions, the scheduling handle and the creation/teardown of
 * child consumers are all guarded by this instance's monitor, because they are touched both by
 * callers of the public methods and by the scheduler thread.
 */
public class ElasticBatchConsumerImpl extends BatchConsumerImpl {
    /** Scheduler shared by every instance of this class; runs the periodic elastic-cell sync. */
    private static final ScheduledExecutorService scheduler = ThreadUtils.newSingleThreadScheduledExecutor("SofaMQElasticMetaSync", false);

    /** Child consumers keyed by elastic cell name. */
    private final Map<String, BatchConsumerImpl> elasticConsumers = new ConcurrentHashMap<String, BatchConsumerImpl>();

    /** Properties this consumer was built with; used as the template for child consumers. */
    private final Properties baseProperties;

    /**
     * Recorded subscriptions: topic -> {@code {subExpression, listener}}. Replayed onto every newly
     * created child consumer. Guarded by {@code this}.
     */
    private final Map<String, Object[]> subscription = new HashMap<String, Object[]>();

    /** Handle of the periodic sync task; {@code null} while the sync is not scheduled. Guarded by {@code this}. */
    private ScheduledFuture<?> elasticSyncFuture;

    /**
     * @param properties consumer configuration; also copied into each child consumer's configuration
     */
    public ElasticBatchConsumerImpl(Properties properties) {
        super(properties);
        this.baseProperties = properties;
    }

    /**
     * Subscribes this consumer and every running child consumer to {@code topic}, and records the
     * subscription so that child consumers created later receive it too.
     */
    @Override
    public void subscribe(String topic, String subExpression, BatchMessageListener listener) {
        super.subscribe(topic, subExpression, listener);
        // Same monitor as the sync task: keeps the HashMap safe and guarantees a child consumer
        // that is being created concurrently cannot miss this subscription.
        synchronized (this) {
            this.subscription.put(topic, new Object[]{subExpression, listener});
            for (BatchConsumerImpl consumer : this.elasticConsumers.values()) {
                consumer.subscribe(topic, subExpression, listener);
            }
        }
    }

    /**
     * Generic-listener variant of {@link #subscribe(String, String, BatchMessageListener)}; same
     * recording and propagation rules.
     */
    @Override
    public <T> void subscribe(String topic, String subExpression, GenericBatchMessageListener<T> listener) {
        super.subscribe(topic, subExpression, listener);
        synchronized (this) {
            this.subscription.put(topic, new Object[]{subExpression, listener});
            for (BatchConsumerImpl consumer : this.elasticConsumers.values()) {
                consumer.subscribe(topic, subExpression, listener);
            }
        }
    }

    /**
     * Unsubscribes this consumer and every running child consumer from {@code topic} and forgets
     * the recorded subscription.
     */
    @Override
    public void unsubscribe(String topic) {
        super.unsubscribe(topic);
        synchronized (this) {
            this.subscription.remove(topic);
            for (BatchConsumerImpl consumer : this.elasticConsumers.values()) {
                consumer.unsubscribe(topic);
            }
        }
    }

    /**
     * Starts this consumer and, when the {@code site} property is {@code antfin}, schedules the
     * periodic elastic-cell sync (first run immediately, then 30 seconds after each run ends).
     * Calling it again while the sync is already scheduled does not schedule a second task.
     */
    @Override
    public void start() {
        super.start();
        if (!"antfin".equals(this.baseProperties.getProperty("site"))) {
            return;
        }
        // Scheduling under the monitor ensures the first sync pass (initial delay 0) observes the
        // assigned handle when it checks whether the sync is still active.
        synchronized (this) {
            if (this.elasticSyncFuture != null) {
                return;
            }
            this.elasticSyncFuture = scheduler.scheduleWithFixedDelay(new Runnable() {

                @Override
                public void run() {
                    try {
                        ElasticBatchConsumerImpl.this.syncElasticCells();
                    }
                    catch (Exception e) {
                        LOGGER.warn("sync {} elastic rule fail", (Object)ElasticBatchConsumerImpl.this.cell, (Object)e);
                    }
                }
            }, 0L, 30L, TimeUnit.SECONDS);
        }
    }

    /**
     * Shuts down this consumer, cancels the periodic sync and shuts down every child consumer.
     * The scheduling handle is reset so that a later {@link #start()} schedules the sync again.
     */
    @Override
    public void shutdown() {
        super.shutdown();
        synchronized (this) {
            if (this.elasticSyncFuture == null) {
                return;
            }
            // Future.get() on a cancelled task throws CancellationException immediately, so it
            // cannot be used to wait for an in-flight pass. Instead the handle is cleared under
            // the monitor and the sync task refuses to create consumers once it is null.
            this.elasticSyncFuture.cancel(true);
            this.elasticSyncFuture = null;
            for (BatchConsumerImpl consumer : this.elasticConsumers.values()) {
                try {
                    consumer.shutdown();
                }
                catch (Exception e) {
                    // Keep going: one failing child must not leave the others running.
                    LOGGER.warn("shutdown elastic consumer fail", e);
                }
            }
            this.elasticConsumers.clear();
        }
    }

    /**
     * Performs one reconciliation pass: starts a child consumer for each reported elastic cell
     * that has none, then shuts down child consumers whose cell is no longer reported.
     *
     * @throws Exception whatever {@code getElasticCells()}, {@code MQUtil.replaceEndpoint} or a
     *         child {@code shutdown()} throws; the scheduled task logs it and retries on the next run
     */
    private void syncElasticCells() throws Exception {
        Map<String, String> elasticCells = this.getElasticCells();
        if (elasticCells == null) {
            return;
        }
        for (Map.Entry<String, String> cellEntry : elasticCells.entrySet()) {
            String elasticCell = cellEntry.getKey();
            if (this.elasticConsumers.containsKey(elasticCell)) {
                continue;
            }
            Properties properties = new Properties();
            properties.putAll((Map<?, ?>)this.baseProperties);
            properties.put("cell", elasticCell);
            // The map value is handed to replaceEndpoint; presumably the cell's endpoint — verify against MQUtil.
            MQUtil.replaceEndpoint(properties, cellEntry.getValue());
            this.startElasticConsumer(elasticCell, properties);
        }
        for (String cell : this.elasticConsumers.keySet()) {
            if (elasticCells.containsKey(cell)) {
                continue;
            }
            BatchConsumerImpl consumer = this.elasticConsumers.remove(cell);
            if (consumer == null) {
                // Already removed by a concurrent shutdown().
                continue;
            }
            consumer.shutdown();
            LOGGER.info("shutdown elastic {} consumer", (Object)cell);
        }
    }

    /**
     * Creates, subscribes, starts and registers the child consumer for one elastic cell. Failures
     * are logged and leave no registered consumer, so the next sync pass retries the cell.
     */
    private void startElasticConsumer(String elasticCell, Properties properties) {
        BatchConsumerImpl consumer = null;
        try {
            synchronized (this) {
                if (this.elasticSyncFuture == null) {
                    // shutdown() ran while this pass was in flight; do not start new consumers.
                    return;
                }
                consumer = new BatchConsumerImpl(properties);
                for (Map.Entry<String, Object[]> entry : this.subscription.entrySet()) {
                    Object[] recorded = entry.getValue();
                    ElasticBatchConsumerImpl.replaySubscription(consumer, entry.getKey(), (String)recorded[0], recorded[1]);
                }
                consumer.start();
                // NOTE(review): this logs the full property set; confirm it cannot contain credentials.
                LOGGER.info("start elastic {} consumer with props[{}]", (Object)elasticCell, (Object)properties);
                this.elasticConsumers.put(elasticCell, consumer);
            }
        }
        catch (Throwable e) {
            LOGGER.warn("start elastic consumer fail, props[{}]", (Object)properties, (Object)e);
            if (consumer != null) {
                try {
                    // Best effort: release whatever the half-started consumer acquired.
                    consumer.shutdown();
                }
                catch (Throwable ignored) {
                    // Nothing more can be done; the original failure is already logged.
                }
            }
        }
    }

    /**
     * Applies one recorded subscription to {@code consumer}, choosing the overload from the
     * listener's runtime type only, so that a {@code null} sub-expression is handled correctly.
     */
    private static void replaySubscription(BatchConsumerImpl consumer, String topic, String subExpression, Object listener) {
        if (listener instanceof BatchMessageListener) {
            consumer.subscribe(topic, subExpression, (BatchMessageListener)listener);
            return;
        }
        consumer.subscribe(topic, subExpression, (GenericBatchMessageListener<?>)listener);
    }
}

