Home | History | Annotate | Download | only in scheduler
      1 import heapq
      2 import os
      3 import logging
      4 
      5 import common
      6 from autotest_lib.client.common_lib import error
      7 from autotest_lib.client.common_lib import global_config
      8 from autotest_lib.client.common_lib import utils
      9 from autotest_lib.scheduler import drone_task_queue
     10 from autotest_lib.scheduler import drone_utility
     11 from autotest_lib.scheduler import drones
     12 from autotest_lib.scheduler import scheduler_config
     13 from autotest_lib.scheduler import thread_lib
     14 
     15 try:
     16     from chromite.lib import metrics
     17 except ImportError:
     18     metrics = utils.metrics_mock
     19 
     20 
     21 # results on drones will be placed under the drone_installation_directory in a
     22 # directory with this name
     23 _DRONE_RESULTS_DIR_SUFFIX = 'results'
     24 
     25 WORKING_DIRECTORY = object() # see execute_command()
     26 
     27 
     28 AUTOSERV_PID_FILE = '.autoserv_execute'
     29 CRASHINFO_PID_FILE = '.collect_crashinfo_execute'
     30 PARSER_PID_FILE = '.parser_execute'
     31 ARCHIVER_PID_FILE = '.archiver_execute'
     32 
     33 ALL_PIDFILE_NAMES = (AUTOSERV_PID_FILE, CRASHINFO_PID_FILE, PARSER_PID_FILE,
     34                      ARCHIVER_PID_FILE)
     35 
     36 _THREADED_DRONE_MANAGER = global_config.global_config.get_config_value(
     37         scheduler_config.CONFIG_SECTION, 'threaded_drone_manager',
     38         type=bool, default=True)
     39 
     40 HOSTS_JOB_SUBDIR = 'hosts/'
     41 PARSE_LOG = '.parse.log'
     42 ENABLE_ARCHIVING =  global_config.global_config.get_config_value(
     43         scheduler_config.CONFIG_SECTION, 'enable_archiving', type=bool)
     44 
     45 
     46 class DroneManagerError(Exception):
     47     pass
     48 
     49 
     50 class CustomEquals(object):
     51     def _id(self):
     52         raise NotImplementedError
     53 
     54 
     55     def __eq__(self, other):
     56         if not isinstance(other, type(self)):
     57             return NotImplemented
     58         return self._id() == other._id()
     59 
     60 
     61     def __ne__(self, other):
     62         return not self == other
     63 
     64 
     65     def __hash__(self):
     66         return hash(self._id())
     67 
     68 
     69 class Process(CustomEquals):
     70     def __init__(self, hostname, pid, ppid=None):
     71         self.hostname = hostname
     72         self.pid = pid
     73         self.ppid = ppid
     74 
     75     def _id(self):
     76         return (self.hostname, self.pid)
     77 
     78 
     79     def __str__(self):
     80         return '%s/%s' % (self.hostname, self.pid)
     81 
     82 
     83     def __repr__(self):
     84         return super(Process, self).__repr__() + '<%s>' % self
     85 
     86 
     87 class PidfileId(CustomEquals):
     88     def __init__(self, path):
     89         self.path = path
     90 
     91 
     92     def _id(self):
     93         return self.path
     94 
     95 
     96     def __str__(self):
     97         return str(self.path)
     98 
     99 
    100 class _PidfileInfo(object):
    101     age = 0
    102     num_processes = None
    103 
    104 
    105 class PidfileContents(object):
    106     process = None
    107     exit_status = None
    108     num_tests_failed = None
    109 
    110     def is_invalid(self):
    111         return False
    112 
    113 
    114     def is_running(self):
    115         return self.process and not self.exit_status
    116 
    117 
    118 class InvalidPidfile(object):
    119     process = None
    120     exit_status = None
    121     num_tests_failed = None
    122 
    123 
    124     def __init__(self, error):
    125         self.error = error
    126 
    127 
    128     def is_invalid(self):
    129         return True
    130 
    131 
    132     def is_running(self):
    133         return False
    134 
    135 
    136     def __str__(self):
    137         return self.error
    138 
    139 
    140 class _DroneHeapWrapper(object):
    141     """Wrapper to compare drones based on used_capacity().
    142 
    143     These objects can be used to keep a heap of drones by capacity.
    144     """
    145     def __init__(self, drone):
    146         self.drone = drone
    147 
    148 
    149     def __cmp__(self, other):
    150         assert isinstance(other, _DroneHeapWrapper)
    151         return cmp(self.drone.used_capacity(), other.drone.used_capacity())
    152 
    153 
    154 class DroneManager(object):
    """
    This class acts as an interface from the scheduler to drones, whether it be
    only a single "drone" for localhost or multiple remote drones.

    All paths going into and out of this class are relative to the full results
    directory, except for those returned by absolute_path().
    """


    # Minimum time (in seconds) to wait before the next email about a drone
    # hitting its process limit is sent.
    NOTIFY_INTERVAL = 60 * 60 * 24 # one day
    # Prefix used when naming this manager's task queues.
    _STATS_KEY = 'drone_manager'
    168 
    169 
    170 
    def __init__(self):
        # Absolute path of the base results directory (set by initialize()).
        self._results_dir = None
        # Hostname -> Drone object for every successfully added drone.
        self._drones = {}
        # Drone hosting the results repository (set by initialize()).
        self._results_drone = None
        # heapq-ordered list of _DroneHeapWrapper objects, least used first.
        self._drone_queue = []

        # Process objects seen during the last refresh.
        self._process_set = set()
        # (hostname, pid) -> process info for every process on every drone.
        self._all_processes = {}
        # PidfileId -> PidfileContents (or InvalidPidfile) from the refresh.
        self._pidfiles = {}
        # Same mapping, built from the second read of each pidfile.
        self._pidfiles_second_read = {}
        # PidfileId -> _PidfileInfo for every registered pidfile.
        self._registered_pidfile_info = {}

        # Counter used to generate unique temporary paths.
        self._temporary_path_counter = 0
        # Results dir -> {file path: contents} of attached files.
        self._attached_files = {}

        # Task queue used to refresh drones asynchronously.
        if _THREADED_DRONE_MANAGER:
            refresh_queue = thread_lib.ThreadedTaskQueue(
                    name='%s.refresh_queue' % self._STATS_KEY)
        else:
            refresh_queue = drone_task_queue.DroneTaskQueue()
        self._refresh_task_queue = refresh_queue
    199 
    200 
    def initialize(self, base_results_dir, drone_hostnames,
                   results_repository_hostname):
        """Add the drones and set up the results repository drone.

        @param base_results_dir: Absolute path of the base results directory.
        @param drone_hostnames: Hostnames of the drones to add.
        @param results_repository_hostname: Hostname of the results repository.

        @raises DroneManagerError: if no drone initialized successfully.
        """
        self._results_dir = base_results_dir

        for name in drone_hostnames:
            self._add_drone(name)
        if not self._drones:
            # Every drone failed to initialize.
            raise DroneManagerError('No valid drones found')

        self.refresh_drone_configs()

        logging.info('Using results repository on %s',
                     results_repository_hostname)
        self._results_drone = drones.get_drone(results_repository_hostname)

        install_dir = global_config.global_config.get_config_value(
                scheduler_config.CONFIG_SECTION,
                'results_host_installation_directory', default=None)
        if install_dir:
            self._results_drone.set_autotest_install_dir(install_dir)
        # The results drone is deliberately not initialize()d: we don't want
        # to clear out any directories and we don't need to kill any processes.
    225 
    226 
    def reinitialize_drones(self):
        """Re-run the 'initialize' call on every drone, timing each one."""
        for each_drone in self.get_drones():
            timer = metrics.SecondsTimer(
                    'chromeos/autotest/drone_manager/'
                    'reinitialize_drones_duration',
                    fields={'drone': each_drone.hostname})
            with timer:
                each_drone.call('initialize', self._results_dir)
    234 
    235 
    def shutdown(self):
        """Call shutdown() on every managed drone."""
        for each_drone in self.get_drones():
            each_drone.shutdown()
    239 
    240 
    def _get_max_pidfile_refreshes(self):
        """Return how many refresh() calls a registered pidfile may survive.

        Normally refresh() is called on every monitor_db.Dispatcher.tick().
        The value comes from the scheduler config key 'max_pidfile_refreshes'
        and defaults to 2000.

        @returns: The number of refresh() calls before we forget a pidfile.
        """
        return global_config.global_config.get_config_value(
                scheduler_config.CONFIG_SECTION, 'max_pidfile_refreshes',
                type=int, default=2000)
    251 
    252 
    def _add_drone(self, hostname):
        """
        Add drone.

        Catches AutoservRunError if the drone fails initialization and does not
        add it to the list of usable drones. A hostname for which
        drones.get_drone() returns nothing is skipped with a warning.

        @param hostname: Hostname of the drone we are trying to add.
        """
        # Pass hostname as a logging argument so formatting is deferred.
        logging.info('Adding drone %s', hostname)
        drone = drones.get_drone(hostname)
        if not drone:
            logging.warning('No drone object returned for %s; not adding it',
                            hostname)
            return
        try:
            drone.call('initialize', self.absolute_path(''))
        except error.AutoservRunError as e:
            logging.error('Failed to initialize drone %s with error: %s',
                          hostname, e)
            return
        self._drones[drone.hostname] = drone
    272 
    273 
    def _remove_drone(self, hostname):
        """Forget the drone named |hostname|; a no-op if it is unknown."""
        if hostname in self._drones:
            del self._drones[hostname]
    276 
    277 
    def refresh_drone_configs(self):
        """
        Reread global config options for all drones.

        For each drone, enabled / max_processes / allowed_users are taken from
        the server database attributes when server_manager_utils reports that
        the server DB is in use, and from the scheduler config section
        otherwise. Afterwards the drone queue is re-heapified and the record
        of process-limit notifications is cleared.
        """
        # Import server_manager_utils is delayed rather than at the beginning of
        # this module. The reason is that test_that imports drone_manager when
        # importing autoserv_utils. The import is done before test_that setup
        # django (test_that only setup django in setup_local_afe, since it's
        # not needed when test_that runs the test in a lab duts through :lab:
        # option. Therefore, if server_manager_utils is imported at the
        # beginning of this module, test_that will fail since django is not
        # setup yet.
        from autotest_lib.site_utils import server_manager_utils
        config = global_config.global_config
        section = scheduler_config.CONFIG_SECTION
        config.parse_config_file()
        # use_server_db() takes no per-drone input, so evaluate it once
        # instead of once per drone.
        use_server_db = server_manager_utils.use_server_db()
        for hostname, drone in self._drones.items():
            if use_server_db:
                server = server_manager_utils.get_servers(hostname=hostname)[0]
                attributes = {a.attribute: a.value
                              for a in server.attributes.all()}
                drone.enabled = (
                        int(attributes.get('disabled', 0)) == 0)
                drone.max_processes = int(
                        attributes.get(
                            'max_processes',
                            scheduler_config.config.max_processes_per_drone))
                allowed_users = attributes.get('users', None)
            else:
                disabled = config.get_config_value(
                        section, '%s_disabled' % hostname, default='')
                drone.enabled = not bool(disabled)
                drone.max_processes = config.get_config_value(
                        section, '%s_max_processes' % hostname, type=int,
                        default=scheduler_config.config.max_processes_per_drone)

                allowed_users = config.get_config_value(
                        section, '%s_users' % hostname, default=None)
            if allowed_users:
                drone.allowed_users = set(allowed_users.split())
            else:
                drone.allowed_users = None
            logging.info('Drone %s.max_processes: %s', hostname,
                         drone.max_processes)
            logging.info('Drone %s.enabled: %s', hostname, drone.enabled)
            logging.info('Drone %s.allowed_users: %s', hostname,
                         drone.allowed_users)

        self._reorder_drone_queue() # max_processes may have changed
        # Clear notification record about reaching max_processes limit.
        self._notify_record = {}
    329 
    330 
    def get_drones(self):
        """Return a lazy iterator over every managed Drone object."""
        drone_map = self._drones
        return drone_map.itervalues()
    333 
    334 
    def cleanup_orphaned_containers(self):
        """Queue a cleanup_orphaned_containers call on every drone.

        The call is only queued here (via queue_call); presumably it runs
        the next time queued calls are executed.
        """
        for each_drone in self._drones.values():
            logging.info('Queue cleanup_orphaned_containers at %s',
                         each_drone.hostname)
            each_drone.queue_call('cleanup_orphaned_containers')
    342 
    343 
    def _get_drone_for_process(self, process):
        """Return the Drone hosting |process| (KeyError if host is unknown)."""
        hostname = process.hostname
        return self._drones[hostname]
    346 
    347 
    def _get_drone_for_pidfile_id(self, pidfile_id):
        """Return the Drone running the process recorded in a pidfile.

        @param pidfile_id: PidfileId whose contents name the process.
        @raises DroneManagerError: if the pidfile records no process.
        """
        process = self.get_pidfile_contents(pidfile_id).process
        if process is None:
            raise DroneManagerError('Fail to get a drone due to empty pidfile')
        return self._get_drone_for_process(process)
    353 
    354 
    def get_drone_for_pidfile_id(self, pidfile_id):
        """Public API for luciferlib.

        Public wrapper around _get_drone_for_pidfile_id().

        @param pidfile_id: PidfileId instance.
        @returns: the Drone running the process recorded in the pidfile.
        @raises DroneManagerError: if the pidfile records no process.
        """
        found = self._get_drone_for_pidfile_id(pidfile_id)
        return found
    361 
    362 
    def _drop_old_pidfiles(self):
        """Age registered pidfiles and unregister those that got too old.

        A pidfile whose age already exceeds _get_max_pidfile_refreshes() is
        logged as leaked and unregistered; every other one has its age bumped.
        """
        # The limit is read from the global config and does not depend on the
        # pidfile being examined, so read it once.
        max_refreshes = self._get_max_pidfile_refreshes()
        # Iterate over a snapshot: unregister_pidfile() modifies the dict, and
        # on Python 3 items() is a live view rather than a list.
        for pidfile_id, info in list(self._registered_pidfile_info.items()):
            if info.age > max_refreshes:
                logging.warning('dropping leaked pidfile %s', pidfile_id)
                self.unregister_pidfile(pidfile_id)
            else:
                info.age += 1
    371 
    372 
    def _reset(self):
        """Clear the state rebuilt by each refresh cycle."""
        self._drone_queue = []
        self._pidfiles_second_read = {}
        self._pidfiles = {}
        self._all_processes = {}
        self._process_set = set()
    379 
    380 
    def _parse_pidfile(self, drone, raw_contents):
        """Parse raw pidfile contents.

        @param drone: The drone on which this pidfile was found.
        @param raw_contents: The raw contents of a pidfile, one value per
            line: pid, exit_status, num_tests_failed.

        @returns: A PidfileContents: empty if raw_contents is empty, with only
            `process` set if the exit status has not been written yet, or
            fully populated. An InvalidPidfile if there are more than three
            lines or a value is not an integer.
        """
        contents = PidfileContents()
        if not raw_contents:
            return contents
        lines = raw_contents.splitlines()
        if len(lines) > 3:
            return InvalidPidfile('Corrupt pid file (%d lines):\n%s' %
                                  (len(lines), lines))
        try:
            pid = int(lines[0])
            contents.process = Process(drone.hostname, pid)
            # if len(lines) == 2, assume we caught Autoserv between writing
            # exit_status and num_failed_tests, so just ignore it and wait for
            # the next cycle
            if len(lines) == 3:
                contents.exit_status = int(lines[1])
                contents.num_tests_failed = int(lines[2])
        except ValueError as exc:
            # "except X as e" works on Python 2.6+ and 3; "except X, e" is a
            # SyntaxError on Python 3.
            return InvalidPidfile('Corrupt pid file: ' + str(exc.args))

        return contents
    408 
    409 
    def _process_pidfiles(self, drone, pidfiles, store_in_dict):
        """Parse each raw pidfile and store the result keyed by PidfileId.

        @param drone: Drone the pidfiles were read from.
        @param pidfiles: dict mapping pidfile path to raw contents.
        @param store_in_dict: dict filled with PidfileId -> parsed contents.
        """
        for path, raw in pidfiles.items():
            key = PidfileId(path)
            store_in_dict[key] = self._parse_pidfile(drone, raw)
    415 
    416 
    def _add_process(self, drone, process_info):
        """Record a process reported by |drone| in the process set.

        @param process_info: mapping with at least 'pid' and 'ppid' entries.
        """
        pid = int(process_info['pid'])
        ppid = int(process_info['ppid'])
        self._process_set.add(Process(drone.hostname, pid, ppid))
    421 
    422 
    def _add_autoserv_process(self, drone, process_info):
        """Record an autoserv process, but only a root (pgid == pid) one."""
        assert process_info['comm'] == 'autoserv'
        # Only root autoserv processes have pgid == pid.
        if process_info['pgid'] == process_info['pid']:
            self._add_process(drone, process_info)
    429 
    430 
    def _enqueue_drone(self, drone):
        """Push |drone| onto the used-capacity-ordered drone heap."""
        wrapper = _DroneHeapWrapper(drone)
        heapq.heappush(self._drone_queue, wrapper)
    433 
    434 
    def _reorder_drone_queue(self):
        """Restore the heap order after drone capacities may have changed."""
        queue = self._drone_queue
        heapq.heapify(queue)
    437 
    438 
    def reorder_drone_queue(self):
        """Reorder drone queue according to modified process counts.

        Public wrapper around _reorder_drone_queue(), exposed for luciferlib
        to wrap.
        """
        return self._reorder_drone_queue()
    445 
    446 
    def _compute_active_processes(self, drone):
        """Recompute drone.active_processes from the latest pidfile contents.

        Sums num_processes over registered pidfiles whose process runs on
        |drone| and has no exit status yet, stores the total on the drone and
        reports it as a gauge metric.

        @param drone: Drone whose active_processes attribute is updated.
        """
        active = 0
        for pidfile_id, contents in self._pidfiles.items():
            if contents.exit_status is not None:
                continue
            process = contents.process
            if not process or process.hostname != drone.hostname:
                continue
            # The pidfile may have been unregistered after trigger_refresh()
            # snapshotted the registered paths; skip it instead of crashing
            # the refresh with a KeyError.
            info = self._registered_pidfile_info.get(pidfile_id)
            if info is not None and info.num_processes is not None:
                active += info.num_processes
        drone.active_processes = active

        metrics.Gauge('chromeos/autotest/drone/active_processes').set(
                drone.active_processes,
                fields={'drone_hostname': drone.hostname})
    461 
    462 
    def _check_drone_process_limit(self, drone):
        """
        Record how close |drone| is to its process limit as a metric.

        The value reported is active_processes / max_processes (1.0 means the
        drone is exactly at its limit), or 100 when max_processes is zero.

        @param drone: A Drone object.
        """
        try:
            fraction = float(drone.active_processes) / drone.max_processes
        except ZeroDivisionError:
            # NOTE(review): normal values are fractions, yet a zero limit
            # reports 100; confirm which scale the metric consumers expect.
            fraction = 100
        gauge = metrics.Float(
                'chromeos/autotest/drone/active_process_percentage')
        gauge.set(fraction, fields={'drone_hostname': drone.hostname})
    475 
    def trigger_refresh(self):
        """Triggers a drone manager refresh.

        Clears per-refresh state, ages out leaked pidfiles, queues a 'refresh'
        call (with the registered pidfile paths) on every drone and starts
        executing them without waiting; sync_refresh() collects the results.

        @raises DroneManagerError: If a drone has un-executed calls.
            Since they will get clobbered when we queue refresh calls.
        """
        self._reset()
        self._drop_old_pidfiles()
        pidfile_paths = [registered.path
                         for registered in self._registered_pidfile_info]
        # Named drone_list so the local does not shadow the drones module.
        drone_list = list(self.get_drones())
        for each_drone in drone_list:
            pending = each_drone.get_calls()
            if pending:
                raise DroneManagerError('Drone %s has un-executed calls: %s '
                                        'which might get corrupted through '
                                        'this invocation' %
                                        (each_drone,
                                         [str(call) for call in pending]))
            each_drone.queue_call('refresh', pidfile_paths)
        logging.info("Invoking drone refresh.")
        with metrics.SecondsTimer(
                'chromeos/autotest/drone_manager/trigger_refresh_duration'):
            self._refresh_task_queue.execute(drone_list, wait=False)
    499 
    500 
    def sync_refresh(self):
        """Complete the drone refresh started by trigger_refresh.

        Waits for all drone threads then refreshes internal datastructures
        with drone process information: the process set, the all-processes
        map, both pidfile maps, each drone's active process count and, for
        enabled drones, the drone queue.
        """

        # This gives us a dictionary like what follows:
        # {drone: [{'pidfiles': (raw contents of pidfile paths),
        #           'autoserv_processes': (autoserv process info from ps),
        #           'all_processes': (all process info from ps),
        #           'parse_processes': (parse process info from ps),
        #           'pidfiles_second_read': (pidfile contents, again),}]
        #   drone2: ...}
        # The values of each drone are only a list because this adheres to the
        # drone utility interface (each call is executed and its results are
        # placed in a list, but since we never couple the refresh calls with
        # any other call, this list will always contain a single dict).
        with metrics.SecondsTimer(
                'chromeos/autotest/drone_manager/sync_refresh_duration'):
            all_results = self._refresh_task_queue.get_results()
        logging.info("Drones refreshed.")

        # The loop below goes through and parses pidfile contents. Pidfiles
        # are used to track autoserv execution, and will contain at most 3
        # lines: pid, exit code, number of failed tests. Each pidfile
        # is identified by a PidfileId object, which contains a unique pidfile
        # path (unique because it contains the job id) making it hashable.
        # All pidfiles are stored in the drone manager's _pidfiles dict as:
        #   {pidfile_id: pidfile_contents(Process(drone, pid),
        #                                 exit_code, num_tests_failed)}
        # In handle agents, each agent knows its pidfile_id, and uses this
        # to retrieve the refreshed contents of its pidfile via the
        # PidfileRunMonitor (through its tick) before making decisions. If
        # the agent notices that its process has exited, it unregisters the
        # pidfile from the drone manager's _registered_pidfile_info dict
        # through its epilog.
        for drone, results_list in all_results.items():
            results = results_list[0]

            for process_info in results['all_processes']:
                if process_info['comm'] == 'autoserv':
                    self._add_autoserv_process(drone, process_info)
                drone_pid = drone.hostname, int(process_info['pid'])
                self._all_processes[drone_pid] = process_info

            for process_info in results['parse_processes']:
                self._add_process(drone, process_info)

            self._process_pidfiles(drone, results['pidfiles'], self._pidfiles)
            self._process_pidfiles(drone, results['pidfiles_second_read'],
                                   self._pidfiles_second_read)

            self._compute_active_processes(drone)
            # Only enabled drones are put back in the queue for new work.
            if drone.enabled:
                self._enqueue_drone(drone)
                self._check_drone_process_limit(drone)
    559 
    560 
    def refresh(self):
        """Refresh all drones: trigger_refresh() then sync_refresh(), timed."""
        timer = metrics.SecondsTimer(
                'chromeos/autotest/drone_manager/refresh_duration')
        with timer:
            self.trigger_refresh()
            self.sync_refresh()
    567 
    568 
    @metrics.SecondsTimerDecorator(
        'chromeos/autotest/drone_manager/execute_actions_duration')
    def execute_actions(self):
        """
        Called at the end of a scheduler cycle to execute all queued actions
        on drones.

        Queued calls on the regular drones are executed (through a threaded
        task queue when _THREADED_DRONE_MANAGER is set) and waited for. Then
        the results drone's queued calls run; if that raises AutoservError, a
        failure counter is incremented and its call queue is cleared.
        """
        all_drones = self._drones.values()
        if _THREADED_DRONE_MANAGER:
            task_queue = thread_lib.ThreadedTaskQueue(
                    name='%s.execute_queue' % self._STATS_KEY)
        else:
            task_queue = drone_task_queue.DroneTaskQueue()
        # Invoke calls queued on all drones since the last call to execute
        # and wait for them to return.
        task_queue.execute(all_drones)

        results_drone = self._results_drone
        try:
            results_drone.execute_queued_calls()
        except error.AutoservError:
            metric_name = 'chromeos/autotest/errors/results_repository_failed'
            metrics.Counter(metric_name).increment(
                    fields={'drone_hostname': results_drone.hostname})
            results_drone.clear_call_queue()
    592 
    593 
    def get_orphaned_autoserv_processes(self):
        """
        Returns a set of Process objects for orphaned processes only.

        A process counts as orphaned when its parent pid is 1.
        """
        return {proc for proc in self._process_set if proc.ppid == 1}
    600 
    601 
    def kill_process(self, process):
        """
        Kill the given process.

        The kill is handed to the hosting drone via queue_kill_process().
        """
        logging.info('killing %s', process)
        self._get_drone_for_process(process).queue_kill_process(process)
    609 
    610 
    def _ensure_directory_exists(self, path):
        """Create |path| (and any missing parents) unless it already exists.

        Safe against another process creating the path concurrently.

        @param path: Directory path to create.
        @raises OSError: if the path cannot be created for any reason other
            than it having appeared in the meantime.
        """
        import errno
        if os.path.exists(path):
            return
        try:
            os.makedirs(path)
        except OSError as e:
            # Someone else may have created it after the exists() check above;
            # that is fine, anything else is a real failure.
            if e.errno != errno.EEXIST or not os.path.exists(path):
                raise
    614 
    615 
    def total_running_processes(self):
        """Return the sum of active_processes across all drones."""
        total = 0
        for each_drone in self.get_drones():
            total += each_drone.active_processes
        return total
    618 
    619 
    def max_runnable_processes(self, username, drone_hostnames_allowed):
        """
        Return the maximum number of processes that can be run (in a single
        execution) given the current load on drones.

        Only queued drones usable by |username| (and, when given, listed in
        |drone_hostnames_allowed|) are considered. The result is the largest
        free capacity among them, and never less than 0.

        @param username: login of user to run a process.  may be None.
        @param drone_hostnames_allowed: list of drones that can be used. May be
                                        None
        """
        best = 0
        for wrapper in self._drone_queue:
            candidate = wrapper.drone
            if not candidate.usable_by(username):
                continue
            if (drone_hostnames_allowed is not None and
                    candidate.hostname not in drone_hostnames_allowed):
                continue
            free = candidate.max_processes - candidate.active_processes
            best = max(best, free)
        return best
    640 
    641 
    def _least_loaded_drone(self, drones):
        """Return the drone in |drones| with the smallest used_capacity()."""
        def capacity_of(candidate):
            return candidate.used_capacity()
        return min(drones, key=capacity_of)
    644 
    645 
    def pick_drone_to_use(self, num_processes=1):
        """Return a drone to use.

        num_processes is the number of processes the drone is intended
        to run. No user or hostname restriction is applied, so every drone in
        the queue is a candidate.

        This public API is exposed for luciferlib to wrap.

        Returns a drone instance (see drones.py).
        """
        # username=None and drone_hostnames_allowed=None: always allow all
        # drones.
        chosen = self._choose_drone_for_execution(
                num_processes=num_processes,
                username=None,
                drone_hostnames_allowed=None)
        return chosen
    663 
    664 
    def _choose_drone_for_execution(self, num_processes, username,
                                    drone_hostnames_allowed):
        """Choose a drone to execute command.

        Drones are popped from the queue in order of increasing used capacity.
        The first one usable by |username|, allowed by
        |drone_hostnames_allowed| and with room for |num_processes| more
        processes is chosen. If none has room, the least loaded usable drone is
        chosen and an error is logged. Every popped drone is pushed back onto
        the queue before returning, even if an exception is raised.

        @param num_processes: Number of processes needed for execution.
        @param username: Name of the user to execute the command.
        @param drone_hostnames_allowed: A list of names of drone allowed.

        @return: A drone object to be used for execution, or None if no drone
            is usable.
        """
        # Cycle through drones in order of increasing used capacity until
        # we find one that can handle these processes.
        checked_drones = []
        usable_drones = []
        drone_to_use = None
        try:
            while self._drone_queue:
                drone = heapq.heappop(self._drone_queue).drone
                checked_drones.append(drone)
                logging.info('Checking drone %s', drone.hostname)
                if not drone.usable_by(username):
                    continue

                drone_allowed = (drone_hostnames_allowed is None
                                 or drone.hostname in drone_hostnames_allowed)
                if not drone_allowed:
                    logging.debug('Drone %s not allowed: ', drone.hostname)
                    continue

                usable_drones.append(drone)

                if (drone.active_processes + num_processes
                        <= drone.max_processes):
                    drone_to_use = drone
                    break
                logging.info('Drone %s has %d active + %s requested > %s max',
                             drone.hostname, drone.active_processes,
                             num_processes, drone.max_processes)

            if not drone_to_use and usable_drones:
                # Drones are all over loaded, pick the one with least load.
                drone_summary = ','.join('%s %s/%s' % (d.hostname,
                                                       d.active_processes,
                                                       d.max_processes)
                                         for d in usable_drones)
                logging.error('No drone has capacity to handle %d processes '
                              '(%s) for user %s', num_processes, drone_summary,
                              username)
                drone_to_use = self._least_loaded_drone(usable_drones)
        finally:
            # Refill _drone_queue with every drone popped above, even if an
            # exception escaped, so drones are never lost from scheduling.
            for drone in checked_drones:
                self._enqueue_drone(drone)

        return drone_to_use
    717 
    718 
    def _substitute_working_directory_into_command(self, command,
                                                   working_directory):
        """Replace, in place, each WORKING_DIRECTORY sentinel in |command|.

        @param command: argv list; modified in place.
        @param working_directory: value stored wherever the sentinel appears.
        """
        positions = [index for index, item in enumerate(command)
                     if item is WORKING_DIRECTORY]
        for index in positions:
            command[index] = working_directory
    724 
    725 
    def execute_command(self, command, working_directory, pidfile_name,
                        num_processes, log_file=None, paired_with_pidfile=None,
                        username=None, drone_hostnames_allowed=None):
        """
        Run an argv-style command on a drone and start monitoring its pidfile.

        @param command: the argv list to run.  Every element that is the
                WORKING_DIRECTORY sentinel is replaced, in place, by the
                absolute working directory before the command is queued.
        @param working_directory: directory (resolved through
                absolute_path()) in which the pidfile will be written.  The
                unresolved value is also the key under which files were
                registered with attach_file_to_execution().
        @param pidfile_name: name of the pidfile the process will write.
        @param num_processes: number of processes this execution is charged
                to the chosen drone's active process count.
        @param log_file (optional): path for the command output, resolved
                through absolute_path(); a fresh temporary path is used when
                omitted.
        @param paired_with_pidfile (optional): a PidfileId of an
                already-executed process; the new command runs on that same
                drone.
        @param username (optional): login of the user responsible for this
                process; passed to drone selection.
        @param drone_hostnames_allowed (optional): hostnames of the drones the
                command may run on; passed to drone selection.

        @returns the PidfileId of the new process's pidfile.
        @raises DroneManagerError: if no drone could be found for the command.
        """
        abs_workdir = self.absolute_path(working_directory)
        abs_log_file = self.absolute_path(
                log_file or self.get_temporary_path('execute'))
        self._substitute_working_directory_into_command(command, abs_workdir)

        if paired_with_pidfile:
            # Must run where the paired process ran.
            drone = self._get_drone_for_pidfile_id(paired_with_pidfile)
        else:
            drone = self._choose_drone_for_execution(
                    num_processes, username, drone_hostnames_allowed)
        if not drone:
            raise DroneManagerError('command failed; no drones available: %s'
                                    % command)

        logging.info("command = %s", command)
        logging.info('log file = %s:%s', drone.hostname, abs_log_file)
        # Attachments are keyed by the unresolved working directory.
        self._write_attached_files(working_directory, drone)
        drone.queue_call('execute_command', command, abs_workdir,
                         abs_log_file, pidfile_name)
        # Charge the new processes to the drone, then re-sort the drone queue.
        drone.active_processes += num_processes
        self._reorder_drone_queue()

        pidfile_id = PidfileId(os.path.join(abs_workdir, pidfile_name))
        self.register_pidfile(pidfile_id)
        self._registered_pidfile_info[pidfile_id].num_processes = num_processes
        return pidfile_id
    781 
    782 
    def get_pidfile_id_from(self, execution_tag, pidfile_name):
        """
        Build the PidfileId for pidfile_name inside the execution_tag directory.

        @param execution_tag: results directory, resolved via absolute_path().
        @param pidfile_name: name of the pidfile within that directory.
        @returns a PidfileId wrapping the joined absolute path.
        """
        execution_dir = self.absolute_path(execution_tag)
        return PidfileId(os.path.join(execution_dir, pidfile_name))
    786 
    787 
    def register_pidfile(self, pidfile_id):
        """
        Start (or keep) watching pidfile_id on refresh and reset its age.

        A new _PidfileInfo entry is created only the first time a pidfile is
        registered; re-registering an existing one just resets its age.
        """
        registry = self._registered_pidfile_info
        if pidfile_id not in registry:
            logging.info('monitoring pidfile %s', pidfile_id)
            registry[pidfile_id] = _PidfileInfo()
        self._reset_pidfile_age(pidfile_id)
    797 
    798 
    def _reset_pidfile_age(self, pidfile_id):
        """
        Set the age of a registered pidfile back to 0; unknown ids are ignored.
        """
        if pidfile_id not in self._registered_pidfile_info:
            return
        self._registered_pidfile_info[pidfile_id].age = 0
    802 
    803 
    def unregister_pidfile(self, pidfile_id):
        """
        Stop watching pidfile_id; a no-op if it is not registered.
        """
        if pidfile_id not in self._registered_pidfile_info:
            return
        logging.info('forgetting pidfile %s', pidfile_id)
        del self._registered_pidfile_info[pidfile_id]
    808 
    809 
    def declare_process_count(self, pidfile_id, num_processes):
        """
        Record how many processes the registered pidfile_id accounts for.

        Raises KeyError if pidfile_id has not been registered.
        """
        info = self._registered_pidfile_info[pidfile_id]
        info.num_processes = num_processes
    812 
    813 
    def get_pidfile_contents(self, pidfile_id, use_second_read=False):
        """
        Return the PidfileContents read for pidfile_id.

        Also resets the pidfile's age.  With use_second_read, the results read
        after the processes were checked are used instead of those read
        before.  A default-constructed PidfileContents is returned when
        nothing was read for pidfile_id.
        """
        self._reset_pidfile_age(pidfile_id)
        source = (self._pidfiles_second_read if use_second_read
                  else self._pidfiles)
        return source.get(pidfile_id, PidfileContents())
    826 
    827 
    def is_process_running(self, process):
        """
        Tell whether process appears in the known running processes.

        A process that is not tracked as an autoserv process but whose
        (hostname, pid) pair is in the full process listing still counts as
        running; that case is logged as an error.
        """
        if process in self._process_set:
            return True
        drone_and_pid = (process.hostname, process.pid)
        if drone_and_pid not in self._all_processes:
            return False
        logging.error('Process %s found, but not an autoserv process. '
                      'Is %s', process, self._all_processes[drone_and_pid])
        return True
    842 
    843 
    def get_temporary_path(self, base_name):
        """
        Return a fresh temporary path, unique across all drones for this
        scheduler execution.

        Uniqueness comes from a per-manager counter that is bumped on every
        call and appended to base_name.
        """
        self._temporary_path_counter += 1
        unique_name = '%s.%s' % (base_name, self._temporary_path_counter)
        return os.path.join(drone_utility._TEMPORARY_DIRECTORY, unique_name)
    852 
    853 
    def absolute_path(self, path, on_results_repository=False):
        """
        Resolve path to an absolute path.

        @param path: path to resolve; an absolute path is returned unchanged
                (os.path.join semantics).
        @param on_results_repository: if True, resolve against the results
                repository directory (self._results_dir); otherwise against
                the drones' results directory under AUTOTEST_INSTALL_DIR.
        """
        if not on_results_repository:
            return os.path.join(drones.AUTOTEST_INSTALL_DIR,
                                _DRONE_RESULTS_DIR_SUFFIX, path)
        return os.path.join(self._results_dir, path)
    861 
    862 
    def _copy_results_helper(self, process, source_path, destination_path,
                             to_results_repository=False):
        """
        Copy source_path from the drone that ran process.

        With to_results_repository, the file is sent to the results drone
        (failures tolerated via can_fail=True) and destination_path is
        resolved in the results repository.  Otherwise a copy to
        destination_path on the same drone is queued.
        """
        logging.debug('_copy_results_helper. process: %s, source_path: %s, '
                      'destination_path: %s, to_results_repository: %s',
                      process, source_path, destination_path,
                      to_results_repository)
        src = self.absolute_path(source_path)
        dst = self.absolute_path(destination_path,
                                 on_results_repository=to_results_repository)
        drone = self._get_drone_for_process(process)
        if not to_results_repository:
            drone.queue_call('copy_file_or_directory', src, dst)
            return
        drone.send_file_to(self._results_drone, src, dst, can_fail=True)
    879 
    880 
    def copy_to_results_repository(self, process, source_path,
                                   destination_path=None):
        """
        Copy results produced by process into the results repository.

        Nothing is copied when ENABLE_ARCHIVING (the enable_archiving
        scheduler setting) is off.  Otherwise:
          * paths under the hosts/ subdirectory (Special Agent Tasks such as
            Cleanup, Verify, Repair) are copied only if copy_task_results_back
            is True in global_config.ini;
          * .parse.log files are copied only if copy_parse_log_back is True in
            global_config.ini.
        Both flags are read from the scheduler config section on every call.

        @param process: the process whose drone holds the results.
        @param source_path: path of the results on that drone.
        @param destination_path: path in the results repository; defaults to
                source_path.
        """
        if not ENABLE_ARCHIVING:
            return
        config = global_config.global_config
        copy_task_results_back = config.get_config_value(
                scheduler_config.CONFIG_SECTION, 'copy_task_results_back',
                type=bool)
        copy_parse_log_back = config.get_config_value(
                scheduler_config.CONFIG_SECTION, 'copy_parse_log_back',
                type=bool)
        is_special_task = source_path.startswith(HOSTS_JOB_SUBDIR)
        is_parse_log = source_path.endswith(PARSE_LOG)
        if is_special_task and not copy_task_results_back:
            return
        if is_parse_log and not copy_parse_log_back:
            return
        if destination_path is None:
            destination_path = source_path
        self._copy_results_helper(process, source_path, destination_path,
                                  to_results_repository=True)
    911 
    def _copy_to_results_repository(self, process, source_path,
                                   destination_path=None):
        """
        Unconditionally copy results from process into the results repository.

        Unlike copy_to_results_repository(), no archiving or special-task /
        parse-log filtering is applied.  destination_path defaults to
        source_path.
        """
        target = source_path if destination_path is None else destination_path
        self._copy_results_helper(process, source_path, target,
                                  to_results_repository=True)
    922 
    923 
    def copy_results_on_drone(self, process, source_path, destination_path):
        """
        Queue a copy from source_path to destination_path on the drone that
        ran process; nothing goes to the results repository.
        """
        self._copy_results_helper(process, source_path,
                                  destination_path)
    929 
    930 
    def _write_attached_files(self, results_dir, drone):
        """
        Queue writes, on drone, of every file attached to results_dir.

        The attachments registered through attach_file_to_execution() are
        popped from the pending map, so a later call for the same
        results_dir does not queue them again.

        @param results_dir: key under which the files were attached.
        @param drone: drone that receives one 'write_to_file' call per file.
        """
        attached_files = self._attached_files.pop(results_dir, {})
        # items() instead of the Python 2-only iteritems(): identical results
        # on Python 2, and it also works on Python 3.
        for file_path, contents in attached_files.items():
            drone.queue_call('write_to_file', self.absolute_path(file_path),
                             contents)
    936 
    937 
    def attach_file_to_execution(self, results_dir, file_contents,
                                 file_path=None):
        """
        Arrange for file_contents to be written on the drone when the process
        for results_dir is executed.

        @param results_dir: results directory the file is attached to.
        @param file_contents: contents to write.
        @param file_path: where to place the file; a fresh temporary path is
                used when omitted.
        @returns the path at which the file will be placed.

        Attaching the same file_path twice for one results_dir trips an
        assert (an AssertionError when assertions are enabled).
        """
        path = file_path or self.get_temporary_path('attach')
        pending = self._attached_files.setdefault(results_dir, {})
        assert path not in pending
        pending[path] = file_contents
        return path
    951 
    952 
    def write_lines_to_file(self, file_path, lines, paired_with_process=None):
        """
        Write lines (a list of strings) to file_path, one per line.

        Each line is newline-terminated.  An empty list produces a single
        '\\n'.  With paired_with_process, the file is written on the drone
        running that Process.  Otherwise it is written to the results
        repository.
        """
        contents = '\n'.join(lines) + '\n'
        on_results_repository = not paired_with_process
        if on_results_repository:
            target_drone = self._results_drone
        else:
            target_drone = self._get_drone_for_process(paired_with_process)
        destination = self.absolute_path(
                file_path, on_results_repository=on_results_repository)
        target_drone.queue_call('write_to_file', destination, contents)
    970 
    971 
    972 _the_instance = None
    973 
    974 def instance():
    975     if _the_instance is None:
    976         _set_instance(DroneManager())
    977     return _the_instance
    978 
    979 
    980 def _set_instance(instance): # usable for testing
    981     global _the_instance
    982     _the_instance = instance
    983