#!/usr/bin/env python2

# Copyright 2017 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""Load generator for devserver.

Example usage:

# Find DUTs in suites pool to test with:
atest host list -b 'pool:suites,board:BOARD' --unlocked -s Ready

# Lock DUTs:
atest host mod -l -r 'quick provision testing' DUT1 DUT2

# Create config file with DUTs to test and builds to use.
cat >config.json <<EOD
{
  "BOARD": {
    "duts": [
      "chromeosX-rowY-rackZ-hostA",
      "chromeosX-rowY-rackZ-hostB"
    ],
    "versions": [
      "auron_paine-paladin/R65-10208.0.0-rc2",
      "auron_paine-paladin/R65-10208.0.0-rc3",
      "auron_paine-paladin/R65-10209.0.0-rc1"
    ]
  }
}
EOD

# Do 100 total provisions, aiming to have 10 active simultaneously.
loadtest.py $DS config.json --simultaneous 10 --total 100

# Unlock DUTs:
atest host mod -u DUT1 DUT2
"""

import collections
import datetime
import json
import os
import random
import re
import signal
import subprocess
import sys
import time

import common
from autotest_lib.client.common_lib import time_utils
from autotest_lib.client.common_lib.cros import dev_server
from chromite.lib import commandline
from chromite.lib import cros_logging as logging
from chromite.lib import locking
from chromite.lib import parallel

# Payloads to stage.
PAYLOADS = ['quick_provision', 'stateful']

# Number of seconds between full status checks.
STATUS_POLL_SECONDS = 2

# Number of successes/failures to blacklist a DUT.  A value of 0 disables the
# corresponding check.
BLACKLIST_CONSECUTIVE_FAILURE = 2
BLACKLIST_TOTAL_SUCCESS = 0
BLACKLIST_TOTAL_FAILURE = 5


def get_parser():
    """Creates the argparse parser.

    @return A commandline.ArgumentParser configured with all of the options
            understood by main().
    """
    parser = commandline.ArgumentParser(description=__doc__)
    parser.add_argument('server', type=str, action='store',
                        help='Devserver to load test.')
    parser.add_argument('config', type=str, action='store',
                        help='Path to JSON config file. '
                             'Config file is indexed by board with keys of '
                             '"duts" and "versions", each a list.')
    parser.add_argument('--blacklist-consecutive', '-C', type=int,
                        action='store',
                        help=('Consecutive number of failures before '
                              'blacklisting DUT (default %d, 0 disables).') %
                        BLACKLIST_CONSECUTIVE_FAILURE,
                        default=BLACKLIST_CONSECUTIVE_FAILURE)
    parser.add_argument('--blacklist-success', '-S', type=int, action='store',
                        help=('Total number of successes before blacklisting '
                              'DUT (default %d, 0 disables).') %
                        BLACKLIST_TOTAL_SUCCESS,
                        default=BLACKLIST_TOTAL_SUCCESS)
    parser.add_argument('--blacklist-total', '-T', type=int, action='store',
                        help=('Total number of failures before blacklisting '
                              'DUT (default %d, 0 disables).') %
                        BLACKLIST_TOTAL_FAILURE,
                        default=BLACKLIST_TOTAL_FAILURE)
    parser.add_argument('--boards', '-b', type=str, action='store',
                        help='Comma-separated list of boards to provision.')
    parser.add_argument('--dryrun', '-n', action='store_true', dest='dryrun',
                        help='Do not attempt to provision.')
    parser.add_argument('--duts', '-d', type=str, action='store',
                        help='Comma-separated list of duts to provision.')
    parser.add_argument('--outputlog', '-l', type=str, action='store',
                        help='Path to append JSON entries to.')
    parser.add_argument('--output', '-o', type=str, action='store',
                        help='Path to write JSON file to.')
    parser.add_argument('--ping', '-p', action='store_true',
                        help='Ping DUTs and blacklist unresponsive ones.')
    parser.add_argument('--simultaneous', '-s', type=int, action='store',
                        help='Number of simultaneous provisions to run.',
                        default=1)
    parser.add_argument('--no-stage', action='store_false',
                        dest='stage', default=True,
                        help='Do not attempt to stage builds.')
    parser.add_argument('--total', '-t', type=int, action='store',
                        help='Number of total provisions to run.',
                        default=0)
    return parser


def make_entry(entry_id, name, status, start_time,
               finish_time=None, parent=None, **kwargs):
    """Generate an event log entry to be stored in Cloud Datastore.

    @param entry_id: A (Kind, id) tuple representing the key.
    @param name: A string identifying the event
    @param status: A string identifying the status of the event.
    @param start_time: A datetime of the start of the event.
    @param finish_time: A datetime of the finish of the event.
    @param parent: A (Kind, id) tuple representing the parent key.
    @param kwargs: Additional keyword arguments; accepted but not stored in
                   the entry.

    @return A dictionary representing the entry suitable for dumping via JSON.
    """
    entry = {
        'id': entry_id,
        'name': name,
        'status': status,
        'start_time': time_utils.to_epoch_time(start_time),
    }
    # Optional fields are only present when they carry a value.
    if finish_time is not None:
        entry['finish_time'] = time_utils.to_epoch_time(finish_time)
    if parent is not None:
        entry['parent'] = parent
    return entry


class Job(object):
    """Tracks a single provision job."""

    def __init__(self, ds, host_name, build_name,
                 entry_id=0, parent=None, board=None,
                 start_active=0,
                 force_update=False, full_update=False,
                 clobber_stateful=True, quick_provision=True,
                 ping=False, dryrun=False):
        """Start (or, for a dry run, pretend to complete) a provision.

        @param ds: Devserver object used to trigger and poll the provision.
        @param host_name: Name of the DUT to provision.
        @param build_name: Name of the build to provision onto the DUT.
        @param entry_id: Numeric id of this job; stored as ('Job', entry_id).
        @param parent: A (Kind, id) tuple identifying the parent entry.
        @param board: Board type of the DUT.
        @param start_active: Number of active provisions when this started.
        @param force_update: Passed through to the devserver calls.
        @param full_update: Passed through to the devserver calls.
        @param clobber_stateful: Passed through to the devserver calls.
        @param quick_provision: Passed through to the devserver calls.
        @param ping: If True, ping the DUT before and after the provision.
        @param dryrun: If True, do not contact the devserver; the job is
                       recorded as an immediate success.
        """
        self.ds = ds
        self.host_name = host_name
        self.build_name = build_name

        self.entry_id = ('Job', entry_id)
        self.parent = parent
        self.board = board
        self.start_active = start_active
        self.end_active = None
        self.check_active_sum = 0
        self.check_active_count = 0

        self.start_time = datetime.datetime.now()
        self.finish_time = None
        self.trigger_response = None

        self.ping = ping
        self.pre_ping = None
        self.post_ping = None

        # Result fields; filled in once the job has finished so that they
        # always exist, even on a job that is still running.
        self.raised_error = None
        self.success = None
        self.pid = None

        self.kwargs = {
            'host_name': host_name,
            'build_name': build_name,
            'force_update': force_update,
            'full_update': full_update,
            'clobber_stateful': clobber_stateful,
            'quick_provision': quick_provision,
        }

        if dryrun:
            self.finish_time = datetime.datetime.now()
            self.raised_error = None
            self.success = True
            self.pid = 0
        else:
            if self.ping:
                self.pre_ping = ping_dut(self.host_name)
            self.trigger_response = ds._trigger_auto_update(**self.kwargs)

    def as_entry(self):
        """Generate an entry for exporting to datastore.

        @return A dictionary describing this job, suitable for JSON dumping.
        """
        entry = make_entry(self.entry_id, self.host_name,
                           'pass' if self.success else 'fail',
                           self.start_time, self.finish_time, self.parent)
        entry.update({
            'build_name': self.build_name,
            'board': self.board,
            'devserver': self.ds.hostname,
            'start_active': self.start_active,
            'end_active': self.end_active,
            'force_update': self.kwargs['force_update'],
            'full_update': self.kwargs['full_update'],
            'clobber_stateful': self.kwargs['clobber_stateful'],
            'quick_provision': self.kwargs['quick_provision'],
            'elapsed': int(self.elapsed().total_seconds()),
            'trigger_response': self.trigger_response,
            'pre_ping': self.pre_ping,
            'post_ping': self.post_ping,
        })
        if self.check_active_count:
            # Both operands are ints, so under Python 2 this average is
            # truncated to an int.
            entry['avg_active'] = (self.check_active_sum /
                                   self.check_active_count)
        return entry

    def check(self, active_count):
        """Checks if a job has completed.

        @param active_count: Number of active provisions at time of the check.
        @return: True if the job has completed, False otherwise.
        """
        if self.finish_time is not None:
            return True

        self.check_active_sum += active_count
        self.check_active_count += 1

        finished, raised_error, pid = self.ds.check_for_auto_update_finished(
            self.trigger_response, wait=False, **self.kwargs)
        if finished:
            self.finish_time = datetime.datetime.now()
            self.raised_error = raised_error
            self.success = self.raised_error is None
            self.pid = pid
            self.end_active = active_count
            if self.ping:
                self.post_ping = ping_dut(self.host_name)

        return finished

    def elapsed(self):
        """Determine the elapsed time of the task.

        @return A timedelta from the start of the job until it finished, or
                until now if it is still running.
        """
        finish_time = self.finish_time or datetime.datetime.now()
        return finish_time - self.start_time


class Runner(object):
    """Parallel provision load test runner."""

    def __init__(self, ds, duts, config, simultaneous=1, total=0,
                 outputlog=None, ping=False, blacklist_consecutive=None,
                 blacklist_success=None, blacklist_total=None, dryrun=False):
        """Set up the runner.

        @param ds: Devserver object to load test.
        @param duts: Dictionary mapping DUT host name to its board type.
        @param config: Dictionary indexed by board; each value has a
                       'versions' list of builds to provision.
        @param simultaneous: Target number of simultaneously active
                             provisions; 0 means no limit.
        @param total: Total number of provisions to run; 0 means no limit.
        @param outputlog: Open file object to append JSON entries to, or None.
        @param ping: If True, ping DUTs before and after each provision.
        @param blacklist_consecutive: Consecutive failures before a DUT is
                                      blacklisted; None or 0 disables.
        @param blacklist_success: Total successes before a DUT is blacklisted;
                                  None or 0 disables.
        @param blacklist_total: Total failures before a DUT is blacklisted;
                                None or 0 disables.
        @param dryrun: If True, do not actually provision.
        """
        self.ds = ds
        self.duts = duts
        self.config = config
        self.start_time = datetime.datetime.now()
        self.finish_time = None
        self.simultaneous = simultaneous
        self.total = total
        self.outputlog = outputlog
        self.ping = ping
        self.blacklist_consecutive = blacklist_consecutive
        self.blacklist_success = blacklist_success
        self.blacklist_total = blacklist_total
        self.dryrun = dryrun

        self.active = []
        self.started = 0
        self.completed = []
        # Track DUTs which have failed multiple times.
        self.dut_blacklist = set()
        # Track versions of each DUT to provision in order.
        self.last_versions = {}

        # id for the parent entry.
        # TODO: This isn't the most unique.
        self.entry_id = ('Runner',
                         int(time_utils.to_epoch_time(datetime.datetime.now())))

        # ids for the job entries.
        self.next_id = 0

        if self.outputlog:
            dump_entries_as_json([self.as_entry()], self.outputlog)

    def signal_handler(self, signum, frame):
        """Signal handler to dump current status.

        @param signum: Number of the signal received.
        @param frame: Current stack frame (unused).
        """
        logging.info('Received signal %s', signum)
        if signum == signal.SIGUSR1:
            now = datetime.datetime.now()
            logging.info('%d active provisions, %d completed provisions, '
                         '%s elapsed:',
                         len(self.active), len(self.completed),
                         now - self.start_time)
            for job in self.active:
                logging.info('  %s -> %s, %s elapsed',
                             job.host_name, job.build_name,
                             now - job.start_time)

    def as_entry(self):
        """Generate an entry for exporting to datastore.

        @return A dictionary describing the runner itself.
        """
        entry = make_entry(self.entry_id, 'Runner', 'pass',
                           self.start_time, self.finish_time)
        entry.update({
            'devserver': self.ds.hostname,
        })
        return entry

    def get_completed_entries(self):
        """Retrieves all completed jobs as entries for datastore.

        @return A list whose first element is the runner entry, followed by
                one entry per completed job.
        """
        entries = [self.as_entry()]
        entries.extend(job.as_entry() for job in self.completed)
        return entries

    def get_next_id(self):
        """Get the next Job id.

        @return The id to use for the next job; ids count up from 0.
        """
        entry_id = self.next_id
        self.next_id += 1
        return entry_id

    def spawn(self, host_name, build_name):
        """Spawn a single provision job.

        @param host_name: Name of the DUT to provision.
        @param build_name: Name of the build to provision onto the DUT.
        """
        job = Job(self.ds, host_name, build_name,
                  entry_id=self.get_next_id(), parent=self.entry_id,
                  board=self.get_dut_board_type(host_name),
                  start_active=len(self.active), ping=self.ping,
                  dryrun=self.dryrun)
        self.active.append(job)
        logging.info('Provision (%d) of %s to %s started',
                     job.entry_id[1], job.host_name, job.build_name)
        self.last_versions[host_name] = build_name
        self.started += 1

    def replenish(self):
        """Replenish the number of active provisions to match goals.

        @return True if the goals could be met.  When no idle DUT is left,
                returns False if a simultaneous goal is set, otherwise
                whether any provision is still active.
        """
        while ((self.simultaneous == 0 or
                len(self.active) < self.simultaneous) and
               (self.total == 0 or self.started < self.total)):
            host_name = self.find_idle_dut()
            if host_name:
                build_name = self.find_build_for_dut(host_name)
                self.spawn(host_name, build_name)
            elif self.simultaneous:
                logging.warning('Insufficient DUTs to satisfy goal')
                return False
            else:
                return len(self.active) > 0
        return True

    def check_all(self):
        """Check the status of outstanding provisions.

        Finished jobs are logged, appended to the output log (if any), moved
        to the completed list, and may cause their DUT to be blacklisted.
        """
        still_active = []
        for job in self.active:
            if job.check(len(self.active)):
                logging.info('Provision (%d) of %s to %s %s in %s: %s',
                             job.entry_id[1], job.host_name, job.build_name,
                             'completed' if job.success else 'failed',
                             job.elapsed(), job.raised_error)
                entry = job.as_entry()
                logging.debug(json.dumps(entry))
                if self.outputlog:
                    dump_entries_as_json([entry], self.outputlog)
                self.completed.append(job)
                if self.should_blacklist(job.host_name):
                    logging.error('Blacklisting DUT %s', job.host_name)
                    self.dut_blacklist.add(job.host_name)
            else:
                still_active.append(job)
        self.active = still_active

    @staticmethod
    def _limit_reached(count, limit):
        """Check a count against an optional blacklist threshold.

        @param count: Current number of events.
        @param limit: Threshold; None or a value <= 0 disables the check.
        @return True if the threshold is enabled and has been reached.
        """
        return limit is not None and limit > 0 and count >= limit

    def should_blacklist(self, host_name):
        """Determines if a given DUT should be blacklisted.

        A threshold of None or 0 is treated as disabled.  Without this, the
        default success threshold of 0 would blacklist every DUT after its
        first successful provision.

        @param host_name: Name of the DUT to evaluate.
        @return True if any enabled threshold (total failures, consecutive
                failures or total successes) has been reached.
        """
        total = 0
        consecutive = 0
        successes = 0
        for job in self.completed:
            if job.host_name != host_name:
                continue
            if job.success:
                successes += 1
                # A success breaks any run of consecutive failures.
                consecutive = 0
                if self._limit_reached(successes, self.blacklist_success):
                    return True
            else:
                total += 1
                consecutive += 1
                if (self._limit_reached(total, self.blacklist_total) or
                        self._limit_reached(consecutive,
                                            self.blacklist_consecutive)):
                    return True
        return False

    def find_idle_dut(self):
        """Find an idle DUT to provision.

        @return The name of a randomly chosen DUT that is neither active nor
                blacklisted, or None if there is none.
        """
        # Build the exclusion set once rather than once per DUT.
        unavailable = {job.host_name for job in self.active}
        unavailable |= self.dut_blacklist
        idle_duts = [d for d in self.duts if d not in unavailable]
        return random.choice(idle_duts) if idle_duts else None

    def get_dut_board_type(self, host_name):
        """Determine the board type of a DUT.

        @param host_name: Name of the DUT.
        @return The board type recorded for the DUT.
        """
        return self.duts[host_name]

    def get_board_versions(self, board):
        """Determine the versions to provision for a board.

        @param board: Board type to look up in the config.
        @return The list of versions configured for the board.
        """
        return self.config[board]['versions']

    def find_build_for_dut(self, host_name):
        """Determine a build to provision on a DUT.

        Versions are cycled in config order, starting from the first version
        for a DUT that has not been provisioned yet.

        @param host_name: Name of the DUT.
        @return The name of the next build to provision.
        """
        board = self.get_dut_board_type(host_name)
        versions = self.get_board_versions(board)
        last_version = self.last_versions.get(host_name)
        try:
            last_index = versions.index(last_version)
        except ValueError:
            return versions[0]
        return versions[(last_index + 1) % len(versions)]

    def stage(self, build):
        """Stage the payloads for a single build on the devserver.

        @param build: Name of the build to stage.
        """
        logging.debug('Staging %s', build)
        self.ds.stage_artifacts(build, PAYLOADS)

    def stage_all(self):
        """Stage all necessary artifacts."""
        boards = set(self.duts.values())
        logging.info('Staging for %d boards', len(boards))
        funcs = []
        for board in boards:
            for build in self.get_board_versions(board):
                # Bind build as a default so each lambda keeps its own value.
                funcs.append(lambda build_=build: self.stage(build_))
        parallel.RunParallelSteps(funcs)

    def loop(self):
        """Run the main provision loop.

        @return True if the requested number of provisions completed, False
                if the loop was interrupted or could not make progress.
        """
        # Install a signal handler for status updates.
        old_handler = signal.signal(signal.SIGUSR1, self.signal_handler)
        signal.siginterrupt(signal.SIGUSR1, False)

        try:
            while True:
                self.check_all()
                if self.total != 0 and len(self.completed) >= self.total:
                    break
                if not self.replenish() and len(self.active) == 0:
                    logging.error('Unable to replenish with no active '
                                  'provisions')
                    return False
                logging.debug('%d provisions active', len(self.active))
                time.sleep(STATUS_POLL_SECONDS)
            return True
        except KeyboardInterrupt:
            return False
        finally:
            self.finish_time = datetime.datetime.now()
            # Clean up signal handler.
            signal.signal(signal.SIGUSR1, old_handler)

    def elapsed(self):
        """Determine the elapsed time of the task.

        @return A timedelta from the start of the run until it finished, or
                until now if it is still running.
        """
        finish_time = self.finish_time or datetime.datetime.now()
        return finish_time - self.start_time


def dump_entries_as_json(entries, output_file):
    """Dump event log entries as json to a file.

    Each entry is written as one JSON object per line.

    @param entries: A list of event log entries to dump.
    @param output_file: The file to write to.
    """
    # Write the entries out as JSON.
    logging.debug('Dumping %d entries', len(entries))
    for e in entries:
        json.dump(e, output_file, sort_keys=True)
        output_file.write('\n')
    output_file.flush()


def ping_dut(hostname):
    """Checks if a host is responsive to pings.

    @param hostname: Name of the host; names of the form
                     chromeosN-rowN-rackN-hostN get '.cros' appended.
    @return True if a single ping succeeded, False otherwise.
    """
    if re.match(r'^chromeos\d+-row\d+-rack\d+-host\d+$', hostname):
        hostname += '.cros'

    # Discard ping's output.  A PIPE that is never read should not be used
    # with subprocess.call().
    with open(os.devnull, 'w') as devnull:
        response = subprocess.call(['/bin/ping', '-c1', '-w2', hostname],
                                   stdout=devnull)
    return response == 0


def main(argv):
    """Load generator for a devserver.

    @param argv: Command line arguments, excluding the program name.
    """
    parser = get_parser()
    options = parser.parse_args(argv)

    # Parse devserver.
    if options.server:
        if re.match(r'^https?://', options.server):
            server = options.server
        else:
            server = 'http://%s/' % options.server
        ds = dev_server.ImageServer(server)
    else:
        parser.print_usage()
        logging.error('Must specify devserver')
        sys.exit(1)

    # Parse config file and determine master list of duts and their board type,
    # filtering by board type if specified.
    duts = {}
    if options.config:
        with open(options.config, 'r') as f:
            config = json.load(f)
        boards = (options.boards.split(',')
                  if options.boards else list(config))
        # Report unknown boards clearly instead of failing with a KeyError.
        unknown_boards = [board for board in boards if board not in config]
        if unknown_boards:
            parser.error('Boards not found in %s: %s' %
                         (options.config, ', '.join(unknown_boards)))
        duts_specified = (set(options.duts.split(','))
                          if options.duts else None)
        for board in boards:
            duts.update({dut: board for dut in config[board]['duts']
                         if duts_specified is None or
                         dut in duts_specified})
        logging.info('Config file %s: %d boards, %d duts',
                     options.config, len(boards), len(duts))
    else:
        parser.print_usage()
        logging.error('Must specify config file')
        sys.exit(1)

    if options.ping:
        logging.info('Performing ping tests')
        duts_alive = {}
        for dut, board in duts.items():
            if ping_dut(dut):
                duts_alive[dut] = board
            else:
                logging.error('Ignoring DUT %s (%s) for failing initial '
                              'ping check', dut, board)
        duts = duts_alive
        # Count only the boards that still have a responsive DUT.
        logging.info('After ping tests: %d boards, %d duts',
                     len(set(duts.values())), len(duts))

    # Set up the test runner and stage all the builds.
    outputlog = open(options.outputlog, 'a') if options.outputlog else None
    try:
        runner = Runner(ds, duts, config,
                        simultaneous=options.simultaneous,
                        total=options.total,
                        outputlog=outputlog, ping=options.ping,
                        blacklist_consecutive=options.blacklist_consecutive,
                        blacklist_success=options.blacklist_success,
                        blacklist_total=options.blacklist_total,
                        dryrun=options.dryrun)
        if options.stage:
            runner.stage_all()

        # Run all the provisions.
        with locking.FileLock(options.config, blocking=True).lock():
            completed = runner.loop()
    finally:
        # The runner only writes to the log while constructing and looping.
        if outputlog is not None:
            outputlog.close()
    logging.info('%s in %s', 'Completed' if completed else 'Interrupted',
                 runner.elapsed())

    # Write all entries as JSON.
    entries = runner.get_completed_entries()
    if options.output:
        with open(options.output, 'w') as f:
            dump_entries_as_json(entries, f)
    else:
        dump_entries_as_json(entries, sys.stdout)
    logging.info('Summary: %s',
                 dict(collections.Counter(e['status'] for e in entries
                                          if e['name'] != 'Runner')))

    # List blacklisted DUTs.
    if runner.dut_blacklist:
        logging.warning('Blacklisted DUTs:')
        for host_name in runner.dut_blacklist:
            logging.warning('  %s', host_name)


if __name__ == '__main__':
    sys.exit(main(sys.argv[1:]))