/*
 * Decompiled with CFR 0.152.
 */
package org.apache.shenyu.register.client.server.etcd.client;

import com.coreos.jetcd.Client;
import com.coreos.jetcd.KV;
import com.coreos.jetcd.Lease;
import com.coreos.jetcd.Watch;
import com.coreos.jetcd.data.ByteSequence;
import com.coreos.jetcd.data.KeyValue;
import com.coreos.jetcd.kv.GetResponse;
import com.coreos.jetcd.lease.LeaseGrantResponse;
import com.coreos.jetcd.options.GetOption;
import com.coreos.jetcd.options.WatchOption;
import com.coreos.jetcd.watch.WatchResponse;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.shenyu.register.client.server.etcd.client.EtcdListenHandler;
import org.apache.shenyu.register.client.server.etcd.client.Event;
import org.apache.shenyu.register.client.server.etcd.client.Node;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class EtcdClient {
    private static final Logger LOGGER = LoggerFactory.getLogger(EtcdClient.class);
    private static final int EPHEMERAL_LEASE = 60;
    private static final int DEFAULT_CORE_POOL_SIZE = 10;
    private static final int DEFAULT_QUEUE_SIZE = 1000;
    private final ThreadPoolExecutor defaultPoolExecutor = new ThreadPoolExecutor(10, 20, 0L, TimeUnit.NANOSECONDS, new ArrayBlockingQueue<Runnable>(1000), new ThreadFactoryBuilder().setNameFormat("etcd register center watch-%d").setDaemon(true).build());
    private final Client client;

    public EtcdClient(String urls) {
        this.client = Client.builder().endpoints(urls.split(",")).build();
        try {
            this.initLease();
        }
        catch (InterruptedException | ExecutionException e) {
            LOGGER.error("initLease error.", (Throwable)e);
        }
    }

    private void initLease() throws ExecutionException, InterruptedException {
        Lease lease = this.client.getLeaseClient();
        LeaseGrantResponse response = (LeaseGrantResponse)lease.grant(60L).get();
        long leaseId = response.getID();
        lease.keepAlive(leaseId);
    }

    public String read(String key) {
        KV kv = this.client.getKVClient();
        ByteSequence storeKey = Optional.ofNullable(key).map(ByteSequence::fromString).orElse(null);
        GetResponse response = null;
        try {
            response = (GetResponse)kv.get(storeKey).get();
        }
        catch (InterruptedException | ExecutionException e) {
            LOGGER.error("read(key:{}) error.", (Object)key, (Object)e);
        }
        if (Objects.isNull(response)) {
            return null;
        }
        LOGGER.debug(String.valueOf(response.getHeader()));
        Node info = response.getKvs().stream().map(EtcdClient::kv2NodeInfo).findFirst().orElse(null);
        assert (info != null);
        return info.getValue();
    }

    public List<String> getChildren(String path) {
        try {
            return this.listKeys(path);
        }
        catch (InterruptedException | ExecutionException e) {
            LOGGER.error("getChildren(path:{}) error.", (Object)path, (Object)e);
            return null;
        }
    }

    private List<String> listKeys(String prefix) throws ExecutionException, InterruptedException {
        KV kv = this.client.getKVClient();
        ByteSequence storePrefix = Optional.ofNullable(prefix).map(ByteSequence::fromString).orElse(null);
        GetOption option = GetOption.newBuilder().withKeysOnly(true).withPrefix(storePrefix).build();
        GetResponse response = (GetResponse)kv.get(storePrefix, option).get();
        return response.getKvs().stream().map(o -> o.getKey().toStringUtf8()).filter(k -> !k.equals(prefix)).collect(Collectors.toList());
    }

    public void subscribeChildChanges(String key, EtcdListenHandler handler) {
        this.defaultPoolExecutor.execute(() -> {
            Stoppable stoppable = new Stoppable();
            try {
                this.watchChildren(key, stoppable, handler);
            }
            catch (Exception e) {
                stoppable.stop();
                LOGGER.warn(String.format("Watch exception of %s", "/s"), (Throwable)e);
            }
        });
    }

    private void watchChildren(String key, Supplier<Boolean> exitSignSupplier, BiConsumer<Event, Node> consumer) throws InterruptedException {
        ByteSequence storeKey = Optional.ofNullable(key).map(ByteSequence::fromString).orElse(null);
        try (Watch watch = this.client.getWatchClient();
             Watch.Watcher watcher = watch.watch(storeKey, WatchOption.newBuilder().withPrefix(storeKey).build());){
            while (!exitSignSupplier.get().booleanValue()) {
                WatchResponse response = watcher.listen();
                response.getEvents().forEach(watchEvent -> {
                    Event event;
                    KeyValue keyValue = watchEvent.getKeyValue();
                    Node info = EtcdClient.kv2NodeInfo(keyValue);
                    if (watchEvent.getKeyValue().getKey().equals((Object)storeKey)) {
                        return;
                    }
                    switch (watchEvent.getEventType()) {
                        case PUT: {
                            event = Event.UPDATE;
                            break;
                        }
                        case DELETE: {
                            event = Event.DELETE;
                            break;
                        }
                        default: {
                            event = Event.UNRECOGNIZED;
                        }
                    }
                    consumer.accept(event, info);
                });
            }
        }
    }

    static Node kv2NodeInfo(KeyValue kv) {
        String key = kv.getKey().toStringUtf8();
        String value = Optional.ofNullable(kv.getValue()).map(ByteSequence::toStringUtf8).orElse("");
        return new Node(key, value, kv.getCreateRevision(), kv.getModRevision(), kv.getVersion());
    }

    public void close() {
        Optional.ofNullable(this.client).ifPresent(Client::close);
    }

    static class Stoppable
    implements Supplier<Boolean> {
        private boolean exit;

        Stoppable() {
        }

        @Override
        public Boolean get() {
            return this.exit;
        }

        void stop() {
            this.exit = true;
        }
    }
}

