/*
 * Decompiled with CFR 0.152.
 */
package org.jetlinks.supports.server.session;

import java.util.ArrayDeque;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.jetlinks.core.device.DeviceRegistry;
import org.jetlinks.core.message.codec.Transport;
import org.jetlinks.core.server.monitor.GatewayServerMonitor;
import org.jetlinks.core.server.session.ChildrenDeviceSession;
import org.jetlinks.core.server.session.DeviceSession;
import org.jetlinks.core.server.session.DeviceSessionManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.EmitterProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxProcessor;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

/**
 * In-memory {@link DeviceSessionManager}.
 *
 * <p>Sessions are stored in a map keyed by device id and, when the session id differs from the
 * device id, additionally by session id. Children sessions are stored per parent device id.
 * A periodic job (see {@link #init()}) removes sessions that are no longer alive and runs the
 * deferred close jobs queued by {@link #register(DeviceSession)} and {@link #unregister(String)}.
 */
public class DefaultDeviceSessionManager implements DeviceSessionManager {
    /** Sessions keyed by device id and (when different) by session id. */
    private final Map<String, DeviceSession> repository = new ConcurrentHashMap<>(4096);
    /** Children sessions: parent device id -> (children device id -> session). */
    private final Map<String, Map<String, ChildrenDeviceSession>> children = new ConcurrentHashMap<>(4096);
    private Logger log = LoggerFactory.getLogger(DefaultDeviceSessionManager.class);
    private GatewayServerMonitor gatewayServerMonitor;
    private ScheduledExecutorService executorService;
    private DeviceRegistry registry;
    private final FluxProcessor<DeviceSession, DeviceSession> onDeviceRegister = EmitterProcessor.create(false);
    private final FluxProcessor<DeviceSession, DeviceSession> onDeviceUnRegister = EmitterProcessor.create(false);
    private String serverId;
    /**
     * Deferred jobs (session close calls) executed at the end of each session check.
     * Jobs are queued from register/unregister callers and from asynchronous callbacks and are
     * drained by the session checker, so the queue has to be safe for concurrent use.
     */
    private final Queue<Runnable> scheduleJobQueue = new ConcurrentLinkedQueue<>();
    /** Number of registered sessions per transport id. */
    private final Map<String, LongAdder> transportCounter = new ConcurrentHashMap<>();
    /** Maximum number of sessions per transport id. */
    private Map<String, Long> transportLimits = new ConcurrentHashMap<>();

    /**
     * Sets the maximum number of sessions for a transport.
     *
     * @param transport transport the limit applies to
     * @param limit     maximum session count; values {@code <= 0} disable the limit
     */
    public void setTransportLimit(Transport transport, long limit) {
        this.transportLimits.put(transport.getId(), limit);
    }

    /** Unregisters every session currently held by this manager. */
    public void shutdown() {
        this.repository.values().parallelStream().map(DeviceSession::getId).forEach(this::unregister);
    }

    /**
     * @param transport transport to check
     * @return {@code true} when a positive limit is configured for the transport and the current
     *         session count has reached it
     */
    public boolean isOutOfMaximumSessionLimit(Transport transport) {
        long max = this.getMaximumSession(transport);
        return max > 0L && this.getCurrentSession(transport) >= max;
    }

    /**
     * @param transport transport to look up
     * @return the configured limit, or {@code -1} when no limit is configured
     */
    public long getMaximumSession(Transport transport) {
        Long limit = this.transportLimits.get(transport.getId());
        return limit == null ? -1L : limit;
    }

    /**
     * @param transport transport to look up
     * @return the number of sessions counted for the transport, {@code 0} when none were counted
     */
    public long getCurrentSession(Transport transport) {
        LongAdder counter = this.transportCounter.get(transport.getId());
        return counter == null ? 0L : counter.longValue();
    }

    /**
     * Checks every stored session once.
     *
     * <p>Sessions that are not alive are unregistered. For an alive session whose connection
     * server id in the registry differs from this server's id, a warning is logged and the device
     * is marked online on this server again. When the check terminates, the per-transport session
     * counts are reported to the monitor and all deferred jobs are executed.
     *
     * @return the number of sessions that were unregistered
     */
    public Mono<Long> checkSession() {
        AtomicLong startWith = new AtomicLong();
        return Flux.fromIterable(this.repository.values())
                // a session may be stored under two keys; check it only once
                .distinct()
                .publishOn(Schedulers.parallel())
                .filterWhen(session -> {
                    if (!session.isAlive()) {
                        return Mono.just(true);
                    }
                    return session.getOperator()
                            .getConnectionServerId()
                            .switchIfEmpty(Mono.just(""))
                            .filter(s -> !this.serverId.equals(s))
                            .doOnNext(ignore -> this.log.warn("device [{}] state error", session.getDeviceId()))
                            .flatMap(ignore -> session.getOperator().online(this.serverId, session.getId()))
                            // alive sessions are never removed
                            .thenReturn(false);
                })
                .map(DeviceSession::getId)
                .doOnNext(this::unregister)
                .collect(Collectors.counting())
                .doOnNext(l -> {
                    if (this.log.isInfoEnabled() && l > 0L) {
                        this.log.info("expired sessions:{}", l);
                    }
                })
                .name("session_checker:".concat(this.serverId))
                .metrics()
                .doOnError(err -> this.log.error(err.getMessage(), err))
                .doOnSubscribe(subscription -> {
                    this.log.info("start check session");
                    startWith.set(System.currentTimeMillis());
                })
                .doFinally(s -> {
                    this.transportCounter.forEach((transport, number) ->
                            this.gatewayServerMonitor.metrics().reportSession(transport, number.intValue()));
                    this.runScheduledJobs();
                    if (this.log.isInfoEnabled()) {
                        this.log.info("check session complete,current server sessions:{}.use time:{}ms.",
                                this.transportCounter, System.currentTimeMillis() - startWith.get());
                    }
                });
    }

    /** Runs and removes every deferred job currently in the queue. */
    private void runScheduledJobs() {
        Runnable job;
        while ((job = this.scheduleJobQueue.poll()) != null) {
            job.run();
        }
    }

    /**
     * Validates the required collaborators, resolves the current server id and schedules the
     * session check (first run after 10 seconds, then every 30 seconds).
     *
     * @throws NullPointerException if the monitor or the registry has not been set
     */
    public void init() {
        Objects.requireNonNull(this.gatewayServerMonitor, "gatewayServerMonitor");
        Objects.requireNonNull(this.registry, "registry");
        if (this.executorService == null) {
            this.executorService = Executors.newSingleThreadScheduledExecutor();
        }
        this.serverId = this.gatewayServerMonitor.getCurrentServerId();
        this.executorService.scheduleAtFixedRate(this::runSessionCheck, 10L, 30L, TimeUnit.SECONDS);
    }

    /**
     * Scheduled entry point for the session check. Exceptions are caught and logged because a
     * task that throws is never run again by {@code scheduleAtFixedRate}.
     */
    private void runSessionCheck() {
        try {
            this.checkSession().subscribe();
        } catch (Exception e) {
            this.log.error("check session failed", e);
        }
    }

    /**
     * @param clientId device id or session id
     * @return the matching session, or {@code null} when there is none or it is not alive
     */
    public DeviceSession getSession(String clientId) {
        DeviceSession session = this.repository.get(clientId);
        if (session == null || !session.isAlive()) {
            return null;
        }
        return session;
    }

    /**
     * @param deviceId   parent device id
     * @param childrenId children device id
     * @return the alive children session, or {@code null} when there is none
     */
    public ChildrenDeviceSession getSession(String deviceId, String childrenId) {
        return Optional.ofNullable(this.children.get(deviceId))
                .map(map -> map.get(childrenId))
                .filter(ChildrenDeviceSession::isAlive)
                .orElse(null);
    }

    /**
     * Registers a children device under an alive parent session and marks it online.
     *
     * @param deviceId         parent device id
     * @param childrenDeviceId children device id
     * @return the new children session; empty when the parent session is not alive or the
     *         children device is not found in the registry
     */
    public Mono<ChildrenDeviceSession> registerChildren(String deviceId, String childrenDeviceId) {
        return Mono.defer(() -> {
            DeviceSession session = this.getSession(deviceId);
            if (session == null) {
                this.log.warn("device[{}] session not alive", deviceId);
                return Mono.empty();
            }
            return this.registry.getDevice(childrenDeviceId)
                    .switchIfEmpty(Mono.fromRunnable(
                            () -> this.log.warn("children device [{}] not fond in registry", childrenDeviceId)))
                    .flatMap(deviceOperator -> deviceOperator
                            .online(session.getServerId().orElse(this.serverId), session.getId())
                            .thenReturn(new ChildrenDeviceSession(childrenDeviceId, session, deviceOperator)))
                    // doOnNext, not doOnSuccess: doOnSuccess is also invoked with null on empty
                    // completion, and ConcurrentHashMap rejects null values
                    .doOnNext(s -> this.children
                            .computeIfAbsent(deviceId, ignore -> new ConcurrentHashMap<>())
                            .put(childrenDeviceId, s));
        });
    }

    /**
     * Removes a children session, closes it, marks the device offline and publishes it to
     * {@link #onUnRegister()} subscribers.
     *
     * @param deviceId   parent device id
     * @param childrenId children device id
     * @return the removed session; empty when there was none
     */
    public Mono<ChildrenDeviceSession> unRegisterChildren(String deviceId, String childrenId) {
        return Mono.justOrEmpty(this.children.get(deviceId))
                .flatMap(map -> Mono.justOrEmpty(map.remove(childrenId)))
                .doOnNext(ChildrenDeviceSession::close)
                .flatMap(session -> session.getOperator()
                        .offline()
                        .doFinally(s -> {
                            // an unregistration is published on the un-register stream
                            if (this.onDeviceUnRegister.hasDownstreams()) {
                                this.onDeviceUnRegister.onNext(session);
                            }
                        })
                        .thenReturn(session));
    }

    /**
     * Replaces the stored session of {@code oldSession}'s device with {@code newSession}.
     *
     * @param oldSession session being replaced
     * @param newSession replacement session
     * @return {@code newSession}
     */
    public DeviceSession replace(DeviceSession oldSession, DeviceSession newSession) {
        DeviceSession old = this.repository.put(oldSession.getDeviceId(), newSession);
        if (old != null && !old.getId().equals(old.getDeviceId())) {
            this.repository.put(oldSession.getId(), newSession);
        }
        return newSession;
    }

    /**
     * Stores a session, marks its device online and publishes it to {@link #onRegister()}
     * subscribers. A previous session of the same device is removed and its close is deferred
     * to the next session check.
     *
     * @param session session to register
     * @return the previous session of the device, or {@code null} when there was none
     */
    public DeviceSession register(DeviceSession session) {
        DeviceSession old = this.repository.put(session.getDeviceId(), session);
        if (old != null && !old.getId().equals(old.getDeviceId())) {
            this.repository.remove(old.getId());
        }
        if (!session.getId().equals(session.getDeviceId())) {
            this.repository.put(session.getId(), session);
        }
        if (null != old) {
            this.log.warn("device[{}] session exists,disconnect old session:{}", old.getDeviceId(), session);
            this.scheduleJobQueue.add(old::close);
        } else {
            // only a new device changes the session count; a replacement keeps it unchanged
            this.transportCounter
                    .computeIfAbsent(session.getTransport().getId(), transport -> new LongAdder())
                    .increment();
        }
        session.getOperator()
                .online(session.getServerId().orElse(this.serverId), session.getId())
                .doFinally(s -> {
                    if (this.onDeviceRegister.hasDownstreams()) {
                        this.onDeviceRegister.onNext(session);
                    }
                })
                .subscribe();
        return old;
    }

    /** @return stream of sessions published when a device is registered */
    public Flux<DeviceSession> onRegister() {
        return this.onDeviceRegister.map(Function.identity()).doOnError(err -> this.log.error(err.getMessage(), err));
    }

    /** @return stream of sessions published when a device is unregistered */
    public Flux<DeviceSession> onUnRegister() {
        return this.onDeviceUnRegister.map(Function.identity()).doOnError(err -> this.log.error(err.getMessage(), err));
    }

    /** @return all stored sessions, one per device id */
    public Flux<DeviceSession> getAllSession() {
        return Flux.fromIterable(this.repository.values()).distinct(DeviceSession::getDeviceId);
    }

    /**
     * @param deviceId device id (or session id) of a device, or the id of a children device
     * @return {@code true} when an alive session or an alive children session exists for the id
     */
    public boolean sessionIsAlive(String deviceId) {
        if (this.getSession(deviceId) != null) {
            return true;
        }
        return this.children.values().stream().anyMatch(sessions -> {
            ChildrenDeviceSession child = sessions.get(deviceId);
            return child != null && child.isAlive();
        });
    }

    /**
     * Removes a session and all children sessions of its device, marks the devices offline and
     * publishes them to {@link #onUnRegister()} subscribers. Closing the sessions is deferred to
     * the next session check.
     *
     * @param idOrDeviceId session id or device id
     * @return the removed session, or {@code null} when there was none
     */
    public DeviceSession unregister(String idOrDeviceId) {
        DeviceSession session = this.repository.remove(idOrDeviceId);
        if (null == session) {
            return null;
        }
        if (!session.getId().equals(session.getDeviceId())) {
            // the session is stored under two keys; remove the one that was not passed in
            String otherKey = session.getId().equals(idOrDeviceId) ? session.getDeviceId() : session.getId();
            this.repository.remove(otherKey);
        }
        this.transportCounter
                .computeIfAbsent(session.getTransport().getId(), transport -> new LongAdder())
                .decrement();
        session.getOperator()
                .offline()
                .doFinally(s -> {
                    if (this.onDeviceUnRegister.hasDownstreams()) {
                        this.onDeviceUnRegister.onNext(session);
                    }
                })
                .subscribe();
        Mono.justOrEmpty(this.children.remove(session.getDeviceId()))
                .flatMapIterable(Map::values)
                .flatMap(childrenDeviceSession -> childrenDeviceSession.getOperator()
                        .offline()
                        .doFinally(s -> {
                            if (this.onDeviceUnRegister.hasDownstreams()) {
                                this.onDeviceUnRegister.onNext(childrenDeviceSession);
                            }
                            this.scheduleJobQueue.add(childrenDeviceSession::close);
                        }))
                .subscribe();
        this.scheduleJobQueue.add(session::close);
        return session;
    }

    public Logger getLog() {
        return this.log;
    }

    public void setLog(Logger log) {
        this.log = log;
    }

    public GatewayServerMonitor getGatewayServerMonitor() {
        return this.gatewayServerMonitor;
    }

    public void setGatewayServerMonitor(GatewayServerMonitor gatewayServerMonitor) {
        this.gatewayServerMonitor = gatewayServerMonitor;
    }

    public ScheduledExecutorService getExecutorService() {
        return this.executorService;
    }

    /**
     * @param executorService scheduler used for the periodic session check; when none is set,
     *                        {@link #init()} creates a single-threaded one
     */
    public void setExecutorService(ScheduledExecutorService executorService) {
        this.executorService = executorService;
    }

    public DeviceRegistry getRegistry() {
        return this.registry;
    }

    public void setRegistry(DeviceRegistry registry) {
        this.registry = registry;
    }

    public Map<String, Long> getTransportLimits() {
        return this.transportLimits;
    }

    public void setTransportLimits(Map<String, Long> transportLimits) {
        this.transportLimits = transportLimits;
    }
}

