Home | History | Annotate | Download | only in site_utils
      1 #!/usr/bin/python
      2 
      3 from __future__ import print_function
      4 
      5 import argparse
      6 import logging
      7 import multiprocessing
      8 import os
      9 import subprocess
     10 import sys
     11 import time
     12 
     13 import common
     14 from autotest_lib.server import frontend
     15 from autotest_lib.site_utils.lib import infra
     16 
     17 DEPLOY_SERVER_LOCAL = ('/usr/local/autotest/site_utils/deploy_server_local.py')
     18 POOL_SIZE = 124
     19 PUSH_ORDER = {'database': 0,
     20               'database_slave': 0,
     21               'drone': 1,
     22               'shard': 1,
     23               'golo_proxy': 1,
     24               'sentinel': 1,
     25               'afe': 2,
     26               'scheduler': 2,
     27               'host_scheduler': 2,
     28               'suite_scheduler': 2}
     29 
     30 
     31 def discover_servers(afe, server_filter=set()):
     32     """Discover the in-production servers to update.
     33 
     34     @param afe: Server to contact with RPC requests.
     35     @param server_filter: A set of servers to get status for.
     36 
     37     @returns: A list of a list of tuple of (server_name, server_status, roles).
     38               The list is sorted by the order to be updated. Servers in the same
     39               sublist can be pushed together.
     40 
     41     """
     42     # Example server details....
     43     # {
     44     #     'hostname': 'server1',
     45     #     'status': 'backup',
     46     #     'roles': ['drone', 'scheduler'],
     47     #     'attributes': {'max_processes': 300}
     48     # }
     49     rpc = frontend.AFE(server=afe)
     50     servers = rpc.run('get_servers')
     51 
     52     # Do not update servers that need repair, and filter the server list by
     53     # given server_filter if needed.
     54     servers = [s for s in servers
     55                if (s['status'] != 'repair_required' and
     56                    (not server_filter or s['hostname'] in server_filter))]
     57 
     58     # Do not update reserve, devserver or crash_server (not YET supported).
     59     servers = [s for s in servers if 'devserver' not in s['roles'] and
     60                'crash_server' not in s['roles'] and
     61                'reserve' not in s['roles']]
     62 
     63     sorted_servers = []
     64     for i in range(max(PUSH_ORDER.values()) + 1):
     65         sorted_servers.append([])
     66     servers_with_unknown_order = []
     67     for server in servers:
     68         info = (server['hostname'], server['status'], server['roles'])
     69         try:
     70             order = min([PUSH_ORDER[r] for r in server['roles']
     71                          if r in PUSH_ORDER])
     72             sorted_servers[order].append(info)
     73         except ValueError:
     74             # All roles are not indexed in PUSH_ORDER.
     75             servers_with_unknown_order.append(info)
     76 
     77     # Push all servers with unknown roles together.
     78     if servers_with_unknown_order:
     79         sorted_servers.append(servers_with_unknown_order)
     80 
     81     found_servers = set([s['hostname'] for s in servers])
     82     # Inject the servers passed in by user but not found in server database.
     83     extra_servers = []
     84     for server in server_filter - found_servers:
     85         extra_servers.append((server, 'unknown', ['unknown']))
     86     if extra_servers:
     87         sorted_servers.append(extra_servers)
     88 
     89     return sorted_servers
     90 
     91 
     92 def parse_arguments(args):
     93     """Parse command line arguments.
     94 
     95     @param args: The command line arguments to parse. (usually sys.argv[1:])
     96 
     97     @returns An argparse.Namespace populated with argument values.
     98     """
     99     parser = argparse.ArgumentParser(
    100             formatter_class=argparse.RawDescriptionHelpFormatter,
    101             description='Command to update an entire autotest installation.',
    102             epilog=('Update all servers:\n'
    103                     '  deploy_server.py\n'
    104                     '\n'
    105                     'Update one server:\n'
    106                     '  deploy_server.py <server>\n'
    107                     '\n'
    108                     'Send arguments to remote deploy_server_local.py:\n'
    109                     '  deploy_server.py -- --dryrun\n'
    110                     '\n'
    111                     'See what arguments would be run on specified servers:\n'
    112                     '  deploy_server.py --dryrun <server_a> <server_b> --'
    113                     ' --skip-update\n'))
    114 
    115     parser.add_argument('-v', '--verbose', action='store_true', dest='verbose',
    116             help='Log all deploy script output.')
    117     parser.add_argument('--continue', action='store_true', dest='cont',
    118             help='Continue to the next server on failure.')
    119     parser.add_argument('--afe', required=True,
    120             help='What is the main server for this installation? (cautotest).')
    121     parser.add_argument('--update_push_servers', action='store_true',
    122             help='Indicate to update test_push servers.')
    123     parser.add_argument('--force_update', action='store_true',
    124             help='Force to run update commands for afe, tko, build_externals')
    125     parser.add_argument('--dryrun', action='store_true',
    126             help='Don\'t actually run remote commands.')
    127     parser.add_argument('--logfile', action='store',
    128             default='/tmp/deployment.log',
    129             help='Path to the file to save the deployment log to. Default is '
    130                  '/tmp/deployment.log')
    131     parser.add_argument('args', nargs=argparse.REMAINDER,
    132             help=('<server>, <server> ... -- <remote_arg>, <remote_arg> ...'))
    133 
    134     results = parser.parse_args(args)
    135 
    136     # We take the args list and further split it down. Everything before --
    137     # is a server name, and everything after it is an argument to pass along
    138     # to deploy_server_local.py.
    139     #
    140     # This:
    141     #   server_a, server_b -- --dryrun --skip-report
    142     #
    143     # Becomes:
    144     #   args.servers['server_a', 'server_b']
    145     #   args.args['--dryrun', '--skip-report']
    146     try:
    147         local_args_index = results.args.index('--') + 1
    148     except ValueError:
    149         # If -- isn't present, they are all servers.
    150         results.servers = results.args
    151         results.args = []
    152     else:
    153         # Split arguments.
    154         results.servers = results.args[:local_args_index-1]
    155         results.args = results.args[local_args_index:]
    156 
    157     return results
    158 
    159 
    160 def update_server(inputs):
    161     """Deploy for given server.
    162 
    163     @param inputs: Inputs for the update action, including:
    164                    server: Name of the server to update.
    165                    status: Status of the server.
    166                    options: Options for the update.
    167 
    168     @return: A tuple of (server, success, output), where:
    169              server: Name of the server to be updated.
    170              sucess: True if update succeeds, False otherwise.
    171              output: A string of the deploy_server_local script output
    172                      including any errors.
    173 
    174     """
    175     start = time.time()
    176     server = inputs['server']
    177     status = inputs['status']
    178     # Shared list to record the finished server.
    179     finished_servers = inputs['finished_servers']
    180     options = inputs['options']
    181     print('Updating server %s...' % server)
    182     if status == 'backup':
    183         extra_args = ['--skip-service-status']
    184     else:
    185         extra_args = []
    186 
    187     cmd = ('%s %s' %
    188            (DEPLOY_SERVER_LOCAL, ' '.join(options.args + extra_args)))
    189     output = '%s: %s' % (server, cmd)
    190     success = True
    191     if not options.dryrun:
    192         for i in range(5):
    193             try:
    194                 print('[%s/5] Try to update server %s' % (i, server))
    195                 output = infra.execute_command(server, cmd)
    196                 finished_servers.append(server)
    197                 break
    198             except subprocess.CalledProcessError as e:
    199                 print('%s: Command failed with error: %s' % (server, e))
    200                 success = False
    201                 output = e.output
    202 
    203     print('Time used to update server %s: %s' % (server, time.time()-start))
    204     return server, success, output
    205 
    206 
    207 def update_in_parallel(servers, options):
    208     """Update a group of servers in parallel.
    209 
    210     @param servers: A list of tuple of (server_name, server_status, roles).
    211     @param options: Options for the push.
    212 
    213     @returns A list of servers that failed to update.
    214     """
    215     # Create a list to record all the finished servers.
    216     manager = multiprocessing.Manager()
    217     finished_servers = manager.list()
    218 
    219     args = []
    220     for server, status, _ in servers:
    221         args.append({'server': server,
    222                      'status': status,
    223                      'finished_servers': finished_servers,
    224                      'options': options})
    225     # The update actions run in parallel. If any update failed, we should wait
    226     # for other running updates being finished. Abort in the middle of an update
    227     # may leave the server in a bad state.
    228     pool = multiprocessing.pool.ThreadPool(POOL_SIZE)
    229     try:
    230         failed_servers = []
    231         results = pool.map_async(update_server, args)
    232         pool.close()
    233 
    234         # Track the updating progress for current group of servers.
    235         incomplete_servers = set()
    236         server_names = set([s[0] for s in servers])
    237         while not results.ready():
    238             incomplete_servers = server_names - set(finished_servers)
    239             print('Not finished yet. %d servers in this group. '
    240                 '%d servers are still running:\n%s\n' %
    241                 (len(servers), len(incomplete_servers), incomplete_servers))
    242             # Check the progress every 1 mins
    243             results.wait(60)
    244 
    245         # After update finished, parse the result.
    246         for server, success, output in results.get():
    247             if options.dryrun:
    248                 print('Dry run, updating server %s is skipped.' % server)
    249             else:
    250                 if success:
    251                     msg = ('Successfully updated server %s.\n' % server)
    252                     if options.verbose:
    253                         print(output)
    254                         print()
    255                 else:
    256                     msg = ('Failed to update server %s.\nError: %s' %
    257                         (server, output.strip()))
    258                     print(msg)
    259                     failed_servers.append(server)
    260                 # Write the result into logfile.
    261                 with open(options.logfile, 'a') as f:
    262                     f.write(msg)
    263     finally:
    264         pool.terminate()
    265         pool.join()
    266 
    267     return failed_servers
    268 
    269 def main(args):
    270     """Main routine that drives all the real work.
    271 
    272     @param args: The command line arguments to parse. (usually sys.argv)
    273 
    274     @returns The system exit code.
    275     """
    276     options = parse_arguments(args[1:])
    277     # Remove all the handlers from the root logger to get rid of the handlers
    278     # introduced by the import packages.
    279     logging.getLogger().handlers = []
    280     logging.basicConfig(level=logging.DEBUG
    281                         if options.verbose else logging.INFO)
    282 
    283     print('Retrieving server status...')
    284     sorted_servers = discover_servers(options.afe, set(options.servers or []))
    285 
    286     # Display what we plan to update.
    287     print('Will update (in this order):')
    288     i = 1
    289     for servers in sorted_servers:
    290         print('%s Group %d (%d servers) %s' % ('='*30, i, len(servers), '='*30))
    291         for server, status, roles in servers:
    292             print('\t%-36s:\t%s\t%s' % (server, status, roles))
    293         i += 1
    294     print()
    295 
    296     if os.path.exists(options.logfile):
    297         os.remove(options.logfile)
    298     print ('Start updating, push logs of every server will be saved '
    299            'at %s' % options.logfile)
    300     failed = []
    301     skipped = []
    302     for servers in sorted_servers:
    303         if not failed or options.cont:
    304             failed += update_in_parallel(servers, options)
    305         else:
    306             skipped.extend(s[0] for s in servers)  # Only include server name.
    307 
    308     if failed:
    309         print('Errors updating:')
    310         for server in failed:
    311             print('  %s' % server)
    312         print()
    313         print('To retry:')
    314         print('  %s <options> %s' %
    315               (str(args[0]), str(' '.join(failed + skipped))))
    316         # Exit with error.
    317         return 1
    318 
    319 
    320 if __name__ == '__main__':
    321     sys.exit(main(sys.argv))
    322