1 /* 2 * Written by Doug Lea with assistance from members of JCP JSR-166 3 * Expert Group and released to the public domain, as explained at 4 * http://creativecommons.org/publicdomain/zero/1.0/ 5 */ 6 7 package java.util.concurrent; 8 import java.util.concurrent.locks.Condition; 9 import java.util.concurrent.locks.ReentrantLock; 10 import java.util.AbstractQueue; 11 import java.util.Collection; 12 import java.util.Iterator; 13 import java.util.NoSuchElementException; 14 import java.lang.ref.WeakReference; 15 16 // BEGIN android-note 17 // removed link to collections framework docs 18 // END android-note 19 20 /** 21 * A bounded {@linkplain BlockingQueue blocking queue} backed by an 22 * array. This queue orders elements FIFO (first-in-first-out). The 23 * <em>head</em> of the queue is that element that has been on the 24 * queue the longest time. The <em>tail</em> of the queue is that 25 * element that has been on the queue the shortest time. New elements 26 * are inserted at the tail of the queue, and the queue retrieval 27 * operations obtain elements at the head of the queue. 28 * 29 * <p>This is a classic "bounded buffer", in which a 30 * fixed-sized array holds elements inserted by producers and 31 * extracted by consumers. Once created, the capacity cannot be 32 * changed. Attempts to {@code put} an element into a full queue 33 * will result in the operation blocking; attempts to {@code take} an 34 * element from an empty queue will similarly block. 35 * 36 * <p>This class supports an optional fairness policy for ordering 37 * waiting producer and consumer threads. By default, this ordering 38 * is not guaranteed. However, a queue constructed with fairness set 39 * to {@code true} grants threads access in FIFO order. Fairness 40 * generally decreases throughput but reduces variability and avoids 41 * starvation. 
42 * 43 * <p>This class and its iterator implement all of the 44 * <em>optional</em> methods of the {@link Collection} and {@link 45 * Iterator} interfaces. 46 * 47 * @since 1.5 48 * @author Doug Lea 49 * @param <E> the type of elements held in this collection 50 */ 51 public class ArrayBlockingQueue<E> extends AbstractQueue<E> 52 implements BlockingQueue<E>, java.io.Serializable { 53 54 /** 55 * Serialization ID. This class relies on default serialization 56 * even for the items array, which is default-serialized, even if 57 * it is empty. Otherwise it could not be declared final, which is 58 * necessary here. 59 */ 60 private static final long serialVersionUID = -817911632652898426L; 61 62 /** The queued items */ 63 final Object[] items; 64 65 /** items index for next take, poll, peek or remove */ 66 int takeIndex; 67 68 /** items index for next put, offer, or add */ 69 int putIndex; 70 71 /** Number of elements in the queue */ 72 int count; 73 74 /* 75 * Concurrency control uses the classic two-condition algorithm 76 * found in any textbook. 77 */ 78 79 /** Main lock guarding all access */ 80 final ReentrantLock lock; 81 82 /** Condition for waiting takes */ 83 private final Condition notEmpty; 84 85 /** Condition for waiting puts */ 86 private final Condition notFull; 87 88 /** 89 * Shared state for currently active iterators, or null if there 90 * are known not to be any. Allows queue operations to update 91 * iterator state. 92 */ 93 transient Itrs itrs = null; 94 95 // Internal helper methods 96 97 /** 98 * Circularly increment i. 99 */ 100 final int inc(int i) { 101 return (++i == items.length) ? 0 : i; 102 } 103 104 /** 105 * Circularly decrement i. 106 */ 107 final int dec(int i) { 108 return ((i == 0) ? items.length : i) - 1; 109 } 110 111 /** 112 * Returns item at index i. 113 */ 114 final E itemAt(int i) { 115 @SuppressWarnings("unchecked") E x = (E) items[i]; 116 return x; 117 } 118 119 /** 120 * Throws NullPointerException if argument is null. 
121 * 122 * @param v the element 123 */ 124 private static void checkNotNull(Object v) { 125 if (v == null) 126 throw new NullPointerException(); 127 } 128 129 /** 130 * Inserts element at current put position, advances, and signals. 131 * Call only when holding lock. 132 */ 133 private void enqueue(E x) { 134 // assert lock.getHoldCount() == 1; 135 // assert items[putIndex] == null; 136 items[putIndex] = x; 137 putIndex = inc(putIndex); 138 count++; 139 notEmpty.signal(); 140 } 141 142 /** 143 * Extracts element at current take position, advances, and signals. 144 * Call only when holding lock. 145 */ 146 private E dequeue() { 147 // assert lock.getHoldCount() == 1; 148 // assert items[takeIndex] != null; 149 final Object[] items = this.items; 150 @SuppressWarnings("unchecked") E x = (E) items[takeIndex]; 151 items[takeIndex] = null; 152 takeIndex = inc(takeIndex); 153 count--; 154 if (itrs != null) 155 itrs.elementDequeued(); 156 notFull.signal(); 157 return x; 158 } 159 160 /** 161 * Deletes item at array index removeIndex. 162 * Utility for remove(Object) and iterator.remove. 163 * Call only when holding lock. 164 */ 165 void removeAt(final int removeIndex) { 166 // assert lock.getHoldCount() == 1; 167 // assert items[removeIndex] != null; 168 // assert removeIndex >= 0 && removeIndex < items.length; 169 final Object[] items = this.items; 170 if (removeIndex == takeIndex) { 171 // removing front item; just advance 172 items[takeIndex] = null; 173 takeIndex = inc(takeIndex); 174 count--; 175 if (itrs != null) 176 itrs.elementDequeued(); 177 } else { 178 // an "interior" remove 179 180 // slide over all others up through putIndex. 
181 final int putIndex = this.putIndex; 182 for (int i = removeIndex;;) { 183 int next = inc(i); 184 if (next != putIndex) { 185 items[i] = items[next]; 186 i = next; 187 } else { 188 items[i] = null; 189 this.putIndex = i; 190 break; 191 } 192 } 193 count--; 194 if (itrs != null) 195 itrs.removedAt(removeIndex); 196 } 197 notFull.signal(); 198 } 199 200 /** 201 * Creates an {@code ArrayBlockingQueue} with the given (fixed) 202 * capacity and default access policy. 203 * 204 * @param capacity the capacity of this queue 205 * @throws IllegalArgumentException if {@code capacity < 1} 206 */ 207 public ArrayBlockingQueue(int capacity) { 208 this(capacity, false); 209 } 210 211 /** 212 * Creates an {@code ArrayBlockingQueue} with the given (fixed) 213 * capacity and the specified access policy. 214 * 215 * @param capacity the capacity of this queue 216 * @param fair if {@code true} then queue accesses for threads blocked 217 * on insertion or removal, are processed in FIFO order; 218 * if {@code false} the access order is unspecified. 219 * @throws IllegalArgumentException if {@code capacity < 1} 220 */ 221 public ArrayBlockingQueue(int capacity, boolean fair) { 222 if (capacity <= 0) 223 throw new IllegalArgumentException(); 224 this.items = new Object[capacity]; 225 lock = new ReentrantLock(fair); 226 notEmpty = lock.newCondition(); 227 notFull = lock.newCondition(); 228 } 229 230 /** 231 * Creates an {@code ArrayBlockingQueue} with the given (fixed) 232 * capacity, the specified access policy and initially containing the 233 * elements of the given collection, 234 * added in traversal order of the collection's iterator. 235 * 236 * @param capacity the capacity of this queue 237 * @param fair if {@code true} then queue accesses for threads blocked 238 * on insertion or removal, are processed in FIFO order; 239 * if {@code false} the access order is unspecified. 
240 * @param c the collection of elements to initially contain 241 * @throws IllegalArgumentException if {@code capacity} is less than 242 * {@code c.size()}, or less than 1. 243 * @throws NullPointerException if the specified collection or any 244 * of its elements are null 245 */ 246 public ArrayBlockingQueue(int capacity, boolean fair, 247 Collection<? extends E> c) { 248 this(capacity, fair); 249 250 final ReentrantLock lock = this.lock; 251 lock.lock(); // Lock only for visibility, not mutual exclusion 252 try { 253 int i = 0; 254 try { 255 for (E e : c) { 256 checkNotNull(e); 257 items[i++] = e; 258 } 259 } catch (ArrayIndexOutOfBoundsException ex) { 260 throw new IllegalArgumentException(); 261 } 262 count = i; 263 putIndex = (i == capacity) ? 0 : i; 264 } finally { 265 lock.unlock(); 266 } 267 } 268 269 /** 270 * Inserts the specified element at the tail of this queue if it is 271 * possible to do so immediately without exceeding the queue's capacity, 272 * returning {@code true} upon success and throwing an 273 * {@code IllegalStateException} if this queue is full. 274 * 275 * @param e the element to add 276 * @return {@code true} (as specified by {@link Collection#add}) 277 * @throws IllegalStateException if this queue is full 278 * @throws NullPointerException if the specified element is null 279 */ 280 public boolean add(E e) { 281 return super.add(e); 282 } 283 284 /** 285 * Inserts the specified element at the tail of this queue if it is 286 * possible to do so immediately without exceeding the queue's capacity, 287 * returning {@code true} upon success and {@code false} if this queue 288 * is full. This method is generally preferable to method {@link #add}, 289 * which can fail to insert an element only by throwing an exception. 
290 * 291 * @throws NullPointerException if the specified element is null 292 */ 293 public boolean offer(E e) { 294 checkNotNull(e); 295 final ReentrantLock lock = this.lock; 296 lock.lock(); 297 try { 298 if (count == items.length) 299 return false; 300 else { 301 enqueue(e); 302 return true; 303 } 304 } finally { 305 lock.unlock(); 306 } 307 } 308 309 /** 310 * Inserts the specified element at the tail of this queue, waiting 311 * for space to become available if the queue is full. 312 * 313 * @throws InterruptedException {@inheritDoc} 314 * @throws NullPointerException {@inheritDoc} 315 */ 316 public void put(E e) throws InterruptedException { 317 checkNotNull(e); 318 final ReentrantLock lock = this.lock; 319 lock.lockInterruptibly(); 320 try { 321 while (count == items.length) 322 notFull.await(); 323 enqueue(e); 324 } finally { 325 lock.unlock(); 326 } 327 } 328 329 /** 330 * Inserts the specified element at the tail of this queue, waiting 331 * up to the specified wait time for space to become available if 332 * the queue is full. 333 * 334 * @throws InterruptedException {@inheritDoc} 335 * @throws NullPointerException {@inheritDoc} 336 */ 337 public boolean offer(E e, long timeout, TimeUnit unit) 338 throws InterruptedException { 339 340 checkNotNull(e); 341 long nanos = unit.toNanos(timeout); 342 final ReentrantLock lock = this.lock; 343 lock.lockInterruptibly(); 344 try { 345 while (count == items.length) { 346 if (nanos <= 0) 347 return false; 348 nanos = notFull.awaitNanos(nanos); 349 } 350 enqueue(e); 351 return true; 352 } finally { 353 lock.unlock(); 354 } 355 } 356 357 public E poll() { 358 final ReentrantLock lock = this.lock; 359 lock.lock(); 360 try { 361 return (count == 0) ? 
null : dequeue(); 362 } finally { 363 lock.unlock(); 364 } 365 } 366 367 public E take() throws InterruptedException { 368 final ReentrantLock lock = this.lock; 369 lock.lockInterruptibly(); 370 try { 371 while (count == 0) 372 notEmpty.await(); 373 return dequeue(); 374 } finally { 375 lock.unlock(); 376 } 377 } 378 379 public E poll(long timeout, TimeUnit unit) throws InterruptedException { 380 long nanos = unit.toNanos(timeout); 381 final ReentrantLock lock = this.lock; 382 lock.lockInterruptibly(); 383 try { 384 while (count == 0) { 385 if (nanos <= 0) 386 return null; 387 nanos = notEmpty.awaitNanos(nanos); 388 } 389 return dequeue(); 390 } finally { 391 lock.unlock(); 392 } 393 } 394 395 public E peek() { 396 final ReentrantLock lock = this.lock; 397 lock.lock(); 398 try { 399 return (count == 0) ? null : itemAt(takeIndex); 400 } finally { 401 lock.unlock(); 402 } 403 } 404 405 // this doc comment is overridden to remove the reference to collections 406 // greater in size than Integer.MAX_VALUE 407 /** 408 * Returns the number of elements in this queue. 409 * 410 * @return the number of elements in this queue 411 */ 412 public int size() { 413 final ReentrantLock lock = this.lock; 414 lock.lock(); 415 try { 416 return count; 417 } finally { 418 lock.unlock(); 419 } 420 } 421 422 // this doc comment is a modified copy of the inherited doc comment, 423 // without the reference to unlimited queues. 424 /** 425 * Returns the number of additional elements that this queue can ideally 426 * (in the absence of memory or resource constraints) accept without 427 * blocking. This is always equal to the initial capacity of this queue 428 * less the current {@code size} of this queue. 429 * 430 * <p>Note that you <em>cannot</em> always tell if an attempt to insert 431 * an element will succeed by inspecting {@code remainingCapacity} 432 * because it may be the case that another thread is about to 433 * insert or remove an element. 
434 */ 435 public int remainingCapacity() { 436 final ReentrantLock lock = this.lock; 437 lock.lock(); 438 try { 439 return items.length - count; 440 } finally { 441 lock.unlock(); 442 } 443 } 444 445 /** 446 * Removes a single instance of the specified element from this queue, 447 * if it is present. More formally, removes an element {@code e} such 448 * that {@code o.equals(e)}, if this queue contains one or more such 449 * elements. 450 * Returns {@code true} if this queue contained the specified element 451 * (or equivalently, if this queue changed as a result of the call). 452 * 453 * <p>Removal of interior elements in circular array based queues 454 * is an intrinsically slow and disruptive operation, so should 455 * be undertaken only in exceptional circumstances, ideally 456 * only when the queue is known not to be accessible by other 457 * threads. 458 * 459 * @param o element to be removed from this queue, if present 460 * @return {@code true} if this queue changed as a result of the call 461 */ 462 public boolean remove(Object o) { 463 if (o == null) return false; 464 final Object[] items = this.items; 465 final ReentrantLock lock = this.lock; 466 lock.lock(); 467 try { 468 if (count > 0) { 469 final int putIndex = this.putIndex; 470 int i = takeIndex; 471 do { 472 if (o.equals(items[i])) { 473 removeAt(i); 474 return true; 475 } 476 } while ((i = inc(i)) != putIndex); 477 } 478 return false; 479 } finally { 480 lock.unlock(); 481 } 482 } 483 484 /** 485 * Returns {@code true} if this queue contains the specified element. 486 * More formally, returns {@code true} if and only if this queue contains 487 * at least one element {@code e} such that {@code o.equals(e)}. 
488 * 489 * @param o object to be checked for containment in this queue 490 * @return {@code true} if this queue contains the specified element 491 */ 492 public boolean contains(Object o) { 493 if (o == null) return false; 494 final Object[] items = this.items; 495 final ReentrantLock lock = this.lock; 496 lock.lock(); 497 try { 498 if (count > 0) { 499 final int putIndex = this.putIndex; 500 int i = takeIndex; 501 do { 502 if (o.equals(items[i])) 503 return true; 504 } while ((i = inc(i)) != putIndex); 505 } 506 return false; 507 } finally { 508 lock.unlock(); 509 } 510 } 511 512 /** 513 * Returns an array containing all of the elements in this queue, in 514 * proper sequence. 515 * 516 * <p>The returned array will be "safe" in that no references to it are 517 * maintained by this queue. (In other words, this method must allocate 518 * a new array). The caller is thus free to modify the returned array. 519 * 520 * <p>This method acts as bridge between array-based and collection-based 521 * APIs. 522 * 523 * @return an array containing all of the elements in this queue 524 */ 525 public Object[] toArray() { 526 final Object[] items = this.items; 527 final ReentrantLock lock = this.lock; 528 lock.lock(); 529 try { 530 final int count = this.count; 531 Object[] a = new Object[count]; 532 for (int i = takeIndex, k = 0; k < count; i = inc(i), k++) 533 a[k] = items[i]; 534 return a; 535 } finally { 536 lock.unlock(); 537 } 538 } 539 540 /** 541 * Returns an array containing all of the elements in this queue, in 542 * proper sequence; the runtime type of the returned array is that of 543 * the specified array. If the queue fits in the specified array, it 544 * is returned therein. Otherwise, a new array is allocated with the 545 * runtime type of the specified array and the size of this queue. 
546 * 547 * <p>If this queue fits in the specified array with room to spare 548 * (i.e., the array has more elements than this queue), the element in 549 * the array immediately following the end of the queue is set to 550 * {@code null}. 551 * 552 * <p>Like the {@link #toArray()} method, this method acts as bridge between 553 * array-based and collection-based APIs. Further, this method allows 554 * precise control over the runtime type of the output array, and may, 555 * under certain circumstances, be used to save allocation costs. 556 * 557 * <p>Suppose {@code x} is a queue known to contain only strings. 558 * The following code can be used to dump the queue into a newly 559 * allocated array of {@code String}: 560 * 561 * <pre> {@code String[] y = x.toArray(new String[0]);}</pre> 562 * 563 * Note that {@code toArray(new Object[0])} is identical in function to 564 * {@code toArray()}. 565 * 566 * @param a the array into which the elements of the queue are to 567 * be stored, if it is big enough; otherwise, a new array of the 568 * same runtime type is allocated for this purpose 569 * @return an array containing all of the elements in this queue 570 * @throws ArrayStoreException if the runtime type of the specified array 571 * is not a supertype of the runtime type of every element in 572 * this queue 573 * @throws NullPointerException if the specified array is null 574 */ 575 @SuppressWarnings("unchecked") 576 public <T> T[] toArray(T[] a) { 577 final Object[] items = this.items; 578 final ReentrantLock lock = this.lock; 579 lock.lock(); 580 try { 581 final int count = this.count; 582 final int len = a.length; 583 if (len < count) 584 a = (T[])java.lang.reflect.Array.newInstance( 585 a.getClass().getComponentType(), count); 586 for (int i = takeIndex, k = 0; k < count; i = inc(i), k++) 587 a[k] = (T) items[i]; 588 if (len > count) 589 a[count] = null; 590 return a; 591 } finally { 592 lock.unlock(); 593 } 594 } 595 596 public String toString() { 597 final 
ReentrantLock lock = this.lock; 598 lock.lock(); 599 try { 600 int k = count; 601 if (k == 0) 602 return "[]"; 603 604 StringBuilder sb = new StringBuilder(); 605 sb.append('['); 606 for (int i = takeIndex; ; i = inc(i)) { 607 Object e = items[i]; 608 sb.append(e == this ? "(this Collection)" : e); 609 if (--k == 0) 610 return sb.append(']').toString(); 611 sb.append(',').append(' '); 612 } 613 } finally { 614 lock.unlock(); 615 } 616 } 617 618 /** 619 * Atomically removes all of the elements from this queue. 620 * The queue will be empty after this call returns. 621 */ 622 public void clear() { 623 final Object[] items = this.items; 624 final ReentrantLock lock = this.lock; 625 lock.lock(); 626 try { 627 int k = count; 628 if (k > 0) { 629 final int putIndex = this.putIndex; 630 int i = takeIndex; 631 do { 632 items[i] = null; 633 } while ((i = inc(i)) != putIndex); 634 takeIndex = putIndex; 635 count = 0; 636 if (itrs != null) 637 itrs.queueIsEmpty(); 638 for (; k > 0 && lock.hasWaiters(notFull); k--) 639 notFull.signal(); 640 } 641 } finally { 642 lock.unlock(); 643 } 644 } 645 646 /** 647 * @throws UnsupportedOperationException {@inheritDoc} 648 * @throws ClassCastException {@inheritDoc} 649 * @throws NullPointerException {@inheritDoc} 650 * @throws IllegalArgumentException {@inheritDoc} 651 */ 652 public int drainTo(Collection<? super E> c) { 653 return drainTo(c, Integer.MAX_VALUE); 654 } 655 656 /** 657 * @throws UnsupportedOperationException {@inheritDoc} 658 * @throws ClassCastException {@inheritDoc} 659 * @throws NullPointerException {@inheritDoc} 660 * @throws IllegalArgumentException {@inheritDoc} 661 */ 662 public int drainTo(Collection<? 
super E> c, int maxElements) { 663 checkNotNull(c); 664 if (c == this) 665 throw new IllegalArgumentException(); 666 if (maxElements <= 0) 667 return 0; 668 final Object[] items = this.items; 669 final ReentrantLock lock = this.lock; 670 lock.lock(); 671 try { 672 int n = Math.min(maxElements, count); 673 int take = takeIndex; 674 int i = 0; 675 try { 676 while (i < n) { 677 @SuppressWarnings("unchecked") E x = (E) items[take]; 678 c.add(x); 679 items[take] = null; 680 take = inc(take); 681 i++; 682 } 683 return n; 684 } finally { 685 // Restore invariants even if c.add() threw 686 if (i > 0) { 687 count -= i; 688 takeIndex = take; 689 if (itrs != null) { 690 if (count == 0) 691 itrs.queueIsEmpty(); 692 else if (i > take) 693 itrs.takeIndexWrapped(); 694 } 695 for (; i > 0 && lock.hasWaiters(notFull); i--) 696 notFull.signal(); 697 } 698 } 699 } finally { 700 lock.unlock(); 701 } 702 } 703 704 /** 705 * Returns an iterator over the elements in this queue in proper sequence. 706 * The elements will be returned in order from first (head) to last (tail). 707 * 708 * <p>The returned iterator is a "weakly consistent" iterator that 709 * will never throw {@link java.util.ConcurrentModificationException 710 * ConcurrentModificationException}, and guarantees to traverse 711 * elements as they existed upon construction of the iterator, and 712 * may (but is not guaranteed to) reflect any modifications 713 * subsequent to construction. 714 * 715 * @return an iterator over the elements in this queue in proper sequence 716 */ 717 public Iterator<E> iterator() { 718 return new Itr(); 719 } 720 721 /** 722 * Shared data between iterators and their queue, allowing queue 723 * modifications to update iterators when elements are removed. 
724 * 725 * This adds a lot of complexity for the sake of correctly 726 * handling some uncommon operations, but the combination of 727 * circular-arrays and supporting interior removes (i.e., those 728 * not at head) would cause iterators to sometimes lose their 729 * places and/or (re)report elements they shouldn't. To avoid 730 * this, when a queue has one or more iterators, it keeps iterator 731 * state consistent by: 732 * 733 * (1) keeping track of the number of "cycles", that is, the 734 * number of times takeIndex has wrapped around to 0. 735 * (2) notifying all iterators via the callback removedAt whenever 736 * an interior element is removed (and thus other elements may 737 * be shifted). 738 * 739 * These suffice to eliminate iterator inconsistencies, but 740 * unfortunately add the secondary responsibility of maintaining 741 * the list of iterators. We track all active iterators in a 742 * simple linked list (accessed only when the queue's lock is 743 * held) of weak references to Itr. The list is cleaned up using 744 * 3 different mechanisms: 745 * 746 * (1) Whenever a new iterator is created, do some O(1) checking for 747 * stale list elements. 748 * 749 * (2) Whenever takeIndex wraps around to 0, check for iterators 750 * that have been unused for more than one wrap-around cycle. 751 * 752 * (3) Whenever the queue becomes empty, all iterators are notified 753 * and this entire data structure is discarded. 754 * 755 * So in addition to the removedAt callback that is necessary for 756 * correctness, iterators have the shutdown and takeIndexWrapped 757 * callbacks that help remove stale iterators from the list. 758 * 759 * Whenever a list element is examined, it is expunged if either 760 * the GC has determined that the iterator is discarded, or if the 761 * iterator reports that it is "detached" (does not need any 762 * further state updates). 
     * Overhead is maximal when takeIndex
     * never advances, iterators are discarded before they are
     * exhausted, and all removals are interior removes, in which case
     * all stale iterators are discovered by the GC.  But even in this
     * case we don't increase the amortized complexity.
     *
     * Care must be taken to keep list sweeping methods from
     * reentrantly invoking another such method, causing subtle
     * corruption bugs.
     */
    class Itrs {

        /**
         * Node in a linked list of weak iterator references.
         */
        private class Node extends WeakReference<Itr> {
            /** Next node in the singly linked list, or null at the tail. */
            Node next;

            /**
             * Creates a node weakly referencing {@code iterator} and
             * linked in front of {@code next}.
             */
            Node(Itr iterator, Node next) {
                super(iterator);
                this.next = next;
            }
        }

        /** Incremented whenever takeIndex wraps around to 0 */
        int cycles = 0;

        /** Linked list of weak iterator references */
        private Node head;

        /** Used to expunge stale iterators */
        private Node sweeper = null;

        /** Number of nodes probed by a sweep that is not in try-harder mode. */
        private static final int SHORT_SWEEP_PROBES = 4;
        /**
         * Number of nodes probed in try-harder mode; also the value the
         * probe budget is reset to each time a stale node is found.
         */
        private static final int LONG_SWEEP_PROBES = 16;

        /**
         * Creates the iterator list with {@code initial} registered as
         * its first entry.
         */
        Itrs(Itr initial) {
            register(initial);
        }

        /**
         * Sweeps itrs, looking for and expunging stale iterators.
         * If at least one was found, tries harder to find more.
         * Called only from iterating thread.
         *
         * @param tryHarder whether to start in try-harder mode, because
         * there is known to be at least one iterator to collect
         */
        void doSomeSweeping(boolean tryHarder) {
            // assert lock.getHoldCount() == 1;
            // assert head != null;
            int probes = tryHarder ? LONG_SWEEP_PROBES : SHORT_SWEEP_PROBES;
            // o trails p: o is the node before p, or null when p is head
            Node o, p;
            final Node sweeper = this.sweeper;
            boolean passedGo;   // to limit search to one full sweep

            if (sweeper == null) {
                // no saved position: start at the head of the list
                o = null;
                p = head;
                passedGo = true;
            } else {
                // resume just after the node saved by the previous sweep
                o = sweeper;
                p = o.next;
                passedGo = false;
            }

            for (; probes > 0; probes--) {
                if (p == null) {
                    // fell off the tail: wrap to head once, then stop
                    if (passedGo)
                        break;
                    o = null;
                    p = head;
                    passedGo = true;
                }
                final Itr it = p.get();
                final Node next = p.next;
                if (it == null || it.isDetached()) {
                    // found a discarded/exhausted iterator
                    probes = LONG_SWEEP_PROBES; // "try harder"
                    // unlink p
                    p.clear();
                    p.next = null;
                    if (o == null) {
                        head = next;
                        if (next == null) {
                            // We've run out of iterators to track; retire
                            itrs = null;
                            return;
                        }
                    }
                    else
                        o.next = next;
                } else {
                    o = p;
                }
                p = next;
            }

            // save the predecessor of the next node to probe; null makes
            // the next sweep start from head
            this.sweeper = (p == null) ? null : o;
        }

        /**
         * Adds a new iterator to the linked list of tracked iterators.
         * The new node becomes the head of the list.
         */
        void register(Itr itr) {
            // assert lock.getHoldCount() == 1;
            head = new Node(itr, head);
        }

        /**
         * Called whenever takeIndex wraps around to 0.
         *
         * Notifies all iterators, and expunges any that are now stale.
         */
        void takeIndexWrapped() {
            // assert lock.getHoldCount() == 1;
            cycles++;
            for (Node o = null, p = head; p != null;) {
                final Itr it = p.get();
                final Node next = p.next;
                // a cleared reference, or a true result from the iterator's
                // callback, causes the node to be unlinked
                if (it == null || it.takeIndexWrapped()) {
                    // unlink p
                    // assert it == null || it.isDetached();
                    p.clear();
                    p.next = null;
                    if (o == null)
                        head = next;
                    else
                        o.next = next;
                } else {
                    o = p;
                }
                p = next;
            }
            if (head == null)   // no more iterators to track
                itrs = null;
        }

        /**
         * Called whenever an interior remove (not at takeIndex) occurred.
         *
         * Notifies all iterators, and expunges any that are now stale.
         */
        void removedAt(int removedIndex) {
            for (Node o = null, p = head; p != null;) {
                final Itr it = p.get();
                final Node next = p.next;
                // a cleared reference, or a true result from the iterator's
                // callback, causes the node to be unlinked
                if (it == null || it.removedAt(removedIndex)) {
                    // unlink p
                    // assert it == null || it.isDetached();
                    p.clear();
                    p.next = null;
                    if (o == null)
                        head = next;
                    else
                        o.next = next;
                } else {
                    o = p;
                }
                p = next;
            }
            if (head == null)   // no more iterators to track
                itrs = null;
        }

        /**
         * Called whenever the queue becomes empty.
         *
         * Notifies all active iterators that the queue is empty,
         * clears all weak refs, and unlinks the itrs data structure.
         */
        void queueIsEmpty() {
            // assert lock.getHoldCount() == 1;
            for (Node p = head; p != null; p = p.next) {
                Itr it = p.get();
                if (it != null) {
                    p.clear();
                    it.shutdown();
                }
            }
            head = null;
            itrs = null;
        }

        /**
         * Called whenever an element has been dequeued (at takeIndex).
         * Dispatches to queueIsEmpty() when the queue's count has reached
         * zero, otherwise to takeIndexWrapped() when takeIndex is now 0.
         */
        void elementDequeued() {
            // assert lock.getHoldCount() == 1;
            if (count == 0)
                queueIsEmpty();
            else if (takeIndex == 0)
                takeIndexWrapped();
        }
    }

    /**
     * Iterator for ArrayBlockingQueue.
     *
     * To maintain weak consistency with respect to puts and takes, we
     * read ahead one slot, so as to not report hasNext true but then
     * not have an element to return.
     *
     * We switch into "detached" mode (allowing prompt unlinking from
     * itrs without help from the GC) when all indices are negative, or
     * when hasNext returns false for the first time.  This allows the
     * iterator to track concurrent updates completely accurately,
     * except for the corner case of the user calling Iterator.remove()
     * after hasNext() returned false.  Even in this case, we ensure
     * that we don't remove the wrong element by keeping track of the
     * expected element to remove, in lastItem.
     * Yes, we may fail to
     * remove lastItem from the queue if it moved due to an interleaved
     * interior remove while in detached mode.
     */
    private class Itr implements Iterator<E> {
        /** Index to look for new nextItem; NONE at end */
        private int cursor;

        /** Element to be returned by next call to next(); null if none */
        private E nextItem;

        /** Index of nextItem; NONE if none, REMOVED if removed elsewhere */
        private int nextIndex;

        /** Last element returned; null if none or not detached. */
        private E lastItem;

        /** Index of lastItem, NONE if none, REMOVED if removed elsewhere */
        private int lastRet;

        /** Previous value of takeIndex, or DETACHED when detached */
        private int prevTakeIndex;

        /** Previous value of itrs.cycles */
        private int prevCycles;

        /** Special index value indicating "not available" or "undefined" */
        private static final int NONE = -1;

        /**
         * Special index value indicating "removed elsewhere", that is,
         * removed by some operation other than a call to this.remove().
         */
        private static final int REMOVED = -2;

        /** Special value for prevTakeIndex indicating "detached mode" */
        private static final int DETACHED = -3;

        /**
         * Creates an iterator positioned at the current head of the
         * queue. Takes the queue lock while initializing. For a
         * non-empty queue the first element is read ahead into nextItem
         * and the iterator is registered with itrs; for an empty queue
         * the iterator starts out detached and is not registered.
         */
        Itr() {
            // assert lock.getHoldCount() == 0;
            lastRet = NONE;
            final ReentrantLock lock = ArrayBlockingQueue.this.lock;
            lock.lock();
            try {
                if (count == 0) {
                    // nothing to iterate over: start in detached mode
                    // assert itrs == null;
                    cursor = NONE;
                    nextIndex = NONE;
                    prevTakeIndex = DETACHED;
                } else {
                    final int takeIndex = ArrayBlockingQueue.this.takeIndex;
                    prevTakeIndex = takeIndex;
                    // read ahead the head element; cursor is the slot after it
                    nextItem = itemAt(nextIndex = takeIndex);
                    cursor = incCursor(takeIndex);
                    if (itrs == null) {
                        itrs = new Itrs(this);
                    } else {
                        itrs.register(this); // in this order
                        itrs.doSomeSweeping(false);
                    }
                    prevCycles = itrs.cycles;
                    // assert takeIndex >= 0;
                    // assert prevTakeIndex == takeIndex;
                    // assert nextIndex >= 0;
                    // assert nextItem != null;
                }
            } finally {
                lock.unlock();
            }
        }

        /**
         * Returns true if this iterator is in detached mode, i.e.
         * prevTakeIndex holds a negative value.
         */
        boolean isDetached() {
            // assert lock.getHoldCount() == 1;
            return prevTakeIndex < 0;
        }

        /**
         * Returns the circular successor of index, or NONE if that
         * successor equals the queue's putIndex.
         */
        private int incCursor(int index) {
            // assert lock.getHoldCount() == 1;
            index = inc(index);
            if (index == putIndex)
                index = NONE;
            return index;
        }

        /**
         * Returns true if index is invalidated by the given number of
         * dequeues, starting from prevTakeIndex.
         * A negative index (one of the special values) is never invalidated.
         */
        private boolean invalidated(int index, int prevTakeIndex,
                                    long dequeues, int length) {
            if (index < 0)
                return false;
            // circular distance from prevTakeIndex forward to index
            int distance = index - prevTakeIndex;
            if (distance < 0)
                distance += length;
            return dequeues > distance;
        }

        /**
         * Adjusts indices to incorporate all dequeues since the last
         * operation on this iterator.  Call only from iterating thread.
1073 */ 1074 private void incorporateDequeues() { 1075 // assert lock.getHoldCount() == 1; 1076 // assert itrs != null; 1077 // assert !isDetached(); 1078 // assert count > 0; 1079 1080 final int cycles = itrs.cycles; 1081 final int takeIndex = ArrayBlockingQueue.this.takeIndex; 1082 final int prevCycles = this.prevCycles; 1083 final int prevTakeIndex = this.prevTakeIndex; 1084 1085 if (cycles != prevCycles || takeIndex != prevTakeIndex) { 1086 final int len = items.length; 1087 // how far takeIndex has advanced since the previous 1088 // operation of this iterator 1089 long dequeues = (cycles - prevCycles) * len 1090 + (takeIndex - prevTakeIndex); 1091 1092 // Check indices for invalidation 1093 if (invalidated(lastRet, prevTakeIndex, dequeues, len)) 1094 lastRet = REMOVED; 1095 if (invalidated(nextIndex, prevTakeIndex, dequeues, len)) 1096 nextIndex = REMOVED; 1097 if (invalidated(cursor, prevTakeIndex, dequeues, len)) 1098 cursor = takeIndex; 1099 1100 if (cursor < 0 && nextIndex < 0 && lastRet < 0) 1101 detach(); 1102 else { 1103 this.prevCycles = cycles; 1104 this.prevTakeIndex = takeIndex; 1105 } 1106 } 1107 } 1108 1109 /** 1110 * Called when itrs should stop tracking this iterator, either 1111 * because there are no more indices to update (cursor < 0 && 1112 * nextIndex < 0 && lastRet < 0) or as a special exception, when 1113 * lastRet >= 0, because hasNext() is about to return false for the 1114 * first time. Call only from iterating thread. 
1115 */ 1116 private void detach() { 1117 // Switch to detached mode 1118 // assert lock.getHoldCount() == 1; 1119 // assert cursor == NONE; 1120 // assert nextIndex < 0; 1121 // assert lastRet < 0 || nextItem == null; 1122 // assert lastRet < 0 ^ lastItem != null; 1123 if (prevTakeIndex >= 0) { 1124 // assert itrs != null; 1125 prevTakeIndex = DETACHED; 1126 // try to unlink from itrs (but not too hard) 1127 itrs.doSomeSweeping(true); 1128 } 1129 } 1130 1131 /** 1132 * For performance reasons, we would like not to acquire a lock in 1133 * hasNext in the common case. To allow for this, we only access 1134 * fields (i.e. nextItem) that are not modified by update operations 1135 * triggered by queue modifications. 1136 */ 1137 public boolean hasNext() { 1138 // assert lock.getHoldCount() == 0; 1139 if (nextItem != null) 1140 return true; 1141 noNext(); 1142 return false; 1143 } 1144 1145 private void noNext() { 1146 final ReentrantLock lock = ArrayBlockingQueue.this.lock; 1147 lock.lock(); 1148 try { 1149 // assert cursor == NONE; 1150 // assert nextIndex == NONE; 1151 if (!isDetached()) { 1152 // assert lastRet >= 0; 1153 incorporateDequeues(); // might update lastRet 1154 if (lastRet >= 0) { 1155 lastItem = itemAt(lastRet); 1156 // assert lastItem != null; 1157 detach(); 1158 } 1159 } 1160 // assert isDetached(); 1161 // assert lastRet < 0 ^ lastItem != null; 1162 } finally { 1163 lock.unlock(); 1164 } 1165 } 1166 1167 public E next() { 1168 // assert lock.getHoldCount() == 0; 1169 final E x = nextItem; 1170 if (x == null) 1171 throw new NoSuchElementException(); 1172 final ReentrantLock lock = ArrayBlockingQueue.this.lock; 1173 lock.lock(); 1174 try { 1175 if (!isDetached()) 1176 incorporateDequeues(); 1177 // assert nextIndex != NONE; 1178 // assert lastItem == null; 1179 lastRet = nextIndex; 1180 final int cursor = this.cursor; 1181 if (cursor >= 0) { 1182 nextItem = itemAt(nextIndex = cursor); 1183 // assert nextItem != null; 1184 this.cursor = 
incCursor(cursor); 1185 } else { 1186 nextIndex = NONE; 1187 nextItem = null; 1188 } 1189 } finally { 1190 lock.unlock(); 1191 } 1192 return x; 1193 } 1194 1195 public void remove() { 1196 // assert lock.getHoldCount() == 0; 1197 final ReentrantLock lock = ArrayBlockingQueue.this.lock; 1198 lock.lock(); 1199 try { 1200 if (!isDetached()) 1201 incorporateDequeues(); // might update lastRet or detach 1202 final int lastRet = this.lastRet; 1203 this.lastRet = NONE; 1204 if (lastRet >= 0) { 1205 if (!isDetached()) 1206 removeAt(lastRet); 1207 else { 1208 final E lastItem = this.lastItem; 1209 // assert lastItem != null; 1210 this.lastItem = null; 1211 if (itemAt(lastRet) == lastItem) 1212 removeAt(lastRet); 1213 } 1214 } else if (lastRet == NONE) 1215 throw new IllegalStateException(); 1216 // else lastRet == REMOVED and the last returned element was 1217 // previously asynchronously removed via an operation other 1218 // than this.remove(), so nothing to do. 1219 1220 if (cursor < 0 && nextIndex < 0) 1221 detach(); 1222 } finally { 1223 lock.unlock(); 1224 // assert lastRet == NONE; 1225 // assert lastItem == null; 1226 } 1227 } 1228 1229 /** 1230 * Called to notify the iterator that the queue is empty, or that it 1231 * has fallen hopelessly behind, so that it should abandon any 1232 * further iteration, except possibly to return one more element 1233 * from next(), as promised by returning true from hasNext(). 1234 */ 1235 void shutdown() { 1236 // assert lock.getHoldCount() == 1; 1237 cursor = NONE; 1238 if (nextIndex >= 0) 1239 nextIndex = REMOVED; 1240 if (lastRet >= 0) { 1241 lastRet = REMOVED; 1242 lastItem = null; 1243 } 1244 prevTakeIndex = DETACHED; 1245 // Don't set nextItem to null because we must continue to be 1246 // able to return it on next(). 1247 // 1248 // Caller will unlink from itrs when convenient. 
1249 } 1250 1251 private int distance(int index, int prevTakeIndex, int length) { 1252 int distance = index - prevTakeIndex; 1253 if (distance < 0) 1254 distance += length; 1255 return distance; 1256 } 1257 1258 /** 1259 * Called whenever an interior remove (not at takeIndex) occured. 1260 * 1261 * @return true if this iterator should be unlinked from itrs 1262 */ 1263 boolean removedAt(int removedIndex) { 1264 // assert lock.getHoldCount() == 1; 1265 if (isDetached()) 1266 return true; 1267 1268 final int cycles = itrs.cycles; 1269 final int takeIndex = ArrayBlockingQueue.this.takeIndex; 1270 final int prevCycles = this.prevCycles; 1271 final int prevTakeIndex = this.prevTakeIndex; 1272 final int len = items.length; 1273 int cycleDiff = cycles - prevCycles; 1274 if (removedIndex < takeIndex) 1275 cycleDiff++; 1276 final int removedDistance = 1277 (cycleDiff * len) + (removedIndex - prevTakeIndex); 1278 // assert removedDistance >= 0; 1279 int cursor = this.cursor; 1280 if (cursor >= 0) { 1281 int x = distance(cursor, prevTakeIndex, len); 1282 if (x == removedDistance) { 1283 if (cursor == putIndex) 1284 this.cursor = cursor = NONE; 1285 } 1286 else if (x > removedDistance) { 1287 // assert cursor != prevTakeIndex; 1288 this.cursor = cursor = dec(cursor); 1289 } 1290 } 1291 int lastRet = this.lastRet; 1292 if (lastRet >= 0) { 1293 int x = distance(lastRet, prevTakeIndex, len); 1294 if (x == removedDistance) 1295 this.lastRet = lastRet = REMOVED; 1296 else if (x > removedDistance) 1297 this.lastRet = lastRet = dec(lastRet); 1298 } 1299 int nextIndex = this.nextIndex; 1300 if (nextIndex >= 0) { 1301 int x = distance(nextIndex, prevTakeIndex, len); 1302 if (x == removedDistance) 1303 this.nextIndex = nextIndex = REMOVED; 1304 else if (x > removedDistance) 1305 this.nextIndex = nextIndex = dec(nextIndex); 1306 } 1307 else if (cursor < 0 && nextIndex < 0 && lastRet < 0) { 1308 this.prevTakeIndex = DETACHED; 1309 return true; 1310 } 1311 return false; 1312 } 1313 1314 
/** 1315 * Called whenever takeIndex wraps around to zero. 1316 * 1317 * @return true if this iterator should be unlinked from itrs 1318 */ 1319 boolean takeIndexWrapped() { 1320 // assert lock.getHoldCount() == 1; 1321 if (isDetached()) 1322 return true; 1323 if (itrs.cycles - prevCycles > 1) { 1324 // All the elements that existed at the time of the last 1325 // operation are gone, so abandon further iteration. 1326 shutdown(); 1327 return true; 1328 } 1329 return false; 1330 } 1331 1332 // /** Uncomment for debugging. */ 1333 // public String toString() { 1334 // return ("cursor=" + cursor + " " + 1335 // "nextIndex=" + nextIndex + " " + 1336 // "lastRet=" + lastRet + " " + 1337 // "nextItem=" + nextItem + " " + 1338 // "lastItem=" + lastItem + " " + 1339 // "prevCycles=" + prevCycles + " " + 1340 // "prevTakeIndex=" + prevTakeIndex + " " + 1341 // "size()=" + size() + " " + 1342 // "remainingCapacity()=" + remainingCapacity()); 1343 // } 1344 } 1345 } 1346