#!/usr/bin/python

"""Utility module that executes management commands on the drone.

1. This is the module responsible for orchestrating processes on a drone.
2. It receives instructions via stdin and replies via stdout.
3. Each invocation is responsible for the initiation of a set of batched calls.
4. The batched calls may be synchronous or asynchronous.
5. The caller is responsible for monitoring asynchronous calls through pidfiles.
"""

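# Illustrative sketch of the stdin/stdout protocol described above. The file
# path and call arguments here are hypothetical; the real caller is the
# scheduler's drone manager:
#
#   import pickle
#   import subprocess
#   calls = [call('execute_command', ['sleep', '1'], '/tmp/drone_work',
#                 '/tmp/drone_work/command.log', 'pidfile')]
#   proc = subprocess.Popen(['./drone_utility.py'], stdin=subprocess.PIPE,
#                           stdout=subprocess.PIPE)
#   stdout_data, _ = proc.communicate(pickle.dumps(calls))
#   response = pickle.loads(stdout_data)
#   # response == {'results': [...], 'warnings': [...]}
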
#pylint: disable-msg=missing-docstring

import argparse
import collections
import datetime
import getpass
import itertools
import logging
import multiprocessing
import os
import pickle
import shutil
import signal
import subprocess
import sys
import time
import traceback

import common

from autotest_lib.client.common_lib import error
from autotest_lib.client.common_lib import global_config
from autotest_lib.client.common_lib import logging_manager
from autotest_lib.client.common_lib import utils
from autotest_lib.client.common_lib.cros import retry
from autotest_lib.scheduler import drone_logging_config
from autotest_lib.scheduler import scheduler_config
from autotest_lib.server import subcommand


# An environment variable we add to the environment to enable us to
# distinguish processes we started from those that were started by
# something else during recovery.  Name credit goes to showard. ;)
DARK_MARK_ENVIRONMENT_VAR = 'AUTOTEST_SCHEDULER_DARK_MARK'

_TEMPORARY_DIRECTORY = 'drone_tmp'
_TRANSFER_FAILED_FILE = '.transfer_failed'

# Script and log file for cleaning up orphaned lxc containers.
LXC_CLEANUP_SCRIPT = os.path.join(common.autotest_dir, 'site_utils',
                                  'lxc_cleanup.py')
LXC_CLEANUP_LOG_FILE = os.path.join(common.autotest_dir, 'logs',
                                    'lxc_cleanup.log')


class _MethodCall(object):
    def __init__(self, method, args, kwargs):
        self._method = method
        self._args = args
        self._kwargs = kwargs


    def execute_on(self, drone_utility):
        method = getattr(drone_utility, self._method)
        return method(*self._args, **self._kwargs)


    def __str__(self):
        args = ', '.join(repr(arg) for arg in self._args)
        kwargs = ', '.join('%s=%r' % (key, value) for key, value in
                           self._kwargs.iteritems())
        full_args = ', '.join(item for item in (args, kwargs) if item)
        return '%s(%s)' % (self._method, full_args)


def call(method, *args, **kwargs):
    return _MethodCall(method, args, kwargs)


class DroneUtility(object):
    """
    This class executes actual OS calls on the drone machine.

    All paths going into and out of this class are absolute.
    """
    _WARNING_DURATION = 400

    def __init__(self):
        # Tattoo ourselves so that all of our spawn bears our mark.
        os.putenv(DARK_MARK_ENVIRONMENT_VAR, str(os.getpid()))

        self.warnings = []
        self._subcommands = []


    def initialize(self, results_dir):
        temporary_directory = os.path.join(results_dir, _TEMPORARY_DIRECTORY)
        if os.path.exists(temporary_directory):
            # TODO crbug.com/391111: before we have a better solution to
            # periodically clean up tmp files, we have to use rm -rf to delete
            # the whole folder. shutil.rmtree has performance issues when a
            # folder contains a large number of files, e.g., drone_tmp.
            os.system('rm -rf %s' % temporary_directory)
        self._ensure_directory_exists(temporary_directory)
        # TODO (sbasi) crbug.com/345011 - Remove this configuration variable
        # and clean up build_externals so it no-ops.
        build_externals = global_config.global_config.get_config_value(
                scheduler_config.CONFIG_SECTION, 'drone_build_externals',
                default=True, type=bool)
        if build_externals:
            build_extern_cmd = os.path.join(common.autotest_dir,
                                            'utils', 'build_externals.py')
            utils.run(build_extern_cmd)


    def _warn(self, warning):
        self.warnings.append(warning)


    def refresh(self, pidfile_paths):
        """Refreshes our view of the processes referred to by pidfile_paths.

        See drone_utility.ProcessRefresher.__call__ for details.
        """
        check_mark = global_config.global_config.get_config_value(
            'SCHEDULER', 'check_processes_for_dark_mark', bool, False)
        use_pool = global_config.global_config.get_config_value(
            'SCHEDULER', 'drone_utility_refresh_use_pool', bool, False)
        result, warnings = ProcessRefresher(check_mark, use_pool)(pidfile_paths)
        self.warnings += warnings
        return result


    def get_signal_queue_to_kill(self, process):
        """Get the signal queue needed to kill a process.

        The autoserv process installs a handler for SIGTERM, in which it can
        do some cleanup work. However, aborting a process with SIGTERM
        followed by SIGKILL has its own overhead, detailed in the following
        CL:
        https://chromium-review.googlesource.com/230323
        This method checks the process's arguments to determine whether
        SIGTERM is required, and returns the signal queue accordingly.

        @param process: A drone_manager.Process object to be killed.

        @return: The signal queue needed to kill the process.

        """
        signal_queue_with_sigterm = (signal.SIGTERM, signal.SIGKILL)
        try:
            ps_output = subprocess.check_output(
                    ['/bin/ps', '-p', str(process.pid), '-o', 'args'])
            # For tests running with server-side packaging, SIGTERM needs to
            # be sent so the autoserv process can destroy the container used
            # by the test.
            if '--require-ssp' in ps_output:
                logging.debug('PID %d requires SIGTERM to abort to cleanup '
                              'container.', process.pid)
                return signal_queue_with_sigterm
        except subprocess.CalledProcessError:
            # Ignore errors and return the signal queue with SIGTERM, to be
            # safe.
            return signal_queue_with_sigterm
        # Default to killing the process with SIGKILL directly.
        return (signal.SIGKILL,)

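    # Illustrative use of the returned queue (the variable names are
    # hypothetical); utils.nuke_pids escalates through the signals in order
    # until the processes die, as kill_processes below does for real:
    #
    #   queue = drone_utility.get_signal_queue_to_kill(process)
    #   utils.nuke_pids([-process.pid], signal_queue=queue)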

    def kill_processes(self, process_list):
        """Send signals escalating in severity to the processes in process_list.

        @param process_list: A list of drone_manager.Process objects
                             representing the processes to kill.
        """
        try:
            logging.info('List of processes to be killed: %s', process_list)
            processes_to_kill = {}
            for p in process_list:
                signal_queue = self.get_signal_queue_to_kill(p)
                processes_to_kill[signal_queue] = (
                        processes_to_kill.get(signal_queue, []) + [p])
            sig_counts = {}
            for signal_queue, processes in processes_to_kill.iteritems():
                sig_counts.update(utils.nuke_pids(
                        [-process.pid for process in processes],
                        signal_queue=signal_queue))
        except error.AutoservRunError as e:
            self._warn('Error occurred when killing processes. Error: %s' % e)


    def _ensure_directory_exists(self, path):
        if not os.path.exists(path):
            os.makedirs(path)
            return
        if os.path.isdir(path):
            return
        assert os.path.isfile(path)
        if '/hosts/' in path:
            return
        raise IOError('Path %s exists as a file, not a directory' % path)


    def execute_command(self, command, working_directory, log_file,
                        pidfile_name):
        out_file = None
        if log_file:
            self._ensure_directory_exists(os.path.dirname(log_file))
            try:
                out_file = open(log_file, 'a')
                separator = ('*' * 80) + '\n'
                out_file.write('\n' + separator)
                out_file.write("%s> %s\n" % (time.strftime("%X %x"), command))
                out_file.write(separator)
            except (OSError, IOError):
                pass

        if not out_file:
            out_file = open('/dev/null', 'w')

        in_devnull = open('/dev/null', 'r')

        self._ensure_directory_exists(working_directory)
        pidfile_path = os.path.join(working_directory, pidfile_name)
        if os.path.exists(pidfile_path):
            self._warn('Pidfile %s already exists' % pidfile_path)
            os.remove(pidfile_path)

        subprocess.Popen(command, stdout=out_file, stderr=subprocess.STDOUT,
                         stdin=in_devnull)
        out_file.close()
        in_devnull.close()

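    # Hypothetical invocation (all paths and the pidfile name here are
    # illustrative, not values this module defines):
    #
    #   drone_utility.execute_command(
    #           ['nice', '-n', '10', '/usr/local/autotest/server/autoserv',
    #            '-r', results_dir],
    #           working_directory=results_dir,
    #           log_file=os.path.join(results_dir, 'autoserv.log'),
    #           pidfile_name='.autoserv_execute')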

    def write_to_file(self, file_path, contents, is_retry=False):
        """Write the specified contents to the end of the given file.

        @param file_path: Path to the file.
        @param contents: Content to be written to the file.
        @param is_retry: True if this is a retry after the file permissions
                         have been corrected.
        """
        self._ensure_directory_exists(os.path.dirname(file_path))
        try:
            file_object = open(file_path, 'a')
            file_object.write(contents)
            file_object.close()
        except IOError as e:
            # TODO(dshi): crbug.com/459344 Remove following retry when test
            # container can be unprivileged container.
            # If the write failed with 'Permission denied', one possible cause
            # is that the file was created in a container and thus owned by
            # root. If so, fix the file permissions, and try again.
            if e.errno == 13 and not is_retry:  # 13 == EACCES
                logging.error('Error writing to file %s: %s. Will be retried.',
                              file_path, e)
                utils.run('sudo chown %s "%s"' % (os.getuid(), file_path))
                utils.run('sudo chgrp %s "%s"' % (os.getgid(), file_path))
                self.write_to_file(file_path, contents, is_retry=True)
            else:
                self._warn('Error writing to file %s: %s' % (file_path, e))


    def copy_file_or_directory(self, source_path, destination_path):
        """
        This interface is designed to match server.hosts.abstract_ssh.get_file
        (and send_file).  That is, if the source_path ends with a slash, the
        contents of the directory are copied; otherwise, the directory itself
        is copied.
        """
        if self._same_file(source_path, destination_path):
            return
        self._ensure_directory_exists(os.path.dirname(destination_path))
        if source_path.endswith('/'):
            # copying a directory's contents to another directory
            assert os.path.isdir(source_path)
            assert os.path.isdir(destination_path)
            for filename in os.listdir(source_path):
                self.copy_file_or_directory(
                    os.path.join(source_path, filename),
                    os.path.join(destination_path, filename))
        elif os.path.isdir(source_path):
            try:
                shutil.copytree(source_path, destination_path, symlinks=True)
            except shutil.Error:
                # Ignore copy directory errors due to missing files. This can
                # happen because gs_offloader zips up folders that contain too
                # many files: there is a race condition in which a repair job
                # tries to copy provision job results to the test job result
                # folder while gs_offloader is uploading the provision job
                # results and zipping up those folders.
                pass
        elif os.path.islink(source_path):
            # copied from shutil.copytree()
            link_to = os.readlink(source_path)
            os.symlink(link_to, destination_path)
        else:
            try:
                shutil.copy(source_path, destination_path)
            except IOError:
                # Ignore copy errors for the same reason as above.
                pass

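    # For example (paths illustrative only): with a trailing slash the
    # destination directory must already exist and receives the source's
    # contents; without it, the destination is created as a copy of the
    # source directory itself:
    #
    #   drone_utility.copy_file_or_directory('/results/123/', '/archive/123')
    #   drone_utility.copy_file_or_directory('/results/123', '/archive/123')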

    def _same_file(self, source_path, destination_path):
        """Checks if the source and destination are the same.

        Returns True if the destination is the same as the source, False
        otherwise. Also returns False if the destination does not exist.
        """
        if not os.path.exists(destination_path):
            return False
        return os.path.samefile(source_path, destination_path)


    def cleanup_orphaned_containers(self):
        """Run the lxc_cleanup script to clean up orphaned containers.
        """
        # The script needs to run with sudo as the containers are privileged.
        # TODO(dshi): crbug.com/459344 Call lxc_cleanup.main when test
        # container can be unprivileged container.
        command = ['sudo', LXC_CLEANUP_SCRIPT, '-x', '-v', '-l',
                   LXC_CLEANUP_LOG_FILE]
        logging.info('Running %s', command)
        # stdout and stderr need to be redirected to /dev/null; otherwise the
        # exit of the drone_utility process will kill the lxc_cleanup script.
        subprocess.Popen(
                command, shell=False, stdin=None, stdout=open('/dev/null', 'w'),
                stderr=open('/dev/null', 'a'), preexec_fn=os.setpgrp)


    def wait_for_all_async_commands(self):
        for subproc in self._subcommands:
            subproc.fork_waitfor()
        self._subcommands = []


    def _poll_async_commands(self):
        still_running = []
        for subproc in self._subcommands:
            if subproc.poll() is None:
                still_running.append(subproc)
        self._subcommands = still_running


    def _wait_for_some_async_commands(self):
        self._poll_async_commands()
        max_processes = scheduler_config.config.max_transfer_processes
        while len(self._subcommands) >= max_processes:
            time.sleep(1)
            self._poll_async_commands()


    def run_async_command(self, function, args):
        subproc = subcommand.subcommand(function, args)
        self._subcommands.append(subproc)
        subproc.fork_start()


    def _sync_get_file_from(self, hostname, source_path, destination_path):
        logging.debug('_sync_get_file_from hostname: %s, source_path: %s, '
                      'destination_path: %s', hostname, source_path,
                      destination_path)
        self._ensure_directory_exists(os.path.dirname(destination_path))
        host = create_host(hostname)
        host.get_file(source_path, destination_path, delete_dest=True)


    def get_file_from(self, hostname, source_path, destination_path):
        self.run_async_command(self._sync_get_file_from,
                               (hostname, source_path, destination_path))


    def _sync_send_file_to(self, hostname, source_path, destination_path,
                           can_fail):
        logging.debug('_sync_send_file_to. hostname: %s, source_path: %s, '
                      'destination_path: %s, can_fail: %s', hostname,
                      source_path, destination_path, can_fail)
        host = create_host(hostname)
        try:
            host.run('mkdir -p ' + os.path.dirname(destination_path))
            host.send_file(source_path, destination_path, delete_dest=True)
        except error.AutoservError:
            if not can_fail:
                raise

            if os.path.isdir(source_path):
                failed_file = os.path.join(source_path, _TRANSFER_FAILED_FILE)
                file_object = open(failed_file, 'w')
                try:
                    file_object.write('%s:%s\n%s\n%s' %
                                      (hostname, destination_path,
                                       datetime.datetime.now(),
                                       traceback.format_exc()))
                finally:
                    file_object.close()
            else:
                copy_to = destination_path + _TRANSFER_FAILED_FILE
                self._ensure_directory_exists(os.path.dirname(copy_to))
                self.copy_file_or_directory(source_path, copy_to)


    def send_file_to(self, hostname, source_path, destination_path,
                     can_fail=False):
        self.run_async_command(self._sync_send_file_to,
                               (hostname, source_path, destination_path,
                                can_fail))


    def _report_long_execution(self, calls, duration):
        call_count = {}
        for call in calls:
            call_count.setdefault(call._method, 0)
            call_count[call._method] += 1
        call_summary = '\n'.join('%d %s' % (count, method)
                                 for method, count in call_count.iteritems())
        self._warn('Execution took %f sec\n%s' % (duration, call_summary))


    def execute_calls(self, calls):
        results = []
        start_time = time.time()
        max_processes = scheduler_config.config.max_transfer_processes
        for method_call in calls:
            results.append(method_call.execute_on(self))
            if len(self._subcommands) >= max_processes:
                self._wait_for_some_async_commands()
        self.wait_for_all_async_commands()

        duration = time.time() - start_time
        if duration > self._WARNING_DURATION:
            self._report_long_execution(calls, duration)

        warnings = self.warnings
        self.warnings = []
        return dict(results=results, warnings=warnings)

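# Illustrative batch (arguments hypothetical): execute_calls runs each call
# in order, but file-transfer methods such as get_file_from only fork the
# transfer; all forked transfers are awaited before results are returned:
#
#   drone_utility = DroneUtility()
#   response = drone_utility.execute_calls([
#           call('initialize', '/usr/local/autotest/results'),
#           call('get_file_from', 'chromeos-host1', '/var/log/messages',
#                '/usr/local/autotest/results/123-debug/messages'),
#   ])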

_MAX_REFRESH_POOL_SIZE = 50

class ProcessRefresher(object):
    """Object to refresh process information from the given pidfiles.

    Usage: ProcessRefresher(True)(pidfile_list)
    """

    def __init__(self, check_mark, use_pool=False):
        """
        @param check_mark: If True, only consider processes that were
                explicitly marked by a former drone_utility call as autotest
                related processes.
        @param use_pool: If True, use a multiprocessing.Pool to parallelize
                costly operations.
        """
        self._check_mark = check_mark
        self._use_pool = use_pool
        self._pool = None


    def __call__(self, pidfile_paths):
        """
        @param pidfile_paths: A list of paths to check for pidfiles.

        @returns (result, warnings)
            where result is a dict with the following keys:
            - pidfiles: dict mapping pidfile paths to file contents, for
              pidfiles that exist.
            - all_processes: list of dicts corresponding to all running
              processes. Each dict contains pid, pgid, ppid, comm, and args
              (see "man ps" for details).
            - autoserv_processes: likewise, restricted to autoserv processes.
            - parse_processes: likewise, restricted to parse processes.
            - pidfiles_second_read: same info as pidfiles, but gathered after
              the processes are scanned.
            and warnings is a list of warnings generated during process
            refresh.
        """

        if self._use_pool:
            pool_size = max(
                    min(len(pidfile_paths), _MAX_REFRESH_POOL_SIZE),
                    1)
            self._pool = multiprocessing.Pool(pool_size)
        else:
            pool_size = 0
        logging.info('Refreshing %d pidfiles with %d helper processes',
                     len(pidfile_paths), pool_size)

        warnings = []
        # It is necessary to explicitly force this to be a list because results
        # are pickled by DroneUtility.
        proc_infos = list(_get_process_info())

        autoserv_processes, extra_warnings = self._filter_proc_infos(
                proc_infos, 'autoserv')
        warnings += extra_warnings
        parse_processes, extra_warnings = self._filter_proc_infos(proc_infos,
                                                                  'parse')
        warnings += extra_warnings
        site_parse_processes, extra_warnings = self._filter_proc_infos(
                proc_infos, 'site_parse')
        warnings += extra_warnings

        result = {
                'pidfiles': self._read_pidfiles(pidfile_paths),
                'all_processes': proc_infos,
                'autoserv_processes': autoserv_processes,
                'parse_processes': (parse_processes + site_parse_processes),
                'pidfiles_second_read': self._read_pidfiles(pidfile_paths),
        }
        return result, warnings

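    # Illustrative only: a refresh over one pidfile, without dark-mark
    # checking and without a worker pool (the pidfile path is hypothetical):
    #
    #   result, warnings = ProcessRefresher(check_mark=False)(
    #           ['/usr/local/autotest/results/123-debug/.autoserv_execute'])
    #   running_autoserv = result['autoserv_processes']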

    def _read_pidfiles(self, pidfile_paths):
        """Reads the requested pidfiles, using a process pool when configured."""
        if self._use_pool:
            contents = self._pool.map(_read_pidfile, pidfile_paths)
            contents = [c for c in contents if c is not None]
            return {k: v for k, v in contents}
        else:
            pidfiles = {}
            for path in pidfile_paths:
                content = _read_pidfile(path)
                if content is None:
                    continue
                pidfiles[content.path] = content.content
            return pidfiles


    def _filter_proc_infos(self, proc_infos, command_name):
        """Filters process info for the given command_name.

        Examines ps output as returned by _get_process_info and returns
        the process dicts for processes matching the given command name.

        @param proc_infos: ps output as returned by _get_process_info.
        @param command_name: The name of the command, e.g. 'autoserv'.

        @return: (proc_infos, warnings) where proc_infos is a list of ProcInfo
                as returned by _get_process_info and warnings is a list of
                warnings generated while filtering.
        """
        proc_infos = [info for info in proc_infos
                      if info['comm'] == command_name]
        if not self._check_mark:
            return proc_infos, []

        if self._use_pool:
            dark_marks = self._pool.map(
                    _process_has_dark_mark,
                    [info['pid'] for info in proc_infos]
            )
        else:
            dark_marks = [_process_has_dark_mark(info['pid'])
                          for info in proc_infos]

        marked_proc_infos = []
        warnings = []
        for marked, info in itertools.izip(dark_marks, proc_infos):
            if marked:
                marked_proc_infos.append(info)
            else:
                warnings.append(
                        '%(comm)s process pid %(pid)s has no dark mark; '
                        'ignoring.' % info)
        return marked_proc_infos, warnings


def create_host(hostname):
    # TODO(crbug.com/739466) Delay the import to avoid a ~0.7 second penalty
    # for drone_utility calls that don't actually interact with DUTs.
    from autotest_lib.server import hosts
    username = global_config.global_config.get_config_value(
        'SCHEDULER', hostname + '_username', default=getpass.getuser())
    return hosts.SSHHost(hostname, user=username)


def parse_input():
    input_chunks = []
    chunk_of_input = sys.stdin.read()
    while chunk_of_input:
        input_chunks.append(chunk_of_input)
        chunk_of_input = sys.stdin.read()
    pickled_input = ''.join(input_chunks)

    try:
        return pickle.loads(pickled_input)
    except Exception:
        separator = '*' * 50
        raise ValueError('Unpickling input failed\n'
                         'Input: %r\n'
                         'Exception from pickle:\n'
                         '%s\n%s\n%s' %
                         (pickled_input, separator, traceback.format_exc(),
                          separator))


def _parse_args(args):
    parser = argparse.ArgumentParser(description='Local drone process manager.')
    parser.add_argument('--call_time',
                        help='Time this process was invoked from the master',
                        default=None, type=float)
    return parser.parse_args(args)


def return_data(data):
    print pickle.dumps(data)


def _process_has_dark_mark(pid):
    """Checks if a process was launched earlier by drone_utility.

    @param pid: The pid of the process to check.
    """
    try:
        with open('/proc/%s/environ' % pid, 'rb') as env_file:
            env_data = env_file.read()
    except EnvironmentError:
        return False
    # Entries in /proc/<pid>/environ are NUL-separated 'NAME=value' strings,
    # so a plain substring check for the variable name is sufficient here.
    return DARK_MARK_ENVIRONMENT_VAR in env_data


_PS_ARGS = ('pid', 'pgid', 'ppid', 'comm', 'args')
def _get_process_info():
    """Parse ps output for all process information.

    @returns A generator of dicts. Each dict has the following keys:
        - comm: command name,
        - pgid: process group id,
        - ppid: parent process id,
        - pid: process id,
        - args: args the command was invoked with,
    """
    @retry.retry(subprocess.CalledProcessError,
                 timeout_min=0.5, delay_sec=0.25)
    def run_ps():
        return subprocess.check_output(
                ['/bin/ps', '--no-header', 'x', '-o', ','.join(_PS_ARGS)])

    ps_output = run_ps()
    # split each line into the columns output by ps
    split_lines = [line.split(None, 4) for line in ps_output.splitlines()]
    return (dict(itertools.izip(_PS_ARGS, line_components))
            for line_components in split_lines)

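# Illustrative only: one dict yielded by _get_process_info might look like
#
#   {'pid': '1234', 'pgid': '1234', 'ppid': '1', 'comm': 'autoserv',
#    'args': '/usr/local/autotest/server/autoserv -r ...'}
#
# (all values are strings, parsed directly from a line of ps output).
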
_PidfileContent = collections.namedtuple('_PidfileContent', ['path', 'content'])
def _read_pidfile(pidfile_path):
    """Reads the content of the given pidfile if it exists.

    @param pidfile_path: Path of the file to read.
    @returns: _PidfileContent tuple on success, None otherwise.
    """
    if not os.path.exists(pidfile_path):
        return None
    try:
        with open(pidfile_path, 'r') as file_object:
            return _PidfileContent(pidfile_path, file_object.read())
    except IOError:
        return None


def main():
    logging_manager.configure_logging(
            drone_logging_config.DroneLoggingConfig())
    calls = parse_input()
    # Flags are validated here; --call_time is not otherwise used in this
    # module.
    args = _parse_args(sys.argv[1:])

    drone_utility = DroneUtility()
    return_value = drone_utility.execute_calls(calls)
    return_data(return_value)


if __name__ == '__main__':
    main()