Home | History | Annotate | Download | only in audio
      1 #!/usr/bin/python
      2 # Copyright (c) 2013 The Chromium OS Authors. All rights reserved.
      3 # Use of this source code is governed by a BSD-style license that can be
      4 # found in the LICENSE file.
      5 
      6 import atexit
      7 import itertools
      8 import logging
      9 import os
     10 import pipes
     11 import pwd
     12 import select
     13 import subprocess
     14 import threading
     15 
     16 from autotest_lib.client.common_lib.utils import TEE_TO_LOGS
     17 
# Held by popen() around subprocess.Popen(); required for
# http://crbug.com/323843.
_popen_lock = threading.Lock()
# The module-wide _LoggingService instance; stays None until the first call
# to create_logger() creates it.
_logging_service = None
# Source of the serial numbers popen() assigns to commands, counting from 1.
_command_serial_number = itertools.count(1)

# Maximum number of bytes the logging service reads from a pipe in one call.
_LOG_BUFSIZE = 4096
# Value stored in place of a pipe's file descriptor once it has been closed.
_PIPE_CLOSED = -1
     24 
     25 class _LoggerProxy(object):
     26 
     27     def __init__(self, logger):
     28         self._logger = logger
     29 
     30     def fileno(self):
     31         """Returns the fileno of the logger pipe."""
     32         return self._logger._pipe[1]
     33 
     34     def __del__(self):
     35         self._logger.close()
     36 
     37 
     38 class _PipeLogger(object):
     39 
     40     def __init__(self, level, prefix):
     41         self._pipe = list(os.pipe())
     42         self._level = level
     43         self._prefix = prefix
     44 
     45     def close(self):
     46         """Closes the logger."""
     47         if self._pipe[1] != _PIPE_CLOSED:
     48             os.close(self._pipe[1])
     49             self._pipe[1] = _PIPE_CLOSED
     50 
     51 
     52 class _LoggingService(object):
     53 
     54     def __init__(self):
     55         # Python's list is thread safe
     56         self._loggers = []
     57 
     58         # Change tuple to list so that we can change the value when
     59         # closing the pipe.
     60         self._pipe = list(os.pipe())
     61         self._thread = threading.Thread(target=self._service_run)
     62         self._thread.daemon = True
     63         self._thread.start()
     64 
     65 
     66     def _service_run(self):
     67         terminate_loop = False
     68         while not terminate_loop:
     69             rlist = [l._pipe[0] for l in self._loggers]
     70             rlist.append(self._pipe[0])
     71             for r in select.select(rlist, [], [])[0]:
     72                 data = os.read(r, _LOG_BUFSIZE)
     73                 if r != self._pipe[0]:
     74                     self._output_logger_message(r, data)
     75                 elif len(data) == 0:
     76                     terminate_loop = True
     77         # Release resources.
     78         os.close(self._pipe[0])
     79         for logger in self._loggers:
     80             os.close(logger._pipe[0])
     81 
     82 
     83     def _output_logger_message(self, r, data):
     84         logger = next(l for l in self._loggers if l._pipe[0] == r)
     85 
     86         if len(data) == 0:
     87             os.close(logger._pipe[0])
     88             self._loggers.remove(logger)
     89             return
     90 
     91         for line in data.split('\n'):
     92             logging.log(logger._level, '%s%s', logger._prefix, line)
     93 
     94 
     95     def create_logger(self, level=logging.DEBUG, prefix=''):
     96         """Creates a new logger.
     97 
     98         @param level: the desired logging level
     99         @param prefix: the prefix to add to each log entry
    100         """
    101         logger = _PipeLogger(level=level, prefix=prefix)
    102         self._loggers.append(logger)
    103         os.write(self._pipe[1], '\0')
    104         return _LoggerProxy(logger)
    105 
    106 
    107     def shutdown(self):
    108         """Shuts down the logger."""
    109         if self._pipe[1] != _PIPE_CLOSED:
    110             os.close(self._pipe[1])
    111             self._pipe[1] = _PIPE_CLOSED
    112             self._thread.join()
    113 
    114 
    115 def create_logger(level=logging.DEBUG, prefix=''):
    116     """Creates a new logger.
    117 
    118     @param level: the desired logging level
    119     @param prefix: the prefix to add to each log entry
    120     """
    121     global _logging_service
    122     if _logging_service is None:
    123         _logging_service = _LoggingService()
    124         atexit.register(_logging_service.shutdown)
    125     return _logging_service.create_logger(level=level, prefix=prefix)
    126 
    127 
    128 def kill_or_log_returncode(*popens):
    129     """Kills all the processes of the given Popens or logs the return code.
    130 
    131     @param popens: The Popens to be killed.
    132     """
    133     for p in popens:
    134         if p.poll() is None:
    135             try:
    136                 p.kill()
    137             except Exception as e:
    138                 logging.warning('failed to kill %d, %s', p.pid, e)
    139         else:
    140             logging.warning('command exit (pid=%d, rc=%d): %s',
    141                             p.pid, p.returncode, p.command)
    142 
    143 
    144 def wait_and_check_returncode(*popens):
    145     """Wait for all the Popens and check the return code is 0.
    146 
    147     If the return code is not 0, it raises an RuntimeError.
    148 
    149     @param popens: The Popens to be checked.
    150     """
    151     error_message = None
    152     for p in popens:
    153         if p.wait() != 0:
    154             error_message = ('Command failed(%d, %d): %s' %
    155                              (p.pid, p.returncode, p.command))
    156             logging.error(error_message)
    157     if error_message:
    158         raise RuntimeError(error_message)
    159 
    160 
    161 def execute(args, stdin=None, stdout=TEE_TO_LOGS, stderr=TEE_TO_LOGS,
    162             run_as=None):
    163     """Executes a child command and wait for it.
    164 
    165     Returns the output from standard output if 'stdout' is subprocess.PIPE.
    166     Raises RuntimeException if the return code of the child command is not 0.
    167 
    168     @param args: the command to be executed
    169     @param stdin: the executed program's standard input
    170     @param stdout: the executed program's standard output
    171     @param stderr: the executed program's standard error
    172     @param run_as: if not None, run the command as the given user
    173     """
    174     ps = popen(args, stdin=stdin, stdout=stdout, stderr=stderr,
    175                run_as=run_as)
    176     out = ps.communicate()[0] if stdout == subprocess.PIPE else None
    177     wait_and_check_returncode(ps)
    178     return out
    179 
    180 
def _run_as(user):
    """Changes the uid and gid of the running process to be that of the
    given user and configures its supplementary groups.

    Don't call this function directly, instead wrap it in a lambda and
    pass that to the preexec_fn argument of subprocess.Popen.

    Example usage:
    subprocess.Popen(..., preexec_fn=lambda: _run_as('chronos'))

    @param user: the user to run as
    """
    # Look up the uid and primary gid of the user in the password database.
    pw = pwd.getpwnam(user)
    # NOTE(review): the order looks deliberate - both group changes happen
    # before setuid(), presumably because they need privileges that are lost
    # once the uid has been changed. Do not reorder without confirming.
    os.setgid(pw.pw_gid)
    os.initgroups(user, pw.pw_gid)
    os.setuid(pw.pw_uid)
    197 
    198 
    199 def popen(args, stdin=None, stdout=TEE_TO_LOGS, stderr=TEE_TO_LOGS, env=None,
    200           run_as=None):
    201     """Returns a Popen object just as subprocess.Popen does but with the
    202     executed command stored in Popen.command.
    203 
    204     @param args: the command to be executed
    205     @param stdin: the executed program's standard input
    206     @param stdout: the executed program's standard output
    207     @param stderr: the executed program's standard error
    208     @param env: the executed program's environment
    209     @param run_as: if not None, run the command as the given user
    210     """
    211     command_id = _command_serial_number.next()
    212     prefix = '[%04d] ' % command_id
    213 
    214     if stdout is TEE_TO_LOGS:
    215         stdout = create_logger(level=logging.DEBUG, prefix=prefix)
    216     if stderr is TEE_TO_LOGS:
    217         stderr = create_logger(level=logging.ERROR, prefix=prefix)
    218 
    219     command = ' '.join(pipes.quote(x) for x in args)
    220     logging.info('%sRunning: %s', prefix, command)
    221 
    222     preexec_fn = None
    223     if run_as is not None:
    224         preexec_fn = lambda: _run_as(run_as)
    225 
    226     # The lock is required for http://crbug.com/323843.
    227     with _popen_lock:
    228         ps = subprocess.Popen(args, stdin=stdin, stdout=stdout, stderr=stderr,
    229                               env=env, preexec_fn=preexec_fn)
    230     logging.info('%spid is %d', prefix, ps.pid)
    231     ps.command_id = command_id
    232     ps.command = command
    233     return ps
    234