1 # Copyright 2011 Google Inc. All Rights Reserved. 2 # 3 """Classes that help running commands in a subshell. 4 5 Commands can be run locally, or remotly using SSH connection. You may log the 6 output of a command to a terminal or a file, or any other destination. 7 """ 8 9 __author__ = 'kbaclawski (at] google.com (Krystian Baclawski)' 10 11 import fcntl 12 import logging 13 import os 14 import select 15 import subprocess 16 import time 17 18 from automation.common import logger 19 20 21 class CommandExecuter(object): 22 DRY_RUN = False 23 24 def __init__(self, dry_run=False): 25 self._logger = logging.getLogger(self.__class__.__name__) 26 self._dry_run = dry_run or self.DRY_RUN 27 28 @classmethod 29 def Configure(cls, dry_run): 30 cls.DRY_RUN = dry_run 31 32 def RunCommand(self, 33 cmd, 34 machine=None, 35 username=None, 36 command_terminator=None, 37 command_timeout=None): 38 cmd = str(cmd) 39 40 if self._dry_run: 41 return 0 42 43 if not command_terminator: 44 command_terminator = CommandTerminator() 45 46 if command_terminator.IsTerminated(): 47 self._logger.warning('Command has been already terminated!') 48 return 1 49 50 # Rewrite command for remote execution. 51 if machine: 52 if username: 53 login = '%s@%s' % (username, machine) 54 else: 55 login = machine 56 57 self._logger.debug("Executing '%s' on %s.", cmd, login) 58 59 # FIXME(asharif): Remove this after crosbug.com/33007 is fixed. 60 cmd = "ssh -t -t %s -- '%s'" % (login, cmd) 61 else: 62 self._logger.debug("Executing: '%s'.", cmd) 63 64 child = self._SpawnProcess(cmd, command_terminator, command_timeout) 65 66 self._logger.debug('{PID: %d} Finished with %d code.', child.pid, 67 child.returncode) 68 69 return child.returncode 70 71 def _Terminate(self, child, command_timeout, wait_timeout=10): 72 """Gracefully shutdown the child by sending SIGTERM.""" 73 74 if command_timeout: 75 self._logger.warning('{PID: %d} Timeout of %s seconds reached since ' 76 'process started.', child.pid, command_timeout) 77 78 self._logger.warning('{PID: %d} Terminating child.', child.pid) 79 80 try: 81 child.terminate() 82 except OSError: 83 pass 84 85 wait_started = time.time() 86 87 while not child.poll(): 88 if time.time() - wait_started >= wait_timeout: 89 break 90 time.sleep(0.1) 91 92 return child.poll() 93 94 def _Kill(self, child): 95 """Kill the child with immediate result.""" 96 self._logger.warning('{PID: %d} Process still alive.', child.pid) 97 self._logger.warning('{PID: %d} Killing child.', child.pid) 98 child.kill() 99 child.wait() 100 101 def _SpawnProcess(self, cmd, command_terminator, command_timeout): 102 # Create a child process executing provided command. 103 child = subprocess.Popen(cmd, 104 stdout=subprocess.PIPE, 105 stderr=subprocess.PIPE, 106 stdin=subprocess.PIPE, 107 shell=True) 108 109 # Close stdin so the child won't be able to block on read. 110 child.stdin.close() 111 112 started_time = time.time() 113 114 # Watch for data on process stdout, stderr. 115 pipes = [child.stdout, child.stderr] 116 117 # Put pipes into non-blocking mode. 118 for pipe in pipes: 119 fd = pipe.fileno() 120 fd_flags = fcntl.fcntl(fd, fcntl.F_GETFL) 121 fcntl.fcntl(fd, fcntl.F_SETFL, fd_flags | os.O_NONBLOCK) 122 123 already_terminated = False 124 125 while pipes: 126 # Maybe timeout reached? 127 if command_timeout and time.time() - started_time > command_timeout: 128 command_terminator.Terminate() 129 130 # Check if terminate request was received. 131 if command_terminator.IsTerminated() and not already_terminated: 132 if not self._Terminate(child, command_timeout): 133 self._Kill(child) 134 # Don't exit the loop immediately. Firstly try to read everything that 135 # was left on stdout and stderr. 136 already_terminated = True 137 138 # Wait for pipes to become ready. 139 ready_pipes, _, _ = select.select(pipes, [], [], 0.1) 140 141 # Handle file descriptors ready to be read. 142 for pipe in ready_pipes: 143 fd = pipe.fileno() 144 145 data = os.read(fd, 4096) 146 147 # check for end-of-file 148 if not data: 149 pipes.remove(pipe) 150 continue 151 152 # read all data that's available 153 while data: 154 if pipe == child.stdout: 155 self.DataReceivedOnOutput(data) 156 elif pipe == child.stderr: 157 self.DataReceivedOnError(data) 158 159 try: 160 data = os.read(fd, 4096) 161 except OSError: 162 # terminate loop if EWOULDBLOCK (EAGAIN) is received 163 data = '' 164 165 if not already_terminated: 166 self._logger.debug('Waiting for command to finish.') 167 child.wait() 168 169 return child 170 171 def DataReceivedOnOutput(self, data): 172 """Invoked when the child process wrote data to stdout.""" 173 sys.stdout.write(data) 174 175 def DataReceivedOnError(self, data): 176 """Invoked when the child process wrote data to stderr.""" 177 sys.stderr.write(data) 178 179 180 class LoggingCommandExecuter(CommandExecuter): 181 182 def __init__(self, *args, **kwargs): 183 super(LoggingCommandExecuter, self).__init__(*args, **kwargs) 184 185 # Create a logger for command's stdout/stderr streams. 186 self._output = logging.getLogger('%s.%s' % (self._logger.name, 'Output')) 187 188 def OpenLog(self, log_path): 189 """The messages are going to be saved to gzip compressed file.""" 190 formatter = logging.Formatter('%(asctime)s %(prefix)s: %(message)s', 191 '%Y-%m-%d %H:%M:%S') 192 handler = logger.CompressedFileHandler(log_path, delay=True) 193 handler.setFormatter(formatter) 194 self._output.addHandler(handler) 195 196 # Set a flag to prevent log records from being propagated up the logger 197 # hierarchy tree. We don't want for command output messages to appear in 198 # the main log. 199 self._output.propagate = 0 200 201 def CloseLog(self): 202 """Remove handlers and reattach the logger to its parent.""" 203 for handler in list(self._output.handlers): 204 self._output.removeHandler(handler) 205 handler.flush() 206 handler.close() 207 208 self._output.propagate = 1 209 210 def DataReceivedOnOutput(self, data): 211 """Invoked when the child process wrote data to stdout.""" 212 for line in data.splitlines(): 213 self._output.info(line, extra={'prefix': 'STDOUT'}) 214 215 def DataReceivedOnError(self, data): 216 """Invoked when the child process wrote data to stderr.""" 217 for line in data.splitlines(): 218 self._output.warning(line, extra={'prefix': 'STDERR'}) 219 220 221 class CommandTerminator(object): 222 223 def __init__(self): 224 self.terminated = False 225 226 def Terminate(self): 227 self.terminated = True 228 229 def IsTerminated(self): 230 return self.terminated 231