Home | History | Annotate | Download | only in concurrent
      1 /*
      2  * Copyright (C) 2006 Google Inc.
      3  *
      4  * Licensed under the Apache License, Version 2.0 (the "License");
      5  * you may not use this file except in compliance with the License.
      6  * You may obtain a copy of the License at
      7  *
      8  * http://www.apache.org/licenses/LICENSE-2.0
      9  *
     10  * Unless required by applicable law or agreed to in writing, software
     11  * distributed under the License is distributed on an "AS IS" BASIS,
     12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     13  * See the License for the specific language governing permissions and
     14  * limitations under the License.
     15  */
     16 
     17 package com.google.common.util.concurrent;
     18 
     19 import com.google.common.base.Function;
     20 
     21 import java.lang.reflect.UndeclaredThrowableException;
     22 import java.util.concurrent.CancellationException;
     23 import java.util.concurrent.ExecutionException;
     24 import java.util.concurrent.Executor;
     25 import java.util.concurrent.Future;
     26 import java.util.concurrent.TimeUnit;
     27 import java.util.concurrent.TimeoutException;
     28 import java.util.concurrent.atomic.AtomicBoolean;
     29 
     30 import javax.annotation.Nullable;
     31 
     32 import static com.google.common.base.Preconditions.checkNotNull;
     33 
     34 /**
     35  * Static utility methods pertaining to the {@link Future} interface.
     36  *
     37  * @author Kevin Bourrillion
     38  * @author Nishant Thakkar
     39  * @author Sven Mawson
     40  * @since 2009.09.15 <b>tentative</b>
     41  */
     42 public class Futures {
     43   private Futures() {}
     44 
     45   /**
     46    * Returns an uninterruptible view of a {@code Future}. If a thread is
     47    * interrupted during an attempt to {@code get()} from the returned future, it
     48    * continues to wait on the result until it is available or the timeout
     49    * elapses, and only then re-interrupts the thread.
     50    */
     51   public static <V> UninterruptibleFuture<V> makeUninterruptible(
     52       final Future<V> future) {
     53     checkNotNull(future);
     54     if (future instanceof UninterruptibleFuture) {
     55       return (UninterruptibleFuture<V>) future;
     56     }
     57     return new UninterruptibleFuture<V>() {
     58       public boolean cancel(boolean mayInterruptIfRunning) {
     59         return future.cancel(mayInterruptIfRunning);
     60       }
     61       public boolean isCancelled() {
     62         return future.isCancelled();
     63       }
     64       public boolean isDone() {
     65         return future.isDone();
     66       }
     67 
     68       public V get(long timeoutDuration, TimeUnit timeoutUnit)
     69           throws TimeoutException, ExecutionException {
     70         boolean interrupted = false;
     71         try {
     72           long timeoutNanos = timeoutUnit.toNanos(timeoutDuration);
     73           long end = System.nanoTime() + timeoutNanos;
     74           for (long remaining = timeoutNanos; remaining > 0;
     75               remaining = end - System.nanoTime()) {
     76             try {
     77               return future.get(remaining, TimeUnit.NANOSECONDS);
     78             } catch (InterruptedException ignored) {
     79               interrupted = true;
     80             }
     81           }
     82           throw new TimeoutException();
     83         } finally {
     84           if (interrupted) {
     85             Thread.currentThread().interrupt();
     86           }
     87         }
     88       }
     89 
     90       public V get() throws ExecutionException {
     91         boolean interrupted = false;
     92         try {
     93           while (true) {
     94             try {
     95               return future.get();
     96             } catch (InterruptedException ignored) {
     97               interrupted = true;
     98             }
     99           }
    100         } finally {
    101           if (interrupted) {
    102             Thread.currentThread().interrupt();
    103           }
    104         }
    105       }
    106     };
    107   }
    108 
    109   /**
    110    * Creates a {@link ListenableFuture} out of a normal {@link Future}. The
    111    * returned future will create a thread to wait for the source future to
    112    * complete before executing the listeners.
    113    *
    114    * <p>Callers who have a future that subclasses
    115    * {@link java.util.concurrent.FutureTask} may want to instead subclass
    116    * {@link ListenableFutureTask}, which adds the {@link ListenableFuture}
    117    * functionality to the standard {@code FutureTask} implementation.
    118    */
    119   public static <T> ListenableFuture<T> makeListenable(Future<T> future) {
    120     if (future instanceof ListenableFuture) {
    121       return (ListenableFuture<T>) future;
    122     }
    123     return new ListenableFutureAdapter<T>(future);
    124   }
    125 
    126   /**
    127    * Creates a {@link CheckedFuture} out of a normal {@link Future} and a
    128    * {@link Function} that maps from {@link Exception} instances into the
    129    * appropriate checked type.
    130    *
    131    * <p>The given mapping function will be applied to an
    132    * {@link InterruptedException}, a {@link CancellationException}, or an
    133    * {@link ExecutionException} with the actual cause of the exception.
    134    * See {@link Future#get()} for details on the exceptions thrown.
    135    */
    136   public static <T, E extends Exception> CheckedFuture<T, E> makeChecked(
    137       Future<T> future, Function<Exception, E> mapper) {
    138     return new MappingCheckedFuture<T, E>(makeListenable(future), mapper);
    139   }
    140 
    141   /**
    142    * Creates a {@code ListenableFuture} which has its value set immediately upon
    143    * construction. The getters just return the value. This {@code Future} can't
    144    * be canceled or timed out and its {@code isDone()} method always returns
    145    * {@code true}. It's useful for returning something that implements the
    146    * {@code ListenableFuture} interface but already has the result.
    147    */
    148   public static <T> ListenableFuture<T> immediateFuture(@Nullable T value) {
    149     ValueFuture<T> future = ValueFuture.create();
    150     future.set(value);
    151     return future;
    152   }
    153 
    154   /**
    155    * Creates a {@code CheckedFuture} which has its value set immediately upon
    156    * construction. The getters just return the value. This {@code Future} can't
    157    * be canceled or timed out and its {@code isDone()} method always returns
    158    * {@code true}. It's useful for returning something that implements the
    159    * {@code CheckedFuture} interface but already has the result.
    160    */
    161   public static <T, E extends Exception> CheckedFuture<T, E>
    162       immediateCheckedFuture(@Nullable T value) {
    163     ValueFuture<T> future = ValueFuture.create();
    164     future.set(value);
    165     return Futures.makeChecked(future, new Function<Exception, E>() {
    166       public E apply(Exception e) {
    167         throw new AssertionError("impossible");
    168       }
    169     });
    170   }
    171 
    172   /**
    173    * Creates a {@code ListenableFuture} which has an exception set immediately
    174    * upon construction. The getters just return the value. This {@code Future}
    175    * can't be canceled or timed out and its {@code isDone()} method always
    176    * returns {@code true}. It's useful for returning something that implements
    177    * the {@code ListenableFuture} interface but already has a failed
    178    * result. Calling {@code get()} will throw the provided {@code Throwable}
    179    * (wrapped in an {@code ExecutionException}).
    180    *
    181    * @throws Error if the throwable was an {@link Error}.
    182    */
    183   public static <T> ListenableFuture<T> immediateFailedFuture(
    184       Throwable throwable) {
    185     checkNotNull(throwable);
    186     ValueFuture<T> future = ValueFuture.create();
    187     future.setException(throwable);
    188     return future;
    189   }
    190 
    191   /**
    192    * Creates a {@code CheckedFuture} which has an exception set immediately
    193    * upon construction. The getters just return the value. This {@code Future}
    194    * can't be canceled or timed out and its {@code isDone()} method always
    195    * returns {@code true}. It's useful for returning something that implements
    196    * the {@code CheckedFuture} interface but already has a failed result.
    197    * Calling {@code get()} will throw the provided {@code Throwable} (wrapped in
    198    * an {@code ExecutionException}) and calling {@code checkedGet()} will throw
    199    * the provided exception itself.
    200    *
    201    * @throws Error if the throwable was an {@link Error}.
    202    */
    203   public static <T, E extends Exception> CheckedFuture<T, E>
    204       immediateFailedCheckedFuture(final E exception) {
    205     checkNotNull(exception);
    206     return makeChecked(Futures.<T>immediateFailedFuture(exception),
    207         new Function<Exception, E>() {
    208           public E apply(Exception e) {
    209             return exception;
    210           }
    211         });
    212   }
    213 
    214   /**
    215    * Creates a new {@code ListenableFuture} that wraps another
    216    * {@code ListenableFuture}.  The result of the new future is the result of
    217    * the provided function called on the result of the provided future.
    218    * The resulting future doesn't interrupt when aborted.
    219    *
    220    * <p>TODO: Add a version that accepts a normal {@code Future}
    221    *
    222    * <p>The typical use for this method would be when a RPC call is dependent on
    223    * the results of another RPC.  One would call the first RPC (input), create a
    224    * function that calls another RPC based on input's result, and then call
    225    * chain on input and that function to get a {@code ListenableFuture} of
    226    * the result.
    227    *
    228    * @param input The future to chain
    229    * @param function A function to chain the results of the provided future
    230    *     to the results of the returned future.  This will be run in the thread
    231    *     that notifies input it is complete.
    232    * @return A future that holds result of the chain.
    233    */
    234   public static <I, O> ListenableFuture<O> chain(ListenableFuture<I> input,
    235       Function<? super I, ? extends ListenableFuture<? extends O>> function) {
    236     return chain(input, function, Executors.sameThreadExecutor());
    237   }
    238 
    239   /**
    240    * Creates a new {@code ListenableFuture} that wraps another
    241    * {@code ListenableFuture}.  The result of the new future is the result of
    242    * the provided function called on the result of the provided future.
    243    * The resulting future doesn't interrupt when aborted.
    244    *
    245    * <p>This version allows an arbitrary executor to be passed in for running
    246    * the chained Function. When using {@link Executors#sameThreadExecutor}, the
    247    * thread chained Function executes in will be whichever thread set the
    248    * result of the input Future, which may be the network thread in the case of
    249    * RPC-based Futures.
    250    *
    251    * @param input The future to chain
    252    * @param function A function to chain the results of the provided future
    253    *     to the results of the returned future.
    254    * @param exec Executor to run the function in.
    255    * @return A future that holds result of the chain.
    256    */
    257   public static <I, O> ListenableFuture<O> chain(ListenableFuture<I> input,
    258       Function<? super I, ? extends ListenableFuture<? extends O>> function,
    259       Executor exec) {
    260     ChainingListenableFuture<I, O> chain =
    261         new ChainingListenableFuture<I, O>(function, input);
    262     input.addListener(chain, exec);
    263     return chain;
    264   }
    265 
    266   /**
    267    * Creates a new {@code ListenableFuture} that wraps another
    268    * {@code ListenableFuture}.  The result of the new future is the result of
    269    * the provided function called on the result of the provided future.
    270    * The resulting future doesn't interrupt when aborted.
    271    *
    272    * <p>An example use of this method is to convert a serializable object
    273    * returned from an RPC into a POJO.
    274    *
    275    * @param future The future to compose
    276    * @param function A Function to compose the results of the provided future
    277    *     to the results of the returned future.  This will be run in the thread
    278    *     that notifies input it is complete.
    279    * @return A future that holds result of the composition.
    280    */
    281   public static <I, O> ListenableFuture<O> compose(ListenableFuture<I> future,
    282       final Function<? super I, ? extends O> function) {
    283     return compose(future, function, Executors.sameThreadExecutor());
    284   }
    285 
    286   /**
    287    * Creates a new {@code ListenableFuture} that wraps another
    288    * {@code ListenableFuture}.  The result of the new future is the result of
    289    * the provided function called on the result of the provided future.
    290    * The resulting future doesn't interrupt when aborted.
    291    *
    292    * <p>An example use of this method is to convert a serializable object
    293    * returned from an RPC into a POJO.
    294    *
    295    * <p>This version allows an arbitrary executor to be passed in for running
    296    * the chained Function. When using {@link Executors#sameThreadExecutor}, the
    297    * thread chained Function executes in will be whichever thread set the result
    298    * of the input Future, which may be the network thread in the case of
    299    * RPC-based Futures.
    300    *
    301    * @param future The future to compose
    302    * @param function A Function to compose the results of the provided future
    303    *     to the results of the returned future.
    304    * @param exec Executor to run the function in.
    305    * @return A future that holds result of the composition.
    306    * @since 2010.01.04 <b>tentative</b>
    307    */
    308   public static <I, O> ListenableFuture<O> compose(ListenableFuture<I> future,
    309       final Function<? super I, ? extends O> function, Executor exec) {
    310     Function<I, ListenableFuture<O>> wrapperFunction
    311         = new Function<I, ListenableFuture<O>>() {
    312             /*@Override*/ public ListenableFuture<O> apply(I input) {
    313               O output = function.apply(input);
    314               return immediateFuture(output);
    315             }
    316         };
    317     return chain(future, wrapperFunction, exec);
    318   }
    319 
    320   /**
    321    * Creates a new {@code Future} that wraps another {@code Future}.
    322    * The result of the new future is the result of the provided function called
    323    * on the result of the provided future.
    324    *
    325    * <p>An example use of this method is to convert a Future that produces a
    326    * handle to an object to a future that produces the object itself.
    327    *
    328    * <p>Each call to {@code Future<O>.get(*)} results in a call to
    329    * {@code Future<I>.get(*)}, but {@code function} is only applied once, so it
    330    * is assumed that {@code Future<I>.get(*)} is idempotent.
    331    *
    332    * <p>When calling {@link Future#get(long, TimeUnit)} on the returned
    333    * future, the timeout only applies to the future passed in to this method.
    334    * Any additional time taken by applying {@code function} is not considered.
    335    *
    336    * @param future The future to compose
    337    * @param function A Function to compose the results of the provided future
    338    *     to the results of the returned future.  This will be run in the thread
    339    *     that calls one of the varieties of {@code get()}.
    340    * @return A future that computes result of the composition.
    341    */
    342   public static <I, O> Future<O> compose(final Future<I> future,
    343       final Function<? super I, ? extends O> function) {
    344 
    345     return new Future<O>() {
    346 
    347       /*
    348        * Concurrency detail:
    349        *
    350        * <p>To preserve the idempotency of calls to this.get(*) calls to the
    351        * function are only applied once. A lock is required to prevent multiple
    352        * applications of the function. The calls to future.get(*) are performed
    353        * outside the lock, as is required to prevent calls to
    354        * get(long, TimeUnit) to persist beyond their timeout.
    355        *
    356        * <p>Calls to future.get(*) on every call to this.get(*) also provide
    357        * the cancellation behavior for this.
    358        *
    359        * <p>(Consider: in thread A, call get(), in thread B call get(long,
    360        * TimeUnit). Thread B may have to wait for Thread A to finish, which
    361        * would be unacceptable.)
    362        *
    363        * <p>Note that each call to Future<O>.get(*) results in a call to
    364        * Future<I>.get(*), but the function is only applied once, so
    365        * Future<I>.get(*) is assumed to be idempotent.
    366        */
    367 
    368       private final Object lock = new Object();
    369       private boolean set = false;
    370       private O value = null;
    371 
    372       /*@Override*/
    373       public O get() throws InterruptedException, ExecutionException {
    374         return apply(future.get());
    375       }
    376 
    377       /*@Override*/
    378       public O get(long timeout, TimeUnit unit) throws InterruptedException,
    379           ExecutionException, TimeoutException {
    380         return apply(future.get(timeout, unit));
    381       }
    382 
    383       private O apply(I raw) {
    384         synchronized(lock) {
    385           if (!set) {
    386             value = function.apply(raw);
    387             set = true;
    388           }
    389           return value;
    390         }
    391       }
    392 
    393       /*@Override*/
    394       public boolean cancel(boolean mayInterruptIfRunning) {
    395         return future.cancel(mayInterruptIfRunning);
    396       }
    397 
    398       /*@Override*/
    399       public boolean isCancelled() {
    400         return future.isCancelled();
    401       }
    402 
    403       /*@Override*/
    404       public boolean isDone() {
    405         return future.isDone();
    406       }
    407     };
    408   }
    409 
    410   /**
    411    * An implementation of {@code ListenableFuture} that also implements
    412    * {@code Runnable} so that it can be used to nest ListenableFutures.
    413    * Once the passed-in {@code ListenableFuture} is complete, it calls the
    414    * passed-in {@code Function} to generate the result.
    415    * The resulting future doesn't interrupt when aborted.
    416    *
    417    * <p>If the function throws any checked exceptions, they should be wrapped
    418    * in a {@code UndeclaredThrowableException} so that this class can get
    419    * access to the cause.
    420    */
    421   private static class ChainingListenableFuture<I, O>
    422       extends AbstractListenableFuture<O> implements Runnable {
    423 
    424     private final Function<? super I, ? extends ListenableFuture<? extends O>>
    425         function;
    426     private final UninterruptibleFuture<? extends I> inputFuture;
    427 
    428     private ChainingListenableFuture(
    429         Function<? super I, ? extends ListenableFuture<? extends O>> function,
    430         ListenableFuture<? extends I> inputFuture) {
    431       this.function = function;
    432       this.inputFuture = makeUninterruptible(inputFuture);
    433     }
    434 
    435     public void run() {
    436       try {
    437         I sourceResult;
    438         try {
    439           sourceResult = inputFuture.get();
    440         } catch (CancellationException e) {
    441           // Cancel this future and return.
    442           cancel();
    443           return;
    444         } catch (ExecutionException e) {
    445           // Set the cause of the exception as this future's exception
    446           setException(e.getCause());
    447           return;
    448         }
    449 
    450         final ListenableFuture<? extends O> outputFuture =
    451             function.apply(sourceResult);
    452         outputFuture.addListener(new Runnable() {
    453             public void run() {
    454               try {
    455                 // Here it would have been nice to have had an
    456                 // UninterruptibleListenableFuture, but we don't want to start a
    457                 // combinatorial explosion of interfaces, so we have to make do.
    458                 set(makeUninterruptible(outputFuture).get());
    459               } catch (ExecutionException e) {
    460                 // Set the cause of the exception as this future's exception
    461                 setException(e.getCause());
    462               }
    463             }
    464           }, Executors.sameThreadExecutor());
    465       } catch (UndeclaredThrowableException e) {
    466         // Set the cause of the exception as this future's exception
    467         setException(e.getCause());
    468       } catch (RuntimeException e) {
    469         // This exception is irrelevant in this thread, but useful for the
    470         // client
    471         setException(e);
    472       } catch (Error e) {
    473         // This seems evil, but the client needs to know an error occured and
    474         // the error needs to be propagated ASAP.
    475         setException(e);
    476         throw e;
    477       }
    478     }
    479   }
    480 
    481   /**
    482    * A checked future that uses a function to map from exceptions to the
    483    * appropriate checked type.
    484    */
    485   private static class MappingCheckedFuture<T, E extends Exception> extends
    486       AbstractCheckedFuture<T, E> {
    487 
    488     final Function<Exception, E> mapper;
    489 
    490     MappingCheckedFuture(ListenableFuture<T> delegate,
    491         Function<Exception, E> mapper) {
    492       super(delegate);
    493 
    494       this.mapper = mapper;
    495     }
    496 
    497     @Override
    498     protected E mapException(Exception e) {
    499       return mapper.apply(e);
    500     }
    501   }
    502 
    503   /**
    504    * An adapter to turn a {@link Future} into a {@link ListenableFuture}.  This
    505    * will wait on the future to finish, and when it completes, run the
    506    * listeners.  This implementation will wait on the source future
    507    * indefinitely, so if the source future never completes, the adapter will
    508    * never complete either.
    509    *
    510    * <p>If the delegate future is interrupted or throws an unexpected unchecked
    511    * exception, the listeners will not be invoked.
    512    */
    513   private static class ListenableFutureAdapter<T> extends ForwardingFuture<T>
    514       implements ListenableFuture<T> {
    515 
    516     private static final Executor adapterExecutor =
    517         java.util.concurrent.Executors.newCachedThreadPool();
    518 
    519     // The execution list to hold our listeners.
    520     private final ExecutionList executionList = new ExecutionList();
    521 
    522     // This allows us to only start up a thread waiting on the delegate future
    523     // when the first listener is added.
    524     private final AtomicBoolean hasListeners = new AtomicBoolean(false);
    525 
    526     // The delegate future.
    527     private final Future<T> delegate;
    528 
    529     ListenableFutureAdapter(final Future<T> delegate) {
    530       this.delegate = delegate;
    531     }
    532 
    533     @Override
    534     protected Future<T> delegate() {
    535       return delegate;
    536     }
    537 
    538     /*@Override*/
    539     public void addListener(Runnable listener, Executor exec) {
    540 
    541       // When a listener is first added, we run a task that will wait for
    542       // the delegate to finish, and when it is done will run the listeners.
    543       if (!hasListeners.get() && hasListeners.compareAndSet(false, true)) {
    544         adapterExecutor.execute(new Runnable() {
    545           /*@Override*/
    546           public void run() {
    547             try {
    548               delegate.get();
    549             } catch (CancellationException e) {
    550               // The task was cancelled, so it is done, run the listeners.
    551             } catch (InterruptedException e) {
    552               // This thread was interrupted.  This should never happen, so we
    553               // throw an IllegalStateException.
    554               throw new IllegalStateException("Adapter thread interrupted!", e);
    555             } catch (ExecutionException e) {
    556               // The task caused an exception, so it is done, run the listeners.
    557             }
    558             executionList.run();
    559           }
    560         });
    561       }
    562       executionList.add(listener, exec);
    563     }
    564   }
    565 }
    566