/*
 * Written by Doug Lea with assistance from members of JCP JSR-166
 * Expert Group and released to the public domain, as explained at
 * http://creativecommons.org/publicdomain/zero/1.0/
 */

package java.util.concurrent;

import java.util.concurrent.locks.LockSupport;

/**
 * A cancellable asynchronous computation.  This class provides a base
 * implementation of {@link Future}, with methods to start and cancel
 * a computation, query to see if the computation is complete, and
 * retrieve the result of the computation.  The result can only be
 * retrieved when the computation has completed; the {@code get}
 * methods will block if the computation has not yet completed.  Once
 * the computation has completed, the computation cannot be restarted
 * or cancelled (unless the computation is invoked using
 * {@link #runAndReset}).
 *
 * <p>A {@code FutureTask} can be used to wrap a {@link Callable} or
 * {@link Runnable} object.  Because {@code FutureTask} implements
 * {@code Runnable}, a {@code FutureTask} can be submitted to an
 * {@link Executor} for execution.
 *
 * <p>In addition to serving as a standalone class, this class provides
 * {@code protected} functionality that may be useful when creating
 * customized task classes.
 *
 * @since 1.5
 * @author Doug Lea
 * @param <V> The result type returned by this FutureTask's {@code get} methods
 */
public class FutureTask<V> implements RunnableFuture<V> {
    /*
     * Revision notes: This differs from previous versions of this
     * class that relied on AbstractQueuedSynchronizer, mainly to
     * avoid surprising users about retaining interrupt status during
     * cancellation races. Sync control in the current design relies
     * on a "state" field updated via CAS to track completion, along
     * with a simple Treiber stack to hold waiting threads.
     *
     * Style note: As usual, we bypass overhead of using
     * AtomicXFieldUpdaters and instead directly use Unsafe intrinsics.
     */

    /**
     * The run state of this task, initially NEW.  The run state
     * transitions to a terminal state only in methods set,
     * setException, and cancel.  During completion, state may take on
     * transient values of COMPLETING (while outcome is being set) or
     * INTERRUPTING (only while interrupting the runner to satisfy a
     * cancel(true)). Transitions from these intermediate to final
     * states use cheaper ordered/lazy writes because values are unique
     * and cannot be further modified.
     *
     * Possible state transitions:
     * NEW -> COMPLETING -> NORMAL
     * NEW -> COMPLETING -> EXCEPTIONAL
     * NEW -> CANCELLED
     * NEW -> INTERRUPTING -> INTERRUPTED
     */
    private volatile int state;
    // The numeric order of these values is significant: code in this
    // class uses range comparisons such as (s <= COMPLETING),
    // (s > COMPLETING), (s >= CANCELLED) and (s >= INTERRUPTING)
    // instead of testing each state individually.
    private static final int NEW          = 0;
    private static final int COMPLETING   = 1;
    private static final int NORMAL       = 2;
    private static final int EXCEPTIONAL  = 3;
    private static final int CANCELLED    = 4;
    private static final int INTERRUPTING = 5;
    private static final int INTERRUPTED  = 6;

    /** The underlying callable; nulled out after running */
    private Callable<V> callable;
    /** The result to return or exception to throw from get() */
    private Object outcome; // non-volatile, protected by state reads/writes
    /** The thread running the callable; CASed during run() */
    private volatile Thread runner;
    /** Treiber stack of waiting threads */
    private volatile WaitNode waiters;

    /**
     * Returns result or throws exception for completed task.
     *
     * <p>The stored {@code outcome} is interpreted according to
     * {@code s}: for NORMAL it is returned as the result; for any
     * state at or above CANCELLED (CANCELLED, INTERRUPTING,
     * INTERRUPTED) it is ignored; otherwise it is treated as the
     * {@code Throwable} that caused the failure.
     *
     * @param s completed state value
     * @return the outcome cast to {@code V} when {@code s} is NORMAL
     * @throws CancellationException if {@code s >= CANCELLED}
     * @throws ExecutionException wrapping the outcome for any other
     *         value of {@code s}
     */
    @SuppressWarnings("unchecked")
    private V report(int s) throws ExecutionException {
        Object x = outcome;
        if (s == NORMAL)
            return (V)x;
        if (s >= CANCELLED)
            throw new CancellationException();
        throw new ExecutionException((Throwable)x);
    }

    /**
     * Creates a {@code FutureTask} that will, upon running, execute the
     * given {@code Callable}.
     *
     * @param  callable the callable task
     * @throws NullPointerException if the callable is null
     */
    public FutureTask(Callable<V> callable) {
        if (callable == null)
            throw new NullPointerException();
        this.callable = callable;
        this.state = NEW;       // ensure visibility of callable
    }

    /**
     * Creates a {@code FutureTask} that will, upon running, execute the
     * given {@code Runnable}, and arrange that {@code get} will return the
     * given result on successful completion.
     *
     * @param runnable the runnable task
     * @param result the result to return on successful completion. If
     * you don't need a particular result, consider using
     * constructions of the form:
     * {@code Future<?> f = new FutureTask<Void>(runnable, null)}
     * @throws NullPointerException if the runnable is null
     */
    public FutureTask(Runnable runnable, V result) {
        // No explicit null check here; the runnable is handed straight
        // to Executors.callable, which adapts it to a Callable.
        this.callable = Executors.callable(runnable, result);
        this.state = NEW;       // ensure visibility of callable
    }

    /**
     * {@inheritDoc}
     *
     * <p>In this implementation, returns {@code true} whenever the run
     * state is CANCELLED, INTERRUPTING or INTERRUPTED.
     */
    public boolean isCancelled() {
        return state >= CANCELLED;
    }

    /**
     * {@inheritDoc}
     *
     * <p>In this implementation, returns {@code true} as soon as the
     * run state is anything other than NEW, which includes the
     * transient COMPLETING and INTERRUPTING states.
     */
    public boolean isDone() {
        return state != NEW;
    }

    /**
     * {@inheritDoc}
     *
     * <p>In this implementation the cancellation succeeds only if this
     * call wins the CAS that moves the state out of NEW; otherwise
     * {@code false} is returned and nothing else is done.  When
     * {@code mayInterruptIfRunning} is true the state first becomes
     * INTERRUPTING, the current runner thread (if any) is interrupted,
     * and the state is then set to INTERRUPTED; when false the state
     * goes directly to CANCELLED.  In both cases waiting threads are
     * released via {@code finishCompletion}.
     */
    public boolean cancel(boolean mayInterruptIfRunning) {
        // The plain read of state is a cheap pre-check; the CAS decides.
        if (!(state == NEW &&
              U.compareAndSwapInt(this, STATE, NEW,
                  mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
            return false;
        try {    // in case call to interrupt throws exception
            if (mayInterruptIfRunning) {
                try {
                    Thread t = runner;
                    if (t != null)
                        t.interrupt();
                } finally { // final state
                    U.putOrderedInt(this, STATE, INTERRUPTED);
                }
            }
        } finally {
            // Runs even if interrupt() threw, so waiters are always released.
            finishCompletion();
        }
        return true;
    }

    /**
     * {@inheritDoc}
     *
     * <p>In this implementation, waits via {@code awaitDone} only if
     * the state read on entry is NEW or COMPLETING; otherwise the
     * outcome is reported immediately.
     *
     * @throws CancellationException {@inheritDoc}
     */
    public V get() throws InterruptedException, ExecutionException {
        int s = state;
        if (s <= COMPLETING)
            s = awaitDone(false, 0L);
        return report(s);
    }

    /**
     * {@inheritDoc}
     *
     * <p>In this implementation, a {@code TimeoutException} is thrown
     * when the state returned by the timed {@code awaitDone} is still
     * NEW or COMPLETING.
     *
     * @throws CancellationException {@inheritDoc}
     * @throws NullPointerException if {@code unit} is null
     */
    public V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException {
        if (unit == null)
            throw new NullPointerException();
        int s = state;
        if (s <= COMPLETING &&
            (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
            throw new TimeoutException();
        return report(s);
    }

    /**
     * Protected method invoked when this task transitions to state
     * {@code isDone} (whether normally or via cancellation). The
     * default implementation does nothing.  Subclasses may override
     * this method to invoke completion callbacks or perform
     * bookkeeping. Note that you can query status inside the
     * implementation of this method to determine whether this task
     * has been cancelled.
     */
    protected void done() { }

    /**
     * Sets the result of this future to the given value unless
     * this future has already been set or has been cancelled.
     *
     * <p>This method is invoked internally by the {@link #run} method
     * upon successful completion of the computation.
     *
     * @param v the value
     */
    protected void set(V v) {
        // Only the caller that wins the NEW -> COMPLETING CAS may write
        // outcome; the write happens before the final state is published.
        if (U.compareAndSwapInt(this, STATE, NEW, COMPLETING)) {
            outcome = v;
            U.putOrderedInt(this, STATE, NORMAL); // final state
            finishCompletion();
        }
    }

    /**
     * Causes this future to report an {@link ExecutionException}
     * with the given throwable as its cause, unless this future has
     * already been set or has been cancelled.
     *
     * <p>This method is invoked internally by the {@link #run} method
     * upon failure of the computation.
     *
     * @param t the cause of failure
     */
    protected void setException(Throwable t) {
        // Same protocol as set(): win the CAS, store outcome, then
        // publish the final state.
        if (U.compareAndSwapInt(this, STATE, NEW, COMPLETING)) {
            outcome = t;
            U.putOrderedInt(this, STATE, EXCEPTIONAL); // final state
            finishCompletion();
        }
    }

    /**
     * Runs the underlying callable and records its result or failure.
     *
     * <p>Returns immediately without doing anything if the state is
     * not NEW or if another thread has already installed itself as
     * {@code runner}.  Otherwise invokes the callable; a normal return
     * is passed to {@link #set} and any {@code Throwable} it throws is
     * passed to {@link #setException}.
     */
    public void run() {
        // Claim the task: the CAS on runner (null -> current thread)
        // admits at most one thread at a time.
        if (state != NEW ||
            !U.compareAndSwapObject(this, RUNNER, null, Thread.currentThread()))
            return;
        try {
            Callable<V> c = callable;
            if (c != null && state == NEW) {
                V result;
                boolean ran;
                try {
                    result = c.call();
                    ran = true;
                } catch (Throwable ex) {
                    result = null;
                    ran = false;
                    setException(ex);
                }
                // set() is called outside the try block so that anything
                // it throws is not caught and fed to setException.
                if (ran)
                    set(result);
            }
        } finally {
            // runner must be non-null until state is settled to
            // prevent concurrent calls to run()
            runner = null;
            // state must be re-read after nulling runner to prevent
            // leaked interrupts
            int s = state;
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
    }

    /**
     * Executes the computation without setting its result, and then
     * resets this future to initial state, failing to do so if the
     * computation encounters an exception or is cancelled.  This is
     * designed for use with tasks that intrinsically execute more
     * than once.
     *
     * <p>Because a successful call never moves the state out of NEW,
     * "resetting" requires no explicit action: the method simply
     * reports whether the callable returned normally and the state is
     * still NEW afterwards.
     *
     * @return {@code true} if successfully run and reset
     */
    protected boolean runAndReset() {
        if (state != NEW ||
            !U.compareAndSwapObject(this, RUNNER, null, Thread.currentThread()))
            return false;
        boolean ran = false;
        int s = state;
        try {
            Callable<V> c = callable;
            if (c != null && s == NEW) {
                try {
                    c.call(); // don't set result
                    ran = true;
                } catch (Throwable ex) {
                    setException(ex);
                }
            }
        } finally {
            // runner must be non-null until state is settled to
            // prevent concurrent calls to run()
            runner = null;
            // state must be re-read after nulling runner to prevent
            // leaked interrupts
            s = state;
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
        return ran && s == NEW;
    }

    /**
     * Ensures that any interrupt from a possible cancel(true) is only
     * delivered to a task while in run or runAndReset.
     *
     * @param s the state value read by the caller after it nulled
     *        {@code runner}; callers invoke this only when
     *        {@code s >= INTERRUPTING}
     */
    private void handlePossibleCancellationInterrupt(int s) {
        // It is possible for our interrupter to stall before getting a
        // chance to interrupt us.  Let's spin-wait patiently.
        if (s == INTERRUPTING)
            while (state == INTERRUPTING)
                Thread.yield(); // wait out pending interrupt

        // assert state == INTERRUPTED;

        // We want to clear any interrupt we may have received from
        // cancel(true).  However, it is permissible to use interrupts
        // as an independent mechanism for a task to communicate with
        // its caller, and there is no way to clear only the
        // cancellation interrupt.  The call below is therefore
        // deliberately left disabled.
        //
        // Thread.interrupted();
    }

    /**
     * Simple linked list nodes to record waiting threads in a Treiber
     * stack.  See other classes such as Phaser and SynchronousQueue
     * for more detailed explanation.
     *
     * <p>A node whose {@code thread} field is null is treated as
     * cancelled or already released: {@code finishCompletion} does not
     * unpark it and {@code removeWaiter} unlinks it.
     */
    static final class WaitNode {
        volatile Thread thread;
        volatile WaitNode next;
        /** Creates a node recording the calling thread as the waiter. */
        WaitNode() { thread = Thread.currentThread(); }
    }

    /**
     * Removes and signals all waiting threads, invokes done(), and
     * nulls out callable.
     */
    private void finishCompletion() {
        // assert state > COMPLETING;
        for (WaitNode q; (q = waiters) != null;) {
            // Detach the entire stack in one CAS; if it fails the head
            // changed concurrently, so re-read waiters and retry.
            if (U.compareAndSwapObject(this, WAITERS, q, null)) {
                // We now own the detached list: wake every node that
                // still records a thread.
                for (;;) {
                    Thread t = q.thread;
                    if (t != null) {
                        q.thread = null;
                        LockSupport.unpark(t);
                    }
                    WaitNode next = q.next;
                    if (next == null)
                        break;
                    q.next = null; // unlink to help gc
                    q = next;
                }
                break;
            }
        }

        done();

        callable = null;        // to reduce footprint
    }

    /**
     * Awaits completion or aborts on interrupt or timeout.
     *
     * <p>Each pass of the loop re-reads {@code state} and performs at
     * most one step (yield, allocate a node, enqueue it, or park), then
     * loops to re-examine the state.
     *
     * @param timed true if use timed waits
     * @param nanos time to wait, if timed
     * @return state upon completion or at timeout
     * @throws InterruptedException if the calling thread is found
     *         interrupted while the state is still NEW
     */
    private int awaitDone(boolean timed, long nanos)
        throws InterruptedException {
        // The code below is very delicate, to achieve these goals:
        // - call nanoTime exactly once for each call to park
        // - if nanos <= 0L, return promptly without allocation or nanoTime
        // - if nanos == Long.MIN_VALUE, don't underflow
        // - if nanos == Long.MAX_VALUE, and nanoTime is non-monotonic
        //   and we suffer a spurious wakeup, we will do no worse than
        //   to park-spin for a while
        long startTime = 0L;    // Special value 0L means not yet parked
        WaitNode q = null;
        boolean queued = false;
        for (;;) {
            int s = state;
            if (s > COMPLETING) {
                // Reached a final (or INTERRUPTING) state.  Clear our
                // node's thread so it is no longer treated as a waiter.
                if (q != null)
                    q.thread = null;
                return s;
            }
            else if (s == COMPLETING)
                // We may have already promised (via isDone) that we are done
                // so never return empty-handed or throw InterruptedException
                Thread.yield();
            else if (Thread.interrupted()) {
                // State is NEW here; removeWaiter tolerates a null node.
                removeWaiter(q);
                throw new InterruptedException();
            }
            else if (q == null) {
                // An already-expired timed wait returns before allocating.
                if (timed && nanos <= 0L)
                    return s;
                q = new WaitNode();
            }
            else if (!queued)
                // Push our node onto the waiters stack; if the CAS fails
                // queued stays false and the push is retried next pass.
                queued = U.compareAndSwapObject(this, WAITERS,
                                                q.next = waiters, q);
            else if (timed) {
                final long parkNanos;
                if (startTime == 0L) { // first time
                    startTime = System.nanoTime();
                    // 0L is reserved as the "not yet parked" marker.
                    if (startTime == 0L)
                        startTime = 1L;
                    parkNanos = nanos;
                } else {
                    long elapsed = System.nanoTime() - startTime;
                    if (elapsed >= nanos) {
                        removeWaiter(q);
                        return state;
                    }
                    parkNanos = nanos - elapsed;
                }
                // nanoTime may be slow; recheck before parking
                if (state < COMPLETING)
                    LockSupport.parkNanos(this, parkNanos);
            }
            else
                LockSupport.park(this);
        }
    }

    /**
     * Tries to unlink a timed-out or interrupted wait node to avoid
     * accumulating garbage.  Internal nodes are simply unspliced
     * without CAS since it is harmless if they are traversed anyway
     * by releasers.  To avoid effects of unsplicing from already
     * removed nodes, the list is retraversed in case of an apparent
     * race.  This is slow when there are a lot of nodes, but we don't
     * expect lists to be long enough to outweigh higher-overhead
     * schemes.
     *
     * @param node the node to remove; a null argument is a no-op
     */
    private void removeWaiter(WaitNode node) {
        if (node != null) {
            // Mark the node as cancelled; the traversal below unlinks
            // every node whose thread is null, not just this one.
            node.thread = null;
            retry:
            for (;;) {          // restart on removeWaiter race
                for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
                    s = q.next;
                    if (q.thread != null)
                        pred = q;
                    else if (pred != null) {
                        pred.next = s;
                        if (pred.thread == null) // check for race
                            continue retry;
                    }
                    // q is a cancelled node at the head of the stack, so
                    // the head pointer itself must be swung with a CAS.
                    else if (!U.compareAndSwapObject(this, WAITERS, q, s))
                        continue retry;
                }
                break;
            }
        }
    }

    // Unsafe mechanics
    private static final sun.misc.Unsafe U = sun.misc.Unsafe.getUnsafe();
    // Field offsets handed to the Unsafe CAS / ordered-write calls above.
    private static final long STATE;
    private static final long RUNNER;
    private static final long WAITERS;
    static {
        try {
            STATE = U.objectFieldOffset
                (FutureTask.class.getDeclaredField("state"));
            RUNNER = U.objectFieldOffset
                (FutureTask.class.getDeclaredField("runner"));
            WAITERS = U.objectFieldOffset
                (FutureTask.class.getDeclaredField("waiters"));
        } catch (ReflectiveOperationException e) {
            throw new Error(e);
        }

        // Reduce the risk of rare disastrous classloading in first call to
        // LockSupport.park: https://bugs.openjdk.java.net/browse/JDK-8074773
        // (the local is intentionally unused; referencing the class
        // literal is the whole point)
        Class<?> ensureLoaded = LockSupport.class;
    }

}