Home | History | Annotate | Download | only in scheduler
      1 # Copyright 2017 The Chromium Authors. All rights reserved.
      2 # Use of this source code is governed by a BSD-style license that can be
      3 # found in the LICENSE file.
      4 
      5 """Library providing an API to lucifer."""
      6 
      7 import os
      8 import logging
      9 import pipes
     10 import socket
     11 
     12 import common
     13 from autotest_lib.client.bin import local_host
     14 from autotest_lib.client.common_lib import global_config
     15 from autotest_lib.server.hosts import ssh_host
     16 from autotest_lib.frontend.afe import models
     17 
# Handle to the shared global configuration object; all settings in
# this module are read through it.
_config = global_config.global_config
# Name of the config section that the lucifer settings are read from.
_SECTION = 'LUCIFER'

# TODO(crbug.com/748234): Move these to shadow_config.ini
# See also drones.AUTOTEST_INSTALL_DIR
# Program passed as the executable when spawning job_reporter on a drone.
_ENV = '/usr/bin/env'
# Hard-coded Autotest install dir used to locate job_reporter.
_AUTOTEST_DIR = '/usr/local/autotest'
# First argument handed to _ENV by the spawn_*_job_handler functions.
_JOB_REPORTER_PATH = os.path.join(_AUTOTEST_DIR, 'bin', 'job_reporter')

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
     28 
     29 
     30 def is_lucifer_enabled():
     31     """Return True if lucifer is enabled in the config."""
     32     return True
     33 
     34 
     35 def is_enabled_for(level):
     36     """Return True if lucifer is enabled for the given level.
     37 
     38     @param level: string, e.g. 'PARSING', 'GATHERING'
     39     """
     40     if not is_lucifer_enabled():
     41         return False
     42     config_level = (_config.get_config_value(_SECTION, 'lucifer_level')
     43                     .upper())
     44     return level.upper() == config_level
     45 
     46 
     47 def is_lucifer_owned(job):
     48     """Return True if job is already sent to lucifer."""
     49     return hasattr(job, 'jobhandoff')
     50 
     51 
     52 def is_split_job(hqe_id):
     53     """Return True if HQE is part of a job with HQEs in a different group.
     54 
     55     For examples if the given HQE have execution_subdir=foo and the job
     56     has an HQE with execution_subdir=bar, then return True.  The only
     57     situation where this happens is if provisioning in a multi-DUT job
     58     fails, the HQEs will each be in their own group.
     59 
     60     See https://bugs.chromium.org/p/chromium/issues/detail?id=811877
     61 
     62     @param hqe_id: HQE id
     63     """
     64     hqe = models.HostQueueEntry.objects.get(id=hqe_id)
     65     hqes = hqe.job.hostqueueentry_set.all()
     66     try:
     67         _get_consistent_execution_path(hqes)
     68     except _ExecutionPathError:
     69         return True
     70     return False
     71 
     72 
     73 # TODO(crbug.com/748234): This is temporary to enable toggling
     74 # lucifer rollouts with an option.
     75 def spawn_starting_job_handler(manager, job):
     76     """Spawn job_reporter to handle a job.
     77 
     78     Pass all arguments by keyword.
     79 
     80     @param manager: scheduler.drone_manager.DroneManager instance
     81     @param job: Job instance
     82     @returns: Drone instance
     83     """
     84     raise NotImplementedError
     85 
     86 
     87 # TODO(crbug.com/748234): This is temporary to enable toggling
     88 # lucifer rollouts with an option.
     89 def spawn_gathering_job_handler(manager, job, autoserv_exit, pidfile_id=None):
     90     """Spawn job_reporter to handle a job.
     91 
     92     Pass all arguments by keyword.
     93 
     94     @param manager: scheduler.drone_manager.DroneManager instance
     95     @param job: Job instance
     96     @param autoserv_exit: autoserv exit status
     97     @param pidfile_id: PidfileId instance
     98     @returns: Drone instance
     99     """
    100     manager = _DroneManager(manager)
    101     if pidfile_id is None:
    102         drone = manager.pick_drone_to_use()
    103     else:
    104         drone = manager.get_drone_for_pidfile(pidfile_id)
    105     results_dir = _results_dir(manager, job)
    106     num_tests_failed = manager.get_num_tests_failed(pidfile_id)
    107     args = [
    108             _JOB_REPORTER_PATH,
    109 
    110             # General configuration
    111             '--jobdir', _get_jobdir(),
    112             '--run-job-path', _get_run_job_path(),
    113             '--watcher-path', _get_watcher_path(),
    114 
    115             # Job specific
    116             '--job-id', str(job.id),
    117             '--lucifer-level', 'GATHERING',
    118             '--autoserv-exit', str(autoserv_exit),
    119             '--need-gather',
    120             '--num-tests-failed', str(num_tests_failed),
    121             '--results-dir', results_dir,
    122     ]
    123     if _get_gcp_creds():
    124         args = [
    125                 'GOOGLE_APPLICATION_CREDENTIALS=%s'
    126                 % pipes.quote(_get_gcp_creds()),
    127         ] + args
    128     output_file = os.path.join(results_dir, 'job_reporter_output.log')
    129     drone.spawn(_ENV, args, output_file=output_file)
    130     return drone
    131 
    132 
    133 # TODO(crbug.com/748234): This is temporary to enable toggling
    134 # lucifer rollouts with an option.
    135 def spawn_parsing_job_handler(manager, job, autoserv_exit, pidfile_id=None):
    136     """Spawn job_reporter to handle a job.
    137 
    138     Pass all arguments by keyword.
    139 
    140     @param manager: scheduler.drone_manager.DroneManager instance
    141     @param job: Job instance
    142     @param autoserv_exit: autoserv exit status
    143     @param pidfile_id: PidfileId instance
    144     @returns: Drone instance
    145     """
    146     manager = _DroneManager(manager)
    147     if pidfile_id is None:
    148         drone = manager.pick_drone_to_use()
    149     else:
    150         drone = manager.get_drone_for_pidfile(pidfile_id)
    151     results_dir = _results_dir(manager, job)
    152     args = [
    153             _JOB_REPORTER_PATH,
    154 
    155             # General configuration
    156             '--jobdir', _get_jobdir(),
    157             '--run-job-path', _get_run_job_path(),
    158             '--watcher-path', _get_watcher_path(),
    159 
    160             # Job specific
    161             '--job-id', str(job.id),
    162             '--lucifer-level', 'GATHERING',
    163             '--autoserv-exit', str(autoserv_exit),
    164             '--results-dir', results_dir,
    165     ]
    166     if _get_gcp_creds():
    167         args = [
    168                 'GOOGLE_APPLICATION_CREDENTIALS=%s'
    169                 % pipes.quote(_get_gcp_creds()),
    170         ] + args
    171     output_file = os.path.join(results_dir, 'job_reporter_output.log')
    172     drone.spawn(_ENV, args, output_file=output_file)
    173     return drone
    174 
    175 
    176 def _get_jobdir():
    177     return _config.get_config_value(_SECTION, 'jobdir')
    178 
    179 
    180 def _get_run_job_path():
    181     return os.path.join(_get_binaries_path(), 'lucifer_run_job')
    182 
    183 
    184 def _get_watcher_path():
    185     return os.path.join(_get_binaries_path(), 'lucifer_watcher')
    186 
    187 
    188 def _get_binaries_path():
    189     """Get binaries dir path from config.."""
    190     return _config.get_config_value(_SECTION, 'binaries_path')
    191 
    192 
    193 def _get_gcp_creds():
    194   """Return path to GCP service account credentials.
    195 
    196   This is the empty string by default, if no credentials will be used.
    197   """
    198   return _config.get_config_value(_SECTION, 'gcp_creds', default='')
    199 
    200 
    201 class _DroneManager(object):
    202     """Simplified drone API."""
    203 
    204     def __init__(self, old_manager):
    205         """Initialize instance.
    206 
    207         @param old_manager: old style DroneManager
    208         """
    209         self._manager = old_manager
    210 
    211     def get_num_tests_failed(self, pidfile_id):
    212         """Return the number of tests failed for autoserv by pidfile.
    213 
    214         @param pidfile_id: PidfileId instance.
    215         @returns: int (-1 if missing)
    216         """
    217         state = self._manager.get_pidfile_contents(pidfile_id)
    218         if state.num_tests_failed is None:
    219             return -1
    220         return state.num_tests_failed
    221 
    222     def get_drone_for_pidfile(self, pidfile_id):
    223         """Return a drone to use from a pidfile.
    224 
    225         @param pidfile_id: PidfileId instance.
    226         """
    227         return _wrap_drone(self._manager.get_drone_for_pidfile_id(pidfile_id))
    228 
    229     def pick_drone_to_use(self, num_processes=1, prefer_ssp=False):
    230         """Return a drone to use.
    231 
    232         Various options can be passed to optimize drone selection.
    233 
    234         @param num_processes: number of processes the drone is intended
    235             to run
    236         @param prefer_ssp: indicates whether drones supporting
    237             server-side packaging should be preferred.  The returned
    238             drone is not guaranteed to support it.
    239         """
    240         old_drone = self._manager.pick_drone_to_use(
    241                 num_processes=num_processes,
    242                 prefer_ssp=prefer_ssp,
    243         )
    244         return _wrap_drone(old_drone)
    245 
    246     def absolute_path(self, path):
    247         """Return absolute path for drone results.
    248 
    249         The returned path might be remote.
    250         """
    251         return self._manager.absolute_path(path)
    252 
    253 
    254 def _wrap_drone(old_drone):
    255     """Wrap an old style drone."""
    256     host = old_drone._host
    257     if isinstance(host, local_host.LocalHost):
    258         return LocalDrone()
    259     elif isinstance(host, ssh_host.SSHHost):
    260         return RemoteDrone(host)
    261     else:
    262         raise TypeError('Drone has an unknown host type')
    263 
    264 
    265 def _results_dir(manager, job):
    266     """Return results dir for a job.
    267 
    268     Path may be on a remote host.
    269     """
    270     return manager.absolute_path(_working_directory(job))
    271 
    272 
    273 def _working_directory(job):
    274     return _get_consistent_execution_path(job.hostqueueentry_set.all())
    275 
    276 
    277 def _get_consistent_execution_path(execution_entries):
    278     first_execution_path = execution_entries[0].execution_path()
    279     for execution_entry in execution_entries[1:]:
    280         if execution_entry.execution_path() != first_execution_path:
    281             raise _ExecutionPathError(
    282                     '%s (%s) != %s (%s)'
    283                     % (execution_entry.execution_path(),
    284                        execution_entry,
    285                        first_execution_path,
    286                        execution_entries[0]))
    287     return first_execution_path
    288 
    289 
    290 class _ExecutionPathError(Exception):
    291     """Raised by _get_consistent_execution_path()."""
    292 
    293 
    294 class Drone(object):
    295     """Simplified drone API."""
    296 
    297     def hostname(self):
    298         """Return the hostname of the drone."""
    299 
    300     def spawn(self, path, args, output_file):
    301         """Spawn an independent process.
    302 
    303         path must be an absolute path.  path may be on a remote machine.
    304         args is a list of arguments.
    305 
    306         The process is spawned in its own session.  It should not try to
    307         obtain a controlling terminal.
    308 
    309         The new process will have stdin opened to /dev/null and stdout,
    310         stderr opened to output_file.
    311 
    312         output_file is a pathname, but how it is interpreted is
    313         implementation defined, e.g., it may be a remote file.
    314         """
    315 
    316 
    317 class LocalDrone(Drone):
    318     """Local implementation of Drone."""
    319 
    320     def hostname(self):
    321         return socket.gethostname()
    322 
    323     def spawn(self, path, args, output_file):
    324         _spawn(path, [path] + args, output_file)
    325 
    326 
    327 class RemoteDrone(Drone):
    328     """Remote implementation of Drone through SSH."""
    329 
    330     def __init__(self, host):
    331         if not isinstance(host, ssh_host.SSHHost):
    332             raise TypeError('RemoteDrone must be passed an SSHHost')
    333         self._host = host
    334 
    335     def hostname(self):
    336         return self._host.hostname
    337 
    338     def spawn(self, path, args, output_file):
    339         cmd_parts = [path] + args
    340         safe_cmd = ' '.join(pipes.quote(part) for part in cmd_parts)
    341         safe_file = pipes.quote(output_file)
    342         # SSH creates a session for each command, so we do not have to
    343         # do it.
    344         self._host.run('%(cmd)s <%(null)s >>%(file)s 2>&1 &'
    345                        % {'cmd': safe_cmd,
    346                           'file': safe_file,
    347                           'null': os.devnull})
    348 
    349 
    350 def _spawn(path, argv, output_file):
    351     """Spawn a new process in its own session.
    352 
    353     path must be an absolute path.  The first item in argv should be
    354     path.
    355 
    356     In the calling process, this function returns on success.
    357     The forked process puts itself in its own session and execs.
    358 
    359     The new process will have stdin opened to /dev/null and stdout,
    360     stderr opened to output_file.
    361     """
    362     logger.info('Spawning %r, %r, %r', path, argv, output_file)
    363     assert all(isinstance(arg, basestring) for arg in argv)
    364     pid = os.fork()
    365     if pid:
    366         os.waitpid(pid, 0)
    367         return
    368     # Double fork to reparent to init since monitor_db does not reap.
    369     if os.fork():
    370         os._exit(os.EX_OK)
    371     os.setsid()
    372     null_fd = os.open(os.devnull, os.O_RDONLY)
    373     os.dup2(null_fd, 0)
    374     os.close(null_fd)
    375     out_fd = os.open(output_file, os.O_WRONLY | os.O_APPEND | os.O_CREAT)
    376     os.dup2(out_fd, 1)
    377     os.dup2(out_fd, 2)
    378     os.close(out_fd)
    379     os.execv(path, argv)
    380