# Copyright 2009 Brian Quinlan. All Rights Reserved.
# Licensed to PSF under a Contributor Agreement.

import collections
import itertools
import logging
import threading
import time
import types

__author__ = 'Brian Quinlan (brian (at] sweetapp.com)'

# Return conditions understood by wait().  _AS_COMPLETED is a private
# pseudo-condition that as_completed() passes when installing its waiter.
FIRST_COMPLETED = 'FIRST_COMPLETED'
FIRST_EXCEPTION = 'FIRST_EXCEPTION'
ALL_COMPLETED = 'ALL_COMPLETED'
_AS_COMPLETED = '_AS_COMPLETED'

# Possible future states (for internal use by the futures package).
PENDING = 'PENDING'
RUNNING = 'RUNNING'
# The future was cancelled by the user...
CANCELLED = 'CANCELLED'
# ...and _Waiter.add_cancelled() was called by a worker.
CANCELLED_AND_NOTIFIED = 'CANCELLED_AND_NOTIFIED'
FINISHED = 'FINISHED'

# Every state, in lifecycle order.
_FUTURE_STATES = [PENDING, RUNNING, CANCELLED, CANCELLED_AND_NOTIFIED,
                  FINISHED]

# Short, human-readable state names used by Future.__repr__.  Both
# cancelled variants are reported identically.
_STATE_TO_DESCRIPTION_MAP = dict([
    (PENDING, "pending"),
    (RUNNING, "running"),
    (CANCELLED, "cancelled"),
    (CANCELLED_AND_NOTIFIED, "cancelled"),
    (FINISHED, "finished"),
])

# Logger for internal use by the futures package.
44 LOGGER = logging.getLogger("concurrent.futures") 45 46 class Error(Exception): 47 """Base class for all future-related exceptions.""" 48 pass 49 50 class CancelledError(Error): 51 """The Future was cancelled.""" 52 pass 53 54 class TimeoutError(Error): 55 """The operation exceeded the given deadline.""" 56 pass 57 58 class _Waiter(object): 59 """Provides the event that wait() and as_completed() block on.""" 60 def __init__(self): 61 self.event = threading.Event() 62 self.finished_futures = [] 63 64 def add_result(self, future): 65 self.finished_futures.append(future) 66 67 def add_exception(self, future): 68 self.finished_futures.append(future) 69 70 def add_cancelled(self, future): 71 self.finished_futures.append(future) 72 73 class _AsCompletedWaiter(_Waiter): 74 """Used by as_completed().""" 75 76 def __init__(self): 77 super(_AsCompletedWaiter, self).__init__() 78 self.lock = threading.Lock() 79 80 def add_result(self, future): 81 with self.lock: 82 super(_AsCompletedWaiter, self).add_result(future) 83 self.event.set() 84 85 def add_exception(self, future): 86 with self.lock: 87 super(_AsCompletedWaiter, self).add_exception(future) 88 self.event.set() 89 90 def add_cancelled(self, future): 91 with self.lock: 92 super(_AsCompletedWaiter, self).add_cancelled(future) 93 self.event.set() 94 95 class _FirstCompletedWaiter(_Waiter): 96 """Used by wait(return_when=FIRST_COMPLETED).""" 97 98 def add_result(self, future): 99 super(_FirstCompletedWaiter, self).add_result(future) 100 self.event.set() 101 102 def add_exception(self, future): 103 super(_FirstCompletedWaiter, self).add_exception(future) 104 self.event.set() 105 106 def add_cancelled(self, future): 107 super(_FirstCompletedWaiter, self).add_cancelled(future) 108 self.event.set() 109 110 class _AllCompletedWaiter(_Waiter): 111 """Used by wait(return_when=FIRST_EXCEPTION and ALL_COMPLETED).""" 112 113 def __init__(self, num_pending_calls, stop_on_exception): 114 self.num_pending_calls = num_pending_calls 115 
self.stop_on_exception = stop_on_exception 116 self.lock = threading.Lock() 117 super(_AllCompletedWaiter, self).__init__() 118 119 def _decrement_pending_calls(self): 120 with self.lock: 121 self.num_pending_calls -= 1 122 if not self.num_pending_calls: 123 self.event.set() 124 125 def add_result(self, future): 126 super(_AllCompletedWaiter, self).add_result(future) 127 self._decrement_pending_calls() 128 129 def add_exception(self, future): 130 super(_AllCompletedWaiter, self).add_exception(future) 131 if self.stop_on_exception: 132 self.event.set() 133 else: 134 self._decrement_pending_calls() 135 136 def add_cancelled(self, future): 137 super(_AllCompletedWaiter, self).add_cancelled(future) 138 self._decrement_pending_calls() 139 140 class _AcquireFutures(object): 141 """A context manager that does an ordered acquire of Future conditions.""" 142 143 def __init__(self, futures): 144 self.futures = sorted(futures, key=id) 145 146 def __enter__(self): 147 for future in self.futures: 148 future._condition.acquire() 149 150 def __exit__(self, *args): 151 for future in self.futures: 152 future._condition.release() 153 154 def _create_and_install_waiters(fs, return_when): 155 if return_when == _AS_COMPLETED: 156 waiter = _AsCompletedWaiter() 157 elif return_when == FIRST_COMPLETED: 158 waiter = _FirstCompletedWaiter() 159 else: 160 pending_count = sum( 161 f._state not in [CANCELLED_AND_NOTIFIED, FINISHED] for f in fs) 162 163 if return_when == FIRST_EXCEPTION: 164 waiter = _AllCompletedWaiter(pending_count, stop_on_exception=True) 165 elif return_when == ALL_COMPLETED: 166 waiter = _AllCompletedWaiter(pending_count, stop_on_exception=False) 167 else: 168 raise ValueError("Invalid return condition: %r" % return_when) 169 170 for f in fs: 171 f._waiters.append(waiter) 172 173 return waiter 174 175 176 def _yield_finished_futures(fs, waiter, ref_collect): 177 """ 178 Iterate on the list *fs*, yielding finished futures one by one in 179 reverse order. 
180 Before yielding a future, *waiter* is removed from its waiters 181 and the future is removed from each set in the collection of sets 182 *ref_collect*. 183 184 The aim of this function is to avoid keeping stale references after 185 the future is yielded and before the iterator resumes. 186 """ 187 while fs: 188 f = fs[-1] 189 for futures_set in ref_collect: 190 futures_set.remove(f) 191 with f._condition: 192 f._waiters.remove(waiter) 193 del f 194 # Careful not to keep a reference to the popped value 195 yield fs.pop() 196 197 198 def as_completed(fs, timeout=None): 199 """An iterator over the given futures that yields each as it completes. 200 201 Args: 202 fs: The sequence of Futures (possibly created by different Executors) to 203 iterate over. 204 timeout: The maximum number of seconds to wait. If None, then there 205 is no limit on the wait time. 206 207 Returns: 208 An iterator that yields the given Futures as they complete (finished or 209 cancelled). If any given Futures are duplicated, they will be returned 210 once. 211 212 Raises: 213 TimeoutError: If the entire result iterator could not be generated 214 before the given timeout. 
215 """ 216 if timeout is not None: 217 end_time = timeout + time.time() 218 219 fs = set(fs) 220 total_futures = len(fs) 221 with _AcquireFutures(fs): 222 finished = set( 223 f for f in fs 224 if f._state in [CANCELLED_AND_NOTIFIED, FINISHED]) 225 pending = fs - finished 226 waiter = _create_and_install_waiters(fs, _AS_COMPLETED) 227 finished = list(finished) 228 try: 229 for f in _yield_finished_futures(finished, waiter, 230 ref_collect=(fs,)): 231 f = [f] 232 yield f.pop() 233 234 while pending: 235 if timeout is None: 236 wait_timeout = None 237 else: 238 wait_timeout = end_time - time.time() 239 if wait_timeout < 0: 240 raise TimeoutError( 241 '%d (of %d) futures unfinished' % ( 242 len(pending), total_futures)) 243 244 waiter.event.wait(wait_timeout) 245 246 with waiter.lock: 247 finished = waiter.finished_futures 248 waiter.finished_futures = [] 249 waiter.event.clear() 250 251 # reverse to keep finishing order 252 finished.reverse() 253 for f in _yield_finished_futures(finished, waiter, 254 ref_collect=(fs, pending)): 255 f = [f] 256 yield f.pop() 257 258 finally: 259 # Remove waiter from unfinished futures 260 for f in fs: 261 with f._condition: 262 f._waiters.remove(waiter) 263 264 DoneAndNotDoneFutures = collections.namedtuple( 265 'DoneAndNotDoneFutures', 'done not_done') 266 def wait(fs, timeout=None, return_when=ALL_COMPLETED): 267 """Wait for the futures in the given sequence to complete. 268 269 Args: 270 fs: The sequence of Futures (possibly created by different Executors) to 271 wait upon. 272 timeout: The maximum number of seconds to wait. If None, then there 273 is no limit on the wait time. 274 return_when: Indicates when this function should return. The options 275 are: 276 277 FIRST_COMPLETED - Return when any future finishes or is 278 cancelled. 279 FIRST_EXCEPTION - Return when any future finishes by raising an 280 exception. If no future raises an exception 281 then it is equivalent to ALL_COMPLETED. 
282 ALL_COMPLETED - Return when all futures finish or are cancelled. 283 284 Returns: 285 A named 2-tuple of sets. The first set, named 'done', contains the 286 futures that completed (is finished or cancelled) before the wait 287 completed. The second set, named 'not_done', contains uncompleted 288 futures. 289 """ 290 with _AcquireFutures(fs): 291 done = set(f for f in fs 292 if f._state in [CANCELLED_AND_NOTIFIED, FINISHED]) 293 not_done = set(fs) - done 294 295 if (return_when == FIRST_COMPLETED) and done: 296 return DoneAndNotDoneFutures(done, not_done) 297 elif (return_when == FIRST_EXCEPTION) and done: 298 if any(f for f in done 299 if not f.cancelled() and f.exception() is not None): 300 return DoneAndNotDoneFutures(done, not_done) 301 302 if len(done) == len(fs): 303 return DoneAndNotDoneFutures(done, not_done) 304 305 waiter = _create_and_install_waiters(fs, return_when) 306 307 waiter.event.wait(timeout) 308 for f in fs: 309 with f._condition: 310 f._waiters.remove(waiter) 311 312 done.update(waiter.finished_futures) 313 return DoneAndNotDoneFutures(done, set(fs) - done) 314 315 class Future(object): 316 """Represents the result of an asynchronous computation.""" 317 318 def __init__(self): 319 """Initializes the future. Should not be called by clients.""" 320 self._condition = threading.Condition() 321 self._state = PENDING 322 self._result = None 323 self._exception = None 324 self._traceback = None 325 self._waiters = [] 326 self._done_callbacks = [] 327 328 def _invoke_callbacks(self): 329 for callback in self._done_callbacks: 330 try: 331 callback(self) 332 except Exception: 333 LOGGER.exception('exception calling callback for %r', self) 334 except BaseException: 335 # Explicitly let all other new-style exceptions through so 336 # that we can catch all old-style exceptions with a simple 337 # "except:" clause below. 
338 # 339 # All old-style exception objects are instances of 340 # types.InstanceType, but "except types.InstanceType:" does 341 # not catch old-style exceptions for some reason. Thus, the 342 # only way to catch all old-style exceptions without catching 343 # any new-style exceptions is to filter out the new-style 344 # exceptions, which all derive from BaseException. 345 raise 346 except: 347 # Because of the BaseException clause above, this handler only 348 # executes for old-style exception objects. 349 LOGGER.exception('exception calling callback for %r', self) 350 351 def __repr__(self): 352 with self._condition: 353 if self._state == FINISHED: 354 if self._exception: 355 return '<%s at %#x state=%s raised %s>' % ( 356 self.__class__.__name__, 357 id(self), 358 _STATE_TO_DESCRIPTION_MAP[self._state], 359 self._exception.__class__.__name__) 360 else: 361 return '<%s at %#x state=%s returned %s>' % ( 362 self.__class__.__name__, 363 id(self), 364 _STATE_TO_DESCRIPTION_MAP[self._state], 365 self._result.__class__.__name__) 366 return '<%s at %#x state=%s>' % ( 367 self.__class__.__name__, 368 id(self), 369 _STATE_TO_DESCRIPTION_MAP[self._state]) 370 371 def cancel(self): 372 """Cancel the future if possible. 373 374 Returns True if the future was cancelled, False otherwise. A future 375 cannot be cancelled if it is running or has already completed. 
376 """ 377 with self._condition: 378 if self._state in [RUNNING, FINISHED]: 379 return False 380 381 if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]: 382 return True 383 384 self._state = CANCELLED 385 self._condition.notify_all() 386 387 self._invoke_callbacks() 388 return True 389 390 def cancelled(self): 391 """Return True if the future was cancelled.""" 392 with self._condition: 393 return self._state in [CANCELLED, CANCELLED_AND_NOTIFIED] 394 395 def running(self): 396 """Return True if the future is currently executing.""" 397 with self._condition: 398 return self._state == RUNNING 399 400 def done(self): 401 """Return True of the future was cancelled or finished executing.""" 402 with self._condition: 403 return self._state in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED] 404 405 def __get_result(self): 406 if self._exception: 407 if isinstance(self._exception, types.InstanceType): 408 # The exception is an instance of an old-style class, which 409 # means type(self._exception) returns types.ClassType instead 410 # of the exception's actual class type. 411 exception_type = self._exception.__class__ 412 else: 413 exception_type = type(self._exception) 414 raise exception_type, self._exception, self._traceback 415 else: 416 return self._result 417 418 def add_done_callback(self, fn): 419 """Attaches a callable that will be called when the future finishes. 420 421 Args: 422 fn: A callable that will be called with this future as its only 423 argument when the future completes or is cancelled. The callable 424 will always be called by a thread in the same process in which 425 it was added. If the future has already completed or been 426 cancelled then the callable will be called immediately. These 427 callables are called in the order that they were added. 
428 """ 429 with self._condition: 430 if self._state not in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED]: 431 self._done_callbacks.append(fn) 432 return 433 fn(self) 434 435 def result(self, timeout=None): 436 """Return the result of the call that the future represents. 437 438 Args: 439 timeout: The number of seconds to wait for the result if the future 440 isn't done. If None, then there is no limit on the wait time. 441 442 Returns: 443 The result of the call that the future represents. 444 445 Raises: 446 CancelledError: If the future was cancelled. 447 TimeoutError: If the future didn't finish executing before the given 448 timeout. 449 Exception: If the call raised then that exception will be raised. 450 """ 451 with self._condition: 452 if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]: 453 raise CancelledError() 454 elif self._state == FINISHED: 455 return self.__get_result() 456 457 self._condition.wait(timeout) 458 459 if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]: 460 raise CancelledError() 461 elif self._state == FINISHED: 462 return self.__get_result() 463 else: 464 raise TimeoutError() 465 466 def exception_info(self, timeout=None): 467 """Return a tuple of (exception, traceback) raised by the call that the 468 future represents. 469 470 Args: 471 timeout: The number of seconds to wait for the exception if the 472 future isn't done. If None, then there is no limit on the wait 473 time. 474 475 Returns: 476 The exception raised by the call that the future represents or None 477 if the call completed without raising. 478 479 Raises: 480 CancelledError: If the future was cancelled. 481 TimeoutError: If the future didn't finish executing before the given 482 timeout. 
483 """ 484 with self._condition: 485 if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]: 486 raise CancelledError() 487 elif self._state == FINISHED: 488 return self._exception, self._traceback 489 490 self._condition.wait(timeout) 491 492 if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]: 493 raise CancelledError() 494 elif self._state == FINISHED: 495 return self._exception, self._traceback 496 else: 497 raise TimeoutError() 498 499 def exception(self, timeout=None): 500 """Return the exception raised by the call that the future represents. 501 502 Args: 503 timeout: The number of seconds to wait for the exception if the 504 future isn't done. If None, then there is no limit on the wait 505 time. 506 507 Returns: 508 The exception raised by the call that the future represents or None 509 if the call completed without raising. 510 511 Raises: 512 CancelledError: If the future was cancelled. 513 TimeoutError: If the future didn't finish executing before the given 514 timeout. 515 """ 516 return self.exception_info(timeout)[0] 517 518 # The following methods should only be used by Executors and in tests. 519 def set_running_or_notify_cancel(self): 520 """Mark the future as running or process any cancel notifications. 521 522 Should only be used by Executor implementations and unit tests. 523 524 If the future has been cancelled (cancel() was called and returned 525 True) then any threads waiting on the future completing (though calls 526 to as_completed() or wait()) are notified and False is returned. 527 528 If the future was not cancelled then it is put in the running state 529 (future calls to running() will return True) and True is returned. 530 531 This method should be called by Executor implementations before 532 executing the work associated with this future. If this method returns 533 False then the work should not be executed. 534 535 Returns: 536 False if the Future was cancelled, True otherwise. 
537 538 Raises: 539 RuntimeError: if this method was already called or if set_result() 540 or set_exception() was called. 541 """ 542 with self._condition: 543 if self._state == CANCELLED: 544 self._state = CANCELLED_AND_NOTIFIED 545 for waiter in self._waiters: 546 waiter.add_cancelled(self) 547 # self._condition.notify_all() is not necessary because 548 # self.cancel() triggers a notification. 549 return False 550 elif self._state == PENDING: 551 self._state = RUNNING 552 return True 553 else: 554 LOGGER.critical('Future %s in unexpected state: %s', 555 id(self), 556 self._state) 557 raise RuntimeError('Future in unexpected state') 558 559 def set_result(self, result): 560 """Sets the return value of work associated with the future. 561 562 Should only be used by Executor implementations and unit tests. 563 """ 564 with self._condition: 565 self._result = result 566 self._state = FINISHED 567 for waiter in self._waiters: 568 waiter.add_result(self) 569 self._condition.notify_all() 570 self._invoke_callbacks() 571 572 def set_exception_info(self, exception, traceback): 573 """Sets the result of the future as being the given exception 574 and traceback. 575 576 Should only be used by Executor implementations and unit tests. 577 """ 578 with self._condition: 579 self._exception = exception 580 self._traceback = traceback 581 self._state = FINISHED 582 for waiter in self._waiters: 583 waiter.add_exception(self) 584 self._condition.notify_all() 585 self._invoke_callbacks() 586 587 def set_exception(self, exception): 588 """Sets the result of the future as being the given exception. 589 590 Should only be used by Executor implementations and unit tests. 591 """ 592 self.set_exception_info(exception, None) 593 594 class Executor(object): 595 """This is an abstract base class for concrete asynchronous executors.""" 596 597 def submit(self, fn, *args, **kwargs): 598 """Submits a callable to be executed with the given arguments. 
599 600 Schedules the callable to be executed as fn(*args, **kwargs) and returns 601 a Future instance representing the execution of the callable. 602 603 Returns: 604 A Future representing the given call. 605 """ 606 raise NotImplementedError() 607 608 def map(self, fn, *iterables, **kwargs): 609 """Returns an iterator equivalent to map(fn, iter). 610 611 Args: 612 fn: A callable that will take as many arguments as there are 613 passed iterables. 614 timeout: The maximum number of seconds to wait. If None, then there 615 is no limit on the wait time. 616 617 Returns: 618 An iterator equivalent to: map(func, *iterables) but the calls may 619 be evaluated out-of-order. 620 621 Raises: 622 TimeoutError: If the entire result iterator could not be generated 623 before the given timeout. 624 Exception: If fn(*args) raises for any values. 625 """ 626 timeout = kwargs.get('timeout') 627 if timeout is not None: 628 end_time = timeout + time.time() 629 630 fs = [self.submit(fn, *args) for args in itertools.izip(*iterables)] 631 632 # Yield must be hidden in closure so that the futures are submitted 633 # before the first iterator value is required. 634 def result_iterator(): 635 try: 636 # reverse to keep finishing order 637 fs.reverse() 638 while fs: 639 # Careful not to keep a reference to the popped future 640 if timeout is None: 641 yield fs.pop().result() 642 else: 643 yield fs.pop().result(end_time - time.time()) 644 finally: 645 for future in fs: 646 future.cancel() 647 return result_iterator() 648 649 def shutdown(self, wait=True): 650 """Clean-up the resources associated with the Executor. 651 652 It is safe to call this method several times. Otherwise, no other 653 methods can be called after this one. 654 655 Args: 656 wait: If True then shutdown will not return until all running 657 futures have finished executing and the resources used by the 658 executor have been reclaimed. 
659 """ 660 pass 661 662 def __enter__(self): 663 return self 664 665 def __exit__(self, exc_type, exc_val, exc_tb): 666 self.shutdown(wait=True) 667 return False 668