Home | History | Annotate | Download | only in futures
      1 # Copyright 2009 Brian Quinlan. All Rights Reserved.
      2 # Licensed to PSF under a Contributor Agreement.
      3 
      4 """Implements ThreadPoolExecutor."""
      5 
      6 __author__ = 'Brian Quinlan (brian (at] sweetapp.com)'
      7 
      8 import atexit
      9 from concurrent.futures import _base
     10 import queue
     11 import threading
     12 import weakref
     13 import os
     14 
     15 # Workers are created as daemon threads. This is done to allow the interpreter
     16 # to exit when there are still idle threads in a ThreadPoolExecutor's thread
     17 # pool (i.e. shutdown() was not called). However, allowing workers to die with
     18 # the interpreter has two undesirable properties:
     19 #   - The workers would still be running during interpreter shutdown,
     20 #     meaning that they would fail in unpredictable ways.
     21 #   - The workers could be killed while evaluating a work item, which could
     22 #     be bad if the callable being evaluated has external side-effects e.g.
     23 #     writing to a file.
     24 #
     25 # To work around this problem, an exit handler is installed which tells the
     26 # workers to exit when their work queues are empty and then waits until the
     27 # threads finish.
     28 
# Maps each started worker thread to the work queue it consumes (entries are
# added by ThreadPoolExecutor._adjust_thread_count).  Keys are held weakly,
# so an entry does not by itself keep a thread object alive.
_threads_queues = weakref.WeakKeyDictionary()
# Flipped to True by _python_exit(); workers read it when they receive a
# None sentinel to decide whether to stop.
_shutdown = False
     31 
     32 def _python_exit():
     33     global _shutdown
     34     _shutdown = True
     35     items = list(_threads_queues.items())
     36     for t, q in items:
     37         q.put(None)
     38     for t, q in items:
     39         t.join()
     40 
     41 atexit.register(_python_exit)
     42 
     43 class _WorkItem(object):
     44     def __init__(self, future, fn, args, kwargs):
     45         self.future = future
     46         self.fn = fn
     47         self.args = args
     48         self.kwargs = kwargs
     49 
     50     def run(self):
     51         if not self.future.set_running_or_notify_cancel():
     52             return
     53 
     54         try:
     55             result = self.fn(*self.args, **self.kwargs)
     56         except BaseException as e:
     57             self.future.set_exception(e)
     58         else:
     59             self.future.set_result(result)
     60 
     61 def _worker(executor_reference, work_queue):
     62     try:
     63         while True:
     64             work_item = work_queue.get(block=True)
     65             if work_item is not None:
     66                 work_item.run()
     67                 # Delete references to object. See issue16284
     68                 del work_item
     69                 continue
     70             executor = executor_reference()
     71             # Exit if:
     72             #   - The interpreter is shutting down OR
     73             #   - The executor that owns the worker has been collected OR
     74             #   - The executor that owns the worker has been shutdown.
     75             if _shutdown or executor is None or executor._shutdown:
     76                 # Notice other workers
     77                 work_queue.put(None)
     78                 return
     79             del executor
     80     except BaseException:
     81         _base.LOGGER.critical('Exception in worker', exc_info=True)
     82 
     83 class ThreadPoolExecutor(_base.Executor):
     84     def __init__(self, max_workers=None, thread_name_prefix=''):
     85         """Initializes a new ThreadPoolExecutor instance.
     86 
     87         Args:
     88             max_workers: The maximum number of threads that can be used to
     89                 execute the given calls.
     90             thread_name_prefix: An optional name prefix to give our threads.
     91         """
     92         if max_workers is None:
     93             # Use this number because ThreadPoolExecutor is often
     94             # used to overlap I/O instead of CPU work.
     95             max_workers = (os.cpu_count() or 1) * 5
     96         if max_workers <= 0:
     97             raise ValueError("max_workers must be greater than 0")
     98 
     99         self._max_workers = max_workers
    100         self._work_queue = queue.Queue()
    101         self._threads = set()
    102         self._shutdown = False
    103         self._shutdown_lock = threading.Lock()
    104         self._thread_name_prefix = thread_name_prefix
    105 
    106     def submit(self, fn, *args, **kwargs):
    107         with self._shutdown_lock:
    108             if self._shutdown:
    109                 raise RuntimeError('cannot schedule new futures after shutdown')
    110 
    111             f = _base.Future()
    112             w = _WorkItem(f, fn, args, kwargs)
    113 
    114             self._work_queue.put(w)
    115             self._adjust_thread_count()
    116             return f
    117     submit.__doc__ = _base.Executor.submit.__doc__
    118 
    119     def _adjust_thread_count(self):
    120         # When the executor gets lost, the weakref callback will wake up
    121         # the worker threads.
    122         def weakref_cb(_, q=self._work_queue):
    123             q.put(None)
    124         # TODO(bquinlan): Should avoid creating new threads if there are more
    125         # idle threads than items in the work queue.
    126         num_threads = len(self._threads)
    127         if num_threads < self._max_workers:
    128             thread_name = '%s_%d' % (self._thread_name_prefix or self,
    129                                      num_threads)
    130             t = threading.Thread(name=thread_name, target=_worker,
    131                                  args=(weakref.ref(self, weakref_cb),
    132                                        self._work_queue))
    133             t.daemon = True
    134             t.start()
    135             self._threads.add(t)
    136             _threads_queues[t] = self._work_queue
    137 
    138     def shutdown(self, wait=True):
    139         with self._shutdown_lock:
    140             self._shutdown = True
    141             self._work_queue.put(None)
    142         if wait:
    143             for t in self._threads:
    144                 t.join()
    145     shutdown.__doc__ = _base.Executor.shutdown.__doc__
    146