:mod:`concurrent.futures` --- Launching parallel tasks
======================================================

.. module:: concurrent.futures
   :synopsis: Execute computations concurrently using threads or processes.

.. versionadded:: 3.2

**Source code:** :source:`Lib/concurrent/futures/thread.py`
and :source:`Lib/concurrent/futures/process.py`

--------------

The :mod:`concurrent.futures` module provides a high-level interface for
asynchronously executing callables.

The asynchronous execution can be performed with threads, using
:class:`ThreadPoolExecutor`, or separate processes, using
:class:`ProcessPoolExecutor`.  Both implement the same interface, which is
defined by the abstract :class:`Executor` class.

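Because both classes share the :class:`Executor` interface, code written
against that interface can be handed either kind of pool.  A minimal sketch,
where ``total_length`` is a hypothetical helper used only for illustration::

   from concurrent.futures import Executor, ThreadPoolExecutor

   def total_length(executor: Executor, words):
       # Hypothetical helper: it relies only on Executor.map(), so any
       # Executor subclass can be passed in.
       return sum(executor.map(len, words))

   with ThreadPoolExecutor(max_workers=2) as pool:
       result = total_length(pool, ['alpha', 'beta', 'gamma'])
   print(result)  # 14

Passing a :class:`ProcessPoolExecutor` instead should also work here, since
:func:`len` is a picklable built-in.
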

Executor Objects
----------------

.. class:: Executor

   An abstract class that provides methods to execute calls asynchronously.  It
   should not be used directly, but through its concrete subclasses.

   .. method:: submit(fn, *args, **kwargs)

      Schedules the callable, *fn*, to be executed as ``fn(*args, **kwargs)``
      and returns a :class:`Future` object representing the execution of the
      callable. ::

         from concurrent.futures import ThreadPoolExecutor

         with ThreadPoolExecutor(max_workers=1) as executor:
             future = executor.submit(pow, 323, 1235)
             print(future.result())

   .. method:: map(func, *iterables, timeout=None, chunksize=1)

      Similar to :func:`map(func, *iterables) <map>` except:

      * the *iterables* are collected immediately rather than lazily;

      * *func* is executed asynchronously and several calls to
        *func* may be made concurrently.

      The returned iterator raises a :exc:`concurrent.futures.TimeoutError`
      if :meth:`~iterator.__next__` is called and the result isn't available
      after *timeout* seconds from the original call to :meth:`Executor.map`.
      *timeout* can be an int or a float.  If *timeout* is not specified or
      ``None``, there is no limit to the wait time.

      If a *func* call raises an exception, then that exception will be
      raised when its value is retrieved from the iterator.

      When using :class:`ProcessPoolExecutor`, this method chops *iterables*
      into a number of chunks which it submits to the pool as separate
      tasks.  The (approximate) size of these chunks can be specified by
      setting *chunksize* to a positive integer.  For very long iterables,
      using a large value for *chunksize* can significantly improve
      performance compared to the default size of 1.  With
      :class:`ThreadPoolExecutor`, *chunksize* has no effect.

      .. versionchanged:: 3.5
         Added the *chunksize* argument.

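      A minimal sketch of the two points above: results come back in input
      order, and an exception raised by *func* only surfaces when the
      iterator reaches that result.  The ``reciprocal`` function is a
      hypothetical example::

         from concurrent.futures import ThreadPoolExecutor

         def reciprocal(x):
             # Hypothetical task: raises ZeroDivisionError when x == 0.
             return 1 / x

         collected = []
         with ThreadPoolExecutor(max_workers=2) as executor:
             results = executor.map(reciprocal, [1, 2, 0, 4])
             try:
                 for value in results:
                     collected.append(value)
             except ZeroDivisionError:
                 # The earlier results were already yielded before the failure.
                 collected.append('error')

         print(collected)  # [1.0, 0.5, 'error']
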
   .. method:: shutdown(wait=True)

      Signal the executor that it should free any resources that it is using
      when the currently pending futures are done executing.  Calls to
      :meth:`Executor.submit` and :meth:`Executor.map` made after shutdown will
      raise :exc:`RuntimeError`.

      If *wait* is ``True`` then this method will not return until all the
      pending futures are done executing and the resources associated with the
      executor have been freed.  If *wait* is ``False`` then this method will
      return immediately and the resources associated with the executor will be
      freed when all pending futures are done executing.  Regardless of the
      value of *wait*, the entire Python program will not exit until all
      pending futures are done executing.

      You can avoid having to call this method explicitly if you use the
      :keyword:`with` statement, which will shut down the :class:`Executor`
      (waiting as if :meth:`Executor.shutdown` were called with *wait* set to
      ``True``)::

         import shutil
         from concurrent.futures import ThreadPoolExecutor

         with ThreadPoolExecutor(max_workers=4) as e:
             e.submit(shutil.copy, 'src1.txt', 'dest1.txt')
             e.submit(shutil.copy, 'src2.txt', 'dest2.txt')
             e.submit(shutil.copy, 'src3.txt', 'dest3.txt')
             e.submit(shutil.copy, 'src4.txt', 'dest4.txt')

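      When the executor is managed by hand instead, the rule about calls made
      after shutdown can be observed directly.  A minimal sketch::

         from concurrent.futures import ThreadPoolExecutor

         executor = ThreadPoolExecutor(max_workers=2)
         future = executor.submit(sum, [1, 2, 3])
         executor.shutdown(wait=True)   # returns once pending futures are done
         print(future.result())         # 6

         try:
             executor.submit(sum, [4, 5])
         except RuntimeError:
             rejected = True            # new work is refused after shutdown
         else:
             rejected = False
         print(rejected)                # True
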

ThreadPoolExecutor
------------------

:class:`ThreadPoolExecutor` is an :class:`Executor` subclass that uses a pool of
threads to execute calls asynchronously.

Deadlocks can occur when the callable associated with a :class:`Future` waits on
the results of another :class:`Future`.  For example::

   import time
   from concurrent.futures import ThreadPoolExecutor

   def wait_on_b():
       time.sleep(5)
       print(b.result())  # b will never complete because it is waiting on a.
       return 5

   def wait_on_a():
       time.sleep(5)
       print(a.result())  # a will never complete because it is waiting on b.
       return 6

   executor = ThreadPoolExecutor(max_workers=2)
   a = executor.submit(wait_on_b)
   b = executor.submit(wait_on_a)

And::

   from concurrent.futures import ThreadPoolExecutor

   def wait_on_future():
       f = executor.submit(pow, 5, 2)
       # This will never complete because there is only one worker thread and
       # it is executing this function.
       print(f.result())

   executor = ThreadPoolExecutor(max_workers=1)
   executor.submit(wait_on_future)


.. class:: ThreadPoolExecutor(max_workers=None, thread_name_prefix='', initializer=None, initargs=())

   An :class:`Executor` subclass that uses a pool of at most *max_workers*
   threads to execute calls asynchronously.

   *initializer* is an optional callable that is called at the start of
   each worker thread; *initargs* is a tuple of arguments passed to the
   initializer.  Should *initializer* raise an exception, all currently
   pending jobs will raise a :exc:`~concurrent.futures.thread.BrokenThreadPool`,
   as well as any attempt to submit more jobs to the pool.

   .. versionchanged:: 3.5
      If *max_workers* is ``None`` or not given, it will default to the
      number of processors on the machine, multiplied by ``5``, on the
      assumption that :class:`ThreadPoolExecutor` is often used to overlap
      I/O instead of CPU work and so should use more workers than
      :class:`ProcessPoolExecutor`.

   .. versionadded:: 3.6
      The *thread_name_prefix* argument was added to allow users to
      control the :class:`threading.Thread` names for worker threads created by
      the pool for easier debugging.

   .. versionchanged:: 3.7
      Added the *initializer* and *initargs* arguments.

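   The *initializer*, *initargs* and *thread_name_prefix* arguments can be
   combined to give each worker thread its own state and a recognizable name.
   A minimal sketch; the ``init_worker`` and ``describe`` helpers and the
   ``'loader'`` prefix are hypothetical::

      import threading
      from concurrent.futures import ThreadPoolExecutor

      local = threading.local()

      def init_worker(tag):
          # Runs once at the start of each worker thread.
          local.tag = tag

      def describe(n):
          # Report which worker handled the call and the state it was given.
          return threading.current_thread().name, local.tag, n

      with ThreadPoolExecutor(max_workers=2, thread_name_prefix='loader',
                              initializer=init_worker,
                              initargs=('ready',)) as executor:
          results = list(executor.map(describe, range(3)))

      for name, tag, n in results:
          print(name, tag, n)   # thread names start with 'loader'
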

.. _threadpoolexecutor-example:

ThreadPoolExecutor Example
~~~~~~~~~~~~~~~~~~~~~~~~~~
::

   import concurrent.futures
   import urllib.request

   URLS = ['http://www.foxnews.com/',
           'http://www.cnn.com/',
           'http://europe.wsj.com/',
           'http://www.bbc.co.uk/',
           'http://some-made-up-domain.com/']

   # Retrieve a single page and return its contents
   def load_url(url, timeout):
       with urllib.request.urlopen(url, timeout=timeout) as conn:
           return conn.read()

   # We can use a with statement to ensure threads are cleaned up promptly
   with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
       # Start the load operations and mark each future with its URL
       future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
       for future in concurrent.futures.as_completed(future_to_url):
           url = future_to_url[future]
           try:
               data = future.result()
           except Exception as exc:
               print('%r generated an exception: %s' % (url, exc))
           else:
               print('%r page is %d bytes' % (url, len(data)))


ProcessPoolExecutor
-------------------

The :class:`ProcessPoolExecutor` class is an :class:`Executor` subclass that
uses a pool of processes to execute calls asynchronously.
:class:`ProcessPoolExecutor` uses the :mod:`multiprocessing` module, which
allows it to side-step the :term:`Global Interpreter Lock` but also means that
only picklable objects can be executed and returned.

The ``__main__`` module must be importable by worker subprocesses. This means
that :class:`ProcessPoolExecutor` will not work in the interactive interpreter.

Calling :class:`Executor` or :class:`Future` methods from a callable submitted
to a :class:`ProcessPoolExecutor` will result in deadlock.

.. class:: ProcessPoolExecutor(max_workers=None, mp_context=None, initializer=None, initargs=())

   An :class:`Executor` subclass that executes calls asynchronously using a pool
   of at most *max_workers* processes.  If *max_workers* is ``None`` or not
   given, it will default to the number of processors on the machine.
   If *max_workers* is less than or equal to ``0``, then a :exc:`ValueError`
   will be raised.
   *mp_context* can be a multiprocessing context or ``None``.  It will be used
   to launch the workers.  If *mp_context* is ``None`` or not given, the
   default multiprocessing context is used.

   *initializer* is an optional callable that is called at the start of
   each worker process; *initargs* is a tuple of arguments passed to the
   initializer.  Should *initializer* raise an exception, all currently
   pending jobs will raise a :exc:`~concurrent.futures.process.BrokenProcessPool`,
   as well as any attempt to submit more jobs to the pool.

   .. versionchanged:: 3.3
      When one of the worker processes terminates abruptly, a
      :exc:`~concurrent.futures.process.BrokenProcessPool` error is now
      raised.  Previously, behavior was undefined but operations on the
      executor or its futures would often freeze or deadlock.

   .. versionchanged:: 3.7
      The *mp_context* argument was added to allow users to control the
      start method for worker processes created by the pool.

      Added the *initializer* and *initargs* arguments.

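   A minimal sketch of the *max_workers* check and of passing an explicit
   *mp_context*.  The ``'spawn'`` start method is only an example choice, and
   the built-in :func:`pow` is used as the task so that it can be pickled
   without importing anything from ``__main__``::

      import multiprocessing
      from concurrent.futures import ProcessPoolExecutor

      if __name__ == '__main__':
          # A non-positive max_workers is rejected when the executor is created.
          try:
              ProcessPoolExecutor(max_workers=0)
          except ValueError:
              rejected = True
          else:
              rejected = False

          # Launch the workers with the 'spawn' start method.
          ctx = multiprocessing.get_context('spawn')
          with ProcessPoolExecutor(max_workers=2, mp_context=ctx) as executor:
              cubes = list(executor.map(pow, [1, 2, 3], [3, 3, 3]))

          print(rejected, cubes)   # True [1, 8, 27]
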

.. _processpoolexecutor-example:

ProcessPoolExecutor Example
~~~~~~~~~~~~~~~~~~~~~~~~~~~
::

   import concurrent.futures
   import math

   PRIMES = [
       112272535095293,
       112582705942171,
       112272535095293,
       115280095190773,
       115797848077099,
       1099726899285419]

   def is_prime(n):
       # Handle 0, 1 and 2 explicitly so the odd-divisor loop below is correct.
       if n < 2:
           return False
       if n == 2:
           return True
       if n % 2 == 0:
           return False

       sqrt_n = int(math.floor(math.sqrt(n)))
       for i in range(3, sqrt_n + 1, 2):
           if n % i == 0:
               return False
       return True

   def main():
       with concurrent.futures.ProcessPoolExecutor() as executor:
           for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)):
               print('%d is prime: %s' % (number, prime))

   if __name__ == '__main__':
       main()


Future Objects
--------------

The :class:`Future` class encapsulates the asynchronous execution of a callable.
:class:`Future` instances are created by :meth:`Executor.submit`.

.. class:: Future

   Encapsulates the asynchronous execution of a callable.  :class:`Future`
   instances are created by :meth:`Executor.submit` and should not be created
   directly except for testing.

   .. method:: cancel()

      Attempt to cancel the call.  If the call is currently being executed or
      has finished running and cannot be cancelled then the method will return
      ``False``, otherwise the call will be cancelled and the method will
      return ``True``.

   .. method:: cancelled()

      Return ``True`` if the call was successfully cancelled.

   .. method:: running()

      Return ``True`` if the call is currently being executed and cannot be
      cancelled.

   .. method:: done()

      Return ``True`` if the call was successfully cancelled or finished
      running.

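      A minimal sketch of these state queries, assuming a single worker that
      is kept busy by a blocking first task so that a second task stays
      queued.  The ``gate`` event is purely illustrative::

         import threading
         from concurrent.futures import CancelledError, ThreadPoolExecutor

         gate = threading.Event()
         saw_cancelled_error = False

         with ThreadPoolExecutor(max_workers=1) as executor:
             # The only worker is kept busy until the gate is opened.
             blocker = executor.submit(gate.wait)
             queued = executor.submit(pow, 2, 10)

             # 'queued' has not started yet, so it can still be cancelled.
             cancel_ok = queued.cancel()
             print(cancel_ok, queued.cancelled(), queued.done())   # True True True
             gate.set()   # let the first task finish

         print(blocker.result())   # True (the return value of gate.wait())
         print(blocker.cancel())   # False: a finished call cannot be cancelled

         try:
             queued.result()
         except CancelledError:
             saw_cancelled_error = True
         print(saw_cancelled_error)   # True
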
   .. method:: result(timeout=None)

      Return the value returned by the call.  If the call hasn't yet completed
      then this method will wait up to *timeout* seconds.  If the call hasn't
      completed in *timeout* seconds, then a
      :exc:`concurrent.futures.TimeoutError` will be raised.  *timeout* can be
      an int or float.  If *timeout* is not specified or ``None``, there is no
      limit to the wait time.

      If the future is cancelled before completing then :exc:`.CancelledError`
      will be raised.

      If the call raised an exception, this method will raise the same
      exception.

   .. method:: exception(timeout=None)

      Return the exception raised by the call.  If the call hasn't yet
      completed then this method will wait up to *timeout* seconds.  If the
      call hasn't completed in *timeout* seconds, then a
      :exc:`concurrent.futures.TimeoutError` will be raised.  *timeout* can be
      an int or float.  If *timeout* is not specified or ``None``, there is no
      limit to the wait time.

      If the future is cancelled before completing then :exc:`.CancelledError`
      will be raised.

      If the call completed without raising, ``None`` is returned.

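      A minimal sketch contrasting :meth:`result` and :meth:`exception`;
      ``slow_divide`` is a hypothetical task that sleeps briefly so that a
      short *timeout* expires first::

         import time
         from concurrent.futures import ThreadPoolExecutor, TimeoutError

         def slow_divide(a, b):
             # Hypothetical task: sleep so that a short timeout expires first.
             time.sleep(0.5)
             return a / b

         timed_out = reraised = False
         with ThreadPoolExecutor(max_workers=2) as executor:
             ok = executor.submit(slow_divide, 1, 2)
             bad = executor.submit(slow_divide, 1, 0)

             try:
                 ok.result(timeout=0.01)      # the call is still sleeping
             except TimeoutError:
                 timed_out = True

             value = ok.result()              # no timeout: waits until it is done
             error = bad.exception()          # the ZeroDivisionError instance
             try:
                 bad.result()                 # raises that same exception
             except ZeroDivisionError as exc:
                 reraised = exc is error

         print(timed_out, value, type(error).__name__, reraised)
         # True 0.5 ZeroDivisionError True
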
   .. method:: add_done_callback(fn)

      Attaches the callable *fn* to the future.  *fn* will be called, with the
      future as its only argument, when the future is cancelled or finishes
      running.

      Added callables are called in the order that they were added and are
      always called in a thread belonging to the process that added them.  If
      the callable raises an :exc:`Exception` subclass, it will be logged and
      ignored.  If the callable raises a :exc:`BaseException` subclass, the
      behavior is undefined.

      If the future has already completed or been cancelled, *fn* will be
      called immediately.

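      A minimal sketch showing a callback attached before completion and one
      attached afterwards; the ``log`` list and ``report`` callback are
      illustrative only::

         from concurrent.futures import ThreadPoolExecutor

         log = []

         def report(fut):
             # Receives the finished future as its only argument.
             log.append(('done', fut.result()))

         with ThreadPoolExecutor(max_workers=1) as executor:
             future = executor.submit(pow, 3, 4)
             future.add_done_callback(report)

         # The future has already finished here, so this callback runs
         # immediately in the calling thread.
         future.add_done_callback(lambda f: log.append(('late', f.result())))
         print(log)   # [('done', 81), ('late', 81)]
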
   The following :class:`Future` methods are meant for use in unit tests and
   :class:`Executor` implementations.

   .. method:: set_running_or_notify_cancel()

      This method should only be called by :class:`Executor` implementations
      before executing the work associated with the :class:`Future` and by unit
      tests.

      If the method returns ``False`` then the :class:`Future` was cancelled,
      i.e. :meth:`Future.cancel` was called and returned ``True``.  Any threads
      waiting on the :class:`Future` completing (i.e. through
      :func:`as_completed` or :func:`wait`) will be woken up.

      If the method returns ``True`` then the :class:`Future` was not cancelled
      and has been put in the running state, i.e. calls to
      :meth:`Future.running` will return ``True``.

      This method can only be called once and cannot be called after
      :meth:`Future.set_result` or :meth:`Future.set_exception` have been
      called.

   .. method:: set_result(result)

      Sets the result of the work associated with the :class:`Future` to
      *result*.

      This method should only be used by :class:`Executor` implementations and
      unit tests.

   .. method:: set_exception(exception)

      Sets the result of the work associated with the :class:`Future` to the
      :class:`Exception` *exception*.

      This method should only be used by :class:`Executor` implementations and
      unit tests.

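   A minimal sketch of driving a :class:`Future` by hand, as an
   :class:`Executor` implementation or a unit test might::

      from concurrent.futures import Future

      fut = Future()
      started = fut.set_running_or_notify_cancel()   # True: now in the running state
      running = fut.running()                        # True
      fut.set_result(42)
      print(started, running, fut.done(), fut.result())   # True True True 42

      failed = Future()
      failed.set_running_or_notify_cancel()
      failed.set_exception(ValueError('bad input'))
      print(type(failed.exception()).__name__)       # ValueError

      cancelled = Future()
      cancelled.cancel()
      notified = cancelled.set_running_or_notify_cancel()
      print(notified)                                # False: the future was cancelled
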

Module Functions
----------------

.. function:: wait(fs, timeout=None, return_when=ALL_COMPLETED)

   Wait for the :class:`Future` instances (possibly created by different
   :class:`Executor` instances) given by *fs* to complete.  Returns a named
   2-tuple of sets.  The first set, named ``done``, contains the futures that
   completed (finished or were cancelled) before the wait completed.  The second
   set, named ``not_done``, contains uncompleted futures.

   *timeout* can be used to control the maximum number of seconds to wait before
   returning.  *timeout* can be an int or float.  If *timeout* is not specified
   or ``None``, there is no limit to the wait time.

   *return_when* indicates when this function should return.  It must be one of
   the following constants:

   .. tabularcolumns:: |l|L|

   +-----------------------------+----------------------------------------+
   | Constant                    | Description                            |
   +=============================+========================================+
   | :const:`FIRST_COMPLETED`    | The function will return when any      |
   |                             | future finishes or is cancelled.       |
   +-----------------------------+----------------------------------------+
   | :const:`FIRST_EXCEPTION`    | The function will return when any      |
   |                             | future finishes by raising an          |
   |                             | exception.  If no future raises an     |
   |                             | exception then it is equivalent to     |
   |                             | :const:`ALL_COMPLETED`.                |
   +-----------------------------+----------------------------------------+
   | :const:`ALL_COMPLETED`      | The function will return when all      |
   |                             | futures finish or are cancelled.       |
   +-----------------------------+----------------------------------------+

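   A minimal sketch of :const:`FIRST_COMPLETED` and :const:`ALL_COMPLETED`,
   using a hypothetical ``sleep_and_return`` task whose delays are far enough
   apart that the fastest one reliably finishes first::

      import time
      from concurrent.futures import (ALL_COMPLETED, FIRST_COMPLETED,
                                      ThreadPoolExecutor, wait)

      def sleep_and_return(seconds):
          # Hypothetical task: sleep, then report how long it slept.
          time.sleep(seconds)
          return seconds

      delays = (0.05, 1.0, 1.5)
      with ThreadPoolExecutor(max_workers=3) as executor:
          futures = [executor.submit(sleep_and_return, s) for s in delays]

          # Returns as soon as the quickest future has finished.
          done, not_done = wait(futures, return_when=FIRST_COMPLETED)
          first = [f.result() for f in done]
          print(first, len(not_done))                            # [0.05] 2

          # The return value is a named tuple, so its sets can be read by name.
          everything = wait(futures, timeout=5, return_when=ALL_COMPLETED)
          print(len(everything.done), len(everything.not_done))  # 3 0
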
.. function:: as_completed(fs, timeout=None)

   Returns an iterator over the :class:`Future` instances (possibly created by
   different :class:`Executor` instances) given by *fs* that yields futures as
   they complete (finished or cancelled futures).  Any futures given by *fs*
   that are duplicated will be returned once.  Any futures that completed
   before :func:`as_completed` is called will be yielded first.  The returned
   iterator raises a :exc:`concurrent.futures.TimeoutError` if
   :meth:`~iterator.__next__` is called and the result isn't available after
   *timeout* seconds from the original call to :func:`as_completed`.
   *timeout* can be an int or float.  If *timeout* is not specified or
   ``None``, there is no limit to the wait time.

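   A minimal sketch of the duplicate and *timeout* behavior, with
   :func:`time.sleep` standing in for real work::

      import time
      from concurrent.futures import ThreadPoolExecutor, TimeoutError, as_completed

      with ThreadPoolExecutor(max_workers=2) as executor:
          fast = executor.submit(time.sleep, 0.05)
          slow = executor.submit(time.sleep, 1)
          seen = []
          timed_out = False
          try:
              # 'fast' is passed twice but is yielded only once.
              for fut in as_completed([fast, slow, fast], timeout=0.3):
                  seen.append(fut)
          except TimeoutError:
              timed_out = True   # 'slow' was still running after 0.3 seconds

      print(seen == [fast], timed_out)   # True True
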

.. seealso::

   :pep:`3148` -- futures - execute computations asynchronously
      The proposal which described this feature for inclusion in the Python
      standard library.


Exception classes
-----------------

.. currentmodule:: concurrent.futures

.. exception:: CancelledError

   Raised when a future is cancelled.

.. exception:: TimeoutError

   Raised when a future operation exceeds the given timeout.

.. exception:: BrokenExecutor

   Derived from :exc:`RuntimeError`, this exception class is raised
   when an executor is broken for some reason, and cannot be used
   to submit or execute new tasks.

   .. versionadded:: 3.7

.. currentmodule:: concurrent.futures.thread

.. exception:: BrokenThreadPool

   Derived from :exc:`~concurrent.futures.BrokenExecutor`, this exception
   class is raised when one of the workers of a
   :class:`~concurrent.futures.ThreadPoolExecutor` has failed to initialize.

   .. versionadded:: 3.7

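   A minimal sketch of how an initializer failure surfaces;
   ``bad_initializer`` is a hypothetical callable that always fails.  The
   failure is also logged, so a log message with a traceback is expected on
   standard error::

      from concurrent.futures import BrokenExecutor, ThreadPoolExecutor
      from concurrent.futures.thread import BrokenThreadPool

      def bad_initializer():
          # Hypothetical initializer that always fails.
          raise RuntimeError('could not set up worker')

      broken = False
      executor = ThreadPoolExecutor(max_workers=1, initializer=bad_initializer)
      future = executor.submit(pow, 2, 3)
      try:
          future.result()
      except BrokenThreadPool as exc:
          # BrokenThreadPool derives from BrokenExecutor (and so RuntimeError).
          broken = isinstance(exc, BrokenExecutor)
      executor.shutdown()
      print(broken)   # True
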
.. currentmodule:: concurrent.futures.process

.. exception:: BrokenProcessPool

   Derived from :exc:`~concurrent.futures.BrokenExecutor` (formerly
   :exc:`RuntimeError`), this exception class is raised when one of the
   workers of a :class:`~concurrent.futures.ProcessPoolExecutor` has
   terminated in a non-clean fashion (for example, if it was killed from the
   outside).

   .. versionadded:: 3.3

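   A minimal sketch that simulates an abrupt worker exit by submitting
   :func:`os._exit` as the task.  It only illustrates the failure mode and is
   not a recommended pattern::

      import os
      from concurrent.futures import ProcessPoolExecutor
      from concurrent.futures.process import BrokenProcessPool

      if __name__ == '__main__':
          broken = False
          with ProcessPoolExecutor(max_workers=1) as executor:
              # The worker process terminates without reporting a result.
              future = executor.submit(os._exit, 1)
              try:
                  future.result()
              except BrokenProcessPool:
                  broken = True
          print(broken)   # True
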