/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.kstream.internals.foreignkeyjoin;

import java.nio.ByteBuffer;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.internals.Change;
import org.apache.kafka.streams.kstream.internals.foreignkeyjoin.CombinedKey;
import org.apache.kafka.streams.kstream.internals.foreignkeyjoin.CombinedKeySchema;
import org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionResponseWrapper;
import org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionWrapper;
import org.apache.kafka.streams.processor.api.ContextualProcessor;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.api.RecordMetadata;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.TimestampedKeyValueStore;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Supplies processors that, for every incoming record, look up the entries of a
 * subscription store whose combined key starts with the record key's prefix and
 * forward one {@link SubscriptionResponseWrapper} per matching entry.
 *
 * @param <K>  key type of the forwarded records (the primary key extracted from each
 *             {@link CombinedKey})
 * @param <KO> key type of the incoming records
 * @param <VO> value type carried inside the incoming {@link Change} and inside the
 *             forwarded {@link SubscriptionResponseWrapper}
 */
public class ForeignJoinSubscriptionProcessorSupplier<K, KO, VO>
implements ProcessorSupplier<KO, Change<VO>, K, SubscriptionResponseWrapper<VO>> {
    private static final Logger LOG = LoggerFactory.getLogger(ForeignJoinSubscriptionProcessorSupplier.class);
    private final StoreBuilder<TimestampedKeyValueStore<Bytes, SubscriptionWrapper<K>>> storeBuilder;
    private final CombinedKeySchema<KO, K> keySchema;

    /**
     * @param storeBuilder builder of the subscription store that each processor resolves in {@code init}
     * @param keySchema    schema used to derive scan prefixes from record keys and to decode stored keys
     */
    public ForeignJoinSubscriptionProcessorSupplier(StoreBuilder<TimestampedKeyValueStore<Bytes, SubscriptionWrapper<K>>> storeBuilder, CombinedKeySchema<KO, K> keySchema) {
        this.storeBuilder = storeBuilder;
        this.keySchema = keySchema;
    }

    /**
     * @return a new processor instance backed by this supplier's store builder and key schema
     */
    @Override
    public Processor<KO, Change<VO>, K, SubscriptionResponseWrapper<VO>> get() {
        return new KTableKTableJoinProcessor();
    }

    /**
     * Processor that prefix-scans the subscription store for each record and forwards
     * the record's new value to every primary key found under that prefix.
     */
    private final class KTableKTableJoinProcessor
    extends ContextualProcessor<KO, Change<VO>, K, SubscriptionResponseWrapper<VO>> {
        private Sensor droppedRecordsSensor;
        private TimestampedKeyValueStore<Bytes, SubscriptionWrapper<K>> store;

        private KTableKTableJoinProcessor() {
        }

        /**
         * Registers the dropped-records sensor for the current thread/task and resolves
         * the subscription store from the supplier's store builder.
         *
         * @param context processor context; it is cast to {@link InternalProcessorContext},
         *                so any other implementation fails with a {@link ClassCastException}
         */
        @Override
        public void init(ProcessorContext<K, SubscriptionResponseWrapper<VO>> context) {
            super.init(context);
            final InternalProcessorContext<?, ?> internalContext = (InternalProcessorContext<?, ?>) context;
            droppedRecordsSensor = TaskMetrics.droppedRecordsSensor(
                Thread.currentThread().getName(),
                internalContext.taskId().toString(),
                internalContext.metrics());
            // The typed store builder lets getStateStore return the exact store type, no cast required.
            store = internalContext.getStateStore(storeBuilder);
        }

        /**
         * Forwards one response per subscription stored under the record key's prefix.
         * Records with a {@code null} key are logged, counted on the dropped-records
         * sensor, and skipped.
         *
         * @param record incoming record; its {@code value().newValue} is copied into every response
         */
        @Override
        public void process(Record<KO, Change<VO>> record) {
            if (record.key() == null) {
                if (context().recordMetadata().isPresent()) {
                    final RecordMetadata recordMetadata = context().recordMetadata().get();
                    LOG.warn("Skipping record due to null key. topic=[{}] partition=[{}] offset=[{}]",
                        recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset());
                } else {
                    LOG.warn("Skipping record due to null key. Topic, partition, and offset not known.");
                }
                droppedRecordsSensor.record();
                return;
            }

            final Bytes prefixBytes = keySchema.prefixBytes(record.key());
            try (KeyValueIterator<Bytes, ValueAndTimestamp<SubscriptionWrapper<K>>> prefixScanResults =
                     store.range(prefixBytes, Bytes.increment(prefixBytes))) {
                while (prefixScanResults.hasNext()) {
                    final KeyValue<Bytes, ValueAndTimestamp<SubscriptionWrapper<K>>> next = prefixScanResults.next();
                    // The scan's upper bound is the incremented prefix, so re-check each key against
                    // the prefix before using it (presumably because the range end can be inclusive).
                    if (prefixEquals(next.key.get(), prefixBytes.get())) {
                        final CombinedKey<KO, K> combinedKey = keySchema.fromBytes(next.key);
                        final SubscriptionWrapper<K> subscription = next.value.value();
                        final SubscriptionResponseWrapper<VO> response = new SubscriptionResponseWrapper<>(
                            subscription.getHash(),
                            record.value().newValue,
                            subscription.getPrimaryPartition());
                        context().forward(record.withKey(combinedKey.getPrimaryKey()).withValue(response));
                    }
                }
            }
        }

        /**
         * Compares the leading {@code min(x.length, y.length)} bytes of two arrays.
         *
         * @param x first byte array
         * @param y second byte array
         * @return {@code true} if the compared leading bytes are identical (trivially so when
         *         either array is empty)
         */
        private boolean prefixEquals(byte[] x, byte[] y) {
            final int min = Math.min(x.length, y.length);
            final ByteBuffer xSlice = ByteBuffer.wrap(x, 0, min);
            final ByteBuffer ySlice = ByteBuffer.wrap(y, 0, min);
            return xSlice.equals(ySlice);
        }
    }
}

