1 # Copyright (C) 2010 Google Inc. All rights reserved. 2 # 3 # Redistribution and use in source and binary forms, with or without 4 # modification, are permitted provided that the following conditions are 5 # met: 6 # 7 # * Redistributions of source code must retain the above copyright 8 # notice, this list of conditions and the following disclaimer. 9 # * Redistributions in binary form must reproduce the above 10 # copyright notice, this list of conditions and the following disclaimer 11 # in the documentation and/or other materials provided with the 12 # distribution. 13 # * Neither the Google name nor the names of its 14 # contributors may be used to endorse or promote products derived from 15 # this software without specific prior written permission. 16 # 17 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 18 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 19 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR 20 # A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT 21 # OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 22 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 23 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 24 # DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 25 # THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 26 # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE 27 # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 28 29 """Package that implements the ServerProcess wrapper class""" 30 31 import errno 32 import logging 33 import re 34 import signal 35 import sys 36 import time 37 38 # Note that although win32 python does provide an implementation of 39 # the win32 select API, it only works on sockets, and not on the named pipes 40 # used by subprocess, so we have to use the native APIs directly. 41 _quote_cmd = None 42 43 if sys.platform == 'win32': 44 import msvcrt 45 import win32pipe 46 import win32file 47 import subprocess 48 _quote_cmd = subprocess.list2cmdline 49 else: 50 import fcntl 51 import os 52 import pipes 53 import select 54 _quote_cmd = lambda cmdline: ' '.join(pipes.quote(arg) for arg in cmdline) 55 56 from webkitpy.common.system.executive import ScriptError 57 58 59 _log = logging.getLogger(__name__) 60 61 62 _trailing_spaces_re = re.compile('(.*[^ ])?( +)$') 63 64 65 def quote_data(data): 66 txt = repr(data).replace('\\n', '\\n\n')[1:-1] 67 lines = [] 68 for l in txt.splitlines(): 69 m = _trailing_spaces_re.match(l) 70 if m: 71 l = m.group(1) + m.group(2).replace(' ', '\x20') 72 lines.append(l) 73 return lines 74 75 class ServerProcess(object): 76 """This class provides a wrapper around a subprocess that 77 implements a simple request/response usage model. The primary benefit 78 is that reading responses takes a deadline, so that we don't ever block 79 indefinitely. The class also handles transparently restarting processes 80 as necessary to keep issuing commands.""" 81 82 def __init__(self, port_obj, name, cmd, env=None, universal_newlines=False, treat_no_data_as_crash=False, 83 logging=False): 84 self._port = port_obj 85 self._name = name # Should be the command name (e.g. content_shell, image_diff) 86 self._cmd = cmd 87 self._env = env 88 # Set if the process outputs non-standard newlines like '\r\n' or '\r'. 89 # Don't set if there will be binary data or the data must be ASCII encoded. 90 self._universal_newlines = universal_newlines 91 self._treat_no_data_as_crash = treat_no_data_as_crash 92 self._logging = logging 93 self._host = self._port.host 94 self._pid = None 95 self._reset() 96 97 # See comment in imports for why we need the win32 APIs and can't just use select. 98 # FIXME: there should be a way to get win32 vs. cygwin from platforminfo. 99 self._use_win32_apis = sys.platform == 'win32' 100 101 def name(self): 102 return self._name 103 104 def pid(self): 105 return self._pid 106 107 def _reset(self): 108 if getattr(self, '_proc', None): 109 if self._proc.stdin: 110 self._proc.stdin.close() 111 self._proc.stdin = None 112 if self._proc.stdout: 113 self._proc.stdout.close() 114 self._proc.stdout = None 115 if self._proc.stderr: 116 self._proc.stderr.close() 117 self._proc.stderr = None 118 119 self._proc = None 120 self._output = str() # bytesarray() once we require Python 2.6 121 self._error = str() # bytesarray() once we require Python 2.6 122 self._crashed = False 123 self.timed_out = False 124 125 def process_name(self): 126 return self._name 127 128 def _start(self): 129 if self._proc: 130 raise ValueError("%s already running" % self._name) 131 self._reset() 132 # close_fds is a workaround for http://bugs.python.org/issue2320 133 close_fds = not self._host.platform.is_win() 134 if self._logging: 135 env_str = '' 136 if self._env: 137 env_str += '\n'.join("%s=%s" % (k, v) for k, v in self._env.items()) + '\n' 138 _log.info('CMD: \n%s%s\n', env_str, _quote_cmd(self._cmd)) 139 self._proc = self._host.executive.popen(self._cmd, stdin=self._host.executive.PIPE, 140 stdout=self._host.executive.PIPE, 141 stderr=self._host.executive.PIPE, 142 close_fds=close_fds, 143 env=self._env, 144 universal_newlines=self._universal_newlines) 145 self._pid = self._proc.pid 146 fd = self._proc.stdout.fileno() 147 if not self._use_win32_apis: 148 fl = fcntl.fcntl(fd, fcntl.F_GETFL) 149 fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK) 150 fd = self._proc.stderr.fileno() 151 fl = fcntl.fcntl(fd, fcntl.F_GETFL) 152 fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK) 153 154 def _handle_possible_interrupt(self): 155 """This routine checks to see if the process crashed or exited 156 because of a keyboard interrupt and raises KeyboardInterrupt 157 accordingly.""" 158 # FIXME: Linux and Mac set the returncode to -signal.SIGINT if a 159 # subprocess is killed with a ctrl^C. Previous comments in this 160 # routine said that supposedly Windows returns 0xc000001d, but that's not what 161 # -1073741510 evaluates to. Figure out what the right value is 162 # for win32 and cygwin here ... 163 if self._proc.returncode in (-1073741510, -signal.SIGINT): 164 raise KeyboardInterrupt 165 166 def poll(self): 167 """Check to see if the underlying process is running; returns None 168 if it still is (wrapper around subprocess.poll).""" 169 if self._proc: 170 return self._proc.poll() 171 return None 172 173 def write(self, bytes): 174 """Write a request to the subprocess. The subprocess is (re-)start()'ed 175 if is not already running.""" 176 if not self._proc: 177 self._start() 178 try: 179 self._log_data(' IN', bytes) 180 self._proc.stdin.write(bytes) 181 except IOError, e: 182 self.stop(0.0) 183 # stop() calls _reset(), so we have to set crashed to True after calling stop(). 184 self._crashed = True 185 186 def _pop_stdout_line_if_ready(self): 187 index_after_newline = self._output.find('\n') + 1 188 if index_after_newline > 0: 189 return self._pop_output_bytes(index_after_newline) 190 return None 191 192 def _pop_stderr_line_if_ready(self): 193 index_after_newline = self._error.find('\n') + 1 194 if index_after_newline > 0: 195 return self._pop_error_bytes(index_after_newline) 196 return None 197 198 def pop_all_buffered_stderr(self): 199 return self._pop_error_bytes(len(self._error)) 200 201 def read_stdout_line(self, deadline): 202 return self._read(deadline, self._pop_stdout_line_if_ready) 203 204 def read_stderr_line(self, deadline): 205 return self._read(deadline, self._pop_stderr_line_if_ready) 206 207 def read_either_stdout_or_stderr_line(self, deadline): 208 def retrieve_bytes_from_buffers(): 209 stdout_line = self._pop_stdout_line_if_ready() 210 if stdout_line: 211 return stdout_line, None 212 stderr_line = self._pop_stderr_line_if_ready() 213 if stderr_line: 214 return None, stderr_line 215 return None # Instructs the caller to keep waiting. 216 217 return_value = self._read(deadline, retrieve_bytes_from_buffers) 218 # FIXME: This is a bit of a hack around the fact that _read normally only returns one value, but this caller wants it to return two. 219 if return_value is None: 220 return None, None 221 return return_value 222 223 def read_stdout(self, deadline, size): 224 if size <= 0: 225 raise ValueError('ServerProcess.read() called with a non-positive size: %d ' % size) 226 227 def retrieve_bytes_from_stdout_buffer(): 228 if len(self._output) >= size: 229 return self._pop_output_bytes(size) 230 return None 231 232 return self._read(deadline, retrieve_bytes_from_stdout_buffer) 233 234 def _log(self, message): 235 # This is a bit of a hack, but we first log a blank line to avoid 236 # messing up the master process's output. 237 _log.info('') 238 _log.info(message) 239 240 def _log_data(self, prefix, data): 241 if self._logging and data and len(data): 242 for line in quote_data(data): 243 _log.info('%s: %s', prefix, line) 244 245 def _handle_timeout(self): 246 self.timed_out = True 247 self._port.sample_process(self._name, self._proc.pid) 248 249 def _split_string_after_index(self, string, index): 250 return string[:index], string[index:] 251 252 def _pop_output_bytes(self, bytes_count): 253 output, self._output = self._split_string_after_index(self._output, bytes_count) 254 return output 255 256 def _pop_error_bytes(self, bytes_count): 257 output, self._error = self._split_string_after_index(self._error, bytes_count) 258 return output 259 260 def _wait_for_data_and_update_buffers_using_select(self, deadline, stopping=False): 261 if self._proc.stdout.closed or self._proc.stderr.closed: 262 # If the process crashed and is using FIFOs, like Chromium Android, the 263 # stdout and stderr pipes will be closed. 264 return 265 266 out_fd = self._proc.stdout.fileno() 267 err_fd = self._proc.stderr.fileno() 268 select_fds = (out_fd, err_fd) 269 try: 270 read_fds, _, _ = select.select(select_fds, [], select_fds, max(deadline - time.time(), 0)) 271 except select.error, e: 272 # We can ignore EINVAL since it's likely the process just crashed and we'll 273 # figure that out the next time through the loop in _read(). 274 if e.args[0] == errno.EINVAL: 275 return 276 raise 277 278 try: 279 # Note that we may get no data during read() even though 280 # select says we got something; see the select() man page 281 # on linux. I don't know if this happens on Mac OS and 282 # other Unixen as well, but we don't bother special-casing 283 # Linux because it's relatively harmless either way. 284 if out_fd in read_fds: 285 data = self._proc.stdout.read() 286 if not data and not stopping and (self._treat_no_data_as_crash or self._proc.poll()): 287 self._crashed = True 288 self._log_data('OUT', data) 289 self._output += data 290 291 if err_fd in read_fds: 292 data = self._proc.stderr.read() 293 if not data and not stopping and (self._treat_no_data_as_crash or self._proc.poll()): 294 self._crashed = True 295 self._log_data('ERR', data) 296 self._error += data 297 except IOError, e: 298 # We can ignore the IOErrors because we will detect if the subporcess crashed 299 # the next time through the loop in _read() 300 pass 301 302 def _wait_for_data_and_update_buffers_using_win32_apis(self, deadline): 303 # See http://code.activestate.com/recipes/440554-module-to-allow-asynchronous-subprocess-use-on-win/ 304 # and http://docs.activestate.com/activepython/2.6/pywin32/modules.html 305 # for documentation on all of these win32-specific modules. 306 now = time.time() 307 out_fh = msvcrt.get_osfhandle(self._proc.stdout.fileno()) 308 err_fh = msvcrt.get_osfhandle(self._proc.stderr.fileno()) 309 while (self._proc.poll() is None) and (now < deadline): 310 output = self._non_blocking_read_win32(out_fh) 311 self._log_data('OUT', output) 312 error = self._non_blocking_read_win32(err_fh) 313 self._log_data('ERR', error) 314 if output or error: 315 if output: 316 self._output += output 317 if error: 318 self._error += error 319 return 320 time.sleep(0.01) 321 now = time.time() 322 return 323 324 def _non_blocking_read_win32(self, handle): 325 try: 326 _, avail, _ = win32pipe.PeekNamedPipe(handle, 0) 327 if avail > 0: 328 _, buf = win32file.ReadFile(handle, avail, None) 329 return buf 330 except Exception, e: 331 if e[0] not in (109, errno.ESHUTDOWN): # 109 == win32 ERROR_BROKEN_PIPE 332 raise 333 return None 334 335 def has_crashed(self): 336 if not self._crashed and self.poll(): 337 self._crashed = True 338 self._handle_possible_interrupt() 339 return self._crashed 340 341 # This read function is a bit oddly-designed, as it polls both stdout and stderr, yet 342 # only reads/returns from one of them (buffering both in local self._output/self._error). 343 # It might be cleaner to pass in the file descriptor to poll instead. 344 def _read(self, deadline, fetch_bytes_from_buffers_callback): 345 while True: 346 if self.has_crashed(): 347 return None 348 349 if time.time() > deadline: 350 self._handle_timeout() 351 return None 352 353 bytes = fetch_bytes_from_buffers_callback() 354 if bytes is not None: 355 return bytes 356 357 if self._use_win32_apis: 358 self._wait_for_data_and_update_buffers_using_win32_apis(deadline) 359 else: 360 self._wait_for_data_and_update_buffers_using_select(deadline) 361 362 def start(self): 363 if not self._proc: 364 self._start() 365 366 def stop(self, timeout_secs=0.0): 367 if not self._proc: 368 return (None, None) 369 370 now = time.time() 371 if self._proc.stdin: 372 if self._logging: 373 _log.info(' IN: ^D') 374 self._proc.stdin.close() 375 self._proc.stdin = None 376 killed = False 377 if timeout_secs: 378 deadline = now + timeout_secs 379 while self._proc.poll() is None and time.time() < deadline: 380 time.sleep(0.01) 381 if self._proc.poll() is None: 382 _log.warning('stopping %s(pid %d) timed out, killing it' % (self._name, self._proc.pid)) 383 384 if self._proc.poll() is None: 385 self._kill() 386 killed = True 387 _log.debug('killed pid %d' % self._proc.pid) 388 389 # read any remaining data on the pipes and return it. 390 if not killed: 391 if self._use_win32_apis: 392 self._wait_for_data_and_update_buffers_using_win32_apis(now) 393 else: 394 self._wait_for_data_and_update_buffers_using_select(now, stopping=True) 395 out, err = self._output, self._error 396 self._reset() 397 return (out, err) 398 399 def kill(self): 400 self.stop(0.0) 401 402 def _kill(self): 403 self._host.executive.kill_process(self._proc.pid) 404 if self._proc.poll() is not None: 405 self._proc.wait() 406 407 def replace_outputs(self, stdout, stderr): 408 assert self._proc 409 if stdout: 410 self._proc.stdout.close() 411 self._proc.stdout = stdout 412 if stderr: 413 self._proc.stderr.close() 414 self._proc.stderr = stderr 415