1 # Copyright 2017 The Chromium Authors. All rights reserved. 2 # Use of this source code is governed by a BSD-style license that can be 3 # found in the LICENSE file. 4 5 import Queue 6 import logging 7 import os 8 import socket 9 import threading 10 from multiprocessing import connection 11 12 import common 13 14 15 class AsyncListener(object): 16 """A class for asynchronous listening on a unix socket. 17 18 This class opens a unix socket with the given address and auth key. 19 Connections are listened for on a separate thread, and queued up to be dealt 20 with. 21 """ 22 def __init__(self, address): 23 """Opens a socket with the given address and key. 24 25 @param address: The socket address. 26 27 @raises socket.error: If the address is already in use or is not a valid 28 path. 29 @raises TypeError: If the address is not a valid unix domain socket 30 address. 31 """ 32 self._socket = connection.Listener(address, family='AF_UNIX') 33 34 # This is done mostly for local testing/dev purposes - the easiest/most 35 # reliable way to run the container pool locally is as root, but then 36 # only other processes owned by root can connect to the container. 37 # Setting open permissions on the socket makes it so that other users 38 # can connect, which enables developers to then run tests without sudo. 39 os.chmod(address, 0777) 40 41 self._address = address 42 self._queue = Queue.Queue() 43 self._thread = None 44 self._running = False 45 46 47 def start(self): 48 """Starts listening for connections. 49 50 Starts a child thread that listens asynchronously for connections. 51 After calling this function, incoming connections may be retrieved by 52 calling the get_connection method. 53 """ 54 logging.debug('Starting connection listener.') 55 self._running = True 56 self._thread = threading.Thread(name='connection_listener', 57 target=self._poll) 58 self._thread.start() 59 60 61 def is_running(self): 62 """Returns whether the listener is currently running.""" 63 return self._running 64 65 66 def stop(self): 67 """Stop listening for connections. 68 69 Stops the listening thread. After this is called, connections will no 70 longer be received by the socket. Note, however, that the socket is not 71 destroyed and that calling start again, will resume listening for 72 connections. 73 74 This function is expected to be called when the container pool service 75 is being killed/restarted, so it doesn't make an extraordinary effort to 76 ensure that the listener thread is cleanly destroyed. 77 78 @return: True if the listener thread was successfully killed, False 79 otherwise. 80 """ 81 if not self._running: 82 return False 83 84 logging.debug('Stopping connection listener.') 85 # Setting this to false causes the thread's event loop to exit on the 86 # next iteration. 87 self._running = False 88 # Initiate a connection to force a trip through the event loop. Use raw 89 # sockets because the connection module's convenience classes don't 90 # support timeouts, which leads to deadlocks. 91 try: 92 fake_connection = socket.socket(socket.AF_UNIX) 93 fake_connection.settimeout(0) # non-blocking 94 fake_connection.connect(self._address) 95 fake_connection.close() 96 except socket.timeout: 97 logging.error('Timeout while attempting to close socket listener.') 98 return False 99 100 logging.debug('Socket closed. Waiting for thread to terminate.') 101 self._thread.join(1) 102 return not self._thread.isAlive() 103 104 105 def close(self): 106 """Closes and destroys the socket. 107 108 If the listener thread is running, it is first stopped. 109 """ 110 logging.debug('AsyncListener.close called.') 111 if self._running: 112 self.stop() 113 self._socket.close() 114 115 116 def get_connection(self, timeout=0): 117 """Returns a connection, if one is pending. 118 119 The listener thread queues up connections for the main process to 120 handle. This method returns a pending connection on the queue. If no 121 connections are pending, None is returned. 122 123 @param timeout: Optional timeout. If set to 0 (the default), the method 124 will return instantly if no connections are awaiting. 125 Otherwise, the method will wait the specified number of 126 seconds before returning. 127 128 @return: A pending connection, or None of no connections are pending. 129 """ 130 try: 131 return self._queue.get(block=timeout>0, timeout=timeout) 132 except Queue.Empty: 133 return None 134 135 136 def _poll(self): 137 """Polls the socket for incoming connections. 138 139 This function is intended to be run on the listener thread. It accepts 140 incoming socket connections, and queues them up to be handled. 141 """ 142 logging.debug('Start event loop.') 143 while self._running: 144 try: 145 self._queue.put(self._socket.accept()) 146 logging.debug('Received incoming connection.') 147 except IOError: 148 # The stop method uses a fake connection to unblock the polling 149 # thread. This results in an IOError but this is an expected 150 # outcome. 151 logging.debug('Connection aborted.') 152 logging.debug('Exit event loop.') 153