Home | History | Annotate | Download | only in concurrent
      1 /*
      2  * Copyright (C) 2009 The Guava Authors
      3  *
      4  * Licensed under the Apache License, Version 2.0 (the "License");
      5  * you may not use this file except in compliance with the License.
      6  * You may obtain a copy of the License at
      7  *
      8  * http://www.apache.org/licenses/LICENSE-2.0
      9  *
     10  * Unless required by applicable law or agreed to in writing, software
     11  * distributed under the License is distributed on an "AS IS" BASIS,
     12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     13  * See the License for the specific language governing permissions and
     14  * limitations under the License.
     15  */
     16 
     17 package com.google.common.util.concurrent;
     18 
     19 import static com.google.common.base.Preconditions.checkNotNull;
     20 import static com.google.common.util.concurrent.Uninterruptibles.getUninterruptibly;
     21 
     22 import com.google.common.annotations.Beta;
     23 
     24 import java.util.concurrent.Executor;
     25 import java.util.concurrent.Executors;
     26 import java.util.concurrent.Future;
     27 import java.util.concurrent.ThreadFactory;
     28 import java.util.concurrent.atomic.AtomicBoolean;
     29 
     30 /**
     31  * Utilities necessary for working with libraries that supply plain {@link
     32  * Future} instances. Note that, whenver possible, it is strongly preferred to
     33  * modify those libraries to return {@code ListenableFuture} directly.
     34  *
     35  * @author Sven Mawson
     36  * @since 10.0 (replacing {@code Futures.makeListenable}, which
     37  *     existed in 1.0)
     38  */
     39 @Beta
     40 public final class JdkFutureAdapters {
     41   /**
     42    * Assigns a thread to the given {@link Future} to provide {@link
     43    * ListenableFuture} functionality.
     44    *
     45    * <p><b>Warning:</b> If the input future does not already implement {@code
     46    * ListenableFuture}, the returned future will emulate {@link
     47    * ListenableFuture#addListener} by taking a thread from an internal,
     48    * unbounded pool at the first call to {@code addListener} and holding it
     49    * until the future is {@linkplain Future#isDone() done}.
     50    *
     51    * <p>Prefer to create {@code ListenableFuture} instances with {@link
     52    * SettableFuture}, {@link MoreExecutors#listeningDecorator(
     53    * java.util.concurrent.ExecutorService)}, {@link ListenableFutureTask},
     54    * {@link AbstractFuture}, and other utilities over creating plain {@code
     55    * Future} instances to be upgraded to {@code ListenableFuture} after the
     56    * fact.
     57    */
     58   public static <V> ListenableFuture<V> listenInPoolThread(
     59       Future<V> future) {
     60     if (future instanceof ListenableFuture) {
     61       return (ListenableFuture<V>) future;
     62     }
     63     return new ListenableFutureAdapter<V>(future);
     64   }
     65 
     66   /**
     67    * Submits a blocking task for the given {@link Future} to provide {@link
     68    * ListenableFuture} functionality.
     69    *
     70    * <p><b>Warning:</b> If the input future does not already implement {@code
     71    * ListenableFuture}, the returned future will emulate {@link
     72    * ListenableFuture#addListener} by submitting a task to the given executor at
     73    * the first call to {@code addListener}. The task must be started by the
     74    * executor promptly, or else the returned {@code ListenableFuture} may fail
     75    * to work.  The task's execution consists of blocking until the input future
     76    * is {@linkplain Future#isDone() done}, so each call to this method may
     77    * claim and hold a thread for an arbitrary length of time. Use of bounded
     78    * executors or other executors that may fail to execute a task promptly may
     79    * result in deadlocks.
     80    *
     81    * <p>Prefer to create {@code ListenableFuture} instances with {@link
     82    * SettableFuture}, {@link MoreExecutors#listeningDecorator(
     83    * java.util.concurrent.ExecutorService)}, {@link ListenableFutureTask},
     84    * {@link AbstractFuture}, and other utilities over creating plain {@code
     85    * Future} instances to be upgraded to {@code ListenableFuture} after the
     86    * fact.
     87    *
     88    * @since 12.0
     89    */
     90   public static <V> ListenableFuture<V> listenInPoolThread(
     91       Future<V> future, Executor executor) {
     92     checkNotNull(executor);
     93     if (future instanceof ListenableFuture) {
     94       return (ListenableFuture<V>) future;
     95     }
     96     return new ListenableFutureAdapter<V>(future, executor);
     97   }
     98 
     99   /**
    100    * An adapter to turn a {@link Future} into a {@link ListenableFuture}.  This
    101    * will wait on the future to finish, and when it completes, run the
    102    * listeners.  This implementation will wait on the source future
    103    * indefinitely, so if the source future never completes, the adapter will
    104    * never complete either.
    105    *
    106    * <p>If the delegate future is interrupted or throws an unexpected unchecked
    107    * exception, the listeners will not be invoked.
    108    */
    109   private static class ListenableFutureAdapter<V> extends ForwardingFuture<V>
    110       implements ListenableFuture<V> {
    111 
    112     private static final ThreadFactory threadFactory =
    113         new ThreadFactoryBuilder()
    114             .setDaemon(true)
    115             .setNameFormat("ListenableFutureAdapter-thread-%d")
    116             .build();
    117     private static final Executor defaultAdapterExecutor =
    118         Executors.newCachedThreadPool(threadFactory);
    119 
    120     private final Executor adapterExecutor;
    121 
    122     // The execution list to hold our listeners.
    123     private final ExecutionList executionList = new ExecutionList();
    124 
    125     // This allows us to only start up a thread waiting on the delegate future
    126     // when the first listener is added.
    127     private final AtomicBoolean hasListeners = new AtomicBoolean(false);
    128 
    129     // The delegate future.
    130     private final Future<V> delegate;
    131 
    132     ListenableFutureAdapter(Future<V> delegate) {
    133       this(delegate, defaultAdapterExecutor);
    134     }
    135 
    136     ListenableFutureAdapter(Future<V> delegate, Executor adapterExecutor) {
    137       this.delegate = checkNotNull(delegate);
    138       this.adapterExecutor = checkNotNull(adapterExecutor);
    139     }
    140 
    141     @Override
    142     protected Future<V> delegate() {
    143       return delegate;
    144     }
    145 
    146     @Override
    147     public void addListener(Runnable listener, Executor exec) {
    148       executionList.add(listener, exec);
    149 
    150       // When a listener is first added, we run a task that will wait for
    151       // the delegate to finish, and when it is done will run the listeners.
    152       if (hasListeners.compareAndSet(false, true)) {
    153         if (delegate.isDone()) {
    154           // If the delegate is already done, run the execution list
    155           // immediately on the current thread.
    156           executionList.execute();
    157           return;
    158         }
    159 
    160         adapterExecutor.execute(new Runnable() {
    161           @Override
    162           public void run() {
    163             try {
    164               /*
    165                * Threads from our private pool are never interrupted. Threads
    166                * from a user-supplied executor might be, but... what can we do?
    167                * This is another reason to return a proper ListenableFuture
    168                * instead of using listenInPoolThread.
    169                */
    170               getUninterruptibly(delegate);
    171             } catch (Error e) {
    172               throw e;
    173             } catch (Throwable e) {
    174               // ExecutionException / CancellationException / RuntimeException
    175               // The task is done, run the listeners.
    176             }
    177             executionList.execute();
    178           }
    179         });
    180       }
    181     }
    182   }
    183 
    184   private JdkFutureAdapters() {}
    185 }
    186