Home | History | Annotate | Download | only in hosts
      1 # Copyright 2017 The Chromium OS Authors. All rights reserved.
      2 # Use of this source code is governed by a BSD-style license that can be
      3 # found in the LICENSE file.
      4 
      5 import logging
      6 import multiprocessing
      7 import os
      8 import threading
      9 
     10 from autotest_lib.client.common_lib import autotemp
     11 from autotest_lib.server import utils
     12 
     13 _MASTER_SSH_COMMAND_TEMPLATE = (
     14     '/usr/bin/ssh -a -x -N '
     15     '-o ControlMaster=yes '  # Create multiplex socket.
     16     '-o ControlPath=%(socket)s '
     17     '-o StrictHostKeyChecking=no '
     18     '-o UserKnownHostsFile=/dev/null '
     19     '-o BatchMode=yes '
     20     '-o ConnectTimeout=30 '
     21     '-o ServerAliveInterval=900 '
     22     '-o ServerAliveCountMax=3 '
     23     '-o ConnectionAttempts=4 '
     24     '-o Protocol=2 '
     25     '-l %(user)s -p %(port)d %(hostname)s')
     26 
     27 
     28 class MasterSsh(object):
     29     """Manages multiplex ssh connection."""
     30 
     31     def __init__(self, hostname, user, port):
     32         self._hostname = hostname
     33         self._user = user
     34         self._port = port
     35 
     36         self._master_job = None
     37         self._master_tempdir = None
     38 
     39         self._lock = multiprocessing.Lock()
     40 
     41     def __del__(self):
     42         self.close()
     43 
     44     @property
     45     def _socket_path(self):
     46         return os.path.join(self._master_tempdir.name, 'socket')
     47 
     48     @property
     49     def ssh_option(self):
     50         """Returns the ssh option to use this multiplexed ssh.
     51 
     52         If background process is not running, returns an empty string.
     53         """
     54         if not self._master_tempdir:
     55             return ''
     56         return '-o ControlPath=%s' % (self._socket_path,)
     57 
     58     def maybe_start(self, timeout=5):
     59         """Starts the background process to run multiplex ssh connection.
     60 
     61         If there already is a background process running, this does nothing.
     62         If there is a stale process or a stale socket, first clean them up,
     63         then create a background process.
     64 
     65         @param timeout: timeout in seconds (default 5) to wait for master ssh
     66                         connection to be established. If timeout is reached, a
     67                         warning message is logged, but no other action is
     68                         taken.
     69         """
     70         # Multiple processes might try in parallel to clean up the old master
     71         # ssh connection and create a new one, therefore use a lock to protect
     72         # against race conditions.
     73         with self._lock:
     74             # If a previously started master SSH connection is not running
     75             # anymore, it needs to be cleaned up and then restarted.
     76             if (self._master_job and (not os.path.exists(self._socket_path) or
     77                                       self._master_job.sp.poll() is not None)):
     78                 logging.info(
     79                         'Master ssh connection to %s is down.', self._hostname)
     80                 self._close_internal()
     81 
     82             # Start a new master SSH connection.
     83             if not self._master_job:
     84                 # Create a shared socket in a temp location.
     85                 self._master_tempdir = autotemp.tempdir(
     86                         unique_id='ssh-master', dir='/tmp')
     87 
     88                 # Start the master SSH connection in the background.
     89                 master_cmd = _MASTER_SSH_COMMAND_TEMPLATE % {
     90                         'hostname': self._hostname,
     91                         'user': self._user,
     92                         'port': self._port,
     93                         'socket': self._socket_path,
     94                 }
     95                 logging.info(
     96                         'Starting master ssh connection \'%s\'', master_cmd)
     97                 self._master_job = utils.BgJob(
     98                          master_cmd, nickname='master-ssh',
     99                          stdout_tee=utils.DEVNULL, stderr_tee=utils.DEVNULL,
    100                          unjoinable=True)
    101 
    102                 # To prevent a race between the master ssh connection
    103                 # startup and its first attempted use, wait for socket file to
    104                 # exist before returning.
    105                 try:
    106                     utils.poll_for_condition(
    107                             condition=lambda: os.path.exists(self._socket_path),
    108                             timeout=timeout,
    109                             sleep_interval=0.2,
    110                             desc='Wait for a socket file to exist')
    111                 # log the issue if it fails, but don't throw an exception
    112                 except utils.TimeoutError:
    113                     logging.info('Timed out waiting for master-ssh connection '
    114                                  'to be established.')
    115 
    116 
    117     def close(self):
    118         """Releases all resources used by multiplexed ssh connection."""
    119         with self._lock:
    120             self._close_internal()
    121 
    122     def _close_internal(self):
    123         # Assume that when this is called, _lock should be acquired, already.
    124         if self._master_job:
    125             logging.debug('Nuking ssh master_job')
    126             utils.nuke_subprocess(self._master_job.sp)
    127             self._master_job = None
    128 
    129         if self._master_tempdir:
    130             logging.debug('Cleaning ssh master_tempdir')
    131             self._master_tempdir.clean()
    132             self._master_tempdir = None
    133 
    134 
    135 class ConnectionPool(object):
    136     """Holds SSH multiplex connection instance."""
    137 
    138     def __init__(self):
    139         self._pool = {}
    140         self._lock = threading.Lock()
    141 
    142     def get(self, hostname, user, port):
    143         """Returns MasterSsh instance for the given endpoint.
    144 
    145         If the pool holds the instance already, returns it. If not, create the
    146         instance, and returns it.
    147 
    148         Caller has the responsibility to call maybe_start() before using it.
    149 
    150         @param hostname: Host name of the endpoint.
    151         @param user: User name to log in.
    152         @param port: Port number sshd is listening.
    153         """
    154         key = (hostname, user, port)
    155         logging.debug('Get master ssh connection for %s@%s:%d', user, hostname,
    156                       port)
    157 
    158         with self._lock:
    159             conn = self._pool.get(key)
    160             if not conn:
    161                 conn = MasterSsh(hostname, user, port)
    162                 self._pool[key] = conn
    163             return conn
    164 
    165     def shutdown(self):
    166         """Closes all ssh multiplex connections."""
    167         for ssh in self._pool.itervalues():
    168             ssh.close()
    169