Home | History | Annotate | Download | only in chromeos_proxy
      1 #! /usr/bin/python
      2 
      3 # Copyright 2015 The Chromium OS Authors. All rights reserved.
      4 # Use of this source code is governed by a BSD-style license that can be
      5 # found in the LICENSE file.
      6 
      7 """
      8 Manage swarming bots.
      9 
     10 * Launch bots, e.g. 200 bots:
     11     $ swarming_bots.py launch --working_dir WORKING_DIR --id_range '1-200'
     12 
     13 * Kill bot 1-200:
     14     $ swarming_bots.py kill --working_dir WORKING_DIR --id_range '1-200'
     15 
     16 * Check bot 1-200, start if not running:
     17     $ swarming_bots.py check --working_dir WORKING_DIR --id_range '1-200'
     18 
     19 * The hierarchy of working dir is like
     20   WORKING_DIR
     21     |-- bot_0
     22     |   |-- 092b5bd4562f579711823f61e311de37247c853a-cacert.pem
     23     |   |-- bot_config.log
     24     |   |-- swarming_bot.log
     25     |   |-- swarming_bot.zip
     26     |   |-- swarming_bot.pid
     27     |-- bot_1
     28         |-- 092b5bd4562f579711823f61e311de37247c853a-cacert.pem
     29         |-- bot_config.log
     30         |-- swarming_bot.log
     31         |-- swarming_bot.zip
     32         |-- swarming_bot.pid
     33   Note bot_config.py:get_dimensions() will rely on the bot number
     34   in the path to generate bot id.
     35 
     36 * TODO (fdeng):
     37     ** Restart a bot given a bot id.
     38 """
     39 import argparse
     40 import logging
     41 import logging.handlers
     42 import os
     43 import re
     44 import shutil
     45 import signal
     46 import subprocess
     47 import sys
     48 import threading
     49 import time
     50 import urllib
     51 
     52 import common
     53 
     54 from autotest_lib.client.common_lib import global_config
     55 
     56 
     57 LOGGING_FORMAT = '%(asctime)s - %(levelname)s - %(message)s'
     58 LOG_FILE_SIZE = 1024 * 5000 # 5000 KB
     59 LOG_FILE_BACKUPCOUNT = 5
     60 DEFAULT_SWARMING_PROXY = global_config.global_config.get_config_value(
     61         'CROS', "swarming_proxy", default=None)
     62 ID_RANGE_FMT = r'(\d+)-(\d+)'
     63 KILL_PROC_TIMEOUT_SECS = 3600 * 3 # 3 hours
     64 MAX_KILL_PROC_SLEEP_SECS = 60
     65 
     66 
     67 class BotManagementError(Exception):
     68     """Raised for any bot management related error."""
     69 
     70 
     71 class PidMisMatchError(BotManagementError):
     72     """Raised if pid file doesn't match what's found by pgrep."""
     73 
     74     def __init__(self, known_pid, new_pid):
     75         """Initialize.
     76 
     77         @param known_pid: pid in the pid file.
     78         @param new_pid: new pid found by pgrep.
     79 
     80         """
     81         self.known_pid = known_pid
     82         self.new_pid = new_pid
     83         msg = 'pid does not match, pid: %s, found %s' % (
     84                 self.known_pid, self.new_pid)
     85         super(PidMisMatchError, self).__init__(msg)
     86 
     87 
     88 class DuplicateBotError(BotManagementError):
     89     """Raised when multiple processes are detected for the same bot id."""
     90 
     91 
     92 class SwarmingBot(object):
     93     """Class represent a swarming bot."""
     94 
     95 
     96     PID_FILE = 'swarming_bot.pid'
     97     BOT_DIR_FORMAT = 'bot_%s'
     98     BOT_FILENAME = 'swarming_bot.zip'
     99     # Used to search for bot process
    100     # The process may bootstrap itself into swarming_bot.1.zip and swarming_bot.2.zip
    101     BOT_CMD_PATTERN = 'swarming_bot.*zip start_bot'
    102 
    103 
    104     def __init__(self, bot_id, parent_dir, swarming_proxy):
    105         """Initialize.
    106 
    107         @param bot_id: An integer.
    108         @param bot_dir: The working directory for the bot.
    109                         The directory is used to store bot code,
    110                         log file, and any file generated by the bot
    111                         at run time.
    112         @param swarming_proxy: URL to the swarming instance.
    113         """
    114         self.bot_id = bot_id
    115         self.swarming_proxy = swarming_proxy
    116         self.parent_dir = os.path.abspath(os.path.expanduser(parent_dir))
    117         self.bot_dir = os.path.join(self.parent_dir,
    118                                     self.BOT_DIR_FORMAT % self.bot_id)
    119         self.pid_file = os.path.join(self.bot_dir, self.PID_FILE)
    120         self.pid = None
    121         self._refresh_pid()
    122         if self.pid is None:
    123             logging.debug('[Bot %s] Initialize: bot is not running',
    124                           self.bot_id)
    125         else:
    126             logging.debug('[Bot %s] Initialize: bot is running '
    127                           'as process %s', self.bot_id, self.pid)
    128 
    129 
    130     def _write_pid(self):
    131         """Write pid to file"""
    132         with open(self.pid_file, 'w') as f:
    133             f.write(str(self.pid))
    134 
    135 
    136     def _cleanup_pid(self):
    137         """Cleanup self.pid and pid file."""
    138         self.pid = None
    139         if os.path.exists(self.pid_file):
    140             os.remove(self.pid_file)
    141 
    142 
    143     def _is_process_running(self):
    144         """Check if the process is running."""
    145         pattern = os.path.join(self.bot_dir, self.BOT_CMD_PATTERN)
    146         pattern = '%s %s' % (sys.executable, pattern)
    147         cmd = ['pgrep', '-f', pattern]
    148         logging.debug('[Bot %s] check process running: %s',
    149                       self.bot_id, str(cmd))
    150         try:
    151             output = subprocess.check_output(cmd)
    152             pids = output.splitlines()
    153             if len(pids) > 1:
    154                 raise DuplicateBotError('Multiple processes (pid: %s) detected for Bot %s'
    155                                         % (str(pids), self.bot_id))
    156             pid = int(pids[0])
    157             if pid != self.pid:
    158                 raise PidMisMatchError(self.pid, pid)
    159             return True
    160         except subprocess.CalledProcessError as e:
    161             if e.returncode == 1:
    162                 return False
    163             else:
    164                 raise
    165 
    166 
    167     def _refresh_pid(self):
    168         """Check process status and update self.pid accordingly."""
    169         # Reload pid from pid file.
    170         if os.path.exists(self.pid_file):
    171             with open(self.pid_file) as f:
    172                 try:
    173                     pid = f.readline().strip()
    174                     self.pid = int(pid)
    175                 except ValueError as e:
    176                     self.pid = None
    177         try:
    178             if not self._is_process_running():
    179                 self._cleanup_pid()
    180         except PidMisMatchError as e:
    181             logging.error('[Bot %s] %s, updating pid file',
    182                           self.bot_id, str(e))
    183             self.pid = e.new_pid
    184             self._write_pid()
    185 
    186 
    187     def is_running(self):
    188         """Return if the bot is running."""
    189         self._refresh_pid()
    190         return bool(self.pid)
    191 
    192 
    193     def ensure_running(self):
    194         """Start a swarming bot."""
    195         if self.is_running():
    196             logging.info(
    197                     '[Bot %s] Skip start, bot is already running (pid %s).',
    198                     self.bot_id, self.pid)
    199             return
    200         logging.debug('[Bot %s] Bootstrap bot in %s', self.bot_id, self.bot_dir)
    201         if os.path.exists(self.bot_dir):
    202             shutil.rmtree(self.bot_dir)
    203         os.makedirs(self.bot_dir)
    204         dest = os.path.join(self.bot_dir, self.BOT_FILENAME)
    205         logging.debug('[Bot %s] Getting bot code from: %s/bot_code',
    206                       self.bot_id, self.swarming_proxy)
    207         urllib.urlretrieve('%s/bot_code' % self.swarming_proxy, dest)
    208         cmd = [sys.executable, self.BOT_FILENAME]
    209         logging.debug('[Bot %s] Calling command: %s', self. bot_id, cmd)
    210         process = subprocess.Popen(
    211                 cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
    212                 cwd=self.bot_dir)
    213         self.pid = process.pid
    214         self._write_pid()
    215         logging.info('[Bot %s] Created bot (pid: %d)', self.bot_id, self.pid)
    216 
    217 
    218     def kill(self):
    219         """Kill the bot."""
    220         if not self.is_running():
    221             logging.info('[Bot %s] Skip killing bot, Bot is not running',
    222                           self.bot_id)
    223             return
    224         try:
    225             logging.info('[Bot %s] killing bot (pid: %d)',
    226                           self.bot_id, self.pid)
    227             os.kill(self.pid, signal.SIGTERM)
    228             start = time.time()
    229             sleep = 1
    230             while(time.time() - start < KILL_PROC_TIMEOUT_SECS):
    231                 if not self.is_running():
    232                     return
    233                 sleep = min(sleep * 2, MAX_KILL_PROC_SLEEP_SECS)
    234                 logging.debug('[Bot %s] Waiting %d secs for bot to finish'
    235                               ' any running task and exist.',
    236                               self.bot_id, sleep)
    237                 time.sleep(sleep)
    238             else:
    239                 logging.error(
    240                         '[Bot %s] Failed to kill pid %s within %d secs, '
    241                         'the bot may be running a long running task, you '
    242                         'can retry the script. SIGKILL the process is not '
    243                         'recommended, it might lead to unexpected error.',
    244                         self.bot_id, self.pid, KILL_PROC_TIMEOUT_SECS)
    245         except Exception as e:
    246             raise BotManagementError('[Bot %s] %s' % (self.bot_id, str(e)))
    247 
    248 
    249 class BotManager(object):
    250     """Class that manages swarming bots."""
    251 
    252 
    253     CHECK_BOTS_PATTERN = '{executable} {working_dir}.*{bot_cmd_pattern}'
    254 
    255 
    256     def __init__(self, bot_ids, working_dir, swarming_proxy):
    257         """Initialize.
    258 
    259         @param bot_ids: a set of integers.
    260         @param working_dir: Working directory of the bots.
    261                             Store temporary files.
    262         @param swarming_proxy: The swarming instance to talk to.
    263         """
    264         self.bot_ids = bot_ids
    265         self.working_dir = os.path.abspath(os.path.expanduser(working_dir))
    266         self.bots = [SwarmingBot(bid, self.working_dir, swarming_proxy)
    267                      for bid in bot_ids]
    268 
    269     def launch(self):
    270         """Launch bots."""
    271         for bot in self.bots:
    272           try:
    273               bot.ensure_running()
    274           except BotManagementError as e:
    275               logging.error('[BotManager] Failed to start Bot %s: %s',
    276                             bot.bot_id, str(e))
    277         # If we let the process exit immediately, the last process won't
    278         # be launched sometimes. So sleep for 3 secs.
    279         # The right way is to query the server until all bots are seen
    280         # by the server by visiting
    281         # https://SWARMING_PROXY/swarming/api/v1/client/bots
    282         # However, this would require oauth authentication (install
    283         # oauth library and install credentials).
    284         logging.info('Wait 3 seconds for process creation to complete.')
    285         time.sleep(3)
    286 
    287 
    288     def kill(self):
    289         """Kill running bots."""
    290         # Start threads to kill bots.
    291         threads = []
    292         for bot in self.bots:
    293             t = threading.Thread(target=bot.kill)
    294             threads.append(t)
    295             t.setDaemon(True)
    296             t.start()
    297         # Wait on threads.
    298         try:
    299             while threading.active_count() > 1:
    300                 time.sleep(0.1)
    301         except KeyboardInterrupt:
    302             msg = 'Ctrl-c recieved! Bots status not confirmed. Exit.'
    303             logging.error(msg)
    304             print msg
    305 
    306 
    307     def check(self):
    308         """Check running bots, start it if not running."""
    309         pattern =  self.CHECK_BOTS_PATTERN.format(
    310                 executable=sys.executable, working_dir=self.working_dir,
    311                 bot_cmd_pattern=SwarmingBot.BOT_CMD_PATTERN)
    312         cmd = ['pgrep', '-f', pattern]
    313         logging.debug('[BotManager] Check bot counts: %s', str(cmd))
    314         try:
    315             output = subprocess.check_output(cmd)
    316             bot_count = len(output.splitlines())
    317         except subprocess.CalledProcessError as e:
    318             if e.returncode == 1:
    319                 bot_count = 0
    320             else:
    321                 raise
    322         missing_count = len(self.bot_ids) - bot_count
    323         logging.info(
    324                 '[BotManager] Check bot counts: %d bots running, missing: %d',
    325                 bot_count, missing_count)
    326         if missing_count > 0:
    327             logging.info('[BotManager] Checking all bots')
    328             self.launch()
    329 
    330 
    331 def _parse_range(id_range):
    332     """Convert an id range to a set of bot ids.
    333 
    334     @param id_range: A range of integer, e.g "1-200".
    335 
    336     @returns a set of bot ids set([1,2,...200])
    337     """
    338     m = re.match(ID_RANGE_FMT, id_range)
    339     if not m:
    340         raise ValueError('Could not parse %s' % id_range)
    341     min, max = int(m.group(1)), int(m.group(2))
    342     return set(bid for bid in range(min, max+1))
    343 
    344 
    345 def _parse_args(args):
    346     """Parse args.
    347 
    348     @param args: Argument list passed from main.
    349 
    350     @return: A tuple with the parsed args, as returned by parser.parse_args.
    351     """
    352     parser = argparse.ArgumentParser(
    353             description='Launch swarming bots on a autotest server')
    354     action_help = ('launch: launch bots. '
    355                   'kill: kill bots. '
    356                   'check: check if bots are running, if not, starting bots.')
    357     parser.add_argument(
    358             'action', choices=('launch', 'kill', 'check'), help=action_help)
    359     parser.add_argument(
    360             '-r', '--id_range', type=str, dest='id_range', required=True,
    361             help='A range of integer, each bot created will be labeled '
    362                  'with an id from this range. E.g. "1-200"')
    363     parser.add_argument(
    364             '-d', '--working_dir', type=str, dest='working_dir', required=True,
    365             help='A working directory where bots will store files '
    366                  'generated at runtime')
    367     parser.add_argument(
    368             '-p', '--swarming_proxy', type=str, dest='swarming_proxy',
    369             default=DEFAULT_SWARMING_PROXY,
    370             help='The URL of the swarming instance to talk to, '
    371                  'Default to the one specified in global config')
    372     parser.add_argument(
    373             '-f', '--log_file', dest='log_file', required=False,
    374             help='Path to the log file.')
    375     parser.add_argument(
    376             '-v', '--verbose', dest='verbose', action='store_true',
    377             help='Verbose mode')
    378 
    379     return parser.parse_args(args)
    380 
    381 
    382 def _setup_logging(verbose, log_file):
    383     """Setup logging.
    384 
    385     @param verbose: bool, if True, log at DEBUG level, otherwise INFO level.
    386     @param log_file; path to log file.
    387     """
    388     log_formatter = logging.Formatter(LOGGING_FORMAT)
    389     if not log_file:
    390         handler = logging.StreamHandler()
    391     else:
    392         handler = logging.handlers.RotatingFileHandler(
    393                 filename=log_file, maxBytes=LOG_FILE_SIZE,
    394                 backupCount=LOG_FILE_BACKUPCOUNT)
    395     handler.setFormatter(log_formatter)
    396     logger = logging.getLogger()
    397     log_level = logging.DEBUG if verbose else logging.INFO
    398     logger.setLevel(log_level)
    399     logger.addHandler(handler)
    400 
    401 
    402 def main(args):
    403     """Main.
    404 
    405     @args: A list of system arguments.
    406     """
    407     args = _parse_args(args)
    408     _setup_logging(args.verbose, args.log_file)
    409 
    410     if not args.swarming_proxy:
    411         logging.error(
    412                 'No swarming proxy instance specified. '
    413                 'Specify swarming_proxy in [CROS] in shadow_config, '
    414                 'or use --swarming_proxy')
    415         return 1
    416     if not args.swarming_proxy.startswith('https://'):
    417         swarming_proxy = 'https://' + args.swarming_proxy
    418     else:
    419         swarming_proxy = args.swarming_proxy
    420 
    421     logging.info('Connecting to %s', swarming_proxy)
    422     m = BotManager(_parse_range(args.id_range),
    423                    args.working_dir, args.swarming_proxy)
    424 
    425     if args.action == 'launch':
    426         m.launch()
    427     elif args.action == 'kill':
    428         m.kill()
    429     elif args.action == 'check':
    430         m.check()
    431 
    432 
    433 if __name__ == '__main__':
    434     sys.exit(main(sys.argv[1:]))
    435