1 # -*- coding: utf-8 -*- 2 # Copyright 2011 Google Inc. All Rights Reserved. 3 # 4 # Licensed under the Apache License, Version 2.0 (the "License"); 5 # you may not use this file except in compliance with the License. 6 # You may obtain a copy of the License at 7 # 8 # http://www.apache.org/licenses/LICENSE-2.0 9 # 10 # Unless required by applicable law or agreed to in writing, software 11 # distributed under the License is distributed on an "AS IS" BASIS, 12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 # See the License for the specific language governing permissions and 14 # limitations under the License. 15 """Implementation of gsutil test command.""" 16 17 from __future__ import absolute_import 18 19 from collections import namedtuple 20 import logging 21 import os 22 import subprocess 23 import sys 24 import tempfile 25 import textwrap 26 import time 27 28 import gslib 29 from gslib.command import Command 30 from gslib.command import ResetFailureCount 31 from gslib.exception import CommandException 32 from gslib.project_id import PopulateProjectId 33 import gslib.tests as tests 34 from gslib.util import IS_WINDOWS 35 from gslib.util import NO_MAX 36 37 38 # For Python 2.6, unittest2 is required to run the tests. If it's not available, 39 # display an error if the test command is run instead of breaking the whole 40 # program. 41 # pylint: disable=g-import-not-at-top 42 try: 43 from gslib.tests.util import GetTestNames 44 from gslib.tests.util import unittest 45 except ImportError as e: 46 if 'unittest2' in str(e): 47 unittest = None 48 GetTestNames = None # pylint: disable=invalid-name 49 else: 50 raise 51 52 53 try: 54 import coverage 55 except ImportError: 56 coverage = None 57 58 59 _DEFAULT_TEST_PARALLEL_PROCESSES = 5 60 _DEFAULT_S3_TEST_PARALLEL_PROCESSES = 50 61 _SEQUENTIAL_ISOLATION_FLAG = 'sequential_only' 62 63 64 _SYNOPSIS = """ 65 gsutil test [-l] [-u] [-f] [command command...] 
66 """ 67 68 _DETAILED_HELP_TEXT = (""" 69 <B>SYNOPSIS</B> 70 """ + _SYNOPSIS + """ 71 72 73 <B>DESCRIPTION</B> 74 The gsutil test command runs the gsutil unit tests and integration tests. 75 The unit tests use an in-memory mock storage service implementation, while 76 the integration tests send requests to the production service using the 77 preferred API set in the boto configuration file (see "gsutil help apis" for 78 details). 79 80 To run both the unit tests and integration tests, run the command with no 81 arguments: 82 83 gsutil test 84 85 To run the unit tests only (which run quickly): 86 87 gsutil test -u 88 89 Tests run in parallel regardless of whether the top-level -m flag is 90 present. To limit the number of tests run in parallel to 10 at a time: 91 92 gsutil test -p 10 93 94 To force tests to run sequentially: 95 96 gsutil test -p 1 97 98 To have sequentially-run tests stop running immediately when an error occurs: 99 100 gsutil test -f 101 102 To run tests for one or more individual commands add those commands as 103 arguments. For example, the following command will run the cp and mv command 104 tests: 105 106 gsutil test cp mv 107 108 To list available tests, run the test command with the -l argument: 109 110 gsutil test -l 111 112 The tests are defined in the code under the gslib/tests module. Each test 113 file is of the format test_[name].py where [name] is the test name you can 114 pass to this command. For example, running "gsutil test ls" would run the 115 tests in "gslib/tests/test_ls.py". 116 117 You can also run an individual test class or function name by passing the 118 test module followed by the class name and optionally a test name. For 119 example, to run the an entire test class by name: 120 121 gsutil test naming.GsutilNamingTests 122 123 or an individual test function: 124 125 gsutil test cp.TestCp.test_streaming 126 127 You can list the available tests under a module or class by passing arguments 128 with the -l option. 
For example, to list all available test functions in the 129 cp module: 130 131 gsutil test -l cp 132 133 To output test coverage: 134 135 gsutil test -c -p 500 136 coverage html 137 138 This will output an HTML report to a directory named 'htmlcov'. 139 140 141 <B>OPTIONS</B> 142 -c Output coverage information. 143 144 -f Exit on first sequential test failure. 145 146 -l List available tests. 147 148 -p N Run at most N tests in parallel. The default value is %d. 149 150 -s Run tests against S3 instead of GS. 151 152 -u Only run unit tests. 153 """ % _DEFAULT_TEST_PARALLEL_PROCESSES) 154 155 156 TestProcessData = namedtuple('TestProcessData', 157 'name return_code stdout stderr') 158 159 160 def MakeCustomTestResultClass(total_tests): 161 """Creates a closure of CustomTestResult. 162 163 Args: 164 total_tests: The total number of tests being run. 165 166 Returns: 167 An instance of CustomTestResult. 168 """ 169 170 class CustomTestResult(unittest.TextTestResult): 171 """A subclass of unittest.TextTestResult that prints a progress report.""" 172 173 def startTest(self, test): 174 super(CustomTestResult, self).startTest(test) 175 if self.dots: 176 test_id = '.'.join(test.id().split('.')[-2:]) 177 message = ('\r%d/%d finished - E[%d] F[%d] s[%d] - %s' % ( 178 self.testsRun, total_tests, len(self.errors), 179 len(self.failures), len(self.skipped), test_id)) 180 message = message[:73] 181 message = message.ljust(73) 182 self.stream.write('%s - ' % message) 183 184 return CustomTestResult 185 186 187 def GetTestNamesFromSuites(test_suite): 188 """Takes a list of test suites and returns a list of contained test names.""" 189 suites = [test_suite] 190 test_names = [] 191 while suites: 192 suite = suites.pop() 193 for test in suite: 194 if isinstance(test, unittest.TestSuite): 195 suites.append(test) 196 else: 197 test_names.append(test.id()[len('gslib.tests.test_'):]) 198 return test_names 199 200 201 # pylint: disable=protected-access 202 # Need to get into the guts of 
def TestCaseToName(test_case):
  """Converts a python.unittest to its gsutil test-callable name."""
  # str(test_case.__class__) looks like "<class 'module.Class'>"; splitting on
  # the quote extracts "module.Class".
  return (str(test_case.__class__).split('\'')[1] + '.' +
          test_case._testMethodName)


# pylint: disable=protected-access
# Need to get into the guts of unittest to evaluate test cases for parallelism.
def SplitParallelizableTestSuite(test_suite):
  """Splits a test suite into groups with different running properties.

  Args:
    test_suite: A python unittest test suite.

  Returns:
    4-part tuple of lists of test names:
    (tests that must be run sequentially,
     tests that must be isolated in a separate process but can be run either
     sequentially or in parallel,
     unit tests that can be run in parallel,
     integration tests that can run in parallel)
  """
  # pylint: disable=import-not-at-top
  # Need to import this after test globals are set so that skip functions work.
  from gslib.tests.testcase.unit_testcase import GsUtilUnitTestCase
  isolated_tests = []
  sequential_tests = []
  parallelizable_integration_tests = []
  parallelizable_unit_tests = []

  items_to_evaluate = [test_suite]
  cases_to_evaluate = []
  # Expand the test suites into individual test cases:
  while items_to_evaluate:
    suite_or_case = items_to_evaluate.pop()
    if isinstance(suite_or_case, unittest.suite.TestSuite):
      for item in suite_or_case._tests:
        items_to_evaluate.append(item)
    elif isinstance(suite_or_case, unittest.TestCase):
      cases_to_evaluate.append(suite_or_case)

  # Classify each case via the requires_isolation / is_parallelizable
  # attributes set on its test method (absent attributes default to
  # not-isolated and parallelizable, respectively).
  for test_case in cases_to_evaluate:
    test_method = getattr(test_case, test_case._testMethodName, None)
    if getattr(test_method, 'requires_isolation', False):
      # Test must be isolated to a separate process, even if it is being
      # run sequentially.
      isolated_tests.append(TestCaseToName(test_case))
    elif not getattr(test_method, 'is_parallelizable', True):
      sequential_tests.append(TestCaseToName(test_case))
    elif isinstance(test_case, GsUtilUnitTestCase):
      parallelizable_unit_tests.append(TestCaseToName(test_case))
    else:
      parallelizable_integration_tests.append(TestCaseToName(test_case))

  return (sorted(sequential_tests),
          sorted(isolated_tests),
          sorted(parallelizable_unit_tests),
          sorted(parallelizable_integration_tests))


def CountFalseInList(input_list):
  """Counts number of falses in the input list."""
  # Counts falsy entries generally; used to count in-flight (not-done)
  # processes in CreateTestProcesses.
  num_false = 0
  for item in input_list:
    if not item:
      num_false += 1
  return num_false


def CreateTestProcesses(parallel_tests, test_index, process_list, process_done,
                        max_parallel_tests, root_coverage_file=None):
  """Creates test processes to run tests in parallel.

  Each spawned process runs a single test via a recursive "gsutil test
  --sequential_only <test_name>" invocation, capturing stdout/stderr.

  Args:
    parallel_tests: List of all parallel tests.
    test_index: List index of last created test before this function call.
    process_list: List of running subprocesses. Created processes are appended
        to this list.
    process_done: List of booleans indicating process completion. One 'False'
        will be added per process created.
    max_parallel_tests: Maximum number of tests to run in parallel.
    root_coverage_file: The root .coverage filename if coverage is requested.

  Returns:
    Index of last created test.
  """
  orig_test_index = test_index
  # On Windows, gsutil must be invoked through the Python interpreter.
  executable_prefix = [sys.executable] if sys.executable and IS_WINDOWS else []
  s3_argument = ['-s'] if tests.util.RUN_S3_TESTS else []

  process_create_start_time = time.time()
  last_log_time = process_create_start_time
  # Spawn processes until the in-flight count reaches max_parallel_tests or
  # we run out of tests.
  while (CountFalseInList(process_done) < max_parallel_tests and
         test_index < len(parallel_tests)):
    env = os.environ.copy()
    if root_coverage_file:
      # Child processes write coverage data rooted at this file so the parent
      # can combine it afterward.
      env['GSUTIL_COVERAGE_OUTPUT_FILE'] = root_coverage_file
    process_list.append(subprocess.Popen(
        executable_prefix + [gslib.GSUTIL_PATH] +
        ['-o', 'GSUtil:default_project_id=' + PopulateProjectId()] +
        ['test'] + s3_argument +
        ['--' + _SEQUENTIAL_ISOLATION_FLAG] +
        [parallel_tests[test_index][len('gslib.tests.test_'):]],
        stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=env))
    test_index += 1
    process_done.append(False)
    # Log progress at most every 5 seconds.
    if time.time() - last_log_time > 5:
      print ('Created %d new processes (total %d/%d created)' %
             (test_index - orig_test_index, len(process_list),
              len(parallel_tests)))
      last_log_time = time.time()
  if test_index == len(parallel_tests):
    print ('Test process creation finished (%d/%d created)' %
           (len(process_list), len(parallel_tests)))
  return test_index


class TestCommand(Command):
  """Implementation of gsutil test command."""

  # Command specification. See base class for documentation.
  command_spec = Command.CreateCommandSpec(
      'test',
      command_name_aliases=[],
      usage_synopsis=_SYNOPSIS,
      min_args=0,
      max_args=NO_MAX,
      supported_sub_args='uflp:sc',
      file_url_ok=True,
      provider_url_ok=False,
      urls_start_arg=0,
      supported_private_args=[_SEQUENTIAL_ISOLATION_FLAG]
  )
  # Help specification. See help_provider.py for documentation.
  help_spec = Command.HelpSpec(
      help_name='test',
      help_name_aliases=[],
      help_type='command_help',
      help_one_line_summary='Run gsutil tests',
      help_text=_DETAILED_HELP_TEXT,
      subcommand_help_text={},
  )

  def RunParallelTests(self, parallel_integration_tests,
                       max_parallel_tests, coverage_filename):
    """Executes the parallel/isolated portion of the test suite.

    Polls the spawned subprocesses in a loop, collecting results as they
    finish and creating new processes as slots free up, until every test has
    completed. Stderr of each failed test is printed at the end.

    Args:
      parallel_integration_tests: List of tests to execute.
      max_parallel_tests: Maximum number of parallel tests to run at once.
      coverage_filename: If not None, filename for coverage output.

    Returns:
      (int number of test failures, float elapsed time)
    """
    process_list = []
    process_done = []
    process_results = []  # Tuples of (name, return code, stdout, stderr)
    num_parallel_failures = 0
    # Number of logging cycles we ran with no progress.
    progress_less_logging_cycles = 0
    completed_as_of_last_log = 0
    num_parallel_tests = len(parallel_integration_tests)
    parallel_start_time = last_log_time = time.time()
    test_index = CreateTestProcesses(
        parallel_integration_tests, 0, process_list, process_done,
        max_parallel_tests, root_coverage_file=coverage_filename)
    while len(process_results) < num_parallel_tests:
      for proc_num in xrange(len(process_list)):
        # Skip processes already harvested or still running.
        if process_done[proc_num] or process_list[proc_num].poll() is None:
          continue
        process_done[proc_num] = True
        stdout, stderr = process_list[proc_num].communicate()
        process_list[proc_num].stdout.close()
        process_list[proc_num].stderr.close()
        # TODO: Differentiate test failures from errors.
        if process_list[proc_num].returncode != 0:
          num_parallel_failures += 1
        # process_list is created in the same order as
        # parallel_integration_tests, so proc_num indexes both.
        process_results.append(TestProcessData(
            name=parallel_integration_tests[proc_num],
            return_code=process_list[proc_num].returncode,
            stdout=stdout, stderr=stderr))
      if len(process_list) < num_parallel_tests:
        # Backfill free process slots with the remaining tests.
        test_index = CreateTestProcesses(
            parallel_integration_tests, test_index, process_list,
            process_done, max_parallel_tests,
            root_coverage_file=coverage_filename)
      if len(process_results) < num_parallel_tests:
        # Log progress at most every 5 seconds.
        if time.time() - last_log_time > 5:
          print '%d/%d finished - %d failures' % (
              len(process_results), num_parallel_tests, num_parallel_failures)
          if len(process_results) == completed_as_of_last_log:
            progress_less_logging_cycles += 1
          else:
            completed_as_of_last_log = len(process_results)
            # A process completed, so we made progress.
            progress_less_logging_cycles = 0
          if progress_less_logging_cycles > 4:
            # Ran 5 or more logging cycles with no progress, let the user
            # know which tests are running slowly or hanging.
            still_running = []
            for proc_num in xrange(len(process_list)):
              if not process_done[proc_num]:
                still_running.append(parallel_integration_tests[proc_num])
            print 'Still running: %s' % still_running
            # TODO: Terminate still-running processes if they
            # hang for a long time.
          last_log_time = time.time()
        time.sleep(1)
    process_run_finish_time = time.time()
    if num_parallel_failures:
      for result in process_results:
        if result.return_code != 0:
          new_stderr = result.stderr.split('\n')
          print 'Results for failed test %s:' % result.name
          for line in new_stderr:
            print line

    return (num_parallel_failures,
            (process_run_finish_time - parallel_start_time))

  def PrintTestResults(self, num_sequential_tests, sequential_success,
                       sequential_time_elapsed,
                       num_parallel_tests, num_parallel_failures,
                       parallel_time_elapsed):
    """Prints test results for parallel and sequential tests.

    Args:
      num_sequential_tests: Number of tests that were run sequentially.
      sequential_success: True if all sequential tests passed.
      sequential_time_elapsed: Seconds spent running sequential tests.
      num_parallel_tests: Number of tests that were run in parallel.
      num_parallel_failures: Number of parallel test failures.
      parallel_time_elapsed: Seconds spent running parallel tests.
    """
    # TODO: Properly track test skips.
    print 'Parallel tests complete. Success: %s Fail: %s' % (
        num_parallel_tests - num_parallel_failures, num_parallel_failures)
    print (
        'Ran %d tests in %.3fs (%d sequential in %.3fs, %d parallel in %.3fs)'
        % (num_parallel_tests + num_sequential_tests,
           float(sequential_time_elapsed + parallel_time_elapsed),
           num_sequential_tests,
           float(sequential_time_elapsed),
           num_parallel_tests,
           float(parallel_time_elapsed)))
    print

    if not num_parallel_failures and sequential_success:
      print 'OK'
    else:
      if num_parallel_failures:
        print 'FAILED (parallel tests)'
      if not sequential_success:
        print 'FAILED (sequential tests)'

  def RunCommand(self):
    """Command entry point for the test command.

    Returns:
      0 if all tests passed, 1 otherwise.

    Raises:
      CommandException: If required test dependencies or credentials are
          missing, or if an invalid test name is supplied.
    """
    if not unittest:
      raise CommandException('On Python 2.6, the unittest2 module is required '
                             'to run the gsutil tests.')

    failfast = False
    list_tests = False
    max_parallel_tests = _DEFAULT_TEST_PARALLEL_PROCESSES
    perform_coverage = False
    sequential_only = False
    if self.sub_opts:
      for o, a in self.sub_opts:
        if o == '-c':
          perform_coverage = True
        elif o == '-f':
          failfast = True
        elif o == '-l':
          list_tests = True
        elif o == ('--' + _SEQUENTIAL_ISOLATION_FLAG):
          # Called to isolate a single test in a separate process.
          # Don't try to isolate it again (would lead to an infinite loop).
          sequential_only = True
        elif o == '-p':
          max_parallel_tests = long(a)
        elif o == '-s':
          if not tests.util.HAS_S3_CREDS:
            raise CommandException('S3 tests require S3 credentials. Please '
                                   'add appropriate credentials to your .boto '
                                   'file and re-run.')
          tests.util.RUN_S3_TESTS = True
        elif o == '-u':
          tests.util.RUN_INTEGRATION_TESTS = False

    if perform_coverage and not coverage:
      raise CommandException(
          'Coverage has been requested but the coverage module was not found. '
          'You can install it with "pip install coverage".')

    if (tests.util.RUN_S3_TESTS and
        max_parallel_tests > _DEFAULT_S3_TEST_PARALLEL_PROCESSES):
      self.logger.warn(
          'Reducing parallel tests to %d due to S3 maximum bucket '
          'limitations.', _DEFAULT_S3_TEST_PARALLEL_PROCESSES)
      max_parallel_tests = _DEFAULT_S3_TEST_PARALLEL_PROCESSES

    test_names = sorted(GetTestNames())
    if list_tests and not self.args:
      print 'Found %d test names:' % len(test_names)
      print ' ', '\n  '.join(sorted(test_names))
      return 0

    # Set list of commands to test if supplied.
    if self.args:
      commands_to_test = []
      for name in self.args:
        if name in test_names or name.split('.')[0] in test_names:
          commands_to_test.append('gslib.tests.test_%s' % name)
        else:
          # Assume the user passed a fully-qualified test name.
          commands_to_test.append(name)
    else:
      commands_to_test = ['gslib.tests.test_%s' % name for name in test_names]

    # Installs a ctrl-c handler that tries to cleanly tear down tests.
    unittest.installHandler()

    loader = unittest.TestLoader()

    if commands_to_test:
      try:
        suite = loader.loadTestsFromNames(commands_to_test)
      except (ImportError, AttributeError) as e:
        raise CommandException('Invalid test argument name: %s' % e)

    if list_tests:
      test_names = GetTestNamesFromSuites(suite)
      print 'Found %d test names:' % len(test_names)
      print ' ', '\n  '.join(sorted(test_names))
      return 0

    if logging.getLogger().getEffectiveLevel() <= logging.INFO:
      verbosity = 1
    else:
      verbosity = 2
      logging.disable(logging.ERROR)

    if perform_coverage:
      # We want to run coverage over the gslib module, but filter out the test
      # modules and any third-party code. We also filter out anything under the
      # temporary directory. Otherwise, the gsutil update test (which copies
      # code to the temporary directory) gets included in the output.
      coverage_controller = coverage.coverage(
          source=['gslib'], omit=['gslib/third_party/*', 'gslib/tests/*',
                                  tempfile.gettempdir() + '*'])
      coverage_controller.erase()
      coverage_controller.start()

    num_parallel_failures = 0
    sequential_success = False

    (sequential_tests, isolated_tests,
     parallel_unit_tests, parallel_integration_tests) = (
         SplitParallelizableTestSuite(suite))

    # Since parallel integration tests are run in a separate process, they
    # won't get the override to tests.util, so skip them here.
    if not tests.util.RUN_INTEGRATION_TESTS:
      parallel_integration_tests = []

    logging.debug('Sequential tests to run: %s', sequential_tests)
    logging.debug('Isolated tests to run: %s', isolated_tests)
    logging.debug('Parallel unit tests to run: %s', parallel_unit_tests)
    logging.debug('Parallel integration tests to run: %s',
                  parallel_integration_tests)

    # If we're running an already-isolated test (spawned in isolation by a
    # previous test process), or we have no parallel tests to run,
    # just run sequentially. For now, unit tests are always run sequentially.
    run_tests_sequentially = (sequential_only or
                              (len(parallel_integration_tests) <= 1
                               and not isolated_tests))

    if run_tests_sequentially:
      total_tests = suite.countTestCases()
      resultclass = MakeCustomTestResultClass(total_tests)

      runner = unittest.TextTestRunner(verbosity=verbosity,
                                       resultclass=resultclass,
                                       failfast=failfast)
      ret = runner.run(suite)
      sequential_success = ret.wasSuccessful()
    else:
      if max_parallel_tests == 1:
        # We can't take advantage of parallelism, though we may have tests that
        # need isolation.
        sequential_tests += parallel_integration_tests
        parallel_integration_tests = []

      sequential_start_time = time.time()
      # TODO: For now, run unit tests sequentially because they are fast.
      # We could potentially shave off several seconds of execution time
      # by executing them in parallel with the integration tests.
      if len(sequential_tests) + len(parallel_unit_tests):
        print 'Running %d tests sequentially.' % (len(sequential_tests) +
                                                  len(parallel_unit_tests))
        sequential_tests_to_run = sequential_tests + parallel_unit_tests
        suite = loader.loadTestsFromNames(
            sorted([test_name for test_name in sequential_tests_to_run]))
        num_sequential_tests = suite.countTestCases()
        resultclass = MakeCustomTestResultClass(num_sequential_tests)
        runner = unittest.TextTestRunner(verbosity=verbosity,
                                         resultclass=resultclass,
                                         failfast=failfast)

        ret = runner.run(suite)
        sequential_success = ret.wasSuccessful()
      else:
        num_sequential_tests = 0
        sequential_success = True
      sequential_time_elapsed = time.time() - sequential_start_time

      # At this point, all tests get their own process so just treat the
      # isolated tests as parallel tests.
      parallel_integration_tests += isolated_tests
      num_parallel_tests = len(parallel_integration_tests)

      if not num_parallel_tests:
        pass
      else:
        num_processes = min(max_parallel_tests, num_parallel_tests)
        if num_parallel_tests > 1 and max_parallel_tests > 1:
          message = 'Running %d tests in parallel mode (%d processes).'
          if num_processes > _DEFAULT_TEST_PARALLEL_PROCESSES:
            message += (
                ' Please be patient while your CPU is incinerated. '
                'If your machine becomes unresponsive, consider reducing '
                'the amount of parallel test processes by running '
                '\'gsutil test -p <num_processes>\'.')
          print ('\n'.join(textwrap.wrap(
              message % (num_parallel_tests, num_processes))))
        else:
          print ('Running %d tests sequentially in isolated processes.' %
                 num_parallel_tests)
        (num_parallel_failures, parallel_time_elapsed) = self.RunParallelTests(
            parallel_integration_tests, max_parallel_tests,
            coverage_controller.data.filename if perform_coverage else None)
        self.PrintTestResults(
            num_sequential_tests, sequential_success,
            sequential_time_elapsed,
            num_parallel_tests, num_parallel_failures,
            parallel_time_elapsed)

    if perform_coverage:
      coverage_controller.stop()
      coverage_controller.combine()
      coverage_controller.save()
      print ('Coverage information was saved to: %s' %
             coverage_controller.data.filename)

    if sequential_success and not num_parallel_failures:
      # Clear the top-level failure count so gsutil exits with status 0.
      ResetFailureCount()
      return 0
    return 1