1 :mod:`concurrent.futures` --- Asynchronous computation 2 ====================================================== 3 4 .. module:: concurrent.futures 5 :synopsis: Execute computations asynchronously using threads or processes. 6 7 The :mod:`concurrent.futures` module provides a high-level interface for 8 asynchronously executing callables. 9 10 The asynchronous execution can be be performed by threads using 11 :class:`ThreadPoolExecutor` or separate processes using 12 :class:`ProcessPoolExecutor`. Both implement the same interface, which is 13 defined by the abstract :class:`Executor` class. 14 15 Executor Objects 16 ---------------- 17 18 :class:`Executor` is an abstract class that provides methods to execute calls 19 asynchronously. It should not be used directly, but through its two 20 subclasses: :class:`ThreadPoolExecutor` and :class:`ProcessPoolExecutor`. 21 22 .. method:: Executor.submit(fn, *args, **kwargs) 23 24 Schedules the callable to be executed as *fn*(*\*args*, *\*\*kwargs*) and 25 returns a :class:`Future` representing the execution of the callable. 26 27 :: 28 29 with ThreadPoolExecutor(max_workers=1) as executor: 30 future = executor.submit(pow, 323, 1235) 31 print(future.result()) 32 33 .. method:: Executor.map(func, *iterables, timeout=None) 34 35 Equivalent to map(*func*, *\*iterables*) but func is executed asynchronously 36 and several calls to *func* may be made concurrently. The returned iterator 37 raises a :exc:`TimeoutError` if :meth:`__next__()` is called and the result 38 isn't available after *timeout* seconds from the original call to 39 :meth:`map()`. *timeout* can be an int or float. If *timeout* is not 40 specified or ``None`` then there is no limit to the wait time. If a call 41 raises an exception then that exception will be raised when its value is 42 retrieved from the iterator. 43 44 .. method:: Executor.shutdown(wait=True) 45 46 Signal the executor that it should free any resources that it is using when 47 the currently pending futures are done executing. Calls to 48 :meth:`Executor.submit` and :meth:`Executor.map` made after shutdown will 49 raise :exc:`RuntimeError`. 50 51 If *wait* is `True` then this method will not return until all the pending 52 futures are done executing and the resources associated with the executor 53 have been freed. If *wait* is `False` then this method will return 54 immediately and the resources associated with the executor will be freed 55 when all pending futures are done executing. Regardless of the value of 56 *wait*, the entire Python program will not exit until all pending futures 57 are done executing. 58 59 You can avoid having to call this method explicitly if you use the `with` 60 statement, which will shutdown the `Executor` (waiting as if 61 `Executor.shutdown` were called with *wait* set to `True`): 62 63 :: 64 65 import shutil 66 with ThreadPoolExecutor(max_workers=4) as e: 67 e.submit(shutil.copy, 'src1.txt', 'dest1.txt') 68 e.submit(shutil.copy, 'src2.txt', 'dest2.txt') 69 e.submit(shutil.copy, 'src3.txt', 'dest3.txt') 70 e.submit(shutil.copy, 'src3.txt', 'dest4.txt') 71 72 73 ThreadPoolExecutor Objects 74 -------------------------- 75 76 The :class:`ThreadPoolExecutor` class is an :class:`Executor` subclass that uses 77 a pool of threads to execute calls asynchronously. 78 79 Deadlock can occur when the callable associated with a :class:`Future` waits on 80 the results of another :class:`Future`. For example: 81 82 :: 83 84 import time 85 def wait_on_b(): 86 time.sleep(5) 87 print(b.result()) # b will never complete because it is waiting on a. 88 return 5 89 90 def wait_on_a(): 91 time.sleep(5) 92 print(a.result()) # a will never complete because it is waiting on b. 93 return 6 94 95 96 executor = ThreadPoolExecutor(max_workers=2) 97 a = executor.submit(wait_on_b) 98 b = executor.submit(wait_on_a) 99 100 And: 101 102 :: 103 104 def wait_on_future(): 105 f = executor.submit(pow, 5, 2) 106 # This will never complete because there is only one worker thread and 107 # it is executing this function. 108 print(f.result()) 109 110 executor = ThreadPoolExecutor(max_workers=1) 111 executor.submit(wait_on_future) 112 113 .. class:: ThreadPoolExecutor(max_workers) 114 115 Executes calls asynchronously using a pool of at most *max_workers* threads. 116 117 .. _threadpoolexecutor-example: 118 119 ThreadPoolExecutor Example 120 ^^^^^^^^^^^^^^^^^^^^^^^^^^ 121 :: 122 123 from concurrent import futures 124 import urllib.request 125 126 URLS = ['http://www.foxnews.com/', 127 'http://www.cnn.com/', 128 'http://europe.wsj.com/', 129 'http://www.bbc.co.uk/', 130 'http://some-made-up-domain.com/'] 131 132 def load_url(url, timeout): 133 return urllib.request.urlopen(url, timeout=timeout).read() 134 135 with futures.ThreadPoolExecutor(max_workers=5) as executor: 136 future_to_url = dict((executor.submit(load_url, url, 60), url) 137 for url in URLS) 138 139 for future in futures.as_completed(future_to_url): 140 url = future_to_url[future] 141 if future.exception() is not None: 142 print('%r generated an exception: %s' % (url, 143 future.exception())) 144 else: 145 print('%r page is %d bytes' % (url, len(future.result()))) 146 147 ProcessPoolExecutor Objects 148 --------------------------- 149 150 The :class:`ProcessPoolExecutor` class is an :class:`Executor` subclass that 151 uses a pool of processes to execute calls asynchronously. 152 :class:`ProcessPoolExecutor` uses the :mod:`multiprocessing` module, which 153 allows it to side-step the :term:`Global Interpreter Lock` but also means that 154 only picklable objects can be executed and returned. 155 156 Calling :class:`Executor` or :class:`Future` methods from a callable submitted 157 to a :class:`ProcessPoolExecutor` will result in deadlock. 158 159 .. class:: ProcessPoolExecutor(max_workers=None) 160 161 Executes calls asynchronously using a pool of at most *max_workers* 162 processes. If *max_workers* is ``None`` or not given then as many worker 163 processes will be created as the machine has processors. 164 165 .. _processpoolexecutor-example: 166 167 ProcessPoolExecutor Example 168 ^^^^^^^^^^^^^^^^^^^^^^^^^^^ 169 :: 170 171 import math 172 173 PRIMES = [ 174 112272535095293, 175 112582705942171, 176 112272535095293, 177 115280095190773, 178 115797848077099, 179 1099726899285419] 180 181 def is_prime(n): 182 if n % 2 == 0: 183 return False 184 185 sqrt_n = int(math.floor(math.sqrt(n))) 186 for i in range(3, sqrt_n + 1, 2): 187 if n % i == 0: 188 return False 189 return True 190 191 def main(): 192 with futures.ProcessPoolExecutor() as executor: 193 for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)): 194 print('%d is prime: %s' % (number, prime)) 195 196 if __name__ == '__main__': 197 main() 198 199 Future Objects 200 -------------- 201 202 The :class:`Future` class encapulates the asynchronous execution of a callable. 203 :class:`Future` instances are created by :meth:`Executor.submit`. 204 205 .. method:: Future.cancel() 206 207 Attempt to cancel the call. If the call is currently being executed then 208 it cannot be cancelled and the method will return `False`, otherwise the call 209 will be cancelled and the method will return `True`. 210 211 .. method:: Future.cancelled() 212 213 Return `True` if the call was successfully cancelled. 214 215 .. method:: Future.running() 216 217 Return `True` if the call is currently being executed and cannot be 218 cancelled. 219 220 .. method:: Future.done() 221 222 Return `True` if the call was successfully cancelled or finished running. 223 224 .. method:: Future.result(timeout=None) 225 226 Return the value returned by the call. If the call hasn't yet completed then 227 this method will wait up to *timeout* seconds. If the call hasn't completed 228 in *timeout* seconds then a :exc:`TimeoutError` will be raised. *timeout* can 229 be an int or float.If *timeout* is not specified or ``None`` then there is no 230 limit to the wait time. 231 232 If the future is cancelled before completing then :exc:`CancelledError` will 233 be raised. 234 235 If the call raised then this method will raise the same exception. 236 237 .. method:: Future.exception(timeout=None) 238 239 Return the exception raised by the call. If the call hasn't yet completed 240 then this method will wait up to *timeout* seconds. If the call hasn't 241 completed in *timeout* seconds then a :exc:`TimeoutError` will be raised. 242 *timeout* can be an int or float. If *timeout* is not specified or ``None`` 243 then there is no limit to the wait time. 244 245 If the future is cancelled before completing then :exc:`CancelledError` will 246 be raised. 247 248 If the call completed without raising then ``None`` is returned. 249 250 .. method:: Future.add_done_callback(fn) 251 252 Attaches the callable *fn* to the future. *fn* will be called, with the 253 future as its only argument, when the future is cancelled or finishes 254 running. 255 256 Added callables are called in the order that they were added and are always 257 called in a thread belonging to the process that added them. If the callable 258 raises an :exc:`Exception` then it will be logged and ignored. If the 259 callable raises another :exc:`BaseException` then the behavior is not 260 defined. 261 262 If the future has already completed or been cancelled then *fn* will be 263 called immediately. 264 265 Internal Future Methods 266 ^^^^^^^^^^^^^^^^^^^^^^^ 267 268 The following :class:`Future` methods are meant for use in unit tests and 269 :class:`Executor` implementations. 270 271 .. method:: Future.set_running_or_notify_cancel() 272 273 This method should only be called by :class:`Executor` implementations before 274 executing the work associated with the :class:`Future` and by unit tests. 275 276 If the method returns `False` then the :class:`Future` was cancelled i.e. 277 :meth:`Future.cancel` was called and returned `True`. Any threads waiting 278 on the :class:`Future` completing (i.e. through :func:`as_completed` or 279 :func:`wait`) will be woken up. 280 281 If the method returns `True` then the :class:`Future` was not cancelled 282 and has been put in the running state i.e. calls to 283 :meth:`Future.running` will return `True`. 284 285 This method can only be called once and cannot be called after 286 :meth:`Future.set_result` or :meth:`Future.set_exception` have been 287 called. 288 289 .. method:: Future.set_result(result) 290 291 Sets the result of the work associated with the :class:`Future` to *result*. 292 293 This method should only be used by Executor implementations and unit tests. 294 295 .. method:: Future.set_exception(exception) 296 297 Sets the result of the work associated with the :class:`Future` to the 298 :class:`Exception` *exception*. 299 300 This method should only be used by Executor implementations and unit tests. 301 302 Module Functions 303 ---------------- 304 305 .. function:: wait(fs, timeout=None, return_when=ALL_COMPLETED) 306 307 Wait for the :class:`Future` instances (possibly created by different 308 :class:`Executor` instances) given by *fs* to complete. Returns a named 309 2-tuple of sets. The first set, named "done", contains the futures that 310 completed (finished or were cancelled) before the wait completed. The second 311 set, named "not_done", contains uncompleted futures. 312 313 *timeout* can be used to control the maximum number of seconds to wait before 314 returning. *timeout* can be an int or float. If *timeout* is not specified or 315 ``None`` then there is no limit to the wait time. 316 317 *return_when* indicates when this function should return. It must be one of 318 the following constants: 319 320 +-----------------------------+----------------------------------------+ 321 | Constant | Description | 322 +=============================+========================================+ 323 | :const:`FIRST_COMPLETED` | The function will return when any | 324 | | future finishes or is cancelled. | 325 +-----------------------------+----------------------------------------+ 326 | :const:`FIRST_EXCEPTION` | The function will return when any | 327 | | future finishes by raising an | 328 | | exception. If no future raises an | 329 | | exception then it is equivalent to | 330 | | `ALL_COMPLETED`. | 331 +-----------------------------+----------------------------------------+ 332 | :const:`ALL_COMPLETED` | The function will return when all | 333 | | futures finish or are cancelled. | 334 +-----------------------------+----------------------------------------+ 335 336 .. function:: as_completed(fs, timeout=None) 337 338 Returns an iterator over the :class:`Future` instances (possibly created by 339 different :class:`Executor` instances) given by *fs* that yields futures as 340 they complete (finished or were cancelled). Any futures given by *fs* that 341 are duplicated will be returned once. Any futures that completed 342 before :func:`as_completed` is called will be yielded first. The returned 343 iterator raises a :exc:`TimeoutError` if :meth:`~iterator.__next__` is 344 called and the result isn't available after *timeout* seconds from the 345 original call to :func:`as_completed`. *timeout* can be an int or float. 346 If *timeout* is not specified or ``None``, there is no limit to the wait 347 time. 348