/*
 * Decompiled with CFR 0.152.
 */
package io.gravitee.node.monitoring.handler;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.gravitee.common.service.AbstractService;
import io.gravitee.node.api.Monitoring;
import io.gravitee.node.api.Node;
import io.gravitee.node.api.cluster.ClusterManager;
import io.gravitee.node.api.cluster.messaging.Topic;
import io.gravitee.node.api.healthcheck.HealthCheck;
import io.gravitee.node.api.infos.NodeInfos;
import io.gravitee.node.api.monitor.Monitor;
import io.gravitee.node.monitoring.NodeMonitoringService;
import io.reactivex.rxjava3.core.Completable;
import io.vertx.core.Vertx;
import io.vertx.core.eventbus.MessageConsumer;
import java.util.Date;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class NodeMonitoringEventHandler
extends AbstractService<NodeMonitoringEventHandler> {
    private static final Logger log = LoggerFactory.getLogger(NodeMonitoringEventHandler.class);
    private final Vertx vertx;
    private final ClusterManager clusterManager;
    private final ObjectMapper objectMapper;
    private final Node node;
    private final NodeMonitoringService nodeMonitoringService;
    private Topic<NodeInfos> nodeInfosTopic;
    private Topic<HealthCheck> healthCheckTopic;
    private Topic<Monitor> monitorTopic;
    private String monitorSubscriptionId;
    private String healthCheckSubscription;
    private String nodeInfoSubscription;
    private MessageConsumer<NodeInfos> nodeInfosMessageConsumer;
    private MessageConsumer<HealthCheck> healthCheckMessageConsumer;
    private MessageConsumer<Monitor> monitorMessageConsumer;

    protected void doStart() throws Exception {
        super.doStart();
        this.registerClusterListener();
        this.registerInternalListener();
    }

    private void registerClusterListener() {
        this.nodeInfosTopic = this.clusterManager.topic("node-infos");
        this.nodeInfoSubscription = this.nodeInfosTopic.addMessageListener(message -> {
            log.debug("Received node infos message from cluster");
            if (this.clusterManager.self().primary()) {
                log.debug("Processing node infos message");
                this.nodeMonitoringService.createOrUpdate(this.convert((NodeInfos)message.content())).ignoreElement().onErrorResumeNext(throwable -> {
                    log.error("Unable to process node infos message", throwable);
                    return Completable.complete();
                }).subscribe();
            }
        });
        this.healthCheckTopic = this.clusterManager.topic("node-healthcheck");
        this.healthCheckSubscription = this.healthCheckTopic.addMessageListener(message -> {
            log.debug("Received health check message from cluster");
            if (this.clusterManager.self().primary()) {
                log.debug("Processing health check message");
                this.nodeMonitoringService.createOrUpdate(this.convert((HealthCheck)message.content())).ignoreElement().onErrorResumeNext(throwable -> {
                    log.error("Unable to process health check message", throwable);
                    return Completable.complete();
                }).subscribe();
            }
        });
        this.monitorTopic = this.clusterManager.topic("node-monitor");
        this.monitorSubscriptionId = this.monitorTopic.addMessageListener(message -> {
            log.debug("Received monitor message from cluster");
            if (this.clusterManager.self().primary()) {
                log.debug("Processing monitor message");
                this.nodeMonitoringService.createOrUpdate(this.convert((Monitor)message.content())).ignoreElement().onErrorResumeNext(throwable -> {
                    log.error("Unable to process monitor message", throwable);
                    return Completable.complete();
                }).subscribe();
            }
        });
    }

    private void registerInternalListener() {
        this.nodeInfosMessageConsumer = this.vertx.eventBus().localConsumer("gio:node:infos", event -> {
            log.debug("Received node infos message from internal bus");
            this.nodeInfosTopic.publish((Object)((NodeInfos)event.body()));
        });
        this.healthCheckMessageConsumer = this.vertx.eventBus().localConsumer("gio:node:healthcheck", event -> {
            log.debug("Received health check message from internal bus");
            this.healthCheckTopic.publish((Object)((HealthCheck)event.body()));
        });
        this.monitorMessageConsumer = this.vertx.eventBus().localConsumer("gio:node:monitor", event -> {
            log.debug("Received monitor message from internal bus");
            this.monitorTopic.publish((Object)((Monitor)event.body()));
        });
    }

    protected void doStop() throws Exception {
        super.doStop();
        if (this.nodeInfosMessageConsumer != null) {
            this.nodeInfosMessageConsumer.unregister();
        }
        if (this.healthCheckMessageConsumer != null) {
            this.healthCheckMessageConsumer.unregister();
        }
        if (this.monitorMessageConsumer != null) {
            this.monitorMessageConsumer.unregister();
        }
        if (this.clusterManager != null) {
            this.monitorTopic.removeMessageListener(this.nodeInfoSubscription);
            this.monitorTopic.removeMessageListener(this.healthCheckSubscription);
            this.monitorTopic.removeMessageListener(this.monitorSubscriptionId);
        }
    }

    private Monitoring convert(NodeInfos nodeInfos) {
        Monitoring monitoring = this.buildMonitoring(nodeInfos);
        monitoring.setEvaluatedAt(new Date(nodeInfos.getEvaluatedAt()));
        monitoring.setType("NODE_INFOS");
        return monitoring;
    }

    private Monitoring convert(HealthCheck healthCheck) {
        Monitoring monitoring = this.buildMonitoring(healthCheck);
        monitoring.setEvaluatedAt(new Date(healthCheck.getEvaluatedAt()));
        monitoring.setType("HEALTH_CHECK");
        return monitoring;
    }

    private Monitoring convert(Monitor monitor) {
        Monitoring monitoring = this.buildMonitoring(monitor);
        monitoring.setEvaluatedAt(new Date(monitor.getTimestamp()));
        monitoring.setType("MONITOR");
        return monitoring;
    }

    private Monitoring buildMonitoring(Object payload) {
        Monitoring monitoring = new Monitoring();
        monitoring.setNodeId(this.node.id());
        try {
            if (payload != null) {
                monitoring.setPayload(this.objectMapper.writeValueAsString(payload));
            }
        }
        catch (JsonProcessingException e) {
            log.error("An error occurred when trying to serialize monitoring payload to json");
        }
        return monitoring;
    }

    public NodeMonitoringEventHandler(Vertx vertx, ClusterManager clusterManager, ObjectMapper objectMapper, Node node, NodeMonitoringService nodeMonitoringService) {
        this.vertx = vertx;
        this.clusterManager = clusterManager;
        this.objectMapper = objectMapper;
        this.node = node;
        this.nodeMonitoringService = nodeMonitoringService;
    }
}

