Home | History | Annotate | Download | only in dynamic_suite
      1 # Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
      2 # Use of this source code is governed by a BSD-style license that can be
      3 # found in the LICENSE file.
      4 
      5 import datetime
      6 import logging
      7 import os
      8 import random
      9 import time
     10 
     11 
     12 from autotest_lib.client.common_lib import base_job, global_config, log
     13 from autotest_lib.client.common_lib import time_utils
     14 
     15 _DEFAULT_POLL_INTERVAL_SECONDS = 30.0
     16 
     17 HQE_MAXIMUM_ABORT_RATE_FLOAT = global_config.global_config.get_config_value(
     18             'SCHEDULER', 'hqe_maximum_abort_rate_float', type=float,
     19             default=0.5)
     20 
     21 
     22 def view_is_relevant(view):
     23     """
     24     Indicates whether the view of a given test is meaningful or not.
     25 
     26     @param view: a detailed test 'view' from the TKO DB to look at.
     27     @return True if this is a test result worth looking at further.
     28     """
     29     return not view['test_name'].startswith('CLIENT_JOB')
     30 
     31 
     32 def view_is_for_suite_job(view):
     33     """
     34     Indicates whether the given test view is the view of Suite job.
     35 
     36     @param view: a detailed test 'view' from the TKO DB to look at.
     37     @return True if this is view of suite job.
     38     """
     39     return view['test_name'] == 'SERVER_JOB'
     40 
     41 
     42 def view_is_for_infrastructure_fail(view):
     43     """
     44     Indicates whether the given test view is from an infra fail.
     45 
     46     @param view: a detailed test 'view' from the TKO DB to look at.
     47     @return True if this view indicates an infrastructure-side issue during
     48                  a test.
     49     """
     50     return view['test_name'].endswith('SERVER_JOB')
     51 
     52 
     53 def is_for_infrastructure_fail(status):
     54     """
     55     Indicates whether the given Status is from an infra fail.
     56 
     57     @param status: the Status object to look at.
     58     @return True if this Status indicates an infrastructure-side issue during
     59                  a test.
     60     """
     61     return view_is_for_infrastructure_fail({'test_name': status.test_name})
     62 
     63 
     64 def _abort_jobs_if_timedout(afe, jobs, start_time, timeout_mins):
     65     """
     66     Abort all of the jobs in jobs if the running time has past the timeout.
     67 
     68     @param afe: an instance of AFE as defined in server/frontend.py.
     69     @param jobs: an iterable of Running frontend.Jobs
     70     @param start_time: Time to compare to the current time to see if a timeout
     71                        has occurred.
     72     @param timeout_mins: Time in minutes to wait before aborting the jobs we
     73                          are waiting on.
     74 
     75     @returns True if we there was a timeout, False if not.
     76     """
     77     if datetime.datetime.utcnow() < (start_time +
     78                                      datetime.timedelta(minutes=timeout_mins)):
     79         return False
     80     for job in jobs:
     81         logging.debug('Job: %s has timed out after %s minutes. Aborting job.',
     82                       job.id, timeout_mins)
     83         afe.run('abort_host_queue_entries', job=job.id)
     84     return True
     85 
     86 
     87 def _collate_aborted(current_value, entry):
     88     """
     89     reduce() over a list of HostQueueEntries for a job; True if any aborted.
     90 
     91     Functor that can be reduced()ed over a list of
     92     HostQueueEntries for a job.  If any were aborted
     93     (|entry.aborted| exists and is True), then the reduce() will
     94     return True.
     95 
     96     Ex:
     97       entries = AFE.run('get_host_queue_entries', job=job.id)
     98       reduce(_collate_aborted, entries, False)
     99 
    100     @param current_value: the current accumulator (a boolean).
    101     @param entry: the current entry under consideration.
    102     @return the value of |entry.aborted| if it exists, False if not.
    103     """
    104     return current_value or ('aborted' in entry and entry['aborted'])
    105 
    106 
    107 def _status_for_test(status):
    108     """
    109     Indicates whether the status of a given test is meaningful or not.
    110 
    111     @param status: frontend.TestStatus object to look at.
    112     @return True if this is a test result worth looking at further.
    113     """
    114     return not (status.test_name.startswith('SERVER_JOB') or
    115                 status.test_name.startswith('CLIENT_JOB'))
    116 
    117 
    118 class _JobResultWaiter(object):
    119     """Class for waiting on job results."""
    120 
    121     def __init__(self, afe, tko):
    122         """Instantiate class
    123 
    124         @param afe: an instance of AFE as defined in server/frontend.py.
    125         @param tko: an instance of TKO as defined in server/frontend.py.
    126         """
    127         self._afe = afe
    128         self._tko = tko
    129         self._job_ids = set()
    130 
    131     def add_job(self, job):
    132         """Add job to wait on.
    133 
    134         @param job: Job object to get results from, as defined in
    135                     server/frontend.py
    136         """
    137         self.add_jobs((job,))
    138 
    139     def add_jobs(self, jobs):
    140         """Add job to wait on.
    141 
    142         @param jobs: Iterable of Job object to get results from, as defined in
    143                      server/frontend.py
    144         """
    145         self._job_ids.update(job.id for job in jobs)
    146 
    147     def wait_for_results(self):
    148         """Wait for jobs to finish and return their results.
    149 
    150         The returned generator blocks until all jobs have finished,
    151         naturally.
    152 
    153         @yields an iterator of Statuses, one per test.
    154         """
    155         while self._job_ids:
    156             for job in self._get_finished_jobs():
    157                 for result in _yield_job_results(self._afe, self._tko, job):
    158                     yield result
    159                 self._job_ids.remove(job.id)
    160             self._sleep()
    161 
    162     def _get_finished_jobs(self):
    163         # This is an RPC call which serializes to JSON, so we can't pass
    164         # in sets.
    165         return self._afe.get_jobs(id__in=list(self._job_ids), finished=True)
    166 
    167     def _sleep(self):
    168         time.sleep(_DEFAULT_POLL_INTERVAL_SECONDS * (random.random() + 0.5))
    169 
    170 
    171 def _yield_job_results(afe, tko, job):
    172     """
    173     Yields the results of an individual job.
    174 
    175     Yields one Status object per test.
    176 
    177     @param afe: an instance of AFE as defined in server/frontend.py.
    178     @param tko: an instance of TKO as defined in server/frontend.py.
    179     @param job: Job object to get results from, as defined in
    180                 server/frontend.py
    181     @yields an iterator of Statuses, one per test.
    182     """
    183     entries = afe.run('get_host_queue_entries', job=job.id)
    184 
    185     # This query uses the job id to search through the tko_test_view_2
    186     # table, for results of a test with a similar job_tag. The job_tag
    187     # is used to store results, and takes the form job_id-owner/host.
    188     # Many times when a job aborts during a test, the job_tag actually
    189     # exists and the results directory contains valid logs. If the job
    190     # was aborted prematurely i.e before it had a chance to create the
    191     # job_tag, this query will return no results. When statuses is not
    192     # empty it will contain frontend.TestStatus' with fields populated
    193     # using the results of the db query.
    194     statuses = tko.get_job_test_statuses_from_db(job.id)
    195     if not statuses:
    196         yield Status('ABORT', job.name)
    197 
    198     # We only care about the SERVER and CLIENT job failures when there
    199     # are no test failures.
    200     contains_test_failure = any(_status_for_test(s) and s.status != 'GOOD'
    201                                 for s in statuses)
    202     for s in statuses:
    203         # TKO parser uniquelly identifies a test run by
    204         # (test_name, subdir). In dynamic suite, we need to emit
    205         # a subdir for each status and make sure (test_name, subdir)
    206         # in the suite job's status log is unique.
    207         # For non-test status (i.e.SERVER_JOB, CLIENT_JOB),
    208         # we use 'job_tag' from tko_test_view_2, which looks like
    209         # '1246-owner/172.22.33.44'
    210         # For normal test status, we use 'job_tag/subdir'
    211         # which looks like '1246-owner/172.22.33.44/my_DummyTest.tag.subdir_tag'
    212         if _status_for_test(s):
    213             yield Status(s.status, s.test_name, s.reason,
    214                          s.test_started_time, s.test_finished_time,
    215                          job.id, job.owner, s.hostname, job.name,
    216                          subdir=os.path.join(s.job_tag, s.subdir))
    217         else:
    218             if s.status != 'GOOD' and not contains_test_failure:
    219                 yield Status(s.status,
    220                              '%s_%s' % (entries[0]['job']['name'],
    221                                         s.test_name),
    222                              s.reason, s.test_started_time,
    223                              s.test_finished_time, job.id,
    224                              job.owner, s.hostname, job.name,
    225                              subdir=s.job_tag)
    226 
    227 
    228 def wait_for_child_results(afe, tko, parent_job_id):
    229     """
    230     Wait for results of all tests in jobs with given parent id.
    231 
    232     New jobs could be added by calling send(new_jobs) on the generator.
    233     Currently polls for results every 5s.  Yields one Status object per test
    234     as results become available.
    235 
    236     @param afe: an instance of AFE as defined in server/frontend.py.
    237     @param tko: an instance of TKO as defined in server/frontend.py.
    238     @param parent_job_id: Parent job id for the jobs to wait on.
    239     @yields an iterator of Statuses, one per test.
    240     """
    241     waiter = _JobResultWaiter(afe, tko)
    242     waiter.add_jobs(afe.get_jobs(parent_job_id=parent_job_id))
    243     for result in waiter.wait_for_results():
    244         new_jobs = (yield result)
    245         if new_jobs:
    246             waiter.add_jobs(new_jobs)
    247             # Return nothing if 'send' is called
    248             yield None
    249 
    250 
    251 def wait_for_results(afe, tko, jobs):
    252     """
    253     Wait for results of all tests in all jobs in |jobs|.
    254 
    255     New jobs could be added by calling send(new_jobs) on the generator.
    256     Currently polls for results every 5s.  Yields one Status object per test
    257     as results become available.
    258 
    259     @param afe: an instance of AFE as defined in server/frontend.py.
    260     @param tko: an instance of TKO as defined in server/frontend.py.
    261     @param jobs: a list of Job objects, as defined in server/frontend.py.
    262     @yields an iterator of Statuses, one per test.
    263     """
    264     waiter = _JobResultWaiter(afe, tko)
    265     waiter.add_jobs(jobs)
    266     for result in waiter.wait_for_results():
    267         new_jobs = (yield result)
    268         if new_jobs:
    269             waiter.add_jobs(new_jobs)
    270             # Return nothing if 'send' is called
    271             yield None
    272 
    273 
    274 class Status(object):
    275     """
    276     A class representing a test result.
    277 
    278     Stores all pertinent info about a test result and, given a callable
    279     to use, can record start, result, and end info appropriately.
    280 
    281     @var _status: status code, e.g. 'INFO', 'FAIL', etc.
    282     @var _test_name: the name of the test whose result this is.
    283     @var _reason: message explaining failure, if any.
    284     @var _begin_timestamp: when test started (int, in seconds since the epoch).
    285     @var _end_timestamp: when test finished (int, in seconds since the epoch).
    286     @var _id: the ID of the job that generated this Status.
    287     @var _owner: the owner of the job that generated this Status.
    288 
    289     @var STATUS_MAP: a dict mapping host queue entry status strings to canonical
    290                      status codes; e.g. 'Aborted' -> 'ABORT'
    291     """
    292     _status = None
    293     _test_name = None
    294     _reason = None
    295     _begin_timestamp = None
    296     _end_timestamp = None
    297 
    298     # Queued status can occur if the try job just aborted due to not completing
    299     # reimaging for all machines. The Queued corresponds to an 'ABORT'.
    300     STATUS_MAP = {'Failed': 'FAIL', 'Aborted': 'ABORT', 'Completed': 'GOOD',
    301                   'Queued' : 'ABORT'}
    302 
    303     class sle(base_job.status_log_entry):
    304         """
    305         Thin wrapper around status_log_entry that supports stringification.
    306         """
    307         def __str__(self):
    308             return self.render()
    309 
    310         def __repr__(self):
    311             return self.render()
    312 
    313 
    314     def __init__(self, status, test_name, reason='', begin_time_str=None,
    315                  end_time_str=None, job_id=None, owner=None, hostname=None,
    316                  job_name='', subdir=None):
    317         """
    318         Constructor
    319 
    320         @param status: status code, e.g. 'INFO', 'FAIL', etc.
    321         @param test_name: the name of the test whose result this is.
    322         @param reason: message explaining failure, if any; Optional.
    323         @param begin_time_str: when test started (in time_utils.TIME_FMT);
    324                                now() if None or 'None'.
    325         @param end_time_str: when test finished (in time_utils.TIME_FMT);
    326                              now() if None or 'None'.
    327         @param job_id: the ID of the job that generated this Status.
    328         @param owner: the owner of the job that generated this Status.
    329         @param hostname: The name of the host the test that generated this
    330                          result ran on.
    331         @param job_name: The job name; Contains the test name with/without the
    332                          experimental prefix, the tag and the build.
    333         @param subdir: The result directory of the test. It will be recorded
    334                        as the subdir in the status.log file.
    335         """
    336         self._status = status
    337         self._test_name = test_name
    338         self._reason = reason
    339         self._id = job_id
    340         self._owner = owner
    341         self._hostname = hostname
    342         self._job_name = job_name
    343         self._subdir = subdir
    344         # Autoserv drops a keyval of the started time which eventually makes its
    345         # way here.  Therefore, if we have a starting time, we may assume that
    346         # the test reached Running and actually began execution on a drone.
    347         self._test_executed = begin_time_str and begin_time_str != 'None'
    348 
    349         if begin_time_str and begin_time_str != 'None':
    350             self._begin_timestamp = int(time.mktime(
    351                 datetime.datetime.strptime(
    352                     begin_time_str, time_utils.TIME_FMT).timetuple()))
    353         else:
    354             self._begin_timestamp = int(time.time())
    355 
    356         if end_time_str and end_time_str != 'None':
    357             self._end_timestamp = int(time.mktime(
    358                 datetime.datetime.strptime(
    359                     end_time_str, time_utils.TIME_FMT).timetuple()))
    360         else:
    361             self._end_timestamp = int(time.time())
    362 
    363 
    364     def is_good(self):
    365         """ Returns true if status is good. """
    366         return self._status == 'GOOD'
    367 
    368 
    369     def is_warn(self):
    370         """ Returns true if status is warn. """
    371         return self._status == 'WARN'
    372 
    373 
    374     def is_testna(self):
    375         """ Returns true if status is TEST_NA """
    376         return self._status == 'TEST_NA'
    377 
    378 
    379     def is_worse_than(self, candidate):
    380         """
    381         Return whether |self| represents a "worse" failure than |candidate|.
    382 
    383         "Worse" is defined the same as it is for log message purposes in
    384         common_lib/log.py.  We also consider status with a specific error
    385         message to represent a "worse" failure than one without.
    386 
    387         @param candidate: a Status instance to compare to this one.
    388         @return True if |self| is "worse" than |candidate|.
    389         """
    390         if self._status != candidate._status:
    391             return (log.job_statuses.index(self._status) <
    392                     log.job_statuses.index(candidate._status))
    393         # else, if the statuses are the same...
    394         if self._reason and not candidate._reason:
    395             return True
    396         return False
    397 
    398 
    399     def record_start(self, record_entry):
    400         """
    401         Use record_entry to log message about start of test.
    402 
    403         @param record_entry: a callable to use for logging.
    404                prototype:
    405                    record_entry(base_job.status_log_entry)
    406         """
    407         log_entry = Status.sle('START', self._subdir,
    408                                 self._test_name, '',
    409                                 None, self._begin_timestamp)
    410         record_entry(log_entry, log_in_subdir=False)
    411 
    412 
    413     def record_result(self, record_entry):
    414         """
    415         Use record_entry to log message about result of test.
    416 
    417         @param record_entry: a callable to use for logging.
    418                prototype:
    419                    record_entry(base_job.status_log_entry)
    420         """
    421         log_entry = Status.sle(self._status, self._subdir,
    422                                 self._test_name, self._reason, None,
    423                                 self._end_timestamp)
    424         record_entry(log_entry, log_in_subdir=False)
    425 
    426 
    427     def record_end(self, record_entry):
    428         """
    429         Use record_entry to log message about end of test.
    430 
    431         @param record_entry: a callable to use for logging.
    432                prototype:
    433                    record_entry(base_job.status_log_entry)
    434         """
    435         log_entry = Status.sle('END %s' % self._status, self._subdir,
    436                                self._test_name, '', None, self._end_timestamp)
    437         record_entry(log_entry, log_in_subdir=False)
    438 
    439 
    440     def record_all(self, record_entry):
    441         """
    442         Use record_entry to log all messages about test results.
    443 
    444         @param record_entry: a callable to use for logging.
    445                prototype:
    446                    record_entry(base_job.status_log_entry)
    447         """
    448         self.record_start(record_entry)
    449         self.record_result(record_entry)
    450         self.record_end(record_entry)
    451 
    452 
    453     def override_status(self, override):
    454         """
    455         Override the _status field of this Status.
    456 
    457         @param override: value with which to override _status.
    458         """
    459         self._status = override
    460 
    461 
    462     @property
    463     def test_name(self):
    464         """ Name of the test this status corresponds to. """
    465         return self._test_name
    466 
    467 
    468     @test_name.setter
    469     def test_name(self, value):
    470         """
    471         Test name setter.
    472 
    473         @param value: The test name.
    474         """
    475         self._test_name = value
    476 
    477 
    478     @property
    479     def id(self):
    480         """ Id of the job that corresponds to this status. """
    481         return self._id
    482 
    483 
    484     @property
    485     def owner(self):
    486         """ Owner of the job that corresponds to this status. """
    487         return self._owner
    488 
    489 
    490     @property
    491     def hostname(self):
    492         """ Host the job corresponding to this status ran on. """
    493         return self._hostname
    494 
    495 
    496     @property
    497     def reason(self):
    498         """ Reason the job corresponding to this status failed. """
    499         return self._reason
    500 
    501 
    502     @property
    503     def test_executed(self):
    504         """ If the test reached running an autoserv instance or not. """
    505         return self._test_executed
    506 
    507     @property
    508     def subdir(self):
    509         """Subdir of test this status corresponds to."""
    510         return self._subdir
    511