# (code-viewer navigation header removed; this file lives in site_utils)
      1 import abc
      2 import datetime
      3 import glob
      4 import json
      5 import logging
      6 import os
      7 import re
      8 import shutil
      9 
     10 import common
     11 from autotest_lib.client.common_lib import time_utils
     12 from autotest_lib.client.common_lib import utils
     13 from autotest_lib.server.cros.dynamic_suite import constants
     14 from autotest_lib.server.cros.dynamic_suite import frontend_wrappers
     15 
     16 try:
     17     from chromite.lib import metrics
     18 except ImportError:
     19     metrics = utils.metrics_mock
     20 
     21 
     22 SPECIAL_TASK_PATTERN = '.*/hosts/[^/]+/(\d+)-[^/]+'
     23 
     24 def is_job_expired(age_limit, timestamp):
     25   """Check whether a job timestamp is older than an age limit.
     26 
     27   @param age_limit: Minimum age, measured in days.  If the value is
     28                     not positive, the job is always expired.
     29   @param timestamp: Timestamp of the job whose age we are checking.
     30                     The format must match time_utils.TIME_FMT.
     31 
     32   @returns True iff the job is old enough to be expired.
     33   """
     34   if age_limit <= 0:
     35     return True
     36   job_time = time_utils.time_string_to_datetime(timestamp)
     37   expiration = job_time + datetime.timedelta(days=age_limit)
     38   return datetime.datetime.now() >= expiration
     39 
     40 
     41 def get_job_id_or_task_id(result_dir):
     42     """Extract job id or special task id from result_dir
     43 
     44     @param result_dir: path to the result dir.
     45             For test job:
     46             /usr/local/autotest/results/2032-chromeos-test/chromeos1-rack5-host6
     47             The hostname at the end is optional.
     48             For special task:
     49             /usr/local/autotest/results/hosts/chromeos1-rack5-host6/1343-cleanup
     50 
     51     @returns: str representing the job id or task id. Returns None if fail
     52         to parse job or task id from the result_dir.
     53     """
     54     if not result_dir:
     55         return
     56     result_dir = os.path.abspath(result_dir)
     57     # Result folder for job running inside container has only job id.
     58     ssp_job_pattern = '.*/(\d+)$'
     59     # Try to get the job ID from the last pattern of number-text. This avoids
     60     # issue with path like 123-results/456-debug_user, in which 456 is the real
     61     # job ID.
     62     m_job = re.findall('.*/(\d+)-[^/]+', result_dir)
     63     if m_job:
     64         return m_job[-1]
     65     m_special_task = re.match(SPECIAL_TASK_PATTERN, result_dir)
     66     if m_special_task:
     67         return m_special_task.group(1)
     68     m_ssp_job_pattern = re.match(ssp_job_pattern, result_dir)
     69     if m_ssp_job_pattern and utils.is_in_container():
     70         return m_ssp_job_pattern.group(1)
     71     return _get_swarming_run_id(result_dir)
     72 
     73 
     74 def _get_swarming_run_id(path):
     75     """Extract the Swarming run_id for a Skylab task from the result path."""
     76     # Legacy swarming results are in directories like
     77     #   .../results/swarming-3e4391423c3a4311
     78     # In particular, the ending digit is never 0
     79     m_legacy_path = re.match('.*/swarming-([0-9a-fA-F]*[1-9a-fA-F])$', path)
     80     if m_legacy_path:
     81         return m_legacy_path.group(1)
     82     # New style swarming results are in directories like
     83     #   .../results/swarming-3e4391423c3a4310/1
     84     # - Results are one directory deeper.
     85     # - Ending digit of first directory is always 0.
     86     m_path = re.match('.*/swarming-([0-9a-fA-F]*)0/([1-9a-fA-F])$', path)
     87     if m_path:
     88       return m_path.group(1) + m_path.group(2)
     89     return None
     90 
     91 
     92 class _JobDirectory(object):
     93   """State associated with a job to be offloaded.
     94 
     95   The full life-cycle of a job (including failure events that
     96   normally don't occur) looks like this:
     97    1. The job's results directory is discovered by
     98       `get_job_directories()`, and a job instance is created for it.
     99    2. Calls to `offload()` have no effect so long as the job
    100       isn't complete in the database and the job isn't expired
    101       according to the `age_limit` parameter.
    102    3. Eventually, the job is both finished and expired.  The next
    103       call to `offload()` makes the first attempt to offload the
    104       directory to GS.  Offload is attempted, but fails to complete
    105       (e.g. because of a GS problem).
    106    4. Finally, a call to `offload()` succeeds, and the directory no
    107       longer exists.  Now `is_offloaded()` is true, so the job
    108       instance is deleted, and future failures will not mention this
    109       directory any more.
    110 
    111   Only steps 1. and 4. are guaranteed to occur.  The others depend
    112   on the timing of calls to `offload()`, and on the reliability of
    113   the actual offload process.
    114 
    115   """
    116 
    117   __metaclass__ = abc.ABCMeta
    118 
    119   GLOB_PATTERN = None   # must be redefined in subclass
    120 
    121   def __init__(self, resultsdir):
    122     self.dirname = resultsdir
    123     self._id = get_job_id_or_task_id(resultsdir)
    124     self.offload_count = 0
    125     self.first_offload_start = 0
    126 
    127   @classmethod
    128   def get_job_directories(cls):
    129     """Return a list of directories of jobs that need offloading."""
    130     return [d for d in glob.glob(cls.GLOB_PATTERN) if os.path.isdir(d)]
    131 
    132   @abc.abstractmethod
    133   def get_timestamp_if_finished(self):
    134     """Return this job's timestamp from the database.
    135 
    136     If the database has not marked the job as finished, return
    137     `None`.  Otherwise, return a timestamp for the job.  The
    138     timestamp is to be used to determine expiration in
    139     `is_job_expired()`.
    140 
    141     @return Return `None` if the job is still running; otherwise
    142             return a string with a timestamp in the appropriate
    143             format.
    144     """
    145     raise NotImplementedError("_JobDirectory.get_timestamp_if_finished")
    146 
    147   def process_gs_instructions(self):
    148     """Process any gs_offloader instructions for this special task.
    149 
    150     @returns True/False if there is anything left to offload.
    151     """
    152     # Default support is to still offload the directory.
    153     return True
    154 
    155 
# Breadcrumb text written into a log directory whose contents were
# deliberately deleted instead of being offloaded to GS.
NO_OFFLOAD_README = """These results have been deleted rather than offloaded.
This is the expected behavior for passing jobs from the Commit Queue."""
    158 
    159 
    160 class RegularJobDirectory(_JobDirectory):
    161   """Subclass of _JobDirectory for regular test jobs."""
    162 
    163   GLOB_PATTERN = '[0-9]*-*'
    164 
    165   def process_gs_instructions(self):
    166     """Process any gs_offloader instructions for this job.
    167 
    168     @returns True/False if there is anything left to offload.
    169     """
    170     # Go through the gs_offloader instructions file for each test in this job.
    171     for path in glob.glob(os.path.join(self.dirname, '*',
    172                                        constants.GS_OFFLOADER_INSTRUCTIONS)):
    173       with open(path, 'r') as f:
    174         gs_off_instructions = json.load(f)
    175       if gs_off_instructions.get(constants.GS_OFFLOADER_NO_OFFLOAD):
    176         dirname = os.path.dirname(path)
    177         _remove_log_directory_contents(dirname)
    178 
    179     # Finally check if there's anything left to offload.
    180     if os.path.exists(self.dirname) and not os.listdir(self.dirname):
    181       shutil.rmtree(self.dirname)
    182       return False
    183     return True
    184 
    185   def get_timestamp_if_finished(self):
    186     """Get the timestamp to use for finished jobs.
    187 
    188     @returns the latest hqe finished_on time. If the finished_on times are null
    189              returns the job's created_on time.
    190     """
    191     entry = _cached_afe().get_jobs(id=self._id, finished=True)
    192     if not entry:
    193       return None
    194     hqes = _cached_afe().get_host_queue_entries(finished_on__isnull=False,
    195                                                 job_id=self._id)
    196     if not hqes:
    197       return entry[0].created_on
    198     # While most Jobs have 1 HQE, some can have multiple, so check them all.
    199     return max([hqe.finished_on for hqe in hqes])
    200 
    201 
    202 def _remove_log_directory_contents(dirpath):
    203     """Remove log directory contents.
    204 
    205     Leave a note explaining what has happened to the logs.
    206 
    207     @param dirpath: Path to log directory.
    208     """
    209     shutil.rmtree(dirpath)
    210     os.mkdir(dirpath)
    211     breadcrumb_name = os.path.join(dirpath, 'logs-removed-readme.txt')
    212     with open(breadcrumb_name, 'w') as f:
    213       f.write(NO_OFFLOAD_README)
    214 
    215 
    216 class SpecialJobDirectory(_JobDirectory):
    217   """Subclass of _JobDirectory for special (per-host) jobs."""
    218 
    219   GLOB_PATTERN = 'hosts/*/[0-9]*-*'
    220 
    221   def __init__(self, resultsdir):
    222     super(SpecialJobDirectory, self).__init__(resultsdir)
    223 
    224   def get_timestamp_if_finished(self):
    225     entry = _cached_afe().get_special_tasks(id=self._id, is_complete=True)
    226     return entry[0].time_finished if entry else None
    227 
    228 
# Name of the marker file a Skylab task writes into its results directory
# once results are ready; its contents are an integer epoch timestamp.
_OFFLOAD_MARKER = ".ready_for_offload"
# Counts marker files whose contents could not be parsed as an integer.
_marker_parse_error_metric = metrics.Counter(
    'chromeos/autotest/gs_offloader/offload_marker_parse_errors',
    description='Errors parsing the offload marker file')
    233 
    234 
    235 class SwarmingJobDirectory(_JobDirectory):
    236   """Subclass of _JobDirectory for Skylab swarming jobs."""
    237 
    238   @classmethod
    239   def get_job_directories(cls):
    240     """Return a list of directories of jobs that need offloading."""
    241     # Legacy swarming results are in directories like
    242     #   .../results/swarming-3e4391423c3a4311
    243     # In particular, the ending digit is never 0
    244     jobdirs = [d for d in glob.glob('swarming-[0-9a-f]*[1-9a-f]')
    245                  if os.path.isdir(d)]
    246     # New style swarming results are in directories like
    247     #   .../results/swarming-3e4391423c3a4310/1
    248     # - Results are one directory deeper.
    249     # - Ending digit of first directory is always 0.
    250     new_style_topdir = [d for d in glob.glob('swarming-[0-9a-f]*0')
    251                         if os.path.isdir(d)]
    252     for topdir in new_style_topdir:
    253       subdirs = [d for d in glob.glob('%s/[1-9a-f]*' % topdir)
    254                  if os.path.isdir(d)]
    255       jobdirs += subdirs
    256     return jobdirs
    257 
    258 
    259 
    260   def get_timestamp_if_finished(self):
    261     """Get the timestamp to use for finished jobs.
    262 
    263     @returns the latest hqe finished_on time. If the finished_on times are null
    264              returns the job's created_on time.
    265     """
    266     marker_path = os.path.join(self.dirname, _OFFLOAD_MARKER)
    267     try:
    268       with open(marker_path) as f:
    269         ts_string = f.read().strip()
    270     except (OSError, IOError) as e:
    271       return None
    272     try:
    273       ts = int(ts_string)
    274       return time_utils.epoch_time_to_date_string(ts)
    275     except ValueError as e:
    276       logging.debug('Error parsing %s for %s: %s',
    277                     _OFFLOAD_MARKER, self.dirname, e)
    278       _marker_parse_error_metric.increment()
    279       return None
    280 
    281 
# Module-wide cache of the RetryingAFE client; constructed lazily.
_AFE = None
def _cached_afe():
  """Return the shared RetryingAFE instance, creating it on first use."""
  global _AFE
  if _AFE is None:
    # Construct once; reuse for all subsequent offloader queries.
    _AFE = frontend_wrappers.RetryingAFE()
  return _AFE
    288