Home | History | Annotate | Download | only in concurrent
      1 /*
      2  * This file is a modified version of
      3  * http://gee.cs.oswego.edu/cgi-bin/viewcvs.cgi/jsr166/src/main/java/util/concurrent/AbstractExecutorService.java?revision=1.20
      4  * which contained the following notice:
      5  *
      6  * Written by Doug Lea with assistance from members of JCP JSR-166
      7  * Expert Group and released to the public domain, as explained at
      8  * http://creativecommons.org/licenses/publicdomain
      9  */
     10 
     11 package java.util.concurrent;
     12 
     13 import java.util.*;
     14 
     15 public abstract class AbstractExecutorService implements ExecutorService {
     16 
     17     public Future<?> submit(Runnable task) {
     18         if (task == null) throw new NullPointerException();
     19         FutureTask<Object> ftask = new FutureTask<Object>(task, null);
     20         execute(ftask);
     21         return ftask;
     22     }
     23 
     24     public <T> Future<T> submit(Runnable task, T result) {
     25         if (task == null) throw new NullPointerException();
     26         FutureTask<T> ftask = new FutureTask<T>(task, result);
     27         execute(ftask);
     28         return ftask;
     29     }
     30 
     31     public <T> Future<T> submit(Callable<T> task) {
     32         if (task == null) throw new NullPointerException();
     33         FutureTask<T> ftask = new FutureTask<T>(task);
     34         execute(ftask);
     35         return ftask;
     36     }
     37 
     38     private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,
     39                             boolean timed, long nanos)
     40         throws InterruptedException, ExecutionException, TimeoutException {
     41         if (tasks == null)
     42             throw new NullPointerException();
     43         int ntasks = tasks.size();
     44         if (ntasks == 0)
     45             throw new IllegalArgumentException();
     46         List<Future<T>> futures= new ArrayList<Future<T>>(ntasks);
     47         ExecutorCompletionService<T> ecs =
     48             new ExecutorCompletionService<T>(this);
     49 
     50         try {
     51             ExecutionException ee = null;
     52             long lastTime = (timed)? System.nanoTime() : 0;
     53             Iterator<? extends Callable<T>> it = tasks.iterator();
     54 
     55             futures.add(ecs.submit(it.next()));
     56             --ntasks;
     57             int active = 1;
     58 
     59             for (;;) {
     60                 Future<T> f = ecs.poll();
     61                 if (f == null) {
     62                     if (ntasks > 0) {
     63                         --ntasks;
     64                         futures.add(ecs.submit(it.next()));
     65                         ++active;
     66                     }
     67                     else if (active == 0)
     68                         break;
     69                     else if (timed) {
     70                         f = ecs.poll(nanos, TimeUnit.NANOSECONDS);
     71                         if (f == null)
     72                             throw new TimeoutException();
     73                         long now = System.nanoTime();
     74                         nanos -= now - lastTime;
     75                         lastTime = now;
     76                     }
     77                     else
     78                         f = ecs.take();
     79                 }
     80                 if (f != null) {
     81                     --active;
     82                     try {
     83                         return f.get();
     84                     } catch (InterruptedException ie) {
     85                         throw ie;
     86                     } catch (ExecutionException eex) {
     87                         ee = eex;
     88                     } catch (RuntimeException rex) {
     89                         ee = new ExecutionException(rex);
     90                     }
     91                 }
     92             }
     93 
     94             if (ee == null)
     95                 ee = new ExecutionException();
     96             throw ee;
     97 
     98         } finally {
     99             for (Future<T> f : futures)
    100                 f.cancel(true);
    101         }
    102     }
    103 
    104     public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
    105         throws InterruptedException, ExecutionException {
    106         try {
    107             return doInvokeAny(tasks, false, 0);
    108         } catch (TimeoutException cannotHappen) {
    109             assert false;
    110             return null;
    111         }
    112     }
    113 
    114     public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
    115                            long timeout, TimeUnit unit)
    116         throws InterruptedException, ExecutionException, TimeoutException {
    117         return doInvokeAny(tasks, true, unit.toNanos(timeout));
    118     }
    119 
    120     public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
    121         throws InterruptedException {
    122         if (tasks == null)
    123             throw new NullPointerException();
    124         List<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
    125         boolean done = false;
    126         try {
    127             for (Callable<T> t : tasks) {
    128                 FutureTask<T> f = new FutureTask<T>(t);
    129                 futures.add(f);
    130                 execute(f);
    131             }
    132             for (Future<T> f : futures) {
    133                 if (!f.isDone()) {
    134                     try {
    135                         f.get();
    136                     } catch (CancellationException ignore) {
    137                     } catch (ExecutionException ignore) {
    138                     }
    139                 }
    140             }
    141             done = true;
    142             return futures;
    143         } finally {
    144             if (!done)
    145                 for (Future<T> f : futures)
    146                     f.cancel(true);
    147         }
    148     }
    149 
    150     public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
    151                                          long timeout, TimeUnit unit)
    152         throws InterruptedException {
    153         if (tasks == null || unit == null)
    154             throw new NullPointerException();
    155         long nanos = unit.toNanos(timeout);
    156         List<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
    157         boolean done = false;
    158         try {
    159             for (Callable<T> t : tasks)
    160                 futures.add(new FutureTask<T>(t));
    161 
    162             long lastTime = System.nanoTime();
    163 
    164             Iterator<Future<T>> it = futures.iterator();
    165             while (it.hasNext()) {
    166                 execute((Runnable)(it.next()));
    167                 long now = System.nanoTime();
    168                 nanos -= now - lastTime;
    169                 lastTime = now;
    170                 if (nanos <= 0)
    171                     return futures;
    172             }
    173 
    174             for (Future<T> f : futures) {
    175                 if (!f.isDone()) {
    176                     if (nanos <= 0)
    177                         return futures;
    178                     try {
    179                         f.get(nanos, TimeUnit.NANOSECONDS);
    180                     } catch (CancellationException ignore) {
    181                     } catch (ExecutionException ignore) {
    182                     } catch (TimeoutException toe) {
    183                         return futures;
    184                     }
    185                     long now = System.nanoTime();
    186                     nanos -= now - lastTime;
    187                     lastTime = now;
    188                 }
    189             }
    190             done = true;
    191             return futures;
    192         } finally {
    193             if (!done)
    194                 for (Future<T> f : futures)
    195                     f.cancel(true);
    196         }
    197     }
    198 
    199 }
    200