Home | History | Annotate | Download | only in scheduler
      1 #pylint: disable-msg=C0111
      2 
      3 import cPickle
      4 import logging
      5 import os
      6 import time
      7 
      8 import common
      9 from autotest_lib.scheduler import drone_utility, email_manager
     10 from autotest_lib.client.bin import local_host
     11 from autotest_lib.client.common_lib import error, global_config
     12 
     13 CONFIG = global_config.global_config
     14 AUTOTEST_INSTALL_DIR = CONFIG.get_config_value('SCHEDULER',
     15                                                'drone_installation_directory')
     16 DEFAULT_CONTAINER_PATH = CONFIG.get_config_value('AUTOSERV', 'container_path')
     17 
     18 SSP_REQUIRED = CONFIG.get_config_value('SCHEDULER', 'exit_on_failed_ssp_setup',
     19                                        default=False)
     20 
     21 class DroneUnreachable(Exception):
     22     """The drone is non-sshable."""
     23     pass
     24 
     25 
     26 class SiteDrone(object):
     27     """
     28     Attributes:
     29     * allowed_users: set of usernames allowed to use this drone.  if None,
     30             any user can use this drone.
     31     """
     32     def __init__(self, timestamp_remote_calls=True):
     33         """Instantiate an abstract drone.
     34 
     35         @param timestamp_remote_calls: If true, drone_utility is invoked with
     36             the --call_time option and the current time. Currently this is only
     37             used for testing.
     38         """
     39         self._calls = []
     40         self.hostname = None
     41         self.enabled = True
     42         self.max_processes = 0
     43         self.active_processes = 0
     44         self.allowed_users = None
     45         self._autotest_install_dir = AUTOTEST_INSTALL_DIR
     46         self._host = None
     47         self.timestamp_remote_calls = timestamp_remote_calls
     48         self._processes_to_kill = []
     49 
     50 
     51     def shutdown(self):
     52         pass
     53 
     54 
     55     @property
     56     def _drone_utility_path(self):
     57         return os.path.join(self._autotest_install_dir,
     58                             'scheduler', 'drone_utility.py')
     59 
     60 
     61     def used_capacity(self):
     62         """Gets the capacity used by this drone
     63 
     64         Returns a tuple of (percentage_full, -max_capacity). This is to aid
     65         direct comparisons, so that a 0/10 drone is considered less heavily
     66         loaded than a 0/2 drone.
     67 
     68         This value should never be used directly. It should only be used in
     69         direct comparisons using the basic comparison operators, or using the
     70         cmp() function.
     71         """
     72         if self.max_processes == 0:
     73             return (1.0, 0)
     74         return (float(self.active_processes) / self.max_processes,
     75                 -self.max_processes)
     76 
     77 
     78     def usable_by(self, user):
     79         if self.allowed_users is None:
     80             return True
     81         return user in self.allowed_users
     82 
     83 
     84     def _execute_calls_impl(self, calls):
     85         if not self._host:
     86             raise ValueError('Drone cannot execute calls without a host.')
     87         drone_utility_cmd = self._drone_utility_path
     88         if self.timestamp_remote_calls:
     89             drone_utility_cmd = '%s --call_time %s' % (
     90                     drone_utility_cmd, time.time())
     91         logging.info("Running drone_utility on %s", self.hostname)
     92         result = self._host.run('python %s' % drone_utility_cmd,
     93                                 stdin=cPickle.dumps(calls), stdout_tee=None,
     94                                 connect_timeout=300)
     95         try:
     96             return cPickle.loads(result.stdout)
     97         except Exception: # cPickle.loads can throw all kinds of exceptions
     98             logging.critical('Invalid response:\n---\n%s\n---', result.stdout)
     99             raise
    100 
    101 
    102     def _execute_calls(self, calls):
    103         return_message = self._execute_calls_impl(calls)
    104         for warning in return_message['warnings']:
    105             subject = 'Warning from drone %s' % self.hostname
    106             logging.warning(subject + '\n' + warning)
    107             email_manager.manager.enqueue_notify_email(subject, warning)
    108         return return_message['results']
    109 
    110 
    111     def get_calls(self):
    112         """Returns the calls queued against this drone.
    113 
    114         @return: A list of calls queued against the drone.
    115         """
    116         return self._calls
    117 
    118 
    119     def call(self, method, *args, **kwargs):
    120         return self._execute_calls(
    121             [drone_utility.call(method, *args, **kwargs)])
    122 
    123 
    124     def queue_call(self, method, *args, **kwargs):
    125         self._calls.append(drone_utility.call(method, *args, **kwargs))
    126 
    127 
    128     def clear_call_queue(self):
    129         self._calls = []
    130 
    131 
    132     def execute_queued_calls(self):
    133         """Execute queued calls.
    134 
    135         If there are any processes queued to kill, kill them then process the
    136         remaining queued up calls.
    137         """
    138         if self._processes_to_kill:
    139             self.queue_call('kill_processes', self._processes_to_kill)
    140         self.clear_processes_to_kill()
    141 
    142         if not self._calls:
    143             return
    144         results = self._execute_calls(self._calls)
    145         self.clear_call_queue()
    146         return results
    147 
    148 
    149     def set_autotest_install_dir(self, path):
    150         pass
    151 
    152 
    153     def queue_kill_process(self, process):
    154         """Queue a process to kill/abort.
    155 
    156         @param process: Process to kill/abort.
    157         """
    158         self._processes_to_kill.append(process)
    159 
    160 
    161     def clear_processes_to_kill(self):
    162         """Reset the list of processes to kill for this tick."""
    163         self._processes_to_kill = []
    164 
    165 
    166 class _AbstractDrone(SiteDrone):
    167     pass
    168 
    169 
    170 class _LocalDrone(_AbstractDrone):
    171     def __init__(self, timestamp_remote_calls=True):
    172         super(_LocalDrone, self).__init__(
    173                 timestamp_remote_calls=timestamp_remote_calls)
    174         self.hostname = 'localhost'
    175         self._host = local_host.LocalHost()
    176 
    177 
    178     def send_file_to(self, drone, source_path, destination_path,
    179                      can_fail=False):
    180         if drone.hostname == self.hostname:
    181             self.queue_call('copy_file_or_directory', source_path,
    182                             destination_path)
    183         else:
    184             self.queue_call('send_file_to', drone.hostname, source_path,
    185                             destination_path, can_fail)
    186 
    187 
    188 class _RemoteDrone(_AbstractDrone):
    189     def __init__(self, hostname, timestamp_remote_calls=True):
    190         super(_RemoteDrone, self).__init__(
    191                 timestamp_remote_calls=timestamp_remote_calls)
    192         self.hostname = hostname
    193         self._host = drone_utility.create_host(hostname)
    194         if not self._host.is_up():
    195             logging.error('Drone %s is unpingable, kicking out', hostname)
    196             raise DroneUnreachable
    197 
    198 
    199     def set_autotest_install_dir(self, path):
    200         self._autotest_install_dir = path
    201 
    202 
    203     def shutdown(self):
    204         super(_RemoteDrone, self).shutdown()
    205         self._host.close()
    206 
    207 
    208     def send_file_to(self, drone, source_path, destination_path,
    209                      can_fail=False):
    210         if drone.hostname == self.hostname:
    211             self.queue_call('copy_file_or_directory', source_path,
    212                             destination_path)
    213         elif isinstance(drone, _LocalDrone):
    214             drone.queue_call('get_file_from', self.hostname, source_path,
    215                              destination_path)
    216         else:
    217             self.queue_call('send_file_to', drone.hostname, source_path,
    218                             destination_path, can_fail)
    219 
    220 
    221 def get_drone(hostname):
    222     """
    223     Use this factory method to get drone objects.
    224     """
    225     if hostname == 'localhost':
    226         return _LocalDrone()
    227     try:
    228         return _RemoteDrone(hostname)
    229     except DroneUnreachable:
    230         return None
    231