Home | History | Annotate | Download | only in afe
      1 # pylint: disable=missing-docstring
      2 
      3 """\
      4 Functions to expose over the RPC interface.
      5 
      6 For all modify* and delete* functions that ask for an 'id' parameter to
      7 identify the object to operate on, the id may be either
      8  * the database row ID
      9  * the name of the object (label name, hostname, user login, etc.)
     10  * a dictionary containing uniquely identifying field (this option should seldom
     11    be used)
     12 
     13 When specifying foreign key fields (i.e. adding hosts to a label, or adding
     14 users to an ACL group), the given value may be either the database row ID or the
     15 name of the object.
     16 
     17 All get* functions return lists of dictionaries.  Each dictionary represents one
     18 object and maps field names to values.
     19 
     20 Some examples:
     21 modify_host(2, hostname='myhost') # modify hostname of host with database ID 2
     22 modify_host('ipaj2', hostname='myhost') # modify hostname of host 'ipaj2'
     23 modify_test('sleeptest', test_type='Client', params=', seconds=60')
     24 delete_acl_group(1) # delete by ID
     25 delete_acl_group('Everyone') # delete by name
     26 acl_group_add_users('Everyone', ['mbligh', 'showard'])
     27 get_jobs(owner='showard', status='Queued')
     28 
     29 See doctests/001_rpc_test.txt for (lots) more examples.
     30 """
     31 
     32 __author__ = 'showard (at] google.com (Steve Howard)'
     33 
     34 import ast
     35 import collections
     36 import contextlib
     37 import datetime
     38 import logging
     39 import os
     40 import sys
     41 import warnings
     42 
     43 from django.db import connection as db_connection
     44 from django.db import transaction
     45 from django.db.models import Count
     46 from django.db.utils import DatabaseError
     47 
     48 import common
     49 from autotest_lib.client.common_lib import control_data
     50 from autotest_lib.client.common_lib import error
     51 from autotest_lib.client.common_lib import global_config
     52 from autotest_lib.client.common_lib import priorities
     53 from autotest_lib.client.common_lib.cros import dev_server
     54 from autotest_lib.frontend.afe import control_file as control_file_lib
     55 from autotest_lib.frontend.afe import model_attributes
     56 from autotest_lib.frontend.afe import model_logic
     57 from autotest_lib.frontend.afe import models
     58 from autotest_lib.frontend.afe import rpc_utils
     59 from autotest_lib.frontend.tko import models as tko_models
     60 from autotest_lib.frontend.tko import rpc_interface as tko_rpc_interface
     61 from autotest_lib.server import frontend
     62 from autotest_lib.server import utils
     63 from autotest_lib.server.cros import provision
     64 from autotest_lib.server.cros.dynamic_suite import constants
     65 from autotest_lib.server.cros.dynamic_suite import control_file_getter
     66 from autotest_lib.server.cros.dynamic_suite import suite_common
     67 from autotest_lib.server.cros.dynamic_suite import tools
     68 from autotest_lib.server.cros.dynamic_suite.suite import Suite
     69 from autotest_lib.server.lib import status_history
     70 from autotest_lib.site_utils import job_history
     71 from autotest_lib.site_utils import server_manager_utils
     72 from autotest_lib.site_utils import stable_version_utils
     73 
     74 
     75 _CONFIG = global_config.global_config
     76 
     77 # Definition of LabHealthIndicator
     78 LabHealthIndicator = collections.namedtuple(
     79         'LabHealthIndicator',
     80         [
     81                 'if_lab_close',
     82                 'available_duts',
     83                 'devserver_health',
     84                 'upcoming_builds',
     85         ]
     86 )
     87 
     88 RESPECT_STATIC_LABELS = global_config.global_config.get_config_value(
     89         'SKYLAB', 'respect_static_labels', type=bool, default=False)
     90 
     91 RESPECT_STATIC_ATTRIBUTES = global_config.global_config.get_config_value(
     92         'SKYLAB', 'respect_static_attributes', type=bool, default=False)
     93 
     94 # Relevant CrosDynamicSuiteExceptions are defined in client/common_lib/error.py.
     95 
     96 # labels
     97 
     98 def modify_label(id, **data):
     99     """Modify a label.
    100 
    101     @param id: id or name of a label. More often a label name.
    102     @param data: New data for a label.
    103     """
    104     label_model = models.Label.smart_get(id)
    105     if label_model.is_replaced_by_static():
    106         raise error.UnmodifiableLabelException(
    107                 'Failed to delete label "%s" because it is a static label. '
    108                 'Use go/chromeos-skylab-inventory-tools to modify this '
    109                 'label.' % label_model.name)
    110 
    111     label_model.update_object(data)
    112 
    113     # Master forwards the RPC to shards
    114     if not utils.is_shard():
    115         rpc_utils.fanout_rpc(label_model.host_set.all(), 'modify_label', False,
    116                              id=id, **data)
    117 
    118 
    119 def delete_label(id):
    120     """Delete a label.
    121 
    122     @param id: id or name of a label. More often a label name.
    123     """
    124     label_model = models.Label.smart_get(id)
    125     if label_model.is_replaced_by_static():
    126         raise error.UnmodifiableLabelException(
    127                 'Failed to delete label "%s" because it is a static label. '
    128                 'Use go/chromeos-skylab-inventory-tools to modify this '
    129                 'label.' % label_model.name)
    130 
    131     # Hosts that have the label to be deleted. Save this info before
    132     # the label is deleted to use it later.
    133     hosts = []
    134     for h in label_model.host_set.all():
    135         hosts.append(models.Host.smart_get(h.id))
    136     label_model.delete()
    137 
    138     # Master forwards the RPC to shards
    139     if not utils.is_shard():
    140         rpc_utils.fanout_rpc(hosts, 'delete_label', False, id=id)
    141 
    142 
    143 def add_label(name, ignore_exception_if_exists=False, **kwargs):
    144     """Adds a new label of a given name.
    145 
    146     @param name: label name.
    147     @param ignore_exception_if_exists: If True and the exception was
    148         thrown due to the duplicated label name when adding a label,
    149         then suppress the exception. Default is False.
    150     @param kwargs: keyword args that store more info about a label
    151         other than the name.
    152     @return: int/long id of a new label.
    153     """
    154     # models.Label.add_object() throws model_logic.ValidationError
    155     # when it is given a label name that already exists.
    156     # However, ValidationError can be thrown with different errors,
    157     # and those errors should be thrown up to the call chain.
    158     try:
    159         label = models.Label.add_object(name=name, **kwargs)
    160     except:
    161         exc_info = sys.exc_info()
    162         if ignore_exception_if_exists:
    163             label = rpc_utils.get_label(name)
    164             # If the exception is raised not because of duplicated
    165             # "name", then raise the original exception.
    166             if label is None:
    167                 raise exc_info[0], exc_info[1], exc_info[2]
    168         else:
    169             raise exc_info[0], exc_info[1], exc_info[2]
    170     return label.id
    171 
    172 
    173 def add_label_to_hosts(id, hosts):
    174     """Adds a label of the given id to the given hosts only in local DB.
    175 
    176     @param id: id or name of a label. More often a label name.
    177     @param hosts: The hostnames of hosts that need the label.
    178 
    179     @raises models.Label.DoesNotExist: If the label with id doesn't exist.
    180     """
    181     label = models.Label.smart_get(id)
    182     if label.is_replaced_by_static():
    183         label = models.StaticLabel.smart_get(label.name)
    184 
    185     host_objs = models.Host.smart_get_bulk(hosts)
    186     if label.platform:
    187         models.Host.check_no_platform(host_objs)
    188     # Ensure a host has no more than one board label with it.
    189     if label.name.startswith('board:'):
    190         models.Host.check_board_labels_allowed(host_objs, [label.name])
    191     label.host_set.add(*host_objs)
    192 
    193 
    194 def _create_label_everywhere(id, hosts):
    195     """
    196     Yet another method to create labels.
    197 
    198     ALERT! This method should be run only on master not shards!
    199     DO NOT RUN THIS ON A SHARD!!!  Deputies will hate you if you do!!!
    200 
    201     This method exists primarily to serve label_add_hosts() and
    202     host_add_labels().  Basically it pulls out the label check/add logic
    203     from label_add_hosts() into this nice method that not only creates
    204     the label but also tells the shards that service the hosts to also
    205     create the label.
    206 
    207     @param id: id or name of a label. More often a label name.
    208     @param hosts: A list of hostnames or ids. More often hostnames.
    209     """
    210     try:
    211         label = models.Label.smart_get(id)
    212     except models.Label.DoesNotExist:
    213         # This matches the type checks in smart_get, which is a hack
    214         # in and off itself. The aim here is to create any non-existent
    215         # label, which we cannot do if the 'id' specified isn't a label name.
    216         if isinstance(id, basestring):
    217             label = models.Label.smart_get(add_label(id))
    218         else:
    219             raise ValueError('Label id (%s) does not exist. Please specify '
    220                              'the argument, id, as a string (label name).'
    221                              % id)
    222 
    223     # Make sure the label exists on the shard with the same id
    224     # as it is on the master.
    225     # It is possible that the label is already in a shard because
    226     # we are adding a new label only to shards of hosts that the label
    227     # is going to be attached.
    228     # For example, we add a label L1 to a host in shard S1.
    229     # Master and S1 will have L1 but other shards won't.
    230     # Later, when we add the same label L1 to hosts in shards S1 and S2,
    231     # S1 already has the label but S2 doesn't.
    232     # S2 should have the new label without any problem.
    233     # We ignore exception in such a case.
    234     host_objs = models.Host.smart_get_bulk(hosts)
    235     rpc_utils.fanout_rpc(
    236             host_objs, 'add_label', include_hostnames=False,
    237             name=label.name, ignore_exception_if_exists=True,
    238             id=label.id, platform=label.platform)
    239 
    240 
    241 @rpc_utils.route_rpc_to_master
    242 def label_add_hosts(id, hosts):
    243     """Adds a label with the given id to the given hosts.
    244 
    245     This method should be run only on master not shards.
    246     The given label will be created if it doesn't exist, provided the `id`
    247     supplied is a label name not an int/long id.
    248 
    249     @param id: id or name of a label. More often a label name.
    250     @param hosts: A list of hostnames or ids. More often hostnames.
    251 
    252     @raises ValueError: If the id specified is an int/long (label id)
    253                         while the label does not exist.
    254     """
    255     # Create the label.
    256     _create_label_everywhere(id, hosts)
    257 
    258     # Add it to the master.
    259     add_label_to_hosts(id, hosts)
    260 
    261     # Add it to the shards.
    262     host_objs = models.Host.smart_get_bulk(hosts)
    263     rpc_utils.fanout_rpc(host_objs, 'add_label_to_hosts', id=id)
    264 
    265 
    266 def remove_label_from_hosts(id, hosts):
    267     """Removes a label of the given id from the given hosts only in local DB.
    268 
    269     @param id: id or name of a label.
    270     @param hosts: The hostnames of hosts that need to remove the label from.
    271     """
    272     host_objs = models.Host.smart_get_bulk(hosts)
    273     label = models.Label.smart_get(id)
    274     if label.is_replaced_by_static():
    275         raise error.UnmodifiableLabelException(
    276                 'Failed to remove label "%s" for hosts "%r" because it is a '
    277                 'static label. Use go/chromeos-skylab-inventory-tools to '
    278                 'modify this label.' % (label.name, hosts))
    279 
    280     label.host_set.remove(*host_objs)
    281 
    282 
    283 @rpc_utils.route_rpc_to_master
    284 def label_remove_hosts(id, hosts):
    285     """Removes a label of the given id from the given hosts.
    286 
    287     This method should be run only on master not shards.
    288 
    289     @param id: id or name of a label.
    290     @param hosts: A list of hostnames or ids. More often hostnames.
    291     """
    292     host_objs = models.Host.smart_get_bulk(hosts)
    293     remove_label_from_hosts(id, hosts)
    294 
    295     rpc_utils.fanout_rpc(host_objs, 'remove_label_from_hosts', id=id)
    296 
    297 
    298 def get_labels(exclude_filters=(), **filter_data):
    299     """\
    300     @param exclude_filters: A sequence of dictionaries of filters.
    301 
    302     @returns A sequence of nested dictionaries of label information.
    303     """
    304     labels = models.Label.query_objects(filter_data)
    305     for exclude_filter in exclude_filters:
    306         labels = labels.exclude(**exclude_filter)
    307 
    308     if not RESPECT_STATIC_LABELS:
    309         return rpc_utils.prepare_rows_as_nested_dicts(labels, ())
    310 
    311     static_labels = models.StaticLabel.query_objects(filter_data)
    312     for exclude_filter in exclude_filters:
    313         static_labels = static_labels.exclude(**exclude_filter)
    314 
    315     non_static_lists = rpc_utils.prepare_rows_as_nested_dicts(labels, ())
    316     static_lists = rpc_utils.prepare_rows_as_nested_dicts(static_labels, ())
    317 
    318     label_ids = [label.id for label in labels]
    319     replaced = models.ReplacedLabel.objects.filter(label__id__in=label_ids)
    320     replaced_ids = {r.label_id for r in replaced}
    321     replaced_label_names = {l.name for l in labels if l.id in replaced_ids}
    322 
    323     return_lists  = []
    324     for non_static_label in non_static_lists:
    325         if non_static_label.get('id') not in replaced_ids:
    326             return_lists.append(non_static_label)
    327 
    328     for static_label in static_lists:
    329         if static_label.get('name') in replaced_label_names:
    330             return_lists.append(static_label)
    331 
    332     return return_lists
    333 
    334 
    335 # hosts
    336 
    337 def add_host(hostname, status=None, locked=None, lock_reason='', protection=None):
    338     if locked and not lock_reason:
    339         raise model_logic.ValidationError(
    340             {'locked': 'Please provide a reason for locking when adding host.'})
    341 
    342     return models.Host.add_object(hostname=hostname, status=status,
    343                                   locked=locked, lock_reason=lock_reason,
    344                                   protection=protection).id
    345 
    346 
@rpc_utils.route_rpc_to_master
def modify_host(id, **kwargs):
    """Modify local attributes of a host.

    If this is called on the master, but the host is assigned to a shard, this
    will call `modify_host_local` RPC to the responsible shard. This means if
    a host is being locked using this function, this change will also propagate
    to shards.
    When this is called on a shard, the shard just routes the RPC to the master
    and does nothing.

    @param id: id of the host to modify.
    @param kwargs: key=value pairs of values to set on the host. The optional
            key `force_modify_locking`, when truthy, turns a locking
            validation failure into a logged warning instead of an error.

    @raises model_logic.ValidationError: If the locking check fails and
            `force_modify_locking` is not set.
    """
    rpc_utils.check_modify_host(kwargs)
    host = models.Host.smart_get(id)
    try:
        rpc_utils.check_modify_host_locking(host, kwargs)
    except model_logic.ValidationError as e:
        # Only swallow the validation failure when the caller explicitly
        # asked to force the lock modification.
        if not kwargs.get('force_modify_locking', False):
            raise
        logging.exception('The following exception will be ignored and lock '
                          'modification will be enforced. %s', e)

    # This is required to make `lock_time` for a host be exactly same
    # between the master and a shard.
    if kwargs.get('locked', None) and 'lock_time' not in kwargs:
        kwargs['lock_time'] = datetime.datetime.now()

    # force_modify_locking is not an internal field in database, remove it
    # from the copy of the arguments that is sent to the shard.
    # NOTE(review): `kwargs` itself still carries the key when it is passed
    # to update_object() below -- confirm update_object tolerates it.
    shard_kwargs = dict(kwargs)
    shard_kwargs.pop('force_modify_locking', None)
    rpc_utils.fanout_rpc([host], 'modify_host_local',
                         include_hostnames=False, id=id, **shard_kwargs)

    # Update the local DB **after** RPC fanout is complete.
    # This guarantees that the master state is only updated if the shards were
    # correctly updated.
    # In case the shard update fails mid-flight and the master-shard desync, we
    # always consider the master state to be the source-of-truth, and any
    # (automated) corrective actions will revert the (partial) shard updates.
    host.update_object(kwargs)
    389 
    390 
    391 def modify_host_local(id, **kwargs):
    392     """Modify host attributes in local DB.
    393 
    394     @param id: Host id.
    395     @param kwargs: key=value pairs of values to set on the host.
    396     """
    397     models.Host.smart_get(id).update_object(kwargs)
    398 
    399 
@rpc_utils.route_rpc_to_master
def modify_hosts(host_filter_data, update_data):
    """Modify local attributes of multiple hosts.

    If this is called on the master, but one of the hosts that match the
    filters is assigned to a shard, this will call `modify_hosts_local` RPC
    to the responsible shard.
    When this is called on a shard, the shard just routes the RPC to the master
    and does nothing.

    The filters are always applied on the master, not on the shards. This means
    if the states of a host differ on the master and a shard, the state on the
    master will be used. I.e. this means:
    A host was synced to Shard 1. On Shard 1 the status of the host was set to
    'Repair Failed'.
    - A call to modify_hosts with host_filter_data={'status': 'Ready'} will
    update the host (both on the shard and on the master), because the state
    of the host as the master knows it is still 'Ready'.
    - A call to modify_hosts with host_filter_data={'status': 'Repair failed'}
    will not update the host, because the filter doesn't apply on the master.

    @param host_filter_data: Filters out which hosts to modify.
    @param update_data: A dictionary with the changes to make to the hosts.
            The caller's dictionary is not modified; a copy is used.

    @raises model_logic.ValidationError: If the locking check fails for a host
            and `force_modify_locking` is not set in update_data.
    """
    # Work on a copy so that the additions/removals below do not leak back
    # into the caller's dictionary.
    update_data = update_data.copy()
    rpc_utils.check_modify_host(update_data)
    hosts = models.Host.query_objects(host_filter_data)

    # Hostnames of the shards owning at least one matched host, and the ids
    # of those sharded hosts.
    affected_shard_hostnames = set()
    affected_host_ids = []

    # Check all hosts before changing data for exception safety.
    for host in hosts:
        try:
            rpc_utils.check_modify_host_locking(host, update_data)
        except model_logic.ValidationError as e:
            if not update_data.get('force_modify_locking', False):
                raise
            logging.exception('The following exception will be ignored and '
                              'lock modification will be enforced. %s', e)

        if host.shard:
            affected_shard_hostnames.add(host.shard.hostname)
            affected_host_ids.append(host.id)

    # This is required to make `lock_time` for a host be exactly same
    # between the master and a shard.
    if update_data.get('locked', None) and 'lock_time' not in update_data:
        update_data['lock_time'] = datetime.datetime.now()
    # Unlike modify_host(), the local DB is updated before the shards are
    # contacted.
    for host in hosts:
        host.update_object(update_data)

    # force_modify_locking is dropped only from the data sent to the shards.
    update_data.pop('force_modify_locking', None)
    # Caution: Changing the filter from the original here. See docstring.
    rpc_utils.run_rpc_on_multiple_hostnames(
            'modify_hosts_local', affected_shard_hostnames,
            host_filter_data={'id__in': affected_host_ids},
            update_data=update_data)
    458 
    459 
    460 def modify_hosts_local(host_filter_data, update_data):
    461     """Modify attributes of hosts in local DB.
    462 
    463     @param host_filter_data: Filters out which hosts to modify.
    464     @param update_data: A dictionary with the changes to make to the hosts.
    465     """
    466     for host in models.Host.query_objects(host_filter_data):
    467         host.update_object(update_data)
    468 
    469 
    470 def add_labels_to_host(id, labels):
    471     """Adds labels to a given host only in local DB.
    472 
    473     @param id: id or hostname for a host.
    474     @param labels: ids or names for labels.
    475     """
    476     label_objs = models.Label.smart_get_bulk(labels)
    477     if not RESPECT_STATIC_LABELS:
    478         models.Host.smart_get(id).labels.add(*label_objs)
    479     else:
    480         static_labels, non_static_labels = models.Host.classify_label_objects(
    481             label_objs)
    482         host = models.Host.smart_get(id)
    483         host.static_labels.add(*static_labels)
    484         host.labels.add(*non_static_labels)
    485 
    486 
    487 @rpc_utils.route_rpc_to_master
    488 def host_add_labels(id, labels):
    489     """Adds labels to a given host.
    490 
    491     @param id: id or hostname for a host.
    492     @param labels: ids or names for labels.
    493 
    494     @raises ValidationError: If adding more than one platform/board label.
    495     """
    496     # Create the labels on the master/shards.
    497     for label in labels:
    498         _create_label_everywhere(label, [id])
    499 
    500     label_objs = models.Label.smart_get_bulk(labels)
    501 
    502     platforms = [label.name for label in label_objs if label.platform]
    503     if len(platforms) > 1:
    504         raise model_logic.ValidationError(
    505             {'labels': ('Adding more than one platform: %s' %
    506                         ', '.join(platforms))})
    507 
    508     host_obj = models.Host.smart_get(id)
    509     if platforms:
    510         models.Host.check_no_platform([host_obj])
    511     if any(label_name.startswith('board:') for label_name in labels):
    512         models.Host.check_board_labels_allowed([host_obj], labels)
    513     add_labels_to_host(id, labels)
    514 
    515     rpc_utils.fanout_rpc([host_obj], 'add_labels_to_host', False,
    516                          id=id, labels=labels)
    517 
    518 
    519 def remove_labels_from_host(id, labels):
    520     """Removes labels from a given host only in local DB.
    521 
    522     @param id: id or hostname for a host.
    523     @param labels: ids or names for labels.
    524     """
    525     label_objs = models.Label.smart_get_bulk(labels)
    526     if not RESPECT_STATIC_LABELS:
    527         models.Host.smart_get(id).labels.remove(*label_objs)
    528     else:
    529         static_labels, non_static_labels = models.Host.classify_label_objects(
    530                 label_objs)
    531         host = models.Host.smart_get(id)
    532         host.labels.remove(*non_static_labels)
    533         if static_labels:
    534             logging.info('Cannot remove labels "%r" for host "%r" due to they '
    535                          'are static labels. Use '
    536                          'go/chromeos-skylab-inventory-tools to modify these '
    537                          'labels.', static_labels, id)
    538 
    539 
    540 @rpc_utils.route_rpc_to_master
    541 def host_remove_labels(id, labels):
    542     """Removes labels from a given host.
    543 
    544     @param id: id or hostname for a host.
    545     @param labels: ids or names for labels.
    546     """
    547     remove_labels_from_host(id, labels)
    548 
    549     host_obj = models.Host.smart_get(id)
    550     rpc_utils.fanout_rpc([host_obj], 'remove_labels_from_host', False,
    551                          id=id, labels=labels)
    552 
    553 
    554 def get_host_attribute(attribute, **host_filter_data):
    555     """
    556     @param attribute: string name of attribute
    557     @param host_filter_data: filter data to apply to Hosts to choose hosts to
    558                              act upon
    559     """
    560     hosts = rpc_utils.get_host_query((), False, True, host_filter_data)
    561     hosts = list(hosts)
    562     models.Host.objects.populate_relationships(hosts, models.HostAttribute,
    563                                                'attribute_list')
    564     host_attr_dicts = []
    565     host_objs = []
    566     for host_obj in hosts:
    567         for attr_obj in host_obj.attribute_list:
    568             if attr_obj.attribute == attribute:
    569                 host_attr_dicts.append(attr_obj.get_object_dict())
    570                 host_objs.append(host_obj)
    571 
    572     if RESPECT_STATIC_ATTRIBUTES:
    573         for host_attr, host_obj in zip(host_attr_dicts, host_objs):
    574             static_attrs = models.StaticHostAttribute.query_objects(
    575                     {'host_id': host_obj.id, 'attribute': attribute})
    576             if len(static_attrs) > 0:
    577                 host_attr['value'] = static_attrs[0].value
    578 
    579     return rpc_utils.prepare_for_serialization(host_attr_dicts)
    580 
    581 
    582 @rpc_utils.route_rpc_to_master
    583 def set_host_attribute(attribute, value, **host_filter_data):
    584     """Set an attribute on hosts.
    585 
    586     This RPC is a shim that forwards calls to master to be handled there.
    587 
    588     @param attribute: string name of attribute
    589     @param value: string, or None to delete an attribute
    590     @param host_filter_data: filter data to apply to Hosts to choose hosts to
    591                              act upon
    592     """
    593     assert not utils.is_shard()
    594     set_host_attribute_impl(attribute, value, **host_filter_data)
    595 
    596 
    597 def set_host_attribute_impl(attribute, value, **host_filter_data):
    598     """Set an attribute on hosts.
    599 
    600     *** DO NOT CALL THIS RPC from client code ***
    601     This RPC exists for master-shard communication only.
    602     Call set_host_attribute instead.
    603 
    604     @param attribute: string name of attribute
    605     @param value: string, or None to delete an attribute
    606     @param host_filter_data: filter data to apply to Hosts to choose hosts to
    607                              act upon
    608     """
    609     assert host_filter_data # disallow accidental actions on all hosts
    610     hosts = models.Host.query_objects(host_filter_data)
    611     models.AclGroup.check_for_acl_violation_hosts(hosts)
    612     for host in hosts:
    613         host.set_or_delete_attribute(attribute, value)
    614 
    615     # Master forwards this RPC to shards.
    616     if not utils.is_shard():
    617         rpc_utils.fanout_rpc(hosts, 'set_host_attribute_impl', False,
    618                 attribute=attribute, value=value, **host_filter_data)
    619 
    620 
    621 @rpc_utils.forward_single_host_rpc_to_shard
    622 def delete_host(id):
    623     models.Host.smart_get(id).delete()
    624 
    625 
    626 def get_hosts(multiple_labels=(), exclude_only_if_needed_labels=False,
    627               valid_only=True, include_current_job=False, **filter_data):
    628     """Get a list of dictionaries which contains the information of hosts.
    629 
    630     @param multiple_labels: match hosts in all of the labels given.  Should
    631             be a list of label names.
    632     @param exclude_only_if_needed_labels: Deprecated. Raise error if it's True.
    633     @param include_current_job: Set to True to include ids of currently running
    634             job and special task.
    635     """
    636     if exclude_only_if_needed_labels:
    637         raise error.RPCException('exclude_only_if_needed_labels is deprecated')
    638 
    639     hosts = rpc_utils.get_host_query(multiple_labels,
    640                                      exclude_only_if_needed_labels,
    641                                      valid_only, filter_data)
    642     hosts = list(hosts)
    643     models.Host.objects.populate_relationships(hosts, models.Label,
    644                                                'label_list')
    645     models.Host.objects.populate_relationships(hosts, models.AclGroup,
    646                                                'acl_list')
    647     models.Host.objects.populate_relationships(hosts, models.HostAttribute,
    648                                                'attribute_list')
    649     models.Host.objects.populate_relationships(hosts,
    650                                                models.StaticHostAttribute,
    651                                                'staticattribute_list')
    652     host_dicts = []
    653     for host_obj in hosts:
    654         host_dict = host_obj.get_object_dict()
    655         host_dict['acls'] = [acl.name for acl in host_obj.acl_list]
    656         host_dict['attributes'] = dict((attribute.attribute, attribute.value)
    657                                        for attribute in host_obj.attribute_list)
    658         if RESPECT_STATIC_LABELS:
    659             label_list = []
    660             # Only keep static labels which has a corresponding entries in
    661             # afe_labels.
    662             for label in host_obj.label_list:
    663                 if label.is_replaced_by_static():
    664                     static_label = models.StaticLabel.smart_get(label.name)
    665                     label_list.append(static_label)
    666                 else:
    667                     label_list.append(label)
    668 
    669             host_dict['labels'] = [label.name for label in label_list]
    670             host_dict['platform'] = rpc_utils.find_platform(
    671                     host_obj.hostname, label_list)
    672         else:
    673             host_dict['labels'] = [label.name for label in host_obj.label_list]
    674             host_dict['platform'] = rpc_utils.find_platform(
    675                     host_obj.hostname, host_obj.label_list)
    676 
    677         if RESPECT_STATIC_ATTRIBUTES:
    678             # Overwrite attribute with values in afe_static_host_attributes.
    679             for attr in host_obj.staticattribute_list:
    680                 if attr.attribute in host_dict['attributes']:
    681                     host_dict['attributes'][attr.attribute] = attr.value
    682 
    683         if include_current_job:
    684             host_dict['current_job'] = None
    685             host_dict['current_special_task'] = None
    686             entries = models.HostQueueEntry.objects.filter(
    687                     host_id=host_dict['id'], active=True, complete=False)
    688             if entries:
    689                 host_dict['current_job'] = (
    690                         entries[0].get_object_dict()['job'])
    691             tasks = models.SpecialTask.objects.filter(
    692                     host_id=host_dict['id'], is_active=True, is_complete=False)
    693             if tasks:
    694                 host_dict['current_special_task'] = (
    695                         '%d-%s' % (tasks[0].get_object_dict()['id'],
    696                                    tasks[0].get_object_dict()['task'].lower()))
    697         host_dicts.append(host_dict)
    698 
    699     return rpc_utils.prepare_for_serialization(host_dicts)
    700 
    701 
    702 def get_num_hosts(multiple_labels=(), exclude_only_if_needed_labels=False,
    703                   valid_only=True, **filter_data):
    704     """
    705     Same parameters as get_hosts().
    706 
    707     @returns The number of matching hosts.
    708     """
    709     if exclude_only_if_needed_labels:
    710         raise error.RPCException('exclude_only_if_needed_labels is deprecated')
    711 
    712     hosts = rpc_utils.get_host_query(multiple_labels,
    713                                      exclude_only_if_needed_labels,
    714                                      valid_only, filter_data)
    715     return len(hosts)
    716 
    717 
    718 # tests
    719 
    720 def get_tests(**filter_data):
    721     return rpc_utils.prepare_for_serialization(
    722         models.Test.list_objects(filter_data))
    723 
    724 
    725 def get_tests_status_counts_by_job_name_label(job_name_prefix, label_name):
    726     """Gets the counts of all passed and failed tests from the matching jobs.
    727 
    728     @param job_name_prefix: Name prefix of the jobs to get the summary
    729            from, e.g., 'butterfly-release/r40-6457.21.0/bvt-cq/'. Prefix
    730            matching is case insensitive.
    731     @param label_name: Label that must be set in the jobs, e.g.,
    732             'cros-version:butterfly-release/R40-6457.21.0'.
    733 
    734     @returns A summary of the counts of all the passed and failed tests.
    735     """
    736     job_ids = list(models.Job.objects.filter(
    737             name__istartswith=job_name_prefix,
    738             dependency_labels__name=label_name).values_list(
    739                 'pk', flat=True))
    740     summary = {'passed': 0, 'failed': 0}
    741     if not job_ids:
    742         return summary
    743 
    744     counts = (tko_models.TestView.objects.filter(
    745             afe_job_id__in=job_ids).exclude(
    746                 test_name='SERVER_JOB').exclude(
    747                     test_name__startswith='CLIENT_JOB').values(
    748                         'status').annotate(
    749                             count=Count('status')))
    750     for status in counts:
    751         if status['status'] == 'GOOD':
    752             summary['passed'] += status['count']
    753         else:
    754             summary['failed'] += status['count']
    755     return summary
    756 
    757 
    758 # profilers
    759 
    760 def add_profiler(name, description=None):
    761     return models.Profiler.add_object(name=name, description=description).id
    762 
    763 
    764 def modify_profiler(id, **data):
    765     models.Profiler.smart_get(id).update_object(data)
    766 
    767 
    768 def delete_profiler(id):
    769     models.Profiler.smart_get(id).delete()
    770 
    771 
    772 def get_profilers(**filter_data):
    773     return rpc_utils.prepare_for_serialization(
    774         models.Profiler.list_objects(filter_data))
    775 
    776 
    777 # users
    778 
    779 def get_users(**filter_data):
    780     return rpc_utils.prepare_for_serialization(
    781         models.User.list_objects(filter_data))
    782 
    783 
    784 # acl groups
    785 
    786 def add_acl_group(name, description=None):
    787     group = models.AclGroup.add_object(name=name, description=description)
    788     group.users.add(models.User.current_user())
    789     return group.id
    790 
    791 
    792 def modify_acl_group(id, **data):
    793     group = models.AclGroup.smart_get(id)
    794     group.check_for_acl_violation_acl_group()
    795     group.update_object(data)
    796     group.add_current_user_if_empty()
    797 
    798 
    799 def acl_group_add_users(id, users):
    800     group = models.AclGroup.smart_get(id)
    801     group.check_for_acl_violation_acl_group()
    802     users = models.User.smart_get_bulk(users)
    803     group.users.add(*users)
    804 
    805 
    806 def acl_group_remove_users(id, users):
    807     group = models.AclGroup.smart_get(id)
    808     group.check_for_acl_violation_acl_group()
    809     users = models.User.smart_get_bulk(users)
    810     group.users.remove(*users)
    811     group.add_current_user_if_empty()
    812 
    813 
    814 def acl_group_add_hosts(id, hosts):
    815     group = models.AclGroup.smart_get(id)
    816     group.check_for_acl_violation_acl_group()
    817     hosts = models.Host.smart_get_bulk(hosts)
    818     group.hosts.add(*hosts)
    819     group.on_host_membership_change()
    820 
    821 
    822 def acl_group_remove_hosts(id, hosts):
    823     group = models.AclGroup.smart_get(id)
    824     group.check_for_acl_violation_acl_group()
    825     hosts = models.Host.smart_get_bulk(hosts)
    826     group.hosts.remove(*hosts)
    827     group.on_host_membership_change()
    828 
    829 
    830 def delete_acl_group(id):
    831     models.AclGroup.smart_get(id).delete()
    832 
    833 
    834 def get_acl_groups(**filter_data):
    835     acl_groups = models.AclGroup.list_objects(filter_data)
    836     for acl_group in acl_groups:
    837         acl_group_obj = models.AclGroup.objects.get(id=acl_group['id'])
    838         acl_group['users'] = [user.login
    839                               for user in acl_group_obj.users.all()]
    840         acl_group['hosts'] = [host.hostname
    841                               for host in acl_group_obj.hosts.all()]
    842     return rpc_utils.prepare_for_serialization(acl_groups)
    843 
    844 
    845 # jobs
    846 
    847 def generate_control_file(tests=(), profilers=(),
    848                           client_control_file='', use_container=False,
    849                           profile_only=None, db_tests=True,
    850                           test_source_build=None):
    851     """
    852     Generates a client-side control file to run tests.
    853 
    854     @param tests List of tests to run. See db_tests for more information.
    855     @param profilers List of profilers to activate during the job.
    856     @param client_control_file The contents of a client-side control file to
    857         run at the end of all tests.  If this is supplied, all tests must be
    858         client side.
    859         TODO: in the future we should support server control files directly
    860         to wrap with a kernel.  That'll require changing the parameter
    861         name and adding a boolean to indicate if it is a client or server
    862         control file.
    863     @param use_container unused argument today.  TODO: Enable containers
    864         on the host during a client side test.
    865     @param profile_only A boolean that indicates what default profile_only
    866         mode to use in the control file. Passing None will generate a
    867         control file that does not explcitly set the default mode at all.
    868     @param db_tests: if True, the test object can be found in the database
    869                      backing the test model. In this case, tests is a tuple
    870                      of test IDs which are used to retrieve the test objects
    871                      from the database. If False, tests is a tuple of test
    872                      dictionaries stored client-side in the AFE.
    873     @param test_source_build: Build to be used to retrieve test code. Default
    874                               to None.
    875 
    876     @returns a dict with the following keys:
    877         control_file: str, The control file text.
    878         is_server: bool, is the control file a server-side control file?
    879         synch_count: How many machines the job uses per autoserv execution.
    880             synch_count == 1 means the job is asynchronous.
    881         dependencies: A list of the names of labels on which the job depends.
    882     """
    883     if not tests and not client_control_file:
    884         return dict(control_file='', is_server=False, synch_count=1,
    885                     dependencies=[])
    886 
    887     cf_info, test_objects, profiler_objects = (
    888         rpc_utils.prepare_generate_control_file(tests, profilers,
    889                                                 db_tests))
    890     cf_info['control_file'] = control_file_lib.generate_control(
    891         tests=test_objects, profilers=profiler_objects,
    892         is_server=cf_info['is_server'],
    893         client_control_file=client_control_file, profile_only=profile_only,
    894         test_source_build=test_source_build)
    895     return cf_info
    896 
    897 
    898 def create_job_page_handler(name, priority, control_file, control_type,
    899                             image=None, hostless=False, firmware_rw_build=None,
    900                             firmware_ro_build=None, test_source_build=None,
    901                             is_cloning=False, cheets_build=None, **kwargs):
    902     """\
    903     Create and enqueue a job.
    904 
    905     @param name name of this job
    906     @param priority Integer priority of this job.  Higher is more important.
    907     @param control_file String contents of the control file.
    908     @param control_type Type of control file, Client or Server.
    909     @param image: ChromeOS build to be installed in the dut. Default to None.
    910     @param firmware_rw_build: Firmware build to update RW firmware. Default to
    911                               None, i.e., RW firmware will not be updated.
    912     @param firmware_ro_build: Firmware build to update RO firmware. Default to
    913                               None, i.e., RO firmware will not be updated.
    914     @param test_source_build: Build to be used to retrieve test code. Default
    915                               to None.
    916     @param is_cloning: True if creating a cloning job.
    917     @param cheets_build: ChromeOS Android build  to be installed in the dut.
    918                          Default to None. Cheets build will not be updated.
    919     @param kwargs extra args that will be required by create_suite_job or
    920                   create_job.
    921 
    922     @returns The created Job id number.
    923     """
    924     test_args = {}
    925     if kwargs.get('args'):
    926         # args' format is: ['disable_sysinfo=False', 'fast=True', ...]
    927         args = kwargs.get('args')
    928         for arg in args:
    929             k, v = arg.split('=')[0], arg.split('=')[1]
    930             test_args[k] = v
    931 
    932     if is_cloning:
    933         logging.info('Start to clone a new job')
    934         # When cloning a job, hosts and meta_hosts should not exist together,
    935         # which would cause host-scheduler to schedule two hqe jobs to one host
    936         # at the same time, and crash itself. Clear meta_hosts for this case.
    937         if kwargs.get('hosts') and kwargs.get('meta_hosts'):
    938             kwargs['meta_hosts'] = []
    939     else:
    940         logging.info('Start to create a new job')
    941     control_file = rpc_utils.encode_ascii(control_file)
    942     if not control_file:
    943         raise model_logic.ValidationError({
    944                 'control_file' : "Control file cannot be empty"})
    945 
    946     if image and hostless:
    947         builds = {}
    948         builds[provision.CROS_VERSION_PREFIX] = image
    949         if cheets_build:
    950             builds[provision.CROS_ANDROID_VERSION_PREFIX] = cheets_build
    951         if firmware_rw_build:
    952             builds[provision.FW_RW_VERSION_PREFIX] = firmware_rw_build
    953         if firmware_ro_build:
    954             builds[provision.FW_RO_VERSION_PREFIX] = firmware_ro_build
    955         return create_suite_job(
    956                 name=name, control_file=control_file, priority=priority,
    957                 builds=builds, test_source_build=test_source_build,
    958                 is_cloning=is_cloning, test_args=test_args, **kwargs)
    959 
    960     return create_job(name, priority, control_file, control_type, image=image,
    961                       hostless=hostless, test_args=test_args, **kwargs)
    962 
    963 
    964 @rpc_utils.route_rpc_to_master
    965 def create_job(
    966         name,
    967         priority,
    968         control_file,
    969         control_type,
    970         hosts=(),
    971         meta_hosts=(),
    972         one_time_hosts=(),
    973         synch_count=None,
    974         is_template=False,
    975         timeout=None,
    976         timeout_mins=None,
    977         max_runtime_mins=None,
    978         run_verify=False,
    979         email_list='',
    980         dependencies=(),
    981         reboot_before=None,
    982         reboot_after=None,
    983         parse_failed_repair=None,
    984         hostless=False,
    985         keyvals=None,
    986         drone_set=None,
    987         image=None,
    988         parent_job_id=None,
    989         test_retry=0,
    990         run_reset=True,
    991         require_ssp=None,
    992         test_args=None,
    993         **kwargs):
    994     """\
    995     Create and enqueue a job.
    996 
    997     @param name name of this job
    998     @param priority Integer priority of this job.  Higher is more important.
    999     @param control_file String contents of the control file.
   1000     @param control_type Type of control file, Client or Server.
   1001     @param synch_count How many machines the job uses per autoserv execution.
   1002         synch_count == 1 means the job is asynchronous.  If an atomic group is
   1003         given this value is treated as a minimum.
   1004     @param is_template If true then create a template job.
   1005     @param timeout Hours after this call returns until the job times out.
   1006     @param timeout_mins Minutes after this call returns until the job times
   1007         out.
   1008     @param max_runtime_mins Minutes from job starting time until job times out
   1009     @param run_verify Should the host be verified before running the test?
   1010     @param email_list String containing emails to mail when the job is done
   1011     @param dependencies List of label names on which this job depends
   1012     @param reboot_before Never, If dirty, or Always
   1013     @param reboot_after Never, If all tests passed, or Always
   1014     @param parse_failed_repair if true, results of failed repairs launched by
   1015         this job will be parsed as part of the job.
   1016     @param hostless if true, create a hostless job
   1017     @param keyvals dict of keyvals to associate with the job
   1018     @param hosts List of hosts to run job on.
   1019     @param meta_hosts List where each entry is a label name, and for each entry
   1020         one host will be chosen from that label to run the job on.
   1021     @param one_time_hosts List of hosts not in the database to run the job on.
   1022     @param drone_set The name of the drone set to run this test on.
   1023     @param image OS image to install before running job.
   1024     @param parent_job_id id of a job considered to be parent of created job.
   1025     @param test_retry DEPRECATED
   1026     @param run_reset Should the host be reset before running the test?
   1027     @param require_ssp Set to True to require server-side packaging to run the
   1028                        test. If it's set to None, drone will still try to run
   1029                        the server side with server-side packaging. If the
   1030                        autotest-server package doesn't exist for the build or
   1031                        image is not set, drone will run the test without server-
   1032                        side packaging. Default is None.
   1033     @param test_args A dict of args passed to be injected into control file.
   1034     @param kwargs extra keyword args. NOT USED.
   1035 
   1036     @returns The created Job id number.
   1037     """
   1038     if test_args:
   1039         control_file = tools.inject_vars(test_args, control_file)
   1040     if image:
   1041         dependencies += (provision.image_version_to_label(image),)
   1042     return rpc_utils.create_job_common(
   1043             name=name,
   1044             priority=priority,
   1045             control_type=control_type,
   1046             control_file=control_file,
   1047             hosts=hosts,
   1048             meta_hosts=meta_hosts,
   1049             one_time_hosts=one_time_hosts,
   1050             synch_count=synch_count,
   1051             is_template=is_template,
   1052             timeout=timeout,
   1053             timeout_mins=timeout_mins,
   1054             max_runtime_mins=max_runtime_mins,
   1055             run_verify=run_verify,
   1056             email_list=email_list,
   1057             dependencies=dependencies,
   1058             reboot_before=reboot_before,
   1059             reboot_after=reboot_after,
   1060             parse_failed_repair=parse_failed_repair,
   1061             hostless=hostless,
   1062             keyvals=keyvals,
   1063             drone_set=drone_set,
   1064             parent_job_id=parent_job_id,
   1065             run_reset=run_reset,
   1066             require_ssp=require_ssp)
   1067 
   1068 
   1069 def abort_host_queue_entries(**filter_data):
   1070     """\
   1071     Abort a set of host queue entries.
   1072 
   1073     @return: A list of dictionaries, each contains information
   1074              about an aborted HQE.
   1075     """
   1076     query = models.HostQueueEntry.query_objects(filter_data)
   1077 
   1078     # Dont allow aborts on:
   1079     #   1. Jobs that have already completed (whether or not they were aborted)
   1080     #   2. Jobs that we have already been aborted (but may not have completed)
   1081     query = query.filter(complete=False).filter(aborted=False)
   1082     models.AclGroup.check_abort_permissions(query)
   1083     host_queue_entries = list(query.select_related())
   1084     rpc_utils.check_abort_synchronous_jobs(host_queue_entries)
   1085 
   1086     models.HostQueueEntry.abort_host_queue_entries(host_queue_entries)
   1087     hqe_info = [{'HostQueueEntry': hqe.id, 'Job': hqe.job_id,
   1088                  'Job name': hqe.job.name} for hqe in host_queue_entries]
   1089     return hqe_info
   1090 
   1091 
   1092 def abort_special_tasks(**filter_data):
   1093     """\
   1094     Abort the special task, or tasks, specified in the filter.
   1095     """
   1096     query = models.SpecialTask.query_objects(filter_data)
   1097     special_tasks = query.filter(is_active=True)
   1098     for task in special_tasks:
   1099         task.abort()
   1100 
   1101 
   1102 def _call_special_tasks_on_hosts(task, hosts):
   1103     """\
   1104     Schedules a set of hosts for a special task.
   1105 
   1106     @returns A list of hostnames that a special task was created for.
   1107     """
   1108     models.AclGroup.check_for_acl_violation_hosts(hosts)
   1109     shard_host_map = rpc_utils.bucket_hosts_by_shard(hosts)
   1110     if shard_host_map and not utils.is_shard():
   1111         raise ValueError('The following hosts are on shards, please '
   1112                          'follow the link to the shards and create jobs '
   1113                          'there instead. %s.' % shard_host_map)
   1114     for host in hosts:
   1115         models.SpecialTask.schedule_special_task(host, task)
   1116     return list(sorted(host.hostname for host in hosts))
   1117 
   1118 
   1119 def _forward_special_tasks_on_hosts(task, rpc, **filter_data):
   1120     """Forward special tasks to corresponding shards.
   1121 
   1122     For master, when special tasks are fired on hosts that are sharded,
   1123     forward the RPC to corresponding shards.
   1124 
   1125     For shard, create special task records in local DB.
   1126 
   1127     @param task: Enum value of frontend.afe.models.SpecialTask.Task
   1128     @param rpc: RPC name to forward.
   1129     @param filter_data: Filter keywords to be used for DB query.
   1130 
   1131     @return: A list of hostnames that a special task was created for.
   1132     """
   1133     hosts = models.Host.query_objects(filter_data)
   1134     shard_host_map = rpc_utils.bucket_hosts_by_shard(hosts)
   1135 
   1136     # Filter out hosts on a shard from those on the master, forward
   1137     # rpcs to the shard with an additional hostname__in filter, and
   1138     # create a local SpecialTask for each remaining host.
   1139     if shard_host_map and not utils.is_shard():
   1140         hosts = [h for h in hosts if h.shard is None]
   1141         for shard, hostnames in shard_host_map.iteritems():
   1142 
   1143             # The main client of this module is the frontend website, and
   1144             # it invokes it with an 'id' or an 'id__in' filter. Regardless,
   1145             # the 'hostname' filter should narrow down the list of hosts on
   1146             # each shard even though we supply all the ids in filter_data.
   1147             # This method uses hostname instead of id because it fits better
   1148             # with the overall architecture of redirection functions in
   1149             # rpc_utils.
   1150             shard_filter = filter_data.copy()
   1151             shard_filter['hostname__in'] = hostnames
   1152             rpc_utils.run_rpc_on_multiple_hostnames(
   1153                     rpc, [shard], **shard_filter)
   1154 
   1155     # There is a race condition here if someone assigns a shard to one of these
   1156     # hosts before we create the task. The host will stay on the master if:
   1157     # 1. The host is not Ready
   1158     # 2. The host is Ready but has a task
   1159     # But if the host is Ready and doesn't have a task yet, it will get sent
   1160     # to the shard as we're creating a task here.
   1161 
   1162     # Given that we only rarely verify Ready hosts it isn't worth putting this
   1163     # entire method in a transaction. The worst case scenario is that we have
   1164     # a verify running on a Ready host while the shard is using it, if the
   1165     # verify fails no subsequent tasks will be created against the host on the
   1166     # master, and verifies are safe enough that this is OK.
   1167     return _call_special_tasks_on_hosts(task, hosts)
   1168 
   1169 
   1170 def reverify_hosts(**filter_data):
   1171     """\
   1172     Schedules a set of hosts for verify.
   1173 
   1174     @returns A list of hostnames that a verify task was created for.
   1175     """
   1176     return _forward_special_tasks_on_hosts(
   1177             models.SpecialTask.Task.VERIFY, 'reverify_hosts', **filter_data)
   1178 
   1179 
   1180 def repair_hosts(**filter_data):
   1181     """\
   1182     Schedules a set of hosts for repair.
   1183 
   1184     @returns A list of hostnames that a repair task was created for.
   1185     """
   1186     return _forward_special_tasks_on_hosts(
   1187             models.SpecialTask.Task.REPAIR, 'repair_hosts', **filter_data)
   1188 
   1189 
   1190 def get_jobs(not_yet_run=False, running=False, finished=False,
   1191              suite=False, sub=False, standalone=False, **filter_data):
   1192     """\
   1193     Extra status filter args for get_jobs:
   1194     -not_yet_run: Include only jobs that have not yet started running.
   1195     -running: Include only jobs that have start running but for which not
   1196     all hosts have completed.
   1197     -finished: Include only jobs for which all hosts have completed (or
   1198     aborted).
   1199 
   1200     Extra type filter args for get_jobs:
   1201     -suite: Include only jobs with child jobs.
   1202     -sub: Include only jobs with a parent job.
   1203     -standalone: Inlcude only jobs with no child or parent jobs.
   1204     At most one of these three fields should be specified.
   1205     """
   1206     extra_args = rpc_utils.extra_job_status_filters(not_yet_run,
   1207                                                     running,
   1208                                                     finished)
   1209     filter_data['extra_args'] = rpc_utils.extra_job_type_filters(extra_args,
   1210                                                                  suite,
   1211                                                                  sub,
   1212                                                                  standalone)
   1213     job_dicts = []
   1214     jobs = list(models.Job.query_objects(filter_data))
   1215     models.Job.objects.populate_relationships(jobs, models.Label,
   1216                                               'dependencies')
   1217     models.Job.objects.populate_relationships(jobs, models.JobKeyval, 'keyvals')
   1218     for job in jobs:
   1219         job_dict = job.get_object_dict()
   1220         job_dict['dependencies'] = ','.join(label.name
   1221                                             for label in job.dependencies)
   1222         job_dict['keyvals'] = dict((keyval.key, keyval.value)
   1223                                    for keyval in job.keyvals)
   1224         job_dicts.append(job_dict)
   1225     return rpc_utils.prepare_for_serialization(job_dicts)
   1226 
   1227 
   1228 def get_num_jobs(not_yet_run=False, running=False, finished=False,
   1229                  suite=False, sub=False, standalone=False,
   1230                  **filter_data):
   1231     """\
   1232     See get_jobs() for documentation of extra filter parameters.
   1233     """
   1234     extra_args = rpc_utils.extra_job_status_filters(not_yet_run,
   1235                                                     running,
   1236                                                     finished)
   1237     filter_data['extra_args'] = rpc_utils.extra_job_type_filters(extra_args,
   1238                                                                  suite,
   1239                                                                  sub,
   1240                                                                  standalone)
   1241     return models.Job.query_count(filter_data)
   1242 
   1243 
   1244 def get_jobs_summary(**filter_data):
   1245     """\
   1246     Like get_jobs(), but adds 'status_counts' and 'result_counts' field.
   1247 
   1248     'status_counts' filed is a dictionary mapping status strings to the number
   1249     of hosts currently with that status, i.e. {'Queued' : 4, 'Running' : 2}.
   1250 
   1251     'result_counts' field is piped to tko's rpc_interface and has the return
   1252     format specified under get_group_counts.
   1253     """
   1254     jobs = get_jobs(**filter_data)
   1255     ids = [job['id'] for job in jobs]
   1256     all_status_counts = models.Job.objects.get_status_counts(ids)
   1257     for job in jobs:
   1258         job['status_counts'] = all_status_counts[job['id']]
   1259         job['result_counts'] = tko_rpc_interface.get_status_counts(
   1260                 ['afe_job_id', 'afe_job_id'],
   1261                 header_groups=[['afe_job_id'], ['afe_job_id']],
   1262                 **{'afe_job_id': job['id']})
   1263     return rpc_utils.prepare_for_serialization(jobs)
   1264 
   1265 
   1266 def get_info_for_clone(id, preserve_metahosts, queue_entry_filter_data=None):
   1267     """\
   1268     Retrieves all the information needed to clone a job.
   1269     """
   1270     job = models.Job.objects.get(id=id)
   1271     job_info = rpc_utils.get_job_info(job,
   1272                                       preserve_metahosts,
   1273                                       queue_entry_filter_data)
   1274 
   1275     host_dicts = []
   1276     for host in job_info['hosts']:
   1277         host_dict = get_hosts(id=host.id)[0]
   1278         other_labels = host_dict['labels']
   1279         if host_dict['platform']:
   1280             other_labels.remove(host_dict['platform'])
   1281         host_dict['other_labels'] = ', '.join(other_labels)
   1282         host_dicts.append(host_dict)
   1283 
   1284     for host in job_info['one_time_hosts']:
   1285         host_dict = dict(hostname=host.hostname,
   1286                          id=host.id,
   1287                          platform='(one-time host)',
   1288                          locked_text='')
   1289         host_dicts.append(host_dict)
   1290 
   1291     # convert keys from Label objects to strings (names of labels)
   1292     meta_host_counts = dict((meta_host.name, count) for meta_host, count
   1293                             in job_info['meta_host_counts'].iteritems())
   1294 
   1295     info = dict(job=job.get_object_dict(),
   1296                 meta_host_counts=meta_host_counts,
   1297                 hosts=host_dicts)
   1298     info['job']['dependencies'] = job_info['dependencies']
   1299     info['hostless'] = job_info['hostless']
   1300     info['drone_set'] = job.drone_set and job.drone_set.name
   1301 
   1302     image = _get_image_for_job(job, job_info['hostless'])
   1303     if image:
   1304         info['job']['image'] = image
   1305 
   1306     return rpc_utils.prepare_for_serialization(info)
   1307 
   1308 
   1309 def _get_image_for_job(job, hostless):
   1310     """Gets the image used for a job.
   1311 
   1312     Gets the image used for an AFE job from the job's keyvals 'build' or
   1313     'builds'. If that fails, and the job is a hostless job, tries to
   1314     get the image from its control file attributes 'build' or 'builds'.
   1315 
   1316     TODO(ntang): Needs to handle FAFT with two builds for ro/rw.
   1317 
   1318     @param job      An AFE job object.
   1319     @param hostless Boolean indicating whether the job is hostless.
   1320 
   1321     @returns The image build used for the job.
   1322     """
   1323     keyvals = job.keyval_dict()
   1324     image = keyvals.get('build')
   1325     if not image:
   1326         value = keyvals.get('builds')
   1327         builds = None
   1328         if isinstance(value, dict):
   1329             builds = value
   1330         elif isinstance(value, basestring):
   1331             builds = ast.literal_eval(value)
   1332         if builds:
   1333             image = builds.get('cros-version')
   1334     if not image and hostless and job.control_file:
   1335         try:
   1336             control_obj = control_data.parse_control_string(
   1337                     job.control_file)
   1338             if hasattr(control_obj, 'build'):
   1339                 image = getattr(control_obj, 'build')
   1340             if not image and hasattr(control_obj, 'builds'):
   1341                 builds = getattr(control_obj, 'builds')
   1342                 image = builds.get('cros-version')
   1343         except:
   1344             logging.warning('Failed to parse control file for job: %s',
   1345                             job.name)
   1346     return image
   1347 
   1348 
   1349 def get_host_queue_entries_by_insert_time(
   1350     insert_time_after=None, insert_time_before=None, **filter_data):
   1351     """Like get_host_queue_entries, but using the insert index table.
   1352 
   1353     @param insert_time_after: A lower bound on insert_time
   1354     @param insert_time_before: An upper bound on insert_time
   1355     @returns A sequence of nested dictionaries of host and job information.
   1356     """
   1357     assert insert_time_after is not None or insert_time_before is not None, \
   1358       ('Caller to get_host_queue_entries_by_insert_time must provide either'
   1359        ' insert_time_after or insert_time_before.')
   1360     # Get insert bounds on the index of the host queue entries.
   1361     if insert_time_after:
   1362         query = models.HostQueueEntryStartTimes.objects.filter(
   1363             # Note: '-insert_time' means descending. We want the largest
   1364             # insert time smaller than the insert time.
   1365             insert_time__lte=insert_time_after).order_by('-insert_time')
   1366         try:
   1367             constraint = query[0].highest_hqe_id
   1368             if 'id__gte' in filter_data:
   1369                 constraint = max(constraint, filter_data['id__gte'])
   1370             filter_data['id__gte'] = constraint
   1371         except IndexError:
   1372             pass
   1373 
   1374     # Get end bounds.
   1375     if insert_time_before:
   1376         query = models.HostQueueEntryStartTimes.objects.filter(
   1377             insert_time__gte=insert_time_before).order_by('insert_time')
   1378         try:
   1379             constraint = query[0].highest_hqe_id
   1380             if 'id__lte' in filter_data:
   1381                 constraint = min(constraint, filter_data['id__lte'])
   1382             filter_data['id__lte'] = constraint
   1383         except IndexError:
   1384             pass
   1385 
   1386     return rpc_utils.prepare_rows_as_nested_dicts(
   1387             models.HostQueueEntry.query_objects(filter_data),
   1388             ('host', 'job'))
   1389 
   1390 
   1391 def get_host_queue_entries(start_time=None, end_time=None, **filter_data):
   1392     """\
   1393     @returns A sequence of nested dictionaries of host and job information.
   1394     """
   1395     filter_data = rpc_utils.inject_times_to_filter('started_on__gte',
   1396                                                    'started_on__lte',
   1397                                                    start_time,
   1398                                                    end_time,
   1399                                                    **filter_data)
   1400     return rpc_utils.prepare_rows_as_nested_dicts(
   1401             models.HostQueueEntry.query_objects(filter_data),
   1402             ('host', 'job'))
   1403 
   1404 
   1405 def get_num_host_queue_entries(start_time=None, end_time=None, **filter_data):
   1406     """\
   1407     Get the number of host queue entries associated with this job.
   1408     """
   1409     filter_data = rpc_utils.inject_times_to_filter('started_on__gte',
   1410                                                    'started_on__lte',
   1411                                                    start_time,
   1412                                                    end_time,
   1413                                                    **filter_data)
   1414     return models.HostQueueEntry.query_count(filter_data)
   1415 
   1416 
   1417 def get_hqe_percentage_complete(**filter_data):
   1418     """
   1419     Computes the fraction of host queue entries matching the given filter data
   1420     that are complete.
   1421     """
   1422     query = models.HostQueueEntry.query_objects(filter_data)
   1423     complete_count = query.filter(complete=True).count()
   1424     total_count = query.count()
   1425     if total_count == 0:
   1426         return 1
   1427     return float(complete_count) / total_count
   1428 
   1429 
   1430 # special tasks
   1431 
   1432 def get_special_tasks(**filter_data):
   1433     """Get special task entries from the local database.
   1434 
   1435     Query the special tasks table for tasks matching the given
   1436     `filter_data`, and return a list of the results.  No attempt is
   1437     made to forward the call to shards; the buck will stop here.
   1438     The caller is expected to know the target shard for such reasons
   1439     as:
   1440       * The caller is a service (such as gs_offloader) configured
   1441         to operate on behalf of one specific shard, and no other.
   1442       * The caller has a host as a parameter, and knows that this is
   1443         the shard assigned to that host.
   1444 
   1445     @param filter_data  Filter keywords to pass to the underlying
   1446                         database query.
   1447 
   1448     """
   1449     return rpc_utils.prepare_rows_as_nested_dicts(
   1450             models.SpecialTask.query_objects(filter_data),
   1451             ('host', 'queue_entry'))
   1452 
   1453 
   1454 def get_host_special_tasks(host_id, **filter_data):
   1455     """Get special task entries for a given host.
   1456 
   1457     Query the special tasks table for tasks that ran on the host
   1458     given by `host_id` and matching the given `filter_data`.
   1459     Return a list of the results.  If the host is assigned to a
   1460     shard, forward this call to that shard.
   1461 
   1462     @param host_id      Id in the database of the target host.
   1463     @param filter_data  Filter keywords to pass to the underlying
   1464                         database query.
   1465 
   1466     """
   1467     # Retrieve host data even if the host is in an invalid state.
   1468     host = models.Host.smart_get(host_id, False)
   1469     if not host.shard:
   1470         return get_special_tasks(host_id=host_id, **filter_data)
   1471     else:
   1472         # The return values from AFE methods are post-processed
   1473         # objects that aren't JSON-serializable.  So, we have to
   1474         # call AFE.run() to get the raw, serializable output from
   1475         # the shard.
   1476         shard_afe = frontend.AFE(server=host.shard.hostname)
   1477         return shard_afe.run('get_special_tasks',
   1478                              host_id=host_id, **filter_data)
   1479 
   1480 
   1481 def get_num_special_tasks(**kwargs):
   1482     """Get the number of special task entries from the local database.
   1483 
   1484     Query the special tasks table for tasks matching the given 'kwargs',
   1485     and return the number of the results. No attempt is made to forward
   1486     the call to shards; the buck will stop here.
   1487 
   1488     @param kwargs    Filter keywords to pass to the underlying database query.
   1489 
   1490     """
   1491     return models.SpecialTask.query_count(kwargs)
   1492 
   1493 
   1494 def get_host_num_special_tasks(host, **kwargs):
   1495     """Get special task entries for a given host.
   1496 
   1497     Query the special tasks table for tasks that ran on the host
   1498     given by 'host' and matching the given 'kwargs'.
   1499     Return a list of the results.  If the host is assigned to a
   1500     shard, forward this call to that shard.
   1501 
   1502     @param host      id or name of a host. More often a hostname.
   1503     @param kwargs    Filter keywords to pass to the underlying database query.
   1504 
   1505     """
   1506     # Retrieve host data even if the host is in an invalid state.
   1507     host_model = models.Host.smart_get(host, False)
   1508     if not host_model.shard:
   1509         return get_num_special_tasks(host=host, **kwargs)
   1510     else:
   1511         shard_afe = frontend.AFE(server=host_model.shard.hostname)
   1512         return shard_afe.run('get_num_special_tasks', host=host, **kwargs)
   1513 
   1514 
   1515 def get_status_task(host_id, end_time):
   1516     """Get the "status task" for a host from the local shard.
   1517 
   1518     Returns a single special task representing the given host's
   1519     "status task".  The status task is a completed special task that
   1520     identifies whether the corresponding host was working or broken
   1521     when it completed.  A successful task indicates a working host;
   1522     a failed task indicates broken.
   1523 
   1524     This call will not be forward to a shard; the receiving server
   1525     must be the shard that owns the host.
   1526 
   1527     @param host_id      Id in the database of the target host.
   1528     @param end_time     Time reference for the host's status.
   1529 
   1530     @return A single task; its status (successful or not)
   1531             corresponds to the status of the host (working or
   1532             broken) at the given time.  If no task is found, return
   1533             `None`.
   1534 
   1535     """
   1536     tasklist = rpc_utils.prepare_rows_as_nested_dicts(
   1537             status_history.get_status_task(host_id, end_time),
   1538             ('host', 'queue_entry'))
   1539     return tasklist[0] if tasklist else None
   1540 
   1541 
   1542 def get_host_status_task(host_id, end_time):
   1543     """Get the "status task" for a host from its owning shard.
   1544 
   1545     Finds the given host's owning shard, and forwards to it a call
   1546     to `get_status_task()` (see above).
   1547 
   1548     @param host_id      Id in the database of the target host.
   1549     @param end_time     Time reference for the host's status.
   1550 
   1551     @return A single task; its status (successful or not)
   1552             corresponds to the status of the host (working or
   1553             broken) at the given time.  If no task is found, return
   1554             `None`.
   1555 
   1556     """
   1557     host = models.Host.smart_get(host_id)
   1558     if not host.shard:
   1559         return get_status_task(host_id, end_time)
   1560     else:
   1561         # The return values from AFE methods are post-processed
   1562         # objects that aren't JSON-serializable.  So, we have to
   1563         # call AFE.run() to get the raw, serializable output from
   1564         # the shard.
   1565         shard_afe = frontend.AFE(server=host.shard.hostname)
   1566         return shard_afe.run('get_status_task',
   1567                              host_id=host_id, end_time=end_time)
   1568 
   1569 
   1570 def get_host_diagnosis_interval(host_id, end_time, success):
   1571     """Find a "diagnosis interval" for a given host.
   1572 
   1573     A "diagnosis interval" identifies a start and end time where
   1574     the host went from "working" to "broken", or vice versa.  The
   1575     interval's starting time is the starting time of the last status
   1576     task with the old status; the end time is the finish time of the
   1577     first status task with the new status.
   1578 
   1579     This routine finds the most recent diagnosis interval for the
   1580     given host prior to `end_time`, with a starting status matching
   1581     `success`.  If `success` is true, the interval will start with a
   1582     successful status task; if false the interval will start with a
   1583     failed status task.
   1584 
   1585     @param host_id      Id in the database of the target host.
   1586     @param end_time     Time reference for the diagnosis interval.
   1587     @param success      Whether the diagnosis interval should start
   1588                         with a successful or failed status task.
   1589 
   1590     @return A list of two strings.  The first is the timestamp for
   1591             the beginning of the interval; the second is the
   1592             timestamp for the end.  If the host has never changed
   1593             state, the list is empty.
   1594 
   1595     """
   1596     host = models.Host.smart_get(host_id)
   1597     if not host.shard or utils.is_shard():
   1598         return status_history.get_diagnosis_interval(
   1599                 host_id, end_time, success)
   1600     else:
   1601         shard_afe = frontend.AFE(server=host.shard.hostname)
   1602         return shard_afe.get_host_diagnosis_interval(
   1603                 host_id, end_time, success)
   1604 
   1605 
   1606 # support for host detail view
   1607 
   1608 def get_host_queue_entries_and_special_tasks(host, query_start=None,
   1609                                              query_limit=None, start_time=None,
   1610                                              end_time=None):
   1611     """
   1612     @returns an interleaved list of HostQueueEntries and SpecialTasks,
   1613             in approximate run order.  each dict contains keys for type, host,
   1614             job, status, started_on, execution_path, and ID.
   1615     """
   1616     total_limit = None
   1617     if query_limit is not None:
   1618         total_limit = query_start + query_limit
   1619     filter_data_common = {'host': host,
   1620                           'query_limit': total_limit,
   1621                           'sort_by': ['-id']}
   1622 
   1623     filter_data_special_tasks = rpc_utils.inject_times_to_filter(
   1624             'time_started__gte', 'time_started__lte', start_time, end_time,
   1625             **filter_data_common)
   1626 
   1627     queue_entries = get_host_queue_entries(
   1628             start_time, end_time, **filter_data_common)
   1629     special_tasks = get_host_special_tasks(host, **filter_data_special_tasks)
   1630 
   1631     interleaved_entries = rpc_utils.interleave_entries(queue_entries,
   1632                                                        special_tasks)
   1633     if query_start is not None:
   1634         interleaved_entries = interleaved_entries[query_start:]
   1635     if query_limit is not None:
   1636         interleaved_entries = interleaved_entries[:query_limit]
   1637     return rpc_utils.prepare_host_queue_entries_and_special_tasks(
   1638             interleaved_entries, queue_entries)
   1639 
   1640 
   1641 def get_num_host_queue_entries_and_special_tasks(host, start_time=None,
   1642                                                  end_time=None):
   1643     filter_data_common = {'host': host}
   1644 
   1645     filter_data_queue_entries, filter_data_special_tasks = (
   1646             rpc_utils.inject_times_to_hqe_special_tasks_filters(
   1647                     filter_data_common, start_time, end_time))
   1648 
   1649     return (models.HostQueueEntry.query_count(filter_data_queue_entries)
   1650             + get_host_num_special_tasks(**filter_data_special_tasks))
   1651 
   1652 
   1653 # other
   1654 
   1655 def echo(data=""):
   1656     """\
   1657     Returns a passed in string. For doing a basic test to see if RPC calls
   1658     can successfully be made.
   1659     """
   1660     return data
   1661 
   1662 
   1663 def get_motd():
   1664     """\
   1665     Returns the message of the day as a string.
   1666     """
   1667     return rpc_utils.get_motd()
   1668 
   1669 
   1670 def get_static_data():
   1671     """\
   1672     Returns a dictionary containing a bunch of data that shouldn't change
   1673     often and is otherwise inaccessible.  This includes:
   1674 
   1675     priorities: List of job priority choices.
   1676     default_priority: Default priority value for new jobs.
   1677     users: Sorted list of all users.
   1678     labels: Sorted list of labels not start with 'cros-version' and
   1679             'fw-version'.
   1680     tests: Sorted list of all tests.
   1681     profilers: Sorted list of all profilers.
   1682     current_user: Logged-in username.
   1683     host_statuses: Sorted list of possible Host statuses.
   1684     job_statuses: Sorted list of possible HostQueueEntry statuses.
   1685     job_timeout_default: The default job timeout length in minutes.
   1686     parse_failed_repair_default: Default value for the parse_failed_repair job
   1687             option.
   1688     reboot_before_options: A list of valid RebootBefore string enums.
   1689     reboot_after_options: A list of valid RebootAfter string enums.
   1690     motd: Server's message of the day.
   1691     status_dictionary: A mapping from one word job status names to a more
   1692             informative description.
   1693     """
   1694 
   1695     default_drone_set_name = models.DroneSet.default_drone_set_name()
   1696     drone_sets = ([default_drone_set_name] +
   1697                   sorted(drone_set.name for drone_set in
   1698                          models.DroneSet.objects.exclude(
   1699                                  name=default_drone_set_name)))
   1700 
   1701     result = {}
   1702     result['priorities'] = priorities.Priority.choices()
   1703     result['default_priority'] = 'Default'
   1704     result['max_schedulable_priority'] = priorities.Priority.DEFAULT
   1705     result['users'] = get_users(sort_by=['login'])
   1706 
   1707     label_exclude_filters = [{'name__startswith': 'cros-version'},
   1708                              {'name__startswith': 'fw-version'},
   1709                              {'name__startswith': 'fwrw-version'},
   1710                              {'name__startswith': 'fwro-version'}]
   1711     result['labels'] = get_labels(
   1712         label_exclude_filters,
   1713         sort_by=['-platform', 'name'])
   1714 
   1715     result['tests'] = get_tests(sort_by=['name'])
   1716     result['profilers'] = get_profilers(sort_by=['name'])
   1717     result['current_user'] = rpc_utils.prepare_for_serialization(
   1718         models.User.current_user().get_object_dict())
   1719     result['host_statuses'] = sorted(models.Host.Status.names)
   1720     result['job_statuses'] = sorted(models.HostQueueEntry.Status.names)
   1721     result['job_timeout_mins_default'] = models.Job.DEFAULT_TIMEOUT_MINS
   1722     result['job_max_runtime_mins_default'] = (
   1723         models.Job.DEFAULT_MAX_RUNTIME_MINS)
   1724     result['parse_failed_repair_default'] = bool(
   1725         models.Job.DEFAULT_PARSE_FAILED_REPAIR)
   1726     result['reboot_before_options'] = model_attributes.RebootBefore.names
   1727     result['reboot_after_options'] = model_attributes.RebootAfter.names
   1728     result['motd'] = rpc_utils.get_motd()
   1729     result['drone_sets_enabled'] = models.DroneSet.drone_sets_enabled()
   1730     result['drone_sets'] = drone_sets
   1731 
   1732     result['status_dictionary'] = {"Aborted": "Aborted",
   1733                                    "Verifying": "Verifying Host",
   1734                                    "Provisioning": "Provisioning Host",
   1735                                    "Pending": "Waiting on other hosts",
   1736                                    "Running": "Running autoserv",
   1737                                    "Completed": "Autoserv completed",
   1738                                    "Failed": "Failed to complete",
   1739                                    "Queued": "Queued",
   1740                                    "Starting": "Next in host's queue",
   1741                                    "Stopped": "Other host(s) failed verify",
   1742                                    "Parsing": "Awaiting parse of final results",
   1743                                    "Gathering": "Gathering log files",
   1744                                    "Waiting": "Waiting for scheduler action",
   1745                                    "Archiving": "Archiving results",
   1746                                    "Resetting": "Resetting hosts"}
   1747 
   1748     result['wmatrix_url'] = rpc_utils.get_wmatrix_url()
   1749     result['stainless_url'] = rpc_utils.get_stainless_url()
   1750     result['is_moblab'] = bool(utils.is_moblab())
   1751 
   1752     return result
   1753 
   1754 
   1755 def get_server_time():
   1756     return datetime.datetime.now().strftime("%Y-%m-%d %H:%M")
   1757 
   1758 
   1759 def ping_db():
   1760     """Simple connection test to db"""
   1761     try:
   1762         db_connection.cursor()
   1763     except DatabaseError:
   1764         return [False]
   1765     return [True]
   1766 
   1767 
   1768 def get_hosts_by_attribute(attribute, value):
   1769     """
   1770     Get the list of valid hosts that share the same host attribute value.
   1771 
   1772     @param attribute: String of the host attribute to check.
   1773     @param value: String of the value that is shared between hosts.
   1774 
   1775     @returns List of hostnames that all have the same host attribute and
   1776              value.
   1777     """
   1778     rows = models.HostAttribute.query_objects({'attribute': attribute,
   1779                                                'value': value})
   1780     if RESPECT_STATIC_ATTRIBUTES:
   1781         returned_hosts = set()
   1782         # Add hosts:
   1783         #     * Non-valid
   1784         #     * Exist in afe_host_attribute with given attribute.
   1785         #     * Don't exist in afe_static_host_attribute OR exist in
   1786         #       afe_static_host_attribute with the same given value.
   1787         for row in rows:
   1788             if row.host.invalid != 0:
   1789                 continue
   1790 
   1791             static_hosts = models.StaticHostAttribute.query_objects(
   1792                 {'host_id': row.host.id, 'attribute': attribute})
   1793             values = [static_host.value for static_host in static_hosts]
   1794             if len(values) == 0 or values[0] == value:
   1795                 returned_hosts.add(row.host.hostname)
   1796 
   1797         # Add hosts:
   1798         #     * Non-valid
   1799         #     * Exist in afe_static_host_attribute with given attribute
   1800         #       and value
   1801         #     * No need to check whether each static attribute has its
   1802         #       corresponding entry in afe_host_attribute since it is ensured
   1803         #       in inventory sync.
   1804         static_rows = models.StaticHostAttribute.query_objects(
   1805                 {'attribute': attribute, 'value': value})
   1806         for row in static_rows:
   1807             if row.host.invalid != 0:
   1808                 continue
   1809 
   1810             returned_hosts.add(row.host.hostname)
   1811 
   1812         return list(returned_hosts)
   1813     else:
   1814         return [row.host.hostname for row in rows if row.host.invalid == 0]
   1815 
   1816 
   1817 def _get_control_file_by_suite(suite_name):
   1818     """Get control file contents by suite name.
   1819 
   1820     @param suite_name: Suite name as string.
   1821     @returns: Control file contents as string.
   1822     """
   1823     getter = control_file_getter.FileSystemGetter(
   1824             [_CONFIG.get_config_value('SCHEDULER',
   1825                                       'drone_installation_directory')])
   1826     return getter.get_control_file_contents_by_name(suite_name)
   1827 
   1828 
@rpc_utils.route_rpc_to_master
def create_suite_job(
        name='',
        board='',
        pool='',
        child_dependencies=(),
        control_file='',
        check_hosts=True,
        num=None,
        file_bugs=False,
        timeout=24,
        timeout_mins=None,
        priority=priorities.Priority.DEFAULT,
        suite_args=None,
        wait_for_results=True,
        job_retry=False,
        max_retries=None,
        max_runtime_mins=None,
        suite_min_duts=0,
        offload_failures_only=False,
        builds=None,
        test_source_build=None,
        run_prod_code=False,
        delay_minutes=0,
        is_cloning=False,
        job_keyvals=None,
        test_args=None,
        **kwargs):
    """
    Create a job to run a test suite on the given device with the given image.

    The suite control file is taken from the local installation when
    run_prod_code is set, otherwise from the `control_file` argument or, if
    that is empty, from the artifacts of the test source build.  The
    arguments are then injected into the control file as variables and a
    hostless server job is created from it.

    When the timeout specified in the control file is reached, the
    job is guaranteed to have completed and results will be available.

    @param name: The test name if control_file is supplied, otherwise the name
                 of the test suite to run, e.g. 'bvt'.
    @param board: the kind of device to run the tests on. If empty, it is
                  derived from the build name stored in `builds` under
                  provision.CROS_VERSION_PREFIX.
    @param builds: the builds to install e.g.
                   {'cros-version:': 'x86-alex-release/R18-1655.0.0',
                    'fwrw-version:':  'x86-alex-firmware/R36-5771.50.0',
                    'fwro-version:':  'x86-alex-firmware/R36-5771.49.0'}
                   If builds is given a value, it overrides argument build.
                   Defaults to an empty dict.
    @param test_source_build: Build that contains the server-side test code.
            Unless run_prod_code is set, the final value is determined by
            Suite.get_test_source_build from this argument and `builds`.
    @param pool: Specify the pool of machines to use for scheduling
            purposes.
    @param child_dependencies: (optional) list of additional dependency labels
            (strings) that will be added as dependency labels to child jobs.
    @param control_file: the control file of the job. Replaced by the locally
            installed suite control file when run_prod_code is True.
    @param check_hosts: require appropriate live hosts to exist in the lab.
    @param num: Deprecated and ignored. A warning is issued if it is not
                None.
    @param file_bugs: File a bug on each test failure in this suite.
    @param timeout: The max lifetime of this suite, in hours. Also supplies
                    the default for max_runtime_mins.
    @param timeout_mins: The max lifetime of this suite, in minutes. Takes
                         priority over timeout.
    @param priority: Integer denoting priority. Higher is more important.
    @param suite_args: Optional arguments which will be parsed by the suite
                       control file. Used by control.test_that_wrapper to
                       determine which tests to run. Must be a dict (or
                       None); its entries are merged into the injected
                       variables and override entries with the same name.
    @param wait_for_results: Set to False to run the suite job without waiting
                             for test jobs to finish. Default is True.
    @param job_retry: Set to True to enable job-level retry. Default is False.
    @param max_retries: Integer, maximum job retries allowed at suite level.
                        None for no max.
    @param max_runtime_mins: Maximum amount of time a job can be running in
                             minutes. Defaults to timeout converted to
                             minutes.
    @param suite_min_duts: Integer. Scheduler will prioritize getting the
                           minimum number of machines for the suite when it is
                           competing with another suite that has a higher
                           priority but already got minimum machines it needs.
    @param offload_failures_only: Only enable gs_offloading for failed jobs.
    @param run_prod_code: If True, the suite will run the test code that
                          lives in prod aka the test code currently on the
                          lab servers. If False, the control files and test
                          code for this suite run will be retrieved from the
                          build artifacts.
    @param delay_minutes: Delay the creation of test jobs for a given number of
                          minutes.
    @param is_cloning: True if creating a cloning job.
    @param job_keyvals: A dict of job keyvals to be inject to control file.
    @param test_args: A dict of args passed all the way to each individual test
                      that will be actually run.
    @param kwargs: extra keyword args. NOT USED.

    @raises ControlFileNotFound: if a unique suite control file doesn't exist.
    @raises NoControlFileList: if we can't list the control files at all.
    @raises StageControlFileFailure: If the dev server throws 500 while
                                     staging test_suites.
    @raises ControlFileEmpty: if the control file exists on the server, but
                              can't be read.

    @return: the job ID of the suite, as returned by
             rpc_utils.create_job_common.
             NOTE(review): earlier documentation said "-1 on error"; nothing
             in this function returns -1, so confirm against
             create_job_common.
    """
    if num is not None:
        warnings.warn('num is deprecated for create_suite_job')
    del num

    if builds is None:
        builds = {}

    # Default test source build to CrOS build if it's not specified and
    # run_prod_code is set to False.
    if not run_prod_code:
        test_source_build = Suite.get_test_source_build(
                builds, test_source_build=test_source_build)

    sample_dut = rpc_utils.get_sample_dut(board, pool)

    suite_name = suite_common.canonicalize_suite_name(name)
    if run_prod_code:
        # Prod code needs no staged artifacts; only resolve a devserver.
        ds = dev_server.resolve(test_source_build, hostname=sample_dut)
        keyvals = {}
    else:
        ds, keyvals = suite_common.stage_build_artifacts(
                test_source_build, hostname=sample_dut)
    keyvals[constants.SUITE_MIN_DUTS_KEY] = suite_min_duts

    # Do not change this naming convention without updating
    # site_utils.parse_job_name.
    if run_prod_code:
        # If run_prod_code is True, test_source_build is not set, use the
        # first build in the builds list for the suite job name.
        # NOTE(review): indexing builds.values() relies on Python 2 dicts
        # (arbitrary "first" entry) and raises IndexError if builds is
        # empty.
        name = '%s-%s' % (builds.values()[0], suite_name)
    else:
        name = '%s-%s' % (test_source_build, suite_name)

    # timeout is in hours; both limits below are in minutes.
    timeout_mins = timeout_mins or timeout * 60
    max_runtime_mins = max_runtime_mins or timeout * 60

    if not board:
        board = utils.ParseBuildName(builds[provision.CROS_VERSION_PREFIX])[0]

    if run_prod_code:
        control_file = _get_control_file_by_suite(suite_name)

    if not control_file:
        # No control file was supplied so look it up from the build artifacts.
        control_file = suite_common.get_control_file_by_build(
                test_source_build, ds, suite_name)

    # A cloned job's control file presumably still carries the variables
    # injected for the original job; remove them before injecting new ones.
    if is_cloning:
        control_file = tools.remove_injection(control_file)

    if suite_args is None:
        suite_args = dict()

    # Variables to prepend to the control file (builds, board, etc.).
    inject_dict = {
        'board': board,
        # `build` is needed for suites like AU to stage image inside suite
        # control file.
        'build': test_source_build,
        'builds': builds,
        'check_hosts': check_hosts,
        'pool': pool,
        'child_dependencies': child_dependencies,
        'file_bugs': file_bugs,
        'timeout': timeout,
        'timeout_mins': timeout_mins,
        'devserver_url': ds.url(),
        'priority': priority,
        'wait_for_results': wait_for_results,
        'job_retry': job_retry,
        'max_retries': max_retries,
        'max_runtime_mins': max_runtime_mins,
        'offload_failures_only': offload_failures_only,
        'test_source_build': test_source_build,
        'run_prod_code': run_prod_code,
        'delay_minutes': delay_minutes,
        'job_keyvals': job_keyvals,
        'test_args': test_args,
    }
    # Entries of suite_args override same-named entries above.
    inject_dict.update(suite_args)
    control_file = tools.inject_vars(inject_dict, control_file)

    # The suite job itself is a hostless server job.
    return rpc_utils.create_job_common(name,
                                       priority=priority,
                                       timeout_mins=timeout_mins,
                                       max_runtime_mins=max_runtime_mins,
                                       control_type='Server',
                                       control_file=control_file,
                                       hostless=True,
                                       keyvals=keyvals)
   2012 
   2013 
   2014 def get_job_history(**filter_data):
   2015     """Get history of the job, including the special tasks executed for the job
   2016 
   2017     @param filter_data: filter for the call, should at least include
   2018                         {'job_id': [job id]}
   2019     @returns: JSON string of the job's history, including the information such
   2020               as the hosts run the job and the special tasks executed before
   2021               and after the job.
   2022     """
   2023     job_id = filter_data['job_id']
   2024     job_info = job_history.get_job_info(job_id)
   2025     return rpc_utils.prepare_for_serialization(job_info.get_history())
   2026 
   2027 
   2028 def get_host_history(start_time, end_time, hosts=None, board=None, pool=None):
   2029     """Deprecated."""
   2030     raise ValueError('get_host_history rpc is deprecated '
   2031                      'and no longer implemented.')
   2032 
   2033 
   2034 def shard_heartbeat(shard_hostname, jobs=(), hqes=(), known_job_ids=(),
   2035                     known_host_ids=(), known_host_statuses=()):
   2036     """Receive updates for job statuses from shards and assign hosts and jobs.
   2037 
   2038     @param shard_hostname: Hostname of the calling shard
   2039     @param jobs: Jobs in serialized form that should be updated with newer
   2040                  status from a shard.
   2041     @param hqes: Hostqueueentries in serialized form that should be updated with
   2042                  newer status from a shard. Note that for every hostqueueentry
   2043                  the corresponding job must be in jobs.
   2044     @param known_job_ids: List of ids of jobs the shard already has.
   2045     @param known_host_ids: List of ids of hosts the shard already has.
   2046     @param known_host_statuses: List of statuses of hosts the shard already has.
   2047 
   2048     @returns: Serialized representations of hosts, jobs, suite job keyvals
   2049               and their dependencies to be inserted into a shard's database.
   2050     """
   2051     # The following alternatives to sending host and job ids in every heartbeat
   2052     # have been considered:
   2053     # 1. Sending the highest known job and host ids. This would work for jobs:
   2054     #    Newer jobs always have larger ids. Also, if a job is not assigned to a
   2055     #    particular shard during a heartbeat, it never will be assigned to this
   2056     #    shard later.
   2057     #    This is not true for hosts though: A host that is leased won't be sent
   2058     #    to the shard now, but might be sent in a future heartbeat. This means
   2059     #    sometimes hosts should be transfered that have a lower id than the
   2060     #    maximum host id the shard knows.
   2061     # 2. Send the number of jobs/hosts the shard knows to the master in each
   2062     #    heartbeat. Compare these to the number of records that already have
   2063     #    the shard_id set to this shard. In the normal case, they should match.
   2064     #    In case they don't, resend all entities of that type.
   2065     #    This would work well for hosts, because there aren't that many.
   2066     #    Resending all jobs is quite a big overhead though.
   2067     #    Also, this approach might run into edge cases when entities are
   2068     #    ever deleted.
   2069     # 3. Mixtures of the above: Use 1 for jobs and 2 for hosts.
   2070     #    Using two different approaches isn't consistent and might cause
   2071     #    confusion. Also the issues with the case of deletions might still
   2072     #    occur.
   2073     #
   2074     # The overhead of sending all job and host ids in every heartbeat is low:
   2075     # At peaks one board has about 1200 created but unfinished jobs.
   2076     # See the numbers here: http://goo.gl/gQCGWH
   2077     # Assuming that job id's have 6 digits and that json serialization takes a
   2078     # comma and a space as overhead, the traffic per id sent is about 8 bytes.
   2079     # If 5000 ids need to be sent, this means 40 kilobytes of traffic.
   2080     # A NOT IN query with 5000 ids took about 30ms in tests made.
   2081     # These numbers seem low enough to outweigh the disadvantages of the
   2082     # solutions described above.
   2083     shard_obj = rpc_utils.retrieve_shard(shard_hostname=shard_hostname)
   2084     rpc_utils.persist_records_sent_from_shard(shard_obj, jobs, hqes)
   2085     assert len(known_host_ids) == len(known_host_statuses)
   2086     for i in range(len(known_host_ids)):
   2087         host_model = models.Host.objects.get(pk=known_host_ids[i])
   2088         if host_model.status != known_host_statuses[i]:
   2089             host_model.status = known_host_statuses[i]
   2090             host_model.save()
   2091 
   2092     hosts, jobs, suite_keyvals, inc_ids = rpc_utils.find_records_for_shard(
   2093             shard_obj, known_job_ids=known_job_ids,
   2094             known_host_ids=known_host_ids)
   2095     return {
   2096         'hosts': [host.serialize() for host in hosts],
   2097         'jobs': [job.serialize() for job in jobs],
   2098         'suite_keyvals': [kv.serialize() for kv in suite_keyvals],
   2099         'incorrect_host_ids': [int(i) for i in inc_ids],
   2100     }
   2101 
   2102 
   2103 def get_shards(**filter_data):
   2104     """Return a list of all shards.
   2105 
   2106     @returns A sequence of nested dictionaries of shard information.
   2107     """
   2108     shards = models.Shard.query_objects(filter_data)
   2109     serialized_shards = rpc_utils.prepare_rows_as_nested_dicts(shards, ())
   2110     for serialized, shard in zip(serialized_shards, shards):
   2111         serialized['labels'] = [label.name for label in shard.labels.all()]
   2112 
   2113     return serialized_shards
   2114 
   2115 
   2116 def _assign_board_to_shard_precheck(labels):
   2117     """Verify whether board labels are valid to be added to a given shard.
   2118 
   2119     First check whether board label is in correct format. Second, check whether
   2120     the board label exist. Third, check whether the board has already been
   2121     assigned to shard.
   2122 
   2123     @param labels: Board labels separated by comma.
   2124 
   2125     @raises error.RPCException: If label provided doesn't start with `board:`
   2126             or board has been added to shard already.
   2127     @raises models.Label.DoesNotExist: If the label specified doesn't exist.
   2128 
   2129     @returns: A list of label models that ready to be added to shard.
   2130     """
   2131     if not labels:
   2132       # allow creation of label-less shards (labels='' would otherwise fail the
   2133       # checks below)
   2134       return []
   2135     labels = labels.split(',')
   2136     label_models = []
   2137     for label in labels:
   2138         # Check whether the board label is in correct format.
   2139         if not label.startswith('board:'):
   2140             raise error.RPCException('Sharding only supports `board:.*` label.')
   2141         # Check whether the board label exist. If not, exception will be thrown
   2142         # by smart_get function.
   2143         label = models.Label.smart_get(label)
   2144         # Check whether the board has been sharded already
   2145         try:
   2146             shard = models.Shard.objects.get(labels=label)
   2147             raise error.RPCException(
   2148                     '%s is already on shard %s' % (label, shard.hostname))
   2149         except models.Shard.DoesNotExist:
   2150             # board is not on any shard, so it's valid.
   2151             label_models.append(label)
   2152     return label_models
   2153 
   2154 
   2155 def add_shard(hostname, labels):
   2156     """Add a shard and start running jobs on it.
   2157 
   2158     @param hostname: The hostname of the shard to be added; needs to be unique.
   2159     @param labels: Board labels separated by comma. Jobs of one of the labels
   2160                    will be assigned to the shard.
   2161 
   2162     @raises error.RPCException: If label provided doesn't start with `board:` or
   2163             board has been added to shard already.
   2164     @raises model_logic.ValidationError: If a shard with the given hostname
   2165             already exist.
   2166     @raises models.Label.DoesNotExist: If the label specified doesn't exist.
   2167 
   2168     @returns: The id of the added shard.
   2169     """
   2170     labels = _assign_board_to_shard_precheck(labels)
   2171     shard = models.Shard.add_object(hostname=hostname)
   2172     for label in labels:
   2173         shard.labels.add(label)
   2174     return shard.id
   2175 
   2176 
   2177 def add_board_to_shard(hostname, labels):
   2178     """Add boards to a given shard
   2179 
   2180     @param hostname: The hostname of the shard to be changed.
   2181     @param labels: Board labels separated by comma.
   2182 
   2183     @raises error.RPCException: If label provided doesn't start with `board:` or
   2184             board has been added to shard already.
   2185     @raises models.Label.DoesNotExist: If the label specified doesn't exist.
   2186 
   2187     @returns: The id of the changed shard.
   2188     """
   2189     labels = _assign_board_to_shard_precheck(labels)
   2190     shard = models.Shard.objects.get(hostname=hostname)
   2191     for label in labels:
   2192         shard.labels.add(label)
   2193     return shard.id
   2194 
   2195 
   2196 # Remove board RPCs are rare, so we can afford to make them a bit more
   2197 # expensive (by performing in a transaction) in order to guarantee
   2198 # atomicity.
   2199 # TODO(akeshet): If we ever update to newer version of django, we need to
   2200 # migrate to transaction.atomic instead of commit_on_success
   2201 @transaction.commit_on_success
   2202 def remove_board_from_shard(hostname, label):
   2203     """Remove board from the given shard.
   2204     @param hostname: The hostname of the shard to be changed.
   2205     @param labels: Board label.
   2206 
   2207     @raises models.Label.DoesNotExist: If the label specified doesn't exist.
   2208 
   2209     @returns: The id of the changed shard.
   2210     """
   2211     shard = models.Shard.objects.get(hostname=hostname)
   2212     label = models.Label.smart_get(label)
   2213     if label not in shard.labels.all():
   2214         raise error.RPCException(
   2215           'Cannot remove label from shard that does not belong to it.')
   2216 
   2217     shard.labels.remove(label)
   2218     if label.is_replaced_by_static():
   2219         static_label = models.StaticLabel.smart_get(label.name)
   2220         models.Host.objects.filter(
   2221                 static_labels__in=[static_label]).update(shard=None)
   2222     else:
   2223         models.Host.objects.filter(labels__in=[label]).update(shard=None)
   2224 
   2225 
   2226 def delete_shard(hostname):
   2227     """Delete a shard and reclaim all resources from it.
   2228 
   2229     This claims back all assigned hosts from the shard.
   2230     """
   2231     shard = rpc_utils.retrieve_shard(shard_hostname=hostname)
   2232 
   2233     # Remove shard information.
   2234     models.Host.objects.filter(shard=shard).update(shard=None)
   2235 
   2236     # Note: The original job-cleanup query was performed with django call
   2237     #   models.Job.objects.filter(shard=shard).update(shard=None)
   2238     #
   2239     # But that started becoming unreliable due to the large size of afe_jobs.
   2240     #
   2241     # We don't need atomicity here, so the new cleanup method is iterative, in
   2242     # chunks of 100k jobs.
   2243     QUERY = ('UPDATE afe_jobs SET shard_id = NULL WHERE shard_id = %s '
   2244              'LIMIT 100000')
   2245     try:
   2246         with contextlib.closing(db_connection.cursor()) as cursor:
   2247             clear_jobs = True
   2248             assert shard.id is not None
   2249             while clear_jobs:
   2250                 cursor.execute(QUERY % shard.id)
   2251                 clear_jobs = bool(cursor.fetchone())
   2252     # Unit tests use sqlite backend instead of MySQL. sqlite does not support
   2253     # UPDATE ... LIMIT, so fall back to the old behavior.
   2254     except DatabaseError as e:
   2255         if 'syntax error' in str(e):
   2256             models.Job.objects.filter(shard=shard).update(shard=None)
   2257         else:
   2258             raise
   2259 
   2260     shard.labels.clear()
   2261     shard.delete()
   2262 
   2263 
   2264 def get_servers(hostname=None, role=None, status=None):
   2265     """Get a list of servers with matching role and status.
   2266 
   2267     @param hostname: FQDN of the server.
   2268     @param role: Name of the server role, e.g., drone, scheduler. Default to
   2269                  None to match any role.
   2270     @param status: Status of the server, e.g., primary, backup, repair_required.
   2271                    Default to None to match any server status.
   2272 
   2273     @raises error.RPCException: If server database is not used.
   2274     @return: A list of server names for servers with matching role and status.
   2275     """
   2276     if not server_manager_utils.use_server_db():
   2277         raise error.RPCException('Server database is not enabled. Please try '
   2278                                  'retrieve servers from global config.')
   2279     servers = server_manager_utils.get_servers(hostname=hostname, role=role,
   2280                                                status=status)
   2281     return [s.get_details() for s in servers]
   2282 
   2283 
   2284 @rpc_utils.route_rpc_to_master
   2285 def get_stable_version(board=stable_version_utils.DEFAULT, android=False):
   2286     """Get stable version for the given board.
   2287 
   2288     @param board: Name of the board.
   2289     @param android: Unused legacy parameter.  This is maintained for the
   2290             sake of clients on old branches that still pass the
   2291             parameter.  TODO(jrbarnette) Remove this completely once R68
   2292             drops off stable.
   2293 
   2294     @return: Stable version of the given board. Return global configure value
   2295              of CROS.stable_cros_version if stable_versinos table does not have
   2296              entry of board DEFAULT.
   2297     """
   2298     assert not android, 'get_stable_version no longer supports `android`.'
   2299     return stable_version_utils.get(board=board)
   2300 
   2301 
   2302 @rpc_utils.route_rpc_to_master
   2303 def get_all_stable_versions():
   2304     """Get stable versions for all boards.
   2305 
   2306     @return: A dictionary of board:version.
   2307     """
   2308     return stable_version_utils.get_all()
   2309 
   2310 
   2311 @rpc_utils.route_rpc_to_master
   2312 def set_stable_version(version, board=stable_version_utils.DEFAULT):
   2313     """Modify stable version for the given board.
   2314 
   2315     @param version: The new value of stable version for given board.
   2316     @param board: Name of the board, default to value `DEFAULT`.
   2317     """
   2318     stable_version_utils.set(version=version, board=board)
   2319 
   2320 
   2321 @rpc_utils.route_rpc_to_master
   2322 def delete_stable_version(board):
   2323     """Modify stable version for the given board.
   2324 
   2325     Delete a stable version entry in afe_stable_versions table for a given
   2326     board, so default stable version will be used.
   2327 
   2328     @param board: Name of the board.
   2329     """
   2330     stable_version_utils.delete(board=board)
   2331 
   2332 
   2333 def get_tests_by_build(build, ignore_invalid_tests=True):
   2334     """Get the tests that are available for the specified build.
   2335 
   2336     @param build: unique name by which to refer to the image.
   2337     @param ignore_invalid_tests: flag on if unparsable tests are ignored.
   2338 
   2339     @return: A sorted list of all tests that are in the build specified.
   2340     """
   2341     # Collect the control files specified in this build
   2342     cfile_getter = control_file_lib._initialize_control_file_getter(build)
   2343     if suite_common.ENABLE_CONTROLS_IN_BATCH:
   2344         control_file_info_list = cfile_getter.get_suite_info()
   2345         control_file_list = control_file_info_list.keys()
   2346     else:
   2347         control_file_list = cfile_getter.get_control_file_list()
   2348 
   2349     test_objects = []
   2350     _id = 0
   2351     for control_file_path in control_file_list:
   2352         # Read and parse the control file
   2353         if suite_common.ENABLE_CONTROLS_IN_BATCH:
   2354             control_file = control_file_info_list[control_file_path]
   2355         else:
   2356             control_file = cfile_getter.get_control_file_contents(
   2357                     control_file_path)
   2358         try:
   2359             control_obj = control_data.parse_control_string(control_file)
   2360         except:
   2361             logging.info('Failed to parse control file: %s', control_file_path)
   2362             if not ignore_invalid_tests:
   2363                 raise
   2364 
   2365         # Extract the values needed for the AFE from the control_obj.
   2366         # The keys list represents attributes in the control_obj that
   2367         # are required by the AFE
   2368         keys = ['author', 'doc', 'name', 'time', 'test_type', 'experimental',
   2369                 'test_category', 'test_class', 'dependencies', 'run_verify',
   2370                 'sync_count', 'job_retries', 'path']
   2371 
   2372         test_object = {}
   2373         for key in keys:
   2374             test_object[key] = getattr(control_obj, key) if hasattr(
   2375                     control_obj, key) else ''
   2376 
   2377         # Unfortunately, the AFE expects different key-names for certain
   2378         # values, these must be corrected to avoid the risk of tests
   2379         # being omitted by the AFE.
   2380         # The 'id' is an additional value used in the AFE.
   2381         # The control_data parsing does not reference 'run_reset', but it
   2382         # is also used in the AFE and defaults to True.
   2383         test_object['id'] = _id
   2384         test_object['run_reset'] = True
   2385         test_object['description'] = test_object.get('doc', '')
   2386         test_object['test_time'] = test_object.get('time', 0)
   2387 
   2388         # TODO(crbug.com/873716) DEPRECATED. Remove entirely.
   2389         test_object['test_retry'] = 0
   2390 
   2391         # Fix the test name to be consistent with the current presentation
   2392         # of test names in the AFE.
   2393         testpath, subname = os.path.split(control_file_path)
   2394         testname = os.path.basename(testpath)
   2395         subname = subname.split('.')[1:]
   2396         if subname:
   2397             testname = '%s:%s' % (testname, ':'.join(subname))
   2398 
   2399         test_object['name'] = testname
   2400 
   2401         # Correct the test path as parse_control_string sets an empty string.
   2402         test_object['path'] = control_file_path
   2403 
   2404         _id += 1
   2405         test_objects.append(test_object)
   2406 
   2407     test_objects = sorted(test_objects, key=lambda x: x.get('name'))
   2408     return rpc_utils.prepare_for_serialization(test_objects)
   2409 
   2410 
   2411 @rpc_utils.route_rpc_to_master
   2412 def get_lab_health_indicators(board=None):
   2413     """Get the healthy indicators for whole lab.
   2414 
   2415     The indicators now includes:
   2416     1. lab is closed or not.
   2417     2. Available DUTs list for a given board.
   2418     3. Devserver capacity.
   2419     4. When is the next major DUT utilization (e.g. CQ is coming in 3 minutes).
   2420 
   2421     @param board: if board is specified, a list of available DUTs will be
   2422         returned for it. Otherwise, skip this indicator.
   2423 
   2424     @returns: A healthy indicator object including health info.
   2425     """
   2426     return LabHealthIndicator(None, None, None, None)
   2427