Home | History | Annotate | Download | only in utils
      1 # Copyright 2014 The Chromium Authors. All rights reserved.
      2 # Use of this source code is governed by a BSD-style license that can be
      3 # found in the LICENSE file.
      4 
      5 """ Wrapper that allows method execution in parallel.
      6 
      7 This class wraps a list of objects of the same type, emulates their
      8 interface, and executes any functions called on the objects in parallel
      9 in ReraiserThreads.
     10 
     11 This means that, given a list of objects:
     12 
     13   class Foo:
     14     def __init__(self):
     15       self.baz = Baz()
     16 
     17     def bar(self, my_param):
     18       // do something
     19 
     20   list_of_foos = [Foo(1), Foo(2), Foo(3)]
     21 
     22 we can take a sequential operation on that list of objects:
     23 
     24   for f in list_of_foos:
     25     f.bar('Hello')
     26 
     27 and run it in parallel across all of the objects:
     28 
     29   Parallelizer(list_of_foos).bar('Hello')
     30 
     31 It can also handle (non-method) attributes of objects, so that this:
     32 
     33   for f in list_of_foos:
     34     f.baz.myBazMethod()
     35 
     36 can be run in parallel with:
     37 
     38   Parallelizer(list_of_foos).baz.myBazMethod()
     39 
     40 Because it emulates the interface of the wrapped objects, a Parallelizer
     41 can be passed to a method or function that takes objects of that type:
     42 
     43   def DoesSomethingWithFoo(the_foo):
     44     the_foo.bar('Hello')
     45     the_foo.bar('world')
     46     the_foo.baz.myBazMethod
     47 
     48   DoesSomethingWithFoo(Parallelizer(list_of_foos))
     49 
     50 Note that this class spins up a thread for each object. Using this class
     51 to parallelize operations that are already fast will incur a net performance
     52 penalty.
     53 
     54 """
     55 # pylint: disable=protected-access
     56 
     57 from devil.utils import reraiser_thread
     58 from devil.utils import watchdog_timer
     59 
     60 _DEFAULT_TIMEOUT = 30
     61 _DEFAULT_RETRIES = 3
     62 
     63 
     64 class Parallelizer(object):
     65   """Allows parallel execution of method calls across a group of objects."""
     66 
     67   def __init__(self, objs):
     68     self._orig_objs = objs
     69     self._objs = objs
     70 
     71   def __getattr__(self, name):
     72     """Emulate getting the |name| attribute of |self|.
     73 
     74     Args:
     75       name: The name of the attribute to retrieve.
     76     Returns:
     77       A Parallelizer emulating the |name| attribute of |self|.
     78     """
     79     self.pGet(None)
     80 
     81     r = type(self)(self._orig_objs)
     82     r._objs = [getattr(o, name) for o in self._objs]
     83     return r
     84 
     85   def __getitem__(self, index):
     86     """Emulate getting the value of |self| at |index|.
     87 
     88     Returns:
     89       A Parallelizer emulating the value of |self| at |index|.
     90     """
     91     self.pGet(None)
     92 
     93     r = type(self)(self._orig_objs)
     94     r._objs = [o[index] for o in self._objs]
     95     return r
     96 
     97   def __call__(self, *args, **kwargs):
     98     """Emulate calling |self| with |args| and |kwargs|.
     99 
    100     Note that this call is asynchronous. Call pFinish on the return value to
    101     block until the call finishes.
    102 
    103     Returns:
    104       A Parallelizer wrapping the ReraiserThreadGroup running the call in
    105       parallel.
    106     Raises:
    107       AttributeError if the wrapped objects aren't callable.
    108     """
    109     self.pGet(None)
    110 
    111     for o in self._objs:
    112       if not callable(o):
    113         raise AttributeError("'%s' is not callable" % o.__name__)
    114 
    115     r = type(self)(self._orig_objs)
    116     r._objs = reraiser_thread.ReraiserThreadGroup(
    117         [reraiser_thread.ReraiserThread(
    118             o, args=args, kwargs=kwargs,
    119             name='%s.%s' % (str(d), o.__name__))
    120          for d, o in zip(self._orig_objs, self._objs)])
    121     r._objs.StartAll()
    122     return r
    123 
    124   def pFinish(self, timeout):
    125     """Finish any outstanding asynchronous operations.
    126 
    127     Args:
    128       timeout: The maximum number of seconds to wait for an individual
    129                result to return, or None to wait forever.
    130     Returns:
    131       self, now emulating the return values.
    132     """
    133     self._assertNoShadow('pFinish')
    134     if isinstance(self._objs, reraiser_thread.ReraiserThreadGroup):
    135       self._objs.JoinAll()
    136       self._objs = self._objs.GetAllReturnValues(
    137           watchdog_timer.WatchdogTimer(timeout))
    138     return self
    139 
    140   def pGet(self, timeout):
    141     """Get the current wrapped objects.
    142 
    143     Args:
    144       timeout: Same as |pFinish|.
    145     Returns:
    146       A list of the results, in order of the provided devices.
    147     Raises:
    148       Any exception raised by any of the called functions.
    149     """
    150     self._assertNoShadow('pGet')
    151     self.pFinish(timeout)
    152     return self._objs
    153 
    154   def pMap(self, f, *args, **kwargs):
    155     """Map a function across the current wrapped objects in parallel.
    156 
    157     This calls f(o, *args, **kwargs) for each o in the set of wrapped objects.
    158 
    159     Note that this call is asynchronous. Call pFinish on the return value to
    160     block until the call finishes.
    161 
    162     Args:
    163       f: The function to call.
    164       args: The positional args to pass to f.
    165       kwargs: The keyword args to pass to f.
    166     Returns:
    167       A Parallelizer wrapping the ReraiserThreadGroup running the map in
    168       parallel.
    169     """
    170     self._assertNoShadow('pMap')
    171     r = type(self)(self._orig_objs)
    172     r._objs = reraiser_thread.ReraiserThreadGroup(
    173         [reraiser_thread.ReraiserThread(
    174             f, args=tuple([o] + list(args)), kwargs=kwargs,
    175             name='%s(%s)' % (f.__name__, d))
    176          for d, o in zip(self._orig_objs, self._objs)])
    177     r._objs.StartAll()
    178     return r
    179 
    180   def _assertNoShadow(self, attr_name):
    181     """Ensures that |attr_name| isn't shadowing part of the wrapped obejcts.
    182 
    183     If the wrapped objects _do_ have an |attr_name| attribute, it will be
    184     inaccessible to clients.
    185 
    186     Args:
    187       attr_name: The attribute to check.
    188     Raises:
    189       AssertionError if the wrapped objects have an attribute named 'attr_name'
    190       or '_assertNoShadow'.
    191     """
    192     if isinstance(self._objs, reraiser_thread.ReraiserThreadGroup):
    193       assert not hasattr(self._objs, '_assertNoShadow')
    194       assert not hasattr(self._objs, attr_name)
    195     else:
    196       assert not any(hasattr(o, '_assertNoShadow') for o in self._objs)
    197       assert not any(hasattr(o, attr_name) for o in self._objs)
    198 
    199 
    200 class SyncParallelizer(Parallelizer):
    201   """A Parallelizer that blocks on function calls."""
    202 
    203   def __enter__(self):
    204     """Emulate entering the context of |self|.
    205 
    206     Note that this call is synchronous.
    207 
    208     Returns:
    209       A Parallelizer emulating the value returned from entering into the
    210       context of |self|.
    211     """
    212     r = type(self)(self._orig_objs)
    213     r._objs = [o.__enter__ for o in r._objs]
    214     return r.__call__()
    215 
    216   def __exit__(self, exc_type, exc_val, exc_tb):
    217     """Emulate exiting the context of |self|.
    218 
    219     Note that this call is synchronous.
    220 
    221     Args:
    222       exc_type: the exception type.
    223       exc_val: the exception value.
    224       exc_tb: the exception traceback.
    225     """
    226     r = type(self)(self._orig_objs)
    227     r._objs = [o.__exit__ for o in r._objs]
    228     r.__call__(exc_type, exc_val, exc_tb)
    229 
    230   # override
    231   def __call__(self, *args, **kwargs):
    232     """Emulate calling |self| with |args| and |kwargs|.
    233 
    234     Note that this call is synchronous.
    235 
    236     Returns:
    237       A Parallelizer emulating the value returned from calling |self| with
    238       |args| and |kwargs|.
    239     Raises:
    240       AttributeError if the wrapped objects aren't callable.
    241     """
    242     r = super(SyncParallelizer, self).__call__(*args, **kwargs)
    243     r.pFinish(None)
    244     return r
    245 
    246   # override
    247   def pMap(self, f, *args, **kwargs):
    248     """Map a function across the current wrapped objects in parallel.
    249 
    250     This calls f(o, *args, **kwargs) for each o in the set of wrapped objects.
    251 
    252     Note that this call is synchronous.
    253 
    254     Args:
    255       f: The function to call.
    256       args: The positional args to pass to f.
    257       kwargs: The keyword args to pass to f.
    258     Returns:
    259       A Parallelizer wrapping the ReraiserThreadGroup running the map in
    260       parallel.
    261     """
    262     r = super(SyncParallelizer, self).pMap(f, *args, **kwargs)
    263     r.pFinish(None)
    264     return r
    265 
    266