/*
 * Decompiled with CFR 0.152.
 */
package org.jetlinks.supports.cluster.event;

import io.netty.util.ReferenceCountUtil;
import io.rsocket.RSocket;
import io.rsocket.SocketAcceptor;
import io.rsocket.core.RSocketConnector;
import io.rsocket.core.RSocketServer;
import io.rsocket.frame.decoder.PayloadDecoder;
import io.rsocket.transport.ServerTransport;
import io.rsocket.transport.netty.client.TcpClientTransport;
import io.rsocket.transport.netty.server.CloseableChannel;
import io.rsocket.transport.netty.server.TcpServerTransport;
import io.rsocket.util.ByteBufPayload;
import io.rsocket.util.DefaultPayload;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.jetlinks.core.Payload;
import org.jetlinks.core.cluster.ClusterCache;
import org.jetlinks.core.cluster.ClusterManager;
import org.jetlinks.core.cluster.ServerNode;
import org.jetlinks.core.event.TopicPayload;
import org.jetlinks.supports.cluster.event.RSocketAddress;
import org.jetlinks.supports.cluster.event.RSocketPayload;
import org.jetlinks.supports.cluster.event.RedisClusterEventBroker;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.redis.connection.ReactiveRedisConnectionFactory;
import reactor.core.Disposable;
import reactor.core.publisher.EmitterProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.retry.Retry;

public class RedisRSocketEventBroker
extends RedisClusterEventBroker {
    private static final Logger log = LoggerFactory.getLogger(RedisRSocketEventBroker.class);

    /** Address whose port the local RSocket server binds to and which is published to the address cache. */
    private final RSocketAddress address;

    /** Id of the current server node; assigned in {@link #init()}. */
    private String serverId;

    /** Client sockets opened towards remote brokers, keyed by remote broker id. */
    private final ConcurrentMap<String, RSocket> sockets = new ConcurrentHashMap<>();

    /** Cluster-wide cache mapping a server id to its RSocket address; assigned in {@link #init()}. */
    private ClusterCache<String, RSocketAddress> addressCache;

    /** Local snapshot of the known broker addresses, keyed by broker id. */
    private final ConcurrentMap<String, RSocketAddress> remotes = new ConcurrentHashMap<>();

    /** Per-broker processors whose content is streamed out to the broker that requested it. */
    private final Map<String, EmitterProcessor<TopicPayload>> remoteSink = new ConcurrentHashMap<>();

    /** Per-broker processors fed with the events polled from that broker. */
    private final Map<String, EmitterProcessor<TopicPayload>> localSink = new ConcurrentHashMap<>();

    /**
     * Ids of the brokers for which a connection attempt is currently in flight.
     * Entries are added by the thread calling {@code connectRemote} and removed from the
     * connection pipeline's {@code doFinally} callback, so the set must be safe for
     * concurrent modification.
     */
    private final Set<String> connecting = ConcurrentHashMap.newKeySet();

    /** Active event-polling subscriptions, keyed by remote broker id. */
    private final Map<String, Disposable> polling = new ConcurrentHashMap<>();

    /**
     * Creates the broker and initializes it right away by calling {@link #init()}.
     *
     * @param clusterManager cluster manager handed to the parent broker
     * @param factory        reactive Redis connection factory handed to the parent broker
     * @param address        RSocket address of this node
     */
    public RedisRSocketEventBroker(
            ClusterManager clusterManager,
            ReactiveRedisConnectionFactory factory,
            RSocketAddress address) {
        super(clusterManager, factory);
        this.address = address;
        // Initialization (server bind, address registration) happens as part of construction.
        init();
    }

    /**
     * Starts streaming events from a remote broker over the given socket and forwards them
     * to the local sink of that broker.
     * <p>
     * Any previously registered polling subscription for the same broker is disposed first.
     *
     * @param remote id of the remote broker
     * @param socket connected socket used to request the event stream
     */
    private void doStartPollEvent(String remote, RSocket socket) {
        log.debug("{} start poll broker event from {}", this.serverId, remote);
        Disposable previous = this.polling.remove(remote);
        if (previous != null) {
            previous.dispose();
        }
        Disposable subscription = socket
            .requestStream(ByteBufPayload.create(this.serverId))
            .doOnCancel(() -> {
                // Cancelling the poll also closes the socket it was running on.
                socket.dispose();
                log.debug("{} cancel poll broker event from {}", this.serverId, remote);
            })
            .subscribe(
                payload -> {
                    try {
                        EmitterProcessor<TopicPayload> processor = this.getOrCreateLocalSink(remote);
                        if (!processor.hasDownstreams()) {
                            // Nobody is listening: release the payload instead of handing it over.
                            ReferenceCountUtil.safeRelease(payload);
                        } else {
                            String topic = payload.getMetadataUtf8();
                            processor.onNext(TopicPayload.of(topic, RSocketPayload.of(payload)));
                        }
                    } catch (Throwable e) {
                        log.error("handle broker [{}] event error", remote, e);
                        ReferenceCountUtil.safeRelease(payload);
                    }
                },
                // Without an error consumer a failed stream would end up in Reactor's
                // dropped-error hook; log it with the broker id instead.
                err -> log.warn("poll broker [{}] event error", remote, err));
        this.polling.put(remote, subscription);
    }

    /**
     * Ensures there is an RSocket connection to the given remote broker and that events
     * are being polled from it.
     * <p>
     * Nothing happens when {@code remote} is this node or when a connection attempt to it
     * is already in flight. When a live socket exists and the local sink has subscribers,
     * only a disposed polling subscription is restarted. Otherwise a new connection is
     * opened, with up to 10 backoff retries while the broker is still known in
     * {@code remotes}.
     *
     * @param remote id of the remote broker
     */
    public void connectRemote(String remote) {
        if (this.serverId.equals(remote) || this.connecting.contains(remote)) {
            return;
        }
        EmitterProcessor<TopicPayload> processor = this.getOrCreateLocalSink(remote);
        RSocket existing = this.sockets.get(remote);
        if (existing != null && !existing.isDisposed() && processor.hasDownstreams()) {
            // Read the subscription once so the null check and the disposed check see the same value.
            Disposable poll = this.polling.get(remote);
            if (poll != null && poll.isDisposed()) {
                this.doStartPollEvent(remote, existing);
            }
            return;
        }
        // Use the result of add() so that a caller that lost the race to another one backs off.
        if (!this.connecting.add(remote)) {
            return;
        }
        RSocketConnector
            .create()
            .payloadDecoder(PayloadDecoder.ZERO_COPY)
            .reconnect(Retry
                .backoff(10L, Duration.ofSeconds(1L))
                // Stop retrying once the broker is no longer known.
                .filter(err -> this.remotes.containsKey(remote))
                .doBeforeRetry(signal -> {
                    if (signal.failure() != null) {
                        RSocketAddress remoteAddress = this.remotes.get(remote);
                        log.warn("reconnect rsocket event broker {}{}:{}", remote, remoteAddress, signal.failure().getMessage());
                    }
                }))
            .connect(() -> {
                // The address is resolved on every attempt so a refreshed address is picked up.
                RSocketAddress remoteAddress = this.remotes.get(remote);
                if (remoteAddress == null) {
                    return null;
                }
                return TcpClientTransport.create(remoteAddress.getPublicAddress(), remoteAddress.getPublicPort());
            })
            .doOnNext(connected -> {
                RSocket replaced = this.sockets.put(remote, connected);
                if (replaced != null && replaced != connected) {
                    replaced.dispose();
                }
                this.doStartPollEvent(remote, connected);
            })
            // Log and consume the failure here; the terminal subscribe() has no error handler.
            .onErrorResume(err -> {
                log.error("connect to cluster node [{}] error", remote, err);
                return Mono.empty();
            })
            .doFinally(signal -> this.connecting.remove(remote))
            .subscribe();
    }

    /**
     * Reacts to a node leaving the cluster by dropping all known broker addresses and
     * reloading them from the address cache.
     *
     * @param node the node that left
     */
    @Override
    protected void handleServerNodeLeave(ServerNode node) {
        this.remotes.clear();
        this.reloadAddresses()
            // Fire-and-forget: log a failed reload instead of leaving the error unhandled.
            .onErrorResume(err -> {
                log.warn("reload rsocket addresses error after node [{}] left", node.getId(), err);
                return Mono.empty();
            })
            .subscribe();
    }

    /**
     * Reacts to a node joining the cluster: prepares the outgoing sink for it, looks up its
     * RSocket address (trying once more after one second when the cache has no entry yet)
     * and connects to it. Events about this node itself are ignored.
     *
     * @param node the node that joined
     */
    @Override
    protected void handleServerNodeJoin(ServerNode node) {
        String nodeId = node.getId();
        if (this.serverId.equals(nodeId)) {
            return;
        }
        this.getOrCreateRemoteSink(nodeId);
        this.addressCache
            .get(nodeId)
            .switchIfEmpty(Mono.delay(Duration.ofSeconds(1L)).then(this.addressCache.get(nodeId)))
            .subscribe(
                nodeAddress -> {
                    this.remotes.put(nodeId, nodeAddress);
                    this.connectRemote(nodeId);
                },
                // Without an error consumer a failed lookup would end up in Reactor's dropped-error hook.
                err -> log.warn("load rsocket address of node [{}] error", nodeId, err));
    }

    /**
     * Reads every entry of the address cache, stores it in {@code remotes} and triggers
     * {@link #connectRemote(String)} for it.
     *
     * @return a {@code Mono} that completes once all cache entries have been processed
     */
    public Mono<Void> reloadAddresses() {
        return this.addressCache
            .entries()
            .doOnNext(entry -> {
                this.remotes.put(entry.getKey(), entry.getValue());
                this.connectRemote(entry.getKey());
            })
            .then();
    }

    /**
     * Converts a topic payload into an RSocket payload whose data is the payload body and
     * whose metadata is the topic name encoded as UTF-8 (the polling side reads the
     * metadata as UTF-8).
     * <p>
     * The given payload is always released before this method returns. For that reason the
     * body bytes are copied first: a buffer obtained from {@code nioBuffer()} may share
     * memory with the body, which must not be referenced after the release.
     *
     * @param payload payload to convert; released by this method
     * @return a {@code Mono} emitting the converted payload, or an empty {@code Mono} when
     *         the conversion failed (the failure is logged)
     */
    protected Mono<io.rsocket.Payload> topicPayloadToRSocketPayload(TopicPayload payload) {
        try {
            ByteBuffer view = payload.getBody().nioBuffer();
            // Detach the data from the body's memory before the payload is released below.
            ByteBuffer data = ByteBuffer.allocate(view.remaining());
            data.put(view);
            data.flip();
            ByteBuffer metadata = ByteBuffer.wrap(payload.getTopic().getBytes(StandardCharsets.UTF_8));
            return Mono.just(DefaultPayload.create(data, metadata));
        } catch (Exception e) {
            log.error(e.getMessage(), e);
            return Mono.empty();
        } finally {
            ReferenceCountUtil.safeRelease(payload);
        }
    }

    /**
     * Initializes the broker:
     * <ol>
     *   <li>resolves the address cache and the current server id,</li>
     *   <li>binds an RSocket TCP server on the configured port that answers a request-stream
     *       call with the events queued for the requesting broker,</li>
     *   <li>publishes this node's address and loads the addresses of the other nodes
     *       (each waiting up to 10 seconds),</li>
     *   <li>schedules an address reload every 10 seconds,</li>
     *   <li>starts the parent broker.</li>
     * </ol>
     *
     * @throws IllegalStateException if binding the server yields no channel
     */
    public void init() {
        this.addressCache = this.clusterManager.getCache("__rsocket_addresses");
        this.serverId = this.clusterManager.getCurrentServerId();

        SocketAcceptor acceptor = SocketAcceptor.forRequestStream(request -> {
            // The request data carries the id of the broker asking for its events.
            String broker = request.getDataUtf8();
            log.debug("{} handle broker[{}] event request", this.serverId, broker);
            ReferenceCountUtil.safeRelease(request);
            EmitterProcessor<TopicPayload> processor = this.getOrCreateRemoteSink(broker);
            if (processor.hasDownstreams()) {
                // The sink for this broker is already being consumed; answer with an empty stream.
                return Flux.empty();
            }
            return processor
                .doOnCancel(() -> log.debug("stop handle broker[{}] event request", broker))
                .flatMap(this::topicPayloadToRSocketPayload);
        });

        CloseableChannel channel = RSocketServer
            .create(acceptor)
            .bind(TcpServerTransport.create(this.address.getPort()))
            .doOnError(err -> log.error(err.getMessage(), err))
            .block();
        if (channel == null) {
            throw new IllegalStateException("start rsocket server" + this.address + " error");
        }
        this.disposable.add(channel);

        this.addressCache.put(this.serverId, this.address).block(Duration.ofSeconds(10L));
        this.reloadAddresses().block(Duration.ofSeconds(10L));

        this.disposable.add(Flux
            .interval(Duration.ofSeconds(10L))
            .flatMap(tick -> this
                .reloadAddresses()
                // Skip a single entry that failed to be processed and go on with the rest.
                .onErrorContinue((err, value) -> log.debug("skip rsocket address entry {}", value, err))
                // Errors not covered above (e.g. raised by the cache itself) must not
                // terminate the interval, otherwise the periodic reload would stop for good.
                .onErrorResume(err -> {
                    log.warn("reload rsocket addresses error", err);
                    return Mono.empty();
                }))
            .subscribe());

        super.startup();
    }

    /**
     * Shuts the broker down: stops the parent broker, removes this node's entry from the
     * address cache (blocking until done) and disposes every client socket.
     * <p>
     * The sockets are disposed even when the cache removal fails; the failure is still
     * propagated to the caller.
     */
    @Override
    public void shutdown() {
        super.shutdown();
        try {
            this.addressCache.remove(this.serverId).block();
        } finally {
            // Do not leak open connections when the cache removal throws.
            this.sockets.values().forEach(RSocket::dispose);
        }
    }

    /**
     * Does nothing. {@link #init()}, which the constructor calls, invokes
     * {@code super.startup()} itself once the server is bound and the addresses are loaded.
     * NOTE(review): presumably overridden so an external {@code startup()} call does not
     * start the parent broker a second time; confirm against the callers.
     */
    @Override
    public void startup() {
    }

    /**
     * Returns the outgoing processor registered for the given broker, replacing it with a
     * fresh one when none exists yet or the existing one has been disposed.
     *
     * @param brokerId id of the remote broker
     * @return a non-disposed processor for {@code brokerId}
     */
    private EmitterProcessor<TopicPayload> getOrCreateRemoteSink(String brokerId) {
        return this.remoteSink.compute(brokerId, (id, current) -> {
            boolean reusable = current != null && !current.isDisposed();
            return reusable
                ? current
                : EmitterProcessor.<TopicPayload>create(Integer.MAX_VALUE, false);
        });
    }

    /**
     * Returns the incoming processor registered for the given broker, replacing it with a
     * fresh one when none exists yet or the existing one has been disposed.
     *
     * @param brokerId id of the remote broker
     * @return a non-disposed processor for {@code brokerId}
     */
    private EmitterProcessor<TopicPayload> getOrCreateLocalSink(String brokerId) {
        return this.localSink.compute(brokerId, (id, existing) -> {
            if (existing == null || existing.isDisposed()) {
                return EmitterProcessor.<TopicPayload>create(Integer.MAX_VALUE, false);
            }
            return existing;
        });
    }

    /**
     * Listens for events of the given broker by merging the events polled over RSocket
     * with the stream provided by the parent broker.
     *
     * @param localId  id passed through to the parent broker
     * @param brokerId id of the broker to listen to
     * @return the merged event stream
     */
    @Override
    protected Flux<TopicPayload> listen(String localId, String brokerId) {
        Flux<TopicPayload> viaRSocket = this.getOrCreateLocalSink(brokerId);
        Flux<TopicPayload> viaParent = super.listen(localId, brokerId);
        return Flux.merge(viaRSocket, viaParent);
    }

    /**
     * Dispatches a payload to the given broker.
     * <ul>
     *   <li>Unknown broker (not in {@code remotes}): the payload is released and dropped.</li>
     *   <li>The broker's outgoing sink exists, has a subscriber and is not disposed: the
     *       payload is pushed into that sink.</li>
     *   <li>Otherwise: a connection to the broker is triggered and the payload is handed
     *       to the parent broker's dispatch.</li>
     * </ul>
     *
     * @param localId  id passed through to the parent broker
     * @param brokerId id of the target broker
     * @param payload  payload to deliver
     * @return an empty {@code Mono} for the first two cases, otherwise the parent's result
     */
    @Override
    protected Mono<Void> dispatch(String localId, String brokerId, TopicPayload payload) {
        if (!this.remotes.containsKey(brokerId)) {
            ReferenceCountUtil.safeRelease(payload);
            return Mono.empty();
        }
        EmitterProcessor<TopicPayload> sink = this.remoteSink.get(brokerId);
        boolean deliverable = sink != null && sink.hasDownstreams() && !sink.isDisposed();
        if (deliverable) {
            sink.onNext(payload);
            return Mono.empty();
        }
        log.debug("no rsocket broker [{}] event listener,fallback to redis", brokerId);
        this.connectRemote(brokerId);
        return super.dispatch(localId, brokerId, payload);
    }
}

