Home | History | Annotate | Download | only in site_utils
      1 #!/usr/bin/python
      2 
      3 from __future__ import print_function
      4 
      5 import argparse
      6 import multiprocessing.pool
      7 import subprocess
      8 import sys
      9 
     10 import common
     11 from autotest_lib.server import frontend
     12 from autotest_lib.site_utils.lib import infra
     13 
     14 DEPLOY_PRODUCTION_LOCAL = ('/usr/local/autotest/site_utils/'
     15                            'deploy_production_local.py')
     16 POOL_SIZE = 124
     17 PUSH_ORDER = {'database': 0,
     18               'drone': 1,
     19               'shard': 1,
     20               'golo_proxy': 1,
     21               'afe': 2,
     22               'scheduler': 2,
     23               'host_scheduler': 2,
     24               'suite_scheduler': 2}
     25 
     26 
     27 def discover_servers(afe, server_filter=set()):
     28     """Discover the in-production servers to update.
     29 
     30     @param afe: Server to contact with RPC requests.
     31     @param server_filter: A set of servers to get status for.
     32 
     33     @returns: A list of a list of tuple of (server_name, server_status, roles).
     34               The list is sorted by the order to be updated. Servers in the same
     35               sublist can be pushed together.
     36 
     37     """
     38     # Example server details....
     39     # {
     40     #     'hostname': 'server1',
     41     #     'status': 'backup',
     42     #     'roles': ['drone', 'scheduler'],
     43     #     'attributes': {'max_processes': 300}
     44     # }
     45     rpc = frontend.AFE(server=afe)
     46     servers = rpc.run('get_servers')
     47 
     48     # Do not update servers that need repair, and filter the server list by
     49     # given server_filter if needed.
     50     servers = [s for s in servers
     51                if (s['status'] != 'repair_required' and
     52                    (not server_filter or s['hostname'] in server_filter))]
     53 
     54     # Do not update reserve, devserver or crash_server (not YET supported).
     55     servers = [s for s in servers if 'devserver' not in s['roles'] and
     56                'crash_server' not in s['roles'] and
     57                'reserve' not in s['roles']]
     58 
     59     sorted_servers = []
     60     for i in range(max(PUSH_ORDER.values()) + 1):
     61         sorted_servers.append([])
     62     servers_with_unknown_order = []
     63     for server in servers:
     64         info = (server['hostname'], server['status'], server['roles'])
     65         try:
     66             order = min([PUSH_ORDER[r] for r in server['roles']
     67                          if r in PUSH_ORDER])
     68             sorted_servers[order].append(info)
     69         except ValueError:
     70             # All roles are not indexed in PUSH_ORDER.
     71             servers_with_unknown_order.append(info)
     72 
     73     # Push all servers with unknown roles together.
     74     if servers_with_unknown_order:
     75         sorted_servers.append(servers_with_unknown_order)
     76 
     77     found_servers = set([s['hostname'] for s in servers])
     78     # Inject the servers passed in by user but not found in server database.
     79     extra_servers = []
     80     for server in server_filter - found_servers:
     81         extra_servers.append((server, 'unknown', ['unknown']))
     82     if extra_servers:
     83         sorted_servers.append(extra_servers)
     84 
     85     return sorted_servers
     86 
     87 
     88 def parse_arguments(args):
     89     """Parse command line arguments.
     90 
     91     @param args: The command line arguments to parse. (usually sys.argv[1:])
     92 
     93     @returns An argparse.Namespace populated with argument values.
     94     """
     95     parser = argparse.ArgumentParser(
     96             formatter_class=argparse.RawDescriptionHelpFormatter,
     97             description='Command to update an entire autotest installation.',
     98             epilog=('Update all servers:\n'
     99                     '  deploy_production.py\n'
    100                     '\n'
    101                     'Update one server:\n'
    102                     '  deploy_production.py <server>\n'
    103                     '\n'
    104                     'Send arguments to remote deploy_production_local.py:\n'
    105                     '  deploy_production.py -- --dryrun\n'
    106                     '\n'
    107                     'See what arguments would be run on specified servers:\n'
    108                     '  deploy_production.py --dryrun <server_a> <server_b> --'
    109                     ' --skip-update\n'))
    110 
    111     parser.add_argument('-v', '--verbose', action='store_true', dest='verbose',
    112             help='Log all deploy script output.')
    113     parser.add_argument('--continue', action='store_true', dest='cont',
    114             help='Continue to the next server on failure.')
    115     parser.add_argument('--afe', default='cautotest',
    116             help='What is the main server for this installation? (cautotest).')
    117     parser.add_argument('--dryrun', action='store_true',
    118             help='Don\'t actually run remote commands.')
    119     parser.add_argument('args', nargs=argparse.REMAINDER,
    120             help=('<server>, <server> ... -- <remote_arg>, <remote_arg> ...'))
    121 
    122     results = parser.parse_args(args)
    123 
    124     # We take the args list and further split it down. Everything before --
    125     # is a server name, and everything after it is an argument to pass along
    126     # to deploy_production_local.py.
    127     #
    128     # This:
    129     #   server_a, server_b -- --dryrun --skip-report
    130     #
    131     # Becomes:
    132     #   args.servers['server_a', 'server_b']
    133     #   args.args['--dryrun', '--skip-report']
    134     try:
    135         local_args_index = results.args.index('--') + 1
    136     except ValueError:
    137         # If -- isn't present, they are all servers.
    138         results.servers = results.args
    139         results.args = []
    140     else:
    141         # Split arguments.
    142         results.servers = results.args[:local_args_index-1]
    143         results.args = results.args[local_args_index:]
    144 
    145     return results
    146 
    147 
    148 def update_server(inputs):
    149     """Deploy for given server.
    150 
    151     @param inputs: Inputs for the update action, including:
    152                    server: Name of the server to update.
    153                    status: Status of the server.
    154                    options: Options for the update.
    155 
    156     @return: A tuple of (server, success, output), where:
    157              server: Name of the server to be updated.
    158              sucess: True if update succeeds, False otherwise.
    159              output: A string of the deploy_production_local script output
    160                      including any errors.
    161 
    162     """
    163     server = inputs['server']
    164     status = inputs['status']
    165     options = inputs['options']
    166     print('Updating server %s...' % server)
    167     if status == 'backup':
    168         extra_args = ['--skip-service-status']
    169     else:
    170         extra_args = []
    171 
    172     cmd = ('%s %s' %
    173            (DEPLOY_PRODUCTION_LOCAL, ' '.join(options.args + extra_args)))
    174     output = '%s: %s' % (server, cmd)
    175     success = True
    176     if not options.dryrun:
    177         try:
    178             output = infra.execute_command(server, cmd)
    179         except subprocess.CalledProcessError as e:
    180             success = False
    181             output = e.output
    182     return server, success, output
    183 
    184 
    185 def update_in_parallel(servers, options):
    186     """Update a group of servers in parallel.
    187 
    188     Exit the process with error if any server failed to be updated and
    189     options.cont is not set.
    190 
    191     @param servers: A list of tuple of (server_name, server_status, roles).
    192     @param options: Options for the push.
    193 
    194     """
    195     args = []
    196     for server, status, _ in servers:
    197         args.append({'server': server,
    198                      'status': status,
    199                      'options': options})
    200     # The update actions run in parallel. If any update failed, we should wait
    201     # for other running updates being finished. Abort in the middle of an update
    202     # may leave the server in a bad state.
    203     pool = multiprocessing.pool.ThreadPool(POOL_SIZE)
    204     failed_servers = []
    205     results = pool.imap_unordered(update_server, args)
    206     for server, success, output in results:
    207         if options.dryrun:
    208             print('Dry run, updating server %s is skipped.' % server)
    209         elif success:
    210             print('Successfully updated server %s.' % server)
    211             if options.verbose:
    212                 print(output)
    213                 print()
    214         else:
    215             error = ('Failed to update server %s.\nError: %s' %
    216                      (server, output))
    217             print(error)
    218             failed_servers.append(server)
    219     if failed_servers and not options.cont:
    220         print('Error! Failed to update following servers: %s' %
    221               failed_servers)
    222         sys.exit(1)
    223 
    224 
    225 def update_group(servers, options):
    226     """Update a group of servers in parallel.
    227 
    228     Exit the process with error if any server failed to be updated and
    229     options.cont is not set.
    230 
    231     @param servers: A list of tuple of (server_name, server_status, roles).
    232     @param options: Options for the push.
    233 
    234     """
    235     # If it's allowed to continue updating even after some update fails, update
    236     # all servers together.
    237     if options.cont:
    238         update_in_parallel(servers, options)
    239         return
    240 
    241     # Pick on server per role in the group to update first. Abort if any update
    242     # failed.
    243     server_per_role = {}
    244     # Each server can be used to qualify only one role.
    245     server_picked = set()
    246     for server, status, roles in servers:
    247         for role in roles:
    248             if not role in server_per_role and not server in server_picked:
    249                 server_per_role[role] = (server, status, roles)
    250                 server_picked.add(server)
    251                 break
    252     update_in_parallel(server_per_role.values(), options)
    253 
    254     rest_servers = [s for s in servers if not s[0] in server_picked]
    255     update_in_parallel(rest_servers, options)
    256 
    257 
    258 def main(args):
    259     """Main routine that drives all the real work.
    260 
    261     @param args: The command line arguments to parse. (usually sys.argv[1:])
    262 
    263     @returns The system exit code.
    264     """
    265     options = parse_arguments(args)
    266 
    267     print('Retrieving server status...')
    268     sorted_servers = discover_servers(options.afe, set(options.servers or []))
    269 
    270     # Display what we plan to update.
    271     print('Will update (in this order):')
    272     i = 1
    273     for servers in sorted_servers:
    274         print('%s Group %d (%d servers) %s' % ('='*30, i, len(servers), '='*30))
    275         for server, status, roles in servers:
    276             print('\t%-36s:\t%s\t%s' % (server, status, roles))
    277         i += 1
    278     print()
    279 
    280     for servers in sorted_servers:
    281         update_group(servers, options)
    282 
    283 
    284 if __name__ == '__main__':
    285     sys.exit(main(sys.argv[1:]))
    286