Home | History | Annotate | Download | only in concurrent
      1 /*
      2  * Written by Doug Lea with assistance from members of JCP JSR-166
      3  * Expert Group and released to the public domain, as explained at
      4  * http://creativecommons.org/publicdomain/zero/1.0/
      5  */
      6 
      7 package java.util.concurrent;
      8 import java.util.concurrent.locks.Condition;
      9 import java.util.concurrent.locks.ReentrantLock;
     10 import java.util.AbstractQueue;
     11 import java.util.Collection;
     12 import java.util.Iterator;
     13 import java.util.NoSuchElementException;
     14 import java.lang.ref.WeakReference;
     15 
     16 // BEGIN android-note
     17 // removed link to collections framework docs
     18 // END android-note
     19 
     20 /**
     21  * A bounded {@linkplain BlockingQueue blocking queue} backed by an
     22  * array.  This queue orders elements FIFO (first-in-first-out).  The
     23  * <em>head</em> of the queue is that element that has been on the
     24  * queue the longest time.  The <em>tail</em> of the queue is that
     25  * element that has been on the queue the shortest time. New elements
     26  * are inserted at the tail of the queue, and the queue retrieval
     27  * operations obtain elements at the head of the queue.
     28  *
     29  * <p>This is a classic &quot;bounded buffer&quot;, in which a
     30  * fixed-sized array holds elements inserted by producers and
     31  * extracted by consumers.  Once created, the capacity cannot be
     32  * changed.  Attempts to {@code put} an element into a full queue
     33  * will result in the operation blocking; attempts to {@code take} an
     34  * element from an empty queue will similarly block.
     35  *
     36  * <p>This class supports an optional fairness policy for ordering
     37  * waiting producer and consumer threads.  By default, this ordering
     38  * is not guaranteed. However, a queue constructed with fairness set
     39  * to {@code true} grants threads access in FIFO order. Fairness
     40  * generally decreases throughput but reduces variability and avoids
     41  * starvation.
     42  *
     43  * <p>This class and its iterator implement all of the
     44  * <em>optional</em> methods of the {@link Collection} and {@link
     45  * Iterator} interfaces.
     46  *
     47  * @since 1.5
     48  * @author Doug Lea
     49  * @param <E> the type of elements held in this collection
     50  */
     51 public class ArrayBlockingQueue<E> extends AbstractQueue<E>
     52         implements BlockingQueue<E>, java.io.Serializable {
     53 
     54     /**
     55      * Serialization ID. This class relies on default serialization
     56      * even for the items array, which is default-serialized, even if
     57      * it is empty. Otherwise it could not be declared final, which is
     58      * necessary here.
     59      */
     60     private static final long serialVersionUID = -817911632652898426L;
     61 
     62     /** The queued items */
     63     final Object[] items;
     64 
     65     /** items index for next take, poll, peek or remove */
     66     int takeIndex;
     67 
     68     /** items index for next put, offer, or add */
     69     int putIndex;
     70 
     71     /** Number of elements in the queue */
     72     int count;
     73 
     74     /*
     75      * Concurrency control uses the classic two-condition algorithm
     76      * found in any textbook.
     77      */
     78 
     79     /** Main lock guarding all access */
     80     final ReentrantLock lock;
     81 
     82     /** Condition for waiting takes */
     83     private final Condition notEmpty;
     84 
     85     /** Condition for waiting puts */
     86     private final Condition notFull;
     87 
     88     /**
     89      * Shared state for currently active iterators, or null if there
     90      * are known not to be any.  Allows queue operations to update
     91      * iterator state.
     92      */
     93     transient Itrs itrs = null;
     94 
     95     // Internal helper methods
     96 
     97     /**
     98      * Circularly increment i.
     99      */
    100     final int inc(int i) {
    101         return (++i == items.length) ? 0 : i;
    102     }
    103 
    104     /**
    105      * Circularly decrement i.
    106      */
    107     final int dec(int i) {
    108         return ((i == 0) ? items.length : i) - 1;
    109     }
    110 
    111     /**
    112      * Returns item at index i.
    113      */
    114     final E itemAt(int i) {
    115         @SuppressWarnings("unchecked") E x = (E) items[i];
    116         return x;
    117     }
    118 
    119     /**
    120      * Throws NullPointerException if argument is null.
    121      *
    122      * @param v the element
    123      */
    124     private static void checkNotNull(Object v) {
    125         if (v == null)
    126             throw new NullPointerException();
    127     }
    128 
    129     /**
    130      * Inserts element at current put position, advances, and signals.
    131      * Call only when holding lock.
    132      */
    133     private void enqueue(E x) {
    134         // assert lock.getHoldCount() == 1;
    135         // assert items[putIndex] == null;
    136         items[putIndex] = x;
    137         putIndex = inc(putIndex);
    138         count++;
    139         notEmpty.signal();
    140     }
    141 
    142     /**
    143      * Extracts element at current take position, advances, and signals.
    144      * Call only when holding lock.
    145      */
    146     private E dequeue() {
    147         // assert lock.getHoldCount() == 1;
    148         // assert items[takeIndex] != null;
    149         final Object[] items = this.items;
    150         @SuppressWarnings("unchecked") E x = (E) items[takeIndex];
    151         items[takeIndex] = null;
    152         takeIndex = inc(takeIndex);
    153         count--;
    154         if (itrs != null)
    155             itrs.elementDequeued();
    156         notFull.signal();
    157         return x;
    158     }
    159 
    160     /**
    161      * Deletes item at array index removeIndex.
    162      * Utility for remove(Object) and iterator.remove.
    163      * Call only when holding lock.
    164      */
    165     void removeAt(final int removeIndex) {
    166         // assert lock.getHoldCount() == 1;
    167         // assert items[removeIndex] != null;
    168         // assert removeIndex >= 0 && removeIndex < items.length;
    169         final Object[] items = this.items;
    170         if (removeIndex == takeIndex) {
    171             // removing front item; just advance
    172             items[takeIndex] = null;
    173             takeIndex = inc(takeIndex);
    174             count--;
    175             if (itrs != null)
    176                 itrs.elementDequeued();
    177         } else {
    178             // an "interior" remove
    179 
    180             // slide over all others up through putIndex.
    181             final int putIndex = this.putIndex;
    182             for (int i = removeIndex;;) {
    183                 int next = inc(i);
    184                 if (next != putIndex) {
    185                     items[i] = items[next];
    186                     i = next;
    187                 } else {
    188                     items[i] = null;
    189                     this.putIndex = i;
    190                     break;
    191                 }
    192             }
    193             count--;
    194             if (itrs != null)
    195                 itrs.removedAt(removeIndex);
    196         }
    197         notFull.signal();
    198     }
    199 
    200     /**
    201      * Creates an {@code ArrayBlockingQueue} with the given (fixed)
    202      * capacity and default access policy.
    203      *
    204      * @param capacity the capacity of this queue
    205      * @throws IllegalArgumentException if {@code capacity < 1}
    206      */
    207     public ArrayBlockingQueue(int capacity) {
    208         this(capacity, false);
    209     }
    210 
    211     /**
    212      * Creates an {@code ArrayBlockingQueue} with the given (fixed)
    213      * capacity and the specified access policy.
    214      *
    215      * @param capacity the capacity of this queue
    216      * @param fair if {@code true} then queue accesses for threads blocked
    217      *        on insertion or removal, are processed in FIFO order;
    218      *        if {@code false} the access order is unspecified.
    219      * @throws IllegalArgumentException if {@code capacity < 1}
    220      */
    221     public ArrayBlockingQueue(int capacity, boolean fair) {
    222         if (capacity <= 0)
    223             throw new IllegalArgumentException();
    224         this.items = new Object[capacity];
    225         lock = new ReentrantLock(fair);
    226         notEmpty = lock.newCondition();
    227         notFull =  lock.newCondition();
    228     }
    229 
    230     /**
    231      * Creates an {@code ArrayBlockingQueue} with the given (fixed)
    232      * capacity, the specified access policy and initially containing the
    233      * elements of the given collection,
    234      * added in traversal order of the collection's iterator.
    235      *
    236      * @param capacity the capacity of this queue
    237      * @param fair if {@code true} then queue accesses for threads blocked
    238      *        on insertion or removal, are processed in FIFO order;
    239      *        if {@code false} the access order is unspecified.
    240      * @param c the collection of elements to initially contain
    241      * @throws IllegalArgumentException if {@code capacity} is less than
    242      *         {@code c.size()}, or less than 1.
    243      * @throws NullPointerException if the specified collection or any
    244      *         of its elements are null
    245      */
    246     public ArrayBlockingQueue(int capacity, boolean fair,
    247                               Collection<? extends E> c) {
    248         this(capacity, fair);
    249 
    250         final ReentrantLock lock = this.lock;
    251         lock.lock(); // Lock only for visibility, not mutual exclusion
    252         try {
    253             int i = 0;
    254             try {
    255                 for (E e : c) {
    256                     checkNotNull(e);
    257                     items[i++] = e;
    258                 }
    259             } catch (ArrayIndexOutOfBoundsException ex) {
    260                 throw new IllegalArgumentException();
    261             }
    262             count = i;
    263             putIndex = (i == capacity) ? 0 : i;
    264         } finally {
    265             lock.unlock();
    266         }
    267     }
    268 
    269     /**
    270      * Inserts the specified element at the tail of this queue if it is
    271      * possible to do so immediately without exceeding the queue's capacity,
    272      * returning {@code true} upon success and throwing an
    273      * {@code IllegalStateException} if this queue is full.
    274      *
    275      * @param e the element to add
    276      * @return {@code true} (as specified by {@link Collection#add})
    277      * @throws IllegalStateException if this queue is full
    278      * @throws NullPointerException if the specified element is null
    279      */
    280     public boolean add(E e) {
    281         return super.add(e);
    282     }
    283 
    284     /**
    285      * Inserts the specified element at the tail of this queue if it is
    286      * possible to do so immediately without exceeding the queue's capacity,
    287      * returning {@code true} upon success and {@code false} if this queue
    288      * is full.  This method is generally preferable to method {@link #add},
    289      * which can fail to insert an element only by throwing an exception.
    290      *
    291      * @throws NullPointerException if the specified element is null
    292      */
    293     public boolean offer(E e) {
    294         checkNotNull(e);
    295         final ReentrantLock lock = this.lock;
    296         lock.lock();
    297         try {
    298             if (count == items.length)
    299                 return false;
    300             else {
    301                 enqueue(e);
    302                 return true;
    303             }
    304         } finally {
    305             lock.unlock();
    306         }
    307     }
    308 
    309     /**
    310      * Inserts the specified element at the tail of this queue, waiting
    311      * for space to become available if the queue is full.
    312      *
    313      * @throws InterruptedException {@inheritDoc}
    314      * @throws NullPointerException {@inheritDoc}
    315      */
    316     public void put(E e) throws InterruptedException {
    317         checkNotNull(e);
    318         final ReentrantLock lock = this.lock;
    319         lock.lockInterruptibly();
    320         try {
    321             while (count == items.length)
    322                 notFull.await();
    323             enqueue(e);
    324         } finally {
    325             lock.unlock();
    326         }
    327     }
    328 
    329     /**
    330      * Inserts the specified element at the tail of this queue, waiting
    331      * up to the specified wait time for space to become available if
    332      * the queue is full.
    333      *
    334      * @throws InterruptedException {@inheritDoc}
    335      * @throws NullPointerException {@inheritDoc}
    336      */
    337     public boolean offer(E e, long timeout, TimeUnit unit)
    338         throws InterruptedException {
    339 
    340         checkNotNull(e);
    341         long nanos = unit.toNanos(timeout);
    342         final ReentrantLock lock = this.lock;
    343         lock.lockInterruptibly();
    344         try {
    345             while (count == items.length) {
    346                 if (nanos <= 0)
    347                     return false;
    348                 nanos = notFull.awaitNanos(nanos);
    349             }
    350             enqueue(e);
    351             return true;
    352         } finally {
    353             lock.unlock();
    354         }
    355     }
    356 
    357     public E poll() {
    358         final ReentrantLock lock = this.lock;
    359         lock.lock();
    360         try {
    361             return (count == 0) ? null : dequeue();
    362         } finally {
    363             lock.unlock();
    364         }
    365     }
    366 
    367     public E take() throws InterruptedException {
    368         final ReentrantLock lock = this.lock;
    369         lock.lockInterruptibly();
    370         try {
    371             while (count == 0)
    372                 notEmpty.await();
    373             return dequeue();
    374         } finally {
    375             lock.unlock();
    376         }
    377     }
    378 
    379     public E poll(long timeout, TimeUnit unit) throws InterruptedException {
    380         long nanos = unit.toNanos(timeout);
    381         final ReentrantLock lock = this.lock;
    382         lock.lockInterruptibly();
    383         try {
    384             while (count == 0) {
    385                 if (nanos <= 0)
    386                     return null;
    387                 nanos = notEmpty.awaitNanos(nanos);
    388             }
    389             return dequeue();
    390         } finally {
    391             lock.unlock();
    392         }
    393     }
    394 
    395     public E peek() {
    396         final ReentrantLock lock = this.lock;
    397         lock.lock();
    398         try {
    399             return (count == 0) ? null : itemAt(takeIndex);
    400         } finally {
    401             lock.unlock();
    402         }
    403     }
    404 
    405     // this doc comment is overridden to remove the reference to collections
    406     // greater in size than Integer.MAX_VALUE
    407     /**
    408      * Returns the number of elements in this queue.
    409      *
    410      * @return the number of elements in this queue
    411      */
    412     public int size() {
    413         final ReentrantLock lock = this.lock;
    414         lock.lock();
    415         try {
    416             return count;
    417         } finally {
    418             lock.unlock();
    419         }
    420     }
    421 
    422     // this doc comment is a modified copy of the inherited doc comment,
    423     // without the reference to unlimited queues.
    424     /**
    425      * Returns the number of additional elements that this queue can ideally
    426      * (in the absence of memory or resource constraints) accept without
    427      * blocking. This is always equal to the initial capacity of this queue
    428      * less the current {@code size} of this queue.
    429      *
    430      * <p>Note that you <em>cannot</em> always tell if an attempt to insert
    431      * an element will succeed by inspecting {@code remainingCapacity}
    432      * because it may be the case that another thread is about to
    433      * insert or remove an element.
    434      */
    435     public int remainingCapacity() {
    436         final ReentrantLock lock = this.lock;
    437         lock.lock();
    438         try {
    439             return items.length - count;
    440         } finally {
    441             lock.unlock();
    442         }
    443     }
    444 
    445     /**
    446      * Removes a single instance of the specified element from this queue,
    447      * if it is present.  More formally, removes an element {@code e} such
    448      * that {@code o.equals(e)}, if this queue contains one or more such
    449      * elements.
    450      * Returns {@code true} if this queue contained the specified element
    451      * (or equivalently, if this queue changed as a result of the call).
    452      *
    453      * <p>Removal of interior elements in circular array based queues
    454      * is an intrinsically slow and disruptive operation, so should
    455      * be undertaken only in exceptional circumstances, ideally
    456      * only when the queue is known not to be accessible by other
    457      * threads.
    458      *
    459      * @param o element to be removed from this queue, if present
    460      * @return {@code true} if this queue changed as a result of the call
    461      */
    462     public boolean remove(Object o) {
    463         if (o == null) return false;
    464         final Object[] items = this.items;
    465         final ReentrantLock lock = this.lock;
    466         lock.lock();
    467         try {
    468             if (count > 0) {
    469                 final int putIndex = this.putIndex;
    470                 int i = takeIndex;
    471                 do {
    472                     if (o.equals(items[i])) {
    473                         removeAt(i);
    474                         return true;
    475                     }
    476                 } while ((i = inc(i)) != putIndex);
    477             }
    478             return false;
    479         } finally {
    480             lock.unlock();
    481         }
    482     }
    483 
    484     /**
    485      * Returns {@code true} if this queue contains the specified element.
    486      * More formally, returns {@code true} if and only if this queue contains
    487      * at least one element {@code e} such that {@code o.equals(e)}.
    488      *
    489      * @param o object to be checked for containment in this queue
    490      * @return {@code true} if this queue contains the specified element
    491      */
    492     public boolean contains(Object o) {
    493         if (o == null) return false;
    494         final Object[] items = this.items;
    495         final ReentrantLock lock = this.lock;
    496         lock.lock();
    497         try {
    498             if (count > 0) {
    499                 final int putIndex = this.putIndex;
    500                 int i = takeIndex;
    501                 do {
    502                     if (o.equals(items[i]))
    503                         return true;
    504                 } while ((i = inc(i)) != putIndex);
    505             }
    506             return false;
    507         } finally {
    508             lock.unlock();
    509         }
    510     }
    511 
    512     /**
    513      * Returns an array containing all of the elements in this queue, in
    514      * proper sequence.
    515      *
    516      * <p>The returned array will be "safe" in that no references to it are
    517      * maintained by this queue.  (In other words, this method must allocate
    518      * a new array).  The caller is thus free to modify the returned array.
    519      *
    520      * <p>This method acts as bridge between array-based and collection-based
    521      * APIs.
    522      *
    523      * @return an array containing all of the elements in this queue
    524      */
    525     public Object[] toArray() {
    526         final Object[] items = this.items;
    527         final ReentrantLock lock = this.lock;
    528         lock.lock();
    529         try {
    530             final int count = this.count;
    531             Object[] a = new Object[count];
    532             for (int i = takeIndex, k = 0; k < count; i = inc(i), k++)
    533                 a[k] = items[i];
    534             return a;
    535         } finally {
    536             lock.unlock();
    537         }
    538     }
    539 
    540     /**
    541      * Returns an array containing all of the elements in this queue, in
    542      * proper sequence; the runtime type of the returned array is that of
    543      * the specified array.  If the queue fits in the specified array, it
    544      * is returned therein.  Otherwise, a new array is allocated with the
    545      * runtime type of the specified array and the size of this queue.
    546      *
    547      * <p>If this queue fits in the specified array with room to spare
    548      * (i.e., the array has more elements than this queue), the element in
    549      * the array immediately following the end of the queue is set to
    550      * {@code null}.
    551      *
    552      * <p>Like the {@link #toArray()} method, this method acts as bridge between
    553      * array-based and collection-based APIs.  Further, this method allows
    554      * precise control over the runtime type of the output array, and may,
    555      * under certain circumstances, be used to save allocation costs.
    556      *
    557      * <p>Suppose {@code x} is a queue known to contain only strings.
    558      * The following code can be used to dump the queue into a newly
    559      * allocated array of {@code String}:
    560      *
    561      *  <pre> {@code String[] y = x.toArray(new String[0]);}</pre>
    562      *
    563      * Note that {@code toArray(new Object[0])} is identical in function to
    564      * {@code toArray()}.
    565      *
    566      * @param a the array into which the elements of the queue are to
    567      *          be stored, if it is big enough; otherwise, a new array of the
    568      *          same runtime type is allocated for this purpose
    569      * @return an array containing all of the elements in this queue
    570      * @throws ArrayStoreException if the runtime type of the specified array
    571      *         is not a supertype of the runtime type of every element in
    572      *         this queue
    573      * @throws NullPointerException if the specified array is null
    574      */
    575     @SuppressWarnings("unchecked")
    576     public <T> T[] toArray(T[] a) {
    577         final Object[] items = this.items;
    578         final ReentrantLock lock = this.lock;
    579         lock.lock();
    580         try {
    581             final int count = this.count;
    582             final int len = a.length;
    583             if (len < count)
    584                 a = (T[])java.lang.reflect.Array.newInstance(
    585                     a.getClass().getComponentType(), count);
    586             for (int i = takeIndex, k = 0; k < count; i = inc(i), k++)
    587                 a[k] = (T) items[i];
    588             if (len > count)
    589                 a[count] = null;
    590             return a;
    591         } finally {
    592             lock.unlock();
    593         }
    594     }
    595 
    596     public String toString() {
    597         final ReentrantLock lock = this.lock;
    598         lock.lock();
    599         try {
    600             int k = count;
    601             if (k == 0)
    602                 return "[]";
    603 
    604             StringBuilder sb = new StringBuilder();
    605             sb.append('[');
    606             for (int i = takeIndex; ; i = inc(i)) {
    607                 Object e = items[i];
    608                 sb.append(e == this ? "(this Collection)" : e);
    609                 if (--k == 0)
    610                     return sb.append(']').toString();
    611                 sb.append(',').append(' ');
    612             }
    613         } finally {
    614             lock.unlock();
    615         }
    616     }
    617 
    618     /**
    619      * Atomically removes all of the elements from this queue.
    620      * The queue will be empty after this call returns.
    621      */
    622     public void clear() {
    623         final Object[] items = this.items;
    624         final ReentrantLock lock = this.lock;
    625         lock.lock();
    626         try {
    627             int k = count;
    628             if (k > 0) {
    629                 final int putIndex = this.putIndex;
    630                 int i = takeIndex;
    631                 do {
    632                     items[i] = null;
    633                 } while ((i = inc(i)) != putIndex);
    634                 takeIndex = putIndex;
    635                 count = 0;
    636                 if (itrs != null)
    637                     itrs.queueIsEmpty();
    638                 for (; k > 0 && lock.hasWaiters(notFull); k--)
    639                     notFull.signal();
    640             }
    641         } finally {
    642             lock.unlock();
    643         }
    644     }
    645 
    646     /**
    647      * @throws UnsupportedOperationException {@inheritDoc}
    648      * @throws ClassCastException            {@inheritDoc}
    649      * @throws NullPointerException          {@inheritDoc}
    650      * @throws IllegalArgumentException      {@inheritDoc}
    651      */
    652     public int drainTo(Collection<? super E> c) {
    653         return drainTo(c, Integer.MAX_VALUE);
    654     }
    655 
    656     /**
    657      * @throws UnsupportedOperationException {@inheritDoc}
    658      * @throws ClassCastException            {@inheritDoc}
    659      * @throws NullPointerException          {@inheritDoc}
    660      * @throws IllegalArgumentException      {@inheritDoc}
    661      */
    662     public int drainTo(Collection<? super E> c, int maxElements) {
    663         checkNotNull(c);
    664         if (c == this)
    665             throw new IllegalArgumentException();
    666         if (maxElements <= 0)
    667             return 0;
    668         final Object[] items = this.items;
    669         final ReentrantLock lock = this.lock;
    670         lock.lock();
    671         try {
    672             int n = Math.min(maxElements, count);
    673             int take = takeIndex;
    674             int i = 0;
    675             try {
    676                 while (i < n) {
    677                     @SuppressWarnings("unchecked") E x = (E) items[take];
    678                     c.add(x);
    679                     items[take] = null;
    680                     take = inc(take);
    681                     i++;
    682                 }
    683                 return n;
    684             } finally {
    685                 // Restore invariants even if c.add() threw
    686                 if (i > 0) {
    687                     count -= i;
    688                     takeIndex = take;
    689                     if (itrs != null) {
    690                         if (count == 0)
    691                             itrs.queueIsEmpty();
    692                         else if (i > take)
    693                             itrs.takeIndexWrapped();
    694                     }
    695                     for (; i > 0 && lock.hasWaiters(notFull); i--)
    696                         notFull.signal();
    697                 }
    698             }
    699         } finally {
    700             lock.unlock();
    701         }
    702     }
    703 
    704     /**
    705      * Returns an iterator over the elements in this queue in proper sequence.
    706      * The elements will be returned in order from first (head) to last (tail).
    707      *
    708      * <p>The returned iterator is a "weakly consistent" iterator that
    709      * will never throw {@link java.util.ConcurrentModificationException
    710      * ConcurrentModificationException}, and guarantees to traverse
    711      * elements as they existed upon construction of the iterator, and
    712      * may (but is not guaranteed to) reflect any modifications
    713      * subsequent to construction.
    714      *
    715      * @return an iterator over the elements in this queue in proper sequence
    716      */
    717     public Iterator<E> iterator() {
    718         return new Itr();
    719     }
    720 
    721     /**
    722      * Shared data between iterators and their queue, allowing queue
    723      * modifications to update iterators when elements are removed.
    724      *
    725      * This adds a lot of complexity for the sake of correctly
    726      * handling some uncommon operations, but the combination of
    727      * circular-arrays and supporting interior removes (i.e., those
    728      * not at head) would cause iterators to sometimes lose their
    729      * places and/or (re)report elements they shouldn't.  To avoid
    730      * this, when a queue has one or more iterators, it keeps iterator
    731      * state consistent by:
    732      *
    733      * (1) keeping track of the number of "cycles", that is, the
    734      *     number of times takeIndex has wrapped around to 0.
    735      * (2) notifying all iterators via the callback removedAt whenever
    736      *     an interior element is removed (and thus other elements may
    737      *     be shifted).
    738      *
    739      * These suffice to eliminate iterator inconsistencies, but
    740      * unfortunately add the secondary responsibility of maintaining
    741      * the list of iterators.  We track all active iterators in a
    742      * simple linked list (accessed only when the queue's lock is
    743      * held) of weak references to Itr.  The list is cleaned up using
    744      * 3 different mechanisms:
    745      *
    746      * (1) Whenever a new iterator is created, do some O(1) checking for
    747      *     stale list elements.
    748      *
    749      * (2) Whenever takeIndex wraps around to 0, check for iterators
    750      *     that have been unused for more than one wrap-around cycle.
    751      *
    752      * (3) Whenever the queue becomes empty, all iterators are notified
    753      *     and this entire data structure is discarded.
    754      *
    755      * So in addition to the removedAt callback that is necessary for
    756      * correctness, iterators have the shutdown and takeIndexWrapped
    757      * callbacks that help remove stale iterators from the list.
    758      *
    759      * Whenever a list element is examined, it is expunged if either
    760      * the GC has determined that the iterator is discarded, or if the
    761      * iterator reports that it is "detached" (does not need any
    762      * further state updates).  Overhead is maximal when takeIndex
    763      * never advances, iterators are discarded before they are
    764      * exhausted, and all removals are interior removes, in which case
    765      * all stale iterators are discovered by the GC.  But even in this
    766      * case we don't increase the amortized complexity.
    767      *
    768      * Care must be taken to keep list sweeping methods from
    769      * reentrantly invoking another such method, causing subtle
    770      * corruption bugs.
    771      */
    772     class Itrs {
    773 
    774         /**
    775          * Node in a linked list of weak iterator references.
    776          */
    777         private class Node extends WeakReference<Itr> {
    778             Node next;
    779 
    780             Node(Itr iterator, Node next) {
    781                 super(iterator);
    782                 this.next = next;
    783             }
    784         }
    785 
    786         /** Incremented whenever takeIndex wraps around to 0 */
    787         int cycles = 0;
    788 
    789         /** Linked list of weak iterator references */
    790         private Node head;
    791 
    792         /** Used to expunge stale iterators */
    793         private Node sweeper = null;
    794 
    795         private static final int SHORT_SWEEP_PROBES = 4;
    796         private static final int LONG_SWEEP_PROBES = 16;
    797 
    798         Itrs(Itr initial) {
    799             register(initial);
    800         }
    801 
    802         /**
    803          * Sweeps itrs, looking for and expunging stale iterators.
    804          * If at least one was found, tries harder to find more.
    805          * Called only from iterating thread.
    806          *
    807          * @param tryHarder whether to start in try-harder mode, because
    808          * there is known to be at least one iterator to collect
    809          */
    810         void doSomeSweeping(boolean tryHarder) {
    811             // assert lock.getHoldCount() == 1;
    812             // assert head != null;
    813             int probes = tryHarder ? LONG_SWEEP_PROBES : SHORT_SWEEP_PROBES;
    814             Node o, p;
    815             final Node sweeper = this.sweeper;
    816             boolean passedGo;   // to limit search to one full sweep
    817 
    818             if (sweeper == null) {
    819                 o = null;
    820                 p = head;
    821                 passedGo = true;
    822             } else {
    823                 o = sweeper;
    824                 p = o.next;
    825                 passedGo = false;
    826             }
    827 
    828             for (; probes > 0; probes--) {
    829                 if (p == null) {
    830                     if (passedGo)
    831                         break;
    832                     o = null;
    833                     p = head;
    834                     passedGo = true;
    835                 }
    836                 final Itr it = p.get();
    837                 final Node next = p.next;
    838                 if (it == null || it.isDetached()) {
    839                     // found a discarded/exhausted iterator
    840                     probes = LONG_SWEEP_PROBES; // "try harder"
    841                     // unlink p
    842                     p.clear();
    843                     p.next = null;
    844                     if (o == null) {
    845                         head = next;
    846                         if (next == null) {
    847                             // We've run out of iterators to track; retire
    848                             itrs = null;
    849                             return;
    850                         }
    851                     }
    852                     else
    853                         o.next = next;
    854                 } else {
    855                     o = p;
    856                 }
    857                 p = next;
    858             }
    859 
    860             this.sweeper = (p == null) ? null : o;
    861         }
    862 
    863         /**
    864          * Adds a new iterator to the linked list of tracked iterators.
    865          */
    866         void register(Itr itr) {
    867             // assert lock.getHoldCount() == 1;
    868             head = new Node(itr, head);
    869         }
    870 
    871         /**
    872          * Called whenever takeIndex wraps around to 0.
    873          *
    874          * Notifies all iterators, and expunges any that are now stale.
    875          */
    876         void takeIndexWrapped() {
    877             // assert lock.getHoldCount() == 1;
    878             cycles++;
    879             for (Node o = null, p = head; p != null;) {
    880                 final Itr it = p.get();
    881                 final Node next = p.next;
    882                 if (it == null || it.takeIndexWrapped()) {
    883                     // unlink p
    884                     // assert it == null || it.isDetached();
    885                     p.clear();
    886                     p.next = null;
    887                     if (o == null)
    888                         head = next;
    889                     else
    890                         o.next = next;
    891                 } else {
    892                     o = p;
    893                 }
    894                 p = next;
    895             }
    896             if (head == null)   // no more iterators to track
    897                 itrs = null;
    898         }
    899 
    900         /**
    901          * Called whenever an interior remove (not at takeIndex) occured.
    902          *
    903          * Notifies all iterators, and expunges any that are now stale.
    904          */
    905         void removedAt(int removedIndex) {
    906             for (Node o = null, p = head; p != null;) {
    907                 final Itr it = p.get();
    908                 final Node next = p.next;
    909                 if (it == null || it.removedAt(removedIndex)) {
    910                     // unlink p
    911                     // assert it == null || it.isDetached();
    912                     p.clear();
    913                     p.next = null;
    914                     if (o == null)
    915                         head = next;
    916                     else
    917                         o.next = next;
    918                 } else {
    919                     o = p;
    920                 }
    921                 p = next;
    922             }
    923             if (head == null)   // no more iterators to track
    924                 itrs = null;
    925         }
    926 
    927         /**
    928          * Called whenever the queue becomes empty.
    929          *
    930          * Notifies all active iterators that the queue is empty,
    931          * clears all weak refs, and unlinks the itrs datastructure.
    932          */
    933         void queueIsEmpty() {
    934             // assert lock.getHoldCount() == 1;
    935             for (Node p = head; p != null; p = p.next) {
    936                 Itr it = p.get();
    937                 if (it != null) {
    938                     p.clear();
    939                     it.shutdown();
    940                 }
    941             }
    942             head = null;
    943             itrs = null;
    944         }
    945 
    946         /**
    947          * Called whenever an element has been dequeued (at takeIndex).
    948          */
    949         void elementDequeued() {
    950             // assert lock.getHoldCount() == 1;
    951             if (count == 0)
    952                 queueIsEmpty();
    953             else if (takeIndex == 0)
    954                 takeIndexWrapped();
    955         }
    956     }
    957 
    958     /**
    959      * Iterator for ArrayBlockingQueue.
    960      *
    961      * To maintain weak consistency with respect to puts and takes, we
    962      * read ahead one slot, so as to not report hasNext true but then
    963      * not have an element to return.
    964      *
    965      * We switch into "detached" mode (allowing prompt unlinking from
    966      * itrs without help from the GC) when all indices are negative, or
    967      * when hasNext returns false for the first time.  This allows the
    968      * iterator to track concurrent updates completely accurately,
    969      * except for the corner case of the user calling Iterator.remove()
    970      * after hasNext() returned false.  Even in this case, we ensure
    971      * that we don't remove the wrong element by keeping track of the
    972      * expected element to remove, in lastItem.  Yes, we may fail to
    973      * remove lastItem from the queue if it moved due to an interleaved
    974      * interior remove while in detached mode.
    975      */
    976     private class Itr implements Iterator<E> {
    977         /** Index to look for new nextItem; NONE at end */
    978         private int cursor;
    979 
    980         /** Element to be returned by next call to next(); null if none */
    981         private E nextItem;
    982 
    983         /** Index of nextItem; NONE if none, REMOVED if removed elsewhere */
    984         private int nextIndex;
    985 
    986         /** Last element returned; null if none or not detached. */
    987         private E lastItem;
    988 
    989         /** Index of lastItem, NONE if none, REMOVED if removed elsewhere */
    990         private int lastRet;
    991 
    992         /** Previous value of takeIndex, or DETACHED when detached */
    993         private int prevTakeIndex;
    994 
    995         /** Previous value of iters.cycles */
    996         private int prevCycles;
    997 
    998         /** Special index value indicating "not available" or "undefined" */
    999         private static final int NONE = -1;
   1000 
   1001         /**
   1002          * Special index value indicating "removed elsewhere", that is,
   1003          * removed by some operation other than a call to this.remove().
   1004          */
   1005         private static final int REMOVED = -2;
   1006 
   1007         /** Special value for prevTakeIndex indicating "detached mode" */
   1008         private static final int DETACHED = -3;
   1009 
   1010         Itr() {
   1011             // assert lock.getHoldCount() == 0;
   1012             lastRet = NONE;
   1013             final ReentrantLock lock = ArrayBlockingQueue.this.lock;
   1014             lock.lock();
   1015             try {
   1016                 if (count == 0) {
   1017                     // assert itrs == null;
   1018                     cursor = NONE;
   1019                     nextIndex = NONE;
   1020                     prevTakeIndex = DETACHED;
   1021                 } else {
   1022                     final int takeIndex = ArrayBlockingQueue.this.takeIndex;
   1023                     prevTakeIndex = takeIndex;
   1024                     nextItem = itemAt(nextIndex = takeIndex);
   1025                     cursor = incCursor(takeIndex);
   1026                     if (itrs == null) {
   1027                         itrs = new Itrs(this);
   1028                     } else {
   1029                         itrs.register(this); // in this order
   1030                         itrs.doSomeSweeping(false);
   1031                     }
   1032                     prevCycles = itrs.cycles;
   1033                     // assert takeIndex >= 0;
   1034                     // assert prevTakeIndex == takeIndex;
   1035                     // assert nextIndex >= 0;
   1036                     // assert nextItem != null;
   1037                 }
   1038             } finally {
   1039                 lock.unlock();
   1040             }
   1041         }
   1042 
   1043         boolean isDetached() {
   1044             // assert lock.getHoldCount() == 1;
   1045             return prevTakeIndex < 0;
   1046         }
   1047 
   1048         private int incCursor(int index) {
   1049             // assert lock.getHoldCount() == 1;
   1050             index = inc(index);
   1051             if (index == putIndex)
   1052                 index = NONE;
   1053             return index;
   1054         }
   1055 
   1056         /**
   1057          * Returns true if index is invalidated by the given number of
   1058          * dequeues, starting from prevTakeIndex.
   1059          */
   1060         private boolean invalidated(int index, int prevTakeIndex,
   1061                                     long dequeues, int length) {
   1062             if (index < 0)
   1063                 return false;
   1064             int distance = index - prevTakeIndex;
   1065             if (distance < 0)
   1066                 distance += length;
   1067             return dequeues > distance;
   1068         }
   1069 
   1070         /**
   1071          * Adjusts indices to incorporate all dequeues since the last
   1072          * operation on this iterator.  Call only from iterating thread.
   1073          */
   1074         private void incorporateDequeues() {
   1075             // assert lock.getHoldCount() == 1;
   1076             // assert itrs != null;
   1077             // assert !isDetached();
   1078             // assert count > 0;
   1079 
   1080             final int cycles = itrs.cycles;
   1081             final int takeIndex = ArrayBlockingQueue.this.takeIndex;
   1082             final int prevCycles = this.prevCycles;
   1083             final int prevTakeIndex = this.prevTakeIndex;
   1084 
   1085             if (cycles != prevCycles || takeIndex != prevTakeIndex) {
   1086                 final int len = items.length;
   1087                 // how far takeIndex has advanced since the previous
   1088                 // operation of this iterator
   1089                 long dequeues = (cycles - prevCycles) * len
   1090                     + (takeIndex - prevTakeIndex);
   1091 
   1092                 // Check indices for invalidation
   1093                 if (invalidated(lastRet, prevTakeIndex, dequeues, len))
   1094                     lastRet = REMOVED;
   1095                 if (invalidated(nextIndex, prevTakeIndex, dequeues, len))
   1096                     nextIndex = REMOVED;
   1097                 if (invalidated(cursor, prevTakeIndex, dequeues, len))
   1098                     cursor = takeIndex;
   1099 
   1100                 if (cursor < 0 && nextIndex < 0 && lastRet < 0)
   1101                     detach();
   1102                 else {
   1103                     this.prevCycles = cycles;
   1104                     this.prevTakeIndex = takeIndex;
   1105                 }
   1106             }
   1107         }
   1108 
   1109         /**
   1110          * Called when itrs should stop tracking this iterator, either
   1111          * because there are no more indices to update (cursor < 0 &&
   1112          * nextIndex < 0 && lastRet < 0) or as a special exception, when
   1113          * lastRet >= 0, because hasNext() is about to return false for the
   1114          * first time.  Call only from iterating thread.
   1115          */
   1116         private void detach() {
   1117             // Switch to detached mode
   1118             // assert lock.getHoldCount() == 1;
   1119             // assert cursor == NONE;
   1120             // assert nextIndex < 0;
   1121             // assert lastRet < 0 || nextItem == null;
   1122             // assert lastRet < 0 ^ lastItem != null;
   1123             if (prevTakeIndex >= 0) {
   1124                 // assert itrs != null;
   1125                 prevTakeIndex = DETACHED;
   1126                 // try to unlink from itrs (but not too hard)
   1127                 itrs.doSomeSweeping(true);
   1128             }
   1129         }
   1130 
   1131         /**
   1132          * For performance reasons, we would like not to acquire a lock in
   1133          * hasNext in the common case.  To allow for this, we only access
   1134          * fields (i.e. nextItem) that are not modified by update operations
   1135          * triggered by queue modifications.
   1136          */
   1137         public boolean hasNext() {
   1138             // assert lock.getHoldCount() == 0;
   1139             if (nextItem != null)
   1140                 return true;
   1141             noNext();
   1142             return false;
   1143         }
   1144 
   1145         private void noNext() {
   1146             final ReentrantLock lock = ArrayBlockingQueue.this.lock;
   1147             lock.lock();
   1148             try {
   1149                 // assert cursor == NONE;
   1150                 // assert nextIndex == NONE;
   1151                 if (!isDetached()) {
   1152                     // assert lastRet >= 0;
   1153                     incorporateDequeues(); // might update lastRet
   1154                     if (lastRet >= 0) {
   1155                         lastItem = itemAt(lastRet);
   1156                         // assert lastItem != null;
   1157                         detach();
   1158                     }
   1159                 }
   1160                 // assert isDetached();
   1161                 // assert lastRet < 0 ^ lastItem != null;
   1162             } finally {
   1163                 lock.unlock();
   1164             }
   1165         }
   1166 
   1167         public E next() {
   1168             // assert lock.getHoldCount() == 0;
   1169             final E x = nextItem;
   1170             if (x == null)
   1171                 throw new NoSuchElementException();
   1172             final ReentrantLock lock = ArrayBlockingQueue.this.lock;
   1173             lock.lock();
   1174             try {
   1175                 if (!isDetached())
   1176                     incorporateDequeues();
   1177                 // assert nextIndex != NONE;
   1178                 // assert lastItem == null;
   1179                 lastRet = nextIndex;
   1180                 final int cursor = this.cursor;
   1181                 if (cursor >= 0) {
   1182                     nextItem = itemAt(nextIndex = cursor);
   1183                     // assert nextItem != null;
   1184                     this.cursor = incCursor(cursor);
   1185                 } else {
   1186                     nextIndex = NONE;
   1187                     nextItem = null;
   1188                 }
   1189             } finally {
   1190                 lock.unlock();
   1191             }
   1192             return x;
   1193         }
   1194 
   1195         public void remove() {
   1196             // assert lock.getHoldCount() == 0;
   1197             final ReentrantLock lock = ArrayBlockingQueue.this.lock;
   1198             lock.lock();
   1199             try {
   1200                 if (!isDetached())
   1201                     incorporateDequeues(); // might update lastRet or detach
   1202                 final int lastRet = this.lastRet;
   1203                 this.lastRet = NONE;
   1204                 if (lastRet >= 0) {
   1205                     if (!isDetached())
   1206                         removeAt(lastRet);
   1207                     else {
   1208                         final E lastItem = this.lastItem;
   1209                         // assert lastItem != null;
   1210                         this.lastItem = null;
   1211                         if (itemAt(lastRet) == lastItem)
   1212                             removeAt(lastRet);
   1213                     }
   1214                 } else if (lastRet == NONE)
   1215                     throw new IllegalStateException();
   1216                 // else lastRet == REMOVED and the last returned element was
   1217                 // previously asynchronously removed via an operation other
   1218                 // than this.remove(), so nothing to do.
   1219 
   1220                 if (cursor < 0 && nextIndex < 0)
   1221                     detach();
   1222             } finally {
   1223                 lock.unlock();
   1224                 // assert lastRet == NONE;
   1225                 // assert lastItem == null;
   1226             }
   1227         }
   1228 
   1229         /**
   1230          * Called to notify the iterator that the queue is empty, or that it
   1231          * has fallen hopelessly behind, so that it should abandon any
   1232          * further iteration, except possibly to return one more element
   1233          * from next(), as promised by returning true from hasNext().
   1234          */
   1235         void shutdown() {
   1236             // assert lock.getHoldCount() == 1;
   1237             cursor = NONE;
   1238             if (nextIndex >= 0)
   1239                 nextIndex = REMOVED;
   1240             if (lastRet >= 0) {
   1241                 lastRet = REMOVED;
   1242                 lastItem = null;
   1243             }
   1244             prevTakeIndex = DETACHED;
   1245             // Don't set nextItem to null because we must continue to be
   1246             // able to return it on next().
   1247             //
   1248             // Caller will unlink from itrs when convenient.
   1249         }
   1250 
   1251         private int distance(int index, int prevTakeIndex, int length) {
   1252             int distance = index - prevTakeIndex;
   1253             if (distance < 0)
   1254                 distance += length;
   1255             return distance;
   1256         }
   1257 
   1258         /**
   1259          * Called whenever an interior remove (not at takeIndex) occured.
   1260          *
   1261          * @return true if this iterator should be unlinked from itrs
   1262          */
   1263         boolean removedAt(int removedIndex) {
   1264             // assert lock.getHoldCount() == 1;
   1265             if (isDetached())
   1266                 return true;
   1267 
   1268             final int cycles = itrs.cycles;
   1269             final int takeIndex = ArrayBlockingQueue.this.takeIndex;
   1270             final int prevCycles = this.prevCycles;
   1271             final int prevTakeIndex = this.prevTakeIndex;
   1272             final int len = items.length;
   1273             int cycleDiff = cycles - prevCycles;
   1274             if (removedIndex < takeIndex)
   1275                 cycleDiff++;
   1276             final int removedDistance =
   1277                 (cycleDiff * len) + (removedIndex - prevTakeIndex);
   1278             // assert removedDistance >= 0;
   1279             int cursor = this.cursor;
   1280             if (cursor >= 0) {
   1281                 int x = distance(cursor, prevTakeIndex, len);
   1282                 if (x == removedDistance) {
   1283                     if (cursor == putIndex)
   1284                         this.cursor = cursor = NONE;
   1285                 }
   1286                 else if (x > removedDistance) {
   1287                     // assert cursor != prevTakeIndex;
   1288                     this.cursor = cursor = dec(cursor);
   1289                 }
   1290             }
   1291             int lastRet = this.lastRet;
   1292             if (lastRet >= 0) {
   1293                 int x = distance(lastRet, prevTakeIndex, len);
   1294                 if (x == removedDistance)
   1295                     this.lastRet = lastRet = REMOVED;
   1296                 else if (x > removedDistance)
   1297                     this.lastRet = lastRet = dec(lastRet);
   1298             }
   1299             int nextIndex = this.nextIndex;
   1300             if (nextIndex >= 0) {
   1301                 int x = distance(nextIndex, prevTakeIndex, len);
   1302                 if (x == removedDistance)
   1303                     this.nextIndex = nextIndex = REMOVED;
   1304                 else if (x > removedDistance)
   1305                     this.nextIndex = nextIndex = dec(nextIndex);
   1306             }
   1307             else if (cursor < 0 && nextIndex < 0 && lastRet < 0) {
   1308                 this.prevTakeIndex = DETACHED;
   1309                 return true;
   1310             }
   1311             return false;
   1312         }
   1313 
   1314         /**
   1315          * Called whenever takeIndex wraps around to zero.
   1316          *
   1317          * @return true if this iterator should be unlinked from itrs
   1318          */
   1319         boolean takeIndexWrapped() {
   1320             // assert lock.getHoldCount() == 1;
   1321             if (isDetached())
   1322                 return true;
   1323             if (itrs.cycles - prevCycles > 1) {
   1324                 // All the elements that existed at the time of the last
   1325                 // operation are gone, so abandon further iteration.
   1326                 shutdown();
   1327                 return true;
   1328             }
   1329             return false;
   1330         }
   1331 
   1332 //         /** Uncomment for debugging. */
   1333 //         public String toString() {
   1334 //             return ("cursor=" + cursor + " " +
   1335 //                     "nextIndex=" + nextIndex + " " +
   1336 //                     "lastRet=" + lastRet + " " +
   1337 //                     "nextItem=" + nextItem + " " +
   1338 //                     "lastItem=" + lastItem + " " +
   1339 //                     "prevCycles=" + prevCycles + " " +
   1340 //                     "prevTakeIndex=" + prevTakeIndex + " " +
   1341 //                     "size()=" + size() + " " +
   1342 //                     "remainingCapacity()=" + remainingCapacity());
   1343 //         }
   1344     }
   1345 }
   1346