#!/usr/bin/python
# Copyright (c) 2013 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""Helpers for running child commands and forwarding their output to logging.

popen() and execute() wrap subprocess.Popen. When stdout/stderr is
TEE_TO_LOGS, the child's stream is connected to a pipe that a background
thread drains into the logging module, one log entry per line.
"""

import atexit
import itertools
import logging
import os
import pipes
import pwd
import select
import subprocess
import threading

from autotest_lib.client.common_lib.utils import TEE_TO_LOGS

# Serializes calls to subprocess.Popen; see the comment in popen().
_popen_lock = threading.Lock()
# The lazily created _LoggingService shared by every create_logger() call.
_logging_service = None
# Source of the per-command id used in log prefixes, starting at 1.
_command_serial_number = itertools.count(1)

# Maximum number of bytes fetched from a pipe by a single os.read().
_LOG_BUFSIZE = 4096
# Stored in place of a file descriptor once that descriptor has been closed.
_PIPE_CLOSED = -1


class _LoggerProxy(object):
    """Handle for a _PipeLogger that exposes fileno().

    popen() passes instances of this class as the stdout/stderr argument of
    subprocess.Popen. The write end of the wrapped logger is closed when the
    proxy is destroyed.
    """

    def __init__(self, logger):
        self._logger = logger

    def fileno(self):
        """Returns the fileno of the logger pipe."""
        return self._logger._pipe[1]

    def __del__(self):
        self._logger.close()


class _PipeLogger(object):
    """A pipe together with the log level and prefix used for its content."""

    def __init__(self, level, prefix):
        # Keep the (read, write) pair in a list so that the write end can be
        # replaced by _PIPE_CLOSED after it is closed.
        self._pipe = list(os.pipe())
        self._level = level
        self._prefix = prefix

    def close(self):
        """Closes the logger."""
        # Only the write end is closed here; the guard makes repeated calls
        # harmless. The read end is closed by _LoggingService.
        if self._pipe[1] != _PIPE_CLOSED:
            os.close(self._pipe[1])
            self._pipe[1] = _PIPE_CLOSED


class _LoggingService(object):
    """Background thread that forwards data from logger pipes to logging.

    The service owns a control pipe: create_logger() writes one byte to it to
    wake up the select() loop so the new logger gets watched, and shutdown()
    closes its write end to tell the loop to terminate.
    """

    def __init__(self):
        # Python's list is thread safe
        self._loggers = []

        # Change tuple to list so that we can change the value when
        # closing the pipe.
        self._pipe = list(os.pipe())
        self._thread = threading.Thread(target=self._service_run)
        self._thread.daemon = True
        self._thread.start()


    def _service_run(self):
        """Thread body: waits on all pipes until the control pipe hits EOF."""
        terminate_loop = False
        while not terminate_loop:
            # Rebuild the watch list on every pass so that loggers added or
            # removed since the last select() are taken into account.
            rlist = [l._pipe[0] for l in self._loggers]
            rlist.append(self._pipe[0])
            for r in select.select(rlist, [], [])[0]:
                data = os.read(r, _LOG_BUFSIZE)
                if r != self._pipe[0]:
                    self._output_logger_message(r, data)
                elif not data:
                    # EOF on the control pipe: shutdown() closed the write
                    # end. Non-empty control data is only a wake-up byte.
                    terminate_loop = True
        # Release resources.
        os.close(self._pipe[0])
        for logger in self._loggers:
            os.close(logger._pipe[0])


    def _output_logger_message(self, r, data):
        """Logs a chunk read from a logger pipe, or retires it on EOF.

        @param r: the read file descriptor the data came from
        @param data: the data read from r; empty means end of file
        """
        logger = next(l for l in self._loggers if l._pipe[0] == r)

        if not data:
            os.close(logger._pipe[0])
            self._loggers.remove(logger)
            return

        lines = data.split('\n')
        # A chunk that ends with a newline leaves an empty last fragment;
        # drop it so that no spurious prefix-only entry is logged. Blank
        # lines inside the chunk are still logged.
        if not lines[-1]:
            lines.pop()
        for line in lines:
            logging.log(logger._level, '%s%s', logger._prefix, line)


    def create_logger(self, level=logging.DEBUG, prefix=''):
        """Creates a new logger.

        @param level: the desired logging level
        @param prefix: the prefix to add to each log entry
        @return a _LoggerProxy wrapping the newly created logger
        """
        logger = _PipeLogger(level=level, prefix=prefix)
        self._loggers.append(logger)
        # Wake up the service thread so that it starts watching the new pipe.
        os.write(self._pipe[1], '\0')
        return _LoggerProxy(logger)


    def shutdown(self):
        """Shuts down the logger."""
        # Closing the write end of the control pipe makes the service thread
        # read EOF and leave its loop; wait for it to finish.
        if self._pipe[1] != _PIPE_CLOSED:
            os.close(self._pipe[1])
            self._pipe[1] = _PIPE_CLOSED
            self._thread.join()


def create_logger(level=logging.DEBUG, prefix=''):
    """Creates a new logger.

    The shared logging service is started on first use and its shutdown is
    registered with atexit.

    @param level: the desired logging level
    @param prefix: the prefix to add to each log entry
    @return an object with a fileno() method, as returned by
            _LoggingService.create_logger
    """
    global _logging_service
    if _logging_service is None:
        _logging_service = _LoggingService()
        atexit.register(_logging_service.shutdown)
    return _logging_service.create_logger(level=level, prefix=prefix)


def kill_or_log_returncode(*popens):
    """Kills all the processes of the given Popens or logs the return code.

    A process that is still running is killed; a failure to kill it is
    logged and otherwise ignored. For a process that has already exited,
    its return code is logged instead.

    @param popens: The Popens to be killed.
    """
    for p in popens:
        if p.poll() is None:
            try:
                p.kill()
            except Exception as e:
                logging.warning('failed to kill %d, %s', p.pid, e)
        else:
            logging.warning('command exit (pid=%d, rc=%d): %s',
                            p.pid, p.returncode, p.command)


def wait_and_check_returncode(*popens):
    """Wait for all the Popens and check the return code is 0.

    Every Popen is waited for and every failure is logged. If any return
    code is not 0, it raises a RuntimeError carrying the message of the last
    failure.

    @param popens: The Popens to be checked.
    """
    error_message = None
    for p in popens:
        if p.wait() != 0:
            error_message = ('Command failed(%d, %d): %s' %
                             (p.pid, p.returncode, p.command))
            logging.error(error_message)
    if error_message:
        raise RuntimeError(error_message)


def execute(args, stdin=None, stdout=TEE_TO_LOGS, stderr=TEE_TO_LOGS,
            run_as=None):
    """Executes a child command and wait for it.

    Returns the output from standard output if 'stdout' is subprocess.PIPE,
    otherwise None.
    Raises RuntimeError if the return code of the child command is not 0.

    @param args: the command to be executed
    @param stdin: the executed program's standard input
    @param stdout: the executed program's standard output
    @param stderr: the executed program's standard error
    @param run_as: if not None, run the command as the given user
    """
    ps = popen(args, stdin=stdin, stdout=stdout, stderr=stderr,
               run_as=run_as)
    out = ps.communicate()[0] if stdout == subprocess.PIPE else None
    wait_and_check_returncode(ps)
    return out


def _run_as(user):
    """Changes the uid and gid of the running process to be that of the
    given user and configures its supplementary groups.

    Don't call this function directly, instead wrap it in a lambda and
    pass that to the preexec_fn argument of subprocess.Popen.

    Example usage:
    subprocess.Popen(..., preexec_fn=lambda: _run_as('chronos'))

    @param user: the user to run as
    """
    pw = pwd.getpwnam(user)
    os.setgid(pw.pw_gid)
    os.initgroups(user, pw.pw_gid)
    os.setuid(pw.pw_uid)


def popen(args, stdin=None, stdout=TEE_TO_LOGS, stderr=TEE_TO_LOGS, env=None,
          run_as=None):
    """Returns a Popen object just as subprocess.Popen does but with the
    executed command stored in Popen.command.

    The serial number assigned to the command is stored in Popen.command_id
    and is used as the prefix of the related log entries.

    @param args: the command to be executed
    @param stdin: the executed program's standard input
    @param stdout: the executed program's standard output
    @param stderr: the executed program's standard error
    @param env: the executed program's environment
    @param run_as: if not None, run the command as the given user
    """
    command_id = next(_command_serial_number)
    prefix = '[%04d] ' % command_id

    # Route the child's output into logging: stdout at DEBUG level, stderr
    # at ERROR level, both tagged with the command's prefix.
    if stdout is TEE_TO_LOGS:
        stdout = create_logger(level=logging.DEBUG, prefix=prefix)
    if stderr is TEE_TO_LOGS:
        stderr = create_logger(level=logging.ERROR, prefix=prefix)

    command = ' '.join(pipes.quote(x) for x in args)
    logging.info('%sRunning: %s', prefix, command)

    preexec_fn = None
    if run_as is not None:
        preexec_fn = lambda: _run_as(run_as)

    # The lock is required for http://crbug.com/323843.
    with _popen_lock:
        ps = subprocess.Popen(args, stdin=stdin, stdout=stdout, stderr=stderr,
                              env=env, preexec_fn=preexec_fn)
    logging.info('%spid is %d', prefix, ps.pid)
    ps.command_id = command_id
    ps.command = command
    return ps