Home | History | Annotate | Download | only in concurrent
      1 /*
      2  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
      3  *
      4  * This code is free software; you can redistribute it and/or modify it
      5  * under the terms of the GNU General Public License version 2 only, as
      6  * published by the Free Software Foundation.  Oracle designates this
      7  * particular file as subject to the "Classpath" exception as provided
      8  * by Oracle in the LICENSE file that accompanied this code.
      9  *
     10  * This code is distributed in the hope that it will be useful, but WITHOUT
     11  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
     12  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
     13  * version 2 for more details (a copy is included in the LICENSE file that
     14  * accompanied this code).
     15  *
     16  * You should have received a copy of the GNU General Public License version
     17  * 2 along with this work; if not, write to the Free Software Foundation,
     18  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
     19  *
     20  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
     21  * or visit www.oracle.com if you need additional information or have any
     22  * questions.
     23  */
     24 
     25 /*
     26  * This file is available under and governed by the GNU General Public
     27  * License version 2 only, as published by the Free Software Foundation.
     28  * However, the following notice accompanied the original version of this
     29  * file:
     30  *
     31  * Written by Doug Lea with assistance from members of JCP JSR-166
     32  * Expert Group and released to the public domain, as explained at
     33  * http://creativecommons.org/publicdomain/zero/1.0/
     34  */
     35 
     36 package java.util.concurrent;
     37 
     38 import java.util.concurrent.locks.LockSupport;
     39 
     40 /**
     41  * A cancellable asynchronous computation.  This class provides a base
     42  * implementation of {@link Future}, with methods to start and cancel
     43  * a computation, query to see if the computation is complete, and
     44  * retrieve the result of the computation.  The result can only be
     45  * retrieved when the computation has completed; the {@code get}
     46  * methods will block if the computation has not yet completed.  Once
     47  * the computation has completed, the computation cannot be restarted
     48  * or cancelled (unless the computation is invoked using
     49  * {@link #runAndReset}).
     50  *
     51  * <p>A {@code FutureTask} can be used to wrap a {@link Callable} or
     52  * {@link Runnable} object.  Because {@code FutureTask} implements
     53  * {@code Runnable}, a {@code FutureTask} can be submitted to an
     54  * {@link Executor} for execution.
     55  *
     56  * <p>In addition to serving as a standalone class, this class provides
     57  * {@code protected} functionality that may be useful when creating
     58  * customized task classes.
     59  *
     60  * @since 1.5
     61  * @author Doug Lea
     62  * @param <V> The result type returned by this FutureTask's {@code get} methods
     63  */
     64 public class FutureTask<V> implements RunnableFuture<V> {
     65     /*
     66      * Revision notes: This differs from previous versions of this
     67      * class that relied on AbstractQueuedSynchronizer, mainly to
     68      * avoid surprising users about retaining interrupt status during
     69      * cancellation races. Sync control in the current design relies
     70      * on a "state" field updated via CAS to track completion, along
     71      * with a simple Treiber stack to hold waiting threads.
     72      *
     73      * Style note: As usual, we bypass overhead of using
     74      * AtomicXFieldUpdaters and instead directly use Unsafe intrinsics.
     75      */
     76 
    /**
     * The run state of this task, initially NEW.  The run state
     * transitions to a terminal state only in methods set,
     * setException, and cancel.  During completion, state may take on
     * transient values of COMPLETING (while outcome is being set) or
     * INTERRUPTING (only while interrupting the runner to satisfy a
     * cancel(true)). Transitions from these intermediate to final
     * states use cheaper ordered/lazy writes because values are unique
     * and cannot be further modified.
     *
     * Possible state transitions:
     * NEW -> COMPLETING -> NORMAL
     * NEW -> COMPLETING -> EXCEPTIONAL
     * NEW -> CANCELLED
     * NEW -> INTERRUPTING -> INTERRUPTED
     *
     * The numeric ordering of the constants is relied upon by range
     * comparisons elsewhere in this class and must not be changed:
     * "state <= COMPLETING" means no final state has been reached yet
     * (get, awaitDone), "state >= CANCELLED" means the task was
     * cancelled (isCancelled, report), and "state >= INTERRUPTING"
     * means a cancel(true) is in progress or finished (run,
     * runAndReset).
     */
    private volatile int state;
    // Initial state; the only state from which a transition can start.
    private static final int NEW          = 0;
    // Transient: set/setException won the CAS and is storing outcome.
    private static final int COMPLETING   = 1;
    // Final: outcome holds the computed result.
    private static final int NORMAL       = 2;
    // Final: outcome holds the throwable passed to setException.
    private static final int EXCEPTIONAL  = 3;
    // Final: cancel(false) succeeded.
    private static final int CANCELLED    = 4;
    // Transient: cancel(true) won the CAS and is interrupting the runner.
    private static final int INTERRUPTING = 5;
    // Final: cancel(true) finished its interrupt attempt.
    private static final int INTERRUPTED  = 6;
    101 
    /**
     * The underlying callable.  Assigned in the constructors and nulled
     * out by finishCompletion() once the task has reached a final state
     * (normal completion, failure, or cancellation).
     */
    private Callable<V> callable;
    /**
     * The result to return or exception to throw from get().  Written by
     * set/setException after their CAS to COMPLETING and before the
     * final state is published; read by report().
     */
    private Object outcome; // non-volatile, protected by state reads/writes
    /**
     * The thread running the callable; CASed from null at the start of
     * run()/runAndReset() and reset to null in their finally blocks.
     * Read by cancel(true) to find the thread to interrupt.
     */
    private volatile Thread runner;
    /**
     * Treiber stack of waiting threads: nodes are pushed by awaitDone(),
     * pruned by removeWaiter(), and the whole stack is detached and
     * unparked by finishCompletion().
     */
    private volatile WaitNode waiters;
    110 
    111     /**
    112      * Returns result or throws exception for completed task.
    113      *
    114      * @param s completed state value
    115      */
    116     @SuppressWarnings("unchecked")
    117     private V report(int s) throws ExecutionException {
    118         Object x = outcome;
    119         if (s == NORMAL)
    120             return (V)x;
    121         if (s >= CANCELLED)
    122             throw new CancellationException();
    123         throw new ExecutionException((Throwable)x);
    124     }
    125 
    /**
     * Builds a {@code FutureTask} wrapping the supplied {@code Callable};
     * the callable is invoked when this task is run.
     *
     * @param  callable the callable task
     * @throws NullPointerException if the callable is null
     */
    public FutureTask(Callable<V> callable) {
        if (callable == null) {
            throw new NullPointerException();
        }
        this.callable = callable;
        // The volatile write to state comes last so that the callable
        // assignment above is visible to whoever later reads state.
        this.state = NEW;
    }
    139 
    140     /**
    141      * Creates a {@code FutureTask} that will, upon running, execute the
    142      * given {@code Runnable}, and arrange that {@code get} will return the
    143      * given result on successful completion.
    144      *
    145      * @param runnable the runnable task
    146      * @param result the result to return on successful completion. If
    147      * you don't need a particular result, consider using
    148      * constructions of the form:
    149      * {@code Future<?> f = new FutureTask<Void>(runnable, null)}
    150      * @throws NullPointerException if the runnable is null
    151      */
    152     public FutureTask(Runnable runnable, V result) {
    153         this.callable = Executors.callable(runnable, result);
    154         this.state = NEW;       // ensure visibility of callable
    155     }
    156 
    157     public boolean isCancelled() {
    158         return state >= CANCELLED;
    159     }
    160 
    161     public boolean isDone() {
    162         return state != NEW;
    163     }
    164 
    165     public boolean cancel(boolean mayInterruptIfRunning) {
    166         if (!(state == NEW &&
    167               U.compareAndSwapInt(this, STATE, NEW,
    168                   mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
    169             return false;
    170         try {    // in case call to interrupt throws exception
    171             if (mayInterruptIfRunning) {
    172                 try {
    173                     Thread t = runner;
    174                     if (t != null)
    175                         t.interrupt();
    176                 } finally { // final state
    177                     U.putOrderedInt(this, STATE, INTERRUPTED);
    178                 }
    179             }
    180         } finally {
    181             finishCompletion();
    182         }
    183         return true;
    184     }
    185 
    186     /**
    187      * @throws CancellationException {@inheritDoc}
    188      */
    189     public V get() throws InterruptedException, ExecutionException {
    190         int s = state;
    191         if (s <= COMPLETING)
    192             s = awaitDone(false, 0L);
    193         return report(s);
    194     }
    195 
    196     /**
    197      * @throws CancellationException {@inheritDoc}
    198      */
    199     public V get(long timeout, TimeUnit unit)
    200         throws InterruptedException, ExecutionException, TimeoutException {
    201         if (unit == null)
    202             throw new NullPointerException();
    203         int s = state;
    204         if (s <= COMPLETING &&
    205             (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
    206             throw new TimeoutException();
    207         return report(s);
    208     }
    209 
    210     /**
    211      * Protected method invoked when this task transitions to state
    212      * {@code isDone} (whether normally or via cancellation). The
    213      * default implementation does nothing.  Subclasses may override
    214      * this method to invoke completion callbacks or perform
    215      * bookkeeping. Note that you can query status inside the
    216      * implementation of this method to determine whether this task
    217      * has been cancelled.
    218      */
    219     protected void done() { }
    220 
    221     /**
    222      * Sets the result of this future to the given value unless
    223      * this future has already been set or has been cancelled.
    224      *
    225      * <p>This method is invoked internally by the {@link #run} method
    226      * upon successful completion of the computation.
    227      *
    228      * @param v the value
    229      */
    230     protected void set(V v) {
    231         if (U.compareAndSwapInt(this, STATE, NEW, COMPLETING)) {
    232             outcome = v;
    233             U.putOrderedInt(this, STATE, NORMAL); // final state
    234             finishCompletion();
    235         }
    236     }
    237 
    238     /**
    239      * Causes this future to report an {@link ExecutionException}
    240      * with the given throwable as its cause, unless this future has
    241      * already been set or has been cancelled.
    242      *
    243      * <p>This method is invoked internally by the {@link #run} method
    244      * upon failure of the computation.
    245      *
    246      * @param t the cause of failure
    247      */
    248     protected void setException(Throwable t) {
    249         if (U.compareAndSwapInt(this, STATE, NEW, COMPLETING)) {
    250             outcome = t;
    251             U.putOrderedInt(this, STATE, EXCEPTIONAL); // final state
    252             finishCompletion();
    253         }
    254     }
    255 
    256     public void run() {
    257         if (state != NEW ||
    258             !U.compareAndSwapObject(this, RUNNER, null, Thread.currentThread()))
    259             return;
    260         try {
    261             Callable<V> c = callable;
    262             if (c != null && state == NEW) {
    263                 V result;
    264                 boolean ran;
    265                 try {
    266                     result = c.call();
    267                     ran = true;
    268                 } catch (Throwable ex) {
    269                     result = null;
    270                     ran = false;
    271                     setException(ex);
    272                 }
    273                 if (ran)
    274                     set(result);
    275             }
    276         } finally {
    277             // runner must be non-null until state is settled to
    278             // prevent concurrent calls to run()
    279             runner = null;
    280             // state must be re-read after nulling runner to prevent
    281             // leaked interrupts
    282             int s = state;
    283             if (s >= INTERRUPTING)
    284                 handlePossibleCancellationInterrupt(s);
    285         }
    286     }
    287 
    288     /**
    289      * Executes the computation without setting its result, and then
    290      * resets this future to initial state, failing to do so if the
    291      * computation encounters an exception or is cancelled.  This is
    292      * designed for use with tasks that intrinsically execute more
    293      * than once.
    294      *
    295      * @return {@code true} if successfully run and reset
    296      */
    297     protected boolean runAndReset() {
    298         if (state != NEW ||
    299             !U.compareAndSwapObject(this, RUNNER, null, Thread.currentThread()))
    300             return false;
    301         boolean ran = false;
    302         int s = state;
    303         try {
    304             Callable<V> c = callable;
    305             if (c != null && s == NEW) {
    306                 try {
    307                     c.call(); // don't set result
    308                     ran = true;
    309                 } catch (Throwable ex) {
    310                     setException(ex);
    311                 }
    312             }
    313         } finally {
    314             // runner must be non-null until state is settled to
    315             // prevent concurrent calls to run()
    316             runner = null;
    317             // state must be re-read after nulling runner to prevent
    318             // leaked interrupts
    319             s = state;
    320             if (s >= INTERRUPTING)
    321                 handlePossibleCancellationInterrupt(s);
    322         }
    323         return ran && s == NEW;
    324     }
    325 
    326     /**
    327      * Ensures that any interrupt from a possible cancel(true) is only
    328      * delivered to a task while in run or runAndReset.
    329      */
    330     private void handlePossibleCancellationInterrupt(int s) {
    331         // It is possible for our interrupter to stall before getting a
    332         // chance to interrupt us.  Let's spin-wait patiently.
    333         if (s == INTERRUPTING)
    334             while (state == INTERRUPTING)
    335                 Thread.yield(); // wait out pending interrupt
    336 
    337         // assert state == INTERRUPTED;
    338 
    339         // We want to clear any interrupt we may have received from
    340         // cancel(true).  However, it is permissible to use interrupts
    341         // as an independent mechanism for a task to communicate with
    342         // its caller, and there is no way to clear only the
    343         // cancellation interrupt.
    344         //
    345         // Thread.interrupted();
    346     }
    347 
    348     /**
    349      * Simple linked list nodes to record waiting threads in a Treiber
    350      * stack.  See other classes such as Phaser and SynchronousQueue
    351      * for more detailed explanation.
    352      */
    353     static final class WaitNode {
    354         volatile Thread thread;
    355         volatile WaitNode next;
    356         WaitNode() { thread = Thread.currentThread(); }
    357     }
    358 
    359     /**
    360      * Removes and signals all waiting threads, invokes done(), and
    361      * nulls out callable.
    362      */
    363     private void finishCompletion() {
    364         // assert state > COMPLETING;
    365         for (WaitNode q; (q = waiters) != null;) {
    366             if (U.compareAndSwapObject(this, WAITERS, q, null)) {
    367                 for (;;) {
    368                     Thread t = q.thread;
    369                     if (t != null) {
    370                         q.thread = null;
    371                         LockSupport.unpark(t);
    372                     }
    373                     WaitNode next = q.next;
    374                     if (next == null)
    375                         break;
    376                     q.next = null; // unlink to help gc
    377                     q = next;
    378                 }
    379                 break;
    380             }
    381         }
    382 
    383         done();
    384 
    385         callable = null;        // to reduce footprint
    386     }
    387 
    /**
     * Awaits completion or aborts on interrupt or timeout.
     *
     * <p>Each loop iteration re-reads {@code state} and takes exactly one
     * step: return, yield, throw, allocate a wait node, enqueue it, or
     * park.  The order of the branches is significant.
     *
     * @param timed true if use timed waits
     * @param nanos time to wait, if timed
     * @return state upon completion or at timeout
     * @throws InterruptedException if the calling thread is found
     *         interrupted while the state is still NEW
     */
    private int awaitDone(boolean timed, long nanos)
        throws InterruptedException {
        // The code below is very delicate, to achieve these goals:
        // - call nanoTime exactly once for each call to park
        // - if nanos <= 0L, return promptly without allocation or nanoTime
        // - if nanos == Long.MIN_VALUE, don't underflow
        // - if nanos == Long.MAX_VALUE, and nanoTime is non-monotonic
        //   and we suffer a spurious wakeup, we will do no worse than
        //   to park-spin for a while
        long startTime = 0L;    // Special value 0L means not yet parked
        WaitNode q = null;      // this thread's node, created lazily
        boolean queued = false; // true once q has been pushed onto waiters
        for (;;) {
            int s = state;
            if (s > COMPLETING) {
                // Final state reached.  Clear our node's thread so it is
                // treated as no longer waiting by whoever traverses it.
                if (q != null)
                    q.thread = null;
                return s;
            }
            else if (s == COMPLETING)
                // We may have already promised (via isDone) that we are done
                // so never return empty-handed or throw InterruptedException
                Thread.yield();
            else if (Thread.interrupted()) {
                // Still NEW and the caller was interrupted: give up waiting.
                removeWaiter(q);
                throw new InterruptedException();
            }
            else if (q == null) {
                // Nothing to wait for with a non-positive timeout; report
                // the (unfinished) state without allocating a node.
                if (timed && nanos <= 0L)
                    return s;
                q = new WaitNode();
            }
            else if (!queued)
                // Push q on top of the waiters stack; on CAS failure the
                // next iteration retries against the new head.
                queued = U.compareAndSwapObject(this, WAITERS,
                                                q.next = waiters, q);
            else if (timed) {
                final long parkNanos;
                if (startTime == 0L) { // first time
                    startTime = System.nanoTime();
                    // Keep 0L reserved as the "not yet parked" marker.
                    if (startTime == 0L)
                        startTime = 1L;
                    parkNanos = nanos;
                } else {
                    long elapsed = System.nanoTime() - startTime;
                    if (elapsed >= nanos) {
                        // Timed out: unlink our node and report whatever
                        // the state is right now.
                        removeWaiter(q);
                        return state;
                    }
                    parkNanos = nanos - elapsed;
                }
                // nanoTime may be slow; recheck before parking
                if (state < COMPLETING)
                    LockSupport.parkNanos(this, parkNanos);
            }
            else
                // Untimed wait; after any wakeup the loop re-examines state.
                LockSupport.park(this);
        }
    }
    453 
    /**
     * Tries to unlink a timed-out or interrupted wait node to avoid
     * accumulating garbage.  Internal nodes are simply unspliced
     * without CAS since it is harmless if they are traversed anyway
     * by releasers.  To avoid effects of unsplicing from already
     * removed nodes, the list is retraversed in case of an apparent
     * race.  This is slow when there are a lot of nodes, but we don't
     * expect lists to be long enough to outweigh higher-overhead
     * schemes.
     *
     * <p>The traversal unlinks every node whose {@code thread} is null,
     * not only the given one.
     *
     * @param node the node to remove; if null this method does nothing
     */
    private void removeWaiter(WaitNode node) {
        if (node != null) {
            // Mark the node as dead; the scan below removes dead nodes.
            node.thread = null;
            retry:
            for (;;) {          // restart on removeWaiter race
                // pred: last live node seen so far (null while at the head)
                // q:    node being examined;  s: its successor
                for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
                    s = q.next;
                    if (q.thread != null)
                        // Live node: it becomes the predecessor candidate.
                        pred = q;
                    else if (pred != null) {
                        // Dead interior node: splice it out with a plain write.
                        pred.next = s;
                        if (pred.thread == null) // check for race
                            continue retry;
                    }
                    // Dead node at the head: swing the head to its successor,
                    // restarting the scan if the head changed underneath us.
                    else if (!U.compareAndSwapObject(this, WAITERS, q, s))
                        continue retry;
                }
                break;
            }
        }
    }
    485 
    // Unsafe mechanics: raw field offsets used by the CAS and ordered-write
    // calls on state, runner and waiters throughout this class.
    private static final sun.misc.Unsafe U = sun.misc.Unsafe.getUnsafe();
    private static final long STATE;
    private static final long RUNNER;
    private static final long WAITERS;
    static {
        final Class<?> self = FutureTask.class;
        try {
            STATE = U.objectFieldOffset(self.getDeclaredField("state"));
            RUNNER = U.objectFieldOffset(self.getDeclaredField("runner"));
            WAITERS = U.objectFieldOffset(self.getDeclaredField("waiters"));
        } catch (ReflectiveOperationException e) {
            // The fields are declared in this very class, so failing to
            // find them means the class itself is broken.
            throw new Error(e);
        }

        // Reduce the risk of rare disastrous classloading in first call to
        // LockSupport.park: https://bugs.openjdk.java.net/browse/JDK-8074773
        Class<?> ensureLoaded = LockSupport.class;
    }
    507 
    508 }
    509