Home | History | Annotate | Download | only in concurrent
      1 /*
      2  * Written by Doug Lea with assistance from members of JCP JSR-166
      3  * Expert Group and released to the public domain, as explained at
      4  * http://creativecommons.org/licenses/publicdomain
      5  */
      6 
      7 package java.util.concurrent;
      8 
      9 import java.util.concurrent.atomic.AtomicInteger;
     10 import java.util.concurrent.locks.Condition;
     11 import java.util.concurrent.locks.ReentrantLock;
     12 import java.util.AbstractQueue;
     13 import java.util.Collection;
     14 import java.util.Iterator;
     15 import java.util.NoSuchElementException;
     16 
     17 // BEGIN android-note
     18 // removed link to collections framework docs
     19 // END android-note
     20 
     21 /**
     22  * An optionally-bounded {@linkplain BlockingQueue blocking queue} based on
     23  * linked nodes.
     24  * This queue orders elements FIFO (first-in-first-out).
     25  * The <em>head</em> of the queue is that element that has been on the
     26  * queue the longest time.
     27  * The <em>tail</em> of the queue is that element that has been on the
     28  * queue the shortest time. New elements
     29  * are inserted at the tail of the queue, and the queue retrieval
     30  * operations obtain elements at the head of the queue.
     31  * Linked queues typically have higher throughput than array-based queues but
     32  * less predictable performance in most concurrent applications.
     33  *
     34  * <p> The optional capacity bound constructor argument serves as a
     35  * way to prevent excessive queue expansion. The capacity, if unspecified,
     36  * is equal to {@link Integer#MAX_VALUE}.  Linked nodes are
     37  * dynamically created upon each insertion unless this would bring the
     38  * queue above capacity.
     39  *
     40  * <p>This class and its iterator implement all of the
     41  * <em>optional</em> methods of the {@link Collection} and {@link
     42  * Iterator} interfaces.
     43  *
     44  * @since 1.5
     45  * @author Doug Lea
     46  * @param <E> the type of elements held in this collection
     47  *
     48  */
     49 public class LinkedBlockingQueue<E> extends AbstractQueue<E>
     50         implements BlockingQueue<E>, java.io.Serializable {
     51     private static final long serialVersionUID = -6903933977591709194L;
     52 
     53     /*
     54      * A variant of the "two lock queue" algorithm.  The putLock gates
     55      * entry to put (and offer), and has an associated condition for
     56      * waiting puts.  Similarly for the takeLock.  The "count" field
     57      * that they both rely on is maintained as an atomic to avoid
     58      * needing to get both locks in most cases. Also, to minimize need
     59      * for puts to get takeLock and vice-versa, cascading notifies are
     60      * used. When a put notices that it has enabled at least one take,
     61      * it signals taker. That taker in turn signals others if more
     62      * items have been entered since the signal. And symmetrically for
     63      * takes signalling puts. Operations such as remove(Object) and
     64      * iterators acquire both locks.
     65      *
     66      * Visibility between writers and readers is provided as follows:
     67      *
     68      * Whenever an element is enqueued, the putLock is acquired and
     69      * count updated.  A subsequent reader guarantees visibility to the
     70      * enqueued Node by either acquiring the putLock (via fullyLock)
     71      * or by acquiring the takeLock, and then reading n = count.get();
     72      * this gives visibility to the first n items.
     73      *
     74      * To implement weakly consistent iterators, it appears we need to
     75      * keep all Nodes GC-reachable from a predecessor dequeued Node.
     76      * That would cause two problems:
     77      * - allow a rogue Iterator to cause unbounded memory retention
     78      * - cause cross-generational linking of old Nodes to new Nodes if
     79      *   a Node was tenured while live, which generational GCs have a
     80      *   hard time dealing with, causing repeated major collections.
     81      * However, only non-deleted Nodes need to be reachable from
     82      * dequeued Nodes, and reachability does not necessarily have to
     83      * be of the kind understood by the GC.  We use the trick of
     84      * linking a Node that has just been dequeued to itself.  Such a
     85      * self-link implicitly means to advance to head.next.
     86      */
     87 
     88     /**
     89      * Linked list node class
     90      */
     91     static class Node<E> {
     92         E item;
     93 
     94         /**
     95          * One of:
     96          * - the real successor Node
     97          * - this Node, meaning the successor is head.next
     98          * - null, meaning there is no successor (this is the last node)
     99          */
    100         Node<E> next;
    101 
    102         Node(E x) { item = x; }
    103     }
    104 
    105     /** The capacity bound, or Integer.MAX_VALUE if none */
    106     private final int capacity;
    107 
    108     /** Current number of elements */
    109     private final AtomicInteger count = new AtomicInteger(0);
    110 
    111     /**
    112      * Head of linked list.
    113      * Invariant: head.item == null
    114      */
    115     private transient Node<E> head;
    116 
    117     /**
    118      * Tail of linked list.
    119      * Invariant: last.next == null
    120      */
    121     private transient Node<E> last;
    122 
    123     /** Lock held by take, poll, etc */
    124     private final ReentrantLock takeLock = new ReentrantLock();
    125 
    126     /** Wait queue for waiting takes */
    127     private final Condition notEmpty = takeLock.newCondition();
    128 
    129     /** Lock held by put, offer, etc */
    130     private final ReentrantLock putLock = new ReentrantLock();
    131 
    132     /** Wait queue for waiting puts */
    133     private final Condition notFull = putLock.newCondition();
    134 
    135     /**
    136      * Signals a waiting take. Called only from put/offer (which do not
    137      * otherwise ordinarily lock takeLock.)
    138      */
    139     private void signalNotEmpty() {
    140         final ReentrantLock takeLock = this.takeLock;
    141         takeLock.lock();
    142         try {
    143             notEmpty.signal();
    144         } finally {
    145             takeLock.unlock();
    146         }
    147     }
    148 
    149     /**
    150      * Signals a waiting put. Called only from take/poll.
    151      */
    152     private void signalNotFull() {
    153         final ReentrantLock putLock = this.putLock;
    154         putLock.lock();
    155         try {
    156             notFull.signal();
    157         } finally {
    158             putLock.unlock();
    159         }
    160     }
    161 
    162     /**
    163      * Links node at end of queue.
    164      *
    165      * @param node the node
    166      */
    167     private void enqueue(Node<E> node) {
    168         // assert putLock.isHeldByCurrentThread();
    169         // assert last.next == null;
    170         last = last.next = node;
    171     }
    172 
    173     /**
    174      * Removes a node from head of queue.
    175      *
    176      * @return the node
    177      */
    178     private E dequeue() {
    179         // assert takeLock.isHeldByCurrentThread();
    180         // assert head.item == null;
    181         Node<E> h = head;
    182         Node<E> first = h.next;
    183         h.next = h; // help GC
    184         head = first;
    185         E x = first.item;
    186         first.item = null;
    187         return x;
    188     }
    189 
    190     /**
    191      * Lock to prevent both puts and takes.
    192      */
    193     void fullyLock() {
    194         putLock.lock();
    195         takeLock.lock();
    196     }
    197 
    198     /**
    199      * Unlock to allow both puts and takes.
    200      */
    201     void fullyUnlock() {
    202         takeLock.unlock();
    203         putLock.unlock();
    204     }
    205 
    206 //     /**
    207 //      * Tells whether both locks are held by current thread.
    208 //      */
    209 //     boolean isFullyLocked() {
    210 //         return (putLock.isHeldByCurrentThread() &&
    211 //                 takeLock.isHeldByCurrentThread());
    212 //     }
    213 
    214     /**
    215      * Creates a {@code LinkedBlockingQueue} with a capacity of
    216      * {@link Integer#MAX_VALUE}.
    217      */
    218     public LinkedBlockingQueue() {
    219         this(Integer.MAX_VALUE);
    220     }
    221 
    222     /**
    223      * Creates a {@code LinkedBlockingQueue} with the given (fixed) capacity.
    224      *
    225      * @param capacity the capacity of this queue
    226      * @throws IllegalArgumentException if {@code capacity} is not greater
    227      *         than zero
    228      */
    229     public LinkedBlockingQueue(int capacity) {
    230         if (capacity <= 0) throw new IllegalArgumentException();
    231         this.capacity = capacity;
    232         last = head = new Node<E>(null);
    233     }
    234 
    235     /**
    236      * Creates a {@code LinkedBlockingQueue} with a capacity of
    237      * {@link Integer#MAX_VALUE}, initially containing the elements of the
    238      * given collection,
    239      * added in traversal order of the collection's iterator.
    240      *
    241      * @param c the collection of elements to initially contain
    242      * @throws NullPointerException if the specified collection or any
    243      *         of its elements are null
    244      */
    245     public LinkedBlockingQueue(Collection<? extends E> c) {
    246         this(Integer.MAX_VALUE);
    247         final ReentrantLock putLock = this.putLock;
    248         putLock.lock(); // Never contended, but necessary for visibility
    249         try {
    250             int n = 0;
    251             for (E e : c) {
    252                 if (e == null)
    253                     throw new NullPointerException();
    254                 if (n == capacity)
    255                     throw new IllegalStateException("Queue full");
    256                 enqueue(new Node<E>(e));
    257                 ++n;
    258             }
    259             count.set(n);
    260         } finally {
    261             putLock.unlock();
    262         }
    263     }
    264 
    265 
    266     // this doc comment is overridden to remove the reference to collections
    267     // greater in size than Integer.MAX_VALUE
    268     /**
    269      * Returns the number of elements in this queue.
    270      *
    271      * @return the number of elements in this queue
    272      */
    273     public int size() {
    274         return count.get();
    275     }
    276 
    277     // this doc comment is a modified copy of the inherited doc comment,
    278     // without the reference to unlimited queues.
    279     /**
    280      * Returns the number of additional elements that this queue can ideally
    281      * (in the absence of memory or resource constraints) accept without
    282      * blocking. This is always equal to the initial capacity of this queue
    283      * less the current {@code size} of this queue.
    284      *
    285      * <p>Note that you <em>cannot</em> always tell if an attempt to insert
    286      * an element will succeed by inspecting {@code remainingCapacity}
    287      * because it may be the case that another thread is about to
    288      * insert or remove an element.
    289      */
    290     public int remainingCapacity() {
    291         return capacity - count.get();
    292     }
    293 
    294     /**
    295      * Inserts the specified element at the tail of this queue, waiting if
    296      * necessary for space to become available.
    297      *
    298      * @throws InterruptedException {@inheritDoc}
    299      * @throws NullPointerException {@inheritDoc}
    300      */
    301     public void put(E e) throws InterruptedException {
    302         if (e == null) throw new NullPointerException();
    303         // Note: convention in all put/take/etc is to preset local var
    304         // holding count negative to indicate failure unless set.
    305         int c = -1;
    306         Node<E> node = new Node(e);
    307         final ReentrantLock putLock = this.putLock;
    308         final AtomicInteger count = this.count;
    309         putLock.lockInterruptibly();
    310         try {
    311             /*
    312              * Note that count is used in wait guard even though it is
    313              * not protected by lock. This works because count can
    314              * only decrease at this point (all other puts are shut
    315              * out by lock), and we (or some other waiting put) are
    316              * signalled if it ever changes from capacity. Similarly
    317              * for all other uses of count in other wait guards.
    318              */
    319             while (count.get() == capacity) {
    320                 notFull.await();
    321             }
    322             enqueue(node);
    323             c = count.getAndIncrement();
    324             if (c + 1 < capacity)
    325                 notFull.signal();
    326         } finally {
    327             putLock.unlock();
    328         }
    329         if (c == 0)
    330             signalNotEmpty();
    331     }
    332 
    333     /**
    334      * Inserts the specified element at the tail of this queue, waiting if
    335      * necessary up to the specified wait time for space to become available.
    336      *
    337      * @return {@code true} if successful, or {@code false} if
    338      *         the specified waiting time elapses before space is available.
    339      * @throws InterruptedException {@inheritDoc}
    340      * @throws NullPointerException {@inheritDoc}
    341      */
    342     public boolean offer(E e, long timeout, TimeUnit unit)
    343         throws InterruptedException {
    344 
    345         if (e == null) throw new NullPointerException();
    346         long nanos = unit.toNanos(timeout);
    347         int c = -1;
    348         final ReentrantLock putLock = this.putLock;
    349         final AtomicInteger count = this.count;
    350         putLock.lockInterruptibly();
    351         try {
    352             while (count.get() == capacity) {
    353                 if (nanos <= 0)
    354                     return false;
    355                 nanos = notFull.awaitNanos(nanos);
    356             }
    357             enqueue(new Node<E>(e));
    358             c = count.getAndIncrement();
    359             if (c + 1 < capacity)
    360                 notFull.signal();
    361         } finally {
    362             putLock.unlock();
    363         }
    364         if (c == 0)
    365             signalNotEmpty();
    366         return true;
    367     }
    368 
    369     /**
    370      * Inserts the specified element at the tail of this queue if it is
    371      * possible to do so immediately without exceeding the queue's capacity,
    372      * returning {@code true} upon success and {@code false} if this queue
    373      * is full.
    374      * When using a capacity-restricted queue, this method is generally
    375      * preferable to method {@link BlockingQueue#add add}, which can fail to
    376      * insert an element only by throwing an exception.
    377      *
    378      * @throws NullPointerException if the specified element is null
    379      */
    380     public boolean offer(E e) {
    381         if (e == null) throw new NullPointerException();
    382         final AtomicInteger count = this.count;
    383         if (count.get() == capacity)
    384             return false;
    385         int c = -1;
    386         Node<E> node = new Node(e);
    387         final ReentrantLock putLock = this.putLock;
    388         putLock.lock();
    389         try {
    390             if (count.get() < capacity) {
    391                 enqueue(node);
    392                 c = count.getAndIncrement();
    393                 if (c + 1 < capacity)
    394                     notFull.signal();
    395             }
    396         } finally {
    397             putLock.unlock();
    398         }
    399         if (c == 0)
    400             signalNotEmpty();
    401         return c >= 0;
    402     }
    403 
    404 
    405     public E take() throws InterruptedException {
    406         E x;
    407         int c = -1;
    408         final AtomicInteger count = this.count;
    409         final ReentrantLock takeLock = this.takeLock;
    410         takeLock.lockInterruptibly();
    411         try {
    412             while (count.get() == 0) {
    413                 notEmpty.await();
    414             }
    415             x = dequeue();
    416             c = count.getAndDecrement();
    417             if (c > 1)
    418                 notEmpty.signal();
    419         } finally {
    420             takeLock.unlock();
    421         }
    422         if (c == capacity)
    423             signalNotFull();
    424         return x;
    425     }
    426 
    427     public E poll(long timeout, TimeUnit unit) throws InterruptedException {
    428         E x = null;
    429         int c = -1;
    430         long nanos = unit.toNanos(timeout);
    431         final AtomicInteger count = this.count;
    432         final ReentrantLock takeLock = this.takeLock;
    433         takeLock.lockInterruptibly();
    434         try {
    435             while (count.get() == 0) {
    436                 if (nanos <= 0)
    437                     return null;
    438                 nanos = notEmpty.awaitNanos(nanos);
    439             }
    440             x = dequeue();
    441             c = count.getAndDecrement();
    442             if (c > 1)
    443                 notEmpty.signal();
    444         } finally {
    445             takeLock.unlock();
    446         }
    447         if (c == capacity)
    448             signalNotFull();
    449         return x;
    450     }
    451 
    452     public E poll() {
    453         final AtomicInteger count = this.count;
    454         if (count.get() == 0)
    455             return null;
    456         E x = null;
    457         int c = -1;
    458         final ReentrantLock takeLock = this.takeLock;
    459         takeLock.lock();
    460         try {
    461             if (count.get() > 0) {
    462                 x = dequeue();
    463                 c = count.getAndDecrement();
    464                 if (c > 1)
    465                     notEmpty.signal();
    466             }
    467         } finally {
    468             takeLock.unlock();
    469         }
    470         if (c == capacity)
    471             signalNotFull();
    472         return x;
    473     }
    474 
    475     public E peek() {
    476         if (count.get() == 0)
    477             return null;
    478         final ReentrantLock takeLock = this.takeLock;
    479         takeLock.lock();
    480         try {
    481             Node<E> first = head.next;
    482             if (first == null)
    483                 return null;
    484             else
    485                 return first.item;
    486         } finally {
    487             takeLock.unlock();
    488         }
    489     }
    490 
    491     /**
    492      * Unlinks interior Node p with predecessor trail.
    493      */
    494     void unlink(Node<E> p, Node<E> trail) {
    495         // assert isFullyLocked();
    496         // p.next is not changed, to allow iterators that are
    497         // traversing p to maintain their weak-consistency guarantee.
    498         p.item = null;
    499         trail.next = p.next;
    500         if (last == p)
    501             last = trail;
    502         if (count.getAndDecrement() == capacity)
    503             notFull.signal();
    504     }
    505 
    506     /**
    507      * Removes a single instance of the specified element from this queue,
    508      * if it is present.  More formally, removes an element {@code e} such
    509      * that {@code o.equals(e)}, if this queue contains one or more such
    510      * elements.
    511      * Returns {@code true} if this queue contained the specified element
    512      * (or equivalently, if this queue changed as a result of the call).
    513      *
    514      * @param o element to be removed from this queue, if present
    515      * @return {@code true} if this queue changed as a result of the call
    516      */
    517     public boolean remove(Object o) {
    518         if (o == null) return false;
    519         fullyLock();
    520         try {
    521             for (Node<E> trail = head, p = trail.next;
    522                  p != null;
    523                  trail = p, p = p.next) {
    524                 if (o.equals(p.item)) {
    525                     unlink(p, trail);
    526                     return true;
    527                 }
    528             }
    529             return false;
    530         } finally {
    531             fullyUnlock();
    532         }
    533     }
    534 
    535     /**
    536      * Returns {@code true} if this queue contains the specified element.
    537      * More formally, returns {@code true} if and only if this queue contains
    538      * at least one element {@code e} such that {@code o.equals(e)}.
    539      *
    540      * @param o object to be checked for containment in this queue
    541      * @return {@code true} if this queue contains the specified element
    542      */
    543     public boolean contains(Object o) {
    544         if (o == null) return false;
    545         fullyLock();
    546         try {
    547             for (Node<E> p = head.next; p != null; p = p.next)
    548                 if (o.equals(p.item))
    549                     return true;
    550             return false;
    551         } finally {
    552             fullyUnlock();
    553         }
    554     }
    555 
    556     /**
    557      * Returns an array containing all of the elements in this queue, in
    558      * proper sequence.
    559      *
    560      * <p>The returned array will be "safe" in that no references to it are
    561      * maintained by this queue.  (In other words, this method must allocate
    562      * a new array).  The caller is thus free to modify the returned array.
    563      *
    564      * <p>This method acts as bridge between array-based and collection-based
    565      * APIs.
    566      *
    567      * @return an array containing all of the elements in this queue
    568      */
    569     public Object[] toArray() {
    570         fullyLock();
    571         try {
    572             int size = count.get();
    573             Object[] a = new Object[size];
    574             int k = 0;
    575             for (Node<E> p = head.next; p != null; p = p.next)
    576                 a[k++] = p.item;
    577             return a;
    578         } finally {
    579             fullyUnlock();
    580         }
    581     }
    582 
    583     /**
    584      * Returns an array containing all of the elements in this queue, in
    585      * proper sequence; the runtime type of the returned array is that of
    586      * the specified array.  If the queue fits in the specified array, it
    587      * is returned therein.  Otherwise, a new array is allocated with the
    588      * runtime type of the specified array and the size of this queue.
    589      *
    590      * <p>If this queue fits in the specified array with room to spare
    591      * (i.e., the array has more elements than this queue), the element in
    592      * the array immediately following the end of the queue is set to
    593      * {@code null}.
    594      *
    595      * <p>Like the {@link #toArray()} method, this method acts as bridge between
    596      * array-based and collection-based APIs.  Further, this method allows
    597      * precise control over the runtime type of the output array, and may,
    598      * under certain circumstances, be used to save allocation costs.
    599      *
    600      * <p>Suppose {@code x} is a queue known to contain only strings.
    601      * The following code can be used to dump the queue into a newly
    602      * allocated array of {@code String}:
    603      *
    604      * <pre>
    605      *     String[] y = x.toArray(new String[0]);</pre>
    606      *
    607      * Note that {@code toArray(new Object[0])} is identical in function to
    608      * {@code toArray()}.
    609      *
    610      * @param a the array into which the elements of the queue are to
    611      *          be stored, if it is big enough; otherwise, a new array of the
    612      *          same runtime type is allocated for this purpose
    613      * @return an array containing all of the elements in this queue
    614      * @throws ArrayStoreException if the runtime type of the specified array
    615      *         is not a supertype of the runtime type of every element in
    616      *         this queue
    617      * @throws NullPointerException if the specified array is null
    618      */
    619     @SuppressWarnings("unchecked")
    620     public <T> T[] toArray(T[] a) {
    621         fullyLock();
    622         try {
    623             int size = count.get();
    624             if (a.length < size)
    625                 a = (T[])java.lang.reflect.Array.newInstance
    626                     (a.getClass().getComponentType(), size);
    627 
    628             int k = 0;
    629             for (Node<E> p = head.next; p != null; p = p.next)
    630                 a[k++] = (T)p.item;
    631             if (a.length > k)
    632                 a[k] = null;
    633             return a;
    634         } finally {
    635             fullyUnlock();
    636         }
    637     }
    638 
    639     public String toString() {
    640         fullyLock();
    641         try {
    642             Node<E> p = head.next;
    643             if (p == null)
    644                 return "[]";
    645 
    646             StringBuilder sb = new StringBuilder();
    647             sb.append('[');
    648             for (;;) {
    649                 E e = p.item;
    650                 sb.append(e == this ? "(this Collection)" : e);
    651                 p = p.next;
    652                 if (p == null)
    653                     return sb.append(']').toString();
    654                 sb.append(',').append(' ');
    655             }
    656         } finally {
    657             fullyUnlock();
    658         }
    659     }
    660 
    661     /**
    662      * Atomically removes all of the elements from this queue.
    663      * The queue will be empty after this call returns.
    664      */
    665     public void clear() {
    666         fullyLock();
    667         try {
    668             for (Node<E> p, h = head; (p = h.next) != null; h = p) {
    669                 h.next = h;
    670                 p.item = null;
    671             }
    672             head = last;
    673             // assert head.item == null && head.next == null;
    674             if (count.getAndSet(0) == capacity)
    675                 notFull.signal();
    676         } finally {
    677             fullyUnlock();
    678         }
    679     }
    680 
    681     /**
    682      * @throws UnsupportedOperationException {@inheritDoc}
    683      * @throws ClassCastException            {@inheritDoc}
    684      * @throws NullPointerException          {@inheritDoc}
    685      * @throws IllegalArgumentException      {@inheritDoc}
    686      */
    687     public int drainTo(Collection<? super E> c) {
    688         return drainTo(c, Integer.MAX_VALUE);
    689     }
    690 
    691     /**
    692      * @throws UnsupportedOperationException {@inheritDoc}
    693      * @throws ClassCastException            {@inheritDoc}
    694      * @throws NullPointerException          {@inheritDoc}
    695      * @throws IllegalArgumentException      {@inheritDoc}
    696      */
    697     public int drainTo(Collection<? super E> c, int maxElements) {
    698         if (c == null)
    699             throw new NullPointerException();
    700         if (c == this)
    701             throw new IllegalArgumentException();
    702         boolean signalNotFull = false;
    703         final ReentrantLock takeLock = this.takeLock;
    704         takeLock.lock();
    705         try {
    706             int n = Math.min(maxElements, count.get());
    707             // count.get provides visibility to first n Nodes
    708             Node<E> h = head;
    709             int i = 0;
    710             try {
    711                 while (i < n) {
    712                     Node<E> p = h.next;
    713                     c.add(p.item);
    714                     p.item = null;
    715                     h.next = h;
    716                     h = p;
    717                     ++i;
    718                 }
    719                 return n;
    720             } finally {
    721                 // Restore invariants even if c.add() threw
    722                 if (i > 0) {
    723                     // assert h.item == null;
    724                     head = h;
    725                     signalNotFull = (count.getAndAdd(-i) == capacity);
    726                 }
    727             }
    728         } finally {
    729             takeLock.unlock();
    730             if (signalNotFull)
    731                 signalNotFull();
    732         }
    733     }
    734 
    735     /**
    736      * Returns an iterator over the elements in this queue in proper sequence.
    737      * The elements will be returned in order from first (head) to last (tail).
    738      *
    739      * <p>The returned iterator is a "weakly consistent" iterator that
    740      * will never throw {@link java.util.ConcurrentModificationException
    741      * ConcurrentModificationException}, and guarantees to traverse
    742      * elements as they existed upon construction of the iterator, and
    743      * may (but is not guaranteed to) reflect any modifications
    744      * subsequent to construction.
    745      *
    746      * @return an iterator over the elements in this queue in proper sequence
    747      */
    748     public Iterator<E> iterator() {
    749       return new Itr();
    750     }
    751 
    752     private class Itr implements Iterator<E> {
    753         /*
    754          * Basic weakly-consistent iterator.  At all times hold the next
    755          * item to hand out so that if hasNext() reports true, we will
    756          * still have it to return even if lost race with a take etc.
    757          */
    758         private Node<E> current;
    759         private Node<E> lastRet;
    760         private E currentElement;
    761 
    762         Itr() {
    763             fullyLock();
    764             try {
    765                 current = head.next;
    766                 if (current != null)
    767                     currentElement = current.item;
    768             } finally {
    769                 fullyUnlock();
    770             }
    771         }
    772 
    773         public boolean hasNext() {
    774             return current != null;
    775         }
    776 
    777         /**
    778          * Returns the next live successor of p, or null if no such.
    779          *
    780          * Unlike other traversal methods, iterators need to handle both:
    781          * - dequeued nodes (p.next == p)
    782          * - (possibly multiple) interior removed nodes (p.item == null)
    783          */
    784         private Node<E> nextNode(Node<E> p) {
    785             for (;;) {
    786                 Node<E> s = p.next;
    787                 if (s == p)
    788                     return head.next;
    789                 if (s == null || s.item != null)
    790                     return s;
    791                 p = s;
    792             }
    793         }
    794 
    795         public E next() {
    796             fullyLock();
    797             try {
    798                 if (current == null)
    799                     throw new NoSuchElementException();
    800                 E x = currentElement;
    801                 lastRet = current;
    802                 current = nextNode(current);
    803                 currentElement = (current == null) ? null : current.item;
    804                 return x;
    805             } finally {
    806                 fullyUnlock();
    807             }
    808         }
    809 
    810         public void remove() {
    811             if (lastRet == null)
    812                 throw new IllegalStateException();
    813             fullyLock();
    814             try {
    815                 Node<E> node = lastRet;
    816                 lastRet = null;
    817                 for (Node<E> trail = head, p = trail.next;
    818                      p != null;
    819                      trail = p, p = p.next) {
    820                     if (p == node) {
    821                         unlink(p, trail);
    822                         break;
    823                     }
    824                 }
    825             } finally {
    826                 fullyUnlock();
    827             }
    828         }
    829     }
    830 
    831     /**
    832      * Save the state to a stream (that is, serialize it).
    833      *
    834      * @serialData The capacity is emitted (int), followed by all of
    835      * its elements (each an {@code Object}) in the proper order,
    836      * followed by a null
    837      * @param s the stream
    838      */
    839     private void writeObject(java.io.ObjectOutputStream s)
    840         throws java.io.IOException {
    841 
    842         fullyLock();
    843         try {
    844             // Write out any hidden stuff, plus capacity
    845             s.defaultWriteObject();
    846 
    847             // Write out all elements in the proper order.
    848             for (Node<E> p = head.next; p != null; p = p.next)
    849                 s.writeObject(p.item);
    850 
    851             // Use trailing null as sentinel
    852             s.writeObject(null);
    853         } finally {
    854             fullyUnlock();
    855         }
    856     }
    857 
    858     /**
    859      * Reconstitute this queue instance from a stream (that is,
    860      * deserialize it).
    861      *
    862      * @param s the stream
    863      */
    864     private void readObject(java.io.ObjectInputStream s)
    865         throws java.io.IOException, ClassNotFoundException {
    866         // Read in capacity, and any hidden stuff
    867         s.defaultReadObject();
    868 
    869         count.set(0);
    870         last = head = new Node<E>(null);
    871 
    872         // Read in all elements and place in queue
    873         for (;;) {
    874             @SuppressWarnings("unchecked")
    875             E item = (E)s.readObject();
    876             if (item == null)
    877                 break;
    878             add(item);
    879         }
    880     }
    881 }
    882