Home | History | Annotate | Download | only in scheduler
      1 # Copyright (c) 2014 The Chromium OS Authors. All rights reserved.
      2 # Use of this source code is governed by a BSD-style license that can be
      3 # found in the LICENSE file.
      4 
      5 """This module manages translation between monitor_db and the rdb. """
      6 
      7 import common
      8 from autotest_lib.scheduler import rdb
      9 from autotest_lib.scheduler import rdb_hosts
     10 from autotest_lib.scheduler import rdb_requests
     11 from autotest_lib.server.cros import provision
     12 
     13 
     14 # Adapters for scheduler specific objects: Convert job information to a
     15 # format more ameanable to the rdb/rdb request managers.
     16 class JobQueryManager(object):
     17     """A caching query manager for all job related information."""
     18     def __init__(self, queue_entries, suite_min_duts=None):
     19         """Initialize.
     20 
     21         @param queue_entries: A list of HostQueueEntry objects.
     22         @param suite_min_duts: A dictionary where the key is suite job id,
     23                 and the value is the value of 'suite_min_dut' in the suite's
     24                 job keyvals. It should cover all the suite jobs which
     25                 the jobs (associated with the queue_entries) belong to.
     26         """
     27         # TODO(beeps): Break this dependency on the host_query_manager,
     28         # crbug.com/336934.
     29         from autotest_lib.scheduler import query_managers
     30         self.query_manager = query_managers.AFEHostQueryManager()
     31         jobs = [queue_entry.job_id for queue_entry in queue_entries]
     32         self._job_acls = self.query_manager._get_job_acl_groups(jobs)
     33         self._job_deps = self.query_manager._get_job_dependencies(jobs)
     34         self._labels = self.query_manager._get_labels(self._job_deps)
     35         self._suite_min_duts = suite_min_duts or {}
     36 
     37 
     38     def get_job_info(self, queue_entry):
     39         """Extract job information from a queue_entry/host-scheduler.
     40 
     41         @param queue_entry: The queue_entry for which we need job information.
     42 
     43         @return: A dictionary representing job related information.
     44         """
     45         job_id = queue_entry.job_id
     46         job_deps, job_preferred_deps = [], []
     47         for dep in self._job_deps.get(job_id, []):
     48             if not provision.is_for_special_action(self._labels[dep].name):
     49                 job_deps.append(dep)
     50             elif provision.Provision.acts_on(self._labels[dep].name):
     51                 job_preferred_deps.append(dep)
     52 
     53         job_acls = self._job_acls.get(job_id, [])
     54         parent_id = queue_entry.job.parent_job_id
     55         min_duts = self._suite_min_duts.get(parent_id, 0) if parent_id else 0
     56 
     57         return {'deps': job_deps, 'acls': job_acls,
     58                 'preferred_deps': job_preferred_deps,
     59                 'host_id': queue_entry.host_id,
     60                 'parent_job_id': queue_entry.job.parent_job_id,
     61                 'priority': queue_entry.job.priority,
     62                 'suite_min_duts': min_duts}
     63 
     64 
     65 def acquire_hosts(queue_entries, suite_min_duts=None):
     66     """Acquire hosts for the list of queue_entries.
     67 
     68     The act of acquisition involves leasing a host from the rdb.
     69 
     70     @param queue_entries: A list of queue_entries that need hosts.
     71     @param suite_min_duts: A dictionary that maps suite job id to the minimum
     72                            number of duts required.
     73 
     74     @yield: An rdb_hosts.RDBClientHostWrapper for each host acquired on behalf
     75         of a queue_entry, or None if a host wasn't found.
     76 
     77     @raises RDBException: If something goes wrong making the request.
     78     """
     79     job_query_manager = JobQueryManager(queue_entries, suite_min_duts)
     80     request_manager = rdb_requests.BaseHostRequestManager(
     81             rdb_requests.AcquireHostRequest, rdb.rdb_host_request_dispatcher)
     82     for entry in queue_entries:
     83         request_manager.add_request(**job_query_manager.get_job_info(entry))
     84 
     85     for host in request_manager.response():
     86         yield (rdb_hosts.RDBClientHostWrapper(**host)
     87                if host else None)
     88 
     89 
     90 def get_hosts(host_ids):
     91     """Get information about the hosts with ids in host_ids.
     92 
     93     get_hosts is different from acquire_hosts in that it is completely
     94     oblivious to the leased state of a host.
     95 
     96     @param host_ids: A list of host_ids.
     97 
     98     @return: A list of rdb_hosts.RDBClientHostWrapper objects.
     99 
    100     @raises RDBException: If something goes wrong in making the request.
    101     """
    102     request_manager = rdb_requests.BaseHostRequestManager(
    103             rdb_requests.HostRequest, rdb.get_hosts)
    104     for host_id in host_ids:
    105         request_manager.add_request(host_id=host_id)
    106 
    107     hosts = []
    108     for host in request_manager.response():
    109         hosts.append(rdb_hosts.RDBClientHostWrapper(**host)
    110                      if host else None)
    111     return hosts
    112