Home | History | Annotate | Download | only in futures
      1 # Copyright 2009 Brian Quinlan. All Rights Reserved.
      2 # Licensed to PSF under a Contributor Agreement.
      3 
      4 import collections
      5 import logging
      6 import threading
      7 import itertools
      8 import time
      9 import types
     10 
# Module author string as shipped with the file.
__author__ = 'Brian Quinlan (brian (at] sweetapp.com)'

# Values for the ``return_when`` argument of wait().
FIRST_COMPLETED = 'FIRST_COMPLETED'
FIRST_EXCEPTION = 'FIRST_EXCEPTION'
ALL_COMPLETED = 'ALL_COMPLETED'
# Internal condition that as_completed() hands to
# _create_and_install_waiters() to obtain an _AsCompletedWaiter.
_AS_COMPLETED = '_AS_COMPLETED'

# Possible future states (for internal use by the futures package).
PENDING = 'PENDING'
RUNNING = 'RUNNING'
# The future was cancelled by the user...
CANCELLED = 'CANCELLED'
# ...and _Waiter.add_cancelled() was called by a worker.
CANCELLED_AND_NOTIFIED = 'CANCELLED_AND_NOTIFIED'
FINISHED = 'FINISHED'

# Every state a Future can be in.
_FUTURE_STATES = [
    PENDING,
    RUNNING,
    CANCELLED,
    CANCELLED_AND_NOTIFIED,
    FINISHED
]

# Human-readable state names used by Future.__repr__(). Both cancelled
# states map to the same text, "cancelled".
_STATE_TO_DESCRIPTION_MAP = {
    PENDING: "pending",
    RUNNING: "running",
    CANCELLED: "cancelled",
    CANCELLED_AND_NOTIFIED: "cancelled",
    FINISHED: "finished"
}

# Logger for internal use by the futures package.
LOGGER = logging.getLogger("concurrent.futures")
     45 
     46 class Error(Exception):
     47     """Base class for all future-related exceptions."""
     48     pass
     49 
     50 class CancelledError(Error):
     51     """The Future was cancelled."""
     52     pass
     53 
     54 class TimeoutError(Error):
     55     """The operation exceeded the given deadline."""
     56     pass
     57 
     58 class _Waiter(object):
     59     """Provides the event that wait() and as_completed() block on."""
     60     def __init__(self):
     61         self.event = threading.Event()
     62         self.finished_futures = []
     63 
     64     def add_result(self, future):
     65         self.finished_futures.append(future)
     66 
     67     def add_exception(self, future):
     68         self.finished_futures.append(future)
     69 
     70     def add_cancelled(self, future):
     71         self.finished_futures.append(future)
     72 
     73 class _AsCompletedWaiter(_Waiter):
     74     """Used by as_completed()."""
     75 
     76     def __init__(self):
     77         super(_AsCompletedWaiter, self).__init__()
     78         self.lock = threading.Lock()
     79 
     80     def add_result(self, future):
     81         with self.lock:
     82             super(_AsCompletedWaiter, self).add_result(future)
     83             self.event.set()
     84 
     85     def add_exception(self, future):
     86         with self.lock:
     87             super(_AsCompletedWaiter, self).add_exception(future)
     88             self.event.set()
     89 
     90     def add_cancelled(self, future):
     91         with self.lock:
     92             super(_AsCompletedWaiter, self).add_cancelled(future)
     93             self.event.set()
     94 
     95 class _FirstCompletedWaiter(_Waiter):
     96     """Used by wait(return_when=FIRST_COMPLETED)."""
     97 
     98     def add_result(self, future):
     99         super(_FirstCompletedWaiter, self).add_result(future)
    100         self.event.set()
    101 
    102     def add_exception(self, future):
    103         super(_FirstCompletedWaiter, self).add_exception(future)
    104         self.event.set()
    105 
    106     def add_cancelled(self, future):
    107         super(_FirstCompletedWaiter, self).add_cancelled(future)
    108         self.event.set()
    109 
    110 class _AllCompletedWaiter(_Waiter):
    111     """Used by wait(return_when=FIRST_EXCEPTION and ALL_COMPLETED)."""
    112 
    113     def __init__(self, num_pending_calls, stop_on_exception):
    114         self.num_pending_calls = num_pending_calls
    115         self.stop_on_exception = stop_on_exception
    116         self.lock = threading.Lock()
    117         super(_AllCompletedWaiter, self).__init__()
    118 
    119     def _decrement_pending_calls(self):
    120         with self.lock:
    121             self.num_pending_calls -= 1
    122             if not self.num_pending_calls:
    123                 self.event.set()
    124 
    125     def add_result(self, future):
    126         super(_AllCompletedWaiter, self).add_result(future)
    127         self._decrement_pending_calls()
    128 
    129     def add_exception(self, future):
    130         super(_AllCompletedWaiter, self).add_exception(future)
    131         if self.stop_on_exception:
    132             self.event.set()
    133         else:
    134             self._decrement_pending_calls()
    135 
    136     def add_cancelled(self, future):
    137         super(_AllCompletedWaiter, self).add_cancelled(future)
    138         self._decrement_pending_calls()
    139 
    140 class _AcquireFutures(object):
    141     """A context manager that does an ordered acquire of Future conditions."""
    142 
    143     def __init__(self, futures):
    144         self.futures = sorted(futures, key=id)
    145 
    146     def __enter__(self):
    147         for future in self.futures:
    148             future._condition.acquire()
    149 
    150     def __exit__(self, *args):
    151         for future in self.futures:
    152             future._condition.release()
    153 
    154 def _create_and_install_waiters(fs, return_when):
    155     if return_when == _AS_COMPLETED:
    156         waiter = _AsCompletedWaiter()
    157     elif return_when == FIRST_COMPLETED:
    158         waiter = _FirstCompletedWaiter()
    159     else:
    160         pending_count = sum(
    161                 f._state not in [CANCELLED_AND_NOTIFIED, FINISHED] for f in fs)
    162 
    163         if return_when == FIRST_EXCEPTION:
    164             waiter = _AllCompletedWaiter(pending_count, stop_on_exception=True)
    165         elif return_when == ALL_COMPLETED:
    166             waiter = _AllCompletedWaiter(pending_count, stop_on_exception=False)
    167         else:
    168             raise ValueError("Invalid return condition: %r" % return_when)
    169 
    170     for f in fs:
    171         f._waiters.append(waiter)
    172 
    173     return waiter
    174 
    175 
def _yield_finished_futures(fs, waiter, ref_collect):
    """
    Iterate on the list *fs*, yielding finished futures one by one in
    reverse order.
    Before yielding a future, *waiter* is removed from its waiters
    and the future is removed from each set in the collection of sets
    *ref_collect*.

    The aim of this function is to avoid keeping stale references after
    the future is yielded and before the iterator resumes.

    Args:
        fs: A list of futures. It is consumed from the end and is empty once
            the generator is exhausted.
        waiter: The waiter object to detach from each future's ``_waiters``.
        ref_collect: An iterable of sets; each future is removed from every
            one of them before it is yielded (``set.remove`` raises KeyError
            if the future is not a member).
    """
    while fs:
        # Peek at the last element; it stays in *fs* until the pop below.
        f = fs[-1]
        for futures_set in ref_collect:
            futures_set.remove(f)
        # _waiters is modified under the future's own condition.
        with f._condition:
            f._waiters.remove(waiter)
        # Drop the local name so this frame holds no reference to the future
        # while the generator is suspended at the yield.
        del f
        # Careful not to keep a reference to the popped value
        yield fs.pop()
    196 
    197 
def as_completed(fs, timeout=None):
    """An iterator over the given futures that yields each as it completes.

    Args:
        fs: The sequence of Futures (possibly created by different Executors) to
            iterate over.
        timeout: The maximum number of seconds to wait. If None, then there
            is no limit on the wait time.

    Returns:
        An iterator that yields the given Futures as they complete (finished or
        cancelled). If any given Futures are duplicated, they will be returned
        once.

    Raises:
        TimeoutError: If the entire result iterator could not be generated
            before the given timeout. This is the TimeoutError class defined
            in this module.
    """
    # This is a generator function, so the deadline is fixed when iteration
    # starts and covers the whole iteration, not each individual wait.
    if timeout is not None:
        end_time = timeout + time.time()

    # A set collapses duplicate futures so each one is yielded at most once.
    fs = set(fs)
    total_futures = len(fs)
    # Snapshot the states and install the waiter while holding every future's
    # condition, so no future can change state in between.
    with _AcquireFutures(fs):
        finished = set(
                f for f in fs
                if f._state in [CANCELLED_AND_NOTIFIED, FINISHED])
        pending = fs - finished
        waiter = _create_and_install_waiters(fs, _AS_COMPLETED)
    # _yield_finished_futures() consumes a list from its end.
    finished = list(finished)
    try:
        # First hand out the futures that were already done at the snapshot.
        for f in _yield_finished_futures(finished, waiter,
                                         ref_collect=(fs,)):
            # Rebind the loop variable to a one-element list and pop from it,
            # so this frame holds no reference to the future while suspended.
            f = [f]
            yield f.pop()

        while pending:
            if timeout is None:
                wait_timeout = None
            else:
                wait_timeout = end_time - time.time()
                if wait_timeout < 0:
                    raise TimeoutError(
                            '%d (of %d) futures unfinished' % (
                            len(pending), total_futures))

            waiter.event.wait(wait_timeout)

            # Take the batch reported so far and reset the waiter under its
            # lock, so a notification arriving now lands in the fresh list.
            with waiter.lock:
                finished = waiter.finished_futures
                waiter.finished_futures = []
                waiter.event.clear()

            # reverse to keep finishing order
            finished.reverse()
            for f in _yield_finished_futures(finished, waiter,
                                             ref_collect=(fs, pending)):
                # Same reference-dropping trick as above.
                f = [f]
                yield f.pop()

    finally:
        # Remove waiter from unfinished futures
        # (yielded futures were already removed from *fs* and detached from
        # the waiter by _yield_finished_futures()).
        for f in fs:
            with f._condition:
                f._waiters.remove(waiter)
    263 
    264 DoneAndNotDoneFutures = collections.namedtuple(
    265         'DoneAndNotDoneFutures', 'done not_done')
    266 def wait(fs, timeout=None, return_when=ALL_COMPLETED):
    267     """Wait for the futures in the given sequence to complete.
    268 
    269     Args:
    270         fs: The sequence of Futures (possibly created by different Executors) to
    271             wait upon.
    272         timeout: The maximum number of seconds to wait. If None, then there
    273             is no limit on the wait time.
    274         return_when: Indicates when this function should return. The options
    275             are:
    276 
    277             FIRST_COMPLETED - Return when any future finishes or is
    278                               cancelled.
    279             FIRST_EXCEPTION - Return when any future finishes by raising an
    280                               exception. If no future raises an exception
    281                               then it is equivalent to ALL_COMPLETED.
    282             ALL_COMPLETED -   Return when all futures finish or are cancelled.
    283 
    284     Returns:
    285         A named 2-tuple of sets. The first set, named 'done', contains the
    286         futures that completed (is finished or cancelled) before the wait
    287         completed. The second set, named 'not_done', contains uncompleted
    288         futures.
    289     """
    290     with _AcquireFutures(fs):
    291         done = set(f for f in fs
    292                    if f._state in [CANCELLED_AND_NOTIFIED, FINISHED])
    293         not_done = set(fs) - done
    294 
    295         if (return_when == FIRST_COMPLETED) and done:
    296             return DoneAndNotDoneFutures(done, not_done)
    297         elif (return_when == FIRST_EXCEPTION) and done:
    298             if any(f for f in done
    299                    if not f.cancelled() and f.exception() is not None):
    300                 return DoneAndNotDoneFutures(done, not_done)
    301 
    302         if len(done) == len(fs):
    303             return DoneAndNotDoneFutures(done, not_done)
    304 
    305         waiter = _create_and_install_waiters(fs, return_when)
    306 
    307     waiter.event.wait(timeout)
    308     for f in fs:
    309         with f._condition:
    310             f._waiters.remove(waiter)
    311 
    312     done.update(waiter.finished_futures)
    313     return DoneAndNotDoneFutures(done, set(fs) - done)
    314 
    315 class Future(object):
    316     """Represents the result of an asynchronous computation."""
    317 
    318     def __init__(self):
    319         """Initializes the future. Should not be called by clients."""
    320         self._condition = threading.Condition()
    321         self._state = PENDING
    322         self._result = None
    323         self._exception = None
    324         self._traceback = None
    325         self._waiters = []
    326         self._done_callbacks = []
    327 
    328     def _invoke_callbacks(self):
    329         for callback in self._done_callbacks:
    330             try:
    331                 callback(self)
    332             except Exception:
    333                 LOGGER.exception('exception calling callback for %r', self)
    334             except BaseException:
    335                 # Explicitly let all other new-style exceptions through so
    336                 # that we can catch all old-style exceptions with a simple
    337                 # "except:" clause below.
    338                 #
    339                 # All old-style exception objects are instances of
    340                 # types.InstanceType, but "except types.InstanceType:" does
    341                 # not catch old-style exceptions for some reason.  Thus, the
    342                 # only way to catch all old-style exceptions without catching
    343                 # any new-style exceptions is to filter out the new-style
    344                 # exceptions, which all derive from BaseException.
    345                 raise
    346             except:
    347                 # Because of the BaseException clause above, this handler only
    348                 # executes for old-style exception objects.
    349                 LOGGER.exception('exception calling callback for %r', self)
    350 
    351     def __repr__(self):
    352         with self._condition:
    353             if self._state == FINISHED:
    354                 if self._exception:
    355                     return '<%s at %#x state=%s raised %s>' % (
    356                         self.__class__.__name__,
    357                         id(self),
    358                         _STATE_TO_DESCRIPTION_MAP[self._state],
    359                         self._exception.__class__.__name__)
    360                 else:
    361                     return '<%s at %#x state=%s returned %s>' % (
    362                         self.__class__.__name__,
    363                         id(self),
    364                         _STATE_TO_DESCRIPTION_MAP[self._state],
    365                         self._result.__class__.__name__)
    366             return '<%s at %#x state=%s>' % (
    367                     self.__class__.__name__,
    368                     id(self),
    369                    _STATE_TO_DESCRIPTION_MAP[self._state])
    370 
    371     def cancel(self):
    372         """Cancel the future if possible.
    373 
    374         Returns True if the future was cancelled, False otherwise. A future
    375         cannot be cancelled if it is running or has already completed.
    376         """
    377         with self._condition:
    378             if self._state in [RUNNING, FINISHED]:
    379                 return False
    380 
    381             if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
    382                 return True
    383 
    384             self._state = CANCELLED
    385             self._condition.notify_all()
    386 
    387         self._invoke_callbacks()
    388         return True
    389 
    390     def cancelled(self):
    391         """Return True if the future was cancelled."""
    392         with self._condition:
    393             return self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]
    394 
    395     def running(self):
    396         """Return True if the future is currently executing."""
    397         with self._condition:
    398             return self._state == RUNNING
    399 
    400     def done(self):
    401         """Return True of the future was cancelled or finished executing."""
    402         with self._condition:
    403             return self._state in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED]
    404 
    405     def __get_result(self):
    406         if self._exception:
    407             if isinstance(self._exception, types.InstanceType):
    408                 # The exception is an instance of an old-style class, which
    409                 # means type(self._exception) returns types.ClassType instead
    410                 # of the exception's actual class type.
    411                 exception_type = self._exception.__class__
    412             else:
    413                 exception_type = type(self._exception)
    414             raise exception_type, self._exception, self._traceback
    415         else:
    416             return self._result
    417 
    418     def add_done_callback(self, fn):
    419         """Attaches a callable that will be called when the future finishes.
    420 
    421         Args:
    422             fn: A callable that will be called with this future as its only
    423                 argument when the future completes or is cancelled. The callable
    424                 will always be called by a thread in the same process in which
    425                 it was added. If the future has already completed or been
    426                 cancelled then the callable will be called immediately. These
    427                 callables are called in the order that they were added.
    428         """
    429         with self._condition:
    430             if self._state not in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED]:
    431                 self._done_callbacks.append(fn)
    432                 return
    433         fn(self)
    434 
    435     def result(self, timeout=None):
    436         """Return the result of the call that the future represents.
    437 
    438         Args:
    439             timeout: The number of seconds to wait for the result if the future
    440                 isn't done. If None, then there is no limit on the wait time.
    441 
    442         Returns:
    443             The result of the call that the future represents.
    444 
    445         Raises:
    446             CancelledError: If the future was cancelled.
    447             TimeoutError: If the future didn't finish executing before the given
    448                 timeout.
    449             Exception: If the call raised then that exception will be raised.
    450         """
    451         with self._condition:
    452             if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
    453                 raise CancelledError()
    454             elif self._state == FINISHED:
    455                 return self.__get_result()
    456 
    457             self._condition.wait(timeout)
    458 
    459             if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
    460                 raise CancelledError()
    461             elif self._state == FINISHED:
    462                 return self.__get_result()
    463             else:
    464                 raise TimeoutError()
    465 
    466     def exception_info(self, timeout=None):
    467         """Return a tuple of (exception, traceback) raised by the call that the
    468         future represents.
    469 
    470         Args:
    471             timeout: The number of seconds to wait for the exception if the
    472                 future isn't done. If None, then there is no limit on the wait
    473                 time.
    474 
    475         Returns:
    476             The exception raised by the call that the future represents or None
    477             if the call completed without raising.
    478 
    479         Raises:
    480             CancelledError: If the future was cancelled.
    481             TimeoutError: If the future didn't finish executing before the given
    482                 timeout.
    483         """
    484         with self._condition:
    485             if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
    486                 raise CancelledError()
    487             elif self._state == FINISHED:
    488                 return self._exception, self._traceback
    489 
    490             self._condition.wait(timeout)
    491 
    492             if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
    493                 raise CancelledError()
    494             elif self._state == FINISHED:
    495                 return self._exception, self._traceback
    496             else:
    497                 raise TimeoutError()
    498 
    499     def exception(self, timeout=None):
    500         """Return the exception raised by the call that the future represents.
    501 
    502         Args:
    503             timeout: The number of seconds to wait for the exception if the
    504                 future isn't done. If None, then there is no limit on the wait
    505                 time.
    506 
    507         Returns:
    508             The exception raised by the call that the future represents or None
    509             if the call completed without raising.
    510 
    511         Raises:
    512             CancelledError: If the future was cancelled.
    513             TimeoutError: If the future didn't finish executing before the given
    514                 timeout.
    515         """
    516         return self.exception_info(timeout)[0]
    517 
    518     # The following methods should only be used by Executors and in tests.
    519     def set_running_or_notify_cancel(self):
    520         """Mark the future as running or process any cancel notifications.
    521 
    522         Should only be used by Executor implementations and unit tests.
    523 
    524         If the future has been cancelled (cancel() was called and returned
    525         True) then any threads waiting on the future completing (though calls
    526         to as_completed() or wait()) are notified and False is returned.
    527 
    528         If the future was not cancelled then it is put in the running state
    529         (future calls to running() will return True) and True is returned.
    530 
    531         This method should be called by Executor implementations before
    532         executing the work associated with this future. If this method returns
    533         False then the work should not be executed.
    534 
    535         Returns:
    536             False if the Future was cancelled, True otherwise.
    537 
    538         Raises:
    539             RuntimeError: if this method was already called or if set_result()
    540                 or set_exception() was called.
    541         """
    542         with self._condition:
    543             if self._state == CANCELLED:
    544                 self._state = CANCELLED_AND_NOTIFIED
    545                 for waiter in self._waiters:
    546                     waiter.add_cancelled(self)
    547                 # self._condition.notify_all() is not necessary because
    548                 # self.cancel() triggers a notification.
    549                 return False
    550             elif self._state == PENDING:
    551                 self._state = RUNNING
    552                 return True
    553             else:
    554                 LOGGER.critical('Future %s in unexpected state: %s',
    555                                 id(self),
    556                                 self._state)
    557                 raise RuntimeError('Future in unexpected state')
    558 
    559     def set_result(self, result):
    560         """Sets the return value of work associated with the future.
    561 
    562         Should only be used by Executor implementations and unit tests.
    563         """
    564         with self._condition:
    565             self._result = result
    566             self._state = FINISHED
    567             for waiter in self._waiters:
    568                 waiter.add_result(self)
    569             self._condition.notify_all()
    570         self._invoke_callbacks()
    571 
    572     def set_exception_info(self, exception, traceback):
    573         """Sets the result of the future as being the given exception
    574         and traceback.
    575 
    576         Should only be used by Executor implementations and unit tests.
    577         """
    578         with self._condition:
    579             self._exception = exception
    580             self._traceback = traceback
    581             self._state = FINISHED
    582             for waiter in self._waiters:
    583                 waiter.add_exception(self)
    584             self._condition.notify_all()
    585         self._invoke_callbacks()
    586 
    587     def set_exception(self, exception):
    588         """Sets the result of the future as being the given exception.
    589 
    590         Should only be used by Executor implementations and unit tests.
    591         """
    592         self.set_exception_info(exception, None)
    593 
    594 class Executor(object):
    595     """This is an abstract base class for concrete asynchronous executors."""
    596 
    597     def submit(self, fn, *args, **kwargs):
    598         """Submits a callable to be executed with the given arguments.
    599 
    600         Schedules the callable to be executed as fn(*args, **kwargs) and returns
    601         a Future instance representing the execution of the callable.
    602 
    603         Returns:
    604             A Future representing the given call.
    605         """
    606         raise NotImplementedError()
    607 
    608     def map(self, fn, *iterables, **kwargs):
    609         """Returns an iterator equivalent to map(fn, iter).
    610 
    611         Args:
    612             fn: A callable that will take as many arguments as there are
    613                 passed iterables.
    614             timeout: The maximum number of seconds to wait. If None, then there
    615                 is no limit on the wait time.
    616 
    617         Returns:
    618             An iterator equivalent to: map(func, *iterables) but the calls may
    619             be evaluated out-of-order.
    620 
    621         Raises:
    622             TimeoutError: If the entire result iterator could not be generated
    623                 before the given timeout.
    624             Exception: If fn(*args) raises for any values.
    625         """
    626         timeout = kwargs.get('timeout')
    627         if timeout is not None:
    628             end_time = timeout + time.time()
    629 
    630         fs = [self.submit(fn, *args) for args in itertools.izip(*iterables)]
    631 
    632         # Yield must be hidden in closure so that the futures are submitted
    633         # before the first iterator value is required.
    634         def result_iterator():
    635             try:
    636                 # reverse to keep finishing order
    637                 fs.reverse()
    638                 while fs:
    639                     # Careful not to keep a reference to the popped future
    640                     if timeout is None:
    641                         yield fs.pop().result()
    642                     else:
    643                         yield fs.pop().result(end_time - time.time())
    644             finally:
    645                 for future in fs:
    646                     future.cancel()
    647         return result_iterator()
    648 
    649     def shutdown(self, wait=True):
    650         """Clean-up the resources associated with the Executor.
    651 
    652         It is safe to call this method several times. Otherwise, no other
    653         methods can be called after this one.
    654 
    655         Args:
    656             wait: If True then shutdown will not return until all running
    657                 futures have finished executing and the resources used by the
    658                 executor have been reclaimed.
    659         """
    660         pass
    661 
    662     def __enter__(self):
    663         return self
    664 
    665     def __exit__(self, exc_type, exc_val, exc_tb):
    666         self.shutdown(wait=True)
    667         return False
    668