# Copyright 2009 Brian Quinlan. All Rights Reserved.
# Licensed to PSF under a Contributor Agreement.

"""Implements ThreadPoolExecutor."""

__author__ = 'Brian Quinlan (brian (at] sweetapp.com)'

import atexit
from concurrent.futures import _base
import queue
import threading
import weakref
import os

# Workers are created as daemon threads. This is done to allow the interpreter
# to exit when there are still idle threads in a ThreadPoolExecutor's thread
# pool (i.e. shutdown() was not called). However, allowing workers to die with
# the interpreter has two undesirable properties:
#   - The workers would still be running during interpreter shutdown,
#     meaning that they would fail in unpredictable ways.
#   - The workers could be killed while evaluating a work item, which could
#     be bad if the callable being evaluated has external side-effects e.g.
#     writing to a file.
#
# To work around this problem, an exit handler is installed which tells the
# workers to exit when their work queues are empty and then waits until the
# threads finish.

# Maps every live worker thread to the work queue it reads from. Weak keys
# let an entry disappear once its thread object is garbage collected.
_threads_queues = weakref.WeakKeyDictionary()
# Set to True by _python_exit() once the interpreter has started exiting.
_shutdown = False


def _python_exit():
    """Exit handler: wake every worker, then wait for all of them to finish."""
    global _shutdown
    _shutdown = True
    # Snapshot the mapping first: it is weakly keyed and may shrink while
    # threads terminate.
    items = list(_threads_queues.items())
    # Wake all workers before joining any of them so that they can wind down
    # concurrently instead of one after another.
    for t, q in items:
        q.put(None)
    for t, q in items:
        t.join()


atexit.register(_python_exit)


class _WorkItem:
    """A pending call: the callable, its arguments and the Future to fill in."""

    def __init__(self, future, fn, args, kwargs):
        self.future = future
        self.fn = fn
        self.args = args
        self.kwargs = kwargs

    def run(self):
        """Run the call and store its result or exception on the future.

        Nothing is executed when set_running_or_notify_cancel() returns a
        false value for the future.
        """
        if not self.future.set_running_or_notify_cancel():
            return

        try:
            result = self.fn(*self.args, **self.kwargs)
        except BaseException as exc:
            self.future.set_exception(exc)
            # Break the reference cycle exc -> traceback -> this frame ->
            # self -> future -> exc, so everything can be released by
            # reference counting alone.
            self = None
        else:
            self.future.set_result(result)


def _worker(executor_reference, work_queue):
    """Main loop of a worker thread.

    Args:
        executor_reference: A weak reference to the owning executor.
        work_queue: The queue to read from. It yields _WorkItem instances,
            or None as a wake-up signal to re-check the exit conditions.
    """
    try:
        while True:
            work_item = work_queue.get(block=True)
            if work_item is not None:
                work_item.run()
                # Delete references to object. See issue16284
                del work_item
                continue
            executor = executor_reference()
            # Exit if:
            #   - The interpreter is shutting down OR
            #   - The executor that owns the worker has been collected OR
            #   - The executor that owns the worker has been shutdown.
            if _shutdown or executor is None or executor._shutdown:
                # Notice other workers
                work_queue.put(None)
                return
            # Do not keep the executor alive while blocked on the queue.
            del executor
    except BaseException:
        _base.LOGGER.critical('Exception in worker', exc_info=True)


class ThreadPoolExecutor(_base.Executor):
    """An Executor that runs submitted calls on a pool of worker threads."""

    def __init__(self, max_workers=None, thread_name_prefix=''):
        """Initializes a new ThreadPoolExecutor instance.

        Args:
            max_workers: The maximum number of threads that can be used to
                execute the given calls.
            thread_name_prefix: An optional name prefix to give our threads.

        Raises:
            ValueError: If max_workers is less than or equal to 0.
        """
        if max_workers is None:
            # Use this number because ThreadPoolExecutor is often
            # used to overlap I/O instead of CPU work.
            max_workers = (os.cpu_count() or 1) * 5
        if max_workers <= 0:
            raise ValueError("max_workers must be greater than 0")

        self._max_workers = max_workers
        self._work_queue = queue.Queue()
        self._threads = set()
        self._shutdown = False
        self._shutdown_lock = threading.Lock()
        self._thread_name_prefix = thread_name_prefix

    # No docstring here: it is copied from the base class right below.
    def submit(self, fn, *args, **kwargs):
        with self._shutdown_lock:
            if self._shutdown:
                raise RuntimeError('cannot schedule new futures after shutdown')
            if _shutdown:
                # The exit handler has already told the workers to stop.
                # A work item queued now could sit behind the wake-up
                # signal forever, leaving its future pending.
                raise RuntimeError('cannot schedule new futures after '
                                   'interpreter shutdown')

            f = _base.Future()
            w = _WorkItem(f, fn, args, kwargs)

            self._work_queue.put(w)
            self._adjust_thread_count()
            return f
    submit.__doc__ = _base.Executor.submit.__doc__

    def _adjust_thread_count(self):
        """Start one more worker thread unless max_workers are running."""
        # When the executor gets lost, the weakref callback will wake up
        # the worker threads.
        def weakref_cb(_, q=self._work_queue):
            q.put(None)
        # TODO(bquinlan): Should avoid creating new threads if there are more
        # idle threads than items in the work queue.
        num_threads = len(self._threads)
        if num_threads < self._max_workers:
            thread_name = '%s_%d' % (self._thread_name_prefix or self,
                                     num_threads)
            # The worker only gets a weak reference, so it does not keep
            # the executor alive.
            t = threading.Thread(name=thread_name, target=_worker,
                                 args=(weakref.ref(self, weakref_cb),
                                       self._work_queue))
            t.daemon = True
            t.start()
            self._threads.add(t)
            _threads_queues[t] = self._work_queue

    # No docstring here: it is copied from the base class right below.
    def shutdown(self, wait=True):
        with self._shutdown_lock:
            self._shutdown = True
            # One wake-up is enough: each exiting worker puts it back for
            # the next one.
            self._work_queue.put(None)
        if wait:
            for t in self._threads:
                t.join()
    shutdown.__doc__ = _base.Executor.shutdown.__doc__