Home | History | Annotate | Download | only in scheduler
      1 #!/usr/bin/python
      2 
      3 """Utility module that executes management commands on the drone.
      4 
      5 1. This is the module responsible for orchestrating processes on a drone.
      6 2. It receives instructions via stdin and replies via stdout.
      7 3. Each invocation is responsible for the initiation of a set of batched calls.
      8 4. The batched calls may be synchronous or asynchronous.
      9 5. The caller is responsible for monitoring asynchronous calls through pidfiles.
     10 """
     11 
     12 #pylint: disable-msg=missing-docstring
     13 
     14 import argparse
     15 import collections
     16 import datetime
     17 import getpass
     18 import itertools
     19 import logging
     20 import multiprocessing
     21 import os
     22 import pickle
     23 import shutil
     24 import signal
     25 import subprocess
     26 import sys
     27 import tempfile
     28 import time
     29 import traceback
     30 
     31 import common
     32 
     33 from autotest_lib.client.common_lib import error
     34 from autotest_lib.client.common_lib import global_config
     35 from autotest_lib.client.common_lib import logging_manager
     36 from autotest_lib.client.common_lib import utils
     37 from autotest_lib.client.common_lib.cros import retry
     38 from autotest_lib.scheduler import drone_logging_config
     39 from autotest_lib.scheduler import scheduler_config
     40 from autotest_lib.server import subcommand
     41 
     42 
     43 # An environment variable we add to the environment to enable us to
     44 # distinguish processes we started from those that were started by
     45 # something else during recovery.  Name credit goes to showard. ;)
     46 DARK_MARK_ENVIRONMENT_VAR = 'AUTOTEST_SCHEDULER_DARK_MARK'
     47 
     48 _TEMPORARY_DIRECTORY = 'drone_tmp'
     49 _TRANSFER_FAILED_FILE = '.transfer_failed'
     50 
     51 # script and log file for cleaning up orphaned lxc containers.
     52 LXC_CLEANUP_SCRIPT = os.path.join(common.autotest_dir, 'site_utils',
     53                                   'lxc_cleanup.py')
     54 LXC_CLEANUP_LOG_FILE = os.path.join(common.autotest_dir, 'logs',
     55                                     'lxc_cleanup.log')
     56 
     57 
     58 class _MethodCall(object):
     59     def __init__(self, method, args, kwargs):
     60         self._method = method
     61         self._args = args
     62         self._kwargs = kwargs
     63 
     64 
     65     def execute_on(self, drone_utility):
     66         method = getattr(drone_utility, self._method)
     67         return method(*self._args, **self._kwargs)
     68 
     69 
     70     def __str__(self):
     71         args = ', '.join(repr(arg) for arg in self._args)
     72         kwargs = ', '.join('%s=%r' % (key, value) for key, value in
     73                            self._kwargs.iteritems())
     74         full_args = ', '.join(item for item in (args, kwargs) if item)
     75         return '%s(%s)' % (self._method, full_args)
     76 
     77 
     78 def call(method, *args, **kwargs):
     79     return _MethodCall(method, args, kwargs)
     80 
     81 
     82 class DroneUtility(object):
     83     """
     84     This class executes actual OS calls on the drone machine.
     85 
     86     All paths going into and out of this class are absolute.
     87     """
     88     _WARNING_DURATION = 400
     89 
     90     def __init__(self):
     91         # Tattoo ourselves so that all of our spawn bears our mark.
     92         os.putenv(DARK_MARK_ENVIRONMENT_VAR, str(os.getpid()))
     93 
     94         self.warnings = []
     95         self._subcommands = []
     96 
     97 
     98     def initialize(self, results_dir):
     99         temporary_directory = os.path.join(results_dir, _TEMPORARY_DIRECTORY)
    100         if os.path.exists(temporary_directory):
    101             # TODO crbug.com/391111: before we have a better solution to
    102             # periodically cleanup tmp files, we have to use rm -rf to delete
    103             # the whole folder. shutil.rmtree has performance issue when a
    104             # folder has large amount of files, e.g., drone_tmp.
    105             os.system('rm -rf %s' % temporary_directory)
    106         self._ensure_directory_exists(temporary_directory)
    107         # TODO (sbasi) crbug.com/345011 - Remove this configuration variable
    108         # and clean up build_externals so it NO-OP's.
    109         build_externals = global_config.global_config.get_config_value(
    110                 scheduler_config.CONFIG_SECTION, 'drone_build_externals',
    111                 default=True, type=bool)
    112         if build_externals:
    113             build_extern_cmd = os.path.join(common.autotest_dir,
    114                                             'utils', 'build_externals.py')
    115             utils.run(build_extern_cmd)
    116 
    117 
    118     def _warn(self, warning):
    119         self.warnings.append(warning)
    120 
    121 
    122     def refresh(self, pidfile_paths):
    123         """Refreshes our view of the processes referred to by pdfile_paths.
    124 
    125         See drone_utility.ProcessRefresher.__call__ for details.
    126         """
    127         check_mark = global_config.global_config.get_config_value(
    128             'SCHEDULER', 'check_processes_for_dark_mark', bool, False)
    129         use_pool = global_config.global_config.get_config_value(
    130             'SCHEDULER', 'drone_utility_refresh_use_pool', bool, False)
    131         result, warnings = ProcessRefresher(check_mark, use_pool)(pidfile_paths)
    132         self.warnings += warnings
    133         return result
    134 
    135 
    136     def get_signal_queue_to_kill(self, process):
    137         """Get the signal queue needed to kill a process.
    138 
    139         autoserv process has a handle on SIGTERM, in which it can do some
    140         cleanup work. However, abort a process with SIGTERM then SIGKILL has
    141         its overhead, detailed in following CL:
    142         https://chromium-review.googlesource.com/230323
    143         This method checks the process's argument and determine if SIGTERM is
    144         required, and returns signal queue accordingly.
    145 
    146         @param process: A drone_manager.Process object to be killed.
    147 
    148         @return: The signal queue needed to kill a process.
    149 
    150         """
    151         signal_queue_with_sigterm = (signal.SIGTERM, signal.SIGKILL)
    152         try:
    153             ps_output = subprocess.check_output(
    154                     ['/bin/ps', '-p', str(process.pid), '-o', 'args'])
    155             # For test running with server-side packaging, SIGTERM needs to be
    156             # sent for autoserv process to destroy container used by the test.
    157             if '--require-ssp' in ps_output:
    158                 logging.debug('PID %d requires SIGTERM to abort to cleanup '
    159                               'container.', process.pid)
    160                 return signal_queue_with_sigterm
    161         except subprocess.CalledProcessError:
    162             # Ignore errors, return the signal queue with SIGTERM to be safe.
    163             return signal_queue_with_sigterm
    164         # Default to kill the process with SIGKILL directly.
    165         return (signal.SIGKILL,)
    166 
    167 
    168     def kill_processes(self, process_list):
    169         """Send signals escalating in severity to the processes in process_list.
    170 
    171         @param process_list: A list of drone_manager.Process objects
    172                              representing the processes to kill.
    173         """
    174         try:
    175             logging.info('List of process to be killed: %s', process_list)
    176             processes_to_kill = {}
    177             for p in process_list:
    178                 signal_queue = self.get_signal_queue_to_kill(p)
    179                 processes_to_kill[signal_queue] = (
    180                         processes_to_kill.get(signal_queue, []) + [p])
    181             sig_counts = {}
    182             for signal_queue, processes in processes_to_kill.iteritems():
    183                 sig_counts.update(utils.nuke_pids(
    184                         [-process.pid for process in processes],
    185                         signal_queue=signal_queue))
    186         except error.AutoservRunError as e:
    187             self._warn('Error occured when killing processes. Error: %s' % e)
    188 
    189 
    190     def _convert_old_host_log(self, log_path):
    191         """
    192         For backwards compatibility only.  This can safely be removed in the
    193         future.
    194 
    195         The scheduler used to create files at results/hosts/<hostname>, and
    196         append all host logs to that file.  Now, it creates directories at
    197         results/hosts/<hostname>, and places individual timestamped log files
    198         into that directory.
    199 
    200         This can be a problem the first time the scheduler runs after upgrading.
    201         To work around that, we'll look for a file at the path where the
    202         directory should be, and if we find one, we'll automatically convert it
    203         to a directory containing the old logfile.
    204         """
    205         # move the file out of the way
    206         temp_dir = tempfile.mkdtemp(suffix='.convert_host_log')
    207         base_name = os.path.basename(log_path)
    208         temp_path = os.path.join(temp_dir, base_name)
    209         os.rename(log_path, temp_path)
    210 
    211         os.mkdir(log_path)
    212 
    213         # and move it into the new directory
    214         os.rename(temp_path, os.path.join(log_path, 'old_log'))
    215         os.rmdir(temp_dir)
    216 
    217 
    218     def _ensure_directory_exists(self, path):
    219         if os.path.isdir(path):
    220             return
    221 
    222         if os.path.exists(path):
    223             # path exists already, but as a file, not a directory
    224             if '/hosts/' in path:
    225                 self._convert_old_host_log(path)
    226                 return
    227             else:
    228                 raise IOError('Path %s exists as a file, not a directory')
    229 
    230         os.makedirs(path)
    231 
    232 
    233     def execute_command(self, command, working_directory, log_file,
    234                         pidfile_name):
    235         out_file = None
    236         if log_file:
    237             self._ensure_directory_exists(os.path.dirname(log_file))
    238             try:
    239                 out_file = open(log_file, 'a')
    240                 separator = ('*' * 80) + '\n'
    241                 out_file.write('\n' + separator)
    242                 out_file.write("%s> %s\n" % (time.strftime("%X %x"), command))
    243                 out_file.write(separator)
    244             except (OSError, IOError):
    245                 pass
    246 
    247         if not out_file:
    248             out_file = open('/dev/null', 'w')
    249 
    250         in_devnull = open('/dev/null', 'r')
    251 
    252         self._ensure_directory_exists(working_directory)
    253         pidfile_path = os.path.join(working_directory, pidfile_name)
    254         if os.path.exists(pidfile_path):
    255             self._warn('Pidfile %s already exists' % pidfile_path)
    256             os.remove(pidfile_path)
    257 
    258         subprocess.Popen(command, stdout=out_file, stderr=subprocess.STDOUT,
    259                          stdin=in_devnull)
    260         out_file.close()
    261         in_devnull.close()
    262 
    263 
    264     def write_to_file(self, file_path, contents, is_retry=False):
    265         """Write the specified contents to the end of the given file.
    266 
    267         @param file_path: Path to the file.
    268         @param contents: Content to be written to the file.
    269         @param is_retry: True if this is a retry after file permission be
    270                          corrected.
    271         """
    272         self._ensure_directory_exists(os.path.dirname(file_path))
    273         try:
    274             file_object = open(file_path, 'a')
    275             file_object.write(contents)
    276             file_object.close()
    277         except IOError as e:
    278             # TODO(dshi): crbug.com/459344 Remove following retry when test
    279             # container can be unprivileged container.
    280             # If write failed with error 'Permission denied', one possible cause
    281             # is that the file was created in a container and thus owned by
    282             # root. If so, fix the file permission, and try again.
    283             if e.errno == 13 and not is_retry:
    284                 logging.error('Error write to file %s: %s. Will be retried.',
    285                               file_path, e)
    286                 utils.run('sudo chown %s "%s"' % (os.getuid(), file_path))
    287                 utils.run('sudo chgrp %s "%s"' % (os.getgid(), file_path))
    288                 self.write_to_file(file_path, contents, is_retry=True)
    289             else:
    290                 self._warn('Error write to file %s: %s' % (file_path, e))
    291 
    292 
    293     def copy_file_or_directory(self, source_path, destination_path):
    294         """
    295         This interface is designed to match server.hosts.abstract_ssh.get_file
    296         (and send_file).  That is, if the source_path ends with a slash, the
    297         contents of the directory are copied; otherwise, the directory iself is
    298         copied.
    299         """
    300         if self._same_file(source_path, destination_path):
    301             return
    302         self._ensure_directory_exists(os.path.dirname(destination_path))
    303         if source_path.endswith('/'):
    304             # copying a directory's contents to another directory
    305             assert os.path.isdir(source_path)
    306             assert os.path.isdir(destination_path)
    307             for filename in os.listdir(source_path):
    308                 self.copy_file_or_directory(
    309                     os.path.join(source_path, filename),
    310                     os.path.join(destination_path, filename))
    311         elif os.path.isdir(source_path):
    312             try:
    313                 shutil.copytree(source_path, destination_path, symlinks=True)
    314             except shutil.Error:
    315                 # Ignore copy directory error due to missing files. The cause
    316                 # of this behavior is that, gs_offloader zips up folders with
    317                 # too many files. There is a race condition that repair job
    318                 # tries to copy provision job results to the test job result
    319                 # folder, meanwhile gs_offloader is uploading the provision job
    320                 # result and zipping up folders which contains too many files.
    321                 pass
    322         elif os.path.islink(source_path):
    323             # copied from shutil.copytree()
    324             link_to = os.readlink(source_path)
    325             os.symlink(link_to, destination_path)
    326         else:
    327             try:
    328                 shutil.copy(source_path, destination_path)
    329             except IOError:
    330                 # Ignore copy error following the same above reason.
    331                 pass
    332 
    333 
    334     def _same_file(self, source_path, destination_path):
    335         """Checks if the source and destination are the same
    336 
    337         Returns True if the destination is the same as the source, False
    338         otherwise. Also returns False if the destination does not exist.
    339         """
    340         if not os.path.exists(destination_path):
    341             return False
    342         return os.path.samefile(source_path, destination_path)
    343 
    344 
    345     def cleanup_orphaned_containers(self):
    346         """Run lxc_cleanup script to clean up orphaned container.
    347         """
    348         # The script needs to run with sudo as the containers are privileged.
    349         # TODO(dshi): crbug.com/459344 Call lxc_cleanup.main when test
    350         # container can be unprivileged container.
    351         command = ['sudo', LXC_CLEANUP_SCRIPT, '-x', '-v', '-l',
    352                    LXC_CLEANUP_LOG_FILE]
    353         logging.info('Running %s', command)
    354         # stdout and stderr needs to be direct to /dev/null, otherwise existing
    355         # of drone_utils process will kill lxc_cleanup script.
    356         subprocess.Popen(
    357                 command, shell=False, stdin=None, stdout=open('/dev/null', 'w'),
    358                 stderr=open('/dev/null', 'a'), preexec_fn=os.setpgrp)
    359 
    360 
    361     def wait_for_all_async_commands(self):
    362         for subproc in self._subcommands:
    363             subproc.fork_waitfor()
    364         self._subcommands = []
    365 
    366 
    367     def _poll_async_commands(self):
    368         still_running = []
    369         for subproc in self._subcommands:
    370             if subproc.poll() is None:
    371                 still_running.append(subproc)
    372         self._subcommands = still_running
    373 
    374 
    375     def _wait_for_some_async_commands(self):
    376         self._poll_async_commands()
    377         max_processes = scheduler_config.config.max_transfer_processes
    378         while len(self._subcommands) >= max_processes:
    379             time.sleep(1)
    380             self._poll_async_commands()
    381 
    382 
    383     def run_async_command(self, function, args):
    384         subproc = subcommand.subcommand(function, args)
    385         self._subcommands.append(subproc)
    386         subproc.fork_start()
    387 
    388 
    389     def _sync_get_file_from(self, hostname, source_path, destination_path):
    390         logging.debug('_sync_get_file_from hostname: %s, source_path: %s,'
    391                       'destination_path: %s', hostname, source_path,
    392                       destination_path)
    393         self._ensure_directory_exists(os.path.dirname(destination_path))
    394         host = create_host(hostname)
    395         host.get_file(source_path, destination_path, delete_dest=True)
    396 
    397 
    398     def get_file_from(self, hostname, source_path, destination_path):
    399         self.run_async_command(self._sync_get_file_from,
    400                                (hostname, source_path, destination_path))
    401 
    402 
    403     def sync_send_file_to(self, hostname, source_path, destination_path,
    404                            can_fail):
    405         logging.debug('sync_send_file_to. hostname: %s, source_path: %s, '
    406                       'destination_path: %s, can_fail:%s', hostname,
    407                       source_path, destination_path, can_fail)
    408         host = create_host(hostname)
    409         try:
    410             host.run('mkdir -p ' + os.path.dirname(destination_path))
    411             host.send_file(source_path, destination_path, delete_dest=True)
    412         except error.AutoservError:
    413             if not can_fail:
    414                 raise
    415 
    416             if os.path.isdir(source_path):
    417                 failed_file = os.path.join(source_path, _TRANSFER_FAILED_FILE)
    418                 file_object = open(failed_file, 'w')
    419                 try:
    420                     file_object.write('%s:%s\n%s\n%s' %
    421                                       (hostname, destination_path,
    422                                        datetime.datetime.now(),
    423                                        traceback.format_exc()))
    424                 finally:
    425                     file_object.close()
    426             else:
    427                 copy_to = destination_path + _TRANSFER_FAILED_FILE
    428                 self._ensure_directory_exists(os.path.dirname(copy_to))
    429                 self.copy_file_or_directory(source_path, copy_to)
    430 
    431 
    432     def send_file_to(self, hostname, source_path, destination_path,
    433                      can_fail=False):
    434         self.run_async_command(self.sync_send_file_to,
    435                                (hostname, source_path, destination_path,
    436                                 can_fail))
    437 
    438 
    439     def _report_long_execution(self, calls, duration):
    440         call_count = {}
    441         for call in calls:
    442             call_count.setdefault(call._method, 0)
    443             call_count[call._method] += 1
    444         call_summary = '\n'.join('%d %s' % (count, method)
    445                                  for method, count in call_count.iteritems())
    446         self._warn('Execution took %f sec\n%s' % (duration, call_summary))
    447 
    448 
    449     def execute_calls(self, calls):
    450         results = []
    451         start_time = time.time()
    452         max_processes = scheduler_config.config.max_transfer_processes
    453         for method_call in calls:
    454             results.append(method_call.execute_on(self))
    455             if len(self._subcommands) >= max_processes:
    456                 self._wait_for_some_async_commands()
    457         self.wait_for_all_async_commands()
    458 
    459         duration = time.time() - start_time
    460         if duration > self._WARNING_DURATION:
    461             self._report_long_execution(calls, duration)
    462 
    463         warnings = self.warnings
    464         self.warnings = []
    465         return dict(results=results, warnings=warnings)
    466 
    467 
    468 _MAX_REFRESH_POOL_SIZE = 50
    469 
    470 class ProcessRefresher(object):
    471     """Object to refresh process information from give pidfiles.
    472 
    473     Usage: ProcessRefresh(True)(pidfile_list)
    474     """
    475 
    476     def __init__(self, check_mark, use_pool=False):
    477         """
    478         @param check_mark: If True, only consider processes that were
    479                 explicitly marked by a former drone_utility call as autotest
    480                 related processes.
    481         @param use_pool: If True, use a multiprocessing.Pool to parallelize
    482                 costly operations.
    483         """
    484         self._check_mark = check_mark
    485         self._use_pool = use_pool
    486         self._pool = None
    487 
    488 
    489     def __call__(self, pidfile_paths):
    490         """
    491         @param pidfile_paths: A list of paths to check for pidfiles.
    492 
    493         @returns (result, warnings)
    494             where result is a dict with the following keys:
    495             - pidfiles: dict mapping pidfile paths to file contents, for
    496               pidfiles that exist.
    497             - all_processes: list of dicts corresponding to all running
    498               processes. Each dict contain pid, pgid, ppid, comm, and args (see
    499               "man ps" for details).
    500             - autoserv_processes: likewise, restricted to autoserv processes.
    501             - parse_processes: likewise, restricted to parse processes.
    502             - pidfiles_second_read: same info as pidfiles, but gathered after
    503               the processes are scanned.
    504             and warnings is a list of warnings genearted during process refresh.
    505         """
    506 
    507         if self._use_pool:
    508             pool_size = max(
    509                     min(len(pidfile_paths), _MAX_REFRESH_POOL_SIZE),
    510                     1)
    511             self._pool = multiprocessing.Pool(pool_size)
    512         else:
    513             pool_size = 0
    514         logging.info('Refreshing %d pidfiles with %d helper processes',
    515                      len(pidfile_paths), pool_size)
    516 
    517         warnings = []
    518         # It is necessary to explicitly force this to be a list because results
    519         # are pickled by DroneUtility.
    520         proc_infos = list(_get_process_info())
    521 
    522         autoserv_processes, extra_warnings = self._filter_proc_infos(
    523                 proc_infos, 'autoserv')
    524         warnings += extra_warnings
    525         parse_processes, extra_warnings = self._filter_proc_infos(proc_infos,
    526                                                                   'parse')
    527         warnings += extra_warnings
    528         site_parse_processes, extra_warnings = self._filter_proc_infos(
    529                 proc_infos, 'site_parse')
    530         warnings += extra_warnings
    531 
    532         result = {
    533                 'pidfiles': self._read_pidfiles(pidfile_paths),
    534                 'all_processes': proc_infos,
    535                 'autoserv_processes': autoserv_processes,
    536                 'parse_processes': (parse_processes + site_parse_processes),
    537                 'pidfiles_second_read': self._read_pidfiles(pidfile_paths),
    538         }
    539         return result, warnings
    540 
    541 
    542     def _read_pidfiles(self, pidfile_paths):
    543         """Uses a process pool to read requested pidfile_paths."""
    544         if self._use_pool:
    545             contents = self._pool.map(_read_pidfile, pidfile_paths)
    546             contents = [c for c in contents if c is not None]
    547             return {k: v for k, v in contents}
    548         else:
    549             pidfiles = {}
    550             for path in pidfile_paths:
    551                 content = _read_pidfile(path)
    552                 if content is None:
    553                     continue
    554                 pidfiles[content.path] = content.content
    555             return pidfiles
    556 
    557 
    558     def _filter_proc_infos(self, proc_infos, command_name):
    559         """Filters process info for the given command_name.
    560 
    561         Examines ps output as returned by get_process_info and return
    562         the process dicts for processes matching the given command name.
    563 
    564         @proc_infos: ps output as returned by _get_process_info.
    565         @param command_name: The name of the command, eg 'autoserv'.
    566 
    567         @return: (proc_infos, warnings) where proc_infos is a list of ProcInfo
    568                 as returned by _get_process_info and warnings is a list of
    569                 warnings generated while filtering.
    570                 """
    571         proc_infos = [info for info in proc_infos
    572                       if info['comm'] == command_name]
    573         if not self._check_mark:
    574             return proc_infos, []
    575 
    576         if self._use_pool:
    577             dark_marks = self._pool.map(
    578                     _process_has_dark_mark,
    579                     [info['pid'] for info in proc_infos]
    580             )
    581         else:
    582             dark_marks = [_process_has_dark_mark(info['pid'])
    583                           for info in proc_infos]
    584 
    585         marked_proc_infos = []
    586         warnings = []
    587         for marked, info in itertools.izip(dark_marks, proc_infos):
    588             if marked:
    589                 marked_proc_infos.append(info)
    590             else:
    591                 warnings.append(
    592                         '%(comm)s process pid %(pid)s has no dark mark; '
    593                         'ignoring.' % info)
    594         return marked_proc_infos, warnings
    595 
    596 
    597 def create_host(hostname):
    598     # TODO(crbug.com/739466) Delay import to avoid a ~0.7 second penalty
    599     # drone_utility calls that don't actually interact with DUTs.
    600     from autotest_lib.server import hosts
    601     username = global_config.global_config.get_config_value(
    602         'SCHEDULER', hostname + '_username', default=getpass.getuser())
    603     return hosts.SSHHost(hostname, user=username)
    604 
    605 
    606 def parse_input():
    607     input_chunks = []
    608     chunk_of_input = sys.stdin.read()
    609     while chunk_of_input:
    610         input_chunks.append(chunk_of_input)
    611         chunk_of_input = sys.stdin.read()
    612     pickled_input = ''.join(input_chunks)
    613 
    614     try:
    615         return pickle.loads(pickled_input)
    616     except Exception:
    617         separator = '*' * 50
    618         raise ValueError('Unpickling input failed\n'
    619                          'Input: %r\n'
    620                          'Exception from pickle:\n'
    621                          '%s\n%s\n%s' %
    622                          (pickled_input, separator, traceback.format_exc(),
    623                           separator))
    624 
    625 
    626 def _parse_args(args):
    627     parser = argparse.ArgumentParser(description='Local drone process manager.')
    628     parser.add_argument('--call_time',
    629                         help='Time this process was invoked from the master',
    630                         default=None, type=float)
    631     return parser.parse_args(args)
    632 
    633 
    634 def return_data(data):
    635     print pickle.dumps(data)
    636 
    637 def _process_has_dark_mark(pid):
    638     """Checks if a process was launched earlier by drone_utility.
    639 
    640     @param pid: The pid of the process to check.
    641     """
    642     try:
    643         with open('/proc/%s/environ' % pid, 'rb') as env_file:
    644             env_data = env_file.read()
    645     except EnvironmentError:
    646         return False
    647     return DARK_MARK_ENVIRONMENT_VAR in env_data
    648 
    649 
    650 _PS_ARGS = ('pid', 'pgid', 'ppid', 'comm', 'args')
    651 def _get_process_info():
    652     """Parse ps output for all process information.
    653 
    654     @returns A generator of dicts. Each dict has the following keys:
    655         - comm: command_name,
    656         - pgid: process group id,
    657         - ppid: parent process id,
    658         - pid: process id,
    659         - args: args the command was invoked with,
    660     """
    661     @retry.retry(subprocess.CalledProcessError,
    662                     timeout_min=0.5, delay_sec=0.25)
    663     def run_ps():
    664         return subprocess.check_output(
    665                 ['/bin/ps', '--no-header', 'x', '-o', ','.join(_PS_ARGS)])
    666 
    667     ps_output = run_ps()
    668     # split each line into the columns output by ps
    669     split_lines = [line.split(None, 4) for line in ps_output.splitlines()]
    670     return (dict(itertools.izip(_PS_ARGS, line_components))
    671             for line_components in split_lines)
    672 
    673 
    674 _PidfileContent = collections.namedtuple('_PidfileContent', ['path', 'content'])
    675 def _read_pidfile(pidfile_path):
    676     """Reads the content of the given pidfile if it exists
    677 
    678     @param: pidfile_path: Path of the file to read.
    679     @returns: _PidfileContent tuple on success, None otherwise.
    680     """
    681     if not os.path.exists(pidfile_path):
    682         return None
    683     try:
    684         with open(pidfile_path, 'r') as file_object:
    685             return _PidfileContent(pidfile_path, file_object.read())
    686     except IOError:
    687         return None
    688 
    689 
    690 def main():
    691     logging_manager.configure_logging(
    692             drone_logging_config.DroneLoggingConfig())
    693     calls = parse_input()
    694     args = _parse_args(sys.argv[1:])
    695 
    696     drone_utility = DroneUtility()
    697     return_value = drone_utility.execute_calls(calls)
    698     return_data(return_value)
    699 
    700 
    701 if __name__ == '__main__':
    702     main()
    703