1 # pylint: disable-msg=C0111 2 3 import logging 4 from datetime import datetime 5 import django.core 6 try: 7 from django.db import models as dbmodels, connection 8 except django.core.exceptions.ImproperlyConfigured: 9 raise ImportError('Django database not yet configured. Import either ' 10 'setup_django_environment or ' 11 'setup_django_lite_environment from ' 12 'autotest_lib.frontend before any imports that ' 13 'depend on django models.') 14 from xml.sax import saxutils 15 import common 16 from autotest_lib.frontend.afe import model_logic, model_attributes 17 from autotest_lib.frontend.afe import rdb_model_extensions 18 from autotest_lib.frontend import settings, thread_local 19 from autotest_lib.client.common_lib import enum, error, host_protections 20 from autotest_lib.client.common_lib import global_config 21 from autotest_lib.client.common_lib import host_queue_entry_states 22 from autotest_lib.client.common_lib import control_data, priorities, decorators 23 from autotest_lib.client.common_lib import site_utils 24 from autotest_lib.client.common_lib.cros.graphite import autotest_es 25 from autotest_lib.server import utils as server_utils 26 27 # job options and user preferences 28 DEFAULT_REBOOT_BEFORE = model_attributes.RebootBefore.IF_DIRTY 29 DEFAULT_REBOOT_AFTER = model_attributes.RebootBefore.NEVER 30 31 32 class AclAccessViolation(Exception): 33 """\ 34 Raised when an operation is attempted with proper permissions as 35 dictated by ACLs. 36 """ 37 38 39 class AtomicGroup(model_logic.ModelWithInvalid, dbmodels.Model): 40 """\ 41 An atomic group defines a collection of hosts which must only be scheduled 42 all at once. Any host with a label having an atomic group will only be 43 scheduled for a job at the same time as other hosts sharing that label. 44 45 Required: 46 name: A name for this atomic group, e.g. 'rack23' or 'funky_net'. 47 max_number_of_machines: The maximum number of machines that will be 48 scheduled at once when scheduling jobs to this atomic group. 49 The job.synch_count is considered the minimum. 50 51 Optional: 52 description: Arbitrary text description of this group's purpose. 53 """ 54 name = dbmodels.CharField(max_length=255, unique=True) 55 description = dbmodels.TextField(blank=True) 56 # This magic value is the default to simplify the scheduler logic. 57 # It must be "large". The common use of atomic groups is to want all 58 # machines in the group to be used, limits on which subset used are 59 # often chosen via dependency labels. 60 # TODO(dennisjeffrey): Revisit this so we don't have to assume that 61 # "infinity" is around 3.3 million. 62 INFINITE_MACHINES = 333333333 63 max_number_of_machines = dbmodels.IntegerField(default=INFINITE_MACHINES) 64 invalid = dbmodels.BooleanField(default=False, 65 editable=settings.FULL_ADMIN) 66 67 name_field = 'name' 68 objects = model_logic.ModelWithInvalidManager() 69 valid_objects = model_logic.ValidObjectsManager() 70 71 72 def enqueue_job(self, job, is_template=False): 73 """Enqueue a job on an associated atomic group of hosts. 74 75 @param job: A job to enqueue. 76 @param is_template: Whether the status should be "Template". 77 """ 78 queue_entry = HostQueueEntry.create(atomic_group=self, job=job, 79 is_template=is_template) 80 queue_entry.save() 81 82 83 def clean_object(self): 84 self.label_set.clear() 85 86 87 class Meta: 88 """Metadata for class AtomicGroup.""" 89 db_table = 'afe_atomic_groups' 90 91 92 def __unicode__(self): 93 return unicode(self.name) 94 95 96 class Label(model_logic.ModelWithInvalid, dbmodels.Model): 97 """\ 98 Required: 99 name: label name 100 101 Optional: 102 kernel_config: URL/path to kernel config for jobs run on this label. 103 platform: If True, this is a platform label (defaults to False). 104 only_if_needed: If True, a Host with this label can only be used if that 105 label is requested by the job/test (either as the meta_host or 106 in the job_dependencies). 107 atomic_group: The atomic group associated with this label. 108 """ 109 name = dbmodels.CharField(max_length=255, unique=True) 110 kernel_config = dbmodels.CharField(max_length=255, blank=True) 111 platform = dbmodels.BooleanField(default=False) 112 invalid = dbmodels.BooleanField(default=False, 113 editable=settings.FULL_ADMIN) 114 only_if_needed = dbmodels.BooleanField(default=False) 115 116 name_field = 'name' 117 objects = model_logic.ModelWithInvalidManager() 118 valid_objects = model_logic.ValidObjectsManager() 119 atomic_group = dbmodels.ForeignKey(AtomicGroup, null=True, blank=True) 120 121 122 def clean_object(self): 123 self.host_set.clear() 124 self.test_set.clear() 125 126 127 def enqueue_job(self, job, atomic_group=None, is_template=False): 128 """Enqueue a job on any host of this label. 129 130 @param job: A job to enqueue. 131 @param atomic_group: The associated atomic group. 132 @param is_template: Whether the status should be "Template". 133 """ 134 queue_entry = HostQueueEntry.create(meta_host=self, job=job, 135 is_template=is_template, 136 atomic_group=atomic_group) 137 queue_entry.save() 138 139 140 141 class Meta: 142 """Metadata for class Label.""" 143 db_table = 'afe_labels' 144 145 146 def __unicode__(self): 147 return unicode(self.name) 148 149 150 class Shard(dbmodels.Model, model_logic.ModelExtensions): 151 152 hostname = dbmodels.CharField(max_length=255, unique=True) 153 154 name_field = 'hostname' 155 156 labels = dbmodels.ManyToManyField(Label, blank=True, 157 db_table='afe_shards_labels') 158 159 class Meta: 160 """Metadata for class ParameterizedJob.""" 161 db_table = 'afe_shards' 162 163 164 def rpc_hostname(self): 165 """Get the rpc hostname of the shard. 166 167 @return: Just the shard hostname for all non-testing environments. 168 The address of the default gateway for vm testing environments. 169 """ 170 # TODO: Figure out a better solution for testing. Since no 2 shards 171 # can run on the same host, if the shard hostname is localhost we 172 # conclude that it must be a vm in a test cluster. In such situations 173 # a name of localhost:<port> is necessary to achieve the correct 174 # afe links/redirection from the frontend (this happens through the 175 # host), but for rpcs that are performed *on* the shard, they need to 176 # use the address of the gateway. 177 # In the virtual machine testing environment (i.e., puppylab), each 178 # shard VM has a hostname like localhost:<port>. In the real cluster 179 # environment, a shard node does not have 'localhost' for its hostname. 180 # The following hostname substitution is needed only for the VM 181 # in puppylab. 182 # The 'hostname' should not be replaced in the case of real cluster. 183 if site_utils.is_puppylab_vm(self.hostname): 184 hostname = self.hostname.split(':')[0] 185 return self.hostname.replace( 186 hostname, site_utils.DEFAULT_VM_GATEWAY) 187 return self.hostname 188 189 190 class Drone(dbmodels.Model, model_logic.ModelExtensions): 191 """ 192 A scheduler drone 193 194 hostname: the drone's hostname 195 """ 196 hostname = dbmodels.CharField(max_length=255, unique=True) 197 198 name_field = 'hostname' 199 objects = model_logic.ExtendedManager() 200 201 202 def save(self, *args, **kwargs): 203 if not User.current_user().is_superuser(): 204 raise Exception('Only superusers may edit drones') 205 super(Drone, self).save(*args, **kwargs) 206 207 208 def delete(self): 209 if not User.current_user().is_superuser(): 210 raise Exception('Only superusers may delete drones') 211 super(Drone, self).delete() 212 213 214 class Meta: 215 """Metadata for class Drone.""" 216 db_table = 'afe_drones' 217 218 def __unicode__(self): 219 return unicode(self.hostname) 220 221 222 class DroneSet(dbmodels.Model, model_logic.ModelExtensions): 223 """ 224 A set of scheduler drones 225 226 These will be used by the scheduler to decide what drones a job is allowed 227 to run on. 228 229 name: the drone set's name 230 drones: the drones that are part of the set 231 """ 232 DRONE_SETS_ENABLED = global_config.global_config.get_config_value( 233 'SCHEDULER', 'drone_sets_enabled', type=bool, default=False) 234 DEFAULT_DRONE_SET_NAME = global_config.global_config.get_config_value( 235 'SCHEDULER', 'default_drone_set_name', default=None) 236 237 name = dbmodels.CharField(max_length=255, unique=True) 238 drones = dbmodels.ManyToManyField(Drone, db_table='afe_drone_sets_drones') 239 240 name_field = 'name' 241 objects = model_logic.ExtendedManager() 242 243 244 def save(self, *args, **kwargs): 245 if not User.current_user().is_superuser(): 246 raise Exception('Only superusers may edit drone sets') 247 super(DroneSet, self).save(*args, **kwargs) 248 249 250 def delete(self): 251 if not User.current_user().is_superuser(): 252 raise Exception('Only superusers may delete drone sets') 253 super(DroneSet, self).delete() 254 255 256 @classmethod 257 def drone_sets_enabled(cls): 258 """Returns whether drone sets are enabled. 259 260 @param cls: Implicit class object. 261 """ 262 return cls.DRONE_SETS_ENABLED 263 264 265 @classmethod 266 def default_drone_set_name(cls): 267 """Returns the default drone set name. 268 269 @param cls: Implicit class object. 270 """ 271 return cls.DEFAULT_DRONE_SET_NAME 272 273 274 @classmethod 275 def get_default(cls): 276 """Gets the default drone set name, compatible with Job.add_object. 277 278 @param cls: Implicit class object. 279 """ 280 return cls.smart_get(cls.DEFAULT_DRONE_SET_NAME) 281 282 283 @classmethod 284 def resolve_name(cls, drone_set_name): 285 """ 286 Returns the name of one of these, if not None, in order of preference: 287 1) the drone set given, 288 2) the current user's default drone set, or 289 3) the global default drone set 290 291 or returns None if drone sets are disabled 292 293 @param cls: Implicit class object. 294 @param drone_set_name: A drone set name. 295 """ 296 if not cls.drone_sets_enabled(): 297 return None 298 299 user = User.current_user() 300 user_drone_set_name = user.drone_set and user.drone_set.name 301 302 return drone_set_name or user_drone_set_name or cls.get_default().name 303 304 305 def get_drone_hostnames(self): 306 """ 307 Gets the hostnames of all drones in this drone set 308 """ 309 return set(self.drones.all().values_list('hostname', flat=True)) 310 311 312 class Meta: 313 """Metadata for class DroneSet.""" 314 db_table = 'afe_drone_sets' 315 316 def __unicode__(self): 317 return unicode(self.name) 318 319 320 class User(dbmodels.Model, model_logic.ModelExtensions): 321 """\ 322 Required: 323 login :user login name 324 325 Optional: 326 access_level: 0=User (default), 1=Admin, 100=Root 327 """ 328 ACCESS_ROOT = 100 329 ACCESS_ADMIN = 1 330 ACCESS_USER = 0 331 332 AUTOTEST_SYSTEM = 'autotest_system' 333 334 login = dbmodels.CharField(max_length=255, unique=True) 335 access_level = dbmodels.IntegerField(default=ACCESS_USER, blank=True) 336 337 # user preferences 338 reboot_before = dbmodels.SmallIntegerField( 339 choices=model_attributes.RebootBefore.choices(), blank=True, 340 default=DEFAULT_REBOOT_BEFORE) 341 reboot_after = dbmodels.SmallIntegerField( 342 choices=model_attributes.RebootAfter.choices(), blank=True, 343 default=DEFAULT_REBOOT_AFTER) 344 drone_set = dbmodels.ForeignKey(DroneSet, null=True, blank=True) 345 show_experimental = dbmodels.BooleanField(default=False) 346 347 name_field = 'login' 348 objects = model_logic.ExtendedManager() 349 350 351 def save(self, *args, **kwargs): 352 # is this a new object being saved for the first time? 353 first_time = (self.id is None) 354 user = thread_local.get_user() 355 if user and not user.is_superuser() and user.login != self.login: 356 raise AclAccessViolation("You cannot modify user " + self.login) 357 super(User, self).save(*args, **kwargs) 358 if first_time: 359 everyone = AclGroup.objects.get(name='Everyone') 360 everyone.users.add(self) 361 362 363 def is_superuser(self): 364 """Returns whether the user has superuser access.""" 365 return self.access_level >= self.ACCESS_ROOT 366 367 368 @classmethod 369 def current_user(cls): 370 """Returns the current user. 371 372 @param cls: Implicit class object. 373 """ 374 user = thread_local.get_user() 375 if user is None: 376 user, _ = cls.objects.get_or_create(login=cls.AUTOTEST_SYSTEM) 377 user.access_level = cls.ACCESS_ROOT 378 user.save() 379 return user 380 381 382 @classmethod 383 def get_record(cls, data): 384 """Check the database for an identical record. 385 386 Check for a record with matching id and login. If one exists, 387 return it. If one does not exist there is a possibility that 388 the following cases have happened: 389 1. Same id, different login 390 We received: "1 chromeos-test" 391 And we have: "1 debug-user" 392 In this case we need to delete "1 debug_user" and insert 393 "1 chromeos-test". 394 395 2. Same login, different id: 396 We received: "1 chromeos-test" 397 And we have: "2 chromeos-test" 398 In this case we need to delete "2 chromeos-test" and insert 399 "1 chromeos-test". 400 401 As long as this method deletes bad records and raises the 402 DoesNotExist exception the caller will handle creating the 403 new record. 404 405 @raises: DoesNotExist, if a record with the matching login and id 406 does not exist. 407 """ 408 409 # Both the id and login should be uniqe but there are cases when 410 # we might already have a user with the same login/id because 411 # current_user will proactively create a user record if it doesn't 412 # exist. Since we want to avoid conflict between the master and 413 # shard, just delete any existing user records that don't match 414 # what we're about to deserialize from the master. 415 try: 416 return cls.objects.get(login=data['login'], id=data['id']) 417 except cls.DoesNotExist: 418 cls.delete_matching_record(login=data['login']) 419 cls.delete_matching_record(id=data['id']) 420 raise 421 422 423 class Meta: 424 """Metadata for class User.""" 425 db_table = 'afe_users' 426 427 def __unicode__(self): 428 return unicode(self.login) 429 430 431 class Host(model_logic.ModelWithInvalid, rdb_model_extensions.AbstractHostModel, 432 model_logic.ModelWithAttributes): 433 """\ 434 Required: 435 hostname 436 437 optional: 438 locked: if true, host is locked and will not be queued 439 440 Internal: 441 From AbstractHostModel: 442 synch_id: currently unused 443 status: string describing status of host 444 invalid: true if the host has been deleted 445 protection: indicates what can be done to this host during repair 446 lock_time: DateTime at which the host was locked 447 dirty: true if the host has been used without being rebooted 448 Local: 449 locked_by: user that locked the host, or null if the host is unlocked 450 """ 451 452 SERIALIZATION_LINKS_TO_FOLLOW = set(['aclgroup_set', 453 'hostattribute_set', 454 'labels', 455 'shard']) 456 SERIALIZATION_LOCAL_LINKS_TO_UPDATE = set(['invalid']) 457 458 459 def custom_deserialize_relation(self, link, data): 460 assert link == 'shard', 'Link %s should not be deserialized' % link 461 self.shard = Shard.deserialize(data) 462 463 464 # Note: Only specify foreign keys here, specify all native host columns in 465 # rdb_model_extensions instead. 466 Protection = host_protections.Protection 467 labels = dbmodels.ManyToManyField(Label, blank=True, 468 db_table='afe_hosts_labels') 469 locked_by = dbmodels.ForeignKey(User, null=True, blank=True, editable=False) 470 name_field = 'hostname' 471 objects = model_logic.ModelWithInvalidManager() 472 valid_objects = model_logic.ValidObjectsManager() 473 leased_objects = model_logic.LeasedHostManager() 474 475 shard = dbmodels.ForeignKey(Shard, blank=True, null=True) 476 477 def __init__(self, *args, **kwargs): 478 super(Host, self).__init__(*args, **kwargs) 479 self._record_attributes(['status']) 480 481 482 @staticmethod 483 def create_one_time_host(hostname): 484 """Creates a one-time host. 485 486 @param hostname: The name for the host. 487 """ 488 query = Host.objects.filter(hostname=hostname) 489 if query.count() == 0: 490 host = Host(hostname=hostname, invalid=True) 491 host.do_validate() 492 else: 493 host = query[0] 494 if not host.invalid: 495 raise model_logic.ValidationError({ 496 'hostname' : '%s already exists in the autotest DB. ' 497 'Select it rather than entering it as a one time ' 498 'host.' % hostname 499 }) 500 host.protection = host_protections.Protection.DO_NOT_REPAIR 501 host.locked = False 502 host.save() 503 host.clean_object() 504 return host 505 506 507 @classmethod 508 def assign_to_shard(cls, shard, known_ids): 509 """Assigns hosts to a shard. 510 511 For all labels that have been assigned to a shard, all hosts that 512 have at least one of the shard's labels are assigned to the shard. 513 Hosts that are assigned to the shard but aren't already present on the 514 shard are returned. 515 516 Board to shard mapping is many-to-one. Many different boards can be 517 hosted in a shard. However, DUTs of a single board cannot be distributed 518 into more than one shard. 519 520 @param shard: The shard object to assign labels/hosts for. 521 @param known_ids: List of all host-ids the shard already knows. 522 This is used to figure out which hosts should be sent 523 to the shard. If shard_ids were used instead, hosts 524 would only be transferred once, even if the client 525 failed persisting them. 526 The number of hosts usually lies in O(100), so the 527 overhead is acceptable. 528 529 @returns the hosts objects that should be sent to the shard. 530 """ 531 532 # Disclaimer: concurrent heartbeats should theoretically not occur in 533 # the current setup. As they may be introduced in the near future, 534 # this comment will be left here. 535 536 # Sending stuff twice is acceptable, but forgetting something isn't. 537 # Detecting duplicates on the client is easy, but here it's harder. The 538 # following options were considered: 539 # - SELECT ... WHERE and then UPDATE ... WHERE: Update might update more 540 # than select returned, as concurrently more hosts might have been 541 # inserted 542 # - UPDATE and then SELECT WHERE shard=shard: select always returns all 543 # hosts for the shard, this is overhead 544 # - SELECT and then UPDATE only selected without requerying afterwards: 545 # returns the old state of the records. 546 host_ids = set(Host.objects.filter( 547 labels__in=shard.labels.all(), 548 leased=False 549 ).exclude( 550 id__in=known_ids, 551 ).values_list('pk', flat=True)) 552 553 if host_ids: 554 Host.objects.filter(pk__in=host_ids).update(shard=shard) 555 return list(Host.objects.filter(pk__in=host_ids).all()) 556 return [] 557 558 def resurrect_object(self, old_object): 559 super(Host, self).resurrect_object(old_object) 560 # invalid hosts can be in use by the scheduler (as one-time hosts), so 561 # don't change the status 562 self.status = old_object.status 563 564 565 def clean_object(self): 566 self.aclgroup_set.clear() 567 self.labels.clear() 568 569 570 def record_state(self, type_str, state, value, other_metadata=None): 571 """Record metadata in elasticsearch. 572 573 @param type_str: sets the _type field in elasticsearch db. 574 @param state: string representing what state we are recording, 575 e.g. 'locked' 576 @param value: value of the state, e.g. True 577 @param other_metadata: Other metadata to store in metaDB. 578 """ 579 metadata = { 580 state: value, 581 'hostname': self.hostname, 582 } 583 if other_metadata: 584 metadata = dict(metadata.items() + other_metadata.items()) 585 autotest_es.post(use_http=True, type_str=type_str, metadata=metadata) 586 587 588 def save(self, *args, **kwargs): 589 # extra spaces in the hostname can be a sneaky source of errors 590 self.hostname = self.hostname.strip() 591 # is this a new object being saved for the first time? 592 first_time = (self.id is None) 593 if not first_time: 594 AclGroup.check_for_acl_violation_hosts([self]) 595 # If locked is changed, send its status and user made the change to 596 # metaDB. Locks are important in host history because if a device is 597 # locked then we don't really care what state it is in. 598 if self.locked and not self.locked_by: 599 self.locked_by = User.current_user() 600 if not self.lock_time: 601 self.lock_time = datetime.now() 602 self.record_state('lock_history', 'locked', self.locked, 603 {'changed_by': self.locked_by.login, 604 'lock_reason': self.lock_reason}) 605 self.dirty = True 606 elif not self.locked and self.locked_by: 607 self.record_state('lock_history', 'locked', self.locked, 608 {'changed_by': self.locked_by.login}) 609 self.locked_by = None 610 self.lock_time = None 611 super(Host, self).save(*args, **kwargs) 612 if first_time: 613 everyone = AclGroup.objects.get(name='Everyone') 614 everyone.hosts.add(self) 615 self._check_for_updated_attributes() 616 617 618 def delete(self): 619 AclGroup.check_for_acl_violation_hosts([self]) 620 for queue_entry in self.hostqueueentry_set.all(): 621 queue_entry.deleted = True 622 queue_entry.abort() 623 super(Host, self).delete() 624 625 626 def on_attribute_changed(self, attribute, old_value): 627 assert attribute == 'status' 628 logging.info(self.hostname + ' -> ' + self.status) 629 630 631 def enqueue_job(self, job, atomic_group=None, is_template=False): 632 """Enqueue a job on this host. 633 634 @param job: A job to enqueue. 635 @param atomic_group: The associated atomic group. 636 @param is_template: Whther the status should be "Template". 637 """ 638 queue_entry = HostQueueEntry.create(host=self, job=job, 639 is_template=is_template, 640 atomic_group=atomic_group) 641 # allow recovery of dead hosts from the frontend 642 if not self.active_queue_entry() and self.is_dead(): 643 self.status = Host.Status.READY 644 self.save() 645 queue_entry.save() 646 647 block = IneligibleHostQueue(job=job, host=self) 648 block.save() 649 650 651 def platform(self): 652 """The platform of the host.""" 653 # TODO(showard): slighly hacky? 654 platforms = self.labels.filter(platform=True) 655 if len(platforms) == 0: 656 return None 657 return platforms[0] 658 platform.short_description = 'Platform' 659 660 661 @classmethod 662 def check_no_platform(cls, hosts): 663 """Verify the specified hosts have no associated platforms. 664 665 @param cls: Implicit class object. 666 @param hosts: The hosts to verify. 667 @raises model_logic.ValidationError if any hosts already have a 668 platform. 669 """ 670 Host.objects.populate_relationships(hosts, Label, 'label_list') 671 errors = [] 672 for host in hosts: 673 platforms = [label.name for label in host.label_list 674 if label.platform] 675 if platforms: 676 # do a join, just in case this host has multiple platforms, 677 # we'll be able to see it 678 errors.append('Host %s already has a platform: %s' % ( 679 host.hostname, ', '.join(platforms))) 680 if errors: 681 raise model_logic.ValidationError({'labels': '; '.join(errors)}) 682 683 684 def is_dead(self): 685 """Returns whether the host is dead (has status repair failed).""" 686 return self.status == Host.Status.REPAIR_FAILED 687 688 689 def active_queue_entry(self): 690 """Returns the active queue entry for this host, or None if none.""" 691 active = list(self.hostqueueentry_set.filter(active=True)) 692 if not active: 693 return None 694 assert len(active) == 1, ('More than one active entry for ' 695 'host ' + self.hostname) 696 return active[0] 697 698 699 def _get_attribute_model_and_args(self, attribute): 700 return HostAttribute, dict(host=self, attribute=attribute) 701 702 703 @classmethod 704 def get_attribute_model(cls): 705 """Return the attribute model. 706 707 Override method in parent class. See ModelExtensions for details. 708 @returns: The attribute model of Host. 709 """ 710 return HostAttribute 711 712 713 class Meta: 714 """Metadata for the Host class.""" 715 db_table = 'afe_hosts' 716 717 718 def __unicode__(self): 719 return unicode(self.hostname) 720 721 722 class HostAttribute(dbmodels.Model, model_logic.ModelExtensions): 723 """Arbitrary keyvals associated with hosts.""" 724 725 SERIALIZATION_LINKS_TO_KEEP = set(['host']) 726 SERIALIZATION_LOCAL_LINKS_TO_UPDATE = set(['value']) 727 host = dbmodels.ForeignKey(Host) 728 attribute = dbmodels.CharField(max_length=90) 729 value = dbmodels.CharField(max_length=300) 730 731 objects = model_logic.ExtendedManager() 732 733 class Meta: 734 """Metadata for the HostAttribute class.""" 735 db_table = 'afe_host_attributes' 736 737 738 @classmethod 739 def get_record(cls, data): 740 """Check the database for an identical record. 741 742 Use host_id and attribute to search for a existing record. 743 744 @raises: DoesNotExist, if no record found 745 @raises: MultipleObjectsReturned if multiple records found. 746 """ 747 # TODO(fdeng): We should use host_id and attribute together as 748 # a primary key in the db. 749 return cls.objects.get(host_id=data['host_id'], 750 attribute=data['attribute']) 751 752 753 @classmethod 754 def deserialize(cls, data): 755 """Override deserialize in parent class. 756 757 Do not deserialize id as id is not kept consistent on master and shards. 758 759 @param data: A dictionary of data to deserialize. 760 761 @returns: A HostAttribute object. 762 """ 763 if data: 764 data.pop('id') 765 return super(HostAttribute, cls).deserialize(data) 766 767 768 class Test(dbmodels.Model, model_logic.ModelExtensions): 769 """\ 770 Required: 771 author: author name 772 description: description of the test 773 name: test name 774 time: short, medium, long 775 test_class: This describes the class for your the test belongs in. 776 test_category: This describes the category for your tests 777 test_type: Client or Server 778 path: path to pass to run_test() 779 sync_count: is a number >=1 (1 being the default). If it's 1, then it's an 780 async job. If it's >1 it's sync job for that number of machines 781 i.e. if sync_count = 2 it is a sync job that requires two 782 machines. 783 Optional: 784 dependencies: What the test requires to run. Comma deliminated list 785 dependency_labels: many-to-many relationship with labels corresponding to 786 test dependencies. 787 experimental: If this is set to True production servers will ignore the test 788 run_verify: Whether or not the scheduler should run the verify stage 789 run_reset: Whether or not the scheduler should run the reset stage 790 test_retry: Number of times to retry test if the test did not complete 791 successfully. (optional, default: 0) 792 """ 793 TestTime = enum.Enum('SHORT', 'MEDIUM', 'LONG', start_value=1) 794 795 name = dbmodels.CharField(max_length=255, unique=True) 796 author = dbmodels.CharField(max_length=255) 797 test_class = dbmodels.CharField(max_length=255) 798 test_category = dbmodels.CharField(max_length=255) 799 dependencies = dbmodels.CharField(max_length=255, blank=True) 800 description = dbmodels.TextField(blank=True) 801 experimental = dbmodels.BooleanField(default=True) 802 run_verify = dbmodels.BooleanField(default=False) 803 test_time = dbmodels.SmallIntegerField(choices=TestTime.choices(), 804 default=TestTime.MEDIUM) 805 test_type = dbmodels.SmallIntegerField( 806 choices=control_data.CONTROL_TYPE.choices()) 807 sync_count = dbmodels.IntegerField(default=1) 808 path = dbmodels.CharField(max_length=255, unique=True) 809 test_retry = dbmodels.IntegerField(blank=True, default=0) 810 run_reset = dbmodels.BooleanField(default=True) 811 812 dependency_labels = ( 813 dbmodels.ManyToManyField(Label, blank=True, 814 db_table='afe_autotests_dependency_labels')) 815 name_field = 'name' 816 objects = model_logic.ExtendedManager() 817 818 819 def admin_description(self): 820 """Returns a string representing the admin description.""" 821 escaped_description = saxutils.escape(self.description) 822 return '<span style="white-space:pre">%s</span>' % escaped_description 823 admin_description.allow_tags = True 824 admin_description.short_description = 'Description' 825 826 827 class Meta: 828 """Metadata for class Test.""" 829 db_table = 'afe_autotests' 830 831 def __unicode__(self): 832 return unicode(self.name) 833 834 835 class TestParameter(dbmodels.Model): 836 """ 837 A declared parameter of a test 838 """ 839 test = dbmodels.ForeignKey(Test) 840 name = dbmodels.CharField(max_length=255) 841 842 class Meta: 843 """Metadata for class TestParameter.""" 844 db_table = 'afe_test_parameters' 845 unique_together = ('test', 'name') 846 847 def __unicode__(self): 848 return u'%s (%s)' % (self.name, self.test.name) 849 850 851 class Profiler(dbmodels.Model, model_logic.ModelExtensions): 852 """\ 853 Required: 854 name: profiler name 855 test_type: Client or Server 856 857 Optional: 858 description: arbirary text description 859 """ 860 name = dbmodels.CharField(max_length=255, unique=True) 861 description = dbmodels.TextField(blank=True) 862 863 name_field = 'name' 864 objects = model_logic.ExtendedManager() 865 866 867 class Meta: 868 """Metadata for class Profiler.""" 869 db_table = 'afe_profilers' 870 871 def __unicode__(self): 872 return unicode(self.name) 873 874 875 class AclGroup(dbmodels.Model, model_logic.ModelExtensions): 876 """\ 877 Required: 878 name: name of ACL group 879 880 Optional: 881 description: arbitrary description of group 882 """ 883 884 SERIALIZATION_LINKS_TO_FOLLOW = set(['users']) 885 886 name = dbmodels.CharField(max_length=255, unique=True) 887 description = dbmodels.CharField(max_length=255, blank=True) 888 users = dbmodels.ManyToManyField(User, blank=False, 889 db_table='afe_acl_groups_users') 890 hosts = dbmodels.ManyToManyField(Host, blank=True, 891 db_table='afe_acl_groups_hosts') 892 893 name_field = 'name' 894 objects = model_logic.ExtendedManager() 895 896 @staticmethod 897 def check_for_acl_violation_hosts(hosts): 898 """Verify the current user has access to the specified hosts. 899 900 @param hosts: The hosts to verify against. 901 @raises AclAccessViolation if the current user doesn't have access 902 to a host. 903 """ 904 user = User.current_user() 905 if user.is_superuser(): 906 return 907 accessible_host_ids = set( 908 host.id for host in Host.objects.filter(aclgroup__users=user)) 909 for host in hosts: 910 # Check if the user has access to this host, 911 # but only if it is not a metahost or a one-time-host. 912 no_access = (isinstance(host, Host) 913 and not host.invalid 914 and int(host.id) not in accessible_host_ids) 915 if no_access: 916 raise AclAccessViolation("%s does not have access to %s" % 917 (str(user), str(host))) 918 919 920 @staticmethod 921 def check_abort_permissions(queue_entries): 922 """Look for queue entries that aren't abortable by the current user. 923 924 An entry is not abortable if: 925 * the job isn't owned by this user, and 926 * the machine isn't ACL-accessible, or 927 * the machine is in the "Everyone" ACL 928 929 @param queue_entries: The queue entries to check. 930 @raises AclAccessViolation if a queue entry is not abortable by the 931 current user. 932 """ 933 user = User.current_user() 934 if user.is_superuser(): 935 return 936 not_owned = queue_entries.exclude(job__owner=user.login) 937 # I do this using ID sets instead of just Django filters because 938 # filtering on M2M dbmodels is broken in Django 0.96. It's better in 939 # 1.0. 940 # TODO: Use Django filters, now that we're using 1.0. 941 accessible_ids = set( 942 entry.id for entry 943 in not_owned.filter(host__aclgroup__users__login=user.login)) 944 public_ids = set(entry.id for entry 945 in not_owned.filter(host__aclgroup__name='Everyone')) 946 cannot_abort = [entry for entry in not_owned.select_related() 947 if entry.id not in accessible_ids 948 or entry.id in public_ids] 949 if len(cannot_abort) == 0: 950 return 951 entry_names = ', '.join('%s-%s/%s' % (entry.job.id, entry.job.owner, 952 entry.host_or_metahost_name()) 953 for entry in cannot_abort) 954 raise AclAccessViolation('You cannot abort the following job entries: ' 955 + entry_names) 956 957 958 def check_for_acl_violation_acl_group(self): 959 """Verifies the current user has acces to this ACL group. 960 961 @raises AclAccessViolation if the current user doesn't have access to 962 this ACL group. 963 """ 964 user = User.current_user() 965 if user.is_superuser(): 966 return 967 if self.name == 'Everyone': 968 raise AclAccessViolation("You cannot modify 'Everyone'!") 969 if not user in self.users.all(): 970 raise AclAccessViolation("You do not have access to %s" 971 % self.name) 972 973 @staticmethod 974 def on_host_membership_change(): 975 """Invoked when host membership changes.""" 976 everyone = AclGroup.objects.get(name='Everyone') 977 978 # find hosts that aren't in any ACL group and add them to Everyone 979 # TODO(showard): this is a bit of a hack, since the fact that this query 980 # works is kind of a coincidence of Django internals. This trick 981 # doesn't work in general (on all foreign key relationships). I'll 982 # replace it with a better technique when the need arises. 983 orphaned_hosts = Host.valid_objects.filter(aclgroup__id__isnull=True) 984 everyone.hosts.add(*orphaned_hosts.distinct()) 985 986 # find hosts in both Everyone and another ACL group, and remove them 987 # from Everyone 988 hosts_in_everyone = Host.valid_objects.filter(aclgroup__name='Everyone') 989 acled_hosts = set() 990 for host in hosts_in_everyone: 991 # Has an ACL group other than Everyone 992 if host.aclgroup_set.count() > 1: 993 acled_hosts.add(host) 994 everyone.hosts.remove(*acled_hosts) 995 996 997 def delete(self): 998 if (self.name == 'Everyone'): 999 raise AclAccessViolation("You cannot delete 'Everyone'!") 1000 self.check_for_acl_violation_acl_group() 1001 super(AclGroup, self).delete() 1002 self.on_host_membership_change() 1003 1004 1005 def add_current_user_if_empty(self): 1006 """Adds the current user if the set of users is empty.""" 1007 if not self.users.count(): 1008 self.users.add(User.current_user()) 1009 1010 1011 def perform_after_save(self, change): 1012 """Called after a save. 1013 1014 @param change: Whether there was a change. 1015 """ 1016 if not change: 1017 self.users.add(User.current_user()) 1018 self.add_current_user_if_empty() 1019 self.on_host_membership_change() 1020 1021 1022 def save(self, *args, **kwargs): 1023 change = bool(self.id) 1024 if change: 1025 # Check the original object for an ACL violation 1026 AclGroup.objects.get(id=self.id).check_for_acl_violation_acl_group() 1027 super(AclGroup, self).save(*args, **kwargs) 1028 self.perform_after_save(change) 1029 1030 1031 class Meta: 1032 """Metadata for class AclGroup.""" 1033 db_table = 'afe_acl_groups' 1034 1035 def __unicode__(self): 1036 return unicode(self.name) 1037 1038 1039 class Kernel(dbmodels.Model): 1040 """ 1041 A kernel configuration for a parameterized job 1042 """ 1043 version = dbmodels.CharField(max_length=255) 1044 cmdline = dbmodels.CharField(max_length=255, blank=True) 1045 1046 @classmethod 1047 def create_kernels(cls, kernel_list): 1048 """Creates all kernels in the kernel list. 1049 1050 @param cls: Implicit class object. 1051 @param kernel_list: A list of dictionaries that describe the kernels, 1052 in the same format as the 'kernel' argument to 1053 rpc_interface.generate_control_file. 1054 @return A list of the created kernels. 1055 """ 1056 if not kernel_list: 1057 return None 1058 return [cls._create(kernel) for kernel in kernel_list] 1059 1060 1061 @classmethod 1062 def _create(cls, kernel_dict): 1063 version = kernel_dict.pop('version') 1064 cmdline = kernel_dict.pop('cmdline', '') 1065 1066 if kernel_dict: 1067 raise Exception('Extraneous kernel arguments remain: %r' 1068 % kernel_dict) 1069 1070 kernel, _ = cls.objects.get_or_create(version=version, 1071 cmdline=cmdline) 1072 return kernel 1073 1074 1075 class Meta: 1076 """Metadata for class Kernel.""" 1077 db_table = 'afe_kernels' 1078 unique_together = ('version', 'cmdline') 1079 1080 def __unicode__(self): 1081 return u'%s %s' % (self.version, self.cmdline) 1082 1083 1084 class ParameterizedJob(dbmodels.Model): 1085 """ 1086 Auxiliary configuration for a parameterized job. 1087 """ 1088 test = dbmodels.ForeignKey(Test) 1089 label = dbmodels.ForeignKey(Label, null=True) 1090 use_container = dbmodels.BooleanField(default=False) 1091 profile_only = dbmodels.BooleanField(default=False) 1092 upload_kernel_config = dbmodels.BooleanField(default=False) 1093 1094 kernels = dbmodels.ManyToManyField( 1095 Kernel, db_table='afe_parameterized_job_kernels') 1096 profilers = dbmodels.ManyToManyField( 1097 Profiler, through='ParameterizedJobProfiler') 1098 1099 1100 @classmethod 1101 def smart_get(cls, id_or_name, *args, **kwargs): 1102 """For compatibility with Job.add_object. 1103 1104 @param cls: Implicit class object. 1105 @param id_or_name: The ID or name to get. 1106 @param args: Non-keyword arguments. 1107 @param kwargs: Keyword arguments. 1108 """ 1109 return cls.objects.get(pk=id_or_name) 1110 1111 1112 def job(self): 1113 """Returns the job if it exists, or else None.""" 1114 jobs = self.job_set.all() 1115 assert jobs.count() <= 1 1116 return jobs and jobs[0] or None 1117 1118 1119 class Meta: 1120 """Metadata for class ParameterizedJob.""" 1121 db_table = 'afe_parameterized_jobs' 1122 1123 def __unicode__(self): 1124 return u'%s (parameterized) - %s' % (self.test.name, self.job()) 1125 1126 1127 class ParameterizedJobProfiler(dbmodels.Model): 1128 """ 1129 A profiler to run on a parameterized job 1130 """ 1131 parameterized_job = dbmodels.ForeignKey(ParameterizedJob) 1132 profiler = dbmodels.ForeignKey(Profiler) 1133 1134 class Meta: 1135 """Metedata for class ParameterizedJobProfiler.""" 1136 db_table = 'afe_parameterized_jobs_profilers' 1137 unique_together = ('parameterized_job', 'profiler') 1138 1139 1140 class ParameterizedJobProfilerParameter(dbmodels.Model): 1141 """ 1142 A parameter for a profiler in a parameterized job 1143 """ 1144 parameterized_job_profiler = dbmodels.ForeignKey(ParameterizedJobProfiler) 1145 parameter_name = dbmodels.CharField(max_length=255) 1146 parameter_value = dbmodels.TextField() 1147 parameter_type = dbmodels.CharField( 1148 max_length=8, choices=model_attributes.ParameterTypes.choices()) 1149 1150 class Meta: 1151 """Metadata for class ParameterizedJobProfilerParameter.""" 1152 db_table = 'afe_parameterized_job_profiler_parameters' 1153 unique_together = ('parameterized_job_profiler', 'parameter_name') 1154 1155 def __unicode__(self): 1156 return u'%s - %s' % (self.parameterized_job_profiler.profiler.name, 1157 self.parameter_name) 1158 1159 1160 class ParameterizedJobParameter(dbmodels.Model): 1161 """ 1162 Parameters for a parameterized job 1163 """ 1164 parameterized_job = dbmodels.ForeignKey(ParameterizedJob) 1165 test_parameter = dbmodels.ForeignKey(TestParameter) 1166 parameter_value = dbmodels.TextField() 1167 parameter_type = dbmodels.CharField( 1168 max_length=8, choices=model_attributes.ParameterTypes.choices()) 1169 1170 class Meta: 1171 """Metadata for class ParameterizedJobParameter.""" 1172 db_table = 'afe_parameterized_job_parameters' 1173 unique_together = ('parameterized_job', 'test_parameter') 1174 1175 def __unicode__(self): 1176 return u'%s - %s' % (self.parameterized_job.job().name, 1177 self.test_parameter.name) 1178 1179 1180 class JobManager(model_logic.ExtendedManager): 1181 'Custom manager to provide efficient status counts querying.' 1182 def get_status_counts(self, job_ids): 1183 """Returns a dict mapping the given job IDs to their status count dicts. 1184 1185 @param job_ids: A list of job IDs. 1186 """ 1187 if not job_ids: 1188 return {} 1189 id_list = '(%s)' % ','.join(str(job_id) for job_id in job_ids) 1190 cursor = connection.cursor() 1191 cursor.execute(""" 1192 SELECT job_id, status, aborted, complete, COUNT(*) 1193 FROM afe_host_queue_entries 1194 WHERE job_id IN %s 1195 GROUP BY job_id, status, aborted, complete 1196 """ % id_list) 1197 all_job_counts = dict((job_id, {}) for job_id in job_ids) 1198 for job_id, status, aborted, complete, count in cursor.fetchall(): 1199 job_dict = all_job_counts[job_id] 1200 full_status = HostQueueEntry.compute_full_status(status, aborted, 1201 complete) 1202 job_dict.setdefault(full_status, 0) 1203 job_dict[full_status] += count 1204 return all_job_counts 1205 1206 1207 class Job(dbmodels.Model, model_logic.ModelExtensions): 1208 """\ 1209 owner: username of job owner 1210 name: job name (does not have to be unique) 1211 priority: Integer priority value. Higher is more important. 1212 control_file: contents of control file 1213 control_type: Client or Server 1214 created_on: date of job creation 1215 submitted_on: date of job submission 1216 synch_count: how many hosts should be used per autoserv execution 1217 run_verify: Whether or not to run the verify phase 1218 run_reset: Whether or not to run the reset phase 1219 timeout: DEPRECATED - hours from queuing time until job times out 1220 timeout_mins: minutes from job queuing time until the job times out 1221 max_runtime_hrs: DEPRECATED - hours from job starting time until job 1222 times out 1223 max_runtime_mins: minutes from job starting time until job times out 1224 email_list: list of people to email on completion delimited by any of: 1225 white space, ',', ':', ';' 1226 dependency_labels: many-to-many relationship with labels corresponding to 1227 job dependencies 1228 reboot_before: Never, If dirty, or Always 1229 reboot_after: Never, If all tests passed, or Always 1230 parse_failed_repair: if True, a failed repair launched by this job will have 1231 its results parsed as part of the job. 1232 drone_set: The set of drones to run this job on 1233 parent_job: Parent job (optional) 1234 test_retry: Number of times to retry test if the test did not complete 1235 successfully. (optional, default: 0) 1236 require_ssp: Require server-side packaging unless require_ssp is set to 1237 False. (optional, default: None) 1238 """ 1239 1240 # TODO: Investigate, if jobkeyval_set is really needed. 1241 # dynamic_suite will write them into an attached file for the drone, but 1242 # it doesn't seem like they are actually used. If they aren't used, remove 1243 # jobkeyval_set here. 1244 SERIALIZATION_LINKS_TO_FOLLOW = set(['dependency_labels', 1245 'hostqueueentry_set', 1246 'jobkeyval_set', 1247 'shard']) 1248 1249 # SQL for selecting jobs that should be sent to shard. 1250 # We use raw sql as django filters were not optimized. 1251 # The following jobs are excluded by the SQL. 1252 # - Non-aborted jobs known to shard as specified in |known_ids|. 1253 # Note for jobs aborted on master, even if already known to shard, 1254 # will be sent to shard again so that shard can abort them. 1255 # - Completed jobs 1256 # - Active jobs 1257 # - Jobs without host_queue_entries 1258 NON_ABORTED_KNOWN_JOBS = '(t2.aborted = 0 AND t1.id IN (%(known_ids)s))' 1259 1260 SQL_SHARD_JOBS = ( 1261 'SELECT DISTINCT(t1.id) FROM afe_jobs t1 ' 1262 'INNER JOIN afe_host_queue_entries t2 ON ' 1263 ' (t1.id = t2.job_id AND t2.complete != 1 AND t2.active != 1 ' 1264 ' %(check_known_jobs)s) ' 1265 'LEFT OUTER JOIN afe_jobs_dependency_labels t3 ON (t1.id = t3.job_id) ' 1266 'WHERE (t3.label_id IN ' 1267 ' (SELECT label_id FROM afe_shards_labels ' 1268 ' WHERE shard_id = %(shard_id)s) ' 1269 ' OR t2.meta_host IN ' 1270 ' (SELECT label_id FROM afe_shards_labels ' 1271 ' WHERE shard_id = %(shard_id)s))' 1272 ) 1273 1274 # Jobs can be created with assigned hosts and have no dependency 1275 # labels nor meta_host. 1276 # We are looking for: 1277 # - a job whose hqe's meta_host is null 1278 # - a job whose hqe has a host 1279 # - one of the host's labels matches the shard's label. 1280 # Non-aborted known jobs, completed jobs, active jobs, jobs 1281 # without hqe are exluded as we do with SQL_SHARD_JOBS. 1282 SQL_SHARD_JOBS_WITH_HOSTS = ( 1283 'SELECT DISTINCT(t1.id) FROM afe_jobs t1 ' 1284 'INNER JOIN afe_host_queue_entries t2 ON ' 1285 ' (t1.id = t2.job_id AND t2.complete != 1 AND t2.active != 1 ' 1286 ' AND t2.meta_host IS NULL AND t2.host_id IS NOT NULL ' 1287 ' %(check_known_jobs)s) ' 1288 'LEFT OUTER JOIN afe_hosts_labels t3 ON (t2.host_id = t3.host_id) ' 1289 'WHERE (t3.label_id IN ' 1290 ' (SELECT label_id FROM afe_shards_labels ' 1291 ' WHERE shard_id = %(shard_id)s))' 1292 ) 1293 1294 # Even if we had filters about complete, active and aborted 1295 # bits in the above two SQLs, there is a chance that 1296 # the result may still contain a job with an hqe with 'complete=1' 1297 # or 'active=1' or 'aborted=0 and afe_job.id in known jobs.' 1298 # This happens when a job has two (or more) hqes and at least 1299 # one hqe has different bits than others. 1300 # We use a second sql to ensure we exclude all un-desired jobs. 1301 SQL_JOBS_TO_EXCLUDE =( 1302 'SELECT t1.id FROM afe_jobs t1 ' 1303 'INNER JOIN afe_host_queue_entries t2 ON ' 1304 ' (t1.id = t2.job_id) ' 1305 'WHERE (t1.id in (%(candidates)s) ' 1306 ' AND (t2.complete=1 OR t2.active=1 ' 1307 ' %(check_known_jobs)s))' 1308 ) 1309 1310 def _deserialize_relation(self, link, data): 1311 if link in ['hostqueueentry_set', 'jobkeyval_set']: 1312 for obj in data: 1313 obj['job_id'] = self.id 1314 1315 super(Job, self)._deserialize_relation(link, data) 1316 1317 1318 def custom_deserialize_relation(self, link, data): 1319 assert link == 'shard', 'Link %s should not be deserialized' % link 1320 self.shard = Shard.deserialize(data) 1321 1322 1323 def sanity_check_update_from_shard(self, shard, updated_serialized): 1324 # If the job got aborted on the master after the client fetched it 1325 # no shard_id will be set. The shard might still push updates though, 1326 # as the job might complete before the abort bit syncs to the shard. 1327 # Alternative considered: The master scheduler could be changed to not 1328 # set aborted jobs to completed that are sharded out. But that would 1329 # require database queries and seemed more complicated to implement. 1330 # This seems safe to do, as there won't be updates pushed from the wrong 1331 # shards should be powered off and wiped hen they are removed from the 1332 # master. 1333 if self.shard_id and self.shard_id != shard.id: 1334 raise error.UnallowedRecordsSentToMaster( 1335 'Job id=%s is assigned to shard (%s). Cannot update it with %s ' 1336 'from shard %s.' % (self.id, self.shard_id, updated_serialized, 1337 shard.id)) 1338 1339 1340 # TIMEOUT is deprecated. 1341 DEFAULT_TIMEOUT = global_config.global_config.get_config_value( 1342 'AUTOTEST_WEB', 'job_timeout_default', default=24) 1343 DEFAULT_TIMEOUT_MINS = global_config.global_config.get_config_value( 1344 'AUTOTEST_WEB', 'job_timeout_mins_default', default=24*60) 1345 # MAX_RUNTIME_HRS is deprecated. Will be removed after switch to mins is 1346 # completed. 1347 DEFAULT_MAX_RUNTIME_HRS = global_config.global_config.get_config_value( 1348 'AUTOTEST_WEB', 'job_max_runtime_hrs_default', default=72) 1349 DEFAULT_MAX_RUNTIME_MINS = global_config.global_config.get_config_value( 1350 'AUTOTEST_WEB', 'job_max_runtime_mins_default', default=72*60) 1351 DEFAULT_PARSE_FAILED_REPAIR = global_config.global_config.get_config_value( 1352 'AUTOTEST_WEB', 'parse_failed_repair_default', type=bool, 1353 default=False) 1354 1355 owner = dbmodels.CharField(max_length=255) 1356 name = dbmodels.CharField(max_length=255) 1357 priority = dbmodels.SmallIntegerField(default=priorities.Priority.DEFAULT) 1358 control_file = dbmodels.TextField(null=True, blank=True) 1359 control_type = dbmodels.SmallIntegerField( 1360 choices=control_data.CONTROL_TYPE.choices(), 1361 blank=True, # to allow 0 1362 default=control_data.CONTROL_TYPE.CLIENT) 1363 created_on = dbmodels.DateTimeField() 1364 synch_count = dbmodels.IntegerField(blank=True, default=0) 1365 timeout = dbmodels.IntegerField(default=DEFAULT_TIMEOUT) 1366 run_verify = dbmodels.BooleanField(default=False) 1367 email_list = dbmodels.CharField(max_length=250, blank=True) 1368 dependency_labels = ( 1369 dbmodels.ManyToManyField(Label, blank=True, 1370 db_table='afe_jobs_dependency_labels')) 1371 reboot_before = dbmodels.SmallIntegerField( 1372 choices=model_attributes.RebootBefore.choices(), blank=True, 1373 default=DEFAULT_REBOOT_BEFORE) 1374 reboot_after = dbmodels.SmallIntegerField( 1375 choices=model_attributes.RebootAfter.choices(), blank=True, 1376 default=DEFAULT_REBOOT_AFTER) 1377 parse_failed_repair = dbmodels.BooleanField( 1378 default=DEFAULT_PARSE_FAILED_REPAIR) 1379 # max_runtime_hrs is deprecated. Will be removed after switch to mins is 1380 # completed. 1381 max_runtime_hrs = dbmodels.IntegerField(default=DEFAULT_MAX_RUNTIME_HRS) 1382 max_runtime_mins = dbmodels.IntegerField(default=DEFAULT_MAX_RUNTIME_MINS) 1383 drone_set = dbmodels.ForeignKey(DroneSet, null=True, blank=True) 1384 1385 parameterized_job = dbmodels.ForeignKey(ParameterizedJob, null=True, 1386 blank=True) 1387 1388 parent_job = dbmodels.ForeignKey('self', blank=True, null=True) 1389 1390 test_retry = dbmodels.IntegerField(blank=True, default=0) 1391 1392 run_reset = dbmodels.BooleanField(default=True) 1393 1394 timeout_mins = dbmodels.IntegerField(default=DEFAULT_TIMEOUT_MINS) 1395 1396 # If this is None on the master, a slave should be found. 1397 # If this is None on a slave, it should be synced back to the master 1398 shard = dbmodels.ForeignKey(Shard, blank=True, null=True) 1399 1400 # If this is None, server-side packaging will be used for server side test, 1401 # unless it's disabled in global config AUTOSERV/enable_ssp_container. 1402 require_ssp = dbmodels.NullBooleanField(default=None, blank=True, null=True) 1403 1404 # custom manager 1405 objects = JobManager() 1406 1407 1408 @decorators.cached_property 1409 def labels(self): 1410 """All the labels of this job""" 1411 # We need to convert dependency_labels to a list, because all() gives us 1412 # back an iterator, and storing/caching an iterator means we'd only be 1413 # able to read from it once. 1414 return list(self.dependency_labels.all()) 1415 1416 1417 def is_server_job(self): 1418 """Returns whether this job is of type server.""" 1419 return self.control_type == control_data.CONTROL_TYPE.SERVER 1420 1421 1422 @classmethod 1423 def parameterized_jobs_enabled(cls): 1424 """Returns whether parameterized jobs are enabled. 1425 1426 @param cls: Implicit class object. 1427 """ 1428 return global_config.global_config.get_config_value( 1429 'AUTOTEST_WEB', 'parameterized_jobs', type=bool) 1430 1431 1432 @classmethod 1433 def check_parameterized_job(cls, control_file, parameterized_job): 1434 """Checks that the job is valid given the global config settings. 1435 1436 First, either control_file must be set, or parameterized_job must be 1437 set, but not both. Second, parameterized_job must be set if and only if 1438 the parameterized_jobs option in the global config is set to True. 1439 1440 @param cls: Implict class object. 1441 @param control_file: A control file. 1442 @param parameterized_job: A parameterized job. 1443 """ 1444 if not (bool(control_file) ^ bool(parameterized_job)): 1445 raise Exception('Job must have either control file or ' 1446 'parameterization, but not both') 1447 1448 parameterized_jobs_enabled = cls.parameterized_jobs_enabled() 1449 if control_file and parameterized_jobs_enabled: 1450 raise Exception('Control file specified, but parameterized jobs ' 1451 'are enabled') 1452 if parameterized_job and not parameterized_jobs_enabled: 1453 raise Exception('Parameterized job specified, but parameterized ' 1454 'jobs are not enabled') 1455 1456 1457 @classmethod 1458 def create(cls, owner, options, hosts): 1459 """Creates a job. 1460 1461 The job is created by taking some information (the listed args) and 1462 filling in the rest of the necessary information. 1463 1464 @param cls: Implicit class object. 1465 @param owner: The owner for the job. 1466 @param options: An options object. 1467 @param hosts: The hosts to use. 1468 """ 1469 AclGroup.check_for_acl_violation_hosts(hosts) 1470 1471 control_file = options.get('control_file') 1472 parameterized_job = options.get('parameterized_job') 1473 1474 # The current implementation of parameterized jobs requires that only 1475 # control files or parameterized jobs are used. Using the image 1476 # parameter on autoupdate_ParameterizedJob doesn't mix pure 1477 # parameterized jobs and control files jobs, it does muck enough with 1478 # normal jobs by adding a parameterized id to them that this check will 1479 # fail. So for now we just skip this check. 1480 # cls.check_parameterized_job(control_file=control_file, 1481 # parameterized_job=parameterized_job) 1482 user = User.current_user() 1483 if options.get('reboot_before') is None: 1484 options['reboot_before'] = user.get_reboot_before_display() 1485 if options.get('reboot_after') is None: 1486 options['reboot_after'] = user.get_reboot_after_display() 1487 1488 drone_set = DroneSet.resolve_name(options.get('drone_set')) 1489 1490 if options.get('timeout_mins') is None and options.get('timeout'): 1491 options['timeout_mins'] = options['timeout'] * 60 1492 1493 job = cls.add_object( 1494 owner=owner, 1495 name=options['name'], 1496 priority=options['priority'], 1497 control_file=control_file, 1498 control_type=options['control_type'], 1499 synch_count=options.get('synch_count'), 1500 # timeout needs to be deleted in the future. 1501 timeout=options.get('timeout'), 1502 timeout_mins=options.get('timeout_mins'), 1503 max_runtime_mins=options.get('max_runtime_mins'), 1504 run_verify=options.get('run_verify'), 1505 email_list=options.get('email_list'), 1506 reboot_before=options.get('reboot_before'), 1507 reboot_after=options.get('reboot_after'), 1508 parse_failed_repair=options.get('parse_failed_repair'), 1509 created_on=datetime.now(), 1510 drone_set=drone_set, 1511 parameterized_job=parameterized_job, 1512 parent_job=options.get('parent_job_id'), 1513 test_retry=options.get('test_retry'), 1514 run_reset=options.get('run_reset'), 1515 require_ssp=options.get('require_ssp')) 1516 1517 job.dependency_labels = options['dependencies'] 1518 1519 if options.get('keyvals'): 1520 for key, value in options['keyvals'].iteritems(): 1521 JobKeyval.objects.create(job=job, key=key, value=value) 1522 1523 return job 1524 1525 1526 @classmethod 1527 def assign_to_shard(cls, shard, known_ids): 1528 """Assigns unassigned jobs to a shard. 1529 1530 For all labels that have been assigned to this shard, all jobs that 1531 have this label, are assigned to this shard. 1532 1533 Jobs that are assigned to the shard but aren't already present on the 1534 shard are returned. 1535 1536 @param shard: The shard to assign jobs to. 1537 @param known_ids: List of all ids of incomplete jobs, the shard already 1538 knows about. 1539 This is used to figure out which jobs should be sent 1540 to the shard. If shard_ids were used instead, jobs 1541 would only be transferred once, even if the client 1542 failed persisting them. 1543 The number of unfinished jobs usually lies in O(1000). 1544 Assuming one id takes 8 chars in the json, this means 1545 overhead that lies in the lower kilobyte range. 1546 A not in query with 5000 id's takes about 30ms. 1547 1548 @returns The job objects that should be sent to the shard. 1549 """ 1550 # Disclaimer: Concurrent heartbeats should not occur in today's setup. 1551 # If this changes or they are triggered manually, this applies: 1552 # Jobs may be returned more than once by concurrent calls of this 1553 # function, as there is a race condition between SELECT and UPDATE. 1554 job_ids = set([]) 1555 check_known_jobs_exclude = '' 1556 check_known_jobs_include = '' 1557 1558 if known_ids: 1559 check_known_jobs = ( 1560 cls.NON_ABORTED_KNOWN_JOBS % 1561 {'known_ids': ','.join([str(i) for i in known_ids])}) 1562 check_known_jobs_exclude = 'AND NOT ' + check_known_jobs 1563 check_known_jobs_include = 'OR ' + check_known_jobs 1564 1565 for sql in [cls.SQL_SHARD_JOBS, cls.SQL_SHARD_JOBS_WITH_HOSTS]: 1566 query = Job.objects.raw(sql % { 1567 'check_known_jobs': check_known_jobs_exclude, 1568 'shard_id': shard.id}) 1569 job_ids |= set([j.id for j in query]) 1570 1571 if job_ids: 1572 query = Job.objects.raw( 1573 cls.SQL_JOBS_TO_EXCLUDE % 1574 {'check_known_jobs': check_known_jobs_include, 1575 'candidates': ','.join([str(i) for i in job_ids])}) 1576 job_ids -= set([j.id for j in query]) 1577 1578 if job_ids: 1579 Job.objects.filter(pk__in=job_ids).update(shard=shard) 1580 return list(Job.objects.filter(pk__in=job_ids).all()) 1581 return [] 1582 1583 1584 def save(self, *args, **kwargs): 1585 # The current implementation of parameterized jobs requires that only 1586 # control files or parameterized jobs are used. Using the image 1587 # parameter on autoupdate_ParameterizedJob doesn't mix pure 1588 # parameterized jobs and control files jobs, it does muck enough with 1589 # normal jobs by adding a parameterized id to them that this check will 1590 # fail. So for now we just skip this check. 1591 # cls.check_parameterized_job(control_file=self.control_file, 1592 # parameterized_job=self.parameterized_job) 1593 super(Job, self).save(*args, **kwargs) 1594 1595 1596 def queue(self, hosts, atomic_group=None, is_template=False): 1597 """Enqueue a job on the given hosts. 1598 1599 @param hosts: The hosts to use. 1600 @param atomic_group: The associated atomic group. 1601 @param is_template: Whether the status should be "Template". 1602 """ 1603 if not hosts: 1604 if atomic_group: 1605 # No hosts or labels are required to queue an atomic group 1606 # Job. However, if they are given, we respect them below. 1607 atomic_group.enqueue_job(self, is_template=is_template) 1608 else: 1609 # hostless job 1610 entry = HostQueueEntry.create(job=self, is_template=is_template) 1611 entry.save() 1612 return 1613 1614 for host in hosts: 1615 host.enqueue_job(self, atomic_group=atomic_group, 1616 is_template=is_template) 1617 1618 1619 def create_recurring_job(self, start_date, loop_period, loop_count, owner): 1620 """Creates a recurring job. 1621 1622 @param start_date: The starting date of the job. 1623 @param loop_period: How often to re-run the job, in seconds. 1624 @param loop_count: The re-run count. 1625 @param owner: The owner of the job. 1626 """ 1627 rec = RecurringRun(job=self, start_date=start_date, 1628 loop_period=loop_period, 1629 loop_count=loop_count, 1630 owner=User.objects.get(login=owner)) 1631 rec.save() 1632 return rec.id 1633 1634 1635 def user(self): 1636 """Gets the user of this job, or None if it doesn't exist.""" 1637 try: 1638 return User.objects.get(login=self.owner) 1639 except self.DoesNotExist: 1640 return None 1641 1642 1643 def abort(self): 1644 """Aborts this job.""" 1645 for queue_entry in self.hostqueueentry_set.all(): 1646 queue_entry.abort() 1647 1648 1649 def tag(self): 1650 """Returns a string tag for this job.""" 1651 return server_utils.get_job_tag(self.id, self.owner) 1652 1653 1654 def keyval_dict(self): 1655 """Returns all keyvals for this job as a dictionary.""" 1656 return dict((keyval.key, keyval.value) 1657 for keyval in self.jobkeyval_set.all()) 1658 1659 1660 @classmethod 1661 def get_attribute_model(cls): 1662 """Return the attribute model. 1663 1664 Override method in parent class. This class is called when 1665 deserializing the one-to-many relationship betwen Job and JobKeyval. 1666 On deserialization, we will try to clear any existing job keyvals 1667 associated with a job to avoid any inconsistency. 1668 Though Job doesn't implement ModelWithAttribute, we still treat 1669 it as an attribute model for this purpose. 1670 1671 @returns: The attribute model of Job. 1672 """ 1673 return JobKeyval 1674 1675 1676 class Meta: 1677 """Metadata for class Job.""" 1678 db_table = 'afe_jobs' 1679 1680 def __unicode__(self): 1681 return u'%s (%s-%s)' % (self.name, self.id, self.owner) 1682 1683 1684 class JobKeyval(dbmodels.Model, model_logic.ModelExtensions): 1685 """Keyvals associated with jobs""" 1686 1687 SERIALIZATION_LINKS_TO_KEEP = set(['job']) 1688 SERIALIZATION_LOCAL_LINKS_TO_UPDATE = set(['value']) 1689 1690 job = dbmodels.ForeignKey(Job) 1691 key = dbmodels.CharField(max_length=90) 1692 value = dbmodels.CharField(max_length=300) 1693 1694 objects = model_logic.ExtendedManager() 1695 1696 1697 @classmethod 1698 def get_record(cls, data): 1699 """Check the database for an identical record. 1700 1701 Use job_id and key to search for a existing record. 1702 1703 @raises: DoesNotExist, if no record found 1704 @raises: MultipleObjectsReturned if multiple records found. 1705 """ 1706 # TODO(fdeng): We should use job_id and key together as 1707 # a primary key in the db. 1708 return cls.objects.get(job_id=data['job_id'], key=data['key']) 1709 1710 1711 @classmethod 1712 def deserialize(cls, data): 1713 """Override deserialize in parent class. 1714 1715 Do not deserialize id as id is not kept consistent on master and shards. 1716 1717 @param data: A dictionary of data to deserialize. 1718 1719 @returns: A JobKeyval object. 1720 """ 1721 if data: 1722 data.pop('id') 1723 return super(JobKeyval, cls).deserialize(data) 1724 1725 1726 class Meta: 1727 """Metadata for class JobKeyval.""" 1728 db_table = 'afe_job_keyvals' 1729 1730 1731 class IneligibleHostQueue(dbmodels.Model, model_logic.ModelExtensions): 1732 """Represents an ineligible host queue.""" 1733 job = dbmodels.ForeignKey(Job) 1734 host = dbmodels.ForeignKey(Host) 1735 1736 objects = model_logic.ExtendedManager() 1737 1738 class Meta: 1739 """Metadata for class IneligibleHostQueue.""" 1740 db_table = 'afe_ineligible_host_queues' 1741 1742 1743 class HostQueueEntry(dbmodels.Model, model_logic.ModelExtensions): 1744 """Represents a host queue entry.""" 1745 1746 SERIALIZATION_LINKS_TO_FOLLOW = set(['meta_host']) 1747 SERIALIZATION_LINKS_TO_KEEP = set(['host']) 1748 SERIALIZATION_LOCAL_LINKS_TO_UPDATE = set(['aborted']) 1749 1750 1751 def custom_deserialize_relation(self, link, data): 1752 assert link == 'meta_host' 1753 self.meta_host = Label.deserialize(data) 1754 1755 1756 def sanity_check_update_from_shard(self, shard, updated_serialized, 1757 job_ids_sent): 1758 if self.job_id not in job_ids_sent: 1759 raise error.UnallowedRecordsSentToMaster( 1760 'Sent HostQueueEntry without corresponding ' 1761 'job entry: %s' % updated_serialized) 1762 1763 1764 Status = host_queue_entry_states.Status 1765 ACTIVE_STATUSES = host_queue_entry_states.ACTIVE_STATUSES 1766 COMPLETE_STATUSES = host_queue_entry_states.COMPLETE_STATUSES 1767 1768 job = dbmodels.ForeignKey(Job) 1769 host = dbmodels.ForeignKey(Host, blank=True, null=True) 1770 status = dbmodels.CharField(max_length=255) 1771 meta_host = dbmodels.ForeignKey(Label, blank=True, null=True, 1772 db_column='meta_host') 1773 active = dbmodels.BooleanField(default=False) 1774 complete = dbmodels.BooleanField(default=False) 1775 deleted = dbmodels.BooleanField(default=False) 1776 execution_subdir = dbmodels.CharField(max_length=255, blank=True, 1777 default='') 1778 # If atomic_group is set, this is a virtual HostQueueEntry that will 1779 # be expanded into many actual hosts within the group at schedule time. 1780 atomic_group = dbmodels.ForeignKey(AtomicGroup, blank=True, null=True) 1781 aborted = dbmodels.BooleanField(default=False) 1782 started_on = dbmodels.DateTimeField(null=True, blank=True) 1783 finished_on = dbmodels.DateTimeField(null=True, blank=True) 1784 1785 objects = model_logic.ExtendedManager() 1786 1787 1788 def __init__(self, *args, **kwargs): 1789 super(HostQueueEntry, self).__init__(*args, **kwargs) 1790 self._record_attributes(['status']) 1791 1792 1793 @classmethod 1794 def create(cls, job, host=None, meta_host=None, atomic_group=None, 1795 is_template=False): 1796 """Creates a new host queue entry. 1797 1798 @param cls: Implicit class object. 1799 @param job: The associated job. 1800 @param host: The associated host. 1801 @param meta_host: The associated meta host. 1802 @param atomic_group: The associated atomic group. 1803 @param is_template: Whether the status should be "Template". 1804 """ 1805 if is_template: 1806 status = cls.Status.TEMPLATE 1807 else: 1808 status = cls.Status.QUEUED 1809 1810 return cls(job=job, host=host, meta_host=meta_host, 1811 atomic_group=atomic_group, status=status) 1812 1813 1814 def save(self, *args, **kwargs): 1815 self._set_active_and_complete() 1816 super(HostQueueEntry, self).save(*args, **kwargs) 1817 self._check_for_updated_attributes() 1818 1819 1820 def execution_path(self): 1821 """ 1822 Path to this entry's results (relative to the base results directory). 1823 """ 1824 return server_utils.get_hqe_exec_path(self.job.tag(), 1825 self.execution_subdir) 1826 1827 1828 def host_or_metahost_name(self): 1829 """Returns the first non-None name found in priority order. 1830 1831 The priority order checked is: (1) host name; (2) meta host name; and 1832 (3) atomic group name. 1833 """ 1834 if self.host: 1835 return self.host.hostname 1836 elif self.meta_host: 1837 return self.meta_host.name 1838 else: 1839 assert self.atomic_group, "no host, meta_host or atomic group!" 1840 return self.atomic_group.name 1841 1842 1843 def _set_active_and_complete(self): 1844 if self.status in self.ACTIVE_STATUSES: 1845 self.active, self.complete = True, False 1846 elif self.status in self.COMPLETE_STATUSES: 1847 self.active, self.complete = False, True 1848 else: 1849 self.active, self.complete = False, False 1850 1851 1852 def on_attribute_changed(self, attribute, old_value): 1853 assert attribute == 'status' 1854 logging.info('%s/%d (%d) -> %s', self.host, self.job.id, self.id, 1855 self.status) 1856 1857 1858 def is_meta_host_entry(self): 1859 'True if this is a entry has a meta_host instead of a host.' 1860 return self.host is None and self.meta_host is not None 1861 1862 1863 # This code is shared between rpc_interface and models.HostQueueEntry. 1864 # Sadly due to circular imports between the 2 (crbug.com/230100) making it 1865 # a class method was the best way to refactor it. Attempting to put it in 1866 # rpc_utils or a new utils module failed as that would require us to import 1867 # models.py but to call it from here we would have to import the utils.py 1868 # thus creating a cycle. 1869 @classmethod 1870 def abort_host_queue_entries(cls, host_queue_entries): 1871 """Aborts a collection of host_queue_entries. 1872 1873 Abort these host queue entry and all host queue entries of jobs created 1874 by them. 1875 1876 @param host_queue_entries: List of host queue entries we want to abort. 1877 """ 1878 # This isn't completely immune to race conditions since it's not atomic, 1879 # but it should be safe given the scheduler's behavior. 1880 1881 # TODO(milleral): crbug.com/230100 1882 # The |abort_host_queue_entries| rpc does nearly exactly this, 1883 # however, trying to re-use the code generates some horrible 1884 # circular import error. I'd be nice to refactor things around 1885 # sometime so the code could be reused. 1886 1887 # Fixpoint algorithm to find the whole tree of HQEs to abort to 1888 # minimize the total number of database queries: 1889 children = set() 1890 new_children = set(host_queue_entries) 1891 while new_children: 1892 children.update(new_children) 1893 new_child_ids = [hqe.job_id for hqe in new_children] 1894 new_children = HostQueueEntry.objects.filter( 1895 job__parent_job__in=new_child_ids, 1896 complete=False, aborted=False).all() 1897 # To handle circular parental relationships 1898 new_children = set(new_children) - children 1899 1900 # Associate a user with the host queue entries that we're about 1901 # to abort so that we can look up who to blame for the aborts. 1902 now = datetime.now() 1903 user = User.current_user() 1904 aborted_hqes = [AbortedHostQueueEntry(queue_entry=hqe, 1905 aborted_by=user, aborted_on=now) for hqe in children] 1906 AbortedHostQueueEntry.objects.bulk_create(aborted_hqes) 1907 # Bulk update all of the HQEs to set the abort bit. 1908 child_ids = [hqe.id for hqe in children] 1909 HostQueueEntry.objects.filter(id__in=child_ids).update(aborted=True) 1910 1911 1912 def abort(self): 1913 """ Aborts this host queue entry. 1914 1915 Abort this host queue entry and all host queue entries of jobs created by 1916 this one. 1917 1918 """ 1919 if not self.complete and not self.aborted: 1920 HostQueueEntry.abort_host_queue_entries([self]) 1921 1922 1923 @classmethod 1924 def compute_full_status(cls, status, aborted, complete): 1925 """Returns a modified status msg if the host queue entry was aborted. 1926 1927 @param cls: Implicit class object. 1928 @param status: The original status message. 1929 @param aborted: Whether the host queue entry was aborted. 1930 @param complete: Whether the host queue entry was completed. 1931 """ 1932 if aborted and not complete: 1933 return 'Aborted (%s)' % status 1934 return status 1935 1936 1937 def full_status(self): 1938 """Returns the full status of this host queue entry, as a string.""" 1939 return self.compute_full_status(self.status, self.aborted, 1940 self.complete) 1941 1942 1943 def _postprocess_object_dict(self, object_dict): 1944 object_dict['full_status'] = self.full_status() 1945 1946 1947 class Meta: 1948 """Metadata for class HostQueueEntry.""" 1949 db_table = 'afe_host_queue_entries' 1950 1951 1952 1953 def __unicode__(self): 1954 hostname = None 1955 if self.host: 1956 hostname = self.host.hostname 1957 return u"%s/%d (%d)" % (hostname, self.job.id, self.id) 1958 1959 1960 class AbortedHostQueueEntry(dbmodels.Model, model_logic.ModelExtensions): 1961 """Represents an aborted host queue entry.""" 1962 queue_entry = dbmodels.OneToOneField(HostQueueEntry, primary_key=True) 1963 aborted_by = dbmodels.ForeignKey(User) 1964 aborted_on = dbmodels.DateTimeField() 1965 1966 objects = model_logic.ExtendedManager() 1967 1968 1969 def save(self, *args, **kwargs): 1970 self.aborted_on = datetime.now() 1971 super(AbortedHostQueueEntry, self).save(*args, **kwargs) 1972 1973 class Meta: 1974 """Metadata for class AbortedHostQueueEntry.""" 1975 db_table = 'afe_aborted_host_queue_entries' 1976 1977 1978 class RecurringRun(dbmodels.Model, model_logic.ModelExtensions): 1979 """\ 1980 job: job to use as a template 1981 owner: owner of the instantiated template 1982 start_date: Run the job at scheduled date 1983 loop_period: Re-run (loop) the job periodically 1984 (in every loop_period seconds) 1985 loop_count: Re-run (loop) count 1986 """ 1987 1988 job = dbmodels.ForeignKey(Job) 1989 owner = dbmodels.ForeignKey(User) 1990 start_date = dbmodels.DateTimeField() 1991 loop_period = dbmodels.IntegerField(blank=True) 1992 loop_count = dbmodels.IntegerField(blank=True) 1993 1994 objects = model_logic.ExtendedManager() 1995 1996 class Meta: 1997 """Metadata for class RecurringRun.""" 1998 db_table = 'afe_recurring_run' 1999 2000 def __unicode__(self): 2001 return u'RecurringRun(job %s, start %s, period %s, count %s)' % ( 2002 self.job.id, self.start_date, self.loop_period, self.loop_count) 2003 2004 2005 class SpecialTask(dbmodels.Model, model_logic.ModelExtensions): 2006 """\ 2007 Tasks to run on hosts at the next time they are in the Ready state. Use this 2008 for high-priority tasks, such as forced repair or forced reinstall. 2009 2010 host: host to run this task on 2011 task: special task to run 2012 time_requested: date and time the request for this task was made 2013 is_active: task is currently running 2014 is_complete: task has finished running 2015 is_aborted: task was aborted 2016 time_started: date and time the task started 2017 time_finished: date and time the task finished 2018 queue_entry: Host queue entry waiting on this task (or None, if task was not 2019 started in preparation of a job) 2020 """ 2021 Task = enum.Enum('Verify', 'Cleanup', 'Repair', 'Reset', 'Provision', 2022 string_values=True) 2023 2024 host = dbmodels.ForeignKey(Host, blank=False, null=False) 2025 task = dbmodels.CharField(max_length=64, choices=Task.choices(), 2026 blank=False, null=False) 2027 requested_by = dbmodels.ForeignKey(User) 2028 time_requested = dbmodels.DateTimeField(auto_now_add=True, blank=False, 2029 null=False) 2030 is_active = dbmodels.BooleanField(default=False, blank=False, null=False) 2031 is_complete = dbmodels.BooleanField(default=False, blank=False, null=False) 2032 is_aborted = dbmodels.BooleanField(default=False, blank=False, null=False) 2033 time_started = dbmodels.DateTimeField(null=True, blank=True) 2034 queue_entry = dbmodels.ForeignKey(HostQueueEntry, blank=True, null=True) 2035 success = dbmodels.BooleanField(default=False, blank=False, null=False) 2036 time_finished = dbmodels.DateTimeField(null=True, blank=True) 2037 2038 objects = model_logic.ExtendedManager() 2039 2040 2041 def save(self, **kwargs): 2042 if self.queue_entry: 2043 self.requested_by = User.objects.get( 2044 login=self.queue_entry.job.owner) 2045 super(SpecialTask, self).save(**kwargs) 2046 2047 2048 def execution_path(self): 2049 """Returns the execution path for a special task.""" 2050 return server_utils.get_special_task_exec_path( 2051 self.host.hostname, self.id, self.task, self.time_requested) 2052 2053 2054 # property to emulate HostQueueEntry.status 2055 @property 2056 def status(self): 2057 """Returns a host queue entry status appropriate for a speical task.""" 2058 return server_utils.get_special_task_status( 2059 self.is_complete, self.success, self.is_active) 2060 2061 2062 # property to emulate HostQueueEntry.started_on 2063 @property 2064 def started_on(self): 2065 """Returns the time at which this special task started.""" 2066 return self.time_started 2067 2068 2069 @classmethod 2070 def schedule_special_task(cls, host, task): 2071 """Schedules a special task on a host if not already scheduled. 2072 2073 @param cls: Implicit class object. 2074 @param host: The host to use. 2075 @param task: The task to schedule. 2076 """ 2077 existing_tasks = SpecialTask.objects.filter(host__id=host.id, task=task, 2078 is_active=False, 2079 is_complete=False) 2080 if existing_tasks: 2081 return existing_tasks[0] 2082 2083 special_task = SpecialTask(host=host, task=task, 2084 requested_by=User.current_user()) 2085 special_task.save() 2086 return special_task 2087 2088 2089 def abort(self): 2090 """ Abort this special task.""" 2091 self.is_aborted = True 2092 self.save() 2093 2094 2095 def activate(self): 2096 """ 2097 Sets a task as active and sets the time started to the current time. 2098 """ 2099 logging.info('Starting: %s', self) 2100 self.is_active = True 2101 self.time_started = datetime.now() 2102 self.save() 2103 2104 2105 def finish(self, success): 2106 """Sets a task as completed. 2107 2108 @param success: Whether or not the task was successful. 2109 """ 2110 logging.info('Finished: %s', self) 2111 self.is_active = False 2112 self.is_complete = True 2113 self.success = success 2114 if self.time_started: 2115 self.time_finished = datetime.now() 2116 self.save() 2117 2118 2119 class Meta: 2120 """Metadata for class SpecialTask.""" 2121 db_table = 'afe_special_tasks' 2122 2123 2124 def __unicode__(self): 2125 result = u'Special Task %s (host %s, task %s, time %s)' % ( 2126 self.id, self.host, self.task, self.time_requested) 2127 if self.is_complete: 2128 result += u' (completed)' 2129 elif self.is_active: 2130 result += u' (active)' 2131 2132 return result 2133 2134 2135 class StableVersion(dbmodels.Model, model_logic.ModelExtensions): 2136 2137 board = dbmodels.CharField(max_length=255, unique=True) 2138 version = dbmodels.CharField(max_length=255) 2139 2140 class Meta: 2141 """Metadata for class StableVersion.""" 2142 db_table = 'afe_stable_versions' 2143