# Copyright 2017 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

import Queue
import collections
import logging
import threading
import time

import common
from autotest_lib.client.bin import utils
from autotest_lib.client.common_lib import error
from autotest_lib.site_utils.lxc.container_pool import error as lxc_error
from autotest_lib.site_utils.lxc.constants import \
    CONTAINER_POOL_METRICS_PREFIX as METRICS_PREFIX

try:
    from chromite.lib import metrics
    from infra_libs import ts_mon
except ImportError:
    import mock
    metrics = utils.metrics_mock
    ts_mon = mock.Mock()

# The maximum number of concurrent workers.  Each worker is responsible for
# managing the creation of a single container.
# TODO(kenobi): This may need to be adjusted for different hosts (e.g. full
# vs quarter shards).
_MAX_CONCURRENT_WORKERS = 5
# Timeout (in seconds) for container creation.  After this amount of time,
# container creation tasks are abandoned and retried.
_CONTAINER_CREATION_TIMEOUT = 600
# The minimum period (in seconds) at which the monitor thread runs its event
# loop.  This also drives a number of other factors, e.g. how long it takes
# the thread to respond to shutdown requests.
_MIN_MONITOR_PERIOD = 0.1
# The maximum number of errors per hour.  After this limit is reached, further
# container creation is throttled.
_MAX_ERRORS_PER_HOUR = 200


class Pool(object):
    """A fixed-size pool of LXC containers.

    Containers are created using a factory that is passed to the Pool.  A pool
    size is passed at construction time - this is the number of containers the
    Pool will attempt to maintain.  Whenever the number of containers falls
    below the given size, the Pool will start creating new containers to
    replenish itself.

    In order to avoid overloading the host, the number of simultaneous container
    creations is limited to _MAX_CONCURRENT_WORKERS.

    When container creation produces errors, those errors are saved (see
    Pool.errors).  It is the client's responsibility to periodically check and
    empty out the error queue.
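
    Example usage (an illustrative sketch only; "some_factory" stands in for
    any object with a no-argument create_container() method, and is not
    defined in this module):

        pool = Pool(some_factory, size=5)
        container = pool.get(timeout=30)  # None if no container within 30s
        if container is not None:
            pass  # hand the container off to its consumer
        pool.cleanup()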
    """

    def __init__(self, factory, size):
        """Creates a new Pool instance.

        @param factory: A factory object that will be called upon to create new
                        containers.  The passed object must have a method called
                        "create_container" that takes no arguments and returns
                        an instance of a Container.
        @param size: The size of the Pool.  The Pool attempts to keep this many
                     containers running at all times.
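
        A minimal factory sketch (a hypothetical class, shown only to
        illustrate the duck-typed contract; any equivalent object works):

            class MyContainerFactory(object):
                def __init__(self, make_container):
                    self._make_container = make_container

                def create_container(self):
                    return self._make_container()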
        """
        # Pools of size less than 2 don't make sense.  Don't allow them.
        if size < 2:
            raise ValueError('Invalid pool size.')

        logging.debug('Pool.__init__ called.  Size: %d', size)
        self._pool = Queue.Queue(size)
        self._monitor = _Monitor(factory, self._pool)
        self._monitor.start()


    def get(self, timeout=0):
        """Gets a container from the pool.

        @param timeout: Number of seconds to wait before returning.
                        - If 0 (the default), return immediately.  If a
                          Container is not immediately available, return None.
                        - If a positive number, block at most <timeout> seconds,
                          then return None if a Container was not available
                          within that time.
                        - If None, block indefinitely until a Container is
                          available.

        @return: A container from the pool.
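
        Example of the three timeout modes (a sketch; "pool" is an existing
        Pool instance):

            container = pool.get()              # non-blocking; may be None
            container = pool.get(timeout=30)    # wait up to 30 seconds
            container = pool.get(timeout=None)  # block indefinitely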
        """
        try:
            # Block only if timeout is not zero.
            logging.info('Pool.get called.')
            return self._pool.get(block=(timeout != 0),
                                  timeout=timeout)
        except Queue.Empty:
            return None


    def cleanup(self, timeout=0):
        """Cleans up the container pool.

        Stops all worker threads, and destroys all Containers still in the Pool.

        @param timeout: For testing.  If this is non-zero, it specifies the
                        number of seconds to wait for each worker to shut down.
                        An error is raised if shutdown has not occurred by then.
                        If zero (the default), don't wait for worker threads to
                        shut down, just return immediately.
        """
        logging.info('Pool.cleanup called.')
        # Stop the monitor thread, then drain the pool.
        self._monitor.stop(timeout)

        try:
            dcount = 0
            logging.debug('Emptying container pool')
            while True:
                container = self._pool.get(block=False)
                dcount += 1
                container.destroy()
        except Queue.Empty:
            pass
        finally:
            metrics.Counter(METRICS_PREFIX + '/containers_cleaned_up'
                            ).increment_by(dcount)
            logging.debug('Done.  Destroyed %d containers', dcount)


    @property
    def size(self):
        """Returns the current size of the pool.

        Note that the pool is asynchronous.  Returning a size greater than zero
        does not guarantee that a subsequent call to Pool.get will not block.
        Conversely, returning a size of zero does not guarantee that a
        subsequent call to Pool.get will block.
        """
        return self._pool.qsize()


    @property
    def capacity(self):
        """Returns the max size of the pool."""
        return self._pool.maxsize


    @property
    def errors(self):
        """Returns worker errors.

        @return: A Queue containing all the errors encountered on worker
                 threads.
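
        Example of draining the queue (a sketch; "pool" is a Pool instance,
        and per the class docstring clients should do this periodically):

            try:
                while True:
                    err = pool.errors.get(block=False)
                    logging.warning('Container pool error: %s', err)
            except Queue.Empty:
                pass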
        """
        return self._monitor.errors


    @property
    def worker_count(self):
        """Returns the number of currently active workers.

        Note that this isn't quite the same as the number of currently alive
        worker threads.  Worker threads that have timed out or been cancelled
        may be technically alive, but they are not included in this count.
        """
        return len(self._monitor._workers)


class _Monitor(threading.Thread):
    """A thread that manages the creation of containers for the pool.

    Container creation is potentially time-consuming and can hang or crash.  The
    Monitor class manages a pool of independent threads, each responsible for
    the creation of a single Container.  This provides parallelized container
    creation and ensures that a single Container creation hanging/crashing does
    not starve or crash the Pool.
    """

    def __init__(self, factory, pool):
        """Creates a new monitor.

        @param factory: A container factory.
        @param pool: A pool instance to push created containers into.
        """
        super(_Monitor, self).__init__(name='pool_monitor')

        self._factory = factory
        self._pool = pool

        # List of worker threads.  Access this only from the monitor thread.
        self._worker_max = _MAX_CONCURRENT_WORKERS
        self._workers = []

        # A flag for stopping the monitor.
        self._stop = False

        # Stores errors from worker threads.
        self.errors = Queue.Queue()

        # Throttle on errors, to avoid log spam and CPU spinning.
        self._error_timestamps = collections.deque()


    def run(self):
        """Supplies the container pool with containers."""
        logging.debug('Start event loop.')
        while not self._stop:
            self._clear_old_errors()
            self._create_workers()
            self._poll_workers()
            time.sleep(_MIN_MONITOR_PERIOD)
        logging.debug('Exit event loop.')

        # Clean up - stop all workers.
        for worker in self._workers:
            worker.cancel()


    def stop(self, timeout=0):
        """Stops this thread.

        This function blocks until the monitor thread has stopped.

        @param timeout: If this is a non-zero number, wait this long (in
                        seconds) for each worker thread to stop.  If zero (the
                        default), don't wait for worker threads to exit.

        @raise WorkerTimeoutError: If a worker thread does not exit within the
                                   specified timeout.
        """
        logging.info('Stop requested.')
        self._stop = True
        self.join()
        logging.info('Stopped.')
        # Wait for workers if timeout was requested.
        if timeout > 0:
            logging.debug('Waiting for workers to terminate...')
            for worker in self._workers:
                worker.join(timeout)
                if worker.is_alive():
                    raise lxc_error.WorkerTimeoutError()


    def _create_workers(self):
        """Spawns workers to handle container requests.

        This method modifies the _workers list and should only be called from
        within run().
        """
        if self._pool.full():
            return

        # Do not exceed the worker limit.
        if len(self._workers) >= self._worker_max:
            return

        too_many_errors = len(self._error_timestamps) >= _MAX_ERRORS_PER_HOUR
        metrics.Counter(METRICS_PREFIX + '/error_throttled',
                        field_spec=[ts_mon.BooleanField('throttled')]
                        ).increment(fields={'throttled': too_many_errors})
        # Throttle if too many errors occur.
        if too_many_errors:
            logging.warning('Error throttled (until %d)',
                            self._error_timestamps[0] + 3600)
            return

        # Create workers to refill the pool.  At most one worker is created
        # per event loop iteration.
        qsize = self._pool.qsize()
        shortfall = self._pool.maxsize - qsize
        old_worker_count = len(self._workers)

        # Avoid spamming - only log if the monitor is taking some action.  Log
        # this before creating worker threads, because we are counting live
        # threads and want to avoid race conditions w.r.t. threads actually
        # starting.
        if (old_worker_count < shortfall and
                old_worker_count < self._worker_max):
            # This can include workers that aren't currently in the
            # self._workers list, e.g. workers that were dropped from the list
            # because they timed out.
            active_workers = sum([1 for t in threading.enumerate()
                                  if type(t) is _Worker])
            # qsize    : Current size of the container pool.
            # shortfall: Number of empty slots currently in the pool.
            # workers  : m+n, where m is the current number of active worker
            #            threads and n is the number of new threads created.
            logging.debug('qsize:%d shortfall:%d workers:%d',
                          qsize, shortfall, active_workers)
        if len(self._workers) < shortfall:
            worker = _Worker(self._factory,
                             self._on_worker_result,
                             self._on_worker_error)
            worker.start()
            self._workers.append(worker)


    def _poll_workers(self):
        """Checks worker states and deals with them.

        This method modifies the _workers list and should only be called from
        within run().

        Completed workers are taken off the worker list.  Their results and
        errors are reported through the worker callbacks.
        """
        completed = []
        incomplete = []
        for worker in self._workers:
            if worker.check_health():
                incomplete.append(worker)
            else:
                completed.append(worker)

        self._workers = incomplete


    def _on_worker_result(self, result):
        """Receives results from completed worker threads.

        Pass this as the result callback for worker threads.  Worker threads
        should call this when they produce a container.
        """
        logging.debug('Worker result: %r', result)
        self._pool.put(result)


    def _on_worker_error(self, worker, err):
        """Receives errors from worker threads.

        Pass this as the error callback for worker threads.  Worker threads
        should call this if errors occur.
        """
        timestamp = time.time()
        self._error_timestamps.append(timestamp)
        self.errors.put(err)
        logging.error('[%d] Worker error: %s', worker.ident, err)


    def _clear_old_errors(self):
        """Discards error timestamps that are more than an hour old."""
        one_hour_ago = time.time() - 3600
        metrics.Counter(METRICS_PREFIX + '/recent_errors'
                        ).increment_by(len(self._error_timestamps))
        while (self._error_timestamps and
               self._error_timestamps[0] < one_hour_ago):
            self._error_timestamps.popleft()
            # Avoid logspam - log only when some action was taken.
            logging.error('Worker error count: %d', len(self._error_timestamps))


class _Worker(threading.Thread):
    """A worker thread tasked with managing the creation of a single container.

    The worker is a daemon thread that calls upon a container factory to create
    a single container.  If container creation raises any exceptions, they are
    reported via the error callback and logged, and the worker exits.  The
    worker also provides a mechanism for the parent thread to impose timeouts
    on container creation.
    """

    def __init__(self, factory, result_cb, error_cb):
        """Creates a new Worker.

        @param factory: A factory object that will be called upon to create
                        Containers.
        @param result_cb: A callback that takes a single argument.  The worker
                          calls it with the new Container on success.
        @param error_cb: A callback that takes two arguments.  The worker calls
                         it with itself and the exception when container
                         creation fails or times out.
        """
        super(_Worker, self).__init__(name='pool_worker')
        # Hanging worker threads should not keep the pool process alive.
        self.daemon = True

        self._factory = factory

        self._result_cb = result_cb
        self._error_cb = error_cb

        self._cancelled = False
        self._start_time = None

        # A lock for breaking race conditions in worker cancellation.  Use a
        # recursive lock because check_health() calls cancel() while already
        # holding it.
        self._completion_lock = threading.RLock()
        self._completed = False


    def run(self):
        """Creates a single container."""
        self._start_time = time.time()
        container = None
        try:
            container = self._factory.create_container()
            container.start(wait_for_network=True)
        except Exception as e:
            logging.error('Worker error: %s', error.format_error())
            self._error_cb(self, e)
        finally:
            # All this has to happen atomically, otherwise race conditions can
            # arise w.r.t. cancellation.
            with self._completion_lock:
                self._completed = True
                if self._cancelled:
                    # The job was cancelled.  Destroy the container (if one
                    # was created) instead of passing it to the result
                    # callback.
                    if container is not None:
                        container.destroy()
                else:
                    # Pass the container to the result callback.  The
                    # container may be None if errors occurred.
                    if container is not None:
                        self._result_cb(container)


    def cancel(self):
        """Cancels the work request.

        If the worker has not yet completed, the container will be destroyed
        when created, instead of being added to the container pool.

        @return: True if the request was cancelled, False if the worker had
                 already completed.
        """
        with self._completion_lock:
            if self._completed:
                return False
            else:
                self._cancelled = True
                return True


    def check_health(self):
        """Checks that a worker is alive and has not timed out.

        Checks the run time of the worker to make sure it hasn't timed out.
        Cancels workers that exceed the timeout.

        @return: True if the worker is alive and has not timed out, False
                 otherwise.
        """
        # Acquire the completion lock so as to avoid race conditions if the
        # factory happens to return just as we are timing out.
        with self._completion_lock:
            if not self.is_alive() or self._cancelled or self._completed:
                return False

            # Thread hasn't started yet - count this as healthy.
            if self._start_time is None:
                return True

            # If alive, check the timeout and cancel if necessary.
            runtime = time.time() - self._start_time
            if runtime > _CONTAINER_CREATION_TIMEOUT:
                if self.cancel():
                    self._error_cb(self, lxc_error.WorkerTimeoutError())
                    return False

        return True
    458