/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.state.internals;

import java.io.File;
import java.io.FilenameFilter;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.attribute.FileAttribute;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.Set;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.AbstractIterator;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.errors.ProcessorStateException;
import org.apache.kafka.streams.processor.AbstractNotifyingBatchingRestoreCallback;
import org.apache.kafka.streams.processor.BatchingStateRestoreCallback;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.RocksDBConfigSetter;
import org.rocksdb.BlockBasedTableConfig;
import org.rocksdb.CompactionStyle;
import org.rocksdb.CompressionType;
import org.rocksdb.FlushOptions;
import org.rocksdb.InfoLogLevel;
import org.rocksdb.Options;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.RocksIterator;
import org.rocksdb.TableFormatConfig;
import org.rocksdb.WriteBatch;
import org.rocksdb.WriteOptions;

/**
 * A persistent {@link KeyValueStore} of raw bytes backed by a RocksDB instance.
 *
 * <p>The database lives under {@code <stateDir>/<parentDir>/<name>}. Writes are issued with the
 * write-ahead log disabled (see {@link #openDB(ProcessorContext)}). During state restoration the
 * store is re-opened in RocksDB's bulk-load mode and switched back once restoration has finished
 * (see {@link RocksDBBatchingRestoreCallback}).
 *
 * <p>Every iterator handed out by {@link #range(Bytes, Bytes)} and {@link #all()} is tracked so
 * that {@link #close()} can close the ones that are still open.
 */
public class RocksDBStore implements KeyValueStore<Bytes, byte[]> {
    /** Sentinel TTL value meaning "no TTL"; any other value is rejected when opening the DB. */
    private static final int TTL_NOT_USED = -1;
    private static final CompressionType COMPRESSION_TYPE = CompressionType.NO_COMPRESSION;
    private static final CompactionStyle COMPACTION_STYLE = CompactionStyle.UNIVERSAL;
    /** 16 MiB. */
    private static final long WRITE_BUFFER_SIZE = 16 * 1024 * 1024L;
    /** 50 MiB. */
    private static final long BLOCK_CACHE_SIZE = 50 * 1024 * 1024L;
    private static final long BLOCK_SIZE = 4096L;
    private static final int TTL_SECONDS = TTL_NOT_USED;
    private static final int MAX_WRITE_BUFFERS = 3;
    private static final String DB_FILE_DIR = "rocksdb";

    private final String name;
    private final String parentDir;
    /** Iterators handed out and not yet closed; closed in bulk by {@link #close()}. */
    private final Set<KeyValueIterator<Bytes, byte[]>> openIterators =
        Collections.synchronizedSet(new HashSet<KeyValueIterator<Bytes, byte[]>>());

    File dbDir;
    private RocksDB db;
    private Options options;
    private WriteOptions wOptions;
    private FlushOptions fOptions;
    /** When set, the next {@link #openDB(ProcessorContext)} applies bulk-load options. */
    private volatile boolean prepareForBulkload = false;
    private ProcessorContext internalProcessorContext;
    volatile BatchingStateRestoreCallback batchingStateRestoreCallback = null;
    protected volatile boolean open = false;

    /**
     * Creates a store whose files live under the default {@code "rocksdb"} parent directory.
     *
     * @param name the store name, also used as the database directory name
     */
    RocksDBStore(String name) {
        this(name, DB_FILE_DIR);
    }

    /**
     * @param name      the store name, also used as the database directory name
     * @param parentDir directory (relative to the context's state directory) containing the DB
     */
    RocksDBStore(String name, String parentDir) {
        this.name = name;
        this.parentDir = parentDir;
    }

    /**
     * Builds the RocksDB options, lets a user-supplied {@link RocksDBConfigSetter} (configured
     * under {@code "rocksdb.config.setter"}) adjust them, and opens the database.
     *
     * @param context supplies the application configs and the state directory
     * @throws ProcessorStateException if the database cannot be opened
     */
    public void openDB(ProcessorContext context) {
        BlockBasedTableConfig tableConfig = new BlockBasedTableConfig();
        tableConfig.setBlockCacheSize(BLOCK_CACHE_SIZE);
        tableConfig.setBlockSize(BLOCK_SIZE);

        this.options = new Options();
        this.options.setTableFormatConfig(tableConfig);
        this.options.setWriteBufferSize(WRITE_BUFFER_SIZE);
        this.options.setCompressionType(COMPRESSION_TYPE);
        this.options.setCompactionStyle(COMPACTION_STYLE);
        this.options.setMaxWriteBufferNumber(MAX_WRITE_BUFFERS);
        this.options.setCreateIfMissing(true);
        this.options.setErrorIfExists(false);
        this.options.setInfoLogLevel(InfoLogLevel.ERROR_LEVEL);
        this.options.setIncreaseParallelism(Math.max(Runtime.getRuntime().availableProcessors(), 2));

        this.wOptions = new WriteOptions();
        this.wOptions.setDisableWAL(true);

        this.fOptions = new FlushOptions();
        this.fOptions.setWaitForFlush(true);

        Map<String, Object> configs = context.appConfigs();
        @SuppressWarnings("unchecked")
        Class<RocksDBConfigSetter> configSetterClass = (Class<RocksDBConfigSetter>) configs.get("rocksdb.config.setter");
        if (configSetterClass != null) {
            RocksDBConfigSetter configSetter = Utils.newInstance(configSetterClass);
            configSetter.setConfig(this.name, this.options, configs);
        }

        // Applied after the user's config setter, so bulk-load settings take precedence.
        if (this.prepareForBulkload) {
            this.options.prepareForBulkLoad();
        }

        this.dbDir = new File(new File(context.stateDir(), this.parentDir), this.name);
        try {
            this.db = this.openDB(this.dbDir, this.options, TTL_SECONDS);
        } catch (IOException e) {
            throw new ProcessorStateException(e);
        }
        this.open = true;
    }

    /**
     * Opens the database and registers a batching restore callback for this store with the context.
     *
     * @param context the processor context this store is initialised in
     * @param root    the store instance passed to {@code context.register}
     */
    @Override
    public void init(ProcessorContext context, StateStore root) {
        this.internalProcessorContext = context;
        this.openDB(context);
        this.batchingStateRestoreCallback = new RocksDBBatchingRestoreCallback(this);
        context.register(root, this.batchingStateRestoreCallback);
    }

    /**
     * Creates the parent directory of {@code dir} if necessary and opens a RocksDB instance there.
     *
     * @param dir     the database directory
     * @param options the RocksDB options to open with
     * @param ttl     must be {@link #TTL_NOT_USED}; TTL-based stores are not supported
     * @return the opened database
     * @throws IOException                   if the parent directory cannot be created
     * @throws ProcessorStateException       if RocksDB fails to open the database
     * @throws UnsupportedOperationException if {@code ttl} is anything other than {@link #TTL_NOT_USED}
     */
    private RocksDB openDB(File dir, Options options, int ttl) throws IOException {
        if (ttl != TTL_NOT_USED) {
            throw new UnsupportedOperationException("Change log is not supported for store " + this.name + " since it is TTL based.");
        }
        try {
            Files.createDirectories(dir.getParentFile().toPath());
            return RocksDB.open(options, dir.getAbsolutePath());
        } catch (RocksDBException e) {
            throw new ProcessorStateException("Error opening store " + this.name + " at location " + dir.toString(), e);
        }
    }

    /** @return whether the store is currently flagged to be opened in bulk-load mode */
    boolean isPrepareForBulkload() {
        return this.prepareForBulkload;
    }

    @Override
    public String name() {
        return this.name;
    }

    @Override
    public boolean persistent() {
        return true;
    }

    @Override
    public boolean isOpen() {
        return this.open;
    }

    /**
     * @param key the key to look up
     * @return the stored value, or whatever the underlying {@code db.get} returns for a missing key
     * @throws InvalidStateStoreException if the store is closed
     * @throws ProcessorStateException    if RocksDB reports an error
     */
    @Override
    public synchronized byte[] get(Bytes key) {
        this.validateStoreOpen();
        return this.getInternal(key.get());
    }

    /** @throws InvalidStateStoreException if the store is not open */
    private void validateStoreOpen() {
        if (!this.open) {
            throw new InvalidStateStoreException("Store " + this.name + " is currently closed");
        }
    }

    /** Reads {@code rawKey} from the database, wrapping RocksDB failures. */
    private byte[] getInternal(byte[] rawKey) {
        try {
            return this.db.get(rawKey);
        } catch (RocksDBException e) {
            // The message used to carry an unformatted "%s" placeholder; it is stated plainly now.
            throw new ProcessorStateException("Error while getting value for key from store " + this.name, e);
        }
    }

    /**
     * Closes and re-opens the database with bulk loading switched on or off.
     *
     * <p>When enabling bulk loading on a directory that already contains {@code .sst} files, the
     * existing data is range-compacted first and the database is re-opened once with the current
     * options before the bulk-load flag is changed.
     *
     * @param prepareForBulkload the bulk-load mode to re-open the database in
     * @throws ProcessorStateException if the range compaction fails
     */
    private void toggleDbForBulkLoading(boolean prepareForBulkload) {
        if (prepareForBulkload) {
            String[] sstFileNames = this.dbDir.list(new FilenameFilter() {
                @Override
                public boolean accept(File dir, String name) {
                    return name.matches(".*\\.sst");
                }
            });
            if (sstFileNames != null && sstFileNames.length > 0) {
                try {
                    this.db.compactRange(true, 1, 0);
                } catch (RocksDBException e) {
                    throw new ProcessorStateException("Error while range compacting during restoring  store " + this.name, e);
                }
                // Re-open with the unchanged options before switching the bulk-load flag below.
                this.close();
                this.openDB(this.internalProcessorContext);
            }
        }
        this.close();
        this.prepareForBulkload = prepareForBulkload;
        this.openDB(this.internalProcessorContext);
    }

    /**
     * Stores {@code value} under {@code key}; a {@code null} value deletes the key.
     *
     * @throws NullPointerException       if {@code key} is null
     * @throws InvalidStateStoreException if the store is closed
     * @throws ProcessorStateException    if RocksDB reports an error
     */
    @Override
    public synchronized void put(Bytes key, byte[] value) {
        Objects.requireNonNull(key, "key cannot be null");
        this.validateStoreOpen();
        this.putInternal(key.get(), value);
    }

    /**
     * Stores {@code value} only when {@link #get(Bytes)} returns {@code null} for {@code key}.
     *
     * @return the value found before the call ({@code null} if the put was performed)
     * @throws NullPointerException if {@code key} is null
     */
    @Override
    public synchronized byte[] putIfAbsent(Bytes key, byte[] value) {
        Objects.requireNonNull(key, "key cannot be null");
        byte[] originalValue = this.get(key);
        if (originalValue == null) {
            this.put(key, value);
        }
        return originalValue;
    }

    /**
     * Applies a batch of restored records in a single write; records with a {@code null} value
     * are applied as removals.
     *
     * @throws ProcessorStateException if RocksDB rejects the batch
     */
    private void restoreAllInternal(Collection<KeyValue<byte[], byte[]>> records) {
        try (WriteBatch batch = new WriteBatch()) {
            for (KeyValue<byte[], byte[]> record : records) {
                if (record.value == null) {
                    batch.remove(record.key);
                } else {
                    batch.put(record.key, record.value);
                }
            }
            this.db.write(this.wOptions, batch);
        } catch (RocksDBException e) {
            throw new ProcessorStateException("Error restoring batch to store " + this.name, e);
        }
    }

    /**
     * Writes a single entry: deletes {@code rawKey} when {@code rawValue} is {@code null},
     * otherwise puts the pair.
     *
     * @throws ProcessorStateException if RocksDB reports an error
     */
    private void putInternal(byte[] rawKey, byte[] rawValue) {
        if (rawValue == null) {
            try {
                this.db.delete(this.wOptions, rawKey);
            } catch (RocksDBException e) {
                throw new ProcessorStateException("Error while removing key from store " + this.name, e);
            }
        } else {
            // Delete and put are mutually exclusive: a null value must never reach db.put.
            try {
                this.db.put(this.wOptions, rawKey, rawValue);
            } catch (RocksDBException e) {
                throw new ProcessorStateException("Error while putting key/value into store " + this.name, e);
            }
        }
    }

    /**
     * Writes all entries in a single batch; entries with a {@code null} value are removals.
     *
     * @throws NullPointerException    if any entry has a null key
     * @throws ProcessorStateException if RocksDB rejects the batch
     */
    @Override
    public void putAll(List<KeyValue<Bytes, byte[]>> entries) {
        try (WriteBatch batch = new WriteBatch()) {
            for (KeyValue<Bytes, byte[]> entry : entries) {
                Objects.requireNonNull(entry.key, "key cannot be null");
                if (entry.value == null) {
                    batch.remove(entry.key.get());
                } else {
                    batch.put(entry.key.get(), entry.value);
                }
            }
            this.db.write(this.wOptions, batch);
        } catch (RocksDBException e) {
            throw new ProcessorStateException("Error while batch writing to store " + this.name, e);
        }
    }

    /**
     * Removes {@code key} from the store.
     *
     * @return the value read for {@code key} immediately before the removal
     * @throws NullPointerException if {@code key} is null
     */
    @Override
    public synchronized byte[] delete(Bytes key) {
        Objects.requireNonNull(key, "key cannot be null");
        byte[] value = this.get(key);
        this.put(key, null);
        return value;
    }

    /**
     * Returns an iterator positioned at {@code from} that stops after the last key comparing
     * less than or equal to {@code to}. The iterator is tracked until it is closed.
     *
     * @throws NullPointerException       if {@code from} or {@code to} is null
     * @throws InvalidStateStoreException if the store is closed
     */
    @Override
    public synchronized KeyValueIterator<Bytes, byte[]> range(Bytes from, Bytes to) {
        Objects.requireNonNull(from, "from cannot be null");
        Objects.requireNonNull(to, "to cannot be null");
        this.validateStoreOpen();
        RocksDBRangeIterator rangeIterator = new RocksDBRangeIterator(this.name, this.db.newIterator(), from, to);
        this.openIterators.add(rangeIterator);
        return rangeIterator;
    }

    /**
     * Returns an iterator positioned at the first entry. The iterator is tracked until it is closed.
     *
     * @throws InvalidStateStoreException if the store is closed
     */
    @Override
    public synchronized KeyValueIterator<Bytes, byte[]> all() {
        this.validateStoreOpen();
        RocksIterator innerIter = this.db.newIterator();
        innerIter.seekToFirst();
        RocksDbIterator fullIterator = new RocksDbIterator(this.name, innerIter);
        this.openIterators.add(fullIterator);
        return fullIterator;
    }

    /**
     * Returns the value of the {@code rocksdb.estimate-num-keys} property. A negative reading is
     * treated as an overflow and reported as {@link Long#MAX_VALUE}.
     *
     * @throws InvalidStateStoreException if the store is closed
     * @throws ProcessorStateException    if the property cannot be read
     */
    @Override
    public long approximateNumEntries() {
        this.validateStoreOpen();
        long value;
        try {
            value = this.db.getLongProperty("rocksdb.estimate-num-keys");
        } catch (RocksDBException e) {
            throw new ProcessorStateException("Error fetching property from store " + this.name, e);
        }
        if (this.isOverflowing(value)) {
            return Long.MAX_VALUE;
        }
        return value;
    }

    /** A negative count is interpreted as a wrapped-around (overflowed) value. */
    private boolean isOverflowing(long value) {
        return value < 0L;
    }

    /** Flushes the database; a no-op when no database is currently held. */
    @Override
    public synchronized void flush() {
        if (this.db == null) {
            return;
        }
        this.flushInternal();
    }

    /** @throws ProcessorStateException if RocksDB fails to flush */
    private void flushInternal() {
        try {
            this.db.flush(this.fOptions);
        } catch (RocksDBException e) {
            throw new ProcessorStateException("Error while executing flush from store " + this.name, e);
        }
    }

    /**
     * Closes all tracked iterators, the option objects and the database, then drops the
     * references to them. Does nothing when the store is already closed.
     */
    @Override
    public synchronized void close() {
        if (!this.open) {
            return;
        }
        this.open = false;
        this.closeOpenIterators();
        this.options.close();
        this.wOptions.close();
        this.fOptions.close();
        this.db.close();
        this.options = null;
        this.wOptions = null;
        this.fOptions = null;
        this.db = null;
    }

    /**
     * Closes every iterator that is still tracked. A snapshot is taken under the set's lock
     * because each iterator removes itself from {@link #openIterators} while closing.
     */
    private void closeOpenIterators() {
        Set<KeyValueIterator<Bytes, byte[]>> iterators;
        synchronized (this.openIterators) {
            iterators = new HashSet<>(this.openIterators);
        }
        for (KeyValueIterator<Bytes, byte[]> iterator : iterators) {
            iterator.close();
        }
    }

    /**
     * Restore callback that writes restored batches into the store and switches the store into
     * bulk-load mode for the duration of the restoration.
     */
    static class RocksDBBatchingRestoreCallback extends AbstractNotifyingBatchingRestoreCallback {
        private final RocksDBStore rocksDBStore;

        RocksDBBatchingRestoreCallback(RocksDBStore rocksDBStore) {
            this.rocksDBStore = rocksDBStore;
        }

        @Override
        public void restoreAll(Collection<KeyValue<byte[], byte[]>> records) {
            this.rocksDBStore.restoreAllInternal(records);
        }

        @Override
        public void onRestoreStart(TopicPartition topicPartition, String storeName, long startingOffset, long endingOffset) {
            this.rocksDBStore.toggleDbForBulkLoading(true);
        }

        @Override
        public void onRestoreEnd(TopicPartition topicPartition, String storeName, long totalRestored) {
            this.rocksDBStore.toggleDbForBulkLoading(false);
        }
    }

    /**
     * Iterator over the keys from {@code from} up to and including {@code to}, compared with
     * {@link Bytes#BYTES_LEXICO_COMPARATOR}.
     */
    private class RocksDBRangeIterator extends RocksDbIterator {
        private final Comparator<byte[]> comparator;
        private final byte[] rawToKey;

        /**
         * @throws NullPointerException if {@code to} wraps a null byte array
         */
        RocksDBRangeIterator(String storeName, RocksIterator iter, Bytes from, Bytes to) {
            super(storeName, iter);
            this.comparator = Bytes.BYTES_LEXICO_COMPARATOR;
            iter.seek(from.get());
            this.rawToKey = to.get();
            if (this.rawToKey == null) {
                throw new NullPointerException("RocksDBRangeIterator: RawToKey is null for key " + to);
            }
        }

        /** Ends the iteration as soon as the next key compares greater than the upper bound. */
        @Override
        public KeyValue<Bytes, byte[]> makeNext() {
            KeyValue<Bytes, byte[]> candidate = super.makeNext();
            if (candidate == null) {
                return this.allDone();
            }
            if (this.comparator.compare(candidate.key.get(), this.rawToKey) <= 0) {
                return candidate;
            }
            return this.allDone();
        }
    }

    /**
     * Adapts a {@link RocksIterator} to a {@link KeyValueIterator}. Closing the iterator also
     * removes it from the enclosing store's set of open iterators.
     */
    private class RocksDbIterator extends AbstractIterator<KeyValue<Bytes, byte[]>>
            implements KeyValueIterator<Bytes, byte[]> {
        private final String storeName;
        private final RocksIterator iter;
        private volatile boolean open = true;
        /** The entry most recently produced by {@link #makeNext()}. */
        private KeyValue<Bytes, byte[]> next;

        RocksDbIterator(String storeName, RocksIterator iter) {
            this.iter = iter;
            this.storeName = storeName;
        }

        /** @throws InvalidStateStoreException if this iterator has been closed */
        @Override
        public synchronized boolean hasNext() {
            if (!this.open) {
                throw new InvalidStateStoreException(String.format("RocksDB store %s has closed", this.storeName));
            }
            return super.hasNext();
        }

        @Override
        public synchronized KeyValue<Bytes, byte[]> next() {
            return super.next();
        }

        /** Reads the current entry, advances the underlying iterator and returns the entry. */
        @Override
        public KeyValue<Bytes, byte[]> makeNext() {
            if (!this.iter.isValid()) {
                return this.allDone();
            }
            this.next = this.getKeyValue();
            this.iter.next();
            return this.next;
        }

        private KeyValue<Bytes, byte[]> getKeyValue() {
            return new KeyValue<>(new Bytes(this.iter.key()), this.iter.value());
        }

        @Override
        public void remove() {
            throw new UnsupportedOperationException("RocksDB iterator does not support remove()");
        }

        @Override
        public synchronized void close() {
            RocksDBStore.this.openIterators.remove(this);
            this.iter.close();
            this.open = false;
        }

        /**
         * @return the key of the entry the next call to {@link #next()} will return
         * @throws NoSuchElementException if there is no further entry
         */
        @Override
        public Bytes peekNextKey() {
            // NOTE(review): relies on hasNext() having populated `next` via makeNext() - confirm
            // against AbstractIterator.
            if (!this.hasNext()) {
                throw new NoSuchElementException();
            }
            return this.next.key;
        }
    }
}

