Home | History | Annotate | Download | only in asyncio
      1 """Support for tasks, coroutines and the scheduler."""
      2 
# Names exported by a star-import of this module.  The underscore-prefixed
# task bookkeeping helpers are listed explicitly because a star-import
# would otherwise skip names that start with an underscore.
__all__ = (
    'Task', 'create_task',
    'FIRST_COMPLETED', 'FIRST_EXCEPTION', 'ALL_COMPLETED',
    'wait', 'wait_for', 'as_completed', 'sleep',
    'gather', 'shield', 'ensure_future', 'run_coroutine_threadsafe',
    'current_task', 'all_tasks',
    '_register_task', '_unregister_task', '_enter_task', '_leave_task',
)
     11 
     12 import concurrent.futures
     13 import contextvars
     14 import functools
     15 import inspect
     16 import types
     17 import warnings
     18 import weakref
     19 
     20 from . import base_tasks
     21 from . import coroutines
     22 from . import events
     23 from . import futures
     24 from .coroutines import coroutine
     25 
     26 
     27 def current_task(loop=None):
     28     """Return a currently executed task."""
     29     if loop is None:
     30         loop = events.get_running_loop()
     31     return _current_tasks.get(loop)
     32 
     33 
     34 def all_tasks(loop=None):
     35     """Return a set of all tasks for the loop."""
     36     if loop is None:
     37         loop = events.get_running_loop()
     38     # NB: set(_all_tasks) is required to protect
     39     # from https://bugs.python.org/issue34970 bug
     40     return {t for t in list(_all_tasks)
     41             if futures._get_loop(t) is loop and not t.done()}
     42 
     43 
     44 def _all_tasks_compat(loop=None):
     45     # Different from "all_task()" by returning *all* Tasks, including
     46     # the completed ones.  Used to implement deprecated "Tasks.all_task()"
     47     # method.
     48     if loop is None:
     49         loop = events.get_event_loop()
     50     # NB: set(_all_tasks) is required to protect
     51     # from https://bugs.python.org/issue34970 bug
     52     return {t for t in list(_all_tasks) if futures._get_loop(t) is loop}
     53 
     54 
class Task(futures._PyFuture):  # Inherit Python Task implementation
                                # from a Python Future implementation.

    """A coroutine wrapped in a Future."""

    # An important invariant maintained while a Task not done:
    #
    # - Either _fut_waiter is None, and _step() is scheduled;
    # - or _fut_waiter is some Future, and _step() is *not* scheduled.
    #
    # The only transition from the latter to the former is through
    # _wakeup().  When _fut_waiter is not None, one of its callbacks
    # must be _wakeup().

    # If False, don't log a message if the task is destroyed whereas its
    # status is still pending
    _log_destroy_pending = True

    @classmethod
    def current_task(cls, loop=None):
        """Return the currently running task in an event loop or None.

        By default the current task for the current event loop is returned.

        None is returned when called not in the context of a Task.

        Deprecated: emits PendingDeprecationWarning and delegates to the
        module-level current_task().
        """
        warnings.warn("Task.current_task() is deprecated, "
                      "use asyncio.current_task() instead",
                      PendingDeprecationWarning,
                      stacklevel=2)
        if loop is None:
            # Unlike the module-level function, fall back to
            # get_event_loop() rather than get_running_loop().
            loop = events.get_event_loop()
        return current_task(loop)

    @classmethod
    def all_tasks(cls, loop=None):
        """Return a set of all tasks for an event loop.

        By default all tasks for the current event loop are returned.

        Deprecated: emits PendingDeprecationWarning and delegates to
        _all_tasks_compat(), which also includes completed tasks.
        """
        warnings.warn("Task.all_tasks() is deprecated, "
                      "use asyncio.all_tasks() instead",
                      PendingDeprecationWarning,
                      stacklevel=2)
        return _all_tasks_compat(loop)

    def __init__(self, coro, *, loop=None):
        """Wrap *coro* and schedule its first step on the loop.

        Raises TypeError if coroutines.iscoroutine(coro) is false.
        """
        super().__init__(loop=loop)
        if self._source_traceback:
            # Drop the newest entry of the recorded traceback -- presumably
            # the frame of this constructor (TODO confirm).
            del self._source_traceback[-1]
        if not coroutines.iscoroutine(coro):
            # raise after Future.__init__(), attrs are required for __del__
            # prevent logging for pending task in __del__
            self._log_destroy_pending = False
            raise TypeError(f"a coroutine was expected, got {coro!r}")

        # Set by cancel() when the cancellation has to be delivered by the
        # next __step() call.
        self._must_cancel = False
        # The future this task is currently blocked on, if any.
        self._fut_waiter = None
        self._coro = coro
        # Every step of the coroutine runs in this copied context.
        self._context = contextvars.copy_context()

        self._loop.call_soon(self.__step, context=self._context)
        _register_task(self)

    def __del__(self):
        """Report a still-pending task to the loop's exception handler.

        Nothing is reported when _log_destroy_pending is false.
        """
        if self._state == futures._PENDING and self._log_destroy_pending:
            context = {
                'task': self,
                'message': 'Task was destroyed but it is pending!',
            }
            if self._source_traceback:
                context['source_traceback'] = self._source_traceback
            self._loop.call_exception_handler(context)
        super().__del__()

    def _repr_info(self):
        """Return the repr details computed by base_tasks._task_repr_info()."""
        return base_tasks._task_repr_info(self)

    def set_result(self, result):
        """Always raise RuntimeError: a task's result cannot be set directly."""
        raise RuntimeError('Task does not support set_result operation')

    def set_exception(self, exception):
        """Always raise RuntimeError: a task's exception cannot be set directly."""
        raise RuntimeError('Task does not support set_exception operation')

    def get_stack(self, *, limit=None):
        """Return the list of stack frames for this task's coroutine.

        If the coroutine is not done, this returns the stack where it is
        suspended.  If the coroutine has completed successfully or was
        cancelled, this returns an empty list.  If the coroutine was
        terminated by an exception, this returns the list of traceback
        frames.

        The frames are always ordered from oldest to newest.

        The optional limit gives the maximum number of frames to
        return; by default all available frames are returned.  Its
        meaning differs depending on whether a stack or a traceback is
        returned: the newest frames of a stack are returned, but the
        oldest frames of a traceback are returned.  (This matches the
        behavior of the traceback module.)

        For reasons beyond our control, only one stack frame is
        returned for a suspended coroutine.
        """
        return base_tasks._task_get_stack(self, limit)

    def print_stack(self, *, limit=None, file=None):
        """Print the stack or traceback for this task's coroutine.

        This produces output similar to that of the traceback module,
        for the frames retrieved by get_stack().  The limit argument
        is passed to get_stack().  The file argument is an I/O stream
        to which the output is written; by default output is written
        to sys.stderr.
        """
        return base_tasks._task_print_stack(self, limit, file)

    def cancel(self):
        """Request that this task cancel itself.

        This arranges for a CancelledError to be thrown into the
        wrapped coroutine on the next cycle through the event loop.
        The coroutine then has a chance to clean up or even deny
        the request using try/except/finally.

        Unlike Future.cancel, this does not guarantee that the
        task will be cancelled: the exception might be caught and
        acted upon, delaying cancellation of the task or preventing
        cancellation completely.  The task may also return a value or
        raise a different exception.

        Immediately after this method is called, Task.cancelled() will
        not return True (unless the task was already cancelled).  A
        task will be marked as cancelled when the wrapped coroutine
        terminates with a CancelledError exception (even if cancel()
        was not called).

        Returns False if the task is already done, True otherwise.
        """
        self._log_traceback = False
        if self.done():
            return False
        if self._fut_waiter is not None:
            # Blocked on a future: try to cancel that one, so that the
            # cancellation reaches the coroutine through __wakeup().
            if self._fut_waiter.cancel():
                # Leave self._fut_waiter; it may be a Task that
                # catches and ignores the cancellation so we may have
                # to cancel it again later.
                return True
        # It must be the case that self.__step is already scheduled.
        self._must_cancel = True
        return True

    def __step(self, exc=None):
        """Run the coroutine until its next yield, return or raise.

        Sends None into the coroutine, or throws *exc* into it when given
        (a pending cancel request replaces a non-CancelledError *exc* with
        CancelledError).  A finished coroutine sets this task's result,
        exception or cancelled state; a yielded value decides how the next
        step gets scheduled.

        Raises InvalidStateError if the task is already done.
        """
        if self.done():
            raise futures.InvalidStateError(
                f'_step(): already done: {self!r}, {exc!r}')
        if self._must_cancel:
            if not isinstance(exc, futures.CancelledError):
                exc = futures.CancelledError()
            self._must_cancel = False
        coro = self._coro
        self._fut_waiter = None

        _enter_task(self._loop, self)
        # Call either coro.throw(exc) or coro.send(None).
        try:
            if exc is None:
                # We use the `send` method directly, because coroutines
                # don't have `__iter__` and `__next__` methods.
                result = coro.send(None)
            else:
                result = coro.throw(exc)
        except StopIteration as exc:
            # The coroutine returned; exc.value carries its return value.
            if self._must_cancel:
                # Task is cancelled right before coro stops.
                self._must_cancel = False
                super().set_exception(futures.CancelledError())
            else:
                super().set_result(exc.value)
        except futures.CancelledError:
            super().cancel()  # I.e., Future.cancel(self).
        except Exception as exc:
            super().set_exception(exc)
        except BaseException as exc:
            # Non-Exception errors are recorded on the task and also
            # re-raised to the caller of __step().
            super().set_exception(exc)
            raise
        else:
            # The coroutine yielded; a non-None marker attribute identifies
            # a future-like object.
            blocking = getattr(result, '_asyncio_future_blocking', None)
            if blocking is not None:
                # Yielded Future must come from Future.__iter__().
                if futures._get_loop(result) is not self._loop:
                    new_exc = RuntimeError(
                        f'Task {self!r} got Future '
                        f'{result!r} attached to a different loop')
                    self._loop.call_soon(
                        self.__step, new_exc, context=self._context)
                elif blocking:
                    if result is self:
                        new_exc = RuntimeError(
                            f'Task cannot await on itself: {self!r}')
                        self._loop.call_soon(
                            self.__step, new_exc, context=self._context)
                    else:
                        # Normal case: park on the future and resume
                        # through __wakeup() when it completes.
                        result._asyncio_future_blocking = False
                        result.add_done_callback(
                            self.__wakeup, context=self._context)
                        self._fut_waiter = result
                        if self._must_cancel:
                            # cancel() was requested during this step;
                            # forward it to the future just awaited.
                            if self._fut_waiter.cancel():
                                self._must_cancel = False
                else:
                    # A future whose blocking flag is false was yielded.
                    new_exc = RuntimeError(
                        f'yield was used instead of yield from '
                        f'in task {self!r} with {result!r}')
                    self._loop.call_soon(
                        self.__step, new_exc, context=self._context)

            elif result is None:
                # Bare yield relinquishes control for one event loop iteration.
                self._loop.call_soon(self.__step, context=self._context)
            elif inspect.isgenerator(result):
                # Yielding a generator is just wrong.
                new_exc = RuntimeError(
                    f'yield was used instead of yield from for '
                    f'generator in task {self!r} with {result!r}')
                self._loop.call_soon(
                    self.__step, new_exc, context=self._context)
            else:
                # Yielding something else is an error.
                new_exc = RuntimeError(f'Task got bad yield: {result!r}')
                self._loop.call_soon(
                    self.__step, new_exc, context=self._context)
        finally:
            _leave_task(self._loop, self)
            self = None  # Needed to break cycles when an exception occurs.

    def __wakeup(self, future):
        """Done-callback for the awaited future: run the next step.

        If future.result() raises an Exception, that exception is passed
        to __step() so it gets thrown into the coroutine; otherwise
        __step() is called without arguments.
        """
        try:
            future.result()
        except Exception as exc:
            # This may also be a cancellation.
            self.__step(exc)
        else:
            # Don't pass the value of `future.result()` explicitly,
            # as `Future.__iter__` and `Future.__await__` don't need it.
            # If we call `_step(value, None)` instead of `_step()`,
            # Python eval loop would use `.send(value)` method call,
            # instead of `__next__()`, which is slower for futures
            # that return non-generator iterators from their `__iter__`.
            self.__step()
        self = None  # Needed to break cycles when an exception occurs.
    305 
    306 
# Keep the Task class defined above reachable under a private name, since
# the public name ``Task`` may be rebound just below.
_PyTask = Task


try:
    import _asyncio
except ImportError:
    # No _asyncio module available: ``Task`` stays bound to the class
    # defined in this file.
    pass
else:
    # Prefer the implementation provided by _asyncio (presumably the C
    # accelerator, judging by the _CTask name).
    # _CTask is needed for tests.
    Task = _CTask = _asyncio.Task
    317 
    318 
    319 def create_task(coro):
    320     """Schedule the execution of a coroutine object in a spawn task.
    321 
    322     Return a Task object.
    323     """
    324     loop = events.get_running_loop()
    325     return loop.create_task(coro)
    326 
    327 
    328 # wait() and as_completed() similar to those in PEP 3148.
    329 
    330 FIRST_COMPLETED = concurrent.futures.FIRST_COMPLETED
    331 FIRST_EXCEPTION = concurrent.futures.FIRST_EXCEPTION
    332 ALL_COMPLETED = concurrent.futures.ALL_COMPLETED
    333 
    334 
    335 async def wait(fs, *, loop=None, timeout=None, return_when=ALL_COMPLETED):
    336     """Wait for the Futures and coroutines given by fs to complete.
    337 
    338     The sequence futures must not be empty.
    339 
    340     Coroutines will be wrapped in Tasks.
    341 
    342     Returns two sets of Future: (done, pending).
    343 
    344     Usage:
    345 
    346         done, pending = await asyncio.wait(fs)
    347 
    348     Note: This does not raise TimeoutError! Futures that aren't done
    349     when the timeout occurs are returned in the second set.
    350     """
    351     if futures.isfuture(fs) or coroutines.iscoroutine(fs):
    352         raise TypeError(f"expect a list of futures, not {type(fs).__name__}")
    353     if not fs:
    354         raise ValueError('Set of coroutines/Futures is empty.')
    355     if return_when not in (FIRST_COMPLETED, FIRST_EXCEPTION, ALL_COMPLETED):
    356         raise ValueError(f'Invalid return_when value: {return_when}')
    357 
    358     if loop is None:
    359         loop = events.get_event_loop()
    360 
    361     fs = {ensure_future(f, loop=loop) for f in set(fs)}
    362 
    363     return await _wait(fs, timeout, return_when, loop)
    364 
    365 
    366 def _release_waiter(waiter, *args):
    367     if not waiter.done():
    368         waiter.set_result(None)
    369 
    370 
    371 async def wait_for(fut, timeout, *, loop=None):
    372     """Wait for the single Future or coroutine to complete, with timeout.
    373 
    374     Coroutine will be wrapped in Task.
    375 
    376     Returns result of the Future or coroutine.  When a timeout occurs,
    377     it cancels the task and raises TimeoutError.  To avoid the task
    378     cancellation, wrap it in shield().
    379 
    380     If the wait is cancelled, the task is also cancelled.
    381 
    382     This function is a coroutine.
    383     """
    384     if loop is None:
    385         loop = events.get_event_loop()
    386 
    387     if timeout is None:
    388         return await fut
    389 
    390     if timeout <= 0:
    391         fut = ensure_future(fut, loop=loop)
    392 
    393         if fut.done():
    394             return fut.result()
    395 
    396         fut.cancel()
    397         raise futures.TimeoutError()
    398 
    399     waiter = loop.create_future()
    400     timeout_handle = loop.call_later(timeout, _release_waiter, waiter)
    401     cb = functools.partial(_release_waiter, waiter)
    402 
    403     fut = ensure_future(fut, loop=loop)
    404     fut.add_done_callback(cb)
    405 
    406     try:
    407         # wait until the future completes or the timeout
    408         try:
    409             await waiter
    410         except futures.CancelledError:
    411             fut.remove_done_callback(cb)
    412             fut.cancel()
    413             raise
    414 
    415         if fut.done():
    416             return fut.result()
    417         else:
    418             fut.remove_done_callback(cb)
    419             # We must ensure that the task is not running
    420             # after wait_for() returns.
    421             # See https://bugs.python.org/issue32751
    422             await _cancel_and_wait(fut, loop=loop)
    423             raise futures.TimeoutError()
    424     finally:
    425         timeout_handle.cancel()
    426 
    427 
    428 async def _wait(fs, timeout, return_when, loop):
    429     """Internal helper for wait().
    430 
    431     The fs argument must be a collection of Futures.
    432     """
    433     assert fs, 'Set of Futures is empty.'
    434     waiter = loop.create_future()
    435     timeout_handle = None
    436     if timeout is not None:
    437         timeout_handle = loop.call_later(timeout, _release_waiter, waiter)
    438     counter = len(fs)
    439 
    440     def _on_completion(f):
    441         nonlocal counter
    442         counter -= 1
    443         if (counter <= 0 or
    444             return_when == FIRST_COMPLETED or
    445             return_when == FIRST_EXCEPTION and (not f.cancelled() and
    446                                                 f.exception() is not None)):
    447             if timeout_handle is not None:
    448                 timeout_handle.cancel()
    449             if not waiter.done():
    450                 waiter.set_result(None)
    451 
    452     for f in fs:
    453         f.add_done_callback(_on_completion)
    454 
    455     try:
    456         await waiter
    457     finally:
    458         if timeout_handle is not None:
    459             timeout_handle.cancel()
    460 
    461     done, pending = set(), set()
    462     for f in fs:
    463         f.remove_done_callback(_on_completion)
    464         if f.done():
    465             done.add(f)
    466         else:
    467             pending.add(f)
    468     return done, pending
    469 
    470 
    471 async def _cancel_and_wait(fut, loop):
    472     """Cancel the *fut* future or task and wait until it completes."""
    473 
    474     waiter = loop.create_future()
    475     cb = functools.partial(_release_waiter, waiter)
    476     fut.add_done_callback(cb)
    477 
    478     try:
    479         fut.cancel()
    480         # We cannot wait on *fut* directly to make
    481         # sure _cancel_and_wait itself is reliably cancellable.
    482         await waiter
    483     finally:
    484         fut.remove_done_callback(cb)
    485 
    486 
    487 # This is *not* a @coroutine!  It is just an iterator (yielding Futures).
    488 def as_completed(fs, *, loop=None, timeout=None):
    489     """Return an iterator whose values are coroutines.
    490 
    491     When waiting for the yielded coroutines you'll get the results (or
    492     exceptions!) of the original Futures (or coroutines), in the order
    493     in which and as soon as they complete.
    494 
    495     This differs from PEP 3148; the proper way to use this is:
    496 
    497         for f in as_completed(fs):
    498             result = await f  # The 'await' may raise.
    499             # Use result.
    500 
    501     If a timeout is specified, the 'await' will raise
    502     TimeoutError when the timeout occurs before all Futures are done.
    503 
    504     Note: The futures 'f' are not necessarily members of fs.
    505     """
    506     if futures.isfuture(fs) or coroutines.iscoroutine(fs):
    507         raise TypeError(f"expect a list of futures, not {type(fs).__name__}")
    508     loop = loop if loop is not None else events.get_event_loop()
    509     todo = {ensure_future(f, loop=loop) for f in set(fs)}
    510     from .queues import Queue  # Import here to avoid circular import problem.
    511     done = Queue(loop=loop)
    512     timeout_handle = None
    513 
    514     def _on_timeout():
    515         for f in todo:
    516             f.remove_done_callback(_on_completion)
    517             done.put_nowait(None)  # Queue a dummy value for _wait_for_one().
    518         todo.clear()  # Can't do todo.remove(f) in the loop.
    519 
    520     def _on_completion(f):
    521         if not todo:
    522             return  # _on_timeout() was here first.
    523         todo.remove(f)
    524         done.put_nowait(f)
    525         if not todo and timeout_handle is not None:
    526             timeout_handle.cancel()
    527 
    528     async def _wait_for_one():
    529         f = await done.get()
    530         if f is None:
    531             # Dummy value from _on_timeout().
    532             raise futures.TimeoutError
    533         return f.result()  # May raise f.exception().
    534 
    535     for f in todo:
    536         f.add_done_callback(_on_completion)
    537     if todo and timeout is not None:
    538         timeout_handle = loop.call_later(timeout, _on_timeout)
    539     for _ in range(len(todo)):
    540         yield _wait_for_one()
    541 
    542 
    543 @types.coroutine
    544 def __sleep0():
    545     """Skip one event loop run cycle.
    546 
    547     This is a private helper for 'asyncio.sleep()', used
    548     when the 'delay' is set to 0.  It uses a bare 'yield'
    549     expression (which Task.__step knows how to handle)
    550     instead of creating a Future object.
    551     """
    552     yield
    553 
    554 
    555 async def sleep(delay, result=None, *, loop=None):
    556     """Coroutine that completes after a given time (in seconds)."""
    557     if delay <= 0:
    558         await __sleep0()
    559         return result
    560 
    561     if loop is None:
    562         loop = events.get_event_loop()
    563     future = loop.create_future()
    564     h = loop.call_later(delay,
    565                         futures._set_result_unless_cancelled,
    566                         future, result)
    567     try:
    568         return await future
    569     finally:
    570         h.cancel()
    571 
    572 
    573 def ensure_future(coro_or_future, *, loop=None):
    574     """Wrap a coroutine or an awaitable in a future.
    575 
    576     If the argument is a Future, it is returned directly.
    577     """
    578     if coroutines.iscoroutine(coro_or_future):
    579         if loop is None:
    580             loop = events.get_event_loop()
    581         task = loop.create_task(coro_or_future)
    582         if task._source_traceback:
    583             del task._source_traceback[-1]
    584         return task
    585     elif futures.isfuture(coro_or_future):
    586         if loop is not None and loop is not futures._get_loop(coro_or_future):
    587             raise ValueError('loop argument must agree with Future')
    588         return coro_or_future
    589     elif inspect.isawaitable(coro_or_future):
    590         return ensure_future(_wrap_awaitable(coro_or_future), loop=loop)
    591     else:
    592         raise TypeError('An asyncio.Future, a coroutine or an awaitable is '
    593                         'required')
    594 
    595 
    596 @coroutine
    597 def _wrap_awaitable(awaitable):
    598     """Helper for asyncio.ensure_future().
    599 
    600     Wraps awaitable (an object with __await__) into a coroutine
    601     that will later be wrapped in a Task by ensure_future().
    602     """
    603     return (yield from awaitable.__await__())
    604 
    605 
    606 class _GatheringFuture(futures.Future):
    607     """Helper for gather().
    608 
    609     This overrides cancel() to cancel all the children and act more
    610     like Task.cancel(), which doesn't immediately mark itself as
    611     cancelled.
    612     """
    613 
    614     def __init__(self, children, *, loop=None):
    615         super().__init__(loop=loop)
    616         self._children = children
    617         self._cancel_requested = False
    618 
    619     def cancel(self):
    620         if self.done():
    621             return False
    622         ret = False
    623         for child in self._children:
    624             if child.cancel():
    625                 ret = True
    626         if ret:
    627             # If any child tasks were actually cancelled, we should
    628             # propagate the cancellation request regardless of
    629             # *return_exceptions* argument.  See issue 32684.
    630             self._cancel_requested = True
    631         return ret
    632 
    633 
    634 def gather(*coros_or_futures, loop=None, return_exceptions=False):
    635     """Return a future aggregating results from the given coroutines/futures.
    636 
    637     Coroutines will be wrapped in a future and scheduled in the event
    638     loop. They will not necessarily be scheduled in the same order as
    639     passed in.
    640 
    641     All futures must share the same event loop.  If all the tasks are
    642     done successfully, the returned future's result is the list of
    643     results (in the order of the original sequence, not necessarily
    644     the order of results arrival).  If *return_exceptions* is True,
    645     exceptions in the tasks are treated the same as successful
    646     results, and gathered in the result list; otherwise, the first
    647     raised exception will be immediately propagated to the returned
    648     future.
    649 
    650     Cancellation: if the outer Future is cancelled, all children (that
    651     have not completed yet) are also cancelled.  If any child is
    652     cancelled, this is treated as if it raised CancelledError --
    653     the outer Future is *not* cancelled in this case.  (This is to
    654     prevent the cancellation of one child to cause other children to
    655     be cancelled.)
    656     """
    657     if not coros_or_futures:
    658         if loop is None:
    659             loop = events.get_event_loop()
    660         outer = loop.create_future()
    661         outer.set_result([])
    662         return outer
    663 
    664     def _done_callback(fut):
    665         nonlocal nfinished
    666         nfinished += 1
    667 
    668         if outer.done():
    669             if not fut.cancelled():
    670                 # Mark exception retrieved.
    671                 fut.exception()
    672             return
    673 
    674         if not return_exceptions:
    675             if fut.cancelled():
    676                 # Check if 'fut' is cancelled first, as
    677                 # 'fut.exception()' will *raise* a CancelledError
    678                 # instead of returning it.
    679                 exc = futures.CancelledError()
    680                 outer.set_exception(exc)
    681                 return
    682             else:
    683                 exc = fut.exception()
    684                 if exc is not None:
    685                     outer.set_exception(exc)
    686                     return
    687 
    688         if nfinished == nfuts:
    689             # All futures are done; create a list of results
    690             # and set it to the 'outer' future.
    691             results = []
    692 
    693             for fut in children:
    694                 if fut.cancelled():
    695                     # Check if 'fut' is cancelled first, as
    696                     # 'fut.exception()' will *raise* a CancelledError
    697                     # instead of returning it.
    698                     res = futures.CancelledError()
    699                 else:
    700                     res = fut.exception()
    701                     if res is None:
    702                         res = fut.result()
    703                 results.append(res)
    704 
    705             if outer._cancel_requested:
    706                 # If gather is being cancelled we must propagate the
    707                 # cancellation regardless of *return_exceptions* argument.
    708                 # See issue 32684.
    709                 outer.set_exception(futures.CancelledError())
    710             else:
    711                 outer.set_result(results)
    712 
    713     arg_to_fut = {}
    714     children = []
    715     nfuts = 0
    716     nfinished = 0
    717     for arg in coros_or_futures:
    718         if arg not in arg_to_fut:
    719             fut = ensure_future(arg, loop=loop)
    720             if loop is None:
    721                 loop = futures._get_loop(fut)
    722             if fut is not arg:
    723                 # 'arg' was not a Future, therefore, 'fut' is a new
    724                 # Future created specifically for 'arg'.  Since the caller
    725                 # can't control it, disable the "destroy pending task"
    726                 # warning.
    727                 fut._log_destroy_pending = False
    728 
    729             nfuts += 1
    730             arg_to_fut[arg] = fut
    731             fut.add_done_callback(_done_callback)
    732 
    733         else:
    734             # There's a duplicate Future object in coros_or_futures.
    735             fut = arg_to_fut[arg]
    736 
    737         children.append(fut)
    738 
    739     outer = _GatheringFuture(children, loop=loop)
    740     return outer
    741 
    742 
    743 def shield(arg, *, loop=None):
    744     """Wait for a future, shielding it from cancellation.
    745 
    746     The statement
    747 
    748         res = await shield(something())
    749 
    750     is exactly equivalent to the statement
    751 
    752         res = await something()
    753 
    754     *except* that if the coroutine containing it is cancelled, the
    755     task running in something() is not cancelled.  From the POV of
    756     something(), the cancellation did not happen.  But its caller is
    757     still cancelled, so the yield-from expression still raises
    758     CancelledError.  Note: If something() is cancelled by other means
    759     this will still cancel shield().
    760 
    761     If you want to completely ignore cancellation (not recommended)
    762     you can combine shield() with a try/except clause, as follows:
    763 
    764         try:
    765             res = await shield(something())
    766         except CancelledError:
    767             res = None
    768     """
    769     inner = ensure_future(arg, loop=loop)
    770     if inner.done():
    771         # Shortcut.
    772         return inner
    773     loop = futures._get_loop(inner)
    774     outer = loop.create_future()
    775 
    776     def _done_callback(inner):
    777         if outer.cancelled():
    778             if not inner.cancelled():
    779                 # Mark inner's result as retrieved.
    780                 inner.exception()
    781             return
    782 
    783         if inner.cancelled():
    784             outer.cancel()
    785         else:
    786             exc = inner.exception()
    787             if exc is not None:
    788                 outer.set_exception(exc)
    789             else:
    790                 outer.set_result(inner.result())
    791 
    792     inner.add_done_callback(_done_callback)
    793     return outer
    794 
    795 
    796 def run_coroutine_threadsafe(coro, loop):
    797     """Submit a coroutine object to a given event loop.
    798 
    799     Return a concurrent.futures.Future to access the result.
    800     """
    801     if not coroutines.iscoroutine(coro):
    802         raise TypeError('A coroutine object is required')
    803     future = concurrent.futures.Future()
    804 
    805     def callback():
    806         try:
    807             futures._chain_future(ensure_future(coro, loop=loop), future)
    808         except Exception as exc:
    809             if future.set_running_or_notify_cancel():
    810                 future.set_exception(exc)
    811             raise
    812 
    813     loop.call_soon_threadsafe(callback)
    814     return future
    815 
    816 
# Weak set holding every task added through _register_task().  Being a
# WeakSet, it does not by itself keep tasks alive.
_all_tasks = weakref.WeakSet()

# Dictionary containing tasks that are currently active in
# all running event loops.  {EventLoop: Task}
# Entries are stored by _enter_task() and deleted by _leave_task().
_current_tasks = {}
    823 
    824 
    825 def _register_task(task):
    826     """Register a new task in asyncio as executed by loop."""
    827     _all_tasks.add(task)
    828 
    829 
    830 def _enter_task(loop, task):
    831     current_task = _current_tasks.get(loop)
    832     if current_task is not None:
    833         raise RuntimeError(f"Cannot enter into task {task!r} while another "
    834                            f"task {current_task!r} is being executed.")
    835     _current_tasks[loop] = task
    836 
    837 
    838 def _leave_task(loop, task):
    839     current_task = _current_tasks.get(loop)
    840     if current_task is not task:
    841         raise RuntimeError(f"Leaving task {task!r} does not match "
    842                            f"the current task {current_task!r}.")
    843     del _current_tasks[loop]
    844 
    845 
    846 def _unregister_task(task):
    847     """Unregister a task."""
    848     _all_tasks.discard(task)
    849 
    850 
# Keep references to the pure-Python implementations under _py_* names
# before the import below gets a chance to rebind the plain names.
_py_register_task = _register_task
_py_unregister_task = _unregister_task
_py_enter_task = _enter_task
_py_leave_task = _leave_task


try:
    # When the _asyncio extension module is importable, its versions
    # replace the module-level functions and registries defined above.
    from _asyncio import (_register_task, _unregister_task,
                          _enter_task, _leave_task,
                          _all_tasks, _current_tasks)
except ImportError:
    # _asyncio is unavailable: the Python definitions stay in effect.
    pass
else:
    # Also expose the _asyncio versions under explicit _c_* names.
    _c_register_task = _register_task
    _c_unregister_task = _unregister_task
    _c_enter_task = _enter_task
    _c_leave_task = _leave_task
    867     _c_leave_task = _leave_task
    868