1 # Copyright 2009 Brian Quinlan. All Rights Reserved. 2 # Licensed to PSF under a Contributor Agreement. 3 4 """Implements ThreadPoolExecutor.""" 5 6 __author__ = 'Brian Quinlan (brian (at] sweetapp.com)' 7 8 import atexit 9 from concurrent.futures import _base 10 import itertools 11 import queue 12 import threading 13 import weakref 14 import os 15 16 # Workers are created as daemon threads. This is done to allow the interpreter 17 # to exit when there are still idle threads in a ThreadPoolExecutor's thread 18 # pool (i.e. shutdown() was not called). However, allowing workers to die with 19 # the interpreter has two undesirable properties: 20 # - The workers would still be running during interpreter shutdown, 21 # meaning that they would fail in unpredictable ways. 22 # - The workers could be killed while evaluating a work item, which could 23 # be bad if the callable being evaluated has external side-effects e.g. 24 # writing to a file. 25 # 26 # To work around this problem, an exit handler is installed which tells the 27 # workers to exit when their work queues are empty and then waits until the 28 # threads finish. 29 30 _threads_queues = weakref.WeakKeyDictionary() 31 _shutdown = False 32 33 def _python_exit(): 34 global _shutdown 35 _shutdown = True 36 items = list(_threads_queues.items()) 37 for t, q in items: 38 q.put(None) 39 for t, q in items: 40 t.join() 41 42 atexit.register(_python_exit) 43 44 45 class _WorkItem(object): 46 def __init__(self, future, fn, args, kwargs): 47 self.future = future 48 self.fn = fn 49 self.args = args 50 self.kwargs = kwargs 51 52 def run(self): 53 if not self.future.set_running_or_notify_cancel(): 54 return 55 56 try: 57 result = self.fn(*self.args, **self.kwargs) 58 except BaseException as exc: 59 self.future.set_exception(exc) 60 # Break a reference cycle with the exception 'exc' 61 self = None 62 else: 63 self.future.set_result(result) 64 65 66 def _worker(executor_reference, work_queue, initializer, initargs): 67 if initializer is not None: 68 try: 69 initializer(*initargs) 70 except BaseException: 71 _base.LOGGER.critical('Exception in initializer:', exc_info=True) 72 executor = executor_reference() 73 if executor is not None: 74 executor._initializer_failed() 75 return 76 try: 77 while True: 78 work_item = work_queue.get(block=True) 79 if work_item is not None: 80 work_item.run() 81 # Delete references to object. See issue16284 82 del work_item 83 continue 84 executor = executor_reference() 85 # Exit if: 86 # - The interpreter is shutting down OR 87 # - The executor that owns the worker has been collected OR 88 # - The executor that owns the worker has been shutdown. 89 if _shutdown or executor is None or executor._shutdown: 90 # Flag the executor as shutting down as early as possible if it 91 # is not gc-ed yet. 92 if executor is not None: 93 executor._shutdown = True 94 # Notice other workers 95 work_queue.put(None) 96 return 97 del executor 98 except BaseException: 99 _base.LOGGER.critical('Exception in worker', exc_info=True) 100 101 102 class BrokenThreadPool(_base.BrokenExecutor): 103 """ 104 Raised when a worker thread in a ThreadPoolExecutor failed initializing. 105 """ 106 107 108 class ThreadPoolExecutor(_base.Executor): 109 110 # Used to assign unique thread names when thread_name_prefix is not supplied. 111 _counter = itertools.count().__next__ 112 113 def __init__(self, max_workers=None, thread_name_prefix='', 114 initializer=None, initargs=()): 115 """Initializes a new ThreadPoolExecutor instance. 116 117 Args: 118 max_workers: The maximum number of threads that can be used to 119 execute the given calls. 120 thread_name_prefix: An optional name prefix to give our threads. 121 initializer: An callable used to initialize worker threads. 122 initargs: A tuple of arguments to pass to the initializer. 123 """ 124 if max_workers is None: 125 # Use this number because ThreadPoolExecutor is often 126 # used to overlap I/O instead of CPU work. 127 max_workers = (os.cpu_count() or 1) * 5 128 if max_workers <= 0: 129 raise ValueError("max_workers must be greater than 0") 130 131 if initializer is not None and not callable(initializer): 132 raise TypeError("initializer must be a callable") 133 134 self._max_workers = max_workers 135 self._work_queue = queue.SimpleQueue() 136 self._threads = set() 137 self._broken = False 138 self._shutdown = False 139 self._shutdown_lock = threading.Lock() 140 self._thread_name_prefix = (thread_name_prefix or 141 ("ThreadPoolExecutor-%d" % self._counter())) 142 self._initializer = initializer 143 self._initargs = initargs 144 145 def submit(self, fn, *args, **kwargs): 146 with self._shutdown_lock: 147 if self._broken: 148 raise BrokenThreadPool(self._broken) 149 150 if self._shutdown: 151 raise RuntimeError('cannot schedule new futures after shutdown') 152 if _shutdown: 153 raise RuntimeError('cannot schedule new futures after ' 154 'interpreter shutdown') 155 156 f = _base.Future() 157 w = _WorkItem(f, fn, args, kwargs) 158 159 self._work_queue.put(w) 160 self._adjust_thread_count() 161 return f 162 submit.__doc__ = _base.Executor.submit.__doc__ 163 164 def _adjust_thread_count(self): 165 # When the executor gets lost, the weakref callback will wake up 166 # the worker threads. 167 def weakref_cb(_, q=self._work_queue): 168 q.put(None) 169 # TODO(bquinlan): Should avoid creating new threads if there are more 170 # idle threads than items in the work queue. 171 num_threads = len(self._threads) 172 if num_threads < self._max_workers: 173 thread_name = '%s_%d' % (self._thread_name_prefix or self, 174 num_threads) 175 t = threading.Thread(name=thread_name, target=_worker, 176 args=(weakref.ref(self, weakref_cb), 177 self._work_queue, 178 self._initializer, 179 self._initargs)) 180 t.daemon = True 181 t.start() 182 self._threads.add(t) 183 _threads_queues[t] = self._work_queue 184 185 def _initializer_failed(self): 186 with self._shutdown_lock: 187 self._broken = ('A thread initializer failed, the thread pool ' 188 'is not usable anymore') 189 # Drain work queue and mark pending futures failed 190 while True: 191 try: 192 work_item = self._work_queue.get_nowait() 193 except queue.Empty: 194 break 195 if work_item is not None: 196 work_item.future.set_exception(BrokenThreadPool(self._broken)) 197 198 def shutdown(self, wait=True): 199 with self._shutdown_lock: 200 self._shutdown = True 201 self._work_queue.put(None) 202 if wait: 203 for t in self._threads: 204 t.join() 205 shutdown.__doc__ = _base.Executor.shutdown.__doc__ 206