#!/usr/bin/python -u

import collections
import errno
import fcntl
import json
import optparse
import os
import socket
import subprocess
import sys
import time
import traceback

import common
from autotest_lib.client.bin.result_tools import utils as result_utils
from autotest_lib.client.bin.result_tools import utils_lib as result_utils_lib
from autotest_lib.client.bin.result_tools import runner as result_runner
from autotest_lib.client.common_lib import control_data
from autotest_lib.client.common_lib import global_config
from autotest_lib.client.common_lib import mail, pidfile
from autotest_lib.client.common_lib import utils
from autotest_lib.frontend import setup_django_environment
from autotest_lib.frontend.tko import models as tko_models
from autotest_lib.server import site_utils
from autotest_lib.server.cros.dynamic_suite import constants
from autotest_lib.site_utils.sponge_lib import sponge_utils
from autotest_lib.tko import db as tko_db, utils as tko_utils
from autotest_lib.tko import models, parser_lib
from autotest_lib.tko.perf_upload import perf_uploader

try:
    from chromite.lib import metrics
except ImportError:
    metrics = utils.metrics_mock


_ParseOptions = collections.namedtuple(
    'ParseOptions', ['reparse', 'mail_on_failure', 'dry_run', 'suite_report',
                     'datastore_creds', 'export_to_gcloud_path'])
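# Illustrative instance (field values mirror the CLI flags parsed below):
#   _ParseOptions(reparse=True, mail_on_failure=False, dry_run=False,
#                 suite_report=False, datastore_creds=None,
#                 export_to_gcloud_path=None)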

_HARDCODED_CONTROL_FILE_NAMES = (
        # client side test control, as saved in old Autotest paths.
        'control',
        # server side test control, as saved in old Autotest paths.
        'control.srv',
        # All control files, as saved in skylab.
        'control.from_control_name',
)
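# These names are looked up directly under the results directory being
# parsed, e.g. <results_dir>/control.srv (see _max_result_size_from_control).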


def parse_args():
    """Parse args."""
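    # Illustrative invocation: reparse a single results directory, using
    # two levels of subdirectories for the job name:
    #   tko/parse.py -r -o -l 2 /usr/local/autotest/results/123-debug_user
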
    # build up our options parser and parse sys.argv
    parser = optparse.OptionParser()
    parser.add_option("-m", help="Send mail for FAILED tests",
                      dest="mailit", action="store_true")
    parser.add_option("-r", help="Reparse the results of a job",
                      dest="reparse", action="store_true")
    parser.add_option("-o", help="Parse a single results directory",
                      dest="singledir", action="store_true")
    parser.add_option("-l", help=("Levels of subdirectories to include "
                                  "in the job name"),
                      type="int", dest="level", default=1)
    parser.add_option("-n", help="No blocking on an existing parse",
                      dest="noblock", action="store_true")
    parser.add_option("-s", help="Database server hostname",
                      dest="db_host", action="store")
    parser.add_option("-u", help="Database username", dest="db_user",
                      action="store")
    parser.add_option("-p", help="Database password", dest="db_pass",
                      action="store")
    parser.add_option("-d", help="Database name", dest="db_name",
                      action="store")
    parser.add_option("--dry-run", help="Do not actually commit any results.",
                      dest="dry_run", action="store_true", default=False)
    parser.add_option(
            "--detach", action="store_true",
            help="Detach parsing process from the caller process. Used by "
                 "monitor_db to safely restart without affecting parsing.",
            default=False)
    parser.add_option("--write-pidfile",
                      help="write pidfile (.parser_execute)",
                      dest="write_pidfile", action="store_true",
                      default=False)
    parser.add_option("--record-duration",
                      help="[DEPRECATED] Record timing to metadata db",
                      dest="record_duration", action="store_true",
                      default=False)
    parser.add_option("--suite-report",
                      help=("Allows parsing job to attempt to create a suite "
                            "timeline report, if it detects that the job being "
                            "parsed is a suite job."),
                      dest="suite_report", action="store_true",
                      default=False)
    parser.add_option("--datastore-creds",
                      help=("The path to gcloud datastore credentials file, "
                            "which will be used to upload suite timeline "
                            "report to gcloud. If not specified, the one "
                            "defined in shadow_config will be used."),
                      dest="datastore_creds", action="store", default=None)
    parser.add_option("--export-to-gcloud-path",
                      help=("The path to export_to_gcloud script. Please find "
                            "chromite path on your server. The script is under "
                            "chromite/bin/."),
                      dest="export_to_gcloud_path", action="store",
                      default=None)
    options, args = parser.parse_args()

    # we need a results directory
    if len(args) == 0:
        tko_utils.dprint("ERROR: at least one results directory must "
                         "be provided")
        parser.print_help()
        sys.exit(1)

    if not options.datastore_creds:
        gcloud_creds = global_config.global_config.get_config_value(
            'GCLOUD', 'cidb_datastore_writer_creds', default=None)
        options.datastore_creds = (site_utils.get_creds_abspath(gcloud_creds)
                                   if gcloud_creds else None)

    if not options.export_to_gcloud_path:
        export_script = 'chromiumos/chromite/bin/export_to_gcloud'
        # If it is a lab server, the script is under ~chromeos-test/
        if os.path.exists(os.path.expanduser('~chromeos-test/%s' %
                                             export_script)):
            path = os.path.expanduser('~chromeos-test/%s' % export_script)
        # If it is a local workstation, it is probably under ~/
        elif os.path.exists(os.path.expanduser('~/%s' % export_script)):
            path = os.path.expanduser('~/%s' % export_script)
        # If it is not found anywhere, the default will be set to None.
        else:
            path = None
        options.export_to_gcloud_path = path

    # pass the options back
    return options, args


def format_failure_message(jobname, kernel, testname, status, reason):
    """Format failure message with the given information.

    @param jobname: String representing the job name.
    @param kernel: String representing the kernel.
    @param testname: String representing the test name.
    @param status: String representing the test status.
    @param reason: String representing the reason.

    @return: Failure message as a string.
    """
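    # Illustrative: format_failure_message('123-user', '3.8.11', 'dummy_Fail',
    # 'FAIL', 'bad dut') left-aligns the first four fields in columns of
    # widths 12, 20, 12 and 10, separated by single spaces.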
    format_string = "%-12s %-20s %-12s %-10s %s"
    return format_string % (jobname, kernel, testname, status, reason)


def mailfailure(jobname, job, message):
    """Send an email about the failure.

    @param jobname: String representing the job name.
    @param job: A job object.
    @param message: The message to mail.
    """
    message_lines = [""]
    message_lines.append("The following tests FAILED for this job")
    message_lines.append("http://%s/results/%s" %
                         (socket.gethostname(), jobname))
    message_lines.append("")
    message_lines.append(format_failure_message("Job name", "Kernel",
                                                "Test name", "FAIL/WARN",
                                                "Failure reason"))
    message_lines.append(format_failure_message("=" * 8, "=" * 6, "=" * 8,
                                                "=" * 8, "=" * 14))
    message_header = "\n".join(message_lines)

    subject = "AUTOTEST: FAILED tests from job %s" % jobname
    mail.send("", job.user, "", subject, message_header + message)


def _invalidate_original_tests(orig_job_idx, retry_job_idx):
    """Invalidate a retried job's original tests.

    Whenever a retry job is complete, we want to invalidate the original
    job's test results, so that the consumers of the tko database
    (e.g. tko frontend, wmatrix) can figure out which results are the latest.

    When a retry job is parsed, we retrieve the original job's afe_job_id
    from the retry job's keyvals, which is then converted to a tko job_idx and
    passed into this method as |orig_job_idx|.

    In this method, we invalidate the rows in tko_tests that are
    associated with the original job by flipping their 'invalid' bit to True.
    In addition, in tko_tests, we also maintain a pointer from the retry results
    to the original results, so that later we can always know which rows in
    tko_tests are retries and which are the corresponding original results.
    This is done by setting the field 'invalidates_test_idx' of the tests
    associated with the retry job.

    For example, assume Job(job_idx=105) is retried by Job(job_idx=108); after
    this method is run, their tko_tests rows will look like:
    __________________________________________________________________________
    test_idx| job_idx | test            | ... | invalid | invalidates_test_idx
    10      | 105     | dummy_Fail.Error| ... | 1       | NULL
    11      | 105     | dummy_Fail.Fail | ... | 1       | NULL
    ...
    20      | 108     | dummy_Fail.Error| ... | 0       | 10
    21      | 108     | dummy_Fail.Fail | ... | 0       | 11
    __________________________________________________________________________
    Note the invalid bits of the rows for Job(job_idx=105) are set to '1',
    and the 'invalidates_test_idx' fields of the rows for Job(job_idx=108)
    are set to 10 and 11 (the test_idx of the rows for the original job).

    @param orig_job_idx: An integer representing the original job's
                         tko job_idx. Tests associated with this job will
                         be marked as 'invalid'.
    @param retry_job_idx: An integer representing the retry job's
                          tko job_idx. The field 'invalidates_test_idx'
                          of the tests associated with this job will be updated.

    """
    msg = 'orig_job_idx: %s, retry_job_idx: %s' % (orig_job_idx, retry_job_idx)
    if not orig_job_idx or not retry_job_idx:
        tko_utils.dprint('ERROR: Could not invalidate tests: ' + msg)
        return
    # Using django models here makes things easier, but make sure that
    # before this method is called, all other relevant transactions have been
    # committed to avoid race conditions. In the long run, we might consider
    # making the rest of the parser use django models.
    orig_tests = tko_models.Test.objects.filter(job__job_idx=orig_job_idx)
    retry_tests = tko_models.Test.objects.filter(job__job_idx=retry_job_idx)

    # Invalidate original tests.
    orig_tests.update(invalid=True)

    # Maintain a dictionary that maps (test, subdir) to original tests.
    # Note that within the scope of a job, (test, subdir) uniquely
    # identifies a test run, but 'test' alone does not.
    # In a control file, one could run the same test with different
    # 'subdir_tag', for example,
    #     job.run_test('dummy_Fail', tag='Error', subdir_tag='subdir_1')
    #     job.run_test('dummy_Fail', tag='Error', subdir_tag='subdir_2')
    # In tko, we will get
    #    (test='dummy_Fail.Error', subdir='dummy_Fail.Error.subdir_1')
    #    (test='dummy_Fail.Error', subdir='dummy_Fail.Error.subdir_2')
    invalidated_tests = {(orig_test.test, orig_test.subdir): orig_test
                         for orig_test in orig_tests}
    for retry in retry_tests:
        # It is possible that (retry.test, retry.subdir) doesn't exist
        # in invalidated_tests. This could happen when the original job
        # didn't run some of its tests. For example, a dut goes offline
        # at the beginning of the job, in which case invalidated_tests
        # will only have one entry for 'SERVER_JOB'.
        orig_test = invalidated_tests.get((retry.test, retry.subdir), None)
        if orig_test:
            retry.invalidates_test = orig_test
            retry.save()
    tko_utils.dprint('DEBUG: Invalidated tests associated with job: ' + msg)


def _throttle_result_size(path):
    """Limit the total size of test results for the given path.

    @param path: Path of the result directory.
    """
    if not result_runner.ENABLE_RESULT_THROTTLING:
        tko_utils.dprint(
                'Result throttling is not enabled. Skipping throttling %s' %
                path)
        return

    max_result_size_KB = _max_result_size_from_control(path)
    if max_result_size_KB is None:
        max_result_size_KB = control_data.DEFAULT_MAX_RESULT_SIZE_KB

    try:
        result_utils.execute(path, max_result_size_KB)
    except Exception:
        tko_utils.dprint(
                'Failed to throttle result size of %s.\nDetails %s' %
                (path, traceback.format_exc()))


def _max_result_size_from_control(path):
    """Gets the max result size set in a control file, if any.

    If no override is found, returns None.
    """
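    # A control file can raise the default cap with an override such as
    # (illustrative; control_data exposes it as max_result_size_KB):
    #     MAX_RESULT_SIZE_KB = 512000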
    for control_file in _HARDCODED_CONTROL_FILE_NAMES:
        control = os.path.join(path, control_file)
        if not os.path.exists(control):
            continue

        try:
            max_result_size_KB = control_data.parse_control(
                    control, raise_warnings=False).max_result_size_KB
            if max_result_size_KB != control_data.DEFAULT_MAX_RESULT_SIZE_KB:
                return max_result_size_KB
        except IOError as e:
            tko_utils.dprint(
                    'Failed to access %s. Error: %s\nDetails %s' %
                    (control, e, traceback.format_exc()))
        except control_data.ControlVariableException as e:
            tko_utils.dprint(
                    'Failed to parse %s. Error: %s\nDetails %s' %
                    (control, e, traceback.format_exc()))
    return None


def export_tko_job_to_file(job, jobname, filename):
    """Exports the tko job to a binary file on disk.

    @param job: database object.
    @param jobname: the job name as string.
    @param filename: The path of the binary file to write the serialized
                     job to.
    """
    from autotest_lib.tko import job_serializer

    serializer = job_serializer.JobSerializer()
    serializer.serialize_to_binary(job, jobname, filename)


def parse_one(db, pid_file_manager, jobname, path, parse_options):
    """Parse a single job. Optionally send email on failure.

    @param db: database object.
    @param pid_file_manager: pidfile.PidFileManager object.
    @param jobname: the tag used to search for existing job in db,
                    e.g. '1234-chromeos-test/host1'
    @param path: The path to the results to be parsed.
    @param parse_options: _ParseOptions instance.
    """
    reparse = parse_options.reparse
    mail_on_failure = parse_options.mail_on_failure
    dry_run = parse_options.dry_run
    suite_report = parse_options.suite_report
    datastore_creds = parse_options.datastore_creds
    export_to_gcloud_path = parse_options.export_to_gcloud_path

    tko_utils.dprint("\nScanning %s (%s)" % (jobname, path))
    old_job_idx = db.find_job(jobname)
    if old_job_idx is not None and not reparse:
        tko_utils.dprint("! Job is already parsed, done")
        return

    # look up the status version
    job_keyval = models.job.read_keyval(path)
    status_version = job_keyval.get("status_version", 0)

    parser = parser_lib.parser(status_version)
    job = parser.make_job(path)
    tko_utils.dprint("+ Parsing dir=%s, jobname=%s" % (path, jobname))
    status_log_path = _find_status_log_path(path)
    if not status_log_path:
        tko_utils.dprint("! Unable to parse job, no status file")
        return
    _parse_status_log(parser, job, status_log_path)

    if old_job_idx is not None:
        job.job_idx = old_job_idx
        unmatched_tests = _match_existing_tests(db, job)
        if not dry_run:
            _delete_tests_from_db(db, unmatched_tests)

    job.afe_job_id = tko_utils.get_afe_job_id(jobname)
    job.skylab_task_id = tko_utils.get_skylab_task_id(jobname)
    job.afe_parent_job_id = job_keyval.get(constants.PARENT_JOB_ID)
    job.skylab_parent_task_id = job_keyval.get(constants.PARENT_JOB_ID)
    job.build = None
    job.board = None
    job.build_version = None
    job.suite = None
    if job.label:
        label_info = site_utils.parse_job_name(job.label)
        if label_info:
            job.build = label_info.get('build', None)
            job.build_version = label_info.get('build_version', None)
            job.board = label_info.get('board', None)
            job.suite = label_info.get('suite', None)

    result_utils_lib.LOG = tko_utils.dprint
    _throttle_result_size(path)

    # Record test result size to job_keyvals
    start_time = time.time()
    result_size_info = site_utils.collect_result_sizes(
            path, log=tko_utils.dprint)
    tko_utils.dprint('Finished collecting result sizes after %s seconds' %
                     (time.time() - start_time))
    job.keyval_dict.update(result_size_info.__dict__)

    # TODO(dshi): Update sizes with sponge_invocation.xml and throttle it.

    # check for failures
    message_lines = [""]
    job_successful = True
    for test in job.tests:
        if not test.subdir:
            continue
        tko_utils.dprint("* testname, subdir, status, reason: %s %s %s %s"
                         % (test.testname, test.subdir, test.status,
                            test.reason))
        if test.status not in ('GOOD', 'WARN'):
            job_successful = False
            pid_file_manager.num_tests_failed += 1
            message_lines.append(format_failure_message(
                jobname, test.kernel.base, test.subdir,
                test.status, test.reason))

    message = "\n".join(message_lines)

    if not dry_run:
        # send out an email report of failure
        if len(message) > 2 and mail_on_failure:
            tko_utils.dprint("Sending email report of failure on %s to %s"
                             % (jobname, job.user))
            mailfailure(jobname, job, message)

        # Upload perf values to the perf dashboard, if applicable.
        for test in job.tests:
            perf_uploader.upload_test(job, test, jobname)

        # Upload job details to Sponge.
        sponge_url = sponge_utils.upload_results(job, log=tko_utils.dprint)
        if sponge_url:
            job.keyval_dict['sponge_url'] = sponge_url

        _write_job_to_db(db, jobname, job)

        # Verify the job data is written to the database.
        if job.tests:
            tests_in_db = db.find_tests(job.job_idx)
            tests_in_db_count = len(tests_in_db) if tests_in_db else 0
            if tests_in_db_count != len(job.tests):
                tko_utils.dprint(
                        'Failed to find enough tests for job_idx: %d. The '
                        'job should have %d tests, only found %d tests.' %
                        (job.job_idx, len(job.tests), tests_in_db_count))
                metrics.Counter(
                        'chromeos/autotest/result/db_save_failure',
                        description='The number of times parse failed to '
                        'save job to TKO database.').increment()

        # Although the cursor has autocommit, we still need to force it to
        # commit existing changes before we can use django models, otherwise
        # django models will deadlock when they try to start a new
        # transaction while the current one has not finished yet.
        db.commit()

        # Handle retry job.
        orig_afe_job_id = job_keyval.get(constants.RETRY_ORIGINAL_JOB_ID,
                                         None)
        if orig_afe_job_id:
            orig_job_idx = tko_models.Job.objects.get(
                    afe_job_id=orig_afe_job_id).job_idx
            _invalidate_original_tests(orig_job_idx, job.job_idx)

    # Serialize the job into a binary file if configured to do so.
    export_tko_to_file = global_config.global_config.get_config_value(
            'AUTOSERV', 'export_tko_job_to_file', type=bool, default=False)

    binary_file_name = os.path.join(path, "job.serialize")
    if export_tko_to_file:
        export_tko_job_to_file(job, jobname, binary_file_name)

    if not dry_run:
        db.commit()

    # Generate a suite report.
    # Check whether this is a suite job: a suite job is a hostless job, so its
    # jobname ends with '/hostless' (i.e. <JOB_ID>-<USERNAME>/hostless) and its
    # suite field is not NULL. Only generate the timeline report when
    # datastore_parent_key is given.
    datastore_parent_key = job_keyval.get('datastore_parent_key', None)
    provision_job_id = job_keyval.get('provision_job_id', None)
    if (suite_report and jobname.endswith('/hostless')
        and job.suite and datastore_parent_key):
        tko_utils.dprint('Start dumping suite timing report...')
        timing_log = os.path.join(path, 'suite_timing.log')
        dump_cmd = ("%s/site_utils/dump_suite_report.py %s "
                    "--output='%s' --debug" %
                    (common.autotest_dir, job.afe_job_id, timing_log))

        if provision_job_id is not None:
            dump_cmd += " --provision_job_id=%d" % int(provision_job_id)

        subprocess.check_output(dump_cmd, shell=True)
        tko_utils.dprint('Successfully finished dumping suite timing report')

        if (datastore_creds and export_to_gcloud_path
            and os.path.exists(export_to_gcloud_path)):
            upload_cmd = [export_to_gcloud_path, datastore_creds,
                          timing_log, '--parent_key',
                          datastore_parent_key]
            tko_utils.dprint('Start exporting timeline report to gcloud')
            subprocess.check_output(upload_cmd)
            tko_utils.dprint('Successfully exported timeline report to '
                             'gcloud')
        else:
            tko_utils.dprint('DEBUG: skip exporting suite timeline to '
                             'gcloud, because either gcloud creds or '
                             'export_to_gcloud script is not found.')

    # Mark GS_OFFLOADER_NO_OFFLOAD in gs_offloader_instructions at the end of
    # the function, so that any failure, e.g. a db connection error, will stop
    # gs_offloader_instructions from being updated, and logs can be uploaded
    # for troubleshooting.
    if job_successful:
        # Check if we should not offload this test's results.
        if job_keyval.get(constants.JOB_OFFLOAD_FAILURES_KEY, False):
            # Update the gs_offloader_instructions json file.
            gs_instructions_file = os.path.join(
                    path, constants.GS_OFFLOADER_INSTRUCTIONS)
            gs_offloader_instructions = {}
            if os.path.exists(gs_instructions_file):
                with open(gs_instructions_file, 'r') as f:
                    gs_offloader_instructions = json.load(f)

            gs_offloader_instructions[constants.GS_OFFLOADER_NO_OFFLOAD] = True
            with open(gs_instructions_file, 'w') as f:
                json.dump(gs_offloader_instructions, f)


def _write_job_to_db(db, jobname, job):
    """Write all TKO data associated with a job to DB.

    This updates the job object as a side effect.

    @param db: tko.db.db_sql object.
    @param jobname: Name of the job to write.
    @param job: tko.models.job object.
    """
    db.insert_or_update_machine(job)
    db.insert_job(jobname, job)
    db.insert_or_update_task_reference(
            job,
            'skylab' if tko_utils.is_skylab_task(jobname) else 'afe',
    )
    db.update_job_keyvals(job)
    for test in job.tests:
        db.insert_test(job, test)


def _find_status_log_path(path):
    """Return the path to the job's status log, or '' if none exists."""
    if os.path.exists(os.path.join(path, "status.log")):
        return os.path.join(path, "status.log")
    if os.path.exists(os.path.join(path, "status")):
        return os.path.join(path, "status")
    return ""


def _parse_status_log(parser, job, status_log_path):
    """Parse the status log and attach the resulting tests to the job."""
    with open(status_log_path) as status_log:
        status_lines = status_log.readlines()
    parser.start(job)
    tests = parser.end(status_lines)

    # parser.end can return the same object multiple times, so filter out dups
    job.tests = []
    already_added = set()
    for test in tests:
        if test not in already_added:
            already_added.add(test)
            job.tests.append(test)


def _match_existing_tests(db, job):
    """Find entries in the DB corresponding to the job's tests, update job.

    @return: Any unmatched tests in the db.
    """
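    # Illustrative: if the previous parse stored a row keyed by
    # ('dummy_Pass', 'dummy_Pass') with test_idx 42 and the reparse produced
    # the same (testname, subdir), test_idx 42 is reused; leftover entries
    # are returned so the caller can delete them.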
    old_job_idx = job.job_idx
    raw_old_tests = db.select("test_idx,subdir,test", "tko_tests",
                              {"job_idx": old_job_idx})
    if raw_old_tests:
        old_tests = dict(((test, subdir), test_idx)
                         for test_idx, subdir, test in raw_old_tests)
    else:
        old_tests = {}

    for test in job.tests:
        test_idx = old_tests.pop((test.testname, test.subdir), None)
        if test_idx is not None:
            test.test_idx = test_idx
        else:
            tko_utils.dprint("! Reparse returned new test "
                             "testname=%r subdir=%r" %
                             (test.testname, test.subdir))
    return old_tests


def _delete_tests_from_db(db, tests):
    """Delete the given tests, and rows that reference them, from the DB.

    @param tests: Dict mapping (test, subdir) to test_idx, as returned by
                  _match_existing_tests.
    """
    for test_idx in tests.itervalues():
        where = {'test_idx': test_idx}
        # Delete dependent rows before the tko_tests row itself.
        db.delete('tko_iteration_result', where)
        db.delete('tko_iteration_perf_value', where)
        db.delete('tko_iteration_attributes', where)
        db.delete('tko_test_attributes', where)
        db.delete('tko_test_labels_tests', {'test_id': test_idx})
        db.delete('tko_tests', where)


def _get_job_subdirs(path):
    """
    Returns a list of job subdirectories at path. Returns None if the path
    is itself a job directory. Does not recurse into the subdirs.
    """
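    # Illustrative .machines file for a two-host job; each line names a
    # per-machine results subdirectory under the job directory:
    #   host1
    #   host2
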
    # if there's a .machines file, use it to get the subdirs
    machine_list = os.path.join(path, ".machines")
    if os.path.exists(machine_list):
        subdirs = set(line.strip() for line in open(machine_list))
        existing_subdirs = set(subdir for subdir in subdirs
                               if os.path.exists(os.path.join(path, subdir)))
        if len(existing_subdirs) != 0:
            return existing_subdirs

    # if this dir contains ONLY subdirectories, return them
    contents = set(os.listdir(path))
    contents.discard(".parse.lock")
    subdirs = set(sub for sub in contents if
                  os.path.isdir(os.path.join(path, sub)))
    if len(contents) == len(subdirs) != 0:
        return subdirs

    # this is a job directory, or something else we don't understand
    return None


def parse_leaf_path(db, pid_file_manager, path, level, parse_options):
    """Parse a leaf path.

    @param db: database handle.
    @param pid_file_manager: pidfile.PidFileManager object.
    @param path: The path to the results to be parsed.
    @param level: Integer, level of subdirectories to include in the job name.
    @param parse_options: _ParseOptions instance.

    @returns: The job name of the parsed job, e.g. '123-chromeos-test/host1'
    """
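    # Illustrative: path='/results/123-user/host1' with level=2 yields
    # jobname '123-user/host1'; with level=1 it would be just 'host1'.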
    job_elements = path.split("/")[-level:]
    jobname = "/".join(job_elements)
    db.run_with_retry(parse_one, db, pid_file_manager, jobname, path,
                      parse_options)
    return jobname


def parse_path(db, pid_file_manager, path, level, parse_options):
    """Parse a path.

    @param db: database handle.
    @param pid_file_manager: pidfile.PidFileManager object.
    @param path: The path to the results to be parsed.
    @param level: Integer, level of subdirectories to include in the job name.
    @param parse_options: _ParseOptions instance.

    @returns: A set of job names of the parsed jobs.
              set(['123-chromeos-test/host1', '123-chromeos-test/host2'])
    """
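    # Illustrative layout for a two-host synchronous job parsed at level=1:
    #   123-user/
    #       .machines          (lists host1 and host2)
    #       status.log         (parsed as job '123-user')
    #       host1/status.log   (parsed as '123-user/host1')
    #       host2/status.log   (parsed as '123-user/host2')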
    processed_jobs = set()
    job_subdirs = _get_job_subdirs(path)
    if job_subdirs is not None:
        # parse status.log in current directory, if it exists. multi-machine
        # synchronous server side tests record output in this directory.
        # without this check, we do not parse these results.
        if os.path.exists(os.path.join(path, 'status.log')):
            new_job = parse_leaf_path(db, pid_file_manager, path, level,
                                      parse_options)
            processed_jobs.add(new_job)
        # multi-machine job
        for subdir in job_subdirs:
            jobpath = os.path.join(path, subdir)
            new_jobs = parse_path(db, pid_file_manager, jobpath, level + 1,
                                  parse_options)
            processed_jobs.update(new_jobs)
    else:
        # single machine job
        new_job = parse_leaf_path(db, pid_file_manager, path, level,
                                  parse_options)
        processed_jobs.add(new_job)
    return processed_jobs


def _detach_from_parent_process():
    """Allow reparenting the parse process away from caller.

    When monitor_db is run via upstart, restarting the job sends SIGTERM to
    the whole process group. This makes us immune from that.
    """
    if os.getpid() != os.getpgid(0):
        os.setsid()


def main():
    """tko_parse entry point."""
    options, args = parse_args()

    # We are obliged to use indirect=False, not use the SetupTsMonGlobalState
    # context manager, and add a manual flush, because tko/parse is expected to
    # be a very short lived (<1 min) script when working effectively, and we
    # can't afford to either a) wait for up to 1min for metrics to flush at the
    # end or b) drop metrics that were sent within the last minute of execution.
    site_utils.SetupTsMonGlobalState('tko_parse', indirect=False,
                                     short_lived=True)
    try:
        with metrics.SuccessCounter('chromeos/autotest/tko_parse/runs'):
            _main_with_options(options, args)
    finally:
        metrics.Flush()


def _main_with_options(options, args):
    """Entry point with options parsed and metrics already set up."""
    # Record the processed jobs so that
    # we can send the duration of parsing to metadata db.
    processed_jobs = set()

    if options.detach:
        _detach_from_parent_process()

    parse_options = _ParseOptions(options.reparse, options.mailit,
                                  options.dry_run, options.suite_report,
                                  options.datastore_creds,
                                  options.export_to_gcloud_path)
    results_dir = os.path.abspath(args[0])
    assert os.path.exists(results_dir)

    pid_file_manager = pidfile.PidFileManager("parser", results_dir)

    if options.write_pidfile:
        pid_file_manager.open_file()

    try:
        # build up the list of job dirs to parse
        if options.singledir:
            jobs_list = [results_dir]
        else:
            jobs_list = [os.path.join(results_dir, subdir)
                         for subdir in os.listdir(results_dir)]

        # build up the database
        db = tko_db.db(autocommit=False, host=options.db_host,
                       user=options.db_user, password=options.db_pass,
                       database=options.db_name)

        # parse all the jobs
        for path in jobs_list:
            lockfile = open(os.path.join(path, ".parse.lock"), "w")
            flags = fcntl.LOCK_EX
            if options.noblock:
                flags |= fcntl.LOCK_NB
            try:
                fcntl.flock(lockfile, flags)
            except IOError as e:
                # lock is not available and nonblock has been requested
                if e.errno == errno.EWOULDBLOCK:
                    lockfile.close()
                    continue
                else:
                    raise  # something unexpected happened
            try:
                new_jobs = parse_path(db, pid_file_manager, path, options.level,
                                      parse_options)
                processed_jobs.update(new_jobs)

            finally:
                fcntl.flock(lockfile, fcntl.LOCK_UN)
                lockfile.close()

    except Exception:
        pid_file_manager.close_file(1)
        raise
    else:
        pid_file_manager.close_file(0)


if __name__ == "__main__":
    main()