# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

import datetime
import os
import random
import time


from autotest_lib.client.common_lib import base_job, global_config, log
from autotest_lib.client.common_lib import time_utils

# Base polling period used by JobResultWaiter; each actual sleep is
# randomized to between 0.5x and 1.5x of this value.
_DEFAULT_POLL_INTERVAL_SECONDS = 30.0

HQE_MAXIMUM_ABORT_RATE_FLOAT = global_config.global_config.get_config_value(
        'SCHEDULER', 'hqe_maximum_abort_rate_float', type=float,
        default=0.5)


def view_is_relevant(view):
    """
    Indicates whether the view of a given test is meaningful or not.

    @param view: a detailed test 'view' from the TKO DB to look at.
    @return True if this is a test result worth looking at further.
    """
    return not view['test_name'].startswith('CLIENT_JOB')


def view_is_for_suite_job(view):
    """
    Indicates whether the given test view is the view of Suite job.

    @param view: a detailed test 'view' from the TKO DB to look at.
    @return True if this is view of suite job.
    """
    return view['test_name'] == 'SERVER_JOB'


def view_is_for_infrastructure_fail(view):
    """
    Indicates whether the given test view is from an infra fail.

    @param view: a detailed test 'view' from the TKO DB to look at.
    @return True if this view indicates an infrastructure-side issue during
                 a test.
    """
    # endswith() rather than ==: _yield_job_results() reports non-test
    # statuses under the name '<job name>_SERVER_JOB'.
    return view['test_name'].endswith('SERVER_JOB')


def is_for_infrastructure_fail(status):
    """
    Indicates whether the given Status is from an infra fail.

    @param status: the Status object to look at.
    @return True if this Status indicates an infrastructure-side issue during
                 a test.
    """
    return view_is_for_infrastructure_fail({'test_name': status.test_name})


def _collate_aborted(current_value, entry):
    """
    reduce() over a list of HostQueueEntries for a job; True if any aborted.

    Functor that can be reduce()ed over a list of
    HostQueueEntries for a job.  If any were aborted
    (|entry.aborted| exists and is True), then the reduce() will
    return True.

    Ex:
      entries = AFE.run('get_host_queue_entries', job=job.id)
      reduce(_collate_aborted, entries, False)

    @param current_value: the current accumulator (a boolean).
    @param entry: the current entry under consideration.
    @return the value of |entry.aborted| if it exists, False if not.
    """
    return current_value or ('aborted' in entry and entry['aborted'])


def _status_for_test(status):
    """
    Indicates whether the status of a given test is meaningful or not.

    @param status: frontend.TestStatus object to look at.
    @return True if this is a test result worth looking at further.
    """
    return not status.test_name.startswith(('SERVER_JOB', 'CLIENT_JOB'))


class JobResultWaiter(object):
    """Class for waiting on job results."""

    def __init__(self, afe, tko):
        """Instantiate class

        @param afe: an instance of AFE as defined in server/frontend.py.
        @param tko: an instance of TKO as defined in server/frontend.py.
        """
        self._afe = afe
        self._tko = tko
        self._job_ids = set()

    def add_job(self, job):
        """Add job to wait on.

        @param job: Job object to get results from, as defined in
                    server/frontend.py
        """
        self.add_jobs((job,))

    def add_jobs(self, jobs):
        """Add jobs to wait on.

        @param jobs: Iterable of Job object to get results from, as defined in
                     server/frontend.py
        """
        self._job_ids.update(job.id for job in jobs)

    def wait_for_results(self):
        """Wait for jobs to finish and return their results.

        The returned generator blocks until all jobs have finished,
        naturally.

        @yields an iterator of Statuses, one per test.
        """
        while self._job_ids:
            for job in self._get_finished_jobs():
                for result in _yield_job_results(self._afe, self._tko, job):
                    yield result
                self._job_ids.remove(job.id)
            # Only pause when there is still something to wait for; once the
            # last job has been drained, return immediately instead of
            # sleeping through one more poll interval.
            if self._job_ids:
                self._sleep()

    def _get_finished_jobs(self):
        """Query the AFE for those tracked jobs that have finished.

        @return the result of afe.get_jobs() restricted to the tracked job
                ids with finished=True.
        """
        # This is an RPC call which serializes to JSON, so we can't pass
        # in sets.
        return self._afe.get_jobs(id__in=list(self._job_ids), finished=True)

    def _sleep(self):
        """Sleep for a randomized interval around the default poll period."""
        time.sleep(_DEFAULT_POLL_INTERVAL_SECONDS * (random.random() + 0.5))


def _yield_job_results(afe, tko, job):
    """
    Yields the results of an individual job.

    Yields one Status object per test.

    @param afe: an instance of AFE as defined in server/frontend.py.
    @param tko: an instance of TKO as defined in server/frontend.py.
    @param job: Job object to get results from, as defined in
                server/frontend.py
    @yields an iterator of Statuses, one per test.
    """
    entries = afe.run('get_host_queue_entries', job=job.id)

    # This query uses the job id to search through the tko_test_view_2
    # table, for results of a test with a similar job_tag. The job_tag
    # is used to store results, and takes the form job_id-owner/host.
    # Many times when a job aborts during a test, the job_tag actually
    # exists and the results directory contains valid logs. If the job
    # was aborted prematurely i.e before it had a chance to create the
    # job_tag, this query will return no results. When statuses is not
    # empty it will contain frontend.TestStatus' with fields populated
    # using the results of the db query.
    statuses = tko.get_job_test_statuses_from_db(job.id)
    if not statuses:
        yield Status('ABORT', job.name)
        # Nothing else to report; returning here also avoids iterating over
        # a None result below.
        return

    # We only care about the SERVER and CLIENT job failures when there
    # are no test failures.
    contains_test_failure = any(_status_for_test(s) and s.status != 'GOOD'
                                for s in statuses)
    for s in statuses:
        # TKO parser uniquely identifies a test run by
        # (test_name, subdir). In dynamic suite, we need to emit
        # a subdir for each status and make sure (test_name, subdir)
        # in the suite job's status log is unique.
        # For non-test status (i.e.SERVER_JOB, CLIENT_JOB),
        # we use 'job_tag' from tko_test_view_2, which looks like
        # '1246-owner/172.22.33.44'
        # For normal test status, we use 'job_tag/subdir'
        # which looks like '1246-owner/172.22.33.44/my_DummyTest.tag.subdir_tag'
        if _status_for_test(s):
            yield Status(s.status, s.test_name, s.reason,
                         s.test_started_time, s.test_finished_time,
                         job.id, job.owner, s.hostname, job.name,
                         subdir=os.path.join(s.job_tag, s.subdir))
        elif s.status != 'GOOD' and not contains_test_failure:
            # Prefer the job name recorded on the host queue entry; fall
            # back to the Job object's own name if the AFE returned no
            # entries, rather than failing with an IndexError.
            if entries:
                reported_job_name = entries[0]['job']['name']
            else:
                reported_job_name = job.name
            yield Status(s.status,
                         '%s_%s' % (reported_job_name, s.test_name),
                         s.reason, s.test_started_time,
                         s.test_finished_time, job.id,
                         job.owner, s.hostname, job.name,
                         subdir=s.job_tag)


class Status(object):
    """
    A class representing a test result.

    Stores all pertinent info about a test result and, given a callable
    to use, can record start, result, and end info appropriately.

    @var _status: status code, e.g. 'INFO', 'FAIL', etc.
    @var _test_name: the name of the test whose result this is.
    @var _reason: message explaining failure, if any.
    @var _begin_timestamp: when test started (int, in seconds since the epoch).
    @var _end_timestamp: when test finished (int, in seconds since the epoch).
    @var _id: the ID of the job that generated this Status.
    @var _owner: the owner of the job that generated this Status.
    @var _hostname: the name of the host the test ran on.
    @var _job_name: the name of the job that generated this Status.
    @var _subdir: the result directory recorded for this Status.
    @var _test_executed: True if a start time was supplied for the test.

    @var STATUS_MAP: a dict mapping host queue entry status strings to canonical
                     status codes; e.g. 'Aborted' -> 'ABORT'
    """
    _status = None
    _test_name = None
    _reason = None
    _begin_timestamp = None
    _end_timestamp = None

    # Queued status can occur if the try job just aborted due to not completing
    # reimaging for all machines. The Queued corresponds to an 'ABORT'.
    STATUS_MAP = {'Failed': 'FAIL', 'Aborted': 'ABORT', 'Completed': 'GOOD',
                  'Queued' : 'ABORT'}

    class sle(base_job.status_log_entry):
        """
        Thin wrapper around status_log_entry that supports stringification.
        """
        def __str__(self):
            return self.render()

        def __repr__(self):
            return self.render()


    def __init__(self, status, test_name, reason='', begin_time_str=None,
                 end_time_str=None, job_id=None, owner=None, hostname=None,
                 job_name='', subdir=None):
        """
        Constructor

        @param status: status code, e.g. 'INFO', 'FAIL', etc.
        @param test_name: the name of the test whose result this is.
        @param reason: message explaining failure, if any; Optional.
        @param begin_time_str: when test started (in time_utils.TIME_FMT);
                               now() if None or 'None'.
        @param end_time_str: when test finished (in time_utils.TIME_FMT);
                             now() if None or 'None'.
        @param job_id: the ID of the job that generated this Status.
        @param owner: the owner of the job that generated this Status.
        @param hostname: The name of the host the test that generated this
                         result ran on.
        @param job_name: The job name; Contains the test name with/without the
                         experimental prefix, the tag and the build.
        @param subdir: The result directory of the test. It will be recorded
                       as the subdir in the status.log file.
        """
        self._status = status
        self._test_name = test_name
        self._reason = reason
        self._id = job_id
        self._owner = owner
        self._hostname = hostname
        self._job_name = job_name
        self._subdir = subdir
        # Autoserv drops a keyval of the started time which eventually makes its
        # way here.  Therefore, if we have a starting time, we may assume that
        # the test reached Running and actually began execution on a drone.
        # Coerced to bool so the test_executed property never hands back
        # None or an empty string.
        self._test_executed = bool(begin_time_str and
                                   begin_time_str != 'None')

        self._begin_timestamp = self._to_timestamp(begin_time_str)
        self._end_timestamp = self._to_timestamp(end_time_str)


    @staticmethod
    def _to_timestamp(time_str):
        """
        Convert a time string into integer seconds since the epoch.

        @param time_str: a time in time_utils.TIME_FMT; None, '' or 'None'
                         mean that no time was recorded.
        @return the parsed time as an int timestamp, or the current time if
                |time_str| carries no time.
        """
        if not time_str or time_str == 'None':
            return int(time.time())
        parsed = datetime.datetime.strptime(time_str, time_utils.TIME_FMT)
        return int(time.mktime(parsed.timetuple()))


    def is_good(self):
        """ Returns true if status is good. """
        return self._status == 'GOOD'


    def is_warn(self):
        """ Returns true if status is warn. """
        return self._status == 'WARN'


    def is_testna(self):
        """ Returns true if status is TEST_NA """
        return self._status == 'TEST_NA'


    def is_worse_than(self, candidate):
        """
        Return whether |self| represents a "worse" failure than |candidate|.

        "Worse" is defined the same as it is for log message purposes in
        common_lib/log.py.  We also consider status with a specific error
        message to represent a "worse" failure than one without.

        @param candidate: a Status instance to compare to this one.
        @return True if |self| is "worse" than |candidate|.
        """
        if self._status != candidate._status:
            # A status that appears earlier in log.job_statuses is treated
            # as the worse one.
            return (log.job_statuses.index(self._status) <
                    log.job_statuses.index(candidate._status))
        # else, if the statuses are the same...
        if self._reason and not candidate._reason:
            return True
        return False


    def record_start(self, record_entry):
        """
        Use record_entry to log message about start of test.

        @param record_entry: a callable to use for logging.
               prototype:
                   record_entry(base_job.status_log_entry)
        """
        log_entry = Status.sle('START', self._subdir,
                               self._test_name, '',
                               None, self._begin_timestamp)
        record_entry(log_entry, log_in_subdir=False)


    def record_result(self, record_entry):
        """
        Use record_entry to log message about result of test.

        @param record_entry: a callable to use for logging.
               prototype:
                   record_entry(base_job.status_log_entry)
        """
        log_entry = Status.sle(self._status, self._subdir,
                               self._test_name, self._reason, None,
                               self._end_timestamp)
        record_entry(log_entry, log_in_subdir=False)


    def record_end(self, record_entry):
        """
        Use record_entry to log message about end of test.

        @param record_entry: a callable to use for logging.
               prototype:
                   record_entry(base_job.status_log_entry)
        """
        log_entry = Status.sle('END %s' % self._status, self._subdir,
                               self._test_name, '', None, self._end_timestamp)
        record_entry(log_entry, log_in_subdir=False)


    def record_all(self, record_entry):
        """
        Use record_entry to log all messages about test results.

        @param record_entry: a callable to use for logging.
               prototype:
                   record_entry(base_job.status_log_entry)
        """
        self.record_start(record_entry)
        self.record_result(record_entry)
        self.record_end(record_entry)


    def override_status(self, override):
        """
        Override the _status field of this Status.

        @param override: value with which to override _status.
        """
        self._status = override


    @property
    def test_name(self):
        """ Name of the test this status corresponds to. """
        return self._test_name


    @test_name.setter
    def test_name(self, value):
        """
        Test name setter.

        @param value: The test name.
        """
        self._test_name = value


    @property
    def id(self):
        """ Id of the job that corresponds to this status. """
        return self._id


    @property
    def owner(self):
        """ Owner of the job that corresponds to this status. """
        return self._owner


    @property
    def hostname(self):
        """ Host the job corresponding to this status ran on. """
        return self._hostname


    @property
    def reason(self):
        """ Reason the job corresponding to this status failed. """
        return self._reason


    @property
    def test_executed(self):
        """ If the test reached running an autoserv instance or not. """
        return self._test_executed

    @property
    def subdir(self):
        """Subdir of test this status corresponds to."""
        return self._subdir