Home | History | Annotate | Download | only in concurrent
      1 /*
      2  * Copyright (C) 2007 Google Inc.
      3  *
      4  * Licensed under the Apache License, Version 2.0 (the "License");
      5  * you may not use this file except in compliance with the License.
      6  * You may obtain a copy of the License at
      7  *
      8  * http://www.apache.org/licenses/LICENSE-2.0
      9  *
     10  * Unless required by applicable law or agreed to in writing, software
     11  * distributed under the License is distributed on an "AS IS" BASIS,
     12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     13  * See the License for the specific language governing permissions and
     14  * limitations under the License.
     15  */
     16 
     17 package com.google.common.util.concurrent;
     18 
     19 import java.util.concurrent.CancellationException;
     20 import java.util.concurrent.ExecutionException;
     21 import java.util.concurrent.Future;
     22 import java.util.concurrent.TimeUnit;
     23 import java.util.concurrent.TimeoutException;
     24 import java.util.concurrent.locks.AbstractQueuedSynchronizer;
     25 
     26 /**
     27  * <p>An abstract implementation of the {@link Future} interface.  This class
     28  * is an abstraction of {@link java.util.concurrent.FutureTask} to support use
     29  * for tasks other than {@link Runnable}s.  It uses an
     30  * {@link AbstractQueuedSynchronizer} to deal with concurrency issues and
     31  * guarantee thread safety.  It could be used as a base class to
     32  * {@code FutureTask}, or any other implementor of the {@code Future} interface.
     33  *
     34  * <p>This class implements all methods in {@code Future}.  Subclasses should
     35  * provide a way to set the result of the computation through the protected
     36  * methods {@link #set(Object)}, {@link #setException(Throwable)}, or
     37  * {@link #cancel()}.  If subclasses want to implement cancellation they can
     38  * override the {@link #cancel(boolean)} method with a real implementation, the
     39  * default implementation doesn't support cancellation.
     40  *
     41  * <p>The state changing methods all return a boolean indicating success or
     42  * failure in changing the future's state.  Valid states are running,
     43  * completed, failed, or cancelled.  Because this class does not implement
     44  * cancellation it is left to the subclass to distinguish between created
     45  * and running tasks.
     46  *
     47  * @author Sven Mawson
     48  * @since 2009.09.15 <b>tentative</b>
     49  */
     50 public abstract class AbstractFuture<V> implements Future<V> {
     51 
     52   /** Synchronization control for AbstractFutures. */
     53   private final Sync<V> sync = new Sync<V>();
     54 
     55   /*
     56    * Blocks until either the task completes or the timeout expires.  Uses the
     57    * sync blocking-with-timeout support provided by AQS.
     58    */
     59   public V get(long timeout, TimeUnit unit) throws InterruptedException,
     60       TimeoutException, ExecutionException {
     61     return sync.get(unit.toNanos(timeout));
     62   }
     63 
     64   /*
     65    * Blocks until the task completes or we get interrupted. Uses the
     66    * interruptible blocking support provided by AQS.
     67    */
     68   public V get() throws InterruptedException, ExecutionException {
     69     return sync.get();
     70   }
     71 
     72   /*
     73    * Checks if the sync is not in the running state.
     74    */
     75   public boolean isDone() {
     76     return sync.isDone();
     77   }
     78 
     79   /*
     80    * Checks if the sync is in the cancelled state.
     81    */
     82   public boolean isCancelled() {
     83     return sync.isCancelled();
     84   }
     85 
     86   /*
     87    * Default implementation of cancel that never cancels the future.
     88    * Subclasses should override this to implement cancellation if desired.
     89    */
     90   public boolean cancel(boolean mayInterruptIfRunning) {
     91     return false;
     92   }
     93 
     94   /**
     95    * Subclasses should invoke this method to set the result of the computation
     96    * to {@code value}.  This will set the state of the future to
     97    * {@link AbstractFuture.Sync#COMPLETED} and call {@link #done()} if the
     98    * state was successfully changed.
     99    *
    100    * @param value the value that was the result of the task.
    101    * @return true if the state was successfully changed.
    102    */
    103   protected boolean set(V value) {
    104     boolean result = sync.set(value);
    105     if (result) {
    106       done();
    107     }
    108     return result;
    109   }
    110 
    111   /**
    112    * Subclasses should invoke this method to set the result of the computation
    113    * to an error, {@code throwable}.  This will set the state of the future to
    114    * {@link AbstractFuture.Sync#COMPLETED} and call {@link #done()} if the
    115    * state was successfully changed.
    116    *
    117    * @param throwable the exception that the task failed with.
    118    * @return true if the state was successfully changed.
    119    * @throws Error if the throwable was an {@link Error}.
    120    */
    121   protected boolean setException(Throwable throwable) {
    122     boolean result = sync.setException(throwable);
    123     if (result) {
    124       done();
    125     }
    126 
    127     // If it's an Error, we want to make sure it reaches the top of the
    128     // call stack, so we rethrow it.
    129     if (throwable instanceof Error) {
    130       throw (Error) throwable;
    131     }
    132     return result;
    133   }
    134 
    135   /**
    136    * Subclasses should invoke this method to mark the future as cancelled.
    137    * This will set the state of the future to {@link
    138    * AbstractFuture.Sync#CANCELLED} and call {@link #done()} if the state was
    139    * successfully changed.
    140    *
    141    * @return true if the state was successfully changed.
    142    */
    143   protected final boolean cancel() {
    144     boolean result = sync.cancel();
    145     if (result) {
    146       done();
    147     }
    148     return result;
    149   }
    150 
    151   /*
    152    * Called by the success, failed, or cancelled methods to indicate that the
    153    * value is now available and the latch can be released.  Subclasses can
    154    * use this method to deal with any actions that should be undertaken when
    155    * the task has completed.
    156    */
    157   protected void done() {
    158     // Default implementation does nothing.
    159   }
    160 
    161   /**
    162    * <p>Following the contract of {@link AbstractQueuedSynchronizer} we create a
    163    * private subclass to hold the synchronizer.  This synchronizer is used to
    164    * implement the blocking and waiting calls as well as to handle state changes
    165    * in a thread-safe manner.  The current state of the future is held in the
    166    * Sync state, and the lock is released whenever the state changes to either
    167    * {@link #COMPLETED} or {@link #CANCELLED}.
    168    *
    169    * <p>To avoid races between threads doing release and acquire, we transition
    170    * to the final state in two steps.  One thread will successfully CAS from
    171    * RUNNING to COMPLETING, that thread will then set the result of the
    172    * computation, and only then transition to COMPLETED or CANCELLED.
    173    *
    174    * <p>We don't use the integer argument passed between acquire methods so we
    175    * pass around a -1 everywhere.
    176    */
    177   static final class Sync<V> extends AbstractQueuedSynchronizer {
    178 
    179     private static final long serialVersionUID = 0L;
    180 
    181     /* Valid states. */
    182     static final int RUNNING = 0;
    183     static final int COMPLETING = 1;
    184     static final int COMPLETED = 2;
    185     static final int CANCELLED = 4;
    186 
    187     private V value;
    188     private ExecutionException exception;
    189 
    190     /*
    191      * Acquisition succeeds if the future is done, otherwise it fails.
    192      */
    193     @Override
    194     protected int tryAcquireShared(int ignored) {
    195       if (isDone()) {
    196         return 1;
    197       }
    198       return -1;
    199     }
    200 
    201     /*
    202      * We always allow a release to go through, this means the state has been
    203      * successfully changed and the result is available.
    204      */
    205     @Override
    206     protected boolean tryReleaseShared(int finalState) {
    207       setState(finalState);
    208       return true;
    209     }
    210 
    211     /**
    212      * Blocks until the task is complete or the timeout expires.  Throws a
    213      * {@link TimeoutException} if the timer expires, otherwise behaves like
    214      * {@link #get()}.
    215      */
    216     V get(long nanos) throws TimeoutException, CancellationException,
    217         ExecutionException, InterruptedException {
    218 
    219       // Attempt to acquire the shared lock with a timeout.
    220       if (!tryAcquireSharedNanos(-1, nanos)) {
    221         throw new TimeoutException("Timeout waiting for task.");
    222       }
    223 
    224       return getValue();
    225     }
    226 
    227     /**
    228      * Blocks until {@link #complete(Object, Throwable, int)} has been
    229      * successfully called.  Throws a {@link CancellationException} if the task
    230      * was cancelled, or a {@link ExecutionException} if the task completed with
    231      * an error.
    232      */
    233     V get() throws CancellationException, ExecutionException,
    234         InterruptedException {
    235 
    236       // Acquire the shared lock allowing interruption.
    237       acquireSharedInterruptibly(-1);
    238       return getValue();
    239     }
    240 
    241     /**
    242      * Implementation of the actual value retrieval.  Will return the value
    243      * on success, an exception on failure, a cancellation on cancellation, or
    244      * an illegal state if the synchronizer is in an invalid state.
    245      */
    246     private V getValue() throws CancellationException, ExecutionException {
    247       int state = getState();
    248       switch (state) {
    249         case COMPLETED:
    250           if (exception != null) {
    251             throw exception;
    252           } else {
    253             return value;
    254           }
    255 
    256         case CANCELLED:
    257           throw new CancellationException("Task was cancelled.");
    258 
    259         default:
    260           throw new IllegalStateException(
    261               "Error, synchronizer in invalid state: " + state);
    262       }
    263     }
    264 
    265     /**
    266      * Checks if the state is {@link #COMPLETED} or {@link #CANCELLED}.
    267      */
    268     boolean isDone() {
    269       return (getState() & (COMPLETED | CANCELLED)) != 0;
    270     }
    271 
    272     /**
    273      * Checks if the state is {@link #CANCELLED}.
    274      */
    275     boolean isCancelled() {
    276       return getState() == CANCELLED;
    277     }
    278 
    279     /**
    280      * Transition to the COMPLETED state and set the value.
    281      */
    282     boolean set(V v) {
    283       return complete(v, null, COMPLETED);
    284     }
    285 
    286     /**
    287      * Transition to the COMPLETED state and set the exception.
    288      */
    289     boolean setException(Throwable t) {
    290       return complete(null, t, COMPLETED);
    291     }
    292 
    293     /**
    294      * Transition to the CANCELLED state.
    295      */
    296     boolean cancel() {
    297       return complete(null, null, CANCELLED);
    298     }
    299 
    300     /**
    301      * Implementation of completing a task.  Either {@code v} or {@code t} will
    302      * be set but not both.  The {@code finalState} is the state to change to
    303      * from {@link #RUNNING}.  If the state is not in the RUNNING state we
    304      * return {@code false}.
    305      *
    306      * @param v the value to set as the result of the computation.
    307      * @param t the exception to set as the result of the computation.
    308      * @param finalState the state to transition to.
    309      */
    310     private boolean complete(V v, Throwable t, int finalState) {
    311       if (compareAndSetState(RUNNING, COMPLETING)) {
    312         this.value = v;
    313         this.exception = t == null ? null : new ExecutionException(t);
    314         releaseShared(finalState);
    315         return true;
    316       }
    317 
    318       // The state was not RUNNING, so there are no valid transitions.
    319       return false;
    320     }
    321   }
    322 }
    323