1 # pylint: disable=missing-docstring 2 3 """\ 4 Functions to expose over the RPC interface. 5 6 For all modify* and delete* functions that ask for an 'id' parameter to 7 identify the object to operate on, the id may be either 8 * the database row ID 9 * the name of the object (label name, hostname, user login, etc.) 10 * a dictionary containing uniquely identifying field (this option should seldom 11 be used) 12 13 When specifying foreign key fields (i.e. adding hosts to a label, or adding 14 users to an ACL group), the given value may be either the database row ID or the 15 name of the object. 16 17 All get* functions return lists of dictionaries. Each dictionary represents one 18 object and maps field names to values. 19 20 Some examples: 21 modify_host(2, hostname='myhost') # modify hostname of host with database ID 2 22 modify_host('ipaj2', hostname='myhost') # modify hostname of host 'ipaj2' 23 modify_test('sleeptest', test_type='Client', params=', seconds=60') 24 delete_acl_group(1) # delete by ID 25 delete_acl_group('Everyone') # delete by name 26 acl_group_add_users('Everyone', ['mbligh', 'showard']) 27 get_jobs(owner='showard', status='Queued') 28 29 See doctests/001_rpc_test.txt for (lots) more examples. 
30 """ 31 32 __author__ = 'showard (at] google.com (Steve Howard)' 33 34 import ast 35 import collections 36 import contextlib 37 import datetime 38 import logging 39 import os 40 import sys 41 import warnings 42 43 from django.db import connection as db_connection 44 from django.db import transaction 45 from django.db.models import Count 46 from django.db.utils import DatabaseError 47 48 import common 49 from autotest_lib.client.common_lib import control_data 50 from autotest_lib.client.common_lib import error 51 from autotest_lib.client.common_lib import global_config 52 from autotest_lib.client.common_lib import priorities 53 from autotest_lib.client.common_lib import time_utils 54 from autotest_lib.client.common_lib.cros import dev_server 55 from autotest_lib.frontend.afe import control_file as control_file_lib 56 from autotest_lib.frontend.afe import model_attributes 57 from autotest_lib.frontend.afe import model_logic 58 from autotest_lib.frontend.afe import models 59 from autotest_lib.frontend.afe import rpc_utils 60 from autotest_lib.frontend.tko import models as tko_models 61 from autotest_lib.frontend.tko import rpc_interface as tko_rpc_interface 62 from autotest_lib.server import frontend 63 from autotest_lib.server import utils 64 from autotest_lib.server.cros import provision 65 from autotest_lib.server.cros.dynamic_suite import constants 66 from autotest_lib.server.cros.dynamic_suite import control_file_getter 67 from autotest_lib.server.cros.dynamic_suite import suite as SuiteBase 68 from autotest_lib.server.cros.dynamic_suite import tools 69 from autotest_lib.server.cros.dynamic_suite.suite import Suite 70 from autotest_lib.server.lib import status_history 71 from autotest_lib.site_utils import job_history 72 from autotest_lib.site_utils import server_manager_utils 73 from autotest_lib.site_utils import stable_version_utils 74 75 76 _CONFIG = global_config.global_config 77 78 # Definition of LabHealthIndicator 79 LabHealthIndicator = 
collections.namedtuple( 80 'LabHealthIndicator', 81 [ 82 'if_lab_close', 83 'available_duts', 84 'devserver_health', 85 'upcoming_builds', 86 ] 87 ) 88 89 RESPECT_STATIC_LABELS = global_config.global_config.get_config_value( 90 'SKYLAB', 'respect_static_labels', type=bool, default=False) 91 92 RESPECT_STATIC_ATTRIBUTES = global_config.global_config.get_config_value( 93 'SKYLAB', 'respect_static_attributes', type=bool, default=False) 94 95 # Relevant CrosDynamicSuiteExceptions are defined in client/common_lib/error.py. 96 97 # labels 98 99 def modify_label(id, **data): 100 """Modify a label. 101 102 @param id: id or name of a label. More often a label name. 103 @param data: New data for a label. 104 """ 105 label_model = models.Label.smart_get(id) 106 if label_model.is_replaced_by_static(): 107 raise error.UnmodifiableLabelException( 108 'Failed to delete label "%s" because it is a static label. ' 109 'Use go/chromeos-skylab-inventory-tools to modify this ' 110 'label.' % label_model.name) 111 112 label_model.update_object(data) 113 114 # Master forwards the RPC to shards 115 if not utils.is_shard(): 116 rpc_utils.fanout_rpc(label_model.host_set.all(), 'modify_label', False, 117 id=id, **data) 118 119 120 def delete_label(id): 121 """Delete a label. 122 123 @param id: id or name of a label. More often a label name. 124 """ 125 label_model = models.Label.smart_get(id) 126 if label_model.is_replaced_by_static(): 127 raise error.UnmodifiableLabelException( 128 'Failed to delete label "%s" because it is a static label. ' 129 'Use go/chromeos-skylab-inventory-tools to modify this ' 130 'label.' % label_model.name) 131 132 # Hosts that have the label to be deleted. Save this info before 133 # the label is deleted to use it later. 
134 hosts = [] 135 for h in label_model.host_set.all(): 136 hosts.append(models.Host.smart_get(h.id)) 137 label_model.delete() 138 139 # Master forwards the RPC to shards 140 if not utils.is_shard(): 141 rpc_utils.fanout_rpc(hosts, 'delete_label', False, id=id) 142 143 144 def add_label(name, ignore_exception_if_exists=False, **kwargs): 145 """Adds a new label of a given name. 146 147 @param name: label name. 148 @param ignore_exception_if_exists: If True and the exception was 149 thrown due to the duplicated label name when adding a label, 150 then suppress the exception. Default is False. 151 @param kwargs: keyword args that store more info about a label 152 other than the name. 153 @return: int/long id of a new label. 154 """ 155 # models.Label.add_object() throws model_logic.ValidationError 156 # when it is given a label name that already exists. 157 # However, ValidationError can be thrown with different errors, 158 # and those errors should be thrown up to the call chain. 159 try: 160 label = models.Label.add_object(name=name, **kwargs) 161 except: 162 exc_info = sys.exc_info() 163 if ignore_exception_if_exists: 164 label = rpc_utils.get_label(name) 165 # If the exception is raised not because of duplicated 166 # "name", then raise the original exception. 167 if label is None: 168 raise exc_info[0], exc_info[1], exc_info[2] 169 else: 170 raise exc_info[0], exc_info[1], exc_info[2] 171 return label.id 172 173 174 def add_label_to_hosts(id, hosts): 175 """Adds a label of the given id to the given hosts only in local DB. 176 177 @param id: id or name of a label. More often a label name. 178 @param hosts: The hostnames of hosts that need the label. 179 180 @raises models.Label.DoesNotExist: If the label with id doesn't exist. 
181 """ 182 label = models.Label.smart_get(id) 183 if label.is_replaced_by_static(): 184 label = models.StaticLabel.smart_get(label.name) 185 186 host_objs = models.Host.smart_get_bulk(hosts) 187 if label.platform: 188 models.Host.check_no_platform(host_objs) 189 # Ensure a host has no more than one board label with it. 190 if label.name.startswith('board:'): 191 models.Host.check_board_labels_allowed(host_objs, [label.name]) 192 label.host_set.add(*host_objs) 193 194 195 def _create_label_everywhere(id, hosts): 196 """ 197 Yet another method to create labels. 198 199 ALERT! This method should be run only on master not shards! 200 DO NOT RUN THIS ON A SHARD!!! Deputies will hate you if you do!!! 201 202 This method exists primarily to serve label_add_hosts() and 203 host_add_labels(). Basically it pulls out the label check/add logic 204 from label_add_hosts() into this nice method that not only creates 205 the label but also tells the shards that service the hosts to also 206 create the label. 207 208 @param id: id or name of a label. More often a label name. 209 @param hosts: A list of hostnames or ids. More often hostnames. 210 """ 211 try: 212 label = models.Label.smart_get(id) 213 except models.Label.DoesNotExist: 214 # This matches the type checks in smart_get, which is a hack 215 # in and off itself. The aim here is to create any non-existent 216 # label, which we cannot do if the 'id' specified isn't a label name. 217 if isinstance(id, basestring): 218 label = models.Label.smart_get(add_label(id)) 219 else: 220 raise ValueError('Label id (%s) does not exist. Please specify ' 221 'the argument, id, as a string (label name).' 222 % id) 223 224 # Make sure the label exists on the shard with the same id 225 # as it is on the master. 226 # It is possible that the label is already in a shard because 227 # we are adding a new label only to shards of hosts that the label 228 # is going to be attached. 229 # For example, we add a label L1 to a host in shard S1. 
230 # Master and S1 will have L1 but other shards won't. 231 # Later, when we add the same label L1 to hosts in shards S1 and S2, 232 # S1 already has the label but S2 doesn't. 233 # S2 should have the new label without any problem. 234 # We ignore exception in such a case. 235 host_objs = models.Host.smart_get_bulk(hosts) 236 rpc_utils.fanout_rpc( 237 host_objs, 'add_label', include_hostnames=False, 238 name=label.name, ignore_exception_if_exists=True, 239 id=label.id, platform=label.platform) 240 241 242 @rpc_utils.route_rpc_to_master 243 def label_add_hosts(id, hosts): 244 """Adds a label with the given id to the given hosts. 245 246 This method should be run only on master not shards. 247 The given label will be created if it doesn't exist, provided the `id` 248 supplied is a label name not an int/long id. 249 250 @param id: id or name of a label. More often a label name. 251 @param hosts: A list of hostnames or ids. More often hostnames. 252 253 @raises ValueError: If the id specified is an int/long (label id) 254 while the label does not exist. 255 """ 256 # Create the label. 257 _create_label_everywhere(id, hosts) 258 259 # Add it to the master. 260 add_label_to_hosts(id, hosts) 261 262 # Add it to the shards. 263 host_objs = models.Host.smart_get_bulk(hosts) 264 rpc_utils.fanout_rpc(host_objs, 'add_label_to_hosts', id=id) 265 266 267 def remove_label_from_hosts(id, hosts): 268 """Removes a label of the given id from the given hosts only in local DB. 269 270 @param id: id or name of a label. 271 @param hosts: The hostnames of hosts that need to remove the label from. 272 """ 273 host_objs = models.Host.smart_get_bulk(hosts) 274 label = models.Label.smart_get(id) 275 if label.is_replaced_by_static(): 276 raise error.UnmodifiableLabelException( 277 'Failed to remove label "%s" for hosts "%r" because it is a ' 278 'static label. Use go/chromeos-skylab-inventory-tools to ' 279 'modify this label.' 
% (label.name, hosts)) 280 281 label.host_set.remove(*host_objs) 282 283 284 @rpc_utils.route_rpc_to_master 285 def label_remove_hosts(id, hosts): 286 """Removes a label of the given id from the given hosts. 287 288 This method should be run only on master not shards. 289 290 @param id: id or name of a label. 291 @param hosts: A list of hostnames or ids. More often hostnames. 292 """ 293 host_objs = models.Host.smart_get_bulk(hosts) 294 remove_label_from_hosts(id, hosts) 295 296 rpc_utils.fanout_rpc(host_objs, 'remove_label_from_hosts', id=id) 297 298 299 def get_labels(exclude_filters=(), **filter_data): 300 """\ 301 @param exclude_filters: A sequence of dictionaries of filters. 302 303 @returns A sequence of nested dictionaries of label information. 304 """ 305 labels = models.Label.query_objects(filter_data) 306 for exclude_filter in exclude_filters: 307 labels = labels.exclude(**exclude_filter) 308 309 if not RESPECT_STATIC_LABELS: 310 return rpc_utils.prepare_rows_as_nested_dicts(labels, ()) 311 312 static_labels = models.StaticLabel.query_objects(filter_data) 313 for exclude_filter in exclude_filters: 314 static_labels = static_labels.exclude(**exclude_filter) 315 316 non_static_lists = rpc_utils.prepare_rows_as_nested_dicts(labels, ()) 317 static_lists = rpc_utils.prepare_rows_as_nested_dicts(static_labels, ()) 318 319 label_ids = [label.id for label in labels] 320 replaced = models.ReplacedLabel.objects.filter(label__id__in=label_ids) 321 replaced_ids = {r.label_id for r in replaced} 322 replaced_label_names = {l.name for l in labels if l.id in replaced_ids} 323 324 return_lists = [] 325 for non_static_label in non_static_lists: 326 if non_static_label.get('id') not in replaced_ids: 327 return_lists.append(non_static_label) 328 329 for static_label in static_lists: 330 if static_label.get('name') in replaced_label_names: 331 return_lists.append(static_label) 332 333 return return_lists 334 335 336 # hosts 337 338 def add_host(hostname, status=None, 
locked=None, lock_reason='', protection=None): 339 if locked and not lock_reason: 340 raise model_logic.ValidationError( 341 {'locked': 'Please provide a reason for locking when adding host.'}) 342 343 return models.Host.add_object(hostname=hostname, status=status, 344 locked=locked, lock_reason=lock_reason, 345 protection=protection).id 346 347 348 @rpc_utils.route_rpc_to_master 349 def modify_host(id, **kwargs): 350 """Modify local attributes of a host. 351 352 If this is called on the master, but the host is assigned to a shard, this 353 will call `modify_host_local` RPC to the responsible shard. This means if 354 a host is being locked using this function, this change will also propagate 355 to shards. 356 When this is called on a shard, the shard just routes the RPC to the master 357 and does nothing. 358 359 @param id: id of the host to modify. 360 @param kwargs: key=value pairs of values to set on the host. 361 """ 362 rpc_utils.check_modify_host(kwargs) 363 host = models.Host.smart_get(id) 364 try: 365 rpc_utils.check_modify_host_locking(host, kwargs) 366 except model_logic.ValidationError as e: 367 if not kwargs.get('force_modify_locking', False): 368 raise 369 logging.exception('The following exception will be ignored and lock ' 370 'modification will be enforced. %s', e) 371 372 # This is required to make `lock_time` for a host be exactly same 373 # between the master and a shard. 374 if kwargs.get('locked', None) and 'lock_time' not in kwargs: 375 kwargs['lock_time'] = datetime.datetime.now() 376 377 # force_modifying_locking is not an internal field in database, remove. 378 shard_kwargs = dict(kwargs) 379 shard_kwargs.pop('force_modify_locking', None) 380 rpc_utils.fanout_rpc([host], 'modify_host_local', 381 include_hostnames=False, id=id, **shard_kwargs) 382 383 # Update the local DB **after** RPC fanout is complete. 384 # This guarantees that the master state is only updated if the shards were 385 # correctly updated. 
386 # In case the shard update fails mid-flight and the master-shard desync, we 387 # always consider the master state to be the source-of-truth, and any 388 # (automated) corrective actions will revert the (partial) shard updates. 389 host.update_object(kwargs) 390 391 392 def modify_host_local(id, **kwargs): 393 """Modify host attributes in local DB. 394 395 @param id: Host id. 396 @param kwargs: key=value pairs of values to set on the host. 397 """ 398 models.Host.smart_get(id).update_object(kwargs) 399 400 401 @rpc_utils.route_rpc_to_master 402 def modify_hosts(host_filter_data, update_data): 403 """Modify local attributes of multiple hosts. 404 405 If this is called on the master, but one of the hosts in that match the 406 filters is assigned to a shard, this will call `modify_hosts_local` RPC 407 to the responsible shard. 408 When this is called on a shard, the shard just routes the RPC to the master 409 and does nothing. 410 411 The filters are always applied on the master, not on the shards. This means 412 if the states of a host differ on the master and a shard, the state on the 413 master will be used. I.e. this means: 414 A host was synced to Shard 1. On Shard 1 the status of the host was set to 415 'Repair Failed'. 416 - A call to modify_hosts with host_filter_data={'status': 'Ready'} will 417 update the host (both on the shard and on the master), because the state 418 of the host as the master knows it is still 'Ready'. 419 - A call to modify_hosts with host_filter_data={'status': 'Repair failed' 420 will not update the host, because the filter doesn't apply on the master. 421 422 @param host_filter_data: Filters out which hosts to modify. 423 @param update_data: A dictionary with the changes to make to the hosts. 
424 """ 425 update_data = update_data.copy() 426 rpc_utils.check_modify_host(update_data) 427 hosts = models.Host.query_objects(host_filter_data) 428 429 affected_shard_hostnames = set() 430 affected_host_ids = [] 431 432 # Check all hosts before changing data for exception safety. 433 for host in hosts: 434 try: 435 rpc_utils.check_modify_host_locking(host, update_data) 436 except model_logic.ValidationError as e: 437 if not update_data.get('force_modify_locking', False): 438 raise 439 logging.exception('The following exception will be ignored and ' 440 'lock modification will be enforced. %s', e) 441 442 if host.shard: 443 affected_shard_hostnames.add(host.shard.hostname) 444 affected_host_ids.append(host.id) 445 446 # This is required to make `lock_time` for a host be exactly same 447 # between the master and a shard. 448 if update_data.get('locked', None) and 'lock_time' not in update_data: 449 update_data['lock_time'] = datetime.datetime.now() 450 for host in hosts: 451 host.update_object(update_data) 452 453 update_data.pop('force_modify_locking', None) 454 # Caution: Changing the filter from the original here. See docstring. 455 rpc_utils.run_rpc_on_multiple_hostnames( 456 'modify_hosts_local', affected_shard_hostnames, 457 host_filter_data={'id__in': affected_host_ids}, 458 update_data=update_data) 459 460 461 def modify_hosts_local(host_filter_data, update_data): 462 """Modify attributes of hosts in local DB. 463 464 @param host_filter_data: Filters out which hosts to modify. 465 @param update_data: A dictionary with the changes to make to the hosts. 466 """ 467 for host in models.Host.query_objects(host_filter_data): 468 host.update_object(update_data) 469 470 471 def add_labels_to_host(id, labels): 472 """Adds labels to a given host only in local DB. 473 474 @param id: id or hostname for a host. 475 @param labels: ids or names for labels. 
476 """ 477 label_objs = models.Label.smart_get_bulk(labels) 478 if not RESPECT_STATIC_LABELS: 479 models.Host.smart_get(id).labels.add(*label_objs) 480 else: 481 static_labels, non_static_labels = models.Host.classify_label_objects( 482 label_objs) 483 host = models.Host.smart_get(id) 484 host.static_labels.add(*static_labels) 485 host.labels.add(*non_static_labels) 486 487 488 @rpc_utils.route_rpc_to_master 489 def host_add_labels(id, labels): 490 """Adds labels to a given host. 491 492 @param id: id or hostname for a host. 493 @param labels: ids or names for labels. 494 495 @raises ValidationError: If adding more than one platform/board label. 496 """ 497 # Create the labels on the master/shards. 498 for label in labels: 499 _create_label_everywhere(label, [id]) 500 501 label_objs = models.Label.smart_get_bulk(labels) 502 503 platforms = [label.name for label in label_objs if label.platform] 504 boards = [label.name for label in label_objs 505 if label.name.startswith('board:')] 506 if len(platforms) > 1 or not utils.board_labels_allowed(boards): 507 raise model_logic.ValidationError( 508 {'labels': ('Adding more than one platform label, or a list of ' 509 'non-compatible board labels.: %s %s' % 510 (', '.join(platforms), ', '.join(boards)))}) 511 512 host_obj = models.Host.smart_get(id) 513 if platforms: 514 models.Host.check_no_platform([host_obj]) 515 if boards: 516 models.Host.check_board_labels_allowed([host_obj], labels) 517 add_labels_to_host(id, labels) 518 519 rpc_utils.fanout_rpc([host_obj], 'add_labels_to_host', False, 520 id=id, labels=labels) 521 522 523 def remove_labels_from_host(id, labels): 524 """Removes labels from a given host only in local DB. 525 526 @param id: id or hostname for a host. 527 @param labels: ids or names for labels. 
528 """ 529 label_objs = models.Label.smart_get_bulk(labels) 530 if not RESPECT_STATIC_LABELS: 531 models.Host.smart_get(id).labels.remove(*label_objs) 532 else: 533 static_labels, non_static_labels = models.Host.classify_label_objects( 534 label_objs) 535 host = models.Host.smart_get(id) 536 host.labels.remove(*non_static_labels) 537 if static_labels: 538 logging.info('Cannot remove labels "%r" for host "%r" due to they ' 539 'are static labels. Use ' 540 'go/chromeos-skylab-inventory-tools to modify these ' 541 'labels.', static_labels, id) 542 543 544 @rpc_utils.route_rpc_to_master 545 def host_remove_labels(id, labels): 546 """Removes labels from a given host. 547 548 @param id: id or hostname for a host. 549 @param labels: ids or names for labels. 550 """ 551 remove_labels_from_host(id, labels) 552 553 host_obj = models.Host.smart_get(id) 554 rpc_utils.fanout_rpc([host_obj], 'remove_labels_from_host', False, 555 id=id, labels=labels) 556 557 558 def get_host_attribute(attribute, **host_filter_data): 559 """ 560 @param attribute: string name of attribute 561 @param host_filter_data: filter data to apply to Hosts to choose hosts to 562 act upon 563 """ 564 hosts = rpc_utils.get_host_query((), False, True, host_filter_data) 565 hosts = list(hosts) 566 models.Host.objects.populate_relationships(hosts, models.HostAttribute, 567 'attribute_list') 568 host_attr_dicts = [] 569 host_objs = [] 570 for host_obj in hosts: 571 for attr_obj in host_obj.attribute_list: 572 if attr_obj.attribute == attribute: 573 host_attr_dicts.append(attr_obj.get_object_dict()) 574 host_objs.append(host_obj) 575 576 if RESPECT_STATIC_ATTRIBUTES: 577 for host_attr, host_obj in zip(host_attr_dicts, host_objs): 578 static_attrs = models.StaticHostAttribute.query_objects( 579 {'host_id': host_obj.id, 'attribute': attribute}) 580 if len(static_attrs) > 0: 581 host_attr['value'] = static_attrs[0].value 582 583 return rpc_utils.prepare_for_serialization(host_attr_dicts) 584 585 586 
@rpc_utils.route_rpc_to_master 587 def set_host_attribute(attribute, value, **host_filter_data): 588 """Set an attribute on hosts. 589 590 This RPC is a shim that forwards calls to master to be handled there. 591 592 @param attribute: string name of attribute 593 @param value: string, or None to delete an attribute 594 @param host_filter_data: filter data to apply to Hosts to choose hosts to 595 act upon 596 """ 597 assert not utils.is_shard() 598 set_host_attribute_impl(attribute, value, **host_filter_data) 599 600 601 def set_host_attribute_impl(attribute, value, **host_filter_data): 602 """Set an attribute on hosts. 603 604 *** DO NOT CALL THIS RPC from client code *** 605 This RPC exists for master-shard communication only. 606 Call set_host_attribute instead. 607 608 @param attribute: string name of attribute 609 @param value: string, or None to delete an attribute 610 @param host_filter_data: filter data to apply to Hosts to choose hosts to 611 act upon 612 """ 613 assert host_filter_data # disallow accidental actions on all hosts 614 hosts = models.Host.query_objects(host_filter_data) 615 models.AclGroup.check_for_acl_violation_hosts(hosts) 616 for host in hosts: 617 host.set_or_delete_attribute(attribute, value) 618 619 # Master forwards this RPC to shards. 620 if not utils.is_shard(): 621 rpc_utils.fanout_rpc(hosts, 'set_host_attribute_impl', False, 622 attribute=attribute, value=value, **host_filter_data) 623 624 625 @rpc_utils.forward_single_host_rpc_to_shard 626 def delete_host(id): 627 models.Host.smart_get(id).delete() 628 629 630 def get_hosts(multiple_labels=(), exclude_only_if_needed_labels=False, 631 valid_only=True, include_current_job=False, **filter_data): 632 """Get a list of dictionaries which contains the information of hosts. 633 634 @param multiple_labels: match hosts in all of the labels given. Should 635 be a list of label names. 636 @param exclude_only_if_needed_labels: Deprecated. Raise error if it's True. 
637 @param include_current_job: Set to True to include ids of currently running 638 job and special task. 639 """ 640 if exclude_only_if_needed_labels: 641 raise error.RPCException('exclude_only_if_needed_labels is deprecated') 642 643 hosts = rpc_utils.get_host_query(multiple_labels, 644 exclude_only_if_needed_labels, 645 valid_only, filter_data) 646 hosts = list(hosts) 647 models.Host.objects.populate_relationships(hosts, models.Label, 648 'label_list') 649 models.Host.objects.populate_relationships(hosts, models.AclGroup, 650 'acl_list') 651 models.Host.objects.populate_relationships(hosts, models.HostAttribute, 652 'attribute_list') 653 models.Host.objects.populate_relationships(hosts, 654 models.StaticHostAttribute, 655 'staticattribute_list') 656 host_dicts = [] 657 for host_obj in hosts: 658 host_dict = host_obj.get_object_dict() 659 host_dict['acls'] = [acl.name for acl in host_obj.acl_list] 660 host_dict['attributes'] = dict((attribute.attribute, attribute.value) 661 for attribute in host_obj.attribute_list) 662 if RESPECT_STATIC_LABELS: 663 label_list = [] 664 # Only keep static labels which has a corresponding entries in 665 # afe_labels. 666 for label in host_obj.label_list: 667 if label.is_replaced_by_static(): 668 static_label = models.StaticLabel.smart_get(label.name) 669 label_list.append(static_label) 670 else: 671 label_list.append(label) 672 673 host_dict['labels'] = [label.name for label in label_list] 674 host_dict['platform'] = rpc_utils.find_platform( 675 host_obj.hostname, label_list) 676 else: 677 host_dict['labels'] = [label.name for label in host_obj.label_list] 678 host_dict['platform'] = rpc_utils.find_platform( 679 host_obj.hostname, host_obj.label_list) 680 681 if RESPECT_STATIC_ATTRIBUTES: 682 # Overwrite attribute with values in afe_static_host_attributes. 
683 for attr in host_obj.staticattribute_list: 684 if attr.attribute in host_dict['attributes']: 685 host_dict['attributes'][attr.attribute] = attr.value 686 687 if include_current_job: 688 host_dict['current_job'] = None 689 host_dict['current_special_task'] = None 690 entries = models.HostQueueEntry.objects.filter( 691 host_id=host_dict['id'], active=True, complete=False) 692 if entries: 693 host_dict['current_job'] = ( 694 entries[0].get_object_dict()['job']) 695 tasks = models.SpecialTask.objects.filter( 696 host_id=host_dict['id'], is_active=True, is_complete=False) 697 if tasks: 698 host_dict['current_special_task'] = ( 699 '%d-%s' % (tasks[0].get_object_dict()['id'], 700 tasks[0].get_object_dict()['task'].lower())) 701 host_dicts.append(host_dict) 702 703 return rpc_utils.prepare_for_serialization(host_dicts) 704 705 706 def get_num_hosts(multiple_labels=(), exclude_only_if_needed_labels=False, 707 valid_only=True, **filter_data): 708 """ 709 Same parameters as get_hosts(). 710 711 @returns The number of matching hosts. 712 """ 713 if exclude_only_if_needed_labels: 714 raise error.RPCException('exclude_only_if_needed_labels is deprecated') 715 716 hosts = rpc_utils.get_host_query(multiple_labels, 717 exclude_only_if_needed_labels, 718 valid_only, filter_data) 719 return len(hosts) 720 721 722 # tests 723 724 def get_tests(**filter_data): 725 return rpc_utils.prepare_for_serialization( 726 models.Test.list_objects(filter_data)) 727 728 729 def get_tests_status_counts_by_job_name_label(job_name_prefix, label_name): 730 """Gets the counts of all passed and failed tests from the matching jobs. 731 732 @param job_name_prefix: Name prefix of the jobs to get the summary 733 from, e.g., 'butterfly-release/r40-6457.21.0/bvt-cq/'. Prefix 734 matching is case insensitive. 735 @param label_name: Label that must be set in the jobs, e.g., 736 'cros-version:butterfly-release/R40-6457.21.0'. 737 738 @returns A summary of the counts of all the passed and failed tests. 
739 """ 740 job_ids = list(models.Job.objects.filter( 741 name__istartswith=job_name_prefix, 742 dependency_labels__name=label_name).values_list( 743 'pk', flat=True)) 744 summary = {'passed': 0, 'failed': 0} 745 if not job_ids: 746 return summary 747 748 counts = (tko_models.TestView.objects.filter( 749 afe_job_id__in=job_ids).exclude( 750 test_name='SERVER_JOB').exclude( 751 test_name__startswith='CLIENT_JOB').values( 752 'status').annotate( 753 count=Count('status'))) 754 for status in counts: 755 if status['status'] == 'GOOD': 756 summary['passed'] += status['count'] 757 else: 758 summary['failed'] += status['count'] 759 return summary 760 761 762 # profilers 763 764 def add_profiler(name, description=None): 765 return models.Profiler.add_object(name=name, description=description).id 766 767 768 def modify_profiler(id, **data): 769 models.Profiler.smart_get(id).update_object(data) 770 771 772 def delete_profiler(id): 773 models.Profiler.smart_get(id).delete() 774 775 776 def get_profilers(**filter_data): 777 return rpc_utils.prepare_for_serialization( 778 models.Profiler.list_objects(filter_data)) 779 780 781 # users 782 783 def get_users(**filter_data): 784 return rpc_utils.prepare_for_serialization( 785 models.User.list_objects(filter_data)) 786 787 788 # acl groups 789 790 def add_acl_group(name, description=None): 791 group = models.AclGroup.add_object(name=name, description=description) 792 group.users.add(models.User.current_user()) 793 return group.id 794 795 796 def modify_acl_group(id, **data): 797 group = models.AclGroup.smart_get(id) 798 group.check_for_acl_violation_acl_group() 799 group.update_object(data) 800 group.add_current_user_if_empty() 801 802 803 def acl_group_add_users(id, users): 804 group = models.AclGroup.smart_get(id) 805 group.check_for_acl_violation_acl_group() 806 users = models.User.smart_get_bulk(users) 807 group.users.add(*users) 808 809 810 def acl_group_remove_users(id, users): 811 group = models.AclGroup.smart_get(id) 812 
group.check_for_acl_violation_acl_group() 813 users = models.User.smart_get_bulk(users) 814 group.users.remove(*users) 815 group.add_current_user_if_empty() 816 817 818 def acl_group_add_hosts(id, hosts): 819 group = models.AclGroup.smart_get(id) 820 group.check_for_acl_violation_acl_group() 821 hosts = models.Host.smart_get_bulk(hosts) 822 group.hosts.add(*hosts) 823 group.on_host_membership_change() 824 825 826 def acl_group_remove_hosts(id, hosts): 827 group = models.AclGroup.smart_get(id) 828 group.check_for_acl_violation_acl_group() 829 hosts = models.Host.smart_get_bulk(hosts) 830 group.hosts.remove(*hosts) 831 group.on_host_membership_change() 832 833 834 def delete_acl_group(id): 835 models.AclGroup.smart_get(id).delete() 836 837 838 def get_acl_groups(**filter_data): 839 acl_groups = models.AclGroup.list_objects(filter_data) 840 for acl_group in acl_groups: 841 acl_group_obj = models.AclGroup.objects.get(id=acl_group['id']) 842 acl_group['users'] = [user.login 843 for user in acl_group_obj.users.all()] 844 acl_group['hosts'] = [host.hostname 845 for host in acl_group_obj.hosts.all()] 846 return rpc_utils.prepare_for_serialization(acl_groups) 847 848 849 # jobs 850 851 def generate_control_file(tests=(), profilers=(), 852 client_control_file='', use_container=False, 853 profile_only=None, db_tests=True, 854 test_source_build=None): 855 """ 856 Generates a client-side control file to run tests. 857 858 @param tests List of tests to run. See db_tests for more information. 859 @param profilers List of profilers to activate during the job. 860 @param client_control_file The contents of a client-side control file to 861 run at the end of all tests. If this is supplied, all tests must be 862 client side. 863 TODO: in the future we should support server control files directly 864 to wrap with a kernel. That'll require changing the parameter 865 name and adding a boolean to indicate if it is a client or server 866 control file. 
def create_job_page_handler(name, priority, control_file, control_type,
                            image=None, hostless=False, firmware_rw_build=None,
                            firmware_ro_build=None, test_source_build=None,
                            is_cloning=False, cheets_build=None, **kwargs):
    """\
    Create and enqueue a job.

    @param name name of this job
    @param priority Integer priority of this job.  Higher is more important.
    @param control_file String contents of the control file.
    @param control_type Type of control file, Client or Server.
    @param image: ChromeOS build to be installed in the dut. Default to None.
    @param firmware_rw_build: Firmware build to update RW firmware. Default to
                              None, i.e., RW firmware will not be updated.
    @param firmware_ro_build: Firmware build to update RO firmware. Default to
                              None, i.e., RO firmware will not be updated.
    @param test_source_build: Build to be used to retrieve test code. Default
                              to None.
    @param is_cloning: True if creating a cloning job.
    @param cheets_build: ChromeOS Android build to be installed in the dut.
                         Default to None. Cheets build will not be updated.
    @param kwargs extra args that will be required by create_suite_job or
                  create_job.

    @returns The created Job id number.
    """
    test_args = {}
    if kwargs.get('args'):
        # args' format is: ['disable_sysinfo=False', 'fast=True', ...]
        args = kwargs.get('args')
        for arg in args:
            # Split on the first '=' only so that values that themselves
            # contain '=' (e.g. 'extra=a=b') are preserved intact.  The old
            # code split twice and silently truncated such values.
            k, v = arg.split('=', 1)
            test_args[k] = v

    if is_cloning:
        logging.info('Start to clone a new job')
        # When cloning a job, hosts and meta_hosts should not exist together,
        # which would cause host-scheduler to schedule two hqe jobs to one host
        # at the same time, and crash itself. Clear meta_hosts for this case.
        if kwargs.get('hosts') and kwargs.get('meta_hosts'):
            kwargs['meta_hosts'] = []
    else:
        logging.info('Start to create a new job')
        control_file = rpc_utils.encode_ascii(control_file)
        if not control_file:
            raise model_logic.ValidationError({
                    'control_file' : "Control file cannot be empty"})

    if image and hostless:
        # A hostless job with an image is a suite job; collect all requested
        # build versions and dispatch to create_suite_job instead.
        builds = {}
        builds[provision.CROS_VERSION_PREFIX] = image
        if cheets_build:
            builds[provision.CROS_ANDROID_VERSION_PREFIX] = cheets_build
        if firmware_rw_build:
            builds[provision.FW_RW_VERSION_PREFIX] = firmware_rw_build
        if firmware_ro_build:
            builds[provision.FW_RO_VERSION_PREFIX] = firmware_ro_build
        return create_suite_job(
                name=name, control_file=control_file, priority=priority,
                builds=builds, test_source_build=test_source_build,
                is_cloning=is_cloning, test_args=test_args, **kwargs)

    return create_job(name, priority, control_file, control_type, image=image,
                      hostless=hostless, test_args=test_args, **kwargs)
@rpc_utils.route_rpc_to_master
def create_job(
        name,
        priority,
        control_file,
        control_type,
        hosts=(),
        meta_hosts=(),
        one_time_hosts=(),
        synch_count=None,
        is_template=False,
        timeout=None,
        timeout_mins=None,
        max_runtime_mins=None,
        run_verify=False,
        email_list='',
        dependencies=(),
        reboot_before=None,
        reboot_after=None,
        parse_failed_repair=None,
        hostless=False,
        keyvals=None,
        drone_set=None,
        image=None,
        parent_job_id=None,
        test_retry=0,
        run_reset=True,
        require_ssp=None,
        test_args=None,
        **kwargs):
    """\
    Create and enqueue a job.

    @param name name of this job
    @param priority Integer priority of this job.  Higher is more important.
    @param control_file String contents of the control file.
    @param control_type Type of control file, Client or Server.
    @param synch_count How many machines the job uses per autoserv execution.
        synch_count == 1 means the job is asynchronous.  If an atomic group is
        given this value is treated as a minimum.
    @param is_template If true then create a template job.
    @param timeout Hours after this call returns until the job times out.
    @param timeout_mins Minutes after this call returns until the job times
                        out.
    @param max_runtime_mins Minutes from job starting time until job times out
    @param run_verify Should the host be verified before running the test?
    @param email_list String containing emails to mail when the job is done
    @param dependencies List of label names on which this job depends
    @param reboot_before Never, If dirty, or Always
    @param reboot_after Never, If all tests passed, or Always
    @param parse_failed_repair if true, results of failed repairs launched by
                               this job will be parsed as part of the job.
    @param hostless if true, create a hostless job
    @param keyvals dict of keyvals to associate with the job
    @param hosts List of hosts to run job on.
    @param meta_hosts List where each entry is a label name, and for each entry
                      one host will be chosen from that label to run the job
                      on.
    @param one_time_hosts List of hosts not in the database to run the job on.
    @param drone_set The name of the drone set to run this test on.
    @param image OS image to install before running job.
    @param parent_job_id id of a job considered to be parent of created job.
    @param test_retry Number of times to retry test if the test did not
                      complete successfully. (optional, default: 0)
    @param run_reset Should the host be reset before running the test?
    @param require_ssp Set to True to require server-side packaging to run the
                       test. If it's set to None, drone will still try to run
                       the server side with server-side packaging. If the
                       autotest-server package doesn't exist for the build or
                       image is not set, drone will run the test without
                       server-side packaging. Default is None.
    @param test_args A dict of args passed to be injected into control file.
    @param kwargs extra keyword args. NOT USED.

    @returns The created Job id number.
    """
    # Inject test_args into the control file so the test receives them as
    # control-file variables.
    if test_args:
        control_file = tools.inject_vars(test_args, control_file)
    # Provisioning to a specific image is expressed as an extra label
    # dependency on the version label.
    if image:
        dependencies += (provision.image_version_to_label(image),)
    return rpc_utils.create_job_common(
            name=name,
            priority=priority,
            control_type=control_type,
            control_file=control_file,
            hosts=hosts,
            meta_hosts=meta_hosts,
            one_time_hosts=one_time_hosts,
            synch_count=synch_count,
            is_template=is_template,
            timeout=timeout,
            timeout_mins=timeout_mins,
            max_runtime_mins=max_runtime_mins,
            run_verify=run_verify,
            email_list=email_list,
            dependencies=dependencies,
            reboot_before=reboot_before,
            reboot_after=reboot_after,
            parse_failed_repair=parse_failed_repair,
            hostless=hostless,
            keyvals=keyvals,
            drone_set=drone_set,
            parent_job_id=parent_job_id,
            test_retry=test_retry,
            run_reset=run_reset,
            require_ssp=require_ssp)


def abort_host_queue_entries(**filter_data):
    """\
    Abort a set of host queue entries.

    @return: A list of dictionaries, each contains information
             about an aborted HQE.
    """
    query = models.HostQueueEntry.query_objects(filter_data)

    # Dont allow aborts on:
    #   1. Jobs that have already completed (whether or not they were aborted)
    #   2. Jobs that have already been aborted (but may not have completed)
    query = query.filter(complete=False).filter(aborted=False)
    # Permission check must happen before any state is changed.
    models.AclGroup.check_abort_permissions(query)
    host_queue_entries = list(query.select_related())
    rpc_utils.check_abort_synchronous_jobs(host_queue_entries)

    models.HostQueueEntry.abort_host_queue_entries(host_queue_entries)
    hqe_info = [{'HostQueueEntry': hqe.id, 'Job': hqe.job_id,
                 'Job name': hqe.job.name} for hqe in host_queue_entries]
    return hqe_info
def abort_special_tasks(**filter_data):
    """\
    Abort the special task, or tasks, specified in the filter.
    """
    query = models.SpecialTask.query_objects(filter_data)
    # Only active tasks can meaningfully be aborted.
    special_tasks = query.filter(is_active=True)
    for task in special_tasks:
        task.abort()


def _call_special_tasks_on_hosts(task, hosts):
    """\
    Schedules a set of hosts for a special task.

    @returns A list of hostnames that a special task was created for.
    """
    models.AclGroup.check_for_acl_violation_hosts(hosts)
    shard_host_map = rpc_utils.bucket_hosts_by_shard(hosts)
    # Sharded hosts must be scheduled on their own shard, not on the master.
    if shard_host_map and not utils.is_shard():
        raise ValueError('The following hosts are on shards, please '
                         'follow the link to the shards and create jobs '
                         'there instead. %s.' % shard_host_map)
    for host in hosts:
        models.SpecialTask.schedule_special_task(host, task)
    return list(sorted(host.hostname for host in hosts))


def _forward_special_tasks_on_hosts(task, rpc, **filter_data):
    """Forward special tasks to corresponding shards.

    For master, when special tasks are fired on hosts that are sharded,
    forward the RPC to corresponding shards.

    For shard, create special task records in local DB.

    @param task: Enum value of frontend.afe.models.SpecialTask.Task
    @param rpc: RPC name to forward.
    @param filter_data: Filter keywords to be used for DB query.

    @return: A list of hostnames that a special task was created for.
    """
    hosts = models.Host.query_objects(filter_data)
    shard_host_map = rpc_utils.bucket_hosts_by_shard(hosts)

    # Filter out hosts on a shard from those on the master, forward
    # rpcs to the shard with an additional hostname__in filter, and
    # create a local SpecialTask for each remaining host.
    if shard_host_map and not utils.is_shard():
        hosts = [h for h in hosts if h.shard is None]
        for shard, hostnames in shard_host_map.iteritems():

            # The main client of this module is the frontend website, and
            # it invokes it with an 'id' or an 'id__in' filter. Regardless,
            # the 'hostname' filter should narrow down the list of hosts on
            # each shard even though we supply all the ids in filter_data.
            # This method uses hostname instead of id because it fits better
            # with the overall architecture of redirection functions in
            # rpc_utils.
            shard_filter = filter_data.copy()
            shard_filter['hostname__in'] = hostnames
            rpc_utils.run_rpc_on_multiple_hostnames(
                    rpc, [shard], **shard_filter)

    # There is a race condition here if someone assigns a shard to one of these
    # hosts before we create the task. The host will stay on the master if:
    # 1. The host is not Ready
    # 2. The host is Ready but has a task
    # But if the host is Ready and doesn't have a task yet, it will get sent
    # to the shard as we're creating a task here.

    # Given that we only rarely verify Ready hosts it isn't worth putting this
    # entire method in a transaction. The worst case scenario is that we have
    # a verify running on a Ready host while the shard is using it, if the
    # verify fails no subsequent tasks will be created against the host on the
    # master, and verifies are safe enough that this is OK.
    return _call_special_tasks_on_hosts(task, hosts)
def reverify_hosts(**filter_data):
    """\
    Schedules a set of hosts for verify.

    @returns A list of hostnames that a verify task was created for.
    """
    return _forward_special_tasks_on_hosts(
            models.SpecialTask.Task.VERIFY, 'reverify_hosts', **filter_data)


def repair_hosts(**filter_data):
    """\
    Schedules a set of hosts for repair.

    @returns A list of hostnames that a repair task was created for.
    """
    return _forward_special_tasks_on_hosts(
            models.SpecialTask.Task.REPAIR, 'repair_hosts', **filter_data)


def get_jobs(not_yet_run=False, running=False, finished=False,
             suite=False, sub=False, standalone=False, **filter_data):
    """\
    Extra status filter args for get_jobs:
    -not_yet_run: Include only jobs that have not yet started running.
    -running: Include only jobs that have start running but for which not
    all hosts have completed.
    -finished: Include only jobs for which all hosts have completed (or
    aborted).

    Extra type filter args for get_jobs:
    -suite: Include only jobs with child jobs.
    -sub: Include only jobs with a parent job.
    -standalone: Include only jobs with no child or parent jobs.
    At most one of these three fields should be specified.
    """
    extra_args = rpc_utils.extra_job_status_filters(not_yet_run,
                                                    running,
                                                    finished)
    filter_data['extra_args'] = rpc_utils.extra_job_type_filters(extra_args,
                                                                 suite,
                                                                 sub,
                                                                 standalone)
    job_dicts = []
    jobs = list(models.Job.query_objects(filter_data))
    # Batch-fetch related labels and keyvals to avoid a query per job.
    models.Job.objects.populate_relationships(jobs, models.Label,
                                              'dependencies')
    models.Job.objects.populate_relationships(jobs, models.JobKeyval, 'keyvals')
    for job in jobs:
        job_dict = job.get_object_dict()
        job_dict['dependencies'] = ','.join(label.name
                                            for label in job.dependencies)
        job_dict['keyvals'] = dict((keyval.key, keyval.value)
                                   for keyval in job.keyvals)
        job_dicts.append(job_dict)
    return rpc_utils.prepare_for_serialization(job_dicts)


def get_num_jobs(not_yet_run=False, running=False, finished=False,
                 suite=False, sub=False, standalone=False,
                 **filter_data):
    """\
    See get_jobs() for documentation of extra filter parameters.
    """
    extra_args = rpc_utils.extra_job_status_filters(not_yet_run,
                                                    running,
                                                    finished)
    filter_data['extra_args'] = rpc_utils.extra_job_type_filters(extra_args,
                                                                 suite,
                                                                 sub,
                                                                 standalone)
    return models.Job.query_count(filter_data)


def get_jobs_summary(**filter_data):
    """\
    Like get_jobs(), but adds 'status_counts' and 'result_counts' field.

    'status_counts' field is a dictionary mapping status strings to the number
    of hosts currently with that status, i.e. {'Queued' : 4, 'Running' : 2}.

    'result_counts' field is piped to tko's rpc_interface and has the return
    format specified under get_group_counts.
    """
    jobs = get_jobs(**filter_data)
    ids = [job['id'] for job in jobs]
    all_status_counts = models.Job.objects.get_status_counts(ids)
    for job in jobs:
        job['status_counts'] = all_status_counts[job['id']]
        job['result_counts'] = tko_rpc_interface.get_status_counts(
                ['afe_job_id', 'afe_job_id'],
                header_groups=[['afe_job_id'], ['afe_job_id']],
                **{'afe_job_id': job['id']})
    return rpc_utils.prepare_for_serialization(jobs)
def get_info_for_clone(id, preserve_metahosts, queue_entry_filter_data=None):
    """\
    Retrieves all the information needed to clone a job.

    @param id: Database ID of the job to clone.
    @param preserve_metahosts: Passed through to rpc_utils.get_job_info to
            control how metahost entries are represented.
    @param queue_entry_filter_data: Optional filter keywords applied to the
            job's queue entries.

    @returns A serialized dict with the job, its hosts, metahost counts and
            (when determinable) the image it ran against.
    """
    job = models.Job.objects.get(id=id)
    job_info = rpc_utils.get_job_info(job,
                                      preserve_metahosts,
                                      queue_entry_filter_data)

    host_dicts = []
    for host in job_info['hosts']:
        host_dict = get_hosts(id=host.id)[0]
        other_labels = host_dict['labels']
        # The platform label is reported separately; drop it from the
        # generic label list.
        if host_dict['platform']:
            other_labels.remove(host_dict['platform'])
        host_dict['other_labels'] = ', '.join(other_labels)
        host_dicts.append(host_dict)

    for host in job_info['one_time_hosts']:
        host_dict = dict(hostname=host.hostname,
                         id=host.id,
                         platform='(one-time host)',
                         locked_text='')
        host_dicts.append(host_dict)

    # convert keys from Label objects to strings (names of labels)
    meta_host_counts = dict((meta_host.name, count) for meta_host, count
                            in job_info['meta_host_counts'].iteritems())

    info = dict(job=job.get_object_dict(),
                meta_host_counts=meta_host_counts,
                hosts=host_dicts)
    info['job']['dependencies'] = job_info['dependencies']
    info['hostless'] = job_info['hostless']
    info['drone_set'] = job.drone_set and job.drone_set.name

    image = _get_image_for_job(job, job_info['hostless'])
    if image:
        info['job']['image'] = image

    return rpc_utils.prepare_for_serialization(info)


def _get_image_for_job(job, hostless):
    """Gets the image used for a job.

    Gets the image used for an AFE job from the job's keyvals 'build' or
    'builds'. If that fails, and the job is a hostless job, tries to
    get the image from its control file attributes 'build' or 'builds'.

    TODO(ntang): Needs to handle FAFT with two builds for ro/rw.

    @param job An AFE job object.
    @param hostless Boolean indicating whether the job is hostless.

    @returns The image build used for the job.
    """
    keyvals = job.keyval_dict()
    image = keyvals.get('build')
    if not image:
        value = keyvals.get('builds')
        builds = None
        if isinstance(value, dict):
            builds = value
        elif isinstance(value, basestring):
            # 'builds' may be stored as a stringified dict; parse it safely.
            builds = ast.literal_eval(value)
        if builds:
            image = builds.get('cros-version')
    if not image and hostless and job.control_file:
        try:
            control_obj = control_data.parse_control_string(
                    job.control_file)
            if hasattr(control_obj, 'build'):
                image = getattr(control_obj, 'build')
            if not image and hasattr(control_obj, 'builds'):
                builds = getattr(control_obj, 'builds')
                image = builds.get('cros-version')
        # Parsing is best-effort, but a bare `except:` would also swallow
        # SystemExit/KeyboardInterrupt; catch Exception only.
        except Exception:
            logging.warning('Failed to parse control file for job: %s',
                            job.name)
    return image


def get_host_queue_entries_by_insert_time(
        insert_time_after=None, insert_time_before=None, **filter_data):
    """Like get_host_queue_entries, but using the insert index table.

    @param insert_time_after: A lower bound on insert_time
    @param insert_time_before: An upper bound on insert_time
    @returns A sequence of nested dictionaries of host and job information.
    """
    assert insert_time_after is not None or insert_time_before is not None, \
        ('Caller to get_host_queue_entries_by_insert_time must provide either'
         ' insert_time_after or insert_time_before.')
    # Get insert bounds on the index of the host queue entries.
    if insert_time_after:
        query = models.HostQueueEntryStartTimes.objects.filter(
                # Note: '-insert_time' means descending. We want the largest
                # insert time smaller than the insert time.
                insert_time__lte=insert_time_after).order_by('-insert_time')
        try:
            constraint = query[0].highest_hqe_id
            if 'id__gte' in filter_data:
                constraint = max(constraint, filter_data['id__gte'])
            filter_data['id__gte'] = constraint
        except IndexError:
            pass

    # Get end bounds.
    if insert_time_before:
        query = models.HostQueueEntryStartTimes.objects.filter(
                insert_time__gte=insert_time_before).order_by('insert_time')
        try:
            constraint = query[0].highest_hqe_id
            if 'id__lte' in filter_data:
                constraint = min(constraint, filter_data['id__lte'])
            filter_data['id__lte'] = constraint
        except IndexError:
            pass

    return rpc_utils.prepare_rows_as_nested_dicts(
            models.HostQueueEntry.query_objects(filter_data),
            ('host', 'job'))
def get_host_queue_entries(start_time=None, end_time=None, **filter_data):
    """\
    @returns A sequence of nested dictionaries of host and job information.
    """
    filter_data = rpc_utils.inject_times_to_filter('started_on__gte',
                                                   'started_on__lte',
                                                   start_time,
                                                   end_time,
                                                   **filter_data)
    return rpc_utils.prepare_rows_as_nested_dicts(
            models.HostQueueEntry.query_objects(filter_data),
            ('host', 'job'))


def get_num_host_queue_entries(start_time=None, end_time=None, **filter_data):
    """\
    Get the number of host queue entries associated with this job.
    """
    filter_data = rpc_utils.inject_times_to_filter('started_on__gte',
                                                   'started_on__lte',
                                                   start_time,
                                                   end_time,
                                                   **filter_data)
    return models.HostQueueEntry.query_count(filter_data)


def get_hqe_percentage_complete(**filter_data):
    """
    Computes the fraction of host queue entries matching the given filter data
    that are complete.

    @returns A float in [0.0, 1.0]; 1.0 when no entries match.
    """
    query = models.HostQueueEntry.query_objects(filter_data)
    complete_count = query.filter(complete=True).count()
    total_count = query.count()
    if total_count == 0:
        # Return a float here too so callers always see a consistent type
        # (the division below always yields a float).
        return 1.0
    return float(complete_count) / total_count


# special tasks

def get_special_tasks(**filter_data):
    """Get special task entries from the local database.

    Query the special tasks table for tasks matching the given
    `filter_data`, and return a list of the results.  No attempt is
    made to forward the call to shards; the buck will stop here.
    The caller is expected to know the target shard for such reasons
    as:
      * The caller is a service (such as gs_offloader) configured
        to operate on behalf of one specific shard, and no other.
      * The caller has a host as a parameter, and knows that this is
        the shard assigned to that host.

    @param filter_data  Filter keywords to pass to the underlying
                        database query.

    """
    return rpc_utils.prepare_rows_as_nested_dicts(
            models.SpecialTask.query_objects(filter_data),
            ('host', 'queue_entry'))


def get_host_special_tasks(host_id, **filter_data):
    """Get special task entries for a given host.

    Query the special tasks table for tasks that ran on the host
    given by `host_id` and matching the given `filter_data`.
    Return a list of the results.  If the host is assigned to a
    shard, forward this call to that shard.

    @param host_id      Id in the database of the target host.
    @param filter_data  Filter keywords to pass to the underlying
                        database query.

    """
    # Retrieve host data even if the host is in an invalid state.
    host = models.Host.smart_get(host_id, False)
    if not host.shard:
        return get_special_tasks(host_id=host_id, **filter_data)
    else:
        # The return values from AFE methods are post-processed
        # objects that aren't JSON-serializable.  So, we have to
        # call AFE.run() to get the raw, serializable output from
        # the shard.
        shard_afe = frontend.AFE(server=host.shard.hostname)
        return shard_afe.run('get_special_tasks',
                             host_id=host_id, **filter_data)
def get_num_special_tasks(**kwargs):
    """Get the number of special task entries from the local database.

    Query the special tasks table for tasks matching the given 'kwargs',
    and return the number of the results. No attempt is made to forward
    the call to shards; the buck will stop here.

    @param kwargs    Filter keywords to pass to the underlying database query.

    """
    return models.SpecialTask.query_count(kwargs)


def get_host_num_special_tasks(host, **kwargs):
    """Get special task entries for a given host.

    Query the special tasks table for tasks that ran on the host
    given by 'host' and matching the given 'kwargs'.
    Return a list of the results. If the host is assigned to a
    shard, forward this call to that shard.

    @param host      id or name of a host. More often a hostname.
    @param kwargs    Filter keywords to pass to the underlying database query.

    """
    # Retrieve host data even if the host is in an invalid state.
    host_model = models.Host.smart_get(host, False)
    if not host_model.shard:
        return get_num_special_tasks(host=host, **kwargs)
    else:
        # Forward to the owning shard; see get_host_special_tasks for why
        # AFE.run() is used.
        shard_afe = frontend.AFE(server=host_model.shard.hostname)
        return shard_afe.run('get_num_special_tasks', host=host, **kwargs)


def get_status_task(host_id, end_time):
    """Get the "status task" for a host from the local shard.

    Returns a single special task representing the given host's
    "status task".  The status task is a completed special task that
    identifies whether the corresponding host was working or broken
    when it completed.  A successful task indicates a working host;
    a failed task indicates broken.

    This call will not be forward to a shard; the receiving server
    must be the shard that owns the host.

    @param host_id      Id in the database of the target host.
    @param end_time     Time reference for the host's status.

    @return A single task; its status (successful or not)
            corresponds to the status of the host (working or
            broken) at the given time.  If no task is found, return
            `None`.

    """
    tasklist = rpc_utils.prepare_rows_as_nested_dicts(
            status_history.get_status_task(host_id, end_time),
            ('host', 'queue_entry'))
    return tasklist[0] if tasklist else None


def get_host_status_task(host_id, end_time):
    """Get the "status task" for a host from its owning shard.

    Finds the given host's owning shard, and forwards to it a call
    to `get_status_task()` (see above).

    @param host_id      Id in the database of the target host.
    @param end_time     Time reference for the host's status.

    @return A single task; its status (successful or not)
            corresponds to the status of the host (working or
            broken) at the given time.  If no task is found, return
            `None`.

    """
    host = models.Host.smart_get(host_id)
    if not host.shard:
        return get_status_task(host_id, end_time)
    else:
        # The return values from AFE methods are post-processed
        # objects that aren't JSON-serializable.  So, we have to
        # call AFE.run() to get the raw, serializable output from
        # the shard.
        shard_afe = frontend.AFE(server=host.shard.hostname)
        return shard_afe.run('get_status_task',
                             host_id=host_id, end_time=end_time)
def get_host_diagnosis_interval(host_id, end_time, success):
    """Find a "diagnosis interval" for a given host.

    A "diagnosis interval" identifies a start and end time where
    the host went from "working" to "broken", or vice versa.  The
    interval's starting time is the starting time of the last status
    task with the old status; the end time is the finish time of the
    first status task with the new status.

    This routine finds the most recent diagnosis interval for the
    given host prior to `end_time`, with a starting status matching
    `success`.  If `success` is true, the interval will start with a
    successful status task; if false the interval will start with a
    failed status task.

    @param host_id      Id in the database of the target host.
    @param end_time     Time reference for the diagnosis interval.
    @param success      Whether the diagnosis interval should start
                        with a successful or failed status task.

    @return A list of two strings.  The first is the timestamp for
            the beginning of the interval; the second is the
            timestamp for the end.  If the host has never changed
            state, the list is empty.

    """
    host = models.Host.smart_get(host_id)
    if not host.shard or utils.is_shard():
        return status_history.get_diagnosis_interval(
                host_id, end_time, success)
    else:
        shard_afe = frontend.AFE(server=host.shard.hostname)
        return shard_afe.get_host_diagnosis_interval(
                host_id, end_time, success)


# support for host detail view

def get_host_queue_entries_and_special_tasks(host, query_start=None,
                                             query_limit=None, start_time=None,
                                             end_time=None):
    """
    @returns an interleaved list of HostQueueEntries and SpecialTasks,
            in approximate run order.  each dict contains keys for type, host,
            job, status, started_on, execution_path, and ID.
    """
    total_limit = None
    if query_limit is not None:
        # query_start may legitimately be omitted; treat it as offset 0 so
        # we don't raise TypeError on `None + int`.
        total_limit = (query_start or 0) + query_limit
    filter_data_common = {'host': host,
                          'query_limit': total_limit,
                          'sort_by': ['-id']}

    filter_data_special_tasks = rpc_utils.inject_times_to_filter(
            'time_started__gte', 'time_started__lte', start_time, end_time,
            **filter_data_common)

    queue_entries = get_host_queue_entries(
            start_time, end_time, **filter_data_common)
    special_tasks = get_host_special_tasks(host, **filter_data_special_tasks)

    interleaved_entries = rpc_utils.interleave_entries(queue_entries,
                                                       special_tasks)
    if query_start is not None:
        interleaved_entries = interleaved_entries[query_start:]
    if query_limit is not None:
        interleaved_entries = interleaved_entries[:query_limit]
    return rpc_utils.prepare_host_queue_entries_and_special_tasks(
            interleaved_entries, queue_entries)


def get_num_host_queue_entries_and_special_tasks(host, start_time=None,
                                                 end_time=None):
    """Count the HQEs plus special tasks for a host in a time window."""
    filter_data_common = {'host': host}

    filter_data_queue_entries, filter_data_special_tasks = (
            rpc_utils.inject_times_to_hqe_special_tasks_filters(
                    filter_data_common, start_time, end_time))

    return (models.HostQueueEntry.query_count(filter_data_queue_entries)
            + get_host_num_special_tasks(**filter_data_special_tasks))
# other

def echo(data=""):
    """\
    Returns a passed in string. For doing a basic test to see if RPC calls
    can successfully be made.
    """
    return data


def get_motd():
    """\
    Returns the message of the day as a string.
    """
    return rpc_utils.get_motd()


def get_static_data():
    """\
    Returns a dictionary containing a bunch of data that shouldn't change
    often and is otherwise inaccessible.  This includes:

    priorities: List of job priority choices.
    default_priority: Default priority value for new jobs.
    users: Sorted list of all users.
    labels: Sorted list of labels not start with 'cros-version' and
            'fw-version'.
    tests: Sorted list of all tests.
    profilers: Sorted list of all profilers.
    current_user: Logged-in username.
    host_statuses: Sorted list of possible Host statuses.
    job_statuses: Sorted list of possible HostQueueEntry statuses.
    job_timeout_default: The default job timeout length in minutes.
    parse_failed_repair_default: Default value for the parse_failed_repair job
                                 option.
    reboot_before_options: A list of valid RebootBefore string enums.
    reboot_after_options: A list of valid RebootAfter string enums.
    motd: Server's message of the day.
    status_dictionary: A mapping from one word job status names to a more
                       informative description.
    """
    default_drone_set_name = models.DroneSet.default_drone_set_name()
    non_default_sets = models.DroneSet.objects.exclude(
            name=default_drone_set_name)
    drone_sets = ([default_drone_set_name] +
                  sorted(drone_set.name for drone_set in non_default_sets))

    # Version-style labels are numerous and uninteresting to the frontend;
    # exclude every known version-label prefix.
    version_prefixes = ('cros-version', 'fw-version', 'fwrw-version',
                        'fwro-version', 'ab-version', 'testbed-version')

    data = {}
    data['priorities'] = priorities.Priority.choices()
    data['default_priority'] = 'Default'
    data['max_schedulable_priority'] = priorities.Priority.DEFAULT
    data['users'] = get_users(sort_by=['login'])
    data['labels'] = get_labels(
            [{'name__startswith': prefix} for prefix in version_prefixes],
            sort_by=['-platform', 'name'])
    data['tests'] = get_tests(sort_by=['name'])
    data['profilers'] = get_profilers(sort_by=['name'])
    data['current_user'] = rpc_utils.prepare_for_serialization(
            models.User.current_user().get_object_dict())
    data['host_statuses'] = sorted(models.Host.Status.names)
    data['job_statuses'] = sorted(models.HostQueueEntry.Status.names)
    data['job_timeout_mins_default'] = models.Job.DEFAULT_TIMEOUT_MINS
    data['job_max_runtime_mins_default'] = (
            models.Job.DEFAULT_MAX_RUNTIME_MINS)
    data['parse_failed_repair_default'] = bool(
            models.Job.DEFAULT_PARSE_FAILED_REPAIR)
    data['reboot_before_options'] = model_attributes.RebootBefore.names
    data['reboot_after_options'] = model_attributes.RebootAfter.names
    data['motd'] = rpc_utils.get_motd()
    data['drone_sets_enabled'] = models.DroneSet.drone_sets_enabled()
    data['drone_sets'] = drone_sets

    data['status_dictionary'] = dict(
            Aborted="Aborted",
            Verifying="Verifying Host",
            Provisioning="Provisioning Host",
            Pending="Waiting on other hosts",
            Running="Running autoserv",
            Completed="Autoserv completed",
            Failed="Failed to complete",
            Queued="Queued",
            Starting="Next in host's queue",
            Stopped="Other host(s) failed verify",
            Parsing="Awaiting parse of final results",
            Gathering="Gathering log files",
            Waiting="Waiting for scheduler action",
            Archiving="Archiving results",
            Resetting="Resetting hosts")

    data['wmatrix_url'] = rpc_utils.get_wmatrix_url()
    data['stainless_url'] = rpc_utils.get_stainless_url()
    data['is_moblab'] = bool(utils.is_moblab())

    return data
1744 "Running": "Running autoserv", 1745 "Completed": "Autoserv completed", 1746 "Failed": "Failed to complete", 1747 "Queued": "Queued", 1748 "Starting": "Next in host's queue", 1749 "Stopped": "Other host(s) failed verify", 1750 "Parsing": "Awaiting parse of final results", 1751 "Gathering": "Gathering log files", 1752 "Waiting": "Waiting for scheduler action", 1753 "Archiving": "Archiving results", 1754 "Resetting": "Resetting hosts"} 1755 1756 result['wmatrix_url'] = rpc_utils.get_wmatrix_url() 1757 result['stainless_url'] = rpc_utils.get_stainless_url() 1758 result['is_moblab'] = bool(utils.is_moblab()) 1759 1760 return result 1761 1762 1763 def get_server_time(): 1764 return datetime.datetime.now().strftime("%Y-%m-%d %H:%M") 1765 1766 1767 def ping_db(): 1768 """Simple connection test to db""" 1769 try: 1770 db_connection.cursor() 1771 except DatabaseError: 1772 return [False] 1773 return [True] 1774 1775 1776 def get_hosts_by_attribute(attribute, value): 1777 """ 1778 Get the list of valid hosts that share the same host attribute value. 1779 1780 @param attribute: String of the host attribute to check. 1781 @param value: String of the value that is shared between hosts. 1782 1783 @returns List of hostnames that all have the same host attribute and 1784 value. 1785 """ 1786 rows = models.HostAttribute.query_objects({'attribute': attribute, 1787 'value': value}) 1788 if RESPECT_STATIC_ATTRIBUTES: 1789 returned_hosts = set() 1790 # Add hosts: 1791 # * Non-valid 1792 # * Exist in afe_host_attribute with given attribute. 1793 # * Don't exist in afe_static_host_attribute OR exist in 1794 # afe_static_host_attribute with the same given value. 
1795 for row in rows: 1796 if row.host.invalid != 0: 1797 continue 1798 1799 static_hosts = models.StaticHostAttribute.query_objects( 1800 {'host_id': row.host.id, 'attribute': attribute}) 1801 values = [static_host.value for static_host in static_hosts] 1802 if len(values) == 0 or values[0] == value: 1803 returned_hosts.add(row.host.hostname) 1804 1805 # Add hosts: 1806 # * Non-valid 1807 # * Exist in afe_static_host_attribute with given attribute 1808 # and value 1809 # * No need to check whether each static attribute has its 1810 # corresponding entry in afe_host_attribute since it is ensured 1811 # in inventory sync. 1812 static_rows = models.StaticHostAttribute.query_objects( 1813 {'attribute': attribute, 'value': value}) 1814 for row in static_rows: 1815 if row.host.invalid != 0: 1816 continue 1817 1818 returned_hosts.add(row.host.hostname) 1819 1820 return list(returned_hosts) 1821 else: 1822 return [row.host.hostname for row in rows if row.host.invalid == 0] 1823 1824 1825 def canonicalize_suite_name(suite_name): 1826 """Canonicalize the suite's name. 1827 1828 @param suite_name: the name of the suite. 1829 """ 1830 # Do not change this naming convention without updating 1831 # site_utils.parse_job_name. 1832 return 'test_suites/control.%s' % suite_name 1833 1834 1835 def formatted_now(): 1836 """Format the current datetime.""" 1837 return datetime.datetime.now().strftime(time_utils.TIME_FMT) 1838 1839 1840 def _get_control_file_by_build(build, ds, suite_name): 1841 """Return control file contents for |suite_name|. 1842 1843 Query the dev server at |ds| for the control file |suite_name|, included 1844 in |build| for |board|. 1845 1846 @param build: unique name by which to refer to the image from now on. 1847 @param ds: a dev_server.DevServer instance to fetch control file with. 1848 @param suite_name: canonicalized suite name, e.g. test_suites/control.bvt. 1849 @raises ControlFileNotFound if a unique suite control file doesn't exist. 
1850 @raises NoControlFileList if we can't list the control files at all. 1851 @raises ControlFileEmpty if the control file exists on the server, but 1852 can't be read. 1853 1854 @return the contents of the desired control file. 1855 """ 1856 getter = control_file_getter.DevServerGetter.create(build, ds) 1857 devserver_name = ds.hostname 1858 # Get the control file for the suite. 1859 try: 1860 control_file_in = getter.get_control_file_contents_by_name(suite_name) 1861 except error.CrosDynamicSuiteException as e: 1862 raise type(e)('Failed to get control file for %s ' 1863 '(devserver: %s) (error: %s)' % 1864 (build, devserver_name, e)) 1865 if not control_file_in: 1866 raise error.ControlFileEmpty( 1867 "Fetching %s returned no data. (devserver: %s)" % 1868 (suite_name, devserver_name)) 1869 # Force control files to only contain ascii characters. 1870 try: 1871 control_file_in.encode('ascii') 1872 except UnicodeDecodeError as e: 1873 raise error.ControlFileMalformed(str(e)) 1874 1875 return control_file_in 1876 1877 1878 def _get_control_file_by_suite(suite_name): 1879 """Get control file contents by suite name. 1880 1881 @param suite_name: Suite name as string. 1882 @returns: Control file contents as string. 1883 """ 1884 getter = control_file_getter.FileSystemGetter( 1885 [_CONFIG.get_config_value('SCHEDULER', 1886 'drone_installation_directory')]) 1887 return getter.get_control_file_contents_by_name(suite_name) 1888 1889 1890 def _stage_build_artifacts(build, hostname=None): 1891 """ 1892 Ensure components of |build| necessary for installing images are staged. 1893 1894 @param build image we want to stage. 1895 @param hostname hostname of a dut may run test on. This is to help to locate 1896 a devserver closer to duts if needed. Default is None. 1897 1898 @raises StageControlFileFailure: if the dev server throws 500 while staging 1899 suite control files. 1900 1901 @return: dev_server.ImageServer instance to use with this build. 
    @return: timings dictionary containing staging start/end times.
    """
    timings = {}
    # Ensure components of |build| necessary for installing images are staged
    # on the dev server. However set synchronous to False to allow other
    # components to be downloaded in the background.
    ds = dev_server.resolve(build, hostname=hostname)
    ds_name = ds.hostname
    timings[constants.DOWNLOAD_STARTED_TIME] = formatted_now()
    try:
        ds.stage_artifacts(image=build, artifacts=['test_suites'])
    except dev_server.DevServerException as e:
        raise error.StageControlFileFailure(
                "Failed to stage %s on %s: %s" % (build, ds_name, e))
    timings[constants.PAYLOAD_FINISHED_TIME] = formatted_now()
    return (ds, timings)


@rpc_utils.route_rpc_to_master
def create_suite_job(
        name='',
        board='',
        pool='',
        child_dependencies=(),
        control_file='',
        check_hosts=True,
        num=None,
        file_bugs=False,
        timeout=24,
        timeout_mins=None,
        priority=priorities.Priority.DEFAULT,
        suite_args=None,
        wait_for_results=True,
        job_retry=False,
        max_retries=None,
        max_runtime_mins=None,
        suite_min_duts=0,
        offload_failures_only=False,
        builds=None,
        test_source_build=None,
        run_prod_code=False,
        delay_minutes=0,
        is_cloning=False,
        job_keyvals=None,
        test_args=None,
        **kwargs):
    """
    Create a job to run a test suite on the given device with the given image.

    When the timeout specified in the control file is reached, the
    job is guaranteed to have completed and results will be available.

    @param name: The test name if control_file is supplied, otherwise the name
                 of the test suite to run, e.g. 'bvt'.
    @param board: the kind of device to run the tests on.
    @param builds: the builds to install e.g.
                   {'cros-version:': 'x86-alex-release/R18-1655.0.0',
                    'fwrw-version:': 'x86-alex-firmware/R36-5771.50.0',
                    'fwro-version:': 'x86-alex-firmware/R36-5771.49.0'}
                   If builds is given a value, it overrides argument build.
    @param test_source_build: Build that contains the server-side test code.
    @param pool: Specify the pool of machines to use for scheduling
                 purposes.
    @param child_dependencies: (optional) list of additional dependency labels
                               (strings) that will be added as dependency
                               labels to child jobs.
    @param control_file: the control file of the job.
    @param check_hosts: require appropriate live hosts to exist in the lab.
    @param num: Specify the number of machines to schedule across (integer).
                Leave unspecified or use None to use default sharding factor.
    @param file_bugs: File a bug on each test failure in this suite.
    @param timeout: The max lifetime of this suite, in hours.
    @param timeout_mins: The max lifetime of this suite, in minutes. Takes
                         priority over timeout.
    @param priority: Integer denoting priority. Higher is more important.
    @param suite_args: Optional arguments which will be parsed by the suite
                       control file. Used by control.test_that_wrapper to
                       determine which tests to run.
    @param wait_for_results: Set to False to run the suite job without waiting
                             for test jobs to finish. Default is True.
    @param job_retry: Set to True to enable job-level retry. Default is False.
    @param max_retries: Integer, maximum job retries allowed at suite level.
                        None for no max.
    @param max_runtime_mins: Maximum amount of time a job can be running in
                             minutes.
    @param suite_min_duts: Integer. Scheduler will prioritize getting the
                           minimum number of machines for the suite when it is
                           competing with another suite that has a higher
                           priority but already got minimum machines it needs.
    @param offload_failures_only: Only enable gs_offloading for failed jobs.
    @param run_prod_code: If True, the suite will run the test code that
                          lives in prod aka the test code currently on the
                          lab servers. If False, the control files and test
                          code for this suite run will be retrieved from the
                          build artifacts.
    @param delay_minutes: Delay the creation of test jobs for a given number of
                          minutes.
    @param is_cloning: True if creating a cloning job.
    @param job_keyvals: A dict of job keyvals to be inject to control file.
    @param test_args: A dict of args passed all the way to each individual test
                      that will be actually run.
    @param kwargs: extra keyword args. NOT USED.

    @raises ControlFileNotFound: if a unique suite control file doesn't exist.
    @raises NoControlFileList: if we can't list the control files at all.
    @raises StageControlFileFailure: If the dev server throws 500 while
                                     staging test_suites.
    @raises ControlFileEmpty: if the control file exists on the server, but
                              can't be read.

    @return: the job ID of the suite; -1 on error.
    """
    if num is not None:
        warnings.warn('num is deprecated for create_suite_job')
    del num

    if builds is None:
        builds = {}

    # Default test source build to CrOS build if it's not specified and
    # run_prod_code is set to False.
    if not run_prod_code:
        test_source_build = Suite.get_test_source_build(
            builds, test_source_build=test_source_build)

    sample_dut = rpc_utils.get_sample_dut(board, pool)

    suite_name = canonicalize_suite_name(name)
    if run_prod_code:
        # Prod code needs no artifact staging; only resolve a devserver.
        ds = dev_server.resolve(test_source_build, hostname=sample_dut)
        keyvals = {}
    else:
        (ds, keyvals) = _stage_build_artifacts(
            test_source_build, hostname=sample_dut)
    keyvals[constants.SUITE_MIN_DUTS_KEY] = suite_min_duts

    # Do not change this naming convention without updating
    # site_utils.parse_job_name.
    if run_prod_code:
        # If run_prod_code is True, test_source_build is not set, use the
        # first build in the builds list for the suite job name.
        # NOTE(review): relies on dict.values() being indexable (Python 2).
        name = '%s-%s' % (builds.values()[0], suite_name)
    else:
        name = '%s-%s' % (test_source_build, suite_name)

    timeout_mins = timeout_mins or timeout * 60
    max_runtime_mins = max_runtime_mins or timeout * 60

    if not board:
        board = utils.ParseBuildName(builds[provision.CROS_VERSION_PREFIX])[0]

    if run_prod_code:
        control_file = _get_control_file_by_suite(suite_name)

    if not control_file:
        # No control file was supplied so look it up from the build artifacts.
        control_file = _get_control_file_by_build(
            test_source_build, ds, suite_name)

    # Prepend builds and board to the control file.
    if is_cloning:
        control_file = tools.remove_injection(control_file)

    if suite_args is None:
        suite_args = dict()

    # TODO(crbug.com/758427): suite_args_raw is needed to run old tests.
    # Can be removed after R64.
    if 'tests' in suite_args:
        # TODO(crbug.com/758427): test_that used to have its own
        # snowflake implementation of parsing command line arguments in
        # the test
        suite_args_raw = ' '.join([':lab:'] + suite_args['tests'])
    # TODO(crbug.com/760675): Needed for CTS/GTS as above, but when
    # 'tests' is not passed. Can be removed after R64.
    elif name.rpartition('/')[-1] in {'control.cts_N',
                                      'control.cts_N_preconditions',
                                      'control.gts'}:
        suite_args_raw = ''
    else:
        # TODO(crbug.com/758427): This is for suite_attr_wrapper. Can
        # be removed after R64.
        suite_args_raw = repr(suite_args)

    inject_dict = {
        'board': board,
        # `build` is needed for suites like AU to stage image inside suite
        # control file.
        'build': test_source_build,
        'builds': builds,
        'check_hosts': check_hosts,
        'pool': pool,
        'child_dependencies': child_dependencies,
        'file_bugs': file_bugs,
        'timeout': timeout,
        'timeout_mins': timeout_mins,
        'devserver_url': ds.url(),
        'priority': priority,
        # TODO(crbug.com/758427): injecting suite_args is needed to run
        # old tests
        'suite_args' : suite_args_raw,
        'wait_for_results': wait_for_results,
        'job_retry': job_retry,
        'max_retries': max_retries,
        'max_runtime_mins': max_runtime_mins,
        'offload_failures_only': offload_failures_only,
        'test_source_build': test_source_build,
        'run_prod_code': run_prod_code,
        'delay_minutes': delay_minutes,
        'job_keyvals': job_keyvals,
        'test_args': test_args,
    }
    inject_dict.update(suite_args)
    control_file = tools.inject_vars(inject_dict, control_file)

    return rpc_utils.create_job_common(name,
                                       priority=priority,
                                       timeout_mins=timeout_mins,
                                       max_runtime_mins=max_runtime_mins,
                                       control_type='Server',
                                       control_file=control_file,
                                       hostless=True,
                                       keyvals=keyvals)


def get_job_history(**filter_data):
    """Get history of the job, including the special tasks executed for the job

    @param filter_data: filter for the call, should at least include
                        {'job_id': [job id]}
    @returns: JSON string of the job's history, including the information such
              as the hosts run the job and the special tasks executed before
              and after the job.
    """
    job_id = filter_data['job_id']
    job_info = job_history.get_job_info(job_id)
    return rpc_utils.prepare_for_serialization(job_info.get_history())


def get_host_history(start_time, end_time, hosts=None, board=None, pool=None):
    """Deprecated."""
    raise ValueError('get_host_history rpc is deprecated '
                     'and no longer implemented.')


def shard_heartbeat(shard_hostname, jobs=(), hqes=(), known_job_ids=(),
                    known_host_ids=(), known_host_statuses=()):
    """Receive updates for job statuses from shards and assign hosts and jobs.

    @param shard_hostname: Hostname of the calling shard
    @param jobs: Jobs in serialized form that should be updated with newer
                 status from a shard.
    @param hqes: Hostqueueentries in serialized form that should be updated
                 with newer status from a shard. Note that for every
                 hostqueueentry the corresponding job must be in jobs.
    @param known_job_ids: List of ids of jobs the shard already has.
    @param known_host_ids: List of ids of hosts the shard already has.
    @param known_host_statuses: List of statuses of hosts the shard already
                                has.

    @returns: Serialized representations of hosts, jobs, suite job keyvals
              and their dependencies to be inserted into a shard's database.
    """
    # The following alternatives to sending host and job ids in every
    # heartbeat have been considered:
    # 1. Sending the highest known job and host ids. This would work for jobs:
    #    Newer jobs always have larger ids. Also, if a job is not assigned to a
    #    particular shard during a heartbeat, it never will be assigned to this
    #    shard later.
    #    This is not true for hosts though: A host that is leased won't be sent
    #    to the shard now, but might be sent in a future heartbeat. This means
    #    sometimes hosts should be transferred that have a lower id than the
    #    maximum host id the shard knows.
    # 2. Send the number of jobs/hosts the shard knows to the master in each
    #    heartbeat. Compare these to the number of records that already have
    #    the shard_id set to this shard. In the normal case, they should match.
    #    In case they don't, resend all entities of that type.
    #    This would work well for hosts, because there aren't that many.
    #    Resending all jobs is quite a big overhead though.
    #    Also, this approach might run into edge cases when entities are
    #    ever deleted.
    # 3. Mixtures of the above: Use 1 for jobs and 2 for hosts.
    #    Using two different approaches isn't consistent and might cause
    #    confusion. Also the issues with the case of deletions might still
    #    occur.
    #
    # The overhead of sending all job and host ids in every heartbeat is low:
    # At peaks one board has about 1200 created but unfinished jobs.
    # See the numbers here: http://goo.gl/gQCGWH
    # Assuming that job id's have 6 digits and that json serialization takes a
    # comma and a space as overhead, the traffic per id sent is about 8 bytes.
    # If 5000 ids need to be sent, this means 40 kilobytes of traffic.
    # A NOT IN query with 5000 ids took about 30ms in tests made.
    # These numbers seem low enough to outweigh the disadvantages of the
    # solutions described above.
    shard_obj = rpc_utils.retrieve_shard(shard_hostname=shard_hostname)
    rpc_utils.persist_records_sent_from_shard(shard_obj, jobs, hqes)
    # Host ids and statuses are parallel lists; mirror any status changes the
    # shard reported into the master's host records.
    assert len(known_host_ids) == len(known_host_statuses)
    for i in range(len(known_host_ids)):
        host_model = models.Host.objects.get(pk=known_host_ids[i])
        if host_model.status != known_host_statuses[i]:
            host_model.status = known_host_statuses[i]
            host_model.save()

    hosts, jobs, suite_keyvals, inc_ids = rpc_utils.find_records_for_shard(
            shard_obj, known_job_ids=known_job_ids,
            known_host_ids=known_host_ids)
    return {
        'hosts': [host.serialize() for host in hosts],
        'jobs': [job.serialize() for job in jobs],
        'suite_keyvals': [kv.serialize() for kv in suite_keyvals],
        'incorrect_host_ids': [int(i) for i in inc_ids],
    }


def get_shards(**filter_data):
    """Return a list of all shards.

    @returns A sequence of nested dictionaries of shard information.
    """
    shards = models.Shard.query_objects(filter_data)
    serialized_shards = rpc_utils.prepare_rows_as_nested_dicts(shards, ())
    for serialized, shard in zip(serialized_shards, shards):
        serialized['labels'] = [label.name for label in shard.labels.all()]

    return serialized_shards


def _assign_board_to_shard_precheck(labels):
    """Verify whether board labels are valid to be added to a given shard.

    First check whether board label is in correct format. Second, check whether
    the board label exist. Third, check whether the board has already been
    assigned to shard.

    @param labels: Board labels separated by comma.

    @raises error.RPCException: If label provided doesn't start with `board:`
            or board has been added to shard already.
    @raises models.Label.DoesNotExist: If the label specified doesn't exist.

    @returns: A list of label models that ready to be added to shard.
    """
    if not labels:
        # allow creation of label-less shards (labels='' would otherwise fail
        # the checks below)
        return []
    labels = labels.split(',')
    label_models = []
    for label in labels:
        # Check whether the board label is in correct format.
        if not label.startswith('board:'):
            raise error.RPCException('Sharding only supports `board:.*` label.')
        # Check whether the board label exist. If not, exception will be thrown
        # by smart_get function.
        label = models.Label.smart_get(label)
        # Check whether the board has been sharded already
        try:
            shard = models.Shard.objects.get(labels=label)
            raise error.RPCException(
                '%s is already on shard %s' % (label, shard.hostname))
        except models.Shard.DoesNotExist:
            # board is not on any shard, so it's valid.
            label_models.append(label)
    return label_models


def add_shard(hostname, labels):
    """Add a shard and start running jobs on it.

    @param hostname: The hostname of the shard to be added; needs to be unique.
    @param labels: Board labels separated by comma. Jobs of one of the labels
                   will be assigned to the shard.

    @raises error.RPCException: If label provided doesn't start with `board:`
            or board has been added to shard already.
    @raises model_logic.ValidationError: If a shard with the given hostname
            already exist.
    @raises models.Label.DoesNotExist: If the label specified doesn't exist.

    @returns: The id of the added shard.
    """
    labels = _assign_board_to_shard_precheck(labels)
    shard = models.Shard.add_object(hostname=hostname)
    for label in labels:
        shard.labels.add(label)
    return shard.id


def add_board_to_shard(hostname, labels):
    """Add boards to a given shard

    @param hostname: The hostname of the shard to be changed.
    @param labels: Board labels separated by comma.

    @raises error.RPCException: If label provided doesn't start with `board:`
            or board has been added to shard already.
    @raises models.Label.DoesNotExist: If the label specified doesn't exist.

    @returns: The id of the changed shard.
    """
    labels = _assign_board_to_shard_precheck(labels)
    shard = models.Shard.objects.get(hostname=hostname)
    for label in labels:
        shard.labels.add(label)
    return shard.id


# Remove board RPCs are rare, so we can afford to make them a bit more
# expensive (by performing in a transaction) in order to guarantee
# atomicity.
# TODO(akeshet): If we ever update to newer version of django, we need to
# migrate to transaction.atomic instead of commit_on_success
@transaction.commit_on_success
def remove_board_from_shard(hostname, label):
    """Remove board from the given shard.

    @param hostname: The hostname of the shard to be changed.
    @param label: Board label.

    @raises models.Label.DoesNotExist: If the label specified doesn't exist.

    @returns: The id of the changed shard.
    """
    shard = models.Shard.objects.get(hostname=hostname)
    label = models.Label.smart_get(label)
    if label not in shard.labels.all():
        raise error.RPCException(
            'Cannot remove label from shard that does not belong to it.')

    shard.labels.remove(label)
    # Unassign the shard's hosts for this board; static labels (if this label
    # has been migrated to one) are matched through static_labels instead.
    if label.is_replaced_by_static():
        static_label = models.StaticLabel.smart_get(label.name)
        models.Host.objects.filter(
            static_labels__in=[static_label]).update(shard=None)
    else:
        models.Host.objects.filter(labels__in=[label]).update(shard=None)


def delete_shard(hostname):
    """Delete a shard and reclaim all resources from it.

    This claims back all assigned hosts from the shard.
    """
    shard = rpc_utils.retrieve_shard(shard_hostname=hostname)

    # Remove shard information.
2346 models.Host.objects.filter(shard=shard).update(shard=None) 2347 2348 # Note: The original job-cleanup query was performed with django call 2349 # models.Job.objects.filter(shard=shard).update(shard=None) 2350 # 2351 # But that started becoming unreliable due to the large size of afe_jobs. 2352 # 2353 # We don't need atomicity here, so the new cleanup method is iterative, in 2354 # chunks of 100k jobs. 2355 QUERY = ('UPDATE afe_jobs SET shard_id = NULL WHERE shard_id = %s ' 2356 'LIMIT 100000') 2357 try: 2358 with contextlib.closing(db_connection.cursor()) as cursor: 2359 clear_jobs = True 2360 assert shard.id is not None 2361 while clear_jobs: 2362 res = cursor.execute(QUERY % shard.id).fetchone() 2363 clear_jobs = bool(res) 2364 # Unit tests use sqlite backend instead of MySQL. sqlite does not support 2365 # UPDATE ... LIMIT, so fall back to the old behavior. 2366 except DatabaseError as e: 2367 if 'syntax error' in str(e): 2368 models.Job.objects.filter(shard=shard).update(shard=None) 2369 else: 2370 raise 2371 2372 shard.labels.clear() 2373 shard.delete() 2374 2375 2376 def get_servers(hostname=None, role=None, status=None): 2377 """Get a list of servers with matching role and status. 2378 2379 @param hostname: FQDN of the server. 2380 @param role: Name of the server role, e.g., drone, scheduler. Default to 2381 None to match any role. 2382 @param status: Status of the server, e.g., primary, backup, repair_required. 2383 Default to None to match any server status. 2384 2385 @raises error.RPCException: If server database is not used. 2386 @return: A list of server names for servers with matching role and status. 2387 """ 2388 if not server_manager_utils.use_server_db(): 2389 raise error.RPCException('Server database is not enabled. 
Please try ' 2390 'retrieve servers from global config.') 2391 servers = server_manager_utils.get_servers(hostname=hostname, role=role, 2392 status=status) 2393 return [s.get_details() for s in servers] 2394 2395 2396 @rpc_utils.route_rpc_to_master 2397 def get_stable_version(board=stable_version_utils.DEFAULT, android=False): 2398 """Get stable version for the given board. 2399 2400 @param board: Name of the board. 2401 @param android: If True, the given board is an Android-based device. If 2402 False, assume its a Chrome OS-based device. 2403 2404 @return: Stable version of the given board. Return global configure value 2405 of CROS.stable_cros_version if stable_versinos table does not have 2406 entry of board DEFAULT. 2407 """ 2408 return stable_version_utils.get(board=board, android=android) 2409 2410 2411 @rpc_utils.route_rpc_to_master 2412 def get_all_stable_versions(): 2413 """Get stable versions for all boards. 2414 2415 @return: A dictionary of board:version. 2416 """ 2417 return stable_version_utils.get_all() 2418 2419 2420 @rpc_utils.route_rpc_to_master 2421 def set_stable_version(version, board=stable_version_utils.DEFAULT): 2422 """Modify stable version for the given board. 2423 2424 @param version: The new value of stable version for given board. 2425 @param board: Name of the board, default to value `DEFAULT`. 2426 """ 2427 stable_version_utils.set(version=version, board=board) 2428 2429 2430 @rpc_utils.route_rpc_to_master 2431 def delete_stable_version(board): 2432 """Modify stable version for the given board. 2433 2434 Delete a stable version entry in afe_stable_versions table for a given 2435 board, so default stable version will be used. 2436 2437 @param board: Name of the board. 2438 """ 2439 stable_version_utils.delete(board=board) 2440 2441 2442 def get_tests_by_build(build, ignore_invalid_tests=True): 2443 """Get the tests that are available for the specified build. 2444 2445 @param build: unique name by which to refer to the image. 
2446 @param ignore_invalid_tests: flag on if unparsable tests are ignored. 2447 2448 @return: A sorted list of all tests that are in the build specified. 2449 """ 2450 # Collect the control files specified in this build 2451 cfile_getter = control_file_lib._initialize_control_file_getter(build) 2452 if SuiteBase.ENABLE_CONTROLS_IN_BATCH: 2453 control_file_info_list = cfile_getter.get_suite_info() 2454 control_file_list = control_file_info_list.keys() 2455 else: 2456 control_file_list = cfile_getter.get_control_file_list() 2457 2458 test_objects = [] 2459 _id = 0 2460 for control_file_path in control_file_list: 2461 # Read and parse the control file 2462 if SuiteBase.ENABLE_CONTROLS_IN_BATCH: 2463 control_file = control_file_info_list[control_file_path] 2464 else: 2465 control_file = cfile_getter.get_control_file_contents( 2466 control_file_path) 2467 try: 2468 control_obj = control_data.parse_control_string(control_file) 2469 except: 2470 logging.info('Failed to parse control file: %s', control_file_path) 2471 if not ignore_invalid_tests: 2472 raise 2473 2474 # Extract the values needed for the AFE from the control_obj. 2475 # The keys list represents attributes in the control_obj that 2476 # are required by the AFE 2477 keys = ['author', 'doc', 'name', 'time', 'test_type', 'experimental', 2478 'test_category', 'test_class', 'dependencies', 'run_verify', 2479 'sync_count', 'job_retries', 'retries', 'path'] 2480 2481 test_object = {} 2482 for key in keys: 2483 test_object[key] = getattr(control_obj, key) if hasattr( 2484 control_obj, key) else '' 2485 2486 # Unfortunately, the AFE expects different key-names for certain 2487 # values, these must be corrected to avoid the risk of tests 2488 # being omitted by the AFE. 2489 # The 'id' is an additional value used in the AFE. 2490 # The control_data parsing does not reference 'run_reset', but it 2491 # is also used in the AFE and defaults to True. 
2492 test_object['id'] = _id 2493 test_object['run_reset'] = True 2494 test_object['description'] = test_object.get('doc', '') 2495 test_object['test_time'] = test_object.get('time', 0) 2496 test_object['test_retry'] = test_object.get('retries', 0) 2497 2498 # Fix the test name to be consistent with the current presentation 2499 # of test names in the AFE. 2500 testpath, subname = os.path.split(control_file_path) 2501 testname = os.path.basename(testpath) 2502 subname = subname.split('.')[1:] 2503 if subname: 2504 testname = '%s:%s' % (testname, ':'.join(subname)) 2505 2506 test_object['name'] = testname 2507 2508 # Correct the test path as parse_control_string sets an empty string. 2509 test_object['path'] = control_file_path 2510 2511 _id += 1 2512 test_objects.append(test_object) 2513 2514 test_objects = sorted(test_objects, key=lambda x: x.get('name')) 2515 return rpc_utils.prepare_for_serialization(test_objects) 2516 2517 2518 @rpc_utils.route_rpc_to_master 2519 def get_lab_health_indicators(board=None): 2520 """Get the healthy indicators for whole lab. 2521 2522 The indicators now includes: 2523 1. lab is closed or not. 2524 2. Available DUTs list for a given board. 2525 3. Devserver capacity. 2526 4. When is the next major DUT utilization (e.g. CQ is coming in 3 minutes). 2527 2528 @param board: if board is specified, a list of available DUTs will be 2529 returned for it. Otherwise, skip this indicator. 2530 2531 @returns: A healthy indicator object including health info. 2532 """ 2533 return LabHealthIndicator(None, None, None, None) 2534