/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.state.internals;

import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.Set;
import java.util.TreeMap;
import java.util.function.Consumer;
import java.util.function.Supplier;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.BytesSerializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.kstream.internals.Change;
import org.apache.kafka.streams.kstream.internals.FullChangeSerde;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
import org.apache.kafka.streams.processor.internals.RecordCollector;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.apache.kafka.streams.state.internals.BufferKey;
import org.apache.kafka.streams.state.internals.BufferValue;
import org.apache.kafka.streams.state.internals.ContextualRecord;
import org.apache.kafka.streams.state.internals.Maybe;
import org.apache.kafka.streams.state.internals.TimeOrderedKeyValueBuffer;
import org.apache.kafka.streams.state.internals.metrics.Sensors;

public final class InMemoryTimeOrderedKeyValueBuffer<K, V>
implements TimeOrderedKeyValueBuffer<K, V> {
    /** Key serializer handed to the record collector for every changelog write. */
    private static final BytesSerializer KEY_SERIALIZER = new BytesSerializer();
    /** Value serializer handed to the record collector for every changelog write. */
    private static final ByteArraySerializer VALUE_SERIALIZER = new ByteArraySerializer();
    /** Header {@code "v" = 1}: recognised on restore, never written by this class. */
    private static final RecordHeaders V_1_CHANGELOG_HEADERS = new RecordHeaders(new Header[]{new RecordHeader("v", new byte[]{1})});
    /** Header {@code "v" = 2}: attached to every value record this class writes to the changelog. */
    private static final RecordHeaders V_2_CHANGELOG_HEADERS = new RecordHeaders(new Header[]{new RecordHeader("v", new byte[]{2})});
    /** Serialized record key -> the {@link BufferKey} under which it is stored in {@link #sortedMap}. */
    private final Map<Bytes, BufferKey> index = new HashMap<>();
    /** Buffered values in {@link BufferKey} order; eviction consumes this map from its first entry. */
    private final TreeMap<BufferKey, BufferValue> sortedMap = new TreeMap<>();
    /** Keys added or evicted since the last flush. */
    private final Set<Bytes> dirtyKeys = new HashSet<>();
    private final String storeName;
    private final boolean loggingEnabled;
    /** Not final: may be supplied late through {@code setSerdesIfNull}. */
    private Serde<K> keySerde;
    /** Not final: may be supplied late through {@code setSerdesIfNull}. */
    private FullChangeSerde<V> valueSerde;
    /** Running total of {@code computeRecordSize} over all buffered entries. */
    private long memBufferSize = 0L;
    /** Time of the first buffered entry, or {@code Long.MAX_VALUE} while the buffer is empty. */
    private long minTimestamp = Long.MAX_VALUE;
    /** Only assigned in {@code init} when logging is enabled. */
    private RecordCollector collector;
    /** Only assigned in {@code init} when logging is enabled; also passed to the serdes as the topic name. */
    private String changelogTopic;
    private Sensor bufferSizeSensor;
    private Sensor bufferCountSensor;
    private volatile boolean open;
    /** Partition of the owning task, taken from the processor context in {@code init}. */
    private int partition;

    /**
     * Creates an empty, not-yet-initialised buffer. Instances are obtained through the nested builder.
     *
     * @param storeName      name reported by {@link #name()}
     * @param loggingEnabled whether changes are written to a changelog topic on flush
     * @param keySerde       serde for record keys
     * @param valueSerde     serde for record values; passed through {@code FullChangeSerde.castOrWrap}
     */
    private InMemoryTimeOrderedKeyValueBuffer(String storeName, boolean loggingEnabled, Serde<K> keySerde, Serde<V> valueSerde) {
        this.valueSerde = FullChangeSerde.castOrWrap(valueSerde);
        this.keySerde = keySerde;
        this.loggingEnabled = loggingEnabled;
        this.storeName = storeName;
    }

    /**
     * Returns the name this buffer was created with.
     *
     * @return the store name
     */
    @Override
    public String name() {
        return storeName;
    }

    /**
     * Reports whether this store is persistent.
     *
     * @return always {@code false}
     */
    @Override
    public boolean persistent() {
        return false;
    }

    /**
     * Installs the given serdes for whichever of the key/value serde is still unset.
     * A serde that is already configured is left untouched.
     *
     * @param keySerde   fallback key serde
     * @param valueSerde fallback value serde; passed through {@code FullChangeSerde.castOrWrap} if used
     */
    @Override
    public void setSerdesIfNull(Serde<K> keySerde, Serde<V> valueSerde) {
        if (this.keySerde == null) {
            this.keySerde = keySerde;
        }
        if (this.valueSerde == null) {
            this.valueSerde = FullChangeSerde.castOrWrap(valueSerde);
        }
    }

    /**
     * Wires the buffer into its task: creates the metric sensors, registers the restore callback and,
     * when logging is enabled, resolves the record collector and changelog topic name. The buffer is
     * marked open at the end.
     *
     * @param context processor context; must also be an {@code InternalProcessorContext} and, when
     *                logging is enabled, a {@code RecordCollector.Supplier}
     * @param root    the store to register the restore callback for
     */
    @Override
    public void init(ProcessorContext context, StateStore root) {
        InternalProcessorContext internalContext = (InternalProcessorContext) context;
        bufferSizeSensor = Sensors.createBufferSizeSensor(this, internalContext);
        bufferCountSensor = Sensors.createBufferCountSensor(this, internalContext);

        // NOTE(review): restoreBatch consumes a whole batch of ConsumerRecords; confirm this method
        // reference binds to the batching restore-callback type that register() expects.
        context.register(root, this::restoreBatch);

        if (loggingEnabled) {
            RecordCollector.Supplier collectorSupplier = (RecordCollector.Supplier) context;
            collector = collectorSupplier.recordCollector();
            changelogTopic = ProcessorStateManager.storeChangelogTopic(context.applicationId(), storeName);
        }

        updateBufferMetrics();
        open = true;
        partition = context.taskId().partition;
    }

    /**
     * Reports whether the buffer is currently open.
     *
     * @return {@code true} after {@code init} has completed and until {@code close} is called
     */
    @Override
    public boolean isOpen() {
        return open;
    }

    /**
     * Marks the buffer closed, discards all buffered entries and pending dirty keys, resets the size
     * and minimum-timestamp bookkeeping, and records the now-empty state on the metric sensors.
     */
    @Override
    public void close() {
        open = false;

        // Drop every piece of in-memory state.
        dirtyKeys.clear();
        sortedMap.clear();
        index.clear();

        // Reset the derived bookkeeping to its "empty buffer" values.
        minTimestamp = Long.MAX_VALUE;
        memBufferSize = 0L;

        updateBufferMetrics();
    }

    /**
     * Writes every key changed since the last flush to the changelog (when logging is enabled) and
     * then forgets the set of changed keys.
     *
     * <p>A dirty key that is still buffered is logged with its current value; a dirty key that is no
     * longer in the index is logged as a tombstone.
     */
    @Override
    public void flush() {
        if (this.loggingEnabled) {
            for (Bytes key : this.dirtyKeys) {
                BufferKey bufferKey = this.index.get(key);
                if (bufferKey == null) {
                    // The key was evicted after being marked dirty.
                    this.logTombstone(key);
                } else {
                    this.logValue(key, bufferKey, this.sortedMap.get(bufferKey));
                }
            }
        }
        // Cleared even when logging is disabled: put() and evictWhile() always record dirty keys, so
        // clearing only on the logging path would let this set grow for the lifetime of the store.
        this.dirtyKeys.clear();
    }

    /**
     * Sends the current value of a buffered key to the changelog, tagged with the version-2 header.
     * The payload is the serialized {@link BufferValue} immediately followed by the entry's buffer time.
     *
     * @param key       serialized record key
     * @param bufferKey buffer key holding the entry's time
     * @param value     buffered value to log
     */
    private void logValue(Bytes key, BufferKey bufferKey, BufferValue value) {
        // Ask for one extra long's worth of room so the buffer time can be appended after the value.
        ByteBuffer serialized = value.serialize(Long.BYTES);
        serialized.putLong(bufferKey.time());
        byte[] payload = serialized.array();
        collector.send(changelogTopic, key, payload, (Headers) V_2_CHANGELOG_HEADERS, partition, (Long) null, KEY_SERIALIZER, VALUE_SERIALIZER);
    }

    /**
     * Sends a tombstone (null value, no headers, no explicit timestamp) for the key to the changelog.
     *
     * @param key serialized record key that is no longer buffered
     */
    private void logTombstone(Bytes key) {
        final Headers noHeaders = null;
        final Long noTimestamp = null;
        collector.send(changelogTopic, key, null, noHeaders, partition, noTimestamp, KEY_SERIALIZER, VALUE_SERIALIZER);
    }

    /**
     * Rebuilds buffer state from a batch of changelog records, then refreshes the buffer metrics.
     *
     * <p>A record with a {@code null} value is a tombstone and removes the key. Otherwise the
     * {@code "v"} header selects the payload layout:
     * <ul>
     *   <li>no header: 8-byte buffer time followed by the serialized change; the record context is
     *       rebuilt from the changelog record's own timestamp, offset, partition, topic and headers;</li>
     *   <li>version 1: 8-byte buffer time followed by a serialized {@link ContextualRecord};</li>
     *   <li>version 2: a serialized {@link BufferValue} followed by the 8-byte buffer time.</li>
     * </ul>
     *
     * @param batch changelog records to apply, in iteration order
     * @throws IllegalStateException    if a record's partition differs from this buffer's partition
     * @throws IllegalArgumentException if a record carries an unrecognised {@code "v"} header
     */
    private void restoreBatch(Collection<ConsumerRecord<byte[], byte[]>> batch) {
        for (ConsumerRecord<byte[], byte[]> record : batch) {
            // Validate first so a misrouted record is rejected before it can alter any state.
            if (record.partition() != this.partition) {
                throw new IllegalStateException(String.format("record partition [%d] is being restored by the wrong suppress partition [%d]", record.partition(), this.partition));
            }

            Bytes key = Bytes.wrap(record.key());
            byte[] rawValue = record.value();
            if (rawValue == null) {
                this.restoreTombstone(key);
                continue;
            }

            Header versionHeader = record.headers().lastHeader("v");
            if (versionHeader == null) {
                ByteBuffer timeAndValue = ByteBuffer.wrap(rawValue);
                long time = timeAndValue.getLong();
                byte[] changelogValue = remainingBytes(timeAndValue);
                ProcessorRecordContext recordContext = new ProcessorRecordContext(record.timestamp(), record.offset(), record.partition(), record.topic(), record.headers());
                ContextualRecord contextualRecord = new ContextualRecord(changelogValue, recordContext);
                this.cleanPut(time, key, new BufferValue(contextualRecord, this.inferPriorValue(key, changelogValue)));
            } else if (V_1_CHANGELOG_HEADERS.lastHeader("v").equals(versionHeader)) {
                ByteBuffer timeAndValue = ByteBuffer.wrap(rawValue);
                long time = timeAndValue.getLong();
                byte[] changelogValue = remainingBytes(timeAndValue);
                ContextualRecord contextualRecord = ContextualRecord.deserialize(ByteBuffer.wrap(changelogValue));
                this.cleanPut(time, key, new BufferValue(contextualRecord, this.inferPriorValue(key, contextualRecord.value())));
            } else if (V_2_CHANGELOG_HEADERS.lastHeader("v").equals(versionHeader)) {
                ByteBuffer valueAndTime = ByteBuffer.wrap(rawValue);
                BufferValue bufferValue = BufferValue.deserialize(valueAndTime);
                long time = valueAndTime.getLong();
                this.cleanPut(time, key, bufferValue);
            } else {
                throw new IllegalArgumentException("Restoring apparently invalid changelog record: " + record);
            }
        }
        this.updateBufferMetrics();
    }

    /** Removes a key named by a changelog tombstone and repairs the size and min-timestamp bookkeeping. */
    private void restoreTombstone(Bytes key) {
        BufferKey bufferKey = this.index.remove(key);
        if (bufferKey == null) {
            return;
        }
        BufferValue removed = this.sortedMap.remove(bufferKey);
        if (removed != null) {
            this.memBufferSize -= InMemoryTimeOrderedKeyValueBuffer.computeRecordSize(bufferKey.key(), removed);
        }
        if (bufferKey.time() == this.minTimestamp) {
            // The removed entry may have been the earliest one; re-derive the minimum from what is left.
            this.minTimestamp = this.sortedMap.isEmpty() ? Long.MAX_VALUE : this.sortedMap.firstKey().time();
        }
    }

    /** Copies every byte between the buffer's current position and its limit into a new array. */
    private static byte[] remainingBytes(ByteBuffer buffer) {
        byte[] rest = new byte[buffer.remaining()];
        buffer.get(rest);
        return rest;
    }

    /**
     * Determines the prior value to store with a restored record.
     *
     * @param key              serialized record key
     * @param serializedChange serialized change carried by the record being restored
     * @return the prior value already held for the key if it is buffered; otherwise the old-value
     *         part extracted from {@code serializedChange}
     */
    private byte[] inferPriorValue(Bytes key, byte[] serializedChange) {
        if (index.containsKey(key)) {
            return internalPriorValueForBuffered(key);
        }
        return FullChangeSerde.extractOldValuePart(ByteBuffer.wrap(serializedChange));
    }

    /**
     * Evicts buffered entries, starting from the first entry of the sorted map, for as long as the
     * predicate keeps returning {@code true}.
     *
     * <p>The predicate is consulted once up front and again before every eviction (so it is called
     * twice before the first eviction). Each evicted entry is deserialized and passed to
     * {@code callback} before it is removed from the buffer; its key is marked dirty. Buffer metrics
     * are updated once at the end if anything was evicted.
     *
     * @param predicate decides whether eviction should continue
     * @param callback  receives every evicted entry
     * @throws IllegalStateException if the tracked minimum timestamp disagrees with the time of the
     *                               entry about to be evicted
     */
    @Override
    public void evictWhile(Supplier<Boolean> predicate, Consumer<TimeOrderedKeyValueBuffer.Eviction<K, V>> callback) {
        Iterator<Map.Entry<BufferKey, BufferValue>> delegate = this.sortedMap.entrySet().iterator();
        int evictions = 0;
        if (predicate.get()) {
            Map.Entry<BufferKey, BufferValue> next = delegate.hasNext() ? delegate.next() : null;
            while (next != null && predicate.get()) {
                // Capture key and value before Iterator.remove() so nothing is read from the entry afterwards.
                BufferKey bufferKey = next.getKey();
                BufferValue bufferValue = next.getValue();
                if (bufferKey.time() != this.minTimestamp) {
                    throw new IllegalStateException("minTimestamp [" + this.minTimestamp + "] did not match the actual min timestamp [" + bufferKey.time() + "]");
                }
                K key = this.keySerde.deserializer().deserialize(this.changelogTopic, bufferKey.key().get());
                ContextualRecord record = bufferValue.record();
                Change<V> value = this.valueSerde.deserializer().deserialize(this.changelogTopic, record.value());
                callback.accept(new TimeOrderedKeyValueBuffer.Eviction<>(key, value, record.recordContext()));

                delegate.remove();
                this.index.remove(bufferKey.key());
                this.dirtyKeys.add(bufferKey.key());
                this.memBufferSize -= InMemoryTimeOrderedKeyValueBuffer.computeRecordSize(bufferKey.key(), bufferValue);

                // Advance to the new head of the buffer and keep minTimestamp in step with it.
                if (delegate.hasNext()) {
                    next = delegate.next();
                    this.minTimestamp = next.getKey().time();
                } else {
                    next = null;
                    this.minTimestamp = Long.MAX_VALUE;
                }
                ++evictions;
            }
        }
        if (evictions > 0) {
            this.updateBufferMetrics();
        }
    }

    /**
     * Looks up the prior value recorded for a key that is currently buffered.
     *
     * @param key record key
     * @return an undefined {@link Maybe} if the key is not buffered; otherwise a defined one wrapping
     *         the deserialized prior value paired with a timestamp of {@code -1}
     */
    @Override
    public Maybe<ValueAndTimestamp<V>> priorValueForBuffered(K key) {
        Bytes serializedKey = Bytes.wrap(this.keySerde.serializer().serialize(this.changelogTopic, key));
        if (!this.index.containsKey(serializedKey)) {
            return Maybe.undefined();
        }
        byte[] serializedValue = this.internalPriorValueForBuffered(serializedKey);
        // Typed as V (not Object) so the result matches the declared Maybe<ValueAndTimestamp<V>>.
        V deserializedValue = this.valueSerde.innerSerde().deserializer().deserialize(this.changelogTopic, serializedValue);
        return Maybe.defined(ValueAndTimestamp.make(deserializedValue, -1L));
    }

    /**
     * Fetches the serialized prior value stored with a buffered key.
     *
     * @param key serialized record key
     * @return the prior value held by the key's buffered entry
     * @throws NoSuchElementException if the key is not in the index
     */
    private byte[] internalPriorValueForBuffered(Bytes key) {
        BufferKey bufferKey = index.get(key);
        if (bufferKey != null) {
            BufferValue buffered = sortedMap.get(bufferKey);
            return buffered.priorValue();
        }
        throw new NoSuchElementException("Key [" + key + "] is not in the buffer.");
    }

    /**
     * Buffers a change for a key, marks the key dirty and updates the buffer metrics.
     *
     * <p>If the key is already buffered, the prior value stored with the existing entry is carried
     * over; otherwise the prior value is the serialized {@code oldValue} of the incoming change.
     *
     * @param time          buffer time for the entry
     * @param key           record key
     * @param value         change to buffer; must not be {@code null}
     * @param recordContext context of the record; must not be {@code null}
     * @throws NullPointerException if {@code value} or {@code recordContext} is {@code null}
     */
    @Override
    public void put(long time, K key, Change<V> value, ProcessorRecordContext recordContext) {
        Objects.requireNonNull(value, "value cannot be null");
        Objects.requireNonNull(recordContext, "recordContext cannot be null");
        Bytes serializedKey = Bytes.wrap(this.keySerde.serializer().serialize(this.changelogTopic, key));
        byte[] serializedValue = this.valueSerde.serializer().serialize(this.changelogTopic, value);
        BufferValue buffered = this.getBuffered(serializedKey);
        byte[] serializedPriorValue;
        if (buffered == null) {
            // Typed as V (not Object) so it is accepted by the inner Serializer<V>.
            V priorValue = value.oldValue;
            serializedPriorValue = this.valueSerde.innerSerde().serializer().serialize(this.changelogTopic, priorValue);
        } else {
            serializedPriorValue = buffered.priorValue();
        }
        this.cleanPut(time, serializedKey, new BufferValue(new ContextualRecord(serializedValue, recordContext), serializedPriorValue));
        this.dirtyKeys.add(serializedKey);
        this.updateBufferMetrics();
    }

    /**
     * Returns the buffered value for a serialized key.
     *
     * @param key serialized record key
     * @return the buffered value, or {@code null} if the key is not in the index
     */
    private BufferValue getBuffered(Bytes key) {
        BufferKey bufferKey = index.get(key);
        if (bufferKey == null) {
            return null;
        }
        return sortedMap.get(bufferKey);
    }

    /**
     * Inserts or replaces the buffered value for a key and keeps the size bookkeeping consistent.
     * Does not mark the key dirty and does not touch the metrics.
     *
     * <p>When the key is already buffered, its value is replaced under the existing buffer key, so
     * {@code time} is ignored and the minimum timestamp is left unchanged.
     *
     * @param time  buffer time, used only when the key is not yet buffered
     * @param key   serialized record key
     * @param value value to buffer
     */
    private void cleanPut(long time, Bytes key, BufferValue value) {
        BufferKey existingKey = index.get(key);
        if (existingKey != null) {
            BufferValue replaced = sortedMap.put(existingKey, value);
            long addedBytes = computeRecordSize(key, value);
            long removedBytes = replaced == null ? 0L : computeRecordSize(key, replaced);
            memBufferSize = memBufferSize + addedBytes - removedBytes;
            return;
        }
        BufferKey newKey = new BufferKey(time, key);
        index.put(key, newKey);
        sortedMap.put(newKey, value);
        minTimestamp = Math.min(minTimestamp, time);
        memBufferSize += computeRecordSize(key, value);
    }

    /**
     * Returns the number of keys currently buffered.
     *
     * @return the size of the key index
     */
    @Override
    public int numRecords() {
        return index.size();
    }

    /**
     * Returns the tracked size of the buffered data.
     *
     * @return the running total maintained from {@code computeRecordSize}
     */
    @Override
    public long bufferSize() {
        return memBufferSize;
    }

    /**
     * Returns the tracked minimum buffer time.
     *
     * @return the minimum timestamp, or {@code Long.MAX_VALUE} when nothing is buffered
     */
    @Override
    public long minTimestamp() {
        return minTimestamp;
    }

    /**
     * Computes the size charged against the buffer for one entry.
     *
     * @param key   serialized record key
     * @param value buffered value, may be {@code null}
     * @return 8 bytes plus the key length, plus {@code value.sizeBytes()} when a value is present
     */
    private static long computeRecordSize(Bytes key, BufferValue value) {
        // The fixed 8 bytes presumably account for the entry's long buffer time - TODO confirm.
        long keyAndFixedBytes = 8L + (long) key.get().length;
        if (value == null) {
            return keyAndFixedBytes;
        }
        return keyAndFixedBytes + value.sizeBytes();
    }

    /** Records the current buffer size and the current number of buffered keys on their sensors. */
    private void updateBufferMetrics() {
        double currentBytes = (double) memBufferSize;
        bufferSizeSensor.record(currentBytes);
        double currentCount = (double) index.size();
        bufferCountSensor.record(currentCount);
    }

    /**
     * Renders the buffer's configuration, bookkeeping and full contents for debugging.
     *
     * @return a multi-line description including the dirty keys, the index and the sorted map
     */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("InMemoryTimeOrderedKeyValueBuffer{storeName='");
        text.append(storeName).append('\'');
        text.append(", changelogTopic='").append(changelogTopic).append('\'');
        text.append(", open=").append(open);
        text.append(", loggingEnabled=").append(loggingEnabled);
        text.append(", minTimestamp=").append(minTimestamp);
        text.append(", memBufferSize=").append(memBufferSize);
        text.append(", \n\tdirtyKeys=").append(dirtyKeys);
        text.append(", \n\tindex=").append(index);
        text.append(", \n\tsortedMap=").append(sortedMap);
        text.append('}');
        return text.toString();
    }

    /**
     * {@link StoreBuilder} for {@link InMemoryTimeOrderedKeyValueBuffer}.
     *
     * <p>Logging is enabled by default and can only be switched off; caching settings are accepted
     * but have no effect.
     *
     * @param <K> key type
     * @param <V> value type
     */
    public static class Builder<K, V>
    implements StoreBuilder<InMemoryTimeOrderedKeyValueBuffer<K, V>> {
        private final String storeName;
        private final Serde<K> keySerde;
        private final Serde<V> valSerde;
        private boolean loggingEnabled = true;

        /**
         * @param storeName name of the store to build
         * @param keySerde  serde for record keys
         * @param valSerde  serde for record values
         */
        public Builder(String storeName, Serde<K> keySerde, Serde<V> valSerde) {
            this.storeName = storeName;
            this.keySerde = keySerde;
            this.valSerde = valSerde;
        }

        /**
         * No-op: the call changes nothing on this builder.
         *
         * @return this builder
         */
        @Override
        public StoreBuilder<InMemoryTimeOrderedKeyValueBuffer<K, V>> withCachingEnabled() {
            return this;
        }

        /**
         * No-op: the call changes nothing on this builder.
         *
         * @return this builder
         */
        @Override
        public StoreBuilder<InMemoryTimeOrderedKeyValueBuffer<K, V>> withCachingDisabled() {
            return this;
        }

        /**
         * Not supported by this builder.
         *
         * @param config ignored
         * @return never returns normally
         * @throws UnsupportedOperationException always
         */
        @Override
        public StoreBuilder<InMemoryTimeOrderedKeyValueBuffer<K, V>> withLoggingEnabled(Map<String, String> config) {
            throw new UnsupportedOperationException();
        }

        /**
         * Turns changelog logging off for the store that will be built.
         *
         * @return this builder
         */
        @Override
        public StoreBuilder<InMemoryTimeOrderedKeyValueBuffer<K, V>> withLoggingDisabled() {
            this.loggingEnabled = false;
            return this;
        }

        /**
         * Creates a new buffer from the current builder settings.
         *
         * @return a new, uninitialised buffer
         */
        @Override
        public InMemoryTimeOrderedKeyValueBuffer<K, V> build() {
            // Diamond instead of a raw type so the result is checked against <K, V>.
            return new InMemoryTimeOrderedKeyValueBuffer<>(this.storeName, this.loggingEnabled, this.keySerde, this.valSerde);
        }

        /**
         * @return always an empty map
         */
        @Override
        public Map<String, String> logConfig() {
            return Collections.emptyMap();
        }

        /**
         * @return {@code true} unless {@link #withLoggingDisabled()} has been called
         */
        @Override
        public boolean loggingEnabled() {
            return this.loggingEnabled;
        }

        /**
         * @return the store name given to the constructor
         */
        @Override
        public String name() {
            return this.storeName;
        }
    }
}

