Home | History | Annotate | Download | only in concurrent
      1 /*
      2  * Written by Doug Lea with assistance from members of JCP JSR-166
      3  * Expert Group and released to the public domain, as explained at
      4  * http://creativecommons.org/publicdomain/zero/1.0/
      5  */
      6 
      7 package java.util.concurrent;
      8 
      9 import java.lang.Thread.UncaughtExceptionHandler;
     10 import java.util.ArrayList;
     11 import java.util.Arrays;
     12 import java.util.Collection;
     13 import java.util.Collections;
     14 import java.util.List;
     15 import java.util.concurrent.AbstractExecutorService;
     16 import java.util.concurrent.Callable;
     17 import java.util.concurrent.ExecutorService;
     18 import java.util.concurrent.Future;
     19 import java.util.concurrent.RejectedExecutionException;
     20 import java.util.concurrent.RunnableFuture;
     21 import java.util.concurrent.ThreadLocalRandom;
     22 import java.util.concurrent.TimeUnit;
     23 
     24 /**
     25  * An {@link ExecutorService} for running {@link ForkJoinTask}s.
     26  * A {@code ForkJoinPool} provides the entry point for submissions
     27  * from non-{@code ForkJoinTask} clients, as well as management and
     28  * monitoring operations.
     29  *
     30  * <p>A {@code ForkJoinPool} differs from other kinds of {@link
     31  * ExecutorService} mainly by virtue of employing
     32  * <em>work-stealing</em>: all threads in the pool attempt to find and
     33  * execute tasks submitted to the pool and/or created by other active
     34  * tasks (eventually blocking waiting for work if none exist). This
     35  * enables efficient processing when most tasks spawn other subtasks
     36  * (as do most {@code ForkJoinTask}s), as well as when many small
     37  * tasks are submitted to the pool from external clients.  Especially
     38  * when setting <em>asyncMode</em> to true in constructors, {@code
     39  * ForkJoinPool}s may also be appropriate for use with event-style
     40  * tasks that are never joined.
     41  *
     42  * <p>A static {@code commonPool()} is available and appropriate for
     43  * most applications. The common pool is used by any ForkJoinTask that
     44  * is not explicitly submitted to a specified pool. Using the common
     45  * pool normally reduces resource usage (its threads are slowly
     46  * reclaimed during periods of non-use, and reinstated upon subsequent
     47  * use).
     48  *
     49  * <p>For applications that require separate or custom pools, a {@code
     50  * ForkJoinPool} may be constructed with a given target parallelism
     51  * level; by default, equal to the number of available processors. The
     52  * pool attempts to maintain enough active (or available) threads by
     53  * dynamically adding, suspending, or resuming internal worker
     54  * threads, even if some tasks are stalled waiting to join others.
     55  * However, no such adjustments are guaranteed in the face of blocked
     56  * I/O or other unmanaged synchronization. The nested {@link
     57  * ManagedBlocker} interface enables extension of the kinds of
     58  * synchronization accommodated.
     59  *
     60  * <p>In addition to execution and lifecycle control methods, this
     61  * class provides status check methods (for example
     62  * {@link #getStealCount}) that are intended to aid in developing,
     63  * tuning, and monitoring fork/join applications. Also, method
     64  * {@link #toString} returns indications of pool state in a
     65  * convenient form for informal monitoring.
     66  *
     67  * <p>As is the case with other ExecutorServices, there are three
     68  * main task execution methods summarized in the following table.
     69  * These are designed to be used primarily by clients not already
     70  * engaged in fork/join computations in the current pool.  The main
     71  * forms of these methods accept instances of {@code ForkJoinTask},
     72  * but overloaded forms also allow mixed execution of plain {@code
     73  * Runnable}- or {@code Callable}- based activities as well.  However,
     74  * tasks that are already executing in a pool should normally instead
     75  * use the within-computation forms listed in the table unless using
     76  * async event-style tasks that are not usually joined, in which case
     77  * there is little difference among choice of methods.
     78  *
     79  * <table BORDER CELLPADDING=3 CELLSPACING=1>
     80  * <caption>Summary of task execution methods</caption>
     81  *  <tr>
     82  *    <td></td>
     83  *    <td ALIGN=CENTER> <b>Call from non-fork/join clients</b></td>
     84  *    <td ALIGN=CENTER> <b>Call from within fork/join computations</b></td>
     85  *  </tr>
     86  *  <tr>
     87  *    <td> <b>Arrange async execution</b></td>
     88  *    <td> {@link #execute(ForkJoinTask)}</td>
     89  *    <td> {@link ForkJoinTask#fork}</td>
     90  *  </tr>
     91  *  <tr>
     92  *    <td> <b>Await and obtain result</b></td>
     93  *    <td> {@link #invoke(ForkJoinTask)}</td>
     94  *    <td> {@link ForkJoinTask#invoke}</td>
     95  *  </tr>
     96  *  <tr>
     97  *    <td> <b>Arrange exec and obtain Future</b></td>
     98  *    <td> {@link #submit(ForkJoinTask)}</td>
     99  *    <td> {@link ForkJoinTask#fork} (ForkJoinTasks <em>are</em> Futures)</td>
    100  *  </tr>
    101  * </table>
    102  *
    103  * <p>The common pool is by default constructed with default
    104  * parameters, but these may be controlled by setting three
    105  * {@linkplain System#getProperty system properties}:
    106  * <ul>
    107  * <li>{@code java.util.concurrent.ForkJoinPool.common.parallelism}
    108  * - the parallelism level, a non-negative integer
    109  * <li>{@code java.util.concurrent.ForkJoinPool.common.threadFactory}
    110  * - the class name of a {@link ForkJoinWorkerThreadFactory}
    111  * <li>{@code java.util.concurrent.ForkJoinPool.common.exceptionHandler}
    112  * - the class name of a {@link UncaughtExceptionHandler}
    113  * </ul>
    114  * The system class loader is used to load these classes.
    115  * Upon any error in establishing these settings, default parameters
    116  * are used. It is possible to disable or limit the use of threads in
    117  * the common pool by setting the parallelism property to zero, and/or
    118  * using a factory that may return {@code null}.
    119  *
    120  * <p><b>Implementation notes</b>: This implementation restricts the
    121  * maximum number of running threads to 32767. Attempts to create
    122  * pools with greater than the maximum number result in
    123  * {@code IllegalArgumentException}.
    124  *
    125  * <p>This implementation rejects submitted tasks (that is, by throwing
    126  * {@link RejectedExecutionException}) only when the pool is shut down
    127  * or internal resources have been exhausted.
    128  *
    129  * @since 1.7
    130  * @author Doug Lea
    131  */
    132 public class ForkJoinPool extends AbstractExecutorService {
    133 
    134     /*
    135      * Implementation Overview
    136      *
    137      * This class and its nested classes provide the main
    138      * functionality and control for a set of worker threads:
    139      * Submissions from non-FJ threads enter into submission queues.
    140      * Workers take these tasks and typically split them into subtasks
    141      * that may be stolen by other workers.  Preference rules give
    142      * first priority to processing tasks from their own queues (LIFO
    143      * or FIFO, depending on mode), then to randomized FIFO steals of
    144      * tasks in other queues.
    145      *
    146      * WorkQueues
    147      * ==========
    148      *
    149      * Most operations occur within work-stealing queues (in nested
    150      * class WorkQueue).  These are special forms of Deques that
    151      * support only three of the four possible end-operations -- push,
    152      * pop, and poll (aka steal), under the further constraints that
    153      * push and pop are called only from the owning thread (or, as
    154      * extended here, under a lock), while poll may be called from
    155      * other threads.  (If you are unfamiliar with them, you probably
    156      * want to read Herlihy and Shavit's book "The Art of
     157      * Multiprocessor Programming", chapter 16 describing these in
    158      * more detail before proceeding.)  The main work-stealing queue
    159      * design is roughly similar to those in the papers "Dynamic
    160      * Circular Work-Stealing Deque" by Chase and Lev, SPAA 2005
    161      * (http://research.sun.com/scalable/pubs/index.html) and
    162      * "Idempotent work stealing" by Michael, Saraswat, and Vechev,
    163      * PPoPP 2009 (http://portal.acm.org/citation.cfm?id=1504186).
    164      * See also "Correct and Efficient Work-Stealing for Weak Memory
    165      * Models" by Le, Pop, Cohen, and Nardelli, PPoPP 2013
    166      * (http://www.di.ens.fr/~zappa/readings/ppopp13.pdf) for an
    167      * analysis of memory ordering (atomic, volatile etc) issues.  The
    168      * main differences ultimately stem from GC requirements that we
    169      * null out taken slots as soon as we can, to maintain as small a
    170      * footprint as possible even in programs generating huge numbers
    171      * of tasks. To accomplish this, we shift the CAS arbitrating pop
    172      * vs poll (steal) from being on the indices ("base" and "top") to
    173      * the slots themselves.  So, both a successful pop and poll
    174      * mainly entail a CAS of a slot from non-null to null.  Because
    175      * we rely on CASes of references, we do not need tag bits on base
    176      * or top.  They are simple ints as used in any circular
    177      * array-based queue (see for example ArrayDeque).  Updates to the
    178      * indices must still be ordered in a way that guarantees that top
    179      * == base means the queue is empty, but otherwise may err on the
    180      * side of possibly making the queue appear nonempty when a push,
    181      * pop, or poll have not fully committed. Note that this means
    182      * that the poll operation, considered individually, is not
    183      * wait-free. One thief cannot successfully continue until another
    184      * in-progress one (or, if previously empty, a push) completes.
    185      * However, in the aggregate, we ensure at least probabilistic
    186      * non-blockingness.  If an attempted steal fails, a thief always
    187      * chooses a different random victim target to try next. So, in
    188      * order for one thief to progress, it suffices for any
    189      * in-progress poll or new push on any empty queue to
    190      * complete. (This is why we normally use method pollAt and its
    191      * variants that try once at the apparent base index, else
    192      * consider alternative actions, rather than method poll.)
    193      *
    194      * This approach also enables support of a user mode in which local
    195      * task processing is in FIFO, not LIFO order, simply by using
    196      * poll rather than pop.  This can be useful in message-passing
    197      * frameworks in which tasks are never joined.  However neither
    198      * mode considers affinities, loads, cache localities, etc, so
     199      * rarely provides the best possible performance on a given
     200      * machine, but portably provides good throughput by averaging over
    201      * these factors.  (Further, even if we did try to use such
    202      * information, we do not usually have a basis for exploiting it.
    203      * For example, some sets of tasks profit from cache affinities,
    204      * but others are harmed by cache pollution effects.)
    205      *
    206      * WorkQueues are also used in a similar way for tasks submitted
    207      * to the pool. We cannot mix these tasks in the same queues used
    208      * for work-stealing (this would contaminate lifo/fifo
    209      * processing). Instead, we randomly associate submission queues
    210      * with submitting threads, using a form of hashing.  The
    211      * Submitter probe value serves as a hash code for
    212      * choosing existing queues, and may be randomly repositioned upon
    213      * contention with other submitters.  In essence, submitters act
    214      * like workers except that they are restricted to executing local
    215      * tasks that they submitted. However, because most
    216      * shared/external queue operations are more expensive than
    217      * internal, and because, at steady state, external submitters
    218      * will compete for CPU with workers, ForkJoinTask.join and
    219      * related methods disable them from repeatedly helping to process
    220      * tasks if all workers are active.  Insertion of tasks in shared
    221      * mode requires a lock (mainly to protect in the case of
    222      * resizing) but we use only a simple spinlock (using bits in
    223      * field qlock), because submitters encountering a busy queue move
    224      * on to try or create other queues -- they block only when
    225      * creating and registering new queues.
    226      *
    227      * Management
    228      * ==========
    229      *
    230      * The main throughput advantages of work-stealing stem from
    231      * decentralized control -- workers mostly take tasks from
    232      * themselves or each other. We cannot negate this in the
    233      * implementation of other management responsibilities. The main
    234      * tactic for avoiding bottlenecks is packing nearly all
    235      * essentially atomic control state into two volatile variables
    236      * that are by far most often read (not written) as status and
    237      * consistency checks.
    238      *
    239      * Field "ctl" contains 64 bits holding all the information needed
    240      * to atomically decide to add, inactivate, enqueue (on an event
    241      * queue), dequeue, and/or re-activate workers.  To enable this
    242      * packing, we restrict maximum parallelism to (1<<15)-1 (which is
    243      * far in excess of normal operating range) to allow ids, counts,
    244      * and their negations (used for thresholding) to fit into 16bit
    245      * fields.
    246      *
    247      * Field "plock" is a form of sequence lock with a saturating
    248      * shutdown bit (similarly for per-queue "qlocks"), mainly
    249      * protecting updates to the workQueues array, as well as to
    250      * enable shutdown.  When used as a lock, it is normally only very
    251      * briefly held, so is nearly always available after at most a
    252      * brief spin, but we use a monitor-based backup strategy to
    253      * block when needed.
    254      *
    255      * Recording WorkQueues.  WorkQueues are recorded in the
    256      * "workQueues" array that is created upon first use and expanded
    257      * if necessary.  Updates to the array while recording new workers
    258      * and unrecording terminated ones are protected from each other
    259      * by a lock but the array is otherwise concurrently readable, and
    260      * accessed directly.  To simplify index-based operations, the
    261      * array size is always a power of two, and all readers must
    262      * tolerate null slots. Worker queues are at odd indices. Shared
    263      * (submission) queues are at even indices, up to a maximum of 64
    264      * slots, to limit growth even if array needs to expand to add
    265      * more workers. Grouping them together in this way simplifies and
    266      * speeds up task scanning.
    267      *
    268      * All worker thread creation is on-demand, triggered by task
    269      * submissions, replacement of terminated workers, and/or
    270      * compensation for blocked workers. However, all other support
    271      * code is set up to work with other policies.  To ensure that we
    272      * do not hold on to worker references that would prevent GC, ALL
    273      * accesses to workQueues are via indices into the workQueues
    274      * array (which is one source of some of the messy code
    275      * constructions here). In essence, the workQueues array serves as
    276      * a weak reference mechanism. Thus for example the wait queue
    277      * field of ctl stores indices, not references.  Access to the
    278      * workQueues in associated methods (for example signalWork) must
    279      * both index-check and null-check the IDs. All such accesses
    280      * ignore bad IDs by returning out early from what they are doing,
    281      * since this can only be associated with termination, in which
    282      * case it is OK to give up.  All uses of the workQueues array
    283      * also check that it is non-null (even if previously
    284      * non-null). This allows nulling during termination, which is
    285      * currently not necessary, but remains an option for
    286      * resource-revocation-based shutdown schemes. It also helps
    287      * reduce JIT issuance of uncommon-trap code, which tends to
    288      * unnecessarily complicate control flow in some methods.
    289      *
    290      * Event Queuing. Unlike HPC work-stealing frameworks, we cannot
    291      * let workers spin indefinitely scanning for tasks when none can
    292      * be found immediately, and we cannot start/resume workers unless
    293      * there appear to be tasks available.  On the other hand, we must
    294      * quickly prod them into action when new tasks are submitted or
    295      * generated. In many usages, ramp-up time to activate workers is
    296      * the main limiting factor in overall performance (this is
    297      * compounded at program start-up by JIT compilation and
    298      * allocation). So we try to streamline this as much as possible.
    299      * We park/unpark workers after placing in an event wait queue
    300      * when they cannot find work. This "queue" is actually a simple
    301      * Treiber stack, headed by the "id" field of ctl, plus a 15bit
    302      * counter value (that reflects the number of times a worker has
    303      * been inactivated) to avoid ABA effects (we need only as many
    304      * version numbers as worker threads). Successors are held in
    305      * field WorkQueue.nextWait.  Queuing deals with several intrinsic
    306      * races, mainly that a task-producing thread can miss seeing (and
    307      * signalling) another thread that gave up looking for work but
    308      * has not yet entered the wait queue. We solve this by requiring
    309      * a full sweep of all workers (via repeated calls to method
    310      * scan()) both before and after a newly waiting worker is added
    311      * to the wait queue.  Because enqueued workers may actually be
    312      * rescanning rather than waiting, we set and clear the "parker"
    313      * field of WorkQueues to reduce unnecessary calls to unpark.
    314      * (This requires a secondary recheck to avoid missed signals.)
    315      * Note the unusual conventions about Thread.interrupts
    316      * surrounding parking and other blocking: Because interrupts are
    317      * used solely to alert threads to check termination, which is
    318      * checked anyway upon blocking, we clear status (using
    319      * Thread.interrupted) before any call to park, so that park does
    320      * not immediately return due to status being set via some other
    321      * unrelated call to interrupt in user code.
    322      *
    323      * Signalling.  We create or wake up workers only when there
    324      * appears to be at least one task they might be able to find and
    325      * execute.  When a submission is added or another worker adds a
    326      * task to a queue that has fewer than two tasks, they signal
    327      * waiting workers (or trigger creation of new ones if fewer than
    328      * the given parallelism level -- signalWork).  These primary
    329      * signals are buttressed by others whenever other threads remove
    330      * a task from a queue and notice that there are other tasks there
    331      * as well.  So in general, pools will be over-signalled. On most
    332      * platforms, signalling (unpark) overhead time is noticeably
    333      * long, and the time between signalling a thread and it actually
    334      * making progress can be very noticeably long, so it is worth
    335      * offloading these delays from critical paths as much as
    336      * possible. Additionally, workers spin-down gradually, by staying
    337      * alive so long as they see the ctl state changing.  Similar
    338      * stability-sensing techniques are also used before blocking in
    339      * awaitJoin and helpComplete.
    340      *
    341      * Trimming workers. To release resources after periods of lack of
    342      * use, a worker starting to wait when the pool is quiescent will
    343      * time out and terminate if the pool has remained quiescent for a
    344      * given period -- a short period if there are more threads than
    345      * parallelism, longer as the number of threads decreases. This
    346      * will slowly propagate, eventually terminating all workers after
    347      * periods of non-use.
    348      *
    349      * Shutdown and Termination. A call to shutdownNow atomically sets
    350      * a plock bit and then (non-atomically) sets each worker's
    351      * qlock status, cancels all unprocessed tasks, and wakes up
    352      * all waiting workers.  Detecting whether termination should
    353      * commence after a non-abrupt shutdown() call requires more work
    354      * and bookkeeping. We need consensus about quiescence (i.e., that
    355      * there is no more work). The active count provides a primary
    356      * indication but non-abrupt shutdown still requires a rechecking
    357      * scan for any workers that are inactive but not queued.
    358      *
    359      * Joining Tasks
    360      * =============
    361      *
    362      * Any of several actions may be taken when one worker is waiting
    363      * to join a task stolen (or always held) by another.  Because we
    364      * are multiplexing many tasks on to a pool of workers, we can't
    365      * just let them block (as in Thread.join).  We also cannot just
    366      * reassign the joiner's run-time stack with another and replace
    367      * it later, which would be a form of "continuation", that even if
    368      * possible is not necessarily a good idea since we sometimes need
    369      * both an unblocked task and its continuation to progress.
    370      * Instead we combine two tactics:
    371      *
    372      *   Helping: Arranging for the joiner to execute some task that it
    373      *      would be running if the steal had not occurred.
    374      *
    375      *   Compensating: Unless there are already enough live threads,
    376      *      method tryCompensate() may create or re-activate a spare
    377      *      thread to compensate for blocked joiners until they unblock.
    378      *
    379      * A third form (implemented in tryRemoveAndExec) amounts to
    380      * helping a hypothetical compensator: If we can readily tell that
    381      * a possible action of a compensator is to steal and execute the
    382      * task being joined, the joining thread can do so directly,
    383      * without the need for a compensation thread (although at the
    384      * expense of larger run-time stacks, but the tradeoff is
    385      * typically worthwhile).
    386      *
    387      * The ManagedBlocker extension API can't use helping so relies
    388      * only on compensation in method awaitBlocker.
    389      *
    390      * The algorithm in tryHelpStealer entails a form of "linear"
    391      * helping: Each worker records (in field currentSteal) the most
    392      * recent task it stole from some other worker. Plus, it records
    393      * (in field currentJoin) the task it is currently actively
    394      * joining. Method tryHelpStealer uses these markers to try to
    395      * find a worker to help (i.e., steal back a task from and execute
    396      * it) that could hasten completion of the actively joined task.
    397      * In essence, the joiner executes a task that would be on its own
    398      * local deque had the to-be-joined task not been stolen. This may
    399      * be seen as a conservative variant of the approach in Wagner &
    400      * Calder "Leapfrogging: a portable technique for implementing
    401      * efficient futures" SIGPLAN Notices, 1993
    402      * (http://portal.acm.org/citation.cfm?id=155354). It differs in
    403      * that: (1) We only maintain dependency links across workers upon
    404      * steals, rather than use per-task bookkeeping.  This sometimes
    405      * requires a linear scan of workQueues array to locate stealers,
    406      * but often doesn't because stealers leave hints (that may become
    407      * stale/wrong) of where to locate them.  It is only a hint
    408      * because a worker might have had multiple steals and the hint
    409      * records only one of them (usually the most current).  Hinting
    410      * isolates cost to when it is needed, rather than adding to
    411      * per-task overhead.  (2) It is "shallow", ignoring nesting and
    412      * potentially cyclic mutual steals.  (3) It is intentionally
    413      * racy: field currentJoin is updated only while actively joining,
    414      * which means that we miss links in the chain during long-lived
    415      * tasks, GC stalls etc (which is OK since blocking in such cases
    416      * is usually a good idea).  (4) We bound the number of attempts
    417      * to find work (see MAX_HELP) and fall back to suspending the
    418      * worker and if necessary replacing it with another.
    419      *
    420      * It is impossible to keep exactly the target parallelism number
    421      * of threads running at any given time.  Determining the
    422      * existence of conservatively safe helping targets, the
    423      * availability of already-created spares, and the apparent need
    424      * to create new spares are all racy, so we rely on multiple
    425      * retries of each.  Compensation in the apparent absence of
    426      * helping opportunities is challenging to control on JVMs, where
    427      * GC and other activities can stall progress of tasks that in
    428      * turn stall out many other dependent tasks, without us being
    429      * able to determine whether they will ever require compensation.
    430      * Even though work-stealing otherwise encounters little
    431      * degradation in the presence of more threads than cores,
    432      * aggressively adding new threads in such cases entails risk of
    433      * unwanted positive feedback control loops in which more threads
    434      * cause more dependent stalls (as well as delayed progress of
    435      * unblocked threads to the point that we know they are available)
    436      * leading to more situations requiring more threads, and so
    437      * on. This aspect of control can be seen as an (analytically
    438      * intractable) game with an opponent that may choose the worst
    439      * (for us) active thread to stall at any time.  We take several
    440      * precautions to bound losses (and thus bound gains), mainly in
    441      * methods tryCompensate and awaitJoin.
    442      *
    443      * Common Pool
    444      * ===========
    445      *
    446      * The static common pool always exists after static
    447      * initialization.  Since it (or any other created pool) need
    448      * never be used, we minimize initial construction overhead and
    449      * footprint to the setup of about a dozen fields, with no nested
    450      * allocation. Most bootstrapping occurs within method
    451      * fullExternalPush during the first submission to the pool.
    452      *
    453      * When external threads submit to the common pool, they can
    454      * perform subtask processing (see externalHelpJoin and related
    455      * methods).  This caller-helps policy makes it sensible to set
    456      * common pool parallelism level to one (or more) less than the
    457      * total number of available cores, or even zero for pure
    458      * caller-runs.  We do not need to record whether external
    459      * submissions are to the common pool -- if not, externalHelpJoin
    460      * returns quickly (at the most helping to signal some common pool
    461      * workers). These submitters would otherwise be blocked waiting
    462      * for completion, so the extra effort (with liberally sprinkled
    463      * task status checks) in inapplicable cases amounts to an odd
    464      * form of limited spin-wait before blocking in ForkJoinTask.join.
    465      *
    466      * Style notes
    467      * ===========
    468      *
    469      * There is a lot of representation-level coupling among classes
    470      * ForkJoinPool, ForkJoinWorkerThread, and ForkJoinTask.  The
    471      * fields of WorkQueue maintain data structures managed by
    472      * ForkJoinPool, so are directly accessed.  There is little point
    473      * trying to reduce this, since any associated future changes in
    474      * representations will need to be accompanied by algorithmic
    475      * changes anyway. Several methods intrinsically sprawl because
    476      * they must accumulate sets of consistent reads of volatiles held
    477      * in local variables.  Methods signalWork() and scan() are the
    478      * main bottlenecks, so are especially heavily
    479      * micro-optimized/mangled.  There are lots of inline assignments
    480      * (of form "while ((local = field) != 0)") which are usually the
    481      * simplest way to ensure the required read orderings (which are
    482      * sometimes critical). This leads to a "C"-like style of listing
    483      * declarations of these locals at the heads of methods or blocks.
    484      * There are several occurrences of the unusual "do {} while
    485      * (!cas...)"  which is the simplest way to force an update of a
    486      * CAS'ed variable. There are also other coding oddities (including
    487      * several unnecessary-looking hoisted null checks) that help
    488      * some methods perform reasonably even when interpreted (not
    489      * compiled).
    490      *
    491      * The order of declarations in this file is:
    492      * (1) Static utility functions
    493      * (2) Nested (static) classes
    494      * (3) Static fields
    495      * (4) Fields, along with constants used when unpacking some of them
    496      * (5) Internal control methods
    497      * (6) Callbacks and other support for ForkJoinTask methods
    498      * (7) Exported methods
    499      * (8) Static block initializing statics in minimally dependent order
    500      */
    501 
    502     // Static utilities
    503 
    504     /**
    505      * If there is a security manager, makes sure caller has
    506      * permission to modify threads.
    507      */
    508     private static void checkPermission() {
    509         SecurityManager security = System.getSecurityManager();
    510         if (security != null)
    511             security.checkPermission(modifyThreadPermission);
    512     }
    513 
    514     // Nested classes
    515 
    516     /**
    517      * Factory for creating new {@link ForkJoinWorkerThread}s.
    518      * A {@code ForkJoinWorkerThreadFactory} must be defined and used
    519      * for {@code ForkJoinWorkerThread} subclasses that extend base
    520      * functionality or initialize threads with different contexts.
    521      */
    522     public static interface ForkJoinWorkerThreadFactory {
    523         /**
    524          * Returns a new worker thread operating in the given pool.
    525          *
    526          * @param pool the pool this thread works in
    527          * @throws NullPointerException if the pool is null
    528          * @return the new worker thread
    529          */
    530         public ForkJoinWorkerThread newThread(ForkJoinPool pool);
    531     }
    532 
    533     /**
    534      * Default ForkJoinWorkerThreadFactory implementation; creates a
    535      * new ForkJoinWorkerThread.
    536      */
    537     static final class DefaultForkJoinWorkerThreadFactory
    538         implements ForkJoinWorkerThreadFactory {
    539         public final ForkJoinWorkerThread newThread(ForkJoinPool pool) {
    540             return new ForkJoinWorkerThread(pool);
    541         }
    542     }
    543 
    544     /**
    545      * Class for artificial tasks that are used to replace the target
    546      * of local joins if they are removed from an interior queue slot
    547      * in WorkQueue.tryRemoveAndExec. We don't need the proxy to
    548      * actually do anything beyond having a unique identity.
    549      */
    550     static final class EmptyTask extends ForkJoinTask<Void> {
    551         private static final long serialVersionUID = -7721805057305804111L;
    552         EmptyTask() { status = ForkJoinTask.NORMAL; } // force done
    553         public final Void getRawResult() { return null; }
    554         public final void setRawResult(Void x) {}
    555         public final boolean exec() { return true; }
    556     }
    557 
    558     /**
    559      * Queues supporting work-stealing as well as external task
    560      * submission. See above for main rationale and algorithms.
    561      * Implementation relies heavily on "Unsafe" intrinsics
    562      * and selective use of "volatile":
    563      *
    564      * Field "base" is the index (mod array.length) of the least valid
    565      * queue slot, which is always the next position to steal (poll)
    566      * from if nonempty. Reads and writes require volatile orderings
    567      * but not CAS, because updates are only performed after slot
    568      * CASes.
    569      *
    570      * Field "top" is the index (mod array.length) of the next queue
    571      * slot to push to or pop from. It is written only by owner thread
    572      * for push, or under lock for external/shared push, and accessed
    573      * by other threads only after reading (volatile) base.  Both top
    574      * and base are allowed to wrap around on overflow, but (top -
    575      * base) (or more commonly -(base - top) to force volatile read of
    576      * base before top) still estimates size. The lock ("qlock") is
    577      * forced to -1 on termination, causing all further lock attempts
    578      * to fail. (Note: we don't need CAS for termination state because
    579      * upon pool shutdown, all shared-queues will stop being used
    580      * anyway.)  Nearly all lock bodies are set up so that exceptions
    581      * within lock bodies are "impossible" (modulo JVM errors that
    582      * would cause failure anyway.)
    583      *
    584      * The array slots are read and written using the emulation of
    585      * volatiles/atomics provided by Unsafe. Insertions must in
    586      * general use putOrderedObject as a form of releasing store to
    587      * ensure that all writes to the task object are ordered before
    588      * its publication in the queue.  All removals entail a CAS to
    589      * null.  The array is always a power of two. To ensure safety of
    590      * Unsafe array operations, all accesses perform explicit null
    591      * checks and implicit bounds checks via power-of-two masking.
    592      *
    593      * In addition to basic queuing support, this class contains
    594      * fields described elsewhere to control execution. It turns out
    595      * to work better memory-layout-wise to include them in this class
    596      * rather than a separate class.
    597      *
    598      * Performance on most platforms is very sensitive to placement of
    599      * instances of both WorkQueues and their arrays -- we absolutely
    600      * do not want multiple WorkQueue instances or multiple queue
    601      * arrays sharing cache lines. (It would be best for queue objects
    602      * and their arrays to share, but there is nothing available to
    603      * help arrange that). The @Contended annotation alerts JVMs to
    604      * try to keep instances apart.
    605      */
    606     static final class WorkQueue {
    607         /**
    608          * Capacity of work-stealing queue array upon initialization.
    609          * Must be a power of two; at least 4, but should be larger to
    610          * reduce or eliminate cacheline sharing among queues.
    611          * Currently, it is much larger, as a partial workaround for
    612          * the fact that JVMs often place arrays in locations that
    613          * share GC bookkeeping (especially cardmarks) such that
    614          * per-write accesses encounter serious memory contention.
    615          */
    616         static final int INITIAL_QUEUE_CAPACITY = 1 << 13;
    617 
    618         /**
    619          * Maximum size for queue arrays. Must be a power of two less
    620          * than or equal to 1 << (31 - width of array entry) to ensure
    621          * lack of wraparound of index calculations, but defined to a
    622          * value a bit less than this to help users trap runaway
    623          * programs before saturating systems.
    624          */
    625         static final int MAXIMUM_QUEUE_CAPACITY = 1 << 26; // 64M
    626 
    627         // Heuristic padding to ameliorate unfortunate memory placements
    628         volatile long pad00, pad01, pad02, pad03, pad04, pad05, pad06;
    629 
    630         volatile int eventCount;   // encoded inactivation count; < 0 if inactive
    631         int nextWait;              // encoded record of next event waiter
    632         int nsteals;               // number of steals
    633         int hint;                  // steal index hint
    634         short poolIndex;           // index of this queue in pool
    635         final short mode;          // 0: lifo, > 0: fifo, < 0: shared
    636         volatile int qlock;        // 1: locked, -1: terminate; else 0
    637         volatile int base;         // index of next slot for poll
    638         int top;                   // index of next slot for push
    639         ForkJoinTask<?>[] array;   // the elements (initially unallocated)
    640         final ForkJoinPool pool;   // the containing pool (may be null)
    641         final ForkJoinWorkerThread owner; // owning thread or null if shared
    642         volatile Thread parker;    // == owner during call to park; else null
    643         volatile ForkJoinTask<?> currentJoin;  // task being joined in awaitJoin
    644         ForkJoinTask<?> currentSteal; // current non-local task being executed
    645 
    646         volatile Object pad10, pad11, pad12, pad13, pad14, pad15, pad16, pad17;
    647         volatile Object pad18, pad19, pad1a, pad1b, pad1c, pad1d;
    648 
    649         WorkQueue(ForkJoinPool pool, ForkJoinWorkerThread owner, int mode,
    650                   int seed) {
    651             this.pool = pool;
    652             this.owner = owner;
    653             this.mode = (short)mode;
    654             this.hint = seed; // store initial seed for runWorker
    655             // Place indices in the center of array (that is not yet allocated)
    656             base = top = INITIAL_QUEUE_CAPACITY >>> 1;
    657         }
    658 
    659         /**
    660          * Returns the approximate number of tasks in the queue.
    661          */
    662         final int queueSize() {
    663             int n = base - top;       // non-owner callers must read base first
    664             return (n >= 0) ? 0 : -n; // ignore transient negative
    665         }
    666 
    667         /**
    668          * Provides a more accurate estimate of whether this queue has
    669          * any tasks than does queueSize, by checking whether a
    670          * near-empty queue has at least one unclaimed task.
    671          */
    672         final boolean isEmpty() {
    673             ForkJoinTask<?>[] a; int m, s;
    674             int n = base - (s = top);
    675             return (n >= 0 ||
    676                     (n == -1 &&
    677                      ((a = array) == null ||
    678                       (m = a.length - 1) < 0 ||
    679                       U.getObject
    680                       (a, (long)((m & (s - 1)) << ASHIFT) + ABASE) == null)));
    681         }
    682 
    683         /**
    684          * Pushes a task. Call only by owner in unshared queues.  (The
    685          * shared-queue version is embedded in method externalPush.)
    686          *
    687          * @param task the task. Caller must ensure non-null.
    688          * @throws RejectedExecutionException if array cannot be resized
    689          */
    690         final void push(ForkJoinTask<?> task) {
    691             ForkJoinTask<?>[] a; ForkJoinPool p;
    692             int s = top, n;
    693             if ((a = array) != null) {    // ignore if queue removed
    694                 int m = a.length - 1;
    695                 U.putOrderedObject(a, ((m & s) << ASHIFT) + ABASE, task);
    696                 if ((n = (top = s + 1) - base) <= 2)
    697                     (p = pool).signalWork(p.workQueues, this);
    698                 else if (n >= m)
    699                     growArray();
    700             }
    701         }
    702 
    703         /**
    704          * Initializes or doubles the capacity of array. Call either
    705          * by owner or with lock held -- it is OK for base, but not
    706          * top, to move while resizings are in progress.
    707          */
    708         final ForkJoinTask<?>[] growArray() {
    709             ForkJoinTask<?>[] oldA = array;
    710             int size = oldA != null ? oldA.length << 1 : INITIAL_QUEUE_CAPACITY;
    711             if (size > MAXIMUM_QUEUE_CAPACITY)
    712                 throw new RejectedExecutionException("Queue capacity exceeded");
    713             int oldMask, t, b;
    714             ForkJoinTask<?>[] a = array = new ForkJoinTask<?>[size];
    715             if (oldA != null && (oldMask = oldA.length - 1) >= 0 &&
    716                 (t = top) - (b = base) > 0) {
    717                 int mask = size - 1;
    718                 do {
    719                     ForkJoinTask<?> x;
    720                     int oldj = ((b & oldMask) << ASHIFT) + ABASE;
    721                     int j    = ((b &    mask) << ASHIFT) + ABASE;
    722                     x = (ForkJoinTask<?>)U.getObjectVolatile(oldA, oldj);
    723                     if (x != null &&
    724                         U.compareAndSwapObject(oldA, oldj, x, null))
    725                         U.putObjectVolatile(a, j, x);
    726                 } while (++b != t);
    727             }
    728             return a;
    729         }
    730 
    731         /**
    732          * Takes next task, if one exists, in LIFO order.  Call only
    733          * by owner in unshared queues.
    734          */
    735         final ForkJoinTask<?> pop() {
    736             ForkJoinTask<?>[] a; ForkJoinTask<?> t; int m;
    737             if ((a = array) != null && (m = a.length - 1) >= 0) {
    738                 for (int s; (s = top - 1) - base >= 0;) {
    739                     long j = ((m & s) << ASHIFT) + ABASE;
    740                     if ((t = (ForkJoinTask<?>)U.getObject(a, j)) == null)
    741                         break;
    742                     if (U.compareAndSwapObject(a, j, t, null)) {
    743                         top = s;
    744                         return t;
    745                     }
    746                 }
    747             }
    748             return null;
    749         }
    750 
    751         /**
    752          * Takes a task in FIFO order if b is base of queue and a task
    753          * can be claimed without contention. Specialized versions
    754          * appear in ForkJoinPool methods scan and tryHelpStealer.
    755          */
    756         final ForkJoinTask<?> pollAt(int b) {
    757             ForkJoinTask<?> t; ForkJoinTask<?>[] a;
    758             if ((a = array) != null) {
    759                 int j = (((a.length - 1) & b) << ASHIFT) + ABASE;
    760                 if ((t = (ForkJoinTask<?>)U.getObjectVolatile(a, j)) != null &&
    761                     base == b && U.compareAndSwapObject(a, j, t, null)) {
    762                     U.putOrderedInt(this, QBASE, b + 1);
    763                     return t;
    764                 }
    765             }
    766             return null;
    767         }
    768 
    769         /**
    770          * Takes next task, if one exists, in FIFO order.
    771          */
    772         final ForkJoinTask<?> poll() {
    773             ForkJoinTask<?>[] a; int b; ForkJoinTask<?> t;
    774             while ((b = base) - top < 0 && (a = array) != null) {
    775                 int j = (((a.length - 1) & b) << ASHIFT) + ABASE;
    776                 t = (ForkJoinTask<?>)U.getObjectVolatile(a, j);
    777                 if (t != null) {
    778                     if (U.compareAndSwapObject(a, j, t, null)) {
    779                         U.putOrderedInt(this, QBASE, b + 1);
    780                         return t;
    781                     }
    782                 }
    783                 else if (base == b) {
    784                     if (b + 1 == top)
    785                         break;
    786                     Thread.yield(); // wait for lagging update (very rare)
    787                 }
    788             }
    789             return null;
    790         }
    791 
    792         /**
    793          * Takes next task, if one exists, in order specified by mode.
    794          */
    795         final ForkJoinTask<?> nextLocalTask() {
    796             return mode == 0 ? pop() : poll();
    797         }
    798 
    799         /**
    800          * Returns next task, if one exists, in order specified by mode.
    801          */
    802         final ForkJoinTask<?> peek() {
    803             ForkJoinTask<?>[] a = array; int m;
    804             if (a == null || (m = a.length - 1) < 0)
    805                 return null;
    806             int i = mode == 0 ? top - 1 : base;
    807             int j = ((i & m) << ASHIFT) + ABASE;
    808             return (ForkJoinTask<?>)U.getObjectVolatile(a, j);
    809         }
    810 
    811         /**
    812          * Pops the given task only if it is at the current top.
    813          * (A shared version is available only via FJP.tryExternalUnpush)
    814          */
    815         final boolean tryUnpush(ForkJoinTask<?> t) {
    816             ForkJoinTask<?>[] a; int s;
    817             if ((a = array) != null && (s = top) != base &&
    818                 U.compareAndSwapObject
    819                 (a, (((a.length - 1) & --s) << ASHIFT) + ABASE, t, null)) {
    820                 top = s;
    821                 return true;
    822             }
    823             return false;
    824         }
    825 
    826         /**
    827          * Removes and cancels all known tasks, ignoring any exceptions.
    828          */
    829         final void cancelAll() {
    830             ForkJoinTask.cancelIgnoringExceptions(currentJoin);
    831             ForkJoinTask.cancelIgnoringExceptions(currentSteal);
    832             for (ForkJoinTask<?> t; (t = poll()) != null; )
    833                 ForkJoinTask.cancelIgnoringExceptions(t);
    834         }
    835 
    836         // Specialized execution methods
    837 
    838         /**
    839          * Polls and runs tasks until empty.
    840          */
    841         final void pollAndExecAll() {
    842             for (ForkJoinTask<?> t; (t = poll()) != null;)
    843                 t.doExec();
    844         }
    845 
    846         /**
    847          * Executes a top-level task and any local tasks remaining
    848          * after execution.
    849          */
    850         final void runTask(ForkJoinTask<?> task) {
    851             if ((currentSteal = task) != null) {
    852                 task.doExec();
    853                 ForkJoinTask<?>[] a = array;
    854                 int md = mode;
    855                 ++nsteals;
    856                 currentSteal = null;
    857                 if (md != 0)
    858                     pollAndExecAll();
    859                 else if (a != null) {
    860                     int s, m = a.length - 1;
    861                     while ((s = top - 1) - base >= 0) {
    862                         long i = ((m & s) << ASHIFT) + ABASE;
    863                         ForkJoinTask<?> t = (ForkJoinTask<?>)U.getObject(a, i);
    864                         if (t == null)
    865                             break;
    866                         if (U.compareAndSwapObject(a, i, t, null)) {
    867                             top = s;
    868                             t.doExec();
    869                         }
    870                     }
    871                 }
    872             }
    873         }
    874 
    875         /**
    876          * If present, removes from queue and executes the given task,
    877          * or any other cancelled task. Returns (true) on any CAS
    878          * or consistency check failure so caller can retry.
    879          *
    880          * @return false if no progress can be made, else true
    881          */
    882         final boolean tryRemoveAndExec(ForkJoinTask<?> task) {
    883             boolean stat;
    884             ForkJoinTask<?>[] a; int m, s, b, n;
    885             if (task != null && (a = array) != null && (m = a.length - 1) >= 0 &&
    886                 (n = (s = top) - (b = base)) > 0) {
    887                 boolean removed = false, empty = true;
    888                 stat = true;
    889                 for (ForkJoinTask<?> t;;) {           // traverse from s to b
    890                     long j = ((--s & m) << ASHIFT) + ABASE;
    891                     t = (ForkJoinTask<?>)U.getObject(a, j);
    892                     if (t == null)                    // inconsistent length
    893                         break;
    894                     else if (t == task) {
    895                         if (s + 1 == top) {           // pop
    896                             if (!U.compareAndSwapObject(a, j, task, null))
    897                                 break;
    898                             top = s;
    899                             removed = true;
    900                         }
    901                         else if (base == b)           // replace with proxy
    902                             removed = U.compareAndSwapObject(a, j, task,
    903                                                              new EmptyTask());
    904                         break;
    905                     }
    906                     else if (t.status >= 0)
    907                         empty = false;
    908                     else if (s + 1 == top) {          // pop and throw away
    909                         if (U.compareAndSwapObject(a, j, t, null))
    910                             top = s;
    911                         break;
    912                     }
    913                     if (--n == 0) {
    914                         if (!empty && base == b)
    915                             stat = false;
    916                         break;
    917                     }
    918                 }
    919                 if (removed)
    920                     task.doExec();
    921             }
    922             else
    923                 stat = false;
    924             return stat;
    925         }
    926 
    927         /**
    928          * Tries to poll for and execute the given task or any other
    929          * task in its CountedCompleter computation.
    930          */
    931         final boolean pollAndExecCC(CountedCompleter<?> root) {
    932             ForkJoinTask<?>[] a; int b; Object o; CountedCompleter<?> t, r;
    933             if ((b = base) - top < 0 && (a = array) != null) {
    934                 long j = (((a.length - 1) & b) << ASHIFT) + ABASE;
    935                 if ((o = U.getObjectVolatile(a, j)) == null)
    936                     return true; // retry
    937                 if (o instanceof CountedCompleter) {
    938                     for (t = (CountedCompleter<?>)o, r = t;;) {
    939                         if (r == root) {
    940                             if (base == b &&
    941                                 U.compareAndSwapObject(a, j, t, null)) {
    942                                 U.putOrderedInt(this, QBASE, b + 1);
    943                                 t.doExec();
    944                             }
    945                             return true;
    946                         }
    947                         else if ((r = r.completer) == null)
    948                             break; // not part of root computation
    949                     }
    950                 }
    951             }
    952             return false;
    953         }
    954 
    955         /**
    956          * Tries to pop and execute the given task or any other task
    957          * in its CountedCompleter computation.
    958          */
    959         final boolean externalPopAndExecCC(CountedCompleter<?> root) {
    960             ForkJoinTask<?>[] a; int s; Object o; CountedCompleter<?> t, r;
    961             if (base - (s = top) < 0 && (a = array) != null) {
    962                 long j = (((a.length - 1) & (s - 1)) << ASHIFT) + ABASE;
    963                 if ((o = U.getObject(a, j)) instanceof CountedCompleter) {
    964                     for (t = (CountedCompleter<?>)o, r = t;;) {
    965                         if (r == root) {
    966                             if (U.compareAndSwapInt(this, QLOCK, 0, 1)) {
    967                                 if (top == s && array == a &&
    968                                     U.compareAndSwapObject(a, j, t, null)) {
    969                                     top = s - 1;
    970                                     qlock = 0;
    971                                     t.doExec();
    972                                 }
    973                                 else
    974                                     qlock = 0;
    975                             }
    976                             return true;
    977                         }
    978                         else if ((r = r.completer) == null)
    979                             break;
    980                     }
    981                 }
    982             }
    983             return false;
    984         }
    985 
    986         /**
    987          * Internal version
    988          */
    989         final boolean internalPopAndExecCC(CountedCompleter<?> root) {
    990             ForkJoinTask<?>[] a; int s; Object o; CountedCompleter<?> t, r;
    991             if (base - (s = top) < 0 && (a = array) != null) {
    992                 long j = (((a.length - 1) & (s - 1)) << ASHIFT) + ABASE;
    993                 if ((o = U.getObject(a, j)) instanceof CountedCompleter) {
    994                     for (t = (CountedCompleter<?>)o, r = t;;) {
    995                         if (r == root) {
    996                             if (U.compareAndSwapObject(a, j, t, null)) {
    997                                 top = s - 1;
    998                                 t.doExec();
    999                             }
   1000                             return true;
   1001                         }
   1002                         else if ((r = r.completer) == null)
   1003                             break;
   1004                     }
   1005                 }
   1006             }
   1007             return false;
   1008         }
   1009 
   1010         /**
   1011          * Returns true if owned and not known to be blocked.
   1012          */
   1013         final boolean isApparentlyUnblocked() {
   1014             Thread wt; Thread.State s;
   1015             return (eventCount >= 0 &&
   1016                     (wt = owner) != null &&
   1017                     (s = wt.getState()) != Thread.State.BLOCKED &&
   1018                     s != Thread.State.WAITING &&
   1019                     s != Thread.State.TIMED_WAITING);
   1020         }
   1021 
   1022         // Unsafe mechanics
   1023         private static final sun.misc.Unsafe U;
   1024         private static final long QBASE;
   1025         private static final long QLOCK;
   1026         private static final int ABASE;
   1027         private static final int ASHIFT;
   1028         static {
   1029             try {
   1030                 U = sun.misc.Unsafe.getUnsafe();
   1031                 Class<?> k = WorkQueue.class;
   1032                 Class<?> ak = ForkJoinTask[].class;
   1033                 QBASE = U.objectFieldOffset
   1034                     (k.getDeclaredField("base"));
   1035                 QLOCK = U.objectFieldOffset
   1036                     (k.getDeclaredField("qlock"));
   1037                 ABASE = U.arrayBaseOffset(ak);
   1038                 int scale = U.arrayIndexScale(ak);
   1039                 if ((scale & (scale - 1)) != 0)
   1040                     throw new Error("data type scale not a power of two");
   1041                 ASHIFT = 31 - Integer.numberOfLeadingZeros(scale);
   1042             } catch (Exception e) {
   1043                 throw new Error(e);
   1044             }
   1045         }
   1046     }
   1047 
    // static fields (initialized in static initializer below)

    /**
     * Per-thread submission bookkeeping. Shared across all pools
     * to reduce ThreadLocal pollution and because random motion
     * to avoid contention in one pool is likely to hold for others.
     * Lazily initialized on first submission (but null-checked
     * in other contexts to avoid unnecessary initialization).
     */
    static final ThreadLocal<Submitter> submitters;

    /**
     * Creates a new ForkJoinWorkerThread. This factory is used unless
     * overridden in ForkJoinPool constructors.
     */
    public static final ForkJoinWorkerThreadFactory
        defaultForkJoinWorkerThreadFactory;

    /**
     * Permission required for callers of methods that may start or
     * kill threads.  Checked via checkPermission().
     */
    private static final RuntimePermission modifyThreadPermission;

    /**
     * Common (static) pool. Non-null for public use unless a static
     * construction exception, but internal usages null-check on use
     * to paranoically avoid potential initialization circularities
     * as well as to simplify generated code.
     */
    static final ForkJoinPool common;

    /**
     * Common pool parallelism. To allow simpler use and management
     * when common pool threads are disabled, we allow the underlying
     * common.parallelism field to be zero, but in that case still report
     * parallelism as 1 to reflect resulting caller-runs mechanics.
     */
    static final int commonParallelism;

    /**
     * Sequence number for creating workerNamePrefix.
     * Accessed only under the built-in synchronization of nextPoolId().
     */
    private static int poolNumberSequence;
   1092 
   1093     /**
   1094      * Returns the next sequence number. We don't expect this to
   1095      * ever contend, so use simple builtin sync.
   1096      */
   1097     private static final synchronized int nextPoolId() {
   1098         return ++poolNumberSequence;
   1099     }
   1100 
   1101     // static constants
   1102 
   1103     /**
   1104      * Initial timeout value (in nanoseconds) for the thread
   1105      * triggering quiescence to park waiting for new work. On timeout,
   1106      * the thread will instead try to shrink the number of
   1107      * workers. The value should be large enough to avoid overly
   1108      * aggressive shrinkage during most transient stalls (long GCs
   1109      * etc).
   1110      */
   1111     private static final long IDLE_TIMEOUT      = 2000L * 1000L * 1000L; // 2sec
   1112 
   1113     /**
   1114      * Timeout value when there are more threads than parallelism level
   1115      */
   1116     private static final long FAST_IDLE_TIMEOUT =  200L * 1000L * 1000L;
   1117 
   1118     /**
   1119      * Tolerance for idle timeouts, to cope with timer undershoots
   1120      */
   1121     private static final long TIMEOUT_SLOP = 2000000L;
   1122 
   1123     /**
   1124      * The maximum stolen->joining link depth allowed in method
   1125      * tryHelpStealer.  Must be a power of two.  Depths for legitimate
   1126      * chains are unbounded, but we use a fixed constant to avoid
   1127      * (otherwise unchecked) cycles and to bound staleness of
   1128      * traversal parameters at the expense of sometimes blocking when
   1129      * we could be helping.
   1130      */
   1131     private static final int MAX_HELP = 64;
   1132 
   1133     /**
   1134      * Increment for seed generators. See class ThreadLocal for
   1135      * explanation.
   1136      */
   1137     private static final int SEED_INCREMENT = 0x61c88647;
   1138 
    /*
     * Bits and masks for control variables
     *
     * Field ctl is a long packed with:
     * AC: Number of active running workers minus target parallelism (16 bits)
     * TC: Number of total workers minus target parallelism (16 bits)
     * ST: true if pool is terminating (1 bit)
     * EC: the wait count of top waiting thread (15 bits)
     * ID: poolIndex of top of Treiber stack of waiters (16 bits)
     *
     * When convenient, we can extract the upper 32 bits of counts and
     * the lower 32 bits of queue state, u = (int)(ctl >>> 32) and e =
     * (int)ctl.  The ec field is never accessed alone, but always
     * together with id and st. The offsets of counts by the target
     * parallelism and the positionings of fields makes it possible to
     * perform the most common checks via sign tests of fields: When
     * ac is negative, there are not enough active workers, when tc is
     * negative, there are not enough total workers, and when e is
     * negative, the pool is terminating.  To deal with these possibly
     * negative fields, we use casts in and out of "short" and/or
     * signed shifts to maintain signedness.
     *
     * When a thread is queued (inactivated), its eventCount field is
     * set negative, which is the only way to tell if a worker is
     * prevented from executing tasks, even though it must continue to
     * scan for them to avoid queuing races. Note however that
     * eventCount updates lag releases so usage requires care.
     *
     * Field plock is an int packed with:
     * SHUTDOWN: true if shutdown is enabled (1 bit)
     * SEQ:  a sequence lock, with PL_LOCK bit set if locked (30 bits)
     * SIGNAL: set when threads may be waiting on the lock (1 bit)
     *
     * The sequence number enables simple consistency checks:
     * Staleness of read-only operations on the workQueues array can
     * be checked by comparing plock before vs after the reads.
     */

    // bit positions/shifts for fields within ctl
    private static final int  AC_SHIFT   = 48;
    private static final int  TC_SHIFT   = 32;
    private static final int  ST_SHIFT   = 31;
    private static final int  EC_SHIFT   = 16;

    // bounds
    private static final int  SMASK      = 0xffff;  // short bits
    private static final int  MAX_CAP    = 0x7fff;  // max #workers - 1
    private static final int  EVENMASK   = 0xfffe;  // even short bits
    private static final int  SQMASK     = 0x007e;  // max 64 (even) slots
    private static final int  SHORT_SIGN = 1 << 15; // sign bit of a 16-bit subfield
    private static final int  INT_SIGN   = 1 << 31; // sign bit of e / eventCount

    // masks
    private static final long STOP_BIT   = 0x0001L << ST_SHIFT;
    private static final long AC_MASK    = ((long)SMASK) << AC_SHIFT;
    private static final long TC_MASK    = ((long)SMASK) << TC_SHIFT;

    // units for incrementing and decrementing the AC and TC subfields
    private static final long TC_UNIT    = 1L << TC_SHIFT;
    private static final long AC_UNIT    = 1L << AC_SHIFT;

    // masks and units for dealing with u = (int)(ctl >>> 32)
    private static final int  UAC_SHIFT  = AC_SHIFT - 32;
    private static final int  UTC_SHIFT  = TC_SHIFT - 32;
    private static final int  UAC_MASK   = SMASK << UAC_SHIFT;
    private static final int  UTC_MASK   = SMASK << UTC_SHIFT;
    private static final int  UAC_UNIT   = 1 << UAC_SHIFT;
    private static final int  UTC_UNIT   = 1 << UTC_SHIFT;

    // masks and units for dealing with e = (int)ctl
    private static final int E_MASK      = 0x7fffffff; // no STOP_BIT
    private static final int E_SEQ       = 1 << EC_SHIFT;

    // plock bits
    private static final int SHUTDOWN    = 1 << 31;
    private static final int PL_LOCK     = 2;       // lock held; also advances seq
    private static final int PL_SIGNAL   = 1;       // waiters may be blocked on monitor
    private static final int PL_SPINS    = 1 << 8;  // spin budget before blocking
   1222 
    // Heuristic padding to ameliorate unfortunate memory placements
    // (NOTE(review): presumably to reduce cache-line false sharing of
    // the hot fields below, in the pre-@Contended style — confirm)
    volatile long pad00, pad01, pad02, pad03, pad04, pad05, pad06;

    // Instance fields
    volatile long stealCount;                  // collects worker counts
    volatile long ctl;                         // main pool control
    volatile int plock;                        // shutdown status and seqLock
    volatile int indexSeed;                    // worker/submitter index seed
    final short parallelism;                   // parallelism level
    final short mode;                          // LIFO/FIFO
    WorkQueue[] workQueues;                    // main registry
    final ForkJoinWorkerThreadFactory factory;
    final UncaughtExceptionHandler ueh;        // per-worker UEH
    final String workerNamePrefix;             // to create worker name string

    // trailing padding, same rationale as above
    volatile Object pad10, pad11, pad12, pad13, pad14, pad15, pad16, pad17;
    volatile Object pad18, pad19, pad1a, pad1b;
   1240 
    /**
     * Acquires the plock lock to protect worker array and related
     * updates. This method is called only if an initial CAS on plock
     * fails. This acts as a spinlock for normal cases, but falls back
     * to builtin monitor to block when (rarely) needed. This would be
     * a terrible idea for a highly contended lock, but works fine as
     * a more conservative alternative to a pure spinlock.
     *
     * @return the plock value observed upon lock acquisition (with
     * PL_LOCK set); callers use it to compute the release value
     */
    private int acquirePlock() {
        int spins = PL_SPINS, ps, nps;
        for (;;) {
            // Fast path: lock bit clear, take it by CAS.
            if (((ps = plock) & PL_LOCK) == 0 &&
                U.compareAndSwapInt(this, PLOCK, ps, nps = ps + PL_LOCK))
                return nps;
            else if (spins >= 0) {
                // Spin first; decrement with ~1/2 probability to
                // stagger spin durations across contending threads.
                if (ThreadLocalRandom.current().nextInt() >= 0)
                    --spins;
            }
            else if (U.compareAndSwapInt(this, PLOCK, ps, ps | PL_SIGNAL)) {
                // Spins exhausted: advertise a waiter via PL_SIGNAL,
                // then block on the built-in monitor.
                synchronized (this) {
                    if ((plock & PL_SIGNAL) != 0) {
                        try {
                            wait();
                        } catch (InterruptedException ie) {
                            // Cannot propagate the interrupt from here;
                            // restore the thread's interrupt status.
                            try {
                                Thread.currentThread().interrupt();
                            } catch (SecurityException ignore) {
                            }
                        }
                    }
                    else
                        notifyAll(); // signal already consumed; wake others
                }
            }
        }
    }
   1277 
   1278     /**
   1279      * Unlocks and signals any thread waiting for plock. Called only
   1280      * when CAS of seq value for unlock fails.
   1281      */
   1282     private void releasePlock(int ps) {
   1283         plock = ps;
   1284         synchronized (this) { notifyAll(); }
   1285     }
   1286 
    /**
     * Tries to create and start one worker if fewer than target
     * parallelism level exist. Adjusts counts etc on failure.
     */
    private void tryAddWorker() {
        long c; int u, e;
        // Loop while active count is short (u < 0), total count is short
        // (tc sign bit set within u), and pool is not terminating (e >= 0).
        while ((u = (int)((c = ctl) >>> 32)) < 0 &&
               (u & SHORT_SIGN) != 0 && (e = (int)c) >= 0) {
            // Reserve a slot: bump both total and active worker counts.
            long nc = ((long)(((u + UTC_UNIT) & UTC_MASK) |
                              ((u + UAC_UNIT) & UAC_MASK)) << 32) | (long)e;
            if (U.compareAndSwapLong(this, CTL, c, nc)) {
                ForkJoinWorkerThreadFactory fac;
                Throwable ex = null;
                ForkJoinWorkerThread wt = null;
                try {
                    if ((fac = factory) != null &&
                        (wt = fac.newThread(this)) != null) {
                        wt.start();
                        break;
                    }
                } catch (Throwable rex) {
                    ex = rex;
                }
                // Factory returned null or threw: back out the count
                // reservation (and possibly rethrow) via deregisterWorker.
                deregisterWorker(wt, ex);
                break;
            }
        }
    }
   1315 
   1316     //  Registering and deregistering workers
   1317 
    /**
     * Callback from ForkJoinWorkerThread to establish and record its
     * WorkQueue. To avoid scanning bias due to packing entries in
     * front of the workQueues array, we treat the array as a simple
     * power-of-two hash table using per-thread seed as hash,
     * expanding as needed.
     *
     * @param wt the worker thread
     * @return the worker's queue
     */
    final WorkQueue registerWorker(ForkJoinWorkerThread wt) {
        UncaughtExceptionHandler handler; WorkQueue[] ws; int s, ps;
        wt.setDaemon(true);
        if ((handler = ueh) != null)
            wt.setUncaughtExceptionHandler(handler);
        // Generate a unique, nonzero per-worker seed.
        do {} while (!U.compareAndSwapInt(this, INDEXSEED, s = indexSeed,
                                          s += SEED_INCREMENT) ||
                     s == 0); // skip 0
        WorkQueue w = new WorkQueue(this, wt, mode, s);
        // Acquire plock: fast-path CAS, else full spin/block acquire.
        if (((ps = plock) & PL_LOCK) != 0 ||
            !U.compareAndSwapInt(this, PLOCK, ps, ps += PL_LOCK))
            ps = acquirePlock();
        // Release value: advance the sequence, preserving the SHUTDOWN bit.
        int nps = (ps & SHUTDOWN) | ((ps + PL_LOCK) & ~SHUTDOWN);
        try {
            if ((ws = workQueues) != null) {    // skip if shutting down
                int n = ws.length, m = n - 1;
                int r = (s << 1) | 1;           // use odd-numbered indices
                if (ws[r &= m] != null) {       // collision
                    int probes = 0;             // step by approx half size
                    int step = (n <= 4) ? 2 : ((n >>> 1) & EVENMASK) + 2;
                    while (ws[r = (r + step) & m] != null) {
                        if (++probes >= n) {    // table full: double capacity
                            workQueues = ws = Arrays.copyOf(ws, n <<= 1);
                            m = n - 1;
                            probes = 0;
                        }
                    }
                }
                w.poolIndex = (short)r;
                w.eventCount = r; // volatile write orders
                ws[r] = w;
            }
        } finally {
            // Release plock: fast-path CAS, else full release with signal.
            if (!U.compareAndSwapInt(this, PLOCK, ps, nps))
                releasePlock(nps);
        }
        // Worker name uses the logical index (pool slot / 2).
        wt.setName(workerNamePrefix.concat(Integer.toString(w.poolIndex >>> 1)));
        return w;
    }
   1367 
    /**
     * Final callback from terminating worker, as well as upon failure
     * to construct or start a worker.  Removes record of worker from
     * array, and adjusts counts. If pool is shutting down, tries to
     * complete termination.
     *
     * @param wt the worker thread, or null if construction failed
     * @param ex the exception causing failure, or null if none
     */
    final void deregisterWorker(ForkJoinWorkerThread wt, Throwable ex) {
        WorkQueue w = null;
        if (wt != null && (w = wt.workQueue) != null) {
            int ps; long sc;
            w.qlock = -1;                // ensure set
            // Fold this worker's steal count into the pool-wide total.
            do {} while (!U.compareAndSwapLong(this, STEALCOUNT,
                                               sc = stealCount,
                                               sc + w.nsteals));
            // Remove the queue from the registry, under plock.
            if (((ps = plock) & PL_LOCK) != 0 ||
                !U.compareAndSwapInt(this, PLOCK, ps, ps += PL_LOCK))
                ps = acquirePlock();
            int nps = (ps & SHUTDOWN) | ((ps + PL_LOCK) & ~SHUTDOWN);
            try {
                int idx = w.poolIndex;
                WorkQueue[] ws = workQueues;
                if (ws != null && idx >= 0 && idx < ws.length && ws[idx] == w)
                    ws[idx] = null;
            } finally {
                if (!U.compareAndSwapInt(this, PLOCK, ps, nps))
                    releasePlock(nps);
            }
        }

        long c;                          // adjust ctl counts
        // Decrement both active (AC) and total (TC) worker counts.
        do {} while (!U.compareAndSwapLong
                     (this, CTL, c = ctl, (((c - AC_UNIT) & AC_MASK) |
                                           ((c - TC_UNIT) & TC_MASK) |
                                           (c & ~(AC_MASK|TC_MASK)))));

        if (!tryTerminate(false, false) && w != null && w.array != null) {
            w.cancelAll();               // cancel remaining tasks
            // Wake an idle worker or add a replacement so pending
            // work is not stranded by this worker's exit.
            WorkQueue[] ws; WorkQueue v; Thread p; int u, i, e;
            while ((u = (int)((c = ctl) >>> 32)) < 0 && (e = (int)c) >= 0) {
                if (e > 0) {             // activate or create replacement
                    if ((ws = workQueues) == null ||
                        (i = e & SMASK) >= ws.length ||
                        (v = ws[i]) == null)
                        break;
                    long nc = (((long)(v.nextWait & E_MASK)) |
                               ((long)(u + UAC_UNIT) << 32));
                    if (v.eventCount != (e | INT_SIGN))
                        break;           // stale waiter; give up
                    if (U.compareAndSwapLong(this, CTL, c, nc)) {
                        v.eventCount = (e + E_SEQ) & E_MASK;
                        if ((p = v.parker) != null)
                            U.unpark(p);
                        break;
                    }
                }
                else {
                    if ((short)u < 0)    // total count short: add a worker
                        tryAddWorker();
                    break;
                }
            }
        }
        if (ex == null)                     // help clean refs on way out
            ForkJoinTask.helpExpungeStaleExceptions();
        else                                // rethrow
            ForkJoinTask.rethrow(ex);
    }
   1438 
   1439     // Submissions
   1440 
   1441     /**
   1442      * Per-thread records for threads that submit to pools. Currently
   1443      * holds only pseudo-random seed / index that is used to choose
   1444      * submission queues in method externalPush. In the future, this may
   1445      * also incorporate a means to implement different task rejection
   1446      * and resubmission policies.
   1447      *
   1448      * Seeds for submitters and workers/workQueues work in basically
   1449      * the same way but are initialized and updated using slightly
   1450      * different mechanics. Both are initialized using the same
   1451      * approach as in class ThreadLocal, where successive values are
   1452      * unlikely to collide with previous values. Seeds are then
   1453      * randomly modified upon collisions using xorshifts, which
   1454      * requires a non-zero seed.
   1455      */
   1456     static final class Submitter {
   1457         int seed;
   1458         Submitter(int s) { seed = s; }
   1459     }
   1460 
    /**
     * Unless shutting down, adds the given task to a submission queue
     * at submitter's current queue index (modulo submission
     * range). Only the most common path is directly handled in this
     * method. All others are relayed to fullExternalPush.
     *
     * @param task the task. Caller must ensure non-null.
     */
    final void externalPush(ForkJoinTask<?> task) {
        Submitter z = submitters.get();
        WorkQueue q; int r, m, s, n, am; ForkJoinTask<?>[] a;
        int ps = plock;
        WorkQueue[] ws = workQueues;
        // Fast path: pool initialized (ps > 0), submitter has a nonzero
        // seed, its slot holds a queue, and the queue lock is acquired.
        if (z != null && ps > 0 && ws != null && (m = (ws.length - 1)) >= 0 &&
            (q = ws[m & (r = z.seed) & SQMASK]) != null && r != 0 &&
            U.compareAndSwapInt(q, QLOCK, 0, 1)) { // lock
            if ((a = q.array) != null &&
                (am = a.length - 1) > (n = (s = q.top) - q.base)) {
                int j = ((am & s) << ASHIFT) + ABASE;
                // Ordered write publishes task before top is advanced.
                U.putOrderedObject(a, j, task);
                q.top = s + 1;                     // push on to deque
                q.qlock = 0;
                if (n <= 1)                        // queue was (nearly) empty
                    signalWork(ws, q);
                return;
            }
            q.qlock = 0;                           // unlock; array too small
        }
        fullExternalPush(task);                    // all other cases
    }
   1491 
    /**
     * Full version of externalPush. This method is called, among
     * other times, upon the first submission of the first task to the
     * pool, so must perform secondary initialization.  It also
     * detects first submission by an external thread by looking up
     * its ThreadLocal, and creates a new shared queue if the one at
     * index is empty or contended. The plock lock body must be
     * exception-free (so no try/finally) so we optimistically
     * allocate new queues outside the lock and throw them away if
     * (very rarely) not needed.
     *
     * Secondary initialization occurs when plock is zero, to create
     * workQueue array and set plock to a valid value.  This lock body
     * must also be exception-free. Because the plock seq value can
     * eventually wrap around zero, this method harmlessly fails to
     * reinitialize if workQueues exists, while still advancing plock.
     */
    private void fullExternalPush(ForkJoinTask<?> task) {
        int r = 0; // random index seed
        for (Submitter z = submitters.get();;) {
            WorkQueue[] ws; WorkQueue q; int ps, m, k;
            if (z == null) {
                // First submission from this thread: install Submitter.
                if (U.compareAndSwapInt(this, INDEXSEED, r = indexSeed,
                                        r += SEED_INCREMENT) && r != 0)
                    submitters.set(z = new Submitter(r));
            }
            else if (r == 0) {                  // move to a different index
                r = z.seed;
                r ^= r << 13;                   // same xorshift as WorkQueues
                r ^= r >>> 17;
                z.seed = r ^= (r << 5);
            }
            if ((ps = plock) < 0)               // SHUTDOWN bit set
                throw new RejectedExecutionException();
            else if (ps == 0 || (ws = workQueues) == null ||
                     (m = ws.length - 1) < 0) { // initialize workQueues
                int p = parallelism;            // find power of two table size
                int n = (p > 1) ? p - 1 : 1;    // ensure at least 2 slots
                n |= n >>> 1; n |= n >>> 2;  n |= n >>> 4;
                n |= n >>> 8; n |= n >>> 16; n = (n + 1) << 1;
                // Optimistically allocate outside the lock; discarded
                // if another thread initialized meanwhile.
                WorkQueue[] nws = ((ws = workQueues) == null || ws.length == 0 ?
                                   new WorkQueue[n] : null);
                if (((ps = plock) & PL_LOCK) != 0 ||
                    !U.compareAndSwapInt(this, PLOCK, ps, ps += PL_LOCK))
                    ps = acquirePlock();
                if (((ws = workQueues) == null || ws.length == 0) && nws != null)
                    workQueues = nws;
                int nps = (ps & SHUTDOWN) | ((ps + PL_LOCK) & ~SHUTDOWN);
                if (!U.compareAndSwapInt(this, PLOCK, ps, nps))
                    releasePlock(nps);
            }
            else if ((q = ws[k = r & m & SQMASK]) != null) {
                // Existing shared queue at this index: try locked push.
                if (q.qlock == 0 && U.compareAndSwapInt(q, QLOCK, 0, 1)) {
                    ForkJoinTask<?>[] a = q.array;
                    int s = q.top;
                    boolean submitted = false;
                    try {                      // locked version of push
                        if ((a != null && a.length > s + 1 - q.base) ||
                            (a = q.growArray()) != null) {   // must presize
                            int j = (((a.length - 1) & s) << ASHIFT) + ABASE;
                            U.putOrderedObject(a, j, task);
                            q.top = s + 1;
                            submitted = true;
                        }
                    } finally {
                        q.qlock = 0;  // unlock
                    }
                    if (submitted) {
                        signalWork(ws, q);
                        return;
                    }
                }
                r = 0; // move on failure
            }
            else if (((ps = plock) & PL_LOCK) == 0) { // create new queue
                q = new WorkQueue(this, null, SHARED_QUEUE, r);
                q.poolIndex = (short)k;
                if (((ps = plock) & PL_LOCK) != 0 ||
                    !U.compareAndSwapInt(this, PLOCK, ps, ps += PL_LOCK))
                    ps = acquirePlock();
                // Install only if the slot is still empty; else discard q.
                if ((ws = workQueues) != null && k < ws.length && ws[k] == null)
                    ws[k] = q;
                int nps = (ps & SHUTDOWN) | ((ps + PL_LOCK) & ~SHUTDOWN);
                if (!U.compareAndSwapInt(this, PLOCK, ps, nps))
                    releasePlock(nps);
            }
            else
                r = 0;                          // plock contended: retry elsewhere
        }
    }
   1582 
   1583     // Maintaining ctl counts
   1584 
    /**
     * Increments active count; mainly called upon return from blocking.
     */
    final void incrementActiveCount() {
        long c;
        // CAS-retry loop: add AC_UNIT to the AC subfield, leaving the
        // other ctl subfields untouched.
        do {} while (!U.compareAndSwapLong
                     (this, CTL, c = ctl, ((c & ~AC_MASK) |
                                           ((c & AC_MASK) + AC_UNIT))));
    }
   1594 
    /**
     * Tries to create or activate a worker if too few are active.
     *
     * @param ws the worker array to use to find signallees
     * @param q if non-null, the queue holding tasks to be processed
     */
    final void signalWork(WorkQueue[] ws, WorkQueue q) {
        for (;;) {
            long c; int e, u, i; WorkQueue w; Thread p;
            if ((u = (int)((c = ctl) >>> 32)) >= 0)
                break;                       // enough active workers
            if ((e = (int)c) <= 0) {         // no idle workers queued
                if ((short)u < 0)            // total count short
                    tryAddWorker();
                break;
            }
            // e encodes the top of the Treiber stack of waiters.
            if (ws == null || ws.length <= (i = e & SMASK) ||
                (w = ws[i]) == null)
                break;                       // stale/terminating
            // Pop w: restore its predecessor as stack top, bump AC.
            long nc = (((long)(w.nextWait & E_MASK)) |
                       ((long)(u + UAC_UNIT)) << 32);
            int ne = (e + E_SEQ) & E_MASK;
            if (w.eventCount == (e | INT_SIGN) &&
                U.compareAndSwapLong(this, CTL, c, nc)) {
                w.eventCount = ne;           // reactivate
                if ((p = w.parker) != null)
                    U.unpark(p);
                break;
            }
            if (q != null && q.base >= q.top)
                break;                       // task already taken; stop
        }
    }
   1628 
   1629     // Scanning for tasks
   1630 
   1631     /**
   1632      * Top-level runloop for workers, called by ForkJoinWorkerThread.run.
   1633      */
   1634     final void runWorker(WorkQueue w) {
   1635         w.growArray(); // allocate queue
   1636         for (int r = w.hint; scan(w, r) == 0; ) {
   1637             r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // xorshift
   1638         }
   1639     }
   1640 
    /**
     * Scans for and, if found, runs one task, else possibly
     * inactivates the worker. This method operates on single reads of
     * volatile state and is designed to be re-invoked continuously,
     * in part because it returns upon detecting inconsistencies,
     * contention, or state changes that indicate possible success on
     * re-invocation.
     *
     * The scan searches for tasks across queues starting at a random
     * index, checking each at least twice.  The scan terminates upon
     * either finding a non-empty queue, or completing the sweep. If
     * the worker is not inactivated, it takes and runs a task from
     * this queue. Otherwise, if not activated, it tries to activate
     * itself or some other worker by signalling. On failure to find a
     * task, returns (for retry) if pool state may have changed during
     * an empty scan, or tries to inactivate if active, else possibly
     * blocks or terminates via method awaitWork.
     *
     * @param w the worker (via its WorkQueue)
     * @param r a random seed
     * @return worker qlock status if would have waited, else 0
     */
    private final int scan(WorkQueue w, int r) {
        WorkQueue[] ws; int m;
        long c = ctl;                            // for consistency check
        if ((ws = workQueues) != null && (m = ws.length - 1) >= 0 && w != null) {
            // j counts down over 2*(m+1) probes: each slot visited twice.
            for (int j = m + m + 1, ec = w.eventCount;;) {
                WorkQueue q; int b, e; ForkJoinTask<?>[] a; ForkJoinTask<?> t;
                if ((q = ws[(r - j) & m]) != null &&
                    (b = q.base) - q.top < 0 && (a = q.array) != null) {
                    // Found an apparently non-empty queue.
                    long i = (((a.length - 1) & b) << ASHIFT) + ABASE;
                    if ((t = ((ForkJoinTask<?>)
                              U.getObjectVolatile(a, i))) != null) {
                        if (ec < 0)
                            // Inactive: can't take; help wake a waiter.
                            helpRelease(c, ws, w, q, b);
                        else if (q.base == b &&
                                 U.compareAndSwapObject(a, i, t, null)) {
                            // Stole the task; advance base with ordered write.
                            U.putOrderedInt(q, QBASE, b + 1);
                            if ((b + 1) - q.top < 0)
                                signalWork(ws, q); // more tasks remain
                            w.runTask(t);
                        }
                    }
                    break;
                }
                else if (--j < 0) {              // sweep completed empty
                    if ((ec | (e = (int)c)) < 0) // inactive or terminating
                        return awaitWork(w, c, ec);
                    else if (ctl == c) {         // try to inactivate and enqueue
                        // Push self onto Treiber stack of waiters,
                        // decrementing the active count.
                        long nc = (long)ec | ((c - AC_UNIT) & (AC_MASK|TC_MASK));
                        w.nextWait = e;
                        w.eventCount = ec | INT_SIGN;
                        if (!U.compareAndSwapLong(this, CTL, c, nc))
                            w.eventCount = ec;   // back out
                    }
                    break;
                }
            }
        }
        return 0;
    }
   1702 
    /**
     * A continuation of scan(), possibly blocking or terminating
     * worker w. Returns without blocking if pool state has apparently
     * changed since last invocation.  Also, if inactivating w has
     * caused the pool to become quiescent, checks for pool
     * termination, and, so long as this is not the only worker, waits
     * for event for up to a given duration.  On timeout, if ctl has
     * not changed, terminates the worker, which will in turn wake up
     * another worker to possibly repeat this process.
     *
     * @param w the calling worker
     * @param c the ctl value on entry to scan
     * @param ec the worker's eventCount on entry to scan
     * @return the worker's qlock status (negative if the worker
     * should terminate), else 0
     */
    private final int awaitWork(WorkQueue w, long c, int ec) {
        int stat, ns; long parkTime, deadline;
        // Proceed only if nothing changed since scan and not interrupted.
        if ((stat = w.qlock) >= 0 && w.eventCount == ec && ctl == c &&
            !Thread.interrupted()) {
            int e = (int)c;
            int u = (int)(c >>> 32);
            int d = (u >> UAC_SHIFT) + parallelism; // active count

            if (e < 0 || (d <= 0 && tryTerminate(false, false)))
                stat = w.qlock = -1;          // pool is terminating
            else if ((ns = w.nsteals) != 0) { // collect steals and retry
                long sc;
                w.nsteals = 0;
                do {} while (!U.compareAndSwapLong(this, STEALCOUNT,
                                                   sc = stealCount, sc + ns));
            }
            else {
                // If w is the last waiter and pool is quiescent, compute
                // the ctl value to restore if w times out and exits.
                long pc = ((d > 0 || ec != (e | INT_SIGN)) ? 0L :
                           ((long)(w.nextWait & E_MASK)) | // ctl to restore
                           ((long)(u + UAC_UNIT)) << 32);
                if (pc != 0L) {               // timed wait if last waiter
                    int dc = -(short)(c >>> TC_SHIFT);
                    parkTime = (dc < 0 ? FAST_IDLE_TIMEOUT:
                                (dc + 1) * IDLE_TIMEOUT);
                    deadline = System.nanoTime() + parkTime - TIMEOUT_SLOP;
                }
                else
                    parkTime = deadline = 0L; // untimed wait
                if (w.eventCount == ec && ctl == c) {
                    Thread wt = Thread.currentThread();
                    U.putObject(wt, PARKBLOCKER, this);
                    w.parker = wt;            // emulate LockSupport.park
                    if (w.eventCount == ec && ctl == c)
                        U.park(false, parkTime);  // must recheck before park
                    w.parker = null;
                    U.putObject(wt, PARKBLOCKER, null);
                    // On timeout with unchanged ctl, restore counts and
                    // let this worker terminate.
                    if (parkTime != 0L && ctl == c &&
                        deadline - System.nanoTime() <= 0L &&
                        U.compareAndSwapLong(this, CTL, c, pc))
                        stat = w.qlock = -1;  // shrink pool
                }
            }
        }
        return stat;
    }
   1762 
    /**
     * Possibly releases (signals) a worker. Called only from scan()
     * when a worker with apparently inactive status finds a non-empty
     * queue. This requires revalidating all of the associated state
     * from caller.
     *
     * @param c the ctl value observed by the caller
     * @param ws the workQueues array observed by the caller
     * @param w the (inactive) calling worker
     * @param q the non-empty queue found by the caller
     * @param b q's base index at the time it was found non-empty
     */
    private final void helpRelease(long c, WorkQueue[] ws, WorkQueue w,
                                   WorkQueue q, int b) {
        WorkQueue v; int e, i; Thread p;
        // Revalidate: w still inactive, a waiter exists (e > 0),
        // its slot is valid, and ctl is unchanged.
        if (w != null && w.eventCount < 0 && (e = (int)c) > 0 &&
            ws != null && ws.length > (i = e & SMASK) &&
            (v = ws[i]) != null && ctl == c) {
            // Pop v from the waiter stack, incrementing active count.
            long nc = (((long)(v.nextWait & E_MASK)) |
                       ((long)((int)(c >>> 32) + UAC_UNIT)) << 32);
            int ne = (e + E_SEQ) & E_MASK;
            if (q != null && q.base == b && w.eventCount < 0 &&
                v.eventCount == (e | INT_SIGN) &&
                U.compareAndSwapLong(this, CTL, c, nc)) {
                v.eventCount = ne;       // reactivate v
                if ((p = v.parker) != null)
                    U.unpark(p);
            }
        }
    }
   1787 
   1788     /**
   1789      * Tries to locate and execute tasks for a stealer of the given
   1790      * task, or in turn one of its stealers, Traces currentSteal ->
   1791      * currentJoin links looking for a thread working on a descendant
   1792      * of the given task and with a non-empty queue to steal back and
   1793      * execute tasks from. The first call to this method upon a
   1794      * waiting join will often entail scanning/search, (which is OK
   1795      * because the joiner has nothing better to do), but this method
   1796      * leaves hints in workers to speed up subsequent calls. The
   1797      * implementation is very branchy to cope with potential
   1798      * inconsistencies or loops encountering chains that are stale,
   1799      * unknown, or so long that they are likely cyclic.
   1800      *
   1801      * @param joiner the joining worker
   1802      * @param task the task to join
   1803      * @return 0 if no progress can be made, negative if task
   1804      * known complete, else positive
   1805      */
   1806     private int tryHelpStealer(WorkQueue joiner, ForkJoinTask<?> task) {
   1807         int stat = 0, steps = 0;                    // bound to avoid cycles
   1808         if (task != null && joiner != null &&
   1809             joiner.base - joiner.top >= 0) {        // hoist checks
   1810             restart: for (;;) {
   1811                 ForkJoinTask<?> subtask = task;     // current target
   1812                 for (WorkQueue j = joiner, v;;) {   // v is stealer of subtask
   1813                     WorkQueue[] ws; int m, s, h;
   1814                     if ((s = task.status) < 0) {
   1815                         stat = s;
   1816                         break restart;
   1817                     }
   1818                     if ((ws = workQueues) == null || (m = ws.length - 1) <= 0)
   1819                         break restart;              // shutting down
   1820                     if ((v = ws[h = (j.hint | 1) & m]) == null ||
   1821                         v.currentSteal != subtask) {
   1822                         for (int origin = h;;) {    // find stealer
   1823                             if (((h = (h + 2) & m) & 15) == 1 &&
   1824                                 (subtask.status < 0 || j.currentJoin != subtask))
   1825                                 continue restart;   // occasional staleness check
   1826                             if ((v = ws[h]) != null &&
   1827                                 v.currentSteal == subtask) {
   1828                                 j.hint = h;        // save hint
   1829                                 break;
   1830                             }
   1831                             if (h == origin)
   1832                                 break restart;      // cannot find stealer
   1833                         }
   1834                     }
   1835                     for (;;) { // help stealer or descend to its stealer
   1836                         ForkJoinTask[] a; int b;
   1837                         if (subtask.status < 0)     // surround probes with
   1838                             continue restart;       //   consistency checks
   1839                         if ((b = v.base) - v.top < 0 && (a = v.array) != null) {
   1840                             int i = (((a.length - 1) & b) << ASHIFT) + ABASE;
   1841                             ForkJoinTask<?> t =
   1842                                 (ForkJoinTask<?>)U.getObjectVolatile(a, i);
   1843                             if (subtask.status < 0 || j.currentJoin != subtask ||
   1844                                 v.currentSteal != subtask)
   1845                                 continue restart;   // stale
   1846                             stat = 1;               // apparent progress
   1847                             if (v.base == b) {
   1848                                 if (t == null)
   1849                                     break restart;
   1850                                 if (U.compareAndSwapObject(a, i, t, null)) {
   1851                                     U.putOrderedInt(v, QBASE, b + 1);
   1852                                     ForkJoinTask<?> ps = joiner.currentSteal;
   1853                                     int jt = joiner.top;
   1854                                     do {
   1855                                         joiner.currentSteal = t;
   1856                                         t.doExec(); // clear local tasks too
   1857                                     } while (task.status >= 0 &&
   1858                                              joiner.top != jt &&
   1859                                              (t = joiner.pop()) != null);
   1860                                     joiner.currentSteal = ps;
   1861                                     break restart;
   1862                                 }
   1863                             }
   1864                         }
   1865                         else {                      // empty -- try to descend
   1866                             ForkJoinTask<?> next = v.currentJoin;
   1867                             if (subtask.status < 0 || j.currentJoin != subtask ||
   1868                                 v.currentSteal != subtask)
   1869                                 continue restart;   // stale
   1870                             else if (next == null || ++steps == MAX_HELP)
   1871                                 break restart;      // dead-end or maybe cyclic
   1872                             else {
   1873                                 subtask = next;
   1874                                 j = v;
   1875                                 break;
   1876                             }
   1877                         }
   1878                     }
   1879                 }
   1880             }
   1881         }
   1882         return stat;
   1883     }
   1884 
   1885     /**
   1886      * Analog of tryHelpStealer for CountedCompleters. Tries to steal
   1887      * and run tasks within the target's computation.
   1888      *
   1889      * @param task the task to join
   1890      */
   1891     private int helpComplete(WorkQueue joiner, CountedCompleter<?> task) {
   1892         WorkQueue[] ws; int m;
   1893         int s = 0;
   1894         if ((ws = workQueues) != null && (m = ws.length - 1) >= 0 &&
   1895             joiner != null && task != null) {
   1896             int j = joiner.poolIndex;
   1897             int scans = m + m + 1;
   1898             long c = 0L;              // for stability check
   1899             for (int k = scans; ; j += 2) {
   1900                 WorkQueue q;
   1901                 if ((s = task.status) < 0)
   1902                     break;
   1903                 else if (joiner.internalPopAndExecCC(task))
   1904                     k = scans;
   1905                 else if ((s = task.status) < 0)
   1906                     break;
   1907                 else if ((q = ws[j & m]) != null && q.pollAndExecCC(task))
   1908                     k = scans;
   1909                 else if (--k < 0) {
   1910                     if (c == (c = ctl))
   1911                         break;
   1912                     k = scans;
   1913                 }
   1914             }
   1915         }
   1916         return s;
   1917     }
   1918 
   1919     /**
   1920      * Tries to decrement active count (sometimes implicitly) and
   1921      * possibly release or create a compensating worker in preparation
   1922      * for blocking. Fails on contention or termination. Otherwise,
   1923      * adds a new thread if no idle workers are available and pool
   1924      * may become starved.
   1925      *
   1926      * @param c the assumed ctl value
   1927      */
   1928     final boolean tryCompensate(long c) {
   1929         WorkQueue[] ws = workQueues;
   1930         int pc = parallelism, e = (int)c, m, tc;
   1931         if (ws != null && (m = ws.length - 1) >= 0 && e >= 0 && ctl == c) {
   1932             WorkQueue w = ws[e & m];
   1933             if (e != 0 && w != null) {
   1934                 Thread p;
   1935                 long nc = ((long)(w.nextWait & E_MASK) |
   1936                            (c & (AC_MASK|TC_MASK)));
   1937                 int ne = (e + E_SEQ) & E_MASK;
   1938                 if (w.eventCount == (e | INT_SIGN) &&
   1939                     U.compareAndSwapLong(this, CTL, c, nc)) {
   1940                     w.eventCount = ne;
   1941                     if ((p = w.parker) != null)
   1942                         U.unpark(p);
   1943                     return true;   // replace with idle worker
   1944                 }
   1945             }
   1946             else if ((tc = (short)(c >>> TC_SHIFT)) >= 0 &&
   1947                      (int)(c >> AC_SHIFT) + pc > 1) {
   1948                 long nc = ((c - AC_UNIT) & AC_MASK) | (c & ~AC_MASK);
   1949                 if (U.compareAndSwapLong(this, CTL, c, nc))
   1950                     return true;   // no compensation
   1951             }
   1952             else if (tc + pc < MAX_CAP) {
   1953                 long nc = ((c + TC_UNIT) & TC_MASK) | (c & ~TC_MASK);
   1954                 if (U.compareAndSwapLong(this, CTL, c, nc)) {
   1955                     ForkJoinWorkerThreadFactory fac;
   1956                     Throwable ex = null;
   1957                     ForkJoinWorkerThread wt = null;
   1958                     try {
   1959                         if ((fac = factory) != null &&
   1960                             (wt = fac.newThread(this)) != null) {
   1961                             wt.start();
   1962                             return true;
   1963                         }
   1964                     } catch (Throwable rex) {
   1965                         ex = rex;
   1966                     }
   1967                     deregisterWorker(wt, ex); // clean up and return false
   1968                 }
   1969             }
   1970         }
   1971         return false;
   1972     }
   1973 
   1974     /**
   1975      * Helps and/or blocks until the given task is done.
   1976      *
   1977      * @param joiner the joining worker
   1978      * @param task the task
   1979      * @return task status on exit
   1980      */
   1981     final int awaitJoin(WorkQueue joiner, ForkJoinTask<?> task) {
   1982         int s = 0;
   1983         if (task != null && (s = task.status) >= 0 && joiner != null) {
   1984             ForkJoinTask<?> prevJoin = joiner.currentJoin;
   1985             joiner.currentJoin = task;
   1986             do {} while (joiner.tryRemoveAndExec(task) && // process local tasks
   1987                          (s = task.status) >= 0);
   1988             if (s >= 0 && (task instanceof CountedCompleter))
   1989                 s = helpComplete(joiner, (CountedCompleter<?>)task);
   1990             long cc = 0;        // for stability checks
   1991             while (s >= 0 && (s = task.status) >= 0) {
   1992                 if ((s = tryHelpStealer(joiner, task)) == 0 &&
   1993                     (s = task.status) >= 0) {
   1994                     if (!tryCompensate(cc))
   1995                         cc = ctl;
   1996                     else {
   1997                         if (task.trySetSignal() && (s = task.status) >= 0) {
   1998                             synchronized (task) {
   1999                                 if (task.status >= 0) {
   2000                                     try {                // see ForkJoinTask
   2001                                         task.wait();     //  for explanation
   2002                                     } catch (InterruptedException ie) {
   2003                                     }
   2004                                 }
   2005                                 else
   2006                                     task.notifyAll();
   2007                             }
   2008                         }
   2009                         long c; // reactivate
   2010                         do {} while (!U.compareAndSwapLong
   2011                                      (this, CTL, c = ctl,
   2012                                       ((c & ~AC_MASK) |
   2013                                        ((c & AC_MASK) + AC_UNIT))));
   2014                     }
   2015                 }
   2016             }
   2017             joiner.currentJoin = prevJoin;
   2018         }
   2019         return s;
   2020     }
   2021 
   2022     /**
   2023      * Stripped-down variant of awaitJoin used by timed joins. Tries
   2024      * to help join only while there is continuous progress. (Caller
   2025      * will then enter a timed wait.)
   2026      *
   2027      * @param joiner the joining worker
   2028      * @param task the task
   2029      */
   2030     final void helpJoinOnce(WorkQueue joiner, ForkJoinTask<?> task) {
   2031         int s;
   2032         if (joiner != null && task != null && (s = task.status) >= 0) {
   2033             ForkJoinTask<?> prevJoin = joiner.currentJoin;
   2034             joiner.currentJoin = task;
   2035             do {} while (joiner.tryRemoveAndExec(task) && // process local tasks
   2036                          (s = task.status) >= 0);
   2037             if (s >= 0) {
   2038                 if (task instanceof CountedCompleter)
   2039                     helpComplete(joiner, (CountedCompleter<?>)task);
   2040                 do {} while (task.status >= 0 &&
   2041                              tryHelpStealer(joiner, task) > 0);
   2042             }
   2043             joiner.currentJoin = prevJoin;
   2044         }
   2045     }
   2046 
   2047     /**
   2048      * Returns a (probably) non-empty steal queue, if one is found
   2049      * during a scan, else null.  This method must be retried by
   2050      * caller if, by the time it tries to use the queue, it is empty.
   2051      */
   2052     private WorkQueue findNonEmptyStealQueue() {
   2053         int r = ThreadLocalRandom.current().nextInt();
   2054         for (;;) {
   2055             int ps = plock, m; WorkQueue[] ws; WorkQueue q;
   2056             if ((ws = workQueues) != null && (m = ws.length - 1) >= 0) {
   2057                 for (int j = (m + 1) << 2; j >= 0; --j) {
   2058                     if ((q = ws[(((r - j) << 1) | 1) & m]) != null &&
   2059                         q.base - q.top < 0)
   2060                         return q;
   2061                 }
   2062             }
   2063             if (plock == ps)
   2064                 return null;
   2065         }
   2066     }
   2067 
   2068     /**
   2069      * Runs tasks until {@code isQuiescent()}. We piggyback on
   2070      * active count ctl maintenance, but rather than blocking
   2071      * when tasks cannot be found, we rescan until all others cannot
   2072      * find tasks either.
   2073      */
   2074     final void helpQuiescePool(WorkQueue w) {
   2075         ForkJoinTask<?> ps = w.currentSteal;
   2076         for (boolean active = true;;) {
   2077             long c; WorkQueue q; ForkJoinTask<?> t; int b;
   2078             while ((t = w.nextLocalTask()) != null)
   2079                 t.doExec();
   2080             if ((q = findNonEmptyStealQueue()) != null) {
   2081                 if (!active) {      // re-establish active count
   2082                     active = true;
   2083                     do {} while (!U.compareAndSwapLong
   2084                                  (this, CTL, c = ctl,
   2085                                   ((c & ~AC_MASK) |
   2086                                    ((c & AC_MASK) + AC_UNIT))));
   2087                 }
   2088                 if ((b = q.base) - q.top < 0 && (t = q.pollAt(b)) != null) {
   2089                     (w.currentSteal = t).doExec();
   2090                     w.currentSteal = ps;
   2091                 }
   2092             }
   2093             else if (active) {       // decrement active count without queuing
   2094                 long nc = ((c = ctl) & ~AC_MASK) | ((c & AC_MASK) - AC_UNIT);
   2095                 if ((int)(nc >> AC_SHIFT) + parallelism == 0)
   2096                     break;          // bypass decrement-then-increment
   2097                 if (U.compareAndSwapLong(this, CTL, c, nc))
   2098                     active = false;
   2099             }
   2100             else if ((int)((c = ctl) >> AC_SHIFT) + parallelism <= 0 &&
   2101                      U.compareAndSwapLong
   2102                      (this, CTL, c, ((c & ~AC_MASK) |
   2103                                      ((c & AC_MASK) + AC_UNIT))))
   2104                 break;
   2105         }
   2106     }
   2107 
   2108     /**
   2109      * Gets and removes a local or stolen task for the given worker.
   2110      *
   2111      * @return a task, if available
   2112      */
   2113     final ForkJoinTask<?> nextTaskFor(WorkQueue w) {
   2114         for (ForkJoinTask<?> t;;) {
   2115             WorkQueue q; int b;
   2116             if ((t = w.nextLocalTask()) != null)
   2117                 return t;
   2118             if ((q = findNonEmptyStealQueue()) == null)
   2119                 return null;
   2120             if ((b = q.base) - q.top < 0 && (t = q.pollAt(b)) != null)
   2121                 return t;
   2122         }
   2123     }
   2124 
   2125     /**
   2126      * Returns a cheap heuristic guide for task partitioning when
   2127      * programmers, frameworks, tools, or languages have little or no
   2128      * idea about task granularity.  In essence by offering this
   2129      * method, we ask users only about tradeoffs in overhead vs
   2130      * expected throughput and its variance, rather than how finely to
   2131      * partition tasks.
   2132      *
   2133      * In a steady state strict (tree-structured) computation, each
   2134      * thread makes available for stealing enough tasks for other
   2135      * threads to remain active. Inductively, if all threads play by
   2136      * the same rules, each thread should make available only a
   2137      * constant number of tasks.
   2138      *
   2139      * The minimum useful constant is just 1. But using a value of 1
   2140      * would require immediate replenishment upon each steal to
   2141      * maintain enough tasks, which is infeasible.  Further,
   2142      * partitionings/granularities of offered tasks should minimize
   2143      * steal rates, which in general means that threads nearer the top
   2144      * of computation tree should generate more than those nearer the
   2145      * bottom. In perfect steady state, each thread is at
   2146      * approximately the same level of computation tree. However,
   2147      * producing extra tasks amortizes the uncertainty of progress and
   2148      * diffusion assumptions.
   2149      *
   2150      * So, users will want to use values larger (but not much larger)
   2151      * than 1 to both smooth over transient shortages and hedge
   2152      * against uneven progress; as traded off against the cost of
   2153      * extra task overhead. We leave the user to pick a threshold
   2154      * value to compare with the results of this call to guide
   2155      * decisions, but recommend values such as 3.
   2156      *
   2157      * When all threads are active, it is on average OK to estimate
   2158      * surplus strictly locally. In steady-state, if one thread is
   2159      * maintaining say 2 surplus tasks, then so are others. So we can
   2160      * just use estimated queue length.  However, this strategy alone
   2161      * leads to serious mis-estimates in some non-steady-state
   2162      * conditions (ramp-up, ramp-down, other stalls). We can detect
   2163      * many of these by further considering the number of "idle"
   2164      * threads, that are known to have zero queued tasks, so
   2165      * compensate by a factor of (#idle/#active) threads.
   2166      *
   2167      * Note: The approximation of #busy workers as #active workers is
   2168      * not very good under current signalling scheme, and should be
   2169      * improved.
   2170      */
   2171     static int getSurplusQueuedTaskCount() {
   2172         Thread t; ForkJoinWorkerThread wt; ForkJoinPool pool; WorkQueue q;
   2173         if (((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)) {
   2174             int p = (pool = (wt = (ForkJoinWorkerThread)t).pool).parallelism;
   2175             int n = (q = wt.workQueue).top - q.base;
   2176             int a = (int)(pool.ctl >> AC_SHIFT) + p;
   2177             return n - (a > (p >>>= 1) ? 0 :
   2178                         a > (p >>>= 1) ? 1 :
   2179                         a > (p >>>= 1) ? 2 :
   2180                         a > (p >>>= 1) ? 4 :
   2181                         8);
   2182         }
   2183         return 0;
   2184     }
   2185 
   2186     //  Termination
   2187 
   2188     /**
   2189      * Possibly initiates and/or completes termination.  The caller
   2190      * triggering termination runs three passes through workQueues:
   2191      * (0) Setting termination status, followed by wakeups of queued
   2192      * workers; (1) cancelling all tasks; (2) interrupting lagging
   2193      * threads (likely in external tasks, but possibly also blocked in
   2194      * joins).  Each pass repeats previous steps because of potential
   2195      * lagging thread creation.
   2196      *
   2197      * @param now if true, unconditionally terminate, else only
   2198      * if no work and no active workers
   2199      * @param enable if true, enable shutdown when next possible
   2200      * @return true if now terminating or terminated
   2201      */
   2202     private boolean tryTerminate(boolean now, boolean enable) {
   2203         int ps;
   2204         if (this == common)                        // cannot shut down
   2205             return false;
   2206         if ((ps = plock) >= 0) {                   // enable by setting plock
   2207             if (!enable)
   2208                 return false;
   2209             if ((ps & PL_LOCK) != 0 ||
   2210                 !U.compareAndSwapInt(this, PLOCK, ps, ps += PL_LOCK))
   2211                 ps = acquirePlock();
   2212             int nps = ((ps + PL_LOCK) & ~SHUTDOWN) | SHUTDOWN;
   2213             if (!U.compareAndSwapInt(this, PLOCK, ps, nps))
   2214                 releasePlock(nps);
   2215         }
   2216         for (long c;;) {
   2217             if (((c = ctl) & STOP_BIT) != 0) {     // already terminating
   2218                 if ((short)(c >>> TC_SHIFT) + parallelism <= 0) {
   2219                     synchronized (this) {
   2220                         notifyAll();               // signal when 0 workers
   2221                     }
   2222                 }
   2223                 return true;
   2224             }
   2225             if (!now) {                            // check if idle & no tasks
   2226                 WorkQueue[] ws; WorkQueue w;
   2227                 if ((int)(c >> AC_SHIFT) + parallelism > 0)
   2228                     return false;
   2229                 if ((ws = workQueues) != null) {
   2230                     for (int i = 0; i < ws.length; ++i) {
   2231                         if ((w = ws[i]) != null &&
   2232                             (!w.isEmpty() ||
   2233                              ((i & 1) != 0 && w.eventCount >= 0))) {
   2234                             signalWork(ws, w);
   2235                             return false;
   2236                         }
   2237                     }
   2238                 }
   2239             }
   2240             if (U.compareAndSwapLong(this, CTL, c, c | STOP_BIT)) {
   2241                 for (int pass = 0; pass < 3; ++pass) {
   2242                     WorkQueue[] ws; WorkQueue w; Thread wt;
   2243                     if ((ws = workQueues) != null) {
   2244                         int n = ws.length;
   2245                         for (int i = 0; i < n; ++i) {
   2246                             if ((w = ws[i]) != null) {
   2247                                 w.qlock = -1;
   2248                                 if (pass > 0) {
   2249                                     w.cancelAll();
   2250                                     if (pass > 1 && (wt = w.owner) != null) {
   2251                                         if (!wt.isInterrupted()) {
   2252                                             try {
   2253                                                 wt.interrupt();
   2254                                             } catch (Throwable ignore) {
   2255                                             }
   2256                                         }
   2257                                         U.unpark(wt);
   2258                                     }
   2259                                 }
   2260                             }
   2261                         }
   2262                         // Wake up workers parked on event queue
   2263                         int i, e; long cc; Thread p;
   2264                         while ((e = (int)(cc = ctl) & E_MASK) != 0 &&
   2265                                (i = e & SMASK) < n && i >= 0 &&
   2266                                (w = ws[i]) != null) {
   2267                             long nc = ((long)(w.nextWait & E_MASK) |
   2268                                        ((cc + AC_UNIT) & AC_MASK) |
   2269                                        (cc & (TC_MASK|STOP_BIT)));
   2270                             if (w.eventCount == (e | INT_SIGN) &&
   2271                                 U.compareAndSwapLong(this, CTL, cc, nc)) {
   2272                                 w.eventCount = (e + E_SEQ) & E_MASK;
   2273                                 w.qlock = -1;
   2274                                 if ((p = w.parker) != null)
   2275                                     U.unpark(p);
   2276                             }
   2277                         }
   2278                     }
   2279                 }
   2280             }
   2281         }
   2282     }
   2283 
   2284     // external operations on common pool
   2285 
   2286     /**
   2287      * Returns common pool queue for a thread that has submitted at
   2288      * least one task.
   2289      */
   2290     static WorkQueue commonSubmitterQueue() {
   2291         Submitter z; ForkJoinPool p; WorkQueue[] ws; int m, r;
   2292         return ((z = submitters.get()) != null &&
   2293                 (p = common) != null &&
   2294                 (ws = p.workQueues) != null &&
   2295                 (m = ws.length - 1) >= 0) ?
   2296             ws[m & z.seed & SQMASK] : null;
   2297     }
   2298 
   2299     /**
   2300      * Tries to pop the given task from submitter's queue in common pool.
   2301      */
   2302     final boolean tryExternalUnpush(ForkJoinTask<?> task) {
   2303         WorkQueue joiner; ForkJoinTask<?>[] a; int m, s;
   2304         Submitter z = submitters.get();
   2305         WorkQueue[] ws = workQueues;
   2306         boolean popped = false;
   2307         if (z != null && ws != null && (m = ws.length - 1) >= 0 &&
   2308             (joiner = ws[z.seed & m & SQMASK]) != null &&
   2309             joiner.base != (s = joiner.top) &&
   2310             (a = joiner.array) != null) {
   2311             long j = (((a.length - 1) & (s - 1)) << ASHIFT) + ABASE;
   2312             if (U.getObject(a, j) == task &&
   2313                 U.compareAndSwapInt(joiner, QLOCK, 0, 1)) {
   2314                 if (joiner.top == s && joiner.array == a &&
   2315                     U.compareAndSwapObject(a, j, task, null)) {
   2316                     joiner.top = s - 1;
   2317                     popped = true;
   2318                 }
   2319                 joiner.qlock = 0;
   2320             }
   2321         }
   2322         return popped;
   2323     }
   2324 
    /**
     * Analog of helpComplete for external (non-worker) callers: tries
     * to pop and run subtasks of the given CountedCompleter from the
     * caller's submission queue and to poll them from odd-indexed
     * queues, until the task completes or no progress is observed.
     *
     * @param task the task to help complete
     * @return the task's status on exit (negative if known complete)
     */
    final int externalHelpComplete(CountedCompleter<?> task) {
        WorkQueue joiner; int m, j;
        Submitter z = submitters.get();
        WorkQueue[] ws = workQueues;
        int s = 0;
        if (z != null && ws != null && (m = ws.length - 1) >= 0 &&
            (joiner = ws[(j = z.seed) & m & SQMASK]) != null && task != null) {
            int scans = m + m + 1;   // scan budget before stability check
            long c = 0L;             // for stability check
            j |= 1;                  // poll odd queues
            for (int k = scans; ; j += 2) {
                WorkQueue q;
                if ((s = task.status) < 0)
                    break;
                else if (joiner.externalPopAndExecCC(task))
                    k = scans;       // progress: reset scan budget
                else if ((s = task.status) < 0)
                    break;
                else if ((q = ws[j & m]) != null && q.pollAndExecCC(task))
                    k = scans;       // ran a subtask from q: reset budget
                else if (--k < 0) {
                    if (c == (c = ctl))  // give up only if ctl unchanged
                        break;
                    k = scans;
                }
            }
        }
        return s;
    }
   2354 
   2355     // Exported methods
   2356 
   2357     // Constructors
   2358 
   2359     /**
   2360      * Creates a {@code ForkJoinPool} with parallelism equal to {@link
   2361      * java.lang.Runtime#availableProcessors}, using the {@linkplain
   2362      * #defaultForkJoinWorkerThreadFactory default thread factory},
   2363      * no UncaughtExceptionHandler, and non-async LIFO processing mode.
   2364      */
   2365     public ForkJoinPool() {
   2366         this(Math.min(MAX_CAP, Runtime.getRuntime().availableProcessors()),
   2367              defaultForkJoinWorkerThreadFactory, null, false);
   2368     }
   2369 
   2370     /**
   2371      * Creates a {@code ForkJoinPool} with the indicated parallelism
   2372      * level, the {@linkplain
   2373      * #defaultForkJoinWorkerThreadFactory default thread factory},
   2374      * no UncaughtExceptionHandler, and non-async LIFO processing mode.
   2375      *
   2376      * @param parallelism the parallelism level
   2377      * @throws IllegalArgumentException if parallelism less than or
   2378      *         equal to zero, or greater than implementation limit
   2379      */
   2380     public ForkJoinPool(int parallelism) {
   2381         this(parallelism, defaultForkJoinWorkerThreadFactory, null, false);
   2382     }
   2383 
   2384     /**
   2385      * Creates a {@code ForkJoinPool} with the given parameters.
   2386      *
   2387      * @param parallelism the parallelism level. For default value,
   2388      * use {@link java.lang.Runtime#availableProcessors}.
   2389      * @param factory the factory for creating new threads. For default value,
   2390      * use {@link #defaultForkJoinWorkerThreadFactory}.
   2391      * @param handler the handler for internal worker threads that
   2392      * terminate due to unrecoverable errors encountered while executing
   2393      * tasks. For default value, use {@code null}.
   2394      * @param asyncMode if true,
   2395      * establishes local first-in-first-out scheduling mode for forked
   2396      * tasks that are never joined. This mode may be more appropriate
   2397      * than default locally stack-based mode in applications in which
   2398      * worker threads only process event-style asynchronous tasks.
   2399      * For default value, use {@code false}.
   2400      * @throws IllegalArgumentException if parallelism less than or
   2401      *         equal to zero, or greater than implementation limit
   2402      * @throws NullPointerException if the factory is null
   2403      */
   2404     public ForkJoinPool(int parallelism,
   2405                         ForkJoinWorkerThreadFactory factory,
   2406                         UncaughtExceptionHandler handler,
   2407                         boolean asyncMode) {
   2408         this(checkParallelism(parallelism),
   2409              checkFactory(factory),
   2410              handler,
   2411              (asyncMode ? FIFO_QUEUE : LIFO_QUEUE),
   2412              "ForkJoinPool-" + nextPoolId() + "-worker-");
   2413         checkPermission();
   2414     }
   2415 
   2416     private static int checkParallelism(int parallelism) {
   2417         if (parallelism <= 0 || parallelism > MAX_CAP)
   2418             throw new IllegalArgumentException();
   2419         return parallelism;
   2420     }
   2421 
   2422     private static ForkJoinWorkerThreadFactory checkFactory
   2423         (ForkJoinWorkerThreadFactory factory) {
   2424         if (factory == null)
   2425             throw new NullPointerException();
   2426         return factory;
   2427     }
   2428 
   2429     /**
   2430      * Creates a {@code ForkJoinPool} with the given parameters, without
   2431      * any security checks or parameter validation.  Invoked directly by
   2432      * makeCommonPool.
   2433      */
   2434     private ForkJoinPool(int parallelism,
   2435                          ForkJoinWorkerThreadFactory factory,
   2436                          UncaughtExceptionHandler handler,
   2437                          int mode,
   2438                          String workerNamePrefix) {
   2439         this.workerNamePrefix = workerNamePrefix;
   2440         this.factory = factory;
   2441         this.ueh = handler;
   2442         this.mode = (short)mode;
   2443         this.parallelism = (short)parallelism;
   2444         long np = (long)(-parallelism); // offset ctl counts
   2445         this.ctl = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK);
   2446     }
   2447 
   2448     /**
   2449      * Returns the common pool instance. This pool is statically
   2450      * constructed; its run state is unaffected by attempts to {@link
   2451      * #shutdown} or {@link #shutdownNow}. However this pool and any
   2452      * ongoing processing are automatically terminated upon program
   2453      * {@link System#exit}.  Any program that relies on asynchronous
   2454      * task processing to complete before program termination should
   2455      * invoke {@code commonPool().}{@link #awaitQuiescence awaitQuiescence},
   2456      * before exit.
   2457      *
   2458      * @return the common pool instance
   2459      * @since 1.8
   2460      * @hide
   2461      */
   2462     public static ForkJoinPool commonPool() {
   2463         // assert common != null : "static init error";
   2464         return common;
   2465     }
   2466 
   2467     // Execution methods
   2468 
   2469     /**
   2470      * Performs the given task, returning its result upon completion.
   2471      * If the computation encounters an unchecked Exception or Error,
   2472      * it is rethrown as the outcome of this invocation.  Rethrown
   2473      * exceptions behave in the same way as regular exceptions, but,
   2474      * when possible, contain stack traces (as displayed for example
   2475      * using {@code ex.printStackTrace()}) of both the current thread
   2476      * as well as the thread actually encountering the exception;
   2477      * minimally only the latter.
   2478      *
   2479      * @param task the task
   2480      * @return the task's result
   2481      * @throws NullPointerException if the task is null
   2482      * @throws RejectedExecutionException if the task cannot be
   2483      *         scheduled for execution
   2484      */
   2485     public <T> T invoke(ForkJoinTask<T> task) {
   2486         if (task == null)
   2487             throw new NullPointerException();
   2488         externalPush(task);
   2489         return task.join();
   2490     }
   2491 
   2492     /**
   2493      * Arranges for (asynchronous) execution of the given task.
   2494      *
   2495      * @param task the task
   2496      * @throws NullPointerException if the task is null
   2497      * @throws RejectedExecutionException if the task cannot be
   2498      *         scheduled for execution
   2499      */
   2500     public void execute(ForkJoinTask<?> task) {
   2501         if (task == null)
   2502             throw new NullPointerException();
   2503         externalPush(task);
   2504     }
   2505 
   2506     // AbstractExecutorService methods
   2507 
   2508     /**
   2509      * @throws NullPointerException if the task is null
   2510      * @throws RejectedExecutionException if the task cannot be
   2511      *         scheduled for execution
   2512      */
   2513     public void execute(Runnable task) {
   2514         if (task == null)
   2515             throw new NullPointerException();
   2516         ForkJoinTask<?> job;
   2517         if (task instanceof ForkJoinTask<?>) // avoid re-wrap
   2518             job = (ForkJoinTask<?>) task;
   2519         else
   2520             job = new ForkJoinTask.RunnableExecuteAction(task);
   2521         externalPush(job);
   2522     }
   2523 
   2524     /**
   2525      * Submits a ForkJoinTask for execution.
   2526      *
   2527      * @param task the task to submit
   2528      * @return the task
   2529      * @throws NullPointerException if the task is null
   2530      * @throws RejectedExecutionException if the task cannot be
   2531      *         scheduled for execution
   2532      */
   2533     public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task) {
   2534         if (task == null)
   2535             throw new NullPointerException();
   2536         externalPush(task);
   2537         return task;
   2538     }
   2539 
   2540     /**
   2541      * @throws NullPointerException if the task is null
   2542      * @throws RejectedExecutionException if the task cannot be
   2543      *         scheduled for execution
   2544      */
   2545     public <T> ForkJoinTask<T> submit(Callable<T> task) {
   2546         ForkJoinTask<T> job = new ForkJoinTask.AdaptedCallable<T>(task);
   2547         externalPush(job);
   2548         return job;
   2549     }
   2550 
   2551     /**
   2552      * @throws NullPointerException if the task is null
   2553      * @throws RejectedExecutionException if the task cannot be
   2554      *         scheduled for execution
   2555      */
   2556     public <T> ForkJoinTask<T> submit(Runnable task, T result) {
   2557         ForkJoinTask<T> job = new ForkJoinTask.AdaptedRunnable<T>(task, result);
   2558         externalPush(job);
   2559         return job;
   2560     }
   2561 
   2562     /**
   2563      * @throws NullPointerException if the task is null
   2564      * @throws RejectedExecutionException if the task cannot be
   2565      *         scheduled for execution
   2566      */
   2567     public ForkJoinTask<?> submit(Runnable task) {
   2568         if (task == null)
   2569             throw new NullPointerException();
   2570         ForkJoinTask<?> job;
   2571         if (task instanceof ForkJoinTask<?>) // avoid re-wrap
   2572             job = (ForkJoinTask<?>) task;
   2573         else
   2574             job = new ForkJoinTask.AdaptedRunnableAction(task);
   2575         externalPush(job);
   2576         return job;
   2577     }
   2578 
   2579     /**
   2580      * @throws NullPointerException       {@inheritDoc}
   2581      * @throws RejectedExecutionException {@inheritDoc}
   2582      */
   2583     public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) {
   2584         // In previous versions of this class, this method constructed
   2585         // a task to run ForkJoinTask.invokeAll, but now external
   2586         // invocation of multiple tasks is at least as efficient.
   2587         ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
   2588 
   2589         boolean done = false;
   2590         try {
   2591             for (Callable<T> t : tasks) {
   2592                 ForkJoinTask<T> f = new ForkJoinTask.AdaptedCallable<T>(t);
   2593                 futures.add(f);
   2594                 externalPush(f);
   2595             }
   2596             for (int i = 0, size = futures.size(); i < size; i++)
   2597                 ((ForkJoinTask<?>)futures.get(i)).quietlyJoin();
   2598             done = true;
   2599             return futures;
   2600         } finally {
   2601             if (!done)
   2602                 for (int i = 0, size = futures.size(); i < size; i++)
   2603                     futures.get(i).cancel(false);
   2604         }
   2605     }
   2606 
   2607     /**
   2608      * Returns the factory used for constructing new workers.
   2609      *
   2610      * @return the factory used for constructing new workers
   2611      */
   2612     public ForkJoinWorkerThreadFactory getFactory() {
   2613         return factory;
   2614     }
   2615 
   2616     /**
   2617      * Returns the handler for internal worker threads that terminate
   2618      * due to unrecoverable errors encountered while executing tasks.
   2619      *
   2620      * @return the handler, or {@code null} if none
   2621      */
   2622     public UncaughtExceptionHandler getUncaughtExceptionHandler() {
   2623         return ueh;
   2624     }
   2625 
   2626     /**
   2627      * Returns the targeted parallelism level of this pool.
   2628      *
   2629      * @return the targeted parallelism level of this pool
   2630      */
   2631     public int getParallelism() {
   2632         int par;
   2633         return ((par = parallelism) > 0) ? par : 1;
   2634     }
   2635 
   2636     /**
   2637      * Returns the targeted parallelism level of the common pool.
   2638      *
   2639      * @return the targeted parallelism level of the common pool
   2640      * @since 1.8
   2641      * @hide
   2642      */
   2643     public static int getCommonPoolParallelism() {
   2644         return commonParallelism;
   2645     }
   2646 
   2647     /**
   2648      * Returns the number of worker threads that have started but not
   2649      * yet terminated.  The result returned by this method may differ
   2650      * from {@link #getParallelism} when threads are created to
   2651      * maintain parallelism when others are cooperatively blocked.
   2652      *
   2653      * @return the number of worker threads
   2654      */
   2655     public int getPoolSize() {
   2656         return parallelism + (short)(ctl >>> TC_SHIFT);
   2657     }
   2658 
   2659     /**
   2660      * Returns {@code true} if this pool uses local first-in-first-out
   2661      * scheduling mode for forked tasks that are never joined.
   2662      *
   2663      * @return {@code true} if this pool uses async mode
   2664      */
   2665     public boolean getAsyncMode() {
   2666         return mode == FIFO_QUEUE;
   2667     }
   2668 
   2669     /**
   2670      * Returns an estimate of the number of worker threads that are
   2671      * not blocked waiting to join tasks or for other managed
   2672      * synchronization. This method may overestimate the
   2673      * number of running threads.
   2674      *
   2675      * @return the number of worker threads
   2676      */
   2677     public int getRunningThreadCount() {
   2678         int rc = 0;
   2679         WorkQueue[] ws; WorkQueue w;
   2680         if ((ws = workQueues) != null) {
   2681             for (int i = 1; i < ws.length; i += 2) {
   2682                 if ((w = ws[i]) != null && w.isApparentlyUnblocked())
   2683                     ++rc;
   2684             }
   2685         }
   2686         return rc;
   2687     }
   2688 
   2689     /**
   2690      * Returns an estimate of the number of threads that are currently
   2691      * stealing or executing tasks. This method may overestimate the
   2692      * number of active threads.
   2693      *
   2694      * @return the number of active threads
   2695      */
   2696     public int getActiveThreadCount() {
   2697         int r = parallelism + (int)(ctl >> AC_SHIFT);
   2698         return (r <= 0) ? 0 : r; // suppress momentarily negative values
   2699     }
   2700 
   2701     /**
   2702      * Returns {@code true} if all worker threads are currently idle.
   2703      * An idle worker is one that cannot obtain a task to execute
   2704      * because none are available to steal from other threads, and
   2705      * there are no pending submissions to the pool. This method is
   2706      * conservative; it might not return {@code true} immediately upon
   2707      * idleness of all threads, but will eventually become true if
   2708      * threads remain inactive.
   2709      *
   2710      * @return {@code true} if all threads are currently idle
   2711      */
   2712     public boolean isQuiescent() {
   2713         return parallelism + (int)(ctl >> AC_SHIFT) <= 0;
   2714     }
   2715 
   2716     /**
   2717      * Returns an estimate of the total number of tasks stolen from
   2718      * one thread's work queue by another. The reported value
   2719      * underestimates the actual total number of steals when the pool
   2720      * is not quiescent. This value may be useful for monitoring and
   2721      * tuning fork/join programs: in general, steal counts should be
   2722      * high enough to keep threads busy, but low enough to avoid
   2723      * overhead and contention across threads.
   2724      *
   2725      * @return the number of steals
   2726      */
   2727     public long getStealCount() {
   2728         long count = stealCount;
   2729         WorkQueue[] ws; WorkQueue w;
   2730         if ((ws = workQueues) != null) {
   2731             for (int i = 1; i < ws.length; i += 2) {
   2732                 if ((w = ws[i]) != null)
   2733                     count += w.nsteals;
   2734             }
   2735         }
   2736         return count;
   2737     }
   2738 
   2739     /**
   2740      * Returns an estimate of the total number of tasks currently held
   2741      * in queues by worker threads (but not including tasks submitted
   2742      * to the pool that have not begun executing). This value is only
   2743      * an approximation, obtained by iterating across all threads in
   2744      * the pool. This method may be useful for tuning task
   2745      * granularities.
   2746      *
   2747      * @return the number of queued tasks
   2748      */
   2749     public long getQueuedTaskCount() {
   2750         long count = 0;
   2751         WorkQueue[] ws; WorkQueue w;
   2752         if ((ws = workQueues) != null) {
   2753             for (int i = 1; i < ws.length; i += 2) {
   2754                 if ((w = ws[i]) != null)
   2755                     count += w.queueSize();
   2756             }
   2757         }
   2758         return count;
   2759     }
   2760 
   2761     /**
   2762      * Returns an estimate of the number of tasks submitted to this
   2763      * pool that have not yet begun executing.  This method may take
   2764      * time proportional to the number of submissions.
   2765      *
   2766      * @return the number of queued submissions
   2767      */
   2768     public int getQueuedSubmissionCount() {
   2769         int count = 0;
   2770         WorkQueue[] ws; WorkQueue w;
   2771         if ((ws = workQueues) != null) {
   2772             for (int i = 0; i < ws.length; i += 2) {
   2773                 if ((w = ws[i]) != null)
   2774                     count += w.queueSize();
   2775             }
   2776         }
   2777         return count;
   2778     }
   2779 
   2780     /**
   2781      * Returns {@code true} if there are any tasks submitted to this
   2782      * pool that have not yet begun executing.
   2783      *
   2784      * @return {@code true} if there are any queued submissions
   2785      */
   2786     public boolean hasQueuedSubmissions() {
   2787         WorkQueue[] ws; WorkQueue w;
   2788         if ((ws = workQueues) != null) {
   2789             for (int i = 0; i < ws.length; i += 2) {
   2790                 if ((w = ws[i]) != null && !w.isEmpty())
   2791                     return true;
   2792             }
   2793         }
   2794         return false;
   2795     }
   2796 
   2797     /**
   2798      * Removes and returns the next unexecuted submission if one is
   2799      * available.  This method may be useful in extensions to this
   2800      * class that re-assign work in systems with multiple pools.
   2801      *
   2802      * @return the next submission, or {@code null} if none
   2803      */
   2804     protected ForkJoinTask<?> pollSubmission() {
   2805         WorkQueue[] ws; WorkQueue w; ForkJoinTask<?> t;
   2806         if ((ws = workQueues) != null) {
   2807             for (int i = 0; i < ws.length; i += 2) {
   2808                 if ((w = ws[i]) != null && (t = w.poll()) != null)
   2809                     return t;
   2810             }
   2811         }
   2812         return null;
   2813     }
   2814 
   2815     /**
   2816      * Removes all available unexecuted submitted and forked tasks
   2817      * from scheduling queues and adds them to the given collection,
   2818      * without altering their execution status. These may include
   2819      * artificially generated or wrapped tasks. This method is
   2820      * designed to be invoked only when the pool is known to be
   2821      * quiescent. Invocations at other times may not remove all
   2822      * tasks. A failure encountered while attempting to add elements
   2823      * to collection {@code c} may result in elements being in
   2824      * neither, either or both collections when the associated
   2825      * exception is thrown.  The behavior of this operation is
   2826      * undefined if the specified collection is modified while the
   2827      * operation is in progress.
   2828      *
   2829      * @param c the collection to transfer elements into
   2830      * @return the number of elements transferred
   2831      */
   2832     protected int drainTasksTo(Collection<? super ForkJoinTask<?>> c) {
   2833         int count = 0;
   2834         WorkQueue[] ws; WorkQueue w; ForkJoinTask<?> t;
   2835         if ((ws = workQueues) != null) {
   2836             for (int i = 0; i < ws.length; ++i) {
   2837                 if ((w = ws[i]) != null) {
   2838                     while ((t = w.poll()) != null) {
   2839                         c.add(t);
   2840                         ++count;
   2841                     }
   2842                 }
   2843             }
   2844         }
   2845         return count;
   2846     }
   2847 
   2848     /**
   2849      * Returns a string identifying this pool, as well as its state,
   2850      * including indications of run state, parallelism level, and
   2851      * worker and task counts.
   2852      *
   2853      * @return a string identifying this pool, as well as its state
   2854      */
   2855     public String toString() {
   2856         // Use a single pass through workQueues to collect counts
   2857         long qt = 0L, qs = 0L; int rc = 0;
   2858         long st = stealCount;
   2859         long c = ctl;
   2860         WorkQueue[] ws; WorkQueue w;
   2861         if ((ws = workQueues) != null) {
   2862             for (int i = 0; i < ws.length; ++i) {
   2863                 if ((w = ws[i]) != null) {
   2864                     int size = w.queueSize();
   2865                     if ((i & 1) == 0)
   2866                         qs += size;
   2867                     else {
   2868                         qt += size;
   2869                         st += w.nsteals;
   2870                         if (w.isApparentlyUnblocked())
   2871                             ++rc;
   2872                     }
   2873                 }
   2874             }
   2875         }
   2876         int pc = parallelism;
   2877         int tc = pc + (short)(c >>> TC_SHIFT);
   2878         int ac = pc + (int)(c >> AC_SHIFT);
   2879         if (ac < 0) // ignore transient negative
   2880             ac = 0;
   2881         String level;
   2882         if ((c & STOP_BIT) != 0)
   2883             level = (tc == 0) ? "Terminated" : "Terminating";
   2884         else
   2885             level = plock < 0 ? "Shutting down" : "Running";
   2886         return super.toString() +
   2887             "[" + level +
   2888             ", parallelism = " + pc +
   2889             ", size = " + tc +
   2890             ", active = " + ac +
   2891             ", running = " + rc +
   2892             ", steals = " + st +
   2893             ", tasks = " + qt +
   2894             ", submissions = " + qs +
   2895             "]";
   2896     }
   2897 
   2898     /**
   2899      * Possibly initiates an orderly shutdown in which previously
   2900      * submitted tasks are executed, but no new tasks will be
   2901      * accepted. Invocation has no effect on execution state if this
   2902      * is the {@code commonPool()}, and no additional effect if
   2903      * already shut down.  Tasks that are in the process of being
   2904      * submitted concurrently during the course of this method may or
   2905      * may not be rejected.
   2906      */
   2907     public void shutdown() {
   2908         checkPermission();
   2909         tryTerminate(false, true);
   2910     }
   2911 
   2912     /**
   2913      * Possibly attempts to cancel and/or stop all tasks, and reject
   2914      * all subsequently submitted tasks.  Invocation has no effect on
   2915      * execution state if this is the {@code commonPool()}, and no
   2916      * additional effect if already shut down. Otherwise, tasks that
   2917      * are in the process of being submitted or executed concurrently
   2918      * during the course of this method may or may not be
   2919      * rejected. This method cancels both existing and unexecuted
   2920      * tasks, in order to permit termination in the presence of task
   2921      * dependencies. So the method always returns an empty list
   2922      * (unlike the case for some other Executors).
   2923      *
   2924      * @return an empty list
   2925      */
   2926     public List<Runnable> shutdownNow() {
   2927         checkPermission();
   2928         tryTerminate(true, true);
   2929         return Collections.emptyList();
   2930     }
   2931 
   2932     /**
   2933      * Returns {@code true} if all tasks have completed following shut down.
   2934      *
   2935      * @return {@code true} if all tasks have completed following shut down
   2936      */
   2937     public boolean isTerminated() {
   2938         long c = ctl;
   2939         return ((c & STOP_BIT) != 0L &&
   2940                 (short)(c >>> TC_SHIFT) + parallelism <= 0);
   2941     }
   2942 
   2943     /**
   2944      * Returns {@code true} if the process of termination has
   2945      * commenced but not yet completed.  This method may be useful for
   2946      * debugging. A return of {@code true} reported a sufficient
   2947      * period after shutdown may indicate that submitted tasks have
   2948      * ignored or suppressed interruption, or are waiting for I/O,
   2949      * causing this executor not to properly terminate. (See the
   2950      * advisory notes for class {@link ForkJoinTask} stating that
   2951      * tasks should not normally entail blocking operations.  But if
   2952      * they do, they must abort them on interrupt.)
   2953      *
   2954      * @return {@code true} if terminating but not yet terminated
   2955      */
   2956     public boolean isTerminating() {
   2957         long c = ctl;
   2958         return ((c & STOP_BIT) != 0L &&
   2959                 (short)(c >>> TC_SHIFT) + parallelism > 0);
   2960     }
   2961 
   2962     /**
   2963      * Returns {@code true} if this pool has been shut down.
   2964      *
   2965      * @return {@code true} if this pool has been shut down
   2966      */
   2967     public boolean isShutdown() {
   2968         return plock < 0;
   2969     }
   2970 
   2971     /**
   2972      * Blocks until all tasks have completed execution after a
   2973      * shutdown request, or the timeout occurs, or the current thread
   2974      * is interrupted, whichever happens first. Because the {@code
   2975      * commonPool()} never terminates until program shutdown, when
   2976      * applied to the common pool, this method is equivalent to {@link
   2977      * #awaitQuiescence(long, TimeUnit)} but always returns {@code false}.
   2978      *
   2979      * @param timeout the maximum time to wait
   2980      * @param unit the time unit of the timeout argument
   2981      * @return {@code true} if this executor terminated and
   2982      *         {@code false} if the timeout elapsed before termination
   2983      * @throws InterruptedException if interrupted while waiting
   2984      */
   2985     public boolean awaitTermination(long timeout, TimeUnit unit)
   2986         throws InterruptedException {
   2987         if (Thread.interrupted())
   2988             throw new InterruptedException();
   2989         if (this == common) {
   2990             awaitQuiescence(timeout, unit);
   2991             return false;
   2992         }
   2993         long nanos = unit.toNanos(timeout);
   2994         if (isTerminated())
   2995             return true;
   2996         if (nanos <= 0L)
   2997             return false;
   2998         long deadline = System.nanoTime() + nanos;
   2999         synchronized (this) {
   3000             for (;;) {
   3001                 if (isTerminated())
   3002                     return true;
   3003                 if (nanos <= 0L)
   3004                     return false;
   3005                 long millis = TimeUnit.NANOSECONDS.toMillis(nanos);
   3006                 wait(millis > 0L ? millis : 1L);
   3007                 nanos = deadline - System.nanoTime();
   3008             }
   3009         }
   3010     }
   3011 
   3012     /**
   3013      * If called by a ForkJoinTask operating in this pool, equivalent
   3014      * in effect to {@link ForkJoinTask#helpQuiesce}. Otherwise,
   3015      * waits and/or attempts to assist performing tasks until this
   3016      * pool {@link #isQuiescent} or the indicated timeout elapses.
   3017      *
   3018      * @param timeout the maximum time to wait
   3019      * @param unit the time unit of the timeout argument
   3020      * @return {@code true} if quiescent; {@code false} if the
   3021      * timeout elapsed.
   3022      */
   3023     public boolean awaitQuiescence(long timeout, TimeUnit unit) {
   3024         long nanos = unit.toNanos(timeout);
   3025         ForkJoinWorkerThread wt;
   3026         Thread thread = Thread.currentThread();
   3027         if ((thread instanceof ForkJoinWorkerThread) &&
   3028             (wt = (ForkJoinWorkerThread)thread).pool == this) {
   3029             helpQuiescePool(wt.workQueue);
   3030             return true;
   3031         }
   3032         long startTime = System.nanoTime();
   3033         WorkQueue[] ws;
   3034         int r = 0, m;
   3035         boolean found = true;
   3036         while (!isQuiescent() && (ws = workQueues) != null &&
   3037                (m = ws.length - 1) >= 0) {
   3038             if (!found) {
   3039                 if ((System.nanoTime() - startTime) > nanos)
   3040                     return false;
   3041                 Thread.yield(); // cannot block
   3042             }
   3043             found = false;
   3044             for (int j = (m + 1) << 2; j >= 0; --j) {
   3045                 ForkJoinTask<?> t; WorkQueue q; int b;
   3046                 if ((q = ws[r++ & m]) != null && (b = q.base) - q.top < 0) {
   3047                     found = true;
   3048                     if ((t = q.pollAt(b)) != null)
   3049                         t.doExec();
   3050                     break;
   3051                 }
   3052             }
   3053         }
   3054         return true;
   3055     }
   3056 
   3057     /**
   3058      * Waits and/or attempts to assist performing tasks indefinitely
   3059      * until the {@code commonPool()} {@link #isQuiescent}.
   3060      */
   3061     static void quiesceCommonPool() {
   3062         common.awaitQuiescence(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
   3063     }
   3064 
   3065     /**
   3066      * Interface for extending managed parallelism for tasks running
   3067      * in {@link ForkJoinPool}s.
   3068      *
   3069      * <p>A {@code ManagedBlocker} provides two methods.  Method
   3070      * {@code isReleasable} must return {@code true} if blocking is
   3071      * not necessary. Method {@code block} blocks the current thread
   3072      * if necessary (perhaps internally invoking {@code isReleasable}
   3073      * before actually blocking). These actions are performed by any
   3074      * thread invoking {@link ForkJoinPool#managedBlock(ManagedBlocker)}.
   3075      * The unusual methods in this API accommodate synchronizers that
   3076      * may, but don't usually, block for long periods. Similarly, they
   3077      * allow more efficient internal handling of cases in which
   3078      * additional workers may be, but usually are not, needed to
   3079      * ensure sufficient parallelism.  Toward this end,
   3080      * implementations of method {@code isReleasable} must be amenable
   3081      * to repeated invocation.
   3082      *
   3083      * <p>For example, here is a ManagedBlocker based on a
   3084      * ReentrantLock:
   3085      *  <pre> {@code
   3086      * class ManagedLocker implements ManagedBlocker {
   3087      *   final ReentrantLock lock;
   3088      *   boolean hasLock = false;
   3089      *   ManagedLocker(ReentrantLock lock) { this.lock = lock; }
   3090      *   public boolean block() {
   3091      *     if (!hasLock)
   3092      *       lock.lock();
   3093      *     return true;
   3094      *   }
   3095      *   public boolean isReleasable() {
   3096      *     return hasLock || (hasLock = lock.tryLock());
   3097      *   }
   3098      * }}</pre>
   3099      *
   3100      * <p>Here is a class that possibly blocks waiting for an
   3101      * item on a given queue:
   3102      *  <pre> {@code
   3103      * class QueueTaker<E> implements ManagedBlocker {
   3104      *   final BlockingQueue<E> queue;
   3105      *   volatile E item = null;
   3106      *   QueueTaker(BlockingQueue<E> q) { this.queue = q; }
   3107      *   public boolean block() throws InterruptedException {
   3108      *     if (item == null)
   3109      *       item = queue.take();
   3110      *     return true;
   3111      *   }
   3112      *   public boolean isReleasable() {
   3113      *     return item != null || (item = queue.poll()) != null;
   3114      *   }
   3115      *   public E getItem() { // call after pool.managedBlock completes
   3116      *     return item;
   3117      *   }
   3118      * }}</pre>
   3119      */
    public static interface ManagedBlocker {
        /**
         * Possibly blocks the current thread, for example waiting for
         * a lock or condition.
         *
         * @return {@code true} if no additional blocking is necessary
         * (i.e., if isReleasable would return true)
         * @throws InterruptedException if interrupted while waiting
         * (the method is not required to do so, but is allowed to)
         */
        boolean block() throws InterruptedException;

        /**
         * Returns {@code true} if blocking is unnecessary.
         *
         * <p>{@link ForkJoinPool#managedBlock} polls this method
         * repeatedly, both before compensating and between calls to
         * {@code block}, so implementations must remain correct and
         * cheap under repeated invocation.
         *
         * @return {@code true} if blocking is unnecessary
         */
        boolean isReleasable();
    }
   3138 
    /**
     * Blocks in accord with the given blocker.  If the current thread
     * is a {@link ForkJoinWorkerThread}, this method possibly
     * arranges for a spare thread to be activated if necessary to
     * ensure sufficient parallelism while the current thread is blocked.
     *
     * <p>If the caller is not a {@link ForkJoinTask}, this method is
     * behaviorally equivalent to
     *  <pre> {@code
     * while (!blocker.isReleasable())
     *   if (blocker.block())
     *     return;
     * }</pre>
     *
     * If the caller is a {@code ForkJoinTask}, then the pool may
     * first be expanded to ensure parallelism, and later adjusted.
     *
     * @param blocker the blocker
     * @throws InterruptedException if blocker.block did so
     */
    public static void managedBlock(ManagedBlocker blocker)
        throws InterruptedException {
        Thread t = Thread.currentThread();
        if (t instanceof ForkJoinWorkerThread) {
            // Worker thread: before actually blocking, compensate so
            // pool parallelism is preserved (tryCompensate may arrange
            // a spare thread, per the contract documented above).
            ForkJoinPool p = ((ForkJoinWorkerThread)t).pool;
            while (!blocker.isReleasable()) {
                // Re-read ctl each attempt; retry until compensation
                // succeeds or the blocker becomes releasable.
                if (p.tryCompensate(p.ctl)) {
                    try {
                        // Block until done: stop when either the blocker
                        // is releasable or block() reports completion.
                        do {} while (!blocker.isReleasable() &&
                                     !blocker.block());
                    } finally {
                        // Restore the active count even if block() threw
                        // (e.g. InterruptedException), undoing the
                        // compensation above.
                        p.incrementActiveCount();
                    }
                    break;
                }
            }
        }
        else {
            // Non-worker caller: plain wait loop, no compensation —
            // matches the equivalent-code snippet in the javadoc.
            do {} while (!blocker.isReleasable() &&
                         !blocker.block());
        }
    }
   3181 
   3182     // AbstractExecutorService overrides.  These rely on undocumented
   3183     // fact that ForkJoinTask.adapt returns ForkJoinTasks that also
   3184     // implement RunnableFuture.
   3185 
   3186     protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
   3187         return new ForkJoinTask.AdaptedRunnable<T>(runnable, value);
   3188     }
   3189 
   3190     protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
   3191         return new ForkJoinTask.AdaptedCallable<T>(callable);
   3192     }
   3193 
    // Unsafe mechanics.  Field offsets are resolved reflectively in the
    // static initializer below and used for CAS / ordered access.
    private static final sun.misc.Unsafe U;
    private static final long CTL;         // offset of ForkJoinPool.ctl
    private static final long PARKBLOCKER; // offset of Thread.parkBlocker
    private static final int ABASE;        // base offset of ForkJoinTask[] element storage
    private static final int ASHIFT;       // log2 of ForkJoinTask[] element scale
    private static final long STEALCOUNT;  // offset of ForkJoinPool.stealCount
    private static final long PLOCK;       // offset of ForkJoinPool.plock
    private static final long INDEXSEED;   // offset of ForkJoinPool.indexSeed
    private static final long QBASE;       // offset of WorkQueue.base
    private static final long QLOCK;       // offset of WorkQueue.qlock
   3205 
    static {
        // initialize field offsets for CAS etc
        try {
            U = sun.misc.Unsafe.getUnsafe();
            Class<?> k = ForkJoinPool.class;
            CTL = U.objectFieldOffset
                (k.getDeclaredField("ctl"));
            STEALCOUNT = U.objectFieldOffset
                (k.getDeclaredField("stealCount"));
            PLOCK = U.objectFieldOffset
                (k.getDeclaredField("plock"));
            INDEXSEED = U.objectFieldOffset
                (k.getDeclaredField("indexSeed"));
            Class<?> tk = Thread.class;
            PARKBLOCKER = U.objectFieldOffset
                (tk.getDeclaredField("parkBlocker"));
            Class<?> wk = WorkQueue.class;
            QBASE = U.objectFieldOffset
                (wk.getDeclaredField("base"));
            QLOCK = U.objectFieldOffset
                (wk.getDeclaredField("qlock"));
            // Array addressing: element i of a ForkJoinTask[] lives at
            // ABASE + ((long)i << ASHIFT); this requires the element
            // scale to be a power of two, checked below.
            Class<?> ak = ForkJoinTask[].class;
            ABASE = U.arrayBaseOffset(ak);
            int scale = U.arrayIndexScale(ak);
            if ((scale & (scale - 1)) != 0)
                throw new Error("data type scale not a power of two");
            ASHIFT = 31 - Integer.numberOfLeadingZeros(scale);
        } catch (Exception e) {
            // Reflection failure here is unrecoverable: abort class
            // initialization rather than run with bad offsets.
            throw new Error(e);
        }

        submitters = new ThreadLocal<Submitter>();
        defaultForkJoinWorkerThreadFactory =
            new DefaultForkJoinWorkerThreadFactory();
        modifyThreadPermission = new RuntimePermission("modifyThread");

        // Build the common pool inside doPrivileged so construction is
        // not limited by the access-control context of whichever class
        // happens to trigger initialization.
        common = java.security.AccessController.doPrivileged
            (new java.security.PrivilegedAction<ForkJoinPool>() {
                public ForkJoinPool run() { return makeCommonPool(); }});
        int par = common.parallelism; // report 1 even if threads disabled
        commonParallelism = par > 0 ? par : 1;
    }
   3248 
   3249     /**
   3250      * Creates and returns the common pool, respecting user settings
   3251      * specified via system properties.
   3252      */
   3253     private static ForkJoinPool makeCommonPool() {
   3254         int parallelism = -1;
   3255         ForkJoinWorkerThreadFactory factory
   3256             = defaultForkJoinWorkerThreadFactory;
   3257         UncaughtExceptionHandler handler = null;
   3258         try {  // ignore exceptions in accessing/parsing properties
   3259             String pp = System.getProperty
   3260                 ("java.util.concurrent.ForkJoinPool.common.parallelism");
   3261             String fp = System.getProperty
   3262                 ("java.util.concurrent.ForkJoinPool.common.threadFactory");
   3263             String hp = System.getProperty
   3264                 ("java.util.concurrent.ForkJoinPool.common.exceptionHandler");
   3265             if (pp != null)
   3266                 parallelism = Integer.parseInt(pp);
   3267             if (fp != null)
   3268                 factory = ((ForkJoinWorkerThreadFactory)ClassLoader.
   3269                            getSystemClassLoader().loadClass(fp).newInstance());
   3270             if (hp != null)
   3271                 handler = ((UncaughtExceptionHandler)ClassLoader.
   3272                            getSystemClassLoader().loadClass(hp).newInstance());
   3273         } catch (Exception ignore) {
   3274         }
   3275 
   3276         if (parallelism < 0 && // default 1 less than #cores
   3277             (parallelism = Runtime.getRuntime().availableProcessors() - 1) < 0)
   3278             parallelism = 0;
   3279         if (parallelism > MAX_CAP)
   3280             parallelism = MAX_CAP;
   3281         return new ForkJoinPool(parallelism, factory, handler, LIFO_QUEUE,
   3282                                 "ForkJoinPool.commonPool-worker-");
   3283     }
   3284 
   3285 }
   3286