/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.clients.producer.internals;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Deque;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import org.apache.kafka.clients.producer.internals.BufferPool;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.metrics.CompoundStat;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Meter;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.test.TestUtils;
import org.easymock.EasyMock;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;

@RunWith(value=PowerMockRunner.class)
public class BufferPoolTest {
    // Mock clock; it backs the Metrics registry below and is passed to most of the pools built in these tests.
    private final MockTime time = new MockTime();
    // Metrics registry handed to the BufferPool constructors; closed in teardown().
    private final Metrics metrics = new Metrics((Time)this.time);
    // Default maximum block time in milliseconds.
    // NOTE(review): not referenced by name in this file; the tests pass the literal 2000L instead
    // (presumably constant inlining by the decompiler) - confirm before changing the value.
    private final long maxBlockTimeMs = 2000L;
    // Metric group name for the pools under test.
    // NOTE(review): not referenced by name in this file; the tests pass the literal "TestMetrics" instead.
    private final String metricGroup = "TestMetrics";

    /**
     * Closes the metrics registry owned by this test instance after each test.
     */
    @After
    public void teardown() {
        metrics.close();
    }

    /**
     * Walks one pool through a series of allocate/deallocate cycles and checks
     * the available/unallocated memory accounting after every step, including
     * recycling of a poolable-size buffer and handling of a non-standard size.
     */
    @Test
    public void testSimple() throws Exception {
        final long capacity = 65536L;
        final int chunk = 1024;
        final long blockMs = 2000L;
        // Expected "unallocated" figure while exactly one poolable chunk is out of the unallocated region.
        final long capacityMinusChunk = capacity - (long) chunk;

        BufferPool pool = new BufferPool(capacity, chunk, metrics, time, "TestMetrics");

        // First allocation of the poolable size.
        ByteBuffer buf = pool.allocate(chunk, blockMs);
        Assert.assertEquals("Buffer size should equal requested size.", (long) chunk, (long) buf.limit());
        Assert.assertEquals("Unallocated memory should have shrunk", capacityMinusChunk, pool.unallocatedMemory());
        Assert.assertEquals("Available memory should have shrunk", capacityMinusChunk, pool.availableMemory());

        // Dirty the buffer so the next allocation can show it comes back cleared.
        buf.putInt(1);
        buf.flip();
        pool.deallocate(buf);
        Assert.assertEquals("All memory should be available", capacity, pool.availableMemory());
        Assert.assertEquals("But now some is on the free list", capacityMinusChunk, pool.unallocatedMemory());

        // Second allocation of the same size.
        buf = pool.allocate(chunk, blockMs);
        Assert.assertEquals("Recycled buffer should be cleared.", 0L, (long) buf.position());
        Assert.assertEquals("Recycled buffer should be cleared.", (long) buf.capacity(), (long) buf.limit());
        pool.deallocate(buf);
        Assert.assertEquals("All memory should be available", capacity, pool.availableMemory());
        Assert.assertEquals("Still a single buffer on the free list", capacityMinusChunk, pool.unallocatedMemory());

        // A buffer of twice the poolable size.
        buf = pool.allocate(2 * chunk, blockMs);
        pool.deallocate(buf);
        Assert.assertEquals("All memory should be available", capacity, pool.availableMemory());
        Assert.assertEquals("Non-standard size didn't go to the free list.", capacityMinusChunk, pool.unallocatedMemory());
    }

    /**
     * Requesting more memory than the pool's total must be rejected with an
     * {@link IllegalArgumentException}, while a request for exactly the total
     * must succeed.
     *
     * <p>The expected exception is scoped to the over-sized request only. With
     * a method-wide {@code @Test(expected = ...)}, an
     * {@code IllegalArgumentException} thrown by the constructor or by the
     * first, legitimate allocation would make the test pass for the wrong
     * reason.
     */
    @Test
    public void testCantAllocateMoreMemoryThanWeHave() throws Exception {
        BufferPool pool = new BufferPool(1024L, 512, this.metrics, this.time, "TestMetrics");

        // Allocating exactly the pool's total memory is allowed.
        ByteBuffer buffer = pool.allocate(1024, 2000L);
        Assert.assertEquals(1024L, (long) buffer.limit());
        pool.deallocate(buffer);

        try {
            // One byte more than the pool owns.
            pool.allocate(1025, 2000L);
            Assert.fail("Allocating more than the pool's total memory should throw IllegalArgumentException");
        } catch (IllegalArgumentException expected) {
            // Expected: the request can never be satisfied.
        }
    }

    /**
     * An allocation for the pool's full size is started while one buffer is
     * still outstanding; it must not complete until that buffer is returned,
     * and must complete shortly afterwards.
     */
    @Test
    public void testDelayedAllocation() throws Exception {
        BufferPool pool = new BufferPool(5120L, 1024, metrics, time, "TestMetrics");
        ByteBuffer outstanding = pool.allocate(1024, 2000L);

        // Background thread that returns the buffer once the gate is opened.
        CountDownLatch releaseGate = asyncDeallocate(pool, outstanding);
        // Background thread that asks for the whole pool.
        CountDownLatch allocationDone = asyncAllocate(pool, 5120);

        long pending = allocationDone.getCount();
        Assert.assertEquals("Allocation shouldn't have happened yet, waiting on memory.", 1L, pending);

        // Hand the memory back and expect the pending allocation to finish.
        releaseGate.countDown();
        boolean finished = allocationDone.await(1L, TimeUnit.SECONDS);
        Assert.assertTrue("Allocation should succeed soon after de-allocation", finished);
    }

    /**
     * Starts a thread that waits on a latch and then returns {@code buffer} to
     * {@code pool}.
     *
     * @param pool   pool the buffer is returned to
     * @param buffer buffer to deallocate
     * @return the latch to count down in order to trigger the deallocation
     */
    private CountDownLatch asyncDeallocate(final BufferPool pool, final ByteBuffer buffer) {
        final CountDownLatch gate = new CountDownLatch(1);
        Runnable releaseWhenSignalled = new Runnable() {
            @Override
            public void run() {
                try {
                    gate.await();
                } catch (InterruptedException e) {
                    // Only reported; the buffer is still returned below.
                    e.printStackTrace();
                }
                pool.deallocate(buffer);
            }
        };
        new Thread(releaseWhenSignalled).start();
        return gate;
    }

    /**
     * Starts a thread that sleeps for {@code delayMs} on the system clock and
     * then returns {@code buffer} to {@code pool}.
     *
     * @param pool    pool the buffer is returned to
     * @param buffer  buffer to deallocate
     * @param delayMs how long to sleep before deallocating, in milliseconds
     */
    private void delayedDeallocate(final BufferPool pool, final ByteBuffer buffer, final long delayMs) {
        Runnable releaseAfterDelay = new Runnable() {
            @Override
            public void run() {
                Time.SYSTEM.sleep(delayMs);
                pool.deallocate(buffer);
            }
        };
        new Thread(releaseAfterDelay).start();
    }

    /**
     * Starts a thread that requests {@code size} bytes from {@code pool} with
     * a 2000 ms block time.
     *
     * @param pool pool to allocate from
     * @param size number of bytes to request
     * @return a latch that is counted down when the allocation call has
     *         returned or thrown
     */
    private CountDownLatch asyncAllocate(final BufferPool pool, final int size) {
        final CountDownLatch done = new CountDownLatch(1);
        Runnable allocateTask = new Runnable() {
            @Override
            public void run() {
                try {
                    pool.allocate(size, 2000L);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    // Signalled on every exit path, not only on success.
                    done.countDown();
                }
            }
        };
        new Thread(allocateTask).start();
        return done;
    }

    /**
     * An allocation for the pool's entire capacity cannot be satisfied while
     * buffers are outstanding, so it must fail with a {@link TimeoutException}
     * not much later than the requested block time.
     *
     * <p>Runs against the real system clock: three one-byte buffers are
     * returned by background threads after 1000 ms, 2000 ms and 5000 ms, while
     * the blocked allocation waits for at most 2000 ms.
     */
    @Test
    public void testBlockTimeout() throws Exception {
        BufferPool pool = new BufferPool(10L, 1, this.metrics, Time.SYSTEM, "TestMetrics");
        ByteBuffer buffer1 = pool.allocate(1, 2000L);
        ByteBuffer buffer2 = pool.allocate(1, 2000L);
        ByteBuffer buffer3 = pool.allocate(1, 2000L);
        this.delayedDeallocate(pool, buffer1, 1000L);
        this.delayedDeallocate(pool, buffer2, 2000L);
        this.delayedDeallocate(pool, buffer3, 5000L);

        long beginTimeMs = Time.SYSTEM.milliseconds();
        try {
            pool.allocate(10, 2000L);
            Assert.fail("The buffer allocated more memory than its maximum value 10");
        } catch (TimeoutException expected) {
            // Expected: the full capacity never became available in time.
        }

        // Take a single snapshot. Background threads may still be returning
        // buffers, so re-reading the value for the message and for each bound
        // could report a number different from the one that was tested.
        long available = pool.availableMemory();
        Assert.assertTrue("available memory" + available, available >= 9L && available <= 10L);

        long endTimeMs = Time.SYSTEM.milliseconds();
        Assert.assertTrue("Allocation should finish not much later than maxBlockTimeMs", endTimeMs - beginTimeMs < 3000L);
    }

    /**
     * After a blocked allocation times out, no waiter may remain queued on
     * the pool.
     */
    @Test
    public void testCleanupMemoryAvailabilityWaiterOnBlockTimeout() throws Exception {
        final long blockMs = 2000L;
        BufferPool pool = new BufferPool(2L, 1, metrics, time, "TestMetrics");

        // Take one of the two bytes so that a request for two cannot be met.
        pool.allocate(1, blockMs);
        try {
            pool.allocate(2, blockMs);
            Assert.fail("The buffer allocated more memory than its maximum value 2");
        } catch (TimeoutException expected) {
            // Expected outcome of the unsatisfiable request.
        }

        boolean noWaitersLeft = pool.queued() == 0;
        Assert.assertTrue(noWaitersLeft);
    }

    /**
     * Two threads block in {@code allocate} on an exhausted pool and are then
     * interrupted; afterwards no waiter may remain queued on the pool.
     *
     * <p>Coordination with the worker threads is time-based (500 ms sleeps),
     * so the test relies on each worker reaching its blocking call within
     * that window.
     */
    @Test
    public void testCleanupMemoryAvailabilityWaiterOnInterruption() throws Exception {
        BufferPool pool = new BufferPool(2L, 1, this.metrics, this.time, "TestMetrics");
        long blockTime = 5000L;
        // Take one of the two bytes so the workers' two-byte requests must block.
        pool.allocate(1, 2000L);
        Thread t1 = new Thread(new BufferPoolAllocator(pool, blockTime));
        Thread t2 = new Thread(new BufferPoolAllocator(pool, blockTime));

        t1.start();
        // Give t1 time to enqueue itself as a waiter.
        Thread.sleep(500L);
        Deque<Condition> waiters = pool.waiters();
        // Presumably the condition t1 is waiting on - it is the only thread started so far.
        Condition c1 = waiters.getFirst();

        t2.start();
        // Give t2 time to enqueue itself behind t1.
        Thread.sleep(500L);
        t1.interrupt();
        // Give t1 time to react to the interrupt.
        Thread.sleep(500L);
        // Presumably the condition t2 is waiting on.
        Condition c2 = waiters.getLast();
        t2.interrupt();

        Assert.assertNotEquals(c1, c2);
        t1.join();
        t2.join();
        // Expected value first, so a failure reports "expected 0 but was N".
        Assert.assertEquals(0L, (long) pool.queued());
    }

    /**
     * Uses a mocked sensor whose {@code record} call throws
     * {@link OutOfMemoryError} and checks that, after a blocked allocation
     * fails with that error, the pool reports one byte available, has no
     * queued waiters, and can still serve a one-byte allocation.
     */
    @PrepareForTest(value={Sensor.class, MetricName.class})
    @Test
    public void testCleanupMemoryAvailabilityOnMetricsException() throws Exception {
        // Mocks (nice mocks: unexpected calls return defaults instead of failing).
        Metrics metricsMock = EasyMock.createNiceMock(Metrics.class);
        Sensor sensorMock = EasyMock.createNiceMock(Sensor.class);
        MetricName nameMock = EasyMock.createNiceMock(MetricName.class);
        MetricName rateNameMock = EasyMock.createNiceMock(MetricName.class);
        MetricName totalNameMock = EasyMock.createNiceMock(MetricName.class);

        // Recording phase - the order of these statements is significant.
        EasyMock.expect(metricsMock.sensor("bufferpool-wait-time")).andReturn(sensorMock);
        sensorMock.record(EasyMock.anyDouble(), EasyMock.anyLong());
        EasyMock.expectLastCall().andThrow(new OutOfMemoryError());
        EasyMock.expect(metricsMock.metricName(EasyMock.anyString(), EasyMock.eq("TestMetrics"), EasyMock.anyString()))
                .andReturn(nameMock);
        sensorMock.add((CompoundStat) new Meter(TimeUnit.NANOSECONDS, rateNameMock, totalNameMock));
        EasyMock.replay(metricsMock, sensorMock, nameMock);

        BufferPool pool = new BufferPool(2L, 1, metricsMock, time, "TestMetrics");
        pool.allocate(1, 0L);
        try {
            pool.allocate(2, 1000L);
            Assert.fail("Expected oom.");
        } catch (OutOfMemoryError expected) {
            // Expected: thrown by the mocked sensor.
        }

        Assert.assertEquals(1L, pool.availableMemory());
        Assert.assertEquals(0L, (long) pool.queued());
        // A further one-byte request must still be served with a zero block time.
        pool.allocate(1, 0L);
    }

    /**
     * Runs ten threads that each perform 50000 allocate/deallocate rounds
     * against a pool sized for half as many poolable buffers as there are
     * threads, then checks that every thread finished and that all memory is
     * back in the pool.
     */
    @Test
    public void testStressfulSituation() throws Exception {
        final int numThreads = 10;
        final int iterations = 50000;
        final int poolableSize = 1024;
        final long totalMemory = numThreads / 2 * poolableSize;
        BufferPool pool = new BufferPool(totalMemory, poolableSize, metrics, time, "TestMetrics");

        ArrayList<StressTestThread> workers = new ArrayList<StressTestThread>();
        int created = 0;
        while (created < numThreads) {
            workers.add(new StressTestThread(pool, iterations));
            created++;
        }

        // Start every worker before waiting on any of them.
        for (StressTestThread worker : workers) {
            worker.start();
        }
        for (StressTestThread worker : workers) {
            worker.join();
        }

        for (StressTestThread worker : workers) {
            boolean completed = worker.success.get();
            Assert.assertTrue("Thread should have completed all iterations successfully.", completed);
        }
        Assert.assertEquals(totalMemory, pool.availableMemory());
    }

    /**
     * Checks that {@code availableMemory()} reports correct values for a pool
     * whose sizes exceed the {@code int} range (20,000,000,000 bytes total,
     * 2,000,000,000 bytes per poolable buffer).
     *
     * <p>The pool is subclassed so that no real memory is allocated and so
     * that the free-list size it reports can be driven from the test.
     */
    @Test
    public void testLargeAvailableMemory() throws Exception {
        final long memory = 20000000000L;
        final int poolableSize = 2000000000;
        final AtomicInteger simulatedFreeBuffers = new AtomicInteger(0);
        BufferPool pool = new BufferPool(memory, poolableSize, metrics, time, "TestMetrics") {
            @Override
            protected ByteBuffer allocateByteBuffer(int size) {
                // Stand-in buffer: the real size would be 2 GB per call.
                return ByteBuffer.allocate(0);
            }

            @Override
            protected int freeSize() {
                return simulatedFreeBuffers.get();
            }
        };

        // Two allocations each take one poolable buffer's worth out of the pool.
        pool.allocate(poolableSize, 0L);
        Assert.assertEquals(memory - poolableSize, pool.availableMemory());
        pool.allocate(poolableSize, 0L);
        Assert.assertEquals(memory - 2L * poolableSize, pool.availableMemory());

        // Each simulated free-list entry adds one poolable buffer back.
        simulatedFreeBuffers.incrementAndGet();
        Assert.assertEquals(memory - poolableSize, pool.availableMemory());
        simulatedFreeBuffers.incrementAndGet();
        Assert.assertEquals(memory, pool.availableMemory());
    }

    /**
     * Uses a pool whose {@code allocateByteBuffer} hook always throws
     * {@link OutOfMemoryError} and checks that the pool still reports its
     * full 1024 bytes as available afterwards.
     *
     * <p>NOTE(review): the test invokes the overridden hook directly rather
     * than going through {@code allocate(...)}, so the pool's own accounting
     * is presumably never exercised here. Consider driving it through
     * {@code allocate} once the expected behaviour of {@code BufferPool} on
     * allocation failure has been confirmed.
     */
    @Test
    public void outOfMemoryOnAllocation() {
        BufferPool bufferPool = new BufferPool(1024L, 1024, this.metrics, this.time, "TestMetrics"){

            @Override
            protected ByteBuffer allocateByteBuffer(int size) {
                throw new OutOfMemoryError();
            }
        };
        try {
            bufferPool.allocateByteBuffer(1024);
            Assert.fail("Should have thrown OutOfMemoryError");
        }
        catch (OutOfMemoryError expected) {
            // Expected: thrown by the overridden hook above.
        }
        // Expected value first, so a failure reports "expected 1024 but was N".
        Assert.assertEquals(1024L, bufferPool.availableMemory());
    }

    /**
     * Worker thread for the stress test: repeatedly allocates a buffer of
     * either the poolable size or a random size below the pool's total
     * memory, and returns it immediately.
     *
     * <p>{@link #success} is set to {@code true} only if every iteration
     * completed without an exception.
     */
    public static class StressTestThread
    extends Thread {
        private final int iterations;
        private final BufferPool pool;
        // Upper bound, in milliseconds, each allocation may block.
        private final long maxBlockTimeMs = 20000L;
        public final AtomicBoolean success = new AtomicBoolean(false);

        /**
         * @param pool       pool to allocate from and return buffers to
         * @param iterations number of allocate/deallocate rounds to perform
         */
        public StressTestThread(BufferPool pool, int iterations) {
            this.iterations = iterations;
            this.pool = pool;
        }

        @Override
        public void run() {
            try {
                int remaining = iterations;
                while (remaining-- > 0) {
                    final int size;
                    if (TestUtils.RANDOM.nextBoolean()) {
                        size = pool.poolableSize();
                    } else {
                        size = TestUtils.RANDOM.nextInt((int) pool.totalMemory());
                    }
                    ByteBuffer allocated = pool.allocate(size, maxBlockTimeMs);
                    pool.deallocate(allocated);
                }
                success.set(true);
            } catch (Exception e) {
                // success stays false; the stack trace is the only diagnostic.
                e.printStackTrace();
            }
        }
    }

    /**
     * Runnable that requests two bytes from a pool and expects the request
     * not to succeed: both a timeout and an interruption are accepted
     * outcomes.
     *
     * <p>NOTE(review): {@code Assert.fail} here throws on the worker thread
     * that runs this task, not on the JUnit thread that started it - confirm
     * the test is not relying on it to report failure.
     */
    private static class BufferPoolAllocator
    implements Runnable {
        private final BufferPool pool;
        private final long maxBlockTimeMs;

        /**
         * @param pool           pool to allocate from
         * @param maxBlockTimeMs block time, in milliseconds, passed to {@code allocate}
         */
        BufferPoolAllocator(BufferPool pool, long maxBlockTimeMs) {
            this.pool = pool;
            this.maxBlockTimeMs = maxBlockTimeMs;
        }

        @Override
        public void run() {
            try {
                pool.allocate(2, maxBlockTimeMs);
                Assert.fail("The buffer allocated more memory than its maximum value 2");
            } catch (InterruptedException ignored) {
                // Accepted outcome: the test interrupts this thread on purpose.
            } catch (TimeoutException ignored) {
                // Accepted outcome: the request timed out.
            }
        }
    }
}

