Home | History | Annotate | Download | only in scheduler
      1 #!/usr/bin/python
      2 #pylint: disable-msg=C0111
      3 
      4 # Copyright (c) 2014 The Chromium OS Authors. All rights reserved.
      5 # Use of this source code is governed by a BSD-style license that can be
      6 # found in the LICENSE file.
      7 
      8 import abc
      9 import os
     10 
     11 import common
     12 
     13 from autotest_lib.database import database_connection
     14 from autotest_lib.frontend import setup_django_environment
     15 from autotest_lib.frontend.afe import frontend_test_utils
     16 from autotest_lib.frontend.afe import models
     17 from autotest_lib.frontend.afe import rdb_model_extensions as rdb_models
     18 from autotest_lib.scheduler import monitor_db
     19 from autotest_lib.scheduler import query_managers
     20 from autotest_lib.scheduler import scheduler_lib
     21 from autotest_lib.scheduler import scheduler_models
     22 from autotest_lib.scheduler import rdb_hosts
     23 from autotest_lib.scheduler import rdb_requests
     24 from autotest_lib.server.cros import provision
     25 
     26 
     27 # Set for verbose table creation output.
     28 _DEBUG = False
     29 DEFAULT_ACLS = ['Everyone', 'my_acl']
     30 DEFAULT_DEPS = ['a', 'b']
     31 DEFAULT_USER = 'system'
     32 
     33 
     34 def get_default_job_params():
     35     return {'deps': DEFAULT_DEPS, 'user': DEFAULT_USER, 'acls': DEFAULT_ACLS,
     36             'priority': 0, 'parent_job_id': 0}
     37 
     38 
     39 def get_default_host_params():
     40     return {'deps': DEFAULT_DEPS, 'acls': DEFAULT_ACLS}
     41 
     42 
     43 class FakeHost(rdb_hosts.RDBHost):
     44     """Fake host to use in unittests."""
     45 
     46     def __init__(self, hostname, host_id, **kwargs):
     47         kwargs.update({'hostname': hostname, 'id': host_id})
     48         kwargs = rdb_models.AbstractHostModel.provide_default_values(
     49                 kwargs)
     50         super(FakeHost, self).__init__(**kwargs)
     51 
     52 
     53 def wire_format_response_map(response_map):
     54     wire_formatted_map = {}
     55     for request, response in response_map.iteritems():
     56         wire_formatted_map[request] = [reply.wire_format()
     57                                        for reply in response]
     58     return wire_formatted_map
     59 
     60 
     61 class DBHelper(object):
     62     """Utility class for updating the database."""
     63 
     64     def __init__(self):
     65         """Initialized django so it uses an in memory SQLite database."""
     66         self.database = (
     67             database_connection.TranslatingDatabase.get_test_database(
     68                 translators=scheduler_lib._DB_TRANSLATORS))
     69         self.database.connect(db_type='django')
     70         self.database.debug = _DEBUG
     71 
     72 
     73     @classmethod
     74     def get_labels(cls, **kwargs):
     75         """Get a label queryset based on the kwargs."""
     76         return models.Label.objects.filter(**kwargs)
     77 
     78 
     79     @classmethod
     80     def get_acls(cls, **kwargs):
     81         """Get an aclgroup queryset based on the kwargs."""
     82         return models.AclGroup.objects.filter(**kwargs)
     83 
     84 
     85     @classmethod
     86     def get_host(cls, **kwargs):
     87         """Get a host queryset based on the kwargs."""
     88         return models.Host.objects.filter(**kwargs)
     89 
     90 
     91     @classmethod
     92     def get_hqes(cls, **kwargs):
     93         return models.HostQueueEntry.objects.filter(**kwargs)
     94 
     95 
     96     @classmethod
     97     def get_tasks(cls, **kwargs):
     98         return models.SpecialTask.objects.filter(**kwargs)
     99 
    100 
    101     @classmethod
    102     def get_shard(cls, **kwargs):
    103         return models.Shard.objects.filter(**kwargs)
    104 
    105 
    106     @classmethod
    107     def create_label(cls, name, **kwargs):
    108         label = cls.get_labels(name=name, **kwargs)
    109         return (models.Label.add_object(name=name, **kwargs)
    110                 if not label else label[0])
    111 
    112 
    113     @classmethod
    114     def create_user(cls, name):
    115         user = models.User.objects.filter(login=name)
    116         return models.User.add_object(login=name) if not user else user[0]
    117 
    118 
    119     @classmethod
    120     def create_special_task(cls, job_id=None, host_id=None,
    121                             task=models.SpecialTask.Task.VERIFY,
    122                             user='autotest-system'):
    123         if job_id:
    124             queue_entry = cls.get_hqes(job_id=job_id)[0]
    125             host_id = queue_entry.host.id
    126         else:
    127             queue_entry = None
    128         host = models.Host.objects.get(id=host_id)
    129         owner = cls.create_user(user)
    130         if not host:
    131             raise ValueError('Require a host to create special tasks.')
    132         return models.SpecialTask.objects.create(
    133                 host=host, queue_entry=queue_entry, task=task,
    134                 requested_by_id=owner.id)
    135 
    136 
    137     @classmethod
    138     def create_shard(cls, shard_hostname):
    139         """Create a shard with the given hostname if one doesn't already exist.
    140 
    141         @param shard_hostname: The hostname of the shard.
    142         """
    143         shard = cls.get_shard(hostname=shard_hostname)
    144         return (models.Shard.objects.create(hostname=shard_hostname)
    145                 if not shard else shard[0])
    146 
    147 
    148     @classmethod
    149     def add_labels_to_host(cls, host, label_names=set([])):
    150         label_objects = set([])
    151         for label in label_names:
    152             label_objects.add(cls.create_label(label))
    153         host.labels.add(*label_objects)
    154 
    155 
    156     @classmethod
    157     def create_acl_group(cls, name):
    158         aclgroup = cls.get_acls(name=name)
    159         return (models.AclGroup.add_object(name=name)
    160                 if not aclgroup else aclgroup[0])
    161 
    162 
    163     @classmethod
    164     def add_deps_to_job(cls, job, dep_names=set([])):
    165         label_objects = set([])
    166         for label in dep_names:
    167             label_objects.add(cls.create_label(label))
    168         job.dependency_labels.add(*label_objects)
    169 
    170 
    171     @classmethod
    172     def assign_job_to_shard(cls, job_id, shard_hostname):
    173         """Assign a job to a shard.
    174 
    175         @param job: A job object without a shard.
    176         @param shard_hostname: The hostname of a shard to assign the job.
    177 
    178         @raises ValueError: If the job already has a shard.
    179         """
    180         job_filter = models.Job.objects.filter(id=job_id, shard__isnull=True)
    181         if len(job_filter) != 1:
    182             raise ValueError('Failed to assign job %s to shard %s' %
    183                              job_filter, shard_hostname)
    184         job_filter.update(shard=cls.create_shard(shard_hostname))
    185 
    186 
    187     @classmethod
    188     def add_host_to_aclgroup(cls, host, aclgroup_names=set([])):
    189         for group_name in aclgroup_names:
    190             aclgroup = cls.create_acl_group(group_name)
    191             aclgroup.hosts.add(host)
    192 
    193 
    194     @classmethod
    195     def add_user_to_aclgroups(cls, username, aclgroup_names=set([])):
    196         user = cls.create_user(username)
    197         for group_name in aclgroup_names:
    198             aclgroup = cls.create_acl_group(group_name)
    199             aclgroup.users.add(user)
    200 
    201 
    202     @classmethod
    203     def create_host(cls, name, deps=set([]), acls=set([]), status='Ready',
    204                  locked=0, lock_reason='', leased=0, protection=0, dirty=0):
    205         """Create a host.
    206 
    207         Also adds the appropriate labels to the host, and adds the host to the
    208         required acl groups.
    209 
    210         @param name: The hostname.
    211         @param kwargs:
    212             deps: The labels on the host that match job deps.
    213             acls: The aclgroups this host must be a part of.
    214             status: The status of the host.
    215             locked: 1 if the host is locked.
    216             lock_reason: non-empty string if the host is locked.
    217             leased: 1 if the host is leased.
    218             protection: Any protection level, such as Do Not Verify.
    219             dirty: 1 if the host requires cleanup.
    220 
    221         @return: The host object for the new host.
    222         """
    223         # TODO: Modify this to use the create host request once
    224         # crbug.com/350995 is fixed.
    225         host = models.Host.add_object(
    226                 hostname=name, status=status, locked=locked,
    227                 lock_reason=lock_reason, leased=leased,
    228                 protection=protection)
    229         cls.add_labels_to_host(host, label_names=deps)
    230         cls.add_host_to_aclgroup(host, aclgroup_names=acls)
    231 
    232         # Though we can return the host object above, this proves that the host
    233         # actually got saved in the database. For example, this will return none
    234         # if save() wasn't called on the model.Host instance.
    235         return cls.get_host(hostname=name)[0]
    236 
    237 
    238     @classmethod
    239     def update_hqe(cls, hqe_id, **kwargs):
    240         """Update the hqe with the given kwargs.
    241 
    242         @param hqe_id: The id of the hqe to update.
    243         """
    244         models.HostQueueEntry.objects.filter(id=hqe_id).update(**kwargs)
    245 
    246 
    247     @classmethod
    248     def update_special_task(cls, task_id, **kwargs):
    249         """Update special tasks with the given kwargs.
    250 
    251         @param task_id: The if of the task to update.
    252         """
    253         models.SpecialTask.objects.filter(id=task_id).update(**kwargs)
    254 
    255 
    256     @classmethod
    257     def add_host_to_job(cls, host, job_id, activate=0):
    258         """Add a host to the hqe of a job.
    259 
    260         @param host: An instance of the host model.
    261         @param job_id: The job to which we need to add the host.
    262         @param activate: If true, flip the active bit on the hqe.
    263 
    264         @raises ValueError: If the hqe for the job already has a host,
    265             or if the host argument isn't a Host instance.
    266         """
    267         hqe = models.HostQueueEntry.objects.get(job_id=job_id)
    268         if hqe.host:
    269             raise ValueError('HQE for job %s already has a host' % job_id)
    270         hqe.host = host
    271         hqe.save()
    272         if activate:
    273             cls.update_hqe(hqe.id, active=True)
    274 
    275 
    276     @classmethod
    277     def increment_priority(cls, job_id):
    278         job = models.Job.objects.get(id=job_id)
    279         job.priority = job.priority + 1
    280         job.save()
    281 
    282 
    283 class FileDatabaseHelper(object):
    284     """A helper class to setup a SQLite database backed by a file.
    285 
    286     Note that initializing a file database takes significantly longer than an
    287     in-memory database and should only be used for functional tests.
    288     """
    289 
    290     DB_FILE = os.path.join(common.autotest_dir, 'host_scheduler_db')
    291 
    292     def initialize_database_for_testing(self, db_file_path=None):
    293         """Initialize a SQLite database for testing.
    294 
    295         To force monitor_db and the host_scheduler to use the same SQLite file
    296         database, call this method before initializing the database through
    297         frontend_test_utils. The host_scheduler is setup to look for the
    298         host_scheduler_db when invoked with --testing.
    299 
    300         @param db_file_path: The name of the file to use to create
    301             a SQLite database. Since this database is shared across different
    302             processes using a file is closer to the real world.
    303         """
    304         if not db_file_path:
    305             db_file_path = self.DB_FILE
    306         # TODO: Move the translating database elsewhere. Monitor_db circular
    307         # imports host_scheduler.
    308         from autotest_lib.frontend import setup_test_environment
    309         from django.conf import settings
    310         self.old_django_db_name = settings.DATABASES['default']['NAME']
    311         settings.DATABASES['default']['NAME'] = db_file_path
    312         self.db_file_path = db_file_path
    313         _db_manager = scheduler_lib.ConnectionManager(autocommit=False)
    314         _db_manager.db_connection = (
    315                 database_connection.TranslatingDatabase.get_test_database(
    316                 translators=scheduler_lib._DB_TRANSLATORS))
    317 
    318 
    319     def teardown_file_database(self):
    320         """Teardown django database settings."""
    321         # TODO: Move the translating database elsewhere. Monitor_db circular
    322         # imports host_scheduler.
    323         from django.conf import settings
    324         settings.DATABASES['default']['NAME'] = self.old_django_db_name
    325         try:
    326             os.remove(self.db_file_path)
    327         except (OSError, AttributeError):
    328             pass
    329 
    330 
    331 class AbstractBaseRDBTester(frontend_test_utils.FrontendTestMixin):
    332 
    333     __meta__ = abc.ABCMeta
    334     _config_section = 'AUTOTEST_WEB'
    335 
    336 
    337     @staticmethod
    338     def get_request(dep_names, acl_names, priority=0, parent_job_id=0):
    339         deps = [dep.id for dep in DBHelper.get_labels(name__in=dep_names)]
    340         acls = [acl.id for acl in DBHelper.get_acls(name__in=acl_names)]
    341         return rdb_requests.AcquireHostRequest(
    342                         deps=deps, acls=acls, host_id=None, priority=priority,
    343                         parent_job_id=parent_job_id)._request
    344 
    345 
    346     def _release_unused_hosts(self):
    347         """Release all hosts unused by an active hqe. """
    348         self.host_scheduler.tick()
    349 
    350 
    351     def setUp(self, inline_host_acquisition=True, setup_tables=True):
    352         """Common setup module for tests that need a jobs/host database.
    353 
    354         @param inline_host_acquisition: If True, the dispatcher tries to acquire
    355             hosts inline with the rest of the tick.
    356         """
    357         self.db_helper = DBHelper()
    358         self._database = self.db_helper.database
    359         # Runs syncdb setting up initial database conditions
    360         self._frontend_common_setup(setup_tables=setup_tables)
    361         connection_manager = scheduler_lib.ConnectionManager(autocommit=False)
    362         self.god.stub_with(connection_manager, 'db_connection', self._database)
    363         self.god.stub_with(monitor_db, '_db_manager', connection_manager)
    364         self.god.stub_with(scheduler_models, '_db', self._database)
    365         self.god.stub_with(monitor_db, '_inline_host_acquisition',
    366                            inline_host_acquisition)
    367         self._dispatcher = monitor_db.Dispatcher()
    368         self.host_scheduler = self._dispatcher._host_scheduler
    369         self.host_query_manager = query_managers.AFEHostQueryManager()
    370         self.job_query_manager = self._dispatcher._job_query_manager
    371         self._release_unused_hosts()
    372 
    373 
    374     def tearDown(self):
    375         self.god.unstub_all()
    376         self._database.disconnect()
    377         self._frontend_common_teardown()
    378 
    379 
    380     def create_job(self, user='autotest_system',
    381                    deps=set([]), acls=set([]), hostless_job=False,
    382                    priority=0, parent_job_id=None, shard_hostname=None):
    383         """Create a job owned by user, with the deps and acls specified.
    384 
    385         This method is a wrapper around frontend_test_utils.create_job, that
    386         also takes care of creating the appropriate deps for a job, and the
    387         appropriate acls for the given user.
    388 
    389         @raises ValueError: If no deps are specified for a job, since all jobs
    390             need at least the metahost.
    391         @raises AssertionError: If no hqe was created for the job.
    392 
    393         @return: An instance of the job model associated with the new job.
    394         """
    395         # This is a slight hack around the implementation of
    396         # scheduler_models.is_hostless_job, even though a metahost is just
    397         # another label to the rdb.
    398         if not deps:
    399             raise ValueError('Need at least one dep for metahost')
    400 
    401         # TODO: This is a hack around the fact that frontend_test_utils still
    402         # need a metahost, but metahost is treated like any other label.
    403         metahost = self.db_helper.create_label(list(deps)[0])
    404         job = self._create_job(metahosts=[metahost.id], priority=priority,
    405                 owner=user, parent_job_id=parent_job_id)
    406         self.assert_(len(job.hostqueueentry_set.all()) == 1)
    407 
    408         self.db_helper.add_deps_to_job(job, dep_names=list(deps)[1:])
    409         self.db_helper.add_user_to_aclgroups(user, aclgroup_names=acls)
    410         if shard_hostname:
    411             self.db_helper.assign_job_to_shard(job.id, shard_hostname)
    412         return models.Job.objects.filter(id=job.id)[0]
    413 
    414 
    415     def assert_host_db_status(self, host_id):
    416         """Assert host state right after acquisition.
    417 
    418         Call this method to check the status of any host leased by the
    419         rdb before it has been assigned to an hqe. It must be leased and
    420         ready at this point in time.
    421 
    422         @param host_id: Id of the host to check.
    423 
    424         @raises AssertionError: If the host is either not leased or Ready.
    425         """
    426         host = models.Host.objects.get(id=host_id)
    427         self.assert_(host.leased)
    428         self.assert_(host.status == 'Ready')
    429 
    430 
    431     def check_hosts(self, host_iter):
    432         """Sanity check all hosts in the host_gen.
    433 
    434         @param host_iter: A generator/iterator of RDBClientHostWrappers.
    435             eg: The generator returned by rdb_lib.acquire_hosts. If a request
    436             was not satisfied this iterator can contain None.
    437 
    438         @raises AssertionError: If any of the sanity checks fail.
    439         """
    440         for host in host_iter:
    441             if host:
    442                 self.assert_host_db_status(host.id)
    443                 self.assert_(host.leased == 1)
    444 
    445 
    446     def create_suite(self, user='autotest_system', num=2, priority=0,
    447                      board='z', build='x', acls=set()):
    448         """Create num jobs with the same parent_job_id, board, build, priority.
    449 
    450         @return: A dictionary with the parent job object keyed as 'parent_job'
    451             and all other jobs keyed at an index from 0-num.
    452         """
    453         jobs = {}
    454         # Create a hostless parent job without an hqe or deps. Since the
    455         # hostless job does nothing, we need to hand craft cros-version.
    456         parent_job = self._create_job(owner=user, priority=priority)
    457         jobs['parent_job'] = parent_job
    458         build = '%s:%s' % (provision.CROS_VERSION_PREFIX, build)
    459         for job_index in range(0, num):
    460             jobs[job_index] = self.create_job(user=user, priority=priority,
    461                                               deps=set([board, build]),
    462                                               acls=acls,
    463                                               parent_job_id=parent_job.id)
    464         return jobs
    465 
    466 
    467     def check_host_assignment(self, job_id, host_id):
    468         """Check is a job<->host assignment is valid.
    469 
    470         Uses the deps of a job and the aclgroups the owner of the job is
    471         in to see if the given host can be used to run the given job. Also
    472         checks that the host-job assignment has Not been made, but that the
    473         host is no longer in the available hosts pool.
    474 
    475         Use this method to check host assignements made by the rdb, Before
    476         they're handed off to the scheduler, since the scheduler.
    477 
    478         @param job_id: The id of the job to use in the compatibility check.
    479         @param host_id: The id of the host to check for compatibility.
    480 
    481         @raises AssertionError: If the job and the host are incompatible.
    482         """
    483         job = models.Job.objects.get(id=job_id)
    484         host = models.Host.objects.get(id=host_id)
    485         hqe = job.hostqueueentry_set.all()[0]
    486 
    487         # Confirm that the host has not been assigned, either to another hqe
    488         # or the this one.
    489         all_hqes = models.HostQueueEntry.objects.filter(
    490                 host_id=host_id, complete=0)
    491         self.assert_(len(all_hqes) <= 1)
    492         self.assert_(hqe.host_id == None)
    493         self.assert_host_db_status(host_id)
    494 
    495         # Assert that all deps of the job are satisfied.
    496         job_deps = set([d.name for d in job.dependency_labels.all()])
    497         host_labels = set([l.name for l in host.labels.all()])
    498         self.assert_(job_deps.intersection(host_labels) == job_deps)
    499 
    500         # Assert that the owner of the job is in at least one of the
    501         # groups that owns the host.
    502         job_owner_aclgroups = set([job_acl.name for job_acl
    503                                    in job.user().aclgroup_set.all()])
    504         host_aclgroups = set([host_acl.name for host_acl
    505                               in host.aclgroup_set.all()])
    506         self.assert_(job_owner_aclgroups.intersection(host_aclgroups))
    507 
    508 
    509