/*
 * Copyright (C) 2009 The Guava Authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.google.common.util.concurrent;

import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.util.concurrent.Uninterruptibles.getUninterruptibly;

import com.google.common.annotations.Beta;

import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * Utilities necessary for working with libraries that supply plain {@link
 * Future} instances. Note that, whenever possible, it is strongly preferred to
 * modify those libraries to return {@code ListenableFuture} directly.
 *
 * @author Sven Mawson
 * @since 10.0 (replacing {@code Futures.makeListenable}, which
 *     existed in 1.0)
 */
@Beta
public final class JdkFutureAdapters {
  /**
   * Assigns a thread to the given {@link Future} to provide {@link
   * ListenableFuture} functionality.
   *
   * <p><b>Warning:</b> If the input future does not already implement {@code
   * ListenableFuture}, the returned future will emulate {@link
   * ListenableFuture#addListener} by taking a thread from an internal,
   * unbounded pool at the first call to {@code addListener} and holding it
   * until the future is {@linkplain Future#isDone() done}.
   *
   * <p>Prefer to create {@code ListenableFuture} instances with {@link
   * SettableFuture}, {@link MoreExecutors#listeningDecorator(
   * java.util.concurrent.ExecutorService)}, {@link ListenableFutureTask},
   * {@link AbstractFuture}, and other utilities over creating plain {@code
   * Future} instances to be upgraded to {@code ListenableFuture} after the
   * fact.
   *
   * @param future the future to adapt
   * @return {@code future} itself if it already is a {@code
   *     ListenableFuture}, otherwise an adapter wrapping it
   */
  public static <V> ListenableFuture<V> listenInPoolThread(
      Future<V> future) {
    if (future instanceof ListenableFuture) {
      return (ListenableFuture<V>) future;
    }
    return new ListenableFutureAdapter<V>(future);
  }

  /**
   * Submits a blocking task for the given {@link Future} to provide {@link
   * ListenableFuture} functionality.
   *
   * <p><b>Warning:</b> If the input future does not already implement {@code
   * ListenableFuture}, the returned future will emulate {@link
   * ListenableFuture#addListener} by submitting a task to the given executor at
   * the first call to {@code addListener}. The task must be started by the
   * executor promptly, or else the returned {@code ListenableFuture} may fail
   * to work. The task's execution consists of blocking until the input future
   * is {@linkplain Future#isDone() done}, so each call to this method may
   * claim and hold a thread for an arbitrary length of time. Use of bounded
   * executors or other executors that may fail to execute a task promptly may
   * result in deadlocks.
   *
   * <p>Prefer to create {@code ListenableFuture} instances with {@link
   * SettableFuture}, {@link MoreExecutors#listeningDecorator(
   * java.util.concurrent.ExecutorService)}, {@link ListenableFutureTask},
   * {@link AbstractFuture}, and other utilities over creating plain {@code
   * Future} instances to be upgraded to {@code ListenableFuture} after the
   * fact.
   *
   * @param future the future to adapt
   * @param executor the executor on which the blocking wait for {@code
   *     future} is run; checked for null even when {@code future} is already
   *     a {@code ListenableFuture}
   * @return {@code future} itself if it already is a {@code
   *     ListenableFuture}, otherwise an adapter wrapping it
   * @since 12.0
   */
  public static <V> ListenableFuture<V> listenInPoolThread(
      Future<V> future, Executor executor) {
    checkNotNull(executor);
    if (future instanceof ListenableFuture) {
      return (ListenableFuture<V>) future;
    }
    return new ListenableFutureAdapter<V>(future, executor);
  }

  /**
   * An adapter to turn a {@link Future} into a {@link ListenableFuture}. This
   * will wait on the future to finish, and when it completes, run the
   * listeners. This implementation will wait on the source future
   * indefinitely, so if the source future never completes, the adapter will
   * never complete either.
   *
   * <p>The listeners are run once the wait on the delegate ends, whether it
   * ends by returning normally or by throwing. If the wait ends with an
   * {@link Error}, the listeners are still run and the error is then
   * propagated to the thread that performed the wait.
   */
  private static class ListenableFutureAdapter<V> extends ForwardingFuture<V>
      implements ListenableFuture<V> {

    private static final ThreadFactory threadFactory =
        new ThreadFactoryBuilder()
            .setDaemon(true)
            .setNameFormat("ListenableFutureAdapter-thread-%d")
            .build();
    private static final Executor defaultAdapterExecutor =
        Executors.newCachedThreadPool(threadFactory);

    // The executor on which the blocking wait for the delegate is run.
    private final Executor adapterExecutor;

    // The execution list to hold our listeners.
    private final ExecutionList executionList = new ExecutionList();

    // This allows us to only start up a thread waiting on the delegate future
    // when the first listener is added.
    private final AtomicBoolean hasListeners = new AtomicBoolean(false);

    // The delegate future.
    private final Future<V> delegate;

    ListenableFutureAdapter(Future<V> delegate) {
      this(delegate, defaultAdapterExecutor);
    }

    ListenableFutureAdapter(Future<V> delegate, Executor adapterExecutor) {
      this.delegate = checkNotNull(delegate);
      this.adapterExecutor = checkNotNull(adapterExecutor);
    }

    @Override
    protected Future<V> delegate() {
      return delegate;
    }

    /**
     * Registers {@code listener} to be run on {@code exec}. The first call
     * either runs the execution list right away (delegate already done) or
     * submits a task to the adapter executor that waits for the delegate and
     * then runs the execution list.
     */
    @Override
    public void addListener(Runnable listener, Executor exec) {
      executionList.add(listener, exec);

      // When a listener is first added, we run a task that will wait for
      // the delegate to finish, and when it is done will run the listeners.
      if (hasListeners.compareAndSet(false, true)) {
        if (delegate.isDone()) {
          // If the delegate is already done, run the execution list
          // immediately on the current thread.
          executionList.execute();
          return;
        }

        // NOTE(review): hasListeners is already true at this point, so if the
        // executor rejects the task below, no later call will start a waiter.
        adapterExecutor.execute(new Runnable() {
          @Override
          public void run() {
            try {
              /*
               * Threads from our private pool are never interrupted. Threads
               * from a user-supplied executor might be, but... what can we do?
               * This is another reason to return a proper ListenableFuture
               * instead of using listenInPoolThread.
               */
              getUninterruptibly(delegate);
            } catch (Error e) {
              // Still propagated, but only after the finally block has run
              // the listeners.
              throw e;
            } catch (Throwable ignored) {
              // ExecutionException / CancellationException / RuntimeException
              // The task is done; the outcome is reported through the future
              // itself, so there is nothing to handle here.
            } finally {
              // Run the listeners on every exit path. The waiter task is
              // started at most once, so skipping this would leave every
              // registered listener waiting forever.
              executionList.execute();
            }
          }
        });
      }
    }
  }

  private JdkFutureAdapters() {}
}