      1 # pylint: disable-msg=C0111
      2 
      3 import logging
      4 from datetime import datetime
      5 import django.core
      6 try:
      7     from django.db import models as dbmodels, connection
      8 except django.core.exceptions.ImproperlyConfigured:
      9     raise ImportError('Django database not yet configured. Import either '
     10                        'setup_django_environment or '
     11                        'setup_django_lite_environment from '
     12                        'autotest_lib.frontend before any imports that '
     13                        'depend on django models.')
     14 from xml.sax import saxutils
     15 import common
     16 from autotest_lib.frontend.afe import model_logic, model_attributes
     17 from autotest_lib.frontend.afe import rdb_model_extensions
     18 from autotest_lib.frontend import settings, thread_local
     19 from autotest_lib.client.common_lib import enum, error, host_protections
     20 from autotest_lib.client.common_lib import global_config
     21 from autotest_lib.client.common_lib import host_queue_entry_states
     22 from autotest_lib.client.common_lib import control_data, priorities, decorators
     23 from autotest_lib.client.common_lib import site_utils
     24 from autotest_lib.client.common_lib.cros.graphite import autotest_es
     25 from autotest_lib.server import utils as server_utils
     26 
     27 # job options and user preferences
     28 DEFAULT_REBOOT_BEFORE = model_attributes.RebootBefore.IF_DIRTY
      29 DEFAULT_REBOOT_AFTER = model_attributes.RebootAfter.NEVER
     30 
     31 
     32 class AclAccessViolation(Exception):
     33     """\
      34     Raised when an operation is attempted without proper permissions as
     35     dictated by ACLs.
     36     """
     37 
     38 
     39 class AtomicGroup(model_logic.ModelWithInvalid, dbmodels.Model):
     40     """\
     41     An atomic group defines a collection of hosts which must only be scheduled
     42     all at once.  Any host with a label having an atomic group will only be
     43     scheduled for a job at the same time as other hosts sharing that label.
     44 
     45     Required:
     46       name: A name for this atomic group, e.g. 'rack23' or 'funky_net'.
     47       max_number_of_machines: The maximum number of machines that will be
     48               scheduled at once when scheduling jobs to this atomic group.
     49               The job.synch_count is considered the minimum.
     50 
     51     Optional:
     52       description: Arbitrary text description of this group's purpose.
     53     """
     54     name = dbmodels.CharField(max_length=255, unique=True)
     55     description = dbmodels.TextField(blank=True)
     56     # This magic value is the default to simplify the scheduler logic.
      57     # It must be "large".  The common use of atomic groups is to use all
      58     # machines in the group; limits on which subset is used are often
      59     # chosen via dependency labels.
     60     # TODO(dennisjeffrey): Revisit this so we don't have to assume that
     61     # "infinity" is around 3.3 million.
     62     INFINITE_MACHINES = 333333333
     63     max_number_of_machines = dbmodels.IntegerField(default=INFINITE_MACHINES)
     64     invalid = dbmodels.BooleanField(default=False,
      65                                     editable=settings.FULL_ADMIN)
     66 
     67     name_field = 'name'
     68     objects = model_logic.ModelWithInvalidManager()
     69     valid_objects = model_logic.ValidObjectsManager()
     70 
     71 
     72     def enqueue_job(self, job, is_template=False):
     73         """Enqueue a job on an associated atomic group of hosts.
     74 
     75         @param job: A job to enqueue.
     76         @param is_template: Whether the status should be "Template".
     77         """
     78         queue_entry = HostQueueEntry.create(atomic_group=self, job=job,
     79                                             is_template=is_template)
     80         queue_entry.save()
     81 
     82 
     83     def clean_object(self):
     84         self.label_set.clear()
     85 
     86 
     87     class Meta:
     88         """Metadata for class AtomicGroup."""
     89         db_table = 'afe_atomic_groups'
     90 
     91 
     92     def __unicode__(self):
     93         return unicode(self.name)
     94 
     95 
     96 class Label(model_logic.ModelWithInvalid, dbmodels.Model):
     97     """\
     98     Required:
     99       name: label name
    100 
    101     Optional:
    102       kernel_config: URL/path to kernel config for jobs run on this label.
    103       platform: If True, this is a platform label (defaults to False).
    104       only_if_needed: If True, a Host with this label can only be used if that
    105               label is requested by the job/test (either as the meta_host or
    106               in the job_dependencies).
    107       atomic_group: The atomic group associated with this label.
    108     """
    109     name = dbmodels.CharField(max_length=255, unique=True)
    110     kernel_config = dbmodels.CharField(max_length=255, blank=True)
    111     platform = dbmodels.BooleanField(default=False)
    112     invalid = dbmodels.BooleanField(default=False,
    113                                     editable=settings.FULL_ADMIN)
    114     only_if_needed = dbmodels.BooleanField(default=False)
    115 
    116     name_field = 'name'
    117     objects = model_logic.ModelWithInvalidManager()
    118     valid_objects = model_logic.ValidObjectsManager()
    119     atomic_group = dbmodels.ForeignKey(AtomicGroup, null=True, blank=True)
    120 
    121 
    122     def clean_object(self):
    123         self.host_set.clear()
    124         self.test_set.clear()
    125 
    126 
    127     def enqueue_job(self, job, atomic_group=None, is_template=False):
    128         """Enqueue a job on any host of this label.
    129 
    130         @param job: A job to enqueue.
    131         @param atomic_group: The associated atomic group.
    132         @param is_template: Whether the status should be "Template".
    133         """
    134         queue_entry = HostQueueEntry.create(meta_host=self, job=job,
    135                                             is_template=is_template,
    136                                             atomic_group=atomic_group)
    137         queue_entry.save()
    138 
    139 
    141     class Meta:
    142         """Metadata for class Label."""
    143         db_table = 'afe_labels'
    144 
    145 
    146     def __unicode__(self):
    147         return unicode(self.name)
    148 
    149 
    150 class Shard(dbmodels.Model, model_logic.ModelExtensions):
    151 
    152     hostname = dbmodels.CharField(max_length=255, unique=True)
    153 
    154     name_field = 'hostname'
    155 
    156     labels = dbmodels.ManyToManyField(Label, blank=True,
    157                                       db_table='afe_shards_labels')
    158 
    159     class Meta:
     160         """Metadata for class Shard."""
    161         db_table = 'afe_shards'
    162 
    163 
    164     def rpc_hostname(self):
    165         """Get the rpc hostname of the shard.
    166 
    167         @return: Just the shard hostname for all non-testing environments.
    168                  The address of the default gateway for vm testing environments.
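
                 Example (hostnames hypothetical): a puppylab shard stored as
                 'localhost:8001' resolves to '<DEFAULT_VM_GATEWAY>:8001', while
                 a production shard such as 'shard1.example.com' is returned
                 unchanged.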
    169         """
     170         # TODO: Figure out a better solution for testing. Since no two
     171         # shards can run on the same host, if the shard hostname is
     172         # localhost we conclude that it must be a vm in a test cluster. In
     173         # such situations a name of localhost:<port> is necessary to achieve
     174         # the correct afe links/redirection from the frontend (this happens
     175         # through the host), but rpcs performed *on* the shard need to use
     176         # the address of the gateway.
    177         # In the virtual machine testing environment (i.e., puppylab), each
    178         # shard VM has a hostname like localhost:<port>. In the real cluster
    179         # environment, a shard node does not have 'localhost' for its hostname.
    180         # The following hostname substitution is needed only for the VM
    181         # in puppylab.
    182         # The 'hostname' should not be replaced in the case of real cluster.
    183         if site_utils.is_puppylab_vm(self.hostname):
    184             hostname = self.hostname.split(':')[0]
    185             return self.hostname.replace(
    186                     hostname, site_utils.DEFAULT_VM_GATEWAY)
    187         return self.hostname
    188 
    189 
    190 class Drone(dbmodels.Model, model_logic.ModelExtensions):
    191     """
    192     A scheduler drone
    193 
    194     hostname: the drone's hostname
    195     """
    196     hostname = dbmodels.CharField(max_length=255, unique=True)
    197 
    198     name_field = 'hostname'
    199     objects = model_logic.ExtendedManager()
    200 
    201 
    202     def save(self, *args, **kwargs):
    203         if not User.current_user().is_superuser():
    204             raise Exception('Only superusers may edit drones')
    205         super(Drone, self).save(*args, **kwargs)
    206 
    207 
    208     def delete(self):
    209         if not User.current_user().is_superuser():
    210             raise Exception('Only superusers may delete drones')
    211         super(Drone, self).delete()
    212 
    213 
    214     class Meta:
    215         """Metadata for class Drone."""
    216         db_table = 'afe_drones'
    217 
    218     def __unicode__(self):
    219         return unicode(self.hostname)
    220 
    221 
    222 class DroneSet(dbmodels.Model, model_logic.ModelExtensions):
    223     """
    224     A set of scheduler drones
    225 
    226     These will be used by the scheduler to decide what drones a job is allowed
    227     to run on.
    228 
    229     name: the drone set's name
    230     drones: the drones that are part of the set
    231     """
    232     DRONE_SETS_ENABLED = global_config.global_config.get_config_value(
    233             'SCHEDULER', 'drone_sets_enabled', type=bool, default=False)
    234     DEFAULT_DRONE_SET_NAME = global_config.global_config.get_config_value(
    235             'SCHEDULER', 'default_drone_set_name', default=None)
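
             # Illustrative global config snippet (values hypothetical) for the
             # keys read above:
             #   [SCHEDULER]
             #   drone_sets_enabled: True
             #   default_drone_set_name: default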
    236 
    237     name = dbmodels.CharField(max_length=255, unique=True)
    238     drones = dbmodels.ManyToManyField(Drone, db_table='afe_drone_sets_drones')
    239 
    240     name_field = 'name'
    241     objects = model_logic.ExtendedManager()
    242 
    243 
    244     def save(self, *args, **kwargs):
    245         if not User.current_user().is_superuser():
    246             raise Exception('Only superusers may edit drone sets')
    247         super(DroneSet, self).save(*args, **kwargs)
    248 
    249 
    250     def delete(self):
    251         if not User.current_user().is_superuser():
    252             raise Exception('Only superusers may delete drone sets')
    253         super(DroneSet, self).delete()
    254 
    255 
    256     @classmethod
    257     def drone_sets_enabled(cls):
    258         """Returns whether drone sets are enabled.
    259 
    260         @param cls: Implicit class object.
    261         """
    262         return cls.DRONE_SETS_ENABLED
    263 
    264 
    265     @classmethod
    266     def default_drone_set_name(cls):
    267         """Returns the default drone set name.
    268 
    269         @param cls: Implicit class object.
    270         """
    271         return cls.DEFAULT_DRONE_SET_NAME
    272 
    273 
    274     @classmethod
    275     def get_default(cls):
    276         """Gets the default drone set name, compatible with Job.add_object.
    277 
    278         @param cls: Implicit class object.
    279         """
    280         return cls.smart_get(cls.DEFAULT_DRONE_SET_NAME)
    281 
    282 
    283     @classmethod
    284     def resolve_name(cls, drone_set_name):
    285         """
    286         Returns the name of one of these, if not None, in order of preference:
    287         1) the drone set given,
    288         2) the current user's default drone set, or
    289         3) the global default drone set
    290 
    291         or returns None if drone sets are disabled
    292 
    293         @param cls: Implicit class object.
    294         @param drone_set_name: A drone set name.
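
                 Example (names hypothetical): with drone sets enabled, a user
                 whose default drone set is 'us-west' gets 'us-west' back from
                 resolve_name(None), while resolve_name('special') returns
                 'special' for any user.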
    295         """
    296         if not cls.drone_sets_enabled():
    297             return None
    298 
    299         user = User.current_user()
    300         user_drone_set_name = user.drone_set and user.drone_set.name
    301 
    302         return drone_set_name or user_drone_set_name or cls.get_default().name
    303 
    304 
    305     def get_drone_hostnames(self):
    306         """
    307         Gets the hostnames of all drones in this drone set
    308         """
    309         return set(self.drones.all().values_list('hostname', flat=True))
    310 
    311 
    312     class Meta:
    313         """Metadata for class DroneSet."""
    314         db_table = 'afe_drone_sets'
    315 
    316     def __unicode__(self):
    317         return unicode(self.name)
    318 
    319 
    320 class User(dbmodels.Model, model_logic.ModelExtensions):
    321     """\
    322     Required:
     323     login: user login name
    324 
    325     Optional:
    326     access_level: 0=User (default), 1=Admin, 100=Root
    327     """
    328     ACCESS_ROOT = 100
    329     ACCESS_ADMIN = 1
    330     ACCESS_USER = 0
    331 
    332     AUTOTEST_SYSTEM = 'autotest_system'
    333 
    334     login = dbmodels.CharField(max_length=255, unique=True)
    335     access_level = dbmodels.IntegerField(default=ACCESS_USER, blank=True)
    336 
    337     # user preferences
    338     reboot_before = dbmodels.SmallIntegerField(
    339         choices=model_attributes.RebootBefore.choices(), blank=True,
    340         default=DEFAULT_REBOOT_BEFORE)
    341     reboot_after = dbmodels.SmallIntegerField(
    342         choices=model_attributes.RebootAfter.choices(), blank=True,
    343         default=DEFAULT_REBOOT_AFTER)
    344     drone_set = dbmodels.ForeignKey(DroneSet, null=True, blank=True)
    345     show_experimental = dbmodels.BooleanField(default=False)
    346 
    347     name_field = 'login'
    348     objects = model_logic.ExtendedManager()
    349 
    350 
    351     def save(self, *args, **kwargs):
    352         # is this a new object being saved for the first time?
    353         first_time = (self.id is None)
    354         user = thread_local.get_user()
    355         if user and not user.is_superuser() and user.login != self.login:
    356             raise AclAccessViolation("You cannot modify user " + self.login)
    357         super(User, self).save(*args, **kwargs)
    358         if first_time:
    359             everyone = AclGroup.objects.get(name='Everyone')
    360             everyone.users.add(self)
    361 
    362 
    363     def is_superuser(self):
    364         """Returns whether the user has superuser access."""
    365         return self.access_level >= self.ACCESS_ROOT
    366 
    367 
    368     @classmethod
    369     def current_user(cls):
    370         """Returns the current user.
    371 
    372         @param cls: Implicit class object.
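
                 If no user is bound to the current thread, a record for
                 AUTOTEST_SYSTEM ('autotest_system') is created with root access
                 and returned, so callers always get a non-None user.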
    373         """
    374         user = thread_local.get_user()
    375         if user is None:
    376             user, _ = cls.objects.get_or_create(login=cls.AUTOTEST_SYSTEM)
    377             user.access_level = cls.ACCESS_ROOT
    378             user.save()
    379         return user
    380 
    381 
    382     @classmethod
    383     def get_record(cls, data):
    384         """Check the database for an identical record.
    385 
    386         Check for a record with matching id and login. If one exists,
    387         return it. If one does not exist there is a possibility that
    388         the following cases have happened:
    389         1. Same id, different login
    390             We received: "1 chromeos-test"
    391             And we have: "1 debug-user"
     392         In this case we need to delete "1 debug-user" and insert
    393         "1 chromeos-test".
    394 
    395         2. Same login, different id:
    396             We received: "1 chromeos-test"
    397             And we have: "2 chromeos-test"
    398         In this case we need to delete "2 chromeos-test" and insert
    399         "1 chromeos-test".
    400 
    401         As long as this method deletes bad records and raises the
    402         DoesNotExist exception the caller will handle creating the
    403         new record.
    404 
    405         @raises: DoesNotExist, if a record with the matching login and id
    406                 does not exist.
    407         """
    408 
     409         # Both the id and login should be unique, but there are cases when
    410         # we might already have a user with the same login/id because
    411         # current_user will proactively create a user record if it doesn't
    412         # exist. Since we want to avoid conflict between the master and
    413         # shard, just delete any existing user records that don't match
    414         # what we're about to deserialize from the master.
    415         try:
    416             return cls.objects.get(login=data['login'], id=data['id'])
    417         except cls.DoesNotExist:
    418             cls.delete_matching_record(login=data['login'])
    419             cls.delete_matching_record(id=data['id'])
    420             raise
    421 
    422 
    423     class Meta:
    424         """Metadata for class User."""
    425         db_table = 'afe_users'
    426 
    427     def __unicode__(self):
    428         return unicode(self.login)
    429 
    430 
    431 class Host(model_logic.ModelWithInvalid, rdb_model_extensions.AbstractHostModel,
    432            model_logic.ModelWithAttributes):
    433     """\
    434     Required:
    435     hostname
    436 
     437     Optional:
    438     locked: if true, host is locked and will not be queued
    439 
    440     Internal:
    441     From AbstractHostModel:
    442         synch_id: currently unused
    443         status: string describing status of host
    444         invalid: true if the host has been deleted
    445         protection: indicates what can be done to this host during repair
    446         lock_time: DateTime at which the host was locked
    447         dirty: true if the host has been used without being rebooted
    448     Local:
    449         locked_by: user that locked the host, or null if the host is unlocked
    450     """
    451 
    452     SERIALIZATION_LINKS_TO_FOLLOW = set(['aclgroup_set',
    453                                          'hostattribute_set',
    454                                          'labels',
    455                                          'shard'])
    456     SERIALIZATION_LOCAL_LINKS_TO_UPDATE = set(['invalid'])
    457 
    458 
    459     def custom_deserialize_relation(self, link, data):
    460         assert link == 'shard', 'Link %s should not be deserialized' % link
    461         self.shard = Shard.deserialize(data)
    462 
    463 
    464     # Note: Only specify foreign keys here, specify all native host columns in
    465     # rdb_model_extensions instead.
    466     Protection = host_protections.Protection
    467     labels = dbmodels.ManyToManyField(Label, blank=True,
    468                                       db_table='afe_hosts_labels')
    469     locked_by = dbmodels.ForeignKey(User, null=True, blank=True, editable=False)
    470     name_field = 'hostname'
    471     objects = model_logic.ModelWithInvalidManager()
    472     valid_objects = model_logic.ValidObjectsManager()
    473     leased_objects = model_logic.LeasedHostManager()
    474 
    475     shard = dbmodels.ForeignKey(Shard, blank=True, null=True)
    476 
    477     def __init__(self, *args, **kwargs):
    478         super(Host, self).__init__(*args, **kwargs)
    479         self._record_attributes(['status'])
    480 
    481 
    482     @staticmethod
    483     def create_one_time_host(hostname):
    484         """Creates a one-time host.
    485 
    486         @param hostname: The name for the host.
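
                 Illustrative behavior: the host is saved as invalid and unlocked
                 with DO_NOT_REPAIR protection; an existing invalid host with the
                 same hostname is reused, while a valid one raises
                 ValidationError.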
    487         """
    488         query = Host.objects.filter(hostname=hostname)
    489         if query.count() == 0:
    490             host = Host(hostname=hostname, invalid=True)
    491             host.do_validate()
    492         else:
    493             host = query[0]
    494             if not host.invalid:
    495                 raise model_logic.ValidationError({
    496                     'hostname' : '%s already exists in the autotest DB.  '
    497                         'Select it rather than entering it as a one time '
    498                         'host.' % hostname
    499                     })
    500         host.protection = host_protections.Protection.DO_NOT_REPAIR
    501         host.locked = False
    502         host.save()
    503         host.clean_object()
    504         return host
    505 
    506 
    507     @classmethod
    508     def assign_to_shard(cls, shard, known_ids):
    509         """Assigns hosts to a shard.
    510 
    511         For all labels that have been assigned to a shard, all hosts that
    512         have at least one of the shard's labels are assigned to the shard.
    513         Hosts that are assigned to the shard but aren't already present on the
    514         shard are returned.
    515 
    516         Board to shard mapping is many-to-one. Many different boards can be
    517         hosted in a shard. However, DUTs of a single board cannot be distributed
    518         into more than one shard.
    519 
    520         @param shard: The shard object to assign labels/hosts for.
    521         @param known_ids: List of all host-ids the shard already knows.
    522                           This is used to figure out which hosts should be sent
    523                           to the shard. If shard_ids were used instead, hosts
    524                           would only be transferred once, even if the client
    525                           failed persisting them.
     526                           The number of hosts is usually on the order of 100, so the
    527                           overhead is acceptable.
    528 
    529         @returns the hosts objects that should be sent to the shard.
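
                 Illustrative sketch (label name hypothetical): if this shard
                 owns label 'board:falco', every unleased host carrying that
                 label whose id is not yet in known_ids is stamped with
                 shard=shard and returned.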
    530         """
    531 
    532         # Disclaimer: concurrent heartbeats should theoretically not occur in
    533         # the current setup. As they may be introduced in the near future,
    534         # this comment will be left here.
    535 
    536         # Sending stuff twice is acceptable, but forgetting something isn't.
    537         # Detecting duplicates on the client is easy, but here it's harder. The
    538         # following options were considered:
    539         # - SELECT ... WHERE and then UPDATE ... WHERE: Update might update more
    540         #   than select returned, as concurrently more hosts might have been
    541         #   inserted
    542         # - UPDATE and then SELECT WHERE shard=shard: select always returns all
    543         #   hosts for the shard, this is overhead
    544         # - SELECT and then UPDATE only selected without requerying afterwards:
    545         #   returns the old state of the records.
    546         host_ids = set(Host.objects.filter(
    547             labels__in=shard.labels.all(),
    548             leased=False
    549             ).exclude(
    550             id__in=known_ids,
    551             ).values_list('pk', flat=True))
    552 
    553         if host_ids:
    554             Host.objects.filter(pk__in=host_ids).update(shard=shard)
    555             return list(Host.objects.filter(pk__in=host_ids).all())
    556         return []
    557 
    558     def resurrect_object(self, old_object):
    559         super(Host, self).resurrect_object(old_object)
    560         # invalid hosts can be in use by the scheduler (as one-time hosts), so
    561         # don't change the status
    562         self.status = old_object.status
    563 
    564 
    565     def clean_object(self):
    566         self.aclgroup_set.clear()
    567         self.labels.clear()
    568 
    569 
    570     def record_state(self, type_str, state, value, other_metadata=None):
    571         """Record metadata in elasticsearch.
    572 
    573         @param type_str: sets the _type field in elasticsearch db.
    574         @param state: string representing what state we are recording,
    575                       e.g. 'locked'
    576         @param value: value of the state, e.g. True
    577         @param other_metadata: Other metadata to store in metaDB.
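
                 Example (values hypothetical):
                     record_state('lock_history', 'locked', True,
                                  {'changed_by': 'jdoe'})
                 posts {'locked': True, 'hostname': <this hostname>,
                 'changed_by': 'jdoe'} to elasticsearch.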
    578         """
    579         metadata = {
    580             state: value,
    581             'hostname': self.hostname,
    582         }
    583         if other_metadata:
    584             metadata = dict(metadata.items() + other_metadata.items())
    585         autotest_es.post(use_http=True, type_str=type_str, metadata=metadata)
    586 
    587 
    588     def save(self, *args, **kwargs):
    589         # extra spaces in the hostname can be a sneaky source of errors
    590         self.hostname = self.hostname.strip()
    591         # is this a new object being saved for the first time?
    592         first_time = (self.id is None)
    593         if not first_time:
    594             AclGroup.check_for_acl_violation_hosts([self])
     595         # If locked is changed, send its status and the user who made the
     596         # change to metaDB. Locks are important in host history because if a
     597         # device is locked then we don't really care what state it is in.
    598         if self.locked and not self.locked_by:
    599             self.locked_by = User.current_user()
    600             if not self.lock_time:
    601                 self.lock_time = datetime.now()
    602             self.record_state('lock_history', 'locked', self.locked,
    603                               {'changed_by': self.locked_by.login,
    604                                'lock_reason': self.lock_reason})
    605             self.dirty = True
    606         elif not self.locked and self.locked_by:
    607             self.record_state('lock_history', 'locked', self.locked,
    608                               {'changed_by': self.locked_by.login})
    609             self.locked_by = None
    610             self.lock_time = None
    611         super(Host, self).save(*args, **kwargs)
    612         if first_time:
    613             everyone = AclGroup.objects.get(name='Everyone')
    614             everyone.hosts.add(self)
    615         self._check_for_updated_attributes()
    616 
    617 
    618     def delete(self):
    619         AclGroup.check_for_acl_violation_hosts([self])
    620         for queue_entry in self.hostqueueentry_set.all():
    621             queue_entry.deleted = True
    622             queue_entry.abort()
    623         super(Host, self).delete()
    624 
    625 
    626     def on_attribute_changed(self, attribute, old_value):
    627         assert attribute == 'status'
    628         logging.info(self.hostname + ' -> ' + self.status)
    629 
    630 
    631     def enqueue_job(self, job, atomic_group=None, is_template=False):
    632         """Enqueue a job on this host.
    633 
    634         @param job: A job to enqueue.
    635         @param atomic_group: The associated atomic group.
     636         @param is_template: Whether the status should be "Template".
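
                 Side effects: an IneligibleHostQueue row is created so the
                 scheduler won't pick this host again for the same job, and a
                 dead (repair-failed) host with no active entry is reset to
                 Ready.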
    637         """
    638         queue_entry = HostQueueEntry.create(host=self, job=job,
    639                                             is_template=is_template,
    640                                             atomic_group=atomic_group)
    641         # allow recovery of dead hosts from the frontend
    642         if not self.active_queue_entry() and self.is_dead():
    643             self.status = Host.Status.READY
    644             self.save()
    645         queue_entry.save()
    646 
    647         block = IneligibleHostQueue(job=job, host=self)
    648         block.save()
    649 
    650 
    651     def platform(self):
    652         """The platform of the host."""
     653         # TODO(showard): slightly hacky?
    654         platforms = self.labels.filter(platform=True)
    655         if len(platforms) == 0:
    656             return None
    657         return platforms[0]
    658     platform.short_description = 'Platform'
    659 
    660 
    661     @classmethod
    662     def check_no_platform(cls, hosts):
    663         """Verify the specified hosts have no associated platforms.
    664 
    665         @param cls: Implicit class object.
    666         @param hosts: The hosts to verify.
    667         @raises model_logic.ValidationError if any hosts already have a
    668             platform.
    669         """
    670         Host.objects.populate_relationships(hosts, Label, 'label_list')
    671         errors = []
    672         for host in hosts:
    673             platforms = [label.name for label in host.label_list
    674                          if label.platform]
    675             if platforms:
     676                 # join the platform names, so that if this host somehow
     677                 # has multiple platforms we'll be able to see them all
    678                 errors.append('Host %s already has a platform: %s' % (
    679                               host.hostname, ', '.join(platforms)))
    680         if errors:
    681             raise model_logic.ValidationError({'labels': '; '.join(errors)})
    682 
    683 
    684     def is_dead(self):
    685         """Returns whether the host is dead (has status repair failed)."""
    686         return self.status == Host.Status.REPAIR_FAILED
    687 
    688 
    689     def active_queue_entry(self):
    690         """Returns the active queue entry for this host, or None if none."""
    691         active = list(self.hostqueueentry_set.filter(active=True))
    692         if not active:
    693             return None
    694         assert len(active) == 1, ('More than one active entry for '
    695                                   'host ' + self.hostname)
    696         return active[0]
    697 
    698 
    699     def _get_attribute_model_and_args(self, attribute):
    700         return HostAttribute, dict(host=self, attribute=attribute)
    701 
    702 
    703     @classmethod
    704     def get_attribute_model(cls):
    705         """Return the attribute model.
    706 
    707         Override method in parent class. See ModelExtensions for details.
    708         @returns: The attribute model of Host.
    709         """
    710         return HostAttribute
    711 
    712 
    713     class Meta:
    714         """Metadata for the Host class."""
    715         db_table = 'afe_hosts'
    716 
    717 
    718     def __unicode__(self):
    719         return unicode(self.hostname)
    720 
    721 
    722 class HostAttribute(dbmodels.Model, model_logic.ModelExtensions):
    723     """Arbitrary keyvals associated with hosts."""
    724 
    725     SERIALIZATION_LINKS_TO_KEEP = set(['host'])
    726     SERIALIZATION_LOCAL_LINKS_TO_UPDATE = set(['value'])
    727     host = dbmodels.ForeignKey(Host)
    728     attribute = dbmodels.CharField(max_length=90)
    729     value = dbmodels.CharField(max_length=300)
    730 
    731     objects = model_logic.ExtendedManager()
    732 
    733     class Meta:
    734         """Metadata for the HostAttribute class."""
    735         db_table = 'afe_host_attributes'
    736 
    737 
    738     @classmethod
    739     def get_record(cls, data):
    740         """Check the database for an identical record.
    741 
     742         Use host_id and attribute to search for an existing record.
    743 
    744         @raises: DoesNotExist, if no record found
    745         @raises: MultipleObjectsReturned if multiple records found.
    746         """
    747         # TODO(fdeng): We should use host_id and attribute together as
    748         #              a primary key in the db.
    749         return cls.objects.get(host_id=data['host_id'],
    750                                attribute=data['attribute'])
    751 
    752 
    753     @classmethod
    754     def deserialize(cls, data):
    755         """Override deserialize in parent class.
    756 
     757         Do not deserialize id, as it is not kept consistent between master and shards.
    758 
    759         @param data: A dictionary of data to deserialize.
    760 
    761         @returns: A HostAttribute object.
    762         """
    763         if data:
    764             data.pop('id')
    765         return super(HostAttribute, cls).deserialize(data)
    766 
    767 
    768 class Test(dbmodels.Model, model_logic.ModelExtensions):
    769     """\
    770     Required:
    771     author: author name
    772     description: description of the test
    773     name: test name
     774     test_time: short, medium, long
     775     test_class: The class the test belongs in.
     776     test_category: The category the test belongs in.
    777     test_type: Client or Server
    778     path: path to pass to run_test()
     779     sync_count: A number >= 1 (default 1). If it's 1, the job is async.
     780                 If it's > 1, it's a sync job for that number of machines,
     781                 i.e. if sync_count = 2 it is a sync job that requires two
     782                 machines.
    783     Optional:
     784     dependencies: What the test requires to run. Comma-delimited list.
    785     dependency_labels: many-to-many relationship with labels corresponding to
    786                        test dependencies.
     787     experimental: If this is set to True, production servers will ignore the test.
    788     run_verify: Whether or not the scheduler should run the verify stage
    789     run_reset: Whether or not the scheduler should run the reset stage
    790     test_retry: Number of times to retry test if the test did not complete
    791                 successfully. (optional, default: 0)
    792     """
    793     TestTime = enum.Enum('SHORT', 'MEDIUM', 'LONG', start_value=1)
    794 
    795     name = dbmodels.CharField(max_length=255, unique=True)
    796     author = dbmodels.CharField(max_length=255)
    797     test_class = dbmodels.CharField(max_length=255)
    798     test_category = dbmodels.CharField(max_length=255)
    799     dependencies = dbmodels.CharField(max_length=255, blank=True)
    800     description = dbmodels.TextField(blank=True)
    801     experimental = dbmodels.BooleanField(default=True)
    802     run_verify = dbmodels.BooleanField(default=False)
    803     test_time = dbmodels.SmallIntegerField(choices=TestTime.choices(),
    804                                            default=TestTime.MEDIUM)
    805     test_type = dbmodels.SmallIntegerField(
    806         choices=control_data.CONTROL_TYPE.choices())
    807     sync_count = dbmodels.IntegerField(default=1)
    808     path = dbmodels.CharField(max_length=255, unique=True)
    809     test_retry = dbmodels.IntegerField(blank=True, default=0)
    810     run_reset = dbmodels.BooleanField(default=True)
    811 
    812     dependency_labels = (
    813         dbmodels.ManyToManyField(Label, blank=True,
    814                                  db_table='afe_autotests_dependency_labels'))
    815     name_field = 'name'
    816     objects = model_logic.ExtendedManager()
    817 
    818 
    819     def admin_description(self):
    820         """Returns a string representing the admin description."""
    821         escaped_description = saxutils.escape(self.description)
    822         return '<span style="white-space:pre">%s</span>' % escaped_description
    823     admin_description.allow_tags = True
    824     admin_description.short_description = 'Description'
    825 
    826 
    827     class Meta:
    828         """Metadata for class Test."""
    829         db_table = 'afe_autotests'
    830 
    831     def __unicode__(self):
    832         return unicode(self.name)
    833 
    834 
    835 class TestParameter(dbmodels.Model):
    836     """
    837     A declared parameter of a test
    838     """
    839     test = dbmodels.ForeignKey(Test)
    840     name = dbmodels.CharField(max_length=255)
    841 
    842     class Meta:
    843         """Metadata for class TestParameter."""
    844         db_table = 'afe_test_parameters'
    845         unique_together = ('test', 'name')
    846 
    847     def __unicode__(self):
    848         return u'%s (%s)' % (self.name, self.test.name)
    849 
    850 
    851 class Profiler(dbmodels.Model, model_logic.ModelExtensions):
    852     """\
    853     Required:
    854     name: profiler name
    855     test_type: Client or Server
    856 
    857     Optional:
     858     description: arbitrary text description
    859     """
    860     name = dbmodels.CharField(max_length=255, unique=True)
    861     description = dbmodels.TextField(blank=True)
    862 
    863     name_field = 'name'
    864     objects = model_logic.ExtendedManager()
    865 
    866 
    867     class Meta:
    868         """Metadata for class Profiler."""
    869         db_table = 'afe_profilers'
    870 
    871     def __unicode__(self):
    872         return unicode(self.name)
    873 
    874 
    875 class AclGroup(dbmodels.Model, model_logic.ModelExtensions):
    876     """\
    877     Required:
    878     name: name of ACL group
    879 
    880     Optional:
    881     description: arbitrary description of group
    882     """
    883 
    884     SERIALIZATION_LINKS_TO_FOLLOW = set(['users'])
    885 
    886     name = dbmodels.CharField(max_length=255, unique=True)
    887     description = dbmodels.CharField(max_length=255, blank=True)
    888     users = dbmodels.ManyToManyField(User, blank=False,
    889                                      db_table='afe_acl_groups_users')
    890     hosts = dbmodels.ManyToManyField(Host, blank=True,
    891                                      db_table='afe_acl_groups_hosts')
    892 
    893     name_field = 'name'
    894     objects = model_logic.ExtendedManager()
    895 
    896     @staticmethod
    897     def check_for_acl_violation_hosts(hosts):
    898         """Verify the current user has access to the specified hosts.
    899 
    900         @param hosts: The hosts to verify against.
    901         @raises AclAccessViolation if the current user doesn't have access
    902             to a host.
    903         """
    904         user = User.current_user()
    905         if user.is_superuser():
    906             return
    907         accessible_host_ids = set(
    908             host.id for host in Host.objects.filter(aclgroup__users=user))
    909         for host in hosts:
    910             # Check if the user has access to this host,
    911             # but only if it is not a metahost or a one-time-host.
    912             no_access = (isinstance(host, Host)
    913                          and not host.invalid
    914                          and int(host.id) not in accessible_host_ids)
    915             if no_access:
    916                 raise AclAccessViolation("%s does not have access to %s" %
    917                                          (str(user), str(host)))
    918 
    919 
    920     @staticmethod
    921     def check_abort_permissions(queue_entries):
    922         """Look for queue entries that aren't abortable by the current user.
    923 
     924         An entry is not abortable if the job isn't owned by this user
     925         and either:
     926            * the machine isn't ACL-accessible to the user, or
     927            * the machine is in the "Everyone" ACL.
    928 
    929         @param queue_entries: The queue entries to check.
    930         @raises AclAccessViolation if a queue entry is not abortable by the
    931             current user.
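
                 Illustrative outcome: a non-superuser can always abort entries
                 of jobs they own; for other jobs they need access to the host
                 through a non-"Everyone" ACL group.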
    932         """
    933         user = User.current_user()
    934         if user.is_superuser():
    935             return
    936         not_owned = queue_entries.exclude(job__owner=user.login)
    937         # I do this using ID sets instead of just Django filters because
    938         # filtering on M2M dbmodels is broken in Django 0.96. It's better in
    939         # 1.0.
    940         # TODO: Use Django filters, now that we're using 1.0.
    941         accessible_ids = set(
    942             entry.id for entry
    943             in not_owned.filter(host__aclgroup__users__login=user.login))
    944         public_ids = set(entry.id for entry
    945                          in not_owned.filter(host__aclgroup__name='Everyone'))
    946         cannot_abort = [entry for entry in not_owned.select_related()
    947                         if entry.id not in accessible_ids
    948                         or entry.id in public_ids]
    949         if len(cannot_abort) == 0:
    950             return
    951         entry_names = ', '.join('%s-%s/%s' % (entry.job.id, entry.job.owner,
    952                                               entry.host_or_metahost_name())
    953                                 for entry in cannot_abort)
    954         raise AclAccessViolation('You cannot abort the following job entries: '
    955                                  + entry_names)
    956 
    957 
    958     def check_for_acl_violation_acl_group(self):
     959         """Verifies the current user has access to this ACL group.
    960 
    961         @raises AclAccessViolation if the current user doesn't have access to
    962             this ACL group.
    963         """
    964         user = User.current_user()
    965         if user.is_superuser():
    966             return
    967         if self.name == 'Everyone':
    968             raise AclAccessViolation("You cannot modify 'Everyone'!")
     969         if user not in self.users.all():
    970             raise AclAccessViolation("You do not have access to %s"
    971                                      % self.name)
    972 
    973     @staticmethod
    974     def on_host_membership_change():
    975         """Invoked when host membership changes."""
    976         everyone = AclGroup.objects.get(name='Everyone')
    977 
    978         # find hosts that aren't in any ACL group and add them to Everyone
    979         # TODO(showard): this is a bit of a hack, since the fact that this query
    980         # works is kind of a coincidence of Django internals.  This trick
    981         # doesn't work in general (on all foreign key relationships).  I'll
    982         # replace it with a better technique when the need arises.
    983         orphaned_hosts = Host.valid_objects.filter(aclgroup__id__isnull=True)
    984         everyone.hosts.add(*orphaned_hosts.distinct())
    985 
    986         # find hosts in both Everyone and another ACL group, and remove them
    987         # from Everyone
    988         hosts_in_everyone = Host.valid_objects.filter(aclgroup__name='Everyone')
    989         acled_hosts = set()
    990         for host in hosts_in_everyone:
    991             # Has an ACL group other than Everyone
    992             if host.aclgroup_set.count() > 1:
    993                 acled_hosts.add(host)
    994         everyone.hosts.remove(*acled_hosts)
    995 
    996 
    997     def delete(self):
     998         if self.name == 'Everyone':
    999             raise AclAccessViolation("You cannot delete 'Everyone'!")
   1000         self.check_for_acl_violation_acl_group()
   1001         super(AclGroup, self).delete()
   1002         self.on_host_membership_change()
   1003 
   1004 
   1005     def add_current_user_if_empty(self):
   1006         """Adds the current user if the set of users is empty."""
   1007         if not self.users.count():
   1008             self.users.add(User.current_user())
   1009 
   1010 
   1011     def perform_after_save(self, change):
   1012         """Called after a save.
   1013 
   1014         @param change: Whether there was a change.
   1015         """
   1016         if not change:
   1017             self.users.add(User.current_user())
   1018         self.add_current_user_if_empty()
   1019         self.on_host_membership_change()
   1020 
   1021 
   1022     def save(self, *args, **kwargs):
   1023         change = bool(self.id)
   1024         if change:
   1025             # Check the original object for an ACL violation
   1026             AclGroup.objects.get(id=self.id).check_for_acl_violation_acl_group()
   1027         super(AclGroup, self).save(*args, **kwargs)
   1028         self.perform_after_save(change)
   1029 
   1030 
   1031     class Meta:
   1032         """Metadata for class AclGroup."""
   1033         db_table = 'afe_acl_groups'
   1034 
   1035     def __unicode__(self):
   1036         return unicode(self.name)
   1037 
   1038 
   1039 class Kernel(dbmodels.Model):
   1040     """
   1041     A kernel configuration for a parameterized job
   1042     """
   1043     version = dbmodels.CharField(max_length=255)
   1044     cmdline = dbmodels.CharField(max_length=255, blank=True)
   1045 
   1046     @classmethod
   1047     def create_kernels(cls, kernel_list):
   1048         """Creates all kernels in the kernel list.
   1049 
   1050         @param cls: Implicit class object.
   1051         @param kernel_list: A list of dictionaries that describe the kernels,
   1052             in the same format as the 'kernel' argument to
   1053             rpc_interface.generate_control_file.
    1054         @return A list of the created kernels, or None if kernel_list is empty.
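
                 Example (versions hypothetical):
                     create_kernels([{'version': '3.8.11'},
                                     {'version': '3.8.11', 'cmdline': 'mem=4G'}])
                 returns two Kernel objects, reusing any existing rows with the
                 same (version, cmdline).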
   1055         """
   1056         if not kernel_list:
   1057             return None
   1058         return [cls._create(kernel) for kernel in kernel_list]
   1059 
   1060 
   1061     @classmethod
   1062     def _create(cls, kernel_dict):
   1063         version = kernel_dict.pop('version')
   1064         cmdline = kernel_dict.pop('cmdline', '')
   1065 
   1066         if kernel_dict:
   1067             raise Exception('Extraneous kernel arguments remain: %r'
   1068                             % kernel_dict)
   1069 
   1070         kernel, _ = cls.objects.get_or_create(version=version,
   1071                                               cmdline=cmdline)
   1072         return kernel
   1073 
   1074 
   1075     class Meta:
   1076         """Metadata for class Kernel."""
   1077         db_table = 'afe_kernels'
   1078         unique_together = ('version', 'cmdline')
   1079 
   1080     def __unicode__(self):
   1081         return u'%s %s' % (self.version, self.cmdline)
   1082 
   1083 
   1084 class ParameterizedJob(dbmodels.Model):
   1085     """
   1086     Auxiliary configuration for a parameterized job.
   1087     """
   1088     test = dbmodels.ForeignKey(Test)
   1089     label = dbmodels.ForeignKey(Label, null=True)
   1090     use_container = dbmodels.BooleanField(default=False)
   1091     profile_only = dbmodels.BooleanField(default=False)
   1092     upload_kernel_config = dbmodels.BooleanField(default=False)
   1093 
   1094     kernels = dbmodels.ManyToManyField(
   1095             Kernel, db_table='afe_parameterized_job_kernels')
   1096     profilers = dbmodels.ManyToManyField(
   1097             Profiler, through='ParameterizedJobProfiler')
   1098 
   1099 
   1100     @classmethod
   1101     def smart_get(cls, id_or_name, *args, **kwargs):
   1102         """For compatibility with Job.add_object.
   1103 
   1104         @param cls: Implicit class object.
   1105         @param id_or_name: The ID or name to get.
   1106         @param args: Non-keyword arguments.
   1107         @param kwargs: Keyword arguments.
   1108         """
   1109         return cls.objects.get(pk=id_or_name)
   1110 
   1111 
   1112     def job(self):
   1113         """Returns the job if it exists, or else None."""
   1114         jobs = self.job_set.all()
   1115         assert jobs.count() <= 1
    1116         return jobs[0] if jobs else None
   1117 
   1118 
   1119     class Meta:
   1120         """Metadata for class ParameterizedJob."""
   1121         db_table = 'afe_parameterized_jobs'
   1122 
   1123     def __unicode__(self):
   1124         return u'%s (parameterized) - %s' % (self.test.name, self.job())
   1125 
   1126 
   1127 class ParameterizedJobProfiler(dbmodels.Model):
   1128     """
   1129     A profiler to run on a parameterized job
   1130     """
   1131     parameterized_job = dbmodels.ForeignKey(ParameterizedJob)
   1132     profiler = dbmodels.ForeignKey(Profiler)
   1133 
   1134     class Meta:
    1135         """Metadata for class ParameterizedJobProfiler."""
   1136         db_table = 'afe_parameterized_jobs_profilers'
   1137         unique_together = ('parameterized_job', 'profiler')
   1138 
   1139 
   1140 class ParameterizedJobProfilerParameter(dbmodels.Model):
   1141     """
   1142     A parameter for a profiler in a parameterized job
   1143     """
   1144     parameterized_job_profiler = dbmodels.ForeignKey(ParameterizedJobProfiler)
   1145     parameter_name = dbmodels.CharField(max_length=255)
   1146     parameter_value = dbmodels.TextField()
   1147     parameter_type = dbmodels.CharField(
   1148             max_length=8, choices=model_attributes.ParameterTypes.choices())
   1149 
   1150     class Meta:
   1151         """Metadata for class ParameterizedJobProfilerParameter."""
   1152         db_table = 'afe_parameterized_job_profiler_parameters'
   1153         unique_together = ('parameterized_job_profiler', 'parameter_name')
   1154 
   1155     def __unicode__(self):
   1156         return u'%s - %s' % (self.parameterized_job_profiler.profiler.name,
   1157                              self.parameter_name)
   1158 
   1159 
   1160 class ParameterizedJobParameter(dbmodels.Model):
   1161     """
   1162     Parameters for a parameterized job
   1163     """
   1164     parameterized_job = dbmodels.ForeignKey(ParameterizedJob)
   1165     test_parameter = dbmodels.ForeignKey(TestParameter)
   1166     parameter_value = dbmodels.TextField()
   1167     parameter_type = dbmodels.CharField(
   1168             max_length=8, choices=model_attributes.ParameterTypes.choices())
   1169 
   1170     class Meta:
   1171         """Metadata for class ParameterizedJobParameter."""
   1172         db_table = 'afe_parameterized_job_parameters'
   1173         unique_together = ('parameterized_job', 'test_parameter')
   1174 
   1175     def __unicode__(self):
   1176         return u'%s - %s' % (self.parameterized_job.job().name,
   1177                              self.test_parameter.name)
   1178 
   1179 
   1180 class JobManager(model_logic.ExtendedManager):
   1181     'Custom manager to provide efficient status counts querying.'
   1182     def get_status_counts(self, job_ids):
   1183         """Returns a dict mapping the given job IDs to their status count dicts.
   1184 
   1185         @param job_ids: A list of job IDs.
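
                 Example of the return shape (statuses hypothetical):
                     get_status_counts([1, 2]) ->
                         {1: {'Completed': 2, 'Aborted': 1}, 2: {}}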
   1186         """
   1187         if not job_ids:
   1188             return {}
   1189         id_list = '(%s)' % ','.join(str(job_id) for job_id in job_ids)
   1190         cursor = connection.cursor()
   1191         cursor.execute("""
   1192             SELECT job_id, status, aborted, complete, COUNT(*)
   1193             FROM afe_host_queue_entries
   1194             WHERE job_id IN %s
   1195             GROUP BY job_id, status, aborted, complete
   1196             """ % id_list)
   1197         all_job_counts = dict((job_id, {}) for job_id in job_ids)
   1198         for job_id, status, aborted, complete, count in cursor.fetchall():
   1199             job_dict = all_job_counts[job_id]
   1200             full_status = HostQueueEntry.compute_full_status(status, aborted,
   1201                                                              complete)
   1202             job_dict.setdefault(full_status, 0)
   1203             job_dict[full_status] += count
   1204         return all_job_counts
   1205 
   1206 
   1207 class Job(dbmodels.Model, model_logic.ModelExtensions):
   1208     """\
   1209     owner: username of job owner
   1210     name: job name (does not have to be unique)
   1211     priority: Integer priority value.  Higher is more important.
   1212     control_file: contents of control file
   1213     control_type: Client or Server
   1214     created_on: date of job creation
   1215     submitted_on: date of job submission
   1216     synch_count: how many hosts should be used per autoserv execution
   1217     run_verify: Whether or not to run the verify phase
   1218     run_reset: Whether or not to run the reset phase
   1219     timeout: DEPRECATED - hours from queuing time until job times out
   1220     timeout_mins: minutes from job queuing time until the job times out
   1221     max_runtime_hrs: DEPRECATED - hours from job starting time until job
   1222                      times out
   1223     max_runtime_mins: minutes from job starting time until job times out
   1224     email_list: list of people to email on completion delimited by any of:
   1225                 white space, ',', ':', ';'
   1226     dependency_labels: many-to-many relationship with labels corresponding to
   1227                        job dependencies
   1228     reboot_before: Never, If dirty, or Always
   1229     reboot_after: Never, If all tests passed, or Always
    1230     parse_failed_repair: if True, a failed repair launched by this job will
    1231                          have its results parsed as part of the job.
   1232     drone_set: The set of drones to run this job on
   1233     parent_job: Parent job (optional)
   1234     test_retry: Number of times to retry test if the test did not complete
   1235                 successfully. (optional, default: 0)
    1236     require_ssp: Server-side packaging is required unless this is set to
    1237                  False. (optional, default: None)
   1238     """
   1239 
    1240     # TODO: Investigate if jobkeyval_set is really needed.
   1241     # dynamic_suite will write them into an attached file for the drone, but
   1242     # it doesn't seem like they are actually used. If they aren't used, remove
   1243     # jobkeyval_set here.
   1244     SERIALIZATION_LINKS_TO_FOLLOW = set(['dependency_labels',
   1245                                          'hostqueueentry_set',
   1246                                          'jobkeyval_set',
   1247                                          'shard'])
   1248 
   1249     # SQL for selecting jobs that should be sent to shard.
   1250     # We use raw sql as django filters were not optimized.
   1251     # The following jobs are excluded by the SQL.
   1252     #     - Non-aborted jobs known to shard as specified in |known_ids|.
    1253     #       Note that jobs aborted on the master, even if already known to
    1254     #       the shard, will be sent again so that the shard can abort them.
   1255     #     - Completed jobs
   1256     #     - Active jobs
   1257     #     - Jobs without host_queue_entries
   1258     NON_ABORTED_KNOWN_JOBS = '(t2.aborted = 0 AND t1.id IN (%(known_ids)s))'
   1259 
   1260     SQL_SHARD_JOBS = (
   1261         'SELECT DISTINCT(t1.id) FROM afe_jobs t1 '
   1262         'INNER JOIN afe_host_queue_entries t2  ON '
   1263         '  (t1.id = t2.job_id AND t2.complete != 1 AND t2.active != 1 '
   1264         '   %(check_known_jobs)s) '
   1265         'LEFT OUTER JOIN afe_jobs_dependency_labels t3 ON (t1.id = t3.job_id) '
   1266         'WHERE (t3.label_id IN  '
   1267         '  (SELECT label_id FROM afe_shards_labels '
   1268         '   WHERE shard_id = %(shard_id)s) '
   1269         '  OR t2.meta_host IN '
   1270         '  (SELECT label_id FROM afe_shards_labels '
   1271         '   WHERE shard_id = %(shard_id)s))'
   1272         )
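
             # The %(...)s placeholders are filled in by the caller via string
             # interpolation; a hypothetical sketch:
             #   SQL_SHARD_JOBS % {'shard_id': 42, 'check_known_jobs': ''}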
   1273 
   1274     # Jobs can be created with assigned hosts and have no dependency
   1275     # labels nor meta_host.
   1276     # We are looking for:
   1277     #     - a job whose hqe's meta_host is null
   1278     #     - a job whose hqe has a host
   1279     #     - one of the host's labels matches the shard's label.
   1280     # Non-aborted known jobs, completed jobs, active jobs, jobs
    1281     # without hqes are excluded, as with SQL_SHARD_JOBS.
   1282     SQL_SHARD_JOBS_WITH_HOSTS = (
   1283         'SELECT DISTINCT(t1.id) FROM afe_jobs t1 '
   1284         'INNER JOIN afe_host_queue_entries t2 ON '
   1285         '  (t1.id = t2.job_id AND t2.complete != 1 AND t2.active != 1 '
   1286         '   AND t2.meta_host IS NULL AND t2.host_id IS NOT NULL '
   1287         '   %(check_known_jobs)s) '
   1288         'LEFT OUTER JOIN afe_hosts_labels t3 ON (t2.host_id = t3.host_id) '
   1289         'WHERE (t3.label_id IN '
   1290         '  (SELECT label_id FROM afe_shards_labels '
   1291         '   WHERE shard_id = %(shard_id)s))'
   1292         )
   1293 
    1294     # Even with the filters on the complete, active and aborted bits in
    1295     # the two SQLs above, there is a chance that the result still
    1296     # contains a job with an hqe with 'complete=1' or 'active=1' or
    1297     # 'aborted=0 and afe_job.id in known jobs.'
    1298     # This happens when a job has two (or more) hqes and at least
    1299     # one hqe has different bits than the others.
    1300     # We use a second sql to ensure we exclude all such undesired jobs.
    1301     SQL_JOBS_TO_EXCLUDE = (
   1302         'SELECT t1.id FROM afe_jobs t1 '
   1303         'INNER JOIN afe_host_queue_entries t2 ON '
   1304         '  (t1.id = t2.job_id) '
   1305         'WHERE (t1.id in (%(candidates)s) '
   1306         '  AND (t2.complete=1 OR t2.active=1 '
   1307         '  %(check_known_jobs)s))'
   1308         )
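
             # Hypothetical example of why this second pass is needed: if job 7
             # has two hqes, one with complete=0 and one with complete=1, the
             # first pass (SQL_SHARD_JOBS) matches job 7 via the incomplete hqe,
             # and SQL_JOBS_TO_EXCLUDE then catches it via the complete hqe so
             # that assign_to_shard() can drop it from the candidate set.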
   1309 
   1310     def _deserialize_relation(self, link, data):
   1311         if link in ['hostqueueentry_set', 'jobkeyval_set']:
   1312             for obj in data:
   1313                 obj['job_id'] = self.id
   1314 
   1315         super(Job, self)._deserialize_relation(link, data)
   1316 
   1317 
   1318     def custom_deserialize_relation(self, link, data):
   1319         assert link == 'shard', 'Link %s should not be deserialized' % link
   1320         self.shard = Shard.deserialize(data)
   1321 
   1322 
   1323     def sanity_check_update_from_shard(self, shard, updated_serialized):
    1324         # If the job got aborted on the master after the client fetched it,
   1325         # no shard_id will be set. The shard might still push updates though,
   1326         # as the job might complete before the abort bit syncs to the shard.
   1327         # Alternative considered: The master scheduler could be changed to not
   1328         # set aborted jobs to completed that are sharded out. But that would
   1329         # require database queries and seemed more complicated to implement.
    1330         # This seems safe to do, as there won't be updates pushed from the
    1331         # wrong shard: shards should be powered off and wiped when they are
    1332         # removed from the master.
   1333         if self.shard_id and self.shard_id != shard.id:
   1334             raise error.UnallowedRecordsSentToMaster(
   1335                 'Job id=%s is assigned to shard (%s). Cannot update it with %s '
   1336                 'from shard %s.' % (self.id, self.shard_id, updated_serialized,
   1337                                     shard.id))
   1338 
   1339 
   1340     # TIMEOUT is deprecated.
   1341     DEFAULT_TIMEOUT = global_config.global_config.get_config_value(
   1342         'AUTOTEST_WEB', 'job_timeout_default', default=24)
   1343     DEFAULT_TIMEOUT_MINS = global_config.global_config.get_config_value(
   1344         'AUTOTEST_WEB', 'job_timeout_mins_default', default=24*60)
   1345     # MAX_RUNTIME_HRS is deprecated. Will be removed after switch to mins is
   1346     # completed.
   1347     DEFAULT_MAX_RUNTIME_HRS = global_config.global_config.get_config_value(
   1348         'AUTOTEST_WEB', 'job_max_runtime_hrs_default', default=72)
   1349     DEFAULT_MAX_RUNTIME_MINS = global_config.global_config.get_config_value(
   1350         'AUTOTEST_WEB', 'job_max_runtime_mins_default', default=72*60)
   1351     DEFAULT_PARSE_FAILED_REPAIR = global_config.global_config.get_config_value(
   1352         'AUTOTEST_WEB', 'parse_failed_repair_default', type=bool,
   1353         default=False)
   1354 
   1355     owner = dbmodels.CharField(max_length=255)
   1356     name = dbmodels.CharField(max_length=255)
   1357     priority = dbmodels.SmallIntegerField(default=priorities.Priority.DEFAULT)
   1358     control_file = dbmodels.TextField(null=True, blank=True)
   1359     control_type = dbmodels.SmallIntegerField(
   1360         choices=control_data.CONTROL_TYPE.choices(),
   1361         blank=True, # to allow 0
   1362         default=control_data.CONTROL_TYPE.CLIENT)
   1363     created_on = dbmodels.DateTimeField()
   1364     synch_count = dbmodels.IntegerField(blank=True, default=0)
   1365     timeout = dbmodels.IntegerField(default=DEFAULT_TIMEOUT)
   1366     run_verify = dbmodels.BooleanField(default=False)
   1367     email_list = dbmodels.CharField(max_length=250, blank=True)
   1368     dependency_labels = (
   1369             dbmodels.ManyToManyField(Label, blank=True,
   1370                                      db_table='afe_jobs_dependency_labels'))
   1371     reboot_before = dbmodels.SmallIntegerField(
   1372         choices=model_attributes.RebootBefore.choices(), blank=True,
   1373         default=DEFAULT_REBOOT_BEFORE)
   1374     reboot_after = dbmodels.SmallIntegerField(
   1375         choices=model_attributes.RebootAfter.choices(), blank=True,
   1376         default=DEFAULT_REBOOT_AFTER)
   1377     parse_failed_repair = dbmodels.BooleanField(
   1378         default=DEFAULT_PARSE_FAILED_REPAIR)
   1379     # max_runtime_hrs is deprecated. Will be removed after switch to mins is
   1380     # completed.
   1381     max_runtime_hrs = dbmodels.IntegerField(default=DEFAULT_MAX_RUNTIME_HRS)
   1382     max_runtime_mins = dbmodels.IntegerField(default=DEFAULT_MAX_RUNTIME_MINS)
   1383     drone_set = dbmodels.ForeignKey(DroneSet, null=True, blank=True)
   1384 
   1385     parameterized_job = dbmodels.ForeignKey(ParameterizedJob, null=True,
   1386                                             blank=True)
   1387 
   1388     parent_job = dbmodels.ForeignKey('self', blank=True, null=True)
   1389 
   1390     test_retry = dbmodels.IntegerField(blank=True, default=0)
   1391 
   1392     run_reset = dbmodels.BooleanField(default=True)
   1393 
   1394     timeout_mins = dbmodels.IntegerField(default=DEFAULT_TIMEOUT_MINS)
   1395 
   1396     # If this is None on the master, a slave should be found.
    1397     # If this is None on a slave, it should be synced back to the master.
   1398     shard = dbmodels.ForeignKey(Shard, blank=True, null=True)
   1399 
   1400     # If this is None, server-side packaging will be used for server side test,
   1401     # unless it's disabled in global config AUTOSERV/enable_ssp_container.
   1402     require_ssp = dbmodels.NullBooleanField(default=None, blank=True, null=True)
   1403 
   1404     # custom manager
   1405     objects = JobManager()
   1406 
   1407 
   1408     @decorators.cached_property
   1409     def labels(self):
   1410         """All the labels of this job"""
   1411         # We need to convert dependency_labels to a list, because all() gives us
   1412         # back an iterator, and storing/caching an iterator means we'd only be
   1413         # able to read from it once.
   1414         return list(self.dependency_labels.all())
   1415 
   1416 
   1417     def is_server_job(self):
   1418         """Returns whether this job is of type server."""
   1419         return self.control_type == control_data.CONTROL_TYPE.SERVER
   1420 
   1421 
   1422     @classmethod
   1423     def parameterized_jobs_enabled(cls):
   1424         """Returns whether parameterized jobs are enabled.
   1425 
   1426         @param cls: Implicit class object.
   1427         """
   1428         return global_config.global_config.get_config_value(
   1429                 'AUTOTEST_WEB', 'parameterized_jobs', type=bool)
   1430 
   1431 
   1432     @classmethod
   1433     def check_parameterized_job(cls, control_file, parameterized_job):
   1434         """Checks that the job is valid given the global config settings.
   1435 
   1436         First, either control_file must be set, or parameterized_job must be
   1437         set, but not both. Second, parameterized_job must be set if and only if
   1438         the parameterized_jobs option in the global config is set to True.
   1439 
    1440         @param cls: Implicit class object.
   1441         @param control_file: A control file.
   1442         @param parameterized_job: A parameterized job.
   1443         """
   1444         if not (bool(control_file) ^ bool(parameterized_job)):
   1445             raise Exception('Job must have either control file or '
   1446                             'parameterization, but not both')
   1447 
   1448         parameterized_jobs_enabled = cls.parameterized_jobs_enabled()
   1449         if control_file and parameterized_jobs_enabled:
   1450             raise Exception('Control file specified, but parameterized jobs '
   1451                             'are enabled')
   1452         if parameterized_job and not parameterized_jobs_enabled:
   1453             raise Exception('Parameterized job specified, but parameterized '
   1454                             'jobs are not enabled')
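
             # Illustrative sketch for check_parameterized_job, assuming
             # parameterized jobs are disabled in the global config and pjob is
             # a hypothetical ParameterizedJob:
             #   cls.check_parameterized_job('job.run_test(...)', None)  # OK
             #   cls.check_parameterized_job(None, None)   # raises: neither set
             #   cls.check_parameterized_job('...', pjob)  # raises: both set
             #   cls.check_parameterized_job(None, pjob)   # raises: disabled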
   1455 
   1456 
   1457     @classmethod
   1458     def create(cls, owner, options, hosts):
   1459         """Creates a job.
   1460 
   1461         The job is created by taking some information (the listed args) and
   1462         filling in the rest of the necessary information.
   1463 
   1464         @param cls: Implicit class object.
   1465         @param owner: The owner for the job.
   1466         @param options: An options object.
   1467         @param hosts: The hosts to use.
   1468         """
   1469         AclGroup.check_for_acl_violation_hosts(hosts)
   1470 
   1471         control_file = options.get('control_file')
   1472         parameterized_job = options.get('parameterized_job')
   1473 
    1474         # The current implementation of parameterized jobs requires that only
    1475         # control files or parameterized jobs are used. Using the image
    1476         # parameter on autoupdate_ParameterizedJob doesn't mix pure
    1477         # parameterized jobs with control file jobs, but it mucks with normal
    1478         # jobs enough (by adding a parameterized id to them) that this check
    1479         # would fail. So for now we just skip this check.
   1480         # cls.check_parameterized_job(control_file=control_file,
   1481         #                             parameterized_job=parameterized_job)
   1482         user = User.current_user()
   1483         if options.get('reboot_before') is None:
   1484             options['reboot_before'] = user.get_reboot_before_display()
   1485         if options.get('reboot_after') is None:
   1486             options['reboot_after'] = user.get_reboot_after_display()
   1487 
   1488         drone_set = DroneSet.resolve_name(options.get('drone_set'))
   1489 
   1490         if options.get('timeout_mins') is None and options.get('timeout'):
   1491             options['timeout_mins'] = options['timeout'] * 60
   1492 
   1493         job = cls.add_object(
   1494             owner=owner,
   1495             name=options['name'],
   1496             priority=options['priority'],
   1497             control_file=control_file,
   1498             control_type=options['control_type'],
   1499             synch_count=options.get('synch_count'),
   1500             # timeout needs to be deleted in the future.
   1501             timeout=options.get('timeout'),
   1502             timeout_mins=options.get('timeout_mins'),
   1503             max_runtime_mins=options.get('max_runtime_mins'),
   1504             run_verify=options.get('run_verify'),
   1505             email_list=options.get('email_list'),
   1506             reboot_before=options.get('reboot_before'),
   1507             reboot_after=options.get('reboot_after'),
   1508             parse_failed_repair=options.get('parse_failed_repair'),
   1509             created_on=datetime.now(),
   1510             drone_set=drone_set,
   1511             parameterized_job=parameterized_job,
   1512             parent_job=options.get('parent_job_id'),
   1513             test_retry=options.get('test_retry'),
   1514             run_reset=options.get('run_reset'),
   1515             require_ssp=options.get('require_ssp'))
   1516 
   1517         job.dependency_labels = options['dependencies']
   1518 
   1519         if options.get('keyvals'):
   1520             for key, value in options['keyvals'].iteritems():
   1521                 JobKeyval.objects.create(job=job, key=key, value=value)
   1522 
   1523         return job
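
             # Minimal usage sketch (hypothetical values; in practice the RPC
             # layer builds a much richer options dict):
             #   options = {'name': 'dummy_job',
             #              'priority': priorities.Priority.DEFAULT,
             #              'control_file': 'job.run_test(...)',
             #              'control_type': control_data.CONTROL_TYPE.CLIENT,
             #              'dependencies': []}
             #   job = Job.create(owner='debug_user', options=options,
             #                    hosts=hosts)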
   1524 
   1525 
   1526     @classmethod
   1527     def assign_to_shard(cls, shard, known_ids):
   1528         """Assigns unassigned jobs to a shard.
   1529 
    1530         For all labels that have been assigned to this shard, all jobs that
    1531         have one of these labels are assigned to this shard.
   1532 
   1533         Jobs that are assigned to the shard but aren't already present on the
   1534         shard are returned.
   1535 
   1536         @param shard: The shard to assign jobs to.
    1537         @param known_ids: List of ids of all incomplete jobs the shard
    1538                           already knows about.
    1539                           This is used to figure out which jobs should be sent
    1540                           to the shard. If shard_ids were used instead, jobs
    1541                           would only be transferred once, even if the client
    1542                           failed to persist them.
    1543                           The number of unfinished jobs is usually on the order
    1544                           of 1000. Assuming one id takes 8 chars in the json,
    1545                           this means an overhead in the lower kilobyte range.
    1546                           A NOT IN query with 5000 ids takes about 30ms.
   1547 
   1548         @returns The job objects that should be sent to the shard.
   1549         """
   1550         # Disclaimer: Concurrent heartbeats should not occur in today's setup.
   1551         # If this changes or they are triggered manually, this applies:
   1552         # Jobs may be returned more than once by concurrent calls of this
   1553         # function, as there is a race condition between SELECT and UPDATE.
   1554         job_ids = set([])
   1555         check_known_jobs_exclude = ''
   1556         check_known_jobs_include = ''
   1557 
   1558         if known_ids:
   1559             check_known_jobs = (
   1560                     cls.NON_ABORTED_KNOWN_JOBS %
   1561                     {'known_ids': ','.join([str(i) for i in known_ids])})
   1562             check_known_jobs_exclude = 'AND NOT ' + check_known_jobs
   1563             check_known_jobs_include = 'OR ' + check_known_jobs
   1564 
   1565         for sql in [cls.SQL_SHARD_JOBS, cls.SQL_SHARD_JOBS_WITH_HOSTS]:
   1566             query = Job.objects.raw(sql % {
   1567                     'check_known_jobs': check_known_jobs_exclude,
   1568                     'shard_id': shard.id})
   1569             job_ids |= set([j.id for j in query])
   1570 
   1571         if job_ids:
   1572             query = Job.objects.raw(
   1573                     cls.SQL_JOBS_TO_EXCLUDE %
   1574                     {'check_known_jobs': check_known_jobs_include,
   1575                      'candidates': ','.join([str(i) for i in job_ids])})
   1576             job_ids -= set([j.id for j in query])
   1577 
   1578         if job_ids:
   1579             Job.objects.filter(pk__in=job_ids).update(shard=shard)
   1580             return list(Job.objects.filter(pk__in=job_ids).all())
   1581         return []
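
             # Example (hypothetical ids): during a shard heartbeat the master
             # might call
             #   jobs = Job.assign_to_shard(shard, known_ids=[101, 102])
             # and serialize the returned jobs into the heartbeat response;
             # non-aborted jobs already known to the shard are skipped.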
   1582 
   1583 
   1584     def save(self, *args, **kwargs):
    1585         # The current implementation of parameterized jobs requires that only
    1586         # control files or parameterized jobs are used. Using the image
    1587         # parameter on autoupdate_ParameterizedJob doesn't mix pure
    1588         # parameterized jobs with control file jobs, but it mucks with normal
    1589         # jobs enough (by adding a parameterized id to them) that this check
    1590         # would fail. So for now we just skip this check.
   1591         # cls.check_parameterized_job(control_file=self.control_file,
   1592         #                             parameterized_job=self.parameterized_job)
   1593         super(Job, self).save(*args, **kwargs)
   1594 
   1595 
   1596     def queue(self, hosts, atomic_group=None, is_template=False):
   1597         """Enqueue a job on the given hosts.
   1598 
   1599         @param hosts: The hosts to use.
   1600         @param atomic_group: The associated atomic group.
   1601         @param is_template: Whether the status should be "Template".
   1602         """
   1603         if not hosts:
   1604             if atomic_group:
   1605                 # No hosts or labels are required to queue an atomic group
   1606                 # Job.  However, if they are given, we respect them below.
   1607                 atomic_group.enqueue_job(self, is_template=is_template)
   1608             else:
   1609                 # hostless job
   1610                 entry = HostQueueEntry.create(job=self, is_template=is_template)
   1611                 entry.save()
   1612             return
   1613 
   1614         for host in hosts:
   1615             host.enqueue_job(self, atomic_group=atomic_group,
   1616                              is_template=is_template)
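
             # For example, job.queue([]) with no atomic group creates a single
             # hostless HostQueueEntry, while job.queue(hosts) creates one
             # entry per host.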
   1617 
   1618 
   1619     def create_recurring_job(self, start_date, loop_period, loop_count, owner):
   1620         """Creates a recurring job.
   1621 
   1622         @param start_date: The starting date of the job.
   1623         @param loop_period: How often to re-run the job, in seconds.
   1624         @param loop_count: The re-run count.
   1625         @param owner: The owner of the job.
   1626         """
   1627         rec = RecurringRun(job=self, start_date=start_date,
   1628                            loop_period=loop_period,
   1629                            loop_count=loop_count,
   1630                            owner=User.objects.get(login=owner))
   1631         rec.save()
   1632         return rec.id
   1633 
   1634 
   1635     def user(self):
   1636         """Gets the user of this job, or None if it doesn't exist."""
   1637         try:
   1638             return User.objects.get(login=self.owner)
    1639         except User.DoesNotExist:
   1640             return None
   1641 
   1642 
   1643     def abort(self):
   1644         """Aborts this job."""
   1645         for queue_entry in self.hostqueueentry_set.all():
   1646             queue_entry.abort()
   1647 
   1648 
   1649     def tag(self):
   1650         """Returns a string tag for this job."""
   1651         return server_utils.get_job_tag(self.id, self.owner)
   1652 
   1653 
   1654     def keyval_dict(self):
   1655         """Returns all keyvals for this job as a dictionary."""
   1656         return dict((keyval.key, keyval.value)
   1657                     for keyval in self.jobkeyval_set.all())
   1658 
   1659 
   1660     @classmethod
   1661     def get_attribute_model(cls):
   1662         """Return the attribute model.
   1663 
   1664         Override method in parent class. This class is called when
    1665         deserializing the one-to-many relationship between Job and JobKeyval.
   1666         On deserialization, we will try to clear any existing job keyvals
   1667         associated with a job to avoid any inconsistency.
   1668         Though Job doesn't implement ModelWithAttribute, we still treat
   1669         it as an attribute model for this purpose.
   1670 
   1671         @returns: The attribute model of Job.
   1672         """
   1673         return JobKeyval
   1674 
   1675 
   1676     class Meta:
   1677         """Metadata for class Job."""
   1678         db_table = 'afe_jobs'
   1679 
   1680     def __unicode__(self):
   1681         return u'%s (%s-%s)' % (self.name, self.id, self.owner)
   1682 
   1683 
   1684 class JobKeyval(dbmodels.Model, model_logic.ModelExtensions):
   1685     """Keyvals associated with jobs"""
   1686 
   1687     SERIALIZATION_LINKS_TO_KEEP = set(['job'])
   1688     SERIALIZATION_LOCAL_LINKS_TO_UPDATE = set(['value'])
   1689 
   1690     job = dbmodels.ForeignKey(Job)
   1691     key = dbmodels.CharField(max_length=90)
   1692     value = dbmodels.CharField(max_length=300)
   1693 
   1694     objects = model_logic.ExtendedManager()
   1695 
   1696 
   1697     @classmethod
   1698     def get_record(cls, data):
   1699         """Check the database for an identical record.
   1700 
    1701         Use job_id and key to search for an existing record.
   1702 
   1703         @raises: DoesNotExist, if no record found
   1704         @raises: MultipleObjectsReturned if multiple records found.
   1705         """
   1706         # TODO(fdeng): We should use job_id and key together as
   1707         #              a primary key in the db.
   1708         return cls.objects.get(job_id=data['job_id'], key=data['key'])
   1709 
   1710 
   1711     @classmethod
   1712     def deserialize(cls, data):
   1713         """Override deserialize in parent class.
   1714 
    1715         Do not deserialize id, as ids are not kept consistent between master and shards.
   1716 
   1717         @param data: A dictionary of data to deserialize.
   1718 
   1719         @returns: A JobKeyval object.
   1720         """
   1721         if data:
   1722             data.pop('id')
   1723         return super(JobKeyval, cls).deserialize(data)
   1724 
   1725 
   1726     class Meta:
   1727         """Metadata for class JobKeyval."""
   1728         db_table = 'afe_job_keyvals'
   1729 
   1730 
   1731 class IneligibleHostQueue(dbmodels.Model, model_logic.ModelExtensions):
   1732     """Represents an ineligible host queue."""
   1733     job = dbmodels.ForeignKey(Job)
   1734     host = dbmodels.ForeignKey(Host)
   1735 
   1736     objects = model_logic.ExtendedManager()
   1737 
   1738     class Meta:
   1739         """Metadata for class IneligibleHostQueue."""
   1740         db_table = 'afe_ineligible_host_queues'
   1741 
   1742 
   1743 class HostQueueEntry(dbmodels.Model, model_logic.ModelExtensions):
   1744     """Represents a host queue entry."""
   1745 
   1746     SERIALIZATION_LINKS_TO_FOLLOW = set(['meta_host'])
   1747     SERIALIZATION_LINKS_TO_KEEP = set(['host'])
   1748     SERIALIZATION_LOCAL_LINKS_TO_UPDATE = set(['aborted'])
   1749 
   1750 
   1751     def custom_deserialize_relation(self, link, data):
   1752         assert link == 'meta_host'
   1753         self.meta_host = Label.deserialize(data)
   1754 
   1755 
   1756     def sanity_check_update_from_shard(self, shard, updated_serialized,
   1757                                        job_ids_sent):
   1758         if self.job_id not in job_ids_sent:
   1759             raise error.UnallowedRecordsSentToMaster(
   1760                 'Sent HostQueueEntry without corresponding '
   1761                 'job entry: %s' % updated_serialized)
   1762 
   1763 
   1764     Status = host_queue_entry_states.Status
   1765     ACTIVE_STATUSES = host_queue_entry_states.ACTIVE_STATUSES
   1766     COMPLETE_STATUSES = host_queue_entry_states.COMPLETE_STATUSES
   1767 
   1768     job = dbmodels.ForeignKey(Job)
   1769     host = dbmodels.ForeignKey(Host, blank=True, null=True)
   1770     status = dbmodels.CharField(max_length=255)
   1771     meta_host = dbmodels.ForeignKey(Label, blank=True, null=True,
   1772                                     db_column='meta_host')
   1773     active = dbmodels.BooleanField(default=False)
   1774     complete = dbmodels.BooleanField(default=False)
   1775     deleted = dbmodels.BooleanField(default=False)
   1776     execution_subdir = dbmodels.CharField(max_length=255, blank=True,
   1777                                           default='')
   1778     # If atomic_group is set, this is a virtual HostQueueEntry that will
   1779     # be expanded into many actual hosts within the group at schedule time.
   1780     atomic_group = dbmodels.ForeignKey(AtomicGroup, blank=True, null=True)
   1781     aborted = dbmodels.BooleanField(default=False)
   1782     started_on = dbmodels.DateTimeField(null=True, blank=True)
   1783     finished_on = dbmodels.DateTimeField(null=True, blank=True)
   1784 
   1785     objects = model_logic.ExtendedManager()
   1786 
   1787 
   1788     def __init__(self, *args, **kwargs):
   1789         super(HostQueueEntry, self).__init__(*args, **kwargs)
   1790         self._record_attributes(['status'])
   1791 
   1792 
   1793     @classmethod
   1794     def create(cls, job, host=None, meta_host=None, atomic_group=None,
   1795                  is_template=False):
   1796         """Creates a new host queue entry.
   1797 
   1798         @param cls: Implicit class object.
   1799         @param job: The associated job.
   1800         @param host: The associated host.
   1801         @param meta_host: The associated meta host.
   1802         @param atomic_group: The associated atomic group.
   1803         @param is_template: Whether the status should be "Template".
   1804         """
   1805         if is_template:
   1806             status = cls.Status.TEMPLATE
   1807         else:
   1808             status = cls.Status.QUEUED
   1809 
   1810         return cls(job=job, host=host, meta_host=meta_host,
   1811                    atomic_group=atomic_group, status=status)
   1812 
   1813 
   1814     def save(self, *args, **kwargs):
   1815         self._set_active_and_complete()
   1816         super(HostQueueEntry, self).save(*args, **kwargs)
   1817         self._check_for_updated_attributes()
   1818 
   1819 
   1820     def execution_path(self):
   1821         """
   1822         Path to this entry's results (relative to the base results directory).
   1823         """
   1824         return server_utils.get_hqe_exec_path(self.job.tag(),
   1825                                               self.execution_subdir)
   1826 
   1827 
   1828     def host_or_metahost_name(self):
   1829         """Returns the first non-None name found in priority order.
   1830 
   1831         The priority order checked is: (1) host name; (2) meta host name; and
   1832         (3) atomic group name.
   1833         """
   1834         if self.host:
   1835             return self.host.hostname
   1836         elif self.meta_host:
   1837             return self.meta_host.name
   1838         else:
   1839             assert self.atomic_group, "no host, meta_host or atomic group!"
   1840             return self.atomic_group.name
   1841 
   1842 
   1843     def _set_active_and_complete(self):
   1844         if self.status in self.ACTIVE_STATUSES:
   1845             self.active, self.complete = True, False
   1846         elif self.status in self.COMPLETE_STATUSES:
   1847             self.active, self.complete = False, True
   1848         else:
   1849             self.active, self.complete = False, False
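
             # For illustration, assuming the usual state sets from
             # host_queue_entry_states: 'Running' is active (active=True,
             # complete=False), 'Completed' is complete (False, True), and
             # 'Queued' is neither (False, False).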
   1850 
   1851 
   1852     def on_attribute_changed(self, attribute, old_value):
   1853         assert attribute == 'status'
   1854         logging.info('%s/%d (%d) -> %s', self.host, self.job.id, self.id,
   1855                      self.status)
   1856 
   1857 
   1858     def is_meta_host_entry(self):
    1859         'True if this entry has a meta_host instead of a host.'
   1860         return self.host is None and self.meta_host is not None
   1861 
   1862 
   1863     # This code is shared between rpc_interface and models.HostQueueEntry.
   1864     # Sadly due to circular imports between the 2 (crbug.com/230100) making it
   1865     # a class method was the best way to refactor it. Attempting to put it in
   1866     # rpc_utils or a new utils module failed as that would require us to import
   1867     # models.py but to call it from here we would have to import the utils.py
   1868     # thus creating a cycle.
   1869     @classmethod
   1870     def abort_host_queue_entries(cls, host_queue_entries):
   1871         """Aborts a collection of host_queue_entries.
   1872 
    1873         Aborts these host queue entries and all host queue entries of jobs
    1874         created by them.
   1875 
   1876         @param host_queue_entries: List of host queue entries we want to abort.
   1877         """
   1878         # This isn't completely immune to race conditions since it's not atomic,
   1879         # but it should be safe given the scheduler's behavior.
   1880 
   1881         # TODO(milleral): crbug.com/230100
   1882         # The |abort_host_queue_entries| rpc does nearly exactly this,
   1883         # however, trying to re-use the code generates some horrible
    1884         # circular import error.  It'd be nice to refactor things around
   1885         # sometime so the code could be reused.
   1886 
   1887         # Fixpoint algorithm to find the whole tree of HQEs to abort to
   1888         # minimize the total number of database queries:
   1889         children = set()
   1890         new_children = set(host_queue_entries)
   1891         while new_children:
   1892             children.update(new_children)
   1893             new_child_ids = [hqe.job_id for hqe in new_children]
   1894             new_children = HostQueueEntry.objects.filter(
   1895                     job__parent_job__in=new_child_ids,
   1896                     complete=False, aborted=False).all()
   1897             # To handle circular parental relationships
   1898             new_children = set(new_children) - children
   1899 
   1900         # Associate a user with the host queue entries that we're about
   1901         # to abort so that we can look up who to blame for the aborts.
   1902         now = datetime.now()
   1903         user = User.current_user()
   1904         aborted_hqes = [AbortedHostQueueEntry(queue_entry=hqe,
   1905                 aborted_by=user, aborted_on=now) for hqe in children]
   1906         AbortedHostQueueEntry.objects.bulk_create(aborted_hqes)
   1907         # Bulk update all of the HQEs to set the abort bit.
   1908         child_ids = [hqe.id for hqe in children]
   1909         HostQueueEntry.objects.filter(id__in=child_ids).update(aborted=True)
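                 # Worked example (hypothetical ids): when aborting an hqe of
                 # job 1, with job 2 having parent_job=1 and job 3 having
                 # parent_job=2, the loop above gathers the hqes of jobs 2 and
                 # 3 in successive iterations before the two bulk writes here.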
   1910 
   1911 
   1912     def abort(self):
   1913         """ Aborts this host queue entry.
   1914 
   1915         Abort this host queue entry and all host queue entries of jobs created by
   1916         this one.
   1917 
   1918         """
   1919         if not self.complete and not self.aborted:
   1920             HostQueueEntry.abort_host_queue_entries([self])
   1921 
   1922 
   1923     @classmethod
   1924     def compute_full_status(cls, status, aborted, complete):
   1925         """Returns a modified status msg if the host queue entry was aborted.
   1926 
   1927         @param cls: Implicit class object.
   1928         @param status: The original status message.
   1929         @param aborted: Whether the host queue entry was aborted.
   1930         @param complete: Whether the host queue entry was completed.
   1931         """
   1932         if aborted and not complete:
   1933             return 'Aborted (%s)' % status
   1934         return status
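
             # E.g. compute_full_status('Running', aborted=True, complete=False)
             # returns 'Aborted (Running)'; an entry that is both aborted and
             # complete keeps its plain status string.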
   1935 
   1936 
   1937     def full_status(self):
   1938         """Returns the full status of this host queue entry, as a string."""
   1939         return self.compute_full_status(self.status, self.aborted,
   1940                                         self.complete)
   1941 
   1942 
   1943     def _postprocess_object_dict(self, object_dict):
   1944         object_dict['full_status'] = self.full_status()
   1945 
   1946 
   1947     class Meta:
   1948         """Metadata for class HostQueueEntry."""
   1949         db_table = 'afe_host_queue_entries'
   1950 
   1951 
   1952 
   1953     def __unicode__(self):
   1954         hostname = None
   1955         if self.host:
   1956             hostname = self.host.hostname
   1957         return u"%s/%d (%d)" % (hostname, self.job.id, self.id)
   1958 
   1959 
   1960 class AbortedHostQueueEntry(dbmodels.Model, model_logic.ModelExtensions):
   1961     """Represents an aborted host queue entry."""
   1962     queue_entry = dbmodels.OneToOneField(HostQueueEntry, primary_key=True)
   1963     aborted_by = dbmodels.ForeignKey(User)
   1964     aborted_on = dbmodels.DateTimeField()
   1965 
   1966     objects = model_logic.ExtendedManager()
   1967 
   1968 
   1969     def save(self, *args, **kwargs):
   1970         self.aborted_on = datetime.now()
   1971         super(AbortedHostQueueEntry, self).save(*args, **kwargs)
   1972 
   1973     class Meta:
   1974         """Metadata for class AbortedHostQueueEntry."""
   1975         db_table = 'afe_aborted_host_queue_entries'
   1976 
   1977 
   1978 class RecurringRun(dbmodels.Model, model_logic.ModelExtensions):
   1979     """\
   1980     job: job to use as a template
   1981     owner: owner of the instantiated template
   1982     start_date: Run the job at scheduled date
   1983     loop_period: Re-run (loop) the job periodically
   1984                  (in every loop_period seconds)
   1985     loop_count: Re-run (loop) count
   1986     """
   1987 
   1988     job = dbmodels.ForeignKey(Job)
   1989     owner = dbmodels.ForeignKey(User)
   1990     start_date = dbmodels.DateTimeField()
   1991     loop_period = dbmodels.IntegerField(blank=True)
   1992     loop_count = dbmodels.IntegerField(blank=True)
   1993 
   1994     objects = model_logic.ExtendedManager()
   1995 
   1996     class Meta:
   1997         """Metadata for class RecurringRun."""
   1998         db_table = 'afe_recurring_run'
   1999 
   2000     def __unicode__(self):
   2001         return u'RecurringRun(job %s, start %s, period %s, count %s)' % (
   2002             self.job.id, self.start_date, self.loop_period, self.loop_count)
   2003 
   2004 
   2005 class SpecialTask(dbmodels.Model, model_logic.ModelExtensions):
   2006     """\
    2007     Tasks to run on hosts the next time they are in the Ready state. Use this
   2008     for high-priority tasks, such as forced repair or forced reinstall.
   2009 
   2010     host: host to run this task on
   2011     task: special task to run
   2012     time_requested: date and time the request for this task was made
   2013     is_active: task is currently running
   2014     is_complete: task has finished running
   2015     is_aborted: task was aborted
   2016     time_started: date and time the task started
   2017     time_finished: date and time the task finished
   2018     queue_entry: Host queue entry waiting on this task (or None, if task was not
    2019                  started in preparation for a job)
   2020     """
   2021     Task = enum.Enum('Verify', 'Cleanup', 'Repair', 'Reset', 'Provision',
   2022                      string_values=True)
   2023 
   2024     host = dbmodels.ForeignKey(Host, blank=False, null=False)
   2025     task = dbmodels.CharField(max_length=64, choices=Task.choices(),
   2026                               blank=False, null=False)
   2027     requested_by = dbmodels.ForeignKey(User)
   2028     time_requested = dbmodels.DateTimeField(auto_now_add=True, blank=False,
   2029                                             null=False)
   2030     is_active = dbmodels.BooleanField(default=False, blank=False, null=False)
   2031     is_complete = dbmodels.BooleanField(default=False, blank=False, null=False)
   2032     is_aborted = dbmodels.BooleanField(default=False, blank=False, null=False)
   2033     time_started = dbmodels.DateTimeField(null=True, blank=True)
   2034     queue_entry = dbmodels.ForeignKey(HostQueueEntry, blank=True, null=True)
   2035     success = dbmodels.BooleanField(default=False, blank=False, null=False)
   2036     time_finished = dbmodels.DateTimeField(null=True, blank=True)
   2037 
   2038     objects = model_logic.ExtendedManager()
   2039 
   2040 
   2041     def save(self, **kwargs):
   2042         if self.queue_entry:
   2043             self.requested_by = User.objects.get(
   2044                     login=self.queue_entry.job.owner)
   2045         super(SpecialTask, self).save(**kwargs)
   2046 
   2047 
   2048     def execution_path(self):
   2049         """Returns the execution path for a special task."""
   2050         return server_utils.get_special_task_exec_path(
   2051                 self.host.hostname, self.id, self.task, self.time_requested)
   2052 
   2053 
   2054     # property to emulate HostQueueEntry.status
   2055     @property
   2056     def status(self):
   2057         """Returns a host queue entry status appropriate for a speical task."""
   2058         return server_utils.get_special_task_status(
   2059                 self.is_complete, self.success, self.is_active)
   2060 
   2061 
   2062     # property to emulate HostQueueEntry.started_on
   2063     @property
   2064     def started_on(self):
   2065         """Returns the time at which this special task started."""
   2066         return self.time_started
   2067 
   2068 
   2069     @classmethod
   2070     def schedule_special_task(cls, host, task):
   2071         """Schedules a special task on a host if not already scheduled.
   2072 
   2073         @param cls: Implicit class object.
   2074         @param host: The host to use.
   2075         @param task: The task to schedule.
   2076         """
   2077         existing_tasks = SpecialTask.objects.filter(host__id=host.id, task=task,
   2078                                                     is_active=False,
   2079                                                     is_complete=False)
   2080         if existing_tasks:
   2081             return existing_tasks[0]
   2082 
   2083         special_task = SpecialTask(host=host, task=task,
   2084                                    requested_by=User.current_user())
   2085         special_task.save()
   2086         return special_task
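
             # Usage sketch (host is a hypothetical Host instance): queue a
             # reset unless an identical task is already pending:
             #   task = SpecialTask.schedule_special_task(
             #           host, SpecialTask.Task.RESET)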
   2087 
   2088 
   2089     def abort(self):
   2090         """ Abort this special task."""
   2091         self.is_aborted = True
   2092         self.save()
   2093 
   2094 
   2095     def activate(self):
   2096         """
   2097         Sets a task as active and sets the time started to the current time.
   2098         """
   2099         logging.info('Starting: %s', self)
   2100         self.is_active = True
   2101         self.time_started = datetime.now()
   2102         self.save()
   2103 
   2104 
   2105     def finish(self, success):
   2106         """Sets a task as completed.
   2107 
   2108         @param success: Whether or not the task was successful.
   2109         """
   2110         logging.info('Finished: %s', self)
   2111         self.is_active = False
   2112         self.is_complete = True
   2113         self.success = success
   2114         if self.time_started:
   2115             self.time_finished = datetime.now()
   2116         self.save()
   2117 
   2118 
   2119     class Meta:
   2120         """Metadata for class SpecialTask."""
   2121         db_table = 'afe_special_tasks'
   2122 
   2123 
   2124     def __unicode__(self):
   2125         result = u'Special Task %s (host %s, task %s, time %s)' % (
   2126             self.id, self.host, self.task, self.time_requested)
   2127         if self.is_complete:
   2128             result += u' (completed)'
   2129         elif self.is_active:
   2130             result += u' (active)'
   2131 
   2132         return result
   2133 
   2134 
   2135 class StableVersion(dbmodels.Model, model_logic.ModelExtensions):
   2136 
   2137     board = dbmodels.CharField(max_length=255, unique=True)
   2138     version = dbmodels.CharField(max_length=255)
   2139 
   2140     class Meta:
   2141         """Metadata for class StableVersion."""
   2142         db_table = 'afe_stable_versions'
   2143