/*
 * Decompiled with CFR 0.152.
 */
package io.github.pnoker.common.mqtt.service.job;

import io.github.pnoker.common.mqtt.entity.MqttMessage;
import io.github.pnoker.common.mqtt.service.MqttReceiveService;
import jakarta.annotation.Resource;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.quartz.QuartzJobBean;
import org.springframework.stereotype.Component;

@Component
public class MqttScheduleJob
extends QuartzJobBean {
    private static final Logger log = LoggerFactory.getLogger(MqttScheduleJob.class);
    public static final ReentrantReadWriteLock messageLock = new ReentrantReadWriteLock();
    public static final AtomicLong messageCount = new AtomicLong(0L);
    public static final AtomicLong messageSpeed = new AtomicLong(0L);
    private static final List<MqttMessage> mqttMessages = new ArrayList<MqttMessage>();
    @Value(value="${driver.mqtt.batch.speed}")
    private Integer batchSpeed;
    @Value(value="${driver.mqtt.batch.interval}")
    private Integer interval;
    @Resource
    private MqttReceiveService mqttReceiveService;
    @Resource
    private ThreadPoolExecutor threadPoolExecutor;

    public static int getMqttMessagesSize() {
        return mqttMessages.size();
    }

    public static void clearMqttMessages() {
        mqttMessages.clear();
    }

    public static void addMqttMessages(MqttMessage mqttMessage) {
        mqttMessages.add(mqttMessage);
    }

    protected void executeInternal(JobExecutionContext context) throws JobExecutionException {
        long speed = messageCount.getAndSet(0L);
        messageSpeed.set(speed);
        if ((speed /= (long)this.interval.intValue()) >= (long)this.batchSpeed.intValue()) {
            log.debug("Mqtt message receiver speed: {} /s, value size: {}, interval: {}", new Object[]{speed, MqttScheduleJob.getMqttMessagesSize(), this.interval});
        }
        this.threadPoolExecutor.execute(() -> {
            messageLock.writeLock().lock();
            if (!mqttMessages.isEmpty()) {
                this.mqttReceiveService.receiveValues(mqttMessages);
                MqttScheduleJob.clearMqttMessages();
            }
            messageLock.writeLock().unlock();
        });
    }
}

