Home | History | Annotate | Download | only in collect
      1 /*
      2  * Copyright (C) 2011 The Guava Authors
      3  *
      4  * Licensed under the Apache License, Version 2.0 (the "License");
      5  * you may not use this file except in compliance with the License.
      6  * You may obtain a copy of the License at
      7  *
      8  * http://www.apache.org/licenses/LICENSE-2.0
      9  *
     10  * Unless required by applicable law or agreed to in writing, software
     11  * distributed under the License is distributed on an "AS IS" BASIS,
     12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     13  * See the License for the specific language governing permissions and
     14  * limitations under the License.
     15  */
     16 
     17 package com.google.common.collect;
     18 
     19 import com.google.common.util.concurrent.Uninterruptibles;
     20 
     21 import junit.framework.TestCase;
     22 
     23 import java.util.Collection;
     24 import java.util.List;
     25 import java.util.concurrent.ArrayBlockingQueue;
     26 import java.util.concurrent.BlockingQueue;
     27 import java.util.concurrent.ExecutorService;
     28 import java.util.concurrent.Executors;
     29 import java.util.concurrent.Future;
     30 import java.util.concurrent.LinkedBlockingQueue;
     31 import java.util.concurrent.PriorityBlockingQueue;
     32 import java.util.concurrent.SynchronousQueue;
     33 import java.util.concurrent.TimeUnit;
     34 
     35 /**
     36  * Tests for {@link Queues}.
     37  *
     38  * @author Dimitris Andreou
     39  */
     40 
     41 public class QueuesTest extends TestCase {
     42   /*
     43    * All the following tests relate to BlockingQueue methods in Queues.
     44    */
     45 
     46   public static List<BlockingQueue<Object>> blockingQueues() {
     47     return ImmutableList.<BlockingQueue<Object>>of(
     48         new LinkedBlockingQueue<Object>(),
     49         new LinkedBlockingQueue<Object>(10),
     50         new SynchronousQueue<Object>(),
     51         new ArrayBlockingQueue<Object>(10),
     52         new PriorityBlockingQueue<Object>(10, Ordering.arbitrary()));
     53   }
     54 
     55   private ExecutorService threadPool;
     56 
     57   @Override
     58   public void setUp() {
     59     threadPool = Executors.newCachedThreadPool();
     60   }
     61 
     62   @Override
     63   public void tearDown() throws InterruptedException {
     64     // notice that if a Producer is interrupted (a bug), the Producer will go into an infinite
     65     // loop, which will be noticed here
     66     threadPool.shutdown();
     67     assertTrue("Some worker didn't finish in time",
     68         threadPool.awaitTermination(1, TimeUnit.SECONDS));
     69   }
     70 
     71   private static <T> int drain(BlockingQueue<T> q, Collection<? super T> buffer, int maxElements,
     72       long timeout, TimeUnit unit, boolean interruptibly) throws InterruptedException {
     73     return interruptibly
     74         ? Queues.drain(q, buffer, maxElements, timeout, unit)
     75         : Queues.drainUninterruptibly(q, buffer, maxElements, timeout, unit);
     76   }
     77 
     78   public void testMultipleProducers() throws Exception {
     79     for (BlockingQueue<Object> q : blockingQueues()) {
     80       testMultipleProducers(q);
     81     }
     82   }
     83 
     84   private void testMultipleProducers(BlockingQueue<Object> q)
     85       throws InterruptedException {
     86     for (boolean interruptibly : new boolean[] { true, false }) {
     87       threadPool.submit(new Producer(q, 20));
     88       threadPool.submit(new Producer(q, 20));
     89       threadPool.submit(new Producer(q, 20));
     90       threadPool.submit(new Producer(q, 20));
     91       threadPool.submit(new Producer(q, 20));
     92 
     93       List<Object> buf = Lists.newArrayList();
     94       int elements = drain(q, buf, 100, Long.MAX_VALUE, TimeUnit.NANOSECONDS, interruptibly);
     95       assertEquals(100, elements);
     96       assertEquals(100, buf.size());
     97       assertDrained(q);
     98     }
     99   }
    100 
    101   public void testDrainTimesOut() throws Exception {
    102     for (BlockingQueue<Object> q : blockingQueues()) {
    103       testDrainTimesOut(q);
    104     }
    105   }
    106 
    107   private void testDrainTimesOut(BlockingQueue<Object> q) throws Exception {
    108     for (boolean interruptibly : new boolean[] { true, false }) {
    109       assertEquals(0, Queues.drain(q, ImmutableList.of(), 1, 10, TimeUnit.MILLISECONDS));
    110 
    111       // producing one, will ask for two
    112       Future<?> submitter = threadPool.submit(new Producer(q, 1));
    113 
    114       // make sure we time out
    115       long startTime = System.nanoTime();
    116 
    117       int drained = drain(q, Lists.newArrayList(), 2, 10, TimeUnit.MILLISECONDS, interruptibly);
    118       assertTrue(drained <= 1);
    119 
    120       assertTrue((System.nanoTime() - startTime) >= TimeUnit.MILLISECONDS.toNanos(10));
    121 
    122       // If even the first one wasn't there, clean up so that the next test doesn't see an element.
    123       submitter.get();
    124       if (drained == 0) {
    125         assertNotNull(q.poll());
    126       }
    127     }
    128   }
    129 
    130   public void testZeroElements() throws Exception {
    131     for (BlockingQueue<Object> q : blockingQueues()) {
    132       testZeroElements(q);
    133     }
    134   }
    135 
    136   private void testZeroElements(BlockingQueue<Object> q) throws InterruptedException {
    137     for (boolean interruptibly : new boolean[] { true, false }) {
    138       // asking to drain zero elements
    139       assertEquals(0, drain(q, ImmutableList.of(), 0, 10, TimeUnit.MILLISECONDS, interruptibly));
    140     }
    141   }
    142 
    143   public void testEmpty() throws Exception {
    144     for (BlockingQueue<Object> q : blockingQueues()) {
    145       testEmpty(q);
    146     }
    147   }
    148 
    149   private void testEmpty(BlockingQueue<Object> q) {
    150     assertDrained(q);
    151   }
    152 
    153   public void testNegativeMaxElements() throws Exception {
    154     for (BlockingQueue<Object> q : blockingQueues()) {
    155       testNegativeMaxElements(q);
    156     }
    157   }
    158 
    159   private void testNegativeMaxElements(BlockingQueue<Object> q) throws InterruptedException {
    160     threadPool.submit(new Producer(q, 1));
    161 
    162     List<Object> buf = Lists.newArrayList();
    163     int elements = Queues.drain(q, buf, -1, Long.MAX_VALUE, TimeUnit.NANOSECONDS);
    164     assertEquals(elements, 0);
    165     assertTrue(buf.isEmpty());
    166 
    167     // Clean up produced element to free the producer thread, otherwise it will complain
    168     // when we shutdown the threadpool.
    169     Queues.drain(q, buf, 1, Long.MAX_VALUE, TimeUnit.NANOSECONDS);
    170   }
    171 
    172   public void testDrain_throws() throws Exception {
    173     for (BlockingQueue<Object> q : blockingQueues()) {
    174       testDrain_throws(q);
    175     }
    176   }
    177 
    178   private void testDrain_throws(BlockingQueue<Object> q) {
    179     threadPool.submit(new Interrupter(Thread.currentThread()));
    180     try {
    181       Queues.drain(q, ImmutableList.of(), 100, Long.MAX_VALUE, TimeUnit.NANOSECONDS);
    182       fail();
    183     } catch (InterruptedException expected) {
    184     }
    185   }
    186 
    187   public void testDrainUninterruptibly_doesNotThrow() throws Exception {
    188     for (BlockingQueue<Object> q : blockingQueues()) {
    189       testDrainUninterruptibly_doesNotThrow(q);
    190     }
    191   }
    192 
    193   private void testDrainUninterruptibly_doesNotThrow(final BlockingQueue<Object> q) {
    194     final Thread mainThread = Thread.currentThread();
    195     threadPool.submit(new Runnable() {
    196       public void run() {
    197         new Producer(q, 50).run();
    198         new Interrupter(mainThread).run();
    199         new Producer(q, 50).run();
    200       }
    201     });
    202     List<Object> buf = Lists.newArrayList();
    203     int elements =
    204         Queues.drainUninterruptibly(q, buf, 100, Long.MAX_VALUE, TimeUnit.NANOSECONDS);
    205     // so when this drains all elements, we know the thread has also been interrupted in between
    206     assertTrue(Thread.interrupted());
    207     assertEquals(100, elements);
    208     assertEquals(100, buf.size());
    209   }
    210 
    211   public void testNewLinkedBlockingQueueCapacity() {
    212     try {
    213       Queues.newLinkedBlockingQueue(0);
    214       fail("Should have thrown IllegalArgumentException");
    215     } catch (IllegalArgumentException expected) {
    216       // any capacity less than 1 should throw IllegalArgumentException
    217     }
    218     assertEquals(1, Queues.newLinkedBlockingQueue(1).remainingCapacity());
    219     assertEquals(11, Queues.newLinkedBlockingQueue(11).remainingCapacity());
    220   }
    221 
    222   /**
    223    * Checks that #drain() invocations behave correctly for a drained (empty) queue.
    224    */
    225   private void assertDrained(BlockingQueue<Object> q) {
    226     assertNull(q.peek());
    227     assertInterruptibleDrained(q);
    228     assertUninterruptibleDrained(q);
    229   }
    230 
    231   private void assertInterruptibleDrained(BlockingQueue<Object> q) {
    232     // nothing to drain, thus this should wait doing nothing
    233     try {
    234       assertEquals(0, Queues.drain(q, ImmutableList.of(), 0, 10, TimeUnit.MILLISECONDS));
    235     } catch (InterruptedException e) {
    236       throw new AssertionError();
    237     }
    238 
    239     // but does the wait actually occurs?
    240     threadPool.submit(new Interrupter(Thread.currentThread()));
    241     try {
    242       // if waiting works, this should get stuck
    243       Queues.drain(q, Lists.newArrayList(), 1, Long.MAX_VALUE, TimeUnit.NANOSECONDS);
    244       fail();
    245     } catch (InterruptedException expected) {
    246       // we indeed waited; a slow thread had enough time to interrupt us
    247     }
    248   }
    249 
    250   // same as above; uninterruptible version
    251   private void assertUninterruptibleDrained(BlockingQueue<Object> q) {
    252     assertEquals(0,
    253         Queues.drainUninterruptibly(q, ImmutableList.of(), 0, 10, TimeUnit.MILLISECONDS));
    254 
    255     // but does the wait actually occurs?
    256     threadPool.submit(new Interrupter(Thread.currentThread()));
    257 
    258     long startTime = System.nanoTime();
    259     Queues.drainUninterruptibly(
    260         q, Lists.newArrayList(), 1, 10, TimeUnit.MILLISECONDS);
    261     assertTrue((System.nanoTime() - startTime) >= TimeUnit.MILLISECONDS.toNanos(10));
    262     // wait for interrupted status and clear it
    263     while (!Thread.interrupted()) { Thread.yield(); }
    264   }
    265 
    266   private static class Producer implements Runnable {
    267     final BlockingQueue<Object> q;
    268     final int elements;
    269 
    270     Producer(BlockingQueue<Object> q, int elements) {
    271       this.q = q;
    272       this.elements = elements;
    273     }
    274 
    275     @Override public void run() {
    276       try {
    277         for (int i = 0; i < elements; i++) {
    278           q.put(new Object());
    279         }
    280       } catch (InterruptedException e) {
    281         // TODO(user): replace this when there is a better way to spawn threads in tests and
    282         // have threads propagate their errors back to the test thread.
    283         e.printStackTrace();
    284         // never returns, so that #tearDown() notices that one worker isn't done
    285         Uninterruptibles.sleepUninterruptibly(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
    286       }
    287     }
    288   }
    289 
    290   private static class Interrupter implements Runnable {
    291     final Thread threadToInterrupt;
    292 
    293     Interrupter(Thread threadToInterrupt) {
    294       this.threadToInterrupt = threadToInterrupt;
    295     }
    296 
    297     @Override public void run() {
    298       try {
    299         Thread.sleep(100);
    300       } catch (InterruptedException e) {
    301         throw new AssertionError();
    302       } finally {
    303         threadToInterrupt.interrupt();
    304       }
    305     }
    306   }
    307 }
    308