Home | History | Annotate | Download | only in container_pool
      1 # Copyright 2017 The Chromium Authors. All rights reserved.
      2 # Use of this source code is governed by a BSD-style license that can be
      3 # found in the LICENSE file.
      4 
      5 import logging
      6 import os
      7 import threading
      8 import time
      9 
     10 import common
     11 from autotest_lib.client.common_lib import utils
     12 from autotest_lib.site_utils.lxc import base_image
     13 from autotest_lib.site_utils.lxc import constants
     14 from autotest_lib.site_utils.lxc import container_factory
     15 from autotest_lib.site_utils.lxc import zygote
     16 from autotest_lib.site_utils.lxc.constants import \
     17     CONTAINER_POOL_METRICS_PREFIX as METRICS_PREFIX
     18 from autotest_lib.site_utils.lxc.container_pool import async_listener
     19 from autotest_lib.site_utils.lxc.container_pool import error
     20 from autotest_lib.site_utils.lxc.container_pool import message
     21 from autotest_lib.site_utils.lxc.container_pool import pool
     22 
     23 try:
     24     import cPickle as pickle
     25 except:
     26     import pickle
     27 
     28 try:
     29     from chromite.lib import metrics
     30     from infra_libs import ts_mon
     31 except ImportError:
     32     import mock
     33     metrics = utils.metrics_mock
     34     ts_mon = mock.Mock()
     35 
     36 
     37 # The minimum period between polling for new connections, in seconds.
     38 _MIN_POLLING_PERIOD = 0.1
     39 
     40 
     41 class Service(object):
     42     """A manager for a pool of LXC containers.
     43 
     44     The Service class manages client communication with an underlying container
     45     pool.  It listens for incoming client connections, then spawns threads to
     46     deal with communication with each client.
     47     """
     48 
     49     def __init__(self, host_dir, pool=None):
     50         """Sets up a new container pool service.
     51 
     52         @param host_dir: A SharedHostDir.  This will be used for Zygote
     53                          configuration as well as for general pool operation
     54                          (e.g. opening linux domain sockets for communication).
     55         @param pool: (for testing) A container pool that the service will
     56                      maintain.  This parameter exists for DI, for testing.
     57                      Under normal circumstances the service instantiates the
     58                      container pool internally.
     59         """
     60         # Create socket for receiving container pool requests.  This also acts
     61         # as a mutex, preventing multiple container pools from being
     62         # instantiated.
     63         self._socket_path = os.path.join(
     64                 host_dir.path, constants.DEFAULT_CONTAINER_POOL_SOCKET)
     65         self._connection_listener = async_listener.AsyncListener(
     66                 self._socket_path)
     67         self._client_threads = []
     68         self._stop_event = None
     69         self._running = False
     70         self._pool = pool
     71 
     72 
     73     def start(self, pool_size=constants.DEFAULT_CONTAINER_POOL_SIZE):
     74         """Starts the service.
     75 
     76         @param pool_size: The desired size of the container pool.  This
     77                           parameter has no effect if a pre-created pool was DI'd
     78                           into the Service constructor.
     79         """
     80         self._running = True
     81 
     82         # Start the container pool.
     83         if self._pool is None:
     84             factory = container_factory.ContainerFactory(
     85                     base_container=base_image.BaseImage().get(),
     86                     container_class=zygote.Zygote)
     87             self._pool = pool.Pool(factory=factory, size=pool_size)
     88 
     89         # Start listening asynchronously for incoming connections.
     90         self._connection_listener.start()
     91 
     92         # Poll for incoming connections, and spawn threads to handle them.
     93         logging.debug('Start event loop.')
     94         while self._stop_event is None:
     95             self._handle_incoming_connections()
     96             self._cleanup_closed_connections()
     97             # TODO(kenobi): Poll for and log errors from pool.
     98             metrics.Counter(METRICS_PREFIX + '/tick').increment()
     99             time.sleep(_MIN_POLLING_PERIOD)
    100 
    101         logging.debug('Exit event loop.')
    102 
    103         # Stopped - stop all the client threads, stop listening, then signal
    104         # that shutdown is complete.
    105         for thread in self._client_threads:
    106             thread.stop()
    107         try:
    108             self._connection_listener.close()
    109         except Exception as e:
    110             logging.error('Error stopping pool service: %r', e)
    111             raise
    112         finally:
    113             # Clean up the container pool.
    114             self._pool.cleanup()
    115             # Make sure state is consistent.
    116             self._stop_event.set()
    117             self._stop_event = None
    118             self._running = False
    119             metrics.Counter(METRICS_PREFIX + '/service_stopped').increment()
    120             logging.debug('Container pool stopped.')
    121 
    122 
    123     def stop(self):
    124         """Stops the service."""
    125         self._stop_event = threading.Event()
    126         return self._stop_event
    127 
    128 
    129     def is_running(self):
    130         """Returns whether or not the service is currently running."""
    131         return self._running
    132 
    133 
    134     def get_status(self):
    135         """Returns a dictionary of values describing the current status."""
    136         status = {}
    137         status['running'] = self._running
    138         status['socket_path'] = self._socket_path
    139         if self._running:
    140             status['pool capacity'] = self._pool.capacity
    141             status['pool size'] = self._pool.size
    142             status['pool worker count'] = self._pool.worker_count
    143             status['pool errors'] = self._pool.errors.qsize()
    144             status['client thread count'] = len(self._client_threads)
    145         return status
    146 
    147 
    148     def _handle_incoming_connections(self):
    149         """Checks for connections, and spawn sub-threads to handle requests."""
    150         connection = self._connection_listener.get_connection()
    151         if connection is not None:
    152             # Spawn a thread to deal with the new connection.
    153             thread = _ClientThread(self, self._pool, connection)
    154             thread.start()
    155             self._client_threads.append(thread)
    156             thread_count = len(self._client_threads)
    157             metrics.Counter(METRICS_PREFIX + '/client_threads'
    158                           ).increment_by(thread_count)
    159             logging.debug('Client thread count: %d', thread_count)
    160 
    161 
    162     def _cleanup_closed_connections(self):
    163         """Cleans up dead client threads."""
    164         # We don't need to lock because all operations on self._client_threads
    165         # take place on the main thread.
    166         self._client_threads = [t for t in self._client_threads if t.is_alive()]
    167 
    168 
    169 class _ClientThread(threading.Thread):
    170     """A class that handles communication with a pool client.
    171 
    172     Use a thread-per-connection model instead of select()/poll() for a few
    173     reasons:
    174     - the number of simultaneous clients is not expected to be high enough for
    175       select or poll to really pay off.
    176     - one thread per connection is more robust - if a single client somehow
    177       crashes its communication thread, that will not affect the other
    178       communication threads or the main pool service.
    179     """
    180 
    181     def __init__(self, service, pool, connection):
    182         self._service = service
    183         self._pool = pool
    184         self._connection = connection
    185         self._running = False
    186         super(_ClientThread, self).__init__(name='client_thread')
    187 
    188 
    189     def run(self):
    190         """Handles messages coming in from clients.
    191 
    192         The thread main loop monitors the connection and handles incoming
    193         messages.  Polling is used so that the loop condition can be checked
    194         regularly - this enables the thread to exit cleanly if required.
    195 
    196         Any kind of error will exit the event loop and close the connection.
    197         """
    198         logging.debug('Start event loop.')
    199         try:
    200             self._running = True
    201             while self._running:
    202                 # Poll and deal with messages every second.  The timeout enables
    203                 # the thread to exit cleanly when stop() is called.
    204                 if self._connection.poll(1):
    205                     try:
    206                         msg = self._connection.recv()
    207                     except (AttributeError,
    208                             ImportError,
    209                             IndexError,
    210                             pickle.UnpicklingError) as e:
    211                         # All of these can occur while unpickling data.
    212                         logging.error('Error while receiving message: %r', e)
    213                         # Exit if an error occurs
    214                         break
    215                     except EOFError:
    216                         # EOFError means the client closed the connection.  This
    217                         # is not an error - just exit.
    218                         break
    219 
    220                     try:
    221                         response = self._handle_message(msg)
    222                         # Always send the response, even if it's None.  This
    223                         # provides more consistent client-side behaviour.
    224                         self._connection.send(response)
    225                     except error.UnknownMessageTypeError as e:
    226                         # The message received was a valid python object, but
    227                         # not a valid Message.
    228                         logging.error('Message error: %s', e)
    229                         # Exit if an error occurs
    230                         break
    231                     except EOFError:
    232                         # EOFError means the client closed the connection early.
    233                         # TODO(chromium:794685): Return container to pool.
    234                         logging.error('Client closed connection before return.')
    235                         break
    236 
    237         finally:
    238             # Always close the connection.
    239             logging.debug('Exit event loop.')
    240             self._connection.close()
    241 
    242 
    243     def stop(self):
    244         """Stops the client thread."""
    245         self._running = False
    246 
    247 
    248     def _handle_message(self, msg):
    249         """Handles incoming messages.
    250 
    251         @param msg: The incoming message to be handled.
    252 
    253         @return: A pickleable object (or None) that should be sent back to the
    254                  client.
    255         """
    256 
    257         # Only handle Message objects.
    258         if not isinstance(msg, message.Message):
    259             raise error.UnknownMessageTypeError(
    260                     'Invalid message class %s' % type(msg))
    261 
    262         # Use a dispatch table to simulate switch/case.
    263         handlers = {
    264             message.ECHO: self._echo,
    265             message.GET: self._get,
    266             message.SHUTDOWN: self._shutdown,
    267             message.STATUS: self._status,
    268         }
    269         try:
    270             return handlers[msg.type](**msg.args)
    271         except KeyError:
    272             raise error.UnknownMessageTypeError(
    273                     'Invalid message type %s' % msg.type)
    274 
    275 
    276     def _echo(self, msg):
    277         """Handles ECHO messages.
    278 
    279         @param msg: A string that will be echoed back to the client.
    280 
    281         @return: The message, for echoing back to the client.
    282         """
    283         # Just echo the message back, for testing aliveness.
    284         logging.debug('Echo: %r', msg)
    285         return msg
    286 
    287 
    288     def _shutdown(self):
    289         """Handles SHUTDOWN messages.
    290 
    291         @return: An ACK message.  This function is synchronous (i.e. it blocks,
    292                  and only returns the ACK when shutdown is complete).
    293         """
    294         logging.debug('Received shutdown request.')
    295         # Request shutdown.  Wait for the service to actually stop before
    296         # sending the response.
    297         self._service.stop().wait()
    298         logging.debug('Service shutdown complete.')
    299         return message.ack()
    300 
    301 
    302     def _status(self):
    303         """Handles STATUS messages.
    304 
    305         @return: The result of the service status call.
    306         """
    307         logging.debug('Received status request.')
    308         return self._service.get_status()
    309 
    310 
    311     def _get(self, id, timeout):
    312         """Gets a container from the pool.
    313 
    314         @param id: A ContainerId to assign to the new container.
    315         @param timeout: A timeout (in seconds) to wait for the pool.  If a
    316                         container is not available from the pool within the
    317                         given period, None will be returned.
    318 
    319         @return: A container from the pool.
    320         """
    321         logging.debug('Received get request (id=%s)', id)
    322         container = self._pool.get(timeout)
    323         # Assign an ID to the container as soon as it is removed from the pool.
    324         # This associates the container with the process to which it will be
    325         # handed off.
    326         if container is not None:
    327             logging.debug(
    328                 'Assigning container (name=%s, id=%s)', container.name, id)
    329             container.id = id
    330         else:
    331             logging.debug('No container (id=%s)', id)
    332         metrics.Counter(METRICS_PREFIX + '/container_requests',
    333                         field_spec=[ts_mon.BooleanField('success')]
    334                         ).increment(fields={'success': (container is not None)})
    335         return container
    336