1 # Copyright 2010 Google Inc. All Rights Reserved. 2 # 3 """A module for a job in the infrastructure.""" 4 5 __author__ = 'raymes (at] google.com (Raymes Khoury)' 6 7 import os.path 8 9 from automation.common import state_machine 10 11 STATUS_NOT_EXECUTED = 'NOT_EXECUTED' 12 STATUS_SETUP = 'SETUP' 13 STATUS_COPYING = 'COPYING' 14 STATUS_RUNNING = 'RUNNING' 15 STATUS_SUCCEEDED = 'SUCCEEDED' 16 STATUS_FAILED = 'FAILED' 17 18 19 class FolderDependency(object): 20 21 def __init__(self, job, src, dest=None): 22 if not dest: 23 dest = src 24 25 # TODO(kbaclawski): rename to producer 26 self.job = job 27 self.src = src 28 self.dest = dest 29 30 @property 31 def read_only(self): 32 return self.dest == self.src 33 34 35 class JobStateMachine(state_machine.BasicStateMachine): 36 state_machine = { 37 STATUS_NOT_EXECUTED: [STATUS_SETUP], 38 STATUS_SETUP: [STATUS_COPYING, STATUS_FAILED], 39 STATUS_COPYING: [STATUS_RUNNING, STATUS_FAILED], 40 STATUS_RUNNING: [STATUS_SUCCEEDED, STATUS_FAILED] 41 } 42 43 final_states = [STATUS_SUCCEEDED, STATUS_FAILED] 44 45 46 class JobFailure(Exception): 47 48 def __init__(self, message, exit_code): 49 Exception.__init__(self, message) 50 self.exit_code = exit_code 51 52 53 class Job(object): 54 """A class representing a job whose commands will be executed.""" 55 56 WORKDIR_PREFIX = '/usr/local/google/tmp/automation' 57 58 def __init__(self, label, command, timeout=4 * 60 * 60): 59 self._state = JobStateMachine(STATUS_NOT_EXECUTED) 60 self.predecessors = set() 61 self.successors = set() 62 self.machine_dependencies = [] 63 self.folder_dependencies = [] 64 self.id = 0 65 self.machines = [] 66 self.command = command 67 self._has_primary_machine_spec = False 68 self.group = None 69 self.dry_run = None 70 self.label = label 71 self.timeout = timeout 72 73 def _StateGet(self): 74 return self._state 75 76 def _StateSet(self, new_state): 77 self._state.Change(new_state) 78 79 status = property(_StateGet, _StateSet) 80 81 @property 82 def timeline(self): 83 return self._state.timeline 84 85 def __repr__(self): 86 return '{%s: %s}' % (self.__class__.__name__, self.id) 87 88 def __str__(self): 89 res = [] 90 res.append('%d' % self.id) 91 res.append('Predecessors:') 92 res.extend(['%d' % pred.id for pred in self.predecessors]) 93 res.append('Successors:') 94 res.extend(['%d' % succ.id for succ in self.successors]) 95 res.append('Machines:') 96 res.extend(['%s' % machine for machine in self.machines]) 97 res.append(self.PrettyFormatCommand()) 98 res.append('%s' % self.status) 99 res.append(self.timeline.GetTransitionEventReport()) 100 return '\n'.join(res) 101 102 @staticmethod 103 def _FormatCommand(cmd, substitutions): 104 for pattern, replacement in substitutions: 105 cmd = cmd.replace(pattern, replacement) 106 107 return cmd 108 109 def GetCommand(self): 110 substitutions = [ 111 ('$JOB_ID', str(self.id)), ('$JOB_TMP', self.work_dir), 112 ('$JOB_HOME', self.home_dir), 113 ('$PRIMARY_MACHINE', self.primary_machine.hostname) 114 ] 115 116 if len(self.machines) > 1: 117 for num, machine in enumerate(self.machines[1:]): 118 substitutions.append(('$SECONDARY_MACHINES[%d]' % num, machine.hostname 119 )) 120 121 return self._FormatCommand(str(self.command), substitutions) 122 123 def PrettyFormatCommand(self): 124 # TODO(kbaclawski): This method doesn't belong here, but rather to 125 # non existing Command class. If one is created then PrettyFormatCommand 126 # shall become its method. 127 return self._FormatCommand(self.GetCommand(), [ 128 ('\{ ', ''), ('; \}', ''), ('\} ', '\n'), ('\s*&&\s*', '\n') 129 ]) 130 131 def DependsOnFolder(self, dependency): 132 self.folder_dependencies.append(dependency) 133 self.DependsOn(dependency.job) 134 135 @property 136 def results_dir(self): 137 return os.path.join(self.work_dir, 'results') 138 139 @property 140 def logs_dir(self): 141 return os.path.join(self.home_dir, 'logs') 142 143 @property 144 def log_filename_prefix(self): 145 return 'job-%d.log' % self.id 146 147 @property 148 def work_dir(self): 149 return os.path.join(self.WORKDIR_PREFIX, 'job-%d' % self.id) 150 151 @property 152 def home_dir(self): 153 return os.path.join(self.group.home_dir, 'job-%d' % self.id) 154 155 @property 156 def primary_machine(self): 157 return self.machines[0] 158 159 def DependsOn(self, job): 160 """Specifies Jobs to be finished before this job can be launched.""" 161 self.predecessors.add(job) 162 job.successors.add(self) 163 164 @property 165 def is_ready(self): 166 """Check that all our dependencies have been executed.""" 167 return all(pred.status == STATUS_SUCCEEDED for pred in self.predecessors) 168 169 def DependsOnMachine(self, machine_spec, primary=True): 170 # Job will run on arbitrarily chosen machine specified by 171 # MachineSpecification class instances passed to this method. 172 if primary: 173 if self._has_primary_machine_spec: 174 raise RuntimeError('Only one primary machine specification allowed.') 175 self._has_primary_machine_spec = True 176 self.machine_dependencies.insert(0, machine_spec) 177 else: 178 self.machine_dependencies.append(machine_spec) 179