Home | History | Annotate | Download | only in concurrent
      1 /*
      2  * Copyright (C) 2010 The Guava Authors
      3  *
      4  * Licensed under the Apache License, Version 2.0 (the "License");
      5  * you may not use this file except in compliance with the License.
      6  * You may obtain a copy of the License at
      7  *
      8  * http://www.apache.org/licenses/LICENSE-2.0
      9  *
     10  * Unless required by applicable law or agreed to in writing, software
     11  * distributed under the License is distributed on an "AS IS" BASIS,
     12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     13  * See the License for the specific language governing permissions and
     14  * limitations under the License.
     15  */
     16 
     17 package com.google.common.util.concurrent;
     18 
     19 import com.google.common.collect.ObjectArrays;
     20 
     21 import java.util.AbstractQueue;
     22 import java.util.Collection;
     23 import java.util.ConcurrentModificationException;
     24 import java.util.Iterator;
     25 import java.util.NoSuchElementException;
     26 import java.util.concurrent.BlockingQueue;
     27 import java.util.concurrent.TimeUnit;
     28 
     29 import javax.annotation.Nullable;
     30 
     31 /**
     32  * A bounded {@linkplain BlockingQueue blocking queue} backed by an
     33  * array.  This queue orders elements FIFO (first-in-first-out).  The
     34  * <em>head</em> of the queue is that element that has been on the
     35  * queue the longest time.  The <em>tail</em> of the queue is that
     36  * element that has been on the queue the shortest time. New elements
     37  * are inserted at the tail of the queue, and the queue retrieval
     38  * operations obtain elements at the head of the queue.
     39  *
     40  * <p>This is a classic &quot;bounded buffer&quot;, in which a
     41  * fixed-sized array holds elements inserted by producers and
     42  * extracted by consumers.  Once created, the capacity cannot be
     43  * increased.  Attempts to <tt>put</tt> an element into a full queue
     44  * will result in the operation blocking; attempts to <tt>take</tt> an
     45  * element from an empty queue will similarly block.
     46  *
     47  * <p> This class supports an optional fairness policy for ordering
     48  * waiting producer and consumer threads.  By default, this ordering
     49  * is not guaranteed. However, a queue constructed with fairness set
     50  * to <tt>true</tt> grants threads access in FIFO order. Fairness
     51  * generally decreases throughput but reduces variability and avoids
     52  * starvation.
     53  *
     54  * <p>This class and its iterator implement all of the
     55  * <em>optional</em> methods of the {@link Collection} and {@link
     56  * Iterator} interfaces.
     57  *
     58  * @author Doug Lea
     59  * @author Justin T. Sampson
     60  * @param <E> the type of elements held in this collection
     61  */
     62 public class MonitorBasedArrayBlockingQueue<E> extends AbstractQueue<E>
     63         implements BlockingQueue<E> {
     64 
     65     // Based on revision 1.58 of ArrayBlockingQueue by Doug Lea, from
     66     // http://gee.cs.oswego.edu/cgi-bin/viewcvs.cgi/jsr166/src/main/java/util/concurrent/
     67 
     68     /** The queued items  */
     69     final E[] items;
     70     /** items index for next take, poll or remove */
     71     int takeIndex;
     72     /** items index for next put, offer, or add. */
     73     int putIndex;
     74     /** Number of items in the queue */
     75     private int count;
     76 
     77     /*
     78      * Concurrency control uses the classic two-condition algorithm
     79      * found in any textbook.
     80      */
     81 
     82     /** Monitor guarding all access */
     83     final Monitor monitor;
     84 
     85     /** Guard for waiting takes */
     86     private final Monitor.Guard notEmpty;
     87 
     88     /** Guard for waiting puts */
     89     private final Monitor.Guard notFull;
     90 
     91     // Internal helper methods
     92 
     93     /**
     94      * Circularly increment i.
     95      */
     96     final int inc(int i) {
     97         return (++i == items.length) ? 0 : i;
     98     }
     99 
    100     /**
    101      * Inserts element at current put position, advances, and signals.
    102      * Call only when occupying monitor.
    103      */
    104     private void insert(E x) {
    105         items[putIndex] = x;
    106         putIndex = inc(putIndex);
    107         ++count;
    108     }
    109 
    110     /**
    111      * Extracts element at current take position, advances, and signals.
    112      * Call only when occupying monitor.
    113      */
    114     private E extract() {
    115         final E[] items = this.items;
    116         E x = items[takeIndex];
    117         items[takeIndex] = null;
    118         takeIndex = inc(takeIndex);
    119         --count;
    120         return x;
    121     }
    122 
    123     /**
    124      * Utility for remove and iterator.remove: Delete item at position i.
    125      * Call only when occupying monitor.
    126      */
    127     void removeAt(int i) {
    128         final E[] items = this.items;
    129         // if removing front item, just advance
    130         if (i == takeIndex) {
    131             items[takeIndex] = null;
    132             takeIndex = inc(takeIndex);
    133         } else {
    134             // slide over all others up through putIndex.
    135             for (;;) {
    136                 int nexti = inc(i);
    137                 if (nexti != putIndex) {
    138                     items[i] = items[nexti];
    139                     i = nexti;
    140                 } else {
    141                     items[i] = null;
    142                     putIndex = i;
    143                     break;
    144                 }
    145             }
    146         }
    147         --count;
    148     }
    149 
    150     /**
    151      * Creates an <tt>MonitorBasedArrayBlockingQueue</tt> with the given (fixed)
    152      * capacity and default access policy.
    153      *
    154      * @param capacity the capacity of this queue
    155      * @throws IllegalArgumentException if <tt>capacity</tt> is less than 1
    156      */
    157     public MonitorBasedArrayBlockingQueue(int capacity) {
    158         this(capacity, false);
    159     }
    160 
    161     /**
    162      * Creates an <tt>MonitorBasedArrayBlockingQueue</tt> with the given (fixed)
    163      * capacity and the specified access policy.
    164      *
    165      * @param capacity the capacity of this queue
    166      * @param fair if <tt>true</tt> then queue accesses for threads blocked
    167      *        on insertion or removal, are processed in FIFO order;
    168      *        if <tt>false</tt> the access order is unspecified.
    169      * @throws IllegalArgumentException if <tt>capacity</tt> is less than 1
    170      */
    171     public MonitorBasedArrayBlockingQueue(int capacity, boolean fair) {
    172         if (capacity <= 0)
    173             throw new IllegalArgumentException();
    174         this.items = newEArray(capacity);
    175         monitor = new Monitor(fair);
    176         notEmpty = new Monitor.Guard(monitor) {
    177             @Override public boolean isSatisfied() {
    178                 return count > 0;
    179             }
    180         };
    181         notFull = new Monitor.Guard(monitor) {
    182             @Override public boolean isSatisfied() {
    183                 return count < items.length;
    184             }
    185         };
    186     }
    187 
    188     @SuppressWarnings("unchecked") // please don't try this home, kids
    189     private static <E> E[] newEArray(int capacity) {
    190         return (E[]) new Object[capacity];
    191     }
    192 
    193     /**
    194      * Creates an <tt>MonitorBasedArrayBlockingQueue</tt> with the given (fixed)
    195      * capacity, the specified access policy and initially containing the
    196      * elements of the given collection,
    197      * added in traversal order of the collection's iterator.
    198      *
    199      * @param capacity the capacity of this queue
    200      * @param fair if <tt>true</tt> then queue accesses for threads blocked
    201      *        on insertion or removal, are processed in FIFO order;
    202      *        if <tt>false</tt> the access order is unspecified.
    203      * @param c the collection of elements to initially contain
    204      * @throws IllegalArgumentException if <tt>capacity</tt> is less than
    205      *         <tt>c.size()</tt>, or less than 1.
    206      * @throws NullPointerException if the specified collection or any
    207      *         of its elements are null
    208      */
    209     public MonitorBasedArrayBlockingQueue(int capacity, boolean fair,
    210                               Collection<? extends E> c) {
    211         this(capacity, fair);
    212         if (capacity < c.size())
    213             throw new IllegalArgumentException();
    214 
    215         for (E e : c)
    216             add(e);
    217     }
    218 
    219     /**
    220      * Inserts the specified element at the tail of this queue if it is
    221      * possible to do so immediately without exceeding the queue's capacity,
    222      * returning <tt>true</tt> upon success and throwing an
    223      * <tt>IllegalStateException</tt> if this queue is full.
    224      *
    225      * @param e the element to add
    226      * @return <tt>true</tt> (as specified by {@link Collection#add})
    227      * @throws IllegalStateException if this queue is full
    228      * @throws NullPointerException if the specified element is null
    229      */
    230     @Override public boolean add(E e) {
    231         return super.add(e);
    232     }
    233 
    234     /**
    235      * Inserts the specified element at the tail of this queue if it is
    236      * possible to do so immediately without exceeding the queue's capacity,
    237      * returning <tt>true</tt> upon success and <tt>false</tt> if this queue
    238      * is full.  This method is generally preferable to method {@link #add},
    239      * which can fail to insert an element only by throwing an exception.
    240      *
    241      * @throws NullPointerException if the specified element is null
    242      */
    243     @Override
    244     public boolean offer(E e) {
    245         if (e == null) throw new NullPointerException();
    246         final Monitor monitor = this.monitor;
    247         if (monitor.enterIf(notFull)) {
    248             try {
    249                 insert(e);
    250                 return true;
    251             } finally {
    252                 monitor.leave();
    253             }
    254         } else {
    255           return false;
    256         }
    257     }
    258 
    259     /**
    260      * Inserts the specified element at the tail of this queue, waiting
    261      * for space to become available if the queue is full.
    262      *
    263      * @throws InterruptedException {@inheritDoc}
    264      * @throws NullPointerException {@inheritDoc}
    265      */
    266     @Override
    267     public void put(E e) throws InterruptedException {
    268         if (e == null) throw new NullPointerException();
    269         final Monitor monitor = this.monitor;
    270         monitor.enterWhen(notFull);
    271         try {
    272             insert(e);
    273         } finally {
    274             monitor.leave();
    275         }
    276     }
    277 
    278     /**
    279      * Inserts the specified element at the tail of this queue, waiting
    280      * up to the specified wait time for space to become available if
    281      * the queue is full.
    282      *
    283      * @throws InterruptedException {@inheritDoc}
    284      * @throws NullPointerException {@inheritDoc}
    285      */
    286     @Override
    287     public boolean offer(E e, long timeout, TimeUnit unit)
    288         throws InterruptedException {
    289 
    290         if (e == null) throw new NullPointerException();
    291         final Monitor monitor = this.monitor;
    292         if (monitor.enterWhen(notFull, timeout, unit)) {
    293             try {
    294                 insert(e);
    295                 return true;
    296             } finally {
    297                 monitor.leave();
    298             }
    299         } else {
    300           return false;
    301         }
    302     }
    303 
    304     @Override
    305     public E poll() {
    306         final Monitor monitor = this.monitor;
    307         if (monitor.enterIf(notEmpty)) {
    308             try {
    309                 return extract();
    310             } finally {
    311                 monitor.leave();
    312             }
    313         } else {
    314           return null;
    315         }
    316     }
    317 
    318     @Override
    319     public E take() throws InterruptedException {
    320         final Monitor monitor = this.monitor;
    321         monitor.enterWhen(notEmpty);
    322         try {
    323             return extract();
    324         } finally {
    325             monitor.leave();
    326         }
    327     }
    328 
    329     @Override
    330     public E poll(long timeout, TimeUnit unit) throws InterruptedException {
    331         final Monitor monitor = this.monitor;
    332         if (monitor.enterWhen(notEmpty, timeout, unit)) {
    333             try {
    334                 return extract();
    335             } finally {
    336                 monitor.leave();
    337             }
    338         } else {
    339           return null;
    340         }
    341     }
    342 
    343     @Override
    344     public E peek() {
    345         final Monitor monitor = this.monitor;
    346         if (monitor.enterIf(notEmpty)) {
    347             try {
    348                 return items[takeIndex];
    349             } finally {
    350                 monitor.leave();
    351             }
    352         } else {
    353             return null;
    354         }
    355     }
    356 
    357     // this doc comment is overridden to remove the reference to collections
    358     // greater in size than Integer.MAX_VALUE
    359     /**
    360      * Returns the number of elements in this queue.
    361      *
    362      * @return the number of elements in this queue
    363      */
    364     @Override public int size() {
    365         final Monitor monitor = this.monitor;
    366         monitor.enter();
    367         try {
    368             return count;
    369         } finally {
    370             monitor.leave();
    371         }
    372     }
    373 
    374     // this doc comment is a modified copy of the inherited doc comment,
    375     // without the reference to unlimited queues.
    376     /**
    377      * Returns the number of additional elements that this queue can ideally
    378      * (in the absence of memory or resource constraints) accept without
    379      * blocking. This is always equal to the initial capacity of this queue
    380      * less the current <tt>size</tt> of this queue.
    381      *
    382      * <p>Note that you <em>cannot</em> always tell if an attempt to insert
    383      * an element will succeed by inspecting <tt>remainingCapacity</tt>
    384      * because it may be the case that another thread is about to
    385      * insert or remove an element.
    386      */
    387     @Override
    388     public int remainingCapacity() {
    389         final Monitor monitor = this.monitor;
    390         monitor.enter();
    391         try {
    392             return items.length - count;
    393         } finally {
    394             monitor.leave();
    395         }
    396     }
    397 
    398     /**
    399      * Removes a single instance of the specified element from this queue,
    400      * if it is present.  More formally, removes an element <tt>e</tt> such
    401      * that <tt>o.equals(e)</tt>, if this queue contains one or more such
    402      * elements.
    403      * Returns <tt>true</tt> if this queue contained the specified element
    404      * (or equivalently, if this queue changed as a result of the call).
    405      *
    406      * @param o element to be removed from this queue, if present
    407      * @return <tt>true</tt> if this queue changed as a result of the call
    408      */
    409     @Override public boolean remove(@Nullable Object o) {
    410         if (o == null) return false;
    411         final E[] items = this.items;
    412         final Monitor monitor = this.monitor;
    413         monitor.enter();
    414         try {
    415             int i = takeIndex;
    416             int k = 0;
    417             for (;;) {
    418                 if (k++ >= count)
    419                     return false;
    420                 if (o.equals(items[i])) {
    421                     removeAt(i);
    422                     return true;
    423                 }
    424                 i = inc(i);
    425             }
    426         } finally {
    427             monitor.leave();
    428         }
    429     }
    430 
    431     /**
    432      * Returns <tt>true</tt> if this queue contains the specified element.
    433      * More formally, returns <tt>true</tt> if and only if this queue contains
    434      * at least one element <tt>e</tt> such that <tt>o.equals(e)</tt>.
    435      *
    436      * @param o object to be checked for containment in this queue
    437      * @return <tt>true</tt> if this queue contains the specified element
    438      */
    439     @Override public boolean contains(@Nullable Object o) {
    440         if (o == null) return false;
    441         final E[] items = this.items;
    442         final Monitor monitor = this.monitor;
    443         monitor.enter();
    444         try {
    445             int i = takeIndex;
    446             int k = 0;
    447             while (k++ < count) {
    448                 if (o.equals(items[i]))
    449                     return true;
    450                 i = inc(i);
    451             }
    452             return false;
    453         } finally {
    454             monitor.leave();
    455         }
    456     }
    457 
    458     /**
    459      * Returns an array containing all of the elements in this queue, in
    460      * proper sequence.
    461      *
    462      * <p>The returned array will be "safe" in that no references to it are
    463      * maintained by this queue.  (In other words, this method must allocate
    464      * a new array).  The caller is thus free to modify the returned array.
    465      *
    466      * <p>This method acts as bridge between array-based and collection-based
    467      * APIs.
    468      *
    469      * @return an array containing all of the elements in this queue
    470      */
    471     @Override public Object[] toArray() {
    472         final E[] items = this.items;
    473         final Monitor monitor = this.monitor;
    474         monitor.enter();
    475         try {
    476             Object[] a = new Object[count];
    477             int k = 0;
    478             int i = takeIndex;
    479             while (k < count) {
    480                 a[k++] = items[i];
    481                 i = inc(i);
    482             }
    483             return a;
    484         } finally {
    485             monitor.leave();
    486         }
    487     }
    488 
    489     /**
    490      * Returns an array containing all of the elements in this queue, in
    491      * proper sequence; the runtime type of the returned array is that of
    492      * the specified array.  If the queue fits in the specified array, it
    493      * is returned therein.  Otherwise, a new array is allocated with the
    494      * runtime type of the specified array and the size of this queue.
    495      *
    496      * <p>If this queue fits in the specified array with room to spare
    497      * (i.e., the array has more elements than this queue), the element in
    498      * the array immediately following the end of the queue is set to
    499      * <tt>null</tt>.
    500      *
    501      * <p>Like the {@link #toArray()} method, this method acts as bridge between
    502      * array-based and collection-based APIs.  Further, this method allows
    503      * precise control over the runtime type of the output array, and may,
    504      * under certain circumstances, be used to save allocation costs.
    505      *
    506      * <p>Suppose <tt>x</tt> is a queue known to contain only strings.
    507      * The following code can be used to dump the queue into a newly
    508      * allocated array of <tt>String</tt>:
    509      *
    510      * <pre>
    511      *     String[] y = x.toArray(new String[0]);</pre>
    512      *
    513      * <p>Note that <tt>toArray(new Object[0])</tt> is identical in function to
    514      * <tt>toArray()</tt>.
    515      *
    516      * @param a the array into which the elements of the queue are to
    517      *          be stored, if it is big enough; otherwise, a new array of the
    518      *          same runtime type is allocated for this purpose
    519      * @return an array containing all of the elements in this queue
    520      * @throws ArrayStoreException if the runtime type of the specified array
    521      *         is not a supertype of the runtime type of every element in
    522      *         this queue
    523      * @throws NullPointerException if the specified array is null
    524      */
    525     @Override public <T> T[] toArray(T[] a) {
    526         final E[] items = this.items;
    527         final Monitor monitor = this.monitor;
    528         monitor.enter();
    529         try {
    530             if (a.length < count)
    531                 a = ObjectArrays.newArray(a, count);
    532 
    533             int k = 0;
    534             int i = takeIndex;
    535             while (k < count) {
    536                 // This cast is not itself safe, but the following statement
    537                 // will fail if the runtime type of items[i] is not assignable
    538                 // to the runtime type of a[k++], which is all that the method
    539                 // contract requires (see @throws ArrayStoreException above).
    540                 @SuppressWarnings("unchecked")
    541                 T t = (T) items[i];
    542                 a[k++] = t;
    543                 i = inc(i);
    544             }
    545             if (a.length > count)
    546                 a[count] = null;
    547             return a;
    548         } finally {
    549             monitor.leave();
    550         }
    551     }
    552 
    553     @Override public String toString() {
    554         final Monitor monitor = this.monitor;
    555         monitor.enter();
    556         try {
    557             return super.toString();
    558         } finally {
    559             monitor.leave();
    560         }
    561     }
    562 
    563     /**
    564      * Atomically removes all of the elements from this queue.
    565      * The queue will be empty after this call returns.
    566      */
    567     @Override public void clear() {
    568         final E[] items = this.items;
    569         final Monitor monitor = this.monitor;
    570         monitor.enter();
    571         try {
    572             int i = takeIndex;
    573             int k = count;
    574             while (k-- > 0) {
    575                 items[i] = null;
    576                 i = inc(i);
    577             }
    578             count = 0;
    579             putIndex = 0;
    580             takeIndex = 0;
    581         } finally {
    582             monitor.leave();
    583         }
    584     }
    585 
    586     /**
    587      * @throws UnsupportedOperationException {@inheritDoc}
    588      * @throws ClassCastException            {@inheritDoc}
    589      * @throws NullPointerException          {@inheritDoc}
    590      * @throws IllegalArgumentException      {@inheritDoc}
    591      */
    592     @Override
    593     public int drainTo(Collection<? super E> c) {
    594         if (c == null)
    595             throw new NullPointerException();
    596         if (c == this)
    597             throw new IllegalArgumentException();
    598         final E[] items = this.items;
    599         final Monitor monitor = this.monitor;
    600         monitor.enter();
    601         try {
    602             int i = takeIndex;
    603             int n = 0;
    604             int max = count;
    605             while (n < max) {
    606                 c.add(items[i]);
    607                 items[i] = null;
    608                 i = inc(i);
    609                 ++n;
    610             }
    611             if (n > 0) {
    612                 count = 0;
    613                 putIndex = 0;
    614                 takeIndex = 0;
    615             }
    616             return n;
    617         } finally {
    618             monitor.leave();
    619         }
    620     }
    621 
    622     /**
    623      * @throws UnsupportedOperationException {@inheritDoc}
    624      * @throws ClassCastException            {@inheritDoc}
    625      * @throws NullPointerException          {@inheritDoc}
    626      * @throws IllegalArgumentException      {@inheritDoc}
    627      */
    628     @Override
    629     public int drainTo(Collection<? super E> c, int maxElements) {
    630         if (c == null)
    631             throw new NullPointerException();
    632         if (c == this)
    633             throw new IllegalArgumentException();
    634         if (maxElements <= 0)
    635             return 0;
    636         final E[] items = this.items;
    637         final Monitor monitor = this.monitor;
    638         monitor.enter();
    639         try {
    640             int i = takeIndex;
    641             int n = 0;
    642             int max = (maxElements < count) ? maxElements : count;
    643             while (n < max) {
    644                 c.add(items[i]);
    645                 items[i] = null;
    646                 i = inc(i);
    647                 ++n;
    648             }
    649             if (n > 0) {
    650                 count -= n;
    651                 takeIndex = i;
    652             }
    653             return n;
    654         } finally {
    655             monitor.leave();
    656         }
    657     }
    658 
    659     /**
    660      * Returns an iterator over the elements in this queue in proper sequence.
    661      * The returned <tt>Iterator</tt> is a "weakly consistent" iterator that
    662      * will never throw {@link ConcurrentModificationException},
    663      * and guarantees to traverse elements as they existed upon
    664      * construction of the iterator, and may (but is not guaranteed to)
    665      * reflect any modifications subsequent to construction.
    666      *
    667      * @return an iterator over the elements in this queue in proper sequence
    668      */
    669     @Override public Iterator<E> iterator() {
    670         final Monitor monitor = this.monitor;
    671         monitor.enter();
    672         try {
    673             return new Itr();
    674         } finally {
    675             monitor.leave();
    676         }
    677     }
    678 
    679     /**
    680      * Iterator for MonitorBasedArrayBlockingQueue
    681      */
    682     private class Itr implements Iterator<E> {
    683         /**
    684          * Index of element to be returned by next,
    685          * or a negative number if no such.
    686          */
    687         private int nextIndex;
    688 
    689         /**
    690          * nextItem holds on to item fields because once we claim
    691          * that an element exists in hasNext(), we must return it in
    692          * the following next() call even if it was in the process of
    693          * being removed when hasNext() was called.
    694          */
    695         private E nextItem;
    696 
    697         /**
    698          * Index of element returned by most recent call to next.
    699          * Reset to -1 if this element is deleted by a call to remove.
    700          */
    701         private int lastRet;
    702 
    703         Itr() {
    704             lastRet = -1;
    705             if (count == 0)
    706                 nextIndex = -1;
    707             else {
    708                 nextIndex = takeIndex;
    709                 nextItem = items[takeIndex];
    710             }
    711         }
    712 
    713         @Override
    714         public boolean hasNext() {
    715             /*
    716              * No sync. We can return true by mistake here
    717              * only if this iterator passed across threads,
    718              * which we don't support anyway.
    719              */
    720             return nextIndex >= 0;
    721         }
    722 
    723         /**
    724          * Checks whether nextIndex is valid; if so setting nextItem.
    725          * Stops iterator when either hits putIndex or sees null item.
    726          */
    727         private void checkNext() {
    728             if (nextIndex == putIndex) {
    729                 nextIndex = -1;
    730                 nextItem = null;
    731             } else {
    732                 nextItem = items[nextIndex];
    733                 if (nextItem == null)
    734                     nextIndex = -1;
    735             }
    736         }
    737 
    738         @Override
    739         public E next() {
    740             final Monitor monitor = MonitorBasedArrayBlockingQueue.this.monitor;
    741             monitor.enter();
    742             try {
    743                 if (nextIndex < 0)
    744                     throw new NoSuchElementException();
    745                 lastRet = nextIndex;
    746                 E x = nextItem;
    747                 nextIndex = inc(nextIndex);
    748                 checkNext();
    749                 return x;
    750             } finally {
    751                 monitor.leave();
    752             }
    753         }
    754 
    755         @Override
    756         public void remove() {
    757             final Monitor monitor = MonitorBasedArrayBlockingQueue.this.monitor;
    758             monitor.enter();
    759             try {
    760                 int i = lastRet;
    761                 if (i == -1)
    762                     throw new IllegalStateException();
    763                 lastRet = -1;
    764 
    765                 int ti = takeIndex;
    766                 removeAt(i);
    767                 // back up cursor (reset to front if was first element)
    768                 nextIndex = (i == ti) ? takeIndex : i;
    769                 checkNext();
    770             } finally {
    771                 monitor.leave();
    772             }
    773         }
    774     }
    775 }
    776