# Copyright 2010 Google Inc. All Rights Reserved.
#

import copy
import logging
import threading

from automation.common import command as cmd
from automation.common import logger
from automation.common.command_executer import CommandExecuter
from automation.common import job
from automation.common import job_group
from automation.server.job_manager import IdProducerPolicy


class JobGroupManager(object):
  """Tracks JobGroups and drives their lifecycle.

  Registers itself as a listener on the given job manager so it is told
  (via NotifyJobComplete) whenever an individual job finishes, and updates
  the owning group's status accordingly.
  """

  def __init__(self, job_manager):
    self.all_job_groups = []

    self.job_manager = job_manager
    self.job_manager.AddListener(self)

    # _job_group_finished shares _lock so waiters atomically re-check
    # group status after each notification.
    self._lock = threading.Lock()
    self._job_group_finished = threading.Condition(self._lock)

    self._id_producer = IdProducerPolicy()
    # Raw string: '\d' in a non-raw literal is a SyntaxWarning (and a
    # future SyntaxError) in modern Python.
    self._id_producer.Initialize(job_group.JobGroup.HOMEDIR_PREFIX,
                                 r'job-group-(?P<id>\d+)')

    self._logger = logging.getLogger(self.__class__.__name__)

  def GetJobGroup(self, group_id):
    """Returns the group with the given id, or None if not found."""
    with self._lock:
      for group in self.all_job_groups:
        if group.id == group_id:
          return group

      return None

  def GetAllJobGroups(self):
    """Returns a deep-copied snapshot of all known job groups."""
    with self._lock:
      return copy.deepcopy(self.all_job_groups)

  def AddJobGroup(self, group):
    """Assigns an id to |group|, sets up its home dir and queues its jobs.

    Returns:
      The id assigned to the group.
    """
    with self._lock:
      group.id = self._id_producer.GetNextId()

    self._logger.debug('Creating runtime environment for %r.', group)

    # Recreate the group's home directory from scratch.
    CommandExecuter().RunCommand(cmd.Chain(
        cmd.RmTree(group.home_dir), cmd.MakeDir(group.home_dir)))

    with self._lock:
      self.all_job_groups.append(group)

      for job_ in group.jobs:
        self.job_manager.AddJob(job_)

      group.status = job_group.STATUS_EXECUTING

    self._logger.info('Added %r to queue.', group)

    return group.id

  def KillJobGroup(self, group):
    """Kills all jobs in |group| and blocks until the group terminates."""
    with self._lock:
      self._logger.debug('Killing all jobs that belong to %r.', group)

      for job_ in group.jobs:
        self.job_manager.KillJob(job_)

      self._logger.debug('Waiting for jobs to quit.')

      # Lets block until the group is killed so we know it is completed
      # when we return.  wait() releases _lock so NotifyJobComplete can
      # make progress and flip the group status.
      while group.status not in [job_group.STATUS_SUCCEEDED,
                                 job_group.STATUS_FAILED]:
        self._job_group_finished.wait()

  def NotifyJobComplete(self, job_):
    """Listener callback: updates group state when |job_| finishes.

    A failed job marks the whole group failed (optionally cleaning up);
    a successful job may complete the group if it was the last one.
    """
    self._logger.debug('Handling %r completion event.', job_)

    group = job_.group

    with self._lock:
      # We need to perform an action only if the group hasn't already failed.
      if group.status != job_group.STATUS_FAILED:
        if job_.status == job.STATUS_FAILED:
          # We have a failed job, abort the job group
          group.status = job_group.STATUS_FAILED
          if group.cleanup_on_failure:
            # Use a distinct loop variable so we don't shadow the |job_|
            # parameter (the completed job) while iterating.
            for group_job in group.jobs:
              # TODO(bjanakiraman): We should probably only kill dependent jobs
              # instead of the whole job group.
              self.job_manager.KillJob(group_job)
              self.job_manager.CleanUpJob(group_job)
        else:
          # The job succeeded successfully -- lets check to see if we are done.
          assert job_.status == job.STATUS_SUCCEEDED
          finished = True
          for other_job in group.jobs:
            assert other_job.status != job.STATUS_FAILED
            if other_job.status != job.STATUS_SUCCEEDED:
              finished = False
              break

          if finished and group.status != job_group.STATUS_SUCCEEDED:
            # TODO(kbaclawski): Without check performed above following code
            # could be called more than once. This would trigger StateMachine
            # crash, because it cannot transition from STATUS_SUCCEEDED to
            # STATUS_SUCCEEDED. Need to address that bug in near future.
            group.status = job_group.STATUS_SUCCEEDED
            if group.cleanup_on_completion:
              for group_job in group.jobs:
                self.job_manager.CleanUpJob(group_job)

      # notify_all() is the non-deprecated spelling of notifyAll().
      self._job_group_finished.notify_all()