/*
 * Decompiled with CFR 0.152.
 */
package org.apache.dolphinscheduler.plugin.registry.mysql.task;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
import lombok.Generated;
import org.apache.dolphinscheduler.plugin.registry.mysql.MysqlOperator;
import org.apache.dolphinscheduler.plugin.registry.mysql.MysqlRegistryProperties;
import org.apache.dolphinscheduler.plugin.registry.mysql.model.MysqlRegistryData;
import org.apache.dolphinscheduler.registry.api.Event;
import org.apache.dolphinscheduler.registry.api.SubscribeListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Keeps an in-memory snapshot of the rows returned by
 * {@link MysqlOperator#queryAllMysqlRegistryData()} and notifies registered
 * {@link SubscribeListener}s when a periodic poll detects added, removed or
 * updated entries.
 *
 * <p>Polling runs on a single daemon thread. Listeners may be added or removed
 * from other threads while a poll is in progress.
 */
public class SubscribeDataManager implements AutoCloseable {

    private static final Logger LOGGER = LoggerFactory.getLogger(SubscribeDataManager.class);

    private final MysqlOperator mysqlOperator;
    private final MysqlRegistryProperties registryProperties;
    /** Subscribed path prefix -> listeners interested in keys starting with that prefix. */
    private final Map<String, List<SubscribeListener>> dataSubScribeMap = new ConcurrentHashMap<>();
    private final ScheduledExecutorService dataSubscribeCheckThreadPool;
    /** Registry key -> row as seen by the most recent successful poll. */
    private final Map<String, MysqlRegistryData> mysqlRegistryDataMap = new ConcurrentHashMap<>();

    /**
     * Creates the manager and its single-threaded scheduler; no polling happens
     * until {@link #start()} is called.
     *
     * @param registryProperties supplies the refresh interval used for polling
     * @param mysqlOperator      used to query the current registry rows
     */
    public SubscribeDataManager(MysqlRegistryProperties registryProperties, MysqlOperator mysqlOperator) {
        this.registryProperties = registryProperties;
        this.mysqlOperator = mysqlOperator;
        this.dataSubscribeCheckThreadPool = Executors.newScheduledThreadPool(
                1,
                new ThreadFactoryBuilder()
                        .setNameFormat("MysqlRegistrySubscribeDataCheckThread")
                        .setDaemon(true)
                        .build());
    }

    /**
     * Schedules the periodic check task. The term refresh interval is used both
     * as the initial delay and as the delay between consecutive runs.
     */
    public void start() {
        long intervalMillis = registryProperties.getTermRefreshInterval().toMillis();
        dataSubscribeCheckThreadPool.scheduleWithFixedDelay(
                new RegistrySubscribeDataCheckTask(dataSubScribeMap, mysqlOperator, mysqlRegistryDataMap),
                intervalMillis,
                intervalMillis,
                TimeUnit.MILLISECONDS);
    }

    /**
     * Registers a listener for every registry key that starts with {@code path}.
     *
     * @param path              key prefix to subscribe to
     * @param subscribeListener listener to notify on matching changes
     */
    public void addListener(String path, SubscribeListener subscribeListener) {
        // The list is iterated by the polling thread while callers may still be
        // adding listeners, so it must tolerate concurrent modification.
        dataSubScribeMap.computeIfAbsent(path, k -> new CopyOnWriteArrayList<>()).add(subscribeListener);
    }

    /**
     * Removes all listeners registered under exactly {@code path}.
     *
     * @param path the subscription prefix to drop
     */
    public void removeListener(String path) {
        dataSubScribeMap.remove(path);
    }

    /**
     * Returns the data cached for {@code path} by the most recent poll.
     *
     * @param path registry key to look up
     * @return the cached data, or {@code null} if the key is not in the cache
     */
    public String getData(String path) {
        MysqlRegistryData mysqlRegistryData = mysqlRegistryDataMap.get(path);
        return mysqlRegistryData == null ? null : mysqlRegistryData.getData();
    }

    /** Stops the polling thread and drops all registered listeners. */
    @Override
    public void close() {
        dataSubscribeCheckThreadPool.shutdownNow();
        dataSubScribeMap.clear();
    }

    /**
     * One polling cycle: loads all registry rows, diffs them against the cached
     * snapshot, refreshes the snapshot and dispatches ADD / REMOVE / UPDATE
     * events to the matching listeners.
     */
    static class RegistrySubscribeDataCheckTask implements Runnable {

        private final Map<String, List<SubscribeListener>> dataSubScribeMap;
        private final MysqlOperator mysqlOperator;
        private final Map<String, MysqlRegistryData> mysqlRegistryDataMap;

        @Override
        public void run() {
            // Nothing may escape this method: an exception thrown from a task
            // scheduled with scheduleWithFixedDelay suppresses all later runs.
            try {
                Map<String, MysqlRegistryData> currentMysqlDataMap = mysqlOperator.queryAllMysqlRegistryData()
                        .stream()
                        .collect(Collectors.toMap(MysqlRegistryData::getKey, Function.identity()));

                List<MysqlRegistryData> addedData = new ArrayList<>();
                List<MysqlRegistryData> deletedData = new ArrayList<>();
                List<MysqlRegistryData> updatedData = new ArrayList<>();

                for (Map.Entry<String, MysqlRegistryData> entry : currentMysqlDataMap.entrySet()) {
                    MysqlRegistryData newData = entry.getValue();
                    MysqlRegistryData oldData = mysqlRegistryDataMap.get(entry.getKey());
                    if (oldData == null) {
                        addedData.add(newData);
                    } else if (!newData.getLastUpdateTime().equals(oldData.getLastUpdateTime())) {
                        // A changed last-update time is what marks a row as updated.
                        updatedData.add(newData);
                    }
                }
                for (Map.Entry<String, MysqlRegistryData> entry : mysqlRegistryDataMap.entrySet()) {
                    if (!currentMysqlDataMap.containsKey(entry.getKey())) {
                        deletedData.add(entry.getValue());
                    }
                }

                // Refresh in place instead of clear()+putAll(): clearing first would
                // let a concurrent getData() observe an empty cache for live keys.
                mysqlRegistryDataMap.putAll(currentMysqlDataMap);
                mysqlRegistryDataMap.keySet().retainAll(currentMysqlDataMap.keySet());

                for (Map.Entry<String, List<SubscribeListener>> entry : dataSubScribeMap.entrySet()) {
                    String subscribeKey = entry.getKey();
                    List<SubscribeListener> subscribeListeners = entry.getValue();
                    triggerListener(addedData, subscribeKey, subscribeListeners, Event.Type.ADD);
                    triggerListener(deletedData, subscribeKey, subscribeListeners, Event.Type.REMOVE);
                    triggerListener(updatedData, subscribeKey, subscribeListeners, Event.Type.UPDATE);
                }
            } catch (Exception e) {
                // Pass the exception so the cause and stack trace reach the log.
                LOGGER.error("Query data from mysql registry error", e);
            }
        }

        /**
         * Notifies {@code subscribeListeners} about every entry in {@code dataList}
         * whose key starts with {@code subscribeKey}.
         *
         * @param dataList           changed rows of a single change type
         * @param subscribeKey       subscribed key prefix
         * @param subscribeListeners listeners registered for that prefix
         * @param type               event type to report
         */
        private void triggerListener(List<MysqlRegistryData> dataList,
                                     String subscribeKey,
                                     List<SubscribeListener> subscribeListeners,
                                     Event.Type type) {
            for (MysqlRegistryData data : dataList) {
                if (!data.getKey().startsWith(subscribeKey)) {
                    continue;
                }
                for (SubscribeListener subscribeListener : subscribeListeners) {
                    try {
                        subscribeListener.notify(new Event(data.getKey(), data.getKey(), data.getData(), type));
                    } catch (Exception e) {
                        // The cache is already refreshed, so this event will not be
                        // re-detected; isolate the failure so the remaining listeners
                        // and events of this cycle are still delivered.
                        LOGGER.error("Failed to notify subscribe listener, key: {}, event type: {}",
                                data.getKey(), type, e);
                    }
                }
            }
        }

        /**
         * @param dataSubScribeMap     live view of subscription prefix -> listeners
         * @param mysqlOperator        used to query the current registry rows
         * @param mysqlRegistryDataMap shared cache updated on every successful poll
         */
        @Generated
        public RegistrySubscribeDataCheckTask(Map<String, List<SubscribeListener>> dataSubScribeMap,
                                              MysqlOperator mysqlOperator,
                                              Map<String, MysqlRegistryData> mysqlRegistryDataMap) {
            this.dataSubScribeMap = dataSubScribeMap;
            this.mysqlOperator = mysqlOperator;
            this.mysqlRegistryDataMap = mysqlRegistryDataMap;
        }
    }
}

