/*
 * Decompiled with CFR 0.152.
 */
package org.jetlinks.supports.rpc;

import io.netty.util.ReferenceCountUtil;
import io.netty.util.ReferenceCounted;
import java.beans.ConstructorProperties;
import java.time.Duration;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeoutException;
import java.util.function.BiFunction;
import java.util.function.Function;
import org.hswebframework.web.id.IDGenerator;
import org.jetlinks.core.Payload;
import org.jetlinks.core.codec.Decoder;
import org.jetlinks.core.codec.Encoder;
import org.jetlinks.core.codec.defaults.DirectCodec;
import org.jetlinks.core.event.EventBus;
import org.jetlinks.core.event.Subscription;
import org.jetlinks.core.rpc.Invoker;
import org.jetlinks.core.rpc.RpcDefinition;
import org.jetlinks.core.rpc.RpcService;
import org.jetlinks.supports.rpc.RpcRequest;
import org.jetlinks.supports.rpc.RpcResult;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Disposable;
import reactor.core.publisher.EmitterProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

public class EventBusRpcService
implements RpcService {
    private static final Logger log = LoggerFactory.getLogger(EventBusRpcService.class);
    private final EventBus eventBus;
    private final long requesterId = (Long)IDGenerator.SNOW_FLAKE.generate();

    public <REQ, RES> Disposable listen(RpcDefinition<REQ, RES> definition, BiFunction<String, REQ, Publisher<RES>> call) {
        return this.doListen(definition, (s, reqPublisher) -> Flux.from((Publisher)reqPublisher).flatMap(req -> (Publisher)call.apply((String)s, (Object)req)));
    }

    public <RES> Disposable listen(RpcDefinition<Void, RES> definition, Function<String, Publisher<RES>> call) {
        return this.doListen(definition, (topic, request) -> Flux.from((Publisher)request).thenMany((Publisher)call.apply((String)topic)));
    }

    private String getTopic(RpcDefinition<?, ?> definition) {
        String address = definition.getAddress();
        if (!address.startsWith("/")) {
            address = "/" + address;
        }
        if (address.endsWith("/")) {
            address = address.substring(0, address.length() - 1);
        }
        return address;
    }

    public <REQ, RES> Invoker<REQ, RES> createInvoker(final RpcDefinition<REQ, RES> definition) {
        final String reqTopic = this.getTopic(definition);
        String reqTopicRes = reqTopic + "/" + this.requesterId + "/_reply";
        final ConcurrentHashMap request = new ConcurrentHashMap();
        final Disposable disposable = this.eventBus.subscribe(Subscription.of((String)definition.getId(), (String)reqTopicRes, (Subscription.Feature[])new Subscription.Feature[]{Subscription.Feature.local, Subscription.Feature.broker})).doOnNext(payload -> {
            try {
                RpcResult result = RpcResult.parse((Payload)payload);
                log.trace("handle rpc[{}] reply {} {}", new Object[]{definition, result.getType(), result.getRequestId()});
                FluxSink sink = (FluxSink)request.get(result.getRequestId());
                if (null != sink && !sink.isCancelled()) {
                    sink.next((Object)result);
                } else {
                    log.info("discard rpc[{}] reply {} {}", new Object[]{definition, result.getType(), result.getRequestId()});
                }
            }
            finally {
                ReferenceCountUtil.safeRelease((Object)payload);
            }
        }).onErrorContinue((err, obj) -> log.error(err.getMessage(), err)).subscribe();
        return new Invoker<REQ, RES>(){

            public Flux<RES> invoke() {
                return this.invoke(null);
            }

            private Mono<Long> doSend(long id, Publisher<? extends REQ> payload) {
                if (payload instanceof Mono) {
                    return Mono.from(payload).flatMap(req -> EventBusRpcService.this.eventBus.publish(reqTopic, (Encoder)DirectCodec.INSTANCE, (Object)RpcRequest.nextAndComplete(EventBusRpcService.this.requesterId, id, definition.requestCodec().encode(req)), Schedulers.immediate()));
                }
                if (payload instanceof Flux) {
                    return ((Mono)Flux.from(payload).map(req -> RpcRequest.next(EventBusRpcService.this.requesterId, id, definition.requestCodec().encode(req))).as(req -> EventBusRpcService.this.eventBus.publish(reqTopic, (Encoder)DirectCodec.INSTANCE, (Publisher)req, Schedulers.immediate()))).doOnSuccess(v -> EventBusRpcService.this.eventBus.publish(reqTopic, (Payload)RpcRequest.complete(EventBusRpcService.this.requesterId, id), Schedulers.immediate()).subscribe());
                }
                return EventBusRpcService.this.eventBus.publish(reqTopic, (Encoder)DirectCodec.INSTANCE, (Object)RpcRequest.nextAndComplete(EventBusRpcService.this.requesterId, id, Payload.voidPayload), Schedulers.immediate());
            }

            public Flux<RES> invoke(Publisher<? extends REQ> payload) {
                return Flux.create(sink -> {
                    long id = (Long)IDGenerator.SNOW_FLAKE.generate();
                    request.put(id, sink);
                    sink.onDispose(() -> {
                        FluxSink cfr_ignored_0 = (FluxSink)request.remove(id);
                    });
                    log.trace("do invoke rpc:{},requestId:{}", (Object)definition.getAddress(), (Object)id);
                    this.doSend(id, payload).doOnNext(l -> {
                        if (l == 0L) {
                            sink.error((Throwable)new UnsupportedOperationException("no rpc service for:" + definition.getAddress()));
                        }
                    }).doOnError(arg_0 -> ((FluxSink)sink).error(arg_0)).subscribe();
                }).handle((res, sink) -> {
                    try {
                        if (res.getType() == RpcResult.Type.RESULT_AND_COMPLETE) {
                            Object r = definition.responseCodec().decode((Payload)res);
                            if (r != null) {
                                sink.next(r);
                            }
                            sink.complete();
                        } else if (res.getType() == RpcResult.Type.RESULT) {
                            Object r = definition.responseCodec().decode((Payload)res);
                            if (r != null) {
                                sink.next(r);
                            }
                        } else if (res.getType() == RpcResult.Type.COMPLETE) {
                            sink.complete();
                        } else if (res.getType() == RpcResult.Type.ERROR) {
                            Throwable e = (Throwable)definition.errorCodec().decode((Payload)res);
                            if (e != null) {
                                sink.error(e);
                            } else {
                                sink.complete();
                            }
                        }
                    }
                    finally {
                        ReferenceCountUtil.safeRelease((Object)res);
                    }
                }).timeout(Duration.ofSeconds(10L), (Publisher)Mono.error(() -> new TimeoutException("invoke " + definition + "timeout")));
            }

            public void dispose() {
                disposable.dispose();
            }

            public boolean isDisposed() {
                return disposable.isDisposed();
            }
        };
    }

    protected Mono<Void> reply(String topic, RpcResult result) {
        return this.eventBus.publish(topic, (Payload)result, Schedulers.immediate()).doOnNext(i -> {
            if (i == 0L) {
                log.warn("reply rpc request {} requestId:{} failed: no listener[{}]", new Object[]{result.getType(), result.getRequestId(), topic});
                return;
            }
            log.trace("reply rpc request {} requestId:{}", (Object)result.getType(), (Object)result.getRequestId());
        }).then();
    }

    private <REQ, RES> Disposable doListen(RpcDefinition<REQ, RES> definition, BiFunction<String, Publisher<REQ>, Publisher<RES>> invokeResult) {
        ConcurrentHashMap request = new ConcurrentHashMap();
        return this.eventBus.subscribe(Subscription.of((String)definition.getId(), (String)definition.getAddress(), (Subscription.Feature[])new Subscription.Feature[]{Subscription.Feature.local, Subscription.Feature.broker})).map(RpcRequest::parse).doOnCancel(request::clear).subscribe(_req -> request.computeIfAbsent(_req.getRequestId(), requestId -> new PendingRequest(_req.getRequesterId(), (long)requestId, definition, invokeResult, () -> {
            PendingRequest cfr_ignored_0 = (PendingRequest)request.remove(requestId);
        })).next((RpcRequest)_req));
    }

    @ConstructorProperties(value={"eventBus"})
    public EventBusRpcService(EventBus eventBus) {
        this.eventBus = eventBus;
    }

    private class PendingRequest<REQ, RES> {
        long requestId;
        long requesterId;
        String reqTopicRes;
        String reqTopic;
        RpcDefinition<REQ, RES> definition;
        BiFunction<String, Publisher<REQ>, Publisher<RES>> invoker;
        EmitterProcessor<REQ> processor = EmitterProcessor.create((int)Integer.MAX_VALUE);
        FluxSink<REQ> sink = this.processor.sink();
        boolean started = false;

        public PendingRequest(long requesterId, long requestId, RpcDefinition<REQ, RES> definition, BiFunction<String, Publisher<REQ>, Publisher<RES>> invoker, Disposable disposable) {
            this.requestId = requestId;
            this.requesterId = requesterId;
            this.reqTopic = EventBusRpcService.this.getTopic(definition);
            this.reqTopicRes = this.reqTopic + "/" + requesterId + "/_reply";
            this.definition = definition;
            this.invoker = invoker;
            this.doStart();
            this.sink.onDispose(disposable);
        }

        void doStart() {
            if (this.started) {
                return;
            }
            log.trace("handle rpc request {},requestId:{}", this.definition, (Object)this.requestId);
            this.started = true;
            Flux.from(this.invoker.apply(this.reqTopic, (Publisher<REQ>)this.processor)).flatMap(res -> EventBusRpcService.this.reply(this.reqTopicRes, RpcResult.result(this.requestId, this.definition.responseCodec().encode(res)))).doOnComplete(() -> EventBusRpcService.this.reply(this.reqTopicRes, RpcResult.complete(this.requestId)).subscribe()).doOnError(e -> {
                log.error(e.getMessage(), e);
                EventBusRpcService.this.reply(this.reqTopicRes, RpcResult.error(this.requestId, this.definition.errorCodec().encode(e))).subscribe();
            }).subscribe();
        }

        void release() {
            this.processor.onComplete();
        }

        void next(RpcRequest req) {
            try {
                if (req.getType() == RpcRequest.Type.COMPLETE) {
                    this.sink.complete();
                    return;
                }
                Object v = req.decode((Decoder)this.definition.requestCodec(), false);
                if (v != null) {
                    this.sink.next(v);
                }
                if (!(v instanceof ReferenceCounted)) {
                    ReferenceCountUtil.safeRelease((Object)req);
                }
                if (req.getType() == RpcRequest.Type.NEXT_AND_END) {
                    this.sink.complete();
                }
            }
            catch (Throwable e) {
                log.error(e.getMessage(), e);
                this.sink.error(e);
            }
            finally {
                ReferenceCountUtil.safeRelease((Object)req);
            }
        }
    }
}

