Home | History | Annotate | Download | only in server
      1 # Copyright Martin J. Bligh, Google Inc 2008
      2 # Released under the GPL v2
      3 
      4 """
      5 This class allows you to communicate with the frontend to submit jobs etc
It is designed for writing more sophisticated server-side control files that
      7 can recursively add and manage other jobs.
      8 
      9 We turn the JSON dictionaries into real objects that are more idiomatic
     10 
     11 For docs, see:
     12     http://www.chromium.org/chromium-os/testing/afe-rpc-infrastructure
     13     http://docs.djangoproject.com/en/dev/ref/models/querysets/#queryset-api
     14 """
     15 
     16 import getpass
     17 import os
     18 import re
     19 import time
     20 import traceback
     21 
     22 import common
     23 from autotest_lib.frontend.afe import rpc_client_lib
     24 from autotest_lib.client.common_lib import control_data
     25 from autotest_lib.client.common_lib import global_config
     26 from autotest_lib.client.common_lib import utils
     27 from autotest_lib.client.common_lib.cros.graphite import autotest_stats
     28 from autotest_lib.tko import db
     29 
     30 
     31 try:
     32     from autotest_lib.server.site_common import site_utils as server_utils
     33 except:
     34     from autotest_lib.server import utils as server_utils
     35 form_ntuples_from_machines = server_utils.form_ntuples_from_machines
     36 
GLOBAL_CONFIG = global_config.global_config
# Fallback frontend hostname used by RpcClient.__init__ when neither
# $AUTOTEST_WEB nor the global config provides one.
DEFAULT_SERVER = 'autotest'

# Timer used to report timing of TKO database queries to graphite stats.
_tko_timer = autotest_stats.Timer('tko')
     41 
     42 def dump_object(header, obj):
     43     """
     44     Standard way to print out the frontend objects (eg job, host, acl, label)
     45     in a human-readable fashion for debugging
     46     """
     47     result = header + '\n'
     48     for key in obj.hash:
     49         if key == 'afe' or key == 'hash':
     50             continue
     51         result += '%20s: %s\n' % (key, obj.hash[key])
     52     return result
     53 
     54 
     55 class RpcClient(object):
     56     """
     57     Abstract RPC class for communicating with the autotest frontend
     58     Inherited for both TKO and AFE uses.
     59 
     60     All the constructors go in the afe / tko class.
     61     Manipulating methods go in the object classes themselves
     62     """
     63     def __init__(self, path, user, server, print_log, debug, reply_debug):
     64         """
     65         Create a cached instance of a connection to the frontend
     66 
     67             user: username to connect as
     68             server: frontend server to connect to
     69             print_log: pring a logging message to stdout on every operation
     70             debug: print out all RPC traffic
     71         """
     72         if not user and utils.is_in_container():
     73             user = GLOBAL_CONFIG.get_config_value('SSP', 'user', default=None)
     74         if not user:
     75             user = getpass.getuser()
     76         if not server:
     77             if 'AUTOTEST_WEB' in os.environ:
     78                 server = os.environ['AUTOTEST_WEB']
     79             else:
     80                 server = GLOBAL_CONFIG.get_config_value('SERVER', 'hostname',
     81                                                         default=DEFAULT_SERVER)
     82         self.server = server
     83         self.user = user
     84         self.print_log = print_log
     85         self.debug = debug
     86         self.reply_debug = reply_debug
     87         headers = {'AUTHORIZATION': self.user}
     88         rpc_server = 'http://' + server + path
     89         if debug:
     90             print 'SERVER: %s' % rpc_server
     91             print 'HEADERS: %s' % headers
     92         self.proxy = rpc_client_lib.get_proxy(rpc_server, headers=headers)
     93 
     94 
     95     def run(self, call, **dargs):
     96         """
     97         Make a RPC call to the AFE server
     98         """
     99         rpc_call = getattr(self.proxy, call)
    100         if self.debug:
    101             print 'DEBUG: %s %s' % (call, dargs)
    102         try:
    103             result = utils.strip_unicode(rpc_call(**dargs))
    104             if self.reply_debug:
    105                 print result
    106             return result
    107         except Exception:
    108             print 'FAILED RPC CALL: %s %s' % (call, dargs)
    109             raise
    110 
    111 
    112     def log(self, message):
    113         if self.print_log:
    114             print message
    115 
    116 
    117 class Planner(RpcClient):
    118     def __init__(self, user=None, server=None, print_log=True, debug=False,
    119                  reply_debug=False):
    120         super(Planner, self).__init__(path='/planner/server/rpc/',
    121                                       user=user,
    122                                       server=server,
    123                                       print_log=print_log,
    124                                       debug=debug,
    125                                       reply_debug=reply_debug)
    126 
    127 
    128 class TKO(RpcClient):
    129     def __init__(self, user=None, server=None, print_log=True, debug=False,
    130                  reply_debug=False):
    131         super(TKO, self).__init__(path='/new_tko/server/noauth/rpc/',
    132                                   user=user,
    133                                   server=server,
    134                                   print_log=print_log,
    135                                   debug=debug,
    136                                   reply_debug=reply_debug)
    137         self._db = None
    138 
    139 
    140     @_tko_timer.decorate
    141     def get_job_test_statuses_from_db(self, job_id):
    142         """Get job test statuses from the database.
    143 
    144         Retrieve a set of fields from a job that reflect the status of each test
    145         run within a job.
    146         fields retrieved: status, test_name, reason, test_started_time,
    147                           test_finished_time, afe_job_id, job_owner, hostname.
    148 
    149         @param job_id: The afe job id to look up.
    150         @returns a TestStatus object of the resulting information.
    151         """
    152         if self._db is None:
    153             self._db = db.db()
    154         fields = ['status', 'test_name', 'subdir', 'reason',
    155                   'test_started_time', 'test_finished_time', 'afe_job_id',
    156                   'job_owner', 'hostname', 'job_tag']
    157         table = 'tko_test_view_2'
    158         where = 'job_tag like "%s-%%"' % job_id
    159         test_status = []
    160         # Run commit before we query to ensure that we are pulling the latest
    161         # results.
    162         self._db.commit()
    163         for entry in self._db.select(','.join(fields), table, (where, None)):
    164             status_dict = {}
    165             for key,value in zip(fields, entry):
    166                 # All callers expect values to be a str object.
    167                 status_dict[key] = str(value)
    168             # id is used by TestStatus to uniquely identify each Test Status
    169             # obj.
    170             status_dict['id'] = [status_dict['reason'], status_dict['hostname'],
    171                                  status_dict['test_name']]
    172             test_status.append(status_dict)
    173 
    174         return [TestStatus(self, e) for e in test_status]
    175 
    176 
    177     def get_status_counts(self, job, **data):
    178         entries = self.run('get_status_counts',
    179                            group_by=['hostname', 'test_name', 'reason'],
    180                            job_tag__startswith='%s-' % job, **data)
    181         return [TestStatus(self, e) for e in entries['groups']]
    182 
    183 
    184 class AFE(RpcClient):
    def __init__(self, user=None, server=None, print_log=True, debug=False,
                 reply_debug=False, job=None):
        """Open an RPC connection to the AFE frontend.

            job: server job this client runs under; used by
                set_platform_results() to record results (may be None).
        """
        # Remember the enclosing job before contacting the server.
        self.job = job
        super(AFE, self).__init__(path='/afe/server/noauth/rpc/',
                                  user=user, server=server,
                                  print_log=print_log, debug=debug,
                                  reply_debug=reply_debug)
    194 
    195 
    def host_statuses(self, live=None):
        """Return host statuses known to the frontend.

            live: True  -> only statuses of usable hosts
                  False -> only dead-host statuses
                  None (or anything else) -> all statuses
        """
        dead_statuses = ['Repair Failed', 'Repairing']
        statuses = self.run('get_static_data')['host_statuses']
        if live == True:
            return list(set(statuses) - set(dead_statuses))
        if live == False:
            return dead_statuses
        return statuses
    205 
    206 
    207     @staticmethod
    208     def _dict_for_host_query(hostnames=(), status=None, label=None):
    209         query_args = {}
    210         if hostnames:
    211             query_args['hostname__in'] = hostnames
    212         if status:
    213             query_args['status'] = status
    214         if label:
    215             query_args['labels__name'] = label
    216         return query_args
    217 
    218 
    def get_hosts(self, hostnames=(), status=None, label=None, **dargs):
        """Fetch Host objects matching the given filters."""
        filters = dict(dargs)
        filters.update(self._dict_for_host_query(hostnames=hostnames,
                                                 status=status,
                                                 label=label))
        return [Host(self, h) for h in self.run('get_hosts', **filters)]
    226 
    227 
    def get_hostnames(self, status=None, label=None, **dargs):
        """Like get_hosts() but returns hostnames instead of Host objects."""
        # Still fetches full host objects; a leaner RPC could replace this
        # in the future.
        hosts = self.get_hosts(status=status, label=label, **dargs)
        return [host.hostname for host in hosts]
    234 
    235 
    def reverify_hosts(self, hostnames=(), status=None, label=None):
        """Schedule reverification of matching hosts.

        Restricted to unlocked hosts in an ACL group containing this user.
        """
        filters = dict(locked=False,
                       aclgroup__users__login=self.user)
        filters.update(self._dict_for_host_query(hostnames=hostnames,
                                                 status=status,
                                                 label=label))
        return self.run('reverify_hosts', **filters)
    243 
    244 
    def create_host(self, hostname, **dargs):
        """Add a host to the frontend and return its Host object."""
        host_id = self.run('add_host', hostname=hostname, **dargs)
        return self.get_hosts(id=host_id)[0]
    248 
    249 
    def get_host_attribute(self, attr, **dargs):
        """Fetch HostAttribute objects for the given attribute name."""
        attributes = self.run('get_host_attribute', attribute=attr, **dargs)
        return [HostAttribute(self, a) for a in attributes]
    253 
    254 
    def set_host_attribute(self, attr, val, **dargs):
        """Set a host attribute via the frontend."""
        self.run('set_host_attribute', attribute=attr, value=val, **dargs)
    257 
    258 
    def get_labels(self, **dargs):
        """Fetch Label objects matching the query."""
        return [Label(self, data)
                for data in self.run('get_labels', **dargs)]
    262 
    263 
    def create_label(self, name, **dargs):
        """Add a label and return its Label object."""
        label_id = self.run('add_label', name=name, **dargs)
        return self.get_labels(id=label_id)[0]
    267 
    268 
    def get_acls(self, **dargs):
        """Fetch Acl objects matching the query."""
        return [Acl(self, data)
                for data in self.run('get_acl_groups', **dargs)]
    272 
    273 
    def create_acl(self, name, **dargs):
        """Add an ACL group and return its Acl object."""
        acl_id = self.run('add_acl_group', name=name, **dargs)
        return self.get_acls(id=acl_id)[0]
    277 
    278 
    def get_users(self, **dargs):
        """Fetch User objects matching the query."""
        return [User(self, data)
                for data in self.run('get_users', **dargs)]
    282 
    283 
    def generate_control_file(self, tests, **dargs):
        """Ask the frontend to generate a control file for |tests|.

        Returns a ControlFile object wrapping the RPC reply.
        """
        reply = self.run('generate_control_file', tests=tests, **dargs)
        return ControlFile(self, reply)
    287 
    288 
    def get_jobs(self, summary=False, **dargs):
        """Fetch Job objects matching the query.

            summary: if True use the get_jobs_summary RPC instead of
                get_jobs.

        Returns a list of Job objects with extra bookkeeping attributes
        (testname, platform_results, platform_reasons) initialized.
        """
        if summary:
            jobs_data = self.run('get_jobs_summary', **dargs)
        else:
            jobs_data = self.run('get_jobs', **dargs)
        jobs = []
        for j in jobs_data:
            job = Job(self, j)
            # Set up some extra information defaults.  The default testname
            # is the job name up to the first whitespace (arbitrary).
            # NOTE: raw string for the regex (was '\s.*', an invalid escape
            # sequence in future Python versions).
            job.testname = re.sub(r'\s.*', '', job.name)
            job.platform_results = {}
            job.platform_reasons = {}
            jobs.append(job)
        return jobs
    303 
    304 
    def get_host_queue_entries(self, **data):
        """Fetch JobStatus objects for matching host queue entries.

        Entries with neither a host nor a meta_host are filtered out.
        """
        entries = self.run('get_host_queue_entries', **data)
        job_statuses = [JobStatus(self, entry) for entry in entries]

        # Sadly, get_host_queue_entries doesn't return platforms; we have
        # to get those back from an explicit get_hosts query, then patch
        # the new host objects back into the host list.
        hostnames = [s.host.hostname for s in job_statuses if s.host]
        hosts_by_name = {}
        for host in self.get_hosts(hostname__in=hostnames):
            hosts_by_name[host.hostname] = host
        for status in job_statuses:
            if status.host:
                status.host = hosts_by_name.get(status.host.hostname)
        # Keep only statuses that have either a host or a meta_host.
        return [status for status in job_statuses
                if status.host or status.meta_host]
    322 
    323 
    def get_special_tasks(self, **data):
        """Fetch SpecialTask objects matching the query."""
        return [SpecialTask(self, task)
                for task in self.run('get_special_tasks', **data)]
    327 
    328 
    def get_host_special_tasks(self, host_id, **data):
        """Fetch SpecialTask objects for the given host."""
        tasks = self.run('get_host_special_tasks', host_id=host_id, **data)
        return [SpecialTask(self, task) for task in tasks]
    333 
    334 
    def get_host_status_task(self, host_id, end_time):
        """Fetch a host's status task, or None if the RPC returns none."""
        task = self.run('get_host_status_task',
                        host_id=host_id, end_time=end_time)
        if not task:
            return None
        return SpecialTask(self, task)
    339 
    340 
    def get_host_diagnosis_interval(self, host_id, end_time, success):
        """Return the diagnosis interval for a host around end_time."""
        return self.run('get_host_diagnosis_interval', host_id=host_id,
                        end_time=end_time, success=success)
    345 
    346 
    def create_job_by_test(self, tests, kernel=None, use_container=False,
                           kernel_cmdline=None, **dargs):
        """
        Given a test name, fetch the appropriate control file from the server
        and submit it.

        @param tests: list of test names to generate the control file for.
        @param kernel: A comma separated list of kernel versions to boot.
        @param kernel_cmdline: The command line used to boot all kernels listed
                in the kernel parameter.
        @param use_container: run the generated control file in a container.

        @return the created job object, or None if there are fewer hosts
                than the required synch_count.
        """
        # Either explicit hosts, or an atomic group with a synch count,
        # must be supplied.  (Parentheses make the and/or binding explicit.)
        assert ('hosts' in dargs or
                ('atomic_group_name' in dargs and 'synch_count' in dargs))
        if kernel:
            # Raw string for the regex (was '[\s,]+', an invalid escape
            # sequence in future Python versions).
            kernel_list = re.split(r'[\s,]+', kernel.strip())
            kernel_info = []
            for version in kernel_list:
                kernel_dict = {'version': version}
                if kernel_cmdline is not None:
                    kernel_dict['cmdline'] = kernel_cmdline
                kernel_info.append(kernel_dict)
        else:
            kernel_info = None
        control_file = self.generate_control_file(
                tests=tests, kernel=kernel_info, use_container=use_container)
        if control_file.is_server:
            dargs['control_type'] = control_data.CONTROL_TYPE_NAMES.SERVER
        else:
            dargs['control_type'] = control_data.CONTROL_TYPE_NAMES.CLIENT
        dargs['dependencies'] = (dargs.get('dependencies', []) +
                                 control_file.dependencies)
        dargs['control_file'] = control_file.control_file
        if not dargs.get('synch_count', None):
            dargs['synch_count'] = control_file.synch_count
        if 'hosts' in dargs and len(dargs['hosts']) < dargs['synch_count']:
            # will not be able to satisfy this request
            return None
        return self.create_job(**dargs)
    386 
    387 
    def create_job(self, control_file, name=' ', priority='Medium',
                control_type=control_data.CONTROL_TYPE_NAMES.CLIENT, **dargs):
        """Submit a job to the frontend and return its Job object."""
        job_id = self.run('create_job', name=name, priority=priority,
                          control_file=control_file,
                          control_type=control_type, **dargs)
        return self.get_jobs(id=job_id)[0]
    393 
    394 
    def run_test_suites(self, pairings, kernel, kernel_label=None,
                        priority='Medium', wait=True, poll_interval=10,
                        email_from=None, email_to=None, timeout_mins=10080,
                        max_runtime_mins=10080, kernel_cmdline=None):
        """
        Run a list of test suites on a particular kernel.

        Poll for them to complete, and return whether they worked or not.

        @param pairings: List of MachineTestPairing objects to invoke.
        @param kernel: Name of the kernel to run.
        @param kernel_label: Label (string) of the kernel to run such as
                    '<kernel-version> : <config> : <date>'
                    If any pairing object has its job_label attribute set it
                    will override this value for that particular job.
        @param kernel_cmdline: The command line to boot the kernel(s) with.
        @param wait: boolean - Wait for the results to come back?
        @param poll_interval: Interval between polling for job results
                    (in minutes).
        @param email_from: Send notification email upon completion from here.
        @param email_to: Send notification email upon completion to here.
        @param timeout_mins: timeout for each invoked job.
        @param max_runtime_mins: maximum runtime for each invoked job.
        """
        jobs = []
        for pairing in pairings:
            try:
                new_job = self.invoke_test(pairing, kernel, kernel_label,
                                           priority, timeout_mins=timeout_mins,
                                           kernel_cmdline=kernel_cmdline,
                                           max_runtime_mins=max_runtime_mins)
                if not new_job:
                    continue
                jobs.append(new_job)
            except Exception:
                # Best effort: log the failure and continue with the
                # remaining pairings instead of aborting the whole suite.
                traceback.print_exc()
        if not wait or not jobs:
            return
        tko = TKO()
        while True:
            time.sleep(60 * poll_interval)
            result = self.poll_all_jobs(tko, jobs, email_from, email_to)
            if result is not None:
                return result
    436 
    437 
    438     def result_notify(self, job, email_from, email_to):
    439         """
    440         Notify about the result of a job. Will always print, if email data
    441         is provided, will send email for it as well.
    442 
    443             job: job object to notify about
    444             email_from: send notification email upon completion from here
    445             email_from: send notification email upon completion to here
    446         """
    447         if job.result == True:
    448             subject = 'Testing PASSED: '
    449         else:
    450             subject = 'Testing FAILED: '
    451         subject += '%s : %s\n' % (job.name, job.id)
    452         text = []
    453         for platform in job.results_platform_map:
    454             for status in job.results_platform_map[platform]:
    455                 if status == 'Total':
    456                     continue
    457                 for host in job.results_platform_map[platform][status]:
    458                     text.append('%20s %10s %10s' % (platform, status, host))
    459                     if status == 'Failed':
    460                         for test_status in job.test_status[host].fail:
    461                             text.append('(%s, %s) : %s' % \
    462                                         (host, test_status.test_name,
    463                                          test_status.reason))
    464                         text.append('')
    465 
    466         base_url = 'http://' + self.server
    467 
    468         params = ('columns=test',
    469                   'rows=machine_group',
    470                   "condition=tag~'%s-%%25'" % job.id,
    471                   'title=Report')
    472         query_string = '&'.join(params)
    473         url = '%s/tko/compose_query.cgi?%s' % (base_url, query_string)
    474         text.append(url + '\n')
    475         url = '%s/afe/#tab_id=view_job&object_id=%s' % (base_url, job.id)
    476         text.append(url + '\n')
    477 
    478         body = '\n'.join(text)
    479         print '---------------------------------------------------'
    480         print 'Subject: ', subject
    481         print body
    482         print '---------------------------------------------------'
    483         if email_from and email_to:
    484             print 'Sending email ...'
    485             utils.send_email(email_from, email_to, subject, body)
    486         print
    487 
    488 
    def print_job_result(self, job):
        """
        Print the result of a single job.
            job: a job object

        job.result may be None (still pending), True (passed), False
        (failed) or the string "Abort" (see poll_all_jobs).
        """
        # The trailing commas keep the status word and the id/name on the
        # same output line (Python 2 print statement behaviour).
        if job.result is None:
            print 'PENDING',
        elif job.result == True:
            print 'PASSED',
        elif job.result == False:
            print 'FAILED',
        elif job.result == "Abort":
            print 'ABORT',
        print ' %s : %s' % (job.id, job.name)
    503 
    504 
    def poll_all_jobs(self, tko, jobs, email_from=None, email_to=None):
        """
        Poll all jobs in a list.
            jobs: list of job objects to poll
            email_from: send notification email upon completion from here
            email_to: send notification email upon completion to here

        Returns:
            a) All complete successfully (return True)
            b) One or more has failed (return False)
            c) Cannot tell yet (return None)
        """
        results = []
        for job in jobs:
            # Only poll jobs that do not yet have a final result cached.
            if getattr(job, 'result', None) is None:
                job.result = self.poll_job_results(tko, job)
                # Notify as soon as a job reaches a final result.
                if job.result is not None:
                    self.result_notify(job, email_from, email_to)
            results.append(job.result)
            self.print_job_result(job)

        if None in results:
            return None
        if False in results or "Abort" in results:
            return False
        return True
    533 
    534 
    535     def _included_platform(self, host, platforms):
    536         """
    537         See if host's platforms matches any of the patterns in the included
    538         platforms list.
    539         """
    540         if not platforms:
    541             return True        # No filtering of platforms
    542         for platform in platforms:
    543             if re.search(platform, host.platform):
    544                 return True
    545         return False
    546 
    547 
    548     def invoke_test(self, pairing, kernel, kernel_label, priority='Medium',
    549                     kernel_cmdline=None, **dargs):
    550         """
    551         Given a pairing of a control file to a machine label, find all machines
    552         with that label, and submit that control file to them.
    553 
    554         @param kernel_label: Label (string) of the kernel to run such as
    555                 '<kernel-version> : <config> : <date>'
    556                 If any pairing object has its job_label attribute set it
    557                 will override this value for that particular job.
    558 
    559         @returns A list of job objects.
    560         """
    561         # The pairing can override the job label.
    562         if pairing.job_label:
    563             kernel_label = pairing.job_label
    564         job_name = '%s : %s' % (pairing.machine_label, kernel_label)
    565         hosts = self.get_hosts(multiple_labels=[pairing.machine_label])
    566         platforms = pairing.platforms
    567         hosts = [h for h in hosts if self._included_platform(h, platforms)]
    568         dead_statuses = self.host_statuses(live=False)
    569         host_list = [h.hostname for h in hosts if h.status not in dead_statuses]
    570         print 'HOSTS: %s' % host_list
    571         if pairing.atomic_group_sched:
    572             dargs['synch_count'] = pairing.synch_count
    573             dargs['atomic_group_name'] = pairing.machine_label
    574         else:
    575             dargs['hosts'] = host_list
    576         new_job = self.create_job_by_test(name=job_name,
    577                                           dependencies=[pairing.machine_label],
    578                                           tests=[pairing.control_file],
    579                                           priority=priority,
    580                                           kernel=kernel,
    581                                           kernel_cmdline=kernel_cmdline,
    582                                           use_container=pairing.container,
    583                                           **dargs)
    584         if new_job:
    585             if pairing.testname:
    586                 new_job.testname = pairing.testname
    587             print 'Invoked test %s : %s' % (new_job.id, job_name)
    588         return new_job
    589 
    590 
    591     def _job_test_results(self, tko, job, debug, tests=[]):
    592         """
    593         Retrieve test results for a job
    594         """
    595         job.test_status = {}
    596         try:
    597             test_statuses = tko.get_status_counts(job=job.id)
    598         except Exception:
    599             print "Ignoring exception on poll job; RPC interface is flaky"
    600             traceback.print_exc()
    601             return
    602 
    603         for test_status in test_statuses:
    604             # SERVER_JOB is buggy, and often gives false failures. Ignore it.
    605             if test_status.test_name == 'SERVER_JOB':
    606                 continue
    607             # if tests is not empty, restrict list of test_statuses to tests
    608             if tests and test_status.test_name not in tests:
    609                 continue
    610             if debug:
    611                 print test_status
    612             hostname = test_status.hostname
    613             if hostname not in job.test_status:
    614                 job.test_status[hostname] = TestResults()
    615             job.test_status[hostname].add(test_status)
    616 
    617 
    def _job_results_platform_map(self, job, debug):
        # Figure out which hosts passed / failed / aborted in a job
        # Creates a 2-dimensional hash, stored as job.results_platform_map
        #     1st index - platform type (string)
        #     2nd index - Status (string)
        #         'Completed' / 'Failed' / 'Aborted'
        #     Data indexed by this hash is a list of hostnames (text strings)
        #
        # Also populates job.job_status (hostname -> status string) and
        # job.metahost_index (metahost label -> next synthetic index).
        # Expects job.test_status to already be populated (see
        # _job_test_results).  The 'debug' parameter is currently unused.
        job.results_platform_map = {}
        try:
            job_statuses = self.get_host_queue_entries(job=job.id)
        except Exception:
            # Best effort: a flaky RPC should not kill the poll loop.
            print "Ignoring exception on poll job; RPC interface is flaky"
            traceback.print_exc()
            return None

        platform_map = {}
        job.job_status = {}
        job.metahost_index = {}
        for job_status in job_statuses:
            # This is basically "for each host / metahost in the job"
            if job_status.host:
                hostname = job_status.host.hostname
            else:              # This is a metahost
                # Metahosts have no hostname; synthesize a unique
                # '<label>.<n>' name per occurrence of the label.
                metahost = job_status.meta_host
                index = job.metahost_index.get(metahost, 1)
                job.metahost_index[metahost] = index + 1
                hostname = '%s.%s' % (metahost, index)
            job.job_status[hostname] = job_status.status
            status = job_status.status
            # Skip hosts that failed verify or repair:
            # that's a machine failure, not a job failure
            if hostname in job.test_status:
                verify_failed = False
                for failure in job.test_status[hostname].fail:
                    if (failure.test_name == 'verify' or
                            failure.test_name == 'repair'):
                        verify_failed = True
                        break
                if verify_failed:
                    continue
            if hostname in job.test_status and job.test_status[hostname].fail:
                # If any tests failed in the job, we want to mark the
                # job result as failed, overriding the default job status.
                if status != "Aborted":         # except if it's an aborted job
                    status = 'Failed'
            if job_status.host:
                platform = job_status.host.platform
            else:              # This is a metahost
                platform = job_status.meta_host
            # 'Total' tracks every host on the platform; each concrete
            # status gets its own hostname list alongside it.
            if platform not in platform_map:
                platform_map[platform] = {'Total' : [hostname]}
            else:
                platform_map[platform]['Total'].append(hostname)
            new_host_list = platform_map[platform].get(status, []) + [hostname]
            platform_map[platform][status] = new_host_list
        job.results_platform_map = platform_map
    674 
    675 
    def set_platform_results(self, test_job, platform, result):
        """
        Record the final result for one platform of |test_job|.

        Result must be None, 'FAIL', 'WARN' or 'GOOD'.  Once a non-None
        result has been stored for a platform it is frozen: later calls
        for the same platform are ignored.
        """
        already_recorded = test_job.platform_results[platform]
        if already_recorded is not None:
            # Decided and recorded earlier; the outcome never changes later.
            return
        test_job.platform_results[platform] = result
        # Note that self.job refers to the metajob we're IN, not the job
        # that we're executing from here.
        if self.job:
            self.job.record(result, None,
                            '%s.%s' % (test_job.testname, platform),
                            status='')
    689 
    690 
    691     def poll_job_results(self, tko, job, enough=1, debug=False):
    692         """
    693         Analyse all job results by platform
    694 
    695           params:
    696             tko: a TKO object representing the results DB.
    697             job: the job to be examined.
    698             enough: the acceptable delta between the number of completed
    699                     tests and the total number of tests.
    700             debug: enable debugging output.
    701 
    702           returns:
    703             False: if any platform has more than |enough| failures
    704             None:  if any platform has less than |enough| machines
    705                    not yet Good.
    706             True:  if all platforms have at least |enough| machines
    707                    Good.
    708         """
    709         self._job_test_results(tko, job, debug)
    710         if job.test_status == {}:
    711             return None
    712         self._job_results_platform_map(job, debug)
    713 
    714         good_platforms = []
    715         failed_platforms = []
    716         aborted_platforms = []
    717         unknown_platforms = []
    718         platform_map = job.results_platform_map
    719         for platform in platform_map:
    720             if not job.platform_results.has_key(platform):
    721                 # record test start, but there's no way to do this right now
    722                 job.platform_results[platform] = None
    723             total = len(platform_map[platform]['Total'])
    724             completed = len(platform_map[platform].get('Completed', []))
    725             failed = len(platform_map[platform].get('Failed', []))
    726             aborted = len(platform_map[platform].get('Aborted', []))
    727 
    728             # We set up what we want to record here, but don't actually do
    729             # it yet, until we have a decisive answer for this platform
    730             if aborted or failed:
    731                 bad = aborted + failed
    732                 if (bad > 1) or (bad * 2 >= total):
    733                     platform_test_result = 'FAIL'
    734                 else:
    735                     platform_test_result = 'WARN'
    736 
    737             if aborted > enough:
    738                 aborted_platforms.append(platform)
    739                 self.set_platform_results(job, platform, platform_test_result)
    740             elif (failed * 2 >= total) or (failed > enough):
    741                 failed_platforms.append(platform)
    742                 self.set_platform_results(job, platform, platform_test_result)
    743             elif (completed >= enough) and (completed + enough >= total):
    744                 good_platforms.append(platform)
    745                 self.set_platform_results(job, platform, 'GOOD')
    746             else:
    747                 unknown_platforms.append(platform)
    748             detail = []
    749             for status in platform_map[platform]:
    750                 if status == 'Total':
    751                     continue
    752                 detail.append('%s=%s' % (status,platform_map[platform][status]))
    753             if debug:
    754                 print '%20s %d/%d %s' % (platform, completed, total,
    755                                          ' '.join(detail))
    756                 print
    757 
    758         if len(aborted_platforms) > 0:
    759             if debug:
    760                 print 'Result aborted - platforms: ',
    761                 print ' '.join(aborted_platforms)
    762             return "Abort"
    763         if len(failed_platforms) > 0:
    764             if debug:
    765                 print 'Result bad - platforms: ' + ' '.join(failed_platforms)
    766             return False
    767         if len(unknown_platforms) > 0:
    768             if debug:
    769                 platform_list = ' '.join(unknown_platforms)
    770                 print 'Result unknown - platforms: ', platform_list
    771             return None
    772         if debug:
    773             platform_list = ' '.join(good_platforms)
    774             print 'Result good - all platforms passed: ', platform_list
    775         return True
    776 
    777 
    def abort_jobs(self, jobs):
        """Abort every job in |jobs| via the AFE RPC interface.

        Jobs that have already completed are not affected.

        @param jobs: List of job ids to abort.
        """
        for job_id in jobs:
            self.run('abort_host_queue_entries', job_id=job_id)
    787 
    788 
    789 class TestResults(object):
    790     """
    791     Container class used to hold the results of the tests for a job
    792     """
    793     def __init__(self):
    794         self.good = []
    795         self.fail = []
    796         self.pending = []
    797 
    798 
    799     def add(self, result):
    800         if result.complete_count > result.pass_count:
    801             self.fail.append(result)
    802         elif result.incomplete_count > 0:
    803             self.pending.append(result)
    804         else:
    805             self.good.append(result)
    806 
    807 
    808 class RpcObject(object):
    809     """
    810     Generic object used to construct python objects from rpc calls
    811     """
    812     def __init__(self, afe, hash):
    813         self.afe = afe
    814         self.hash = hash
    815         self.__dict__.update(hash)
    816 
    817 
    818     def __str__(self):
    819         return dump_object(self.__repr__(), self)
    820 
    821 
    822 class ControlFile(RpcObject):
    823     """
    824     AFE control file object
    825 
    826     Fields: synch_count, dependencies, control_file, is_server
    827     """
    828     def __repr__(self):
    829         return 'CONTROL FILE: %s' % self.control_file
    830 
    831 
    832 class Label(RpcObject):
    833     """
    834     AFE label object
    835 
    836     Fields:
    837         name, invalid, platform, kernel_config, id, only_if_needed
    838     """
    839     def __repr__(self):
    840         return 'LABEL: %s' % self.name
    841 
    842 
    843     def add_hosts(self, hosts):
    844         return self.afe.run('label_add_hosts', id=self.id, hosts=hosts)
    845 
    846 
    847     def remove_hosts(self, hosts):
    848         return self.afe.run('label_remove_hosts', id=self.id, hosts=hosts)
    849 
    850 
    851 class Acl(RpcObject):
    852     """
    853     AFE acl object
    854 
    855     Fields:
    856         users, hosts, description, name, id
    857     """
    858     def __repr__(self):
    859         return 'ACL: %s' % self.name
    860 
    861 
    862     def add_hosts(self, hosts):
    863         self.afe.log('Adding hosts %s to ACL %s' % (hosts, self.name))
    864         return self.afe.run('acl_group_add_hosts', self.id, hosts)
    865 
    866 
    867     def remove_hosts(self, hosts):
    868         self.afe.log('Removing hosts %s from ACL %s' % (hosts, self.name))
    869         return self.afe.run('acl_group_remove_hosts', self.id, hosts)
    870 
    871 
    872     def add_users(self, users):
    873         self.afe.log('Adding users %s to ACL %s' % (users, self.name))
    874         return self.afe.run('acl_group_add_users', id=self.name, users=users)
    875 
    876 
    877 class Job(RpcObject):
    878     """
    879     AFE job object
    880 
    881     Fields:
    882         name, control_file, control_type, synch_count, reboot_before,
    883         run_verify, priority, email_list, created_on, dependencies,
    884         timeout, owner, reboot_after, id
    885     """
    886     def __repr__(self):
    887         return 'JOB: %s' % self.id
    888 
    889 
    890 class JobStatus(RpcObject):
    891     """
    892     AFE job_status object
    893 
    894     Fields:
    895         status, complete, deleted, meta_host, host, active, execution_subdir, id
    896     """
    897     def __init__(self, afe, hash):
    898         super(JobStatus, self).__init__(afe, hash)
    899         self.job = Job(afe, self.job)
    900         if getattr(self, 'host'):
    901             self.host = Host(afe, self.host)
    902 
    903 
    904     def __repr__(self):
    905         if self.host and self.host.hostname:
    906             hostname = self.host.hostname
    907         else:
    908             hostname = 'None'
    909         return 'JOB STATUS: %s-%s' % (self.job.id, hostname)
    910 
    911 
    912 class SpecialTask(RpcObject):
    913     """
    914     AFE special task object
    915     """
    916     def __init__(self, afe, hash):
    917         super(SpecialTask, self).__init__(afe, hash)
    918         self.host = Host(afe, self.host)
    919 
    920 
    921     def __repr__(self):
    922         return 'SPECIAL TASK: %s' % self.id
    923 
    924 
    925 class Host(RpcObject):
    926     """
    927     AFE host object
    928 
    929     Fields:
    930         status, lock_time, locked_by, locked, hostname, invalid,
    931         synch_id, labels, platform, protection, dirty, id
    932     """
    933     def __repr__(self):
    934         return 'HOST OBJECT: %s' % self.hostname
    935 
    936 
    937     def show(self):
    938         labels = list(set(self.labels) - set([self.platform]))
    939         print '%-6s %-7s %-7s %-16s %s' % (self.hostname, self.status,
    940                                            self.locked, self.platform,
    941                                            ', '.join(labels))
    942 
    943 
    944     def delete(self):
    945         return self.afe.run('delete_host', id=self.id)
    946 
    947 
    948     def modify(self, **dargs):
    949         return self.afe.run('modify_host', id=self.id, **dargs)
    950 
    951 
    952     def get_acls(self):
    953         return self.afe.get_acls(hosts__hostname=self.hostname)
    954 
    955 
    956     def add_acl(self, acl_name):
    957         self.afe.log('Adding ACL %s to host %s' % (acl_name, self.hostname))
    958         return self.afe.run('acl_group_add_hosts', id=acl_name,
    959                             hosts=[self.hostname])
    960 
    961 
    962     def remove_acl(self, acl_name):
    963         self.afe.log('Removing ACL %s from host %s' % (acl_name, self.hostname))
    964         return self.afe.run('acl_group_remove_hosts', id=acl_name,
    965                             hosts=[self.hostname])
    966 
    967 
    968     def get_labels(self):
    969         return self.afe.get_labels(host__hostname__in=[self.hostname])
    970 
    971 
    972     def add_labels(self, labels):
    973         self.afe.log('Adding labels %s to host %s' % (labels, self.hostname))
    974         return self.afe.run('host_add_labels', id=self.id, labels=labels)
    975 
    976 
    977     def remove_labels(self, labels):
    978         self.afe.log('Removing labels %s from host %s' % (labels,self.hostname))
    979         return self.afe.run('host_remove_labels', id=self.id, labels=labels)
    980 
    981 
    982 class User(RpcObject):
    983     def __repr__(self):
    984         return 'USER: %s' % self.login
    985 
    986 
    987 class TestStatus(RpcObject):
    988     """
    989     TKO test status object
    990 
    991     Fields:
    992         test_idx, hostname, testname, id
    993         complete_count, incomplete_count, group_count, pass_count
    994     """
    995     def __repr__(self):
    996         return 'TEST STATUS: %s' % self.id
    997 
    998 
    999 class HostAttribute(RpcObject):
   1000     """
   1001     AFE host attribute object
   1002 
   1003     Fields:
   1004         id, host, attribute, value
   1005     """
   1006     def __repr__(self):
   1007         return 'HOST ATTRIBUTE %d' % self.id
   1008 
   1009 
   1010 class MachineTestPairing(object):
   1011     """
   1012     Object representing the pairing of a machine label with a control file
   1013 
   1014     machine_label: use machines from this label
   1015     control_file: use this control file (by name in the frontend)
   1016     platforms: list of rexeps to filter platforms by. [] => no filtering
   1017     job_label: The label (name) to give to the autotest job launched
   1018             to run this pairing.  '<kernel-version> : <config> : <date>'
   1019     """
   1020     def __init__(self, machine_label, control_file, platforms=[],
   1021                  container=False, atomic_group_sched=False, synch_count=0,
   1022                  testname=None, job_label=None):
   1023         self.machine_label = machine_label
   1024         self.control_file = control_file
   1025         self.platforms = platforms
   1026         self.container = container
   1027         self.atomic_group_sched = atomic_group_sched
   1028         self.synch_count = synch_count
   1029         self.testname = testname
   1030         self.job_label = job_label
   1031 
   1032 
   1033     def __repr__(self):
   1034         return '%s %s %s %s' % (self.machine_label, self.control_file,
   1035                                 self.platforms, self.container)
   1036