# Copyright 2017 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

import logging
import os
import socket
import sys
import threading
from contextlib import contextmanager
from multiprocessing import connection

import common
from autotest_lib.site_utils.lxc import constants
from autotest_lib.site_utils.lxc.container_pool import message


# Default server-side timeout in seconds; limits time to fetch container.
_SERVER_CONNECTION_TIMEOUT = 1
# Extra timeout to use on the client side; limits network communication time.
_CLIENT_CONNECTION_TIMEOUT = 5


if sys.version_info[0] >= 3:
    def _reraise(exc_info):
        """Re-raises an exception captured with sys.exc_info().

        @param exc_info: A (type, value, traceback) tuple as returned by
                         sys.exc_info() inside an except block.
        """
        raise exc_info[1].with_traceback(exc_info[2])
else:
    # The three-expression raise statement is a SyntaxError on Python 3, so it
    # has to be hidden from the Python 3 compiler inside a string.
    exec('def _reraise(exc_info):\n'
         '    raise exc_info[0], exc_info[1], exc_info[2]\n')


class Client(object):
    """A class for communicating with a container pool service.

    The Client class enables clients to communicate with a running container
    pool service - for example, to query current status, or to obtain a new
    container.

    Here is an example usage:

    def status():
      client = Client(pool_address, timeout)
      print(client.get_status())
      client.close()

    In addition, the class provides a context manager for easier cleanup:

    def status():
      with Client.connect(pool_address, timeout) as client:
        print(client.get_status())
    """

    def __init__(self, address=None, timeout=_SERVER_CONNECTION_TIMEOUT):
        """Initializes a new Client object.

        @param address: The address of the pool to connect to.  If None, the
                        default container pool socket under the default shared
                        host path is used.
        @param timeout: A connection timeout, in whole seconds.

        @raises socket.error: If some other miscellaneous socket error occurs
                              (e.g. if the socket does not exist)
        @raises socket.timeout: If the connection is not established before the
                                given timeout expires.
        """
        if address is None:
            address = os.path.join(
                    constants.DEFAULT_SHARED_HOST_PATH,
                    constants.DEFAULT_CONTAINER_POOL_SOCKET)
        self._connection = _ConnectionHelper(address).connect(timeout)


    @classmethod
    @contextmanager
    def connect(cls, address, timeout):
        """A ContextManager for Client objects.

        The client is closed when the with-block exits, whether or not an
        exception was raised inside it.

        @param address: The address of the pool's communication socket.
        @param timeout: A connection timeout, in whole seconds.

        @return: A Client connected to the domain socket on the given address.

        @raises socket.error: If some other miscellaneous socket error occurs
                              (e.g. if the socket does not exist)
        @raises socket.timeout: If the connection is not established before the
                                given timeout expires.
        """
        # Instantiate via cls so that subclasses get an instance of themselves.
        client = cls(address, timeout)
        try:
            yield client
        finally:
            client.close()


    def close(self):
        """Closes the client connection.

        Calling this on an already-closed client is a no-op, so it is safe to
        close explicitly inside a `with Client.connect(...)` block.
        """
        if self._connection is None:
            return
        try:
            self._connection.close()
        finally:
            # Drop the reference even if close() raised, so that a later call
            # does not retry on a connection in an unknown state.
            self._connection = None


    def get_container(self, id, timeout):
        """Retrieves a container from the pool service.

        @param id: A ContainerId to assign to the container.  Containers require
                   an ID when they are dissociated from the pool, so that they
                   can be tracked.
        @param timeout: A timeout (in seconds) to wait for the operation to
                        complete.  A timeout of 0 will return immediately if no
                        containers are available.  A timeout of None waits
                        indefinitely for the service to reply.

        @return: A container from the pool, when one becomes available, or None
                 if no containers were available within the specified timeout.
        """
        self._connection.send(message.get(id, timeout))
        # The service side guarantees that it always returns something
        # (i.e. a Container, or None) within the specified timeout period, or
        # to wait indefinitely if given None.
        # However, we don't entirely trust it and account for network problems.
        if timeout is None or self._connection.poll(
                timeout + _CLIENT_CONNECTION_TIMEOUT):
            return self._connection.recv()
        else:
            logging.debug('No container (id=%s). Connection failed.', id)
            return None


    def get_status(self):
        """Gets the status of the connected Pool.

        @return: Whatever the service sends back in response to a status
                 message.
        """
        self._connection.send(message.status())
        return self._connection.recv()


    def shutdown(self):
        """Stops the service."""
        self._connection.send(message.shutdown())
        # Wait for ack.
        self._connection.recv()


class _ConnectionHelper(threading.Thread):
    """Factory class for making client connections with a timeout.

    Instantiate this with an address, and call connect.  The factory will take
    care of polling for a connection.  If a connection is not established within
    a set period of time, the connect call will raise a socket.timeout
    exception instead of hanging indefinitely.

    Each instance is single-use: connect() starts the underlying thread, and a
    thread can only be started once.
    """
    def __init__(self, address):
        """Initializes the helper.

        @param address: The address to pass to connection.Client.
        """
        super(_ConnectionHelper, self).__init__()
        # Use a daemon thread, so that if this thread hangs, it doesn't keep the
        # parent thread alive.  All daemon threads die when the parent process
        # dies.
        self.daemon = True
        self._address = address
        # Set by run(): exactly one of these becomes non-None once the
        # connection attempt finishes.
        self._client = None
        self._exc_info = None


    def run(self):
        """Instantiates a connection.Client.

        Runs on the helper thread.  Any exception is captured rather than
        propagated, so that connect() can re-raise it on the calling thread.
        """
        try:
            logging.debug('Attempting connection to %s', self._address)
            self._client = connection.Client(self._address)
            logging.debug('Connection to %s successful', self._address)
        except Exception:
            self._exc_info = sys.exc_info()


    def connect(self, timeout):
        """Attempts to create a connection.Client with a timeout.

        While waiting, a warning is logged after the first and second seconds
        and then at every fifth second, for debugging purposes.  After the
        timeout expires, the function will raise a socket.timeout error.

        @param timeout: A connection timeout, in whole seconds.  It must be an
                        integer because it is used as a range() bound.

        @return: A connection.Client connected using the address that was
                 specified when this factory was created.

        @raises socket.timeout: If the connection is not established before the
                                given timeout expires.
        @raises Exception: Any exception raised by connection.Client on the
                           helper thread is re-raised here with its original
                           traceback.
        """
        # Start the thread, which attempts to open the connection.
        self.start()
        # Poll approximately once a second, so clients don't wait forever.
        # Range starts at 1 for readability (so the message below doesn't say 0
        # seconds).
        # Range ends at timeout+2 so that a timeout of 0 results in at least 1
        # try.
        for i in range(1, timeout + 2):
            self.join(1)
            if self._exc_info is not None:
                _reraise(self._exc_info)
            elif self._client is not None:
                return self._client

            # Log a warning when we first detect a potential problem, then every
            # 5 seconds after that.
            if i < 3 or i % 5 == 0:
                logging.warning(
                        'Test client failed to connect after %s seconds.', i)
        # Still no connection - time out.
        raise socket.timeout('Test client timed out waiting for connection.')