1 # pylint: disable-msg=C0111 2 3 # Copyright (c) 2011 The Chromium OS Authors. All rights reserved. 4 # Use of this source code is governed by a BSD-style license that can be 5 # found in the LICENSE file. 6 """ 7 The main job wrapper for the server side. 8 9 This is the core infrastructure. Derived from the client side job.py 10 11 Copyright Martin J. Bligh, Andy Whitcroft 2007 12 """ 13 14 import getpass, os, sys, re, tempfile, time, select, platform 15 import traceback, shutil, warnings, fcntl, pickle, logging, itertools, errno 16 from autotest_lib.client.bin import sysinfo 17 from autotest_lib.client.common_lib import base_job, global_config 18 from autotest_lib.client.common_lib import error, utils, packages 19 from autotest_lib.client.common_lib import logging_manager 20 from autotest_lib.server import test, subcommand, profilers 21 from autotest_lib.server import utils as server_utils 22 from autotest_lib.server.cros.dynamic_suite import frontend_wrappers 23 from autotest_lib.server.hosts import abstract_ssh, factory as host_factory 24 from autotest_lib.tko import db as tko_db, status_lib, utils as tko_utils 25 26 27 INCREMENTAL_TKO_PARSING = global_config.global_config.get_config_value( 28 'autoserv', 'incremental_tko_parsing', type=bool, default=False) 29 30 def _control_segment_path(name): 31 """Get the pathname of the named control segment file.""" 32 server_dir = os.path.dirname(os.path.abspath(__file__)) 33 return os.path.join(server_dir, "control_segments", name) 34 35 36 CLIENT_CONTROL_FILENAME = 'control' 37 SERVER_CONTROL_FILENAME = 'control.srv' 38 MACHINES_FILENAME = '.machines' 39 40 CLIENT_WRAPPER_CONTROL_FILE = _control_segment_path('client_wrapper') 41 CRASHDUMPS_CONTROL_FILE = _control_segment_path('crashdumps') 42 CRASHINFO_CONTROL_FILE = _control_segment_path('crashinfo') 43 INSTALL_CONTROL_FILE = _control_segment_path('install') 44 CLEANUP_CONTROL_FILE = _control_segment_path('cleanup') 45 VERIFY_CONTROL_FILE = _control_segment_path('verify') 46 REPAIR_CONTROL_FILE = _control_segment_path('repair') 47 PROVISION_CONTROL_FILE = _control_segment_path('provision') 48 VERIFY_JOB_REPO_URL_CONTROL_FILE = _control_segment_path('verify_job_repo_url') 49 RESET_CONTROL_FILE = _control_segment_path('reset') 50 51 # by default provide a stub that generates no site data 52 def _get_site_job_data_dummy(job): 53 return {} 54 55 56 class status_indenter(base_job.status_indenter): 57 """Provide a simple integer-backed status indenter.""" 58 def __init__(self): 59 self._indent = 0 60 61 62 @property 63 def indent(self): 64 return self._indent 65 66 67 def increment(self): 68 self._indent += 1 69 70 71 def decrement(self): 72 self._indent -= 1 73 74 75 def get_context(self): 76 """Returns a context object for use by job.get_record_context.""" 77 class context(object): 78 def __init__(self, indenter, indent): 79 self._indenter = indenter 80 self._indent = indent 81 def restore(self): 82 self._indenter._indent = self._indent 83 return context(self, self._indent) 84 85 86 class server_job_record_hook(object): 87 """The job.record hook for server job. Used to inject WARN messages from 88 the console or vlm whenever new logs are written, and to echo any logs 89 to INFO level logging. Implemented as a class so that it can use state to 90 block recursive calls, so that the hook can call job.record itself to 91 log WARN messages. 92 93 Depends on job._read_warnings and job._logger. 94 """ 95 def __init__(self, job): 96 self._job = job 97 self._being_called = False 98 99 100 def __call__(self, entry): 101 """A wrapper around the 'real' record hook, the _hook method, which 102 prevents recursion. This isn't making any effort to be threadsafe, 103 the intent is to outright block infinite recursion via a 104 job.record->_hook->job.record->_hook->job.record... chain.""" 105 if self._being_called: 106 return 107 self._being_called = True 108 try: 109 self._hook(self._job, entry) 110 finally: 111 self._being_called = False 112 113 114 @staticmethod 115 def _hook(job, entry): 116 """The core hook, which can safely call job.record.""" 117 entries = [] 118 # poll all our warning loggers for new warnings 119 for timestamp, msg in job._read_warnings(): 120 warning_entry = base_job.status_log_entry( 121 'WARN', None, None, msg, {}, timestamp=timestamp) 122 entries.append(warning_entry) 123 job.record_entry(warning_entry) 124 # echo rendered versions of all the status logs to info 125 entries.append(entry) 126 for entry in entries: 127 rendered_entry = job._logger.render_entry(entry) 128 logging.info(rendered_entry) 129 job._parse_status(rendered_entry) 130 131 132 class base_server_job(base_job.base_job): 133 """The server-side concrete implementation of base_job. 134 135 Optional properties provided by this implementation: 136 serverdir 137 138 num_tests_run 139 num_tests_failed 140 141 warning_manager 142 warning_loggers 143 """ 144 145 _STATUS_VERSION = 1 146 147 # TODO crbug.com/285395 eliminate ssh_verbosity_flag 148 def __init__(self, control, args, resultdir, label, user, machines, 149 client=False, parse_job='', 150 ssh_user=host_factory.DEFAULT_SSH_USER, 151 ssh_port=host_factory.DEFAULT_SSH_PORT, 152 ssh_pass=host_factory.DEFAULT_SSH_PASS, 153 ssh_verbosity_flag=host_factory.DEFAULT_SSH_VERBOSITY, 154 ssh_options=host_factory.DEFAULT_SSH_OPTIONS, 155 test_retry=0, group_name='', 156 tag='', disable_sysinfo=False, 157 control_filename=SERVER_CONTROL_FILENAME, 158 parent_job_id=None, host_attributes=None, in_lab=False): 159 """ 160 Create a server side job object. 161 162 @param control: The pathname of the control file. 163 @param args: Passed to the control file. 164 @param resultdir: Where to throw the results. 165 @param label: Description of the job. 166 @param user: Username for the job (email address). 167 @param client: True if this is a client-side control file. 168 @param parse_job: string, if supplied it is the job execution tag that 169 the results will be passed through to the TKO parser with. 170 @param ssh_user: The SSH username. [root] 171 @param ssh_port: The SSH port number. [22] 172 @param ssh_pass: The SSH passphrase, if needed. 173 @param ssh_verbosity_flag: The SSH verbosity flag, '-v', '-vv', 174 '-vvv', or an empty string if not needed. 175 @param ssh_options: A string giving additional options that will be 176 included in ssh commands. 177 @param test_retry: The number of times to retry a test if the test did 178 not complete successfully. 179 @param group_name: If supplied, this will be written out as 180 host_group_name in the keyvals file for the parser. 181 @param tag: The job execution tag from the scheduler. [optional] 182 @param disable_sysinfo: Whether we should disable the sysinfo step of 183 tests for a modest shortening of test time. [optional] 184 @param control_filename: The filename where the server control file 185 should be written in the results directory. 186 @param parent_job_id: Job ID of the parent job. Default to None if the 187 job does not have a parent job. 188 @param host_attributes: Dict of host attributes passed into autoserv 189 via the command line. If specified here, these 190 attributes will apply to all machines. 191 @param in_lab: Boolean that indicates if this is running in the lab 192 environment. 193 """ 194 super(base_server_job, self).__init__(resultdir=resultdir, 195 test_retry=test_retry) 196 path = os.path.dirname(__file__) 197 self.test_retry = test_retry 198 self.control = control 199 self._uncollected_log_file = os.path.join(self.resultdir, 200 'uncollected_logs') 201 debugdir = os.path.join(self.resultdir, 'debug') 202 if not os.path.exists(debugdir): 203 os.mkdir(debugdir) 204 205 if user: 206 self.user = user 207 else: 208 self.user = getpass.getuser() 209 210 self.args = args 211 self.label = label 212 self.machines = machines 213 self._client = client 214 self.warning_loggers = set() 215 self.warning_manager = warning_manager() 216 self._ssh_user = ssh_user 217 self._ssh_port = ssh_port 218 self._ssh_pass = ssh_pass 219 self._ssh_verbosity_flag = ssh_verbosity_flag 220 self._ssh_options = ssh_options 221 self.tag = tag 222 self.last_boot_tag = None 223 self.hosts = set() 224 self.drop_caches = False 225 self.drop_caches_between_iterations = False 226 self._control_filename = control_filename 227 self._disable_sysinfo = disable_sysinfo 228 229 self.logging = logging_manager.get_logging_manager( 230 manage_stdout_and_stderr=True, redirect_fds=True) 231 subcommand.logging_manager_object = self.logging 232 233 self.sysinfo = sysinfo.sysinfo(self.resultdir) 234 self.profilers = profilers.profilers(self) 235 236 job_data = {'label' : label, 'user' : user, 237 'hostname' : ','.join(machines), 238 'drone' : platform.node(), 239 'status_version' : str(self._STATUS_VERSION), 240 'job_started' : str(int(time.time()))} 241 # Save parent job id to keyvals, so parser can retrieve the info and 242 # write to tko_jobs record. 243 if parent_job_id: 244 job_data['parent_job_id'] = parent_job_id 245 if group_name: 246 job_data['host_group_name'] = group_name 247 248 # only write these keyvals out on the first job in a resultdir 249 if 'job_started' not in utils.read_keyval(self.resultdir): 250 job_data.update(get_site_job_data(self)) 251 utils.write_keyval(self.resultdir, job_data) 252 253 self._parse_job = parse_job 254 self._using_parser = (INCREMENTAL_TKO_PARSING and self._parse_job 255 and len(machines) <= 1) 256 self.pkgmgr = packages.PackageManager( 257 self.autodir, run_function_dargs={'timeout':600}) 258 self.num_tests_run = 0 259 self.num_tests_failed = 0 260 261 self._register_subcommand_hooks() 262 263 # these components aren't usable on the server 264 self.bootloader = None 265 self.harness = None 266 267 # set up the status logger 268 self._indenter = status_indenter() 269 self._logger = base_job.status_logger( 270 self, self._indenter, 'status.log', 'status.log', 271 record_hook=server_job_record_hook(self)) 272 273 # Initialize a flag to indicate DUT failure during the test, e.g., 274 # unexpected reboot. 275 self.failed_with_device_error = False 276 277 self.parent_job_id = parent_job_id 278 self.in_lab = in_lab 279 afe = frontend_wrappers.RetryingAFE(timeout_min=5, delay_sec=10) 280 self.machine_dict_list = [] 281 for machine in self.machines: 282 host_attributes = host_attributes or {} 283 if self.in_lab: 284 host = afe.get_hosts(hostname=machine)[0] 285 host_attributes.update(host.attributes) 286 self.machine_dict_list.append( 287 {'hostname' : machine, 288 'host_attributes' : host_attributes}) 289 290 291 @classmethod 292 def _find_base_directories(cls): 293 """ 294 Determine locations of autodir, clientdir and serverdir. Assumes 295 that this file is located within serverdir and uses __file__ along 296 with relative paths to resolve the location. 297 """ 298 serverdir = os.path.abspath(os.path.dirname(__file__)) 299 autodir = os.path.normpath(os.path.join(serverdir, '..')) 300 clientdir = os.path.join(autodir, 'client') 301 return autodir, clientdir, serverdir 302 303 304 def _find_resultdir(self, resultdir, *args, **dargs): 305 """ 306 Determine the location of resultdir. For server jobs we expect one to 307 always be explicitly passed in to __init__, so just return that. 308 """ 309 if resultdir: 310 return os.path.normpath(resultdir) 311 else: 312 return None 313 314 315 def _get_status_logger(self): 316 """Return a reference to the status logger.""" 317 return self._logger 318 319 320 @staticmethod 321 def _load_control_file(path): 322 f = open(path) 323 try: 324 control_file = f.read() 325 finally: 326 f.close() 327 return re.sub('\r', '', control_file) 328 329 330 def _register_subcommand_hooks(self): 331 """ 332 Register some hooks into the subcommand modules that allow us 333 to properly clean up self.hosts created in forked subprocesses. 334 """ 335 def on_fork(cmd): 336 self._existing_hosts_on_fork = set(self.hosts) 337 def on_join(cmd): 338 new_hosts = self.hosts - self._existing_hosts_on_fork 339 for host in new_hosts: 340 host.close() 341 subcommand.subcommand.register_fork_hook(on_fork) 342 subcommand.subcommand.register_join_hook(on_join) 343 344 345 def init_parser(self): 346 """ 347 Start the continuous parsing of self.resultdir. This sets up 348 the database connection and inserts the basic job object into 349 the database if necessary. 350 """ 351 if not self._using_parser: 352 return 353 # redirect parser debugging to .parse.log 354 parse_log = os.path.join(self.resultdir, '.parse.log') 355 parse_log = open(parse_log, 'w', 0) 356 tko_utils.redirect_parser_debugging(parse_log) 357 # create a job model object and set up the db 358 self.results_db = tko_db.db(autocommit=True) 359 self.parser = status_lib.parser(self._STATUS_VERSION) 360 self.job_model = self.parser.make_job(self.resultdir) 361 self.parser.start(self.job_model) 362 # check if a job already exists in the db and insert it if 363 # it does not 364 job_idx = self.results_db.find_job(self._parse_job) 365 if job_idx is None: 366 self.results_db.insert_job(self._parse_job, self.job_model, 367 self.parent_job_id) 368 else: 369 machine_idx = self.results_db.lookup_machine(self.job_model.machine) 370 self.job_model.index = job_idx 371 self.job_model.machine_idx = machine_idx 372 373 374 def cleanup_parser(self): 375 """ 376 This should be called after the server job is finished 377 to carry out any remaining cleanup (e.g. flushing any 378 remaining test results to the results db) 379 """ 380 if not self._using_parser: 381 return 382 final_tests = self.parser.end() 383 for test in final_tests: 384 self.__insert_test(test) 385 self._using_parser = False 386 387 # TODO crbug.com/285395 add a kwargs parameter. 388 def _make_namespace(self): 389 """Create a namespace dictionary to be passed along to control file. 390 391 Creates a namespace argument populated with standard values: 392 machines, job, ssh_user, ssh_port, ssh_pass, ssh_verbosity_flag, 393 and ssh_options. 394 """ 395 namespace = {'machines' : self.machine_dict_list, 396 'job' : self, 397 'ssh_user' : self._ssh_user, 398 'ssh_port' : self._ssh_port, 399 'ssh_pass' : self._ssh_pass, 400 'ssh_verbosity_flag' : self._ssh_verbosity_flag, 401 'ssh_options' : self._ssh_options} 402 return namespace 403 404 405 def cleanup(self, labels): 406 """Cleanup machines. 407 408 @param labels: Comma separated job labels, will be used to 409 determine special task actions. 410 """ 411 if not self.machines: 412 raise error.AutoservError('No machines specified to cleanup') 413 if self.resultdir: 414 os.chdir(self.resultdir) 415 416 namespace = self._make_namespace() 417 namespace.update({'job_labels': labels, 'args': ''}) 418 self._execute_code(CLEANUP_CONTROL_FILE, namespace, protect=False) 419 420 421 def verify(self, labels): 422 """Verify machines are all ssh-able. 423 424 @param labels: Comma separated job labels, will be used to 425 determine special task actions. 426 """ 427 if not self.machines: 428 raise error.AutoservError('No machines specified to verify') 429 if self.resultdir: 430 os.chdir(self.resultdir) 431 432 namespace = self._make_namespace() 433 namespace.update({'job_labels': labels, 'args': ''}) 434 self._execute_code(VERIFY_CONTROL_FILE, namespace, protect=False) 435 436 437 def reset(self, labels): 438 """Reset machines by first cleanup then verify each machine. 439 440 @param labels: Comma separated job labels, will be used to 441 determine special task actions. 442 """ 443 if not self.machines: 444 raise error.AutoservError('No machines specified to reset.') 445 if self.resultdir: 446 os.chdir(self.resultdir) 447 448 namespace = self._make_namespace() 449 namespace.update({'job_labels': labels, 'args': ''}) 450 self._execute_code(RESET_CONTROL_FILE, namespace, protect=False) 451 452 453 def repair(self, labels): 454 """Repair machines. 455 456 @param labels: Comma separated job labels, will be used to 457 determine special task actions. 458 """ 459 if not self.machines: 460 raise error.AutoservError('No machines specified to repair') 461 if self.resultdir: 462 os.chdir(self.resultdir) 463 464 namespace = self._make_namespace() 465 namespace.update({'job_labels': labels, 'args': ''}) 466 self._execute_code(REPAIR_CONTROL_FILE, namespace, protect=False) 467 468 469 def provision(self, labels): 470 """ 471 Provision all hosts to match |labels|. 472 473 @param labels: A comma seperated string of labels to provision the 474 host to. 475 476 """ 477 control = self._load_control_file(PROVISION_CONTROL_FILE) 478 self.run(control=control, job_labels=labels) 479 480 481 def precheck(self): 482 """ 483 perform any additional checks in derived classes. 484 """ 485 pass 486 487 488 def enable_external_logging(self): 489 """ 490 Start or restart external logging mechanism. 491 """ 492 pass 493 494 495 def disable_external_logging(self): 496 """ 497 Pause or stop external logging mechanism. 498 """ 499 pass 500 501 502 def use_external_logging(self): 503 """ 504 Return True if external logging should be used. 505 """ 506 return False 507 508 509 def _make_parallel_wrapper(self, function, machines, log): 510 """Wrap function as appropriate for calling by parallel_simple.""" 511 # machines could be a list of dictionaries, e.g., 512 # [{'host_attributes': {}, 'hostname': '100.96.51.226'}] 513 # The dictionary is generated in base_server_job.__init__, refer to 514 # variable machine_dict_list, then passed in with namespace, see method 515 # base_server_job._make_namespace. 516 # To compare the machinese to self.machines, which is a list of machine 517 # hostname, we need to convert machines back to a list of hostnames. 518 # Note that the order of hostnames doesn't matter, as is_forking will be 519 # True if there are more than one machine. 520 if (machines and isinstance(machines, list) 521 and isinstance(machines[0], dict)): 522 machines = [m['hostname'] for m in machines] 523 is_forking = not (len(machines) == 1 and self.machines == machines) 524 if self._parse_job and is_forking and log: 525 def wrapper(machine): 526 hostname = server_utils.get_hostname_from_machine(machine) 527 self._parse_job += "/" + hostname 528 self._using_parser = INCREMENTAL_TKO_PARSING 529 self.machines = [machine] 530 self.push_execution_context(hostname) 531 os.chdir(self.resultdir) 532 utils.write_keyval(self.resultdir, {"hostname": hostname}) 533 self.init_parser() 534 result = function(machine) 535 self.cleanup_parser() 536 return result 537 elif len(machines) > 1 and log: 538 def wrapper(machine): 539 hostname = server_utils.get_hostname_from_machine(machine) 540 self.push_execution_context(hostname) 541 os.chdir(self.resultdir) 542 machine_data = {'hostname' : hostname, 543 'status_version' : str(self._STATUS_VERSION)} 544 utils.write_keyval(self.resultdir, machine_data) 545 result = function(machine) 546 return result 547 else: 548 wrapper = function 549 return wrapper 550 551 552 def parallel_simple(self, function, machines, log=True, timeout=None, 553 return_results=False): 554 """ 555 Run 'function' using parallel_simple, with an extra wrapper to handle 556 the necessary setup for continuous parsing, if possible. If continuous 557 parsing is already properly initialized then this should just work. 558 559 @param function: A callable to run in parallel given each machine. 560 @param machines: A list of machine names to be passed one per subcommand 561 invocation of function. 562 @param log: If True, output will be written to output in a subdirectory 563 named after each machine. 564 @param timeout: Seconds after which the function call should timeout. 565 @param return_results: If True instead of an AutoServError being raised 566 on any error a list of the results|exceptions from the function 567 called on each arg is returned. [default: False] 568 569 @raises error.AutotestError: If any of the functions failed. 570 """ 571 wrapper = self._make_parallel_wrapper(function, machines, log) 572 return subcommand.parallel_simple(wrapper, machines, 573 log=log, timeout=timeout, 574 return_results=return_results) 575 576 577 def parallel_on_machines(self, function, machines, timeout=None): 578 """ 579 @param function: Called in parallel with one machine as its argument. 580 @param machines: A list of machines to call function(machine) on. 581 @param timeout: Seconds after which the function call should timeout. 582 583 @returns A list of machines on which function(machine) returned 584 without raising an exception. 585 """ 586 results = self.parallel_simple(function, machines, timeout=timeout, 587 return_results=True) 588 success_machines = [] 589 for result, machine in itertools.izip(results, machines): 590 if not isinstance(result, Exception): 591 success_machines.append(machine) 592 return success_machines 593 594 595 _USE_TEMP_DIR = object() 596 def run(self, install_before=False, install_after=False, 597 collect_crashdumps=True, namespace={}, control=None, 598 control_file_dir=None, verify_job_repo_url=False, 599 only_collect_crashinfo=False, skip_crash_collection=False, 600 job_labels='', use_packaging=True): 601 # for a normal job, make sure the uncollected logs file exists 602 # for a crashinfo-only run it should already exist, bail out otherwise 603 created_uncollected_logs = False 604 logging.info("I am PID %s", os.getpid()) 605 if self.resultdir and not os.path.exists(self._uncollected_log_file): 606 if only_collect_crashinfo: 607 # if this is a crashinfo-only run, and there were no existing 608 # uncollected logs, just bail out early 609 logging.info("No existing uncollected logs, " 610 "skipping crashinfo collection") 611 return 612 else: 613 log_file = open(self._uncollected_log_file, "w") 614 pickle.dump([], log_file) 615 log_file.close() 616 created_uncollected_logs = True 617 618 # use a copy so changes don't affect the original dictionary 619 namespace = namespace.copy() 620 machines = self.machines 621 if control is None: 622 if self.control is None: 623 control = '' 624 else: 625 control = self._load_control_file(self.control) 626 if control_file_dir is None: 627 control_file_dir = self.resultdir 628 629 self.aborted = False 630 namespace.update(self._make_namespace()) 631 namespace.update({'args' : self.args, 632 'job_labels' : job_labels}) 633 test_start_time = int(time.time()) 634 635 if self.resultdir: 636 os.chdir(self.resultdir) 637 # touch status.log so that the parser knows a job is running here 638 open(self.get_status_log_path(), 'a').close() 639 self.enable_external_logging() 640 641 collect_crashinfo = True 642 temp_control_file_dir = None 643 try: 644 try: 645 if install_before and machines: 646 self._execute_code(INSTALL_CONTROL_FILE, namespace) 647 648 if only_collect_crashinfo: 649 return 650 651 # If the verify_job_repo_url option is set but we're unable 652 # to actually verify that the job_repo_url contains the autotest 653 # package, this job will fail. 654 if verify_job_repo_url: 655 self._execute_code(VERIFY_JOB_REPO_URL_CONTROL_FILE, 656 namespace) 657 else: 658 logging.warning('Not checking if job_repo_url contains ' 659 'autotest packages on %s', machines) 660 661 # determine the dir to write the control files to 662 cfd_specified = (control_file_dir 663 and control_file_dir is not self._USE_TEMP_DIR) 664 if cfd_specified: 665 temp_control_file_dir = None 666 else: 667 temp_control_file_dir = tempfile.mkdtemp( 668 suffix='temp_control_file_dir') 669 control_file_dir = temp_control_file_dir 670 server_control_file = os.path.join(control_file_dir, 671 self._control_filename) 672 client_control_file = os.path.join(control_file_dir, 673 CLIENT_CONTROL_FILENAME) 674 if self._client: 675 namespace['control'] = control 676 utils.open_write_close(client_control_file, control) 677 shutil.copyfile(CLIENT_WRAPPER_CONTROL_FILE, 678 server_control_file) 679 else: 680 utils.open_write_close(server_control_file, control) 681 682 logging.info("Processing control file") 683 namespace['use_packaging'] = use_packaging 684 self._execute_code(server_control_file, namespace) 685 logging.info("Finished processing control file") 686 687 # If no device error occured, no need to collect crashinfo. 688 collect_crashinfo = self.failed_with_device_error 689 except Exception, e: 690 try: 691 logging.exception( 692 'Exception escaped control file, job aborting:') 693 reason = re.sub(base_job.status_log_entry.BAD_CHAR_REGEX, 694 ' ', str(e)) 695 self.record('INFO', None, None, str(e), 696 {'job_abort_reason': reason}) 697 except: 698 pass # don't let logging exceptions here interfere 699 raise 700 finally: 701 if temp_control_file_dir: 702 # Clean up temp directory used for copies of the control files 703 try: 704 shutil.rmtree(temp_control_file_dir) 705 except Exception, e: 706 logging.warning('Could not remove temp directory %s: %s', 707 temp_control_file_dir, e) 708 709 if machines and (collect_crashdumps or collect_crashinfo): 710 namespace['test_start_time'] = test_start_time 711 if skip_crash_collection: 712 logging.info('Skipping crash dump/info collection ' 713 'as requested.') 714 elif collect_crashinfo: 715 # includes crashdumps 716 self._execute_code(CRASHINFO_CONTROL_FILE, namespace) 717 else: 718 self._execute_code(CRASHDUMPS_CONTROL_FILE, namespace) 719 self.disable_external_logging() 720 if self._uncollected_log_file and created_uncollected_logs: 721 os.remove(self._uncollected_log_file) 722 if install_after and machines: 723 self._execute_code(INSTALL_CONTROL_FILE, namespace) 724 725 726 def run_test(self, url, *args, **dargs): 727 """ 728 Summon a test object and run it. 729 730 tag 731 tag to add to testname 732 url 733 url of the test to run 734 """ 735 if self._disable_sysinfo: 736 dargs['disable_sysinfo'] = True 737 738 group, testname = self.pkgmgr.get_package_name(url, 'test') 739 testname, subdir, tag = self._build_tagged_test_name(testname, dargs) 740 outputdir = self._make_test_outputdir(subdir) 741 742 def group_func(): 743 try: 744 test.runtest(self, url, tag, args, dargs) 745 except error.TestBaseException, e: 746 self.record(e.exit_status, subdir, testname, str(e)) 747 raise 748 except Exception, e: 749 info = str(e) + "\n" + traceback.format_exc() 750 self.record('FAIL', subdir, testname, info) 751 raise 752 else: 753 self.record('GOOD', subdir, testname, 'completed successfully') 754 755 result, exc_info = self._run_group(testname, subdir, group_func) 756 if exc_info and isinstance(exc_info[1], error.TestBaseException): 757 return False 758 elif exc_info: 759 raise exc_info[0], exc_info[1], exc_info[2] 760 else: 761 return True 762 763 764 def _run_group(self, name, subdir, function, *args, **dargs): 765 """\ 766 Underlying method for running something inside of a group. 767 """ 768 result, exc_info = None, None 769 try: 770 self.record('START', subdir, name) 771 result = function(*args, **dargs) 772 except error.TestBaseException, e: 773 self.record("END %s" % e.exit_status, subdir, name) 774 exc_info = sys.exc_info() 775 except Exception, e: 776 err_msg = str(e) + '\n' 777 err_msg += traceback.format_exc() 778 self.record('END ABORT', subdir, name, err_msg) 779 raise error.JobError(name + ' failed\n' + traceback.format_exc()) 780 else: 781 self.record('END GOOD', subdir, name) 782 783 return result, exc_info 784 785 786 def run_group(self, function, *args, **dargs): 787 """\ 788 function: 789 subroutine to run 790 *args: 791 arguments for the function 792 """ 793 794 name = function.__name__ 795 796 # Allow the tag for the group to be specified. 797 tag = dargs.pop('tag', None) 798 if tag: 799 name = tag 800 801 return self._run_group(name, None, function, *args, **dargs)[0] 802 803 804 def run_op(self, op, op_func, get_kernel_func): 805 """\ 806 A specialization of run_group meant specifically for handling 807 management operation. Includes support for capturing the kernel version 808 after the operation. 809 810 Args: 811 op: name of the operation. 812 op_func: a function that carries out the operation (reboot, suspend) 813 get_kernel_func: a function that returns a string 814 representing the kernel version. 815 """ 816 try: 817 self.record('START', None, op) 818 op_func() 819 except Exception, e: 820 err_msg = str(e) + '\n' + traceback.format_exc() 821 self.record('END FAIL', None, op, err_msg) 822 raise 823 else: 824 kernel = get_kernel_func() 825 self.record('END GOOD', None, op, 826 optional_fields={"kernel": kernel}) 827 828 829 def run_control(self, path): 830 """Execute a control file found at path (relative to the autotest 831 path). Intended for executing a control file within a control file, 832 not for running the top-level job control file.""" 833 path = os.path.join(self.autodir, path) 834 control_file = self._load_control_file(path) 835 self.run(control=control_file, control_file_dir=self._USE_TEMP_DIR) 836 837 838 def add_sysinfo_command(self, command, logfile=None, on_every_test=False): 839 self._add_sysinfo_loggable(sysinfo.command(command, logf=logfile), 840 on_every_test) 841 842 843 def add_sysinfo_logfile(self, file, on_every_test=False): 844 self._add_sysinfo_loggable(sysinfo.logfile(file), on_every_test) 845 846 847 def _add_sysinfo_loggable(self, loggable, on_every_test): 848 if on_every_test: 849 self.sysinfo.test_loggables.add(loggable) 850 else: 851 self.sysinfo.boot_loggables.add(loggable) 852 853 854 def _read_warnings(self): 855 """Poll all the warning loggers and extract any new warnings that have 856 been logged. If the warnings belong to a category that is currently 857 disabled, this method will discard them and they will no longer be 858 retrievable. 859 860 Returns a list of (timestamp, message) tuples, where timestamp is an 861 integer epoch timestamp.""" 862 warnings = [] 863 while True: 864 # pull in a line of output from every logger that has 865 # output ready to be read 866 loggers, _, _ = select.select(self.warning_loggers, [], [], 0) 867 closed_loggers = set() 868 for logger in loggers: 869 line = logger.readline() 870 # record any broken pipes (aka line == empty) 871 if len(line) == 0: 872 closed_loggers.add(logger) 873 continue 874 # parse out the warning 875 timestamp, msgtype, msg = line.split('\t', 2) 876 timestamp = int(timestamp) 877 # if the warning is valid, add it to the results 878 if self.warning_manager.is_valid(timestamp, msgtype): 879 warnings.append((timestamp, msg.strip())) 880 881 # stop listening to loggers that are closed 882 self.warning_loggers -= closed_loggers 883 884 # stop if none of the loggers have any output left 885 if not loggers: 886 break 887 888 # sort into timestamp order 889 warnings.sort() 890 return warnings 891 892 893 def _unique_subdirectory(self, base_subdirectory_name): 894 """Compute a unique results subdirectory based on the given name. 895 896 Appends base_subdirectory_name with a number as necessary to find a 897 directory name that doesn't already exist. 898 """ 899 subdirectory = base_subdirectory_name 900 counter = 1 901 while os.path.exists(os.path.join(self.resultdir, subdirectory)): 902 subdirectory = base_subdirectory_name + '.' + str(counter) 903 counter += 1 904 return subdirectory 905 906 907 def get_record_context(self): 908 """Returns an object representing the current job.record context. 909 910 The object returned is an opaque object with a 0-arg restore method 911 which can be called to restore the job.record context (i.e. indentation) 912 to the current level. The intention is that it should be used when 913 something external which generate job.record calls (e.g. an autotest 914 client) can fail catastrophically and the server job record state 915 needs to be reset to its original "known good" state. 916 917 @return: A context object with a 0-arg restore() method.""" 918 return self._indenter.get_context() 919 920 921 def record_summary(self, status_code, test_name, reason='', attributes=None, 922 distinguishing_attributes=(), child_test_ids=None): 923 """Record a summary test result. 924 925 @param status_code: status code string, see 926 common_lib.log.is_valid_status() 927 @param test_name: name of the test 928 @param reason: (optional) string providing detailed reason for test 929 outcome 930 @param attributes: (optional) dict of string keyvals to associate with 931 this result 932 @param distinguishing_attributes: (optional) list of attribute names 933 that should be used to distinguish identically-named test 934 results. These attributes should be present in the attributes 935 parameter. This is used to generate user-friendly subdirectory 936 names. 937 @param child_test_ids: (optional) list of test indices for test results 938 used in generating this result. 939 """ 940 subdirectory_name_parts = [test_name] 941 for attribute in distinguishing_attributes: 942 assert attributes 943 assert attribute in attributes, '%s not in %s' % (attribute, 944 attributes) 945 subdirectory_name_parts.append(attributes[attribute]) 946 base_subdirectory_name = '.'.join(subdirectory_name_parts) 947 948 subdirectory = self._unique_subdirectory(base_subdirectory_name) 949 subdirectory_path = os.path.join(self.resultdir, subdirectory) 950 os.mkdir(subdirectory_path) 951 952 self.record(status_code, subdirectory, test_name, 953 status=reason, optional_fields={'is_summary': True}) 954 955 if attributes: 956 utils.write_keyval(subdirectory_path, attributes) 957 958 if child_test_ids: 959 ids_string = ','.join(str(test_id) for test_id in child_test_ids) 960 summary_data = {'child_test_ids': ids_string} 961 utils.write_keyval(os.path.join(subdirectory_path, 'summary_data'), 962 summary_data) 963 964 965 def disable_warnings(self, warning_type): 966 self.warning_manager.disable_warnings(warning_type) 967 self.record("INFO", None, None, 968 "disabling %s warnings" % warning_type, 969 {"warnings.disable": warning_type}) 970 971 972 def enable_warnings(self, warning_type): 973 self.warning_manager.enable_warnings(warning_type) 974 self.record("INFO", None, None, 975 "enabling %s warnings" % warning_type, 976 {"warnings.enable": warning_type}) 977 978 979 def get_status_log_path(self, subdir=None): 980 """Return the path to the job status log. 981 982 @param subdir - Optional paramter indicating that you want the path 983 to a subdirectory status log. 984 985 @returns The path where the status log should be. 986 """ 987 if self.resultdir: 988 if subdir: 989 return os.path.join(self.resultdir, subdir, "status.log") 990 else: 991 return os.path.join(self.resultdir, "status.log") 992 else: 993 return None 994 995 996 def _update_uncollected_logs_list(self, update_func): 997 """Updates the uncollected logs list in a multi-process safe manner. 998 999 @param update_func - a function that updates the list of uncollected 1000 logs. Should take one parameter, the list to be updated. 1001 """ 1002 # Skip log collection if file _uncollected_log_file does not exist. 1003 if not (self._uncollected_log_file and 1004 os.path.exists(self._uncollected_log_file)): 1005 return 1006 if self._uncollected_log_file: 1007 log_file = open(self._uncollected_log_file, "r+") 1008 fcntl.flock(log_file, fcntl.LOCK_EX) 1009 try: 1010 uncollected_logs = pickle.load(log_file) 1011 update_func(uncollected_logs) 1012 log_file.seek(0) 1013 log_file.truncate() 1014 pickle.dump(uncollected_logs, log_file) 1015 log_file.flush() 1016 finally: 1017 fcntl.flock(log_file, fcntl.LOCK_UN) 1018 log_file.close() 1019 1020 1021 def add_client_log(self, hostname, remote_path, local_path): 1022 """Adds a new set of client logs to the list of uncollected logs, 1023 to allow for future log recovery. 1024 1025 @param host - the hostname of the machine holding the logs 1026 @param remote_path - the directory on the remote machine holding logs 1027 @param local_path - the local directory to copy the logs into 1028 """ 1029 def update_func(logs_list): 1030 logs_list.append((hostname, remote_path, local_path)) 1031 self._update_uncollected_logs_list(update_func) 1032 1033 1034 def remove_client_log(self, hostname, remote_path, local_path): 1035 """Removes a set of client logs from the list of uncollected logs, 1036 to allow for future log recovery. 1037 1038 @param host - the hostname of the machine holding the logs 1039 @param remote_path - the directory on the remote machine holding logs 1040 @param local_path - the local directory to copy the logs into 1041 """ 1042 def update_func(logs_list): 1043 logs_list.remove((hostname, remote_path, local_path)) 1044 self._update_uncollected_logs_list(update_func) 1045 1046 1047 def get_client_logs(self): 1048 """Retrieves the list of uncollected logs, if it exists. 1049 1050 @returns A list of (host, remote_path, local_path) tuples. Returns 1051 an empty list if no uncollected logs file exists. 1052 """ 1053 log_exists = (self._uncollected_log_file and 1054 os.path.exists(self._uncollected_log_file)) 1055 if log_exists: 1056 return pickle.load(open(self._uncollected_log_file)) 1057 else: 1058 return [] 1059 1060 1061 def _fill_server_control_namespace(self, namespace, protect=True): 1062 """ 1063 Prepare a namespace to be used when executing server control files. 1064 1065 This sets up the control file API by importing modules and making them 1066 available under the appropriate names within namespace. 1067 1068 For use by _execute_code(). 1069 1070 Args: 1071 namespace: The namespace dictionary to fill in. 1072 protect: Boolean. If True (the default) any operation that would 1073 clobber an existing entry in namespace will cause an error. 1074 Raises: 1075 error.AutoservError: When a name would be clobbered by import. 1076 """ 1077 def _import_names(module_name, names=()): 1078 """ 1079 Import a module and assign named attributes into namespace. 1080 1081 Args: 1082 module_name: The string module name. 1083 names: A limiting list of names to import from module_name. If 1084 empty (the default), all names are imported from the module 1085 similar to a "from foo.bar import *" statement. 1086 Raises: 1087 error.AutoservError: When a name being imported would clobber 1088 a name already in namespace. 1089 """ 1090 module = __import__(module_name, {}, {}, names) 1091 1092 # No names supplied? Import * from the lowest level module. 1093 # (Ugh, why do I have to implement this part myself?) 1094 if not names: 1095 for submodule_name in module_name.split('.')[1:]: 1096 module = getattr(module, submodule_name) 1097 if hasattr(module, '__all__'): 1098 names = getattr(module, '__all__') 1099 else: 1100 names = dir(module) 1101 1102 # Install each name into namespace, checking to make sure it 1103 # doesn't override anything that already exists. 1104 for name in names: 1105 # Check for conflicts to help prevent future problems. 1106 if name in namespace and protect: 1107 if namespace[name] is not getattr(module, name): 1108 raise error.AutoservError('importing name ' 1109 '%s from %s %r would override %r' % 1110 (name, module_name, getattr(module, name), 1111 namespace[name])) 1112 else: 1113 # Encourage cleanliness and the use of __all__ for a 1114 # more concrete API with less surprises on '*' imports. 1115 warnings.warn('%s (%r) being imported from %s for use ' 1116 'in server control files is not the ' 1117 'first occurrance of that import.' % 1118 (name, namespace[name], module_name)) 1119 1120 namespace[name] = getattr(module, name) 1121 1122 1123 # This is the equivalent of prepending a bunch of import statements to 1124 # the front of the control script. 1125 namespace.update(os=os, sys=sys, logging=logging) 1126 _import_names('autotest_lib.server', 1127 ('hosts', 'autotest', 'kvm', 'git', 'standalone_profiler', 1128 'source_kernel', 'rpm_kernel', 'deb_kernel', 'git_kernel')) 1129 _import_names('autotest_lib.server.subcommand', 1130 ('parallel', 'parallel_simple', 'subcommand')) 1131 _import_names('autotest_lib.server.utils', 1132 ('run', 'get_tmp_dir', 'sh_escape', 'parse_machine')) 1133 _import_names('autotest_lib.client.common_lib.error') 1134 _import_names('autotest_lib.client.common_lib.barrier', ('barrier',)) 1135 1136 # Inject ourself as the job object into other classes within the API. 1137 # (Yuck, this injection is a gross thing be part of a public API. -gps) 1138 # 1139 # XXX Base & SiteAutotest do not appear to use .job. Who does? 1140 namespace['autotest'].Autotest.job = self 1141 # server.hosts.base_classes.Host uses .job. 1142 namespace['hosts'].Host.job = self 1143 namespace['hosts'].TestBed.job = self 1144 namespace['hosts'].factory.ssh_user = self._ssh_user 1145 namespace['hosts'].factory.ssh_port = self._ssh_port 1146 namespace['hosts'].factory.ssh_pass = self._ssh_pass 1147 namespace['hosts'].factory.ssh_verbosity_flag = ( 1148 self._ssh_verbosity_flag) 1149 namespace['hosts'].factory.ssh_options = self._ssh_options 1150 1151 1152 def _execute_code(self, code_file, namespace, protect=True): 1153 """ 1154 Execute code using a copy of namespace as a server control script. 1155 1156 Unless protect_namespace is explicitly set to False, the dict will not 1157 be modified. 1158 1159 Args: 1160 code_file: The filename of the control file to execute. 1161 namespace: A dict containing names to make available during execution. 1162 protect: Boolean. If True (the default) a copy of the namespace dict 1163 is used during execution to prevent the code from modifying its 1164 contents outside of this function. If False the raw dict is 1165 passed in and modifications will be allowed. 1166 """ 1167 if protect: 1168 namespace = namespace.copy() 1169 self._fill_server_control_namespace(namespace, protect=protect) 1170 # TODO: Simplify and get rid of the special cases for only 1 machine. 1171 if len(self.machines) > 1: 1172 machines_text = '\n'.join(self.machines) + '\n' 1173 # Only rewrite the file if it does not match our machine list. 1174 try: 1175 machines_f = open(MACHINES_FILENAME, 'r') 1176 existing_machines_text = machines_f.read() 1177 machines_f.close() 1178 except EnvironmentError: 1179 existing_machines_text = None 1180 if machines_text != existing_machines_text: 1181 utils.open_write_close(MACHINES_FILENAME, machines_text) 1182 execfile(code_file, namespace, namespace) 1183 1184 1185 def _parse_status(self, new_line): 1186 if not self._using_parser: 1187 return 1188 new_tests = self.parser.process_lines([new_line]) 1189 for test in new_tests: 1190 self.__insert_test(test) 1191 1192 1193 def __insert_test(self, test): 1194 """ 1195 An internal method to insert a new test result into the 1196 database. This method will not raise an exception, even if an 1197 error occurs during the insert, to avoid failing a test 1198 simply because of unexpected database issues.""" 1199 self.num_tests_run += 1 1200 if status_lib.is_worse_than_or_equal_to(test.status, 'FAIL'): 1201 self.num_tests_failed += 1 1202 try: 1203 self.results_db.insert_test(self.job_model, test) 1204 except Exception: 1205 msg = ("WARNING: An unexpected error occured while " 1206 "inserting test results into the database. " 1207 "Ignoring error.\n" + traceback.format_exc()) 1208 print >> sys.stderr, msg 1209 1210 1211 def preprocess_client_state(self): 1212 """ 1213 Produce a state file for initializing the state of a client job. 1214 1215 Creates a new client state file with all the current server state, as 1216 well as some pre-set client state. 1217 1218 @returns The path of the file the state was written into. 1219 """ 1220 # initialize the sysinfo state 1221 self._state.set('client', 'sysinfo', self.sysinfo.serialize()) 1222 1223 # dump the state out to a tempfile 1224 fd, file_path = tempfile.mkstemp(dir=self.tmpdir) 1225 os.close(fd) 1226 1227 # write_to_file doesn't need locking, we exclusively own file_path 1228 self._state.write_to_file(file_path) 1229 return file_path 1230 1231 1232 def postprocess_client_state(self, state_path): 1233 """ 1234 Update the state of this job with the state from a client job. 1235 1236 Updates the state of the server side of a job with the final state 1237 of a client job that was run. Updates the non-client-specific state, 1238 pulls in some specific bits from the client-specific state, and then 1239 discards the rest. Removes the state file afterwards 1240 1241 @param state_file A path to the state file from the client. 1242 """ 1243 # update the on-disk state 1244 try: 1245 self._state.read_from_file(state_path) 1246 os.remove(state_path) 1247 except OSError, e: 1248 # ignore file-not-found errors 1249 if e.errno != errno.ENOENT: 1250 raise 1251 else: 1252 logging.debug('Client state file %s not found', state_path) 1253 1254 # update the sysinfo state 1255 if self._state.has('client', 'sysinfo'): 1256 self.sysinfo.deserialize(self._state.get('client', 'sysinfo')) 1257 1258 # drop all the client-specific state 1259 self._state.discard_namespace('client') 1260 1261 1262 def clear_all_known_hosts(self): 1263 """Clears known hosts files for all AbstractSSHHosts.""" 1264 for host in self.hosts: 1265 if isinstance(host, abstract_ssh.AbstractSSHHost): 1266 host.clear_known_hosts() 1267 1268 1269 class warning_manager(object): 1270 """Class for controlling warning logs. Manages the enabling and disabling 1271 of warnings.""" 1272 def __init__(self): 1273 # a map of warning types to a list of disabled time intervals 1274 self.disabled_warnings = {} 1275 1276 1277 def is_valid(self, timestamp, warning_type): 1278 """Indicates if a warning (based on the time it occured and its type) 1279 is a valid warning. A warning is considered "invalid" if this type of 1280 warning was marked as "disabled" at the time the warning occured.""" 1281 disabled_intervals = self.disabled_warnings.get(warning_type, []) 1282 for start, end in disabled_intervals: 1283 if timestamp >= start and (end is None or timestamp < end): 1284 return False 1285 return True 1286 1287 1288 def disable_warnings(self, warning_type, current_time_func=time.time): 1289 """As of now, disables all further warnings of this type.""" 1290 intervals = self.disabled_warnings.setdefault(warning_type, []) 1291 if not intervals or intervals[-1][1] is not None: 1292 intervals.append((int(current_time_func()), None)) 1293 1294 1295 def enable_warnings(self, warning_type, current_time_func=time.time): 1296 """As of now, enables all further warnings of this type.""" 1297 intervals = self.disabled_warnings.get(warning_type, []) 1298 if intervals and intervals[-1][1] is None: 1299 intervals[-1] = (intervals[-1][0], int(current_time_func())) 1300 1301 1302 # load up site-specific code for generating site-specific job data 1303 get_site_job_data = utils.import_site_function(__file__, 1304 "autotest_lib.server.site_server_job", "get_site_job_data", 1305 _get_site_job_data_dummy) 1306 1307 1308 site_server_job = utils.import_site_class( 1309 __file__, "autotest_lib.server.site_server_job", "site_server_job", 1310 base_server_job) 1311 1312 1313 class server_job(site_server_job): 1314 pass 1315