Home | History | Annotate | Download | only in cros
      1 # Copyright (c) 2013 The Chromium OS Authors. All rights reserved.
      2 # Use of this source code is governed by a BSD-style license that can be
      3 # found in the LICENSE file.
      4 
      5 from multiprocessing import Queue, queues
      6 
      7 
      8 class QueueBarrierTimeout(Exception):
      9     """QueueBarrier timeout exception."""
     10 
     11 
     12 class QueueBarrier(object):
     13     """This class implements a simple barrier to synchronize processes. The
     14     barrier relies on the fact that there a single process "master" and |n|
     15     different "slaves" to make the implementation simpler. Also, given this
     16     hierarchy, the slaves and the master can exchange a token while passing
     17     through the barrier.
     18 
     19     The so called "master" shall call master_barrier() while the "slave" shall
     20     call the slave_barrier() method.
     21 
     22     If the same group of |n| slaves and the same master are participating in the
     23     barrier, it is totally safe to reuse the barrier several times with the same
     24     group of processes.
     25     """
     26 
     27 
     28     def __init__(self, n):
     29         """Initializes the barrier with |n| slave processes and a master.
     30 
     31         @param n: The number of slave processes."""
     32         self.n_ = n
     33         self.queue_master_ = Queue()
     34         self.queue_slave_ = Queue()
     35 
     36 
     37     def master_barrier(self, token=None, timeout=None):
     38         """Makes the master wait until all the "n" slaves have reached this
     39         point.
     40 
     41         @param token: A value passed to every slave.
     42         @param timeout: The timeout, in seconds, to wait for the slaves.
     43                 A None value will block forever.
     44 
     45         Returns the list of received tokens from the slaves.
     46         """
     47         # Wait for all the slaves.
     48         result = []
     49         try:
     50             for _ in range(self.n_):
     51                 result.append(self.queue_master_.get(timeout=timeout))
     52         except queues.Empty:
     53             # Timeout expired
     54             raise QueueBarrierTimeout()
     55         # Release all the blocked slaves.
     56         for _ in range(self.n_):
     57             self.queue_slave_.put(token)
     58         return result
     59 
     60 
     61     def slave_barrier(self, token=None, timeout=None):
     62         """Makes a slave wait until all the "n" slaves and the master have
     63         reached this point.
     64 
     65         @param token: A value passed to the master.
     66         @param timeout: The timeout, in seconds, to wait for the slaves.
     67                 A None value will block forever.
     68         """
     69         self.queue_master_.put(token)
     70         try:
     71             return self.queue_slave_.get(timeout=timeout)
     72         except queues.Empty:
     73             # Timeout expired
     74             raise QueueBarrierTimeout()
     75