Home | History | Annotate | Download | only in concurrent
      1 /*
      2  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
      3  *
      4  * This code is free software; you can redistribute it and/or modify it
      5  * under the terms of the GNU General Public License version 2 only, as
      6  * published by the Free Software Foundation.  Oracle designates this
      7  * particular file as subject to the "Classpath" exception as provided
      8  * by Oracle in the LICENSE file that accompanied this code.
      9  *
     10  * This code is distributed in the hope that it will be useful, but WITHOUT
     11  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
     12  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
     13  * version 2 for more details (a copy is included in the LICENSE file that
     14  * accompanied this code).
     15  *
     16  * You should have received a copy of the GNU General Public License version
     17  * 2 along with this work; if not, write to the Free Software Foundation,
     18  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
     19  *
     20  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
     21  * or visit www.oracle.com if you need additional information or have any
     22  * questions.
     23  */
     24 
     25 /*
     26  * This file is available under and governed by the GNU General Public
     27  * License version 2 only, as published by the Free Software Foundation.
     28  * However, the following notice accompanied the original version of this
     29  * file:
     30  *
     31  * Written by Doug Lea with assistance from members of JCP JSR-166
     32  * Expert Group and released to the public domain, as explained at
     33  * http://creativecommons.org/publicdomain/zero/1.0/
     34  */
     35 
     36 package java.util.concurrent;
     37 
     38 import java.util.AbstractQueue;
     39 import java.util.Arrays;
     40 import java.util.Collection;
     41 import java.util.Iterator;
     42 import java.util.NoSuchElementException;
     43 import java.util.Queue;
     44 import java.util.Spliterator;
     45 import java.util.Spliterators;
     46 import java.util.concurrent.locks.LockSupport;
     47 import java.util.function.Consumer;
     48 
     49 // BEGIN android-note
     50 // removed link to collections framework docs
     51 // END android-note
     52 
     53 /**
     54  * An unbounded {@link TransferQueue} based on linked nodes.
     55  * This queue orders elements FIFO (first-in-first-out) with respect
     56  * to any given producer.  The <em>head</em> of the queue is that
     57  * element that has been on the queue the longest time for some
     58  * producer.  The <em>tail</em> of the queue is that element that has
     59  * been on the queue the shortest time for some producer.
     60  *
     61  * <p>Beware that, unlike in most collections, the {@code size} method
     62  * is <em>NOT</em> a constant-time operation. Because of the
     63  * asynchronous nature of these queues, determining the current number
     64  * of elements requires a traversal of the elements, and so may report
     65  * inaccurate results if this collection is modified during traversal.
     66  * Additionally, the bulk operations {@code addAll},
     67  * {@code removeAll}, {@code retainAll}, {@code containsAll},
     68  * {@code equals}, and {@code toArray} are <em>not</em> guaranteed
     69  * to be performed atomically. For example, an iterator operating
     70  * concurrently with an {@code addAll} operation might view only some
     71  * of the added elements.
     72  *
     73  * <p>This class and its iterator implement all of the
     74  * <em>optional</em> methods of the {@link Collection} and {@link
     75  * Iterator} interfaces.
     76  *
     77  * <p>Memory consistency effects: As with other concurrent
     78  * collections, actions in a thread prior to placing an object into a
     79  * {@code LinkedTransferQueue}
     80  * <a href="package-summary.html#MemoryVisibility"><i>happen-before</i></a>
     81  * actions subsequent to the access or removal of that element from
     82  * the {@code LinkedTransferQueue} in another thread.
     83  *
     84  * @since 1.7
     85  * @author Doug Lea
     86  * @param <E> the type of elements held in this queue
     87  */
     88 public class LinkedTransferQueue<E> extends AbstractQueue<E>
     89     implements TransferQueue<E>, java.io.Serializable {
     90     private static final long serialVersionUID = -3223113410248163686L;
     91 
     92     /*
     93      * *** Overview of Dual Queues with Slack ***
     94      *
     95      * Dual Queues, introduced by Scherer and Scott
     96      * (http://www.cs.rice.edu/~wns1/papers/2004-DISC-DDS.pdf) are
     97      * (linked) queues in which nodes may represent either data or
     98      * requests.  When a thread tries to enqueue a data node, but
     99      * encounters a request node, it instead "matches" and removes it;
    100      * and vice versa for enqueuing requests. Blocking Dual Queues
    101      * arrange that threads enqueuing unmatched requests block until
    102      * other threads provide the match. Dual Synchronous Queues (see
    103      * Scherer, Lea, & Scott
    104      * http://www.cs.rochester.edu/u/scott/papers/2009_Scherer_CACM_SSQ.pdf)
    105      * additionally arrange that threads enqueuing unmatched data also
    106      * block.  Dual Transfer Queues support all of these modes, as
    107      * dictated by callers.
    108      *
    109      * A FIFO dual queue may be implemented using a variation of the
    110      * Michael & Scott (M&S) lock-free queue algorithm
    111      * (http://www.cs.rochester.edu/~scott/papers/1996_PODC_queues.pdf).
    112      * It maintains two pointer fields, "head", pointing to a
    113      * (matched) node that in turn points to the first actual
    114      * (unmatched) queue node (or null if empty); and "tail" that
    115      * points to the last node on the queue (or again null if
    116      * empty). For example, here is a possible queue with four data
    117      * elements:
    118      *
    119      *  head                tail
    120      *    |                   |
    121      *    v                   v
    122      *    M -> U -> U -> U -> U
    123      *
    124      * The M&S queue algorithm is known to be prone to scalability and
    125      * overhead limitations when maintaining (via CAS) these head and
    126      * tail pointers. This has led to the development of
    127      * contention-reducing variants such as elimination arrays (see
    128      * Moir et al http://portal.acm.org/citation.cfm?id=1074013) and
    129      * optimistic back pointers (see Ladan-Mozes & Shavit
    130      * http://people.csail.mit.edu/edya/publications/OptimisticFIFOQueue-journal.pdf).
    131      * However, the nature of dual queues enables a simpler tactic for
    132      * improving M&S-style implementations when dual-ness is needed.
    133      *
    134      * In a dual queue, each node must atomically maintain its match
    135      * status. While there are other possible variants, we implement
    136      * this here as: for a data-mode node, matching entails CASing an
    137      * "item" field from a non-null data value to null upon match, and
    138      * vice-versa for request nodes, CASing from null to a data
    139      * value. (Note that the linearization properties of this style of
    140      * queue are easy to verify -- elements are made available by
    141      * linking, and unavailable by matching.) Compared to plain M&S
    142      * queues, this property of dual queues requires one additional
    143      * successful atomic operation per enq/deq pair. But it also
    144      * enables lower cost variants of queue maintenance mechanics. (A
    145      * variation of this idea applies even for non-dual queues that
    146      * support deletion of interior elements, such as
    147      * j.u.c.ConcurrentLinkedQueue.)
    148      *
    149      * Once a node is matched, its match status can never again
    150      * change.  We may thus arrange that the linked list of them
    151      * contain a prefix of zero or more matched nodes, followed by a
    152      * suffix of zero or more unmatched nodes. (Note that we allow
    153      * both the prefix and suffix to be zero length, which in turn
    154      * means that we do not use a dummy header.)  If we were not
    155      * concerned with either time or space efficiency, we could
    156      * correctly perform enqueue and dequeue operations by traversing
    157      * from a pointer to the initial node; CASing the item of the
    158      * first unmatched node on match and CASing the next field of the
    159      * trailing node on appends. (Plus some special-casing when
    160      * initially empty).  While this would be a terrible idea in
    161      * itself, it does have the benefit of not requiring ANY atomic
    162      * updates on head/tail fields.
    163      *
    164      * We introduce here an approach that lies between the extremes of
    165      * never versus always updating queue (head and tail) pointers.
    166      * This offers a tradeoff between sometimes requiring extra
    167      * traversal steps to locate the first and/or last unmatched
    168      * nodes, versus the reduced overhead and contention of fewer
    169      * updates to queue pointers. For example, a possible snapshot of
    170      * a queue is:
    171      *
    172      *  head           tail
    173      *    |              |
    174      *    v              v
    175      *    M -> M -> U -> U -> U -> U
    176      *
    177      * The best value for this "slack" (the targeted maximum distance
    178      * between the value of "head" and the first unmatched node, and
    179      * similarly for "tail") is an empirical matter. We have found
    180      * that using very small constants in the range of 1-3 works best
    181      * over a range of platforms. Larger values introduce increasing
    182      * costs of cache misses and risks of long traversal chains, while
    183      * smaller values increase CAS contention and overhead.
    184      *
    185      * Dual queues with slack differ from plain M&S dual queues by
    186      * virtue of only sometimes updating head or tail pointers when
    187      * matching, appending, or even traversing nodes; in order to
    188      * maintain a targeted slack.  The idea of "sometimes" may be
    189      * operationalized in several ways. The simplest is to use a
    190      * per-operation counter incremented on each traversal step, and
    191      * to try (via CAS) to update the associated queue pointer
    192      * whenever the count exceeds a threshold. Another, that requires
    193      * more overhead, is to use random number generators to update
    194      * with a given probability per traversal step.
    195      *
    196      * In any strategy along these lines, because CASes updating
    197      * fields may fail, the actual slack may exceed targeted
    198      * slack. However, they may be retried at any time to maintain
    199      * targets.  Even when using very small slack values, this
    200      * approach works well for dual queues because it allows all
    201      * operations up to the point of matching or appending an item
    202      * (hence potentially allowing progress by another thread) to be
    203      * read-only, thus not introducing any further contention. As
    204      * described below, we implement this by performing slack
    205      * maintenance retries only after these points.
    206      *
    207      * As an accompaniment to such techniques, traversal overhead can
    208      * be further reduced without increasing contention of head
    209      * pointer updates: Threads may sometimes shortcut the "next" link
    210      * path from the current "head" node to be closer to the currently
    211      * known first unmatched node, and similarly for tail. Again, this
    212      * may be triggered by using thresholds or randomization.
    213      *
    214      * These ideas must be further extended to avoid unbounded amounts
    215      * of costly-to-reclaim garbage caused by the sequential "next"
    216      * links of nodes starting at old forgotten head nodes: As first
    217      * described in detail by Boehm
    218      * (http://portal.acm.org/citation.cfm?doid=503272.503282), if a GC
    219      * delays noticing that any arbitrarily old node has become
    220      * garbage, all newer dead nodes will also be unreclaimed.
    221      * (Similar issues arise in non-GC environments.)  To cope with
    222      * this in our implementation, upon CASing to advance the head
    223      * pointer, we set the "next" link of the previous head to point
    224      * only to itself; thus limiting the length of connected dead lists.
    225      * (We also take similar care to wipe out possibly garbage
    226      * retaining values held in other Node fields.)  However, doing so
    227      * adds some further complexity to traversal: If any "next"
    228      * pointer links to itself, it indicates that the current thread
    229      * has lagged behind a head-update, and so the traversal must
    230      * continue from the "head".  Traversals trying to find the
    231      * current tail starting from "tail" may also encounter
    232      * self-links, in which case they also continue at "head".
    233      *
    234      * It is tempting in a slack-based scheme to not even use CAS for
    235      * updates (similarly to Ladan-Mozes & Shavit). However, this
    236      * cannot be done for head updates under the above link-forgetting
    237      * mechanics because an update may leave head at a detached node.
    238      * And while direct writes are possible for tail updates, they
    239      * increase the risk of long retraversals, and hence long garbage
    240      * chains, which can be much more costly than is worthwhile
    241      * considering that the cost difference of performing a CAS vs
    242      * write is smaller when they are not triggered on each operation
    243      * (especially considering that writes and CASes equally require
    244      * additional GC bookkeeping ("write barriers") that are sometimes
    245      * more costly than the writes themselves because of contention).
    246      *
    247      * *** Overview of implementation ***
    248      *
    249      * We use a threshold-based approach to updates, with a slack
    250      * threshold of two -- that is, we update head/tail when the
    251      * current pointer appears to be two or more steps away from the
    252      * first/last node. The slack value is hard-wired: a path greater
    253      * than one is naturally implemented by checking equality of
    254      * traversal pointers except when the list has only one element,
    255      * in which case we keep slack threshold at one. Avoiding tracking
    256      * explicit counts across method calls slightly simplifies an
    257      * already-messy implementation. Using randomization would
    258      * probably work better if there were a low-quality dirt-cheap
    259      * per-thread one available, but even ThreadLocalRandom is too
    260      * heavy for these purposes.
    261      *
    262      * With such a small slack threshold value, it is not worthwhile
    263      * to augment this with path short-circuiting (i.e., unsplicing
    264      * interior nodes) except in the case of cancellation/removal (see
    265      * below).
    266      *
    267      * We allow both the head and tail fields to be null before any
    268      * nodes are enqueued; initializing upon first append.  This
    269      * simplifies some other logic, as well as providing more
    270      * efficient explicit control paths instead of letting JVMs insert
    271      * implicit NullPointerExceptions when they are null.  While not
    272      * currently fully implemented, we also leave open the possibility
    273      * of re-nulling these fields when empty (which is complicated to
    274      * arrange, for little benefit.)
    275      *
    276      * All enqueue/dequeue operations are handled by the single method
    277      * "xfer" with parameters indicating whether to act as some form
    278      * of offer, put, poll, take, or transfer (each possibly with
    279      * timeout). The relative complexity of using one monolithic
    280      * method is outweighed by the code bulk and maintenance
    281      * problems of using separate methods for each case.
    282      *
    283      * Operation consists of up to three phases. The first is
    284      * implemented within method xfer, the second in tryAppend, and
    285      * the third in method awaitMatch.
    286      *
    287      * 1. Try to match an existing node
    288      *
    289      *    Starting at head, skip already-matched nodes until finding
    290      *    an unmatched node of opposite mode, if one exists, in which
    291      *    case matching it and returning, also if necessary updating
    292      *    head to one past the matched node (or the node itself if the
    293      *    list has no other unmatched nodes). If the CAS misses, then
    294      *    a loop retries advancing head by two steps until either
    295      *    success or the slack is at most two. By requiring that each
    296      *    attempt advances head by two (if applicable), we ensure that
    297      *    the slack does not grow without bound. Traversals also check
    298      *    if the initial head is now off-list, in which case they
    299      *    start at the new head.
    300      *
    301      *    If no candidates are found and the call was untimed
    302      *    poll/offer, (argument "how" is NOW) return.
    303      *
    304      * 2. Try to append a new node (method tryAppend)
    305      *
    306      *    Starting at current tail pointer, find the actual last node
    307      *    and try to append a new node (or if head was null, establish
    308      *    the first node). Nodes can be appended only if their
    309      *    predecessors are either already matched or are of the same
    310      *    mode. If we detect otherwise, then a new node with opposite
    311      *    mode must have been appended during traversal, so we must
    312      *    restart at phase 1. The traversal and update steps are
    313      *    otherwise similar to phase 1: Retrying upon CAS misses and
    314      *    checking for staleness.  In particular, if a self-link is
    315      *    encountered, then we can safely jump to a node on the list
    316      *    by continuing the traversal at current head.
    317      *
    318      *    On successful append, if the call was ASYNC, return.
    319      *
    320      * 3. Await match or cancellation (method awaitMatch)
    321      *
    322      *    Wait for another thread to match node; instead cancelling if
    323      *    the current thread was interrupted or the wait timed out. On
    324      *    multiprocessors, we use front-of-queue spinning: If a node
    325      *    appears to be the first unmatched node in the queue, it
    326      *    spins a bit before blocking. In either case, before blocking
    327      *    it tries to unsplice any nodes between the current "head"
    328      *    and the first unmatched node.
    329      *
    330      *    Front-of-queue spinning vastly improves performance of
    331      *    heavily contended queues. And so long as it is relatively
    332      *    brief and "quiet", spinning does not much impact performance
    333      *    of less-contended queues.  During spins threads check their
    334      *    interrupt status and generate a thread-local random number
    335      *    to decide to occasionally perform a Thread.yield. While
    336      *    yield has underdefined specs, we assume that it might help,
    337      *    and will not hurt, in limiting impact of spinning on busy
    338      *    systems.  We also use smaller (1/2) spins for nodes that are
    339      *    not known to be front but whose predecessors have not
    340      *    blocked -- these "chained" spins avoid artifacts of
    341      *    front-of-queue rules which otherwise lead to alternating
    342      *    nodes spinning vs blocking. Further, front threads that
    343      *    represent phase changes (from data to request node or vice
    344      *    versa) compared to their predecessors receive additional
    345      *    chained spins, reflecting longer paths typically required to
    346      *    unblock threads during phase changes.
    347      *
    348      *
    349      * ** Unlinking removed interior nodes **
    350      *
    351      * In addition to minimizing garbage retention via self-linking
    352      * described above, we also unlink removed interior nodes. These
    353      * may arise due to timed out or interrupted waits, or calls to
    354      * remove(x) or Iterator.remove.  Normally, given a node that was
    355      * at one time known to be the predecessor of some node s that is
    356      * to be removed, we can unsplice s by CASing the next field of
    357      * its predecessor if it still points to s (otherwise s must
    358      * already have been removed or is now offlist). But there are two
    359      * situations in which we cannot guarantee to make node s
    360      * unreachable in this way: (1) If s is the trailing node of list
    361      * (i.e., with null next), then it is pinned as the target node
    362      * for appends, so can only be removed later after other nodes are
    363      * appended. (2) We cannot necessarily unlink s given a
    364      * predecessor node that is matched (including the case of being
    365      * cancelled): the predecessor may already be unspliced, in which
    366      * case some previous reachable node may still point to s.
    367      * (For further explanation see Herlihy & Shavit "The Art of
    368      * Multiprocessor Programming" chapter 9).  Although, in both
    369      * cases, we can rule out the need for further action if either s
    370      * or its predecessor are (or can be made to be) at, or fall off
    371      * from, the head of list.
    372      *
    373      * Without taking these into account, it would be possible for an
    374      * unbounded number of supposedly removed nodes to remain
    375      * reachable.  Situations leading to such buildup are uncommon but
    376      * can occur in practice; for example when a series of short timed
    377      * calls to poll repeatedly time out but never otherwise fall off
    378      * the list because of an untimed call to take at the front of the
    379      * queue.
    380      *
    381      * When these cases arise, rather than always retraversing the
    382      * entire list to find an actual predecessor to unlink (which
    383      * won't help for case (1) anyway), we record a conservative
    384      * estimate of possible unsplice failures (in "sweepVotes").
    385      * We trigger a full sweep when the estimate exceeds a threshold
    386      * ("SWEEP_THRESHOLD") indicating the maximum number of estimated
    387      * removal failures to tolerate before sweeping through, unlinking
    388      * cancelled nodes that were not unlinked upon initial removal.
    389      * We perform sweeps by the thread hitting threshold (rather than
    390      * background threads or by spreading work to other threads)
    391      * because in the main contexts in which removal occurs, the
    392      * caller is already timed-out, cancelled, or performing a
    393      * potentially O(n) operation (e.g. remove(x)), none of which are
    394      * time-critical enough to warrant the overhead that alternatives
    395      * would impose on other threads.
    396      *
    397      * Because the sweepVotes estimate is conservative, and because
    398      * nodes become unlinked "naturally" as they fall off the head of
    399      * the queue, and because we allow votes to accumulate even while
    400      * sweeps are in progress, there are typically significantly fewer
    401      * such nodes than estimated.  Choice of a threshold value
    402      * balances the likelihood of wasted effort and contention, versus
    403      * providing a worst-case bound on retention of interior nodes in
    404      * quiescent queues. The value defined below was chosen
    405      * empirically to balance these under various timeout scenarios.
    406      *
    407      * Note that we cannot self-link unlinked interior nodes during
    408      * sweeps. However, the associated garbage chains terminate when
    409      * some successor ultimately falls off the head of the list and is
    410      * self-linked.
    411      */
    412 
    413     /**
             * True if on multiprocessor: computed once, when this class is
             * initialized, from whether the runtime reports more than one
             * available processor.
             */
    414     private static final boolean MP =
    415         Runtime.getRuntime().availableProcessors() > 1;
    416 
    417     /**
    418      * The number of times to spin (with randomly interspersed calls
    419      * to Thread.yield) on multiprocessor before blocking when a node
    420      * is apparently the first waiter in the queue.  See above for
    421      * explanation. Must be a power of two. The value is empirically
    422      * derived -- it works pretty well across a variety of processors,
    423      * numbers of CPUs, and OSes.
             *
             * The current value, {@code 1 << 7}, is 128.
    424      */
    425     private static final int FRONT_SPINS   = 1 << 7;
    426 
    427     /**
    428      * The number of times to spin before blocking when a node is
    429      * preceded by another node that is apparently spinning.  Also
    430      * serves as an increment to FRONT_SPINS on phase changes, and as
    431      * base average frequency for yielding during spins. Must be a
    432      * power of two.
             *
             * Derived as {@code FRONT_SPINS >>> 1}, i.e. half of FRONT_SPINS
             * (64 with the current FRONT_SPINS of 128), so it stays a power
             * of two as long as FRONT_SPINS is one.
    433      */
    434     private static final int CHAINED_SPINS = FRONT_SPINS >>> 1;
    435 
    436     /**
    437      * The maximum number of estimated removal failures (sweepVotes)
    438      * to tolerate before sweeping through the queue unlinking
    439      * cancelled nodes that were not unlinked upon initial
    440      * removal. See above for explanation. The value must be at least
    441      * two to avoid useless sweeps when removing trailing nodes.
             *
             * Declared without an access modifier (package-private), unlike
             * the private spin constants.
    442      */
    443     static final int SWEEP_THRESHOLD = 32;
    444 
    445     /**
    446      * Queue nodes. Uses Object, not E, for items to allow forgetting
    447      * them after use.  Relies heavily on Unsafe mechanics to minimize
    448      * unnecessary ordering constraints: Writes that are intrinsically
    449      * ordered wrt other accesses or CASes use simple relaxed forms.
    450      */
    451     static final class Node {
    452         final boolean isData;   // false if this is a request node
    453         volatile Object item;   // initially non-null if isData; CASed to match
    454         volatile Node next;     // successor; set to this node itself by forgetNext
    455         volatile Thread waiter; // null until waiting
    456 
    457         // CAS methods for fields
                /**
                 * Atomically sets {@code next} to {@code val} if it currently
                 * holds {@code cmp}.
                 *
                 * @return true if the swap happened
                 */
    458         final boolean casNext(Node cmp, Node val) {
    459             return U.compareAndSwapObject(this, NEXT, cmp, val);
    460         }
    461 
                /**
                 * Atomically sets {@code item} to {@code val} if it currently
                 * holds {@code cmp}.
                 *
                 * @return true if the swap happened
                 */
    462         final boolean casItem(Object cmp, Object val) {
    463             // assert cmp == null || cmp.getClass() != Node.class;
    464             return U.compareAndSwapObject(this, ITEM, cmp, val);
    465         }
    466 
    467         /**
    468          * Constructs a new node.  Uses relaxed write because item can
    469          * only be seen after publication via casNext.
                 *
                 * @param item the initial item, stored with a plain Unsafe write
                 * @param isData true for a data node, false for a request node
    470          */
    471         Node(Object item, boolean isData) {
    472             U.putObject(this, ITEM, item); // relaxed write
    473             this.isData = isData;
    474         }
    475 
    476         /**
    477          * Links node to itself to avoid garbage retention.  Called
    478          * only after CASing head field, so uses relaxed write.
    479          */
    480         final void forgetNext() {
    481             U.putObject(this, NEXT, this);
    482         }
    483 
    484         /**
    485          * Sets item to self and waiter to null, to avoid garbage
    486          * retention after matching or cancelling. Uses relaxed writes
    487          * because order is already constrained in the only calling
    488          * contexts: item is forgotten only after volatile/atomic
    489          * mechanics that extract items.  Similarly, clearing waiter
    490          * follows either CAS or return from park (if ever parked;
    491          * else we don't care).
    492          */
    493         final void forgetContents() {
    494             U.putObject(this, ITEM, this);
    495             U.putObject(this, WAITER, null);
    496         }
    497 
    498         /**
    499          * Returns true if this node has been matched, including the
    500          * case of artificial matches due to cancellation.
    501          */
    502         final boolean isMatched() {
    503             Object x = item;
                    // item == this is the self-reference written by forgetContents.
                    // Otherwise a data node counts as matched once its item is null,
                    // and a request node once its item is non-null.
    504             return (x == this) || ((x == null) == isData);
    505         }
    506 
    507         /**
    508          * Returns true if this is an unmatched request node.
    509          */
    510         final boolean isUnmatchedRequest() {
    511             return !isData && item == null;
    512         }
    513 
    514         /**
    515          * Returns true if a node with the given mode cannot be
    516          * appended to this node because this node is unmatched and
    517          * has opposite data mode.
                 *
                 * @param haveData the mode of the node that would be appended
    518          */
    519         final boolean cannotPrecede(boolean haveData) {
    520             boolean d = isData;
    521             Object x;
                    // All three must hold: the modes differ; item is not the
                    // self-reference left by forgetContents; and item presence
                    // still agrees with this node's own mode (so it is unmatched).
    522             return d != haveData && (x = item) != this && (x != null) == d;
    523         }
    524 
    525         /**
    526          * Tries to artificially match a data node -- used by remove.
                 *
                 * @return true if this call CASed the item to null
    527          */
    528         final boolean tryMatchData() {
    529             // assert isData;
    530             Object x = item;
                    // Only a live item (non-null and not the self-reference) can be
                    // claimed; the CAS to null is what performs the match.
    531             if (x != null && x != this && casItem(x, null)) {
                        // waiter may be null here; LockSupport.unpark(null) has no effect.
    532                 LockSupport.unpark(waiter);
    533                 return true;
    534             }
    535             return false;
    536         }
    537 
                // NOTE(review): Node does not declare "implements Serializable" in
                // this declaration, so this field looks unused -- confirm before
                // relying on or removing it.
    538         private static final long serialVersionUID = -3375979862319811754L;
    539 
    540         // Unsafe mechanics
    541         private static final sun.misc.Unsafe U = sun.misc.Unsafe.getUnsafe();
                // Offsets of the item, next and waiter fields, as used by the
                // Unsafe calls in this class.
    542         private static final long ITEM;
    543         private static final long NEXT;
    544         private static final long WAITER;
    545         static {
    546             try {
    547                 ITEM = U.objectFieldOffset
    548                     (Node.class.getDeclaredField("item"));
    549                 NEXT = U.objectFieldOffset
    550                     (Node.class.getDeclaredField("next"));
    551                 WAITER = U.objectFieldOffset
    552                     (Node.class.getDeclaredField("waiter"));
    553             } catch (ReflectiveOperationException e) {
                        // The fields are declared in this class, so a lookup failure
                        // is rethrown as an unchecked Error with the cause preserved.
    554                 throw new Error(e);
    555             }
    556         }
    557     }
    558 
    559     /**
             * head of the queue; null until first enqueue.
             * Declared without an access modifier (package-private); see
             * casHead for the CAS helper.
             */
    560     transient volatile Node head;
    561 
    562     /** tail of the queue; null until first append (see casTail) */
    563     private transient volatile Node tail;
    564 
    565     /**
             * The number of apparent failures to unsplice removed nodes.
             * Per the overview comment, a full sweep is triggered when this
             * estimate exceeds SWEEP_THRESHOLD; see casSweepVotes for the
             * CAS helper.
             */
    566     private transient volatile int sweepVotes;
    567 
    568     // CAS methods for fields
    569     private boolean casTail(Node cmp, Node val) {
                // Atomically replaces tail with val only if tail is still cmp and
                // returns true when the swap happened. TAIL is declared outside
                // this excerpt -- presumably the Unsafe offset of the tail field.
    570         return U.compareAndSwapObject(this, TAIL, cmp, val);
    571     }
    572 
    573     private boolean casHead(Node cmp, Node val) {
                // Atomically replaces head with val only if head is still cmp and
                // returns true when the swap happened. HEAD is declared outside
                // this excerpt -- presumably the Unsafe offset of the head field.
    574         return U.compareAndSwapObject(this, HEAD, cmp, val);
    575     }
    576 
    577     private boolean casSweepVotes(int cmp, int val) {
                // Atomically replaces sweepVotes with val only if it still equals
                // cmp and returns true when the swap happened. SWEEPVOTES is
                // declared outside this excerpt -- presumably the field's offset.
    578         return U.compareAndSwapInt(this, SWEEPVOTES, cmp, val);
    579     }
    580 
    581     /*
    582      * Possible values for "how" argument in xfer method.
    583      */
    584     private static final int NOW   = 0; // for untimed poll, tryTransfer
    585     private static final int ASYNC = 1; // for offer, put, add
    586     private static final int SYNC  = 2; // for transfer, take
    587     private static final int TIMED = 3; // for timed poll, tryTransfer
            // What xfer does with each value when nothing can be matched:
            // NOW returns at once without appending a node; ASYNC appends a
            // node and returns; SYNC and TIMED append a node and then wait in
            // awaitMatch (TIMED with the timeout enabled).
    588 
    589     /**
    590      * Implements all queuing methods. See above for explanation.
    591      *
    592      * @param e the item or null for take
    593      * @param haveData true if this is a put, else a take
    594      * @param how NOW, ASYNC, SYNC, or TIMED
    595      * @param nanos timeout in nanosecs, used only if mode is TIMED
    596      * @return an item if matched, else e
    597      * @throws NullPointerException if haveData mode but e is null
    598      */
    599     private E xfer(E e, boolean haveData, int how, long nanos) {
                // Two phases, repeated until one succeeds: (1) try to match the
                // first unmatched node of the opposite mode; (2) otherwise, unless
                // how == NOW, append a node of our own and possibly wait on it.
    600         if (haveData && (e == null))
    601             throw new NullPointerException();
    602         Node s = null;                        // the node to append, if needed
    603 
    604         retry:
    605         for (;;) {                            // restart on append race
    606 
                    // Phase 1: walk from head; h remembers the head we started at.
    607             for (Node h = head, p = h; p != null;) { // find & match first node
    608                 boolean isData = p.isData;
    609                 Object item = p.item;
                        // "Unmatched" means item is not self-linked and its
                        // null-ness still agrees with the node's mode.
    610                 if (item != p && (item != null) == isData) { // unmatched
    611                     if (isData == haveData)   // can't match
    612                         break;
                            // Opposite mode: claim p by swapping e into its item
                            // slot. Losing this CAS simply continues the walk.
    613                     if (p.casItem(item, e)) { // match
                                // When the match was past h, try to swing head
                                // forward; the old head is handed to forgetNext
                                // (defined outside this excerpt).
    614                         for (Node q = p; q != h;) {
    615                             Node n = q.next;  // update by 2 unless singleton
    616                             if (head == h && casHead(h, n == null ? q : n)) {
    617                                 h.forgetNext();
    618                                 break;
    619                             }                 // advance and retry
    620                             if ((h = head)   == null ||
    621                                 (q = h.next) == null || !q.isMatched())
    622                                 break;        // unless slack < 2
    623                         }
                                // Wake the thread recorded on the matched node, if
                                // any, and return what the node held before the
                                // swap (null when p was a request node).
    624                         LockSupport.unpark(p.waiter);
    625                         @SuppressWarnings("unchecked") E itemE = (E) item;
    626                         return itemE;
    627                     }
    628                 }
                        // A node whose next points to itself is off the list, so
                        // the walk resumes from the current head instead.
    629                 Node n = p.next;
    630                 p = (p != n) ? n : (h = head); // Use head if p offlist
    631             }
    632 
                    // Phase 2: nothing matchable was found.
    633             if (how != NOW) {                 // No matches available
                        // s is allocated once and reused if the append is retried.
    634                 if (s == null)
    635                     s = new Node(e, haveData);
    636                 Node pred = tryAppend(s, haveData);
    637                 if (pred == null)
    638                     continue retry;           // lost race vs opposite mode
    639                 if (how != ASYNC)
    640                     return awaitMatch(s, pred, e, (how == TIMED), nanos);
    641             }
                    // Reached for NOW (nothing appended) and ASYNC (node appended).
    642             return e; // not waiting
    643         }
    644     }
    645 
    646     /**
    647      * Tries to append node s as tail.
    648      *
    649      * @param s the node to append
    650      * @param haveData true if appending in data mode
    651      * @return null on failure due to losing race with append in
    652      * different mode, else s's predecessor, or s itself if no
    653      * predecessor
    654      */
    655     private Node tryAppend(Node s, boolean haveData) {
                // t is the tail as last read, p the node currently being examined.
    656         for (Node t = tail, p = t;;) {        // move p to last node and append
    657             Node n, u;                        // temps for reads of next & tail
                    // p == null means "start from head": true at first when tail
                    // is still null, and again after the off-list reset below.
    658             if (p == null && (p = head) == null) {
    659                 if (casHead(null, s))
    660                     return s;                 // initialize
    661             }
    662             else if (p.cannotPrecede(haveData))
    663                 return null;                  // lost race vs opposite mode
    664             else if ((n = p.next) != null)    // not last; keep traversing
                        // If p has already moved off t and tail has changed since
                        // t was read, jump to the new tail; otherwise step to n,
                        // or to null (restart from head) when p is self-linked.
    665                 p = p != t && t != (u = tail) ? (t = u) : // stale tail
    666                     (p != n) ? n : null;      // restart if off list
    667             else if (!p.casNext(null, s))
    668                 p = p.next;                   // re-read on CAS failure
    669             else {
                        // s is now linked after p. From here on the parameter s
                        // is reused as a scratch variable while catching tail up;
                        // the value returned is p, the predecessor, not s.
    670                 if (p != t) {                 // update if slack now >= 2
    671                     while ((tail != t || !casTail(t, s)) &&
    672                            (t = tail)   != null &&
    673                            (s = t.next) != null && // advance and retry
    674                            (s = s.next) != null && s != t);
    675                 }
    676                 return p;
    677             }
    678         }
    679     }
    680 
    681     /**
    682      * Spins/yields/blocks until node s is matched or caller gives up.
    683      *
    684      * @param s the waiting node
    685      * @param pred the predecessor of s, or s itself if it has no
    686      * predecessor, or null if unknown (the null case does not occur
    687      * in any current calls but may in possible future extensions)
    688      * @param e the comparison value for checking match
    689      * @param timed if true, wait only until timeout elapses
    690      * @param nanos timeout in nanosecs, used only if timed is true
    691      * @return matched item, or e if unmatched on interrupt or timeout
    692      */
    693     private E awaitMatch(Node s, Node pred, E e, boolean timed, long nanos) {
                // The deadline is only meaningful when timed; otherwise it is 0
                // and never read.
    694         final long deadline = timed ? System.nanoTime() + nanos : 0L;
    695         Thread w = Thread.currentThread();
    696         int spins = -1; // initialized after first item and cancel checks
    697         ThreadLocalRandom randomYields = null; // bound if needed
    698 
                // Each pass re-reads s.item and then takes exactly one branch.
    699         for (;;) {
    700             Object item = s.item;
                    // xfer creates s holding e, so any other value here means
                    // another thread has matched the node.
    701             if (item != e) {                  // matched
    702                 // assert item != s;
    703                 s.forgetContents();           // avoid garbage
    704                 @SuppressWarnings("unchecked") E itemE = (E) item;
    705                 return itemE;
    706             }
                    // Give up on interrupt or timeout. isInterrupted() only tests
                    // the flag; this method neither clears it nor throws.
    707             else if (w.isInterrupted() || (timed && nanos <= 0L)) {
    708                 unsplice(pred, s);           // try to unlink and cancel
                        // Cancelling means self-linking item. If this CAS fails a
                        // match got in first and the next pass returns its item.
    709                 if (s.casItem(e, s))         // return normally if lost CAS
    710                     return e;
    711             }
                    // First pass that gets this far: compute the spin budget once.
                    // randomYields is bound only when the budget is positive, so
                    // it is non-null whenever the spin branch below runs.
    712             else if (spins < 0) {            // establish spins at/near front
    713                 if ((spins = spinsFor(pred, s.isData)) > 0)
    714                     randomYields = ThreadLocalRandom.current();
    715             }
    716             else if (spins > 0) {             // spin
    717                 --spins;
    718                 if (randomYields.nextInt(CHAINED_SPINS) == 0)
    719                     Thread.yield();           // occasionally yield
    720             }
                    // Spinning is over: publish this thread on the node, then go
                    // round once more to re-read item before actually parking.
    721             else if (s.waiter == null) {
    722                 s.waiter = w;                 // request unpark then recheck
    723             }
    724             else if (timed) {
                        // Recompute the remaining time; when none is left, do not
                        // park, so the next pass takes the cancel branch above.
    725                 nanos = deadline - System.nanoTime();
    726                 if (nanos > 0L)
    727                     LockSupport.parkNanos(this, nanos);
    728             }
    729             else {
                        // Untimed wait; the queue itself is the park blocker.
    730                 LockSupport.park(this);
    731             }
    732         }
    733     }
    734 
    735     /**
    736      * Returns spin/yield value for a node with given predecessor and
    737      * data mode. See above for explanation.
    738      */
    739     private static int spinsFor(Node pred, boolean haveData) {
                // MP, FRONT_SPINS and CHAINED_SPINS are declared outside this
                // excerpt. When MP is false or pred is null the answer is 0.
    740         if (MP && pred != null) {
    741             if (pred.isData != haveData)      // phase change
    742                 return FRONT_SPINS + CHAINED_SPINS;
    743             if (pred.isMatched())             // probably at front
    744                 return FRONT_SPINS;
    745             if (pred.waiter == null)          // pred apparently spinning
    746                 return CHAINED_SPINS;
    747         }
                // Also reached when pred is in the same mode, is not matched and
                // already has a waiter recorded: no spinning in that case.
    748         return 0;
    749     }
    750 
    751     /* -------------- Traversal methods -------------- */
    752 
    753     /**
    754      * Returns the successor of p, or the head node if p.next has been
    755      * linked to self, which will only be true if traversing with a
    756      * stale pointer that is now off the list.
    757      */
    758     final Node succ(Node p) {
                // next is read once so the self-link test and the returned value
                // come from the same read; may return null at the end of the list.
    759         Node next = p.next;
    760         return (p == next) ? head : next;
    761     }
    762 
    763     /**
    764      * Returns the first unmatched data node, or null if none.
    765      * Callers must recheck if the returned node's item field is null
    766      * or self-linked before using.
    767      */
    768     final Node firstDataNode() {
    769         restartFromHead: for (;;) {
    770             for (Node p = head; p != null;) {
    771                 Object item = p.item;
    772                 if (p.isData) {
                            // A data node is live while its item is neither null
                            // nor self-linked.
    773                     if (item != null && item != p)
    774                         return p;
    775                 }
                        // An unmatched request node (non-data, item still null)
                        // ends the search: the walk stops and null is returned.
    776                 else if (item == null)
    777                     break;
                        // The left operand is evaluated before the assignment, so
                        // this compares the old p with its own next: a self-link
                        // means p went off the list and the scan starts over.
    778                 if (p == (p = p.next))
    779                     continue restartFromHead;
    780             }
    781             return null;
    782         }
    783     }
    784 
    785     /**
    786      * Traverses and counts unmatched nodes of the given mode.
    787      * Used by methods size and getWaitingConsumerCount.
    788      */
    789     private int countOfMode(boolean data) {
    790         restartFromHead: for (;;) {
                    // The count is reset on every restart from head.
    791             int count = 0;
    792             for (Node p = head; p != null;) {
    793                 if (!p.isMatched()) {
                            // Meeting an unmatched node of the other mode makes
                            // the result 0, whatever has been counted so far.
    794                     if (p.isData != data)
    795                         return 0;
                            // Saturate at Integer.MAX_VALUE instead of overflowing.
    796                     if (++count == Integer.MAX_VALUE)
    797                         break;  // @see Collection.size()
    798                 }
                        // Compares the old p with its own next; a self-link means
                        // p went off the list, so counting starts over.
    799                 if (p == (p = p.next))
    800                     continue restartFromHead;
    801             }
    802             return count;
    803         }
    804     }
    805 
    806     public String toString() {
                // Collects the string form of every live data item in one pass
                // from head, then hands the pieces to Helpers.toString (defined
                // outside this excerpt) to build the result.
                // The array a is kept across restarts; size and charLength are not.
    807         String[] a = null;
    808         restartFromHead: for (;;) {
    809             int charLength = 0;
    810             int size = 0;
    811             for (Node p = head; p != null;) {
    812                 Object item = p.item;
    813                 if (p.isData) {
    814                     if (item != null && item != p) {
                                // Allocate lazily, then double whenever full.
    815                         if (a == null)
    816                             a = new String[4];
    817                         else if (size == a.length)
    818                             a = Arrays.copyOf(a, 2 * size);
    819                         String s = item.toString();
    820                         a[size++] = s;
    821                         charLength += s.length();
    822                     }
    823                 } else if (item == null)
    824                     break;
                        // Self-linked next: p went off the list, start over.
    825                 if (p == (p = p.next))
    826                     continue restartFromHead;
    827             }
    828 
    829             if (size == 0)
    830                 return "[]";
    831 
    832             return Helpers.toString(a, size, charLength);
    833         }
    834     }
    835 
    836     private Object[] toArrayInternal(Object[] a) {
                // Shared worker for both toArray variants. a is the caller's
                // array, or null when called from toArray(). x is the array being
                // filled: it starts as a and is replaced by a larger copy if a
                // fills up. x is kept across restarts; size is reset.
    837         Object[] x = a;
    838         restartFromHead: for (;;) {
    839             int size = 0;
    840             for (Node p = head; p != null;) {
    841                 Object item = p.item;
    842                 if (p.isData) {
    843                     if (item != null && item != p) {
    844                         if (x == null)
    845                             x = new Object[4];
                                // Arrays.copyOf keeps the runtime type of x, so an
                                // array grown from a still has a's type.
    846                         else if (size == x.length)
    847                             x = Arrays.copyOf(x, 2 * (size + 4));
    848                         x[size++] = item;
    849                     }
    850                 } else if (item == null)
    851                     break;
                        // Self-linked next: p went off the list, start over.
    852                 if (p == (p = p.next))
    853                     continue restartFromHead;
    854             }
                    // x is still null only when no array was supplied and no
                    // element was ever stored.
    855             if (x == null)
    856                 return new Object[0];
    857             else if (a != null && size <= a.length) {
                        // The elements fit the caller's array. They may still be
                        // sitting in a grown copy if an earlier, restarted pass
                        // saw more elements than this one did.
    858                 if (a != x)
    859                     System.arraycopy(x, 0, a, 0, size);
                        // Mark the end with null when there is room to spare.
    860                 if (size < a.length)
    861                     a[size] = null;
    862                 return a;
    863             }
                    // Otherwise return the working array, trimmed to size if needed.
    864             return (size == x.length) ? x : Arrays.copyOf(x, size);
    865         }
    866     }
    867 
    868     /**
    869      * Returns an array containing all of the elements in this queue, in
    870      * proper sequence.
    871      *
    872      * <p>The returned array will be "safe" in that no references to it are
    873      * maintained by this queue.  (In other words, this method must allocate
    874      * a new array).  The caller is thus free to modify the returned array.
    875      *
    876      * <p>This method acts as bridge between array-based and collection-based
    877      * APIs.
    878      *
    879      * @return an array containing all of the elements in this queue
    880      */
    881     public Object[] toArray() {
                // Passing null makes toArrayInternal allocate its own Object[].
    882         return toArrayInternal(null);
    883     }
    884 
    885     /**
    886      * Returns an array containing all of the elements in this queue, in
    887      * proper sequence; the runtime type of the returned array is that of
    888      * the specified array.  If the queue fits in the specified array, it
    889      * is returned therein.  Otherwise, a new array is allocated with the
    890      * runtime type of the specified array and the size of this queue.
    891      *
    892      * <p>If this queue fits in the specified array with room to spare
    893      * (i.e., the array has more elements than this queue), the element in
    894      * the array immediately following the end of the queue is set to
    895      * {@code null}.
    896      *
    897      * <p>Like the {@link #toArray()} method, this method acts as bridge between
    898      * array-based and collection-based APIs.  Further, this method allows
    899      * precise control over the runtime type of the output array, and may,
    900      * under certain circumstances, be used to save allocation costs.
    901      *
    902      * <p>Suppose {@code x} is a queue known to contain only strings.
    903      * The following code can be used to dump the queue into a newly
    904      * allocated array of {@code String}:
    905      *
    906      * <pre> {@code String[] y = x.toArray(new String[0]);}</pre>
    907      *
    908      * Note that {@code toArray(new Object[0])} is identical in function to
    909      * {@code toArray()}.
    910      *
    911      * @param a the array into which the elements of the queue are to
    912      *          be stored, if it is big enough; otherwise, a new array of the
    913      *          same runtime type is allocated for this purpose
    914      * @return an array containing all of the elements in this queue
    915      * @throws ArrayStoreException if the runtime type of the specified array
    916      *         is not a supertype of the runtime type of every element in
    917      *         this queue
    918      * @throws NullPointerException if the specified array is null
    919      */
    920     @SuppressWarnings("unchecked")
    921     public <T> T[] toArray(T[] a) {
    922         if (a == null) throw new NullPointerException();
                // With a non-null argument toArrayInternal returns either a itself
                // or an array derived from it by Arrays.copyOf, so the result has
                // a's runtime type and the cast below holds.
    923         return (T[]) toArrayInternal(a);
    924     }
    925 
    926     final class Itr implements Iterator<E> {
                // Iterator over the queue's data items. The next element is
                // located eagerly (nextNode/nextItem); lastRet and lastPred are
                // kept so that remove() can unlink the node returned last.
    927         private Node nextNode;   // next node to return item for
    928         private E nextItem;      // the corresponding item
    929         private Node lastRet;    // last returned node, to support remove
    930         private Node lastPred;   // predecessor to unlink lastRet
    931 
    932         /**
    933          * Moves to next node after prev, or first node if prev null.
    934          */
    935         private void advance(Node prev) {
    936             /*
    937              * To track and avoid buildup of deleted nodes in the face
    938              * of calls to both Queue.remove and Itr.remove, we must
    939              * include variants of unsplice and sweep upon each
    940              * advance: Upon Itr.remove, we may need to catch up links
    941              * from lastPred, and upon other removes, we might need to
    942              * skip ahead from stale nodes and unsplice deleted ones
    943              * found while advancing.
    944              */
    945 
                    // Step 1: bring lastPred up to date before lastRet changes.
    946             Node r, b; // reset lastPred upon possible deletion of lastRet
    947             if ((r = lastRet) != null && !r.isMatched())
    948                 lastPred = r;    // next lastPred is old lastRet
    949             else if ((b = lastPred) == null || b.isMatched())
    950                 lastPred = null; // at start of list
    951             else {
                        // lastPred is still live: unlink the run of matched nodes
                        // directly after it, stopping at a trailing node or at a
                        // self-linked one.
    952                 Node s, n;       // help with removal of lastPred.next
    953                 while ((s = b.next) != null &&
    954                        s != b && s.isMatched() &&
    955                        (n = s.next) != null && n != s)
    956                     b.casNext(s, n);
    957             }
    958 
    959             this.lastRet = prev;
    960 
                    // Step 2: find the next live data node after prev. p is the
                    // node to continue from (null means "from head"), s the
                    // candidate being looked at.
    961             for (Node p = prev, s, n;;) {
    962                 s = (p == null) ? head : p.next;
    963                 if (s == null)
    964                     break;
                        // p is self-linked, i.e. off the list: restart from head.
    965                 else if (s == p) {
    966                     p = null;
    967                     continue;
    968                 }
    969                 Object item = s.item;
    970                 if (s.isData) {
                            // Live data node: cache both the node and its item.
    971                     if (item != null && item != s) {
    972                         @SuppressWarnings("unchecked") E itemE = (E) item;
    973                         nextItem = itemE;
    974                         nextNode = s;
    975                         return;
    976                     }
    977                 }
                        // An unmatched request node ends the iteration.
    978                 else if (item == null)
    979                     break;
    980                 // assert s.isMatched();
                        // s is matched. With no predecessor in hand, step onto s;
                        // otherwise try to unlink s from p, unless s is the
                        // trailing node (stop) or self-linked (restart from head).
    981                 if (p == null)
    982                     p = s;
    983                 else if ((n = s.next) == null)
    984                     break;
    985                 else if (s == n)
    986                     p = null;
    987                 else
    988                     p.casNext(s, n);
    989             }
                    // Nothing further was found: the iterator is now exhausted.
    990             nextNode = null;
    991             nextItem = null;
    992         }
    993 
    994         Itr() {
                    // Position on the first live data node, if there is one.
    995             advance(null);
    996         }
    997 
    998         public final boolean hasNext() {
    999             return nextNode != null;
   1000         }
   1001 
   1002         public final E next() {
   1003             Node p = nextNode;
   1004             if (p == null) throw new NoSuchElementException();
                    // The item returned is the one cached when the iterator
                    // advanced onto p; the node is not re-read here.
   1005             E e = nextItem;
   1006             advance(p);
   1007             return e;
   1008         }
   1009 
   1010         public final void remove() {
   1011             final Node lastRet = this.lastRet;
   1012             if (lastRet == null)
   1013                 throw new IllegalStateException();
                    // Cleared first, so a second remove() without an intervening
                    // next() throws IllegalStateException.
   1014             this.lastRet = null;
                    // Unsplice only when this call is the one that removed the
                    // item; otherwise the node was already matched elsewhere.
   1015             if (lastRet.tryMatchData())
   1016                 unsplice(lastPred, lastRet);
   1017         }
   1018     }
   1019 
   1020     /** A customized variant of Spliterators.IteratorSpliterator */
   1021     final class LTQSpliterator<E> implements Spliterator<E> {
                // NOTE(review): this inner class declares its own type parameter
                // E, which hides the E used by the enclosing queue's methods;
                // items are therefore read as Object and cast unchecked below.
   1022         static final int MAX_BATCH = 1 << 25;  // max batch array size;
   1023         Node current;       // current node; null until initialized
   1024         int batch;          // batch size for splits
   1025         boolean exhausted;  // true when no more nodes
   1026         LTQSpliterator() {}
   1027 
   1028         public Spliterator<E> trySplit() {
   1029             Node p;
   1030             int b = batch;
                    // Target size: 1 on the first split, afterwards one more than
                    // the number of items the previous split copied, capped at
                    // MAX_BATCH.
   1031             int n = (b <= 0) ? 1 : (b >= MAX_BATCH) ? MAX_BATCH : b + 1;
                    // Split only when there is a starting node and it has a
                    // successor.
   1032             if (!exhausted &&
   1033                 ((p = current) != null || (p = firstDataNode()) != null) &&
   1034                 p.next != null) {
   1035                 Object[] a = new Object[n];
   1036                 int i = 0;
   1037                 do {
   1038                     Object e = p.item;
                            // Store every item that is not self-linked, but only
                            // advance i for non-null ones, so a stored null is
                            // overwritten or left beyond the reported bound.
   1039                     if (e != p && (a[i] = e) != null)
   1040                         ++i;
                            // Self-linked next: p went off the list, so resume
                            // from the first live data node.
   1041                     if (p == (p = p.next))
   1042                         p = firstDataNode();
   1043                 } while (p != null && i < n && p.isData);
   1044                 if ((current = p) == null)
   1045                     exhausted = true;
                        // Hand the copied prefix a[0..i) to an array-backed
                        // spliterator; this one continues from current.
   1046                 if (i > 0) {
   1047                     batch = i;
   1048                     return Spliterators.spliterator
   1049                         (a, 0, i, (Spliterator.ORDERED |
   1050                                    Spliterator.NONNULL |
   1051                                    Spliterator.CONCURRENT));
   1052                 }
   1053             }
   1054             return null;
   1055         }
   1056 
   1057         @SuppressWarnings("unchecked")
   1058         public void forEachRemaining(Consumer<? super E> action) {
   1059             Node p;
   1060             if (action == null) throw new NullPointerException();
   1061             if (!exhausted &&
   1062                 ((p = current) != null || (p = firstDataNode()) != null)) {
                        // Marked exhausted before the walk begins, so later calls
                        // on this spliterator do nothing.
   1063                 exhausted = true;
   1064                 do {
   1065                     Object e = p.item;
                            // Skip nodes whose item is null or self-linked.
   1066                     if (e != null && e != p)
   1067                         action.accept((E)e);
   1068                     if (p == (p = p.next))
   1069                         p = firstDataNode();
   1070                 } while (p != null && p.isData);
   1071             }
   1072         }
   1073 
   1074         @SuppressWarnings("unchecked")
   1075         public boolean tryAdvance(Consumer<? super E> action) {
   1076             Node p;
   1077             if (action == null) throw new NullPointerException();
   1078             if (!exhausted &&
   1079                 ((p = current) != null || (p = firstDataNode()) != null)) {
   1080                 Object e;
                        // Step forward until a usable item is found; a self-linked
                        // item is treated the same as null.
   1081                 do {
   1082                     if ((e = p.item) == p)
   1083                         e = null;
   1084                     if (p == (p = p.next))
   1085                         p = firstDataNode();
   1086                 } while (e == null && p != null && p.isData);
                        // current may now be a non-data node; the next call's loop
                        // condition then stops after looking at that one node.
   1087                 if ((current = p) == null)
   1088                     exhausted = true;
   1089                 if (e != null) {
   1090                     action.accept((E)e);
   1091                     return true;
   1092                 }
   1093             }
   1094             return false;
   1095         }
   1096 
                // Long.MAX_VALUE is the value the Spliterator contract uses for
                // an unknown or too-expensive-to-compute size.
   1097         public long estimateSize() { return Long.MAX_VALUE; }
   1098 
   1099         public int characteristics() {
   1100             return Spliterator.ORDERED | Spliterator.NONNULL |
   1101                 Spliterator.CONCURRENT;
   1102         }
   1103     }
   1104 
   1105     /**
   1106      * Returns a {@link Spliterator} over the elements in this queue.
   1107      *
   1108      * <p>The returned spliterator is
   1109      * <a href="package-summary.html#Weakly"><i>weakly consistent</i></a>.
   1110      *
   1111      * <p>The {@code Spliterator} reports {@link Spliterator#CONCURRENT},
   1112      * {@link Spliterator#ORDERED}, and {@link Spliterator#NONNULL}.
   1113      *
   1114      * @implNote
   1115      * The {@code Spliterator} implements {@code trySplit} to permit limited
   1116      * parallelism.
   1117      *
   1118      * @return a {@code Spliterator} over the elements in this queue
   1119      * @since 1.8
   1120      */
   1121     public Spliterator<E> spliterator() {
                // A new spliterator does no traversal up front; it locates its
                // first node lazily on the first trySplit/tryAdvance/
                // forEachRemaining call.
   1122         return new LTQSpliterator<E>();
   1123     }
   1124 
   1125     /* -------------- Removal methods -------------- */
   1126 
   1127     /**
   1128      * Unsplices (now or later) the given deleted/cancelled node with
   1129      * the given predecessor.
   1130      *
   1131      * @param pred a node that was at one time known to be the
   1132      * predecessor of s, or null or s itself if s is/was at head
   1133      * @param s the node to be unspliced
   1134      */
   1135     final void unsplice(Node pred, Node s) {
   1136         s.waiter = null; // disable signals
   1137         /*
   1138          * See above for rationale. Briefly: if pred still points to
   1139          * s, try to unlink s.  If s cannot be unlinked, because it is
   1140          * trailing node or pred might be unlinked, and neither pred
   1141          * nor s are head or offlist, add to sweepVotes, and if enough
   1142          * votes have accumulated, sweep.
   1143          */
                // Nothing is attempted when pred is unknown (null), is s itself,
                // or no longer points at s.
   1144         if (pred != null && pred != s && pred.next == s) {
   1145             Node n = s.next;
                    // Continue to the voting logic in two cases: s is the trailing
                    // node (n == null), or s was just unlinked from a pred that is
                    // itself matched and so may be unlinked as well.
   1146             if (n == null ||
   1147                 (n != s && pred.casNext(s, n) && pred.isMatched())) {
                        // Advance head past matched nodes. Leave without voting
                        // if pred or s is at head or the list is (or becomes)
                        // empty.
   1148                 for (;;) {               // check if at, or could be, head
   1149                     Node h = head;
   1150                     if (h == pred || h == s || h == null)
   1151                         return;          // at head or list empty
   1152                     if (!h.isMatched())
   1153                         break;
   1154                     Node hn = h.next;
   1155                     if (hn == null)
   1156                         return;          // now empty
   1157                     if (hn != h && casHead(h, hn))
   1158                         h.forgetNext();  // advance head
   1159                 }
                        // Vote only if neither node has been self-linked meanwhile.
   1160                 if (pred.next != pred && s.next != s) { // recheck if offlist
                            // CAS retry loop: either add one vote, or, once the
                            // threshold is reached, reset the votes to 0 and sweep.
   1161                     for (;;) {           // sweep now if enough votes
   1162                         int v = sweepVotes;
   1163                         if (v < SWEEP_THRESHOLD) {
   1164                             if (casSweepVotes(v, v + 1))
   1165                                 break;
   1166                         }
   1167                         else if (casSweepVotes(v, 0)) {
   1168                             sweep();
   1169                             break;
   1170                         }
   1171                     }
   1172                 }
   1173             }
   1174         }
   1175     }
   1176 
   1177     /**
   1178      * Unlinks matched (typically cancelled) nodes encountered in a
   1179      * traversal from head.
   1180      */
   1181     private void sweep() {
                // p is the last node known to be kept, s its current successor and
                // n the node after s. The walk ends when p or s becomes null, or
                // at a matched trailing node.
   1182         for (Node p = head, s, n; p != null && (s = p.next) != null; ) {
   1183             if (!s.isMatched())
   1184                 // Unmatched nodes are never self-linked
   1185                 p = s;
   1186             else if ((n = s.next) == null) // trailing node is pinned
   1187                 break;
   1188             else if (s == n)    // stale
   1189                 // No need to also check for p == s, since that implies s == n
   1190                 p = head;
   1191             else
                        // Unlink the matched node s. p stays put, so the next pass
                        // re-reads p.next whether or not this CAS succeeded.
   1192                 p.casNext(s, n);
   1193         }
   1194     }
   1195 
   1196     /**
   1197      * Main implementation of remove(Object)
   1198      */
   1199     private boolean findAndRemove(Object e) {
                // A null argument never matches anything: false, no traversal.
   1200         if (e != null) {
                    // pred trails p so that unsplice can be given a predecessor.
   1201             for (Node pred = null, p = head; p != null; ) {
   1202                 Object item = p.item;
   1203                 if (p.isData) {
                            // Remove the first live item equal to e. If
                            // tryMatchData loses its CAS the walk just continues.
   1204                     if (item != null && item != p && e.equals(item) &&
   1205                         p.tryMatchData()) {
   1206                         unsplice(pred, p);
   1207                         return true;
   1208                     }
   1209                 }
                        // An unmatched request node ends the search.
   1210                 else if (item == null)
   1211                     break;
   1212                 pred = p;
                        // Self-linked next: p went off the list, so restart from
                        // head with no known predecessor.
   1213                 if ((p = p.next) == pred) { // stale
   1214                     pred = null;
   1215                     p = head;
   1216                 }
   1217             }
   1218         }
   1219         return false;
   1220     }
   1221 
   1222     /**
   1223      * Creates an initially empty {@code LinkedTransferQueue}.
   1224      */
   1225     public LinkedTransferQueue() {
                // Nothing to set up: head and tail start out null and are
                // installed lazily by tryAppend on the first append.
   1226     }
   1227 
   1228     /**
   1229      * Creates a {@code LinkedTransferQueue}
   1230      * initially containing the elements of the given collection,
   1231      * added in traversal order of the collection's iterator.
   1232      *
   1233      * @param c the collection of elements to initially contain
   1234      * @throws NullPointerException if the specified collection or any
   1235      *         of its elements are null
   1236      */
   1237     public LinkedTransferQueue(Collection<? extends E> c) {
   1238         this();
                // addAll is not defined in this excerpt -- presumably inherited
                // and implemented on top of add; confirm in the superclass.
   1239         addAll(c);
   1240     }
   1241 
   1242     /**
   1243      * Inserts the specified element at the tail of this queue.
   1244      * As the queue is unbounded, this method will never block.
   1245      *
   1246      * @throws NullPointerException if the specified element is null
   1247      */
   1248     public void put(E e) {
                // ASYNC: xfer hands e to a waiting consumer or appends it, then
                // returns without waiting. The result is not needed here.
   1249         xfer(e, true, ASYNC, 0);
   1250     }
   1251 
   1252     /**
   1253      * Inserts the specified element at the tail of this queue.
   1254      * As the queue is unbounded, this method will never block or
   1255      * return {@code false}.
   1256      *
   1257      * @return {@code true} (as specified by
   1258      *  {@link java.util.concurrent.BlockingQueue#offer(Object,long,TimeUnit)
   1259      *  BlockingQueue.offer})
   1260      * @throws NullPointerException if the specified element is null
   1261      */
   1262     public boolean offer(E e, long timeout, TimeUnit unit) {
                // timeout and unit are never read: the ASYNC path in xfer does
                // not wait, so this behaves exactly like offer(e).
   1263         xfer(e, true, ASYNC, 0);
   1264         return true;
   1265     }
   1266 
   1267     /**
   1268      * Inserts the specified element at the tail of this queue.
   1269      * As the queue is unbounded, this method will never return {@code false}.
   1270      *
   1271      * @return {@code true} (as specified by {@link Queue#offer})
   1272      * @throws NullPointerException if the specified element is null
   1273      */
   1274     public boolean offer(E e) {
   1275         xfer(e, true, ASYNC, 0);
   1276         return true;
   1277     }
   1278 
   1279     /**
   1280      * Inserts the specified element at the tail of this queue.
   1281      * As the queue is unbounded, this method will never throw
   1282      * {@link IllegalStateException} or return {@code false}.
   1283      *
   1284      * @return {@code true} (as specified by {@link Collection#add})
   1285      * @throws NullPointerException if the specified element is null
   1286      */
   1287     public boolean add(E e) {
   1288         xfer(e, true, ASYNC, 0);
   1289         return true;
   1290     }
   1291 
   1292     /**
   1293      * Transfers the element to a waiting consumer immediately, if possible.
   1294      *
   1295      * <p>More precisely, transfers the specified element immediately
   1296      * if there exists a consumer already waiting to receive it (in
   1297      * {@link #take} or timed {@link #poll(long,TimeUnit) poll}),
   1298      * otherwise returning {@code false} without enqueuing the element.
   1299      *
   1300      * @throws NullPointerException if the specified element is null
   1301      */
   1302     public boolean tryTransfer(E e) {
   1303         return xfer(e, true, NOW, 0) == null;
   1304     }
   1305 
   1306     /**
   1307      * Transfers the element to a consumer, waiting if necessary to do so.
   1308      *
   1309      * <p>More precisely, transfers the specified element immediately
   1310      * if there exists a consumer already waiting to receive it (in
   1311      * {@link #take} or timed {@link #poll(long,TimeUnit) poll}),
   1312      * else inserts the specified element at the tail of this queue
   1313      * and waits until the element is received by a consumer.
   1314      *
   1315      * @throws NullPointerException if the specified element is null
   1316      */
   1317     public void transfer(E e) throws InterruptedException {
   1318         if (xfer(e, true, SYNC, 0) != null) {
   1319             Thread.interrupted(); // failure possible only due to interrupt
   1320             throw new InterruptedException();
   1321         }
   1322     }
   1323 
   1324     /**
   1325      * Transfers the element to a consumer if it is possible to do so
   1326      * before the timeout elapses.
   1327      *
   1328      * <p>More precisely, transfers the specified element immediately
   1329      * if there exists a consumer already waiting to receive it (in
   1330      * {@link #take} or timed {@link #poll(long,TimeUnit) poll}),
   1331      * else inserts the specified element at the tail of this queue
   1332      * and waits until the element is received by a consumer,
   1333      * returning {@code false} if the specified wait time elapses
   1334      * before the element can be transferred.
   1335      *
   1336      * @throws NullPointerException if the specified element is null
   1337      */
   1338     public boolean tryTransfer(E e, long timeout, TimeUnit unit)
   1339         throws InterruptedException {
   1340         if (xfer(e, true, TIMED, unit.toNanos(timeout)) == null)
   1341             return true;
   1342         if (!Thread.interrupted())
   1343             return false;
   1344         throw new InterruptedException();
   1345     }
   1346 
   1347     public E take() throws InterruptedException {
   1348         E e = xfer(null, false, SYNC, 0);
   1349         if (e != null)
   1350             return e;
   1351         Thread.interrupted();
   1352         throw new InterruptedException();
   1353     }
   1354 
   1355     public E poll(long timeout, TimeUnit unit) throws InterruptedException {
   1356         E e = xfer(null, false, TIMED, unit.toNanos(timeout));
   1357         if (e != null || !Thread.interrupted())
   1358             return e;
   1359         throw new InterruptedException();
   1360     }
   1361 
   1362     public E poll() {
   1363         return xfer(null, false, NOW, 0);
   1364     }
   1365 
   1366     /**
   1367      * @throws NullPointerException     {@inheritDoc}
   1368      * @throws IllegalArgumentException {@inheritDoc}
   1369      */
   1370     public int drainTo(Collection<? super E> c) {
   1371         if (c == null)
   1372             throw new NullPointerException();
   1373         if (c == this)
   1374             throw new IllegalArgumentException();
   1375         int n = 0;
   1376         for (E e; (e = poll()) != null;) {
   1377             c.add(e);
   1378             ++n;
   1379         }
   1380         return n;
   1381     }
   1382 
   1383     /**
   1384      * @throws NullPointerException     {@inheritDoc}
   1385      * @throws IllegalArgumentException {@inheritDoc}
   1386      */
   1387     public int drainTo(Collection<? super E> c, int maxElements) {
   1388         if (c == null)
   1389             throw new NullPointerException();
   1390         if (c == this)
   1391             throw new IllegalArgumentException();
   1392         int n = 0;
   1393         for (E e; n < maxElements && (e = poll()) != null;) {
   1394             c.add(e);
   1395             ++n;
   1396         }
   1397         return n;
   1398     }
   1399 
   1400     /**
   1401      * Returns an iterator over the elements in this queue in proper sequence.
   1402      * The elements will be returned in order from first (head) to last (tail).
   1403      *
   1404      * <p>The returned iterator is
   1405      * <a href="package-summary.html#Weakly"><i>weakly consistent</i></a>.
   1406      *
   1407      * @return an iterator over the elements in this queue in proper sequence
   1408      */
   1409     public Iterator<E> iterator() {
   1410         return new Itr();
   1411     }
   1412 
   1413     public E peek() {
   1414         restartFromHead: for (;;) {
   1415             for (Node p = head; p != null;) {
   1416                 Object item = p.item;
   1417                 if (p.isData) {
   1418                     if (item != null && item != p) {
   1419                         @SuppressWarnings("unchecked") E e = (E) item;
   1420                         return e;
   1421                     }
   1422                 }
   1423                 else if (item == null)
   1424                     break;
   1425                 if (p == (p = p.next))
   1426                     continue restartFromHead;
   1427             }
   1428             return null;
   1429         }
   1430     }
   1431 
   1432     /**
   1433      * Returns {@code true} if this queue contains no elements.
   1434      *
   1435      * @return {@code true} if this queue contains no elements
   1436      */
   1437     public boolean isEmpty() {
   1438         return firstDataNode() == null;
   1439     }
   1440 
   1441     public boolean hasWaitingConsumer() {
   1442         restartFromHead: for (;;) {
   1443             for (Node p = head; p != null;) {
   1444                 Object item = p.item;
   1445                 if (p.isData) {
   1446                     if (item != null && item != p)
   1447                         break;
   1448                 }
   1449                 else if (item == null)
   1450                     return true;
   1451                 if (p == (p = p.next))
   1452                     continue restartFromHead;
   1453             }
   1454             return false;
   1455         }
   1456     }
   1457 
   1458     /**
   1459      * Returns the number of elements in this queue.  If this queue
   1460      * contains more than {@code Integer.MAX_VALUE} elements, returns
   1461      * {@code Integer.MAX_VALUE}.
   1462      *
   1463      * <p>Beware that, unlike in most collections, this method is
   1464      * <em>NOT</em> a constant-time operation. Because of the
   1465      * asynchronous nature of these queues, determining the current
   1466      * number of elements requires an O(n) traversal.
   1467      *
   1468      * @return the number of elements in this queue
   1469      */
   1470     public int size() {
   1471         return countOfMode(true);
   1472     }
   1473 
   1474     public int getWaitingConsumerCount() {
   1475         return countOfMode(false);
   1476     }
   1477 
   1478     /**
   1479      * Removes a single instance of the specified element from this queue,
   1480      * if it is present.  More formally, removes an element {@code e} such
   1481      * that {@code o.equals(e)}, if this queue contains one or more such
   1482      * elements.
   1483      * Returns {@code true} if this queue contained the specified element
   1484      * (or equivalently, if this queue changed as a result of the call).
   1485      *
   1486      * @param o element to be removed from this queue, if present
   1487      * @return {@code true} if this queue changed as a result of the call
   1488      */
   1489     public boolean remove(Object o) {
   1490         return findAndRemove(o);
   1491     }
   1492 
   1493     /**
   1494      * Returns {@code true} if this queue contains the specified element.
   1495      * More formally, returns {@code true} if and only if this queue contains
   1496      * at least one element {@code e} such that {@code o.equals(e)}.
   1497      *
   1498      * @param o object to be checked for containment in this queue
   1499      * @return {@code true} if this queue contains the specified element
   1500      */
   1501     public boolean contains(Object o) {
   1502         if (o != null) {
   1503             for (Node p = head; p != null; p = succ(p)) {
   1504                 Object item = p.item;
   1505                 if (p.isData) {
   1506                     if (item != null && item != p && o.equals(item))
   1507                         return true;
   1508                 }
   1509                 else if (item == null)
   1510                     break;
   1511             }
   1512         }
   1513         return false;
   1514     }
   1515 
   1516     /**
   1517      * Always returns {@code Integer.MAX_VALUE} because a
   1518      * {@code LinkedTransferQueue} is not capacity constrained.
   1519      *
   1520      * @return {@code Integer.MAX_VALUE} (as specified by
   1521      *         {@link java.util.concurrent.BlockingQueue#remainingCapacity()
   1522      *         BlockingQueue.remainingCapacity})
   1523      */
   1524     public int remainingCapacity() {
   1525         return Integer.MAX_VALUE;
   1526     }
   1527 
   1528     /**
   1529      * Saves this queue to a stream (that is, serializes it).
   1530      *
   1531      * @param s the stream
   1532      * @throws java.io.IOException if an I/O error occurs
   1533      * @serialData All of the elements (each an {@code E}) in
   1534      * the proper order, followed by a null
   1535      */
   1536     private void writeObject(java.io.ObjectOutputStream s)
   1537         throws java.io.IOException {
   1538         s.defaultWriteObject();
   1539         for (E e : this)
   1540             s.writeObject(e);
   1541         // Use trailing null as sentinel
   1542         s.writeObject(null);
   1543     }
   1544 
   1545     /**
   1546      * Reconstitutes this queue from a stream (that is, deserializes it).
   1547      * @param s the stream
   1548      * @throws ClassNotFoundException if the class of a serialized object
   1549      *         could not be found
   1550      * @throws java.io.IOException if an I/O error occurs
   1551      */
   1552     private void readObject(java.io.ObjectInputStream s)
   1553         throws java.io.IOException, ClassNotFoundException {
   1554         s.defaultReadObject();
   1555         for (;;) {
   1556             @SuppressWarnings("unchecked")
   1557             E item = (E) s.readObject();
   1558             if (item == null)
   1559                 break;
   1560             else
   1561                 offer(item);
   1562         }
   1563     }
   1564 
   1565     // Unsafe mechanics
   1566 
   1567     private static final sun.misc.Unsafe U = sun.misc.Unsafe.getUnsafe();
   1568     private static final long HEAD;
   1569     private static final long TAIL;
   1570     private static final long SWEEPVOTES;
   1571     static {
   1572         try {
   1573             HEAD = U.objectFieldOffset
   1574                 (LinkedTransferQueue.class.getDeclaredField("head"));
   1575             TAIL = U.objectFieldOffset
   1576                 (LinkedTransferQueue.class.getDeclaredField("tail"));
   1577             SWEEPVOTES = U.objectFieldOffset
   1578                 (LinkedTransferQueue.class.getDeclaredField("sweepVotes"));
   1579         } catch (ReflectiveOperationException e) {
   1580             throw new Error(e);
   1581         }
   1582 
   1583         // Reduce the risk of rare disastrous classloading in first call to
   1584         // LockSupport.park: https://bugs.openjdk.java.net/browse/JDK-8074773
   1585         Class<?> ensureLoaded = LockSupport.class;
   1586     }
   1587 }
   1588