/*
 * Decompiled with CFR 0.152.
 */
package com.coreos.jetcd.internal.impl;

import com.coreos.jetcd.Lease;
import com.coreos.jetcd.api.LeaseGrantRequest;
import com.coreos.jetcd.api.LeaseGrpc;
import com.coreos.jetcd.api.LeaseKeepAliveRequest;
import com.coreos.jetcd.api.LeaseRevokeRequest;
import com.coreos.jetcd.api.LeaseTimeToLiveRequest;
import com.coreos.jetcd.exception.ErrorCode;
import com.coreos.jetcd.exception.EtcdException;
import com.coreos.jetcd.exception.EtcdExceptionFactory;
import com.coreos.jetcd.internal.impl.ClientConnectionManager;
import com.coreos.jetcd.internal.impl.Util;
import com.coreos.jetcd.lease.LeaseGrantResponse;
import com.coreos.jetcd.lease.LeaseKeepAliveResponse;
import com.coreos.jetcd.lease.LeaseKeepAliveResponseWithError;
import com.coreos.jetcd.lease.LeaseRevokeResponse;
import com.coreos.jetcd.lease.LeaseTimeToLiveResponse;
import com.coreos.jetcd.options.LeaseOption;
import com.google.common.base.Preconditions;
import io.grpc.stub.StreamObserver;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

public class LeaseImpl
implements Lease {
    private static final int FIRST_KEEPALIVE_TIMEOUT_MS = 5000;
    private final ClientConnectionManager connectionManager;
    private final LeaseGrpc.LeaseFutureStub stub;
    private final LeaseGrpc.LeaseStub leaseStub;
    private final Map<Long, KeepAlive> keepAlives = new ConcurrentHashMap<Long, KeepAlive>();
    private ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(2);
    private ScheduledFuture<?> keepAliveFuture;
    private ScheduledFuture<?> deadlineFuture;
    private StreamObserver<LeaseKeepAliveRequest> keepAliveRequestObserver;
    private StreamObserver<com.coreos.jetcd.api.LeaseKeepAliveResponse> keepAliveResponseObserver;
    private boolean hasKeepAliveServiceStarted = false;
    private boolean closed;

    LeaseImpl(ClientConnectionManager connectionManager) {
        this.connectionManager = connectionManager;
        this.stub = connectionManager.newStub(LeaseGrpc::newFutureStub);
        this.leaseStub = connectionManager.newStub(LeaseGrpc::newStub);
    }

    @Override
    public CompletableFuture<LeaseGrantResponse> grant(long ttl) {
        LeaseGrantRequest leaseGrantRequest = LeaseGrantRequest.newBuilder().setTTL(ttl).build();
        return Util.toCompletableFutureWithRetry(() -> this.stub.leaseGrant(leaseGrantRequest), LeaseGrantResponse::new, Util::isRetriable, this.connectionManager.getExecutorService());
    }

    @Override
    public CompletableFuture<LeaseRevokeResponse> revoke(long leaseId) {
        LeaseRevokeRequest leaseRevokeRequest = LeaseRevokeRequest.newBuilder().setID(leaseId).build();
        return Util.toCompletableFutureWithRetry(() -> this.stub.leaseRevoke(leaseRevokeRequest), LeaseRevokeResponse::new, Util::isRetriable, this.connectionManager.getExecutorService());
    }

    @Override
    public synchronized Lease.KeepAliveListener keepAlive(long leaseId) {
        if (this.closed) {
            throw EtcdExceptionFactory.newClosedLeaseClientException();
        }
        KeepAlive keepAlive = this.keepAlives.computeIfAbsent(leaseId, key -> {
            KeepAlive ka = new KeepAlive(this.keepAlives, this, leaseId);
            long now = System.currentTimeMillis();
            ka.setDeadLine(now + 5000L);
            ka.setNextKeepAlive(now);
            return ka;
        });
        KeepAliveListenerImpl kal = new KeepAliveListenerImpl(keepAlive);
        keepAlive.addListener(kal);
        if (!this.hasKeepAliveServiceStarted) {
            this.hasKeepAliveServiceStarted = true;
            this.start();
        }
        return kal;
    }

    @Override
    public synchronized void close() {
        if (this.closed) {
            return;
        }
        this.closed = true;
        if (!this.hasKeepAliveServiceStarted) {
            return;
        }
        this.keepAliveFuture.cancel(true);
        this.deadlineFuture.cancel(true);
        this.keepAliveRequestObserver.onCompleted();
        this.keepAliveResponseObserver.onCompleted();
        this.scheduledExecutorService.shutdownNow();
        this.closeKeepAlives();
    }

    private synchronized void removeKeepAlive(long leaseId) {
        this.keepAlives.remove(leaseId);
    }

    private void closeKeepAlives() {
        LeaseKeepAliveResponseWithError errResp = new LeaseKeepAliveResponseWithError(EtcdExceptionFactory.newClosedLeaseClientException());
        this.keepAlives.values().forEach(ka -> ka.sentKeepAliveResp(errResp));
        this.keepAlives.clear();
    }

    private void start() {
        this.sendKeepAliveExecutor();
        this.deadLineExecutor();
    }

    private void reset() {
        this.keepAliveFuture.cancel(true);
        this.keepAliveRequestObserver.onCompleted();
        this.keepAliveResponseObserver.onCompleted();
        this.sendKeepAliveExecutor();
    }

    private void sendKeepAliveExecutor() {
        this.keepAliveResponseObserver = this.createResponseObserver();
        this.keepAliveRequestObserver = this.leaseStub.leaseKeepAlive(this.keepAliveResponseObserver);
        this.keepAliveFuture = this.scheduledExecutorService.scheduleAtFixedRate(() -> {
            long now = System.currentTimeMillis();
            this.keepAlives.entrySet().stream().filter(entry -> ((KeepAlive)entry.getValue()).getNextKeepAlive() < now).map(Map.Entry::getKey).map(this::newKeepAliveRequest).forEach(arg_0 -> this.keepAliveRequestObserver.onNext(arg_0));
        }, 0L, 500L, TimeUnit.MILLISECONDS);
    }

    private StreamObserver<com.coreos.jetcd.api.LeaseKeepAliveResponse> createResponseObserver() {
        return new StreamObserver<com.coreos.jetcd.api.LeaseKeepAliveResponse>(){

            public void onNext(com.coreos.jetcd.api.LeaseKeepAliveResponse leaseKeepAliveResponse) {
                LeaseImpl.this.processKeepAliveResponse(leaseKeepAliveResponse);
            }

            public void onError(Throwable throwable) {
                LeaseImpl.this.processOnError();
            }

            public void onCompleted() {
            }
        };
    }

    private synchronized void processOnError() {
        if (this.closed) {
            return;
        }
        this.scheduledExecutorService.schedule(() -> this.reset(), 500L, TimeUnit.MILLISECONDS);
    }

    private synchronized void processKeepAliveResponse(com.coreos.jetcd.api.LeaseKeepAliveResponse leaseKeepAliveResponse) {
        if (this.closed) {
            return;
        }
        long leaseID = leaseKeepAliveResponse.getID();
        long ttl = leaseKeepAliveResponse.getTTL();
        KeepAlive ka = this.keepAlives.get(leaseID);
        if (ka == null) {
            return;
        }
        if (ttl <= 0L) {
            this.removeKeepAlive(leaseID);
            ka.sentKeepAliveResp(new LeaseKeepAliveResponseWithError(EtcdExceptionFactory.newEtcdException(ErrorCode.NOT_FOUND, "etcdserver: requested lease not found")));
            return;
        }
        long nextKeepAlive = System.currentTimeMillis() + ttl * 1000L / 3L;
        ka.setNextKeepAlive(nextKeepAlive);
        ka.setDeadLine(System.currentTimeMillis() + ttl * 1000L);
        ka.sentKeepAliveResp(new LeaseKeepAliveResponseWithError(leaseKeepAliveResponse));
    }

    private void deadLineExecutor() {
        this.deadlineFuture = this.scheduledExecutorService.scheduleAtFixedRate(() -> {
            long now = System.currentTimeMillis();
            this.keepAlives.values().removeIf(ka -> {
                if (ka.getDeadLine() < now) {
                    ka.close();
                    return true;
                }
                return false;
            });
        }, 0L, 1000L, TimeUnit.MILLISECONDS);
    }

    @Override
    public CompletableFuture<LeaseKeepAliveResponse> keepAliveOnce(long leaseId) {
        final CompletableFuture<LeaseKeepAliveResponse> lkaFuture = new CompletableFuture<LeaseKeepAliveResponse>();
        StreamObserver<LeaseKeepAliveRequest> requestObserver = this.leaseStub.leaseKeepAlive(new StreamObserver<com.coreos.jetcd.api.LeaseKeepAliveResponse>(){

            public void onNext(com.coreos.jetcd.api.LeaseKeepAliveResponse leaseKeepAliveResponse) {
                lkaFuture.complete(new LeaseKeepAliveResponse(leaseKeepAliveResponse));
            }

            public void onError(Throwable throwable) {
                lkaFuture.completeExceptionally(EtcdExceptionFactory.toEtcdException(throwable));
            }

            public void onCompleted() {
            }
        });
        requestObserver.onNext((Object)this.newKeepAliveRequest(leaseId));
        lkaFuture.whenCompleteAsync((val, throwable) -> requestObserver.onCompleted(), (Executor)this.connectionManager.getExecutorService());
        return lkaFuture;
    }

    @Override
    public CompletableFuture<LeaseTimeToLiveResponse> timeToLive(long leaseId, LeaseOption option) {
        Preconditions.checkNotNull((Object)option, (Object)"LeaseOption should not be null");
        LeaseTimeToLiveRequest leaseTimeToLiveRequest = LeaseTimeToLiveRequest.newBuilder().setID(leaseId).setKeys(option.isAttachedKeys()).build();
        return Util.toCompletableFutureWithRetry(() -> this.stub.leaseTimeToLive(leaseTimeToLiveRequest), LeaseTimeToLiveResponse::new, Util::isRetriable, this.connectionManager.getExecutorService());
    }

    private LeaseKeepAliveRequest newKeepAliveRequest(long leaseId) {
        return LeaseKeepAliveRequest.newBuilder().setID(leaseId).build();
    }

    private static class KeepAlive {
        private final Object ownerLock;
        private long deadLine;
        private long nextKeepAlive;
        private Map<Long, KeepAlive> owner;
        private long leaseId;
        private Set<KeepAliveListenerImpl> listenersSet = Collections.newSetFromMap(new ConcurrentHashMap());

        public KeepAlive(Map<Long, KeepAlive> owner, Object ownerLock, long leaseId) {
            this.owner = owner;
            this.ownerLock = ownerLock;
            this.leaseId = leaseId;
        }

        public long getDeadLine() {
            return this.deadLine;
        }

        public void setDeadLine(long deadLine) {
            this.deadLine = deadLine;
        }

        public void addListener(KeepAliveListenerImpl listener) {
            this.listenersSet.add(listener);
        }

        public long getNextKeepAlive() {
            return this.nextKeepAlive;
        }

        public void setNextKeepAlive(long nextKeepAlive) {
            this.nextKeepAlive = nextKeepAlive;
        }

        public void sentKeepAliveResp(LeaseKeepAliveResponseWithError lkae) {
            this.listenersSet.forEach(l -> l.enqueue(lkae));
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void removeListener(KeepAliveListenerImpl l) {
            this.listenersSet.remove(l);
            Object object = this.ownerLock;
            synchronized (object) {
                if (this.listenersSet.isEmpty()) {
                    this.owner.remove(this.leaseId);
                }
            }
        }

        public void close() {
            this.listenersSet.forEach(l -> l.close());
            this.listenersSet.clear();
        }
    }

    private static class KeepAliveListenerImpl
    implements Lease.KeepAliveListener {
        private final Object closedLock = new Object();
        private BlockingQueue<LeaseKeepAliveResponseWithError> queue = new LinkedBlockingDeque<LeaseKeepAliveResponseWithError>(1);
        private ExecutorService service = Executors.newSingleThreadExecutor();
        private boolean closed = false;
        private KeepAlive owner;

        public KeepAliveListenerImpl(KeepAlive owner) {
            this.owner = owner;
        }

        public void enqueue(LeaseKeepAliveResponseWithError lkae) {
            if (this.isClosed()) {
                return;
            }
            if (lkae.error != null) {
                this.queue.clear();
            }
            this.queue.offer(lkae);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public synchronized LeaseKeepAliveResponse listen() throws InterruptedException {
            if (this.isClosed()) {
                throw EtcdExceptionFactory.newClosedKeepAliveListenerException();
            }
            Future<LeaseKeepAliveResponse> future = this.service.submit(() -> {
                LeaseKeepAliveResponseWithError lkae = this.queue.take();
                if (lkae.error != null) {
                    throw lkae.error;
                }
                return new LeaseKeepAliveResponse(lkae.leaseKeepAliveResponse);
            });
            try {
                return future.get();
            }
            catch (ExecutionException e) {
                Object object = this.closedLock;
                synchronized (object) {
                    if (this.isClosed()) {
                        throw EtcdExceptionFactory.newClosedKeepAliveListenerException();
                    }
                }
                Throwable t = e.getCause();
                if (t instanceof EtcdException) {
                    throw (EtcdException)t;
                }
                throw EtcdExceptionFactory.toEtcdException(e);
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw e;
            }
            catch (RejectedExecutionException e) {
                throw EtcdExceptionFactory.newClosedKeepAliveListenerException();
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private boolean isClosed() {
            Object object = this.closedLock;
            synchronized (object) {
                return this.closed;
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void close() {
            Object object = this.closedLock;
            synchronized (object) {
                this.closed = true;
                this.owner.removeListener(this);
                this.service.shutdownNow();
            }
        }
    }
}

