# scheduler/luciferlib.py (code-viewer navigation header removed so the file parses)
      1 # Copyright 2017 The Chromium Authors. All rights reserved.
      2 # Use of this source code is governed by a BSD-style license that can be
      3 # found in the LICENSE file.
      4 
      5 """Library providing an API to lucifer."""
      6 
      7 import os
      8 import logging
      9 import pipes
     10 import socket
     11 import subprocess
     12 
     13 import common
     14 from autotest_lib.client.bin import local_host
     15 from autotest_lib.client.common_lib import global_config
     16 from autotest_lib.scheduler.drone_manager import PidfileId
     17 from autotest_lib.server.hosts import ssh_host
     18 from autotest_lib.frontend.afe import models
     19 
# Handle to the global scheduler configuration; all lucifer settings are
# read from the LUCIFER section.
_config = global_config.global_config
_SECTION = 'LUCIFER'

# TODO(crbug.com/748234): Move these to shadow_config.ini
# See also drones.AUTOTEST_INSTALL_DIR
_ENV = '/usr/bin/env'
_AUTOTEST_DIR = '/usr/local/autotest'
# Entry point used to hand a job off to lucifer (spawned via /usr/bin/env).
_JOB_REPORTER_PATH = os.path.join(_AUTOTEST_DIR, 'bin', 'job_reporter')

logger = logging.getLogger(__name__)
     30 
     31 
     32 def is_lucifer_enabled():
     33     """Return True if lucifer is enabled in the config."""
     34     return True
     35 
     36 
     37 def is_enabled_for(level):
     38     """Return True if lucifer is enabled for the given level.
     39 
     40     @param level: string, e.g. 'PARSING', 'GATHERING'
     41     """
     42     if not is_lucifer_enabled():
     43         return False
     44     config_level = (_config.get_config_value(_SECTION, 'lucifer_level')
     45                     .upper())
     46     return level.upper() == config_level
     47 
     48 
     49 def is_lucifer_owned(job):
     50     """Return True if job is already sent to lucifer.
     51 
     52     @param job: frontend.afe.models.Job instance
     53     """
     54     assert isinstance(job, models.Job)
     55     return hasattr(job, 'jobhandoff')
     56 
     57 
     58 def is_lucifer_owned_by_id(job_id):
     59     """Return True if job is already sent to lucifer."""
     60     return models.JobHandoff.objects.filter(job_id=job_id).exists()
     61 
     62 
     63 def is_split_job(hqe_id):
     64     """Return True if HQE is part of a job with HQEs in a different group.
     65 
     66     For examples if the given HQE have execution_subdir=foo and the job
     67     has an HQE with execution_subdir=bar, then return True.  The only
     68     situation where this happens is if provisioning in a multi-DUT job
     69     fails, the HQEs will each be in their own group.
     70 
     71     See https://bugs.chromium.org/p/chromium/issues/detail?id=811877
     72 
     73     @param hqe_id: HQE id
     74     """
     75     hqe = models.HostQueueEntry.objects.get(id=hqe_id)
     76     hqes = hqe.job.hostqueueentry_set.all()
     77     try:
     78         _get_consistent_execution_path(hqes)
     79     except ExecutionPathError:
     80         return True
     81     return False
     82 
     83 
     84 # TODO(crbug.com/748234): This is temporary to enable toggling
     85 # lucifer rollouts with an option.
     86 def spawn_starting_job_handler(manager, job):
     87     """Spawn job_reporter to handle a job.
     88 
     89     Pass all arguments by keyword.
     90 
     91     @param manager: scheduler.drone_manager.DroneManager instance
     92     @param job: Job instance
     93     @returns: Drone instance
     94     """
     95     manager = _DroneManager(manager)
     96     drone = manager.pick_drone_to_use()
     97     results_dir = _results_dir(manager, job)
     98     args = [
     99             _JOB_REPORTER_PATH,
    100 
    101             # General configuration
    102             '--jobdir', _get_jobdir(),
    103             '--lucifer-path', _get_lucifer_path(),
    104 
    105             # Job specific
    106             '--lucifer-level', 'STARTING',
    107             '--job-id', str(job.id),
    108             '--results-dir', results_dir,
    109 
    110             # STARTING specific
    111             '--execution-tag', _working_directory(job),
    112     ]
    113     if _get_gcp_creds():
    114         args = [
    115                 'GOOGLE_APPLICATION_CREDENTIALS=%s'
    116                 % pipes.quote(_get_gcp_creds()),
    117         ] + args
    118     drone.spawn(_ENV, args,
    119                 output_file=_prepare_output_file(drone, results_dir))
    120     drone.add_active_processes(1)
    121     manager.reorder_drone_queue()
    122     manager.register_pidfile_processes(
    123             os.path.join(results_dir, '.autoserv_execute'), 1)
    124     return drone
    125 
    126 
    127 # TODO(crbug.com/748234): This is temporary to enable toggling
    128 # lucifer rollouts with an option.
    129 def spawn_parsing_job_handler(manager, job, autoserv_exit, pidfile_id=None):
    130     """Spawn job_reporter to handle a job.
    131 
    132     Pass all arguments by keyword.
    133 
    134     @param manager: scheduler.drone_manager.DroneManager instance
    135     @param job: Job instance
    136     @param autoserv_exit: autoserv exit status
    137     @param pidfile_id: PidfileId instance
    138     @returns: Drone instance
    139     """
    140     manager = _DroneManager(manager)
    141     if pidfile_id is None:
    142         drone = manager.pick_drone_to_use()
    143     else:
    144         drone = manager.get_drone_for_pidfile(pidfile_id)
    145     results_dir = _results_dir(manager, job)
    146     args = [
    147             _JOB_REPORTER_PATH,
    148 
    149             # General configuration
    150             '--jobdir', _get_jobdir(),
    151             '--lucifer-path', _get_lucifer_path(),
    152 
    153             # Job specific
    154             '--job-id', str(job.id),
    155             '--lucifer-level', 'STARTING',
    156             '--parsing-only',
    157             '--results-dir', results_dir,
    158     ]
    159     if _get_gcp_creds():
    160         args = [
    161                 'GOOGLE_APPLICATION_CREDENTIALS=%s'
    162                 % pipes.quote(_get_gcp_creds()),
    163         ] + args
    164     drone.spawn(_ENV, args,
    165                 output_file=_prepare_output_file(drone, results_dir))
    166     drone.add_active_processes(1)
    167     manager.reorder_drone_queue()
    168     manager.register_pidfile_processes(
    169             os.path.join(results_dir, '.autoserv_execute'), 1)
    170     return drone
    171 
    172 
# Subdirectory of a job's results dir that holds job_reporter's log output.
_LUCIFER_DIR = 'lucifer'
    174 
    175 
    176 def _prepare_output_file(drone, results_dir):
    177     logdir = os.path.join(results_dir, _LUCIFER_DIR)
    178     drone.run('mkdir', ['-p', logdir])
    179     return os.path.join(logdir, 'job_reporter_output.log')
    180 
    181 
    182 def _get_jobdir():
    183     return _config.get_config_value(_SECTION, 'jobdir')
    184 
    185 
    186 def _get_lucifer_path():
    187     return os.path.join(_get_binaries_path(), 'lucifer')
    188 
    189 
    190 def _get_binaries_path():
    191     """Get binaries dir path from config.."""
    192     return _config.get_config_value(_SECTION, 'binaries_path')
    193 
    194 
    195 def _get_gcp_creds():
    196     """Return path to GCP service account credentials.
    197 
    198     This is the empty string by default, if no credentials will be used.
    199     """
    200     return _config.get_config_value(_SECTION, 'gcp_creds', default='')
    201 
    202 
    203 class _DroneManager(object):
    204     """Simplified drone API."""
    205 
    206     def __init__(self, old_manager):
    207         """Initialize instance.
    208 
    209         @param old_manager: old style DroneManager
    210         """
    211         self._manager = old_manager
    212 
    213     def get_num_tests_failed(self, pidfile_id):
    214         """Return the number of tests failed for autoserv by pidfile.
    215 
    216         @param pidfile_id: PidfileId instance.
    217         @returns: int (-1 if missing)
    218         """
    219         state = self._manager.get_pidfile_contents(pidfile_id)
    220         if state.num_tests_failed is None:
    221             return -1
    222         return state.num_tests_failed
    223 
    224     def get_drone_for_pidfile(self, pidfile_id):
    225         """Return a drone to use from a pidfile.
    226 
    227         @param pidfile_id: PidfileId instance.
    228         """
    229         return _wrap_drone(self._manager.get_drone_for_pidfile_id(pidfile_id))
    230 
    231     def pick_drone_to_use(self, num_processes=1):
    232         """Return a drone to use.
    233 
    234         Various options can be passed to optimize drone selection.
    235 
    236         @param num_processes: number of processes the drone is intended
    237             to run
    238         """
    239         old_drone = self._manager.pick_drone_to_use(
    240                 num_processes=num_processes,
    241         )
    242         return _wrap_drone(old_drone)
    243 
    244     def absolute_path(self, path):
    245         """Return absolute path for drone results.
    246 
    247         The returned path might be remote.
    248         """
    249         return self._manager.absolute_path(path)
    250 
    251     def register_pidfile_processes(self, path, count):
    252         """Register a pidfile with the given number of processes.
    253 
    254         This should be done to allow the drone manager to check the
    255         number of processes still alive.  This may be used to select
    256         drones based on the number of active processes as a proxy for
    257         load.
    258 
    259         The exact semantics depends on the drone manager implementation;
    260         implementation specific comments follow:
    261 
    262         Pidfiles are kept in memory to track process count.  Pidfiles
    263         are rediscovered when the scheduler restarts.  Thus, errors in
    264         pidfile tracking can be fixed by restarting the scheduler.xo
    265         """
    266         pidfile_id = PidfileId(path)
    267         self._manager.register_pidfile(pidfile_id)
    268         self._manager._registered_pidfile_info[pidfile_id].num_processes = count
    269 
    270     def reorder_drone_queue(self):
    271         """Reorder drone queue according to modified process counts.
    272 
    273         Call this after Drone.add_active_processes().
    274         """
    275         self._manager.reorder_drone_queue()
    276 
    277 
    278 def _wrap_drone(old_drone):
    279     """Wrap an old style drone."""
    280     host = old_drone._host
    281     if isinstance(host, local_host.LocalHost):
    282         return LocalDrone()
    283     elif isinstance(host, ssh_host.SSHHost):
    284         return RemoteDrone(old_drone)
    285     else:
    286         raise TypeError('Drone has an unknown host type')
    287 
    288 
    289 def _results_dir(manager, job):
    290     """Return results dir for a job.
    291 
    292     Path may be on a remote host.
    293     """
    294     return manager.absolute_path(_working_directory(job))
    295 
    296 
    297 def _working_directory(job):
    298     return _get_consistent_execution_path(job.hostqueueentry_set.all())
    299 
    300 
    301 def _get_consistent_execution_path(execution_entries):
    302     first_execution_path = execution_entries[0].execution_path()
    303     for execution_entry in execution_entries[1:]:
    304         if execution_entry.execution_path() != first_execution_path:
    305             raise ExecutionPathError(
    306                     '%s (%s) != %s (%s)'
    307                     % (execution_entry.execution_path(),
    308                        execution_entry,
    309                        first_execution_path,
    310                        execution_entries[0]))
    311     return first_execution_path
    312 
    313 
class ExecutionPathError(Exception):
    """Raised by _get_consistent_execution_path().

    Indicates that a job's HQEs do not all share the same execution
    path (see is_split_job()).
    """
    316 
    317 
class Drone(object):
    """Simplified drone API.

    Abstract interface; see LocalDrone and RemoteDrone for the concrete
    implementations.  Methods here are documentation-only stubs.
    """

    def hostname(self):
        """Return the hostname of the drone."""

    def run(self, path, args):
        """Run a command synchronously.

        path must be an absolute path.  path may be on a remote machine.
        args is a list of arguments.

        The process may or may not have its own session.  The process
        should be short-lived.  It should not try to obtain a
        controlling terminal.

        The new process will have stdin, stdout, and stderr opened to
        /dev/null.

        This method intentionally has a very restrictive API.  It should
        be used to perform setup local to the drone, when the drone may
        be a remote machine.
        """

    def spawn(self, path, args, output_file):
        """Spawn an independent process.

        path must be an absolute path.  path may be on a remote machine.
        args is a list of arguments.

        The process is spawned in its own session.  It should not try to
        obtain a controlling terminal.

        The new process will have stdin opened to /dev/null and stdout,
        stderr opened to output_file.

        output_file is a pathname, but how it is interpreted is
        implementation defined, e.g., it may be a remote file.
        """

    def add_active_processes(self, count):
        """Track additional number of active processes.

        This may be used to select drones based on the number of active
        processes as a proxy for load.

        _DroneManager.register_pidfile_processes() and
        _DroneManager.reorder_drone_queue() should also be called.

        The exact semantics depends on the drone manager implementation;
        implementation specific comments follow:

        Process count is used as a proxy for workload, and one process
        equals the workload of one autoserv or one job.  This count is
        recalculated during each scheduler tick, using pidfiles tracked
        by the drone manager (so the count added by this function only
        applies for one tick).
        """
    376 
    377 
    378 class LocalDrone(Drone):
    379     """Local implementation of Drone."""
    380 
    381     def hostname(self):
    382         return socket.gethostname()
    383 
    384     def run(self, path, args):
    385         with open(os.devnull, 'r+b') as null:
    386             subprocess.call([path] + args, stdin=null,
    387                             stdout=null, stderr=null)
    388 
    389     def spawn(self, path, args, output_file):
    390         _spawn(path, [path] + args, output_file)
    391 
    392 
    393 class RemoteDrone(Drone):
    394     """Remote implementation of Drone through SSH."""
    395 
    396     def __init__(self, drone):
    397         host = drone._host
    398         if not isinstance(host, ssh_host.SSHHost):
    399             raise TypeError('RemoteDrone must be passed a drone with SSHHost')
    400         self._drone = drone
    401         self._host = drone._host
    402 
    403     def hostname(self):
    404         return self._host.hostname
    405 
    406     def run(self, path, args):
    407         cmd_parts = [path] + args
    408         safe_cmd = ' '.join(pipes.quote(part) for part in cmd_parts)
    409         self._host.run('%(cmd)s <%(null)s >%(null)s 2>&1'
    410                        % {'cmd': safe_cmd, 'null': os.devnull})
    411 
    412     def spawn(self, path, args, output_file):
    413         cmd_parts = [path] + args
    414         safe_cmd = ' '.join(pipes.quote(part) for part in cmd_parts)
    415         safe_file = pipes.quote(output_file)
    416         # SSH creates a session for each command, so we do not have to
    417         # do it.
    418         self._host.run('%(cmd)s <%(null)s >>%(file)s 2>&1 &'
    419                        % {'cmd': safe_cmd,
    420                           'file': safe_file,
    421                           'null': os.devnull})
    422 
    423     def add_active_processes(self, count):
    424         self._drone.active_processes += count
    425 
    426 
def _spawn(path, argv, output_file):
    """Spawn a new process in its own session.

    path must be an absolute path.  The first item in argv should be
    path.

    In the calling process, this function returns on success.
    The forked process puts itself in its own session and execs.

    The new process will have stdin opened to /dev/null and stdout,
    stderr opened to output_file.
    """
    logger.info('Spawning %r, %r, %r', path, argv, output_file)
    # execv requires string arguments; catch bad argv early (Python 2:
    # basestring covers str and unicode).
    assert all(isinstance(arg, basestring) for arg in argv)
    pid = os.fork()
    if pid:
        # Parent: the intermediate child exits immediately, so reap it
        # and return.
        os.waitpid(pid, 0)
        return
    # Double fork to reparent to init since monitor_db does not reap.
    if os.fork():
        os._exit(os.EX_OK)
    # Grandchild: become a session leader so the process has no
    # controlling terminal and is detached from the scheduler.
    os.setsid()
    # Point stdin (fd 0) at /dev/null.
    null_fd = os.open(os.devnull, os.O_RDONLY)
    os.dup2(null_fd, 0)
    os.close(null_fd)
    # Point stdout (fd 1) and stderr (fd 2) at output_file, appending
    # and creating it if needed.
    out_fd = os.open(output_file, os.O_WRONLY | os.O_APPEND | os.O_CREAT)
    os.dup2(out_fd, 1)
    os.dup2(out_fd, 2)
    os.close(out_fd)
    os.execv(path, argv)
    457