Home | History | Annotate | Download | only in concurrent
      1 /*
      2  * Written by Doug Lea with assistance from members of JCP JSR-166
      3  * Expert Group and released to the public domain, as explained at
      4  * http://creativecommons.org/publicdomain/zero/1.0/
      5  */
      6 
      7 package java.util.concurrent;
      8 
      9 import java.util.concurrent.atomic.AtomicInteger;
     10 import java.util.concurrent.locks.Condition;
     11 import java.util.concurrent.locks.ReentrantLock;
     12 import java.util.AbstractQueue;
     13 import java.util.Collection;
     14 import java.util.Iterator;
     15 import java.util.NoSuchElementException;
     16 
     17 // BEGIN android-note
     18 // removed link to collections framework docs
     19 // END android-note
     20 
     21 /**
     22  * An optionally-bounded {@linkplain BlockingQueue blocking queue} based on
     23  * linked nodes.
     24  * This queue orders elements FIFO (first-in-first-out).
     25  * The <em>head</em> of the queue is that element that has been on the
     26  * queue the longest time.
     27  * The <em>tail</em> of the queue is that element that has been on the
     28  * queue the shortest time. New elements
     29  * are inserted at the tail of the queue, and the queue retrieval
     30  * operations obtain elements at the head of the queue.
     31  * Linked queues typically have higher throughput than array-based queues but
     32  * less predictable performance in most concurrent applications.
     33  *
     34  * <p>The optional capacity bound constructor argument serves as a
     35  * way to prevent excessive queue expansion. The capacity, if unspecified,
     36  * is equal to {@link Integer#MAX_VALUE}.  Linked nodes are
     37  * dynamically created upon each insertion unless this would bring the
     38  * queue above capacity.
     39  *
     40  * <p>This class and its iterator implement all of the
     41  * <em>optional</em> methods of the {@link Collection} and {@link
     42  * Iterator} interfaces.
     43  *
     44  * @since 1.5
     45  * @author Doug Lea
     46  * @param <E> the type of elements held in this collection
     47  */
     48 public class LinkedBlockingQueue<E> extends AbstractQueue<E>
     49         implements BlockingQueue<E>, java.io.Serializable {
     50     private static final long serialVersionUID = -6903933977591709194L;
     51 
     52     /*
     53      * A variant of the "two lock queue" algorithm.  The putLock gates
     54      * entry to put (and offer), and has an associated condition for
     55      * waiting puts.  Similarly for the takeLock.  The "count" field
     56      * that they both rely on is maintained as an atomic to avoid
     57      * needing to get both locks in most cases. Also, to minimize need
     58      * for puts to get takeLock and vice-versa, cascading notifies are
     59      * used. When a put notices that it has enabled at least one take,
     60      * it signals taker. That taker in turn signals others if more
     61      * items have been entered since the signal. And symmetrically for
     62      * takes signalling puts. Operations such as remove(Object) and
     63      * iterators acquire both locks.
     64      *
     65      * Visibility between writers and readers is provided as follows:
     66      *
     67      * Whenever an element is enqueued, the putLock is acquired and
     68      * count updated.  A subsequent reader guarantees visibility to the
     69      * enqueued Node by either acquiring the putLock (via fullyLock)
     70      * or by acquiring the takeLock, and then reading n = count.get();
     71      * this gives visibility to the first n items.
     72      *
     73      * To implement weakly consistent iterators, it appears we need to
     74      * keep all Nodes GC-reachable from a predecessor dequeued Node.
     75      * That would cause two problems:
     76      * - allow a rogue Iterator to cause unbounded memory retention
     77      * - cause cross-generational linking of old Nodes to new Nodes if
     78      *   a Node was tenured while live, which generational GCs have a
     79      *   hard time dealing with, causing repeated major collections.
     80      * However, only non-deleted Nodes need to be reachable from
     81      * dequeued Nodes, and reachability does not necessarily have to
     82      * be of the kind understood by the GC.  We use the trick of
     83      * linking a Node that has just been dequeued to itself.  Such a
     84      * self-link implicitly means to advance to head.next.
     85      */
     86 
     87     /**
     88      * Linked list node class
     89      */
     90     static class Node<E> {
     91         E item;
     92 
     93         /**
     94          * One of:
     95          * - the real successor Node
     96          * - this Node, meaning the successor is head.next
     97          * - null, meaning there is no successor (this is the last node)
     98          */
     99         Node<E> next;
    100 
    101         Node(E x) { item = x; }
    102     }
    103 
    104     /** The capacity bound, or Integer.MAX_VALUE if none */
    105     private final int capacity;
    106 
    107     /** Current number of elements */
    108     private final AtomicInteger count = new AtomicInteger();
    109 
    110     /**
    111      * Head of linked list.
    112      * Invariant: head.item == null
    113      */
    114     transient Node<E> head;
    115 
    116     /**
    117      * Tail of linked list.
    118      * Invariant: last.next == null
    119      */
    120     private transient Node<E> last;
    121 
    122     /** Lock held by take, poll, etc */
    123     private final ReentrantLock takeLock = new ReentrantLock();
    124 
    125     /** Wait queue for waiting takes */
    126     private final Condition notEmpty = takeLock.newCondition();
    127 
    128     /** Lock held by put, offer, etc */
    129     private final ReentrantLock putLock = new ReentrantLock();
    130 
    131     /** Wait queue for waiting puts */
    132     private final Condition notFull = putLock.newCondition();
    133 
    134     /**
    135      * Signals a waiting take. Called only from put/offer (which do not
    136      * otherwise ordinarily lock takeLock.)
    137      */
    138     private void signalNotEmpty() {
    139         final ReentrantLock takeLock = this.takeLock;
    140         takeLock.lock();
    141         try {
    142             notEmpty.signal();
    143         } finally {
    144             takeLock.unlock();
    145         }
    146     }
    147 
    148     /**
    149      * Signals a waiting put. Called only from take/poll.
    150      */
    151     private void signalNotFull() {
    152         final ReentrantLock putLock = this.putLock;
    153         putLock.lock();
    154         try {
    155             notFull.signal();
    156         } finally {
    157             putLock.unlock();
    158         }
    159     }
    160 
    161     /**
    162      * Links node at end of queue.
    163      *
    164      * @param node the node
    165      */
    166     private void enqueue(Node<E> node) {
    167         // assert putLock.isHeldByCurrentThread();
    168         // assert last.next == null;
    169         last = last.next = node;
    170     }
    171 
    172     /**
    173      * Removes a node from head of queue.
    174      *
    175      * @return the node
    176      */
    177     private E dequeue() {
    178         // assert takeLock.isHeldByCurrentThread();
    179         // assert head.item == null;
    180         Node<E> h = head;
    181         Node<E> first = h.next;
    182         h.next = h; // help GC
    183         head = first;
    184         E x = first.item;
    185         first.item = null;
    186         return x;
    187     }
    188 
    189     /**
    190      * Locks to prevent both puts and takes.
    191      */
    192     void fullyLock() {
    193         putLock.lock();
    194         takeLock.lock();
    195     }
    196 
    197     /**
    198      * Unlocks to allow both puts and takes.
    199      */
    200     void fullyUnlock() {
    201         takeLock.unlock();
    202         putLock.unlock();
    203     }
    204 
    205 //     /**
    206 //      * Tells whether both locks are held by current thread.
    207 //      */
    208 //     boolean isFullyLocked() {
    209 //         return (putLock.isHeldByCurrentThread() &&
    210 //                 takeLock.isHeldByCurrentThread());
    211 //     }
    212 
    /**
     * Creates an empty {@code LinkedBlockingQueue} whose capacity is
     * {@link Integer#MAX_VALUE}.
     */
    public LinkedBlockingQueue() {
        // Delegate to the bounded constructor with the largest bound.
        this(Integer.MAX_VALUE);
    }
    220 
    221     /**
    222      * Creates a {@code LinkedBlockingQueue} with the given (fixed) capacity.
    223      *
    224      * @param capacity the capacity of this queue
    225      * @throws IllegalArgumentException if {@code capacity} is not greater
    226      *         than zero
    227      */
    228     public LinkedBlockingQueue(int capacity) {
    229         if (capacity <= 0) throw new IllegalArgumentException();
    230         this.capacity = capacity;
    231         last = head = new Node<E>(null);
    232     }
    233 
    /**
     * Creates a {@code LinkedBlockingQueue} whose capacity is
     * {@link Integer#MAX_VALUE} and which initially holds the elements
     * of the given collection, added in the order in which the
     * collection's iterator returns them.
     *
     * @param c the collection of elements to initially contain
     * @throws NullPointerException if the specified collection or any
     *         of its elements are null
     */
    public LinkedBlockingQueue(Collection<? extends E> c) {
        this(Integer.MAX_VALUE);
        final ReentrantLock lock = this.putLock;
        // No other thread can contend for the lock yet; it is taken
        // only so that the writes below become visible to other threads.
        lock.lock();
        try {
            int added = 0;
            for (E element : c) {
                if (element == null) {
                    throw new NullPointerException();
                }
                if (added == capacity) {
                    throw new IllegalStateException("Queue full");
                }
                enqueue(new Node<E>(element));
                added++;
            }
            count.set(added);
        } finally {
            lock.unlock();
        }
    }
    263 
    264     // this doc comment is overridden to remove the reference to collections
    265     // greater in size than Integer.MAX_VALUE
    266     /**
    267      * Returns the number of elements in this queue.
    268      *
    269      * @return the number of elements in this queue
    270      */
    271     public int size() {
    272         return count.get();
    273     }
    274 
    275     // this doc comment is a modified copy of the inherited doc comment,
    276     // without the reference to unlimited queues.
    277     /**
    278      * Returns the number of additional elements that this queue can ideally
    279      * (in the absence of memory or resource constraints) accept without
    280      * blocking. This is always equal to the initial capacity of this queue
    281      * less the current {@code size} of this queue.
    282      *
    283      * <p>Note that you <em>cannot</em> always tell if an attempt to insert
    284      * an element will succeed by inspecting {@code remainingCapacity}
    285      * because it may be the case that another thread is about to
    286      * insert or remove an element.
    287      */
    288     public int remainingCapacity() {
    289         return capacity - count.get();
    290     }
    291 
    292     /**
    293      * Inserts the specified element at the tail of this queue, waiting if
    294      * necessary for space to become available.
    295      *
    296      * @throws InterruptedException {@inheritDoc}
    297      * @throws NullPointerException {@inheritDoc}
    298      */
    299     public void put(E e) throws InterruptedException {
    300         if (e == null) throw new NullPointerException();
    301         // Note: convention in all put/take/etc is to preset local var
    302         // holding count negative to indicate failure unless set.
    303         int c = -1;
    304         Node<E> node = new Node<E>(e);
    305         final ReentrantLock putLock = this.putLock;
    306         final AtomicInteger count = this.count;
    307         putLock.lockInterruptibly();
    308         try {
    309             /*
    310              * Note that count is used in wait guard even though it is
    311              * not protected by lock. This works because count can
    312              * only decrease at this point (all other puts are shut
    313              * out by lock), and we (or some other waiting put) are
    314              * signalled if it ever changes from capacity. Similarly
    315              * for all other uses of count in other wait guards.
    316              */
    317             while (count.get() == capacity) {
    318                 notFull.await();
    319             }
    320             enqueue(node);
    321             c = count.getAndIncrement();
    322             if (c + 1 < capacity)
    323                 notFull.signal();
    324         } finally {
    325             putLock.unlock();
    326         }
    327         if (c == 0)
    328             signalNotEmpty();
    329     }
    330 
    331     /**
    332      * Inserts the specified element at the tail of this queue, waiting if
    333      * necessary up to the specified wait time for space to become available.
    334      *
    335      * @return {@code true} if successful, or {@code false} if
    336      *         the specified waiting time elapses before space is available
    337      * @throws InterruptedException {@inheritDoc}
    338      * @throws NullPointerException {@inheritDoc}
    339      */
    340     public boolean offer(E e, long timeout, TimeUnit unit)
    341         throws InterruptedException {
    342 
    343         if (e == null) throw new NullPointerException();
    344         long nanos = unit.toNanos(timeout);
    345         int c = -1;
    346         final ReentrantLock putLock = this.putLock;
    347         final AtomicInteger count = this.count;
    348         putLock.lockInterruptibly();
    349         try {
    350             while (count.get() == capacity) {
    351                 if (nanos <= 0)
    352                     return false;
    353                 nanos = notFull.awaitNanos(nanos);
    354             }
    355             enqueue(new Node<E>(e));
    356             c = count.getAndIncrement();
    357             if (c + 1 < capacity)
    358                 notFull.signal();
    359         } finally {
    360             putLock.unlock();
    361         }
    362         if (c == 0)
    363             signalNotEmpty();
    364         return true;
    365     }
    366 
    367     /**
    368      * Inserts the specified element at the tail of this queue if it is
    369      * possible to do so immediately without exceeding the queue's capacity,
    370      * returning {@code true} upon success and {@code false} if this queue
    371      * is full.
    372      * When using a capacity-restricted queue, this method is generally
    373      * preferable to method {@link BlockingQueue#add add}, which can fail to
    374      * insert an element only by throwing an exception.
    375      *
    376      * @throws NullPointerException if the specified element is null
    377      */
    378     public boolean offer(E e) {
    379         if (e == null) throw new NullPointerException();
    380         final AtomicInteger count = this.count;
    381         if (count.get() == capacity)
    382             return false;
    383         int c = -1;
    384         Node<E> node = new Node<E>(e);
    385         final ReentrantLock putLock = this.putLock;
    386         putLock.lock();
    387         try {
    388             if (count.get() < capacity) {
    389                 enqueue(node);
    390                 c = count.getAndIncrement();
    391                 if (c + 1 < capacity)
    392                     notFull.signal();
    393             }
    394         } finally {
    395             putLock.unlock();
    396         }
    397         if (c == 0)
    398             signalNotEmpty();
    399         return c >= 0;
    400     }
    401 
    402     public E take() throws InterruptedException {
    403         E x;
    404         int c = -1;
    405         final AtomicInteger count = this.count;
    406         final ReentrantLock takeLock = this.takeLock;
    407         takeLock.lockInterruptibly();
    408         try {
    409             while (count.get() == 0) {
    410                 notEmpty.await();
    411             }
    412             x = dequeue();
    413             c = count.getAndDecrement();
    414             if (c > 1)
    415                 notEmpty.signal();
    416         } finally {
    417             takeLock.unlock();
    418         }
    419         if (c == capacity)
    420             signalNotFull();
    421         return x;
    422     }
    423 
    424     public E poll(long timeout, TimeUnit unit) throws InterruptedException {
    425         E x = null;
    426         int c = -1;
    427         long nanos = unit.toNanos(timeout);
    428         final AtomicInteger count = this.count;
    429         final ReentrantLock takeLock = this.takeLock;
    430         takeLock.lockInterruptibly();
    431         try {
    432             while (count.get() == 0) {
    433                 if (nanos <= 0)
    434                     return null;
    435                 nanos = notEmpty.awaitNanos(nanos);
    436             }
    437             x = dequeue();
    438             c = count.getAndDecrement();
    439             if (c > 1)
    440                 notEmpty.signal();
    441         } finally {
    442             takeLock.unlock();
    443         }
    444         if (c == capacity)
    445             signalNotFull();
    446         return x;
    447     }
    448 
    449     public E poll() {
    450         final AtomicInteger count = this.count;
    451         if (count.get() == 0)
    452             return null;
    453         E x = null;
    454         int c = -1;
    455         final ReentrantLock takeLock = this.takeLock;
    456         takeLock.lock();
    457         try {
    458             if (count.get() > 0) {
    459                 x = dequeue();
    460                 c = count.getAndDecrement();
    461                 if (c > 1)
    462                     notEmpty.signal();
    463             }
    464         } finally {
    465             takeLock.unlock();
    466         }
    467         if (c == capacity)
    468             signalNotFull();
    469         return x;
    470     }
    471 
    472     public E peek() {
    473         if (count.get() == 0)
    474             return null;
    475         final ReentrantLock takeLock = this.takeLock;
    476         takeLock.lock();
    477         try {
    478             Node<E> first = head.next;
    479             if (first == null)
    480                 return null;
    481             else
    482                 return first.item;
    483         } finally {
    484             takeLock.unlock();
    485         }
    486     }
    487 
    488     /**
    489      * Unlinks interior Node p with predecessor trail.
    490      */
    491     void unlink(Node<E> p, Node<E> trail) {
    492         // assert isFullyLocked();
    493         // p.next is not changed, to allow iterators that are
    494         // traversing p to maintain their weak-consistency guarantee.
    495         p.item = null;
    496         trail.next = p.next;
    497         if (last == p)
    498             last = trail;
    499         if (count.getAndDecrement() == capacity)
    500             notFull.signal();
    501     }
    502 
    503     /**
    504      * Removes a single instance of the specified element from this queue,
    505      * if it is present.  More formally, removes an element {@code e} such
    506      * that {@code o.equals(e)}, if this queue contains one or more such
    507      * elements.
    508      * Returns {@code true} if this queue contained the specified element
    509      * (or equivalently, if this queue changed as a result of the call).
    510      *
    511      * @param o element to be removed from this queue, if present
    512      * @return {@code true} if this queue changed as a result of the call
    513      */
    514     public boolean remove(Object o) {
    515         if (o == null) return false;
    516         fullyLock();
    517         try {
    518             for (Node<E> trail = head, p = trail.next;
    519                  p != null;
    520                  trail = p, p = p.next) {
    521                 if (o.equals(p.item)) {
    522                     unlink(p, trail);
    523                     return true;
    524                 }
    525             }
    526             return false;
    527         } finally {
    528             fullyUnlock();
    529         }
    530     }
    531 
    532     /**
    533      * Returns {@code true} if this queue contains the specified element.
    534      * More formally, returns {@code true} if and only if this queue contains
    535      * at least one element {@code e} such that {@code o.equals(e)}.
    536      *
    537      * @param o object to be checked for containment in this queue
    538      * @return {@code true} if this queue contains the specified element
    539      */
    540     public boolean contains(Object o) {
    541         if (o == null) return false;
    542         fullyLock();
    543         try {
    544             for (Node<E> p = head.next; p != null; p = p.next)
    545                 if (o.equals(p.item))
    546                     return true;
    547             return false;
    548         } finally {
    549             fullyUnlock();
    550         }
    551     }
    552 
    553     /**
    554      * Returns an array containing all of the elements in this queue, in
    555      * proper sequence.
    556      *
    557      * <p>The returned array will be "safe" in that no references to it are
    558      * maintained by this queue.  (In other words, this method must allocate
    559      * a new array).  The caller is thus free to modify the returned array.
    560      *
    561      * <p>This method acts as bridge between array-based and collection-based
    562      * APIs.
    563      *
    564      * @return an array containing all of the elements in this queue
    565      */
    566     public Object[] toArray() {
    567         fullyLock();
    568         try {
    569             int size = count.get();
    570             Object[] a = new Object[size];
    571             int k = 0;
    572             for (Node<E> p = head.next; p != null; p = p.next)
    573                 a[k++] = p.item;
    574             return a;
    575         } finally {
    576             fullyUnlock();
    577         }
    578     }
    579 
    580     /**
    581      * Returns an array containing all of the elements in this queue, in
    582      * proper sequence; the runtime type of the returned array is that of
    583      * the specified array.  If the queue fits in the specified array, it
    584      * is returned therein.  Otherwise, a new array is allocated with the
    585      * runtime type of the specified array and the size of this queue.
    586      *
    587      * <p>If this queue fits in the specified array with room to spare
    588      * (i.e., the array has more elements than this queue), the element in
    589      * the array immediately following the end of the queue is set to
    590      * {@code null}.
    591      *
    592      * <p>Like the {@link #toArray()} method, this method acts as bridge between
    593      * array-based and collection-based APIs.  Further, this method allows
    594      * precise control over the runtime type of the output array, and may,
    595      * under certain circumstances, be used to save allocation costs.
    596      *
    597      * <p>Suppose {@code x} is a queue known to contain only strings.
    598      * The following code can be used to dump the queue into a newly
    599      * allocated array of {@code String}:
    600      *
    601      *  <pre> {@code String[] y = x.toArray(new String[0]);}</pre>
    602      *
    603      * Note that {@code toArray(new Object[0])} is identical in function to
    604      * {@code toArray()}.
    605      *
    606      * @param a the array into which the elements of the queue are to
    607      *          be stored, if it is big enough; otherwise, a new array of the
    608      *          same runtime type is allocated for this purpose
    609      * @return an array containing all of the elements in this queue
    610      * @throws ArrayStoreException if the runtime type of the specified array
    611      *         is not a supertype of the runtime type of every element in
    612      *         this queue
    613      * @throws NullPointerException if the specified array is null
    614      */
    615     @SuppressWarnings("unchecked")
    616     public <T> T[] toArray(T[] a) {
    617         fullyLock();
    618         try {
    619             int size = count.get();
    620             if (a.length < size)
    621                 a = (T[])java.lang.reflect.Array.newInstance
    622                     (a.getClass().getComponentType(), size);
    623 
    624             int k = 0;
    625             for (Node<E> p = head.next; p != null; p = p.next)
    626                 a[k++] = (T)p.item;
    627             if (a.length > k)
    628                 a[k] = null;
    629             return a;
    630         } finally {
    631             fullyUnlock();
    632         }
    633     }
    634 
    635     public String toString() {
    636         fullyLock();
    637         try {
    638             Node<E> p = head.next;
    639             if (p == null)
    640                 return "[]";
    641 
    642             StringBuilder sb = new StringBuilder();
    643             sb.append('[');
    644             for (;;) {
    645                 E e = p.item;
    646                 sb.append(e == this ? "(this Collection)" : e);
    647                 p = p.next;
    648                 if (p == null)
    649                     return sb.append(']').toString();
    650                 sb.append(',').append(' ');
    651             }
    652         } finally {
    653             fullyUnlock();
    654         }
    655     }
    656 
    657     /**
    658      * Atomically removes all of the elements from this queue.
    659      * The queue will be empty after this call returns.
    660      */
    661     public void clear() {
    662         fullyLock();
    663         try {
    664             for (Node<E> p, h = head; (p = h.next) != null; h = p) {
    665                 h.next = h;
    666                 p.item = null;
    667             }
    668             head = last;
    669             // assert head.item == null && head.next == null;
    670             if (count.getAndSet(0) == capacity)
    671                 notFull.signal();
    672         } finally {
    673             fullyUnlock();
    674         }
    675     }
    676 
    677     /**
    678      * @throws UnsupportedOperationException {@inheritDoc}
    679      * @throws ClassCastException            {@inheritDoc}
    680      * @throws NullPointerException          {@inheritDoc}
    681      * @throws IllegalArgumentException      {@inheritDoc}
    682      */
    683     public int drainTo(Collection<? super E> c) {
    684         return drainTo(c, Integer.MAX_VALUE);
    685     }
    686 
    687     /**
    688      * @throws UnsupportedOperationException {@inheritDoc}
    689      * @throws ClassCastException            {@inheritDoc}
    690      * @throws NullPointerException          {@inheritDoc}
    691      * @throws IllegalArgumentException      {@inheritDoc}
    692      */
    693     public int drainTo(Collection<? super E> c, int maxElements) {
    694         if (c == null)
    695             throw new NullPointerException();
    696         if (c == this)
    697             throw new IllegalArgumentException();
    698         if (maxElements <= 0)
    699             return 0;
    700         boolean signalNotFull = false;
    701         final ReentrantLock takeLock = this.takeLock;
    702         takeLock.lock();
    703         try {
    704             int n = Math.min(maxElements, count.get());
    705             // count.get provides visibility to first n Nodes
    706             Node<E> h = head;
    707             int i = 0;
    708             try {
    709                 while (i < n) {
    710                     Node<E> p = h.next;
    711                     c.add(p.item);
    712                     p.item = null;
    713                     h.next = h;
    714                     h = p;
    715                     ++i;
    716                 }
    717                 return n;
    718             } finally {
    719                 // Restore invariants even if c.add() threw
    720                 if (i > 0) {
    721                     // assert h.item == null;
    722                     head = h;
    723                     signalNotFull = (count.getAndAdd(-i) == capacity);
    724                 }
    725             }
    726         } finally {
    727             takeLock.unlock();
    728             if (signalNotFull)
    729                 signalNotFull();
    730         }
    731     }
    732 
    733     /**
    734      * Returns an iterator over the elements in this queue in proper sequence.
    735      * The elements will be returned in order from first (head) to last (tail).
    736      *
    737      * <p>The returned iterator is a "weakly consistent" iterator that
    738      * will never throw {@link java.util.ConcurrentModificationException
    739      * ConcurrentModificationException}, and guarantees to traverse
    740      * elements as they existed upon construction of the iterator, and
    741      * may (but is not guaranteed to) reflect any modifications
    742      * subsequent to construction.
    743      *
    744      * @return an iterator over the elements in this queue in proper sequence
    745      */
    746     public Iterator<E> iterator() {
    747         return new Itr();
    748     }
    749 
    750     private class Itr implements Iterator<E> {
    751         /*
    752          * Basic weakly-consistent iterator.  At all times hold the next
    753          * item to hand out so that if hasNext() reports true, we will
    754          * still have it to return even if lost race with a take etc.
    755          */
    756 
    757         private Node<E> current;
    758         private Node<E> lastRet;
    759         private E currentElement;
    760 
    761         Itr() {
    762             fullyLock();
    763             try {
    764                 current = head.next;
    765                 if (current != null)
    766                     currentElement = current.item;
    767             } finally {
    768                 fullyUnlock();
    769             }
    770         }
    771 
    772         public boolean hasNext() {
    773             return current != null;
    774         }
    775 
    776         /**
    777          * Returns the next live successor of p, or null if no such.
    778          *
    779          * Unlike other traversal methods, iterators need to handle both:
    780          * - dequeued nodes (p.next == p)
    781          * - (possibly multiple) interior removed nodes (p.item == null)
    782          */
    783         private Node<E> nextNode(Node<E> p) {
    784             for (;;) {
    785                 Node<E> s = p.next;
    786                 if (s == p)
    787                     return head.next;
    788                 if (s == null || s.item != null)
    789                     return s;
    790                 p = s;
    791             }
    792         }
    793 
    794         public E next() {
    795             fullyLock();
    796             try {
    797                 if (current == null)
    798                     throw new NoSuchElementException();
    799                 E x = currentElement;
    800                 lastRet = current;
    801                 current = nextNode(current);
    802                 currentElement = (current == null) ? null : current.item;
    803                 return x;
    804             } finally {
    805                 fullyUnlock();
    806             }
    807         }
    808 
    809         public void remove() {
    810             if (lastRet == null)
    811                 throw new IllegalStateException();
    812             fullyLock();
    813             try {
    814                 Node<E> node = lastRet;
    815                 lastRet = null;
    816                 for (Node<E> trail = head, p = trail.next;
    817                      p != null;
    818                      trail = p, p = p.next) {
    819                     if (p == node) {
    820                         unlink(p, trail);
    821                         break;
    822                     }
    823                 }
    824             } finally {
    825                 fullyUnlock();
    826             }
    827         }
    828     }
    829 
    830     /**
    831      * Saves this queue to a stream (that is, serializes it).
    832      *
    833      * @serialData The capacity is emitted (int), followed by all of
    834      * its elements (each an {@code Object}) in the proper order,
    835      * followed by a null
    836      */
    837     private void writeObject(java.io.ObjectOutputStream s)
    838         throws java.io.IOException {
    839 
    840         fullyLock();
    841         try {
    842             // Write out any hidden stuff, plus capacity
    843             s.defaultWriteObject();
    844 
    845             // Write out all elements in the proper order.
    846             for (Node<E> p = head.next; p != null; p = p.next)
    847                 s.writeObject(p.item);
    848 
    849             // Use trailing null as sentinel
    850             s.writeObject(null);
    851         } finally {
    852             fullyUnlock();
    853         }
    854     }
    855 
    856     /**
    857      * Reconstitutes this queue from a stream (that is, deserializes it).
    858      */
    859     private void readObject(java.io.ObjectInputStream s)
    860         throws java.io.IOException, ClassNotFoundException {
    861         // Read in capacity, and any hidden stuff
    862         s.defaultReadObject();
    863 
    864         count.set(0);
    865         last = head = new Node<E>(null);
    866 
    867         // Read in all elements and place in queue
    868         for (;;) {
    869             @SuppressWarnings("unchecked")
    870             E item = (E)s.readObject();
    871             if (item == null)
    872                 break;
    873             add(item);
    874         }
    875     }
    876 }
    877