Home | History | Annotate | Download | only in concurrent
      1 /*
      2  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
      3  *
      4  * This code is free software; you can redistribute it and/or modify it
      5  * under the terms of the GNU General Public License version 2 only, as
      6  * published by the Free Software Foundation.  Oracle designates this
      7  * particular file as subject to the "Classpath" exception as provided
      8  * by Oracle in the LICENSE file that accompanied this code.
      9  *
     10  * This code is distributed in the hope that it will be useful, but WITHOUT
     11  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
     12  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
     13  * version 2 for more details (a copy is included in the LICENSE file that
     14  * accompanied this code).
     15  *
     16  * You should have received a copy of the GNU General Public License version
     17  * 2 along with this work; if not, write to the Free Software Foundation,
     18  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
     19  *
     20  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
     21  * or visit www.oracle.com if you need additional information or have any
     22  * questions.
     23  */
     24 
     25 /*
     26  * This file is available under and governed by the GNU General Public
     27  * License version 2 only, as published by the Free Software Foundation.
     28  * However, the following notice accompanied the original version of this
     29  * file:
     30  *
     31  * Written by Doug Lea, Bill Scherer, and Michael Scott with
     32  * assistance from members of JCP JSR-166 Expert Group and released to
     33  * the public domain, as explained at
     34  * http://creativecommons.org/publicdomain/zero/1.0/
     35  */
     36 
     37 package java.util.concurrent;
     38 
     39 import java.util.AbstractQueue;
     40 import java.util.Collection;
     41 import java.util.Collections;
     42 import java.util.Iterator;
     43 import java.util.Spliterator;
     44 import java.util.Spliterators;
     45 import java.util.concurrent.locks.LockSupport;
     46 import java.util.concurrent.locks.ReentrantLock;
     47 
     48 // BEGIN android-note
     49 // removed link to collections framework docs
     50 // END android-note
     51 
     52 /**
     53  * A {@linkplain BlockingQueue blocking queue} in which each insert
     54  * operation must wait for a corresponding remove operation by another
     55  * thread, and vice versa.  A synchronous queue does not have any
     56  * internal capacity, not even a capacity of one.  You cannot
     57  * {@code peek} at a synchronous queue because an element is only
     58  * present when you try to remove it; you cannot insert an element
     59  * (using any method) unless another thread is trying to remove it;
     60  * you cannot iterate as there is nothing to iterate.  The
     61  * <em>head</em> of the queue is the element that the first queued
     62  * inserting thread is trying to add to the queue; if there is no such
     63  * queued thread then no element is available for removal and
     64  * {@code poll()} will return {@code null}.  For purposes of other
     65  * {@code Collection} methods (for example {@code contains}), a
     66  * {@code SynchronousQueue} acts as an empty collection.  This queue
     67  * does not permit {@code null} elements.
     68  *
     69  * <p>Synchronous queues are similar to rendezvous channels used in
     70  * CSP and Ada. They are well suited for handoff designs, in which an
     71  * object running in one thread must sync up with an object running
     72  * in another thread in order to hand it some information, event, or
     73  * task.
     74  *
     75  * <p>This class supports an optional fairness policy for ordering
     76  * waiting producer and consumer threads.  By default, this ordering
     77  * is not guaranteed. However, a queue constructed with fairness set
     78  * to {@code true} grants threads access in FIFO order.
     79  *
     80  * <p>This class and its iterator implement all of the
     81  * <em>optional</em> methods of the {@link Collection} and {@link
     82  * Iterator} interfaces.
     83  *
     84  * @since 1.5
     85  * @author Doug Lea and Bill Scherer and Michael Scott
     86  * @param <E> the type of elements held in this queue
     87  */
     88 public class SynchronousQueue<E> extends AbstractQueue<E>
     89     implements BlockingQueue<E>, java.io.Serializable {
     90     private static final long serialVersionUID = -3223113410248163686L;
     91 
     92     /*
     93      * This class implements extensions of the dual stack and dual
     94      * queue algorithms described in "Nonblocking Concurrent Objects
     95      * with Condition Synchronization", by W. N. Scherer III and
     96      * M. L. Scott.  18th Annual Conf. on Distributed Computing,
     97      * Oct. 2004 (see also
     98      * http://www.cs.rochester.edu/u/scott/synchronization/pseudocode/duals.html).
     99      * The (Lifo) stack is used for non-fair mode, and the (Fifo)
    100      * queue for fair mode. The performance of the two is generally
    101      * similar. Fifo usually supports higher throughput under
    102      * contention but Lifo maintains higher thread locality in common
    103      * applications.
    104      *
    105      * A dual queue (and similarly stack) is one that at any given
    106      * time either holds "data" -- items provided by put operations,
    107      * or "requests" -- slots representing take operations, or is
    108      * empty. A call to "fulfill" (i.e., a call requesting an item
    109      * from a queue holding data or vice versa) dequeues a
    110      * complementary node.  The most interesting feature of these
    111      * queues is that any operation can figure out which mode the
    112      * queue is in, and act accordingly without needing locks.
    113      *
    114      * Both the queue and stack extend abstract class Transferer
    115      * defining the single method transfer that does a put or a
    116      * take. These are unified into a single method because in dual
    117      * data structures, the put and take operations are symmetrical,
    118      * so nearly all code can be combined. The resulting transfer
    119      * methods are on the long side, but are easier to follow than
    120      * they would be if broken up into nearly-duplicated parts.
    121      *
    122      * The queue and stack data structures share many conceptual
    123      * similarities but very few concrete details. For simplicity,
    124      * they are kept distinct so that they can later evolve
    125      * separately.
    126      *
    127      * The algorithms here differ from the versions in the above paper
    128      * in extending them for use in synchronous queues, as well as
    129      * dealing with cancellation. The main differences include:
    130      *
    131      *  1. The original algorithms used bit-marked pointers, but
    132      *     the ones here use mode bits in nodes, leading to a number
    133      *     of further adaptations.
    134      *  2. SynchronousQueues must block threads waiting to become
    135      *     fulfilled.
    136      *  3. Support for cancellation via timeout and interrupts,
    137      *     including cleaning out cancelled nodes/threads
    138      *     from lists to avoid garbage retention and memory depletion.
    139      *
    140      * Blocking is mainly accomplished using LockSupport park/unpark,
    141      * except that nodes that appear to be the next ones to become
    142      * fulfilled first spin a bit (on multiprocessors only). On very
    143      * busy synchronous queues, spinning can dramatically improve
    144      * throughput. And on less busy ones, the amount of spinning is
    145      * small enough not to be noticeable.
    146      *
    147      * Cleaning is done in different ways in queues vs stacks.  For
    148      * queues, we can almost always remove a node immediately in O(1)
    149      * time (modulo retries for consistency checks) when it is
    150      * cancelled. But if it may be pinned as the current tail, it must
    151      * wait until some subsequent cancellation. For stacks, we need a
    152      * potentially O(n) traversal to be sure that we can remove the
    153      * node, but this can run concurrently with other threads
    154      * accessing the stack.
    155      *
    156      * While garbage collection takes care of most node reclamation
    157      * issues that otherwise complicate nonblocking algorithms, care
    158      * is taken to "forget" references to data, other nodes, and
    159      * threads that might be held on to long-term by blocked
    160      * threads. In cases where setting to null would otherwise
    161      * conflict with main algorithms, this is done by changing a
    162      * node's link to now point to the node itself. This doesn't arise
    163      * much for Stack nodes (because blocked threads do not hang on to
    164      * old head pointers), but references in Queue nodes must be
    165      * aggressively forgotten to avoid reachability of everything any
    166      * node has ever referred to since arrival.
    167      */
    168 
    169     /**
    170      * Shared internal API for dual stacks and queues.
             *
             * <p>{@code TransferStack} and {@code TransferQueue} in this file
             * each extend this class and supply {@code transfer}, the one
             * method through which both puts and takes are performed.
    171      */
    172     abstract static class Transferer<E> {
    173         /**
    174          * Performs a put or take.
    175          *
                 * <p>The direction is chosen by {@code e} alone: a non-null
                 * argument makes the call a put, a null argument makes it a take.
                 *
    176          * @param e if non-null, the item to be handed to a consumer;
    177          *          if null, requests that transfer return an item
    178          *          offered by producer.
    179          * @param timed if this operation should timeout
    180          * @param nanos the timeout, in nanoseconds
                 *          ({@code TransferStack} consults it only when
                 *          {@code timed} is true)
    181          * @return if non-null, the item provided or received; if null,
    182          *         the operation failed due to timeout or interrupt --
    183          *         the caller can distinguish which of these occurred
    184          *         by checking Thread.interrupted.
    185          */
    186         abstract E transfer(E e, boolean timed, long nanos);
    187     }
    188 
    189     /**
    190      * The number of times to spin before blocking in timed waits.
    191      * The value is empirically derived -- it works well across a
    192      * variety of processors and OSes. Empirically, the best value
    193      * seems not to vary with number of CPUs (beyond 2) so is just
    194      * a constant.
             *
             * <p>Evaluates to 0 when {@code availableProcessors()} reports
             * fewer than 2 processors, and to 32 otherwise. The processor
             * count is sampled once, in this static initializer.
    195      */
    196     static final int MAX_TIMED_SPINS =
    197         (Runtime.getRuntime().availableProcessors() < 2) ? 0 : 32;
    198 
    199     /**
    200      * The number of times to spin before blocking in untimed waits.
    201      * This is greater than timed value because untimed waits spin
    202      * faster since they don't need to check times on each spin.
             *
             * <p>Derived from {@link #MAX_TIMED_SPINS}, so it is either 0
             * (no spinning) or 512.
    203      */
    204     static final int MAX_UNTIMED_SPINS = MAX_TIMED_SPINS * 16;
    205 
    206     /**
    207      * The number of nanoseconds for which it is faster to spin
    208      * rather than to use timed park. A rough estimate suffices.
             *
             * <p>{@code TransferStack.awaitFulfill} calls {@code parkNanos}
             * only while the remaining wait is strictly greater than this
             * value; with less time left it keeps looping without parking.
    209      */
    210     static final long SPIN_FOR_TIMEOUT_THRESHOLD = 1000L;
    211 
    212     /** Dual stack */
    213     static final class TransferStack<E> extends Transferer<E> {
    214         /*
    215          * This extends Scherer-Scott dual stack algorithm, differing,
    216          * among other ways, by using "covering" nodes rather than
    217          * bit-marked pointers: Fulfilling operations push on marker
    218          * nodes (with FULFILLING bit set in mode) to reserve a spot
    219          * to match a waiting node.
    220          */
    221 
    222         /* Modes for SNodes, ORed together in node fields */
                /*
                 * REQUEST and DATA differ only in the low bit; FULFILLING is a
                 * separate bit. transfer() builds a fulfilling node's mode as
                 * FULFILLING|mode, which gives 2 for a fulfilling take and 3
                 * for a fulfilling put.
                 */
    223         /** Node represents an unfulfilled consumer */
    224         static final int REQUEST    = 0;
    225         /** Node represents an unfulfilled producer */
    226         static final int DATA       = 1;
    227         /** Node is fulfilling another unfulfilled DATA or REQUEST */
    228         static final int FULFILLING = 2;
    229 
    230         /**
                 * Returns true if m has fulfilling bit set.
                 *
                 * @param m a node mode value (REQUEST or DATA, possibly ORed
                 *          with FULFILLING)
                 * @return true if the FULFILLING bit of {@code m} is non-zero
                 */
    231         static boolean isFulfilling(int m) { return (m & FULFILLING) != 0; }
    232 
    233         /**
                 * Node class for TransferStacks.
                 *
                 * <p>The {@code match} field encodes the node's state: null while
                 * the node is still waiting, a reference to another node once a
                 * fulfiller has matched it (see {@link #tryMatch}), or a
                 * reference to the node itself once it has been cancelled (see
                 * {@link #tryCancel}). Both transitions are CASes from null, so
                 * whichever happens first wins.
                 */
    234         static final class SNode {
    235             volatile SNode next;        // next node in stack
    236             volatile SNode match;       // the node matched to this
    237             volatile Thread waiter;     // to control park/unpark
    238             Object item;                // data; or null for REQUESTs
    239             int mode;                   // REQUEST or DATA, optionally ORed with FULFILLING; set by snode()
    240             // Note: item and mode fields don't need to be volatile
    241             // since they are always written before, and read after,
    242             // other volatile/atomic operations.
    243 
                    /**
                     * Creates a node holding {@code item}. The {@code mode} and
                     * {@code next} fields are left at their defaults here; the
                     * enclosing class's {@code snode} helper fills them in.
                     *
                     * @param item the data for a producer node, or null for a request
                     */
    244             SNode(Object item) {
    245                 this.item = item;
    246             }
    247 
                    /**
                     * Atomically sets {@code next} to {@code val} if it currently
                     * equals {@code cmp}. The volatile read is compared first, so
                     * the CAS is only attempted when it appears able to succeed.
                     *
                     * @param cmp the expected current successor
                     * @param val the replacement successor
                     * @return true if {@code next} was updated by this call
                     */
    248             boolean casNext(SNode cmp, SNode val) {
    249                 return cmp == next &&
    250                     U.compareAndSwapObject(this, NEXT, cmp, val);
    251             }
    252 
    253             /**
    254              * Tries to match node s to this node, if so, waking up thread.
    255              * Fulfillers call tryMatch to identify their waiters.
    256              * Waiters block until they have been matched.
    257              *
    258              * @param s the node to match
    259              * @return true if successfully matched to s
                     *         (either by this call or because {@code match}
                     *         already referred to {@code s})
    260              */
    261             boolean tryMatch(SNode s) {
    262                 if (match == null &&
    263                     U.compareAndSwapObject(this, MATCH, null, s)) {
                            // Only the thread whose CAS succeeded reaches this point,
                            // so the wake-up below is issued by that thread alone.
    264                     Thread w = waiter;
    265                     if (w != null) {    // waiters need at most one unpark
    266                         waiter = null;
    267                         LockSupport.unpark(w);
    268                     }
    269                     return true;
    270                 }
                        // CAS skipped or lost: still a success if match already
                        // holds s (for example, set by a helping thread); false if
                        // this node was cancelled or matched to a different node.
    271                 return match == s;
    272             }
    273 
    274             /**
    275              * Tries to cancel a wait by matching node to itself.
                     * Has no effect if {@code match} is already non-null, i.e.
                     * the node was matched or cancelled earlier.
    276              */
    277             void tryCancel() {
    278                 U.compareAndSwapObject(this, MATCH, null, this);
    279             }
    280 
                    /**
                     * Reports whether this node has been cancelled.
                     *
                     * @return true if {@code match} refers to this node itself
                     */
    281             boolean isCancelled() {
    282                 return match == this;
    283             }
    284 
    285             // Unsafe mechanics
                    // MATCH and NEXT are the field offsets of "match" and "next",
                    // looked up once by the static initializer and passed to
                    // compareAndSwapObject by the CAS helpers above.
    286             private static final sun.misc.Unsafe U = sun.misc.Unsafe.getUnsafe();
    287             private static final long MATCH;
    288             private static final long NEXT;
    289 
    290             static {
    291                 try {
    292                     MATCH = U.objectFieldOffset
    293                         (SNode.class.getDeclaredField("match"));
    294                     NEXT = U.objectFieldOffset
    295                         (SNode.class.getDeclaredField("next"));
    296                 } catch (ReflectiveOperationException e) {
                            // A failed field lookup is rethrown as an Error.
    297                     throw new Error(e);
    298                 }
    299             }
    300         }
    301 
    302         /** The head (top) of the stack */
    303         volatile SNode head;
    304 
                /**
                 * Atomically replaces {@code head} with {@code nh} if it
                 * currently equals {@code h}. The volatile read is compared
                 * first, so the CAS is only attempted when it appears able to
                 * succeed.
                 *
                 * @param h the expected current head (may be null for an empty stack)
                 * @param nh the new head (may be null)
                 * @return true if {@code head} was updated by this call
                 */
    305         boolean casHead(SNode h, SNode nh) {
    306             return h == head &&
    307                 U.compareAndSwapObject(this, HEAD, h, nh);
    308         }
    309 
    310         /**
    311          * Creates or resets fields of a node. Called only from transfer
    312          * where the node to push on stack is lazily created and
    313          * reused when possible to help reduce intervals between reads
    314          * and CASes of head and to avoid surges of garbage when CASes
    315          * to push nodes fail due to contention.
                 *
                 * <p>When {@code s} is non-null only its {@code mode} and
                 * {@code next} fields are rewritten; {@code e} is used solely
                 * when a new node has to be allocated.
                 *
                 * @param s an existing node to reuse, or null to allocate one
                 * @param e the item stored in a newly allocated node
                 * @param next the node that will follow the returned node
                 * @param mode the mode to store in the returned node
                 * @return {@code s} if it was non-null, otherwise the new node
    316          */
    317         static SNode snode(SNode s, Object e, SNode next, int mode) {
    318             if (s == null) s = new SNode(e);
                    // mode is a plain field; it is written before the volatile
                    // write to next.
    319             s.mode = mode;
    320             s.next = next;
    321             return s;
    322         }
    323 
    324         /**
    325          * Puts or takes an item.
                 *
                 * @param e the item to hand off for a put, or null for a take
                 * @param timed true if the call must not wait longer than
                 *        {@code nanos}; when true and {@code nanos <= 0} the
                 *        call returns null instead of pushing a waiting node
                 * @param nanos the timeout in nanoseconds, used only when
                 *        {@code timed} is true
                 * @return for a take, the matched producer's item; for a put,
                 *         the caller's own item; null if the call could not
                 *         wait or its wait was cancelled
    326          */
    327         @SuppressWarnings("unchecked")
    328         E transfer(E e, boolean timed, long nanos) {
    329             /*
    330              * Basic algorithm is to loop trying one of three actions:
    331              *
    332              * 1. If apparently empty or already containing nodes of same
    333              *    mode, try to push node on stack and wait for a match,
    334              *    returning it, or null if cancelled.
    335              *
    336              * 2. If apparently containing node of complementary mode,
    337              *    try to push a fulfilling node on to stack, match
    338              *    with corresponding waiting node, pop both from
    339              *    stack, and return matched item. The matching or
    340              *    unlinking might not actually be necessary because of
    341              *    other threads performing action 3:
    342              *
    343              * 3. If top of stack already holds another fulfilling node,
    344              *    help it out by doing its match and/or pop
    345              *    operations, and then continue. The code for helping
    346              *    is essentially the same as for fulfilling, except
    347              *    that it doesn't return the item.
    348              */
    349 
    350             SNode s = null; // constructed/reused as needed
                    // A null e makes this call a take (REQUEST); non-null a put (DATA).
    351             int mode = (e == null) ? REQUEST : DATA;
    352 
    353             for (;;) {
    354                 SNode h = head;
    355                 if (h == null || h.mode == mode) {  // empty or same-mode
    356                     if (timed && nanos <= 0L) {     // can't wait
    357                         if (h != null && h.isCancelled())
    358                             casHead(h, h.next);     // pop cancelled node
    359                         else
    360                             return null;
    361                     } else if (casHead(h, s = snode(s, e, h, mode))) {
                                // s is now on the stack; awaitFulfill returns s
                                // itself if the wait ended by cancellation.
    362                         SNode m = awaitFulfill(s, timed, nanos);
    363                         if (m == s) {               // wait was cancelled
    364                             clean(s);
    365                             return null;
    366                         }
    367                         if ((h = head) != null && h.next == s)
    368                             casHead(h, s.next);     // help s's fulfiller
                                // A take returns the matched node's item; a put
                                // returns the item this call supplied.
    369                         return (E) ((mode == REQUEST) ? m.item : s.item);
    370                     }
    371                 } else if (!isFulfilling(h.mode)) { // try to fulfill
    372                     if (h.isCancelled())            // already cancelled
    373                         casHead(h, h.next);         // pop and retry
    374                     else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) {
    375                         for (;;) { // loop until matched or waiters disappear
    376                             SNode m = s.next;       // m is s's match
    377                             if (m == null) {        // all waiters are gone
    378                                 casHead(s, null);   // pop fulfill node
    379                                 s = null;           // use new node next time
    380                                 break;              // restart main loop
    381                             }
                                    // mn is m's successor as read here; it becomes
                                    // the new head on a match, or s's new next if
                                    // m has to be unlinked.
    382                             SNode mn = m.next;
    383                             if (m.tryMatch(s)) {
    384                                 casHead(s, mn);     // pop both s and m
    385                                 return (E) ((mode == REQUEST) ? m.item : s.item);
    386                             } else                  // lost match
    387                                 s.casNext(m, mn);   // help unlink
    388                         }
    389                     }
    390                 } else {                            // help a fulfiller
                            // Same steps as the fulfilling branch above, performed
                            // on behalf of head node h, then the main loop retries.
    391                     SNode m = h.next;               // m is h's match
    392                     if (m == null)                  // waiter is gone
    393                         casHead(h, null);           // pop fulfilling node
    394                     else {
    395                         SNode mn = m.next;
    396                         if (m.tryMatch(h))          // help match
    397                             casHead(h, mn);         // pop both h and m
    398                         else                        // lost match
    399                             h.casNext(m, mn);       // help unlink
    400                     }
    401                 }
    402             }
    403         }
    404 
    405         /**
    406          * Spins/blocks until node s is matched by a fulfill operation.
                 *
                 * <p>The wait ends when {@code s.match} becomes non-null, either
                 * because a fulfiller matched the node or because this method
                 * cancelled it after observing an interrupt or an expired
                 * deadline. The thread's interrupt status is only tested with
                 * {@code isInterrupted()}, not cleared.
    407          *
    408          * @param s the waiting node
    409          * @param timed true if timed wait
    410          * @param nanos timeout value
                 *        (nanoseconds, counted from entry to this method;
                 *        unused when {@code timed} is false)
    411          * @return matched node, or s if cancelled
    412          */
    413         SNode awaitFulfill(SNode s, boolean timed, long nanos) {
    414             /*
    415              * When a node/thread is about to block, it sets its waiter
    416              * field and then rechecks state at least one more time
    417              * before actually parking, thus covering race vs
    418              * fulfiller noticing that waiter is non-null so should be
    419              * woken.
    420              *
    421              * When invoked by nodes that appear at the point of call
    422              * to be at the head of the stack, calls to park are
    423              * preceded by spins to avoid blocking when producers and
    424              * consumers are arriving very close in time.  This can
    425              * happen enough to bother only on multiprocessors.
    426              *
    427              * The order of checks for returning out of main loop
    428              * reflects fact that interrupts have precedence over
    429              * normal returns, which have precedence over
    430              * timeouts. (So, on timeout, one last check for match is
    431              * done before giving up.) Except that calls from untimed
    432              * SynchronousQueue.{poll/offer} don't check interrupts
    433              * and don't wait at all, so are trapped in transfer
    434              * method rather than calling awaitFulfill.
    435              */
                    // The absolute deadline is fixed once here; nanos is
                    // recomputed from it on every loop iteration below.
    436             final long deadline = timed ? System.nanoTime() + nanos : 0L;
    437             Thread w = Thread.currentThread();
    438             int spins = shouldSpin(s)
    439                 ? (timed ? MAX_TIMED_SPINS : MAX_UNTIMED_SPINS)
    440                 : 0;
    441             for (;;) {
                        // tryCancel is a CAS from null, so it does nothing if s
                        // was already matched; the read of s.match that follows
                        // then returns whichever of match or cancel came first.
    442                 if (w.isInterrupted())
    443                     s.tryCancel();
    444                 SNode m = s.match;
    445                 if (m != null)
    446                     return m;
    447                 if (timed) {
    448                     nanos = deadline - System.nanoTime();
    449                     if (nanos <= 0L) {
    450                         s.tryCancel();
                                // Loop again so the match check above decides
                                // the return value.
    451                         continue;
    452                     }
    453                 }
                        // One step per iteration: count down a spin, else
                        // publish this thread as waiter, else park. A timed
                        // wait with no more than SPIN_FOR_TIMEOUT_THRESHOLD
                        // nanoseconds left does not park and simply loops.
    454                 if (spins > 0)
    455                     spins = shouldSpin(s) ? (spins - 1) : 0;
    456                 else if (s.waiter == null)
    457                     s.waiter = w; // establish waiter so can park next iter
    458                 else if (!timed)
    459                     LockSupport.park(this);
    460                 else if (nanos > SPIN_FOR_TIMEOUT_THRESHOLD)
    461                     LockSupport.parkNanos(this, nanos);
    462             }
    463         }
    464 
    465         /**
    466          * Returns true if node s is at head or there is an active
    467          * fulfiller.
                 * Also returns true when the stack is observed to be empty
                 * (head is null). The decision is based on a single read of
                 * {@code head}.
                 *
                 * @param s the waiting node being considered for spinning
                 * @return true if the caller should keep spinning
    468          */
    469         boolean shouldSpin(SNode s) {
    470             SNode h = head;
    471             return (h == s || h == null || isFulfilling(h.mode));
    472         }
    473 
    474         /**
    475          * Unlinks s from the stack.
                 *
                 * <p>Called by {@code transfer} after {@code awaitFulfill}
                 * reported that the wait on {@code s} was cancelled. Besides
                 * {@code s}, any other cancelled node met between the head and
                 * the stopping point is unlinked as well.
                 *
                 * @param s the cancelled node; its {@code item} and
                 *          {@code waiter} references are cleared first
    476          */
    477         void clean(SNode s) {
    478             s.item = null;   // forget item
    479             s.waiter = null; // forget thread
    480 
    481             /*
    482              * At worst we may need to traverse entire stack to unlink
    483              * s. If there are multiple concurrent calls to clean, we
    484              * might not see s if another thread has already removed
    485              * it. But we can stop when we see any node known to
    486              * follow s. We use s.next unless it too is cancelled, in
    487              * which case we try the node one past. We don't check any
    488              * further because we don't want to doubly traverse just to
    489              * find sentinel.
    490              */
    491 
                    // past is the stopping point for both loops below; it may
                    // be null, in which case traversal runs to the end.
    492             SNode past = s.next;
    493             if (past != null && past.isCancelled())
    494                 past = past.next;
    495 
    496             // Absorb cancelled nodes at head
    497             SNode p;
    498             while ((p = head) != null && p != past && p.isCancelled())
    499                 casHead(p, p.next);
    500 
    501             // Unsplice embedded nodes
                    // p starts as the head value read by the last test of the
                    // loop above. It only advances when its successor is not
                    // cancelled; otherwise the successor is unlinked and the
                    // new successor is examined on the next pass.
    502             while (p != null && p != past) {
    503                 SNode n = p.next;
    504                 if (n != null && n.isCancelled())
    505                     p.casNext(n, n.next);
    506                 else
    507                     p = n;
    508             }
    509         }
    510 
    511         // Unsafe mechanics
                // HEAD is the field offset of "head", looked up once by the
                // static initializer and passed to compareAndSwapObject by
                // casHead.
    512         private static final sun.misc.Unsafe U = sun.misc.Unsafe.getUnsafe();
    513         private static final long HEAD;
    514         static {
    515             try {
    516                 HEAD = U.objectFieldOffset
    517                     (TransferStack.class.getDeclaredField("head"));
    518             } catch (ReflectiveOperationException e) {
                        // A failed field lookup is rethrown as an Error.
    519                 throw new Error(e);
    520             }
    521         }
    522     }
    523 
    524     /** Dual Queue */
    525     static final class TransferQueue<E> extends Transferer<E> {
    526         /*
    527          * This extends Scherer-Scott dual queue algorithm, differing,
    528          * among other ways, by using modes within nodes rather than
    529          * marked pointers. The algorithm is a little simpler than
    530          * that for stacks because fulfillers do not need explicit
    531          * nodes, and matching is done by CAS'ing QNode.item field
    532          * from non-null to null (for put) or vice versa (for take).
    533          */
    534 
    535         /**
                 * Node class for TransferQueue.
                 *
                 * <p>Two self-references act as markers: {@code item == this}
                 * means the node was cancelled (see {@link #tryCancel}), and
                 * {@code next == this} means the node has been taken off the
                 * list (see {@link #isOffList}).
                 */
    536         static final class QNode {
    537             volatile QNode next;          // next node in queue
    538             volatile Object item;         // CAS'ed to or from null
    539             volatile Thread waiter;       // to control park/unpark
    540             final boolean isData;         // node mode; transfer compares it with (e != null)
    541 
                    /**
                     * Creates a node with the given initial item and mode.
                     *
                     * @param item the initial value of the {@code item} field
                     * @param isData the node's mode, fixed for its lifetime
                     */
    542             QNode(Object item, boolean isData) {
    543                 this.item = item;
    544                 this.isData = isData;
    545             }
    546 
                    /**
                     * Atomically sets {@code next} to {@code val} if it currently
                     * equals {@code cmp}. The volatile read is compared first, so
                     * the CAS is only attempted when it appears able to succeed.
                     *
                     * @return true if {@code next} was updated by this call
                     */
    547             boolean casNext(QNode cmp, QNode val) {
    548                 return next == cmp &&
    549                     U.compareAndSwapObject(this, NEXT, cmp, val);
    550             }
    551 
                    /**
                     * Atomically sets {@code item} to {@code val} if it currently
                     * equals {@code cmp} (reference comparison), checking the
                     * volatile read before attempting the CAS.
                     *
                     * @return true if {@code item} was updated by this call
                     */
    552             boolean casItem(Object cmp, Object val) {
    553                 return item == cmp &&
    554                     U.compareAndSwapObject(this, ITEM, cmp, val);
    555             }
    556 
    557             /**
    558              * Tries to cancel by CAS'ing ref to this as item.
                     * Has no effect if {@code item} no longer equals {@code cmp}.
                     *
                     * @param cmp the item value the caller expects to still be present
    559              */
    560             void tryCancel(Object cmp) {
    561                 U.compareAndSwapObject(this, ITEM, cmp, this);
    562             }
    563 
                    /**
                     * Reports whether this node has been cancelled.
                     *
                     * @return true if {@code item} refers to this node itself
                     */
    564             boolean isCancelled() {
    565                 return item == this;
    566             }
    567 
    568             /**
    569              * Returns true if this node is known to be off the queue
    570              * because its next pointer has been forgotten due to
    571              * an advanceHead operation.
    572              */
    573             boolean isOffList() {
    574                 return next == this;
    575             }
    576 
    577             // Unsafe mechanics
                    // ITEM and NEXT are the field offsets of "item" and "next",
                    // looked up once by the static initializer and passed to
                    // compareAndSwapObject by the CAS helpers above.
    578             private static final sun.misc.Unsafe U = sun.misc.Unsafe.getUnsafe();
    579             private static final long ITEM;
    580             private static final long NEXT;
    581 
    582             static {
    583                 try {
    584                     ITEM = U.objectFieldOffset
    585                         (QNode.class.getDeclaredField("item"));
    586                     NEXT = U.objectFieldOffset
    587                         (QNode.class.getDeclaredField("next"));
    588                 } catch (ReflectiveOperationException e) {
                            // A failed field lookup is rethrown as an Error.
    589                     throw new Error(e);
    590                 }
    591             }
    592         }
    593 
    594         /** Head of queue; set to a dummy node by the constructor and moved by advanceHead. */
    595         transient volatile QNode head;
    596         /** Tail of queue; starts as the same dummy node as head and is moved by advanceTail. */
    597         transient volatile QNode tail;
    598         /**
    599          * Reference to a cancelled node that might not yet have been
    600          * unlinked from queue because it was the last inserted node
    601          * when it was cancelled.
                 * Not assigned by the constructor, so it starts out null;
                 * casCleanMe performs CAS updates of this field.
    602          */
    603         transient volatile QNode cleanMe;
    604 
                /**
                 * Creates an empty queue. {@code head} and {@code tail} both
                 * refer to one freshly created dummy node whose item is null
                 * and whose {@code isData} flag is false; {@code transfer}
                 * treats {@code head == tail} as the empty state.
                 */
    605         TransferQueue() {
    606             QNode h = new QNode(null, false); // initialize to dummy node.
    607             head = h;
    608             tail = h;
    609         }
    610 
    611         /**
    612          * Tries to cas nh as new head; if successful, unlink
    613          * old head's next node to avoid garbage retention.
                 * The old head is unlinked by pointing its {@code next} field
                 * at itself, which is the condition {@code QNode.isOffList()}
                 * tests for. Nothing happens if {@code head} is no longer
                 * {@code h}.
                 *
                 * @param h the expected current head
                 * @param nh the node to install as the new head
    614          */
    615         void advanceHead(QNode h, QNode nh) {
    616             if (h == head &&
    617                 U.compareAndSwapObject(this, HEAD, h, nh))
    618                 h.next = h; // forget old next
    619         }
    620 
    621         /**
    622          * Tries to cas nt as new tail.
                 * The CAS is attempted only if a volatile read shows
                 * {@code tail} still equal to {@code t}; its outcome is not
                 * reported to the caller.
                 *
                 * @param t the expected current tail
                 * @param nt the node to install as the new tail
    623          */
    624         void advanceTail(QNode t, QNode nt) {
    625             if (tail == t)
    626                 U.compareAndSwapObject(this, TAIL, t, nt);
    627         }
    628 
    629         /**
    630          * Tries to CAS cleanMe slot.
                 * The volatile read is compared first, so the CAS is only
                 * attempted when it appears able to succeed.
                 *
                 * @param cmp the expected current value of {@code cleanMe}
                 * @param val the replacement value
                 * @return true if {@code cleanMe} was updated by this call
    631          */
    632         boolean casCleanMe(QNode cmp, QNode val) {
    633             return cleanMe == cmp &&
    634                 U.compareAndSwapObject(this, CLEANME, cmp, val);
    635         }
    636 
    637         /**
    638          * Puts or takes an item.
                 *
                 * @param e if non-null, the item to hand off (the caller acts
                 *        as a producer); if null, the caller requests an item
                 *        (acts as a consumer)
                 * @param timed true if the wait is bounded by {@code nanos}
                 * @param nanos the maximum wait in nanoseconds; consulted only
                 *        when {@code timed} is true
                 * @return on a successful match, the item received (consumer)
                 *         or {@code e} itself (producer); {@code null} if a
                 *         timed call had no time left to wait, or if the wait
                 *         was cancelled
    639          */
    640         @SuppressWarnings("unchecked")
    641         E transfer(E e, boolean timed, long nanos) {
    642             /* Basic algorithm is to loop trying to take either of
    643              * two actions:
    644              *
    645              * 1. If queue apparently empty or holding same-mode nodes,
    646              *    try to add node to queue of waiters, wait to be
    647              *    fulfilled (or cancelled) and return matching item.
    648              *
    649              * 2. If queue apparently contains waiting items, and this
    650              *    call is of complementary mode, try to fulfill by CAS'ing
    651              *    item field of waiting node and dequeuing it, and then
    652              *    returning matching item.
    653              *
    654              * In each case, along the way, check for and try to help
    655              * advance head and tail on behalf of other stalled/slow
    656              * threads.
    657              *
    658              * The loop starts off with a null check guarding against
    659              * seeing uninitialized head or tail values. This never
    660              * happens in current SynchronousQueue, but could if
    661              * callers held non-volatile/final ref to the
    662              * transferer. The check is here anyway because it places
    663              * null checks at top of loop, which is usually faster
    664              * than having them implicitly interspersed.
    665              */
    666 
    667             QNode s = null; // constructed/reused as needed
    668             boolean isData = (e != null); // true: giving an item; false: requesting one
    669 
    670             for (;;) {
    671                 QNode t = tail;
    672                 QNode h = head;
    673                 if (t == null || h == null)         // saw uninitialized value
    674                     continue;                       // spin
    675 
    676                 if (h == t || t.isData == isData) { // empty or same-mode
    677                     QNode tn = t.next;
    678                     if (t != tail)                  // inconsistent read
    679                         continue;
    680                     if (tn != null) {               // lagging tail
    681                         advanceTail(t, tn);
    682                         continue;
    683                     }
    684                     if (timed && nanos <= 0L)       // can't wait
    685                         return null;
    686                     if (s == null)
    687                         s = new QNode(e, isData);
    688                     if (!t.casNext(null, s))        // failed to link in
    689                         continue;
    690 
    691                     advanceTail(t, s);              // swing tail and wait
    692                     Object x = awaitFulfill(s, e, timed, nanos);
    693                     if (x == s) {                   // wait was cancelled
    694                         clean(t, s);
    695                         return null;
    696                     }
    697 
    698                     if (!s.isOffList()) {           // not already unlinked
    699                         advanceHead(t, s);          // unlink if head
    700                         if (x != null)              // and forget fields
    701                             s.item = s;
    702                         s.waiter = null;
    703                     }
    704                     return (x != null) ? (E)x : e;  // requester gets x; giver gets e back
    705 
    706                 } else {                            // complementary-mode
    707                     QNode m = h.next;               // node to fulfill
    708                     if (t != tail || m == null || h != head)
    709                         continue;                   // inconsistent read
    710 
    711                     Object x = m.item;
    712                     if (isData == (x != null) ||    // m already fulfilled
    713                         x == m ||                   // m cancelled
    714                         !m.casItem(x, e)) {         // lost CAS
    715                         advanceHead(h, m);          // dequeue and retry
    716                         continue;
    717                     }
    718 
    719                     advanceHead(h, m);              // successfully fulfilled
    720                     LockSupport.unpark(m.waiter);   // wake the thread recorded on m
    721                     return (x != null) ? (E)x : e;  // requester gets x; giver gets e back
    722                 }
    723             }
    724         }
    725 
    726         /**
    727          * Spins/blocks until node s is fulfilled.
    728          *
    729          * @param s the waiting node
    730          * @param e the comparison value for checking match
    731          * @param timed true if timed wait
    732          * @param nanos timeout value
    733          * @return matched item, or s if cancelled
    734          */
    735         Object awaitFulfill(QNode s, E e, boolean timed, long nanos) {
    736             /* Same idea as TransferStack.awaitFulfill */
    737             final long deadline = timed ? System.nanoTime() + nanos : 0L;
    738             Thread w = Thread.currentThread();
    739             int spins = (head.next == s)
    740                 ? (timed ? MAX_TIMED_SPINS : MAX_UNTIMED_SPINS)
    741                 : 0;
    742             for (;;) {
    743                 if (w.isInterrupted())
    744                     s.tryCancel(e);
    745                 Object x = s.item;
    746                 if (x != e)
    747                     return x;
    748                 if (timed) {
    749                     nanos = deadline - System.nanoTime();
    750                     if (nanos <= 0L) {
    751                         s.tryCancel(e);
    752                         continue;
    753                     }
    754                 }
    755                 if (spins > 0)
    756                     --spins;
    757                 else if (s.waiter == null)
    758                     s.waiter = w;
    759                 else if (!timed)
    760                     LockSupport.park(this);
    761                 else if (nanos > SPIN_FOR_TIMEOUT_THRESHOLD)
    762                     LockSupport.parkNanos(this, nanos);
    763             }
    764         }
    765 
    766         /**
    767          * Gets rid of cancelled node s with original predecessor pred.
                 * If s cannot be unlinked yet because it is the last node,
                 * pred is recorded in cleanMe so a later call can finish
                 * the job.
                 *
                 * @param pred the node that preceded s when s was linked in
                 * @param s the cancelled node to unlink
    768          */
    769         void clean(QNode pred, QNode s) {
    770             s.waiter = null; // forget thread
    771             /*
    772              * At any given time, exactly one node on list cannot be
    773              * deleted -- the last inserted node. To accommodate this,
    774              * if we cannot delete s, we save its predecessor as
    775              * "cleanMe", deleting the previously saved version
    776              * first. At least one of node s or the node previously
    777              * saved can always be deleted, so this always terminates.
    778              */
    779             while (pred.next == s) { // Return early if already unlinked
    780                 QNode h = head;
    781                 QNode hn = h.next;   // Absorb cancelled first node as head
    782                 if (hn != null && hn.isCancelled()) {
    783                     advanceHead(h, hn);
    784                     continue;
    785                 }
    786                 QNode t = tail;      // Ensure consistent read for tail
    787                 if (t == h)
    788                     return;          // head == tail: queue is empty
    789                 QNode tn = t.next;
    790                 if (t != tail)
    791                     continue;        // inconsistent read; retry
    792                 if (tn != null) {    // lagging tail; help advance it
    793                     advanceTail(t, tn);
    794                     continue;
    795                 }
    796                 if (s != t) {        // If not tail, try to unsplice
    797                     QNode sn = s.next;
    798                     if (sn == s || pred.casNext(s, sn)) // already off list, or unlinked now
    799                         return;
    800                 }
    801                 QNode dp = cleanMe;
    802                 if (dp != null) {    // Try unlinking previous cancelled node
    803                     QNode d = dp.next;
    804                     QNode dn;
    805                     if (d == null ||               // d is gone or
    806                         d == dp ||                 // d is off list or
    807                         !d.isCancelled() ||        // d not cancelled or
    808                         (d != t &&                 // d not tail and
    809                          (dn = d.next) != null &&  //   has successor
    810                          dn != d &&                //   that is on list
    811                          dp.casNext(d, dn)))       // d unspliced
    812                         casCleanMe(dp, null);      // saved entry no longer needed
    813                     if (dp == pred)
    814                         return;      // s is already saved node
    815                 } else if (casCleanMe(null, pred))
    816                     return;          // Postpone cleaning s
    817             }
    818         }
    819 
    820         private static final sun.misc.Unsafe U = sun.misc.Unsafe.getUnsafe();
    821         private static final long HEAD;
    822         private static final long TAIL;
    823         private static final long CLEANME;
    824         static {
    825             try {
    826                 HEAD = U.objectFieldOffset
    827                     (TransferQueue.class.getDeclaredField("head"));
    828                 TAIL = U.objectFieldOffset
    829                     (TransferQueue.class.getDeclaredField("tail"));
    830                 CLEANME = U.objectFieldOffset
    831                     (TransferQueue.class.getDeclaredField("cleanMe"));
    832             } catch (ReflectiveOperationException e) {
    833                 throw new Error(e);
    834             }
    835         }
    836     }
    837 
    838     /**
    839      * The transferer. Set only in constructor, but cannot be declared
    840      * as final without further complicating serialization.  Since
    841      * this is accessed only at most once per public method, there
    842      * isn't a noticeable performance penalty for using volatile
    843      * instead of final here.
    844      */
    845     private transient volatile Transferer<E> transferer;
    846 
    /**
     * Creates a {@code SynchronousQueue} with nonfair access policy.
     * Equivalent to passing {@code false} to the fairness constructor.
     */
    public SynchronousQueue() {
        this(false);
    }
    853 
    /**
     * Creates a {@code SynchronousQueue} with the specified fairness policy.
     *
     * @param fair if true, waiting threads contend in FIFO order for
     *        access; otherwise the order is unspecified.
     */
    public SynchronousQueue(boolean fair) {
        if (fair)
            transferer = new TransferQueue<E>();
        else
            transferer = new TransferStack<E>();
    }
    863 
    864     /**
    865      * Adds the specified element to this queue, waiting if necessary for
    866      * another thread to receive it.
    867      *
    868      * @throws InterruptedException {@inheritDoc}
    869      * @throws NullPointerException {@inheritDoc}
    870      */
    871     public void put(E e) throws InterruptedException {
    872         if (e == null) throw new NullPointerException();
    873         if (transferer.transfer(e, false, 0) == null) {
    874             Thread.interrupted();
    875             throw new InterruptedException();
    876         }
    877     }
    878 
    879     /**
    880      * Inserts the specified element into this queue, waiting if necessary
    881      * up to the specified wait time for another thread to receive it.
    882      *
    883      * @return {@code true} if successful, or {@code false} if the
    884      *         specified waiting time elapses before a consumer appears
    885      * @throws InterruptedException {@inheritDoc}
    886      * @throws NullPointerException {@inheritDoc}
    887      */
    888     public boolean offer(E e, long timeout, TimeUnit unit)
    889         throws InterruptedException {
    890         if (e == null) throw new NullPointerException();
    891         if (transferer.transfer(e, true, unit.toNanos(timeout)) != null)
    892             return true;
    893         if (!Thread.interrupted())
    894             return false;
    895         throw new InterruptedException();
    896     }
    897 
    898     /**
    899      * Inserts the specified element into this queue, if another thread is
    900      * waiting to receive it.
    901      *
    902      * @param e the element to add
    903      * @return {@code true} if the element was added to this queue, else
    904      *         {@code false}
    905      * @throws NullPointerException if the specified element is null
    906      */
    907     public boolean offer(E e) {
    908         if (e == null) throw new NullPointerException();
    909         return transferer.transfer(e, true, 0) != null;
    910     }
    911 
    912     /**
    913      * Retrieves and removes the head of this queue, waiting if necessary
    914      * for another thread to insert it.
    915      *
    916      * @return the head of this queue
    917      * @throws InterruptedException {@inheritDoc}
    918      */
    919     public E take() throws InterruptedException {
    920         E e = transferer.transfer(null, false, 0);
    921         if (e != null)
    922             return e;
    923         Thread.interrupted();
    924         throw new InterruptedException();
    925     }
    926 
    927     /**
    928      * Retrieves and removes the head of this queue, waiting
    929      * if necessary up to the specified wait time, for another thread
    930      * to insert it.
    931      *
    932      * @return the head of this queue, or {@code null} if the
    933      *         specified waiting time elapses before an element is present
    934      * @throws InterruptedException {@inheritDoc}
    935      */
    936     public E poll(long timeout, TimeUnit unit) throws InterruptedException {
    937         E e = transferer.transfer(null, true, unit.toNanos(timeout));
    938         if (e != null || !Thread.interrupted())
    939             return e;
    940         throw new InterruptedException();
    941     }
    942 
    943     /**
    944      * Retrieves and removes the head of this queue, if another thread
    945      * is currently making an element available.
    946      *
    947      * @return the head of this queue, or {@code null} if no
    948      *         element is available
    949      */
    950     public E poll() {
    951         return transferer.transfer(null, true, 0);
    952     }
    953 
    954     /**
    955      * Always returns {@code true}.
    956      * A {@code SynchronousQueue} has no internal capacity.
    957      *
    958      * @return {@code true}
    959      */
    960     public boolean isEmpty() {
    961         return true;
    962     }
    963 
    964     /**
    965      * Always returns zero.
    966      * A {@code SynchronousQueue} has no internal capacity.
    967      *
    968      * @return zero
    969      */
    970     public int size() {
    971         return 0;
    972     }
    973 
    974     /**
    975      * Always returns zero.
    976      * A {@code SynchronousQueue} has no internal capacity.
    977      *
    978      * @return zero
    979      */
    980     public int remainingCapacity() {
    981         return 0;
    982     }
    983 
    984     /**
    985      * Does nothing.
    986      * A {@code SynchronousQueue} has no internal capacity.
    987      */
    988     public void clear() {
    989     }
    990 
    991     /**
    992      * Always returns {@code false}.
    993      * A {@code SynchronousQueue} has no internal capacity.
    994      *
    995      * @param o the element
    996      * @return {@code false}
    997      */
    998     public boolean contains(Object o) {
    999         return false;
   1000     }
   1001 
   1002     /**
   1003      * Always returns {@code false}.
   1004      * A {@code SynchronousQueue} has no internal capacity.
   1005      *
   1006      * @param o the element to remove
   1007      * @return {@code false}
   1008      */
   1009     public boolean remove(Object o) {
   1010         return false;
   1011     }
   1012 
   1013     /**
   1014      * Returns {@code false} unless the given collection is empty.
   1015      * A {@code SynchronousQueue} has no internal capacity.
   1016      *
   1017      * @param c the collection
   1018      * @return {@code false} unless given collection is empty
   1019      */
   1020     public boolean containsAll(Collection<?> c) {
   1021         return c.isEmpty();
   1022     }
   1023 
   1024     /**
   1025      * Always returns {@code false}.
   1026      * A {@code SynchronousQueue} has no internal capacity.
   1027      *
   1028      * @param c the collection
   1029      * @return {@code false}
   1030      */
   1031     public boolean removeAll(Collection<?> c) {
   1032         return false;
   1033     }
   1034 
   1035     /**
   1036      * Always returns {@code false}.
   1037      * A {@code SynchronousQueue} has no internal capacity.
   1038      *
   1039      * @param c the collection
   1040      * @return {@code false}
   1041      */
   1042     public boolean retainAll(Collection<?> c) {
   1043         return false;
   1044     }
   1045 
   1046     /**
   1047      * Always returns {@code null}.
   1048      * A {@code SynchronousQueue} does not return elements
   1049      * unless actively waited on.
   1050      *
   1051      * @return {@code null}
   1052      */
   1053     public E peek() {
   1054         return null;
   1055     }
   1056 
   1057     /**
   1058      * Returns an empty iterator in which {@code hasNext} always returns
   1059      * {@code false}.
   1060      *
   1061      * @return an empty iterator
   1062      */
   1063     public Iterator<E> iterator() {
   1064         return Collections.emptyIterator();
   1065     }
   1066 
   1067     /**
   1068      * Returns an empty spliterator in which calls to
   1069      * {@link java.util.Spliterator#trySplit()} always return {@code null}.
   1070      *
   1071      * @return an empty spliterator
   1072      * @since 1.8
   1073      */
   1074     public Spliterator<E> spliterator() {
   1075         return Spliterators.emptySpliterator();
   1076     }
   1077 
   1078     /**
   1079      * Returns a zero-length array.
   1080      * @return a zero-length array
   1081      */
   1082     public Object[] toArray() {
   1083         return new Object[0];
   1084     }
   1085 
   1086     /**
   1087      * Sets the zeroth element of the specified array to {@code null}
   1088      * (if the array has non-zero length) and returns it.
   1089      *
   1090      * @param a the array
   1091      * @return the specified array
   1092      * @throws NullPointerException if the specified array is null
   1093      */
   1094     public <T> T[] toArray(T[] a) {
   1095         if (a.length > 0)
   1096             a[0] = null;
   1097         return a;
   1098     }
   1099 
   1100     /**
   1101      * Always returns {@code "[]"}.
   1102      * @return {@code "[]"}
   1103      */
   1104     public String toString() {
   1105         return "[]";
   1106     }
   1107 
   1108     /**
   1109      * @throws UnsupportedOperationException {@inheritDoc}
   1110      * @throws ClassCastException            {@inheritDoc}
   1111      * @throws NullPointerException          {@inheritDoc}
   1112      * @throws IllegalArgumentException      {@inheritDoc}
   1113      */
   1114     public int drainTo(Collection<? super E> c) {
   1115         if (c == null)
   1116             throw new NullPointerException();
   1117         if (c == this)
   1118             throw new IllegalArgumentException();
   1119         int n = 0;
   1120         for (E e; (e = poll()) != null;) {
   1121             c.add(e);
   1122             ++n;
   1123         }
   1124         return n;
   1125     }
   1126 
   1127     /**
   1128      * @throws UnsupportedOperationException {@inheritDoc}
   1129      * @throws ClassCastException            {@inheritDoc}
   1130      * @throws NullPointerException          {@inheritDoc}
   1131      * @throws IllegalArgumentException      {@inheritDoc}
   1132      */
   1133     public int drainTo(Collection<? super E> c, int maxElements) {
   1134         if (c == null)
   1135             throw new NullPointerException();
   1136         if (c == this)
   1137             throw new IllegalArgumentException();
   1138         int n = 0;
   1139         for (E e; n < maxElements && (e = poll()) != null;) {
   1140             c.add(e);
   1141             ++n;
   1142         }
   1143         return n;
   1144     }
   1145 
   1146     /*
   1147      * To cope with serialization strategy in the 1.5 version of
   1148      * SynchronousQueue, we declare some unused classes and fields
   1149      * that exist solely to enable serializability across versions.
   1150      * These fields are never used, so are initialized only if this
   1151      * object is ever serialized or deserialized.
   1152      */
   1153 
   1154     @SuppressWarnings("serial")
   1155     static class WaitQueue implements java.io.Serializable { }
   1156     static class LifoWaitQueue extends WaitQueue {
   1157         private static final long serialVersionUID = -3633113410248163686L;
   1158     }
   1159     static class FifoWaitQueue extends WaitQueue {
   1160         private static final long serialVersionUID = -3623113410248163686L;
   1161     }
   1162     private ReentrantLock qlock;
   1163     private WaitQueue waitingProducers;
   1164     private WaitQueue waitingConsumers;
   1165 
   1166     /**
   1167      * Saves this queue to a stream (that is, serializes it).
   1168      * @param s the stream
   1169      * @throws java.io.IOException if an I/O error occurs
   1170      */
   1171     private void writeObject(java.io.ObjectOutputStream s)
   1172         throws java.io.IOException {
   1173         boolean fair = transferer instanceof TransferQueue;
   1174         if (fair) {
   1175             qlock = new ReentrantLock(true);
   1176             waitingProducers = new FifoWaitQueue();
   1177             waitingConsumers = new FifoWaitQueue();
   1178         }
   1179         else {
   1180             qlock = new ReentrantLock();
   1181             waitingProducers = new LifoWaitQueue();
   1182             waitingConsumers = new LifoWaitQueue();
   1183         }
   1184         s.defaultWriteObject();
   1185     }
   1186 
   1187     /**
   1188      * Reconstitutes this queue from a stream (that is, deserializes it).
   1189      * @param s the stream
   1190      * @throws ClassNotFoundException if the class of a serialized object
   1191      *         could not be found
   1192      * @throws java.io.IOException if an I/O error occurs
   1193      */
   1194     private void readObject(java.io.ObjectInputStream s)
   1195         throws java.io.IOException, ClassNotFoundException {
   1196         s.defaultReadObject();
   1197         if (waitingProducers instanceof FifoWaitQueue)
   1198             transferer = new TransferQueue<E>();
   1199         else
   1200             transferer = new TransferStack<E>();
   1201     }
   1202 
   1203     static {
   1204         // Reduce the risk of rare disastrous classloading in first call to
   1205         // LockSupport.park: https://bugs.openjdk.java.net/browse/JDK-8074773
   1206         Class<?> ensureLoaded = LockSupport.class;
   1207     }
   1208 }
   1209