Home | History | Annotate | Download | only in site_utils
      1 #!/usr/bin/python
      2 
      3 from __future__ import print_function
      4 
      5 import argparse
      6 import logging
      7 import multiprocessing
      8 import os
      9 import subprocess
     10 import sys
     11 import time
     12 
     13 import common
     14 from autotest_lib.server import frontend
     15 from autotest_lib.site_utils.lib import infra
     16 
     17 DEPLOY_SERVER_LOCAL = ('/usr/local/autotest/site_utils/deploy_server_local.py')
     18 POOL_SIZE = 124
     19 PUSH_ORDER = {'database': 0,
     20               'database_slave': 0,
     21               'drone': 1,
     22               'shard': 1,
     23               'golo_proxy': 1,
     24               'afe': 2,
     25               'scheduler': 2,
     26               'host_scheduler': 2,
     27               'suite_scheduler': 2}
     28 
     29 
     30 def discover_servers(afe, server_filter=set()):
     31     """Discover the in-production servers to update.
     32 
     33     @param afe: Server to contact with RPC requests.
     34     @param server_filter: A set of servers to get status for.
     35 
     36     @returns: A list of a list of tuple of (server_name, server_status, roles).
     37               The list is sorted by the order to be updated. Servers in the same
     38               sublist can be pushed together.
     39 
     40     """
     41     # Example server details....
     42     # {
     43     #     'hostname': 'server1',
     44     #     'status': 'backup',
     45     #     'roles': ['drone', 'scheduler'],
     46     #     'attributes': {'max_processes': 300}
     47     # }
     48     rpc = frontend.AFE(server=afe)
     49     servers = rpc.run('get_servers')
     50 
     51     # Do not update servers that need repair, and filter the server list by
     52     # given server_filter if needed.
     53     servers = [s for s in servers
     54                if (s['status'] != 'repair_required' and
     55                    (not server_filter or s['hostname'] in server_filter))]
     56 
     57     # Do not update reserve, devserver or crash_server (not YET supported).
     58     servers = [s for s in servers if 'devserver' not in s['roles'] and
     59                'crash_server' not in s['roles'] and
     60                'reserve' not in s['roles']]
     61 
     62     sorted_servers = []
     63     for i in range(max(PUSH_ORDER.values()) + 1):
     64         sorted_servers.append([])
     65     servers_with_unknown_order = []
     66     for server in servers:
     67         info = (server['hostname'], server['status'], server['roles'])
     68         try:
     69             order = min([PUSH_ORDER[r] for r in server['roles']
     70                          if r in PUSH_ORDER])
     71             sorted_servers[order].append(info)
     72         except ValueError:
     73             # All roles are not indexed in PUSH_ORDER.
     74             servers_with_unknown_order.append(info)
     75 
     76     # Push all servers with unknown roles together.
     77     if servers_with_unknown_order:
     78         sorted_servers.append(servers_with_unknown_order)
     79 
     80     found_servers = set([s['hostname'] for s in servers])
     81     # Inject the servers passed in by user but not found in server database.
     82     extra_servers = []
     83     for server in server_filter - found_servers:
     84         extra_servers.append((server, 'unknown', ['unknown']))
     85     if extra_servers:
     86         sorted_servers.append(extra_servers)
     87 
     88     return sorted_servers
     89 
     90 
     91 def parse_arguments(args):
     92     """Parse command line arguments.
     93 
     94     @param args: The command line arguments to parse. (usually sys.argv[1:])
     95 
     96     @returns An argparse.Namespace populated with argument values.
     97     """
     98     parser = argparse.ArgumentParser(
     99             formatter_class=argparse.RawDescriptionHelpFormatter,
    100             description='Command to update an entire autotest installation.',
    101             epilog=('Update all servers:\n'
    102                     '  deploy_server.py\n'
    103                     '\n'
    104                     'Update one server:\n'
    105                     '  deploy_server.py <server>\n'
    106                     '\n'
    107                     'Send arguments to remote deploy_server_local.py:\n'
    108                     '  deploy_server.py -- --dryrun\n'
    109                     '\n'
    110                     'See what arguments would be run on specified servers:\n'
    111                     '  deploy_server.py --dryrun <server_a> <server_b> --'
    112                     ' --skip-update\n'))
    113 
    114     parser.add_argument('-v', '--verbose', action='store_true', dest='verbose',
    115             help='Log all deploy script output.')
    116     parser.add_argument('--continue', action='store_true', dest='cont',
    117             help='Continue to the next server on failure.')
    118     parser.add_argument('--afe', required=True,
    119             help='What is the main server for this installation? (cautotest).')
    120     parser.add_argument('--update_push_servers', action='store_true',
    121             help='Indicate to update test_push servers.')
    122     parser.add_argument('--force_update', action='store_true',
    123             help='Force to run update commands for afe, tko, build_externals')
    124     parser.add_argument('--dryrun', action='store_true',
    125             help='Don\'t actually run remote commands.')
    126     parser.add_argument('--logfile', action='store',
    127             default='/tmp/deployment.log',
    128             help='Path to the file to save the deployment log to. Default is '
    129                  '/tmp/deployment.log')
    130     parser.add_argument('args', nargs=argparse.REMAINDER,
    131             help=('<server>, <server> ... -- <remote_arg>, <remote_arg> ...'))
    132 
    133     results = parser.parse_args(args)
    134 
    135     # We take the args list and further split it down. Everything before --
    136     # is a server name, and everything after it is an argument to pass along
    137     # to deploy_server_local.py.
    138     #
    139     # This:
    140     #   server_a, server_b -- --dryrun --skip-report
    141     #
    142     # Becomes:
    143     #   args.servers['server_a', 'server_b']
    144     #   args.args['--dryrun', '--skip-report']
    145     try:
    146         local_args_index = results.args.index('--') + 1
    147     except ValueError:
    148         # If -- isn't present, they are all servers.
    149         results.servers = results.args
    150         results.args = []
    151     else:
    152         # Split arguments.
    153         results.servers = results.args[:local_args_index-1]
    154         results.args = results.args[local_args_index:]
    155 
    156     return results
    157 
    158 
    159 def update_server(inputs):
    160     """Deploy for given server.
    161 
    162     @param inputs: Inputs for the update action, including:
    163                    server: Name of the server to update.
    164                    status: Status of the server.
    165                    options: Options for the update.
    166 
    167     @return: A tuple of (server, success, output), where:
    168              server: Name of the server to be updated.
    169              sucess: True if update succeeds, False otherwise.
    170              output: A string of the deploy_server_local script output
    171                      including any errors.
    172 
    173     """
    174     start = time.time()
    175     server = inputs['server']
    176     status = inputs['status']
    177     # Shared list to record the finished server.
    178     finished_servers = inputs['finished_servers']
    179     options = inputs['options']
    180     print('Updating server %s...' % server)
    181     if status == 'backup':
    182         extra_args = ['--skip-service-status']
    183     else:
    184         extra_args = []
    185 
    186     cmd = ('%s %s' %
    187            (DEPLOY_SERVER_LOCAL, ' '.join(options.args + extra_args)))
    188     output = '%s: %s' % (server, cmd)
    189     success = True
    190     if not options.dryrun:
    191         for i in range(5):
    192             try:
    193                 print('[%s/5] Try to update server %s' % (i, server))
    194                 output = infra.execute_command(server, cmd)
    195                 finished_servers.append(server)
    196                 break
    197             except subprocess.CalledProcessError as e:
    198                 print('%s: Command failed with error: %s' % (server, e))
    199                 success = False
    200                 output = e.output
    201 
    202     print('Time used to update server %s: %s' % (server, time.time()-start))
    203     return server, success, output
    204 
    205 
    206 def update_in_parallel(servers, options):
    207     """Update a group of servers in parallel.
    208 
    209     @param servers: A list of tuple of (server_name, server_status, roles).
    210     @param options: Options for the push.
    211 
    212     @returns A list of servers that failed to update.
    213     """
    214     # Create a list to record all the finished servers.
    215     manager = multiprocessing.Manager()
    216     finished_servers = manager.list()
    217 
    218     args = []
    219     for server, status, _ in servers:
    220         args.append({'server': server,
    221                      'status': status,
    222                      'finished_servers': finished_servers,
    223                      'options': options})
    224     # The update actions run in parallel. If any update failed, we should wait
    225     # for other running updates being finished. Abort in the middle of an update
    226     # may leave the server in a bad state.
    227     pool = multiprocessing.pool.ThreadPool(POOL_SIZE)
    228     try:
    229         failed_servers = []
    230         results = pool.map_async(update_server, args)
    231         pool.close()
    232 
    233         # Track the updating progress for current group of servers.
    234         incomplete_servers = set()
    235         server_names = set([s[0] for s in servers])
    236         while not results.ready():
    237             incomplete_servers = server_names - set(finished_servers)
    238             print('Not finished yet. %d servers in this group. '
    239                 '%d servers are still running:\n%s\n' %
    240                 (len(servers), len(incomplete_servers), incomplete_servers))
    241             # Check the progress every 1 mins
    242             results.wait(60)
    243 
    244         # After update finished, parse the result.
    245         for server, success, output in results.get():
    246             if options.dryrun:
    247                 print('Dry run, updating server %s is skipped.' % server)
    248             else:
    249                 if success:
    250                     msg = ('Successfully updated server %s.\n' % server)
    251                     if options.verbose:
    252                         print(output)
    253                         print()
    254                 else:
    255                     msg = ('Failed to update server %s.\nError: %s' %
    256                         (server, output.strip()))
    257                     print(msg)
    258                     failed_servers.append(server)
    259                 # Write the result into logfile.
    260                 with open(options.logfile, 'a') as f:
    261                     f.write(msg)
    262     finally:
    263         pool.terminate()
    264         pool.join()
    265 
    266     return failed_servers
    267 
    268 def main(args):
    269     """Main routine that drives all the real work.
    270 
    271     @param args: The command line arguments to parse. (usually sys.argv)
    272 
    273     @returns The system exit code.
    274     """
    275     options = parse_arguments(args[1:])
    276     # Remove all the handlers from the root logger to get rid of the handlers
    277     # introduced by the import packages.
    278     logging.getLogger().handlers = []
    279     logging.basicConfig(level=logging.DEBUG
    280                         if options.verbose else logging.INFO)
    281 
    282     print('Retrieving server status...')
    283     sorted_servers = discover_servers(options.afe, set(options.servers or []))
    284 
    285     # Display what we plan to update.
    286     print('Will update (in this order):')
    287     i = 1
    288     for servers in sorted_servers:
    289         print('%s Group %d (%d servers) %s' % ('='*30, i, len(servers), '='*30))
    290         for server, status, roles in servers:
    291             print('\t%-36s:\t%s\t%s' % (server, status, roles))
    292         i += 1
    293     print()
    294 
    295     if os.path.exists(options.logfile):
    296         os.remove(options.logfile)
    297     print ('Start updating, push logs of every server will be saved '
    298            'at %s' % options.logfile)
    299     failed = []
    300     skipped = []
    301     for servers in sorted_servers:
    302         if not failed or options.cont:
    303             failed += update_in_parallel(servers, options)
    304         else:
    305             skipped.extend(s[0] for s in servers)  # Only include server name.
    306 
    307     if failed:
    308         print('Errors updating:')
    309         for server in failed:
    310             print('  %s' % server)
    311         print()
    312         print('To retry:')
    313         print('  %s <options> %s' %
    314               (str(args[0]), str(' '.join(failed + skipped))))
    315         # Exit with error.
    316         return 1
    317 
    318 
    319 if __name__ == '__main__':
    320     sys.exit(main(sys.argv))
    321