Home | History | Annotate | Download | only in concurrent
      1 /*
      2  * Copyright (C) 2009 The Guava Authors
      3  *
      4  * Licensed under the Apache License, Version 2.0 (the "License");
      5  * you may not use this file except in compliance with the License.
      6  * You may obtain a copy of the License at
      7  *
      8  * http://www.apache.org/licenses/LICENSE-2.0
      9  *
     10  * Unless required by applicable law or agreed to in writing, software
     11  * distributed under the License is distributed on an "AS IS" BASIS,
     12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     13  * See the License for the specific language governing permissions and
     14  * limitations under the License.
     15  */
     16 
     17 package com.google.common.util.concurrent;
     18 
     19 import static com.google.common.base.Preconditions.checkNotNull;
     20 
     21 import com.google.common.annotations.Beta;
     22 import com.google.common.annotations.VisibleForTesting;
     23 
     24 import java.util.concurrent.Executor;
     25 import java.util.concurrent.Executors;
     26 import java.util.concurrent.Future;
     27 import java.util.concurrent.ThreadFactory;
     28 import java.util.concurrent.atomic.AtomicBoolean;
     29 
     30 /**
     31  * Utilities necessary for working with libraries that supply plain {@link
     32  * Future} instances. Note that, whenver possible, it is strongly preferred to
     33  * modify those libraries to return {@code ListenableFuture} directly.
     34  *
     35  * @author Sven Mawson
     36  * @since 10.0 (replacing {@code Futures.makeListenable}, which
     37  *     existed in 1.0)
     38  */
     39 @Beta
     40 public final class JdkFutureAdapters {
     41   /**
     42    * Assigns a thread to the given {@link Future} to provide {@link
     43    * ListenableFuture} functionality.
     44    *
     45    * <p><b>Warning:</b> If the input future does not already implement {@link
     46    * ListenableFuture}, the returned future will emulate {@link
     47    * ListenableFuture#addListener} by taking a thread from an internal,
     48    * unbounded pool at the first call to {@code addListener} and holding it
     49    * until the future is {@linkplain Future#isDone() done}.
     50    *
     51    * <p>Prefer to create {@code ListenableFuture} instances with {@link
     52    * SettableFuture}, {@link MoreExecutors#listeningDecorator(
     53    * java.util.concurrent.ExecutorService)}, {@link ListenableFutureTask},
     54    * {@link AbstractFuture}, and other utilities over creating plain {@code
     55    * Future} instances to be upgraded to {@code ListenableFuture} after the
     56    * fact.
     57    */
     58   public static <V> ListenableFuture<V> listenInPoolThread(
     59       Future<V> future) {
     60     if (future instanceof ListenableFuture<?>) {
     61       return (ListenableFuture<V>) future;
     62     }
     63     return new ListenableFutureAdapter<V>(future);
     64   }
     65 
     66   @VisibleForTesting
     67   static <V> ListenableFuture<V> listenInPoolThread(
     68       Future<V> future, Executor executor) {
     69     checkNotNull(executor);
     70     if (future instanceof ListenableFuture<?>) {
     71       return (ListenableFuture<V>) future;
     72     }
     73     return new ListenableFutureAdapter<V>(future, executor);
     74   }
     75 
     76   /**
     77    * An adapter to turn a {@link Future} into a {@link ListenableFuture}.  This
     78    * will wait on the future to finish, and when it completes, run the
     79    * listeners.  This implementation will wait on the source future
     80    * indefinitely, so if the source future never completes, the adapter will
     81    * never complete either.
     82    *
     83    * <p>If the delegate future is interrupted or throws an unexpected unchecked
     84    * exception, the listeners will not be invoked.
     85    */
     86   private static class ListenableFutureAdapter<V> extends ForwardingFuture<V>
     87       implements ListenableFuture<V> {
     88 
     89     private static final ThreadFactory threadFactory =
     90         new ThreadFactoryBuilder()
     91             .setDaemon(true)
     92             .setNameFormat("ListenableFutureAdapter-thread-%d")
     93             .build();
     94     private static final Executor defaultAdapterExecutor =
     95         Executors.newCachedThreadPool(threadFactory);
     96 
     97     private final Executor adapterExecutor;
     98 
     99     // The execution list to hold our listeners.
    100     private final ExecutionList executionList = new ExecutionList();
    101 
    102     // This allows us to only start up a thread waiting on the delegate future
    103     // when the first listener is added.
    104     private final AtomicBoolean hasListeners = new AtomicBoolean(false);
    105 
    106     // The delegate future.
    107     private final Future<V> delegate;
    108 
    109     ListenableFutureAdapter(Future<V> delegate) {
    110       this(delegate, defaultAdapterExecutor);
    111     }
    112 
    113     ListenableFutureAdapter(Future<V> delegate, Executor adapterExecutor) {
    114       this.delegate = checkNotNull(delegate);
    115       this.adapterExecutor = checkNotNull(adapterExecutor);
    116     }
    117 
    118     @Override
    119     protected Future<V> delegate() {
    120       return delegate;
    121     }
    122 
    123     @Override
    124     public void addListener(Runnable listener, Executor exec) {
    125       executionList.add(listener, exec);
    126 
    127       // When a listener is first added, we run a task that will wait for
    128       // the delegate to finish, and when it is done will run the listeners.
    129       if (hasListeners.compareAndSet(false, true)) {
    130         if (delegate.isDone()) {
    131           // If the delegate is already done, run the execution list
    132           // immediately on the current thread.
    133           executionList.execute();
    134           return;
    135         }
    136 
    137         adapterExecutor.execute(new Runnable() {
    138           @Override
    139           public void run() {
    140             try {
    141               delegate.get();
    142             } catch (Error e) {
    143               throw e;
    144             } catch (InterruptedException e) {
    145               Thread.currentThread().interrupt();
    146               // Threads from our private pool are never interrupted.
    147               throw new AssertionError(e);
    148             } catch (Throwable e) {
    149               // ExecutionException / CancellationException / RuntimeException
    150               // The task is done, run the listeners.
    151             }
    152             executionList.execute();
    153           }
    154         });
    155       }
    156     }
    157   }
    158 
    159   private JdkFutureAdapters() {}
    160 }
    161