Home | History | Annotate | Download | only in site_utils
      1 import abc
      2 import datetime
      3 import glob
      4 import json
      5 import os
      6 import re
      7 import shutil
      8 import time
      9 
     10 import common
     11 from autotest_lib.client.common_lib import time_utils
     12 from autotest_lib.client.common_lib import utils
     13 from autotest_lib.server.cros.dynamic_suite import constants
     14 from autotest_lib.server.cros.dynamic_suite import frontend_wrappers
     15 
     16 
# Module-wide RetryingAFE instance, created once at import time.  The job
# directory classes below query it for finished jobs, host queue entries
# and completed special tasks.
_AFE = frontend_wrappers.RetryingAFE()
     18 
     19 SPECIAL_TASK_PATTERN = '.*/hosts/[^/]+/(\d+)-[^/]+'
     20 JOB_PATTERN = '.*/(\d+)-[^/]+'
     21 
     22 def _is_job_expired(age_limit, timestamp):
     23   """Check whether a job timestamp is older than an age limit.
     24 
     25   @param age_limit: Minimum age, measured in days.  If the value is
     26                     not positive, the job is always expired.
     27   @param timestamp: Timestamp of the job whose age we are checking.
     28                     The format must match time_utils.TIME_FMT.
     29 
     30   @returns True iff the job is old enough to be expired.
     31   """
     32   if age_limit <= 0:
     33     return True
     34   job_time = time_utils.time_string_to_datetime(timestamp)
     35   expiration = job_time + datetime.timedelta(days=age_limit)
     36   return datetime.datetime.now() >= expiration
     37 
     38 
     39 def get_job_id_or_task_id(result_dir):
     40     """Extract job id or special task id from result_dir
     41 
     42     @param result_dir: path to the result dir.
     43             For test job:
     44             /usr/local/autotest/results/2032-chromeos-test/chromeos1-rack5-host6
     45             The hostname at the end is optional.
     46             For special task:
     47             /usr/local/autotest/results/hosts/chromeos1-rack5-host6/1343-cleanup
     48 
     49     @returns: integer representing the job id or task id. Returns None if fail
     50               to parse job or task id from the result_dir.
     51     """
     52     if not result_dir:
     53         return
     54     result_dir = os.path.abspath(result_dir)
     55     # Result folder for job running inside container has only job id.
     56     ssp_job_pattern = '.*/(\d+)$'
     57     # Try to get the job ID from the last pattern of number-text. This avoids
     58     # issue with path like 123-results/456-debug_user, in which 456 is the real
     59     # job ID.
     60     m_job = re.findall(JOB_PATTERN, result_dir)
     61     if m_job:
     62         return int(m_job[-1])
     63     m_special_task = re.match(SPECIAL_TASK_PATTERN, result_dir)
     64     if m_special_task:
     65         return int(m_special_task.group(1))
     66     m_ssp_job_pattern = re.match(ssp_job_pattern, result_dir)
     67     if m_ssp_job_pattern and utils.is_in_container():
     68         return int(m_ssp_job_pattern.group(1))
     69 
     70 
     71 class _JobDirectory(object):
     72   """State associated with a job to be offloaded.
     73 
     74   The full life-cycle of a job (including failure events that
     75   normally don't occur) looks like this:
     76    1. The job's results directory is discovered by
     77       `get_job_directories()`, and a job instance is created for it.
     78    2. Calls to `offload()` have no effect so long as the job
     79       isn't complete in the database and the job isn't expired
     80       according to the `age_limit` parameter.
     81    3. Eventually, the job is both finished and expired.  The next
     82       call to `offload()` makes the first attempt to offload the
     83       directory to GS.  Offload is attempted, but fails to complete
     84       (e.g. because of a GS problem).
     85    4. After the first failed offload `is_offloaded()` is false,
     86       but `is_reportable()` is also false, so the failure is not
     87       reported.
     88    5. Another call to `offload()` again tries to offload the
     89       directory, and again fails.
     90    6. After a second failure, `is_offloaded()` is false and
     91       `is_reportable()` is true, so the failure generates an e-mail
     92       notification.
     93    7. Finally, a call to `offload()` succeeds, and the directory no
     94       longer exists.  Now `is_offloaded()` is true, so the job
     95       instance is deleted, and future failures will not mention this
     96       directory any more.
     97 
     98   Only steps 1. and 7. are guaranteed to occur.  The others depend
     99   on the timing of calls to `offload()`, and on the reliability of
    100   the actual offload process.
    101 
    102   """
    103 
    104   __metaclass__ = abc.ABCMeta
    105 
    106   GLOB_PATTERN = None   # must be redefined in subclass
    107 
    108   def __init__(self, resultsdir):
    109     self._dirname = resultsdir
    110     self._id = get_job_id_or_task_id(resultsdir)
    111     self._offload_count = 0
    112     self._first_offload_start = 0
    113 
    114   @classmethod
    115   def get_job_directories(cls):
    116     """Return a list of directories of jobs that need offloading."""
    117     return [d for d in glob.glob(cls.GLOB_PATTERN) if os.path.isdir(d)]
    118 
    119   @abc.abstractmethod
    120   def get_timestamp_if_finished(self):
    121     """Return this job's timestamp from the database.
    122 
    123     If the database has not marked the job as finished, return
    124     `None`.  Otherwise, return a timestamp for the job.  The
    125     timestamp is to be used to determine expiration in
    126     `_is_job_expired()`.
    127 
    128     @return Return `None` if the job is still running; otherwise
    129             return a string with a timestamp in the appropriate
    130             format.
    131     """
    132     raise NotImplementedError("_JobDirectory.get_timestamp_if_finished")
    133 
    134   def enqueue_offload(self, queue, age_limit):
    135     """Enqueue the job for offload, if it's eligible.
    136 
    137     The job is eligible for offloading if the database has marked
    138     it finished, and the job is older than the `age_limit`
    139     parameter.
    140 
    141     If the job is eligible, offload processing is requested by
    142     passing the `queue` parameter's `put()` method a sequence with
    143     the job's `_dirname` attribute and its directory name.
    144 
    145     @param queue     If the job should be offloaded, put the offload
    146                      parameters into this queue for processing.
    147     @param age_limit Minimum age for a job to be offloaded.  A value
    148                      of 0 means that the job will be offloaded as
    149                      soon as it is finished.
    150 
    151     """
    152     if not self._offload_count:
    153       timestamp = self.get_timestamp_if_finished()
    154       if not timestamp:
    155         return
    156       if not _is_job_expired(age_limit, timestamp):
    157         return
    158       self._first_offload_start = time.time()
    159     self._offload_count += 1
    160     if self.process_gs_instructions():
    161       queue.put([self._dirname, os.path.dirname(self._dirname)])
    162 
    163   def is_offloaded(self):
    164     """Return whether this job has been successfully offloaded."""
    165     return not os.path.exists(self._dirname)
    166 
    167   def is_reportable(self):
    168     """Return whether this job has a reportable failure."""
    169     return self._offload_count > 1
    170 
    171   def get_failure_time(self):
    172     """Return the time of the first offload failure."""
    173     return self._first_offload_start
    174 
    175   def get_failure_count(self):
    176     """Return the number of times this job has failed to offload."""
    177     return self._offload_count
    178 
    179   def get_job_directory(self):
    180     """Return the name of this job's results directory."""
    181     return self._dirname
    182 
    183   def process_gs_instructions(self):
    184     """Process any gs_offloader instructions for this special task.
    185 
    186     @returns True/False if there is anything left to offload.
    187     """
    188     # Default support is to still offload the directory.
    189     return True
    190 
    191 
    192 class RegularJobDirectory(_JobDirectory):
    193   """Subclass of _JobDirectory for regular test jobs."""
    194 
    195   GLOB_PATTERN = '[0-9]*-*'
    196 
    197   def process_gs_instructions(self):
    198     """Process any gs_offloader instructions for this job.
    199 
    200     @returns True/False if there is anything left to offload.
    201     """
    202     # Go through the gs_offloader instructions file for each test in this job.
    203     for path in glob.glob(os.path.join(self._dirname, '*',
    204                                        constants.GS_OFFLOADER_INSTRUCTIONS)):
    205       with open(path, 'r') as f:
    206         gs_off_instructions = json.load(f)
    207       if gs_off_instructions.get(constants.GS_OFFLOADER_NO_OFFLOAD):
    208         shutil.rmtree(os.path.dirname(path))
    209 
    210     # Finally check if there's anything left to offload.
    211     if not os.listdir(self._dirname):
    212       shutil.rmtree(self._dirname)
    213       return False
    214     return True
    215 
    216 
    217   def get_timestamp_if_finished(self):
    218     """Get the timestamp to use for finished jobs.
    219 
    220     @returns the latest hqe finished_on time. If the finished_on times are null
    221              returns the job's created_on time.
    222     """
    223     entry = _AFE.get_jobs(id=self._id, finished=True)
    224     if not entry:
    225       return None
    226     hqes = _AFE.get_host_queue_entries(finished_on__isnull=False,
    227                                        job_id=self._id)
    228     if not hqes:
    229       return entry[0].created_on
    230     # While most Jobs have 1 HQE, some can have multiple, so check them all.
    231     return max([hqe.finished_on for hqe in hqes])
    232 
    233 
    234 class SpecialJobDirectory(_JobDirectory):
    235   """Subclass of _JobDirectory for special (per-host) jobs."""
    236 
    237   GLOB_PATTERN = 'hosts/*/[0-9]*-*'
    238 
    239   def __init__(self, resultsdir):
    240     super(SpecialJobDirectory, self).__init__(resultsdir)
    241 
    242   def get_timestamp_if_finished(self):
    243     entry = _AFE.get_special_tasks(id=self._id, is_complete=True)
    244     return entry[0].time_finished if entry else None
    245