#!/usr/bin/python

import logging
import os
import shutil
import sys
import time
import StringIO
import common

from autotest_lib.client.bin import job, boottool, config, sysinfo, harness
from autotest_lib.client.bin import test, xen, kernel, utils
from autotest_lib.client.common_lib import packages, error, log
from autotest_lib.client.common_lib import logging_manager, logging_config
from autotest_lib.client.common_lib import base_job_unittest
from autotest_lib.client.common_lib.test_utils import mock, unittest


class job_test_case(unittest.TestCase):
    """Generic job TestCase class that defines a standard job setUp and
    tearDown, with some standard stubs."""

    job_class = job.base_client_job

    def setUp(self):
        """Build an uninitialised job instance with a stubbed autodir."""
        self.god = mock.mock_god(ut=self)
        self.god.stub_with(job.base_client_job, '_get_environ_autodir',
                           classmethod(lambda cls: '/adir'))
        # __new__ is used so that __init__ can be driven by each test
        self.job = self.job_class.__new__(self.job_class)
        self.job._job_directory = base_job_unittest.stub_job_directory


    def tearDown(self):
        """Undo every stub installed through the mock god."""
        self.god.unstub_all()


class test_find_base_directories(
        base_job_unittest.test_find_base_directories.generic_tests,
        job_test_case):
    """Client-side checks of _find_base_directories."""

    def test_autodir_equals_clientdir(self):
        """autodir and clientdir are both the stubbed '/adir'."""
        autodir, clientdir, _ = self.job._find_base_directories()
        self.assertEqual(autodir, '/adir')
        self.assertEqual(clientdir, '/adir')


    def test_serverdir_is_none(self):
        """The third returned directory (serverdir) is None."""
        _, _, serverdir = self.job._find_base_directories()
        self.assertEqual(serverdir, None)


class abstract_test_init(base_job_unittest.test_init.generic_tests):
    """Generic client job mixin used when defining variations on the
    job.__init__ generic tests."""
    OPTIONAL_ATTRIBUTES = (
        base_job_unittest.test_init.generic_tests.OPTIONAL_ATTRIBUTES
        - set(['control', 'bootloader', 'harness']))


class test_init_minimal_options(abstract_test_init, job_test_case):
    """Runs the generic __init__ tests with a minimal options object."""

    def call_init(self):
        """Stub out the APIs touched by __init__ and then invoke it."""
        # TODO(jadmanski): refactor more of the __init__ code to not need to
        # stub out countless random APIs
        self.god.stub_function_to_return(job.os, 'mkdir', None)
        self.god.stub_function_to_return(job.os.path, 'exists', True)
        self.god.stub_function_to_return(self.job, '_load_state', None)
        self.god.stub_function_to_return(self.job, 'record', None)
        self.god.stub_function_to_return(job.shutil, 'copyfile', None)
        self.god.stub_function_to_return(job.logging_manager,
                                         'configure_logging', None)
        class manager:
            def start_logging(self):
                return None
        self.god.stub_function_to_return(job.logging_manager,
                                         'get_logging_manager', manager())
        class stub_sysinfo:
            def log_per_reboot_data(self):
                return None
        self.god.stub_function_to_return(job.sysinfo, 'sysinfo',
                                         stub_sysinfo())
        class stub_harness:
            run_start = lambda self: None
        self.god.stub_function_to_return(job.harness, 'select', stub_harness())
        self.god.stub_function_to_return(job.boottool, 'boottool', object())
        class options:
            tag = ''
            verbose = False
            cont = False
            harness = 'stub'
            harness_args = None
            hostname = None
            user = None
            log = False
            args = ''
            output_dir = ''
            tap_report = None
        self.god.stub_function_to_return(job.utils, 'drop_caches', None)

        self.job._job_state = base_job_unittest.stub_job_state
        self.job.__init__('/control', options)


class dummy(object):
    """A simple placeholder for attributes"""
    pass


class first_line_comparator(mock.argument_comparator):
    """Argument comparator that only looks at the first line of a string."""

    def __init__(self, first_line):
        self.first_line = first_line


    def is_satisfied_by(self, parameter):
        """Return True if the first line of parameter equals first_line."""
        return self.first_line == parameter.splitlines()[0]


class test_base_job(unittest.TestCase):
    """Tests of job.base_client_job run against mock_god stubs for the
    os/shutil/utils/harness/sysinfo calls listed in setUp."""

    def setUp(self):
        """Install the stubs shared by every test in this class."""
        # make god
        self.god = mock.mock_god(ut=self)

        # need to set some environ variables; remember the previous value so
        # tearDown can put it back instead of leaking it to the process
        self.autodir = "autodir"
        self._saved_autodir = os.environ.get('AUTODIR')
        os.environ['AUTODIR'] = self.autodir

        # set up some variables
        self.control = "control"
        self.jobtag = "jobtag"

        # get rid of stdout and logging
        sys.stdout = StringIO.StringIO()
        logging_manager.configure_logging(logging_config.TestingConfig())
        logging.disable(logging.CRITICAL)
        def dummy_configure_logging(*args, **kwargs):
            pass
        self.god.stub_with(logging_manager, 'configure_logging',
                           dummy_configure_logging)
        real_get_logging_manager = logging_manager.get_logging_manager
        def get_logging_manager_no_fds(manage_stdout_and_stderr=False,
                                       redirect_fds=False):
            # always pass False for redirect_fds, whatever the caller asked
            return real_get_logging_manager(manage_stdout_and_stderr, False)
        self.god.stub_with(logging_manager, 'get_logging_manager',
                           get_logging_manager_no_fds)

        # stub out some stuff
        self.god.stub_function(os.path, 'exists')
        self.god.stub_function(os.path, 'isdir')
        self.god.stub_function(os, 'makedirs')
        self.god.stub_function(os, 'mkdir')
        self.god.stub_function(os, 'remove')
        self.god.stub_function(shutil, 'rmtree')
        self.god.stub_function(shutil, 'copyfile')
        self.god.stub_function(job, 'open')
        self.god.stub_function(utils, 'system')
        self.god.stub_function(utils, 'drop_caches')
        self.god.stub_function(harness, 'select')
        self.god.stub_function(sysinfo, 'log_per_reboot_data')

        self.god.stub_class(config, 'config')
        self.god.stub_class(job.local_host, 'LocalHost')
        self.god.stub_class(boottool, 'boottool')
        self.god.stub_class(sysinfo, 'sysinfo')

        self.god.stub_class_method(job.base_client_job,
                                   '_cleanup_debugdir_files')
        self.god.stub_class_method(job.base_client_job, '_cleanup_results_dir')

        self.god.stub_with(job.base_job.job_directory, '_ensure_valid',
                           lambda *_: None)


    def tearDown(self):
        """Restore stdout, remove all stubs and restore AUTODIR."""
        sys.stdout = sys.__stdout__
        self.god.unstub_all()
        # put AUTODIR back the way setUp found it
        if self._saved_autodir is None:
            os.environ.pop('AUTODIR', None)
        else:
            os.environ['AUTODIR'] = self._saved_autodir


    def _make_options(self, cont):
        """Return the options placeholder handed to job.__init__.

        @param cont: value stored in options.cont.
        """
        options = dummy()
        options.tag = self.jobtag
        options.cont = cont
        options.harness = None
        options.harness_args = None
        options.log = False
        options.verbose = False
        options.hostname = 'localhost'
        options.user = 'my_user'
        options.args = ''
        options.output_dir = ''
        options.tap_report = None
        return options


    def _setup_pre_record_init(self, cont):
        """Record the calls expected before the job starts recording.

        @param cont: True when simulating a continued job; the cleanup
                calls are only expected when this is False.
        @return: a (resultdir, my_harness) tuple.
        """
        self.god.stub_function(self.job, '_load_state')

        resultdir = os.path.join(self.autodir, 'results', self.jobtag)
        if not cont:
            job.base_client_job._cleanup_debugdir_files.expect_call()
            job.base_client_job._cleanup_results_dir.expect_call()

        self.job._load_state.expect_call()

        my_harness = self.god.create_mock_class(harness.harness,
                                                'my_harness')
        harness.select.expect_call(None,
                                   self.job,
                                   None).and_return(my_harness)

        return resultdir, my_harness


    def _setup_post_record_init(self, cont, resultdir, my_harness):
        """Record the calls expected from the post-record part of __init__.

        @param cont: True when simulating a continued job.
        @param resultdir: results directory from _setup_pre_record_init.
        @param my_harness: mock harness from _setup_pre_record_init.
        """
        # now some specific stubs
        self.god.stub_function(self.job, 'config_get')
        self.god.stub_function(self.job, 'config_set')
        self.god.stub_function(self.job, 'record')

        # other setup
        download = os.path.join(self.autodir, 'tests', 'download')

        utils.drop_caches.expect_call()
        job_sysinfo = sysinfo.sysinfo.expect_new(resultdir)
        if not cont:
            os.path.exists.expect_call(download).and_return(False)
            os.mkdir.expect_call(download)
            shutil.copyfile.expect_call(mock.is_string_comparator(),
                                 os.path.join(resultdir, 'control'))

        self.config = config.config.expect_new(self.job)
        self.job.config_get.expect_call(
                'boottool.executable').and_return(None)
        bootloader = boottool.boottool.expect_new(None)
        job.local_host.LocalHost.expect_new(hostname='localhost',
                                            bootloader=bootloader)
        job_sysinfo.log_per_reboot_data.expect_call()
        if not cont:
            self.job.record.expect_call('START', None, None)

        my_harness.run_start.expect_call()

        self.god.stub_function(utils, 'read_one_line')
        utils.read_one_line.expect_call('/proc/cmdline').and_return(
            'blah more-blah root=lala IDENT=81234567 blah-again console=tty1')
        self.job.config_set.expect_call('boot.default_args',
                                        'more-blah console=tty1')


    def construct_job(self, cont):
        """Create self.job and run its __init__ against recorded calls.

        @param cont: value passed through as options.cont.
        """
        # will construct class instance using __new__
        self.job = job.base_client_job.__new__(job.base_client_job)

        # record
        resultdir, my_harness = self._setup_pre_record_init(cont)
        self._setup_post_record_init(cont, resultdir, my_harness)

        # finish constructor
        options = self._make_options(cont)
        self.job.__init__(self.control, options,
                          extra_copy_cmdline=['more-blah'])

        # check
        self.god.check_playback()


    def get_partition_mock(self, devname):
        """
        Create a mock of a partition object and return it.
        """
        # named partition_mock so the imported mock module is not shadowed
        class partition_mock(object):
            device = devname
            get_mountpoint = self.god.create_mock_function('get_mountpoint')
        return partition_mock


    def test_constructor_first_run(self):
        self.construct_job(False)


    def test_constructor_continuation(self):
        self.construct_job(True)


    def test_constructor_post_record_failure(self):
        """
        Test post record initialization failure.
        """
        self.job = job.base_client_job.__new__(job.base_client_job)
        options = self._make_options(False)
        failure = Exception('fail')

        self.god.stub_function(self.job, '_post_record_init')
        self.god.stub_function(self.job, 'record')

        self._setup_pre_record_init(False)
        self.job._post_record_init.expect_call(
                self.control, options, True, ['more-blah']).and_raises(failure)
        self.job.record.expect_call(
                'ABORT', None, None,'client.bin.job.__init__ failed: %s' %
                str(failure))

        self.assertRaises(
                Exception, self.job.__init__, self.control, options,
                drop_caches=True, extra_copy_cmdline=['more-blah'])

        # check
        self.god.check_playback()


    def test_relative_path(self):
        self.construct_job(True)
        leaf = "asdf"
        ret = self.job.relative_path(os.path.join(self.job.resultdir, leaf))
        self.assertEqual(ret, leaf)


    def test_control_functions(self):
        self.construct_job(True)
        control_file = "blah"
        self.job.control_set(control_file)
        self.assertEqual(self.job.control_get(), os.path.abspath(control_file))


    def test_harness_select(self):
        self.construct_job(True)

        # record
        which = "which"
        harness_args = ''
        harness.select.expect_call(which, self.job,
                                   harness_args).and_return(None)

        # run and test
        self.job.harness_select(which, harness_args)
        self.god.check_playback()


    def test_config_set(self):
        self.construct_job(True)

        # unstub config_set
        self.god.unstub(self.job, 'config_set')
        # record
        name = "foo"
        val = 10
        self.config.set.expect_call(name, val)

        # run and test
        self.job.config_set(name, val)
        self.god.check_playback()


    def test_config_get(self):
        self.construct_job(True)

        # unstub config_get
        self.god.unstub(self.job, 'config_get')
        # record
        name = "foo"
        val = 10
        self.config.get.expect_call(name).and_return(val)

        # run and test
        self.job.config_get(name)
        self.god.check_playback()


    def test_setup_dirs_raise(self):
        self.construct_job(True)

        # setup
        results_dir = 'foo'
        tmp_dir = 'bar'

        # record
        os.path.exists.expect_call(tmp_dir).and_return(True)
        os.path.isdir.expect_call(tmp_dir).and_return(False)

        # test
        self.assertRaises(ValueError, self.job.setup_dirs, results_dir, tmp_dir)
        self.god.check_playback()


    def test_setup_dirs(self):
        self.construct_job(True)

        # setup
        results_dir1 = os.path.join(self.job.resultdir, 'build')
        results_dir2 = os.path.join(self.job.resultdir, 'build.2')
        results_dir3 = os.path.join(self.job.resultdir, 'build.3')
        tmp_dir = 'bar'

        # record
        os.path.exists.expect_call(tmp_dir).and_return(False)
        os.mkdir.expect_call(tmp_dir)
        os.path.isdir.expect_call(tmp_dir).and_return(True)
        os.path.exists.expect_call(results_dir1).and_return(True)
        os.path.exists.expect_call(results_dir2).and_return(True)
        os.path.exists.expect_call(results_dir3).and_return(False)
        os.path.exists.expect_call(results_dir3).and_return(False)
        os.mkdir.expect_call(results_dir3)

        # test
        self.assertEqual(self.job.setup_dirs(None, tmp_dir),
                         (results_dir3, tmp_dir))
        self.god.check_playback()


    def test_xen(self):
        self.construct_job(True)

        # setup
        self.god.stub_function(self.job, "setup_dirs")
        self.god.stub_class(xen, "xen")
        results = 'results_dir'
        tmp = 'tmp'
        build = 'xen'
        base_tree = object()

        # record
        self.job.setup_dirs.expect_call(results,
                                        tmp).and_return((results, tmp))
        myxen = xen.xen.expect_new(self.job, base_tree, results, tmp, build,
                                   False, None)

        # run job and check
        axen = self.job.xen(base_tree, results, tmp)
        self.god.check_playback()
        self.assertEqual(myxen, axen)


    def test_kernel_rpm(self):
        self.construct_job(True)

        # setup
        self.god.stub_function(self.job, "setup_dirs")
        self.god.stub_class(kernel, "rpm_kernel")
        self.god.stub_function(kernel, "preprocess_path")
        self.god.stub_function(self.job.pkgmgr, "fetch_pkg")
        self.god.stub_function(utils, "get_os_vendor")
        results = 'results_dir'
        tmp = 'tmp'
        path = "somepath.rpm"
        packages_dir = os.path.join("autodir/packages", path)

        # record
        self.job.setup_dirs.expect_call(results,
                                        tmp).and_return((results, tmp))
        kernel.preprocess_path.expect_call(path).and_return(path)
        os.path.exists.expect_call(path).and_return(False)
        self.job.pkgmgr.fetch_pkg.expect_call(path, packages_dir, repo_url='')
        utils.get_os_vendor.expect_call()
        mykernel = kernel.rpm_kernel.expect_new(self.job, [packages_dir],
                                                results)

        # check
        akernel = self.job.kernel(path, results, tmp)
        self.god.check_playback()
        self.assertEqual(mykernel, akernel)


    def test_kernel(self):
        self.construct_job(True)

        # setup
        self.god.stub_function(self.job, "setup_dirs")
        self.god.stub_class(kernel, "kernel")
        self.god.stub_function(kernel, "preprocess_path")
        results = 'results_dir'
        tmp = 'tmp'
        build = 'linux'
        path = "somepath.deb"

        # record
        self.job.setup_dirs.expect_call(results,
                                        tmp).and_return((results, tmp))
        kernel.preprocess_path.expect_call(path).and_return(path)
        mykernel = kernel.kernel.expect_new(self.job, path, results, tmp,
                                            build, False)

        # check
        akernel = self.job.kernel(path, results, tmp)
        self.god.check_playback()
        self.assertEqual(mykernel, akernel)


    def test_run_test_logs_test_error_from_unhandled_error(self):
        self.construct_job(True)

        # set up stubs
        self.god.stub_function(self.job.pkgmgr, 'get_package_name')
        self.god.stub_function(self.job, "_runtest")

        # create an unhandled error object
        class MyError(error.TestError):
            pass
        real_error = MyError("this is the real error message")
        unhandled_error = error.UnhandledTestError(real_error)

        # set up the recording
        testname = "error_test"
        outputdir = os.path.join(self.job.resultdir, testname)
        self.job.pkgmgr.get_package_name.expect_call(
            testname, 'test').and_return(("", testname))
        os.path.exists.expect_call(outputdir).and_return(False)
        self.job.record.expect_call("START", testname, testname,
                                    optional_fields=None)
        self.job._runtest.expect_call(testname, "", None, (), {}).and_raises(
            unhandled_error)
        self.job.record.expect_call("ERROR", testname, testname,
                                    first_line_comparator(str(real_error)))
        self.job.record.expect_call("END ERROR", testname, testname)
        self.job.harness.run_test_complete.expect_call()
        utils.drop_caches.expect_call()

        # run and check
        self.job.run_test(testname)
        self.god.check_playback()


    def test_run_test_logs_non_test_error_from_unhandled_error(self):
        self.construct_job(True)

        # set up stubs
        self.god.stub_function(self.job.pkgmgr, 'get_package_name')
        self.god.stub_function(self.job, "_runtest")

        # create an unhandled error object
        class MyError(Exception):
            pass
        real_error = MyError("this is the real error message")
        unhandled_error = error.UnhandledTestError(real_error)
        reason = first_line_comparator("Unhandled MyError: %s" % real_error)

        # set up the recording
        testname = "error_test"
        outputdir = os.path.join(self.job.resultdir, testname)
        self.job.pkgmgr.get_package_name.expect_call(
            testname, 'test').and_return(("", testname))
        os.path.exists.expect_call(outputdir).and_return(False)
        self.job.record.expect_call("START", testname, testname,
                                    optional_fields=None)
        self.job._runtest.expect_call(testname, "", None, (), {}).and_raises(
            unhandled_error)
        self.job.record.expect_call("ERROR", testname, testname, reason)
        self.job.record.expect_call("END ERROR", testname, testname)
        self.job.harness.run_test_complete.expect_call()
        utils.drop_caches.expect_call()

        # run and check
        self.job.run_test(testname)
        self.god.check_playback()


    def test_report_reboot_failure(self):
        self.construct_job(True)

        # record
        self.job.record.expect_call("ABORT", "sub", "reboot.verify",
                                    "boot failure")
        self.job.record.expect_call("END ABORT", "sub", "reboot",
                                    optional_fields={"kernel": "2.6.15-smp"})

        # playback
        self.job._record_reboot_failure("sub", "reboot.verify", "boot failure",
                                        running_id="2.6.15-smp")
        self.god.check_playback()


    def _setup_check_post_reboot(self, mount_info, cpu_count):
        """Record the partition/CPU lookups and seed the saved job state.

        @param mount_info: value stored in the job state under
                ('client', 'mount_info').
        @param cpu_count: value utils.count_cpus is expected to return, or
                None when no count_cpus call is expected.
        """
        # setup
        self.god.stub_function(job.partition_lib, "get_partition_list")
        self.god.stub_function(utils, "count_cpus")

        part_list = [self.get_partition_mock("/dev/hda1"),
                     self.get_partition_mock("/dev/hdb1")]
        mount_list = ["/mnt/hda1", "/mnt/hdb1"]

        # record
        job.partition_lib.get_partition_list.expect_call(
                self.job, exclude_swap=False).and_return(part_list)
        for part, mountpoint in zip(part_list, mount_list):
            part.get_mountpoint.expect_call().and_return(mountpoint)
        if cpu_count is not None:
            utils.count_cpus.expect_call().and_return(cpu_count)
        self.job._state.set('client', 'mount_info', mount_info)
        self.job._state.set('client', 'cpu_count', 8)


    def test_check_post_reboot_success(self):
        self.construct_job(True)

        mount_info = set([("/dev/hda1", "/mnt/hda1"),
                          ("/dev/hdb1", "/mnt/hdb1")])
        self._setup_check_post_reboot(mount_info, 8)

        # playback
        self.job._check_post_reboot("sub")
        self.god.check_playback()


    def test_check_post_reboot_mounts_failure(self):
        self.construct_job(True)

        mount_info = set([("/dev/hda1", "/mnt/hda1")])
        self._setup_check_post_reboot(mount_info, None)

        self.god.stub_function(self.job, "_record_reboot_failure")
        self.job._record_reboot_failure.expect_call("sub",
                "reboot.verify_config", "mounted partitions are different after"
                " reboot (old entries: set([]), new entries: set([('/dev/hdb1',"
                " '/mnt/hdb1')]))", running_id=None)

        # playback
        self.assertRaises(error.JobError, self.job._check_post_reboot, "sub")
        self.god.check_playback()


    def test_check_post_reboot_cpu_failure(self):
        self.construct_job(True)

        mount_info = set([("/dev/hda1", "/mnt/hda1"),
                          ("/dev/hdb1", "/mnt/hdb1")])
        self._setup_check_post_reboot(mount_info, 4)

        self.god.stub_function(self.job, "_record_reboot_failure")
        self.job._record_reboot_failure.expect_call(
            'sub', 'reboot.verify_config',
            'Number of CPUs changed after reboot (old count: 8, new count: 4)',
            running_id=None)

        # playback
        self.assertRaises(error.JobError, self.job._check_post_reboot, "sub")
        self.god.check_playback()


    def test_end_boot(self):
        self.construct_job(True)
        self.god.stub_function(self.job, "_check_post_reboot")

        # set up the job class
        self.job._record_prefix = '\t\t'

        self.job._check_post_reboot.expect_call("sub", running_id=None)
        self.job.record.expect_call("END GOOD", "sub", "reboot",
                                    optional_fields={"kernel": "2.6.15-smp",
                                                     "patch0": "patchname"})

        # run test
        self.job.end_reboot("sub", "2.6.15-smp", ["patchname"])
        self.god.check_playback()


    def test_end_boot_and_verify_success(self):
        self.construct_job(True)
        self.god.stub_function(self.job, "_check_post_reboot")

        # set up the job class
        self.job._record_prefix = '\t\t'

        self.god.stub_function(utils, "running_os_ident")
        utils.running_os_ident.expect_call().and_return("2.6.15-smp")

        utils.read_one_line.expect_call("/proc/cmdline").and_return(
            "blah more-blah root=lala IDENT=81234567 blah-again")

        self.god.stub_function(utils, "running_os_full_version")
        running_id = "2.6.15-smp"
        utils.running_os_full_version.expect_call().and_return(running_id)

        self.job.record.expect_call("GOOD", "sub", "reboot.verify",
                                    running_id)
        self.job._check_post_reboot.expect_call("sub", running_id=running_id)
        self.job.record.expect_call("END GOOD", "sub", "reboot",
                                    optional_fields={"kernel": running_id})

        # run test
        self.job.end_reboot_and_verify(81234567, "2.6.15-smp", "sub")
        self.god.check_playback()


    def test_end_boot_and_verify_failure(self):
        self.construct_job(True)
        self.god.stub_function(self.job, "_record_reboot_failure")

        # set up the job class
        self.job._record_prefix = '\t\t'

        self.god.stub_function(utils, "running_os_ident")
        utils.running_os_ident.expect_call().and_return("2.6.15-smp")

        utils.read_one_line.expect_call("/proc/cmdline").and_return(
            "blah more-blah root=lala IDENT=81234567 blah-again")

        self.job._record_reboot_failure.expect_call("sub", "reboot.verify",
                "boot failure", running_id="2.6.15-smp")

        # run test
        self.assertRaises(error.JobError, self.job.end_reboot_and_verify,
                          91234567, "2.6.16-smp", "sub")
        self.god.check_playback()


    def test_parse_args(self):
        """_parse_args keeps quoted values containing spaces together."""
        test_set = {"a='foo bar baz' b='moo apt'":
                    ["a='foo bar baz'", "b='moo apt'"],
                    "a='foo bar baz' only=gah":
                    ["a='foo bar baz'", "only=gah"],
                    "a='b c d' no=argh":
                    ["a='b c d'", "no=argh"]}
        for t in test_set:
            parsed_args = job.base_client_job._parse_args(t)
            expected_args = test_set[t]
            self.assertEqual(parsed_args, expected_args)


    def test_run_test_timeout_parameter_is_propagated(self):
        self.construct_job(True)

        # set up stubs
        self.god.stub_function(self.job.pkgmgr, 'get_package_name')
        self.god.stub_function(self.job, "_runtest")

        # set up the recording
        testname = "test"
        outputdir = os.path.join(self.job.resultdir, testname)
        self.job.pkgmgr.get_package_name.expect_call(
            testname, 'test').and_return(("", testname))
        os.path.exists.expect_call(outputdir).and_return(False)
        timeout = 60
        optional_fields = {}
        optional_fields['timeout'] = timeout
        self.job.record.expect_call("START", testname, testname,
                                    optional_fields=optional_fields)
        self.job._runtest.expect_call(testname, "", timeout, (), {})
        self.job.record.expect_call("GOOD", testname, testname,
                                    "completed successfully")
        self.job.record.expect_call("END GOOD", testname, testname)
        self.job.harness.run_test_complete.expect_call()
        utils.drop_caches.expect_call()

        # run and check
        self.job.run_test(testname, timeout=timeout)
        self.god.check_playback()


if __name__ == "__main__":
    unittest.main()