Home | History | Annotate | Download | only in futures
      1 # Copyright 2009 Brian Quinlan. All Rights Reserved.
      2 # Licensed to PSF under a Contributor Agreement.
      3 
      4 __author__ = 'Brian Quinlan (brian (at] sweetapp.com)'
      5 
      6 import collections
      7 import logging
      8 import threading
      9 import time
     10 
     11 FIRST_COMPLETED = 'FIRST_COMPLETED'
     12 FIRST_EXCEPTION = 'FIRST_EXCEPTION'
     13 ALL_COMPLETED = 'ALL_COMPLETED'
     14 _AS_COMPLETED = '_AS_COMPLETED'
     15 
     16 # Possible future states (for internal use by the futures package).
     17 PENDING = 'PENDING'
     18 RUNNING = 'RUNNING'
     19 # The future was cancelled by the user...
     20 CANCELLED = 'CANCELLED'
     21 # ...and _Waiter.add_cancelled() was called by a worker.
     22 CANCELLED_AND_NOTIFIED = 'CANCELLED_AND_NOTIFIED'
     23 FINISHED = 'FINISHED'
     24 
     25 _FUTURE_STATES = [
     26     PENDING,
     27     RUNNING,
     28     CANCELLED,
     29     CANCELLED_AND_NOTIFIED,
     30     FINISHED
     31 ]
     32 
     33 _STATE_TO_DESCRIPTION_MAP = {
     34     PENDING: "pending",
     35     RUNNING: "running",
     36     CANCELLED: "cancelled",
     37     CANCELLED_AND_NOTIFIED: "cancelled",
     38     FINISHED: "finished"
     39 }
     40 
     41 # Logger for internal use by the futures package.
     42 LOGGER = logging.getLogger("concurrent.futures")
     43 
     44 class Error(Exception):
     45     """Base class for all future-related exceptions."""
     46     pass
     47 
     48 class CancelledError(Error):
     49     """The Future was cancelled."""
     50     pass
     51 
     52 class TimeoutError(Error):
     53     """The operation exceeded the given deadline."""
     54     pass
     55 
     56 class _Waiter(object):
     57     """Provides the event that wait() and as_completed() block on."""
     58     def __init__(self):
     59         self.event = threading.Event()
     60         self.finished_futures = []
     61 
     62     def add_result(self, future):
     63         self.finished_futures.append(future)
     64 
     65     def add_exception(self, future):
     66         self.finished_futures.append(future)
     67 
     68     def add_cancelled(self, future):
     69         self.finished_futures.append(future)
     70 
     71 class _AsCompletedWaiter(_Waiter):
     72     """Used by as_completed()."""
     73 
     74     def __init__(self):
     75         super(_AsCompletedWaiter, self).__init__()
     76         self.lock = threading.Lock()
     77 
     78     def add_result(self, future):
     79         with self.lock:
     80             super(_AsCompletedWaiter, self).add_result(future)
     81             self.event.set()
     82 
     83     def add_exception(self, future):
     84         with self.lock:
     85             super(_AsCompletedWaiter, self).add_exception(future)
     86             self.event.set()
     87 
     88     def add_cancelled(self, future):
     89         with self.lock:
     90             super(_AsCompletedWaiter, self).add_cancelled(future)
     91             self.event.set()
     92 
     93 class _FirstCompletedWaiter(_Waiter):
     94     """Used by wait(return_when=FIRST_COMPLETED)."""
     95 
     96     def add_result(self, future):
     97         super().add_result(future)
     98         self.event.set()
     99 
    100     def add_exception(self, future):
    101         super().add_exception(future)
    102         self.event.set()
    103 
    104     def add_cancelled(self, future):
    105         super().add_cancelled(future)
    106         self.event.set()
    107 
    108 class _AllCompletedWaiter(_Waiter):
    109     """Used by wait(return_when=FIRST_EXCEPTION and ALL_COMPLETED)."""
    110 
    111     def __init__(self, num_pending_calls, stop_on_exception):
    112         self.num_pending_calls = num_pending_calls
    113         self.stop_on_exception = stop_on_exception
    114         self.lock = threading.Lock()
    115         super().__init__()
    116 
    117     def _decrement_pending_calls(self):
    118         with self.lock:
    119             self.num_pending_calls -= 1
    120             if not self.num_pending_calls:
    121                 self.event.set()
    122 
    123     def add_result(self, future):
    124         super().add_result(future)
    125         self._decrement_pending_calls()
    126 
    127     def add_exception(self, future):
    128         super().add_exception(future)
    129         if self.stop_on_exception:
    130             self.event.set()
    131         else:
    132             self._decrement_pending_calls()
    133 
    134     def add_cancelled(self, future):
    135         super().add_cancelled(future)
    136         self._decrement_pending_calls()
    137 
    138 class _AcquireFutures(object):
    139     """A context manager that does an ordered acquire of Future conditions."""
    140 
    141     def __init__(self, futures):
    142         self.futures = sorted(futures, key=id)
    143 
    144     def __enter__(self):
    145         for future in self.futures:
    146             future._condition.acquire()
    147 
    148     def __exit__(self, *args):
    149         for future in self.futures:
    150             future._condition.release()
    151 
    152 def _create_and_install_waiters(fs, return_when):
    153     if return_when == _AS_COMPLETED:
    154         waiter = _AsCompletedWaiter()
    155     elif return_when == FIRST_COMPLETED:
    156         waiter = _FirstCompletedWaiter()
    157     else:
    158         pending_count = sum(
    159                 f._state not in [CANCELLED_AND_NOTIFIED, FINISHED] for f in fs)
    160 
    161         if return_when == FIRST_EXCEPTION:
    162             waiter = _AllCompletedWaiter(pending_count, stop_on_exception=True)
    163         elif return_when == ALL_COMPLETED:
    164             waiter = _AllCompletedWaiter(pending_count, stop_on_exception=False)
    165         else:
    166             raise ValueError("Invalid return condition: %r" % return_when)
    167 
    168     for f in fs:
    169         f._waiters.append(waiter)
    170 
    171     return waiter
    172 
    173 def as_completed(fs, timeout=None):
    174     """An iterator over the given futures that yields each as it completes.
    175 
    176     Args:
    177         fs: The sequence of Futures (possibly created by different Executors) to
    178             iterate over.
    179         timeout: The maximum number of seconds to wait. If None, then there
    180             is no limit on the wait time.
    181 
    182     Returns:
    183         An iterator that yields the given Futures as they complete (finished or
    184         cancelled). If any given Futures are duplicated, they will be returned
    185         once.
    186 
    187     Raises:
    188         TimeoutError: If the entire result iterator could not be generated
    189             before the given timeout.
    190     """
    191     if timeout is not None:
    192         end_time = timeout + time.time()
    193 
    194     fs = set(fs)
    195     with _AcquireFutures(fs):
    196         finished = set(
    197                 f for f in fs
    198                 if f._state in [CANCELLED_AND_NOTIFIED, FINISHED])
    199         pending = fs - finished
    200         waiter = _create_and_install_waiters(fs, _AS_COMPLETED)
    201 
    202     try:
    203         yield from finished
    204 
    205         while pending:
    206             if timeout is None:
    207                 wait_timeout = None
    208             else:
    209                 wait_timeout = end_time - time.time()
    210                 if wait_timeout < 0:
    211                     raise TimeoutError(
    212                             '%d (of %d) futures unfinished' % (
    213                             len(pending), len(fs)))
    214 
    215             waiter.event.wait(wait_timeout)
    216 
    217             with waiter.lock:
    218                 finished = waiter.finished_futures
    219                 waiter.finished_futures = []
    220                 waiter.event.clear()
    221 
    222             for future in finished:
    223                 yield future
    224                 pending.remove(future)
    225 
    226     finally:
    227         for f in fs:
    228             with f._condition:
    229                 f._waiters.remove(waiter)
    230 
    231 DoneAndNotDoneFutures = collections.namedtuple(
    232         'DoneAndNotDoneFutures', 'done not_done')
    233 def wait(fs, timeout=None, return_when=ALL_COMPLETED):
    234     """Wait for the futures in the given sequence to complete.
    235 
    236     Args:
    237         fs: The sequence of Futures (possibly created by different Executors) to
    238             wait upon.
    239         timeout: The maximum number of seconds to wait. If None, then there
    240             is no limit on the wait time.
    241         return_when: Indicates when this function should return. The options
    242             are:
    243 
    244             FIRST_COMPLETED - Return when any future finishes or is
    245                               cancelled.
    246             FIRST_EXCEPTION - Return when any future finishes by raising an
    247                               exception. If no future raises an exception
    248                               then it is equivalent to ALL_COMPLETED.
    249             ALL_COMPLETED -   Return when all futures finish or are cancelled.
    250 
    251     Returns:
    252         A named 2-tuple of sets. The first set, named 'done', contains the
    253         futures that completed (is finished or cancelled) before the wait
    254         completed. The second set, named 'not_done', contains uncompleted
    255         futures.
    256     """
    257     with _AcquireFutures(fs):
    258         done = set(f for f in fs
    259                    if f._state in [CANCELLED_AND_NOTIFIED, FINISHED])
    260         not_done = set(fs) - done
    261 
    262         if (return_when == FIRST_COMPLETED) and done:
    263             return DoneAndNotDoneFutures(done, not_done)
    264         elif (return_when == FIRST_EXCEPTION) and done:
    265             if any(f for f in done
    266                    if not f.cancelled() and f.exception() is not None):
    267                 return DoneAndNotDoneFutures(done, not_done)
    268 
    269         if len(done) == len(fs):
    270             return DoneAndNotDoneFutures(done, not_done)
    271 
    272         waiter = _create_and_install_waiters(fs, return_when)
    273 
    274     waiter.event.wait(timeout)
    275     for f in fs:
    276         with f._condition:
    277             f._waiters.remove(waiter)
    278 
    279     done.update(waiter.finished_futures)
    280     return DoneAndNotDoneFutures(done, set(fs) - done)
    281 
    282 class Future(object):
    283     """Represents the result of an asynchronous computation."""
    284 
    285     def __init__(self):
    286         """Initializes the future. Should not be called by clients."""
    287         self._condition = threading.Condition()
    288         self._state = PENDING
    289         self._result = None
    290         self._exception = None
    291         self._waiters = []
    292         self._done_callbacks = []
    293 
    294     def _invoke_callbacks(self):
    295         for callback in self._done_callbacks:
    296             try:
    297                 callback(self)
    298             except Exception:
    299                 LOGGER.exception('exception calling callback for %r', self)
    300 
    301     def __repr__(self):
    302         with self._condition:
    303             if self._state == FINISHED:
    304                 if self._exception:
    305                     return '<%s at %#x state=%s raised %s>' % (
    306                         self.__class__.__name__,
    307                         id(self),
    308                         _STATE_TO_DESCRIPTION_MAP[self._state],
    309                         self._exception.__class__.__name__)
    310                 else:
    311                     return '<%s at %#x state=%s returned %s>' % (
    312                         self.__class__.__name__,
    313                         id(self),
    314                         _STATE_TO_DESCRIPTION_MAP[self._state],
    315                         self._result.__class__.__name__)
    316             return '<%s at %#x state=%s>' % (
    317                     self.__class__.__name__,
    318                     id(self),
    319                    _STATE_TO_DESCRIPTION_MAP[self._state])
    320 
    321     def cancel(self):
    322         """Cancel the future if possible.
    323 
    324         Returns True if the future was cancelled, False otherwise. A future
    325         cannot be cancelled if it is running or has already completed.
    326         """
    327         with self._condition:
    328             if self._state in [RUNNING, FINISHED]:
    329                 return False
    330 
    331             if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
    332                 return True
    333 
    334             self._state = CANCELLED
    335             self._condition.notify_all()
    336 
    337         self._invoke_callbacks()
    338         return True
    339 
    340     def cancelled(self):
    341         """Return True if the future was cancelled."""
    342         with self._condition:
    343             return self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]
    344 
    345     def running(self):
    346         """Return True if the future is currently executing."""
    347         with self._condition:
    348             return self._state == RUNNING
    349 
    350     def done(self):
    351         """Return True of the future was cancelled or finished executing."""
    352         with self._condition:
    353             return self._state in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED]
    354 
    355     def __get_result(self):
    356         if self._exception:
    357             raise self._exception
    358         else:
    359             return self._result
    360 
    361     def add_done_callback(self, fn):
    362         """Attaches a callable that will be called when the future finishes.
    363 
    364         Args:
    365             fn: A callable that will be called with this future as its only
    366                 argument when the future completes or is cancelled. The callable
    367                 will always be called by a thread in the same process in which
    368                 it was added. If the future has already completed or been
    369                 cancelled then the callable will be called immediately. These
    370                 callables are called in the order that they were added.
    371         """
    372         with self._condition:
    373             if self._state not in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED]:
    374                 self._done_callbacks.append(fn)
    375                 return
    376         fn(self)
    377 
    378     def result(self, timeout=None):
    379         """Return the result of the call that the future represents.
    380 
    381         Args:
    382             timeout: The number of seconds to wait for the result if the future
    383                 isn't done. If None, then there is no limit on the wait time.
    384 
    385         Returns:
    386             The result of the call that the future represents.
    387 
    388         Raises:
    389             CancelledError: If the future was cancelled.
    390             TimeoutError: If the future didn't finish executing before the given
    391                 timeout.
    392             Exception: If the call raised then that exception will be raised.
    393         """
    394         with self._condition:
    395             if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
    396                 raise CancelledError()
    397             elif self._state == FINISHED:
    398                 return self.__get_result()
    399 
    400             self._condition.wait(timeout)
    401 
    402             if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
    403                 raise CancelledError()
    404             elif self._state == FINISHED:
    405                 return self.__get_result()
    406             else:
    407                 raise TimeoutError()
    408 
    409     def exception(self, timeout=None):
    410         """Return the exception raised by the call that the future represents.
    411 
    412         Args:
    413             timeout: The number of seconds to wait for the exception if the
    414                 future isn't done. If None, then there is no limit on the wait
    415                 time.
    416 
    417         Returns:
    418             The exception raised by the call that the future represents or None
    419             if the call completed without raising.
    420 
    421         Raises:
    422             CancelledError: If the future was cancelled.
    423             TimeoutError: If the future didn't finish executing before the given
    424                 timeout.
    425         """
    426 
    427         with self._condition:
    428             if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
    429                 raise CancelledError()
    430             elif self._state == FINISHED:
    431                 return self._exception
    432 
    433             self._condition.wait(timeout)
    434 
    435             if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
    436                 raise CancelledError()
    437             elif self._state == FINISHED:
    438                 return self._exception
    439             else:
    440                 raise TimeoutError()
    441 
    442     # The following methods should only be used by Executors and in tests.
    443     def set_running_or_notify_cancel(self):
    444         """Mark the future as running or process any cancel notifications.
    445 
    446         Should only be used by Executor implementations and unit tests.
    447 
    448         If the future has been cancelled (cancel() was called and returned
    449         True) then any threads waiting on the future completing (though calls
    450         to as_completed() or wait()) are notified and False is returned.
    451 
    452         If the future was not cancelled then it is put in the running state
    453         (future calls to running() will return True) and True is returned.
    454 
    455         This method should be called by Executor implementations before
    456         executing the work associated with this future. If this method returns
    457         False then the work should not be executed.
    458 
    459         Returns:
    460             False if the Future was cancelled, True otherwise.
    461 
    462         Raises:
    463             RuntimeError: if this method was already called or if set_result()
    464                 or set_exception() was called.
    465         """
    466         with self._condition:
    467             if self._state == CANCELLED:
    468                 self._state = CANCELLED_AND_NOTIFIED
    469                 for waiter in self._waiters:
    470                     waiter.add_cancelled(self)
    471                 # self._condition.notify_all() is not necessary because
    472                 # self.cancel() triggers a notification.
    473                 return False
    474             elif self._state == PENDING:
    475                 self._state = RUNNING
    476                 return True
    477             else:
    478                 LOGGER.critical('Future %s in unexpected state: %s',
    479                                 id(self),
    480                                 self._state)
    481                 raise RuntimeError('Future in unexpected state')
    482 
    483     def set_result(self, result):
    484         """Sets the return value of work associated with the future.
    485 
    486         Should only be used by Executor implementations and unit tests.
    487         """
    488         with self._condition:
    489             self._result = result
    490             self._state = FINISHED
    491             for waiter in self._waiters:
    492                 waiter.add_result(self)
    493             self._condition.notify_all()
    494         self._invoke_callbacks()
    495 
    496     def set_exception(self, exception):
    497         """Sets the result of the future as being the given exception.
    498 
    499         Should only be used by Executor implementations and unit tests.
    500         """
    501         with self._condition:
    502             self._exception = exception
    503             self._state = FINISHED
    504             for waiter in self._waiters:
    505                 waiter.add_exception(self)
    506             self._condition.notify_all()
    507         self._invoke_callbacks()
    508 
    509 class Executor(object):
    510     """This is an abstract base class for concrete asynchronous executors."""
    511 
    512     def submit(self, fn, *args, **kwargs):
    513         """Submits a callable to be executed with the given arguments.
    514 
    515         Schedules the callable to be executed as fn(*args, **kwargs) and returns
    516         a Future instance representing the execution of the callable.
    517 
    518         Returns:
    519             A Future representing the given call.
    520         """
    521         raise NotImplementedError()
    522 
    523     def map(self, fn, *iterables, timeout=None, chunksize=1):
    524         """Returns an iterator equivalent to map(fn, iter).
    525 
    526         Args:
    527             fn: A callable that will take as many arguments as there are
    528                 passed iterables.
    529             timeout: The maximum number of seconds to wait. If None, then there
    530                 is no limit on the wait time.
    531             chunksize: The size of the chunks the iterable will be broken into
    532                 before being passed to a child process. This argument is only
    533                 used by ProcessPoolExecutor; it is ignored by
    534                 ThreadPoolExecutor.
    535 
    536         Returns:
    537             An iterator equivalent to: map(func, *iterables) but the calls may
    538             be evaluated out-of-order.
    539 
    540         Raises:
    541             TimeoutError: If the entire result iterator could not be generated
    542                 before the given timeout.
    543             Exception: If fn(*args) raises for any values.
    544         """
    545         if timeout is not None:
    546             end_time = timeout + time.time()
    547 
    548         fs = [self.submit(fn, *args) for args in zip(*iterables)]
    549 
    550         # Yield must be hidden in closure so that the futures are submitted
    551         # before the first iterator value is required.
    552         def result_iterator():
    553             try:
    554                 for future in fs:
    555                     if timeout is None:
    556                         yield future.result()
    557                     else:
    558                         yield future.result(end_time - time.time())
    559             finally:
    560                 for future in fs:
    561                     future.cancel()
    562         return result_iterator()
    563 
    564     def shutdown(self, wait=True):
    565         """Clean-up the resources associated with the Executor.
    566 
    567         It is safe to call this method several times. Otherwise, no other
    568         methods can be called after this one.
    569 
    570         Args:
    571             wait: If True then shutdown will not return until all running
    572                 futures have finished executing and the resources used by the
    573                 executor have been reclaimed.
    574         """
    575         pass
    576 
    577     def __enter__(self):
    578         return self
    579 
    580     def __exit__(self, exc_type, exc_val, exc_tb):
    581         self.shutdown(wait=True)
    582         return False
    583