Home | History | Annotate | Download | only in concurrent
      1 /*
      2  * Written by Doug Lea with assistance from members of JCP JSR-166
      3  * Expert Group and released to the public domain, as explained at
      4  * http://creativecommons.org/publicdomain/zero/1.0/
      5  */
      6 
      7 package java.util.concurrent;
      8 import java.util.concurrent.locks.Condition;
      9 import java.util.concurrent.locks.ReentrantLock;
     10 import java.util.AbstractQueue;
     11 import java.util.Collection;
     12 import java.util.Iterator;
     13 import java.util.NoSuchElementException;
     14 import java.lang.ref.WeakReference;
     15 
     16 // BEGIN android-note
     17 // removed link to collections framework docs
     18 // END android-note
     19 
     20 /**
     21  * A bounded {@linkplain BlockingQueue blocking queue} backed by an
     22  * array.  This queue orders elements FIFO (first-in-first-out).  The
     23  * <em>head</em> of the queue is that element that has been on the
     24  * queue the longest time.  The <em>tail</em> of the queue is that
     25  * element that has been on the queue the shortest time. New elements
     26  * are inserted at the tail of the queue, and the queue retrieval
     27  * operations obtain elements at the head of the queue.
     28  *
     29  * <p>This is a classic &quot;bounded buffer&quot;, in which a
     30  * fixed-sized array holds elements inserted by producers and
     31  * extracted by consumers.  Once created, the capacity cannot be
     32  * changed.  Attempts to {@code put} an element into a full queue
     33  * will result in the operation blocking; attempts to {@code take} an
     34  * element from an empty queue will similarly block.
     35  *
     36  * <p>This class supports an optional fairness policy for ordering
     37  * waiting producer and consumer threads.  By default, this ordering
     38  * is not guaranteed. However, a queue constructed with fairness set
     39  * to {@code true} grants threads access in FIFO order. Fairness
     40  * generally decreases throughput but reduces variability and avoids
     41  * starvation.
     42  *
     43  * <p>This class and its iterator implement all of the
     44  * <em>optional</em> methods of the {@link Collection} and {@link
     45  * Iterator} interfaces.
     46  *
     47  * @since 1.5
     48  * @author Doug Lea
     49  * @param <E> the type of elements held in this collection
     50  */
     51 public class ArrayBlockingQueue<E> extends AbstractQueue<E>
     52         implements BlockingQueue<E>, java.io.Serializable {
     53 
     54     /**
     55      * Serialization ID. This class relies on default serialization
     56      * even for the items array, which is default-serialized, even if
     57      * it is empty. Otherwise it could not be declared final, which is
     58      * necessary here.
     59      */
     60     private static final long serialVersionUID = -817911632652898426L;
     61 
     62     /** The queued items */
     63     final Object[] items;
     64 
     65     /** items index for next take, poll, peek or remove */
     66     int takeIndex;
     67 
     68     /** items index for next put, offer, or add */
     69     int putIndex;
     70 
     71     /** Number of elements in the queue */
     72     int count;
     73 
     74     /*
     75      * Concurrency control uses the classic two-condition algorithm
     76      * found in any textbook.
     77      */
     78 
     79     /** Main lock guarding all access */
     80     final ReentrantLock lock;
     81 
     82     /** Condition for waiting takes */
     83     private final Condition notEmpty;
     84 
     85     /** Condition for waiting puts */
     86     private final Condition notFull;
     87 
     88     /**
     89      * Shared state for currently active iterators, or null if there
     90      * are known not to be any.  Allows queue operations to update
     91      * iterator state.
     92      */
     93     transient Itrs itrs = null;
     94 
     95     // Internal helper methods
     96 
     97     /**
     98      * Circularly increment i.
     99      */
    100     final int inc(int i) {
    101         return (++i == items.length) ? 0 : i;
    102     }
    103 
    104     /**
    105      * Circularly decrement i.
    106      */
    107     final int dec(int i) {
    108         return ((i == 0) ? items.length : i) - 1;
    109     }
    110 
    111     /**
    112      * Returns item at index i.
    113      */
    114     @SuppressWarnings("unchecked")
    115     final E itemAt(int i) {
    116         return (E) items[i];
    117     }
    118 
    119     /**
    120      * Throws NullPointerException if argument is null.
    121      *
    122      * @param v the element
    123      */
    124     private static void checkNotNull(Object v) {
    125         if (v == null)
    126             throw new NullPointerException();
    127     }
    128 
    129     /**
    130      * Inserts element at current put position, advances, and signals.
    131      * Call only when holding lock.
    132      */
    133     private void enqueue(E x) {
    134         // assert lock.getHoldCount() == 1;
    135         // assert items[putIndex] == null;
    136         items[putIndex] = x;
    137         putIndex = inc(putIndex);
    138         count++;
    139         notEmpty.signal();
    140     }
    141 
    142     /**
    143      * Extracts element at current take position, advances, and signals.
    144      * Call only when holding lock.
    145      */
    146     private E dequeue() {
    147         // assert lock.getHoldCount() == 1;
    148         // assert items[takeIndex] != null;
    149         final Object[] items = this.items;
    150         @SuppressWarnings("unchecked")
    151         E x = (E) items[takeIndex];
    152         items[takeIndex] = null;
    153         takeIndex = inc(takeIndex);
    154         count--;
    155         if (itrs != null)
    156             itrs.elementDequeued();
    157         notFull.signal();
    158         return x;
    159     }
    160 
    161     /**
    162      * Deletes item at array index removeIndex.
    163      * Utility for remove(Object) and iterator.remove.
    164      * Call only when holding lock.
    165      */
    166     void removeAt(final int removeIndex) {
    167         // assert lock.getHoldCount() == 1;
    168         // assert items[removeIndex] != null;
    169         // assert removeIndex >= 0 && removeIndex < items.length;
    170         final Object[] items = this.items;
    171         if (removeIndex == takeIndex) {
    172             // removing front item; just advance
    173             items[takeIndex] = null;
    174             takeIndex = inc(takeIndex);
    175             count--;
    176             if (itrs != null)
    177                 itrs.elementDequeued();
    178         } else {
    179             // an "interior" remove
    180 
    181             // slide over all others up through putIndex.
    182             final int putIndex = this.putIndex;
    183             for (int i = removeIndex;;) {
    184                 int next = inc(i);
    185                 if (next != putIndex) {
    186                     items[i] = items[next];
    187                     i = next;
    188                 } else {
    189                     items[i] = null;
    190                     this.putIndex = i;
    191                     break;
    192                 }
    193             }
    194             count--;
    195             if (itrs != null)
    196                 itrs.removedAt(removeIndex);
    197         }
    198         notFull.signal();
    199     }
    200 
    201     /**
    202      * Creates an {@code ArrayBlockingQueue} with the given (fixed)
    203      * capacity and default access policy.
    204      *
    205      * @param capacity the capacity of this queue
    206      * @throws IllegalArgumentException if {@code capacity < 1}
    207      */
    208     public ArrayBlockingQueue(int capacity) {
    209         this(capacity, false);
    210     }
    211 
    212     /**
    213      * Creates an {@code ArrayBlockingQueue} with the given (fixed)
    214      * capacity and the specified access policy.
    215      *
    216      * @param capacity the capacity of this queue
    217      * @param fair if {@code true} then queue accesses for threads blocked
    218      *        on insertion or removal, are processed in FIFO order;
    219      *        if {@code false} the access order is unspecified.
    220      * @throws IllegalArgumentException if {@code capacity < 1}
    221      */
    222     public ArrayBlockingQueue(int capacity, boolean fair) {
    223         if (capacity <= 0)
    224             throw new IllegalArgumentException();
    225         this.items = new Object[capacity];
    226         lock = new ReentrantLock(fair);
    227         notEmpty = lock.newCondition();
    228         notFull =  lock.newCondition();
    229     }
    230 
    231     /**
    232      * Creates an {@code ArrayBlockingQueue} with the given (fixed)
    233      * capacity, the specified access policy and initially containing the
    234      * elements of the given collection,
    235      * added in traversal order of the collection's iterator.
    236      *
    237      * @param capacity the capacity of this queue
    238      * @param fair if {@code true} then queue accesses for threads blocked
    239      *        on insertion or removal, are processed in FIFO order;
    240      *        if {@code false} the access order is unspecified.
    241      * @param c the collection of elements to initially contain
    242      * @throws IllegalArgumentException if {@code capacity} is less than
    243      *         {@code c.size()}, or less than 1.
    244      * @throws NullPointerException if the specified collection or any
    245      *         of its elements are null
    246      */
    247     public ArrayBlockingQueue(int capacity, boolean fair,
    248                               Collection<? extends E> c) {
    249         this(capacity, fair);
    250 
    251         final ReentrantLock lock = this.lock;
    252         lock.lock(); // Lock only for visibility, not mutual exclusion
    253         try {
    254             int i = 0;
    255             try {
    256                 for (E e : c) {
    257                     checkNotNull(e);
    258                     items[i++] = e;
    259                 }
    260             } catch (ArrayIndexOutOfBoundsException ex) {
    261                 throw new IllegalArgumentException();
    262             }
    263             count = i;
    264             putIndex = (i == capacity) ? 0 : i;
    265         } finally {
    266             lock.unlock();
    267         }
    268     }
    269 
    270     /**
    271      * Inserts the specified element at the tail of this queue if it is
    272      * possible to do so immediately without exceeding the queue's capacity,
    273      * returning {@code true} upon success and throwing an
    274      * {@code IllegalStateException} if this queue is full.
    275      *
    276      * @param e the element to add
    277      * @return {@code true} (as specified by {@link Collection#add})
    278      * @throws IllegalStateException if this queue is full
    279      * @throws NullPointerException if the specified element is null
    280      */
    281     public boolean add(E e) {
    282         return super.add(e);
    283     }
    284 
    285     /**
    286      * Inserts the specified element at the tail of this queue if it is
    287      * possible to do so immediately without exceeding the queue's capacity,
    288      * returning {@code true} upon success and {@code false} if this queue
    289      * is full.  This method is generally preferable to method {@link #add},
    290      * which can fail to insert an element only by throwing an exception.
    291      *
    292      * @throws NullPointerException if the specified element is null
    293      */
    294     public boolean offer(E e) {
    295         checkNotNull(e);
    296         final ReentrantLock lock = this.lock;
    297         lock.lock();
    298         try {
    299             if (count == items.length)
    300                 return false;
    301             else {
    302                 enqueue(e);
    303                 return true;
    304             }
    305         } finally {
    306             lock.unlock();
    307         }
    308     }
    309 
    310     /**
    311      * Inserts the specified element at the tail of this queue, waiting
    312      * for space to become available if the queue is full.
    313      *
    314      * @throws InterruptedException {@inheritDoc}
    315      * @throws NullPointerException {@inheritDoc}
    316      */
    317     public void put(E e) throws InterruptedException {
    318         checkNotNull(e);
    319         final ReentrantLock lock = this.lock;
    320         lock.lockInterruptibly();
    321         try {
    322             while (count == items.length)
    323                 notFull.await();
    324             enqueue(e);
    325         } finally {
    326             lock.unlock();
    327         }
    328     }
    329 
    330     /**
    331      * Inserts the specified element at the tail of this queue, waiting
    332      * up to the specified wait time for space to become available if
    333      * the queue is full.
    334      *
    335      * @throws InterruptedException {@inheritDoc}
    336      * @throws NullPointerException {@inheritDoc}
    337      */
    338     public boolean offer(E e, long timeout, TimeUnit unit)
    339         throws InterruptedException {
    340 
    341         checkNotNull(e);
    342         long nanos = unit.toNanos(timeout);
    343         final ReentrantLock lock = this.lock;
    344         lock.lockInterruptibly();
    345         try {
    346             while (count == items.length) {
    347                 if (nanos <= 0)
    348                     return false;
    349                 nanos = notFull.awaitNanos(nanos);
    350             }
    351             enqueue(e);
    352             return true;
    353         } finally {
    354             lock.unlock();
    355         }
    356     }
    357 
    358     public E poll() {
    359         final ReentrantLock lock = this.lock;
    360         lock.lock();
    361         try {
    362             return (count == 0) ? null : dequeue();
    363         } finally {
    364             lock.unlock();
    365         }
    366     }
    367 
    368     public E take() throws InterruptedException {
    369         final ReentrantLock lock = this.lock;
    370         lock.lockInterruptibly();
    371         try {
    372             while (count == 0)
    373                 notEmpty.await();
    374             return dequeue();
    375         } finally {
    376             lock.unlock();
    377         }
    378     }
    379 
    380     public E poll(long timeout, TimeUnit unit) throws InterruptedException {
    381         long nanos = unit.toNanos(timeout);
    382         final ReentrantLock lock = this.lock;
    383         lock.lockInterruptibly();
    384         try {
    385             while (count == 0) {
    386                 if (nanos <= 0)
    387                     return null;
    388                 nanos = notEmpty.awaitNanos(nanos);
    389             }
    390             return dequeue();
    391         } finally {
    392             lock.unlock();
    393         }
    394     }
    395 
    396     public E peek() {
    397         final ReentrantLock lock = this.lock;
    398         lock.lock();
    399         try {
    400             return itemAt(takeIndex); // null when queue is empty
    401         } finally {
    402             lock.unlock();
    403         }
    404     }
    405 
    406     // this doc comment is overridden to remove the reference to collections
    407     // greater in size than Integer.MAX_VALUE
    408     /**
    409      * Returns the number of elements in this queue.
    410      *
    411      * @return the number of elements in this queue
    412      */
    413     public int size() {
    414         final ReentrantLock lock = this.lock;
    415         lock.lock();
    416         try {
    417             return count;
    418         } finally {
    419             lock.unlock();
    420         }
    421     }
    422 
    423     // this doc comment is a modified copy of the inherited doc comment,
    424     // without the reference to unlimited queues.
    425     /**
    426      * Returns the number of additional elements that this queue can ideally
    427      * (in the absence of memory or resource constraints) accept without
    428      * blocking. This is always equal to the initial capacity of this queue
    429      * less the current {@code size} of this queue.
    430      *
    431      * <p>Note that you <em>cannot</em> always tell if an attempt to insert
    432      * an element will succeed by inspecting {@code remainingCapacity}
    433      * because it may be the case that another thread is about to
    434      * insert or remove an element.
    435      */
    436     public int remainingCapacity() {
    437         final ReentrantLock lock = this.lock;
    438         lock.lock();
    439         try {
    440             return items.length - count;
    441         } finally {
    442             lock.unlock();
    443         }
    444     }
    445 
    446     /**
    447      * Removes a single instance of the specified element from this queue,
    448      * if it is present.  More formally, removes an element {@code e} such
    449      * that {@code o.equals(e)}, if this queue contains one or more such
    450      * elements.
    451      * Returns {@code true} if this queue contained the specified element
    452      * (or equivalently, if this queue changed as a result of the call).
    453      *
    454      * <p>Removal of interior elements in circular array based queues
    455      * is an intrinsically slow and disruptive operation, so should
    456      * be undertaken only in exceptional circumstances, ideally
    457      * only when the queue is known not to be accessible by other
    458      * threads.
    459      *
    460      * @param o element to be removed from this queue, if present
    461      * @return {@code true} if this queue changed as a result of the call
    462      */
    463     public boolean remove(Object o) {
    464         if (o == null) return false;
    465         final Object[] items = this.items;
    466         final ReentrantLock lock = this.lock;
    467         lock.lock();
    468         try {
    469             if (count > 0) {
    470                 final int putIndex = this.putIndex;
    471                 int i = takeIndex;
    472                 do {
    473                     if (o.equals(items[i])) {
    474                         removeAt(i);
    475                         return true;
    476                     }
    477                 } while ((i = inc(i)) != putIndex);
    478             }
    479             return false;
    480         } finally {
    481             lock.unlock();
    482         }
    483     }
    484 
    485     /**
    486      * Returns {@code true} if this queue contains the specified element.
    487      * More formally, returns {@code true} if and only if this queue contains
    488      * at least one element {@code e} such that {@code o.equals(e)}.
    489      *
    490      * @param o object to be checked for containment in this queue
    491      * @return {@code true} if this queue contains the specified element
    492      */
    493     public boolean contains(Object o) {
    494         if (o == null) return false;
    495         final Object[] items = this.items;
    496         final ReentrantLock lock = this.lock;
    497         lock.lock();
    498         try {
    499             if (count > 0) {
    500                 final int putIndex = this.putIndex;
    501                 int i = takeIndex;
    502                 do {
    503                     if (o.equals(items[i]))
    504                         return true;
    505                 } while ((i = inc(i)) != putIndex);
    506             }
    507             return false;
    508         } finally {
    509             lock.unlock();
    510         }
    511     }
    512 
    513     /**
    514      * Returns an array containing all of the elements in this queue, in
    515      * proper sequence.
    516      *
    517      * <p>The returned array will be "safe" in that no references to it are
    518      * maintained by this queue.  (In other words, this method must allocate
    519      * a new array).  The caller is thus free to modify the returned array.
    520      *
    521      * <p>This method acts as bridge between array-based and collection-based
    522      * APIs.
    523      *
    524      * @return an array containing all of the elements in this queue
    525      */
    526     public Object[] toArray() {
    527         final Object[] items = this.items;
    528         final ReentrantLock lock = this.lock;
    529         lock.lock();
    530         try {
    531             final int count = this.count;
    532             Object[] a = new Object[count];
    533             int n = items.length - takeIndex;
    534             if (count <= n) {
    535                 System.arraycopy(items, takeIndex, a, 0, count);
    536             } else {
    537                 System.arraycopy(items, takeIndex, a, 0, n);
    538                 System.arraycopy(items, 0, a, n, count - n);
    539             }
    540             return a;
    541         } finally {
    542             lock.unlock();
    543         }
    544     }
    545 
    546     /**
    547      * Returns an array containing all of the elements in this queue, in
    548      * proper sequence; the runtime type of the returned array is that of
    549      * the specified array.  If the queue fits in the specified array, it
    550      * is returned therein.  Otherwise, a new array is allocated with the
    551      * runtime type of the specified array and the size of this queue.
    552      *
    553      * <p>If this queue fits in the specified array with room to spare
    554      * (i.e., the array has more elements than this queue), the element in
    555      * the array immediately following the end of the queue is set to
    556      * {@code null}.
    557      *
    558      * <p>Like the {@link #toArray()} method, this method acts as bridge between
    559      * array-based and collection-based APIs.  Further, this method allows
    560      * precise control over the runtime type of the output array, and may,
    561      * under certain circumstances, be used to save allocation costs.
    562      *
    563      * <p>Suppose {@code x} is a queue known to contain only strings.
    564      * The following code can be used to dump the queue into a newly
    565      * allocated array of {@code String}:
    566      *
    567      *  <pre> {@code String[] y = x.toArray(new String[0]);}</pre>
    568      *
    569      * Note that {@code toArray(new Object[0])} is identical in function to
    570      * {@code toArray()}.
    571      *
    572      * @param a the array into which the elements of the queue are to
    573      *          be stored, if it is big enough; otherwise, a new array of the
    574      *          same runtime type is allocated for this purpose
    575      * @return an array containing all of the elements in this queue
    576      * @throws ArrayStoreException if the runtime type of the specified array
    577      *         is not a supertype of the runtime type of every element in
    578      *         this queue
    579      * @throws NullPointerException if the specified array is null
    580      */
    581     @SuppressWarnings("unchecked")
    582     public <T> T[] toArray(T[] a) {
    583         final Object[] items = this.items;
    584         final ReentrantLock lock = this.lock;
    585         lock.lock();
    586         try {
    587             final int count = this.count;
    588             final int len = a.length;
    589             if (len < count)
    590                 a = (T[])java.lang.reflect.Array.newInstance(
    591                     a.getClass().getComponentType(), count);
    592             int n = items.length - takeIndex;
    593             if (count <= n)
    594                 System.arraycopy(items, takeIndex, a, 0, count);
    595             else {
    596                 System.arraycopy(items, takeIndex, a, 0, n);
    597                 System.arraycopy(items, 0, a, n, count - n);
    598             }
    599             if (len > count)
    600                 a[count] = null;
    601             return a;
    602         } finally {
    603             lock.unlock();
    604         }
    605     }
    606 
    607     public String toString() {
    608         final ReentrantLock lock = this.lock;
    609         lock.lock();
    610         try {
    611             int k = count;
    612             if (k == 0)
    613                 return "[]";
    614 
    615             StringBuilder sb = new StringBuilder();
    616             sb.append('[');
    617             for (int i = takeIndex; ; i = inc(i)) {
    618                 Object e = items[i];
    619                 sb.append(e == this ? "(this Collection)" : e);
    620                 if (--k == 0)
    621                     return sb.append(']').toString();
    622                 sb.append(',').append(' ');
    623             }
    624         } finally {
    625             lock.unlock();
    626         }
    627     }
    628 
    629     /**
    630      * Atomically removes all of the elements from this queue.
    631      * The queue will be empty after this call returns.
    632      */
    633     public void clear() {
    634         final Object[] items = this.items;
    635         final ReentrantLock lock = this.lock;
    636         lock.lock();
    637         try {
    638             int k = count;
    639             if (k > 0) {
    640                 final int putIndex = this.putIndex;
    641                 int i = takeIndex;
    642                 do {
    643                     items[i] = null;
    644                 } while ((i = inc(i)) != putIndex);
    645                 takeIndex = putIndex;
    646                 count = 0;
    647                 if (itrs != null)
    648                     itrs.queueIsEmpty();
    649                 for (; k > 0 && lock.hasWaiters(notFull); k--)
    650                     notFull.signal();
    651             }
    652         } finally {
    653             lock.unlock();
    654         }
    655     }
    656 
    657     /**
    658      * @throws UnsupportedOperationException {@inheritDoc}
    659      * @throws ClassCastException            {@inheritDoc}
    660      * @throws NullPointerException          {@inheritDoc}
    661      * @throws IllegalArgumentException      {@inheritDoc}
    662      */
    663     public int drainTo(Collection<? super E> c) {
    664         return drainTo(c, Integer.MAX_VALUE);
    665     }
    666 
    667     /**
    668      * @throws UnsupportedOperationException {@inheritDoc}
    669      * @throws ClassCastException            {@inheritDoc}
    670      * @throws NullPointerException          {@inheritDoc}
    671      * @throws IllegalArgumentException      {@inheritDoc}
    672      */
    673     public int drainTo(Collection<? super E> c, int maxElements) {
    674         checkNotNull(c);
    675         if (c == this)
    676             throw new IllegalArgumentException();
    677         if (maxElements <= 0)
    678             return 0;
    679         final Object[] items = this.items;
    680         final ReentrantLock lock = this.lock;
    681         lock.lock();
    682         try {
    683             int n = Math.min(maxElements, count);
    684             int take = takeIndex;
    685             int i = 0;
    686             try {
    687                 while (i < n) {
    688                     @SuppressWarnings("unchecked")
    689                     E x = (E) items[take];
    690                     c.add(x);
    691                     items[take] = null;
    692                     take = inc(take);
    693                     i++;
    694                 }
    695                 return n;
    696             } finally {
    697                 // Restore invariants even if c.add() threw
    698                 if (i > 0) {
    699                     count -= i;
    700                     takeIndex = take;
    701                     if (itrs != null) {
    702                         if (count == 0)
    703                             itrs.queueIsEmpty();
    704                         else if (i > take)
    705                             itrs.takeIndexWrapped();
    706                     }
    707                     for (; i > 0 && lock.hasWaiters(notFull); i--)
    708                         notFull.signal();
    709                 }
    710             }
    711         } finally {
    712             lock.unlock();
    713         }
    714     }
    715 
    716     /**
    717      * Returns an iterator over the elements in this queue in proper sequence.
    718      * The elements will be returned in order from first (head) to last (tail).
    719      *
    720      * <p>The returned iterator is a "weakly consistent" iterator that
    721      * will never throw {@link java.util.ConcurrentModificationException
    722      * ConcurrentModificationException}, and guarantees to traverse
    723      * elements as they existed upon construction of the iterator, and
    724      * may (but is not guaranteed to) reflect any modifications
    725      * subsequent to construction.
    726      *
    727      * @return an iterator over the elements in this queue in proper sequence
    728      */
    729     public Iterator<E> iterator() {
    730         return new Itr();
    731     }
    732 
    733     /**
    734      * Shared data between iterators and their queue, allowing queue
    735      * modifications to update iterators when elements are removed.
    736      *
    737      * This adds a lot of complexity for the sake of correctly
    738      * handling some uncommon operations, but the combination of
    739      * circular-arrays and supporting interior removes (i.e., those
    740      * not at head) would cause iterators to sometimes lose their
    741      * places and/or (re)report elements they shouldn't.  To avoid
    742      * this, when a queue has one or more iterators, it keeps iterator
    743      * state consistent by:
    744      *
    745      * (1) keeping track of the number of "cycles", that is, the
    746      *     number of times takeIndex has wrapped around to 0.
    747      * (2) notifying all iterators via the callback removedAt whenever
    748      *     an interior element is removed (and thus other elements may
    749      *     be shifted).
    750      *
    751      * These suffice to eliminate iterator inconsistencies, but
    752      * unfortunately add the secondary responsibility of maintaining
    753      * the list of iterators.  We track all active iterators in a
    754      * simple linked list (accessed only when the queue's lock is
    755      * held) of weak references to Itr.  The list is cleaned up using
    756      * 3 different mechanisms:
    757      *
    758      * (1) Whenever a new iterator is created, do some O(1) checking for
    759      *     stale list elements.
    760      *
    761      * (2) Whenever takeIndex wraps around to 0, check for iterators
    762      *     that have been unused for more than one wrap-around cycle.
    763      *
    764      * (3) Whenever the queue becomes empty, all iterators are notified
    765      *     and this entire data structure is discarded.
    766      *
    767      * So in addition to the removedAt callback that is necessary for
    768      * correctness, iterators have the shutdown and takeIndexWrapped
    769      * callbacks that help remove stale iterators from the list.
    770      *
    771      * Whenever a list element is examined, it is expunged if either
    772      * the GC has determined that the iterator is discarded, or if the
    773      * iterator reports that it is "detached" (does not need any
    774      * further state updates).  Overhead is maximal when takeIndex
    775      * never advances, iterators are discarded before they are
    776      * exhausted, and all removals are interior removes, in which case
    777      * all stale iterators are discovered by the GC.  But even in this
    778      * case we don't increase the amortized complexity.
    779      *
    780      * Care must be taken to keep list sweeping methods from
    781      * reentrantly invoking another such method, causing subtle
    782      * corruption bugs.
    783      */
    784     class Itrs {
    785 
    786         /**
    787          * Node in a linked list of weak iterator references.
    788          */
    789         private class Node extends WeakReference<Itr> {
    790             Node next;
    791 
    792             Node(Itr iterator, Node next) {
    793                 super(iterator);
    794                 this.next = next;
    795             }
    796         }
    797 
    798         /** Incremented whenever takeIndex wraps around to 0 */
    799         int cycles = 0;
    800 
    801         /** Linked list of weak iterator references */
    802         private Node head;
    803 
    804         /** Used to expunge stale iterators */
    805         private Node sweeper = null;
    806 
    807         private static final int SHORT_SWEEP_PROBES = 4;
    808         private static final int LONG_SWEEP_PROBES = 16;
    809 
    810         Itrs(Itr initial) {
    811             register(initial);
    812         }
    813 
    814         /**
    815          * Sweeps itrs, looking for and expunging stale iterators.
    816          * If at least one was found, tries harder to find more.
    817          * Called only from iterating thread.
    818          *
    819          * @param tryHarder whether to start in try-harder mode, because
    820          * there is known to be at least one iterator to collect
    821          */
    822         void doSomeSweeping(boolean tryHarder) {
    823             // assert lock.getHoldCount() == 1;
    824             // assert head != null;
    825             int probes = tryHarder ? LONG_SWEEP_PROBES : SHORT_SWEEP_PROBES;
    826             Node o, p;
    827             final Node sweeper = this.sweeper;
    828             boolean passedGo;   // to limit search to one full sweep
    829 
    830             if (sweeper == null) {
    831                 o = null;
    832                 p = head;
    833                 passedGo = true;
    834             } else {
    835                 o = sweeper;
    836                 p = o.next;
    837                 passedGo = false;
    838             }
    839 
    840             for (; probes > 0; probes--) {
    841                 if (p == null) {
    842                     if (passedGo)
    843                         break;
    844                     o = null;
    845                     p = head;
    846                     passedGo = true;
    847                 }
    848                 final Itr it = p.get();
    849                 final Node next = p.next;
    850                 if (it == null || it.isDetached()) {
    851                     // found a discarded/exhausted iterator
    852                     probes = LONG_SWEEP_PROBES; // "try harder"
    853                     // unlink p
    854                     p.clear();
    855                     p.next = null;
    856                     if (o == null) {
    857                         head = next;
    858                         if (next == null) {
    859                             // We've run out of iterators to track; retire
    860                             itrs = null;
    861                             return;
    862                         }
    863                     }
    864                     else
    865                         o.next = next;
    866                 } else {
    867                     o = p;
    868                 }
    869                 p = next;
    870             }
    871 
    872             this.sweeper = (p == null) ? null : o;
    873         }
    874 
    875         /**
    876          * Adds a new iterator to the linked list of tracked iterators.
    877          */
    878         void register(Itr itr) {
    879             // assert lock.getHoldCount() == 1;
    880             head = new Node(itr, head);
    881         }
    882 
    883         /**
    884          * Called whenever takeIndex wraps around to 0.
    885          *
    886          * Notifies all iterators, and expunges any that are now stale.
    887          */
    888         void takeIndexWrapped() {
    889             // assert lock.getHoldCount() == 1;
    890             cycles++;
    891             for (Node o = null, p = head; p != null;) {
    892                 final Itr it = p.get();
    893                 final Node next = p.next;
    894                 if (it == null || it.takeIndexWrapped()) {
    895                     // unlink p
    896                     // assert it == null || it.isDetached();
    897                     p.clear();
    898                     p.next = null;
    899                     if (o == null)
    900                         head = next;
    901                     else
    902                         o.next = next;
    903                 } else {
    904                     o = p;
    905                 }
    906                 p = next;
    907             }
    908             if (head == null)   // no more iterators to track
    909                 itrs = null;
    910         }
    911 
    912         /**
    913          * Called whenever an interior remove (not at takeIndex) occured.
    914          *
    915          * Notifies all iterators, and expunges any that are now stale.
    916          */
    917         void removedAt(int removedIndex) {
    918             for (Node o = null, p = head; p != null;) {
    919                 final Itr it = p.get();
    920                 final Node next = p.next;
    921                 if (it == null || it.removedAt(removedIndex)) {
    922                     // unlink p
    923                     // assert it == null || it.isDetached();
    924                     p.clear();
    925                     p.next = null;
    926                     if (o == null)
    927                         head = next;
    928                     else
    929                         o.next = next;
    930                 } else {
    931                     o = p;
    932                 }
    933                 p = next;
    934             }
    935             if (head == null)   // no more iterators to track
    936                 itrs = null;
    937         }
    938 
    939         /**
    940          * Called whenever the queue becomes empty.
    941          *
    942          * Notifies all active iterators that the queue is empty,
    943          * clears all weak refs, and unlinks the itrs datastructure.
    944          */
    945         void queueIsEmpty() {
    946             // assert lock.getHoldCount() == 1;
    947             for (Node p = head; p != null; p = p.next) {
    948                 Itr it = p.get();
    949                 if (it != null) {
    950                     p.clear();
    951                     it.shutdown();
    952                 }
    953             }
    954             head = null;
    955             itrs = null;
    956         }
    957 
    958         /**
    959          * Called whenever an element has been dequeued (at takeIndex).
    960          */
    961         void elementDequeued() {
    962             // assert lock.getHoldCount() == 1;
    963             if (count == 0)
    964                 queueIsEmpty();
    965             else if (takeIndex == 0)
    966                 takeIndexWrapped();
    967         }
    968     }
    969 
    970     /**
    971      * Iterator for ArrayBlockingQueue.
    972      *
    973      * To maintain weak consistency with respect to puts and takes, we
    974      * read ahead one slot, so as to not report hasNext true but then
    975      * not have an element to return.
    976      *
    977      * We switch into "detached" mode (allowing prompt unlinking from
    978      * itrs without help from the GC) when all indices are negative, or
    979      * when hasNext returns false for the first time.  This allows the
    980      * iterator to track concurrent updates completely accurately,
    981      * except for the corner case of the user calling Iterator.remove()
    982      * after hasNext() returned false.  Even in this case, we ensure
    983      * that we don't remove the wrong element by keeping track of the
    984      * expected element to remove, in lastItem.  Yes, we may fail to
    985      * remove lastItem from the queue if it moved due to an interleaved
    986      * interior remove while in detached mode.
    987      */
    988     private class Itr implements Iterator<E> {
    989         /** Index to look for new nextItem; NONE at end */
    990         private int cursor;
    991 
    992         /** Element to be returned by next call to next(); null if none */
    993         private E nextItem;
    994 
    995         /** Index of nextItem; NONE if none, REMOVED if removed elsewhere */
    996         private int nextIndex;
    997 
    998         /** Last element returned; null if none or not detached. */
    999         private E lastItem;
   1000 
   1001         /** Index of lastItem, NONE if none, REMOVED if removed elsewhere */
   1002         private int lastRet;
   1003 
   1004         /** Previous value of takeIndex, or DETACHED when detached */
   1005         private int prevTakeIndex;
   1006 
   1007         /** Previous value of iters.cycles */
   1008         private int prevCycles;
   1009 
   1010         /** Special index value indicating "not available" or "undefined" */
   1011         private static final int NONE = -1;
   1012 
   1013         /**
   1014          * Special index value indicating "removed elsewhere", that is,
   1015          * removed by some operation other than a call to this.remove().
   1016          */
   1017         private static final int REMOVED = -2;
   1018 
   1019         /** Special value for prevTakeIndex indicating "detached mode" */
   1020         private static final int DETACHED = -3;
   1021 
   1022         Itr() {
   1023             // assert lock.getHoldCount() == 0;
   1024             lastRet = NONE;
   1025             final ReentrantLock lock = ArrayBlockingQueue.this.lock;
   1026             lock.lock();
   1027             try {
   1028                 if (count == 0) {
   1029                     // assert itrs == null;
   1030                     cursor = NONE;
   1031                     nextIndex = NONE;
   1032                     prevTakeIndex = DETACHED;
   1033                 } else {
   1034                     final int takeIndex = ArrayBlockingQueue.this.takeIndex;
   1035                     prevTakeIndex = takeIndex;
   1036                     nextItem = itemAt(nextIndex = takeIndex);
   1037                     cursor = incCursor(takeIndex);
   1038                     if (itrs == null) {
   1039                         itrs = new Itrs(this);
   1040                     } else {
   1041                         itrs.register(this); // in this order
   1042                         itrs.doSomeSweeping(false);
   1043                     }
   1044                     prevCycles = itrs.cycles;
   1045                     // assert takeIndex >= 0;
   1046                     // assert prevTakeIndex == takeIndex;
   1047                     // assert nextIndex >= 0;
   1048                     // assert nextItem != null;
   1049                 }
   1050             } finally {
   1051                 lock.unlock();
   1052             }
   1053         }
   1054 
   1055         boolean isDetached() {
   1056             // assert lock.getHoldCount() == 1;
   1057             return prevTakeIndex < 0;
   1058         }
   1059 
   1060         private int incCursor(int index) {
   1061             // assert lock.getHoldCount() == 1;
   1062             index = inc(index);
   1063             if (index == putIndex)
   1064                 index = NONE;
   1065             return index;
   1066         }
   1067 
   1068         /**
   1069          * Returns true if index is invalidated by the given number of
   1070          * dequeues, starting from prevTakeIndex.
   1071          */
   1072         private boolean invalidated(int index, int prevTakeIndex,
   1073                                     long dequeues, int length) {
   1074             if (index < 0)
   1075                 return false;
   1076             int distance = index - prevTakeIndex;
   1077             if (distance < 0)
   1078                 distance += length;
   1079             return dequeues > distance;
   1080         }
   1081 
   1082         /**
   1083          * Adjusts indices to incorporate all dequeues since the last
   1084          * operation on this iterator.  Call only from iterating thread.
   1085          */
   1086         private void incorporateDequeues() {
   1087             // assert lock.getHoldCount() == 1;
   1088             // assert itrs != null;
   1089             // assert !isDetached();
   1090             // assert count > 0;
   1091 
   1092             final int cycles = itrs.cycles;
   1093             final int takeIndex = ArrayBlockingQueue.this.takeIndex;
   1094             final int prevCycles = this.prevCycles;
   1095             final int prevTakeIndex = this.prevTakeIndex;
   1096 
   1097             if (cycles != prevCycles || takeIndex != prevTakeIndex) {
   1098                 final int len = items.length;
   1099                 // how far takeIndex has advanced since the previous
   1100                 // operation of this iterator
   1101                 long dequeues = (cycles - prevCycles) * len
   1102                     + (takeIndex - prevTakeIndex);
   1103 
   1104                 // Check indices for invalidation
   1105                 if (invalidated(lastRet, prevTakeIndex, dequeues, len))
   1106                     lastRet = REMOVED;
   1107                 if (invalidated(nextIndex, prevTakeIndex, dequeues, len))
   1108                     nextIndex = REMOVED;
   1109                 if (invalidated(cursor, prevTakeIndex, dequeues, len))
   1110                     cursor = takeIndex;
   1111 
   1112                 if (cursor < 0 && nextIndex < 0 && lastRet < 0)
   1113                     detach();
   1114                 else {
   1115                     this.prevCycles = cycles;
   1116                     this.prevTakeIndex = takeIndex;
   1117                 }
   1118             }
   1119         }
   1120 
   1121         /**
   1122          * Called when itrs should stop tracking this iterator, either
   1123          * because there are no more indices to update (cursor < 0 &&
   1124          * nextIndex < 0 && lastRet < 0) or as a special exception, when
   1125          * lastRet >= 0, because hasNext() is about to return false for the
   1126          * first time.  Call only from iterating thread.
   1127          */
   1128         private void detach() {
   1129             // Switch to detached mode
   1130             // assert lock.getHoldCount() == 1;
   1131             // assert cursor == NONE;
   1132             // assert nextIndex < 0;
   1133             // assert lastRet < 0 || nextItem == null;
   1134             // assert lastRet < 0 ^ lastItem != null;
   1135             if (prevTakeIndex >= 0) {
   1136                 // assert itrs != null;
   1137                 prevTakeIndex = DETACHED;
   1138                 // try to unlink from itrs (but not too hard)
   1139                 itrs.doSomeSweeping(true);
   1140             }
   1141         }
   1142 
   1143         /**
   1144          * For performance reasons, we would like not to acquire a lock in
   1145          * hasNext in the common case.  To allow for this, we only access
   1146          * fields (i.e. nextItem) that are not modified by update operations
   1147          * triggered by queue modifications.
   1148          */
   1149         public boolean hasNext() {
   1150             // assert lock.getHoldCount() == 0;
   1151             if (nextItem != null)
   1152                 return true;
   1153             noNext();
   1154             return false;
   1155         }
   1156 
   1157         private void noNext() {
   1158             final ReentrantLock lock = ArrayBlockingQueue.this.lock;
   1159             lock.lock();
   1160             try {
   1161                 // assert cursor == NONE;
   1162                 // assert nextIndex == NONE;
   1163                 if (!isDetached()) {
   1164                     // assert lastRet >= 0;
   1165                     incorporateDequeues(); // might update lastRet
   1166                     if (lastRet >= 0) {
   1167                         lastItem = itemAt(lastRet);
   1168                         // assert lastItem != null;
   1169                         detach();
   1170                     }
   1171                 }
   1172                 // assert isDetached();
   1173                 // assert lastRet < 0 ^ lastItem != null;
   1174             } finally {
   1175                 lock.unlock();
   1176             }
   1177         }
   1178 
   1179         public E next() {
   1180             // assert lock.getHoldCount() == 0;
   1181             final E x = nextItem;
   1182             if (x == null)
   1183                 throw new NoSuchElementException();
   1184             final ReentrantLock lock = ArrayBlockingQueue.this.lock;
   1185             lock.lock();
   1186             try {
   1187                 if (!isDetached())
   1188                     incorporateDequeues();
   1189                 // assert nextIndex != NONE;
   1190                 // assert lastItem == null;
   1191                 lastRet = nextIndex;
   1192                 final int cursor = this.cursor;
   1193                 if (cursor >= 0) {
   1194                     nextItem = itemAt(nextIndex = cursor);
   1195                     // assert nextItem != null;
   1196                     this.cursor = incCursor(cursor);
   1197                 } else {
   1198                     nextIndex = NONE;
   1199                     nextItem = null;
   1200                 }
   1201             } finally {
   1202                 lock.unlock();
   1203             }
   1204             return x;
   1205         }
   1206 
   1207         public void remove() {
   1208             // assert lock.getHoldCount() == 0;
   1209             final ReentrantLock lock = ArrayBlockingQueue.this.lock;
   1210             lock.lock();
   1211             try {
   1212                 if (!isDetached())
   1213                     incorporateDequeues(); // might update lastRet or detach
   1214                 final int lastRet = this.lastRet;
   1215                 this.lastRet = NONE;
   1216                 if (lastRet >= 0) {
   1217                     if (!isDetached())
   1218                         removeAt(lastRet);
   1219                     else {
   1220                         final E lastItem = this.lastItem;
   1221                         // assert lastItem != null;
   1222                         this.lastItem = null;
   1223                         if (itemAt(lastRet) == lastItem)
   1224                             removeAt(lastRet);
   1225                     }
   1226                 } else if (lastRet == NONE)
   1227                     throw new IllegalStateException();
   1228                 // else lastRet == REMOVED and the last returned element was
   1229                 // previously asynchronously removed via an operation other
   1230                 // than this.remove(), so nothing to do.
   1231 
   1232                 if (cursor < 0 && nextIndex < 0)
   1233                     detach();
   1234             } finally {
   1235                 lock.unlock();
   1236                 // assert lastRet == NONE;
   1237                 // assert lastItem == null;
   1238             }
   1239         }
   1240 
   1241         /**
   1242          * Called to notify the iterator that the queue is empty, or that it
   1243          * has fallen hopelessly behind, so that it should abandon any
   1244          * further iteration, except possibly to return one more element
   1245          * from next(), as promised by returning true from hasNext().
   1246          */
   1247         void shutdown() {
   1248             // assert lock.getHoldCount() == 1;
   1249             cursor = NONE;
   1250             if (nextIndex >= 0)
   1251                 nextIndex = REMOVED;
   1252             if (lastRet >= 0) {
   1253                 lastRet = REMOVED;
   1254                 lastItem = null;
   1255             }
   1256             prevTakeIndex = DETACHED;
   1257             // Don't set nextItem to null because we must continue to be
   1258             // able to return it on next().
   1259             //
   1260             // Caller will unlink from itrs when convenient.
   1261         }
   1262 
   1263         private int distance(int index, int prevTakeIndex, int length) {
   1264             int distance = index - prevTakeIndex;
   1265             if (distance < 0)
   1266                 distance += length;
   1267             return distance;
   1268         }
   1269 
   1270         /**
   1271          * Called whenever an interior remove (not at takeIndex) occured.
   1272          *
   1273          * @return true if this iterator should be unlinked from itrs
   1274          */
   1275         boolean removedAt(int removedIndex) {
   1276             // assert lock.getHoldCount() == 1;
   1277             if (isDetached())
   1278                 return true;
   1279 
   1280             final int cycles = itrs.cycles;
   1281             final int takeIndex = ArrayBlockingQueue.this.takeIndex;
   1282             final int prevCycles = this.prevCycles;
   1283             final int prevTakeIndex = this.prevTakeIndex;
   1284             final int len = items.length;
   1285             int cycleDiff = cycles - prevCycles;
   1286             if (removedIndex < takeIndex)
   1287                 cycleDiff++;
   1288             final int removedDistance =
   1289                 (cycleDiff * len) + (removedIndex - prevTakeIndex);
   1290             // assert removedDistance >= 0;
   1291             int cursor = this.cursor;
   1292             if (cursor >= 0) {
   1293                 int x = distance(cursor, prevTakeIndex, len);
   1294                 if (x == removedDistance) {
   1295                     if (cursor == putIndex)
   1296                         this.cursor = cursor = NONE;
   1297                 }
   1298                 else if (x > removedDistance) {
   1299                     // assert cursor != prevTakeIndex;
   1300                     this.cursor = cursor = dec(cursor);
   1301                 }
   1302             }
   1303             int lastRet = this.lastRet;
   1304             if (lastRet >= 0) {
   1305                 int x = distance(lastRet, prevTakeIndex, len);
   1306                 if (x == removedDistance)
   1307                     this.lastRet = lastRet = REMOVED;
   1308                 else if (x > removedDistance)
   1309                     this.lastRet = lastRet = dec(lastRet);
   1310             }
   1311             int nextIndex = this.nextIndex;
   1312             if (nextIndex >= 0) {
   1313                 int x = distance(nextIndex, prevTakeIndex, len);
   1314                 if (x == removedDistance)
   1315                     this.nextIndex = nextIndex = REMOVED;
   1316                 else if (x > removedDistance)
   1317                     this.nextIndex = nextIndex = dec(nextIndex);
   1318             }
   1319             else if (cursor < 0 && nextIndex < 0 && lastRet < 0) {
   1320                 this.prevTakeIndex = DETACHED;
   1321                 return true;
   1322             }
   1323             return false;
   1324         }
   1325 
   1326         /**
   1327          * Called whenever takeIndex wraps around to zero.
   1328          *
   1329          * @return true if this iterator should be unlinked from itrs
   1330          */
   1331         boolean takeIndexWrapped() {
   1332             // assert lock.getHoldCount() == 1;
   1333             if (isDetached())
   1334                 return true;
   1335             if (itrs.cycles - prevCycles > 1) {
   1336                 // All the elements that existed at the time of the last
   1337                 // operation are gone, so abandon further iteration.
   1338                 shutdown();
   1339                 return true;
   1340             }
   1341             return false;
   1342         }
   1343 
   1344 //         /** Uncomment for debugging. */
   1345 //         public String toString() {
   1346 //             return ("cursor=" + cursor + " " +
   1347 //                     "nextIndex=" + nextIndex + " " +
   1348 //                     "lastRet=" + lastRet + " " +
   1349 //                     "nextItem=" + nextItem + " " +
   1350 //                     "lastItem=" + lastItem + " " +
   1351 //                     "prevCycles=" + prevCycles + " " +
   1352 //                     "prevTakeIndex=" + prevTakeIndex + " " +
   1353 //                     "size()=" + size() + " " +
   1354 //                     "remainingCapacity()=" + remainingCapacity());
   1355 //         }
   1356     }
   1357 }
   1358