Home | History | Annotate | Download | only in utils
      1 # Copyright 2014 The Chromium Authors. All rights reserved.
      2 # Use of this source code is governed by a BSD-style license that can be
      3 # found in the LICENSE file.
      4 
      5 """Unit tests for the contents of parallelizer.py."""
      6 
      7 # pylint: disable=W0212
      8 # pylint: disable=W0613
      9 
     10 import os
     11 import tempfile
     12 import time
     13 import unittest
     14 
     15 from pylib.utils import parallelizer
     16 
     17 
     18 class ParallelizerTestObject(object):
     19   """Class used to test parallelizer.Parallelizer."""
     20 
     21   parallel = parallelizer.Parallelizer
     22 
     23   def __init__(self, thing, completion_file_name=None):
     24     self._thing = thing
     25     self._completion_file_name = completion_file_name
     26     self.helper = ParallelizerTestObjectHelper(thing)
     27 
     28   @staticmethod
     29   def doReturn(what):
     30     return what
     31 
     32   @classmethod
     33   def doRaise(cls, what):
     34     raise what
     35 
     36   def doSetTheThing(self, new_thing):
     37     self._thing = new_thing
     38 
     39   def doReturnTheThing(self):
     40     return self._thing
     41 
     42   def doRaiseTheThing(self):
     43     raise self._thing
     44 
     45   def doRaiseIfExceptionElseSleepFor(self, sleep_duration):
     46     if isinstance(self._thing, Exception):
     47       raise self._thing
     48     time.sleep(sleep_duration)
     49     self._write_completion_file()
     50     return self._thing
     51 
     52   def _write_completion_file(self):
     53     if self._completion_file_name and len(self._completion_file_name):
     54       with open(self._completion_file_name, 'w+b') as completion_file:
     55         completion_file.write('complete')
     56 
     57   def __getitem__(self, index):
     58     return self._thing[index]
     59 
     60   def __str__(self):
     61     return type(self).__name__
     62 
     63 
     64 class ParallelizerTestObjectHelper(object):
     65 
     66   def __init__(self, thing):
     67     self._thing = thing
     68 
     69   def doReturnStringThing(self):
     70     return str(self._thing)
     71 
     72 
     73 class ParallelizerTest(unittest.TestCase):
     74 
     75   def testInitWithNone(self):
     76     with self.assertRaises(AssertionError):
     77       parallelizer.Parallelizer(None)
     78 
     79   def testInitEmptyList(self):
     80     with self.assertRaises(AssertionError):
     81       parallelizer.Parallelizer([])
     82 
     83   def testMethodCall(self):
     84     test_data = ['abc_foo', 'def_foo', 'ghi_foo']
     85     expected = ['abc_bar', 'def_bar', 'ghi_bar']
     86     r = parallelizer.Parallelizer(test_data).replace('_foo', '_bar').pGet(0.1)
     87     self.assertEquals(expected, r)
     88 
     89   def testMutate(self):
     90     devices = [ParallelizerTestObject(True) for _ in xrange(0, 10)]
     91     self.assertTrue(all(d.doReturnTheThing() for d in devices))
     92     ParallelizerTestObject.parallel(devices).doSetTheThing(False).pFinish(1)
     93     self.assertTrue(not any(d.doReturnTheThing() for d in devices))
     94 
     95   def testAllReturn(self):
     96     devices = [ParallelizerTestObject(True) for _ in xrange(0, 10)]
     97     results = ParallelizerTestObject.parallel(
     98         devices).doReturnTheThing().pGet(1)
     99     self.assertTrue(isinstance(results, list))
    100     self.assertEquals(10, len(results))
    101     self.assertTrue(all(results))
    102 
    103   def testAllRaise(self):
    104     devices = [ParallelizerTestObject(Exception('thing %d' % i))
    105                for i in xrange(0, 10)]
    106     p = ParallelizerTestObject.parallel(devices).doRaiseTheThing()
    107     with self.assertRaises(Exception):
    108       p.pGet(1)
    109 
    110   def testOneFailOthersComplete(self):
    111     parallel_device_count = 10
    112     exception_index = 7
    113     exception_msg = 'thing %d' % exception_index
    114 
    115     try:
    116       completion_files = [tempfile.NamedTemporaryFile(delete=False)
    117                           for _ in xrange(0, parallel_device_count)]
    118       devices = [
    119           ParallelizerTestObject(
    120               i if i != exception_index else Exception(exception_msg),
    121               completion_files[i].name)
    122           for i in xrange(0, parallel_device_count)]
    123       for f in completion_files:
    124         f.close()
    125       p = ParallelizerTestObject.parallel(devices)
    126       with self.assertRaises(Exception) as e:
    127         p.doRaiseIfExceptionElseSleepFor(2).pGet(3)
    128       self.assertTrue(exception_msg in str(e.exception))
    129       for i in xrange(0, parallel_device_count):
    130         with open(completion_files[i].name) as f:
    131           if i == exception_index:
    132             self.assertEquals('', f.read())
    133           else:
    134             self.assertEquals('complete', f.read())
    135     finally:
    136       for f in completion_files:
    137         os.remove(f.name)
    138 
    139   def testReusable(self):
    140     devices = [ParallelizerTestObject(True) for _ in xrange(0, 10)]
    141     p = ParallelizerTestObject.parallel(devices)
    142     results = p.doReturn(True).pGet(1)
    143     self.assertTrue(all(results))
    144     results = p.doReturn(True).pGet(1)
    145     self.assertTrue(all(results))
    146     with self.assertRaises(Exception):
    147       results = p.doRaise(Exception('reusableTest')).pGet(1)
    148 
    149   def testContained(self):
    150     devices = [ParallelizerTestObject(i) for i in xrange(0, 10)]
    151     results = (ParallelizerTestObject.parallel(devices).helper
    152         .doReturnStringThing().pGet(1))
    153     self.assertTrue(isinstance(results, list))
    154     self.assertEquals(10, len(results))
    155     for i in xrange(0, 10):
    156       self.assertEquals(str(i), results[i])
    157 
    158   def testGetItem(self):
    159     devices = [ParallelizerTestObject(range(i, i+10)) for i in xrange(0, 10)]
    160     results = ParallelizerTestObject.parallel(devices)[9].pGet(1)
    161     self.assertEquals(range(9, 19), results)
    162 
    163 
    164 if __name__ == '__main__':
    165   unittest.main(verbosity=2)
    166 
    167