Home | History | Annotate | Download | only in concurrent
      1 /*
      2  * Written by Doug Lea with assistance from members of JCP JSR-166
      3  * Expert Group and released to the public domain, as explained at
      4  * http://creativecommons.org/publicdomain/zero/1.0/
      5  */
      6 
      7 package java.util.concurrent;
      8 
      9 import java.util.concurrent.locks.LockSupport;
     10 
     11 /**
     12  * A cancellable asynchronous computation.  This class provides a base
     13  * implementation of {@link Future}, with methods to start and cancel
     14  * a computation, query to see if the computation is complete, and
     15  * retrieve the result of the computation.  The result can only be
     16  * retrieved when the computation has completed; the {@code get}
     17  * methods will block if the computation has not yet completed.  Once
     18  * the computation has completed, the computation cannot be restarted
     19  * or cancelled (unless the computation is invoked using
     20  * {@link #runAndReset}).
     21  *
     22  * <p>A {@code FutureTask} can be used to wrap a {@link Callable} or
     23  * {@link Runnable} object.  Because {@code FutureTask} implements
     24  * {@code Runnable}, a {@code FutureTask} can be submitted to an
     25  * {@link Executor} for execution.
     26  *
     27  * <p>In addition to serving as a standalone class, this class provides
     28  * {@code protected} functionality that may be useful when creating
     29  * customized task classes.
     30  *
     31  * @since 1.5
     32  * @author Doug Lea
     33  * @param <V> The result type returned by this FutureTask's {@code get} methods
     34  */
     35 public class FutureTask<V> implements RunnableFuture<V> {
     36     /*
     37      * Revision notes: This differs from previous versions of this
     38      * class that relied on AbstractQueuedSynchronizer, mainly to
     39      * avoid surprising users about retaining interrupt status during
     40      * cancellation races. Sync control in the current design relies
     41      * on a "state" field updated via CAS to track completion, along
     42      * with a simple Treiber stack to hold waiting threads.
     43      *
     44      * Style note: As usual, we bypass overhead of using
     45      * AtomicXFieldUpdaters and instead directly use Unsafe intrinsics.
     46      */
     47 
    /**
     * The run state of this task, initially NEW.  The run state
     * transitions to a terminal state only in methods set,
     * setException, and cancel.  During completion, state may take on
     * transient values of COMPLETING (while outcome is being set) or
     * INTERRUPTING (only while interrupting the runner to satisfy a
     * cancel(true)). Transitions from these intermediate to final
     * states use cheaper ordered/lazy writes because values are unique
     * and cannot be further modified.
     *
     * Possible state transitions:
     * NEW -> COMPLETING -> NORMAL
     * NEW -> COMPLETING -> EXCEPTIONAL
     * NEW -> CANCELLED
     * NEW -> INTERRUPTING -> INTERRUPTED
     *
     * The numeric order of the constants is relied upon by range
     * comparisons elsewhere in this class: {@code s <= COMPLETING}
     * means "no outcome available yet", {@code s > COMPLETING} means
     * "settled", {@code s >= CANCELLED} means "cancelled in some form",
     * and {@code s >= INTERRUPTING} means "cancelled with interruption".
     * Renumbering the constants would silently break those tests.
     */
    private volatile int state;
    // Initial state: not yet completed and not cancelled.
    private static final int NEW          = 0;
    // Transient: a result or exception is being stored into outcome.
    private static final int COMPLETING   = 1;
    // Terminal: outcome holds the computed value.
    private static final int NORMAL       = 2;
    // Terminal: outcome holds the Throwable reported by setException.
    private static final int EXCEPTIONAL  = 3;
    // Terminal: cancelled via cancel(false).
    private static final int CANCELLED    = 4;
    // Transient: cancel(true) is in the middle of interrupting the runner.
    private static final int INTERRUPTING = 5;
    // Terminal: cancel(true) has finished its interrupt attempt.
    private static final int INTERRUPTED  = 6;

    /** The underlying callable; nulled out after running */
    // Plain (non-volatile) field: the constructors write state right after
    // assigning it, and finishCompletion() clears it.
    private Callable<V> callable;
    /** The result to return or exception to throw from get() */
    // Written by set()/setException() between the CAS to COMPLETING and the
    // write of the final state; read by report().
    private Object outcome; // non-volatile, protected by state reads/writes
    /** The thread running the callable; CASed during run() */
    // Claimed by CAS from null in run()/runAndReset(), reset to null in their
    // finally blocks, and read by cancel(true) to find the thread to interrupt.
    private volatile Thread runner;
    /** Treiber stack of waiting threads */
    // Head pointer: awaitDone() pushes nodes, finishCompletion() detaches the
    // whole stack, removeWaiter() unlinks abandoned nodes.
    private volatile WaitNode waiters;
     81 
     82     /**
     83      * Returns result or throws exception for completed task.
     84      *
     85      * @param s completed state value
     86      */
     87     @SuppressWarnings("unchecked")
     88     private V report(int s) throws ExecutionException {
     89         Object x = outcome;
     90         if (s == NORMAL)
     91             return (V)x;
     92         if (s >= CANCELLED)
     93             throw new CancellationException();
     94         throw new ExecutionException((Throwable)x);
     95     }
     96 
     97     /**
     98      * Creates a {@code FutureTask} that will, upon running, execute the
     99      * given {@code Callable}.
    100      *
    101      * @param  callable the callable task
    102      * @throws NullPointerException if the callable is null
    103      */
    104     public FutureTask(Callable<V> callable) {
    105         if (callable == null)
    106             throw new NullPointerException();
    107         this.callable = callable;
    108         this.state = NEW;       // ensure visibility of callable
    109     }
    110 
    111     /**
    112      * Creates a {@code FutureTask} that will, upon running, execute the
    113      * given {@code Runnable}, and arrange that {@code get} will return the
    114      * given result on successful completion.
    115      *
    116      * @param runnable the runnable task
    117      * @param result the result to return on successful completion. If
    118      * you don't need a particular result, consider using
    119      * constructions of the form:
    120      * {@code Future<?> f = new FutureTask<Void>(runnable, null)}
    121      * @throws NullPointerException if the runnable is null
    122      */
    123     public FutureTask(Runnable runnable, V result) {
    124         this.callable = Executors.callable(runnable, result);
    125         this.state = NEW;       // ensure visibility of callable
    126     }
    127 
    128     public boolean isCancelled() {
    129         return state >= CANCELLED;
    130     }
    131 
    132     public boolean isDone() {
    133         return state != NEW;
    134     }
    135 
    136     public boolean cancel(boolean mayInterruptIfRunning) {
    137         if (!(state == NEW &&
    138               U.compareAndSwapInt(this, STATE, NEW,
    139                   mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
    140             return false;
    141         try {    // in case call to interrupt throws exception
    142             if (mayInterruptIfRunning) {
    143                 try {
    144                     Thread t = runner;
    145                     if (t != null)
    146                         t.interrupt();
    147                 } finally { // final state
    148                     U.putOrderedInt(this, STATE, INTERRUPTED);
    149                 }
    150             }
    151         } finally {
    152             finishCompletion();
    153         }
    154         return true;
    155     }
    156 
    157     /**
    158      * @throws CancellationException {@inheritDoc}
    159      */
    160     public V get() throws InterruptedException, ExecutionException {
    161         int s = state;
    162         if (s <= COMPLETING)
    163             s = awaitDone(false, 0L);
    164         return report(s);
    165     }
    166 
    167     /**
    168      * @throws CancellationException {@inheritDoc}
    169      */
    170     public V get(long timeout, TimeUnit unit)
    171         throws InterruptedException, ExecutionException, TimeoutException {
    172         if (unit == null)
    173             throw new NullPointerException();
    174         int s = state;
    175         if (s <= COMPLETING &&
    176             (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
    177             throw new TimeoutException();
    178         return report(s);
    179     }
    180 
    181     /**
    182      * Protected method invoked when this task transitions to state
    183      * {@code isDone} (whether normally or via cancellation). The
    184      * default implementation does nothing.  Subclasses may override
    185      * this method to invoke completion callbacks or perform
    186      * bookkeeping. Note that you can query status inside the
    187      * implementation of this method to determine whether this task
    188      * has been cancelled.
    189      */
    190     protected void done() { }
    191 
    192     /**
    193      * Sets the result of this future to the given value unless
    194      * this future has already been set or has been cancelled.
    195      *
    196      * <p>This method is invoked internally by the {@link #run} method
    197      * upon successful completion of the computation.
    198      *
    199      * @param v the value
    200      */
    201     protected void set(V v) {
    202         if (U.compareAndSwapInt(this, STATE, NEW, COMPLETING)) {
    203             outcome = v;
    204             U.putOrderedInt(this, STATE, NORMAL); // final state
    205             finishCompletion();
    206         }
    207     }
    208 
    209     /**
    210      * Causes this future to report an {@link ExecutionException}
    211      * with the given throwable as its cause, unless this future has
    212      * already been set or has been cancelled.
    213      *
    214      * <p>This method is invoked internally by the {@link #run} method
    215      * upon failure of the computation.
    216      *
    217      * @param t the cause of failure
    218      */
    219     protected void setException(Throwable t) {
    220         if (U.compareAndSwapInt(this, STATE, NEW, COMPLETING)) {
    221             outcome = t;
    222             U.putOrderedInt(this, STATE, EXCEPTIONAL); // final state
    223             finishCompletion();
    224         }
    225     }
    226 
    227     public void run() {
    228         if (state != NEW ||
    229             !U.compareAndSwapObject(this, RUNNER, null, Thread.currentThread()))
    230             return;
    231         try {
    232             Callable<V> c = callable;
    233             if (c != null && state == NEW) {
    234                 V result;
    235                 boolean ran;
    236                 try {
    237                     result = c.call();
    238                     ran = true;
    239                 } catch (Throwable ex) {
    240                     result = null;
    241                     ran = false;
    242                     setException(ex);
    243                 }
    244                 if (ran)
    245                     set(result);
    246             }
    247         } finally {
    248             // runner must be non-null until state is settled to
    249             // prevent concurrent calls to run()
    250             runner = null;
    251             // state must be re-read after nulling runner to prevent
    252             // leaked interrupts
    253             int s = state;
    254             if (s >= INTERRUPTING)
    255                 handlePossibleCancellationInterrupt(s);
    256         }
    257     }
    258 
    259     /**
    260      * Executes the computation without setting its result, and then
    261      * resets this future to initial state, failing to do so if the
    262      * computation encounters an exception or is cancelled.  This is
    263      * designed for use with tasks that intrinsically execute more
    264      * than once.
    265      *
    266      * @return {@code true} if successfully run and reset
    267      */
    268     protected boolean runAndReset() {
    269         if (state != NEW ||
    270             !U.compareAndSwapObject(this, RUNNER, null, Thread.currentThread()))
    271             return false;
    272         boolean ran = false;
    273         int s = state;
    274         try {
    275             Callable<V> c = callable;
    276             if (c != null && s == NEW) {
    277                 try {
    278                     c.call(); // don't set result
    279                     ran = true;
    280                 } catch (Throwable ex) {
    281                     setException(ex);
    282                 }
    283             }
    284         } finally {
    285             // runner must be non-null until state is settled to
    286             // prevent concurrent calls to run()
    287             runner = null;
    288             // state must be re-read after nulling runner to prevent
    289             // leaked interrupts
    290             s = state;
    291             if (s >= INTERRUPTING)
    292                 handlePossibleCancellationInterrupt(s);
    293         }
    294         return ran && s == NEW;
    295     }
    296 
    297     /**
    298      * Ensures that any interrupt from a possible cancel(true) is only
    299      * delivered to a task while in run or runAndReset.
    300      */
    301     private void handlePossibleCancellationInterrupt(int s) {
    302         // It is possible for our interrupter to stall before getting a
    303         // chance to interrupt us.  Let's spin-wait patiently.
    304         if (s == INTERRUPTING)
    305             while (state == INTERRUPTING)
    306                 Thread.yield(); // wait out pending interrupt
    307 
    308         // assert state == INTERRUPTED;
    309 
    310         // We want to clear any interrupt we may have received from
    311         // cancel(true).  However, it is permissible to use interrupts
    312         // as an independent mechanism for a task to communicate with
    313         // its caller, and there is no way to clear only the
    314         // cancellation interrupt.
    315         //
    316         // Thread.interrupted();
    317     }
    318 
    319     /**
    320      * Simple linked list nodes to record waiting threads in a Treiber
    321      * stack.  See other classes such as Phaser and SynchronousQueue
    322      * for more detailed explanation.
    323      */
    324     static final class WaitNode {
    325         volatile Thread thread;
    326         volatile WaitNode next;
    327         WaitNode() { thread = Thread.currentThread(); }
    328     }
    329 
    330     /**
    331      * Removes and signals all waiting threads, invokes done(), and
    332      * nulls out callable.
    333      */
    334     private void finishCompletion() {
    335         // assert state > COMPLETING;
    336         for (WaitNode q; (q = waiters) != null;) {
    337             if (U.compareAndSwapObject(this, WAITERS, q, null)) {
    338                 for (;;) {
    339                     Thread t = q.thread;
    340                     if (t != null) {
    341                         q.thread = null;
    342                         LockSupport.unpark(t);
    343                     }
    344                     WaitNode next = q.next;
    345                     if (next == null)
    346                         break;
    347                     q.next = null; // unlink to help gc
    348                     q = next;
    349                 }
    350                 break;
    351             }
    352         }
    353 
    354         done();
    355 
    356         callable = null;        // to reduce footprint
    357     }
    358 
    /**
     * Awaits completion or aborts on interrupt or timeout.
     *
     * <p>Each pass of the loop re-reads {@code state} and performs at
     * most one step: return, yield, abort on interrupt, allocate the
     * wait node, push the node onto the waiter stack, or park.
     *
     * @param timed true if use timed waits
     * @param nanos time to wait, if timed
     * @return state upon completion or at timeout; a value greater than
     *         COMPLETING means the task has settled, anything else
     *         means the timed wait ended first
     * @throws InterruptedException if the calling thread is found
     *         interrupted while the task is still in state NEW
     */
    private int awaitDone(boolean timed, long nanos)
        throws InterruptedException {
        // The code below is very delicate, to achieve these goals:
        // - call nanoTime exactly once for each call to park
        // - if nanos <= 0L, return promptly without allocation or nanoTime
        // - if nanos == Long.MIN_VALUE, don't underflow
        // - if nanos == Long.MAX_VALUE, and nanoTime is non-monotonic
        //   and we suffer a spurious wakeup, we will do no worse than
        //   to park-spin for a while
        long startTime = 0L;    // Special value 0L means not yet parked
        WaitNode q = null;      // this thread's node, allocated lazily
        boolean queued = false; // true once q has been pushed onto waiters
        for (;;) {
            int s = state;
            if (s > COMPLETING) {
                // Settled.  Clear the thread reference so the node is
                // treated as dead by finishCompletion/removeWaiter.
                if (q != null)
                    q.thread = null;
                return s;
            }
            else if (s == COMPLETING)
                // We may have already promised (via isDone) that we are done
                // so never return empty-handed or throw InterruptedException
                Thread.yield();
            else if (Thread.interrupted()) {
                // Interrupt status has been cleared by the test above;
                // drop our node (if any) before propagating.
                removeWaiter(q);
                throw new InterruptedException();
            }
            else if (q == null) {
                // Non-positive timeout: give up before allocating anything.
                if (timed && nanos <= 0L)
                    return s;
                q = new WaitNode();
            }
            else if (!queued)
                // Push q on top of the stack: link it to the current head,
                // then CAS the head to q.  A failed CAS is retried on the
                // next pass of the loop.
                queued = U.compareAndSwapObject(this, WAITERS,
                                                q.next = waiters, q);
            else if (timed) {
                final long parkNanos;
                if (startTime == 0L) { // first time
                    startTime = System.nanoTime();
                    // Keep 0L reserved as the "not yet parked" marker.
                    if (startTime == 0L)
                        startTime = 1L;
                    parkNanos = nanos;
                } else {
                    long elapsed = System.nanoTime() - startTime;
                    if (elapsed >= nanos) {
                        // Timed out: unlink our node and report whatever
                        // the state is right now.
                        removeWaiter(q);
                        return state;
                    }
                    parkNanos = nanos - elapsed;
                }
                // nanoTime may be slow; recheck before parking
                if (state < COMPLETING)
                    LockSupport.parkNanos(this, parkNanos);
            }
            else
                // Untimed wait: block until unparked, then loop to
                // re-examine state.
                LockSupport.park(this);
        }
    }
    424 
    /**
     * Tries to unlink a timed-out or interrupted wait node to avoid
     * accumulating garbage.  Internal nodes are simply unspliced
     * without CAS since it is harmless if they are traversed anyway
     * by releasers.  To avoid effects of unsplicing from already
     * removed nodes, the list is retraversed in case of an apparent
     * race.  This is slow when there are a lot of nodes, but we don't
     * expect lists to be long enough to outweigh higher-overhead
     * schemes.
     *
     * @param node the node to discard; a null argument is a no-op
     */
    private void removeWaiter(WaitNode node) {
        if (node != null) {
            // A null thread marks the node as dead; the traversal below
            // unlinks every dead node it meets, not just this one.
            node.thread = null;
            retry:
            for (;;) {          // restart on removeWaiter race
                // pred: last live node seen so far (null while still at
                // the head); q: node under inspection; s: q's successor.
                for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
                    s = q.next;
                    if (q.thread != null)
                        pred = q;
                    else if (pred != null) {
                        // Dead interior node: bypass it with a plain write.
                        pred.next = s;
                        // If pred has died in the meantime, the write above
                        // may have gone to an already-unlinked node.
                        if (pred.thread == null) // check for race
                            continue retry;
                    }
                    // Dead node with no live predecessor, i.e. at the head:
                    // swing the head past it, restarting if the CAS fails.
                    else if (!U.compareAndSwapObject(this, WAITERS, q, s))
                        continue retry;
                }
                break;
            }
        }
    }
    456 
    // Unsafe mechanics: the Unsafe instance plus the raw offsets of the
    // fields that are updated through it.
    private static final sun.misc.Unsafe U = sun.misc.Unsafe.getUnsafe();
    private static final long STATE;
    private static final long RUNNER;
    private static final long WAITERS;
    static {
        final Class<?> self = FutureTask.class;
        try {
            STATE = U.objectFieldOffset(self.getDeclaredField("state"));
            RUNNER = U.objectFieldOffset(self.getDeclaredField("runner"));
            WAITERS = U.objectFieldOffset(self.getDeclaredField("waiters"));
        } catch (ReflectiveOperationException e) {
            throw new Error(e);
        }

        // Touch LockSupport here to reduce the risk of rare disastrous
        // classloading in the first call to LockSupport.park:
        // https://bugs.openjdk.java.net/browse/JDK-8074773
        Class<?> ensureLoaded = LockSupport.class;
    }
    478 
    479 }
    480