Home | History | Annotate | Download | only in concurrent
      1 /*
      2  * Copyright (C) 2010 The Guava Authors
      3  *
      4  * Licensed under the Apache License, Version 2.0 (the "License");
      5  * you may not use this file except in compliance with the License.
      6  * You may obtain a copy of the License at
      7  *
      8  * http://www.apache.org/licenses/LICENSE-2.0
      9  *
     10  * Unless required by applicable law or agreed to in writing, software
     11  * distributed under the License is distributed on an "AS IS" BASIS,
     12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     13  * See the License for the specific language governing permissions and
     14  * limitations under the License.
     15  */
     16 
     17 package com.google.common.util.concurrent;
     18 
     19 import static com.google.common.base.Preconditions.checkNotNull;
     20 
     21 import java.util.AbstractQueue;
     22 import java.util.Collection;
     23 import java.util.Comparator;
     24 import java.util.ConcurrentModificationException;
     25 import java.util.Iterator;
     26 import java.util.NoSuchElementException;
     27 import java.util.PriorityQueue;
     28 import java.util.Queue;
     29 import java.util.SortedSet;
     30 import java.util.concurrent.BlockingQueue;
     31 import java.util.concurrent.TimeUnit;
     32 
     33 import javax.annotation.Nullable;
     34 
     35 /**
     36  * An unbounded {@linkplain BlockingQueue blocking queue} that uses
     37  * the same ordering rules as class {@link PriorityQueue} and supplies
     38  * blocking retrieval operations.  While this queue is logically
     39  * unbounded, attempted additions may fail due to resource exhaustion
     40  * (causing <tt>OutOfMemoryError</tt>). This class does not permit
     41  * <tt>null</tt> elements.  A priority queue relying on {@linkplain
     42  * Comparable natural ordering} also does not permit insertion of
     43  * non-comparable objects (doing so results in
     44  * <tt>ClassCastException</tt>).
     45  *
     46  * <p>This class and its iterator implement all of the
     47  * <em>optional</em> methods of the {@link Collection} and {@link
     48  * Iterator} interfaces.  The Iterator provided in method {@link
     49  * #iterator()} is <em>not</em> guaranteed to traverse the elements of
     50  * the MonitorBasedPriorityBlockingQueue in any particular order. If you need
     51  * ordered traversal, consider using
     52  * <tt>Arrays.sort(pq.toArray())</tt>.  Also, method <tt>drainTo</tt>
     53  * can be used to <em>remove</em> some or all elements in priority
     54  * order and place them in another collection.
     55  *
     56  * <p>Operations on this class make no guarantees about the ordering
     57  * of elements with equal priority. If you need to enforce an
     58  * ordering, you can define custom classes or comparators that use a
     59  * secondary key to break ties in primary priority values.  For
     60  * example, here is a class that applies first-in-first-out
     61  * tie-breaking to comparable elements. To use it, you would insert a
     62  * <tt>new FIFOEntry(anEntry)</tt> instead of a plain entry object.
     63  *
     64  * <pre>
     65  * class FIFOEntry&lt;E extends Comparable&lt;? super E&gt;&gt;
     66  *     implements Comparable&lt;FIFOEntry&lt;E&gt;&gt; {
     67  *   final static AtomicLong seq = new AtomicLong();
     68  *   final long seqNum;
     69  *   final E entry;
     70  *   public FIFOEntry(E entry) {
     71  *     seqNum = seq.getAndIncrement();
     72  *     this.entry = entry;
     73  *   }
     74  *   public E getEntry() { return entry; }
     75  *   public int compareTo(FIFOEntry&lt;E&gt; other) {
     76  *     int res = entry.compareTo(other.entry);
     77  *     if (res == 0 &amp;&amp; other.entry != this.entry)
     78  *       res = (seqNum &lt; other.seqNum ? -1 : 1);
     79  *     return res;
     80  *   }
     81  * }</pre>
     82  *
     83  * @author Doug Lea
     84  * @author Justin T. Sampson
     85  * @param <E> the type of elements held in this collection
     86  */
     87 public class MonitorBasedPriorityBlockingQueue<E> extends AbstractQueue<E>
     88     implements BlockingQueue<E> {
     89 
     90     // Based on revision 1.55 of PriorityBlockingQueue by Doug Lea, from
     91     // http://gee.cs.oswego.edu/cgi-bin/viewcvs.cgi/jsr166/src/main/java/util/concurrent/
     92 
     93     private static final long serialVersionUID = 5595510919245408276L;
     94 
     95     final PriorityQueue<E> q;
     96     final Monitor monitor = new Monitor(true);
     97     private final Monitor.Guard notEmpty =
     98         new Monitor.Guard(monitor) {
     99             @Override public boolean isSatisfied() {
    100               return !q.isEmpty();
    101             }
    102         };
    103 
    104     /**
    105      * Creates a <tt>MonitorBasedPriorityBlockingQueue</tt> with the default
    106      * initial capacity (11) that orders its elements according to
    107      * their {@linkplain Comparable natural ordering}.
    108      */
    109     public MonitorBasedPriorityBlockingQueue() {
    110         q = new PriorityQueue<E>();
    111     }
    112 
    113     /**
    114      * Creates a <tt>MonitorBasedPriorityBlockingQueue</tt> with the specified
    115      * initial capacity that orders its elements according to their
    116      * {@linkplain Comparable natural ordering}.
    117      *
    118      * @param initialCapacity the initial capacity for this priority queue
    119      * @throws IllegalArgumentException if <tt>initialCapacity</tt> is less
    120      *         than 1
    121      */
    122     public MonitorBasedPriorityBlockingQueue(int initialCapacity) {
    123         q = new PriorityQueue<E>(initialCapacity, null);
    124     }
    125 
    126     /**
    127      * Creates a <tt>MonitorBasedPriorityBlockingQueue</tt> with the specified initial
    128      * capacity that orders its elements according to the specified
    129      * comparator.
    130      *
    131      * @param initialCapacity the initial capacity for this priority queue
    132      * @param  comparator the comparator that will be used to order this
    133      *         priority queue.  If {@code null}, the {@linkplain Comparable
    134      *         natural ordering} of the elements will be used.
    135      * @throws IllegalArgumentException if <tt>initialCapacity</tt> is less
    136      *         than 1
    137      */
    138     public MonitorBasedPriorityBlockingQueue(int initialCapacity,
    139                                  @Nullable Comparator<? super E> comparator) {
    140         q = new PriorityQueue<E>(initialCapacity, comparator);
    141     }
    142 
    143     /**
    144      * Creates a <tt>MonitorBasedPriorityBlockingQueue</tt> containing the elements
    145      * in the specified collection.  If the specified collection is a
    146      * {@link SortedSet} or a {@link PriorityQueue},  this
    147      * priority queue will be ordered according to the same ordering.
    148      * Otherwise, this priority queue will be ordered according to the
    149      * {@linkplain Comparable natural ordering} of its elements.
    150      *
    151      * @param  c the collection whose elements are to be placed
    152      *         into this priority queue
    153      * @throws ClassCastException if elements of the specified collection
    154      *         cannot be compared to one another according to the priority
    155      *         queue's ordering
    156      * @throws NullPointerException if the specified collection or any
    157      *         of its elements are null
    158      */
    159     public MonitorBasedPriorityBlockingQueue(Collection<? extends E> c) {
    160         q = new PriorityQueue<E>(c);
    161     }
    162 
    163     /**
    164      * Inserts the specified element into this priority queue.
    165      *
    166      * @param e the element to add
    167      * @return <tt>true</tt> (as specified by {@link Collection#add})
    168      * @throws ClassCastException if the specified element cannot be compared
    169      *         with elements currently in the priority queue according to the
    170      *         priority queue's ordering
    171      * @throws NullPointerException if the specified element is null
    172      */
    173     @Override public boolean add(E e) {
    174         return offer(e);
    175     }
    176 
    177     /**
    178      * Inserts the specified element into this priority queue.
    179      *
    180      * @param e the element to add
    181      * @return <tt>true</tt> (as specified by {@link Queue#offer})
    182      * @throws ClassCastException if the specified element cannot be compared
    183      *         with elements currently in the priority queue according to the
    184      *         priority queue's ordering
    185      * @throws NullPointerException if the specified element is null
    186      */
    187     @Override
    188     public boolean offer(E e) {
    189         final Monitor monitor = this.monitor;
    190         monitor.enter();
    191         try {
    192             boolean ok = q.offer(e);
    193             if (!ok) {
    194               throw new AssertionError();
    195             }
    196             return true;
    197         } finally {
    198             monitor.leave();
    199         }
    200     }
    201 
    202     /**
    203      * Inserts the specified element into this priority queue. As the queue is
    204      * unbounded this method will never block.
    205      *
    206      * @param e the element to add
    207      * @throws ClassCastException if the specified element cannot be compared
    208      *         with elements currently in the priority queue according to the
    209      *         priority queue's ordering
    210      * @throws NullPointerException if the specified element is null
    211      */
    212     @Override
    213     public void put(E e) {
    214         offer(e); // never need to block
    215     }
    216 
    217     /**
    218      * Inserts the specified element into this priority queue. As the queue is
    219      * unbounded this method will never block.
    220      *
    221      * @param e the element to add
    222      * @param timeout This parameter is ignored as the method never blocks
    223      * @param unit This parameter is ignored as the method never blocks
    224      * @return <tt>true</tt>
    225      * @throws ClassCastException if the specified element cannot be compared
    226      *         with elements currently in the priority queue according to the
    227      *         priority queue's ordering
    228      * @throws NullPointerException if the specified element is null
    229      */
    230     @Override
    231     public boolean offer(E e, long timeout, TimeUnit unit) {
    232         checkNotNull(unit);
    233         return offer(e); // never need to block
    234     }
    235 
    236     @Override
    237     public E poll() {
    238         final Monitor monitor = this.monitor;
    239         monitor.enter();
    240         try {
    241             return q.poll();
    242         } finally {
    243             monitor.leave();
    244         }
    245     }
    246 
    247     @Override
    248     public E take() throws InterruptedException {
    249         final Monitor monitor = this.monitor;
    250         monitor.enterWhen(notEmpty);
    251         try {
    252             return q.poll();
    253         } finally {
    254             monitor.leave();
    255         }
    256     }
    257 
    258     @Override
    259     public E poll(long timeout, TimeUnit unit) throws InterruptedException {
    260         final Monitor monitor = this.monitor;
    261         if (monitor.enterWhen(notEmpty, timeout, unit)) {
    262             try {
    263                 return q.poll();
    264             } finally {
    265                 monitor.leave();
    266             }
    267         } else {
    268             return null;
    269         }
    270     }
    271 
    272     @Override
    273     public E peek() {
    274         final Monitor monitor = this.monitor;
    275         monitor.enter();
    276         try {
    277             return q.peek();
    278         } finally {
    279             monitor.leave();
    280         }
    281     }
    282 
    283     /**
    284      * Returns the comparator used to order the elements in this queue,
    285      * or <tt>null</tt> if this queue uses the {@linkplain Comparable
    286      * natural ordering} of its elements.
    287      *
    288      * @return the comparator used to order the elements in this queue,
    289      *         or <tt>null</tt> if this queue uses the natural
    290      *         ordering of its elements
    291      */
    292     public Comparator<? super E> comparator() {
    293         return q.comparator();
    294     }
    295 
    296     @Override public int size() {
    297         final Monitor monitor = this.monitor;
    298         monitor.enter();
    299         try {
    300             return q.size();
    301         } finally {
    302             monitor.leave();
    303         }
    304     }
    305 
    306     /**
    307      * Always returns <tt>Integer.MAX_VALUE</tt> because
    308      * a <tt>MonitorBasedPriorityBlockingQueue</tt> is not capacity constrained.
    309      * @return <tt>Integer.MAX_VALUE</tt>
    310      */
    311     @Override
    312     public int remainingCapacity() {
    313         return Integer.MAX_VALUE;
    314     }
    315 
    316     /**
    317      * Removes a single instance of the specified element from this queue,
    318      * if it is present.  More formally, removes an element {@code e} such
    319      * that {@code o.equals(e)}, if this queue contains one or more such
    320      * elements.  Returns {@code true} if and only if this queue contained
    321      * the specified element (or equivalently, if this queue changed as a
    322      * result of the call).
    323      *
    324      * @param o element to be removed from this queue, if present
    325      * @return <tt>true</tt> if this queue changed as a result of the call
    326      */
    327     @Override public boolean remove(@Nullable Object o) {
    328         final Monitor monitor = this.monitor;
    329         monitor.enter();
    330         try {
    331             return q.remove(o);
    332         } finally {
    333             monitor.leave();
    334         }
    335     }
    336 
    337     /**
    338      * Returns {@code true} if this queue contains the specified element.
    339      * More formally, returns {@code true} if and only if this queue contains
    340      * at least one element {@code e} such that {@code o.equals(e)}.
    341      *
    342      * @param o object to be checked for containment in this queue
    343      * @return <tt>true</tt> if this queue contains the specified element
    344      */
    345     @Override public boolean contains(@Nullable Object o) {
    346         final Monitor monitor = this.monitor;
    347         monitor.enter();
    348         try {
    349             return q.contains(o);
    350         } finally {
    351             monitor.leave();
    352         }
    353     }
    354 
    355     /**
    356      * Returns an array containing all of the elements in this queue.
    357      * The returned array elements are in no particular order.
    358      *
    359      * <p>The returned array will be "safe" in that no references to it are
    360      * maintained by this queue.  (In other words, this method must allocate
    361      * a new array).  The caller is thus free to modify the returned array.
    362      *
    363      * <p>This method acts as bridge between array-based and collection-based
    364      * APIs.
    365      *
    366      * @return an array containing all of the elements in this queue
    367      */
    368     @Override public Object[] toArray() {
    369         final Monitor monitor = this.monitor;
    370         monitor.enter();
    371         try {
    372             return q.toArray();
    373         } finally {
    374             monitor.leave();
    375         }
    376     }
    377 
    378     @Override public String toString() {
    379         final Monitor monitor = this.monitor;
    380         monitor.enter();
    381         try {
    382             return q.toString();
    383         } finally {
    384             monitor.leave();
    385         }
    386     }
    387 
    388     /**
    389      * @throws UnsupportedOperationException {@inheritDoc}
    390      * @throws ClassCastException            {@inheritDoc}
    391      * @throws NullPointerException          {@inheritDoc}
    392      * @throws IllegalArgumentException      {@inheritDoc}
    393      */
    394     @Override
    395     public int drainTo(Collection<? super E> c) {
    396         if (c == null)
    397             throw new NullPointerException();
    398         if (c == this)
    399             throw new IllegalArgumentException();
    400         final Monitor monitor = this.monitor;
    401         monitor.enter();
    402         try {
    403             int n = 0;
    404             E e;
    405             while ( (e = q.poll()) != null) {
    406                 c.add(e);
    407                 ++n;
    408             }
    409             return n;
    410         } finally {
    411             monitor.leave();
    412         }
    413     }
    414 
    415     /**
    416      * @throws UnsupportedOperationException {@inheritDoc}
    417      * @throws ClassCastException            {@inheritDoc}
    418      * @throws NullPointerException          {@inheritDoc}
    419      * @throws IllegalArgumentException      {@inheritDoc}
    420      */
    421     @Override
    422     public int drainTo(Collection<? super E> c, int maxElements) {
    423         if (c == null)
    424             throw new NullPointerException();
    425         if (c == this)
    426             throw new IllegalArgumentException();
    427         if (maxElements <= 0)
    428             return 0;
    429         final Monitor monitor = this.monitor;
    430         monitor.enter();
    431         try {
    432             int n = 0;
    433             E e;
    434             while (n < maxElements && (e = q.poll()) != null) {
    435                 c.add(e);
    436                 ++n;
    437             }
    438             return n;
    439         } finally {
    440             monitor.leave();
    441         }
    442     }
    443 
    444     /**
    445      * Atomically removes all of the elements from this queue.
    446      * The queue will be empty after this call returns.
    447      */
    448     @Override public void clear() {
    449         final Monitor monitor = this.monitor;
    450         monitor.enter();
    451         try {
    452             q.clear();
    453         } finally {
    454             monitor.leave();
    455         }
    456     }
    457 
    458     /**
    459      * Returns an array containing all of the elements in this queue; the
    460      * runtime type of the returned array is that of the specified array.
    461      * The returned array elements are in no particular order.
    462      * If the queue fits in the specified array, it is returned therein.
    463      * Otherwise, a new array is allocated with the runtime type of the
    464      * specified array and the size of this queue.
    465      *
    466      * <p>If this queue fits in the specified array with room to spare
    467      * (i.e., the array has more elements than this queue), the element in
    468      * the array immediately following the end of the queue is set to
    469      * <tt>null</tt>.
    470      *
    471      * <p>Like the {@link #toArray()} method, this method acts as bridge between
    472      * array-based and collection-based APIs.  Further, this method allows
    473      * precise control over the runtime type of the output array, and may,
    474      * under certain circumstances, be used to save allocation costs.
    475      *
    476      * <p>Suppose <tt>x</tt> is a queue known to contain only strings.
    477      * The following code can be used to dump the queue into a newly
    478      * allocated array of <tt>String</tt>:
    479      *
    480      * <pre>
    481      *     String[] y = x.toArray(new String[0]);</pre>
    482      *
    483      * <p>Note that <tt>toArray(new Object[0])</tt> is identical in function to
    484      * <tt>toArray()</tt>.
    485      *
    486      * @param a the array into which the elements of the queue are to
    487      *          be stored, if it is big enough; otherwise, a new array of the
    488      *          same runtime type is allocated for this purpose
    489      * @return an array containing all of the elements in this queue
    490      * @throws ArrayStoreException if the runtime type of the specified array
    491      *         is not a supertype of the runtime type of every element in
    492      *         this queue
    493      * @throws NullPointerException if the specified array is null
    494      */
    495     @Override public <T> T[] toArray(T[] a) {
    496         final Monitor monitor = this.monitor;
    497         monitor.enter();
    498         try {
    499             return q.toArray(a);
    500         } finally {
    501             monitor.leave();
    502         }
    503     }
    504 
    505     /**
    506      * Returns an iterator over the elements in this queue. The
    507      * iterator does not return the elements in any particular order.
    508      * The returned <tt>Iterator</tt> is a "weakly consistent"
    509      * iterator that will never throw {@link
    510      * ConcurrentModificationException}, and guarantees to traverse
    511      * elements as they existed upon construction of the iterator, and
    512      * may (but is not guaranteed to) reflect any modifications
    513      * subsequent to construction.
    514      *
    515      * @return an iterator over the elements in this queue
    516      */
    517     @Override public Iterator<E> iterator() {
    518         return new Itr(toArray());
    519     }
    520 
    521     /**
    522      * Snapshot iterator that works off copy of underlying q array.
    523      */
    524     private class Itr implements Iterator<E> {
    525         final Object[] array; // Array of all elements
    526         int cursor;           // index of next element to return;
    527         int lastRet;          // index of last element, or -1 if no such
    528 
    529         Itr(Object[] array) {
    530             lastRet = -1;
    531             this.array = array;
    532         }
    533 
    534         @Override
    535         public boolean hasNext() {
    536             return cursor < array.length;
    537         }
    538 
    539         @Override
    540         public E next() {
    541             if (cursor >= array.length)
    542                 throw new NoSuchElementException();
    543             lastRet = cursor;
    544 
    545             // array comes from q.toArray() and so should have only E's in it
    546             @SuppressWarnings("unchecked")
    547             E e = (E) array[cursor++];
    548             return e;
    549         }
    550 
    551         @Override
    552         public void remove() {
    553             if (lastRet < 0)
    554                 throw new IllegalStateException();
    555             Object x = array[lastRet];
    556             lastRet = -1;
    557             // Traverse underlying queue to find == element,
    558             // not just a .equals element.
    559             monitor.enter();
    560             try {
    561                 for (Iterator<E> it = q.iterator(); it.hasNext(); ) {
    562                     if (it.next() == x) {
    563                         it.remove();
    564                         return;
    565                     }
    566                 }
    567             } finally {
    568                 monitor.leave();
    569             }
    570         }
    571     }
    572 
    573     /**
    574      * Saves the state to a stream (that is, serializes it).  This
    575      * merely wraps default serialization within the monitor.  The
    576      * serialization strategy for items is left to underlying
    577      * Queue. Note that locking is not needed on deserialization, so
    578      * readObject is not defined, just relying on default.
    579      */
    580     private void writeObject(java.io.ObjectOutputStream s)
    581         throws java.io.IOException {
    582         monitor.enter();
    583         try {
    584             s.defaultWriteObject();
    585         } finally {
    586             monitor.leave();
    587         }
    588     }
    589 
    590 }
    591