#!/usr/bin/python
#
# Copyright (c) 2014 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.


"""Script to calculate timing stats for suites.

This script measures nine stats for a suite run.
1. Net suite runtime.
2. Suite scheduling overhead.
3. Average scheduling overhead.
4. Average Queuing time.
5. Average Resetting time.
6. Average Provisioning time.
7. Average Running time.
8. Average Parsing time.
9. Average Gathering time.

When cron_mode is enabled, this script sends all stats except the first
one (net suite runtime) to Graphite, because the first one is already
being sent to Graphite by Autotest online.

Net suite runtime is the end-to-end time for a suite, from the beginning
to the end.
It is stored in the "duration" field of the "suite_runtime" type in
elasticsearch (ES).

Suite scheduling overhead is defined as the average of the DUT overheads.
A suite is composed of one or more jobs, and those jobs are run on
one or more of the available DUTs.
A DUT overhead is defined by:
    DUT_i overhead = sum(net time for job_k - runtime for job_k
                         - runtime for special tasks of job_k)
    Job_k are the jobs run on DUT_i.
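
    For example (illustrative numbers only), if job_1 ran on DUT_1 with a
    net time of 100s, a job runtime of 80s, and 15s of special tasks,
    job_1 contributes 100 - 80 - 15 = 5 seconds to DUT_1's overhead.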

Net time for a job is the time from job_queued_time to hqe_finished_time.
job_queued_time is stored in the "queued_time" column of the "tko_jobs"
table.
hqe_finished_time is stored in the "finished_on" column of the
"afe_host_queue_entries" table.
We do not use "job_finished_time" of "tko_jobs" because job_finished_time
is recorded before gathering/parsing/archiving.
We do not use the hqe started time ("started_on" of
"afe_host_queue_entries"), as it does not account for the lag between when
a host is assigned to the job and when the scheduler sees the assignment.

Runtime for job_k is the sum of durations for the records of
"job_time_breakdown" type in ES that have "Queued" or "Running" status.
It is possible that a job has multiple "Queued" records when the job's test
failed and was retried.
We take into account only the last "Queued" record.

Runtime for special tasks of job_k is the sum of durations for the records
of "job_time_breakdown" type in ES that have "Resetting", "Provisioning",
"Gathering", or "Parsing" status.
We take into account only the records whose timestamp is larger than
the timestamp of the last "Queued" record.
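
For example, if a job's test failed and was retried, the job may have
"Queued" records at t=100 and t=500 (illustrative timestamps). Only the
"Queued" record at t=500 and the special-task records recorded after t=500
are counted.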
"""

import argparse
from datetime import datetime
from datetime import timedelta

import common
from autotest_lib.client.common_lib import host_queue_entry_states
from autotest_lib.client.common_lib import time_utils
from autotest_lib.client.common_lib.cros.graphite import autotest_es
from autotest_lib.client.common_lib.cros.graphite import autotest_stats
from autotest_lib.frontend import setup_django_environment
from autotest_lib.frontend.afe import models
from autotest_lib.frontend.tko import models as tko_models
from autotest_lib.server import utils
from autotest_lib.site_utils import job_overhead


_options = None

_hqes = host_queue_entry_states.Status
_states = [
        _hqes.QUEUED, _hqes.RESETTING, _hqes.PROVISIONING,
        _hqes.RUNNING, _hqes.GATHERING, _hqes.PARSING]


def mean(l):
    """
    Calculates the arithmetic mean of the numbers in a list.

    @param l: A list of numbers.
    @return: Arithmetic mean if the list is not empty.
             Otherwise, returns zero.
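
    Example:
        mean([1.0, 2.0, 3.0])  # -> 2.0
        mean([])               # -> 0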
    """
    return float(sum(l)) / len(l) if l else 0


def print_verbose(string, *args):
    """Prints a formatted message only when the --verbose option is set."""
    if _options.verbose:
        print(string % args)


def get_nontask_runtime(job_id, dut, job_info_dict):
    """
    Get sum of durations for "Queued", "Running", "Parsing", and "Gathering"
    status records.
    job_info_dict will be modified in this function to store the duration
    for each status.

    @param job_id: The job id of interest.
    @param dut: Hostname of a DUT that the job ran on.
    @param job_info_dict: Dictionary that has information for jobs.
    @return: Tuple of sum of durations and the timestamp for the last
             Queued record.
    """
    results = autotest_es.query(
            fields_returned=['status', 'duration', 'time_recorded'],
            equality_constraints=[('_type', 'job_time_breakdown'),
                                  ('job_id', job_id),
                                  ('hostname', dut)],
            sort_specs=[{'time_recorded': 'desc'}])

    total = 0
    last_queued_timestamp = 0
    # There could be multiple "Queued" records.
    # Get the sum of durations for the records recorded after the last
    # "Queued" record (including the last "Queued" record itself).
    # This exploits the fact that "results" are ordered in descending order
    # of time_recorded.
    for hit in results.hits:
        job_info_dict[job_id][hit['status']] = float(hit['duration'])
        if hit['status'] == 'Queued':
            # The first Queued record seen is the last one chronologically
            # because "results" are in descending order.
            last_queued_timestamp = float(hit['time_recorded'])
            total += float(hit['duration'])
            break
        else:
            total += float(hit['duration'])
    return (total, last_queued_timestamp)


def get_tasks_runtime(task_list, dut, t_start, job_id, job_info_dict):
    """
    Get sum of durations for special tasks.
    job_info_dict will be modified in this function to store the duration
    for each special task.

    @param task_list: List of task ids.
    @param dut: Hostname of a DUT that the tasks ran on.
    @param t_start: Beginning timestamp.
    @param job_id: The job id that is related to the tasks.
                   This is used only for debugging purposes.
    @param job_info_dict: Dictionary that has information for jobs.
    @return: Sum of durations of the tasks.
    """
    t_start_epoch = time_utils.to_epoch_time(t_start)
    results = autotest_es.query(
            fields_returned=['status', 'task_id', 'duration'],
            equality_constraints=[('_type', 'job_time_breakdown'),
                                  ('hostname', dut)],
            range_constraints=[('time_recorded', t_start_epoch, None)],
            batch_constraints=[('task_id', task_list)])
    total = 0
    for hit in results.hits:
        total += float(hit['duration'])
        job_info_dict[job_id][hit['status']] = float(hit['duration'])
        print_verbose('Task %s for Job %s took %s',
                      hit['task_id'], job_id, hit['duration'])
    return total


def get_job_runtime(job_id, dut, job_info_dict):
    """
    Get sum of durations for the entries that are related to a job.
    job_info_dict will be modified in this function.

    @param job_id: The job id of interest.
    @param dut: Hostname of a DUT that the job ran on.
    @param job_info_dict: Dictionary that has information for jobs.
    @return: Total duration taken by a job.
    """
    total, t_last_queued = get_nontask_runtime(job_id, dut, job_info_dict)
    print_verbose('Job %s took %f, last Queued: %s',
                  job_id, total, t_last_queued)
    total += get_tasks_runtime(
            list(job_info_dict[job_id]['tasks']), dut, t_last_queued,
            job_id, job_info_dict)
    return total


def get_dut_overhead(dut, jobs, job_info_dict):
    """
    Calculates the scheduling overhead of a DUT.

    The scheduling overhead of a DUT is defined as the sum of the scheduling
    overheads of the jobs that ran on the DUT.
    The scheduling overhead of a job is defined as the difference between
    the job's net runtime and its real runtime.
    job_info_dict will be modified in this function.

    @param dut: Hostname of a DUT.
    @param jobs: The list of jobs that ran on the DUT.
    @param job_info_dict: Dictionary that has information for jobs.
    @return: Scheduling overhead of a DUT as a floating point value,
             in seconds.
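
    For example (illustrative numbers), a job with a net runtime of 600
    seconds whose measured runtime adds up to 550 seconds contributes
    50 seconds of overhead to its DUT.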
    """
    overheads = []
    for job_id in jobs:
        (t_start, t_end) = job_info_dict[job_id]['timestamps']
        runtime = get_job_runtime(job_id, dut, job_info_dict)
        overheads.append(t_end - t_start - runtime)
        print_verbose('Job: %s, Net runtime: %f, Real runtime: %f, '
                      'Overhead: %f', job_id, t_end - t_start, runtime,
                      t_end - t_start - runtime)
    return sum(overheads)


def get_child_jobs_info(suite_job_id, num_child_jobs, sanity_check):
    """
    Gets information about the child jobs of a suite.

    @param suite_job_id: Job id of a suite.
    @param num_child_jobs: Number of child jobs of the suite.
    @param sanity_check: Do a sanity check if True.
    @return: A tuple of (dictionary, list). In the dictionary, the key is
             a DUT's hostname and the value is a list of jobs that ran on
             the DUT. The list contains all jobs of the suite.
    """
    results = autotest_es.query(
            fields_returned=['job_id', 'hostname'],
            equality_constraints=[('_type', 'host_history'),
                                  ('parent_job_id', suite_job_id),
                                  ('status', 'Running'),])

    dut_jobs_dict = {}
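    # A job can have more than one 'Running' host_history record, so keep a
    # set of job ids already seen and count each job only once.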
    job_filter = set()
    for hit in results.hits:
        job_id = hit['job_id']
        dut = hit['hostname']
        if job_id in job_filter:
            continue
        job_list = dut_jobs_dict.setdefault(dut, [])
        job_list.append(job_id)
        job_filter.add(job_id)

    if sanity_check and len(job_filter) != num_child_jobs:
        print('WARNING: Mismatched number of child jobs for suite %d: '
              '%d != %d' % (suite_job_id, len(job_filter), num_child_jobs))
    return dut_jobs_dict, list(job_filter)


def get_job_timestamps(job_list, job_info_dict):
    """
    Get the beginning time and ending time for each job.

    The beginning time of a job is "queued_time" of the "tko_jobs" table.
    The ending time of a job is "finished_on" of the "afe_host_queue_entries"
    table.
    job_info_dict will be modified in this function to store the timestamps.

    @param job_list: List of job ids.
    @param job_info_dict: Dictionary where the timestamps for each job will
                          be stored.
    """
    tko = tko_models.Job.objects.filter(afe_job_id__in=job_list)
    hqe = models.HostQueueEntry.objects.filter(job_id__in=job_list)
    job_start = {}
    for t in tko:
        job_start[t.afe_job_id] = time_utils.to_epoch_time(t.queued_time)
    job_end = {}
    for h in hqe:
        job_end[h.job_id] = time_utils.to_epoch_time(h.finished_on)

    for job_id in job_list:
        info_dict = job_info_dict.setdefault(job_id, {})
        info_dict.setdefault('timestamps', (job_start[job_id], job_end[job_id]))


def get_job_tasks(job_list, job_info_dict):
    """
    Get task ids for each job.
    job_info_dict will be modified in this function to store the task ids.

    @param job_list: List of job ids.
    @param job_info_dict: Dictionary where the task ids for each job will be
                          stored.
    """
    results = autotest_es.query(
            fields_returned=['job_id', 'task_id'],
            equality_constraints=[('_type', 'host_history')],
            batch_constraints=[('job_id', job_list)])
    for hit in results.hits:
        if 'task_id' in hit:
            info_dict = job_info_dict.setdefault(hit['job_id'], {})
            task_set = info_dict.setdefault('tasks', set())
            task_set.add(hit['task_id'])


def get_scheduling_overhead(suite_job_id, num_child_jobs, sanity_check=True):
    """
    Calculates the scheduling overhead of a suite.

    The scheduling overhead is defined as the average of the DUT overheads
    for the DUTs that the child jobs of the suite ran on.

    @param suite_job_id: Job id of a suite.
    @param num_child_jobs: Number of child jobs of the suite.
    @param sanity_check: Do a sanity check if True.
    @return: Dictionary storing stats.
    """
    dut_jobs_dict, job_list = get_child_jobs_info(
            suite_job_id, num_child_jobs, sanity_check)
    job_info_dict = {}
    get_job_timestamps(job_list, job_info_dict)
    get_job_tasks(job_list, job_info_dict)

    dut_overheads = []
    avg_overhead = 0
    for dut, jobs in dut_jobs_dict.iteritems():
        print_verbose('Dut: %s, Jobs: %s', dut, jobs)
        overhead = get_dut_overhead(dut, jobs, job_info_dict)
        avg_overhead += overhead
        print_verbose('Dut overhead: %f', overhead)
        dut_overheads.append(overhead)

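    # avg_overhead is the total DUT overhead averaged over the number of
    # child jobs (i.e., per-job overhead); 'suite_overhead' below is the
    # mean overhead per DUT.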
    if job_list:
        avg_overhead = avg_overhead / len(job_list)

    state_samples_dict = {}
    for info in job_info_dict.itervalues():
        for state in _states:
            if state in info:
                samples = state_samples_dict.setdefault(state, [])
                samples.append(info[state])

    # Build the result unconditionally; states without samples default to 0.
    result = {state: mean(state_samples_dict[state])
              if state in state_samples_dict else 0
              for state in _states}
    result['suite_overhead'] = mean(dut_overheads)
    result['overhead'] = avg_overhead
    result['num_duts'] = len(dut_jobs_dict)
    return result


def print_suite_stats(suite_stats):
    """Prints out statistics for a suite to standard output."""
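    # The trailing commas below use Python 2's print statement to suppress
    # the newline, so all stats are printed on a single line.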
    print('suite_overhead: %(suite_overhead)f, overhead: %(overhead)f,' %
          suite_stats),
    for state in _states:
        if state in suite_stats:
            print('%s: %f,' % (state, suite_stats[state])),
    print('num_duts: %(num_duts)d' % suite_stats)


def analyze_suites(start_time, end_time):
    """
    Calculates timing stats (i.e., suite runtime, scheduling overhead)
    for the suites that finished within the timestamps given by the
    parameters.

    @param start_time: Beginning timestamp.
    @param end_time: Ending timestamp.
    """
    print('Analyzing suites from %s to %s...' % (
          time_utils.epoch_time_to_date_string(start_time),
          time_utils.epoch_time_to_date_string(end_time)))

    if _options.bvtonly:
        batch_constraints = [
                ('suite_name', ['bvt-inline', 'bvt-cq', 'bvt-perbuild'])]
    else:
        batch_constraints = []

    start_time_epoch = time_utils.to_epoch_time(start_time)
    end_time_epoch = time_utils.to_epoch_time(end_time)
    results = autotest_es.query(
            fields_returned=['suite_name', 'suite_job_id', 'board', 'build',
                             'num_child_jobs', 'duration'],
            equality_constraints=[('_type', job_overhead.SUITE_RUNTIME_KEY),],
            range_constraints=[('time_recorded', start_time_epoch,
                                end_time_epoch)],
            sort_specs=[{'time_recorded': 'asc'}],
            batch_constraints=batch_constraints)
    print('Found %d suites' % (results.total))

    for hit in results.hits:
        suite_job_id = hit['suite_job_id']

        try:
            suite_name = hit['suite_name']
            num_child_jobs = int(hit['num_child_jobs'])
            suite_runtime = float(hit['duration'])

            print('Suite: %s (%s), Board: %s, Build: %s, Num child jobs: %d' % (
                    suite_name, suite_job_id, hit['board'], hit['build'],
                    num_child_jobs))

            suite_stats = get_scheduling_overhead(suite_job_id, num_child_jobs)
            print('Suite: %s (%s) runtime: %f,' % (
                    suite_name, suite_job_id, suite_runtime)),
            print_suite_stats(suite_stats)

            if _options.cron_mode:
                key = utils.get_data_key(
                        'suite_time_stats', suite_name, hit['build'],
                        hit['board'])
                autotest_stats.Timer(key).send('suite_runtime', suite_runtime)
                for stat, val in suite_stats.iteritems():
                    autotest_stats.Timer(key).send(stat, val)
        except Exception as e:
            print('ERROR: Exception raised while processing suite %s' %
                  suite_job_id)
            print(e)


def analyze_suite(suite_job_id):
    """Calculates and prints timing stats for a single suite."""
    suite_stats = get_scheduling_overhead(suite_job_id, 0, False)
    print('Suite (%s)' % suite_job_id),
    print_suite_stats(suite_stats)


def main():
    """main script."""
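    # Example invocations (the script name is assumed here; the file name as
    # deployed under site_utils may differ):
    #   ./suite_time_stats.py -c -s 24
    #   ./suite_time_stats.py --suite 12345 --verbose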
    parser = argparse.ArgumentParser(
            formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    parser.add_argument('-c', dest='cron_mode', action='store_true',
                        help=('Run in cron mode. Cron mode sends the '
                              'calculated stat data to Graphite.'),
                        default=False)
    parser.add_argument('-s', type=int, dest='span',
                        help=('Number of hours of stats to collect.'),
                        default=1)
    parser.add_argument('--bvtonly', dest='bvtonly', action='store_true',
                        help=('Gets bvt suites only (i.e., bvt-inline, '
                              'bvt-cq, bvt-perbuild).'),
                        default=False)
    parser.add_argument('--suite', type=int, dest='suite_job_id',
                        help=('Job id of a suite.'))
    parser.add_argument('--verbose', dest='verbose', action='store_true',
                        help=('Prints out more info if set.'),
                        default=False)
    global _options
    _options = parser.parse_args()

    if _options.suite_job_id:
        analyze_suite(_options.suite_job_id)
    else:
        end_time = time_utils.to_epoch_time(datetime.now())
        start_time = end_time - timedelta(hours=_options.span).total_seconds()
        analyze_suites(start_time, end_time)


if __name__ == '__main__':
    main()