/*
 * Written by Doug Lea with assistance from members of JCP JSR-166
 * Expert Group and released to the public domain, as explained at
 * http://creativecommons.org/publicdomain/zero/1.0/
 * Other contributors include Andrew Wright, Jeffrey Hayes,
 * Pat Fisher, Mike Judd.
 */
      8 
      9 package jsr166;
     10 
import junit.framework.*;
import java.util.*;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
     19 
     20 public class CyclicBarrierTest extends JSR166TestCase {
     21 
     22     private volatile int countAction;
     23     private class MyAction implements Runnable {
     24         public void run() { ++countAction; }
     25     }
     26 
     27     /**
     28      * Spin-waits till the number of waiters == numberOfWaiters.
     29      */
     30     void awaitNumberWaiting(CyclicBarrier barrier, int numberOfWaiters) {
     31         long startTime = System.nanoTime();
     32         while (barrier.getNumberWaiting() != numberOfWaiters) {
     33             if (millisElapsedSince(startTime) > LONG_DELAY_MS)
     34                 fail("timed out");
     35             Thread.yield();
     36         }
     37     }
     38 
     39     /**
     40      * Creating with negative parties throws IAE
     41      */
     42     public void testConstructor1() {
     43         try {
     44             new CyclicBarrier(-1, (Runnable)null);
     45             shouldThrow();
     46         } catch (IllegalArgumentException success) {}
     47     }
     48 
     49     /**
     50      * Creating with negative parties and no action throws IAE
     51      */
     52     public void testConstructor2() {
     53         try {
     54             new CyclicBarrier(-1);
     55             shouldThrow();
     56         } catch (IllegalArgumentException success) {}
     57     }
     58 
     59     /**
     60      * getParties returns the number of parties given in constructor
     61      */
     62     public void testGetParties() {
     63         CyclicBarrier b = new CyclicBarrier(2);
     64         assertEquals(2, b.getParties());
     65         assertEquals(0, b.getNumberWaiting());
     66     }
     67 
     68     /**
     69      * A 1-party barrier triggers after single await
     70      */
     71     public void testSingleParty() throws Exception {
     72         CyclicBarrier b = new CyclicBarrier(1);
     73         assertEquals(1, b.getParties());
     74         assertEquals(0, b.getNumberWaiting());
     75         b.await();
     76         b.await();
     77         assertEquals(0, b.getNumberWaiting());
     78     }
     79 
     80     /**
     81      * The supplied barrier action is run at barrier
     82      */
     83     public void testBarrierAction() throws Exception {
     84         countAction = 0;
     85         CyclicBarrier b = new CyclicBarrier(1, new MyAction());
     86         assertEquals(1, b.getParties());
     87         assertEquals(0, b.getNumberWaiting());
     88         b.await();
     89         b.await();
     90         assertEquals(0, b.getNumberWaiting());
     91         assertEquals(2, countAction);
     92     }
     93 
     94     /**
     95      * A 2-party/thread barrier triggers after both threads invoke await
     96      */
     97     public void testTwoParties() throws Exception {
     98         final CyclicBarrier b = new CyclicBarrier(2);
     99         Thread t = newStartedThread(new CheckedRunnable() {
    100             public void realRun() throws Exception {
    101                 b.await();
    102                 b.await();
    103                 b.await();
    104                 b.await();
    105             }});
    106 
    107         b.await();
    108         b.await();
    109         b.await();
    110         b.await();
    111         awaitTermination(t);
    112     }
    113 
    114     /**
    115      * An interruption in one party causes others waiting in await to
    116      * throw BrokenBarrierException
    117      */
    118     public void testAwait1_Interrupted_BrokenBarrier() {
    119         final CyclicBarrier c = new CyclicBarrier(3);
    120         final CountDownLatch pleaseInterrupt = new CountDownLatch(2);
    121         Thread t1 = new ThreadShouldThrow(InterruptedException.class) {
    122             public void realRun() throws Exception {
    123                 pleaseInterrupt.countDown();
    124                 c.await();
    125             }};
    126         Thread t2 = new ThreadShouldThrow(BrokenBarrierException.class) {
    127             public void realRun() throws Exception {
    128                 pleaseInterrupt.countDown();
    129                 c.await();
    130             }};
    131 
    132         t1.start();
    133         t2.start();
    134         await(pleaseInterrupt);
    135         t1.interrupt();
    136         awaitTermination(t1);
    137         awaitTermination(t2);
    138     }
    139 
    140     /**
    141      * An interruption in one party causes others waiting in timed await to
    142      * throw BrokenBarrierException
    143      */
    144     public void testAwait2_Interrupted_BrokenBarrier() throws Exception {
    145         final CyclicBarrier c = new CyclicBarrier(3);
    146         final CountDownLatch pleaseInterrupt = new CountDownLatch(2);
    147         Thread t1 = new ThreadShouldThrow(InterruptedException.class) {
    148             public void realRun() throws Exception {
    149                 pleaseInterrupt.countDown();
    150                 c.await(LONG_DELAY_MS, MILLISECONDS);
    151             }};
    152         Thread t2 = new ThreadShouldThrow(BrokenBarrierException.class) {
    153             public void realRun() throws Exception {
    154                 pleaseInterrupt.countDown();
    155                 c.await(LONG_DELAY_MS, MILLISECONDS);
    156             }};
    157 
    158         t1.start();
    159         t2.start();
    160         await(pleaseInterrupt);
    161         t1.interrupt();
    162         awaitTermination(t1);
    163         awaitTermination(t2);
    164     }
    165 
    166     /**
    167      * A timeout in timed await throws TimeoutException
    168      */
    169     public void testAwait3_TimeoutException() throws InterruptedException {
    170         final CyclicBarrier c = new CyclicBarrier(2);
    171         Thread t = newStartedThread(new CheckedRunnable() {
    172             public void realRun() throws Exception {
    173                 long startTime = System.nanoTime();
    174                 try {
    175                     c.await(timeoutMillis(), MILLISECONDS);
    176                     shouldThrow();
    177                 } catch (TimeoutException success) {}
    178                 assertTrue(millisElapsedSince(startTime) >= timeoutMillis());
    179             }});
    180 
    181         awaitTermination(t);
    182     }
    183 
    184     /**
    185      * A timeout in one party causes others waiting in timed await to
    186      * throw BrokenBarrierException
    187      */
    188     public void testAwait4_Timeout_BrokenBarrier() throws InterruptedException {
    189         final CyclicBarrier c = new CyclicBarrier(3);
    190         Thread t1 = newStartedThread(new CheckedRunnable() {
    191             public void realRun() throws Exception {
    192                 try {
    193                     c.await(LONG_DELAY_MS, MILLISECONDS);
    194                     shouldThrow();
    195                 } catch (BrokenBarrierException success) {}
    196             }});
    197         Thread t2 = newStartedThread(new CheckedRunnable() {
    198             public void realRun() throws Exception {
    199                 awaitNumberWaiting(c, 1);
    200                 long startTime = System.nanoTime();
    201                 try {
    202                     c.await(timeoutMillis(), MILLISECONDS);
    203                     shouldThrow();
    204                 } catch (TimeoutException success) {}
    205                 assertTrue(millisElapsedSince(startTime) >= timeoutMillis());
    206             }});
    207 
    208         awaitTermination(t1);
    209         awaitTermination(t2);
    210     }
    211 
    212     /**
    213      * A timeout in one party causes others waiting in await to
    214      * throw BrokenBarrierException
    215      */
    216     public void testAwait5_Timeout_BrokenBarrier() throws InterruptedException {
    217         final CyclicBarrier c = new CyclicBarrier(3);
    218         Thread t1 = newStartedThread(new CheckedRunnable() {
    219             public void realRun() throws Exception {
    220                 try {
    221                     c.await();
    222                     shouldThrow();
    223                 } catch (BrokenBarrierException success) {}
    224             }});
    225         Thread t2 = newStartedThread(new CheckedRunnable() {
    226             public void realRun() throws Exception {
    227                 awaitNumberWaiting(c, 1);
    228                 long startTime = System.nanoTime();
    229                 try {
    230                     c.await(timeoutMillis(), MILLISECONDS);
    231                     shouldThrow();
    232                 } catch (TimeoutException success) {}
    233                 assertTrue(millisElapsedSince(startTime) >= timeoutMillis());
    234             }});
    235 
    236         awaitTermination(t1);
    237         awaitTermination(t2);
    238     }
    239 
    240     /**
    241      * A reset of an active barrier causes waiting threads to throw
    242      * BrokenBarrierException
    243      */
    244     public void testReset_BrokenBarrier() throws InterruptedException {
    245         final CyclicBarrier c = new CyclicBarrier(3);
    246         final CountDownLatch pleaseReset = new CountDownLatch(2);
    247         Thread t1 = new ThreadShouldThrow(BrokenBarrierException.class) {
    248             public void realRun() throws Exception {
    249                 pleaseReset.countDown();
    250                 c.await();
    251             }};
    252         Thread t2 = new ThreadShouldThrow(BrokenBarrierException.class) {
    253             public void realRun() throws Exception {
    254                 pleaseReset.countDown();
    255                 c.await();
    256             }};
    257 
    258         t1.start();
    259         t2.start();
    260         await(pleaseReset);
    261 
    262         awaitNumberWaiting(c, 2);
    263         c.reset();
    264         awaitTermination(t1);
    265         awaitTermination(t2);
    266     }
    267 
    268     /**
    269      * A reset before threads enter barrier does not throw
    270      * BrokenBarrierException
    271      */
    272     public void testReset_NoBrokenBarrier() throws Exception {
    273         final CyclicBarrier c = new CyclicBarrier(3);
    274         c.reset();
    275 
    276         Thread t1 = newStartedThread(new CheckedRunnable() {
    277             public void realRun() throws Exception {
    278                 c.await();
    279             }});
    280         Thread t2 = newStartedThread(new CheckedRunnable() {
    281             public void realRun() throws Exception {
    282                 c.await();
    283             }});
    284 
    285         c.await();
    286         awaitTermination(t1);
    287         awaitTermination(t2);
    288     }
    289 
    290     /**
    291      * All threads block while a barrier is broken.
    292      */
    293     public void testReset_Leakage() throws InterruptedException {
    294         final CyclicBarrier c = new CyclicBarrier(2);
    295         final AtomicBoolean done = new AtomicBoolean();
    296         Thread t = newStartedThread(new CheckedRunnable() {
    297             public void realRun() {
    298                 while (!done.get()) {
    299                     try {
    300                         while (c.isBroken())
    301                             c.reset();
    302 
    303                         c.await();
    304                         shouldThrow();
    305                     }
    306                     catch (BrokenBarrierException ok) {}
    307                     catch (InterruptedException ok) {}
    308                 }}});
    309 
    310         for (int i = 0; i < 4; i++) {
    311             delay(timeoutMillis());
    312             t.interrupt();
    313         }
    314         done.set(true);
    315         t.interrupt();
    316         awaitTermination(t);
    317     }
    318 
    319     /**
    320      * Reset of a non-broken barrier does not break barrier
    321      */
    322     public void testResetWithoutBreakage() throws Exception {
    323         final CyclicBarrier barrier = new CyclicBarrier(3);
    324         for (int i = 0; i < 3; i++) {
    325             final CyclicBarrier start = new CyclicBarrier(3);
    326             Thread t1 = newStartedThread(new CheckedRunnable() {
    327                 public void realRun() throws Exception {
    328                     start.await();
    329                     barrier.await();
    330                 }});
    331 
    332             Thread t2 = newStartedThread(new CheckedRunnable() {
    333                 public void realRun() throws Exception {
    334                     start.await();
    335                     barrier.await();
    336                 }});
    337 
    338             start.await();
    339             barrier.await();
    340             awaitTermination(t1);
    341             awaitTermination(t2);
    342             assertFalse(barrier.isBroken());
    343             assertEquals(0, barrier.getNumberWaiting());
    344             if (i == 1) barrier.reset();
    345             assertFalse(barrier.isBroken());
    346             assertEquals(0, barrier.getNumberWaiting());
    347         }
    348     }
    349 
    350     /**
    351      * Reset of a barrier after interruption reinitializes it.
    352      */
    353     public void testResetAfterInterrupt() throws Exception {
    354         final CyclicBarrier barrier = new CyclicBarrier(3);
    355         for (int i = 0; i < 2; i++) {
    356             final CyclicBarrier start = new CyclicBarrier(3);
    357             Thread t1 = new ThreadShouldThrow(InterruptedException.class) {
    358                 public void realRun() throws Exception {
    359                     start.await();
    360                     barrier.await();
    361                 }};
    362 
    363             Thread t2 = new ThreadShouldThrow(BrokenBarrierException.class) {
    364                 public void realRun() throws Exception {
    365                     start.await();
    366                     barrier.await();
    367                 }};
    368 
    369             t1.start();
    370             t2.start();
    371             start.await();
    372             t1.interrupt();
    373             awaitTermination(t1);
    374             awaitTermination(t2);
    375             assertTrue(barrier.isBroken());
    376             assertEquals(0, barrier.getNumberWaiting());
    377             barrier.reset();
    378             assertFalse(barrier.isBroken());
    379             assertEquals(0, barrier.getNumberWaiting());
    380         }
    381     }
    382 
    383     /**
    384      * Reset of a barrier after timeout reinitializes it.
    385      */
    386     public void testResetAfterTimeout() throws Exception {
    387         final CyclicBarrier barrier = new CyclicBarrier(3);
    388         for (int i = 0; i < 2; i++) {
    389             assertEquals(0, barrier.getNumberWaiting());
    390             Thread t1 = newStartedThread(new CheckedRunnable() {
    391                 public void realRun() throws Exception {
    392                     try {
    393                         barrier.await();
    394                         shouldThrow();
    395                     } catch (BrokenBarrierException success) {}
    396                 }});
    397             Thread t2 = newStartedThread(new CheckedRunnable() {
    398                 public void realRun() throws Exception {
    399                     awaitNumberWaiting(barrier, 1);
    400                     long startTime = System.nanoTime();
    401                     try {
    402                         barrier.await(timeoutMillis(), MILLISECONDS);
    403                         shouldThrow();
    404                     } catch (TimeoutException success) {}
    405                     assertTrue(millisElapsedSince(startTime) >= timeoutMillis());
    406                 }});
    407 
    408             awaitTermination(t1);
    409             awaitTermination(t2);
    410             assertEquals(0, barrier.getNumberWaiting());
    411             assertTrue(barrier.isBroken());
    412             assertEquals(0, barrier.getNumberWaiting());
    413             barrier.reset();
    414             assertFalse(barrier.isBroken());
    415             assertEquals(0, barrier.getNumberWaiting());
    416         }
    417     }
    418 
    419     /**
    420      * Reset of a barrier after a failed command reinitializes it.
    421      */
    422     public void testResetAfterCommandException() throws Exception {
    423         final CyclicBarrier barrier =
    424             new CyclicBarrier(3, new Runnable() {
    425                     public void run() {
    426                         throw new NullPointerException(); }});
    427         for (int i = 0; i < 2; i++) {
    428             final CyclicBarrier start = new CyclicBarrier(3);
    429             Thread t1 = new ThreadShouldThrow(BrokenBarrierException.class) {
    430                 public void realRun() throws Exception {
    431                     start.await();
    432                     barrier.await();
    433                 }};
    434 
    435             Thread t2 = new ThreadShouldThrow(BrokenBarrierException.class) {
    436                 public void realRun() throws Exception {
    437                     start.await();
    438                     barrier.await();
    439                 }};
    440 
    441             t1.start();
    442             t2.start();
    443             start.await();
    444             awaitNumberWaiting(barrier, 2);
    445             try {
    446                 barrier.await();
    447                 shouldThrow();
    448             } catch (NullPointerException success) {}
    449             awaitTermination(t1);
    450             awaitTermination(t2);
    451             assertTrue(barrier.isBroken());
    452             assertEquals(0, barrier.getNumberWaiting());
    453             barrier.reset();
    454             assertFalse(barrier.isBroken());
    455             assertEquals(0, barrier.getNumberWaiting());
    456         }
    457     }
    458 }
    459