Home | History | Annotate | Download | only in server
      1 # Copyright 2010 Google Inc. All Rights Reserved.
      2 #
      3 
      4 import copy
      5 import logging
      6 import threading
      7 
      8 from automation.common import command as cmd
      9 from automation.common import logger
     10 from automation.common.command_executer import CommandExecuter
     11 from automation.common import job
     12 from automation.common import job_group
     13 from automation.server.job_manager import IdProducerPolicy
     14 
     15 
     16 class JobGroupManager(object):
     17 
     18   def __init__(self, job_manager):
     19     self.all_job_groups = []
     20 
     21     self.job_manager = job_manager
     22     self.job_manager.AddListener(self)
     23 
     24     self._lock = threading.Lock()
     25     self._job_group_finished = threading.Condition(self._lock)
     26 
     27     self._id_producer = IdProducerPolicy()
     28     self._id_producer.Initialize(job_group.JobGroup.HOMEDIR_PREFIX,
     29                                  'job-group-(?P<id>\d+)')
     30 
     31     self._logger = logging.getLogger(self.__class__.__name__)
     32 
     33   def GetJobGroup(self, group_id):
     34     with self._lock:
     35       for group in self.all_job_groups:
     36         if group.id == group_id:
     37           return group
     38 
     39       return None
     40 
     41   def GetAllJobGroups(self):
     42     with self._lock:
     43       return copy.deepcopy(self.all_job_groups)
     44 
     45   def AddJobGroup(self, group):
     46     with self._lock:
     47       group.id = self._id_producer.GetNextId()
     48 
     49     self._logger.debug('Creating runtime environment for %r.', group)
     50 
     51     CommandExecuter().RunCommand(cmd.Chain(
     52         cmd.RmTree(group.home_dir), cmd.MakeDir(group.home_dir)))
     53 
     54     with self._lock:
     55       self.all_job_groups.append(group)
     56 
     57       for job_ in group.jobs:
     58         self.job_manager.AddJob(job_)
     59 
     60       group.status = job_group.STATUS_EXECUTING
     61 
     62     self._logger.info('Added %r to queue.', group)
     63 
     64     return group.id
     65 
     66   def KillJobGroup(self, group):
     67     with self._lock:
     68       self._logger.debug('Killing all jobs that belong to %r.', group)
     69 
     70       for job_ in group.jobs:
     71         self.job_manager.KillJob(job_)
     72 
     73       self._logger.debug('Waiting for jobs to quit.')
     74 
     75       # Lets block until the group is killed so we know it is completed
     76       # when we return.
     77       while group.status not in [job_group.STATUS_SUCCEEDED,
     78                                  job_group.STATUS_FAILED]:
     79         self._job_group_finished.wait()
     80 
     81   def NotifyJobComplete(self, job_):
     82     self._logger.debug('Handling %r completion event.', job_)
     83 
     84     group = job_.group
     85 
     86     with self._lock:
     87       # We need to perform an action only if the group hasn't already failed.
     88       if group.status != job_group.STATUS_FAILED:
     89         if job_.status == job.STATUS_FAILED:
     90           # We have a failed job, abort the job group
     91           group.status = job_group.STATUS_FAILED
     92           if group.cleanup_on_failure:
     93             for job_ in group.jobs:
     94               # TODO(bjanakiraman): We should probably only kill dependent jobs
     95               # instead of the whole job group.
     96               self.job_manager.KillJob(job_)
     97               self.job_manager.CleanUpJob(job_)
     98         else:
     99           # The job succeeded successfully -- lets check to see if we are done.
    100           assert job_.status == job.STATUS_SUCCEEDED
    101           finished = True
    102           for other_job in group.jobs:
    103             assert other_job.status != job.STATUS_FAILED
    104             if other_job.status != job.STATUS_SUCCEEDED:
    105               finished = False
    106               break
    107 
    108           if finished and group.status != job_group.STATUS_SUCCEEDED:
    109             # TODO(kbaclawski): Without check performed above following code
    110             # could be called more than once. This would trigger StateMachine
    111             # crash, because it cannot transition from STATUS_SUCCEEDED to
    112             # STATUS_SUCCEEDED. Need to address that bug in near future.
    113             group.status = job_group.STATUS_SUCCEEDED
    114             if group.cleanup_on_completion:
    115               for job_ in group.jobs:
    116                 self.job_manager.CleanUpJob(job_)
    117 
    118         self._job_group_finished.notifyAll()
    119