#! /usr/bin/env python
# Copyright 2014 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""Unit tests for the contents of parallelizer.py."""

# pylint: disable=protected-access
# pylint: disable=unused-argument

import contextlib
import os
import tempfile
import time
import sys
import unittest

if __name__ == '__main__':
  # When executed directly as a script, add the directory two levels above
  # this file to sys.path before importing from the `devil` package below.
  sys.path.append(os.path.abspath(
      os.path.join(os.path.dirname(__file__), '..', '..')))

from devil.utils import parallelizer


class ParallelizerTestObject(object):
  """Class used to test parallelizer.Parallelizer."""

  # Lets tests write ParallelizerTestObject.parallel(objects).
  parallel = parallelizer.Parallelizer

  def __init__(self, thing, completion_file_name=None):
    """Stores the payload and the optional completion-marker file path.

    Args:
      thing: Arbitrary payload returned, raised or indexed by the do* methods.
      completion_file_name: Optional path of a file that
        doRaiseIfExceptionElseSleepFor writes to once it has finished.
    """
    self._thing = thing
    self._completion_file_name = completion_file_name
    self.helper = ParallelizerTestObjectHelper(thing)

  @staticmethod
  def doReturn(what):
    """Returns |what| unchanged."""
    return what

  @classmethod
  def doRaise(cls, what):
    """Raises |what|."""
    raise what

  def doSetTheThing(self, new_thing):
    """Replaces the stored payload with |new_thing|."""
    self._thing = new_thing

  def doReturnTheThing(self):
    """Returns the stored payload."""
    return self._thing

  def doRaiseTheThing(self):
    """Raises the stored payload."""
    raise self._thing

  def doRaiseIfExceptionElseSleepFor(self, sleep_duration):
    """Raises the payload if it is an Exception, else sleeps and returns it.

    Args:
      sleep_duration: Seconds to sleep before writing the completion file.

    Returns:
      The stored payload, when it is not an Exception instance.

    Raises:
      The stored payload itself, when it is an Exception instance. In that
      case no sleep happens and the completion file is not written.
    """
    if isinstance(self._thing, Exception):
      raise self._thing
    time.sleep(sleep_duration)
    self._write_completion_file()
    return self._thing

  def _write_completion_file(self):
    """Writes 'complete' to the completion file, if a path was provided."""
    if not self._completion_file_name:
      return
    # Text mode: the payload is a str, which a binary-mode file rejects on
    # Python 3. The written content is the same on Python 2.
    with open(self._completion_file_name, 'w') as completion_file:
      completion_file.write('complete')

  def __getitem__(self, index):
    """Returns the element of the stored payload at |index|."""
    return self._thing[index]

  def __str__(self):
    """Returns the class name."""
    return type(self).__name__


class ParallelizerTestObjectHelper(object):
  """Contained object exposed as ParallelizerTestObject.helper."""

  def __init__(self, thing):
    """Stores |thing| as the payload."""
    self._thing = thing

  def doReturnStringThing(self):
    """Returns str() of the stored payload."""
    return str(self._thing)


class ParallelizerTest(unittest.TestCase):
  """Tests for parallelizer.Parallelizer."""

  def testInitEmptyList(self):
    """A call over an empty list is expected to yield an empty list."""
    r = parallelizer.Parallelizer([]).replace('a', 'b').pGet(0.1)
    self.assertEqual([], r)

  def testMethodCall(self):
    """A method call is expected to yield one result per wrapped object."""
    test_data = ['abc_foo', 'def_foo', 'ghi_foo']
    expected = ['abc_bar', 'def_bar', 'ghi_bar']
    r = parallelizer.Parallelizer(test_data).replace('_foo', '_bar').pGet(0.1)
    self.assertEqual(expected, r)

  def testMutate(self):
    """A mutating call is expected to affect every wrapped object."""
    devices = [ParallelizerTestObject(True) for _ in range(10)]
    self.assertTrue(all(d.doReturnTheThing() for d in devices))
    ParallelizerTestObject.parallel(devices).doSetTheThing(False).pFinish(1)
    self.assertFalse(any(d.doReturnTheThing() for d in devices))

  def testAllReturn(self):
    """pGet is expected to return a list with one value per object."""
    devices = [ParallelizerTestObject(True) for _ in range(10)]
    results = ParallelizerTestObject.parallel(
        devices).doReturnTheThing().pGet(1)
    self.assertIsInstance(results, list)
    self.assertEqual(10, len(results))
    self.assertTrue(all(results))

  def testAllRaise(self):
    """pGet is expected to raise when every wrapped call raises."""
    devices = [ParallelizerTestObject(Exception('thing %d' % i))
               for i in range(10)]
    p = ParallelizerTestObject.parallel(devices).doRaiseTheThing()
    with self.assertRaises(Exception):
      p.pGet(1)

  def testOneFailOthersComplete(self):
    """One failing object must not stop the others from completing.

    Every object gets its own temporary completion file. The object at
    |exception_index| raises immediately; all the others sleep and then write
    'complete' to their file. Afterwards only the failing object's file is
    expected to be empty.
    """
    parallel_device_count = 10
    exception_index = 7
    exception_msg = 'thing %d' % exception_index

    # Initialised before the try block so the cleanup in `finally` is valid
    # even if creating one of the temporary files fails part-way through.
    completion_files = []
    try:
      for _ in range(parallel_device_count):
        completion_files.append(tempfile.NamedTemporaryFile(delete=False))
      devices = [
          ParallelizerTestObject(
              i if i != exception_index else Exception(exception_msg),
              completion_files[i].name)
          for i in range(parallel_device_count)]
      for f in completion_files:
        f.close()
      p = ParallelizerTestObject.parallel(devices)
      with self.assertRaises(Exception) as e:
        p.doRaiseIfExceptionElseSleepFor(2).pGet(3)
      self.assertIn(exception_msg, str(e.exception))
      for i, completion_file in enumerate(completion_files):
        with open(completion_file.name) as f:
          if i == exception_index:
            self.assertEqual('', f.read())
          else:
            self.assertEqual('complete', f.read())
    finally:
      for f in completion_files:
        # close() on an already-closed file is a no-op; this only matters
        # when the test failed before the files were closed above.
        f.close()
        os.remove(f.name)

  def testReusable(self):
    """A single Parallelizer is expected to support repeated calls."""
    devices = [ParallelizerTestObject(True) for _ in range(10)]
    p = ParallelizerTestObject.parallel(devices)
    results = p.doReturn(True).pGet(1)
    self.assertTrue(all(results))
    results = p.doReturn(True).pGet(1)
    self.assertTrue(all(results))
    with self.assertRaises(Exception):
      results = p.doRaise(Exception('reusableTest')).pGet(1)

  def testContained(self):
    """Calls through a contained attribute are expected to be forwarded."""
    devices = [ParallelizerTestObject(i) for i in range(10)]
    results = (ParallelizerTestObject.parallel(devices).helper
               .doReturnStringThing().pGet(1))
    self.assertIsInstance(results, list)
    self.assertEqual(10, len(results))
    for i in range(10):
      self.assertEqual(str(i), results[i])

  def testGetItem(self):
    """Indexing is expected to be applied to every wrapped object."""
    devices = [ParallelizerTestObject(list(range(i, i + 10)))
               for i in range(10)]
    results = ParallelizerTestObject.parallel(devices)[9].pGet(1)
    # Object i holds [i, ..., i + 9], so element 9 of object i is i + 9.
    self.assertEqual(list(range(9, 19)), results)


class SyncParallelizerTest(unittest.TestCase):
  """Tests for parallelizer.SyncParallelizer."""

  def testContextManager(self):
    """All wrapped context managers are expected to be entered and exited."""
    in_context = [False] * 10

    @contextlib.contextmanager
    def enter_into_context(i):
      # Records in slot |i| whether this context is currently entered.
      in_context[i] = True
      try:
        yield
      finally:
        in_context[i] = False

    parallelized_context = parallelizer.SyncParallelizer(
        [enter_into_context(i) for i in range(10)])

    with parallelized_context:
      self.assertTrue(all(in_context))
    self.assertFalse(any(in_context))


if __name__ == '__main__':
  unittest.main(verbosity=2)