Home | History | Annotate | Download | only in concurrent
      1 /*
      2  * Written by Doug Lea with assistance from members of JCP JSR-166
      3  * Expert Group and released to the public domain, as explained at
      4  * http://creativecommons.org/licenses/publicdomain
      5  */
      6 
      7 package java.util.concurrent;
      8 import java.util.concurrent.locks.*;
      9 
     10 /**
     11  * A cancellable asynchronous computation.  This class provides a base
     12  * implementation of {@link Future}, with methods to start and cancel
     13  * a computation, query to see if the computation is complete, and
     14  * retrieve the result of the computation.  The result can only be
     15  * retrieved when the computation has completed; the <tt>get</tt>
     16  * method will block if the computation has not yet completed.  Once
     17  * the computation has completed, the computation cannot be restarted
     18  * or cancelled.
     19  *
     20  * <p>A <tt>FutureTask</tt> can be used to wrap a {@link Callable} or
     21  * {@link java.lang.Runnable} object.  Because <tt>FutureTask</tt>
     22  * implements <tt>Runnable</tt>, a <tt>FutureTask</tt> can be
     23  * submitted to an {@link Executor} for execution.
     24  *
     25  * <p>In addition to serving as a standalone class, this class provides
     26  * <tt>protected</tt> functionality that may be useful when creating
     27  * customized task classes.
     28  *
     29  * @since 1.5
     30  * @author Doug Lea
     31  * @param <V> The result type returned by this FutureTask's <tt>get</tt> method
     32  */
     33 public class FutureTask<V> implements Future<V>, Runnable {
     34     /** Synchronization control for FutureTask */
     35     private final Sync sync;
     36 
     37     /**
     38      * Creates a <tt>FutureTask</tt> that will, upon running, execute the
     39      * given <tt>Callable</tt>.
     40      *
     41      * @param  callable the callable task
     42      * @throws NullPointerException if callable is null
     43      */
     44     public FutureTask(Callable<V> callable) {
     45         if (callable == null)
     46             throw new NullPointerException();
     47         sync = new Sync(callable);
     48     }
     49 
     50     /**
     51      * Creates a <tt>FutureTask</tt> that will, upon running, execute the
     52      * given <tt>Runnable</tt>, and arrange that <tt>get</tt> will return the
     53      * given result on successful completion.
     54      *
     55      * @param runnable the runnable task
     56      * @param result the result to return on successful completion. If
     57      * you don't need a particular result, consider using
     58      * constructions of the form:
     59      * <tt>Future&lt;?&gt; f = new FutureTask&lt;Object&gt;(runnable, null)</tt>
     60      * @throws NullPointerException if runnable is null
     61      */
     62     public FutureTask(Runnable runnable, V result) {
     63         sync = new Sync(Executors.callable(runnable, result));
     64     }
     65 
     66     public boolean isCancelled() {
     67         return sync.innerIsCancelled();
     68     }
     69 
     70     public boolean isDone() {
     71         return sync.innerIsDone();
     72     }
     73 
     74     public boolean cancel(boolean mayInterruptIfRunning) {
     75         return sync.innerCancel(mayInterruptIfRunning);
     76     }
     77 
     78     /**
     79      * @throws CancellationException {@inheritDoc}
     80      */
     81     public V get() throws InterruptedException, ExecutionException {
     82         return sync.innerGet();
     83     }
     84 
     85     /**
     86      * @throws CancellationException {@inheritDoc}
     87      */
     88     public V get(long timeout, TimeUnit unit)
     89         throws InterruptedException, ExecutionException, TimeoutException {
     90         return sync.innerGet(unit.toNanos(timeout));
     91     }
     92 
     93     /**
     94      * Protected method invoked when this task transitions to state
     95      * <tt>isDone</tt> (whether normally or via cancellation). The
     96      * default implementation does nothing.  Subclasses may override
     97      * this method to invoke completion callbacks or perform
     98      * bookkeeping. Note that you can query status inside the
     99      * implementation of this method to determine whether this task
    100      * has been cancelled.
    101      */
    102     protected void done() { }
    103 
    104     /**
    105      * Sets the result of this Future to the given value unless
    106      * this future has already been set or has been cancelled.
    107      * This method is invoked internally by the <tt>run</tt> method
    108      * upon successful completion of the computation.
    109      * @param v the value
    110      */
    111     protected void set(V v) {
    112         sync.innerSet(v);
    113     }
    114 
    115     /**
    116      * Causes this future to report an <tt>ExecutionException</tt>
    117      * with the given throwable as its cause, unless this Future has
    118      * already been set or has been cancelled.
    119      * This method is invoked internally by the <tt>run</tt> method
    120      * upon failure of the computation.
    121      * @param t the cause of failure
    122      */
    123     protected void setException(Throwable t) {
    124         sync.innerSetException(t);
    125     }
    126 
    127     // The following (duplicated) doc comment can be removed once
    128     //
    129     // 6270645: Javadoc comments should be inherited from most derived
    130     //          superinterface or superclass
    131     // is fixed.
    132     /**
    133      * Sets this Future to the result of its computation
    134      * unless it has been cancelled.
    135      */
    136     public void run() {
    137         sync.innerRun();
    138     }
    139 
    140     /**
    141      * Executes the computation without setting its result, and then
    142      * resets this Future to initial state, failing to do so if the
    143      * computation encounters an exception or is cancelled.  This is
    144      * designed for use with tasks that intrinsically execute more
    145      * than once.
    146      * @return true if successfully run and reset
    147      */
    148     protected boolean runAndReset() {
    149         return sync.innerRunAndReset();
    150     }
    151 
    152     /**
    153      * Synchronization control for FutureTask. Note that this must be
    154      * a non-static inner class in order to invoke the protected
    155      * <tt>done</tt> method. For clarity, all inner class support
    156      * methods are same as outer, prefixed with "inner".
    157      *
    158      * Uses AQS sync state to represent run status
    159      */
    160     private final class Sync extends AbstractQueuedSynchronizer {
    161         private static final long serialVersionUID = -7828117401763700385L;
    162 
    163         /** State value representing that task is ready to run */
    164         private static final int READY     = 0;
    165         /** State value representing that task is running */
    166         private static final int RUNNING   = 1;
    167         /** State value representing that task ran */
    168         private static final int RAN       = 2;
    169         /** State value representing that task was cancelled */
    170         private static final int CANCELLED = 4;
    171 
    172         /** The underlying callable */
    173         private final Callable<V> callable;
    174         /** The result to return from get() */
    175         private V result;
    176         /** The exception to throw from get() */
    177         private Throwable exception;
    178 
    179         /**
    180          * The thread running task. When nulled after set/cancel, this
    181          * indicates that the results are accessible.  Must be
    182          * volatile, to ensure visibility upon completion.
    183          */
    184         private volatile Thread runner;
    185 
    186         Sync(Callable<V> callable) {
    187             this.callable = callable;
    188         }
    189 
    190         private boolean ranOrCancelled(int state) {
    191             return (state & (RAN | CANCELLED)) != 0;
    192         }
    193 
    194         /**
    195          * Implements AQS base acquire to succeed if ran or cancelled
    196          */
    197         protected int tryAcquireShared(int ignore) {
    198             return innerIsDone() ? 1 : -1;
    199         }
    200 
    201         /**
    202          * Implements AQS base release to always signal after setting
    203          * final done status by nulling runner thread.
    204          */
    205         protected boolean tryReleaseShared(int ignore) {
    206             runner = null;
    207             return true;
    208         }
    209 
    210         boolean innerIsCancelled() {
    211             return getState() == CANCELLED;
    212         }
    213 
    214         boolean innerIsDone() {
    215             return ranOrCancelled(getState()) && runner == null;
    216         }
    217 
    218         V innerGet() throws InterruptedException, ExecutionException {
    219             acquireSharedInterruptibly(0);
    220             if (getState() == CANCELLED)
    221                 throw new CancellationException();
    222             if (exception != null)
    223                 throw new ExecutionException(exception);
    224             return result;
    225         }
    226 
    227         V innerGet(long nanosTimeout) throws InterruptedException, ExecutionException, TimeoutException {
    228             if (!tryAcquireSharedNanos(0, nanosTimeout))
    229                 throw new TimeoutException();
    230             if (getState() == CANCELLED)
    231                 throw new CancellationException();
    232             if (exception != null)
    233                 throw new ExecutionException(exception);
    234             return result;
    235         }
    236 
    237         void innerSet(V v) {
    238             for (;;) {
    239                 int s = getState();
    240                 if (s == RAN)
    241                     return;
    242                 if (s == CANCELLED) {
    243                     // aggressively release to set runner to null,
    244                     // in case we are racing with a cancel request
    245                     // that will try to interrupt runner
    246                     releaseShared(0);
    247                     return;
    248                 }
    249                 if (compareAndSetState(s, RAN)) {
    250                     result = v;
    251                     releaseShared(0);
    252                     done();
    253                     return;
    254                 }
    255             }
    256         }
    257 
    258         void innerSetException(Throwable t) {
    259             for (;;) {
    260                 int s = getState();
    261                 if (s == RAN)
    262                     return;
    263                 if (s == CANCELLED) {
    264                     // aggressively release to set runner to null,
    265                     // in case we are racing with a cancel request
    266                     // that will try to interrupt runner
    267                     releaseShared(0);
    268                     return;
    269                 }
    270                 if (compareAndSetState(s, RAN)) {
    271                     exception = t;
    272                     releaseShared(0);
    273                     done();
    274                     return;
    275                 }
    276             }
    277         }
    278 
    279         boolean innerCancel(boolean mayInterruptIfRunning) {
    280             for (;;) {
    281                 int s = getState();
    282                 if (ranOrCancelled(s))
    283                     return false;
    284                 if (compareAndSetState(s, CANCELLED))
    285                     break;
    286             }
    287             if (mayInterruptIfRunning) {
    288                 Thread r = runner;
    289                 if (r != null)
    290                     r.interrupt();
    291             }
    292             releaseShared(0);
    293             done();
    294             return true;
    295         }
    296 
    297         void innerRun() {
    298             if (!compareAndSetState(READY, RUNNING))
    299                 return;
    300 
    301             runner = Thread.currentThread();
    302             if (getState() == RUNNING) { // recheck after setting thread
    303                 V result;
    304                 try {
    305                     result = callable.call();
    306                 } catch (Throwable ex) {
    307                     setException(ex);
    308                     return;
    309                 }
    310                 set(result);
    311             } else {
    312                 releaseShared(0); // cancel
    313             }
    314         }
    315 
    316         boolean innerRunAndReset() {
    317             if (!compareAndSetState(READY, RUNNING))
    318                 return false;
    319             try {
    320                 runner = Thread.currentThread();
    321                 if (getState() == RUNNING)
    322                     callable.call(); // don't set result
    323                 runner = null;
    324                 return compareAndSetState(RUNNING, READY);
    325             } catch (Throwable ex) {
    326                 setException(ex);
    327                 return false;
    328             }
    329         }
    330     }
    331 }
    332