1 # -*- coding: utf-8 -*- 2 # Copyright 2013 Google Inc. All Rights Reserved. 3 # 4 # Permission is hereby granted, free of charge, to any person obtaining a 5 # copy of this software and associated documentation files (the 6 # "Software"), to deal in the Software without restriction, including 7 # without limitation the rights to use, copy, modify, merge, publish, dis- 8 # tribute, sublicense, and/or sell copies of the Software, and to permit 9 # persons to whom the Software is furnished to do so, subject to the fol- 10 # lowing conditions: 11 # 12 # The above copyright notice and this permission notice shall be included 13 # in all copies or substantial portions of the Software. 14 # 15 # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS 16 # OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABIL- 17 # ITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT 18 # SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, 19 # WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, 20 # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS 21 # IN THE SOFTWARE. 22 """Unit tests for gsutil parallelism framework.""" 23 24 from __future__ import absolute_import 25 26 import functools 27 import os 28 import signal 29 import threading 30 import time 31 32 from boto.storage_uri import BucketStorageUri 33 from gslib import cs_api_map 34 from gslib.command import Command 35 from gslib.command import CreateGsutilLogger 36 from gslib.command import DummyArgChecker 37 import gslib.tests.testcase as testcase 38 from gslib.tests.testcase.base import RequiresIsolation 39 from gslib.tests.util import unittest 40 from gslib.util import CheckMultiprocessingAvailableAndInit 41 from gslib.util import IS_WINDOWS 42 43 44 # Amount of time for an individual test to run before timing out. 
# Per-test timeout, in seconds. The value is deliberately generous: when many
# tests run in parallel, an individual test may take a while to complete.
_TEST_TIMEOUT_SECONDS = 120


def Timeout(func):
  """Decorator used to provide a timeout for functions.

  Unless IS_WINDOWS is set, a SIGALRM is scheduled _TEST_TIMEOUT_SECONDS ahead
  before func runs and is cancelled once func finishes, whether it returns or
  raises. When IS_WINDOWS is set, func simply runs without an alarm.

  Args:
    func: The callable to wrap.

  Returns:
    A wrapper that calls func with the same arguments and returns its result.
  """
  @functools.wraps(func)
  def Wrapper(*args, **kwargs):
    if not IS_WINDOWS:
      signal.signal(signal.SIGALRM, _HandleAlarm)
      signal.alarm(_TEST_TIMEOUT_SECONDS)
    try:
      # Hand the wrapped function's result back to the caller rather than
      # silently dropping it.
      return func(*args, **kwargs)
    finally:
      if not IS_WINDOWS:
        signal.alarm(0)  # Cancel the alarm.
  return Wrapper


# pylint: disable=unused-argument
def _HandleAlarm(signal_num, cur_stack_frame):
  """Signal handler installed by Timeout; raises to abort the running test."""
  raise Exception('Test timed out.')


class CustomException(Exception):
  """Exception raised on purpose by the helpers in this module."""

  def __init__(self, exception_str):
    super(CustomException, self).__init__(exception_str)


def _ReturnOneValue(cls, args, thread_state=None):
  """Apply target that ignores its arguments and returns 1."""
  return 1


def _ReturnProcAndThreadId(cls, args, thread_state=None):
  """Apply target that returns a (process id, thread ident) tuple."""
  return (os.getpid(), threading.current_thread().ident)


def _SleepThenReturnProcAndThreadId(cls, args, thread_state=None):
  """Apply target that sleeps 5 seconds, then returns (pid, thread ident)."""
  # This can fail if the total time to spawn new processes and threads takes
  # longer than 5 seconds, but if that occurs, then we have a performance
  # problem that needs to be addressed.
  time.sleep(5)
  return _ReturnProcAndThreadId(cls, args, thread_state=thread_state)


def _FailureFunc(cls, args, thread_state=None):
  """Apply target that always raises CustomException."""
  raise CustomException('Failing on purpose.')


def _FailingExceptionHandler(cls, e):
  """Exception handler that counts the failure and then raises itself."""
  cls.failure_count += 1
  raise CustomException('Exception handler failing on purpose.')


def _ExceptionHandler(cls, e):
  """Exception handler that logs the exception and counts the failure."""
  cls.logger.exception(e)
  cls.failure_count += 1


def _IncrementByLength(cls, args, thread_state=None):
  """Apply target that adds len(args) to cls.arg_length_sum."""
  cls.arg_length_sum += len(args)


def _AdjustProcessCountIfWindows(process_count):
  """Returns 1 when IS_WINDOWS is set, otherwise process_count unchanged."""
  return 1 if IS_WINDOWS else process_count


def _ReApplyWithReplicatedArguments(cls, args, thread_state=None):
  """Calls Apply with arguments repeated seven times.

  The first two elements of args should be the process and thread counts,
  respectively, to be used for the recursive calls.

  Args:
    cls: The Command class to call Apply on.
    args: Arguments to pass to Apply.
    thread_state: Unused, required by function signature.

  Returns:
    The sum of the values returned by the first call to Apply (which runs
    _PerformNRecursiveCalls) plus the number of values returned by the second
    call to Apply (which runs _ReturnOneValue).
  """
  new_args = [args] * 7
  process_count = _AdjustProcessCountIfWindows(args[0])
  thread_count = args[1]
  return_values = cls.Apply(_PerformNRecursiveCalls, new_args,
                            _ExceptionHandler, arg_checker=DummyArgChecker,
                            process_count=process_count,
                            thread_count=thread_count,
                            should_return_results=True)
  ret = sum(return_values)

  return_values = cls.Apply(_ReturnOneValue, new_args,
                            _ExceptionHandler, arg_checker=DummyArgChecker,
                            process_count=process_count,
                            thread_count=thread_count,
                            should_return_results=True)

  return len(return_values) + ret


def _PerformNRecursiveCalls(cls, args, thread_state=None):
  """Calls Apply to perform N recursive calls.

  The first two elements of args should be the process and thread counts,
  respectively, to be used for the recursive calls, while N is the third element
  (the number of recursive calls to make).

  Args:
    cls: The Command class to call Apply on.
    args: Arguments to pass to Apply.
    thread_state: Unused, required by function signature.

  Returns:
    Number of values returned by the call to Apply.
  """
  process_count = _AdjustProcessCountIfWindows(args[0])
  thread_count = args[1]
  return_values = cls.Apply(_ReturnOneValue, [()] * args[2], _ExceptionHandler,
                            arg_checker=DummyArgChecker,
                            process_count=process_count,
                            thread_count=thread_count,
                            should_return_results=True)
  return len(return_values)


def _SkipEvenNumbersArgChecker(cls, arg):
  """Arg checker that returns True only for odd values of arg."""
  return arg % 2 != 0


class FailingIterator(object):
  """Iterator over 0..size-1 that raises CustomException at chosen indices."""

  def __init__(self, size, failure_indices):
    """Initializes the iterator.

    Args:
      size: Number of positions to iterate over.
      failure_indices: Container of indices at which next() raises instead of
                       returning a value.
    """
    self.size = size
    self.failure_indices = failure_indices
    self.current_index = 0

  def __iter__(self):
    return self

  def next(self):
    """Returns the next index, or raises CustomException at a failing index."""
    if self.current_index == self.size:
      raise StopIteration
    index = self.current_index
    # Advance before (possibly) raising so that a caller which catches the
    # exception can keep iterating past the failing position.
    self.current_index += 1
    if index in self.failure_indices:
      # Report the index that actually failed, not the one after it.
      raise CustomException(
          'Iterator failing on purpose at index %d.' % index)
    return index

  # Python 3 name for the iterator protocol method.
  __next__ = next


class FakeCommand(Command):
  """Fake command class for overriding command instance state."""
  command_spec = Command.CreateCommandSpec(
      'fake',
      command_name_aliases=[],
  )
  # Help specification. See help_provider.py for documentation.
  help_spec = Command.HelpSpec(
      help_name='fake',
      help_name_aliases=[],
      help_type='command_help',
      help_one_line_summary='Something to take up space.',
      help_text='Something else to take up space.',
      subcommand_help_text={},
  )

  def __init__(self, do_parallel):
    """Sets the instance attributes directly; Command.__init__ is not called.

    Args:
      do_parallel: Value stored as self.parallel_operations.
    """
    self.bucket_storage_uri_class = BucketStorageUri
    support_map = {
        'gs': ['JSON'],
        's3': ['XML']
    }
    default_map = {
        'gs': 'JSON',
        's3': 'XML'
    }
    self.gsutil_api_map = cs_api_map.GsutilApiMapFactory.GetApiMap(
        cs_api_map.GsutilApiClassMapFactory, support_map, default_map)
    self.logger = CreateGsutilLogger('FakeCommand')
    self.parallel_operations = do_parallel
    self.failure_count = 0
    self.multiprocessing_is_available = (
        CheckMultiprocessingAvailableAndInit().is_available)
    self.debug = 0


class FakeCommandWithoutMultiprocessingModule(FakeCommand):
  """FakeCommand variant that reports multiprocessing as unavailable."""

  def __init__(self, do_parallel):
    super(FakeCommandWithoutMultiprocessingModule, self).__init__(do_parallel)
    self.multiprocessing_is_available = False


# TODO: Figure out a good way to test that ctrl+C really stops execution,
#       and also that ctrl+C works when there are still tasks enqueued.
class TestParallelismFramework(testcase.GsUtilUnitTestCase):
  """gsutil parallelism framework test suite."""

  # Command class instantiated by the tests; subclasses override this to run
  # the same suite against a different fake command.
  command_class = FakeCommand

  def _RunApply(self, func, args_iterator, process_count, thread_count,
                command_inst=None, shared_attrs=None, fail_on_error=False,
                thr_exc_handler=None, arg_checker=DummyArgChecker):
    """Calls Apply on a command instance and returns its results.

    Args:
      func: Function passed to Apply.
      args_iterator: Iterable of arguments passed to Apply.
      process_count: Number of processes to use.
      thread_count: Number of threads to use.
      command_inst: Command instance to call Apply on; a new
                    self.command_class(True) is created when not given.
      shared_attrs: Passed through to Apply.
      fail_on_error: Passed through to Apply.
      thr_exc_handler: Exception handler passed to Apply; defaults to
                       _ExceptionHandler.
      arg_checker: Passed through to Apply.

    Returns:
      The value returned by Apply (called with should_return_results=True).
    """
    command_inst = command_inst or self.command_class(True)
    exception_handler = thr_exc_handler or _ExceptionHandler

    return command_inst.Apply(func, args_iterator, exception_handler,
                              thread_count=thread_count,
                              process_count=process_count,
                              arg_checker=arg_checker,
                              should_return_results=True,
                              shared_attrs=shared_attrs,
                              fail_on_error=fail_on_error)

  @RequiresIsolation
  def testBasicApplySingleProcessSingleThread(self):
    self._TestBasicApply(1, 1)

  @RequiresIsolation
  def testBasicApplySingleProcessMultiThread(self):
    self._TestBasicApply(1, 3)

  @RequiresIsolation
  @unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows')
  def testBasicApplyMultiProcessSingleThread(self):
    self._TestBasicApply(3, 1)

  @RequiresIsolation
  @unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows')
  def testBasicApplyMultiProcessMultiThread(self):
    self._TestBasicApply(3, 3)

  @Timeout
  def _TestBasicApply(self, process_count, thread_count):
    """Tests that Apply returns one result per argument."""
    args = [()] * (17 * process_count * thread_count + 1)

    results = self._RunApply(_ReturnOneValue, args, process_count, thread_count)
    self.assertEqual(len(args), len(results))

  @RequiresIsolation
  def testNoTasksSingleProcessSingleThread(self):
    self._TestApplyWithNoTasks(1, 1)

  @RequiresIsolation
  def testNoTasksSingleProcessMultiThread(self):
    self._TestApplyWithNoTasks(1, 3)

  @RequiresIsolation
  @unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows')
  def testNoTasksMultiProcessSingleThread(self):
    self._TestApplyWithNoTasks(3, 1)

  @RequiresIsolation
  @unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows')
  def testNoTasksMultiProcessMultiThread(self):
    self._TestApplyWithNoTasks(3, 3)

  @Timeout
  def _TestApplyWithNoTasks(self, process_count, thread_count):
    """Tests that calling Apply with no tasks releases locks/semaphores."""
    # NOTE(review): this list holds one empty argument tuple rather than being
    # an empty list, so each Apply call below still receives a single task.
    # Confirm whether [] was intended.
    empty_args = [()]

    for _ in range(process_count * thread_count + 1):
      self._RunApply(_ReturnOneValue, empty_args, process_count, thread_count)

    # Ensure that work can still be performed.
    self._TestBasicApply(process_count, thread_count)

  @RequiresIsolation
  @unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows')
  def testApplySaturatesMultiProcessSingleThread(self):
    self._TestApplySaturatesAvailableProcessesAndThreads(3, 1)

  @RequiresIsolation
  def testApplySaturatesSingleProcessMultiThread(self):
    self._TestApplySaturatesAvailableProcessesAndThreads(1, 3)

  @RequiresIsolation
  @unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows')
  def testApplySaturatesMultiProcessMultiThread(self):
    self._TestApplySaturatesAvailableProcessesAndThreads(3, 3)

  # Guarded by Timeout like every other _Test* helper in this class; the test
  # methods that call it carry the RequiresIsolation decorator themselves.
  @Timeout
  def _TestApplySaturatesAvailableProcessesAndThreads(self, process_count,
                                                      thread_count):
    """Tests that created processes and threads evenly share tasks."""
    calls_per_thread = 2
    args = [()] * (process_count * thread_count * calls_per_thread)
    expected_calls_per_thread = calls_per_thread

    if not self.command_class(True).multiprocessing_is_available:
      # When multiprocessing is unavailable, only a single process is used.
      # Calls should be evenly distributed across threads.
      expected_calls_per_thread = calls_per_thread * process_count

    results = self._RunApply(_SleepThenReturnProcAndThreadId, args,
                             process_count, thread_count)
    usage_dict = {}  # (process_id, thread_id): number of tasks performed
    for (process_id, thread_id) in results:
      usage_dict[(process_id, thread_id)] = (
          usage_dict.get((process_id, thread_id), 0) + 1)

    for (id_tuple, num_tasks_completed) in usage_dict.items():
      self.assertEqual(num_tasks_completed, expected_calls_per_thread,
                       'Process %s thread %s completed %s tasks. Expected: %s' %
                       (id_tuple[0], id_tuple[1], num_tasks_completed,
                        expected_calls_per_thread))

  @RequiresIsolation
  def testIteratorFailureSingleProcessSingleThread(self):
    self._TestIteratorFailure(1, 1)

  @RequiresIsolation
  def testIteratorFailureSingleProcessMultiThread(self):
    self._TestIteratorFailure(1, 3)

  @RequiresIsolation
  @unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows')
  def testIteratorFailureMultiProcessSingleThread(self):
    self._TestIteratorFailure(3, 1)

  @RequiresIsolation
  @unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows')
  def testIteratorFailureMultiProcessMultiThread(self):
    self._TestIteratorFailure(3, 3)

  @Timeout
  def _TestIteratorFailure(self, process_count, thread_count):
    """Tests apply with a failing iterator."""
    # Tests for fail_on_error == False.

    args = FailingIterator(10, [0])
    results = self._RunApply(_ReturnOneValue, args, process_count, thread_count)
    self.assertEqual(9, len(results))

    args = FailingIterator(10, [5])
    results = self._RunApply(_ReturnOneValue, args, process_count, thread_count)
    self.assertEqual(9, len(results))

    args = FailingIterator(10, [9])
    results = self._RunApply(_ReturnOneValue, args, process_count, thread_count)
    self.assertEqual(9, len(results))

    if process_count * thread_count > 1:
      # In this case, we should ignore the fail_on_error flag.
      args = FailingIterator(10, [9])
      results = self._RunApply(_ReturnOneValue, args, process_count,
                               thread_count, fail_on_error=True)
      self.assertEqual(9, len(results))

    args = FailingIterator(10, range(10))
    results = self._RunApply(_ReturnOneValue, args, process_count, thread_count)
    self.assertEqual(0, len(results))

    args = FailingIterator(0, [])
    results = self._RunApply(_ReturnOneValue, args, process_count, thread_count)
    self.assertEqual(0, len(results))

  @RequiresIsolation
  def testTestSharedAttrsWorkSingleProcessSingleThread(self):
    self._TestSharedAttrsWork(1, 1)

  @RequiresIsolation
  def testTestSharedAttrsWorkSingleProcessMultiThread(self):
    self._TestSharedAttrsWork(1, 3)

  @RequiresIsolation
  @unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows')
  def testTestSharedAttrsWorkMultiProcessSingleThread(self):
    self._TestSharedAttrsWork(3, 1)

  @RequiresIsolation
  @unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows')
  def testTestSharedAttrsWorkMultiProcessMultiThread(self):
    self._TestSharedAttrsWork(3, 3)

  @Timeout
  def _TestSharedAttrsWork(self, process_count, thread_count):
    """Tests that Apply successfully uses shared_attrs."""
    command_inst = self.command_class(True)
    command_inst.arg_length_sum = 19
    args = ['foo', ['bar', 'baz'], [], ['x', 'y'], [], 'abcd']
    self._RunApply(_IncrementByLength, args, process_count,
                   thread_count, command_inst=command_inst,
                   shared_attrs=['arg_length_sum'])
    expected_sum = 19 + sum(len(arg) for arg in args)
    self.assertEqual(expected_sum, command_inst.arg_length_sum)

    # Test that shared variables work when the iterator fails at the beginning,
    # middle, and end.
    for (failing_iterator, expected_failure_count) in (
        (FailingIterator(5, [0]), 1),
        (FailingIterator(10, [1, 3, 5]), 3),
        (FailingIterator(5, [4]), 1)):
      command_inst = self.command_class(True)
      args = failing_iterator
      self._RunApply(_ReturnOneValue, args, process_count, thread_count,
                     command_inst=command_inst, shared_attrs=['failure_count'])
      self.assertEqual(
          expected_failure_count, command_inst.failure_count,
          msg='Failure count did not match. Expected: %s, actual: %s '
          'for failing iterator of size %s, failing indices %s' %
          (expected_failure_count, command_inst.failure_count,
           failing_iterator.size, failing_iterator.failure_indices))

  @RequiresIsolation
  def testThreadsSurviveExceptionsInFuncSingleProcessSingleThread(self):
    self._TestThreadsSurviveExceptionsInFunc(1, 1)

  @RequiresIsolation
  def testThreadsSurviveExceptionsInFuncSingleProcessMultiThread(self):
    self._TestThreadsSurviveExceptionsInFunc(1, 3)

  @RequiresIsolation
  @unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows')
  def testThreadsSurviveExceptionsInFuncMultiProcessSingleThread(self):
    self._TestThreadsSurviveExceptionsInFunc(3, 1)

  @RequiresIsolation
  @unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows')
  def testThreadsSurviveExceptionsInFuncMultiProcessMultiThread(self):
    self._TestThreadsSurviveExceptionsInFunc(3, 3)

  @Timeout
  def _TestThreadsSurviveExceptionsInFunc(self, process_count, thread_count):
    """Tests that every task is attempted when the applied function raises."""
    # NOTE(review): this body is identical to
    # _TestThreadsSurviveExceptionsInHandler, including the failing handler.
    # Confirm whether _ExceptionHandler was intended here so that only the
    # function (and not the handler) raises.
    command_inst = self.command_class(True)
    args = ([()] * 5)
    self._RunApply(_FailureFunc, args, process_count, thread_count,
                   command_inst=command_inst, shared_attrs=['failure_count'],
                   thr_exc_handler=_FailingExceptionHandler)
    self.assertEqual(len(args), command_inst.failure_count)

  @RequiresIsolation
  def testThreadsSurviveExceptionsInHandlerSingleProcessSingleThread(self):
    self._TestThreadsSurviveExceptionsInHandler(1, 1)

  @RequiresIsolation
  def testThreadsSurviveExceptionsInHandlerSingleProcessMultiThread(self):
    self._TestThreadsSurviveExceptionsInHandler(1, 3)

  @RequiresIsolation
  @unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows')
  def testThreadsSurviveExceptionsInHandlerMultiProcessSingleThread(self):
    self._TestThreadsSurviveExceptionsInHandler(3, 1)

  @RequiresIsolation
  @unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows')
  def testThreadsSurviveExceptionsInHandlerMultiProcessMultiThread(self):
    self._TestThreadsSurviveExceptionsInHandler(3, 3)

  @Timeout
  def _TestThreadsSurviveExceptionsInHandler(self, process_count, thread_count):
    """Tests that every task is attempted when the exception handler raises."""
    command_inst = self.command_class(True)
    args = ([()] * 5)
    self._RunApply(_FailureFunc, args, process_count, thread_count,
                   command_inst=command_inst, shared_attrs=['failure_count'],
                   thr_exc_handler=_FailingExceptionHandler)
    self.assertEqual(len(args), command_inst.failure_count)

  @RequiresIsolation
  @Timeout
  def testFailOnErrorFlag(self):
    """Tests that fail_on_error produces the correct exception on failure."""
    def _ExpectCustomException(test_func):
      """Fails the test unless test_func raises CustomException."""
      try:
        test_func()
      except CustomException:
        return
      except Exception as e:  # pylint: disable=broad-except
        self.fail('Got unexpected error: ' + str(e))
      # Reached only when test_func returned normally. Failing here, outside
      # the try block, keeps this message from being caught above and
      # re-reported as an "unexpected error".
      self.fail(
          'Setting fail_on_error should raise any exception encountered.')

    def _RunFailureFunc():
      command_inst = self.command_class(True)
      args = ([()] * 5)
      self._RunApply(_FailureFunc, args, 1, 1, command_inst=command_inst,
                     shared_attrs=['failure_count'], fail_on_error=True)
    _ExpectCustomException(_RunFailureFunc)

    def _RunFailingIteratorFirstPosition():
      args = FailingIterator(10, [0])
      results = self._RunApply(_ReturnOneValue, args, 1, 1, fail_on_error=True)
      self.assertEqual(0, len(results))
    _ExpectCustomException(_RunFailingIteratorFirstPosition)

    def _RunFailingIteratorPositionMiddlePosition():
      args = FailingIterator(10, [5])
      results = self._RunApply(_ReturnOneValue, args, 1, 1, fail_on_error=True)
      self.assertEqual(5, len(results))
    _ExpectCustomException(_RunFailingIteratorPositionMiddlePosition)

    def _RunFailingIteratorLastPosition():
      args = FailingIterator(10, [9])
      results = self._RunApply(_ReturnOneValue, args, 1, 1, fail_on_error=True)
      self.assertEqual(9, len(results))
    _ExpectCustomException(_RunFailingIteratorLastPosition)

    def _RunFailingIteratorMultiplePositions():
      args = FailingIterator(10, [1, 3, 5])
      results = self._RunApply(_ReturnOneValue, args, 1, 1, fail_on_error=True)
      self.assertEqual(1, len(results))
    _ExpectCustomException(_RunFailingIteratorMultiplePositions)

  @RequiresIsolation
  def testRecursiveDepthThreeDifferentFunctionsSingleProcessSingleThread(self):
    self._TestRecursiveDepthThreeDifferentFunctions(1, 1)

  @RequiresIsolation
  def testRecursiveDepthThreeDifferentFunctionsSingleProcessMultiThread(self):
    self._TestRecursiveDepthThreeDifferentFunctions(1, 3)

  @RequiresIsolation
  @unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows')
  def testRecursiveDepthThreeDifferentFunctionsMultiProcessSingleThread(self):
    self._TestRecursiveDepthThreeDifferentFunctions(3, 1)

  @RequiresIsolation
  @unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows')
  def testRecursiveDepthThreeDifferentFunctionsMultiProcessMultiThread(self):
    self._TestRecursiveDepthThreeDifferentFunctions(3, 3)

  @Timeout
  def _TestRecursiveDepthThreeDifferentFunctions(self, process_count,
                                                 thread_count):
    """Tests recursive application of Apply.

    Calls Apply(A), where A calls Apply(B) followed by Apply(C) and B calls
    Apply(C).

    Args:
      process_count: Number of processes to use.
      thread_count: Number of threads to use.
    """
    base_args = [3, 1, 4, 1, 5]
    args = [[process_count, thread_count, count] for count in base_args]

    results = self._RunApply(_ReApplyWithReplicatedArguments, args,
                             process_count, thread_count)
    self.assertEqual(7 * (sum(base_args) + len(base_args)), sum(results))

  @RequiresIsolation
  def testExceptionInProducerRaisesAndTerminatesSingleProcessSingleThread(self):
    self._TestExceptionInProducerRaisesAndTerminates(1, 1)

  @RequiresIsolation
  def testExceptionInProducerRaisesAndTerminatesSingleProcessMultiThread(self):
    self._TestExceptionInProducerRaisesAndTerminates(1, 3)

  @RequiresIsolation
  @unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows')
  def testExceptionInProducerRaisesAndTerminatesMultiProcessSingleThread(self):
    self._TestExceptionInProducerRaisesAndTerminates(3, 1)

  @RequiresIsolation
  @unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows')
  def testExceptionInProducerRaisesAndTerminatesMultiProcessMultiThread(self):
    self._TestExceptionInProducerRaisesAndTerminates(3, 3)

  @Timeout
  def _TestExceptionInProducerRaisesAndTerminates(self, process_count,
                                                  thread_count):
    """Tests that a non-iterable argument source makes Apply raise TypeError."""
    args = self  # The ProducerThread will try and fail to iterate over this.
    try:
      self._RunApply(_ReturnOneValue, args, process_count, thread_count)
    except TypeError:
      pass
    else:
      self.fail('Did not raise expected exception.')

  @RequiresIsolation
  def testSkippedArgumentsSingleThreadSingleProcess(self):
    self._TestSkippedArguments(1, 1)

  @RequiresIsolation
  def testSkippedArgumentsMultiThreadSingleProcess(self):
    self._TestSkippedArguments(1, 3)

  @RequiresIsolation
  @unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows')
  def testSkippedArgumentsSingleThreadMultiProcess(self):
    self._TestSkippedArguments(3, 1)

  @RequiresIsolation
  @unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows')
  def testSkippedArgumentsMultiThreadMultiProcess(self):
    self._TestSkippedArguments(3, 3)

  @Timeout
  def _TestSkippedArguments(self, process_count, thread_count):
    """Tests that arguments rejected by the arg checker yield no results."""

    # Skip a proper subset of the arguments.
    n = 2 * process_count * thread_count
    args = range(1, n + 1)
    results = self._RunApply(_ReturnOneValue, args, process_count, thread_count,
                             arg_checker=_SkipEvenNumbersArgChecker)
    # We know n is even; floor division keeps the expected value an integer.
    self.assertEqual(n // 2, len(results))
    self.assertEqual(n // 2, sum(results))

    # Skip all arguments.
    args = [2 * x for x in args]
    results = self._RunApply(_ReturnOneValue, args, process_count, thread_count,
                             arg_checker=_SkipEvenNumbersArgChecker)
    self.assertEqual(0, len(results))


class TestParallelismFrameworkWithoutMultiprocessing(TestParallelismFramework):
  """Tests parallelism framework works with multiprocessing module unavailable.

  Notably, this test has no way to override previous calls
  to gslib.util.CheckMultiprocessingAvailableAndInit to prevent the
  initialization of all of the global variables in command.py, so this still
  behaves slightly differently than the behavior one would see on a machine
  where the multiprocessing functionality is actually not available (in
  particular, it will not catch the case where a global variable that is not
  available for the sequential path is referenced before initialization).
  """
  command_class = FakeCommandWithoutMultiprocessingModule