Home | History | Annotate | Download | only in afe
      1 # pylint: disable=missing-docstring
      2 
      3 """\
      4 Functions to expose over the RPC interface.
      5 
      6 For all modify* and delete* functions that ask for an 'id' parameter to
      7 identify the object to operate on, the id may be either
      8  * the database row ID
      9  * the name of the object (label name, hostname, user login, etc.)
     10  * a dictionary containing uniquely identifying field (this option should seldom
     11    be used)
     12 
     13 When specifying foreign key fields (i.e. adding hosts to a label, or adding
     14 users to an ACL group), the given value may be either the database row ID or the
     15 name of the object.
     16 
     17 All get* functions return lists of dictionaries.  Each dictionary represents one
     18 object and maps field names to values.
     19 
     20 Some examples:
     21 modify_host(2, hostname='myhost') # modify hostname of host with database ID 2
     22 modify_host('ipaj2', hostname='myhost') # modify hostname of host 'ipaj2'
     23 modify_test('sleeptest', test_type='Client', params=', seconds=60')
     24 delete_acl_group(1) # delete by ID
     25 delete_acl_group('Everyone') # delete by name
     26 acl_group_add_users('Everyone', ['mbligh', 'showard'])
     27 get_jobs(owner='showard', status='Queued')
     28 
     29 See doctests/001_rpc_test.txt for (lots) more examples.
     30 """
     31 
     32 __author__ = 'showard (at] google.com (Steve Howard)'
     33 
     34 import ast
     35 import collections
     36 import contextlib
     37 import datetime
     38 import logging
     39 import os
     40 import sys
     41 import warnings
     42 
     43 from django.db import connection as db_connection
     44 from django.db import transaction
     45 from django.db.models import Count
     46 from django.db.utils import DatabaseError
     47 
     48 import common
     49 from autotest_lib.client.common_lib import control_data
     50 from autotest_lib.client.common_lib import error
     51 from autotest_lib.client.common_lib import global_config
     52 from autotest_lib.client.common_lib import priorities
     53 from autotest_lib.client.common_lib import time_utils
     54 from autotest_lib.client.common_lib.cros import dev_server
     55 from autotest_lib.frontend.afe import control_file as control_file_lib
     56 from autotest_lib.frontend.afe import model_attributes
     57 from autotest_lib.frontend.afe import model_logic
     58 from autotest_lib.frontend.afe import models
     59 from autotest_lib.frontend.afe import rpc_utils
     60 from autotest_lib.frontend.tko import models as tko_models
     61 from autotest_lib.frontend.tko import rpc_interface as tko_rpc_interface
     62 from autotest_lib.server import frontend
     63 from autotest_lib.server import utils
     64 from autotest_lib.server.cros import provision
     65 from autotest_lib.server.cros.dynamic_suite import constants
     66 from autotest_lib.server.cros.dynamic_suite import control_file_getter
     67 from autotest_lib.server.cros.dynamic_suite import suite as SuiteBase
     68 from autotest_lib.server.cros.dynamic_suite import tools
     69 from autotest_lib.server.cros.dynamic_suite.suite import Suite
     70 from autotest_lib.server.lib import status_history
     71 from autotest_lib.site_utils import job_history
     72 from autotest_lib.site_utils import server_manager_utils
     73 from autotest_lib.site_utils import stable_version_utils
     74 
     75 
     76 _CONFIG = global_config.global_config
     77 
     78 # Definition of LabHealthIndicator
     79 LabHealthIndicator = collections.namedtuple(
     80         'LabHealthIndicator',
     81         [
     82                 'if_lab_close',
     83                 'available_duts',
     84                 'devserver_health',
     85                 'upcoming_builds',
     86         ]
     87 )
     88 
     89 RESPECT_STATIC_LABELS = global_config.global_config.get_config_value(
     90         'SKYLAB', 'respect_static_labels', type=bool, default=False)
     91 
     92 RESPECT_STATIC_ATTRIBUTES = global_config.global_config.get_config_value(
     93         'SKYLAB', 'respect_static_attributes', type=bool, default=False)
     94 
     95 # Relevant CrosDynamicSuiteExceptions are defined in client/common_lib/error.py.
     96 
     97 # labels
     98 
     99 def modify_label(id, **data):
    100     """Modify a label.
    101 
    102     @param id: id or name of a label. More often a label name.
    103     @param data: New data for a label.
    104     """
    105     label_model = models.Label.smart_get(id)
    106     if label_model.is_replaced_by_static():
    107         raise error.UnmodifiableLabelException(
    108                 'Failed to delete label "%s" because it is a static label. '
    109                 'Use go/chromeos-skylab-inventory-tools to modify this '
    110                 'label.' % label_model.name)
    111 
    112     label_model.update_object(data)
    113 
    114     # Master forwards the RPC to shards
    115     if not utils.is_shard():
    116         rpc_utils.fanout_rpc(label_model.host_set.all(), 'modify_label', False,
    117                              id=id, **data)
    118 
    119 
    120 def delete_label(id):
    121     """Delete a label.
    122 
    123     @param id: id or name of a label. More often a label name.
    124     """
    125     label_model = models.Label.smart_get(id)
    126     if label_model.is_replaced_by_static():
    127         raise error.UnmodifiableLabelException(
    128                 'Failed to delete label "%s" because it is a static label. '
    129                 'Use go/chromeos-skylab-inventory-tools to modify this '
    130                 'label.' % label_model.name)
    131 
    132     # Hosts that have the label to be deleted. Save this info before
    133     # the label is deleted to use it later.
    134     hosts = []
    135     for h in label_model.host_set.all():
    136         hosts.append(models.Host.smart_get(h.id))
    137     label_model.delete()
    138 
    139     # Master forwards the RPC to shards
    140     if not utils.is_shard():
    141         rpc_utils.fanout_rpc(hosts, 'delete_label', False, id=id)
    142 
    143 
    144 def add_label(name, ignore_exception_if_exists=False, **kwargs):
    145     """Adds a new label of a given name.
    146 
    147     @param name: label name.
    148     @param ignore_exception_if_exists: If True and the exception was
    149         thrown due to the duplicated label name when adding a label,
    150         then suppress the exception. Default is False.
    151     @param kwargs: keyword args that store more info about a label
    152         other than the name.
    153     @return: int/long id of a new label.
    154     """
    155     # models.Label.add_object() throws model_logic.ValidationError
    156     # when it is given a label name that already exists.
    157     # However, ValidationError can be thrown with different errors,
    158     # and those errors should be thrown up to the call chain.
    159     try:
    160         label = models.Label.add_object(name=name, **kwargs)
    161     except:
    162         exc_info = sys.exc_info()
    163         if ignore_exception_if_exists:
    164             label = rpc_utils.get_label(name)
    165             # If the exception is raised not because of duplicated
    166             # "name", then raise the original exception.
    167             if label is None:
    168                 raise exc_info[0], exc_info[1], exc_info[2]
    169         else:
    170             raise exc_info[0], exc_info[1], exc_info[2]
    171     return label.id
    172 
    173 
    174 def add_label_to_hosts(id, hosts):
    175     """Adds a label of the given id to the given hosts only in local DB.
    176 
    177     @param id: id or name of a label. More often a label name.
    178     @param hosts: The hostnames of hosts that need the label.
    179 
    180     @raises models.Label.DoesNotExist: If the label with id doesn't exist.
    181     """
    182     label = models.Label.smart_get(id)
    183     if label.is_replaced_by_static():
    184         label = models.StaticLabel.smart_get(label.name)
    185 
    186     host_objs = models.Host.smart_get_bulk(hosts)
    187     if label.platform:
    188         models.Host.check_no_platform(host_objs)
    189     # Ensure a host has no more than one board label with it.
    190     if label.name.startswith('board:'):
    191         models.Host.check_board_labels_allowed(host_objs, [label.name])
    192     label.host_set.add(*host_objs)
    193 
    194 
    195 def _create_label_everywhere(id, hosts):
    196     """
    197     Yet another method to create labels.
    198 
    199     ALERT! This method should be run only on master not shards!
    200     DO NOT RUN THIS ON A SHARD!!!  Deputies will hate you if you do!!!
    201 
    202     This method exists primarily to serve label_add_hosts() and
    203     host_add_labels().  Basically it pulls out the label check/add logic
    204     from label_add_hosts() into this nice method that not only creates
    205     the label but also tells the shards that service the hosts to also
    206     create the label.
    207 
    208     @param id: id or name of a label. More often a label name.
    209     @param hosts: A list of hostnames or ids. More often hostnames.
    210     """
    211     try:
    212         label = models.Label.smart_get(id)
    213     except models.Label.DoesNotExist:
    214         # This matches the type checks in smart_get, which is a hack
    215         # in and off itself. The aim here is to create any non-existent
    216         # label, which we cannot do if the 'id' specified isn't a label name.
    217         if isinstance(id, basestring):
    218             label = models.Label.smart_get(add_label(id))
    219         else:
    220             raise ValueError('Label id (%s) does not exist. Please specify '
    221                              'the argument, id, as a string (label name).'
    222                              % id)
    223 
    224     # Make sure the label exists on the shard with the same id
    225     # as it is on the master.
    226     # It is possible that the label is already in a shard because
    227     # we are adding a new label only to shards of hosts that the label
    228     # is going to be attached.
    229     # For example, we add a label L1 to a host in shard S1.
    230     # Master and S1 will have L1 but other shards won't.
    231     # Later, when we add the same label L1 to hosts in shards S1 and S2,
    232     # S1 already has the label but S2 doesn't.
    233     # S2 should have the new label without any problem.
    234     # We ignore exception in such a case.
    235     host_objs = models.Host.smart_get_bulk(hosts)
    236     rpc_utils.fanout_rpc(
    237             host_objs, 'add_label', include_hostnames=False,
    238             name=label.name, ignore_exception_if_exists=True,
    239             id=label.id, platform=label.platform)
    240 
    241 
    242 @rpc_utils.route_rpc_to_master
    243 def label_add_hosts(id, hosts):
    244     """Adds a label with the given id to the given hosts.
    245 
    246     This method should be run only on master not shards.
    247     The given label will be created if it doesn't exist, provided the `id`
    248     supplied is a label name not an int/long id.
    249 
    250     @param id: id or name of a label. More often a label name.
    251     @param hosts: A list of hostnames or ids. More often hostnames.
    252 
    253     @raises ValueError: If the id specified is an int/long (label id)
    254                         while the label does not exist.
    255     """
    256     # Create the label.
    257     _create_label_everywhere(id, hosts)
    258 
    259     # Add it to the master.
    260     add_label_to_hosts(id, hosts)
    261 
    262     # Add it to the shards.
    263     host_objs = models.Host.smart_get_bulk(hosts)
    264     rpc_utils.fanout_rpc(host_objs, 'add_label_to_hosts', id=id)
    265 
    266 
    267 def remove_label_from_hosts(id, hosts):
    268     """Removes a label of the given id from the given hosts only in local DB.
    269 
    270     @param id: id or name of a label.
    271     @param hosts: The hostnames of hosts that need to remove the label from.
    272     """
    273     host_objs = models.Host.smart_get_bulk(hosts)
    274     label = models.Label.smart_get(id)
    275     if label.is_replaced_by_static():
    276         raise error.UnmodifiableLabelException(
    277                 'Failed to remove label "%s" for hosts "%r" because it is a '
    278                 'static label. Use go/chromeos-skylab-inventory-tools to '
    279                 'modify this label.' % (label.name, hosts))
    280 
    281     label.host_set.remove(*host_objs)
    282 
    283 
    284 @rpc_utils.route_rpc_to_master
    285 def label_remove_hosts(id, hosts):
    286     """Removes a label of the given id from the given hosts.
    287 
    288     This method should be run only on master not shards.
    289 
    290     @param id: id or name of a label.
    291     @param hosts: A list of hostnames or ids. More often hostnames.
    292     """
    293     host_objs = models.Host.smart_get_bulk(hosts)
    294     remove_label_from_hosts(id, hosts)
    295 
    296     rpc_utils.fanout_rpc(host_objs, 'remove_label_from_hosts', id=id)
    297 
    298 
    299 def get_labels(exclude_filters=(), **filter_data):
    300     """\
    301     @param exclude_filters: A sequence of dictionaries of filters.
    302 
    303     @returns A sequence of nested dictionaries of label information.
    304     """
    305     labels = models.Label.query_objects(filter_data)
    306     for exclude_filter in exclude_filters:
    307         labels = labels.exclude(**exclude_filter)
    308 
    309     if not RESPECT_STATIC_LABELS:
    310         return rpc_utils.prepare_rows_as_nested_dicts(labels, ())
    311 
    312     static_labels = models.StaticLabel.query_objects(filter_data)
    313     for exclude_filter in exclude_filters:
    314         static_labels = static_labels.exclude(**exclude_filter)
    315 
    316     non_static_lists = rpc_utils.prepare_rows_as_nested_dicts(labels, ())
    317     static_lists = rpc_utils.prepare_rows_as_nested_dicts(static_labels, ())
    318 
    319     label_ids = [label.id for label in labels]
    320     replaced = models.ReplacedLabel.objects.filter(label__id__in=label_ids)
    321     replaced_ids = {r.label_id for r in replaced}
    322     replaced_label_names = {l.name for l in labels if l.id in replaced_ids}
    323 
    324     return_lists  = []
    325     for non_static_label in non_static_lists:
    326         if non_static_label.get('id') not in replaced_ids:
    327             return_lists.append(non_static_label)
    328 
    329     for static_label in static_lists:
    330         if static_label.get('name') in replaced_label_names:
    331             return_lists.append(static_label)
    332 
    333     return return_lists
    334 
    335 
    336 # hosts
    337 
    338 def add_host(hostname, status=None, locked=None, lock_reason='', protection=None):
    339     if locked and not lock_reason:
    340         raise model_logic.ValidationError(
    341             {'locked': 'Please provide a reason for locking when adding host.'})
    342 
    343     return models.Host.add_object(hostname=hostname, status=status,
    344                                   locked=locked, lock_reason=lock_reason,
    345                                   protection=protection).id
    346 
    347 
    348 @rpc_utils.route_rpc_to_master
    349 def modify_host(id, **kwargs):
    350     """Modify local attributes of a host.
    351 
    352     If this is called on the master, but the host is assigned to a shard, this
    353     will call `modify_host_local` RPC to the responsible shard. This means if
    354     a host is being locked using this function, this change will also propagate
    355     to shards.
    356     When this is called on a shard, the shard just routes the RPC to the master
    357     and does nothing.
    358 
    359     @param id: id of the host to modify.
    360     @param kwargs: key=value pairs of values to set on the host.
    361     """
    362     rpc_utils.check_modify_host(kwargs)
    363     host = models.Host.smart_get(id)
    364     try:
    365         rpc_utils.check_modify_host_locking(host, kwargs)
    366     except model_logic.ValidationError as e:
    367         if not kwargs.get('force_modify_locking', False):
    368             raise
    369         logging.exception('The following exception will be ignored and lock '
    370                           'modification will be enforced. %s', e)
    371 
    372     # This is required to make `lock_time` for a host be exactly same
    373     # between the master and a shard.
    374     if kwargs.get('locked', None) and 'lock_time' not in kwargs:
    375         kwargs['lock_time'] = datetime.datetime.now()
    376 
    377     # force_modifying_locking is not an internal field in database, remove.
    378     shard_kwargs = dict(kwargs)
    379     shard_kwargs.pop('force_modify_locking', None)
    380     rpc_utils.fanout_rpc([host], 'modify_host_local',
    381                          include_hostnames=False, id=id, **shard_kwargs)
    382 
    383     # Update the local DB **after** RPC fanout is complete.
    384     # This guarantees that the master state is only updated if the shards were
    385     # correctly updated.
    386     # In case the shard update fails mid-flight and the master-shard desync, we
    387     # always consider the master state to be the source-of-truth, and any
    388     # (automated) corrective actions will revert the (partial) shard updates.
    389     host.update_object(kwargs)
    390 
    391 
    392 def modify_host_local(id, **kwargs):
    393     """Modify host attributes in local DB.
    394 
    395     @param id: Host id.
    396     @param kwargs: key=value pairs of values to set on the host.
    397     """
    398     models.Host.smart_get(id).update_object(kwargs)
    399 
    400 
    401 @rpc_utils.route_rpc_to_master
    402 def modify_hosts(host_filter_data, update_data):
    403     """Modify local attributes of multiple hosts.
    404 
    405     If this is called on the master, but one of the hosts in that match the
    406     filters is assigned to a shard, this will call `modify_hosts_local` RPC
    407     to the responsible shard.
    408     When this is called on a shard, the shard just routes the RPC to the master
    409     and does nothing.
    410 
    411     The filters are always applied on the master, not on the shards. This means
    412     if the states of a host differ on the master and a shard, the state on the
    413     master will be used. I.e. this means:
    414     A host was synced to Shard 1. On Shard 1 the status of the host was set to
    415     'Repair Failed'.
    416     - A call to modify_hosts with host_filter_data={'status': 'Ready'} will
    417     update the host (both on the shard and on the master), because the state
    418     of the host as the master knows it is still 'Ready'.
    419     - A call to modify_hosts with host_filter_data={'status': 'Repair failed'
    420     will not update the host, because the filter doesn't apply on the master.
    421 
    422     @param host_filter_data: Filters out which hosts to modify.
    423     @param update_data: A dictionary with the changes to make to the hosts.
    424     """
    425     update_data = update_data.copy()
    426     rpc_utils.check_modify_host(update_data)
    427     hosts = models.Host.query_objects(host_filter_data)
    428 
    429     affected_shard_hostnames = set()
    430     affected_host_ids = []
    431 
    432     # Check all hosts before changing data for exception safety.
    433     for host in hosts:
    434         try:
    435             rpc_utils.check_modify_host_locking(host, update_data)
    436         except model_logic.ValidationError as e:
    437             if not update_data.get('force_modify_locking', False):
    438                 raise
    439             logging.exception('The following exception will be ignored and '
    440                               'lock modification will be enforced. %s', e)
    441 
    442         if host.shard:
    443             affected_shard_hostnames.add(host.shard.hostname)
    444             affected_host_ids.append(host.id)
    445 
    446     # This is required to make `lock_time` for a host be exactly same
    447     # between the master and a shard.
    448     if update_data.get('locked', None) and 'lock_time' not in update_data:
    449         update_data['lock_time'] = datetime.datetime.now()
    450     for host in hosts:
    451         host.update_object(update_data)
    452 
    453     update_data.pop('force_modify_locking', None)
    454     # Caution: Changing the filter from the original here. See docstring.
    455     rpc_utils.run_rpc_on_multiple_hostnames(
    456             'modify_hosts_local', affected_shard_hostnames,
    457             host_filter_data={'id__in': affected_host_ids},
    458             update_data=update_data)
    459 
    460 
    461 def modify_hosts_local(host_filter_data, update_data):
    462     """Modify attributes of hosts in local DB.
    463 
    464     @param host_filter_data: Filters out which hosts to modify.
    465     @param update_data: A dictionary with the changes to make to the hosts.
    466     """
    467     for host in models.Host.query_objects(host_filter_data):
    468         host.update_object(update_data)
    469 
    470 
    471 def add_labels_to_host(id, labels):
    472     """Adds labels to a given host only in local DB.
    473 
    474     @param id: id or hostname for a host.
    475     @param labels: ids or names for labels.
    476     """
    477     label_objs = models.Label.smart_get_bulk(labels)
    478     if not RESPECT_STATIC_LABELS:
    479         models.Host.smart_get(id).labels.add(*label_objs)
    480     else:
    481         static_labels, non_static_labels = models.Host.classify_label_objects(
    482             label_objs)
    483         host = models.Host.smart_get(id)
    484         host.static_labels.add(*static_labels)
    485         host.labels.add(*non_static_labels)
    486 
    487 
    488 @rpc_utils.route_rpc_to_master
    489 def host_add_labels(id, labels):
    490     """Adds labels to a given host.
    491 
    492     @param id: id or hostname for a host.
    493     @param labels: ids or names for labels.
    494 
    495     @raises ValidationError: If adding more than one platform/board label.
    496     """
    497     # Create the labels on the master/shards.
    498     for label in labels:
    499         _create_label_everywhere(label, [id])
    500 
    501     label_objs = models.Label.smart_get_bulk(labels)
    502 
    503     platforms = [label.name for label in label_objs if label.platform]
    504     boards = [label.name for label in label_objs
    505               if label.name.startswith('board:')]
    506     if len(platforms) > 1 or not utils.board_labels_allowed(boards):
    507         raise model_logic.ValidationError(
    508             {'labels': ('Adding more than one platform label, or a list of '
    509                         'non-compatible board labels.: %s %s' %
    510                         (', '.join(platforms), ', '.join(boards)))})
    511 
    512     host_obj = models.Host.smart_get(id)
    513     if platforms:
    514         models.Host.check_no_platform([host_obj])
    515     if boards:
    516         models.Host.check_board_labels_allowed([host_obj], labels)
    517     add_labels_to_host(id, labels)
    518 
    519     rpc_utils.fanout_rpc([host_obj], 'add_labels_to_host', False,
    520                          id=id, labels=labels)
    521 
    522 
    523 def remove_labels_from_host(id, labels):
    524     """Removes labels from a given host only in local DB.
    525 
    526     @param id: id or hostname for a host.
    527     @param labels: ids or names for labels.
    528     """
    529     label_objs = models.Label.smart_get_bulk(labels)
    530     if not RESPECT_STATIC_LABELS:
    531         models.Host.smart_get(id).labels.remove(*label_objs)
    532     else:
    533         static_labels, non_static_labels = models.Host.classify_label_objects(
    534                 label_objs)
    535         host = models.Host.smart_get(id)
    536         host.labels.remove(*non_static_labels)
    537         if static_labels:
    538             logging.info('Cannot remove labels "%r" for host "%r" due to they '
    539                          'are static labels. Use '
    540                          'go/chromeos-skylab-inventory-tools to modify these '
    541                          'labels.', static_labels, id)
    542 
    543 
    544 @rpc_utils.route_rpc_to_master
    545 def host_remove_labels(id, labels):
    546     """Removes labels from a given host.
    547 
    548     @param id: id or hostname for a host.
    549     @param labels: ids or names for labels.
    550     """
    551     remove_labels_from_host(id, labels)
    552 
    553     host_obj = models.Host.smart_get(id)
    554     rpc_utils.fanout_rpc([host_obj], 'remove_labels_from_host', False,
    555                          id=id, labels=labels)
    556 
    557 
    558 def get_host_attribute(attribute, **host_filter_data):
    559     """
    560     @param attribute: string name of attribute
    561     @param host_filter_data: filter data to apply to Hosts to choose hosts to
    562                              act upon
    563     """
    564     hosts = rpc_utils.get_host_query((), False, True, host_filter_data)
    565     hosts = list(hosts)
    566     models.Host.objects.populate_relationships(hosts, models.HostAttribute,
    567                                                'attribute_list')
    568     host_attr_dicts = []
    569     host_objs = []
    570     for host_obj in hosts:
    571         for attr_obj in host_obj.attribute_list:
    572             if attr_obj.attribute == attribute:
    573                 host_attr_dicts.append(attr_obj.get_object_dict())
    574                 host_objs.append(host_obj)
    575 
    576     if RESPECT_STATIC_ATTRIBUTES:
    577         for host_attr, host_obj in zip(host_attr_dicts, host_objs):
    578             static_attrs = models.StaticHostAttribute.query_objects(
    579                     {'host_id': host_obj.id, 'attribute': attribute})
    580             if len(static_attrs) > 0:
    581                 host_attr['value'] = static_attrs[0].value
    582 
    583     return rpc_utils.prepare_for_serialization(host_attr_dicts)
    584 
    585 
    586 @rpc_utils.route_rpc_to_master
    587 def set_host_attribute(attribute, value, **host_filter_data):
    588     """Set an attribute on hosts.
    589 
    590     This RPC is a shim that forwards calls to master to be handled there.
    591 
    592     @param attribute: string name of attribute
    593     @param value: string, or None to delete an attribute
    594     @param host_filter_data: filter data to apply to Hosts to choose hosts to
    595                              act upon
    596     """
    597     assert not utils.is_shard()
    598     set_host_attribute_impl(attribute, value, **host_filter_data)
    599 
    600 
    601 def set_host_attribute_impl(attribute, value, **host_filter_data):
    602     """Set an attribute on hosts.
    603 
    604     *** DO NOT CALL THIS RPC from client code ***
    605     This RPC exists for master-shard communication only.
    606     Call set_host_attribute instead.
    607 
    608     @param attribute: string name of attribute
    609     @param value: string, or None to delete an attribute
    610     @param host_filter_data: filter data to apply to Hosts to choose hosts to
    611                              act upon
    612     """
    613     assert host_filter_data # disallow accidental actions on all hosts
    614     hosts = models.Host.query_objects(host_filter_data)
    615     models.AclGroup.check_for_acl_violation_hosts(hosts)
    616     for host in hosts:
    617         host.set_or_delete_attribute(attribute, value)
    618 
    619     # Master forwards this RPC to shards.
    620     if not utils.is_shard():
    621         rpc_utils.fanout_rpc(hosts, 'set_host_attribute_impl', False,
    622                 attribute=attribute, value=value, **host_filter_data)
    623 
    624 
    625 @rpc_utils.forward_single_host_rpc_to_shard
    626 def delete_host(id):
    627     models.Host.smart_get(id).delete()
    628 
    629 
    630 def get_hosts(multiple_labels=(), exclude_only_if_needed_labels=False,
    631               valid_only=True, include_current_job=False, **filter_data):
    632     """Get a list of dictionaries which contains the information of hosts.
    633 
    634     @param multiple_labels: match hosts in all of the labels given.  Should
    635             be a list of label names.
    636     @param exclude_only_if_needed_labels: Deprecated. Raise error if it's True.
    637     @param include_current_job: Set to True to include ids of currently running
    638             job and special task.
    639     """
    640     if exclude_only_if_needed_labels:
    641         raise error.RPCException('exclude_only_if_needed_labels is deprecated')
    642 
    643     hosts = rpc_utils.get_host_query(multiple_labels,
    644                                      exclude_only_if_needed_labels,
    645                                      valid_only, filter_data)
    646     hosts = list(hosts)
    647     models.Host.objects.populate_relationships(hosts, models.Label,
    648                                                'label_list')
    649     models.Host.objects.populate_relationships(hosts, models.AclGroup,
    650                                                'acl_list')
    651     models.Host.objects.populate_relationships(hosts, models.HostAttribute,
    652                                                'attribute_list')
    653     models.Host.objects.populate_relationships(hosts,
    654                                                models.StaticHostAttribute,
    655                                                'staticattribute_list')
    656     host_dicts = []
    657     for host_obj in hosts:
    658         host_dict = host_obj.get_object_dict()
    659         host_dict['acls'] = [acl.name for acl in host_obj.acl_list]
    660         host_dict['attributes'] = dict((attribute.attribute, attribute.value)
    661                                        for attribute in host_obj.attribute_list)
    662         if RESPECT_STATIC_LABELS:
    663             label_list = []
    664             # Only keep static labels which has a corresponding entries in
    665             # afe_labels.
    666             for label in host_obj.label_list:
    667                 if label.is_replaced_by_static():
    668                     static_label = models.StaticLabel.smart_get(label.name)
    669                     label_list.append(static_label)
    670                 else:
    671                     label_list.append(label)
    672 
    673             host_dict['labels'] = [label.name for label in label_list]
    674             host_dict['platform'] = rpc_utils.find_platform(
    675                     host_obj.hostname, label_list)
    676         else:
    677             host_dict['labels'] = [label.name for label in host_obj.label_list]
    678             host_dict['platform'] = rpc_utils.find_platform(
    679                     host_obj.hostname, host_obj.label_list)
    680 
    681         if RESPECT_STATIC_ATTRIBUTES:
    682             # Overwrite attribute with values in afe_static_host_attributes.
    683             for attr in host_obj.staticattribute_list:
    684                 if attr.attribute in host_dict['attributes']:
    685                     host_dict['attributes'][attr.attribute] = attr.value
    686 
    687         if include_current_job:
    688             host_dict['current_job'] = None
    689             host_dict['current_special_task'] = None
    690             entries = models.HostQueueEntry.objects.filter(
    691                     host_id=host_dict['id'], active=True, complete=False)
    692             if entries:
    693                 host_dict['current_job'] = (
    694                         entries[0].get_object_dict()['job'])
    695             tasks = models.SpecialTask.objects.filter(
    696                     host_id=host_dict['id'], is_active=True, is_complete=False)
    697             if tasks:
    698                 host_dict['current_special_task'] = (
    699                         '%d-%s' % (tasks[0].get_object_dict()['id'],
    700                                    tasks[0].get_object_dict()['task'].lower()))
    701         host_dicts.append(host_dict)
    702 
    703     return rpc_utils.prepare_for_serialization(host_dicts)
    704 
    705 
    706 def get_num_hosts(multiple_labels=(), exclude_only_if_needed_labels=False,
    707                   valid_only=True, **filter_data):
    708     """
    709     Same parameters as get_hosts().
    710 
    711     @returns The number of matching hosts.
    712     """
    713     if exclude_only_if_needed_labels:
    714         raise error.RPCException('exclude_only_if_needed_labels is deprecated')
    715 
    716     hosts = rpc_utils.get_host_query(multiple_labels,
    717                                      exclude_only_if_needed_labels,
    718                                      valid_only, filter_data)
    719     return len(hosts)
    720 
    721 
    722 # tests
    723 
    724 def get_tests(**filter_data):
    725     return rpc_utils.prepare_for_serialization(
    726         models.Test.list_objects(filter_data))
    727 
    728 
    729 def get_tests_status_counts_by_job_name_label(job_name_prefix, label_name):
    730     """Gets the counts of all passed and failed tests from the matching jobs.
    731 
    732     @param job_name_prefix: Name prefix of the jobs to get the summary
    733            from, e.g., 'butterfly-release/r40-6457.21.0/bvt-cq/'. Prefix
    734            matching is case insensitive.
    735     @param label_name: Label that must be set in the jobs, e.g.,
    736             'cros-version:butterfly-release/R40-6457.21.0'.
    737 
    738     @returns A summary of the counts of all the passed and failed tests.
    739     """
    740     job_ids = list(models.Job.objects.filter(
    741             name__istartswith=job_name_prefix,
    742             dependency_labels__name=label_name).values_list(
    743                 'pk', flat=True))
    744     summary = {'passed': 0, 'failed': 0}
    745     if not job_ids:
    746         return summary
    747 
    748     counts = (tko_models.TestView.objects.filter(
    749             afe_job_id__in=job_ids).exclude(
    750                 test_name='SERVER_JOB').exclude(
    751                     test_name__startswith='CLIENT_JOB').values(
    752                         'status').annotate(
    753                             count=Count('status')))
    754     for status in counts:
    755         if status['status'] == 'GOOD':
    756             summary['passed'] += status['count']
    757         else:
    758             summary['failed'] += status['count']
    759     return summary
    760 
    761 
    762 # profilers
    763 
    764 def add_profiler(name, description=None):
    765     return models.Profiler.add_object(name=name, description=description).id
    766 
    767 
    768 def modify_profiler(id, **data):
    769     models.Profiler.smart_get(id).update_object(data)
    770 
    771 
    772 def delete_profiler(id):
    773     models.Profiler.smart_get(id).delete()
    774 
    775 
    776 def get_profilers(**filter_data):
    777     return rpc_utils.prepare_for_serialization(
    778         models.Profiler.list_objects(filter_data))
    779 
    780 
    781 # users
    782 
    783 def get_users(**filter_data):
    784     return rpc_utils.prepare_for_serialization(
    785         models.User.list_objects(filter_data))
    786 
    787 
    788 # acl groups
    789 
    790 def add_acl_group(name, description=None):
    791     group = models.AclGroup.add_object(name=name, description=description)
    792     group.users.add(models.User.current_user())
    793     return group.id
    794 
    795 
    796 def modify_acl_group(id, **data):
    797     group = models.AclGroup.smart_get(id)
    798     group.check_for_acl_violation_acl_group()
    799     group.update_object(data)
    800     group.add_current_user_if_empty()
    801 
    802 
    803 def acl_group_add_users(id, users):
    804     group = models.AclGroup.smart_get(id)
    805     group.check_for_acl_violation_acl_group()
    806     users = models.User.smart_get_bulk(users)
    807     group.users.add(*users)
    808 
    809 
    810 def acl_group_remove_users(id, users):
    811     group = models.AclGroup.smart_get(id)
    812     group.check_for_acl_violation_acl_group()
    813     users = models.User.smart_get_bulk(users)
    814     group.users.remove(*users)
    815     group.add_current_user_if_empty()
    816 
    817 
    818 def acl_group_add_hosts(id, hosts):
    819     group = models.AclGroup.smart_get(id)
    820     group.check_for_acl_violation_acl_group()
    821     hosts = models.Host.smart_get_bulk(hosts)
    822     group.hosts.add(*hosts)
    823     group.on_host_membership_change()
    824 
    825 
    826 def acl_group_remove_hosts(id, hosts):
    827     group = models.AclGroup.smart_get(id)
    828     group.check_for_acl_violation_acl_group()
    829     hosts = models.Host.smart_get_bulk(hosts)
    830     group.hosts.remove(*hosts)
    831     group.on_host_membership_change()
    832 
    833 
    834 def delete_acl_group(id):
    835     models.AclGroup.smart_get(id).delete()
    836 
    837 
    838 def get_acl_groups(**filter_data):
    839     acl_groups = models.AclGroup.list_objects(filter_data)
    840     for acl_group in acl_groups:
    841         acl_group_obj = models.AclGroup.objects.get(id=acl_group['id'])
    842         acl_group['users'] = [user.login
    843                               for user in acl_group_obj.users.all()]
    844         acl_group['hosts'] = [host.hostname
    845                               for host in acl_group_obj.hosts.all()]
    846     return rpc_utils.prepare_for_serialization(acl_groups)
    847 
    848 
    849 # jobs
    850 
    851 def generate_control_file(tests=(), profilers=(),
    852                           client_control_file='', use_container=False,
    853                           profile_only=None, db_tests=True,
    854                           test_source_build=None):
    855     """
    856     Generates a client-side control file to run tests.
    857 
    858     @param tests List of tests to run. See db_tests for more information.
    859     @param profilers List of profilers to activate during the job.
    860     @param client_control_file The contents of a client-side control file to
    861         run at the end of all tests.  If this is supplied, all tests must be
    862         client side.
    863         TODO: in the future we should support server control files directly
    864         to wrap with a kernel.  That'll require changing the parameter
    865         name and adding a boolean to indicate if it is a client or server
    866         control file.
    867     @param use_container unused argument today.  TODO: Enable containers
    868         on the host during a client side test.
    869     @param profile_only A boolean that indicates what default profile_only
    870         mode to use in the control file. Passing None will generate a
    871         control file that does not explcitly set the default mode at all.
    872     @param db_tests: if True, the test object can be found in the database
    873                      backing the test model. In this case, tests is a tuple
    874                      of test IDs which are used to retrieve the test objects
    875                      from the database. If False, tests is a tuple of test
    876                      dictionaries stored client-side in the AFE.
    877     @param test_source_build: Build to be used to retrieve test code. Default
    878                               to None.
    879 
    880     @returns a dict with the following keys:
    881         control_file: str, The control file text.
    882         is_server: bool, is the control file a server-side control file?
    883         synch_count: How many machines the job uses per autoserv execution.
    884             synch_count == 1 means the job is asynchronous.
    885         dependencies: A list of the names of labels on which the job depends.
    886     """
    887     if not tests and not client_control_file:
    888         return dict(control_file='', is_server=False, synch_count=1,
    889                     dependencies=[])
    890 
    891     cf_info, test_objects, profiler_objects = (
    892         rpc_utils.prepare_generate_control_file(tests, profilers,
    893                                                 db_tests))
    894     cf_info['control_file'] = control_file_lib.generate_control(
    895         tests=test_objects, profilers=profiler_objects,
    896         is_server=cf_info['is_server'],
    897         client_control_file=client_control_file, profile_only=profile_only,
    898         test_source_build=test_source_build)
    899     return cf_info
    900 
    901 
    902 def create_job_page_handler(name, priority, control_file, control_type,
    903                             image=None, hostless=False, firmware_rw_build=None,
    904                             firmware_ro_build=None, test_source_build=None,
    905                             is_cloning=False, cheets_build=None, **kwargs):
    906     """\
    907     Create and enqueue a job.
    908 
    909     @param name name of this job
    910     @param priority Integer priority of this job.  Higher is more important.
    911     @param control_file String contents of the control file.
    912     @param control_type Type of control file, Client or Server.
    913     @param image: ChromeOS build to be installed in the dut. Default to None.
    914     @param firmware_rw_build: Firmware build to update RW firmware. Default to
    915                               None, i.e., RW firmware will not be updated.
    916     @param firmware_ro_build: Firmware build to update RO firmware. Default to
    917                               None, i.e., RO firmware will not be updated.
    918     @param test_source_build: Build to be used to retrieve test code. Default
    919                               to None.
    920     @param is_cloning: True if creating a cloning job.
    921     @param cheets_build: ChromeOS Android build  to be installed in the dut.
    922                          Default to None. Cheets build will not be updated.
    923     @param kwargs extra args that will be required by create_suite_job or
    924                   create_job.
    925 
    926     @returns The created Job id number.
    927     """
    928     test_args = {}
    929     if kwargs.get('args'):
    930         # args' format is: ['disable_sysinfo=False', 'fast=True', ...]
    931         args = kwargs.get('args')
    932         for arg in args:
    933             k, v = arg.split('=')[0], arg.split('=')[1]
    934             test_args[k] = v
    935 
    936     if is_cloning:
    937         logging.info('Start to clone a new job')
    938         # When cloning a job, hosts and meta_hosts should not exist together,
    939         # which would cause host-scheduler to schedule two hqe jobs to one host
    940         # at the same time, and crash itself. Clear meta_hosts for this case.
    941         if kwargs.get('hosts') and kwargs.get('meta_hosts'):
    942             kwargs['meta_hosts'] = []
    943     else:
    944         logging.info('Start to create a new job')
    945     control_file = rpc_utils.encode_ascii(control_file)
    946     if not control_file:
    947         raise model_logic.ValidationError({
    948                 'control_file' : "Control file cannot be empty"})
    949 
    950     if image and hostless:
    951         builds = {}
    952         builds[provision.CROS_VERSION_PREFIX] = image
    953         if cheets_build:
    954             builds[provision.CROS_ANDROID_VERSION_PREFIX] = cheets_build
    955         if firmware_rw_build:
    956             builds[provision.FW_RW_VERSION_PREFIX] = firmware_rw_build
    957         if firmware_ro_build:
    958             builds[provision.FW_RO_VERSION_PREFIX] = firmware_ro_build
    959         return create_suite_job(
    960                 name=name, control_file=control_file, priority=priority,
    961                 builds=builds, test_source_build=test_source_build,
    962                 is_cloning=is_cloning, test_args=test_args, **kwargs)
    963 
    964     return create_job(name, priority, control_file, control_type, image=image,
    965                       hostless=hostless, test_args=test_args, **kwargs)
    966 
    967 
    968 @rpc_utils.route_rpc_to_master
    969 def create_job(
    970         name,
    971         priority,
    972         control_file,
    973         control_type,
    974         hosts=(),
    975         meta_hosts=(),
    976         one_time_hosts=(),
    977         synch_count=None,
    978         is_template=False,
    979         timeout=None,
    980         timeout_mins=None,
    981         max_runtime_mins=None,
    982         run_verify=False,
    983         email_list='',
    984         dependencies=(),
    985         reboot_before=None,
    986         reboot_after=None,
    987         parse_failed_repair=None,
    988         hostless=False,
    989         keyvals=None,
    990         drone_set=None,
    991         image=None,
    992         parent_job_id=None,
    993         test_retry=0,
    994         run_reset=True,
    995         require_ssp=None,
    996         test_args=None,
    997         **kwargs):
    998     """\
    999     Create and enqueue a job.
   1000 
   1001     @param name name of this job
   1002     @param priority Integer priority of this job.  Higher is more important.
   1003     @param control_file String contents of the control file.
   1004     @param control_type Type of control file, Client or Server.
   1005     @param synch_count How many machines the job uses per autoserv execution.
   1006         synch_count == 1 means the job is asynchronous.  If an atomic group is
   1007         given this value is treated as a minimum.
   1008     @param is_template If true then create a template job.
   1009     @param timeout Hours after this call returns until the job times out.
   1010     @param timeout_mins Minutes after this call returns until the job times
   1011         out.
   1012     @param max_runtime_mins Minutes from job starting time until job times out
   1013     @param run_verify Should the host be verified before running the test?
   1014     @param email_list String containing emails to mail when the job is done
   1015     @param dependencies List of label names on which this job depends
   1016     @param reboot_before Never, If dirty, or Always
   1017     @param reboot_after Never, If all tests passed, or Always
   1018     @param parse_failed_repair if true, results of failed repairs launched by
   1019         this job will be parsed as part of the job.
   1020     @param hostless if true, create a hostless job
   1021     @param keyvals dict of keyvals to associate with the job
   1022     @param hosts List of hosts to run job on.
   1023     @param meta_hosts List where each entry is a label name, and for each entry
   1024         one host will be chosen from that label to run the job on.
   1025     @param one_time_hosts List of hosts not in the database to run the job on.
   1026     @param drone_set The name of the drone set to run this test on.
   1027     @param image OS image to install before running job.
   1028     @param parent_job_id id of a job considered to be parent of created job.
   1029     @param test_retry Number of times to retry test if the test did not
   1030         complete successfully. (optional, default: 0)
   1031     @param run_reset Should the host be reset before running the test?
   1032     @param require_ssp Set to True to require server-side packaging to run the
   1033                        test. If it's set to None, drone will still try to run
   1034                        the server side with server-side packaging. If the
   1035                        autotest-server package doesn't exist for the build or
   1036                        image is not set, drone will run the test without server-
   1037                        side packaging. Default is None.
   1038     @param test_args A dict of args passed to be injected into control file.
   1039     @param kwargs extra keyword args. NOT USED.
   1040 
   1041     @returns The created Job id number.
   1042     """
   1043     if test_args:
   1044         control_file = tools.inject_vars(test_args, control_file)
   1045     if image:
   1046         dependencies += (provision.image_version_to_label(image),)
   1047     return rpc_utils.create_job_common(
   1048             name=name,
   1049             priority=priority,
   1050             control_type=control_type,
   1051             control_file=control_file,
   1052             hosts=hosts,
   1053             meta_hosts=meta_hosts,
   1054             one_time_hosts=one_time_hosts,
   1055             synch_count=synch_count,
   1056             is_template=is_template,
   1057             timeout=timeout,
   1058             timeout_mins=timeout_mins,
   1059             max_runtime_mins=max_runtime_mins,
   1060             run_verify=run_verify,
   1061             email_list=email_list,
   1062             dependencies=dependencies,
   1063             reboot_before=reboot_before,
   1064             reboot_after=reboot_after,
   1065             parse_failed_repair=parse_failed_repair,
   1066             hostless=hostless,
   1067             keyvals=keyvals,
   1068             drone_set=drone_set,
   1069             parent_job_id=parent_job_id,
   1070             test_retry=test_retry,
   1071             run_reset=run_reset,
   1072             require_ssp=require_ssp)
   1073 
   1074 
   1075 def abort_host_queue_entries(**filter_data):
   1076     """\
   1077     Abort a set of host queue entries.
   1078 
   1079     @return: A list of dictionaries, each contains information
   1080              about an aborted HQE.
   1081     """
   1082     query = models.HostQueueEntry.query_objects(filter_data)
   1083 
   1084     # Dont allow aborts on:
   1085     #   1. Jobs that have already completed (whether or not they were aborted)
   1086     #   2. Jobs that we have already been aborted (but may not have completed)
   1087     query = query.filter(complete=False).filter(aborted=False)
   1088     models.AclGroup.check_abort_permissions(query)
   1089     host_queue_entries = list(query.select_related())
   1090     rpc_utils.check_abort_synchronous_jobs(host_queue_entries)
   1091 
   1092     models.HostQueueEntry.abort_host_queue_entries(host_queue_entries)
   1093     hqe_info = [{'HostQueueEntry': hqe.id, 'Job': hqe.job_id,
   1094                  'Job name': hqe.job.name} for hqe in host_queue_entries]
   1095     return hqe_info
   1096 
   1097 
   1098 def abort_special_tasks(**filter_data):
   1099     """\
   1100     Abort the special task, or tasks, specified in the filter.
   1101     """
   1102     query = models.SpecialTask.query_objects(filter_data)
   1103     special_tasks = query.filter(is_active=True)
   1104     for task in special_tasks:
   1105         task.abort()
   1106 
   1107 
   1108 def _call_special_tasks_on_hosts(task, hosts):
   1109     """\
   1110     Schedules a set of hosts for a special task.
   1111 
   1112     @returns A list of hostnames that a special task was created for.
   1113     """
   1114     models.AclGroup.check_for_acl_violation_hosts(hosts)
   1115     shard_host_map = rpc_utils.bucket_hosts_by_shard(hosts)
   1116     if shard_host_map and not utils.is_shard():
   1117         raise ValueError('The following hosts are on shards, please '
   1118                          'follow the link to the shards and create jobs '
   1119                          'there instead. %s.' % shard_host_map)
   1120     for host in hosts:
   1121         models.SpecialTask.schedule_special_task(host, task)
   1122     return list(sorted(host.hostname for host in hosts))
   1123 
   1124 
   1125 def _forward_special_tasks_on_hosts(task, rpc, **filter_data):
   1126     """Forward special tasks to corresponding shards.
   1127 
   1128     For master, when special tasks are fired on hosts that are sharded,
   1129     forward the RPC to corresponding shards.
   1130 
   1131     For shard, create special task records in local DB.
   1132 
   1133     @param task: Enum value of frontend.afe.models.SpecialTask.Task
   1134     @param rpc: RPC name to forward.
   1135     @param filter_data: Filter keywords to be used for DB query.
   1136 
   1137     @return: A list of hostnames that a special task was created for.
   1138     """
   1139     hosts = models.Host.query_objects(filter_data)
   1140     shard_host_map = rpc_utils.bucket_hosts_by_shard(hosts)
   1141 
   1142     # Filter out hosts on a shard from those on the master, forward
   1143     # rpcs to the shard with an additional hostname__in filter, and
   1144     # create a local SpecialTask for each remaining host.
   1145     if shard_host_map and not utils.is_shard():
   1146         hosts = [h for h in hosts if h.shard is None]
   1147         for shard, hostnames in shard_host_map.iteritems():
   1148 
   1149             # The main client of this module is the frontend website, and
   1150             # it invokes it with an 'id' or an 'id__in' filter. Regardless,
   1151             # the 'hostname' filter should narrow down the list of hosts on
   1152             # each shard even though we supply all the ids in filter_data.
   1153             # This method uses hostname instead of id because it fits better
   1154             # with the overall architecture of redirection functions in
   1155             # rpc_utils.
   1156             shard_filter = filter_data.copy()
   1157             shard_filter['hostname__in'] = hostnames
   1158             rpc_utils.run_rpc_on_multiple_hostnames(
   1159                     rpc, [shard], **shard_filter)
   1160 
   1161     # There is a race condition here if someone assigns a shard to one of these
   1162     # hosts before we create the task. The host will stay on the master if:
   1163     # 1. The host is not Ready
   1164     # 2. The host is Ready but has a task
   1165     # But if the host is Ready and doesn't have a task yet, it will get sent
   1166     # to the shard as we're creating a task here.
   1167 
   1168     # Given that we only rarely verify Ready hosts it isn't worth putting this
   1169     # entire method in a transaction. The worst case scenario is that we have
   1170     # a verify running on a Ready host while the shard is using it, if the
   1171     # verify fails no subsequent tasks will be created against the host on the
   1172     # master, and verifies are safe enough that this is OK.
   1173     return _call_special_tasks_on_hosts(task, hosts)
   1174 
   1175 
   1176 def reverify_hosts(**filter_data):
   1177     """\
   1178     Schedules a set of hosts for verify.
   1179 
   1180     @returns A list of hostnames that a verify task was created for.
   1181     """
   1182     return _forward_special_tasks_on_hosts(
   1183             models.SpecialTask.Task.VERIFY, 'reverify_hosts', **filter_data)
   1184 
   1185 
   1186 def repair_hosts(**filter_data):
   1187     """\
   1188     Schedules a set of hosts for repair.
   1189 
   1190     @returns A list of hostnames that a repair task was created for.
   1191     """
   1192     return _forward_special_tasks_on_hosts(
   1193             models.SpecialTask.Task.REPAIR, 'repair_hosts', **filter_data)
   1194 
   1195 
   1196 def get_jobs(not_yet_run=False, running=False, finished=False,
   1197              suite=False, sub=False, standalone=False, **filter_data):
   1198     """\
   1199     Extra status filter args for get_jobs:
   1200     -not_yet_run: Include only jobs that have not yet started running.
   1201     -running: Include only jobs that have start running but for which not
   1202     all hosts have completed.
   1203     -finished: Include only jobs for which all hosts have completed (or
   1204     aborted).
   1205 
   1206     Extra type filter args for get_jobs:
   1207     -suite: Include only jobs with child jobs.
   1208     -sub: Include only jobs with a parent job.
   1209     -standalone: Inlcude only jobs with no child or parent jobs.
   1210     At most one of these three fields should be specified.
   1211     """
   1212     extra_args = rpc_utils.extra_job_status_filters(not_yet_run,
   1213                                                     running,
   1214                                                     finished)
   1215     filter_data['extra_args'] = rpc_utils.extra_job_type_filters(extra_args,
   1216                                                                  suite,
   1217                                                                  sub,
   1218                                                                  standalone)
   1219     job_dicts = []
   1220     jobs = list(models.Job.query_objects(filter_data))
   1221     models.Job.objects.populate_relationships(jobs, models.Label,
   1222                                               'dependencies')
   1223     models.Job.objects.populate_relationships(jobs, models.JobKeyval, 'keyvals')
   1224     for job in jobs:
   1225         job_dict = job.get_object_dict()
   1226         job_dict['dependencies'] = ','.join(label.name
   1227                                             for label in job.dependencies)
   1228         job_dict['keyvals'] = dict((keyval.key, keyval.value)
   1229                                    for keyval in job.keyvals)
   1230         job_dicts.append(job_dict)
   1231     return rpc_utils.prepare_for_serialization(job_dicts)
   1232 
   1233 
   1234 def get_num_jobs(not_yet_run=False, running=False, finished=False,
   1235                  suite=False, sub=False, standalone=False,
   1236                  **filter_data):
   1237     """\
   1238     See get_jobs() for documentation of extra filter parameters.
   1239     """
   1240     extra_args = rpc_utils.extra_job_status_filters(not_yet_run,
   1241                                                     running,
   1242                                                     finished)
   1243     filter_data['extra_args'] = rpc_utils.extra_job_type_filters(extra_args,
   1244                                                                  suite,
   1245                                                                  sub,
   1246                                                                  standalone)
   1247     return models.Job.query_count(filter_data)
   1248 
   1249 
   1250 def get_jobs_summary(**filter_data):
   1251     """\
   1252     Like get_jobs(), but adds 'status_counts' and 'result_counts' field.
   1253 
   1254     'status_counts' filed is a dictionary mapping status strings to the number
   1255     of hosts currently with that status, i.e. {'Queued' : 4, 'Running' : 2}.
   1256 
   1257     'result_counts' field is piped to tko's rpc_interface and has the return
   1258     format specified under get_group_counts.
   1259     """
   1260     jobs = get_jobs(**filter_data)
   1261     ids = [job['id'] for job in jobs]
   1262     all_status_counts = models.Job.objects.get_status_counts(ids)
   1263     for job in jobs:
   1264         job['status_counts'] = all_status_counts[job['id']]
   1265         job['result_counts'] = tko_rpc_interface.get_status_counts(
   1266                 ['afe_job_id', 'afe_job_id'],
   1267                 header_groups=[['afe_job_id'], ['afe_job_id']],
   1268                 **{'afe_job_id': job['id']})
   1269     return rpc_utils.prepare_for_serialization(jobs)
   1270 
   1271 
   1272 def get_info_for_clone(id, preserve_metahosts, queue_entry_filter_data=None):
   1273     """\
   1274     Retrieves all the information needed to clone a job.
   1275     """
   1276     job = models.Job.objects.get(id=id)
   1277     job_info = rpc_utils.get_job_info(job,
   1278                                       preserve_metahosts,
   1279                                       queue_entry_filter_data)
   1280 
   1281     host_dicts = []
   1282     for host in job_info['hosts']:
   1283         host_dict = get_hosts(id=host.id)[0]
   1284         other_labels = host_dict['labels']
   1285         if host_dict['platform']:
   1286             other_labels.remove(host_dict['platform'])
   1287         host_dict['other_labels'] = ', '.join(other_labels)
   1288         host_dicts.append(host_dict)
   1289 
   1290     for host in job_info['one_time_hosts']:
   1291         host_dict = dict(hostname=host.hostname,
   1292                          id=host.id,
   1293                          platform='(one-time host)',
   1294                          locked_text='')
   1295         host_dicts.append(host_dict)
   1296 
   1297     # convert keys from Label objects to strings (names of labels)
   1298     meta_host_counts = dict((meta_host.name, count) for meta_host, count
   1299                             in job_info['meta_host_counts'].iteritems())
   1300 
   1301     info = dict(job=job.get_object_dict(),
   1302                 meta_host_counts=meta_host_counts,
   1303                 hosts=host_dicts)
   1304     info['job']['dependencies'] = job_info['dependencies']
   1305     info['hostless'] = job_info['hostless']
   1306     info['drone_set'] = job.drone_set and job.drone_set.name
   1307 
   1308     image = _get_image_for_job(job, job_info['hostless'])
   1309     if image:
   1310         info['job']['image'] = image
   1311 
   1312     return rpc_utils.prepare_for_serialization(info)
   1313 
   1314 
   1315 def _get_image_for_job(job, hostless):
   1316     """Gets the image used for a job.
   1317 
   1318     Gets the image used for an AFE job from the job's keyvals 'build' or
   1319     'builds'. If that fails, and the job is a hostless job, tries to
   1320     get the image from its control file attributes 'build' or 'builds'.
   1321 
   1322     TODO(ntang): Needs to handle FAFT with two builds for ro/rw.
   1323 
   1324     @param job      An AFE job object.
   1325     @param hostless Boolean indicating whether the job is hostless.
   1326 
   1327     @returns The image build used for the job.
   1328     """
   1329     keyvals = job.keyval_dict()
   1330     image = keyvals.get('build')
   1331     if not image:
   1332         value = keyvals.get('builds')
   1333         builds = None
   1334         if isinstance(value, dict):
   1335             builds = value
   1336         elif isinstance(value, basestring):
   1337             builds = ast.literal_eval(value)
   1338         if builds:
   1339             image = builds.get('cros-version')
   1340     if not image and hostless and job.control_file:
   1341         try:
   1342             control_obj = control_data.parse_control_string(
   1343                     job.control_file)
   1344             if hasattr(control_obj, 'build'):
   1345                 image = getattr(control_obj, 'build')
   1346             if not image and hasattr(control_obj, 'builds'):
   1347                 builds = getattr(control_obj, 'builds')
   1348                 image = builds.get('cros-version')
   1349         except:
   1350             logging.warning('Failed to parse control file for job: %s',
   1351                             job.name)
   1352     return image
   1353 
   1354 
   1355 def get_host_queue_entries_by_insert_time(
   1356     insert_time_after=None, insert_time_before=None, **filter_data):
   1357     """Like get_host_queue_entries, but using the insert index table.
   1358 
   1359     @param insert_time_after: A lower bound on insert_time
   1360     @param insert_time_before: An upper bound on insert_time
   1361     @returns A sequence of nested dictionaries of host and job information.
   1362     """
   1363     assert insert_time_after is not None or insert_time_before is not None, \
   1364       ('Caller to get_host_queue_entries_by_insert_time must provide either'
   1365        ' insert_time_after or insert_time_before.')
   1366     # Get insert bounds on the index of the host queue entries.
   1367     if insert_time_after:
   1368         query = models.HostQueueEntryStartTimes.objects.filter(
   1369             # Note: '-insert_time' means descending. We want the largest
   1370             # insert time smaller than the insert time.
   1371             insert_time__lte=insert_time_after).order_by('-insert_time')
   1372         try:
   1373             constraint = query[0].highest_hqe_id
   1374             if 'id__gte' in filter_data:
   1375                 constraint = max(constraint, filter_data['id__gte'])
   1376             filter_data['id__gte'] = constraint
   1377         except IndexError:
   1378             pass
   1379 
   1380     # Get end bounds.
   1381     if insert_time_before:
   1382         query = models.HostQueueEntryStartTimes.objects.filter(
   1383             insert_time__gte=insert_time_before).order_by('insert_time')
   1384         try:
   1385             constraint = query[0].highest_hqe_id
   1386             if 'id__lte' in filter_data:
   1387                 constraint = min(constraint, filter_data['id__lte'])
   1388             filter_data['id__lte'] = constraint
   1389         except IndexError:
   1390             pass
   1391 
   1392     return rpc_utils.prepare_rows_as_nested_dicts(
   1393             models.HostQueueEntry.query_objects(filter_data),
   1394             ('host', 'job'))
   1395 
   1396 
   1397 def get_host_queue_entries(start_time=None, end_time=None, **filter_data):
   1398     """\
   1399     @returns A sequence of nested dictionaries of host and job information.
   1400     """
   1401     filter_data = rpc_utils.inject_times_to_filter('started_on__gte',
   1402                                                    'started_on__lte',
   1403                                                    start_time,
   1404                                                    end_time,
   1405                                                    **filter_data)
   1406     return rpc_utils.prepare_rows_as_nested_dicts(
   1407             models.HostQueueEntry.query_objects(filter_data),
   1408             ('host', 'job'))
   1409 
   1410 
   1411 def get_num_host_queue_entries(start_time=None, end_time=None, **filter_data):
   1412     """\
   1413     Get the number of host queue entries associated with this job.
   1414     """
   1415     filter_data = rpc_utils.inject_times_to_filter('started_on__gte',
   1416                                                    'started_on__lte',
   1417                                                    start_time,
   1418                                                    end_time,
   1419                                                    **filter_data)
   1420     return models.HostQueueEntry.query_count(filter_data)
   1421 
   1422 
   1423 def get_hqe_percentage_complete(**filter_data):
   1424     """
   1425     Computes the fraction of host queue entries matching the given filter data
   1426     that are complete.
   1427     """
   1428     query = models.HostQueueEntry.query_objects(filter_data)
   1429     complete_count = query.filter(complete=True).count()
   1430     total_count = query.count()
   1431     if total_count == 0:
   1432         return 1
   1433     return float(complete_count) / total_count
   1434 
   1435 
   1436 # special tasks
   1437 
   1438 def get_special_tasks(**filter_data):
   1439     """Get special task entries from the local database.
   1440 
   1441     Query the special tasks table for tasks matching the given
   1442     `filter_data`, and return a list of the results.  No attempt is
   1443     made to forward the call to shards; the buck will stop here.
   1444     The caller is expected to know the target shard for such reasons
   1445     as:
   1446       * The caller is a service (such as gs_offloader) configured
   1447         to operate on behalf of one specific shard, and no other.
   1448       * The caller has a host as a parameter, and knows that this is
   1449         the shard assigned to that host.
   1450 
   1451     @param filter_data  Filter keywords to pass to the underlying
   1452                         database query.
   1453 
   1454     """
   1455     return rpc_utils.prepare_rows_as_nested_dicts(
   1456             models.SpecialTask.query_objects(filter_data),
   1457             ('host', 'queue_entry'))
   1458 
   1459 
   1460 def get_host_special_tasks(host_id, **filter_data):
   1461     """Get special task entries for a given host.
   1462 
   1463     Query the special tasks table for tasks that ran on the host
   1464     given by `host_id` and matching the given `filter_data`.
   1465     Return a list of the results.  If the host is assigned to a
   1466     shard, forward this call to that shard.
   1467 
   1468     @param host_id      Id in the database of the target host.
   1469     @param filter_data  Filter keywords to pass to the underlying
   1470                         database query.
   1471 
   1472     """
   1473     # Retrieve host data even if the host is in an invalid state.
   1474     host = models.Host.smart_get(host_id, False)
   1475     if not host.shard:
   1476         return get_special_tasks(host_id=host_id, **filter_data)
   1477     else:
   1478         # The return values from AFE methods are post-processed
   1479         # objects that aren't JSON-serializable.  So, we have to
   1480         # call AFE.run() to get the raw, serializable output from
   1481         # the shard.
   1482         shard_afe = frontend.AFE(server=host.shard.hostname)
   1483         return shard_afe.run('get_special_tasks',
   1484                              host_id=host_id, **filter_data)
   1485 
   1486 
   1487 def get_num_special_tasks(**kwargs):
   1488     """Get the number of special task entries from the local database.
   1489 
   1490     Query the special tasks table for tasks matching the given 'kwargs',
   1491     and return the number of the results. No attempt is made to forward
   1492     the call to shards; the buck will stop here.
   1493 
   1494     @param kwargs    Filter keywords to pass to the underlying database query.
   1495 
   1496     """
   1497     return models.SpecialTask.query_count(kwargs)
   1498 
   1499 
   1500 def get_host_num_special_tasks(host, **kwargs):
   1501     """Get special task entries for a given host.
   1502 
   1503     Query the special tasks table for tasks that ran on the host
   1504     given by 'host' and matching the given 'kwargs'.
   1505     Return a list of the results.  If the host is assigned to a
   1506     shard, forward this call to that shard.
   1507 
   1508     @param host      id or name of a host. More often a hostname.
   1509     @param kwargs    Filter keywords to pass to the underlying database query.
   1510 
   1511     """
   1512     # Retrieve host data even if the host is in an invalid state.
   1513     host_model = models.Host.smart_get(host, False)
   1514     if not host_model.shard:
   1515         return get_num_special_tasks(host=host, **kwargs)
   1516     else:
   1517         shard_afe = frontend.AFE(server=host_model.shard.hostname)
   1518         return shard_afe.run('get_num_special_tasks', host=host, **kwargs)
   1519 
   1520 
   1521 def get_status_task(host_id, end_time):
   1522     """Get the "status task" for a host from the local shard.
   1523 
   1524     Returns a single special task representing the given host's
   1525     "status task".  The status task is a completed special task that
   1526     identifies whether the corresponding host was working or broken
   1527     when it completed.  A successful task indicates a working host;
   1528     a failed task indicates broken.
   1529 
   1530     This call will not be forward to a shard; the receiving server
   1531     must be the shard that owns the host.
   1532 
   1533     @param host_id      Id in the database of the target host.
   1534     @param end_time     Time reference for the host's status.
   1535 
   1536     @return A single task; its status (successful or not)
   1537             corresponds to the status of the host (working or
   1538             broken) at the given time.  If no task is found, return
   1539             `None`.
   1540 
   1541     """
   1542     tasklist = rpc_utils.prepare_rows_as_nested_dicts(
   1543             status_history.get_status_task(host_id, end_time),
   1544             ('host', 'queue_entry'))
   1545     return tasklist[0] if tasklist else None
   1546 
   1547 
   1548 def get_host_status_task(host_id, end_time):
   1549     """Get the "status task" for a host from its owning shard.
   1550 
   1551     Finds the given host's owning shard, and forwards to it a call
   1552     to `get_status_task()` (see above).
   1553 
   1554     @param host_id      Id in the database of the target host.
   1555     @param end_time     Time reference for the host's status.
   1556 
   1557     @return A single task; its status (successful or not)
   1558             corresponds to the status of the host (working or
   1559             broken) at the given time.  If no task is found, return
   1560             `None`.
   1561 
   1562     """
   1563     host = models.Host.smart_get(host_id)
   1564     if not host.shard:
   1565         return get_status_task(host_id, end_time)
   1566     else:
   1567         # The return values from AFE methods are post-processed
   1568         # objects that aren't JSON-serializable.  So, we have to
   1569         # call AFE.run() to get the raw, serializable output from
   1570         # the shard.
   1571         shard_afe = frontend.AFE(server=host.shard.hostname)
   1572         return shard_afe.run('get_status_task',
   1573                              host_id=host_id, end_time=end_time)
   1574 
   1575 
   1576 def get_host_diagnosis_interval(host_id, end_time, success):
   1577     """Find a "diagnosis interval" for a given host.
   1578 
   1579     A "diagnosis interval" identifies a start and end time where
   1580     the host went from "working" to "broken", or vice versa.  The
   1581     interval's starting time is the starting time of the last status
   1582     task with the old status; the end time is the finish time of the
   1583     first status task with the new status.
   1584 
   1585     This routine finds the most recent diagnosis interval for the
   1586     given host prior to `end_time`, with a starting status matching
   1587     `success`.  If `success` is true, the interval will start with a
   1588     successful status task; if false the interval will start with a
   1589     failed status task.
   1590 
   1591     @param host_id      Id in the database of the target host.
   1592     @param end_time     Time reference for the diagnosis interval.
   1593     @param success      Whether the diagnosis interval should start
   1594                         with a successful or failed status task.
   1595 
   1596     @return A list of two strings.  The first is the timestamp for
   1597             the beginning of the interval; the second is the
   1598             timestamp for the end.  If the host has never changed
   1599             state, the list is empty.
   1600 
   1601     """
   1602     host = models.Host.smart_get(host_id)
   1603     if not host.shard or utils.is_shard():
   1604         return status_history.get_diagnosis_interval(
   1605                 host_id, end_time, success)
   1606     else:
   1607         shard_afe = frontend.AFE(server=host.shard.hostname)
   1608         return shard_afe.get_host_diagnosis_interval(
   1609                 host_id, end_time, success)
   1610 
   1611 
   1612 # support for host detail view
   1613 
   1614 def get_host_queue_entries_and_special_tasks(host, query_start=None,
   1615                                              query_limit=None, start_time=None,
   1616                                              end_time=None):
   1617     """
   1618     @returns an interleaved list of HostQueueEntries and SpecialTasks,
   1619             in approximate run order.  each dict contains keys for type, host,
   1620             job, status, started_on, execution_path, and ID.
   1621     """
   1622     total_limit = None
   1623     if query_limit is not None:
   1624         total_limit = query_start + query_limit
   1625     filter_data_common = {'host': host,
   1626                           'query_limit': total_limit,
   1627                           'sort_by': ['-id']}
   1628 
   1629     filter_data_special_tasks = rpc_utils.inject_times_to_filter(
   1630             'time_started__gte', 'time_started__lte', start_time, end_time,
   1631             **filter_data_common)
   1632 
   1633     queue_entries = get_host_queue_entries(
   1634             start_time, end_time, **filter_data_common)
   1635     special_tasks = get_host_special_tasks(host, **filter_data_special_tasks)
   1636 
   1637     interleaved_entries = rpc_utils.interleave_entries(queue_entries,
   1638                                                        special_tasks)
   1639     if query_start is not None:
   1640         interleaved_entries = interleaved_entries[query_start:]
   1641     if query_limit is not None:
   1642         interleaved_entries = interleaved_entries[:query_limit]
   1643     return rpc_utils.prepare_host_queue_entries_and_special_tasks(
   1644             interleaved_entries, queue_entries)
   1645 
   1646 
   1647 def get_num_host_queue_entries_and_special_tasks(host, start_time=None,
   1648                                                  end_time=None):
   1649     filter_data_common = {'host': host}
   1650 
   1651     filter_data_queue_entries, filter_data_special_tasks = (
   1652             rpc_utils.inject_times_to_hqe_special_tasks_filters(
   1653                     filter_data_common, start_time, end_time))
   1654 
   1655     return (models.HostQueueEntry.query_count(filter_data_queue_entries)
   1656             + get_host_num_special_tasks(**filter_data_special_tasks))
   1657 
   1658 
   1659 # other
   1660 
   1661 def echo(data=""):
   1662     """\
   1663     Returns a passed in string. For doing a basic test to see if RPC calls
   1664     can successfully be made.
   1665     """
   1666     return data
   1667 
   1668 
   1669 def get_motd():
   1670     """\
   1671     Returns the message of the day as a string.
   1672     """
   1673     return rpc_utils.get_motd()
   1674 
   1675 
   1676 def get_static_data():
   1677     """\
   1678     Returns a dictionary containing a bunch of data that shouldn't change
   1679     often and is otherwise inaccessible.  This includes:
   1680 
   1681     priorities: List of job priority choices.
   1682     default_priority: Default priority value for new jobs.
   1683     users: Sorted list of all users.
   1684     labels: Sorted list of labels not start with 'cros-version' and
   1685             'fw-version'.
   1686     tests: Sorted list of all tests.
   1687     profilers: Sorted list of all profilers.
   1688     current_user: Logged-in username.
   1689     host_statuses: Sorted list of possible Host statuses.
   1690     job_statuses: Sorted list of possible HostQueueEntry statuses.
   1691     job_timeout_default: The default job timeout length in minutes.
   1692     parse_failed_repair_default: Default value for the parse_failed_repair job
   1693             option.
   1694     reboot_before_options: A list of valid RebootBefore string enums.
   1695     reboot_after_options: A list of valid RebootAfter string enums.
   1696     motd: Server's message of the day.
   1697     status_dictionary: A mapping from one word job status names to a more
   1698             informative description.
   1699     """
   1700 
   1701     default_drone_set_name = models.DroneSet.default_drone_set_name()
   1702     drone_sets = ([default_drone_set_name] +
   1703                   sorted(drone_set.name for drone_set in
   1704                          models.DroneSet.objects.exclude(
   1705                                  name=default_drone_set_name)))
   1706 
   1707     result = {}
   1708     result['priorities'] = priorities.Priority.choices()
   1709     result['default_priority'] = 'Default'
   1710     result['max_schedulable_priority'] = priorities.Priority.DEFAULT
   1711     result['users'] = get_users(sort_by=['login'])
   1712 
   1713     label_exclude_filters = [{'name__startswith': 'cros-version'},
   1714                              {'name__startswith': 'fw-version'},
   1715                              {'name__startswith': 'fwrw-version'},
   1716                              {'name__startswith': 'fwro-version'},
   1717                              {'name__startswith': 'ab-version'},
   1718                              {'name__startswith': 'testbed-version'}]
   1719     result['labels'] = get_labels(
   1720         label_exclude_filters,
   1721         sort_by=['-platform', 'name'])
   1722 
   1723     result['tests'] = get_tests(sort_by=['name'])
   1724     result['profilers'] = get_profilers(sort_by=['name'])
   1725     result['current_user'] = rpc_utils.prepare_for_serialization(
   1726         models.User.current_user().get_object_dict())
   1727     result['host_statuses'] = sorted(models.Host.Status.names)
   1728     result['job_statuses'] = sorted(models.HostQueueEntry.Status.names)
   1729     result['job_timeout_mins_default'] = models.Job.DEFAULT_TIMEOUT_MINS
   1730     result['job_max_runtime_mins_default'] = (
   1731         models.Job.DEFAULT_MAX_RUNTIME_MINS)
   1732     result['parse_failed_repair_default'] = bool(
   1733         models.Job.DEFAULT_PARSE_FAILED_REPAIR)
   1734     result['reboot_before_options'] = model_attributes.RebootBefore.names
   1735     result['reboot_after_options'] = model_attributes.RebootAfter.names
   1736     result['motd'] = rpc_utils.get_motd()
   1737     result['drone_sets_enabled'] = models.DroneSet.drone_sets_enabled()
   1738     result['drone_sets'] = drone_sets
   1739 
   1740     result['status_dictionary'] = {"Aborted": "Aborted",
   1741                                    "Verifying": "Verifying Host",
   1742                                    "Provisioning": "Provisioning Host",
   1743                                    "Pending": "Waiting on other hosts",
   1744                                    "Running": "Running autoserv",
   1745                                    "Completed": "Autoserv completed",
   1746                                    "Failed": "Failed to complete",
   1747                                    "Queued": "Queued",
   1748                                    "Starting": "Next in host's queue",
   1749                                    "Stopped": "Other host(s) failed verify",
   1750                                    "Parsing": "Awaiting parse of final results",
   1751                                    "Gathering": "Gathering log files",
   1752                                    "Waiting": "Waiting for scheduler action",
   1753                                    "Archiving": "Archiving results",
   1754                                    "Resetting": "Resetting hosts"}
   1755 
   1756     result['wmatrix_url'] = rpc_utils.get_wmatrix_url()
   1757     result['stainless_url'] = rpc_utils.get_stainless_url()
   1758     result['is_moblab'] = bool(utils.is_moblab())
   1759 
   1760     return result
   1761 
   1762 
   1763 def get_server_time():
   1764     return datetime.datetime.now().strftime("%Y-%m-%d %H:%M")
   1765 
   1766 
   1767 def ping_db():
   1768     """Simple connection test to db"""
   1769     try:
   1770         db_connection.cursor()
   1771     except DatabaseError:
   1772         return [False]
   1773     return [True]
   1774 
   1775 
   1776 def get_hosts_by_attribute(attribute, value):
   1777     """
   1778     Get the list of valid hosts that share the same host attribute value.
   1779 
   1780     @param attribute: String of the host attribute to check.
   1781     @param value: String of the value that is shared between hosts.
   1782 
   1783     @returns List of hostnames that all have the same host attribute and
   1784              value.
   1785     """
   1786     rows = models.HostAttribute.query_objects({'attribute': attribute,
   1787                                                'value': value})
   1788     if RESPECT_STATIC_ATTRIBUTES:
   1789         returned_hosts = set()
   1790         # Add hosts:
   1791         #     * Non-valid
   1792         #     * Exist in afe_host_attribute with given attribute.
   1793         #     * Don't exist in afe_static_host_attribute OR exist in
   1794         #       afe_static_host_attribute with the same given value.
   1795         for row in rows:
   1796             if row.host.invalid != 0:
   1797                 continue
   1798 
   1799             static_hosts = models.StaticHostAttribute.query_objects(
   1800                 {'host_id': row.host.id, 'attribute': attribute})
   1801             values = [static_host.value for static_host in static_hosts]
   1802             if len(values) == 0 or values[0] == value:
   1803                 returned_hosts.add(row.host.hostname)
   1804 
   1805         # Add hosts:
   1806         #     * Non-valid
   1807         #     * Exist in afe_static_host_attribute with given attribute
   1808         #       and value
   1809         #     * No need to check whether each static attribute has its
   1810         #       corresponding entry in afe_host_attribute since it is ensured
   1811         #       in inventory sync.
   1812         static_rows = models.StaticHostAttribute.query_objects(
   1813                 {'attribute': attribute, 'value': value})
   1814         for row in static_rows:
   1815             if row.host.invalid != 0:
   1816                 continue
   1817 
   1818             returned_hosts.add(row.host.hostname)
   1819 
   1820         return list(returned_hosts)
   1821     else:
   1822         return [row.host.hostname for row in rows if row.host.invalid == 0]
   1823 
   1824 
   1825 def canonicalize_suite_name(suite_name):
   1826     """Canonicalize the suite's name.
   1827 
   1828     @param suite_name: the name of the suite.
   1829     """
   1830     # Do not change this naming convention without updating
   1831     # site_utils.parse_job_name.
   1832     return 'test_suites/control.%s' % suite_name
   1833 
   1834 
   1835 def formatted_now():
   1836     """Format the current datetime."""
   1837     return datetime.datetime.now().strftime(time_utils.TIME_FMT)
   1838 
   1839 
   1840 def _get_control_file_by_build(build, ds, suite_name):
   1841     """Return control file contents for |suite_name|.
   1842 
   1843     Query the dev server at |ds| for the control file |suite_name|, included
   1844     in |build| for |board|.
   1845 
   1846     @param build: unique name by which to refer to the image from now on.
   1847     @param ds: a dev_server.DevServer instance to fetch control file with.
   1848     @param suite_name: canonicalized suite name, e.g. test_suites/control.bvt.
   1849     @raises ControlFileNotFound if a unique suite control file doesn't exist.
   1850     @raises NoControlFileList if we can't list the control files at all.
   1851     @raises ControlFileEmpty if the control file exists on the server, but
   1852                              can't be read.
   1853 
   1854     @return the contents of the desired control file.
   1855     """
   1856     getter = control_file_getter.DevServerGetter.create(build, ds)
   1857     devserver_name = ds.hostname
   1858     # Get the control file for the suite.
   1859     try:
   1860         control_file_in = getter.get_control_file_contents_by_name(suite_name)
   1861     except error.CrosDynamicSuiteException as e:
   1862         raise type(e)('Failed to get control file for %s '
   1863                       '(devserver: %s) (error: %s)' %
   1864                       (build, devserver_name, e))
   1865     if not control_file_in:
   1866         raise error.ControlFileEmpty(
   1867             "Fetching %s returned no data. (devserver: %s)" %
   1868             (suite_name, devserver_name))
   1869     # Force control files to only contain ascii characters.
   1870     try:
   1871         control_file_in.encode('ascii')
   1872     except UnicodeDecodeError as e:
   1873         raise error.ControlFileMalformed(str(e))
   1874 
   1875     return control_file_in
   1876 
   1877 
   1878 def _get_control_file_by_suite(suite_name):
   1879     """Get control file contents by suite name.
   1880 
   1881     @param suite_name: Suite name as string.
   1882     @returns: Control file contents as string.
   1883     """
   1884     getter = control_file_getter.FileSystemGetter(
   1885             [_CONFIG.get_config_value('SCHEDULER',
   1886                                       'drone_installation_directory')])
   1887     return getter.get_control_file_contents_by_name(suite_name)
   1888 
   1889 
   1890 def _stage_build_artifacts(build, hostname=None):
   1891     """
   1892     Ensure components of |build| necessary for installing images are staged.
   1893 
   1894     @param build image we want to stage.
   1895     @param hostname hostname of a dut may run test on. This is to help to locate
   1896         a devserver closer to duts if needed. Default is None.
   1897 
   1898     @raises StageControlFileFailure: if the dev server throws 500 while staging
   1899         suite control files.
   1900 
   1901     @return: dev_server.ImageServer instance to use with this build.
   1902     @return: timings dictionary containing staging start/end times.
   1903     """
   1904     timings = {}
   1905     # Ensure components of |build| necessary for installing images are staged
   1906     # on the dev server. However set synchronous to False to allow other
   1907     # components to be downloaded in the background.
   1908     ds = dev_server.resolve(build, hostname=hostname)
   1909     ds_name = ds.hostname
   1910     timings[constants.DOWNLOAD_STARTED_TIME] = formatted_now()
   1911     try:
   1912         ds.stage_artifacts(image=build, artifacts=['test_suites'])
   1913     except dev_server.DevServerException as e:
   1914         raise error.StageControlFileFailure(
   1915                 "Failed to stage %s on %s: %s" % (build, ds_name, e))
   1916     timings[constants.PAYLOAD_FINISHED_TIME] = formatted_now()
   1917     return (ds, timings)
   1918 
   1919 
   1920 @rpc_utils.route_rpc_to_master
   1921 def create_suite_job(
   1922         name='',
   1923         board='',
   1924         pool='',
   1925         child_dependencies=(),
   1926         control_file='',
   1927         check_hosts=True,
   1928         num=None,
   1929         file_bugs=False,
   1930         timeout=24,
   1931         timeout_mins=None,
   1932         priority=priorities.Priority.DEFAULT,
   1933         suite_args=None,
   1934         wait_for_results=True,
   1935         job_retry=False,
   1936         max_retries=None,
   1937         max_runtime_mins=None,
   1938         suite_min_duts=0,
   1939         offload_failures_only=False,
   1940         builds=None,
   1941         test_source_build=None,
   1942         run_prod_code=False,
   1943         delay_minutes=0,
   1944         is_cloning=False,
   1945         job_keyvals=None,
   1946         test_args=None,
   1947         **kwargs):
   1948     """
   1949     Create a job to run a test suite on the given device with the given image.
   1950 
   1951     When the timeout specified in the control file is reached, the
   1952     job is guaranteed to have completed and results will be available.
   1953 
   1954     @param name: The test name if control_file is supplied, otherwise the name
   1955                  of the test suite to run, e.g. 'bvt'.
   1956     @param board: the kind of device to run the tests on.
   1957     @param builds: the builds to install e.g.
   1958                    {'cros-version:': 'x86-alex-release/R18-1655.0.0',
   1959                     'fwrw-version:':  'x86-alex-firmware/R36-5771.50.0',
   1960                     'fwro-version:':  'x86-alex-firmware/R36-5771.49.0'}
   1961                    If builds is given a value, it overrides argument build.
   1962     @param test_source_build: Build that contains the server-side test code.
   1963     @param pool: Specify the pool of machines to use for scheduling
   1964             purposes.
   1965     @param child_dependencies: (optional) list of additional dependency labels
   1966             (strings) that will be added as dependency labels to child jobs.
   1967     @param control_file: the control file of the job.
   1968     @param check_hosts: require appropriate live hosts to exist in the lab.
   1969     @param num: Specify the number of machines to schedule across (integer).
   1970                 Leave unspecified or use None to use default sharding factor.
   1971     @param file_bugs: File a bug on each test failure in this suite.
   1972     @param timeout: The max lifetime of this suite, in hours.
   1973     @param timeout_mins: The max lifetime of this suite, in minutes. Takes
   1974                          priority over timeout.
   1975     @param priority: Integer denoting priority. Higher is more important.
   1976     @param suite_args: Optional arguments which will be parsed by the suite
   1977                        control file. Used by control.test_that_wrapper to
   1978                        determine which tests to run.
   1979     @param wait_for_results: Set to False to run the suite job without waiting
   1980                              for test jobs to finish. Default is True.
   1981     @param job_retry: Set to True to enable job-level retry. Default is False.
   1982     @param max_retries: Integer, maximum job retries allowed at suite level.
   1983                         None for no max.
   1984     @param max_runtime_mins: Maximum amount of time a job can be running in
   1985                              minutes.
   1986     @param suite_min_duts: Integer. Scheduler will prioritize getting the
   1987                            minimum number of machines for the suite when it is
   1988                            competing with another suite that has a higher
   1989                            priority but already got minimum machines it needs.
   1990     @param offload_failures_only: Only enable gs_offloading for failed jobs.
   1991     @param run_prod_code: If True, the suite will run the test code that
   1992                           lives in prod aka the test code currently on the
   1993                           lab servers. If False, the control files and test
   1994                           code for this suite run will be retrieved from the
   1995                           build artifacts.
   1996     @param delay_minutes: Delay the creation of test jobs for a given number of
   1997                           minutes.
   1998     @param is_cloning: True if creating a cloning job.
   1999     @param job_keyvals: A dict of job keyvals to be inject to control file.
   2000     @param test_args: A dict of args passed all the way to each individual test
   2001                       that will be actually run.
   2002     @param kwargs: extra keyword args. NOT USED.
   2003 
   2004     @raises ControlFileNotFound: if a unique suite control file doesn't exist.
   2005     @raises NoControlFileList: if we can't list the control files at all.
   2006     @raises StageControlFileFailure: If the dev server throws 500 while
   2007                                      staging test_suites.
   2008     @raises ControlFileEmpty: if the control file exists on the server, but
   2009                               can't be read.
   2010 
   2011     @return: the job ID of the suite; -1 on error.
   2012     """
   2013     if num is not None:
   2014         warnings.warn('num is deprecated for create_suite_job')
   2015     del num
   2016 
   2017     if builds is None:
   2018         builds = {}
   2019 
   2020     # Default test source build to CrOS build if it's not specified and
   2021     # run_prod_code is set to False.
   2022     if not run_prod_code:
   2023         test_source_build = Suite.get_test_source_build(
   2024                 builds, test_source_build=test_source_build)
   2025 
   2026     sample_dut = rpc_utils.get_sample_dut(board, pool)
   2027 
   2028     suite_name = canonicalize_suite_name(name)
   2029     if run_prod_code:
   2030         ds = dev_server.resolve(test_source_build, hostname=sample_dut)
   2031         keyvals = {}
   2032     else:
   2033         (ds, keyvals) = _stage_build_artifacts(
   2034                 test_source_build, hostname=sample_dut)
   2035     keyvals[constants.SUITE_MIN_DUTS_KEY] = suite_min_duts
   2036 
   2037     # Do not change this naming convention without updating
   2038     # site_utils.parse_job_name.
   2039     if run_prod_code:
   2040         # If run_prod_code is True, test_source_build is not set, use the
   2041         # first build in the builds list for the sutie job name.
   2042         name = '%s-%s' % (builds.values()[0], suite_name)
   2043     else:
   2044         name = '%s-%s' % (test_source_build, suite_name)
   2045 
   2046     timeout_mins = timeout_mins or timeout * 60
   2047     max_runtime_mins = max_runtime_mins or timeout * 60
   2048 
   2049     if not board:
   2050         board = utils.ParseBuildName(builds[provision.CROS_VERSION_PREFIX])[0]
   2051 
   2052     if run_prod_code:
   2053         control_file = _get_control_file_by_suite(suite_name)
   2054 
   2055     if not control_file:
   2056         # No control file was supplied so look it up from the build artifacts.
   2057         control_file = _get_control_file_by_build(
   2058                 test_source_build, ds, suite_name)
   2059 
   2060     # Prepend builds and board to the control file.
   2061     if is_cloning:
   2062         control_file = tools.remove_injection(control_file)
   2063 
   2064     if suite_args is None:
   2065         suite_args = dict()
   2066 
   2067     # TODO(crbug.com/758427): suite_args_raw is needed to run old tests.
   2068     # Can be removed after R64.
   2069     if 'tests' in suite_args:
   2070         # TODO(crbug.com/758427): test_that used to have its own
   2071         # snowflake implementation of parsing command line arguments in
   2072         # the test
   2073         suite_args_raw = ' '.join([':lab:'] + suite_args['tests'])
   2074     # TODO(crbug.com/760675): Needed for CTS/GTS as above, but when
   2075     # 'tests' is not passed.  Can be removed after R64.
   2076     elif name.rpartition('/')[-1] in {'control.cts_N',
   2077                                       'control.cts_N_preconditions',
   2078                                       'control.gts'}:
   2079         suite_args_raw = ''
   2080     else:
   2081         # TODO(crbug.com/758427): This is for suite_attr_wrapper.  Can
   2082         # be removed after R64.
   2083         suite_args_raw = repr(suite_args)
   2084 
   2085     inject_dict = {
   2086         'board': board,
   2087         # `build` is needed for suites like AU to stage image inside suite
   2088         # control file.
   2089         'build': test_source_build,
   2090         'builds': builds,
   2091         'check_hosts': check_hosts,
   2092         'pool': pool,
   2093         'child_dependencies': child_dependencies,
   2094         'file_bugs': file_bugs,
   2095         'timeout': timeout,
   2096         'timeout_mins': timeout_mins,
   2097         'devserver_url': ds.url(),
   2098         'priority': priority,
   2099         # TODO(crbug.com/758427): injecting suite_args is needed to run
   2100         # old tests
   2101         'suite_args' : suite_args_raw,
   2102         'wait_for_results': wait_for_results,
   2103         'job_retry': job_retry,
   2104         'max_retries': max_retries,
   2105         'max_runtime_mins': max_runtime_mins,
   2106         'offload_failures_only': offload_failures_only,
   2107         'test_source_build': test_source_build,
   2108         'run_prod_code': run_prod_code,
   2109         'delay_minutes': delay_minutes,
   2110         'job_keyvals': job_keyvals,
   2111         'test_args': test_args,
   2112     }
   2113     inject_dict.update(suite_args)
   2114     control_file = tools.inject_vars(inject_dict, control_file)
   2115 
   2116     return rpc_utils.create_job_common(name,
   2117                                        priority=priority,
   2118                                        timeout_mins=timeout_mins,
   2119                                        max_runtime_mins=max_runtime_mins,
   2120                                        control_type='Server',
   2121                                        control_file=control_file,
   2122                                        hostless=True,
   2123                                        keyvals=keyvals)
   2124 
   2125 
   2126 def get_job_history(**filter_data):
   2127     """Get history of the job, including the special tasks executed for the job
   2128 
   2129     @param filter_data: filter for the call, should at least include
   2130                         {'job_id': [job id]}
   2131     @returns: JSON string of the job's history, including the information such
   2132               as the hosts run the job and the special tasks executed before
   2133               and after the job.
   2134     """
   2135     job_id = filter_data['job_id']
   2136     job_info = job_history.get_job_info(job_id)
   2137     return rpc_utils.prepare_for_serialization(job_info.get_history())
   2138 
   2139 
   2140 def get_host_history(start_time, end_time, hosts=None, board=None, pool=None):
   2141     """Deprecated."""
   2142     raise ValueError('get_host_history rpc is deprecated '
   2143                      'and no longer implemented.')
   2144 
   2145 
   2146 def shard_heartbeat(shard_hostname, jobs=(), hqes=(), known_job_ids=(),
   2147                     known_host_ids=(), known_host_statuses=()):
   2148     """Receive updates for job statuses from shards and assign hosts and jobs.
   2149 
   2150     @param shard_hostname: Hostname of the calling shard
   2151     @param jobs: Jobs in serialized form that should be updated with newer
   2152                  status from a shard.
   2153     @param hqes: Hostqueueentries in serialized form that should be updated with
   2154                  newer status from a shard. Note that for every hostqueueentry
   2155                  the corresponding job must be in jobs.
   2156     @param known_job_ids: List of ids of jobs the shard already has.
   2157     @param known_host_ids: List of ids of hosts the shard already has.
   2158     @param known_host_statuses: List of statuses of hosts the shard already has.
   2159 
   2160     @returns: Serialized representations of hosts, jobs, suite job keyvals
   2161               and their dependencies to be inserted into a shard's database.
   2162     """
   2163     # The following alternatives to sending host and job ids in every heartbeat
   2164     # have been considered:
   2165     # 1. Sending the highest known job and host ids. This would work for jobs:
   2166     #    Newer jobs always have larger ids. Also, if a job is not assigned to a
   2167     #    particular shard during a heartbeat, it never will be assigned to this
   2168     #    shard later.
   2169     #    This is not true for hosts though: A host that is leased won't be sent
   2170     #    to the shard now, but might be sent in a future heartbeat. This means
   2171     #    sometimes hosts should be transfered that have a lower id than the
   2172     #    maximum host id the shard knows.
   2173     # 2. Send the number of jobs/hosts the shard knows to the master in each
   2174     #    heartbeat. Compare these to the number of records that already have
   2175     #    the shard_id set to this shard. In the normal case, they should match.
   2176     #    In case they don't, resend all entities of that type.
   2177     #    This would work well for hosts, because there aren't that many.
   2178     #    Resending all jobs is quite a big overhead though.
   2179     #    Also, this approach might run into edge cases when entities are
   2180     #    ever deleted.
   2181     # 3. Mixtures of the above: Use 1 for jobs and 2 for hosts.
   2182     #    Using two different approaches isn't consistent and might cause
   2183     #    confusion. Also the issues with the case of deletions might still
   2184     #    occur.
   2185     #
   2186     # The overhead of sending all job and host ids in every heartbeat is low:
   2187     # At peaks one board has about 1200 created but unfinished jobs.
   2188     # See the numbers here: http://goo.gl/gQCGWH
   2189     # Assuming that job id's have 6 digits and that json serialization takes a
   2190     # comma and a space as overhead, the traffic per id sent is about 8 bytes.
   2191     # If 5000 ids need to be sent, this means 40 kilobytes of traffic.
   2192     # A NOT IN query with 5000 ids took about 30ms in tests made.
   2193     # These numbers seem low enough to outweigh the disadvantages of the
   2194     # solutions described above.
   2195     shard_obj = rpc_utils.retrieve_shard(shard_hostname=shard_hostname)
   2196     rpc_utils.persist_records_sent_from_shard(shard_obj, jobs, hqes)
   2197     assert len(known_host_ids) == len(known_host_statuses)
   2198     for i in range(len(known_host_ids)):
   2199         host_model = models.Host.objects.get(pk=known_host_ids[i])
   2200         if host_model.status != known_host_statuses[i]:
   2201             host_model.status = known_host_statuses[i]
   2202             host_model.save()
   2203 
   2204     hosts, jobs, suite_keyvals, inc_ids = rpc_utils.find_records_for_shard(
   2205             shard_obj, known_job_ids=known_job_ids,
   2206             known_host_ids=known_host_ids)
   2207     return {
   2208         'hosts': [host.serialize() for host in hosts],
   2209         'jobs': [job.serialize() for job in jobs],
   2210         'suite_keyvals': [kv.serialize() for kv in suite_keyvals],
   2211         'incorrect_host_ids': [int(i) for i in inc_ids],
   2212     }
   2213 
   2214 
   2215 def get_shards(**filter_data):
   2216     """Return a list of all shards.
   2217 
   2218     @returns A sequence of nested dictionaries of shard information.
   2219     """
   2220     shards = models.Shard.query_objects(filter_data)
   2221     serialized_shards = rpc_utils.prepare_rows_as_nested_dicts(shards, ())
   2222     for serialized, shard in zip(serialized_shards, shards):
   2223         serialized['labels'] = [label.name for label in shard.labels.all()]
   2224 
   2225     return serialized_shards
   2226 
   2227 
   2228 def _assign_board_to_shard_precheck(labels):
   2229     """Verify whether board labels are valid to be added to a given shard.
   2230 
   2231     First check whether board label is in correct format. Second, check whether
   2232     the board label exist. Third, check whether the board has already been
   2233     assigned to shard.
   2234 
   2235     @param labels: Board labels separated by comma.
   2236 
   2237     @raises error.RPCException: If label provided doesn't start with `board:`
   2238             or board has been added to shard already.
   2239     @raises models.Label.DoesNotExist: If the label specified doesn't exist.
   2240 
   2241     @returns: A list of label models that ready to be added to shard.
   2242     """
   2243     if not labels:
   2244       # allow creation of label-less shards (labels='' would otherwise fail the
   2245       # checks below)
   2246       return []
   2247     labels = labels.split(',')
   2248     label_models = []
   2249     for label in labels:
   2250         # Check whether the board label is in correct format.
   2251         if not label.startswith('board:'):
   2252             raise error.RPCException('Sharding only supports `board:.*` label.')
   2253         # Check whether the board label exist. If not, exception will be thrown
   2254         # by smart_get function.
   2255         label = models.Label.smart_get(label)
   2256         # Check whether the board has been sharded already
   2257         try:
   2258             shard = models.Shard.objects.get(labels=label)
   2259             raise error.RPCException(
   2260                     '%s is already on shard %s' % (label, shard.hostname))
   2261         except models.Shard.DoesNotExist:
   2262             # board is not on any shard, so it's valid.
   2263             label_models.append(label)
   2264     return label_models
   2265 
   2266 
   2267 def add_shard(hostname, labels):
   2268     """Add a shard and start running jobs on it.
   2269 
   2270     @param hostname: The hostname of the shard to be added; needs to be unique.
   2271     @param labels: Board labels separated by comma. Jobs of one of the labels
   2272                    will be assigned to the shard.
   2273 
   2274     @raises error.RPCException: If label provided doesn't start with `board:` or
   2275             board has been added to shard already.
   2276     @raises model_logic.ValidationError: If a shard with the given hostname
   2277             already exist.
   2278     @raises models.Label.DoesNotExist: If the label specified doesn't exist.
   2279 
   2280     @returns: The id of the added shard.
   2281     """
   2282     labels = _assign_board_to_shard_precheck(labels)
   2283     shard = models.Shard.add_object(hostname=hostname)
   2284     for label in labels:
   2285         shard.labels.add(label)
   2286     return shard.id
   2287 
   2288 
   2289 def add_board_to_shard(hostname, labels):
   2290     """Add boards to a given shard
   2291 
   2292     @param hostname: The hostname of the shard to be changed.
   2293     @param labels: Board labels separated by comma.
   2294 
   2295     @raises error.RPCException: If label provided doesn't start with `board:` or
   2296             board has been added to shard already.
   2297     @raises models.Label.DoesNotExist: If the label specified doesn't exist.
   2298 
   2299     @returns: The id of the changed shard.
   2300     """
   2301     labels = _assign_board_to_shard_precheck(labels)
   2302     shard = models.Shard.objects.get(hostname=hostname)
   2303     for label in labels:
   2304         shard.labels.add(label)
   2305     return shard.id
   2306 
   2307 
   2308 # Remove board RPCs are rare, so we can afford to make them a bit more
   2309 # expensive (by performing in a transaction) in order to guarantee
   2310 # atomicity.
   2311 # TODO(akeshet): If we ever update to newer version of django, we need to
   2312 # migrate to transaction.atomic instead of commit_on_success
   2313 @transaction.commit_on_success
   2314 def remove_board_from_shard(hostname, label):
   2315     """Remove board from the given shard.
   2316     @param hostname: The hostname of the shard to be changed.
   2317     @param labels: Board label.
   2318 
   2319     @raises models.Label.DoesNotExist: If the label specified doesn't exist.
   2320 
   2321     @returns: The id of the changed shard.
   2322     """
   2323     shard = models.Shard.objects.get(hostname=hostname)
   2324     label = models.Label.smart_get(label)
   2325     if label not in shard.labels.all():
   2326         raise error.RPCException(
   2327           'Cannot remove label from shard that does not belong to it.')
   2328 
   2329     shard.labels.remove(label)
   2330     if label.is_replaced_by_static():
   2331         static_label = models.StaticLabel.smart_get(label.name)
   2332         models.Host.objects.filter(
   2333                 static_labels__in=[static_label]).update(shard=None)
   2334     else:
   2335         models.Host.objects.filter(labels__in=[label]).update(shard=None)
   2336 
   2337 
   2338 def delete_shard(hostname):
   2339     """Delete a shard and reclaim all resources from it.
   2340 
   2341     This claims back all assigned hosts from the shard.
   2342     """
   2343     shard = rpc_utils.retrieve_shard(shard_hostname=hostname)
   2344 
   2345     # Remove shard information.
   2346     models.Host.objects.filter(shard=shard).update(shard=None)
   2347 
   2348     # Note: The original job-cleanup query was performed with django call
   2349     #   models.Job.objects.filter(shard=shard).update(shard=None)
   2350     #
   2351     # But that started becoming unreliable due to the large size of afe_jobs.
   2352     #
   2353     # We don't need atomicity here, so the new cleanup method is iterative, in
   2354     # chunks of 100k jobs.
   2355     QUERY = ('UPDATE afe_jobs SET shard_id = NULL WHERE shard_id = %s '
   2356              'LIMIT 100000')
   2357     try:
   2358         with contextlib.closing(db_connection.cursor()) as cursor:
   2359             clear_jobs = True
   2360             assert shard.id is not None
   2361             while clear_jobs:
   2362                 res = cursor.execute(QUERY % shard.id).fetchone()
   2363                 clear_jobs = bool(res)
   2364     # Unit tests use sqlite backend instead of MySQL. sqlite does not support
   2365     # UPDATE ... LIMIT, so fall back to the old behavior.
   2366     except DatabaseError as e:
   2367         if 'syntax error' in str(e):
   2368             models.Job.objects.filter(shard=shard).update(shard=None)
   2369         else:
   2370             raise
   2371 
   2372     shard.labels.clear()
   2373     shard.delete()
   2374 
   2375 
   2376 def get_servers(hostname=None, role=None, status=None):
   2377     """Get a list of servers with matching role and status.
   2378 
   2379     @param hostname: FQDN of the server.
   2380     @param role: Name of the server role, e.g., drone, scheduler. Default to
   2381                  None to match any role.
   2382     @param status: Status of the server, e.g., primary, backup, repair_required.
   2383                    Default to None to match any server status.
   2384 
   2385     @raises error.RPCException: If server database is not used.
   2386     @return: A list of server names for servers with matching role and status.
   2387     """
   2388     if not server_manager_utils.use_server_db():
   2389         raise error.RPCException('Server database is not enabled. Please try '
   2390                                  'retrieve servers from global config.')
   2391     servers = server_manager_utils.get_servers(hostname=hostname, role=role,
   2392                                                status=status)
   2393     return [s.get_details() for s in servers]
   2394 
   2395 
   2396 @rpc_utils.route_rpc_to_master
   2397 def get_stable_version(board=stable_version_utils.DEFAULT, android=False):
   2398     """Get stable version for the given board.
   2399 
   2400     @param board: Name of the board.
   2401     @param android: If True, the given board is an Android-based device. If
   2402                     False, assume its a Chrome OS-based device.
   2403 
   2404     @return: Stable version of the given board. Return global configure value
   2405              of CROS.stable_cros_version if stable_versinos table does not have
   2406              entry of board DEFAULT.
   2407     """
   2408     return stable_version_utils.get(board=board, android=android)
   2409 
   2410 
   2411 @rpc_utils.route_rpc_to_master
   2412 def get_all_stable_versions():
   2413     """Get stable versions for all boards.
   2414 
   2415     @return: A dictionary of board:version.
   2416     """
   2417     return stable_version_utils.get_all()
   2418 
   2419 
   2420 @rpc_utils.route_rpc_to_master
   2421 def set_stable_version(version, board=stable_version_utils.DEFAULT):
   2422     """Modify stable version for the given board.
   2423 
   2424     @param version: The new value of stable version for given board.
   2425     @param board: Name of the board, default to value `DEFAULT`.
   2426     """
   2427     stable_version_utils.set(version=version, board=board)
   2428 
   2429 
   2430 @rpc_utils.route_rpc_to_master
   2431 def delete_stable_version(board):
   2432     """Modify stable version for the given board.
   2433 
   2434     Delete a stable version entry in afe_stable_versions table for a given
   2435     board, so default stable version will be used.
   2436 
   2437     @param board: Name of the board.
   2438     """
   2439     stable_version_utils.delete(board=board)
   2440 
   2441 
   2442 def get_tests_by_build(build, ignore_invalid_tests=True):
   2443     """Get the tests that are available for the specified build.
   2444 
   2445     @param build: unique name by which to refer to the image.
   2446     @param ignore_invalid_tests: flag on if unparsable tests are ignored.
   2447 
   2448     @return: A sorted list of all tests that are in the build specified.
   2449     """
   2450     # Collect the control files specified in this build
   2451     cfile_getter = control_file_lib._initialize_control_file_getter(build)
   2452     if SuiteBase.ENABLE_CONTROLS_IN_BATCH:
   2453         control_file_info_list = cfile_getter.get_suite_info()
   2454         control_file_list = control_file_info_list.keys()
   2455     else:
   2456         control_file_list = cfile_getter.get_control_file_list()
   2457 
   2458     test_objects = []
   2459     _id = 0
   2460     for control_file_path in control_file_list:
   2461         # Read and parse the control file
   2462         if SuiteBase.ENABLE_CONTROLS_IN_BATCH:
   2463             control_file = control_file_info_list[control_file_path]
   2464         else:
   2465             control_file = cfile_getter.get_control_file_contents(
   2466                     control_file_path)
   2467         try:
   2468             control_obj = control_data.parse_control_string(control_file)
   2469         except:
   2470             logging.info('Failed to parse control file: %s', control_file_path)
   2471             if not ignore_invalid_tests:
   2472                 raise
   2473 
   2474         # Extract the values needed for the AFE from the control_obj.
   2475         # The keys list represents attributes in the control_obj that
   2476         # are required by the AFE
   2477         keys = ['author', 'doc', 'name', 'time', 'test_type', 'experimental',
   2478                 'test_category', 'test_class', 'dependencies', 'run_verify',
   2479                 'sync_count', 'job_retries', 'retries', 'path']
   2480 
   2481         test_object = {}
   2482         for key in keys:
   2483             test_object[key] = getattr(control_obj, key) if hasattr(
   2484                     control_obj, key) else ''
   2485 
   2486         # Unfortunately, the AFE expects different key-names for certain
   2487         # values, these must be corrected to avoid the risk of tests
   2488         # being omitted by the AFE.
   2489         # The 'id' is an additional value used in the AFE.
   2490         # The control_data parsing does not reference 'run_reset', but it
   2491         # is also used in the AFE and defaults to True.
   2492         test_object['id'] = _id
   2493         test_object['run_reset'] = True
   2494         test_object['description'] = test_object.get('doc', '')
   2495         test_object['test_time'] = test_object.get('time', 0)
   2496         test_object['test_retry'] = test_object.get('retries', 0)
   2497 
   2498         # Fix the test name to be consistent with the current presentation
   2499         # of test names in the AFE.
   2500         testpath, subname = os.path.split(control_file_path)
   2501         testname = os.path.basename(testpath)
   2502         subname = subname.split('.')[1:]
   2503         if subname:
   2504             testname = '%s:%s' % (testname, ':'.join(subname))
   2505 
   2506         test_object['name'] = testname
   2507 
   2508         # Correct the test path as parse_control_string sets an empty string.
   2509         test_object['path'] = control_file_path
   2510 
   2511         _id += 1
   2512         test_objects.append(test_object)
   2513 
   2514     test_objects = sorted(test_objects, key=lambda x: x.get('name'))
   2515     return rpc_utils.prepare_for_serialization(test_objects)
   2516 
   2517 
   2518 @rpc_utils.route_rpc_to_master
   2519 def get_lab_health_indicators(board=None):
   2520     """Get the healthy indicators for whole lab.
   2521 
   2522     The indicators now includes:
   2523     1. lab is closed or not.
   2524     2. Available DUTs list for a given board.
   2525     3. Devserver capacity.
   2526     4. When is the next major DUT utilization (e.g. CQ is coming in 3 minutes).
   2527 
   2528     @param board: if board is specified, a list of available DUTs will be
   2529         returned for it. Otherwise, skip this indicator.
   2530 
   2531     @returns: A healthy indicator object including health info.
   2532     """
   2533     return LabHealthIndicator(None, None, None, None)
   2534