Home | History | Annotate | Download | only in utils
      1 #! /usr/bin/env python
      2 # Copyright 2014 The Chromium Authors. All rights reserved.
      3 # Use of this source code is governed by a BSD-style license that can be
      4 # found in the LICENSE file.
      5 
      6 """Unit tests for the contents of parallelizer.py."""
      7 
      8 # pylint: disable=protected-access
      9 # pylint: disable=unused-argument
     10 
     11 import contextlib
     12 import os
     13 import tempfile
     14 import time
     15 import sys
     16 import unittest
     17 
     18 if __name__ == '__main__':
     19   sys.path.append(os.path.abspath(
     20       os.path.join(os.path.dirname(__file__), '..', '..')))
     21 
     22 from devil.utils import parallelizer
     23 
     24 
     25 class ParallelizerTestObject(object):
     26   """Class used to test parallelizer.Parallelizer."""
     27 
     28   parallel = parallelizer.Parallelizer
     29 
     30   def __init__(self, thing, completion_file_name=None):
     31     self._thing = thing
     32     self._completion_file_name = completion_file_name
     33     self.helper = ParallelizerTestObjectHelper(thing)
     34 
     35   @staticmethod
     36   def doReturn(what):
     37     return what
     38 
     39   @classmethod
     40   def doRaise(cls, what):
     41     raise what
     42 
     43   def doSetTheThing(self, new_thing):
     44     self._thing = new_thing
     45 
     46   def doReturnTheThing(self):
     47     return self._thing
     48 
     49   def doRaiseTheThing(self):
     50     raise self._thing
     51 
     52   def doRaiseIfExceptionElseSleepFor(self, sleep_duration):
     53     if isinstance(self._thing, Exception):
     54       raise self._thing
     55     time.sleep(sleep_duration)
     56     self._write_completion_file()
     57     return self._thing
     58 
     59   def _write_completion_file(self):
     60     if self._completion_file_name and len(self._completion_file_name):
     61       with open(self._completion_file_name, 'w+b') as completion_file:
     62         completion_file.write('complete')
     63 
     64   def __getitem__(self, index):
     65     return self._thing[index]
     66 
     67   def __str__(self):
     68     return type(self).__name__
     69 
     70 
     71 class ParallelizerTestObjectHelper(object):
     72 
     73   def __init__(self, thing):
     74     self._thing = thing
     75 
     76   def doReturnStringThing(self):
     77     return str(self._thing)
     78 
     79 
     80 class ParallelizerTest(unittest.TestCase):
     81 
     82   def testInitEmptyList(self):
     83     r = parallelizer.Parallelizer([]).replace('a', 'b').pGet(0.1)
     84     self.assertEquals([], r)
     85 
     86   def testMethodCall(self):
     87     test_data = ['abc_foo', 'def_foo', 'ghi_foo']
     88     expected = ['abc_bar', 'def_bar', 'ghi_bar']
     89     r = parallelizer.Parallelizer(test_data).replace('_foo', '_bar').pGet(0.1)
     90     self.assertEquals(expected, r)
     91 
     92   def testMutate(self):
     93     devices = [ParallelizerTestObject(True) for _ in xrange(0, 10)]
     94     self.assertTrue(all(d.doReturnTheThing() for d in devices))
     95     ParallelizerTestObject.parallel(devices).doSetTheThing(False).pFinish(1)
     96     self.assertTrue(not any(d.doReturnTheThing() for d in devices))
     97 
     98   def testAllReturn(self):
     99     devices = [ParallelizerTestObject(True) for _ in xrange(0, 10)]
    100     results = ParallelizerTestObject.parallel(
    101         devices).doReturnTheThing().pGet(1)
    102     self.assertTrue(isinstance(results, list))
    103     self.assertEquals(10, len(results))
    104     self.assertTrue(all(results))
    105 
    106   def testAllRaise(self):
    107     devices = [ParallelizerTestObject(Exception('thing %d' % i))
    108                for i in xrange(0, 10)]
    109     p = ParallelizerTestObject.parallel(devices).doRaiseTheThing()
    110     with self.assertRaises(Exception):
    111       p.pGet(1)
    112 
    113   def testOneFailOthersComplete(self):
    114     parallel_device_count = 10
    115     exception_index = 7
    116     exception_msg = 'thing %d' % exception_index
    117 
    118     try:
    119       completion_files = [tempfile.NamedTemporaryFile(delete=False)
    120                           for _ in xrange(0, parallel_device_count)]
    121       devices = [
    122           ParallelizerTestObject(
    123               i if i != exception_index else Exception(exception_msg),
    124               completion_files[i].name)
    125           for i in xrange(0, parallel_device_count)]
    126       for f in completion_files:
    127         f.close()
    128       p = ParallelizerTestObject.parallel(devices)
    129       with self.assertRaises(Exception) as e:
    130         p.doRaiseIfExceptionElseSleepFor(2).pGet(3)
    131       self.assertTrue(exception_msg in str(e.exception))
    132       for i in xrange(0, parallel_device_count):
    133         with open(completion_files[i].name) as f:
    134           if i == exception_index:
    135             self.assertEquals('', f.read())
    136           else:
    137             self.assertEquals('complete', f.read())
    138     finally:
    139       for f in completion_files:
    140         os.remove(f.name)
    141 
    142   def testReusable(self):
    143     devices = [ParallelizerTestObject(True) for _ in xrange(0, 10)]
    144     p = ParallelizerTestObject.parallel(devices)
    145     results = p.doReturn(True).pGet(1)
    146     self.assertTrue(all(results))
    147     results = p.doReturn(True).pGet(1)
    148     self.assertTrue(all(results))
    149     with self.assertRaises(Exception):
    150       results = p.doRaise(Exception('reusableTest')).pGet(1)
    151 
    152   def testContained(self):
    153     devices = [ParallelizerTestObject(i) for i in xrange(0, 10)]
    154     results = (ParallelizerTestObject.parallel(devices).helper
    155         .doReturnStringThing().pGet(1))
    156     self.assertTrue(isinstance(results, list))
    157     self.assertEquals(10, len(results))
    158     for i in xrange(0, 10):
    159       self.assertEquals(str(i), results[i])
    160 
    161   def testGetItem(self):
    162     devices = [ParallelizerTestObject(range(i, i + 10)) for i in xrange(0, 10)]
    163     results = ParallelizerTestObject.parallel(devices)[9].pGet(1)
    164     self.assertEquals(range(9, 19), results)
    165 
    166 
    167 class SyncParallelizerTest(unittest.TestCase):
    168 
    169   def testContextManager(self):
    170     in_context = [False for i in xrange(10)]
    171 
    172     @contextlib.contextmanager
    173     def enter_into_context(i):
    174       in_context[i] = True
    175       try:
    176         yield
    177       finally:
    178         in_context[i] = False
    179 
    180     parallelized_context = parallelizer.SyncParallelizer(
    181         [enter_into_context(i) for i in xrange(10)])
    182 
    183     with parallelized_context:
    184       self.assertTrue(all(in_context))
    185     self.assertFalse(any(in_context))
    186 
    187 
    188 if __name__ == '__main__':
    189   unittest.main(verbosity=2)
    190 
    191