Home | History | Annotate | Download | only in asyncio
      1 """Event loop and event loop policy."""
      2 
# Names exported by a star-import of this module.  Note that the
# underscore-prefixed running-loop helpers are exported as well.
__all__ = ['AbstractEventLoopPolicy',
           'AbstractEventLoop', 'AbstractServer',
           'Handle', 'TimerHandle',
           'get_event_loop_policy', 'set_event_loop_policy',
           'get_event_loop', 'set_event_loop', 'new_event_loop',
           'get_child_watcher', 'set_child_watcher',
           '_set_running_loop', '_get_running_loop',
           ]
     11 
     12 import functools
     13 import inspect
     14 import os
     15 import reprlib
     16 import socket
     17 import subprocess
     18 import sys
     19 import threading
     20 import traceback
     21 
     22 from asyncio import compat
     23 
     24 
     25 def _get_function_source(func):
     26     if compat.PY34:
     27         func = inspect.unwrap(func)
     28     elif hasattr(func, '__wrapped__'):
     29         func = func.__wrapped__
     30     if inspect.isfunction(func):
     31         code = func.__code__
     32         return (code.co_filename, code.co_firstlineno)
     33     if isinstance(func, functools.partial):
     34         return _get_function_source(func.func)
     35     if compat.PY34 and isinstance(func, functools.partialmethod):
     36         return _get_function_source(func.func)
     37     return None
     38 
     39 
     40 def _format_args_and_kwargs(args, kwargs):
     41     """Format function arguments and keyword arguments.
     42 
     43     Special case for a single parameter: ('hello',) is formatted as ('hello').
     44     """
     45     # use reprlib to limit the length of the output
     46     items = []
     47     if args:
     48         items.extend(reprlib.repr(arg) for arg in args)
     49     if kwargs:
     50         items.extend('{}={}'.format(k, reprlib.repr(v))
     51                      for k, v in kwargs.items())
     52     return '(' + ', '.join(items) + ')'
     53 
     54 
     55 def _format_callback(func, args, kwargs, suffix=''):
     56     if isinstance(func, functools.partial):
     57         suffix = _format_args_and_kwargs(args, kwargs) + suffix
     58         return _format_callback(func.func, func.args, func.keywords, suffix)
     59 
     60     if hasattr(func, '__qualname__'):
     61         func_repr = getattr(func, '__qualname__')
     62     elif hasattr(func, '__name__'):
     63         func_repr = getattr(func, '__name__')
     64     else:
     65         func_repr = repr(func)
     66 
     67     func_repr += _format_args_and_kwargs(args, kwargs)
     68     if suffix:
     69         func_repr += suffix
     70     return func_repr
     71 
     72 def _format_callback_source(func, args):
     73     func_repr = _format_callback(func, args, None)
     74     source = _get_function_source(func)
     75     if source:
     76         func_repr += ' at %s:%s' % source
     77     return func_repr
     78 
     79 
     80 class Handle:
     81     """Object returned by callback registration methods."""
     82 
     83     __slots__ = ('_callback', '_args', '_cancelled', '_loop',
     84                  '_source_traceback', '_repr', '__weakref__')
     85 
     86     def __init__(self, callback, args, loop):
     87         self._loop = loop
     88         self._callback = callback
     89         self._args = args
     90         self._cancelled = False
     91         self._repr = None
     92         if self._loop.get_debug():
     93             self._source_traceback = traceback.extract_stack(sys._getframe(1))
     94         else:
     95             self._source_traceback = None
     96 
     97     def _repr_info(self):
     98         info = [self.__class__.__name__]
     99         if self._cancelled:
    100             info.append('cancelled')
    101         if self._callback is not None:
    102             info.append(_format_callback_source(self._callback, self._args))
    103         if self._source_traceback:
    104             frame = self._source_traceback[-1]
    105             info.append('created at %s:%s' % (frame[0], frame[1]))
    106         return info
    107 
    108     def __repr__(self):
    109         if self._repr is not None:
    110             return self._repr
    111         info = self._repr_info()
    112         return '<%s>' % ' '.join(info)
    113 
    114     def cancel(self):
    115         if not self._cancelled:
    116             self._cancelled = True
    117             if self._loop.get_debug():
    118                 # Keep a representation in debug mode to keep callback and
    119                 # parameters. For example, to log the warning
    120                 # "Executing <Handle...> took 2.5 second"
    121                 self._repr = repr(self)
    122             self._callback = None
    123             self._args = None
    124 
    125     def _run(self):
    126         try:
    127             self._callback(*self._args)
    128         except Exception as exc:
    129             cb = _format_callback_source(self._callback, self._args)
    130             msg = 'Exception in callback {}'.format(cb)
    131             context = {
    132                 'message': msg,
    133                 'exception': exc,
    134                 'handle': self,
    135             }
    136             if self._source_traceback:
    137                 context['source_traceback'] = self._source_traceback
    138             self._loop.call_exception_handler(context)
    139         self = None  # Needed to break cycles when an exception occurs.
    140 
    141 
    142 class TimerHandle(Handle):
    143     """Object returned by timed callback registration methods."""
    144 
    145     __slots__ = ['_scheduled', '_when']
    146 
    147     def __init__(self, when, callback, args, loop):
    148         assert when is not None
    149         super().__init__(callback, args, loop)
    150         if self._source_traceback:
    151             del self._source_traceback[-1]
    152         self._when = when
    153         self._scheduled = False
    154 
    155     def _repr_info(self):
    156         info = super()._repr_info()
    157         pos = 2 if self._cancelled else 1
    158         info.insert(pos, 'when=%s' % self._when)
    159         return info
    160 
    161     def __hash__(self):
    162         return hash(self._when)
    163 
    164     def __lt__(self, other):
    165         return self._when < other._when
    166 
    167     def __le__(self, other):
    168         if self._when < other._when:
    169             return True
    170         return self.__eq__(other)
    171 
    172     def __gt__(self, other):
    173         return self._when > other._when
    174 
    175     def __ge__(self, other):
    176         if self._when > other._when:
    177             return True
    178         return self.__eq__(other)
    179 
    180     def __eq__(self, other):
    181         if isinstance(other, TimerHandle):
    182             return (self._when == other._when and
    183                     self._callback == other._callback and
    184                     self._args == other._args and
    185                     self._cancelled == other._cancelled)
    186         return NotImplemented
    187 
    188     def __ne__(self, other):
    189         equal = self.__eq__(other)
    190         return NotImplemented if equal is NotImplemented else not equal
    191 
    192     def cancel(self):
    193         if not self._cancelled:
    194             self._loop._timer_handle_cancelled(self)
    195         super().cancel()
    196 
    197 
    198 class AbstractServer:
    199     """Abstract server returned by create_server()."""
    200 
    201     def close(self):
    202         """Stop serving.  This leaves existing connections open."""
    203         return NotImplemented
    204 
    205     def wait_closed(self):
    206         """Coroutine to wait until service is closed."""
    207         return NotImplemented
    208 
    209 
class AbstractEventLoop:
    """Abstract event loop.

    Every method here raises NotImplementedError, except call_soon(),
    which delegates to call_later().  Concrete event loops are expected
    to override them.
    """

    # Running and stopping the event loop.

    def run_forever(self):
        """Run the event loop until stop() is called."""
        raise NotImplementedError

    def run_until_complete(self, future):
        """Run the event loop until a Future is done.

        Return the Future's result, or raise its exception.
        """
        raise NotImplementedError

    def stop(self):
        """Stop the event loop as soon as reasonable.

        Exactly how soon that is may depend on the implementation, but
        no more I/O callbacks should be scheduled.
        """
        raise NotImplementedError

    def is_running(self):
        """Return whether the event loop is currently running."""
        raise NotImplementedError

    def is_closed(self):
        """Return True if the event loop was closed."""
        raise NotImplementedError

    def close(self):
        """Close the loop.

        The loop should not be running.

        This is idempotent and irreversible.

        No other methods should be called after this one.
        """
        raise NotImplementedError

    def shutdown_asyncgens(self):
        """Shutdown all active asynchronous generators."""
        raise NotImplementedError

    # Methods scheduling callbacks.  All these return Handles.

    def _timer_handle_cancelled(self, handle):
        """Notification that a TimerHandle has been cancelled."""
        raise NotImplementedError

    def call_soon(self, callback, *args):
        """Default implementation: call_later() with a delay of 0."""
        return self.call_later(0, callback, *args)

    def call_later(self, delay, callback, *args):
        raise NotImplementedError

    def call_at(self, when, callback, *args):
        raise NotImplementedError

    def time(self):
        raise NotImplementedError

    def create_future(self):
        raise NotImplementedError

    # Method scheduling a coroutine object: create a task.

    def create_task(self, coro):
        raise NotImplementedError

    # Methods for interacting with threads.

    def call_soon_threadsafe(self, callback, *args):
        raise NotImplementedError

    def run_in_executor(self, executor, func, *args):
        raise NotImplementedError

    def set_default_executor(self, executor):
        raise NotImplementedError

    # Network I/O methods returning Futures.

    def getaddrinfo(self, host, port, *, family=0, type=0, proto=0, flags=0):
        raise NotImplementedError

    def getnameinfo(self, sockaddr, flags=0):
        raise NotImplementedError

    def create_connection(self, protocol_factory, host=None, port=None, *,
                          ssl=None, family=0, proto=0, flags=0, sock=None,
                          local_addr=None, server_hostname=None):
        raise NotImplementedError

    def create_server(self, protocol_factory, host=None, port=None, *,
                      family=socket.AF_UNSPEC, flags=socket.AI_PASSIVE,
                      sock=None, backlog=100, ssl=None, reuse_address=None,
                      reuse_port=None):
        """A coroutine which creates a TCP server bound to host and port.

        The return value is a Server object which can be used to stop
        the service.

        If host is an empty string or None all interfaces are assumed
        and a list of multiple sockets will be returned (most likely
        one for IPv4 and another one for IPv6). The host parameter can
        also be a sequence (e.g. list) of hosts to bind to.

        family can be set to either AF_INET or AF_INET6 to force the
        socket to use IPv4 or IPv6. If not set it will be determined
        from host (defaults to AF_UNSPEC).

        flags is a bitmask for getaddrinfo().

        sock can optionally be specified in order to use a preexisting
        socket object.

        backlog is the maximum number of queued connections passed to
        listen() (defaults to 100).

        ssl can be set to an SSLContext to enable SSL over the
        accepted connections.

        reuse_address tells the kernel to reuse a local socket in
        TIME_WAIT state, without waiting for its natural timeout to
        expire. If not specified it will automatically be set to True
        on UNIX.

        reuse_port tells the kernel to allow this endpoint to be bound to
        the same port as other existing endpoints are bound to, so long as
        they all set this flag when being created. This option is not
        supported on Windows.
        """
        raise NotImplementedError

    def create_unix_connection(self, protocol_factory, path, *,
                               ssl=None, sock=None,
                               server_hostname=None):
        raise NotImplementedError

    def create_unix_server(self, protocol_factory, path, *,
                           sock=None, backlog=100, ssl=None):
        """A coroutine which creates a UNIX Domain Socket server.

        The return value is a Server object, which can be used to stop
        the service.

        path is a str, representing a file system path to bind the
        server socket to.

        sock can optionally be specified in order to use a preexisting
        socket object.

        backlog is the maximum number of queued connections passed to
        listen() (defaults to 100).

        ssl can be set to an SSLContext to enable SSL over the
        accepted connections.
        """
        raise NotImplementedError

    def create_datagram_endpoint(self, protocol_factory,
                                 local_addr=None, remote_addr=None, *,
                                 family=0, proto=0, flags=0,
                                 reuse_address=None, reuse_port=None,
                                 allow_broadcast=None, sock=None):
        """A coroutine which creates a datagram endpoint.

        This method will try to establish the endpoint in the background.
        When successful, the coroutine returns a (transport, protocol) pair.

        protocol_factory must be a callable returning a protocol instance.

        The socket family is AF_INET or socket.AF_INET6 depending on host
        (or family if specified); the socket type is SOCK_DGRAM.

        reuse_address tells the kernel to reuse a local socket in
        TIME_WAIT state, without waiting for its natural timeout to
        expire. If not specified it will automatically be set to True on
        UNIX.

        reuse_port tells the kernel to allow this endpoint to be bound to
        the same port as other existing endpoints are bound to, so long as
        they all set this flag when being created. This option is not
        supported on Windows and some Unixes. If the
        :py:data:`~socket.SO_REUSEPORT` constant is not defined then this
        capability is unsupported.

        allow_broadcast tells the kernel to allow this endpoint to send
        messages to the broadcast address.

        sock can optionally be specified in order to use a preexisting
        socket object.
        """
        raise NotImplementedError

    # Pipes and subprocesses.

    def connect_read_pipe(self, protocol_factory, pipe):
        """Register read pipe in event loop. Set the pipe to non-blocking mode.

        protocol_factory should instantiate an object with the Protocol
        interface.
        pipe is a file-like object.
        Return a pair (transport, protocol), where transport supports the
        ReadTransport interface."""
        # A file-like object is accepted instead of a bare file descriptor
        # because the transport needs to own the pipe and close it when
        # the transport finishes.  Passing f.fileno() can lead to
        # complicated errors: the pipe transport closes the fd and then f
        # is closed, or vice versa.
        raise NotImplementedError

    def connect_write_pipe(self, protocol_factory, pipe):
        """Register write pipe in event loop.

        protocol_factory should instantiate an object with the
        BaseProtocol interface.
        pipe is a file-like object already switched to nonblocking.
        Return a pair (transport, protocol), where transport supports the
        WriteTransport interface."""
        # A file-like object is accepted instead of a bare file descriptor
        # because the transport needs to own the pipe and close it when
        # the transport finishes.  Passing f.fileno() can lead to
        # complicated errors: the pipe transport closes the fd and then f
        # is closed, or vice versa.
        raise NotImplementedError

    def subprocess_shell(self, protocol_factory, cmd, *, stdin=subprocess.PIPE,
                         stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                         **kwargs):
        raise NotImplementedError

    def subprocess_exec(self, protocol_factory, *args, stdin=subprocess.PIPE,
                        stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                        **kwargs):
        raise NotImplementedError

    # Ready-based callback registration methods.
    # The add_*() methods return None.
    # The remove_*() methods return True if something was removed,
    # False if there was nothing to delete.

    def add_reader(self, fd, callback, *args):
        raise NotImplementedError

    def remove_reader(self, fd):
        raise NotImplementedError

    def add_writer(self, fd, callback, *args):
        raise NotImplementedError

    def remove_writer(self, fd):
        raise NotImplementedError

    # Completion based I/O methods returning Futures.

    def sock_recv(self, sock, nbytes):
        raise NotImplementedError

    def sock_sendall(self, sock, data):
        raise NotImplementedError

    def sock_connect(self, sock, address):
        raise NotImplementedError

    def sock_accept(self, sock):
        raise NotImplementedError

    # Signal handling.

    def add_signal_handler(self, sig, callback, *args):
        raise NotImplementedError

    def remove_signal_handler(self, sig):
        raise NotImplementedError

    # Task factory.

    def set_task_factory(self, factory):
        raise NotImplementedError

    def get_task_factory(self):
        raise NotImplementedError

    # Error handlers.

    def get_exception_handler(self):
        raise NotImplementedError

    def set_exception_handler(self, handler):
        raise NotImplementedError

    def default_exception_handler(self, context):
        raise NotImplementedError

    def call_exception_handler(self, context):
        raise NotImplementedError

    # Debug flag management.

    def get_debug(self):
        raise NotImplementedError

    def set_debug(self, enabled):
        raise NotImplementedError
    515 
    516 
class AbstractEventLoopPolicy:
    """Abstract policy for accessing the event loop.

    Every method here raises NotImplementedError; concrete policies are
    expected to override them.
    """

    def get_event_loop(self):
        """Get the event loop for the current context.

        Returns an event loop object implementing the BaseEventLoop interface,
        or raises an exception in case no event loop has been set for the
        current context and the current policy does not specify to create one.

        It should never return None."""
        raise NotImplementedError

    def set_event_loop(self, loop):
        """Set the event loop for the current context to loop."""
        raise NotImplementedError

    def new_event_loop(self):
        """Create and return a new event loop object according to this
        policy's rules. If there's need to set this loop as the event loop for
        the current context, set_event_loop must be called explicitly."""
        raise NotImplementedError

    # Child processes handling (Unix only).

    def get_child_watcher(self):
        """Get the watcher for child processes."""
        raise NotImplementedError

    def set_child_watcher(self, watcher):
        """Set the watcher for child processes."""
        raise NotImplementedError
    549 
    550 
    551 class BaseDefaultEventLoopPolicy(AbstractEventLoopPolicy):
    552     """Default policy implementation for accessing the event loop.
    553 
    554     In this policy, each thread has its own event loop.  However, we
    555     only automatically create an event loop by default for the main
    556     thread; other threads by default have no event loop.
    557 
    558     Other policies may have different rules (e.g. a single global
    559     event loop, or automatically creating an event loop per thread, or
    560     using some other notion of context to which an event loop is
    561     associated).
    562     """
    563 
    564     _loop_factory = None
    565 
    566     class _Local(threading.local):
    567         _loop = None
    568         _set_called = False
    569 
    570     def __init__(self):
    571         self._local = self._Local()
    572 
    573     def get_event_loop(self):
    574         """Get the event loop.
    575 
    576         This may be None or an instance of EventLoop.
    577         """
    578         if (self._local._loop is None and
    579             not self._local._set_called and
    580             isinstance(threading.current_thread(), threading._MainThread)):
    581             self.set_event_loop(self.new_event_loop())
    582         if self._local._loop is None:
    583             raise RuntimeError('There is no current event loop in thread %r.'
    584                                % threading.current_thread().name)
    585         return self._local._loop
    586 
    587     def set_event_loop(self, loop):
    588         """Set the event loop."""
    589         self._local._set_called = True
    590         assert loop is None or isinstance(loop, AbstractEventLoop)
    591         self._local._loop = loop
    592 
    593     def new_event_loop(self):
    594         """Create a new event loop.
    595 
    596         You must call set_event_loop() to make this the current event
    597         loop.
    598         """
    599         return self._loop_factory()
    600 
    601 
# Event loop policy.  The policy itself is always global, even if the
# policy's rules say that there is an event loop per thread (or other
# notion of context).  The default policy is installed by the first
# call to get_event_loop_policy().  set_event_loop_policy(None) resets
# this to None.
_event_loop_policy = None

# Lock for protecting the on-the-fly creation of the event loop policy;
# held by _init_event_loop_policy() while it checks and assigns the global.
_lock = threading.Lock()
    610 
    611 
# Thread-local storage for the running event loop, used by
# _get_running_loop() and _set_running_loop().
class _RunningLoop(threading.local):
    # Loop recorded by _set_running_loop() for the current thread.
    _loop = None
    # Process id recorded together with the loop; _get_running_loop()
    # compares it with os.getpid().
    _pid = None


# The single module-level instance; each thread sees its own attributes.
_running_loop = _RunningLoop()
    619 
    620 
    621 def _get_running_loop():
    622     """Return the running event loop or None.
    623 
    624     This is a low-level function intended to be used by event loops.
    625     This function is thread-specific.
    626     """
    627     running_loop = _running_loop._loop
    628     if running_loop is not None and _running_loop._pid == os.getpid():
    629         return running_loop
    630 
    631 
    632 def _set_running_loop(loop):
    633     """Set the running event loop.
    634 
    635     This is a low-level function intended to be used by event loops.
    636     This function is thread-specific.
    637     """
    638     _running_loop._pid = os.getpid()
    639     _running_loop._loop = loop
    640 
    641 
def _init_event_loop_policy():
    """Install the default event loop policy if none is set.

    The global is checked and assigned while holding ``_lock``.
    """
    global _event_loop_policy
    with _lock:
        # get_event_loop_policy() tests the same condition without the
        # lock, so it is tested again here before assigning.
        if _event_loop_policy is None:  # pragma: no branch
            # NOTE(review): imported here rather than at module top,
            # presumably to avoid a circular import with the package;
            # confirm before moving it.
            from . import DefaultEventLoopPolicy
            _event_loop_policy = DefaultEventLoopPolicy()
    648 
    649 
    650 def get_event_loop_policy():
    651     """Get the current event loop policy."""
    652     if _event_loop_policy is None:
    653         _init_event_loop_policy()
    654     return _event_loop_policy
    655 
    656 
    657 def set_event_loop_policy(policy):
    658     """Set the current event loop policy.
    659 
    660     If policy is None, the default policy is restored."""
    661     global _event_loop_policy
    662     assert policy is None or isinstance(policy, AbstractEventLoopPolicy)
    663     _event_loop_policy = policy
    664 
    665 
    666 def get_event_loop():
    667     """Return an asyncio event loop.
    668 
    669     When called from a coroutine or a callback (e.g. scheduled with call_soon
    670     or similar API), this function will always return the running event loop.
    671 
    672     If there is no running event loop set, the function will return
    673     the result of `get_event_loop_policy().get_event_loop()` call.
    674     """
    675     current_loop = _get_running_loop()
    676     if current_loop is not None:
    677         return current_loop
    678     return get_event_loop_policy().get_event_loop()
    679 
    680 
    681 def set_event_loop(loop):
    682     """Equivalent to calling get_event_loop_policy().set_event_loop(loop)."""
    683     get_event_loop_policy().set_event_loop(loop)
    684 
    685 
    686 def new_event_loop():
    687     """Equivalent to calling get_event_loop_policy().new_event_loop()."""
    688     return get_event_loop_policy().new_event_loop()
    689 
    690 
    691 def get_child_watcher():
    692     """Equivalent to calling get_event_loop_policy().get_child_watcher()."""
    693     return get_event_loop_policy().get_child_watcher()
    694 
    695 
    696 def set_child_watcher(watcher):
    697     """Equivalent to calling
    698     get_event_loop_policy().set_child_watcher(watcher)."""
    699     return get_event_loop_policy().set_child_watcher(watcher)
    700