/*
 * Decompiled with CFR 0.152.
 */
package kafka.tools;

import com.typesafe.scalalogging.LazyLogging;
import com.typesafe.scalalogging.Logger;
import java.text.SimpleDateFormat;
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import kafka.consumer.Consumer$;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerConnector;
import kafka.consumer.KafkaStream;
import kafka.tools.ConsumerPerformance;
import kafka.tools.ConsumerPerformance$;
import kafka.utils.ToolsUtils$;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.TopicPartition;
import scala.Function1;
import scala.MatchError;
import scala.Predef;
import scala.Predef$;
import scala.Serializable;
import scala.Tuple2;
import scala.collection.Iterable;
import scala.collection.JavaConverters$;
import scala.collection.Map;
import scala.collection.Seq;
import scala.collection.immutable.List;
import scala.collection.immutable.List$;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.StringOps;
import scala.collection.mutable.StringBuilder;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.LongRef;
import scala.runtime.ObjectRef;
import scala.runtime.RichInt$;

public final class ConsumerPerformance$
implements LazyLogging {
    public static final ConsumerPerformance$ MODULE$;
    private final Logger logger;
    private volatile boolean bitmap$0;

    // Instantiates the singleton when the class is initialised; the private
    // constructor stores the new instance in MODULE$.
    static {
        new ConsumerPerformance$();
    }

    /**
     * Slow path for {@link #logger()}: initialises the logger field at most once
     * while holding this instance's monitor, then returns it.
     *
     * @return the logger stored in the {@code logger} field
     */
    private synchronized Logger logger$lzycompute() {
        // Re-check the flag under the lock so only the first caller initialises.
        if (!this.bitmap$0) {
            this.logger = LazyLogging.class.logger((LazyLogging)this);
            this.bitmap$0 = true;
        }
        return this.logger;
    }

    /**
     * Returns the logger, initialising it through {@code logger$lzycompute()}
     * when the {@code bitmap$0} flag has not been set yet.
     *
     * @return the logger for this object
     */
    public Logger logger() {
        if (this.bitmap$0) {
            // Fast path: already initialised, no locking needed.
            return this.logger;
        }
        return this.logger$lzycompute();
    }

    /**
     * Runs the consumer performance test.
     *
     * Builds a {@code ConsumerPerfConfig} from {@code args}, optionally prints the
     * header, then consumes using either the old consumer (one
     * {@code ConsumerPerfThread} per message stream) or the new
     * {@code KafkaConsumer} (via {@link #consume}). Afterwards it prints one summary
     * row unless detailed stats are enabled, and prints the consumer metrics when
     * they were collected.
     *
     * @param args command-line arguments handed to the {@code ConsumerPerfConfig} constructor
     */
    public void main(String[] args) {
        BoxedUnit boxedUnit;
        ConsumerPerformance.ConsumerPerfConfig config = new ConsumerPerformance.ConsumerPerfConfig(args);
        if (this.logger().underlying().isInfoEnabled()) {
            this.logger().underlying().info("Starting consumer...");
            boxedUnit = BoxedUnit.UNIT;
        } else {
            boxedUnit = BoxedUnit.UNIT;
        }
        // Totals shared with the worker threads (old consumer) or filled in by consume() (new consumer).
        AtomicLong totalMessagesRead = new AtomicLong(0L);
        AtomicLong totalBytesRead = new AtomicLong(0L);
        AtomicBoolean consumerTimeout = new AtomicBoolean(false);
        // Stays null unless the new-consumer path runs with printMetrics enabled.
        scala.collection.mutable.Map metrics = null;
        AtomicLong joinGroupTimeInMs = new AtomicLong(0L);
        if (!config.hideHeader()) {
            this.printHeader(config.showDetailedStats(), config.useOldConsumer());
        }
        long startMs = 0L;
        long endMs = 0L;
        if (config.useOldConsumer()) {
            BoxedUnit boxedUnit2;
            BoxedUnit boxedUnit3;
            ConsumerConfig consumerConfig = new ConsumerConfig(config.props());
            ConsumerConnector consumerConnector = Consumer$.MODULE$.create(consumerConfig);
            // Requests message streams for the single entry topic -> numThreads.
            Map<String, List<KafkaStream<byte[], byte[]>>> topicMessageStreams = consumerConnector.createMessageStreams((Map<String, Object>)((Map)Predef$.MODULE$.Map().apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)config.topic()), (Object)BoxesRunTime.boxToInteger((int)config.numThreads()))}))));
            ObjectRef threadList = ObjectRef.create((Object)Nil$.MODULE$);
            // For every stream list, create one ConsumerPerfThread per stream index and
            // add it to threadList (decompiled closure; captured values arrive as fields).
            topicMessageStreams.values().foreach((Function1)new Serializable(config, totalMessagesRead, totalBytesRead, consumerTimeout, threadList){
                public static final long serialVersionUID = 0L;
                public final ConsumerPerformance.ConsumerPerfConfig config$1;
                public final AtomicLong totalMessagesRead$1;
                public final AtomicLong totalBytesRead$1;
                public final AtomicBoolean consumerTimeout$1;
                public final ObjectRef threadList$1;

                public final void apply(List<KafkaStream<byte[], byte[]>> streamList) {
                    RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), streamList.length()).foreach$mVc$sp((Function1)new Serializable(this, streamList){
                        public static final long serialVersionUID = 0L;
                        private final /* synthetic */ anonfun.main.1 $outer;
                        private final List streamList$1;

                        public final void apply(int i) {
                            this.apply$mcVI$sp(i);
                        }

                        public void apply$mcVI$sp(int i) {
                            // The thread is named "kafka-zk-consumer-" + i and reads stream i of this list.
                            this.$outer.threadList$1.elem = ((List)this.$outer.threadList$1.elem).$colon$colon((Object)new ConsumerPerformance.ConsumerPerfThread(i, new StringBuilder().append((Object)"kafka-zk-consumer-").append((Object)BoxesRunTime.boxToInteger((int)i)).toString(), (KafkaStream)this.streamList$1.apply(i), this.$outer.config$1, this.$outer.totalMessagesRead$1, this.$outer.totalBytesRead$1, this.$outer.consumerTimeout$1));
                        }
                        {
                            if ($outer == null) {
                                throw null;
                            }
                            this.$outer = $outer;
                            this.streamList$1 = streamList$1;
                        }
                    });
                }
                {
                    this.config$1 = config$1;
                    this.totalMessagesRead$1 = totalMessagesRead$1;
                    this.totalBytesRead$1 = totalBytesRead$1;
                    this.consumerTimeout$1 = consumerTimeout$1;
                    this.threadList$1 = threadList$1;
                }
            });
            if (this.logger().underlying().isInfoEnabled()) {
                this.logger().underlying().info("Sleeping for 1 second.");
                boxedUnit3 = BoxedUnit.UNIT;
            } else {
                boxedUnit3 = BoxedUnit.UNIT;
            }
            Thread.sleep(1000L);
            if (this.logger().underlying().isInfoEnabled()) {
                this.logger().underlying().info("starting threads");
                boxedUnit2 = BoxedUnit.UNIT;
            } else {
                boxedUnit2 = BoxedUnit.UNIT;
            }
            // The measured window starts just before the worker threads are started.
            startMs = System.currentTimeMillis();
            ((List)threadList.elem).foreach((Function1)new Serializable(){
                public static final long serialVersionUID = 0L;

                public final void apply(ConsumerPerformance.ConsumerPerfThread thread) {
                    thread.start();
                }
            });
            // Block until every worker thread has finished.
            ((List)threadList.elem).foreach((Function1)new Serializable(){
                public static final long serialVersionUID = 0L;

                public final void apply(ConsumerPerformance.ConsumerPerfThread thread) {
                    thread.join();
                }
            });
            // When the consumerTimeout flag is set, the configured consumer timeout is
            // subtracted from the end time (presumably so the idle wait before the
            // timeout fired is not counted - confirm against ConsumerPerfThread).
            endMs = consumerTimeout.get() ? System.currentTimeMillis() - (long)consumerConfig.consumerTimeoutMs() : System.currentTimeMillis();
            consumerConnector.shutdown();
        } else {
            KafkaConsumer consumer = new KafkaConsumer(config.props());
            // NOTE(review): consume() subscribes again to the same topic, this time with a rebalance listener.
            consumer.subscribe(Collections.singletonList(config.topic()));
            startMs = System.currentTimeMillis();
            // Reads up to config.numMessages() records; 1000L is the idle timeout in ms passed to consume().
            this.consume((KafkaConsumer<byte[], byte[]>)consumer, (List<String>)List$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{config.topic()})), config.numMessages(), 1000L, config, totalMessagesRead, totalBytesRead, joinGroupTimeInMs, startMs);
            endMs = System.currentTimeMillis();
            if (config.printMetrics()) {
                metrics = (scala.collection.mutable.Map)JavaConverters$.MODULE$.mapAsScalaMapConverter(consumer.metrics()).asScala();
            }
            // NOTE(review): not reached if consume() throws; there is no try/finally here.
            consumer.close();
        }
        double elapsedSecs = (double)(endMs - startMs) / 1000.0;
        // Fetch time is the measured window minus the accumulated join time.
        long fetchTimeInMs = endMs - startMs - joinGroupTimeInMs.get();
        if (!config.showDetailedStats()) {
            // 0x100000 = 1024 * 1024, i.e. bytes per MB.
            double totalMBRead = (double)totalBytesRead.get() * 1.0 / (double)0x100000;
            // NOTE(review): elapsedSecs and fetchTimeInMs can be zero; the double divisions
            // below then yield NaN/Infinity in the printed row instead of throwing.
            Predef$.MODULE$.print((Object)new StringOps(Predef$.MODULE$.augmentString("%s, %s, %.4f, %.4f, %d, %.4f")).format((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{config.dateFormat().format(BoxesRunTime.boxToLong((long)startMs)), config.dateFormat().format(BoxesRunTime.boxToLong((long)endMs)), BoxesRunTime.boxToDouble((double)totalMBRead), BoxesRunTime.boxToDouble((double)(totalMBRead / elapsedSecs)), BoxesRunTime.boxToLong((long)totalMessagesRead.get()), BoxesRunTime.boxToDouble((double)((double)totalMessagesRead.get() / elapsedSecs))})));
            if (!config.useOldConsumer()) {
                // Extra columns for the new consumer: join time, fetch time and fetch-only rates.
                Predef$.MODULE$.print((Object)new StringOps(Predef$.MODULE$.augmentString(", %d, %d, %.4f, %.4f")).format((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToLong((long)joinGroupTimeInMs.get()), BoxesRunTime.boxToLong((long)fetchTimeInMs), BoxesRunTime.boxToDouble((double)(totalMBRead / ((double)fetchTimeInMs / 1000.0))), BoxesRunTime.boxToDouble((double)((double)totalMessagesRead.get() / ((double)fetchTimeInMs / 1000.0)))})));
            }
            Predef$.MODULE$.println();
        }
        if (metrics != null) {
            ToolsUtils$.MODULE$.printMetrics((scala.collection.mutable.Map<MetricName, ? extends Metric>)metrics);
        }
    }

    /**
     * Prints the column header row for the statistics output.
     *
     * @param showDetailedStats selects the per-interval header ({@code time, threadId, ...})
     *                          instead of the summary header ({@code start.time, end.time, ...})
     * @param useOldConsumer    when {@code false}, the rebalance/fetch columns reported for
     *                          the new consumer are appended
     */
    public void printHeader(boolean showDetailedStats, boolean useOldConsumer) {
        String leadingColumns = showDetailedStats
            ? "time, threadId, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec"
            : "start.time, end.time, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec";
        String newConsumerColumns = useOldConsumer
            ? ""
            : ", rebalance.time.ms, fetch.time.ms, fetch.MB.sec, fetch.nMsg.sec";
        Predef$.MODULE$.println((Object)(leadingColumns + newConsumerColumns));
    }

    /**
     * Consumes records with the new consumer and records how much was read.
     *
     * Subscribes to {@code topics} with a rebalance listener that measures join time,
     * then polls until either {@code count} messages have been read or more than
     * {@code timeout} ms have passed since the last non-empty poll. Key and value
     * sizes are accumulated per record, and progress is reported once the configured
     * reporting interval has elapsed.
     *
     * @param consumer          consumer to subscribe and poll
     * @param topics            topics to subscribe to
     * @param count             stop once at least this many messages have been read
     * @param timeout           maximum idle time in ms since the last poll that returned records
     * @param config            supplies reportingInterval, showDetailedStats and dateFormat
     * @param totalMessagesRead set to the number of messages read when the loop ends
     * @param totalBytesRead    set to the number of key+value bytes read when the loop ends
     * @param joinTime          accumulates time between partitions revoked and partitions assigned
     * @param testStartTime     not referenced in this method body
     */
    public void consume(KafkaConsumer<byte[], byte[]> consumer, List<String> topics, long count, long timeout, ConsumerPerformance.ConsumerPerfConfig config, AtomicLong totalMessagesRead, AtomicLong totalBytesRead, AtomicLong joinTime, long testStartTime) {
        // Mutable counters are boxed in LongRef so the closures below can update them.
        LongRef bytesRead = LongRef.create((long)0L);
        LongRef messagesRead = LongRef.create((long)0L);
        LongRef lastBytesRead = LongRef.create((long)0L);
        LongRef lastMessagesRead = LongRef.create((long)0L);
        LongRef joinStart = LongRef.create((long)0L);
        LongRef joinTimeMsInSingleRound = LongRef.create((long)0L);
        consumer.subscribe((Collection)JavaConverters$.MODULE$.seqAsJavaListConverter(topics).asJava(), new ConsumerRebalanceListener(joinTime, joinStart, joinTimeMsInSingleRound){
            private final AtomicLong joinTime$1;
            private final LongRef joinStart$1;
            private final LongRef joinTimeMsInSingleRound$1;

            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                // Adds the time since the last revoke to both the overall and the per-interval join time.
                // NOTE(review): joinStart starts at 0, so this assumes onPartitionsRevoked ran first - confirm.
                this.joinTime$1.addAndGet(System.currentTimeMillis() - this.joinStart$1.elem);
                this.joinTimeMsInSingleRound$1.elem += System.currentTimeMillis() - this.joinStart$1.elem;
            }

            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                // Marks the start of a rebalance.
                this.joinStart$1.elem = System.currentTimeMillis();
            }
            {
                this.joinTime$1 = joinTime$1;
                this.joinStart$1 = joinStart$1;
                this.joinTimeMsInSingleRound$1 = joinTimeMsInSingleRound$1;
            }
        });
        // Zero-timeout poll followed by seekToBeginning with an empty collection;
        // presumably this triggers the initial assignment and rewinds all assigned
        // partitions - TODO confirm against the KafkaConsumer version in use.
        consumer.poll(0L);
        consumer.seekToBeginning(Collections.emptyList());
        long startMs = System.currentTimeMillis();
        LongRef lastReportTime = LongRef.create((long)startMs);
        long lastConsumedTime = System.currentTimeMillis();
        LongRef currentTimeMillis = LongRef.create((long)lastConsumedTime);
        // Keep polling until enough messages were read or the consumer has been idle for more than `timeout` ms.
        while (messagesRead.elem < count && currentTimeMillis.elem - lastConsumedTime <= timeout) {
            Iterable records = (Iterable)JavaConverters$.MODULE$.iterableAsScalaIterableConverter((java.lang.Iterable)consumer.poll(100L)).asScala();
            // The clock is sampled once per poll; every record of this batch sees the same timestamp.
            currentTimeMillis.elem = System.currentTimeMillis();
            if (records.nonEmpty()) {
                // Only a non-empty poll resets the idle timer.
                lastConsumedTime = currentTimeMillis.elem;
            }
            records.foreach((Function1)new Serializable(config, bytesRead, messagesRead, lastBytesRead, lastMessagesRead, joinTimeMsInSingleRound, lastReportTime, currentTimeMillis){
                public static final long serialVersionUID = 0L;
                private final ConsumerPerformance.ConsumerPerfConfig config$2;
                private final LongRef bytesRead$1;
                private final LongRef messagesRead$1;
                private final LongRef lastBytesRead$1;
                private final LongRef lastMessagesRead$1;
                private final LongRef joinTimeMsInSingleRound$1;
                private final LongRef lastReportTime$1;
                private final LongRef currentTimeMillis$1;

                public final void apply(ConsumerRecord<byte[], byte[]> record2) {
                    ++this.messagesRead$1.elem;
                    // Null keys/values contribute no bytes.
                    if (record2.key() != null) {
                        this.bytesRead$1.elem += (long)Predef$.MODULE$.byteArrayOps((byte[])record2.key()).size();
                    }
                    if (record2.value() != null) {
                        this.bytesRead$1.elem += (long)Predef$.MODULE$.byteArrayOps((byte[])record2.value()).size();
                    }
                    if (this.currentTimeMillis$1.elem - this.lastReportTime$1.elem >= (long)this.config$2.reportingInterval()) {
                        if (this.config$2.showDetailedStats()) {
                            // Thread id is always reported as 0 on this path.
                            ConsumerPerformance$.MODULE$.printNewConsumerProgress(0, this.bytesRead$1.elem, this.lastBytesRead$1.elem, this.messagesRead$1.elem, this.lastMessagesRead$1.elem, this.lastReportTime$1.elem, this.currentTimeMillis$1.elem, this.config$2.dateFormat(), this.joinTimeMsInSingleRound$1.elem);
                        }
                        // Start a new reporting interval, whether or not a row was printed.
                        this.joinTimeMsInSingleRound$1.elem = 0L;
                        this.lastReportTime$1.elem = this.currentTimeMillis$1.elem;
                        this.lastMessagesRead$1.elem = this.messagesRead$1.elem;
                        this.lastBytesRead$1.elem = this.bytesRead$1.elem;
                    }
                }
                {
                    this.config$2 = config$2;
                    this.bytesRead$1 = bytesRead$1;
                    this.messagesRead$1 = messagesRead$1;
                    this.lastBytesRead$1 = lastBytesRead$1;
                    this.lastMessagesRead$1 = lastMessagesRead$1;
                    this.joinTimeMsInSingleRound$1 = joinTimeMsInSingleRound$1;
                    this.lastReportTime$1 = lastReportTime$1;
                    this.currentTimeMillis$1 = currentTimeMillis$1;
                }
            });
        }
        // Publish the final totals to the caller-supplied counters.
        totalMessagesRead.set(messagesRead.elem);
        totalBytesRead.set(bytesRead.elem);
    }

    /**
     * Prints one progress row for the old consumer: the basic columns written by
     * {@code printBasicProgress}, followed by a line break.
     *
     * @param id               identifier printed in the thread-id column
     * @param bytesRead        total bytes read so far
     * @param lastBytesRead    total bytes read at the previous report
     * @param messagesRead     total messages read so far
     * @param lastMessagesRead total messages read at the previous report
     * @param startMs          start of the reporting interval, in epoch ms
     * @param endMs            end of the reporting interval, in epoch ms
     * @param dateFormat       formatter used for the timestamp column
     */
    public void printOldConsumerProgress(int id, long bytesRead, long lastBytesRead, long messagesRead, long lastMessagesRead, long startMs, long endMs, SimpleDateFormat dateFormat) {
        this.printBasicProgress(id, bytesRead, lastBytesRead, messagesRead, lastMessagesRead, startMs, endMs, dateFormat);
        // Terminate the row.
        Predef$.MODULE$.println();
    }

    /**
     * Prints one progress row for the new consumer: the basic columns, then the
     * join-time/fetch-time columns written by {@code printExtendedProgress},
     * followed by a line break.
     *
     * @param id                   identifier printed in the thread-id column
     * @param bytesRead            total bytes read so far
     * @param lastBytesRead        total bytes read at the previous report
     * @param messagesRead         total messages read so far
     * @param lastMessagesRead     total messages read at the previous report
     * @param startMs              start of the reporting interval, in epoch ms
     * @param endMs                end of the reporting interval, in epoch ms
     * @param dateFormat           formatter used for the timestamp column
     * @param periodicJoinTimeInMs join time accumulated during this interval, in ms
     */
    public void printNewConsumerProgress(int id, long bytesRead, long lastBytesRead, long messagesRead, long lastMessagesRead, long startMs, long endMs, SimpleDateFormat dateFormat, long periodicJoinTimeInMs) {
        this.printBasicProgress(id, bytesRead, lastBytesRead, messagesRead, lastMessagesRead, startMs, endMs, dateFormat);
        this.printExtendedProgress(bytesRead, lastBytesRead, messagesRead, lastMessagesRead, startMs, endMs, periodicJoinTimeInMs);
        // Terminate the row.
        Predef$.MODULE$.println();
    }

    /**
     * Prints the basic progress columns (no trailing line break): timestamp of
     * {@code endMs}, id, total MB read, MB/sec for the interval, total messages
     * read, and messages/sec for the interval.
     *
     * When the interval is not positive ({@code endMs <= startMs}) both rates are
     * reported as 0.0 instead of dividing by zero, which would print
     * {@code NaN}/{@code Infinity}. This matches the guard in
     * {@code printExtendedProgress}.
     *
     * @param id               identifier printed in the thread-id column
     * @param bytesRead        total bytes read so far
     * @param lastBytesRead    total bytes read at the previous report
     * @param messagesRead     total messages read so far
     * @param lastMessagesRead total messages read at the previous report
     * @param startMs          start of the reporting interval, in epoch ms
     * @param endMs            end of the reporting interval, in epoch ms
     * @param dateFormat       formatter used for the timestamp column
     */
    private void printBasicProgress(int id, long bytesRead, long lastBytesRead, long messagesRead, long lastMessagesRead, long startMs, long endMs, SimpleDateFormat dateFormat) {
        long elapsedMs = endMs - startMs;
        // 0x100000 = 1024 * 1024, i.e. bytes per MB.
        double totalMbRead = (double)bytesRead * 1.0 / (double)0x100000;
        double intervalMbPerSec = 0.0;
        double intervalMessagesPerSec = 0.0;
        if (elapsedMs > 0L) {
            double intervalMbRead = (double)(bytesRead - lastBytesRead) * 1.0 / (double)0x100000;
            intervalMbPerSec = 1000.0 * intervalMbRead / (double)elapsedMs;
            intervalMessagesPerSec = (double)(messagesRead - lastMessagesRead) / (double)elapsedMs * 1000.0;
        }
        Predef$.MODULE$.print((Object)new StringOps(Predef$.MODULE$.augmentString("%s, %d, %.4f, %.4f, %d, %.4f")).format((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{dateFormat.format(BoxesRunTime.boxToLong((long)endMs)), BoxesRunTime.boxToInteger((int)id), BoxesRunTime.boxToDouble((double)totalMbRead), BoxesRunTime.boxToDouble((double)intervalMbPerSec), BoxesRunTime.boxToLong((long)messagesRead), BoxesRunTime.boxToDouble((double)intervalMessagesPerSec)})));
    }

    /**
     * Prints the additional new-consumer columns (no trailing line break): join time
     * for the interval, fetch time, and MB/sec and messages/sec computed over the
     * fetch time only.
     *
     * Fetch time is the interval length minus {@code periodicJoinTimeInMs}; when it
     * is not positive both rates are reported as 0.0.
     *
     * @param bytesRead            total bytes read so far
     * @param lastBytesRead        total bytes read at the previous report
     * @param messagesRead         total messages read so far
     * @param lastMessagesRead     total messages read at the previous report
     * @param startMs              start of the reporting interval, in epoch ms
     * @param endMs                end of the reporting interval, in epoch ms
     * @param periodicJoinTimeInMs join time accumulated during this interval, in ms
     */
    private void printExtendedProgress(long bytesRead, long lastBytesRead, long messagesRead, long lastMessagesRead, long startMs, long endMs, long periodicJoinTimeInMs) {
        long fetchTimeMs = endMs - startMs - periodicJoinTimeInMs;
        double intervalMbRead = (double)(bytesRead - lastBytesRead) * 1.0 / (double)0x100000;
        long intervalMessagesRead = messagesRead - lastMessagesRead;
        double intervalMbPerSec;
        double intervalMessagesPerSec;
        if (fetchTimeMs > 0L) {
            intervalMbPerSec = 1000.0 * intervalMbRead / (double)fetchTimeMs;
            intervalMessagesPerSec = 1000.0 * (double)intervalMessagesRead / (double)fetchTimeMs;
        } else {
            // No measurable fetch time: report zero rates rather than dividing by zero.
            intervalMbPerSec = 0.0;
            intervalMessagesPerSec = 0.0;
        }
        Predef$.MODULE$.print((Object)new StringOps(Predef$.MODULE$.augmentString(", %d, %d, %.4f, %.4f")).format((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToLong((long)periodicJoinTimeInMs), BoxesRunTime.boxToLong((long)fetchTimeMs), BoxesRunTime.boxToDouble((double)intervalMbPerSec), BoxesRunTime.boxToDouble((double)intervalMessagesPerSec)})));
    }

    /**
     * Private constructor, invoked once from the static initializer. Stores the new
     * instance in {@code MODULE$} and then runs the {@code LazyLogging} trait
     * initializer on it.
     */
    private ConsumerPerformance$() {
        // MODULE$ is assigned before the trait initializer runs.
        MODULE$ = this;
        LazyLogging.class.$init$((LazyLogging)this);
    }
}

