Home | History | Annotate | Download | only in concurrent
      1 /*
      2  * Written by Doug Lea, Bill Scherer, and Michael Scott with
      3  * assistance from members of JCP JSR-166 Expert Group and released to
      4  * the public domain, as explained at
      5  * http://creativecommons.org/licenses/publicdomain
      6  */
      7 
      8 package java.util.concurrent;
      9 import java.util.concurrent.locks.*;
     10 import java.util.*;
     11 import libcore.util.EmptyArray;
     12 
     13 // BEGIN android-note
     14 // removed link to collections framework docs
     15 // END android-note
     16 
     17 /**
     18  * A {@linkplain BlockingQueue blocking queue} in which each insert
     19  * operation must wait for a corresponding remove operation by another
     20  * thread, and vice versa.  A synchronous queue does not have any
     21  * internal capacity, not even a capacity of one.  You cannot
     22  * <tt>peek</tt> at a synchronous queue because an element is only
     23  * present when you try to remove it; you cannot insert an element
     24  * (using any method) unless another thread is trying to remove it;
     25  * you cannot iterate as there is nothing to iterate.  The
     26  * <em>head</em> of the queue is the element that the first queued
     27  * inserting thread is trying to add to the queue; if there is no such
     28  * queued thread then no element is available for removal and
     29  * <tt>poll()</tt> will return <tt>null</tt>.  For purposes of other
     30  * <tt>Collection</tt> methods (for example <tt>contains</tt>), a
     31  * <tt>SynchronousQueue</tt> acts as an empty collection.  This queue
     32  * does not permit <tt>null</tt> elements.
     33  *
     34  * <p>Synchronous queues are similar to rendezvous channels used in
     35  * CSP and Ada. They are well suited for handoff designs, in which an
     36  * object running in one thread must sync up with an object running
     37  * in another thread in order to hand it some information, event, or
     38  * task.
     39  *
     40  * <p> This class supports an optional fairness policy for ordering
     41  * waiting producer and consumer threads.  By default, this ordering
     42  * is not guaranteed. However, a queue constructed with fairness set
     43  * to <tt>true</tt> grants threads access in FIFO order.
     44  *
     45  * <p>This class and its iterator implement all of the
     46  * <em>optional</em> methods of the {@link Collection} and {@link
     47  * Iterator} interfaces.
     48  *
     49  * @since 1.5
     50  * @author Doug Lea and Bill Scherer and Michael Scott
     51  * @param <E> the type of elements held in this collection
     52  */
     53 public class SynchronousQueue<E> extends AbstractQueue<E>
     54     implements BlockingQueue<E>, java.io.Serializable {
     55     private static final long serialVersionUID = -3223113410248163686L;
     56 
     57     /*
     58      * This class implements extensions of the dual stack and dual
     59      * queue algorithms described in "Nonblocking Concurrent Objects
     60      * with Condition Synchronization", by W. N. Scherer III and
     61      * M. L. Scott.  18th Annual Conf. on Distributed Computing,
     62      * Oct. 2004 (see also
     63      * http://www.cs.rochester.edu/u/scott/synchronization/pseudocode/duals.html).
     64      * The (Lifo) stack is used for non-fair mode, and the (Fifo)
     65      * queue for fair mode. The performance of the two is generally
     66      * similar. Fifo usually supports higher throughput under
     67      * contention but Lifo maintains higher thread locality in common
     68      * applications.
     69      *
     70      * A dual queue (and similarly stack) is one that at any given
     71      * time either holds "data" -- items provided by put operations,
     72      * or "requests" -- slots representing take operations, or is
     73      * empty. A call to "fulfill" (i.e., a call requesting an item
     74      * from a queue holding data or vice versa) dequeues a
     75      * complementary node.  The most interesting feature of these
     76      * queues is that any operation can figure out which mode the
     77      * queue is in, and act accordingly without needing locks.
     78      *
     79      * Both the queue and stack extend abstract class Transferer
     80      * defining the single method transfer that does a put or a
     81      * take. These are unified into a single method because in dual
     82      * data structures, the put and take operations are symmetrical,
     83      * so nearly all code can be combined. The resulting transfer
     84      * methods are on the long side, but are easier to follow than
     85      * they would be if broken up into nearly-duplicated parts.
     86      *
     87      * The queue and stack data structures share many conceptual
     88      * similarities but very few concrete details. For simplicity,
     89      * they are kept distinct so that they can later evolve
     90      * separately.
     91      *
     92      * The algorithms here differ from the versions in the above paper
     93      * in extending them for use in synchronous queues, as well as
     94      * dealing with cancellation. The main differences include:
     95      *
     96      *  1. The original algorithms used bit-marked pointers, but
     97      *     the ones here use mode bits in nodes, leading to a number
     98      *     of further adaptations.
     99      *  2. SynchronousQueues must block threads waiting to become
    100      *     fulfilled.
    101      *  3. Support for cancellation via timeout and interrupts,
    102      *     including cleaning out cancelled nodes/threads
    103      *     from lists to avoid garbage retention and memory depletion.
    104      *
    105      * Blocking is mainly accomplished using LockSupport park/unpark,
    106      * except that nodes that appear to be the next ones to become
    107      * fulfilled first spin a bit (on multiprocessors only). On very
    108      * busy synchronous queues, spinning can dramatically improve
    109      * throughput. And on less busy ones, the amount of spinning is
    110      * small enough not to be noticeable.
    111      *
    112      * Cleaning is done in different ways in queues vs stacks.  For
    113      * queues, we can almost always remove a node immediately in O(1)
    114      * time (modulo retries for consistency checks) when it is
    115      * cancelled. But if it may be pinned as the current tail, it must
    116      * wait until some subsequent cancellation. For stacks, we need a
    117      * potentially O(n) traversal to be sure that we can remove the
    118      * node, but this can run concurrently with other threads
    119      * accessing the stack.
    120      *
    121      * While garbage collection takes care of most node reclamation
    122      * issues that otherwise complicate nonblocking algorithms, care
    123      * is taken to "forget" references to data, other nodes, and
    124      * threads that might be held on to long-term by blocked
    125      * threads. In cases where setting to null would otherwise
    126      * conflict with main algorithms, this is done by changing a
    127      * node's link to now point to the node itself. This doesn't arise
    128      * much for Stack nodes (because blocked threads do not hang on to
    129      * old head pointers), but references in Queue nodes must be
    130      * aggressively forgotten to avoid reachability of everything any
    131      * node has ever referred to since arrival.
    132      */
    133 
    /**
     * Shared internal API for dual stacks and queues.
     *
     * <p>Both {@code TransferStack} and {@code TransferQueue} extend this
     * class, so a put and a take are expressed through the one
     * {@link #transfer} method; the direction is selected solely by
     * whether the item argument is null.
     */
    abstract static class Transferer {
        /**
         * Performs a put or take.
         *
         * @param e if non-null, the item to be handed to a consumer;
         *          if null, requests that transfer return an item
         *          offered by producer.
         * @param timed if this operation should timeout
         * @param nanos the timeout, in nanoseconds; only meaningful
         *          when {@code timed} is true
         * @return if non-null, the item provided or received; if null,
         *         the operation failed due to timeout or interrupt --
         *         the caller can distinguish which of these occurred
         *         by checking Thread.interrupted.
         */
        abstract Object transfer(Object e, boolean timed, long nanos);
    }
    153 
    154     /** The number of CPUs, for spin control */
    155     static final int NCPUS = Runtime.getRuntime().availableProcessors();
    156 
    157     /**
    158      * The number of times to spin before blocking in timed waits.
    159      * The value is empirically derived -- it works well across a
    160      * variety of processors and OSes. Empirically, the best value
    161      * seems not to vary with number of CPUs (beyond 2) so is just
    162      * a constant.
    163      */
    164     static final int maxTimedSpins = (NCPUS < 2) ? 0 : 32;
    165 
    166     /**
    167      * The number of times to spin before blocking in untimed waits.
    168      * This is greater than timed value because untimed waits spin
    169      * faster since they don't need to check times on each spin.
    170      */
    171     static final int maxUntimedSpins = maxTimedSpins * 16;
    172 
    173     /**
    174      * The number of nanoseconds for which it is faster to spin
    175      * rather than to use timed park. A rough estimate suffices.
    176      */
    177     static final long spinForTimeoutThreshold = 1000L;
    178 
    179     /** Dual stack */
    180     static final class TransferStack extends Transferer {
    181         /*
    182          * This extends Scherer-Scott dual stack algorithm, differing,
    183          * among other ways, by using "covering" nodes rather than
    184          * bit-marked pointers: Fulfilling operations push on marker
    185          * nodes (with FULFILLING bit set in mode) to reserve a spot
    186          * to match a waiting node.
    187          */
    188 
    189         /* Modes for SNodes, ORed together in node fields */
    190         /** Node represents an unfulfilled consumer */
    191         static final int REQUEST    = 0;
    192         /** Node represents an unfulfilled producer */
    193         static final int DATA       = 1;
    194         /** Node is fulfilling another unfulfilled DATA or REQUEST */
    195         static final int FULFILLING = 2;
    196 
    197         /** Return true if m has fulfilling bit set */
    198         static boolean isFulfilling(int m) { return (m & FULFILLING) != 0; }
    199 
    200         /** Node class for TransferStacks. */
    201         static final class SNode {
    202             volatile SNode next;        // next node in stack
    203             volatile SNode match;       // the node matched to this
    204             volatile Thread waiter;     // to control park/unpark
    205             Object item;                // data; or null for REQUESTs
    206             int mode;
    207             // Note: item and mode fields don't need to be volatile
    208             // since they are always written before, and read after,
    209             // other volatile/atomic operations.
    210 
    211             SNode(Object item) {
    212                 this.item = item;
    213             }
    214 
    215             boolean casNext(SNode cmp, SNode val) {
    216                 return cmp == next &&
    217                     UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
    218             }
    219 
    220             /**
    221              * Tries to match node s to this node, if so, waking up thread.
    222              * Fulfillers call tryMatch to identify their waiters.
    223              * Waiters block until they have been matched.
    224              *
    225              * @param s the node to match
    226              * @return true if successfully matched to s
    227              */
    228             boolean tryMatch(SNode s) {
    229                 if (match == null &&
    230                     UNSAFE.compareAndSwapObject(this, matchOffset, null, s)) {
    231                     Thread w = waiter;
    232                     if (w != null) {    // waiters need at most one unpark
    233                         waiter = null;
    234                         LockSupport.unpark(w);
    235                     }
    236                     return true;
    237                 }
    238                 return match == s;
    239             }
    240 
    241             /**
    242              * Tries to cancel a wait by matching node to itself.
    243              */
    244             void tryCancel() {
    245                 UNSAFE.compareAndSwapObject(this, matchOffset, null, this);
    246             }
    247 
    248             boolean isCancelled() {
    249                 return match == this;
    250             }
    251 
    252             // Unsafe mechanics
    253             private static final sun.misc.Unsafe UNSAFE = sun.misc.Unsafe.getUnsafe();
    254             private static final long nextOffset =
    255                 objectFieldOffset(UNSAFE, "next", SNode.class);
    256             private static final long matchOffset =
    257                 objectFieldOffset(UNSAFE, "match", SNode.class);
    258 
    259         }
    260 
    261         /** The head (top) of the stack */
    262         volatile SNode head;
    263 
    264         boolean casHead(SNode h, SNode nh) {
    265             return h == head &&
    266                 UNSAFE.compareAndSwapObject(this, headOffset, h, nh);
    267         }
    268 
    269         /**
    270          * Creates or resets fields of a node. Called only from transfer
    271          * where the node to push on stack is lazily created and
    272          * reused when possible to help reduce intervals between reads
    273          * and CASes of head and to avoid surges of garbage when CASes
    274          * to push nodes fail due to contention.
    275          */
    276         static SNode snode(SNode s, Object e, SNode next, int mode) {
    277             if (s == null) s = new SNode(e);
    278             s.mode = mode;
    279             s.next = next;
    280             return s;
    281         }
    282 
        /**
         * Puts or takes an item.
         *
         * <p>The direction is chosen from {@code e}: a null {@code e}
         * makes this call a REQUEST (take), a non-null {@code e} makes it
         * a DATA (put).
         *
         * @param e the item to hand off, or null to receive one
         * @param timed whether the call may give up after a timeout
         * @param nanos the timeout in nanoseconds when {@code timed};
         *        a timed call with {@code nanos <= 0} never waits
         * @return for a REQUEST, the item of the matched node; for a
         *         DATA, this call's own item; null if the call could
         *         not wait or its wait was cancelled
         */
        Object transfer(Object e, boolean timed, long nanos) {
            /*
             * Basic algorithm is to loop trying one of three actions:
             *
             * 1. If apparently empty or already containing nodes of same
             *    mode, try to push node on stack and wait for a match,
             *    returning it, or null if cancelled.
             *
             * 2. If apparently containing node of complementary mode,
             *    try to push a fulfilling node on to stack, match
             *    with corresponding waiting node, pop both from
             *    stack, and return matched item. The matching or
             *    unlinking might not actually be necessary because of
             *    other threads performing action 3:
             *
             * 3. If top of stack already holds another fulfilling node,
             *    help it out by doing its match and/or pop
             *    operations, and then continue. The code for helping
             *    is essentially the same as for fulfilling, except
             *    that it doesn't return the item.
             */

            SNode s = null; // constructed/reused as needed
            int mode = (e == null) ? REQUEST : DATA;

            for (;;) {
                SNode h = head;
                // Action 1: push a waiting node of our own mode.
                if (h == null || h.mode == mode) {  // empty or same-mode
                    if (timed && nanos <= 0) {      // can't wait
                        if (h != null && h.isCancelled())
                            casHead(h, h.next);     // pop cancelled node
                        else
                            return null;
                    } else if (casHead(h, s = snode(s, e, h, mode))) {
                        SNode m = awaitFulfill(s, timed, nanos);
                        if (m == s) {               // wait was cancelled
                            clean(s);
                            return null;
                        }
                        // The fulfiller's node may still sit directly
                        // above s; if so, pop both on its behalf.
                        if ((h = head) != null && h.next == s)
                            casHead(h, s.next);     // help s's fulfiller
                        return (mode == REQUEST) ? m.item : s.item;
                    }
                // Action 2: head is a plain waiter of the other mode.
                } else if (!isFulfilling(h.mode)) { // try to fulfill
                    if (h.isCancelled())            // already cancelled
                        casHead(h, h.next);         // pop and retry
                    else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) {
                        for (;;) { // loop until matched or waiters disappear
                            SNode m = s.next;       // m is s's match
                            if (m == null) {        // all waiters are gone
                                casHead(s, null);   // pop fulfill node
                                s = null;           // use new node next time
                                break;              // restart main loop
                            }
                            // Read m.next before tryMatch so the same
                            // successor is used for the pop or unlink.
                            SNode mn = m.next;
                            if (m.tryMatch(s)) {
                                casHead(s, mn);     // pop both s and m
                                return (mode == REQUEST) ? m.item : s.item;
                            } else                  // lost match
                                s.casNext(m, mn);   // help unlink
                        }
                    }
                // Action 3: head is another thread's fulfilling node.
                } else {                            // help a fulfiller
                    SNode m = h.next;               // m is h's match
                    if (m == null)                  // waiter is gone
                        casHead(h, null);           // pop fulfilling node
                    else {
                        SNode mn = m.next;
                        if (m.tryMatch(h))          // help match
                            casHead(h, mn);         // pop both h and m
                        else                        // lost match
                            h.casNext(m, mn);       // help unlink
                    }
                }
            }
        }
    362 
    363         /**
    364          * Spins/blocks until node s is matched by a fulfill operation.
    365          *
    366          * @param s the waiting node
    367          * @param timed true if timed wait
    368          * @param nanos timeout value
    369          * @return matched node, or s if cancelled
    370          */
    371         SNode awaitFulfill(SNode s, boolean timed, long nanos) {
    372             /*
    373              * When a node/thread is about to block, it sets its waiter
    374              * field and then rechecks state at least one more time
    375              * before actually parking, thus covering race vs
    376              * fulfiller noticing that waiter is non-null so should be
    377              * woken.
    378              *
    379              * When invoked by nodes that appear at the point of call
    380              * to be at the head of the stack, calls to park are
    381              * preceded by spins to avoid blocking when producers and
    382              * consumers are arriving very close in time.  This can
    383              * happen enough to bother only on multiprocessors.
    384              *
    385              * The order of checks for returning out of main loop
    386              * reflects fact that interrupts have precedence over
    387              * normal returns, which have precedence over
    388              * timeouts. (So, on timeout, one last check for match is
    389              * done before giving up.) Except that calls from untimed
    390              * SynchronousQueue.{poll/offer} don't check interrupts
    391              * and don't wait at all, so are trapped in transfer
    392              * method rather than calling awaitFulfill.
    393              */
    394             long lastTime = timed ? System.nanoTime() : 0;
    395             Thread w = Thread.currentThread();
    396             SNode h = head;
    397             int spins = (shouldSpin(s) ?
    398                          (timed ? maxTimedSpins : maxUntimedSpins) : 0);
    399             for (;;) {
    400                 if (w.isInterrupted())
    401                     s.tryCancel();
    402                 SNode m = s.match;
    403                 if (m != null)
    404                     return m;
    405                 if (timed) {
    406                     long now = System.nanoTime();
    407                     nanos -= now - lastTime;
    408                     lastTime = now;
    409                     if (nanos <= 0) {
    410                         s.tryCancel();
    411                         continue;
    412                     }
    413                 }
    414                 if (spins > 0)
    415                     spins = shouldSpin(s) ? (spins-1) : 0;
    416                 else if (s.waiter == null)
    417                     s.waiter = w; // establish waiter so can park next iter
    418                 else if (!timed)
    419                     LockSupport.park(this);
    420                 else if (nanos > spinForTimeoutThreshold)
    421                     LockSupport.parkNanos(this, nanos);
    422             }
    423         }
    424 
    425         /**
    426          * Returns true if node s is at head or there is an active
    427          * fulfiller.
    428          */
    429         boolean shouldSpin(SNode s) {
    430             SNode h = head;
    431             return (h == s || h == null || isFulfilling(h.mode));
    432         }
    433 
        /**
         * Unlinks s from the stack.
         *
         * <p>Clears the item and waiter references held by s, then pops
         * cancelled nodes off the head and unsplices cancelled nodes
         * found while walking down the stack. The walk removes every
         * cancelled node it meets, not only s, and stops at the first
         * node known to lie beyond s.
         *
         * @param s the node to unlink; transfer calls this after the
         *        wait on s was cancelled
         */
        void clean(SNode s) {
            s.item = null;   // forget item
            s.waiter = null; // forget thread

            /*
             * At worst we may need to traverse entire stack to unlink
             * s. If there are multiple concurrent calls to clean, we
             * might not see s if another thread has already removed
             * it. But we can stop when we see any node known to
             * follow s. We use s.next unless it too is cancelled, in
             * which case we try the node one past. We don't check any
             * further because we don't want to doubly traverse just to
             * find sentinel.
             */

            // "past" is the sentinel at which the traversal stops.
            SNode past = s.next;
            if (past != null && past.isCancelled())
                past = past.next;

            // Absorb cancelled nodes at head
            SNode p;
            while ((p = head) != null && p != past && p.isCancelled())
                casHead(p, p.next);

            // Unsplice embedded nodes
            while (p != null && p != past) {
                SNode n = p.next;
                if (n != null && n.isCancelled())
                    p.casNext(n, n.next);   // stay on p: its new successor may be cancelled too
                else
                    p = n;
            }
        }
    470 
        // Unsafe mechanics
        // UNSAFE and headOffset are what casHead uses to compare-and-swap
        // the head field. objectFieldOffset is a helper declared outside
        // this class.
        private static final sun.misc.Unsafe UNSAFE = sun.misc.Unsafe.getUnsafe();
        private static final long headOffset =
            objectFieldOffset(UNSAFE, "head", TransferStack.class);
    475 
    476     }
    477 
    478     /** Dual Queue */
    479     static final class TransferQueue extends Transferer {
    480         /*
    481          * This extends Scherer-Scott dual queue algorithm, differing,
    482          * among other ways, by using modes within nodes rather than
    483          * marked pointers. The algorithm is a little simpler than
    484          * that for stacks because fulfillers do not need explicit
    485          * nodes, and matching is done by CAS'ing QNode.item field
    486          * from non-null to null (for put) or vice versa (for take).
    487          */
    488 
    489         /** Node class for TransferQueue. */
    490         static final class QNode {
    491             volatile QNode next;          // next node in queue
    492             volatile Object item;         // CAS'ed to or from null
    493             volatile Thread waiter;       // to control park/unpark
    494             final boolean isData;
    495 
    496             QNode(Object item, boolean isData) {
    497                 this.item = item;
    498                 this.isData = isData;
    499             }
    500 
    501             boolean casNext(QNode cmp, QNode val) {
    502                 return next == cmp &&
    503                     UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
    504             }
    505 
    506             boolean casItem(Object cmp, Object val) {
    507                 return item == cmp &&
    508                     UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
    509             }
    510 
    511             /**
    512              * Tries to cancel by CAS'ing ref to this as item.
    513              */
    514             void tryCancel(Object cmp) {
    515                 UNSAFE.compareAndSwapObject(this, itemOffset, cmp, this);
    516             }
    517 
    518             boolean isCancelled() {
    519                 return item == this;
    520             }
    521 
    522             /**
    523              * Returns true if this node is known to be off the queue
    524              * because its next pointer has been forgotten due to
    525              * an advanceHead operation.
    526              */
    527             boolean isOffList() {
    528                 return next == this;
    529             }
    530 
    531             // Unsafe mechanics
    532             private static final sun.misc.Unsafe UNSAFE = sun.misc.Unsafe.getUnsafe();
    533             private static final long nextOffset =
    534                 objectFieldOffset(UNSAFE, "next", QNode.class);
    535             private static final long itemOffset =
    536                 objectFieldOffset(UNSAFE, "item", QNode.class);
    537         }
    538 
    539         /** Head of queue */
    540         transient volatile QNode head;
    541         /** Tail of queue */
    542         transient volatile QNode tail;
    543         /**
    544          * Reference to a cancelled node that might not yet have been
    545          * unlinked from queue because it was the last inserted node
    546          * when it cancelled.
    547          */
    548         transient volatile QNode cleanMe;
    549 
    550         TransferQueue() {
    551             QNode h = new QNode(null, false); // initialize to dummy node.
    552             head = h;
    553             tail = h;
    554         }
    555 
    556         /**
    557          * Tries to cas nh as new head; if successful, unlink
    558          * old head's next node to avoid garbage retention.
    559          */
    560         void advanceHead(QNode h, QNode nh) {
    561             if (h == head &&
    562                 UNSAFE.compareAndSwapObject(this, headOffset, h, nh))
    563                 h.next = h; // forget old next
    564         }
    565 
    566         /**
    567          * Tries to cas nt as new tail.
    568          */
    569         void advanceTail(QNode t, QNode nt) {
    570             if (tail == t)
    571                 UNSAFE.compareAndSwapObject(this, tailOffset, t, nt);
    572         }
    573 
    574         /**
    575          * Tries to CAS cleanMe slot.
    576          */
    577         boolean casCleanMe(QNode cmp, QNode val) {
    578             return cleanMe == cmp &&
    579                 UNSAFE.compareAndSwapObject(this, cleanMeOffset, cmp, val);
    580         }
    581 
        /**
         * Puts or takes an item.
         *
         * <p>A non-null {@code e} makes this call a put (data mode); a
         * null {@code e} makes it a take.
         *
         * @param e the item to hand off, or null to receive one
         * @param timed whether the call may give up after a timeout
         * @param nanos the timeout in nanoseconds when {@code timed};
         *        a timed call with {@code nanos <= 0} returns null
         *        instead of enqueuing a waiter
         * @return the item received (for a take) or {@code e} itself
         *         (for a put); null if the call could not wait or its
         *         wait was cancelled
         */
        Object transfer(Object e, boolean timed, long nanos) {
            /* Basic algorithm is to loop trying to take either of
             * two actions:
             *
             * 1. If queue apparently empty or holding same-mode nodes,
             *    try to add node to queue of waiters, wait to be
             *    fulfilled (or cancelled) and return matching item.
             *
             * 2. If queue apparently contains waiting items, and this
             *    call is of complementary mode, try to fulfill by CAS'ing
             *    item field of waiting node and dequeuing it, and then
             *    returning matching item.
             *
             * In each case, along the way, check for and try to help
             * advance head and tail on behalf of other stalled/slow
             * threads.
             *
             * The loop starts off with a null check guarding against
             * seeing uninitialized head or tail values. This never
             * happens in current SynchronousQueue, but could if
             * callers held non-volatile/final ref to the
             * transferer. The check is here anyway because it places
             * null checks at top of loop, which is usually faster
             * than having them implicitly interspersed.
             */

            QNode s = null; // constructed/reused as needed
            boolean isData = (e != null);

            for (;;) {
                QNode t = tail;
                QNode h = head;
                if (t == null || h == null)         // saw uninitialized value
                    continue;                       // spin

                // Action 1: enqueue a waiter behind the current tail.
                if (h == t || t.isData == isData) { // empty or same-mode
                    QNode tn = t.next;
                    if (t != tail)                  // inconsistent read
                        continue;
                    if (tn != null) {               // lagging tail
                        advanceTail(t, tn);
                        continue;
                    }
                    if (timed && nanos <= 0)        // can't wait
                        return null;
                    if (s == null)
                        s = new QNode(e, isData);
                    if (!t.casNext(null, s))        // failed to link in
                        continue;

                    advanceTail(t, s);              // swing tail and wait
                    // awaitFulfill and clean are declared elsewhere in
                    // this class; a result equal to s is treated as
                    // cancellation here.
                    Object x = awaitFulfill(s, e, timed, nanos);
                    if (x == s) {                   // wait was cancelled
                        clean(t, s);
                        return null;
                    }

                    if (!s.isOffList()) {           // not already unlinked
                        advanceHead(t, s);          // unlink if head
                        if (x != null)              // and forget fields
                            s.item = s;
                        s.waiter = null;
                    }
                    return (x != null) ? x : e;

                // Action 2: fulfill the first waiter after the head.
                } else {                            // complementary-mode
                    QNode m = h.next;               // node to fulfill
                    if (t != tail || m == null || h != head)
                        continue;                   // inconsistent read

                    Object x = m.item;
                    if (isData == (x != null) ||    // m already fulfilled
                        x == m ||                   // m cancelled
                        !m.casItem(x, e)) {         // lost CAS
                        advanceHead(h, m);          // dequeue and retry
                        continue;
                    }

                    advanceHead(h, m);              // successfully fulfilled
                    LockSupport.unpark(m.waiter);
                    return (x != null) ? x : e;
                }
            }
        }
    669 
    670         /**
    671          * Spins/blocks until node s is fulfilled.
    672          *
    673          * @param s the waiting node
    674          * @param e the comparison value for checking match
    675          * @param timed true if timed wait
    676          * @param nanos timeout value
    677          * @return matched item, or s if cancelled
    678          */
    679         Object awaitFulfill(QNode s, Object e, boolean timed, long nanos) {
    680             /* Same idea as TransferStack.awaitFulfill */
    681             long lastTime = timed ? System.nanoTime() : 0;
    682             Thread w = Thread.currentThread();
    683             int spins = ((head.next == s) ?
    684                          (timed ? maxTimedSpins : maxUntimedSpins) : 0);
    685             for (;;) {
    686                 if (w.isInterrupted())
    687                     s.tryCancel(e);
    688                 Object x = s.item;
    689                 if (x != e)
    690                     return x;
    691                 if (timed) {
    692                     long now = System.nanoTime();
    693                     nanos -= now - lastTime;
    694                     lastTime = now;
    695                     if (nanos <= 0) {
    696                         s.tryCancel(e);
    697                         continue;
    698                     }
    699                 }
    700                 if (spins > 0)
    701                     --spins;
    702                 else if (s.waiter == null)
    703                     s.waiter = w;
    704                 else if (!timed)
    705                     LockSupport.park(this);
    706                 else if (nanos > spinForTimeoutThreshold)
    707                     LockSupport.parkNanos(this, nanos);
    708             }
    709         }
    710 
    711         /**
    712          * Gets rid of cancelled node s with original predecessor pred.
    713          */
    714         void clean(QNode pred, QNode s) {
    715             s.waiter = null; // forget thread
    716             /*
    717              * At any given time, exactly one node on list cannot be
    718              * deleted -- the last inserted node. To accommodate this,
    719              * if we cannot delete s, we save its predecessor as
    720              * "cleanMe", deleting the previously saved version
    721              * first. At least one of node s or the node previously
    722              * saved can always be deleted, so this always terminates.
    723              */
    724             while (pred.next == s) { // Return early if already unlinked
    725                 QNode h = head;
    726                 QNode hn = h.next;   // Absorb cancelled first node as head
    727                 if (hn != null && hn.isCancelled()) {
    728                     advanceHead(h, hn);
    729                     continue;
    730                 }
    731                 QNode t = tail;      // Ensure consistent read for tail
    732                 if (t == h)
    733                     return;
    734                 QNode tn = t.next;
    735                 if (t != tail)
    736                     continue;
    737                 if (tn != null) {
    738                     advanceTail(t, tn);
    739                     continue;
    740                 }
    741                 if (s != t) {        // If not tail, try to unsplice
    742                     QNode sn = s.next;
    743                     if (sn == s || pred.casNext(s, sn))
    744                         return;
    745                 }
    746                 QNode dp = cleanMe;
    747                 if (dp != null) {    // Try unlinking previous cancelled node
    748                     QNode d = dp.next;
    749                     QNode dn;
    750                     if (d == null ||               // d is gone or
    751                         d == dp ||                 // d is off list or
    752                         !d.isCancelled() ||        // d not cancelled or
    753                         (d != t &&                 // d not tail and
    754                          (dn = d.next) != null &&  //   has successor
    755                          dn != d &&                //   that is on list
    756                          dp.casNext(d, dn)))       // d unspliced
    757                         casCleanMe(dp, null);
    758                     if (dp == pred)
    759                         return;      // s is already saved node
    760                 } else if (casCleanMe(null, pred))
    761                     return;          // Postpone cleaning s
    762             }
    763         }
    764 
    765         // unsafe mechanics
    766         private static final sun.misc.Unsafe UNSAFE = sun.misc.Unsafe.getUnsafe();
    767         private static final long headOffset =
    768             objectFieldOffset(UNSAFE, "head", TransferQueue.class);
    769         private static final long tailOffset =
    770             objectFieldOffset(UNSAFE, "tail", TransferQueue.class);
    771         private static final long cleanMeOffset =
    772             objectFieldOffset(UNSAFE, "cleanMe", TransferQueue.class);
    773 
    774     }
    775 
    776     /**
    777      * The transferer. Set only in constructor, but cannot be declared
    778      * as final without further complicating serialization.  Since
    779      * this is accessed only at most once per public method, there
    780      * isn't a noticeable performance penalty for using volatile
    781      * instead of final here.
    782      */
    783     private transient volatile Transferer transferer;
    784 
    785     /**
    786      * Creates a <tt>SynchronousQueue</tt> with nonfair access policy.
    787      */
    788     public SynchronousQueue() {
    789         this(false);
    790     }
    791 
    792     /**
    793      * Creates a <tt>SynchronousQueue</tt> with the specified fairness policy.
    794      *
    795      * @param fair if true, waiting threads contend in FIFO order for
    796      *        access; otherwise the order is unspecified.
    797      */
    798     public SynchronousQueue(boolean fair) {
    799         transferer = fair ? new TransferQueue() : new TransferStack();
    800     }
    801 
    802     /**
    803      * Adds the specified element to this queue, waiting if necessary for
    804      * another thread to receive it.
    805      *
    806      * @throws InterruptedException {@inheritDoc}
    807      * @throws NullPointerException {@inheritDoc}
    808      */
    809     public void put(E o) throws InterruptedException {
    810         if (o == null) throw new NullPointerException();
    811         if (transferer.transfer(o, false, 0) == null) {
    812             Thread.interrupted();
    813             throw new InterruptedException();
    814         }
    815     }
    816 
    817     /**
    818      * Inserts the specified element into this queue, waiting if necessary
    819      * up to the specified wait time for another thread to receive it.
    820      *
    821      * @return <tt>true</tt> if successful, or <tt>false</tt> if the
    822      *         specified waiting time elapses before a consumer appears.
    823      * @throws InterruptedException {@inheritDoc}
    824      * @throws NullPointerException {@inheritDoc}
    825      */
    826     public boolean offer(E o, long timeout, TimeUnit unit)
    827         throws InterruptedException {
    828         if (o == null) throw new NullPointerException();
    829         if (transferer.transfer(o, true, unit.toNanos(timeout)) != null)
    830             return true;
    831         if (!Thread.interrupted())
    832             return false;
    833         throw new InterruptedException();
    834     }
    835 
    836     /**
    837      * Inserts the specified element into this queue, if another thread is
    838      * waiting to receive it.
    839      *
    840      * @param e the element to add
    841      * @return <tt>true</tt> if the element was added to this queue, else
    842      *         <tt>false</tt>
    843      * @throws NullPointerException if the specified element is null
    844      */
    845     public boolean offer(E e) {
    846         if (e == null) throw new NullPointerException();
    847         return transferer.transfer(e, true, 0) != null;
    848     }
    849 
    850     /**
    851      * Retrieves and removes the head of this queue, waiting if necessary
    852      * for another thread to insert it.
    853      *
    854      * @return the head of this queue
    855      * @throws InterruptedException {@inheritDoc}
    856      */
    857     public E take() throws InterruptedException {
    858         Object e = transferer.transfer(null, false, 0);
    859         if (e != null)
    860             return (E)e;
    861         Thread.interrupted();
    862         throw new InterruptedException();
    863     }
    864 
    865     /**
    866      * Retrieves and removes the head of this queue, waiting
    867      * if necessary up to the specified wait time, for another thread
    868      * to insert it.
    869      *
    870      * @return the head of this queue, or <tt>null</tt> if the
    871      *         specified waiting time elapses before an element is present.
    872      * @throws InterruptedException {@inheritDoc}
    873      */
    874     public E poll(long timeout, TimeUnit unit) throws InterruptedException {
    875         Object e = transferer.transfer(null, true, unit.toNanos(timeout));
    876         if (e != null || !Thread.interrupted())
    877             return (E)e;
    878         throw new InterruptedException();
    879     }
    880 
    881     /**
    882      * Retrieves and removes the head of this queue, if another thread
    883      * is currently making an element available.
    884      *
    885      * @return the head of this queue, or <tt>null</tt> if no
    886      *         element is available.
    887      */
    888     public E poll() {
    889         return (E)transferer.transfer(null, true, 0);
    890     }
    891 
    892     /**
    893      * Always returns <tt>true</tt>.
    894      * A <tt>SynchronousQueue</tt> has no internal capacity.
    895      *
    896      * @return <tt>true</tt>
    897      */
    898     public boolean isEmpty() {
    899         return true;
    900     }
    901 
    902     /**
    903      * Always returns zero.
    904      * A <tt>SynchronousQueue</tt> has no internal capacity.
    905      *
    906      * @return zero.
    907      */
    908     public int size() {
    909         return 0;
    910     }
    911 
    912     /**
    913      * Always returns zero.
    914      * A <tt>SynchronousQueue</tt> has no internal capacity.
    915      *
    916      * @return zero.
    917      */
    918     public int remainingCapacity() {
    919         return 0;
    920     }
    921 
    922     /**
    923      * Does nothing.
    924      * A <tt>SynchronousQueue</tt> has no internal capacity.
    925      */
    926     public void clear() {
    927     }
    928 
    929     /**
    930      * Always returns <tt>false</tt>.
    931      * A <tt>SynchronousQueue</tt> has no internal capacity.
    932      *
    933      * @param o the element
    934      * @return <tt>false</tt>
    935      */
    936     public boolean contains(Object o) {
    937         return false;
    938     }
    939 
    940     /**
    941      * Always returns <tt>false</tt>.
    942      * A <tt>SynchronousQueue</tt> has no internal capacity.
    943      *
    944      * @param o the element to remove
    945      * @return <tt>false</tt>
    946      */
    947     public boolean remove(Object o) {
    948         return false;
    949     }
    950 
    951     /**
    952      * Returns <tt>false</tt> unless the given collection is empty.
    953      * A <tt>SynchronousQueue</tt> has no internal capacity.
    954      *
    955      * @param c the collection
    956      * @return <tt>false</tt> unless given collection is empty
    957      */
    958     public boolean containsAll(Collection<?> c) {
    959         return c.isEmpty();
    960     }
    961 
    962     /**
    963      * Always returns <tt>false</tt>.
    964      * A <tt>SynchronousQueue</tt> has no internal capacity.
    965      *
    966      * @param c the collection
    967      * @return <tt>false</tt>
    968      */
    969     public boolean removeAll(Collection<?> c) {
    970         return false;
    971     }
    972 
    973     /**
    974      * Always returns <tt>false</tt>.
    975      * A <tt>SynchronousQueue</tt> has no internal capacity.
    976      *
    977      * @param c the collection
    978      * @return <tt>false</tt>
    979      */
    980     public boolean retainAll(Collection<?> c) {
    981         return false;
    982     }
    983 
    984     /**
    985      * Always returns <tt>null</tt>.
    986      * A <tt>SynchronousQueue</tt> does not return elements
    987      * unless actively waited on.
    988      *
    989      * @return <tt>null</tt>
    990      */
    991     public E peek() {
    992         return null;
    993     }
    994 
    995     /**
    996      * Returns an empty iterator in which <tt>hasNext</tt> always returns
    997      * <tt>false</tt>.
    998      *
    999      * @return an empty iterator
   1000      */
   1001     public Iterator<E> iterator() {
   1002         return Collections.<E>emptySet().iterator(); // android-changed
   1003     }
   1004 
   1005     /**
   1006      * Returns a zero-length array.
   1007      * @return a zero-length array
   1008      */
   1009     public Object[] toArray() {
   1010         return EmptyArray.OBJECT; // android-changed
   1011     }
   1012 
   1013     /**
   1014      * Sets the zeroeth element of the specified array to <tt>null</tt>
   1015      * (if the array has non-zero length) and returns it.
   1016      *
   1017      * @param a the array
   1018      * @return the specified array
   1019      * @throws NullPointerException if the specified array is null
   1020      */
   1021     public <T> T[] toArray(T[] a) {
   1022         if (a.length > 0)
   1023             a[0] = null;
   1024         return a;
   1025     }
   1026 
   1027     /**
   1028      * @throws UnsupportedOperationException {@inheritDoc}
   1029      * @throws ClassCastException            {@inheritDoc}
   1030      * @throws NullPointerException          {@inheritDoc}
   1031      * @throws IllegalArgumentException      {@inheritDoc}
   1032      */
   1033     public int drainTo(Collection<? super E> c) {
   1034         if (c == null)
   1035             throw new NullPointerException();
   1036         if (c == this)
   1037             throw new IllegalArgumentException();
   1038         int n = 0;
   1039         E e;
   1040         while ( (e = poll()) != null) {
   1041             c.add(e);
   1042             ++n;
   1043         }
   1044         return n;
   1045     }
   1046 
   1047     /**
   1048      * @throws UnsupportedOperationException {@inheritDoc}
   1049      * @throws ClassCastException            {@inheritDoc}
   1050      * @throws NullPointerException          {@inheritDoc}
   1051      * @throws IllegalArgumentException      {@inheritDoc}
   1052      */
   1053     public int drainTo(Collection<? super E> c, int maxElements) {
   1054         if (c == null)
   1055             throw new NullPointerException();
   1056         if (c == this)
   1057             throw new IllegalArgumentException();
   1058         int n = 0;
   1059         E e;
   1060         while (n < maxElements && (e = poll()) != null) {
   1061             c.add(e);
   1062             ++n;
   1063         }
   1064         return n;
   1065     }
   1066 
   1067     /*
   1068      * To cope with serialization strategy in the 1.5 version of
   1069      * SynchronousQueue, we declare some unused classes and fields
   1070      * that exist solely to enable serializability across versions.
   1071      * These fields are never used, so are initialized only if this
   1072      * object is ever serialized or deserialized.
   1073      */
   1074 
   1075     static class WaitQueue implements java.io.Serializable { }
   1076     static class LifoWaitQueue extends WaitQueue {
   1077         private static final long serialVersionUID = -3633113410248163686L;
   1078     }
   1079     static class FifoWaitQueue extends WaitQueue {
   1080         private static final long serialVersionUID = -3623113410248163686L;
   1081     }
   1082     private ReentrantLock qlock;
   1083     private WaitQueue waitingProducers;
   1084     private WaitQueue waitingConsumers;
   1085 
   1086     /**
   1087      * Save the state to a stream (that is, serialize it).
   1088      *
   1089      * @param s the stream
   1090      */
   1091     private void writeObject(java.io.ObjectOutputStream s)
   1092         throws java.io.IOException {
   1093         boolean fair = transferer instanceof TransferQueue;
   1094         if (fair) {
   1095             qlock = new ReentrantLock(true);
   1096             waitingProducers = new FifoWaitQueue();
   1097             waitingConsumers = new FifoWaitQueue();
   1098         }
   1099         else {
   1100             qlock = new ReentrantLock();
   1101             waitingProducers = new LifoWaitQueue();
   1102             waitingConsumers = new LifoWaitQueue();
   1103         }
   1104         s.defaultWriteObject();
   1105     }
   1106 
   1107     private void readObject(final java.io.ObjectInputStream s)
   1108         throws java.io.IOException, ClassNotFoundException {
   1109         s.defaultReadObject();
   1110         if (waitingProducers instanceof FifoWaitQueue)
   1111             transferer = new TransferQueue();
   1112         else
   1113             transferer = new TransferStack();
   1114     }
   1115 
   1116     // Unsafe mechanics
   1117     static long objectFieldOffset(sun.misc.Unsafe UNSAFE,
   1118                                   String field, Class<?> klazz) {
   1119         try {
   1120             return UNSAFE.objectFieldOffset(klazz.getDeclaredField(field));
   1121         } catch (NoSuchFieldException e) {
   1122             // Convert Exception to corresponding Error
   1123             NoSuchFieldError error = new NoSuchFieldError(field);
   1124             error.initCause(e);
   1125             throw error;
   1126         }
   1127     }
   1128 
   1129 }
   1130