Home | History | Annotate | Download | only in concurrent
      1 /*
      2  * Copyright (C) 2007 The Guava Authors
      3  *
      4  * Licensed under the Apache License, Version 2.0 (the "License");
      5  * you may not use this file except in compliance with the License.
      6  * You may obtain a copy of the License at
      7  *
      8  * http://www.apache.org/licenses/LICENSE-2.0
      9  *
     10  * Unless required by applicable law or agreed to in writing, software
     11  * distributed under the License is distributed on an "AS IS" BASIS,
     12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     13  * See the License for the specific language governing permissions and
     14  * limitations under the License.
     15  */
     16 
     17 package com.google.common.util.concurrent;
     18 
     19 import static com.google.common.base.Preconditions.checkNotNull;
     20 
     21 import com.google.common.annotations.Beta;
     22 
     23 import java.util.Collections;
     24 import java.util.List;
     25 import java.util.concurrent.Callable;
     26 import java.util.concurrent.ExecutorService;
     27 import java.util.concurrent.Executors;
     28 import java.util.concurrent.RejectedExecutionException;
     29 import java.util.concurrent.ScheduledExecutorService;
     30 import java.util.concurrent.ScheduledFuture;
     31 import java.util.concurrent.ScheduledThreadPoolExecutor;
     32 import java.util.concurrent.ThreadFactory;
     33 import java.util.concurrent.ThreadPoolExecutor;
     34 import java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy;
     35 import java.util.concurrent.TimeUnit;
     36 import java.util.concurrent.locks.Condition;
     37 import java.util.concurrent.locks.Lock;
     38 import java.util.concurrent.locks.ReentrantLock;
     39 
     40 /**
     41  * Factory and utility methods for {@link java.util.concurrent.Executor}, {@link
     42  * ExecutorService}, and {@link ThreadFactory}.
     43  *
     44  * @author Eric Fellheimer
     45  * @author Kyle Littlefield
     46  * @author Justin Mahoney
     47  * @since 3.0
     48  */
     49 public final class MoreExecutors {
     50   private MoreExecutors() {}
     51 
     52   /**
     53    * Converts the given ThreadPoolExecutor into an ExecutorService that exits
     54    * when the application is complete.  It does so by using daemon threads and
     55    * adding a shutdown hook to wait for their completion.
     56    *
     57    * <p>This is mainly for fixed thread pools.
     58    * See {@link Executors#newFixedThreadPool(int)}.
     59    *
     60    * @param executor the executor to modify to make sure it exits when the
     61    *        application is finished
     62    * @param terminationTimeout how long to wait for the executor to
     63    *        finish before terminating the JVM
     64    * @param timeUnit unit of time for the time parameter
     65    * @return an unmodifiable version of the input which will not hang the JVM
     66    */
     67   @Beta
     68   public static ExecutorService getExitingExecutorService(
     69       ThreadPoolExecutor executor, long terminationTimeout, TimeUnit timeUnit) {
     70     executor.setThreadFactory(new ThreadFactoryBuilder()
     71         .setDaemon(true)
     72         .setThreadFactory(executor.getThreadFactory())
     73         .build());
     74 
     75     ExecutorService service = Executors.unconfigurableExecutorService(executor);
     76 
     77     addDelayedShutdownHook(service, terminationTimeout, timeUnit);
     78 
     79     return service;
     80   }
     81 
     82   /**
     83    * Converts the given ScheduledThreadPoolExecutor into a
     84    * ScheduledExecutorService that exits when the application is complete.  It
     85    * does so by using daemon threads and adding a shutdown hook to wait for
     86    * their completion.
     87    *
     88    * <p>This is mainly for fixed thread pools.
     89    * See {@link Executors#newScheduledThreadPool(int)}.
     90    *
     91    * @param executor the executor to modify to make sure it exits when the
     92    *        application is finished
     93    * @param terminationTimeout how long to wait for the executor to
     94    *        finish before terminating the JVM
     95    * @param timeUnit unit of time for the time parameter
     96    * @return an unmodifiable version of the input which will not hang the JVM
     97    */
     98   @Beta
     99   public static ScheduledExecutorService getExitingScheduledExecutorService(
    100       ScheduledThreadPoolExecutor executor, long terminationTimeout,
    101       TimeUnit timeUnit) {
    102     executor.setThreadFactory(new ThreadFactoryBuilder()
    103         .setDaemon(true)
    104         .setThreadFactory(executor.getThreadFactory())
    105         .build());
    106 
    107     ScheduledExecutorService service =
    108         Executors.unconfigurableScheduledExecutorService(executor);
    109 
    110     addDelayedShutdownHook(service, terminationTimeout, timeUnit);
    111 
    112     return service;
    113   }
    114 
    115   /**
    116    * Add a shutdown hook to wait for thread completion in the given
    117    * {@link ExecutorService service}.  This is useful if the given service uses
    118    * daemon threads, and we want to keep the JVM from exiting immediately on
    119    * shutdown, instead giving these daemon threads a chance to terminate
    120    * normally.
    121    * @param service ExecutorService which uses daemon threads
    122    * @param terminationTimeout how long to wait for the executor to finish
    123    *        before terminating the JVM
    124    * @param timeUnit unit of time for the time parameter
    125    */
    126   @Beta
    127   public static void addDelayedShutdownHook(
    128       final ExecutorService service, final long terminationTimeout,
    129       final TimeUnit timeUnit) {
    130     Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
    131       @Override
    132       public void run() {
    133         try {
    134           // We'd like to log progress and failures that may arise in the
    135           // following code, but unfortunately the behavior of logging
    136           // is undefined in shutdown hooks.
    137           // This is because the logging code installs a shutdown hook of its
    138           // own. See Cleaner class inside {@link LogManager}.
    139           service.shutdown();
    140           service.awaitTermination(terminationTimeout, timeUnit);
    141         } catch (InterruptedException ignored) {
    142           // We're shutting down anyway, so just ignore.
    143         }
    144       }
    145     }));
    146   }
    147 
    148   /**
    149    * Converts the given ThreadPoolExecutor into an ExecutorService that exits
    150    * when the application is complete.  It does so by using daemon threads and
    151    * adding a shutdown hook to wait for their completion.
    152    *
    153    * <p>This method waits 120 seconds before continuing with JVM termination,
    154    * even if the executor has not finished its work.
    155    *
    156    * <p>This is mainly for fixed thread pools.
    157    * See {@link Executors#newFixedThreadPool(int)}.
    158    *
    159    * @param executor the executor to modify to make sure it exits when the
    160    *        application is finished
    161    * @return an unmodifiable version of the input which will not hang the JVM
    162    */
    163   @Beta
    164   public static ExecutorService getExitingExecutorService(
    165       ThreadPoolExecutor executor) {
    166     return getExitingExecutorService(executor, 120, TimeUnit.SECONDS);
    167   }
    168 
    169   /**
    170    * Converts the given ThreadPoolExecutor into a ScheduledExecutorService that
    171    * exits when the application is complete.  It does so by using daemon threads
    172    * and adding a shutdown hook to wait for their completion.
    173    *
    174    * <p>This method waits 120 seconds before continuing with JVM termination,
    175    * even if the executor has not finished its work.
    176    *
    177    * <p>This is mainly for fixed thread pools.
    178    * See {@link Executors#newScheduledThreadPool(int)}.
    179    *
    180    * @param executor the executor to modify to make sure it exits when the
    181    *        application is finished
    182    * @return an unmodifiable version of the input which will not hang the JVM
    183    */
    184   @Beta
    185   public static ScheduledExecutorService getExitingScheduledExecutorService(
    186       ScheduledThreadPoolExecutor executor) {
    187     return getExitingScheduledExecutorService(executor, 120, TimeUnit.SECONDS);
    188   }
    189 
    190   /**
    191    * Creates an executor service that runs each task in the thread
    192    * that invokes {@code execute/submit}, as in {@link CallerRunsPolicy}  This
    193    * applies both to individually submitted tasks and to collections of tasks
    194    * submitted via {@code invokeAll} or {@code invokeAny}.  In the latter case,
    195    * tasks will run serially on the calling thread.  Tasks are run to
    196    * completion before a {@code Future} is returned to the caller (unless the
    197    * executor has been shutdown).
    198    *
    199    * <p>Although all tasks are immediately executed in the thread that
    200    * submitted the task, this {@code ExecutorService} imposes a small
    201    * locking overhead on each task submission in order to implement shutdown
    202    * and termination behavior.
    203    *
    204    * <p>The implementation deviates from the {@code ExecutorService}
    205    * specification with regards to the {@code shutdownNow} method.  First,
    206    * "best-effort" with regards to canceling running tasks is implemented
    207    * as "no-effort".  No interrupts or other attempts are made to stop
    208    * threads executing tasks.  Second, the returned list will always be empty,
    209    * as any submitted task is considered to have started execution.
    210    * This applies also to tasks given to {@code invokeAll} or {@code invokeAny}
    211    * which are pending serial execution, even the subset of the tasks that
    212    * have not yet started execution.  It is unclear from the
    213    * {@code ExecutorService} specification if these should be included, and
    214    * it's much easier to implement the interpretation that they not be.
    215    * Finally, a call to {@code shutdown} or {@code shutdownNow} may result
    216    * in concurrent calls to {@code invokeAll/invokeAny} throwing
    217    * RejectedExecutionException, although a subset of the tasks may already
    218    * have been executed.
    219    *
    220    * @since 10.0 (<a href="http://code.google.com/p/guava-libraries/wiki/Compatibility"
    221    *        >mostly source-compatible</a> since 3.0)
    222    */
    223   public static ListeningExecutorService sameThreadExecutor() {
    224     return new SameThreadExecutorService();
    225   }
    226 
    227   // See sameThreadExecutor javadoc for behavioral notes.
    228   private static class SameThreadExecutorService
    229       extends AbstractListeningExecutorService {
    230     /**
    231      * Lock used whenever accessing the state variables
    232      * (runningTasks, shutdown, terminationCondition) of the executor
    233      */
    234     private final Lock lock = new ReentrantLock();
    235 
    236     /** Signaled after the executor is shutdown and running tasks are done */
    237     private final Condition termination = lock.newCondition();
    238 
    239     /*
    240      * Conceptually, these two variables describe the executor being in
    241      * one of three states:
    242      *   - Active: shutdown == false
    243      *   - Shutdown: runningTasks > 0 and shutdown == true
    244      *   - Terminated: runningTasks == 0 and shutdown == true
    245      */
    246     private int runningTasks = 0;
    247     private boolean shutdown = false;
    248 
    249     @Override
    250     public void execute(Runnable command) {
    251       startTask();
    252       try {
    253         command.run();
    254       } finally {
    255         endTask();
    256       }
    257     }
    258 
    259     @Override
    260     public boolean isShutdown() {
    261       lock.lock();
    262       try {
    263         return shutdown;
    264       } finally {
    265         lock.unlock();
    266       }
    267     }
    268 
    269     @Override
    270     public void shutdown() {
    271       lock.lock();
    272       try {
    273         shutdown = true;
    274       } finally {
    275         lock.unlock();
    276       }
    277     }
    278 
    279     // See sameThreadExecutor javadoc for unusual behavior of this method.
    280     @Override
    281     public List<Runnable> shutdownNow() {
    282       shutdown();
    283       return Collections.emptyList();
    284     }
    285 
    286     @Override
    287     public boolean isTerminated() {
    288       lock.lock();
    289       try {
    290         return shutdown && runningTasks == 0;
    291       } finally {
    292         lock.unlock();
    293       }
    294     }
    295 
    296     @Override
    297     public boolean awaitTermination(long timeout, TimeUnit unit)
    298         throws InterruptedException {
    299       long nanos = unit.toNanos(timeout);
    300       lock.lock();
    301       try {
    302         for (;;) {
    303           if (isTerminated()) {
    304             return true;
    305           } else if (nanos <= 0) {
    306             return false;
    307           } else {
    308             nanos = termination.awaitNanos(nanos);
    309           }
    310         }
    311       } finally {
    312         lock.unlock();
    313       }
    314     }
    315 
    316     /**
    317      * Checks if the executor has been shut down and increments the running
    318      * task count.
    319      *
    320      * @throws RejectedExecutionException if the executor has been previously
    321      *         shutdown
    322      */
    323     private void startTask() {
    324       lock.lock();
    325       try {
    326         if (isShutdown()) {
    327           throw new RejectedExecutionException("Executor already shutdown");
    328         }
    329         runningTasks++;
    330       } finally {
    331         lock.unlock();
    332       }
    333     }
    334 
    335     /**
    336      * Decrements the running task count.
    337      */
    338     private void endTask() {
    339       lock.lock();
    340       try {
    341         runningTasks--;
    342         if (isTerminated()) {
    343           termination.signalAll();
    344         }
    345       } finally {
    346         lock.unlock();
    347       }
    348     }
    349   }
    350 
    351   /**
    352    * Creates an {@link ExecutorService} whose {@code submit} and {@code
    353    * invokeAll} methods submit {@link ListenableFutureTask} instances to the
    354    * given delegate executor. Those methods, as well as {@code execute} and
    355    * {@code invokeAny}, are implemented in terms of calls to {@code
    356    * delegate.execute}. All other methods are forwarded unchanged to the
    357    * delegate. This implies that the returned {@code ListeningExecutorService}
    358    * never calls the delegate's {@code submit}, {@code invokeAll}, and {@code
    359    * invokeAny} methods, so any special handling of tasks must be implemented in
    360    * the delegate's {@code execute} method or by wrapping the returned {@code
    361    * ListeningExecutorService}.
    362    *
    363    * <p>If the delegate executor was already an instance of {@code
    364    * ListeningExecutorService}, it is returned untouched, and the rest of this
    365    * documentation does not apply.
    366    *
    367    * @since 10.0
    368    */
    369   public static ListeningExecutorService listeningDecorator(
    370       ExecutorService delegate) {
    371     return (delegate instanceof ListeningExecutorService)
    372         ? (ListeningExecutorService) delegate
    373         : (delegate instanceof ScheduledExecutorService)
    374         ? new ScheduledListeningDecorator((ScheduledExecutorService) delegate)
    375         : new ListeningDecorator(delegate);
    376   }
    377 
    378   /**
    379    * Creates a {@link ScheduledExecutorService} whose {@code submit} and {@code
    380    * invokeAll} methods submit {@link ListenableFutureTask} instances to the
    381    * given delegate executor. Those methods, as well as {@code execute} and
    382    * {@code invokeAny}, are implemented in terms of calls to {@code
    383    * delegate.execute}. All other methods are forwarded unchanged to the
    384    * delegate. This implies that the returned {@code
    385    * SchedulingListeningExecutorService} never calls the delegate's {@code
    386    * submit}, {@code invokeAll}, and {@code invokeAny} methods, so any special
    387    * handling of tasks must be implemented in the delegate's {@code execute}
    388    * method or by wrapping the returned {@code
    389    * SchedulingListeningExecutorService}.
    390    *
    391    * <p>If the delegate executor was already an instance of {@code
    392    * ListeningScheduledExecutorService}, it is returned untouched, and the rest
    393    * of this documentation does not apply.
    394    *
    395    * @since 10.0
    396    */
    397   public static ListeningScheduledExecutorService listeningDecorator(
    398       ScheduledExecutorService delegate) {
    399     return (delegate instanceof ListeningScheduledExecutorService)
    400         ? (ListeningScheduledExecutorService) delegate
    401         : new ScheduledListeningDecorator(delegate);
    402   }
    403 
    404   private static class ListeningDecorator
    405       extends AbstractListeningExecutorService {
    406     final ExecutorService delegate;
    407 
    408     ListeningDecorator(ExecutorService delegate) {
    409       this.delegate = checkNotNull(delegate);
    410     }
    411 
    412     @Override
    413     public boolean awaitTermination(long timeout, TimeUnit unit)
    414         throws InterruptedException {
    415       return delegate.awaitTermination(timeout, unit);
    416     }
    417 
    418     @Override
    419     public boolean isShutdown() {
    420       return delegate.isShutdown();
    421     }
    422 
    423     @Override
    424     public boolean isTerminated() {
    425       return delegate.isTerminated();
    426     }
    427 
    428     @Override
    429     public void shutdown() {
    430       delegate.shutdown();
    431     }
    432 
    433     @Override
    434     public List<Runnable> shutdownNow() {
    435       return delegate.shutdownNow();
    436     }
    437 
    438     @Override
    439     public void execute(Runnable command) {
    440       delegate.execute(command);
    441     }
    442   }
    443 
    444   private static class ScheduledListeningDecorator
    445       extends ListeningDecorator implements ListeningScheduledExecutorService {
    446     final ScheduledExecutorService delegate;
    447 
    448     ScheduledListeningDecorator(ScheduledExecutorService delegate) {
    449       super(delegate);
    450       this.delegate = checkNotNull(delegate);
    451     }
    452 
    453     @Override
    454     public ScheduledFuture<?> schedule(
    455         Runnable command, long delay, TimeUnit unit) {
    456       return delegate.schedule(command, delay, unit);
    457     }
    458 
    459     @Override
    460     public <V> ScheduledFuture<V> schedule(
    461         Callable<V> callable, long delay, TimeUnit unit) {
    462       return delegate.schedule(callable, delay, unit);
    463     }
    464 
    465     @Override
    466     public ScheduledFuture<?> scheduleAtFixedRate(
    467         Runnable command, long initialDelay, long period, TimeUnit unit) {
    468       return delegate.scheduleAtFixedRate(command, initialDelay, period, unit);
    469     }
    470 
    471     @Override
    472     public ScheduledFuture<?> scheduleWithFixedDelay(
    473         Runnable command, long initialDelay, long delay, TimeUnit unit) {
    474       return delegate.scheduleWithFixedDelay(
    475           command, initialDelay, delay, unit);
    476     }
    477   }
    478 }
    479