Home | History | Annotate | Download | only in futures
      1 # Copyright 2009 Brian Quinlan. All Rights Reserved.
      2 # Licensed to PSF under a Contributor Agreement.
      3 
      4 """Implements ThreadPoolExecutor."""
      5 
      6 __author__ = 'Brian Quinlan (brian (at] sweetapp.com)'
      7 
      8 import atexit
      9 from concurrent.futures import _base
     10 import itertools
     11 import queue
     12 import threading
     13 import weakref
     14 import os
     15 
     16 # Workers are created as daemon threads. This is done to allow the interpreter
     17 # to exit when there are still idle threads in a ThreadPoolExecutor's thread
     18 # pool (i.e. shutdown() was not called). However, allowing workers to die with
     19 # the interpreter has two undesirable properties:
     20 #   - The workers would still be running during interpreter shutdown,
     21 #     meaning that they would fail in unpredictable ways.
     22 #   - The workers could be killed while evaluating a work item, which could
     23 #     be bad if the callable being evaluated has external side-effects e.g.
     24 #     writing to a file.
     25 #
     26 # To work around this problem, an exit handler is installed which tells the
     27 # workers to exit when their work queues are empty and then waits until the
     28 # threads finish.
     29 
     30 _threads_queues = weakref.WeakKeyDictionary()
     31 _shutdown = False
     32 
     33 def _python_exit():
     34     global _shutdown
     35     _shutdown = True
     36     items = list(_threads_queues.items())
     37     for t, q in items:
     38         q.put(None)
     39     for t, q in items:
     40         t.join()
     41 
     42 atexit.register(_python_exit)
     43 
     44 
     45 class _WorkItem(object):
     46     def __init__(self, future, fn, args, kwargs):
     47         self.future = future
     48         self.fn = fn
     49         self.args = args
     50         self.kwargs = kwargs
     51 
     52     def run(self):
     53         if not self.future.set_running_or_notify_cancel():
     54             return
     55 
     56         try:
     57             result = self.fn(*self.args, **self.kwargs)
     58         except BaseException as exc:
     59             self.future.set_exception(exc)
     60             # Break a reference cycle with the exception 'exc'
     61             self = None
     62         else:
     63             self.future.set_result(result)
     64 
     65 
     66 def _worker(executor_reference, work_queue, initializer, initargs):
     67     if initializer is not None:
     68         try:
     69             initializer(*initargs)
     70         except BaseException:
     71             _base.LOGGER.critical('Exception in initializer:', exc_info=True)
     72             executor = executor_reference()
     73             if executor is not None:
     74                 executor._initializer_failed()
     75             return
     76     try:
     77         while True:
     78             work_item = work_queue.get(block=True)
     79             if work_item is not None:
     80                 work_item.run()
     81                 # Delete references to object. See issue16284
     82                 del work_item
     83                 continue
     84             executor = executor_reference()
     85             # Exit if:
     86             #   - The interpreter is shutting down OR
     87             #   - The executor that owns the worker has been collected OR
     88             #   - The executor that owns the worker has been shutdown.
     89             if _shutdown or executor is None or executor._shutdown:
     90                 # Flag the executor as shutting down as early as possible if it
     91                 # is not gc-ed yet.
     92                 if executor is not None:
     93                     executor._shutdown = True
     94                 # Notice other workers
     95                 work_queue.put(None)
     96                 return
     97             del executor
     98     except BaseException:
     99         _base.LOGGER.critical('Exception in worker', exc_info=True)
    100 
    101 
    102 class BrokenThreadPool(_base.BrokenExecutor):
    103     """
    104     Raised when a worker thread in a ThreadPoolExecutor failed initializing.
    105     """
    106 
    107 
    108 class ThreadPoolExecutor(_base.Executor):
    109 
    110     # Used to assign unique thread names when thread_name_prefix is not supplied.
    111     _counter = itertools.count().__next__
    112 
    113     def __init__(self, max_workers=None, thread_name_prefix='',
    114                  initializer=None, initargs=()):
    115         """Initializes a new ThreadPoolExecutor instance.
    116 
    117         Args:
    118             max_workers: The maximum number of threads that can be used to
    119                 execute the given calls.
    120             thread_name_prefix: An optional name prefix to give our threads.
    121             initializer: An callable used to initialize worker threads.
    122             initargs: A tuple of arguments to pass to the initializer.
    123         """
    124         if max_workers is None:
    125             # Use this number because ThreadPoolExecutor is often
    126             # used to overlap I/O instead of CPU work.
    127             max_workers = (os.cpu_count() or 1) * 5
    128         if max_workers <= 0:
    129             raise ValueError("max_workers must be greater than 0")
    130 
    131         if initializer is not None and not callable(initializer):
    132             raise TypeError("initializer must be a callable")
    133 
    134         self._max_workers = max_workers
    135         self._work_queue = queue.SimpleQueue()
    136         self._threads = set()
    137         self._broken = False
    138         self._shutdown = False
    139         self._shutdown_lock = threading.Lock()
    140         self._thread_name_prefix = (thread_name_prefix or
    141                                     ("ThreadPoolExecutor-%d" % self._counter()))
    142         self._initializer = initializer
    143         self._initargs = initargs
    144 
    145     def submit(self, fn, *args, **kwargs):
    146         with self._shutdown_lock:
    147             if self._broken:
    148                 raise BrokenThreadPool(self._broken)
    149 
    150             if self._shutdown:
    151                 raise RuntimeError('cannot schedule new futures after shutdown')
    152             if _shutdown:
    153                 raise RuntimeError('cannot schedule new futures after '
    154                                    'interpreter shutdown')
    155 
    156             f = _base.Future()
    157             w = _WorkItem(f, fn, args, kwargs)
    158 
    159             self._work_queue.put(w)
    160             self._adjust_thread_count()
    161             return f
    162     submit.__doc__ = _base.Executor.submit.__doc__
    163 
    164     def _adjust_thread_count(self):
    165         # When the executor gets lost, the weakref callback will wake up
    166         # the worker threads.
    167         def weakref_cb(_, q=self._work_queue):
    168             q.put(None)
    169         # TODO(bquinlan): Should avoid creating new threads if there are more
    170         # idle threads than items in the work queue.
    171         num_threads = len(self._threads)
    172         if num_threads < self._max_workers:
    173             thread_name = '%s_%d' % (self._thread_name_prefix or self,
    174                                      num_threads)
    175             t = threading.Thread(name=thread_name, target=_worker,
    176                                  args=(weakref.ref(self, weakref_cb),
    177                                        self._work_queue,
    178                                        self._initializer,
    179                                        self._initargs))
    180             t.daemon = True
    181             t.start()
    182             self._threads.add(t)
    183             _threads_queues[t] = self._work_queue
    184 
    185     def _initializer_failed(self):
    186         with self._shutdown_lock:
    187             self._broken = ('A thread initializer failed, the thread pool '
    188                             'is not usable anymore')
    189             # Drain work queue and mark pending futures failed
    190             while True:
    191                 try:
    192                     work_item = self._work_queue.get_nowait()
    193                 except queue.Empty:
    194                     break
    195                 if work_item is not None:
    196                     work_item.future.set_exception(BrokenThreadPool(self._broken))
    197 
    198     def shutdown(self, wait=True):
    199         with self._shutdown_lock:
    200             self._shutdown = True
    201             self._work_queue.put(None)
    202         if wait:
    203             for t in self._threads:
    204                 t.join()
    205     shutdown.__doc__ = _base.Executor.shutdown.__doc__
    206