#!/usr/bin/python
#
# Copyright (c) 2011 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""Updates all unlocked hosts in Autotest lab in parallel at a given rate.

Used to update all hosts, or only those of a given platform, in the Autotest
lab to a given version. Allows a configurable number of updates to be started
in parallel. Updates can also be staggered to reduce load.
"""

import logging
import os
import subprocess
import sys
import threading
import time
import traceback

from collections import deque
from optparse import OptionParser


# Default number of hosts to update in parallel.
DEFAULT_CONCURRENCY = 10


# By default do not stagger any of the updates.
DEFAULT_STAGGER = 0


# Default location of ChromeOS checkout. ${USER} is left for the shell to
# expand; every command built from this path is run with shell=True.
DEFAULT_GCLIENT_ROOT = '/usr/local/google/home/${USER}/chromeos/chromeos'


# Default path for individual host logs. Each host will have its own file. E.g.
# <default_log_path>/<host>.log
DEFAULT_LOG_PATH = '/tmp/mass_update_logs/%s/' % time.strftime(
    '%Y-%m-%d-%H-%M', time.gmtime())


# Location of Autotest cli executable.
AUTOTEST_LOCATION = '/home/chromeos-test/autotest/cli'


# Default time in seconds to sleep while waiting for threads to complete.
DEFAULT_SLEEP = 10


# Amount of time in seconds to wait before declaring an update as failed.
DEFAULT_TIMEOUT = 2400


class MassUpdateStatus(object):
    """Used to track status for all updates.

    Attributes:
      ssh_failures: list of hosts that could not be reached over SSH.
      update_failures: list of hosts whose update failed for another reason.
      successful_updates: number of hosts updated successfully.

    The Record* methods serialize writes with a lock so that several
    UpdateThread workers can report into one instance.
    """

    def __init__(self):
        # Per-instance containers; class-level lists would be shared by every
        # MassUpdateStatus object.
        self.ssh_failures = []
        self.update_failures = []
        self.successful_updates = 0
        self._lock = threading.Lock()

    def RecordSuccess(self):
        """Counts one more successfully updated host."""
        with self._lock:
            self.successful_updates += 1

    def RecordSshFailure(self, host):
        """Remembers host as one that could not be reached over SSH."""
        with self._lock:
            self.ssh_failures.append(host)

    def RecordUpdateFailure(self, host):
        """Remembers host as one whose update failed."""
        with self._lock:
            self.update_failures.append(host)


class UpdateThread(threading.Thread):
    """Responsible for ssh-test, locking, imaging, and unlocking a host.

    Uses the atest CLI for host control and the image_to_live script to
    actually update the host. Each thread will continue to process hosts until
    the queue is empty.
    """

    _SUCCESS = 0         # Update was successful.
    _SSH_FAILURE = 1     # Could not SSH to host or related SSH failure.
    _UPDATE_FAILURE = 2  # Update failed for any reason other than SSH.

    def __init__(self, options, hosts, status):
        """Stores the shared work queue and result tracker.

        Args:
          options: parsed command line options (see ParseOptions).
          hosts: deque of host names shared between all worker threads.
          status: MassUpdateStatus instance that collects the results.
        """
        self._options = options
        self._hosts = hosts
        self._status = status
        self._logger = logging.getLogger()
        threading.Thread.__init__(self)

    def run(self):
        """Pops hosts off the shared queue and updates them until it is empty."""
        while True:
            # Pop first and treat IndexError as "queue drained": testing the
            # deque and popping in two steps lets another worker take the last
            # host in between.
            try:
                host = self._hosts.popleft()
            except IndexError:
                return
            self._UpdateHost(host)

    def _UpdateHost(self, host):
        """Runs ssh-test, lock, image and unlock for a single host."""
        status = UpdateThread._UPDATE_FAILURE
        lock_attempted = False

        self._logger.info('Updating host %s', host)
        try:
            try:
                if not CheckSSH(host=host, options=self._options):
                    status = UpdateThread._SSH_FAILURE
                else:
                    lock_attempted = True
                    if LockHost(host) and ImageHost(host=host,
                                                    options=self._options):
                        status = UpdateThread._SUCCESS
            finally:
                try:
                    self._RecordResult(host, status)
                finally:
                    # Only touch the lock if this thread tried to take it, so a
                    # host that merely failed the SSH test is left alone.
                    if lock_attempted:
                        UnlockHost(host)
        except Exception:
            # An exception leaves status at _UPDATE_FAILURE, so the host has
            # already been recorded as failed above; move on to the next one.
            traceback.print_exc()
            self._logger.warning(
                'Exception encountered during update. Skipping host %s.', host)

    def _RecordResult(self, host, status):
        """Logs the outcome for host and stores it in the shared status."""
        if status == UpdateThread._SUCCESS:
            self._logger.info(
                'Completed update for host %s successfully.', host)
            self._status.RecordSuccess()
        elif status == UpdateThread._SSH_FAILURE:
            self._logger.info('Failed to SSH to host %s.', host)
            self._status.RecordSshFailure(host)
        else:
            self._logger.info('Failed to update host %s.', host)
            self._status.RecordUpdateFailure(host)


def _RunQuietly(cmd):
    """Runs cmd through the shell, discarding output.

    The output pipes are drained with communicate(); calling wait() with
    stdout/stderr set to PIPE can block forever once the child fills a pipe.

    Returns true if the command exited with status 0.
    """
    proc = subprocess.Popen(cmd,
                            shell=True,
                            stdout=subprocess.PIPE,
                            stderr=subprocess.PIPE)
    proc.communicate()
    return proc.returncode == 0


def CheckSSH(host, options):
    """Uses the ssh_test script to ensure SSH access to a host is available.

    Args:
      host: name of the host to test.
      options: parsed options; options.gclient is the ChromeOS checkout root.

    Returns true if an SSH connection to the host was successful.
    """
    return _RunQuietly(
        '%s/src/scripts/ssh_test.sh --remote=%s' % (options.gclient, host))


def ImageHost(host, options):
    """Uses the image_to_live script to update a host.

    stdout and stderr of the script are written to <options.log>/<host>.log
    and <options.log>/<host>.log.err respectively.

    Args:
      host: name of the host to update.
      options: parsed options; uses options.gclient, options.url, options.log.

    Returns true if the imaging process was successful.
    """
    # NOTE(review): /usr/local/scripts/alarm presumably aborts the command
    # after DEFAULT_TIMEOUT seconds -- confirm against that script.
    cmd = ('/usr/local/scripts/alarm %d %s/src/scripts/image_to_live.sh '
           '--update_url %s --remote %s' % (DEFAULT_TIMEOUT, options.gclient,
                                            options.url, host))

    out_path = os.path.join(options.log, host + '.log')
    err_path = os.path.join(options.log, host + '.log.err')

    # The with-statement closes both logs even if starting the process fails.
    with open(out_path, 'w') as log_file:
        with open(err_path, 'w') as log_file_err:
            exit_code = subprocess.Popen(cmd,
                                         shell=True,
                                         stdout=log_file,
                                         stderr=log_file_err).wait()

    return exit_code == 0


def LockHost(host):
    """Locks a host using the atest CLI.

    Locking a host tells Autotest that the host shouldn't be scheduled for
    any other tasks. Returns true if the locking process was successful.
    """
    success = _RunQuietly('%s/atest host mod -l %s' % (AUTOTEST_LOCATION, host))

    if not success:
        logging.getLogger().info('Failed to lock host %s.', host)

    return success


def UnlockHost(host):
    """Unlocks a host using the atest CLI.

    Unlocking a host tells Autotest that the host is okay to be scheduled
    for other tasks. Returns true if the unlocking process was successful.
    """
    success = _RunQuietly('%s/atest host mod -u %s' % (AUTOTEST_LOCATION, host))

    if not success:
        logging.getLogger().info('Failed to unlock host %s.', host)

    return success


def _ParseHostList(output):
    """Extracts host names from the tabular output of 'atest host list'.

    The first line holds the column labels and is dropped; the host name is
    the first whitespace-separated field of every remaining non-blank line.
    """
    hosts = deque()
    for row in output.splitlines()[1:]:
        fields = row.split()
        if fields:
            hosts.append(fields[0])
    return hosts


def GetHostQueue(options):
    """Returns a queue containing unlocked hosts retrieved from the atest CLI.

    If options.label has been specified only unlocked hosts with the specified
    label will be returned.
    """
    cmd = ('%s/atest host list --unlocked -s Ready -a acl_cros_test'
           % AUTOTEST_LOCATION)

    if options.label:
        cmd += ' -b ' + options.label

    # universal_newlines makes communicate() return text rather than bytes, so
    # the parsing below behaves the same on Python 2 and Python 3.
    stdout = subprocess.Popen(cmd,
                              shell=True,
                              stdout=subprocess.PIPE,
                              stderr=subprocess.PIPE,
                              universal_newlines=True).communicate()[0]

    return _ParseHostList(stdout)


def ParseOptions(argv=None):
    """Parses the command line.

    Args:
      argv: optional list of arguments to parse instead of sys.argv[1:].

    Returns the parsed options object. Exits through parser.error() when no
    update URL was given.
    """
    usage = 'usage: %prog --url=<update url> [options]'
    parser = OptionParser(usage)
    parser.add_option('-b', '--label', dest='label',
                      help='Only update hosts with the specified label.')
    # type='int' makes optparse hand back numbers (and reject non-numeric
    # input) instead of raw strings.
    parser.add_option('-c', '--concurrent', dest='concurrent', type='int',
                      default=DEFAULT_CONCURRENCY,
                      help=('Number of hosts to be updated concurrently. '
                            'Defaults to %d hosts.') % DEFAULT_CONCURRENCY)
    parser.add_option('-g', '--gclient', dest='gclient',
                      default=DEFAULT_GCLIENT_ROOT,
                      help=('Location of ChromeOS checkout. defaults to %s'
                            % DEFAULT_GCLIENT_ROOT))
    parser.add_option('-l', '--log', dest='log',
                      default=DEFAULT_LOG_PATH,
                      help=('Where to put individual host log files. '
                            'Defaults to %s' % DEFAULT_LOG_PATH))
    parser.add_option('-s', '--stagger', dest='stagger', type='int',
                      default=DEFAULT_STAGGER,
                      help=('Attempt to stagger updates. Waits the given amount '
                            'of time in minutes before starting each updater. '
                            'Updates will still overlap if the value is set as '
                            'a multiple of the update period.'))
    parser.add_option('-u', '--url', dest='url',
                      help='Update URL. Points to build for updating hosts.')

    options = parser.parse_args(argv)[0]

    if options.url is None:
        parser.error('An update URL must be provided.')

    return options


def InitializeLogging():
    """Configure the global logger for time/date stamping console output.

    Returns a logger object for convenience.
    """
    logger = logging.getLogger()
    logger.setLevel(logging.INFO)

    stream_handler = logging.StreamHandler()
    stream_handler.setLevel(logging.INFO)
    stream_handler.setFormatter(logging.Formatter('%(asctime)s - %(message)s'))
    logger.addHandler(stream_handler)
    return logger


def _LogSummary(logger, status):
    """Writes the final per-host summary for status to logger."""
    failed = len(status.ssh_failures) + len(status.update_failures)
    logger.info(
        'Mass updating complete. %d hosts updated successfully, %d failed.',
        status.successful_updates, failed)

    logger.info(('-' * 25) + '[ SUMMARY ]' + ('-' * 25))

    for host in status.ssh_failures:
        logger.info('Failed to SSH to host %s.', host)

    for host in status.update_failures:
        logger.info('Failed to update host %s.', host)

    if not status.ssh_failures and not status.update_failures:
        logger.info('All hosts updated successfully.')

    logger.info('-' * 61)


def main():
    """Updates every host returned by GetHostQueue and logs a summary."""
    options = ParseOptions()
    hosts = GetHostQueue(options)
    logger = InitializeLogging()
    status = MassUpdateStatus()

    # Create log folder if it doesn't exist.
    if not os.path.exists(options.log):
        os.makedirs(options.log)

    logger.info('Starting update using URL %s', options.url)
    logger.info('Individual host logs can be found under %s', options.log)

    # Convert once so the comparison below is numeric.
    stagger_seconds = int(options.stagger) * 60

    try:
        # Spawn processing threads which will handle lock, update, and unlock.
        for _ in range(int(options.concurrent)):
            UpdateThread(hosts=hosts, options=options, status=status).start()

            # Stagger threads if the option has been enabled.
            if stagger_seconds > 0:
                time.sleep(stagger_seconds)

        # Wait for all hosts to be processed and threads to complete. NOTE: Not
        # using hosts.join() here because it does not behave properly with
        # CTRL-C and KeyboardInterrupt.
        while len(threading.enumerate()) > 1:
            time.sleep(DEFAULT_SLEEP)
    except (KeyboardInterrupt, Exception):
        # KeyboardInterrupt is listed explicitly: CTRL-C must end up here too.
        traceback.print_exc()
        logger.warning(
            'Update process aborted. Some machines may be left locked or '
            'updating.')
        sys.exit(1)
    finally:
        _LogSummary(logger, status)


if __name__ == '__main__':
    main()