1 """Support for tasks, coroutines and the scheduler.""" 2 3 __all__ = ( 4 'Task', 'create_task', 5 'FIRST_COMPLETED', 'FIRST_EXCEPTION', 'ALL_COMPLETED', 6 'wait', 'wait_for', 'as_completed', 'sleep', 7 'gather', 'shield', 'ensure_future', 'run_coroutine_threadsafe', 8 'current_task', 'all_tasks', 9 '_register_task', '_unregister_task', '_enter_task', '_leave_task', 10 ) 11 12 import concurrent.futures 13 import contextvars 14 import functools 15 import inspect 16 import types 17 import warnings 18 import weakref 19 20 from . import base_tasks 21 from . import coroutines 22 from . import events 23 from . import futures 24 from .coroutines import coroutine 25 26 27 def current_task(loop=None): 28 """Return a currently executed task.""" 29 if loop is None: 30 loop = events.get_running_loop() 31 return _current_tasks.get(loop) 32 33 34 def all_tasks(loop=None): 35 """Return a set of all tasks for the loop.""" 36 if loop is None: 37 loop = events.get_running_loop() 38 # NB: set(_all_tasks) is required to protect 39 # from https://bugs.python.org/issue34970 bug 40 return {t for t in list(_all_tasks) 41 if futures._get_loop(t) is loop and not t.done()} 42 43 44 def _all_tasks_compat(loop=None): 45 # Different from "all_task()" by returning *all* Tasks, including 46 # the completed ones. Used to implement deprecated "Tasks.all_task()" 47 # method. 48 if loop is None: 49 loop = events.get_event_loop() 50 # NB: set(_all_tasks) is required to protect 51 # from https://bugs.python.org/issue34970 bug 52 return {t for t in list(_all_tasks) if futures._get_loop(t) is loop} 53 54 55 class Task(futures._PyFuture): # Inherit Python Task implementation 56 # from a Python Future implementation. 57 58 """A coroutine wrapped in a Future.""" 59 60 # An important invariant maintained while a Task not done: 61 # 62 # - Either _fut_waiter is None, and _step() is scheduled; 63 # - or _fut_waiter is some Future, and _step() is *not* scheduled. 64 # 65 # The only transition from the latter to the former is through 66 # _wakeup(). When _fut_waiter is not None, one of its callbacks 67 # must be _wakeup(). 68 69 # If False, don't log a message if the task is destroyed whereas its 70 # status is still pending 71 _log_destroy_pending = True 72 73 @classmethod 74 def current_task(cls, loop=None): 75 """Return the currently running task in an event loop or None. 76 77 By default the current task for the current event loop is returned. 78 79 None is returned when called not in the context of a Task. 80 """ 81 warnings.warn("Task.current_task() is deprecated, " 82 "use asyncio.current_task() instead", 83 PendingDeprecationWarning, 84 stacklevel=2) 85 if loop is None: 86 loop = events.get_event_loop() 87 return current_task(loop) 88 89 @classmethod 90 def all_tasks(cls, loop=None): 91 """Return a set of all tasks for an event loop. 92 93 By default all tasks for the current event loop are returned. 94 """ 95 warnings.warn("Task.all_tasks() is deprecated, " 96 "use asyncio.all_tasks() instead", 97 PendingDeprecationWarning, 98 stacklevel=2) 99 return _all_tasks_compat(loop) 100 101 def __init__(self, coro, *, loop=None): 102 super().__init__(loop=loop) 103 if self._source_traceback: 104 del self._source_traceback[-1] 105 if not coroutines.iscoroutine(coro): 106 # raise after Future.__init__(), attrs are required for __del__ 107 # prevent logging for pending task in __del__ 108 self._log_destroy_pending = False 109 raise TypeError(f"a coroutine was expected, got {coro!r}") 110 111 self._must_cancel = False 112 self._fut_waiter = None 113 self._coro = coro 114 self._context = contextvars.copy_context() 115 116 self._loop.call_soon(self.__step, context=self._context) 117 _register_task(self) 118 119 def __del__(self): 120 if self._state == futures._PENDING and self._log_destroy_pending: 121 context = { 122 'task': self, 123 'message': 'Task was destroyed but it is pending!', 124 } 125 if self._source_traceback: 126 context['source_traceback'] = self._source_traceback 127 self._loop.call_exception_handler(context) 128 super().__del__() 129 130 def _repr_info(self): 131 return base_tasks._task_repr_info(self) 132 133 def set_result(self, result): 134 raise RuntimeError('Task does not support set_result operation') 135 136 def set_exception(self, exception): 137 raise RuntimeError('Task does not support set_exception operation') 138 139 def get_stack(self, *, limit=None): 140 """Return the list of stack frames for this task's coroutine. 141 142 If the coroutine is not done, this returns the stack where it is 143 suspended. If the coroutine has completed successfully or was 144 cancelled, this returns an empty list. If the coroutine was 145 terminated by an exception, this returns the list of traceback 146 frames. 147 148 The frames are always ordered from oldest to newest. 149 150 The optional limit gives the maximum number of frames to 151 return; by default all available frames are returned. Its 152 meaning differs depending on whether a stack or a traceback is 153 returned: the newest frames of a stack are returned, but the 154 oldest frames of a traceback are returned. (This matches the 155 behavior of the traceback module.) 156 157 For reasons beyond our control, only one stack frame is 158 returned for a suspended coroutine. 159 """ 160 return base_tasks._task_get_stack(self, limit) 161 162 def print_stack(self, *, limit=None, file=None): 163 """Print the stack or traceback for this task's coroutine. 164 165 This produces output similar to that of the traceback module, 166 for the frames retrieved by get_stack(). The limit argument 167 is passed to get_stack(). The file argument is an I/O stream 168 to which the output is written; by default output is written 169 to sys.stderr. 170 """ 171 return base_tasks._task_print_stack(self, limit, file) 172 173 def cancel(self): 174 """Request that this task cancel itself. 175 176 This arranges for a CancelledError to be thrown into the 177 wrapped coroutine on the next cycle through the event loop. 178 The coroutine then has a chance to clean up or even deny 179 the request using try/except/finally. 180 181 Unlike Future.cancel, this does not guarantee that the 182 task will be cancelled: the exception might be caught and 183 acted upon, delaying cancellation of the task or preventing 184 cancellation completely. The task may also return a value or 185 raise a different exception. 186 187 Immediately after this method is called, Task.cancelled() will 188 not return True (unless the task was already cancelled). A 189 task will be marked as cancelled when the wrapped coroutine 190 terminates with a CancelledError exception (even if cancel() 191 was not called). 192 """ 193 self._log_traceback = False 194 if self.done(): 195 return False 196 if self._fut_waiter is not None: 197 if self._fut_waiter.cancel(): 198 # Leave self._fut_waiter; it may be a Task that 199 # catches and ignores the cancellation so we may have 200 # to cancel it again later. 201 return True 202 # It must be the case that self.__step is already scheduled. 203 self._must_cancel = True 204 return True 205 206 def __step(self, exc=None): 207 if self.done(): 208 raise futures.InvalidStateError( 209 f'_step(): already done: {self!r}, {exc!r}') 210 if self._must_cancel: 211 if not isinstance(exc, futures.CancelledError): 212 exc = futures.CancelledError() 213 self._must_cancel = False 214 coro = self._coro 215 self._fut_waiter = None 216 217 _enter_task(self._loop, self) 218 # Call either coro.throw(exc) or coro.send(None). 219 try: 220 if exc is None: 221 # We use the `send` method directly, because coroutines 222 # don't have `__iter__` and `__next__` methods. 223 result = coro.send(None) 224 else: 225 result = coro.throw(exc) 226 except StopIteration as exc: 227 if self._must_cancel: 228 # Task is cancelled right before coro stops. 229 self._must_cancel = False 230 super().set_exception(futures.CancelledError()) 231 else: 232 super().set_result(exc.value) 233 except futures.CancelledError: 234 super().cancel() # I.e., Future.cancel(self). 235 except Exception as exc: 236 super().set_exception(exc) 237 except BaseException as exc: 238 super().set_exception(exc) 239 raise 240 else: 241 blocking = getattr(result, '_asyncio_future_blocking', None) 242 if blocking is not None: 243 # Yielded Future must come from Future.__iter__(). 244 if futures._get_loop(result) is not self._loop: 245 new_exc = RuntimeError( 246 f'Task {self!r} got Future ' 247 f'{result!r} attached to a different loop') 248 self._loop.call_soon( 249 self.__step, new_exc, context=self._context) 250 elif blocking: 251 if result is self: 252 new_exc = RuntimeError( 253 f'Task cannot await on itself: {self!r}') 254 self._loop.call_soon( 255 self.__step, new_exc, context=self._context) 256 else: 257 result._asyncio_future_blocking = False 258 result.add_done_callback( 259 self.__wakeup, context=self._context) 260 self._fut_waiter = result 261 if self._must_cancel: 262 if self._fut_waiter.cancel(): 263 self._must_cancel = False 264 else: 265 new_exc = RuntimeError( 266 f'yield was used instead of yield from ' 267 f'in task {self!r} with {result!r}') 268 self._loop.call_soon( 269 self.__step, new_exc, context=self._context) 270 271 elif result is None: 272 # Bare yield relinquishes control for one event loop iteration. 273 self._loop.call_soon(self.__step, context=self._context) 274 elif inspect.isgenerator(result): 275 # Yielding a generator is just wrong. 276 new_exc = RuntimeError( 277 f'yield was used instead of yield from for ' 278 f'generator in task {self!r} with {result!r}') 279 self._loop.call_soon( 280 self.__step, new_exc, context=self._context) 281 else: 282 # Yielding something else is an error. 283 new_exc = RuntimeError(f'Task got bad yield: {result!r}') 284 self._loop.call_soon( 285 self.__step, new_exc, context=self._context) 286 finally: 287 _leave_task(self._loop, self) 288 self = None # Needed to break cycles when an exception occurs. 289 290 def __wakeup(self, future): 291 try: 292 future.result() 293 except Exception as exc: 294 # This may also be a cancellation. 295 self.__step(exc) 296 else: 297 # Don't pass the value of `future.result()` explicitly, 298 # as `Future.__iter__` and `Future.__await__` don't need it. 299 # If we call `_step(value, None)` instead of `_step()`, 300 # Python eval loop would use `.send(value)` method call, 301 # instead of `__next__()`, which is slower for futures 302 # that return non-generator iterators from their `__iter__`. 303 self.__step() 304 self = None # Needed to break cycles when an exception occurs. 305 306 307 _PyTask = Task 308 309 310 try: 311 import _asyncio 312 except ImportError: 313 pass 314 else: 315 # _CTask is needed for tests. 316 Task = _CTask = _asyncio.Task 317 318 319 def create_task(coro): 320 """Schedule the execution of a coroutine object in a spawn task. 321 322 Return a Task object. 323 """ 324 loop = events.get_running_loop() 325 return loop.create_task(coro) 326 327 328 # wait() and as_completed() similar to those in PEP 3148. 329 330 FIRST_COMPLETED = concurrent.futures.FIRST_COMPLETED 331 FIRST_EXCEPTION = concurrent.futures.FIRST_EXCEPTION 332 ALL_COMPLETED = concurrent.futures.ALL_COMPLETED 333 334 335 async def wait(fs, *, loop=None, timeout=None, return_when=ALL_COMPLETED): 336 """Wait for the Futures and coroutines given by fs to complete. 337 338 The sequence futures must not be empty. 339 340 Coroutines will be wrapped in Tasks. 341 342 Returns two sets of Future: (done, pending). 343 344 Usage: 345 346 done, pending = await asyncio.wait(fs) 347 348 Note: This does not raise TimeoutError! Futures that aren't done 349 when the timeout occurs are returned in the second set. 350 """ 351 if futures.isfuture(fs) or coroutines.iscoroutine(fs): 352 raise TypeError(f"expect a list of futures, not {type(fs).__name__}") 353 if not fs: 354 raise ValueError('Set of coroutines/Futures is empty.') 355 if return_when not in (FIRST_COMPLETED, FIRST_EXCEPTION, ALL_COMPLETED): 356 raise ValueError(f'Invalid return_when value: {return_when}') 357 358 if loop is None: 359 loop = events.get_event_loop() 360 361 fs = {ensure_future(f, loop=loop) for f in set(fs)} 362 363 return await _wait(fs, timeout, return_when, loop) 364 365 366 def _release_waiter(waiter, *args): 367 if not waiter.done(): 368 waiter.set_result(None) 369 370 371 async def wait_for(fut, timeout, *, loop=None): 372 """Wait for the single Future or coroutine to complete, with timeout. 373 374 Coroutine will be wrapped in Task. 375 376 Returns result of the Future or coroutine. When a timeout occurs, 377 it cancels the task and raises TimeoutError. To avoid the task 378 cancellation, wrap it in shield(). 379 380 If the wait is cancelled, the task is also cancelled. 381 382 This function is a coroutine. 383 """ 384 if loop is None: 385 loop = events.get_event_loop() 386 387 if timeout is None: 388 return await fut 389 390 if timeout <= 0: 391 fut = ensure_future(fut, loop=loop) 392 393 if fut.done(): 394 return fut.result() 395 396 fut.cancel() 397 raise futures.TimeoutError() 398 399 waiter = loop.create_future() 400 timeout_handle = loop.call_later(timeout, _release_waiter, waiter) 401 cb = functools.partial(_release_waiter, waiter) 402 403 fut = ensure_future(fut, loop=loop) 404 fut.add_done_callback(cb) 405 406 try: 407 # wait until the future completes or the timeout 408 try: 409 await waiter 410 except futures.CancelledError: 411 fut.remove_done_callback(cb) 412 fut.cancel() 413 raise 414 415 if fut.done(): 416 return fut.result() 417 else: 418 fut.remove_done_callback(cb) 419 # We must ensure that the task is not running 420 # after wait_for() returns. 421 # See https://bugs.python.org/issue32751 422 await _cancel_and_wait(fut, loop=loop) 423 raise futures.TimeoutError() 424 finally: 425 timeout_handle.cancel() 426 427 428 async def _wait(fs, timeout, return_when, loop): 429 """Internal helper for wait(). 430 431 The fs argument must be a collection of Futures. 432 """ 433 assert fs, 'Set of Futures is empty.' 434 waiter = loop.create_future() 435 timeout_handle = None 436 if timeout is not None: 437 timeout_handle = loop.call_later(timeout, _release_waiter, waiter) 438 counter = len(fs) 439 440 def _on_completion(f): 441 nonlocal counter 442 counter -= 1 443 if (counter <= 0 or 444 return_when == FIRST_COMPLETED or 445 return_when == FIRST_EXCEPTION and (not f.cancelled() and 446 f.exception() is not None)): 447 if timeout_handle is not None: 448 timeout_handle.cancel() 449 if not waiter.done(): 450 waiter.set_result(None) 451 452 for f in fs: 453 f.add_done_callback(_on_completion) 454 455 try: 456 await waiter 457 finally: 458 if timeout_handle is not None: 459 timeout_handle.cancel() 460 461 done, pending = set(), set() 462 for f in fs: 463 f.remove_done_callback(_on_completion) 464 if f.done(): 465 done.add(f) 466 else: 467 pending.add(f) 468 return done, pending 469 470 471 async def _cancel_and_wait(fut, loop): 472 """Cancel the *fut* future or task and wait until it completes.""" 473 474 waiter = loop.create_future() 475 cb = functools.partial(_release_waiter, waiter) 476 fut.add_done_callback(cb) 477 478 try: 479 fut.cancel() 480 # We cannot wait on *fut* directly to make 481 # sure _cancel_and_wait itself is reliably cancellable. 482 await waiter 483 finally: 484 fut.remove_done_callback(cb) 485 486 487 # This is *not* a @coroutine! It is just an iterator (yielding Futures). 488 def as_completed(fs, *, loop=None, timeout=None): 489 """Return an iterator whose values are coroutines. 490 491 When waiting for the yielded coroutines you'll get the results (or 492 exceptions!) of the original Futures (or coroutines), in the order 493 in which and as soon as they complete. 494 495 This differs from PEP 3148; the proper way to use this is: 496 497 for f in as_completed(fs): 498 result = await f # The 'await' may raise. 499 # Use result. 500 501 If a timeout is specified, the 'await' will raise 502 TimeoutError when the timeout occurs before all Futures are done. 503 504 Note: The futures 'f' are not necessarily members of fs. 505 """ 506 if futures.isfuture(fs) or coroutines.iscoroutine(fs): 507 raise TypeError(f"expect a list of futures, not {type(fs).__name__}") 508 loop = loop if loop is not None else events.get_event_loop() 509 todo = {ensure_future(f, loop=loop) for f in set(fs)} 510 from .queues import Queue # Import here to avoid circular import problem. 511 done = Queue(loop=loop) 512 timeout_handle = None 513 514 def _on_timeout(): 515 for f in todo: 516 f.remove_done_callback(_on_completion) 517 done.put_nowait(None) # Queue a dummy value for _wait_for_one(). 518 todo.clear() # Can't do todo.remove(f) in the loop. 519 520 def _on_completion(f): 521 if not todo: 522 return # _on_timeout() was here first. 523 todo.remove(f) 524 done.put_nowait(f) 525 if not todo and timeout_handle is not None: 526 timeout_handle.cancel() 527 528 async def _wait_for_one(): 529 f = await done.get() 530 if f is None: 531 # Dummy value from _on_timeout(). 532 raise futures.TimeoutError 533 return f.result() # May raise f.exception(). 534 535 for f in todo: 536 f.add_done_callback(_on_completion) 537 if todo and timeout is not None: 538 timeout_handle = loop.call_later(timeout, _on_timeout) 539 for _ in range(len(todo)): 540 yield _wait_for_one() 541 542 543 @types.coroutine 544 def __sleep0(): 545 """Skip one event loop run cycle. 546 547 This is a private helper for 'asyncio.sleep()', used 548 when the 'delay' is set to 0. It uses a bare 'yield' 549 expression (which Task.__step knows how to handle) 550 instead of creating a Future object. 551 """ 552 yield 553 554 555 async def sleep(delay, result=None, *, loop=None): 556 """Coroutine that completes after a given time (in seconds).""" 557 if delay <= 0: 558 await __sleep0() 559 return result 560 561 if loop is None: 562 loop = events.get_event_loop() 563 future = loop.create_future() 564 h = loop.call_later(delay, 565 futures._set_result_unless_cancelled, 566 future, result) 567 try: 568 return await future 569 finally: 570 h.cancel() 571 572 573 def ensure_future(coro_or_future, *, loop=None): 574 """Wrap a coroutine or an awaitable in a future. 575 576 If the argument is a Future, it is returned directly. 577 """ 578 if coroutines.iscoroutine(coro_or_future): 579 if loop is None: 580 loop = events.get_event_loop() 581 task = loop.create_task(coro_or_future) 582 if task._source_traceback: 583 del task._source_traceback[-1] 584 return task 585 elif futures.isfuture(coro_or_future): 586 if loop is not None and loop is not futures._get_loop(coro_or_future): 587 raise ValueError('loop argument must agree with Future') 588 return coro_or_future 589 elif inspect.isawaitable(coro_or_future): 590 return ensure_future(_wrap_awaitable(coro_or_future), loop=loop) 591 else: 592 raise TypeError('An asyncio.Future, a coroutine or an awaitable is ' 593 'required') 594 595 596 @coroutine 597 def _wrap_awaitable(awaitable): 598 """Helper for asyncio.ensure_future(). 599 600 Wraps awaitable (an object with __await__) into a coroutine 601 that will later be wrapped in a Task by ensure_future(). 602 """ 603 return (yield from awaitable.__await__()) 604 605 606 class _GatheringFuture(futures.Future): 607 """Helper for gather(). 608 609 This overrides cancel() to cancel all the children and act more 610 like Task.cancel(), which doesn't immediately mark itself as 611 cancelled. 612 """ 613 614 def __init__(self, children, *, loop=None): 615 super().__init__(loop=loop) 616 self._children = children 617 self._cancel_requested = False 618 619 def cancel(self): 620 if self.done(): 621 return False 622 ret = False 623 for child in self._children: 624 if child.cancel(): 625 ret = True 626 if ret: 627 # If any child tasks were actually cancelled, we should 628 # propagate the cancellation request regardless of 629 # *return_exceptions* argument. See issue 32684. 630 self._cancel_requested = True 631 return ret 632 633 634 def gather(*coros_or_futures, loop=None, return_exceptions=False): 635 """Return a future aggregating results from the given coroutines/futures. 636 637 Coroutines will be wrapped in a future and scheduled in the event 638 loop. They will not necessarily be scheduled in the same order as 639 passed in. 640 641 All futures must share the same event loop. If all the tasks are 642 done successfully, the returned future's result is the list of 643 results (in the order of the original sequence, not necessarily 644 the order of results arrival). If *return_exceptions* is True, 645 exceptions in the tasks are treated the same as successful 646 results, and gathered in the result list; otherwise, the first 647 raised exception will be immediately propagated to the returned 648 future. 649 650 Cancellation: if the outer Future is cancelled, all children (that 651 have not completed yet) are also cancelled. If any child is 652 cancelled, this is treated as if it raised CancelledError -- 653 the outer Future is *not* cancelled in this case. (This is to 654 prevent the cancellation of one child to cause other children to 655 be cancelled.) 656 """ 657 if not coros_or_futures: 658 if loop is None: 659 loop = events.get_event_loop() 660 outer = loop.create_future() 661 outer.set_result([]) 662 return outer 663 664 def _done_callback(fut): 665 nonlocal nfinished 666 nfinished += 1 667 668 if outer.done(): 669 if not fut.cancelled(): 670 # Mark exception retrieved. 671 fut.exception() 672 return 673 674 if not return_exceptions: 675 if fut.cancelled(): 676 # Check if 'fut' is cancelled first, as 677 # 'fut.exception()' will *raise* a CancelledError 678 # instead of returning it. 679 exc = futures.CancelledError() 680 outer.set_exception(exc) 681 return 682 else: 683 exc = fut.exception() 684 if exc is not None: 685 outer.set_exception(exc) 686 return 687 688 if nfinished == nfuts: 689 # All futures are done; create a list of results 690 # and set it to the 'outer' future. 691 results = [] 692 693 for fut in children: 694 if fut.cancelled(): 695 # Check if 'fut' is cancelled first, as 696 # 'fut.exception()' will *raise* a CancelledError 697 # instead of returning it. 698 res = futures.CancelledError() 699 else: 700 res = fut.exception() 701 if res is None: 702 res = fut.result() 703 results.append(res) 704 705 if outer._cancel_requested: 706 # If gather is being cancelled we must propagate the 707 # cancellation regardless of *return_exceptions* argument. 708 # See issue 32684. 709 outer.set_exception(futures.CancelledError()) 710 else: 711 outer.set_result(results) 712 713 arg_to_fut = {} 714 children = [] 715 nfuts = 0 716 nfinished = 0 717 for arg in coros_or_futures: 718 if arg not in arg_to_fut: 719 fut = ensure_future(arg, loop=loop) 720 if loop is None: 721 loop = futures._get_loop(fut) 722 if fut is not arg: 723 # 'arg' was not a Future, therefore, 'fut' is a new 724 # Future created specifically for 'arg'. Since the caller 725 # can't control it, disable the "destroy pending task" 726 # warning. 727 fut._log_destroy_pending = False 728 729 nfuts += 1 730 arg_to_fut[arg] = fut 731 fut.add_done_callback(_done_callback) 732 733 else: 734 # There's a duplicate Future object in coros_or_futures. 735 fut = arg_to_fut[arg] 736 737 children.append(fut) 738 739 outer = _GatheringFuture(children, loop=loop) 740 return outer 741 742 743 def shield(arg, *, loop=None): 744 """Wait for a future, shielding it from cancellation. 745 746 The statement 747 748 res = await shield(something()) 749 750 is exactly equivalent to the statement 751 752 res = await something() 753 754 *except* that if the coroutine containing it is cancelled, the 755 task running in something() is not cancelled. From the POV of 756 something(), the cancellation did not happen. But its caller is 757 still cancelled, so the yield-from expression still raises 758 CancelledError. Note: If something() is cancelled by other means 759 this will still cancel shield(). 760 761 If you want to completely ignore cancellation (not recommended) 762 you can combine shield() with a try/except clause, as follows: 763 764 try: 765 res = await shield(something()) 766 except CancelledError: 767 res = None 768 """ 769 inner = ensure_future(arg, loop=loop) 770 if inner.done(): 771 # Shortcut. 772 return inner 773 loop = futures._get_loop(inner) 774 outer = loop.create_future() 775 776 def _done_callback(inner): 777 if outer.cancelled(): 778 if not inner.cancelled(): 779 # Mark inner's result as retrieved. 780 inner.exception() 781 return 782 783 if inner.cancelled(): 784 outer.cancel() 785 else: 786 exc = inner.exception() 787 if exc is not None: 788 outer.set_exception(exc) 789 else: 790 outer.set_result(inner.result()) 791 792 inner.add_done_callback(_done_callback) 793 return outer 794 795 796 def run_coroutine_threadsafe(coro, loop): 797 """Submit a coroutine object to a given event loop. 798 799 Return a concurrent.futures.Future to access the result. 800 """ 801 if not coroutines.iscoroutine(coro): 802 raise TypeError('A coroutine object is required') 803 future = concurrent.futures.Future() 804 805 def callback(): 806 try: 807 futures._chain_future(ensure_future(coro, loop=loop), future) 808 except Exception as exc: 809 if future.set_running_or_notify_cancel(): 810 future.set_exception(exc) 811 raise 812 813 loop.call_soon_threadsafe(callback) 814 return future 815 816 817 # WeakSet containing all alive tasks. 818 _all_tasks = weakref.WeakSet() 819 820 # Dictionary containing tasks that are currently active in 821 # all running event loops. {EventLoop: Task} 822 _current_tasks = {} 823 824 825 def _register_task(task): 826 """Register a new task in asyncio as executed by loop.""" 827 _all_tasks.add(task) 828 829 830 def _enter_task(loop, task): 831 current_task = _current_tasks.get(loop) 832 if current_task is not None: 833 raise RuntimeError(f"Cannot enter into task {task!r} while another " 834 f"task {current_task!r} is being executed.") 835 _current_tasks[loop] = task 836 837 838 def _leave_task(loop, task): 839 current_task = _current_tasks.get(loop) 840 if current_task is not task: 841 raise RuntimeError(f"Leaving task {task!r} does not match " 842 f"the current task {current_task!r}.") 843 del _current_tasks[loop] 844 845 846 def _unregister_task(task): 847 """Unregister a task.""" 848 _all_tasks.discard(task) 849 850 851 _py_register_task = _register_task 852 _py_unregister_task = _unregister_task 853 _py_enter_task = _enter_task 854 _py_leave_task = _leave_task 855 856 857 try: 858 from _asyncio import (_register_task, _unregister_task, 859 _enter_task, _leave_task, 860 _all_tasks, _current_tasks) 861 except ImportError: 862 pass 863 else: 864 _c_register_task = _register_task 865 _c_unregister_task = _unregister_task 866 _c_enter_task = _enter_task 867 _c_leave_task = _leave_task 868