Home | History | Annotate | Download | only in concurrent
      1 /*
      2  * Copyright (C) 2007 The Guava Authors
      3  *
      4  * Licensed under the Apache License, Version 2.0 (the "License");
      5  * you may not use this file except in compliance with the License.
      6  * You may obtain a copy of the License at
      7  *
      8  * http://www.apache.org/licenses/LICENSE-2.0
      9  *
     10  * Unless required by applicable law or agreed to in writing, software
     11  * distributed under the License is distributed on an "AS IS" BASIS,
     12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     13  * See the License for the specific language governing permissions and
     14  * limitations under the License.
     15  */
     16 
     17 package com.google.common.util.concurrent;
     18 
     19 import static com.google.common.base.Preconditions.checkArgument;
     20 import static com.google.common.base.Preconditions.checkNotNull;
     21 
     22 import com.google.common.annotations.Beta;
     23 import com.google.common.annotations.VisibleForTesting;
     24 import com.google.common.base.Supplier;
     25 import com.google.common.base.Throwables;
     26 import com.google.common.collect.Lists;
     27 import com.google.common.collect.Queues;
     28 import com.google.common.util.concurrent.ForwardingListenableFuture.SimpleForwardingListenableFuture;
     29 
     30 import java.lang.reflect.InvocationTargetException;
     31 import java.util.Collection;
     32 import java.util.Collections;
     33 import java.util.Iterator;
     34 import java.util.List;
     35 import java.util.concurrent.BlockingQueue;
     36 import java.util.concurrent.Callable;
     37 import java.util.concurrent.Delayed;
     38 import java.util.concurrent.ExecutionException;
     39 import java.util.concurrent.Executor;
     40 import java.util.concurrent.ExecutorService;
     41 import java.util.concurrent.Executors;
     42 import java.util.concurrent.Future;
     43 import java.util.concurrent.RejectedExecutionException;
     44 import java.util.concurrent.ScheduledExecutorService;
     45 import java.util.concurrent.ScheduledFuture;
     46 import java.util.concurrent.ScheduledThreadPoolExecutor;
     47 import java.util.concurrent.ThreadFactory;
     48 import java.util.concurrent.ThreadPoolExecutor;
     49 import java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy;
     50 import java.util.concurrent.TimeUnit;
     51 import java.util.concurrent.TimeoutException;
     52 import java.util.concurrent.locks.Condition;
     53 import java.util.concurrent.locks.Lock;
     54 import java.util.concurrent.locks.ReentrantLock;
     55 
     56 /**
     57  * Factory and utility methods for {@link java.util.concurrent.Executor}, {@link
     58  * ExecutorService}, and {@link ThreadFactory}.
     59  *
     60  * @author Eric Fellheimer
     61  * @author Kyle Littlefield
     62  * @author Justin Mahoney
     63  * @since 3.0
     64  */
     65 public final class MoreExecutors {
     66   private MoreExecutors() {}
     67 
     68   /**
     69    * Converts the given ThreadPoolExecutor into an ExecutorService that exits
     70    * when the application is complete.  It does so by using daemon threads and
     71    * adding a shutdown hook to wait for their completion.
     72    *
     73    * <p>This is mainly for fixed thread pools.
     74    * See {@link Executors#newFixedThreadPool(int)}.
     75    *
     76    * @param executor the executor to modify to make sure it exits when the
     77    *        application is finished
     78    * @param terminationTimeout how long to wait for the executor to
     79    *        finish before terminating the JVM
     80    * @param timeUnit unit of time for the time parameter
     81    * @return an unmodifiable version of the input which will not hang the JVM
     82    */
     83   @Beta
     84   public static ExecutorService getExitingExecutorService(
     85       ThreadPoolExecutor executor, long terminationTimeout, TimeUnit timeUnit) {
     86     return new Application()
     87         .getExitingExecutorService(executor, terminationTimeout, timeUnit);
     88   }
     89 
     90   /**
     91    * Converts the given ScheduledThreadPoolExecutor into a
     92    * ScheduledExecutorService that exits when the application is complete.  It
     93    * does so by using daemon threads and adding a shutdown hook to wait for
     94    * their completion.
     95    *
     96    * <p>This is mainly for fixed thread pools.
     97    * See {@link Executors#newScheduledThreadPool(int)}.
     98    *
     99    * @param executor the executor to modify to make sure it exits when the
    100    *        application is finished
    101    * @param terminationTimeout how long to wait for the executor to
    102    *        finish before terminating the JVM
    103    * @param timeUnit unit of time for the time parameter
    104    * @return an unmodifiable version of the input which will not hang the JVM
    105    */
    106   @Beta
    107   public static ScheduledExecutorService getExitingScheduledExecutorService(
    108       ScheduledThreadPoolExecutor executor, long terminationTimeout, TimeUnit timeUnit) {
    109     return new Application()
    110         .getExitingScheduledExecutorService(executor, terminationTimeout, timeUnit);
    111   }
    112 
    113   /**
    114    * Add a shutdown hook to wait for thread completion in the given
    115    * {@link ExecutorService service}.  This is useful if the given service uses
    116    * daemon threads, and we want to keep the JVM from exiting immediately on
    117    * shutdown, instead giving these daemon threads a chance to terminate
    118    * normally.
    119    * @param service ExecutorService which uses daemon threads
    120    * @param terminationTimeout how long to wait for the executor to finish
    121    *        before terminating the JVM
    122    * @param timeUnit unit of time for the time parameter
    123    */
    124   @Beta
    125   public static void addDelayedShutdownHook(
    126       ExecutorService service, long terminationTimeout, TimeUnit timeUnit) {
    127     new Application()
    128         .addDelayedShutdownHook(service, terminationTimeout, timeUnit);
    129   }
    130 
    131   /**
    132    * Converts the given ThreadPoolExecutor into an ExecutorService that exits
    133    * when the application is complete.  It does so by using daemon threads and
    134    * adding a shutdown hook to wait for their completion.
    135    *
    136    * <p>This method waits 120 seconds before continuing with JVM termination,
    137    * even if the executor has not finished its work.
    138    *
    139    * <p>This is mainly for fixed thread pools.
    140    * See {@link Executors#newFixedThreadPool(int)}.
    141    *
    142    * @param executor the executor to modify to make sure it exits when the
    143    *        application is finished
    144    * @return an unmodifiable version of the input which will not hang the JVM
    145    */
    146   @Beta
    147   public static ExecutorService getExitingExecutorService(ThreadPoolExecutor executor) {
    148     return new Application().getExitingExecutorService(executor);
    149   }
    150 
    151   /**
    152    * Converts the given ThreadPoolExecutor into a ScheduledExecutorService that
    153    * exits when the application is complete.  It does so by using daemon threads
    154    * and adding a shutdown hook to wait for their completion.
    155    *
    156    * <p>This method waits 120 seconds before continuing with JVM termination,
    157    * even if the executor has not finished its work.
    158    *
    159    * <p>This is mainly for fixed thread pools.
    160    * See {@link Executors#newScheduledThreadPool(int)}.
    161    *
    162    * @param executor the executor to modify to make sure it exits when the
    163    *        application is finished
    164    * @return an unmodifiable version of the input which will not hang the JVM
    165    */
    166   @Beta
    167   public static ScheduledExecutorService getExitingScheduledExecutorService(
    168       ScheduledThreadPoolExecutor executor) {
    169     return new Application().getExitingScheduledExecutorService(executor);
    170   }
    171 
    172   /** Represents the current application to register shutdown hooks. */
    173   @VisibleForTesting static class Application {
    174 
    175     final ExecutorService getExitingExecutorService(
    176         ThreadPoolExecutor executor, long terminationTimeout, TimeUnit timeUnit) {
    177       useDaemonThreadFactory(executor);
    178       ExecutorService service = Executors.unconfigurableExecutorService(executor);
    179       addDelayedShutdownHook(service, terminationTimeout, timeUnit);
    180       return service;
    181     }
    182 
    183     final ScheduledExecutorService getExitingScheduledExecutorService(
    184         ScheduledThreadPoolExecutor executor, long terminationTimeout, TimeUnit timeUnit) {
    185       useDaemonThreadFactory(executor);
    186       ScheduledExecutorService service = Executors.unconfigurableScheduledExecutorService(executor);
    187       addDelayedShutdownHook(service, terminationTimeout, timeUnit);
    188       return service;
    189     }
    190 
    191     final void addDelayedShutdownHook(
    192         final ExecutorService service, final long terminationTimeout, final TimeUnit timeUnit) {
    193       checkNotNull(service);
    194       checkNotNull(timeUnit);
    195       addShutdownHook(MoreExecutors.newThread("DelayedShutdownHook-for-" + service, new Runnable() {
    196         @Override
    197         public void run() {
    198           try {
    199             // We'd like to log progress and failures that may arise in the
    200             // following code, but unfortunately the behavior of logging
    201             // is undefined in shutdown hooks.
    202             // This is because the logging code installs a shutdown hook of its
    203             // own. See Cleaner class inside {@link LogManager}.
    204             service.shutdown();
    205             service.awaitTermination(terminationTimeout, timeUnit);
    206           } catch (InterruptedException ignored) {
    207             // We're shutting down anyway, so just ignore.
    208           }
    209         }
    210       }));
    211     }
    212 
    213     final ExecutorService getExitingExecutorService(ThreadPoolExecutor executor) {
    214       return getExitingExecutorService(executor, 120, TimeUnit.SECONDS);
    215     }
    216 
    217     final ScheduledExecutorService getExitingScheduledExecutorService(
    218         ScheduledThreadPoolExecutor executor) {
    219       return getExitingScheduledExecutorService(executor, 120, TimeUnit.SECONDS);
    220     }
    221 
    222     @VisibleForTesting void addShutdownHook(Thread hook) {
    223       Runtime.getRuntime().addShutdownHook(hook);
    224     }
    225   }
    226 
    227   private static void useDaemonThreadFactory(ThreadPoolExecutor executor) {
    228     executor.setThreadFactory(new ThreadFactoryBuilder()
    229         .setDaemon(true)
    230         .setThreadFactory(executor.getThreadFactory())
    231         .build());
    232   }
    233 
    234   /**
    235    * Creates an executor service that runs each task in the thread
    236    * that invokes {@code execute/submit}, as in {@link CallerRunsPolicy}  This
    237    * applies both to individually submitted tasks and to collections of tasks
    238    * submitted via {@code invokeAll} or {@code invokeAny}.  In the latter case,
    239    * tasks will run serially on the calling thread.  Tasks are run to
    240    * completion before a {@code Future} is returned to the caller (unless the
    241    * executor has been shutdown).
    242    *
    243    * <p>Although all tasks are immediately executed in the thread that
    244    * submitted the task, this {@code ExecutorService} imposes a small
    245    * locking overhead on each task submission in order to implement shutdown
    246    * and termination behavior.
    247    *
    248    * <p>The implementation deviates from the {@code ExecutorService}
    249    * specification with regards to the {@code shutdownNow} method.  First,
    250    * "best-effort" with regards to canceling running tasks is implemented
    251    * as "no-effort".  No interrupts or other attempts are made to stop
    252    * threads executing tasks.  Second, the returned list will always be empty,
    253    * as any submitted task is considered to have started execution.
    254    * This applies also to tasks given to {@code invokeAll} or {@code invokeAny}
    255    * which are pending serial execution, even the subset of the tasks that
    256    * have not yet started execution.  It is unclear from the
    257    * {@code ExecutorService} specification if these should be included, and
    258    * it's much easier to implement the interpretation that they not be.
    259    * Finally, a call to {@code shutdown} or {@code shutdownNow} may result
    260    * in concurrent calls to {@code invokeAll/invokeAny} throwing
    261    * RejectedExecutionException, although a subset of the tasks may already
    262    * have been executed.
    263    *
    264    * @since 10.0 (<a href="http://code.google.com/p/guava-libraries/wiki/Compatibility"
    265    *        >mostly source-compatible</a> since 3.0)
    266    * @deprecated Use {@link #directExecutor()} if you only require an {@link Executor} and
    267    *     {@link #newDirectExecutorService()} if you need a {@link ListeningExecutorService}.
    268    */
    269   @Deprecated public static ListeningExecutorService sameThreadExecutor() {
    270     return new DirectExecutorService();
    271   }
    272 
    273   // See sameThreadExecutor javadoc for behavioral notes.
    274   private static class DirectExecutorService
    275       extends AbstractListeningExecutorService {
    276     /**
    277      * Lock used whenever accessing the state variables
    278      * (runningTasks, shutdown, terminationCondition) of the executor
    279      */
    280     private final Lock lock = new ReentrantLock();
    281 
    282     /** Signaled after the executor is shutdown and running tasks are done */
    283     private final Condition termination = lock.newCondition();
    284 
    285     /*
    286      * Conceptually, these two variables describe the executor being in
    287      * one of three states:
    288      *   - Active: shutdown == false
    289      *   - Shutdown: runningTasks > 0 and shutdown == true
    290      *   - Terminated: runningTasks == 0 and shutdown == true
    291      */
    292     private int runningTasks = 0;
    293     private boolean shutdown = false;
    294 
    295     @Override
    296     public void execute(Runnable command) {
    297       startTask();
    298       try {
    299         command.run();
    300       } finally {
    301         endTask();
    302       }
    303     }
    304 
    305     @Override
    306     public boolean isShutdown() {
    307       lock.lock();
    308       try {
    309         return shutdown;
    310       } finally {
    311         lock.unlock();
    312       }
    313     }
    314 
    315     @Override
    316     public void shutdown() {
    317       lock.lock();
    318       try {
    319         shutdown = true;
    320       } finally {
    321         lock.unlock();
    322       }
    323     }
    324 
    325     // See sameThreadExecutor javadoc for unusual behavior of this method.
    326     @Override
    327     public List<Runnable> shutdownNow() {
    328       shutdown();
    329       return Collections.emptyList();
    330     }
    331 
    332     @Override
    333     public boolean isTerminated() {
    334       lock.lock();
    335       try {
    336         return shutdown && runningTasks == 0;
    337       } finally {
    338         lock.unlock();
    339       }
    340     }
    341 
    342     @Override
    343     public boolean awaitTermination(long timeout, TimeUnit unit)
    344         throws InterruptedException {
    345       long nanos = unit.toNanos(timeout);
    346       lock.lock();
    347       try {
    348         for (;;) {
    349           if (isTerminated()) {
    350             return true;
    351           } else if (nanos <= 0) {
    352             return false;
    353           } else {
    354             nanos = termination.awaitNanos(nanos);
    355           }
    356         }
    357       } finally {
    358         lock.unlock();
    359       }
    360     }
    361 
    362     /**
    363      * Checks if the executor has been shut down and increments the running
    364      * task count.
    365      *
    366      * @throws RejectedExecutionException if the executor has been previously
    367      *         shutdown
    368      */
    369     private void startTask() {
    370       lock.lock();
    371       try {
    372         if (isShutdown()) {
    373           throw new RejectedExecutionException("Executor already shutdown");
    374         }
    375         runningTasks++;
    376       } finally {
    377         lock.unlock();
    378       }
    379     }
    380 
    381     /**
    382      * Decrements the running task count.
    383      */
    384     private void endTask() {
    385       lock.lock();
    386       try {
    387         runningTasks--;
    388         if (isTerminated()) {
    389           termination.signalAll();
    390         }
    391       } finally {
    392         lock.unlock();
    393       }
    394     }
    395   }
    396 
    397   /**
    398    * Creates an executor service that runs each task in the thread
    399    * that invokes {@code execute/submit}, as in {@link CallerRunsPolicy}  This
    400    * applies both to individually submitted tasks and to collections of tasks
    401    * submitted via {@code invokeAll} or {@code invokeAny}.  In the latter case,
    402    * tasks will run serially on the calling thread.  Tasks are run to
    403    * completion before a {@code Future} is returned to the caller (unless the
    404    * executor has been shutdown).
    405    *
    406    * <p>Although all tasks are immediately executed in the thread that
    407    * submitted the task, this {@code ExecutorService} imposes a small
    408    * locking overhead on each task submission in order to implement shutdown
    409    * and termination behavior.
    410    *
    411    * <p>The implementation deviates from the {@code ExecutorService}
    412    * specification with regards to the {@code shutdownNow} method.  First,
    413    * "best-effort" with regards to canceling running tasks is implemented
    414    * as "no-effort".  No interrupts or other attempts are made to stop
    415    * threads executing tasks.  Second, the returned list will always be empty,
    416    * as any submitted task is considered to have started execution.
    417    * This applies also to tasks given to {@code invokeAll} or {@code invokeAny}
    418    * which are pending serial execution, even the subset of the tasks that
    419    * have not yet started execution.  It is unclear from the
    420    * {@code ExecutorService} specification if these should be included, and
    421    * it's much easier to implement the interpretation that they not be.
    422    * Finally, a call to {@code shutdown} or {@code shutdownNow} may result
    423    * in concurrent calls to {@code invokeAll/invokeAny} throwing
    424    * RejectedExecutionException, although a subset of the tasks may already
    425    * have been executed.
    426    *
    427    * @since 18.0 (present as MoreExecutors.sameThreadExecutor() since 10.0)
    428    */
    429   public static ListeningExecutorService newDirectExecutorService() {
    430     return new DirectExecutorService();
    431   }
    432 
    433   /**
    434    * Returns an {@link Executor} that runs each task in the thread that invokes
    435    * {@link Executor#execute execute}, as in {@link CallerRunsPolicy}.
    436    *
    437    * <p>This instance is equivalent to: <pre>   {@code
    438    *   final class DirectExecutor implements Executor {
    439    *     public void execute(Runnable r) {
    440    *       r.run();
    441    *     }
    442    *   }}</pre>
    443    *
    444    * <p>This should be preferred to {@link #newDirectExecutorService()} because the implementing the
    445    * {@link ExecutorService} subinterface necessitates significant performance overhead.
    446    *
    447    * @since 18.0
    448    */
    449   public static Executor directExecutor() {
    450     return DirectExecutor.INSTANCE;
    451   }
    452 
    453   /** See {@link #directExecutor} for behavioral notes. */
    454   private enum DirectExecutor implements Executor {
    455     INSTANCE;
    456     @Override public void execute(Runnable command) {
    457       command.run();
    458     }
    459   }
    460 
    461   /**
    462    * Creates an {@link ExecutorService} whose {@code submit} and {@code
    463    * invokeAll} methods submit {@link ListenableFutureTask} instances to the
    464    * given delegate executor. Those methods, as well as {@code execute} and
    465    * {@code invokeAny}, are implemented in terms of calls to {@code
    466    * delegate.execute}. All other methods are forwarded unchanged to the
    467    * delegate. This implies that the returned {@code ListeningExecutorService}
    468    * never calls the delegate's {@code submit}, {@code invokeAll}, and {@code
    469    * invokeAny} methods, so any special handling of tasks must be implemented in
    470    * the delegate's {@code execute} method or by wrapping the returned {@code
    471    * ListeningExecutorService}.
    472    *
    473    * <p>If the delegate executor was already an instance of {@code
    474    * ListeningExecutorService}, it is returned untouched, and the rest of this
    475    * documentation does not apply.
    476    *
    477    * @since 10.0
    478    */
    479   public static ListeningExecutorService listeningDecorator(
    480       ExecutorService delegate) {
    481     return (delegate instanceof ListeningExecutorService)
    482         ? (ListeningExecutorService) delegate
    483         : (delegate instanceof ScheduledExecutorService)
    484         ? new ScheduledListeningDecorator((ScheduledExecutorService) delegate)
    485         : new ListeningDecorator(delegate);
    486   }
    487 
    488   /**
    489    * Creates a {@link ScheduledExecutorService} whose {@code submit} and {@code
    490    * invokeAll} methods submit {@link ListenableFutureTask} instances to the
    491    * given delegate executor. Those methods, as well as {@code execute} and
    492    * {@code invokeAny}, are implemented in terms of calls to {@code
    493    * delegate.execute}. All other methods are forwarded unchanged to the
    494    * delegate. This implies that the returned {@code
    495    * ListeningScheduledExecutorService} never calls the delegate's {@code
    496    * submit}, {@code invokeAll}, and {@code invokeAny} methods, so any special
    497    * handling of tasks must be implemented in the delegate's {@code execute}
    498    * method or by wrapping the returned {@code
    499    * ListeningScheduledExecutorService}.
    500    *
    501    * <p>If the delegate executor was already an instance of {@code
    502    * ListeningScheduledExecutorService}, it is returned untouched, and the rest
    503    * of this documentation does not apply.
    504    *
    505    * @since 10.0
    506    */
    507   public static ListeningScheduledExecutorService listeningDecorator(
    508       ScheduledExecutorService delegate) {
    509     return (delegate instanceof ListeningScheduledExecutorService)
    510         ? (ListeningScheduledExecutorService) delegate
    511         : new ScheduledListeningDecorator(delegate);
    512   }
    513 
    514   private static class ListeningDecorator
    515       extends AbstractListeningExecutorService {
    516     private final ExecutorService delegate;
    517 
    518     ListeningDecorator(ExecutorService delegate) {
    519       this.delegate = checkNotNull(delegate);
    520     }
    521 
    522     @Override
    523     public boolean awaitTermination(long timeout, TimeUnit unit)
    524         throws InterruptedException {
    525       return delegate.awaitTermination(timeout, unit);
    526     }
    527 
    528     @Override
    529     public boolean isShutdown() {
    530       return delegate.isShutdown();
    531     }
    532 
    533     @Override
    534     public boolean isTerminated() {
    535       return delegate.isTerminated();
    536     }
    537 
    538     @Override
    539     public void shutdown() {
    540       delegate.shutdown();
    541     }
    542 
    543     @Override
    544     public List<Runnable> shutdownNow() {
    545       return delegate.shutdownNow();
    546     }
    547 
    548     @Override
    549     public void execute(Runnable command) {
    550       delegate.execute(command);
    551     }
    552   }
    553 
    554   private static class ScheduledListeningDecorator
    555       extends ListeningDecorator implements ListeningScheduledExecutorService {
    556     @SuppressWarnings("hiding")
    557     final ScheduledExecutorService delegate;
    558 
    559     ScheduledListeningDecorator(ScheduledExecutorService delegate) {
    560       super(delegate);
    561       this.delegate = checkNotNull(delegate);
    562     }
    563 
    564     @Override
    565     public ListenableScheduledFuture<?> schedule(
    566         Runnable command, long delay, TimeUnit unit) {
    567       ListenableFutureTask<Void> task =
    568           ListenableFutureTask.create(command, null);
    569       ScheduledFuture<?> scheduled = delegate.schedule(task, delay, unit);
    570       return new ListenableScheduledTask<Void>(task, scheduled);
    571     }
    572 
    573     @Override
    574     public <V> ListenableScheduledFuture<V> schedule(
    575         Callable<V> callable, long delay, TimeUnit unit) {
    576       ListenableFutureTask<V> task = ListenableFutureTask.create(callable);
    577       ScheduledFuture<?> scheduled = delegate.schedule(task, delay, unit);
    578       return new ListenableScheduledTask<V>(task, scheduled);
    579     }
    580 
    581     @Override
    582     public ListenableScheduledFuture<?> scheduleAtFixedRate(
    583         Runnable command, long initialDelay, long period, TimeUnit unit) {
    584       NeverSuccessfulListenableFutureTask task =
    585           new NeverSuccessfulListenableFutureTask(command);
    586       ScheduledFuture<?> scheduled =
    587           delegate.scheduleAtFixedRate(task, initialDelay, period, unit);
    588       return new ListenableScheduledTask<Void>(task, scheduled);
    589     }
    590 
    591     @Override
    592     public ListenableScheduledFuture<?> scheduleWithFixedDelay(
    593         Runnable command, long initialDelay, long delay, TimeUnit unit) {
    594       NeverSuccessfulListenableFutureTask task =
    595           new NeverSuccessfulListenableFutureTask(command);
    596       ScheduledFuture<?> scheduled =
    597           delegate.scheduleWithFixedDelay(task, initialDelay, delay, unit);
    598       return new ListenableScheduledTask<Void>(task, scheduled);
    599     }
    600 
    601     private static final class ListenableScheduledTask<V>
    602         extends SimpleForwardingListenableFuture<V>
    603         implements ListenableScheduledFuture<V> {
    604 
    605       private final ScheduledFuture<?> scheduledDelegate;
    606 
    607       public ListenableScheduledTask(
    608           ListenableFuture<V> listenableDelegate,
    609           ScheduledFuture<?> scheduledDelegate) {
    610         super(listenableDelegate);
    611         this.scheduledDelegate = scheduledDelegate;
    612       }
    613 
    614       @Override
    615       public boolean cancel(boolean mayInterruptIfRunning) {
    616         boolean cancelled = super.cancel(mayInterruptIfRunning);
    617         if (cancelled) {
    618           // Unless it is cancelled, the delegate may continue being scheduled
    619           scheduledDelegate.cancel(mayInterruptIfRunning);
    620 
    621           // TODO(user): Cancel "this" if "scheduledDelegate" is cancelled.
    622         }
    623         return cancelled;
    624       }
    625 
    626       @Override
    627       public long getDelay(TimeUnit unit) {
    628         return scheduledDelegate.getDelay(unit);
    629       }
    630 
    631       @Override
    632       public int compareTo(Delayed other) {
    633         return scheduledDelegate.compareTo(other);
    634       }
    635     }
    636 
    637     private static final class NeverSuccessfulListenableFutureTask
    638         extends AbstractFuture<Void>
    639         implements Runnable {
    640       private final Runnable delegate;
    641 
    642       public NeverSuccessfulListenableFutureTask(Runnable delegate) {
    643         this.delegate = checkNotNull(delegate);
    644       }
    645 
    646       @Override public void run() {
    647         try {
    648           delegate.run();
    649         } catch (Throwable t) {
    650           setException(t);
    651           throw Throwables.propagate(t);
    652         }
    653       }
    654     }
    655   }
    656 
    657   /*
    658    * The following method is a modified version of one found in
    659    * http://gee.cs.oswego.edu/cgi-bin/viewcvs.cgi/jsr166/src/test/tck/AbstractExecutorServiceTest.java?revision=1.30
    660    * which contained the following notice:
    661    *
    662    * Written by Doug Lea with assistance from members of JCP JSR-166
    663    * Expert Group and released to the public domain, as explained at
    664    * http://creativecommons.org/publicdomain/zero/1.0/
    665    * Other contributors include Andrew Wright, Jeffrey Hayes,
    666    * Pat Fisher, Mike Judd.
    667    */
    668 
    669   /**
    670    * An implementation of {@link ExecutorService#invokeAny} for {@link ListeningExecutorService}
    671    * implementations.
    672    */ static <T> T invokeAnyImpl(ListeningExecutorService executorService,
    673       Collection<? extends Callable<T>> tasks, boolean timed, long nanos)
    674           throws InterruptedException, ExecutionException, TimeoutException {
    675     checkNotNull(executorService);
    676     int ntasks = tasks.size();
    677     checkArgument(ntasks > 0);
    678     List<Future<T>> futures = Lists.newArrayListWithCapacity(ntasks);
    679     BlockingQueue<Future<T>> futureQueue = Queues.newLinkedBlockingQueue();
    680 
    681     // For efficiency, especially in executors with limited
    682     // parallelism, check to see if previously submitted tasks are
    683     // done before submitting more of them. This interleaving
    684     // plus the exception mechanics account for messiness of main
    685     // loop.
    686 
    687     try {
    688       // Record exceptions so that if we fail to obtain any
    689       // result, we can throw the last exception we got.
    690       ExecutionException ee = null;
    691       long lastTime = timed ? System.nanoTime() : 0;
    692       Iterator<? extends Callable<T>> it = tasks.iterator();
    693 
    694       futures.add(submitAndAddQueueListener(executorService, it.next(), futureQueue));
    695       --ntasks;
    696       int active = 1;
    697 
    698       for (;;) {
    699         Future<T> f = futureQueue.poll();
    700         if (f == null) {
    701           if (ntasks > 0) {
    702             --ntasks;
    703             futures.add(submitAndAddQueueListener(executorService, it.next(), futureQueue));
    704             ++active;
    705           } else if (active == 0) {
    706             break;
    707           } else if (timed) {
    708             f = futureQueue.poll(nanos, TimeUnit.NANOSECONDS);
    709             if (f == null) {
    710               throw new TimeoutException();
    711             }
    712             long now = System.nanoTime();
    713             nanos -= now - lastTime;
    714             lastTime = now;
    715           } else {
    716             f = futureQueue.take();
    717           }
    718         }
    719         if (f != null) {
    720           --active;
    721           try {
    722             return f.get();
    723           } catch (ExecutionException eex) {
    724             ee = eex;
    725           } catch (RuntimeException rex) {
    726             ee = new ExecutionException(rex);
    727           }
    728         }
    729       }
    730 
    731       if (ee == null) {
    732         ee = new ExecutionException(null);
    733       }
    734       throw ee;
    735     } finally {
    736       for (Future<T> f : futures) {
    737         f.cancel(true);
    738       }
    739     }
    740   }
    741 
    742   /**
    743    * Submits the task and adds a listener that adds the future to {@code queue} when it completes.
    744    */
    745   private static <T> ListenableFuture<T> submitAndAddQueueListener(
    746       ListeningExecutorService executorService, Callable<T> task,
    747       final BlockingQueue<Future<T>> queue) {
    748     final ListenableFuture<T> future = executorService.submit(task);
    749     future.addListener(new Runnable() {
    750       @Override public void run() {
    751         queue.add(future);
    752       }
    753     }, directExecutor());
    754     return future;
    755   }
    756 
    757   /**
    758    * Returns a default thread factory used to create new threads.
    759    *
    760    * <p>On AppEngine, returns {@code ThreadManager.currentRequestThreadFactory()}.
    761    * Otherwise, returns {@link Executors#defaultThreadFactory()}.
    762    *
    763    * @since 14.0
    764    */
    765   @Beta
    766   public static ThreadFactory platformThreadFactory() {
    767     if (!isAppEngine()) {
    768       return Executors.defaultThreadFactory();
    769     }
    770     try {
    771       return (ThreadFactory) Class.forName("com.google.appengine.api.ThreadManager")
    772           .getMethod("currentRequestThreadFactory")
    773           .invoke(null);
    774     } catch (IllegalAccessException e) {
    775       throw new RuntimeException("Couldn't invoke ThreadManager.currentRequestThreadFactory", e);
    776     } catch (ClassNotFoundException e) {
    777       throw new RuntimeException("Couldn't invoke ThreadManager.currentRequestThreadFactory", e);
    778     } catch (NoSuchMethodException e) {
    779       throw new RuntimeException("Couldn't invoke ThreadManager.currentRequestThreadFactory", e);
    780     } catch (InvocationTargetException e) {
    781       throw Throwables.propagate(e.getCause());
    782     }
    783   }
    784 
    785   private static boolean isAppEngine() {
    786     if (System.getProperty("com.google.appengine.runtime.environment") == null) {
    787       return false;
    788     }
    789     try {
    790       // If the current environment is null, we're not inside AppEngine.
    791       return Class.forName("com.google.apphosting.api.ApiProxy")
    792           .getMethod("getCurrentEnvironment")
    793           .invoke(null) != null;
    794     } catch (ClassNotFoundException e) {
    795       // If ApiProxy doesn't exist, we're not on AppEngine at all.
    796       return false;
    797     } catch (InvocationTargetException e) {
    798       // If ApiProxy throws an exception, we're not in a proper AppEngine environment.
    799       return false;
    800     } catch (IllegalAccessException e) {
    801       // If the method isn't accessible, we're not on a supported version of AppEngine;
    802       return false;
    803     } catch (NoSuchMethodException e) {
    804       // If the method doesn't exist, we're not on a supported version of AppEngine;
    805       return false;
    806     }
    807   }
    808 
    809   /**
    810    * Creates a thread using {@link #platformThreadFactory}, and sets its name to {@code name}
    811    * unless changing the name is forbidden by the security manager.
    812    */
    813   static Thread newThread(String name, Runnable runnable) {
    814     checkNotNull(name);
    815     checkNotNull(runnable);
    816     Thread result = platformThreadFactory().newThread(runnable);
    817     try {
    818       result.setName(name);
    819     } catch (SecurityException e) {
    820       // OK if we can't set the name in this environment.
    821     }
    822     return result;
    823   }
    824 
    825   // TODO(user): provide overloads for ListeningExecutorService? ListeningScheduledExecutorService?
    826   // TODO(user): provide overloads that take constant strings? Function<Runnable, String>s to
    827   // calculate names?
    828 
    829   /**
    830    * Creates an {@link Executor} that renames the {@link Thread threads} that its tasks run in.
    831    *
    832    * <p>The names are retrieved from the {@code nameSupplier} on the thread that is being renamed
    833    * right before each task is run.  The renaming is best effort, if a {@link SecurityManager}
    834    * prevents the renaming then it will be skipped but the tasks will still execute.
    835    *
    836    *
    837    * @param executor The executor to decorate
    838    * @param nameSupplier The source of names for each task
    839    */
    840   static Executor renamingDecorator(final Executor executor, final Supplier<String> nameSupplier) {
    841     checkNotNull(executor);
    842     checkNotNull(nameSupplier);
    843     if (isAppEngine()) {
    844       // AppEngine doesn't support thread renaming, so don't even try
    845       return executor;
    846     }
    847     return new Executor() {
    848       @Override public void execute(Runnable command) {
    849         executor.execute(Callables.threadRenaming(command, nameSupplier));
    850       }
    851     };
    852   }
    853 
    854   /**
    855    * Creates an {@link ExecutorService} that renames the {@link Thread threads} that its tasks run
    856    * in.
    857    *
    858    * <p>The names are retrieved from the {@code nameSupplier} on the thread that is being renamed
    859    * right before each task is run.  The renaming is best effort, if a {@link SecurityManager}
    860    * prevents the renaming then it will be skipped but the tasks will still execute.
    861    *
    862    *
    863    * @param service The executor to decorate
    864    * @param nameSupplier The source of names for each task
    865    */
    866   static ExecutorService renamingDecorator(final ExecutorService service,
    867       final Supplier<String> nameSupplier) {
    868     checkNotNull(service);
    869     checkNotNull(nameSupplier);
    870     if (isAppEngine()) {
    871       // AppEngine doesn't support thread renaming, so don't even try.
    872       return service;
    873     }
    874     return new WrappingExecutorService(service) {
    875       @Override protected <T> Callable<T> wrapTask(Callable<T> callable) {
    876         return Callables.threadRenaming(callable, nameSupplier);
    877       }
    878       @Override protected Runnable wrapTask(Runnable command) {
    879         return Callables.threadRenaming(command, nameSupplier);
    880       }
    881     };
    882   }
    883 
    884   /**
    885    * Creates a {@link ScheduledExecutorService} that renames the {@link Thread threads} that its
    886    * tasks run in.
    887    *
    888    * <p>The names are retrieved from the {@code nameSupplier} on the thread that is being renamed
    889    * right before each task is run.  The renaming is best effort, if a {@link SecurityManager}
    890    * prevents the renaming then it will be skipped but the tasks will still execute.
    891    *
    892    *
    893    * @param service The executor to decorate
    894    * @param nameSupplier The source of names for each task
    895    */
    896   static ScheduledExecutorService renamingDecorator(final ScheduledExecutorService service,
    897       final Supplier<String> nameSupplier) {
    898     checkNotNull(service);
    899     checkNotNull(nameSupplier);
    900     if (isAppEngine()) {
    901       // AppEngine doesn't support thread renaming, so don't even try.
    902       return service;
    903     }
    904     return new WrappingScheduledExecutorService(service) {
    905       @Override protected <T> Callable<T> wrapTask(Callable<T> callable) {
    906         return Callables.threadRenaming(callable, nameSupplier);
    907       }
    908       @Override protected Runnable wrapTask(Runnable command) {
    909         return Callables.threadRenaming(command, nameSupplier);
    910       }
    911     };
    912   }
    913 
    914   /**
    915    * Shuts down the given executor gradually, first disabling new submissions and later cancelling
    916    * existing tasks.
    917    *
    918    * <p>The method takes the following steps:
    919    * <ol>
    920    *  <li>calls {@link ExecutorService#shutdown()}, disabling acceptance of new submitted tasks.
    921    *  <li>waits for half of the specified timeout.
    922    *  <li>if the timeout expires, it calls {@link ExecutorService#shutdownNow()}, cancelling
    923    *  pending tasks and interrupting running tasks.
    924    *  <li>waits for the other half of the specified timeout.
    925    * </ol>
    926    *
    927    * <p>If, at any step of the process, the given executor is terminated or the calling thread is
    928    * interrupted, the method calls {@link ExecutorService#shutdownNow()}, cancelling
    929    * pending tasks and interrupting running tasks.
    930    *
    931    * @param service the {@code ExecutorService} to shut down
    932    * @param timeout the maximum time to wait for the {@code ExecutorService} to terminate
    933    * @param unit the time unit of the timeout argument
    934    * @return {@code true} if the pool was terminated successfully, {@code false} if the
    935    *     {@code ExecutorService} could not terminate <b>or</b> the thread running this method
    936    *     is interrupted while waiting for the {@code ExecutorService} to terminate
    937    * @since 17.0
    938    */
    939   @Beta
    940   public static boolean shutdownAndAwaitTermination(
    941       ExecutorService service, long timeout, TimeUnit unit) {
    942     checkNotNull(unit);
    943     // Disable new tasks from being submitted
    944     service.shutdown();
    945     try {
    946       long halfTimeoutNanos = TimeUnit.NANOSECONDS.convert(timeout, unit) / 2;
    947       // Wait for half the duration of the timeout for existing tasks to terminate
    948       if (!service.awaitTermination(halfTimeoutNanos, TimeUnit.NANOSECONDS)) {
    949         // Cancel currently executing tasks
    950         service.shutdownNow();
    951         // Wait the other half of the timeout for tasks to respond to being cancelled
    952         service.awaitTermination(halfTimeoutNanos, TimeUnit.NANOSECONDS);
    953       }
    954     } catch (InterruptedException ie) {
    955       // Preserve interrupt status
    956       Thread.currentThread().interrupt();
    957       // (Re-)Cancel if current thread also interrupted
    958       service.shutdownNow();
    959     }
    960     return service.isTerminated();
    961   }
    962 }
    963