1 /* 2 * Copyright (C) 2007 Google Inc. 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package com.google.common.util.concurrent; 18 19 import java.util.Collections; 20 import java.util.List; 21 import java.util.concurrent.AbstractExecutorService; 22 import java.util.concurrent.ExecutorService; 23 import java.util.concurrent.RejectedExecutionException; 24 import java.util.concurrent.ScheduledExecutorService; 25 import java.util.concurrent.ScheduledThreadPoolExecutor; 26 import java.util.concurrent.ThreadFactory; 27 import java.util.concurrent.ThreadPoolExecutor; 28 import java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy; 29 import java.util.concurrent.TimeUnit; 30 import java.util.concurrent.locks.Condition; 31 import java.util.concurrent.locks.Lock; 32 import java.util.concurrent.locks.ReentrantLock; 33 34 /** 35 * Factory and utility methods for {@link java.util.concurrent.Executor}, {@link 36 * ExecutorService}, and {@link ThreadFactory}. 37 * 38 * @author Eric Fellheimer 39 * @author Kyle Littlefield 40 * @author Justin Mahoney 41 * @since 2009.09.15 <b>tentative</b> 42 */ 43 public class Executors { 44 45 /** 46 * Converts the given ThreadPoolExecutor into an ExecutorService that exits 47 * when the application is complete. It does so by using daemon threads and 48 * adding a shutdown hook to wait for their completion. 49 * 50 * <p>This is mainly for fixed thread pools. 51 * See {@link java.util.concurrent.Executors#newFixedThreadPool(int)}. 52 * 53 * @param executor the executor to modify to make sure it exits when the 54 * application is finished 55 * @param terminationTimeout how long to wait for the executor to 56 * finish before terminating the JVM 57 * @param timeUnit unit of time for the time parameter 58 * @return an unmodifiable version of the input which will not hang the JVM 59 */ 60 public static ExecutorService getExitingExecutorService( 61 ThreadPoolExecutor executor, long terminationTimeout, TimeUnit timeUnit) { 62 executor.setThreadFactory(daemonThreadFactory(executor.getThreadFactory())); 63 64 ExecutorService service = java.util.concurrent.Executors 65 .unconfigurableExecutorService(executor); 66 67 addDelayedShutdownHook(service, terminationTimeout, timeUnit); 68 69 return service; 70 } 71 72 /** 73 * Converts the given ScheduledThreadPoolExecutor into a 74 * ScheduledExecutorService that exits when the application is complete. It 75 * does so by using daemon threads and adding a shutdown hook to wait for 76 * their completion. 77 * 78 * <p>This is mainly for fixed thread pools. 79 * See {@link java.util.concurrent.Executors#newScheduledThreadPool(int)}. 80 * 81 * @param executor the executor to modify to make sure it exits when the 82 * application is finished 83 * @param terminationTimeout how long to wait for the executor to 84 * finish before terminating the JVM 85 * @param timeUnit unit of time for the time parameter 86 * @return an unmodifiable version of the input which will not hang the JVM 87 */ 88 public static ScheduledExecutorService getExitingScheduledExecutorService( 89 ScheduledThreadPoolExecutor executor, long terminationTimeout, 90 TimeUnit timeUnit) { 91 executor.setThreadFactory(daemonThreadFactory(executor.getThreadFactory())); 92 93 ScheduledExecutorService service = java.util.concurrent.Executors 94 .unconfigurableScheduledExecutorService(executor); 95 96 addDelayedShutdownHook(service, terminationTimeout, timeUnit); 97 98 return service; 99 } 100 101 /** 102 * Add a shutdown hook to wait for thread completion in the given 103 * {@link ExecutorService service}. This is useful if the given service uses 104 * daemon threads, and we want to keep the JVM from exiting immediately on 105 * shutdown, instead giving these daemon threads a chance to terminate 106 * normally. 107 * @param service ExecutorService which uses daemon threads 108 * @param terminationTimeout how long to wait for the executor to finish 109 * before terminating the JVM 110 * @param timeUnit unit of time for the time parameter 111 */ 112 public static void addDelayedShutdownHook( 113 final ExecutorService service, final long terminationTimeout, 114 final TimeUnit timeUnit) { 115 Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { 116 public void run() { 117 try { 118 // We'd like to log progress and failures that may arise in the 119 // following code, but unfortunately the behavior of logging 120 // is undefined in shutdown hooks. 121 // This is because the logging code installs a shutdown hook of its 122 // own. See Cleaner class inside {@link LogManager}. 123 service.shutdown(); 124 service.awaitTermination(terminationTimeout, timeUnit); 125 } catch (InterruptedException ignored) { 126 // We're shutting down anyway, so just ignore. 127 } 128 } 129 })); 130 } 131 132 /** 133 * Converts the given ThreadPoolExecutor into an ExecutorService that exits 134 * when the application is complete. It does so by using daemon threads and 135 * adding a shutdown hook to wait for their completion. 136 * 137 * <p>This method waits 120 seconds before continuing with JVM termination, 138 * even if the executor has not finished its work. 139 * 140 * <p>This is mainly for fixed thread pools. 141 * See {@link java.util.concurrent.Executors#newFixedThreadPool(int)}. 142 * 143 * @param executor the executor to modify to make sure it exits when the 144 * application is finished 145 * @return an unmodifiable version of the input which will not hang the JVM 146 */ 147 public static ExecutorService getExitingExecutorService( 148 ThreadPoolExecutor executor) { 149 return getExitingExecutorService(executor, 120, TimeUnit.SECONDS); 150 } 151 152 /** 153 * Converts the given ThreadPoolExecutor into a ScheduledExecutorService that 154 * exits when the application is complete. It does so by using daemon threads 155 * and adding a shutdown hook to wait for their completion. 156 * 157 * <p>This method waits 120 seconds before continuing with JVM termination, 158 * even if the executor has not finished its work. 159 * 160 * <p>This is mainly for fixed thread pools. 161 * See {@link java.util.concurrent.Executors#newScheduledThreadPool(int)}. 162 * 163 * @param executor the executor to modify to make sure it exits when the 164 * application is finished 165 * @return an unmodifiable version of the input which will not hang the JVM 166 */ 167 public static ScheduledExecutorService getExitingScheduledExecutorService( 168 ScheduledThreadPoolExecutor executor) { 169 return getExitingScheduledExecutorService(executor, 120, TimeUnit.SECONDS); 170 } 171 172 /** 173 * Returns a {@link ThreadFactory} which creates daemon threads. This is 174 * implemented by wrapping {@link 175 * java.util.concurrent.Executors#defaultThreadFactory()}, marking all new 176 * threads as daemon threads 177 * 178 * @return a {@link ThreadFactory} which creates daemon threads 179 */ 180 public static ThreadFactory daemonThreadFactory() { 181 return daemonThreadFactory( 182 java.util.concurrent.Executors.defaultThreadFactory()); 183 } 184 185 /** 186 * Wraps another {@link ThreadFactory}, making all new threads daemon threads. 187 * 188 * @param factory the {@link ThreadFactory} used to generate new threads 189 * @return a new {@link ThreadFactory} backed by {@code factory} whose created 190 * threads are all daemon threads 191 */ 192 public static ThreadFactory daemonThreadFactory(ThreadFactory factory) { 193 return new DaemonThreadFactory(factory); 194 } 195 196 /** 197 * Creates an executor service that runs each task in the thread 198 * that invokes {@code execute/submit}, as in {@link CallerRunsPolicy} This 199 * applies both to individually submitted tasks and to collections of tasks 200 * submitted via {@code invokeAll} or {@code invokeAny}. In the latter case, 201 * tasks will run serially on the calling thread. Tasks are run to 202 * completion before a {@code Future} is returned to the caller (unless the 203 * executor has been shutdown). 204 * 205 * <p>Although all tasks are immediately executed in the thread that 206 * submitted the task, this {@code ExecutorService} imposes a small 207 * locking overhead on each task submission in order to implement shutdown 208 * and termination behavior. 209 * 210 * <p>The implementation deviates from the {@code ExecutorService} 211 * specification with regards to the {@code shutdownNow} method. First, 212 * "best-effort" with regards to canceling running tasks is implemented 213 * as "no-effort". No interrupts or other attempts are made to stop 214 * threads executing tasks. Second, the returned list will always be empty, 215 * as any submitted task is considered to have started execution. 216 * This applies also to tasks given to {@code invokeAll} or {@code invokeAny} 217 * which are pending serial execution, even the subset of the tasks that 218 * have not yet started execution. It is unclear from the 219 * {@code ExecutorService} specification if these should be included, and 220 * it's much easier to implement the interpretation that they not be. 221 * Finally, a call to {@code shutdown} or {@code shutdownNow} may result 222 * in concurrent calls to {@code invokeAll/invokeAny} throwing 223 * RejectedExecutionException, although a subset of the tasks may already 224 * have been executed. 225 */ 226 public static ExecutorService sameThreadExecutor() { 227 return new SameThreadExecutorService(); 228 } 229 230 // See sameThreadExecutor javadoc for behavioral notes. 231 private static class SameThreadExecutorService extends AbstractExecutorService { 232 233 /** 234 * Lock used whenever accessing the state variables 235 * (runningTasks, shutdown, terminationCondition) of the executor 236 */ 237 private final Lock lock = new ReentrantLock(); 238 239 /** Signaled after the executor is shutdown and running tasks are done */ 240 private final Condition termination = lock.newCondition(); 241 242 /* 243 * Conceptually, these two variables describe the executor being in 244 * one of three states: 245 * - Active: shutdown == false 246 * - Shutdown: runningTasks > 0 and shutdown == true 247 * - Terminated: runningTasks == 0 and shutdown == true 248 */ 249 private int runningTasks = 0; 250 private boolean shutdown = false; 251 252 /*@Override*/ 253 public void execute(Runnable command) { 254 startTask(); 255 try { 256 command.run(); 257 } finally { 258 endTask(); 259 } 260 } 261 262 /*@Override*/ 263 public boolean isShutdown() { 264 lock.lock(); 265 try { 266 return shutdown; 267 } finally { 268 lock.unlock(); 269 } 270 } 271 272 /*@Override*/ 273 public void shutdown() { 274 lock.lock(); 275 try { 276 shutdown = true; 277 } finally { 278 lock.unlock(); 279 } 280 } 281 282 // See sameThreadExecutor javadoc for unusual behavior of this method. 283 /*@Override*/ 284 public List<Runnable> shutdownNow() { 285 shutdown(); 286 return Collections.emptyList(); 287 } 288 289 /*@Override*/ 290 public boolean isTerminated() { 291 lock.lock(); 292 try { 293 return shutdown && runningTasks == 0; 294 } finally { 295 lock.unlock(); 296 } 297 } 298 299 /*@Override*/ 300 public boolean awaitTermination(long timeout, TimeUnit unit) 301 throws InterruptedException { 302 long nanos = unit.toNanos(timeout); 303 lock.lock(); 304 try { 305 for (;;) { 306 if (isTerminated()) { 307 return true; 308 } else if (nanos <= 0) { 309 return false; 310 } else { 311 nanos = termination.awaitNanos(nanos); 312 } 313 } 314 } finally { 315 lock.unlock(); 316 } 317 } 318 319 /** 320 * Checks if the executor has been shut down and increments the running 321 * task count. 322 * 323 * @throws RejectedExecutionException if the executor has been previously 324 * shutdown 325 */ 326 private void startTask() { 327 lock.lock(); 328 try { 329 if (isShutdown()) { 330 throw new RejectedExecutionException("Executor already shutdown"); 331 } 332 runningTasks++; 333 } finally { 334 lock.unlock(); 335 } 336 } 337 338 /** 339 * Decrements the running task count. 340 */ 341 private void endTask() { 342 lock.lock(); 343 try { 344 runningTasks--; 345 if (isTerminated()) { 346 termination.signalAll(); 347 } 348 } finally { 349 lock.unlock(); 350 } 351 } 352 } 353 } 354