# Copyright 2010 Google Inc. All Rights Reserved.
#

import logging
import os
import re
import threading

from automation.common import job
from automation.common import logger
from automation.server.job_executer import JobExecuter


class IdProducerPolicy(object):
  """Produces a series of unique integer IDs.

  Example:
      id_producer = IdProducerPolicy()
      id_a = id_producer.GetNextId()
      id_b = id_producer.GetNextId()
      assert id_a != id_b
  """

  def __init__(self):
    self._counter = 1

  def Initialize(self, home_prefix, home_pattern):
    """Finds the first available ID based on a directory listing.

    Args:
      home_prefix: A directory to be traversed.
      home_pattern: A regexp describing all files/directories that will be
        considered. The regexp must contain exactly one named match group,
        "id", which must match an integer number.

    Example:
      id_producer.Initialize(JOBDIR_PREFIX, r'job-(?P<id>\d+)')
    """
    harvested_ids = []

    if os.path.isdir(home_prefix):
      for filename in os.listdir(home_prefix):
        path = os.path.join(home_prefix, filename)

        if os.path.isdir(path):
          match = re.match(home_pattern, filename)

          if match:
            harvested_ids.append(int(match.group('id')))

    self._counter = max(harvested_ids or [0]) + 1

  def GetNextId(self):
    """Returns the next unique ID in the series."""
    new_id = self._counter
    self._counter += 1
    return new_id


class JobManager(threading.Thread):
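  """Schedules jobs for execution on machines from a MachineManager.

  The manager runs as a separate thread: jobs are registered with AddJob(),
  queued once their dependencies are satisfied, and handed to a JobExecuter
  as soon as the required machines can be acquired.

  Example (a minimal sketch; construction of machine_manager and job_ is
  assumed and not shown):
      manager = JobManager(machine_manager)
      manager.StartJobManager()
      job_id = manager.AddJob(job_)
      # ... later, when shutting down:
      manager.StopJobManager()
  """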

  def __init__(self, machine_manager):
    threading.Thread.__init__(self, name=self.__class__.__name__)
    self.all_jobs = []
    self.ready_jobs = []
    self.job_executer_mapping = {}

    self.machine_manager = machine_manager

    self._lock = threading.Lock()
    self._jobs_available = threading.Condition(self._lock)
    self._exit_request = False

    self.listeners = []
    self.listeners.append(self)

    self._id_producer = IdProducerPolicy()
    self._id_producer.Initialize(job.Job.WORKDIR_PREFIX, r'job-(?P<id>\d+)')

    self._logger = logging.getLogger(self.__class__.__name__)

  def StartJobManager(self):
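    """Starts the scheduling thread and wakes up the scheduling loop."""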
    self._logger.info('Starting...')

    with self._lock:
      self.start()
      self._jobs_available.notify_all()

  def StopJobManager(self):
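    """Kills all known jobs and shuts down the scheduling loop.

    Blocks until every running JobExecuter thread has finished.
    """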
    self._logger.info('Shutdown request received.')

    with self._lock:
      for job_ in self.all_jobs:
        self._KillJob(job_.id)

      # Signal the scheduling loop to exit.
      self._exit_request = True
      self._jobs_available.notify_all()

    # Wait for all job executer threads to finish.
    for executer in self.job_executer_mapping.values():
      executer.join()

  def KillJob(self, job_id):
    """Kills a job by id.

    Does not block until the job is completed.
    """
    with self._lock:
      self._KillJob(job_id)

  def GetJob(self, job_id):
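    """Returns the job with the given id, or None if it is not known."""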
    for job_ in self.all_jobs:
      if job_.id == job_id:
        return job_
    return None

  def _KillJob(self, job_id):
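    """Kills the job's executer and drops the job from the ready queue.

    Expects self._lock to be held by the caller.
    """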
    self._logger.info('Killing [Job: %d].', job_id)

    if job_id in self.job_executer_mapping:
      self.job_executer_mapping[job_id].Kill()
    for job_ in self.ready_jobs:
      if job_.id == job_id:
        self.ready_jobs.remove(job_)
        break

  def AddJob(self, job_):
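    """Registers a job, assigns it a unique id and queues it if it is ready.

    Returns:
      The id assigned to the job.
    """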
    with self._lock:
      job_.id = self._id_producer.GetNextId()

      self.all_jobs.append(job_)
      # Only queue the job as ready if it has no outstanding dependencies.
      if job_.is_ready:
        self.ready_jobs.append(job_)

      self._jobs_available.notify_all()

    return job_.id

  def CleanUpJob(self, job_):
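    """Removes the job's work directory and forgets its executer."""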
    with self._lock:
      if job_.id in self.job_executer_mapping:
        self.job_executer_mapping[job_.id].CleanUpWorkDir()
        del self.job_executer_mapping[job_.id]
      # TODO(raymes): remove job from self.all_jobs

  def NotifyJobComplete(self, job_):
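    """Handles a job completion event.

    Returns the job's machines to the machine manager and, if the job
    succeeded, queues any successors that are now ready to run.
    """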
    self.machine_manager.ReturnMachines(job_.machines)

    with self._lock:
      self._logger.debug('Handling %r completion event.', job_)

      if job_.status == job.STATUS_SUCCEEDED:
        for succ in job_.successors:
          if succ.is_ready and succ not in self.ready_jobs:
            self.ready_jobs.append(succ)

      self._jobs_available.notify_all()

  def AddListener(self, listener):
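    """Registers an object to be notified by executers about job events."""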
    self.listeners.append(listener)

  @logger.HandleUncaughtExceptions
  def run(self):
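    """Main scheduling loop.

    Waits until jobs become ready, then tries to acquire the required
    machines for each one and starts a JobExecuter for it. Exits once a
    shutdown has been requested through StopJobManager().
    """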
    self._logger.info('Started.')

    while not self._exit_request:
      with self._lock:
        # Get the next ready job, blocking if there are none.
        self._jobs_available.wait()

        while self.ready_jobs:
          ready_job = self.ready_jobs.pop()

          required_machines = ready_job.machine_dependencies
          for pred in ready_job.predecessors:
            required_machines[0].AddPreferredMachine(
                pred.primary_machine.hostname)

          machines = self.machine_manager.GetMachines(required_machines)
          if not machines:
            # The necessary machines are not available right now; requeue the
            # job and wait for some running jobs to complete.
            self.ready_jobs.insert(0, ready_job)
            break
          else:
            # Mark the job as executing by handing it to an executer thread.
            executer = JobExecuter(ready_job, machines, self.listeners)
            executer.start()
            self.job_executer_mapping[ready_job.id] = executer

    self._logger.info('Stopped.')
    195