/*
 * Decompiled with CFR 0.152.
 */
package org.jetlinks.core.device;

import java.time.Duration;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import org.jetlinks.core.device.DeviceOperationBroker;
import org.jetlinks.core.device.DeviceStateInfo;
import org.jetlinks.core.device.ReplyFailureHandler;
import org.jetlinks.core.enums.ErrorCode;
import org.jetlinks.core.exception.DeviceOperationException;
import org.jetlinks.core.message.BroadcastMessage;
import org.jetlinks.core.message.DeviceMessageReply;
import org.jetlinks.core.message.Headers;
import org.jetlinks.core.message.Message;
import org.jetlinks.core.server.MessageHandler;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.StringUtils;
import reactor.core.Disposable;
import reactor.core.publisher.EmitterProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxProcessor;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;
import reactor.core.publisher.UnicastProcessor;

public class StandaloneDeviceMessageBroker
implements DeviceOperationBroker,
MessageHandler {
    private static final Logger log = LoggerFactory.getLogger(StandaloneDeviceMessageBroker.class);
    private final FluxProcessor<Message, Message> messageEmitterProcessor;
    private final FluxSink<Message> sink;
    private final Map<String, FluxProcessor<DeviceMessageReply, DeviceMessageReply>> replyProcessor = new ConcurrentHashMap<String, FluxProcessor<DeviceMessageReply, DeviceMessageReply>>();
    private final Map<String, AtomicInteger> partCache = new ConcurrentHashMap<String, AtomicInteger>();
    private ReplyFailureHandler replyFailureHandler = (error, message) -> log.warn("unhandled reply message:{}", (Object)message, (Object)error);
    private final Map<String, Function<Publisher<String>, Flux<DeviceStateInfo>>> stateHandler = new ConcurrentHashMap<String, Function<Publisher<String>, Flux<DeviceStateInfo>>>();

    public StandaloneDeviceMessageBroker() {
        this((FluxProcessor<Message, Message>)EmitterProcessor.create((boolean)false));
    }

    public StandaloneDeviceMessageBroker(FluxProcessor<Message, Message> processor) {
        this.messageEmitterProcessor = processor;
        this.sink = processor.sink(FluxSink.OverflowStrategy.BUFFER);
    }

    @Override
    public Flux<Message> handleSendToDeviceMessage(String serverId) {
        return this.messageEmitterProcessor.map(Function.identity());
    }

    @Override
    public Disposable handleGetDeviceState(String serverId, Function<Publisher<String>, Flux<DeviceStateInfo>> stateMapper) {
        this.stateHandler.put(serverId, stateMapper);
        return () -> this.stateHandler.remove(serverId);
    }

    @Override
    public Flux<DeviceStateInfo> getDeviceState(String serviceId, Collection<String> deviceIdList) {
        return Mono.justOrEmpty(this.stateHandler.get(serviceId)).flatMapMany(fun -> (Flux)fun.apply(Flux.fromIterable((Iterable)deviceIdList)));
    }

    @Override
    public Mono<Boolean> reply(DeviceMessageReply message) {
        return Mono.defer(() -> {
            String messageId = message.getMessageId();
            if (StringUtils.isEmpty((Object)messageId)) {
                log.warn("reply message messageId is empty: {}", (Object)message);
                return Mono.just((Object)false);
            }
            String partMsgId = message.getHeader(Headers.fragmentBodyMessageId).orElse(null);
            if (partMsgId != null) {
                FluxProcessor<DeviceMessageReply, DeviceMessageReply> processor = this.replyProcessor.getOrDefault(partMsgId, this.replyProcessor.get(messageId));
                if (processor == null || processor.isDisposed()) {
                    this.replyFailureHandler.handle(new NullPointerException("no reply handler"), message);
                    this.replyProcessor.remove(partMsgId);
                    return Mono.just((Object)false);
                }
                int partTotal = message.getHeader(Headers.fragmentNumber).orElse(1);
                AtomicInteger counter = this.partCache.computeIfAbsent(partMsgId, ignore -> new AtomicInteger(partTotal));
                processor.onNext((Object)message);
                if (counter.decrementAndGet() <= 0) {
                    processor.onComplete();
                    this.replyProcessor.remove(partMsgId);
                }
                return Mono.just((Object)true);
            }
            FluxProcessor<DeviceMessageReply, DeviceMessageReply> processor = this.replyProcessor.get(messageId);
            if (processor == null || processor.isDisposed()) {
                this.replyProcessor.remove(messageId);
                this.replyFailureHandler.handle(new NullPointerException("no reply handler"), message);
                return Mono.just((Object)false);
            }
            processor.onNext((Object)message);
            processor.onComplete();
            return Mono.just((Object)true);
        }).doOnError(err -> this.replyFailureHandler.handle((Throwable)err, message));
    }

    @Override
    public Flux<DeviceMessageReply> handleReply(String deviceId, String messageId, Duration timeout) {
        return this.replyProcessor.computeIfAbsent(messageId, ignore -> UnicastProcessor.create()).timeout(timeout, (Publisher)Mono.error(() -> new DeviceOperationException(ErrorCode.TIME_OUT))).doFinally(signal -> this.replyProcessor.remove(messageId));
    }

    @Override
    public Mono<Integer> send(String serverId, Publisher<? extends Message> message) {
        if (!this.messageEmitterProcessor.hasDownstreams()) {
            return Mono.just((Object)0);
        }
        return Flux.from(message).doOnNext(arg_0 -> this.sink.next(arg_0)).then(Mono.just((Object)Long.valueOf(this.messageEmitterProcessor.downstreamCount()).intValue()));
    }

    @Override
    public Mono<Integer> send(Publisher<? extends BroadcastMessage> message) {
        return Mono.just((Object)0);
    }

    public void setReplyFailureHandler(ReplyFailureHandler replyFailureHandler) {
        this.replyFailureHandler = replyFailureHandler;
    }
}

