Home | History | Annotate | Download | only in chromeos_proxy
      1 #! /usr/bin/python
      2 
      3 # Copyright 2015 The Chromium OS Authors. All rights reserved.
      4 # Use of this source code is governed by a BSD-style license that can be
      5 # found in the LICENSE file.
      6 
      7 """
      8 Manage swarming bots.
      9 
     10 * Launch bots, e.g. 200 bots:
     11     $ swarming_bots.py launch --working_dir WORKING_DIR --id_range '1-200'
     12 
     13 * Kill bot 1-200:
     14     $ swarming_bots.py kill --working_dir WORKING_DIR --id_range '1-200'
     15 
     16 * Check bot 1-200, start if not running:
     17     $ swarming_bots.py check --working_dir WORKING_DIR --id_range '1-200'
     18 
     19 * The hierarchy of the working dir looks like:
     20   WORKING_DIR
     21     |-- bot_0
     22     |   |-- 092b5bd4562f579711823f61e311de37247c853a-cacert.pem
     23     |   |-- bot_config.log
     24     |   |-- swarming_bot.log
     25     |   |-- swarming_bot.zip
     26     |   |-- swarming_bot.pid
     27     |-- bot_1
     28         |-- 092b5bd4562f579711823f61e311de37247c853a-cacert.pem
     29         |-- bot_config.log
     30         |-- swarming_bot.log
     31         |-- swarming_bot.zip
     32         |-- swarming_bot.pid
     33   Note bot_config.py:get_dimensions() will rely on the bot number
     34   in the path to generate bot id.
     35 
     36 * TODO (fdeng):
     37     ** Restart a bot given a bot id.
     38 """
     39 import argparse
     40 import logging
     41 import logging.handlers
     42 import os
     43 import re
     44 import shutil
     45 import signal
     46 import socket
     47 import subprocess
     48 import sys
     49 import threading
     50 import time
     51 import urllib
     52 
     53 import common
     54 
     55 from autotest_lib.client.common_lib import global_config
     56 
     57 
     58 LOGGING_FORMAT = '%(asctime)s - %(levelname)s - %(message)s'
     59 LOG_FILE_SIZE = 1024 * 5000 # 5000 KB
     60 LOG_FILE_BACKUPCOUNT = 5
     61 DEFAULT_SWARMING_PROXY = global_config.global_config.get_config_value(
     62         'CROS', "swarming_proxy", default=None)
     63 ID_RANGE_FMT = r'(\d+)-(\d+)'
     64 KILL_PROC_TIMEOUT_SECS = 3600 * 3 # 3 hours
     65 MAX_KILL_PROC_SLEEP_SECS = 60
     66 
     67 
     68 class BotManagementError(Exception):
     69     """Raised for any bot management related error."""
     70 
     71 
     72 class PidMisMatchError(BotManagementError):
     73     """Raised if pid file doesn't match what's found by pgrep."""
     74 
     75     def __init__(self, known_pid, new_pid):
     76         """Initialize.
     77 
     78         @param known_pid: pid in the pid file.
     79         @param new_pid: new pid found by pgrep.
     80 
     81         """
     82         self.known_pid = known_pid
     83         self.new_pid = new_pid
     84         msg = 'pid does not match, pid: %s, found %s' % (
     85                 self.known_pid, self.new_pid)
     86         super(PidMisMatchError, self).__init__(msg)
     87 
     88 
     89 class DuplicateBotError(BotManagementError):
     90     """Raised when multiple processes are detected for the same bot id."""
     91 
     92 
     93 def get_hostname():
     94     return socket.getfqdn().split(u'.', 1)[0]
     95 
     96 
     97 class SwarmingBot(object):
     98     """Class represent a swarming bot."""
     99 
    100 
    101     PID_FILE = 'swarming_bot.pid'
    102     BOT_DIR_FORMAT = 'bot_%s'
    103     BOT_FILENAME = 'swarming_bot.zip'
    104     # Used to search for bot process
    105     # The process may bootstrap itself into swarming_bot.1.zip and swarming_bot.2.zip
    106     BOT_CMD_PATTERN = 'swarming_bot.*zip start_bot'
    107 
    108 
    109     def __init__(self, bot_id, parent_dir, swarming_proxy,
    110                  specify_bot_id=False):
    111         """Initialize.
    112 
    113         @param bot_id: An integer.
    114         @param bot_dir: The working directory for the bot.
    115                         The directory is used to store bot code,
    116                         log file, and any file generated by the bot
    117                         at run time.
    118         @param swarming_proxy: URL to the swarming instance.
    119         """
    120         self.bot_id = bot_id
    121         self.specify_bot_id = specify_bot_id
    122         if specify_bot_id:
    123             self.bot_id = '%s-%s' % (get_hostname(), str(self.bot_id))
    124 
    125         self.swarming_proxy = swarming_proxy
    126         self.parent_dir = os.path.abspath(os.path.expanduser(parent_dir))
    127         self.bot_dir = os.path.join(self.parent_dir,
    128                                     self.BOT_DIR_FORMAT % self.bot_id)
    129         self.pid_file = os.path.join(self.bot_dir, self.PID_FILE)
    130         self.pid = None
    131         self._refresh_pid()
    132         if self.pid is None:
    133             logging.debug('[Bot %s] Initialize: bot is not running',
    134                           self.bot_id)
    135         else:
    136             logging.debug('[Bot %s] Initialize: bot is running '
    137                           'as process %s', self.bot_id, self.pid)
    138 
    139 
    140     def _write_pid(self):
    141         """Write pid to file"""
    142         with open(self.pid_file, 'w') as f:
    143             f.write(str(self.pid))
    144 
    145 
    146     def _cleanup_pid(self):
    147         """Cleanup self.pid and pid file."""
    148         self.pid = None
    149         if os.path.exists(self.pid_file):
    150             os.remove(self.pid_file)
    151 
    152 
    153     def _is_process_running(self):
    154         """Check if the process is running."""
    155         pattern = os.path.join(self.bot_dir, self.BOT_CMD_PATTERN)
    156         pattern = '%s %s' % (sys.executable, pattern)
    157         cmd = ['pgrep', '-f', pattern]
    158         logging.debug('[Bot %s] check process running: %s',
    159                       self.bot_id, str(cmd))
    160         try:
    161             output = subprocess.check_output(cmd)
    162             pids = output.splitlines()
    163             if len(pids) > 1:
    164                 raise DuplicateBotError('Multiple processes (pid: %s) detected for Bot %s'
    165                                         % (str(pids), self.bot_id))
    166             pid = int(pids[0])
    167             if pid != self.pid:
    168                 raise PidMisMatchError(self.pid, pid)
    169             return True
    170         except subprocess.CalledProcessError as e:
    171             if e.returncode == 1:
    172                 return False
    173             else:
    174                 raise
    175 
    176 
    177     def _refresh_pid(self):
    178         """Check process status and update self.pid accordingly."""
    179         # Reload pid from pid file.
    180         if os.path.exists(self.pid_file):
    181             with open(self.pid_file) as f:
    182                 try:
    183                     pid = f.readline().strip()
    184                     self.pid = int(pid)
    185                 except ValueError as e:
    186                     self.pid = None
    187         try:
    188             if not self._is_process_running():
    189                 self._cleanup_pid()
    190         except PidMisMatchError as e:
    191             logging.error('[Bot %s] %s, updating pid file',
    192                           self.bot_id, str(e))
    193             self.pid = e.new_pid
    194             self._write_pid()
    195 
    196 
    197     def is_running(self):
    198         """Return if the bot is running."""
    199         self._refresh_pid()
    200         return bool(self.pid)
    201 
    202 
    203     def ensure_running(self):
    204         """Start a swarming bot."""
    205         if self.is_running():
    206             logging.info(
    207                     '[Bot %s] Skip start, bot is already running (pid %s).',
    208                     self.bot_id, self.pid)
    209             return
    210         logging.debug('[Bot %s] Bootstrap bot in %s', self.bot_id, self.bot_dir)
    211         if os.path.exists(self.bot_dir):
    212             shutil.rmtree(self.bot_dir)
    213         os.makedirs(self.bot_dir)
    214         dest = os.path.join(self.bot_dir, self.BOT_FILENAME)
    215         new_env = dict(os.environ)
    216         new_env['SWARMING_EXTERNAL_BOT_SETUP'] = '1'
    217         logging.debug('[Bot %s] Getting bot code from: %s/bot_code',
    218                       self.bot_id, self.swarming_proxy)
    219         if self.specify_bot_id:
    220             url = '%s/bot_code?bot_id=%s' % (self.swarming_proxy, self.bot_id)
    221             new_env['SWARMING_BOT_ID'] = self.bot_id
    222         else:
    223             url = '%s/bot_code' % self.swarming_proxy
    224 
    225         logging.info('Download bot code from %s', url)
    226         urllib.urlretrieve(url, dest)
    227         cmd = [sys.executable, self.BOT_FILENAME]
    228         logging.debug('[Bot %s] Calling command: %s', self. bot_id, cmd)
    229         process = subprocess.Popen(
    230                 cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
    231                 cwd=self.bot_dir, env=new_env)
    232         self.pid = process.pid
    233         self._write_pid()
    234         logging.info('[Bot %s] Created bot (pid: %d)', self.bot_id, self.pid)
    235 
    236 
    237     def kill(self):
    238         """Kill the bot."""
    239         if not self.is_running():
    240             logging.info('[Bot %s] Skip killing bot, Bot is not running',
    241                           self.bot_id)
    242             return
    243         try:
    244             logging.info('[Bot %s] killing bot (pid: %d)',
    245                           self.bot_id, self.pid)
    246             os.kill(self.pid, signal.SIGTERM)
    247             start = time.time()
    248             sleep = 1
    249             while(time.time() - start < KILL_PROC_TIMEOUT_SECS):
    250                 if not self.is_running():
    251                     return
    252                 sleep = min(sleep * 2, MAX_KILL_PROC_SLEEP_SECS)
    253                 logging.debug('[Bot %s] Waiting %d secs for bot to finish'
    254                               ' any running task and exist.',
    255                               self.bot_id, sleep)
    256                 time.sleep(sleep)
    257             else:
    258                 logging.error(
    259                         '[Bot %s] Failed to kill pid %s within %d secs, '
    260                         'the bot may be running a long running task, you '
    261                         'can retry the script. SIGKILL the process is not '
    262                         'recommended, it might lead to unexpected error.',
    263                         self.bot_id, self.pid, KILL_PROC_TIMEOUT_SECS)
    264         except Exception as e:
    265             raise BotManagementError('[Bot %s] %s' % (self.bot_id, str(e)))
    266 
    267 
    268 class BotManager(object):
    269     """Class that manages swarming bots."""
    270 
    271 
    272     CHECK_BOTS_PATTERN = '{executable} {working_dir}.*{bot_cmd_pattern}'
    273 
    274 
    275     def __init__(self, bot_ids, working_dir, swarming_proxy,
    276                  specify_bot_id=False):
    277         """Initialize.
    278 
    279         @param bot_ids: a set of integers.
    280         @param working_dir: Working directory of the bots.
    281                             Store temporary files.
    282         @param swarming_proxy: The swarming instance to talk to.
    283         """
    284         self.bot_ids = bot_ids
    285         self.working_dir = os.path.abspath(os.path.expanduser(working_dir))
    286         self.bots = [SwarmingBot(bid, self.working_dir, swarming_proxy,
    287                                  specify_bot_id)
    288                      for bid in bot_ids]
    289 
    290     def launch(self):
    291         """Launch bots."""
    292         for bot in self.bots:
    293           try:
    294               bot.ensure_running()
    295           except BotManagementError as e:
    296               logging.error('[BotManager] Failed to start Bot %s: %s',
    297                             bot.bot_id, str(e))
    298         # If we let the process exit immediately, the last process won't
    299         # be launched sometimes. So sleep for 3 secs.
    300         # The right way is to query the server until all bots are seen
    301         # by the server by visiting
    302         # https://SWARMING_PROXY/swarming/api/v1/client/bots
    303         # However, this would require oauth authentication (install
    304         # oauth library and install credentials).
    305         logging.info('Wait 3 seconds for process creation to complete.')
    306         time.sleep(3)
    307 
    308 
    309     def kill(self):
    310         """Kill running bots."""
    311         # Start threads to kill bots.
    312         threads = []
    313         for bot in self.bots:
    314             t = threading.Thread(target=bot.kill)
    315             threads.append(t)
    316             t.setDaemon(True)
    317             t.start()
    318         # Wait on threads.
    319         try:
    320             while threading.active_count() > 1:
    321                 time.sleep(0.1)
    322         except KeyboardInterrupt:
    323             msg = 'Ctrl-c recieved! Bots status not confirmed. Exit.'
    324             logging.error(msg)
    325             print msg
    326 
    327 
    328     def check(self):
    329         """Check running bots, start it if not running."""
    330         pattern =  self.CHECK_BOTS_PATTERN.format(
    331                 executable=sys.executable, working_dir=self.working_dir,
    332                 bot_cmd_pattern=SwarmingBot.BOT_CMD_PATTERN)
    333         cmd = ['pgrep', '-f', pattern]
    334         logging.debug('[BotManager] Check bot counts: %s', str(cmd))
    335         try:
    336             output = subprocess.check_output(cmd)
    337             bot_count = len(output.splitlines())
    338         except subprocess.CalledProcessError as e:
    339             if e.returncode == 1:
    340                 bot_count = 0
    341             else:
    342                 raise
    343         missing_count = len(self.bot_ids) - bot_count
    344         logging.info(
    345                 '[BotManager] Check bot counts: %d bots running, missing: %d',
    346                 bot_count, missing_count)
    347         if missing_count > 0:
    348             logging.info('[BotManager] Checking all bots')
    349             self.launch()
    350 
    351 
    352 def parse_range(id_range):
    353     """Convert an id range to a set of bot ids.
    354 
    355     @param id_range: A range of integer, e.g "1-200".
    356 
    357     @returns a set of bot ids set([1,2,...200])
    358     """
    359     m = re.match(ID_RANGE_FMT, id_range)
    360     if not m:
    361         raise ValueError('Could not parse %s' % id_range)
    362     min, max = int(m.group(1)), int(m.group(2))
    363     return set(bid for bid in range(min, max+1))
    364 
    365 
    366 def _parse_args(args):
    367     """Parse args.
    368 
    369     @param args: Argument list passed from main.
    370 
    371     @return: A tuple with the parsed args, as returned by parser.parse_args.
    372     """
    373     parser = argparse.ArgumentParser(
    374             description='Launch swarming bots on a autotest server')
    375     action_help = ('launch: launch bots. '
    376                   'kill: kill bots. '
    377                   'check: check if bots are running, if not, starting bots.')
    378     parser.add_argument(
    379             'action', choices=('launch', 'kill', 'check'), help=action_help)
    380     parser.add_argument(
    381             '-r', '--id_range', type=str, dest='id_range', required=True,
    382             help='A range of integer, each bot created will be labeled '
    383                  'with an id from this range. E.g. "1-200"')
    384     parser.add_argument(
    385             '-d', '--working_dir', type=str, dest='working_dir', required=True,
    386             help='A working directory where bots will store files '
    387                  'generated at runtime')
    388     parser.add_argument(
    389             '-p', '--swarming_proxy', type=str, dest='swarming_proxy',
    390             default=DEFAULT_SWARMING_PROXY,
    391             help='The URL of the swarming instance to talk to, '
    392                  'Default to the one specified in global config')
    393     parser.add_argument(
    394             '-f', '--log_file', dest='log_file',
    395             help='Path to the log file.')
    396     parser.add_argument(
    397             '-v', '--verbose', dest='verbose', action='store_true',
    398             help='Verbose mode')
    399 
    400     return parser.parse_args(args)
    401 
    402 
    403 def setup_logging(verbose, log_file):
    404     """Setup logging.
    405 
    406     @param verbose: bool, if True, log at DEBUG level, otherwise INFO level.
    407     @param log_file; path to log file.
    408     """
    409     log_formatter = logging.Formatter(LOGGING_FORMAT)
    410     if not log_file:
    411         handler = logging.StreamHandler()
    412     else:
    413         handler = logging.handlers.RotatingFileHandler(
    414                 filename=log_file, maxBytes=LOG_FILE_SIZE,
    415                 backupCount=LOG_FILE_BACKUPCOUNT)
    416     handler.setFormatter(log_formatter)
    417     logger = logging.getLogger()
    418     log_level = logging.DEBUG if verbose else logging.INFO
    419     logger.setLevel(log_level)
    420     logger.addHandler(handler)
    421 
    422 
    423 def main(args):
    424     """Main.
    425 
    426     @args: A list of system arguments.
    427     """
    428     args = _parse_args(args)
    429     setup_logging(args.verbose, args.log_file)
    430 
    431     if not args.swarming_proxy:
    432         logging.error(
    433                 'No swarming proxy instance specified. '
    434                 'Specify swarming_proxy in [CROS] in shadow_config, '
    435                 'or use --swarming_proxy')
    436         return 1
    437     if not args.swarming_proxy.startswith('https://'):
    438         swarming_proxy = 'https://' + args.swarming_proxy
    439     else:
    440         swarming_proxy = args.swarming_proxy
    441 
    442     logging.info('Connecting to %s', swarming_proxy)
    443     m = BotManager(parse_range(args.id_range),
    444                    args.working_dir, args.swarming_proxy)
    445 
    446     if args.action == 'launch':
    447         m.launch()
    448     elif args.action == 'kill':
    449         m.kill()
    450     elif args.action == 'check':
    451         m.check()
    452 
    453 
    454 if __name__ == '__main__':
    455     sys.exit(main(sys.argv[1:]))
    456