1 /* 2 * Copyright (C) 2006 Google Inc. 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package com.google.common.util.concurrent; 18 19 import com.google.common.base.Function; 20 21 import java.lang.reflect.UndeclaredThrowableException; 22 import java.util.concurrent.CancellationException; 23 import java.util.concurrent.ExecutionException; 24 import java.util.concurrent.Executor; 25 import java.util.concurrent.Future; 26 import java.util.concurrent.TimeUnit; 27 import java.util.concurrent.TimeoutException; 28 import java.util.concurrent.atomic.AtomicBoolean; 29 30 import javax.annotation.Nullable; 31 32 import static com.google.common.base.Preconditions.checkNotNull; 33 34 /** 35 * Static utility methods pertaining to the {@link Future} interface. 36 * 37 * @author Kevin Bourrillion 38 * @author Nishant Thakkar 39 * @author Sven Mawson 40 * @since 2009.09.15 <b>tentative</b> 41 */ 42 public class Futures { 43 private Futures() {} 44 45 /** 46 * Returns an uninterruptible view of a {@code Future}. If a thread is 47 * interrupted during an attempt to {@code get()} from the returned future, it 48 * continues to wait on the result until it is available or the timeout 49 * elapses, and only then re-interrupts the thread. 50 */ 51 public static <V> UninterruptibleFuture<V> makeUninterruptible( 52 final Future<V> future) { 53 checkNotNull(future); 54 if (future instanceof UninterruptibleFuture) { 55 return (UninterruptibleFuture<V>) future; 56 } 57 return new UninterruptibleFuture<V>() { 58 public boolean cancel(boolean mayInterruptIfRunning) { 59 return future.cancel(mayInterruptIfRunning); 60 } 61 public boolean isCancelled() { 62 return future.isCancelled(); 63 } 64 public boolean isDone() { 65 return future.isDone(); 66 } 67 68 public V get(long timeoutDuration, TimeUnit timeoutUnit) 69 throws TimeoutException, ExecutionException { 70 boolean interrupted = false; 71 try { 72 long timeoutNanos = timeoutUnit.toNanos(timeoutDuration); 73 long end = System.nanoTime() + timeoutNanos; 74 for (long remaining = timeoutNanos; remaining > 0; 75 remaining = end - System.nanoTime()) { 76 try { 77 return future.get(remaining, TimeUnit.NANOSECONDS); 78 } catch (InterruptedException ignored) { 79 interrupted = true; 80 } 81 } 82 throw new TimeoutException(); 83 } finally { 84 if (interrupted) { 85 Thread.currentThread().interrupt(); 86 } 87 } 88 } 89 90 public V get() throws ExecutionException { 91 boolean interrupted = false; 92 try { 93 while (true) { 94 try { 95 return future.get(); 96 } catch (InterruptedException ignored) { 97 interrupted = true; 98 } 99 } 100 } finally { 101 if (interrupted) { 102 Thread.currentThread().interrupt(); 103 } 104 } 105 } 106 }; 107 } 108 109 /** 110 * Creates a {@link ListenableFuture} out of a normal {@link Future}. The 111 * returned future will create a thread to wait for the source future to 112 * complete before executing the listeners. 113 * 114 * <p>Callers who have a future that subclasses 115 * {@link java.util.concurrent.FutureTask} may want to instead subclass 116 * {@link ListenableFutureTask}, which adds the {@link ListenableFuture} 117 * functionality to the standard {@code FutureTask} implementation. 118 */ 119 public static <T> ListenableFuture<T> makeListenable(Future<T> future) { 120 if (future instanceof ListenableFuture) { 121 return (ListenableFuture<T>) future; 122 } 123 return new ListenableFutureAdapter<T>(future); 124 } 125 126 /** 127 * Creates a {@link CheckedFuture} out of a normal {@link Future} and a 128 * {@link Function} that maps from {@link Exception} instances into the 129 * appropriate checked type. 130 * 131 * <p>The given mapping function will be applied to an 132 * {@link InterruptedException}, a {@link CancellationException}, or an 133 * {@link ExecutionException} with the actual cause of the exception. 134 * See {@link Future#get()} for details on the exceptions thrown. 135 */ 136 public static <T, E extends Exception> CheckedFuture<T, E> makeChecked( 137 Future<T> future, Function<Exception, E> mapper) { 138 return new MappingCheckedFuture<T, E>(makeListenable(future), mapper); 139 } 140 141 /** 142 * Creates a {@code ListenableFuture} which has its value set immediately upon 143 * construction. The getters just return the value. This {@code Future} can't 144 * be canceled or timed out and its {@code isDone()} method always returns 145 * {@code true}. It's useful for returning something that implements the 146 * {@code ListenableFuture} interface but already has the result. 147 */ 148 public static <T> ListenableFuture<T> immediateFuture(@Nullable T value) { 149 ValueFuture<T> future = ValueFuture.create(); 150 future.set(value); 151 return future; 152 } 153 154 /** 155 * Creates a {@code CheckedFuture} which has its value set immediately upon 156 * construction. The getters just return the value. This {@code Future} can't 157 * be canceled or timed out and its {@code isDone()} method always returns 158 * {@code true}. It's useful for returning something that implements the 159 * {@code CheckedFuture} interface but already has the result. 160 */ 161 public static <T, E extends Exception> CheckedFuture<T, E> 162 immediateCheckedFuture(@Nullable T value) { 163 ValueFuture<T> future = ValueFuture.create(); 164 future.set(value); 165 return Futures.makeChecked(future, new Function<Exception, E>() { 166 public E apply(Exception e) { 167 throw new AssertionError("impossible"); 168 } 169 }); 170 } 171 172 /** 173 * Creates a {@code ListenableFuture} which has an exception set immediately 174 * upon construction. The getters just return the value. This {@code Future} 175 * can't be canceled or timed out and its {@code isDone()} method always 176 * returns {@code true}. It's useful for returning something that implements 177 * the {@code ListenableFuture} interface but already has a failed 178 * result. Calling {@code get()} will throw the provided {@code Throwable} 179 * (wrapped in an {@code ExecutionException}). 180 * 181 * @throws Error if the throwable was an {@link Error}. 182 */ 183 public static <T> ListenableFuture<T> immediateFailedFuture( 184 Throwable throwable) { 185 checkNotNull(throwable); 186 ValueFuture<T> future = ValueFuture.create(); 187 future.setException(throwable); 188 return future; 189 } 190 191 /** 192 * Creates a {@code CheckedFuture} which has an exception set immediately 193 * upon construction. The getters just return the value. This {@code Future} 194 * can't be canceled or timed out and its {@code isDone()} method always 195 * returns {@code true}. It's useful for returning something that implements 196 * the {@code CheckedFuture} interface but already has a failed result. 197 * Calling {@code get()} will throw the provided {@code Throwable} (wrapped in 198 * an {@code ExecutionException}) and calling {@code checkedGet()} will throw 199 * the provided exception itself. 200 * 201 * @throws Error if the throwable was an {@link Error}. 202 */ 203 public static <T, E extends Exception> CheckedFuture<T, E> 204 immediateFailedCheckedFuture(final E exception) { 205 checkNotNull(exception); 206 return makeChecked(Futures.<T>immediateFailedFuture(exception), 207 new Function<Exception, E>() { 208 public E apply(Exception e) { 209 return exception; 210 } 211 }); 212 } 213 214 /** 215 * Creates a new {@code ListenableFuture} that wraps another 216 * {@code ListenableFuture}. The result of the new future is the result of 217 * the provided function called on the result of the provided future. 218 * The resulting future doesn't interrupt when aborted. 219 * 220 * <p>TODO: Add a version that accepts a normal {@code Future} 221 * 222 * <p>The typical use for this method would be when a RPC call is dependent on 223 * the results of another RPC. One would call the first RPC (input), create a 224 * function that calls another RPC based on input's result, and then call 225 * chain on input and that function to get a {@code ListenableFuture} of 226 * the result. 227 * 228 * @param input The future to chain 229 * @param function A function to chain the results of the provided future 230 * to the results of the returned future. This will be run in the thread 231 * that notifies input it is complete. 232 * @return A future that holds result of the chain. 233 */ 234 public static <I, O> ListenableFuture<O> chain(ListenableFuture<I> input, 235 Function<? super I, ? extends ListenableFuture<? extends O>> function) { 236 return chain(input, function, Executors.sameThreadExecutor()); 237 } 238 239 /** 240 * Creates a new {@code ListenableFuture} that wraps another 241 * {@code ListenableFuture}. The result of the new future is the result of 242 * the provided function called on the result of the provided future. 243 * The resulting future doesn't interrupt when aborted. 244 * 245 * <p>This version allows an arbitrary executor to be passed in for running 246 * the chained Function. When using {@link Executors#sameThreadExecutor}, the 247 * thread chained Function executes in will be whichever thread set the 248 * result of the input Future, which may be the network thread in the case of 249 * RPC-based Futures. 250 * 251 * @param input The future to chain 252 * @param function A function to chain the results of the provided future 253 * to the results of the returned future. 254 * @param exec Executor to run the function in. 255 * @return A future that holds result of the chain. 256 */ 257 public static <I, O> ListenableFuture<O> chain(ListenableFuture<I> input, 258 Function<? super I, ? extends ListenableFuture<? extends O>> function, 259 Executor exec) { 260 ChainingListenableFuture<I, O> chain = 261 new ChainingListenableFuture<I, O>(function, input); 262 input.addListener(chain, exec); 263 return chain; 264 } 265 266 /** 267 * Creates a new {@code ListenableFuture} that wraps another 268 * {@code ListenableFuture}. The result of the new future is the result of 269 * the provided function called on the result of the provided future. 270 * The resulting future doesn't interrupt when aborted. 271 * 272 * <p>An example use of this method is to convert a serializable object 273 * returned from an RPC into a POJO. 274 * 275 * @param future The future to compose 276 * @param function A Function to compose the results of the provided future 277 * to the results of the returned future. This will be run in the thread 278 * that notifies input it is complete. 279 * @return A future that holds result of the composition. 280 */ 281 public static <I, O> ListenableFuture<O> compose(ListenableFuture<I> future, 282 final Function<? super I, ? extends O> function) { 283 return compose(future, function, Executors.sameThreadExecutor()); 284 } 285 286 /** 287 * Creates a new {@code ListenableFuture} that wraps another 288 * {@code ListenableFuture}. The result of the new future is the result of 289 * the provided function called on the result of the provided future. 290 * The resulting future doesn't interrupt when aborted. 291 * 292 * <p>An example use of this method is to convert a serializable object 293 * returned from an RPC into a POJO. 294 * 295 * <p>This version allows an arbitrary executor to be passed in for running 296 * the chained Function. When using {@link Executors#sameThreadExecutor}, the 297 * thread chained Function executes in will be whichever thread set the result 298 * of the input Future, which may be the network thread in the case of 299 * RPC-based Futures. 300 * 301 * @param future The future to compose 302 * @param function A Function to compose the results of the provided future 303 * to the results of the returned future. 304 * @param exec Executor to run the function in. 305 * @return A future that holds result of the composition. 306 * @since 2010.01.04 <b>tentative</b> 307 */ 308 public static <I, O> ListenableFuture<O> compose(ListenableFuture<I> future, 309 final Function<? super I, ? extends O> function, Executor exec) { 310 Function<I, ListenableFuture<O>> wrapperFunction 311 = new Function<I, ListenableFuture<O>>() { 312 /*@Override*/ public ListenableFuture<O> apply(I input) { 313 O output = function.apply(input); 314 return immediateFuture(output); 315 } 316 }; 317 return chain(future, wrapperFunction, exec); 318 } 319 320 /** 321 * Creates a new {@code Future} that wraps another {@code Future}. 322 * The result of the new future is the result of the provided function called 323 * on the result of the provided future. 324 * 325 * <p>An example use of this method is to convert a Future that produces a 326 * handle to an object to a future that produces the object itself. 327 * 328 * <p>Each call to {@code Future<O>.get(*)} results in a call to 329 * {@code Future<I>.get(*)}, but {@code function} is only applied once, so it 330 * is assumed that {@code Future<I>.get(*)} is idempotent. 331 * 332 * <p>When calling {@link Future#get(long, TimeUnit)} on the returned 333 * future, the timeout only applies to the future passed in to this method. 334 * Any additional time taken by applying {@code function} is not considered. 335 * 336 * @param future The future to compose 337 * @param function A Function to compose the results of the provided future 338 * to the results of the returned future. This will be run in the thread 339 * that calls one of the varieties of {@code get()}. 340 * @return A future that computes result of the composition. 341 */ 342 public static <I, O> Future<O> compose(final Future<I> future, 343 final Function<? super I, ? extends O> function) { 344 345 return new Future<O>() { 346 347 /* 348 * Concurrency detail: 349 * 350 * <p>To preserve the idempotency of calls to this.get(*) calls to the 351 * function are only applied once. A lock is required to prevent multiple 352 * applications of the function. The calls to future.get(*) are performed 353 * outside the lock, as is required to prevent calls to 354 * get(long, TimeUnit) to persist beyond their timeout. 355 * 356 * <p>Calls to future.get(*) on every call to this.get(*) also provide 357 * the cancellation behavior for this. 358 * 359 * <p>(Consider: in thread A, call get(), in thread B call get(long, 360 * TimeUnit). Thread B may have to wait for Thread A to finish, which 361 * would be unacceptable.) 362 * 363 * <p>Note that each call to Future<O>.get(*) results in a call to 364 * Future<I>.get(*), but the function is only applied once, so 365 * Future<I>.get(*) is assumed to be idempotent. 366 */ 367 368 private final Object lock = new Object(); 369 private boolean set = false; 370 private O value = null; 371 372 /*@Override*/ 373 public O get() throws InterruptedException, ExecutionException { 374 return apply(future.get()); 375 } 376 377 /*@Override*/ 378 public O get(long timeout, TimeUnit unit) throws InterruptedException, 379 ExecutionException, TimeoutException { 380 return apply(future.get(timeout, unit)); 381 } 382 383 private O apply(I raw) { 384 synchronized(lock) { 385 if (!set) { 386 value = function.apply(raw); 387 set = true; 388 } 389 return value; 390 } 391 } 392 393 /*@Override*/ 394 public boolean cancel(boolean mayInterruptIfRunning) { 395 return future.cancel(mayInterruptIfRunning); 396 } 397 398 /*@Override*/ 399 public boolean isCancelled() { 400 return future.isCancelled(); 401 } 402 403 /*@Override*/ 404 public boolean isDone() { 405 return future.isDone(); 406 } 407 }; 408 } 409 410 /** 411 * An implementation of {@code ListenableFuture} that also implements 412 * {@code Runnable} so that it can be used to nest ListenableFutures. 413 * Once the passed-in {@code ListenableFuture} is complete, it calls the 414 * passed-in {@code Function} to generate the result. 415 * The resulting future doesn't interrupt when aborted. 416 * 417 * <p>If the function throws any checked exceptions, they should be wrapped 418 * in a {@code UndeclaredThrowableException} so that this class can get 419 * access to the cause. 420 */ 421 private static class ChainingListenableFuture<I, O> 422 extends AbstractListenableFuture<O> implements Runnable { 423 424 private final Function<? super I, ? extends ListenableFuture<? extends O>> 425 function; 426 private final UninterruptibleFuture<? extends I> inputFuture; 427 428 private ChainingListenableFuture( 429 Function<? super I, ? extends ListenableFuture<? extends O>> function, 430 ListenableFuture<? extends I> inputFuture) { 431 this.function = function; 432 this.inputFuture = makeUninterruptible(inputFuture); 433 } 434 435 public void run() { 436 try { 437 I sourceResult; 438 try { 439 sourceResult = inputFuture.get(); 440 } catch (CancellationException e) { 441 // Cancel this future and return. 442 cancel(); 443 return; 444 } catch (ExecutionException e) { 445 // Set the cause of the exception as this future's exception 446 setException(e.getCause()); 447 return; 448 } 449 450 final ListenableFuture<? extends O> outputFuture = 451 function.apply(sourceResult); 452 outputFuture.addListener(new Runnable() { 453 public void run() { 454 try { 455 // Here it would have been nice to have had an 456 // UninterruptibleListenableFuture, but we don't want to start a 457 // combinatorial explosion of interfaces, so we have to make do. 458 set(makeUninterruptible(outputFuture).get()); 459 } catch (ExecutionException e) { 460 // Set the cause of the exception as this future's exception 461 setException(e.getCause()); 462 } 463 } 464 }, Executors.sameThreadExecutor()); 465 } catch (UndeclaredThrowableException e) { 466 // Set the cause of the exception as this future's exception 467 setException(e.getCause()); 468 } catch (RuntimeException e) { 469 // This exception is irrelevant in this thread, but useful for the 470 // client 471 setException(e); 472 } catch (Error e) { 473 // This seems evil, but the client needs to know an error occured and 474 // the error needs to be propagated ASAP. 475 setException(e); 476 throw e; 477 } 478 } 479 } 480 481 /** 482 * A checked future that uses a function to map from exceptions to the 483 * appropriate checked type. 484 */ 485 private static class MappingCheckedFuture<T, E extends Exception> extends 486 AbstractCheckedFuture<T, E> { 487 488 final Function<Exception, E> mapper; 489 490 MappingCheckedFuture(ListenableFuture<T> delegate, 491 Function<Exception, E> mapper) { 492 super(delegate); 493 494 this.mapper = mapper; 495 } 496 497 @Override 498 protected E mapException(Exception e) { 499 return mapper.apply(e); 500 } 501 } 502 503 /** 504 * An adapter to turn a {@link Future} into a {@link ListenableFuture}. This 505 * will wait on the future to finish, and when it completes, run the 506 * listeners. This implementation will wait on the source future 507 * indefinitely, so if the source future never completes, the adapter will 508 * never complete either. 509 * 510 * <p>If the delegate future is interrupted or throws an unexpected unchecked 511 * exception, the listeners will not be invoked. 512 */ 513 private static class ListenableFutureAdapter<T> extends ForwardingFuture<T> 514 implements ListenableFuture<T> { 515 516 private static final Executor adapterExecutor = 517 java.util.concurrent.Executors.newCachedThreadPool(); 518 519 // The execution list to hold our listeners. 520 private final ExecutionList executionList = new ExecutionList(); 521 522 // This allows us to only start up a thread waiting on the delegate future 523 // when the first listener is added. 524 private final AtomicBoolean hasListeners = new AtomicBoolean(false); 525 526 // The delegate future. 527 private final Future<T> delegate; 528 529 ListenableFutureAdapter(final Future<T> delegate) { 530 this.delegate = delegate; 531 } 532 533 @Override 534 protected Future<T> delegate() { 535 return delegate; 536 } 537 538 /*@Override*/ 539 public void addListener(Runnable listener, Executor exec) { 540 541 // When a listener is first added, we run a task that will wait for 542 // the delegate to finish, and when it is done will run the listeners. 543 if (!hasListeners.get() && hasListeners.compareAndSet(false, true)) { 544 adapterExecutor.execute(new Runnable() { 545 /*@Override*/ 546 public void run() { 547 try { 548 delegate.get(); 549 } catch (CancellationException e) { 550 // The task was cancelled, so it is done, run the listeners. 551 } catch (InterruptedException e) { 552 // This thread was interrupted. This should never happen, so we 553 // throw an IllegalStateException. 554 throw new IllegalStateException("Adapter thread interrupted!", e); 555 } catch (ExecutionException e) { 556 // The task caused an exception, so it is done, run the listeners. 557 } 558 executionList.run(); 559 } 560 }); 561 } 562 executionList.add(listener, exec); 563 } 564 } 565 } 566