#! /usr/bin/python

# Copyright 2015 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""
Manage swarming bots.

* Launch bots, e.g. 200 bots:
    $ swarming_bots.py launch --working_dir WORKING_DIR --id_range '1-200'

* Kill bot 1-200:
    $ swarming_bots.py kill --working_dir WORKING_DIR --id_range '1-200'

* Check bot 1-200, start if not running:
    $ swarming_bots.py check --working_dir WORKING_DIR --id_range '1-200'

* The hierarchy of working dir is like
    WORKING_DIR
    |-- bot_0
    |   |-- 092b5bd4562f579711823f61e311de37247c853a-cacert.pem
    |   |-- bot_config.log
    |   |-- swarming_bot.log
    |   |-- swarming_bot.zip
    |   |-- swarming_bot.pid
    |-- bot_1
        |-- 092b5bd4562f579711823f61e311de37247c853a-cacert.pem
        |-- bot_config.log
        |-- swarming_bot.log
        |-- swarming_bot.zip
        |-- swarming_bot.pid
  Note bot_config.py:get_dimensions() will rely on the bot number
  in the path to generate bot id.

* TODO (fdeng):
  ** Restart a bot given a bot id.
"""
import argparse
import errno
import logging
import logging.handlers
import os
import re
import shutil
import signal
import subprocess
import sys
import threading
import time
import urllib

import common

from autotest_lib.client.common_lib import global_config


LOGGING_FORMAT = '%(asctime)s - %(levelname)s - %(message)s'
LOG_FILE_SIZE = 1024 * 5000  # 5000 KB
LOG_FILE_BACKUPCOUNT = 5
DEFAULT_SWARMING_PROXY = global_config.global_config.get_config_value(
        'CROS', "swarming_proxy", default=None)
ID_RANGE_FMT = r'(\d+)-(\d+)'
KILL_PROC_TIMEOUT_SECS = 3600 * 3  # 3 hours
MAX_KILL_PROC_SLEEP_SECS = 60


class BotManagementError(Exception):
    """Raised for any bot management related error."""


class PidMisMatchError(BotManagementError):
    """Raised if pid file doesn't match what's found by pgrep."""

    def __init__(self, known_pid, new_pid):
        """Initialize.

        @param known_pid: pid in the pid file.
        @param new_pid: new pid found by pgrep.

        """
        self.known_pid = known_pid
        self.new_pid = new_pid
        msg = 'pid does not match, pid: %s, found %s' % (
                self.known_pid, self.new_pid)
        super(PidMisMatchError, self).__init__(msg)


class DuplicateBotError(BotManagementError):
    """Raised when multiple processes are detected for the same bot id."""


class SwarmingBot(object):
    """Class represent a swarming bot."""


    PID_FILE = 'swarming_bot.pid'
    BOT_DIR_FORMAT = 'bot_%s'
    BOT_FILENAME = 'swarming_bot.zip'
    # Used to search for bot process.
    # The process may bootstrap itself into swarming_bot.1.zip and
    # swarming_bot.2.zip, hence the wildcard.
    BOT_CMD_PATTERN = 'swarming_bot.*zip start_bot'


    def __init__(self, bot_id, parent_dir, swarming_proxy):
        """Initialize.

        @param bot_id: An integer.
        @param parent_dir: Directory under which this bot's working directory
                (parent_dir/bot_<bot_id>) lives. The bot's working directory
                is used to store bot code, log file, and any file generated
                by the bot at run time.
        @param swarming_proxy: URL to the swarming instance.
        """
        self.bot_id = bot_id
        self.swarming_proxy = swarming_proxy
        self.parent_dir = os.path.abspath(os.path.expanduser(parent_dir))
        self.bot_dir = os.path.join(self.parent_dir,
                                    self.BOT_DIR_FORMAT % self.bot_id)
        self.pid_file = os.path.join(self.bot_dir, self.PID_FILE)
        self.pid = None
        self._refresh_pid()
        if self.pid is None:
            logging.debug('[Bot %s] Initialize: bot is not running',
                          self.bot_id)
        else:
            logging.debug('[Bot %s] Initialize: bot is running '
                          'as process %s', self.bot_id, self.pid)


    def _write_pid(self):
        """Write self.pid to the pid file."""
        with open(self.pid_file, 'w') as f:
            f.write(str(self.pid))


    def _cleanup_pid(self):
        """Cleanup self.pid and pid file."""
        self.pid = None
        if os.path.exists(self.pid_file):
            os.remove(self.pid_file)


    def _is_process_running(self):
        """Check if the bot process is running, using pgrep.

        @returns True if exactly one matching process exists and its pid
                equals self.pid; False if no matching process exists.

        @raises DuplicateBotError: if more than one matching process exists.
        @raises PidMisMatchError: if one process exists but under a pid
                different from self.pid.
        @raises subprocess.CalledProcessError: if pgrep fails with an exit
                code other than 1 ("no process matched").
        """
        pattern = os.path.join(self.bot_dir, self.BOT_CMD_PATTERN)
        pattern = '%s %s' % (sys.executable, pattern)
        cmd = ['pgrep', '-f', pattern]
        logging.debug('[Bot %s] check process running: %s',
                      self.bot_id, str(cmd))
        try:
            output = subprocess.check_output(cmd)
        except subprocess.CalledProcessError as e:
            # pgrep exits with 1 when nothing matched.
            if e.returncode == 1:
                return False
            raise
        pids = output.splitlines()
        if len(pids) > 1:
            raise DuplicateBotError(
                    'Multiple processes (pid: %s) detected for Bot %s'
                    % (str(pids), self.bot_id))
        pid = int(pids[0])
        if pid != self.pid:
            raise PidMisMatchError(self.pid, pid)
        return True


    def _refresh_pid(self):
        """Check process status and update self.pid accordingly.

        Reloads the pid from the pid file (when the file exists), then checks
        the live process. If no process is found, self.pid and the pid file
        are cleared; if the process runs under a different pid, self.pid and
        the pid file are updated to that pid.

        @raises DuplicateBotError: if more than one bot process is found.
        """
        # Reload pid from pid file.
        if os.path.exists(self.pid_file):
            with open(self.pid_file) as f:
                try:
                    self.pid = int(f.readline().strip())
                except ValueError:
                    # Empty or corrupted pid file.
                    self.pid = None
        try:
            if not self._is_process_running():
                self._cleanup_pid()
        except PidMisMatchError as e:
            logging.error('[Bot %s] %s, updating pid file',
                          self.bot_id, str(e))
            self.pid = e.new_pid
            self._write_pid()


    def is_running(self):
        """Return if the bot is running (refreshing the pid first)."""
        self._refresh_pid()
        return bool(self.pid)


    def ensure_running(self):
        """Start the swarming bot unless it is already running.

        Wipes and recreates the bot directory, downloads the bot code from
        <swarming_proxy>/bot_code, starts it as a child process and records
        its pid in the pid file.

        @raises BotManagementError: if the bot directory cannot be prepared,
                the bot code cannot be downloaded, or the process cannot be
                started.
        """
        if self.is_running():
            logging.info(
                    '[Bot %s] Skip start, bot is already running (pid %s).',
                    self.bot_id, self.pid)
            return
        logging.debug('[Bot %s] Bootstrap bot in %s', self.bot_id, self.bot_dir)
        try:
            if os.path.exists(self.bot_dir):
                shutil.rmtree(self.bot_dir)
            os.makedirs(self.bot_dir)
            dest = os.path.join(self.bot_dir, self.BOT_FILENAME)
            logging.debug('[Bot %s] Getting bot code from: %s/bot_code',
                          self.bot_id, self.swarming_proxy)
            urllib.urlretrieve('%s/bot_code' % self.swarming_proxy, dest)
            cmd = [sys.executable, self.BOT_FILENAME]
            logging.debug('[Bot %s] Calling command: %s', self.bot_id, cmd)
            # The bot outlives this script, so its output must not go to a
            # pipe that nobody reads: the bot would block once the pipe
            # buffer fills, and its writes would fail after this script exits.
            with open(os.devnull, 'w') as devnull:
                process = subprocess.Popen(
                        cmd, stdout=devnull, stderr=subprocess.STDOUT,
                        cwd=self.bot_dir)
        except (IOError, OSError) as e:
            # Surface as BotManagementError so launch() logs this bot's
            # failure and keeps going with the remaining bots.
            raise BotManagementError(
                    'Failed to bootstrap bot in %s: %s' % (self.bot_dir, e))
        self.pid = process.pid
        self._write_pid()
        logging.info('[Bot %s] Created bot (pid: %d)', self.bot_id, self.pid)


    def kill(self):
        """Send SIGTERM to the bot and wait for it to exit.

        Polls with exponential backoff (capped at MAX_KILL_PROC_SLEEP_SECS)
        for up to KILL_PROC_TIMEOUT_SECS, giving a bot that is running a task
        time to finish. On timeout an error is logged; SIGKILL is not sent.

        @raises BotManagementError: on any error while killing the bot.
        """
        if not self.is_running():
            logging.info('[Bot %s] Skip killing bot, Bot is not running',
                         self.bot_id)
            return
        try:
            logging.info('[Bot %s] killing bot (pid: %d)',
                         self.bot_id, self.pid)
            try:
                os.kill(self.pid, signal.SIGTERM)
            except OSError as e:
                # The process exited between is_running() and os.kill();
                # that is the outcome we want, so don't treat it as an error.
                if e.errno != errno.ESRCH:
                    raise
            start = time.time()
            sleep = 1
            while time.time() - start < KILL_PROC_TIMEOUT_SECS:
                if not self.is_running():
                    return
                sleep = min(sleep * 2, MAX_KILL_PROC_SLEEP_SECS)
                logging.debug('[Bot %s] Waiting %d secs for bot to finish'
                              ' any running task and exit.',
                              self.bot_id, sleep)
                time.sleep(sleep)
            # Only reached on timeout: the loop returns once the bot is gone.
            logging.error(
                    '[Bot %s] Failed to kill pid %s within %d secs, '
                    'the bot may be running a long running task, you '
                    'can retry the script. SIGKILL the process is not '
                    'recommended, it might lead to unexpected error.',
                    self.bot_id, self.pid, KILL_PROC_TIMEOUT_SECS)
        except Exception as e:
            raise BotManagementError('[Bot %s] %s' % (self.bot_id, str(e)))


class BotManager(object):
    """Class that manages swarming bots."""


    CHECK_BOTS_PATTERN = '{executable} {working_dir}.*{bot_cmd_pattern}'


    def __init__(self, bot_ids, working_dir, swarming_proxy):
        """Initialize.

        @param bot_ids: a set of integers.
        @param working_dir: Working directory of the bots.
                            Store temporary files.
        @param swarming_proxy: The swarming instance to talk to.
        """
        self.bot_ids = bot_ids
        self.working_dir = os.path.abspath(os.path.expanduser(working_dir))
        self.bots = [SwarmingBot(bid, self.working_dir, swarming_proxy)
                     for bid in bot_ids]


    def launch(self):
        """Launch bots that are not already running."""
        for bot in self.bots:
            try:
                bot.ensure_running()
            except BotManagementError as e:
                logging.error('[BotManager] Failed to start Bot %s: %s',
                              bot.bot_id, str(e))
        # If we let the process exit immediately, the last process won't
        # be launched sometimes. So sleep for 3 secs.
        # The right way is to query the server until all bots are seen
        # by the server by visiting
        # https://SWARMING_PROXY/swarming/api/v1/client/bots
        # However, this would require oauth authentication (install
        # oauth library and install credentials).
        logging.info('Wait 3 seconds for process creation to complete.')
        time.sleep(3)


    @staticmethod
    def _kill_bot(bot):
        """Kill one bot, logging failures instead of dying in the thread."""
        try:
            bot.kill()
        except BotManagementError as e:
            logging.error('[BotManager] Failed to kill Bot %s: %s',
                          bot.bot_id, str(e))


    def kill(self):
        """Kill running bots in parallel, one thread per bot."""
        threads = []
        for bot in self.bots:
            t = threading.Thread(target=self._kill_bot, args=(bot,))
            # Daemon threads don't keep the script alive after Ctrl-C.
            t.daemon = True
            t.start()
            threads.append(t)
        # Wait only on our own threads. Poll with short sleeps rather than
        # blocking in join(), so that Ctrl-C is noticed promptly.
        try:
            while any(t.is_alive() for t in threads):
                time.sleep(0.1)
        except KeyboardInterrupt:
            msg = 'Ctrl-c received! Bots status not confirmed. Exit.'
            logging.error(msg)
            print(msg)


    def check(self):
        """Check running bots, start missing ones if any are not running."""
        pattern = self.CHECK_BOTS_PATTERN.format(
                executable=sys.executable, working_dir=self.working_dir,
                bot_cmd_pattern=SwarmingBot.BOT_CMD_PATTERN)
        cmd = ['pgrep', '-f', pattern]
        logging.debug('[BotManager] Check bot counts: %s', str(cmd))
        try:
            output = subprocess.check_output(cmd)
            bot_count = len(output.splitlines())
        except subprocess.CalledProcessError as e:
            # pgrep exits with 1 when nothing matched.
            if e.returncode == 1:
                bot_count = 0
            else:
                raise
        missing_count = len(self.bot_ids) - bot_count
        logging.info(
                '[BotManager] Check bot counts: %d bots running, missing: %d',
                bot_count, missing_count)
        if missing_count > 0:
            logging.info('[BotManager] Checking all bots')
            self.launch()


def _parse_range(id_range):
    """Convert an id range to a set of bot ids.

    @param id_range: A range of integer, e.g "1-200". Both ends are inclusive.

    @returns a set of bot ids set([1,2,...200])

    @raises ValueError: if id_range is not exactly "START-END", or if START
            is greater than END.
    """
    # Anchor at the end so trailing junk such as "1-200x" is rejected.
    m = re.match(ID_RANGE_FMT + '$', id_range.strip())
    if not m:
        raise ValueError('Could not parse %s' % id_range)
    start, end = int(m.group(1)), int(m.group(2))
    if start > end:
        raise ValueError('Invalid range %s: start is greater than end'
                         % id_range)
    return set(range(start, end + 1))


def _parse_args(args):
    """Parse args.

    @param args: Argument list passed from main.

    @return: A tuple with the parsed args, as returned by parser.parse_args.
    """
    parser = argparse.ArgumentParser(
            description='Launch swarming bots on a autotest server')
    action_help = ('launch: launch bots. '
                   'kill: kill bots. '
                   'check: check if bots are running, if not, starting bots.')
    parser.add_argument(
            'action', choices=('launch', 'kill', 'check'), help=action_help)
    parser.add_argument(
            '-r', '--id_range', type=str, dest='id_range', required=True,
            help='A range of integer, each bot created will be labeled '
                 'with an id from this range. E.g. "1-200"')
    parser.add_argument(
            '-d', '--working_dir', type=str, dest='working_dir', required=True,
            help='A working directory where bots will store files '
                 'generated at runtime')
    parser.add_argument(
            '-p', '--swarming_proxy', type=str, dest='swarming_proxy',
            default=DEFAULT_SWARMING_PROXY,
            help='The URL of the swarming instance to talk to, '
                 'Default to the one specified in global config')
    parser.add_argument(
            '-f', '--log_file', dest='log_file', required=False,
            help='Path to the log file.')
    parser.add_argument(
            '-v', '--verbose', dest='verbose', action='store_true',
            help='Verbose mode')

    return parser.parse_args(args)


def _setup_logging(verbose, log_file):
    """Setup logging on the root logger.

    @param verbose: bool, if True, log at DEBUG level, otherwise INFO level.
    @param log_file: path to log file. If empty, log to stderr instead of a
            rotating file.
    """
    log_formatter = logging.Formatter(LOGGING_FORMAT)
    if not log_file:
        handler = logging.StreamHandler()
    else:
        handler = logging.handlers.RotatingFileHandler(
                filename=log_file, maxBytes=LOG_FILE_SIZE,
                backupCount=LOG_FILE_BACKUPCOUNT)
    handler.setFormatter(log_formatter)
    logger = logging.getLogger()
    log_level = logging.DEBUG if verbose else logging.INFO
    logger.setLevel(log_level)
    logger.addHandler(handler)


def main(args):
    """Main.

    @param args: A list of system arguments.

    @returns 0 on success, 1 on a configuration/argument error.
    """
    args = _parse_args(args)
    _setup_logging(args.verbose, args.log_file)

    if not args.swarming_proxy:
        logging.error(
                'No swarming proxy instance specified. '
                'Specify swarming_proxy in [CROS] in shadow_config, '
                'or use --swarming_proxy')
        return 1
    if not args.swarming_proxy.startswith('https://'):
        swarming_proxy = 'https://' + args.swarming_proxy
    else:
        swarming_proxy = args.swarming_proxy

    try:
        bot_ids = _parse_range(args.id_range)
    except ValueError as e:
        logging.error('Invalid --id_range: %s', str(e))
        return 1

    logging.info('Connecting to %s', swarming_proxy)
    # Pass the normalized URL: bots download their code from it.
    m = BotManager(bot_ids, args.working_dir, swarming_proxy)

    if args.action == 'launch':
        m.launch()
    elif args.action == 'kill':
        m.kill()
    elif args.action == 'check':
        m.check()
    return 0


if __name__ == '__main__':
    sys.exit(main(sys.argv[1:]))