Home | History | Annotate | Download | only in server
      1 # Copyright 2010 Google Inc. All Rights Reserved.
      2 #
      3 
      4 import logging
      5 import os.path
      6 import threading
      7 
      8 from automation.common import command as cmd
      9 from automation.common import job
     10 from automation.common import logger
     11 from automation.common.command_executer import LoggingCommandExecuter
     12 from automation.common.command_executer import CommandTerminator
     13 
     14 
     15 class JobExecuter(threading.Thread):
     16 
     17   def __init__(self, job_to_execute, machines, listeners):
     18     threading.Thread.__init__(self)
     19 
     20     assert machines
     21 
     22     self.job = job_to_execute
     23     self.listeners = listeners
     24     self.machines = machines
     25 
     26     # Set Thread name.
     27     self.name = '%s-%s' % (self.__class__.__name__, self.job.id)
     28 
     29     self._logger = logging.getLogger(self.__class__.__name__)
     30     self._executer = LoggingCommandExecuter(self.job.dry_run)
     31     self._terminator = CommandTerminator()
     32 
     33   def _RunRemotely(self, command, fail_msg, command_timeout=1 * 60 * 60):
     34     exit_code = self._executer.RunCommand(command,
     35                                           self.job.primary_machine.hostname,
     36                                           self.job.primary_machine.username,
     37                                           command_terminator=self._terminator,
     38                                           command_timeout=command_timeout)
     39     if exit_code:
     40       raise job.JobFailure(fail_msg, exit_code)
     41 
     42   def _RunLocally(self, command, fail_msg, command_timeout=1 * 60 * 60):
     43     exit_code = self._executer.RunCommand(command,
     44                                           command_terminator=self._terminator,
     45                                           command_timeout=command_timeout)
     46     if exit_code:
     47       raise job.JobFailure(fail_msg, exit_code)
     48 
     49   def Kill(self):
     50     self._terminator.Terminate()
     51 
     52   def CleanUpWorkDir(self):
     53     self._logger.debug('Cleaning up %r work directory.', self.job)
     54     self._RunRemotely(cmd.RmTree(self.job.work_dir), 'Cleanup workdir failed.')
     55 
     56   def CleanUpHomeDir(self):
     57     self._logger.debug('Cleaning up %r home directory.', self.job)
     58     self._RunLocally(cmd.RmTree(self.job.home_dir), 'Cleanup homedir failed.')
     59 
     60   def _PrepareRuntimeEnvironment(self):
     61     self._RunRemotely(
     62         cmd.MakeDir(self.job.work_dir, self.job.logs_dir, self.job.results_dir),
     63         'Creating new job directory failed.')
     64 
     65     # The log directory is ready, so we can prepare to log command's output.
     66     self._executer.OpenLog(os.path.join(self.job.logs_dir,
     67                                         self.job.log_filename_prefix))
     68 
     69   def _SatisfyFolderDependencies(self):
     70     for dependency in self.job.folder_dependencies:
     71       to_folder = os.path.join(self.job.work_dir, dependency.dest)
     72       from_folder = os.path.join(dependency.job.work_dir, dependency.src)
     73       from_machine = dependency.job.primary_machine
     74 
     75       if from_machine == self.job.primary_machine and dependency.read_only:
     76         # No need to make a copy, just symlink it
     77         self._RunRemotely(
     78             cmd.MakeSymlink(from_folder, to_folder),
     79             'Failed to create symlink to required directory.')
     80       else:
     81         self._RunRemotely(
     82             cmd.RemoteCopyFrom(from_machine.hostname,
     83                                from_folder,
     84                                to_folder,
     85                                username=from_machine.username),
     86             'Failed to copy required files.')
     87 
     88   def _LaunchJobCommand(self):
     89     command = self.job.GetCommand()
     90 
     91     self._RunRemotely('%s; %s' % ('PS1=. TERM=linux source ~/.bashrc',
     92                                   cmd.Wrapper(command,
     93                                               cwd=self.job.work_dir)),
     94                       "Command failed to execute: '%s'." % command,
     95                       self.job.timeout)
     96 
     97   def _CopyJobResults(self):
     98     """Copy test results back to directory."""
     99     self._RunLocally(
    100         cmd.RemoteCopyFrom(self.job.primary_machine.hostname,
    101                            self.job.results_dir,
    102                            self.job.home_dir,
    103                            username=self.job.primary_machine.username),
    104         'Failed to copy results.')
    105 
    106   def run(self):
    107     self.job.status = job.STATUS_SETUP
    108     self.job.machines = self.machines
    109     self._logger.debug('Executing %r on %r in directory %s.', self.job,
    110                        self.job.primary_machine.hostname, self.job.work_dir)
    111 
    112     try:
    113       self.CleanUpWorkDir()
    114 
    115       self._PrepareRuntimeEnvironment()
    116 
    117       self.job.status = job.STATUS_COPYING
    118 
    119       self._SatisfyFolderDependencies()
    120 
    121       self.job.status = job.STATUS_RUNNING
    122 
    123       self._LaunchJobCommand()
    124       self._CopyJobResults()
    125 
    126       # If we get here, the job succeeded.
    127       self.job.status = job.STATUS_SUCCEEDED
    128     except job.JobFailure as ex:
    129       self._logger.error('Job failed. Exit code %s. %s', ex.exit_code, ex)
    130       if self._terminator.IsTerminated():
    131         self._logger.info('%r was killed', self.job)
    132 
    133       self.job.status = job.STATUS_FAILED
    134 
    135     self._executer.CloseLog()
    136 
    137     for listener in self.listeners:
    138       listener.NotifyJobComplete(self.job)
    139