Home | History | Annotate | Download | only in futures
      1 # Copyright 2009 Brian Quinlan. All Rights Reserved.
      2 # Licensed to PSF under a Contributor Agreement.
      3 
      4 """Implements ThreadPoolExecutor."""
      5 
      6 import atexit
      7 from concurrent.futures import _base
      8 import itertools
      9 import Queue as queue
     10 import threading
     11 import weakref
     12 import sys
     13 
try:
    from multiprocessing import cpu_count
except ImportError:
    # Some platforms don't have multiprocessing; provide a stand-in with the
    # same call signature so callers need no special-casing.
    def cpu_count():
        """Fallback used when multiprocessing cannot be imported.

        Returns:
            None, always. ThreadPoolExecutor.__init__ substitutes 1 for a
            falsy result via ``cpu_count() or 1``.
        """
        return None
     20 
     21 __author__ = 'Brian Quinlan (brian (at] sweetapp.com)'
     22 
     23 # Workers are created as daemon threads. This is done to allow the interpreter
     24 # to exit when there are still idle threads in a ThreadPoolExecutor's thread
     25 # pool (i.e. shutdown() was not called). However, allowing workers to die with
     26 # the interpreter has two undesirable properties:
#   - The workers would still be running during interpreter shutdown,
#     meaning that they would fail in unpredictable ways.
     29 #   - The workers could be killed while evaluating a work item, which could
     30 #     be bad if the callable being evaluated has external side-effects e.g.
     31 #     writing to a file.
     32 #
     33 # To work around this problem, an exit handler is installed which tells the
     34 # workers to exit when their work queues are empty and then waits until the
     35 # threads finish.
     36 
     37 _threads_queues = weakref.WeakKeyDictionary()
     38 _shutdown = False
     39 
     40 def _python_exit():
     41     global _shutdown
     42     _shutdown = True
     43     items = list(_threads_queues.items()) if _threads_queues else ()
     44     for t, q in items:
     45         q.put(None)
     46     for t, q in items:
     47         t.join(sys.maxint)
     48 
     49 atexit.register(_python_exit)
     50 
     51 class _WorkItem(object):
     52     def __init__(self, future, fn, args, kwargs):
     53         self.future = future
     54         self.fn = fn
     55         self.args = args
     56         self.kwargs = kwargs
     57 
     58     def run(self):
     59         if not self.future.set_running_or_notify_cancel():
     60             return
     61 
     62         try:
     63             result = self.fn(*self.args, **self.kwargs)
     64         except:
     65             e, tb = sys.exc_info()[1:]
     66             self.future.set_exception_info(e, tb)
     67         else:
     68             self.future.set_result(result)
     69 
     70 def _worker(executor_reference, work_queue):
     71     try:
     72         while True:
     73             work_item = work_queue.get(block=True)
     74             if work_item is not None:
     75                 work_item.run()
     76                 # Delete references to object. See issue16284
     77                 del work_item
     78                 continue
     79             executor = executor_reference()
     80             # Exit if:
     81             #   - The interpreter is shutting down OR
     82             #   - The executor that owns the worker has been collected OR
     83             #   - The executor that owns the worker has been shutdown.
     84             if _shutdown or executor is None or executor._shutdown:
     85                 # Notice other workers
     86                 work_queue.put(None)
     87                 return
     88             del executor
     89     except:
     90         _base.LOGGER.critical('Exception in worker', exc_info=True)
     91 
     92 
     93 class ThreadPoolExecutor(_base.Executor):
     94 
     95     # Used to assign unique thread names when thread_name_prefix is not supplied.
     96     _counter = itertools.count().next
     97 
     98     def __init__(self, max_workers=None, thread_name_prefix=''):
     99         """Initializes a new ThreadPoolExecutor instance.
    100 
    101         Args:
    102             max_workers: The maximum number of threads that can be used to
    103                 execute the given calls.
    104             thread_name_prefix: An optional name prefix to give our threads.
    105         """
    106         if max_workers is None:
    107             # Use this number because ThreadPoolExecutor is often
    108             # used to overlap I/O instead of CPU work.
    109             max_workers = (cpu_count() or 1) * 5
    110         if max_workers <= 0:
    111             raise ValueError("max_workers must be greater than 0")
    112 
    113         self._max_workers = max_workers
    114         self._work_queue = queue.Queue()
    115         self._threads = set()
    116         self._shutdown = False
    117         self._shutdown_lock = threading.Lock()
    118         self._thread_name_prefix = (thread_name_prefix or
    119                                     ("ThreadPoolExecutor-%d" % self._counter()))
    120 
    121     def submit(self, fn, *args, **kwargs):
    122         with self._shutdown_lock:
    123             if self._shutdown:
    124                 raise RuntimeError('cannot schedule new futures after shutdown')
    125 
    126             f = _base.Future()
    127             w = _WorkItem(f, fn, args, kwargs)
    128 
    129             self._work_queue.put(w)
    130             self._adjust_thread_count()
    131             return f
    132     submit.__doc__ = _base.Executor.submit.__doc__
    133 
    134     def _adjust_thread_count(self):
    135         # When the executor gets lost, the weakref callback will wake up
    136         # the worker threads.
    137         def weakref_cb(_, q=self._work_queue):
    138             q.put(None)
    139         # TODO(bquinlan): Should avoid creating new threads if there are more
    140         # idle threads than items in the work queue.
    141         num_threads = len(self._threads)
    142         if num_threads < self._max_workers:
    143             thread_name = '%s_%d' % (self._thread_name_prefix or self,
    144                                      num_threads)
    145             t = threading.Thread(name=thread_name, target=_worker,
    146                                  args=(weakref.ref(self, weakref_cb),
    147                                        self._work_queue))
    148             t.daemon = True
    149             t.start()
    150             self._threads.add(t)
    151             _threads_queues[t] = self._work_queue
    152 
    153     def shutdown(self, wait=True):
    154         with self._shutdown_lock:
    155             self._shutdown = True
    156             self._work_queue.put(None)
    157         if wait:
    158             for t in self._threads:
    159                 t.join(sys.maxint)
    160     shutdown.__doc__ = _base.Executor.shutdown.__doc__
    161