/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.trogdor.workload;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import java.util.Collection;
import java.util.HashSet;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.internals.KafkaFutureImpl;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.trogdor.common.JsonUtil;
import org.apache.kafka.trogdor.common.Platform;
import org.apache.kafka.trogdor.common.ThreadUtils;
import org.apache.kafka.trogdor.common.WorkerUtils;
import org.apache.kafka.trogdor.task.TaskWorker;
import org.apache.kafka.trogdor.task.WorkerStatusTracker;
import org.apache.kafka.trogdor.workload.ConsumeBenchSpec;
import org.apache.kafka.trogdor.workload.Histogram;
import org.apache.kafka.trogdor.workload.PartitionsSpec;
import org.apache.kafka.trogdor.workload.Throttle;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ConsumeBenchWorker
implements TaskWorker {
    private static final Logger log = LoggerFactory.getLogger(ConsumeBenchWorker.class);
    private static final int THROTTLE_PERIOD_MS = 100;
    private final String id;
    private final ConsumeBenchSpec spec;
    private final AtomicBoolean running = new AtomicBoolean(false);
    private ScheduledExecutorService executor;
    private WorkerStatusTracker status;
    private KafkaFutureImpl<String> doneFuture;
    private KafkaConsumer<byte[], byte[]> consumer;

    public ConsumeBenchWorker(String id, ConsumeBenchSpec spec) {
        this.id = id;
        this.spec = spec;
    }

    @Override
    public void start(Platform platform, WorkerStatusTracker status, KafkaFutureImpl<String> doneFuture) throws Exception {
        if (!this.running.compareAndSet(false, true)) {
            throw new IllegalStateException("ConsumeBenchWorker is already running.");
        }
        log.info("{}: Activating ConsumeBenchWorker with {}", (Object)this.id, (Object)this.spec);
        this.executor = Executors.newScheduledThreadPool(2, ThreadUtils.createThreadFactory("ConsumeBenchWorkerThread%d", false));
        this.status = status;
        this.doneFuture = doneFuture;
        this.executor.submit(new Prepare());
    }

    @Override
    public void stop(Platform platform) throws Exception {
        if (!this.running.compareAndSet(true, false)) {
            throw new IllegalStateException("ConsumeBenchWorker is not running.");
        }
        log.info("{}: Deactivating ConsumeBenchWorker.", (Object)this.id);
        this.doneFuture.complete((Object)"");
        this.executor.shutdownNow();
        this.executor.awaitTermination(1L, TimeUnit.DAYS);
        Utils.closeQuietly(this.consumer, (String)"consumer");
        this.consumer = null;
        this.executor = null;
        this.status = null;
        this.doneFuture = null;
    }

    public static class StatusData {
        private final long totalMessagesReceived;
        private final long totalBytesReceived;
        private final long averageMessageSizeBytes;
        private final float averageLatencyMs;
        private final int p50LatencyMs;
        private final int p95LatencyMs;
        private final int p99LatencyMs;
        static final float[] PERCENTILES = new float[]{0.5f, 0.95f, 0.99f};

        @JsonCreator
        StatusData(@JsonProperty(value="totalMessagesReceived") long totalMessagesReceived, @JsonProperty(value="totalBytesReceived") long totalBytesReceived, @JsonProperty(value="averageMessageSizeBytes") long averageMessageSizeBytes, @JsonProperty(value="averageLatencyMs") float averageLatencyMs, @JsonProperty(value="p50LatencyMs") int p50latencyMs, @JsonProperty(value="p95LatencyMs") int p95latencyMs, @JsonProperty(value="p99LatencyMs") int p99latencyMs) {
            this.totalMessagesReceived = totalMessagesReceived;
            this.totalBytesReceived = totalBytesReceived;
            this.averageMessageSizeBytes = averageMessageSizeBytes;
            this.averageLatencyMs = averageLatencyMs;
            this.p50LatencyMs = p50latencyMs;
            this.p95LatencyMs = p95latencyMs;
            this.p99LatencyMs = p99latencyMs;
        }

        @JsonProperty
        public long totalMessagesReceived() {
            return this.totalMessagesReceived;
        }

        @JsonProperty
        public long totalBytesReceived() {
            return this.totalBytesReceived;
        }

        @JsonProperty
        public long averageMessageSizeBytes() {
            return this.averageMessageSizeBytes;
        }

        @JsonProperty
        public float averageLatencyMs() {
            return this.averageLatencyMs;
        }

        @JsonProperty
        public int p50LatencyMs() {
            return this.p50LatencyMs;
        }

        @JsonProperty
        public int p95LatencyMs() {
            return this.p95LatencyMs;
        }

        @JsonProperty
        public int p99LatencyMs() {
            return this.p99LatencyMs;
        }
    }

    public class StatusUpdater
    implements Runnable {
        private final Histogram latencyHistogram;
        private final Histogram messageSizeHistogram;

        StatusUpdater(Histogram latencyHistogram, Histogram messageSizeHistogram) {
            this.latencyHistogram = latencyHistogram;
            this.messageSizeHistogram = messageSizeHistogram;
        }

        @Override
        public void run() {
            try {
                this.update();
            }
            catch (Exception e) {
                WorkerUtils.abort(log, "StatusUpdater", e, (KafkaFutureImpl<String>)ConsumeBenchWorker.this.doneFuture);
            }
        }

        StatusData update() {
            Histogram.Summary latSummary = this.latencyHistogram.summarize(StatusData.PERCENTILES);
            Histogram.Summary msgSummary = this.messageSizeHistogram.summarize(StatusData.PERCENTILES);
            StatusData statusData = new StatusData(latSummary.numSamples(), (long)((float)msgSummary.numSamples() * msgSummary.average()), (long)msgSummary.average(), latSummary.average(), latSummary.percentiles().get(0).value(), latSummary.percentiles().get(1).value(), latSummary.percentiles().get(2).value());
            ConsumeBenchWorker.this.status.update(JsonUtil.JSON_SERDE.valueToTree((Object)statusData));
            log.info("Status={}", (Object)JsonUtil.toJsonString(statusData));
            return statusData;
        }
    }

    public class ConsumeMessages
    implements Callable<Void> {
        private final Histogram latencyHistogram = new Histogram(5000);
        private final Histogram messageSizeHistogram = new Histogram(0x200000);
        private final Future<?> statusUpdaterFuture;
        private final Throttle throttle;

        ConsumeMessages(Collection<TopicPartition> topicPartitions) {
            this.statusUpdaterFuture = ConsumeBenchWorker.this.executor.scheduleAtFixedRate(new StatusUpdater(this.latencyHistogram, this.messageSizeHistogram), 1L, 1L, TimeUnit.MINUTES);
            Properties props = new Properties();
            props.put("bootstrap.servers", ConsumeBenchWorker.this.spec.bootstrapServers());
            props.put("client.id", "consumer." + ConsumeBenchWorker.this.id);
            props.put("group.id", "consumer-group-1");
            props.put("auto.offset.reset", "earliest");
            props.put("max.poll.interval.ms", (Object)100000);
            WorkerUtils.addConfigsToProperties(props, ConsumeBenchWorker.this.spec.commonClientConf(), ConsumeBenchWorker.this.spec.consumerConf());
            ConsumeBenchWorker.this.consumer = new KafkaConsumer(props, (Deserializer)new ByteArrayDeserializer(), (Deserializer)new ByteArrayDeserializer());
            ConsumeBenchWorker.this.consumer.assign(topicPartitions);
            int perPeriod = WorkerUtils.perSecToPerPeriod(ConsumeBenchWorker.this.spec.targetMessagesPerSec(), 100L);
            this.throttle = new Throttle(perPeriod, 100);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public Void call() throws Exception {
            long startTimeMs;
            long messagesConsumed = 0L;
            long bytesConsumed = 0L;
            long startBatchMs = startTimeMs = Time.SYSTEM.milliseconds();
            try {
                while (messagesConsumed < (long)ConsumeBenchWorker.this.spec.maxMessages()) {
                    ConsumerRecords records = ConsumeBenchWorker.this.consumer.poll(50L);
                    if (records.isEmpty()) continue;
                    long endBatchMs = Time.SYSTEM.milliseconds();
                    long elapsedBatchMs = endBatchMs - startBatchMs;
                    for (ConsumerRecord record : records) {
                        ++messagesConsumed;
                        long messageBytes = 0L;
                        if (record.key() != null) {
                            messageBytes += (long)record.serializedKeySize();
                        }
                        if (record.value() != null) {
                            messageBytes += (long)record.serializedValueSize();
                        }
                        this.latencyHistogram.add(elapsedBatchMs);
                        this.messageSizeHistogram.add(messageBytes);
                        bytesConsumed += messageBytes;
                        this.throttle.increment();
                    }
                    startBatchMs = Time.SYSTEM.milliseconds();
                }
                this.statusUpdaterFuture.cancel(false);
            }
            catch (Exception e) {
                try {
                    WorkerUtils.abort(log, "ConsumeRecords", e, (KafkaFutureImpl<String>)ConsumeBenchWorker.this.doneFuture);
                    this.statusUpdaterFuture.cancel(false);
                }
                catch (Throwable throwable) {
                    this.statusUpdaterFuture.cancel(false);
                    StatusData statusData = new StatusUpdater(this.latencyHistogram, this.messageSizeHistogram).update();
                    long curTimeMs = Time.SYSTEM.milliseconds();
                    log.info("Consumed total number of messages={}, bytes={} in {} ms.  status: {}", new Object[]{messagesConsumed, bytesConsumed, curTimeMs - startTimeMs, statusData});
                    throw throwable;
                }
                StatusData statusData = new StatusUpdater(this.latencyHistogram, this.messageSizeHistogram).update();
                long curTimeMs = Time.SYSTEM.milliseconds();
                log.info("Consumed total number of messages={}, bytes={} in {} ms.  status: {}", new Object[]{messagesConsumed, bytesConsumed, curTimeMs - startTimeMs, statusData});
            }
            StatusData statusData = new StatusUpdater(this.latencyHistogram, this.messageSizeHistogram).update();
            long curTimeMs = Time.SYSTEM.milliseconds();
            log.info("Consumed total number of messages={}, bytes={} in {} ms.  status: {}", new Object[]{messagesConsumed, bytesConsumed, curTimeMs - startTimeMs, statusData});
            ConsumeBenchWorker.this.doneFuture.complete((Object)"");
            return null;
        }
    }

    public class Prepare
    implements Runnable {
        @Override
        public void run() {
            try {
                HashSet<TopicPartition> partitions = new HashSet<TopicPartition>();
                for (Map.Entry<String, PartitionsSpec> entry : ConsumeBenchWorker.this.spec.activeTopics().materialize().entrySet()) {
                    for (Integer partitionNumber : entry.getValue().partitionNumbers()) {
                        partitions.add(new TopicPartition(entry.getKey(), partitionNumber.intValue()));
                    }
                }
                log.info("Will consume from {}", partitions);
                ConsumeBenchWorker.this.executor.submit(new ConsumeMessages(partitions));
            }
            catch (Throwable e) {
                WorkerUtils.abort(log, "Prepare", e, (KafkaFutureImpl<String>)ConsumeBenchWorker.this.doneFuture);
            }
        }
    }
}

