"""Utilities for tracking job and special task result directories that are
waiting to be offloaded to Google Storage (GS).
"""

import abc
import datetime
import glob
import json
import os
import re
import shutil
import time

import common
from autotest_lib.client.common_lib import time_utils
from autotest_lib.client.common_lib import utils
from autotest_lib.server.cros.dynamic_suite import constants
from autotest_lib.server.cros.dynamic_suite import frontend_wrappers


# AFE RPC client (with retries) used to look up whether jobs and special
# tasks have finished.
_AFE = frontend_wrappers.RetryingAFE()

SPECIAL_TASK_PATTERN = r'.*/hosts/[^/]+/(\d+)-[^/]+'
JOB_PATTERN = r'.*/(\d+)-[^/]+'
# Pattern of a job folder, e.g. 123-debug_user, where 123 is the job id and
# debug_user is the name of the user who started the job.
JOB_FOLDER_PATTERN = r'.*/(\d+-[^/]+)'

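# Illustrative examples of what these patterns capture (paths abbreviated
# from the docstrings further below):
#   JOB_PATTERN on '.../results/2032-chromeos-test' captures '2032'.
#   SPECIAL_TASK_PATTERN on '.../results/hosts/chromeos1-rack5-host6/1343-cleanup'
#       captures '1343'.
#   JOB_FOLDER_PATTERN on '.../results/2032-chromeos-test' captures
#       '2032-chromeos-test'.
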
def is_job_expired(age_limit, timestamp):
  """Check whether a job timestamp is older than an age limit.

  @param age_limit: Minimum age, measured in days.  If the value is
                    not positive, the job is always expired.
  @param timestamp: Timestamp of the job whose age we are checking.
                    The format must match time_utils.TIME_FMT.

  @returns True iff the job is old enough to be expired.
  """
  if age_limit <= 0:
    return True
  job_time = time_utils.time_string_to_datetime(timestamp)
  expiration = job_time + datetime.timedelta(days=age_limit)
  return datetime.datetime.now() >= expiration


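# Illustrative sketch of is_job_expired(); `some_timestamp` is a placeholder
# for a string formatted per time_utils.TIME_FMT:
#   is_job_expired(0, some_timestamp)    # -> True: a non-positive limit always expires.
#   is_job_expired(30, some_timestamp)   # -> True only once 30 days have elapsed.

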
def get_job_id_or_task_id(result_dir):
    """Extract the job id or special task id from result_dir.

    @param result_dir: path to the result dir.
            For a test job:
            /usr/local/autotest/results/2032-chromeos-test/chromeos1-rack5-host6
            The hostname at the end is optional.
            For a special task:
            /usr/local/autotest/results/hosts/chromeos1-rack5-host6/1343-cleanup

    @returns: Integer job id or task id, or None if neither can be parsed
              from result_dir.
    """
    if not result_dir:
        return
    result_dir = os.path.abspath(result_dir)
    # The result folder for a job running inside a container contains only
    # the job id.
    ssp_job_pattern = r'.*/(\d+)$'
    # Try to get the job id from the last number-text component.  This avoids
    # problems with paths like 123-results/456-debug_user, in which 456 is the
    # real job id.
    m_job = re.findall(JOB_PATTERN, result_dir)
    if m_job:
        return int(m_job[-1])
    m_special_task = re.match(SPECIAL_TASK_PATTERN, result_dir)
    if m_special_task:
        return int(m_special_task.group(1))
    m_ssp_job_pattern = re.match(ssp_job_pattern, result_dir)
    if m_ssp_job_pattern and utils.is_in_container():
        return int(m_ssp_job_pattern.group(1))


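# Illustrative examples based on the paths in the docstring above:
#   get_job_id_or_task_id(
#       '/usr/local/autotest/results/2032-chromeos-test/chromeos1-rack5-host6')
#   # -> 2032
#   get_job_id_or_task_id(
#       '/usr/local/autotest/results/hosts/chromeos1-rack5-host6/1343-cleanup')
#   # -> 1343

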
def get_job_folder_name(result_dir):
    """Extract the folder name of a job from result_dir.

    @param result_dir: path to the result dir.
            For a test job:
            /usr/local/autotest/results/2032-chromeos-test/chromeos1-rack5-host6
            The hostname at the end is optional.
            For a special task:
            /usr/local/autotest/results/hosts/chromeos1-rack5-host6/1343-cleanup

    @returns: The name of the job's folder, or None if nothing in result_dir
              matches JOB_FOLDER_PATTERN.
    """
    if not result_dir:
        return
    m_job = re.findall(JOB_FOLDER_PATTERN, result_dir)
    if m_job:
        return m_job[-1]


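# Illustrative examples based on the paths in the docstring above:
#   get_job_folder_name(
#       '/usr/local/autotest/results/2032-chromeos-test/chromeos1-rack5-host6')
#   # -> '2032-chromeos-test'
#   get_job_folder_name(
#       '/usr/local/autotest/results/hosts/chromeos1-rack5-host6/1343-cleanup')
#   # -> '1343-cleanup'

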
class _JobDirectory(object):
  """State associated with a job to be offloaded.

  The full life-cycle of a job (including failure events that
  normally don't occur) looks like this:
   1. The job's results directory is discovered by
      `get_job_directories()`, and a job instance is created for it.
   2. Calls to `offload()` have no effect so long as the job
      isn't complete in the database and the job isn't expired
      according to the `age_limit` parameter.
   3. Eventually, the job is both finished and expired.  The next
      call to `offload()` makes the first attempt to offload the
      directory to GS.  Offload is attempted, but fails to complete
      (e.g. because of a GS problem).
   4. Finally, a call to `offload()` succeeds, and the directory no
      longer exists.  Now `is_offloaded()` is true, so the job
      instance is deleted, and future failures will not mention this
      directory any more.

  Only steps 1. and 4. are guaranteed to occur.  The others depend
  on the timing of calls to `offload()`, and on the reliability of
  the actual offload process.

  """

  __metaclass__ = abc.ABCMeta

  GLOB_PATTERN = None   # must be redefined in subclass

  def __init__(self, resultsdir):
    self._dirname = resultsdir
    self._id = get_job_id_or_task_id(resultsdir)
    self._offload_count = 0
    self._first_offload_start = 0

  @classmethod
  def get_job_directories(cls):
    """Return a list of directories of jobs that need offloading."""
    return [d for d in glob.glob(cls.GLOB_PATTERN) if os.path.isdir(d)]

  @abc.abstractmethod
  def get_timestamp_if_finished(self):
    """Return this job's timestamp from the database.

    If the database has not marked the job as finished, return
    `None`.  Otherwise, return a timestamp for the job.  The
    timestamp is to be used to determine expiration in
    `is_job_expired()`.

    @return `None` if the job is still running; otherwise a string
            containing a timestamp in the appropriate format.
    """
    raise NotImplementedError("_JobDirectory.get_timestamp_if_finished")

  def enqueue_offload(self, queue, age_limit):
    """Enqueue the job for offload, if it's eligible.

    The job is eligible for offloading if the database has marked
    it finished, and the job is older than the `age_limit`
    parameter.

    If the job is eligible, offload processing is requested by
    passing the `queue` parameter's `put()` method a sequence
    containing the job's `_dirname` attribute, the parent directory
    of `_dirname`, and the finished timestamp.

    @param queue     If the job should be offloaded, put the offload
                     parameters into this queue for processing.
    @param age_limit Minimum age for a job to be offloaded.  A value
                     of 0 means that the job will be offloaded as
                     soon as it is finished.

    """
    timestamp = self.get_timestamp_if_finished()
    if not self._offload_count:
      if not timestamp:
        return
      if not is_job_expired(age_limit, timestamp):
        return
      self._first_offload_start = time.time()
    self._offload_count += 1
    if self.process_gs_instructions():
      queue.put([self._dirname, os.path.dirname(self._dirname), timestamp])

  def is_offloaded(self):
    """Return whether this job has been successfully offloaded."""
    return not os.path.exists(self._dirname)

  def get_failure_time(self):
    """Return the time of the first offload failure."""
    return self._first_offload_start

  def get_failure_count(self):
    """Return the number of times this job has failed to offload."""
    return self._offload_count

  def get_job_directory(self):
    """Return the name of this job's results directory."""
    return self._dirname

  def process_gs_instructions(self):
    """Process any gs_offloader instructions for this special task.

    @returns True if there is anything left to offload, False otherwise.
    """
    # Default behavior is to still offload the directory.
    return True


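# A minimal sketch of how a caller might drive these classes (hypothetical;
# `offload_queue`, `open_jobs` and the age limit are illustrative names, and
# the real offloader daemon lives elsewhere).  It uses the concrete
# RegularJobDirectory subclass defined below:
#
#   import Queue
#
#   offload_queue = Queue.Queue()
#   open_jobs = {}
#   # Run from the results directory, since GLOB_PATTERN is relative.
#   for d in RegularJobDirectory.get_job_directories():
#       open_jobs.setdefault(d, RegularJobDirectory(d))
#   for job in open_jobs.values():
#       # Eligible jobs put [results dir, parent dir, finished timestamp]
#       # onto the queue; a worker would then copy the directory to GS.
#       job.enqueue_offload(offload_queue, age_limit=7)
#   # Once the directory is gone from disk, the job can be forgotten.
#   open_jobs = dict((d, j) for d, j in open_jobs.items()
#                    if not j.is_offloaded())

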
NO_OFFLOAD_README = """These results have been deleted rather than offloaded.
This is the expected behavior for passing jobs from the Commit Queue."""


class RegularJobDirectory(_JobDirectory):
  """Subclass of _JobDirectory for regular test jobs."""

  GLOB_PATTERN = '[0-9]*-*'

  def process_gs_instructions(self):
    """Process any gs_offloader instructions for this job.

    @returns True if there is anything left to offload, False otherwise.
    """
    # Go through the gs_offloader instructions file for each test in this job.
    for path in glob.glob(os.path.join(self._dirname, '*',
                                       constants.GS_OFFLOADER_INSTRUCTIONS)):
      with open(path, 'r') as f:
        gs_off_instructions = json.load(f)
      if gs_off_instructions.get(constants.GS_OFFLOADER_NO_OFFLOAD):
        dirname = os.path.dirname(path)
        shutil.rmtree(dirname)
        os.mkdir(dirname)
        breadcrumb_name = os.path.join(dirname, 'logs-removed-readme.txt')
        with open(breadcrumb_name, 'w') as f:
          f.write(NO_OFFLOAD_README)

    # Finally, check whether there is anything left to offload.
    if not os.listdir(self._dirname):
      shutil.rmtree(self._dirname)
      return False
    return True


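  # The instructions file named by constants.GS_OFFLOADER_INSTRUCTIONS is
  # plain JSON.  A test that wants its logs deleted rather than offloaded is
  # expected to map the constants.GS_OFFLOADER_NO_OFFLOAD key to a true
  # value; illustratively (the actual key string is defined in
  # dynamic_suite.constants, not here):
  #
  #   {"<GS_OFFLOADER_NO_OFFLOAD>": true}
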
  def get_timestamp_if_finished(self):
    """Get the timestamp to use for finished jobs.

    @returns The latest HQE finished_on time.  If no HQE has a finished_on
             time, returns the job's created_on time instead.
    """
    entry = _AFE.get_jobs(id=self._id, finished=True)
    if not entry:
      return None
    hqes = _AFE.get_host_queue_entries(finished_on__isnull=False,
                                       job_id=self._id)
    if not hqes:
      return entry[0].created_on
    # While most jobs have one HQE, some can have multiple, so check them all.
    return max([hqe.finished_on for hqe in hqes])


class SpecialJobDirectory(_JobDirectory):
  """Subclass of _JobDirectory for special (per-host) jobs."""

  GLOB_PATTERN = 'hosts/*/[0-9]*-*'

  def __init__(self, resultsdir):
    super(SpecialJobDirectory, self).__init__(resultsdir)

  def get_timestamp_if_finished(self):
    """Return the task's time_finished if it is complete, else None."""
    entry = _AFE.get_special_tasks(id=self._id, is_complete=True)
    return entry[0].time_finished if entry else None
    265