      1 # pylint: disable-msg=C0111
      2 
      3 """\
      4 Functions to expose over the RPC interface.
      5 
      6 For all modify* and delete* functions that ask for an 'id' parameter to
      7 identify the object to operate on, the id may be either
      8  * the database row ID
      9  * the name of the object (label name, hostname, user login, etc.)
      10  * a dictionary containing uniquely identifying fields (this option should
      11    seldom be used)
     12 
     13 When specifying foreign key fields (i.e. adding hosts to a label, or adding
     14 users to an ACL group), the given value may be either the database row ID or the
     15 name of the object.
     16 
     17 All get* functions return lists of dictionaries.  Each dictionary represents one
     18 object and maps field names to values.
     19 
     20 Some examples:
     21 modify_host(2, hostname='myhost') # modify hostname of host with database ID 2
     22 modify_host('ipaj2', hostname='myhost') # modify hostname of host 'ipaj2'
     23 modify_test('sleeptest', test_type='Client', params=', seconds=60')
     24 delete_acl_group(1) # delete by ID
     25 delete_acl_group('Everyone') # delete by name
     26 acl_group_add_users('Everyone', ['mbligh', 'showard'])
     27 get_jobs(owner='showard', status='Queued')
     28 
     29 See doctests/001_rpc_test.txt for (lots) more examples.
     30 """
     31 
      32 __author__ = 'showard@google.com (Steve Howard)'
     33 
     34 import sys
     35 import datetime
     36 import logging
     37 
     38 from django.db.models import Count
     39 import common
     40 from autotest_lib.client.common_lib import priorities
     41 from autotest_lib.client.common_lib.cros import dev_server
     42 from autotest_lib.client.common_lib.cros.graphite import autotest_stats
     43 from autotest_lib.frontend.afe import control_file, rpc_utils
     44 from autotest_lib.frontend.afe import models, model_logic, model_attributes
     45 from autotest_lib.frontend.afe import site_rpc_interface
     46 from autotest_lib.frontend.tko import models as tko_models
     47 from autotest_lib.frontend.tko import rpc_interface as tko_rpc_interface
     48 from autotest_lib.server import frontend
     49 from autotest_lib.server import utils
     50 from autotest_lib.server.cros import provision
     51 from autotest_lib.server.cros.dynamic_suite import tools
     52 from autotest_lib.site_utils import status_history
     53 
     54 
     55 _timer = autotest_stats.Timer('rpc_interface')
     56 
     57 def get_parameterized_autoupdate_image_url(job):
     58     """Get the parameterized autoupdate image url from a parameterized job."""
     59     known_test_obj = models.Test.smart_get('autoupdate_ParameterizedJob')
     60     image_parameter = known_test_obj.testparameter_set.get(test=known_test_obj,
     61                                                            name='image')
     62     para_set = job.parameterized_job.parameterizedjobparameter_set
     63     job_test_para = para_set.get(test_parameter=image_parameter)
     64     return job_test_para.parameter_value
     65 
     66 
     67 # labels
     68 
     69 def modify_label(id, **data):
     70     """Modify a label.
     71 
     72     @param id: id or name of a label. More often a label name.
     73     @param data: New data for a label.
     74     """
     75     label_model = models.Label.smart_get(id)
     76     label_model.update_object(data)
     77 
     78     # Master forwards the RPC to shards
     79     if not utils.is_shard():
     80         rpc_utils.fanout_rpc(label_model.host_set.all(), 'modify_label', False,
     81                              id=id, **data)
     82 
     83 
     84 def delete_label(id):
     85     """Delete a label.
     86 
     87     @param id: id or name of a label. More often a label name.
     88     """
     89     label_model = models.Label.smart_get(id)
     90     # Hosts that have the label to be deleted. Save this info before
     91     # the label is deleted to use it later.
     92     hosts = []
     93     for h in label_model.host_set.all():
     94         hosts.append(models.Host.smart_get(h.id))
     95     label_model.delete()
     96 
     97     # Master forwards the RPC to shards
     98     if not utils.is_shard():
     99         rpc_utils.fanout_rpc(hosts, 'delete_label', False, id=id)
    100 
    101 
    102 def add_label(name, ignore_exception_if_exists=False, **kwargs):
    103     """Adds a new label of a given name.
    104 
    105     @param name: label name.
     106     @param ignore_exception_if_exists: If True, suppress the exception
     107         raised when a label with the given name already exists.
     108         Default is False.
    109     @param kwargs: keyword args that store more info about a label
    110         other than the name.
    111     @return: int/long id of a new label.
    112     """
     113     # models.Label.add_object() raises model_logic.ValidationError
     114     # when it is given a label name that already exists.
     115     # However, ValidationError can be raised for other reasons as well,
     116     # and those errors should be propagated up the call chain.
    117     try:
    118         label = models.Label.add_object(name=name, **kwargs)
    119     except:
    120         exc_info = sys.exc_info()
    121         if ignore_exception_if_exists:
    122             label = rpc_utils.get_label(name)
     123             # If the exception was raised for a reason other than a
     124             # duplicated "name", re-raise the original exception.
    125             if label is None:
    126                 raise exc_info[0], exc_info[1], exc_info[2]
    127         else:
    128             raise exc_info[0], exc_info[1], exc_info[2]
    129     return label.id
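        # Illustrative sketch (not part of the original module): idempotently
        # adding a label from a client. The label name and the 'only_if_needed'
        # keyword below are hypothetical examples of the extra kwargs the Label
        # model may accept.
        #
        #   add_label('pool:suites', ignore_exception_if_exists=True,
        #             only_if_needed=False)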
    130 
    131 
    132 def add_label_to_hosts(id, hosts):
    133     """Adds a label of the given id to the given hosts only in local DB.
    134 
    135     @param id: id or name of a label. More often a label name.
    136     @param hosts: The hostnames of hosts that need the label.
    137 
    138     @raises models.Label.DoesNotExist: If the label with id doesn't exist.
    139     """
    140     label = models.Label.smart_get(id)
    141     host_objs = models.Host.smart_get_bulk(hosts)
    142     if label.platform:
    143         models.Host.check_no_platform(host_objs)
    144     label.host_set.add(*host_objs)
    145 
    146 
    147 @rpc_utils.route_rpc_to_master
    148 def label_add_hosts(id, hosts):
    149     """Adds a label with the given id to the given hosts.
    150 
     151     This method should be run only on the master, not on shards.
     152     The given label will be created if it doesn't exist, provided the `id`
     153     supplied is a label name, not an int/long id.
    154 
    155     @param id: id or name of a label. More often a label name.
    156     @param hosts: A list of hostnames or ids. More often hostnames.
    157 
    158     @raises ValueError: If the id specified is an int/long (label id)
    159                         while the label does not exist.
    160     """
    161     try:
    162         label = models.Label.smart_get(id)
    163     except models.Label.DoesNotExist:
     164         # This matches the type checks in smart_get, which is a hack
     165         # in and of itself. The aim here is to create any non-existent
     166         # label, which we cannot do if the 'id' specified isn't a label name.
    167         if isinstance(id, basestring):
    168             label = models.Label.smart_get(add_label(id))
    169         else:
    170             raise ValueError('Label id (%s) does not exist. Please specify '
    171                              'the argument, id, as a string (label name).'
    172                              % id)
    173     add_label_to_hosts(id, hosts)
    174 
    175     host_objs = models.Host.smart_get_bulk(hosts)
    176     # Make sure the label exists on the shard with the same id
    177     # as it is on the master.
     178     # It is possible that the label is already in a shard because
     179     # we are adding a new label only to the shards of hosts to which the
     180     # label is going to be attached.
    181     # For example, we add a label L1 to a host in shard S1.
    182     # Master and S1 will have L1 but other shards won't.
    183     # Later, when we add the same label L1 to hosts in shards S1 and S2,
    184     # S1 already has the label but S2 doesn't.
    185     # S2 should have the new label without any problem.
    186     # We ignore exception in such a case.
    187     rpc_utils.fanout_rpc(
    188             host_objs, 'add_label', include_hostnames=False,
    189             name=label.name, ignore_exception_if_exists=True,
    190             id=label.id, platform=label.platform)
    191     rpc_utils.fanout_rpc(host_objs, 'add_label_to_hosts', id=id)
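        # Illustrative sketch: label_add_hosts() only auto-creates the label
        # when `id` is given as a string, so callers typically pass the label
        # name. The label name and hostnames below are hypothetical.
        #
        #   label_add_hosts(id='pool:bvt', hosts=['host1', 'host2'])
        #   label_add_hosts(id=42, hosts=['host1'])  # ValueError if label 42 is missing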
    192 
    193 
    194 def remove_label_from_hosts(id, hosts):
    195     """Removes a label of the given id from the given hosts only in local DB.
    196 
    197     @param id: id or name of a label.
     198     @param hosts: The hostnames of the hosts to remove the label from.
    199     """
    200     host_objs = models.Host.smart_get_bulk(hosts)
    201     models.Label.smart_get(id).host_set.remove(*host_objs)
    202 
    203 
    204 @rpc_utils.route_rpc_to_master
    205 def label_remove_hosts(id, hosts):
    206     """Removes a label of the given id from the given hosts.
    207 
     208     This method should be run only on the master, not on shards.
    209 
    210     @param id: id or name of a label.
    211     @param hosts: A list of hostnames or ids. More often hostnames.
    212     """
    213     host_objs = models.Host.smart_get_bulk(hosts)
    214     remove_label_from_hosts(id, hosts)
    215 
    216     rpc_utils.fanout_rpc(host_objs, 'remove_label_from_hosts', id=id)
    217 
    218 
    219 def get_labels(exclude_filters=(), **filter_data):
    220     """\
    221     @param exclude_filters: A sequence of dictionaries of filters.
    222 
    223     @returns A sequence of nested dictionaries of label information.
    224     """
    225     labels = models.Label.query_objects(filter_data)
    226     for exclude_filter in exclude_filters:
    227         labels = labels.exclude(**exclude_filter)
    228     return rpc_utils.prepare_rows_as_nested_dicts(labels, ('atomic_group',))
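        # Illustrative sketch: each entry of exclude_filters is applied as a
        # Django-style exclude() on top of filter_data. The filter keys below
        # are hypothetical.
        #
        #   get_labels(exclude_filters=({'platform': True},), name__startswith='pool:')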
    229 
    230 
    231 # atomic groups
    232 
    233 def add_atomic_group(name, max_number_of_machines=None, description=None):
    234     return models.AtomicGroup.add_object(
    235             name=name, max_number_of_machines=max_number_of_machines,
    236             description=description).id
    237 
    238 
    239 def modify_atomic_group(id, **data):
    240     models.AtomicGroup.smart_get(id).update_object(data)
    241 
    242 
    243 def delete_atomic_group(id):
    244     models.AtomicGroup.smart_get(id).delete()
    245 
    246 
    247 def atomic_group_add_labels(id, labels):
    248     label_objs = models.Label.smart_get_bulk(labels)
    249     models.AtomicGroup.smart_get(id).label_set.add(*label_objs)
    250 
    251 
    252 def atomic_group_remove_labels(id, labels):
    253     label_objs = models.Label.smart_get_bulk(labels)
    254     models.AtomicGroup.smart_get(id).label_set.remove(*label_objs)
    255 
    256 
    257 def get_atomic_groups(**filter_data):
    258     return rpc_utils.prepare_for_serialization(
    259             models.AtomicGroup.list_objects(filter_data))
    260 
    261 
    262 # hosts
    263 
    264 def add_host(hostname, status=None, locked=None, lock_reason='', protection=None):
    265     if locked and not lock_reason:
    266         raise model_logic.ValidationError(
    267             {'locked': 'Please provide a reason for locking when adding host.'})
    268 
    269     return models.Host.add_object(hostname=hostname, status=status,
    270                                   locked=locked, lock_reason=lock_reason,
    271                                   protection=protection).id
    272 
    273 
    274 @rpc_utils.route_rpc_to_master
    275 def modify_host(id, **kwargs):
    276     """Modify local attributes of a host.
    277 
     278     If this is called on the master, but the host is assigned to a shard, this
     279     will call the `modify_host_local` RPC on the responsible shard. This means
     280     that if a host is being locked using this function, the change will also
     281     propagate to shards.
    282     When this is called on a shard, the shard just routes the RPC to the master
    283     and does nothing.
    284 
    285     @param id: id of the host to modify.
    286     @param kwargs: key=value pairs of values to set on the host.
    287     """
    288     rpc_utils.check_modify_host(kwargs)
    289     host = models.Host.smart_get(id)
    290     try:
    291         rpc_utils.check_modify_host_locking(host, kwargs)
    292     except model_logic.ValidationError as e:
    293         if not kwargs.get('force_modify_locking', False):
    294             raise
    295         logging.exception('The following exception will be ignored and lock '
    296                           'modification will be enforced. %s', e)
    297 
     298     # This is required to make `lock_time` for a host exactly the same
     299     # on the master and a shard.
    300     if kwargs.get('locked', None) and 'lock_time' not in kwargs:
    301         kwargs['lock_time'] = datetime.datetime.now()
    302     host.update_object(kwargs)
    303 
     304     # force_modify_locking is not a field in the database; remove it.
    305     kwargs.pop('force_modify_locking', None)
    306     rpc_utils.fanout_rpc([host], 'modify_host_local',
    307                          include_hostnames=False, id=id, **kwargs)
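        # Illustrative sketch: locking and unlocking a host through this RPC.
        # A lock reason is expected when locking; the hostname and reason below
        # are hypothetical.
        #
        #   modify_host('chromeos1-rack1-host1', locked=True,
        #               lock_reason='debugging a flaky DUT')
        #   modify_host('chromeos1-rack1-host1', locked=False, lock_reason='')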
    308 
    309 
    310 def modify_host_local(id, **kwargs):
    311     """Modify host attributes in local DB.
    312 
    313     @param id: Host id.
    314     @param kwargs: key=value pairs of values to set on the host.
    315     """
    316     models.Host.smart_get(id).update_object(kwargs)
    317 
    318 
    319 @rpc_utils.route_rpc_to_master
    320 def modify_hosts(host_filter_data, update_data):
    321     """Modify local attributes of multiple hosts.
    322 
     323     If this is called on the master, but one of the hosts that match the
     324     filters is assigned to a shard, this will call the `modify_hosts_local` RPC
     325     on the responsible shard.
    326     When this is called on a shard, the shard just routes the RPC to the master
    327     and does nothing.
    328 
     329     The filters are always applied on the master, not on the shards. This means
     330     that if the state of a host differs on the master and a shard, the state on
     331     the master is used. For example:
     332     A host was synced to Shard 1. On Shard 1 the status of the host was set to
     333     'Repair Failed'.
     334     - A call to modify_hosts with host_filter_data={'status': 'Ready'} will
     335     update the host (both on the shard and on the master), because the state
     336     of the host as the master knows it is still 'Ready'.
     337     - A call to modify_hosts with host_filter_data={'status': 'Repair Failed'}
     338     will not update the host, because the filter doesn't apply on the master.
    339 
    340     @param host_filter_data: Filters out which hosts to modify.
    341     @param update_data: A dictionary with the changes to make to the hosts.
    342     """
    343     update_data = update_data.copy()
    344     rpc_utils.check_modify_host(update_data)
    345     hosts = models.Host.query_objects(host_filter_data)
    346 
    347     affected_shard_hostnames = set()
    348     affected_host_ids = []
    349 
    350     # Check all hosts before changing data for exception safety.
    351     for host in hosts:
    352         try:
    353             rpc_utils.check_modify_host_locking(host, update_data)
    354         except model_logic.ValidationError as e:
    355             if not update_data.get('force_modify_locking', False):
    356                 raise
    357             logging.exception('The following exception will be ignored and '
    358                               'lock modification will be enforced. %s', e)
    359 
    360         if host.shard:
    361             affected_shard_hostnames.add(host.shard.rpc_hostname())
    362             affected_host_ids.append(host.id)
    363 
     364     # This is required to make `lock_time` for a host exactly the same
     365     # on the master and a shard.
    366     if update_data.get('locked', None) and 'lock_time' not in update_data:
    367         update_data['lock_time'] = datetime.datetime.now()
    368     for host in hosts:
    369         host.update_object(update_data)
    370 
    371     update_data.pop('force_modify_locking', None)
    372     # Caution: Changing the filter from the original here. See docstring.
    373     rpc_utils.run_rpc_on_multiple_hostnames(
    374             'modify_hosts_local', affected_shard_hostnames,
    375             host_filter_data={'id__in': affected_host_ids},
    376             update_data=update_data)
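        # Illustrative sketch: bulk-locking hosts. Note that host_filter_data is
        # evaluated against the master's view of the hosts (see docstring). The
        # filter and reason below are hypothetical.
        #
        #   modify_hosts(host_filter_data={'status': 'Ready', 'locked': False},
        #                update_data={'locked': True,
        #                             'lock_reason': 'pool maintenance'})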
    377 
    378 
    379 def modify_hosts_local(host_filter_data, update_data):
    380     """Modify attributes of hosts in local DB.
    381 
    382     @param host_filter_data: Filters out which hosts to modify.
    383     @param update_data: A dictionary with the changes to make to the hosts.
    384     """
    385     for host in models.Host.query_objects(host_filter_data):
    386         host.update_object(update_data)
    387 
    388 
    389 def add_labels_to_host(id, labels):
    390     """Adds labels to a given host only in local DB.
    391 
    392     @param id: id or hostname for a host.
    393     @param labels: ids or names for labels.
    394     """
    395     label_objs = models.Label.smart_get_bulk(labels)
    396     models.Host.smart_get(id).labels.add(*label_objs)
    397 
    398 
    399 @rpc_utils.route_rpc_to_master
    400 def host_add_labels(id, labels):
    401     """Adds labels to a given host.
    402 
    403     @param id: id or hostname for a host.
    404     @param labels: ids or names for labels.
    405 
    406     @raises ValidationError: If adding more than one platform label.
    407     """
    408     label_objs = models.Label.smart_get_bulk(labels)
    409     platforms = [label.name for label in label_objs if label.platform]
    410     if len(platforms) > 1:
    411         raise model_logic.ValidationError(
    412             {'labels': 'Adding more than one platform label: %s' %
    413                        ', '.join(platforms)})
    414 
    415     host_obj = models.Host.smart_get(id)
    416     if len(platforms) == 1:
    417         models.Host.check_no_platform([host_obj])
    418     add_labels_to_host(id, labels)
    419 
    420     rpc_utils.fanout_rpc([host_obj], 'add_labels_to_host', False,
    421                          id=id, labels=labels)
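        # Illustrative sketch: adding labels to one host; at most one of them
        # may be a platform label. The hostname and label names are hypothetical.
        #
        #   host_add_labels('chromeos1-rack1-host1', ['pool:bvt', 'bluetooth'])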
    422 
    423 
    424 def remove_labels_from_host(id, labels):
    425     """Removes labels from a given host only in local DB.
    426 
    427     @param id: id or hostname for a host.
    428     @param labels: ids or names for labels.
    429     """
    430     label_objs = models.Label.smart_get_bulk(labels)
    431     models.Host.smart_get(id).labels.remove(*label_objs)
    432 
    433 
    434 @rpc_utils.route_rpc_to_master
    435 def host_remove_labels(id, labels):
    436     """Removes labels from a given host.
    437 
    438     @param id: id or hostname for a host.
    439     @param labels: ids or names for labels.
    440     """
    441     remove_labels_from_host(id, labels)
    442 
    443     host_obj = models.Host.smart_get(id)
    444     rpc_utils.fanout_rpc([host_obj], 'remove_labels_from_host', False,
    445                          id=id, labels=labels)
    446 
    447 
    448 def get_host_attribute(attribute, **host_filter_data):
    449     """
    450     @param attribute: string name of attribute
    451     @param host_filter_data: filter data to apply to Hosts to choose hosts to
    452                              act upon
    453     """
    454     hosts = rpc_utils.get_host_query((), False, False, True, host_filter_data)
    455     hosts = list(hosts)
    456     models.Host.objects.populate_relationships(hosts, models.HostAttribute,
    457                                                'attribute_list')
    458     host_attr_dicts = []
    459     for host_obj in hosts:
    460         for attr_obj in host_obj.attribute_list:
    461             if attr_obj.attribute == attribute:
    462                 host_attr_dicts.append(attr_obj.get_object_dict())
    463     return rpc_utils.prepare_for_serialization(host_attr_dicts)
    464 
    465 
    466 def set_host_attribute(attribute, value, **host_filter_data):
    467     """
    468     @param attribute: string name of attribute
    469     @param value: string, or None to delete an attribute
    470     @param host_filter_data: filter data to apply to Hosts to choose hosts to
    471                              act upon
    472     """
    473     assert host_filter_data # disallow accidental actions on all hosts
    474     hosts = models.Host.query_objects(host_filter_data)
    475     models.AclGroup.check_for_acl_violation_hosts(hosts)
    476     for host in hosts:
    477         host.set_or_delete_attribute(attribute, value)
    478 
    479     # Master forwards this RPC to shards.
    480     if not utils.is_shard():
    481         rpc_utils.fanout_rpc(hosts, 'set_host_attribute', False,
    482                 attribute=attribute, value=value, **host_filter_data)
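        # Illustrative sketch: setting and then deleting a host attribute
        # (value=None deletes it). The attribute name and filter below are
        # hypothetical.
        #
        #   set_host_attribute('servo_host', 'servo1.example.com', hostname='host1')
        #   set_host_attribute('servo_host', None, hostname='host1')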
    483 
    484 
    485 @rpc_utils.forward_single_host_rpc_to_shard
    486 def delete_host(id):
    487     models.Host.smart_get(id).delete()
    488 
    489 
    490 def get_hosts(multiple_labels=(), exclude_only_if_needed_labels=False,
    491               exclude_atomic_group_hosts=False, valid_only=True,
    492               include_current_job=False, **filter_data):
    493     """Get a list of dictionaries which contains the information of hosts.
    494 
    495     @param multiple_labels: match hosts in all of the labels given.  Should
    496             be a list of label names.
    497     @param exclude_only_if_needed_labels: Exclude hosts with at least one
    498             "only_if_needed" label applied.
    499     @param exclude_atomic_group_hosts: Exclude hosts that have one or more
    500             atomic group labels associated with them.
    501     @param include_current_job: Set to True to include ids of currently running
    502             job and special task.
    503     """
    504     hosts = rpc_utils.get_host_query(multiple_labels,
    505                                      exclude_only_if_needed_labels,
    506                                      exclude_atomic_group_hosts,
    507                                      valid_only, filter_data)
    508     hosts = list(hosts)
    509     models.Host.objects.populate_relationships(hosts, models.Label,
    510                                                'label_list')
    511     models.Host.objects.populate_relationships(hosts, models.AclGroup,
    512                                                'acl_list')
    513     models.Host.objects.populate_relationships(hosts, models.HostAttribute,
    514                                                'attribute_list')
    515     host_dicts = []
    516     for host_obj in hosts:
    517         host_dict = host_obj.get_object_dict()
    518         host_dict['labels'] = [label.name for label in host_obj.label_list]
    519         host_dict['platform'], host_dict['atomic_group'] = (rpc_utils.
    520                 find_platform_and_atomic_group(host_obj))
    521         host_dict['acls'] = [acl.name for acl in host_obj.acl_list]
    522         host_dict['attributes'] = dict((attribute.attribute, attribute.value)
    523                                        for attribute in host_obj.attribute_list)
    524         if include_current_job:
    525             host_dict['current_job'] = None
    526             host_dict['current_special_task'] = None
    527             entries = models.HostQueueEntry.objects.filter(
    528                     host_id=host_dict['id'], active=True, complete=False)
    529             if entries:
    530                 host_dict['current_job'] = (
    531                         entries[0].get_object_dict()['job'])
    532             tasks = models.SpecialTask.objects.filter(
    533                     host_id=host_dict['id'], is_active=True, is_complete=False)
    534             if tasks:
    535                 host_dict['current_special_task'] = (
    536                         '%d-%s' % (tasks[0].get_object_dict()['id'],
    537                                    tasks[0].get_object_dict()['task'].lower()))
    538         host_dicts.append(host_dict)
    539     return rpc_utils.prepare_for_serialization(host_dicts)
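        # Illustrative sketch: querying hosts. multiple_labels takes label
        # names; remaining keyword args are Django-style field filters. The
        # label names and status below are hypothetical.
        #
        #   get_hosts(multiple_labels=['board:lumpy', 'pool:bvt'],
        #             status='Ready', include_current_job=True)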
    540 
    541 
    542 def get_num_hosts(multiple_labels=(), exclude_only_if_needed_labels=False,
    543                   exclude_atomic_group_hosts=False, valid_only=True,
    544                   **filter_data):
    545     """
    546     Same parameters as get_hosts().
    547 
    548     @returns The number of matching hosts.
    549     """
    550     hosts = rpc_utils.get_host_query(multiple_labels,
    551                                      exclude_only_if_needed_labels,
    552                                      exclude_atomic_group_hosts,
    553                                      valid_only, filter_data)
    554     return hosts.count()
    555 
    556 
    557 # tests
    558 
    559 def add_test(name, test_type, path, author=None, dependencies=None,
    560              experimental=True, run_verify=None, test_class=None,
    561              test_time=None, test_category=None, description=None,
    562              sync_count=1):
    563     return models.Test.add_object(name=name, test_type=test_type, path=path,
    564                                   author=author, dependencies=dependencies,
    565                                   experimental=experimental,
    566                                   run_verify=run_verify, test_time=test_time,
    567                                   test_category=test_category,
    568                                   sync_count=sync_count,
    569                                   test_class=test_class,
    570                                   description=description).id
    571 
    572 
    573 def modify_test(id, **data):
    574     models.Test.smart_get(id).update_object(data)
    575 
    576 
    577 def delete_test(id):
    578     models.Test.smart_get(id).delete()
    579 
    580 
    581 def get_tests(**filter_data):
    582     return rpc_utils.prepare_for_serialization(
    583         models.Test.list_objects(filter_data))
    584 
    585 
    586 @_timer.decorate
    587 def get_tests_status_counts_by_job_name_label(job_name_prefix, label_name):
    588     """Gets the counts of all passed and failed tests from the matching jobs.
    589 
    590     @param job_name_prefix: Name prefix of the jobs to get the summary from, e.g.,
    591             'butterfly-release/R40-6457.21.0/bvt-cq/'.
    592     @param label_name: Label that must be set in the jobs, e.g.,
    593             'cros-version:butterfly-release/R40-6457.21.0'.
    594 
    595     @returns A summary of the counts of all the passed and failed tests.
    596     """
    597     job_ids = list(models.Job.objects.filter(
    598             name__startswith=job_name_prefix,
    599             dependency_labels__name=label_name).values_list(
    600                 'pk', flat=True))
    601     summary = {'passed': 0, 'failed': 0}
    602     if not job_ids:
    603         return summary
    604 
    605     counts = (tko_models.TestView.objects.filter(
    606             afe_job_id__in=job_ids).exclude(
    607                 test_name='SERVER_JOB').exclude(
    608                     test_name__startswith='CLIENT_JOB').values(
    609                         'status').annotate(
    610                             count=Count('status')))
    611     for status in counts:
    612         if status['status'] == 'GOOD':
    613             summary['passed'] += status['count']
    614         else:
    615             summary['failed'] += status['count']
    616     return summary
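        # Illustrative sketch using the docstring's own example arguments; the
        # returned dict has the form {'passed': <count>, 'failed': <count>}.
        #
        #   get_tests_status_counts_by_job_name_label(
        #           'butterfly-release/R40-6457.21.0/bvt-cq/',
        #           'cros-version:butterfly-release/R40-6457.21.0')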
    617 
    618 
    619 # profilers
    620 
    621 def add_profiler(name, description=None):
    622     return models.Profiler.add_object(name=name, description=description).id
    623 
    624 
    625 def modify_profiler(id, **data):
    626     models.Profiler.smart_get(id).update_object(data)
    627 
    628 
    629 def delete_profiler(id):
    630     models.Profiler.smart_get(id).delete()
    631 
    632 
    633 def get_profilers(**filter_data):
    634     return rpc_utils.prepare_for_serialization(
    635         models.Profiler.list_objects(filter_data))
    636 
    637 
    638 # users
    639 
    640 def add_user(login, access_level=None):
    641     return models.User.add_object(login=login, access_level=access_level).id
    642 
    643 
    644 def modify_user(id, **data):
    645     models.User.smart_get(id).update_object(data)
    646 
    647 
    648 def delete_user(id):
    649     models.User.smart_get(id).delete()
    650 
    651 
    652 def get_users(**filter_data):
    653     return rpc_utils.prepare_for_serialization(
    654         models.User.list_objects(filter_data))
    655 
    656 
    657 # acl groups
    658 
    659 def add_acl_group(name, description=None):
    660     group = models.AclGroup.add_object(name=name, description=description)
    661     group.users.add(models.User.current_user())
    662     return group.id
    663 
    664 
    665 def modify_acl_group(id, **data):
    666     group = models.AclGroup.smart_get(id)
    667     group.check_for_acl_violation_acl_group()
    668     group.update_object(data)
    669     group.add_current_user_if_empty()
    670 
    671 
    672 def acl_group_add_users(id, users):
    673     group = models.AclGroup.smart_get(id)
    674     group.check_for_acl_violation_acl_group()
    675     users = models.User.smart_get_bulk(users)
    676     group.users.add(*users)
    677 
    678 
    679 def acl_group_remove_users(id, users):
    680     group = models.AclGroup.smart_get(id)
    681     group.check_for_acl_violation_acl_group()
    682     users = models.User.smart_get_bulk(users)
    683     group.users.remove(*users)
    684     group.add_current_user_if_empty()
    685 
    686 
    687 def acl_group_add_hosts(id, hosts):
    688     group = models.AclGroup.smart_get(id)
    689     group.check_for_acl_violation_acl_group()
    690     hosts = models.Host.smart_get_bulk(hosts)
    691     group.hosts.add(*hosts)
    692     group.on_host_membership_change()
    693 
    694 
    695 def acl_group_remove_hosts(id, hosts):
    696     group = models.AclGroup.smart_get(id)
    697     group.check_for_acl_violation_acl_group()
    698     hosts = models.Host.smart_get_bulk(hosts)
    699     group.hosts.remove(*hosts)
    700     group.on_host_membership_change()
    701 
    702 
    703 def delete_acl_group(id):
    704     models.AclGroup.smart_get(id).delete()
    705 
    706 
    707 def get_acl_groups(**filter_data):
    708     acl_groups = models.AclGroup.list_objects(filter_data)
    709     for acl_group in acl_groups:
    710         acl_group_obj = models.AclGroup.objects.get(id=acl_group['id'])
    711         acl_group['users'] = [user.login
    712                               for user in acl_group_obj.users.all()]
    713         acl_group['hosts'] = [host.hostname
    714                               for host in acl_group_obj.hosts.all()]
    715     return rpc_utils.prepare_for_serialization(acl_groups)
    716 
    717 
    718 # jobs
    719 
    720 def generate_control_file(tests=(), kernel=None, label=None, profilers=(),
    721                           client_control_file='', use_container=False,
    722                           profile_only=None, upload_kernel_config=False,
    723                           db_tests=True):
    724     """
    725     Generates a client-side control file to load a kernel and run tests.
    726 
    727     @param tests List of tests to run. See db_tests for more information.
    728     @param kernel A list of kernel info dictionaries configuring which kernels
    729         to boot for this job and other options for them
    730     @param label Name of label to grab kernel config from.
    731     @param profilers List of profilers to activate during the job.
    732     @param client_control_file The contents of a client-side control file to
    733         run at the end of all tests.  If this is supplied, all tests must be
    734         client side.
    735         TODO: in the future we should support server control files directly
    736         to wrap with a kernel.  That'll require changing the parameter
    737         name and adding a boolean to indicate if it is a client or server
    738         control file.
    739     @param use_container unused argument today.  TODO: Enable containers
    740         on the host during a client side test.
    741     @param profile_only A boolean that indicates what default profile_only
    742         mode to use in the control file. Passing None will generate a
     743         control file that does not explicitly set the default mode at all.
    744     @param upload_kernel_config: if enabled it will generate server control
    745             file code that uploads the kernel config file to the client and
    746             tells the client of the new (local) path when compiling the kernel;
    747             the tests must be server side tests
    748     @param db_tests: if True, the test object can be found in the database
    749                      backing the test model. In this case, tests is a tuple
    750                      of test IDs which are used to retrieve the test objects
    751                      from the database. If False, tests is a tuple of test
    752                      dictionaries stored client-side in the AFE.
    753 
    754     @returns a dict with the following keys:
    755         control_file: str, The control file text.
    756         is_server: bool, is the control file a server-side control file?
    757         synch_count: How many machines the job uses per autoserv execution.
    758             synch_count == 1 means the job is asynchronous.
    759         dependencies: A list of the names of labels on which the job depends.
    760     """
    761     if not tests and not client_control_file:
    762         return dict(control_file='', is_server=False, synch_count=1,
    763                     dependencies=[])
    764 
    765     cf_info, test_objects, profiler_objects, label = (
    766         rpc_utils.prepare_generate_control_file(tests, kernel, label,
    767                                                 profilers, db_tests))
    768     cf_info['control_file'] = control_file.generate_control(
    769         tests=test_objects, kernels=kernel, platform=label,
    770         profilers=profiler_objects, is_server=cf_info['is_server'],
    771         client_control_file=client_control_file, profile_only=profile_only,
    772         upload_kernel_config=upload_kernel_config)
    773     return cf_info
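        # Illustrative sketch: generating a client-side control file for tests
        # stored in the database (db_tests=True). The test and profiler
        # identifiers below are hypothetical.
        #
        #   generate_control_file(tests=['sleeptest', 'dummy_Pass'],
        #                         profilers=['cpuload'])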
    774 
    775 
    776 def create_parameterized_job(name, priority, test, parameters, kernel=None,
    777                              label=None, profilers=(), profiler_parameters=None,
    778                              use_container=False, profile_only=None,
    779                              upload_kernel_config=False, hosts=(),
    780                              meta_hosts=(), one_time_hosts=(),
    781                              atomic_group_name=None, synch_count=None,
    782                              is_template=False, timeout=None,
    783                              timeout_mins=None, max_runtime_mins=None,
    784                              run_verify=False, email_list='', dependencies=(),
    785                              reboot_before=None, reboot_after=None,
    786                              parse_failed_repair=None, hostless=False,
    787                              keyvals=None, drone_set=None, run_reset=True,
    788                              require_ssp=None):
    789     """
    790     Creates and enqueues a parameterized job.
    791 
     792     Most parameters are a combination of the parameters for
     793     generate_control_file() and create_job(), with the exception of:
    794 
    795     @param test name or ID of the test to run
    796     @param parameters a map of parameter name ->
    797                           tuple of (param value, param type)
    798     @param profiler_parameters a dictionary of parameters for the profilers:
    799                                    key: profiler name
    800                                    value: dict of param name -> tuple of
    801                                                                 (param value,
    802                                                                  param type)
    803     """
    804     # Save the values of the passed arguments here. What we're going to do with
    805     # them is pass them all to rpc_utils.get_create_job_common_args(), which
    806     # will extract the subset of these arguments that apply for
    807     # rpc_utils.create_job_common(), which we then pass in to that function.
    808     args = locals()
    809 
    810     # Set up the parameterized job configs
    811     test_obj = models.Test.smart_get(test)
    812     control_type = test_obj.test_type
    813 
    814     try:
    815         label = models.Label.smart_get(label)
    816     except models.Label.DoesNotExist:
    817         label = None
    818 
    819     kernel_objs = models.Kernel.create_kernels(kernel)
    820     profiler_objs = [models.Profiler.smart_get(profiler)
    821                      for profiler in profilers]
    822 
    823     parameterized_job = models.ParameterizedJob.objects.create(
    824             test=test_obj, label=label, use_container=use_container,
    825             profile_only=profile_only,
    826             upload_kernel_config=upload_kernel_config)
    827     parameterized_job.kernels.add(*kernel_objs)
    828 
    829     for profiler in profiler_objs:
    830         parameterized_profiler = models.ParameterizedJobProfiler.objects.create(
    831                 parameterized_job=parameterized_job,
    832                 profiler=profiler)
    833         profiler_params = profiler_parameters.get(profiler.name, {})
    834         for name, (value, param_type) in profiler_params.iteritems():
    835             models.ParameterizedJobProfilerParameter.objects.create(
    836                     parameterized_job_profiler=parameterized_profiler,
    837                     parameter_name=name,
    838                     parameter_value=value,
    839                     parameter_type=param_type)
    840 
    841     try:
    842         for parameter in test_obj.testparameter_set.all():
    843             if parameter.name in parameters:
    844                 param_value, param_type = parameters.pop(parameter.name)
    845                 parameterized_job.parameterizedjobparameter_set.create(
    846                         test_parameter=parameter, parameter_value=param_value,
    847                         parameter_type=param_type)
    848 
    849         if parameters:
    850             raise Exception('Extra parameters remain: %r' % parameters)
    851 
    852         return rpc_utils.create_job_common(
    853                 parameterized_job=parameterized_job.id,
    854                 control_type=control_type,
    855                 **rpc_utils.get_create_job_common_args(args))
    856     except:
    857         parameterized_job.delete()
    858         raise
    859 
    860 
    861 def create_job_page_handler(name, priority, control_file, control_type,
    862                             image=None, hostless=False, firmware_rw_build=None,
    863                             firmware_ro_build=None, test_source_build=None,
    864                             **kwargs):
    865     """\
    866     Create and enqueue a job.
    867 
    868     @param name name of this job
    869     @param priority Integer priority of this job.  Higher is more important.
    870     @param control_file String contents of the control file.
    871     @param control_type Type of control file, Client or Server.
     872     @param image: ChromeOS build to be installed on the DUT. Defaults to None.
     873     @param firmware_rw_build: Firmware build to update RW firmware. Defaults to
     874                               None, i.e., RW firmware will not be updated.
     875     @param firmware_ro_build: Firmware build to update RO firmware. Defaults to
     876                               None, i.e., RO firmware will not be updated.
     877     @param test_source_build: Build to be used to retrieve test code. Defaults
     878                               to None.
    879     @param kwargs extra args that will be required by create_suite_job or
    880                   create_job.
    881 
    882     @returns The created Job id number.
    883     """
    884     control_file = rpc_utils.encode_ascii(control_file)
    885     if not control_file:
    886         raise model_logic.ValidationError({
    887                 'control_file' : "Control file cannot be empty"})
    888 
    889     if image and hostless:
    890         builds = {}
    891         builds[provision.CROS_VERSION_PREFIX] = image
    892         if firmware_rw_build:
    893             builds[provision.FW_RW_VERSION_PREFIX] = firmware_rw_build
    894         if firmware_ro_build:
    895             builds[provision.FW_RO_VERSION_PREFIX] = firmware_ro_build
    896         return site_rpc_interface.create_suite_job(
    897                 name=name, control_file=control_file, priority=priority,
    898                 builds=builds, test_source_build=test_source_build, **kwargs)
    899     return create_job(name, priority, control_file, control_type, image=image,
    900                       hostless=hostless, **kwargs)
    901 
    902 
    903 @rpc_utils.route_rpc_to_master
    904 def create_job(name, priority, control_file, control_type,
    905                hosts=(), meta_hosts=(), one_time_hosts=(),
    906                atomic_group_name=None, synch_count=None, is_template=False,
    907                timeout=None, timeout_mins=None, max_runtime_mins=None,
    908                run_verify=False, email_list='', dependencies=(),
    909                reboot_before=None, reboot_after=None, parse_failed_repair=None,
    910                hostless=False, keyvals=None, drone_set=None, image=None,
    911                parent_job_id=None, test_retry=0, run_reset=True,
    912                require_ssp=None, args=(), **kwargs):
    913     """\
    914     Create and enqueue a job.
    915 
    916     @param name name of this job
    917     @param priority Integer priority of this job.  Higher is more important.
    918     @param control_file String contents of the control file.
    919     @param control_type Type of control file, Client or Server.
    920     @param synch_count How many machines the job uses per autoserv execution.
    921         synch_count == 1 means the job is asynchronous.  If an atomic group is
    922         given this value is treated as a minimum.
    923     @param is_template If true then create a template job.
    924     @param timeout Hours after this call returns until the job times out.
    925     @param timeout_mins Minutes after this call returns until the job times
    926         out.
    927     @param max_runtime_mins Minutes from job starting time until job times out
    928     @param run_verify Should the host be verified before running the test?
    929     @param email_list String containing emails to mail when the job is done
    930     @param dependencies List of label names on which this job depends
    931     @param reboot_before Never, If dirty, or Always
    932     @param reboot_after Never, If all tests passed, or Always
    933     @param parse_failed_repair if true, results of failed repairs launched by
    934         this job will be parsed as part of the job.
    935     @param hostless if true, create a hostless job
    936     @param keyvals dict of keyvals to associate with the job
    937     @param hosts List of hosts to run job on.
    938     @param meta_hosts List where each entry is a label name, and for each entry
    939         one host will be chosen from that label to run the job on.
    940     @param one_time_hosts List of hosts not in the database to run the job on.
    941     @param atomic_group_name The name of an atomic group to schedule the job on.
    942     @param drone_set The name of the drone set to run this test on.
    943     @param image OS image to install before running job.
    944     @param parent_job_id id of a job considered to be parent of created job.
    945     @param test_retry Number of times to retry test if the test did not
    946         complete successfully. (optional, default: 0)
    947     @param run_reset Should the host be reset before running the test?
    948     @param require_ssp Set to True to require server-side packaging to run the
    949                        test. If it's set to None, drone will still try to run
    950                        the server side with server-side packaging. If the
    951                        autotest-server package doesn't exist for the build or
    952                        image is not set, drone will run the test without server-
    953                        side packaging. Default is None.
    954     @param args A list of args to be injected into control file.
    955     @param kwargs extra keyword args. NOT USED.
    956 
    957     @returns The created Job id number.
    958     """
    959     if args:
    960         control_file = tools.inject_vars({'args': args}, control_file)
    961 
    962     if image is None:
    963         return rpc_utils.create_job_common(
    964                 **rpc_utils.get_create_job_common_args(locals()))
    965 
     966     # Translate the image name, in case it's a relative build name.
    967     ds = dev_server.ImageServer.resolve(image)
    968     image = ds.translate(image)
    969 
    970     # When image is supplied use a known parameterized test already in the
    971     # database to pass the OS image path from the front end, through the
    972     # scheduler, and finally to autoserv as the --image parameter.
    973 
    974     # The test autoupdate_ParameterizedJob is in afe_autotests and used to
    975     # instantiate a Test object and from there a ParameterizedJob.
    976     known_test_obj = models.Test.smart_get('autoupdate_ParameterizedJob')
    977     known_parameterized_job = models.ParameterizedJob.objects.create(
    978             test=known_test_obj)
    979 
    980     # autoupdate_ParameterizedJob has a single parameter, the image parameter,
    981     # stored in the table afe_test_parameters.  We retrieve and set this
    982     # instance of the parameter to the OS image path.
    983     image_parameter = known_test_obj.testparameter_set.get(test=known_test_obj,
    984                                                            name='image')
    985     known_parameterized_job.parameterizedjobparameter_set.create(
    986             test_parameter=image_parameter, parameter_value=image,
    987             parameter_type='string')
    988 
    989     # TODO(crbug.com/502638): save firmware build etc to parameterized_job.
    990 
    991     # By passing a parameterized_job to create_job_common the job entry in
    992     # the afe_jobs table will have the field parameterized_job_id set.
    993     # The scheduler uses this id in the afe_parameterized_jobs table to
    994     # match this job to our known test, and then with the
    995     # afe_parameterized_job_parameters table to get the actual image path.
    996     return rpc_utils.create_job_common(
    997             parameterized_job=known_parameterized_job.id,
    998             **rpc_utils.get_create_job_common_args(locals()))
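        # Illustrative sketch: two common invocations of this RPC. The names,
        # hosts, image and the use of priorities.Priority.DEFAULT below are
        # hypothetical example values, and control_text stands for the caller's
        # control file contents.
        #
        #   create_job('my_sleeptest', priorities.Priority.DEFAULT,
        #              control_file=control_text, control_type='Client',
        #              hosts=['host1'])
        #   create_job('my_provision_test', priorities.Priority.DEFAULT,
        #              control_file=control_text, control_type='Server',
        #              hosts=['host1'], image='lumpy-release/R40-6457.21.0')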
    999 
   1000 
   1001 def abort_host_queue_entries(**filter_data):
   1002     """\
   1003     Abort a set of host queue entries.
   1004 
   1005     @return: A list of dictionaries, each contains information
   1006              about an aborted HQE.
   1007     """
   1008     query = models.HostQueueEntry.query_objects(filter_data)
   1009 
    1010     # Don't allow aborts on:
    1011     #   1. Jobs that have already completed (whether or not they were aborted)
    1012     #   2. Jobs that have already been aborted (but may not have completed)
   1013     query = query.filter(complete=False).filter(aborted=False)
   1014     models.AclGroup.check_abort_permissions(query)
   1015     host_queue_entries = list(query.select_related())
   1016     rpc_utils.check_abort_synchronous_jobs(host_queue_entries)
   1017 
   1018     models.HostQueueEntry.abort_host_queue_entries(host_queue_entries)
   1019     hqe_info = [{'HostQueueEntry': hqe.id, 'Job': hqe.job_id,
   1020                  'Job name': hqe.job.name} for hqe in host_queue_entries]
   1021     return hqe_info
   1022 
   1023 
   1024 def abort_special_tasks(**filter_data):
   1025     """\
   1026     Abort the special task, or tasks, specified in the filter.
   1027     """
   1028     query = models.SpecialTask.query_objects(filter_data)
   1029     special_tasks = query.filter(is_active=True)
   1030     for task in special_tasks:
   1031         task.abort()
   1032 
   1033 
   1034 def _call_special_tasks_on_hosts(task, hosts):
   1035     """\
   1036     Schedules a set of hosts for a special task.
   1037 
   1038     @returns A list of hostnames that a special task was created for.
   1039     """
   1040     models.AclGroup.check_for_acl_violation_hosts(hosts)
   1041     shard_host_map = rpc_utils.bucket_hosts_by_shard(hosts)
   1042     if shard_host_map and not utils.is_shard():
   1043         raise ValueError('The following hosts are on shards, please '
   1044                          'follow the link to the shards and create jobs '
   1045                          'there instead. %s.' % shard_host_map)
   1046     for host in hosts:
   1047         models.SpecialTask.schedule_special_task(host, task)
   1048     return list(sorted(host.hostname for host in hosts))
   1049 
   1050 
   1051 def _forward_special_tasks_on_hosts(task, rpc, **filter_data):
   1052     """Forward special tasks to corresponding shards.
   1053 
   1054     For master, when special tasks are fired on hosts that are sharded,
    1055     On the master, when special tasks are fired on hosts that are sharded,
    1056     forward the RPC to the corresponding shards.
    1057 
    1058     On a shard, create the special task records in the local DB.
   1059     @param task: Enum value of frontend.afe.models.SpecialTask.Task
   1060     @param rpc: RPC name to forward.
   1061     @param filter_data: Filter keywords to be used for DB query.
   1062 
   1063     @return: A list of hostnames that a special task was created for.
   1064     """
   1065     hosts = models.Host.query_objects(filter_data)
   1066     shard_host_map = rpc_utils.bucket_hosts_by_shard(hosts, rpc_hostnames=True)
   1067 
   1068     # Filter out hosts on a shard from those on the master, forward
   1069     # rpcs to the shard with an additional hostname__in filter, and
   1070     # create a local SpecialTask for each remaining host.
   1071     if shard_host_map and not utils.is_shard():
   1072         hosts = [h for h in hosts if h.shard is None]
   1073         for shard, hostnames in shard_host_map.iteritems():
   1074 
   1075             # The main client of this module is the frontend website, and
   1076             # it invokes it with an 'id' or an 'id__in' filter. Regardless,
   1077             # the 'hostname' filter should narrow down the list of hosts on
   1078             # each shard even though we supply all the ids in filter_data.
   1079             # This method uses hostname instead of id because it fits better
   1080             # with the overall architecture of redirection functions in
   1081             # rpc_utils.
   1082             shard_filter = filter_data.copy()
   1083             shard_filter['hostname__in'] = hostnames
   1084             rpc_utils.run_rpc_on_multiple_hostnames(
   1085                     rpc, [shard], **shard_filter)
   1086 
   1087     # There is a race condition here if someone assigns a shard to one of these
   1088     # hosts before we create the task. The host will stay on the master if:
   1089     # 1. The host is not Ready
   1090     # 2. The host is Ready but has a task
   1091     # But if the host is Ready and doesn't have a task yet, it will get sent
   1092     # to the shard as we're creating a task here.
   1093 
   1094     # Given that we only rarely verify Ready hosts it isn't worth putting this
   1095     # entire method in a transaction. The worst case scenario is that we have
   1096     # a verify running on a Ready host while the shard is using it, if the
   1097     # verify fails no subsequent tasks will be created against the host on the
   1098     # master, and verifies are safe enough that this is OK.
   1099     return _call_special_tasks_on_hosts(task, hosts)
   1100 
   1101 
   1102 def reverify_hosts(**filter_data):
   1103     """\
   1104     Schedules a set of hosts for verify.
   1105 
   1106     @returns A list of hostnames that a verify task was created for.
   1107     """
   1108     return _forward_special_tasks_on_hosts(
   1109             models.SpecialTask.Task.VERIFY, 'reverify_hosts', **filter_data)
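        # Illustrative sketch: scheduling verify tasks by filter. The frontend
        # typically passes an 'id' or 'id__in' filter; the values below are
        # hypothetical.
        #
        #   reverify_hosts(id__in=[10, 11, 12])
        #   reverify_hosts(hostname__in=['host1', 'host2'])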
   1110 
   1111 
   1112 def repair_hosts(**filter_data):
   1113     """\
   1114     Schedules a set of hosts for repair.
   1115 
   1116     @returns A list of hostnames that a repair task was created for.
   1117     """
   1118     return _forward_special_tasks_on_hosts(
   1119             models.SpecialTask.Task.REPAIR, 'repair_hosts', **filter_data)
   1120 
   1121 
   1122 def get_jobs(not_yet_run=False, running=False, finished=False,
   1123              suite=False, sub=False, standalone=False, **filter_data):
   1124     """\
   1125     Extra status filter args for get_jobs:
   1126     -not_yet_run: Include only jobs that have not yet started running.
    1127     -running: Include only jobs that have started running but for which not
    1128     all hosts have completed.
   1129     -finished: Include only jobs for which all hosts have completed (or
   1130     aborted).
   1131 
   1132     Extra type filter args for get_jobs:
   1133     -suite: Include only jobs with child jobs.
   1134     -sub: Include only jobs with a parent job.
    1135     -standalone: Include only jobs with no child or parent jobs.
   1136     At most one of these three fields should be specified.
   1137     """
   1138     extra_args = rpc_utils.extra_job_status_filters(not_yet_run,
   1139                                                     running,
   1140                                                     finished)
   1141     filter_data['extra_args'] = rpc_utils.extra_job_type_filters(extra_args,
   1142                                                                  suite,
   1143                                                                  sub,
   1144                                                                  standalone)
   1145     job_dicts = []
   1146     jobs = list(models.Job.query_objects(filter_data))
   1147     models.Job.objects.populate_relationships(jobs, models.Label,
   1148                                               'dependencies')
   1149     models.Job.objects.populate_relationships(jobs, models.JobKeyval, 'keyvals')
   1150     for job in jobs:
   1151         job_dict = job.get_object_dict()
   1152         job_dict['dependencies'] = ','.join(label.name
   1153                                             for label in job.dependencies)
   1154         job_dict['keyvals'] = dict((keyval.key, keyval.value)
   1155                                    for keyval in job.keyvals)
   1156         if job.parameterized_job:
   1157             job_dict['image'] = get_parameterized_autoupdate_image_url(job)
   1158         job_dicts.append(job_dict)
   1159     return rpc_utils.prepare_for_serialization(job_dicts)
   1160 
   1161 
   1162 def get_num_jobs(not_yet_run=False, running=False, finished=False,
   1163                  suite=False, sub=False, standalone=False,
   1164                  **filter_data):
   1165     """\
   1166     See get_jobs() for documentation of extra filter parameters.
   1167     """
   1168     extra_args = rpc_utils.extra_job_status_filters(not_yet_run,
   1169                                                     running,
   1170                                                     finished)
   1171     filter_data['extra_args'] = rpc_utils.extra_job_type_filters(extra_args,
   1172                                                                  suite,
   1173                                                                  sub,
   1174                                                                  standalone)
   1175     return models.Job.query_count(filter_data)
   1176 
   1177 
   1178 def get_jobs_summary(**filter_data):
   1179     """\
    1180     Like get_jobs(), but adds 'status_counts' and 'result_counts' fields.
    1181 
    1182     'status_counts' field is a dictionary mapping status strings to the number
    1183     of hosts currently with that status, e.g. {'Queued' : 4, 'Running' : 2}.
   1184 
   1185     'result_counts' field is piped to tko's rpc_interface and has the return
   1186     format specified under get_group_counts.
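         
             For example (illustrative; the job id is hypothetical):
             get_jobs_summary(id=42)[0]['status_counts']  # e.g. {'Completed': 2}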
   1187     """
   1188     jobs = get_jobs(**filter_data)
   1189     ids = [job['id'] for job in jobs]
   1190     all_status_counts = models.Job.objects.get_status_counts(ids)
   1191     for job in jobs:
   1192         job['status_counts'] = all_status_counts[job['id']]
   1193         job['result_counts'] = tko_rpc_interface.get_status_counts(
   1194                 ['afe_job_id', 'afe_job_id'],
   1195                 header_groups=[['afe_job_id'], ['afe_job_id']],
   1196                 **{'afe_job_id': job['id']})
   1197     return rpc_utils.prepare_for_serialization(jobs)
   1198 
   1199 
   1200 def get_info_for_clone(id, preserve_metahosts, queue_entry_filter_data=None):
   1201     """\
   1202     Retrieves all the information needed to clone a job.
   1203     """
   1204     job = models.Job.objects.get(id=id)
   1205     job_info = rpc_utils.get_job_info(job,
   1206                                       preserve_metahosts,
   1207                                       queue_entry_filter_data)
   1208 
   1209     host_dicts = []
   1210     for host in job_info['hosts']:
   1211         host_dict = get_hosts(id=host.id)[0]
   1212         other_labels = host_dict['labels']
   1213         if host_dict['platform']:
   1214             other_labels.remove(host_dict['platform'])
   1215         host_dict['other_labels'] = ', '.join(other_labels)
   1216         host_dicts.append(host_dict)
   1217 
   1218     for host in job_info['one_time_hosts']:
   1219         host_dict = dict(hostname=host.hostname,
   1220                          id=host.id,
   1221                          platform='(one-time host)',
   1222                          locked_text='')
   1223         host_dicts.append(host_dict)
   1224 
   1225     # convert keys from Label objects to strings (names of labels)
   1226     meta_host_counts = dict((meta_host.name, count) for meta_host, count
   1227                             in job_info['meta_host_counts'].iteritems())
   1228 
   1229     info = dict(job=job.get_object_dict(),
   1230                 meta_host_counts=meta_host_counts,
   1231                 hosts=host_dicts)
   1232     info['job']['dependencies'] = job_info['dependencies']
   1233     if job_info['atomic_group']:
   1234         info['atomic_group_name'] = (job_info['atomic_group']).name
   1235     else:
   1236         info['atomic_group_name'] = None
   1237     info['hostless'] = job_info['hostless']
   1238     info['drone_set'] = job.drone_set and job.drone_set.name
   1239 
   1240     if job.parameterized_job:
   1241         info['job']['image'] = get_parameterized_autoupdate_image_url(job)
   1242 
   1243     return rpc_utils.prepare_for_serialization(info)
   1244 
   1245 
   1246 # host queue entries
   1247 
   1248 def get_host_queue_entries(start_time=None, end_time=None, **filter_data):
   1249     """\
   1250     @returns A sequence of nested dictionaries of host and job information.
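         
             For example (illustrative; the times and job id are hypothetical):
             get_host_queue_entries(start_time='2014-01-01', end_time='2014-01-02',
                                    job__id=123)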
   1251     """
   1252     filter_data = rpc_utils.inject_times_to_filter('started_on__gte',
   1253                                                    'started_on__lte',
   1254                                                    start_time,
   1255                                                    end_time,
   1256                                                    **filter_data)
   1257     return rpc_utils.prepare_rows_as_nested_dicts(
   1258             models.HostQueueEntry.query_objects(filter_data),
   1259             ('host', 'atomic_group', 'job'))
   1260 
   1261 
   1262 def get_num_host_queue_entries(start_time=None, end_time=None, **filter_data):
   1263     """\
    1264     Get the number of host queue entries matching the given filters.
   1265     """
   1266     filter_data = rpc_utils.inject_times_to_filter('started_on__gte',
   1267                                                    'started_on__lte',
   1268                                                    start_time,
   1269                                                    end_time,
   1270                                                    **filter_data)
   1271     return models.HostQueueEntry.query_count(filter_data)
   1272 
   1273 
   1274 def get_hqe_percentage_complete(**filter_data):
   1275     """
   1276     Computes the fraction of host queue entries matching the given filter data
   1277     that are complete.
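         
             Returns 1 if no entries match.  For example (illustrative; the job
             id is hypothetical): get_hqe_percentage_complete(job__id=123)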
   1278     """
   1279     query = models.HostQueueEntry.query_objects(filter_data)
   1280     complete_count = query.filter(complete=True).count()
   1281     total_count = query.count()
   1282     if total_count == 0:
   1283         return 1
   1284     return float(complete_count) / total_count
   1285 
   1286 
   1287 # special tasks
   1288 
   1289 def get_special_tasks(**filter_data):
   1290     """Get special task entries from the local database.
   1291 
   1292     Query the special tasks table for tasks matching the given
   1293     `filter_data`, and return a list of the results.  No attempt is
   1294     made to forward the call to shards; the buck will stop here.
   1295     The caller is expected to know the target shard for such reasons
   1296     as:
   1297       * The caller is a service (such as gs_offloader) configured
   1298         to operate on behalf of one specific shard, and no other.
   1299       * The caller has a host as a parameter, and knows that this is
   1300         the shard assigned to that host.
   1301 
   1302     @param filter_data  Filter keywords to pass to the underlying
   1303                         database query.
   1304 
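             For example (illustrative; the field names follow the SpecialTask
             model and the hostname is hypothetical):
             get_special_tasks(host__hostname='host1', task='Repair',
                               is_complete=True)
         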
   1305     """
   1306     return rpc_utils.prepare_rows_as_nested_dicts(
   1307             models.SpecialTask.query_objects(filter_data),
   1308             ('host', 'queue_entry'))
   1309 
   1310 
   1311 def get_host_special_tasks(host_id, **filter_data):
   1312     """Get special task entries for a given host.
   1313 
   1314     Query the special tasks table for tasks that ran on the host
   1315     given by `host_id` and matching the given `filter_data`.
   1316     Return a list of the results.  If the host is assigned to a
   1317     shard, forward this call to that shard.
   1318 
   1319     @param host_id      Id in the database of the target host.
   1320     @param filter_data  Filter keywords to pass to the underlying
   1321                         database query.
   1322 
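             For example (illustrative; the host id is hypothetical):
             get_host_special_tasks(123, task='Verify', is_complete=True)
         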
   1323     """
   1324     # Retrieve host data even if the host is in an invalid state.
   1325     host = models.Host.smart_get(host_id, False)
   1326     if not host.shard:
   1327         return get_special_tasks(host_id=host_id, **filter_data)
   1328     else:
   1329         # The return values from AFE methods are post-processed
   1330         # objects that aren't JSON-serializable.  So, we have to
   1331         # call AFE.run() to get the raw, serializable output from
   1332         # the shard.
   1333         shard_afe = frontend.AFE(server=host.shard.rpc_hostname())
   1334         return shard_afe.run('get_special_tasks',
   1335                              host_id=host_id, **filter_data)
   1336 
   1337 
   1338 def get_num_special_tasks(**kwargs):
   1339     """Get the number of special task entries from the local database.
   1340 
   1341     Query the special tasks table for tasks matching the given 'kwargs',
    1342     and return the number of results. No attempt is made to forward
   1343     the call to shards; the buck will stop here.
   1344 
   1345     @param kwargs    Filter keywords to pass to the underlying database query.
   1346 
   1347     """
   1348     return models.SpecialTask.query_count(kwargs)
   1349 
   1350 
   1351 def get_host_num_special_tasks(host, **kwargs):
   1352     """Get special task entries for a given host.
   1353 
   1354     Query the special tasks table for tasks that ran on the host
   1355     given by 'host' and matching the given 'kwargs'.
    1356     Return the count of matching tasks.  If the host is assigned to a
   1357     shard, forward this call to that shard.
   1358 
   1359     @param host      id or name of a host. More often a hostname.
   1360     @param kwargs    Filter keywords to pass to the underlying database query.
   1361 
   1362     """
   1363     # Retrieve host data even if the host is in an invalid state.
   1364     host_model = models.Host.smart_get(host, False)
   1365     if not host_model.shard:
   1366         return get_num_special_tasks(host=host, **kwargs)
   1367     else:
   1368         shard_afe = frontend.AFE(server=host_model.shard.rpc_hostname())
   1369         return shard_afe.run('get_num_special_tasks', host=host, **kwargs)
   1370 
   1371 
   1372 def get_status_task(host_id, end_time):
   1373     """Get the "status task" for a host from the local shard.
   1374 
   1375     Returns a single special task representing the given host's
   1376     "status task".  The status task is a completed special task that
   1377     identifies whether the corresponding host was working or broken
   1378     when it completed.  A successful task indicates a working host;
   1379     a failed task indicates broken.
   1380 
    1381     This call will not be forwarded to a shard; the receiving server
   1382     must be the shard that owns the host.
   1383 
   1384     @param host_id      Id in the database of the target host.
   1385     @param end_time     Time reference for the host's status.
   1386 
   1387     @return A single task; its status (successful or not)
   1388             corresponds to the status of the host (working or
   1389             broken) at the given time.  If no task is found, return
   1390             `None`.
   1391 
   1392     """
   1393     tasklist = rpc_utils.prepare_rows_as_nested_dicts(
   1394             status_history.get_status_task(host_id, end_time),
   1395             ('host', 'queue_entry'))
   1396     return tasklist[0] if tasklist else None
   1397 
   1398 
   1399 def get_host_status_task(host_id, end_time):
   1400     """Get the "status task" for a host from its owning shard.
   1401 
   1402     Finds the given host's owning shard, and forwards to it a call
   1403     to `get_status_task()` (see above).
   1404 
   1405     @param host_id      Id in the database of the target host.
   1406     @param end_time     Time reference for the host's status.
   1407 
   1408     @return A single task; its status (successful or not)
   1409             corresponds to the status of the host (working or
   1410             broken) at the given time.  If no task is found, return
   1411             `None`.
   1412 
   1413     """
   1414     host = models.Host.smart_get(host_id)
   1415     if not host.shard:
   1416         return get_status_task(host_id, end_time)
   1417     else:
   1418         # The return values from AFE methods are post-processed
   1419         # objects that aren't JSON-serializable.  So, we have to
   1420         # call AFE.run() to get the raw, serializable output from
   1421         # the shard.
   1422         shard_afe = frontend.AFE(server=host.shard.rpc_hostname())
   1423         return shard_afe.run('get_status_task',
   1424                              host_id=host_id, end_time=end_time)
   1425 
   1426 
   1427 def get_host_diagnosis_interval(host_id, end_time, success):
   1428     """Find a "diagnosis interval" for a given host.
   1429 
   1430     A "diagnosis interval" identifies a start and end time where
   1431     the host went from "working" to "broken", or vice versa.  The
   1432     interval's starting time is the starting time of the last status
   1433     task with the old status; the end time is the finish time of the
   1434     first status task with the new status.
   1435 
   1436     This routine finds the most recent diagnosis interval for the
   1437     given host prior to `end_time`, with a starting status matching
   1438     `success`.  If `success` is true, the interval will start with a
   1439     successful status task; if false the interval will start with a
   1440     failed status task.
   1441 
   1442     @param host_id      Id in the database of the target host.
   1443     @param end_time     Time reference for the diagnosis interval.
   1444     @param success      Whether the diagnosis interval should start
   1445                         with a successful or failed status task.
   1446 
   1447     @return A list of two strings.  The first is the timestamp for
   1448             the beginning of the interval; the second is the
   1449             timestamp for the end.  If the host has never changed
   1450             state, the list is empty.
   1451 
   1452     """
   1453     host = models.Host.smart_get(host_id)
   1454     if not host.shard or utils.is_shard():
   1455         return status_history.get_diagnosis_interval(
   1456                 host_id, end_time, success)
   1457     else:
   1458         shard_afe = frontend.AFE(server=host.shard.rpc_hostname())
   1459         return shard_afe.get_host_diagnosis_interval(
   1460                 host_id, end_time, success)
   1461 
   1462 
   1463 # support for host detail view
   1464 
   1465 def get_host_queue_entries_and_special_tasks(host, query_start=None,
   1466                                              query_limit=None, start_time=None,
   1467                                              end_time=None):
   1468     """
   1469     @returns an interleaved list of HostQueueEntries and SpecialTasks,
    1470             in approximate run order.  Each dict contains keys for type, host,
   1471             job, status, started_on, execution_path, and ID.
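         
             For example (illustrative; the hostname and paging values are
             hypothetical):
             get_host_queue_entries_and_special_tasks('host1', query_start=0,
                                                      query_limit=20)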
   1472     """
   1473     total_limit = None
   1474     if query_limit is not None:
    1475         total_limit = (query_start or 0) + query_limit
   1476     filter_data_common = {'host': host,
   1477                           'query_limit': total_limit,
   1478                           'sort_by': ['-id']}
   1479 
   1480     filter_data_special_tasks = rpc_utils.inject_times_to_filter(
   1481             'time_started__gte', 'time_started__lte', start_time, end_time,
   1482             **filter_data_common)
   1483 
   1484     queue_entries = get_host_queue_entries(
   1485             start_time, end_time, **filter_data_common)
   1486     special_tasks = get_host_special_tasks(host, **filter_data_special_tasks)
   1487 
   1488     interleaved_entries = rpc_utils.interleave_entries(queue_entries,
   1489                                                        special_tasks)
   1490     if query_start is not None:
   1491         interleaved_entries = interleaved_entries[query_start:]
   1492     if query_limit is not None:
   1493         interleaved_entries = interleaved_entries[:query_limit]
   1494     return rpc_utils.prepare_host_queue_entries_and_special_tasks(
   1495             interleaved_entries, queue_entries)
   1496 
   1497 
   1498 def get_num_host_queue_entries_and_special_tasks(host, start_time=None,
   1499                                                  end_time=None):
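             """Get the combined number of host queue entries and special tasks
             for a host, optionally restricted to a time window.
             """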
   1500     filter_data_common = {'host': host}
   1501 
   1502     filter_data_queue_entries, filter_data_special_tasks = (
   1503             rpc_utils.inject_times_to_hqe_special_tasks_filters(
   1504                     filter_data_common, start_time, end_time))
   1505 
   1506     return (models.HostQueueEntry.query_count(filter_data_queue_entries)
   1507             + get_host_num_special_tasks(**filter_data_special_tasks))
   1508 
   1509 
   1510 # recurring run
   1511 
   1512 def get_recurring(**filter_data):
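             """Get recurring run entries, with 'job' and 'owner' as nested dicts."""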
   1513     return rpc_utils.prepare_rows_as_nested_dicts(
   1514             models.RecurringRun.query_objects(filter_data),
   1515             ('job', 'owner'))
   1516 
   1517 
   1518 def get_num_recurring(**filter_data):
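             """Get the number of recurring run entries matching filter_data."""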
   1519     return models.RecurringRun.query_count(filter_data)
   1520 
   1521 
   1522 def delete_recurring_runs(**filter_data):
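             """Delete all recurring run entries matching filter_data."""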
   1523     to_delete = models.RecurringRun.query_objects(filter_data)
   1524     to_delete.delete()
   1525 
   1526 
   1527 def create_recurring_run(job_id, start_date, loop_period, loop_count):
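             """Create a recurring run of an existing job, owned by the current user.
         
             @param job_id: id of the job to recur.
             @param start_date: when the first run should be scheduled.
             @param loop_period: interval between runs.
             @param loop_count: number of times to run the job.
             """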
   1528     owner = models.User.current_user().login
   1529     job = models.Job.objects.get(id=job_id)
   1530     return job.create_recurring_job(start_date=start_date,
   1531                                     loop_period=loop_period,
   1532                                     loop_count=loop_count,
   1533                                     owner=owner)
   1534 
   1535 
   1536 # other
   1537 
   1538 def echo(data=""):
   1539     """\
    1540     Returns the string passed in. Useful as a basic check that RPC calls
    1541     can be made successfully.
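         
             For example, echo('ping') returns 'ping'.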
   1542     """
   1543     return data
   1544 
   1545 
   1546 def get_motd():
   1547     """\
   1548     Returns the message of the day as a string.
   1549     """
   1550     return rpc_utils.get_motd()
   1551 
   1552 
   1553 def get_static_data():
   1554     """\
    1555     Returns a dictionary of data that rarely changes and is otherwise
    1556     inaccessible.  This includes:
   1557 
   1558     priorities: List of job priority choices.
   1559     default_priority: Default priority value for new jobs.
   1560     users: Sorted list of all users.
    1561     labels: Sorted list of labels whose names do not start with 'cros-version',
    1562             'fw-version', 'fwrw-version', or 'fwro-version'.
   1563     atomic_groups: Sorted list of all atomic groups.
   1564     tests: Sorted list of all tests.
   1565     profilers: Sorted list of all profilers.
   1566     current_user: Logged-in username.
   1567     host_statuses: Sorted list of possible Host statuses.
   1568     job_statuses: Sorted list of possible HostQueueEntry statuses.
    1569     job_timeout_mins_default: The default job timeout length in minutes.
   1570     parse_failed_repair_default: Default value for the parse_failed_repair job
   1571             option.
   1572     reboot_before_options: A list of valid RebootBefore string enums.
   1573     reboot_after_options: A list of valid RebootAfter string enums.
   1574     motd: Server's message of the day.
    1575     status_dictionary: A mapping from one-word job status names to a more
   1576             informative description.
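         
             For example (illustrative):
             get_static_data()['job_statuses']  # sorted HostQueueEntry status names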
   1577     """
   1578 
   1579     job_fields = models.Job.get_field_dict()
   1580     default_drone_set_name = models.DroneSet.default_drone_set_name()
   1581     drone_sets = ([default_drone_set_name] +
   1582                   sorted(drone_set.name for drone_set in
   1583                          models.DroneSet.objects.exclude(
   1584                                  name=default_drone_set_name)))
   1585 
   1586     result = {}
   1587     result['priorities'] = priorities.Priority.choices()
   1588     default_priority = priorities.Priority.DEFAULT
   1589     result['default_priority'] = 'Default'
   1590     result['max_schedulable_priority'] = priorities.Priority.DEFAULT
   1591     result['users'] = get_users(sort_by=['login'])
   1592 
   1593     label_exclude_filters = [{'name__startswith': 'cros-version'},
   1594                              {'name__startswith': 'fw-version'},
   1595                              {'name__startswith': 'fwrw-version'},
   1596                              {'name__startswith': 'fwro-version'}]
   1597     result['labels'] = get_labels(
   1598         label_exclude_filters,
   1599         sort_by=['-platform', 'name'])
   1600 
   1601     result['atomic_groups'] = get_atomic_groups(sort_by=['name'])
   1602     result['tests'] = get_tests(sort_by=['name'])
   1603     result['profilers'] = get_profilers(sort_by=['name'])
   1604     result['current_user'] = rpc_utils.prepare_for_serialization(
   1605         models.User.current_user().get_object_dict())
   1606     result['host_statuses'] = sorted(models.Host.Status.names)
   1607     result['job_statuses'] = sorted(models.HostQueueEntry.Status.names)
   1608     result['job_timeout_mins_default'] = models.Job.DEFAULT_TIMEOUT_MINS
   1609     result['job_max_runtime_mins_default'] = (
   1610         models.Job.DEFAULT_MAX_RUNTIME_MINS)
   1611     result['parse_failed_repair_default'] = bool(
   1612         models.Job.DEFAULT_PARSE_FAILED_REPAIR)
   1613     result['reboot_before_options'] = model_attributes.RebootBefore.names
   1614     result['reboot_after_options'] = model_attributes.RebootAfter.names
   1615     result['motd'] = rpc_utils.get_motd()
   1616     result['drone_sets_enabled'] = models.DroneSet.drone_sets_enabled()
   1617     result['drone_sets'] = drone_sets
   1618     result['parameterized_jobs'] = models.Job.parameterized_jobs_enabled()
   1619 
   1620     result['status_dictionary'] = {"Aborted": "Aborted",
   1621                                    "Verifying": "Verifying Host",
   1622                                    "Provisioning": "Provisioning Host",
   1623                                    "Pending": "Waiting on other hosts",
   1624                                    "Running": "Running autoserv",
   1625                                    "Completed": "Autoserv completed",
   1626                                    "Failed": "Failed to complete",
   1627                                    "Queued": "Queued",
   1628                                    "Starting": "Next in host's queue",
   1629                                    "Stopped": "Other host(s) failed verify",
   1630                                    "Parsing": "Awaiting parse of final results",
   1631                                    "Gathering": "Gathering log files",
   1632                                    "Template": "Template job for recurring run",
   1633                                    "Waiting": "Waiting for scheduler action",
   1634                                    "Archiving": "Archiving results",
   1635                                    "Resetting": "Resetting hosts"}
   1636 
   1637     result['wmatrix_url'] = rpc_utils.get_wmatrix_url()
   1638     result['is_moblab'] = bool(utils.is_moblab())
   1639 
   1640     return result
   1641 
   1642 
   1643 def get_server_time():
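             """Return the current server time as a 'YYYY-MM-DD HH:MM' string."""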
   1644     return datetime.datetime.now().strftime("%Y-%m-%d %H:%M")
   1645