Home | History | Annotate | Download | only in common
      1 # Copyright 2011 Google Inc. All Rights Reserved.
      2 #
      3 """Classes that help running commands in a subshell.
      4 
Commands can be run locally, or remotely over an SSH connection.  You may log
the output of a command to a terminal, a file, or any other destination.
      7 """
      8 
      9 __author__ = 'kbaclawski (at] google.com (Krystian Baclawski)'
     10 
import fcntl
import logging
import os
import select
import subprocess
import sys
import time

from automation.common import logger
     19 
     20 
     21 class CommandExecuter(object):
     22   DRY_RUN = False
     23 
     24   def __init__(self, dry_run=False):
     25     self._logger = logging.getLogger(self.__class__.__name__)
     26     self._dry_run = dry_run or self.DRY_RUN
     27 
     28   @classmethod
     29   def Configure(cls, dry_run):
     30     cls.DRY_RUN = dry_run
     31 
     32   def RunCommand(self,
     33                  cmd,
     34                  machine=None,
     35                  username=None,
     36                  command_terminator=None,
     37                  command_timeout=None):
     38     cmd = str(cmd)
     39 
     40     if self._dry_run:
     41       return 0
     42 
     43     if not command_terminator:
     44       command_terminator = CommandTerminator()
     45 
     46     if command_terminator.IsTerminated():
     47       self._logger.warning('Command has been already terminated!')
     48       return 1
     49 
     50     # Rewrite command for remote execution.
     51     if machine:
     52       if username:
     53         login = '%s@%s' % (username, machine)
     54       else:
     55         login = machine
     56 
     57       self._logger.debug("Executing '%s' on %s.", cmd, login)
     58 
     59       # FIXME(asharif): Remove this after crosbug.com/33007 is fixed.
     60       cmd = "ssh -t -t %s -- '%s'" % (login, cmd)
     61     else:
     62       self._logger.debug("Executing: '%s'.", cmd)
     63 
     64     child = self._SpawnProcess(cmd, command_terminator, command_timeout)
     65 
     66     self._logger.debug('{PID: %d} Finished with %d code.', child.pid,
     67                        child.returncode)
     68 
     69     return child.returncode
     70 
     71   def _Terminate(self, child, command_timeout, wait_timeout=10):
     72     """Gracefully shutdown the child by sending SIGTERM."""
     73 
     74     if command_timeout:
     75       self._logger.warning('{PID: %d} Timeout of %s seconds reached since '
     76                            'process started.', child.pid, command_timeout)
     77 
     78     self._logger.warning('{PID: %d} Terminating child.', child.pid)
     79 
     80     try:
     81       child.terminate()
     82     except OSError:
     83       pass
     84 
     85     wait_started = time.time()
     86 
     87     while not child.poll():
     88       if time.time() - wait_started >= wait_timeout:
     89         break
     90       time.sleep(0.1)
     91 
     92     return child.poll()
     93 
     94   def _Kill(self, child):
     95     """Kill the child with immediate result."""
     96     self._logger.warning('{PID: %d} Process still alive.', child.pid)
     97     self._logger.warning('{PID: %d} Killing child.', child.pid)
     98     child.kill()
     99     child.wait()
    100 
    101   def _SpawnProcess(self, cmd, command_terminator, command_timeout):
    102     # Create a child process executing provided command.
    103     child = subprocess.Popen(cmd,
    104                              stdout=subprocess.PIPE,
    105                              stderr=subprocess.PIPE,
    106                              stdin=subprocess.PIPE,
    107                              shell=True)
    108 
    109     # Close stdin so the child won't be able to block on read.
    110     child.stdin.close()
    111 
    112     started_time = time.time()
    113 
    114     # Watch for data on process stdout, stderr.
    115     pipes = [child.stdout, child.stderr]
    116 
    117     # Put pipes into non-blocking mode.
    118     for pipe in pipes:
    119       fd = pipe.fileno()
    120       fd_flags = fcntl.fcntl(fd, fcntl.F_GETFL)
    121       fcntl.fcntl(fd, fcntl.F_SETFL, fd_flags | os.O_NONBLOCK)
    122 
    123     already_terminated = False
    124 
    125     while pipes:
    126       # Maybe timeout reached?
    127       if command_timeout and time.time() - started_time > command_timeout:
    128         command_terminator.Terminate()
    129 
    130       # Check if terminate request was received.
    131       if command_terminator.IsTerminated() and not already_terminated:
    132         if not self._Terminate(child, command_timeout):
    133           self._Kill(child)
    134         # Don't exit the loop immediately. Firstly try to read everything that
    135         # was left on stdout and stderr.
    136         already_terminated = True
    137 
    138       # Wait for pipes to become ready.
    139       ready_pipes, _, _ = select.select(pipes, [], [], 0.1)
    140 
    141       # Handle file descriptors ready to be read.
    142       for pipe in ready_pipes:
    143         fd = pipe.fileno()
    144 
    145         data = os.read(fd, 4096)
    146 
    147         # check for end-of-file
    148         if not data:
    149           pipes.remove(pipe)
    150           continue
    151 
    152         # read all data that's available
    153         while data:
    154           if pipe == child.stdout:
    155             self.DataReceivedOnOutput(data)
    156           elif pipe == child.stderr:
    157             self.DataReceivedOnError(data)
    158 
    159           try:
    160             data = os.read(fd, 4096)
    161           except OSError:
    162             # terminate loop if EWOULDBLOCK (EAGAIN) is received
    163             data = ''
    164 
    165     if not already_terminated:
    166       self._logger.debug('Waiting for command to finish.')
    167       child.wait()
    168 
    169     return child
    170 
    171   def DataReceivedOnOutput(self, data):
    172     """Invoked when the child process wrote data to stdout."""
    173     sys.stdout.write(data)
    174 
    175   def DataReceivedOnError(self, data):
    176     """Invoked when the child process wrote data to stderr."""
    177     sys.stderr.write(data)
    178 
    179 
    180 class LoggingCommandExecuter(CommandExecuter):
    181 
    182   def __init__(self, *args, **kwargs):
    183     super(LoggingCommandExecuter, self).__init__(*args, **kwargs)
    184 
    185     # Create a logger for command's stdout/stderr streams.
    186     self._output = logging.getLogger('%s.%s' % (self._logger.name, 'Output'))
    187 
    188   def OpenLog(self, log_path):
    189     """The messages are going to be saved to gzip compressed file."""
    190     formatter = logging.Formatter('%(asctime)s %(prefix)s: %(message)s',
    191                                   '%Y-%m-%d %H:%M:%S')
    192     handler = logger.CompressedFileHandler(log_path, delay=True)
    193     handler.setFormatter(formatter)
    194     self._output.addHandler(handler)
    195 
    196     # Set a flag to prevent log records from being propagated up the logger
    197     # hierarchy tree.  We don't want for command output messages to appear in
    198     # the main log.
    199     self._output.propagate = 0
    200 
    201   def CloseLog(self):
    202     """Remove handlers and reattach the logger to its parent."""
    203     for handler in list(self._output.handlers):
    204       self._output.removeHandler(handler)
    205       handler.flush()
    206       handler.close()
    207 
    208     self._output.propagate = 1
    209 
    210   def DataReceivedOnOutput(self, data):
    211     """Invoked when the child process wrote data to stdout."""
    212     for line in data.splitlines():
    213       self._output.info(line, extra={'prefix': 'STDOUT'})
    214 
    215   def DataReceivedOnError(self, data):
    216     """Invoked when the child process wrote data to stderr."""
    217     for line in data.splitlines():
    218       self._output.warning(line, extra={'prefix': 'STDERR'})
    219 
    220 
    221 class CommandTerminator(object):
    222 
    223   def __init__(self):
    224     self.terminated = False
    225 
    226   def Terminate(self):
    227     self.terminated = True
    228 
    229   def IsTerminated(self):
    230     return self.terminated
    231