Home | History | Annotate | Download | only in utils
      1 #!/usr/bin/env python2
      2 
      3 # Copyright 2017 The Chromium OS Authors. All rights reserved.
      4 # Use of this source code is governed by a BSD-style license that can be
      5 # found in the LICENSE file.
      6 
      7 """Load generator for devserver.
      8 
      9 Example usage:
     10 
     11 # Find DUTs in suites pool to test with:
     12 atest host list -b 'pool:suites,board:BOARD' --unlocked -s Ready
     13 
     14 # Lock DUTs:
     15 atest host mod -l -r 'quick provision testing' DUT1 DUT2
     16 
     17 # Create config file with DUTs to test and builds to use.
     18 cat >config.json <<EOD
     19 {
     20   "BOARD": {
     21     "duts": [
     22       "chromeosX-rowY-rackZ-hostA",
     23       "chromeosX-rowY-rackZ-hostB",
     24     ],
     25     "versions": [
     26       "auron_paine-paladin/R65-10208.0.0-rc2",
     27       "auron_paine-paladin/R65-10208.0.0-rc3",
     28       "auron_paine-paladin/R65-10209.0.0-rc1"
     29     ]
     30   },
     31 }
     32 EOD
     33 
     34 # Do 100 total provisions, aiming to have 10 active simultaneously.
     35 loadtest.py $DS config.json --simultaneous 10 --total 100
     36 
     37 # Unlock DUTs:
     38 atest host mod -u DUT1 DUT2
     39 """
     40 
     41 import collections
     42 import datetime
     43 import json
     44 import random
     45 import re
     46 import signal
     47 import subprocess
     48 import sys
     49 import time
     50 
     51 import common
     52 from autotest_lib.client.common_lib import time_utils
     53 from autotest_lib.client.common_lib.cros import dev_server
     54 from chromite.lib import commandline
     55 from chromite.lib import cros_logging as logging
     56 from chromite.lib import locking
     57 from chromite.lib import parallel
     58 
# Payloads to stage.
PAYLOADS = ['quick_provision', 'stateful']

# Number of seconds between full status checks.
STATUS_POLL_SECONDS = 2

# Default thresholds for blacklisting a DUT from further provisions.
# Consecutive failures before blacklisting a DUT.
BLACKLIST_CONSECUTIVE_FAILURE = 2
# Total successes before blacklisting a DUT.
# NOTE(review): should_blacklist() guards with "is not None", so 0 here
# blacklists a DUT after its first success - confirm this is intended.
BLACKLIST_TOTAL_SUCCESS = 0
# Total failures before blacklisting a DUT.
BLACKLIST_TOTAL_FAILURE = 5
     69 
     70 def get_parser():
     71     """Creates the argparse parser."""
     72     parser = commandline.ArgumentParser(description=__doc__)
     73     parser.add_argument('server', type=str, action='store',
     74                         help='Devserver to load test.')
     75     parser.add_argument('config', type=str, action='store',
     76                         help='Path to JSON config file.'
     77                              'Config file is indexed by board with keys of '
     78                              '"duts" and "versions", each a list.')
     79     parser.add_argument('--blacklist-consecutive', '-C', type=int,
     80                         action='store',
     81                         help=('Consecutive number of failures before '
     82                               'blacklisting DUT (default %d).') %
     83                              BLACKLIST_CONSECUTIVE_FAILURE,
     84                         default=BLACKLIST_CONSECUTIVE_FAILURE)
     85     parser.add_argument('--blacklist-success', '-S', type=int, action='store',
     86                         help=('Total number of successes before blacklisting '
     87                               'DUT (default %d).') % BLACKLIST_TOTAL_SUCCESS,
     88                         default=BLACKLIST_TOTAL_SUCCESS)
     89     parser.add_argument('--blacklist-total', '-T', type=int, action='store',
     90                         help=('Total number of failures before blacklisting '
     91                               'DUT (default %d).') % BLACKLIST_TOTAL_FAILURE,
     92                         default=BLACKLIST_TOTAL_FAILURE)
     93     parser.add_argument('--boards', '-b', type=str, action='store',
     94                         help='Comma-separated list of boards to provision.')
     95     parser.add_argument('--dryrun', '-n', action='store_true', dest='dryrun',
     96                         help='Do not attempt to provision.')
     97     parser.add_argument('--duts', '-d', type=str, action='store',
     98                         help='Comma-separated list of duts to provision.')
     99     parser.add_argument('--outputlog', '-l', type=str, action='store',
    100                         help='Path to append JSON entries to.')
    101     parser.add_argument('--output', '-o', type=str, action='store',
    102                         help='Path to write JSON file to.')
    103     parser.add_argument('--ping', '-p', action='store_true',
    104                         help='Ping DUTs and blacklist unresponsive ones.')
    105     parser.add_argument('--simultaneous', '-s', type=int, action='store',
    106                         help='Number of simultaneous provisions to run.',
    107                         default=1)
    108     parser.add_argument('--no-stage', action='store_false',
    109                         dest='stage', default=True,
    110                         help='Do not attempt to stage builds.')
    111     parser.add_argument('--total', '-t', type=int, action='store',
    112                         help='Number of total provisions to run.',
    113                         default=0)
    114     return parser
    115 
    116 def make_entry(entry_id, name, status, start_time,
    117                finish_time=None, parent=None, **kwargs):
    118     """Generate an event log entry to be stored in Cloud Datastore.
    119 
    120     @param entry_id: A (Kind, id) tuple representing the key.
    121     @param name: A string identifying the event
    122     @param status: A string identifying the status of the event.
    123     @param start_time: A datetime of the start of the event.
    124     @param finish_time: A datetime of the finish of the event.
    125     @param parent: A (Kind, id) tuple representing the parent key.
    126 
    127     @return A dictionary representing the entry suitable for dumping via JSON.
    128     """
    129     entry = {
    130         'id': entry_id,
    131         'name': name,
    132         'status': status,
    133         'start_time': time_utils.to_epoch_time(start_time),
    134     }
    135     if finish_time is not None:
    136         entry['finish_time'] = time_utils.to_epoch_time(finish_time)
    137     if parent is not None:
    138         entry['parent'] = parent
    139     return entry
    140 
    141 class Job(object):
    142     """Tracks a single provision job."""
    143     def __init__(self, ds, host_name, build_name,
    144                  entry_id=0, parent=None, board=None,
    145                  start_active=0,
    146                  force_update=False, full_update=False,
    147                  clobber_stateful=True, quick_provision=True,
    148                  ping=False, dryrun=False):
    149 
    150         self.ds = ds
    151         self.host_name = host_name
    152         self.build_name = build_name
    153 
    154         self.entry_id = ('Job', entry_id)
    155         self.parent = parent
    156         self.board = board
    157         self.start_active = start_active
    158         self.end_active = None
    159         self.check_active_sum = 0
    160         self.check_active_count = 0
    161 
    162         self.start_time = datetime.datetime.now()
    163         self.finish_time = None
    164         self.trigger_response = None
    165 
    166         self.ping = ping
    167         self.pre_ping = None
    168         self.post_ping = None
    169 
    170         self.kwargs = {
    171             'host_name': host_name,
    172             'build_name': build_name,
    173             'force_update': force_update,
    174             'full_update': full_update,
    175             'clobber_stateful': clobber_stateful,
    176             'quick_provision': quick_provision,
    177         }
    178 
    179         if dryrun:
    180             self.finish_time = datetime.datetime.now()
    181             self.raised_error = None
    182             self.success = True
    183             self.pid = 0
    184         else:
    185             if self.ping:
    186                 self.pre_ping = ping_dut(self.host_name)
    187             self.trigger_response = ds._trigger_auto_update(**self.kwargs)
    188 
    189     def as_entry(self):
    190         """Generate an entry for exporting to datastore."""
    191         entry = make_entry(self.entry_id, self.host_name,
    192                            'pass' if self.success else 'fail',
    193                            self.start_time, self.finish_time, self.parent)
    194         entry.update({
    195             'build_name': self.build_name,
    196             'board': self.board,
    197             'devserver': self.ds.hostname,
    198             'start_active': self.start_active,
    199             'end_active': self.end_active,
    200             'force_update': self.kwargs['force_update'],
    201             'full_update': self.kwargs['full_update'],
    202             'clobber_stateful': self.kwargs['clobber_stateful'],
    203             'quick_provision': self.kwargs['quick_provision'],
    204             'elapsed': int(self.elapsed().total_seconds()),
    205             'trigger_response': self.trigger_response,
    206             'pre_ping': self.pre_ping,
    207             'post_ping': self.post_ping,
    208         })
    209         if self.check_active_count:
    210             entry['avg_active'] = (self.check_active_sum /
    211                                    self.check_active_count)
    212         return entry
    213 
    214     def check(self, active_count):
    215         """Checks if a job has completed.
    216 
    217         @param active_count: Number of active provisions at time of the check.
    218         @return: True if the job has completed, False otherwise.
    219         """
    220         if self.finish_time is not None:
    221             return True
    222 
    223         self.check_active_sum += active_count
    224         self.check_active_count += 1
    225 
    226         finished, raised_error, pid = self.ds.check_for_auto_update_finished(
    227             self.trigger_response, wait=False, **self.kwargs)
    228         if finished:
    229             self.finish_time = datetime.datetime.now()
    230             self.raised_error = raised_error
    231             self.success = self.raised_error is None
    232             self.pid = pid
    233             self.end_active = active_count
    234             if self.ping:
    235                 self.post_ping = ping_dut(self.host_name)
    236 
    237         return finished
    238 
    239     def elapsed(self):
    240         """Determine the elapsed time of the task."""
    241         finish_time = self.finish_time or datetime.datetime.now()
    242         return finish_time - self.start_time
    243 
class Runner(object):
    """Parallel provision load test runner.

    Drives provisions across a DUT pool, keeping up to `simultaneous`
    active at once until `total` have completed, blacklisting DUTs that
    trip the configured failure/success thresholds, and logging each
    completed job as a datastore-style entry.
    """
    def __init__(self, ds, duts, config, simultaneous=1, total=0,
                 outputlog=None, ping=False, blacklist_consecutive=None,
                 blacklist_success=None, blacklist_total=None, dryrun=False):
        # @param ds: dev_server.ImageServer to load test.
        # @param duts: Dict mapping DUT host name -> board type.
        # @param config: Parsed JSON config indexed by board, each entry
        #     holding "duts" and "versions" lists.
        # @param simultaneous: Target active provisions; 0 means "use every
        #     idle DUT" (see replenish()).
        # @param total: Total provisions to run; 0 means run until stopped.
        # @param outputlog: Optional writable file to append JSON entries to.
        # @param ping: If True, jobs ping DUTs before/after provisioning.
        # @param blacklist_*: Thresholds for should_blacklist(); None
        #     disables the corresponding check.
        # @param dryrun: Passed to Job; do not actually provision.
        self.ds = ds
        self.duts = duts
        self.config = config
        self.start_time = datetime.datetime.now()
        self.finish_time = None
        self.simultaneous = simultaneous
        self.total = total
        self.outputlog = outputlog
        self.ping = ping
        self.blacklist_consecutive = blacklist_consecutive
        self.blacklist_success = blacklist_success
        self.blacklist_total = blacklist_total
        self.dryrun = dryrun

        # Jobs currently provisioning / total spawned / finished jobs.
        self.active = []
        self.started = 0
        self.completed = []
        # Track DUTs which have failed multiple times.
        self.dut_blacklist = set()
        # Track versions of each DUT to provision in order.
        self.last_versions = {}

        # id for the parent entry.
        # TODO: This isn't the most unique.
        self.entry_id = ('Runner',
                         int(time_utils.to_epoch_time(datetime.datetime.now())))

        # ids for the job entries.
        self.next_id = 0

        # Write the parent Runner entry first so job entries in the log
        # have their parent on record.
        if self.outputlog:
            dump_entries_as_json([self.as_entry()], self.outputlog)

    def signal_handler(self, signum, frame):
        """Signal handle to dump current status.

        Installed for SIGUSR1 by loop(); logs a summary of active and
        completed provisions with elapsed times.
        """
        logging.info('Received signal %s', signum)
        if signum == signal.SIGUSR1:
            now = datetime.datetime.now()
            logging.info('%d active provisions, %d completed provisions, '
                         '%s elapsed:',
                         len(self.active), len(self.completed),
                         now - self.start_time)
            for job in self.active:
                logging.info('  %s -> %s, %s elapsed',
                             job.host_name, job.build_name,
                             now - job.start_time)

    def as_entry(self):
        """Generate an entry for exporting to datastore."""
        entry = make_entry(self.entry_id, 'Runner', 'pass',
                           self.start_time, self.finish_time)
        entry.update({
            'devserver': self.ds.hostname,
        })
        return entry

    def get_completed_entries(self):
        """Retrieves all completed jobs as entries for datastore.

        @return A list: the Runner's own entry followed by one entry per
            completed job.
        """
        entries = [self.as_entry()]
        entries.extend([job.as_entry() for job in self.completed])
        return entries

    def get_next_id(self):
        """Get the next Job id."""
        entry_id = self.next_id
        self.next_id += 1
        return entry_id

    def spawn(self, host_name, build_name):
        """Spawn a single provision job.

        @param host_name: DUT to provision.
        @param build_name: Build version to provision it with.
        """
        job = Job(self.ds, host_name, build_name,
                  entry_id=self.get_next_id(), parent=self.entry_id,
                  board=self.get_dut_board_type(host_name),
                  start_active=len(self.active), ping=self.ping,
                  dryrun=self.dryrun)
        self.active.append(job)
        logging.info('Provision (%d) of %s to %s started',
                     job.entry_id[1], job.host_name, job.build_name)
        # Remember what this DUT now runs so find_build_for_dut() can
        # cycle to the next version.
        self.last_versions[host_name] = build_name
        self.started += 1

    def replenish(self):
        """Replenish the number of active provisions to match goals.

        @return False if the simultaneous goal cannot be met for lack of
            idle DUTs; otherwise True (when simultaneous == 0, True only
            while at least one provision is active).
        """
        while ((self.simultaneous == 0 or
                len(self.active) < self.simultaneous) and
               (self.total == 0 or self.started < self.total)):
            host_name = self.find_idle_dut()
            if host_name:
                build_name = self.find_build_for_dut(host_name)
                self.spawn(host_name, build_name)
            elif self.simultaneous:
                logging.warn('Insufficient DUTs to satisfy goal')
                return False
            else:
                return len(self.active) > 0
        return True

    def check_all(self):
        """Check the status of outstanding provisions.

        Moves finished jobs from active to completed, logs their entries,
        and blacklists DUTs that now exceed a threshold.
        """
        still_active = []
        for job in self.active:
            if job.check(len(self.active)):
                logging.info('Provision (%d) of %s to %s %s in %s: %s',
                             job.entry_id[1], job.host_name, job.build_name,
                             'completed' if job.success else 'failed',
                             job.elapsed(), job.raised_error)
                entry = job.as_entry()
                logging.debug(json.dumps(entry))
                if self.outputlog:
                    dump_entries_as_json([entry], self.outputlog)
                self.completed.append(job)
                if self.should_blacklist(job.host_name):
                    logging.error('Blacklisting DUT %s', job.host_name)
                    self.dut_blacklist.add(job.host_name)
            else:
                still_active.append(job)
        self.active = still_active

    def should_blacklist(self, host_name):
        """Determines if a given DUT should be blacklisted.

        Walks this DUT's completed jobs in order, tracking total failures,
        consecutive failures, and total successes against the configured
        thresholds; None disables a threshold.

        NOTE(review): blacklist_success of 0 (the module default) is not
        None, so the first success trips the check - confirm intended.
        """
        jobs = [job for job in self.completed if job.host_name == host_name]
        total = 0
        consecutive = 0
        successes = 0
        for job in jobs:
            if not job.success:
                total += 1
                consecutive += 1
                if ((self.blacklist_total is not None and
                     total >= self.blacklist_total) or
                    (self.blacklist_consecutive is not None and
                     consecutive >= self.blacklist_consecutive)):
                    return True
            else:
                successes += 1
                if (self.blacklist_success is not None and
                    successes >= self.blacklist_success):
                    return True
                consecutive = 0
        return False

    def find_idle_dut(self):
        """Find an idle DUT to provision.

        @return A random host name that is neither active nor blacklisted,
            or None if no DUT is available.
        """
        active_duts = {job.host_name for job in self.active}
        idle_duts = [d for d in self.duts
                     if d not in active_duts | self.dut_blacklist]
        return random.choice(idle_duts) if len(idle_duts) else None

    def get_dut_board_type(self, host_name):
        """Determine the board type of a DUT."""
        return self.duts[host_name]

    def get_board_versions(self, board):
        """Determine the versions to provision for a board."""
        return self.config[board]['versions']

    def find_build_for_dut(self, host_name):
        """Determine a build to provision on a DUT.

        Cycles through the board's version list, starting over at the
        first version if the DUT's last version is unknown.
        """
        board = self.get_dut_board_type(host_name)
        versions = self.get_board_versions(board)
        last_version = self.last_versions.get(host_name)
        try:
            last_index = versions.index(last_version)
        except ValueError:
            return versions[0]
        return versions[(last_index + 1) % len(versions)]

    def stage(self, build):
        """Stage artifacts for a given build."""
        logging.debug('Staging %s', build)
        self.ds.stage_artifacts(build, PAYLOADS)

    def stage_all(self):
        """Stage all necessary artifacts."""
        boards = set(self.duts.values())
        logging.info('Staging for %d boards', len(boards))
        funcs = []
        for board in boards:
            for build in self.get_board_versions(board):
                # Default-arg binding (build_=build) captures the current
                # build; a bare closure would see only the last value.
                funcs.append(lambda build_=build: self.stage(build_))
        parallel.RunParallelSteps(funcs)

    def loop(self):
        """Run the main provision loop.

        Polls job status every STATUS_POLL_SECONDS, replenishing active
        provisions until `total` have completed.

        @return True on normal completion; False on KeyboardInterrupt or
            when no provisions can be started or remain active.
        """
        # Install a signal handler for status updates.
        old_handler = signal.signal(signal.SIGUSR1, self.signal_handler)
        # Don't let SIGUSR1 interrupt blocking syscalls (e.g. sleep).
        signal.siginterrupt(signal.SIGUSR1, False)

        try:
            while True:
                self.check_all()
                if self.total != 0 and len(self.completed) >= self.total:
                    break
                if not self.replenish() and len(self.active) == 0:
                    logging.error('Unable to replenish with no active '
                                  'provisions')
                    return False
                logging.debug('%d provisions active', len(self.active))
                time.sleep(STATUS_POLL_SECONDS)
            return True
        except KeyboardInterrupt:
            return False
        finally:
            self.finish_time = datetime.datetime.now()
            # Clean up signal handler.
            signal.signal(signal.SIGUSR1, old_handler)

    def elapsed(self):
        """Determine the elapsed time of the task.

        @return A datetime.timedelta; uses now() while still running.
        """
        finish_time = self.finish_time or datetime.datetime.now()
        return finish_time - self.start_time
    460 
    461 def dump_entries_as_json(entries, output_file):
    462     """Dump event log entries as json to a file.
    463 
    464     @param entries: A list of event log entries to dump.
    465     @param output_file: The file to write to.
    466     """
    467     # Write the entries out as JSON.
    468     logging.debug('Dumping %d entries' % len(entries))
    469     for e in entries:
    470         json.dump(e, output_file, sort_keys=True)
    471         output_file.write('\n')
    472         output_file.flush()
    473 
    474 def ping_dut(hostname):
    475     """Checks if a host is responsive to pings."""
    476     if re.match('^chromeos\d+-row\d+-rack\d+-host\d+$', hostname):
    477         hostname += '.cros'
    478 
    479     response = subprocess.call(["/bin/ping", "-c1", "-w2", hostname],
    480                                stdout=subprocess.PIPE)
    481     return response == 0
    482 
def main(argv):
    """Load generator for a devserver.

    @param argv: Command line arguments (excluding the program name).
    """
    parser = get_parser()
    options = parser.parse_args(argv)

    # Parse devserver.
    if options.server:
        if re.match(r'^https?://', options.server):
            server = options.server
        else:
            # Bare host name given: assume plain HTTP.
            server = 'http://%s/' % options.server
        ds = dev_server.ImageServer(server)
    else:
        parser.print_usage()
        logging.error('Must specify devserver')
        sys.exit(1)

    # Parse config file and determine master list of duts and their board type,
    # filtering by board type if specified.
    duts = {}
    if options.config:
        with open(options.config, 'r') as f:
            config = json.load(f)
            # Restrict to --boards / --duts selections when given.
            boards = (options.boards.split(',')
                      if options.boards else config.keys())
            duts_specified = (set(options.duts.split(','))
                              if options.duts else None)
            for board in boards:
                duts.update({dut: board for dut in config[board]['duts']
                             if duts_specified is None or
                                dut in duts_specified})
        logging.info('Config file %s: %d boards, %d duts',
                     options.config, len(boards), len(duts))
    else:
        parser.print_usage()
        logging.error('Must specify config file')
        sys.exit(1)

    # Optionally drop DUTs that fail an up-front reachability check.
    if options.ping:
        logging.info('Performing ping tests')
        duts_alive = {}
        for dut, board in duts.items():
            if ping_dut(dut):
                duts_alive[dut] = board
            else:
                logging.error('Ignoring DUT %s (%s) for failing initial '
                              'ping check', dut, board)
        duts = duts_alive
        logging.info('After ping tests: %d boards, %d duts', len(boards),
                     len(duts))

    # Set up the test runner and stage all the builds.
    outputlog = open(options.outputlog, 'a') if options.outputlog else None
    runner = Runner(ds, duts, config,
                    simultaneous=options.simultaneous, total=options.total,
                    outputlog=outputlog, ping=options.ping,
                    blacklist_consecutive=options.blacklist_consecutive,
                    blacklist_success=options.blacklist_success,
                    blacklist_total=options.blacklist_total,
                    dryrun=options.dryrun)
    if options.stage:
        runner.stage_all()

    # Run all the provisions.
    # The lock on the config file keeps concurrent loadtest invocations
    # using the same config from provisioning the same DUTs.
    with locking.FileLock(options.config, blocking=True).lock():
        completed = runner.loop()
    logging.info('%s in %s', 'Completed' if completed else 'Interrupted',
                 runner.elapsed())
    # Write all entries as JSON.
    entries = runner.get_completed_entries()
    if options.output:
        with open(options.output, 'w') as f:
            dump_entries_as_json(entries, f)
    else:
        dump_entries_as_json(entries, sys.stdout)
    # Summarize pass/fail counts over the job entries (skip the Runner's
    # own parent entry).
    logging.info('Summary: %s',
                 dict(collections.Counter([e['status'] for e in entries
                                           if e['name'] != 'Runner'])))

    # List blacklisted DUTs.
    if runner.dut_blacklist:
        logging.warn('Blacklisted DUTs:')
        for host_name in runner.dut_blacklist:
            logging.warn('  %s', host_name)
    567 
# Script entry point: pass the CLI args (minus the program name) to main().
if __name__ == '__main__':
    sys.exit(main(sys.argv[1:]))
    570