Home | History | Annotate | Download | only in concurrent
      1 /*
      2  * Copyright (C) 2007 The Guava Authors
      3  *
      4  * Licensed under the Apache License, Version 2.0 (the "License");
      5  * you may not use this file except in compliance with the License.
      6  * You may obtain a copy of the License at
      7  *
      8  * http://www.apache.org/licenses/LICENSE-2.0
      9  *
     10  * Unless required by applicable law or agreed to in writing, software
     11  * distributed under the License is distributed on an "AS IS" BASIS,
     12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     13  * See the License for the specific language governing permissions and
     14  * limitations under the License.
     15  */
     16 
     17 package com.google.common.util.concurrent;
     18 
     19 import static com.google.common.base.Preconditions.checkNotNull;
     20 
     21 import java.util.concurrent.CancellationException;
     22 import java.util.concurrent.ExecutionException;
     23 import java.util.concurrent.Executor;
     24 import java.util.concurrent.TimeUnit;
     25 import java.util.concurrent.TimeoutException;
     26 import java.util.concurrent.locks.AbstractQueuedSynchronizer;
     27 
     28 import javax.annotation.Nullable;
     29 
     30 /**
     31  * An abstract implementation of the {@link ListenableFuture} interface. This
     32  * class is preferable to {@link java.util.concurrent.FutureTask} for two
     33  * reasons: It implements {@code ListenableFuture}, and it does not implement
     34  * {@code Runnable}. (If you want a {@code Runnable} implementation of {@code
     35  * ListenableFuture}, create a {@link ListenableFutureTask}, or submit your
     36  * tasks to a {@link ListeningExecutorService}.)
     37  *
     38  * <p>This class implements all methods in {@code ListenableFuture}.
     39  * Subclasses should provide a way to set the result of the computation through
     40  * the protected methods {@link #set(Object)} and
     41  * {@link #setException(Throwable)}. Subclasses may also override {@link
     42  * #interruptTask()}, which will be invoked automatically if a call to {@link
     43  * #cancel(boolean) cancel(true)} succeeds in canceling the future.
     44  *
     45  * <p>{@code AbstractFuture} uses an {@link AbstractQueuedSynchronizer} to deal
     46  * with concurrency issues and guarantee thread safety.
     47  *
     48  * <p>The state changing methods all return a boolean indicating success or
     49  * failure in changing the future's state.  Valid states are running,
     50  * completed, failed, or cancelled.
     51  *
     52  * <p>This class uses an {@link ExecutionList} to guarantee that all registered
     53  * listeners will be executed, either when the future finishes or, for listeners
     54  * that are added after the future completes, immediately.
     55  * {@code Runnable}-{@code Executor} pairs are stored in the execution list but
     56  * are not necessarily executed in the order in which they were added.  (If a
     57  * listener is added after the Future is complete, it will be executed
     58  * immediately, even if earlier listeners have not been executed. Additionally,
     59  * executors need not guarantee FIFO execution, or different listeners may run
     60  * in different executors.)
     61  *
     62  * @author Sven Mawson
     63  * @since 1.0
     64  */
     65 public abstract class AbstractFuture<V> implements ListenableFuture<V> {
     66 
     67   /** Synchronization control for AbstractFutures. */
     68   private final Sync<V> sync = new Sync<V>();
     69 
     70   // The execution list to hold our executors.
     71   private final ExecutionList executionList = new ExecutionList();
     72 
     73   /**
     74    * Constructor for use by subclasses.
     75    */
     76   protected AbstractFuture() {}
     77 
     78   /*
     79    * Improve the documentation of when InterruptedException is thrown. Our
     80    * behavior matches the JDK's, but the JDK's documentation is misleading.
     81    */
     82   /**
     83    * {@inheritDoc}
     84    *
     85    * <p>The default {@link AbstractFuture} implementation throws {@code
     86    * InterruptedException} if the current thread is interrupted before or during
     87    * the call, even if the value is already available.
     88    *
     89    * @throws InterruptedException if the current thread was interrupted before
     90    *     or during the call (optional but recommended).
     91    * @throws CancellationException {@inheritDoc}
     92    */
     93   @Override
     94   public V get(long timeout, TimeUnit unit) throws InterruptedException,
     95       TimeoutException, ExecutionException {
     96     return sync.get(unit.toNanos(timeout));
     97   }
     98 
     99   /*
    100    * Improve the documentation of when InterruptedException is thrown. Our
    101    * behavior matches the JDK's, but the JDK's documentation is misleading.
    102    */
    103   /**
    104    * {@inheritDoc}
    105    *
    106    * <p>The default {@link AbstractFuture} implementation throws {@code
    107    * InterruptedException} if the current thread is interrupted before or during
    108    * the call, even if the value is already available.
    109    *
    110    * @throws InterruptedException if the current thread was interrupted before
    111    *     or during the call (optional but recommended).
    112    * @throws CancellationException {@inheritDoc}
    113    */
    114   @Override
    115   public V get() throws InterruptedException, ExecutionException {
    116     return sync.get();
    117   }
    118 
    119   @Override
    120   public boolean isDone() {
    121     return sync.isDone();
    122   }
    123 
    124   @Override
    125   public boolean isCancelled() {
    126     return sync.isCancelled();
    127   }
    128 
    129   @Override
    130   public boolean cancel(boolean mayInterruptIfRunning) {
    131     if (!sync.cancel(mayInterruptIfRunning)) {
    132       return false;
    133     }
    134     executionList.execute();
    135     if (mayInterruptIfRunning) {
    136       interruptTask();
    137     }
    138     return true;
    139   }
    140 
    141   /**
    142    * Subclasses can override this method to implement interruption of the
    143    * future's computation. The method is invoked automatically by a successful
    144    * call to {@link #cancel(boolean) cancel(true)}.
    145    *
    146    * <p>The default implementation does nothing.
    147    *
    148    * @since 10.0
    149    */
    150   protected void interruptTask() {
    151   }
    152 
    153   /**
    154    * Returns true if this future was cancelled with {@code
    155    * mayInterruptIfRunning} set to {@code true}.
    156    *
    157    * @since 14.0
    158    */
    159   protected final boolean wasInterrupted() {
    160     return sync.wasInterrupted();
    161   }
    162 
    163   /**
    164    * {@inheritDoc}
    165    *
    166    * @since 10.0
    167    */
    168   @Override
    169   public void addListener(Runnable listener, Executor exec) {
    170     executionList.add(listener, exec);
    171   }
    172 
    173   /**
    174    * Subclasses should invoke this method to set the result of the computation
    175    * to {@code value}.  This will set the state of the future to
    176    * {@link AbstractFuture.Sync#COMPLETED} and invoke the listeners if the
    177    * state was successfully changed.
    178    *
    179    * @param value the value that was the result of the task.
    180    * @return true if the state was successfully changed.
    181    */
    182   protected boolean set(@Nullable V value) {
    183     boolean result = sync.set(value);
    184     if (result) {
    185       executionList.execute();
    186     }
    187     return result;
    188   }
    189 
    190   /**
    191    * Subclasses should invoke this method to set the result of the computation
    192    * to an error, {@code throwable}.  This will set the state of the future to
    193    * {@link AbstractFuture.Sync#COMPLETED} and invoke the listeners if the
    194    * state was successfully changed.
    195    *
    196    * @param throwable the exception that the task failed with.
    197    * @return true if the state was successfully changed.
    198    */
    199   protected boolean setException(Throwable throwable) {
    200     boolean result = sync.setException(checkNotNull(throwable));
    201     if (result) {
    202       executionList.execute();
    203     }
    204     return result;
    205   }
    206 
    207   /**
    208    * <p>Following the contract of {@link AbstractQueuedSynchronizer} we create a
    209    * private subclass to hold the synchronizer.  This synchronizer is used to
    210    * implement the blocking and waiting calls as well as to handle state changes
    211    * in a thread-safe manner.  The current state of the future is held in the
    212    * Sync state, and the lock is released whenever the state changes to
    213    * {@link #COMPLETED}, {@link #CANCELLED}, or {@link #INTERRUPTED}
    214    *
    215    * <p>To avoid races between threads doing release and acquire, we transition
    216    * to the final state in two steps.  One thread will successfully CAS from
    217    * RUNNING to COMPLETING, that thread will then set the result of the
    218    * computation, and only then transition to COMPLETED, CANCELLED, or
    219    * INTERRUPTED.
    220    *
    221    * <p>We don't use the integer argument passed between acquire methods so we
    222    * pass around a -1 everywhere.
    223    */
    224   static final class Sync<V> extends AbstractQueuedSynchronizer {
    225 
    226     private static final long serialVersionUID = 0L;
    227 
    228     /* Valid states. */
    229     static final int RUNNING = 0;
    230     static final int COMPLETING = 1;
    231     static final int COMPLETED = 2;
    232     static final int CANCELLED = 4;
    233     static final int INTERRUPTED = 8;
    234 
    235     private V value;
    236     private Throwable exception;
    237 
    238     /*
    239      * Acquisition succeeds if the future is done, otherwise it fails.
    240      */
    241     @Override
    242     protected int tryAcquireShared(int ignored) {
    243       if (isDone()) {
    244         return 1;
    245       }
    246       return -1;
    247     }
    248 
    249     /*
    250      * We always allow a release to go through, this means the state has been
    251      * successfully changed and the result is available.
    252      */
    253     @Override
    254     protected boolean tryReleaseShared(int finalState) {
    255       setState(finalState);
    256       return true;
    257     }
    258 
    259     /**
    260      * Blocks until the task is complete or the timeout expires.  Throws a
    261      * {@link TimeoutException} if the timer expires, otherwise behaves like
    262      * {@link #get()}.
    263      */
    264     V get(long nanos) throws TimeoutException, CancellationException,
    265         ExecutionException, InterruptedException {
    266 
    267       // Attempt to acquire the shared lock with a timeout.
    268       if (!tryAcquireSharedNanos(-1, nanos)) {
    269         throw new TimeoutException("Timeout waiting for task.");
    270       }
    271 
    272       return getValue();
    273     }
    274 
    275     /**
    276      * Blocks until {@link #complete(Object, Throwable, int)} has been
    277      * successfully called.  Throws a {@link CancellationException} if the task
    278      * was cancelled, or a {@link ExecutionException} if the task completed with
    279      * an error.
    280      */
    281     V get() throws CancellationException, ExecutionException,
    282         InterruptedException {
    283 
    284       // Acquire the shared lock allowing interruption.
    285       acquireSharedInterruptibly(-1);
    286       return getValue();
    287     }
    288 
    289     /**
    290      * Implementation of the actual value retrieval.  Will return the value
    291      * on success, an exception on failure, a cancellation on cancellation, or
    292      * an illegal state if the synchronizer is in an invalid state.
    293      */
    294     private V getValue() throws CancellationException, ExecutionException {
    295       int state = getState();
    296       switch (state) {
    297         case COMPLETED:
    298           if (exception != null) {
    299             throw new ExecutionException(exception);
    300           } else {
    301             return value;
    302           }
    303 
    304         case CANCELLED:
    305         case INTERRUPTED:
    306           throw cancellationExceptionWithCause(
    307               "Task was cancelled.", exception);
    308 
    309         default:
    310           throw new IllegalStateException(
    311               "Error, synchronizer in invalid state: " + state);
    312       }
    313     }
    314 
    315     /**
    316      * Checks if the state is {@link #COMPLETED}, {@link #CANCELLED}, or {@link
    317      * INTERRUPTED}.
    318      */
    319     boolean isDone() {
    320       return (getState() & (COMPLETED | CANCELLED | INTERRUPTED)) != 0;
    321     }
    322 
    323     /**
    324      * Checks if the state is {@link #CANCELLED} or {@link #INTERRUPTED}.
    325      */
    326     boolean isCancelled() {
    327       return (getState() & (CANCELLED | INTERRUPTED)) != 0;
    328     }
    329 
    330     /**
    331      * Checks if the state is {@link #INTERRUPTED}.
    332      */
    333     boolean wasInterrupted() {
    334       return getState() == INTERRUPTED;
    335     }
    336 
    337     /**
    338      * Transition to the COMPLETED state and set the value.
    339      */
    340     boolean set(@Nullable V v) {
    341       return complete(v, null, COMPLETED);
    342     }
    343 
    344     /**
    345      * Transition to the COMPLETED state and set the exception.
    346      */
    347     boolean setException(Throwable t) {
    348       return complete(null, t, COMPLETED);
    349     }
    350 
    351     /**
    352      * Transition to the CANCELLED or INTERRUPTED state.
    353      */
    354     boolean cancel(boolean interrupt) {
    355       return complete(null, null, interrupt ? INTERRUPTED : CANCELLED);
    356     }
    357 
    358     /**
    359      * Implementation of completing a task.  Either {@code v} or {@code t} will
    360      * be set but not both.  The {@code finalState} is the state to change to
    361      * from {@link #RUNNING}.  If the state is not in the RUNNING state we
    362      * return {@code false} after waiting for the state to be set to a valid
    363      * final state ({@link #COMPLETED}, {@link #CANCELLED}, or {@link
    364      * #INTERRUPTED}).
    365      *
    366      * @param v the value to set as the result of the computation.
    367      * @param t the exception to set as the result of the computation.
    368      * @param finalState the state to transition to.
    369      */
    370     private boolean complete(@Nullable V v, @Nullable Throwable t,
    371         int finalState) {
    372       boolean doCompletion = compareAndSetState(RUNNING, COMPLETING);
    373       if (doCompletion) {
    374         // If this thread successfully transitioned to COMPLETING, set the value
    375         // and exception and then release to the final state.
    376         this.value = v;
    377         // Don't actually construct a CancellationException until necessary.
    378         this.exception = ((finalState & (CANCELLED | INTERRUPTED)) != 0)
    379             ? new CancellationException("Future.cancel() was called.") : t;
    380         releaseShared(finalState);
    381       } else if (getState() == COMPLETING) {
    382         // If some other thread is currently completing the future, block until
    383         // they are done so we can guarantee completion.
    384         acquireShared(-1);
    385       }
    386       return doCompletion;
    387     }
    388   }
    389 
    390   static final CancellationException cancellationExceptionWithCause(
    391       @Nullable String message, @Nullable Throwable cause) {
    392     CancellationException exception = new CancellationException(message);
    393     exception.initCause(cause);
    394     return exception;
    395   }
    396 }
    397