1 # Copyright Martin J. Bligh, Google Inc 2008 2 # Released under the GPL v2 3 4 """ 5 This class allows you to communicate with the frontend to submit jobs etc 6 It is designed for writing more sophisiticated server-side control files that 7 can recursively add and manage other jobs. 8 9 We turn the JSON dictionaries into real objects that are more idiomatic 10 11 For docs, see: 12 http://www.chromium.org/chromium-os/testing/afe-rpc-infrastructure 13 http://docs.djangoproject.com/en/dev/ref/models/querysets/#queryset-api 14 """ 15 16 import getpass 17 import os 18 import re 19 import time 20 import traceback 21 22 import common 23 from autotest_lib.frontend.afe import rpc_client_lib 24 from autotest_lib.client.common_lib import control_data 25 from autotest_lib.client.common_lib import global_config 26 from autotest_lib.client.common_lib import utils 27 from autotest_lib.client.common_lib.cros.graphite import autotest_stats 28 from autotest_lib.tko import db 29 30 31 try: 32 from autotest_lib.server.site_common import site_utils as server_utils 33 except: 34 from autotest_lib.server import utils as server_utils 35 form_ntuples_from_machines = server_utils.form_ntuples_from_machines 36 37 GLOBAL_CONFIG = global_config.global_config 38 DEFAULT_SERVER = 'autotest' 39 40 _tko_timer = autotest_stats.Timer('tko') 41 42 def dump_object(header, obj): 43 """ 44 Standard way to print out the frontend objects (eg job, host, acl, label) 45 in a human-readable fashion for debugging 46 """ 47 result = header + '\n' 48 for key in obj.hash: 49 if key == 'afe' or key == 'hash': 50 continue 51 result += '%20s: %s\n' % (key, obj.hash[key]) 52 return result 53 54 55 class RpcClient(object): 56 """ 57 Abstract RPC class for communicating with the autotest frontend 58 Inherited for both TKO and AFE uses. 59 60 All the constructors go in the afe / tko class. 61 Manipulating methods go in the object classes themselves 62 """ 63 def __init__(self, path, user, server, print_log, debug, reply_debug): 64 """ 65 Create a cached instance of a connection to the frontend 66 67 user: username to connect as 68 server: frontend server to connect to 69 print_log: pring a logging message to stdout on every operation 70 debug: print out all RPC traffic 71 """ 72 if not user and utils.is_in_container(): 73 user = GLOBAL_CONFIG.get_config_value('SSP', 'user', default=None) 74 if not user: 75 user = getpass.getuser() 76 if not server: 77 if 'AUTOTEST_WEB' in os.environ: 78 server = os.environ['AUTOTEST_WEB'] 79 else: 80 server = GLOBAL_CONFIG.get_config_value('SERVER', 'hostname', 81 default=DEFAULT_SERVER) 82 self.server = server 83 self.user = user 84 self.print_log = print_log 85 self.debug = debug 86 self.reply_debug = reply_debug 87 headers = {'AUTHORIZATION': self.user} 88 rpc_server = 'http://' + server + path 89 if debug: 90 print 'SERVER: %s' % rpc_server 91 print 'HEADERS: %s' % headers 92 self.proxy = rpc_client_lib.get_proxy(rpc_server, headers=headers) 93 94 95 def run(self, call, **dargs): 96 """ 97 Make a RPC call to the AFE server 98 """ 99 rpc_call = getattr(self.proxy, call) 100 if self.debug: 101 print 'DEBUG: %s %s' % (call, dargs) 102 try: 103 result = utils.strip_unicode(rpc_call(**dargs)) 104 if self.reply_debug: 105 print result 106 return result 107 except Exception: 108 print 'FAILED RPC CALL: %s %s' % (call, dargs) 109 raise 110 111 112 def log(self, message): 113 if self.print_log: 114 print message 115 116 117 class Planner(RpcClient): 118 def __init__(self, user=None, server=None, print_log=True, debug=False, 119 reply_debug=False): 120 super(Planner, self).__init__(path='/planner/server/rpc/', 121 user=user, 122 server=server, 123 print_log=print_log, 124 debug=debug, 125 reply_debug=reply_debug) 126 127 128 class TKO(RpcClient): 129 def __init__(self, user=None, server=None, print_log=True, debug=False, 130 reply_debug=False): 131 super(TKO, self).__init__(path='/new_tko/server/noauth/rpc/', 132 user=user, 133 server=server, 134 print_log=print_log, 135 debug=debug, 136 reply_debug=reply_debug) 137 self._db = None 138 139 140 @_tko_timer.decorate 141 def get_job_test_statuses_from_db(self, job_id): 142 """Get job test statuses from the database. 143 144 Retrieve a set of fields from a job that reflect the status of each test 145 run within a job. 146 fields retrieved: status, test_name, reason, test_started_time, 147 test_finished_time, afe_job_id, job_owner, hostname. 148 149 @param job_id: The afe job id to look up. 150 @returns a TestStatus object of the resulting information. 151 """ 152 if self._db is None: 153 self._db = db.db() 154 fields = ['status', 'test_name', 'subdir', 'reason', 155 'test_started_time', 'test_finished_time', 'afe_job_id', 156 'job_owner', 'hostname', 'job_tag'] 157 table = 'tko_test_view_2' 158 where = 'job_tag like "%s-%%"' % job_id 159 test_status = [] 160 # Run commit before we query to ensure that we are pulling the latest 161 # results. 162 self._db.commit() 163 for entry in self._db.select(','.join(fields), table, (where, None)): 164 status_dict = {} 165 for key,value in zip(fields, entry): 166 # All callers expect values to be a str object. 167 status_dict[key] = str(value) 168 # id is used by TestStatus to uniquely identify each Test Status 169 # obj. 170 status_dict['id'] = [status_dict['reason'], status_dict['hostname'], 171 status_dict['test_name']] 172 test_status.append(status_dict) 173 174 return [TestStatus(self, e) for e in test_status] 175 176 177 def get_status_counts(self, job, **data): 178 entries = self.run('get_status_counts', 179 group_by=['hostname', 'test_name', 'reason'], 180 job_tag__startswith='%s-' % job, **data) 181 return [TestStatus(self, e) for e in entries['groups']] 182 183 184 class AFE(RpcClient): 185 def __init__(self, user=None, server=None, print_log=True, debug=False, 186 reply_debug=False, job=None): 187 self.job = job 188 super(AFE, self).__init__(path='/afe/server/noauth/rpc/', 189 user=user, 190 server=server, 191 print_log=print_log, 192 debug=debug, 193 reply_debug=reply_debug) 194 195 196 def host_statuses(self, live=None): 197 dead_statuses = ['Repair Failed', 'Repairing'] 198 statuses = self.run('get_static_data')['host_statuses'] 199 if live == True: 200 return list(set(statuses) - set(dead_statuses)) 201 if live == False: 202 return dead_statuses 203 else: 204 return statuses 205 206 207 @staticmethod 208 def _dict_for_host_query(hostnames=(), status=None, label=None): 209 query_args = {} 210 if hostnames: 211 query_args['hostname__in'] = hostnames 212 if status: 213 query_args['status'] = status 214 if label: 215 query_args['labels__name'] = label 216 return query_args 217 218 219 def get_hosts(self, hostnames=(), status=None, label=None, **dargs): 220 query_args = dict(dargs) 221 query_args.update(self._dict_for_host_query(hostnames=hostnames, 222 status=status, 223 label=label)) 224 hosts = self.run('get_hosts', **query_args) 225 return [Host(self, h) for h in hosts] 226 227 228 def get_hostnames(self, status=None, label=None, **dargs): 229 """Like get_hosts() but returns hostnames instead of Host objects.""" 230 # This implementation can be replaced with a more efficient one 231 # that does not query for entire host objects in the future. 232 return [host_obj.hostname for host_obj in 233 self.get_hosts(status=status, label=label, **dargs)] 234 235 236 def reverify_hosts(self, hostnames=(), status=None, label=None): 237 query_args = dict(locked=False, 238 aclgroup__users__login=self.user) 239 query_args.update(self._dict_for_host_query(hostnames=hostnames, 240 status=status, 241 label=label)) 242 return self.run('reverify_hosts', **query_args) 243 244 245 def create_host(self, hostname, **dargs): 246 id = self.run('add_host', hostname=hostname, **dargs) 247 return self.get_hosts(id=id)[0] 248 249 250 def get_host_attribute(self, attr, **dargs): 251 host_attrs = self.run('get_host_attribute', attribute=attr, **dargs) 252 return [HostAttribute(self, a) for a in host_attrs] 253 254 255 def set_host_attribute(self, attr, val, **dargs): 256 self.run('set_host_attribute', attribute=attr, value=val, **dargs) 257 258 259 def get_labels(self, **dargs): 260 labels = self.run('get_labels', **dargs) 261 return [Label(self, l) for l in labels] 262 263 264 def create_label(self, name, **dargs): 265 id = self.run('add_label', name=name, **dargs) 266 return self.get_labels(id=id)[0] 267 268 269 def get_acls(self, **dargs): 270 acls = self.run('get_acl_groups', **dargs) 271 return [Acl(self, a) for a in acls] 272 273 274 def create_acl(self, name, **dargs): 275 id = self.run('add_acl_group', name=name, **dargs) 276 return self.get_acls(id=id)[0] 277 278 279 def get_users(self, **dargs): 280 users = self.run('get_users', **dargs) 281 return [User(self, u) for u in users] 282 283 284 def generate_control_file(self, tests, **dargs): 285 ret = self.run('generate_control_file', tests=tests, **dargs) 286 return ControlFile(self, ret) 287 288 289 def get_jobs(self, summary=False, **dargs): 290 if summary: 291 jobs_data = self.run('get_jobs_summary', **dargs) 292 else: 293 jobs_data = self.run('get_jobs', **dargs) 294 jobs = [] 295 for j in jobs_data: 296 job = Job(self, j) 297 # Set up some extra information defaults 298 job.testname = re.sub('\s.*', '', job.name) # arbitrary default 299 job.platform_results = {} 300 job.platform_reasons = {} 301 jobs.append(job) 302 return jobs 303 304 305 def get_host_queue_entries(self, **data): 306 entries = self.run('get_host_queue_entries', **data) 307 job_statuses = [JobStatus(self, e) for e in entries] 308 309 # Sadly, get_host_queue_entries doesn't return platforms, we have 310 # to get those back from an explicit get_hosts queury, then patch 311 # the new host objects back into the host list. 312 hostnames = [s.host.hostname for s in job_statuses if s.host] 313 host_hash = {} 314 for host in self.get_hosts(hostname__in=hostnames): 315 host_hash[host.hostname] = host 316 for status in job_statuses: 317 if status.host: 318 status.host = host_hash.get(status.host.hostname) 319 # filter job statuses that have either host or meta_host 320 return [status for status in job_statuses if (status.host or 321 status.meta_host)] 322 323 324 def get_special_tasks(self, **data): 325 tasks = self.run('get_special_tasks', **data) 326 return [SpecialTask(self, t) for t in tasks] 327 328 329 def get_host_special_tasks(self, host_id, **data): 330 tasks = self.run('get_host_special_tasks', 331 host_id=host_id, **data) 332 return [SpecialTask(self, t) for t in tasks] 333 334 335 def get_host_status_task(self, host_id, end_time): 336 task = self.run('get_host_status_task', 337 host_id=host_id, end_time=end_time) 338 return SpecialTask(self, task) if task else None 339 340 341 def get_host_diagnosis_interval(self, host_id, end_time, success): 342 return self.run('get_host_diagnosis_interval', 343 host_id=host_id, end_time=end_time, 344 success=success) 345 346 347 def create_job_by_test(self, tests, kernel=None, use_container=False, 348 kernel_cmdline=None, **dargs): 349 """ 350 Given a test name, fetch the appropriate control file from the server 351 and submit it. 352 353 @param kernel: A comma separated list of kernel versions to boot. 354 @param kernel_cmdline: The command line used to boot all kernels listed 355 in the kernel parameter. 356 357 Returns a list of job objects 358 """ 359 assert ('hosts' in dargs or 360 'atomic_group_name' in dargs and 'synch_count' in dargs) 361 if kernel: 362 kernel_list = re.split('[\s,]+', kernel.strip()) 363 kernel_info = [] 364 for version in kernel_list: 365 kernel_dict = {'version': version} 366 if kernel_cmdline is not None: 367 kernel_dict['cmdline'] = kernel_cmdline 368 kernel_info.append(kernel_dict) 369 else: 370 kernel_info = None 371 control_file = self.generate_control_file( 372 tests=tests, kernel=kernel_info, use_container=use_container) 373 if control_file.is_server: 374 dargs['control_type'] = control_data.CONTROL_TYPE_NAMES.SERVER 375 else: 376 dargs['control_type'] = control_data.CONTROL_TYPE_NAMES.CLIENT 377 dargs['dependencies'] = dargs.get('dependencies', []) + \ 378 control_file.dependencies 379 dargs['control_file'] = control_file.control_file 380 if not dargs.get('synch_count', None): 381 dargs['synch_count'] = control_file.synch_count 382 if 'hosts' in dargs and len(dargs['hosts']) < dargs['synch_count']: 383 # will not be able to satisfy this request 384 return None 385 return self.create_job(**dargs) 386 387 388 def create_job(self, control_file, name=' ', priority='Medium', 389 control_type=control_data.CONTROL_TYPE_NAMES.CLIENT, **dargs): 390 id = self.run('create_job', name=name, priority=priority, 391 control_file=control_file, control_type=control_type, **dargs) 392 return self.get_jobs(id=id)[0] 393 394 395 def run_test_suites(self, pairings, kernel, kernel_label=None, 396 priority='Medium', wait=True, poll_interval=10, 397 email_from=None, email_to=None, timeout_mins=10080, 398 max_runtime_mins=10080, kernel_cmdline=None): 399 """ 400 Run a list of test suites on a particular kernel. 401 402 Poll for them to complete, and return whether they worked or not. 403 404 @param pairings: List of MachineTestPairing objects to invoke. 405 @param kernel: Name of the kernel to run. 406 @param kernel_label: Label (string) of the kernel to run such as 407 '<kernel-version> : <config> : <date>' 408 If any pairing object has its job_label attribute set it 409 will override this value for that particular job. 410 @param kernel_cmdline: The command line to boot the kernel(s) with. 411 @param wait: boolean - Wait for the results to come back? 412 @param poll_interval: Interval between polling for job results (in mins) 413 @param email_from: Send notification email upon completion from here. 414 @param email_from: Send notification email upon completion to here. 415 """ 416 jobs = [] 417 for pairing in pairings: 418 try: 419 new_job = self.invoke_test(pairing, kernel, kernel_label, 420 priority, timeout_mins=timeout_mins, 421 kernel_cmdline=kernel_cmdline, 422 max_runtime_mins=max_runtime_mins) 423 if not new_job: 424 continue 425 jobs.append(new_job) 426 except Exception, e: 427 traceback.print_exc() 428 if not wait or not jobs: 429 return 430 tko = TKO() 431 while True: 432 time.sleep(60 * poll_interval) 433 result = self.poll_all_jobs(tko, jobs, email_from, email_to) 434 if result is not None: 435 return result 436 437 438 def result_notify(self, job, email_from, email_to): 439 """ 440 Notify about the result of a job. Will always print, if email data 441 is provided, will send email for it as well. 442 443 job: job object to notify about 444 email_from: send notification email upon completion from here 445 email_from: send notification email upon completion to here 446 """ 447 if job.result == True: 448 subject = 'Testing PASSED: ' 449 else: 450 subject = 'Testing FAILED: ' 451 subject += '%s : %s\n' % (job.name, job.id) 452 text = [] 453 for platform in job.results_platform_map: 454 for status in job.results_platform_map[platform]: 455 if status == 'Total': 456 continue 457 for host in job.results_platform_map[platform][status]: 458 text.append('%20s %10s %10s' % (platform, status, host)) 459 if status == 'Failed': 460 for test_status in job.test_status[host].fail: 461 text.append('(%s, %s) : %s' % \ 462 (host, test_status.test_name, 463 test_status.reason)) 464 text.append('') 465 466 base_url = 'http://' + self.server 467 468 params = ('columns=test', 469 'rows=machine_group', 470 "condition=tag~'%s-%%25'" % job.id, 471 'title=Report') 472 query_string = '&'.join(params) 473 url = '%s/tko/compose_query.cgi?%s' % (base_url, query_string) 474 text.append(url + '\n') 475 url = '%s/afe/#tab_id=view_job&object_id=%s' % (base_url, job.id) 476 text.append(url + '\n') 477 478 body = '\n'.join(text) 479 print '---------------------------------------------------' 480 print 'Subject: ', subject 481 print body 482 print '---------------------------------------------------' 483 if email_from and email_to: 484 print 'Sending email ...' 485 utils.send_email(email_from, email_to, subject, body) 486 print 487 488 489 def print_job_result(self, job): 490 """ 491 Print the result of a single job. 492 job: a job object 493 """ 494 if job.result is None: 495 print 'PENDING', 496 elif job.result == True: 497 print 'PASSED', 498 elif job.result == False: 499 print 'FAILED', 500 elif job.result == "Abort": 501 print 'ABORT', 502 print ' %s : %s' % (job.id, job.name) 503 504 505 def poll_all_jobs(self, tko, jobs, email_from=None, email_to=None): 506 """ 507 Poll all jobs in a list. 508 jobs: list of job objects to poll 509 email_from: send notification email upon completion from here 510 email_from: send notification email upon completion to here 511 512 Returns: 513 a) All complete successfully (return True) 514 b) One or more has failed (return False) 515 c) Cannot tell yet (return None) 516 """ 517 results = [] 518 for job in jobs: 519 if getattr(job, 'result', None) is None: 520 job.result = self.poll_job_results(tko, job) 521 if job.result is not None: 522 self.result_notify(job, email_from, email_to) 523 524 results.append(job.result) 525 self.print_job_result(job) 526 527 if None in results: 528 return None 529 elif False in results or "Abort" in results: 530 return False 531 else: 532 return True 533 534 535 def _included_platform(self, host, platforms): 536 """ 537 See if host's platforms matches any of the patterns in the included 538 platforms list. 539 """ 540 if not platforms: 541 return True # No filtering of platforms 542 for platform in platforms: 543 if re.search(platform, host.platform): 544 return True 545 return False 546 547 548 def invoke_test(self, pairing, kernel, kernel_label, priority='Medium', 549 kernel_cmdline=None, **dargs): 550 """ 551 Given a pairing of a control file to a machine label, find all machines 552 with that label, and submit that control file to them. 553 554 @param kernel_label: Label (string) of the kernel to run such as 555 '<kernel-version> : <config> : <date>' 556 If any pairing object has its job_label attribute set it 557 will override this value for that particular job. 558 559 @returns A list of job objects. 560 """ 561 # The pairing can override the job label. 562 if pairing.job_label: 563 kernel_label = pairing.job_label 564 job_name = '%s : %s' % (pairing.machine_label, kernel_label) 565 hosts = self.get_hosts(multiple_labels=[pairing.machine_label]) 566 platforms = pairing.platforms 567 hosts = [h for h in hosts if self._included_platform(h, platforms)] 568 dead_statuses = self.host_statuses(live=False) 569 host_list = [h.hostname for h in hosts if h.status not in dead_statuses] 570 print 'HOSTS: %s' % host_list 571 if pairing.atomic_group_sched: 572 dargs['synch_count'] = pairing.synch_count 573 dargs['atomic_group_name'] = pairing.machine_label 574 else: 575 dargs['hosts'] = host_list 576 new_job = self.create_job_by_test(name=job_name, 577 dependencies=[pairing.machine_label], 578 tests=[pairing.control_file], 579 priority=priority, 580 kernel=kernel, 581 kernel_cmdline=kernel_cmdline, 582 use_container=pairing.container, 583 **dargs) 584 if new_job: 585 if pairing.testname: 586 new_job.testname = pairing.testname 587 print 'Invoked test %s : %s' % (new_job.id, job_name) 588 return new_job 589 590 591 def _job_test_results(self, tko, job, debug, tests=[]): 592 """ 593 Retrieve test results for a job 594 """ 595 job.test_status = {} 596 try: 597 test_statuses = tko.get_status_counts(job=job.id) 598 except Exception: 599 print "Ignoring exception on poll job; RPC interface is flaky" 600 traceback.print_exc() 601 return 602 603 for test_status in test_statuses: 604 # SERVER_JOB is buggy, and often gives false failures. Ignore it. 605 if test_status.test_name == 'SERVER_JOB': 606 continue 607 # if tests is not empty, restrict list of test_statuses to tests 608 if tests and test_status.test_name not in tests: 609 continue 610 if debug: 611 print test_status 612 hostname = test_status.hostname 613 if hostname not in job.test_status: 614 job.test_status[hostname] = TestResults() 615 job.test_status[hostname].add(test_status) 616 617 618 def _job_results_platform_map(self, job, debug): 619 # Figure out which hosts passed / failed / aborted in a job 620 # Creates a 2-dimensional hash, stored as job.results_platform_map 621 # 1st index - platform type (string) 622 # 2nd index - Status (string) 623 # 'Completed' / 'Failed' / 'Aborted' 624 # Data indexed by this hash is a list of hostnames (text strings) 625 job.results_platform_map = {} 626 try: 627 job_statuses = self.get_host_queue_entries(job=job.id) 628 except Exception: 629 print "Ignoring exception on poll job; RPC interface is flaky" 630 traceback.print_exc() 631 return None 632 633 platform_map = {} 634 job.job_status = {} 635 job.metahost_index = {} 636 for job_status in job_statuses: 637 # This is basically "for each host / metahost in the job" 638 if job_status.host: 639 hostname = job_status.host.hostname 640 else: # This is a metahost 641 metahost = job_status.meta_host 642 index = job.metahost_index.get(metahost, 1) 643 job.metahost_index[metahost] = index + 1 644 hostname = '%s.%s' % (metahost, index) 645 job.job_status[hostname] = job_status.status 646 status = job_status.status 647 # Skip hosts that failed verify or repair: 648 # that's a machine failure, not a job failure 649 if hostname in job.test_status: 650 verify_failed = False 651 for failure in job.test_status[hostname].fail: 652 if (failure.test_name == 'verify' or 653 failure.test_name == 'repair'): 654 verify_failed = True 655 break 656 if verify_failed: 657 continue 658 if hostname in job.test_status and job.test_status[hostname].fail: 659 # If the any tests failed in the job, we want to mark the 660 # job result as failed, overriding the default job status. 661 if status != "Aborted": # except if it's an aborted job 662 status = 'Failed' 663 if job_status.host: 664 platform = job_status.host.platform 665 else: # This is a metahost 666 platform = job_status.meta_host 667 if platform not in platform_map: 668 platform_map[platform] = {'Total' : [hostname]} 669 else: 670 platform_map[platform]['Total'].append(hostname) 671 new_host_list = platform_map[platform].get(status, []) + [hostname] 672 platform_map[platform][status] = new_host_list 673 job.results_platform_map = platform_map 674 675 676 def set_platform_results(self, test_job, platform, result): 677 """ 678 Result must be None, 'FAIL', 'WARN' or 'GOOD' 679 """ 680 if test_job.platform_results[platform] is not None: 681 # We're already done, and results recorded. This can't change later. 682 return 683 test_job.platform_results[platform] = result 684 # Note that self.job refers to the metajob we're IN, not the job 685 # that we're excuting from here. 686 testname = '%s.%s' % (test_job.testname, platform) 687 if self.job: 688 self.job.record(result, None, testname, status='') 689 690 691 def poll_job_results(self, tko, job, enough=1, debug=False): 692 """ 693 Analyse all job results by platform 694 695 params: 696 tko: a TKO object representing the results DB. 697 job: the job to be examined. 698 enough: the acceptable delta between the number of completed 699 tests and the total number of tests. 700 debug: enable debugging output. 701 702 returns: 703 False: if any platform has more than |enough| failures 704 None: if any platform has less than |enough| machines 705 not yet Good. 706 True: if all platforms have at least |enough| machines 707 Good. 708 """ 709 self._job_test_results(tko, job, debug) 710 if job.test_status == {}: 711 return None 712 self._job_results_platform_map(job, debug) 713 714 good_platforms = [] 715 failed_platforms = [] 716 aborted_platforms = [] 717 unknown_platforms = [] 718 platform_map = job.results_platform_map 719 for platform in platform_map: 720 if not job.platform_results.has_key(platform): 721 # record test start, but there's no way to do this right now 722 job.platform_results[platform] = None 723 total = len(platform_map[platform]['Total']) 724 completed = len(platform_map[platform].get('Completed', [])) 725 failed = len(platform_map[platform].get('Failed', [])) 726 aborted = len(platform_map[platform].get('Aborted', [])) 727 728 # We set up what we want to record here, but don't actually do 729 # it yet, until we have a decisive answer for this platform 730 if aborted or failed: 731 bad = aborted + failed 732 if (bad > 1) or (bad * 2 >= total): 733 platform_test_result = 'FAIL' 734 else: 735 platform_test_result = 'WARN' 736 737 if aborted > enough: 738 aborted_platforms.append(platform) 739 self.set_platform_results(job, platform, platform_test_result) 740 elif (failed * 2 >= total) or (failed > enough): 741 failed_platforms.append(platform) 742 self.set_platform_results(job, platform, platform_test_result) 743 elif (completed >= enough) and (completed + enough >= total): 744 good_platforms.append(platform) 745 self.set_platform_results(job, platform, 'GOOD') 746 else: 747 unknown_platforms.append(platform) 748 detail = [] 749 for status in platform_map[platform]: 750 if status == 'Total': 751 continue 752 detail.append('%s=%s' % (status,platform_map[platform][status])) 753 if debug: 754 print '%20s %d/%d %s' % (platform, completed, total, 755 ' '.join(detail)) 756 print 757 758 if len(aborted_platforms) > 0: 759 if debug: 760 print 'Result aborted - platforms: ', 761 print ' '.join(aborted_platforms) 762 return "Abort" 763 if len(failed_platforms) > 0: 764 if debug: 765 print 'Result bad - platforms: ' + ' '.join(failed_platforms) 766 return False 767 if len(unknown_platforms) > 0: 768 if debug: 769 platform_list = ' '.join(unknown_platforms) 770 print 'Result unknown - platforms: ', platform_list 771 return None 772 if debug: 773 platform_list = ' '.join(good_platforms) 774 print 'Result good - all platforms passed: ', platform_list 775 return True 776 777 778 def abort_jobs(self, jobs): 779 """Abort a list of jobs. 780 781 Already completed jobs will not be affected. 782 783 @param jobs: List of job ids to abort. 784 """ 785 for job in jobs: 786 self.run('abort_host_queue_entries', job_id=job) 787 788 789 class TestResults(object): 790 """ 791 Container class used to hold the results of the tests for a job 792 """ 793 def __init__(self): 794 self.good = [] 795 self.fail = [] 796 self.pending = [] 797 798 799 def add(self, result): 800 if result.complete_count > result.pass_count: 801 self.fail.append(result) 802 elif result.incomplete_count > 0: 803 self.pending.append(result) 804 else: 805 self.good.append(result) 806 807 808 class RpcObject(object): 809 """ 810 Generic object used to construct python objects from rpc calls 811 """ 812 def __init__(self, afe, hash): 813 self.afe = afe 814 self.hash = hash 815 self.__dict__.update(hash) 816 817 818 def __str__(self): 819 return dump_object(self.__repr__(), self) 820 821 822 class ControlFile(RpcObject): 823 """ 824 AFE control file object 825 826 Fields: synch_count, dependencies, control_file, is_server 827 """ 828 def __repr__(self): 829 return 'CONTROL FILE: %s' % self.control_file 830 831 832 class Label(RpcObject): 833 """ 834 AFE label object 835 836 Fields: 837 name, invalid, platform, kernel_config, id, only_if_needed 838 """ 839 def __repr__(self): 840 return 'LABEL: %s' % self.name 841 842 843 def add_hosts(self, hosts): 844 return self.afe.run('label_add_hosts', id=self.id, hosts=hosts) 845 846 847 def remove_hosts(self, hosts): 848 return self.afe.run('label_remove_hosts', id=self.id, hosts=hosts) 849 850 851 class Acl(RpcObject): 852 """ 853 AFE acl object 854 855 Fields: 856 users, hosts, description, name, id 857 """ 858 def __repr__(self): 859 return 'ACL: %s' % self.name 860 861 862 def add_hosts(self, hosts): 863 self.afe.log('Adding hosts %s to ACL %s' % (hosts, self.name)) 864 return self.afe.run('acl_group_add_hosts', self.id, hosts) 865 866 867 def remove_hosts(self, hosts): 868 self.afe.log('Removing hosts %s from ACL %s' % (hosts, self.name)) 869 return self.afe.run('acl_group_remove_hosts', self.id, hosts) 870 871 872 def add_users(self, users): 873 self.afe.log('Adding users %s to ACL %s' % (users, self.name)) 874 return self.afe.run('acl_group_add_users', id=self.name, users=users) 875 876 877 class Job(RpcObject): 878 """ 879 AFE job object 880 881 Fields: 882 name, control_file, control_type, synch_count, reboot_before, 883 run_verify, priority, email_list, created_on, dependencies, 884 timeout, owner, reboot_after, id 885 """ 886 def __repr__(self): 887 return 'JOB: %s' % self.id 888 889 890 class JobStatus(RpcObject): 891 """ 892 AFE job_status object 893 894 Fields: 895 status, complete, deleted, meta_host, host, active, execution_subdir, id 896 """ 897 def __init__(self, afe, hash): 898 super(JobStatus, self).__init__(afe, hash) 899 self.job = Job(afe, self.job) 900 if getattr(self, 'host'): 901 self.host = Host(afe, self.host) 902 903 904 def __repr__(self): 905 if self.host and self.host.hostname: 906 hostname = self.host.hostname 907 else: 908 hostname = 'None' 909 return 'JOB STATUS: %s-%s' % (self.job.id, hostname) 910 911 912 class SpecialTask(RpcObject): 913 """ 914 AFE special task object 915 """ 916 def __init__(self, afe, hash): 917 super(SpecialTask, self).__init__(afe, hash) 918 self.host = Host(afe, self.host) 919 920 921 def __repr__(self): 922 return 'SPECIAL TASK: %s' % self.id 923 924 925 class Host(RpcObject): 926 """ 927 AFE host object 928 929 Fields: 930 status, lock_time, locked_by, locked, hostname, invalid, 931 synch_id, labels, platform, protection, dirty, id 932 """ 933 def __repr__(self): 934 return 'HOST OBJECT: %s' % self.hostname 935 936 937 def show(self): 938 labels = list(set(self.labels) - set([self.platform])) 939 print '%-6s %-7s %-7s %-16s %s' % (self.hostname, self.status, 940 self.locked, self.platform, 941 ', '.join(labels)) 942 943 944 def delete(self): 945 return self.afe.run('delete_host', id=self.id) 946 947 948 def modify(self, **dargs): 949 return self.afe.run('modify_host', id=self.id, **dargs) 950 951 952 def get_acls(self): 953 return self.afe.get_acls(hosts__hostname=self.hostname) 954 955 956 def add_acl(self, acl_name): 957 self.afe.log('Adding ACL %s to host %s' % (acl_name, self.hostname)) 958 return self.afe.run('acl_group_add_hosts', id=acl_name, 959 hosts=[self.hostname]) 960 961 962 def remove_acl(self, acl_name): 963 self.afe.log('Removing ACL %s from host %s' % (acl_name, self.hostname)) 964 return self.afe.run('acl_group_remove_hosts', id=acl_name, 965 hosts=[self.hostname]) 966 967 968 def get_labels(self): 969 return self.afe.get_labels(host__hostname__in=[self.hostname]) 970 971 972 def add_labels(self, labels): 973 self.afe.log('Adding labels %s to host %s' % (labels, self.hostname)) 974 return self.afe.run('host_add_labels', id=self.id, labels=labels) 975 976 977 def remove_labels(self, labels): 978 self.afe.log('Removing labels %s from host %s' % (labels,self.hostname)) 979 return self.afe.run('host_remove_labels', id=self.id, labels=labels) 980 981 982 class User(RpcObject): 983 def __repr__(self): 984 return 'USER: %s' % self.login 985 986 987 class TestStatus(RpcObject): 988 """ 989 TKO test status object 990 991 Fields: 992 test_idx, hostname, testname, id 993 complete_count, incomplete_count, group_count, pass_count 994 """ 995 def __repr__(self): 996 return 'TEST STATUS: %s' % self.id 997 998 999 class HostAttribute(RpcObject): 1000 """ 1001 AFE host attribute object 1002 1003 Fields: 1004 id, host, attribute, value 1005 """ 1006 def __repr__(self): 1007 return 'HOST ATTRIBUTE %d' % self.id 1008 1009 1010 class MachineTestPairing(object): 1011 """ 1012 Object representing the pairing of a machine label with a control file 1013 1014 machine_label: use machines from this label 1015 control_file: use this control file (by name in the frontend) 1016 platforms: list of rexeps to filter platforms by. [] => no filtering 1017 job_label: The label (name) to give to the autotest job launched 1018 to run this pairing. '<kernel-version> : <config> : <date>' 1019 """ 1020 def __init__(self, machine_label, control_file, platforms=[], 1021 container=False, atomic_group_sched=False, synch_count=0, 1022 testname=None, job_label=None): 1023 self.machine_label = machine_label 1024 self.control_file = control_file 1025 self.platforms = platforms 1026 self.container = container 1027 self.atomic_group_sched = atomic_group_sched 1028 self.synch_count = synch_count 1029 self.testname = testname 1030 self.job_label = job_label 1031 1032 1033 def __repr__(self): 1034 return '%s %s %s %s' % (self.machine_label, self.control_file, 1035 self.platforms, self.container) 1036