Home | History | Annotate | Download | only in concurrent
      1 /*
      2  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
      3  *
      4  * This code is free software; you can redistribute it and/or modify it
      5  * under the terms of the GNU General Public License version 2 only, as
      6  * published by the Free Software Foundation.  Oracle designates this
      7  * particular file as subject to the "Classpath" exception as provided
      8  * by Oracle in the LICENSE file that accompanied this code.
      9  *
     10  * This code is distributed in the hope that it will be useful, but WITHOUT
     11  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
     12  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
     13  * version 2 for more details (a copy is included in the LICENSE file that
     14  * accompanied this code).
     15  *
     16  * You should have received a copy of the GNU General Public License version
     17  * 2 along with this work; if not, write to the Free Software Foundation,
     18  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
     19  *
     20  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
     21  * or visit www.oracle.com if you need additional information or have any
     22  * questions.
     23  */
     24 
     25 /*
     26  * This file is available under and governed by the GNU General Public
     27  * License version 2 only, as published by the Free Software Foundation.
     28  * However, the following notice accompanied the original version of this
     29  * file:
     30  *
     31  * Written by Doug Lea with assistance from members of JCP JSR-166
     32  * Expert Group and released to the public domain, as explained at
     33  * http://creativecommons.org/publicdomain/zero/1.0/
     34  */
     35 
     36 package java.util.concurrent;
     37 
     38 import java.util.concurrent.locks.Condition;
     39 import java.util.concurrent.locks.ReentrantLock;
     40 
     41 /**
     42  * A synchronization aid that allows a set of threads to all wait for
     43  * each other to reach a common barrier point.  CyclicBarriers are
     44  * useful in programs involving a fixed sized party of threads that
     45  * must occasionally wait for each other. The barrier is called
     46  * <em>cyclic</em> because it can be re-used after the waiting threads
     47  * are released.
     48  *
     49  * <p>A {@code CyclicBarrier} supports an optional {@link Runnable} command
     50  * that is run once per barrier point, after the last thread in the party
     51  * arrives, but before any threads are released.
     52  * This <em>barrier action</em> is useful
     53  * for updating shared-state before any of the parties continue.
     54  *
     55  * <p><b>Sample usage:</b> Here is an example of using a barrier in a
     56  * parallel decomposition design:
     57  *
     58  * <pre> {@code
     59  * class Solver {
     60  *   final int N;
     61  *   final float[][] data;
     62  *   final CyclicBarrier barrier;
     63  *
     64  *   class Worker implements Runnable {
     65  *     int myRow;
     66  *     Worker(int row) { myRow = row; }
     67  *     public void run() {
     68  *       while (!done()) {
     69  *         processRow(myRow);
     70  *
     71  *         try {
     72  *           barrier.await();
     73  *         } catch (InterruptedException ex) {
     74  *           return;
     75  *         } catch (BrokenBarrierException ex) {
     76  *           return;
     77  *         }
     78  *       }
     79  *     }
     80  *   }
     81  *
     82  *   public Solver(float[][] matrix) {
     83  *     data = matrix;
     84  *     N = matrix.length;
     85  *     Runnable barrierAction =
     86  *       new Runnable() { public void run() { mergeRows(...); }};
     87  *     barrier = new CyclicBarrier(N, barrierAction);
     88  *
     89  *     List<Thread> threads = new ArrayList<>(N);
     90  *     for (int i = 0; i < N; i++) {
     91  *       Thread thread = new Thread(new Worker(i));
     92  *       threads.add(thread);
     93  *       thread.start();
     94  *     }
     95  *
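 *     // wait until done; join() throws the checked InterruptedException,
 *     // which this constructor has to handle because it declares no throws
 *     for (Thread thread : threads) {
 *       try {
 *         thread.join();
 *       } catch (InterruptedException ex) {
 *         Thread.currentThread().interrupt();
 *         return;
 *       }
 *     }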
     99  *   }
    100  * }}</pre>
    101  *
    102  * Here, each worker thread processes a row of the matrix then waits at the
    103  * barrier until all rows have been processed. When all rows are processed
    104  * the supplied {@link Runnable} barrier action is executed and merges the
    105  * rows. If the merger
    106  * determines that a solution has been found then {@code done()} will return
    107  * {@code true} and each worker will terminate.
    108  *
    109  * <p>If the barrier action does not rely on the parties being suspended when
    110  * it is executed, then any of the threads in the party could execute that
    111  * action when it is released. To facilitate this, each invocation of
    112  * {@link #await} returns the arrival index of that thread at the barrier.
    113  * You can then choose which thread should execute the barrier action, for
    114  * example:
    115  * <pre> {@code
    116  * if (barrier.await() == 0) {
    117  *   // log the completion of this iteration
    118  * }}</pre>
    119  *
    120  * <p>The {@code CyclicBarrier} uses an all-or-none breakage model
    121  * for failed synchronization attempts: If a thread leaves a barrier
    122  * point prematurely because of interruption, failure, or timeout, all
    123  * other threads waiting at that barrier point will also leave
    124  * abnormally via {@link BrokenBarrierException} (or
    125  * {@link InterruptedException} if they too were interrupted at about
    126  * the same time).
    127  *
    128  * <p>Memory consistency effects: Actions in a thread prior to calling
    129  * {@code await()}
    130  * <a href="package-summary.html#MemoryVisibility"><i>happen-before</i></a>
    131  * actions that are part of the barrier action, which in turn
    132  * <i>happen-before</i> actions following a successful return from the
    133  * corresponding {@code await()} in other threads.
    134  *
    135  * @since 1.5
    136  * @see CountDownLatch
    137  *
    138  * @author Doug Lea
    139  */
    140 public class CyclicBarrier {
    141     /**
    142      * Each use of the barrier is represented as a generation instance.
    143      * The generation changes whenever the barrier is tripped, or
    144      * is reset. There can be many generations associated with threads
    145      * using the barrier - due to the non-deterministic way the lock
    146      * may be allocated to waiting threads - but only one of these
    147      * can be active at a time (the one to which {@code count} applies)
    148      * and all the rest are either broken or tripped.
    149      * There need not be an active generation if there has been a break
    150      * but no subsequent reset.
    151      */
    152     private static class Generation {
    153         boolean broken;         // initially false
    154     }
    155 
    156     /** The lock for guarding barrier entry */
    157     private final ReentrantLock lock = new ReentrantLock();
    158     /** Condition to wait on until tripped */
    159     private final Condition trip = lock.newCondition();
    160     /** The number of parties */
    161     private final int parties;
    162     /** The command to run when tripped */
    163     private final Runnable barrierCommand;
    164     /** The current generation */
    165     private Generation generation = new Generation();
    166 
    167     /**
    168      * Number of parties still waiting. Counts down from parties to 0
    169      * on each generation.  It is reset to parties on each new
    170      * generation or when broken.
    171      */
    172     private int count;
    173 
    174     /**
    175      * Updates state on barrier trip and wakes up everyone.
    176      * Called only while holding lock.
    177      */
    178     private void nextGeneration() {
    179         // signal completion of last generation
    180         trip.signalAll();
    181         // set up next generation
    182         count = parties;
    183         generation = new Generation();
    184     }
    185 
    186     /**
    187      * Sets current barrier generation as broken and wakes up everyone.
    188      * Called only while holding lock.
    189      */
    190     private void breakBarrier() {
    191         generation.broken = true;
    192         count = parties;
    193         trip.signalAll();
    194     }
    195 
    196     /**
    197      * Main barrier code, covering the various policies.
    198      */
    199     private int dowait(boolean timed, long nanos)
    200         throws InterruptedException, BrokenBarrierException,
    201                TimeoutException {
    202         final ReentrantLock lock = this.lock;
    203         lock.lock();
    204         try {
    205             final Generation g = generation;
    206 
    207             if (g.broken)
    208                 throw new BrokenBarrierException();
    209 
    210             if (Thread.interrupted()) {
    211                 breakBarrier();
    212                 throw new InterruptedException();
    213             }
    214 
    215             int index = --count;
    216             if (index == 0) {  // tripped
    217                 boolean ranAction = false;
    218                 try {
    219                     final Runnable command = barrierCommand;
    220                     if (command != null)
    221                         command.run();
    222                     ranAction = true;
    223                     nextGeneration();
    224                     return 0;
    225                 } finally {
    226                     if (!ranAction)
    227                         breakBarrier();
    228                 }
    229             }
    230 
    231             // loop until tripped, broken, interrupted, or timed out
    232             for (;;) {
    233                 try {
    234                     if (!timed)
    235                         trip.await();
    236                     else if (nanos > 0L)
    237                         nanos = trip.awaitNanos(nanos);
    238                 } catch (InterruptedException ie) {
    239                     if (g == generation && ! g.broken) {
    240                         breakBarrier();
    241                         throw ie;
    242                     } else {
    243                         // We're about to finish waiting even if we had not
    244                         // been interrupted, so this interrupt is deemed to
    245                         // "belong" to subsequent execution.
    246                         Thread.currentThread().interrupt();
    247                     }
    248                 }
    249 
    250                 if (g.broken)
    251                     throw new BrokenBarrierException();
    252 
    253                 if (g != generation)
    254                     return index;
    255 
    256                 if (timed && nanos <= 0L) {
    257                     breakBarrier();
    258                     throw new TimeoutException();
    259                 }
    260             }
    261         } finally {
    262             lock.unlock();
    263         }
    264     }
    265 
    266     /**
    267      * Creates a new {@code CyclicBarrier} that will trip when the
    268      * given number of parties (threads) are waiting upon it, and which
    269      * will execute the given barrier action when the barrier is tripped,
    270      * performed by the last thread entering the barrier.
    271      *
    272      * @param parties the number of threads that must invoke {@link #await}
    273      *        before the barrier is tripped
    274      * @param barrierAction the command to execute when the barrier is
    275      *        tripped, or {@code null} if there is no action
    276      * @throws IllegalArgumentException if {@code parties} is less than 1
    277      */
    278     public CyclicBarrier(int parties, Runnable barrierAction) {
    279         if (parties <= 0) throw new IllegalArgumentException();
    280         this.parties = parties;
    281         this.count = parties;
    282         this.barrierCommand = barrierAction;
    283     }
    284 
    285     /**
    286      * Creates a new {@code CyclicBarrier} that will trip when the
    287      * given number of parties (threads) are waiting upon it, and
    288      * does not perform a predefined action when the barrier is tripped.
    289      *
    290      * @param parties the number of threads that must invoke {@link #await}
    291      *        before the barrier is tripped
    292      * @throws IllegalArgumentException if {@code parties} is less than 1
    293      */
    294     public CyclicBarrier(int parties) {
    295         this(parties, null);
    296     }
    297 
    298     /**
    299      * Returns the number of parties required to trip this barrier.
    300      *
    301      * @return the number of parties required to trip this barrier
    302      */
    303     public int getParties() {
    304         return parties;
    305     }
    306 
    307     /**
    308      * Waits until all {@linkplain #getParties parties} have invoked
    309      * {@code await} on this barrier.
    310      *
    311      * <p>If the current thread is not the last to arrive then it is
    312      * disabled for thread scheduling purposes and lies dormant until
    313      * one of the following things happens:
    314      * <ul>
    315      * <li>The last thread arrives; or
    316      * <li>Some other thread {@linkplain Thread#interrupt interrupts}
    317      * the current thread; or
    318      * <li>Some other thread {@linkplain Thread#interrupt interrupts}
    319      * one of the other waiting threads; or
    320      * <li>Some other thread times out while waiting for barrier; or
    321      * <li>Some other thread invokes {@link #reset} on this barrier.
    322      * </ul>
    323      *
    324      * <p>If the current thread:
    325      * <ul>
    326      * <li>has its interrupted status set on entry to this method; or
    327      * <li>is {@linkplain Thread#interrupt interrupted} while waiting
    328      * </ul>
    329      * then {@link InterruptedException} is thrown and the current thread's
    330      * interrupted status is cleared.
    331      *
    332      * <p>If the barrier is {@link #reset} while any thread is waiting,
    333      * or if the barrier {@linkplain #isBroken is broken} when
    334      * {@code await} is invoked, or while any thread is waiting, then
    335      * {@link BrokenBarrierException} is thrown.
    336      *
    337      * <p>If any thread is {@linkplain Thread#interrupt interrupted} while waiting,
    338      * then all other waiting threads will throw
    339      * {@link BrokenBarrierException} and the barrier is placed in the broken
    340      * state.
    341      *
    342      * <p>If the current thread is the last thread to arrive, and a
    343      * non-null barrier action was supplied in the constructor, then the
    344      * current thread runs the action before allowing the other threads to
    345      * continue.
    346      * If an exception occurs during the barrier action then that exception
    347      * will be propagated in the current thread and the barrier is placed in
    348      * the broken state.
    349      *
    350      * @return the arrival index of the current thread, where index
    351      *         {@code getParties() - 1} indicates the first
    352      *         to arrive and zero indicates the last to arrive
    353      * @throws InterruptedException if the current thread was interrupted
    354      *         while waiting
    355      * @throws BrokenBarrierException if <em>another</em> thread was
    356      *         interrupted or timed out while the current thread was
    357      *         waiting, or the barrier was reset, or the barrier was
    358      *         broken when {@code await} was called, or the barrier
    359      *         action (if present) failed due to an exception
    360      */
    361     public int await() throws InterruptedException, BrokenBarrierException {
    362         try {
    363             return dowait(false, 0L);
    364         } catch (TimeoutException toe) {
    365             throw new Error(toe); // cannot happen
    366         }
    367     }
    368 
    369     /**
    370      * Waits until all {@linkplain #getParties parties} have invoked
    371      * {@code await} on this barrier, or the specified waiting time elapses.
    372      *
    373      * <p>If the current thread is not the last to arrive then it is
    374      * disabled for thread scheduling purposes and lies dormant until
    375      * one of the following things happens:
    376      * <ul>
    377      * <li>The last thread arrives; or
    378      * <li>The specified timeout elapses; or
    379      * <li>Some other thread {@linkplain Thread#interrupt interrupts}
    380      * the current thread; or
    381      * <li>Some other thread {@linkplain Thread#interrupt interrupts}
    382      * one of the other waiting threads; or
    383      * <li>Some other thread times out while waiting for barrier; or
    384      * <li>Some other thread invokes {@link #reset} on this barrier.
    385      * </ul>
    386      *
    387      * <p>If the current thread:
    388      * <ul>
    389      * <li>has its interrupted status set on entry to this method; or
    390      * <li>is {@linkplain Thread#interrupt interrupted} while waiting
    391      * </ul>
    392      * then {@link InterruptedException} is thrown and the current thread's
    393      * interrupted status is cleared.
    394      *
    395      * <p>If the specified waiting time elapses then {@link TimeoutException}
    396      * is thrown. If the time is less than or equal to zero, the
    397      * method will not wait at all.
    398      *
    399      * <p>If the barrier is {@link #reset} while any thread is waiting,
    400      * or if the barrier {@linkplain #isBroken is broken} when
    401      * {@code await} is invoked, or while any thread is waiting, then
    402      * {@link BrokenBarrierException} is thrown.
    403      *
    404      * <p>If any thread is {@linkplain Thread#interrupt interrupted} while
    405      * waiting, then all other waiting threads will throw {@link
    406      * BrokenBarrierException} and the barrier is placed in the broken
    407      * state.
    408      *
    409      * <p>If the current thread is the last thread to arrive, and a
    410      * non-null barrier action was supplied in the constructor, then the
    411      * current thread runs the action before allowing the other threads to
    412      * continue.
    413      * If an exception occurs during the barrier action then that exception
    414      * will be propagated in the current thread and the barrier is placed in
    415      * the broken state.
    416      *
    417      * @param timeout the time to wait for the barrier
    418      * @param unit the time unit of the timeout parameter
    419      * @return the arrival index of the current thread, where index
    420      *         {@code getParties() - 1} indicates the first
    421      *         to arrive and zero indicates the last to arrive
    422      * @throws InterruptedException if the current thread was interrupted
    423      *         while waiting
    424      * @throws TimeoutException if the specified timeout elapses.
    425      *         In this case the barrier will be broken.
    426      * @throws BrokenBarrierException if <em>another</em> thread was
    427      *         interrupted or timed out while the current thread was
    428      *         waiting, or the barrier was reset, or the barrier was broken
    429      *         when {@code await} was called, or the barrier action (if
    430      *         present) failed due to an exception
    431      */
    432     public int await(long timeout, TimeUnit unit)
    433         throws InterruptedException,
    434                BrokenBarrierException,
    435                TimeoutException {
    436         return dowait(true, unit.toNanos(timeout));
    437     }
    438 
    439     /**
    440      * Queries if this barrier is in a broken state.
    441      *
    442      * @return {@code true} if one or more parties broke out of this
    443      *         barrier due to interruption or timeout since
    444      *         construction or the last reset, or a barrier action
    445      *         failed due to an exception; {@code false} otherwise.
    446      */
    447     public boolean isBroken() {
    448         final ReentrantLock lock = this.lock;
    449         lock.lock();
    450         try {
    451             return generation.broken;
    452         } finally {
    453             lock.unlock();
    454         }
    455     }
    456 
    457     /**
    458      * Resets the barrier to its initial state.  If any parties are
    459      * currently waiting at the barrier, they will return with a
    460      * {@link BrokenBarrierException}. Note that resets <em>after</em>
    461      * a breakage has occurred for other reasons can be complicated to
    462      * carry out; threads need to re-synchronize in some other way,
    463      * and choose one to perform the reset.  It may be preferable to
    464      * instead create a new barrier for subsequent use.
    465      */
    466     public void reset() {
    467         final ReentrantLock lock = this.lock;
    468         lock.lock();
    469         try {
    470             breakBarrier();   // break the current generation
    471             nextGeneration(); // start a new generation
    472         } finally {
    473             lock.unlock();
    474         }
    475     }
    476 
    477     /**
    478      * Returns the number of parties currently waiting at the barrier.
    479      * This method is primarily useful for debugging and assertions.
    480      *
    481      * @return the number of parties currently blocked in {@link #await}
    482      */
    483     public int getNumberWaiting() {
    484         final ReentrantLock lock = this.lock;
    485         lock.lock();
    486         try {
    487             return parties - count;
    488         } finally {
    489             lock.unlock();
    490         }
    491     }
    492 }
    493