Home | History | Annotate | Download | only in scheduler
      1 import heapq
      2 import os
      3 import logging
      4 
      5 import common
      6 from autotest_lib.client.common_lib import error
      7 from autotest_lib.client.common_lib import global_config
      8 from autotest_lib.client.common_lib import utils
      9 from autotest_lib.scheduler import drone_task_queue
     10 from autotest_lib.scheduler import drone_utility
     11 from autotest_lib.scheduler import drones
     12 from autotest_lib.scheduler import scheduler_config
     13 from autotest_lib.scheduler import thread_lib
     14 
     15 try:
     16     from chromite.lib import metrics
     17 except ImportError:
     18     metrics = utils.metrics_mock
     19 
     20 
     21 # results on drones will be placed under the drone_installation_directory in a
     22 # directory with this name
     23 _DRONE_RESULTS_DIR_SUFFIX = 'results'
     24 
     25 WORKING_DIRECTORY = object() # see execute_command()
     26 
     27 
     28 AUTOSERV_PID_FILE = '.autoserv_execute'
     29 CRASHINFO_PID_FILE = '.collect_crashinfo_execute'
     30 PARSER_PID_FILE = '.parser_execute'
     31 ARCHIVER_PID_FILE = '.archiver_execute'
     32 
     33 ALL_PIDFILE_NAMES = (AUTOSERV_PID_FILE, CRASHINFO_PID_FILE, PARSER_PID_FILE,
     34                      ARCHIVER_PID_FILE)
     35 
     36 _THREADED_DRONE_MANAGER = global_config.global_config.get_config_value(
     37         scheduler_config.CONFIG_SECTION, 'threaded_drone_manager',
     38         type=bool, default=True)
     39 
     40 HOSTS_JOB_SUBDIR = 'hosts/'
     41 PARSE_LOG = '.parse.log'
     42 ENABLE_ARCHIVING =  global_config.global_config.get_config_value(
     43         scheduler_config.CONFIG_SECTION, 'enable_archiving', type=bool)
     44 
     45 
     46 class DroneManagerError(Exception):
     47     pass
     48 
     49 
     50 class CustomEquals(object):
     51     def _id(self):
     52         raise NotImplementedError
     53 
     54 
     55     def __eq__(self, other):
     56         if not isinstance(other, type(self)):
     57             return NotImplemented
     58         return self._id() == other._id()
     59 
     60 
     61     def __ne__(self, other):
     62         return not self == other
     63 
     64 
     65     def __hash__(self):
     66         return hash(self._id())
     67 
     68 
     69 class Process(CustomEquals):
     70     def __init__(self, hostname, pid, ppid=None):
     71         self.hostname = hostname
     72         self.pid = pid
     73         self.ppid = ppid
     74 
     75     def _id(self):
     76         return (self.hostname, self.pid)
     77 
     78 
     79     def __str__(self):
     80         return '%s/%s' % (self.hostname, self.pid)
     81 
     82 
     83     def __repr__(self):
     84         return super(Process, self).__repr__() + '<%s>' % self
     85 
     86 
     87 class PidfileId(CustomEquals):
     88     def __init__(self, path):
     89         self.path = path
     90 
     91 
     92     def _id(self):
     93         return self.path
     94 
     95 
     96     def __str__(self):
     97         return str(self.path)
     98 
     99 
    100 class _PidfileInfo(object):
    101     age = 0
    102     num_processes = None
    103 
    104 
    105 class PidfileContents(object):
    106     process = None
    107     exit_status = None
    108     num_tests_failed = None
    109 
    110     def is_invalid(self):
    111         return False
    112 
    113 
    114     def is_running(self):
    115         return self.process and not self.exit_status
    116 
    117 
    118 class InvalidPidfile(object):
    119     process = None
    120     exit_status = None
    121     num_tests_failed = None
    122 
    123 
    124     def __init__(self, error):
    125         self.error = error
    126 
    127 
    128     def is_invalid(self):
    129         return True
    130 
    131 
    132     def is_running(self):
    133         return False
    134 
    135 
    136     def __str__(self):
    137         return self.error
    138 
    139 
    140 class _DroneHeapWrapper(object):
    141     """Wrapper to compare drones based on used_capacity().
    142 
    143     These objects can be used to keep a heap of drones by capacity.
    144     """
    145     def __init__(self, drone):
    146         self.drone = drone
    147 
    148 
    149     def __cmp__(self, other):
    150         assert isinstance(other, _DroneHeapWrapper)
    151         return cmp(self.drone.used_capacity(), other.drone.used_capacity())
    152 
    153 
    154 class DroneManager(object):
    155     """
    156     This class acts as an interface from the scheduler to drones, whether it be
    157     only a single "drone" for localhost or multiple remote drones.
    158 
    159     All paths going into and out of this class are relative to the full results
    160     directory, except for those returns by absolute_path().
    161     """
    162 
    163 
    164     # Minimum time to wait before next email
    165     # about a drone hitting process limit is sent.
    166     NOTIFY_INTERVAL = 60 * 60 * 24 # one day
    167     _STATS_KEY = 'drone_manager'
    168 
    169 
    170 
    171     def __init__(self):
    172         # absolute path of base results dir
    173         self._results_dir = None
    174         # holds Process objects
    175         self._process_set = set()
    176         # holds the list of all processes running on all drones
    177         self._all_processes = {}
    178         # maps PidfileId to PidfileContents
    179         self._pidfiles = {}
    180         # same as _pidfiles
    181         self._pidfiles_second_read = {}
    182         # maps PidfileId to _PidfileInfo
    183         self._registered_pidfile_info = {}
    184         # used to generate unique temporary paths
    185         self._temporary_path_counter = 0
    186         # maps hostname to Drone object
    187         self._drones = {}
    188         self._results_drone = None
    189         # maps results dir to dict mapping file path to contents
    190         self._attached_files = {}
    191         # heapq of _DroneHeapWrappers
    192         self._drone_queue = []
    193         # A threaded task queue used to refresh drones asynchronously.
    194         if _THREADED_DRONE_MANAGER:
    195             self._refresh_task_queue = thread_lib.ThreadedTaskQueue(
    196                     name='%s.refresh_queue' % self._STATS_KEY)
    197         else:
    198             self._refresh_task_queue = drone_task_queue.DroneTaskQueue()
    199 
    200 
    201     def initialize(self, base_results_dir, drone_hostnames,
    202                    results_repository_hostname):
    203         self._results_dir = base_results_dir
    204 
    205         for hostname in drone_hostnames:
    206             self._add_drone(hostname)
    207 
    208         if not self._drones:
    209             # all drones failed to initialize
    210             raise DroneManagerError('No valid drones found')
    211 
    212         self.refresh_drone_configs()
    213 
    214         logging.info('Using results repository on %s',
    215                      results_repository_hostname)
    216         self._results_drone = drones.get_drone(results_repository_hostname)
    217         results_installation_dir = global_config.global_config.get_config_value(
    218                 scheduler_config.CONFIG_SECTION,
    219                 'results_host_installation_directory', default=None)
    220         if results_installation_dir:
    221             self._results_drone.set_autotest_install_dir(
    222                     results_installation_dir)
    223         # don't initialize() the results drone - we don't want to clear out any
    224         # directories and we don't need to kill any processes
    225 
    226 
    227     def reinitialize_drones(self):
    228         for drone in self.get_drones():
    229             with metrics.SecondsTimer(
    230                     'chromeos/autotest/drone_manager/'
    231                     'reinitialize_drones_duration',
    232                     fields={'drone': drone.hostname}):
    233                 drone.call('initialize', self._results_dir)
    234 
    235 
    236     def shutdown(self):
    237         for drone in self.get_drones():
    238             drone.shutdown()
    239 
    240 
    241     def _get_max_pidfile_refreshes(self):
    242         """
    243         Normally refresh() is called on every monitor_db.Dispatcher.tick().
    244 
    245         @returns: The number of refresh() calls before we forget a pidfile.
    246         """
    247         pidfile_timeout = global_config.global_config.get_config_value(
    248                 scheduler_config.CONFIG_SECTION, 'max_pidfile_refreshes',
    249                 type=int, default=2000)
    250         return pidfile_timeout
    251 
    252 
    253     def _add_drone(self, hostname):
    254         """
    255         Add drone.
    256 
    257         Catches AutoservRunError if the drone fails initialization and does not
    258         add it to the list of usable drones.
    259 
    260         @param hostname: Hostname of the drone we are trying to add.
    261         """
    262         logging.info('Adding drone %s' % hostname)
    263         drone = drones.get_drone(hostname)
    264         if drone:
    265             try:
    266                 drone.call('initialize', self.absolute_path(''))
    267             except error.AutoservRunError as e:
    268                 logging.error('Failed to initialize drone %s with error: %s',
    269                               hostname, e)
    270                 return
    271             self._drones[drone.hostname] = drone
    272 
    273 
    274     def _remove_drone(self, hostname):
    275         self._drones.pop(hostname, None)
    276 
    277 
    278     def refresh_drone_configs(self):
    279         """
    280         Reread global config options for all drones.
    281         """
    282         # Import server_manager_utils is delayed rather than at the beginning of
    283         # this module. The reason is that test_that imports drone_manager when
    284         # importing autoserv_utils. The import is done before test_that setup
    285         # django (test_that only setup django in setup_local_afe, since it's
    286         # not needed when test_that runs the test in a lab duts through :lab:
    287         # option. Therefore, if server_manager_utils is imported at the
    288         # beginning of this module, test_that will fail since django is not
    289         # setup yet.
    290         from autotest_lib.site_utils import server_manager_utils
    291         config = global_config.global_config
    292         section = scheduler_config.CONFIG_SECTION
    293         config.parse_config_file()
    294         for hostname, drone in self._drones.iteritems():
    295             if server_manager_utils.use_server_db():
    296                 server = server_manager_utils.get_servers(hostname=hostname)[0]
    297                 attributes = dict([(a.attribute, a.value)
    298                                    for a in server.attributes.all()])
    299                 drone.enabled = (
    300                         int(attributes.get('disabled', 0)) == 0)
    301                 drone.max_processes = int(
    302                         attributes.get(
    303                             'max_processes',
    304                             scheduler_config.config.max_processes_per_drone))
    305                 allowed_users = attributes.get('users', None)
    306             else:
    307                 disabled = config.get_config_value(
    308                         section, '%s_disabled' % hostname, default='')
    309                 drone.enabled = not bool(disabled)
    310                 drone.max_processes = config.get_config_value(
    311                         section, '%s_max_processes' % hostname, type=int,
    312                         default=scheduler_config.config.max_processes_per_drone)
    313 
    314                 allowed_users = config.get_config_value(
    315                         section, '%s_users' % hostname, default=None)
    316             if allowed_users:
    317                 drone.allowed_users = set(allowed_users.split())
    318             else:
    319                 drone.allowed_users = None
    320             logging.info('Drone %s.max_processes: %s', hostname,
    321                          drone.max_processes)
    322             logging.info('Drone %s.enabled: %s', hostname, drone.enabled)
    323             logging.info('Drone %s.allowed_users: %s', hostname,
    324                          drone.allowed_users)
    325             logging.info('Drone %s.support_ssp: %s', hostname,
    326                          drone.support_ssp)
    327 
    328         self._reorder_drone_queue() # max_processes may have changed
    329         # Clear notification record about reaching max_processes limit.
    330         self._notify_record = {}
    331 
    332 
    333     def get_drones(self):
    334         return self._drones.itervalues()
    335 
    336 
    337     def cleanup_orphaned_containers(self):
    338         """Queue cleanup_orphaned_containers call at each drone.
    339         """
    340         for drone in self._drones.values():
    341             logging.info('Queue cleanup_orphaned_containers at %s',
    342                          drone.hostname)
    343             drone.queue_call('cleanup_orphaned_containers')
    344 
    345 
    346     def _get_drone_for_process(self, process):
    347         return self._drones[process.hostname]
    348 
    349 
    350     def _get_drone_for_pidfile_id(self, pidfile_id):
    351         pidfile_contents = self.get_pidfile_contents(pidfile_id)
    352         if pidfile_contents.process is None:
    353           raise DroneManagerError('Fail to get a drone due to empty pidfile')
    354         return self._get_drone_for_process(pidfile_contents.process)
    355 
    356 
    357     def get_drone_for_pidfile_id(self, pidfile_id):
    358         """Public API for luciferlib.
    359 
    360         @param pidfile_id: PidfileId instance.
    361         """
    362         return self._get_drone_for_pidfile_id(pidfile_id)
    363 
    364 
    365     def _drop_old_pidfiles(self):
    366         # use items() since the dict is modified in unregister_pidfile()
    367         for pidfile_id, info in self._registered_pidfile_info.items():
    368             if info.age > self._get_max_pidfile_refreshes():
    369                 logging.warning('dropping leaked pidfile %s', pidfile_id)
    370                 self.unregister_pidfile(pidfile_id)
    371             else:
    372                 info.age += 1
    373 
    374 
    375     def _reset(self):
    376         self._process_set = set()
    377         self._all_processes = {}
    378         self._pidfiles = {}
    379         self._pidfiles_second_read = {}
    380         self._drone_queue = []
    381 
    382 
    383     def _parse_pidfile(self, drone, raw_contents):
    384         """Parse raw pidfile contents.
    385 
    386         @param drone: The drone on which this pidfile was found.
    387         @param raw_contents: The raw contents of a pidfile, eg:
    388             "pid\nexit_staus\nnum_tests_failed\n".
    389         """
    390         contents = PidfileContents()
    391         if not raw_contents:
    392             return contents
    393         lines = raw_contents.splitlines()
    394         if len(lines) > 3:
    395             return InvalidPidfile('Corrupt pid file (%d lines):\n%s' %
    396                                   (len(lines), lines))
    397         try:
    398             pid = int(lines[0])
    399             contents.process = Process(drone.hostname, pid)
    400             # if len(lines) == 2, assume we caught Autoserv between writing
    401             # exit_status and num_failed_tests, so just ignore it and wait for
    402             # the next cycle
    403             if len(lines) == 3:
    404                 contents.exit_status = int(lines[1])
    405                 contents.num_tests_failed = int(lines[2])
    406         except ValueError, exc:
    407             return InvalidPidfile('Corrupt pid file: ' + str(exc.args))
    408 
    409         return contents
    410 
    411 
    412     def _process_pidfiles(self, drone, pidfiles, store_in_dict):
    413         for pidfile_path, contents in pidfiles.iteritems():
    414             pidfile_id = PidfileId(pidfile_path)
    415             contents = self._parse_pidfile(drone, contents)
    416             store_in_dict[pidfile_id] = contents
    417 
    418 
    419     def _add_process(self, drone, process_info):
    420         process = Process(drone.hostname, int(process_info['pid']),
    421                           int(process_info['ppid']))
    422         self._process_set.add(process)
    423 
    424 
    425     def _add_autoserv_process(self, drone, process_info):
    426         assert process_info['comm'] == 'autoserv'
    427         # only root autoserv processes have pgid == pid
    428         if process_info['pgid'] != process_info['pid']:
    429             return
    430         self._add_process(drone, process_info)
    431 
    432 
    433     def _enqueue_drone(self, drone):
    434         heapq.heappush(self._drone_queue, _DroneHeapWrapper(drone))
    435 
    436 
    437     def _reorder_drone_queue(self):
    438         heapq.heapify(self._drone_queue)
    439 
    440 
    441     def _compute_active_processes(self, drone):
    442         drone.active_processes = 0
    443         for pidfile_id, contents in self._pidfiles.iteritems():
    444             is_running = contents.exit_status is None
    445             on_this_drone = (contents.process
    446                              and contents.process.hostname == drone.hostname)
    447             if is_running and on_this_drone:
    448                 info = self._registered_pidfile_info[pidfile_id]
    449                 if info.num_processes is not None:
    450                     drone.active_processes += info.num_processes
    451 
    452         metrics.Gauge('chromeos/autotest/drone/active_processes').set(
    453                 drone.active_processes,
    454                 fields={'drone_hostname': drone.hostname})
    455 
    456 
    457     def _check_drone_process_limit(self, drone):
    458         """
    459         Notify if the number of processes on |drone| is approaching limit.
    460 
    461         @param drone: A Drone object.
    462         """
    463         try:
    464             percent = float(drone.active_processes) / drone.max_processes
    465         except ZeroDivisionError:
    466             percent = 100
    467         metrics.Float('chromeos/autotest/drone/active_process_percentage'
    468                       ).set(percent, fields={'drone_hostname': drone.hostname})
    469 
    470     def trigger_refresh(self):
    471         """Triggers a drone manager refresh.
    472 
    473         @raises DroneManagerError: If a drone has un-executed calls.
    474             Since they will get clobbered when we queue refresh calls.
    475         """
    476         self._reset()
    477         self._drop_old_pidfiles()
    478         pidfile_paths = [pidfile_id.path
    479                          for pidfile_id in self._registered_pidfile_info]
    480         drones = list(self.get_drones())
    481         for drone in drones:
    482             calls = drone.get_calls()
    483             if calls:
    484                 raise DroneManagerError('Drone %s has un-executed calls: %s '
    485                                         'which might get corrupted through '
    486                                         'this invocation' %
    487                                         (drone, [str(call) for call in calls]))
    488             drone.queue_call('refresh', pidfile_paths)
    489         logging.info("Invoking drone refresh.")
    490         with metrics.SecondsTimer(
    491                 'chromeos/autotest/drone_manager/trigger_refresh_duration'):
    492             self._refresh_task_queue.execute(drones, wait=False)
    493 
    494 
    495     def sync_refresh(self):
    496         """Complete the drone refresh started by trigger_refresh.
    497 
    498         Waits for all drone threads then refreshes internal datastructures
    499         with drone process information.
    500         """
    501 
    502         # This gives us a dictionary like what follows:
    503         # {drone: [{'pidfiles': (raw contents of pidfile paths),
    504         #           'autoserv_processes': (autoserv process info from ps),
    505         #           'all_processes': (all process info from ps),
    506         #           'parse_processes': (parse process infor from ps),
    507         #           'pidfile_second_read': (pidfile contents, again),}]
    508         #   drone2: ...}
    509         # The values of each drone are only a list because this adheres to the
    510         # drone utility interface (each call is executed and its results are
    511         # places in a list, but since we never couple the refresh calls with
    512         # any other call, this list will always contain a single dict).
    513         with metrics.SecondsTimer(
    514                 'chromeos/autotest/drone_manager/sync_refresh_duration'):
    515             all_results = self._refresh_task_queue.get_results()
    516         logging.info("Drones refreshed.")
    517 
    518         # The loop below goes through and parses pidfile contents. Pidfiles
    519         # are used to track autoserv execution, and will always contain < 3
    520         # lines of the following: pid, exit code, number of tests. Each pidfile
    521         # is identified by a PidfileId object, which contains a unique pidfile
    522         # path (unique because it contains the job id) making it hashable.
    523         # All pidfiles are stored in the drone managers _pidfiles dict as:
    524         #   {pidfile_id: pidfile_contents(Process(drone, pid),
    525         #                                 exit_code, num_tests_failed)}
    526         # In handle agents, each agent knows its pidfile_id, and uses this
    527         # to retrieve the refreshed contents of its pidfile via the
    528         # PidfileRunMonitor (through its tick) before making decisions. If
    529         # the agent notices that its process has exited, it unregisters the
    530         # pidfile from the drone_managers._registered_pidfile_info dict
    531         # through its epilog.
    532         for drone, results_list in all_results.iteritems():
    533             results = results_list[0]
    534             drone_hostname = drone.hostname.replace('.', '_')
    535 
    536             for process_info in results['all_processes']:
    537                 if process_info['comm'] == 'autoserv':
    538                     self._add_autoserv_process(drone, process_info)
    539                 drone_pid = drone.hostname, int(process_info['pid'])
    540                 self._all_processes[drone_pid] = process_info
    541 
    542             for process_info in results['parse_processes']:
    543                 self._add_process(drone, process_info)
    544 
    545             self._process_pidfiles(drone, results['pidfiles'], self._pidfiles)
    546             self._process_pidfiles(drone, results['pidfiles_second_read'],
    547                                    self._pidfiles_second_read)
    548 
    549             self._compute_active_processes(drone)
    550             if drone.enabled:
    551                 self._enqueue_drone(drone)
    552                 self._check_drone_process_limit(drone)
    553 
    554 
    555     def refresh(self):
    556         """Refresh all drones."""
    557         with metrics.SecondsTimer(
    558                 'chromeos/autotest/drone_manager/refresh_duration'):
    559             self.trigger_refresh()
    560             self.sync_refresh()
    561 
    562 
    563     @metrics.SecondsTimerDecorator(
    564         'chromeos/autotest/drone_manager/execute_actions_duration')
    565     def execute_actions(self):
    566         """
    567         Called at the end of a scheduler cycle to execute all queued actions
    568         on drones.
    569         """
    570         # Invoke calls queued on all drones since the last call to execute
    571         # and wait for them to return.
    572         if _THREADED_DRONE_MANAGER:
    573             thread_lib.ThreadedTaskQueue(
    574                     name='%s.execute_queue' % self._STATS_KEY).execute(
    575                             self._drones.values())
    576         else:
    577             drone_task_queue.DroneTaskQueue().execute(self._drones.values())
    578 
    579         try:
    580             self._results_drone.execute_queued_calls()
    581         except error.AutoservError:
    582             m = 'chromeos/autotest/errors/results_repository_failed'
    583             metrics.Counter(m).increment(
    584                 fields={'drone_hostname': self._results_drone.hostname})
    585             self._results_drone.clear_call_queue()
    586 
    587 
    588     def get_orphaned_autoserv_processes(self):
    589         """
    590         Returns a set of Process objects for orphaned processes only.
    591         """
    592         return set(process for process in self._process_set
    593                    if process.ppid == 1)
    594 
    595 
    596     def kill_process(self, process):
    597         """
    598         Kill the given process.
    599         """
    600         logging.info('killing %s', process)
    601         drone = self._get_drone_for_process(process)
    602         drone.queue_kill_process(process)
    603 
    604 
    605     def _ensure_directory_exists(self, path):
    606         if not os.path.exists(path):
    607             os.makedirs(path)
    608 
    609 
    610     def total_running_processes(self):
    611         return sum(drone.active_processes for drone in self.get_drones())
    612 
    613 
    614     def max_runnable_processes(self, username, drone_hostnames_allowed):
    615         """
    616         Return the maximum number of processes that can be run (in a single
    617         execution) given the current load on drones.
    618         @param username: login of user to run a process.  may be None.
    619         @param drone_hostnames_allowed: list of drones that can be used. May be
    620                                         None
    621         """
    622         usable_drone_wrappers = [wrapper for wrapper in self._drone_queue
    623                                  if wrapper.drone.usable_by(username) and
    624                                  (drone_hostnames_allowed is None or
    625                                           wrapper.drone.hostname in
    626                                                   drone_hostnames_allowed)]
    627         if not usable_drone_wrappers:
    628             # all drones disabled or inaccessible
    629             return 0
    630         runnable_processes = [
    631                 wrapper.drone.max_processes - wrapper.drone.active_processes
    632                 for wrapper in usable_drone_wrappers]
    633         return max([0] + runnable_processes)
    634 
    635 
    636     def _least_loaded_drone(self, drones):
    637         return min(drones, key=lambda d: d.used_capacity())
    638 
    639 
    640     def pick_drone_to_use(self, num_processes=1, prefer_ssp=False):
    641         """Return a drone to use.
    642 
    643         Various options can be passed to optimize drone selection.
    644 
    645         num_processes is the number of processes the drone is intended
    646         to run.
    647 
    648         prefer_ssp indicates whether drones supporting server-side
    649         packaging should be preferred.  The returned drone is not
    650         guaranteed to support it.
    651 
    652         This public API is exposed for luciferlib to wrap.
    653 
    654         Returns a drone instance (see drones.py).
    655         """
    656         return self._choose_drone_for_execution(
    657                 num_processes=num_processes,
    658                 username=None,  # Always allow all drones
    659                 drone_hostnames_allowed=None,  # Always allow all drones
    660                 require_ssp=prefer_ssp,
    661         )
    662 
    663 
    664     def _choose_drone_for_execution(self, num_processes, username,
    665                                     drone_hostnames_allowed,
    666                                     require_ssp=False):
    667         """Choose a drone to execute command.
    668 
    669         @param num_processes: Number of processes needed for execution.
    670         @param username: Name of the user to execute the command.
    671         @param drone_hostnames_allowed: A list of names of drone allowed.
    672         @param require_ssp: Require server-side packaging to execute the,
    673                             command, default to False.
    674 
    675         @return: A drone object to be used for execution.
    676         """
    677         # cycle through drones is order of increasing used capacity until
    678         # we find one that can handle these processes
    679         checked_drones = []
    680         usable_drones = []
    681         # Drones do not support server-side packaging, used as backup if no
    682         # drone is found to run command requires server-side packaging.
    683         no_ssp_drones = []
    684         drone_to_use = None
    685         while self._drone_queue:
    686             drone = heapq.heappop(self._drone_queue).drone
    687             checked_drones.append(drone)
    688             logging.info('Checking drone %s', drone.hostname)
    689             if not drone.usable_by(username):
    690                 continue
    691 
    692             drone_allowed = (drone_hostnames_allowed is None
    693                              or drone.hostname in drone_hostnames_allowed)
    694             if not drone_allowed:
    695                 logging.debug('Drone %s not allowed: ', drone.hostname)
    696                 continue
    697             if require_ssp and not drone.support_ssp:
    698                 logging.debug('Drone %s does not support server-side '
    699                               'packaging.', drone.hostname)
    700                 no_ssp_drones.append(drone)
    701                 continue
    702 
    703             usable_drones.append(drone)
    704 
    705             if drone.active_processes + num_processes <= drone.max_processes:
    706                 drone_to_use = drone
    707                 break
    708             logging.info('Drone %s has %d active + %s requested > %s max',
    709                          drone.hostname, drone.active_processes, num_processes,
    710                          drone.max_processes)
    711 
    712         if not drone_to_use and usable_drones:
    713             # Drones are all over loaded, pick the one with least load.
    714             drone_summary = ','.join('%s %s/%s' % (drone.hostname,
    715                                                    drone.active_processes,
    716                                                    drone.max_processes)
    717                                      for drone in usable_drones)
    718             logging.error('No drone has capacity to handle %d processes (%s) '
    719                           'for user %s', num_processes, drone_summary, username)
    720             drone_to_use = self._least_loaded_drone(usable_drones)
    721         elif not drone_to_use and require_ssp and no_ssp_drones:
    722             # No drone supports server-side packaging, choose the least loaded.
    723             drone_to_use = self._least_loaded_drone(no_ssp_drones)
    724 
    725         # refill _drone_queue
    726         for drone in checked_drones:
    727             self._enqueue_drone(drone)
    728 
    729         return drone_to_use
    730 
    731 
    def _substitute_working_directory_into_command(self, command,
                                                   working_directory):
        """
        Replace WORKING_DIRECTORY placeholders in an argv list, in place.

        Every element of ``command`` that *is* the WORKING_DIRECTORY sentinel
        (identity comparison, not equality) is overwritten with
        ``working_directory``.  Nothing is returned; the caller's list is
        mutated.
        """
        placeholder_slots = [slot for slot, token in enumerate(command)
                             if token is WORKING_DIRECTORY]
        for slot in placeholder_slots:
            command[slot] = working_directory
    737 
    738 
    def execute_command(self, command, working_directory, pidfile_name,
                        num_processes, log_file=None, paired_with_pidfile=None,
                        username=None, drone_hostnames_allowed=None):
        """
        Execute the given command, taken as an argv list.

        @param command: command to execute as a list.  If any item is
                WORKING_DIRECTORY, the absolute path to the working directory
                will be substituted for it.  NOTE: the list is modified in
                place (placeholder substitution, and '--warn-no-ssp' may be
                appended).
        @param working_directory: directory in which the pidfile will be written
        @param pidfile_name: name of the pidfile this process will write
        @param num_processes: number of processes to account for from this
                execution
        @param log_file (optional): path (in the results repository) to hold
                command output.  Defaults to a fresh temporary path.
        @param paired_with_pidfile (optional): a PidfileId for an
                already-executed process; the new process will execute on the
                same drone as the previous process.
        @param username (optional): login of the user responsible for this
                process.
        @param drone_hostnames_allowed (optional): hostnames of the drones that
                this command is allowed to execute on

        @returns: the PidfileId registered for the new process.
        @raises DroneManagerError: if no drone is available to run the command.
        """
        abs_working_directory = self.absolute_path(working_directory)
        if not log_file:
            log_file = self.get_temporary_path('execute')
        log_file = self.absolute_path(log_file)

        self._substitute_working_directory_into_command(command,
                                                        abs_working_directory)

        if paired_with_pidfile:
            drone = self._get_drone_for_pidfile_id(paired_with_pidfile)
        else:
            require_ssp = '--require-ssp' in command
            drone = self._choose_drone_for_execution(
                    num_processes, username, drone_hostnames_allowed,
                    require_ssp=require_ssp)
            # Enable --warn-no-ssp option for autoserv to log a warning and run
            # the command without using server-side packaging.  The drone may
            # be None when nothing is available; guard so the
            # DroneManagerError below is raised instead of an AttributeError.
            if drone and require_ssp and not drone.support_ssp:
                command.append('--warn-no-ssp')

        if not drone:
            raise DroneManagerError('command failed; no drones available: %s'
                                    % command)

        logging.info("command = %s", command)
        logging.info('log file = %s:%s', drone.hostname, log_file)
        self._write_attached_files(working_directory, drone)
        drone.queue_call('execute_command', command, abs_working_directory,
                         log_file, pidfile_name)
        drone.active_processes += num_processes
        # Load on this drone changed; keep the drone priority queue consistent.
        self._reorder_drone_queue()

        pidfile_path = os.path.join(abs_working_directory, pidfile_name)
        pidfile_id = PidfileId(pidfile_path)
        self.register_pidfile(pidfile_id)
        self._registered_pidfile_info[pidfile_id].num_processes = num_processes
        return pidfile_id
    800 
    801 
    def get_pidfile_id_from(self, execution_tag, pidfile_name):
        """
        Return the PidfileId for ``pidfile_name`` inside the drone-side
        directory for ``execution_tag`` (resolved via absolute_path()).
        """
        execution_dir = self.absolute_path(execution_tag)
        return PidfileId(os.path.join(execution_dir, pidfile_name))
    805 
    806 
    def register_pidfile(self, pidfile_id):
        """
        Indicate that the DroneManager should look for the given pidfile when
        refreshing.

        A fresh _PidfileInfo is created only the first time an id is seen;
        in every case the pidfile's age is reset to 0.
        """
        known_pidfiles = self._registered_pidfile_info
        is_new = pidfile_id not in known_pidfiles
        if is_new:
            logging.info('monitoring pidfile %s', pidfile_id)
            known_pidfiles[pidfile_id] = _PidfileInfo()
        self._reset_pidfile_age(pidfile_id)
    816 
    817 
    def _reset_pidfile_age(self, pidfile_id):
        """Set the age of a registered pidfile back to 0; ignore unknown ids."""
        pidfile_infos = self._registered_pidfile_info
        if pidfile_id not in pidfile_infos:
            return
        pidfile_infos[pidfile_id].age = 0
    821 
    822 
    def unregister_pidfile(self, pidfile_id):
        """Stop monitoring ``pidfile_id``; unknown ids are silently ignored."""
        pidfile_infos = self._registered_pidfile_info
        if pidfile_id not in pidfile_infos:
            return
        logging.info('forgetting pidfile %s', pidfile_id)
        del pidfile_infos[pidfile_id]
    827 
    828 
    def declare_process_count(self, pidfile_id, num_processes):
        """
        Record how many processes the execution behind ``pidfile_id`` uses.

        The id is looked up without a default, so it is expected to have been
        registered already (see register_pidfile()).
        """
        pidfile_info = self._registered_pidfile_info[pidfile_id]
        pidfile_info.num_processes = num_processes
    831 
    832 
    def get_pidfile_contents(self, pidfile_id, use_second_read=False):
        """
        Retrieve a PidfileContents object for the given pidfile_id.

        When ``use_second_read`` is True the results read after the processes
        were checked are used instead of the ones read before.  An empty
        PidfileContents() is returned for ids with no recorded contents.
        Looking up a pidfile also resets its age.
        """
        self._reset_pidfile_age(pidfile_id)
        contents_by_id = (self._pidfiles_second_read if use_second_read
                          else self._pidfiles)
        return contents_by_id.get(pidfile_id, PidfileContents())
    845 
    846 
    def is_process_running(self, process):
        """
        Check if the given process is in the running process list.

        Returns True if ``process`` is a tracked process, or if any process
        with the same (hostname, pid) was seen on a drone (an error is logged
        in that case, since it is not an autoserv process).  Otherwise False.
        """
        if process in self._process_set:
            return True

        host_and_pid = (process.hostname, process.pid)
        seen_elsewhere = host_and_pid in self._all_processes
        if seen_elsewhere:
            logging.error('Process %s found, but not an autoserv process. '
                          'Is %s', process, self._all_processes[host_and_pid])
        return seen_elsewhere
    861 
    862 
    def get_temporary_path(self, base_name):
        """
        Get a new temporary path guaranteed to be unique across all drones
        for this scheduler execution.

        The path is ``<temporary dir>/<base_name>.<counter>``, where the
        counter is bumped on every call.
        """
        self._temporary_path_counter += 1
        unique_name = '%s.%s' % (base_name, self._temporary_path_counter)
        return os.path.join(drone_utility._TEMPORARY_DIRECTORY, unique_name)
    871 
    872 
    def absolute_path(self, path, on_results_repository=False):
        """
        Resolve ``path`` against a base directory.

        With ``on_results_repository`` the base is the results repository
        directory (self._results_dir); otherwise it is the drone results
        directory under the autotest install dir.  An absolute ``path`` is
        returned unchanged by os.path.join semantics.
        """
        if on_results_repository:
            return os.path.join(self._results_dir, path)
        return os.path.join(drones.AUTOTEST_INSTALL_DIR,
                            _DRONE_RESULTS_DIR_SUFFIX, path)
    880 
    881 
    def _copy_results_helper(self, process, source_path, destination_path,
                             to_results_repository=False):
        """
        Copy ``source_path`` from the drone running ``process``.

        With ``to_results_repository`` the file is sent (best effort,
        can_fail=True) to the results drone; otherwise a copy is queued on the
        source drone itself.
        """
        logging.debug('_copy_results_helper. process: %s, source_path: %s, '
                      'destination_path: %s, to_results_repository: %s',
                      process, source_path, destination_path,
                      to_results_repository)
        src = self.absolute_path(source_path)
        dst = self.absolute_path(
                destination_path, on_results_repository=to_results_repository)
        drone = self._get_drone_for_process(process)
        if not to_results_repository:
            drone.queue_call('copy_file_or_directory', src, dst)
            return
        drone.send_file_to(self._results_drone, src, dst, can_fail=True)
    898 
    899 
    def copy_to_results_repository(self, process, source_path,
                                   destination_path=None):
        """
        Copy results from the given process at source_path to destination_path
        in the results repository.

        Nothing is copied at all unless archiving is enabled
        (ENABLE_ARCHIVING, the scheduler's enable_archiving config value).

        This will only copy the results back for Special Agent Tasks (Cleanup,
        Verify, Repair) that reside in the hosts/ subdirectory of results if
        the copy_task_results_back flag has been set to True inside
        global_config.ini.

        It will also only copy .parse.log files back to the scheduler if the
        copy_parse_log_back flag in global_config.ini has been set to True.

        @param process: process whose drone holds the source files.
        @param source_path: path relative to the drone results directory.
        @param destination_path: path relative to the results repository;
                defaults to source_path.
        """
        if not ENABLE_ARCHIVING:
            return
        copy_task_results_back = global_config.global_config.get_config_value(
                scheduler_config.CONFIG_SECTION, 'copy_task_results_back',
                type=bool)
        copy_parse_log_back = global_config.global_config.get_config_value(
                scheduler_config.CONFIG_SECTION, 'copy_parse_log_back',
                type=bool)
        special_task = source_path.startswith(HOSTS_JOB_SUBDIR)
        parse_log = source_path.endswith(PARSE_LOG)
        # Special-task results and parse logs are only copied when their
        # respective config flags opt in.
        if special_task and not copy_task_results_back:
            return
        if parse_log and not copy_parse_log_back:
            return
        # Reuse the unconditional copy path rather than duplicating it.
        self._copy_to_results_repository(process, source_path,
                                         destination_path)
    930 
    def _copy_to_results_repository(self, process, source_path,
                                    destination_path=None):
        """
        Copy results from the given process at source_path to destination_path
        in the results repository, without special task handling.

        ``destination_path`` defaults to ``source_path``.
        """
        target = source_path if destination_path is None else destination_path
        self._copy_results_helper(process, source_path, target,
                                  to_results_repository=True)
    941 
    942 
    def copy_results_on_drone(self, process, source_path, destination_path):
        """
        Copy a results path to another location on the same drone.

        The copy is delegated to _copy_results_helper() without
        ``to_results_repository``, so it stays on the drone running
        ``process``.
        """
        copy_helper = self._copy_results_helper
        copy_helper(process, source_path, destination_path)
    948 
    949 
    def _write_attached_files(self, results_dir, drone):
        """
        Queue writes on ``drone`` for every file attached to ``results_dir``.

        The attachments are popped from self._attached_files, so each set of
        attached files is written at most once.
        """
        attached_files = self._attached_files.pop(results_dir, {})
        # items() instead of iteritems(): same behavior on Python 2, and it
        # keeps working on Python 3 where iteritems() no longer exists.
        for file_path, contents in attached_files.items():
            drone.queue_call('write_to_file', self.absolute_path(file_path),
                             contents)
    955 
    956 
    def attach_file_to_execution(self, results_dir, file_contents,
                                 file_path=None):
        """
        When the process for the results directory is executed, the given file
        contents will be placed in a file on the drone.  Returns the path at
        which the file will be placed.

        A temporary path is generated when ``file_path`` is not given.  Each
        path may be attached only once per results directory.
        """
        file_path = file_path or self.get_temporary_path('attach')
        pending_files = self._attached_files.setdefault(results_dir, {})
        assert file_path not in pending_files
        pending_files[file_path] = file_contents
        return file_path
    970 
    971 
    def write_lines_to_file(self, file_path, lines, paired_with_process=None):
        """
        Write the given lines (as a list of strings) to a file.

        Each line is newline-terminated.  If paired_with_process is given, the
        file is written on the drone running that Process (path relative to
        the drone results dir); otherwise it is written to the results
        repository.
        """
        file_contents = '\n'.join(lines) + '\n'
        on_results_repository = not paired_with_process
        if on_results_repository:
            target_drone = self._results_drone
        else:
            target_drone = self._get_drone_for_process(paired_with_process)
        target_path = self.absolute_path(
                file_path, on_results_repository=on_results_repository)
        target_drone.queue_call('write_to_file', target_path, file_contents)
    989 
    990 
    991 _the_instance = None
    992 
    993 def instance():
    994     if _the_instance is None:
    995         _set_instance(DroneManager())
    996     return _the_instance
    997 
    998 
    999 def _set_instance(instance): # usable for testing
   1000     global _the_instance
   1001     _the_instance = instance
   1002