1 /* 2 * Written by Doug Lea with assistance from members of JCP JSR-166 3 * Expert Group and released to the public domain, as explained at 4 * http://creativecommons.org/publicdomain/zero/1.0/ 5 */ 6 7 package java.util.concurrent; 8 9 import java.util.ArrayList; 10 import java.util.Arrays; 11 import java.util.Collection; 12 import java.util.Collections; 13 import java.util.List; 14 import java.util.Random; 15 import java.util.concurrent.AbstractExecutorService; 16 import java.util.concurrent.Callable; 17 import java.util.concurrent.ExecutorService; 18 import java.util.concurrent.Future; 19 import java.util.concurrent.RejectedExecutionException; 20 import java.util.concurrent.RunnableFuture; 21 import java.util.concurrent.TimeUnit; 22 import java.util.concurrent.atomic.AtomicInteger; 23 import java.util.concurrent.locks.LockSupport; 24 import java.util.concurrent.locks.ReentrantLock; 25 import java.util.concurrent.locks.Condition; 26 import libcore.util.SneakyThrow; 27 28 // BEGIN android-note 29 // removed security manager docs 30 // END android-note 31 32 /** 33 * An {@link ExecutorService} for running {@link ForkJoinTask}s. 34 * A {@code ForkJoinPool} provides the entry point for submissions 35 * from non-{@code ForkJoinTask} clients, as well as management and 36 * monitoring operations. 37 * 38 * <p>A {@code ForkJoinPool} differs from other kinds of {@link 39 * ExecutorService} mainly by virtue of employing 40 * <em>work-stealing</em>: all threads in the pool attempt to find and 41 * execute subtasks created by other active tasks (eventually blocking 42 * waiting for work if none exist). This enables efficient processing 43 * when most tasks spawn other subtasks (as do most {@code 44 * ForkJoinTask}s). When setting <em>asyncMode</em> to true in 45 * constructors, {@code ForkJoinPool}s may also be appropriate for use 46 * with event-style tasks that are never joined. 
47 * 48 * <p>A {@code ForkJoinPool} is constructed with a given target 49 * parallelism level; by default, equal to the number of available 50 * processors. The pool attempts to maintain enough active (or 51 * available) threads by dynamically adding, suspending, or resuming 52 * internal worker threads, even if some tasks are stalled waiting to 53 * join others. However, no such adjustments are guaranteed in the 54 * face of blocked IO or other unmanaged synchronization. The nested 55 * {@link ManagedBlocker} interface enables extension of the kinds of 56 * synchronization accommodated. 57 * 58 * <p>In addition to execution and lifecycle control methods, this 59 * class provides status check methods (for example 60 * {@link #getStealCount}) that are intended to aid in developing, 61 * tuning, and monitoring fork/join applications. Also, method 62 * {@link #toString} returns indications of pool state in a 63 * convenient form for informal monitoring. 64 * 65 * <p> As is the case with other ExecutorServices, there are three 66 * main task execution methods summarized in the following 67 * table. These are designed to be used by clients not already engaged 68 * in fork/join computations in the current pool. The main forms of 69 * these methods accept instances of {@code ForkJoinTask}, but 70 * overloaded forms also allow mixed execution of plain {@code 71 * Runnable}- or {@code Callable}- based activities as well. However, 72 * tasks that are already executing in a pool should normally 73 * <em>NOT</em> use these pool execution methods, but instead use the 74 * within-computation forms listed in the table. 
 *
 * <table BORDER CELLPADDING=3 CELLSPACING=1>
 *  <tr>
 *    <td></td>
 *    <td ALIGN=CENTER> <b>Call from non-fork/join clients</b></td>
 *    <td ALIGN=CENTER> <b>Call from within fork/join computations</b></td>
 *  </tr>
 *  <tr>
 *    <td> <b>Arrange async execution</b></td>
 *    <td> {@link #execute(ForkJoinTask)}</td>
 *    <td> {@link ForkJoinTask#fork}</td>
 *  </tr>
 *  <tr>
 *    <td> <b>Await and obtain result</b></td>
 *    <td> {@link #invoke(ForkJoinTask)}</td>
 *    <td> {@link ForkJoinTask#invoke}</td>
 *  </tr>
 *  <tr>
 *    <td> <b>Arrange exec and obtain Future</b></td>
 *    <td> {@link #submit(ForkJoinTask)}</td>
 *    <td> {@link ForkJoinTask#fork} (ForkJoinTasks <em>are</em> Futures)</td>
 *  </tr>
 * </table>
 *
 * <p><b>Sample Usage.</b> Normally a single {@code ForkJoinPool} is
 * used for all parallel task execution in a program or subsystem.
 * Otherwise, use would not usually outweigh the construction and
 * bookkeeping overhead of creating a large set of threads. For
 * example, a common pool could be used for the {@code SortTasks}
 * illustrated in {@link RecursiveAction}. Because {@code
 * ForkJoinPool} uses threads in {@linkplain java.lang.Thread#isDaemon
 * daemon} mode, there is typically no need to explicitly {@link
 * #shutdown} such a pool upon program exit.
 *
 * <pre> {@code
 * static final ForkJoinPool mainPool = new ForkJoinPool();
 * ...
 * public void sort(long[] array) {
 *   mainPool.invoke(new SortTask(array, 0, array.length));
 * }}</pre>
 *
 * <p><b>Implementation notes</b>: This implementation restricts the
 * maximum number of running threads to 32767. Attempts to create
 * pools with greater than the maximum number result in
 * {@code IllegalArgumentException}.
120 * 121 * <p>This implementation rejects submitted tasks (that is, by throwing 122 * {@link RejectedExecutionException}) only when the pool is shut down 123 * or internal resources have been exhausted. 124 * 125 * @since 1.7 126 * @hide 127 * @author Doug Lea 128 */ 129 public class ForkJoinPool extends AbstractExecutorService { 130 131 /* 132 * Implementation Overview 133 * 134 * This class provides the central bookkeeping and control for a 135 * set of worker threads: Submissions from non-FJ threads enter 136 * into a submission queue. Workers take these tasks and typically 137 * split them into subtasks that may be stolen by other workers. 138 * Preference rules give first priority to processing tasks from 139 * their own queues (LIFO or FIFO, depending on mode), then to 140 * randomized FIFO steals of tasks in other worker queues, and 141 * lastly to new submissions. 142 * 143 * The main throughput advantages of work-stealing stem from 144 * decentralized control -- workers mostly take tasks from 145 * themselves or each other. We cannot negate this in the 146 * implementation of other management responsibilities. The main 147 * tactic for avoiding bottlenecks is packing nearly all 148 * essentially atomic control state into a single 64bit volatile 149 * variable ("ctl"). This variable is read on the order of 10-100 150 * times as often as it is modified (always via CAS). (There is 151 * some additional control state, for example variable "shutdown" 152 * for which we can cope with uncoordinated updates.) This 153 * streamlines synchronization and control at the expense of messy 154 * constructions needed to repack status bits upon updates. 155 * Updates tend not to contend with each other except during 156 * bursts while submitted tasks begin or end. In some cases when 157 * they do contend, threads can instead do something else 158 * (usually, scan for tasks) until contention subsides. 
159 * 160 * To enable packing, we restrict maximum parallelism to (1<<15)-1 161 * (which is far in excess of normal operating range) to allow 162 * ids, counts, and their negations (used for thresholding) to fit 163 * into 16bit fields. 164 * 165 * Recording Workers. Workers are recorded in the "workers" array 166 * that is created upon pool construction and expanded if (rarely) 167 * necessary. This is an array as opposed to some other data 168 * structure to support index-based random steals by workers. 169 * Updates to the array recording new workers and unrecording 170 * terminated ones are protected from each other by a seqLock 171 * (scanGuard) but the array is otherwise concurrently readable, 172 * and accessed directly by workers. To simplify index-based 173 * operations, the array size is always a power of two, and all 174 * readers must tolerate null slots. To avoid flailing during 175 * start-up, the array is presized to hold twice #parallelism 176 * workers (which is unlikely to need further resizing during 177 * execution). But to avoid dealing with so many null slots, 178 * variable scanGuard includes a mask for the nearest power of two 179 * that contains all current workers. All worker thread creation 180 * is on-demand, triggered by task submissions, replacement of 181 * terminated workers, and/or compensation for blocked 182 * workers. However, all other support code is set up to work with 183 * other policies. To ensure that we do not hold on to worker 184 * references that would prevent GC, ALL accesses to workers are 185 * via indices into the workers array (which is one source of some 186 * of the messy code constructions here). In essence, the workers 187 * array serves as a weak reference mechanism. Thus for example 188 * the wait queue field of ctl stores worker indices, not worker 189 * references. Access to the workers in associated methods (for 190 * example signalWork) must both index-check and null-check the 191 * IDs. 
All such accesses ignore bad IDs by returning out early 192 * from what they are doing, since this can only be associated 193 * with termination, in which case it is OK to give up. 194 * 195 * All uses of the workers array, as well as queue arrays, check 196 * that the array is non-null (even if previously non-null). This 197 * allows nulling during termination, which is currently not 198 * necessary, but remains an option for resource-revocation-based 199 * shutdown schemes. 200 * 201 * Wait Queuing. Unlike HPC work-stealing frameworks, we cannot 202 * let workers spin indefinitely scanning for tasks when none can 203 * be found immediately, and we cannot start/resume workers unless 204 * there appear to be tasks available. On the other hand, we must 205 * quickly prod them into action when new tasks are submitted or 206 * generated. We park/unpark workers after placing in an event 207 * wait queue when they cannot find work. This "queue" is actually 208 * a simple Treiber stack, headed by the "id" field of ctl, plus a 209 * 15bit counter value to both wake up waiters (by advancing their 210 * count) and avoid ABA effects. Successors are held in worker 211 * field "nextWait". Queuing deals with several intrinsic races, 212 * mainly that a task-producing thread can miss seeing (and 213 * signalling) another thread that gave up looking for work but 214 * has not yet entered the wait queue. We solve this by requiring 215 * a full sweep of all workers both before (in scan()) and after 216 * (in tryAwaitWork()) a newly waiting worker is added to the wait 217 * queue. During a rescan, the worker might release some other 218 * queued worker rather than itself, which has the same net 219 * effect. Because enqueued workers may actually be rescanning 220 * rather than waiting, we set and clear the "parked" field of 221 * ForkJoinWorkerThread to reduce unnecessary calls to unpark. 222 * (Use of the parked field requires a secondary recheck to avoid 223 * missed signals.) 
224 * 225 * Signalling. We create or wake up workers only when there 226 * appears to be at least one task they might be able to find and 227 * execute. When a submission is added or another worker adds a 228 * task to a queue that previously had two or fewer tasks, they 229 * signal waiting workers (or trigger creation of new ones if 230 * fewer than the given parallelism level -- see signalWork). 231 * These primary signals are buttressed by signals during rescans 232 * as well as those performed when a worker steals a task and 233 * notices that there are more tasks too; together these cover the 234 * signals needed in cases when more than two tasks are pushed 235 * but untaken. 236 * 237 * Trimming workers. To release resources after periods of lack of 238 * use, a worker starting to wait when the pool is quiescent will 239 * time out and terminate if the pool has remained quiescent for 240 * SHRINK_RATE nanosecs. This will slowly propagate, eventually 241 * terminating all workers after long periods of non-use. 242 * 243 * Submissions. External submissions are maintained in an 244 * array-based queue that is structured identically to 245 * ForkJoinWorkerThread queues except for the use of 246 * submissionLock in method addSubmission. Unlike the case for 247 * worker queues, multiple external threads can add new 248 * submissions, so adding requires a lock. 249 * 250 * Compensation. Beyond work-stealing support and lifecycle 251 * control, the main responsibility of this framework is to take 252 * actions when one worker is waiting to join a task stolen (or 253 * always held by) another. Because we are multiplexing many 254 * tasks on to a pool of workers, we can't just let them block (as 255 * in Thread.join). 
We also cannot just reassign the joiner's 256 * run-time stack with another and replace it later, which would 257 * be a form of "continuation", that even if possible is not 258 * necessarily a good idea since we sometimes need both an 259 * unblocked task and its continuation to progress. Instead we 260 * combine two tactics: 261 * 262 * Helping: Arranging for the joiner to execute some task that it 263 * would be running if the steal had not occurred. Method 264 * ForkJoinWorkerThread.joinTask tracks joining->stealing 265 * links to try to find such a task. 266 * 267 * Compensating: Unless there are already enough live threads, 268 * method tryPreBlock() may create or re-activate a spare 269 * thread to compensate for blocked joiners until they 270 * unblock. 271 * 272 * The ManagedBlocker extension API can't use helping so relies 273 * only on compensation in method awaitBlocker. 274 * 275 * It is impossible to keep exactly the target parallelism number 276 * of threads running at any given time. Determining the 277 * existence of conservatively safe helping targets, the 278 * availability of already-created spares, and the apparent need 279 * to create new spares are all racy and require heuristic 280 * guidance, so we rely on multiple retries of each. Currently, 281 * in keeping with on-demand signalling policy, we compensate only 282 * if blocking would leave less than one active (non-waiting, 283 * non-blocked) worker. Additionally, to avoid some false alarms 284 * due to GC, lagging counters, system activity, etc, compensated 285 * blocking for joins is only attempted after rechecks stabilize 286 * (retries are interspersed with Thread.yield, for good 287 * citizenship). The variable blockedCount, incremented before 288 * blocking and decremented after, is sometimes needed to 289 * distinguish cases of waiting for work vs blocking on joins or 290 * other managed sync. Both cases are equivalent for most pool 291 * control, so we can update non-atomically. 
(Additionally,
     * contention on blockedCount alleviates some contention on ctl).
     *
     * Shutdown and Termination. A call to shutdownNow atomically sets
     * the ctl stop bit and then (non-atomically) sets each worker's
     * "terminate" status, cancels all unprocessed tasks, and wakes up
     * all waiting workers. Detecting whether termination should
     * commence after a non-abrupt shutdown() call requires more work
     * and bookkeeping. We need consensus about quiescence (i.e., that
     * there is no more work) which is reflected in active counts so
     * long as there are no current blockers, as well as possible
     * re-evaluations during independent changes in blocking or
     * quiescing workers.
     *
     * Style notes: There is a lot of representation-level coupling
     * among classes ForkJoinPool, ForkJoinWorkerThread, and
     * ForkJoinTask. Most fields of ForkJoinWorkerThread maintain
     * data structures managed by ForkJoinPool, so are directly
     * accessed. Conversely we allow access to "workers" array by
     * workers, and direct access to ForkJoinTask.status by both
     * ForkJoinPool and ForkJoinWorkerThread. There is little point
     * trying to reduce this, since any associated future changes in
     * representations will need to be accompanied by algorithmic
     * changes anyway. All together, these low-level implementation
     * choices produce as much as a factor of 4 performance
     * improvement compared to naive implementations, and enable the
     * processing of billions of tasks per second, at the expense of
     * some ugliness.
     *
     * Methods signalWork() and scan() are the main bottlenecks so are
     * especially heavily micro-optimized/mangled. There are lots of
     * inline assignments (of form "while ((local = field) != 0)")
     * which are usually the simplest way to ensure the required read
     * orderings (which are sometimes critical).
This leads to a 325 * "C"-like style of listing declarations of these locals at the 326 * heads of methods or blocks. There are several occurrences of 327 * the unusual "do {} while (!cas...)" which is the simplest way 328 * to force an update of a CAS'ed variable. There are also other 329 * coding oddities that help some methods perform reasonably even 330 * when interpreted (not compiled). 331 * 332 * The order of declarations in this file is: (1) declarations of 333 * statics (2) fields (along with constants used when unpacking 334 * some of them), listed in an order that tends to reduce 335 * contention among them a bit under most JVMs. (3) internal 336 * control methods (4) callbacks and other support for 337 * ForkJoinTask and ForkJoinWorkerThread classes, (5) exported 338 * methods (plus a few little helpers). (6) static block 339 * initializing all statics in a minimally dependent order. 340 */ 341 342 /** 343 * Factory for creating new {@link ForkJoinWorkerThread}s. 344 * A {@code ForkJoinWorkerThreadFactory} must be defined and used 345 * for {@code ForkJoinWorkerThread} subclasses that extend base 346 * functionality or initialize threads with different contexts. 347 */ 348 public static interface ForkJoinWorkerThreadFactory { 349 /** 350 * Returns a new worker thread operating in the given pool. 351 * 352 * @param pool the pool this thread works in 353 * @throws NullPointerException if the pool is null 354 */ 355 public ForkJoinWorkerThread newThread(ForkJoinPool pool); 356 } 357 358 /** 359 * Default ForkJoinWorkerThreadFactory implementation; creates a 360 * new ForkJoinWorkerThread. 361 */ 362 static class DefaultForkJoinWorkerThreadFactory 363 implements ForkJoinWorkerThreadFactory { 364 public ForkJoinWorkerThread newThread(ForkJoinPool pool) { 365 return new ForkJoinWorkerThread(pool); 366 } 367 } 368 369 /** 370 * Creates a new ForkJoinWorkerThread. This factory is used unless 371 * overridden in ForkJoinPool constructors. 
372 */ 373 public static final ForkJoinWorkerThreadFactory 374 defaultForkJoinWorkerThreadFactory; 375 376 /** 377 * Permission required for callers of methods that may start or 378 * kill threads. 379 */ 380 private static final RuntimePermission modifyThreadPermission; 381 382 /** 383 * If there is a security manager, makes sure caller has 384 * permission to modify threads. 385 */ 386 private static void checkPermission() { 387 SecurityManager security = System.getSecurityManager(); 388 if (security != null) 389 security.checkPermission(modifyThreadPermission); 390 } 391 392 /** 393 * Generator for assigning sequence numbers as pool names. 394 */ 395 private static final AtomicInteger poolNumberGenerator; 396 397 /** 398 * Generator for initial random seeds for worker victim 399 * selection. This is used only to create initial seeds. Random 400 * steals use a cheaper xorshift generator per steal attempt. We 401 * don't expect much contention on seedGenerator, so just use a 402 * plain Random. 403 */ 404 static final Random workerSeedGenerator; 405 406 /** 407 * Array holding all worker threads in the pool. Initialized upon 408 * construction. Array size must be a power of two. Updates and 409 * replacements are protected by scanGuard, but the array is 410 * always kept in a consistent enough state to be randomly 411 * accessed without locking by workers performing work-stealing, 412 * as well as other traversal-based methods in this class, so long 413 * as reads memory-acquire by first reading ctl. All readers must 414 * tolerate that some array slots may be null. 415 */ 416 ForkJoinWorkerThread[] workers; 417 418 /** 419 * Initial size for submission queue array. Must be a power of 420 * two. In many applications, these always stay small so we use a 421 * small initial cap. 422 */ 423 private static final int INITIAL_QUEUE_CAPACITY = 8; 424 425 /** 426 * Maximum size for submission queue array. 
Must be a power of two 427 * less than or equal to 1 << (31 - width of array entry) to 428 * ensure lack of index wraparound, but is capped at a lower 429 * value to help users trap runaway computations. 430 */ 431 private static final int MAXIMUM_QUEUE_CAPACITY = 1 << 24; // 16M 432 433 /** 434 * Array serving as submission queue. Initialized upon construction. 435 */ 436 private ForkJoinTask<?>[] submissionQueue; 437 438 /** 439 * Lock protecting submissions array for addSubmission 440 */ 441 private final ReentrantLock submissionLock; 442 443 /** 444 * Condition for awaitTermination, using submissionLock for 445 * convenience. 446 */ 447 private final Condition termination; 448 449 /** 450 * Creation factory for worker threads. 451 */ 452 private final ForkJoinWorkerThreadFactory factory; 453 454 /** 455 * The uncaught exception handler used when any worker abruptly 456 * terminates. 457 */ 458 final Thread.UncaughtExceptionHandler ueh; 459 460 /** 461 * Prefix for assigning names to worker threads 462 */ 463 private final String workerNamePrefix; 464 465 /** 466 * Sum of per-thread steal counts, updated only when threads are 467 * idle or terminating. 468 */ 469 private volatile long stealCount; 470 471 /** 472 * Main pool control -- a long packed with: 473 * AC: Number of active running workers minus target parallelism (16 bits) 474 * TC: Number of total workers minus target parallelism (16 bits) 475 * ST: true if pool is terminating (1 bit) 476 * EC: the wait count of top waiting thread (15 bits) 477 * ID: ~poolIndex of top of Treiber stack of waiting threads (16 bits) 478 * 479 * When convenient, we can extract the upper 32 bits of counts and 480 * the lower 32 bits of queue state, u = (int)(ctl >>> 32) and e = 481 * (int)ctl. The ec field is never accessed alone, but always 482 * together with id and st. 
The offsets of counts by the target 483 * parallelism and the positionings of fields makes it possible to 484 * perform the most common checks via sign tests of fields: When 485 * ac is negative, there are not enough active workers, when tc is 486 * negative, there are not enough total workers, when id is 487 * negative, there is at least one waiting worker, and when e is 488 * negative, the pool is terminating. To deal with these possibly 489 * negative fields, we use casts in and out of "short" and/or 490 * signed shifts to maintain signedness. 491 */ 492 volatile long ctl; 493 494 // bit positions/shifts for fields 495 private static final int AC_SHIFT = 48; 496 private static final int TC_SHIFT = 32; 497 private static final int ST_SHIFT = 31; 498 private static final int EC_SHIFT = 16; 499 500 // bounds 501 private static final int MAX_ID = 0x7fff; // max poolIndex 502 private static final int SMASK = 0xffff; // mask short bits 503 private static final int SHORT_SIGN = 1 << 15; 504 private static final int INT_SIGN = 1 << 31; 505 506 // masks 507 private static final long STOP_BIT = 0x0001L << ST_SHIFT; 508 private static final long AC_MASK = ((long)SMASK) << AC_SHIFT; 509 private static final long TC_MASK = ((long)SMASK) << TC_SHIFT; 510 511 // units for incrementing and decrementing 512 private static final long TC_UNIT = 1L << TC_SHIFT; 513 private static final long AC_UNIT = 1L << AC_SHIFT; 514 515 // masks and units for dealing with u = (int)(ctl >>> 32) 516 private static final int UAC_SHIFT = AC_SHIFT - 32; 517 private static final int UTC_SHIFT = TC_SHIFT - 32; 518 private static final int UAC_MASK = SMASK << UAC_SHIFT; 519 private static final int UTC_MASK = SMASK << UTC_SHIFT; 520 private static final int UAC_UNIT = 1 << UAC_SHIFT; 521 private static final int UTC_UNIT = 1 << UTC_SHIFT; 522 523 // masks and units for dealing with e = (int)ctl 524 private static final int E_MASK = 0x7fffffff; // no STOP_BIT 525 private static final int EC_UNIT = 1 << 
EC_SHIFT; 526 527 /** 528 * The target parallelism level. 529 */ 530 final int parallelism; 531 532 /** 533 * Index (mod submission queue length) of next element to take 534 * from submission queue. Usage is identical to that for 535 * per-worker queues -- see ForkJoinWorkerThread internal 536 * documentation. 537 */ 538 volatile int queueBase; 539 540 /** 541 * Index (mod submission queue length) of next element to add 542 * in submission queue. Usage is identical to that for 543 * per-worker queues -- see ForkJoinWorkerThread internal 544 * documentation. 545 */ 546 int queueTop; 547 548 /** 549 * True when shutdown() has been called. 550 */ 551 volatile boolean shutdown; 552 553 /** 554 * True if use local fifo, not default lifo, for local polling. 555 * Read by, and replicated by ForkJoinWorkerThreads. 556 */ 557 final boolean locallyFifo; 558 559 /** 560 * The number of threads in ForkJoinWorkerThreads.helpQuiescePool. 561 * When non-zero, suppresses automatic shutdown when active 562 * counts become zero. 563 */ 564 volatile int quiescerCount; 565 566 /** 567 * The number of threads blocked in join. 568 */ 569 volatile int blockedCount; 570 571 /** 572 * Counter for worker Thread names (unrelated to their poolIndex) 573 */ 574 private volatile int nextWorkerNumber; 575 576 /** 577 * The index for the next created worker. Accessed under scanGuard. 578 */ 579 private int nextWorkerIndex; 580 581 /** 582 * SeqLock and index masking for updates to workers array. Locked 583 * when SG_UNIT is set. Unlocking clears bit by adding 584 * SG_UNIT. Staleness of read-only operations can be checked by 585 * comparing scanGuard to value before the reads. The low 16 bits 586 * (i.e, anding with SMASK) hold (the smallest power of two 587 * covering all worker indices, minus one, and is used to avoid 588 * dealing with large numbers of null slots when the workers array 589 * is overallocated. 
590 */ 591 volatile int scanGuard; 592 593 private static final int SG_UNIT = 1 << 16; 594 595 /** 596 * The wakeup interval (in nanoseconds) for a worker waiting for a 597 * task when the pool is quiescent to instead try to shrink the 598 * number of workers. The exact value does not matter too 599 * much. It must be short enough to release resources during 600 * sustained periods of idleness, but not so short that threads 601 * are continually re-created. 602 */ 603 private static final long SHRINK_RATE = 604 4L * 1000L * 1000L * 1000L; // 4 seconds 605 606 /** 607 * Top-level loop for worker threads: On each step: if the 608 * previous step swept through all queues and found no tasks, or 609 * there are excess threads, then possibly blocks. Otherwise, 610 * scans for and, if found, executes a task. Returns when pool 611 * and/or worker terminate. 612 * 613 * @param w the worker 614 */ 615 final void work(ForkJoinWorkerThread w) { 616 boolean swept = false; // true on empty scans 617 long c; 618 while (!w.terminate && (int)(c = ctl) >= 0) { 619 int a; // active count 620 if (!swept && (a = (int)(c >> AC_SHIFT)) <= 0) 621 swept = scan(w, a); 622 else if (tryAwaitWork(w, c)) 623 swept = false; 624 } 625 } 626 627 // Signalling 628 629 /** 630 * Wakes up or creates a worker. 631 */ 632 final void signalWork() { 633 /* 634 * The while condition is true if: (there is are too few total 635 * workers OR there is at least one waiter) AND (there are too 636 * few active workers OR the pool is terminating). The value 637 * of e distinguishes the remaining cases: zero (no waiters) 638 * for create, negative if terminating (in which case do 639 * nothing), else release a waiter. The secondary checks for 640 * release (non-null array etc) can fail if the pool begins 641 * terminating after the test, and don't impose any added cost 642 * because JVMs must perform null and bounds checks anyway. 
643 */ 644 long c; int e, u; 645 while ((((e = (int)(c = ctl)) | (u = (int)(c >>> 32))) & 646 (INT_SIGN|SHORT_SIGN)) == (INT_SIGN|SHORT_SIGN) && e >= 0) { 647 if (e > 0) { // release a waiting worker 648 int i; ForkJoinWorkerThread w; ForkJoinWorkerThread[] ws; 649 if ((ws = workers) == null || 650 (i = ~e & SMASK) >= ws.length || 651 (w = ws[i]) == null) 652 break; 653 long nc = (((long)(w.nextWait & E_MASK)) | 654 ((long)(u + UAC_UNIT) << 32)); 655 if (w.eventCount == e && 656 UNSAFE.compareAndSwapLong(this, ctlOffset, c, nc)) { 657 w.eventCount = (e + EC_UNIT) & E_MASK; 658 if (w.parked) 659 UNSAFE.unpark(w); 660 break; 661 } 662 } 663 else if (UNSAFE.compareAndSwapLong 664 (this, ctlOffset, c, 665 (long)(((u + UTC_UNIT) & UTC_MASK) | 666 ((u + UAC_UNIT) & UAC_MASK)) << 32)) { 667 addWorker(); 668 break; 669 } 670 } 671 } 672 673 /** 674 * Variant of signalWork to help release waiters on rescans. 675 * Tries once to release a waiter if active count < 0. 676 * 677 * @return false if failed due to contention, else true 678 */ 679 private boolean tryReleaseWaiter() { 680 long c; int e, i; ForkJoinWorkerThread w; ForkJoinWorkerThread[] ws; 681 if ((e = (int)(c = ctl)) > 0 && 682 (int)(c >> AC_SHIFT) < 0 && 683 (ws = workers) != null && 684 (i = ~e & SMASK) < ws.length && 685 (w = ws[i]) != null) { 686 long nc = ((long)(w.nextWait & E_MASK) | 687 ((c + AC_UNIT) & (AC_MASK|TC_MASK))); 688 if (w.eventCount != e || 689 !UNSAFE.compareAndSwapLong(this, ctlOffset, c, nc)) 690 return false; 691 w.eventCount = (e + EC_UNIT) & E_MASK; 692 if (w.parked) 693 UNSAFE.unpark(w); 694 } 695 return true; 696 } 697 698 // Scanning for tasks 699 700 /** 701 * Scans for and, if found, executes one task. Scans start at a 702 * random index of workers array, and randomly select the first 703 * (2*#workers)-1 probes, and then, if all empty, resort to 2 704 * circular sweeps, which is necessary to check quiescence. and 705 * taking a submission only if no stealable tasks were found. 
The steal code inside the loop is a specialized form of
     * ForkJoinWorkerThread.deqTask, followed by bookkeeping to support
     * helpJoinTask and signal propagation. The code for submission
     * queues is almost identical. On each steal, the worker completes
     * not only the task, but also all local tasks that this task may
     * have generated. On detecting staleness or contention when
     * trying to take a task, this method returns without finishing
     * sweep, which allows global state rechecks before retry.
     *
     * @param w the worker
     * @param a the number of active workers
     * @return true if swept all queues without finding a task
     */
    private boolean scan(ForkJoinWorkerThread w, int a) {
        int g = scanGuard; // mask 0 avoids useless scans if only one active
        int m = (parallelism == 1 - a && blockedCount == 0) ? 0 : g & SMASK;
        ForkJoinWorkerThread[] ws = workers;
        if (ws == null || ws.length <= m)         // staleness check
            return false;
        // While j < 0 the probed slot k is re-derived from the xorshift
        // state r on every miss; once j >= 0, k simply steps through
        // consecutive slots (always masked by m).
        for (int r = w.seed, k = r, j = -(m + m); j <= m + m; ++j) {
            ForkJoinTask<?> t; ForkJoinTask<?>[] q; int b, i;
            ForkJoinWorkerThread v = ws[k & m];
            if (v != null && (b = v.queueBase) != v.queueTop &&
                (q = v.queue) != null && (i = (q.length - 1) & b) >= 0) {
                long u = (i << ASHIFT) + ABASE;   // offset of slot i, as handed to UNSAFE
                // Take the task only if base is still b and the CAS to null wins.
                if ((t = q[i]) != null && v.queueBase == b &&
                    UNSAFE.compareAndSwapObject(q, u, t, null)) {
                    int d = (v.queueBase = b + 1) - v.queueTop;
                    v.stealHint = w.poolIndex;
                    if (d != 0)
                        signalWork();             // propagate if nonempty
                    w.execTask(t);
                }
                // Reached whether or not the take succeeded: advance and
                // publish the seed, then report "not a clean sweep".
                r ^= r << 13; r ^= r >>> 17; w.seed = r ^ (r << 5);
                return false;                     // store next seed
            }
            else if (j < 0) {                     // xorshift
                r ^= r << 13; r ^= r >>> 17; k = r ^= r << 5;
            }
            else
                ++k;
        }
        if (scanGuard != g)                       // staleness check
            return false;
        else {                                    // try to take submission
            // Same take protocol as above, applied to the pool's own
            // queueBase/queueTop/submissionQueue fields.
            ForkJoinTask<?> t; ForkJoinTask<?>[] q; int b, i;
            if ((b = queueBase) != queueTop &&
                (q = submissionQueue) != null &&
                (i = (q.length - 1) & b) >= 0) {
                long u = (i << ASHIFT) + ABASE;
                if ((t = q[i]) != null && queueBase == b &&
                    UNSAFE.compareAndSwapObject(q, u, t, null)) {
                    queueBase = b + 1;
                    w.execTask(t);
                }
                return false;                     // queue was nonempty, so not a clean sweep
            }
            return true;                          // all queues empty
        }
    }

    /**
     * Tries to enqueue worker w in wait queue and await change in
     * worker's eventCount.  If the pool is quiescent and there is
     * more than one worker, possibly terminates worker upon exit.
     * Otherwise, before blocking, rescans queues to avoid missed
     * signals.  Upon finding work, releases at least one worker
     * (which may be the current worker). Rescans restart upon
     * detected staleness or failure to release due to
     * contention. Note the unusual conventions about Thread.interrupt
     * here and elsewhere: Because interrupts are used solely to alert
     * threads to check termination, which is checked here anyway, we
     * clear status (using Thread.interrupted) before any call to
     * park, so that park does not immediately return due to status
     * being set via some other unrelated call to interrupt in user
     * code.
*
     * @param w the calling worker
     * @param c the ctl value on entry
     * @return true if waited or another thread was released upon enq
     */
    private boolean tryAwaitWork(ForkJoinWorkerThread w, long c) {
        int v = w.eventCount;
        w.nextWait = (int)c;                      // w's successor record
        // Proposed ctl: low bits from w's eventCount, active count reduced by one unit.
        long nc = (long)(v & E_MASK) | ((c - AC_UNIT) & (AC_MASK|TC_MASK));
        if (ctl != c || !UNSAFE.compareAndSwapLong(this, ctlOffset, c, nc)) {
            long d = ctl; // return true if lost to a deq, to force scan
            return (int)d != (int)c && (d & AC_MASK) >= (c & AC_MASK);
        }
        // Fold this worker's private steal count into the pool-wide total.
        for (int sc = w.stealCount; sc != 0;) {   // accumulate stealCount
            long s = stealCount;
            if (UNSAFE.compareAndSwapLong(this, stealCountOffset, s, s + sc))
                sc = w.stealCount = 0;
            else if (w.eventCount != v)
                return true;                      // update next time
        }
        if ((!shutdown || !tryTerminate(false)) &&
            (int)c != 0 && parallelism + (int)(nc >> AC_SHIFT) == 0 &&
            blockedCount == 0 && quiescerCount == 0)
            idleAwaitWork(w, nc, c, v);           // quiescent
        // Alternate between a rescan pass and parking until eventCount changes.
        for (boolean rescanned = false;;) {
            if (w.eventCount != v)
                return true;
            if (!rescanned) {
                int g = scanGuard, m = g & SMASK;
                ForkJoinWorkerThread[] ws = workers;
                if (ws != null && m < ws.length) {
                    rescanned = true;
                    for (int i = 0; i <= m; ++i) {
                        ForkJoinWorkerThread u = ws[i];
                        if (u != null) {
                            if (u.queueBase != u.queueTop &&
                                !tryReleaseWaiter())
                                rescanned = false; // contended
                            if (w.eventCount != v)
                                return true;
                        }
                    }
                }
                if (scanGuard != g ||              // stale
                    (queueBase != queueTop && !tryReleaseWaiter()))
                    rescanned = false;
                if (!rescanned)
                    Thread.yield();                // reduce contention
                else
                    Thread.interrupted();          // clear before park
            }
            else {
                w.parked = true;                   // must recheck
                if (w.eventCount != v) {
                    w.parked = false;
                    return true;
                }
                LockSupport.park(this);
                // After waking, force another rescan pass before parking again.
                rescanned = w.parked = false;
            }
        }
    }

    /**
     * If
inactivating worker w has caused pool to become
     * quiescent, check for pool termination, and wait for event
     * for up to SHRINK_RATE nanosecs (rescans are unnecessary in
     * this case because quiescence reflects consensus about lack
     * of work). On timeout, if ctl has not changed, terminate the
     * worker. Upon its termination (see deregisterWorker), it may
     * wake up another worker to possibly repeat this process.
     *
     * @param w the calling worker
     * @param currentCtl the ctl value after enqueuing w
     * @param prevCtl the ctl value if w terminated
     * @param v the eventCount w awaits change
     */
    private void idleAwaitWork(ForkJoinWorkerThread w, long currentCtl,
                               long prevCtl, int v) {
        if (w.eventCount == v) {
            if (shutdown)
                tryTerminate(false);
            ForkJoinTask.helpExpungeStaleExceptions(); // help clean weak refs
            // Keep waiting only while nobody else has changed ctl.
            while (ctl == currentCtl) {
                long startTime = System.nanoTime();
                w.parked = true;
                if (w.eventCount == v)             // must recheck
                    LockSupport.parkNanos(this, SHRINK_RATE);
                w.parked = false;
                if (w.eventCount != v)
                    break;                         // signalled
                else if (System.nanoTime() - startTime <
                         SHRINK_RATE - (SHRINK_RATE / 10)) // timing slop
                    Thread.interrupted();          // spurious wakeup
                else if (UNSAFE.compareAndSwapLong(this, ctlOffset,
                                                   currentCtl, prevCtl)) {
                    // Timed out and ctl successfully rolled back to prevCtl:
                    // mark this worker for termination and advance its eventCount.
                    w.terminate = true;            // restore previous
                    w.eventCount = ((int)currentCtl + EC_UNIT) & E_MASK;
                    break;
                }
            }
        }
    }

    // Submissions

    /**
     * Enqueues the given task in the submissionQueue.  Same idea as
     * ForkJoinWorkerThread.pushTask except for use of submissionLock.
*
     * @param t the task
     */
    private void addSubmission(ForkJoinTask<?> t) {
        final ReentrantLock lock = this.submissionLock;
        lock.lock();
        try {
            ForkJoinTask<?>[] q; int s, m;
            if ((q = submissionQueue) != null) {    // ignore if queue removed
                // Offset of slot (queueTop & mask); the task is stored before
                // queueTop is advanced.
                long u = (((s = queueTop) & (m = q.length-1)) << ASHIFT)+ABASE;
                UNSAFE.putOrderedObject(q, u, t);
                queueTop = s + 1;
                if (s - queueBase == m)             // resize once s - queueBase equals the mask
                    growSubmissionQueue();
            }
        } finally {
            lock.unlock();
        }
        signalWork();                               // called after the lock is released
    }

    //  (pollSubmission is defined below with exported methods)

    /**
     * Creates or doubles submissionQueue array.
     * Basically identical to ForkJoinWorkerThread version.
     *
     * @throws RejectedExecutionException if the doubled size would
     *         exceed MAXIMUM_QUEUE_CAPACITY
     */
    private void growSubmissionQueue() {
        ForkJoinTask<?>[] oldQ = submissionQueue;
        int size = oldQ != null ? oldQ.length << 1 : INITIAL_QUEUE_CAPACITY;
        if (size > MAXIMUM_QUEUE_CAPACITY)
            throw new RejectedExecutionException("Queue capacity exceeded");
        if (size < INITIAL_QUEUE_CAPACITY)
            size = INITIAL_QUEUE_CAPACITY;
        ForkJoinTask<?>[] q = submissionQueue = new ForkJoinTask<?>[size];
        int mask = size - 1;
        int top = queueTop;
        int oldMask;
        if (oldQ != null && (oldMask = oldQ.length - 1) >= 0) {
            // Move each remaining element from its old slot to its new slot;
            // a slot is transferred only if the CAS-to-null on the old array
            // succeeds, so an element already taken by another thread is skipped.
            for (int b = queueBase; b != top; ++b) {
                long u = ((b & oldMask) << ASHIFT) + ABASE;
                Object x = UNSAFE.getObjectVolatile(oldQ, u);
                if (x != null && UNSAFE.compareAndSwapObject(oldQ, u, x, null))
                    UNSAFE.putObjectVolatile
                        (q, ((b & mask) << ASHIFT) + ABASE, x);
            }
        }
    }

    // Blocking support

    /**
     * Tries to increment blockedCount, decrement active count
     * (sometimes implicitly) and possibly release or create a
     * compensating worker in preparation for blocking. Fails
     * on contention or termination.
*
     * @return true if the caller can block, else should recheck and retry
     */
    private boolean tryPreBlock() {
        int b = blockedCount;
        if (UNSAFE.compareAndSwapInt(this, blockedCountOffset, b, b + 1)) {
            int pc = parallelism;
            do {
                ForkJoinWorkerThread[] ws; ForkJoinWorkerThread w;
                int e, ac, tc, i;
                long c = ctl;
                int u = (int)(c >>> 32);            // upper half of ctl
                if ((e = (int)c) < 0) {
                    // skip -- terminating
                }
                else if ((ac = (u >> UAC_SHIFT)) <= 0 && e != 0 &&
                         (ws = workers) != null &&
                         (i = ~e & SMASK) < ws.length &&
                         (w = ws[i]) != null) {
                    // Replace the low bits of ctl with w.nextWait, leaving the
                    // AC and TC fields as they were.
                    long nc = ((long)(w.nextWait & E_MASK) |
                               (c & (AC_MASK|TC_MASK)));
                    if (w.eventCount == e &&
                        UNSAFE.compareAndSwapLong(this, ctlOffset, c, nc)) {
                        w.eventCount = (e + EC_UNIT) & E_MASK;
                        if (w.parked)
                            UNSAFE.unpark(w);
                        return true;             // release an idle worker
                    }
                }
                else if ((tc = (short)(u >>> UTC_SHIFT)) >= 0 && ac + pc > 1) {
                    // Subtract one AC_UNIT from the AC field only.
                    long nc = ((c - AC_UNIT) & AC_MASK) | (c & ~AC_MASK);
                    if (UNSAFE.compareAndSwapLong(this, ctlOffset, c, nc))
                        return true;             // no compensation needed
                }
                else if (tc + pc < MAX_ID) {
                    // Add one TC_UNIT to the TC field only, then start a thread.
                    long nc = ((c + TC_UNIT) & TC_MASK) | (c & ~TC_MASK);
                    if (UNSAFE.compareAndSwapLong(this, ctlOffset, c, nc)) {
                        addWorker();
                        return true;            // create a replacement
                    }
                }
                // try to back out on any failure and let caller retry
            } while (!UNSAFE.compareAndSwapInt(this, blockedCountOffset,
                                               b = blockedCount, b - 1));
        }
        return false;
    }

    /**
     * Decrements blockedCount and increments active count.
     */
    private void postBlock() {
        long c;
        // Spin until one AC_UNIT has been added to ctl.
        do {} while (!UNSAFE.compareAndSwapLong(this, ctlOffset,  // no mask
                                                c = ctl, c + AC_UNIT));
        int b;
        // Spin until blockedCount has been decremented.
        do {} while (!UNSAFE.compareAndSwapInt(this, blockedCountOffset,
                                               b = blockedCount, b - 1));
    }

    /**
     * Possibly blocks waiting for the given task to complete, or
     * cancels the task if terminating.
Fails to wait if contended. 1010 * 1011 * @param joinMe the task 1012 */ 1013 final void tryAwaitJoin(ForkJoinTask<?> joinMe) { 1014 Thread.interrupted(); // clear interrupts before checking termination 1015 if (joinMe.status >= 0) { 1016 if (tryPreBlock()) { 1017 joinMe.tryAwaitDone(0L); 1018 postBlock(); 1019 } 1020 else if ((ctl & STOP_BIT) != 0L) 1021 joinMe.cancelIgnoringExceptions(); 1022 } 1023 } 1024 1025 /** 1026 * Possibly blocks the given worker waiting for joinMe to 1027 * complete or timeout. 1028 * 1029 * @param joinMe the task 1030 * @param nanos the wait time for underlying Object.wait 1031 */ 1032 final void timedAwaitJoin(ForkJoinTask<?> joinMe, long nanos) { 1033 while (joinMe.status >= 0) { 1034 Thread.interrupted(); 1035 if ((ctl & STOP_BIT) != 0L) { 1036 joinMe.cancelIgnoringExceptions(); 1037 break; 1038 } 1039 if (tryPreBlock()) { 1040 long last = System.nanoTime(); 1041 while (joinMe.status >= 0) { 1042 long millis = TimeUnit.NANOSECONDS.toMillis(nanos); 1043 if (millis <= 0) 1044 break; 1045 joinMe.tryAwaitDone(millis); 1046 if (joinMe.status < 0) 1047 break; 1048 if ((ctl & STOP_BIT) != 0L) { 1049 joinMe.cancelIgnoringExceptions(); 1050 break; 1051 } 1052 long now = System.nanoTime(); 1053 nanos -= now - last; 1054 last = now; 1055 } 1056 postBlock(); 1057 break; 1058 } 1059 } 1060 } 1061 1062 /** 1063 * If necessary, compensates for blocker, and blocks. 1064 */ 1065 private void awaitBlocker(ManagedBlocker blocker) 1066 throws InterruptedException { 1067 while (!blocker.isReleasable()) { 1068 if (tryPreBlock()) { 1069 try { 1070 do {} while (!blocker.isReleasable() && !blocker.block()); 1071 } finally { 1072 postBlock(); 1073 } 1074 break; 1075 } 1076 } 1077 } 1078 1079 // Creating, registering and deregistring workers 1080 1081 /** 1082 * Tries to create and start a worker; minimally rolls back counts 1083 * on failure. 
*/
    private void addWorker() {
        Throwable ex = null;
        ForkJoinWorkerThread t = null;
        try {
            t = factory.newThread(this);
        } catch (Throwable e) {
            ex = e;
        }
        if (t == null) {  // null or exceptional factory return
            long c;       // adjust counts
            // Subtract one AC_UNIT and one TC_UNIT from their respective
            // ctl fields, leaving the remaining bits untouched.
            do {} while (!UNSAFE.compareAndSwapLong
                         (this, ctlOffset, c = ctl,
                          (((c - AC_UNIT) & AC_MASK) |
                           ((c - TC_UNIT) & TC_MASK) |
                           (c & ~(AC_MASK|TC_MASK)))));
            // Propagate exception if originating from an external caller
            if (!tryTerminate(false) && ex != null &&
                !(Thread.currentThread() instanceof ForkJoinWorkerThread))
                SneakyThrow.sneakyThrow(ex); // android-changed
        }
        else
            t.start();
    }

    /**
     * Callback from ForkJoinWorkerThread constructor to assign a
     * public name
     *
     * @return workerNamePrefix followed by the newly incremented
     *         worker number
     */
    final String nextWorkerName() {
        for (int n;;) {
            // On success n holds the incremented value (the ++n in the
            // argument list), which is the number appended below.
            if (UNSAFE.compareAndSwapInt(this, nextWorkerNumberOffset,
                                         n = nextWorkerNumber, ++n))
                return workerNamePrefix + n;
        }
    }

    /**
     * Callback from ForkJoinWorkerThread constructor to
     * determine its poolIndex and record in workers array.
     *
     * @param w the worker
     * @return the worker's pool index
     */
    final int registerWorker(ForkJoinWorkerThread w) {
        /*
         * In the typical case, a new worker acquires the lock, uses
         * next available index and returns quickly.  Since we should
         * not block callers (ultimately from signalWork or
         * tryPreBlock) waiting for the lock needed to do this, we
         * instead help release other workers while waiting for the
         * lock.
         */
        for (int g;;) {
            ForkJoinWorkerThread[] ws;
            // The SG_UNIT bit of scanGuard acts as the lock: acquire it by CAS.
            if (((g = scanGuard) & SG_UNIT) == 0 &&
                UNSAFE.compareAndSwapInt(this, scanGuardOffset,
                                         g, g | SG_UNIT)) {
                int k = nextWorkerIndex;
                try {
                    if ((ws = workers) != null) { // ignore on shutdown
                        int n = ws.length;
                        if (k < 0 || k >= n || ws[k] != null) {
                            // Hinted slot unusable: find the first free slot,
                            // doubling the array if there is none.
                            for (k = 0; k < n && ws[k] != null; ++k)
                                ;
                            if (k == n)
                                ws = workers = Arrays.copyOf(ws, n << 1);
                        }
                        ws[k] = w;
                        nextWorkerIndex = k + 1;
                        int m = g & SMASK;
                        // Widen the mask when k exceeds it; otherwise add SG_UNIT<<1.
                        g = (k > m) ? ((m << 1) + 1) & SMASK : g + (SG_UNIT<<1);
                    }
                } finally {
                    scanGuard = g;                // write-back releases the lock bit
                }
                return k;
            }
            else if ((ws = workers) != null) { // help release others
                for (ForkJoinWorkerThread u : ws) {
                    if (u != null && u.queueBase != u.queueTop) {
                        if (tryReleaseWaiter())
                            break;
                    }
                }
            }
        }
    }

    /**
     * Final callback from terminating worker.  Removes record of
     * worker from array, and adjusts counts. If pool is shutting
     * down, tries to complete termination.
     *
     * @param w the worker
     * @param ex if non-null (and the pool is not terminating),
     *        signalWork is called afterwards rather than
     *        tryReleaseWaiter
     */
    final void deregisterWorker(ForkJoinWorkerThread w, Throwable ex) {
        int idx = w.poolIndex;
        int sc = w.stealCount;
        int steps = 0;
        // Remove from array, adjust worker counts and collect steal count.
        // We can intermix failed removes or adjusts with steal updates
        do {
            long s, c;
            int g;
            // Step 0 -> 1: clear the workers slot under the scanGuard lock bit.
            if (steps == 0 && ((g = scanGuard) & SG_UNIT) == 0 &&
                UNSAFE.compareAndSwapInt(this, scanGuardOffset,
                                         g, g |= SG_UNIT)) {
                ForkJoinWorkerThread[] ws = workers;
                if (ws != null && idx >= 0 &&
                    idx < ws.length && ws[idx] == w)
                    ws[idx] = null;    // verify
                nextWorkerIndex = idx;
                scanGuard = g + SG_UNIT;
                steps = 1;
            }
            // Step 1 -> 2: subtract one AC_UNIT and one TC_UNIT from ctl.
            if (steps == 1 &&
                UNSAFE.compareAndSwapLong(this, ctlOffset, c = ctl,
                                          (((c - AC_UNIT) & AC_MASK) |
                                           ((c - TC_UNIT) & TC_MASK) |
                                           (c & ~(AC_MASK|TC_MASK)))))
                steps = 2;
            // Independently, add the worker's steal count to the pool total.
            if (sc != 0 &&
                UNSAFE.compareAndSwapLong(this, stealCountOffset,
                                          s = stealCount, s + sc))
                sc = 0;
        } while (steps != 2 || sc != 0);
        if (!tryTerminate(false)) {
            if (ex != null)   // possibly replace if died abnormally
                signalWork();
            else
                tryReleaseWaiter();
        }
    }

    // Shutdown and termination

    /**
     * Possibly initiates and/or completes termination.
*
     * @param now if true, unconditionally terminate, else only
     * if shutdown and empty queue and no active workers
     * @return true if now terminating or terminated
     */
    private boolean tryTerminate(boolean now) {
        long c;
        // Loop until STOP_BIT is observed in ctl (set here or elsewhere).
        while (((c = ctl) & STOP_BIT) == 0) {
            if (!now) {
                if ((int)(c >> AC_SHIFT) != -parallelism)
                    return false;
                if (!shutdown || blockedCount != 0 || quiescerCount != 0 ||
                    queueBase != queueTop) {
                    if (ctl == c) // staleness check
                        return false;
                    continue;     // ctl moved while checking; re-evaluate
                }
            }
            if (UNSAFE.compareAndSwapLong(this, ctlOffset, c, c | STOP_BIT))
                startTerminating();
        }
        if ((short)(c >>> TC_SHIFT) == -parallelism) { // signal when 0 workers
            final ReentrantLock lock = this.submissionLock;
            lock.lock();
            try {
                termination.signalAll();
            } finally {
                lock.unlock();
            }
        }
        return true;
    }

    /**
     * Runs up to three passes through workers: (0) Setting
     * termination status for each worker, followed by wakeups up to
     * queued workers; (1) helping cancel tasks; (2) interrupting
     * lagging threads (likely in external tasks, but possibly also
     * blocked in joins).  Each pass repeats previous steps because of
     * potential lagging thread creation.
     */
    private void startTerminating() {
        cancelSubmissions();
        for (int pass = 0; pass < 3; ++pass) {
            ForkJoinWorkerThread[] ws = workers;
            if (ws != null) {
                for (ForkJoinWorkerThread w : ws) {
                    if (w != null) {
                        w.terminate = true;
                        if (pass > 0) {
                            w.cancelTasks();
                            if (pass > 1 && !w.isInterrupted()) {
                                try {
                                    w.interrupt();
                                } catch (SecurityException ignore) {
                                    // deliberately ignored; later workers are still processed
                                }
                            }
                        }
                    }
                }
                terminateWaiters();
            }
        }
    }

    /**
     * Polls and cancels all submissions. Called only during termination.
*/
    private void cancelSubmissions() {
        while (queueBase != queueTop) {
            ForkJoinTask<?> task = pollSubmission();
            if (task != null) {
                try {
                    task.cancel(false);
                } catch (Throwable ignore) {
                    // deliberately ignored so the remaining submissions are still cancelled
                }
            }
        }
    }

    /**
     * Tries to set the termination status of waiting workers, and
     * then wakes them up (after which they will terminate).
     */
    private void terminateWaiters() {
        ForkJoinWorkerThread[] ws = workers;
        if (ws != null) {
            ForkJoinWorkerThread w; long c; int i, e;
            int n = ws.length;
            // Repeat while the low bits of ctl index a worker whose
            // eventCount matches them.
            while ((i = ~(e = (int)(c = ctl)) & SMASK) < n &&
                   (w = ws[i]) != null && w.eventCount == (e & E_MASK)) {
                // New ctl: low bits from w.nextWait, AC field plus one unit,
                // TC field and STOP_BIT carried over.
                if (UNSAFE.compareAndSwapLong(this, ctlOffset, c,
                                              (long)(w.nextWait & E_MASK) |
                                              ((c + AC_UNIT) & AC_MASK) |
                                              (c & (TC_MASK|STOP_BIT)))) {
                    w.terminate = true;
                    w.eventCount = e + EC_UNIT;
                    if (w.parked)
                        UNSAFE.unpark(w);
                }
            }
        }
    }

    // misc ForkJoinWorkerThread support

    /**
     * Increments or decrements quiescerCount. Needed only to prevent
     * triggering shutdown if a worker is transiently inactive while
     * checking quiescence.
     *
     * @param delta 1 for increment, -1 for decrement
     */
    final void addQuiescerCount(int delta) {
        int c;
        // Spin until the CAS applying delta succeeds.
        do {} while (!UNSAFE.compareAndSwapInt(this, quiescerCountOffset,
                                               c = quiescerCount, c + delta));
    }

    /**
     * Directly increments or decrements active count without queuing.
     * This method is used to transiently assert inactivation while
     * checking quiescence.
     *
     * @param delta 1 for increment, -1 for decrement
     */
    final void addActiveCount(int delta) {
        long d = (long)delta << AC_SHIFT;   // delta positioned at the AC field
        long c;
        // Spin until the CAS applying d to ctl succeeds.
        do {} while (!UNSAFE.compareAndSwapLong(this, ctlOffset,
                                                c = ctl, c + d));
    }

    /**
     * Returns the approximate (non-atomic) number of idle threads per
     * active thread.
1359 */ 1360 final int idlePerActive() { 1361 // Approximate at powers of two for small values, saturate past 4 1362 int p = parallelism; 1363 int a = p + (int)(ctl >> AC_SHIFT); 1364 return (a > (p >>>= 1) ? 0 : 1365 a > (p >>>= 1) ? 1 : 1366 a > (p >>>= 1) ? 2 : 1367 a > (p >>>= 1) ? 4 : 1368 8); 1369 } 1370 1371 // Exported methods 1372 1373 // Constructors 1374 1375 /** 1376 * Creates a {@code ForkJoinPool} with parallelism equal to {@link 1377 * java.lang.Runtime#availableProcessors}, using the {@linkplain 1378 * #defaultForkJoinWorkerThreadFactory default thread factory}, 1379 * no UncaughtExceptionHandler, and non-async LIFO processing mode. 1380 */ 1381 public ForkJoinPool() { 1382 this(Runtime.getRuntime().availableProcessors(), 1383 defaultForkJoinWorkerThreadFactory, null, false); 1384 } 1385 1386 /** 1387 * Creates a {@code ForkJoinPool} with the indicated parallelism 1388 * level, the {@linkplain 1389 * #defaultForkJoinWorkerThreadFactory default thread factory}, 1390 * no UncaughtExceptionHandler, and non-async LIFO processing mode. 1391 * 1392 * @param parallelism the parallelism level 1393 * @throws IllegalArgumentException if parallelism less than or 1394 * equal to zero, or greater than implementation limit 1395 */ 1396 public ForkJoinPool(int parallelism) { 1397 this(parallelism, defaultForkJoinWorkerThreadFactory, null, false); 1398 } 1399 1400 /** 1401 * Creates a {@code ForkJoinPool} with the given parameters. 1402 * 1403 * @param parallelism the parallelism level. For default value, 1404 * use {@link java.lang.Runtime#availableProcessors}. 1405 * @param factory the factory for creating new threads. For default value, 1406 * use {@link #defaultForkJoinWorkerThreadFactory}. 1407 * @param handler the handler for internal worker threads that 1408 * terminate due to unrecoverable errors encountered while executing 1409 * tasks. For default value, use {@code null}. 
1410 * @param asyncMode if true, 1411 * establishes local first-in-first-out scheduling mode for forked 1412 * tasks that are never joined. This mode may be more appropriate 1413 * than default locally stack-based mode in applications in which 1414 * worker threads only process event-style asynchronous tasks. 1415 * For default value, use {@code false}. 1416 * @throws IllegalArgumentException if parallelism less than or 1417 * equal to zero, or greater than implementation limit 1418 * @throws NullPointerException if the factory is null 1419 */ 1420 public ForkJoinPool(int parallelism, 1421 ForkJoinWorkerThreadFactory factory, 1422 Thread.UncaughtExceptionHandler handler, 1423 boolean asyncMode) { 1424 checkPermission(); 1425 if (factory == null) 1426 throw new NullPointerException(); 1427 if (parallelism <= 0 || parallelism > MAX_ID) 1428 throw new IllegalArgumentException(); 1429 this.parallelism = parallelism; 1430 this.factory = factory; 1431 this.ueh = handler; 1432 this.locallyFifo = asyncMode; 1433 long np = (long)(-parallelism); // offset ctl counts 1434 this.ctl = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK); 1435 this.submissionQueue = new ForkJoinTask<?>[INITIAL_QUEUE_CAPACITY]; 1436 // initialize workers array with room for 2*parallelism if possible 1437 int n = parallelism << 1; 1438 if (n >= MAX_ID) 1439 n = MAX_ID; 1440 else { // See Hackers Delight, sec 3.2, where n < (1 << 16) 1441 n |= n >>> 1; n |= n >>> 2; n |= n >>> 4; n |= n >>> 8; 1442 } 1443 workers = new ForkJoinWorkerThread[n + 1]; 1444 this.submissionLock = new ReentrantLock(); 1445 this.termination = submissionLock.newCondition(); 1446 StringBuilder sb = new StringBuilder("ForkJoinPool-"); 1447 sb.append(poolNumberGenerator.incrementAndGet()); 1448 sb.append("-worker-"); 1449 this.workerNamePrefix = sb.toString(); 1450 } 1451 1452 // Execution methods 1453 1454 /** 1455 * Performs the given task, returning its result upon completion. 
1456 * If the computation encounters an unchecked Exception or Error, 1457 * it is rethrown as the outcome of this invocation. Rethrown 1458 * exceptions behave in the same way as regular exceptions, but, 1459 * when possible, contain stack traces (as displayed for example 1460 * using {@code ex.printStackTrace()}) of both the current thread 1461 * as well as the thread actually encountering the exception; 1462 * minimally only the latter. 1463 * 1464 * @param task the task 1465 * @return the task's result 1466 * @throws NullPointerException if the task is null 1467 * @throws RejectedExecutionException if the task cannot be 1468 * scheduled for execution 1469 */ 1470 public <T> T invoke(ForkJoinTask<T> task) { 1471 Thread t = Thread.currentThread(); 1472 if (task == null) 1473 throw new NullPointerException(); 1474 if (shutdown) 1475 throw new RejectedExecutionException(); 1476 if ((t instanceof ForkJoinWorkerThread) && 1477 ((ForkJoinWorkerThread)t).pool == this) 1478 return task.invoke(); // bypass submit if in same pool 1479 else { 1480 addSubmission(task); 1481 return task.join(); 1482 } 1483 } 1484 1485 /** 1486 * Unless terminating, forks task if within an ongoing FJ 1487 * computation in the current pool, else submits as external task. 1488 */ 1489 private <T> void forkOrSubmit(ForkJoinTask<T> task) { 1490 ForkJoinWorkerThread w; 1491 Thread t = Thread.currentThread(); 1492 if (shutdown) 1493 throw new RejectedExecutionException(); 1494 if ((t instanceof ForkJoinWorkerThread) && 1495 (w = (ForkJoinWorkerThread)t).pool == this) 1496 w.pushTask(task); 1497 else 1498 addSubmission(task); 1499 } 1500 1501 /** 1502 * Arranges for (asynchronous) execution of the given task. 
1503 * 1504 * @param task the task 1505 * @throws NullPointerException if the task is null 1506 * @throws RejectedExecutionException if the task cannot be 1507 * scheduled for execution 1508 */ 1509 public void execute(ForkJoinTask<?> task) { 1510 if (task == null) 1511 throw new NullPointerException(); 1512 forkOrSubmit(task); 1513 } 1514 1515 // AbstractExecutorService methods 1516 1517 /** 1518 * @throws NullPointerException if the task is null 1519 * @throws RejectedExecutionException if the task cannot be 1520 * scheduled for execution 1521 */ 1522 public void execute(Runnable task) { 1523 if (task == null) 1524 throw new NullPointerException(); 1525 ForkJoinTask<?> job; 1526 if (task instanceof ForkJoinTask<?>) // avoid re-wrap 1527 job = (ForkJoinTask<?>) task; 1528 else 1529 job = ForkJoinTask.adapt(task, null); 1530 forkOrSubmit(job); 1531 } 1532 1533 /** 1534 * Submits a ForkJoinTask for execution. 1535 * 1536 * @param task the task to submit 1537 * @return the task 1538 * @throws NullPointerException if the task is null 1539 * @throws RejectedExecutionException if the task cannot be 1540 * scheduled for execution 1541 */ 1542 public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task) { 1543 if (task == null) 1544 throw new NullPointerException(); 1545 forkOrSubmit(task); 1546 return task; 1547 } 1548 1549 /** 1550 * @throws NullPointerException if the task is null 1551 * @throws RejectedExecutionException if the task cannot be 1552 * scheduled for execution 1553 */ 1554 public <T> ForkJoinTask<T> submit(Callable<T> task) { 1555 if (task == null) 1556 throw new NullPointerException(); 1557 ForkJoinTask<T> job = ForkJoinTask.adapt(task); 1558 forkOrSubmit(job); 1559 return job; 1560 } 1561 1562 /** 1563 * @throws NullPointerException if the task is null 1564 * @throws RejectedExecutionException if the task cannot be 1565 * scheduled for execution 1566 */ 1567 public <T> ForkJoinTask<T> submit(Runnable task, T result) { 1568 if (task == null) 1569 throw new 
NullPointerException(); 1570 ForkJoinTask<T> job = ForkJoinTask.adapt(task, result); 1571 forkOrSubmit(job); 1572 return job; 1573 } 1574 1575 /** 1576 * @throws NullPointerException if the task is null 1577 * @throws RejectedExecutionException if the task cannot be 1578 * scheduled for execution 1579 */ 1580 public ForkJoinTask<?> submit(Runnable task) { 1581 if (task == null) 1582 throw new NullPointerException(); 1583 ForkJoinTask<?> job; 1584 if (task instanceof ForkJoinTask<?>) // avoid re-wrap 1585 job = (ForkJoinTask<?>) task; 1586 else 1587 job = ForkJoinTask.adapt(task, null); 1588 forkOrSubmit(job); 1589 return job; 1590 } 1591 1592 /** 1593 * @throws NullPointerException {@inheritDoc} 1594 * @throws RejectedExecutionException {@inheritDoc} 1595 */ 1596 public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) { 1597 ArrayList<ForkJoinTask<T>> forkJoinTasks = 1598 new ArrayList<ForkJoinTask<T>>(tasks.size()); 1599 for (Callable<T> task : tasks) 1600 forkJoinTasks.add(ForkJoinTask.adapt(task)); 1601 invoke(new InvokeAll<T>(forkJoinTasks)); 1602 1603 @SuppressWarnings({"unchecked", "rawtypes"}) 1604 List<Future<T>> futures = (List<Future<T>>) (List) forkJoinTasks; 1605 return futures; 1606 } 1607 1608 static final class InvokeAll<T> extends RecursiveAction { 1609 final ArrayList<ForkJoinTask<T>> tasks; 1610 InvokeAll(ArrayList<ForkJoinTask<T>> tasks) { this.tasks = tasks; } 1611 public void compute() { 1612 try { invokeAll(tasks); } 1613 catch (Exception ignore) {} 1614 } 1615 private static final long serialVersionUID = -7914297376763021607L; 1616 } 1617 1618 /** 1619 * Returns the factory used for constructing new workers. 1620 * 1621 * @return the factory used for constructing new workers 1622 */ 1623 public ForkJoinWorkerThreadFactory getFactory() { 1624 return factory; 1625 } 1626 1627 /** 1628 * Returns the handler for internal worker threads that terminate 1629 * due to unrecoverable errors encountered while executing tasks. 
1630 * 1631 * @return the handler, or {@code null} if none 1632 */ 1633 public Thread.UncaughtExceptionHandler getUncaughtExceptionHandler() { 1634 return ueh; 1635 } 1636 1637 /** 1638 * Returns the targeted parallelism level of this pool. 1639 * 1640 * @return the targeted parallelism level of this pool 1641 */ 1642 public int getParallelism() { 1643 return parallelism; 1644 } 1645 1646 /** 1647 * Returns the number of worker threads that have started but not 1648 * yet terminated. The result returned by this method may differ 1649 * from {@link #getParallelism} when threads are created to 1650 * maintain parallelism when others are cooperatively blocked. 1651 * 1652 * @return the number of worker threads 1653 */ 1654 public int getPoolSize() { 1655 return parallelism + (short)(ctl >>> TC_SHIFT); 1656 } 1657 1658 /** 1659 * Returns {@code true} if this pool uses local first-in-first-out 1660 * scheduling mode for forked tasks that are never joined. 1661 * 1662 * @return {@code true} if this pool uses async mode 1663 */ 1664 public boolean getAsyncMode() { 1665 return locallyFifo; 1666 } 1667 1668 /** 1669 * Returns an estimate of the number of worker threads that are 1670 * not blocked waiting to join tasks or for other managed 1671 * synchronization. This method may overestimate the 1672 * number of running threads. 1673 * 1674 * @return the number of worker threads 1675 */ 1676 public int getRunningThreadCount() { 1677 int r = parallelism + (int)(ctl >> AC_SHIFT); 1678 return (r <= 0) ? 0 : r; // suppress momentarily negative values 1679 } 1680 1681 /** 1682 * Returns an estimate of the number of threads that are currently 1683 * stealing or executing tasks. This method may overestimate the 1684 * number of active threads. 1685 * 1686 * @return the number of active threads 1687 */ 1688 public int getActiveThreadCount() { 1689 int r = parallelism + (int)(ctl >> AC_SHIFT) + blockedCount; 1690 return (r <= 0) ? 
0 : r; // suppress momentarily negative values 1691 } 1692 1693 /** 1694 * Returns {@code true} if all worker threads are currently idle. 1695 * An idle worker is one that cannot obtain a task to execute 1696 * because none are available to steal from other threads, and 1697 * there are no pending submissions to the pool. This method is 1698 * conservative; it might not return {@code true} immediately upon 1699 * idleness of all threads, but will eventually become true if 1700 * threads remain inactive. 1701 * 1702 * @return {@code true} if all threads are currently idle 1703 */ 1704 public boolean isQuiescent() { 1705 return parallelism + (int)(ctl >> AC_SHIFT) + blockedCount == 0; 1706 } 1707 1708 /** 1709 * Returns an estimate of the total number of tasks stolen from 1710 * one thread's work queue by another. The reported value 1711 * underestimates the actual total number of steals when the pool 1712 * is not quiescent. This value may be useful for monitoring and 1713 * tuning fork/join programs: in general, steal counts should be 1714 * high enough to keep threads busy, but low enough to avoid 1715 * overhead and contention across threads. 1716 * 1717 * @return the number of steals 1718 */ 1719 public long getStealCount() { 1720 return stealCount; 1721 } 1722 1723 /** 1724 * Returns an estimate of the total number of tasks currently held 1725 * in queues by worker threads (but not including tasks submitted 1726 * to the pool that have not begun executing). This value is only 1727 * an approximation, obtained by iterating across all threads in 1728 * the pool. This method may be useful for tuning task 1729 * granularities. 
*
     * @return the number of queued tasks
     */
    public long getQueuedTaskCount() {
        long count = 0;
        ForkJoinWorkerThread[] ws;
        // Workers are only examined when the total-count field of ctl
        // is above -parallelism.
        if ((short)(ctl >>> TC_SHIFT) > -parallelism &&
            (ws = workers) != null) {
            for (ForkJoinWorkerThread w : ws)
                if (w != null)
                    count -= w.queueBase - w.queueTop; // must read base first
        }
        return count;
    }

    /**
     * Returns an estimate of the number of tasks submitted to this
     * pool that have not yet begun executing.  This method may take
     * time proportional to the number of submissions.
     *
     * @return the number of queued submissions
     */
    public int getQueuedSubmissionCount() {
        // As written, this is just queueTop minus queueBase; no queue
        // traversal happens in this method.
        return -queueBase + queueTop;
    }

    /**
     * Returns {@code true} if there are any tasks submitted to this
     * pool that have not yet begun executing.
     *
     * @return {@code true} if there are any queued submissions
     */
    public boolean hasQueuedSubmissions() {
        return queueBase != queueTop;
    }

    /**
     * Removes and returns the next unexecuted submission if one is
     * available.  This method may be useful in extensions to this
     * class that re-assign work in systems with multiple pools.
     *
     * @return the next submission, or {@code null} if none
     */
    protected ForkJoinTask<?> pollSubmission() {
        ForkJoinTask<?> t; ForkJoinTask<?>[] q; int b, i;
        // Retry while the queue looks nonempty; each failed attempt
        // re-reads queueBase and the array.
        while ((b = queueBase) != queueTop &&
               (q = submissionQueue) != null &&
               (i = (q.length - 1) & b) >= 0) {
            long u = (i << ASHIFT) + ABASE;
            // Claim the slot only if base is still b and the CAS to null wins.
            if ((t = q[i]) != null &&
                queueBase == b &&
                UNSAFE.compareAndSwapObject(q, u, t, null)) {
                queueBase = b + 1;
                return t;
            }
        }
        return null;
    }

    /**
     * Removes all available unexecuted submitted and forked tasks
     * from scheduling queues and adds them to the given collection,
     * without altering their execution status.
These may include 1793 * artificially generated or wrapped tasks. This method is 1794 * designed to be invoked only when the pool is known to be 1795 * quiescent. Invocations at other times may not remove all 1796 * tasks. A failure encountered while attempting to add elements 1797 * to collection {@code c} may result in elements being in 1798 * neither, either or both collections when the associated 1799 * exception is thrown. The behavior of this operation is 1800 * undefined if the specified collection is modified while the 1801 * operation is in progress. 1802 * 1803 * @param c the collection to transfer elements into 1804 * @return the number of elements transferred 1805 */ 1806 protected int drainTasksTo(Collection<? super ForkJoinTask<?>> c) { 1807 int count = 0; 1808 while (queueBase != queueTop) { 1809 ForkJoinTask<?> t = pollSubmission(); 1810 if (t != null) { 1811 c.add(t); 1812 ++count; 1813 } 1814 } 1815 ForkJoinWorkerThread[] ws; 1816 if ((short)(ctl >>> TC_SHIFT) > -parallelism && 1817 (ws = workers) != null) { 1818 for (ForkJoinWorkerThread w : ws) 1819 if (w != null) 1820 count += w.drainTasksTo(c); 1821 } 1822 return count; 1823 } 1824 1825 /** 1826 * Returns a string identifying this pool, as well as its state, 1827 * including indications of run state, parallelism level, and 1828 * worker and task counts. 1829 * 1830 * @return a string identifying this pool, as well as its state 1831 */ 1832 public String toString() { 1833 long st = getStealCount(); 1834 long qt = getQueuedTaskCount(); 1835 long qs = getQueuedSubmissionCount(); 1836 int pc = parallelism; 1837 long c = ctl; 1838 int tc = pc + (short)(c >>> TC_SHIFT); 1839 int rc = pc + (int)(c >> AC_SHIFT); 1840 if (rc < 0) // ignore transient negative 1841 rc = 0; 1842 int ac = rc + blockedCount; 1843 String level; 1844 if ((c & STOP_BIT) != 0) 1845 level = (tc == 0) ? "Terminated" : "Terminating"; 1846 else 1847 level = shutdown ? 
"Shutting down" : "Running"; 1848 return super.toString() + 1849 "[" + level + 1850 ", parallelism = " + pc + 1851 ", size = " + tc + 1852 ", active = " + ac + 1853 ", running = " + rc + 1854 ", steals = " + st + 1855 ", tasks = " + qt + 1856 ", submissions = " + qs + 1857 "]"; 1858 } 1859 1860 /** 1861 * Initiates an orderly shutdown in which previously submitted 1862 * tasks are executed, but no new tasks will be accepted. 1863 * Invocation has no additional effect if already shut down. 1864 * Tasks that are in the process of being submitted concurrently 1865 * during the course of this method may or may not be rejected. 1866 */ 1867 public void shutdown() { 1868 checkPermission(); 1869 shutdown = true; 1870 tryTerminate(false); 1871 } 1872 1873 /** 1874 * Attempts to cancel and/or stop all tasks, and reject all 1875 * subsequently submitted tasks. Tasks that are in the process of 1876 * being submitted or executed concurrently during the course of 1877 * this method may or may not be rejected. This method cancels 1878 * both existing and unexecuted tasks, in order to permit 1879 * termination in the presence of task dependencies. So the method 1880 * always returns an empty list (unlike the case for some other 1881 * Executors). 1882 * 1883 * @return an empty list 1884 */ 1885 public List<Runnable> shutdownNow() { 1886 checkPermission(); 1887 shutdown = true; 1888 tryTerminate(true); 1889 return Collections.emptyList(); 1890 } 1891 1892 /** 1893 * Returns {@code true} if all tasks have completed following shut down. 1894 * 1895 * @return {@code true} if all tasks have completed following shut down 1896 */ 1897 public boolean isTerminated() { 1898 long c = ctl; 1899 return ((c & STOP_BIT) != 0L && 1900 (short)(c >>> TC_SHIFT) == -parallelism); 1901 } 1902 1903 /** 1904 * Returns {@code true} if the process of termination has 1905 * commenced but not yet completed. This method may be useful for 1906 * debugging. 
A return of {@code true} reported a sufficient 1907 * period after shutdown may indicate that submitted tasks have 1908 * ignored or suppressed interruption, or are waiting for IO, 1909 * causing this executor not to properly terminate. (See the 1910 * advisory notes for class {@link ForkJoinTask} stating that 1911 * tasks should not normally entail blocking operations. But if 1912 * they do, they must abort them on interrupt.) 1913 * 1914 * @return {@code true} if terminating but not yet terminated 1915 */ 1916 public boolean isTerminating() { 1917 long c = ctl; 1918 return ((c & STOP_BIT) != 0L && 1919 (short)(c >>> TC_SHIFT) != -parallelism); 1920 } 1921 1922 /** 1923 * Returns true if terminating or terminated. Used by ForkJoinWorkerThread. 1924 */ 1925 final boolean isAtLeastTerminating() { 1926 return (ctl & STOP_BIT) != 0L; 1927 } 1928 1929 /** 1930 * Returns {@code true} if this pool has been shut down. 1931 * 1932 * @return {@code true} if this pool has been shut down 1933 */ 1934 public boolean isShutdown() { 1935 return shutdown; 1936 } 1937 1938 /** 1939 * Blocks until all tasks have completed execution after a shutdown 1940 * request, or the timeout occurs, or the current thread is 1941 * interrupted, whichever happens first. 
1942 * 1943 * @param timeout the maximum time to wait 1944 * @param unit the time unit of the timeout argument 1945 * @return {@code true} if this executor terminated and 1946 * {@code false} if the timeout elapsed before termination 1947 * @throws InterruptedException if interrupted while waiting 1948 */ 1949 public boolean awaitTermination(long timeout, TimeUnit unit) 1950 throws InterruptedException { 1951 long nanos = unit.toNanos(timeout); 1952 final ReentrantLock lock = this.submissionLock; 1953 lock.lock(); 1954 try { 1955 for (;;) { 1956 if (isTerminated()) 1957 return true; 1958 if (nanos <= 0) 1959 return false; 1960 nanos = termination.awaitNanos(nanos); 1961 } 1962 } finally { 1963 lock.unlock(); 1964 } 1965 } 1966 1967 /** 1968 * Interface for extending managed parallelism for tasks running 1969 * in {@link ForkJoinPool}s. 1970 * 1971 * <p>A {@code ManagedBlocker} provides two methods. Method 1972 * {@code isReleasable} must return {@code true} if blocking is 1973 * not necessary. Method {@code block} blocks the current thread 1974 * if necessary (perhaps internally invoking {@code isReleasable} 1975 * before actually blocking). These actions are performed by any 1976 * thread invoking {@link ForkJoinPool#managedBlock}. The 1977 * unusual methods in this API accommodate synchronizers that may, 1978 * but don't usually, block for long periods. Similarly, they 1979 * allow more efficient internal handling of cases in which 1980 * additional workers may be, but usually are not, needed to 1981 * ensure sufficient parallelism. Toward this end, 1982 * implementations of method {@code isReleasable} must be amenable 1983 * to repeated invocation. 
1984 * 1985 * <p>For example, here is a ManagedBlocker based on a 1986 * ReentrantLock: 1987 * <pre> {@code 1988 * class ManagedLocker implements ManagedBlocker { 1989 * final ReentrantLock lock; 1990 * boolean hasLock = false; 1991 * ManagedLocker(ReentrantLock lock) { this.lock = lock; } 1992 * public boolean block() { 1993 * if (!hasLock) 1994 * lock.lock(); 1995 * return true; 1996 * } 1997 * public boolean isReleasable() { 1998 * return hasLock || (hasLock = lock.tryLock()); 1999 * } 2000 * }}</pre> 2001 * 2002 * <p>Here is a class that possibly blocks waiting for an 2003 * item on a given queue: 2004 * <pre> {@code 2005 * class QueueTaker<E> implements ManagedBlocker { 2006 * final BlockingQueue<E> queue; 2007 * volatile E item = null; 2008 * QueueTaker(BlockingQueue<E> q) { this.queue = q; } 2009 * public boolean block() throws InterruptedException { 2010 * if (item == null) 2011 * item = queue.take(); 2012 * return true; 2013 * } 2014 * public boolean isReleasable() { 2015 * return item != null || (item = queue.poll()) != null; 2016 * } 2017 * public E getItem() { // call after pool.managedBlock completes 2018 * return item; 2019 * } 2020 * }}</pre> 2021 */ 2022 public static interface ManagedBlocker { 2023 /** 2024 * Possibly blocks the current thread, for example waiting for 2025 * a lock or condition. 2026 * 2027 * @return {@code true} if no additional blocking is necessary 2028 * (i.e., if isReleasable would return true) 2029 * @throws InterruptedException if interrupted while waiting 2030 * (the method is not required to do so, but is allowed to) 2031 */ 2032 boolean block() throws InterruptedException; 2033 2034 /** 2035 * Returns {@code true} if blocking is unnecessary. 2036 */ 2037 boolean isReleasable(); 2038 } 2039 2040 /** 2041 * Blocks in accord with the given blocker. 
If the current thread 2042 * is a {@link ForkJoinWorkerThread}, this method possibly 2043 * arranges for a spare thread to be activated if necessary to 2044 * ensure sufficient parallelism while the current thread is blocked. 2045 * 2046 * <p>If the caller is not a {@link ForkJoinTask}, this method is 2047 * behaviorally equivalent to 2048 * <pre> {@code 2049 * while (!blocker.isReleasable()) 2050 * if (blocker.block()) 2051 * return; 2052 * }</pre> 2053 * 2054 * If the caller is a {@code ForkJoinTask}, then the pool may 2055 * first be expanded to ensure parallelism, and later adjusted. 2056 * 2057 * @param blocker the blocker 2058 * @throws InterruptedException if blocker.block did so 2059 */ 2060 public static void managedBlock(ManagedBlocker blocker) 2061 throws InterruptedException { 2062 Thread t = Thread.currentThread(); 2063 if (t instanceof ForkJoinWorkerThread) { 2064 ForkJoinWorkerThread w = (ForkJoinWorkerThread) t; 2065 w.pool.awaitBlocker(blocker); 2066 } 2067 else { 2068 do {} while (!blocker.isReleasable() && !blocker.block()); 2069 } 2070 } 2071 2072 // AbstractExecutorService overrides. These rely on undocumented 2073 // fact that ForkJoinTask.adapt returns ForkJoinTasks that also 2074 // implement RunnableFuture. 
2075 2076 protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) { 2077 return (RunnableFuture<T>) ForkJoinTask.adapt(runnable, value); 2078 } 2079 2080 protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { 2081 return (RunnableFuture<T>) ForkJoinTask.adapt(callable); 2082 } 2083 2084 // Unsafe mechanics 2085 private static final sun.misc.Unsafe UNSAFE; 2086 private static final long ctlOffset; 2087 private static final long stealCountOffset; 2088 private static final long blockedCountOffset; 2089 private static final long quiescerCountOffset; 2090 private static final long scanGuardOffset; 2091 private static final long nextWorkerNumberOffset; 2092 private static final long ABASE; 2093 private static final int ASHIFT; 2094 2095 static { 2096 poolNumberGenerator = new AtomicInteger(); 2097 workerSeedGenerator = new Random(); 2098 modifyThreadPermission = new RuntimePermission("modifyThread"); 2099 defaultForkJoinWorkerThreadFactory = 2100 new DefaultForkJoinWorkerThreadFactory(); 2101 try { 2102 UNSAFE = sun.misc.Unsafe.getUnsafe(); 2103 Class<?> k = ForkJoinPool.class; 2104 ctlOffset = UNSAFE.objectFieldOffset 2105 (k.getDeclaredField("ctl")); 2106 stealCountOffset = UNSAFE.objectFieldOffset 2107 (k.getDeclaredField("stealCount")); 2108 blockedCountOffset = UNSAFE.objectFieldOffset 2109 (k.getDeclaredField("blockedCount")); 2110 quiescerCountOffset = UNSAFE.objectFieldOffset 2111 (k.getDeclaredField("quiescerCount")); 2112 scanGuardOffset = UNSAFE.objectFieldOffset 2113 (k.getDeclaredField("scanGuard")); 2114 nextWorkerNumberOffset = UNSAFE.objectFieldOffset 2115 (k.getDeclaredField("nextWorkerNumber")); 2116 } catch (Exception e) { 2117 throw new Error(e); 2118 } 2119 Class<?> a = ForkJoinTask[].class; 2120 ABASE = UNSAFE.arrayBaseOffset(a); 2121 int s = UNSAFE.arrayIndexScale(a); 2122 if ((s & (s-1)) != 0) 2123 throw new Error("data type scale not a power of two"); 2124 ASHIFT = 31 - Integer.numberOfLeadingZeros(s); 2125 } 2126 2127 
} 2128