Home | History | Annotate | Download | only in container_pool
      1 # Copyright 2017 The Chromium Authors. All rights reserved.
      2 # Use of this source code is governed by a BSD-style license that can be
      3 # found in the LICENSE file.
      4 
      5 import Queue
      6 import logging
      7 import os
      8 import socket
      9 import threading
     10 from multiprocessing import connection
     11 
     12 import common
     13 
     14 
     15 class AsyncListener(object):
     16     """A class for asynchronous listening on a unix socket.
     17 
     18     This class opens a unix socket with the given address and auth key.
     19     Connections are listened for on a separate thread, and queued up to be dealt
     20     with.
     21     """
     22     def __init__(self, address):
     23         """Opens a socket with the given address and key.
     24 
     25         @param address: The socket address.
     26 
     27         @raises socket.error: If the address is already in use or is not a valid
     28                               path.
     29         @raises TypeError: If the address is not a valid unix domain socket
     30                            address.
     31         """
     32         self._socket = connection.Listener(address, family='AF_UNIX')
     33 
     34         # This is done mostly for local testing/dev purposes - the easiest/most
     35         # reliable way to run the container pool locally is as root, but then
     36         # only other processes owned by root can connect to the container.
     37         # Setting open permissions on the socket makes it so that other users
     38         # can connect, which enables developers to then run tests without sudo.
     39         os.chmod(address, 0777)
     40 
     41         self._address = address
     42         self._queue = Queue.Queue()
     43         self._thread = None
     44         self._running = False
     45 
     46 
     47     def start(self):
     48         """Starts listening for connections.
     49 
     50         Starts a child thread that listens asynchronously for connections.
     51         After calling this function, incoming connections may be retrieved by
     52         calling the get_connection method.
     53         """
     54         logging.debug('Starting connection listener.')
     55         self._running = True
     56         self._thread = threading.Thread(name='connection_listener',
     57                                         target=self._poll)
     58         self._thread.start()
     59 
     60 
     61     def is_running(self):
     62         """Returns whether the listener is currently running."""
     63         return self._running
     64 
     65 
     66     def stop(self):
     67         """Stop listening for connections.
     68 
     69         Stops the listening thread.  After this is called, connections will no
     70         longer be received by the socket.  Note, however, that the socket is not
     71         destroyed and that calling start again, will resume listening for
     72         connections.
     73 
     74         This function is expected to be called when the container pool service
     75         is being killed/restarted, so it doesn't make an extraordinary effort to
     76         ensure that the listener thread is cleanly destroyed.
     77 
     78         @return: True if the listener thread was successfully killed, False
     79                  otherwise.
     80         """
     81         if not self._running:
     82             return False
     83 
     84         logging.debug('Stopping connection listener.')
     85         # Setting this to false causes the thread's event loop to exit on the
     86         # next iteration.
     87         self._running = False
     88         # Initiate a connection to force a trip through the event loop.  Use raw
     89         # sockets because the connection module's convenience classes don't
     90         # support timeouts, which leads to deadlocks.
     91         try:
     92             fake_connection = socket.socket(socket.AF_UNIX)
     93             fake_connection.settimeout(0)  # non-blocking
     94             fake_connection.connect(self._address)
     95             fake_connection.close()
     96         except socket.timeout:
     97             logging.error('Timeout while attempting to close socket listener.')
     98             return False
     99 
    100         logging.debug('Socket closed. Waiting for thread to terminate.')
    101         self._thread.join(1)
    102         return not self._thread.isAlive()
    103 
    104 
    105     def close(self):
    106         """Closes and destroys the socket.
    107 
    108         If the listener thread is running, it is first stopped.
    109         """
    110         logging.debug('AsyncListener.close called.')
    111         if self._running:
    112             self.stop()
    113         self._socket.close()
    114 
    115 
    116     def get_connection(self, timeout=0):
    117         """Returns a connection, if one is pending.
    118 
    119         The listener thread queues up connections for the main process to
    120         handle.  This method returns a pending connection on the queue.  If no
    121         connections are pending, None is returned.
    122 
    123         @param timeout: Optional timeout.  If set to 0 (the default), the method
    124                         will return instantly if no connections are awaiting.
    125                         Otherwise, the method will wait the specified number of
    126                         seconds before returning.
    127 
    128         @return: A pending connection, or None of no connections are pending.
    129         """
    130         try:
    131             return self._queue.get(block=timeout>0, timeout=timeout)
    132         except Queue.Empty:
    133             return None
    134 
    135 
    136     def _poll(self):
    137         """Polls the socket for incoming connections.
    138 
    139         This function is intended to be run on the listener thread.  It accepts
    140         incoming socket connections, and queues them up to be handled.
    141         """
    142         logging.debug('Start event loop.')
    143         while self._running:
    144             try:
    145                 self._queue.put(self._socket.accept())
    146                 logging.debug('Received incoming connection.')
    147             except IOError:
    148                 # The stop method uses a fake connection to unblock the polling
    149                 # thread.  This results in an IOError but this is an expected
    150                 # outcome.
    151                 logging.debug('Connection aborted.')
    152         logging.debug('Exit event loop.')
    153