Home | History | Annotate | Download | only in site_utils
      1 #!/usr/bin/python
      2 
      3 from __future__ import print_function
      4 
      5 import argparse
      6 import logging
      7 import multiprocessing
      8 import subprocess
      9 import sys
     10 from multiprocessing.pool import ThreadPool
     11 
     12 import common
     13 from autotest_lib.server import frontend
     14 from autotest_lib.site_utils.lib import infra
     15 
     16 DEPLOY_SERVER_LOCAL = ('/usr/local/autotest/site_utils/deploy_server_local.py')
     17 POOL_SIZE = 124
     18 
     19 
     20 def _filter_servers(servers):
     21     """Filter a set of servers to those that should be deployed to."""
     22     non_push_roles = {'devserver', 'crash_server', 'reserve'}
     23     for s in servers:
     24         if s['status'] == 'repair_required':
     25             continue
     26         if s['status'] == 'backup':
     27             continue
     28         if set(s['roles']) & non_push_roles:
     29             continue
     30         yield s
     31 
     32 
     33 def discover_servers(afe):
     34     """Discover the in-production servers to update.
     35 
     36     Returns the set of servers from serverdb that are in production and should
     37     be updated. This filters out servers in need of repair, or servers of roles
     38     that are not yet supported by deploy_server / deploy_server_local.
     39 
     40     @param afe: Server to contact with RPC requests.
     41 
     42     @returns: A set of server hostnames.
     43     """
     44     # Example server details....
     45     # {
     46     #     'hostname': 'server1',
     47     #     'status': 'backup',
     48     #     'roles': ['drone', 'scheduler'],
     49     #     'attributes': {'max_processes': 300}
     50     # }
     51     rpc = frontend.AFE(server=afe)
     52     servers = rpc.run('get_servers')
     53 
     54     return {s['hostname'] for s in _filter_servers(servers)}
     55 
     56 
     57 def _parse_arguments(args):
     58     """Parse command line arguments.
     59 
     60     @param args: The command line arguments to parse. (usually sys.argv[1:])
     61 
     62     @returns A tuple of (argparse.Namespace populated with argument values,
     63                          list of extra args to pass to deploy_server_local).
     64     """
     65     parser = argparse.ArgumentParser(
     66             formatter_class=argparse.RawDescriptionHelpFormatter,
     67             description='Run deploy_server_local on a bunch of servers. Extra '
     68                         'arguments will be passed through.',
     69             epilog=('Update all servers:\n'
     70                     '  deploy_server.py -x --afe cautotest\n'
     71                     '\n'
     72                     'Update one server:\n'
     73                     '  deploy_server.py <server> -x\n'
     74                     ))
     75 
     76     parser.add_argument('-x', action='store_true',
     77                         help='Actually perform actions. If not supplied, '
     78                              'script does nothing.')
     79     parser.add_argument('--afe',
     80             help='The AFE server used to get servers from server_db,'
     81                  'e.g, cautotest. Used only if no SERVER specified.')
     82     parser.add_argument('servers', action='store', nargs='*', metavar='SERVER')
     83 
     84     return parser.parse_known_args()
     85 
     86 
     87 def _update_server(server, extra_args=[]):
     88     """Run deploy_server_local for given server.
     89 
     90     @param server: hostname to update.
     91     @param extra_args: args to be passed in to deploy_server_local.
     92 
     93     @return: A tuple of (server, success, output), where:
     94              server: Name of the server.
     95              sucess: True if update succeeds, False otherwise.
     96              output: A string of the deploy_server_local script output
     97                      including any errors.
     98     """
     99     cmd = ('%s %s' %
    100            (DEPLOY_SERVER_LOCAL, ' '.join(extra_args)))
    101     success = False
    102     try:
    103         output = infra.execute_command(server, cmd)
    104         success = True
    105     except subprocess.CalledProcessError as e:
    106         output = e.output
    107 
    108     return server, success, output
    109 
    110 def _update_in_parallel(servers, extra_args=[]):
    111     """Update a group of servers in parallel.
    112 
    113     @param servers: A list of servers to update.
    114     @param options: Options for the push.
    115 
    116     @returns A dictionary from server names that failed to the output
    117              of the update script.
    118     """
    119     # Create a list to record all the finished servers.
    120     manager = multiprocessing.Manager()
    121     finished_servers = manager.list()
    122 
    123     do_server = lambda s: _update_server(s, extra_args)
    124 
    125     # The update actions run in parallel. If any update failed, we should wait
    126     # for other running updates being finished. Abort in the middle of an update
    127     # may leave the server in a bad state.
    128     pool = ThreadPool(POOL_SIZE)
    129     try:
    130         results = pool.map_async(do_server, servers)
    131         pool.close()
    132 
    133         # Track the updating progress for current group of servers.
    134         incomplete_servers = set()
    135         server_names = set([s[0] for s in servers])
    136         while not results.ready():
    137             incomplete_servers = sorted(set(servers) - set(finished_servers))
    138             print('Not finished yet. %d servers in this group. '
    139                 '%d servers are still running:\n%s\n' %
    140                 (len(servers), len(incomplete_servers), incomplete_servers))
    141             # Check the progress every 20s
    142             results.wait(20)
    143 
    144         # After update finished, parse the result.
    145         failures = {}
    146         for server, success, output in results.get():
    147             if not success:
    148                 failures[server] = output
    149 
    150         return failures
    151 
    152     finally:
    153         pool.terminate()
    154         pool.join()
    155 
    156 
    157 def main(args):
    158     """Entry point to deploy_server.py
    159 
    160     @param args: The command line arguments to parse. (usually sys.argv)
    161 
    162     @returns The system exit code.
    163     """
    164     options, extra_args = _parse_arguments(args[1:])
    165     # Remove all the handlers from the root logger to get rid of the handlers
    166     # introduced by the import packages.
    167     logging.getLogger().handlers = []
    168     logging.basicConfig(level=logging.DEBUG)
    169 
    170     servers = options.servers
    171     if not servers:
    172         if not options.afe:
    173             print('No servers or afe specified. Aborting')
    174             return 1
    175         print('Retrieving servers from %s..' % options.afe)
    176         servers = discover_servers(options.afe)
    177         print('Retrieved servers were: %s' % servers)
    178 
    179     if not options.x:
    180         print('Doing nothing because -x was not supplied.')
    181         print('servers: %s' % options.servers)
    182         print('extra args for deploy_server_local: %s' % extra_args)
    183         return 0
    184 
    185     failures = _update_in_parallel(servers, extra_args)
    186 
    187     if not failures:
    188         print('Completed all updates successfully.')
    189         return 0
    190 
    191     print('The following servers failed, with the following output:')
    192     for s, o in failures.iteritems():
    193         print('======== %s ========' % s)
    194         print(o)
    195 
    196     print('The servers that failed were:')
    197     print('\n'.join(failures.keys()))
    198     print('\n\nTo retry on failed servers, run the following command:')
    199     retry_cmd = [args[0], '-x'] + failures.keys() + extra_args
    200     print(' '.join(retry_cmd))
    201     return 1
    202 
    203 
    204 
# Run main() only when executed as a script; its return value becomes the
# process exit code.
if __name__ == '__main__':
    sys.exit(main(sys.argv))
    207