/*
 * Written by Doug Lea, Bill Scherer, and Michael Scott with
 * assistance from members of JCP JSR-166 Expert Group and released to
 * the public domain, as explained at
 * http://creativecommons.org/licenses/publicdomain
 */

package java.util.concurrent;
import java.util.concurrent.locks.*;
import java.util.*;
import libcore.util.EmptyArray;

// BEGIN android-note
// removed link to collections framework docs
// END android-note

/**
 * A {@linkplain BlockingQueue blocking queue} in which each insert
 * operation must wait for a corresponding remove operation by another
 * thread, and vice versa. A synchronous queue does not have any
 * internal capacity, not even a capacity of one. You cannot
 * <tt>peek</tt> at a synchronous queue because an element is only
 * present when you try to remove it; you cannot insert an element
 * (using any method) unless another thread is trying to remove it;
 * you cannot iterate as there is nothing to iterate. The
 * <em>head</em> of the queue is the element that the first queued
 * inserting thread is trying to add to the queue; if there is no such
 * queued thread then no element is available for removal and
 * <tt>poll()</tt> will return <tt>null</tt>. For purposes of other
 * <tt>Collection</tt> methods (for example <tt>contains</tt>), a
 * <tt>SynchronousQueue</tt> acts as an empty collection. This queue
 * does not permit <tt>null</tt> elements.
 *
 * <p>Synchronous queues are similar to rendezvous channels used in
 * CSP and Ada. They are well suited for handoff designs, in which an
 * object running in one thread must sync up with an object running
 * in another thread in order to hand it some information, event, or
 * task.
 *
 * <p> This class supports an optional fairness policy for ordering
 * waiting producer and consumer threads. By default, this ordering
 * is not guaranteed.
 * However, a queue constructed with fairness set
 * to <tt>true</tt> grants threads access in FIFO order.
 *
 * <p>This class and its iterator implement all of the
 * <em>optional</em> methods of the {@link Collection} and {@link
 * Iterator} interfaces.
 *
 * @since 1.5
 * @author Doug Lea and Bill Scherer and Michael Scott
 * @param <E> the type of elements held in this collection
 */
public class SynchronousQueue<E> extends AbstractQueue<E>
    implements BlockingQueue<E>, java.io.Serializable {
    private static final long serialVersionUID = -3223113410248163686L;

    /*
     * This class implements extensions of the dual stack and dual
     * queue algorithms described in "Nonblocking Concurrent Objects
     * with Condition Synchronization", by W. N. Scherer III and
     * M. L. Scott. 18th Annual Conf. on Distributed Computing,
     * Oct. 2004 (see also
     * http://www.cs.rochester.edu/u/scott/synchronization/pseudocode/duals.html).
     * The (Lifo) stack is used for non-fair mode, and the (Fifo)
     * queue for fair mode. The performance of the two is generally
     * similar. Fifo usually supports higher throughput under
     * contention but Lifo maintains higher thread locality in common
     * applications.
     *
     * A dual queue (and similarly stack) is one that at any given
     * time either holds "data" -- items provided by put operations,
     * or "requests" -- slots representing take operations, or is
     * empty. A call to "fulfill" (i.e., a call requesting an item
     * from a queue holding data or vice versa) dequeues a
     * complementary node. The most interesting feature of these
     * queues is that any operation can figure out which mode the
     * queue is in, and act accordingly without needing locks.
     *
     * Both the queue and stack extend abstract class Transferer
     * defining the single method transfer that does a put or a
     * take.
     * These are unified into a single method because in dual
     * data structures, the put and take operations are symmetrical,
     * so nearly all code can be combined. The resulting transfer
     * methods are on the long side, but are easier to follow than
     * they would be if broken up into nearly-duplicated parts.
     *
     * The queue and stack data structures share many conceptual
     * similarities but very few concrete details. For simplicity,
     * they are kept distinct so that they can later evolve
     * separately.
     *
     * The algorithms here differ from the versions in the above paper
     * in extending them for use in synchronous queues, as well as
     * dealing with cancellation. The main differences include:
     *
     *  1. The original algorithms used bit-marked pointers, but
     *     the ones here use mode bits in nodes, leading to a number
     *     of further adaptations.
     *  2. SynchronousQueues must block threads waiting to become
     *     fulfilled.
     *  3. Support for cancellation via timeout and interrupts,
     *     including cleaning out cancelled nodes/threads
     *     from lists to avoid garbage retention and memory depletion.
     *
     * Blocking is mainly accomplished using LockSupport park/unpark,
     * except that nodes that appear to be the next ones to become
     * fulfilled first spin a bit (on multiprocessors only). On very
     * busy synchronous queues, spinning can dramatically improve
     * throughput. And on less busy ones, the amount of spinning is
     * small enough not to be noticeable.
     *
     * Cleaning is done in different ways in queues vs stacks. For
     * queues, we can almost always remove a node immediately in O(1)
     * time (modulo retries for consistency checks) when it is
     * cancelled. But if it may be pinned as the current tail, it must
     * wait until some subsequent cancellation.
     * For stacks, we need a
     * potentially O(n) traversal to be sure that we can remove the
     * node, but this can run concurrently with other threads
     * accessing the stack.
     *
     * While garbage collection takes care of most node reclamation
     * issues that otherwise complicate nonblocking algorithms, care
     * is taken to "forget" references to data, other nodes, and
     * threads that might be held on to long-term by blocked
     * threads. In cases where setting to null would otherwise
     * conflict with main algorithms, this is done by changing a
     * node's link to now point to the node itself. This doesn't arise
     * much for Stack nodes (because blocked threads do not hang on to
     * old head pointers), but references in Queue nodes must be
     * aggressively forgotten to avoid reachability of everything any
     * node has ever referred to since arrival.
     */

    /**
     * Shared internal API for dual stacks and queues.
     */
    abstract static class Transferer {
        /**
         * Performs a put or take.
         *
         * @param e if non-null, the item to be handed to a consumer;
         *          if null, requests that transfer return an item
         *          offered by producer.
         * @param timed if this operation should timeout
         * @param nanos the timeout, in nanoseconds
         * @return if non-null, the item provided or received; if null,
         *         the operation failed due to timeout or interrupt --
         *         the caller can distinguish which of these occurred
         *         by checking Thread.interrupted.
         */
        abstract Object transfer(Object e, boolean timed, long nanos);
    }

    /** The number of CPUs, for spin control */
    static final int NCPUS = Runtime.getRuntime().availableProcessors();

    /**
     * The number of times to spin before blocking in timed waits.
     * The value is empirically derived -- it works well across a
     * variety of processors and OSes.
     * Empirically, the best value
     * seems not to vary with number of CPUs (beyond 2) so is just
     * a constant.
     */
    static final int maxTimedSpins = (NCPUS < 2) ? 0 : 32;

    /**
     * The number of times to spin before blocking in untimed waits.
     * This is greater than timed value because untimed waits spin
     * faster since they don't need to check times on each spin.
     */
    static final int maxUntimedSpins = maxTimedSpins * 16;

    /**
     * The number of nanoseconds for which it is faster to spin
     * rather than to use timed park. A rough estimate suffices.
     */
    static final long spinForTimeoutThreshold = 1000L;

    /** Dual stack */
    static final class TransferStack extends Transferer {
        /*
         * This extends Scherer-Scott dual stack algorithm, differing,
         * among other ways, by using "covering" nodes rather than
         * bit-marked pointers: Fulfilling operations push on marker
         * nodes (with FULFILLING bit set in mode) to reserve a spot
         * to match a waiting node.
         */

        /* Modes for SNodes, ORed together in node fields */
        /** Node represents an unfulfilled consumer */
        static final int REQUEST    = 0;
        /** Node represents an unfulfilled producer */
        static final int DATA       = 1;
        /** Node is fulfilling another unfulfilled DATA or REQUEST */
        static final int FULFILLING = 2;

        /** Return true if m has fulfilling bit set */
        static boolean isFulfilling(int m) { return (m & FULFILLING) != 0; }

        /** Node class for TransferStacks. */
        static final class SNode {
            volatile SNode next;        // next node in stack
            volatile SNode match;       // the node matched to this
            volatile Thread waiter;     // to control park/unpark
            Object item;                // data; or null for REQUESTs
            int mode;
            // Note: item and mode fields don't need to be volatile
            // since they are always written before, and read after,
            // other volatile/atomic operations.

            SNode(Object item) {
                this.item = item;
            }

            boolean casNext(SNode cmp, SNode val) {
                return cmp == next &&
                    UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
            }

            /**
             * Tries to match node s to this node, if so, waking up thread.
             * Fulfillers call tryMatch to identify their waiters.
             * Waiters block until they have been matched.
             *
             * @param s the node to match
             * @return true if successfully matched to s
             */
            boolean tryMatch(SNode s) {
                if (match == null &&
                    UNSAFE.compareAndSwapObject(this, matchOffset, null, s)) {
                    Thread w = waiter;
                    if (w != null) {    // waiters need at most one unpark
                        waiter = null;
                        LockSupport.unpark(w);
                    }
                    return true;
                }
                return match == s;
            }

            /**
             * Tries to cancel a wait by matching node to itself.
             */
            void tryCancel() {
                UNSAFE.compareAndSwapObject(this, matchOffset, null, this);
            }

            boolean isCancelled() {
                return match == this;
            }

            // Unsafe mechanics
            private static final sun.misc.Unsafe UNSAFE = sun.misc.Unsafe.getUnsafe();
            private static final long nextOffset =
                objectFieldOffset(UNSAFE, "next", SNode.class);
            private static final long matchOffset =
                objectFieldOffset(UNSAFE, "match", SNode.class);

        }

        /** The head (top) of the stack */
        volatile SNode head;

        boolean casHead(SNode h, SNode nh) {
            return h == head &&
                UNSAFE.compareAndSwapObject(this, headOffset, h, nh);
        }

        /**
         * Creates or resets fields of a node. Called only from transfer
         * where the node to push on stack is lazily created and
         * reused when possible to help reduce intervals between reads
         * and CASes of head and to avoid surges of garbage when CASes
         * to push nodes fail due to contention.
         */
        static SNode snode(SNode s, Object e, SNode next, int mode) {
            if (s == null) s = new SNode(e);
            s.mode = mode;
            s.next = next;
            return s;
        }

        /**
         * Puts or takes an item.
         */
        Object transfer(Object e, boolean timed, long nanos) {
            /*
             * Basic algorithm is to loop trying one of three actions:
             *
             * 1. If apparently empty or already containing nodes of same
             *    mode, try to push node on stack and wait for a match,
             *    returning it, or null if cancelled.
             *
             * 2. If apparently containing node of complementary mode,
             *    try to push a fulfilling node on to stack, match
             *    with corresponding waiting node, pop both from
             *    stack, and return matched item. The matching or
             *    unlinking might not actually be necessary because of
             *    other threads performing action 3:
             *
             * 3. If top of stack already holds another fulfilling node,
             *    help it out by doing its match and/or pop
             *    operations, and then continue. The code for helping
             *    is essentially the same as for fulfilling, except
             *    that it doesn't return the item.
             */

            SNode s = null; // constructed/reused as needed
            int mode = (e == null) ? REQUEST : DATA;

            for (;;) {
                SNode h = head;
                if (h == null || h.mode == mode) {  // empty or same-mode
                    if (timed && nanos <= 0) {      // can't wait
                        if (h != null && h.isCancelled())
                            casHead(h, h.next);     // pop cancelled node
                        else
                            return null;
                    } else if (casHead(h, s = snode(s, e, h, mode))) {
                        SNode m = awaitFulfill(s, timed, nanos);
                        if (m == s) {               // wait was cancelled
                            clean(s);
                            return null;
                        }
                        if ((h = head) != null && h.next == s)
                            casHead(h, s.next);     // help s's fulfiller
                        return (mode == REQUEST) ?
                            m.item : s.item;
                    }
                } else if (!isFulfilling(h.mode)) { // try to fulfill
                    if (h.isCancelled())            // already cancelled
                        casHead(h, h.next);         // pop and retry
                    else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) {
                        for (;;) { // loop until matched or waiters disappear
                            SNode m = s.next;       // m is s's match
                            if (m == null) {        // all waiters are gone
                                casHead(s, null);   // pop fulfill node
                                s = null;           // use new node next time
                                break;              // restart main loop
                            }
                            SNode mn = m.next;
                            if (m.tryMatch(s)) {
                                casHead(s, mn);     // pop both s and m
                                return (mode == REQUEST) ? m.item : s.item;
                            } else                  // lost match
                                s.casNext(m, mn);   // help unlink
                        }
                    }
                } else {                            // help a fulfiller
                    SNode m = h.next;               // m is h's match
                    if (m == null)                  // waiter is gone
                        casHead(h, null);           // pop fulfilling node
                    else {
                        SNode mn = m.next;
                        if (m.tryMatch(h))          // help match
                            casHead(h, mn);         // pop both h and m
                        else                        // lost match
                            h.casNext(m, mn);       // help unlink
                    }
                }
            }
        }

        /**
         * Spins/blocks until node s is matched by a fulfill operation.
         *
         * @param s the waiting node
         * @param timed true if timed wait
         * @param nanos timeout value
         * @return matched node, or s if cancelled
         */
        SNode awaitFulfill(SNode s, boolean timed, long nanos) {
            /*
             * When a node/thread is about to block, it sets its waiter
             * field and then rechecks state at least one more time
             * before actually parking, thus covering race vs
             * fulfiller noticing that waiter is non-null so should be
             * woken.
             *
             * When invoked by nodes that appear at the point of call
             * to be at the head of the stack, calls to park are
             * preceded by spins to avoid blocking when producers and
             * consumers are arriving very close in time. This can
             * happen enough to bother only on multiprocessors.
             *
             * The order of checks for returning out of main loop
             * reflects fact that interrupts have precedence over
             * normal returns, which have precedence over
             * timeouts. (So, on timeout, one last check for match is
             * done before giving up.) Except that calls from untimed
             * SynchronousQueue.{poll/offer} don't check interrupts
             * and don't wait at all, so are trapped in transfer
             * method rather than calling awaitFulfill.
             */
            long lastTime = timed ? System.nanoTime() : 0;
            Thread w = Thread.currentThread();
            SNode h = head;
            int spins = (shouldSpin(s) ?
                         (timed ? maxTimedSpins : maxUntimedSpins) : 0);
            for (;;) {
                if (w.isInterrupted())
                    s.tryCancel();
                SNode m = s.match;
                if (m != null)
                    return m;
                if (timed) {
                    long now = System.nanoTime();
                    nanos -= now - lastTime;
                    lastTime = now;
                    if (nanos <= 0) {
                        s.tryCancel();
                        continue;
                    }
                }
                if (spins > 0)
                    spins = shouldSpin(s) ? (spins-1) : 0;
                else if (s.waiter == null)
                    s.waiter = w; // establish waiter so can park next iter
                else if (!timed)
                    LockSupport.park(this);
                else if (nanos > spinForTimeoutThreshold)
                    LockSupport.parkNanos(this, nanos);
            }
        }

        /**
         * Returns true if node s is at head or there is an active
         * fulfiller.
         */
        boolean shouldSpin(SNode s) {
            SNode h = head;
            return (h == s || h == null || isFulfilling(h.mode));
        }

        /**
         * Unlinks s from the stack.
         */
        void clean(SNode s) {
            s.item = null;   // forget item
            s.waiter = null; // forget thread

            /*
             * At worst we may need to traverse entire stack to unlink
             * s. If there are multiple concurrent calls to clean, we
             * might not see s if another thread has already removed
             * it. But we can stop when we see any node known to
             * follow s. We use s.next unless it too is cancelled, in
             * which case we try the node one past.
             * We don't check any
             * further because we don't want to doubly traverse just to
             * find sentinel.
             */

            SNode past = s.next;
            if (past != null && past.isCancelled())
                past = past.next;

            // Absorb cancelled nodes at head
            SNode p;
            while ((p = head) != null && p != past && p.isCancelled())
                casHead(p, p.next);

            // Unsplice embedded nodes
            while (p != null && p != past) {
                SNode n = p.next;
                if (n != null && n.isCancelled())
                    p.casNext(n, n.next);
                else
                    p = n;
            }
        }

        // Unsafe mechanics
        private static final sun.misc.Unsafe UNSAFE = sun.misc.Unsafe.getUnsafe();
        private static final long headOffset =
            objectFieldOffset(UNSAFE, "head", TransferStack.class);

    }

    /** Dual Queue */
    static final class TransferQueue extends Transferer {
        /*
         * This extends Scherer-Scott dual queue algorithm, differing,
         * among other ways, by using modes within nodes rather than
         * marked pointers. The algorithm is a little simpler than
         * that for stacks because fulfillers do not need explicit
         * nodes, and matching is done by CAS'ing QNode.item field
         * from non-null to null (for put) or vice versa (for take).
         */

        /** Node class for TransferQueue. */
        static final class QNode {
            volatile QNode next;          // next node in queue
            volatile Object item;         // CAS'ed to or from null
            volatile Thread waiter;       // to control park/unpark
            final boolean isData;

            QNode(Object item, boolean isData) {
                this.item = item;
                this.isData = isData;
            }

            boolean casNext(QNode cmp, QNode val) {
                return next == cmp &&
                    UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
            }

            boolean casItem(Object cmp, Object val) {
                return item == cmp &&
                    UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
            }

            /**
             * Tries to cancel by CAS'ing ref to this as item.
             */
            void tryCancel(Object cmp) {
                UNSAFE.compareAndSwapObject(this, itemOffset, cmp, this);
            }

            boolean isCancelled() {
                return item == this;
            }

            /**
             * Returns true if this node is known to be off the queue
             * because its next pointer has been forgotten due to
             * an advanceHead operation.
             */
            boolean isOffList() {
                return next == this;
            }

            // Unsafe mechanics
            private static final sun.misc.Unsafe UNSAFE = sun.misc.Unsafe.getUnsafe();
            private static final long nextOffset =
                objectFieldOffset(UNSAFE, "next", QNode.class);
            private static final long itemOffset =
                objectFieldOffset(UNSAFE, "item", QNode.class);
        }

        /** Head of queue */
        transient volatile QNode head;
        /** Tail of queue */
        transient volatile QNode tail;
        /**
         * Reference to a cancelled node that might not yet have been
         * unlinked from queue because it was the last inserted node
         * when it cancelled.
         */
        transient volatile QNode cleanMe;

        TransferQueue() {
            QNode h = new QNode(null, false); // initialize to dummy node.
            head = h;
            tail = h;
        }

        /**
         * Tries to cas nh as new head; if successful, unlink
         * old head's next node to avoid garbage retention.
         */
        void advanceHead(QNode h, QNode nh) {
            if (h == head &&
                UNSAFE.compareAndSwapObject(this, headOffset, h, nh))
                h.next = h; // forget old next
        }

        /**
         * Tries to cas nt as new tail.
         */
        void advanceTail(QNode t, QNode nt) {
            if (tail == t)
                UNSAFE.compareAndSwapObject(this, tailOffset, t, nt);
        }

        /**
         * Tries to CAS cleanMe slot.
         */
        boolean casCleanMe(QNode cmp, QNode val) {
            return cleanMe == cmp &&
                UNSAFE.compareAndSwapObject(this, cleanMeOffset, cmp, val);
        }

        /**
         * Puts or takes an item.
         */
        Object transfer(Object e, boolean timed, long nanos) {
            /* Basic algorithm is to loop trying to take either of
             * two actions:
             *
             * 1. If queue apparently empty or holding same-mode nodes,
             *    try to add node to queue of waiters, wait to be
             *    fulfilled (or cancelled) and return matching item.
             *
             * 2. If queue apparently contains waiting items, and this
             *    call is of complementary mode, try to fulfill by CAS'ing
             *    item field of waiting node and dequeuing it, and then
             *    returning matching item.
             *
             * In each case, along the way, check for and try to help
             * advance head and tail on behalf of other stalled/slow
             * threads.
             *
             * The loop starts off with a null check guarding against
             * seeing uninitialized head or tail values. This never
             * happens in current SynchronousQueue, but could if
             * callers held non-volatile/final ref to the
             * transferer. The check is here anyway because it places
             * null checks at top of loop, which is usually faster
             * than having them implicitly interspersed.
             */

            QNode s = null; // constructed/reused as needed
            boolean isData = (e != null);

            for (;;) {
                QNode t = tail;
                QNode h = head;
                if (t == null || h == null)         // saw uninitialized value
                    continue;                       // spin

                if (h == t || t.isData == isData) { // empty or same-mode
                    QNode tn = t.next;
                    if (t != tail)                  // inconsistent read
                        continue;
                    if (tn != null) {               // lagging tail
                        advanceTail(t, tn);
                        continue;
                    }
                    if (timed && nanos <= 0)        // can't wait
                        return null;
                    if (s == null)
                        s = new QNode(e, isData);
                    if (!t.casNext(null, s))        // failed to link in
                        continue;

                    advanceTail(t, s);              // swing tail and wait
                    Object x = awaitFulfill(s, e, timed, nanos);
                    if (x == s) {                   // wait was cancelled
                        clean(t, s);
                        return null;
                    }

                    if (!s.isOffList()) {           // not already unlinked
                        advanceHead(t, s);          // unlink if head
                        if (x != null)              // and forget fields
                            s.item = s;
                        s.waiter = null;
                    }
                    return (x != null) ? x : e;

                } else {                            // complementary-mode
                    QNode m = h.next;               // node to fulfill
                    if (t != tail || m == null || h != head)
                        continue;                   // inconsistent read

                    Object x = m.item;
                    if (isData == (x != null) ||    // m already fulfilled
                        x == m ||                   // m cancelled
                        !m.casItem(x, e)) {         // lost CAS
                        advanceHead(h, m);          // dequeue and retry
                        continue;
                    }

                    advanceHead(h, m);              // successfully fulfilled
                    LockSupport.unpark(m.waiter);
                    return (x != null) ? x : e;
                }
            }
        }

        /**
         * Spins/blocks until node s is fulfilled.
         *
         * @param s the waiting node
         * @param e the comparison value for checking match
         * @param timed true if timed wait
         * @param nanos timeout value
         * @return matched item, or s if cancelled
         */
        Object awaitFulfill(QNode s, Object e, boolean timed, long nanos) {
            /* Same idea as TransferStack.awaitFulfill */
            long lastTime = timed ?
                System.nanoTime() : 0;
            Thread w = Thread.currentThread();
            int spins = ((head.next == s) ?
                         (timed ? maxTimedSpins : maxUntimedSpins) : 0);
            for (;;) {
                if (w.isInterrupted())
                    s.tryCancel(e);
                Object x = s.item;
                if (x != e)
                    return x;
                if (timed) {
                    long now = System.nanoTime();
                    nanos -= now - lastTime;
                    lastTime = now;
                    if (nanos <= 0) {
                        s.tryCancel(e);
                        continue;
                    }
                }
                if (spins > 0)
                    --spins;
                else if (s.waiter == null)
                    s.waiter = w;
                else if (!timed)
                    LockSupport.park(this);
                else if (nanos > spinForTimeoutThreshold)
                    LockSupport.parkNanos(this, nanos);
            }
        }

        /**
         * Gets rid of cancelled node s with original predecessor pred.
         */
        void clean(QNode pred, QNode s) {
            s.waiter = null; // forget thread
            /*
             * At any given time, exactly one node on list cannot be
             * deleted -- the last inserted node. To accommodate this,
             * if we cannot delete s, we save its predecessor as
             * "cleanMe", deleting the previously saved version
             * first. At least one of node s or the node previously
             * saved can always be deleted, so this always terminates.
             */
            while (pred.next == s) { // Return early if already unlinked
                QNode h = head;
                QNode hn = h.next;   // Absorb cancelled first node as head
                if (hn != null && hn.isCancelled()) {
                    advanceHead(h, hn);
                    continue;
                }
                QNode t = tail;      // Ensure consistent read for tail
                if (t == h)
                    return;
                QNode tn = t.next;
                if (t != tail)
                    continue;
                if (tn != null) {
                    advanceTail(t, tn);
                    continue;
                }
                if (s != t) {        // If not tail, try to unsplice
                    QNode sn = s.next;
                    if (sn == s || pred.casNext(s, sn))
                        return;
                }
                QNode dp = cleanMe;
                if (dp != null) {    // Try unlinking previous cancelled node
                    QNode d = dp.next;
                    QNode dn;
                    if (d == null ||               // d is gone or
                        d == dp ||                 // d is off list or
                        !d.isCancelled() ||        // d not cancelled or
                        (d != t &&                 // d not tail and
                         (dn = d.next) != null &&  //   has successor
                         dn != d &&                //   that is on list
                         dp.casNext(d, dn)))       // d unspliced
                        casCleanMe(dp, null);
                    if (dp == pred)
                        return;      // s is already saved node
                } else if (casCleanMe(null, pred))
                    return;          // Postpone cleaning s
            }
        }

        // unsafe mechanics
        private static final sun.misc.Unsafe UNSAFE = sun.misc.Unsafe.getUnsafe();
        private static final long headOffset =
            objectFieldOffset(UNSAFE, "head", TransferQueue.class);
        private static final long tailOffset =
            objectFieldOffset(UNSAFE, "tail", TransferQueue.class);
        private static final long cleanMeOffset =
            objectFieldOffset(UNSAFE, "cleanMe", TransferQueue.class);

    }

    /**
     * The transferer. Set only in constructor, but cannot be declared
     * as final without further complicating serialization. Since
     * this is accessed only at most once per public method, there
     * isn't a noticeable performance penalty for using volatile
     * instead of final here.
     */
    private transient volatile Transferer transferer;

    /**
     * Creates a <tt>SynchronousQueue</tt> with nonfair access policy.
     */
    public SynchronousQueue() {
        this(false);
    }

    /**
     * Creates a <tt>SynchronousQueue</tt> with the specified fairness policy.
     *
     * @param fair if true, waiting threads contend in FIFO order for
     *        access; otherwise the order is unspecified.
     */
    public SynchronousQueue(boolean fair) {
        transferer = fair ? new TransferQueue() : new TransferStack();
    }

    /**
     * Adds the specified element to this queue, waiting if necessary for
     * another thread to receive it.
     *
     * @throws InterruptedException {@inheritDoc}
     * @throws NullPointerException {@inheritDoc}
     */
    public void put(E o) throws InterruptedException {
        if (o == null) throw new NullPointerException();
        if (transferer.transfer(o, false, 0) == null) {
            Thread.interrupted();
            throw new InterruptedException();
        }
    }

    /**
     * Inserts the specified element into this queue, waiting if necessary
     * up to the specified wait time for another thread to receive it.
     *
     * @return <tt>true</tt> if successful, or <tt>false</tt> if the
     *         specified waiting time elapses before a consumer appears.
     * @throws InterruptedException {@inheritDoc}
     * @throws NullPointerException {@inheritDoc}
     */
    public boolean offer(E o, long timeout, TimeUnit unit)
        throws InterruptedException {
        if (o == null) throw new NullPointerException();
        if (transferer.transfer(o, true, unit.toNanos(timeout)) != null)
            return true;
        if (!Thread.interrupted())
            return false;
        throw new InterruptedException();
    }

    /**
     * Inserts the specified element into this queue, if another thread is
     * waiting to receive it.
     *
     * @param e the element to add
     * @return <tt>true</tt> if the element was added to this queue, else
     *         <tt>false</tt>
     * @throws NullPointerException if the specified element is null
     */
    public boolean offer(E e) {
        if (e == null) throw new NullPointerException();
        return transferer.transfer(e, true, 0) != null;
    }

    /**
     * Retrieves and removes the head of this queue, waiting if necessary
     * for another thread to insert it.
     *
     * @return the head of this queue
     * @throws InterruptedException {@inheritDoc}
     */
    public E take() throws InterruptedException {
        Object e = transferer.transfer(null, false, 0);
        if (e != null)
            return (E)e;
        Thread.interrupted();
        throw new InterruptedException();
    }

    /**
     * Retrieves and removes the head of this queue, waiting
     * if necessary up to the specified wait time, for another thread
     * to insert it.
     *
     * @return the head of this queue, or <tt>null</tt> if the
     *         specified waiting time elapses before an element is present.
     * @throws InterruptedException {@inheritDoc}
     */
    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        Object e = transferer.transfer(null, true, unit.toNanos(timeout));
        if (e != null || !Thread.interrupted())
            return (E)e;
        throw new InterruptedException();
    }

    /**
     * Retrieves and removes the head of this queue, if another thread
     * is currently making an element available.
     *
     * @return the head of this queue, or <tt>null</tt> if no
     *         element is available.
     */
    public E poll() {
        return (E)transferer.transfer(null, true, 0);
    }

    /**
     * Always returns <tt>true</tt>.
     * A <tt>SynchronousQueue</tt> has no internal capacity.
     *
     * @return <tt>true</tt>
     */
    public boolean isEmpty() {
        return true;
    }

    /**
     * Always returns zero.
     * A <tt>SynchronousQueue</tt> has no internal capacity.
     *
     * @return zero.
     */
    public int size() {
        return 0;
    }

    /**
     * Always returns zero.
     * A <tt>SynchronousQueue</tt> has no internal capacity.
     *
     * @return zero.
     */
    public int remainingCapacity() {
        return 0;
    }

    /**
     * Does nothing.
     * A <tt>SynchronousQueue</tt> has no internal capacity.
     */
    public void clear() {
    }

    /**
     * Always returns <tt>false</tt>.
     * A <tt>SynchronousQueue</tt> has no internal capacity.
     *
     * @param o the element
     * @return <tt>false</tt>
     */
    public boolean contains(Object o) {
        return false;
    }

    /**
     * Always returns <tt>false</tt>.
     * A <tt>SynchronousQueue</tt> has no internal capacity.
     *
     * @param o the element to remove
     * @return <tt>false</tt>
     */
    public boolean remove(Object o) {
        return false;
    }

    /**
     * Returns <tt>false</tt> unless the given collection is empty.
     * A <tt>SynchronousQueue</tt> has no internal capacity.
     *
     * @param c the collection
     * @return <tt>false</tt> unless given collection is empty
     */
    public boolean containsAll(Collection<?> c) {
        return c.isEmpty();
    }

    /**
     * Always returns <tt>false</tt>.
     * A <tt>SynchronousQueue</tt> has no internal capacity.
     *
     * @param c the collection
     * @return <tt>false</tt>
     */
    public boolean removeAll(Collection<?> c) {
        return false;
    }

    /**
     * Always returns <tt>false</tt>.
     * A <tt>SynchronousQueue</tt> has no internal capacity.
     *
     * @param c the collection
     * @return <tt>false</tt>
     */
    public boolean retainAll(Collection<?> c) {
        return false;
    }

    /**
     * Always returns <tt>null</tt>.
     * A <tt>SynchronousQueue</tt> does not return elements
     * unless actively waited on.
     *
     * @return <tt>null</tt>
     */
    public E peek() {
        return null;
    }

    /**
     * Returns an empty iterator in which <tt>hasNext</tt> always returns
     * <tt>false</tt>.
     *
     * @return an empty iterator
     */
    public Iterator<E> iterator() {
        return Collections.<E>emptySet().iterator(); // android-changed
    }

    /**
     * Returns a zero-length array.
     * @return a zero-length array
     */
    public Object[] toArray() {
        return EmptyArray.OBJECT; // android-changed
    }

    /**
     * Sets the zeroth element of the specified array to <tt>null</tt>
     * (if the array has non-zero length) and returns it.
     *
     * @param a the array
     * @return the specified array
     * @throws NullPointerException if the specified array is null
     */
    public <T> T[] toArray(T[] a) {
        if (a.length > 0)
            a[0] = null;
        return a;
    }

    /**
     * @throws UnsupportedOperationException {@inheritDoc}
     * @throws ClassCastException            {@inheritDoc}
     * @throws NullPointerException          {@inheritDoc}
     * @throws IllegalArgumentException      {@inheritDoc}
     */
    public int drainTo(Collection<? super E> c) {
        if (c == null)
            throw new NullPointerException();
        if (c == this)
            throw new IllegalArgumentException();
        int n = 0;
        E e;
        while ((e = poll()) != null) {
            c.add(e);
            ++n;
        }
        return n;
    }

    /**
     * @throws UnsupportedOperationException {@inheritDoc}
     * @throws ClassCastException            {@inheritDoc}
     * @throws NullPointerException          {@inheritDoc}
     * @throws IllegalArgumentException      {@inheritDoc}
     */
    public int drainTo(Collection<? super E> c, int maxElements) {
        if (c == null)
            throw new NullPointerException();
        if (c == this)
            throw new IllegalArgumentException();
        int n = 0;
        E e;
        while (n < maxElements && (e = poll()) != null) {
            c.add(e);
            ++n;
        }
        return n;
    }

    /*
     * To cope with serialization strategy in the 1.5 version of
     * SynchronousQueue, we declare some unused classes and fields
     * that exist solely to enable serializability across versions.
     * These fields are never used, so are initialized only if this
     * object is ever serialized or deserialized.
     */

    static class WaitQueue implements java.io.Serializable { }
    static class LifoWaitQueue extends WaitQueue {
        private static final long serialVersionUID = -3633113410248163686L;
    }
    static class FifoWaitQueue extends WaitQueue {
        private static final long serialVersionUID = -3623113410248163686L;
    }
    private ReentrantLock qlock;
    private WaitQueue waitingProducers;
    private WaitQueue waitingConsumers;

    /**
     * Save the state to a stream (that is, serialize it).
     *
     * @param s the stream
     */
    private void writeObject(java.io.ObjectOutputStream s)
        throws java.io.IOException {
        boolean fair = transferer instanceof TransferQueue;
        if (fair) {
            qlock = new ReentrantLock(true);
            waitingProducers = new FifoWaitQueue();
            waitingConsumers = new FifoWaitQueue();
        }
        else {
            qlock = new ReentrantLock();
            waitingProducers = new LifoWaitQueue();
            waitingConsumers = new LifoWaitQueue();
        }
        s.defaultWriteObject();
    }

    /**
     * Reconstitutes the queue from a stream (that is, deserializes it).
     * The fairness mode is recovered from the type of the serialized
     * wait-queue marker field.
     *
     * @param s the stream
     */
    private void readObject(final java.io.ObjectInputStream s)
        throws java.io.IOException, ClassNotFoundException {
        s.defaultReadObject();
        if (waitingProducers instanceof FifoWaitQueue)
            transferer = new TransferQueue();
        else
            transferer = new TransferStack();
    }

    // Unsafe mechanics
    static long objectFieldOffset(sun.misc.Unsafe UNSAFE,
                                  String field, Class<?> klazz) {
        try {
            return UNSAFE.objectFieldOffset(klazz.getDeclaredField(field));
        } catch (NoSuchFieldException e) {
            // Convert Exception to corresponding Error
            NoSuchFieldError error = new NoSuchFieldError(field);
            error.initCause(e);
            throw error;
        }
    }

}
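
/*
 * Usage sketch (not part of the original source). The class above
 * documents that an insert must rendezvous with a remove, and that the
 * Collection-style methods always report an empty queue. The example
 * below is a minimal illustration of both behaviors through the public
 * API only. The class name HandoffSketch and its method names are
 * hypothetical, and the five-second timeout is an arbitrary assumption
 * chosen to keep the demo from hanging.
 */

```java
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; it only calls public SynchronousQueue methods.
final class HandoffSketch {

    /** Hands one item from a producer thread to the calling thread. */
    static String handOff(final String item) {
        // 'true' requests the fair (FIFO) ordering policy.
        final SynchronousQueue<String> queue = new SynchronousQueue<>(true);

        Thread producer = new Thread(() -> {
            try {
                queue.put(item); // blocks until a consumer receives the item
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();

        try {
            // Timed poll: waits for the producer, returns null on timeout.
            String received = queue.poll(5, TimeUnit.SECONDS);
            if (received == null) {
                producer.interrupt(); // unblock the producer if we gave up
            }
            producer.join();
            return received;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    /** Checks that a queue with no waiting threads always looks empty. */
    static boolean looksEmptyWhenIdle() {
        SynchronousQueue<String> queue = new SynchronousQueue<>();
        return queue.isEmpty()
            && queue.size() == 0
            && queue.remainingCapacity() == 0
            && queue.peek() == null
            && queue.poll() == null          // no producer is waiting
            && !queue.offer("x")             // no consumer is waiting
            && !queue.contains("x")
            && !queue.iterator().hasNext()
            && queue.toArray().length == 0;
    }

    public static void main(String[] args) {
        System.out.println("received: " + handOff("task"));
        System.out.println("idle queue looks empty: " + looksEmptyWhenIdle());
    }
}
```

/*
 * The non-blocking offer and poll calls fail immediately when no
 * counterpart thread is waiting, which is why the idle checks above
 * need no second thread.
 */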