Home | History | Annotate | Download | only in common
      1 # Copyright 2010 Google Inc. All Rights Reserved.
      2 #
      3 """A module for a job in the infrastructure."""
      4 
      5 __author__ = 'raymes (at] google.com (Raymes Khoury)'
      6 
import os.path
import re

from automation.common import state_machine
     10 
# Job status values. They are the states used by JobStateMachine below and
# the values that Job.is_ready compares predecessor statuses against.
STATUS_NOT_EXECUTED = 'NOT_EXECUTED'
STATUS_SETUP = 'SETUP'
STATUS_COPYING = 'COPYING'
STATUS_RUNNING = 'RUNNING'
STATUS_SUCCEEDED = 'SUCCEEDED'
STATUS_FAILED = 'FAILED'
     17 
     18 
     19 class FolderDependency(object):
     20 
     21   def __init__(self, job, src, dest=None):
     22     if not dest:
     23       dest = src
     24 
     25     # TODO(kbaclawski): rename to producer
     26     self.job = job
     27     self.src = src
     28     self.dest = dest
     29 
     30   @property
     31   def read_only(self):
     32     return self.dest == self.src
     33 
     34 
class JobStateMachine(state_machine.BasicStateMachine):
  """State machine over the job STATUS_* values.

  NOTE(review): how BasicStateMachine interprets these tables is not visible
  in this file; the comments below describe what the names suggest and should
  be confirmed against automation.common.state_machine.
  """

  # Maps each status to the statuses listed as reachable from it (presumably
  # the only transitions BasicStateMachine permits).
  state_machine = {
      STATUS_NOT_EXECUTED: [STATUS_SETUP],
      STATUS_SETUP: [STATUS_COPYING, STATUS_FAILED],
      STATUS_COPYING: [STATUS_RUNNING, STATUS_FAILED],
      STATUS_RUNNING: [STATUS_SUCCEEDED, STATUS_FAILED]
  }

  # Statuses with no outgoing entry in the table above (presumably terminal).
  final_states = [STATUS_SUCCEEDED, STATUS_FAILED]
     44 
     45 
     46 class JobFailure(Exception):
     47 
     48   def __init__(self, message, exit_code):
     49     Exception.__init__(self, message)
     50     self.exit_code = exit_code
     51 
     52 
     53 class Job(object):
     54   """A class representing a job whose commands will be executed."""
     55 
     56   WORKDIR_PREFIX = '/usr/local/google/tmp/automation'
     57 
     58   def __init__(self, label, command, timeout=4 * 60 * 60):
     59     self._state = JobStateMachine(STATUS_NOT_EXECUTED)
     60     self.predecessors = set()
     61     self.successors = set()
     62     self.machine_dependencies = []
     63     self.folder_dependencies = []
     64     self.id = 0
     65     self.machines = []
     66     self.command = command
     67     self._has_primary_machine_spec = False
     68     self.group = None
     69     self.dry_run = None
     70     self.label = label
     71     self.timeout = timeout
     72 
     73   def _StateGet(self):
     74     return self._state
     75 
     76   def _StateSet(self, new_state):
     77     self._state.Change(new_state)
     78 
     79   status = property(_StateGet, _StateSet)
     80 
     81   @property
     82   def timeline(self):
     83     return self._state.timeline
     84 
     85   def __repr__(self):
     86     return '{%s: %s}' % (self.__class__.__name__, self.id)
     87 
     88   def __str__(self):
     89     res = []
     90     res.append('%d' % self.id)
     91     res.append('Predecessors:')
     92     res.extend(['%d' % pred.id for pred in self.predecessors])
     93     res.append('Successors:')
     94     res.extend(['%d' % succ.id for succ in self.successors])
     95     res.append('Machines:')
     96     res.extend(['%s' % machine for machine in self.machines])
     97     res.append(self.PrettyFormatCommand())
     98     res.append('%s' % self.status)
     99     res.append(self.timeline.GetTransitionEventReport())
    100     return '\n'.join(res)
    101 
    102   @staticmethod
    103   def _FormatCommand(cmd, substitutions):
    104     for pattern, replacement in substitutions:
    105       cmd = cmd.replace(pattern, replacement)
    106 
    107     return cmd
    108 
    109   def GetCommand(self):
    110     substitutions = [
    111         ('$JOB_ID', str(self.id)), ('$JOB_TMP', self.work_dir),
    112         ('$JOB_HOME', self.home_dir),
    113         ('$PRIMARY_MACHINE', self.primary_machine.hostname)
    114     ]
    115 
    116     if len(self.machines) > 1:
    117       for num, machine in enumerate(self.machines[1:]):
    118         substitutions.append(('$SECONDARY_MACHINES[%d]' % num, machine.hostname
    119                              ))
    120 
    121     return self._FormatCommand(str(self.command), substitutions)
    122 
    123   def PrettyFormatCommand(self):
    124     # TODO(kbaclawski): This method doesn't belong here, but rather to
    125     # non existing Command class. If one is created then PrettyFormatCommand
    126     # shall become its method.
    127     return self._FormatCommand(self.GetCommand(), [
    128         ('\{ ', ''), ('; \}', ''), ('\} ', '\n'), ('\s*&&\s*', '\n')
    129     ])
    130 
    131   def DependsOnFolder(self, dependency):
    132     self.folder_dependencies.append(dependency)
    133     self.DependsOn(dependency.job)
    134 
    135   @property
    136   def results_dir(self):
    137     return os.path.join(self.work_dir, 'results')
    138 
    139   @property
    140   def logs_dir(self):
    141     return os.path.join(self.home_dir, 'logs')
    142 
    143   @property
    144   def log_filename_prefix(self):
    145     return 'job-%d.log' % self.id
    146 
    147   @property
    148   def work_dir(self):
    149     return os.path.join(self.WORKDIR_PREFIX, 'job-%d' % self.id)
    150 
    151   @property
    152   def home_dir(self):
    153     return os.path.join(self.group.home_dir, 'job-%d' % self.id)
    154 
    155   @property
    156   def primary_machine(self):
    157     return self.machines[0]
    158 
    159   def DependsOn(self, job):
    160     """Specifies Jobs to be finished before this job can be launched."""
    161     self.predecessors.add(job)
    162     job.successors.add(self)
    163 
    164   @property
    165   def is_ready(self):
    166     """Check that all our dependencies have been executed."""
    167     return all(pred.status == STATUS_SUCCEEDED for pred in self.predecessors)
    168 
    169   def DependsOnMachine(self, machine_spec, primary=True):
    170     # Job will run on arbitrarily chosen machine specified by
    171     # MachineSpecification class instances passed to this method.
    172     if primary:
    173       if self._has_primary_machine_spec:
    174         raise RuntimeError('Only one primary machine specification allowed.')
    175       self._has_primary_machine_spec = True
    176       self.machine_dependencies.insert(0, machine_spec)
    177     else:
    178       self.machine_dependencies.append(machine_spec)
    179