Home | History | Annotate | Download | only in asyncio
      1 """A Future class similar to the one in PEP 3148."""
      2 
      3 __all__ = ['CancelledError', 'TimeoutError', 'InvalidStateError',
      4            'Future', 'wrap_future', 'isfuture']
      5 
      6 import concurrent.futures
      7 import logging
      8 import sys
      9 import traceback
     10 
     11 from . import base_futures
     12 from . import compat
     13 from . import events
     14 
     15 
     16 CancelledError = base_futures.CancelledError
     17 InvalidStateError = base_futures.InvalidStateError
     18 TimeoutError = base_futures.TimeoutError
     19 isfuture = base_futures.isfuture
     20 
     21 
     22 _PENDING = base_futures._PENDING
     23 _CANCELLED = base_futures._CANCELLED
     24 _FINISHED = base_futures._FINISHED
     25 
     26 
     27 STACK_DEBUG = logging.DEBUG - 1  # heavy-duty debugging
     28 
     29 
     30 class _TracebackLogger:
     31     """Helper to log a traceback upon destruction if not cleared.
     32 
     33     This solves a nasty problem with Futures and Tasks that have an
     34     exception set: if nobody asks for the exception, the exception is
     35     never logged.  This violates the Zen of Python: 'Errors should
     36     never pass silently.  Unless explicitly silenced.'
     37 
     38     However, we don't want to log the exception as soon as
     39     set_exception() is called: if the calling code is written
     40     properly, it will get the exception and handle it properly.  But
     41     we *do* want to log it if result() or exception() was never called
     42     -- otherwise developers waste a lot of time wondering why their
     43     buggy code fails silently.
     44 
     45     An earlier attempt added a __del__() method to the Future class
     46     itself, but this backfired because the presence of __del__()
     47     prevents garbage collection from breaking cycles.  A way out of
     48     this catch-22 is to avoid having a __del__() method on the Future
     49     class itself, but instead to have a reference to a helper object
     50     with a __del__() method that logs the traceback, where we ensure
     51     that the helper object doesn't participate in cycles, and only the
     52     Future has a reference to it.
     53 
     54     The helper object is added when set_exception() is called.  When
     55     the Future is collected, and the helper is present, the helper
     56     object is also collected, and its __del__() method will log the
     57     traceback.  When the Future's result() or exception() method is
     58     called (and a helper object is present), it removes the helper
     59     object, after calling its clear() method to prevent it from
     60     logging.
     61 
     62     One downside is that we do a fair amount of work to extract the
     63     traceback from the exception, even when it is never logged.  It
     64     would seem cheaper to just store the exception object, but that
     65     references the traceback, which references stack frames, which may
     66     reference the Future, which references the _TracebackLogger, and
     67     then the _TracebackLogger would be included in a cycle, which is
     68     what we're trying to avoid!  As an optimization, we don't
     69     immediately format the exception; we only do the work when
     70     activate() is called, which call is delayed until after all the
     71     Future's callbacks have run.  Since usually a Future has at least
     72     one callback (typically set by 'yield from') and usually that
     73     callback extracts the callback, thereby removing the need to
     74     format the exception.
     75 
     76     PS. I don't claim credit for this solution.  I first heard of it
     77     in a discussion about closing files when they are collected.
     78     """
     79 
     80     __slots__ = ('loop', 'source_traceback', 'exc', 'tb')
     81 
     82     def __init__(self, future, exc):
     83         self.loop = future._loop
     84         self.source_traceback = future._source_traceback
     85         self.exc = exc
     86         self.tb = None
     87 
     88     def activate(self):
     89         exc = self.exc
     90         if exc is not None:
     91             self.exc = None
     92             self.tb = traceback.format_exception(exc.__class__, exc,
     93                                                  exc.__traceback__)
     94 
     95     def clear(self):
     96         self.exc = None
     97         self.tb = None
     98 
     99     def __del__(self):
    100         if self.tb:
    101             msg = 'Future/Task exception was never retrieved\n'
    102             if self.source_traceback:
    103                 src = ''.join(traceback.format_list(self.source_traceback))
    104                 msg += 'Future/Task created at (most recent call last):\n'
    105                 msg += '%s\n' % src.rstrip()
    106             msg += ''.join(self.tb).rstrip()
    107             self.loop.call_exception_handler({'message': msg})
    108 
    109 
    110 class Future:
    111     """This class is *almost* compatible with concurrent.futures.Future.
    112 
    113     Differences:
    114 
    115     - result() and exception() do not take a timeout argument and
    116       raise an exception when the future isn't done yet.
    117 
    118     - Callbacks registered with add_done_callback() are always called
    119       via the event loop's call_soon_threadsafe().
    120 
    121     - This class is not compatible with the wait() and as_completed()
    122       methods in the concurrent.futures package.
    123 
    124     (In Python 3.4 or later we may be able to unify the implementations.)
    125     """
    126 
    127     # Class variables serving as defaults for instance variables.
    128     _state = _PENDING
    129     _result = None
    130     _exception = None
    131     _loop = None
    132     _source_traceback = None
    133 
    134     # This field is used for a dual purpose:
    135     # - Its presence is a marker to declare that a class implements
    136     #   the Future protocol (i.e. is intended to be duck-type compatible).
    137     #   The value must also be not-None, to enable a subclass to declare
    138     #   that it is not compatible by setting this to None.
    139     # - It is set by __iter__() below so that Task._step() can tell
    140     #   the difference between `yield from Future()` (correct) vs.
    141     #   `yield Future()` (incorrect).
    142     _asyncio_future_blocking = False
    143 
    144     _log_traceback = False   # Used for Python 3.4 and later
    145     _tb_logger = None        # Used for Python 3.3 only
    146 
    147     def __init__(self, *, loop=None):
    148         """Initialize the future.
    149 
    150         The optional event_loop argument allows explicitly setting the event
    151         loop object used by the future. If it's not provided, the future uses
    152         the default event loop.
    153         """
    154         if loop is None:
    155             self._loop = events.get_event_loop()
    156         else:
    157             self._loop = loop
    158         self._callbacks = []
    159         if self._loop.get_debug():
    160             self._source_traceback = traceback.extract_stack(sys._getframe(1))
    161 
    162     _repr_info = base_futures._future_repr_info
    163 
    164     def __repr__(self):
    165         return '<%s %s>' % (self.__class__.__name__, ' '.join(self._repr_info()))
    166 
    167     # On Python 3.3 and older, objects with a destructor part of a reference
    168     # cycle are never destroyed. It's not more the case on Python 3.4 thanks
    169     # to the PEP 442.
    170     if compat.PY34:
    171         def __del__(self):
    172             if not self._log_traceback:
    173                 # set_exception() was not called, or result() or exception()
    174                 # has consumed the exception
    175                 return
    176             exc = self._exception
    177             context = {
    178                 'message': ('%s exception was never retrieved'
    179                             % self.__class__.__name__),
    180                 'exception': exc,
    181                 'future': self,
    182             }
    183             if self._source_traceback:
    184                 context['source_traceback'] = self._source_traceback
    185             self._loop.call_exception_handler(context)
    186 
    187     def cancel(self):
    188         """Cancel the future and schedule callbacks.
    189 
    190         If the future is already done or cancelled, return False.  Otherwise,
    191         change the future's state to cancelled, schedule the callbacks and
    192         return True.
    193         """
    194         if self._state != _PENDING:
    195             return False
    196         self._state = _CANCELLED
    197         self._schedule_callbacks()
    198         return True
    199 
    200     def _schedule_callbacks(self):
    201         """Internal: Ask the event loop to call all callbacks.
    202 
    203         The callbacks are scheduled to be called as soon as possible. Also
    204         clears the callback list.
    205         """
    206         callbacks = self._callbacks[:]
    207         if not callbacks:
    208             return
    209 
    210         self._callbacks[:] = []
    211         for callback in callbacks:
    212             self._loop.call_soon(callback, self)
    213 
    214     def cancelled(self):
    215         """Return True if the future was cancelled."""
    216         return self._state == _CANCELLED
    217 
    218     # Don't implement running(); see http://bugs.python.org/issue18699
    219 
    220     def done(self):
    221         """Return True if the future is done.
    222 
    223         Done means either that a result / exception are available, or that the
    224         future was cancelled.
    225         """
    226         return self._state != _PENDING
    227 
    228     def result(self):
    229         """Return the result this future represents.
    230 
    231         If the future has been cancelled, raises CancelledError.  If the
    232         future's result isn't yet available, raises InvalidStateError.  If
    233         the future is done and has an exception set, this exception is raised.
    234         """
    235         if self._state == _CANCELLED:
    236             raise CancelledError
    237         if self._state != _FINISHED:
    238             raise InvalidStateError('Result is not ready.')
    239         self._log_traceback = False
    240         if self._tb_logger is not None:
    241             self._tb_logger.clear()
    242             self._tb_logger = None
    243         if self._exception is not None:
    244             raise self._exception
    245         return self._result
    246 
    247     def exception(self):
    248         """Return the exception that was set on this future.
    249 
    250         The exception (or None if no exception was set) is returned only if
    251         the future is done.  If the future has been cancelled, raises
    252         CancelledError.  If the future isn't done yet, raises
    253         InvalidStateError.
    254         """
    255         if self._state == _CANCELLED:
    256             raise CancelledError
    257         if self._state != _FINISHED:
    258             raise InvalidStateError('Exception is not set.')
    259         self._log_traceback = False
    260         if self._tb_logger is not None:
    261             self._tb_logger.clear()
    262             self._tb_logger = None
    263         return self._exception
    264 
    265     def add_done_callback(self, fn):
    266         """Add a callback to be run when the future becomes done.
    267 
    268         The callback is called with a single argument - the future object. If
    269         the future is already done when this is called, the callback is
    270         scheduled with call_soon.
    271         """
    272         if self._state != _PENDING:
    273             self._loop.call_soon(fn, self)
    274         else:
    275             self._callbacks.append(fn)
    276 
    277     # New method not in PEP 3148.
    278 
    279     def remove_done_callback(self, fn):
    280         """Remove all instances of a callback from the "call when done" list.
    281 
    282         Returns the number of callbacks removed.
    283         """
    284         filtered_callbacks = [f for f in self._callbacks if f != fn]
    285         removed_count = len(self._callbacks) - len(filtered_callbacks)
    286         if removed_count:
    287             self._callbacks[:] = filtered_callbacks
    288         return removed_count
    289 
    290     # So-called internal methods (note: no set_running_or_notify_cancel()).
    291 
    292     def set_result(self, result):
    293         """Mark the future done and set its result.
    294 
    295         If the future is already done when this method is called, raises
    296         InvalidStateError.
    297         """
    298         if self._state != _PENDING:
    299             raise InvalidStateError('{}: {!r}'.format(self._state, self))
    300         self._result = result
    301         self._state = _FINISHED
    302         self._schedule_callbacks()
    303 
    304     def set_exception(self, exception):
    305         """Mark the future done and set an exception.
    306 
    307         If the future is already done when this method is called, raises
    308         InvalidStateError.
    309         """
    310         if self._state != _PENDING:
    311             raise InvalidStateError('{}: {!r}'.format(self._state, self))
    312         if isinstance(exception, type):
    313             exception = exception()
    314         if type(exception) is StopIteration:
    315             raise TypeError("StopIteration interacts badly with generators "
    316                             "and cannot be raised into a Future")
    317         self._exception = exception
    318         self._state = _FINISHED
    319         self._schedule_callbacks()
    320         if compat.PY34:
    321             self._log_traceback = True
    322         else:
    323             self._tb_logger = _TracebackLogger(self, exception)
    324             # Arrange for the logger to be activated after all callbacks
    325             # have had a chance to call result() or exception().
    326             self._loop.call_soon(self._tb_logger.activate)
    327 
    328     def __iter__(self):
    329         if not self.done():
    330             self._asyncio_future_blocking = True
    331             yield self  # This tells Task to wait for completion.
    332         assert self.done(), "yield from wasn't used with future"
    333         return self.result()  # May raise too.
    334 
    335     if compat.PY35:
    336         __await__ = __iter__ # make compatible with 'await' expression
    337 
    338 
    339 # Needed for testing purposes.
    340 _PyFuture = Future
    341 
    342 
    343 def _set_result_unless_cancelled(fut, result):
    344     """Helper setting the result only if the future was not cancelled."""
    345     if fut.cancelled():
    346         return
    347     fut.set_result(result)
    348 
    349 
    350 def _set_concurrent_future_state(concurrent, source):
    351     """Copy state from a future to a concurrent.futures.Future."""
    352     assert source.done()
    353     if source.cancelled():
    354         concurrent.cancel()
    355     if not concurrent.set_running_or_notify_cancel():
    356         return
    357     exception = source.exception()
    358     if exception is not None:
    359         concurrent.set_exception(exception)
    360     else:
    361         result = source.result()
    362         concurrent.set_result(result)
    363 
    364 
    365 def _copy_future_state(source, dest):
    366     """Internal helper to copy state from another Future.
    367 
    368     The other Future may be a concurrent.futures.Future.
    369     """
    370     assert source.done()
    371     if dest.cancelled():
    372         return
    373     assert not dest.done()
    374     if source.cancelled():
    375         dest.cancel()
    376     else:
    377         exception = source.exception()
    378         if exception is not None:
    379             dest.set_exception(exception)
    380         else:
    381             result = source.result()
    382             dest.set_result(result)
    383 
    384 
    385 def _chain_future(source, destination):
    386     """Chain two futures so that when one completes, so does the other.
    387 
    388     The result (or exception) of source will be copied to destination.
    389     If destination is cancelled, source gets cancelled too.
    390     Compatible with both asyncio.Future and concurrent.futures.Future.
    391     """
    392     if not isfuture(source) and not isinstance(source,
    393                                                concurrent.futures.Future):
    394         raise TypeError('A future is required for source argument')
    395     if not isfuture(destination) and not isinstance(destination,
    396                                                     concurrent.futures.Future):
    397         raise TypeError('A future is required for destination argument')
    398     source_loop = source._loop if isfuture(source) else None
    399     dest_loop = destination._loop if isfuture(destination) else None
    400 
    401     def _set_state(future, other):
    402         if isfuture(future):
    403             _copy_future_state(other, future)
    404         else:
    405             _set_concurrent_future_state(future, other)
    406 
    407     def _call_check_cancel(destination):
    408         if destination.cancelled():
    409             if source_loop is None or source_loop is dest_loop:
    410                 source.cancel()
    411             else:
    412                 source_loop.call_soon_threadsafe(source.cancel)
    413 
    414     def _call_set_state(source):
    415         if dest_loop is None or dest_loop is source_loop:
    416             _set_state(destination, source)
    417         else:
    418             dest_loop.call_soon_threadsafe(_set_state, destination, source)
    419 
    420     destination.add_done_callback(_call_check_cancel)
    421     source.add_done_callback(_call_set_state)
    422 
    423 
    424 def wrap_future(future, *, loop=None):
    425     """Wrap concurrent.futures.Future object."""
    426     if isfuture(future):
    427         return future
    428     assert isinstance(future, concurrent.futures.Future), \
    429         'concurrent.futures.Future is expected, got {!r}'.format(future)
    430     if loop is None:
    431         loop = events.get_event_loop()
    432     new_future = loop.create_future()
    433     _chain_future(future, new_future)
    434     return new_future
    435 
    436 
    437 try:
    438     import _asyncio
    439 except ImportError:
    440     pass
    441 else:
    442     # _CFuture is needed for tests.
    443     Future = _CFuture = _asyncio.Future
    444