Home | History | Annotate | Download | only in concurrent
      1 /*
      2  * Copyright (C) 2007 Google Inc.
      3  *
      4  * Licensed under the Apache License, Version 2.0 (the "License");
      5  * you may not use this file except in compliance with the License.
      6  * You may obtain a copy of the License at
      7  *
      8  * http://www.apache.org/licenses/LICENSE-2.0
      9  *
     10  * Unless required by applicable law or agreed to in writing, software
     11  * distributed under the License is distributed on an "AS IS" BASIS,
     12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     13  * See the License for the specific language governing permissions and
     14  * limitations under the License.
     15  */
     16 
     17 package com.google.common.util.concurrent;
     18 
     19 import java.util.Collections;
     20 import java.util.List;
     21 import java.util.concurrent.AbstractExecutorService;
     22 import java.util.concurrent.ExecutorService;
     23 import java.util.concurrent.RejectedExecutionException;
     24 import java.util.concurrent.ScheduledExecutorService;
     25 import java.util.concurrent.ScheduledThreadPoolExecutor;
     26 import java.util.concurrent.ThreadFactory;
     27 import java.util.concurrent.ThreadPoolExecutor;
     28 import java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy;
     29 import java.util.concurrent.TimeUnit;
     30 import java.util.concurrent.locks.Condition;
     31 import java.util.concurrent.locks.Lock;
     32 import java.util.concurrent.locks.ReentrantLock;
     33 
     34 /**
     35  * Factory and utility methods for {@link java.util.concurrent.Executor}, {@link
     36  * ExecutorService}, and {@link ThreadFactory}.
     37  *
     38  * @author Eric Fellheimer
     39  * @author Kyle Littlefield
     40  * @author Justin Mahoney
     41  * @since 2009.09.15 <b>tentative</b>
     42  */
     43 public class Executors {
     44 
     45   /**
     46    * Converts the given ThreadPoolExecutor into an ExecutorService that exits
     47    * when the application is complete.  It does so by using daemon threads and
     48    * adding a shutdown hook to wait for their completion.
     49    *
     50    * <p>This is mainly for fixed thread pools.
     51    * See {@link java.util.concurrent.Executors#newFixedThreadPool(int)}.
     52    *
     53    * @param executor the executor to modify to make sure it exits when the
     54    *        application is finished
     55    * @param terminationTimeout how long to wait for the executor to
     56    *        finish before terminating the JVM
     57    * @param timeUnit unit of time for the time parameter
     58    * @return an unmodifiable version of the input which will not hang the JVM
     59    */
     60   public static ExecutorService getExitingExecutorService(
     61       ThreadPoolExecutor executor, long terminationTimeout, TimeUnit timeUnit) {
     62     executor.setThreadFactory(daemonThreadFactory(executor.getThreadFactory()));
     63 
     64     ExecutorService service = java.util.concurrent.Executors
     65         .unconfigurableExecutorService(executor);
     66 
     67     addDelayedShutdownHook(service, terminationTimeout, timeUnit);
     68 
     69     return service;
     70   }
     71 
     72   /**
     73    * Converts the given ScheduledThreadPoolExecutor into a
     74    * ScheduledExecutorService that exits when the application is complete.  It
     75    * does so by using daemon threads and adding a shutdown hook to wait for
     76    * their completion.
     77    *
     78    * <p>This is mainly for fixed thread pools.
     79    * See {@link java.util.concurrent.Executors#newScheduledThreadPool(int)}.
     80    *
     81    * @param executor the executor to modify to make sure it exits when the
     82    *        application is finished
     83    * @param terminationTimeout how long to wait for the executor to
     84    *        finish before terminating the JVM
     85    * @param timeUnit unit of time for the time parameter
     86    * @return an unmodifiable version of the input which will not hang the JVM
     87    */
     88   public static ScheduledExecutorService getExitingScheduledExecutorService(
     89       ScheduledThreadPoolExecutor executor, long terminationTimeout,
     90       TimeUnit timeUnit) {
     91     executor.setThreadFactory(daemonThreadFactory(executor.getThreadFactory()));
     92 
     93     ScheduledExecutorService service = java.util.concurrent.Executors
     94         .unconfigurableScheduledExecutorService(executor);
     95 
     96     addDelayedShutdownHook(service, terminationTimeout, timeUnit);
     97 
     98     return service;
     99   }
    100 
    101   /**
    102    * Add a shutdown hook to wait for thread completion in the given
    103    * {@link ExecutorService service}.  This is useful if the given service uses
    104    * daemon threads, and we want to keep the JVM from exiting immediately on
    105    * shutdown, instead giving these daemon threads a chance to terminate
    106    * normally.
    107    * @param service ExecutorService which uses daemon threads
    108    * @param terminationTimeout how long to wait for the executor to finish
    109    *        before terminating the JVM
    110    * @param timeUnit unit of time for the time parameter
    111    */
    112   public static void addDelayedShutdownHook(
    113       final ExecutorService service, final long terminationTimeout,
    114       final TimeUnit timeUnit) {
    115     Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
    116       public void run() {
    117         try {
    118           // We'd like to log progress and failures that may arise in the
    119           // following code, but unfortunately the behavior of logging
    120           // is undefined in shutdown hooks.
    121           // This is because the logging code installs a shutdown hook of its
    122           // own. See Cleaner class inside {@link LogManager}.
    123           service.shutdown();
    124           service.awaitTermination(terminationTimeout, timeUnit);
    125         } catch (InterruptedException ignored) {
    126           // We're shutting down anyway, so just ignore.
    127         }
    128       }
    129     }));
    130   }
    131 
    132   /**
    133    * Converts the given ThreadPoolExecutor into an ExecutorService that exits
    134    * when the application is complete.  It does so by using daemon threads and
    135    * adding a shutdown hook to wait for their completion.
    136    *
    137    * <p>This method waits 120 seconds before continuing with JVM termination,
    138    * even if the executor has not finished its work.
    139    *
    140    * <p>This is mainly for fixed thread pools.
    141    * See {@link java.util.concurrent.Executors#newFixedThreadPool(int)}.
    142    *
    143    * @param executor the executor to modify to make sure it exits when the
    144    *        application is finished
    145    * @return an unmodifiable version of the input which will not hang the JVM
    146    */
    147   public static ExecutorService getExitingExecutorService(
    148       ThreadPoolExecutor executor) {
    149     return getExitingExecutorService(executor, 120, TimeUnit.SECONDS);
    150   }
    151 
    152   /**
    153    * Converts the given ThreadPoolExecutor into a ScheduledExecutorService that
    154    * exits when the application is complete.  It does so by using daemon threads
    155    * and adding a shutdown hook to wait for their completion.
    156    *
    157    * <p>This method waits 120 seconds before continuing with JVM termination,
    158    * even if the executor has not finished its work.
    159    *
    160    * <p>This is mainly for fixed thread pools.
    161    * See {@link java.util.concurrent.Executors#newScheduledThreadPool(int)}.
    162    *
    163    * @param executor the executor to modify to make sure it exits when the
    164    *        application is finished
    165    * @return an unmodifiable version of the input which will not hang the JVM
    166    */
    167   public static ScheduledExecutorService getExitingScheduledExecutorService(
    168       ScheduledThreadPoolExecutor executor) {
    169     return getExitingScheduledExecutorService(executor, 120, TimeUnit.SECONDS);
    170   }
    171 
    172   /**
    173    * Returns a {@link ThreadFactory} which creates daemon threads. This is
    174    * implemented by wrapping {@link
    175    * java.util.concurrent.Executors#defaultThreadFactory()}, marking all new
    176    * threads as daemon threads
    177    *
    178    * @return a {@link ThreadFactory} which creates daemon threads
    179    */
    180   public static ThreadFactory daemonThreadFactory() {
    181     return daemonThreadFactory(
    182         java.util.concurrent.Executors.defaultThreadFactory());
    183   }
    184 
    185   /**
    186    * Wraps another {@link ThreadFactory}, making all new threads daemon threads.
    187    *
    188    * @param factory the {@link ThreadFactory} used to generate new threads
    189    * @return a new {@link ThreadFactory} backed by {@code factory} whose created
    190    *         threads are all daemon threads
    191    */
    192   public static ThreadFactory daemonThreadFactory(ThreadFactory factory) {
    193     return new DaemonThreadFactory(factory);
    194   }
    195 
    196   /**
    197    * Creates an executor service that runs each task in the thread
    198    * that invokes {@code execute/submit}, as in {@link CallerRunsPolicy}  This
    199    * applies both to individually submitted tasks and to collections of tasks
    200    * submitted via {@code invokeAll} or {@code invokeAny}.  In the latter case,
    201    * tasks will run serially on the calling thread.  Tasks are run to
    202    * completion before a {@code Future} is returned to the caller (unless the
    203    * executor has been shutdown).
    204    *
    205    * <p>Although all tasks are immediately executed in the thread that
    206    * submitted the task, this {@code ExecutorService} imposes a small
    207    * locking overhead on each task submission in order to implement shutdown
    208    * and termination behavior.
    209    *
    210    * <p>The implementation deviates from the {@code ExecutorService}
    211    * specification with regards to the {@code shutdownNow} method.  First,
    212    * "best-effort" with regards to canceling running tasks is implemented
    213    * as "no-effort".  No interrupts or other attempts are made to stop
    214    * threads executing tasks.  Second, the returned list will always be empty,
    215    * as any submitted task is considered to have started execution.
    216    * This applies also to tasks given to {@code invokeAll} or {@code invokeAny}
    217    * which are pending serial execution, even the subset of the tasks that
    218    * have not yet started execution.  It is unclear from the
    219    * {@code ExecutorService} specification if these should be included, and
    220    * it's much easier to implement the interpretation that they not be.
    221    * Finally, a call to {@code shutdown} or {@code shutdownNow} may result
    222    * in concurrent calls to {@code invokeAll/invokeAny} throwing
    223    * RejectedExecutionException, although a subset of the tasks may already
    224    * have been executed.
    225    */
    226   public static ExecutorService sameThreadExecutor() {
    227     return new SameThreadExecutorService();
    228   }
    229 
    230   // See sameThreadExecutor javadoc for behavioral notes.
    231   private static class SameThreadExecutorService extends AbstractExecutorService {
    232 
    233     /**
    234      * Lock used whenever accessing the state variables
    235      * (runningTasks, shutdown, terminationCondition) of the executor
    236      */
    237     private final Lock lock = new ReentrantLock();
    238 
    239     /** Signaled after the executor is shutdown and running tasks are done */
    240     private final Condition termination = lock.newCondition();
    241 
    242     /*
    243      * Conceptually, these two variables describe the executor being in
    244      * one of three states:
    245      *   - Active: shutdown == false
    246      *   - Shutdown: runningTasks > 0 and shutdown == true
    247      *   - Terminated: runningTasks == 0 and shutdown == true
    248      */
    249     private int runningTasks = 0;
    250     private boolean shutdown = false;
    251 
    252     /*@Override*/
    253     public void execute(Runnable command) {
    254       startTask();
    255       try {
    256         command.run();
    257       } finally {
    258         endTask();
    259       }
    260     }
    261 
    262     /*@Override*/
    263     public boolean isShutdown() {
    264       lock.lock();
    265       try {
    266         return shutdown;
    267       } finally {
    268         lock.unlock();
    269       }
    270     }
    271 
    272     /*@Override*/
    273     public void shutdown() {
    274       lock.lock();
    275       try {
    276         shutdown = true;
    277       } finally {
    278         lock.unlock();
    279       }
    280     }
    281 
    282     // See sameThreadExecutor javadoc for unusual behavior of this method.
    283     /*@Override*/
    284     public List<Runnable> shutdownNow() {
    285       shutdown();
    286       return Collections.emptyList();
    287     }
    288 
    289     /*@Override*/
    290     public boolean isTerminated() {
    291       lock.lock();
    292       try {
    293         return shutdown && runningTasks == 0;
    294       } finally {
    295         lock.unlock();
    296       }
    297     }
    298 
    299     /*@Override*/
    300     public boolean awaitTermination(long timeout, TimeUnit unit)
    301         throws InterruptedException {
    302       long nanos = unit.toNanos(timeout);
    303       lock.lock();
    304       try {
    305         for (;;) {
    306           if (isTerminated()) {
    307             return true;
    308           } else if (nanos <= 0) {
    309             return false;
    310           } else {
    311             nanos = termination.awaitNanos(nanos);
    312           }
    313         }
    314       } finally {
    315         lock.unlock();
    316       }
    317     }
    318 
    319     /**
    320      * Checks if the executor has been shut down and increments the running
    321      * task count.
    322      *
    323      * @throws RejectedExecutionException if the executor has been previously
    324      *         shutdown
    325      */
    326     private void startTask() {
    327       lock.lock();
    328       try {
    329         if (isShutdown()) {
    330           throw new RejectedExecutionException("Executor already shutdown");
    331         }
    332         runningTasks++;
    333       } finally {
    334         lock.unlock();
    335       }
    336     }
    337 
    338     /**
    339      * Decrements the running task count.
    340      */
    341     private void endTask() {
    342       lock.lock();
    343       try {
    344         runningTasks--;
    345         if (isTerminated()) {
    346           termination.signalAll();
    347         }
    348       } finally {
    349         lock.unlock();
    350       }
    351     }
    352   }
    353 }
    354