/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.ipc;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.hadoop.ipc.CallQueueManager;
import org.junit.Assert;
import org.junit.Test;

public class TestCallQueueManager {
    private CallQueueManager<FakeCall> manager;
    private static final Class<? extends BlockingQueue<FakeCall>> queueClass = CallQueueManager.convertQueueClass(LinkedBlockingQueue.class, FakeCall.class);

    public void assertCanTake(CallQueueManager<FakeCall> cq, int numberOfTakes, int takeAttempts) throws InterruptedException {
        Taker taker = new Taker(cq, takeAttempts, -1);
        Thread t = new Thread(taker);
        t.start();
        t.join(100L);
        Assert.assertEquals((long)taker.callsTaken, (long)numberOfTakes);
        t.interrupt();
    }

    public void assertCanPut(CallQueueManager<FakeCall> cq, int numberOfPuts, int putAttempts) throws InterruptedException {
        Putter putter = new Putter(cq, putAttempts, -1);
        Thread t = new Thread(putter);
        t.start();
        t.join(100L);
        Assert.assertEquals((long)putter.callsAdded, (long)numberOfPuts);
        t.interrupt();
    }

    @Test
    public void testCallQueueCapacity() throws InterruptedException {
        this.manager = new CallQueueManager(queueClass, 10, "", null);
        this.assertCanPut(this.manager, 10, 20);
    }

    @Test
    public void testEmptyConsume() throws InterruptedException {
        this.manager = new CallQueueManager(queueClass, 10, "", null);
        this.assertCanTake(this.manager, 0, 1);
    }

    @Test(timeout=60000L)
    public void testSwapUnderContention() throws InterruptedException {
        int i;
        this.manager = new CallQueueManager(queueClass, 5000, "", null);
        ArrayList<Putter> producers = new ArrayList<Putter>();
        ArrayList<Taker> consumers = new ArrayList<Taker>();
        HashMap<Runnable, Thread> threads = new HashMap<Runnable, Thread>();
        for (i = 0; i < 50; ++i) {
            Putter p = new Putter(this.manager, -1, -1);
            Thread pt = new Thread(p);
            producers.add(p);
            threads.put(p, pt);
            pt.start();
        }
        for (i = 0; i < 20; ++i) {
            Taker t = new Taker(this.manager, -1, -1);
            Thread tt = new Thread(t);
            consumers.add(t);
            threads.put(t, tt);
            tt.start();
        }
        Thread.sleep(10L);
        for (i = 0; i < 5; ++i) {
            this.manager.swapQueue(queueClass, 5000, "", null);
        }
        for (Putter p : producers) {
            p.stop();
        }
        Thread.sleep(2000L);
        Assert.assertEquals((long)0L, (long)this.manager.size());
        long totalCallsCreated = 0L;
        long totalCallsConsumed = 0L;
        for (Putter p : producers) {
            totalCallsCreated += (long)p.callsAdded;
            ((Thread)threads.get(p)).interrupt();
        }
        for (Taker t : consumers) {
            totalCallsConsumed += (long)t.callsTaken;
            ((Thread)threads.get(t)).interrupt();
        }
        Assert.assertEquals((long)totalCallsConsumed, (long)totalCallsCreated);
    }

    public class Taker
    implements Runnable {
        private final CallQueueManager<FakeCall> cq;
        public final int tag;
        public volatile int callsTaken = 0;
        public volatile FakeCall lastResult = null;
        private final int maxCalls;

        public Taker(CallQueueManager<FakeCall> aCq, int maxCalls, int tag) {
            this.maxCalls = maxCalls;
            this.cq = aCq;
            this.tag = tag;
        }

        @Override
        public void run() {
            try {
                while (this.callsTaken < this.maxCalls || this.maxCalls < 0) {
                    FakeCall res = (FakeCall)this.cq.take();
                    if (this.tag >= 0 && res.tag != this.tag) {
                        this.cq.put((Object)res);
                        continue;
                    }
                    ++this.callsTaken;
                    this.lastResult = res;
                }
            }
            catch (InterruptedException e) {
                return;
            }
        }
    }

    public class Putter
    implements Runnable {
        private final CallQueueManager<FakeCall> cq;
        public final int tag;
        public volatile int callsAdded = 0;
        private final int maxCalls;
        private boolean isRunning = true;

        public Putter(CallQueueManager<FakeCall> aCq, int maxCalls, int tag) {
            this.maxCalls = maxCalls;
            this.cq = aCq;
            this.tag = tag;
        }

        @Override
        public void run() {
            try {
                while (this.isRunning && (this.callsAdded < this.maxCalls || this.maxCalls < 0)) {
                    this.cq.put((Object)new FakeCall(this.tag));
                    ++this.callsAdded;
                }
            }
            catch (InterruptedException e) {
                return;
            }
        }

        public void stop() {
            this.isRunning = false;
        }
    }

    public class FakeCall {
        public final int tag;

        public FakeCall(int tag) {
            this.tag = tag;
        }
    }
}

