Home | History | Annotate | Download | only in concurrent
      1 /*
      2  * Written by Doug Lea with assistance from members of JCP JSR-166
      3  * Expert Group and released to the public domain, as explained at
      4  * http://creativecommons.org/publicdomain/zero/1.0/
      5  */
      6 
      7 package java.util.concurrent;
      8 
      9 import java.lang.ref.WeakReference;
     10 import java.util.AbstractQueue;
     11 import java.util.Arrays;
     12 import java.util.Collection;
     13 import java.util.Iterator;
     14 import java.util.NoSuchElementException;
     15 import java.util.Objects;
     16 import java.util.Spliterator;
     17 import java.util.Spliterators;
     18 import java.util.concurrent.locks.Condition;
     19 import java.util.concurrent.locks.ReentrantLock;
     20 
     21 // BEGIN android-note
     22 // removed link to collections framework docs
     23 // END android-note
     24 
     25 /**
     26  * A bounded {@linkplain BlockingQueue blocking queue} backed by an
     27  * array.  This queue orders elements FIFO (first-in-first-out).  The
     28  * <em>head</em> of the queue is that element that has been on the
     29  * queue the longest time.  The <em>tail</em> of the queue is that
     30  * element that has been on the queue the shortest time. New elements
     31  * are inserted at the tail of the queue, and the queue retrieval
     32  * operations obtain elements at the head of the queue.
     33  *
     34  * <p>This is a classic &quot;bounded buffer&quot;, in which a
     35  * fixed-sized array holds elements inserted by producers and
     36  * extracted by consumers.  Once created, the capacity cannot be
     37  * changed.  Attempts to {@code put} an element into a full queue
     38  * will result in the operation blocking; attempts to {@code take} an
     39  * element from an empty queue will similarly block.
     40  *
     41  * <p>This class supports an optional fairness policy for ordering
     42  * waiting producer and consumer threads.  By default, this ordering
     43  * is not guaranteed. However, a queue constructed with fairness set
     44  * to {@code true} grants threads access in FIFO order. Fairness
     45  * generally decreases throughput but reduces variability and avoids
     46  * starvation.
     47  *
     48  * <p>This class and its iterator implement all of the
     49  * <em>optional</em> methods of the {@link Collection} and {@link
     50  * Iterator} interfaces.
     51  *
     52  * @since 1.5
     53  * @author Doug Lea
     54  * @param <E> the type of elements held in this queue
     55  */
     56 public class ArrayBlockingQueue<E> extends AbstractQueue<E>
     57         implements BlockingQueue<E>, java.io.Serializable {
     58 
     59     /**
     60      * Serialization ID. This class relies on default serialization
     61      * even for the items array, which is default-serialized, even if
     62      * it is empty. Otherwise it could not be declared final, which is
     63      * necessary here.
     64      */
     65     private static final long serialVersionUID = -817911632652898426L;
     66 
     67     /** The queued items */
     68     final Object[] items;
     69 
     70     /** items index for next take, poll, peek or remove */
     71     int takeIndex;
     72 
     73     /** items index for next put, offer, or add */
     74     int putIndex;
     75 
     76     /** Number of elements in the queue */
     77     int count;
     78 
     79     /*
     80      * Concurrency control uses the classic two-condition algorithm
     81      * found in any textbook.
     82      */
     83 
     84     /** Main lock guarding all access */
     85     final ReentrantLock lock;
     86 
     87     /** Condition for waiting takes */
     88     private final Condition notEmpty;
     89 
     90     /** Condition for waiting puts */
     91     private final Condition notFull;
     92 
     93     /**
     94      * Shared state for currently active iterators, or null if there
     95      * are known not to be any.  Allows queue operations to update
     96      * iterator state.
     97      */
     98     transient Itrs itrs;
     99 
    100     // Internal helper methods
    101 
    102     /**
    103      * Circularly decrements array index i.
    104      */
    105     final int dec(int i) {
    106         return ((i == 0) ? items.length : i) - 1;
    107     }
    108 
    109     /**
    110      * Returns item at index i.
    111      */
    112     @SuppressWarnings("unchecked")
    113     final E itemAt(int i) {
    114         return (E) items[i];
    115     }
    116 
    117     /**
    118      * Inserts element at current put position, advances, and signals.
    119      * Call only when holding lock.
    120      */
    121     private void enqueue(E x) {
    122         // assert lock.getHoldCount() == 1;
    123         // assert items[putIndex] == null;
    124         final Object[] items = this.items;
    125         items[putIndex] = x;
    126         if (++putIndex == items.length) putIndex = 0;
    127         count++;
    128         notEmpty.signal();
    129     }
    130 
    131     /**
    132      * Extracts element at current take position, advances, and signals.
    133      * Call only when holding lock.
    134      */
    135     private E dequeue() {
    136         // assert lock.getHoldCount() == 1;
    137         // assert items[takeIndex] != null;
    138         final Object[] items = this.items;
    139         @SuppressWarnings("unchecked")
    140         E x = (E) items[takeIndex];
    141         items[takeIndex] = null;
    142         if (++takeIndex == items.length) takeIndex = 0;
    143         count--;
    144         if (itrs != null)
    145             itrs.elementDequeued();
    146         notFull.signal();
    147         return x;
    148     }
    149 
    150     /**
    151      * Deletes item at array index removeIndex.
    152      * Utility for remove(Object) and iterator.remove.
    153      * Call only when holding lock.
    154      */
    155     void removeAt(final int removeIndex) {
    156         // assert lock.getHoldCount() == 1;
    157         // assert items[removeIndex] != null;
    158         // assert removeIndex >= 0 && removeIndex < items.length;
    159         final Object[] items = this.items;
    160         if (removeIndex == takeIndex) {
    161             // removing front item; just advance
    162             items[takeIndex] = null;
    163             if (++takeIndex == items.length) takeIndex = 0;
    164             count--;
    165             if (itrs != null)
    166                 itrs.elementDequeued();
    167         } else {
    168             // an "interior" remove
    169 
    170             // slide over all others up through putIndex.
    171             for (int i = removeIndex, putIndex = this.putIndex;;) {
    172                 int pred = i;
    173                 if (++i == items.length) i = 0;
    174                 if (i == putIndex) {
    175                     items[pred] = null;
    176                     this.putIndex = pred;
    177                     break;
    178                 }
    179                 items[pred] = items[i];
    180             }
    181             count--;
    182             if (itrs != null)
    183                 itrs.removedAt(removeIndex);
    184         }
    185         notFull.signal();
    186     }
    187 
    188     /**
    189      * Creates an {@code ArrayBlockingQueue} with the given (fixed)
    190      * capacity and default access policy.
    191      *
    192      * @param capacity the capacity of this queue
    193      * @throws IllegalArgumentException if {@code capacity < 1}
    194      */
    195     public ArrayBlockingQueue(int capacity) {
    196         this(capacity, false);
    197     }
    198 
    199     /**
    200      * Creates an {@code ArrayBlockingQueue} with the given (fixed)
    201      * capacity and the specified access policy.
    202      *
    203      * @param capacity the capacity of this queue
    204      * @param fair if {@code true} then queue accesses for threads blocked
    205      *        on insertion or removal, are processed in FIFO order;
    206      *        if {@code false} the access order is unspecified.
    207      * @throws IllegalArgumentException if {@code capacity < 1}
    208      */
    209     public ArrayBlockingQueue(int capacity, boolean fair) {
    210         if (capacity <= 0)
    211             throw new IllegalArgumentException();
    212         this.items = new Object[capacity];
    213         lock = new ReentrantLock(fair);
    214         notEmpty = lock.newCondition();
    215         notFull =  lock.newCondition();
    216     }
    217 
    218     /**
    219      * Creates an {@code ArrayBlockingQueue} with the given (fixed)
    220      * capacity, the specified access policy and initially containing the
    221      * elements of the given collection,
    222      * added in traversal order of the collection's iterator.
    223      *
    224      * @param capacity the capacity of this queue
    225      * @param fair if {@code true} then queue accesses for threads blocked
    226      *        on insertion or removal, are processed in FIFO order;
    227      *        if {@code false} the access order is unspecified.
    228      * @param c the collection of elements to initially contain
    229      * @throws IllegalArgumentException if {@code capacity} is less than
    230      *         {@code c.size()}, or less than 1.
    231      * @throws NullPointerException if the specified collection or any
    232      *         of its elements are null
    233      */
    234     public ArrayBlockingQueue(int capacity, boolean fair,
    235                               Collection<? extends E> c) {
    236         this(capacity, fair);
    237 
    238         final ReentrantLock lock = this.lock;
    239         lock.lock(); // Lock only for visibility, not mutual exclusion
    240         try {
    241             int i = 0;
    242             try {
    243                 for (E e : c)
    244                     items[i++] = Objects.requireNonNull(e);
    245             } catch (ArrayIndexOutOfBoundsException ex) {
    246                 throw new IllegalArgumentException();
    247             }
    248             count = i;
    249             putIndex = (i == capacity) ? 0 : i;
    250         } finally {
    251             lock.unlock();
    252         }
    253     }
    254 
    255     /**
    256      * Inserts the specified element at the tail of this queue if it is
    257      * possible to do so immediately without exceeding the queue's capacity,
    258      * returning {@code true} upon success and throwing an
    259      * {@code IllegalStateException} if this queue is full.
    260      *
    261      * @param e the element to add
    262      * @return {@code true} (as specified by {@link Collection#add})
    263      * @throws IllegalStateException if this queue is full
    264      * @throws NullPointerException if the specified element is null
    265      */
    266     public boolean add(E e) {
    267         return super.add(e);
    268     }
    269 
    270     /**
    271      * Inserts the specified element at the tail of this queue if it is
    272      * possible to do so immediately without exceeding the queue's capacity,
    273      * returning {@code true} upon success and {@code false} if this queue
    274      * is full.  This method is generally preferable to method {@link #add},
    275      * which can fail to insert an element only by throwing an exception.
    276      *
    277      * @throws NullPointerException if the specified element is null
    278      */
    279     public boolean offer(E e) {
    280         Objects.requireNonNull(e);
    281         final ReentrantLock lock = this.lock;
    282         lock.lock();
    283         try {
    284             if (count == items.length)
    285                 return false;
    286             else {
    287                 enqueue(e);
    288                 return true;
    289             }
    290         } finally {
    291             lock.unlock();
    292         }
    293     }
    294 
    295     /**
    296      * Inserts the specified element at the tail of this queue, waiting
    297      * for space to become available if the queue is full.
    298      *
    299      * @throws InterruptedException {@inheritDoc}
    300      * @throws NullPointerException {@inheritDoc}
    301      */
    302     public void put(E e) throws InterruptedException {
    303         Objects.requireNonNull(e);
    304         final ReentrantLock lock = this.lock;
    305         lock.lockInterruptibly();
    306         try {
    307             while (count == items.length)
    308                 notFull.await();
    309             enqueue(e);
    310         } finally {
    311             lock.unlock();
    312         }
    313     }
    314 
    315     /**
    316      * Inserts the specified element at the tail of this queue, waiting
    317      * up to the specified wait time for space to become available if
    318      * the queue is full.
    319      *
    320      * @throws InterruptedException {@inheritDoc}
    321      * @throws NullPointerException {@inheritDoc}
    322      */
    323     public boolean offer(E e, long timeout, TimeUnit unit)
    324         throws InterruptedException {
    325 
    326         Objects.requireNonNull(e);
    327         long nanos = unit.toNanos(timeout);
    328         final ReentrantLock lock = this.lock;
    329         lock.lockInterruptibly();
    330         try {
    331             while (count == items.length) {
    332                 if (nanos <= 0L)
    333                     return false;
    334                 nanos = notFull.awaitNanos(nanos);
    335             }
    336             enqueue(e);
    337             return true;
    338         } finally {
    339             lock.unlock();
    340         }
    341     }
    342 
    343     public E poll() {
    344         final ReentrantLock lock = this.lock;
    345         lock.lock();
    346         try {
    347             return (count == 0) ? null : dequeue();
    348         } finally {
    349             lock.unlock();
    350         }
    351     }
    352 
    353     public E take() throws InterruptedException {
    354         final ReentrantLock lock = this.lock;
    355         lock.lockInterruptibly();
    356         try {
    357             while (count == 0)
    358                 notEmpty.await();
    359             return dequeue();
    360         } finally {
    361             lock.unlock();
    362         }
    363     }
    364 
    365     public E poll(long timeout, TimeUnit unit) throws InterruptedException {
    366         long nanos = unit.toNanos(timeout);
    367         final ReentrantLock lock = this.lock;
    368         lock.lockInterruptibly();
    369         try {
    370             while (count == 0) {
    371                 if (nanos <= 0L)
    372                     return null;
    373                 nanos = notEmpty.awaitNanos(nanos);
    374             }
    375             return dequeue();
    376         } finally {
    377             lock.unlock();
    378         }
    379     }
    380 
    381     public E peek() {
    382         final ReentrantLock lock = this.lock;
    383         lock.lock();
    384         try {
    385             return itemAt(takeIndex); // null when queue is empty
    386         } finally {
    387             lock.unlock();
    388         }
    389     }
    390 
    391     // this doc comment is overridden to remove the reference to collections
    392     // greater in size than Integer.MAX_VALUE
    393     /**
    394      * Returns the number of elements in this queue.
    395      *
    396      * @return the number of elements in this queue
    397      */
    398     public int size() {
    399         final ReentrantLock lock = this.lock;
    400         lock.lock();
    401         try {
    402             return count;
    403         } finally {
    404             lock.unlock();
    405         }
    406     }
    407 
    408     // this doc comment is a modified copy of the inherited doc comment,
    409     // without the reference to unlimited queues.
    410     /**
    411      * Returns the number of additional elements that this queue can ideally
    412      * (in the absence of memory or resource constraints) accept without
    413      * blocking. This is always equal to the initial capacity of this queue
    414      * less the current {@code size} of this queue.
    415      *
    416      * <p>Note that you <em>cannot</em> always tell if an attempt to insert
    417      * an element will succeed by inspecting {@code remainingCapacity}
    418      * because it may be the case that another thread is about to
    419      * insert or remove an element.
    420      */
    421     public int remainingCapacity() {
    422         final ReentrantLock lock = this.lock;
    423         lock.lock();
    424         try {
    425             return items.length - count;
    426         } finally {
    427             lock.unlock();
    428         }
    429     }
    430 
    431     /**
    432      * Removes a single instance of the specified element from this queue,
    433      * if it is present.  More formally, removes an element {@code e} such
    434      * that {@code o.equals(e)}, if this queue contains one or more such
    435      * elements.
    436      * Returns {@code true} if this queue contained the specified element
    437      * (or equivalently, if this queue changed as a result of the call).
    438      *
    439      * <p>Removal of interior elements in circular array based queues
    440      * is an intrinsically slow and disruptive operation, so should
    441      * be undertaken only in exceptional circumstances, ideally
    442      * only when the queue is known not to be accessible by other
    443      * threads.
    444      *
    445      * @param o element to be removed from this queue, if present
    446      * @return {@code true} if this queue changed as a result of the call
    447      */
    448     public boolean remove(Object o) {
    449         if (o == null) return false;
    450         final ReentrantLock lock = this.lock;
    451         lock.lock();
    452         try {
    453             if (count > 0) {
    454                 final Object[] items = this.items;
    455                 final int putIndex = this.putIndex;
    456                 int i = takeIndex;
    457                 do {
    458                     if (o.equals(items[i])) {
    459                         removeAt(i);
    460                         return true;
    461                     }
    462                     if (++i == items.length) i = 0;
    463                 } while (i != putIndex);
    464             }
    465             return false;
    466         } finally {
    467             lock.unlock();
    468         }
    469     }
    470 
    471     /**
    472      * Returns {@code true} if this queue contains the specified element.
    473      * More formally, returns {@code true} if and only if this queue contains
    474      * at least one element {@code e} such that {@code o.equals(e)}.
    475      *
    476      * @param o object to be checked for containment in this queue
    477      * @return {@code true} if this queue contains the specified element
    478      */
    479     public boolean contains(Object o) {
    480         if (o == null) return false;
    481         final ReentrantLock lock = this.lock;
    482         lock.lock();
    483         try {
    484             if (count > 0) {
    485                 final Object[] items = this.items;
    486                 final int putIndex = this.putIndex;
    487                 int i = takeIndex;
    488                 do {
    489                     if (o.equals(items[i]))
    490                         return true;
    491                     if (++i == items.length) i = 0;
    492                 } while (i != putIndex);
    493             }
    494             return false;
    495         } finally {
    496             lock.unlock();
    497         }
    498     }
    499 
    500     /**
    501      * Returns an array containing all of the elements in this queue, in
    502      * proper sequence.
    503      *
    504      * <p>The returned array will be "safe" in that no references to it are
    505      * maintained by this queue.  (In other words, this method must allocate
    506      * a new array).  The caller is thus free to modify the returned array.
    507      *
    508      * <p>This method acts as bridge between array-based and collection-based
    509      * APIs.
    510      *
    511      * @return an array containing all of the elements in this queue
    512      */
    513     public Object[] toArray() {
    514         final ReentrantLock lock = this.lock;
    515         lock.lock();
    516         try {
    517             final Object[] items = this.items;
    518             final int end = takeIndex + count;
    519             final Object[] a = Arrays.copyOfRange(items, takeIndex, end);
    520             if (end != putIndex)
    521                 System.arraycopy(items, 0, a, items.length - takeIndex, putIndex);
    522             return a;
    523         } finally {
    524             lock.unlock();
    525         }
    526     }
    527 
    528     /**
    529      * Returns an array containing all of the elements in this queue, in
    530      * proper sequence; the runtime type of the returned array is that of
    531      * the specified array.  If the queue fits in the specified array, it
    532      * is returned therein.  Otherwise, a new array is allocated with the
    533      * runtime type of the specified array and the size of this queue.
    534      *
    535      * <p>If this queue fits in the specified array with room to spare
    536      * (i.e., the array has more elements than this queue), the element in
    537      * the array immediately following the end of the queue is set to
    538      * {@code null}.
    539      *
    540      * <p>Like the {@link #toArray()} method, this method acts as bridge between
    541      * array-based and collection-based APIs.  Further, this method allows
    542      * precise control over the runtime type of the output array, and may,
    543      * under certain circumstances, be used to save allocation costs.
    544      *
    545      * <p>Suppose {@code x} is a queue known to contain only strings.
    546      * The following code can be used to dump the queue into a newly
    547      * allocated array of {@code String}:
    548      *
    549      * <pre> {@code String[] y = x.toArray(new String[0]);}</pre>
    550      *
    551      * Note that {@code toArray(new Object[0])} is identical in function to
    552      * {@code toArray()}.
    553      *
    554      * @param a the array into which the elements of the queue are to
    555      *          be stored, if it is big enough; otherwise, a new array of the
    556      *          same runtime type is allocated for this purpose
    557      * @return an array containing all of the elements in this queue
    558      * @throws ArrayStoreException if the runtime type of the specified array
    559      *         is not a supertype of the runtime type of every element in
    560      *         this queue
    561      * @throws NullPointerException if the specified array is null
    562      */
    563     @SuppressWarnings("unchecked")
    564     public <T> T[] toArray(T[] a) {
    565         final ReentrantLock lock = this.lock;
    566         lock.lock();
    567         try {
    568             final Object[] items = this.items;
    569             final int count = this.count;
    570             final int firstLeg = Math.min(items.length - takeIndex, count);
    571             if (a.length < count) {
    572                 a = (T[]) Arrays.copyOfRange(items, takeIndex, takeIndex + count,
    573                                              a.getClass());
    574             } else {
    575                 System.arraycopy(items, takeIndex, a, 0, firstLeg);
    576                 if (a.length > count)
    577                     a[count] = null;
    578             }
    579             if (firstLeg < count)
    580                 System.arraycopy(items, 0, a, firstLeg, putIndex);
    581             return a;
    582         } finally {
    583             lock.unlock();
    584         }
    585     }
    586 
    587     public String toString() {
    588         return Helpers.collectionToString(this);
    589     }
    590 
    591     /**
    592      * Atomically removes all of the elements from this queue.
    593      * The queue will be empty after this call returns.
    594      */
    595     public void clear() {
    596         final ReentrantLock lock = this.lock;
    597         lock.lock();
    598         try {
    599             int k = count;
    600             if (k > 0) {
    601                 final Object[] items = this.items;
    602                 final int putIndex = this.putIndex;
    603                 int i = takeIndex;
    604                 do {
    605                     items[i] = null;
    606                     if (++i == items.length) i = 0;
    607                 } while (i != putIndex);
    608                 takeIndex = putIndex;
    609                 count = 0;
    610                 if (itrs != null)
    611                     itrs.queueIsEmpty();
    612                 for (; k > 0 && lock.hasWaiters(notFull); k--)
    613                     notFull.signal();
    614             }
    615         } finally {
    616             lock.unlock();
    617         }
    618     }
    619 
    620     /**
    621      * @throws UnsupportedOperationException {@inheritDoc}
    622      * @throws ClassCastException            {@inheritDoc}
    623      * @throws NullPointerException          {@inheritDoc}
    624      * @throws IllegalArgumentException      {@inheritDoc}
    625      */
    626     public int drainTo(Collection<? super E> c) {
    627         return drainTo(c, Integer.MAX_VALUE);
    628     }
    629 
    630     /**
    631      * @throws UnsupportedOperationException {@inheritDoc}
    632      * @throws ClassCastException            {@inheritDoc}
    633      * @throws NullPointerException          {@inheritDoc}
    634      * @throws IllegalArgumentException      {@inheritDoc}
    635      */
    636     public int drainTo(Collection<? super E> c, int maxElements) {
    637         Objects.requireNonNull(c);
    638         if (c == this)
    639             throw new IllegalArgumentException();
    640         if (maxElements <= 0)
    641             return 0;
    642         final Object[] items = this.items;
    643         final ReentrantLock lock = this.lock;
    644         lock.lock();
    645         try {
    646             int n = Math.min(maxElements, count);
    647             int take = takeIndex;
    648             int i = 0;
    649             try {
    650                 while (i < n) {
    651                     @SuppressWarnings("unchecked")
    652                     E x = (E) items[take];
    653                     c.add(x);
    654                     items[take] = null;
    655                     if (++take == items.length) take = 0;
    656                     i++;
    657                 }
    658                 return n;
    659             } finally {
    660                 // Restore invariants even if c.add() threw
    661                 if (i > 0) {
    662                     count -= i;
    663                     takeIndex = take;
    664                     if (itrs != null) {
    665                         if (count == 0)
    666                             itrs.queueIsEmpty();
    667                         else if (i > take)
    668                             itrs.takeIndexWrapped();
    669                     }
    670                     for (; i > 0 && lock.hasWaiters(notFull); i--)
    671                         notFull.signal();
    672                 }
    673             }
    674         } finally {
    675             lock.unlock();
    676         }
    677     }
    678 
    679     /**
    680      * Returns an iterator over the elements in this queue in proper sequence.
    681      * The elements will be returned in order from first (head) to last (tail).
    682      *
    683      * <p>The returned iterator is
    684      * <a href="package-summary.html#Weakly"><i>weakly consistent</i></a>.
    685      *
    686      * @return an iterator over the elements in this queue in proper sequence
    687      */
    688     public Iterator<E> iterator() {
    689         return new Itr();
    690     }
    691 
    692     /**
    693      * Shared data between iterators and their queue, allowing queue
    694      * modifications to update iterators when elements are removed.
    695      *
    696      * This adds a lot of complexity for the sake of correctly
    697      * handling some uncommon operations, but the combination of
    698      * circular-arrays and supporting interior removes (i.e., those
    699      * not at head) would cause iterators to sometimes lose their
    700      * places and/or (re)report elements they shouldn't.  To avoid
    701      * this, when a queue has one or more iterators, it keeps iterator
    702      * state consistent by:
    703      *
    704      * (1) keeping track of the number of "cycles", that is, the
    705      *     number of times takeIndex has wrapped around to 0.
    706      * (2) notifying all iterators via the callback removedAt whenever
    707      *     an interior element is removed (and thus other elements may
    708      *     be shifted).
    709      *
    710      * These suffice to eliminate iterator inconsistencies, but
    711      * unfortunately add the secondary responsibility of maintaining
    712      * the list of iterators.  We track all active iterators in a
    713      * simple linked list (accessed only when the queue's lock is
    714      * held) of weak references to Itr.  The list is cleaned up using
    715      * 3 different mechanisms:
    716      *
    717      * (1) Whenever a new iterator is created, do some O(1) checking for
    718      *     stale list elements.
    719      *
    720      * (2) Whenever takeIndex wraps around to 0, check for iterators
    721      *     that have been unused for more than one wrap-around cycle.
    722      *
    723      * (3) Whenever the queue becomes empty, all iterators are notified
    724      *     and this entire data structure is discarded.
    725      *
    726      * So in addition to the removedAt callback that is necessary for
    727      * correctness, iterators have the shutdown and takeIndexWrapped
    728      * callbacks that help remove stale iterators from the list.
    729      *
    730      * Whenever a list element is examined, it is expunged if either
    731      * the GC has determined that the iterator is discarded, or if the
    732      * iterator reports that it is "detached" (does not need any
    733      * further state updates).  Overhead is maximal when takeIndex
    734      * never advances, iterators are discarded before they are
    735      * exhausted, and all removals are interior removes, in which case
    736      * all stale iterators are discovered by the GC.  But even in this
    737      * case we don't increase the amortized complexity.
    738      *
    739      * Care must be taken to keep list sweeping methods from
    740      * reentrantly invoking another such method, causing subtle
    741      * corruption bugs.
    742      */
    743     class Itrs {
    744 
    745         /**
    746          * Node in a linked list of weak iterator references.
    747          */
    748         private class Node extends WeakReference<Itr> {
    749             Node next;
    750 
    751             Node(Itr iterator, Node next) {
    752                 super(iterator);
    753                 this.next = next;
    754             }
    755         }
    756 
    757         /** Incremented whenever takeIndex wraps around to 0 */
    758         int cycles;
    759 
    760         /** Linked list of weak iterator references */
    761         private Node head;
    762 
    763         /** Used to expunge stale iterators */
    764         private Node sweeper;
    765 
    766         private static final int SHORT_SWEEP_PROBES = 4;
    767         private static final int LONG_SWEEP_PROBES = 16;
    768 
    769         Itrs(Itr initial) {
    770             register(initial);
    771         }
    772 
    773         /**
    774          * Sweeps itrs, looking for and expunging stale iterators.
    775          * If at least one was found, tries harder to find more.
    776          * Called only from iterating thread.
    777          *
    778          * @param tryHarder whether to start in try-harder mode, because
    779          * there is known to be at least one iterator to collect
    780          */
    781         void doSomeSweeping(boolean tryHarder) {
    782             // assert lock.getHoldCount() == 1;
    783             // assert head != null;
    784             int probes = tryHarder ? LONG_SWEEP_PROBES : SHORT_SWEEP_PROBES;
    785             Node o, p;
    786             final Node sweeper = this.sweeper;
    787             boolean passedGo;   // to limit search to one full sweep
    788 
    789             if (sweeper == null) {
    790                 o = null;
    791                 p = head;
    792                 passedGo = true;
    793             } else {
    794                 o = sweeper;
    795                 p = o.next;
    796                 passedGo = false;
    797             }
    798 
    799             for (; probes > 0; probes--) {
    800                 if (p == null) {
    801                     if (passedGo)
    802                         break;
    803                     o = null;
    804                     p = head;
    805                     passedGo = true;
    806                 }
    807                 final Itr it = p.get();
    808                 final Node next = p.next;
    809                 if (it == null || it.isDetached()) {
    810                     // found a discarded/exhausted iterator
    811                     probes = LONG_SWEEP_PROBES; // "try harder"
    812                     // unlink p
    813                     p.clear();
    814                     p.next = null;
    815                     if (o == null) {
    816                         head = next;
    817                         if (next == null) {
    818                             // We've run out of iterators to track; retire
    819                             itrs = null;
    820                             return;
    821                         }
    822                     }
    823                     else
    824                         o.next = next;
    825                 } else {
    826                     o = p;
    827                 }
    828                 p = next;
    829             }
    830 
    831             this.sweeper = (p == null) ? null : o;
    832         }
    833 
    834         /**
    835          * Adds a new iterator to the linked list of tracked iterators.
    836          */
    837         void register(Itr itr) {
    838             // assert lock.getHoldCount() == 1;
    839             head = new Node(itr, head);
    840         }
    841 
    842         /**
    843          * Called whenever takeIndex wraps around to 0.
    844          *
    845          * Notifies all iterators, and expunges any that are now stale.
    846          */
    847         void takeIndexWrapped() {
    848             // assert lock.getHoldCount() == 1;
    849             cycles++;
    850             for (Node o = null, p = head; p != null;) {
    851                 final Itr it = p.get();
    852                 final Node next = p.next;
    853                 if (it == null || it.takeIndexWrapped()) {
    854                     // unlink p
    855                     // assert it == null || it.isDetached();
    856                     p.clear();
    857                     p.next = null;
    858                     if (o == null)
    859                         head = next;
    860                     else
    861                         o.next = next;
    862                 } else {
    863                     o = p;
    864                 }
    865                 p = next;
    866             }
    867             if (head == null)   // no more iterators to track
    868                 itrs = null;
    869         }
    870 
    871         /**
    872          * Called whenever an interior remove (not at takeIndex) occurred.
    873          *
    874          * Notifies all iterators, and expunges any that are now stale.
    875          */
    876         void removedAt(int removedIndex) {
    877             for (Node o = null, p = head; p != null;) {
    878                 final Itr it = p.get();
    879                 final Node next = p.next;
    880                 if (it == null || it.removedAt(removedIndex)) {
    881                     // unlink p
    882                     // assert it == null || it.isDetached();
    883                     p.clear();
    884                     p.next = null;
    885                     if (o == null)
    886                         head = next;
    887                     else
    888                         o.next = next;
    889                 } else {
    890                     o = p;
    891                 }
    892                 p = next;
    893             }
    894             if (head == null)   // no more iterators to track
    895                 itrs = null;
    896         }
    897 
    898         /**
    899          * Called whenever the queue becomes empty.
    900          *
    901          * Notifies all active iterators that the queue is empty,
    902          * clears all weak refs, and unlinks the itrs datastructure.
    903          */
    904         void queueIsEmpty() {
    905             // assert lock.getHoldCount() == 1;
    906             for (Node p = head; p != null; p = p.next) {
    907                 Itr it = p.get();
    908                 if (it != null) {
    909                     p.clear();
    910                     it.shutdown();
    911                 }
    912             }
    913             head = null;
    914             itrs = null;
    915         }
    916 
    917         /**
    918          * Called whenever an element has been dequeued (at takeIndex).
    919          */
    920         void elementDequeued() {
    921             // assert lock.getHoldCount() == 1;
    922             if (count == 0)
    923                 queueIsEmpty();
    924             else if (takeIndex == 0)
    925                 takeIndexWrapped();
    926         }
    927     }
    928 
    929     /**
    930      * Iterator for ArrayBlockingQueue.
    931      *
    932      * To maintain weak consistency with respect to puts and takes, we
    933      * read ahead one slot, so as to not report hasNext true but then
    934      * not have an element to return.
    935      *
    936      * We switch into "detached" mode (allowing prompt unlinking from
    937      * itrs without help from the GC) when all indices are negative, or
    938      * when hasNext returns false for the first time.  This allows the
    939      * iterator to track concurrent updates completely accurately,
    940      * except for the corner case of the user calling Iterator.remove()
    941      * after hasNext() returned false.  Even in this case, we ensure
    942      * that we don't remove the wrong element by keeping track of the
    943      * expected element to remove, in lastItem.  Yes, we may fail to
    944      * remove lastItem from the queue if it moved due to an interleaved
    945      * interior remove while in detached mode.
    946      */
    947     private class Itr implements Iterator<E> {
    948         /** Index to look for new nextItem; NONE at end */
    949         private int cursor;
    950 
    951         /** Element to be returned by next call to next(); null if none */
    952         private E nextItem;
    953 
    954         /** Index of nextItem; NONE if none, REMOVED if removed elsewhere */
    955         private int nextIndex;
    956 
    957         /** Last element returned; null if none or not detached. */
    958         private E lastItem;
    959 
    960         /** Index of lastItem, NONE if none, REMOVED if removed elsewhere */
    961         private int lastRet;
    962 
    963         /** Previous value of takeIndex, or DETACHED when detached */
    964         private int prevTakeIndex;
    965 
    966         /** Previous value of iters.cycles */
    967         private int prevCycles;
    968 
    969         /** Special index value indicating "not available" or "undefined" */
    970         private static final int NONE = -1;
    971 
    972         /**
    973          * Special index value indicating "removed elsewhere", that is,
    974          * removed by some operation other than a call to this.remove().
    975          */
    976         private static final int REMOVED = -2;
    977 
    978         /** Special value for prevTakeIndex indicating "detached mode" */
    979         private static final int DETACHED = -3;
    980 
    981         Itr() {
    982             // assert lock.getHoldCount() == 0;
    983             lastRet = NONE;
    984             final ReentrantLock lock = ArrayBlockingQueue.this.lock;
    985             lock.lock();
    986             try {
    987                 if (count == 0) {
    988                     // assert itrs == null;
    989                     cursor = NONE;
    990                     nextIndex = NONE;
    991                     prevTakeIndex = DETACHED;
    992                 } else {
    993                     final int takeIndex = ArrayBlockingQueue.this.takeIndex;
    994                     prevTakeIndex = takeIndex;
    995                     nextItem = itemAt(nextIndex = takeIndex);
    996                     cursor = incCursor(takeIndex);
    997                     if (itrs == null) {
    998                         itrs = new Itrs(this);
    999                     } else {
   1000                         itrs.register(this); // in this order
   1001                         itrs.doSomeSweeping(false);
   1002                     }
   1003                     prevCycles = itrs.cycles;
   1004                     // assert takeIndex >= 0;
   1005                     // assert prevTakeIndex == takeIndex;
   1006                     // assert nextIndex >= 0;
   1007                     // assert nextItem != null;
   1008                 }
   1009             } finally {
   1010                 lock.unlock();
   1011             }
   1012         }
   1013 
   1014         boolean isDetached() {
   1015             // assert lock.getHoldCount() == 1;
   1016             return prevTakeIndex < 0;
   1017         }
   1018 
   1019         private int incCursor(int index) {
   1020             // assert lock.getHoldCount() == 1;
   1021             if (++index == items.length) index = 0;
   1022             if (index == putIndex) index = NONE;
   1023             return index;
   1024         }
   1025 
   1026         /**
   1027          * Returns true if index is invalidated by the given number of
   1028          * dequeues, starting from prevTakeIndex.
   1029          */
   1030         private boolean invalidated(int index, int prevTakeIndex,
   1031                                     long dequeues, int length) {
   1032             if (index < 0)
   1033                 return false;
   1034             int distance = index - prevTakeIndex;
   1035             if (distance < 0)
   1036                 distance += length;
   1037             return dequeues > distance;
   1038         }
   1039 
   1040         /**
   1041          * Adjusts indices to incorporate all dequeues since the last
   1042          * operation on this iterator.  Call only from iterating thread.
   1043          */
   1044         private void incorporateDequeues() {
   1045             // assert lock.getHoldCount() == 1;
   1046             // assert itrs != null;
   1047             // assert !isDetached();
   1048             // assert count > 0;
   1049 
   1050             final int cycles = itrs.cycles;
   1051             final int takeIndex = ArrayBlockingQueue.this.takeIndex;
   1052             final int prevCycles = this.prevCycles;
   1053             final int prevTakeIndex = this.prevTakeIndex;
   1054 
   1055             if (cycles != prevCycles || takeIndex != prevTakeIndex) {
   1056                 final int len = items.length;
   1057                 // how far takeIndex has advanced since the previous
   1058                 // operation of this iterator
   1059                 long dequeues = (cycles - prevCycles) * len
   1060                     + (takeIndex - prevTakeIndex);
   1061 
   1062                 // Check indices for invalidation
   1063                 if (invalidated(lastRet, prevTakeIndex, dequeues, len))
   1064                     lastRet = REMOVED;
   1065                 if (invalidated(nextIndex, prevTakeIndex, dequeues, len))
   1066                     nextIndex = REMOVED;
   1067                 if (invalidated(cursor, prevTakeIndex, dequeues, len))
   1068                     cursor = takeIndex;
   1069 
   1070                 if (cursor < 0 && nextIndex < 0 && lastRet < 0)
   1071                     detach();
   1072                 else {
   1073                     this.prevCycles = cycles;
   1074                     this.prevTakeIndex = takeIndex;
   1075                 }
   1076             }
   1077         }
   1078 
   1079         /**
   1080          * Called when itrs should stop tracking this iterator, either
   1081          * because there are no more indices to update (cursor < 0 &&
   1082          * nextIndex < 0 && lastRet < 0) or as a special exception, when
   1083          * lastRet >= 0, because hasNext() is about to return false for the
   1084          * first time.  Call only from iterating thread.
   1085          */
   1086         private void detach() {
   1087             // Switch to detached mode
   1088             // assert lock.getHoldCount() == 1;
   1089             // assert cursor == NONE;
   1090             // assert nextIndex < 0;
   1091             // assert lastRet < 0 || nextItem == null;
   1092             // assert lastRet < 0 ^ lastItem != null;
   1093             if (prevTakeIndex >= 0) {
   1094                 // assert itrs != null;
   1095                 prevTakeIndex = DETACHED;
   1096                 // try to unlink from itrs (but not too hard)
   1097                 itrs.doSomeSweeping(true);
   1098             }
   1099         }
   1100 
   1101         /**
   1102          * For performance reasons, we would like not to acquire a lock in
   1103          * hasNext in the common case.  To allow for this, we only access
   1104          * fields (i.e. nextItem) that are not modified by update operations
   1105          * triggered by queue modifications.
   1106          */
   1107         public boolean hasNext() {
   1108             // assert lock.getHoldCount() == 0;
   1109             if (nextItem != null)
   1110                 return true;
   1111             noNext();
   1112             return false;
   1113         }
   1114 
   1115         private void noNext() {
   1116             final ReentrantLock lock = ArrayBlockingQueue.this.lock;
   1117             lock.lock();
   1118             try {
   1119                 // assert cursor == NONE;
   1120                 // assert nextIndex == NONE;
   1121                 if (!isDetached()) {
   1122                     // assert lastRet >= 0;
   1123                     incorporateDequeues(); // might update lastRet
   1124                     if (lastRet >= 0) {
   1125                         lastItem = itemAt(lastRet);
   1126                         // assert lastItem != null;
   1127                         detach();
   1128                     }
   1129                 }
   1130                 // assert isDetached();
   1131                 // assert lastRet < 0 ^ lastItem != null;
   1132             } finally {
   1133                 lock.unlock();
   1134             }
   1135         }
   1136 
   1137         public E next() {
   1138             // assert lock.getHoldCount() == 0;
   1139             final E x = nextItem;
   1140             if (x == null)
   1141                 throw new NoSuchElementException();
   1142             final ReentrantLock lock = ArrayBlockingQueue.this.lock;
   1143             lock.lock();
   1144             try {
   1145                 if (!isDetached())
   1146                     incorporateDequeues();
   1147                 // assert nextIndex != NONE;
   1148                 // assert lastItem == null;
   1149                 lastRet = nextIndex;
   1150                 final int cursor = this.cursor;
   1151                 if (cursor >= 0) {
   1152                     nextItem = itemAt(nextIndex = cursor);
   1153                     // assert nextItem != null;
   1154                     this.cursor = incCursor(cursor);
   1155                 } else {
   1156                     nextIndex = NONE;
   1157                     nextItem = null;
   1158                 }
   1159             } finally {
   1160                 lock.unlock();
   1161             }
   1162             return x;
   1163         }
   1164 
   1165         public void remove() {
   1166             // assert lock.getHoldCount() == 0;
   1167             final ReentrantLock lock = ArrayBlockingQueue.this.lock;
   1168             lock.lock();
   1169             try {
   1170                 if (!isDetached())
   1171                     incorporateDequeues(); // might update lastRet or detach
   1172                 final int lastRet = this.lastRet;
   1173                 this.lastRet = NONE;
   1174                 if (lastRet >= 0) {
   1175                     if (!isDetached())
   1176                         removeAt(lastRet);
   1177                     else {
   1178                         final E lastItem = this.lastItem;
   1179                         // assert lastItem != null;
   1180                         this.lastItem = null;
   1181                         if (itemAt(lastRet) == lastItem)
   1182                             removeAt(lastRet);
   1183                     }
   1184                 } else if (lastRet == NONE)
   1185                     throw new IllegalStateException();
   1186                 // else lastRet == REMOVED and the last returned element was
   1187                 // previously asynchronously removed via an operation other
   1188                 // than this.remove(), so nothing to do.
   1189 
   1190                 if (cursor < 0 && nextIndex < 0)
   1191                     detach();
   1192             } finally {
   1193                 lock.unlock();
   1194                 // assert lastRet == NONE;
   1195                 // assert lastItem == null;
   1196             }
   1197         }
   1198 
   1199         /**
   1200          * Called to notify the iterator that the queue is empty, or that it
   1201          * has fallen hopelessly behind, so that it should abandon any
   1202          * further iteration, except possibly to return one more element
   1203          * from next(), as promised by returning true from hasNext().
   1204          */
   1205         void shutdown() {
   1206             // assert lock.getHoldCount() == 1;
   1207             cursor = NONE;
   1208             if (nextIndex >= 0)
   1209                 nextIndex = REMOVED;
   1210             if (lastRet >= 0) {
   1211                 lastRet = REMOVED;
   1212                 lastItem = null;
   1213             }
   1214             prevTakeIndex = DETACHED;
   1215             // Don't set nextItem to null because we must continue to be
   1216             // able to return it on next().
   1217             //
   1218             // Caller will unlink from itrs when convenient.
   1219         }
   1220 
   1221         private int distance(int index, int prevTakeIndex, int length) {
   1222             int distance = index - prevTakeIndex;
   1223             if (distance < 0)
   1224                 distance += length;
   1225             return distance;
   1226         }
   1227 
   1228         /**
   1229          * Called whenever an interior remove (not at takeIndex) occurred.
   1230          *
   1231          * @return true if this iterator should be unlinked from itrs
   1232          */
   1233         boolean removedAt(int removedIndex) {
   1234             // assert lock.getHoldCount() == 1;
   1235             if (isDetached())
   1236                 return true;
   1237 
   1238             final int takeIndex = ArrayBlockingQueue.this.takeIndex;
   1239             final int prevTakeIndex = this.prevTakeIndex;
   1240             final int len = items.length;
   1241             // distance from prevTakeIndex to removedIndex
   1242             final int removedDistance =
   1243                 len * (itrs.cycles - this.prevCycles
   1244                        + ((removedIndex < takeIndex) ? 1 : 0))
   1245                 + (removedIndex - prevTakeIndex);
   1246             // assert itrs.cycles - this.prevCycles >= 0;
   1247             // assert itrs.cycles - this.prevCycles <= 1;
   1248             // assert removedDistance > 0;
   1249             // assert removedIndex != takeIndex;
   1250             int cursor = this.cursor;
   1251             if (cursor >= 0) {
   1252                 int x = distance(cursor, prevTakeIndex, len);
   1253                 if (x == removedDistance) {
   1254                     if (cursor == putIndex)
   1255                         this.cursor = cursor = NONE;
   1256                 }
   1257                 else if (x > removedDistance) {
   1258                     // assert cursor != prevTakeIndex;
   1259                     this.cursor = cursor = dec(cursor);
   1260                 }
   1261             }
   1262             int lastRet = this.lastRet;
   1263             if (lastRet >= 0) {
   1264                 int x = distance(lastRet, prevTakeIndex, len);
   1265                 if (x == removedDistance)
   1266                     this.lastRet = lastRet = REMOVED;
   1267                 else if (x > removedDistance)
   1268                     this.lastRet = lastRet = dec(lastRet);
   1269             }
   1270             int nextIndex = this.nextIndex;
   1271             if (nextIndex >= 0) {
   1272                 int x = distance(nextIndex, prevTakeIndex, len);
   1273                 if (x == removedDistance)
   1274                     this.nextIndex = nextIndex = REMOVED;
   1275                 else if (x > removedDistance)
   1276                     this.nextIndex = nextIndex = dec(nextIndex);
   1277             }
   1278             if (cursor < 0 && nextIndex < 0 && lastRet < 0) {
   1279                 this.prevTakeIndex = DETACHED;
   1280                 return true;
   1281             }
   1282             return false;
   1283         }
   1284 
   1285         /**
   1286          * Called whenever takeIndex wraps around to zero.
   1287          *
   1288          * @return true if this iterator should be unlinked from itrs
   1289          */
   1290         boolean takeIndexWrapped() {
   1291             // assert lock.getHoldCount() == 1;
   1292             if (isDetached())
   1293                 return true;
   1294             if (itrs.cycles - prevCycles > 1) {
   1295                 // All the elements that existed at the time of the last
   1296                 // operation are gone, so abandon further iteration.
   1297                 shutdown();
   1298                 return true;
   1299             }
   1300             return false;
   1301         }
   1302 
   1303 //         /** Uncomment for debugging. */
   1304 //         public String toString() {
   1305 //             return ("cursor=" + cursor + " " +
   1306 //                     "nextIndex=" + nextIndex + " " +
   1307 //                     "lastRet=" + lastRet + " " +
   1308 //                     "nextItem=" + nextItem + " " +
   1309 //                     "lastItem=" + lastItem + " " +
   1310 //                     "prevCycles=" + prevCycles + " " +
   1311 //                     "prevTakeIndex=" + prevTakeIndex + " " +
   1312 //                     "size()=" + size() + " " +
   1313 //                     "remainingCapacity()=" + remainingCapacity());
   1314 //         }
   1315     }
   1316 
   1317     /**
   1318      * Returns a {@link Spliterator} over the elements in this queue.
   1319      *
   1320      * <p>The returned spliterator is
   1321      * <a href="package-summary.html#Weakly"><i>weakly consistent</i></a>.
   1322      *
   1323      * <p>The {@code Spliterator} reports {@link Spliterator#CONCURRENT},
   1324      * {@link Spliterator#ORDERED}, and {@link Spliterator#NONNULL}.
   1325      *
   1326      * @implNote
   1327      * The {@code Spliterator} implements {@code trySplit} to permit limited
   1328      * parallelism.
   1329      *
   1330      * @return a {@code Spliterator} over the elements in this queue
   1331      * @since 1.8
   1332      */
   1333     public Spliterator<E> spliterator() {
   1334         return Spliterators.spliterator
   1335             (this, (Spliterator.ORDERED |
   1336                     Spliterator.NONNULL |
   1337                     Spliterator.CONCURRENT));
   1338     }
   1339 
   1340 }
   1341