1 /* 2 * Written by Doug Lea with assistance from members of JCP JSR-166 3 * Expert Group and released to the public domain, as explained at 4 * http://creativecommons.org/publicdomain/zero/1.0/ 5 */ 6 7 package java.util.concurrent; 8 import java.util.*; 9 10 /** 11 * Provides default implementations of {@link ExecutorService} 12 * execution methods. This class implements the <tt>submit</tt>, 13 * <tt>invokeAny</tt> and <tt>invokeAll</tt> methods using a 14 * {@link RunnableFuture} returned by <tt>newTaskFor</tt>, which defaults 15 * to the {@link FutureTask} class provided in this package. For example, 16 * the implementation of <tt>submit(Runnable)</tt> creates an 17 * associated <tt>RunnableFuture</tt> that is executed and 18 * returned. Subclasses may override the <tt>newTaskFor</tt> methods 19 * to return <tt>RunnableFuture</tt> implementations other than 20 * <tt>FutureTask</tt>. 21 * 22 * <p> <b>Extension example</b>. Here is a sketch of a class 23 * that customizes {@link ThreadPoolExecutor} to use 24 * a <tt>CustomTask</tt> class instead of the default <tt>FutureTask</tt>: 25 * <pre> {@code 26 * public class CustomThreadPoolExecutor extends ThreadPoolExecutor { 27 * 28 * static class CustomTask<V> implements RunnableFuture<V> {...} 29 * 30 * protected <V> RunnableFuture<V> newTaskFor(Callable<V> c) { 31 * return new CustomTask<V>(c); 32 * } 33 * protected <V> RunnableFuture<V> newTaskFor(Runnable r, V v) { 34 * return new CustomTask<V>(r, v); 35 * } 36 * // ... add constructors, etc. 37 * }}</pre> 38 * 39 * @since 1.5 40 * @author Doug Lea 41 */ 42 public abstract class AbstractExecutorService implements ExecutorService { 43 44 /** 45 * Returns a <tt>RunnableFuture</tt> for the given runnable and default 46 * value. 47 * 48 * @param runnable the runnable task being wrapped 49 * @param value the default value for the returned future 50 * @return a <tt>RunnableFuture</tt> which when run will run the 51 * underlying runnable and which, as a <tt>Future</tt>, will yield 52 * the given value as its result and provide for cancellation of 53 * the underlying task. 54 * @since 1.6 55 */ 56 protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) { 57 return new FutureTask<T>(runnable, value); 58 } 59 60 /** 61 * Returns a <tt>RunnableFuture</tt> for the given callable task. 62 * 63 * @param callable the callable task being wrapped 64 * @return a <tt>RunnableFuture</tt> which when run will call the 65 * underlying callable and which, as a <tt>Future</tt>, will yield 66 * the callable's result as its result and provide for 67 * cancellation of the underlying task. 68 * @since 1.6 69 */ 70 protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { 71 return new FutureTask<T>(callable); 72 } 73 74 /** 75 * @throws RejectedExecutionException {@inheritDoc} 76 * @throws NullPointerException {@inheritDoc} 77 */ 78 public Future<?> submit(Runnable task) { 79 if (task == null) throw new NullPointerException(); 80 RunnableFuture<Void> ftask = newTaskFor(task, null); 81 execute(ftask); 82 return ftask; 83 } 84 85 /** 86 * @throws RejectedExecutionException {@inheritDoc} 87 * @throws NullPointerException {@inheritDoc} 88 */ 89 public <T> Future<T> submit(Runnable task, T result) { 90 if (task == null) throw new NullPointerException(); 91 RunnableFuture<T> ftask = newTaskFor(task, result); 92 execute(ftask); 93 return ftask; 94 } 95 96 /** 97 * @throws RejectedExecutionException {@inheritDoc} 98 * @throws NullPointerException {@inheritDoc} 99 */ 100 public <T> Future<T> submit(Callable<T> task) { 101 if (task == null) throw new NullPointerException(); 102 RunnableFuture<T> ftask = newTaskFor(task); 103 execute(ftask); 104 return ftask; 105 } 106 107 /** 108 * the main mechanics of invokeAny. 109 */ 110 private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks, 111 boolean timed, long nanos) 112 throws InterruptedException, ExecutionException, TimeoutException { 113 if (tasks == null) 114 throw new NullPointerException(); 115 int ntasks = tasks.size(); 116 if (ntasks == 0) 117 throw new IllegalArgumentException(); 118 List<Future<T>> futures= new ArrayList<Future<T>>(ntasks); 119 ExecutorCompletionService<T> ecs = 120 new ExecutorCompletionService<T>(this); 121 122 // For efficiency, especially in executors with limited 123 // parallelism, check to see if previously submitted tasks are 124 // done before submitting more of them. This interleaving 125 // plus the exception mechanics account for messiness of main 126 // loop. 127 128 try { 129 // Record exceptions so that if we fail to obtain any 130 // result, we can throw the last exception we got. 131 ExecutionException ee = null; 132 long lastTime = timed ? System.nanoTime() : 0; 133 Iterator<? extends Callable<T>> it = tasks.iterator(); 134 135 // Start one task for sure; the rest incrementally 136 futures.add(ecs.submit(it.next())); 137 --ntasks; 138 int active = 1; 139 140 for (;;) { 141 Future<T> f = ecs.poll(); 142 if (f == null) { 143 if (ntasks > 0) { 144 --ntasks; 145 futures.add(ecs.submit(it.next())); 146 ++active; 147 } 148 else if (active == 0) 149 break; 150 else if (timed) { 151 f = ecs.poll(nanos, TimeUnit.NANOSECONDS); 152 if (f == null) 153 throw new TimeoutException(); 154 long now = System.nanoTime(); 155 nanos -= now - lastTime; 156 lastTime = now; 157 } 158 else 159 f = ecs.take(); 160 } 161 if (f != null) { 162 --active; 163 try { 164 return f.get(); 165 } catch (ExecutionException eex) { 166 ee = eex; 167 } catch (RuntimeException rex) { 168 ee = new ExecutionException(rex); 169 } 170 } 171 } 172 173 if (ee == null) 174 ee = new ExecutionException(); 175 throw ee; 176 177 } finally { 178 for (Future<T> f : futures) 179 f.cancel(true); 180 } 181 } 182 183 public <T> T invokeAny(Collection<? extends Callable<T>> tasks) 184 throws InterruptedException, ExecutionException { 185 try { 186 return doInvokeAny(tasks, false, 0); 187 } catch (TimeoutException cannotHappen) { 188 assert false; 189 return null; 190 } 191 } 192 193 public <T> T invokeAny(Collection<? extends Callable<T>> tasks, 194 long timeout, TimeUnit unit) 195 throws InterruptedException, ExecutionException, TimeoutException { 196 return doInvokeAny(tasks, true, unit.toNanos(timeout)); 197 } 198 199 public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) 200 throws InterruptedException { 201 if (tasks == null) 202 throw new NullPointerException(); 203 List<Future<T>> futures = new ArrayList<Future<T>>(tasks.size()); 204 boolean done = false; 205 try { 206 for (Callable<T> t : tasks) { 207 RunnableFuture<T> f = newTaskFor(t); 208 futures.add(f); 209 execute(f); 210 } 211 for (Future<T> f : futures) { 212 if (!f.isDone()) { 213 try { 214 f.get(); 215 } catch (CancellationException ignore) { 216 } catch (ExecutionException ignore) { 217 } 218 } 219 } 220 done = true; 221 return futures; 222 } finally { 223 if (!done) 224 for (Future<T> f : futures) 225 f.cancel(true); 226 } 227 } 228 229 public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, 230 long timeout, TimeUnit unit) 231 throws InterruptedException { 232 if (tasks == null || unit == null) 233 throw new NullPointerException(); 234 long nanos = unit.toNanos(timeout); 235 List<Future<T>> futures = new ArrayList<Future<T>>(tasks.size()); 236 boolean done = false; 237 try { 238 for (Callable<T> t : tasks) 239 futures.add(newTaskFor(t)); 240 241 long lastTime = System.nanoTime(); 242 243 // Interleave time checks and calls to execute in case 244 // executor doesn't have any/much parallelism. 245 Iterator<Future<T>> it = futures.iterator(); 246 while (it.hasNext()) { 247 execute((Runnable)(it.next())); 248 long now = System.nanoTime(); 249 nanos -= now - lastTime; 250 lastTime = now; 251 if (nanos <= 0) 252 return futures; 253 } 254 255 for (Future<T> f : futures) { 256 if (!f.isDone()) { 257 if (nanos <= 0) 258 return futures; 259 try { 260 f.get(nanos, TimeUnit.NANOSECONDS); 261 } catch (CancellationException ignore) { 262 } catch (ExecutionException ignore) { 263 } catch (TimeoutException toe) { 264 return futures; 265 } 266 long now = System.nanoTime(); 267 nanos -= now - lastTime; 268 lastTime = now; 269 } 270 } 271 done = true; 272 return futures; 273 } finally { 274 if (!done) 275 for (Future<T> f : futures) 276 f.cancel(true); 277 } 278 } 279 280 } 281