Home | History | Annotate | Download | only in jsr166
      1 /*
      2  * Written by Doug Lea with assistance from members of JCP JSR-166
      3  * Expert Group and released to the public domain, as explained at
      4  * http://creativecommons.org/publicdomain/zero/1.0/
      5  * Other contributors include Andrew Wright, Jeffrey Hayes,
      6  * Pat Fisher, Mike Judd.
      7  */
      8 
      9 package jsr166;
     10 
     11 import static java.util.concurrent.TimeUnit.MILLISECONDS;
     12 
     13 import java.util.concurrent.ArrayBlockingQueue;
     14 import java.util.concurrent.Callable;
     15 import java.util.concurrent.ExecutorCompletionService;
     16 import java.util.concurrent.Executors;
     17 import java.util.concurrent.ExecutorService;
     18 import java.util.concurrent.Future;
     19 import java.util.concurrent.FutureTask;
     20 import java.util.concurrent.RunnableFuture;
     21 import java.util.concurrent.ThreadPoolExecutor;
     22 import java.util.concurrent.TimeUnit;
     23 import java.util.concurrent.atomic.AtomicBoolean;
     24 
     25 import junit.framework.Test;
     26 import junit.framework.TestSuite;
     27 
     28 public class ExecutorCompletionServiceTest extends JSR166TestCase {
     29     // android-note: Removed because the CTS runner does a bad job of
     30     // retrying tests that have suite() declarations.
     31     //
     32     // public static void main(String[] args) {
     33     //     main(suite(), args);
     34     // }
     35     // public static Test suite() {
     36     //     return new TestSuite(ExecutorCompletionServiceTest.class);
     37     // }
     38 
     39     /**
     40      * Creating a new ECS with null Executor throw NPE
     41      */
     42     public void testConstructorNPE() {
     43         try {
     44             new ExecutorCompletionService(null);
     45             shouldThrow();
     46         } catch (NullPointerException success) {}
     47     }
     48 
     49     /**
     50      * Creating a new ECS with null queue throw NPE
     51      */
     52     public void testConstructorNPE2() {
     53         try {
     54             ExecutorService e = Executors.newCachedThreadPool();
     55             new ExecutorCompletionService(e, null);
     56             shouldThrow();
     57         } catch (NullPointerException success) {}
     58     }
     59 
     60     /**
     61      * Submitting a null callable throws NPE
     62      */
     63     public void testSubmitNPE() {
     64         final ExecutorService e = Executors.newCachedThreadPool();
     65         final ExecutorCompletionService ecs = new ExecutorCompletionService(e);
     66         try (PoolCleaner cleaner = cleaner(e)) {
     67             Callable c = null;
     68             try {
     69                 ecs.submit(c);
     70                 shouldThrow();
     71             } catch (NullPointerException success) {}
     72         }
     73     }
     74 
     75     /**
     76      * Submitting a null runnable throws NPE
     77      */
     78     public void testSubmitNPE2() {
     79         final ExecutorService e = Executors.newCachedThreadPool();
     80         final ExecutorCompletionService ecs = new ExecutorCompletionService(e);
     81         try (PoolCleaner cleaner = cleaner(e)) {
     82             Runnable r = null;
     83             try {
     84                 ecs.submit(r, Boolean.TRUE);
     85                 shouldThrow();
     86             } catch (NullPointerException success) {}
     87         }
     88     }
     89 
     90     /**
     91      * A taken submitted task is completed
     92      */
     93     public void testTake() throws InterruptedException {
     94         final ExecutorService e = Executors.newCachedThreadPool();
     95         final ExecutorCompletionService ecs = new ExecutorCompletionService(e);
     96         try (PoolCleaner cleaner = cleaner(e)) {
     97             Callable c = new StringTask();
     98             ecs.submit(c);
     99             Future f = ecs.take();
    100             assertTrue(f.isDone());
    101         }
    102     }
    103 
    104     /**
    105      * Take returns the same future object returned by submit
    106      */
    107     public void testTake2() throws InterruptedException {
    108         final ExecutorService e = Executors.newCachedThreadPool();
    109         final ExecutorCompletionService ecs = new ExecutorCompletionService(e);
    110         try (PoolCleaner cleaner = cleaner(e)) {
    111             Callable c = new StringTask();
    112             Future f1 = ecs.submit(c);
    113             Future f2 = ecs.take();
    114             assertSame(f1, f2);
    115         }
    116     }
    117 
    118     /**
    119      * If poll returns non-null, the returned task is completed
    120      */
    121     public void testPoll1() throws Exception {
    122         final ExecutorService e = Executors.newCachedThreadPool();
    123         final ExecutorCompletionService ecs = new ExecutorCompletionService(e);
    124         try (PoolCleaner cleaner = cleaner(e)) {
    125             assertNull(ecs.poll());
    126             Callable c = new StringTask();
    127             ecs.submit(c);
    128 
    129             long startTime = System.nanoTime();
    130             Future f;
    131             while ((f = ecs.poll()) == null) {
    132                 if (millisElapsedSince(startTime) > LONG_DELAY_MS)
    133                     fail("timed out");
    134                 Thread.yield();
    135             }
    136             assertTrue(f.isDone());
    137             assertSame(TEST_STRING, f.get());
    138         }
    139     }
    140 
    141     /**
    142      * If timed poll returns non-null, the returned task is completed
    143      */
    144     public void testPoll2() throws InterruptedException {
    145         final ExecutorService e = Executors.newCachedThreadPool();
    146         final ExecutorCompletionService ecs = new ExecutorCompletionService(e);
    147         try (PoolCleaner cleaner = cleaner(e)) {
    148             assertNull(ecs.poll());
    149             Callable c = new StringTask();
    150             ecs.submit(c);
    151             Future f = ecs.poll(SHORT_DELAY_MS, MILLISECONDS);
    152             if (f != null)
    153                 assertTrue(f.isDone());
    154         }
    155     }
    156 
    157     /**
    158      * Submitting to underlying AES that overrides newTaskFor(Callable)
    159      * returns and eventually runs Future returned by newTaskFor.
    160      */
    161     public void testNewTaskForCallable() throws InterruptedException {
    162         final AtomicBoolean done = new AtomicBoolean(false);
    163         class MyCallableFuture<V> extends FutureTask<V> {
    164             MyCallableFuture(Callable<V> c) { super(c); }
    165             protected void done() { done.set(true); }
    166         }
    167         final ExecutorService e =
    168             new ThreadPoolExecutor(1, 1,
    169                                    30L, TimeUnit.SECONDS,
    170                                    new ArrayBlockingQueue<Runnable>(1)) {
    171                 protected <T> RunnableFuture<T> newTaskFor(Callable<T> c) {
    172                     return new MyCallableFuture<T>(c);
    173                 }};
    174         ExecutorCompletionService<String> ecs =
    175             new ExecutorCompletionService<String>(e);
    176         try (PoolCleaner cleaner = cleaner(e)) {
    177             assertNull(ecs.poll());
    178             Callable<String> c = new StringTask();
    179             Future f1 = ecs.submit(c);
    180             assertTrue("submit must return MyCallableFuture",
    181                        f1 instanceof MyCallableFuture);
    182             Future f2 = ecs.take();
    183             assertSame("submit and take must return same objects", f1, f2);
    184             assertTrue("completed task must have set done", done.get());
    185         }
    186     }
    187 
    188     /**
    189      * Submitting to underlying AES that overrides newTaskFor(Runnable,T)
    190      * returns and eventually runs Future returned by newTaskFor.
    191      */
    192     public void testNewTaskForRunnable() throws InterruptedException {
    193         final AtomicBoolean done = new AtomicBoolean(false);
    194         class MyRunnableFuture<V> extends FutureTask<V> {
    195             MyRunnableFuture(Runnable t, V r) { super(t, r); }
    196             protected void done() { done.set(true); }
    197         }
    198         final ExecutorService e =
    199             new ThreadPoolExecutor(1, 1,
    200                                    30L, TimeUnit.SECONDS,
    201                                    new ArrayBlockingQueue<Runnable>(1)) {
    202                 protected <T> RunnableFuture<T> newTaskFor(Runnable t, T r) {
    203                     return new MyRunnableFuture<T>(t, r);
    204                 }};
    205         final ExecutorCompletionService<String> ecs =
    206             new ExecutorCompletionService<String>(e);
    207         try (PoolCleaner cleaner = cleaner(e)) {
    208             assertNull(ecs.poll());
    209             Runnable r = new NoOpRunnable();
    210             Future f1 = ecs.submit(r, null);
    211             assertTrue("submit must return MyRunnableFuture",
    212                        f1 instanceof MyRunnableFuture);
    213             Future f2 = ecs.take();
    214             assertSame("submit and take must return same objects", f1, f2);
    215             assertTrue("completed task must have set done", done.get());
    216         }
    217     }
    218 
    219 }
    220