1 # pylint: disable-msg=C0111 2 3 """\ 4 Functions to expose over the RPC interface. 5 6 For all modify* and delete* functions that ask for an 'id' parameter to 7 identify the object to operate on, the id may be either 8 * the database row ID 9 * the name of the object (label name, hostname, user login, etc.) 10 * a dictionary containing uniquely identifying field (this option should seldom 11 be used) 12 13 When specifying foreign key fields (i.e. adding hosts to a label, or adding 14 users to an ACL group), the given value may be either the database row ID or the 15 name of the object. 16 17 All get* functions return lists of dictionaries. Each dictionary represents one 18 object and maps field names to values. 19 20 Some examples: 21 modify_host(2, hostname='myhost') # modify hostname of host with database ID 2 22 modify_host('ipaj2', hostname='myhost') # modify hostname of host 'ipaj2' 23 modify_test('sleeptest', test_type='Client', params=', seconds=60') 24 delete_acl_group(1) # delete by ID 25 delete_acl_group('Everyone') # delete by name 26 acl_group_add_users('Everyone', ['mbligh', 'showard']) 27 get_jobs(owner='showard', status='Queued') 28 29 See doctests/001_rpc_test.txt for (lots) more examples. 30 """ 31 32 __author__ = 'showard (at] google.com (Steve Howard)' 33 34 import sys 35 import datetime 36 import logging 37 38 from django.db.models import Count 39 import common 40 from autotest_lib.client.common_lib import priorities 41 from autotest_lib.client.common_lib.cros import dev_server 42 from autotest_lib.client.common_lib.cros.graphite import autotest_stats 43 from autotest_lib.frontend.afe import control_file, rpc_utils 44 from autotest_lib.frontend.afe import models, model_logic, model_attributes 45 from autotest_lib.frontend.afe import site_rpc_interface 46 from autotest_lib.frontend.tko import models as tko_models 47 from autotest_lib.frontend.tko import rpc_interface as tko_rpc_interface 48 from autotest_lib.server import frontend 49 from autotest_lib.server import utils 50 from autotest_lib.server.cros import provision 51 from autotest_lib.server.cros.dynamic_suite import tools 52 from autotest_lib.site_utils import status_history 53 54 55 _timer = autotest_stats.Timer('rpc_interface') 56 57 def get_parameterized_autoupdate_image_url(job): 58 """Get the parameterized autoupdate image url from a parameterized job.""" 59 known_test_obj = models.Test.smart_get('autoupdate_ParameterizedJob') 60 image_parameter = known_test_obj.testparameter_set.get(test=known_test_obj, 61 name='image') 62 para_set = job.parameterized_job.parameterizedjobparameter_set 63 job_test_para = para_set.get(test_parameter=image_parameter) 64 return job_test_para.parameter_value 65 66 67 # labels 68 69 def modify_label(id, **data): 70 """Modify a label. 71 72 @param id: id or name of a label. More often a label name. 73 @param data: New data for a label. 74 """ 75 label_model = models.Label.smart_get(id) 76 label_model.update_object(data) 77 78 # Master forwards the RPC to shards 79 if not utils.is_shard(): 80 rpc_utils.fanout_rpc(label_model.host_set.all(), 'modify_label', False, 81 id=id, **data) 82 83 84 def delete_label(id): 85 """Delete a label. 86 87 @param id: id or name of a label. More often a label name. 88 """ 89 label_model = models.Label.smart_get(id) 90 # Hosts that have the label to be deleted. Save this info before 91 # the label is deleted to use it later. 92 hosts = [] 93 for h in label_model.host_set.all(): 94 hosts.append(models.Host.smart_get(h.id)) 95 label_model.delete() 96 97 # Master forwards the RPC to shards 98 if not utils.is_shard(): 99 rpc_utils.fanout_rpc(hosts, 'delete_label', False, id=id) 100 101 102 def add_label(name, ignore_exception_if_exists=False, **kwargs): 103 """Adds a new label of a given name. 104 105 @param name: label name. 106 @param ignore_exception_if_exists: If True and the exception was 107 thrown due to the duplicated label name when adding a label, 108 then suppress the exception. Default is False. 109 @param kwargs: keyword args that store more info about a label 110 other than the name. 111 @return: int/long id of a new label. 112 """ 113 # models.Label.add_object() throws model_logic.ValidationError 114 # when it is given a label name that already exists. 115 # However, ValidationError can be thrown with different errors, 116 # and those errors should be thrown up to the call chain. 117 try: 118 label = models.Label.add_object(name=name, **kwargs) 119 except: 120 exc_info = sys.exc_info() 121 if ignore_exception_if_exists: 122 label = rpc_utils.get_label(name) 123 # If the exception is raised not because of duplicated 124 # "name", then raise the original exception. 125 if label is None: 126 raise exc_info[0], exc_info[1], exc_info[2] 127 else: 128 raise exc_info[0], exc_info[1], exc_info[2] 129 return label.id 130 131 132 def add_label_to_hosts(id, hosts): 133 """Adds a label of the given id to the given hosts only in local DB. 134 135 @param id: id or name of a label. More often a label name. 136 @param hosts: The hostnames of hosts that need the label. 137 138 @raises models.Label.DoesNotExist: If the label with id doesn't exist. 139 """ 140 label = models.Label.smart_get(id) 141 host_objs = models.Host.smart_get_bulk(hosts) 142 if label.platform: 143 models.Host.check_no_platform(host_objs) 144 label.host_set.add(*host_objs) 145 146 147 @rpc_utils.route_rpc_to_master 148 def label_add_hosts(id, hosts): 149 """Adds a label with the given id to the given hosts. 150 151 This method should be run only on master not shards. 152 The given label will be created if it doesn't exist, provided the `id` 153 supplied is a label name not an int/long id. 154 155 @param id: id or name of a label. More often a label name. 156 @param hosts: A list of hostnames or ids. More often hostnames. 157 158 @raises ValueError: If the id specified is an int/long (label id) 159 while the label does not exist. 160 """ 161 try: 162 label = models.Label.smart_get(id) 163 except models.Label.DoesNotExist: 164 # This matches the type checks in smart_get, which is a hack 165 # in and off itself. The aim here is to create any non-existent 166 # label, which we cannot do if the 'id' specified isn't a label name. 167 if isinstance(id, basestring): 168 label = models.Label.smart_get(add_label(id)) 169 else: 170 raise ValueError('Label id (%s) does not exist. Please specify ' 171 'the argument, id, as a string (label name).' 172 % id) 173 add_label_to_hosts(id, hosts) 174 175 host_objs = models.Host.smart_get_bulk(hosts) 176 # Make sure the label exists on the shard with the same id 177 # as it is on the master. 178 # It is possible that the label is already in a shard because 179 # we are adding a new label only to shards of hosts that the label 180 # is going to be attached. 181 # For example, we add a label L1 to a host in shard S1. 182 # Master and S1 will have L1 but other shards won't. 183 # Later, when we add the same label L1 to hosts in shards S1 and S2, 184 # S1 already has the label but S2 doesn't. 185 # S2 should have the new label without any problem. 186 # We ignore exception in such a case. 187 rpc_utils.fanout_rpc( 188 host_objs, 'add_label', include_hostnames=False, 189 name=label.name, ignore_exception_if_exists=True, 190 id=label.id, platform=label.platform) 191 rpc_utils.fanout_rpc(host_objs, 'add_label_to_hosts', id=id) 192 193 194 def remove_label_from_hosts(id, hosts): 195 """Removes a label of the given id from the given hosts only in local DB. 196 197 @param id: id or name of a label. 198 @param hosts: The hostnames of hosts that need to remove the label from. 199 """ 200 host_objs = models.Host.smart_get_bulk(hosts) 201 models.Label.smart_get(id).host_set.remove(*host_objs) 202 203 204 @rpc_utils.route_rpc_to_master 205 def label_remove_hosts(id, hosts): 206 """Removes a label of the given id from the given hosts. 207 208 This method should be run only on master not shards. 209 210 @param id: id or name of a label. 211 @param hosts: A list of hostnames or ids. More often hostnames. 212 """ 213 host_objs = models.Host.smart_get_bulk(hosts) 214 remove_label_from_hosts(id, hosts) 215 216 rpc_utils.fanout_rpc(host_objs, 'remove_label_from_hosts', id=id) 217 218 219 def get_labels(exclude_filters=(), **filter_data): 220 """\ 221 @param exclude_filters: A sequence of dictionaries of filters. 222 223 @returns A sequence of nested dictionaries of label information. 224 """ 225 labels = models.Label.query_objects(filter_data) 226 for exclude_filter in exclude_filters: 227 labels = labels.exclude(**exclude_filter) 228 return rpc_utils.prepare_rows_as_nested_dicts(labels, ('atomic_group',)) 229 230 231 # atomic groups 232 233 def add_atomic_group(name, max_number_of_machines=None, description=None): 234 return models.AtomicGroup.add_object( 235 name=name, max_number_of_machines=max_number_of_machines, 236 description=description).id 237 238 239 def modify_atomic_group(id, **data): 240 models.AtomicGroup.smart_get(id).update_object(data) 241 242 243 def delete_atomic_group(id): 244 models.AtomicGroup.smart_get(id).delete() 245 246 247 def atomic_group_add_labels(id, labels): 248 label_objs = models.Label.smart_get_bulk(labels) 249 models.AtomicGroup.smart_get(id).label_set.add(*label_objs) 250 251 252 def atomic_group_remove_labels(id, labels): 253 label_objs = models.Label.smart_get_bulk(labels) 254 models.AtomicGroup.smart_get(id).label_set.remove(*label_objs) 255 256 257 def get_atomic_groups(**filter_data): 258 return rpc_utils.prepare_for_serialization( 259 models.AtomicGroup.list_objects(filter_data)) 260 261 262 # hosts 263 264 def add_host(hostname, status=None, locked=None, lock_reason='', protection=None): 265 if locked and not lock_reason: 266 raise model_logic.ValidationError( 267 {'locked': 'Please provide a reason for locking when adding host.'}) 268 269 return models.Host.add_object(hostname=hostname, status=status, 270 locked=locked, lock_reason=lock_reason, 271 protection=protection).id 272 273 274 @rpc_utils.route_rpc_to_master 275 def modify_host(id, **kwargs): 276 """Modify local attributes of a host. 277 278 If this is called on the master, but the host is assigned to a shard, this 279 will call `modify_host_local` RPC to the responsible shard. This means if 280 a host is being locked using this function, this change will also propagate 281 to shards. 282 When this is called on a shard, the shard just routes the RPC to the master 283 and does nothing. 284 285 @param id: id of the host to modify. 286 @param kwargs: key=value pairs of values to set on the host. 287 """ 288 rpc_utils.check_modify_host(kwargs) 289 host = models.Host.smart_get(id) 290 try: 291 rpc_utils.check_modify_host_locking(host, kwargs) 292 except model_logic.ValidationError as e: 293 if not kwargs.get('force_modify_locking', False): 294 raise 295 logging.exception('The following exception will be ignored and lock ' 296 'modification will be enforced. %s', e) 297 298 # This is required to make `lock_time` for a host be exactly same 299 # between the master and a shard. 300 if kwargs.get('locked', None) and 'lock_time' not in kwargs: 301 kwargs['lock_time'] = datetime.datetime.now() 302 host.update_object(kwargs) 303 304 # force_modifying_locking is not an internal field in database, remove. 305 kwargs.pop('force_modify_locking', None) 306 rpc_utils.fanout_rpc([host], 'modify_host_local', 307 include_hostnames=False, id=id, **kwargs) 308 309 310 def modify_host_local(id, **kwargs): 311 """Modify host attributes in local DB. 312 313 @param id: Host id. 314 @param kwargs: key=value pairs of values to set on the host. 315 """ 316 models.Host.smart_get(id).update_object(kwargs) 317 318 319 @rpc_utils.route_rpc_to_master 320 def modify_hosts(host_filter_data, update_data): 321 """Modify local attributes of multiple hosts. 322 323 If this is called on the master, but one of the hosts in that match the 324 filters is assigned to a shard, this will call `modify_hosts_local` RPC 325 to the responsible shard. 326 When this is called on a shard, the shard just routes the RPC to the master 327 and does nothing. 328 329 The filters are always applied on the master, not on the shards. This means 330 if the states of a host differ on the master and a shard, the state on the 331 master will be used. I.e. this means: 332 A host was synced to Shard 1. On Shard 1 the status of the host was set to 333 'Repair Failed'. 334 - A call to modify_hosts with host_filter_data={'status': 'Ready'} will 335 update the host (both on the shard and on the master), because the state 336 of the host as the master knows it is still 'Ready'. 337 - A call to modify_hosts with host_filter_data={'status': 'Repair failed' 338 will not update the host, because the filter doesn't apply on the master. 339 340 @param host_filter_data: Filters out which hosts to modify. 341 @param update_data: A dictionary with the changes to make to the hosts. 342 """ 343 update_data = update_data.copy() 344 rpc_utils.check_modify_host(update_data) 345 hosts = models.Host.query_objects(host_filter_data) 346 347 affected_shard_hostnames = set() 348 affected_host_ids = [] 349 350 # Check all hosts before changing data for exception safety. 351 for host in hosts: 352 try: 353 rpc_utils.check_modify_host_locking(host, update_data) 354 except model_logic.ValidationError as e: 355 if not update_data.get('force_modify_locking', False): 356 raise 357 logging.exception('The following exception will be ignored and ' 358 'lock modification will be enforced. %s', e) 359 360 if host.shard: 361 affected_shard_hostnames.add(host.shard.rpc_hostname()) 362 affected_host_ids.append(host.id) 363 364 # This is required to make `lock_time` for a host be exactly same 365 # between the master and a shard. 366 if update_data.get('locked', None) and 'lock_time' not in update_data: 367 update_data['lock_time'] = datetime.datetime.now() 368 for host in hosts: 369 host.update_object(update_data) 370 371 update_data.pop('force_modify_locking', None) 372 # Caution: Changing the filter from the original here. See docstring. 373 rpc_utils.run_rpc_on_multiple_hostnames( 374 'modify_hosts_local', affected_shard_hostnames, 375 host_filter_data={'id__in': affected_host_ids}, 376 update_data=update_data) 377 378 379 def modify_hosts_local(host_filter_data, update_data): 380 """Modify attributes of hosts in local DB. 381 382 @param host_filter_data: Filters out which hosts to modify. 383 @param update_data: A dictionary with the changes to make to the hosts. 384 """ 385 for host in models.Host.query_objects(host_filter_data): 386 host.update_object(update_data) 387 388 389 def add_labels_to_host(id, labels): 390 """Adds labels to a given host only in local DB. 391 392 @param id: id or hostname for a host. 393 @param labels: ids or names for labels. 394 """ 395 label_objs = models.Label.smart_get_bulk(labels) 396 models.Host.smart_get(id).labels.add(*label_objs) 397 398 399 @rpc_utils.route_rpc_to_master 400 def host_add_labels(id, labels): 401 """Adds labels to a given host. 402 403 @param id: id or hostname for a host. 404 @param labels: ids or names for labels. 405 406 @raises ValidationError: If adding more than one platform label. 407 """ 408 label_objs = models.Label.smart_get_bulk(labels) 409 platforms = [label.name for label in label_objs if label.platform] 410 if len(platforms) > 1: 411 raise model_logic.ValidationError( 412 {'labels': 'Adding more than one platform label: %s' % 413 ', '.join(platforms)}) 414 415 host_obj = models.Host.smart_get(id) 416 if len(platforms) == 1: 417 models.Host.check_no_platform([host_obj]) 418 add_labels_to_host(id, labels) 419 420 rpc_utils.fanout_rpc([host_obj], 'add_labels_to_host', False, 421 id=id, labels=labels) 422 423 424 def remove_labels_from_host(id, labels): 425 """Removes labels from a given host only in local DB. 426 427 @param id: id or hostname for a host. 428 @param labels: ids or names for labels. 429 """ 430 label_objs = models.Label.smart_get_bulk(labels) 431 models.Host.smart_get(id).labels.remove(*label_objs) 432 433 434 @rpc_utils.route_rpc_to_master 435 def host_remove_labels(id, labels): 436 """Removes labels from a given host. 437 438 @param id: id or hostname for a host. 439 @param labels: ids or names for labels. 440 """ 441 remove_labels_from_host(id, labels) 442 443 host_obj = models.Host.smart_get(id) 444 rpc_utils.fanout_rpc([host_obj], 'remove_labels_from_host', False, 445 id=id, labels=labels) 446 447 448 def get_host_attribute(attribute, **host_filter_data): 449 """ 450 @param attribute: string name of attribute 451 @param host_filter_data: filter data to apply to Hosts to choose hosts to 452 act upon 453 """ 454 hosts = rpc_utils.get_host_query((), False, False, True, host_filter_data) 455 hosts = list(hosts) 456 models.Host.objects.populate_relationships(hosts, models.HostAttribute, 457 'attribute_list') 458 host_attr_dicts = [] 459 for host_obj in hosts: 460 for attr_obj in host_obj.attribute_list: 461 if attr_obj.attribute == attribute: 462 host_attr_dicts.append(attr_obj.get_object_dict()) 463 return rpc_utils.prepare_for_serialization(host_attr_dicts) 464 465 466 def set_host_attribute(attribute, value, **host_filter_data): 467 """ 468 @param attribute: string name of attribute 469 @param value: string, or None to delete an attribute 470 @param host_filter_data: filter data to apply to Hosts to choose hosts to 471 act upon 472 """ 473 assert host_filter_data # disallow accidental actions on all hosts 474 hosts = models.Host.query_objects(host_filter_data) 475 models.AclGroup.check_for_acl_violation_hosts(hosts) 476 for host in hosts: 477 host.set_or_delete_attribute(attribute, value) 478 479 # Master forwards this RPC to shards. 480 if not utils.is_shard(): 481 rpc_utils.fanout_rpc(hosts, 'set_host_attribute', False, 482 attribute=attribute, value=value, **host_filter_data) 483 484 485 @rpc_utils.forward_single_host_rpc_to_shard 486 def delete_host(id): 487 models.Host.smart_get(id).delete() 488 489 490 def get_hosts(multiple_labels=(), exclude_only_if_needed_labels=False, 491 exclude_atomic_group_hosts=False, valid_only=True, 492 include_current_job=False, **filter_data): 493 """Get a list of dictionaries which contains the information of hosts. 494 495 @param multiple_labels: match hosts in all of the labels given. Should 496 be a list of label names. 497 @param exclude_only_if_needed_labels: Exclude hosts with at least one 498 "only_if_needed" label applied. 499 @param exclude_atomic_group_hosts: Exclude hosts that have one or more 500 atomic group labels associated with them. 501 @param include_current_job: Set to True to include ids of currently running 502 job and special task. 503 """ 504 hosts = rpc_utils.get_host_query(multiple_labels, 505 exclude_only_if_needed_labels, 506 exclude_atomic_group_hosts, 507 valid_only, filter_data) 508 hosts = list(hosts) 509 models.Host.objects.populate_relationships(hosts, models.Label, 510 'label_list') 511 models.Host.objects.populate_relationships(hosts, models.AclGroup, 512 'acl_list') 513 models.Host.objects.populate_relationships(hosts, models.HostAttribute, 514 'attribute_list') 515 host_dicts = [] 516 for host_obj in hosts: 517 host_dict = host_obj.get_object_dict() 518 host_dict['labels'] = [label.name for label in host_obj.label_list] 519 host_dict['platform'], host_dict['atomic_group'] = (rpc_utils. 520 find_platform_and_atomic_group(host_obj)) 521 host_dict['acls'] = [acl.name for acl in host_obj.acl_list] 522 host_dict['attributes'] = dict((attribute.attribute, attribute.value) 523 for attribute in host_obj.attribute_list) 524 if include_current_job: 525 host_dict['current_job'] = None 526 host_dict['current_special_task'] = None 527 entries = models.HostQueueEntry.objects.filter( 528 host_id=host_dict['id'], active=True, complete=False) 529 if entries: 530 host_dict['current_job'] = ( 531 entries[0].get_object_dict()['job']) 532 tasks = models.SpecialTask.objects.filter( 533 host_id=host_dict['id'], is_active=True, is_complete=False) 534 if tasks: 535 host_dict['current_special_task'] = ( 536 '%d-%s' % (tasks[0].get_object_dict()['id'], 537 tasks[0].get_object_dict()['task'].lower())) 538 host_dicts.append(host_dict) 539 return rpc_utils.prepare_for_serialization(host_dicts) 540 541 542 def get_num_hosts(multiple_labels=(), exclude_only_if_needed_labels=False, 543 exclude_atomic_group_hosts=False, valid_only=True, 544 **filter_data): 545 """ 546 Same parameters as get_hosts(). 547 548 @returns The number of matching hosts. 549 """ 550 hosts = rpc_utils.get_host_query(multiple_labels, 551 exclude_only_if_needed_labels, 552 exclude_atomic_group_hosts, 553 valid_only, filter_data) 554 return hosts.count() 555 556 557 # tests 558 559 def add_test(name, test_type, path, author=None, dependencies=None, 560 experimental=True, run_verify=None, test_class=None, 561 test_time=None, test_category=None, description=None, 562 sync_count=1): 563 return models.Test.add_object(name=name, test_type=test_type, path=path, 564 author=author, dependencies=dependencies, 565 experimental=experimental, 566 run_verify=run_verify, test_time=test_time, 567 test_category=test_category, 568 sync_count=sync_count, 569 test_class=test_class, 570 description=description).id 571 572 573 def modify_test(id, **data): 574 models.Test.smart_get(id).update_object(data) 575 576 577 def delete_test(id): 578 models.Test.smart_get(id).delete() 579 580 581 def get_tests(**filter_data): 582 return rpc_utils.prepare_for_serialization( 583 models.Test.list_objects(filter_data)) 584 585 586 @_timer.decorate 587 def get_tests_status_counts_by_job_name_label(job_name_prefix, label_name): 588 """Gets the counts of all passed and failed tests from the matching jobs. 589 590 @param job_name_prefix: Name prefix of the jobs to get the summary from, e.g., 591 'butterfly-release/R40-6457.21.0/bvt-cq/'. 592 @param label_name: Label that must be set in the jobs, e.g., 593 'cros-version:butterfly-release/R40-6457.21.0'. 594 595 @returns A summary of the counts of all the passed and failed tests. 596 """ 597 job_ids = list(models.Job.objects.filter( 598 name__startswith=job_name_prefix, 599 dependency_labels__name=label_name).values_list( 600 'pk', flat=True)) 601 summary = {'passed': 0, 'failed': 0} 602 if not job_ids: 603 return summary 604 605 counts = (tko_models.TestView.objects.filter( 606 afe_job_id__in=job_ids).exclude( 607 test_name='SERVER_JOB').exclude( 608 test_name__startswith='CLIENT_JOB').values( 609 'status').annotate( 610 count=Count('status'))) 611 for status in counts: 612 if status['status'] == 'GOOD': 613 summary['passed'] += status['count'] 614 else: 615 summary['failed'] += status['count'] 616 return summary 617 618 619 # profilers 620 621 def add_profiler(name, description=None): 622 return models.Profiler.add_object(name=name, description=description).id 623 624 625 def modify_profiler(id, **data): 626 models.Profiler.smart_get(id).update_object(data) 627 628 629 def delete_profiler(id): 630 models.Profiler.smart_get(id).delete() 631 632 633 def get_profilers(**filter_data): 634 return rpc_utils.prepare_for_serialization( 635 models.Profiler.list_objects(filter_data)) 636 637 638 # users 639 640 def add_user(login, access_level=None): 641 return models.User.add_object(login=login, access_level=access_level).id 642 643 644 def modify_user(id, **data): 645 models.User.smart_get(id).update_object(data) 646 647 648 def delete_user(id): 649 models.User.smart_get(id).delete() 650 651 652 def get_users(**filter_data): 653 return rpc_utils.prepare_for_serialization( 654 models.User.list_objects(filter_data)) 655 656 657 # acl groups 658 659 def add_acl_group(name, description=None): 660 group = models.AclGroup.add_object(name=name, description=description) 661 group.users.add(models.User.current_user()) 662 return group.id 663 664 665 def modify_acl_group(id, **data): 666 group = models.AclGroup.smart_get(id) 667 group.check_for_acl_violation_acl_group() 668 group.update_object(data) 669 group.add_current_user_if_empty() 670 671 672 def acl_group_add_users(id, users): 673 group = models.AclGroup.smart_get(id) 674 group.check_for_acl_violation_acl_group() 675 users = models.User.smart_get_bulk(users) 676 group.users.add(*users) 677 678 679 def acl_group_remove_users(id, users): 680 group = models.AclGroup.smart_get(id) 681 group.check_for_acl_violation_acl_group() 682 users = models.User.smart_get_bulk(users) 683 group.users.remove(*users) 684 group.add_current_user_if_empty() 685 686 687 def acl_group_add_hosts(id, hosts): 688 group = models.AclGroup.smart_get(id) 689 group.check_for_acl_violation_acl_group() 690 hosts = models.Host.smart_get_bulk(hosts) 691 group.hosts.add(*hosts) 692 group.on_host_membership_change() 693 694 695 def acl_group_remove_hosts(id, hosts): 696 group = models.AclGroup.smart_get(id) 697 group.check_for_acl_violation_acl_group() 698 hosts = models.Host.smart_get_bulk(hosts) 699 group.hosts.remove(*hosts) 700 group.on_host_membership_change() 701 702 703 def delete_acl_group(id): 704 models.AclGroup.smart_get(id).delete() 705 706 707 def get_acl_groups(**filter_data): 708 acl_groups = models.AclGroup.list_objects(filter_data) 709 for acl_group in acl_groups: 710 acl_group_obj = models.AclGroup.objects.get(id=acl_group['id']) 711 acl_group['users'] = [user.login 712 for user in acl_group_obj.users.all()] 713 acl_group['hosts'] = [host.hostname 714 for host in acl_group_obj.hosts.all()] 715 return rpc_utils.prepare_for_serialization(acl_groups) 716 717 718 # jobs 719 720 def generate_control_file(tests=(), kernel=None, label=None, profilers=(), 721 client_control_file='', use_container=False, 722 profile_only=None, upload_kernel_config=False, 723 db_tests=True): 724 """ 725 Generates a client-side control file to load a kernel and run tests. 726 727 @param tests List of tests to run. See db_tests for more information. 728 @param kernel A list of kernel info dictionaries configuring which kernels 729 to boot for this job and other options for them 730 @param label Name of label to grab kernel config from. 731 @param profilers List of profilers to activate during the job. 732 @param client_control_file The contents of a client-side control file to 733 run at the end of all tests. If this is supplied, all tests must be 734 client side. 735 TODO: in the future we should support server control files directly 736 to wrap with a kernel. That'll require changing the parameter 737 name and adding a boolean to indicate if it is a client or server 738 control file. 739 @param use_container unused argument today. TODO: Enable containers 740 on the host during a client side test. 741 @param profile_only A boolean that indicates what default profile_only 742 mode to use in the control file. Passing None will generate a 743 control file that does not explcitly set the default mode at all. 744 @param upload_kernel_config: if enabled it will generate server control 745 file code that uploads the kernel config file to the client and 746 tells the client of the new (local) path when compiling the kernel; 747 the tests must be server side tests 748 @param db_tests: if True, the test object can be found in the database 749 backing the test model. In this case, tests is a tuple 750 of test IDs which are used to retrieve the test objects 751 from the database. If False, tests is a tuple of test 752 dictionaries stored client-side in the AFE. 753 754 @returns a dict with the following keys: 755 control_file: str, The control file text. 756 is_server: bool, is the control file a server-side control file? 757 synch_count: How many machines the job uses per autoserv execution. 758 synch_count == 1 means the job is asynchronous. 759 dependencies: A list of the names of labels on which the job depends. 760 """ 761 if not tests and not client_control_file: 762 return dict(control_file='', is_server=False, synch_count=1, 763 dependencies=[]) 764 765 cf_info, test_objects, profiler_objects, label = ( 766 rpc_utils.prepare_generate_control_file(tests, kernel, label, 767 profilers, db_tests)) 768 cf_info['control_file'] = control_file.generate_control( 769 tests=test_objects, kernels=kernel, platform=label, 770 profilers=profiler_objects, is_server=cf_info['is_server'], 771 client_control_file=client_control_file, profile_only=profile_only, 772 upload_kernel_config=upload_kernel_config) 773 return cf_info 774 775 776 def create_parameterized_job(name, priority, test, parameters, kernel=None, 777 label=None, profilers=(), profiler_parameters=None, 778 use_container=False, profile_only=None, 779 upload_kernel_config=False, hosts=(), 780 meta_hosts=(), one_time_hosts=(), 781 atomic_group_name=None, synch_count=None, 782 is_template=False, timeout=None, 783 timeout_mins=None, max_runtime_mins=None, 784 run_verify=False, email_list='', dependencies=(), 785 reboot_before=None, reboot_after=None, 786 parse_failed_repair=None, hostless=False, 787 keyvals=None, drone_set=None, run_reset=True, 788 require_ssp=None): 789 """ 790 Creates and enqueues a parameterized job. 791 792 Most parameters a combination of the parameters for generate_control_file() 793 and create_job(), with the exception of: 794 795 @param test name or ID of the test to run 796 @param parameters a map of parameter name -> 797 tuple of (param value, param type) 798 @param profiler_parameters a dictionary of parameters for the profilers: 799 key: profiler name 800 value: dict of param name -> tuple of 801 (param value, 802 param type) 803 """ 804 # Save the values of the passed arguments here. What we're going to do with 805 # them is pass them all to rpc_utils.get_create_job_common_args(), which 806 # will extract the subset of these arguments that apply for 807 # rpc_utils.create_job_common(), which we then pass in to that function. 808 args = locals() 809 810 # Set up the parameterized job configs 811 test_obj = models.Test.smart_get(test) 812 control_type = test_obj.test_type 813 814 try: 815 label = models.Label.smart_get(label) 816 except models.Label.DoesNotExist: 817 label = None 818 819 kernel_objs = models.Kernel.create_kernels(kernel) 820 profiler_objs = [models.Profiler.smart_get(profiler) 821 for profiler in profilers] 822 823 parameterized_job = models.ParameterizedJob.objects.create( 824 test=test_obj, label=label, use_container=use_container, 825 profile_only=profile_only, 826 upload_kernel_config=upload_kernel_config) 827 parameterized_job.kernels.add(*kernel_objs) 828 829 for profiler in profiler_objs: 830 parameterized_profiler = models.ParameterizedJobProfiler.objects.create( 831 parameterized_job=parameterized_job, 832 profiler=profiler) 833 profiler_params = profiler_parameters.get(profiler.name, {}) 834 for name, (value, param_type) in profiler_params.iteritems(): 835 models.ParameterizedJobProfilerParameter.objects.create( 836 parameterized_job_profiler=parameterized_profiler, 837 parameter_name=name, 838 parameter_value=value, 839 parameter_type=param_type) 840 841 try: 842 for parameter in test_obj.testparameter_set.all(): 843 if parameter.name in parameters: 844 param_value, param_type = parameters.pop(parameter.name) 845 parameterized_job.parameterizedjobparameter_set.create( 846 test_parameter=parameter, parameter_value=param_value, 847 parameter_type=param_type) 848 849 if parameters: 850 raise Exception('Extra parameters remain: %r' % parameters) 851 852 return rpc_utils.create_job_common( 853 parameterized_job=parameterized_job.id, 854 control_type=control_type, 855 **rpc_utils.get_create_job_common_args(args)) 856 except: 857 parameterized_job.delete() 858 raise 859 860 861 def create_job_page_handler(name, priority, control_file, control_type, 862 image=None, hostless=False, firmware_rw_build=None, 863 firmware_ro_build=None, test_source_build=None, 864 **kwargs): 865 """\ 866 Create and enqueue a job. 867 868 @param name name of this job 869 @param priority Integer priority of this job. Higher is more important. 870 @param control_file String contents of the control file. 871 @param control_type Type of control file, Client or Server. 872 @param image: ChromeOS build to be installed in the dut. Default to None. 873 @param firmware_rw_build: Firmware build to update RW firmware. Default to 874 None, i.e., RW firmware will not be updated. 875 @param firmware_ro_build: Firmware build to update RO firmware. Default to 876 None, i.e., RO firmware will not be updated. 877 @param test_source_build: Build to be used to retrieve test code. Default 878 to None. 879 @param kwargs extra args that will be required by create_suite_job or 880 create_job. 881 882 @returns The created Job id number. 883 """ 884 control_file = rpc_utils.encode_ascii(control_file) 885 if not control_file: 886 raise model_logic.ValidationError({ 887 'control_file' : "Control file cannot be empty"}) 888 889 if image and hostless: 890 builds = {} 891 builds[provision.CROS_VERSION_PREFIX] = image 892 if firmware_rw_build: 893 builds[provision.FW_RW_VERSION_PREFIX] = firmware_rw_build 894 if firmware_ro_build: 895 builds[provision.FW_RO_VERSION_PREFIX] = firmware_ro_build 896 return site_rpc_interface.create_suite_job( 897 name=name, control_file=control_file, priority=priority, 898 builds=builds, test_source_build=test_source_build, **kwargs) 899 return create_job(name, priority, control_file, control_type, image=image, 900 hostless=hostless, **kwargs) 901 902 903 @rpc_utils.route_rpc_to_master 904 def create_job(name, priority, control_file, control_type, 905 hosts=(), meta_hosts=(), one_time_hosts=(), 906 atomic_group_name=None, synch_count=None, is_template=False, 907 timeout=None, timeout_mins=None, max_runtime_mins=None, 908 run_verify=False, email_list='', dependencies=(), 909 reboot_before=None, reboot_after=None, parse_failed_repair=None, 910 hostless=False, keyvals=None, drone_set=None, image=None, 911 parent_job_id=None, test_retry=0, run_reset=True, 912 require_ssp=None, args=(), **kwargs): 913 """\ 914 Create and enqueue a job. 915 916 @param name name of this job 917 @param priority Integer priority of this job. Higher is more important. 918 @param control_file String contents of the control file. 919 @param control_type Type of control file, Client or Server. 920 @param synch_count How many machines the job uses per autoserv execution. 921 synch_count == 1 means the job is asynchronous. If an atomic group is 922 given this value is treated as a minimum. 923 @param is_template If true then create a template job. 924 @param timeout Hours after this call returns until the job times out. 925 @param timeout_mins Minutes after this call returns until the job times 926 out. 927 @param max_runtime_mins Minutes from job starting time until job times out 928 @param run_verify Should the host be verified before running the test? 929 @param email_list String containing emails to mail when the job is done 930 @param dependencies List of label names on which this job depends 931 @param reboot_before Never, If dirty, or Always 932 @param reboot_after Never, If all tests passed, or Always 933 @param parse_failed_repair if true, results of failed repairs launched by 934 this job will be parsed as part of the job. 935 @param hostless if true, create a hostless job 936 @param keyvals dict of keyvals to associate with the job 937 @param hosts List of hosts to run job on. 938 @param meta_hosts List where each entry is a label name, and for each entry 939 one host will be chosen from that label to run the job on. 940 @param one_time_hosts List of hosts not in the database to run the job on. 941 @param atomic_group_name The name of an atomic group to schedule the job on. 942 @param drone_set The name of the drone set to run this test on. 943 @param image OS image to install before running job. 944 @param parent_job_id id of a job considered to be parent of created job. 945 @param test_retry Number of times to retry test if the test did not 946 complete successfully. (optional, default: 0) 947 @param run_reset Should the host be reset before running the test? 948 @param require_ssp Set to True to require server-side packaging to run the 949 test. If it's set to None, drone will still try to run 950 the server side with server-side packaging. If the 951 autotest-server package doesn't exist for the build or 952 image is not set, drone will run the test without server- 953 side packaging. Default is None. 954 @param args A list of args to be injected into control file. 955 @param kwargs extra keyword args. NOT USED. 956 957 @returns The created Job id number. 958 """ 959 if args: 960 control_file = tools.inject_vars({'args': args}, control_file) 961 962 if image is None: 963 return rpc_utils.create_job_common( 964 **rpc_utils.get_create_job_common_args(locals())) 965 966 # Translate the image name, in case its a relative build name. 967 ds = dev_server.ImageServer.resolve(image) 968 image = ds.translate(image) 969 970 # When image is supplied use a known parameterized test already in the 971 # database to pass the OS image path from the front end, through the 972 # scheduler, and finally to autoserv as the --image parameter. 973 974 # The test autoupdate_ParameterizedJob is in afe_autotests and used to 975 # instantiate a Test object and from there a ParameterizedJob. 976 known_test_obj = models.Test.smart_get('autoupdate_ParameterizedJob') 977 known_parameterized_job = models.ParameterizedJob.objects.create( 978 test=known_test_obj) 979 980 # autoupdate_ParameterizedJob has a single parameter, the image parameter, 981 # stored in the table afe_test_parameters. We retrieve and set this 982 # instance of the parameter to the OS image path. 983 image_parameter = known_test_obj.testparameter_set.get(test=known_test_obj, 984 name='image') 985 known_parameterized_job.parameterizedjobparameter_set.create( 986 test_parameter=image_parameter, parameter_value=image, 987 parameter_type='string') 988 989 # TODO(crbug.com/502638): save firmware build etc to parameterized_job. 990 991 # By passing a parameterized_job to create_job_common the job entry in 992 # the afe_jobs table will have the field parameterized_job_id set. 993 # The scheduler uses this id in the afe_parameterized_jobs table to 994 # match this job to our known test, and then with the 995 # afe_parameterized_job_parameters table to get the actual image path. 996 return rpc_utils.create_job_common( 997 parameterized_job=known_parameterized_job.id, 998 **rpc_utils.get_create_job_common_args(locals())) 999 1000 1001 def abort_host_queue_entries(**filter_data): 1002 """\ 1003 Abort a set of host queue entries. 1004 1005 @return: A list of dictionaries, each contains information 1006 about an aborted HQE. 1007 """ 1008 query = models.HostQueueEntry.query_objects(filter_data) 1009 1010 # Dont allow aborts on: 1011 # 1. Jobs that have already completed (whether or not they were aborted) 1012 # 2. Jobs that we have already been aborted (but may not have completed) 1013 query = query.filter(complete=False).filter(aborted=False) 1014 models.AclGroup.check_abort_permissions(query) 1015 host_queue_entries = list(query.select_related()) 1016 rpc_utils.check_abort_synchronous_jobs(host_queue_entries) 1017 1018 models.HostQueueEntry.abort_host_queue_entries(host_queue_entries) 1019 hqe_info = [{'HostQueueEntry': hqe.id, 'Job': hqe.job_id, 1020 'Job name': hqe.job.name} for hqe in host_queue_entries] 1021 return hqe_info 1022 1023 1024 def abort_special_tasks(**filter_data): 1025 """\ 1026 Abort the special task, or tasks, specified in the filter. 1027 """ 1028 query = models.SpecialTask.query_objects(filter_data) 1029 special_tasks = query.filter(is_active=True) 1030 for task in special_tasks: 1031 task.abort() 1032 1033 1034 def _call_special_tasks_on_hosts(task, hosts): 1035 """\ 1036 Schedules a set of hosts for a special task. 1037 1038 @returns A list of hostnames that a special task was created for. 1039 """ 1040 models.AclGroup.check_for_acl_violation_hosts(hosts) 1041 shard_host_map = rpc_utils.bucket_hosts_by_shard(hosts) 1042 if shard_host_map and not utils.is_shard(): 1043 raise ValueError('The following hosts are on shards, please ' 1044 'follow the link to the shards and create jobs ' 1045 'there instead. %s.' % shard_host_map) 1046 for host in hosts: 1047 models.SpecialTask.schedule_special_task(host, task) 1048 return list(sorted(host.hostname for host in hosts)) 1049 1050 1051 def _forward_special_tasks_on_hosts(task, rpc, **filter_data): 1052 """Forward special tasks to corresponding shards. 1053 1054 For master, when special tasks are fired on hosts that are sharded, 1055 forward the RPC to corresponding shards. 1056 1057 For shard, create special task records in local DB. 1058 1059 @param task: Enum value of frontend.afe.models.SpecialTask.Task 1060 @param rpc: RPC name to forward. 1061 @param filter_data: Filter keywords to be used for DB query. 1062 1063 @return: A list of hostnames that a special task was created for. 1064 """ 1065 hosts = models.Host.query_objects(filter_data) 1066 shard_host_map = rpc_utils.bucket_hosts_by_shard(hosts, rpc_hostnames=True) 1067 1068 # Filter out hosts on a shard from those on the master, forward 1069 # rpcs to the shard with an additional hostname__in filter, and 1070 # create a local SpecialTask for each remaining host. 1071 if shard_host_map and not utils.is_shard(): 1072 hosts = [h for h in hosts if h.shard is None] 1073 for shard, hostnames in shard_host_map.iteritems(): 1074 1075 # The main client of this module is the frontend website, and 1076 # it invokes it with an 'id' or an 'id__in' filter. Regardless, 1077 # the 'hostname' filter should narrow down the list of hosts on 1078 # each shard even though we supply all the ids in filter_data. 1079 # This method uses hostname instead of id because it fits better 1080 # with the overall architecture of redirection functions in 1081 # rpc_utils. 1082 shard_filter = filter_data.copy() 1083 shard_filter['hostname__in'] = hostnames 1084 rpc_utils.run_rpc_on_multiple_hostnames( 1085 rpc, [shard], **shard_filter) 1086 1087 # There is a race condition here if someone assigns a shard to one of these 1088 # hosts before we create the task. The host will stay on the master if: 1089 # 1. The host is not Ready 1090 # 2. The host is Ready but has a task 1091 # But if the host is Ready and doesn't have a task yet, it will get sent 1092 # to the shard as we're creating a task here. 1093 1094 # Given that we only rarely verify Ready hosts it isn't worth putting this 1095 # entire method in a transaction. The worst case scenario is that we have 1096 # a verify running on a Ready host while the shard is using it, if the 1097 # verify fails no subsequent tasks will be created against the host on the 1098 # master, and verifies are safe enough that this is OK. 1099 return _call_special_tasks_on_hosts(task, hosts) 1100 1101 1102 def reverify_hosts(**filter_data): 1103 """\ 1104 Schedules a set of hosts for verify. 1105 1106 @returns A list of hostnames that a verify task was created for. 1107 """ 1108 return _forward_special_tasks_on_hosts( 1109 models.SpecialTask.Task.VERIFY, 'reverify_hosts', **filter_data) 1110 1111 1112 def repair_hosts(**filter_data): 1113 """\ 1114 Schedules a set of hosts for repair. 1115 1116 @returns A list of hostnames that a repair task was created for. 1117 """ 1118 return _forward_special_tasks_on_hosts( 1119 models.SpecialTask.Task.REPAIR, 'repair_hosts', **filter_data) 1120 1121 1122 def get_jobs(not_yet_run=False, running=False, finished=False, 1123 suite=False, sub=False, standalone=False, **filter_data): 1124 """\ 1125 Extra status filter args for get_jobs: 1126 -not_yet_run: Include only jobs that have not yet started running. 1127 -running: Include only jobs that have start running but for which not 1128 all hosts have completed. 1129 -finished: Include only jobs for which all hosts have completed (or 1130 aborted). 1131 1132 Extra type filter args for get_jobs: 1133 -suite: Include only jobs with child jobs. 1134 -sub: Include only jobs with a parent job. 1135 -standalone: Inlcude only jobs with no child or parent jobs. 1136 At most one of these three fields should be specified. 1137 """ 1138 extra_args = rpc_utils.extra_job_status_filters(not_yet_run, 1139 running, 1140 finished) 1141 filter_data['extra_args'] = rpc_utils.extra_job_type_filters(extra_args, 1142 suite, 1143 sub, 1144 standalone) 1145 job_dicts = [] 1146 jobs = list(models.Job.query_objects(filter_data)) 1147 models.Job.objects.populate_relationships(jobs, models.Label, 1148 'dependencies') 1149 models.Job.objects.populate_relationships(jobs, models.JobKeyval, 'keyvals') 1150 for job in jobs: 1151 job_dict = job.get_object_dict() 1152 job_dict['dependencies'] = ','.join(label.name 1153 for label in job.dependencies) 1154 job_dict['keyvals'] = dict((keyval.key, keyval.value) 1155 for keyval in job.keyvals) 1156 if job.parameterized_job: 1157 job_dict['image'] = get_parameterized_autoupdate_image_url(job) 1158 job_dicts.append(job_dict) 1159 return rpc_utils.prepare_for_serialization(job_dicts) 1160 1161 1162 def get_num_jobs(not_yet_run=False, running=False, finished=False, 1163 suite=False, sub=False, standalone=False, 1164 **filter_data): 1165 """\ 1166 See get_jobs() for documentation of extra filter parameters. 1167 """ 1168 extra_args = rpc_utils.extra_job_status_filters(not_yet_run, 1169 running, 1170 finished) 1171 filter_data['extra_args'] = rpc_utils.extra_job_type_filters(extra_args, 1172 suite, 1173 sub, 1174 standalone) 1175 return models.Job.query_count(filter_data) 1176 1177 1178 def get_jobs_summary(**filter_data): 1179 """\ 1180 Like get_jobs(), but adds 'status_counts' and 'result_counts' field. 1181 1182 'status_counts' filed is a dictionary mapping status strings to the number 1183 of hosts currently with that status, i.e. {'Queued' : 4, 'Running' : 2}. 1184 1185 'result_counts' field is piped to tko's rpc_interface and has the return 1186 format specified under get_group_counts. 1187 """ 1188 jobs = get_jobs(**filter_data) 1189 ids = [job['id'] for job in jobs] 1190 all_status_counts = models.Job.objects.get_status_counts(ids) 1191 for job in jobs: 1192 job['status_counts'] = all_status_counts[job['id']] 1193 job['result_counts'] = tko_rpc_interface.get_status_counts( 1194 ['afe_job_id', 'afe_job_id'], 1195 header_groups=[['afe_job_id'], ['afe_job_id']], 1196 **{'afe_job_id': job['id']}) 1197 return rpc_utils.prepare_for_serialization(jobs) 1198 1199 1200 def get_info_for_clone(id, preserve_metahosts, queue_entry_filter_data=None): 1201 """\ 1202 Retrieves all the information needed to clone a job. 1203 """ 1204 job = models.Job.objects.get(id=id) 1205 job_info = rpc_utils.get_job_info(job, 1206 preserve_metahosts, 1207 queue_entry_filter_data) 1208 1209 host_dicts = [] 1210 for host in job_info['hosts']: 1211 host_dict = get_hosts(id=host.id)[0] 1212 other_labels = host_dict['labels'] 1213 if host_dict['platform']: 1214 other_labels.remove(host_dict['platform']) 1215 host_dict['other_labels'] = ', '.join(other_labels) 1216 host_dicts.append(host_dict) 1217 1218 for host in job_info['one_time_hosts']: 1219 host_dict = dict(hostname=host.hostname, 1220 id=host.id, 1221 platform='(one-time host)', 1222 locked_text='') 1223 host_dicts.append(host_dict) 1224 1225 # convert keys from Label objects to strings (names of labels) 1226 meta_host_counts = dict((meta_host.name, count) for meta_host, count 1227 in job_info['meta_host_counts'].iteritems()) 1228 1229 info = dict(job=job.get_object_dict(), 1230 meta_host_counts=meta_host_counts, 1231 hosts=host_dicts) 1232 info['job']['dependencies'] = job_info['dependencies'] 1233 if job_info['atomic_group']: 1234 info['atomic_group_name'] = (job_info['atomic_group']).name 1235 else: 1236 info['atomic_group_name'] = None 1237 info['hostless'] = job_info['hostless'] 1238 info['drone_set'] = job.drone_set and job.drone_set.name 1239 1240 if job.parameterized_job: 1241 info['job']['image'] = get_parameterized_autoupdate_image_url(job) 1242 1243 return rpc_utils.prepare_for_serialization(info) 1244 1245 1246 # host queue entries 1247 1248 def get_host_queue_entries(start_time=None, end_time=None, **filter_data): 1249 """\ 1250 @returns A sequence of nested dictionaries of host and job information. 1251 """ 1252 filter_data = rpc_utils.inject_times_to_filter('started_on__gte', 1253 'started_on__lte', 1254 start_time, 1255 end_time, 1256 **filter_data) 1257 return rpc_utils.prepare_rows_as_nested_dicts( 1258 models.HostQueueEntry.query_objects(filter_data), 1259 ('host', 'atomic_group', 'job')) 1260 1261 1262 def get_num_host_queue_entries(start_time=None, end_time=None, **filter_data): 1263 """\ 1264 Get the number of host queue entries associated with this job. 1265 """ 1266 filter_data = rpc_utils.inject_times_to_filter('started_on__gte', 1267 'started_on__lte', 1268 start_time, 1269 end_time, 1270 **filter_data) 1271 return models.HostQueueEntry.query_count(filter_data) 1272 1273 1274 def get_hqe_percentage_complete(**filter_data): 1275 """ 1276 Computes the fraction of host queue entries matching the given filter data 1277 that are complete. 1278 """ 1279 query = models.HostQueueEntry.query_objects(filter_data) 1280 complete_count = query.filter(complete=True).count() 1281 total_count = query.count() 1282 if total_count == 0: 1283 return 1 1284 return float(complete_count) / total_count 1285 1286 1287 # special tasks 1288 1289 def get_special_tasks(**filter_data): 1290 """Get special task entries from the local database. 1291 1292 Query the special tasks table for tasks matching the given 1293 `filter_data`, and return a list of the results. No attempt is 1294 made to forward the call to shards; the buck will stop here. 1295 The caller is expected to know the target shard for such reasons 1296 as: 1297 * The caller is a service (such as gs_offloader) configured 1298 to operate on behalf of one specific shard, and no other. 1299 * The caller has a host as a parameter, and knows that this is 1300 the shard assigned to that host. 1301 1302 @param filter_data Filter keywords to pass to the underlying 1303 database query. 1304 1305 """ 1306 return rpc_utils.prepare_rows_as_nested_dicts( 1307 models.SpecialTask.query_objects(filter_data), 1308 ('host', 'queue_entry')) 1309 1310 1311 def get_host_special_tasks(host_id, **filter_data): 1312 """Get special task entries for a given host. 1313 1314 Query the special tasks table for tasks that ran on the host 1315 given by `host_id` and matching the given `filter_data`. 1316 Return a list of the results. If the host is assigned to a 1317 shard, forward this call to that shard. 1318 1319 @param host_id Id in the database of the target host. 1320 @param filter_data Filter keywords to pass to the underlying 1321 database query. 1322 1323 """ 1324 # Retrieve host data even if the host is in an invalid state. 1325 host = models.Host.smart_get(host_id, False) 1326 if not host.shard: 1327 return get_special_tasks(host_id=host_id, **filter_data) 1328 else: 1329 # The return values from AFE methods are post-processed 1330 # objects that aren't JSON-serializable. So, we have to 1331 # call AFE.run() to get the raw, serializable output from 1332 # the shard. 1333 shard_afe = frontend.AFE(server=host.shard.rpc_hostname()) 1334 return shard_afe.run('get_special_tasks', 1335 host_id=host_id, **filter_data) 1336 1337 1338 def get_num_special_tasks(**kwargs): 1339 """Get the number of special task entries from the local database. 1340 1341 Query the special tasks table for tasks matching the given 'kwargs', 1342 and return the number of the results. No attempt is made to forward 1343 the call to shards; the buck will stop here. 1344 1345 @param kwargs Filter keywords to pass to the underlying database query. 1346 1347 """ 1348 return models.SpecialTask.query_count(kwargs) 1349 1350 1351 def get_host_num_special_tasks(host, **kwargs): 1352 """Get special task entries for a given host. 1353 1354 Query the special tasks table for tasks that ran on the host 1355 given by 'host' and matching the given 'kwargs'. 1356 Return a list of the results. If the host is assigned to a 1357 shard, forward this call to that shard. 1358 1359 @param host id or name of a host. More often a hostname. 1360 @param kwargs Filter keywords to pass to the underlying database query. 1361 1362 """ 1363 # Retrieve host data even if the host is in an invalid state. 1364 host_model = models.Host.smart_get(host, False) 1365 if not host_model.shard: 1366 return get_num_special_tasks(host=host, **kwargs) 1367 else: 1368 shard_afe = frontend.AFE(server=host_model.shard.rpc_hostname()) 1369 return shard_afe.run('get_num_special_tasks', host=host, **kwargs) 1370 1371 1372 def get_status_task(host_id, end_time): 1373 """Get the "status task" for a host from the local shard. 1374 1375 Returns a single special task representing the given host's 1376 "status task". The status task is a completed special task that 1377 identifies whether the corresponding host was working or broken 1378 when it completed. A successful task indicates a working host; 1379 a failed task indicates broken. 1380 1381 This call will not be forward to a shard; the receiving server 1382 must be the shard that owns the host. 1383 1384 @param host_id Id in the database of the target host. 1385 @param end_time Time reference for the host's status. 1386 1387 @return A single task; its status (successful or not) 1388 corresponds to the status of the host (working or 1389 broken) at the given time. If no task is found, return 1390 `None`. 1391 1392 """ 1393 tasklist = rpc_utils.prepare_rows_as_nested_dicts( 1394 status_history.get_status_task(host_id, end_time), 1395 ('host', 'queue_entry')) 1396 return tasklist[0] if tasklist else None 1397 1398 1399 def get_host_status_task(host_id, end_time): 1400 """Get the "status task" for a host from its owning shard. 1401 1402 Finds the given host's owning shard, and forwards to it a call 1403 to `get_status_task()` (see above). 1404 1405 @param host_id Id in the database of the target host. 1406 @param end_time Time reference for the host's status. 1407 1408 @return A single task; its status (successful or not) 1409 corresponds to the status of the host (working or 1410 broken) at the given time. If no task is found, return 1411 `None`. 1412 1413 """ 1414 host = models.Host.smart_get(host_id) 1415 if not host.shard: 1416 return get_status_task(host_id, end_time) 1417 else: 1418 # The return values from AFE methods are post-processed 1419 # objects that aren't JSON-serializable. So, we have to 1420 # call AFE.run() to get the raw, serializable output from 1421 # the shard. 1422 shard_afe = frontend.AFE(server=host.shard.rpc_hostname()) 1423 return shard_afe.run('get_status_task', 1424 host_id=host_id, end_time=end_time) 1425 1426 1427 def get_host_diagnosis_interval(host_id, end_time, success): 1428 """Find a "diagnosis interval" for a given host. 1429 1430 A "diagnosis interval" identifies a start and end time where 1431 the host went from "working" to "broken", or vice versa. The 1432 interval's starting time is the starting time of the last status 1433 task with the old status; the end time is the finish time of the 1434 first status task with the new status. 1435 1436 This routine finds the most recent diagnosis interval for the 1437 given host prior to `end_time`, with a starting status matching 1438 `success`. If `success` is true, the interval will start with a 1439 successful status task; if false the interval will start with a 1440 failed status task. 1441 1442 @param host_id Id in the database of the target host. 1443 @param end_time Time reference for the diagnosis interval. 1444 @param success Whether the diagnosis interval should start 1445 with a successful or failed status task. 1446 1447 @return A list of two strings. The first is the timestamp for 1448 the beginning of the interval; the second is the 1449 timestamp for the end. If the host has never changed 1450 state, the list is empty. 1451 1452 """ 1453 host = models.Host.smart_get(host_id) 1454 if not host.shard or utils.is_shard(): 1455 return status_history.get_diagnosis_interval( 1456 host_id, end_time, success) 1457 else: 1458 shard_afe = frontend.AFE(server=host.shard.rpc_hostname()) 1459 return shard_afe.get_host_diagnosis_interval( 1460 host_id, end_time, success) 1461 1462 1463 # support for host detail view 1464 1465 def get_host_queue_entries_and_special_tasks(host, query_start=None, 1466 query_limit=None, start_time=None, 1467 end_time=None): 1468 """ 1469 @returns an interleaved list of HostQueueEntries and SpecialTasks, 1470 in approximate run order. each dict contains keys for type, host, 1471 job, status, started_on, execution_path, and ID. 1472 """ 1473 total_limit = None 1474 if query_limit is not None: 1475 total_limit = query_start + query_limit 1476 filter_data_common = {'host': host, 1477 'query_limit': total_limit, 1478 'sort_by': ['-id']} 1479 1480 filter_data_special_tasks = rpc_utils.inject_times_to_filter( 1481 'time_started__gte', 'time_started__lte', start_time, end_time, 1482 **filter_data_common) 1483 1484 queue_entries = get_host_queue_entries( 1485 start_time, end_time, **filter_data_common) 1486 special_tasks = get_host_special_tasks(host, **filter_data_special_tasks) 1487 1488 interleaved_entries = rpc_utils.interleave_entries(queue_entries, 1489 special_tasks) 1490 if query_start is not None: 1491 interleaved_entries = interleaved_entries[query_start:] 1492 if query_limit is not None: 1493 interleaved_entries = interleaved_entries[:query_limit] 1494 return rpc_utils.prepare_host_queue_entries_and_special_tasks( 1495 interleaved_entries, queue_entries) 1496 1497 1498 def get_num_host_queue_entries_and_special_tasks(host, start_time=None, 1499 end_time=None): 1500 filter_data_common = {'host': host} 1501 1502 filter_data_queue_entries, filter_data_special_tasks = ( 1503 rpc_utils.inject_times_to_hqe_special_tasks_filters( 1504 filter_data_common, start_time, end_time)) 1505 1506 return (models.HostQueueEntry.query_count(filter_data_queue_entries) 1507 + get_host_num_special_tasks(**filter_data_special_tasks)) 1508 1509 1510 # recurring run 1511 1512 def get_recurring(**filter_data): 1513 return rpc_utils.prepare_rows_as_nested_dicts( 1514 models.RecurringRun.query_objects(filter_data), 1515 ('job', 'owner')) 1516 1517 1518 def get_num_recurring(**filter_data): 1519 return models.RecurringRun.query_count(filter_data) 1520 1521 1522 def delete_recurring_runs(**filter_data): 1523 to_delete = models.RecurringRun.query_objects(filter_data) 1524 to_delete.delete() 1525 1526 1527 def create_recurring_run(job_id, start_date, loop_period, loop_count): 1528 owner = models.User.current_user().login 1529 job = models.Job.objects.get(id=job_id) 1530 return job.create_recurring_job(start_date=start_date, 1531 loop_period=loop_period, 1532 loop_count=loop_count, 1533 owner=owner) 1534 1535 1536 # other 1537 1538 def echo(data=""): 1539 """\ 1540 Returns a passed in string. For doing a basic test to see if RPC calls 1541 can successfully be made. 1542 """ 1543 return data 1544 1545 1546 def get_motd(): 1547 """\ 1548 Returns the message of the day as a string. 1549 """ 1550 return rpc_utils.get_motd() 1551 1552 1553 def get_static_data(): 1554 """\ 1555 Returns a dictionary containing a bunch of data that shouldn't change 1556 often and is otherwise inaccessible. This includes: 1557 1558 priorities: List of job priority choices. 1559 default_priority: Default priority value for new jobs. 1560 users: Sorted list of all users. 1561 labels: Sorted list of labels not start with 'cros-version' and 1562 'fw-version'. 1563 atomic_groups: Sorted list of all atomic groups. 1564 tests: Sorted list of all tests. 1565 profilers: Sorted list of all profilers. 1566 current_user: Logged-in username. 1567 host_statuses: Sorted list of possible Host statuses. 1568 job_statuses: Sorted list of possible HostQueueEntry statuses. 1569 job_timeout_default: The default job timeout length in minutes. 1570 parse_failed_repair_default: Default value for the parse_failed_repair job 1571 option. 1572 reboot_before_options: A list of valid RebootBefore string enums. 1573 reboot_after_options: A list of valid RebootAfter string enums. 1574 motd: Server's message of the day. 1575 status_dictionary: A mapping from one word job status names to a more 1576 informative description. 1577 """ 1578 1579 job_fields = models.Job.get_field_dict() 1580 default_drone_set_name = models.DroneSet.default_drone_set_name() 1581 drone_sets = ([default_drone_set_name] + 1582 sorted(drone_set.name for drone_set in 1583 models.DroneSet.objects.exclude( 1584 name=default_drone_set_name))) 1585 1586 result = {} 1587 result['priorities'] = priorities.Priority.choices() 1588 default_priority = priorities.Priority.DEFAULT 1589 result['default_priority'] = 'Default' 1590 result['max_schedulable_priority'] = priorities.Priority.DEFAULT 1591 result['users'] = get_users(sort_by=['login']) 1592 1593 label_exclude_filters = [{'name__startswith': 'cros-version'}, 1594 {'name__startswith': 'fw-version'}, 1595 {'name__startswith': 'fwrw-version'}, 1596 {'name__startswith': 'fwro-version'}] 1597 result['labels'] = get_labels( 1598 label_exclude_filters, 1599 sort_by=['-platform', 'name']) 1600 1601 result['atomic_groups'] = get_atomic_groups(sort_by=['name']) 1602 result['tests'] = get_tests(sort_by=['name']) 1603 result['profilers'] = get_profilers(sort_by=['name']) 1604 result['current_user'] = rpc_utils.prepare_for_serialization( 1605 models.User.current_user().get_object_dict()) 1606 result['host_statuses'] = sorted(models.Host.Status.names) 1607 result['job_statuses'] = sorted(models.HostQueueEntry.Status.names) 1608 result['job_timeout_mins_default'] = models.Job.DEFAULT_TIMEOUT_MINS 1609 result['job_max_runtime_mins_default'] = ( 1610 models.Job.DEFAULT_MAX_RUNTIME_MINS) 1611 result['parse_failed_repair_default'] = bool( 1612 models.Job.DEFAULT_PARSE_FAILED_REPAIR) 1613 result['reboot_before_options'] = model_attributes.RebootBefore.names 1614 result['reboot_after_options'] = model_attributes.RebootAfter.names 1615 result['motd'] = rpc_utils.get_motd() 1616 result['drone_sets_enabled'] = models.DroneSet.drone_sets_enabled() 1617 result['drone_sets'] = drone_sets 1618 result['parameterized_jobs'] = models.Job.parameterized_jobs_enabled() 1619 1620 result['status_dictionary'] = {"Aborted": "Aborted", 1621 "Verifying": "Verifying Host", 1622 "Provisioning": "Provisioning Host", 1623 "Pending": "Waiting on other hosts", 1624 "Running": "Running autoserv", 1625 "Completed": "Autoserv completed", 1626 "Failed": "Failed to complete", 1627 "Queued": "Queued", 1628 "Starting": "Next in host's queue", 1629 "Stopped": "Other host(s) failed verify", 1630 "Parsing": "Awaiting parse of final results", 1631 "Gathering": "Gathering log files", 1632 "Template": "Template job for recurring run", 1633 "Waiting": "Waiting for scheduler action", 1634 "Archiving": "Archiving results", 1635 "Resetting": "Resetting hosts"} 1636 1637 result['wmatrix_url'] = rpc_utils.get_wmatrix_url() 1638 result['is_moblab'] = bool(utils.is_moblab()) 1639 1640 return result 1641 1642 1643 def get_server_time(): 1644 return datetime.datetime.now().strftime("%Y-%m-%d %H:%M") 1645