Home | History | Annotate | Download | only in concurrent
      1 /*
      2  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
      3  *
      4  * This code is free software; you can redistribute it and/or modify it
      5  * under the terms of the GNU General Public License version 2 only, as
      6  * published by the Free Software Foundation.  Oracle designates this
      7  * particular file as subject to the "Classpath" exception as provided
      8  * by Oracle in the LICENSE file that accompanied this code.
      9  *
     10  * This code is distributed in the hope that it will be useful, but WITHOUT
     11  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
     12  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
     13  * version 2 for more details (a copy is included in the LICENSE file that
     14  * accompanied this code).
     15  *
     16  * You should have received a copy of the GNU General Public License version
     17  * 2 along with this work; if not, write to the Free Software Foundation,
     18  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
     19  *
     20  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
     21  * or visit www.oracle.com if you need additional information or have any
     22  * questions.
     23  */
     24 
     25 /*
     26  * This file is available under and governed by the GNU General Public
     27  * License version 2 only, as published by the Free Software Foundation.
     28  * However, the following notice accompanied the original version of this
     29  * file:
     30  *
     31  * Written by Doug Lea with assistance from members of JCP JSR-166
     32  * Expert Group and released to the public domain, as explained at
     33  * http://creativecommons.org/publicdomain/zero/1.0/
     34  */
     35 
     36 package java.util.concurrent;
     37 
     38 import java.util.AbstractQueue;
     39 import java.util.Arrays;
     40 import java.util.Collection;
     41 import java.util.Comparator;
     42 import java.util.Iterator;
     43 import java.util.NoSuchElementException;
     44 import java.util.PriorityQueue;
     45 import java.util.Queue;
     46 import java.util.SortedSet;
     47 import java.util.Spliterator;
     48 import java.util.concurrent.locks.Condition;
     49 import java.util.concurrent.locks.ReentrantLock;
     50 import java.util.function.Consumer;
     51 
     52 // BEGIN android-note
     53 // removed link to collections framework docs
     54 // END android-note
     55 
     56 /**
     57  * An unbounded {@linkplain BlockingQueue blocking queue} that uses
     58  * the same ordering rules as class {@link PriorityQueue} and supplies
     59  * blocking retrieval operations.  While this queue is logically
     60  * unbounded, attempted additions may fail due to resource exhaustion
     61  * (causing {@code OutOfMemoryError}). This class does not permit
     62  * {@code null} elements.  A priority queue relying on {@linkplain
     63  * Comparable natural ordering} also does not permit insertion of
     64  * non-comparable objects (doing so results in
     65  * {@code ClassCastException}).
     66  *
     67  * <p>This class and its iterator implement all of the
     68  * <em>optional</em> methods of the {@link Collection} and {@link
     69  * Iterator} interfaces.  The Iterator provided in method {@link
     70  * #iterator()} is <em>not</em> guaranteed to traverse the elements of
     71  * the PriorityBlockingQueue in any particular order. If you need
     72  * ordered traversal, consider using
     73  * {@code Arrays.sort(pq.toArray())}.  Also, method {@code drainTo}
     74  * can be used to <em>remove</em> some or all elements in priority
     75  * order and place them in another collection.
     76  *
     77  * <p>Operations on this class make no guarantees about the ordering
     78  * of elements with equal priority. If you need to enforce an
     79  * ordering, you can define custom classes or comparators that use a
     80  * secondary key to break ties in primary priority values.  For
     81  * example, here is a class that applies first-in-first-out
     82  * tie-breaking to comparable elements. To use it, you would insert a
     83  * {@code new FIFOEntry(anEntry)} instead of a plain entry object.
     84  *
     85  * <pre> {@code
     86  * class FIFOEntry<E extends Comparable<? super E>>
     87  *     implements Comparable<FIFOEntry<E>> {
     88  *   static final AtomicLong seq = new AtomicLong(0);
     89  *   final long seqNum;
     90  *   final E entry;
     91  *   public FIFOEntry(E entry) {
     92  *     seqNum = seq.getAndIncrement();
     93  *     this.entry = entry;
     94  *   }
     95  *   public E getEntry() { return entry; }
     96  *   public int compareTo(FIFOEntry<E> other) {
     97  *     int res = entry.compareTo(other.entry);
     98  *     if (res == 0 && other.entry != this.entry)
     99  *       res = (seqNum < other.seqNum ? -1 : 1);
    100  *     return res;
    101  *   }
    102  * }}</pre>
    103  *
    104  * @since 1.5
    105  * @author Doug Lea
    106  * @param <E> the type of elements held in this queue
    107  */
    108 @SuppressWarnings("unchecked")
    109 public class PriorityBlockingQueue<E> extends AbstractQueue<E>
    110     implements BlockingQueue<E>, java.io.Serializable {
    111     private static final long serialVersionUID = 5595510919245408276L;
    112 
    113     /*
    114      * The implementation uses an array-based binary heap, with public
    115      * operations protected with a single lock. However, allocation
    116      * during resizing uses a simple spinlock (used only while not
    117      * holding main lock) in order to allow takes to operate
    118      * concurrently with allocation.  This avoids repeated
    119      * postponement of waiting consumers and consequent element
    120      * build-up. The need to back away from lock during allocation
    121      * makes it impossible to simply wrap delegated
    122      * java.util.PriorityQueue operations within a lock, as was done
    123      * in a previous version of this class. To maintain
    124      * interoperability, a plain PriorityQueue is still used during
    125      * serialization, which maintains compatibility at the expense of
    126      * transiently doubling overhead.
    127      */
    128 
    129     /**
    130      * Default array capacity.
    131      */
    132     private static final int DEFAULT_INITIAL_CAPACITY = 11;
    133 
    134     /**
    135      * The maximum size of array to allocate.
    136      * Some VMs reserve some header words in an array.
    137      * Attempts to allocate larger arrays may result in
    138      * OutOfMemoryError: Requested array size exceeds VM limit
    139      */
    140     private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
    141 
    142     /**
    143      * Priority queue represented as a balanced binary heap: the two
    144      * children of queue[n] are queue[2*n+1] and queue[2*(n+1)].  The
    145      * priority queue is ordered by comparator, or by the elements'
    146      * natural ordering, if comparator is null: For each node n in the
    147      * heap and each descendant d of n, n <= d.  The element with the
    148      * lowest value is in queue[0], assuming the queue is nonempty.
    149      */
    150     private transient Object[] queue;
    151 
    152     /**
    153      * The number of elements in the priority queue.
    154      */
    155     private transient int size;
    156 
    157     /**
    158      * The comparator, or null if priority queue uses elements'
    159      * natural ordering.
    160      */
    161     private transient Comparator<? super E> comparator;
    162 
    163     /**
    164      * Lock used for all public operations.
    165      */
    166     private final ReentrantLock lock;
    167 
    168     /**
    169      * Condition for blocking when empty.
    170      */
    171     private final Condition notEmpty;
    172 
    173     /**
    174      * Spinlock for allocation, acquired via CAS.
    175      */
    176     private transient volatile int allocationSpinLock;
    177 
    178     /**
    179      * A plain PriorityQueue used only for serialization,
    180      * to maintain compatibility with previous versions
    181      * of this class. Non-null only during serialization/deserialization.
    182      */
    183     private PriorityQueue<E> q;
    184 
    185     /**
    186      * Creates a {@code PriorityBlockingQueue} with the default
    187      * initial capacity (11) that orders its elements according to
    188      * their {@linkplain Comparable natural ordering}.
    189      */
    190     public PriorityBlockingQueue() {
    191         this(DEFAULT_INITIAL_CAPACITY, null);
    192     }
    193 
    194     /**
    195      * Creates a {@code PriorityBlockingQueue} with the specified
    196      * initial capacity that orders its elements according to their
    197      * {@linkplain Comparable natural ordering}.
    198      *
    199      * @param initialCapacity the initial capacity for this priority queue
    200      * @throws IllegalArgumentException if {@code initialCapacity} is less
    201      *         than 1
    202      */
    203     public PriorityBlockingQueue(int initialCapacity) {
    204         this(initialCapacity, null);
    205     }
    206 
    207     /**
    208      * Creates a {@code PriorityBlockingQueue} with the specified initial
    209      * capacity that orders its elements according to the specified
    210      * comparator.
    211      *
    212      * @param initialCapacity the initial capacity for this priority queue
    213      * @param  comparator the comparator that will be used to order this
    214      *         priority queue.  If {@code null}, the {@linkplain Comparable
    215      *         natural ordering} of the elements will be used.
    216      * @throws IllegalArgumentException if {@code initialCapacity} is less
    217      *         than 1
    218      */
    219     public PriorityBlockingQueue(int initialCapacity,
    220                                  Comparator<? super E> comparator) {
    221         if (initialCapacity < 1)
    222             throw new IllegalArgumentException();
    223         this.lock = new ReentrantLock();
    224         this.notEmpty = lock.newCondition();
    225         this.comparator = comparator;
    226         this.queue = new Object[initialCapacity];
    227     }
    228 
    229     /**
    230      * Creates a {@code PriorityBlockingQueue} containing the elements
    231      * in the specified collection.  If the specified collection is a
    232      * {@link SortedSet} or a {@link PriorityQueue}, this
    233      * priority queue will be ordered according to the same ordering.
    234      * Otherwise, this priority queue will be ordered according to the
    235      * {@linkplain Comparable natural ordering} of its elements.
    236      *
    237      * @param  c the collection whose elements are to be placed
    238      *         into this priority queue
    239      * @throws ClassCastException if elements of the specified collection
    240      *         cannot be compared to one another according to the priority
    241      *         queue's ordering
    242      * @throws NullPointerException if the specified collection or any
    243      *         of its elements are null
    244      */
    245     public PriorityBlockingQueue(Collection<? extends E> c) {
    246         this.lock = new ReentrantLock();
    247         this.notEmpty = lock.newCondition();
    248         boolean heapify = true; // true if not known to be in heap order
    249         boolean screen = true;  // true if must screen for nulls
    250         if (c instanceof SortedSet<?>) {
    251             SortedSet<? extends E> ss = (SortedSet<? extends E>) c;
    252             this.comparator = (Comparator<? super E>) ss.comparator();
    253             heapify = false;
    254         }
    255         else if (c instanceof PriorityBlockingQueue<?>) {
    256             PriorityBlockingQueue<? extends E> pq =
    257                 (PriorityBlockingQueue<? extends E>) c;
    258             this.comparator = (Comparator<? super E>) pq.comparator();
    259             screen = false;
    260             if (pq.getClass() == PriorityBlockingQueue.class) // exact match
    261                 heapify = false;
    262         }
    263         Object[] a = c.toArray();
    264         int n = a.length;
    265         // If c.toArray incorrectly doesn't return Object[], copy it.
    266         if (a.getClass() != Object[].class)
    267             a = Arrays.copyOf(a, n, Object[].class);
    268         if (screen && (n == 1 || this.comparator != null)) {
    269             for (int i = 0; i < n; ++i)
    270                 if (a[i] == null)
    271                     throw new NullPointerException();
    272         }
    273         this.queue = a;
    274         this.size = n;
    275         if (heapify)
    276             heapify();
    277     }
    278 
    279     /**
    280      * Tries to grow array to accommodate at least one more element
    281      * (but normally expand by about 50%), giving up (allowing retry)
    282      * on contention (which we expect to be rare). Call only while
    283      * holding lock.
    284      *
    285      * @param array the heap array
    286      * @param oldCap the length of the array
    287      */
    288     private void tryGrow(Object[] array, int oldCap) {
    289         lock.unlock(); // must release and then re-acquire main lock
    290         Object[] newArray = null;
    291         if (allocationSpinLock == 0 &&
    292             U.compareAndSwapInt(this, ALLOCATIONSPINLOCK, 0, 1)) {
    293             try {
    294                 int newCap = oldCap + ((oldCap < 64) ?
    295                                        (oldCap + 2) : // grow faster if small
    296                                        (oldCap >> 1));
    297                 if (newCap - MAX_ARRAY_SIZE > 0) {    // possible overflow
    298                     int minCap = oldCap + 1;
    299                     if (minCap < 0 || minCap > MAX_ARRAY_SIZE)
    300                         throw new OutOfMemoryError();
    301                     newCap = MAX_ARRAY_SIZE;
    302                 }
    303                 if (newCap > oldCap && queue == array)
    304                     newArray = new Object[newCap];
    305             } finally {
    306                 allocationSpinLock = 0;
    307             }
    308         }
    309         if (newArray == null) // back off if another thread is allocating
    310             Thread.yield();
    311         lock.lock();
    312         if (newArray != null && queue == array) {
    313             queue = newArray;
    314             System.arraycopy(array, 0, newArray, 0, oldCap);
    315         }
    316     }
    317 
    318     /**
    319      * Mechanics for poll().  Call only while holding lock.
    320      */
    321     private E dequeue() {
    322         int n = size - 1;
    323         if (n < 0)
    324             return null;
    325         else {
    326             Object[] array = queue;
    327             E result = (E) array[0];
    328             E x = (E) array[n];
    329             array[n] = null;
    330             Comparator<? super E> cmp = comparator;
    331             if (cmp == null)
    332                 siftDownComparable(0, x, array, n);
    333             else
    334                 siftDownUsingComparator(0, x, array, n, cmp);
    335             size = n;
    336             return result;
    337         }
    338     }
    339 
    340     /**
    341      * Inserts item x at position k, maintaining heap invariant by
    342      * promoting x up the tree until it is greater than or equal to
    343      * its parent, or is the root.
    344      *
    345      * To simplify and speed up coercions and comparisons. the
    346      * Comparable and Comparator versions are separated into different
    347      * methods that are otherwise identical. (Similarly for siftDown.)
    348      * These methods are static, with heap state as arguments, to
    349      * simplify use in light of possible comparator exceptions.
    350      *
    351      * @param k the position to fill
    352      * @param x the item to insert
    353      * @param array the heap array
    354      */
    355     private static <T> void siftUpComparable(int k, T x, Object[] array) {
    356         Comparable<? super T> key = (Comparable<? super T>) x;
    357         while (k > 0) {
    358             int parent = (k - 1) >>> 1;
    359             Object e = array[parent];
    360             if (key.compareTo((T) e) >= 0)
    361                 break;
    362             array[k] = e;
    363             k = parent;
    364         }
    365         array[k] = key;
    366     }
    367 
    368     private static <T> void siftUpUsingComparator(int k, T x, Object[] array,
    369                                        Comparator<? super T> cmp) {
    370         while (k > 0) {
    371             int parent = (k - 1) >>> 1;
    372             Object e = array[parent];
    373             if (cmp.compare(x, (T) e) >= 0)
    374                 break;
    375             array[k] = e;
    376             k = parent;
    377         }
    378         array[k] = x;
    379     }
    380 
    381     /**
    382      * Inserts item x at position k, maintaining heap invariant by
    383      * demoting x down the tree repeatedly until it is less than or
    384      * equal to its children or is a leaf.
    385      *
    386      * @param k the position to fill
    387      * @param x the item to insert
    388      * @param array the heap array
    389      * @param n heap size
    390      */
    391     private static <T> void siftDownComparable(int k, T x, Object[] array,
    392                                                int n) {
    393         if (n > 0) {
    394             Comparable<? super T> key = (Comparable<? super T>)x;
    395             int half = n >>> 1;           // loop while a non-leaf
    396             while (k < half) {
    397                 int child = (k << 1) + 1; // assume left child is least
    398                 Object c = array[child];
    399                 int right = child + 1;
    400                 if (right < n &&
    401                     ((Comparable<? super T>) c).compareTo((T) array[right]) > 0)
    402                     c = array[child = right];
    403                 if (key.compareTo((T) c) <= 0)
    404                     break;
    405                 array[k] = c;
    406                 k = child;
    407             }
    408             array[k] = key;
    409         }
    410     }
    411 
    412     private static <T> void siftDownUsingComparator(int k, T x, Object[] array,
    413                                                     int n,
    414                                                     Comparator<? super T> cmp) {
    415         if (n > 0) {
    416             int half = n >>> 1;
    417             while (k < half) {
    418                 int child = (k << 1) + 1;
    419                 Object c = array[child];
    420                 int right = child + 1;
    421                 if (right < n && cmp.compare((T) c, (T) array[right]) > 0)
    422                     c = array[child = right];
    423                 if (cmp.compare(x, (T) c) <= 0)
    424                     break;
    425                 array[k] = c;
    426                 k = child;
    427             }
    428             array[k] = x;
    429         }
    430     }
    431 
    432     /**
    433      * Establishes the heap invariant (described above) in the entire tree,
    434      * assuming nothing about the order of the elements prior to the call.
    435      */
    436     private void heapify() {
    437         Object[] array = queue;
    438         int n = size;
    439         int half = (n >>> 1) - 1;
    440         Comparator<? super E> cmp = comparator;
    441         if (cmp == null) {
    442             for (int i = half; i >= 0; i--)
    443                 siftDownComparable(i, (E) array[i], array, n);
    444         }
    445         else {
    446             for (int i = half; i >= 0; i--)
    447                 siftDownUsingComparator(i, (E) array[i], array, n, cmp);
    448         }
    449     }
    450 
    451     /**
    452      * Inserts the specified element into this priority queue.
    453      *
    454      * @param e the element to add
    455      * @return {@code true} (as specified by {@link Collection#add})
    456      * @throws ClassCastException if the specified element cannot be compared
    457      *         with elements currently in the priority queue according to the
    458      *         priority queue's ordering
    459      * @throws NullPointerException if the specified element is null
    460      */
    461     public boolean add(E e) {
    462         return offer(e);
    463     }
    464 
    465     /**
    466      * Inserts the specified element into this priority queue.
    467      * As the queue is unbounded, this method will never return {@code false}.
    468      *
    469      * @param e the element to add
    470      * @return {@code true} (as specified by {@link Queue#offer})
    471      * @throws ClassCastException if the specified element cannot be compared
    472      *         with elements currently in the priority queue according to the
    473      *         priority queue's ordering
    474      * @throws NullPointerException if the specified element is null
    475      */
    476     public boolean offer(E e) {
    477         if (e == null)
    478             throw new NullPointerException();
    479         final ReentrantLock lock = this.lock;
    480         lock.lock();
    481         int n, cap;
    482         Object[] array;
    483         while ((n = size) >= (cap = (array = queue).length))
    484             tryGrow(array, cap);
    485         try {
    486             Comparator<? super E> cmp = comparator;
    487             if (cmp == null)
    488                 siftUpComparable(n, e, array);
    489             else
    490                 siftUpUsingComparator(n, e, array, cmp);
    491             size = n + 1;
    492             notEmpty.signal();
    493         } finally {
    494             lock.unlock();
    495         }
    496         return true;
    497     }
    498 
    499     /**
    500      * Inserts the specified element into this priority queue.
    501      * As the queue is unbounded, this method will never block.
    502      *
    503      * @param e the element to add
    504      * @throws ClassCastException if the specified element cannot be compared
    505      *         with elements currently in the priority queue according to the
    506      *         priority queue's ordering
    507      * @throws NullPointerException if the specified element is null
    508      */
    509     public void put(E e) {
    510         offer(e); // never need to block
    511     }
    512 
    513     /**
    514      * Inserts the specified element into this priority queue.
    515      * As the queue is unbounded, this method will never block or
    516      * return {@code false}.
    517      *
    518      * @param e the element to add
    519      * @param timeout This parameter is ignored as the method never blocks
    520      * @param unit This parameter is ignored as the method never blocks
    521      * @return {@code true} (as specified by
    522      *  {@link BlockingQueue#offer(Object,long,TimeUnit) BlockingQueue.offer})
    523      * @throws ClassCastException if the specified element cannot be compared
    524      *         with elements currently in the priority queue according to the
    525      *         priority queue's ordering
    526      * @throws NullPointerException if the specified element is null
    527      */
    528     public boolean offer(E e, long timeout, TimeUnit unit) {
    529         return offer(e); // never need to block
    530     }
    531 
    532     public E poll() {
    533         final ReentrantLock lock = this.lock;
    534         lock.lock();
    535         try {
    536             return dequeue();
    537         } finally {
    538             lock.unlock();
    539         }
    540     }
    541 
    542     public E take() throws InterruptedException {
    543         final ReentrantLock lock = this.lock;
    544         lock.lockInterruptibly();
    545         E result;
    546         try {
    547             while ( (result = dequeue()) == null)
    548                 notEmpty.await();
    549         } finally {
    550             lock.unlock();
    551         }
    552         return result;
    553     }
    554 
    555     public E poll(long timeout, TimeUnit unit) throws InterruptedException {
    556         long nanos = unit.toNanos(timeout);
    557         final ReentrantLock lock = this.lock;
    558         lock.lockInterruptibly();
    559         E result;
    560         try {
    561             while ( (result = dequeue()) == null && nanos > 0)
    562                 nanos = notEmpty.awaitNanos(nanos);
    563         } finally {
    564             lock.unlock();
    565         }
    566         return result;
    567     }
    568 
    569     public E peek() {
    570         final ReentrantLock lock = this.lock;
    571         lock.lock();
    572         try {
    573             return (size == 0) ? null : (E) queue[0];
    574         } finally {
    575             lock.unlock();
    576         }
    577     }
    578 
    579     /**
    580      * Returns the comparator used to order the elements in this queue,
    581      * or {@code null} if this queue uses the {@linkplain Comparable
    582      * natural ordering} of its elements.
    583      *
    584      * @return the comparator used to order the elements in this queue,
    585      *         or {@code null} if this queue uses the natural
    586      *         ordering of its elements
    587      */
    588     public Comparator<? super E> comparator() {
    589         return comparator;
    590     }
    591 
    592     public int size() {
    593         final ReentrantLock lock = this.lock;
    594         lock.lock();
    595         try {
    596             return size;
    597         } finally {
    598             lock.unlock();
    599         }
    600     }
    601 
    602     /**
    603      * Always returns {@code Integer.MAX_VALUE} because
    604      * a {@code PriorityBlockingQueue} is not capacity constrained.
    605      * @return {@code Integer.MAX_VALUE} always
    606      */
    607     public int remainingCapacity() {
    608         return Integer.MAX_VALUE;
    609     }
    610 
    611     private int indexOf(Object o) {
    612         if (o != null) {
    613             Object[] array = queue;
    614             int n = size;
    615             for (int i = 0; i < n; i++)
    616                 if (o.equals(array[i]))
    617                     return i;
    618         }
    619         return -1;
    620     }
    621 
    622     /**
    623      * Removes the ith element from queue.
    624      */
    625     private void removeAt(int i) {
    626         Object[] array = queue;
    627         int n = size - 1;
    628         if (n == i) // removed last element
    629             array[i] = null;
    630         else {
    631             E moved = (E) array[n];
    632             array[n] = null;
    633             Comparator<? super E> cmp = comparator;
    634             if (cmp == null)
    635                 siftDownComparable(i, moved, array, n);
    636             else
    637                 siftDownUsingComparator(i, moved, array, n, cmp);
    638             if (array[i] == moved) {
    639                 if (cmp == null)
    640                     siftUpComparable(i, moved, array);
    641                 else
    642                     siftUpUsingComparator(i, moved, array, cmp);
    643             }
    644         }
    645         size = n;
    646     }
    647 
    648     /**
    649      * Removes a single instance of the specified element from this queue,
    650      * if it is present.  More formally, removes an element {@code e} such
    651      * that {@code o.equals(e)}, if this queue contains one or more such
    652      * elements.  Returns {@code true} if and only if this queue contained
    653      * the specified element (or equivalently, if this queue changed as a
    654      * result of the call).
    655      *
    656      * @param o element to be removed from this queue, if present
    657      * @return {@code true} if this queue changed as a result of the call
    658      */
    659     public boolean remove(Object o) {
    660         final ReentrantLock lock = this.lock;
    661         lock.lock();
    662         try {
    663             int i = indexOf(o);
    664             if (i == -1)
    665                 return false;
    666             removeAt(i);
    667             return true;
    668         } finally {
    669             lock.unlock();
    670         }
    671     }
    672 
    673     /**
    674      * Identity-based version for use in Itr.remove.
    675      */
    676     void removeEQ(Object o) {
    677         final ReentrantLock lock = this.lock;
    678         lock.lock();
    679         try {
    680             Object[] array = queue;
    681             for (int i = 0, n = size; i < n; i++) {
    682                 if (o == array[i]) {
    683                     removeAt(i);
    684                     break;
    685                 }
    686             }
    687         } finally {
    688             lock.unlock();
    689         }
    690     }
    691 
    692     /**
    693      * Returns {@code true} if this queue contains the specified element.
    694      * More formally, returns {@code true} if and only if this queue contains
    695      * at least one element {@code e} such that {@code o.equals(e)}.
    696      *
    697      * @param o object to be checked for containment in this queue
    698      * @return {@code true} if this queue contains the specified element
    699      */
    700     public boolean contains(Object o) {
    701         final ReentrantLock lock = this.lock;
    702         lock.lock();
    703         try {
    704             return indexOf(o) != -1;
    705         } finally {
    706             lock.unlock();
    707         }
    708     }
    709 
    710     public String toString() {
    711         return Helpers.collectionToString(this);
    712     }
    713 
    714     /**
    715      * @throws UnsupportedOperationException {@inheritDoc}
    716      * @throws ClassCastException            {@inheritDoc}
    717      * @throws NullPointerException          {@inheritDoc}
    718      * @throws IllegalArgumentException      {@inheritDoc}
    719      */
    720     public int drainTo(Collection<? super E> c) {
    721         return drainTo(c, Integer.MAX_VALUE);
    722     }
    723 
    724     /**
    725      * @throws UnsupportedOperationException {@inheritDoc}
    726      * @throws ClassCastException            {@inheritDoc}
    727      * @throws NullPointerException          {@inheritDoc}
    728      * @throws IllegalArgumentException      {@inheritDoc}
    729      */
    730     public int drainTo(Collection<? super E> c, int maxElements) {
    731         if (c == null)
    732             throw new NullPointerException();
    733         if (c == this)
    734             throw new IllegalArgumentException();
    735         if (maxElements <= 0)
    736             return 0;
    737         final ReentrantLock lock = this.lock;
    738         lock.lock();
    739         try {
    740             int n = Math.min(size, maxElements);
    741             for (int i = 0; i < n; i++) {
    742                 c.add((E) queue[0]); // In this order, in case add() throws.
    743                 dequeue();
    744             }
    745             return n;
    746         } finally {
    747             lock.unlock();
    748         }
    749     }
    750 
    751     /**
    752      * Atomically removes all of the elements from this queue.
    753      * The queue will be empty after this call returns.
    754      */
    755     public void clear() {
    756         final ReentrantLock lock = this.lock;
    757         lock.lock();
    758         try {
    759             Object[] array = queue;
    760             int n = size;
    761             size = 0;
    762             for (int i = 0; i < n; i++)
    763                 array[i] = null;
    764         } finally {
    765             lock.unlock();
    766         }
    767     }
    768 
    769     /**
    770      * Returns an array containing all of the elements in this queue.
    771      * The returned array elements are in no particular order.
    772      *
    773      * <p>The returned array will be "safe" in that no references to it are
    774      * maintained by this queue.  (In other words, this method must allocate
    775      * a new array).  The caller is thus free to modify the returned array.
    776      *
    777      * <p>This method acts as bridge between array-based and collection-based
    778      * APIs.
    779      *
    780      * @return an array containing all of the elements in this queue
    781      */
    782     public Object[] toArray() {
    783         final ReentrantLock lock = this.lock;
    784         lock.lock();
    785         try {
    786             return Arrays.copyOf(queue, size);
    787         } finally {
    788             lock.unlock();
    789         }
    790     }
    791 
    792     /**
    793      * Returns an array containing all of the elements in this queue; the
    794      * runtime type of the returned array is that of the specified array.
    795      * The returned array elements are in no particular order.
    796      * If the queue fits in the specified array, it is returned therein.
    797      * Otherwise, a new array is allocated with the runtime type of the
    798      * specified array and the size of this queue.
    799      *
    800      * <p>If this queue fits in the specified array with room to spare
    801      * (i.e., the array has more elements than this queue), the element in
    802      * the array immediately following the end of the queue is set to
    803      * {@code null}.
    804      *
    805      * <p>Like the {@link #toArray()} method, this method acts as bridge between
    806      * array-based and collection-based APIs.  Further, this method allows
    807      * precise control over the runtime type of the output array, and may,
    808      * under certain circumstances, be used to save allocation costs.
    809      *
    810      * <p>Suppose {@code x} is a queue known to contain only strings.
    811      * The following code can be used to dump the queue into a newly
    812      * allocated array of {@code String}:
    813      *
    814      * <pre> {@code String[] y = x.toArray(new String[0]);}</pre>
    815      *
    816      * Note that {@code toArray(new Object[0])} is identical in function to
    817      * {@code toArray()}.
    818      *
    819      * @param a the array into which the elements of the queue are to
    820      *          be stored, if it is big enough; otherwise, a new array of the
    821      *          same runtime type is allocated for this purpose
    822      * @return an array containing all of the elements in this queue
    823      * @throws ArrayStoreException if the runtime type of the specified array
    824      *         is not a supertype of the runtime type of every element in
    825      *         this queue
    826      * @throws NullPointerException if the specified array is null
    827      */
    828     public <T> T[] toArray(T[] a) {
    829         final ReentrantLock lock = this.lock;
    830         lock.lock();
    831         try {
    832             int n = size;
    833             if (a.length < n)
    834                 // Make a new array of a's runtime type, but my contents:
    835                 return (T[]) Arrays.copyOf(queue, size, a.getClass());
    836             System.arraycopy(queue, 0, a, 0, n);
    837             if (a.length > n)
    838                 a[n] = null;
    839             return a;
    840         } finally {
    841             lock.unlock();
    842         }
    843     }
    844 
    845     /**
    846      * Returns an iterator over the elements in this queue. The
    847      * iterator does not return the elements in any particular order.
    848      *
    849      * <p>The returned iterator is
    850      * <a href="package-summary.html#Weakly"><i>weakly consistent</i></a>.
    851      *
    852      * @return an iterator over the elements in this queue
    853      */
    854     public Iterator<E> iterator() {
    855         return new Itr(toArray());
    856     }
    857 
    858     /**
    859      * Snapshot iterator that works off copy of underlying q array.
    860      */
    861     final class Itr implements Iterator<E> {
    862         final Object[] array; // Array of all elements
    863         int cursor;           // index of next element to return
    864         int lastRet;          // index of last element, or -1 if no such
    865 
    866         Itr(Object[] array) {
    867             lastRet = -1;
    868             this.array = array;
    869         }
    870 
    871         public boolean hasNext() {
    872             return cursor < array.length;
    873         }
    874 
    875         public E next() {
    876             if (cursor >= array.length)
    877                 throw new NoSuchElementException();
    878             lastRet = cursor;
    879             return (E)array[cursor++];
    880         }
    881 
    882         public void remove() {
    883             if (lastRet < 0)
    884                 throw new IllegalStateException();
    885             removeEQ(array[lastRet]);
    886             lastRet = -1;
    887         }
    888     }
    889 
    890     /**
    891      * Saves this queue to a stream (that is, serializes it).
    892      *
    893      * For compatibility with previous version of this class, elements
    894      * are first copied to a java.util.PriorityQueue, which is then
    895      * serialized.
    896      *
    897      * @param s the stream
    898      * @throws java.io.IOException if an I/O error occurs
    899      */
    900     private void writeObject(java.io.ObjectOutputStream s)
    901         throws java.io.IOException {
    902         lock.lock();
    903         try {
    904             // avoid zero capacity argument
    905             q = new PriorityQueue<E>(Math.max(size, 1), comparator);
    906             q.addAll(this);
    907             s.defaultWriteObject();
    908         } finally {
    909             q = null;
    910             lock.unlock();
    911         }
    912     }
    913 
    914     /**
    915      * Reconstitutes this queue from a stream (that is, deserializes it).
    916      * @param s the stream
    917      * @throws ClassNotFoundException if the class of a serialized object
    918      *         could not be found
    919      * @throws java.io.IOException if an I/O error occurs
    920      */
    921     private void readObject(java.io.ObjectInputStream s)
    922         throws java.io.IOException, ClassNotFoundException {
    923         try {
    924             s.defaultReadObject();
    925             this.queue = new Object[q.size()];
    926             comparator = q.comparator();
    927             addAll(q);
    928         } finally {
    929             q = null;
    930         }
    931     }
    932 
    933     // Similar to Collections.ArraySnapshotSpliterator but avoids
    934     // commitment to toArray until needed
    935     static final class PBQSpliterator<E> implements Spliterator<E> {
    936         final PriorityBlockingQueue<E> queue;
    937         Object[] array;
    938         int index;
    939         int fence;
    940 
    941         PBQSpliterator(PriorityBlockingQueue<E> queue, Object[] array,
    942                        int index, int fence) {
    943             this.queue = queue;
    944             this.array = array;
    945             this.index = index;
    946             this.fence = fence;
    947         }
    948 
    949         final int getFence() {
    950             int hi;
    951             if ((hi = fence) < 0)
    952                 hi = fence = (array = queue.toArray()).length;
    953             return hi;
    954         }
    955 
    956         public PBQSpliterator<E> trySplit() {
    957             int hi = getFence(), lo = index, mid = (lo + hi) >>> 1;
    958             return (lo >= mid) ? null :
    959                 new PBQSpliterator<E>(queue, array, lo, index = mid);
    960         }
    961 
    962         @SuppressWarnings("unchecked")
    963         public void forEachRemaining(Consumer<? super E> action) {
    964             Object[] a; int i, hi; // hoist accesses and checks from loop
    965             if (action == null)
    966                 throw new NullPointerException();
    967             if ((a = array) == null)
    968                 fence = (a = queue.toArray()).length;
    969             if ((hi = fence) <= a.length &&
    970                 (i = index) >= 0 && i < (index = hi)) {
    971                 do { action.accept((E)a[i]); } while (++i < hi);
    972             }
    973         }
    974 
    975         public boolean tryAdvance(Consumer<? super E> action) {
    976             if (action == null)
    977                 throw new NullPointerException();
    978             if (getFence() > index && index >= 0) {
    979                 @SuppressWarnings("unchecked") E e = (E) array[index++];
    980                 action.accept(e);
    981                 return true;
    982             }
    983             return false;
    984         }
    985 
    986         public long estimateSize() { return (long)(getFence() - index); }
    987 
    988         public int characteristics() {
    989             return Spliterator.NONNULL | Spliterator.SIZED | Spliterator.SUBSIZED;
    990         }
    991     }
    992 
    993     /**
    994      * Returns a {@link Spliterator} over the elements in this queue.
    995      *
    996      * <p>The returned spliterator is
    997      * <a href="package-summary.html#Weakly"><i>weakly consistent</i></a>.
    998      *
    999      * <p>The {@code Spliterator} reports {@link Spliterator#SIZED} and
   1000      * {@link Spliterator#NONNULL}.
   1001      *
   1002      * @implNote
   1003      * The {@code Spliterator} additionally reports {@link Spliterator#SUBSIZED}.
   1004      *
   1005      * @return a {@code Spliterator} over the elements in this queue
   1006      * @since 1.8
   1007      */
   1008     public Spliterator<E> spliterator() {
   1009         return new PBQSpliterator<E>(this, null, 0, -1);
   1010     }
   1011 
   1012     // Unsafe mechanics
   1013     private static final sun.misc.Unsafe U = sun.misc.Unsafe.getUnsafe();
   1014     private static final long ALLOCATIONSPINLOCK;
   1015     static {
   1016         try {
   1017             ALLOCATIONSPINLOCK = U.objectFieldOffset
   1018                 (PriorityBlockingQueue.class.getDeclaredField("allocationSpinLock"));
   1019         } catch (ReflectiveOperationException e) {
   1020             throw new Error(e);
   1021         }
   1022     }
   1023 }
   1024