1 /* 2 * Copyright (C) 2010 The Guava Authors 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package com.google.common.util.concurrent; 18 19 import static com.google.common.base.Preconditions.checkNotNull; 20 21 import java.util.AbstractQueue; 22 import java.util.Collection; 23 import java.util.Comparator; 24 import java.util.ConcurrentModificationException; 25 import java.util.Iterator; 26 import java.util.NoSuchElementException; 27 import java.util.PriorityQueue; 28 import java.util.Queue; 29 import java.util.SortedSet; 30 import java.util.concurrent.BlockingQueue; 31 import java.util.concurrent.TimeUnit; 32 33 import javax.annotation.Nullable; 34 35 /** 36 * An unbounded {@linkplain BlockingQueue blocking queue} that uses 37 * the same ordering rules as class {@link PriorityQueue} and supplies 38 * blocking retrieval operations. While this queue is logically 39 * unbounded, attempted additions may fail due to resource exhaustion 40 * (causing <tt>OutOfMemoryError</tt>). This class does not permit 41 * <tt>null</tt> elements. A priority queue relying on {@linkplain 42 * Comparable natural ordering} also does not permit insertion of 43 * non-comparable objects (doing so results in 44 * <tt>ClassCastException</tt>). 45 * 46 * <p>This class and its iterator implement all of the 47 * <em>optional</em> methods of the {@link Collection} and {@link 48 * Iterator} interfaces. The Iterator provided in method {@link 49 * #iterator()} is <em>not</em> guaranteed to traverse the elements of 50 * the MonitorBasedPriorityBlockingQueue in any particular order. If you need 51 * ordered traversal, consider using 52 * <tt>Arrays.sort(pq.toArray())</tt>. Also, method <tt>drainTo</tt> 53 * can be used to <em>remove</em> some or all elements in priority 54 * order and place them in another collection. 55 * 56 * <p>Operations on this class make no guarantees about the ordering 57 * of elements with equal priority. If you need to enforce an 58 * ordering, you can define custom classes or comparators that use a 59 * secondary key to break ties in primary priority values. For 60 * example, here is a class that applies first-in-first-out 61 * tie-breaking to comparable elements. To use it, you would insert a 62 * <tt>new FIFOEntry(anEntry)</tt> instead of a plain entry object. 63 * 64 * <pre> 65 * class FIFOEntry<E extends Comparable<? super E>> 66 * implements Comparable<FIFOEntry<E>> { 67 * final static AtomicLong seq = new AtomicLong(); 68 * final long seqNum; 69 * final E entry; 70 * public FIFOEntry(E entry) { 71 * seqNum = seq.getAndIncrement(); 72 * this.entry = entry; 73 * } 74 * public E getEntry() { return entry; } 75 * public int compareTo(FIFOEntry<E> other) { 76 * int res = entry.compareTo(other.entry); 77 * if (res == 0 && other.entry != this.entry) 78 * res = (seqNum < other.seqNum ? -1 : 1); 79 * return res; 80 * } 81 * }</pre> 82 * 83 * @author Doug Lea 84 * @author Justin T. Sampson 85 * @param <E> the type of elements held in this collection 86 */ 87 public class MonitorBasedPriorityBlockingQueue<E> extends AbstractQueue<E> 88 implements BlockingQueue<E> { 89 90 // Based on revision 1.55 of PriorityBlockingQueue by Doug Lea, from 91 // http://gee.cs.oswego.edu/cgi-bin/viewcvs.cgi/jsr166/src/main/java/util/concurrent/ 92 93 private static final long serialVersionUID = 5595510919245408276L; 94 95 final PriorityQueue<E> q; 96 final Monitor monitor = new Monitor(true); 97 private final Monitor.Guard notEmpty = 98 new Monitor.Guard(monitor) { 99 @Override public boolean isSatisfied() { 100 return !q.isEmpty(); 101 } 102 }; 103 104 /** 105 * Creates a <tt>MonitorBasedPriorityBlockingQueue</tt> with the default 106 * initial capacity (11) that orders its elements according to 107 * their {@linkplain Comparable natural ordering}. 108 */ 109 public MonitorBasedPriorityBlockingQueue() { 110 q = new PriorityQueue<E>(); 111 } 112 113 /** 114 * Creates a <tt>MonitorBasedPriorityBlockingQueue</tt> with the specified 115 * initial capacity that orders its elements according to their 116 * {@linkplain Comparable natural ordering}. 117 * 118 * @param initialCapacity the initial capacity for this priority queue 119 * @throws IllegalArgumentException if <tt>initialCapacity</tt> is less 120 * than 1 121 */ 122 public MonitorBasedPriorityBlockingQueue(int initialCapacity) { 123 q = new PriorityQueue<E>(initialCapacity, null); 124 } 125 126 /** 127 * Creates a <tt>MonitorBasedPriorityBlockingQueue</tt> with the specified initial 128 * capacity that orders its elements according to the specified 129 * comparator. 130 * 131 * @param initialCapacity the initial capacity for this priority queue 132 * @param comparator the comparator that will be used to order this 133 * priority queue. If {@code null}, the {@linkplain Comparable 134 * natural ordering} of the elements will be used. 135 * @throws IllegalArgumentException if <tt>initialCapacity</tt> is less 136 * than 1 137 */ 138 public MonitorBasedPriorityBlockingQueue(int initialCapacity, 139 @Nullable Comparator<? super E> comparator) { 140 q = new PriorityQueue<E>(initialCapacity, comparator); 141 } 142 143 /** 144 * Creates a <tt>MonitorBasedPriorityBlockingQueue</tt> containing the elements 145 * in the specified collection. If the specified collection is a 146 * {@link SortedSet} or a {@link PriorityQueue}, this 147 * priority queue will be ordered according to the same ordering. 148 * Otherwise, this priority queue will be ordered according to the 149 * {@linkplain Comparable natural ordering} of its elements. 150 * 151 * @param c the collection whose elements are to be placed 152 * into this priority queue 153 * @throws ClassCastException if elements of the specified collection 154 * cannot be compared to one another according to the priority 155 * queue's ordering 156 * @throws NullPointerException if the specified collection or any 157 * of its elements are null 158 */ 159 public MonitorBasedPriorityBlockingQueue(Collection<? extends E> c) { 160 q = new PriorityQueue<E>(c); 161 } 162 163 /** 164 * Inserts the specified element into this priority queue. 165 * 166 * @param e the element to add 167 * @return <tt>true</tt> (as specified by {@link Collection#add}) 168 * @throws ClassCastException if the specified element cannot be compared 169 * with elements currently in the priority queue according to the 170 * priority queue's ordering 171 * @throws NullPointerException if the specified element is null 172 */ 173 @Override public boolean add(E e) { 174 return offer(e); 175 } 176 177 /** 178 * Inserts the specified element into this priority queue. 179 * 180 * @param e the element to add 181 * @return <tt>true</tt> (as specified by {@link Queue#offer}) 182 * @throws ClassCastException if the specified element cannot be compared 183 * with elements currently in the priority queue according to the 184 * priority queue's ordering 185 * @throws NullPointerException if the specified element is null 186 */ 187 @Override 188 public boolean offer(E e) { 189 final Monitor monitor = this.monitor; 190 monitor.enter(); 191 try { 192 boolean ok = q.offer(e); 193 if (!ok) { 194 throw new AssertionError(); 195 } 196 return true; 197 } finally { 198 monitor.leave(); 199 } 200 } 201 202 /** 203 * Inserts the specified element into this priority queue. As the queue is 204 * unbounded this method will never block. 205 * 206 * @param e the element to add 207 * @throws ClassCastException if the specified element cannot be compared 208 * with elements currently in the priority queue according to the 209 * priority queue's ordering 210 * @throws NullPointerException if the specified element is null 211 */ 212 @Override 213 public void put(E e) { 214 offer(e); // never need to block 215 } 216 217 /** 218 * Inserts the specified element into this priority queue. As the queue is 219 * unbounded this method will never block. 220 * 221 * @param e the element to add 222 * @param timeout This parameter is ignored as the method never blocks 223 * @param unit This parameter is ignored as the method never blocks 224 * @return <tt>true</tt> 225 * @throws ClassCastException if the specified element cannot be compared 226 * with elements currently in the priority queue according to the 227 * priority queue's ordering 228 * @throws NullPointerException if the specified element is null 229 */ 230 @Override 231 public boolean offer(E e, long timeout, TimeUnit unit) { 232 checkNotNull(unit); 233 return offer(e); // never need to block 234 } 235 236 @Override 237 public E poll() { 238 final Monitor monitor = this.monitor; 239 monitor.enter(); 240 try { 241 return q.poll(); 242 } finally { 243 monitor.leave(); 244 } 245 } 246 247 @Override 248 public E take() throws InterruptedException { 249 final Monitor monitor = this.monitor; 250 monitor.enterWhen(notEmpty); 251 try { 252 return q.poll(); 253 } finally { 254 monitor.leave(); 255 } 256 } 257 258 @Override 259 public E poll(long timeout, TimeUnit unit) throws InterruptedException { 260 final Monitor monitor = this.monitor; 261 if (monitor.enterWhen(notEmpty, timeout, unit)) { 262 try { 263 return q.poll(); 264 } finally { 265 monitor.leave(); 266 } 267 } else { 268 return null; 269 } 270 } 271 272 @Override 273 public E peek() { 274 final Monitor monitor = this.monitor; 275 monitor.enter(); 276 try { 277 return q.peek(); 278 } finally { 279 monitor.leave(); 280 } 281 } 282 283 /** 284 * Returns the comparator used to order the elements in this queue, 285 * or <tt>null</tt> if this queue uses the {@linkplain Comparable 286 * natural ordering} of its elements. 287 * 288 * @return the comparator used to order the elements in this queue, 289 * or <tt>null</tt> if this queue uses the natural 290 * ordering of its elements 291 */ 292 public Comparator<? super E> comparator() { 293 return q.comparator(); 294 } 295 296 @Override public int size() { 297 final Monitor monitor = this.monitor; 298 monitor.enter(); 299 try { 300 return q.size(); 301 } finally { 302 monitor.leave(); 303 } 304 } 305 306 /** 307 * Always returns <tt>Integer.MAX_VALUE</tt> because 308 * a <tt>MonitorBasedPriorityBlockingQueue</tt> is not capacity constrained. 309 * @return <tt>Integer.MAX_VALUE</tt> 310 */ 311 @Override 312 public int remainingCapacity() { 313 return Integer.MAX_VALUE; 314 } 315 316 /** 317 * Removes a single instance of the specified element from this queue, 318 * if it is present. More formally, removes an element {@code e} such 319 * that {@code o.equals(e)}, if this queue contains one or more such 320 * elements. Returns {@code true} if and only if this queue contained 321 * the specified element (or equivalently, if this queue changed as a 322 * result of the call). 323 * 324 * @param o element to be removed from this queue, if present 325 * @return <tt>true</tt> if this queue changed as a result of the call 326 */ 327 @Override public boolean remove(@Nullable Object o) { 328 final Monitor monitor = this.monitor; 329 monitor.enter(); 330 try { 331 return q.remove(o); 332 } finally { 333 monitor.leave(); 334 } 335 } 336 337 /** 338 * Returns {@code true} if this queue contains the specified element. 339 * More formally, returns {@code true} if and only if this queue contains 340 * at least one element {@code e} such that {@code o.equals(e)}. 341 * 342 * @param o object to be checked for containment in this queue 343 * @return <tt>true</tt> if this queue contains the specified element 344 */ 345 @Override public boolean contains(@Nullable Object o) { 346 final Monitor monitor = this.monitor; 347 monitor.enter(); 348 try { 349 return q.contains(o); 350 } finally { 351 monitor.leave(); 352 } 353 } 354 355 /** 356 * Returns an array containing all of the elements in this queue. 357 * The returned array elements are in no particular order. 358 * 359 * <p>The returned array will be "safe" in that no references to it are 360 * maintained by this queue. (In other words, this method must allocate 361 * a new array). The caller is thus free to modify the returned array. 362 * 363 * <p>This method acts as bridge between array-based and collection-based 364 * APIs. 365 * 366 * @return an array containing all of the elements in this queue 367 */ 368 @Override public Object[] toArray() { 369 final Monitor monitor = this.monitor; 370 monitor.enter(); 371 try { 372 return q.toArray(); 373 } finally { 374 monitor.leave(); 375 } 376 } 377 378 @Override public String toString() { 379 final Monitor monitor = this.monitor; 380 monitor.enter(); 381 try { 382 return q.toString(); 383 } finally { 384 monitor.leave(); 385 } 386 } 387 388 /** 389 * @throws UnsupportedOperationException {@inheritDoc} 390 * @throws ClassCastException {@inheritDoc} 391 * @throws NullPointerException {@inheritDoc} 392 * @throws IllegalArgumentException {@inheritDoc} 393 */ 394 @Override 395 public int drainTo(Collection<? super E> c) { 396 if (c == null) 397 throw new NullPointerException(); 398 if (c == this) 399 throw new IllegalArgumentException(); 400 final Monitor monitor = this.monitor; 401 monitor.enter(); 402 try { 403 int n = 0; 404 E e; 405 while ( (e = q.poll()) != null) { 406 c.add(e); 407 ++n; 408 } 409 return n; 410 } finally { 411 monitor.leave(); 412 } 413 } 414 415 /** 416 * @throws UnsupportedOperationException {@inheritDoc} 417 * @throws ClassCastException {@inheritDoc} 418 * @throws NullPointerException {@inheritDoc} 419 * @throws IllegalArgumentException {@inheritDoc} 420 */ 421 @Override 422 public int drainTo(Collection<? super E> c, int maxElements) { 423 if (c == null) 424 throw new NullPointerException(); 425 if (c == this) 426 throw new IllegalArgumentException(); 427 if (maxElements <= 0) 428 return 0; 429 final Monitor monitor = this.monitor; 430 monitor.enter(); 431 try { 432 int n = 0; 433 E e; 434 while (n < maxElements && (e = q.poll()) != null) { 435 c.add(e); 436 ++n; 437 } 438 return n; 439 } finally { 440 monitor.leave(); 441 } 442 } 443 444 /** 445 * Atomically removes all of the elements from this queue. 446 * The queue will be empty after this call returns. 447 */ 448 @Override public void clear() { 449 final Monitor monitor = this.monitor; 450 monitor.enter(); 451 try { 452 q.clear(); 453 } finally { 454 monitor.leave(); 455 } 456 } 457 458 /** 459 * Returns an array containing all of the elements in this queue; the 460 * runtime type of the returned array is that of the specified array. 461 * The returned array elements are in no particular order. 462 * If the queue fits in the specified array, it is returned therein. 463 * Otherwise, a new array is allocated with the runtime type of the 464 * specified array and the size of this queue. 465 * 466 * <p>If this queue fits in the specified array with room to spare 467 * (i.e., the array has more elements than this queue), the element in 468 * the array immediately following the end of the queue is set to 469 * <tt>null</tt>. 470 * 471 * <p>Like the {@link #toArray()} method, this method acts as bridge between 472 * array-based and collection-based APIs. Further, this method allows 473 * precise control over the runtime type of the output array, and may, 474 * under certain circumstances, be used to save allocation costs. 475 * 476 * <p>Suppose <tt>x</tt> is a queue known to contain only strings. 477 * The following code can be used to dump the queue into a newly 478 * allocated array of <tt>String</tt>: 479 * 480 * <pre> 481 * String[] y = x.toArray(new String[0]);</pre> 482 * 483 * <p>Note that <tt>toArray(new Object[0])</tt> is identical in function to 484 * <tt>toArray()</tt>. 485 * 486 * @param a the array into which the elements of the queue are to 487 * be stored, if it is big enough; otherwise, a new array of the 488 * same runtime type is allocated for this purpose 489 * @return an array containing all of the elements in this queue 490 * @throws ArrayStoreException if the runtime type of the specified array 491 * is not a supertype of the runtime type of every element in 492 * this queue 493 * @throws NullPointerException if the specified array is null 494 */ 495 @Override public <T> T[] toArray(T[] a) { 496 final Monitor monitor = this.monitor; 497 monitor.enter(); 498 try { 499 return q.toArray(a); 500 } finally { 501 monitor.leave(); 502 } 503 } 504 505 /** 506 * Returns an iterator over the elements in this queue. The 507 * iterator does not return the elements in any particular order. 508 * The returned <tt>Iterator</tt> is a "weakly consistent" 509 * iterator that will never throw {@link 510 * ConcurrentModificationException}, and guarantees to traverse 511 * elements as they existed upon construction of the iterator, and 512 * may (but is not guaranteed to) reflect any modifications 513 * subsequent to construction. 514 * 515 * @return an iterator over the elements in this queue 516 */ 517 @Override public Iterator<E> iterator() { 518 return new Itr(toArray()); 519 } 520 521 /** 522 * Snapshot iterator that works off copy of underlying q array. 523 */ 524 private class Itr implements Iterator<E> { 525 final Object[] array; // Array of all elements 526 int cursor; // index of next element to return; 527 int lastRet; // index of last element, or -1 if no such 528 529 Itr(Object[] array) { 530 lastRet = -1; 531 this.array = array; 532 } 533 534 @Override 535 public boolean hasNext() { 536 return cursor < array.length; 537 } 538 539 @Override 540 public E next() { 541 if (cursor >= array.length) 542 throw new NoSuchElementException(); 543 lastRet = cursor; 544 545 // array comes from q.toArray() and so should have only E's in it 546 @SuppressWarnings("unchecked") 547 E e = (E) array[cursor++]; 548 return e; 549 } 550 551 @Override 552 public void remove() { 553 if (lastRet < 0) 554 throw new IllegalStateException(); 555 Object x = array[lastRet]; 556 lastRet = -1; 557 // Traverse underlying queue to find == element, 558 // not just a .equals element. 559 monitor.enter(); 560 try { 561 for (Iterator<E> it = q.iterator(); it.hasNext(); ) { 562 if (it.next() == x) { 563 it.remove(); 564 return; 565 } 566 } 567 } finally { 568 monitor.leave(); 569 } 570 } 571 } 572 573 /** 574 * Saves the state to a stream (that is, serializes it). This 575 * merely wraps default serialization within the monitor. The 576 * serialization strategy for items is left to underlying 577 * Queue. Note that locking is not needed on deserialization, so 578 * readObject is not defined, just relying on default. 579 */ 580 private void writeObject(java.io.ObjectOutputStream s) 581 throws java.io.IOException { 582 monitor.enter(); 583 try { 584 s.defaultWriteObject(); 585 } finally { 586 monitor.leave(); 587 } 588 } 589 590 } 591