#!/usr/bin/python
#pylint: disable-msg=C0111

"""Utility module that executes management commands on the drone.

1. This is the module responsible for orchestrating processes on a drone.
2. It receives instructions via stdin and replies via stdout.
3. Each invocation executes a set of batched calls.
4. The batched calls may be synchronous or asynchronous.
5. The caller is responsible for monitoring asynchronous calls through pidfiles.
"""


import argparse
import errno
import pickle, subprocess, os, shutil, sys, time, signal, getpass
import datetime, traceback, tempfile, itertools, logging
import common
from autotest_lib.client.common_lib import utils, global_config, error
from autotest_lib.client.common_lib import logging_manager
from autotest_lib.client.common_lib.cros import retry
from autotest_lib.scheduler import drone_logging_config
from autotest_lib.scheduler import email_manager, scheduler_config
from autotest_lib.server import hosts, subcommand


# An environment variable we add to the environment to enable us to
# distinguish processes we started from those that were started by
# something else during recovery.  Name credit goes to showard. ;)
DARK_MARK_ENVIRONMENT_VAR = 'AUTOTEST_SCHEDULER_DARK_MARK'

_TEMPORARY_DIRECTORY = 'drone_tmp'
_TRANSFER_FAILED_FILE = '.transfer_failed'

# script and log file for cleaning up orphaned lxc containers.
LXC_CLEANUP_SCRIPT = os.path.join(common.autotest_dir, 'site_utils',
                                  'lxc_cleanup.py')
LXC_CLEANUP_LOG_FILE = os.path.join(common.autotest_dir, 'logs',
                                    'lxc_cleanup.log')

class _MethodCall(object):
    def __init__(self, method, args, kwargs):
        self._method = method
        self._args = args
        self._kwargs = kwargs


    def execute_on(self, drone_utility):
        method = getattr(drone_utility, self._method)
        return method(*self._args, **self._kwargs)


    def __str__(self):
        args = ', '.join(repr(arg) for arg in self._args)
        kwargs = ', '.join('%s=%r' % (key, value) for key, value in
                           self._kwargs.iteritems())
        full_args = ', '.join(item for item in (args, kwargs) if item)
        return '%s(%s)' % (self._method, full_args)


def call(method, *args, **kwargs):
    return _MethodCall(method, args, kwargs)
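
# Example of building a batch with call() (a minimal sketch; the command and
# paths are illustrative):
#
#     calls = [call('execute_command', ['nice', '-n', '10', 'autoserv'],
#                   '/results/108-debug',
#                   '/results/108-debug/debug/autoserv.log',
#                   '.autoserv_execute'),
#              call('refresh', ['/results/108-debug/.autoserv_execute'])]
#     print calls[0]  # _MethodCall.__str__ renders "execute_command(...)"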


class BaseDroneUtility(object):
    """
    This class executes actual OS calls on the drone machine.

    All paths going into and out of this class are absolute.
    """
    _WARNING_DURATION = 400

    def __init__(self):
        # Tattoo ourselves so that all of our spawn bears our mark.
        os.putenv(DARK_MARK_ENVIRONMENT_VAR, str(os.getpid()))

        self.warnings = []
        self._subcommands = []

    def initialize(self, results_dir):
        temporary_directory = os.path.join(results_dir, _TEMPORARY_DIRECTORY)
        if os.path.exists(temporary_directory):
            # TODO crbug.com/391111: until we have a better solution for
            # periodically cleaning up tmp files, we have to use rm -rf to
            # delete the whole folder. shutil.rmtree has performance issues
            # when a folder contains a large number of files, e.g., drone_tmp.
            os.system('rm -rf %s' % temporary_directory)
        self._ensure_directory_exists(temporary_directory)
        # TODO (sbasi) crbug.com/345011 - Remove this configuration variable
        # and clean up build_externals so it no-ops.
        build_externals = global_config.global_config.get_config_value(
                scheduler_config.CONFIG_SECTION, 'drone_build_externals',
                default=True, type=bool)
        if build_externals:
            build_extern_cmd = os.path.join(common.autotest_dir,
                                            'utils', 'build_externals.py')
            utils.run(build_extern_cmd)


    def _warn(self, warning):
        self.warnings.append(warning)


    @staticmethod
    def _check_pid_for_dark_mark(pid, open=open):
        try:
            env_file = open('/proc/%s/environ' % pid, 'rb')
        except EnvironmentError:
            return False
        try:
            env_data = env_file.read()
        finally:
            env_file.close()
        return DARK_MARK_ENVIRONMENT_VAR in env_data
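
    # /proc/<pid>/environ holds NUL-separated 'NAME=value' pairs, e.g.
    # 'PATH=/usr/bin\x00AUTOTEST_SCHEDULER_DARK_MARK=1234\x00...', so the
    # substring test on the variable name above is sufficient. A minimal
    # sketch of the same check (the pid is illustrative):
    #
    #     env_file = open('/proc/%s/environ' % 1234, 'rb')
    #     has_mark = 'AUTOTEST_SCHEDULER_DARK_MARK' in env_file.read()
    #     env_file.close()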


    _PS_ARGS = ('pid', 'pgid', 'ppid', 'comm', 'args')


    @classmethod
    def _get_process_info(cls):
        """Parse ps output for all process information.

        @returns A generator of dicts with cls._PS_ARGS as keys and
            string values, each representing a running process, e.g.:
            {
                'comm': command_name,
                'pgid': process group id,
                'ppid': parent process id,
                'pid': process id,
                'args': args the command was invoked with,
            }
        """
        @retry.retry(subprocess.CalledProcessError,
                     timeout_min=0.5, delay_sec=0.25)
        def run_ps():
            return subprocess.check_output(
                    ['/bin/ps', 'x', '-o', ','.join(cls._PS_ARGS)])

        ps_output = run_ps()
        # Split each line into the columns output by ps.
        split_lines = [line.split(None, 4) for line in ps_output.splitlines()]
        return (dict(itertools.izip(cls._PS_ARGS, line_components))
                for line_components in split_lines)
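
    # Example of the column split above (the sample line is illustrative):
    #
    #     line = ' 4155  4155  4154 autoserv /usr/bin/python -u autoserv ...'
    #     line.split(None, 4)
    #     # -> ['4155', '4155', '4154', 'autoserv',
    #     #     '/usr/bin/python -u autoserv ...']
    #
    # Note that ps prints a header row first, so the generator's first dict
    # is bogus; refresh() below drops it via [1:].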


    def _refresh_processes(self, command_name, open=open,
                           site_check_parse=None):
        """Refreshes process info for the given command_name.

        Examines ps output as returned by _get_process_info and returns
        the process dicts for processes matching the given command name.

        @param command_name: The name of the command, e.g. 'autoserv'.

        @return: A list of process info dictionaries as returned by
            _get_process_info.
        """
        # The open argument is used for test injection.
        check_mark = global_config.global_config.get_config_value(
            'SCHEDULER', 'check_processes_for_dark_mark', bool, False)
        processes = []
        for info in self._get_process_info():
            is_parse = (site_check_parse and site_check_parse(info))
            if info['comm'] == command_name or is_parse:
                if (check_mark and not
                        self._check_pid_for_dark_mark(info['pid'], open=open)):
                    self._warn('%(comm)s process pid %(pid)s has no '
                               'dark mark; ignoring.' % info)
                    continue
                processes.append(info)

        return processes


    def _read_pidfiles(self, pidfile_paths):
        pidfiles = {}
        for pidfile_path in pidfile_paths:
            if not os.path.exists(pidfile_path):
                continue
            try:
                file_object = open(pidfile_path, 'r')
                pidfiles[pidfile_path] = file_object.read()
                file_object.close()
            except IOError:
                continue
        return pidfiles


    def refresh(self, pidfile_paths):
        """
        pidfile_paths should be a list of paths to check for pidfiles.

        Returns a dict containing:
        * pidfiles: dict mapping pidfile paths to file contents, for pidfiles
        that exist.
        * all_processes: list of dicts for all running processes, as reported
        by `ps` (minus its header row).
        * autoserv_processes: list of dicts corresponding to running autoserv
        processes.  Each dict contains pid, pgid, ppid, comm, and args (see
        "man ps" for details).
        * parse_processes: likewise, for parse processes.
        * pidfiles_second_read: same info as pidfiles, but gathered after the
        processes are scanned.
        """
        site_check_parse = utils.import_site_function(
                __file__, 'autotest_lib.scheduler.site_drone_utility',
                'check_parse', lambda x: False)
        results = {
            'pidfiles' : self._read_pidfiles(pidfile_paths),
            # Element 0 of _get_process_info() is the header row from `ps`.
            'all_processes' : list(self._get_process_info())[1:],
            'autoserv_processes' : self._refresh_processes('autoserv'),
            'parse_processes' : self._refresh_processes(
                    'parse', site_check_parse=site_check_parse),
            'pidfiles_second_read' : self._read_pidfiles(pidfile_paths),
        }
        return results
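
    # Example reply for a single pidfile (a sketch; all values illustrative):
    #
    #     drone.refresh(['/results/108-debug/.autoserv_execute'])
    #     # -> {'pidfiles': {'/results/108-debug/.autoserv_execute': '4155\n'},
    #     #     'all_processes': [...], 'autoserv_processes': [...],
    #     #     'parse_processes': [...],
    #     #     'pidfiles_second_read': {...same shape as 'pidfiles'...}}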


    def get_signal_queue_to_kill(self, process):
        """Get the signal queue needed to kill a process.

        The autoserv process installs a handler for SIGTERM so that it can do
        some cleanup work before dying. However, aborting a process with
        SIGTERM followed by SIGKILL has its own overhead, detailed in the
        following CL:
        https://chromium-review.googlesource.com/230323
        This method checks the process's arguments to determine whether
        SIGTERM is required, and returns the signal queue accordingly.

        @param process: A drone_manager.Process object to be killed.

        @return: The signal queue needed to kill a process.

        """
        signal_queue_with_sigterm = (signal.SIGTERM, signal.SIGKILL)
        try:
            ps_output = subprocess.check_output(
                    ['/bin/ps', '-p', str(process.pid), '-o', 'args'])
            # For tests run with server-side packaging, SIGTERM needs to be
            # sent so the autoserv process can destroy the container used by
            # the test.
            if '--require-ssp' in ps_output:
                logging.debug('PID %d requires SIGTERM to abort so it can '
                              'clean up its container.', process.pid)
                return signal_queue_with_sigterm
        except subprocess.CalledProcessError:
            # Ignore errors; return the signal queue with SIGTERM to be safe.
            return signal_queue_with_sigterm
        # Default: kill the process with SIGKILL directly.
        return (signal.SIGKILL,)


    def kill_processes(self, process_list):
        """Send signals escalating in severity to the processes in process_list.

        @param process_list: A list of drone_manager.Process objects
                             representing the processes to kill.
        """
        try:
            logging.info('List of processes to be killed: %s', process_list)
            processes_to_kill = {}
            for p in process_list:
                signal_queue = self.get_signal_queue_to_kill(p)
                processes_to_kill[signal_queue] = (
                        processes_to_kill.get(signal_queue, []) + [p])
            sig_counts = {}
            for signal_queue, processes in processes_to_kill.iteritems():
                sig_counts.update(utils.nuke_pids(
                        # Negated pids follow the os.kill convention of
                        # signaling the entire process group.
                        [-process.pid for process in processes],
                        signal_queue=signal_queue))
        except error.AutoservRunError as e:
            self._warn('Error occurred when killing processes. Error: %s' % e)
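
    # Sketch of the batching above: processes are grouped by the signal
    # queue needed to kill them, then nuked one group at a time (the pids
    # and Process objects are illustrative):
    #
    #     processes_to_kill == {(signal.SIGKILL,): [p1, p3],
    #                           (signal.SIGTERM, signal.SIGKILL): [p2]}
    #     # ssp autoserv gets SIGTERM first so it can tear down its
    #     # container; everything else is SIGKILLed directly.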


    def _convert_old_host_log(self, log_path):
        """
        For backwards compatibility only.  This can safely be removed in the
        future.

        The scheduler used to create files at results/hosts/<hostname>, and
        append all host logs to that file.  Now, it creates directories at
        results/hosts/<hostname>, and places individual timestamped log files
        into that directory.

        This can be a problem the first time the scheduler runs after upgrading.
        To work around that, we'll look for a file at the path where the
        directory should be, and if we find one, we'll automatically convert it
        to a directory containing the old logfile.
        """
        # Move the file out of the way...
        temp_dir = tempfile.mkdtemp(suffix='.convert_host_log')
        base_name = os.path.basename(log_path)
        temp_path = os.path.join(temp_dir, base_name)
        os.rename(log_path, temp_path)

        os.mkdir(log_path)

        # ...and move it into the new directory.
        os.rename(temp_path, os.path.join(log_path, 'old_log'))
        os.rmdir(temp_dir)


    def _ensure_directory_exists(self, path):
        if os.path.isdir(path):
            return

        if os.path.exists(path):
            # The path exists already, but as a file, not a directory.
            if '/hosts/' in path:
                self._convert_old_host_log(path)
                return
            else:
                raise IOError('Path %s exists as a file, not a directory' %
                              path)

        os.makedirs(path)


    def execute_command(self, command, working_directory, log_file,
                        pidfile_name):
        out_file = None
        if log_file:
            self._ensure_directory_exists(os.path.dirname(log_file))
            try:
                out_file = open(log_file, 'a')
                separator = ('*' * 80) + '\n'
                out_file.write('\n' + separator)
                out_file.write("%s> %s\n" % (time.strftime("%X %x"), command))
                out_file.write(separator)
            except (OSError, IOError):
                email_manager.manager.log_stacktrace(
                    'Error opening log file %s' % log_file)

        if not out_file:
            out_file = open('/dev/null', 'w')

        in_devnull = open('/dev/null', 'r')

        self._ensure_directory_exists(working_directory)
        pidfile_path = os.path.join(working_directory, pidfile_name)
        if os.path.exists(pidfile_path):
            self._warn('Pidfile %s already exists' % pidfile_path)
            os.remove(pidfile_path)

        subprocess.Popen(command, stdout=out_file, stderr=subprocess.STDOUT,
                         stdin=in_devnull)
        out_file.close()
        in_devnull.close()
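
    # Example invocation (a sketch; the command and paths are illustrative):
    #
    #     drone.execute_command(
    #             ['nice', '-n', '10', 'autoserv'],
    #             working_directory='/results/108-debug',
    #             log_file='/results/108-debug/debug/autoserv.log',
    #             pidfile_name='.autoserv_execute')
    #     # The child keeps running after this module exits; per the module
    #     # docstring, the caller monitors it through the pidfile under
    #     # working_directory.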


    def write_to_file(self, file_path, contents, is_retry=False):
        """Write the specified contents to the end of the given file.

        @param file_path: Path to the file.
        @param contents: Content to be written to the file.
        @param is_retry: True if this is a retry after the file permission
                         has been corrected.
        """
        self._ensure_directory_exists(os.path.dirname(file_path))
        try:
            file_object = open(file_path, 'a')
            file_object.write(contents)
            file_object.close()
        except IOError as e:
            # TODO(dshi): crbug.com/459344 Remove following retry when test
            # container can be unprivileged container.
            # If the write failed with 'Permission denied', one possible cause
            # is that the file was created in a container and thus owned by
            # root. If so, fix the file permissions, and try again.
            if e.errno == errno.EACCES and not is_retry:
                logging.error('Error writing to file %s: %s. Will retry.',
                              file_path, e)
                utils.run('sudo chown %s "%s"' % (os.getuid(), file_path))
                utils.run('sudo chgrp %s "%s"' % (os.getgid(), file_path))
                self.write_to_file(file_path, contents, is_retry=True)
            else:
                self._warn('Error writing to file %s: %s' % (file_path, e))


    def copy_file_or_directory(self, source_path, destination_path):
        """
        This interface is designed to match server.hosts.abstract_ssh.get_file
        (and send_file).  That is, if the source_path ends with a slash, the
        contents of the directory are copied; otherwise, the directory itself
        is copied.
        """
        if self._same_file(source_path, destination_path):
            return
        self._ensure_directory_exists(os.path.dirname(destination_path))
        if source_path.endswith('/'):
            # Copying a directory's contents to another directory.
            assert os.path.isdir(source_path)
            assert os.path.isdir(destination_path)
            for filename in os.listdir(source_path):
                self.copy_file_or_directory(
                    os.path.join(source_path, filename),
                    os.path.join(destination_path, filename))
        elif os.path.isdir(source_path):
            try:
                shutil.copytree(source_path, destination_path, symlinks=True)
            except shutil.Error:
                # Ignore copy errors caused by missing files. gs_offloader
                # zips up folders that contain too many files, so there is a
                # race condition in which a repair job tries to copy provision
                # job results into the test job's results folder while
                # gs_offloader is uploading and zipping up those same
                # provision job results.
                pass
        elif os.path.islink(source_path):
            # Copied from shutil.copytree().
            link_to = os.readlink(source_path)
            os.symlink(link_to, destination_path)
        else:
            shutil.copy(source_path, destination_path)
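
    # Example of the trailing-slash semantics (paths are illustrative):
    #
    #     drone.copy_file_or_directory('/results/108-debug/', '/staging/108')
    #     # copies the *contents* of 108-debug into the existing /staging/108
    #     drone.copy_file_or_directory('/results/108-debug', '/staging/108')
    #     # copies the directory itself; shutil.copytree creates /staging/108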


    def _same_file(self, source_path, destination_path):
        """Checks if the source and destination are the same file.

        Returns True if the destination is the same as the source, False
        otherwise. Also returns False if the destination does not exist.
        """
        if not os.path.exists(destination_path):
            return False
        return os.path.samefile(source_path, destination_path)


    def cleanup_orphaned_containers(self):
        """Run the lxc_cleanup script to clean up orphaned containers."""
        # The script needs to run with sudo as the containers are privileged.
        # TODO(dshi): crbug.com/459344 Call lxc_cleanup.main when test
        # container can be unprivileged container.
        command = ['sudo', LXC_CLEANUP_SCRIPT, '-x', '-v', '-l',
                   LXC_CLEANUP_LOG_FILE]
        logging.info('Running %s', command)
        # stdout and stderr need to be redirected to /dev/null; otherwise,
        # the exit of the drone_utility process will kill the lxc_cleanup
        # script.
        subprocess.Popen(
                command, shell=False, stdin=None, stdout=open('/dev/null', 'w'),
                stderr=open('/dev/null', 'a'), preexec_fn=os.setpgrp)


    def wait_for_all_async_commands(self):
        for subproc in self._subcommands:
            subproc.fork_waitfor()
        self._subcommands = []


    def _poll_async_commands(self):
        still_running = []
        for subproc in self._subcommands:
            if subproc.poll() is None:
                still_running.append(subproc)
        self._subcommands = still_running


    def _wait_for_some_async_commands(self):
        self._poll_async_commands()
        max_processes = scheduler_config.config.max_transfer_processes
        while len(self._subcommands) >= max_processes:
            time.sleep(1)
            self._poll_async_commands()


    def run_async_command(self, function, args):
        subproc = subcommand.subcommand(function, args)
        self._subcommands.append(subproc)
        subproc.fork_start()


    def _sync_get_file_from(self, hostname, source_path, destination_path):
        logging.debug('_sync_get_file_from hostname: %s, source_path: %s, '
                      'destination_path: %s', hostname, source_path,
                      destination_path)
        self._ensure_directory_exists(os.path.dirname(destination_path))
        host = create_host(hostname)
        host.get_file(source_path, destination_path, delete_dest=True)


    def get_file_from(self, hostname, source_path, destination_path):
        self.run_async_command(self._sync_get_file_from,
                               (hostname, source_path, destination_path))


    def sync_send_file_to(self, hostname, source_path, destination_path,
                          can_fail):
        logging.debug('sync_send_file_to. hostname: %s, source_path: %s, '
                      'destination_path: %s, can_fail: %s', hostname,
                      source_path, destination_path, can_fail)
        host = create_host(hostname)
        try:
            host.run('mkdir -p ' + os.path.dirname(destination_path))
            host.send_file(source_path, destination_path, delete_dest=True)
        except error.AutoservError:
            if not can_fail:
                raise

            if os.path.isdir(source_path):
                failed_file = os.path.join(source_path, _TRANSFER_FAILED_FILE)
                file_object = open(failed_file, 'w')
                try:
                    file_object.write('%s:%s\n%s\n%s' %
                                      (hostname, destination_path,
                                       datetime.datetime.now(),
                                       traceback.format_exc()))
                finally:
                    file_object.close()
            else:
                copy_to = destination_path + _TRANSFER_FAILED_FILE
                self._ensure_directory_exists(os.path.dirname(copy_to))
                self.copy_file_or_directory(source_path, copy_to)


    def send_file_to(self, hostname, source_path, destination_path,
                     can_fail=False):
        self.run_async_command(self.sync_send_file_to,
                               (hostname, source_path, destination_path,
                                can_fail))
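
    # Example of an asynchronous transfer (hostname and paths illustrative):
    #
    #     drone.send_file_to('chromeos-server1', '/results/108-debug/',
    #                        '/usr/local/autotest/results/108-debug/',
    #                        can_fail=True)
    #     # The copy is forked via run_async_command(); execute_calls()
    #     # below blocks on wait_for_all_async_commands() before replying.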


    def _report_long_execution(self, calls, duration):
        call_count = {}
        for call in calls:
            call_count.setdefault(call._method, 0)
            call_count[call._method] += 1
        call_summary = '\n'.join('%d %s' % (count, method)
                                 for method, count in call_count.iteritems())
        self._warn('Execution took %f sec\n%s' % (duration, call_summary))


    def execute_calls(self, calls):
        results = []
        start_time = time.time()
        max_processes = scheduler_config.config.max_transfer_processes
        for method_call in calls:
            results.append(method_call.execute_on(self))
            if len(self._subcommands) >= max_processes:
                self._wait_for_some_async_commands()
        self.wait_for_all_async_commands()

        duration = time.time() - start_time
        if duration > self._WARNING_DURATION:
            self._report_long_execution(calls, duration)

        warnings = self.warnings
        self.warnings = []
        return dict(results=results, warnings=warnings)
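
    # Example of executing a batch in-process (a sketch; the call and path
    # are illustrative):
    #
    #     drone = DroneUtility()
    #     reply = drone.execute_calls(
    #             [call('refresh', ['/results/108-debug/.autoserv_execute'])])
    #     # reply == {'results': [<refresh dict>], 'warnings': []}
    #     # At most max_transfer_processes async transfers run at once; see
    #     # _wait_for_some_async_commands above.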


def create_host(hostname):
    username = global_config.global_config.get_config_value(
        'SCHEDULER', hostname + '_username', default=getpass.getuser())
    return hosts.SSHHost(hostname, user=username)
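
# The per-host username override above is read from the global config; a
# hypothetical entry (hostname and user below are illustrative):
#
#     [SCHEDULER]
#     drone1.example.com_username: chromeos-test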


def parse_input():
    input_chunks = []
    chunk_of_input = sys.stdin.read()
    while chunk_of_input:
        input_chunks.append(chunk_of_input)
        chunk_of_input = sys.stdin.read()
    pickled_input = ''.join(input_chunks)

    try:
        return pickle.loads(pickled_input)
    except Exception:
        separator = '*' * 50
        raise ValueError('Unpickling input failed\n'
                         'Input: %r\n'
                         'Exception from pickle:\n'
                         '%s\n%s\n%s' %
                         (pickled_input, separator, traceback.format_exc(),
                          separator))


def _parse_args(args):
    parser = argparse.ArgumentParser(description='Local drone process manager.')
    parser.add_argument('--call_time',
                        help='Time this process was invoked from the master',
                        default=None, type=float)
    return parser.parse_args(args)


SiteDroneUtility = utils.import_site_class(
        __file__, 'autotest_lib.scheduler.site_drone_utility',
        'SiteDroneUtility', BaseDroneUtility)


class DroneUtility(SiteDroneUtility):
    pass


def return_data(data):
    print pickle.dumps(data)


def main():
    logging_manager.configure_logging(
            drone_logging_config.DroneLoggingConfig())
    calls = parse_input()
    args = _parse_args(sys.argv[1:])

    drone_utility = DroneUtility()
    return_value = drone_utility.execute_calls(calls)
    return_data(return_value)


if __name__ == '__main__':
    main()
    609