# pylint: disable=missing-docstring

"""\
Functions to expose over the RPC interface.

For all modify* and delete* functions that ask for an 'id' parameter to
identify the object to operate on, the id may be either
 * the database row ID
 * the name of the object (label name, hostname, user login, etc.)
 * a dictionary containing uniquely identifying fields (this option should
   seldom be used)

When specifying foreign key fields (i.e. adding hosts to a label, or adding
users to an ACL group), the given value may be either the database row ID or
the name of the object.

All get* functions return lists of dictionaries.  Each dictionary represents
one object and maps field names to values.

Some examples:
modify_host(2, hostname='myhost') # modify hostname of host with database ID 2
modify_host('ipaj2', hostname='myhost') # modify hostname of host 'ipaj2'
modify_test('sleeptest', test_type='Client', params=', seconds=60')
delete_acl_group(1) # delete by ID
delete_acl_group('Everyone') # delete by name
acl_group_add_users('Everyone', ['mbligh', 'showard'])
get_jobs(owner='showard', status='Queued')

See doctests/001_rpc_test.txt for (lots) more examples.
"""

__author__ = 'showard@google.com (Steve Howard)'

import ast
import datetime
import logging
import os
import sys

from django.db.models import Count

import common
# TODO(akeshet): Replace with monarch stats once we know how to instrument rpc
# server with ts_mon.
from autotest_lib.client.common_lib.cros.graphite import autotest_stats
from autotest_lib.client.common_lib import control_data
from autotest_lib.client.common_lib import error
from autotest_lib.client.common_lib import global_config
from autotest_lib.client.common_lib import priorities
from autotest_lib.client.common_lib import time_utils
from autotest_lib.client.common_lib.cros import dev_server
from autotest_lib.frontend.afe import control_file as control_file_lib
from autotest_lib.frontend.afe import model_attributes
from autotest_lib.frontend.afe import model_logic
from autotest_lib.frontend.afe import models
from autotest_lib.frontend.afe import rpc_utils
from autotest_lib.frontend.tko import models as tko_models
from autotest_lib.frontend.tko import rpc_interface as tko_rpc_interface
from autotest_lib.server import frontend
from autotest_lib.server import utils
from autotest_lib.server.cros import provision
from autotest_lib.server.cros.dynamic_suite import constants
from autotest_lib.server.cros.dynamic_suite import control_file_getter
from autotest_lib.server.cros.dynamic_suite import suite as SuiteBase
from autotest_lib.server.cros.dynamic_suite import tools
from autotest_lib.server.cros.dynamic_suite.suite import Suite
from autotest_lib.server.lib import status_history
from autotest_lib.site_utils import host_history
from autotest_lib.site_utils import job_history
from autotest_lib.site_utils import server_manager_utils
from autotest_lib.site_utils import stable_version_utils


_CONFIG = global_config.global_config

# Relevant CrosDynamicSuiteExceptions are defined in client/common_lib/error.py.

# labels

def modify_label(id, **data):
    """Modify a label.

    @param id: id or name of a label. More often a label name.
    @param data: New data for a label.
    """
    label_model = models.Label.smart_get(id)
    label_model.update_object(data)

    # Master forwards the RPC to shards
    if not utils.is_shard():
        rpc_utils.fanout_rpc(label_model.host_set.all(), 'modify_label', False,
                             id=id, **data)


def delete_label(id):
    """Delete a label.

    @param id: id or name of a label. More often a label name.
    """
    label_model = models.Label.smart_get(id)
    # Hosts that have the label to be deleted. Save this info before
    # the label is deleted to use it later.
    hosts = []
    for h in label_model.host_set.all():
        hosts.append(models.Host.smart_get(h.id))
    label_model.delete()

    # Master forwards the RPC to shards
    if not utils.is_shard():
        rpc_utils.fanout_rpc(hosts, 'delete_label', False, id=id)


def add_label(name, ignore_exception_if_exists=False, **kwargs):
    """Adds a new label of a given name.

    @param name: label name.
    @param ignore_exception_if_exists: If True, suppress the exception
        raised when the label name already exists; any other exception
        is still re-raised. Default is False.
    @param kwargs: keyword args that store more info about a label
        other than the name.
    @return: int/long id of a new label.
    """
    # models.Label.add_object() throws model_logic.ValidationError
    # when it is given a label name that already exists.
    # However, ValidationError can be thrown with different errors,
    # and those errors should be propagated up the call chain.
    try:
        label = models.Label.add_object(name=name, **kwargs)
    except:
        exc_info = sys.exc_info()
        if ignore_exception_if_exists:
            label = rpc_utils.get_label(name)
            # If the exception was raised for a reason other than a
            # duplicated "name", re-raise the original exception.
            if label is None:
                raise exc_info[0], exc_info[1], exc_info[2]
        else:
            raise exc_info[0], exc_info[1], exc_info[2]
    return label.id
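
# Illustrative usage of add_label (the label name below is hypothetical):
#     add_label('board:lumpy', ignore_exception_if_exists=True)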


def add_label_to_hosts(id, hosts):
    """Adds a label of the given id to the given hosts only in local DB.

    @param id: id or name of a label. More often a label name.
    @param hosts: The hostnames of hosts that need the label.

    @raises models.Label.DoesNotExist: If the label with id doesn't exist.
    """
    label = models.Label.smart_get(id)
    host_objs = models.Host.smart_get_bulk(hosts)
    if label.platform:
        models.Host.check_no_platform(host_objs)
    # Ensure a host has no more than one board label with it.
    if label.name.startswith('board:'):
        models.Host.check_board_labels_allowed(host_objs, [label.name])
    label.host_set.add(*host_objs)


def _create_label_everywhere(id, hosts):
    """
    Yet another method to create labels.

    ALERT! This method should be run only on the master, not on shards!
    DO NOT RUN THIS ON A SHARD!!!  Deputies will hate you if you do!!!

    This method exists primarily to serve label_add_hosts() and
    host_add_labels().  Basically it pulls out the label check/add logic
    from label_add_hosts() into this nice method that not only creates
    the label but also tells the shards that service the hosts to also
    create the label.

    @param id: id or name of a label. More often a label name.
    @param hosts: A list of hostnames or ids. More often hostnames.
    """
    try:
        label = models.Label.smart_get(id)
    except models.Label.DoesNotExist:
        # This matches the type checks in smart_get, which is a hack
        # in and of itself. The aim here is to create any non-existent
        # label, which we cannot do if the 'id' specified isn't a label name.
        if isinstance(id, basestring):
            label = models.Label.smart_get(add_label(id))
        else:
            raise ValueError('Label id (%s) does not exist. Please specify '
                             'the argument, id, as a string (label name).'
                             % id)

    # Make sure the label exists on the shard with the same id
    # as it is on the master.
    # It is possible that the label is already on a shard, because
    # we add a new label only to the shards of hosts to which the label
    # is going to be attached.
    # For example, we add a label L1 to a host in shard S1.
    # The master and S1 will have L1, but other shards won't.
    # Later, when we add the same label L1 to hosts in shards S1 and S2,
    # S1 already has the label but S2 doesn't.
    # S2 should get the new label without any problem.
    # We ignore the exception in such a case.
    host_objs = models.Host.smart_get_bulk(hosts)
    rpc_utils.fanout_rpc(
            host_objs, 'add_label', include_hostnames=False,
            name=label.name, ignore_exception_if_exists=True,
            id=label.id, platform=label.platform)


@rpc_utils.route_rpc_to_master
def label_add_hosts(id, hosts):
    """Adds a label with the given id to the given hosts.

    This method should be run only on the master, not on shards.
    The given label will be created if it doesn't exist, provided the `id`
    supplied is a label name, not an int/long id.

    @param id: id or name of a label. More often a label name.
    @param hosts: A list of hostnames or ids. More often hostnames.

    @raises ValueError: If the id specified is an int/long (label id)
                        while the label does not exist.
    """
    # Create the label.
    _create_label_everywhere(id, hosts)

    # Add it to the master.
    add_label_to_hosts(id, hosts)

    # Add it to the shards.
    host_objs = models.Host.smart_get_bulk(hosts)
    rpc_utils.fanout_rpc(host_objs, 'add_label_to_hosts', id=id)
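
# Illustrative usage of label_add_hosts (label and hostnames hypothetical).
# Passing a label name (not an int id) lets the label be created on demand:
#     label_add_hosts('pool:suites', ['host1', 'host2'])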


def remove_label_from_hosts(id, hosts):
    """Removes a label of the given id from the given hosts only in local DB.

    @param id: id or name of a label.
    @param hosts: The hostnames of hosts from which to remove the label.
    """
    host_objs = models.Host.smart_get_bulk(hosts)
    models.Label.smart_get(id).host_set.remove(*host_objs)


@rpc_utils.route_rpc_to_master
def label_remove_hosts(id, hosts):
    """Removes a label of the given id from the given hosts.

    This method should be run only on the master, not on shards.

    @param id: id or name of a label.
    @param hosts: A list of hostnames or ids. More often hostnames.
    """
    host_objs = models.Host.smart_get_bulk(hosts)
    remove_label_from_hosts(id, hosts)

    rpc_utils.fanout_rpc(host_objs, 'remove_label_from_hosts', id=id)


def get_labels(exclude_filters=(), **filter_data):
    """\
    @param exclude_filters: A sequence of dictionaries of filters.

    @returns A sequence of nested dictionaries of label information.
    """
    labels = models.Label.query_objects(filter_data)
    for exclude_filter in exclude_filters:
        labels = labels.exclude(**exclude_filter)
    return rpc_utils.prepare_rows_as_nested_dicts(labels, ())


# hosts

def add_host(hostname, status=None, locked=None, lock_reason='',
             protection=None):
    if locked and not lock_reason:
        raise model_logic.ValidationError(
            {'locked': 'Please provide a reason for locking when adding host.'})

    return models.Host.add_object(hostname=hostname, status=status,
                                  locked=locked, lock_reason=lock_reason,
                                  protection=protection).id


@rpc_utils.route_rpc_to_master
def modify_host(id, **kwargs):
    """Modify local attributes of a host.

    If this is called on the master, but the host is assigned to a shard,
    this will forward the `modify_host_local` RPC to the responsible shard.
    This means that if a host is being locked using this function, the change
    will also propagate to shards.
    When this is called on a shard, the shard just routes the RPC to the
    master and does nothing.

    @param id: id of the host to modify.
    @param kwargs: key=value pairs of values to set on the host.
    """
    rpc_utils.check_modify_host(kwargs)
    host = models.Host.smart_get(id)
    try:
        rpc_utils.check_modify_host_locking(host, kwargs)
    except model_logic.ValidationError as e:
        if not kwargs.get('force_modify_locking', False):
            raise
        logging.exception('The following exception will be ignored and lock '
                          'modification will be enforced. %s', e)

    # This is required to make `lock_time` for a host be exactly the same
    # between the master and a shard.
    if kwargs.get('locked', None) and 'lock_time' not in kwargs:
        kwargs['lock_time'] = datetime.datetime.now()
    host.update_object(kwargs)

    # force_modify_locking is not a field in the database; remove it.
    kwargs.pop('force_modify_locking', None)
    rpc_utils.fanout_rpc([host], 'modify_host_local',
                         include_hostnames=False, id=id, **kwargs)
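
# Illustrative usage of modify_host (hostname hypothetical). Locking a host
# this way propagates the lock to the shard that owns it:
#     modify_host('myhost', locked=True, lock_reason='under repair')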


def modify_host_local(id, **kwargs):
    """Modify host attributes in local DB.

    @param id: Host id.
    @param kwargs: key=value pairs of values to set on the host.
    """
    models.Host.smart_get(id).update_object(kwargs)


@rpc_utils.route_rpc_to_master
def modify_hosts(host_filter_data, update_data):
    """Modify local attributes of multiple hosts.

    If this is called on the master, but one of the hosts that match the
    filters is assigned to a shard, this will call the `modify_hosts_local`
    RPC on the responsible shard.
    When this is called on a shard, the shard just routes the RPC to the
    master and does nothing.

    The filters are always applied on the master, not on the shards. This
    means that if the states of a host differ on the master and a shard, the
    state on the master will be used. For example, suppose a host was synced
    to Shard 1, and on Shard 1 the status of the host was set to
    'Repair Failed':
    - A call to modify_hosts with host_filter_data={'status': 'Ready'} will
      update the host (both on the shard and on the master), because the
      state of the host as the master knows it is still 'Ready'.
    - A call to modify_hosts with host_filter_data={'status': 'Repair Failed'}
      will not update the host, because the filter doesn't apply on the
      master.

    @param host_filter_data: Filters out which hosts to modify.
    @param update_data: A dictionary with the changes to make to the hosts.
    """
    update_data = update_data.copy()
    rpc_utils.check_modify_host(update_data)
    hosts = models.Host.query_objects(host_filter_data)

    affected_shard_hostnames = set()
    affected_host_ids = []

    # Check all hosts before changing data for exception safety.
    for host in hosts:
        try:
            rpc_utils.check_modify_host_locking(host, update_data)
        except model_logic.ValidationError as e:
            if not update_data.get('force_modify_locking', False):
                raise
            logging.exception('The following exception will be ignored and '
                              'lock modification will be enforced. %s', e)

        if host.shard:
            affected_shard_hostnames.add(host.shard.rpc_hostname())
            affected_host_ids.append(host.id)

    # This is required to make `lock_time` for a host be exactly the same
    # between the master and a shard.
    if update_data.get('locked', None) and 'lock_time' not in update_data:
        update_data['lock_time'] = datetime.datetime.now()
    for host in hosts:
        host.update_object(update_data)

    update_data.pop('force_modify_locking', None)
    # Caution: Changing the filter from the original here. See docstring.
    rpc_utils.run_rpc_on_multiple_hostnames(
            'modify_hosts_local', affected_shard_hostnames,
            host_filter_data={'id__in': affected_host_ids},
            update_data=update_data)
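
# Illustrative usage of modify_hosts (filter values hypothetical). Note that
# the filter is evaluated against the master's view of host state:
#     modify_hosts(host_filter_data={'status': 'Ready'},
#                  update_data={'locked': True, 'lock_reason': 'migration'})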


def modify_hosts_local(host_filter_data, update_data):
    """Modify attributes of hosts in local DB.

    @param host_filter_data: Filters out which hosts to modify.
    @param update_data: A dictionary with the changes to make to the hosts.
    """
    for host in models.Host.query_objects(host_filter_data):
        host.update_object(update_data)


def add_labels_to_host(id, labels):
    """Adds labels to a given host only in local DB.

    @param id: id or hostname for a host.
    @param labels: ids or names for labels.
    """
    label_objs = models.Label.smart_get_bulk(labels)
    models.Host.smart_get(id).labels.add(*label_objs)


@rpc_utils.route_rpc_to_master
def host_add_labels(id, labels):
    """Adds labels to a given host.

    @param id: id or hostname for a host.
    @param labels: ids or names for labels.

    @raises ValidationError: If adding more than one platform label, or a
            non-compatible set of board labels.
    """
    # Create the labels on the master/shards.
    for label in labels:
        _create_label_everywhere(label, [id])

    label_objs = models.Label.smart_get_bulk(labels)
    platforms = [label.name for label in label_objs if label.platform]
    boards = [label.name for label in label_objs
              if label.name.startswith('board:')]
    if len(platforms) > 1 or not utils.board_labels_allowed(boards):
        raise model_logic.ValidationError(
            {'labels': ('Adding more than one platform label, or a list of '
                        'non-compatible board labels: %s %s' %
                        (', '.join(platforms), ', '.join(boards)))})

    host_obj = models.Host.smart_get(id)
    if platforms:
        models.Host.check_no_platform([host_obj])
    if boards:
        models.Host.check_board_labels_allowed([host_obj], labels)
    add_labels_to_host(id, labels)

    rpc_utils.fanout_rpc([host_obj], 'add_labels_to_host', False,
                         id=id, labels=labels)


def remove_labels_from_host(id, labels):
    """Removes labels from a given host only in local DB.

    @param id: id or hostname for a host.
    @param labels: ids or names for labels.
    """
    label_objs = models.Label.smart_get_bulk(labels)
    models.Host.smart_get(id).labels.remove(*label_objs)


@rpc_utils.route_rpc_to_master
def host_remove_labels(id, labels):
    """Removes labels from a given host.

    @param id: id or hostname for a host.
    @param labels: ids or names for labels.
    """
    remove_labels_from_host(id, labels)

    host_obj = models.Host.smart_get(id)
    rpc_utils.fanout_rpc([host_obj], 'remove_labels_from_host', False,
                         id=id, labels=labels)


def get_host_attribute(attribute, **host_filter_data):
    """
    @param attribute: string name of attribute
    @param host_filter_data: filter data to apply to Hosts to choose hosts to
                             act upon
    """
    hosts = rpc_utils.get_host_query((), False, True, host_filter_data)
    hosts = list(hosts)
    models.Host.objects.populate_relationships(hosts, models.HostAttribute,
                                               'attribute_list')
    host_attr_dicts = []
    for host_obj in hosts:
        for attr_obj in host_obj.attribute_list:
            if attr_obj.attribute == attribute:
                host_attr_dicts.append(attr_obj.get_object_dict())
    return rpc_utils.prepare_for_serialization(host_attr_dicts)


def set_host_attribute(attribute, value, **host_filter_data):
    """
    @param attribute: string name of attribute
    @param value: string, or None to delete an attribute
    @param host_filter_data: filter data to apply to Hosts to choose hosts to
                             act upon
    """
    assert host_filter_data  # disallow accidental actions on all hosts
    hosts = models.Host.query_objects(host_filter_data)
    models.AclGroup.check_for_acl_violation_hosts(hosts)
    for host in hosts:
        host.set_or_delete_attribute(attribute, value)

    # Master forwards this RPC to shards.
    if not utils.is_shard():
        rpc_utils.fanout_rpc(hosts, 'set_host_attribute', False,
                attribute=attribute, value=value, **host_filter_data)
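
# Illustrative usage of set_host_attribute (names hypothetical). Passing
# value=None would delete the attribute instead:
#     set_host_attribute('some_attribute', 'some_value', hostname='myhost')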


@rpc_utils.forward_single_host_rpc_to_shard
def delete_host(id):
    models.Host.smart_get(id).delete()


def get_hosts(multiple_labels=(), exclude_only_if_needed_labels=False,
              valid_only=True, include_current_job=False, **filter_data):
    """Get a list of dictionaries that contain information about hosts.

    @param multiple_labels: match hosts in all of the labels given.  Should
            be a list of label names.
    @param exclude_only_if_needed_labels: Exclude hosts with at least one
            "only_if_needed" label applied.
    @param include_current_job: Set to True to include ids of the currently
            running job and special task.
    """
    hosts = rpc_utils.get_host_query(multiple_labels,
                                     exclude_only_if_needed_labels,
                                     valid_only, filter_data)
    hosts = list(hosts)
    models.Host.objects.populate_relationships(hosts, models.Label,
                                               'label_list')
    models.Host.objects.populate_relationships(hosts, models.AclGroup,
                                               'acl_list')
    models.Host.objects.populate_relationships(hosts, models.HostAttribute,
                                               'attribute_list')
    host_dicts = []
    for host_obj in hosts:
        host_dict = host_obj.get_object_dict()
        host_dict['labels'] = [label.name for label in host_obj.label_list]
        host_dict['platform'] = rpc_utils.find_platform(host_obj)
        host_dict['acls'] = [acl.name for acl in host_obj.acl_list]
        host_dict['attributes'] = dict((attribute.attribute, attribute.value)
                                       for attribute in host_obj.attribute_list)
        if include_current_job:
            host_dict['current_job'] = None
            host_dict['current_special_task'] = None
            entries = models.HostQueueEntry.objects.filter(
                    host_id=host_dict['id'], active=True, complete=False)
            if entries:
                host_dict['current_job'] = (
                        entries[0].get_object_dict()['job'])
            tasks = models.SpecialTask.objects.filter(
                    host_id=host_dict['id'], is_active=True, is_complete=False)
            if tasks:
                host_dict['current_special_task'] = (
                        '%d-%s' % (tasks[0].get_object_dict()['id'],
                                   tasks[0].get_object_dict()['task'].lower()))
        host_dicts.append(host_dict)
    return rpc_utils.prepare_for_serialization(host_dicts)
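
# Illustrative usage of get_hosts (label names hypothetical):
#     get_hosts(multiple_labels=['board:lumpy', 'pool:bvt'], locked=False)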


def get_num_hosts(multiple_labels=(), exclude_only_if_needed_labels=False,
                  valid_only=True, **filter_data):
    """
    Same parameters as get_hosts().

    @returns The number of matching hosts.
    """
    hosts = rpc_utils.get_host_query(multiple_labels,
                                     exclude_only_if_needed_labels,
                                     valid_only, filter_data)
    return hosts.count()


# tests

def get_tests(**filter_data):
    return rpc_utils.prepare_for_serialization(
        models.Test.list_objects(filter_data))


def get_tests_status_counts_by_job_name_label(job_name_prefix, label_name):
    """Gets the counts of all passed and failed tests from the matching jobs.

    @param job_name_prefix: Name prefix of the jobs to get the summary
           from, e.g., 'butterfly-release/R40-6457.21.0/bvt-cq/'.
    @param label_name: Label that must be set in the jobs, e.g.,
            'cros-version:butterfly-release/R40-6457.21.0'.

    @returns A summary of the counts of all the passed and failed tests.
    """
    job_ids = list(models.Job.objects.filter(
            name__startswith=job_name_prefix,
            dependency_labels__name=label_name).values_list(
                'pk', flat=True))
    summary = {'passed': 0, 'failed': 0}
    if not job_ids:
        return summary

    counts = (tko_models.TestView.objects.filter(
            afe_job_id__in=job_ids).exclude(
                test_name='SERVER_JOB').exclude(
                    test_name__startswith='CLIENT_JOB').values(
                        'status').annotate(
                            count=Count('status')))
    for status in counts:
        if status['status'] == 'GOOD':
            summary['passed'] += status['count']
        else:
            summary['failed'] += status['count']
    return summary
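
# Illustrative usage (job name prefix and label hypothetical); the result is
# a dict such as {'passed': 42, 'failed': 3}:
#     get_tests_status_counts_by_job_name_label(
#             'lumpy-release/R40-6457.21.0/bvt-cq/',
#             'cros-version:lumpy-release/R40-6457.21.0')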


# profilers

def add_profiler(name, description=None):
    return models.Profiler.add_object(name=name, description=description).id


def modify_profiler(id, **data):
    models.Profiler.smart_get(id).update_object(data)


def delete_profiler(id):
    models.Profiler.smart_get(id).delete()


def get_profilers(**filter_data):
    return rpc_utils.prepare_for_serialization(
        models.Profiler.list_objects(filter_data))


# users

def get_users(**filter_data):
    return rpc_utils.prepare_for_serialization(
        models.User.list_objects(filter_data))


# acl groups

def add_acl_group(name, description=None):
    group = models.AclGroup.add_object(name=name, description=description)
    group.users.add(models.User.current_user())
    return group.id


def modify_acl_group(id, **data):
    group = models.AclGroup.smart_get(id)
    group.check_for_acl_violation_acl_group()
    group.update_object(data)
    group.add_current_user_if_empty()


def acl_group_add_users(id, users):
    group = models.AclGroup.smart_get(id)
    group.check_for_acl_violation_acl_group()
    users = models.User.smart_get_bulk(users)
    group.users.add(*users)


def acl_group_remove_users(id, users):
    group = models.AclGroup.smart_get(id)
    group.check_for_acl_violation_acl_group()
    users = models.User.smart_get_bulk(users)
    group.users.remove(*users)
    group.add_current_user_if_empty()


def acl_group_add_hosts(id, hosts):
    group = models.AclGroup.smart_get(id)
    group.check_for_acl_violation_acl_group()
    hosts = models.Host.smart_get_bulk(hosts)
    group.hosts.add(*hosts)
    group.on_host_membership_change()


def acl_group_remove_hosts(id, hosts):
    group = models.AclGroup.smart_get(id)
    group.check_for_acl_violation_acl_group()
    hosts = models.Host.smart_get_bulk(hosts)
    group.hosts.remove(*hosts)
    group.on_host_membership_change()


def delete_acl_group(id):
    models.AclGroup.smart_get(id).delete()


def get_acl_groups(**filter_data):
    acl_groups = models.AclGroup.list_objects(filter_data)
    for acl_group in acl_groups:
        acl_group_obj = models.AclGroup.objects.get(id=acl_group['id'])
        acl_group['users'] = [user.login
                              for user in acl_group_obj.users.all()]
        acl_group['hosts'] = [host.hostname
                              for host in acl_group_obj.hosts.all()]
    return rpc_utils.prepare_for_serialization(acl_groups)


# jobs

def generate_control_file(tests=(), profilers=(),
                          client_control_file='', use_container=False,
                          profile_only=None, db_tests=True,
                          test_source_build=None):
    """
    Generates a client-side control file to run tests.

    @param tests List of tests to run. See db_tests for more information.
    @param profilers List of profilers to activate during the job.
    @param client_control_file The contents of a client-side control file to
        run at the end of all tests.  If this is supplied, all tests must be
        client side.
        TODO: in the future we should support server control files directly
        to wrap with a kernel.  That'll require changing the parameter
        name and adding a boolean to indicate if it is a client or server
        control file.
    @param use_container unused argument today.  TODO: Enable containers
        on the host during a client side test.
    @param profile_only A boolean that indicates what default profile_only
        mode to use in the control file. Passing None will generate a
        control file that does not explicitly set the default mode at all.
    @param db_tests: if True, the test object can be found in the database
                     backing the test model. In this case, tests is a tuple
                     of test IDs which are used to retrieve the test objects
                     from the database. If False, tests is a tuple of test
                     dictionaries stored client-side in the AFE.
    @param test_source_build: Build to be used to retrieve test code. Default
                              to None.

    @returns a dict with the following keys:
        control_file: str, The control file text.
        is_server: bool, is the control file a server-side control file?
        synch_count: How many machines the job uses per autoserv execution.
            synch_count == 1 means the job is asynchronous.
        dependencies: A list of the names of labels on which the job depends.
    """
    if not tests and not client_control_file:
        return dict(control_file='', is_server=False, synch_count=1,
                    dependencies=[])

    cf_info, test_objects, profiler_objects = (
        rpc_utils.prepare_generate_control_file(tests, profilers,
                                                db_tests))
    cf_info['control_file'] = control_file_lib.generate_control(
        tests=test_objects, profilers=profiler_objects,
        is_server=cf_info['is_server'],
        client_control_file=client_control_file, profile_only=profile_only,
        test_source_build=test_source_build)
    return cf_info
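
# Illustrative usage of generate_control_file (test ids hypothetical). With
# db_tests=True (the default), tests holds AFE database test IDs:
#     generate_control_file(tests=(1, 2))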


def create_job_page_handler(name, priority, control_file, control_type,
                            image=None, hostless=False, firmware_rw_build=None,
                            firmware_ro_build=None, test_source_build=None,
                            is_cloning=False, **kwargs):
    """\
    Create and enqueue a job.

    @param name name of this job
    @param priority Integer priority of this job.  Higher is more important.
    @param control_file String contents of the control file.
    @param control_type Type of control file, Client or Server.
    @param image: ChromeOS build to be installed in the dut. Default to None.
    @param firmware_rw_build: Firmware build to update RW firmware. Default to
                              None, i.e., RW firmware will not be updated.
    @param firmware_ro_build: Firmware build to update RO firmware. Default to
                              None, i.e., RO firmware will not be updated.
    @param test_source_build: Build to be used to retrieve test code. Default
                              to None.
    @param is_cloning: True if creating a cloning job.
    @param kwargs extra args that will be required by create_suite_job or
                  create_job.

    @returns The created Job id number.
    """
    if is_cloning:
        logging.info('Start to clone a new job')
        # When cloning a job, hosts and meta_hosts should not exist together,
        # which would cause host-scheduler to schedule two hqe jobs to one host
        # at the same time, and crash itself. Clear meta_hosts for this case.
        if kwargs.get('hosts') and kwargs.get('meta_hosts'):
            kwargs['meta_hosts'] = []
    else:
        logging.info('Start to create a new job')
    control_file = rpc_utils.encode_ascii(control_file)
    if not control_file:
        raise model_logic.ValidationError({
                'control_file' : "Control file cannot be empty"})

    if image and hostless:
        builds = {}
        builds[provision.CROS_VERSION_PREFIX] = image
        if firmware_rw_build:
            builds[provision.FW_RW_VERSION_PREFIX] = firmware_rw_build
        if firmware_ro_build:
            builds[provision.FW_RO_VERSION_PREFIX] = firmware_ro_build
        return create_suite_job(
                name=name, control_file=control_file, priority=priority,
                builds=builds, test_source_build=test_source_build,
                is_cloning=is_cloning, **kwargs)
    return create_job(name, priority, control_file, control_type, image=image,
                      hostless=hostless, **kwargs)


@rpc_utils.route_rpc_to_master
def create_job(
        name,
        priority,
        control_file,
        control_type,
        hosts=(),
        meta_hosts=(),
        one_time_hosts=(),
        synch_count=None,
        is_template=False,
        timeout=None,
        timeout_mins=None,
        max_runtime_mins=None,
        run_verify=False,
        email_list='',
        dependencies=(),
        reboot_before=None,
        reboot_after=None,
        parse_failed_repair=None,
        hostless=False,
        keyvals=None,
        drone_set=None,
        image=None,
        parent_job_id=None,
        test_retry=0,
        run_reset=True,
        require_ssp=None,
        args=(),
        **kwargs):
    """\
    Create and enqueue a job.

    @param name name of this job
    @param priority Integer priority of this job.  Higher is more important.
    @param control_file String contents of the control file.
    @param control_type Type of control file, Client or Server.
    @param synch_count How many machines the job uses per autoserv execution.
        synch_count == 1 means the job is asynchronous.  If an atomic group is
        given this value is treated as a minimum.
    @param is_template If true then create a template job.
    @param timeout Hours after this call returns until the job times out.
    @param timeout_mins Minutes after this call returns until the job times
        out.
    @param max_runtime_mins Minutes from job starting time until job times out.
    @param run_verify Should the host be verified before running the test?
    @param email_list String containing emails to mail when the job is done.
    @param dependencies List of label names on which this job depends.
    @param reboot_before Never, If dirty, or Always
    @param reboot_after Never, If all tests passed, or Always
    @param parse_failed_repair if true, results of failed repairs launched by
        this job will be parsed as part of the job.
    @param hostless if true, create a hostless job
    @param keyvals dict of keyvals to associate with the job
    @param hosts List of hosts to run job on.
    @param meta_hosts List where each entry is a label name, and for each entry
        one host will be chosen from that label to run the job on.
    @param one_time_hosts List of hosts not in the database to run the job on.
    @param drone_set The name of the drone set to run this test on.
    @param image OS image to install before running job.
    @param parent_job_id id of a job considered to be parent of created job.
    @param test_retry Number of times to retry test if the test did not
        complete successfully. (optional, default: 0)
    @param run_reset Should the host be reset before running the test?
    @param require_ssp Set to True to require server-side packaging to run the
                       test. If it's set to None, drone will still try to run
                       the server side with server-side packaging. If the
                       autotest-server package doesn't exist for the build or
                       image is not set, drone will run the test without
                       server-side packaging. Default is None.
    @param args A list of args to be injected into control file.
    @param kwargs extra keyword args. NOT USED.

    @returns The created Job id number.
    """
    if args:
        control_file = tools.inject_vars({'args': args}, control_file)
    if image:
        dependencies += (provision.image_version_to_label(image),)
    return rpc_utils.create_job_common(
            name=name,
            priority=priority,
            control_type=control_type,
            control_file=control_file,
            hosts=hosts,
            meta_hosts=meta_hosts,
            one_time_hosts=one_time_hosts,
            synch_count=synch_count,
            is_template=is_template,
            timeout=timeout,
            timeout_mins=timeout_mins,
            max_runtime_mins=max_runtime_mins,
            run_verify=run_verify,
            email_list=email_list,
            dependencies=dependencies,
            reboot_before=reboot_before,
            reboot_after=reboot_after,
            parse_failed_repair=parse_failed_repair,
            hostless=hostless,
            keyvals=keyvals,
            drone_set=drone_set,
            parent_job_id=parent_job_id,
            test_retry=test_retry,
            run_reset=run_reset,
            require_ssp=require_ssp)
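
# Illustrative usage of create_job (values hypothetical). control_text is a
# string holding the control file contents; priority is an integer, higher
# being more important:
#     create_job('sleeptest-run', priority=30, control_file=control_text,
#                control_type='Client', hosts=['myhost'])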


def abort_host_queue_entries(**filter_data):
    """\
    Abort a set of host queue entries.

    @return: A list of dictionaries, each containing information
             about an aborted HQE.
    """
    query = models.HostQueueEntry.query_objects(filter_data)

    # Don't allow aborts on:
    #   1. Jobs that have already completed (whether or not they were aborted)
    #   2. Jobs that have already been aborted (but may not have completed)
    query = query.filter(complete=False).filter(aborted=False)
    models.AclGroup.check_abort_permissions(query)
    host_queue_entries = list(query.select_related())
    rpc_utils.check_abort_synchronous_jobs(host_queue_entries)

    models.HostQueueEntry.abort_host_queue_entries(host_queue_entries)
    hqe_info = [{'HostQueueEntry': hqe.id, 'Job': hqe.job_id,
                 'Job name': hqe.job.name} for hqe in host_queue_entries]
    return hqe_info
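
# Illustrative usage of abort_host_queue_entries (job id hypothetical):
#     abort_host_queue_entries(job__id=42)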


def abort_special_tasks(**filter_data):
    """\
    Abort the special task, or tasks, specified in the filter.
    """
    query = models.SpecialTask.query_objects(filter_data)
    special_tasks = query.filter(is_active=True)
    for task in special_tasks:
        task.abort()


def _call_special_tasks_on_hosts(task, hosts):
    """\
    Schedules a set of hosts for a special task.

    @returns A list of hostnames that a special task was created for.
    """
    models.AclGroup.check_for_acl_violation_hosts(hosts)
    shard_host_map = rpc_utils.bucket_hosts_by_shard(hosts)
    if shard_host_map and not utils.is_shard():
        raise ValueError('The following hosts are on shards, please '
                         'follow the link to the shards and create jobs '
                         'there instead. %s.' % shard_host_map)
    for host in hosts:
        models.SpecialTask.schedule_special_task(host, task)
    return list(sorted(host.hostname for host in hosts))


def _forward_special_tasks_on_hosts(task, rpc, **filter_data):
    """Forward special tasks to the corresponding shards.

    On the master, when special tasks are fired on hosts that are sharded,
    forward the RPC to the corresponding shards.

    On a shard, create special task records in the local DB.

    @param task: Enum value of frontend.afe.models.SpecialTask.Task
    @param rpc: RPC name to forward.
    @param filter_data: Filter keywords to be used for DB query.

    @return: A list of hostnames that a special task was created for.
    """
    hosts = models.Host.query_objects(filter_data)
    shard_host_map = rpc_utils.bucket_hosts_by_shard(hosts, rpc_hostnames=True)

    # Filter out hosts on a shard from those on the master, forward
    # rpcs to the shard with an additional hostname__in filter, and
    # create a local SpecialTask for each remaining host.
    if shard_host_map and not utils.is_shard():
        hosts = [h for h in hosts if h.shard is None]
        for shard, hostnames in shard_host_map.iteritems():

            # The main client of this module is the frontend website, and
            # it invokes it with an 'id' or an 'id__in' filter. Regardless,
            # the 'hostname' filter should narrow down the list of hosts on
            # each shard even though we supply all the ids in filter_data.
            # This method uses hostname instead of id because it fits better
            # with the overall architecture of redirection functions in
            # rpc_utils.
            shard_filter = filter_data.copy()
            shard_filter['hostname__in'] = hostnames
            rpc_utils.run_rpc_on_multiple_hostnames(
                    rpc, [shard], **shard_filter)

    # There is a race condition here if someone assigns a shard to one of these
    # hosts before we create the task. The host will stay on the master if:
    # 1. The host is not Ready
    # 2. The host is Ready but has a task
    # But if the host is Ready and doesn't have a task yet, it will get sent
    # to the shard as we're creating a task here.

    # Given that we only rarely verify Ready hosts it isn't worth putting this
    # entire method in a transaction. The worst case scenario is that we have
    # a verify running on a Ready host while the shard is using it; if the
    # verify fails no subsequent tasks will be created against the host on the
    # master, and verifies are safe enough that this is OK.
    return _call_special_tasks_on_hosts(task, hosts)


def reverify_hosts(**filter_data):
    """\
    Schedules a set of hosts for verify.

    @returns A list of hostnames that a verify task was created for.
    """
    return _forward_special_tasks_on_hosts(
            models.SpecialTask.Task.VERIFY, 'reverify_hosts', **filter_data)
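
# Illustrative usage of reverify_hosts (hostnames hypothetical):
#     reverify_hosts(hostname__in=['host1', 'host2'])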


def repair_hosts(**filter_data):
    """\
    Schedules a set of hosts for repair.

    @returns A list of hostnames that a repair task was created for.
    """
    return _forward_special_tasks_on_hosts(
            models.SpecialTask.Task.REPAIR, 'repair_hosts', **filter_data)


def get_jobs(not_yet_run=False, running=False, finished=False,
             suite=False, sub=False, standalone=False, **filter_data):
    """\
    Extra status filter args for get_jobs:
    -not_yet_run: Include only jobs that have not yet started running.
    -running: Include only jobs that have started running but for which not
    all hosts have completed.
    -finished: Include only jobs for which all hosts have completed (or
    aborted).

    Extra type filter args for get_jobs:
    -suite: Include only jobs with child jobs.
    -sub: Include only jobs with a parent job.
    -standalone: Include only jobs with no child or parent jobs.
    At most one of these three fields should be specified.
    """
    extra_args = rpc_utils.extra_job_status_filters(not_yet_run,
                                                    running,
                                                    finished)
    filter_data['extra_args'] = rpc_utils.extra_job_type_filters(extra_args,
                                                                 suite,
                                                                 sub,
                                                                 standalone)
    job_dicts = []
    jobs = list(models.Job.query_objects(filter_data))
    models.Job.objects.populate_relationships(jobs, models.Label,
                                              'dependencies')
    models.Job.objects.populate_relationships(jobs, models.JobKeyval, 'keyvals')
    for job in jobs:
        job_dict = job.get_object_dict()
        job_dict['dependencies'] = ','.join(label.name
                                            for label in job.dependencies)
        job_dict['keyvals'] = dict((keyval.key, keyval.value)
                                   for keyval in job.keyvals)
        job_dicts.append(job_dict)
    return rpc_utils.prepare_for_serialization(job_dicts)
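
# Illustrative usage of get_jobs, mirroring the module docstring examples:
#     get_jobs(owner='showard', running=True)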


def get_num_jobs(not_yet_run=False, running=False, finished=False,
                 suite=False, sub=False, standalone=False,
                 **filter_data):
    """\
    See get_jobs() for documentation of extra filter parameters.
    """
    extra_args = rpc_utils.extra_job_status_filters(not_yet_run,
                                                    running,
                                                    finished)
    filter_data['extra_args'] = rpc_utils.extra_job_type_filters(extra_args,
                                                                 suite,
                                                                 sub,
                                                                 standalone)
    return models.Job.query_count(filter_data)


def get_jobs_summary(**filter_data):
    """\
    Like get_jobs(), but adds 'status_counts' and 'result_counts' fields.

    The 'status_counts' field is a dictionary mapping status strings to the
    number of hosts currently with that status, i.e. {'Queued': 4,
    'Running': 2}.

    The 'result_counts' field is piped to tko's rpc_interface and has the
    return format specified under get_group_counts.
    """
    jobs = get_jobs(**filter_data)
    ids = [job['id'] for job in jobs]
    all_status_counts = models.Job.objects.get_status_counts(ids)
    for job in jobs:
        job['status_counts'] = all_status_counts[job['id']]
        job['result_counts'] = tko_rpc_interface.get_status_counts(
                ['afe_job_id', 'afe_job_id'],
                header_groups=[['afe_job_id'], ['afe_job_id']],
                **{'afe_job_id': job['id']})
    return rpc_utils.prepare_for_serialization(jobs)


def get_info_for_clone(id, preserve_metahosts, queue_entry_filter_data=None):
    """\
    Retrieves all the information needed to clone a job.
    """
    job = models.Job.objects.get(id=id)
    job_info = rpc_utils.get_job_info(job,
                                      preserve_metahosts,
                                      queue_entry_filter_data)

    host_dicts = []
    for host in job_info['hosts']:
        host_dict = get_hosts(id=host.id)[0]
        other_labels = host_dict['labels']
        if host_dict['platform']:
            other_labels.remove(host_dict['platform'])
        host_dict['other_labels'] = ', '.join(other_labels)
        host_dicts.append(host_dict)

    for host in job_info['one_time_hosts']:
        host_dict = dict(hostname=host.hostname,
                         id=host.id,
                         platform='(one-time host)',
                         locked_text='')
        host_dicts.append(host_dict)

    # convert keys from Label objects to strings (names of labels)
    meta_host_counts = dict((meta_host.name, count) for meta_host, count
                            in job_info['meta_host_counts'].iteritems())

    info = dict(job=job.get_object_dict(),
                meta_host_counts=meta_host_counts,
                hosts=host_dicts)
    info['job']['dependencies'] = job_info['dependencies']
    info['hostless'] = job_info['hostless']
    info['drone_set'] = job.drone_set and job.drone_set.name

    image = _get_image_for_job(job, job_info['hostless'])
    if image:
        info['job']['image'] = image

    return rpc_utils.prepare_for_serialization(info)
   1144 
   1145 
   1146 def _get_image_for_job(job, hostless):
   1147     """Gets the image used for a job.
   1148 
   1149     Gets the image used for an AFE job from the job's keyvals 'build' or
   1150     'builds'. If that fails, and the job is a hostless job, tries to
   1151     get the image from its control file attributes 'build' or 'builds'.
   1152 
   1153     TODO(ntang): Needs to handle FAFT with two builds for ro/rw.
   1154 
   1155     @param job      An AFE job object.
   1156     @param hostless Boolean indicating whether the job is hostless.
   1157 
   1158     @returns The image build used for the job.
   1159     """
   1160     keyvals = job.keyval_dict()
   1161     image = keyvals.get('build')
   1162     if not image:
   1163         value = keyvals.get('builds')
   1164         builds = None
   1165         if isinstance(value, dict):
   1166             builds = value
   1167         elif isinstance(value, basestring):
   1168             builds = ast.literal_eval(value)
   1169         if builds:
   1170             image = builds.get('cros-version')
   1171     if not image and hostless and job.control_file:
   1172         try:
   1173             control_obj = control_data.parse_control_string(
   1174                     job.control_file)
   1175             if hasattr(control_obj, 'build'):
   1176                 image = getattr(control_obj, 'build')
   1177             if not image and hasattr(control_obj, 'builds'):
   1178                 builds = getattr(control_obj, 'builds')
   1179                 image = builds.get('cros-version')
   1180         except:
   1181             logging.warning('Failed to parse control file for job: %s',
   1182                             job.name)
   1183     return image
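
         # Example (hypothetical keyvals): for a job whose keyvals contain
         #   {'builds': "{'cros-version': 'lumpy-release/R30-4000.0.0'}"}
         # _get_image_for_job(job, True) returns 'lumpy-release/R30-4000.0.0'.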
   1184 
   1185 
   1186 def get_host_queue_entries(start_time=None, end_time=None, **filter_data):
   1187     """\
   1188     @returns A sequence of nested dictionaries of host and job information.
   1189     """
   1190     filter_data = rpc_utils.inject_times_to_filter('started_on__gte',
   1191                                                    'started_on__lte',
   1192                                                    start_time,
   1193                                                    end_time,
   1194                                                    **filter_data)
   1195     return rpc_utils.prepare_rows_as_nested_dicts(
   1196             models.HostQueueEntry.query_objects(filter_data),
   1197             ('host', 'job'))
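
         # Example (hypothetical filter values): entries for one job that started
         # after a given time:
         #
         #   get_host_queue_entries(start_time='2014-08-07 00:00:00', job__id=123)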
   1198 
   1199 
   1200 def get_num_host_queue_entries(start_time=None, end_time=None, **filter_data):
   1201     """\
    1202     Get the number of host queue entries matching the given filter data.
   1203     """
   1204     filter_data = rpc_utils.inject_times_to_filter('started_on__gte',
   1205                                                    'started_on__lte',
   1206                                                    start_time,
   1207                                                    end_time,
   1208                                                    **filter_data)
   1209     return models.HostQueueEntry.query_count(filter_data)
   1210 
   1211 
   1212 def get_hqe_percentage_complete(**filter_data):
   1213     """
   1214     Computes the fraction of host queue entries matching the given filter data
   1215     that are complete.
   1216     """
   1217     query = models.HostQueueEntry.query_objects(filter_data)
   1218     complete_count = query.filter(complete=True).count()
   1219     total_count = query.count()
   1220     if total_count == 0:
   1221         return 1
   1222     return float(complete_count) / total_count
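
         # Example (hypothetical filter key): the completed fraction of one job's
         # entries, e.g. 0.75:
         #
         #   get_hqe_percentage_complete(job__id=123)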
   1223 
   1224 
   1225 # special tasks
   1226 
   1227 def get_special_tasks(**filter_data):
   1228     """Get special task entries from the local database.
   1229 
   1230     Query the special tasks table for tasks matching the given
   1231     `filter_data`, and return a list of the results.  No attempt is
   1232     made to forward the call to shards; the buck will stop here.
   1233     The caller is expected to know the target shard for such reasons
   1234     as:
   1235       * The caller is a service (such as gs_offloader) configured
   1236         to operate on behalf of one specific shard, and no other.
   1237       * The caller has a host as a parameter, and knows that this is
   1238         the shard assigned to that host.
   1239 
   1240     @param filter_data  Filter keywords to pass to the underlying
   1241                         database query.
   1242 
   1243     """
   1244     return rpc_utils.prepare_rows_as_nested_dicts(
   1245             models.SpecialTask.query_objects(filter_data),
   1246             ('host', 'queue_entry'))
   1247 
   1248 
   1249 def get_host_special_tasks(host_id, **filter_data):
   1250     """Get special task entries for a given host.
   1251 
   1252     Query the special tasks table for tasks that ran on the host
   1253     given by `host_id` and matching the given `filter_data`.
   1254     Return a list of the results.  If the host is assigned to a
   1255     shard, forward this call to that shard.
   1256 
   1257     @param host_id      Id in the database of the target host.
   1258     @param filter_data  Filter keywords to pass to the underlying
   1259                         database query.
   1260 
   1261     """
   1262     # Retrieve host data even if the host is in an invalid state.
   1263     host = models.Host.smart_get(host_id, False)
   1264     if not host.shard:
   1265         return get_special_tasks(host_id=host_id, **filter_data)
   1266     else:
   1267         # The return values from AFE methods are post-processed
   1268         # objects that aren't JSON-serializable.  So, we have to
   1269         # call AFE.run() to get the raw, serializable output from
   1270         # the shard.
   1271         shard_afe = frontend.AFE(server=host.shard.rpc_hostname())
   1272         return shard_afe.run('get_special_tasks',
   1273                              host_id=host_id, **filter_data)
   1274 
   1275 
   1276 def get_num_special_tasks(**kwargs):
   1277     """Get the number of special task entries from the local database.
   1278 
   1279     Query the special tasks table for tasks matching the given 'kwargs',
   1280     and return the number of the results. No attempt is made to forward
   1281     the call to shards; the buck will stop here.
   1282 
   1283     @param kwargs    Filter keywords to pass to the underlying database query.
   1284 
   1285     """
   1286     return models.SpecialTask.query_count(kwargs)
   1287 
   1288 
   1289 def get_host_num_special_tasks(host, **kwargs):
   1290     """Get special task entries for a given host.
   1291 
   1292     Query the special tasks table for tasks that ran on the host
   1293     given by 'host' and matching the given 'kwargs'.
    1294     Return the number of matching tasks.  If the host is assigned to a
    1295     shard, forward this call to that shard.
   1296 
    1297     @param host      Id or name of a host; most often a hostname.
   1298     @param kwargs    Filter keywords to pass to the underlying database query.
   1299 
   1300     """
   1301     # Retrieve host data even if the host is in an invalid state.
   1302     host_model = models.Host.smart_get(host, False)
   1303     if not host_model.shard:
   1304         return get_num_special_tasks(host=host, **kwargs)
   1305     else:
   1306         shard_afe = frontend.AFE(server=host_model.shard.rpc_hostname())
   1307         return shard_afe.run('get_num_special_tasks', host=host, **kwargs)
   1308 
   1309 
   1310 def get_status_task(host_id, end_time):
   1311     """Get the "status task" for a host from the local shard.
   1312 
   1313     Returns a single special task representing the given host's
   1314     "status task".  The status task is a completed special task that
   1315     identifies whether the corresponding host was working or broken
   1316     when it completed.  A successful task indicates a working host;
   1317     a failed task indicates broken.
   1318 
    1319     This call will not be forwarded to a shard; the receiving server
   1320     must be the shard that owns the host.
   1321 
   1322     @param host_id      Id in the database of the target host.
   1323     @param end_time     Time reference for the host's status.
   1324 
   1325     @return A single task; its status (successful or not)
   1326             corresponds to the status of the host (working or
   1327             broken) at the given time.  If no task is found, return
   1328             `None`.
   1329 
   1330     """
   1331     tasklist = rpc_utils.prepare_rows_as_nested_dicts(
   1332             status_history.get_status_task(host_id, end_time),
   1333             ('host', 'queue_entry'))
   1334     return tasklist[0] if tasklist else None
   1335 
   1336 
   1337 def get_host_status_task(host_id, end_time):
   1338     """Get the "status task" for a host from its owning shard.
   1339 
   1340     Finds the given host's owning shard, and forwards to it a call
   1341     to `get_status_task()` (see above).
   1342 
   1343     @param host_id      Id in the database of the target host.
   1344     @param end_time     Time reference for the host's status.
   1345 
   1346     @return A single task; its status (successful or not)
   1347             corresponds to the status of the host (working or
   1348             broken) at the given time.  If no task is found, return
   1349             `None`.
   1350 
   1351     """
   1352     host = models.Host.smart_get(host_id)
   1353     if not host.shard:
   1354         return get_status_task(host_id, end_time)
   1355     else:
   1356         # The return values from AFE methods are post-processed
   1357         # objects that aren't JSON-serializable.  So, we have to
   1358         # call AFE.run() to get the raw, serializable output from
   1359         # the shard.
   1360         shard_afe = frontend.AFE(server=host.shard.rpc_hostname())
   1361         return shard_afe.run('get_status_task',
   1362                              host_id=host_id, end_time=end_time)
   1363 
   1364 
   1365 def get_host_diagnosis_interval(host_id, end_time, success):
   1366     """Find a "diagnosis interval" for a given host.
   1367 
   1368     A "diagnosis interval" identifies a start and end time where
   1369     the host went from "working" to "broken", or vice versa.  The
   1370     interval's starting time is the starting time of the last status
   1371     task with the old status; the end time is the finish time of the
   1372     first status task with the new status.
   1373 
   1374     This routine finds the most recent diagnosis interval for the
   1375     given host prior to `end_time`, with a starting status matching
   1376     `success`.  If `success` is true, the interval will start with a
   1377     successful status task; if false the interval will start with a
   1378     failed status task.
   1379 
   1380     @param host_id      Id in the database of the target host.
   1381     @param end_time     Time reference for the diagnosis interval.
   1382     @param success      Whether the diagnosis interval should start
   1383                         with a successful or failed status task.
   1384 
   1385     @return A list of two strings.  The first is the timestamp for
   1386             the beginning of the interval; the second is the
   1387             timestamp for the end.  If the host has never changed
   1388             state, the list is empty.
   1389 
   1390     """
   1391     host = models.Host.smart_get(host_id)
   1392     if not host.shard or utils.is_shard():
   1393         return status_history.get_diagnosis_interval(
   1394                 host_id, end_time, success)
   1395     else:
   1396         shard_afe = frontend.AFE(server=host.shard.rpc_hostname())
   1397         return shard_afe.get_host_diagnosis_interval(
   1398                 host_id, end_time, success)
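
         # Example (hypothetical host id and times): the most recent
         # working-to-broken transition before the given time:
         #
         #   get_host_diagnosis_interval(42, '2014-08-07 10:00:00', True)
         #   # -> ['2014-08-06 03:15:00', '2014-08-06 03:40:00']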
   1399 
   1400 
   1401 # support for host detail view
   1402 
   1403 def get_host_queue_entries_and_special_tasks(host, query_start=None,
   1404                                              query_limit=None, start_time=None,
   1405                                              end_time=None):
   1406     """
    1407     @returns An interleaved list of HostQueueEntries and SpecialTasks,
    1408             in approximate run order.  Each dict contains keys for type, host,
   1409             job, status, started_on, execution_path, and ID.
   1410     """
   1411     total_limit = None
   1412     if query_limit is not None:
    1413         total_limit = (query_start or 0) + query_limit
   1414     filter_data_common = {'host': host,
   1415                           'query_limit': total_limit,
   1416                           'sort_by': ['-id']}
   1417 
   1418     filter_data_special_tasks = rpc_utils.inject_times_to_filter(
   1419             'time_started__gte', 'time_started__lte', start_time, end_time,
   1420             **filter_data_common)
   1421 
   1422     queue_entries = get_host_queue_entries(
   1423             start_time, end_time, **filter_data_common)
   1424     special_tasks = get_host_special_tasks(host, **filter_data_special_tasks)
   1425 
   1426     interleaved_entries = rpc_utils.interleave_entries(queue_entries,
   1427                                                        special_tasks)
   1428     if query_start is not None:
   1429         interleaved_entries = interleaved_entries[query_start:]
   1430     if query_limit is not None:
   1431         interleaved_entries = interleaved_entries[:query_limit]
   1432     return rpc_utils.prepare_host_queue_entries_and_special_tasks(
   1433             interleaved_entries, queue_entries)
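
         # Example (hypothetical host): page through a host's interleaved history,
         # 20 entries at a time:
         #
         #   get_host_queue_entries_and_special_tasks('host1', query_start=0,
         #                                            query_limit=20)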
   1434 
   1435 
   1436 def get_num_host_queue_entries_and_special_tasks(host, start_time=None,
   1437                                                  end_time=None):
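             """Get the total number of HQEs and special tasks for a host."""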
   1438     filter_data_common = {'host': host}
   1439 
   1440     filter_data_queue_entries, filter_data_special_tasks = (
   1441             rpc_utils.inject_times_to_hqe_special_tasks_filters(
   1442                     filter_data_common, start_time, end_time))
   1443 
   1444     return (models.HostQueueEntry.query_count(filter_data_queue_entries)
   1445             + get_host_num_special_tasks(**filter_data_special_tasks))
   1446 
   1447 
   1448 # other
   1449 
   1450 def echo(data=""):
   1451     """\
    1452     Returns the passed-in string, as a basic test that RPC calls can be
    1453     made successfully.
   1454     """
   1455     return data
   1456 
   1457 
   1458 def get_motd():
   1459     """\
   1460     Returns the message of the day as a string.
   1461     """
   1462     return rpc_utils.get_motd()
   1463 
   1464 
   1465 def get_static_data():
   1466     """\
   1467     Returns a dictionary containing a bunch of data that shouldn't change
   1468     often and is otherwise inaccessible.  This includes:
   1469 
   1470     priorities: List of job priority choices.
   1471     default_priority: Default priority value for new jobs.
   1472     users: Sorted list of all users.
    1473     labels: Sorted list of labels, excluding provisioning labels such as
    1474             'cros-version' and 'fw-version'.
   1475     tests: Sorted list of all tests.
   1476     profilers: Sorted list of all profilers.
   1477     current_user: Logged-in username.
   1478     host_statuses: Sorted list of possible Host statuses.
   1479     job_statuses: Sorted list of possible HostQueueEntry statuses.
    1480     job_timeout_mins_default: The default job timeout length in minutes.
   1481     parse_failed_repair_default: Default value for the parse_failed_repair job
   1482             option.
   1483     reboot_before_options: A list of valid RebootBefore string enums.
   1484     reboot_after_options: A list of valid RebootAfter string enums.
   1485     motd: Server's message of the day.
    1486     status_dictionary: A mapping from one-word job status names to more
    1487             informative descriptions.
   1488     """
   1489 
   1490     default_drone_set_name = models.DroneSet.default_drone_set_name()
   1491     drone_sets = ([default_drone_set_name] +
   1492                   sorted(drone_set.name for drone_set in
   1493                          models.DroneSet.objects.exclude(
   1494                                  name=default_drone_set_name)))
   1495 
   1496     result = {}
   1497     result['priorities'] = priorities.Priority.choices()
   1498     result['default_priority'] = 'Default'
   1499     result['max_schedulable_priority'] = priorities.Priority.DEFAULT
   1500     result['users'] = get_users(sort_by=['login'])
   1501 
   1502     label_exclude_filters = [{'name__startswith': 'cros-version'},
   1503                              {'name__startswith': 'fw-version'},
   1504                              {'name__startswith': 'fwrw-version'},
   1505                              {'name__startswith': 'fwro-version'},
   1506                              {'name__startswith': 'ab-version'},
   1507                              {'name__startswith': 'testbed-version'}]
   1508     result['labels'] = get_labels(
   1509         label_exclude_filters,
   1510         sort_by=['-platform', 'name'])
   1511 
   1512     result['tests'] = get_tests(sort_by=['name'])
   1513     result['profilers'] = get_profilers(sort_by=['name'])
   1514     result['current_user'] = rpc_utils.prepare_for_serialization(
   1515         models.User.current_user().get_object_dict())
   1516     result['host_statuses'] = sorted(models.Host.Status.names)
   1517     result['job_statuses'] = sorted(models.HostQueueEntry.Status.names)
   1518     result['job_timeout_mins_default'] = models.Job.DEFAULT_TIMEOUT_MINS
   1519     result['job_max_runtime_mins_default'] = (
   1520         models.Job.DEFAULT_MAX_RUNTIME_MINS)
   1521     result['parse_failed_repair_default'] = bool(
   1522         models.Job.DEFAULT_PARSE_FAILED_REPAIR)
   1523     result['reboot_before_options'] = model_attributes.RebootBefore.names
   1524     result['reboot_after_options'] = model_attributes.RebootAfter.names
   1525     result['motd'] = rpc_utils.get_motd()
   1526     result['drone_sets_enabled'] = models.DroneSet.drone_sets_enabled()
   1527     result['drone_sets'] = drone_sets
   1528 
   1529     result['status_dictionary'] = {"Aborted": "Aborted",
   1530                                    "Verifying": "Verifying Host",
   1531                                    "Provisioning": "Provisioning Host",
   1532                                    "Pending": "Waiting on other hosts",
   1533                                    "Running": "Running autoserv",
   1534                                    "Completed": "Autoserv completed",
   1535                                    "Failed": "Failed to complete",
   1536                                    "Queued": "Queued",
   1537                                    "Starting": "Next in host's queue",
   1538                                    "Stopped": "Other host(s) failed verify",
   1539                                    "Parsing": "Awaiting parse of final results",
   1540                                    "Gathering": "Gathering log files",
   1541                                    "Waiting": "Waiting for scheduler action",
   1542                                    "Archiving": "Archiving results",
   1543                                    "Resetting": "Resetting hosts"}
   1544 
   1545     result['wmatrix_url'] = rpc_utils.get_wmatrix_url()
   1546     result['is_moblab'] = bool(utils.is_moblab())
   1547 
   1548     return result
   1549 
   1550 
   1551 def get_server_time():
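             """Return the current server time as a 'YYYY-MM-DD HH:MM' string."""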
   1552     return datetime.datetime.now().strftime("%Y-%m-%d %H:%M")
   1553 
   1554 
   1555 def get_hosts_by_attribute(attribute, value):
   1556     """
   1557     Get the list of valid hosts that share the same host attribute value.
   1558 
   1559     @param attribute: String of the host attribute to check.
   1560     @param value: String of the value that is shared between hosts.
   1561 
   1562     @returns List of hostnames that all have the same host attribute and
   1563              value.
   1564     """
   1565     hosts = models.HostAttribute.query_objects({'attribute': attribute,
   1566                                                 'value': value})
   1567     return [row.host.hostname for row in hosts if row.host.invalid == 0]
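
         # Example (hypothetical attribute and value):
         #   get_hosts_by_attribute('serial_number', 'HT4CA1234')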
   1568 
   1569 
   1570 def canonicalize_suite_name(suite_name):
   1571     """Canonicalize the suite's name.
   1572 
   1573     @param suite_name: the name of the suite.
   1574     """
   1575     # Do not change this naming convention without updating
   1576     # site_utils.parse_job_name.
   1577     return 'test_suites/control.%s' % suite_name
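
         # Example: canonicalize_suite_name('bvt') -> 'test_suites/control.bvt'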
   1578 
   1579 
   1580 def formatted_now():
   1581     """Format the current datetime."""
   1582     return datetime.datetime.now().strftime(time_utils.TIME_FMT)
   1583 
   1584 
   1585 def _get_control_file_by_build(build, ds, suite_name):
   1586     """Return control file contents for |suite_name|.
   1587 
   1588     Query the dev server at |ds| for the control file |suite_name|, included
    1589     in |build|.
   1590 
   1591     @param build: unique name by which to refer to the image from now on.
   1592     @param ds: a dev_server.DevServer instance to fetch control file with.
   1593     @param suite_name: canonicalized suite name, e.g. test_suites/control.bvt.
   1594     @raises ControlFileNotFound if a unique suite control file doesn't exist.
   1595     @raises NoControlFileList if we can't list the control files at all.
   1596     @raises ControlFileEmpty if the control file exists on the server, but
   1597                              can't be read.
   1598 
   1599     @return the contents of the desired control file.
   1600     """
   1601     getter = control_file_getter.DevServerGetter.create(build, ds)
   1602     devserver_name = ds.hostname
   1603     timer = autotest_stats.Timer('control_files.parse.%s.%s' %
   1604                                  (devserver_name.replace('.', '_'),
   1605                                   suite_name.rsplit('.')[-1]))
   1606     # Get the control file for the suite.
   1607     try:
   1608         with timer:
   1609             control_file_in = getter.get_control_file_contents_by_name(
   1610                     suite_name)
   1611     except error.CrosDynamicSuiteException as e:
   1612         raise type(e)('Failed to get control file for %s '
   1613                       '(devserver: %s) (error: %s)' %
   1614                       (build, devserver_name, e))
   1615     if not control_file_in:
   1616         raise error.ControlFileEmpty(
   1617             "Fetching %s returned no data. (devserver: %s)" %
   1618             (suite_name, devserver_name))
   1619     # Force control files to only contain ascii characters.
   1620     try:
   1621         control_file_in.encode('ascii')
    1622     except (UnicodeDecodeError, UnicodeEncodeError) as e:
   1623         raise error.ControlFileMalformed(str(e))
   1624 
   1625     return control_file_in
   1626 
   1627 
   1628 def _get_control_file_by_suite(suite_name):
   1629     """Get control file contents by suite name.
   1630 
   1631     @param suite_name: Suite name as string.
   1632     @returns: Control file contents as string.
   1633     """
   1634     getter = control_file_getter.FileSystemGetter(
   1635             [_CONFIG.get_config_value('SCHEDULER',
   1636                                       'drone_installation_directory')])
   1637     return getter.get_control_file_contents_by_name(suite_name)
   1638 
   1639 
   1640 def _stage_build_artifacts(build, hostname=None):
   1641     """
   1642     Ensure components of |build| necessary for installing images are staged.
   1643 
   1644     @param build image we want to stage.
    1645     @param hostname hostname of a DUT the test may run on. This helps to
    1646         locate a devserver closer to the DUTs if needed. Default is None.
   1647 
   1648     @raises StageControlFileFailure: if the dev server throws 500 while staging
   1649         suite control files.
   1650 
    1651     @return: A tuple (ds, timings): the dev_server.ImageServer instance to
    1652         use with this build, and a dictionary of staging start/end times.
   1653     """
   1654     timings = {}
   1655     # Ensure components of |build| necessary for installing images are staged
   1656     # on the dev server. However set synchronous to False to allow other
   1657     # components to be downloaded in the background.
   1658     ds = dev_server.resolve(build, hostname=hostname)
   1659     ds_name = ds.hostname
   1660     timings[constants.DOWNLOAD_STARTED_TIME] = formatted_now()
   1661     timer = autotest_stats.Timer('control_files.stage.%s' % (
   1662             ds_name.replace('.', '_')))
   1663     try:
   1664         with timer:
   1665             ds.stage_artifacts(image=build, artifacts=['test_suites'])
   1666     except dev_server.DevServerException as e:
   1667         raise error.StageControlFileFailure(
   1668                 "Failed to stage %s on %s: %s" % (build, ds_name, e))
   1669     timings[constants.PAYLOAD_FINISHED_TIME] = formatted_now()
   1670     return (ds, timings)
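
         # Example (hypothetical build): stage a build, then fetch a suite control
         # file from the chosen dev server:
         #
         #   ds, timings = _stage_build_artifacts('lumpy-release/R30-4000.0.0')
         #   control = _get_control_file_by_build(
         #           'lumpy-release/R30-4000.0.0', ds, 'test_suites/control.bvt')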
   1671 
   1672 
   1673 @rpc_utils.route_rpc_to_master
   1674 def create_suite_job(
   1675         name='',
   1676         board='',
   1677         pool='',
   1678         control_file='',
   1679         check_hosts=True,
   1680         num=None,
   1681         file_bugs=False,
   1682         timeout=24,
   1683         timeout_mins=None,
   1684         priority=priorities.Priority.DEFAULT,
   1685         suite_args=None,
   1686         wait_for_results=True,
   1687         job_retry=False,
   1688         max_retries=None,
   1689         max_runtime_mins=None,
   1690         suite_min_duts=0,
   1691         offload_failures_only=False,
   1692         builds=None,
   1693         test_source_build=None,
   1694         run_prod_code=False,
   1695         delay_minutes=0,
   1696         is_cloning=False,
   1697         job_keyvals=None,
   1698         test_args=None,
   1699         **kwargs
   1700 ):
   1701     """
   1702     Create a job to run a test suite on the given device with the given image.
   1703 
   1704     When the timeout specified in the control file is reached, the
   1705     job is guaranteed to have completed and results will be available.
   1706 
   1707     @param name: The test name if control_file is supplied, otherwise the name
   1708                  of the test suite to run, e.g. 'bvt'.
   1709     @param board: the kind of device to run the tests on.
   1710     @param builds: the builds to install e.g.
    1711                    {'cros-version': 'x86-alex-release/R18-1655.0.0',
    1712                     'fwrw-version': 'x86-alex-firmware/R36-5771.50.0',
    1713                     'fwro-version': 'x86-alex-firmware/R36-5771.49.0'}
   1714                    If builds is given a value, it overrides argument build.
   1715     @param test_source_build: Build that contains the server-side test code.
   1716     @param pool: Specify the pool of machines to use for scheduling
   1717             purposes.
   1718     @param control_file: the control file of the job.
   1719     @param check_hosts: require appropriate live hosts to exist in the lab.
   1720     @param num: Specify the number of machines to schedule across (integer).
   1721                 Leave unspecified or use None to use default sharding factor.
   1722     @param file_bugs: File a bug on each test failure in this suite.
   1723     @param timeout: The max lifetime of this suite, in hours.
   1724     @param timeout_mins: The max lifetime of this suite, in minutes. Takes
   1725                          priority over timeout.
   1726     @param priority: Integer denoting priority. Higher is more important.
   1727     @param suite_args: Optional arguments which will be parsed by the suite
   1728                        control file. Used by control.test_that_wrapper to
   1729                        determine which tests to run.
   1730     @param wait_for_results: Set to False to run the suite job without waiting
   1731                              for test jobs to finish. Default is True.
   1732     @param job_retry: Set to True to enable job-level retry. Default is False.
   1733     @param max_retries: Integer, maximum job retries allowed at suite level.
   1734                         None for no max.
   1735     @param max_runtime_mins: Maximum amount of time a job can be running in
   1736                              minutes.
   1737     @param suite_min_duts: Integer. Scheduler will prioritize getting the
   1738                            minimum number of machines for the suite when it is
   1739                            competing with another suite that has a higher
   1740                            priority but already got minimum machines it needs.
   1741     @param offload_failures_only: Only enable gs_offloading for failed jobs.
   1742     @param run_prod_code: If True, the suite will run the test code that
   1743                           lives in prod aka the test code currently on the
   1744                           lab servers. If False, the control files and test
   1745                           code for this suite run will be retrieved from the
   1746                           build artifacts.
   1747     @param delay_minutes: Delay the creation of test jobs for a given number of
   1748                           minutes.
   1749     @param is_cloning: True if creating a cloning job.
    1750     @param job_keyvals: A dict of job keyvals to be injected into the control file.
    1751     @param test_args: A dict of args passed all the way to each individual test
    1752                       that will actually be run.
   1753     @param kwargs: extra keyword args. NOT USED.
   1754 
   1755     @raises ControlFileNotFound: if a unique suite control file doesn't exist.
   1756     @raises NoControlFileList: if we can't list the control files at all.
   1757     @raises StageControlFileFailure: If the dev server throws 500 while
   1758                                      staging test_suites.
   1759     @raises ControlFileEmpty: if the control file exists on the server, but
   1760                               can't be read.
   1761 
   1762     @return: the job ID of the suite; -1 on error.
   1763     """
    1764     if num is not None and not isinstance(num, int):
    1765         raise error.SuiteArgumentException('Ill-specified num argument %r. '
    1766                                            'Must be an integer or None.' % num)
   1767     if num == 0:
   1768         logging.warning("Can't run on 0 hosts; using default.")
   1769         num = None
   1770 
   1771     if builds is None:
   1772         builds = {}
   1773 
   1774     # Default test source build to CrOS build if it's not specified and
   1775     # run_prod_code is set to False.
   1776     if not run_prod_code:
   1777         test_source_build = Suite.get_test_source_build(
   1778                 builds, test_source_build=test_source_build)
   1779 
   1780     sample_dut = rpc_utils.get_sample_dut(board, pool)
   1781 
   1782     suite_name = canonicalize_suite_name(name)
   1783     if run_prod_code:
   1784         ds = dev_server.resolve(test_source_build, hostname=sample_dut)
   1785         keyvals = {}
   1786     else:
   1787         (ds, keyvals) = _stage_build_artifacts(
   1788                 test_source_build, hostname=sample_dut)
   1789     keyvals[constants.SUITE_MIN_DUTS_KEY] = suite_min_duts
   1790 
   1791     # Do not change this naming convention without updating
   1792     # site_utils.parse_job_name.
   1793     if run_prod_code:
    1794         # If run_prod_code is True, test_source_build is not set; use the
    1795         # first build in the builds list for the suite job name.
   1796         name = '%s-%s' % (builds.values()[0], suite_name)
   1797     else:
   1798         name = '%s-%s' % (test_source_build, suite_name)
   1799 
   1800     timeout_mins = timeout_mins or timeout * 60
   1801     max_runtime_mins = max_runtime_mins or timeout * 60
   1802 
   1803     if not board:
   1804         board = utils.ParseBuildName(builds[provision.CROS_VERSION_PREFIX])[0]
   1805 
   1806     if run_prod_code:
   1807         control_file = _get_control_file_by_suite(suite_name)
   1808 
   1809     if not control_file:
   1810         # No control file was supplied so look it up from the build artifacts.
   1811         control_file = _get_control_file_by_build(
   1812                 test_source_build, ds, suite_name)
   1813 
   1814     # Prepend builds and board to the control file.
   1815     if is_cloning:
   1816         control_file = tools.remove_injection(control_file)
   1817 
   1818     inject_dict = {
   1819         'board': board,
   1820         # `build` is needed for suites like AU to stage image inside suite
   1821         # control file.
   1822         'build': test_source_build,
   1823         'builds': builds,
   1824         'check_hosts': check_hosts,
   1825         'pool': pool,
   1826         'num': num,
   1827         'file_bugs': file_bugs,
   1828         'timeout': timeout,
   1829         'timeout_mins': timeout_mins,
   1830         'devserver_url': ds.url(),
   1831         'priority': priority,
    1832         'suite_args': suite_args,
   1833         'wait_for_results': wait_for_results,
   1834         'job_retry': job_retry,
   1835         'max_retries': max_retries,
   1836         'max_runtime_mins': max_runtime_mins,
   1837         'offload_failures_only': offload_failures_only,
   1838         'test_source_build': test_source_build,
   1839         'run_prod_code': run_prod_code,
   1840         'delay_minutes': delay_minutes,
   1841         'job_keyvals': job_keyvals,
   1842         'test_args': test_args,
   1843     }
   1844     control_file = tools.inject_vars(inject_dict, control_file)
   1845 
   1846     return rpc_utils.create_job_common(name,
   1847                                        priority=priority,
   1848                                        timeout_mins=timeout_mins,
   1849                                        max_runtime_mins=max_runtime_mins,
   1850                                        control_type='Server',
   1851                                        control_file=control_file,
   1852                                        hostless=True,
   1853                                        keyvals=keyvals)
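
         # Example (hypothetical values), in the style of the module-level examples:
         #
         #   create_suite_job(name='bvt', board='lumpy', pool='suites',
         #                    builds={'cros-version': 'lumpy-release/R30-4000.0.0'})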
   1854 
   1855 
   1856 def get_job_history(**filter_data):
   1857     """Get history of the job, including the special tasks executed for the job
   1858 
   1859     @param filter_data: filter for the call, should at least include
   1860                         {'job_id': [job id]}
    1861     @returns: JSON string of the job's history, including information such
    1862               as the hosts that ran the job and the special tasks executed
    1863               before and after the job.
   1864     """
   1865     job_id = filter_data['job_id']
   1866     job_info = job_history.get_job_info(job_id)
   1867     return rpc_utils.prepare_for_serialization(job_info.get_history())
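
         # Example (job id is hypothetical): get_job_history(job_id=14995562)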
   1868 
   1869 
   1870 def get_host_history(start_time, end_time, hosts=None, board=None, pool=None):
   1871     """Get history of a list of host.
   1872 
   1873     The return is a JSON string of host history for each host, for example,
    1874     {'172.22.33.51': [{'status': 'Resetting',
   1875                        'start_time': '2014-08-07 10:02:16',
   1876                        'end_time': '2014-08-07 10:03:16',
   1877                        'log_url': 'http://autotest/reset-546546/debug',
   1878                        'dbg_str': 'Task: Special Task 19441991 (host ...)'},
    1879                        {'status': 'Running',
   1880                        'start_time': '2014-08-07 10:03:18',
   1881                        'end_time': '2014-08-07 10:13:00',
   1882                        'log_url': 'http://autotest/reset-546546/debug',
   1883                        'dbg_str': 'HQE: 15305005, for job: 14995562'}
   1884                      ]
   1885     }
   1886     @param start_time: start time to search for history, can be string value or
   1887                        epoch time.
   1888     @param end_time: end time to search for history, can be string value or
   1889                      epoch time.
   1890     @param hosts: A list of hosts to search for history. Default is None.
   1891     @param board: board type of hosts. Default is None.
   1892     @param pool: pool type of hosts. Default is None.
   1893     @returns: JSON string of the host history.
   1894     """
   1895     return rpc_utils.prepare_for_serialization(
   1896             host_history.get_history_details(
   1897                     start_time=start_time, end_time=end_time,
   1898                     hosts=hosts, board=board, pool=pool,
   1899                     process_pool_size=4))
   1900 
   1901 
   1902 def shard_heartbeat(shard_hostname, jobs=(), hqes=(), known_job_ids=(),
   1903                     known_host_ids=(), known_host_statuses=()):
   1904     """Receive updates for job statuses from shards and assign hosts and jobs.
   1905 
   1906     @param shard_hostname: Hostname of the calling shard
   1907     @param jobs: Jobs in serialized form that should be updated with newer
   1908                  status from a shard.
   1909     @param hqes: Hostqueueentries in serialized form that should be updated with
   1910                  newer status from a shard. Note that for every hostqueueentry
   1911                  the corresponding job must be in jobs.
   1912     @param known_job_ids: List of ids of jobs the shard already has.
   1913     @param known_host_ids: List of ids of hosts the shard already has.
   1914     @param known_host_statuses: List of statuses of hosts the shard already has.
   1915 
   1916     @returns: Serialized representations of hosts, jobs, suite job keyvals
   1917               and their dependencies to be inserted into a shard's database.
   1918     """
   1919     # The following alternatives to sending host and job ids in every heartbeat
   1920     # have been considered:
   1921     # 1. Sending the highest known job and host ids. This would work for jobs:
   1922     #    Newer jobs always have larger ids. Also, if a job is not assigned to a
   1923     #    particular shard during a heartbeat, it never will be assigned to this
   1924     #    shard later.
   1925     #    This is not true for hosts though: A host that is leased won't be sent
   1926     #    to the shard now, but might be sent in a future heartbeat. This means
    1927     #    sometimes hosts should be transferred that have a lower id than the
   1928     #    maximum host id the shard knows.
   1929     # 2. Send the number of jobs/hosts the shard knows to the master in each
   1930     #    heartbeat. Compare these to the number of records that already have
   1931     #    the shard_id set to this shard. In the normal case, they should match.
   1932     #    In case they don't, resend all entities of that type.
   1933     #    This would work well for hosts, because there aren't that many.
   1934     #    Resending all jobs is quite a big overhead though.
   1935     #    Also, this approach might run into edge cases when entities are
   1936     #    ever deleted.
   1937     # 3. Mixtures of the above: Use 1 for jobs and 2 for hosts.
   1938     #    Using two different approaches isn't consistent and might cause
   1939     #    confusion. Also the issues with the case of deletions might still
   1940     #    occur.
   1941     #
   1942     # The overhead of sending all job and host ids in every heartbeat is low:
   1943     # At peaks one board has about 1200 created but unfinished jobs.
   1944     # See the numbers here: http://goo.gl/gQCGWH
    1945     # Assuming that job ids have 6 digits and that json serialization takes a
    1946     # comma and a space as overhead, the traffic per id sent is about 8 bytes.
    1947     # If 5000 ids need to be sent, this means 40 kilobytes of traffic.
    1948     # A NOT IN query with 5000 ids took about 30ms in testing.
   1949     # These numbers seem low enough to outweigh the disadvantages of the
   1950     # solutions described above.
   1951     timer = autotest_stats.Timer('shard_heartbeat')
   1952     with timer:
   1953         shard_obj = rpc_utils.retrieve_shard(shard_hostname=shard_hostname)
   1954         rpc_utils.persist_records_sent_from_shard(shard_obj, jobs, hqes)
   1955         assert len(known_host_ids) == len(known_host_statuses)
    1956         for host_id, host_status in zip(known_host_ids, known_host_statuses):
    1957             host_model = models.Host.objects.get(pk=host_id)
    1958             if host_model.status != host_status:
    1959                 host_model.status = host_status
    1960                 host_model.save()
   1961 
   1962         hosts, jobs, suite_keyvals = rpc_utils.find_records_for_shard(
   1963                 shard_obj, known_job_ids=known_job_ids,
   1964                 known_host_ids=known_host_ids)
   1965         return {
   1966             'hosts': [host.serialize() for host in hosts],
   1967             'jobs': [job.serialize() for job in jobs],
   1968             'suite_keyvals': [kv.serialize() for kv in suite_keyvals],
   1969         }
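
         # Example (hypothetical shard): a minimal heartbeat reporting no local
         # state:
         #
         #   shard_heartbeat('shard1.example.com', jobs=(), hqes=(),
         #                   known_job_ids=(), known_host_ids=(),
         #                   known_host_statuses=())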
   1970 
   1971 
   1972 def get_shards(**filter_data):
   1973     """Return a list of all shards.
   1974 
   1975     @returns A sequence of nested dictionaries of shard information.
   1976     """
   1977     shards = models.Shard.query_objects(filter_data)
   1978     serialized_shards = rpc_utils.prepare_rows_as_nested_dicts(shards, ())
   1979     for serialized, shard in zip(serialized_shards, shards):
   1980         serialized['labels'] = [label.name for label in shard.labels.all()]
   1981 
   1982     return serialized_shards
   1983 
   1984 
   1985 def _assign_board_to_shard_precheck(labels):
   1986     """Verify whether board labels are valid to be added to a given shard.
   1987 
    1988     First, check whether the board label is in the correct format. Second,
    1989     check whether the board label exists. Third, check whether the board has
    1990     already been assigned to a shard.
   1991 
   1992     @param labels: Board labels separated by comma.
   1993 
   1994     @raises error.RPCException: If label provided doesn't start with `board:`
   1995             or board has been added to shard already.
   1996     @raises models.Label.DoesNotExist: If the label specified doesn't exist.
   1997 
    1998     @returns: A list of label models that are ready to be added to the shard.
   1999     """
   2000     if not labels:
    2001         # Allow creation of label-less shards (labels='' would otherwise fail
    2002         # the checks below).
    2003         return []
   2004     labels = labels.split(',')
   2005     label_models = []
   2006     for label in labels:
   2007         # Check whether the board label is in correct format.
   2008         if not label.startswith('board:'):
   2009             raise error.RPCException('Sharding only supports `board:.*` label.')
    2010         # Check whether the board label exists. If not, an exception will be
    2011         # thrown by the smart_get function.
   2012         label = models.Label.smart_get(label)
   2013         # Check whether the board has been sharded already
   2014         try:
   2015             shard = models.Shard.objects.get(labels=label)
   2016             raise error.RPCException(
   2017                     '%s is already on shard %s' % (label, shard.hostname))
   2018         except models.Shard.DoesNotExist:
   2019             # board is not on any shard, so it's valid.
   2020             label_models.append(label)
   2021     return label_models
   2022 
   2023 
   2024 def add_shard(hostname, labels):
   2025     """Add a shard and start running jobs on it.
   2026 
   2027     @param hostname: The hostname of the shard to be added; needs to be unique.
   2028     @param labels: Board labels separated by comma. Jobs of one of the labels
   2029                    will be assigned to the shard.
   2030 
   2031     @raises error.RPCException: If label provided doesn't start with `board:` or
   2032             board has been added to shard already.
   2033     @raises model_logic.ValidationError: If a shard with the given hostname
   2034             already exist.
   2035     @raises models.Label.DoesNotExist: If the label specified doesn't exist.
   2036 
   2037     @returns: The id of the added shard.
   2038     """
   2039     labels = _assign_board_to_shard_precheck(labels)
   2040     shard = models.Shard.add_object(hostname=hostname)
   2041     for label in labels:
   2042         shard.labels.add(label)
   2043     return shard.id
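
         # Example (hypothetical shard hostname and boards):
         #   add_shard('shard1.example.com', 'board:lumpy,board:daisy')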
   2044 
   2045 
   2046 def add_board_to_shard(hostname, labels):
   2047     """Add boards to a given shard
   2048 
   2049     @param hostname: The hostname of the shard to be changed.
   2050     @param labels: Board labels separated by comma.
   2051 
   2052     @raises error.RPCException: If label provided doesn't start with `board:` or
   2053             board has been added to shard already.
   2054     @raises models.Label.DoesNotExist: If the label specified doesn't exist.
   2055 
   2056     @returns: The id of the changed shard.
   2057     """
   2058     labels = _assign_board_to_shard_precheck(labels)
   2059     shard = models.Shard.objects.get(hostname=hostname)
   2060     for label in labels:
   2061         shard.labels.add(label)
   2062     return shard.id
   2063 
   2064 
   2065 def delete_shard(hostname):
   2066     """Delete a shard and reclaim all resources from it.
   2067 
   2068     This claims back all assigned hosts from the shard. To ensure all DUTs are
   2069     in a sane state, a Reboot task with highest priority is scheduled for them.
    2070     This reboots the DUTs, and any remaining tasks then continue to run on
    2071     the drones of the master.
   2072 
   2073     The procedure for deleting a shard:
   2074         * Lock all unlocked hosts on that shard.
    2075         * Remove shard information.
   2076         * Assign a reboot task with highest priority to these hosts.
    2077         * Unlock these hosts; the reboot tasks then run ahead of all other
    2078           tasks.
   2079 
    2080     The status of jobs that haven't been reported as finished yet will be
    2081     lost. The master scheduler will pick up the jobs and execute them.
   2082 
   2083     @param hostname: Hostname of the shard to delete.
   2084     """
   2085     shard = rpc_utils.retrieve_shard(shard_hostname=hostname)
   2086     hostnames_to_lock = [h.hostname for h in
   2087                          models.Host.objects.filter(shard=shard, locked=False)]
   2088 
   2089     # TODO(beeps): Power off shard
   2090     # For ChromeOS hosts, a reboot test with the highest priority is added to
    2091     # the DUT. After a reboot it should be guaranteed that no processes from
    2092     # prior tests that were run by a shard are still running.
   2093 
   2094     # Lock all unlocked hosts.
   2095     dicts = {'locked': True, 'lock_time': datetime.datetime.now()}
   2096     models.Host.objects.filter(hostname__in=hostnames_to_lock).update(**dicts)
   2097 
   2098     # Remove shard information.
   2099     models.Host.objects.filter(shard=shard).update(shard=None)
   2100     models.Job.objects.filter(shard=shard).update(shard=None)
   2101     shard.labels.clear()
   2102     shard.delete()
   2103 
   2104     # Assign a reboot task with highest priority: Super.
   2105     t = models.Test.objects.get(name='platform_BootPerfServer:shard')
   2106     c = utils.read_file(os.path.join(common.autotest_dir, t.path))
   2107     if hostnames_to_lock:
   2108         rpc_utils.create_job_common(
   2109                 'reboot_dut_for_shard_deletion',
   2110                 priority=priorities.Priority.SUPER,
   2111                 control_type='Server',
   2112                 control_file=c, hosts=hostnames_to_lock)
   2113 
   2114     # Unlock these shard-related hosts.
   2115     dicts = {'locked': False, 'lock_time': None}
   2116     models.Host.objects.filter(hostname__in=hostnames_to_lock).update(**dicts)
   2117 
   2118 
   2119 def get_servers(hostname=None, role=None, status=None):
   2120     """Get a list of servers with matching role and status.
   2121 
   2122     @param hostname: FQDN of the server.
   2123     @param role: Name of the server role, e.g., drone, scheduler. Default to
   2124                  None to match any role.
   2125     @param status: Status of the server, e.g., primary, backup, repair_required.
   2126                    Default to None to match any server status.
   2127 
   2128     @raises error.RPCException: If server database is not used.
   2129     @return: A list of server names for servers with matching role and status.
   2130     """
   2131     if not server_manager_utils.use_server_db():
    2132         raise error.RPCException('Server database is not enabled. Please '
    2133                                  'retrieve servers from the global config.')
   2134     servers = server_manager_utils.get_servers(hostname=hostname, role=role,
   2135                                                status=status)
   2136     return [s.get_details() for s in servers]
   2137 
   2138 
   2139 @rpc_utils.route_rpc_to_master
   2140 def get_stable_version(board=stable_version_utils.DEFAULT, android=False):
   2141     """Get stable version for the given board.
   2142 
   2143     @param board: Name of the board.
   2144     @param android: If True, the given board is an Android-based device. If
    2145                     False, assume it's a Chrome OS-based device.
   2146 
    2147     @return: Stable version of the given board. Return the global config
    2148              value of CROS.stable_cros_version if the stable_versions table
    2149              does not have an entry for board DEFAULT.
   2150     """
   2151     return stable_version_utils.get(board=board, android=android)
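
         # Example (hypothetical board): get_stable_version(board='lumpy')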
   2152 
   2153 
   2154 @rpc_utils.route_rpc_to_master
   2155 def get_all_stable_versions():
   2156     """Get stable versions for all boards.
   2157 
   2158     @return: A dictionary of board:version.
   2159     """
   2160     return stable_version_utils.get_all()
   2161 
   2162 
   2163 @rpc_utils.route_rpc_to_master
   2164 def set_stable_version(version, board=stable_version_utils.DEFAULT):
   2165     """Modify stable version for the given board.
   2166 
   2167     @param version: The new value of stable version for given board.
   2168     @param board: Name of the board, default to value `DEFAULT`.
   2169     """
   2170     stable_version_utils.set(version=version, board=board)
   2171 
   2172 
   2173 @rpc_utils.route_rpc_to_master
   2174 def delete_stable_version(board):
   2175     """Modify stable version for the given board.
   2176 
   2177     Delete a stable version entry in afe_stable_versions table for a given
   2178     board, so default stable version will be used.
   2179 
   2180     @param board: Name of the board.
   2181     """
   2182     stable_version_utils.delete(board=board)
   2183 
   2184 
   2185 def get_tests_by_build(build, ignore_invalid_tests=True):
   2186     """Get the tests that are available for the specified build.
   2187 
   2188     @param build: unique name by which to refer to the image.
    2189     @param ignore_invalid_tests: whether to ignore unparsable tests.
   2190 
   2191     @return: A sorted list of all tests that are in the build specified.
   2192     """
   2193     # Collect the control files specified in this build
   2194     cfile_getter = control_file_lib._initialize_control_file_getter(build)
   2195     if SuiteBase.ENABLE_CONTROLS_IN_BATCH:
   2196         control_file_info_list = cfile_getter.get_suite_info()
   2197         control_file_list = control_file_info_list.keys()
   2198     else:
   2199         control_file_list = cfile_getter.get_control_file_list()
   2200 
   2201     test_objects = []
   2202     _id = 0
   2203     for control_file_path in control_file_list:
   2204         # Read and parse the control file
   2205         if SuiteBase.ENABLE_CONTROLS_IN_BATCH:
   2206             control_file = control_file_info_list[control_file_path]
   2207         else:
   2208             control_file = cfile_getter.get_control_file_contents(
   2209                     control_file_path)
    2210         try:
    2211             control_obj = control_data.parse_control_string(control_file)
    2212         except Exception:
    2213             logging.info('Failed to parse control file: %s', control_file_path)
    2214             if not ignore_invalid_tests:
    2215                 raise
                 # Skip the unparsable control file; otherwise control_obj from a
                 # previous iteration (or nothing at all) would be used below.
                 continue
   2216 
   2217         # Extract the values needed for the AFE from the control_obj.
   2218         # The keys list represents attributes in the control_obj that
   2219         # are required by the AFE
   2220         keys = ['author', 'doc', 'name', 'time', 'test_type', 'experimental',
   2221                 'test_category', 'test_class', 'dependencies', 'run_verify',
   2222                 'sync_count', 'job_retries', 'retries', 'path']
   2223 
   2224         test_object = {}
   2225         for key in keys:
   2226             test_object[key] = getattr(control_obj, key) if hasattr(
   2227                     control_obj, key) else ''
   2228 
    2229         # Unfortunately, the AFE expects different key names for certain
    2230         # values; these must be corrected to avoid the risk of tests
   2231         # being omitted by the AFE.
   2232         # The 'id' is an additional value used in the AFE.
   2233         # The control_data parsing does not reference 'run_reset', but it
   2234         # is also used in the AFE and defaults to True.
   2235         test_object['id'] = _id
   2236         test_object['run_reset'] = True
   2237         test_object['description'] = test_object.get('doc', '')
   2238         test_object['test_time'] = test_object.get('time', 0)
   2239         test_object['test_retry'] = test_object.get('retries', 0)
   2240 
   2241         # Fix the test name to be consistent with the current presentation
   2242         # of test names in the AFE.
   2243         testpath, subname = os.path.split(control_file_path)
   2244         testname = os.path.basename(testpath)
   2245         subname = subname.split('.')[1:]
   2246         if subname:
   2247             testname = '%s:%s' % (testname, ':'.join(subname))
   2248 
   2249         test_object['name'] = testname
   2250 
   2251         # Correct the test path as parse_control_string sets an empty string.
   2252         test_object['path'] = control_file_path
   2253 
   2254         _id += 1
   2255         test_objects.append(test_object)
   2256 
   2257     test_objects = sorted(test_objects, key=lambda x: x.get('name'))
   2258     return rpc_utils.prepare_for_serialization(test_objects)
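
         # Example (hypothetical build):
         #   tests = get_tests_by_build('lumpy-release/R30-4000.0.0')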
   2259