Home | History | Annotate | Download | only in concurrent
      1 /*
      2  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
      3  *
      4  * This code is free software; you can redistribute it and/or modify it
      5  * under the terms of the GNU General Public License version 2 only, as
      6  * published by the Free Software Foundation.  Oracle designates this
      7  * particular file as subject to the "Classpath" exception as provided
      8  * by Oracle in the LICENSE file that accompanied this code.
      9  *
     10  * This code is distributed in the hope that it will be useful, but WITHOUT
     11  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
     12  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
     13  * version 2 for more details (a copy is included in the LICENSE file that
     14  * accompanied this code).
     15  *
     16  * You should have received a copy of the GNU General Public License version
     17  * 2 along with this work; if not, write to the Free Software Foundation,
     18  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
     19  *
     20  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
     21  * or visit www.oracle.com if you need additional information or have any
     22  * questions.
     23  */
     24 
     25 /*
     26  * This file is available under and governed by the GNU General Public
     27  * License version 2 only, as published by the Free Software Foundation.
     28  * However, the following notice accompanied the original version of this
     29  * file:
     30  *
     31  * Written by Doug Lea with assistance from members of JCP JSR-166
     32  * Expert Group and released to the public domain, as explained at
     33  * http://creativecommons.org/publicdomain/zero/1.0/
     34  */
     35 
     36 package java.util.concurrent;
     37 
     38 import java.util.AbstractQueue;
     39 import java.util.Collection;
     40 import java.util.Iterator;
     41 import java.util.NoSuchElementException;
     42 import java.util.Spliterator;
     43 import java.util.Spliterators;
     44 import java.util.concurrent.atomic.AtomicInteger;
     45 import java.util.concurrent.locks.Condition;
     46 import java.util.concurrent.locks.ReentrantLock;
     47 import java.util.function.Consumer;
     48 
     49 // BEGIN android-note
     50 // removed link to collections framework docs
     51 // END android-note
     52 
     53 /**
     54  * An optionally-bounded {@linkplain BlockingQueue blocking queue} based on
     55  * linked nodes.
     56  * This queue orders elements FIFO (first-in-first-out).
     57  * The <em>head</em> of the queue is that element that has been on the
     58  * queue the longest time.
     59  * The <em>tail</em> of the queue is that element that has been on the
     60  * queue the shortest time. New elements
     61  * are inserted at the tail of the queue, and the queue retrieval
     62  * operations obtain elements at the head of the queue.
     63  * Linked queues typically have higher throughput than array-based queues but
     64  * less predictable performance in most concurrent applications.
     65  *
     66  * <p>The optional capacity bound constructor argument serves as a
     67  * way to prevent excessive queue expansion. The capacity, if unspecified,
     68  * is equal to {@link Integer#MAX_VALUE}.  Linked nodes are
     69  * dynamically created upon each insertion unless this would bring the
     70  * queue above capacity.
     71  *
     72  * <p>This class and its iterator implement all of the
     73  * <em>optional</em> methods of the {@link Collection} and {@link
     74  * Iterator} interfaces.
     75  *
     76  * @since 1.5
     77  * @author Doug Lea
     78  * @param <E> the type of elements held in this queue
     79  */
     80 public class LinkedBlockingQueue<E> extends AbstractQueue<E>
     81         implements BlockingQueue<E>, java.io.Serializable {
     82     private static final long serialVersionUID = -6903933977591709194L;
     83 
     84     /*
     85      * A variant of the "two lock queue" algorithm.  The putLock gates
     86      * entry to put (and offer), and has an associated condition for
     87      * waiting puts.  Similarly for the takeLock.  The "count" field
     88      * that they both rely on is maintained as an atomic to avoid
     89      * needing to get both locks in most cases. Also, to minimize need
     90      * for puts to get takeLock and vice-versa, cascading notifies are
     91      * used. When a put notices that it has enabled at least one take,
     92      * it signals taker. That taker in turn signals others if more
     93      * items have been entered since the signal. And symmetrically for
     94      * takes signalling puts. Operations such as remove(Object) and
     95      * iterators acquire both locks.
     96      *
     97      * Visibility between writers and readers is provided as follows:
     98      *
     99      * Whenever an element is enqueued, the putLock is acquired and
    100      * count updated.  A subsequent reader guarantees visibility to the
    101      * enqueued Node by either acquiring the putLock (via fullyLock)
    102      * or by acquiring the takeLock, and then reading n = count.get();
    103      * this gives visibility to the first n items.
    104      *
    105      * To implement weakly consistent iterators, it appears we need to
    106      * keep all Nodes GC-reachable from a predecessor dequeued Node.
    107      * That would cause two problems:
    108      * - allow a rogue Iterator to cause unbounded memory retention
    109      * - cause cross-generational linking of old Nodes to new Nodes if
    110      *   a Node was tenured while live, which generational GCs have a
    111      *   hard time dealing with, causing repeated major collections.
    112      * However, only non-deleted Nodes need to be reachable from
    113      * dequeued Nodes, and reachability does not necessarily have to
    114      * be of the kind understood by the GC.  We use the trick of
    115      * linking a Node that has just been dequeued to itself.  Such a
    116      * self-link implicitly means to advance to head.next.
    117      */
    118 
    119     /**
    120      * Linked list node class.
    121      */
    122     static class Node<E> {
    123         E item;
    124 
    125         /**
    126          * One of:
    127          * - the real successor Node
    128          * - this Node, meaning the successor is head.next
    129          * - null, meaning there is no successor (this is the last node)
    130          */
    131         Node<E> next;
    132 
    133         Node(E x) { item = x; }
    134     }
    135 
    136     /** The capacity bound, or Integer.MAX_VALUE if none */
    137     private final int capacity;
    138 
    139     /** Current number of elements */
    140     private final AtomicInteger count = new AtomicInteger();
    141 
    142     /**
    143      * Head of linked list.
    144      * Invariant: head.item == null
    145      */
    146     transient Node<E> head;
    147 
    148     /**
    149      * Tail of linked list.
    150      * Invariant: last.next == null
    151      */
    152     private transient Node<E> last;
    153 
    154     /** Lock held by take, poll, etc */
    155     private final ReentrantLock takeLock = new ReentrantLock();
    156 
    157     /** Wait queue for waiting takes */
    158     private final Condition notEmpty = takeLock.newCondition();
    159 
    160     /** Lock held by put, offer, etc */
    161     private final ReentrantLock putLock = new ReentrantLock();
    162 
    163     /** Wait queue for waiting puts */
    164     private final Condition notFull = putLock.newCondition();
    165 
    166     /**
    167      * Signals a waiting take. Called only from put/offer (which do not
    168      * otherwise ordinarily lock takeLock.)
    169      */
    170     private void signalNotEmpty() {
    171         final ReentrantLock takeLock = this.takeLock;
    172         takeLock.lock();
    173         try {
    174             notEmpty.signal();
    175         } finally {
    176             takeLock.unlock();
    177         }
    178     }
    179 
    180     /**
    181      * Signals a waiting put. Called only from take/poll.
    182      */
    183     private void signalNotFull() {
    184         final ReentrantLock putLock = this.putLock;
    185         putLock.lock();
    186         try {
    187             notFull.signal();
    188         } finally {
    189             putLock.unlock();
    190         }
    191     }
    192 
    193     /**
    194      * Links node at end of queue.
    195      *
    196      * @param node the node
    197      */
    198     private void enqueue(Node<E> node) {
    199         // assert putLock.isHeldByCurrentThread();
    200         // assert last.next == null;
    201         last = last.next = node;
    202     }
    203 
    204     /**
    205      * Removes a node from head of queue.
    206      *
    207      * @return the node
    208      */
    209     private E dequeue() {
    210         // assert takeLock.isHeldByCurrentThread();
    211         // assert head.item == null;
    212         Node<E> h = head;
    213         Node<E> first = h.next;
    214         h.next = h; // help GC
    215         head = first;
    216         E x = first.item;
    217         first.item = null;
    218         return x;
    219     }
    220 
    221     /**
    222      * Locks to prevent both puts and takes.
    223      */
    224     void fullyLock() {
    225         putLock.lock();
    226         takeLock.lock();
    227     }
    228 
    229     /**
    230      * Unlocks to allow both puts and takes.
    231      */
    232     void fullyUnlock() {
    233         takeLock.unlock();
    234         putLock.unlock();
    235     }
    236 
    237 //     /**
    238 //      * Tells whether both locks are held by current thread.
    239 //      */
    240 //     boolean isFullyLocked() {
    241 //         return (putLock.isHeldByCurrentThread() &&
    242 //                 takeLock.isHeldByCurrentThread());
    243 //     }
    244 
    245     /**
    246      * Creates a {@code LinkedBlockingQueue} with a capacity of
    247      * {@link Integer#MAX_VALUE}.
    248      */
    public LinkedBlockingQueue() {
        // No explicit bound: delegate with the largest int as capacity.
        this(Integer.MAX_VALUE);
    }
    252 
    253     /**
    254      * Creates a {@code LinkedBlockingQueue} with the given (fixed) capacity.
    255      *
    256      * @param capacity the capacity of this queue
    257      * @throws IllegalArgumentException if {@code capacity} is not greater
    258      *         than zero
    259      */
    260     public LinkedBlockingQueue(int capacity) {
    261         if (capacity <= 0) throw new IllegalArgumentException();
    262         this.capacity = capacity;
    263         last = head = new Node<E>(null);
    264     }
    265 
    266     /**
    267      * Creates a {@code LinkedBlockingQueue} with a capacity of
    268      * {@link Integer#MAX_VALUE}, initially containing the elements of the
    269      * given collection,
    270      * added in traversal order of the collection's iterator.
    271      *
    272      * @param c the collection of elements to initially contain
    273      * @throws NullPointerException if the specified collection or any
    274      *         of its elements are null
    275      */
    public LinkedBlockingQueue(Collection<? extends E> c) {
        this(Integer.MAX_VALUE);
        final ReentrantLock lock = this.putLock;
        // Nobody else can see this queue yet, so the lock is never
        // contended; it is taken for memory visibility.
        lock.lock();
        try {
            int added = 0;
            Iterator<? extends E> source = c.iterator();
            while (source.hasNext()) {
                E element = source.next();
                if (element == null)
                    throw new NullPointerException();
                if (added == capacity)
                    throw new IllegalStateException("Queue full");
                enqueue(new Node<E>(element));
                added++;
            }
            // Publish the element count once, after all nodes are linked.
            count.set(added);
        } finally {
            lock.unlock();
        }
    }
    295 
    296     // this doc comment is overridden to remove the reference to collections
    297     // greater in size than Integer.MAX_VALUE
    298     /**
    299      * Returns the number of elements in this queue.
    300      *
    301      * @return the number of elements in this queue
    302      */
    303     public int size() {
    304         return count.get();
    305     }
    306 
    307     // this doc comment is a modified copy of the inherited doc comment,
    308     // without the reference to unlimited queues.
    309     /**
    310      * Returns the number of additional elements that this queue can ideally
    311      * (in the absence of memory or resource constraints) accept without
    312      * blocking. This is always equal to the initial capacity of this queue
    313      * less the current {@code size} of this queue.
    314      *
    315      * <p>Note that you <em>cannot</em> always tell if an attempt to insert
    316      * an element will succeed by inspecting {@code remainingCapacity}
    317      * because it may be the case that another thread is about to
    318      * insert or remove an element.
    319      */
    320     public int remainingCapacity() {
    321         return capacity - count.get();
    322     }
    323 
    324     /**
    325      * Inserts the specified element at the tail of this queue, waiting if
    326      * necessary for space to become available.
    327      *
    328      * @throws InterruptedException {@inheritDoc}
    329      * @throws NullPointerException {@inheritDoc}
    330      */
    331     public void put(E e) throws InterruptedException {
    332         if (e == null) throw new NullPointerException();
    333         // Note: convention in all put/take/etc is to preset local var
    334         // holding count negative to indicate failure unless set.
    335         int c = -1;
    336         Node<E> node = new Node<E>(e);
    337         final ReentrantLock putLock = this.putLock;
    338         final AtomicInteger count = this.count;
    339         putLock.lockInterruptibly();
    340         try {
    341             /*
    342              * Note that count is used in wait guard even though it is
    343              * not protected by lock. This works because count can
    344              * only decrease at this point (all other puts are shut
    345              * out by lock), and we (or some other waiting put) are
    346              * signalled if it ever changes from capacity. Similarly
    347              * for all other uses of count in other wait guards.
    348              */
    349             while (count.get() == capacity) {
    350                 notFull.await();
    351             }
    352             enqueue(node);
    353             c = count.getAndIncrement();
    354             if (c + 1 < capacity)
    355                 notFull.signal();
    356         } finally {
    357             putLock.unlock();
    358         }
    359         if (c == 0)
    360             signalNotEmpty();
    361     }
    362 
    363     /**
    364      * Inserts the specified element at the tail of this queue, waiting if
    365      * necessary up to the specified wait time for space to become available.
    366      *
    367      * @return {@code true} if successful, or {@code false} if
    368      *         the specified waiting time elapses before space is available
    369      * @throws InterruptedException {@inheritDoc}
    370      * @throws NullPointerException {@inheritDoc}
    371      */
    372     public boolean offer(E e, long timeout, TimeUnit unit)
    373         throws InterruptedException {
    374 
    375         if (e == null) throw new NullPointerException();
    376         long nanos = unit.toNanos(timeout);
    377         int c = -1;
    378         final ReentrantLock putLock = this.putLock;
    379         final AtomicInteger count = this.count;
    380         putLock.lockInterruptibly();
    381         try {
    382             while (count.get() == capacity) {
    383                 if (nanos <= 0L)
    384                     return false;
    385                 nanos = notFull.awaitNanos(nanos);
    386             }
    387             enqueue(new Node<E>(e));
    388             c = count.getAndIncrement();
    389             if (c + 1 < capacity)
    390                 notFull.signal();
    391         } finally {
    392             putLock.unlock();
    393         }
    394         if (c == 0)
    395             signalNotEmpty();
    396         return true;
    397     }
    398 
    399     /**
    400      * Inserts the specified element at the tail of this queue if it is
    401      * possible to do so immediately without exceeding the queue's capacity,
    402      * returning {@code true} upon success and {@code false} if this queue
    403      * is full.
    404      * When using a capacity-restricted queue, this method is generally
    405      * preferable to method {@link BlockingQueue#add add}, which can fail to
    406      * insert an element only by throwing an exception.
    407      *
    408      * @throws NullPointerException if the specified element is null
    409      */
    410     public boolean offer(E e) {
    411         if (e == null) throw new NullPointerException();
    412         final AtomicInteger count = this.count;
    413         if (count.get() == capacity)
    414             return false;
    415         int c = -1;
    416         Node<E> node = new Node<E>(e);
    417         final ReentrantLock putLock = this.putLock;
    418         putLock.lock();
    419         try {
    420             if (count.get() < capacity) {
    421                 enqueue(node);
    422                 c = count.getAndIncrement();
    423                 if (c + 1 < capacity)
    424                     notFull.signal();
    425             }
    426         } finally {
    427             putLock.unlock();
    428         }
    429         if (c == 0)
    430             signalNotEmpty();
    431         return c >= 0;
    432     }
    433 
    434     public E take() throws InterruptedException {
    435         E x;
    436         int c = -1;
    437         final AtomicInteger count = this.count;
    438         final ReentrantLock takeLock = this.takeLock;
    439         takeLock.lockInterruptibly();
    440         try {
    441             while (count.get() == 0) {
    442                 notEmpty.await();
    443             }
    444             x = dequeue();
    445             c = count.getAndDecrement();
    446             if (c > 1)
    447                 notEmpty.signal();
    448         } finally {
    449             takeLock.unlock();
    450         }
    451         if (c == capacity)
    452             signalNotFull();
    453         return x;
    454     }
    455 
    456     public E poll(long timeout, TimeUnit unit) throws InterruptedException {
    457         E x = null;
    458         int c = -1;
    459         long nanos = unit.toNanos(timeout);
    460         final AtomicInteger count = this.count;
    461         final ReentrantLock takeLock = this.takeLock;
    462         takeLock.lockInterruptibly();
    463         try {
    464             while (count.get() == 0) {
    465                 if (nanos <= 0L)
    466                     return null;
    467                 nanos = notEmpty.awaitNanos(nanos);
    468             }
    469             x = dequeue();
    470             c = count.getAndDecrement();
    471             if (c > 1)
    472                 notEmpty.signal();
    473         } finally {
    474             takeLock.unlock();
    475         }
    476         if (c == capacity)
    477             signalNotFull();
    478         return x;
    479     }
    480 
    481     public E poll() {
    482         final AtomicInteger count = this.count;
    483         if (count.get() == 0)
    484             return null;
    485         E x = null;
    486         int c = -1;
    487         final ReentrantLock takeLock = this.takeLock;
    488         takeLock.lock();
    489         try {
    490             if (count.get() > 0) {
    491                 x = dequeue();
    492                 c = count.getAndDecrement();
    493                 if (c > 1)
    494                     notEmpty.signal();
    495             }
    496         } finally {
    497             takeLock.unlock();
    498         }
    499         if (c == capacity)
    500             signalNotFull();
    501         return x;
    502     }
    503 
    504     public E peek() {
    505         if (count.get() == 0)
    506             return null;
    507         final ReentrantLock takeLock = this.takeLock;
    508         takeLock.lock();
    509         try {
    510             return (count.get() > 0) ? head.next.item : null;
    511         } finally {
    512             takeLock.unlock();
    513         }
    514     }
    515 
    516     /**
    517      * Unlinks interior Node p with predecessor trail.
    518      */
    519     void unlink(Node<E> p, Node<E> trail) {
    520         // assert isFullyLocked();
    521         // p.next is not changed, to allow iterators that are
    522         // traversing p to maintain their weak-consistency guarantee.
    523         p.item = null;
    524         trail.next = p.next;
    525         if (last == p)
    526             last = trail;
    527         if (count.getAndDecrement() == capacity)
    528             notFull.signal();
    529     }
    530 
    531     /**
    532      * Removes a single instance of the specified element from this queue,
    533      * if it is present.  More formally, removes an element {@code e} such
    534      * that {@code o.equals(e)}, if this queue contains one or more such
    535      * elements.
    536      * Returns {@code true} if this queue contained the specified element
    537      * (or equivalently, if this queue changed as a result of the call).
    538      *
    539      * @param o element to be removed from this queue, if present
    540      * @return {@code true} if this queue changed as a result of the call
    541      */
    542     public boolean remove(Object o) {
    543         if (o == null) return false;
    544         fullyLock();
    545         try {
    546             for (Node<E> trail = head, p = trail.next;
    547                  p != null;
    548                  trail = p, p = p.next) {
    549                 if (o.equals(p.item)) {
    550                     unlink(p, trail);
    551                     return true;
    552                 }
    553             }
    554             return false;
    555         } finally {
    556             fullyUnlock();
    557         }
    558     }
    559 
    560     /**
    561      * Returns {@code true} if this queue contains the specified element.
    562      * More formally, returns {@code true} if and only if this queue contains
    563      * at least one element {@code e} such that {@code o.equals(e)}.
    564      *
    565      * @param o object to be checked for containment in this queue
    566      * @return {@code true} if this queue contains the specified element
    567      */
    568     public boolean contains(Object o) {
    569         if (o == null) return false;
    570         fullyLock();
    571         try {
    572             for (Node<E> p = head.next; p != null; p = p.next)
    573                 if (o.equals(p.item))
    574                     return true;
    575             return false;
    576         } finally {
    577             fullyUnlock();
    578         }
    579     }
    580 
    581     /**
    582      * Returns an array containing all of the elements in this queue, in
    583      * proper sequence.
    584      *
    585      * <p>The returned array will be "safe" in that no references to it are
    586      * maintained by this queue.  (In other words, this method must allocate
    587      * a new array).  The caller is thus free to modify the returned array.
    588      *
    589      * <p>This method acts as bridge between array-based and collection-based
    590      * APIs.
    591      *
    592      * @return an array containing all of the elements in this queue
    593      */
    594     public Object[] toArray() {
    595         fullyLock();
    596         try {
    597             int size = count.get();
    598             Object[] a = new Object[size];
    599             int k = 0;
    600             for (Node<E> p = head.next; p != null; p = p.next)
    601                 a[k++] = p.item;
    602             return a;
    603         } finally {
    604             fullyUnlock();
    605         }
    606     }
    607 
    608     /**
    609      * Returns an array containing all of the elements in this queue, in
    610      * proper sequence; the runtime type of the returned array is that of
    611      * the specified array.  If the queue fits in the specified array, it
    612      * is returned therein.  Otherwise, a new array is allocated with the
    613      * runtime type of the specified array and the size of this queue.
    614      *
    615      * <p>If this queue fits in the specified array with room to spare
    616      * (i.e., the array has more elements than this queue), the element in
    617      * the array immediately following the end of the queue is set to
    618      * {@code null}.
    619      *
    620      * <p>Like the {@link #toArray()} method, this method acts as bridge between
    621      * array-based and collection-based APIs.  Further, this method allows
    622      * precise control over the runtime type of the output array, and may,
    623      * under certain circumstances, be used to save allocation costs.
    624      *
    625      * <p>Suppose {@code x} is a queue known to contain only strings.
    626      * The following code can be used to dump the queue into a newly
    627      * allocated array of {@code String}:
    628      *
    629      * <pre> {@code String[] y = x.toArray(new String[0]);}</pre>
    630      *
    631      * Note that {@code toArray(new Object[0])} is identical in function to
    632      * {@code toArray()}.
    633      *
    634      * @param a the array into which the elements of the queue are to
    635      *          be stored, if it is big enough; otherwise, a new array of the
    636      *          same runtime type is allocated for this purpose
    637      * @return an array containing all of the elements in this queue
    638      * @throws ArrayStoreException if the runtime type of the specified array
    639      *         is not a supertype of the runtime type of every element in
    640      *         this queue
    641      * @throws NullPointerException if the specified array is null
    642      */
    643     @SuppressWarnings("unchecked")
    644     public <T> T[] toArray(T[] a) {
    645         fullyLock();
    646         try {
    647             int size = count.get();
    648             if (a.length < size)
    649                 a = (T[])java.lang.reflect.Array.newInstance
    650                     (a.getClass().getComponentType(), size);
    651 
    652             int k = 0;
    653             for (Node<E> p = head.next; p != null; p = p.next)
    654                 a[k++] = (T)p.item;
    655             if (a.length > k)
    656                 a[k] = null;
    657             return a;
    658         } finally {
    659             fullyUnlock();
    660         }
    661     }
    662 
    663     public String toString() {
    664         return Helpers.collectionToString(this);
    665     }
    666 
    667     /**
    668      * Atomically removes all of the elements from this queue.
    669      * The queue will be empty after this call returns.
    670      */
    671     public void clear() {
    672         fullyLock();
    673         try {
    674             for (Node<E> p, h = head; (p = h.next) != null; h = p) {
    675                 h.next = h;
    676                 p.item = null;
    677             }
    678             head = last;
    679             // assert head.item == null && head.next == null;
    680             if (count.getAndSet(0) == capacity)
    681                 notFull.signal();
    682         } finally {
    683             fullyUnlock();
    684         }
    685     }
    686 
    687     /**
    688      * @throws UnsupportedOperationException {@inheritDoc}
    689      * @throws ClassCastException            {@inheritDoc}
    690      * @throws NullPointerException          {@inheritDoc}
    691      * @throws IllegalArgumentException      {@inheritDoc}
    692      */
    693     public int drainTo(Collection<? super E> c) {
    694         return drainTo(c, Integer.MAX_VALUE);
    695     }
    696 
    697     /**
    698      * @throws UnsupportedOperationException {@inheritDoc}
    699      * @throws ClassCastException            {@inheritDoc}
    700      * @throws NullPointerException          {@inheritDoc}
    701      * @throws IllegalArgumentException      {@inheritDoc}
    702      */
    703     public int drainTo(Collection<? super E> c, int maxElements) {
    704         if (c == null)
    705             throw new NullPointerException();
    706         if (c == this)
    707             throw new IllegalArgumentException();
    708         if (maxElements <= 0)
    709             return 0;
    710         boolean signalNotFull = false;
    711         final ReentrantLock takeLock = this.takeLock;
    712         takeLock.lock();
    713         try {
    714             int n = Math.min(maxElements, count.get());
    715             // count.get provides visibility to first n Nodes
    716             Node<E> h = head;
    717             int i = 0;
    718             try {
    719                 while (i < n) {
    720                     Node<E> p = h.next;
    721                     c.add(p.item);
    722                     p.item = null;
    723                     h.next = h;
    724                     h = p;
    725                     ++i;
    726                 }
    727                 return n;
    728             } finally {
    729                 // Restore invariants even if c.add() threw
    730                 if (i > 0) {
    731                     // assert h.item == null;
    732                     head = h;
    733                     signalNotFull = (count.getAndAdd(-i) == capacity);
    734                 }
    735             }
    736         } finally {
    737             takeLock.unlock();
    738             if (signalNotFull)
    739                 signalNotFull();
    740         }
    741     }
    742 
    743     /**
    744      * Returns an iterator over the elements in this queue in proper sequence.
    745      * The elements will be returned in order from first (head) to last (tail).
    746      *
    747      * <p>The returned iterator is
    748      * <a href="package-summary.html#Weakly"><i>weakly consistent</i></a>.
    749      *
    750      * @return an iterator over the elements in this queue in proper sequence
    751      */
    752     public Iterator<E> iterator() {
    753         return new Itr();
    754     }
    755 
    756     private class Itr implements Iterator<E> {
    757         /*
    758          * Basic weakly-consistent iterator.  At all times hold the next
    759          * item to hand out so that if hasNext() reports true, we will
    760          * still have it to return even if lost race with a take etc.
    761          */
    762 
    763         private Node<E> current;
    764         private Node<E> lastRet;
    765         private E currentElement;
    766 
    767         Itr() {
    768             fullyLock();
    769             try {
    770                 current = head.next;
    771                 if (current != null)
    772                     currentElement = current.item;
    773             } finally {
    774                 fullyUnlock();
    775             }
    776         }
    777 
    778         public boolean hasNext() {
    779             return current != null;
    780         }
    781 
    782         public E next() {
    783             fullyLock();
    784             try {
    785                 if (current == null)
    786                     throw new NoSuchElementException();
    787                 lastRet = current;
    788                 E item = null;
    789                 // Unlike other traversal methods, iterators must handle both:
    790                 // - dequeued nodes (p.next == p)
    791                 // - (possibly multiple) interior removed nodes (p.item == null)
    792                 for (Node<E> p = current, q;; p = q) {
    793                     if ((q = p.next) == p)
    794                         q = head.next;
    795                     if (q == null || (item = q.item) != null) {
    796                         current = q;
    797                         E x = currentElement;
    798                         currentElement = item;
    799                         return x;
    800                     }
    801                 }
    802             } finally {
    803                 fullyUnlock();
    804             }
    805         }
    806 
    807         public void remove() {
    808             if (lastRet == null)
    809                 throw new IllegalStateException();
    810             fullyLock();
    811             try {
    812                 Node<E> node = lastRet;
    813                 lastRet = null;
    814                 for (Node<E> trail = head, p = trail.next;
    815                      p != null;
    816                      trail = p, p = p.next) {
    817                     if (p == node) {
    818                         unlink(p, trail);
    819                         break;
    820                     }
    821                 }
    822             } finally {
    823                 fullyUnlock();
    824             }
    825         }
    826     }
    827 
    828     /** A customized variant of Spliterators.IteratorSpliterator */
    829     static final class LBQSpliterator<E> implements Spliterator<E> {
    830         static final int MAX_BATCH = 1 << 25;  // max batch array size;
    831         final LinkedBlockingQueue<E> queue;
    832         Node<E> current;    // current node; null until initialized
    833         int batch;          // batch size for splits
    834         boolean exhausted;  // true when no more nodes
    835         long est;           // size estimate
    836         LBQSpliterator(LinkedBlockingQueue<E> queue) {
    837             this.queue = queue;
    838             this.est = queue.size();
    839         }
    840 
    841         public long estimateSize() { return est; }
    842 
    843         public Spliterator<E> trySplit() {
    844             Node<E> h;
    845             final LinkedBlockingQueue<E> q = this.queue;
    846             int b = batch;
    847             int n = (b <= 0) ? 1 : (b >= MAX_BATCH) ? MAX_BATCH : b + 1;
    848             if (!exhausted &&
    849                 ((h = current) != null || (h = q.head.next) != null) &&
    850                 h.next != null) {
    851                 Object[] a = new Object[n];
    852                 int i = 0;
    853                 Node<E> p = current;
    854                 q.fullyLock();
    855                 try {
    856                     if (p != null || (p = q.head.next) != null) {
    857                         do {
    858                             if ((a[i] = p.item) != null)
    859                                 ++i;
    860                         } while ((p = p.next) != null && i < n);
    861                     }
    862                 } finally {
    863                     q.fullyUnlock();
    864                 }
    865                 if ((current = p) == null) {
    866                     est = 0L;
    867                     exhausted = true;
    868                 }
    869                 else if ((est -= i) < 0L)
    870                     est = 0L;
    871                 if (i > 0) {
    872                     batch = i;
    873                     return Spliterators.spliterator
    874                         (a, 0, i, (Spliterator.ORDERED |
    875                                    Spliterator.NONNULL |
    876                                    Spliterator.CONCURRENT));
    877                 }
    878             }
    879             return null;
    880         }
    881 
    882         public void forEachRemaining(Consumer<? super E> action) {
    883             if (action == null) throw new NullPointerException();
    884             final LinkedBlockingQueue<E> q = this.queue;
    885             if (!exhausted) {
    886                 exhausted = true;
    887                 Node<E> p = current;
    888                 do {
    889                     E e = null;
    890                     q.fullyLock();
    891                     try {
    892                         if (p == null)
    893                             p = q.head.next;
    894                         while (p != null) {
    895                             e = p.item;
    896                             p = p.next;
    897                             if (e != null)
    898                                 break;
    899                         }
    900                     } finally {
    901                         q.fullyUnlock();
    902                     }
    903                     if (e != null)
    904                         action.accept(e);
    905                 } while (p != null);
    906             }
    907         }
    908 
    909         public boolean tryAdvance(Consumer<? super E> action) {
    910             if (action == null) throw new NullPointerException();
    911             final LinkedBlockingQueue<E> q = this.queue;
    912             if (!exhausted) {
    913                 E e = null;
    914                 q.fullyLock();
    915                 try {
    916                     if (current == null)
    917                         current = q.head.next;
    918                     while (current != null) {
    919                         e = current.item;
    920                         current = current.next;
    921                         if (e != null)
    922                             break;
    923                     }
    924                 } finally {
    925                     q.fullyUnlock();
    926                 }
    927                 if (current == null)
    928                     exhausted = true;
    929                 if (e != null) {
    930                     action.accept(e);
    931                     return true;
    932                 }
    933             }
    934             return false;
    935         }
    936 
    937         public int characteristics() {
    938             return Spliterator.ORDERED | Spliterator.NONNULL |
    939                 Spliterator.CONCURRENT;
    940         }
    941     }
    942 
    943     /**
    944      * Returns a {@link Spliterator} over the elements in this queue.
    945      *
    946      * <p>The returned spliterator is
    947      * <a href="package-summary.html#Weakly"><i>weakly consistent</i></a>.
    948      *
    949      * <p>The {@code Spliterator} reports {@link Spliterator#CONCURRENT},
    950      * {@link Spliterator#ORDERED}, and {@link Spliterator#NONNULL}.
    951      *
    952      * @implNote
    953      * The {@code Spliterator} implements {@code trySplit} to permit limited
    954      * parallelism.
    955      *
    956      * @return a {@code Spliterator} over the elements in this queue
    957      * @since 1.8
    958      */
    959     public Spliterator<E> spliterator() {
    960         return new LBQSpliterator<E>(this);
    961     }
    962 
    963     /**
    964      * Saves this queue to a stream (that is, serializes it).
    965      *
    966      * @param s the stream
    967      * @throws java.io.IOException if an I/O error occurs
    968      * @serialData The capacity is emitted (int), followed by all of
    969      * its elements (each an {@code Object}) in the proper order,
    970      * followed by a null
    971      */
    972     private void writeObject(java.io.ObjectOutputStream s)
    973         throws java.io.IOException {
    974 
    975         fullyLock();
    976         try {
    977             // Write out any hidden stuff, plus capacity
    978             s.defaultWriteObject();
    979 
    980             // Write out all elements in the proper order.
    981             for (Node<E> p = head.next; p != null; p = p.next)
    982                 s.writeObject(p.item);
    983 
    984             // Use trailing null as sentinel
    985             s.writeObject(null);
    986         } finally {
    987             fullyUnlock();
    988         }
    989     }
    990 
    991     /**
    992      * Reconstitutes this queue from a stream (that is, deserializes it).
    993      * @param s the stream
    994      * @throws ClassNotFoundException if the class of a serialized object
    995      *         could not be found
    996      * @throws java.io.IOException if an I/O error occurs
    997      */
    998     private void readObject(java.io.ObjectInputStream s)
    999         throws java.io.IOException, ClassNotFoundException {
   1000         // Read in capacity, and any hidden stuff
   1001         s.defaultReadObject();
   1002 
   1003         count.set(0);
   1004         last = head = new Node<E>(null);
   1005 
   1006         // Read in all elements and place in queue
   1007         for (;;) {
   1008             @SuppressWarnings("unchecked")
   1009             E item = (E)s.readObject();
   1010             if (item == null)
   1011                 break;
   1012             add(item);
   1013         }
   1014     }
   1015 }
   1016