# Copyright 2017 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

import logging
import os
import threading
import time

import common
from autotest_lib.client.common_lib import utils
from autotest_lib.site_utils.lxc import base_image
from autotest_lib.site_utils.lxc import constants
from autotest_lib.site_utils.lxc import container_factory
from autotest_lib.site_utils.lxc import zygote
from autotest_lib.site_utils.lxc.constants import \
    CONTAINER_POOL_METRICS_PREFIX as METRICS_PREFIX
from autotest_lib.site_utils.lxc.container_pool import async_listener
from autotest_lib.site_utils.lxc.container_pool import error
from autotest_lib.site_utils.lxc.container_pool import message
from autotest_lib.site_utils.lxc.container_pool import pool

try:
    import cPickle as pickle
except ImportError:
    # Only a missing module should trigger the fallback; anything else
    # (e.g. KeyboardInterrupt) must not be swallowed here.
    import pickle

try:
    from chromite.lib import metrics
    from infra_libs import ts_mon
except ImportError:
    import mock
    metrics = utils.metrics_mock
    ts_mon = mock.Mock()


# The minimum period between polling for new connections, in seconds.
_MIN_POLLING_PERIOD = 0.1


class Service(object):
    """A manager for a pool of LXC containers.

    The Service class manages client communication with an underlying container
    pool.  It listens for incoming client connections, then spawns threads to
    deal with communication with each client.
    """

    def __init__(self, host_dir, pool=None):
        """Sets up a new container pool service.

        @param host_dir: A SharedHostDir.  This will be used for Zygote
                         configuration as well as for general pool operation
                         (e.g. opening linux domain sockets for communication).
        @param pool: (for testing) A container pool that the service will
                     maintain.  This parameter exists for DI, for testing.
                     Under normal circumstances the service instantiates the
                     container pool internally.
        """
        # Create socket for receiving container pool requests.  This also acts
        # as a mutex, preventing multiple container pools from being
        # instantiated.
        self._socket_path = os.path.join(
                host_dir.path, constants.DEFAULT_CONTAINER_POOL_SOCKET)
        self._connection_listener = async_listener.AsyncListener(
                self._socket_path)
        self._client_threads = []
        # None while no shutdown is pending.  stop() stores a threading.Event
        # here; start() sets that event and resets this to None once shutdown
        # has completed.
        self._stop_event = None
        self._running = False
        self._pool = pool


    def start(self, pool_size=constants.DEFAULT_CONTAINER_POOL_SIZE):
        """Starts the service.

        This call blocks, running the service event loop on the calling thread
        until stop() is called.  It then stops the client threads, closes the
        connection listener and cleans up the container pool.

        @param pool_size: The desired size of the container pool.  This
                          parameter has no effect if a pre-created pool was DI'd
                          into the Service constructor.

        @raise Exception: Any error raised while closing the connection
                          listener is logged and re-raised, after the pool has
                          been cleaned up.
        """
        self._running = True

        # Start the container pool.
        if self._pool is None:
            factory = container_factory.ContainerFactory(
                    base_container=base_image.BaseImage().get(),
                    container_class=zygote.Zygote)
            self._pool = pool.Pool(factory=factory, size=pool_size)

        # Start listening asynchronously for incoming connections.
        self._connection_listener.start()

        # Poll for incoming connections, and spawn threads to handle them.
        logging.debug('Start event loop.')
        while self._stop_event is None:
            self._handle_incoming_connections()
            self._cleanup_closed_connections()
            # TODO(kenobi): Poll for and log errors from pool.
            metrics.Counter(METRICS_PREFIX + '/tick').increment()
            time.sleep(_MIN_POLLING_PERIOD)

        logging.debug('Exit event loop.')

        # Stopped - stop all the client threads, stop listening, then signal
        # that shutdown is complete.
        for thread in self._client_threads:
            thread.stop()
        try:
            self._connection_listener.close()
        except Exception as e:
            logging.error('Error stopping pool service: %r', e)
            raise
        finally:
            # Clean up the container pool.
            self._pool.cleanup()
            # Make sure state is consistent.  Setting the event wakes up
            # whoever is blocked on the object returned by stop().
            self._stop_event.set()
            self._stop_event = None
            self._running = False
            metrics.Counter(METRICS_PREFIX + '/service_stopped').increment()
            logging.debug('Container pool stopped.')


    def stop(self):
        """Stops the service.

        This only requests shutdown; the event loop in start() notices the
        request the next time it evaluates its loop condition.

        @return: A threading.Event that start() sets once shutdown is complete.
        """
        # Work on a local reference: start() runs on a different thread and
        # resets self._stop_event to None when shutdown finishes, so re-reading
        # the attribute for the return value could hand None to the caller.
        stop_event = self._stop_event
        if stop_event is None:
            stop_event = threading.Event()
            self._stop_event = stop_event
        # If a shutdown was already pending, its event is reused: start() only
        # sets the event currently stored on the service, so replacing it would
        # leave the earlier caller waiting forever.
        return stop_event


    def is_running(self):
        """Returns whether or not the service is currently running."""
        return self._running


    def get_status(self):
        """Returns a dictionary of values describing the current status.

        The 'running' and 'socket_path' entries are always present.  The pool
        and client thread entries are only added while the service is running.
        """
        status = {}
        status['running'] = self._running
        status['socket_path'] = self._socket_path
        if self._running:
            status['pool capacity'] = self._pool.capacity
            status['pool size'] = self._pool.size
            status['pool worker count'] = self._pool.worker_count
            status['pool errors'] = self._pool.errors.qsize()
            status['client thread count'] = len(self._client_threads)
        return status


    def _handle_incoming_connections(self):
        """Checks for connections, and spawn sub-threads to handle requests."""
        connection = self._connection_listener.get_connection()
        if connection is not None:
            # Spawn a thread to deal with the new connection.
            thread = _ClientThread(self, self._pool, connection)
            thread.start()
            self._client_threads.append(thread)
            thread_count = len(self._client_threads)
            metrics.Counter(METRICS_PREFIX + '/client_threads'
                            ).increment_by(thread_count)
            logging.debug('Client thread count: %d', thread_count)


    def _cleanup_closed_connections(self):
        """Cleans up dead client threads."""
        # We don't need to lock because all operations on self._client_threads
        # take place on the main thread.
        self._client_threads = [t for t in self._client_threads if t.is_alive()]


class _ClientThread(threading.Thread):
    """A class that handles communication with a pool client.

    Use a thread-per-connection model instead of select()/poll() for a few
    reasons:
    - the number of simultaneous clients is not expected to be high enough for
      select or poll to really pay off.
    - one thread per connection is more robust - if a single client somehow
      crashes its communication thread, that will not affect the other
      communication threads or the main pool service.
    """

    def __init__(self, service, pool, connection):
        """Sets up a client thread.

        @param service: The Service that owns this thread.  It is used to
                        serve SHUTDOWN and STATUS requests.
        @param pool: The container pool from which GET requests are served.
        @param connection: The client connection that this thread polls,
                           reads messages from and sends responses to.
        """
        self._service = service
        self._pool = pool
        self._connection = connection
        self._running = False
        super(_ClientThread, self).__init__(name='client_thread')


    def run(self):
        """Handles messages coming in from clients.

        The thread main loop monitors the connection and handles incoming
        messages.  Polling is used so that the loop condition can be checked
        regularly - this enables the thread to exit cleanly if required.

        Any kind of error will exit the event loop and close the connection.
        """
        logging.debug('Start event loop.')
        try:
            self._running = True
            while self._running:
                # Poll and deal with messages every second.  The timeout enables
                # the thread to exit cleanly when stop() is called.
                if self._connection.poll(1):
                    # NOTE(review): the errors handled below indicate that
                    # recv() unpickles client-supplied data.  Presumably only
                    # trusted local processes can reach the socket - confirm.
                    try:
                        msg = self._connection.recv()
                    except (AttributeError,
                            ImportError,
                            IndexError,
                            pickle.UnpicklingError) as e:
                        # All of these can occur while unpickling data.
                        logging.error('Error while receiving message: %r', e)
                        # Exit if an error occurs
                        break
                    except EOFError:
                        # EOFError means the client closed the connection.  This
                        # is not an error - just exit.
                        break

                    try:
                        response = self._handle_message(msg)
                        # Always send the response, even if it's None.  This
                        # provides more consistent client-side behaviour.
                        self._connection.send(response)
                    except error.UnknownMessageTypeError as e:
                        # The message received was a valid python object, but
                        # not a valid Message.
                        logging.error('Message error: %s', e)
                        # Exit if an error occurs
                        break
                    except EOFError:
                        # EOFError means the client closed the connection early.
                        # TODO(chromium:794685): Return container to pool.
                        logging.error('Client closed connection before return.')
                        break

        finally:
            # Always close the connection.
            logging.debug('Exit event loop.')
            self._connection.close()


    def stop(self):
        """Stops the client thread.

        This only clears the flag checked by the loop in run(), so the thread
        exits after its current poll/handling iteration rather than
        immediately.
        """
        self._running = False


    def _handle_message(self, msg):
        """Handles incoming messages.

        @param msg: The incoming message to be handled.

        @return: A pickleable object (or None) that should be sent back to the
                 client.

        @raise error.UnknownMessageTypeError: If msg is not a Message, or if
                                              its type has no handler.
        """

        # Only handle Message objects.
        if not isinstance(msg, message.Message):
            raise error.UnknownMessageTypeError(
                    'Invalid message class %s' % type(msg))

        # Use a dispatch table to simulate switch/case.
        handlers = {
            message.ECHO: self._echo,
            message.GET: self._get,
            message.SHUTDOWN: self._shutdown,
            message.STATUS: self._status,
        }
        # Only the table lookup is guarded.  A KeyError raised inside a handler
        # is a genuine failure and must not be reported as an unknown message
        # type.
        try:
            handler = handlers[msg.type]
        except KeyError:
            raise error.UnknownMessageTypeError(
                    'Invalid message type %s' % msg.type)
        return handler(**msg.args)


    def _echo(self, msg):
        """Handles ECHO messages.

        @param msg: A string that will be echoed back to the client.

        @return: The message, for echoing back to the client.
        """
        # Just echo the message back, for testing aliveness.
        logging.debug('Echo: %r', msg)
        return msg


    def _shutdown(self):
        """Handles SHUTDOWN messages.

        @return: An ACK message.  This function is synchronous (i.e. it blocks,
                 and only returns the ACK when shutdown is complete).
        """
        logging.debug('Received shutdown request.')
        # Request shutdown.  Wait for the service to actually stop before
        # sending the response.
        self._service.stop().wait()
        logging.debug('Service shutdown complete.')
        return message.ack()


    def _status(self):
        """Handles STATUS messages.

        @return: The result of the service status call.
        """
        logging.debug('Received status request.')
        return self._service.get_status()


    def _get(self, id, timeout):
        """Gets a container from the pool.

        @param id: A ContainerId to assign to the new container.
        @param timeout: A timeout (in seconds) to wait for the pool.  If a
                        container is not available from the pool within the
                        given period, None will be returned.

        @return: A container from the pool.
        """
        logging.debug('Received get request (id=%s)', id)
        container = self._pool.get(timeout)
        # Assign an ID to the container as soon as it is removed from the pool.
        # This associates the container with the process to which it will be
        # handed off.
        if container is not None:
            logging.debug(
                'Assigning container (name=%s, id=%s)', container.name, id)
            container.id = id
        else:
            logging.debug('No container (id=%s)', id)
        # Record whether the request was satisfied.
        metrics.Counter(METRICS_PREFIX + '/container_requests',
                        field_spec=[ts_mon.BooleanField('success')]
                        ).increment(fields={'success': (container is not None)})
        return container