1 # pylint: disable-msg=C0111 2 3 # Copyright (c) 2011 The Chromium OS Authors. All rights reserved. 4 # Use of this source code is governed by a BSD-style license that can be 5 # found in the LICENSE file. 6 """ 7 The main job wrapper for the server side. 8 9 This is the core infrastructure. Derived from the client side job.py 10 11 Copyright Martin J. Bligh, Andy Whitcroft 2007 12 """ 13 14 import errno 15 import fcntl 16 import getpass 17 import itertools 18 import logging 19 import os 20 import pickle 21 import platform 22 import re 23 import select 24 import shutil 25 import sys 26 import tempfile 27 import time 28 import traceback 29 import warnings 30 31 from autotest_lib.client.bin import sysinfo 32 from autotest_lib.client.common_lib import base_job 33 from autotest_lib.client.common_lib import error 34 from autotest_lib.client.common_lib import global_config 35 from autotest_lib.client.common_lib import logging_manager 36 from autotest_lib.client.common_lib import packages 37 from autotest_lib.client.common_lib import utils 38 from autotest_lib.server import profilers 39 from autotest_lib.server import subcommand 40 from autotest_lib.server import test 41 from autotest_lib.server import utils as server_utils 42 from autotest_lib.server.cros.dynamic_suite import frontend_wrappers 43 from autotest_lib.server.hosts import abstract_ssh 44 from autotest_lib.server.hosts import afe_store 45 from autotest_lib.server.hosts import factory as host_factory 46 from autotest_lib.server.hosts import host_info 47 from autotest_lib.tko import db as tko_db 48 from autotest_lib.tko import models as tko_models 49 from autotest_lib.tko import status_lib 50 from autotest_lib.tko import parser_lib 51 from autotest_lib.tko import utils as tko_utils 52 53 54 INCREMENTAL_TKO_PARSING = global_config.global_config.get_config_value( 55 'autoserv', 'incremental_tko_parsing', type=bool, default=False) 56 57 def _control_segment_path(name): 58 """Get the pathname of the named control segment file.""" 59 server_dir = os.path.dirname(os.path.abspath(__file__)) 60 return os.path.join(server_dir, "control_segments", name) 61 62 63 CLIENT_CONTROL_FILENAME = 'control' 64 SERVER_CONTROL_FILENAME = 'control.srv' 65 MACHINES_FILENAME = '.machines' 66 67 CLIENT_WRAPPER_CONTROL_FILE = _control_segment_path('client_wrapper') 68 CRASHDUMPS_CONTROL_FILE = _control_segment_path('crashdumps') 69 CRASHINFO_CONTROL_FILE = _control_segment_path('crashinfo') 70 INSTALL_CONTROL_FILE = _control_segment_path('install') 71 CLEANUP_CONTROL_FILE = _control_segment_path('cleanup') 72 VERIFY_CONTROL_FILE = _control_segment_path('verify') 73 REPAIR_CONTROL_FILE = _control_segment_path('repair') 74 PROVISION_CONTROL_FILE = _control_segment_path('provision') 75 VERIFY_JOB_REPO_URL_CONTROL_FILE = _control_segment_path('verify_job_repo_url') 76 RESET_CONTROL_FILE = _control_segment_path('reset') 77 GET_NETWORK_STATS_CONTROL_FILE = _control_segment_path('get_network_stats') 78 79 # by default provide a stub that generates no site data 80 def _get_site_job_data_dummy(job): 81 return {} 82 83 84 def get_machine_dicts(machine_names, in_lab, host_attributes=None): 85 """Converts a list of machine names to list of dicts. 86 87 @param machine_names: A list of machine names. 88 @param in_lab: A boolean indicating whether we're running in lab. 89 @param host_attributes: Optional list of host attributes to add for each 90 host. 91 @returns: A list of dicts. Each dict has the following keys: 92 'hostname': Name of the machine originally in machine_names (str). 93 'afe_host': A frontend.Host object for the machine, or a stub if 94 in_lab is false. 95 'host_info_store': A host_info.CachingHostInfoStore object to obtain 96 host information. A stub if in_lab is False. 97 """ 98 if host_attributes is None: 99 host_attributes = dict() 100 machine_dict_list = [] 101 for machine in machine_names: 102 afe_host = _create_afe_host(machine, in_lab) 103 afe_host.attributes.update(host_attributes) 104 machine_dict_list.append({ 105 'hostname' : machine, 106 'afe_host' : afe_host, 107 'host_info_store': _create_host_info_store(machine, in_lab), 108 }) 109 return machine_dict_list 110 111 112 class status_indenter(base_job.status_indenter): 113 """Provide a simple integer-backed status indenter.""" 114 def __init__(self): 115 self._indent = 0 116 117 118 @property 119 def indent(self): 120 return self._indent 121 122 123 def increment(self): 124 self._indent += 1 125 126 127 def decrement(self): 128 self._indent -= 1 129 130 131 def get_context(self): 132 """Returns a context object for use by job.get_record_context.""" 133 class context(object): 134 def __init__(self, indenter, indent): 135 self._indenter = indenter 136 self._indent = indent 137 def restore(self): 138 self._indenter._indent = self._indent 139 return context(self, self._indent) 140 141 142 class server_job_record_hook(object): 143 """The job.record hook for server job. Used to inject WARN messages from 144 the console or vlm whenever new logs are written, and to echo any logs 145 to INFO level logging. Implemented as a class so that it can use state to 146 block recursive calls, so that the hook can call job.record itself to 147 log WARN messages. 148 149 Depends on job._read_warnings and job._logger. 150 """ 151 def __init__(self, job): 152 self._job = job 153 self._being_called = False 154 155 156 def __call__(self, entry): 157 """A wrapper around the 'real' record hook, the _hook method, which 158 prevents recursion. This isn't making any effort to be threadsafe, 159 the intent is to outright block infinite recursion via a 160 job.record->_hook->job.record->_hook->job.record... chain.""" 161 if self._being_called: 162 return 163 self._being_called = True 164 try: 165 self._hook(self._job, entry) 166 finally: 167 self._being_called = False 168 169 170 @staticmethod 171 def _hook(job, entry): 172 """The core hook, which can safely call job.record.""" 173 entries = [] 174 # poll all our warning loggers for new warnings 175 for timestamp, msg in job._read_warnings(): 176 warning_entry = base_job.status_log_entry( 177 'WARN', None, None, msg, {}, timestamp=timestamp) 178 entries.append(warning_entry) 179 job.record_entry(warning_entry) 180 # echo rendered versions of all the status logs to info 181 entries.append(entry) 182 for entry in entries: 183 rendered_entry = job._logger.render_entry(entry) 184 logging.info(rendered_entry) 185 job._parse_status(rendered_entry) 186 187 188 class base_server_job(base_job.base_job): 189 """The server-side concrete implementation of base_job. 190 191 Optional properties provided by this implementation: 192 serverdir 193 194 num_tests_run 195 num_tests_failed 196 197 warning_manager 198 warning_loggers 199 """ 200 201 _STATUS_VERSION = 1 202 203 # TODO crbug.com/285395 eliminate ssh_verbosity_flag 204 def __init__(self, control, args, resultdir, label, user, machines, 205 client=False, parse_job='', 206 ssh_user=host_factory.DEFAULT_SSH_USER, 207 ssh_port=host_factory.DEFAULT_SSH_PORT, 208 ssh_pass=host_factory.DEFAULT_SSH_PASS, 209 ssh_verbosity_flag=host_factory.DEFAULT_SSH_VERBOSITY, 210 ssh_options=host_factory.DEFAULT_SSH_OPTIONS, 211 test_retry=0, group_name='', 212 tag='', disable_sysinfo=False, 213 control_filename=SERVER_CONTROL_FILENAME, 214 parent_job_id=None, host_attributes=None, in_lab=False): 215 """ 216 Create a server side job object. 217 218 @param control: The pathname of the control file. 219 @param args: Passed to the control file. 220 @param resultdir: Where to throw the results. 221 @param label: Description of the job. 222 @param user: Username for the job (email address). 223 @param client: True if this is a client-side control file. 224 @param parse_job: string, if supplied it is the job execution tag that 225 the results will be passed through to the TKO parser with. 226 @param ssh_user: The SSH username. [root] 227 @param ssh_port: The SSH port number. [22] 228 @param ssh_pass: The SSH passphrase, if needed. 229 @param ssh_verbosity_flag: The SSH verbosity flag, '-v', '-vv', 230 '-vvv', or an empty string if not needed. 231 @param ssh_options: A string giving additional options that will be 232 included in ssh commands. 233 @param test_retry: The number of times to retry a test if the test did 234 not complete successfully. 235 @param group_name: If supplied, this will be written out as 236 host_group_name in the keyvals file for the parser. 237 @param tag: The job execution tag from the scheduler. [optional] 238 @param disable_sysinfo: Whether we should disable the sysinfo step of 239 tests for a modest shortening of test time. [optional] 240 @param control_filename: The filename where the server control file 241 should be written in the results directory. 242 @param parent_job_id: Job ID of the parent job. Default to None if the 243 job does not have a parent job. 244 @param host_attributes: Dict of host attributes passed into autoserv 245 via the command line. If specified here, these 246 attributes will apply to all machines. 247 @param in_lab: Boolean that indicates if this is running in the lab 248 environment. 249 """ 250 super(base_server_job, self).__init__(resultdir=resultdir, 251 test_retry=test_retry) 252 path = os.path.dirname(__file__) 253 self.test_retry = test_retry 254 self.control = control 255 self._uncollected_log_file = os.path.join(self.resultdir, 256 'uncollected_logs') 257 debugdir = os.path.join(self.resultdir, 'debug') 258 if not os.path.exists(debugdir): 259 os.mkdir(debugdir) 260 261 if user: 262 self.user = user 263 else: 264 self.user = getpass.getuser() 265 266 self.args = args 267 self.label = label 268 self.machines = machines 269 self._client = client 270 self.warning_loggers = set() 271 self.warning_manager = warning_manager() 272 self._ssh_user = ssh_user 273 self._ssh_port = ssh_port 274 self._ssh_pass = ssh_pass 275 self._ssh_verbosity_flag = ssh_verbosity_flag 276 self._ssh_options = ssh_options 277 self.tag = tag 278 self.hosts = set() 279 self.drop_caches = False 280 self.drop_caches_between_iterations = False 281 self._control_filename = control_filename 282 self._disable_sysinfo = disable_sysinfo 283 284 self.logging = logging_manager.get_logging_manager( 285 manage_stdout_and_stderr=True, redirect_fds=True) 286 subcommand.logging_manager_object = self.logging 287 288 self.sysinfo = sysinfo.sysinfo(self.resultdir) 289 self.profilers = profilers.profilers(self) 290 291 job_data = {'label' : label, 'user' : user, 292 'hostname' : ','.join(machines), 293 'drone' : platform.node(), 294 'status_version' : str(self._STATUS_VERSION), 295 'job_started' : str(int(time.time()))} 296 # Save parent job id to keyvals, so parser can retrieve the info and 297 # write to tko_jobs record. 298 if parent_job_id: 299 job_data['parent_job_id'] = parent_job_id 300 if group_name: 301 job_data['host_group_name'] = group_name 302 303 # only write these keyvals out on the first job in a resultdir 304 if 'job_started' not in utils.read_keyval(self.resultdir): 305 job_data.update(get_site_job_data(self)) 306 utils.write_keyval(self.resultdir, job_data) 307 308 self._parse_job = parse_job 309 self._using_parser = (INCREMENTAL_TKO_PARSING and self._parse_job 310 and len(machines) <= 1) 311 self.pkgmgr = packages.PackageManager( 312 self.autodir, run_function_dargs={'timeout':600}) 313 self.num_tests_run = 0 314 self.num_tests_failed = 0 315 316 self._register_subcommand_hooks() 317 318 # set up the status logger 319 self._indenter = status_indenter() 320 self._logger = base_job.status_logger( 321 self, self._indenter, 'status.log', 'status.log', 322 record_hook=server_job_record_hook(self)) 323 324 # Initialize a flag to indicate DUT failure during the test, e.g., 325 # unexpected reboot. 326 self.failed_with_device_error = False 327 328 self.parent_job_id = parent_job_id 329 self.in_lab = in_lab 330 self.machine_dict_list = get_machine_dicts( 331 self.machines, self.in_lab, host_attributes) 332 333 # TODO(jrbarnette) The harness attribute is only relevant to 334 # client jobs, but it's required to be present, or we will fail 335 # server job unit tests. Yes, really. 336 # 337 # TODO(jrbarnette) The utility of the 'harness' attribute even 338 # to client jobs is suspect. Probably, we should remove it. 339 self.harness = None 340 341 342 @classmethod 343 def _find_base_directories(cls): 344 """ 345 Determine locations of autodir, clientdir and serverdir. Assumes 346 that this file is located within serverdir and uses __file__ along 347 with relative paths to resolve the location. 348 """ 349 serverdir = os.path.abspath(os.path.dirname(__file__)) 350 autodir = os.path.normpath(os.path.join(serverdir, '..')) 351 clientdir = os.path.join(autodir, 'client') 352 return autodir, clientdir, serverdir 353 354 355 def _find_resultdir(self, resultdir, *args, **dargs): 356 """ 357 Determine the location of resultdir. For server jobs we expect one to 358 always be explicitly passed in to __init__, so just return that. 359 """ 360 if resultdir: 361 return os.path.normpath(resultdir) 362 else: 363 return None 364 365 366 def _get_status_logger(self): 367 """Return a reference to the status logger.""" 368 return self._logger 369 370 371 @staticmethod 372 def _load_control_file(path): 373 f = open(path) 374 try: 375 control_file = f.read() 376 finally: 377 f.close() 378 return re.sub('\r', '', control_file) 379 380 381 def _register_subcommand_hooks(self): 382 """ 383 Register some hooks into the subcommand modules that allow us 384 to properly clean up self.hosts created in forked subprocesses. 385 """ 386 def on_fork(cmd): 387 self._existing_hosts_on_fork = set(self.hosts) 388 def on_join(cmd): 389 new_hosts = self.hosts - self._existing_hosts_on_fork 390 for host in new_hosts: 391 host.close() 392 subcommand.subcommand.register_fork_hook(on_fork) 393 subcommand.subcommand.register_join_hook(on_join) 394 395 396 def init_parser(self): 397 """ 398 Start the continuous parsing of self.resultdir. This sets up 399 the database connection and inserts the basic job object into 400 the database if necessary. 401 """ 402 if not self._using_parser: 403 return 404 # redirect parser debugging to .parse.log 405 parse_log = os.path.join(self.resultdir, '.parse.log') 406 parse_log = open(parse_log, 'w', 0) 407 tko_utils.redirect_parser_debugging(parse_log) 408 # create a job model object and set up the db 409 self.results_db = tko_db.db(autocommit=True) 410 self.parser = parser_lib.parser(self._STATUS_VERSION) 411 self.job_model = self.parser.make_job(self.resultdir) 412 self.parser.start(self.job_model) 413 # check if a job already exists in the db and insert it if 414 # it does not 415 job_idx = self.results_db.find_job(self._parse_job) 416 if job_idx is None: 417 self.results_db.insert_job(self._parse_job, self.job_model, 418 self.parent_job_id) 419 else: 420 machine_idx = self.results_db.lookup_machine(self.job_model.machine) 421 self.job_model.index = job_idx 422 self.job_model.machine_idx = machine_idx 423 424 425 def cleanup_parser(self): 426 """ 427 This should be called after the server job is finished 428 to carry out any remaining cleanup (e.g. flushing any 429 remaining test results to the results db) 430 """ 431 if not self._using_parser: 432 return 433 final_tests = self.parser.end() 434 for test in final_tests: 435 self.__insert_test(test) 436 self._using_parser = False 437 438 # TODO crbug.com/285395 add a kwargs parameter. 439 def _make_namespace(self): 440 """Create a namespace dictionary to be passed along to control file. 441 442 Creates a namespace argument populated with standard values: 443 machines, job, ssh_user, ssh_port, ssh_pass, ssh_verbosity_flag, 444 and ssh_options. 445 """ 446 namespace = {'machines' : self.machine_dict_list, 447 'job' : self, 448 'ssh_user' : self._ssh_user, 449 'ssh_port' : self._ssh_port, 450 'ssh_pass' : self._ssh_pass, 451 'ssh_verbosity_flag' : self._ssh_verbosity_flag, 452 'ssh_options' : self._ssh_options} 453 return namespace 454 455 456 def cleanup(self, labels): 457 """Cleanup machines. 458 459 @param labels: Comma separated job labels, will be used to 460 determine special task actions. 461 """ 462 if not self.machines: 463 raise error.AutoservError('No machines specified to cleanup') 464 if self.resultdir: 465 os.chdir(self.resultdir) 466 467 namespace = self._make_namespace() 468 namespace.update({'job_labels': labels, 'args': ''}) 469 self._execute_code(CLEANUP_CONTROL_FILE, namespace, protect=False) 470 471 472 def verify(self, labels): 473 """Verify machines are all ssh-able. 474 475 @param labels: Comma separated job labels, will be used to 476 determine special task actions. 477 """ 478 if not self.machines: 479 raise error.AutoservError('No machines specified to verify') 480 if self.resultdir: 481 os.chdir(self.resultdir) 482 483 namespace = self._make_namespace() 484 namespace.update({'job_labels': labels, 'args': ''}) 485 self._execute_code(VERIFY_CONTROL_FILE, namespace, protect=False) 486 487 488 def reset(self, labels): 489 """Reset machines by first cleanup then verify each machine. 490 491 @param labels: Comma separated job labels, will be used to 492 determine special task actions. 493 """ 494 if not self.machines: 495 raise error.AutoservError('No machines specified to reset.') 496 if self.resultdir: 497 os.chdir(self.resultdir) 498 499 namespace = self._make_namespace() 500 namespace.update({'job_labels': labels, 'args': ''}) 501 self._execute_code(RESET_CONTROL_FILE, namespace, protect=False) 502 503 504 def repair(self, labels): 505 """Repair machines. 506 507 @param labels: Comma separated job labels, will be used to 508 determine special task actions. 509 """ 510 if not self.machines: 511 raise error.AutoservError('No machines specified to repair') 512 if self.resultdir: 513 os.chdir(self.resultdir) 514 515 namespace = self._make_namespace() 516 namespace.update({'job_labels': labels, 'args': ''}) 517 self._execute_code(REPAIR_CONTROL_FILE, namespace, protect=False) 518 519 520 def provision(self, labels): 521 """ 522 Provision all hosts to match |labels|. 523 524 @param labels: A comma seperated string of labels to provision the 525 host to. 526 527 """ 528 control = self._load_control_file(PROVISION_CONTROL_FILE) 529 self.run(control=control, job_labels=labels) 530 531 532 def precheck(self): 533 """ 534 perform any additional checks in derived classes. 535 """ 536 pass 537 538 539 def enable_external_logging(self): 540 """ 541 Start or restart external logging mechanism. 542 """ 543 pass 544 545 546 def disable_external_logging(self): 547 """ 548 Pause or stop external logging mechanism. 549 """ 550 pass 551 552 553 def use_external_logging(self): 554 """ 555 Return True if external logging should be used. 556 """ 557 return False 558 559 560 def _make_parallel_wrapper(self, function, machines, log): 561 """Wrap function as appropriate for calling by parallel_simple.""" 562 # machines could be a list of dictionaries, e.g., 563 # [{'host_attributes': {}, 'hostname': '100.96.51.226'}] 564 # The dictionary is generated in base_server_job.__init__, refer to 565 # variable machine_dict_list, then passed in with namespace, see method 566 # base_server_job._make_namespace. 567 # To compare the machinese to self.machines, which is a list of machine 568 # hostname, we need to convert machines back to a list of hostnames. 569 # Note that the order of hostnames doesn't matter, as is_forking will be 570 # True if there are more than one machine. 571 if (machines and isinstance(machines, list) 572 and isinstance(machines[0], dict)): 573 machines = [m['hostname'] for m in machines] 574 is_forking = not (len(machines) == 1 and self.machines == machines) 575 if self._parse_job and is_forking and log: 576 def wrapper(machine): 577 hostname = server_utils.get_hostname_from_machine(machine) 578 self._parse_job += "/" + hostname 579 self._using_parser = INCREMENTAL_TKO_PARSING 580 self.machines = [machine] 581 self.push_execution_context(hostname) 582 os.chdir(self.resultdir) 583 utils.write_keyval(self.resultdir, {"hostname": hostname}) 584 self.init_parser() 585 result = function(machine) 586 self.cleanup_parser() 587 return result 588 elif len(machines) > 1 and log: 589 def wrapper(machine): 590 hostname = server_utils.get_hostname_from_machine(machine) 591 self.push_execution_context(hostname) 592 os.chdir(self.resultdir) 593 machine_data = {'hostname' : hostname, 594 'status_version' : str(self._STATUS_VERSION)} 595 utils.write_keyval(self.resultdir, machine_data) 596 result = function(machine) 597 return result 598 else: 599 wrapper = function 600 return wrapper 601 602 603 def parallel_simple(self, function, machines, log=True, timeout=None, 604 return_results=False): 605 """ 606 Run 'function' using parallel_simple, with an extra wrapper to handle 607 the necessary setup for continuous parsing, if possible. If continuous 608 parsing is already properly initialized then this should just work. 609 610 @param function: A callable to run in parallel given each machine. 611 @param machines: A list of machine names to be passed one per subcommand 612 invocation of function. 613 @param log: If True, output will be written to output in a subdirectory 614 named after each machine. 615 @param timeout: Seconds after which the function call should timeout. 616 @param return_results: If True instead of an AutoServError being raised 617 on any error a list of the results|exceptions from the function 618 called on each arg is returned. [default: False] 619 620 @raises error.AutotestError: If any of the functions failed. 621 """ 622 wrapper = self._make_parallel_wrapper(function, machines, log) 623 return subcommand.parallel_simple(wrapper, machines, 624 log=log, timeout=timeout, 625 return_results=return_results) 626 627 628 def parallel_on_machines(self, function, machines, timeout=None): 629 """ 630 @param function: Called in parallel with one machine as its argument. 631 @param machines: A list of machines to call function(machine) on. 632 @param timeout: Seconds after which the function call should timeout. 633 634 @returns A list of machines on which function(machine) returned 635 without raising an exception. 636 """ 637 results = self.parallel_simple(function, machines, timeout=timeout, 638 return_results=True) 639 success_machines = [] 640 for result, machine in itertools.izip(results, machines): 641 if not isinstance(result, Exception): 642 success_machines.append(machine) 643 return success_machines 644 645 646 def _has_failed_tests(self): 647 """Parse status log for failed tests. 648 649 This checks the current working directory and is intended only for use 650 by the run() method. 651 652 @return boolean 653 """ 654 path = os.getcwd() 655 656 # TODO(ayatane): Copied from tko/parse.py. Needs extensive refactor to 657 # make code reuse plausible. 658 job_keyval = tko_models.job.read_keyval(path) 659 status_version = job_keyval.get("status_version", 0) 660 661 # parse out the job 662 parser = parser_lib.parser(status_version) 663 job = parser.make_job(path) 664 status_log = os.path.join(path, "status.log") 665 if not os.path.exists(status_log): 666 status_log = os.path.join(path, "status") 667 if not os.path.exists(status_log): 668 logging.warning("! Unable to parse job, no status file") 669 return True 670 671 # parse the status logs 672 status_lines = open(status_log).readlines() 673 parser.start(job) 674 tests = parser.end(status_lines) 675 676 # parser.end can return the same object multiple times, so filter out 677 # dups 678 job.tests = [] 679 already_added = set() 680 for test in tests: 681 if test not in already_added: 682 already_added.add(test) 683 job.tests.append(test) 684 685 failed = False 686 for test in job.tests: 687 # The current job is still running and shouldn't count as failed. 688 # The parser will fail to parse the exit status of the job since it 689 # hasn't exited yet (this running right now is the job). 690 failed = failed or (test.status != 'GOOD' 691 and not _is_current_server_job(test)) 692 return failed 693 694 695 def _collect_crashes(self, namespace, collect_crashinfo): 696 """Collect crashes. 697 698 @param namespace: namespace dict. 699 @param collect_crashinfo: whether to collect crashinfo in addition to 700 dumps 701 """ 702 if collect_crashinfo: 703 # includes crashdumps 704 crash_control_file = CRASHINFO_CONTROL_FILE 705 else: 706 crash_control_file = CRASHDUMPS_CONTROL_FILE 707 self._execute_code(crash_control_file, namespace) 708 709 710 _USE_TEMP_DIR = object() 711 def run(self, install_before=False, install_after=False, 712 collect_crashdumps=True, namespace={}, control=None, 713 control_file_dir=None, verify_job_repo_url=False, 714 only_collect_crashinfo=False, skip_crash_collection=False, 715 job_labels='', use_packaging=True): 716 # for a normal job, make sure the uncollected logs file exists 717 # for a crashinfo-only run it should already exist, bail out otherwise 718 created_uncollected_logs = False 719 logging.info("I am PID %s", os.getpid()) 720 if self.resultdir and not os.path.exists(self._uncollected_log_file): 721 if only_collect_crashinfo: 722 # if this is a crashinfo-only run, and there were no existing 723 # uncollected logs, just bail out early 724 logging.info("No existing uncollected logs, " 725 "skipping crashinfo collection") 726 return 727 else: 728 log_file = open(self._uncollected_log_file, "w") 729 pickle.dump([], log_file) 730 log_file.close() 731 created_uncollected_logs = True 732 733 # use a copy so changes don't affect the original dictionary 734 namespace = namespace.copy() 735 machines = self.machines 736 if control is None: 737 if self.control is None: 738 control = '' 739 else: 740 control = self._load_control_file(self.control) 741 if control_file_dir is None: 742 control_file_dir = self.resultdir 743 744 self.aborted = False 745 namespace.update(self._make_namespace()) 746 namespace.update({'args' : self.args, 747 'job_labels' : job_labels}) 748 test_start_time = int(time.time()) 749 750 if self.resultdir: 751 os.chdir(self.resultdir) 752 # touch status.log so that the parser knows a job is running here 753 open(self.get_status_log_path(), 'a').close() 754 self.enable_external_logging() 755 756 collect_crashinfo = True 757 temp_control_file_dir = None 758 try: 759 try: 760 namespace['network_stats_label'] = 'at-start' 761 self._execute_code(GET_NETWORK_STATS_CONTROL_FILE, namespace) 762 if install_before and machines: 763 self._execute_code(INSTALL_CONTROL_FILE, namespace) 764 765 if only_collect_crashinfo: 766 return 767 768 # If the verify_job_repo_url option is set but we're unable 769 # to actually verify that the job_repo_url contains the autotest 770 # package, this job will fail. 771 if verify_job_repo_url: 772 self._execute_code(VERIFY_JOB_REPO_URL_CONTROL_FILE, 773 namespace) 774 else: 775 logging.warning('Not checking if job_repo_url contains ' 776 'autotest packages on %s', machines) 777 778 # determine the dir to write the control files to 779 cfd_specified = (control_file_dir 780 and control_file_dir is not self._USE_TEMP_DIR) 781 if cfd_specified: 782 temp_control_file_dir = None 783 else: 784 temp_control_file_dir = tempfile.mkdtemp( 785 suffix='temp_control_file_dir') 786 control_file_dir = temp_control_file_dir 787 server_control_file = os.path.join(control_file_dir, 788 self._control_filename) 789 client_control_file = os.path.join(control_file_dir, 790 CLIENT_CONTROL_FILENAME) 791 if self._client: 792 namespace['control'] = control 793 utils.open_write_close(client_control_file, control) 794 shutil.copyfile(CLIENT_WRAPPER_CONTROL_FILE, 795 server_control_file) 796 else: 797 utils.open_write_close(server_control_file, control) 798 799 logging.info("Processing control file") 800 namespace['use_packaging'] = use_packaging 801 self._execute_code(server_control_file, namespace) 802 logging.info("Finished processing control file") 803 804 # If no device error occured, no need to collect crashinfo. 805 collect_crashinfo = self.failed_with_device_error 806 except Exception, e: 807 try: 808 logging.exception( 809 'Exception escaped control file, job aborting:') 810 reason = re.sub(base_job.status_log_entry.BAD_CHAR_REGEX, 811 ' ', str(e)) 812 self.record('INFO', None, None, str(e), 813 {'job_abort_reason': reason}) 814 except: 815 pass # don't let logging exceptions here interfere 816 raise 817 finally: 818 if temp_control_file_dir: 819 # Clean up temp directory used for copies of the control files 820 try: 821 shutil.rmtree(temp_control_file_dir) 822 except Exception, e: 823 logging.warning('Could not remove temp directory %s: %s', 824 temp_control_file_dir, e) 825 826 if machines and (collect_crashdumps or collect_crashinfo): 827 if skip_crash_collection: 828 logging.info('Skipping crash dump/info collection ' 829 'as requested.') 830 else: 831 namespace['test_start_time'] = test_start_time 832 # Remove crash files for passing tests. 833 # TODO(ayatane): Tests that create crash files should be 834 # reported. 835 namespace['has_failed_tests'] = self._has_failed_tests() 836 self._collect_crashes(namespace, collect_crashinfo) 837 self.disable_external_logging() 838 if self._uncollected_log_file and created_uncollected_logs: 839 os.remove(self._uncollected_log_file) 840 if install_after and machines: 841 self._execute_code(INSTALL_CONTROL_FILE, namespace) 842 namespace['network_stats_label'] = 'at-end' 843 self._execute_code(GET_NETWORK_STATS_CONTROL_FILE, namespace) 844 845 846 def run_test(self, url, *args, **dargs): 847 """ 848 Summon a test object and run it. 849 850 tag 851 tag to add to testname 852 url 853 url of the test to run 854 """ 855 if self._disable_sysinfo: 856 dargs['disable_sysinfo'] = True 857 858 group, testname = self.pkgmgr.get_package_name(url, 'test') 859 testname, subdir, tag = self._build_tagged_test_name(testname, dargs) 860 outputdir = self._make_test_outputdir(subdir) 861 862 def group_func(): 863 try: 864 test.runtest(self, url, tag, args, dargs) 865 except error.TestBaseException, e: 866 self.record(e.exit_status, subdir, testname, str(e)) 867 raise 868 except Exception, e: 869 info = str(e) + "\n" + traceback.format_exc() 870 self.record('FAIL', subdir, testname, info) 871 raise 872 else: 873 self.record('GOOD', subdir, testname, 'completed successfully') 874 875 result, exc_info = self._run_group(testname, subdir, group_func) 876 if exc_info and isinstance(exc_info[1], error.TestBaseException): 877 return False 878 elif exc_info: 879 raise exc_info[0], exc_info[1], exc_info[2] 880 else: 881 return True 882 883 884 def _run_group(self, name, subdir, function, *args, **dargs): 885 """\ 886 Underlying method for running something inside of a group. 887 """ 888 result, exc_info = None, None 889 try: 890 self.record('START', subdir, name) 891 result = function(*args, **dargs) 892 except error.TestBaseException, e: 893 self.record("END %s" % e.exit_status, subdir, name) 894 exc_info = sys.exc_info() 895 except Exception, e: 896 err_msg = str(e) + '\n' 897 err_msg += traceback.format_exc() 898 self.record('END ABORT', subdir, name, err_msg) 899 raise error.JobError(name + ' failed\n' + traceback.format_exc()) 900 else: 901 self.record('END GOOD', subdir, name) 902 903 return result, exc_info 904 905 906 def run_group(self, function, *args, **dargs): 907 """\ 908 function: 909 subroutine to run 910 *args: 911 arguments for the function 912 """ 913 914 name = function.__name__ 915 916 # Allow the tag for the group to be specified. 917 tag = dargs.pop('tag', None) 918 if tag: 919 name = tag 920 921 return self._run_group(name, None, function, *args, **dargs)[0] 922 923 924 def run_op(self, op, op_func, get_kernel_func): 925 """\ 926 A specialization of run_group meant specifically for handling 927 management operation. Includes support for capturing the kernel version 928 after the operation. 929 930 Args: 931 op: name of the operation. 932 op_func: a function that carries out the operation (reboot, suspend) 933 get_kernel_func: a function that returns a string 934 representing the kernel version. 935 """ 936 try: 937 self.record('START', None, op) 938 op_func() 939 except Exception, e: 940 err_msg = str(e) + '\n' + traceback.format_exc() 941 self.record('END FAIL', None, op, err_msg) 942 raise 943 else: 944 kernel = get_kernel_func() 945 self.record('END GOOD', None, op, 946 optional_fields={"kernel": kernel}) 947 948 949 def run_control(self, path): 950 """Execute a control file found at path (relative to the autotest 951 path). Intended for executing a control file within a control file, 952 not for running the top-level job control file.""" 953 path = os.path.join(self.autodir, path) 954 control_file = self._load_control_file(path) 955 self.run(control=control_file, control_file_dir=self._USE_TEMP_DIR) 956 957 958 def add_sysinfo_command(self, command, logfile=None, on_every_test=False): 959 self._add_sysinfo_loggable(sysinfo.command(command, logf=logfile), 960 on_every_test) 961 962 963 def add_sysinfo_logfile(self, file, on_every_test=False): 964 self._add_sysinfo_loggable(sysinfo.logfile(file), on_every_test) 965 966 967 def _add_sysinfo_loggable(self, loggable, on_every_test): 968 if on_every_test: 969 self.sysinfo.test_loggables.add(loggable) 970 else: 971 self.sysinfo.boot_loggables.add(loggable) 972 973 974 def _read_warnings(self): 975 """Poll all the warning loggers and extract any new warnings that have 976 been logged. If the warnings belong to a category that is currently 977 disabled, this method will discard them and they will no longer be 978 retrievable. 979 980 Returns a list of (timestamp, message) tuples, where timestamp is an 981 integer epoch timestamp.""" 982 warnings = [] 983 while True: 984 # pull in a line of output from every logger that has 985 # output ready to be read 986 loggers, _, _ = select.select(self.warning_loggers, [], [], 0) 987 closed_loggers = set() 988 for logger in loggers: 989 line = logger.readline() 990 # record any broken pipes (aka line == empty) 991 if len(line) == 0: 992 closed_loggers.add(logger) 993 continue 994 # parse out the warning 995 timestamp, msgtype, msg = line.split('\t', 2) 996 timestamp = int(timestamp) 997 # if the warning is valid, add it to the results 998 if self.warning_manager.is_valid(timestamp, msgtype): 999 warnings.append((timestamp, msg.strip())) 1000 1001 # stop listening to loggers that are closed 1002 self.warning_loggers -= closed_loggers 1003 1004 # stop if none of the loggers have any output left 1005 if not loggers: 1006 break 1007 1008 # sort into timestamp order 1009 warnings.sort() 1010 return warnings 1011 1012 1013 def _unique_subdirectory(self, base_subdirectory_name): 1014 """Compute a unique results subdirectory based on the given name. 1015 1016 Appends base_subdirectory_name with a number as necessary to find a 1017 directory name that doesn't already exist. 1018 """ 1019 subdirectory = base_subdirectory_name 1020 counter = 1 1021 while os.path.exists(os.path.join(self.resultdir, subdirectory)): 1022 subdirectory = base_subdirectory_name + '.' + str(counter) 1023 counter += 1 1024 return subdirectory 1025 1026 1027 def get_record_context(self): 1028 """Returns an object representing the current job.record context. 1029 1030 The object returned is an opaque object with a 0-arg restore method 1031 which can be called to restore the job.record context (i.e. indentation) 1032 to the current level. The intention is that it should be used when 1033 something external which generate job.record calls (e.g. an autotest 1034 client) can fail catastrophically and the server job record state 1035 needs to be reset to its original "known good" state. 1036 1037 @return: A context object with a 0-arg restore() method.""" 1038 return self._indenter.get_context() 1039 1040 1041 def record_summary(self, status_code, test_name, reason='', attributes=None, 1042 distinguishing_attributes=(), child_test_ids=None): 1043 """Record a summary test result. 1044 1045 @param status_code: status code string, see 1046 common_lib.log.is_valid_status() 1047 @param test_name: name of the test 1048 @param reason: (optional) string providing detailed reason for test 1049 outcome 1050 @param attributes: (optional) dict of string keyvals to associate with 1051 this result 1052 @param distinguishing_attributes: (optional) list of attribute names 1053 that should be used to distinguish identically-named test 1054 results. These attributes should be present in the attributes 1055 parameter. This is used to generate user-friendly subdirectory 1056 names. 1057 @param child_test_ids: (optional) list of test indices for test results 1058 used in generating this result. 1059 """ 1060 subdirectory_name_parts = [test_name] 1061 for attribute in distinguishing_attributes: 1062 assert attributes 1063 assert attribute in attributes, '%s not in %s' % (attribute, 1064 attributes) 1065 subdirectory_name_parts.append(attributes[attribute]) 1066 base_subdirectory_name = '.'.join(subdirectory_name_parts) 1067 1068 subdirectory = self._unique_subdirectory(base_subdirectory_name) 1069 subdirectory_path = os.path.join(self.resultdir, subdirectory) 1070 os.mkdir(subdirectory_path) 1071 1072 self.record(status_code, subdirectory, test_name, 1073 status=reason, optional_fields={'is_summary': True}) 1074 1075 if attributes: 1076 utils.write_keyval(subdirectory_path, attributes) 1077 1078 if child_test_ids: 1079 ids_string = ','.join(str(test_id) for test_id in child_test_ids) 1080 summary_data = {'child_test_ids': ids_string} 1081 utils.write_keyval(os.path.join(subdirectory_path, 'summary_data'), 1082 summary_data) 1083 1084 1085 def disable_warnings(self, warning_type): 1086 self.warning_manager.disable_warnings(warning_type) 1087 self.record("INFO", None, None, 1088 "disabling %s warnings" % warning_type, 1089 {"warnings.disable": warning_type}) 1090 1091 1092 def enable_warnings(self, warning_type): 1093 self.warning_manager.enable_warnings(warning_type) 1094 self.record("INFO", None, None, 1095 "enabling %s warnings" % warning_type, 1096 {"warnings.enable": warning_type}) 1097 1098 1099 def get_status_log_path(self, subdir=None): 1100 """Return the path to the job status log. 1101 1102 @param subdir - Optional paramter indicating that you want the path 1103 to a subdirectory status log. 1104 1105 @returns The path where the status log should be. 1106 """ 1107 if self.resultdir: 1108 if subdir: 1109 return os.path.join(self.resultdir, subdir, "status.log") 1110 else: 1111 return os.path.join(self.resultdir, "status.log") 1112 else: 1113 return None 1114 1115 1116 def _update_uncollected_logs_list(self, update_func): 1117 """Updates the uncollected logs list in a multi-process safe manner. 1118 1119 @param update_func - a function that updates the list of uncollected 1120 logs. Should take one parameter, the list to be updated. 1121 """ 1122 # Skip log collection if file _uncollected_log_file does not exist. 1123 if not (self._uncollected_log_file and 1124 os.path.exists(self._uncollected_log_file)): 1125 return 1126 if self._uncollected_log_file: 1127 log_file = open(self._uncollected_log_file, "r+") 1128 fcntl.flock(log_file, fcntl.LOCK_EX) 1129 try: 1130 uncollected_logs = pickle.load(log_file) 1131 update_func(uncollected_logs) 1132 log_file.seek(0) 1133 log_file.truncate() 1134 pickle.dump(uncollected_logs, log_file) 1135 log_file.flush() 1136 finally: 1137 fcntl.flock(log_file, fcntl.LOCK_UN) 1138 log_file.close() 1139 1140 1141 def add_client_log(self, hostname, remote_path, local_path): 1142 """Adds a new set of client logs to the list of uncollected logs, 1143 to allow for future log recovery. 1144 1145 @param host - the hostname of the machine holding the logs 1146 @param remote_path - the directory on the remote machine holding logs 1147 @param local_path - the local directory to copy the logs into 1148 """ 1149 def update_func(logs_list): 1150 logs_list.append((hostname, remote_path, local_path)) 1151 self._update_uncollected_logs_list(update_func) 1152 1153 1154 def remove_client_log(self, hostname, remote_path, local_path): 1155 """Removes a set of client logs from the list of uncollected logs, 1156 to allow for future log recovery. 1157 1158 @param host - the hostname of the machine holding the logs 1159 @param remote_path - the directory on the remote machine holding logs 1160 @param local_path - the local directory to copy the logs into 1161 """ 1162 def update_func(logs_list): 1163 logs_list.remove((hostname, remote_path, local_path)) 1164 self._update_uncollected_logs_list(update_func) 1165 1166 1167 def get_client_logs(self): 1168 """Retrieves the list of uncollected logs, if it exists. 1169 1170 @returns A list of (host, remote_path, local_path) tuples. Returns 1171 an empty list if no uncollected logs file exists. 1172 """ 1173 log_exists = (self._uncollected_log_file and 1174 os.path.exists(self._uncollected_log_file)) 1175 if log_exists: 1176 return pickle.load(open(self._uncollected_log_file)) 1177 else: 1178 return [] 1179 1180 1181 def _fill_server_control_namespace(self, namespace, protect=True): 1182 """ 1183 Prepare a namespace to be used when executing server control files. 1184 1185 This sets up the control file API by importing modules and making them 1186 available under the appropriate names within namespace. 1187 1188 For use by _execute_code(). 1189 1190 Args: 1191 namespace: The namespace dictionary to fill in. 1192 protect: Boolean. If True (the default) any operation that would 1193 clobber an existing entry in namespace will cause an error. 1194 Raises: 1195 error.AutoservError: When a name would be clobbered by import. 1196 """ 1197 def _import_names(module_name, names=()): 1198 """ 1199 Import a module and assign named attributes into namespace. 1200 1201 Args: 1202 module_name: The string module name. 1203 names: A limiting list of names to import from module_name. If 1204 empty (the default), all names are imported from the module 1205 similar to a "from foo.bar import *" statement. 1206 Raises: 1207 error.AutoservError: When a name being imported would clobber 1208 a name already in namespace. 1209 """ 1210 module = __import__(module_name, {}, {}, names) 1211 1212 # No names supplied? Import * from the lowest level module. 1213 # (Ugh, why do I have to implement this part myself?) 1214 if not names: 1215 for submodule_name in module_name.split('.')[1:]: 1216 module = getattr(module, submodule_name) 1217 if hasattr(module, '__all__'): 1218 names = getattr(module, '__all__') 1219 else: 1220 names = dir(module) 1221 1222 # Install each name into namespace, checking to make sure it 1223 # doesn't override anything that already exists. 1224 for name in names: 1225 # Check for conflicts to help prevent future problems. 1226 if name in namespace and protect: 1227 if namespace[name] is not getattr(module, name): 1228 raise error.AutoservError('importing name ' 1229 '%s from %s %r would override %r' % 1230 (name, module_name, getattr(module, name), 1231 namespace[name])) 1232 else: 1233 # Encourage cleanliness and the use of __all__ for a 1234 # more concrete API with less surprises on '*' imports. 1235 warnings.warn('%s (%r) being imported from %s for use ' 1236 'in server control files is not the ' 1237 'first occurrence of that import.' % 1238 (name, namespace[name], module_name)) 1239 1240 namespace[name] = getattr(module, name) 1241 1242 1243 # This is the equivalent of prepending a bunch of import statements to 1244 # the front of the control script. 1245 namespace.update(os=os, sys=sys, logging=logging) 1246 _import_names('autotest_lib.server', 1247 ('hosts', 'autotest', 'standalone_profiler')) 1248 _import_names('autotest_lib.server.subcommand', 1249 ('parallel', 'parallel_simple', 'subcommand')) 1250 _import_names('autotest_lib.server.utils', 1251 ('run', 'get_tmp_dir', 'sh_escape', 'parse_machine')) 1252 _import_names('autotest_lib.client.common_lib.error') 1253 _import_names('autotest_lib.client.common_lib.barrier', ('barrier',)) 1254 1255 # Inject ourself as the job object into other classes within the API. 1256 # (Yuck, this injection is a gross thing be part of a public API. -gps) 1257 # 1258 # XXX Base & SiteAutotest do not appear to use .job. Who does? 1259 namespace['autotest'].Autotest.job = self 1260 # server.hosts.base_classes.Host uses .job. 1261 namespace['hosts'].Host.job = self 1262 namespace['hosts'].TestBed.job = self 1263 namespace['hosts'].factory.ssh_user = self._ssh_user 1264 namespace['hosts'].factory.ssh_port = self._ssh_port 1265 namespace['hosts'].factory.ssh_pass = self._ssh_pass 1266 namespace['hosts'].factory.ssh_verbosity_flag = ( 1267 self._ssh_verbosity_flag) 1268 namespace['hosts'].factory.ssh_options = self._ssh_options 1269 1270 1271 def _execute_code(self, code_file, namespace, protect=True): 1272 """ 1273 Execute code using a copy of namespace as a server control script. 1274 1275 Unless protect_namespace is explicitly set to False, the dict will not 1276 be modified. 1277 1278 Args: 1279 code_file: The filename of the control file to execute. 1280 namespace: A dict containing names to make available during execution. 1281 protect: Boolean. If True (the default) a copy of the namespace dict 1282 is used during execution to prevent the code from modifying its 1283 contents outside of this function. If False the raw dict is 1284 passed in and modifications will be allowed. 1285 """ 1286 if protect: 1287 namespace = namespace.copy() 1288 self._fill_server_control_namespace(namespace, protect=protect) 1289 # TODO: Simplify and get rid of the special cases for only 1 machine. 1290 if len(self.machines) > 1: 1291 machines_text = '\n'.join(self.machines) + '\n' 1292 # Only rewrite the file if it does not match our machine list. 1293 try: 1294 machines_f = open(MACHINES_FILENAME, 'r') 1295 existing_machines_text = machines_f.read() 1296 machines_f.close() 1297 except EnvironmentError: 1298 existing_machines_text = None 1299 if machines_text != existing_machines_text: 1300 utils.open_write_close(MACHINES_FILENAME, machines_text) 1301 execfile(code_file, namespace, namespace) 1302 1303 1304 def _parse_status(self, new_line): 1305 if not self._using_parser: 1306 return 1307 new_tests = self.parser.process_lines([new_line]) 1308 for test in new_tests: 1309 self.__insert_test(test) 1310 1311 1312 def __insert_test(self, test): 1313 """ 1314 An internal method to insert a new test result into the 1315 database. This method will not raise an exception, even if an 1316 error occurs during the insert, to avoid failing a test 1317 simply because of unexpected database issues.""" 1318 self.num_tests_run += 1 1319 if status_lib.is_worse_than_or_equal_to(test.status, 'FAIL'): 1320 self.num_tests_failed += 1 1321 try: 1322 self.results_db.insert_test(self.job_model, test) 1323 except Exception: 1324 msg = ("WARNING: An unexpected error occured while " 1325 "inserting test results into the database. " 1326 "Ignoring error.\n" + traceback.format_exc()) 1327 print >> sys.stderr, msg 1328 1329 1330 def preprocess_client_state(self): 1331 """ 1332 Produce a state file for initializing the state of a client job. 1333 1334 Creates a new client state file with all the current server state, as 1335 well as some pre-set client state. 1336 1337 @returns The path of the file the state was written into. 1338 """ 1339 # initialize the sysinfo state 1340 self._state.set('client', 'sysinfo', self.sysinfo.serialize()) 1341 1342 # dump the state out to a tempfile 1343 fd, file_path = tempfile.mkstemp(dir=self.tmpdir) 1344 os.close(fd) 1345 1346 # write_to_file doesn't need locking, we exclusively own file_path 1347 self._state.write_to_file(file_path) 1348 return file_path 1349 1350 1351 def postprocess_client_state(self, state_path): 1352 """ 1353 Update the state of this job with the state from a client job. 1354 1355 Updates the state of the server side of a job with the final state 1356 of a client job that was run. Updates the non-client-specific state, 1357 pulls in some specific bits from the client-specific state, and then 1358 discards the rest. Removes the state file afterwards 1359 1360 @param state_file A path to the state file from the client. 1361 """ 1362 # update the on-disk state 1363 try: 1364 self._state.read_from_file(state_path) 1365 os.remove(state_path) 1366 except OSError, e: 1367 # ignore file-not-found errors 1368 if e.errno != errno.ENOENT: 1369 raise 1370 else: 1371 logging.debug('Client state file %s not found', state_path) 1372 1373 # update the sysinfo state 1374 if self._state.has('client', 'sysinfo'): 1375 self.sysinfo.deserialize(self._state.get('client', 'sysinfo')) 1376 1377 # drop all the client-specific state 1378 self._state.discard_namespace('client') 1379 1380 1381 def clear_all_known_hosts(self): 1382 """Clears known hosts files for all AbstractSSHHosts.""" 1383 for host in self.hosts: 1384 if isinstance(host, abstract_ssh.AbstractSSHHost): 1385 host.clear_known_hosts() 1386 1387 1388 class warning_manager(object): 1389 """Class for controlling warning logs. Manages the enabling and disabling 1390 of warnings.""" 1391 def __init__(self): 1392 # a map of warning types to a list of disabled time intervals 1393 self.disabled_warnings = {} 1394 1395 1396 def is_valid(self, timestamp, warning_type): 1397 """Indicates if a warning (based on the time it occured and its type) 1398 is a valid warning. A warning is considered "invalid" if this type of 1399 warning was marked as "disabled" at the time the warning occured.""" 1400 disabled_intervals = self.disabled_warnings.get(warning_type, []) 1401 for start, end in disabled_intervals: 1402 if timestamp >= start and (end is None or timestamp < end): 1403 return False 1404 return True 1405 1406 1407 def disable_warnings(self, warning_type, current_time_func=time.time): 1408 """As of now, disables all further warnings of this type.""" 1409 intervals = self.disabled_warnings.setdefault(warning_type, []) 1410 if not intervals or intervals[-1][1] is not None: 1411 intervals.append((int(current_time_func()), None)) 1412 1413 1414 def enable_warnings(self, warning_type, current_time_func=time.time): 1415 """As of now, enables all further warnings of this type.""" 1416 intervals = self.disabled_warnings.get(warning_type, []) 1417 if intervals and intervals[-1][1] is None: 1418 intervals[-1] = (intervals[-1][0], int(current_time_func())) 1419 1420 1421 def _is_current_server_job(test): 1422 """Return True if parsed test is the currently running job. 1423 1424 @param test: test instance from tko parser. 1425 """ 1426 return test.testname == 'SERVER_JOB' 1427 1428 1429 def _create_afe_host(hostname, in_lab): 1430 """Create a real or stub frontend.Host object. 1431 1432 @param hostname: Name of the host for which we want the Host object. 1433 @param in_lab: (bool) whether we have access to the AFE. 1434 @returns: An object of type frontend.AFE 1435 """ 1436 if not in_lab: 1437 return server_utils.EmptyAFEHost() 1438 1439 afe = frontend_wrappers.RetryingAFE(timeout_min=5, delay_sec=10) 1440 hosts = afe.get_hosts(hostname=hostname) 1441 if not hosts: 1442 raise error.AutoservError('No hosts named %s found' % hostname) 1443 1444 return hosts[0] 1445 1446 1447 def _create_host_info_store(hostname, in_lab): 1448 """Create a real or stub afe_store.AfeStore object. 1449 1450 @param hostname: Name of the host for which we want the store. 1451 @param in_lab: (bool) whether we have access to the AFE. 1452 @returns: An object of type afe_store.AfeStore 1453 """ 1454 if not in_lab: 1455 return host_info.InMemoryHostInfoStore() 1456 1457 host_info_store = afe_store.AfeStore(hostname) 1458 try: 1459 host_info_store.get(force_refresh=True) 1460 except host_info.StoreError: 1461 raise error.AutoservError('Could not obtain HostInfo for hostname %s' % 1462 hostname) 1463 return host_info_store 1464 1465 1466 # load up site-specific code for generating site-specific job data 1467 get_site_job_data = utils.import_site_function(__file__, 1468 "autotest_lib.server.site_server_job", "get_site_job_data", 1469 _get_site_job_data_dummy) 1470 1471 1472 site_server_job = utils.import_site_class( 1473 __file__, "autotest_lib.server.site_server_job", "site_server_job", 1474 base_server_job) 1475 1476 1477 class server_job(site_server_job): 1478 pass 1479