1 import abc 2 import datetime 3 import glob 4 import json 5 import logging 6 import os 7 import re 8 import shutil 9 10 import common 11 from autotest_lib.client.common_lib import time_utils 12 from autotest_lib.client.common_lib import utils 13 from autotest_lib.server.cros.dynamic_suite import constants 14 from autotest_lib.server.cros.dynamic_suite import frontend_wrappers 15 16 try: 17 from chromite.lib import metrics 18 except ImportError: 19 metrics = utils.metrics_mock 20 21 22 SPECIAL_TASK_PATTERN = '.*/hosts/[^/]+/(\d+)-[^/]+' 23 24 def is_job_expired(age_limit, timestamp): 25 """Check whether a job timestamp is older than an age limit. 26 27 @param age_limit: Minimum age, measured in days. If the value is 28 not positive, the job is always expired. 29 @param timestamp: Timestamp of the job whose age we are checking. 30 The format must match time_utils.TIME_FMT. 31 32 @returns True iff the job is old enough to be expired. 33 """ 34 if age_limit <= 0: 35 return True 36 job_time = time_utils.time_string_to_datetime(timestamp) 37 expiration = job_time + datetime.timedelta(days=age_limit) 38 return datetime.datetime.now() >= expiration 39 40 41 def get_job_id_or_task_id(result_dir): 42 """Extract job id or special task id from result_dir 43 44 @param result_dir: path to the result dir. 45 For test job: 46 /usr/local/autotest/results/2032-chromeos-test/chromeos1-rack5-host6 47 The hostname at the end is optional. 48 For special task: 49 /usr/local/autotest/results/hosts/chromeos1-rack5-host6/1343-cleanup 50 51 @returns: str representing the job id or task id. Returns None if fail 52 to parse job or task id from the result_dir. 53 """ 54 if not result_dir: 55 return 56 result_dir = os.path.abspath(result_dir) 57 # Result folder for job running inside container has only job id. 58 ssp_job_pattern = '.*/(\d+)$' 59 # Try to get the job ID from the last pattern of number-text. This avoids 60 # issue with path like 123-results/456-debug_user, in which 456 is the real 61 # job ID. 62 m_job = re.findall('.*/(\d+)-[^/]+', result_dir) 63 if m_job: 64 return m_job[-1] 65 m_special_task = re.match(SPECIAL_TASK_PATTERN, result_dir) 66 if m_special_task: 67 return m_special_task.group(1) 68 m_ssp_job_pattern = re.match(ssp_job_pattern, result_dir) 69 if m_ssp_job_pattern and utils.is_in_container(): 70 return m_ssp_job_pattern.group(1) 71 return _get_swarming_run_id(result_dir) 72 73 74 def _get_swarming_run_id(path): 75 """Extract the Swarming run_id for a Skylab task from the result path.""" 76 # Legacy swarming results are in directories like 77 # .../results/swarming-3e4391423c3a4311 78 # In particular, the ending digit is never 0 79 m_legacy_path = re.match('.*/swarming-([0-9a-fA-F]*[1-9a-fA-F])$', path) 80 if m_legacy_path: 81 return m_legacy_path.group(1) 82 # New style swarming results are in directories like 83 # .../results/swarming-3e4391423c3a4310/1 84 # - Results are one directory deeper. 85 # - Ending digit of first directory is always 0. 86 m_path = re.match('.*/swarming-([0-9a-fA-F]*)0/([1-9a-fA-F])$', path) 87 if m_path: 88 return m_path.group(1) + m_path.group(2) 89 return None 90 91 92 class _JobDirectory(object): 93 """State associated with a job to be offloaded. 94 95 The full life-cycle of a job (including failure events that 96 normally don't occur) looks like this: 97 1. The job's results directory is discovered by 98 `get_job_directories()`, and a job instance is created for it. 99 2. Calls to `offload()` have no effect so long as the job 100 isn't complete in the database and the job isn't expired 101 according to the `age_limit` parameter. 102 3. Eventually, the job is both finished and expired. The next 103 call to `offload()` makes the first attempt to offload the 104 directory to GS. Offload is attempted, but fails to complete 105 (e.g. because of a GS problem). 106 4. Finally, a call to `offload()` succeeds, and the directory no 107 longer exists. Now `is_offloaded()` is true, so the job 108 instance is deleted, and future failures will not mention this 109 directory any more. 110 111 Only steps 1. and 4. are guaranteed to occur. The others depend 112 on the timing of calls to `offload()`, and on the reliability of 113 the actual offload process. 114 115 """ 116 117 __metaclass__ = abc.ABCMeta 118 119 GLOB_PATTERN = None # must be redefined in subclass 120 121 def __init__(self, resultsdir): 122 self.dirname = resultsdir 123 self._id = get_job_id_or_task_id(resultsdir) 124 self.offload_count = 0 125 self.first_offload_start = 0 126 127 @classmethod 128 def get_job_directories(cls): 129 """Return a list of directories of jobs that need offloading.""" 130 return [d for d in glob.glob(cls.GLOB_PATTERN) if os.path.isdir(d)] 131 132 @abc.abstractmethod 133 def get_timestamp_if_finished(self): 134 """Return this job's timestamp from the database. 135 136 If the database has not marked the job as finished, return 137 `None`. Otherwise, return a timestamp for the job. The 138 timestamp is to be used to determine expiration in 139 `is_job_expired()`. 140 141 @return Return `None` if the job is still running; otherwise 142 return a string with a timestamp in the appropriate 143 format. 144 """ 145 raise NotImplementedError("_JobDirectory.get_timestamp_if_finished") 146 147 def process_gs_instructions(self): 148 """Process any gs_offloader instructions for this special task. 149 150 @returns True/False if there is anything left to offload. 151 """ 152 # Default support is to still offload the directory. 153 return True 154 155 156 NO_OFFLOAD_README = """These results have been deleted rather than offloaded. 157 This is the expected behavior for passing jobs from the Commit Queue.""" 158 159 160 class RegularJobDirectory(_JobDirectory): 161 """Subclass of _JobDirectory for regular test jobs.""" 162 163 GLOB_PATTERN = '[0-9]*-*' 164 165 def process_gs_instructions(self): 166 """Process any gs_offloader instructions for this job. 167 168 @returns True/False if there is anything left to offload. 169 """ 170 # Go through the gs_offloader instructions file for each test in this job. 171 for path in glob.glob(os.path.join(self.dirname, '*', 172 constants.GS_OFFLOADER_INSTRUCTIONS)): 173 with open(path, 'r') as f: 174 gs_off_instructions = json.load(f) 175 if gs_off_instructions.get(constants.GS_OFFLOADER_NO_OFFLOAD): 176 dirname = os.path.dirname(path) 177 _remove_log_directory_contents(dirname) 178 179 # Finally check if there's anything left to offload. 180 if os.path.exists(self.dirname) and not os.listdir(self.dirname): 181 shutil.rmtree(self.dirname) 182 return False 183 return True 184 185 def get_timestamp_if_finished(self): 186 """Get the timestamp to use for finished jobs. 187 188 @returns the latest hqe finished_on time. If the finished_on times are null 189 returns the job's created_on time. 190 """ 191 entry = _cached_afe().get_jobs(id=self._id, finished=True) 192 if not entry: 193 return None 194 hqes = _cached_afe().get_host_queue_entries(finished_on__isnull=False, 195 job_id=self._id) 196 if not hqes: 197 return entry[0].created_on 198 # While most Jobs have 1 HQE, some can have multiple, so check them all. 199 return max([hqe.finished_on for hqe in hqes]) 200 201 202 def _remove_log_directory_contents(dirpath): 203 """Remove log directory contents. 204 205 Leave a note explaining what has happened to the logs. 206 207 @param dirpath: Path to log directory. 208 """ 209 shutil.rmtree(dirpath) 210 os.mkdir(dirpath) 211 breadcrumb_name = os.path.join(dirpath, 'logs-removed-readme.txt') 212 with open(breadcrumb_name, 'w') as f: 213 f.write(NO_OFFLOAD_README) 214 215 216 class SpecialJobDirectory(_JobDirectory): 217 """Subclass of _JobDirectory for special (per-host) jobs.""" 218 219 GLOB_PATTERN = 'hosts/*/[0-9]*-*' 220 221 def __init__(self, resultsdir): 222 super(SpecialJobDirectory, self).__init__(resultsdir) 223 224 def get_timestamp_if_finished(self): 225 entry = _cached_afe().get_special_tasks(id=self._id, is_complete=True) 226 return entry[0].time_finished if entry else None 227 228 229 _OFFLOAD_MARKER = ".ready_for_offload" 230 _marker_parse_error_metric = metrics.Counter( 231 'chromeos/autotest/gs_offloader/offload_marker_parse_errors', 232 description='Errors parsing the offload marker file') 233 234 235 class SwarmingJobDirectory(_JobDirectory): 236 """Subclass of _JobDirectory for Skylab swarming jobs.""" 237 238 @classmethod 239 def get_job_directories(cls): 240 """Return a list of directories of jobs that need offloading.""" 241 # Legacy swarming results are in directories like 242 # .../results/swarming-3e4391423c3a4311 243 # In particular, the ending digit is never 0 244 jobdirs = [d for d in glob.glob('swarming-[0-9a-f]*[1-9a-f]') 245 if os.path.isdir(d)] 246 # New style swarming results are in directories like 247 # .../results/swarming-3e4391423c3a4310/1 248 # - Results are one directory deeper. 249 # - Ending digit of first directory is always 0. 250 new_style_topdir = [d for d in glob.glob('swarming-[0-9a-f]*0') 251 if os.path.isdir(d)] 252 for topdir in new_style_topdir: 253 subdirs = [d for d in glob.glob('%s/[1-9a-f]*' % topdir) 254 if os.path.isdir(d)] 255 jobdirs += subdirs 256 return jobdirs 257 258 259 260 def get_timestamp_if_finished(self): 261 """Get the timestamp to use for finished jobs. 262 263 @returns the latest hqe finished_on time. If the finished_on times are null 264 returns the job's created_on time. 265 """ 266 marker_path = os.path.join(self.dirname, _OFFLOAD_MARKER) 267 try: 268 with open(marker_path) as f: 269 ts_string = f.read().strip() 270 except (OSError, IOError) as e: 271 return None 272 try: 273 ts = int(ts_string) 274 return time_utils.epoch_time_to_date_string(ts) 275 except ValueError as e: 276 logging.debug('Error parsing %s for %s: %s', 277 _OFFLOAD_MARKER, self.dirname, e) 278 _marker_parse_error_metric.increment() 279 return None 280 281 282 _AFE = None 283 def _cached_afe(): 284 global _AFE 285 if _AFE is None: 286 _AFE = frontend_wrappers.RetryingAFE() 287 return _AFE 288