Home | History | Annotate | Download | only in concurrent
      1 /*
      2  * Written by Doug Lea with assistance from members of JCP JSR-166
      3  * Expert Group and released to the public domain, as explained at
      4  * http://creativecommons.org/publicdomain/zero/1.0/
      5  */
      6 
      7 package java.util.concurrent;
      8 
      9 import java.util.ArrayList;
     10 import java.util.Arrays;
     11 import java.util.Collection;
     12 import java.util.Collections;
     13 import java.util.List;
     14 import java.util.Random;
     15 import java.util.concurrent.AbstractExecutorService;
     16 import java.util.concurrent.Callable;
     17 import java.util.concurrent.ExecutorService;
     18 import java.util.concurrent.Future;
     19 import java.util.concurrent.RejectedExecutionException;
     20 import java.util.concurrent.RunnableFuture;
     21 import java.util.concurrent.TimeUnit;
     22 import java.util.concurrent.atomic.AtomicInteger;
     23 import java.util.concurrent.locks.LockSupport;
     24 import java.util.concurrent.locks.ReentrantLock;
     25 import java.util.concurrent.locks.Condition;
     26 import libcore.util.SneakyThrow;
     27 
     28 // BEGIN android-note
     29 // removed security manager docs
     30 // END android-note
     31 
     32 /**
     33  * An {@link ExecutorService} for running {@link ForkJoinTask}s.
     34  * A {@code ForkJoinPool} provides the entry point for submissions
     35  * from non-{@code ForkJoinTask} clients, as well as management and
     36  * monitoring operations.
     37  *
     38  * <p>A {@code ForkJoinPool} differs from other kinds of {@link
     39  * ExecutorService} mainly by virtue of employing
     40  * <em>work-stealing</em>: all threads in the pool attempt to find and
     41  * execute subtasks created by other active tasks (eventually blocking
     42  * waiting for work if none exist). This enables efficient processing
     43  * when most tasks spawn other subtasks (as do most {@code
     44  * ForkJoinTask}s). When setting <em>asyncMode</em> to true in
     45  * constructors, {@code ForkJoinPool}s may also be appropriate for use
     46  * with event-style tasks that are never joined.
     47  *
     48  * <p>A {@code ForkJoinPool} is constructed with a given target
     49  * parallelism level; by default, equal to the number of available
     50  * processors. The pool attempts to maintain enough active (or
     51  * available) threads by dynamically adding, suspending, or resuming
     52  * internal worker threads, even if some tasks are stalled waiting to
     53  * join others. However, no such adjustments are guaranteed in the
     54  * face of blocked IO or other unmanaged synchronization. The nested
     55  * {@link ManagedBlocker} interface enables extension of the kinds of
     56  * synchronization accommodated.
     57  *
     58  * <p>In addition to execution and lifecycle control methods, this
     59  * class provides status check methods (for example
     60  * {@link #getStealCount}) that are intended to aid in developing,
     61  * tuning, and monitoring fork/join applications. Also, method
     62  * {@link #toString} returns indications of pool state in a
     63  * convenient form for informal monitoring.
     64  *
     65  * <p> As is the case with other ExecutorServices, there are three
     66  * main task execution methods summarized in the following
     67  * table. These are designed to be used by clients not already engaged
     68  * in fork/join computations in the current pool.  The main forms of
     69  * these methods accept instances of {@code ForkJoinTask}, but
     70  * overloaded forms also allow mixed execution of plain {@code
     71  * Runnable}- or {@code Callable}- based activities as well.  However,
     72  * tasks that are already executing in a pool should normally
     73  * <em>NOT</em> use these pool execution methods, but instead use the
     74  * within-computation forms listed in the table.
     75  *
     76  * <table BORDER CELLPADDING=3 CELLSPACING=1>
     77  *  <tr>
     78  *    <td></td>
     79  *    <td ALIGN=CENTER> <b>Call from non-fork/join clients</b></td>
     80  *    <td ALIGN=CENTER> <b>Call from within fork/join computations</b></td>
     81  *  </tr>
     82  *  <tr>
     83  *    <td> <b>Arrange async execution</b></td>
     84  *    <td> {@link #execute(ForkJoinTask)}</td>
     85  *    <td> {@link ForkJoinTask#fork}</td>
     86  *  </tr>
     87  *  <tr>
     88  *    <td> <b>Await and obtain result</b></td>
     89  *    <td> {@link #invoke(ForkJoinTask)}</td>
     90  *    <td> {@link ForkJoinTask#invoke}</td>
     91  *  </tr>
     92  *  <tr>
     93  *    <td> <b>Arrange exec and obtain Future</b></td>
     94  *    <td> {@link #submit(ForkJoinTask)}</td>
     95  *    <td> {@link ForkJoinTask#fork} (ForkJoinTasks <em>are</em> Futures)</td>
     96  *  </tr>
     97  * </table>
     98  *
     99  * <p><b>Sample Usage.</b> Normally a single {@code ForkJoinPool} is
    100  * used for all parallel task execution in a program or subsystem.
    101  * Otherwise, use would not usually outweigh the construction and
    102  * bookkeeping overhead of creating a large set of threads. For
    103  * example, a common pool could be used for the {@code SortTasks}
    104  * illustrated in {@link RecursiveAction}. Because {@code
    105  * ForkJoinPool} uses threads in {@linkplain java.lang.Thread#isDaemon
    106  * daemon} mode, there is typically no need to explicitly {@link
    107  * #shutdown} such a pool upon program exit.
    108  *
    109  *  <pre> {@code
    110  * static final ForkJoinPool mainPool = new ForkJoinPool();
    111  * ...
    112  * public void sort(long[] array) {
    113  *   mainPool.invoke(new SortTask(array, 0, array.length));
    114  * }}</pre>
    115  *
    116  * <p><b>Implementation notes</b>: This implementation restricts the
    117  * maximum number of running threads to 32767. Attempts to create
    118  * pools with greater than the maximum number result in
    119  * {@code IllegalArgumentException}.
    120  *
    121  * <p>This implementation rejects submitted tasks (that is, by throwing
    122  * {@link RejectedExecutionException}) only when the pool is shut down
    123  * or internal resources have been exhausted.
    124  *
    125  * @since 1.7
    126  * @hide
    127  * @author Doug Lea
    128  */
    129 public class ForkJoinPool extends AbstractExecutorService {
    130 
    131     /*
    132      * Implementation Overview
    133      *
    134      * This class provides the central bookkeeping and control for a
    135      * set of worker threads: Submissions from non-FJ threads enter
    136      * into a submission queue. Workers take these tasks and typically
    137      * split them into subtasks that may be stolen by other workers.
    138      * Preference rules give first priority to processing tasks from
    139      * their own queues (LIFO or FIFO, depending on mode), then to
    140      * randomized FIFO steals of tasks in other worker queues, and
    141      * lastly to new submissions.
    142      *
    143      * The main throughput advantages of work-stealing stem from
    144      * decentralized control -- workers mostly take tasks from
    145      * themselves or each other. We cannot negate this in the
    146      * implementation of other management responsibilities. The main
    147      * tactic for avoiding bottlenecks is packing nearly all
    148      * essentially atomic control state into a single 64bit volatile
    149      * variable ("ctl"). This variable is read on the order of 10-100
    150      * times as often as it is modified (always via CAS). (There is
    151      * some additional control state, for example variable "shutdown"
    152      * for which we can cope with uncoordinated updates.)  This
    153      * streamlines synchronization and control at the expense of messy
    154      * constructions needed to repack status bits upon updates.
    155      * Updates tend not to contend with each other except during
    156      * bursts while submitted tasks begin or end.  In some cases when
    157      * they do contend, threads can instead do something else
    158      * (usually, scan for tasks) until contention subsides.
    159      *
    160      * To enable packing, we restrict maximum parallelism to (1<<15)-1
    161      * (which is far in excess of normal operating range) to allow
    162      * ids, counts, and their negations (used for thresholding) to fit
    163      * into 16bit fields.
    164      *
    165      * Recording Workers.  Workers are recorded in the "workers" array
    166      * that is created upon pool construction and expanded if (rarely)
    167      * necessary.  This is an array as opposed to some other data
    168      * structure to support index-based random steals by workers.
    169      * Updates to the array recording new workers and unrecording
    170      * terminated ones are protected from each other by a seqLock
    171      * (scanGuard) but the array is otherwise concurrently readable,
    172      * and accessed directly by workers. To simplify index-based
    173      * operations, the array size is always a power of two, and all
    174      * readers must tolerate null slots. To avoid flailing during
    175      * start-up, the array is presized to hold twice #parallelism
    176      * workers (which is unlikely to need further resizing during
    177      * execution). But to avoid dealing with so many null slots,
    178      * variable scanGuard includes a mask for the nearest power of two
    179      * that contains all current workers.  All worker thread creation
    180      * is on-demand, triggered by task submissions, replacement of
    181      * terminated workers, and/or compensation for blocked
    182      * workers. However, all other support code is set up to work with
    183      * other policies.  To ensure that we do not hold on to worker
    184      * references that would prevent GC, ALL accesses to workers are
    185      * via indices into the workers array (which is one source of some
    186      * of the messy code constructions here). In essence, the workers
    187      * array serves as a weak reference mechanism. Thus for example
    188      * the wait queue field of ctl stores worker indices, not worker
    189      * references.  Access to the workers in associated methods (for
    190      * example signalWork) must both index-check and null-check the
    191      * IDs. All such accesses ignore bad IDs by returning out early
    192      * from what they are doing, since this can only be associated
    193      * with termination, in which case it is OK to give up.
    194      *
    195      * All uses of the workers array, as well as queue arrays, check
    196      * that the array is non-null (even if previously non-null). This
    197      * allows nulling during termination, which is currently not
    198      * necessary, but remains an option for resource-revocation-based
    199      * shutdown schemes.
    200      *
    201      * Wait Queuing. Unlike HPC work-stealing frameworks, we cannot
    202      * let workers spin indefinitely scanning for tasks when none can
    203      * be found immediately, and we cannot start/resume workers unless
    204      * there appear to be tasks available.  On the other hand, we must
    205      * quickly prod them into action when new tasks are submitted or
    206      * generated.  We park/unpark workers after placing in an event
    207      * wait queue when they cannot find work. This "queue" is actually
    208      * a simple Treiber stack, headed by the "id" field of ctl, plus a
    209      * 15bit counter value to both wake up waiters (by advancing their
    210      * count) and avoid ABA effects. Successors are held in worker
    211      * field "nextWait".  Queuing deals with several intrinsic races,
    212      * mainly that a task-producing thread can miss seeing (and
    213      * signalling) another thread that gave up looking for work but
    214      * has not yet entered the wait queue. We solve this by requiring
    215      * a full sweep of all workers both before (in scan()) and after
    216      * (in tryAwaitWork()) a newly waiting worker is added to the wait
    217      * queue. During a rescan, the worker might release some other
    218      * queued worker rather than itself, which has the same net
    219      * effect. Because enqueued workers may actually be rescanning
    220      * rather than waiting, we set and clear the "parked" field of
    221      * ForkJoinWorkerThread to reduce unnecessary calls to unpark.
    222      * (Use of the parked field requires a secondary recheck to avoid
    223      * missed signals.)
    224      *
    225      * Signalling.  We create or wake up workers only when there
    226      * appears to be at least one task they might be able to find and
    227      * execute.  When a submission is added or another worker adds a
    228      * task to a queue that previously had two or fewer tasks, they
    229      * signal waiting workers (or trigger creation of new ones if
    230      * fewer than the given parallelism level -- see signalWork).
    231      * These primary signals are buttressed by signals during rescans
    232      * as well as those performed when a worker steals a task and
    233      * notices that there are more tasks too; together these cover the
    234      * signals needed in cases when more than two tasks are pushed
    235      * but untaken.
    236      *
    237      * Trimming workers. To release resources after periods of lack of
    238      * use, a worker starting to wait when the pool is quiescent will
    239      * time out and terminate if the pool has remained quiescent for
    240      * SHRINK_RATE nanosecs. This will slowly propagate, eventually
    241      * terminating all workers after long periods of non-use.
    242      *
    243      * Submissions. External submissions are maintained in an
    244      * array-based queue that is structured identically to
    245      * ForkJoinWorkerThread queues except for the use of
    246      * submissionLock in method addSubmission. Unlike the case for
    247      * worker queues, multiple external threads can add new
    248      * submissions, so adding requires a lock.
    249      *
    250      * Compensation. Beyond work-stealing support and lifecycle
    251      * control, the main responsibility of this framework is to take
    252      * actions when one worker is waiting to join a task stolen (or
    253      * always held by) another.  Because we are multiplexing many
    254      * tasks on to a pool of workers, we can't just let them block (as
    255      * in Thread.join).  We also cannot just reassign the joiner's
    256      * run-time stack with another and replace it later, which would
    257      * be a form of "continuation", that even if possible is not
    258      * necessarily a good idea since we sometimes need both an
    259      * unblocked task and its continuation to progress. Instead we
    260      * combine two tactics:
    261      *
    262      *   Helping: Arranging for the joiner to execute some task that it
    263      *      would be running if the steal had not occurred.  Method
    264      *      ForkJoinWorkerThread.joinTask tracks joining->stealing
    265      *      links to try to find such a task.
    266      *
    267      *   Compensating: Unless there are already enough live threads,
    268      *      method tryPreBlock() may create or re-activate a spare
    269      *      thread to compensate for blocked joiners until they
    270      *      unblock.
    271      *
    272      * The ManagedBlocker extension API can't use helping so relies
    273      * only on compensation in method awaitBlocker.
    274      *
    275      * It is impossible to keep exactly the target parallelism number
    276      * of threads running at any given time.  Determining the
    277      * existence of conservatively safe helping targets, the
    278      * availability of already-created spares, and the apparent need
    279      * to create new spares are all racy and require heuristic
    280      * guidance, so we rely on multiple retries of each.  Currently,
    281      * in keeping with on-demand signalling policy, we compensate only
    282      * if blocking would leave less than one active (non-waiting,
    283      * non-blocked) worker. Additionally, to avoid some false alarms
    284      * due to GC, lagging counters, system activity, etc, compensated
    285      * blocking for joins is only attempted after rechecks stabilize
    286      * (retries are interspersed with Thread.yield, for good
    287      * citizenship).  The variable blockedCount, incremented before
    288      * blocking and decremented after, is sometimes needed to
    289      * distinguish cases of waiting for work vs blocking on joins or
    290      * other managed sync. Both cases are equivalent for most pool
    291      * control, so we can update non-atomically. (Additionally,
    292      * contention on blockedCount alleviates some contention on ctl).
    293      *
    294      * Shutdown and Termination. A call to shutdownNow atomically sets
    295      * the ctl stop bit and then (non-atomically) sets each worker's
    296      * "terminate" status, cancels all unprocessed tasks, and wakes up
    297      * all waiting workers.  Detecting whether termination should
    298      * commence after a non-abrupt shutdown() call requires more work
    299      * and bookkeeping. We need consensus about quiescence (i.e., that
    300      * there is no more work) which is reflected in active counts so
    301      * long as there are no current blockers, as well as possible
    302      * re-evaluations during independent changes in blocking or
    303      * quiescing workers.
    304      *
    305      * Style notes: There is a lot of representation-level coupling
    306      * among classes ForkJoinPool, ForkJoinWorkerThread, and
    307      * ForkJoinTask.  Most fields of ForkJoinWorkerThread maintain
    308      * data structures managed by ForkJoinPool, so are directly
    309      * accessed.  Conversely we allow access to "workers" array by
    310      * workers, and direct access to ForkJoinTask.status by both
    311      * ForkJoinPool and ForkJoinWorkerThread.  There is little point
    312      * trying to reduce this, since any associated future changes in
    313      * representations will need to be accompanied by algorithmic
    314      * changes anyway. All together, these low-level implementation
    315      * choices produce as much as a factor of 4 performance
    316      * improvement compared to naive implementations, and enable the
    317      * processing of billions of tasks per second, at the expense of
    318      * some ugliness.
    319      *
    320      * Methods signalWork() and scan() are the main bottlenecks so are
    321      * especially heavily micro-optimized/mangled.  There are lots of
    322      * inline assignments (of form "while ((local = field) != 0)")
    323      * which are usually the simplest way to ensure the required read
    324      * orderings (which are sometimes critical). This leads to a
    325      * "C"-like style of listing declarations of these locals at the
    326      * heads of methods or blocks.  There are several occurrences of
    327      * the unusual "do {} while (!cas...)"  which is the simplest way
    328      * to force an update of a CAS'ed variable. There are also other
    329      * coding oddities that help some methods perform reasonably even
    330      * when interpreted (not compiled).
    331      *
    332      * The order of declarations in this file is: (1) declarations of
    333      * statics (2) fields (along with constants used when unpacking
    334      * some of them), listed in an order that tends to reduce
    335      * contention among them a bit under most JVMs.  (3) internal
    336      * control methods (4) callbacks and other support for
    337      * ForkJoinTask and ForkJoinWorkerThread classes, (5) exported
    338      * methods (plus a few little helpers). (6) static block
    339      * initializing all statics in a minimally dependent order.
    340      */
    341 
    /**
     * Factory for creating new {@link ForkJoinWorkerThread}s. Define and
     * use a {@code ForkJoinWorkerThreadFactory} when working with
     * {@code ForkJoinWorkerThread} subclasses that extend the base
     * functionality or that must initialize threads with a different
     * context.
     */
    public interface ForkJoinWorkerThreadFactory {
        // As a member interface this is implicitly static, and its
        // methods are implicitly public, so those modifiers are omitted.

        /**
         * Returns a new worker thread operating in the given pool.
         *
         * @param pool the pool this thread works in
         * @return the new worker thread
         * @throws NullPointerException if the pool is null
         */
        ForkJoinWorkerThread newThread(ForkJoinPool pool);
    }
    357 
    358     /**
    359      * Default ForkJoinWorkerThreadFactory implementation; creates a
    360      * new ForkJoinWorkerThread.
    361      */
    362     static class DefaultForkJoinWorkerThreadFactory
    363         implements ForkJoinWorkerThreadFactory {
    364         public ForkJoinWorkerThread newThread(ForkJoinPool pool) {
    365             return new ForkJoinWorkerThread(pool);
    366         }
    367     }
    368 
    369     /**
    370      * The default factory for creating new ForkJoinWorkerThreads. It
    371      * is used unless overridden in ForkJoinPool constructors.
    372      */
    373     public static final ForkJoinWorkerThreadFactory
    374         defaultForkJoinWorkerThreadFactory;
    375 
    376     /**
    377      * Permission required for callers of methods that may start or
    378      * kill threads; enforced via checkPermission().
    379      */
    380     private static final RuntimePermission modifyThreadPermission;
    381 
    382     /**
    383      * If there is a security manager, makes sure caller has
    384      * permission to modify threads.
    385      */
    386     private static void checkPermission() {
    387         SecurityManager security = System.getSecurityManager();
    388         if (security != null)
    389             security.checkPermission(modifyThreadPermission);
    390     }
    391 
    392     /**
    393      * Generator for assigning sequence numbers as pool names.
    394      */
    395     private static final AtomicInteger poolNumberGenerator;
    396 
    397     /**
    398      * Generator for initial random seeds for worker victim
    399      * selection. This is used only to create initial seeds. Random
    400      * steals use a cheaper xorshift generator per steal attempt. We
    401      * don't expect much contention on workerSeedGenerator, so just use a
    402      * plain Random.
    403      */
    404     static final Random workerSeedGenerator;
    405 
    406     /**
    407      * Array holding all worker threads in the pool.  Initialized upon
    408      * construction. Array size must be a power of two.  Updates and
    409      * replacements are protected by scanGuard, but the array is
    410      * always kept in a consistent enough state to be randomly
    411      * accessed without locking by workers performing work-stealing,
    412      * as well as other traversal-based methods in this class, so long
    413      * as reads memory-acquire by first reading ctl. All readers must
    414      * tolerate that some array slots may be null.
    415      */
    416     ForkJoinWorkerThread[] workers;
    417 
    418     /**
    419      * Initial size for submission queue array. Must be a power of
    420      * two.  In many applications, these always stay small so we use a
    421      * small initial cap.
    422      */
    423     private static final int INITIAL_QUEUE_CAPACITY = 8;
    424 
    425     /**
    426      * Maximum size for submission queue array. Must be a power of two
    427      * less than or equal to 1 << (31 - width of array entry) to
    428      * ensure lack of index wraparound, but is capped at a lower
    429      * value to help users trap runaway computations.
    430      */
    431     private static final int MAXIMUM_QUEUE_CAPACITY = 1 << 24; // 16M
    432 
    433     /**
    434      * Array serving as submission queue. Initialized upon construction.
    435      */
    436     private ForkJoinTask<?>[] submissionQueue;
    437 
    438     /**
    439      * Lock protecting the submission queue array in addSubmission.
    440      */
    441     private final ReentrantLock submissionLock;
    442 
    443     /**
    444      * Condition for awaitTermination, using submissionLock for
    445      * convenience.
    446      */
    447     private final Condition termination;
    448 
    449     /**
    450      * Creation factory for worker threads.
    451      */
    452     private final ForkJoinWorkerThreadFactory factory;
    453 
    454     /**
    455      * The uncaught exception handler used when any worker abruptly
    456      * terminates.
    457      */
    458     final Thread.UncaughtExceptionHandler ueh;
    459 
    460     /**
    461      * Prefix for assigning names to worker threads.
    462      */
    463     private final String workerNamePrefix;
    464 
    465     /**
    466      * Sum of per-thread steal counts, updated only when threads are
    467      * idle or terminating.
    468      */
    469     private volatile long stealCount;
    470 
    471     /**
    472      * Main pool control -- a long packed with:
    473      * AC: Number of active running workers minus target parallelism (16 bits)
    474      * TC: Number of total workers minus target parallelism (16 bits)
    475      * ST: true if pool is terminating (1 bit)
    476      * EC: the wait count of top waiting thread (15 bits)
    477      * ID: ~poolIndex of top of Treiber stack of waiting threads (16 bits)
    478      *
    479      * When convenient, we can extract the upper 32 bits of counts and
    480      * the lower 32 bits of queue state, u = (int)(ctl >>> 32) and e =
    481      * (int)ctl.  The ec field is never accessed alone, but always
    482      * together with id and st. The offsets of counts by the target
    483      * parallelism and the positionings of fields make it possible to
    484      * perform the most common checks via sign tests of fields: When
    485      * ac is negative, there are not enough active workers, when tc is
    486      * negative, there are not enough total workers, when id is
    487      * negative, there is at least one waiting worker, and when e is
    488      * negative, the pool is terminating.  To deal with these possibly
    489      * negative fields, we use casts in and out of "short" and/or
    490      * signed shifts to maintain signedness.
    491      */
    492     volatile long ctl;
    493 
    494     // bit positions/shifts for fields
    495     private static final int  AC_SHIFT   = 48;
    496     private static final int  TC_SHIFT   = 32;
    497     private static final int  ST_SHIFT   = 31;
    498     private static final int  EC_SHIFT   = 16;
    499 
    500     // bounds
    501     private static final int  MAX_ID     = 0x7fff;  // max poolIndex
    502     private static final int  SMASK      = 0xffff;  // mask short bits
    503     private static final int  SHORT_SIGN = 1 << 15;
    504     private static final int  INT_SIGN   = 1 << 31;
    505 
    506     // masks
    507     private static final long STOP_BIT   = 0x0001L << ST_SHIFT;
    508     private static final long AC_MASK    = ((long)SMASK) << AC_SHIFT;
    509     private static final long TC_MASK    = ((long)SMASK) << TC_SHIFT;
    510 
    511     // units for incrementing and decrementing
    512     private static final long TC_UNIT    = 1L << TC_SHIFT;
    513     private static final long AC_UNIT    = 1L << AC_SHIFT;
    514 
    515     // masks and units for dealing with u = (int)(ctl >>> 32)
    516     private static final int  UAC_SHIFT  = AC_SHIFT - 32;
    517     private static final int  UTC_SHIFT  = TC_SHIFT - 32;
    518     private static final int  UAC_MASK   = SMASK << UAC_SHIFT;
    519     private static final int  UTC_MASK   = SMASK << UTC_SHIFT;
    520     private static final int  UAC_UNIT   = 1 << UAC_SHIFT;
    521     private static final int  UTC_UNIT   = 1 << UTC_SHIFT;
    522 
    523     // masks and units for dealing with e = (int)ctl
    524     private static final int  E_MASK     = 0x7fffffff; // no STOP_BIT
    525     private static final int  EC_UNIT    = 1 << EC_SHIFT;
    526 
    527     /**
    528      * The target parallelism level.
    529      */
    530     final int parallelism;
    531 
    532     /**
    533      * Index (mod submission queue length) of next element to take
    534      * from submission queue. Usage is identical to that for
    535      * per-worker queues -- see ForkJoinWorkerThread internal
    536      * documentation.
    537      */
    538     volatile int queueBase;
    539 
    540     /**
    541      * Index (mod submission queue length) of next element to add
    542      * in submission queue. Usage is identical to that for
    543      * per-worker queues -- see ForkJoinWorkerThread internal
    544      * documentation.
    545      */
    546     int queueTop;
    547 
    548     /**
    549      * True when shutdown() has been called.
    550      */
    551     volatile boolean shutdown;
    552 
    553     /**
    554      * True if workers use local FIFO, not the default LIFO, for local polling.
    555      * Read by, and replicated by ForkJoinWorkerThreads.
    556      */
    557     final boolean locallyFifo;
    558 
    559     /**
    560      * The number of threads in ForkJoinWorkerThread.helpQuiescePool.
    561      * When non-zero, suppresses automatic shutdown when active
    562      * counts become zero.
    563      */
    564     volatile int quiescerCount;
    565 
    566     /**
    567      * The number of threads blocked in join.
    568      */
    569     volatile int blockedCount;
    570 
    571     /**
    572      * Counter for worker Thread names (unrelated to their poolIndex).
    573      */
    574     private volatile int nextWorkerNumber;
    575 
    576     /**
    577      * The index for the next created worker. Accessed under scanGuard.
    578      */
    579     private int nextWorkerIndex;
    580 
    581     /**
    582      * SeqLock and index masking for updates to workers array.  Locked
    583      * when SG_UNIT is set. Unlocking clears bit by adding
    584      * SG_UNIT. Staleness of read-only operations can be checked by
    585      * comparing scanGuard to value before the reads. The low 16 bits
    586      * (i.e., anding with SMASK) hold the smallest power of two
    587      * covering all worker indices, minus one, and are used to avoid
    588      * dealing with large numbers of null slots when the workers array
    589      * is overallocated.
    590      */
    591     volatile int scanGuard;
    592 
    593     private static final int SG_UNIT = 1 << 16; // scanGuard seqlock bit
    594 
    595     /**
    596      * The wakeup interval (in nanoseconds) for a worker waiting for a
    597      * task when the pool is quiescent to instead try to shrink the
    598      * number of workers.  The exact value does not matter too
    599      * much. It must be short enough to release resources during
    600      * sustained periods of idleness, but not so short that threads
    601      * are continually re-created.
    602      */
    603     private static final long SHRINK_RATE =
    604         4L * 1000L * 1000L * 1000L; // 4 seconds
    605 
    606     /**
    607      * Top-level loop for worker threads: On each step: if the
    608      * previous step swept through all queues and found no tasks, or
    609      * there are excess threads, then possibly blocks. Otherwise,
    610      * scans for and, if found, executes a task. Returns when pool
    611      * and/or worker terminate.
    612      *
    613      * @param w the worker
    614      */
    615     final void work(ForkJoinWorkerThread w) {
    616         boolean swept = false;                // true on empty scans
    617         long c;
    618         while (!w.terminate && (int)(c = ctl) >= 0) {
    619             int a;                            // active count
    620             if (!swept && (a = (int)(c >> AC_SHIFT)) <= 0)
    621                 swept = scan(w, a);
    622             else if (tryAwaitWork(w, c))
    623                 swept = false;
    624         }
    625     }
    626 
    627     // Signalling
    628 
    629     /**
    630      * Wakes up or creates a worker.
    631      */
    632     final void signalWork() {
    633         /*
    634          * The while condition is true if: (there is are too few total
    635          * workers OR there is at least one waiter) AND (there are too
    636          * few active workers OR the pool is terminating).  The value
    637          * of e distinguishes the remaining cases: zero (no waiters)
    638          * for create, negative if terminating (in which case do
    639          * nothing), else release a waiter. The secondary checks for
    640          * release (non-null array etc) can fail if the pool begins
    641          * terminating after the test, and don't impose any added cost
    642          * because JVMs must perform null and bounds checks anyway.
    643          */
    644         long c; int e, u;
    645         while ((((e = (int)(c = ctl)) | (u = (int)(c >>> 32))) &
    646                 (INT_SIGN|SHORT_SIGN)) == (INT_SIGN|SHORT_SIGN) && e >= 0) {
    647             if (e > 0) {                         // release a waiting worker
    648                 int i; ForkJoinWorkerThread w; ForkJoinWorkerThread[] ws;
    649                 if ((ws = workers) == null ||
    650                     (i = ~e & SMASK) >= ws.length ||
    651                     (w = ws[i]) == null)
    652                     break;
    653                 long nc = (((long)(w.nextWait & E_MASK)) |
    654                            ((long)(u + UAC_UNIT) << 32));
    655                 if (w.eventCount == e &&
    656                     UNSAFE.compareAndSwapLong(this, ctlOffset, c, nc)) {
    657                     w.eventCount = (e + EC_UNIT) & E_MASK;
    658                     if (w.parked)
    659                         UNSAFE.unpark(w);
    660                     break;
    661                 }
    662             }
    663             else if (UNSAFE.compareAndSwapLong
    664                      (this, ctlOffset, c,
    665                       (long)(((u + UTC_UNIT) & UTC_MASK) |
    666                              ((u + UAC_UNIT) & UAC_MASK)) << 32)) {
    667                 addWorker();
    668                 break;
    669             }
    670         }
    671     }
    672 
    673     /**
    674      * Variant of signalWork to help release waiters on rescans.
    675      * Tries once to release a waiter if active count < 0.
    676      *
    677      * @return false if failed due to contention, else true
    678      */
    679     private boolean tryReleaseWaiter() {
    680         long c; int e, i; ForkJoinWorkerThread w; ForkJoinWorkerThread[] ws;
    681         if ((e = (int)(c = ctl)) > 0 &&
    682             (int)(c >> AC_SHIFT) < 0 &&
    683             (ws = workers) != null &&
    684             (i = ~e & SMASK) < ws.length &&
    685             (w = ws[i]) != null) {
    686             long nc = ((long)(w.nextWait & E_MASK) |
    687                        ((c + AC_UNIT) & (AC_MASK|TC_MASK)));
    688             if (w.eventCount != e ||
    689                 !UNSAFE.compareAndSwapLong(this, ctlOffset, c, nc))
    690                 return false;
    691             w.eventCount = (e + EC_UNIT) & E_MASK;
    692             if (w.parked)
    693                 UNSAFE.unpark(w);
    694         }
    695         return true;
    696     }
    697 
    698     // Scanning for tasks
    699 
    700     /**
    701      * Scans for and, if found, executes one task. Scans start at a
    702      * random index of workers array, and randomly select the first
    703      * (2*#workers)-1 probes, and then, if all empty, resort to 2
    704      * circular sweeps, which is necessary to check quiescence. and
    705      * taking a submission only if no stealable tasks were found.  The
    706      * steal code inside the loop is a specialized form of
    707      * ForkJoinWorkerThread.deqTask, followed bookkeeping to support
    708      * helpJoinTask and signal propagation. The code for submission
    709      * queues is almost identical. On each steal, the worker completes
    710      * not only the task, but also all local tasks that this task may
    711      * have generated. On detecting staleness or contention when
    712      * trying to take a task, this method returns without finishing
    713      * sweep, which allows global state rechecks before retry.
    714      *
    715      * @param w the worker
    716      * @param a the number of active workers
    717      * @return true if swept all queues without finding a task
    718      */
    719     private boolean scan(ForkJoinWorkerThread w, int a) {
    720         int g = scanGuard; // mask 0 avoids useless scans if only one active
    721         int m = (parallelism == 1 - a && blockedCount == 0) ? 0 : g & SMASK;
    722         ForkJoinWorkerThread[] ws = workers;
    723         if (ws == null || ws.length <= m)         // staleness check
    724             return false;
    725         for (int r = w.seed, k = r, j = -(m + m); j <= m + m; ++j) {
    726             ForkJoinTask<?> t; ForkJoinTask<?>[] q; int b, i;
    727             ForkJoinWorkerThread v = ws[k & m];
    728             if (v != null && (b = v.queueBase) != v.queueTop &&
    729                 (q = v.queue) != null && (i = (q.length - 1) & b) >= 0) {
    730                 long u = (i << ASHIFT) + ABASE;
    731                 if ((t = q[i]) != null && v.queueBase == b &&
    732                     UNSAFE.compareAndSwapObject(q, u, t, null)) {
    733                     int d = (v.queueBase = b + 1) - v.queueTop;
    734                     v.stealHint = w.poolIndex;
    735                     if (d != 0)
    736                         signalWork();             // propagate if nonempty
    737                     w.execTask(t);
    738                 }
    739                 r ^= r << 13; r ^= r >>> 17; w.seed = r ^ (r << 5);
    740                 return false;                     // store next seed
    741             }
    742             else if (j < 0) {                     // xorshift
    743                 r ^= r << 13; r ^= r >>> 17; k = r ^= r << 5;
    744             }
    745             else
    746                 ++k;
    747         }
    748         if (scanGuard != g)                       // staleness check
    749             return false;
    750         else {                                    // try to take submission
    751             ForkJoinTask<?> t; ForkJoinTask<?>[] q; int b, i;
    752             if ((b = queueBase) != queueTop &&
    753                 (q = submissionQueue) != null &&
    754                 (i = (q.length - 1) & b) >= 0) {
    755                 long u = (i << ASHIFT) + ABASE;
    756                 if ((t = q[i]) != null && queueBase == b &&
    757                     UNSAFE.compareAndSwapObject(q, u, t, null)) {
    758                     queueBase = b + 1;
    759                     w.execTask(t);
    760                 }
    761                 return false;
    762             }
    763             return true;                         // all queues empty
    764         }
    765     }
    766 
    767     /**
    768      * Tries to enqueue worker w in wait queue and await change in
    769      * worker's eventCount.  If the pool is quiescent and there is
    770      * more than one worker, possibly terminates worker upon exit.
    771      * Otherwise, before blocking, rescans queues to avoid missed
    772      * signals.  Upon finding work, releases at least one worker
    773      * (which may be the current worker). Rescans restart upon
    774      * detected staleness or failure to release due to
    775      * contention. Note the unusual conventions about Thread.interrupt
    776      * here and elsewhere: Because interrupts are used solely to alert
    777      * threads to check termination, which is checked here anyway, we
    778      * clear status (using Thread.interrupted) before any call to
    779      * park, so that park does not immediately return due to status
    780      * being set via some other unrelated call to interrupt in user
    781      * code.
    782      *
    783      * @param w the calling worker
    784      * @param c the ctl value on entry
    785      * @return true if waited or another thread was released upon enq
    786      */
    787     private boolean tryAwaitWork(ForkJoinWorkerThread w, long c) {
    788         int v = w.eventCount;
    789         w.nextWait = (int)c;                      // w's successor record
    790         long nc = (long)(v & E_MASK) | ((c - AC_UNIT) & (AC_MASK|TC_MASK));
    791         if (ctl != c || !UNSAFE.compareAndSwapLong(this, ctlOffset, c, nc)) {
    792             long d = ctl; // return true if lost to a deq, to force scan
    793             return (int)d != (int)c && (d & AC_MASK) >= (c & AC_MASK);
    794         }
    795         for (int sc = w.stealCount; sc != 0;) {   // accumulate stealCount
    796             long s = stealCount;
    797             if (UNSAFE.compareAndSwapLong(this, stealCountOffset, s, s + sc))
    798                 sc = w.stealCount = 0;
    799             else if (w.eventCount != v)
    800                 return true;                      // update next time
    801         }
    802         if ((!shutdown || !tryTerminate(false)) &&
    803             (int)c != 0 && parallelism + (int)(nc >> AC_SHIFT) == 0 &&
    804             blockedCount == 0 && quiescerCount == 0)
    805             idleAwaitWork(w, nc, c, v);           // quiescent
    806         for (boolean rescanned = false;;) {
    807             if (w.eventCount != v)
    808                 return true;
    809             if (!rescanned) {
    810                 int g = scanGuard, m = g & SMASK;
    811                 ForkJoinWorkerThread[] ws = workers;
    812                 if (ws != null && m < ws.length) {
    813                     rescanned = true;
    814                     for (int i = 0; i <= m; ++i) {
    815                         ForkJoinWorkerThread u = ws[i];
    816                         if (u != null) {
    817                             if (u.queueBase != u.queueTop &&
    818                                 !tryReleaseWaiter())
    819                                 rescanned = false; // contended
    820                             if (w.eventCount != v)
    821                                 return true;
    822                         }
    823                     }
    824                 }
    825                 if (scanGuard != g ||              // stale
    826                     (queueBase != queueTop && !tryReleaseWaiter()))
    827                     rescanned = false;
    828                 if (!rescanned)
    829                     Thread.yield();                // reduce contention
    830                 else
    831                     Thread.interrupted();          // clear before park
    832             }
    833             else {
    834                 w.parked = true;                   // must recheck
    835                 if (w.eventCount != v) {
    836                     w.parked = false;
    837                     return true;
    838                 }
    839                 LockSupport.park(this);
    840                 rescanned = w.parked = false;
    841             }
    842         }
    843     }
    844 
    845     /**
    846      * If inactivating worker w has caused pool to become
    847      * quiescent, check for pool termination, and wait for event
    848      * for up to SHRINK_RATE nanosecs (rescans are unnecessary in
    849      * this case because quiescence reflects consensus about lack
    850      * of work). On timeout, if ctl has not changed, terminate the
    851      * worker. Upon its termination (see deregisterWorker), it may
    852      * wake up another worker to possibly repeat this process.
    853      *
    854      * @param w the calling worker
    855      * @param currentCtl the ctl value after enqueuing w
    856      * @param prevCtl the ctl value if w terminated
    857      * @param v the eventCount w awaits change
    858      */
    859     private void idleAwaitWork(ForkJoinWorkerThread w, long currentCtl,
    860                                long prevCtl, int v) {
    861         if (w.eventCount == v) {
    862             if (shutdown)
    863                 tryTerminate(false);
    864             ForkJoinTask.helpExpungeStaleExceptions(); // help clean weak refs
    865             while (ctl == currentCtl) {
    866                 long startTime = System.nanoTime();
    867                 w.parked = true;
    868                 if (w.eventCount == v)             // must recheck
    869                     LockSupport.parkNanos(this, SHRINK_RATE);
    870                 w.parked = false;
    871                 if (w.eventCount != v)
    872                     break;
    873                 else if (System.nanoTime() - startTime <
    874                          SHRINK_RATE - (SHRINK_RATE / 10)) // timing slop
    875                     Thread.interrupted();          // spurious wakeup
    876                 else if (UNSAFE.compareAndSwapLong(this, ctlOffset,
    877                                                    currentCtl, prevCtl)) {
    878                     w.terminate = true;            // restore previous
    879                     w.eventCount = ((int)currentCtl + EC_UNIT) & E_MASK;
    880                     break;
    881                 }
    882             }
    883         }
    884     }
    885 
    886     // Submissions
    887 
    888     /**
    889      * Enqueues the given task in the submissionQueue.  Same idea as
    890      * ForkJoinWorkerThread.pushTask except for use of submissionLock.
    891      *
    892      * @param t the task
    893      */
    894     private void addSubmission(ForkJoinTask<?> t) {
    895         final ReentrantLock lock = this.submissionLock;
    896         lock.lock();
    897         try {
    898             ForkJoinTask<?>[] q; int s, m;
    899             if ((q = submissionQueue) != null) {    // ignore if queue removed
    900                 long u = (((s = queueTop) & (m = q.length-1)) << ASHIFT)+ABASE;
    901                 UNSAFE.putOrderedObject(q, u, t);
    902                 queueTop = s + 1;
    903                 if (s - queueBase == m)
    904                     growSubmissionQueue();
    905             }
    906         } finally {
    907             lock.unlock();
    908         }
    909         signalWork();
    910     }
    911 
    912     //  (pollSubmission is defined below with exported methods)
    913 
    914     /**
    915      * Creates or doubles submissionQueue array.
    916      * Basically identical to ForkJoinWorkerThread version.
    917      */
    918     private void growSubmissionQueue() {
    919         ForkJoinTask<?>[] oldQ = submissionQueue;
    920         int size = oldQ != null ? oldQ.length << 1 : INITIAL_QUEUE_CAPACITY;
    921         if (size > MAXIMUM_QUEUE_CAPACITY)
    922             throw new RejectedExecutionException("Queue capacity exceeded");
    923         if (size < INITIAL_QUEUE_CAPACITY)
    924             size = INITIAL_QUEUE_CAPACITY;
    925         ForkJoinTask<?>[] q = submissionQueue = new ForkJoinTask<?>[size];
    926         int mask = size - 1;
    927         int top = queueTop;
    928         int oldMask;
    929         if (oldQ != null && (oldMask = oldQ.length - 1) >= 0) {
    930             for (int b = queueBase; b != top; ++b) {
    931                 long u = ((b & oldMask) << ASHIFT) + ABASE;
    932                 Object x = UNSAFE.getObjectVolatile(oldQ, u);
    933                 if (x != null && UNSAFE.compareAndSwapObject(oldQ, u, x, null))
    934                     UNSAFE.putObjectVolatile
    935                         (q, ((b & mask) << ASHIFT) + ABASE, x);
    936             }
    937         }
    938     }
    939 
    940     // Blocking support
    941 
    942     /**
    943      * Tries to increment blockedCount, decrement active count
    944      * (sometimes implicitly) and possibly release or create a
    945      * compensating worker in preparation for blocking. Fails
    946      * on contention or termination.
    947      *
    948      * @return true if the caller can block, else should recheck and retry
    949      */
    950     private boolean tryPreBlock() {
    951         int b = blockedCount;
    952         if (UNSAFE.compareAndSwapInt(this, blockedCountOffset, b, b + 1)) {
    953             int pc = parallelism;
    954             do {
    955                 ForkJoinWorkerThread[] ws; ForkJoinWorkerThread w;
    956                 int e, ac, tc, i;
    957                 long c = ctl;
    958                 int u = (int)(c >>> 32);
    959                 if ((e = (int)c) < 0) {
    960                                                  // skip -- terminating
    961                 }
    962                 else if ((ac = (u >> UAC_SHIFT)) <= 0 && e != 0 &&
    963                          (ws = workers) != null &&
    964                          (i = ~e & SMASK) < ws.length &&
    965                          (w = ws[i]) != null) {
    966                     long nc = ((long)(w.nextWait & E_MASK) |
    967                                (c & (AC_MASK|TC_MASK)));
    968                     if (w.eventCount == e &&
    969                         UNSAFE.compareAndSwapLong(this, ctlOffset, c, nc)) {
    970                         w.eventCount = (e + EC_UNIT) & E_MASK;
    971                         if (w.parked)
    972                             UNSAFE.unpark(w);
    973                         return true;             // release an idle worker
    974                     }
    975                 }
    976                 else if ((tc = (short)(u >>> UTC_SHIFT)) >= 0 && ac + pc > 1) {
    977                     long nc = ((c - AC_UNIT) & AC_MASK) | (c & ~AC_MASK);
    978                     if (UNSAFE.compareAndSwapLong(this, ctlOffset, c, nc))
    979                         return true;             // no compensation needed
    980                 }
    981                 else if (tc + pc < MAX_ID) {
    982                     long nc = ((c + TC_UNIT) & TC_MASK) | (c & ~TC_MASK);
    983                     if (UNSAFE.compareAndSwapLong(this, ctlOffset, c, nc)) {
    984                         addWorker();
    985                         return true;            // create a replacement
    986                     }
    987                 }
    988                 // try to back out on any failure and let caller retry
    989             } while (!UNSAFE.compareAndSwapInt(this, blockedCountOffset,
    990                                                b = blockedCount, b - 1));
    991         }
    992         return false;
    993     }
    994 
    995     /**
    996      * Decrements blockedCount and increments active count.
    997      */
    998     private void postBlock() {
    999         long c;
   1000         do {} while (!UNSAFE.compareAndSwapLong(this, ctlOffset,  // no mask
   1001                                                 c = ctl, c + AC_UNIT));
   1002         int b;
   1003         do {} while (!UNSAFE.compareAndSwapInt(this, blockedCountOffset,
   1004                                                b = blockedCount, b - 1));
   1005     }
   1006 
   1007     /**
   1008      * Possibly blocks waiting for the given task to complete, or
   1009      * cancels the task if terminating.  Fails to wait if contended.
   1010      *
   1011      * @param joinMe the task
   1012      */
   1013     final void tryAwaitJoin(ForkJoinTask<?> joinMe) {
   1014         Thread.interrupted(); // clear interrupts before checking termination
   1015         if (joinMe.status >= 0) {
   1016             if (tryPreBlock()) {
   1017                 joinMe.tryAwaitDone(0L);
   1018                 postBlock();
   1019             }
   1020             else if ((ctl & STOP_BIT) != 0L)
   1021                 joinMe.cancelIgnoringExceptions();
   1022         }
   1023     }
   1024 
   1025     /**
   1026      * Possibly blocks the given worker waiting for joinMe to
   1027      * complete or timeout.
   1028      *
   1029      * @param joinMe the task
   1030      * @param nanos the wait time for underlying Object.wait
   1031      */
   1032     final void timedAwaitJoin(ForkJoinTask<?> joinMe, long nanos) {
   1033         while (joinMe.status >= 0) {
   1034             Thread.interrupted();
   1035             if ((ctl & STOP_BIT) != 0L) {
   1036                 joinMe.cancelIgnoringExceptions();
   1037                 break;
   1038             }
   1039             if (tryPreBlock()) {
   1040                 long last = System.nanoTime();
   1041                 while (joinMe.status >= 0) {
   1042                     long millis = TimeUnit.NANOSECONDS.toMillis(nanos);
   1043                     if (millis <= 0)
   1044                         break;
   1045                     joinMe.tryAwaitDone(millis);
   1046                     if (joinMe.status < 0)
   1047                         break;
   1048                     if ((ctl & STOP_BIT) != 0L) {
   1049                         joinMe.cancelIgnoringExceptions();
   1050                         break;
   1051                     }
   1052                     long now = System.nanoTime();
   1053                     nanos -= now - last;
   1054                     last = now;
   1055                 }
   1056                 postBlock();
   1057                 break;
   1058             }
   1059         }
   1060     }
   1061 
   1062     /**
   1063      * If necessary, compensates for blocker, and blocks.
   1064      */
   1065     private void awaitBlocker(ManagedBlocker blocker)
   1066         throws InterruptedException {
   1067         while (!blocker.isReleasable()) {
   1068             if (tryPreBlock()) {
   1069                 try {
   1070                     do {} while (!blocker.isReleasable() && !blocker.block());
   1071                 } finally {
   1072                     postBlock();
   1073                 }
   1074                 break;
   1075             }
   1076         }
   1077     }
   1078 
    // Creating, registering and deregistering workers
   1080 
   1081     /**
   1082      * Tries to create and start a worker; minimally rolls back counts
   1083      * on failure.
   1084      */
   1085     private void addWorker() {
   1086         Throwable ex = null;
   1087         ForkJoinWorkerThread t = null;
   1088         try {
   1089             t = factory.newThread(this);
   1090         } catch (Throwable e) {
   1091             ex = e;
   1092         }
   1093         if (t == null) {  // null or exceptional factory return
   1094             long c;       // adjust counts
   1095             do {} while (!UNSAFE.compareAndSwapLong
   1096                          (this, ctlOffset, c = ctl,
   1097                           (((c - AC_UNIT) & AC_MASK) |
   1098                            ((c - TC_UNIT) & TC_MASK) |
   1099                            (c & ~(AC_MASK|TC_MASK)))));
   1100             // Propagate exception if originating from an external caller
   1101             if (!tryTerminate(false) && ex != null &&
   1102                 !(Thread.currentThread() instanceof ForkJoinWorkerThread))
   1103                 SneakyThrow.sneakyThrow(ex); // android-changed
   1104         }
   1105         else
   1106             t.start();
   1107     }
   1108 
   1109     /**
   1110      * Callback from ForkJoinWorkerThread constructor to assign a
   1111      * public name
   1112      */
   1113     final String nextWorkerName() {
   1114         for (int n;;) {
   1115             if (UNSAFE.compareAndSwapInt(this, nextWorkerNumberOffset,
   1116                                          n = nextWorkerNumber, ++n))
   1117                 return workerNamePrefix + n;
   1118         }
   1119     }
   1120 
   1121     /**
   1122      * Callback from ForkJoinWorkerThread constructor to
   1123      * determine its poolIndex and record in workers array.
   1124      *
   1125      * @param w the worker
   1126      * @return the worker's pool index
   1127      */
   1128     final int registerWorker(ForkJoinWorkerThread w) {
   1129         /*
   1130          * In the typical case, a new worker acquires the lock, uses
   1131          * next available index and returns quickly.  Since we should
   1132          * not block callers (ultimately from signalWork or
   1133          * tryPreBlock) waiting for the lock needed to do this, we
   1134          * instead help release other workers while waiting for the
   1135          * lock.
   1136          */
   1137         for (int g;;) {
   1138             ForkJoinWorkerThread[] ws;
   1139             if (((g = scanGuard) & SG_UNIT) == 0 &&
   1140                 UNSAFE.compareAndSwapInt(this, scanGuardOffset,
   1141                                          g, g | SG_UNIT)) {
   1142                 int k = nextWorkerIndex;
   1143                 try {
   1144                     if ((ws = workers) != null) { // ignore on shutdown
   1145                         int n = ws.length;
   1146                         if (k < 0 || k >= n || ws[k] != null) {
   1147                             for (k = 0; k < n && ws[k] != null; ++k)
   1148                                 ;
   1149                             if (k == n)
   1150                                 ws = workers = Arrays.copyOf(ws, n << 1);
   1151                         }
   1152                         ws[k] = w;
   1153                         nextWorkerIndex = k + 1;
   1154                         int m = g & SMASK;
   1155                         g = (k > m) ? ((m << 1) + 1) & SMASK : g + (SG_UNIT<<1);
   1156                     }
   1157                 } finally {
   1158                     scanGuard = g;
   1159                 }
   1160                 return k;
   1161             }
   1162             else if ((ws = workers) != null) { // help release others
   1163                 for (ForkJoinWorkerThread u : ws) {
   1164                     if (u != null && u.queueBase != u.queueTop) {
   1165                         if (tryReleaseWaiter())
   1166                             break;
   1167                     }
   1168                 }
   1169             }
   1170         }
   1171     }
   1172 
   1173     /**
   1174      * Final callback from terminating worker.  Removes record of
   1175      * worker from array, and adjusts counts. If pool is shutting
   1176      * down, tries to complete termination.
   1177      *
   1178      * @param w the worker
   1179      */
   1180     final void deregisterWorker(ForkJoinWorkerThread w, Throwable ex) {
   1181         int idx = w.poolIndex;
   1182         int sc = w.stealCount;
   1183         int steps = 0;
   1184         // Remove from array, adjust worker counts and collect steal count.
   1185         // We can intermix failed removes or adjusts with steal updates
   1186         do {
   1187             long s, c;
   1188             int g;
   1189             if (steps == 0 && ((g = scanGuard) & SG_UNIT) == 0 &&
   1190                 UNSAFE.compareAndSwapInt(this, scanGuardOffset,
   1191                                          g, g |= SG_UNIT)) {
   1192                 ForkJoinWorkerThread[] ws = workers;
   1193                 if (ws != null && idx >= 0 &&
   1194                     idx < ws.length && ws[idx] == w)
   1195                     ws[idx] = null;    // verify
   1196                 nextWorkerIndex = idx;
   1197                 scanGuard = g + SG_UNIT;
   1198                 steps = 1;
   1199             }
   1200             if (steps == 1 &&
   1201                 UNSAFE.compareAndSwapLong(this, ctlOffset, c = ctl,
   1202                                           (((c - AC_UNIT) & AC_MASK) |
   1203                                            ((c - TC_UNIT) & TC_MASK) |
   1204                                            (c & ~(AC_MASK|TC_MASK)))))
   1205                 steps = 2;
   1206             if (sc != 0 &&
   1207                 UNSAFE.compareAndSwapLong(this, stealCountOffset,
   1208                                           s = stealCount, s + sc))
   1209                 sc = 0;
   1210         } while (steps != 2 || sc != 0);
   1211         if (!tryTerminate(false)) {
   1212             if (ex != null)   // possibly replace if died abnormally
   1213                 signalWork();
   1214             else
   1215                 tryReleaseWaiter();
   1216         }
   1217     }
   1218 
   1219     // Shutdown and termination
   1220 
   1221     /**
   1222      * Possibly initiates and/or completes termination.
   1223      *
   1224      * @param now if true, unconditionally terminate, else only
   1225      * if shutdown and empty queue and no active workers
   1226      * @return true if now terminating or terminated
   1227      */
   1228     private boolean tryTerminate(boolean now) {
   1229         long c;
   1230         while (((c = ctl) & STOP_BIT) == 0) {
   1231             if (!now) {
   1232                 if ((int)(c >> AC_SHIFT) != -parallelism)
   1233                     return false;
   1234                 if (!shutdown || blockedCount != 0 || quiescerCount != 0 ||
   1235                     queueBase != queueTop) {
   1236                     if (ctl == c) // staleness check
   1237                         return false;
   1238                     continue;
   1239                 }
   1240             }
   1241             if (UNSAFE.compareAndSwapLong(this, ctlOffset, c, c | STOP_BIT))
   1242                 startTerminating();
   1243         }
   1244         if ((short)(c >>> TC_SHIFT) == -parallelism) { // signal when 0 workers
   1245             final ReentrantLock lock = this.submissionLock;
   1246             lock.lock();
   1247             try {
   1248                 termination.signalAll();
   1249             } finally {
   1250                 lock.unlock();
   1251             }
   1252         }
   1253         return true;
   1254     }
   1255 
   1256     /**
   1257      * Runs up to three passes through workers: (0) Setting
   1258      * termination status for each worker, followed by wakeups up to
   1259      * queued workers; (1) helping cancel tasks; (2) interrupting
   1260      * lagging threads (likely in external tasks, but possibly also
   1261      * blocked in joins).  Each pass repeats previous steps because of
   1262      * potential lagging thread creation.
   1263      */
   1264     private void startTerminating() {
   1265         cancelSubmissions();
   1266         for (int pass = 0; pass < 3; ++pass) {
   1267             ForkJoinWorkerThread[] ws = workers;
   1268             if (ws != null) {
   1269                 for (ForkJoinWorkerThread w : ws) {
   1270                     if (w != null) {
   1271                         w.terminate = true;
   1272                         if (pass > 0) {
   1273                             w.cancelTasks();
   1274                             if (pass > 1 && !w.isInterrupted()) {
   1275                                 try {
   1276                                     w.interrupt();
   1277                                 } catch (SecurityException ignore) {
   1278                                 }
   1279                             }
   1280                         }
   1281                     }
   1282                 }
   1283                 terminateWaiters();
   1284             }
   1285         }
   1286     }
   1287 
   1288     /**
   1289      * Polls and cancels all submissions. Called only during termination.
   1290      */
   1291     private void cancelSubmissions() {
   1292         while (queueBase != queueTop) {
   1293             ForkJoinTask<?> task = pollSubmission();
   1294             if (task != null) {
   1295                 try {
   1296                     task.cancel(false);
   1297                 } catch (Throwable ignore) {
   1298                 }
   1299             }
   1300         }
   1301     }
   1302 
    /**
     * Tries to set the termination status of waiting workers, and
     * then wakes them up (after which they will terminate).
     *
     * <p>Repeatedly takes the waiting worker recorded in the low bits
     * of {@code ctl}, provided that worker's {@code eventCount} still
     * matches the value recorded there, replacing it in {@code ctl}
     * with that worker's {@code nextWait}.
     */
    private void terminateWaiters() {
        ForkJoinWorkerThread[] ws = workers;
        if (ws != null) {
            ForkJoinWorkerThread w; long c; int i, e;
            int n = ws.length;
            // e is the low 32 bits of ctl; ~e & SMASK is the index in ws of
            // the recorded waiter. Stop when that index is out of range, the
            // slot is empty, or the worker's eventCount no longer matches.
            while ((i = ~(e = (int)(c = ctl)) & SMASK) < n &&
                   (w = ws[i]) != null && w.eventCount == (e & E_MASK)) {
                // New ctl: waiter field becomes w.nextWait, active count is
                // incremented, total count and STOP_BIT are carried over.
                if (UNSAFE.compareAndSwapLong(this, ctlOffset, c,
                                              (long)(w.nextWait & E_MASK) |
                                              ((c + AC_UNIT) & AC_MASK) |
                                              (c & (TC_MASK|STOP_BIT)))) {
                    w.terminate = true;
                    w.eventCount = e + EC_UNIT;
                    if (w.parked)
                        UNSAFE.unpark(w);
                }
                // On CAS failure the loop re-reads ctl and retries.
            }
        }
    }
   1326 
   1327     // misc ForkJoinWorkerThread support
   1328 
   1329     /**
   1330      * Increments or decrements quiescerCount. Needed only to prevent
   1331      * triggering shutdown if a worker is transiently inactive while
   1332      * checking quiescence.
   1333      *
   1334      * @param delta 1 for increment, -1 for decrement
   1335      */
   1336     final void addQuiescerCount(int delta) {
   1337         int c;
   1338         do {} while (!UNSAFE.compareAndSwapInt(this, quiescerCountOffset,
   1339                                                c = quiescerCount, c + delta));
   1340     }
   1341 
   1342     /**
   1343      * Directly increments or decrements active count without queuing.
   1344      * This method is used to transiently assert inactivation while
   1345      * checking quiescence.
   1346      *
   1347      * @param delta 1 for increment, -1 for decrement
   1348      */
   1349     final void addActiveCount(int delta) {
   1350         long d = (long)delta << AC_SHIFT;
   1351         long c;
   1352         do {} while (!UNSAFE.compareAndSwapLong(this, ctlOffset,
   1353                                                 c = ctl, c + d));
   1354     }
   1355 
   1356     /**
   1357      * Returns the approximate (non-atomic) number of idle threads per
   1358      * active thread.
   1359      */
   1360     final int idlePerActive() {
   1361         // Approximate at powers of two for small values, saturate past 4
   1362         int p = parallelism;
   1363         int a = p + (int)(ctl >> AC_SHIFT);
   1364         return (a > (p >>>= 1) ? 0 :
   1365                 a > (p >>>= 1) ? 1 :
   1366                 a > (p >>>= 1) ? 2 :
   1367                 a > (p >>>= 1) ? 4 :
   1368                 8);
   1369     }
   1370 
   1371     // Exported methods
   1372 
   1373     // Constructors
   1374 
   1375     /**
   1376      * Creates a {@code ForkJoinPool} with parallelism equal to {@link
   1377      * java.lang.Runtime#availableProcessors}, using the {@linkplain
   1378      * #defaultForkJoinWorkerThreadFactory default thread factory},
   1379      * no UncaughtExceptionHandler, and non-async LIFO processing mode.
   1380      */
   1381     public ForkJoinPool() {
   1382         this(Runtime.getRuntime().availableProcessors(),
   1383              defaultForkJoinWorkerThreadFactory, null, false);
   1384     }
   1385 
   1386     /**
   1387      * Creates a {@code ForkJoinPool} with the indicated parallelism
   1388      * level, the {@linkplain
   1389      * #defaultForkJoinWorkerThreadFactory default thread factory},
   1390      * no UncaughtExceptionHandler, and non-async LIFO processing mode.
   1391      *
   1392      * @param parallelism the parallelism level
   1393      * @throws IllegalArgumentException if parallelism less than or
   1394      *         equal to zero, or greater than implementation limit
   1395      */
   1396     public ForkJoinPool(int parallelism) {
   1397         this(parallelism, defaultForkJoinWorkerThreadFactory, null, false);
   1398     }
   1399 
   1400     /**
   1401      * Creates a {@code ForkJoinPool} with the given parameters.
   1402      *
   1403      * @param parallelism the parallelism level. For default value,
   1404      * use {@link java.lang.Runtime#availableProcessors}.
   1405      * @param factory the factory for creating new threads. For default value,
   1406      * use {@link #defaultForkJoinWorkerThreadFactory}.
   1407      * @param handler the handler for internal worker threads that
   1408      * terminate due to unrecoverable errors encountered while executing
   1409      * tasks. For default value, use {@code null}.
   1410      * @param asyncMode if true,
   1411      * establishes local first-in-first-out scheduling mode for forked
   1412      * tasks that are never joined. This mode may be more appropriate
   1413      * than default locally stack-based mode in applications in which
   1414      * worker threads only process event-style asynchronous tasks.
   1415      * For default value, use {@code false}.
   1416      * @throws IllegalArgumentException if parallelism less than or
   1417      *         equal to zero, or greater than implementation limit
   1418      * @throws NullPointerException if the factory is null
   1419      */
   1420     public ForkJoinPool(int parallelism,
   1421                         ForkJoinWorkerThreadFactory factory,
   1422                         Thread.UncaughtExceptionHandler handler,
   1423                         boolean asyncMode) {
   1424         checkPermission();
   1425         if (factory == null)
   1426             throw new NullPointerException();
   1427         if (parallelism <= 0 || parallelism > MAX_ID)
   1428             throw new IllegalArgumentException();
   1429         this.parallelism = parallelism;
   1430         this.factory = factory;
   1431         this.ueh = handler;
   1432         this.locallyFifo = asyncMode;
   1433         long np = (long)(-parallelism); // offset ctl counts
   1434         this.ctl = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK);
   1435         this.submissionQueue = new ForkJoinTask<?>[INITIAL_QUEUE_CAPACITY];
   1436         // initialize workers array with room for 2*parallelism if possible
   1437         int n = parallelism << 1;
   1438         if (n >= MAX_ID)
   1439             n = MAX_ID;
   1440         else { // See Hackers Delight, sec 3.2, where n < (1 << 16)
   1441             n |= n >>> 1; n |= n >>> 2; n |= n >>> 4; n |= n >>> 8;
   1442         }
   1443         workers = new ForkJoinWorkerThread[n + 1];
   1444         this.submissionLock = new ReentrantLock();
   1445         this.termination = submissionLock.newCondition();
   1446         StringBuilder sb = new StringBuilder("ForkJoinPool-");
   1447         sb.append(poolNumberGenerator.incrementAndGet());
   1448         sb.append("-worker-");
   1449         this.workerNamePrefix = sb.toString();
   1450     }
   1451 
   1452     // Execution methods
   1453 
   1454     /**
   1455      * Performs the given task, returning its result upon completion.
   1456      * If the computation encounters an unchecked Exception or Error,
   1457      * it is rethrown as the outcome of this invocation.  Rethrown
   1458      * exceptions behave in the same way as regular exceptions, but,
   1459      * when possible, contain stack traces (as displayed for example
   1460      * using {@code ex.printStackTrace()}) of both the current thread
   1461      * as well as the thread actually encountering the exception;
   1462      * minimally only the latter.
   1463      *
   1464      * @param task the task
   1465      * @return the task's result
   1466      * @throws NullPointerException if the task is null
   1467      * @throws RejectedExecutionException if the task cannot be
   1468      *         scheduled for execution
   1469      */
   1470     public <T> T invoke(ForkJoinTask<T> task) {
   1471         Thread t = Thread.currentThread();
   1472         if (task == null)
   1473             throw new NullPointerException();
   1474         if (shutdown)
   1475             throw new RejectedExecutionException();
   1476         if ((t instanceof ForkJoinWorkerThread) &&
   1477             ((ForkJoinWorkerThread)t).pool == this)
   1478             return task.invoke();  // bypass submit if in same pool
   1479         else {
   1480             addSubmission(task);
   1481             return task.join();
   1482         }
   1483     }
   1484 
   1485     /**
   1486      * Unless terminating, forks task if within an ongoing FJ
   1487      * computation in the current pool, else submits as external task.
   1488      */
   1489     private <T> void forkOrSubmit(ForkJoinTask<T> task) {
   1490         ForkJoinWorkerThread w;
   1491         Thread t = Thread.currentThread();
   1492         if (shutdown)
   1493             throw new RejectedExecutionException();
   1494         if ((t instanceof ForkJoinWorkerThread) &&
   1495             (w = (ForkJoinWorkerThread)t).pool == this)
   1496             w.pushTask(task);
   1497         else
   1498             addSubmission(task);
   1499     }
   1500 
   1501     /**
   1502      * Arranges for (asynchronous) execution of the given task.
   1503      *
   1504      * @param task the task
   1505      * @throws NullPointerException if the task is null
   1506      * @throws RejectedExecutionException if the task cannot be
   1507      *         scheduled for execution
   1508      */
   1509     public void execute(ForkJoinTask<?> task) {
   1510         if (task == null)
   1511             throw new NullPointerException();
   1512         forkOrSubmit(task);
   1513     }
   1514 
   1515     // AbstractExecutorService methods
   1516 
   1517     /**
   1518      * @throws NullPointerException if the task is null
   1519      * @throws RejectedExecutionException if the task cannot be
   1520      *         scheduled for execution
   1521      */
   1522     public void execute(Runnable task) {
   1523         if (task == null)
   1524             throw new NullPointerException();
   1525         ForkJoinTask<?> job;
   1526         if (task instanceof ForkJoinTask<?>) // avoid re-wrap
   1527             job = (ForkJoinTask<?>) task;
   1528         else
   1529             job = ForkJoinTask.adapt(task, null);
   1530         forkOrSubmit(job);
   1531     }
   1532 
   1533     /**
   1534      * Submits a ForkJoinTask for execution.
   1535      *
   1536      * @param task the task to submit
   1537      * @return the task
   1538      * @throws NullPointerException if the task is null
   1539      * @throws RejectedExecutionException if the task cannot be
   1540      *         scheduled for execution
   1541      */
   1542     public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task) {
   1543         if (task == null)
   1544             throw new NullPointerException();
   1545         forkOrSubmit(task);
   1546         return task;
   1547     }
   1548 
   1549     /**
   1550      * @throws NullPointerException if the task is null
   1551      * @throws RejectedExecutionException if the task cannot be
   1552      *         scheduled for execution
   1553      */
   1554     public <T> ForkJoinTask<T> submit(Callable<T> task) {
   1555         if (task == null)
   1556             throw new NullPointerException();
   1557         ForkJoinTask<T> job = ForkJoinTask.adapt(task);
   1558         forkOrSubmit(job);
   1559         return job;
   1560     }
   1561 
   1562     /**
   1563      * @throws NullPointerException if the task is null
   1564      * @throws RejectedExecutionException if the task cannot be
   1565      *         scheduled for execution
   1566      */
   1567     public <T> ForkJoinTask<T> submit(Runnable task, T result) {
   1568         if (task == null)
   1569             throw new NullPointerException();
   1570         ForkJoinTask<T> job = ForkJoinTask.adapt(task, result);
   1571         forkOrSubmit(job);
   1572         return job;
   1573     }
   1574 
   1575     /**
   1576      * @throws NullPointerException if the task is null
   1577      * @throws RejectedExecutionException if the task cannot be
   1578      *         scheduled for execution
   1579      */
   1580     public ForkJoinTask<?> submit(Runnable task) {
   1581         if (task == null)
   1582             throw new NullPointerException();
   1583         ForkJoinTask<?> job;
   1584         if (task instanceof ForkJoinTask<?>) // avoid re-wrap
   1585             job = (ForkJoinTask<?>) task;
   1586         else
   1587             job = ForkJoinTask.adapt(task, null);
   1588         forkOrSubmit(job);
   1589         return job;
   1590     }
   1591 
   1592     /**
   1593      * @throws NullPointerException       {@inheritDoc}
   1594      * @throws RejectedExecutionException {@inheritDoc}
   1595      */
   1596     public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) {
   1597         ArrayList<ForkJoinTask<T>> forkJoinTasks =
   1598             new ArrayList<ForkJoinTask<T>>(tasks.size());
   1599         for (Callable<T> task : tasks)
   1600             forkJoinTasks.add(ForkJoinTask.adapt(task));
   1601         invoke(new InvokeAll<T>(forkJoinTasks));
   1602 
   1603         @SuppressWarnings({"unchecked", "rawtypes"})
   1604             List<Future<T>> futures = (List<Future<T>>) (List) forkJoinTasks;
   1605         return futures;
   1606     }
   1607 
   1608     static final class InvokeAll<T> extends RecursiveAction {
   1609         final ArrayList<ForkJoinTask<T>> tasks;
   1610         InvokeAll(ArrayList<ForkJoinTask<T>> tasks) { this.tasks = tasks; }
   1611         public void compute() {
   1612             try { invokeAll(tasks); }
   1613             catch (Exception ignore) {}
   1614         }
   1615         private static final long serialVersionUID = -7914297376763021607L;
   1616     }
   1617 
   1618     /**
   1619      * Returns the factory used for constructing new workers.
   1620      *
   1621      * @return the factory used for constructing new workers
   1622      */
   1623     public ForkJoinWorkerThreadFactory getFactory() {
   1624         return factory;
   1625     }
   1626 
   1627     /**
   1628      * Returns the handler for internal worker threads that terminate
   1629      * due to unrecoverable errors encountered while executing tasks.
   1630      *
   1631      * @return the handler, or {@code null} if none
   1632      */
   1633     public Thread.UncaughtExceptionHandler getUncaughtExceptionHandler() {
   1634         return ueh;
   1635     }
   1636 
   1637     /**
   1638      * Returns the targeted parallelism level of this pool.
   1639      *
   1640      * @return the targeted parallelism level of this pool
   1641      */
   1642     public int getParallelism() {
   1643         return parallelism;
   1644     }
   1645 
   1646     /**
   1647      * Returns the number of worker threads that have started but not
   1648      * yet terminated.  The result returned by this method may differ
   1649      * from {@link #getParallelism} when threads are created to
   1650      * maintain parallelism when others are cooperatively blocked.
   1651      *
   1652      * @return the number of worker threads
   1653      */
   1654     public int getPoolSize() {
   1655         return parallelism + (short)(ctl >>> TC_SHIFT);
   1656     }
   1657 
   1658     /**
   1659      * Returns {@code true} if this pool uses local first-in-first-out
   1660      * scheduling mode for forked tasks that are never joined.
   1661      *
   1662      * @return {@code true} if this pool uses async mode
   1663      */
   1664     public boolean getAsyncMode() {
   1665         return locallyFifo;
   1666     }
   1667 
   1668     /**
   1669      * Returns an estimate of the number of worker threads that are
   1670      * not blocked waiting to join tasks or for other managed
   1671      * synchronization. This method may overestimate the
   1672      * number of running threads.
   1673      *
   1674      * @return the number of worker threads
   1675      */
   1676     public int getRunningThreadCount() {
   1677         int r = parallelism + (int)(ctl >> AC_SHIFT);
   1678         return (r <= 0) ? 0 : r; // suppress momentarily negative values
   1679     }
   1680 
   1681     /**
   1682      * Returns an estimate of the number of threads that are currently
   1683      * stealing or executing tasks. This method may overestimate the
   1684      * number of active threads.
   1685      *
   1686      * @return the number of active threads
   1687      */
   1688     public int getActiveThreadCount() {
   1689         int r = parallelism + (int)(ctl >> AC_SHIFT) + blockedCount;
   1690         return (r <= 0) ? 0 : r; // suppress momentarily negative values
   1691     }
   1692 
   1693     /**
   1694      * Returns {@code true} if all worker threads are currently idle.
   1695      * An idle worker is one that cannot obtain a task to execute
   1696      * because none are available to steal from other threads, and
   1697      * there are no pending submissions to the pool. This method is
   1698      * conservative; it might not return {@code true} immediately upon
   1699      * idleness of all threads, but will eventually become true if
   1700      * threads remain inactive.
   1701      *
   1702      * @return {@code true} if all threads are currently idle
   1703      */
   1704     public boolean isQuiescent() {
   1705         return parallelism + (int)(ctl >> AC_SHIFT) + blockedCount == 0;
   1706     }
   1707 
   1708     /**
   1709      * Returns an estimate of the total number of tasks stolen from
   1710      * one thread's work queue by another. The reported value
   1711      * underestimates the actual total number of steals when the pool
   1712      * is not quiescent. This value may be useful for monitoring and
   1713      * tuning fork/join programs: in general, steal counts should be
   1714      * high enough to keep threads busy, but low enough to avoid
   1715      * overhead and contention across threads.
   1716      *
   1717      * @return the number of steals
   1718      */
   1719     public long getStealCount() {
   1720         return stealCount;
   1721     }
   1722 
   1723     /**
   1724      * Returns an estimate of the total number of tasks currently held
   1725      * in queues by worker threads (but not including tasks submitted
   1726      * to the pool that have not begun executing). This value is only
   1727      * an approximation, obtained by iterating across all threads in
   1728      * the pool. This method may be useful for tuning task
   1729      * granularities.
   1730      *
   1731      * @return the number of queued tasks
   1732      */
   1733     public long getQueuedTaskCount() {
   1734         long count = 0;
   1735         ForkJoinWorkerThread[] ws;
   1736         if ((short)(ctl >>> TC_SHIFT) > -parallelism &&
   1737             (ws = workers) != null) {
   1738             for (ForkJoinWorkerThread w : ws)
   1739                 if (w != null)
   1740                     count -= w.queueBase - w.queueTop; // must read base first
   1741         }
   1742         return count;
   1743     }
   1744 
   1745     /**
   1746      * Returns an estimate of the number of tasks submitted to this
   1747      * pool that have not yet begun executing.  This method may take
   1748      * time proportional to the number of submissions.
   1749      *
   1750      * @return the number of queued submissions
   1751      */
   1752     public int getQueuedSubmissionCount() {
   1753         return -queueBase + queueTop;
   1754     }
   1755 
   1756     /**
   1757      * Returns {@code true} if there are any tasks submitted to this
   1758      * pool that have not yet begun executing.
   1759      *
   1760      * @return {@code true} if there are any queued submissions
   1761      */
   1762     public boolean hasQueuedSubmissions() {
   1763         return queueBase != queueTop;
   1764     }
   1765 
    /**
     * Removes and returns the next unexecuted submission if one is
     * available.  This method may be useful in extensions to this
     * class that re-assign work in systems with multiple pools.
     *
     * <p>Claims the task at index {@code queueBase} by CAS-ing its
     * array slot to null, then advances {@code queueBase}. Retries
     * while the queue still appears non-empty.
     *
     * @return the next submission, or {@code null} if none
     */
    protected ForkJoinTask<?> pollSubmission() {
        ForkJoinTask<?> t; ForkJoinTask<?>[] q; int b, i;
        // (q.length - 1) & b maps base index b to an array slot.
        // NOTE(review): this masking assumes q.length is a power of two,
        // presumably maintained wherever the queue is (re)allocated.
        while ((b = queueBase) != queueTop &&
               (q = submissionQueue) != null &&
               (i = (q.length - 1) & b) >= 0) {
            // Raw memory offset of q[i], for the Unsafe CAS below.
            long u = (i << ASHIFT) + ABASE;
            // Take the slot only if it still holds t and queueBase has not
            // moved since b was read.
            if ((t = q[i]) != null &&
                queueBase == b &&
                UNSAFE.compareAndSwapObject(q, u, t, null)) {
                queueBase = b + 1;
                return t;
            }
        }
        return null;
    }
   1788 
   1789     /**
   1790      * Removes all available unexecuted submitted and forked tasks
   1791      * from scheduling queues and adds them to the given collection,
   1792      * without altering their execution status. These may include
   1793      * artificially generated or wrapped tasks. This method is
   1794      * designed to be invoked only when the pool is known to be
   1795      * quiescent. Invocations at other times may not remove all
   1796      * tasks. A failure encountered while attempting to add elements
   1797      * to collection {@code c} may result in elements being in
   1798      * neither, either or both collections when the associated
   1799      * exception is thrown.  The behavior of this operation is
   1800      * undefined if the specified collection is modified while the
   1801      * operation is in progress.
   1802      *
   1803      * @param c the collection to transfer elements into
   1804      * @return the number of elements transferred
   1805      */
   1806     protected int drainTasksTo(Collection<? super ForkJoinTask<?>> c) {
   1807         int count = 0;
   1808         while (queueBase != queueTop) {
   1809             ForkJoinTask<?> t = pollSubmission();
   1810             if (t != null) {
   1811                 c.add(t);
   1812                 ++count;
   1813             }
   1814         }
   1815         ForkJoinWorkerThread[] ws;
   1816         if ((short)(ctl >>> TC_SHIFT) > -parallelism &&
   1817             (ws = workers) != null) {
   1818             for (ForkJoinWorkerThread w : ws)
   1819                 if (w != null)
   1820                     count += w.drainTasksTo(c);
   1821         }
   1822         return count;
   1823     }
   1824 
   1825     /**
   1826      * Returns a string identifying this pool, as well as its state,
   1827      * including indications of run state, parallelism level, and
   1828      * worker and task counts.
   1829      *
   1830      * @return a string identifying this pool, as well as its state
   1831      */
   1832     public String toString() {
   1833         long st = getStealCount();
   1834         long qt = getQueuedTaskCount();
   1835         long qs = getQueuedSubmissionCount();
   1836         int pc = parallelism;
   1837         long c = ctl;
   1838         int tc = pc + (short)(c >>> TC_SHIFT);
   1839         int rc = pc + (int)(c >> AC_SHIFT);
   1840         if (rc < 0) // ignore transient negative
   1841             rc = 0;
   1842         int ac = rc + blockedCount;
   1843         String level;
   1844         if ((c & STOP_BIT) != 0)
   1845             level = (tc == 0) ? "Terminated" : "Terminating";
   1846         else
   1847             level = shutdown ? "Shutting down" : "Running";
   1848         return super.toString() +
   1849             "[" + level +
   1850             ", parallelism = " + pc +
   1851             ", size = " + tc +
   1852             ", active = " + ac +
   1853             ", running = " + rc +
   1854             ", steals = " + st +
   1855             ", tasks = " + qt +
   1856             ", submissions = " + qs +
   1857             "]";
   1858     }
   1859 
   1860     /**
   1861      * Initiates an orderly shutdown in which previously submitted
   1862      * tasks are executed, but no new tasks will be accepted.
   1863      * Invocation has no additional effect if already shut down.
   1864      * Tasks that are in the process of being submitted concurrently
   1865      * during the course of this method may or may not be rejected.
   1866      */
   1867     public void shutdown() {
   1868         checkPermission();
   1869         shutdown = true;
   1870         tryTerminate(false);
   1871     }
   1872 
   1873     /**
   1874      * Attempts to cancel and/or stop all tasks, and reject all
   1875      * subsequently submitted tasks.  Tasks that are in the process of
   1876      * being submitted or executed concurrently during the course of
   1877      * this method may or may not be rejected. This method cancels
   1878      * both existing and unexecuted tasks, in order to permit
   1879      * termination in the presence of task dependencies. So the method
   1880      * always returns an empty list (unlike the case for some other
   1881      * Executors).
   1882      *
   1883      * @return an empty list
   1884      */
   1885     public List<Runnable> shutdownNow() {
   1886         checkPermission();
   1887         shutdown = true;
   1888         tryTerminate(true);
   1889         return Collections.emptyList();
   1890     }
   1891 
   1892     /**
   1893      * Returns {@code true} if all tasks have completed following shut down.
   1894      *
   1895      * @return {@code true} if all tasks have completed following shut down
   1896      */
   1897     public boolean isTerminated() {
   1898         long c = ctl;
   1899         return ((c & STOP_BIT) != 0L &&
   1900                 (short)(c >>> TC_SHIFT) == -parallelism);
   1901     }
   1902 
   1903     /**
   1904      * Returns {@code true} if the process of termination has
   1905      * commenced but not yet completed.  This method may be useful for
   1906      * debugging. A return of {@code true} reported a sufficient
   1907      * period after shutdown may indicate that submitted tasks have
   1908      * ignored or suppressed interruption, or are waiting for IO,
   1909      * causing this executor not to properly terminate. (See the
   1910      * advisory notes for class {@link ForkJoinTask} stating that
   1911      * tasks should not normally entail blocking operations.  But if
   1912      * they do, they must abort them on interrupt.)
   1913      *
   1914      * @return {@code true} if terminating but not yet terminated
   1915      */
   1916     public boolean isTerminating() {
   1917         long c = ctl;
   1918         return ((c & STOP_BIT) != 0L &&
   1919                 (short)(c >>> TC_SHIFT) != -parallelism);
   1920     }
   1921 
   1922     /**
   1923      * Returns true if terminating or terminated. Used by ForkJoinWorkerThread.
   1924      */
   1925     final boolean isAtLeastTerminating() {
   1926         return (ctl & STOP_BIT) != 0L;
   1927     }
   1928 
   1929     /**
   1930      * Returns {@code true} if this pool has been shut down.
   1931      *
   1932      * @return {@code true} if this pool has been shut down
   1933      */
   1934     public boolean isShutdown() {
   1935         return shutdown;
   1936     }
   1937 
   1938     /**
   1939      * Blocks until all tasks have completed execution after a shutdown
   1940      * request, or the timeout occurs, or the current thread is
   1941      * interrupted, whichever happens first.
   1942      *
   1943      * @param timeout the maximum time to wait
   1944      * @param unit the time unit of the timeout argument
   1945      * @return {@code true} if this executor terminated and
   1946      *         {@code false} if the timeout elapsed before termination
   1947      * @throws InterruptedException if interrupted while waiting
   1948      */
   1949     public boolean awaitTermination(long timeout, TimeUnit unit)
   1950         throws InterruptedException {
   1951         long nanos = unit.toNanos(timeout);
   1952         final ReentrantLock lock = this.submissionLock;
   1953         lock.lock();
   1954         try {
   1955             for (;;) {
   1956                 if (isTerminated())
   1957                     return true;
   1958                 if (nanos <= 0)
   1959                     return false;
   1960                 nanos = termination.awaitNanos(nanos);
   1961             }
   1962         } finally {
   1963             lock.unlock();
   1964         }
   1965     }
   1966 
   1967     /**
   1968      * Interface for extending managed parallelism for tasks running
   1969      * in {@link ForkJoinPool}s.
   1970      *
   1971      * <p>A {@code ManagedBlocker} provides two methods.  Method
   1972      * {@code isReleasable} must return {@code true} if blocking is
   1973      * not necessary. Method {@code block} blocks the current thread
   1974      * if necessary (perhaps internally invoking {@code isReleasable}
   1975      * before actually blocking). These actions are performed by any
   1976      * thread invoking {@link ForkJoinPool#managedBlock}.  The
   1977      * unusual methods in this API accommodate synchronizers that may,
   1978      * but don't usually, block for long periods. Similarly, they
   1979      * allow more efficient internal handling of cases in which
   1980      * additional workers may be, but usually are not, needed to
   1981      * ensure sufficient parallelism.  Toward this end,
   1982      * implementations of method {@code isReleasable} must be amenable
   1983      * to repeated invocation.
   1984      *
   1985      * <p>For example, here is a ManagedBlocker based on a
   1986      * ReentrantLock:
   1987      *  <pre> {@code
   1988      * class ManagedLocker implements ManagedBlocker {
   1989      *   final ReentrantLock lock;
   1990      *   boolean hasLock = false;
   1991      *   ManagedLocker(ReentrantLock lock) { this.lock = lock; }
   1992      *   public boolean block() {
   1993      *     if (!hasLock)
   1994      *       lock.lock();
   1995      *     return true;
   1996      *   }
   1997      *   public boolean isReleasable() {
   1998      *     return hasLock || (hasLock = lock.tryLock());
   1999      *   }
   2000      * }}</pre>
   2001      *
   2002      * <p>Here is a class that possibly blocks waiting for an
   2003      * item on a given queue:
   2004      *  <pre> {@code
   2005      * class QueueTaker<E> implements ManagedBlocker {
   2006      *   final BlockingQueue<E> queue;
   2007      *   volatile E item = null;
   2008      *   QueueTaker(BlockingQueue<E> q) { this.queue = q; }
   2009      *   public boolean block() throws InterruptedException {
   2010      *     if (item == null)
   2011      *       item = queue.take();
   2012      *     return true;
   2013      *   }
   2014      *   public boolean isReleasable() {
   2015      *     return item != null || (item = queue.poll()) != null;
   2016      *   }
   2017      *   public E getItem() { // call after pool.managedBlock completes
   2018      *     return item;
   2019      *   }
   2020      * }}</pre>
   2021      */
   2022     public static interface ManagedBlocker {
   2023         /**
   2024          * Possibly blocks the current thread, for example waiting for
   2025          * a lock or condition.
   2026          *
   2027          * @return {@code true} if no additional blocking is necessary
   2028          * (i.e., if isReleasable would return true)
   2029          * @throws InterruptedException if interrupted while waiting
   2030          * (the method is not required to do so, but is allowed to)
   2031          */
   2032         boolean block() throws InterruptedException;
   2033 
   2034         /**
   2035          * Returns {@code true} if blocking is unnecessary.
   2036          */
   2037         boolean isReleasable();
   2038     }
   2039 
   2040     /**
   2041      * Blocks in accord with the given blocker.  If the current thread
   2042      * is a {@link ForkJoinWorkerThread}, this method possibly
   2043      * arranges for a spare thread to be activated if necessary to
   2044      * ensure sufficient parallelism while the current thread is blocked.
   2045      *
   2046      * <p>If the caller is not a {@link ForkJoinTask}, this method is
   2047      * behaviorally equivalent to
   2048      *  <pre> {@code
   2049      * while (!blocker.isReleasable())
   2050      *   if (blocker.block())
   2051      *     return;
   2052      * }</pre>
   2053      *
   2054      * If the caller is a {@code ForkJoinTask}, then the pool may
   2055      * first be expanded to ensure parallelism, and later adjusted.
   2056      *
   2057      * @param blocker the blocker
   2058      * @throws InterruptedException if blocker.block did so
   2059      */
   2060     public static void managedBlock(ManagedBlocker blocker)
   2061         throws InterruptedException {
   2062         Thread t = Thread.currentThread();
   2063         if (t instanceof ForkJoinWorkerThread) {
   2064             ForkJoinWorkerThread w = (ForkJoinWorkerThread) t;
   2065             w.pool.awaitBlocker(blocker);
   2066         }
   2067         else {
   2068             do {} while (!blocker.isReleasable() && !blocker.block());
   2069         }
   2070     }
   2071 
   2072     // AbstractExecutorService overrides.  These rely on undocumented
   2073     // fact that ForkJoinTask.adapt returns ForkJoinTasks that also
   2074     // implement RunnableFuture.
   2075 
   2076     protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
   2077         return (RunnableFuture<T>) ForkJoinTask.adapt(runnable, value);
   2078     }
   2079 
   2080     protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
   2081         return (RunnableFuture<T>) ForkJoinTask.adapt(callable);
   2082     }
   2083 
   2084     // Unsafe mechanics
   2085     private static final sun.misc.Unsafe UNSAFE;
   2086     private static final long ctlOffset;
   2087     private static final long stealCountOffset;
   2088     private static final long blockedCountOffset;
   2089     private static final long quiescerCountOffset;
   2090     private static final long scanGuardOffset;
   2091     private static final long nextWorkerNumberOffset;
   2092     private static final long ABASE;
   2093     private static final int ASHIFT;
   2094 
   2095     static {
   2096         poolNumberGenerator = new AtomicInteger();
   2097         workerSeedGenerator = new Random();
   2098         modifyThreadPermission = new RuntimePermission("modifyThread");
   2099         defaultForkJoinWorkerThreadFactory =
   2100             new DefaultForkJoinWorkerThreadFactory();
   2101         try {
   2102             UNSAFE = sun.misc.Unsafe.getUnsafe();
   2103             Class<?> k = ForkJoinPool.class;
   2104             ctlOffset = UNSAFE.objectFieldOffset
   2105                 (k.getDeclaredField("ctl"));
   2106             stealCountOffset = UNSAFE.objectFieldOffset
   2107                 (k.getDeclaredField("stealCount"));
   2108             blockedCountOffset = UNSAFE.objectFieldOffset
   2109                 (k.getDeclaredField("blockedCount"));
   2110             quiescerCountOffset = UNSAFE.objectFieldOffset
   2111                 (k.getDeclaredField("quiescerCount"));
   2112             scanGuardOffset = UNSAFE.objectFieldOffset
   2113                 (k.getDeclaredField("scanGuard"));
   2114             nextWorkerNumberOffset = UNSAFE.objectFieldOffset
   2115                 (k.getDeclaredField("nextWorkerNumber"));
   2116         } catch (Exception e) {
   2117             throw new Error(e);
   2118         }
   2119         Class<?> a = ForkJoinTask[].class;
   2120         ABASE = UNSAFE.arrayBaseOffset(a);
   2121         int s = UNSAFE.arrayIndexScale(a);
   2122         if ((s & (s-1)) != 0)
   2123             throw new Error("data type scale not a power of two");
   2124         ASHIFT = 31 - Integer.numberOfLeadingZeros(s);
   2125     }
   2126 
   2127 }
   2128