1 /* 2 * Written by Doug Lea with assistance from members of JCP JSR-166 3 * Expert Group and released to the public domain, as explained at 4 * http://creativecommons.org/publicdomain/zero/1.0/ 5 */ 6 7 package java.util.concurrent; 8 import java.util.concurrent.locks.Condition; 9 import java.util.concurrent.locks.ReentrantLock; 10 import java.util.AbstractQueue; 11 import java.util.Collection; 12 import java.util.Iterator; 13 import java.util.NoSuchElementException; 14 import java.lang.ref.WeakReference; 15 16 // BEGIN android-note 17 // removed link to collections framework docs 18 // END android-note 19 20 /** 21 * A bounded {@linkplain BlockingQueue blocking queue} backed by an 22 * array. This queue orders elements FIFO (first-in-first-out). The 23 * <em>head</em> of the queue is that element that has been on the 24 * queue the longest time. The <em>tail</em> of the queue is that 25 * element that has been on the queue the shortest time. New elements 26 * are inserted at the tail of the queue, and the queue retrieval 27 * operations obtain elements at the head of the queue. 28 * 29 * <p>This is a classic "bounded buffer", in which a 30 * fixed-sized array holds elements inserted by producers and 31 * extracted by consumers. Once created, the capacity cannot be 32 * changed. Attempts to {@code put} an element into a full queue 33 * will result in the operation blocking; attempts to {@code take} an 34 * element from an empty queue will similarly block. 35 * 36 * <p>This class supports an optional fairness policy for ordering 37 * waiting producer and consumer threads. By default, this ordering 38 * is not guaranteed. However, a queue constructed with fairness set 39 * to {@code true} grants threads access in FIFO order. Fairness 40 * generally decreases throughput but reduces variability and avoids 41 * starvation. 42 * 43 * <p>This class and its iterator implement all of the 44 * <em>optional</em> methods of the {@link Collection} and {@link 45 * Iterator} interfaces. 46 * 47 * @since 1.5 48 * @author Doug Lea 49 * @param <E> the type of elements held in this collection 50 */ 51 public class ArrayBlockingQueue<E> extends AbstractQueue<E> 52 implements BlockingQueue<E>, java.io.Serializable { 53 54 /** 55 * Serialization ID. This class relies on default serialization 56 * even for the items array, which is default-serialized, even if 57 * it is empty. Otherwise it could not be declared final, which is 58 * necessary here. 59 */ 60 private static final long serialVersionUID = -817911632652898426L; 61 62 /** The queued items */ 63 final Object[] items; 64 65 /** items index for next take, poll, peek or remove */ 66 int takeIndex; 67 68 /** items index for next put, offer, or add */ 69 int putIndex; 70 71 /** Number of elements in the queue */ 72 int count; 73 74 /* 75 * Concurrency control uses the classic two-condition algorithm 76 * found in any textbook. 77 */ 78 79 /** Main lock guarding all access */ 80 final ReentrantLock lock; 81 82 /** Condition for waiting takes */ 83 private final Condition notEmpty; 84 85 /** Condition for waiting puts */ 86 private final Condition notFull; 87 88 /** 89 * Shared state for currently active iterators, or null if there 90 * are known not to be any. Allows queue operations to update 91 * iterator state. 92 */ 93 transient Itrs itrs = null; 94 95 // Internal helper methods 96 97 /** 98 * Circularly increment i. 99 */ 100 final int inc(int i) { 101 return (++i == items.length) ? 0 : i; 102 } 103 104 /** 105 * Circularly decrement i. 106 */ 107 final int dec(int i) { 108 return ((i == 0) ? items.length : i) - 1; 109 } 110 111 /** 112 * Returns item at index i. 113 */ 114 @SuppressWarnings("unchecked") 115 final E itemAt(int i) { 116 return (E) items[i]; 117 } 118 119 /** 120 * Throws NullPointerException if argument is null. 121 * 122 * @param v the element 123 */ 124 private static void checkNotNull(Object v) { 125 if (v == null) 126 throw new NullPointerException(); 127 } 128 129 /** 130 * Inserts element at current put position, advances, and signals. 131 * Call only when holding lock. 132 */ 133 private void enqueue(E x) { 134 // assert lock.getHoldCount() == 1; 135 // assert items[putIndex] == null; 136 items[putIndex] = x; 137 putIndex = inc(putIndex); 138 count++; 139 notEmpty.signal(); 140 } 141 142 /** 143 * Extracts element at current take position, advances, and signals. 144 * Call only when holding lock. 145 */ 146 private E dequeue() { 147 // assert lock.getHoldCount() == 1; 148 // assert items[takeIndex] != null; 149 final Object[] items = this.items; 150 @SuppressWarnings("unchecked") 151 E x = (E) items[takeIndex]; 152 items[takeIndex] = null; 153 takeIndex = inc(takeIndex); 154 count--; 155 if (itrs != null) 156 itrs.elementDequeued(); 157 notFull.signal(); 158 return x; 159 } 160 161 /** 162 * Deletes item at array index removeIndex. 163 * Utility for remove(Object) and iterator.remove. 164 * Call only when holding lock. 165 */ 166 void removeAt(final int removeIndex) { 167 // assert lock.getHoldCount() == 1; 168 // assert items[removeIndex] != null; 169 // assert removeIndex >= 0 && removeIndex < items.length; 170 final Object[] items = this.items; 171 if (removeIndex == takeIndex) { 172 // removing front item; just advance 173 items[takeIndex] = null; 174 takeIndex = inc(takeIndex); 175 count--; 176 if (itrs != null) 177 itrs.elementDequeued(); 178 } else { 179 // an "interior" remove 180 181 // slide over all others up through putIndex. 182 final int putIndex = this.putIndex; 183 for (int i = removeIndex;;) { 184 int next = inc(i); 185 if (next != putIndex) { 186 items[i] = items[next]; 187 i = next; 188 } else { 189 items[i] = null; 190 this.putIndex = i; 191 break; 192 } 193 } 194 count--; 195 if (itrs != null) 196 itrs.removedAt(removeIndex); 197 } 198 notFull.signal(); 199 } 200 201 /** 202 * Creates an {@code ArrayBlockingQueue} with the given (fixed) 203 * capacity and default access policy. 204 * 205 * @param capacity the capacity of this queue 206 * @throws IllegalArgumentException if {@code capacity < 1} 207 */ 208 public ArrayBlockingQueue(int capacity) { 209 this(capacity, false); 210 } 211 212 /** 213 * Creates an {@code ArrayBlockingQueue} with the given (fixed) 214 * capacity and the specified access policy. 215 * 216 * @param capacity the capacity of this queue 217 * @param fair if {@code true} then queue accesses for threads blocked 218 * on insertion or removal, are processed in FIFO order; 219 * if {@code false} the access order is unspecified. 220 * @throws IllegalArgumentException if {@code capacity < 1} 221 */ 222 public ArrayBlockingQueue(int capacity, boolean fair) { 223 if (capacity <= 0) 224 throw new IllegalArgumentException(); 225 this.items = new Object[capacity]; 226 lock = new ReentrantLock(fair); 227 notEmpty = lock.newCondition(); 228 notFull = lock.newCondition(); 229 } 230 231 /** 232 * Creates an {@code ArrayBlockingQueue} with the given (fixed) 233 * capacity, the specified access policy and initially containing the 234 * elements of the given collection, 235 * added in traversal order of the collection's iterator. 236 * 237 * @param capacity the capacity of this queue 238 * @param fair if {@code true} then queue accesses for threads blocked 239 * on insertion or removal, are processed in FIFO order; 240 * if {@code false} the access order is unspecified. 241 * @param c the collection of elements to initially contain 242 * @throws IllegalArgumentException if {@code capacity} is less than 243 * {@code c.size()}, or less than 1. 244 * @throws NullPointerException if the specified collection or any 245 * of its elements are null 246 */ 247 public ArrayBlockingQueue(int capacity, boolean fair, 248 Collection<? extends E> c) { 249 this(capacity, fair); 250 251 final ReentrantLock lock = this.lock; 252 lock.lock(); // Lock only for visibility, not mutual exclusion 253 try { 254 int i = 0; 255 try { 256 for (E e : c) { 257 checkNotNull(e); 258 items[i++] = e; 259 } 260 } catch (ArrayIndexOutOfBoundsException ex) { 261 throw new IllegalArgumentException(); 262 } 263 count = i; 264 putIndex = (i == capacity) ? 0 : i; 265 } finally { 266 lock.unlock(); 267 } 268 } 269 270 /** 271 * Inserts the specified element at the tail of this queue if it is 272 * possible to do so immediately without exceeding the queue's capacity, 273 * returning {@code true} upon success and throwing an 274 * {@code IllegalStateException} if this queue is full. 275 * 276 * @param e the element to add 277 * @return {@code true} (as specified by {@link Collection#add}) 278 * @throws IllegalStateException if this queue is full 279 * @throws NullPointerException if the specified element is null 280 */ 281 public boolean add(E e) { 282 return super.add(e); 283 } 284 285 /** 286 * Inserts the specified element at the tail of this queue if it is 287 * possible to do so immediately without exceeding the queue's capacity, 288 * returning {@code true} upon success and {@code false} if this queue 289 * is full. This method is generally preferable to method {@link #add}, 290 * which can fail to insert an element only by throwing an exception. 291 * 292 * @throws NullPointerException if the specified element is null 293 */ 294 public boolean offer(E e) { 295 checkNotNull(e); 296 final ReentrantLock lock = this.lock; 297 lock.lock(); 298 try { 299 if (count == items.length) 300 return false; 301 else { 302 enqueue(e); 303 return true; 304 } 305 } finally { 306 lock.unlock(); 307 } 308 } 309 310 /** 311 * Inserts the specified element at the tail of this queue, waiting 312 * for space to become available if the queue is full. 313 * 314 * @throws InterruptedException {@inheritDoc} 315 * @throws NullPointerException {@inheritDoc} 316 */ 317 public void put(E e) throws InterruptedException { 318 checkNotNull(e); 319 final ReentrantLock lock = this.lock; 320 lock.lockInterruptibly(); 321 try { 322 while (count == items.length) 323 notFull.await(); 324 enqueue(e); 325 } finally { 326 lock.unlock(); 327 } 328 } 329 330 /** 331 * Inserts the specified element at the tail of this queue, waiting 332 * up to the specified wait time for space to become available if 333 * the queue is full. 334 * 335 * @throws InterruptedException {@inheritDoc} 336 * @throws NullPointerException {@inheritDoc} 337 */ 338 public boolean offer(E e, long timeout, TimeUnit unit) 339 throws InterruptedException { 340 341 checkNotNull(e); 342 long nanos = unit.toNanos(timeout); 343 final ReentrantLock lock = this.lock; 344 lock.lockInterruptibly(); 345 try { 346 while (count == items.length) { 347 if (nanos <= 0) 348 return false; 349 nanos = notFull.awaitNanos(nanos); 350 } 351 enqueue(e); 352 return true; 353 } finally { 354 lock.unlock(); 355 } 356 } 357 358 public E poll() { 359 final ReentrantLock lock = this.lock; 360 lock.lock(); 361 try { 362 return (count == 0) ? null : dequeue(); 363 } finally { 364 lock.unlock(); 365 } 366 } 367 368 public E take() throws InterruptedException { 369 final ReentrantLock lock = this.lock; 370 lock.lockInterruptibly(); 371 try { 372 while (count == 0) 373 notEmpty.await(); 374 return dequeue(); 375 } finally { 376 lock.unlock(); 377 } 378 } 379 380 public E poll(long timeout, TimeUnit unit) throws InterruptedException { 381 long nanos = unit.toNanos(timeout); 382 final ReentrantLock lock = this.lock; 383 lock.lockInterruptibly(); 384 try { 385 while (count == 0) { 386 if (nanos <= 0) 387 return null; 388 nanos = notEmpty.awaitNanos(nanos); 389 } 390 return dequeue(); 391 } finally { 392 lock.unlock(); 393 } 394 } 395 396 public E peek() { 397 final ReentrantLock lock = this.lock; 398 lock.lock(); 399 try { 400 return itemAt(takeIndex); // null when queue is empty 401 } finally { 402 lock.unlock(); 403 } 404 } 405 406 // this doc comment is overridden to remove the reference to collections 407 // greater in size than Integer.MAX_VALUE 408 /** 409 * Returns the number of elements in this queue. 410 * 411 * @return the number of elements in this queue 412 */ 413 public int size() { 414 final ReentrantLock lock = this.lock; 415 lock.lock(); 416 try { 417 return count; 418 } finally { 419 lock.unlock(); 420 } 421 } 422 423 // this doc comment is a modified copy of the inherited doc comment, 424 // without the reference to unlimited queues. 425 /** 426 * Returns the number of additional elements that this queue can ideally 427 * (in the absence of memory or resource constraints) accept without 428 * blocking. This is always equal to the initial capacity of this queue 429 * less the current {@code size} of this queue. 430 * 431 * <p>Note that you <em>cannot</em> always tell if an attempt to insert 432 * an element will succeed by inspecting {@code remainingCapacity} 433 * because it may be the case that another thread is about to 434 * insert or remove an element. 435 */ 436 public int remainingCapacity() { 437 final ReentrantLock lock = this.lock; 438 lock.lock(); 439 try { 440 return items.length - count; 441 } finally { 442 lock.unlock(); 443 } 444 } 445 446 /** 447 * Removes a single instance of the specified element from this queue, 448 * if it is present. More formally, removes an element {@code e} such 449 * that {@code o.equals(e)}, if this queue contains one or more such 450 * elements. 451 * Returns {@code true} if this queue contained the specified element 452 * (or equivalently, if this queue changed as a result of the call). 453 * 454 * <p>Removal of interior elements in circular array based queues 455 * is an intrinsically slow and disruptive operation, so should 456 * be undertaken only in exceptional circumstances, ideally 457 * only when the queue is known not to be accessible by other 458 * threads. 459 * 460 * @param o element to be removed from this queue, if present 461 * @return {@code true} if this queue changed as a result of the call 462 */ 463 public boolean remove(Object o) { 464 if (o == null) return false; 465 final Object[] items = this.items; 466 final ReentrantLock lock = this.lock; 467 lock.lock(); 468 try { 469 if (count > 0) { 470 final int putIndex = this.putIndex; 471 int i = takeIndex; 472 do { 473 if (o.equals(items[i])) { 474 removeAt(i); 475 return true; 476 } 477 } while ((i = inc(i)) != putIndex); 478 } 479 return false; 480 } finally { 481 lock.unlock(); 482 } 483 } 484 485 /** 486 * Returns {@code true} if this queue contains the specified element. 487 * More formally, returns {@code true} if and only if this queue contains 488 * at least one element {@code e} such that {@code o.equals(e)}. 489 * 490 * @param o object to be checked for containment in this queue 491 * @return {@code true} if this queue contains the specified element 492 */ 493 public boolean contains(Object o) { 494 if (o == null) return false; 495 final Object[] items = this.items; 496 final ReentrantLock lock = this.lock; 497 lock.lock(); 498 try { 499 if (count > 0) { 500 final int putIndex = this.putIndex; 501 int i = takeIndex; 502 do { 503 if (o.equals(items[i])) 504 return true; 505 } while ((i = inc(i)) != putIndex); 506 } 507 return false; 508 } finally { 509 lock.unlock(); 510 } 511 } 512 513 /** 514 * Returns an array containing all of the elements in this queue, in 515 * proper sequence. 516 * 517 * <p>The returned array will be "safe" in that no references to it are 518 * maintained by this queue. (In other words, this method must allocate 519 * a new array). The caller is thus free to modify the returned array. 520 * 521 * <p>This method acts as bridge between array-based and collection-based 522 * APIs. 523 * 524 * @return an array containing all of the elements in this queue 525 */ 526 public Object[] toArray() { 527 final Object[] items = this.items; 528 final ReentrantLock lock = this.lock; 529 lock.lock(); 530 try { 531 final int count = this.count; 532 Object[] a = new Object[count]; 533 int n = items.length - takeIndex; 534 if (count <= n) { 535 System.arraycopy(items, takeIndex, a, 0, count); 536 } else { 537 System.arraycopy(items, takeIndex, a, 0, n); 538 System.arraycopy(items, 0, a, n, count - n); 539 } 540 return a; 541 } finally { 542 lock.unlock(); 543 } 544 } 545 546 /** 547 * Returns an array containing all of the elements in this queue, in 548 * proper sequence; the runtime type of the returned array is that of 549 * the specified array. If the queue fits in the specified array, it 550 * is returned therein. Otherwise, a new array is allocated with the 551 * runtime type of the specified array and the size of this queue. 552 * 553 * <p>If this queue fits in the specified array with room to spare 554 * (i.e., the array has more elements than this queue), the element in 555 * the array immediately following the end of the queue is set to 556 * {@code null}. 557 * 558 * <p>Like the {@link #toArray()} method, this method acts as bridge between 559 * array-based and collection-based APIs. Further, this method allows 560 * precise control over the runtime type of the output array, and may, 561 * under certain circumstances, be used to save allocation costs. 562 * 563 * <p>Suppose {@code x} is a queue known to contain only strings. 564 * The following code can be used to dump the queue into a newly 565 * allocated array of {@code String}: 566 * 567 * <pre> {@code String[] y = x.toArray(new String[0]);}</pre> 568 * 569 * Note that {@code toArray(new Object[0])} is identical in function to 570 * {@code toArray()}. 571 * 572 * @param a the array into which the elements of the queue are to 573 * be stored, if it is big enough; otherwise, a new array of the 574 * same runtime type is allocated for this purpose 575 * @return an array containing all of the elements in this queue 576 * @throws ArrayStoreException if the runtime type of the specified array 577 * is not a supertype of the runtime type of every element in 578 * this queue 579 * @throws NullPointerException if the specified array is null 580 */ 581 @SuppressWarnings("unchecked") 582 public <T> T[] toArray(T[] a) { 583 final Object[] items = this.items; 584 final ReentrantLock lock = this.lock; 585 lock.lock(); 586 try { 587 final int count = this.count; 588 final int len = a.length; 589 if (len < count) 590 a = (T[])java.lang.reflect.Array.newInstance( 591 a.getClass().getComponentType(), count); 592 int n = items.length - takeIndex; 593 if (count <= n) 594 System.arraycopy(items, takeIndex, a, 0, count); 595 else { 596 System.arraycopy(items, takeIndex, a, 0, n); 597 System.arraycopy(items, 0, a, n, count - n); 598 } 599 if (len > count) 600 a[count] = null; 601 return a; 602 } finally { 603 lock.unlock(); 604 } 605 } 606 607 public String toString() { 608 final ReentrantLock lock = this.lock; 609 lock.lock(); 610 try { 611 int k = count; 612 if (k == 0) 613 return "[]"; 614 615 StringBuilder sb = new StringBuilder(); 616 sb.append('['); 617 for (int i = takeIndex; ; i = inc(i)) { 618 Object e = items[i]; 619 sb.append(e == this ? "(this Collection)" : e); 620 if (--k == 0) 621 return sb.append(']').toString(); 622 sb.append(',').append(' '); 623 } 624 } finally { 625 lock.unlock(); 626 } 627 } 628 629 /** 630 * Atomically removes all of the elements from this queue. 631 * The queue will be empty after this call returns. 632 */ 633 public void clear() { 634 final Object[] items = this.items; 635 final ReentrantLock lock = this.lock; 636 lock.lock(); 637 try { 638 int k = count; 639 if (k > 0) { 640 final int putIndex = this.putIndex; 641 int i = takeIndex; 642 do { 643 items[i] = null; 644 } while ((i = inc(i)) != putIndex); 645 takeIndex = putIndex; 646 count = 0; 647 if (itrs != null) 648 itrs.queueIsEmpty(); 649 for (; k > 0 && lock.hasWaiters(notFull); k--) 650 notFull.signal(); 651 } 652 } finally { 653 lock.unlock(); 654 } 655 } 656 657 /** 658 * @throws UnsupportedOperationException {@inheritDoc} 659 * @throws ClassCastException {@inheritDoc} 660 * @throws NullPointerException {@inheritDoc} 661 * @throws IllegalArgumentException {@inheritDoc} 662 */ 663 public int drainTo(Collection<? super E> c) { 664 return drainTo(c, Integer.MAX_VALUE); 665 } 666 667 /** 668 * @throws UnsupportedOperationException {@inheritDoc} 669 * @throws ClassCastException {@inheritDoc} 670 * @throws NullPointerException {@inheritDoc} 671 * @throws IllegalArgumentException {@inheritDoc} 672 */ 673 public int drainTo(Collection<? super E> c, int maxElements) { 674 checkNotNull(c); 675 if (c == this) 676 throw new IllegalArgumentException(); 677 if (maxElements <= 0) 678 return 0; 679 final Object[] items = this.items; 680 final ReentrantLock lock = this.lock; 681 lock.lock(); 682 try { 683 int n = Math.min(maxElements, count); 684 int take = takeIndex; 685 int i = 0; 686 try { 687 while (i < n) { 688 @SuppressWarnings("unchecked") 689 E x = (E) items[take]; 690 c.add(x); 691 items[take] = null; 692 take = inc(take); 693 i++; 694 } 695 return n; 696 } finally { 697 // Restore invariants even if c.add() threw 698 if (i > 0) { 699 count -= i; 700 takeIndex = take; 701 if (itrs != null) { 702 if (count == 0) 703 itrs.queueIsEmpty(); 704 else if (i > take) 705 itrs.takeIndexWrapped(); 706 } 707 for (; i > 0 && lock.hasWaiters(notFull); i--) 708 notFull.signal(); 709 } 710 } 711 } finally { 712 lock.unlock(); 713 } 714 } 715 716 /** 717 * Returns an iterator over the elements in this queue in proper sequence. 718 * The elements will be returned in order from first (head) to last (tail). 719 * 720 * <p>The returned iterator is a "weakly consistent" iterator that 721 * will never throw {@link java.util.ConcurrentModificationException 722 * ConcurrentModificationException}, and guarantees to traverse 723 * elements as they existed upon construction of the iterator, and 724 * may (but is not guaranteed to) reflect any modifications 725 * subsequent to construction. 726 * 727 * @return an iterator over the elements in this queue in proper sequence 728 */ 729 public Iterator<E> iterator() { 730 return new Itr(); 731 } 732 733 /** 734 * Shared data between iterators and their queue, allowing queue 735 * modifications to update iterators when elements are removed. 736 * 737 * This adds a lot of complexity for the sake of correctly 738 * handling some uncommon operations, but the combination of 739 * circular-arrays and supporting interior removes (i.e., those 740 * not at head) would cause iterators to sometimes lose their 741 * places and/or (re)report elements they shouldn't. To avoid 742 * this, when a queue has one or more iterators, it keeps iterator 743 * state consistent by: 744 * 745 * (1) keeping track of the number of "cycles", that is, the 746 * number of times takeIndex has wrapped around to 0. 747 * (2) notifying all iterators via the callback removedAt whenever 748 * an interior element is removed (and thus other elements may 749 * be shifted). 750 * 751 * These suffice to eliminate iterator inconsistencies, but 752 * unfortunately add the secondary responsibility of maintaining 753 * the list of iterators. We track all active iterators in a 754 * simple linked list (accessed only when the queue's lock is 755 * held) of weak references to Itr. The list is cleaned up using 756 * 3 different mechanisms: 757 * 758 * (1) Whenever a new iterator is created, do some O(1) checking for 759 * stale list elements. 760 * 761 * (2) Whenever takeIndex wraps around to 0, check for iterators 762 * that have been unused for more than one wrap-around cycle. 763 * 764 * (3) Whenever the queue becomes empty, all iterators are notified 765 * and this entire data structure is discarded. 766 * 767 * So in addition to the removedAt callback that is necessary for 768 * correctness, iterators have the shutdown and takeIndexWrapped 769 * callbacks that help remove stale iterators from the list. 770 * 771 * Whenever a list element is examined, it is expunged if either 772 * the GC has determined that the iterator is discarded, or if the 773 * iterator reports that it is "detached" (does not need any 774 * further state updates). Overhead is maximal when takeIndex 775 * never advances, iterators are discarded before they are 776 * exhausted, and all removals are interior removes, in which case 777 * all stale iterators are discovered by the GC. But even in this 778 * case we don't increase the amortized complexity. 779 * 780 * Care must be taken to keep list sweeping methods from 781 * reentrantly invoking another such method, causing subtle 782 * corruption bugs. 783 */ 784 class Itrs { 785 786 /** 787 * Node in a linked list of weak iterator references. 788 */ 789 private class Node extends WeakReference<Itr> { 790 Node next; 791 792 Node(Itr iterator, Node next) { 793 super(iterator); 794 this.next = next; 795 } 796 } 797 798 /** Incremented whenever takeIndex wraps around to 0 */ 799 int cycles = 0; 800 801 /** Linked list of weak iterator references */ 802 private Node head; 803 804 /** Used to expunge stale iterators */ 805 private Node sweeper = null; 806 807 private static final int SHORT_SWEEP_PROBES = 4; 808 private static final int LONG_SWEEP_PROBES = 16; 809 810 Itrs(Itr initial) { 811 register(initial); 812 } 813 814 /** 815 * Sweeps itrs, looking for and expunging stale iterators. 816 * If at least one was found, tries harder to find more. 817 * Called only from iterating thread. 818 * 819 * @param tryHarder whether to start in try-harder mode, because 820 * there is known to be at least one iterator to collect 821 */ 822 void doSomeSweeping(boolean tryHarder) { 823 // assert lock.getHoldCount() == 1; 824 // assert head != null; 825 int probes = tryHarder ? LONG_SWEEP_PROBES : SHORT_SWEEP_PROBES; 826 Node o, p; 827 final Node sweeper = this.sweeper; 828 boolean passedGo; // to limit search to one full sweep 829 830 if (sweeper == null) { 831 o = null; 832 p = head; 833 passedGo = true; 834 } else { 835 o = sweeper; 836 p = o.next; 837 passedGo = false; 838 } 839 840 for (; probes > 0; probes--) { 841 if (p == null) { 842 if (passedGo) 843 break; 844 o = null; 845 p = head; 846 passedGo = true; 847 } 848 final Itr it = p.get(); 849 final Node next = p.next; 850 if (it == null || it.isDetached()) { 851 // found a discarded/exhausted iterator 852 probes = LONG_SWEEP_PROBES; // "try harder" 853 // unlink p 854 p.clear(); 855 p.next = null; 856 if (o == null) { 857 head = next; 858 if (next == null) { 859 // We've run out of iterators to track; retire 860 itrs = null; 861 return; 862 } 863 } 864 else 865 o.next = next; 866 } else { 867 o = p; 868 } 869 p = next; 870 } 871 872 this.sweeper = (p == null) ? null : o; 873 } 874 875 /** 876 * Adds a new iterator to the linked list of tracked iterators. 877 */ 878 void register(Itr itr) { 879 // assert lock.getHoldCount() == 1; 880 head = new Node(itr, head); 881 } 882 883 /** 884 * Called whenever takeIndex wraps around to 0. 885 * 886 * Notifies all iterators, and expunges any that are now stale. 887 */ 888 void takeIndexWrapped() { 889 // assert lock.getHoldCount() == 1; 890 cycles++; 891 for (Node o = null, p = head; p != null;) { 892 final Itr it = p.get(); 893 final Node next = p.next; 894 if (it == null || it.takeIndexWrapped()) { 895 // unlink p 896 // assert it == null || it.isDetached(); 897 p.clear(); 898 p.next = null; 899 if (o == null) 900 head = next; 901 else 902 o.next = next; 903 } else { 904 o = p; 905 } 906 p = next; 907 } 908 if (head == null) // no more iterators to track 909 itrs = null; 910 } 911 912 /** 913 * Called whenever an interior remove (not at takeIndex) occured. 914 * 915 * Notifies all iterators, and expunges any that are now stale. 916 */ 917 void removedAt(int removedIndex) { 918 for (Node o = null, p = head; p != null;) { 919 final Itr it = p.get(); 920 final Node next = p.next; 921 if (it == null || it.removedAt(removedIndex)) { 922 // unlink p 923 // assert it == null || it.isDetached(); 924 p.clear(); 925 p.next = null; 926 if (o == null) 927 head = next; 928 else 929 o.next = next; 930 } else { 931 o = p; 932 } 933 p = next; 934 } 935 if (head == null) // no more iterators to track 936 itrs = null; 937 } 938 939 /** 940 * Called whenever the queue becomes empty. 941 * 942 * Notifies all active iterators that the queue is empty, 943 * clears all weak refs, and unlinks the itrs datastructure. 944 */ 945 void queueIsEmpty() { 946 // assert lock.getHoldCount() == 1; 947 for (Node p = head; p != null; p = p.next) { 948 Itr it = p.get(); 949 if (it != null) { 950 p.clear(); 951 it.shutdown(); 952 } 953 } 954 head = null; 955 itrs = null; 956 } 957 958 /** 959 * Called whenever an element has been dequeued (at takeIndex). 960 */ 961 void elementDequeued() { 962 // assert lock.getHoldCount() == 1; 963 if (count == 0) 964 queueIsEmpty(); 965 else if (takeIndex == 0) 966 takeIndexWrapped(); 967 } 968 } 969 970 /** 971 * Iterator for ArrayBlockingQueue. 972 * 973 * To maintain weak consistency with respect to puts and takes, we 974 * read ahead one slot, so as to not report hasNext true but then 975 * not have an element to return. 976 * 977 * We switch into "detached" mode (allowing prompt unlinking from 978 * itrs without help from the GC) when all indices are negative, or 979 * when hasNext returns false for the first time. This allows the 980 * iterator to track concurrent updates completely accurately, 981 * except for the corner case of the user calling Iterator.remove() 982 * after hasNext() returned false. Even in this case, we ensure 983 * that we don't remove the wrong element by keeping track of the 984 * expected element to remove, in lastItem. Yes, we may fail to 985 * remove lastItem from the queue if it moved due to an interleaved 986 * interior remove while in detached mode. 987 */ 988 private class Itr implements Iterator<E> { 989 /** Index to look for new nextItem; NONE at end */ 990 private int cursor; 991 992 /** Element to be returned by next call to next(); null if none */ 993 private E nextItem; 994 995 /** Index of nextItem; NONE if none, REMOVED if removed elsewhere */ 996 private int nextIndex; 997 998 /** Last element returned; null if none or not detached. */ 999 private E lastItem; 1000 1001 /** Index of lastItem, NONE if none, REMOVED if removed elsewhere */ 1002 private int lastRet; 1003 1004 /** Previous value of takeIndex, or DETACHED when detached */ 1005 private int prevTakeIndex; 1006 1007 /** Previous value of iters.cycles */ 1008 private int prevCycles; 1009 1010 /** Special index value indicating "not available" or "undefined" */ 1011 private static final int NONE = -1; 1012 1013 /** 1014 * Special index value indicating "removed elsewhere", that is, 1015 * removed by some operation other than a call to this.remove(). 1016 */ 1017 private static final int REMOVED = -2; 1018 1019 /** Special value for prevTakeIndex indicating "detached mode" */ 1020 private static final int DETACHED = -3; 1021 1022 Itr() { 1023 // assert lock.getHoldCount() == 0; 1024 lastRet = NONE; 1025 final ReentrantLock lock = ArrayBlockingQueue.this.lock; 1026 lock.lock(); 1027 try { 1028 if (count == 0) { 1029 // assert itrs == null; 1030 cursor = NONE; 1031 nextIndex = NONE; 1032 prevTakeIndex = DETACHED; 1033 } else { 1034 final int takeIndex = ArrayBlockingQueue.this.takeIndex; 1035 prevTakeIndex = takeIndex; 1036 nextItem = itemAt(nextIndex = takeIndex); 1037 cursor = incCursor(takeIndex); 1038 if (itrs == null) { 1039 itrs = new Itrs(this); 1040 } else { 1041 itrs.register(this); // in this order 1042 itrs.doSomeSweeping(false); 1043 } 1044 prevCycles = itrs.cycles; 1045 // assert takeIndex >= 0; 1046 // assert prevTakeIndex == takeIndex; 1047 // assert nextIndex >= 0; 1048 // assert nextItem != null; 1049 } 1050 } finally { 1051 lock.unlock(); 1052 } 1053 } 1054 1055 boolean isDetached() { 1056 // assert lock.getHoldCount() == 1; 1057 return prevTakeIndex < 0; 1058 } 1059 1060 private int incCursor(int index) { 1061 // assert lock.getHoldCount() == 1; 1062 index = inc(index); 1063 if (index == putIndex) 1064 index = NONE; 1065 return index; 1066 } 1067 1068 /** 1069 * Returns true if index is invalidated by the given number of 1070 * dequeues, starting from prevTakeIndex. 1071 */ 1072 private boolean invalidated(int index, int prevTakeIndex, 1073 long dequeues, int length) { 1074 if (index < 0) 1075 return false; 1076 int distance = index - prevTakeIndex; 1077 if (distance < 0) 1078 distance += length; 1079 return dequeues > distance; 1080 } 1081 1082 /** 1083 * Adjusts indices to incorporate all dequeues since the last 1084 * operation on this iterator. Call only from iterating thread. 1085 */ 1086 private void incorporateDequeues() { 1087 // assert lock.getHoldCount() == 1; 1088 // assert itrs != null; 1089 // assert !isDetached(); 1090 // assert count > 0; 1091 1092 final int cycles = itrs.cycles; 1093 final int takeIndex = ArrayBlockingQueue.this.takeIndex; 1094 final int prevCycles = this.prevCycles; 1095 final int prevTakeIndex = this.prevTakeIndex; 1096 1097 if (cycles != prevCycles || takeIndex != prevTakeIndex) { 1098 final int len = items.length; 1099 // how far takeIndex has advanced since the previous 1100 // operation of this iterator 1101 long dequeues = (cycles - prevCycles) * len 1102 + (takeIndex - prevTakeIndex); 1103 1104 // Check indices for invalidation 1105 if (invalidated(lastRet, prevTakeIndex, dequeues, len)) 1106 lastRet = REMOVED; 1107 if (invalidated(nextIndex, prevTakeIndex, dequeues, len)) 1108 nextIndex = REMOVED; 1109 if (invalidated(cursor, prevTakeIndex, dequeues, len)) 1110 cursor = takeIndex; 1111 1112 if (cursor < 0 && nextIndex < 0 && lastRet < 0) 1113 detach(); 1114 else { 1115 this.prevCycles = cycles; 1116 this.prevTakeIndex = takeIndex; 1117 } 1118 } 1119 } 1120 1121 /** 1122 * Called when itrs should stop tracking this iterator, either 1123 * because there are no more indices to update (cursor < 0 && 1124 * nextIndex < 0 && lastRet < 0) or as a special exception, when 1125 * lastRet >= 0, because hasNext() is about to return false for the 1126 * first time. Call only from iterating thread. 1127 */ 1128 private void detach() { 1129 // Switch to detached mode 1130 // assert lock.getHoldCount() == 1; 1131 // assert cursor == NONE; 1132 // assert nextIndex < 0; 1133 // assert lastRet < 0 || nextItem == null; 1134 // assert lastRet < 0 ^ lastItem != null; 1135 if (prevTakeIndex >= 0) { 1136 // assert itrs != null; 1137 prevTakeIndex = DETACHED; 1138 // try to unlink from itrs (but not too hard) 1139 itrs.doSomeSweeping(true); 1140 } 1141 } 1142 1143 /** 1144 * For performance reasons, we would like not to acquire a lock in 1145 * hasNext in the common case. To allow for this, we only access 1146 * fields (i.e. nextItem) that are not modified by update operations 1147 * triggered by queue modifications. 1148 */ 1149 public boolean hasNext() { 1150 // assert lock.getHoldCount() == 0; 1151 if (nextItem != null) 1152 return true; 1153 noNext(); 1154 return false; 1155 } 1156 1157 private void noNext() { 1158 final ReentrantLock lock = ArrayBlockingQueue.this.lock; 1159 lock.lock(); 1160 try { 1161 // assert cursor == NONE; 1162 // assert nextIndex == NONE; 1163 if (!isDetached()) { 1164 // assert lastRet >= 0; 1165 incorporateDequeues(); // might update lastRet 1166 if (lastRet >= 0) { 1167 lastItem = itemAt(lastRet); 1168 // assert lastItem != null; 1169 detach(); 1170 } 1171 } 1172 // assert isDetached(); 1173 // assert lastRet < 0 ^ lastItem != null; 1174 } finally { 1175 lock.unlock(); 1176 } 1177 } 1178 1179 public E next() { 1180 // assert lock.getHoldCount() == 0; 1181 final E x = nextItem; 1182 if (x == null) 1183 throw new NoSuchElementException(); 1184 final ReentrantLock lock = ArrayBlockingQueue.this.lock; 1185 lock.lock(); 1186 try { 1187 if (!isDetached()) 1188 incorporateDequeues(); 1189 // assert nextIndex != NONE; 1190 // assert lastItem == null; 1191 lastRet = nextIndex; 1192 final int cursor = this.cursor; 1193 if (cursor >= 0) { 1194 nextItem = itemAt(nextIndex = cursor); 1195 // assert nextItem != null; 1196 this.cursor = incCursor(cursor); 1197 } else { 1198 nextIndex = NONE; 1199 nextItem = null; 1200 } 1201 } finally { 1202 lock.unlock(); 1203 } 1204 return x; 1205 } 1206 1207 public void remove() { 1208 // assert lock.getHoldCount() == 0; 1209 final ReentrantLock lock = ArrayBlockingQueue.this.lock; 1210 lock.lock(); 1211 try { 1212 if (!isDetached()) 1213 incorporateDequeues(); // might update lastRet or detach 1214 final int lastRet = this.lastRet; 1215 this.lastRet = NONE; 1216 if (lastRet >= 0) { 1217 if (!isDetached()) 1218 removeAt(lastRet); 1219 else { 1220 final E lastItem = this.lastItem; 1221 // assert lastItem != null; 1222 this.lastItem = null; 1223 if (itemAt(lastRet) == lastItem) 1224 removeAt(lastRet); 1225 } 1226 } else if (lastRet == NONE) 1227 throw new IllegalStateException(); 1228 // else lastRet == REMOVED and the last returned element was 1229 // previously asynchronously removed via an operation other 1230 // than this.remove(), so nothing to do. 1231 1232 if (cursor < 0 && nextIndex < 0) 1233 detach(); 1234 } finally { 1235 lock.unlock(); 1236 // assert lastRet == NONE; 1237 // assert lastItem == null; 1238 } 1239 } 1240 1241 /** 1242 * Called to notify the iterator that the queue is empty, or that it 1243 * has fallen hopelessly behind, so that it should abandon any 1244 * further iteration, except possibly to return one more element 1245 * from next(), as promised by returning true from hasNext(). 1246 */ 1247 void shutdown() { 1248 // assert lock.getHoldCount() == 1; 1249 cursor = NONE; 1250 if (nextIndex >= 0) 1251 nextIndex = REMOVED; 1252 if (lastRet >= 0) { 1253 lastRet = REMOVED; 1254 lastItem = null; 1255 } 1256 prevTakeIndex = DETACHED; 1257 // Don't set nextItem to null because we must continue to be 1258 // able to return it on next(). 1259 // 1260 // Caller will unlink from itrs when convenient. 1261 } 1262 1263 private int distance(int index, int prevTakeIndex, int length) { 1264 int distance = index - prevTakeIndex; 1265 if (distance < 0) 1266 distance += length; 1267 return distance; 1268 } 1269 1270 /** 1271 * Called whenever an interior remove (not at takeIndex) occured. 1272 * 1273 * @return true if this iterator should be unlinked from itrs 1274 */ 1275 boolean removedAt(int removedIndex) { 1276 // assert lock.getHoldCount() == 1; 1277 if (isDetached()) 1278 return true; 1279 1280 final int cycles = itrs.cycles; 1281 final int takeIndex = ArrayBlockingQueue.this.takeIndex; 1282 final int prevCycles = this.prevCycles; 1283 final int prevTakeIndex = this.prevTakeIndex; 1284 final int len = items.length; 1285 int cycleDiff = cycles - prevCycles; 1286 if (removedIndex < takeIndex) 1287 cycleDiff++; 1288 final int removedDistance = 1289 (cycleDiff * len) + (removedIndex - prevTakeIndex); 1290 // assert removedDistance >= 0; 1291 int cursor = this.cursor; 1292 if (cursor >= 0) { 1293 int x = distance(cursor, prevTakeIndex, len); 1294 if (x == removedDistance) { 1295 if (cursor == putIndex) 1296 this.cursor = cursor = NONE; 1297 } 1298 else if (x > removedDistance) { 1299 // assert cursor != prevTakeIndex; 1300 this.cursor = cursor = dec(cursor); 1301 } 1302 } 1303 int lastRet = this.lastRet; 1304 if (lastRet >= 0) { 1305 int x = distance(lastRet, prevTakeIndex, len); 1306 if (x == removedDistance) 1307 this.lastRet = lastRet = REMOVED; 1308 else if (x > removedDistance) 1309 this.lastRet = lastRet = dec(lastRet); 1310 } 1311 int nextIndex = this.nextIndex; 1312 if (nextIndex >= 0) { 1313 int x = distance(nextIndex, prevTakeIndex, len); 1314 if (x == removedDistance) 1315 this.nextIndex = nextIndex = REMOVED; 1316 else if (x > removedDistance) 1317 this.nextIndex = nextIndex = dec(nextIndex); 1318 } 1319 else if (cursor < 0 && nextIndex < 0 && lastRet < 0) { 1320 this.prevTakeIndex = DETACHED; 1321 return true; 1322 } 1323 return false; 1324 } 1325 1326 /** 1327 * Called whenever takeIndex wraps around to zero. 1328 * 1329 * @return true if this iterator should be unlinked from itrs 1330 */ 1331 boolean takeIndexWrapped() { 1332 // assert lock.getHoldCount() == 1; 1333 if (isDetached()) 1334 return true; 1335 if (itrs.cycles - prevCycles > 1) { 1336 // All the elements that existed at the time of the last 1337 // operation are gone, so abandon further iteration. 1338 shutdown(); 1339 return true; 1340 } 1341 return false; 1342 } 1343 1344 // /** Uncomment for debugging. */ 1345 // public String toString() { 1346 // return ("cursor=" + cursor + " " + 1347 // "nextIndex=" + nextIndex + " " + 1348 // "lastRet=" + lastRet + " " + 1349 // "nextItem=" + nextItem + " " + 1350 // "lastItem=" + lastItem + " " + 1351 // "prevCycles=" + prevCycles + " " + 1352 // "prevTakeIndex=" + prevTakeIndex + " " + 1353 // "size()=" + size() + " " + 1354 // "remainingCapacity()=" + remainingCapacity()); 1355 // } 1356 } 1357 } 1358