Home | History | Annotate | Download | only in concurrent
      1 /*
      2  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
      3  *
      4  * This code is free software; you can redistribute it and/or modify it
      5  * under the terms of the GNU General Public License version 2 only, as
      6  * published by the Free Software Foundation.  Oracle designates this
      7  * particular file as subject to the "Classpath" exception as provided
      8  * by Oracle in the LICENSE file that accompanied this code.
      9  *
     10  * This code is distributed in the hope that it will be useful, but WITHOUT
     11  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
     12  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
     13  * version 2 for more details (a copy is included in the LICENSE file that
     14  * accompanied this code).
     15  *
     16  * You should have received a copy of the GNU General Public License version
     17  * 2 along with this work; if not, write to the Free Software Foundation,
     18  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
     19  *
     20  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
     21  * or visit www.oracle.com if you need additional information or have any
     22  * questions.
     23  */
     24 
     25 /*
     26  * This file is available under and governed by the GNU General Public
     27  * License version 2 only, as published by the Free Software Foundation.
     28  * However, the following notice accompanied the original version of this
     29  * file:
     30  *
     31  * Written by Doug Lea with assistance from members of JCP JSR-166
     32  * Expert Group and released to the public domain, as explained at
     33  * http://creativecommons.org/publicdomain/zero/1.0/
     34  */
     35 
     36 package java.util.concurrent;
     37 
     38 import static java.util.concurrent.TimeUnit.NANOSECONDS;
     39 
     40 import java.util.AbstractQueue;
     41 import java.util.Collection;
     42 import java.util.Iterator;
     43 import java.util.NoSuchElementException;
     44 import java.util.PriorityQueue;
     45 import java.util.concurrent.locks.Condition;
     46 import java.util.concurrent.locks.ReentrantLock;
     47 
     48 // BEGIN android-note
     49 // removed link to collections framework docs
     50 // END android-note
     51 
     52 /**
     53  * An unbounded {@linkplain BlockingQueue blocking queue} of
     54  * {@code Delayed} elements, in which an element can only be taken
     55  * when its delay has expired.  The <em>head</em> of the queue is that
     56  * {@code Delayed} element whose delay expired furthest in the
     57  * past.  If no delay has expired there is no head and {@code poll}
     58  * will return {@code null}. Expiration occurs when an element's
     59  * {@code getDelay(TimeUnit.NANOSECONDS)} method returns a value less
     60  * than or equal to zero.  Even though unexpired elements cannot be
     61  * removed using {@code take} or {@code poll}, they are otherwise
     62  * treated as normal elements. For example, the {@code size} method
     63  * returns the count of both expired and unexpired elements.
     64  * This queue does not permit null elements.
     65  *
     66  * <p>This class and its iterator implement all of the
     67  * <em>optional</em> methods of the {@link Collection} and {@link
     68  * Iterator} interfaces.  The Iterator provided in method {@link
     69  * #iterator()} is <em>not</em> guaranteed to traverse the elements of
     70  * the DelayQueue in any particular order.
     71  *
     72  * @since 1.5
     73  * @author Doug Lea
     74  * @param <E> the type of elements held in this queue
     75  */
     76 public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
     77     implements BlockingQueue<E> {
     78 
     79     private final transient ReentrantLock lock = new ReentrantLock();
     80     private final PriorityQueue<E> q = new PriorityQueue<E>();
     81 
     82     /**
     83      * Thread designated to wait for the element at the head of
     84      * the queue.  This variant of the Leader-Follower pattern
     85      * (http://www.cs.wustl.edu/~schmidt/POSA/POSA2/) serves to
     86      * minimize unnecessary timed waiting.  When a thread becomes
     87      * the leader, it waits only for the next delay to elapse, but
     88      * other threads await indefinitely.  The leader thread must
     89      * signal some other thread before returning from take() or
     90      * poll(...), unless some other thread becomes leader in the
     91      * interim.  Whenever the head of the queue is replaced with
     92      * an element with an earlier expiration time, the leader
     93      * field is invalidated by being reset to null, and some
     94      * waiting thread, but not necessarily the current leader, is
     95      * signalled.  So waiting threads must be prepared to acquire
     96      * and lose leadership while waiting.
     97      */
     98     private Thread leader;
     99 
    100     /**
    101      * Condition signalled when a newer element becomes available
    102      * at the head of the queue or a new thread may need to
    103      * become leader.
    104      */
    105     private final Condition available = lock.newCondition();
    106 
    107     /**
    108      * Creates a new {@code DelayQueue} that is initially empty.
    109      */
    110     public DelayQueue() {}
    111 
    112     /**
    113      * Creates a {@code DelayQueue} initially containing the elements of the
    114      * given collection of {@link Delayed} instances.
    115      *
    116      * @param c the collection of elements to initially contain
    117      * @throws NullPointerException if the specified collection or any
    118      *         of its elements are null
    119      */
    120     public DelayQueue(Collection<? extends E> c) {
    121         this.addAll(c);
    122     }
    123 
    124     /**
    125      * Inserts the specified element into this delay queue.
    126      *
    127      * @param e the element to add
    128      * @return {@code true} (as specified by {@link Collection#add})
    129      * @throws NullPointerException if the specified element is null
    130      */
    131     public boolean add(E e) {
    132         return offer(e);
    133     }
    134 
    135     /**
    136      * Inserts the specified element into this delay queue.
    137      *
    138      * @param e the element to add
    139      * @return {@code true}
    140      * @throws NullPointerException if the specified element is null
    141      */
    142     public boolean offer(E e) {
    143         final ReentrantLock lock = this.lock;
    144         lock.lock();
    145         try {
    146             q.offer(e);
    147             if (q.peek() == e) {
    148                 leader = null;
    149                 available.signal();
    150             }
    151             return true;
    152         } finally {
    153             lock.unlock();
    154         }
    155     }
    156 
    157     /**
    158      * Inserts the specified element into this delay queue. As the queue is
    159      * unbounded this method will never block.
    160      *
    161      * @param e the element to add
    162      * @throws NullPointerException {@inheritDoc}
    163      */
    164     public void put(E e) {
    165         offer(e);
    166     }
    167 
    168     /**
    169      * Inserts the specified element into this delay queue. As the queue is
    170      * unbounded this method will never block.
    171      *
    172      * @param e the element to add
    173      * @param timeout This parameter is ignored as the method never blocks
    174      * @param unit This parameter is ignored as the method never blocks
    175      * @return {@code true}
    176      * @throws NullPointerException {@inheritDoc}
    177      */
    178     public boolean offer(E e, long timeout, TimeUnit unit) {
    179         return offer(e);
    180     }
    181 
    182     /**
    183      * Retrieves and removes the head of this queue, or returns {@code null}
    184      * if this queue has no elements with an expired delay.
    185      *
    186      * @return the head of this queue, or {@code null} if this
    187      *         queue has no elements with an expired delay
    188      */
    189     public E poll() {
    190         final ReentrantLock lock = this.lock;
    191         lock.lock();
    192         try {
    193             E first = q.peek();
    194             return (first == null || first.getDelay(NANOSECONDS) > 0)
    195                 ? null
    196                 : q.poll();
    197         } finally {
    198             lock.unlock();
    199         }
    200     }
    201 
    202     /**
    203      * Retrieves and removes the head of this queue, waiting if necessary
    204      * until an element with an expired delay is available on this queue.
    205      *
    206      * @return the head of this queue
    207      * @throws InterruptedException {@inheritDoc}
    208      */
    209     public E take() throws InterruptedException {
    210         final ReentrantLock lock = this.lock;
    211         lock.lockInterruptibly();
    212         try {
    213             for (;;) {
    214                 E first = q.peek();
    215                 if (first == null)
    216                     available.await();
    217                 else {
    218                     long delay = first.getDelay(NANOSECONDS);
    219                     if (delay <= 0L)
    220                         return q.poll();
    221                     first = null; // don't retain ref while waiting
    222                     if (leader != null)
    223                         available.await();
    224                     else {
    225                         Thread thisThread = Thread.currentThread();
    226                         leader = thisThread;
    227                         try {
    228                             available.awaitNanos(delay);
    229                         } finally {
    230                             if (leader == thisThread)
    231                                 leader = null;
    232                         }
    233                     }
    234                 }
    235             }
    236         } finally {
    237             if (leader == null && q.peek() != null)
    238                 available.signal();
    239             lock.unlock();
    240         }
    241     }
    242 
    243     /**
    244      * Retrieves and removes the head of this queue, waiting if necessary
    245      * until an element with an expired delay is available on this queue,
    246      * or the specified wait time expires.
    247      *
    248      * @return the head of this queue, or {@code null} if the
    249      *         specified waiting time elapses before an element with
    250      *         an expired delay becomes available
    251      * @throws InterruptedException {@inheritDoc}
    252      */
    253     public E poll(long timeout, TimeUnit unit) throws InterruptedException {
    254         long nanos = unit.toNanos(timeout);
    255         final ReentrantLock lock = this.lock;
    256         lock.lockInterruptibly();
    257         try {
    258             for (;;) {
    259                 E first = q.peek();
    260                 if (first == null) {
    261                     if (nanos <= 0L)
    262                         return null;
    263                     else
    264                         nanos = available.awaitNanos(nanos);
    265                 } else {
    266                     long delay = first.getDelay(NANOSECONDS);
    267                     if (delay <= 0L)
    268                         return q.poll();
    269                     if (nanos <= 0L)
    270                         return null;
    271                     first = null; // don't retain ref while waiting
    272                     if (nanos < delay || leader != null)
    273                         nanos = available.awaitNanos(nanos);
    274                     else {
    275                         Thread thisThread = Thread.currentThread();
    276                         leader = thisThread;
    277                         try {
    278                             long timeLeft = available.awaitNanos(delay);
    279                             nanos -= delay - timeLeft;
    280                         } finally {
    281                             if (leader == thisThread)
    282                                 leader = null;
    283                         }
    284                     }
    285                 }
    286             }
    287         } finally {
    288             if (leader == null && q.peek() != null)
    289                 available.signal();
    290             lock.unlock();
    291         }
    292     }
    293 
    294     /**
    295      * Retrieves, but does not remove, the head of this queue, or
    296      * returns {@code null} if this queue is empty.  Unlike
    297      * {@code poll}, if no expired elements are available in the queue,
    298      * this method returns the element that will expire next,
    299      * if one exists.
    300      *
    301      * @return the head of this queue, or {@code null} if this
    302      *         queue is empty
    303      */
    304     public E peek() {
    305         final ReentrantLock lock = this.lock;
    306         lock.lock();
    307         try {
    308             return q.peek();
    309         } finally {
    310             lock.unlock();
    311         }
    312     }
    313 
    314     public int size() {
    315         final ReentrantLock lock = this.lock;
    316         lock.lock();
    317         try {
    318             return q.size();
    319         } finally {
    320             lock.unlock();
    321         }
    322     }
    323 
    324     /**
    325      * Returns first element only if it is expired.
    326      * Used only by drainTo.  Call only when holding lock.
    327      */
    328     private E peekExpired() {
    329         // assert lock.isHeldByCurrentThread();
    330         E first = q.peek();
    331         return (first == null || first.getDelay(NANOSECONDS) > 0) ?
    332             null : first;
    333     }
    334 
    335     /**
    336      * @throws UnsupportedOperationException {@inheritDoc}
    337      * @throws ClassCastException            {@inheritDoc}
    338      * @throws NullPointerException          {@inheritDoc}
    339      * @throws IllegalArgumentException      {@inheritDoc}
    340      */
    341     public int drainTo(Collection<? super E> c) {
    342         if (c == null)
    343             throw new NullPointerException();
    344         if (c == this)
    345             throw new IllegalArgumentException();
    346         final ReentrantLock lock = this.lock;
    347         lock.lock();
    348         try {
    349             int n = 0;
    350             for (E e; (e = peekExpired()) != null;) {
    351                 c.add(e);       // In this order, in case add() throws.
    352                 q.poll();
    353                 ++n;
    354             }
    355             return n;
    356         } finally {
    357             lock.unlock();
    358         }
    359     }
    360 
    361     /**
    362      * @throws UnsupportedOperationException {@inheritDoc}
    363      * @throws ClassCastException            {@inheritDoc}
    364      * @throws NullPointerException          {@inheritDoc}
    365      * @throws IllegalArgumentException      {@inheritDoc}
    366      */
    367     public int drainTo(Collection<? super E> c, int maxElements) {
    368         if (c == null)
    369             throw new NullPointerException();
    370         if (c == this)
    371             throw new IllegalArgumentException();
    372         if (maxElements <= 0)
    373             return 0;
    374         final ReentrantLock lock = this.lock;
    375         lock.lock();
    376         try {
    377             int n = 0;
    378             for (E e; n < maxElements && (e = peekExpired()) != null;) {
    379                 c.add(e);       // In this order, in case add() throws.
    380                 q.poll();
    381                 ++n;
    382             }
    383             return n;
    384         } finally {
    385             lock.unlock();
    386         }
    387     }
    388 
    389     /**
    390      * Atomically removes all of the elements from this delay queue.
    391      * The queue will be empty after this call returns.
    392      * Elements with an unexpired delay are not waited for; they are
    393      * simply discarded from the queue.
    394      */
    395     public void clear() {
    396         final ReentrantLock lock = this.lock;
    397         lock.lock();
    398         try {
    399             q.clear();
    400         } finally {
    401             lock.unlock();
    402         }
    403     }
    404 
    405     /**
    406      * Always returns {@code Integer.MAX_VALUE} because
    407      * a {@code DelayQueue} is not capacity constrained.
    408      *
    409      * @return {@code Integer.MAX_VALUE}
    410      */
    411     public int remainingCapacity() {
    412         return Integer.MAX_VALUE;
    413     }
    414 
    415     /**
    416      * Returns an array containing all of the elements in this queue.
    417      * The returned array elements are in no particular order.
    418      *
    419      * <p>The returned array will be "safe" in that no references to it are
    420      * maintained by this queue.  (In other words, this method must allocate
    421      * a new array).  The caller is thus free to modify the returned array.
    422      *
    423      * <p>This method acts as bridge between array-based and collection-based
    424      * APIs.
    425      *
    426      * @return an array containing all of the elements in this queue
    427      */
    428     public Object[] toArray() {
    429         final ReentrantLock lock = this.lock;
    430         lock.lock();
    431         try {
    432             return q.toArray();
    433         } finally {
    434             lock.unlock();
    435         }
    436     }
    437 
    438     /**
    439      * Returns an array containing all of the elements in this queue; the
    440      * runtime type of the returned array is that of the specified array.
    441      * The returned array elements are in no particular order.
    442      * If the queue fits in the specified array, it is returned therein.
    443      * Otherwise, a new array is allocated with the runtime type of the
    444      * specified array and the size of this queue.
    445      *
    446      * <p>If this queue fits in the specified array with room to spare
    447      * (i.e., the array has more elements than this queue), the element in
    448      * the array immediately following the end of the queue is set to
    449      * {@code null}.
    450      *
    451      * <p>Like the {@link #toArray()} method, this method acts as bridge between
    452      * array-based and collection-based APIs.  Further, this method allows
    453      * precise control over the runtime type of the output array, and may,
    454      * under certain circumstances, be used to save allocation costs.
    455      *
    456      * <p>The following code can be used to dump a delay queue into a newly
    457      * allocated array of {@code Delayed}:
    458      *
    459      * <pre> {@code Delayed[] a = q.toArray(new Delayed[0]);}</pre>
    460      *
    461      * Note that {@code toArray(new Object[0])} is identical in function to
    462      * {@code toArray()}.
    463      *
    464      * @param a the array into which the elements of the queue are to
    465      *          be stored, if it is big enough; otherwise, a new array of the
    466      *          same runtime type is allocated for this purpose
    467      * @return an array containing all of the elements in this queue
    468      * @throws ArrayStoreException if the runtime type of the specified array
    469      *         is not a supertype of the runtime type of every element in
    470      *         this queue
    471      * @throws NullPointerException if the specified array is null
    472      */
    473     public <T> T[] toArray(T[] a) {
    474         final ReentrantLock lock = this.lock;
    475         lock.lock();
    476         try {
    477             return q.toArray(a);
    478         } finally {
    479             lock.unlock();
    480         }
    481     }
    482 
    483     /**
    484      * Removes a single instance of the specified element from this
    485      * queue, if it is present, whether or not it has expired.
    486      */
    487     public boolean remove(Object o) {
    488         final ReentrantLock lock = this.lock;
    489         lock.lock();
    490         try {
    491             return q.remove(o);
    492         } finally {
    493             lock.unlock();
    494         }
    495     }
    496 
    497     /**
    498      * Identity-based version for use in Itr.remove.
    499      */
    500     void removeEQ(Object o) {
    501         final ReentrantLock lock = this.lock;
    502         lock.lock();
    503         try {
    504             for (Iterator<E> it = q.iterator(); it.hasNext(); ) {
    505                 if (o == it.next()) {
    506                     it.remove();
    507                     break;
    508                 }
    509             }
    510         } finally {
    511             lock.unlock();
    512         }
    513     }
    514 
    515     /**
    516      * Returns an iterator over all the elements (both expired and
    517      * unexpired) in this queue. The iterator does not return the
    518      * elements in any particular order.
    519      *
    520      * <p>The returned iterator is
    521      * <a href="package-summary.html#Weakly"><i>weakly consistent</i></a>.
    522      *
    523      * @return an iterator over the elements in this queue
    524      */
    525     public Iterator<E> iterator() {
    526         return new Itr(toArray());
    527     }
    528 
    529     /**
    530      * Snapshot iterator that works off copy of underlying q array.
    531      */
    532     private class Itr implements Iterator<E> {
    533         final Object[] array; // Array of all elements
    534         int cursor;           // index of next element to return
    535         int lastRet;          // index of last element, or -1 if no such
    536 
    537         Itr(Object[] array) {
    538             lastRet = -1;
    539             this.array = array;
    540         }
    541 
    542         public boolean hasNext() {
    543             return cursor < array.length;
    544         }
    545 
    546         @SuppressWarnings("unchecked")
    547         public E next() {
    548             if (cursor >= array.length)
    549                 throw new NoSuchElementException();
    550             lastRet = cursor;
    551             return (E)array[cursor++];
    552         }
    553 
    554         public void remove() {
    555             if (lastRet < 0)
    556                 throw new IllegalStateException();
    557             removeEQ(array[lastRet]);
    558             lastRet = -1;
    559         }
    560     }
    561 
    562 }
    563