Home | History | Annotate | Download | only in hosts
      1 # Copyright 2017 The Chromium OS Authors. All rights reserved.
      2 # Use of this source code is governed by a BSD-style license that can be
      3 # found in the LICENSE file.
      4 
      5 import logging
      6 import multiprocessing
      7 import os
      8 import threading
      9 import time
     10 
     11 from autotest_lib.client.common_lib import autotemp
     12 from autotest_lib.server import utils
     13 
     14 _MASTER_SSH_COMMAND_TEMPLATE = (
     15     '/usr/bin/ssh -a -x -N '
     16     '-o ControlMaster=yes '  # Create multiplex socket.
     17     '-o ControlPath=%(socket)s '
     18     '-o StrictHostKeyChecking=no '
     19     '-o UserKnownHostsFile=/dev/null '
     20     '-o BatchMode=yes '
     21     '-o ConnectTimeout=30 '
     22     '-o ServerAliveInterval=900 '
     23     '-o ServerAliveCountMax=3 '
     24     '-o ConnectionAttempts=4 '
     25     '-o Protocol=2 '
     26     '-l %(user)s -p %(port)d %(hostname)s')
     27 
     28 
     29 class MasterSsh(object):
     30     """Manages multiplex ssh connection."""
     31 
     32     def __init__(self, hostname, user, port):
     33         self._hostname = hostname
     34         self._user = user
     35         self._port = port
     36 
     37         self._master_job = None
     38         self._master_tempdir = None
     39 
     40         self._lock = multiprocessing.Lock()
     41 
     42     def __del__(self):
     43         self.close()
     44 
     45     @property
     46     def _socket_path(self):
     47         return os.path.join(self._master_tempdir.name, 'socket')
     48 
     49     @property
     50     def ssh_option(self):
     51         """Returns the ssh option to use this multiplexed ssh.
     52 
     53         If background process is not running, returns an empty string.
     54         """
     55         if not self._master_tempdir:
     56             return ''
     57         return '-o ControlPath=%s' % (self._socket_path,)
     58 
     59     def maybe_start(self, timeout=5):
     60         """Starts the background process to run multiplex ssh connection.
     61 
     62         If there already is a background process running, this does nothing.
     63         If there is a stale process or a stale socket, first clean them up,
     64         then create a background process.
     65 
     66         @param timeout: timeout in seconds (default 5) to wait for master ssh
     67                         connection to be established. If timeout is reached, a
     68                         warning message is logged, but no other action is
     69                         taken.
     70         """
     71         # Multiple processes might try in parallel to clean up the old master
     72         # ssh connection and create a new one, therefore use a lock to protect
     73         # against race conditions.
     74         with self._lock:
     75             # If a previously started master SSH connection is not running
     76             # anymore, it needs to be cleaned up and then restarted.
     77             if (self._master_job and (not os.path.exists(self._socket_path) or
     78                                       self._master_job.sp.poll() is not None)):
     79                 logging.info(
     80                         'Master ssh connection to %s is down.', self._hostname)
     81                 self._close_internal()
     82 
     83             # Start a new master SSH connection.
     84             if not self._master_job:
     85                 # Create a shared socket in a temp location.
     86                 self._master_tempdir = autotemp.tempdir(unique_id='ssh-master')
     87 
     88                 # Start the master SSH connection in the background.
     89                 master_cmd = _MASTER_SSH_COMMAND_TEMPLATE % {
     90                         'hostname': self._hostname,
     91                         'user': self._user,
     92                         'port': self._port,
     93                         'socket': self._socket_path,
     94                 }
     95                 logging.info(
     96                         'Starting master ssh connection \'%s\'', master_cmd)
     97                 self._master_job = utils.BgJob(
     98                          master_cmd, nickname='master-ssh',
     99                          stdout_tee=utils.DEVNULL, stderr_tee=utils.DEVNULL,
    100                          unjoinable=True)
    101 
    102                 # To prevent a race between the the master ssh connection
    103                 # startup and its first attempted use, wait for socket file to
    104                 # exist before returning.
    105                 end_time = time.time() + timeout
    106                 while time.time() < end_time:
    107                     if os.path.exists(self._socket_path):
    108                         break
    109                     time.sleep(.2)
    110                 else:
    111                     logging.info('Timed out waiting for master-ssh connection '
    112                        'to be established.')
    113 
    114     def close(self):
    115         """Releases all resources used by multiplexed ssh connection."""
    116         with self._lock:
    117             self._close_internal()
    118 
    119     def _close_internal(self):
    120         # Assume that when this is called, _lock should be acquired, already.
    121         if self._master_job:
    122             logging.debug('Nuking ssh master_job')
    123             utils.nuke_subprocess(self._master_job.sp)
    124             self._master_job = None
    125 
    126         if self._master_tempdir:
    127             logging.debug('Cleaning ssh master_tempdir')
    128             self._master_tempdir.clean()
    129             self._master_tempdir = None
    130 
    131 
    132 class ConnectionPool(object):
    133     """Holds SSH multiplex connection instance."""
    134 
    135     def __init__(self):
    136         self._pool = {}
    137         self._lock = threading.Lock()
    138 
    139     def get(self, hostname, user, port):
    140         """Returns MasterSsh instance for the given endpoint.
    141 
    142         If the pool holds the instance already, returns it. If not, create the
    143         instance, and returns it.
    144 
    145         Caller has the responsibility to call maybe_start() before using it.
    146 
    147         @param hostname: Host name of the endpoint.
    148         @param user: User name to log in.
    149         @param port: Port number sshd is listening.
    150         """
    151         key = (hostname, user, port)
    152         logging.debug('Get master ssh connection for %s@%s:%d', user, hostname,
    153                       port)
    154 
    155         with self._lock:
    156             conn = self._pool.get(key)
    157             if not conn:
    158                 conn = MasterSsh(hostname, user, port)
    159                 self._pool[key] = conn
    160             return conn
    161 
    162     def shutdown(self):
    163         """Closes all ssh multiplex connections."""
    164         for ssh in self._pool.itervalues():
    165             ssh.close()
    166