Home | History | Annotate | Download | only in site_utils
      1 #!/usr/bin/python
      2 #
      3 # Copyright (c) 2011 The Chromium OS Authors. All rights reserved.
      4 # Use of this source code is governed by a BSD-style license that can be
      5 # found in the LICENSE file.
      6 
      7 """Updates all unlocked hosts in Autotest lab in parallel at a given rate.
      8 
      9 Used to update all hosts, or only those of a given platform, in the Autotest
     10 lab to a given version. Allows a configurable number of updates to be started in
     11 parallel. Updates can also be staggered to reduce load."""
     12 
     13 import logging
     14 import os
     15 import subprocess
     16 import sys
     17 import threading
     18 import time
     19 import traceback
     20 
     21 from collections import deque
     22 from optparse import OptionParser
     23 
     24 
     25 # Default number of hosts to update in parallel.
     26 DEFAULT_CONCURRENCY = 10
     27 
     28 
     29 # By default do not stagger any of the updates.
     30 DEFAULT_STAGGER = 0
     31 
     32 
     33 # Default location of ChromeOS checkout.
     34 DEFAULT_GCLIENT_ROOT = '/usr/local/google/home/${USER}/chromeos/chromeos'
     35 
     36 
     37 # Default path for individual host logs. Each host will have it's own file. E.g.
     38 # <default_log_path>/<host>.log
     39 DEFAULT_LOG_PATH = '/tmp/mass_update_logs/%s/' % time.strftime('%Y-%m-%d-%H-%M',
     40                                                                time.gmtime())
     41 
     42 
     43 # Location of Autotest cli executable.
     44 AUTOTEST_LOCATION = '/home/chromeos-test/autotest/cli'
     45 
     46 
     47 # Default time in seconds to sleep while waiting for threads to complete.
     48 DEFAULT_SLEEP = 10
     49 
     50 
     51 # Amount of time in seconds to wait before declaring an update as failed.
     52 DEFAULT_TIMEOUT = 2400
     53 
     54 
     55 class MassUpdateStatus():
     56   """Used to track status for all updates."""
     57   ssh_failures = []
     58   update_failures = []
     59   successful_updates = 0
     60 
     61 
     62 class UpdateThread(threading.Thread):
     63   """Responsible for ssh-test, locking, imaging, and unlocking a host.
     64 
     65   Uses the atest CLI for host control and the image_to_live script to actually
     66   update the host. Each thread will continue to process hosts until the queue
     67   is empty."""
     68 
     69   _SUCCESS = 0            # Update was successful.
     70   _SSH_FAILURE = 1        # Could not SSH to host or related SSH failure.
     71   _UPDATE_FAILURE = 2     # Update failed for any reason other than SSH.
     72 
     73   def __init__(self, options, hosts, status):
     74     self._options = options
     75     self._hosts = hosts
     76     self._status = status
     77     self._logger = logging.getLogger()
     78     threading.Thread.__init__(self)
     79 
     80   def run(self):
     81     while self._hosts:
     82       host = self._hosts.popleft()
     83       status = UpdateThread._UPDATE_FAILURE
     84 
     85       self._logger.info('Updating host %s' % host)
     86       try:
     87         try:
     88           if not CheckSSH(host=host, options=self._options):
     89             status = UpdateThread._SSH_FAILURE
     90           elif LockHost(host) and ImageHost(host=host, options=self._options):
     91             status = UpdateThread._SUCCESS
     92         finally:
     93           if status == UpdateThread._SUCCESS:
     94             self._logger.info(
     95                 'Completed update for host %s successfully.' % host)
     96             self._status.successful_updates += 1
     97           elif status == UpdateThread._SSH_FAILURE:
     98             self._logger.info('Failed to SSH to host %s.' % host)
     99             self._status.ssh_failures.append(host)
    100           else:
    101             self._logger.info('Failed to update host %s.' % host)
    102             self._status.update_failures.append(host)
    103 
    104           UnlockHost(host)
    105       except:
    106         traceback.print_exc()
    107         self._logger.warning(
    108             'Exception encountered during update. Skipping host %s.' % host)
    109 
    110 
    111 def CheckSSH(host, options):
    112   """Uses the ssh_test script to ensure SSH access to a host is available.
    113 
    114   Returns true if an SSH connection to the host was successful."""
    115   return subprocess.Popen(
    116       '%s/src/scripts/ssh_test.sh --remote=%s' % (options.gclient, host),
    117       shell=True,
    118       stdout=subprocess.PIPE,
    119       stderr=subprocess.PIPE).wait() == 0
    120 
    121 
    122 def ImageHost(host, options):
    123   """Uses the image_to_live script to update a host.
    124 
    125   Returns true if the imaging process was successful."""
    126   log_file = open(os.path.join(options.log, host + '.log'), 'w')
    127   log_file_err = open(os.path.join(options.log, host + '.log.err'), 'w')
    128 
    129   exit_code = subprocess.Popen(
    130       ('/usr/local/scripts/alarm %d %s/src/scripts/image_to_live.sh '
    131        '--update_url %s --remote %s' % (DEFAULT_TIMEOUT, options.gclient,
    132                                         options.url, host)),
    133       shell=True,
    134       stdout=log_file,
    135       stderr=log_file_err).wait()
    136 
    137   log_file.close()
    138   log_file_err.close()
    139 
    140   return exit_code == 0
    141 
    142 
    143 def LockHost(host):
    144   """Locks a host using the atest CLI.
    145 
    146   Locking a host tells Autotest that the host shouldn't be scheduled for
    147   any other tasks. Returns true if the locking process was successful."""
    148   success = subprocess.Popen(
    149       '%s/atest host mod -l %s' % (AUTOTEST_LOCATION, host),
    150       shell=True,
    151       stdout=subprocess.PIPE,
    152       stderr=subprocess.PIPE).wait() == 0
    153 
    154   if not success:
    155     logging.getLogger().info('Failed to lock host %s.' % host)
    156 
    157   return success
    158 
    159 
    160 def UnlockHost(host):
    161   """Unlocks a host using the atest CLI.
    162 
    163   Unlocking a host tells Autotest that the host is okay to be scheduled
    164   for other tasks. Returns true if the unlocking process was successful."""
    165   success = subprocess.Popen(
    166       '%s/atest host mod -u %s' % (AUTOTEST_LOCATION, host),
    167       shell=True,
    168       stdout=subprocess.PIPE,
    169       stderr=subprocess.PIPE).wait() == 0
    170 
    171   if not success:
    172     logging.getLogger().info('Failed to unlock host %s.' % host)
    173 
    174   return success
    175 
    176 
    177 def GetHostQueue(options):
    178   """Returns a queue containing unlocked hosts retrieved from the atest CLI.
    179 
    180   If options.label has been specified only unlocked hosts with the specified
    181   label will be returned."""
    182   cmd = ('%s/atest host list --unlocked -s Ready -a acl_cros_test'
    183          % AUTOTEST_LOCATION)
    184 
    185   if options.label:
    186     cmd += ' -b ' + options.label
    187 
    188   # atest host list will return a tabular data set. Use sed to remove the first
    189   # line which contains column labels we don't need. Then since the first column
    190   # contains the host name, use awk to extract it
    191   cmd += " | sed '1d' | awk '{print $1}'"
    192 
    193   stdout = subprocess.Popen(cmd,
    194                             shell=True,
    195                             stdout=subprocess.PIPE,
    196                             stderr=subprocess.PIPE).communicate()[0]
    197 
    198   return deque(item.strip() for item in stdout.split('\n') if item.strip())
    199 
    200 
    201 def ParseOptions():
    202   usage = 'usage: %prog --url=<update url> [options]'
    203   parser = OptionParser(usage)
    204   parser.add_option('-b', '--label', dest='label',
    205                     help='Only update hosts with the specified label.')
    206   parser.add_option('-c', '--concurrent', dest='concurrent',
    207                     default=DEFAULT_CONCURRENCY,
    208                     help=('Number of hosts to be updated concurrently. '
    209                           'Defaults to %d hosts.') % DEFAULT_CONCURRENCY)
    210   parser.add_option('-g', '--gclient', dest='gclient',
    211                     default=DEFAULT_GCLIENT_ROOT,
    212                     help=('Location of ChromeOS checkout. defaults to %s'
    213                     % DEFAULT_GCLIENT_ROOT))
    214   parser.add_option('-l', '--log', dest='log',
    215                     default=DEFAULT_LOG_PATH,
    216                     help=('Where to put individual host log files. '
    217                           'Defaults to %s' % DEFAULT_LOG_PATH))
    218   parser.add_option('-s', '--stagger', dest='stagger',
    219                     default=DEFAULT_STAGGER,
    220                     help=('Attempt to stagger updates. Waits the given amount '
    221                           'of time in minutes before starting each updater. '
    222                           'Updates will still overlap if the value is set as a '
    223                           'multiple of the update period.'))
    224   parser.add_option('-u', '--url', dest='url',
    225                     help='Update URL. Points to build for updating hosts.')
    226 
    227   options = parser.parse_args()[0]
    228 
    229   if options.url is None:
    230     parser.error('An update URL must be provided.')
    231 
    232   return options
    233 
    234 
    235 def InitializeLogging():
    236   """Configure the global logger for time/date stamping console output.
    237 
    238   Returns a logger object for convenience."""
    239   logger = logging.getLogger()
    240   logger.setLevel(logging.INFO)
    241 
    242   stream_handler = logging.StreamHandler()
    243   stream_handler.setLevel(logging.INFO)
    244   stream_handler.setFormatter(logging.Formatter('%(asctime)s - %(message)s'))
    245   logger.addHandler(stream_handler)
    246   return logger
    247 
    248 
    249 def main():
    250   options = ParseOptions()
    251   hosts = GetHostQueue(options)
    252   logger = InitializeLogging()
    253   status = MassUpdateStatus()
    254 
    255   # Create log folder if it doesn't exist.
    256   if not os.path.exists(options.log):
    257     os.makedirs(options.log)
    258 
    259   logger.info('Starting update using URL %s' % options.url)
    260   logger.info('Individual host logs can be found under %s' % options.log)
    261 
    262   try:
    263     # Spawn processing threads which will handle lock, update, and unlock.
    264     for i in range(int(options.concurrent)):
    265       UpdateThread(hosts=hosts, options=options, status=status).start()
    266 
    267       # Stagger threads if the option has been enabled.
    268       if options.stagger > 0:
    269         time.sleep(int(options.stagger) * 60)
    270 
    271     # Wait for all hosts to be processed and threads to complete. NOTE: Not
    272     # using hosts.join() here because it does not behave properly with CTRL-C
    273     # and KeyboardInterrupt.
    274     while len(threading.enumerate()) > 1:
    275       time.sleep(DEFAULT_SLEEP)
    276   except:
    277     traceback.print_exc()
    278     logger.warning(
    279         'Update process aborted. Some machines may be left locked or updating.')
    280     sys.exit(1)
    281   finally:
    282     logger.info(
    283         ('Mass updating complete. %d hosts updated successfully, %d failed.' %
    284         (status.successful_updates, len(status.ssh_failures) +
    285             len(status.update_failures))))
    286 
    287     logger.info(('-' * 25) + '[ SUMMARY ]' + ('-' * 25))
    288 
    289     for host in status.ssh_failures:
    290       logger.info('Failed to SSH to host %s.' % host)
    291 
    292     for host in status.update_failures:
    293       logger.info('Failed to update host %s.' % host)
    294 
    295     if len(status.ssh_failures) == 0 and len(status.update_failures) == 0:
    296       logger.info('All hosts updated successfully.')
    297 
    298     logger.info('-' * 61)
    299 
    300 
    301 if __name__ == '__main__':
    302   main()
    303