Home | History | Annotate | Download | only in concurrent
      1 /*
      2  * Written by Doug Lea with assistance from members of JCP JSR-166
      3  * Expert Group and released to the public domain, as explained at
      4  * http://creativecommons.org/publicdomain/zero/1.0/
      5  */
      6 
      7 package java.util.concurrent;
      8 import java.util.*;
      9 
     10 /**
     11  * Provides default implementations of {@link ExecutorService}
     12  * execution methods. This class implements the <tt>submit</tt>,
     13  * <tt>invokeAny</tt> and <tt>invokeAll</tt> methods using a
     14  * {@link RunnableFuture} returned by <tt>newTaskFor</tt>, which defaults
     15  * to the {@link FutureTask} class provided in this package.  For example,
     16  * the implementation of <tt>submit(Runnable)</tt> creates an
     17  * associated <tt>RunnableFuture</tt> that is executed and
     18  * returned. Subclasses may override the <tt>newTaskFor</tt> methods
     19  * to return <tt>RunnableFuture</tt> implementations other than
     20  * <tt>FutureTask</tt>.
     21  *
     22  * <p> <b>Extension example</b>. Here is a sketch of a class
     23  * that customizes {@link ThreadPoolExecutor} to use
     24  * a <tt>CustomTask</tt> class instead of the default <tt>FutureTask</tt>:
     25  *  <pre> {@code
     26  * public class CustomThreadPoolExecutor extends ThreadPoolExecutor {
     27  *
     28  *   static class CustomTask<V> implements RunnableFuture<V> {...}
     29  *
     30  *   protected <V> RunnableFuture<V> newTaskFor(Callable<V> c) {
     31  *       return new CustomTask<V>(c);
     32  *   }
     33  *   protected <V> RunnableFuture<V> newTaskFor(Runnable r, V v) {
     34  *       return new CustomTask<V>(r, v);
     35  *   }
     36  *   // ... add constructors, etc.
     37  * }}</pre>
     38  *
     39  * @since 1.5
     40  * @author Doug Lea
     41  */
     42 public abstract class AbstractExecutorService implements ExecutorService {
     43 
     44     /**
     45      * Returns a <tt>RunnableFuture</tt> for the given runnable and default
     46      * value.
     47      *
     48      * @param runnable the runnable task being wrapped
     49      * @param value the default value for the returned future
     50      * @return a <tt>RunnableFuture</tt> which when run will run the
     51      * underlying runnable and which, as a <tt>Future</tt>, will yield
     52      * the given value as its result and provide for cancellation of
     53      * the underlying task.
     54      * @since 1.6
     55      */
     56     protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
     57         return new FutureTask<T>(runnable, value);
     58     }
     59 
     60     /**
     61      * Returns a <tt>RunnableFuture</tt> for the given callable task.
     62      *
     63      * @param callable the callable task being wrapped
     64      * @return a <tt>RunnableFuture</tt> which when run will call the
     65      * underlying callable and which, as a <tt>Future</tt>, will yield
     66      * the callable's result as its result and provide for
     67      * cancellation of the underlying task.
     68      * @since 1.6
     69      */
     70     protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
     71         return new FutureTask<T>(callable);
     72     }
     73 
     74     /**
     75      * @throws RejectedExecutionException {@inheritDoc}
     76      * @throws NullPointerException       {@inheritDoc}
     77      */
     78     public Future<?> submit(Runnable task) {
     79         if (task == null) throw new NullPointerException();
     80         RunnableFuture<Void> ftask = newTaskFor(task, null);
     81         execute(ftask);
     82         return ftask;
     83     }
     84 
     85     /**
     86      * @throws RejectedExecutionException {@inheritDoc}
     87      * @throws NullPointerException       {@inheritDoc}
     88      */
     89     public <T> Future<T> submit(Runnable task, T result) {
     90         if (task == null) throw new NullPointerException();
     91         RunnableFuture<T> ftask = newTaskFor(task, result);
     92         execute(ftask);
     93         return ftask;
     94     }
     95 
     96     /**
     97      * @throws RejectedExecutionException {@inheritDoc}
     98      * @throws NullPointerException       {@inheritDoc}
     99      */
    100     public <T> Future<T> submit(Callable<T> task) {
    101         if (task == null) throw new NullPointerException();
    102         RunnableFuture<T> ftask = newTaskFor(task);
    103         execute(ftask);
    104         return ftask;
    105     }
    106 
    107     /**
    108      * the main mechanics of invokeAny.
    109      */
    110     private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,
    111                             boolean timed, long nanos)
    112         throws InterruptedException, ExecutionException, TimeoutException {
    113         if (tasks == null)
    114             throw new NullPointerException();
    115         int ntasks = tasks.size();
    116         if (ntasks == 0)
    117             throw new IllegalArgumentException();
    118         List<Future<T>> futures= new ArrayList<Future<T>>(ntasks);
    119         ExecutorCompletionService<T> ecs =
    120             new ExecutorCompletionService<T>(this);
    121 
    122         // For efficiency, especially in executors with limited
    123         // parallelism, check to see if previously submitted tasks are
    124         // done before submitting more of them. This interleaving
    125         // plus the exception mechanics account for messiness of main
    126         // loop.
    127 
    128         try {
    129             // Record exceptions so that if we fail to obtain any
    130             // result, we can throw the last exception we got.
    131             ExecutionException ee = null;
    132             long lastTime = timed ? System.nanoTime() : 0;
    133             Iterator<? extends Callable<T>> it = tasks.iterator();
    134 
    135             // Start one task for sure; the rest incrementally
    136             futures.add(ecs.submit(it.next()));
    137             --ntasks;
    138             int active = 1;
    139 
    140             for (;;) {
    141                 Future<T> f = ecs.poll();
    142                 if (f == null) {
    143                     if (ntasks > 0) {
    144                         --ntasks;
    145                         futures.add(ecs.submit(it.next()));
    146                         ++active;
    147                     }
    148                     else if (active == 0)
    149                         break;
    150                     else if (timed) {
    151                         f = ecs.poll(nanos, TimeUnit.NANOSECONDS);
    152                         if (f == null)
    153                             throw new TimeoutException();
    154                         long now = System.nanoTime();
    155                         nanos -= now - lastTime;
    156                         lastTime = now;
    157                     }
    158                     else
    159                         f = ecs.take();
    160                 }
    161                 if (f != null) {
    162                     --active;
    163                     try {
    164                         return f.get();
    165                     } catch (ExecutionException eex) {
    166                         ee = eex;
    167                     } catch (RuntimeException rex) {
    168                         ee = new ExecutionException(rex);
    169                     }
    170                 }
    171             }
    172 
    173             if (ee == null)
    174                 ee = new ExecutionException();
    175             throw ee;
    176 
    177         } finally {
    178             for (Future<T> f : futures)
    179                 f.cancel(true);
    180         }
    181     }
    182 
    183     public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
    184         throws InterruptedException, ExecutionException {
    185         try {
    186             return doInvokeAny(tasks, false, 0);
    187         } catch (TimeoutException cannotHappen) {
    188             assert false;
    189             return null;
    190         }
    191     }
    192 
    193     public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
    194                            long timeout, TimeUnit unit)
    195         throws InterruptedException, ExecutionException, TimeoutException {
    196         return doInvokeAny(tasks, true, unit.toNanos(timeout));
    197     }
    198 
    199     public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
    200         throws InterruptedException {
    201         if (tasks == null)
    202             throw new NullPointerException();
    203         List<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
    204         boolean done = false;
    205         try {
    206             for (Callable<T> t : tasks) {
    207                 RunnableFuture<T> f = newTaskFor(t);
    208                 futures.add(f);
    209                 execute(f);
    210             }
    211             for (Future<T> f : futures) {
    212                 if (!f.isDone()) {
    213                     try {
    214                         f.get();
    215                     } catch (CancellationException ignore) {
    216                     } catch (ExecutionException ignore) {
    217                     }
    218                 }
    219             }
    220             done = true;
    221             return futures;
    222         } finally {
    223             if (!done)
    224                 for (Future<T> f : futures)
    225                     f.cancel(true);
    226         }
    227     }
    228 
    229     public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
    230                                          long timeout, TimeUnit unit)
    231         throws InterruptedException {
    232         if (tasks == null || unit == null)
    233             throw new NullPointerException();
    234         long nanos = unit.toNanos(timeout);
    235         List<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
    236         boolean done = false;
    237         try {
    238             for (Callable<T> t : tasks)
    239                 futures.add(newTaskFor(t));
    240 
    241             long lastTime = System.nanoTime();
    242 
    243             // Interleave time checks and calls to execute in case
    244             // executor doesn't have any/much parallelism.
    245             Iterator<Future<T>> it = futures.iterator();
    246             while (it.hasNext()) {
    247                 execute((Runnable)(it.next()));
    248                 long now = System.nanoTime();
    249                 nanos -= now - lastTime;
    250                 lastTime = now;
    251                 if (nanos <= 0)
    252                     return futures;
    253             }
    254 
    255             for (Future<T> f : futures) {
    256                 if (!f.isDone()) {
    257                     if (nanos <= 0)
    258                         return futures;
    259                     try {
    260                         f.get(nanos, TimeUnit.NANOSECONDS);
    261                     } catch (CancellationException ignore) {
    262                     } catch (ExecutionException ignore) {
    263                     } catch (TimeoutException toe) {
    264                         return futures;
    265                     }
    266                     long now = System.nanoTime();
    267                     nanos -= now - lastTime;
    268                     lastTime = now;
    269                 }
    270             }
    271             done = true;
    272             return futures;
    273         } finally {
    274             if (!done)
    275                 for (Future<T> f : futures)
    276                     f.cancel(true);
    277         }
    278     }
    279 
    280 }
    281