      1 #!/usr/bin/python -u
      2 
      3 import collections
      4 import datetime
      5 import errno
      6 import fcntl
      7 import json
      8 import optparse
      9 import os
     10 import socket
     11 import subprocess
     12 import sys
     13 import time
     14 import traceback
     15 
     16 import common
     17 from autotest_lib.client.bin.result_tools import utils as result_utils
     18 from autotest_lib.client.bin.result_tools import utils_lib as result_utils_lib
     19 from autotest_lib.client.bin.result_tools import runner as result_runner
     20 from autotest_lib.client.common_lib import control_data
     21 from autotest_lib.client.common_lib import global_config
     22 from autotest_lib.client.common_lib import mail, pidfile
     23 from autotest_lib.client.common_lib import utils
     24 from autotest_lib.frontend import setup_django_environment
     25 from autotest_lib.frontend.tko import models as tko_models
     26 from autotest_lib.server import site_utils
     27 from autotest_lib.server.cros.dynamic_suite import constants
     28 from autotest_lib.site_utils import job_overhead
     29 from autotest_lib.site_utils.sponge_lib import sponge_utils
     30 from autotest_lib.tko import db as tko_db, utils as tko_utils
     31 from autotest_lib.tko import models, parser_lib
     32 from autotest_lib.tko.perf_upload import perf_uploader
     33 
     34 try:
     35     from chromite.lib import metrics
     36 except ImportError:
     37     metrics = utils.metrics_mock
     38 
     39 
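        # Bundles the command-line options that are threaded from main()
        # through parse_path()/parse_leaf_path() down to parse_one().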
     40 _ParseOptions = collections.namedtuple(
     41     'ParseOptions', ['reparse', 'mail_on_failure', 'dry_run', 'suite_report',
     42                      'datastore_creds', 'export_to_gcloud_path'])
     43 
     44 def parse_args():
     45     """Parse args."""
     46     # build up our options parser and parse sys.argv
     47     parser = optparse.OptionParser()
     48     parser.add_option("-m", help="Send mail for FAILED tests",
     49                       dest="mailit", action="store_true")
     50     parser.add_option("-r", help="Reparse the results of a job",
     51                       dest="reparse", action="store_true")
     52     parser.add_option("-o", help="Parse a single results directory",
     53                       dest="singledir", action="store_true")
     54     parser.add_option("-l", help=("Levels of subdirectories to include "
     55                                   "in the job name"),
     56                       type="int", dest="level", default=1)
     57     parser.add_option("-n", help="No blocking on an existing parse",
     58                       dest="noblock", action="store_true")
     59     parser.add_option("-s", help="Database server hostname",
     60                       dest="db_host", action="store")
     61     parser.add_option("-u", help="Database username", dest="db_user",
     62                       action="store")
     63     parser.add_option("-p", help="Database password", dest="db_pass",
     64                       action="store")
     65     parser.add_option("-d", help="Database name", dest="db_name",
     66                       action="store")
     67     parser.add_option("--dry-run", help="Do not actually commit any results.",
     68                       dest="dry_run", action="store_true", default=False)
     69     parser.add_option(
     70             "--detach", action="store_true",
     71             help="Detach parsing process from the caller process. Used by "
     72                  "monitor_db to safely restart without affecting parsing.",
     73             default=False)
     74     parser.add_option("--write-pidfile",
     75                       help="write pidfile (.parser_execute)",
     76                       dest="write_pidfile", action="store_true",
     77                       default=False)
     78     parser.add_option("--record-duration",
     79                       help="Record timing to metadata db",
     80                       dest="record_duration", action="store_true",
     81                       default=False)
     82     parser.add_option("--suite-report",
     83                       help=("Allows parsing job to attempt to create a suite "
     84                             "timeline report, if it detects that the job being "
     85                             "parsed is a suite job."),
     86                       dest="suite_report", action="store_true",
     87                       default=False)
     88     parser.add_option("--datastore-creds",
     89                       help=("The path to gcloud datastore credentials file, "
     90                             "which will be used to upload suite timeline "
     91                             "report to gcloud. If not specified, the one "
     92                             "defined in shadow_config will be used."),
     93                       dest="datastore_creds", action="store", default=None)
     94     parser.add_option("--export-to-gcloud-path",
     95                       help=("The path to the export_to_gcloud script, which "
     96                             "lives under chromite/bin/ in the chromite "
     97                             "checkout on your server."),
     98                       dest="export_to_gcloud_path", action="store",
     99                       default=None)
    100     options, args = parser.parse_args()
    101 
    102     # we need a results directory
    103     if len(args) == 0:
    104         tko_utils.dprint("ERROR: at least one results directory must "
    105                          "be provided")
    106         parser.print_help()
    107         sys.exit(1)
    108 
    109     if not options.datastore_creds:
    110         gcloud_creds = global_config.global_config.get_config_value(
    111             'GCLOUD', 'cidb_datastore_writer_creds', default=None)
    112         options.datastore_creds = (site_utils.get_creds_abspath(gcloud_creds)
    113                                    if gcloud_creds else None)
    114 
    115     if not options.export_to_gcloud_path:
    116         export_script = 'chromiumos/chromite/bin/export_to_gcloud'
    117         # If it is a lab server, the script is under ~chromeos-test/
    118         if os.path.exists(os.path.expanduser('~chromeos-test/%s' %
    119                                              export_script)):
    120             path = os.path.expanduser('~chromeos-test/%s' % export_script)
    121         # If it is a local workstation, it is probably under ~/
    122         elif os.path.exists(os.path.expanduser('~/%s' % export_script)):
    123             path = os.path.expanduser('~/%s' % export_script)
    124         # If it is not found anywhere, the default will be set to None.
    125         else:
    126             path = None
    127         options.export_to_gcloud_path = path
    128 
    129     # pass the options back
    130     return options, args
    131 
    132 
    133 def format_failure_message(jobname, kernel, testname, status, reason):
    134     """Format failure message with the given information.
    135 
    136     @param jobname: String representing the job name.
    137     @param kernel: String representing the kernel.
    138     @param testname: String representing the test name.
    139     @param status: String representing the test status.
    140     @param reason: String representing the reason.
    141 
    142     @return: Failure message as a string.
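            Illustrative example: format_failure_message('118-debug', '2.6.32',
            'sleeptest', 'FAIL', 'timed out') left-justifies the first four
            values in 12/20/12/10-character columns and appends the reason
            unpadded.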
    143     """
    144     format_string = "%-12s %-20s %-12s %-10s %s"
    145     return format_string % (jobname, kernel, testname, status, reason)
    146 
    147 
    148 def mailfailure(jobname, job, message):
    149     """Send an email about the failure.
    150 
    151     @param jobname: String representing the job name.
    152     @param job: A job object.
    153     @param message: The message to mail.
    154     """
    155     message_lines = [""]
    156     message_lines.append("The following tests FAILED for this job")
    157     message_lines.append("http://%s/results/%s" %
    158                          (socket.gethostname(), jobname))
    159     message_lines.append("")
    160     message_lines.append(format_failure_message("Job name", "Kernel",
    161                                                 "Test name", "FAIL/WARN",
    162                                                 "Failure reason"))
    163     message_lines.append(format_failure_message("=" * 8, "=" * 6, "=" * 8,
    164                                                 "=" * 8, "=" * 14))
    165     message_header = "\n".join(message_lines)
    166 
    167     subject = "AUTOTEST: FAILED tests from job %s" % jobname
    168     mail.send("", job.user, "", subject, message_header + message)
    169 
    170 
    171 def _invalidate_original_tests(orig_job_idx, retry_job_idx):
    172     """Invalidate the original tests that have been retried.
    173 
    174     Whenever a retry job is complete, we want to invalidate the original
    175     job's test results, so that consumers of the tko database
    176     (e.g. the tko frontend, wmatrix) can tell which results are the latest.
    177 
    178     When a retry job is parsed, we retrieve the original job's afe_job_id
    179     from the retry job's keyvals, which is then converted to tko job_idx and
    180     passed into this method as |orig_job_idx|.
    181 
    182     In this method, we are going to invalidate the rows in tko_tests that are
    183     associated with the original job by flipping their 'invalid' bit to True.
    184     In addition, in tko_tests, we also maintain a pointer from the retry results
    185     to the original results, so that later we can always know which rows in
    186     tko_tests are retries and which are the corresponding original results.
    187     This is done by setting the field 'invalidates_test_idx' of the tests
    188     associated with the retry job.
    189 
    190     For example, assume Job(job_idx=105) is retried by Job(job_idx=108). After
    191     this method is run, their tko_tests rows will look like:
    192     __________________________________________________________________________
    193     test_idx| job_idx | test            | ... | invalid | invalidates_test_idx
    194     10      | 105     | dummy_Fail.Error| ... | 1       | NULL
    195     11      | 105     | dummy_Fail.Fail | ... | 1       | NULL
    196     ...
    197     20      | 108     | dummy_Fail.Error| ... | 0       | 10
    198     21      | 108     | dummy_Fail.Fail | ... | 0       | 11
    199     __________________________________________________________________________
    200     Note the invalid bits of the rows for Job(job_idx=105) are set to '1'.
    201     And the 'invalidates_test_idx' fields of the rows for Job(job_idx=108)
    202     are set to 10 and 11 (the test_idx of the rows for the original job).
    203 
    204     @param orig_job_idx: An integer representing the original job's
    205                          tko job_idx. Tests associated with this job will
    206                          be marked as 'invalid'.
    207     @param retry_job_idx: An integer representing the retry job's
    208                           tko job_idx. The field 'invalidates_test_idx'
    209                           of the tests associated with this job will be updated.
    210 
    211     """
    212     msg = 'orig_job_idx: %s, retry_job_idx: %s' % (orig_job_idx, retry_job_idx)
    213     if not orig_job_idx or not retry_job_idx:
    214         tko_utils.dprint('ERROR: Could not invalidate tests: ' + msg)
                return
    215     # Using django models here makes things easier, but make sure that
    216     # before this method is called, all other relevant transactions have been
    217     # committed to avoid race condition. In the long run, we might consider
    218     # to make the rest of parser use django models.
    219     orig_tests = tko_models.Test.objects.filter(job__job_idx=orig_job_idx)
    220     retry_tests = tko_models.Test.objects.filter(job__job_idx=retry_job_idx)
    221 
    222     # Invalidate original tests.
    223     orig_tests.update(invalid=True)
    224 
    225     # Maintain a dictionary that maps (test, subdir) to original tests.
    226     # Note that within the scope of a job, (test, subdir) uniquely
    227     # identifies a test run, but 'test' does not.
    228     # In a control file, one could run the same test with different
    229     # 'subdir_tag', for example,
    230     #     job.run_test('dummy_Fail', tag='Error', subdir_tag='subdir_1')
    231     #     job.run_test('dummy_Fail', tag='Error', subdir_tag='subdir_2')
    232     # In tko, we will get
    233     #    (test='dummy_Fail.Error', subdir='dummy_Fail.Error.subdir_1')
    234     #    (test='dummy_Fail.Error', subdir='dummy_Fail.Error.subdir_2')
    235     invalidated_tests = {(orig_test.test, orig_test.subdir): orig_test
    236                          for orig_test in orig_tests}
    237     for retry in retry_tests:
    238         # It is possible that (retry.test, retry.subdir) doesn't exist
    239         # in invalidated_tests. This could happen when the original job
    240         # didn't run some of its tests. For example, if a DUT went offline at
    241         # the start of the job, invalidated_tests
    242         # will only have one entry for 'SERVER_JOB'.
    243         orig_test = invalidated_tests.get((retry.test, retry.subdir), None)
    244         if orig_test:
    245             retry.invalidates_test = orig_test
    246             retry.save()
    247     tko_utils.dprint('DEBUG: Invalidated tests associated to job: ' + msg)
    248 
    249 
    250 def _throttle_result_size(path):
    251     """Limit the total size of test results for the given path.
    252 
    253     @param path: Path of the result directory.
    254     """
    255     if not result_runner.ENABLE_RESULT_THROTTLING:
    256         tko_utils.dprint(
    257                 'Result throttling is not enabled. Skipping throttling %s' %
    258                 path)
    259         return
    260 
    261     max_result_size_KB = control_data.DEFAULT_MAX_RESULT_SIZE_KB
    262     # A client-side test saves its control file as `control`, while a
    263     # server-side test saves it as `control.srv`.
    264     for control_file in ['control', 'control.srv']:
    265         control = os.path.join(path, control_file)
    266         try:
    267             max_result_size_KB = control_data.parse_control(
    268                     control, raise_warnings=False).max_result_size_KB
    269             # Any value different from the default is considered to be the one
    270             # set in the test control file.
    271             if max_result_size_KB != control_data.DEFAULT_MAX_RESULT_SIZE_KB:
    272                 break
    273         except IOError as e:
    274             tko_utils.dprint(
    275                     'Failed to access %s. Error: %s\nDetails %s' %
    276                     (control, e, traceback.format_exc()))
    277         except control_data.ControlVariableException as e:
    278             tko_utils.dprint(
    279                     'Failed to parse %s. Error: %s\nDetails %s' %
    280                     (control, e, traceback.format_exc()))
    281 
    282     try:
    283         result_utils.execute(path, max_result_size_KB)
    284     except Exception:
    285         tko_utils.dprint(
    286                 'Failed to throttle result size of %s.\nDetails %s' %
    287                 (path, traceback.format_exc()))
    288 
    289 
    290 def export_tko_job_to_file(job, jobname, filename):
    291     """Exports the tko job to a file on disk.
    292 
    293     @param job: The parsed job object to serialize.
    294     @param jobname: the job name as string.
    295     @param filename: Path of the file to write the serialized job to.
    296     """
    297     try:
    298         from autotest_lib.tko import job_serializer
    299 
    300         serializer = job_serializer.JobSerializer()
    301         serializer.serialize_to_binary(job, jobname, filename)
    302     except ImportError:
    303         tko_utils.dprint("WARNING: tko_pb2.py doesn't exist. Create it by "
    304                          "compiling tko/tko.proto.")
    305 
    306 
    307 def parse_one(db, jobname, path, parse_options):
    308     """Parse a single job. Optionally send email on failure.
    309 
    310     @param db: database object.
    311     @param jobname: the tag used to search for existing job in db,
    312                     e.g. '1234-chromeos-test/host1'
    313     @param path: The path to the results to be parsed.
    314     @param parse_options: _ParseOptions instance.
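            Illustrative call (the results path is hypothetical):
                parse_one(db, '1234-chromeos-test/host1',
                          '.../results/1234-chromeos-test/host1',
                          _ParseOptions(reparse=False, mail_on_failure=False,
                                        dry_run=True, suite_report=False,
                                        datastore_creds=None,
                                        export_to_gcloud_path=None))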
    315     """
    316     reparse = parse_options.reparse
    317     mail_on_failure = parse_options.mail_on_failure
    318     dry_run = parse_options.dry_run
    319     suite_report = parse_options.suite_report
    320     datastore_creds = parse_options.datastore_creds
    321     export_to_gcloud_path = parse_options.export_to_gcloud_path
    322 
    323     tko_utils.dprint("\nScanning %s (%s)" % (jobname, path))
    324     old_job_idx = db.find_job(jobname)
    325     # old tests is a dict from tuple (test_name, subdir) to test_idx
    326     old_tests = {}
    327     if old_job_idx is not None:
    328         if not reparse:
    329             tko_utils.dprint("! Job is already parsed, done")
    330             return
    331 
    332         raw_old_tests = db.select("test_idx,subdir,test", "tko_tests",
    333                                   {"job_idx": old_job_idx})
    334         if raw_old_tests:
    335             old_tests = dict(((test, subdir), test_idx)
    336                              for test_idx, subdir, test in raw_old_tests)
    337 
    338     # look up the status version, which selects the parser implementation
    339     job_keyval = models.job.read_keyval(path)
    340     status_version = job_keyval.get("status_version", 0)
    341 
    342     # parse out the job
    343     parser = parser_lib.parser(status_version)
    344     job = parser.make_job(path)
    345     status_log = os.path.join(path, "status.log")
    346     if not os.path.exists(status_log):
    347         status_log = os.path.join(path, "status")
    348     if not os.path.exists(status_log):
    349         tko_utils.dprint("! Unable to parse job, no status file")
    350         return
    351 
    352     # parse the status logs
    353     tko_utils.dprint("+ Parsing dir=%s, jobname=%s" % (path, jobname))
    354     status_lines = open(status_log).readlines()
    355     parser.start(job)
    356     tests = parser.end(status_lines)
    357 
    358     # parser.end can return the same object multiple times, so filter out dups
    359     job.tests = []
    360     already_added = set()
    361     for test in tests:
    362         if test not in already_added:
    363             already_added.add(test)
    364             job.tests.append(test)
    365 
    366     # try and port test_idx over from the old tests, but if old tests stop
    367     # matching up with new ones just give up
    368     if reparse and old_job_idx is not None:
    369         job.index = old_job_idx
    370         for test in job.tests:
    371             test_idx = old_tests.pop((test.testname, test.subdir), None)
    372             if test_idx is not None:
    373                 test.test_idx = test_idx
    374             else:
    375                 tko_utils.dprint("! Reparse returned new test "
    376                                  "testname=%r subdir=%r" %
    377                                  (test.testname, test.subdir))
    378         if not dry_run:
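                    # Anything left in old_tests was not reproduced by the
                    # reparse; purge its dependent iteration/attribute/label
                    # rows first, then the tko_tests row itself.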
    379             for test_idx in old_tests.itervalues():
    380                 where = {'test_idx' : test_idx}
    381                 db.delete('tko_iteration_result', where)
    382                 db.delete('tko_iteration_perf_value', where)
    383                 db.delete('tko_iteration_attributes', where)
    384                 db.delete('tko_test_attributes', where)
    385                 db.delete('tko_test_labels_tests', {'test_id': test_idx})
    386                 db.delete('tko_tests', where)
    387 
    388     job.build = None
    389     job.board = None
    390     job.build_version = None
    391     job.suite = None
    392     if job.label:
    393         label_info = site_utils.parse_job_name(job.label)
    394         if label_info:
    395             job.build = label_info.get('build', None)
    396             job.build_version = label_info.get('build_version', None)
    397             job.board = label_info.get('board', None)
    398             job.suite = label_info.get('suite', None)
    399 
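            # Route result_tools logging through the parser's debug log before
            # throttling the result size.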
    400     result_utils_lib.LOG = tko_utils.dprint
    401     _throttle_result_size(path)
    402 
    403     # Record test result size to job_keyvals
    404     start_time = time.time()
    405     result_size_info = site_utils.collect_result_sizes(
    406             path, log=tko_utils.dprint)
    407     tko_utils.dprint('Finished collecting result sizes after %s seconds' %
    408                      (time.time()-start_time))
    409     job.keyval_dict.update(result_size_info.__dict__)
    410 
    411     # TODO(dshi): Update sizes with sponge_invocation.xml and throttle it.
    412 
    413     # check for failures
    414     message_lines = [""]
    415     job_successful = True
    416     for test in job.tests:
    417         if not test.subdir:
    418             continue
    419         tko_utils.dprint("* testname, subdir, status, reason: %s %s %s %s"
    420                          % (test.testname, test.subdir, test.status,
    421                             test.reason))
    422         if test.status != 'GOOD':
    423             job_successful = False
    424             message_lines.append(format_failure_message(
    425                 jobname, test.kernel.base, test.subdir,
    426                 test.status, test.reason))
    427     try:
    428         message = "\n".join(message_lines)
    429 
    430         if not dry_run:
    431             # send out an email report of the failure
    432             if len(message) > 2 and mail_on_failure:
    433                 tko_utils.dprint("Sending email report of failure on %s to %s"
    434                                  % (jobname, job.user))
    435                 mailfailure(jobname, job, message)
    436 
    437             # Upload perf values to the perf dashboard, if applicable.
    438             for test in job.tests:
    439                 perf_uploader.upload_test(job, test, jobname)
    440 
    441             # Upload job details to Sponge.
    442             sponge_url = sponge_utils.upload_results(job, log=tko_utils.dprint)
    443             if sponge_url:
    444                 job.keyval_dict['sponge_url'] = sponge_url
    445 
    446             # write the job into the database.
    447             job_data = db.insert_job(
    448                 jobname, job,
    449                 parent_job_id=job_keyval.get(constants.PARENT_JOB_ID, None))
    450 
    451             # Verify the job data is written to the database.
    452             if job.tests:
    453                 tests_in_db = db.find_tests(job_data['job_idx'])
    454                 tests_in_db_count = len(tests_in_db) if tests_in_db else 0
    455                 if tests_in_db_count != len(job.tests):
    456                     tko_utils.dprint(
    457                             'Failed to find enough tests for job_idx: %d. The '
    458                             'job should have %d tests, only found %d tests.' %
    459                             (job_data['job_idx'], len(job.tests),
    460                              tests_in_db_count))
    461                     metrics.Counter(
    462                             'chromeos/autotest/result/db_save_failure',
    463                             description='The number of times parse failed to '
    464                             'save job to TKO database.').increment()
    465 
    466             # Although the cursor has autocommit, we still need to force it to
    467             # commit existing changes before we can use django models, otherwise
    468             # it will go into deadlock when django models try to start a new
    469             # transaction while the current one has not finished yet.
    470             db.commit()
    471 
    472             # Handle retry job.
    473             orig_afe_job_id = job_keyval.get(constants.RETRY_ORIGINAL_JOB_ID,
    474                                              None)
    475             if orig_afe_job_id:
    476                 orig_job_idx = tko_models.Job.objects.get(
    477                         afe_job_id=orig_afe_job_id).job_idx
    478                 _invalidate_original_tests(orig_job_idx, job.index)
    479     except Exception:
    480         tko_utils.dprint("Hit exception while uploading to tko db:\n%s" %
    481                          traceback.format_exc())
    482         raise
    483 
    484     # Serializing job into a binary file
    485     export_tko_to_file = global_config.global_config.get_config_value(
    486             'AUTOSERV', 'export_tko_job_to_file', type=bool, default=False)
    487 
    488     binary_file_name = os.path.join(path, "job.serialize")
    489     if export_tko_to_file:
    490         export_tko_job_to_file(job, jobname, binary_file_name)
    491 
    492     if reparse:
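                # Optional site hook: if a site-specific tko.site_export module
                # exists, it post-processes the serialized job; otherwise the
                # no-op _site_export_dummy defined below is used.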
    493         site_export_file = "autotest_lib.tko.site_export"
    494         site_export = utils.import_site_function(__file__,
    495                                                  site_export_file,
    496                                                  "site_export",
    497                                                  _site_export_dummy)
    498         site_export(binary_file_name)
    499 
    500     if not dry_run:
    501         db.commit()
    502 
    503     # Generate a suite report.
    504     # Check whether this is a suite job: a suite job is a hostless job whose
    505     # jobname is <JOB_ID>-<USERNAME>/hostless and whose suite field is not
    506     # NULL. Only generate timeline report when datastore_parent_key is given.
    507     try:
    508         datastore_parent_key = job_keyval.get('datastore_parent_key', None)
    509         if (suite_report and jobname.endswith('/hostless')
    510             and job_data['suite'] and datastore_parent_key):
    511             tko_utils.dprint('Start dumping suite timing report...')
    512             timing_log = os.path.join(path, 'suite_timing.log')
    513             dump_cmd = ("%s/site_utils/dump_suite_report.py %s "
    514                         "--output='%s' --debug" %
    515                         (common.autotest_dir, job_data['afe_job_id'],
    516                          timing_log))
    517             subprocess.check_output(dump_cmd, shell=True)
    518             tko_utils.dprint('Successfully finished dumping suite timing report')
    519 
    520             if (datastore_creds and export_to_gcloud_path
    521                 and os.path.exists(export_to_gcloud_path)):
    522                 upload_cmd = [export_to_gcloud_path, datastore_creds,
    523                               timing_log, '--parent_key',
    524                               datastore_parent_key]
    525                 tko_utils.dprint('Start exporting timeline report to gcloud')
    526                 subprocess.check_output(upload_cmd)
    527                 tko_utils.dprint('Successfully exported timeline report to '
    528                                  'gcloud')
    529             else:
    530                 tko_utils.dprint('DEBUG: skip exporting suite timeline to '
    531                                  'gcloud, because either gcloud creds or '
    532                                  'export_to_gcloud script is not found.')
    533     except Exception as e:
    534         tko_utils.dprint("WARNING: failed to dump/export suite report. "
    535                          "Error:\n%s" % e)
    536 
    537     # Mark GS_OFFLOADER_NO_OFFLOAD in gs_offloader_instructions at the end of
    538     # the function, so that any earlier failure, e.g. a db connection error,
    539     # prevents gs_offloader_instructions from being updated and the logs can
    540     # still be uploaded for troubleshooting.
    541     if job_successful:
    542         # Check if we should not offload this test's results.
    543         if job_keyval.get(constants.JOB_OFFLOAD_FAILURES_KEY, False):
    544             # Update the gs_offloader_instructions json file.
    545             gs_instructions_file = os.path.join(
    546                     path, constants.GS_OFFLOADER_INSTRUCTIONS)
    547             gs_offloader_instructions = {}
    548             if os.path.exists(gs_instructions_file):
    549                 with open(gs_instructions_file, 'r') as f:
    550                     gs_offloader_instructions = json.load(f)
    551 
    552             gs_offloader_instructions[constants.GS_OFFLOADER_NO_OFFLOAD] = True
    553             with open(gs_instructions_file, 'w') as f:
    554                 json.dump(gs_offloader_instructions, f)
    555 
    556 
    557 def _site_export_dummy(binary_file_name):
    558     pass
    559 
    560 
    561 def _get_job_subdirs(path):
    562     """
    563     Returns the set of job subdirectories at path. Returns None if the path
    564     is itself a job directory. Does not recurse into the subdirs.
    565     """
    566     # if there's a .machines file, use it to get the subdirs
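            # (each line of .machines is expected to name one machine
            # subdirectory, e.g. 'host1')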
    567     machine_list = os.path.join(path, ".machines")
    568     if os.path.exists(machine_list):
    569         subdirs = set(line.strip() for line in open(machine_list))
    570         existing_subdirs = set(subdir for subdir in subdirs
    571                                if os.path.exists(os.path.join(path, subdir)))
    572         if len(existing_subdirs) != 0:
    573             return existing_subdirs
    574 
    575     # if this dir contains ONLY subdirectories, return them
    576     contents = set(os.listdir(path))
    577     contents.discard(".parse.lock")
    578     subdirs = set(sub for sub in contents if
    579                   os.path.isdir(os.path.join(path, sub)))
    580     if len(contents) == len(subdirs) != 0:
    581         return subdirs
    582 
    583     # this is a job directory, or something else we don't understand
    584     return None
    585 
    586 
    587 def parse_leaf_path(db, path, level, parse_options):
    588     """Parse a leaf path.
    589 
    590     @param db: database handle.
    591     @param path: The path to the results to be parsed.
    592     @param level: Integer, level of subdirectories to include in the job name.
    593     @param parse_options: _ParseOptions instance.
    594 
    595     @returns: The job name of the parsed job, e.g. '123-chromeos-test/host1'
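            For example, path='.../results/123-chromeos-test/host1' parsed with
            level=2 yields jobname '123-chromeos-test/host1' (the last two path
            components).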
    596     """
    597     job_elements = path.split("/")[-level:]
    598     jobname = "/".join(job_elements)
    599     try:
    600         db.run_with_retry(parse_one, db, jobname, path, parse_options)
    601     except Exception as e:
    602         tko_utils.dprint("Error parsing leaf path: %s\nException:\n%s\n%s" %
    603                          (path, e, traceback.format_exc()))
    604     return jobname
    605 
    606 
    607 def parse_path(db, path, level, parse_options):
    608     """Parse a path.
    609 
    610     @param db: database handle.
    611     @param path: The path to the results to be parsed.
    612     @param level: Integer, level of subdirectories to include in the job name.
    613     @param parse_options: _ParseOptions instance.
    614 
    615     @returns: A set of job names of the parsed jobs.
    616               set(['123-chromeos-test/host1', '123-chromeos-test/host2'])
    617     """
    618     processed_jobs = set()
    619     job_subdirs = _get_job_subdirs(path)
    620     if job_subdirs is not None:
    621         # parse status.log in current directory, if it exists. multi-machine
    622         # synchronous server side tests record output in this directory. without
    623         # this check, we do not parse these results.
    624         if os.path.exists(os.path.join(path, 'status.log')):
    625             new_job = parse_leaf_path(db, path, level, parse_options)
    626             processed_jobs.add(new_job)
    627         # multi-machine job
    628         for subdir in job_subdirs:
    629             jobpath = os.path.join(path, subdir)
    630             new_jobs = parse_path(db, jobpath, level + 1, parse_options)
    631             processed_jobs.update(new_jobs)
    632     else:
    633         # single machine job
    634         new_job = parse_leaf_path(db, path, level, parse_options)
    635         processed_jobs.add(new_job)
    636     return processed_jobs
    637 
    638 
    639 def record_parsing(processed_jobs, duration_secs):
    640     """Record the time spent on parsing to metadata db.
    641 
    642     @param processed_jobs: A set of job names of the parsed jobs.
    643               set(['123-chromeos-test/host1', '123-chromeos-test/host2'])
    644     @param duration_secs: Total time spent on parsing, in seconds.
    645     """
    646 
    647     for job_name in processed_jobs:
    648         job_id, hostname = tko_utils.get_afe_job_id_and_hostname(job_name)
    649         if not job_id or not hostname:
    650             tko_utils.dprint('ERROR: cannot parse job name %s, '
    651                              'will not send duration to metadata db.'
    652                              % job_name)
    653             continue
    654         else:
    655             job_overhead.record_state_duration(
    656                     job_id, hostname, job_overhead.STATUS.PARSING,
    657                     duration_secs)
    658 
    659 def _detach_from_parent_process():
    660     """Allow reparenting the parse process away from caller.
    661 
    662     When monitor_db is run via upstart, restarting the job sends SIGTERM to
    663     the whole process group. This makes us immune to that.
    664     """
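            # os.setsid() moves this process into a new session (and process
            # group), so signals sent to the caller's process group no longer
            # reach it. Skip the call if we are already a process-group
            # leader, where setsid() would fail with EPERM.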
    665     if os.getpid() != os.getpgid(0):
    666         os.setsid()
    667 
    668 def main():
    669     """Main entry point."""
    670     start_time = datetime.datetime.now()
    671     # Record the processed jobs so that
    672     # we can send the duration of parsing to metadata db.
    673     processed_jobs = set()
    674 
    675     options, args = parse_args()
    676 
    677     if options.detach:
    678         _detach_from_parent_process()
    679 
    680     parse_options = _ParseOptions(options.reparse, options.mailit,
    681                                   options.dry_run, options.suite_report,
    682                                   options.datastore_creds,
    683                                   options.export_to_gcloud_path)
    684     results_dir = os.path.abspath(args[0])
    685     assert os.path.exists(results_dir)
    686 
    687     site_utils.SetupTsMonGlobalState('tko_parse', indirect=False,
    688                                      short_lived=True)
    689 
    690     pid_file_manager = pidfile.PidFileManager("parser", results_dir)
    691 
    692     if options.write_pidfile:
    693         pid_file_manager.open_file()
    694 
    695     try:
    696         # build up the list of job dirs to parse
    697         if options.singledir:
    698             jobs_list = [results_dir]
    699         else:
    700             jobs_list = [os.path.join(results_dir, subdir)
    701                          for subdir in os.listdir(results_dir)]
    702 
    703         # build up the database
    704         db = tko_db.db(autocommit=False, host=options.db_host,
    705                        user=options.db_user, password=options.db_pass,
    706                        database=options.db_name)
    707 
    708         # parse all the jobs
    709         for path in jobs_list:
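                    # Serialize concurrent parse runs on the same results
                    # directory with an exclusive flock on its .parse.lock file.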
    710             lockfile = open(os.path.join(path, ".parse.lock"), "w")
    711             flags = fcntl.LOCK_EX
    712             if options.noblock:
    713                 flags |= fcntl.LOCK_NB
    714             try:
    715                 fcntl.flock(lockfile, flags)
    716             except IOError as e:
    717                 # lock is not available and nonblock has been requested
    718                 if e.errno == errno.EWOULDBLOCK:
    719                     lockfile.close()
    720                     continue
    721                 else:
    722                     raise # something unexpected happened
    723             try:
    724                 new_jobs = parse_path(db, path, options.level, parse_options)
    725                 processed_jobs.update(new_jobs)
    726 
    727             finally:
    728                 fcntl.flock(lockfile, fcntl.LOCK_UN)
    729                 lockfile.close()
    730 
    731     except Exception:
    732         pid_file_manager.close_file(1)
    733         raise
    734     else:
    735         pid_file_manager.close_file(0)
    736     finally:
    737         metrics.Flush()
    738     duration_secs = (datetime.datetime.now() - start_time).total_seconds()
    739     if options.record_duration:
    740         record_parsing(processed_jobs, duration_secs)
    741 
    742 
    743 if __name__ == "__main__":
    744     main()
    745