Home | History | Annotate | Download | only in container_pool
      1 # Copyright 2017 The Chromium OS Authors. All rights reserved.
      2 # Use of this source code is governed by a BSD-style license that can be
      3 # found in the LICENSE file.
      4 
      5 import logging
      6 import os
      7 import socket
      8 import sys
      9 import threading
     10 from contextlib import contextmanager
     11 from multiprocessing import connection
     12 
     13 import common
     14 from autotest_lib.site_utils.lxc import constants
     15 from autotest_lib.site_utils.lxc.container_pool import message
     16 
     17 
     18 # Default server-side timeout in seconds; limits time to fetch container.
     19 _SERVER_CONNECTION_TIMEOUT = 1
     20 # Extra timeout to use on the client side; limits network communication time.
     21 _CLIENT_CONNECTION_TIMEOUT = 5
     22 
     23 class Client(object):
     24     """A class for communicating with a container pool service.
     25 
     26     The Client class enables clients to communicate with a running container
     27     pool service - for example, to query current status, or to obtain a new
     28     container.
     29 
     30     Here is an example usage:
     31 
     32     def status();
     33       client = Client(pool_address, timeout)
     34       print(client.get_status())
     35       client.close()
     36 
     37     In addition, the class provides a context manager for easier cleanup:
     38 
     39     def status():
     40       with Client.connect(pool_address, timeout) as client:
     41         print(client.get_status())
     42     """
     43 
     44     def __init__(self, address=None, timeout=_SERVER_CONNECTION_TIMEOUT):
     45         """Initializes a new Client object.
     46 
     47         @param address: The address of the pool to connect to.
     48         @param timeout: A connection timeout, in seconds.
     49 
     50         @raises socket.error: If some other miscelleneous socket error occurs
     51                               (e.g. if the socket does not exist)
     52         @raises socket.timeout: If the connection is not established before the
     53                                 given timeout expires.
     54         """
     55         if address is None:
     56             address = os.path.join(
     57                 constants.DEFAULT_SHARED_HOST_PATH,
     58                 constants.DEFAULT_CONTAINER_POOL_SOCKET)
     59         self._connection = _ConnectionHelper(address).connect(timeout)
     60 
     61 
     62     @classmethod
     63     @contextmanager
     64     def connect(cls, address, timeout):
     65         """A ContextManager for Client objects.
     66 
     67         @param address: The address of the pool's communication socket.
     68         @param timeout: A connection timeout, in seconds.
     69 
     70         @return: A Client connected to the domain socket on the given address.
     71 
     72         @raises socket.error: If some other miscelleneous socket error occurs
     73                               (e.g. if the socket does not exist)
     74         @raises socket.timeout: If the connection is not established before the
     75                                 given timeout expires.
     76         """
     77         client = Client(address, timeout)
     78         try:
     79             yield client
     80         finally:
     81             client.close()
     82 
     83 
     84     def close(self):
     85         """Closes the client connection."""
     86         self._connection.close()
     87         self._connection = None
     88 
     89 
     90     def get_container(self, id, timeout):
     91         """Retrieves a container from the pool service.
     92 
     93         @param id: A ContainerId to assign to the container.  Containers require
     94                    an ID when they are dissociated from the pool, so that they
     95                    can be tracked.
     96         @param timeout: A timeout (in seconds) to wait for the operation to
     97                         complete.  A timeout of 0 will return immediately if no
     98                         containers are available.
     99 
    100         @return: A container from the pool, when one becomes available, or None
    101                  if no containers were available within the specified timeout.
    102         """
    103         self._connection.send(message.get(id, timeout))
    104         # The service side guarantees that it always returns something
    105         # (i.e. a Container, or None) within the specified timeout period, or
    106         # to wait indefinitely if given None.
    107         # However, we don't entirely trust it and account for network problems.
    108         if timeout is None or self._connection.poll(
    109                 timeout + _CLIENT_CONNECTION_TIMEOUT):
    110             return self._connection.recv()
    111         else:
    112             logging.debug('No container (id=%s). Connection failed.', id)
    113             return None
    114 
    115 
    116     def get_status(self):
    117         """Gets the status of the connected Pool."""
    118         self._connection.send(message.status())
    119         return self._connection.recv()
    120 
    121 
    122     def shutdown(self):
    123         """Stops the service."""
    124         self._connection.send(message.shutdown())
    125         # Wait for ack.
    126         self._connection.recv()
    127 
    128 
    129 class _ConnectionHelper(threading.Thread):
    130     """Factory class for making client connections with a timeout.
    131 
    132     Instantiate this with an address, and call connect.  The factory will take
    133     care of polling for a connection.  If a connection is not established within
    134     a set period of time, the make_connction call will raise a socket.timeout
    135     exception instead of hanging indefinitely.
    136     """
    137     def __init__(self, address):
    138         super(_ConnectionHelper, self).__init__()
    139         # Use a daemon thread, so that if this thread hangs, it doesn't keep the
    140         # parent thread alive.  All daemon threads die when the parent process
    141         # dies.
    142         self.daemon = True
    143         self._address = address
    144         self._client = None
    145         self._exc_info = None
    146 
    147 
    148     def run(self):
    149         """Instantiates a connection.Client."""
    150         try:
    151             logging.debug('Attempting connection to %s', self._address)
    152             self._client = connection.Client(self._address)
    153             logging.debug('Connection to %s successful', self._address)
    154         except Exception:
    155             self._exc_info = sys.exc_info()
    156 
    157 
    158     def connect(self, timeout):
    159         """Attempts to create a connection.Client with a timeout.
    160 
    161         Every 5 seconds a warning will be logged for debugging purposes.  After
    162         the timeout expires, the function will raise a socket.timout error.
    163 
    164         @param timeout: A connection timeout, in seconds.
    165 
    166         @return: A connection.Client connected using the address that was
    167                  specified when this factory was created.
    168 
    169         @raises socket.timeout: If the connection is not established before the
    170                                 given timeout expires.
    171         """
    172         # Start the thread, which attempts to open the connection.
    173         self.start()
    174         # Poll approximately once a second, so clients don't wait forever.
    175         # Range starts at 1 for readability (so the message below doesn't say 0
    176         # seconds).
    177         # Range ends at timeout+2 so that a timeout of 0 results in at least 1
    178         # try.
    179         for i in range(1, timeout + 2):
    180             self.join(1)
    181             if self._exc_info is not None:
    182                 raise self._exc_info[0], self._exc_info[1], self._exc_info[2]
    183             elif self._client is not None:
    184                 return self._client
    185 
    186             # Log a warning when we first detect a potential problem, then every
    187             # 5 seconds after that.
    188             if i < 3 or i % 5 == 0:
    189                 logging.warning(
    190                     'Test client failed to connect after %s seconds.', i)
    191         # Still no connection - time out.
    192         raise socket.timeout('Test client timed out waiting for connection.')
    193