#!/usr/bin/python -u

import collections
import datetime
import errno
import fcntl
import json
import optparse
import os
import socket
import subprocess
import sys
import time
import traceback

import common
from autotest_lib.client.bin.result_tools import utils as result_utils
from autotest_lib.client.bin.result_tools import utils_lib as result_utils_lib
from autotest_lib.client.bin.result_tools import runner as result_runner
from autotest_lib.client.common_lib import control_data
from autotest_lib.client.common_lib import global_config
from autotest_lib.client.common_lib import mail, pidfile
from autotest_lib.client.common_lib import utils
# Imported for its side effect of configuring Django before the tko models
# below are loaded.
from autotest_lib.frontend import setup_django_environment
from autotest_lib.frontend.tko import models as tko_models
from autotest_lib.server import site_utils
from autotest_lib.server.cros.dynamic_suite import constants
from autotest_lib.site_utils import job_overhead
from autotest_lib.site_utils.sponge_lib import sponge_utils
from autotest_lib.tko import db as tko_db, utils as tko_utils
from autotest_lib.tko import models, parser_lib
from autotest_lib.tko.perf_upload import perf_uploader

try:
    from chromite.lib import metrics
except ImportError:
    metrics = utils.metrics_mock


_ParseOptions = collections.namedtuple(
    'ParseOptions', ['reparse', 'mail_on_failure', 'dry_run', 'suite_report',
                     'datastore_creds', 'export_to_gcloud_path'])

def parse_args():
    """Parse args."""
    # build up our options parser and parse sys.argv
    parser = optparse.OptionParser()
    parser.add_option("-m", help="Send mail for FAILED tests",
                      dest="mailit", action="store_true")
    parser.add_option("-r", help="Reparse the results of a job",
                      dest="reparse", action="store_true")
    parser.add_option("-o", help="Parse a single results directory",
                      dest="singledir", action="store_true")
    parser.add_option("-l", help=("Levels of subdirectories to include "
                                  "in the job name"),
                      type="int", dest="level", default=1)
    parser.add_option("-n", help="No blocking on an existing parse",
                      dest="noblock", action="store_true")
    parser.add_option("-s", help="Database server hostname",
                      dest="db_host", action="store")
    parser.add_option("-u", help="Database username", dest="db_user",
                      action="store")
    parser.add_option("-p", help="Database password", dest="db_pass",
                      action="store")
    parser.add_option("-d", help="Database name", dest="db_name",
                      action="store")
    parser.add_option("--dry-run", help="Do not actually commit any results.",
                      dest="dry_run", action="store_true", default=False)
    parser.add_option(
            "--detach", action="store_true",
            help="Detach parsing process from the caller process. Used by "
                 "monitor_db to safely restart without affecting parsing.",
            default=False)
    parser.add_option("--write-pidfile",
                      help="write pidfile (.parser_execute)",
                      dest="write_pidfile", action="store_true",
                      default=False)
    parser.add_option("--record-duration",
                      help="Record timing to metadata db",
                      dest="record_duration", action="store_true",
                      default=False)
    parser.add_option("--suite-report",
                      help=("Allows parsing job to attempt to create a suite "
                            "timeline report, if it detects that the job being "
                            "parsed is a suite job."),
                      dest="suite_report", action="store_true",
                      default=False)
    parser.add_option("--datastore-creds",
                      help=("The path to gcloud datastore credentials file, "
                            "which will be used to upload suite timeline "
                            "report to gcloud. If not specified, the one "
                            "defined in shadow_config will be used."),
                      dest="datastore_creds", action="store", default=None)
    parser.add_option("--export-to-gcloud-path",
                      help=("The path to export_to_gcloud script. Please find "
                            "chromite path on your server. The script is under "
                            "chromite/bin/."),
                      dest="export_to_gcloud_path", action="store",
                      default=None)
    options, args = parser.parse_args()

    # we need a results directory
    if len(args) == 0:
        tko_utils.dprint("ERROR: at least one results directory must "
                         "be provided")
        parser.print_help()
        sys.exit(1)

    if not options.datastore_creds:
        gcloud_creds = global_config.global_config.get_config_value(
                'GCLOUD', 'cidb_datastore_writer_creds', default=None)
        options.datastore_creds = (site_utils.get_creds_abspath(gcloud_creds)
                                   if gcloud_creds else None)

    if not options.export_to_gcloud_path:
        export_script = 'chromiumos/chromite/bin/export_to_gcloud'
        # If it is a lab server, the script is under ~chromeos-test/
        if os.path.exists(os.path.expanduser('~chromeos-test/%s' %
                                             export_script)):
            path = os.path.expanduser('~chromeos-test/%s' % export_script)
        # If it is a local workstation, it is probably under ~/
        elif os.path.exists(os.path.expanduser('~/%s' % export_script)):
            path = os.path.expanduser('~/%s' % export_script)
        # If it is not found anywhere, the default will be set to None.
        else:
            path = None
        options.export_to_gcloud_path = path

    # pass the options back
    return options, args


def format_failure_message(jobname, kernel, testname, status, reason):
    """Format failure message with the given information.

    @param jobname: String representing the job name.
    @param kernel: String representing the kernel.
    @param testname: String representing the test name.
    @param status: String representing the test status.
    @param reason: String representing the reason.

    @return: Failure message as a string.
    """
    format_string = "%-12s %-20s %-12s %-10s %s"
    return format_string % (jobname, kernel, testname, status, reason)
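

# Illustrative only (hypothetical values): format_failure_message() above
# emits fixed-width columns -- job name, kernel, test name, status, reason --
# so a failure row looks roughly like
#
#   123-foo/host1 3.8.11               dummy_Fail   FAIL       Test failed
#
# mailfailure() below stacks one such row per failed test beneath a header
# row built with the same helper.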


def mailfailure(jobname, job, message):
    """Send an email about the failure.

    @param jobname: String representing the job name.
    @param job: A job object.
    @param message: The message to mail.
    """
    message_lines = [""]
    message_lines.append("The following tests FAILED for this job")
    message_lines.append("http://%s/results/%s" %
                         (socket.gethostname(), jobname))
    message_lines.append("")
    message_lines.append(format_failure_message("Job name", "Kernel",
                                                "Test name", "FAIL/WARN",
                                                "Failure reason"))
    message_lines.append(format_failure_message("=" * 8, "=" * 6, "=" * 8,
                                                "=" * 8, "=" * 14))
    message_header = "\n".join(message_lines)

    subject = "AUTOTEST: FAILED tests from job %s" % jobname
    mail.send("", job.user, "", subject, message_header + message)


def _invalidate_original_tests(orig_job_idx, retry_job_idx):
    """Retry tests invalidate original tests.

    Whenever a retry job is complete, we want to invalidate the original
    job's test results, such that the consumers of the tko database
    (e.g. tko frontend, wmatrix) can figure out which results are the latest.

    When a retry job is parsed, we retrieve the original job's afe_job_id
    from the retry job's keyvals, which is then converted to tko job_idx and
    passed into this method as |orig_job_idx|.

    In this method, we are going to invalidate the rows in tko_tests that are
    associated with the original job by flipping their 'invalid' bit to True.
    In addition, in tko_tests, we also maintain a pointer from the retry results
    to the original results, so that later we can always know which rows in
    tko_tests are retries and which are the corresponding original results.
    This is done by setting the field 'invalidates_test_idx' of the tests
    associated with the retry job.

    For example, assume Job(job_idx=105) is retried by Job(job_idx=108); after
    this method is run, their tko_tests rows will look like:
    __________________________________________________________________________
    test_idx | job_idx | test             | ... | invalid | invalidates_test_idx
    10       | 105     | dummy_Fail.Error | ... | 1       | NULL
    11       | 105     | dummy_Fail.Fail  | ... | 1       | NULL
    ...
    20       | 108     | dummy_Fail.Error | ... | 0       | 10
    21       | 108     | dummy_Fail.Fail  | ... | 0       | 11
    __________________________________________________________________________
    Note that the invalid bits of the rows for Job(job_idx=105) are set to '1',
    and the 'invalidates_test_idx' fields of the rows for Job(job_idx=108)
    are set to 10 and 11 (the test_idx of the rows for the original job).

    @param orig_job_idx: An integer representing the original job's
                         tko job_idx. Tests associated with this job will
                         be marked as 'invalid'.
    @param retry_job_idx: An integer representing the retry job's
                          tko job_idx. The field 'invalidates_test_idx'
                          of the tests associated with this job will be
                          updated.

    """
    msg = 'orig_job_idx: %s, retry_job_idx: %s' % (orig_job_idx, retry_job_idx)
    if not orig_job_idx or not retry_job_idx:
        tko_utils.dprint('ERROR: Could not invalidate tests: ' + msg)
    # Using django models here makes things easier, but make sure that
    # before this method is called, all other relevant transactions have been
    # committed to avoid race conditions. In the long run, we might consider
    # making the rest of the parser use django models.
    orig_tests = tko_models.Test.objects.filter(job__job_idx=orig_job_idx)
    retry_tests = tko_models.Test.objects.filter(job__job_idx=retry_job_idx)

    # Invalidate original tests.
    orig_tests.update(invalid=True)

    # Maintain a dictionary that maps (test, subdir) to original tests.
    # Note that within the scope of a job, (test, subdir) uniquely
    # identifies a test run, but 'test' alone does not.
    # In a control file, one could run the same test with different
    # 'subdir_tag' values, for example,
    #   job.run_test('dummy_Fail', tag='Error', subdir_tag='subdir_1')
    #   job.run_test('dummy_Fail', tag='Error', subdir_tag='subdir_2')
    # In tko, we will get
    #   (test='dummy_Fail.Error', subdir='dummy_Fail.Error.subdir_1')
    #   (test='dummy_Fail.Error', subdir='dummy_Fail.Error.subdir_2')
    invalidated_tests = {(orig_test.test, orig_test.subdir): orig_test
                         for orig_test in orig_tests}
    for retry in retry_tests:
        # It is possible that (retry.test, retry.subdir) doesn't exist
        # in invalidated_tests. This could happen when the original job
        # didn't run some of its tests. For example, a DUT went offline
        # at the beginning of the job, in which case invalidated_tests
        # will only have one entry for 'SERVER_JOB'.
        orig_test = invalidated_tests.get((retry.test, retry.subdir), None)
        if orig_test:
            retry.invalidates_test = orig_test
            retry.save()
    tko_utils.dprint('DEBUG: Invalidated tests associated with job: ' + msg)


def _throttle_result_size(path):
    """Limit the total size of test results for the given path.

    @param path: Path of the result directory.
    """
    if not result_runner.ENABLE_RESULT_THROTTLING:
        tko_utils.dprint(
                'Result throttling is not enabled. Skipping throttling %s' %
                path)
        return

    max_result_size_KB = control_data.DEFAULT_MAX_RESULT_SIZE_KB
    # A client-side test saves its test control file as `control`, while a
    # server-side test saves it as `control.srv`.
    for control_file in ['control', 'control.srv']:
        control = os.path.join(path, control_file)
        try:
            max_result_size_KB = control_data.parse_control(
                    control, raise_warnings=False).max_result_size_KB
            # Any value different from the default is considered to be the one
            # set in the test control file.
            if max_result_size_KB != control_data.DEFAULT_MAX_RESULT_SIZE_KB:
                break
        except IOError as e:
            tko_utils.dprint(
                    'Failed to access %s. Error: %s\nDetails %s' %
                    (control, e, traceback.format_exc()))
        except control_data.ControlVariableException as e:
            tko_utils.dprint(
                    'Failed to parse %s. Error: %s\nDetails %s' %
                    (control, e, traceback.format_exc()))

    try:
        result_utils.execute(path, max_result_size_KB)
    except:
        tko_utils.dprint(
                'Failed to throttle result size of %s.\nDetails %s' %
                (path, traceback.format_exc()))


def export_tko_job_to_file(job, jobname, filename):
    """Exports the tko job to a file on disk.

    @param job: database object.
    @param jobname: the job name as string.
    @param filename: The path of the file to write the serialized job to.
    """
    try:
        from autotest_lib.tko import job_serializer

        serializer = job_serializer.JobSerializer()
        serializer.serialize_to_binary(job, jobname, filename)
    except ImportError:
        tko_utils.dprint("WARNING: tko_pb2.py doesn't exist. Create it by "
                         "compiling tko/tko.proto.")


def parse_one(db, jobname, path, parse_options):
    """Parse a single job. Optionally send email on failure.

    @param db: database object.
    @param jobname: the tag used to search for existing job in db,
                    e.g. '1234-chromeos-test/host1'
    @param path: The path to the results to be parsed.
    @param parse_options: _ParseOptions instance.
    """
    reparse = parse_options.reparse
    mail_on_failure = parse_options.mail_on_failure
    dry_run = parse_options.dry_run
    suite_report = parse_options.suite_report
    datastore_creds = parse_options.datastore_creds
    export_to_gcloud_path = parse_options.export_to_gcloud_path

    tko_utils.dprint("\nScanning %s (%s)" % (jobname, path))
    old_job_idx = db.find_job(jobname)
    # old_tests is a dict from tuple (test_name, subdir) to test_idx
    old_tests = {}
    if old_job_idx is not None:
        if not reparse:
            tko_utils.dprint("! Job is already parsed, done")
            return

        raw_old_tests = db.select("test_idx,subdir,test", "tko_tests",
                                  {"job_idx": old_job_idx})
        if raw_old_tests:
            old_tests = dict(((test, subdir), test_idx)
                             for test_idx, subdir, test in raw_old_tests)

    # look up the status version
    job_keyval = models.job.read_keyval(path)
    status_version = job_keyval.get("status_version", 0)

    # parse out the job
    parser = parser_lib.parser(status_version)
    job = parser.make_job(path)
    status_log = os.path.join(path, "status.log")
    if not os.path.exists(status_log):
        status_log = os.path.join(path, "status")
    if not os.path.exists(status_log):
        tko_utils.dprint("! Unable to parse job, no status file")
        return

    # parse the status logs
    tko_utils.dprint("+ Parsing dir=%s, jobname=%s" % (path, jobname))
    status_lines = open(status_log).readlines()
    parser.start(job)
    tests = parser.end(status_lines)

    # parser.end can return the same object multiple times, so filter out dups
    job.tests = []
    already_added = set()
    for test in tests:
        if test not in already_added:
            already_added.add(test)
            job.tests.append(test)

    # try and port test_idx over from the old tests, but if old tests stop
    # matching up with new ones just give up
    if reparse and old_job_idx is not None:
        job.index = old_job_idx
        for test in job.tests:
            test_idx = old_tests.pop((test.testname, test.subdir), None)
            if test_idx is not None:
                test.test_idx = test_idx
            else:
                tko_utils.dprint("! Reparse returned new test "
                                 "testname=%r subdir=%r" %
                                 (test.testname, test.subdir))
        if not dry_run:
            for test_idx in old_tests.itervalues():
                where = {'test_idx': test_idx}
                db.delete('tko_iteration_result', where)
                db.delete('tko_iteration_perf_value', where)
                db.delete('tko_iteration_attributes', where)
                db.delete('tko_test_attributes', where)
                db.delete('tko_test_labels_tests', {'test_id': test_idx})
                db.delete('tko_tests', where)

    job.build = None
    job.board = None
    job.build_version = None
    job.suite = None
    if job.label:
        label_info = site_utils.parse_job_name(job.label)
        if label_info:
            job.build = label_info.get('build', None)
            job.build_version = label_info.get('build_version', None)
            job.board = label_info.get('board', None)
            job.suite = label_info.get('suite', None)

    result_utils_lib.LOG = tko_utils.dprint
    _throttle_result_size(path)

    # Record test result size to job_keyvals
    start_time = time.time()
    result_size_info = site_utils.collect_result_sizes(
            path, log=tko_utils.dprint)
    tko_utils.dprint('Finished collecting result sizes after %s seconds' %
                     (time.time() - start_time))
    job.keyval_dict.update(result_size_info.__dict__)

    # TODO(dshi): Update sizes with sponge_invocation.xml and throttle it.

    # check for failures
    message_lines = [""]
    job_successful = True
    for test in job.tests:
        if not test.subdir:
            continue
        tko_utils.dprint("* testname, subdir, status, reason: %s %s %s %s"
                         % (test.testname, test.subdir, test.status,
                            test.reason))
        if test.status != 'GOOD':
            job_successful = False
            message_lines.append(format_failure_message(
                    jobname, test.kernel.base, test.subdir,
                    test.status, test.reason))
    try:
        message = "\n".join(message_lines)

        if not dry_run:
            # send out an email report of failure
            if len(message) > 2 and mail_on_failure:
                tko_utils.dprint("Sending email report of failure on %s to %s"
                                 % (jobname, job.user))
                mailfailure(jobname, job, message)

            # Upload perf values to the perf dashboard, if applicable.
            for test in job.tests:
                perf_uploader.upload_test(job, test, jobname)

            # Upload job details to Sponge.
            sponge_url = sponge_utils.upload_results(job, log=tko_utils.dprint)
            if sponge_url:
                job.keyval_dict['sponge_url'] = sponge_url

            # write the job into the database.
            job_data = db.insert_job(
                    jobname, job,
                    parent_job_id=job_keyval.get(constants.PARENT_JOB_ID, None))

            # Verify the job data is written to the database.
            if job.tests:
                tests_in_db = db.find_tests(job_data['job_idx'])
                tests_in_db_count = len(tests_in_db) if tests_in_db else 0
                if tests_in_db_count != len(job.tests):
                    tko_utils.dprint(
                            'Failed to find enough tests for job_idx: %d. The '
                            'job should have %d tests, only found %d tests.' %
                            (job_data['job_idx'], len(job.tests),
                             tests_in_db_count))
                    metrics.Counter(
                            'chromeos/autotest/result/db_save_failure',
                            description='The number of times parse failed to '
                                        'save job to TKO database.').increment()

            # Although the cursor has autocommit, we still need to force it to
            # commit existing changes before we can use django models, otherwise
            # it will deadlock when django models try to start a new
            # transaction while the current one has not finished yet.
            db.commit()

            # Handle retry job.
            orig_afe_job_id = job_keyval.get(constants.RETRY_ORIGINAL_JOB_ID,
                                             None)
            if orig_afe_job_id:
                orig_job_idx = tko_models.Job.objects.get(
                        afe_job_id=orig_afe_job_id).job_idx
                _invalidate_original_tests(orig_job_idx, job.index)
    except Exception as e:
        tko_utils.dprint("Hit exception while uploading to tko db:\n%s" %
                         traceback.format_exc())
        raise e

    # Serialize the job into a binary file
    export_tko_to_file = global_config.global_config.get_config_value(
            'AUTOSERV', 'export_tko_job_to_file', type=bool, default=False)

    binary_file_name = os.path.join(path, "job.serialize")
    if export_tko_to_file:
        export_tko_job_to_file(job, jobname, binary_file_name)

    if reparse:
        site_export_file = "autotest_lib.tko.site_export"
        site_export = utils.import_site_function(__file__,
                                                 site_export_file,
                                                 "site_export",
                                                 _site_export_dummy)
        site_export(binary_file_name)

    if not dry_run:
        db.commit()

    # Generate a suite report.
    # Check whether this is a suite job. A suite job is a hostless job whose
    # jobname is <JOB_ID>-<USERNAME>/hostless and whose suite field is not
    # NULL. Only generate the timeline report when datastore_parent_key is
    # given.
    try:
        datastore_parent_key = job_keyval.get('datastore_parent_key', None)
        if (suite_report and jobname.endswith('/hostless')
            and job_data['suite'] and datastore_parent_key):
            tko_utils.dprint('Start dumping suite timing report...')
            timing_log = os.path.join(path, 'suite_timing.log')
            dump_cmd = ("%s/site_utils/dump_suite_report.py %s "
                        "--output='%s' --debug" %
                        (common.autotest_dir, job_data['afe_job_id'],
                         timing_log))
            subprocess.check_output(dump_cmd, shell=True)
            tko_utils.dprint('Finished dumping suite timing report')

            if (datastore_creds and export_to_gcloud_path
                and os.path.exists(export_to_gcloud_path)):
                upload_cmd = [export_to_gcloud_path, datastore_creds,
                              timing_log, '--parent_key',
                              datastore_parent_key]
                tko_utils.dprint('Start exporting timeline report to gcloud')
                subprocess.check_output(upload_cmd)
                tko_utils.dprint('Successfully exported timeline report to '
                                 'gcloud')
            else:
                tko_utils.dprint('DEBUG: skip exporting suite timeline to '
                                 'gcloud, because either gcloud creds or '
                                 'export_to_gcloud script is not found.')
    except Exception as e:
        tko_utils.dprint("WARNING: failed to dump/export suite report. "
                         "Error:\n%s" % e)

    # Mark GS_OFFLOADER_NO_OFFLOAD in gs_offloader_instructions at the end of
    # the function, so any failure, e.g., db connection error, will stop
    # gs_offloader_instructions from being updated, and logs can be uploaded
    # for troubleshooting.
    if job_successful:
        # Check if we should not offload this test's results.
        if job_keyval.get(constants.JOB_OFFLOAD_FAILURES_KEY, False):
            # Update the gs_offloader_instructions json file.
            gs_instructions_file = os.path.join(
                    path, constants.GS_OFFLOADER_INSTRUCTIONS)
            gs_offloader_instructions = {}
            if os.path.exists(gs_instructions_file):
                with open(gs_instructions_file, 'r') as f:
                    gs_offloader_instructions = json.load(f)

            gs_offloader_instructions[constants.GS_OFFLOADER_NO_OFFLOAD] = True
            with open(gs_instructions_file, 'w') as f:
                json.dump(gs_offloader_instructions, f)


def _site_export_dummy(binary_file_name):
    pass


def _get_job_subdirs(path):
    """
    Returns a list of job subdirectories at path. Returns None if the path
    is itself a job directory. Does not recurse into the subdirs.
    """
    # if there's a .machines file, use it to get the subdirs
    machine_list = os.path.join(path, ".machines")
    if os.path.exists(machine_list):
        subdirs = set(line.strip() for line in open(machine_list))
        existing_subdirs = set(subdir for subdir in subdirs
                               if os.path.exists(os.path.join(path, subdir)))
        if len(existing_subdirs) != 0:
            return existing_subdirs

    # if this dir contains ONLY subdirectories, return them
    contents = set(os.listdir(path))
    contents.discard(".parse.lock")
    subdirs = set(sub for sub in contents if
                  os.path.isdir(os.path.join(path, sub)))
    if len(contents) == len(subdirs) != 0:
        return subdirs

    # this is a job directory, or something else we don't understand
    return None


def parse_leaf_path(db, path, level, parse_options):
    """Parse a leaf path.

    @param db: database handle.
    @param path: The path to the results to be parsed.
    @param level: Integer, level of subdirectories to include in the job name.
    @param parse_options: _ParseOptions instance.

    @returns: The job name of the parsed job, e.g. '123-chromeos-test/host1'
    """
    job_elements = path.split("/")[-level:]
    jobname = "/".join(job_elements)
    try:
        db.run_with_retry(parse_one, db, jobname, path, parse_options)
    except Exception as e:
        tko_utils.dprint("Error parsing leaf path: %s\nException:\n%s\n%s" %
                         (path, e, traceback.format_exc()))
    return jobname


def parse_path(db, path, level, parse_options):
    """Parse a path.

    @param db: database handle.
    @param path: The path to the results to be parsed.
    @param level: Integer, level of subdirectories to include in the job name.
    @param parse_options: _ParseOptions instance.

    @returns: A set of job names of the parsed jobs.
              set(['123-chromeos-test/host1', '123-chromeos-test/host2'])
    """
    processed_jobs = set()
    job_subdirs = _get_job_subdirs(path)
    if job_subdirs is not None:
        # parse status.log in the current directory, if it exists. multi-machine
        # synchronous server-side tests record output in this directory. without
        # this check, we would not parse these results.
        if os.path.exists(os.path.join(path, 'status.log')):
            new_job = parse_leaf_path(db, path, level, parse_options)
            processed_jobs.add(new_job)
        # multi-machine job
        for subdir in job_subdirs:
            jobpath = os.path.join(path, subdir)
            new_jobs = parse_path(db, jobpath, level + 1, parse_options)
            processed_jobs.update(new_jobs)
    else:
        # single machine job
        new_job = parse_leaf_path(db, path, level, parse_options)
        processed_jobs.add(new_job)
    return processed_jobs
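

# Illustrative only (hypothetical layout): a multi-machine results directory
# that _get_job_subdirs()/parse_path() recurse into looks roughly like
#
#   123-chromeos-test/
#       .machines            # one machine subdir name per line
#       host1/status.log
#       host2/status.log
#
# whereas a single-machine job directory keeps status.log at its top level and
# is handed straight to parse_leaf_path().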


def record_parsing(processed_jobs, duration_secs):
    """Record the time spent on parsing to the metadata db.

    @param processed_jobs: A set of job names of the parsed jobs.
                           set(['123-chromeos-test/host1',
                                '123-chromeos-test/host2'])
    @param duration_secs: Total time spent on parsing, in seconds.
    """

    for job_name in processed_jobs:
        job_id, hostname = tko_utils.get_afe_job_id_and_hostname(job_name)
        if not job_id or not hostname:
            tko_utils.dprint('ERROR: cannot parse job name %s, '
                             'will not send duration to metadata db.'
                             % job_name)
            continue
        else:
            job_overhead.record_state_duration(
                    job_id, hostname, job_overhead.STATUS.PARSING,
                    duration_secs)

def _detach_from_parent_process():
    """Allow reparenting the parse process away from the caller.

    When monitor_db is run via upstart, restarting the job sends SIGTERM to
    the whole process group. This makes us immune from that.
    """
    if os.getpid() != os.getpgid(0):
        os.setsid()

def main():
    """Main entry point."""
    start_time = datetime.datetime.now()
    # Record the processed jobs so that
    # we can send the duration of parsing to metadata db.
    processed_jobs = set()

    options, args = parse_args()

    if options.detach:
        _detach_from_parent_process()

    parse_options = _ParseOptions(options.reparse, options.mailit,
                                  options.dry_run, options.suite_report,
                                  options.datastore_creds,
                                  options.export_to_gcloud_path)
    results_dir = os.path.abspath(args[0])
    assert os.path.exists(results_dir)

    site_utils.SetupTsMonGlobalState('tko_parse', indirect=False,
                                     short_lived=True)

    pid_file_manager = pidfile.PidFileManager("parser", results_dir)

    if options.write_pidfile:
        pid_file_manager.open_file()

    try:
        # build up the list of job dirs to parse
        if options.singledir:
            jobs_list = [results_dir]
        else:
            jobs_list = [os.path.join(results_dir, subdir)
                         for subdir in os.listdir(results_dir)]

        # build up the database
        db = tko_db.db(autocommit=False, host=options.db_host,
                       user=options.db_user, password=options.db_pass,
                       database=options.db_name)

        # parse all the jobs
        for path in jobs_list:
            lockfile = open(os.path.join(path, ".parse.lock"), "w")
            flags = fcntl.LOCK_EX
            if options.noblock:
                flags |= fcntl.LOCK_NB
            try:
                fcntl.flock(lockfile, flags)
            except IOError as e:
                # lock is not available and nonblocking mode was requested
                if e.errno == errno.EWOULDBLOCK:
                    lockfile.close()
                    continue
                else:
                    raise  # something unexpected happened
            try:
                new_jobs = parse_path(db, path, options.level, parse_options)
                processed_jobs.update(new_jobs)

            finally:
                fcntl.flock(lockfile, fcntl.LOCK_UN)
                lockfile.close()

    except Exception as e:
        pid_file_manager.close_file(1)
        raise
    else:
        pid_file_manager.close_file(0)
    finally:
        metrics.Flush()
        duration_secs = (datetime.datetime.now() - start_time).total_seconds()
        if options.record_duration:
            record_parsing(processed_jobs, duration_secs)


if __name__ == "__main__":
    main()
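

# Example invocation (hypothetical path; flags as defined in parse_args()
# above, assuming this script is installed as tko/parse.py):
#
#   tko/parse.py -r -l 2 -o /usr/local/autotest/results/123-chromeos-test/host1
#
# This reparses (-r) just that results directory (-o), using the last two
# path components (-l 2) as the job name, i.e. '123-chromeos-test/host1'.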