Home | History | Annotate | Download | only in tests
      1 # -*- coding: utf-8 -*-
      2 # Copyright 2013 Google Inc. All Rights Reserved.
      3 #
      4 # Permission is hereby granted, free of charge, to any person obtaining a
      5 # copy of this software and associated documentation files (the
      6 # "Software"), to deal in the Software without restriction, including
      7 # without limitation the rights to use, copy, modify, merge, publish, dis-
      8 # tribute, sublicense, and/or sell copies of the Software, and to permit
      9 # persons to whom the Software is furnished to do so, subject to the fol-
     10 # lowing conditions:
     11 #
     12 # The above copyright notice and this permission notice shall be included
     13 # in all copies or substantial portions of the Software.
     14 #
     15 # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
     16 # OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABIL-
     17 # ITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
     18 # SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
     19 # WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
     20 # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
     21 # IN THE SOFTWARE.
     22 """Unit tests for gsutil parallelism framework."""
     23 
     24 from __future__ import absolute_import
     25 
     26 import functools
     27 import os
     28 import signal
     29 import threading
     30 import time
     31 
     32 from boto.storage_uri import BucketStorageUri
     33 from gslib import cs_api_map
     34 from gslib.command import Command
     35 from gslib.command import CreateGsutilLogger
     36 from gslib.command import DummyArgChecker
     37 import gslib.tests.testcase as testcase
     38 from gslib.tests.testcase.base import RequiresIsolation
     39 from gslib.tests.util import unittest
     40 from gslib.util import CheckMultiprocessingAvailableAndInit
     41 from gslib.util import IS_WINDOWS
     42 
     43 
     44 # Amount of time for an individual test to run before timing out. We need a
     45 # reasonably high value since if many tests are running in parallel, an
     46 # individual test may take a while to complete.
     47 _TEST_TIMEOUT_SECONDS = 120
     48 
     49 
     50 def Timeout(func):
     51   """Decorator used to provide a timeout for functions."""
     52   @functools.wraps(func)
     53   def Wrapper(*args, **kwargs):
     54     if not IS_WINDOWS:
     55       signal.signal(signal.SIGALRM, _HandleAlarm)
     56       signal.alarm(_TEST_TIMEOUT_SECONDS)
     57     try:
     58       func(*args, **kwargs)
     59     finally:
     60       if not IS_WINDOWS:
     61         signal.alarm(0)  # Cancel the alarm.
     62   return Wrapper
     63 
     64 
     65 # pylint: disable=unused-argument
     66 def _HandleAlarm(signal_num, cur_stack_frame):
     67   raise Exception('Test timed out.')
     68 
     69 
     70 class CustomException(Exception):
     71 
     72   def __init__(self, exception_str):
     73     super(CustomException, self).__init__(exception_str)
     74 
     75 
     76 def _ReturnOneValue(cls, args, thread_state=None):
     77   return 1
     78 
     79 
     80 def _ReturnProcAndThreadId(cls, args, thread_state=None):
     81   return (os.getpid(), threading.currentThread().ident)
     82 
     83 
     84 def _SleepThenReturnProcAndThreadId(cls, args, thread_state=None):
     85   # This can fail if the total time to spawn new processes and threads takes
     86   # longer than 5 seconds, but if that occurs, then we have a performance
     87   # problem that needs to be addressed.
     88   time.sleep(5)
     89   return _ReturnProcAndThreadId(cls, args, thread_state=thread_state)
     90 
     91 
     92 def _FailureFunc(cls, args, thread_state=None):
     93   raise CustomException('Failing on purpose.')
     94 
     95 
     96 def _FailingExceptionHandler(cls, e):
     97   cls.failure_count += 1
     98   raise CustomException('Exception handler failing on purpose.')
     99 
    100 
    101 def _ExceptionHandler(cls, e):
    102   cls.logger.exception(e)
    103   cls.failure_count += 1
    104 
    105 
    106 def _IncrementByLength(cls, args, thread_state=None):
    107   cls.arg_length_sum += len(args)
    108 
    109 
    110 def _AdjustProcessCountIfWindows(process_count):
    111   if IS_WINDOWS:
    112     return 1
    113   else:
    114     return process_count
    115 
    116 
    117 def _ReApplyWithReplicatedArguments(cls, args, thread_state=None):
    118   """Calls Apply with arguments repeated seven times.
    119 
    120   The first two elements of args should be the process and thread counts,
    121   respectively, to be used for the recursive calls.
    122 
    123   Args:
    124     cls: The Command class to call Apply on.
    125     args: Arguments to pass to Apply.
    126     thread_state: Unused, required by function signature.
    127 
    128   Returns:
    129     Number of values returned by the two calls to Apply.
    130   """
    131   new_args = [args] * 7
    132   process_count = _AdjustProcessCountIfWindows(args[0])
    133   thread_count = args[1]
    134   return_values = cls.Apply(_PerformNRecursiveCalls, new_args,
    135                             _ExceptionHandler, arg_checker=DummyArgChecker,
    136                             process_count=process_count,
    137                             thread_count=thread_count,
    138                             should_return_results=True)
    139   ret = sum(return_values)
    140 
    141   return_values = cls.Apply(_ReturnOneValue, new_args,
    142                             _ExceptionHandler, arg_checker=DummyArgChecker,
    143                             process_count=process_count,
    144                             thread_count=thread_count,
    145                             should_return_results=True)
    146 
    147   return len(return_values) + ret
    148 
    149 
    150 def _PerformNRecursiveCalls(cls, args, thread_state=None):
    151   """Calls Apply to perform N recursive calls.
    152 
    153   The first two elements of args should be the process and thread counts,
    154   respectively, to be used for the recursive calls, while N is the third element
    155   (the number of recursive calls to make).
    156 
    157   Args:
    158     cls: The Command class to call Apply on.
    159     args: Arguments to pass to Apply.
    160     thread_state: Unused, required by function signature.
    161 
    162   Returns:
    163     Number of values returned by the call to Apply.
    164   """
    165   process_count = _AdjustProcessCountIfWindows(args[0])
    166   thread_count = args[1]
    167   return_values = cls.Apply(_ReturnOneValue, [()] * args[2], _ExceptionHandler,
    168                             arg_checker=DummyArgChecker,
    169                             process_count=process_count,
    170                             thread_count=thread_count,
    171                             should_return_results=True)
    172   return len(return_values)
    173 
    174 
    175 def _SkipEvenNumbersArgChecker(cls, arg):
    176   return arg % 2 != 0
    177 
    178 
    179 class FailingIterator(object):
    180 
    181   def __init__(self, size, failure_indices):
    182     self.size = size
    183     self.failure_indices = failure_indices
    184     self.current_index = 0
    185 
    186   def __iter__(self):
    187     return self
    188 
    189   def next(self):
    190     if self.current_index == self.size:
    191       raise StopIteration('')
    192     elif self.current_index in self.failure_indices:
    193       self.current_index += 1
    194       raise CustomException(
    195           'Iterator failing on purpose at index %d.' % self.current_index)
    196     else:
    197       self.current_index += 1
    198       return self.current_index - 1
    199 
    200 
    201 class FakeCommand(Command):
    202   """Fake command class for overriding command instance state."""
    203   command_spec = Command.CreateCommandSpec(
    204       'fake',
    205       command_name_aliases=[],
    206   )
    207   # Help specification. See help_provider.py for documentation.
    208   help_spec = Command.HelpSpec(
    209       help_name='fake',
    210       help_name_aliases=[],
    211       help_type='command_help',
    212       help_one_line_summary='Something to take up space.',
    213       help_text='Something else to take up space.',
    214       subcommand_help_text={},
    215   )
    216 
    217   def __init__(self, do_parallel):
    218     self.bucket_storage_uri_class = BucketStorageUri
    219     support_map = {
    220         'gs': ['JSON'],
    221         's3': ['XML']
    222     }
    223     default_map = {
    224         'gs': 'JSON',
    225         's3': 'XML'
    226     }
    227     self.gsutil_api_map = cs_api_map.GsutilApiMapFactory.GetApiMap(
    228         cs_api_map.GsutilApiClassMapFactory, support_map, default_map)
    229     self.logger = CreateGsutilLogger('FakeCommand')
    230     self.parallel_operations = do_parallel
    231     self.failure_count = 0
    232     self.multiprocessing_is_available = (
    233         CheckMultiprocessingAvailableAndInit().is_available)
    234     self.debug = 0
    235 
    236 
    237 class FakeCommandWithoutMultiprocessingModule(FakeCommand):
    238 
    239   def __init__(self, do_parallel):
    240     super(FakeCommandWithoutMultiprocessingModule, self).__init__(do_parallel)
    241     self.multiprocessing_is_available = False
    242 
    243 
    244 # TODO: Figure out a good way to test that ctrl+C really stops execution,
    245 #       and also that ctrl+C works when there are still tasks enqueued.
    246 class TestParallelismFramework(testcase.GsUtilUnitTestCase):
    247   """gsutil parallelism framework test suite."""
    248 
    249   command_class = FakeCommand
    250 
    251   def _RunApply(self, func, args_iterator, process_count, thread_count,
    252                 command_inst=None, shared_attrs=None, fail_on_error=False,
    253                 thr_exc_handler=None, arg_checker=DummyArgChecker):
    254     command_inst = command_inst or self.command_class(True)
    255     exception_handler = thr_exc_handler or _ExceptionHandler
    256 
    257     return command_inst.Apply(func, args_iterator, exception_handler,
    258                               thread_count=thread_count,
    259                               process_count=process_count,
    260                               arg_checker=arg_checker,
    261                               should_return_results=True,
    262                               shared_attrs=shared_attrs,
    263                               fail_on_error=fail_on_error)
    264 
    265   @RequiresIsolation
    266   def testBasicApplySingleProcessSingleThread(self):
    267     self._TestBasicApply(1, 1)
    268 
    269   @RequiresIsolation
    270   def testBasicApplySingleProcessMultiThread(self):
    271     self._TestBasicApply(1, 3)
    272 
    273   @RequiresIsolation
    274   @unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows')
    275   def testBasicApplyMultiProcessSingleThread(self):
    276     self._TestBasicApply(3, 1)
    277 
    278   @RequiresIsolation
    279   @unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows')
    280   def testBasicApplyMultiProcessMultiThread(self):
    281     self._TestBasicApply(3, 3)
    282 
    283   @Timeout
    284   def _TestBasicApply(self, process_count, thread_count):
    285     args = [()] * (17 * process_count * thread_count + 1)
    286 
    287     results = self._RunApply(_ReturnOneValue, args, process_count, thread_count)
    288     self.assertEqual(len(args), len(results))
    289 
    290   @RequiresIsolation
    291   def testNoTasksSingleProcessSingleThread(self):
    292     self._TestApplyWithNoTasks(1, 1)
    293 
    294   @RequiresIsolation
    295   def testNoTasksSingleProcessMultiThread(self):
    296     self._TestApplyWithNoTasks(1, 3)
    297 
    298   @RequiresIsolation
    299   @unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows')
    300   def testNoTasksMultiProcessSingleThread(self):
    301     self._TestApplyWithNoTasks(3, 1)
    302 
    303   @RequiresIsolation
    304   @unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows')
    305   def testNoTasksMultiProcessMultiThread(self):
    306     self._TestApplyWithNoTasks(3, 3)
    307 
    308   @Timeout
    309   def _TestApplyWithNoTasks(self, process_count, thread_count):
    310     """Tests that calling Apply with no tasks releases locks/semaphores."""
    311     empty_args = [()]
    312 
    313     for _ in range(process_count * thread_count + 1):
    314       self._RunApply(_ReturnOneValue, empty_args, process_count, thread_count)
    315 
    316     # Ensure that work can still be performed.
    317     self._TestBasicApply(process_count, thread_count)
    318 
    319   @RequiresIsolation
    320   @unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows')
    321   def testApplySaturatesMultiProcessSingleThread(self):
    322     self._TestApplySaturatesAvailableProcessesAndThreads(3, 1)
    323 
    324   @RequiresIsolation
    325   def testApplySaturatesSingleProcessMultiThread(self):
    326     self._TestApplySaturatesAvailableProcessesAndThreads(1, 3)
    327 
    328   @RequiresIsolation
    329   @unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows')
    330   def testApplySaturatesMultiProcessMultiThread(self):
    331     self._TestApplySaturatesAvailableProcessesAndThreads(3, 3)
    332 
    333   @RequiresIsolation
    334   def _TestApplySaturatesAvailableProcessesAndThreads(self, process_count,
    335                                                       thread_count):
    336     """Tests that created processes and threads evenly share tasks."""
    337     calls_per_thread = 2
    338     args = [()] * (process_count * thread_count * calls_per_thread)
    339     expected_calls_per_thread = calls_per_thread
    340 
    341     if not self.command_class(True).multiprocessing_is_available:
    342       # When multiprocessing is unavailable, only a single process is used.
    343       # Calls should be evenly distributed across threads.
    344       expected_calls_per_thread = calls_per_thread * process_count
    345 
    346     results = self._RunApply(_SleepThenReturnProcAndThreadId, args,
    347                              process_count, thread_count)
    348     usage_dict = {}  # (process_id, thread_id): number of tasks performed
    349     for (process_id, thread_id) in results:
    350       usage_dict[(process_id, thread_id)] = (
    351           usage_dict.get((process_id, thread_id), 0) + 1)
    352 
    353     for (id_tuple, num_tasks_completed) in usage_dict.iteritems():
    354       self.assertEqual(num_tasks_completed, expected_calls_per_thread,
    355                        'Process %s thread %s completed %s tasks. Expected: %s' %
    356                        (id_tuple[0], id_tuple[1], num_tasks_completed,
    357                         expected_calls_per_thread))
    358 
    359   @RequiresIsolation
    360   def testIteratorFailureSingleProcessSingleThread(self):
    361     self._TestIteratorFailure(1, 1)
    362 
    363   @RequiresIsolation
    364   def testIteratorFailureSingleProcessMultiThread(self):
    365     self._TestIteratorFailure(1, 3)
    366 
    367   @RequiresIsolation
    368   @unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows')
    369   def testIteratorFailureMultiProcessSingleThread(self):
    370     self._TestIteratorFailure(3, 1)
    371 
    372   @RequiresIsolation
    373   @unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows')
    374   def testIteratorFailureMultiProcessMultiThread(self):
    375     self._TestIteratorFailure(3, 3)
    376 
    377   @Timeout
    378   def _TestIteratorFailure(self, process_count, thread_count):
    379     """Tests apply with a failing iterator."""
    380     # Tests for fail_on_error == False.
    381 
    382     args = FailingIterator(10, [0])
    383     results = self._RunApply(_ReturnOneValue, args, process_count, thread_count)
    384     self.assertEqual(9, len(results))
    385 
    386     args = FailingIterator(10, [5])
    387     results = self._RunApply(_ReturnOneValue, args, process_count, thread_count)
    388     self.assertEqual(9, len(results))
    389 
    390     args = FailingIterator(10, [9])
    391     results = self._RunApply(_ReturnOneValue, args, process_count, thread_count)
    392     self.assertEqual(9, len(results))
    393 
    394     if process_count * thread_count > 1:
    395       # In this case, we should ignore the fail_on_error flag.
    396       args = FailingIterator(10, [9])
    397       results = self._RunApply(_ReturnOneValue, args, process_count,
    398                                thread_count, fail_on_error=True)
    399       self.assertEqual(9, len(results))
    400 
    401     args = FailingIterator(10, range(10))
    402     results = self._RunApply(_ReturnOneValue, args, process_count, thread_count)
    403     self.assertEqual(0, len(results))
    404 
    405     args = FailingIterator(0, [])
    406     results = self._RunApply(_ReturnOneValue, args, process_count, thread_count)
    407     self.assertEqual(0, len(results))
    408 
    409   @RequiresIsolation
    410   def testTestSharedAttrsWorkSingleProcessSingleThread(self):
    411     self._TestSharedAttrsWork(1, 1)
    412 
    413   @RequiresIsolation
    414   def testTestSharedAttrsWorkSingleProcessMultiThread(self):
    415     self._TestSharedAttrsWork(1, 3)
    416 
    417   @RequiresIsolation
    418   @unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows')
    419   def testTestSharedAttrsWorkMultiProcessSingleThread(self):
    420     self._TestSharedAttrsWork(3, 1)
    421 
    422   @RequiresIsolation
    423   @unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows')
    424   def testTestSharedAttrsWorkMultiProcessMultiThread(self):
    425     self._TestSharedAttrsWork(3, 3)
    426 
    427   @Timeout
    428   def _TestSharedAttrsWork(self, process_count, thread_count):
    429     """Tests that Apply successfully uses shared_attrs."""
    430     command_inst = self.command_class(True)
    431     command_inst.arg_length_sum = 19
    432     args = ['foo', ['bar', 'baz'], [], ['x', 'y'], [], 'abcd']
    433     self._RunApply(_IncrementByLength, args, process_count,
    434                    thread_count, command_inst=command_inst,
    435                    shared_attrs=['arg_length_sum'])
    436     expected_sum = 19
    437     for arg in args:
    438       expected_sum += len(arg)
    439     self.assertEqual(expected_sum, command_inst.arg_length_sum)
    440 
    441     # Test that shared variables work when the iterator fails at the beginning,
    442     # middle, and end.
    443     for (failing_iterator, expected_failure_count) in (
    444         (FailingIterator(5, [0]), 1),
    445         (FailingIterator(10, [1, 3, 5]), 3),
    446         (FailingIterator(5, [4]), 1)):
    447       command_inst = self.command_class(True)
    448       args = failing_iterator
    449       self._RunApply(_ReturnOneValue, args, process_count, thread_count,
    450                      command_inst=command_inst, shared_attrs=['failure_count'])
    451       self.assertEqual(
    452           expected_failure_count, command_inst.failure_count,
    453           msg='Failure count did not match. Expected: %s, actual: %s '
    454           'for failing iterator of size %s, failing indices %s' %
    455           (expected_failure_count, command_inst.failure_count,
    456            failing_iterator.size, failing_iterator.failure_indices))
    457 
    458   @RequiresIsolation
    459   def testThreadsSurviveExceptionsInFuncSingleProcessSingleThread(self):
    460     self._TestThreadsSurviveExceptionsInFunc(1, 1)
    461 
    462   @RequiresIsolation
    463   def testThreadsSurviveExceptionsInFuncSingleProcessMultiThread(self):
    464     self._TestThreadsSurviveExceptionsInFunc(1, 3)
    465 
    466   @RequiresIsolation
    467   @unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows')
    468   def testThreadsSurviveExceptionsInFuncMultiProcessSingleThread(self):
    469     self._TestThreadsSurviveExceptionsInFunc(3, 1)
    470 
    471   @RequiresIsolation
    472   @unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows')
    473   def testThreadsSurviveExceptionsInFuncMultiProcessMultiThread(self):
    474     self._TestThreadsSurviveExceptionsInFunc(3, 3)
    475 
    476   @Timeout
    477   def _TestThreadsSurviveExceptionsInFunc(self, process_count, thread_count):
    478     command_inst = self.command_class(True)
    479     args = ([()] * 5)
    480     self._RunApply(_FailureFunc, args, process_count, thread_count,
    481                    command_inst=command_inst, shared_attrs=['failure_count'],
    482                    thr_exc_handler=_FailingExceptionHandler)
    483     self.assertEqual(len(args), command_inst.failure_count)
    484 
    485   @RequiresIsolation
    486   def testThreadsSurviveExceptionsInHandlerSingleProcessSingleThread(self):
    487     self._TestThreadsSurviveExceptionsInHandler(1, 1)
    488 
    489   @RequiresIsolation
    490   def testThreadsSurviveExceptionsInHandlerSingleProcessMultiThread(self):
    491     self._TestThreadsSurviveExceptionsInHandler(1, 3)
    492 
    493   @RequiresIsolation
    494   @unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows')
    495   def testThreadsSurviveExceptionsInHandlerMultiProcessSingleThread(self):
    496     self._TestThreadsSurviveExceptionsInHandler(3, 1)
    497 
    498   @RequiresIsolation
    499   @unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows')
    500   def testThreadsSurviveExceptionsInHandlerMultiProcessMultiThread(self):
    501     self._TestThreadsSurviveExceptionsInHandler(3, 3)
    502 
    503   @Timeout
    504   def _TestThreadsSurviveExceptionsInHandler(self, process_count, thread_count):
    505     command_inst = self.command_class(True)
    506     args = ([()] * 5)
    507     self._RunApply(_FailureFunc, args, process_count, thread_count,
    508                    command_inst=command_inst, shared_attrs=['failure_count'],
    509                    thr_exc_handler=_FailingExceptionHandler)
    510     self.assertEqual(len(args), command_inst.failure_count)
    511 
    512   @RequiresIsolation
    513   @Timeout
    514   def testFailOnErrorFlag(self):
    515     """Tests that fail_on_error produces the correct exception on failure."""
    516     def _ExpectCustomException(test_func):
    517       try:
    518         test_func()
    519         self.fail(
    520             'Setting fail_on_error should raise any exception encountered.')
    521       except CustomException, e:
    522         pass
    523       except Exception, e:
    524         self.fail('Got unexpected error: ' + str(e))
    525 
    526     def _RunFailureFunc():
    527       command_inst = self.command_class(True)
    528       args = ([()] * 5)
    529       self._RunApply(_FailureFunc, args, 1, 1, command_inst=command_inst,
    530                      shared_attrs=['failure_count'], fail_on_error=True)
    531     _ExpectCustomException(_RunFailureFunc)
    532 
    533     def _RunFailingIteratorFirstPosition():
    534       args = FailingIterator(10, [0])
    535       results = self._RunApply(_ReturnOneValue, args, 1, 1, fail_on_error=True)
    536       self.assertEqual(0, len(results))
    537     _ExpectCustomException(_RunFailingIteratorFirstPosition)
    538 
    539     def _RunFailingIteratorPositionMiddlePosition():
    540       args = FailingIterator(10, [5])
    541       results = self._RunApply(_ReturnOneValue, args, 1, 1, fail_on_error=True)
    542       self.assertEqual(5, len(results))
    543     _ExpectCustomException(_RunFailingIteratorPositionMiddlePosition)
    544 
    545     def _RunFailingIteratorLastPosition():
    546       args = FailingIterator(10, [9])
    547       results = self._RunApply(_ReturnOneValue, args, 1, 1, fail_on_error=True)
    548       self.assertEqual(9, len(results))
    549     _ExpectCustomException(_RunFailingIteratorLastPosition)
    550 
    551     def _RunFailingIteratorMultiplePositions():
    552       args = FailingIterator(10, [1, 3, 5])
    553       results = self._RunApply(_ReturnOneValue, args, 1, 1, fail_on_error=True)
    554       self.assertEqual(1, len(results))
    555     _ExpectCustomException(_RunFailingIteratorMultiplePositions)
    556 
    557   @RequiresIsolation
    558   def testRecursiveDepthThreeDifferentFunctionsSingleProcessSingleThread(self):
    559     self._TestRecursiveDepthThreeDifferentFunctions(1, 1)
    560 
    561   @RequiresIsolation
    562   def testRecursiveDepthThreeDifferentFunctionsSingleProcessMultiThread(self):
    563     self._TestRecursiveDepthThreeDifferentFunctions(1, 3)
    564 
    565   @RequiresIsolation
    566   @unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows')
    567   def testRecursiveDepthThreeDifferentFunctionsMultiProcessSingleThread(self):
    568     self._TestRecursiveDepthThreeDifferentFunctions(3, 1)
    569 
    570   @RequiresIsolation
    571   @unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows')
    572   def testRecursiveDepthThreeDifferentFunctionsMultiProcessMultiThread(self):
    573     self._TestRecursiveDepthThreeDifferentFunctions(3, 3)
    574 
    575   @Timeout
    576   def _TestRecursiveDepthThreeDifferentFunctions(self, process_count,
    577                                                  thread_count):
    578     """Tests recursive application of Apply.
    579 
    580     Calls Apply(A), where A calls Apply(B) followed by Apply(C) and B calls
    581     Apply(C).
    582 
    583     Args:
    584       process_count: Number of processes to use.
    585       thread_count: Number of threads to use.
    586     """
    587     base_args = [3, 1, 4, 1, 5]
    588     args = [[process_count, thread_count, count] for count in base_args]
    589 
    590     results = self._RunApply(_ReApplyWithReplicatedArguments, args,
    591                              process_count, thread_count)
    592     self.assertEqual(7 * (sum(base_args) + len(base_args)), sum(results))
    593 
    594   @RequiresIsolation
    595   def testExceptionInProducerRaisesAndTerminatesSingleProcessSingleThread(self):
    596     self._TestExceptionInProducerRaisesAndTerminates(1, 1)
    597 
    598   @RequiresIsolation
    599   def testExceptionInProducerRaisesAndTerminatesSingleProcessMultiThread(self):
    600     self._TestExceptionInProducerRaisesAndTerminates(1, 3)
    601 
    602   @RequiresIsolation
    603   @unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows')
    604   def testExceptionInProducerRaisesAndTerminatesMultiProcessSingleThread(self):
    605     self._TestExceptionInProducerRaisesAndTerminates(3, 1)
    606 
    607   @RequiresIsolation
    608   @unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows')
    609   def testExceptionInProducerRaisesAndTerminatesMultiProcessMultiThread(self):
    610     self._TestExceptionInProducerRaisesAndTerminates(3, 3)
    611 
    612   @Timeout
    613   def _TestExceptionInProducerRaisesAndTerminates(self, process_count,
    614                                                   thread_count):
    615     args = self  # The ProducerThread will try and fail to iterate over this.
    616     try:
    617       self._RunApply(_ReturnOneValue, args, process_count, thread_count)
    618       self.fail('Did not raise expected exception.')
    619     except TypeError:
    620       pass
    621 
    622   @RequiresIsolation
    623   def testSkippedArgumentsSingleThreadSingleProcess(self):
    624     self._TestSkippedArguments(1, 1)
    625 
    626   @RequiresIsolation
    627   def testSkippedArgumentsMultiThreadSingleProcess(self):
    628     self._TestSkippedArguments(1, 3)
    629 
    630   @RequiresIsolation
    631   @unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows')
    632   def testSkippedArgumentsSingleThreadMultiProcess(self):
    633     self._TestSkippedArguments(3, 1)
    634 
    635   @RequiresIsolation
    636   @unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows')
    637   def testSkippedArgumentsMultiThreadMultiProcess(self):
    638     self._TestSkippedArguments(3, 3)
    639 
    640   @Timeout
    641   def _TestSkippedArguments(self, process_count, thread_count):
    642 
    643     # Skip a proper subset of the arguments.
    644     n = 2 * process_count * thread_count
    645     args = range(1, n + 1)
    646     results = self._RunApply(_ReturnOneValue, args, process_count, thread_count,
    647                              arg_checker=_SkipEvenNumbersArgChecker)
    648     self.assertEqual(n / 2, len(results))  # We know n is even.
    649     self.assertEqual(n / 2, sum(results))
    650 
    651     # Skip all arguments.
    652     args = [2 * x for x in args]
    653     results = self._RunApply(_ReturnOneValue, args, process_count, thread_count,
    654                              arg_checker=_SkipEvenNumbersArgChecker)
    655     self.assertEqual(0, len(results))
    656 
    657 
    658 class TestParallelismFrameworkWithoutMultiprocessing(TestParallelismFramework):
    659   """Tests parallelism framework works with multiprocessing module unavailable.
    660 
    661   Notably, this test has no way to override previous calls
    662   to gslib.util.CheckMultiprocessingAvailableAndInit to prevent the
    663   initialization of all of the global variables in command.py, so this still
    664   behaves slightly differently than the behavior one would see on a machine
    665   where the multiprocessing functionality is actually not available (in
    666   particular, it will not catch the case where a global variable that is not
    667   available for the sequential path is referenced before initialization).
    668   """
    669   command_class = FakeCommandWithoutMultiprocessingModule
    670