/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.common.record;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.OptionalLong;
import java.util.function.BiFunction;
import java.util.function.Supplier;
import java.util.stream.Stream;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.CorruptRecordException;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.message.LeaderChangeMessage;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.ControlRecordType;
import org.apache.kafka.common.record.ControlRecordUtils;
import org.apache.kafka.common.record.DefaultRecordBatch;
import org.apache.kafka.common.record.EndTransactionMarker;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.MemoryRecordsBuilder;
import org.apache.kafka.common.record.MutableRecordBatch;
import org.apache.kafka.common.record.Record;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.record.SimpleRecord;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.utils.BufferSupplier;
import org.apache.kafka.common.utils.CloseableIterator;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtensionContext;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.ArgumentsProvider;
import org.junit.jupiter.params.provider.ArgumentsSource;

public class MemoryRecordsTest {
    // Wall-clock time captured when the test instance is created. It is handed to the
    // MemoryRecordsBuilder constructor in testIterator and reused as the record
    // timestamp in testHasRoomForMethodWithHeaders.
    private final long logAppendTime = System.currentTimeMillis();

    /**
     * Appends six records (covering a null key, a null value, and both null) through a
     * {@link MemoryRecordsBuilder} and verifies the batch- and record-level metadata
     * exposed when iterating the built {@link MemoryRecords}.
     *
     * <p>The batches are walked twice so that a second pass over the same instance is
     * checked as well. Each pass must visit every appended record; without that check
     * an empty or truncated iteration would pass silently.
     *
     * @param args magic, compression and producer/offset settings for this run
     */
    @ParameterizedTest
    @ArgumentsSource(value=MemoryRecordsArgumentsProvider.class)
    public void testIterator(Args args) {
        CompressionType compression = args.compression;
        byte magic = args.magic;
        long pid = args.pid;
        short epoch = args.epoch;
        int firstSequence = args.firstSequence;
        long firstOffset = args.firstOffset;
        int partitionLeaderEpoch = 998;

        ByteBuffer buffer = ByteBuffer.allocate(1024);
        MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, magic, compression, TimestampType.CREATE_TIME, firstOffset, this.logAppendTime, pid, epoch, firstSequence, false, false, partitionLeaderEpoch, buffer.limit());

        SimpleRecord[] records = new SimpleRecord[]{
            new SimpleRecord(1L, "a".getBytes(), "1".getBytes()),
            new SimpleRecord(2L, "b".getBytes(), "2".getBytes()),
            new SimpleRecord(3L, "c".getBytes(), "3".getBytes()),
            new SimpleRecord(4L, null, "4".getBytes()),
            new SimpleRecord(5L, "d".getBytes(), null),
            new SimpleRecord(6L, (byte[])null, null)
        };
        for (SimpleRecord record : records) {
            builder.append(record);
        }

        MemoryRecords memoryRecords = builder.build();
        for (int iteration = 0; iteration < 2; ++iteration) {
            // Running count of records seen in this pass; also the index of the next expected record.
            int total = 0;
            for (RecordBatch batch : memoryRecords.batches()) {
                Assertions.assertTrue(batch.isValid());
                Assertions.assertEquals(compression, batch.compressionType());
                Assertions.assertEquals(firstOffset + total, batch.baseOffset());
                if (magic >= 2) {
                    // With magic 2 and above the batch is expected to carry the producer metadata.
                    Assertions.assertEquals(pid, batch.producerId());
                    Assertions.assertEquals(epoch, batch.producerEpoch());
                    Assertions.assertEquals(firstSequence + total, batch.baseSequence());
                    Assertions.assertEquals(partitionLeaderEpoch, batch.partitionLeaderEpoch());
                    Assertions.assertEquals(records.length, batch.countOrNull().intValue());
                    Assertions.assertEquals(TimestampType.CREATE_TIME, batch.timestampType());
                    Assertions.assertEquals(records[records.length - 1].timestamp(), batch.maxTimestamp());
                } else {
                    // Older magic values are expected to report the "not set" sentinels.
                    Assertions.assertEquals(-1L, batch.producerId());
                    Assertions.assertEquals((short) -1, batch.producerEpoch());
                    Assertions.assertEquals(-1, batch.baseSequence());
                    Assertions.assertEquals(-1, batch.partitionLeaderEpoch());
                    Assertions.assertNull(batch.countOrNull());
                    if (magic == 0) {
                        Assertions.assertEquals(TimestampType.NO_TIMESTAMP_TYPE, batch.timestampType());
                    } else {
                        Assertions.assertEquals(TimestampType.CREATE_TIME, batch.timestampType());
                    }
                }

                int recordCount = 0;
                for (Record record : batch) {
                    record.ensureValid();
                    Assertions.assertTrue(record.hasMagic(batch.magic()));
                    Assertions.assertFalse(record.isCompressed());
                    Assertions.assertEquals(firstOffset + total, record.offset());
                    Assertions.assertEquals(records[total].key(), record.key());
                    Assertions.assertEquals(records[total].value(), record.value());
                    if (magic >= 2) {
                        Assertions.assertEquals(firstSequence + total, record.sequence());
                    }
                    Assertions.assertFalse(record.hasTimestampType(TimestampType.LOG_APPEND_TIME));
                    if (magic == 0) {
                        Assertions.assertEquals(-1L, record.timestamp());
                        Assertions.assertFalse(record.hasTimestampType(TimestampType.CREATE_TIME));
                        Assertions.assertTrue(record.hasTimestampType(TimestampType.NO_TIMESTAMP_TYPE));
                    } else {
                        Assertions.assertEquals(records[total].timestamp(), record.timestamp());
                        Assertions.assertFalse(record.hasTimestampType(TimestampType.NO_TIMESTAMP_TYPE));
                        if (magic < 2) {
                            Assertions.assertTrue(record.hasTimestampType(TimestampType.CREATE_TIME));
                        } else {
                            Assertions.assertFalse(record.hasTimestampType(TimestampType.CREATE_TIME));
                        }
                    }
                    ++total;
                    ++recordCount;
                }
                Assertions.assertEquals(batch.baseOffset() + recordCount - 1L, batch.lastOffset());
            }
            // Guard against a vacuous pass: every appended record must have been iterated.
            Assertions.assertEquals(records.length, total, "Unexpected number of records iterated in pass " + iteration);
        }
    }

    /**
     * Checks that {@code hasRoomFor} reports room for a small record while the builder
     * is open and reports none once the builder has been closed.
     *
     * @param args magic and compression for this run
     */
    @ParameterizedTest
    @ArgumentsSource(MemoryRecordsArgumentsProvider.class)
    public void testHasRoomForMethod(Args args) {
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, args.magic, args.compression, TimestampType.CREATE_TIME, 0L);
        builder.append(0L, "a".getBytes(), "1".getBytes());

        byte[] nextKey = "b".getBytes();
        byte[] nextValue = "2".getBytes();

        // Open builder with a mostly empty 1 KiB buffer: the next record must fit.
        Assertions.assertTrue(builder.hasRoomFor(1L, nextKey, nextValue, Record.EMPTY_HEADERS));

        // After close() the same query must be rejected.
        builder.close();
        Assertions.assertFalse(builder.hasRoomFor(1L, nextKey, nextValue, Record.EMPTY_HEADERS));
    }

    /**
     * Checks how record headers influence {@code hasRoomFor} on a 120-byte buffer that
     * already holds one record.
     *
     * <p>A header-less record is always expected to fit. With ten headers attached the
     * record is expected to fit only for magic values below 2.
     *
     * @param args magic and compression for this run
     */
    @ParameterizedTest
    @ArgumentsSource(MemoryRecordsArgumentsProvider.class)
    public void testHasRoomForMethodWithHeaders(Args args) {
        final byte magic = args.magic;
        final byte[] key = "key".getBytes();
        final byte[] value = "value".getBytes();

        MemoryRecordsBuilder builder = MemoryRecords.builder(ByteBuffer.allocate(120), magic, args.compression, TimestampType.CREATE_TIME, 0L);
        builder.append(this.logAppendTime, key, value);

        RecordHeaders headers = new RecordHeaders();
        int remaining = 10;
        while (remaining-- > 0) {
            headers.add("hello", "world.world".getBytes());
        }

        Assertions.assertTrue(builder.hasRoomFor(this.logAppendTime, key, value, Record.EMPTY_HEADERS));
        if (magic >= 2) {
            Assertions.assertFalse(builder.hasRoomFor(this.logAppendTime, key, value, headers.toArray()));
        } else {
            Assertions.assertTrue(builder.hasRoomFor(this.logAppendTime, key, value, headers.toArray()));
        }
    }

    /**
     * Compares the checksum of a two-record batch against hard-coded expected values
     * for each magic value, for the uncompressed and LZ4 cases.
     *
     * <p>The expected values are tied to the exact key/value bytes, so the strings are
     * encoded explicitly as UTF-8 instead of relying on the platform default charset.
     *
     * @param args magic and compression for this run; other compression types return
     *             early without asserting anything
     */
    @ParameterizedTest
    @ArgumentsSource(value=MemoryRecordsArgumentsProvider.class)
    public void testChecksum(Args args) {
        CompressionType compression = args.compression;
        byte magic = args.magic;
        // Only uncompressed and LZ4 have expected checksums recorded below.
        if (compression != CompressionType.NONE && compression != CompressionType.LZ4) {
            return;
        }

        SimpleRecord[] records = new SimpleRecord[]{
            new SimpleRecord(283843L, "key1".getBytes(StandardCharsets.UTF_8), "value1".getBytes(StandardCharsets.UTF_8)),
            new SimpleRecord(1234L, "key2".getBytes(StandardCharsets.UTF_8), "value2".getBytes(StandardCharsets.UTF_8))
        };
        RecordBatch batch = MemoryRecords.withRecords(magic, compression, records).batches().iterator().next();

        boolean uncompressed = compression == CompressionType.NONE;
        long expectedChecksum;
        if (magic == 0) {
            expectedChecksum = uncompressed ? 1978725405L : 66944826L;
        } else if (magic == 1) {
            expectedChecksum = uncompressed ? 109425508L : 1407303399L;
        } else {
            expectedChecksum = uncompressed ? 3851219455L : 2745969314L;
        }
        Assertions.assertEquals(expectedChecksum, batch.checksum(), "Unexpected checksum for magic " + magic + " and compression type " + compression);
    }

    /**
     * Filters a three-record batch down to its non-null-key records and checks the
     * partition leader epoch reported by the single surviving batch.
     *
     * <p>For magic 2 and above the epoch given to the builder (67) must be preserved;
     * for lower magic values -1 is expected.
     *
     * @param args magic and compression for this run
     */
    @ParameterizedTest
    @ArgumentsSource(MemoryRecordsArgumentsProvider.class)
    public void testFilterToPreservesPartitionLeaderEpoch(Args args) {
        final byte magic = args.magic;
        final int partitionLeaderEpoch = 67;

        ByteBuffer buffer = ByteBuffer.allocate(2048);
        MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, magic, args.compression, TimestampType.CREATE_TIME, 0L, -1L, partitionLeaderEpoch);
        builder.append(10L, null, "a".getBytes());
        builder.append(11L, "1".getBytes(), "b".getBytes());
        builder.append(12L, null, "c".getBytes());
        MemoryRecords source = builder.build();

        ByteBuffer filtered = ByteBuffer.allocate(2048);
        source.filterTo(new TopicPartition("foo", 0), new RetainNonNullKeysFilter(), filtered, Integer.MAX_VALUE, BufferSupplier.NO_CACHING);
        filtered.flip();

        List<MutableRecordBatch> batches = TestUtils.toList(MemoryRecords.readableRecords(filtered).batches());
        Assertions.assertEquals(1, batches.size());

        int expectedEpoch = magic < 2 ? -1 : partitionLeaderEpoch;
        Assertions.assertEquals(expectedEpoch, batches.get(0).partitionLeaderEpoch());
    }

    /**
     * Filters every record out of a two-record producer batch while asking for the
     * empty batch to be retained, for both transactional and non-transactional batches.
     *
     * <p>For magic values below 2 the test only checks that building such a batch is
     * rejected with {@link IllegalArgumentException}: at builder creation when
     * transactional, otherwise at {@code close()}.
     *
     * @param args magic and compression for this run
     */
    @ParameterizedTest
    @ArgumentsSource(MemoryRecordsArgumentsProvider.class)
    public void testFilterToEmptyBatchRetention(Args args) {
        final byte magic = args.magic;
        final long producerId = 23L;
        final short producerEpoch = 5;
        final long baseOffset = 3L;
        final int baseSequence = 10;
        final int partitionLeaderEpoch = 293;
        final int numRecords = 2;
        final long lastOffset = baseOffset + 1L;

        for (boolean isTransactional : Arrays.asList(true, false)) {
            ByteBuffer buffer = ByteBuffer.allocate(2048);
            Supplier<MemoryRecordsBuilder> supplier = () -> MemoryRecords.builder(buffer, magic, args.compression, TimestampType.CREATE_TIME, baseOffset, -1L, producerId, producerEpoch, baseSequence, isTransactional, partitionLeaderEpoch);

            if (isTransactional && magic < 2) {
                // Creating a transactional builder for this magic value must fail up front.
                Assertions.assertThrows(IllegalArgumentException.class, supplier::get);
                continue;
            }

            MemoryRecordsBuilder builder = supplier.get();
            builder.append(11L, "2".getBytes(), "b".getBytes());
            builder.append(12L, "3".getBytes(), "c".getBytes());
            if (magic < 2) {
                // Producer metadata with this magic value is rejected when the batch is closed.
                Assertions.assertThrows(IllegalArgumentException.class, builder::close);
                continue;
            }
            builder.close();
            MemoryRecords records = builder.build();

            // Drops every record but asks for the emptied batch itself to be kept.
            MemoryRecords.RecordFilter dropRecordsKeepBatch = new MemoryRecords.RecordFilter(0L, 0L) {
                @Override
                protected MemoryRecords.RecordFilter.BatchRetentionResult checkBatchRetention(RecordBatch batch) {
                    return new MemoryRecords.RecordFilter.BatchRetentionResult(MemoryRecords.RecordFilter.BatchRetention.RETAIN_EMPTY, false);
                }

                @Override
                protected boolean shouldRetainRecord(RecordBatch recordBatch, Record record) {
                    return false;
                }
            };
            ByteBuffer filtered = ByteBuffer.allocate(2048);
            MemoryRecords.FilterResult filterResult = records.filterTo(new TopicPartition("foo", 0), dropRecordsKeepBatch, filtered, Integer.MAX_VALUE, BufferSupplier.NO_CACHING);

            // Filter statistics.
            Assertions.assertEquals(numRecords, filterResult.messagesRead());
            Assertions.assertEquals(records.sizeInBytes(), filterResult.bytesRead());
            Assertions.assertEquals(lastOffset, filterResult.maxOffset());
            Assertions.assertEquals(0, filterResult.messagesRetained());
            Assertions.assertEquals(61, filterResult.bytesRetained());
            Assertions.assertEquals(12L, filterResult.maxTimestamp());
            Assertions.assertEquals(lastOffset, filterResult.shallowOffsetOfMaxTimestamp());

            // The retained empty batch must still describe the original offset/sequence range.
            filtered.flip();
            List<MutableRecordBatch> batches = TestUtils.toList(MemoryRecords.readableRecords(filtered).batches());
            Assertions.assertEquals(1, batches.size());
            MutableRecordBatch batch = batches.get(0);
            Assertions.assertEquals(0, batch.countOrNull().intValue());
            Assertions.assertEquals(12L, batch.maxTimestamp());
            Assertions.assertEquals(TimestampType.CREATE_TIME, batch.timestampType());
            Assertions.assertEquals(baseOffset, batch.baseOffset());
            Assertions.assertEquals(lastOffset, batch.lastOffset());
            Assertions.assertEquals(baseSequence, batch.baseSequence());
            Assertions.assertEquals(baseSequence + 1, batch.lastSequence());
            Assertions.assertEquals(isTransactional, batch.isTransactional());
        }
    }

    /**
     * Writes a lone empty batch header into a 61-byte buffer, filters it with a filter
     * that requests {@code RETAIN_EMPTY}, and checks that the header is copied to the
     * output unchanged in size and that the filter statistics reflect zero records.
     */
    @Test
    public void testEmptyBatchRetention() {
        final int emptyBatchSize = 61;
        final long producerId = 23L;
        final short producerEpoch = 5;
        final long baseOffset = 3L;
        final int baseSequence = 10;
        final int partitionLeaderEpoch = 293;
        final long timestamp = System.currentTimeMillis();

        ByteBuffer buffer = ByteBuffer.allocate(emptyBatchSize);
        // Base offset and last offset are both baseOffset: the header spans a single offset.
        DefaultRecordBatch.writeEmptyHeader(buffer, (byte) 2, producerId, producerEpoch, baseSequence, baseOffset, baseOffset, partitionLeaderEpoch, TimestampType.CREATE_TIME, timestamp, false, false);
        buffer.flip();
        MemoryRecords records = MemoryRecords.readableRecords(buffer);

        MemoryRecords.RecordFilter keepEmptyBatch = new MemoryRecords.RecordFilter(0L, 0L) {
            @Override
            protected MemoryRecords.RecordFilter.BatchRetentionResult checkBatchRetention(RecordBatch batch) {
                return new MemoryRecords.RecordFilter.BatchRetentionResult(MemoryRecords.RecordFilter.BatchRetention.RETAIN_EMPTY, false);
            }

            @Override
            protected boolean shouldRetainRecord(RecordBatch recordBatch, Record record) {
                return false;
            }
        };
        ByteBuffer filtered = ByteBuffer.allocate(2048);
        MemoryRecords.FilterResult filterResult = records.filterTo(new TopicPartition("foo", 0), keepEmptyBatch, filtered, Integer.MAX_VALUE, BufferSupplier.NO_CACHING);

        Assertions.assertEquals(0, filterResult.messagesRead());
        Assertions.assertEquals(records.sizeInBytes(), filterResult.bytesRead());
        Assertions.assertEquals(baseOffset, filterResult.maxOffset());
        Assertions.assertEquals(0, filterResult.messagesRetained());
        Assertions.assertEquals(emptyBatchSize, filterResult.bytesRetained());
        Assertions.assertEquals(timestamp, filterResult.maxTimestamp());
        Assertions.assertEquals(baseOffset, filterResult.shallowOffsetOfMaxTimestamp());
        // Something must actually have been written to the output buffer.
        Assertions.assertTrue(filterResult.outputBuffer().position() > 0);

        filtered.flip();
        MemoryRecords filteredRecords = MemoryRecords.readableRecords(filtered);
        Assertions.assertEquals(emptyBatchSize, filteredRecords.sizeInBytes());
    }

    /**
     * Writes a lone empty batch header and filters it with {@code DELETE} and with
     * {@code DELETE_EMPTY} batch retention; in both cases nothing may be written to the
     * output buffer.
     */
    @Test
    public void testEmptyBatchDeletion() {
        final long producerId = 23L;
        final short producerEpoch = 5;
        final long baseOffset = 3L;
        final int baseSequence = 10;
        final int partitionLeaderEpoch = 293;

        for (final MemoryRecords.RecordFilter.BatchRetention deleteRetention : Arrays.asList(MemoryRecords.RecordFilter.BatchRetention.DELETE, MemoryRecords.RecordFilter.BatchRetention.DELETE_EMPTY)) {
            long timestamp = System.currentTimeMillis();
            ByteBuffer buffer = ByteBuffer.allocate(61);
            DefaultRecordBatch.writeEmptyHeader(buffer, (byte) 2, producerId, producerEpoch, baseSequence, baseOffset, baseOffset, partitionLeaderEpoch, TimestampType.CREATE_TIME, timestamp, false, false);
            buffer.flip();
            MemoryRecords records = MemoryRecords.readableRecords(buffer);

            // Applies the retention policy under test to every batch and keeps no records.
            MemoryRecords.RecordFilter deletingFilter = new MemoryRecords.RecordFilter(0L, 0L) {
                @Override
                protected MemoryRecords.RecordFilter.BatchRetentionResult checkBatchRetention(RecordBatch batch) {
                    return new MemoryRecords.RecordFilter.BatchRetentionResult(deleteRetention, false);
                }

                @Override
                protected boolean shouldRetainRecord(RecordBatch recordBatch, Record record) {
                    return false;
                }
            };
            ByteBuffer filtered = ByteBuffer.allocate(2048);
            MemoryRecords.FilterResult filterResult = records.filterTo(new TopicPartition("foo", 0), deletingFilter, filtered, Integer.MAX_VALUE, BufferSupplier.NO_CACHING);

            Assertions.assertEquals(0, filterResult.outputBuffer().position());
            filtered.flip();
            Assertions.assertEquals(0, MemoryRecords.readableRecords(filtered).sizeInBytes());
        }
    }

    /**
     * Builds a COMMIT end-transaction marker via
     * {@code MemoryRecords.withEndTransactionMarker} and checks that it yields exactly
     * one valid control batch with one record that deserializes back to the same marker.
     */
    @Test
    public void testBuildEndTxnMarker() {
        final long producerId = 73L;
        final short producerEpoch = 13;
        final long initialOffset = 983L;
        final int coordinatorEpoch = 347;
        final int partitionLeaderEpoch = 29;

        EndTransactionMarker marker = new EndTransactionMarker(ControlRecordType.COMMIT, coordinatorEpoch);
        MemoryRecords records = MemoryRecords.withEndTransactionMarker(initialOffset, System.currentTimeMillis(), partitionLeaderEpoch, producerId, producerEpoch, marker);

        // The backing buffer must be exactly as large as its readable content.
        ByteBuffer backing = records.buffer();
        Assertions.assertEquals(backing.remaining(), records.buffer().capacity());

        List<MutableRecordBatch> batches = TestUtils.toList(records.batches());
        Assertions.assertEquals(1, batches.size());

        RecordBatch batch = batches.get(0);
        Assertions.assertTrue(batch.isControlBatch());
        Assertions.assertEquals(producerId, batch.producerId());
        Assertions.assertEquals(producerEpoch, batch.producerEpoch());
        Assertions.assertEquals(initialOffset, batch.baseOffset());
        Assertions.assertEquals(partitionLeaderEpoch, batch.partitionLeaderEpoch());
        Assertions.assertTrue(batch.isValid());

        List<Record> createdRecords = TestUtils.toList(batch);
        Assertions.assertEquals(1, createdRecords.size());

        Record record = createdRecords.get(0);
        record.ensureValid();
        EndTransactionMarker deserializedMarker = EndTransactionMarker.deserialize(record);
        Assertions.assertEquals(ControlRecordType.COMMIT, deserializedMarker.controlType());
        Assertions.assertEquals(coordinatorEpoch, deserializedMarker.coordinatorEpoch());
    }

    /**
     * Filters a three-record batch (the middle record has a null value) with a filter
     * constructed from {@code deleteHorizon - 1} and {@code 1}, then checks that the
     * retained batch reports {@code deleteHorizon} as its delete horizon while the
     * individual record timestamps are unchanged.
     *
     * <p>The record iterator is opened in a try-with-resources block so it is closed
     * even when one of the assertions fails.
     *
     * @param args magic and compression for this run
     */
    @ParameterizedTest
    @ArgumentsSource(value=V2MemoryRecordsArgumentsProvider.class)
    public void testBaseTimestampToDeleteHorizonConversion(Args args) {
        int partitionLeaderEpoch = 998;
        ByteBuffer buffer = ByteBuffer.allocate(2048);
        MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, args.magic, args.compression, TimestampType.CREATE_TIME, 0L, -1L, partitionLeaderEpoch);
        builder.append(5L, "0".getBytes(), "0".getBytes());
        builder.append(10L, "1".getBytes(), null);
        builder.append(15L, "2".getBytes(), "2".getBytes());

        ByteBuffer filtered = ByteBuffer.allocate(2048);
        // 0x3FFFFFFF; the two filter arguments below add up to exactly this value.
        final long deleteHorizon = Integer.MAX_VALUE / 2;
        MemoryRecords.RecordFilter recordFilter = new MemoryRecords.RecordFilter(deleteHorizon - 1, 1L) {
            @Override
            protected boolean shouldRetainRecord(RecordBatch recordBatch, Record record) {
                return true;
            }

            @Override
            protected MemoryRecords.RecordFilter.BatchRetentionResult checkBatchRetention(RecordBatch batch) {
                return new MemoryRecords.RecordFilter.BatchRetentionResult(MemoryRecords.RecordFilter.BatchRetention.RETAIN_EMPTY, false);
            }
        };
        builder.build().filterTo(new TopicPartition("random", 0), recordFilter, filtered, Integer.MAX_VALUE, BufferSupplier.NO_CACHING);
        filtered.flip();

        MemoryRecords filteredRecords = MemoryRecords.readableRecords(filtered);
        List<MutableRecordBatch> batches = TestUtils.toList(filteredRecords.batches());
        Assertions.assertEquals(1, batches.size());
        Assertions.assertEquals(OptionalLong.of(deleteHorizon), batches.get(0).deleteHorizonMs());

        // Record timestamps must come back exactly as appended, in order.
        try (CloseableIterator<Record> recordIterator = batches.get(0).streamingIterator(BufferSupplier.create())) {
            Record record = recordIterator.next();
            Assertions.assertEquals(5L, record.timestamp());
            record = recordIterator.next();
            Assertions.assertEquals(10L, record.timestamp());
            record = recordIterator.next();
            Assertions.assertEquals(15L, record.timestamp());
        }
    }

    /**
     * Builds a leader-change control batch via
     * {@code MemoryRecords.withLeaderChangeMessage} and checks that it yields exactly
     * one valid control batch with one {@code LEADER_CHANGE} record that deserializes
     * back to the original leader id and single voter.
     */
    @Test
    public void testBuildLeaderChangeMessage() {
        final int leaderId = 5;
        final int leaderEpoch = 20;
        final int voterId = 6;
        final long initialOffset = 983L;

        LeaderChangeMessage.Voter voter = new LeaderChangeMessage.Voter().setVoterId(voterId);
        LeaderChangeMessage leaderChangeMessage = new LeaderChangeMessage()
            .setLeaderId(leaderId)
            .setVoters(Collections.singletonList(voter));

        ByteBuffer buffer = ByteBuffer.allocate(256);
        MemoryRecords records = MemoryRecords.withLeaderChangeMessage(initialOffset, System.currentTimeMillis(), leaderEpoch, buffer, leaderChangeMessage);

        List<MutableRecordBatch> batches = TestUtils.toList(records.batches());
        Assertions.assertEquals(1, batches.size());

        RecordBatch batch = batches.get(0);
        Assertions.assertTrue(batch.isControlBatch());
        Assertions.assertEquals(initialOffset, batch.baseOffset());
        Assertions.assertEquals(leaderEpoch, batch.partitionLeaderEpoch());
        Assertions.assertTrue(batch.isValid());

        List<Record> createdRecords = TestUtils.toList(batch);
        Assertions.assertEquals(1, createdRecords.size());

        Record record = createdRecords.get(0);
        record.ensureValid();
        Assertions.assertEquals(ControlRecordType.LEADER_CHANGE, ControlRecordType.parse(record.key()));

        LeaderChangeMessage deserializedMessage = ControlRecordUtils.deserializeLeaderChangeMessage(record);
        Assertions.assertEquals(leaderId, deserializedMessage.leaderId());
        Assertions.assertEquals(1, deserializedMessage.voters().size());
        Assertions.assertEquals(voterId, deserializedMessage.voters().get(0).voterId());
    }

    /**
     * Writes four consecutive builders' output (offsets 0, 1-2, 3-5 and 6) into one
     * buffer and filters it with a filter that returns {@code DELETE} for any batch
     * whose last offset is 2 or 6 and {@code DELETE_EMPTY} otherwise, retaining every
     * record.
     *
     * <p>The complete list of surviving batch last-offsets is compared, so the test
     * verifies which batches were discarded and not merely how many remain.
     *
     * @param args magic and compression for this run
     */
    @ParameterizedTest
    @ArgumentsSource(value=MemoryRecordsArgumentsProvider.class)
    public void testFilterToBatchDiscard(Args args) {
        CompressionType compression = args.compression;
        byte magic = args.magic;
        ByteBuffer buffer = ByteBuffer.allocate(2048);

        MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, magic, compression, TimestampType.CREATE_TIME, 0L);
        builder.append(10L, "1".getBytes(), "a".getBytes());
        builder.close();

        builder = MemoryRecords.builder(buffer, magic, compression, TimestampType.CREATE_TIME, 1L);
        builder.append(11L, "2".getBytes(), "b".getBytes());
        builder.append(12L, "3".getBytes(), "c".getBytes());
        builder.close();

        builder = MemoryRecords.builder(buffer, magic, compression, TimestampType.CREATE_TIME, 3L);
        builder.append(13L, "4".getBytes(), "d".getBytes());
        builder.append(20L, "5".getBytes(), "e".getBytes());
        builder.append(15L, "6".getBytes(), "f".getBytes());
        builder.close();

        builder = MemoryRecords.builder(buffer, magic, compression, TimestampType.CREATE_TIME, 6L);
        builder.append(16L, "7".getBytes(), "g".getBytes());
        builder.close();
        buffer.flip();

        ByteBuffer filtered = ByteBuffer.allocate(2048);
        MemoryRecords.readableRecords(buffer).filterTo(new TopicPartition("foo", 0), new MemoryRecords.RecordFilter(0L, 0L) {
            @Override
            protected MemoryRecords.RecordFilter.BatchRetentionResult checkBatchRetention(RecordBatch batch) {
                // Discard the batches ending at offsets 2 and 6 outright.
                if (batch.lastOffset() == 2L || batch.lastOffset() == 6L) {
                    return new MemoryRecords.RecordFilter.BatchRetentionResult(MemoryRecords.RecordFilter.BatchRetention.DELETE, false);
                }
                return new MemoryRecords.RecordFilter.BatchRetentionResult(MemoryRecords.RecordFilter.BatchRetention.DELETE_EMPTY, false);
            }

            @Override
            protected boolean shouldRetainRecord(RecordBatch recordBatch, Record record) {
                return true;
            }
        }, filtered, Integer.MAX_VALUE, BufferSupplier.NO_CACHING);
        filtered.flip();

        MemoryRecords filteredRecords = MemoryRecords.readableRecords(filtered);
        List<MutableRecordBatch> batches = TestUtils.toList(filteredRecords.batches());
        List<Long> retainedLastOffsets = new ArrayList<>();
        for (MutableRecordBatch batch : batches) {
            retainedLastOffsets.add(batch.lastOffset());
        }

        List<Long> expectedLastOffsets;
        if (compression != CompressionType.NONE || magic >= 2) {
            // One batch per builder: those ending at 0 and 5 survive.
            expectedLastOffsets = Arrays.asList(0L, 5L);
        } else {
            // Uncompressed with magic below 2: five single-offset batches remain, i.e.
            // everything except offsets 2 and 6.
            expectedLastOffsets = Arrays.asList(0L, 1L, 3L, 4L, 5L);
        }
        Assertions.assertEquals(expectedLastOffsets, retainedLastOffsets);
    }

    /**
     * Filters a batch whose records sit at non-contiguous offsets 5, 8 and 10 down to
     * the single record with a non-null key (offset 8) and checks the surviving
     * record and the offset range reported by its batch.
     *
     * <p>For magic 2 and above the batch is expected to keep the range 0..10; for
     * lower magic values the range collapses to 8..8. The record timestamp is expected
     * to be 11 except for magic 0, where -1 is expected.
     *
     * @param args magic and compression for this run
     */
    @ParameterizedTest
    @ArgumentsSource(MemoryRecordsArgumentsProvider.class)
    public void testFilterToAlreadyCompactedLog(Args args) {
        final byte magic = args.magic;
        final CompressionType compression = args.compression;

        ByteBuffer buffer = ByteBuffer.allocate(2048);
        MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, magic, compression, TimestampType.CREATE_TIME, 0L);
        builder.appendWithOffset(5L, 10L, null, "a".getBytes());
        builder.appendWithOffset(8L, 11L, "1".getBytes(), "b".getBytes());
        builder.appendWithOffset(10L, 12L, null, "c".getBytes());
        builder.close();
        buffer.flip();

        ByteBuffer filtered = ByteBuffer.allocate(2048);
        MemoryRecords.readableRecords(buffer).filterTo(new TopicPartition("foo", 0), new RetainNonNullKeysFilter(), filtered, Integer.MAX_VALUE, BufferSupplier.NO_CACHING);
        filtered.flip();

        List<MutableRecordBatch> batches = TestUtils.toList(MemoryRecords.readableRecords(filtered).batches());
        Assertions.assertEquals(1, batches.size());
        MutableRecordBatch batch = batches.get(0);

        List<Record> records = TestUtils.toList(batch);
        Assertions.assertEquals(1, records.size());
        Record survivor = records.get(0);
        Assertions.assertEquals(8L, survivor.offset());

        long expectedTimestamp = magic >= 1 ? 11L : -1L;
        Assertions.assertEquals(new SimpleRecord(expectedTimestamp, "1".getBytes(), "b".getBytes()), new SimpleRecord(survivor));

        boolean keepsOriginalRange = magic >= 2;
        long expectedBaseOffset = keepsOriginalRange ? 0L : 8L;
        long expectedLastOffset = keepsOriginalRange ? 10L : 8L;
        Assertions.assertEquals(expectedBaseOffset, batch.baseOffset());
        Assertions.assertEquals(expectedLastOffset, batch.lastOffset());
    }

    /**
     * Checks that {@code filterTo} carries each batch's producer id, producer epoch, sequence range and
     * transactional flag over to the filtered output.
     *
     * <p>Three batches are written into one buffer: a batch without producer metadata, a batch with
     * producer id/epoch/base sequence (the "idempotent" builder) and a transactional batch. For magic
     * values below 2 the test only asserts that supplying producer metadata is rejected with
     * {@link IllegalArgumentException}; the filtering assertions run for magic 2 and above.
     *
     * @param args supplies the magic value and compression type for this run
     */
    @ParameterizedTest
    @ArgumentsSource(value=MemoryRecordsArgumentsProvider.class)
    public void testFilterToPreservesProducerInfo(Args args) {
        byte magic = args.magic;
        CompressionType compression = args.compression;
        ByteBuffer buffer = ByteBuffer.allocate(2048);

        // Batch 1 (base offset 0): no producer metadata.
        MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, magic, compression, TimestampType.CREATE_TIME, 0L);
        builder.append(10L, null, "a".getBytes());
        builder.append(11L, "1".getBytes(), "b".getBytes());
        builder.append(12L, null, "c".getBytes());
        builder.close();

        // Batch 2 (base offset 3): producer id, epoch and base sequence set, not transactional.
        long pid1 = 23L;
        short epoch1 = 5;
        int baseSequence1 = 10;
        MemoryRecordsBuilder idempotentBuilder = MemoryRecords.builder(buffer, magic, compression, TimestampType.CREATE_TIME, 3L, -1L, pid1, epoch1, baseSequence1);
        idempotentBuilder.append(13L, null, "d".getBytes());
        idempotentBuilder.append(14L, "4".getBytes(), "e".getBytes());
        idempotentBuilder.append(15L, "5".getBytes(), "f".getBytes());
        if (magic < 2) {
            // For magic < 2, closing a builder that carries producer metadata is expected to fail.
            Assertions.assertThrows(IllegalArgumentException.class, idempotentBuilder::close);
        } else {
            idempotentBuilder.close();
        }

        // Batch 3: transactional. Note that it is created with the same base offset (3) as batch 2.
        long pid2 = 99384L;
        short epoch2 = 234;
        int baseSequence2 = 15;
        Supplier<MemoryRecordsBuilder> transactionSupplier = () -> MemoryRecords.builder(buffer, magic, compression, TimestampType.CREATE_TIME, 3L, -1L, pid2, epoch2, baseSequence2, true, -1);
        if (magic < 2) {
            // For magic < 2, even constructing the transactional builder is expected to fail.
            Assertions.assertThrows(IllegalArgumentException.class, transactionSupplier::get);
            return;
        }

        MemoryRecordsBuilder transactionalBuilder = transactionSupplier.get();
        transactionalBuilder.append(16L, "6".getBytes(), "g".getBytes());
        transactionalBuilder.append(17L, "7".getBytes(), "h".getBytes());
        transactionalBuilder.append(18L, null, "i".getBytes());
        transactionalBuilder.close();
        buffer.flip();

        // Drop every record whose key is null.
        ByteBuffer filtered = ByteBuffer.allocate(2048);
        MemoryRecords.readableRecords(buffer).filterTo(new TopicPartition("foo", 0), new RetainNonNullKeysFilter(), filtered, Integer.MAX_VALUE, BufferSupplier.NO_CACHING);
        filtered.flip();
        MemoryRecords filteredRecords = MemoryRecords.readableRecords(filtered);
        List<MutableRecordBatch> batches = TestUtils.toList(filteredRecords.batches());
        Assertions.assertEquals(3, batches.size());

        // First batch: one record kept, offsets unchanged, no producer metadata.
        MutableRecordBatch firstBatch = batches.get(0);
        Assertions.assertEquals(1, firstBatch.countOrNull().intValue());
        Assertions.assertEquals(0L, firstBatch.baseOffset());
        Assertions.assertEquals(2L, firstBatch.lastOffset());
        Assertions.assertEquals(-1L, firstBatch.producerId());
        Assertions.assertEquals((short) -1, firstBatch.producerEpoch());
        Assertions.assertEquals(-1, firstBatch.baseSequence());
        Assertions.assertEquals(-1, firstBatch.lastSequence());
        Assertions.assertFalse(firstBatch.isTransactional());
        List<Record> firstBatchRecords = TestUtils.toList(firstBatch);
        Assertions.assertEquals(1, firstBatchRecords.size());
        Assertions.assertEquals(-1, firstBatchRecords.get(0).sequence());
        Assertions.assertEquals(new SimpleRecord(11L, "1".getBytes(), "b".getBytes()), new SimpleRecord(firstBatchRecords.get(0)));

        // Second batch: two records kept; batch-level sequence range still spans all three originals.
        MutableRecordBatch secondBatch = batches.get(1);
        Assertions.assertEquals(2, secondBatch.countOrNull().intValue());
        Assertions.assertEquals(3L, secondBatch.baseOffset());
        Assertions.assertEquals(5L, secondBatch.lastOffset());
        Assertions.assertEquals(pid1, secondBatch.producerId());
        Assertions.assertEquals(epoch1, secondBatch.producerEpoch());
        Assertions.assertEquals(baseSequence1, secondBatch.baseSequence());
        Assertions.assertEquals(baseSequence1 + 2, secondBatch.lastSequence());
        Assertions.assertFalse(secondBatch.isTransactional());
        List<Record> secondBatchRecords = TestUtils.toList(secondBatch);
        Assertions.assertEquals(2, secondBatchRecords.size());
        Assertions.assertEquals(baseSequence1 + 1, secondBatchRecords.get(0).sequence());
        Assertions.assertEquals(new SimpleRecord(14L, "4".getBytes(), "e".getBytes()), new SimpleRecord(secondBatchRecords.get(0)));
        Assertions.assertEquals(baseSequence1 + 2, secondBatchRecords.get(1).sequence());
        Assertions.assertEquals(new SimpleRecord(15L, "5".getBytes(), "f".getBytes()), new SimpleRecord(secondBatchRecords.get(1)));

        // Third batch: two records kept and the transactional flag is preserved.
        MutableRecordBatch thirdBatch = batches.get(2);
        Assertions.assertEquals(2, thirdBatch.countOrNull().intValue());
        Assertions.assertEquals(3L, thirdBatch.baseOffset());
        Assertions.assertEquals(5L, thirdBatch.lastOffset());
        Assertions.assertEquals(pid2, thirdBatch.producerId());
        Assertions.assertEquals(epoch2, thirdBatch.producerEpoch());
        Assertions.assertEquals(baseSequence2, thirdBatch.baseSequence());
        Assertions.assertEquals(baseSequence2 + 2, thirdBatch.lastSequence());
        Assertions.assertTrue(thirdBatch.isTransactional());
        List<Record> thirdBatchRecords = TestUtils.toList(thirdBatch);
        Assertions.assertEquals(2, thirdBatchRecords.size());
        Assertions.assertEquals(baseSequence2, thirdBatchRecords.get(0).sequence());
        Assertions.assertEquals(new SimpleRecord(16L, "6".getBytes(), "g".getBytes()), new SimpleRecord(thirdBatchRecords.get(0)));
        Assertions.assertEquals(baseSequence2 + 1, thirdBatchRecords.get(1).sequence());
        Assertions.assertEquals(new SimpleRecord(17L, "7".getBytes(), "h".getBytes()), new SimpleRecord(thirdBatchRecords.get(1)));
    }

    /**
     * Exercises {@code filterTo} with a 64-byte output buffer while the input contains records with
     * 128-byte values, so the supplied output buffer cannot hold everything that is retained.
     *
     * <p>The input is consumed in a loop: each pass filters the remaining input, advances the input
     * position by {@code bytesRead()} and collects the records found in {@code outputBuffer()}. At the
     * end all five records with a non-null key must have been collected.
     *
     * @param args supplies the magic value and compression type for this run
     */
    @ParameterizedTest
    @ArgumentsSource(value=MemoryRecordsArgumentsProvider.class)
    public void testFilterToWithUndersizedBuffer(Args args) {
        byte magic = args.magic;
        CompressionType compression = args.compression;
        ByteBuffer buffer = ByteBuffer.allocate(1024);

        MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, magic, compression, TimestampType.CREATE_TIME, 0L);
        builder.append(10L, null, "a".getBytes());
        builder.close();

        builder = MemoryRecords.builder(buffer, magic, compression, TimestampType.CREATE_TIME, 1L);
        builder.append(11L, "1".getBytes(), new byte[128]);
        builder.append(12L, "2".getBytes(), "c".getBytes());
        builder.append(13L, null, "d".getBytes());
        builder.close();

        builder = MemoryRecords.builder(buffer, magic, compression, TimestampType.CREATE_TIME, 4L);
        builder.append(14L, null, "e".getBytes());
        builder.append(15L, "5".getBytes(), "f".getBytes());
        builder.append(16L, "6".getBytes(), "g".getBytes());
        builder.close();

        builder = MemoryRecords.builder(buffer, magic, compression, TimestampType.CREATE_TIME, 7L);
        builder.append(17L, "7".getBytes(), new byte[128]);
        builder.close();
        buffer.flip();

        ByteBuffer output = ByteBuffer.allocate(64);
        // Typed list: the enhanced for loop below iterates Record elements, which a raw list cannot provide.
        List<Record> records = new ArrayList<>();
        while (buffer.hasRemaining()) {
            output.rewind();
            MemoryRecords.FilterResult result = MemoryRecords.readableRecords(buffer).filterTo(new TopicPartition("foo", 0), new RetainNonNullKeysFilter(), output, Integer.MAX_VALUE, BufferSupplier.NO_CACHING);
            buffer.position(buffer.position() + result.bytesRead());
            result.outputBuffer().flip();
            // When the result comes back in a different buffer, the supplied one must be untouched.
            if (output != result.outputBuffer()) {
                Assertions.assertEquals(0, output.position());
            }
            MemoryRecords filtered = MemoryRecords.readableRecords(result.outputBuffer());
            records.addAll(TestUtils.toList(filtered.records()));
        }

        // Keys "1", "2", "5", "6" and "7" are the only non-null keys written above.
        Assertions.assertEquals(5, records.size());
        for (Record record : records) {
            Assertions.assertNotNull(record.key());
        }
    }

    /**
     * End-to-end check of {@code filterTo} with a filter that keeps only records having a key.
     *
     * <p>Four batches holding seven records are written; four records have a non-null key. The test
     * verifies the {@code FilterResult} counters, the max-timestamp bookkeeping (magic &gt; 0 only), the
     * offsets/timestamps of the resulting batches, and finally the offset, timestamp, key and value of
     * every retained record.
     *
     * @param args supplies the magic value and compression type for this run
     */
    @ParameterizedTest
    @ArgumentsSource(value=MemoryRecordsArgumentsProvider.class)
    public void testFilterTo(Args args) {
        byte magic = args.magic;
        CompressionType compression = args.compression;
        ByteBuffer buffer = ByteBuffer.allocate(2048);

        MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, magic, compression, TimestampType.CREATE_TIME, 0L);
        builder.append(10L, null, "a".getBytes());
        builder.close();

        builder = MemoryRecords.builder(buffer, magic, compression, TimestampType.CREATE_TIME, 1L);
        builder.append(11L, "1".getBytes(), "b".getBytes());
        builder.append(12L, null, "c".getBytes());
        builder.close();

        builder = MemoryRecords.builder(buffer, magic, compression, TimestampType.CREATE_TIME, 3L);
        builder.append(13L, null, "d".getBytes());
        // Deliberately out-of-order timestamp: 20 is the maximum and sits in the middle of the batch.
        builder.append(20L, "4".getBytes(), "e".getBytes());
        builder.append(15L, "5".getBytes(), "f".getBytes());
        builder.close();

        builder = MemoryRecords.builder(buffer, magic, compression, TimestampType.CREATE_TIME, 6L);
        builder.append(16L, "6".getBytes(), "g".getBytes());
        builder.close();
        buffer.flip();

        ByteBuffer filtered = ByteBuffer.allocate(2048);
        MemoryRecords.FilterResult result = MemoryRecords.readableRecords(buffer).filterTo(new TopicPartition("foo", 0), new RetainNonNullKeysFilter(), filtered, Integer.MAX_VALUE, BufferSupplier.NO_CACHING);
        filtered.flip();

        Assertions.assertEquals(7, result.messagesRead());
        Assertions.assertEquals(4, result.messagesRetained());
        Assertions.assertEquals(buffer.limit(), result.bytesRead());
        Assertions.assertEquals(filtered.limit(), result.bytesRetained());
        if (magic > 0) {
            Assertions.assertEquals(20L, result.maxTimestamp());
            // Uncompressed pre-v2 output reports the record's own offset (4); otherwise the
            // last offset of the containing batch (5) is expected.
            if (compression == CompressionType.NONE && magic < 2) {
                Assertions.assertEquals(4L, result.shallowOffsetOfMaxTimestamp());
            } else {
                Assertions.assertEquals(5L, result.shallowOffsetOfMaxTimestamp());
            }
        }

        MemoryRecords filteredRecords = MemoryRecords.readableRecords(filtered);
        List<MutableRecordBatch> batches = TestUtils.toList(filteredRecords.batches());

        final List<Long> expectedStartOffsets;
        final List<Long> expectedEndOffsets;
        final List<Long> expectedMaxTimestamps;
        if (magic < 2 && compression == CompressionType.NONE) {
            // One single-record batch per retained record.
            expectedEndOffsets = Arrays.asList(1L, 4L, 5L, 6L);
            expectedStartOffsets = Arrays.asList(1L, 4L, 5L, 6L);
            expectedMaxTimestamps = Arrays.asList(11L, 20L, 15L, 16L);
        } else if (magic < 2) {
            // Compressed pre-v2: three batches, each starting at its first retained record.
            expectedEndOffsets = Arrays.asList(1L, 5L, 6L);
            expectedStartOffsets = Arrays.asList(1L, 4L, 6L);
            expectedMaxTimestamps = Arrays.asList(11L, 20L, 16L);
        } else {
            // Magic >= 2: three batches that keep the base and last offsets they were written with.
            expectedEndOffsets = Arrays.asList(2L, 5L, 6L);
            expectedStartOffsets = Arrays.asList(1L, 3L, 6L);
            expectedMaxTimestamps = Arrays.asList(11L, 20L, 16L);
        }

        Assertions.assertEquals(expectedEndOffsets.size(), batches.size());
        for (int i = 0; i < expectedEndOffsets.size(); ++i) {
            RecordBatch batch = batches.get(i);
            Assertions.assertEquals(expectedStartOffsets.get(i).longValue(), batch.baseOffset());
            Assertions.assertEquals(expectedEndOffsets.get(i).longValue(), batch.lastOffset());
            Assertions.assertEquals(magic, batch.magic());
            Assertions.assertEquals(compression, batch.compressionType());
            if (magic >= 1) {
                Assertions.assertEquals(expectedMaxTimestamps.get(i).longValue(), batch.maxTimestamp());
                Assertions.assertEquals(TimestampType.CREATE_TIME, batch.timestampType());
            } else {
                // Magic 0 batches are expected to carry no timestamp information.
                Assertions.assertEquals(-1L, batch.maxTimestamp());
                Assertions.assertEquals(TimestampType.NO_TIMESTAMP_TYPE, batch.timestampType());
            }
        }

        List<Record> records = TestUtils.toList(filteredRecords.records());
        Assertions.assertEquals(4, records.size());

        Record first = records.get(0);
        Assertions.assertEquals(1L, first.offset());
        if (magic > 0) {
            Assertions.assertEquals(11L, first.timestamp());
        }
        Assertions.assertEquals("1", Utils.utf8(first.key(), first.keySize()));
        Assertions.assertEquals("b", Utils.utf8(first.value(), first.valueSize()));

        Record second = records.get(1);
        Assertions.assertEquals(4L, second.offset());
        if (magic > 0) {
            Assertions.assertEquals(20L, second.timestamp());
        }
        Assertions.assertEquals("4", Utils.utf8(second.key(), second.keySize()));
        Assertions.assertEquals("e", Utils.utf8(second.value(), second.valueSize()));

        Record third = records.get(2);
        Assertions.assertEquals(5L, third.offset());
        if (magic > 0) {
            Assertions.assertEquals(15L, third.timestamp());
        }
        Assertions.assertEquals("5", Utils.utf8(third.key(), third.keySize()));
        Assertions.assertEquals("f", Utils.utf8(third.value(), third.valueSize()));

        Record fourth = records.get(3);
        Assertions.assertEquals(6L, fourth.offset());
        if (magic > 0) {
            Assertions.assertEquals(16L, fourth.timestamp());
        }
        Assertions.assertEquals("6", Utils.utf8(fourth.key(), fourth.keySize()));
        Assertions.assertEquals("g", Utils.utf8(fourth.value(), fourth.valueSize()));
    }

    /**
     * Confirms that batches written with {@code LOG_APPEND_TIME} still report that timestamp type and
     * the original append time as their max timestamp after {@code filterTo} has dropped the records
     * without a key. The timestamp checks are skipped for magic 0.
     *
     * @param args supplies magic, compression and the producer id/epoch/sequence for this run
     */
    @ParameterizedTest
    @ArgumentsSource(value=MemoryRecordsArgumentsProvider.class)
    public void testFilterToPreservesLogAppendTime(Args args) {
        final byte magic = args.magic;
        final CompressionType compression = args.compression;
        final long producerId = args.pid;
        final short producerEpoch = args.epoch;
        final int baseSequence = args.firstSequence;
        final long appendTime = System.currentTimeMillis();
        ByteBuffer source = ByteBuffer.allocate(2048);

        // Three consecutive batches starting at offsets 0, 1 and 3, all stamped with appendTime.
        MemoryRecordsBuilder batchBuilder = MemoryRecords.builder(source, magic, compression, TimestampType.LOG_APPEND_TIME, 0L, appendTime, producerId, producerEpoch, baseSequence);
        batchBuilder.append(10L, null, "a".getBytes());
        batchBuilder.close();

        batchBuilder = MemoryRecords.builder(source, magic, compression, TimestampType.LOG_APPEND_TIME, 1L, appendTime, producerId, producerEpoch, baseSequence);
        batchBuilder.append(11L, "1".getBytes(), "b".getBytes());
        batchBuilder.append(12L, null, "c".getBytes());
        batchBuilder.close();

        batchBuilder = MemoryRecords.builder(source, magic, compression, TimestampType.LOG_APPEND_TIME, 3L, appendTime, producerId, producerEpoch, baseSequence);
        batchBuilder.append(13L, null, "d".getBytes());
        batchBuilder.append(14L, "4".getBytes(), "e".getBytes());
        batchBuilder.append(15L, "5".getBytes(), "f".getBytes());
        batchBuilder.close();
        source.flip();

        ByteBuffer target = ByteBuffer.allocate(2048);
        MemoryRecords unfiltered = MemoryRecords.readableRecords(source);
        unfiltered.filterTo(new TopicPartition("foo", 0), new RetainNonNullKeysFilter(), target, Integer.MAX_VALUE, BufferSupplier.NO_CACHING);
        target.flip();

        MemoryRecords filteredRecords = MemoryRecords.readableRecords(target);
        List<RecordBatch> batches = TestUtils.toList(filteredRecords.batches());

        // Three retained records: uncompressed pre-v2 output has one batch each, otherwise two batches.
        int expectedBatchCount;
        if (magic >= 2 || compression != CompressionType.NONE) {
            expectedBatchCount = 2;
        } else {
            expectedBatchCount = 3;
        }
        Assertions.assertEquals(expectedBatchCount, batches.size());

        for (RecordBatch batch : batches) {
            Assertions.assertEquals(compression, batch.compressionType());
            if (magic > 0) {
                Assertions.assertEquals(TimestampType.LOG_APPEND_TIME, batch.timestampType());
                Assertions.assertEquals(appendTime, batch.maxTimestamp());
            }
        }
    }

    /**
     * Exercises {@code MemoryRecords.firstBatchSize()} on a buffer holding a single batch.
     *
     * <p>Covers the complete buffer (size reported, buffer position left at 0), truncated buffers
     * (limits 1 and 12 yield {@code null}, limit 17 already yields the size) and two corrupted
     * headers that must raise {@link CorruptRecordException}.
     *
     * @param args supplies magic, compression and the producer id/epoch/sequence for this run
     */
    @ParameterizedTest
    @ArgumentsSource(value=MemoryRecordsArgumentsProvider.class)
    public void testNextBatchSize(Args args) {
        ByteBuffer buffer = ByteBuffer.allocate(2048);
        MemoryRecordsBuilder batchBuilder = MemoryRecords.builder(buffer, args.magic, args.compression, TimestampType.LOG_APPEND_TIME, 0L, this.logAppendTime, args.pid, args.epoch, args.firstSequence);
        batchBuilder.append(10L, null, "abc".getBytes());
        batchBuilder.close();
        buffer.flip();

        final int batchSize = buffer.remaining();
        final MemoryRecords records = MemoryRecords.readableRecords(buffer);

        // Whole batch visible: size is reported and the buffer position is not advanced.
        Assertions.assertEquals(batchSize, records.firstBatchSize().intValue());
        Assertions.assertEquals(0, buffer.position());

        // Too few bytes visible to determine the size.
        buffer.limit(1);
        Assertions.assertNull(records.firstBatchSize());
        buffer.limit(12);
        Assertions.assertNull(records.firstBatchSize());

        // With 17 bytes visible the size can be determined even though the batch is incomplete.
        buffer.limit(17);
        Assertions.assertEquals(batchSize, records.firstBatchSize().intValue());

        // Overwrite the byte at index 16 (read back as the magic value) with an unsupported value.
        buffer.limit(batchSize);
        byte originalMagic = buffer.get(16);
        buffer.put(16, (byte) 10);
        Assertions.assertThrows(CorruptRecordException.class, records::firstBatchSize);

        // Restore it, then zero the byte at index 11 -- presumably part of the size field; verify
        // against the record batch header layout.
        buffer.put(16, originalMagic);
        buffer.put(11, (byte) 0);
        Assertions.assertThrows(CorruptRecordException.class, records::firstBatchSize);
    }

    /**
     * Builds a {@code MemoryRecords} instance from a single {@code SimpleRecord} via
     * {@code MemoryRecords.withRecords} and checks that the key of the first record of the first
     * batch reads back as {@code "key1"}.
     *
     * @param args supplies the magic value and compression type for this run
     */
    @ParameterizedTest
    @ArgumentsSource(value=MemoryRecordsArgumentsProvider.class)
    public void testWithRecords(Args args) {
        SimpleRecord input = new SimpleRecord(10L, "key1".getBytes(), "value1".getBytes());
        MemoryRecords memoryRecords = MemoryRecords.withRecords(args.magic, args.compression, new SimpleRecord[]{input});

        MutableRecordBatch firstBatch = (MutableRecordBatch) memoryRecords.batches().iterator().next();
        Record firstRecord = (Record) firstBatch.iterator().next();
        ByteBuffer keyBytes = firstRecord.key();

        Assertions.assertEquals("key1", Utils.utf8(keyBytes));
    }

    /**
     * Verifies that building records with ZSTD compression is rejected for magic values 0 and 1
     * with an {@link IllegalArgumentException} carrying the expected message.
     */
    @Test
    public void testUnsupportedCompress() {
        BiFunction<Byte, CompressionType, MemoryRecords> builderBiFunction = (magic, compressionType) -> MemoryRecords.withRecords((byte) magic, compressionType, new SimpleRecord[]{new SimpleRecord(10L, "key1".getBytes(), "value1".getBytes())});
        Arrays.asList((byte) 0, (byte) 1).forEach(magic -> {
            IllegalArgumentException e = Assertions.assertThrows(IllegalArgumentException.class, () -> builderBiFunction.apply(magic, CompressionType.ZSTD));
            // Expected value first, actual second, so a failure message reports them the right way round.
            Assertions.assertEquals("ZStandard compression is not supported for magic " + magic, e.getMessage());
        });
    }

    private static class RetainNonNullKeysFilter
    extends MemoryRecords.RecordFilter {
        public RetainNonNullKeysFilter() {
            super(0L, 0L);
        }

        protected MemoryRecords.RecordFilter.BatchRetentionResult checkBatchRetention(RecordBatch batch) {
            return new MemoryRecords.RecordFilter.BatchRetentionResult(MemoryRecords.RecordFilter.BatchRetention.DELETE_EMPTY, false);
        }

        public boolean shouldRetainRecord(RecordBatch batch, Record record) {
            return record.hasKey();
        }
    }

    private static class V2MemoryRecordsArgumentsProvider
    implements ArgumentsProvider {
        private V2MemoryRecordsArgumentsProvider() {
        }

        public Stream<? extends Arguments> provideArguments(ExtensionContext context) {
            ArrayList<Arguments> arguments = new ArrayList<Arguments>();
            for (long firstOffset : Arrays.asList(0L, 57L)) {
                for (CompressionType type : CompressionType.values()) {
                    arguments.add(Arguments.of((Object[])new Object[]{new Args(2, firstOffset, type)}));
                }
            }
            return arguments.stream();
        }
    }

    private static class MemoryRecordsArgumentsProvider
    implements ArgumentsProvider {
        private MemoryRecordsArgumentsProvider() {
        }

        public Stream<? extends Arguments> provideArguments(ExtensionContext context) {
            ArrayList<Arguments> arguments = new ArrayList<Arguments>();
            for (long firstOffset : Arrays.asList(0L, 57L)) {
                for (CompressionType type : CompressionType.values()) {
                    List<Byte> magics = type == CompressionType.ZSTD ? Collections.singletonList((byte)2) : Arrays.asList((byte)0, (byte)1, (byte)2);
                    for (byte magic : magics) {
                        arguments.add(Arguments.of((Object[])new Object[]{new Args(magic, firstOffset, type)}));
                    }
                }
            }
            return arguments.stream();
        }
    }

    /**
     * Immutable parameter bundle handed to the parameterized tests.
     *
     * <p>The producer id, epoch and first sequence are derived from the magic value: fixed
     * non-negative values for magic 2 and above, and -1 for all three otherwise.
     */
    private static class Args {
        final CompressionType compression;
        final byte magic;
        final long firstOffset;
        final long pid;
        final short epoch;
        final int firstSequence;

        /**
         * @param magic the record format magic value
         * @param firstOffset the first offset associated with this parameter set
         * @param compression the compression type to use
         */
        public Args(byte magic, long firstOffset, CompressionType compression) {
            this.magic = magic;
            this.compression = compression;
            this.firstOffset = firstOffset;

            boolean withProducerInfo = magic >= 2;
            this.pid = withProducerInfo ? 134234L : -1L;
            this.epoch = withProducerInfo ? (short) 28 : (short) -1;
            this.firstSequence = withProducerInfo ? 777 : -1;
        }

        /**
         * @return a description listing magic, first offset and compression type
         */
        @Override
        public String toString() {
            StringBuilder text = new StringBuilder();
            text.append("magic=").append(this.magic);
            text.append(", firstOffset=").append(this.firstOffset);
            text.append(", compressionType=").append(this.compression);
            return text.toString();
        }
    }
}

