/*
 * Decompiled with CFR 0.152.
 */
package org.jetlinks.supports.ipc;

import io.netty.buffer.ByteBuf;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import org.jetlinks.core.Payload;
import org.jetlinks.core.codec.Decoder;
import org.jetlinks.core.codec.Encoder;
import org.jetlinks.core.codec.defaults.DirectCodec;
import org.jetlinks.core.event.EventBus;
import org.jetlinks.core.event.Subscription;
import org.jetlinks.core.event.TopicPayload;
import org.jetlinks.core.ipc.IpcCode;
import org.jetlinks.core.ipc.IpcDefinition;
import org.jetlinks.core.ipc.IpcException;
import org.jetlinks.core.ipc.IpcInvoker;
import org.jetlinks.supports.ipc.IpcRequest;
import org.jetlinks.supports.ipc.IpcRequestHandler;
import org.jetlinks.supports.ipc.IpcResponse;
import org.jetlinks.supports.ipc.RequestIdSupplier;
import org.jetlinks.supports.ipc.RequestType;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Disposable;
import reactor.core.Disposables;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

class EventBusIpcRequester<REQ, RES>
implements IpcInvoker<REQ, RES> {
    private final int id;
    private final String name;
    private final EventBus eventBus;
    private final IpcDefinition<REQ, RES> definition;
    private final Disposable.Composite disposable = Disposables.composite();
    private final Map<Integer, IpcRequestHandler<RES>> pending = new ConcurrentHashMap<Integer, IpcRequestHandler<RES>>();
    private final RequestIdSupplier requestIdInc = new RequestIdSupplier();
    private final String sendTopic;
    private final Logger log;

    EventBusIpcRequester(int id, String name, EventBus eventBus, IpcDefinition<REQ, RES> definition) {
        this.id = id;
        this.name = name;
        this.eventBus = eventBus;
        this.definition = definition;
        this.sendTopic = "/_ipc/" + definition.getAddress().replace("/", "-") + "/" + name;
        this.log = LoggerFactory.getLogger((String)("ipc.requester." + definition.getAddress() + "." + name));
        this.init();
    }

    void init() {
        String replyTopic = this.sendTopic + "/" + this.id + "/_reply";
        String subscriberId = String.join((CharSequence)"-", "ipc", String.valueOf(this.id), this.name, "handler");
        this.disposable.add(this.eventBus.subscribe(Subscription.builder().subscriberId(subscriberId).topics(new String[]{replyTopic}).broker().local().shared().build()).doOnCancel(() -> this.log.debug("cancel accept ipc[{}] response", (Object)this.id)).subscribe(this::handleReply));
    }

    public Mono<RES> request() {
        return this.doRequestWithHandler(RequestType.noArgRequest, null).flatMap(IpcRequestHandler::handleRequest);
    }

    public Mono<RES> request(REQ req) {
        return this.doRequestWithHandler(RequestType.request, req).flatMap(IpcRequestHandler::handleRequest);
    }

    public Flux<RES> requestStream(REQ req) {
        return this.doRequestWithHandler(RequestType.requestStream, req).flatMapMany(IpcRequestHandler::handleStream);
    }

    public Flux<RES> requestStream() {
        return this.doRequestWithHandler(RequestType.noArgRequestStream, null).flatMapMany(IpcRequestHandler::handleStream);
    }

    public Flux<RES> requestChannel(Publisher<REQ> req) {
        return this.doRequestChannel(req).handleStream();
    }

    public Mono<Void> fireAndForget() {
        return this.doRequest(RequestType.noArgFireAndForget, 0, null);
    }

    public Mono<Void> fireAndForget(REQ req) {
        return this.doRequest(RequestType.fireAndForget, 0, req);
    }

    Mono<Void> doRequest(RequestType requestType, int requestId, REQ request) {
        return this.doRequest(requestType, requestId, -1, request);
    }

    Mono<Void> doRequest(RequestType requestType, int requestId, int seq, REQ request) {
        this.log.trace("do ipc request {} {}", (Object)requestType, (Object)requestId);
        return this.eventBus.publish(this.sendTopic, this.encodeRequest(requestType, requestId, seq, request)).doOnNext(i -> {
            if (i == 0L) {
                throw new IpcException(IpcCode.ipcServiceUnavailable, "Service " + this.name + " Unavailable");
            }
            if (i > 1L) {
                this.log.warn("service {} request {} has multi({}) producer", new Object[]{requestType, request, i});
            }
        }).then();
    }

    IpcRequestHandler<RES> newHandler(int requestId) {
        IpcRequestHandler handler = new IpcRequestHandler();
        IpcRequestHandler old = this.pending.put(requestId, handler);
        if (old != null) {
            this.log.warn("repeat request id :{}", (Object)requestId);
            old.complete();
        }
        return handler.doOnDispose(() -> this.pending.remove(requestId));
    }

    Mono<IpcRequestHandler<RES>> doRequestWithHandler(RequestType requestType, REQ request) {
        int requestId = this.nextRequestId();
        IpcRequestHandler<RES> handler = this.newHandler(requestId);
        return this.doRequest(requestType, requestId, request).thenReturn(handler);
    }

    IpcRequestHandler<RES> doRequestChannel(Publisher<REQ> channel) {
        int requestId = this.nextRequestId();
        IpcRequestHandler handler = this.newHandler(requestId);
        AtomicInteger seq = new AtomicInteger();
        handler.doOnDispose(this.eventBus.publish(this.sendTopic, (Encoder)DirectCodec.instance(), (Publisher)Flux.from(channel).index().map(request -> {
            int sqlVal = ((Long)request.getT1()).intValue();
            seq.set(sqlVal);
            return this.encodeRequest(RequestType.requestChannel, requestId, sqlVal, request.getT2());
        }).doFinally(s -> this.doRequest(RequestType.cancel, requestId, seq.get(), null).subscribe())).doOnNext(len -> {
            if (len == 0L) {
                handler.error((Throwable)new IpcException(IpcCode.ipcServiceUnavailable));
            }
        }).subscribe());
        return handler;
    }

    Payload encodeRequest(RequestType type, int messageId, int seq, REQ data) {
        return Payload.of((ByteBuf)IpcRequest.of(type, this.id, messageId, seq, data).toByteBuf((Encoder<REQ>)this.definition.requestCodec()));
    }

    void handleReply(TopicPayload payload) {
        try {
            IpcResponse response = IpcResponse.decode((Payload)payload, this.definition.responseCodec(), (Decoder<Throwable>)this.definition.errorCodec());
            this.log.trace("handle ipc response {} id:{} seq:{}", new Object[]{response.getType(), response.getMessageId(), response.getSeq()});
            IpcRequestHandler handler = this.pending.get(response.getMessageId());
            if (handler == null) {
                this.log.debug("unknown response {}", response);
            } else {
                handler.handle(response);
            }
        }
        catch (Throwable throwable) {
            this.log.error("handle response error", throwable);
        }
    }

    public int nextRequestId() {
        return this.requestIdInc.nextId(this.pending::containsKey);
    }

    public void dispose() {
        this.pending.values().forEach(IpcRequestHandler::dispose);
        this.pending.clear();
        this.disposable.dispose();
    }

    public String getName() {
        return this.name;
    }
}

