Home | History | Annotate | Download | only in asyncio
      1 """Base implementation of event loop.
      2 
      3 The event loop can be broken up into a multiplexer (the part
      4 responsible for notifying us of I/O events) and the event loop proper,
      5 which wraps a multiplexer with functionality for scheduling callbacks,
      6 immediately or at a given time in the future.
      7 
      8 Whenever a public API takes a callback, subsequent positional
      9 arguments will be passed to the callback if/when it is called.  This
     10 avoids the proliferation of trivial lambdas implementing closures.
     11 Keyword arguments for the callback are not supported; this is a
     12 conscious design decision, leaving the door open for keyword arguments
     13 to modify the meaning of the API call itself.
     14 """
     15 
     16 import collections
     17 import concurrent.futures
     18 import heapq
     19 import inspect
     20 import itertools
     21 import logging
     22 import os
     23 import socket
     24 import subprocess
     25 import threading
     26 import time
     27 import traceback
     28 import sys
     29 import warnings
     30 import weakref
     31 
     32 from . import compat
     33 from . import coroutines
     34 from . import events
     35 from . import futures
     36 from . import tasks
     37 from .coroutines import coroutine
     38 from .log import logger
     39 
     40 
     41 __all__ = ['BaseEventLoop']
     42 
     43 
     44 # Minimum number of _scheduled timer handles before cleanup of
     45 # cancelled handles is performed.
     46 _MIN_SCHEDULED_TIMER_HANDLES = 100
     47 
     48 # Minimum fraction of _scheduled timer handles that are cancelled
     49 # before cleanup of cancelled handles is performed.
     50 _MIN_CANCELLED_TIMER_HANDLES_FRACTION = 0.5
     51 
     52 # Exceptions which must not call the exception handler in fatal error
     53 # methods (_fatal_error())
     54 _FATAL_ERROR_IGNORE = (BrokenPipeError,
     55                        ConnectionResetError, ConnectionAbortedError)
     56 
     57 
     58 def _format_handle(handle):
     59     cb = handle._callback
     60     if isinstance(getattr(cb, '__self__', None), tasks.Task):
     61         # format the task
     62         return repr(cb.__self__)
     63     else:
     64         return str(handle)
     65 
     66 
     67 def _format_pipe(fd):
     68     if fd == subprocess.PIPE:
     69         return '<pipe>'
     70     elif fd == subprocess.STDOUT:
     71         return '<stdout>'
     72     else:
     73         return repr(fd)
     74 
     75 
     76 def _set_reuseport(sock):
     77     if not hasattr(socket, 'SO_REUSEPORT'):
     78         raise ValueError('reuse_port not supported by socket module')
     79     else:
     80         try:
     81             sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
     82         except OSError:
     83             raise ValueError('reuse_port not supported by socket module, '
     84                              'SO_REUSEPORT defined but not implemented.')
     85 
     86 
     87 def _is_stream_socket(sock):
     88     # Linux's socket.type is a bitmask that can include extra info
     89     # about socket, therefore we can't do simple
     90     # `sock_type == socket.SOCK_STREAM`.
     91     return (sock.type & socket.SOCK_STREAM) == socket.SOCK_STREAM
     92 
     93 
     94 def _is_dgram_socket(sock):
     95     # Linux's socket.type is a bitmask that can include extra info
     96     # about socket, therefore we can't do simple
     97     # `sock_type == socket.SOCK_DGRAM`.
     98     return (sock.type & socket.SOCK_DGRAM) == socket.SOCK_DGRAM
     99 
    100 
    101 def _ipaddr_info(host, port, family, type, proto):
    102     # Try to skip getaddrinfo if "host" is already an IP. Users might have
    103     # handled name resolution in their own code and pass in resolved IPs.
    104     if not hasattr(socket, 'inet_pton'):
    105         return
    106 
    107     if proto not in {0, socket.IPPROTO_TCP, socket.IPPROTO_UDP} or \
    108             host is None:
    109         return None
    110 
    111     if type == socket.SOCK_STREAM:
    112         # Linux only:
    113         #    getaddrinfo() can raise when socket.type is a bit mask.
    114         #    So if socket.type is a bit mask of SOCK_STREAM, and say
    115         #    SOCK_NONBLOCK, we simply return None, which will trigger
    116         #    a call to getaddrinfo() letting it process this request.
    117         proto = socket.IPPROTO_TCP
    118     elif type == socket.SOCK_DGRAM:
    119         proto = socket.IPPROTO_UDP
    120     else:
    121         return None
    122 
    123     if port is None:
    124         port = 0
    125     elif isinstance(port, bytes) and port == b'':
    126         port = 0
    127     elif isinstance(port, str) and port == '':
    128         port = 0
    129     else:
    130         # If port's a service name like "http", don't skip getaddrinfo.
    131         try:
    132             port = int(port)
    133         except (TypeError, ValueError):
    134             return None
    135 
    136     if family == socket.AF_UNSPEC:
    137         afs = [socket.AF_INET]
    138         if hasattr(socket, 'AF_INET6'):
    139             afs.append(socket.AF_INET6)
    140     else:
    141         afs = [family]
    142 
    143     if isinstance(host, bytes):
    144         host = host.decode('idna')
    145     if '%' in host:
    146         # Linux's inet_pton doesn't accept an IPv6 zone index after host,
    147         # like '::1%lo0'.
    148         return None
    149 
    150     for af in afs:
    151         try:
    152             socket.inet_pton(af, host)
    153             # The host has already been resolved.
    154             return af, type, proto, '', (host, port)
    155         except OSError:
    156             pass
    157 
    158     # "host" is not an IP address.
    159     return None
    160 
    161 
    162 def _ensure_resolved(address, *, family=0, type=socket.SOCK_STREAM, proto=0,
    163                      flags=0, loop):
    164     host, port = address[:2]
    165     info = _ipaddr_info(host, port, family, type, proto)
    166     if info is not None:
    167         # "host" is already a resolved IP.
    168         fut = loop.create_future()
    169         fut.set_result([info])
    170         return fut
    171     else:
    172         return loop.getaddrinfo(host, port, family=family, type=type,
    173                                 proto=proto, flags=flags)
    174 
    175 
    176 def _run_until_complete_cb(fut):
    177     exc = fut._exception
    178     if (isinstance(exc, BaseException)
    179     and not isinstance(exc, Exception)):
    180         # Issue #22429: run_forever() already finished, no need to
    181         # stop it.
    182         return
    183     fut._loop.stop()
    184 
    185 
    186 class Server(events.AbstractServer):
    187 
    188     def __init__(self, loop, sockets):
    189         self._loop = loop
    190         self.sockets = sockets
    191         self._active_count = 0
    192         self._waiters = []
    193 
    194     def __repr__(self):
    195         return '<%s sockets=%r>' % (self.__class__.__name__, self.sockets)
    196 
    197     def _attach(self):
    198         assert self.sockets is not None
    199         self._active_count += 1
    200 
    201     def _detach(self):
    202         assert self._active_count > 0
    203         self._active_count -= 1
    204         if self._active_count == 0 and self.sockets is None:
    205             self._wakeup()
    206 
    207     def close(self):
    208         sockets = self.sockets
    209         if sockets is None:
    210             return
    211         self.sockets = None
    212         for sock in sockets:
    213             self._loop._stop_serving(sock)
    214         if self._active_count == 0:
    215             self._wakeup()
    216 
    217     def _wakeup(self):
    218         waiters = self._waiters
    219         self._waiters = None
    220         for waiter in waiters:
    221             if not waiter.done():
    222                 waiter.set_result(waiter)
    223 
    224     @coroutine
    225     def wait_closed(self):
    226         if self.sockets is None or self._waiters is None:
    227             return
    228         waiter = self._loop.create_future()
    229         self._waiters.append(waiter)
    230         yield from waiter
    231 
    232 
    233 class BaseEventLoop(events.AbstractEventLoop):
    234 
    235     def __init__(self):
    236         self._timer_cancelled_count = 0
    237         self._closed = False
    238         self._stopping = False
    239         self._ready = collections.deque()
    240         self._scheduled = []
    241         self._default_executor = None
    242         self._internal_fds = 0
    243         # Identifier of the thread running the event loop, or None if the
    244         # event loop is not running
    245         self._thread_id = None
    246         self._clock_resolution = time.get_clock_info('monotonic').resolution
    247         self._exception_handler = None
    248         self.set_debug((not sys.flags.ignore_environment
    249                         and bool(os.environ.get('PYTHONASYNCIODEBUG'))))
    250         # In debug mode, if the execution of a callback or a step of a task
    251         # exceed this duration in seconds, the slow callback/task is logged.
    252         self.slow_callback_duration = 0.1
    253         self._current_handle = None
    254         self._task_factory = None
    255         self._coroutine_wrapper_set = False
    256 
    257         if hasattr(sys, 'get_asyncgen_hooks'):
    258             # Python >= 3.6
    259             # A weak set of all asynchronous generators that are
    260             # being iterated by the loop.
    261             self._asyncgens = weakref.WeakSet()
    262         else:
    263             self._asyncgens = None
    264 
    265         # Set to True when `loop.shutdown_asyncgens` is called.
    266         self._asyncgens_shutdown_called = False
    267 
    268     def __repr__(self):
    269         return ('<%s running=%s closed=%s debug=%s>'
    270                 % (self.__class__.__name__, self.is_running(),
    271                    self.is_closed(), self.get_debug()))
    272 
    273     def create_future(self):
    274         """Create a Future object attached to the loop."""
    275         return futures.Future(loop=self)
    276 
    277     def create_task(self, coro):
    278         """Schedule a coroutine object.
    279 
    280         Return a task object.
    281         """
    282         self._check_closed()
    283         if self._task_factory is None:
    284             task = tasks.Task(coro, loop=self)
    285             if task._source_traceback:
    286                 del task._source_traceback[-1]
    287         else:
    288             task = self._task_factory(self, coro)
    289         return task
    290 
    291     def set_task_factory(self, factory):
    292         """Set a task factory that will be used by loop.create_task().
    293 
    294         If factory is None the default task factory will be set.
    295 
    296         If factory is a callable, it should have a signature matching
    297         '(loop, coro)', where 'loop' will be a reference to the active
    298         event loop, 'coro' will be a coroutine object.  The callable
    299         must return a Future.
    300         """
    301         if factory is not None and not callable(factory):
    302             raise TypeError('task factory must be a callable or None')
    303         self._task_factory = factory
    304 
    305     def get_task_factory(self):
    306         """Return a task factory, or None if the default one is in use."""
    307         return self._task_factory
    308 
    309     def _make_socket_transport(self, sock, protocol, waiter=None, *,
    310                                extra=None, server=None):
    311         """Create socket transport."""
    312         raise NotImplementedError
    313 
    314     def _make_ssl_transport(self, rawsock, protocol, sslcontext, waiter=None,
    315                             *, server_side=False, server_hostname=None,
    316                             extra=None, server=None):
    317         """Create SSL transport."""
    318         raise NotImplementedError
    319 
    320     def _make_datagram_transport(self, sock, protocol,
    321                                  address=None, waiter=None, extra=None):
    322         """Create datagram transport."""
    323         raise NotImplementedError
    324 
    325     def _make_read_pipe_transport(self, pipe, protocol, waiter=None,
    326                                   extra=None):
    327         """Create read pipe transport."""
    328         raise NotImplementedError
    329 
    330     def _make_write_pipe_transport(self, pipe, protocol, waiter=None,
    331                                    extra=None):
    332         """Create write pipe transport."""
    333         raise NotImplementedError
    334 
    335     @coroutine
    336     def _make_subprocess_transport(self, protocol, args, shell,
    337                                    stdin, stdout, stderr, bufsize,
    338                                    extra=None, **kwargs):
    339         """Create subprocess transport."""
    340         raise NotImplementedError
    341 
    342     def _write_to_self(self):
    343         """Write a byte to self-pipe, to wake up the event loop.
    344 
    345         This may be called from a different thread.
    346 
    347         The subclass is responsible for implementing the self-pipe.
    348         """
    349         raise NotImplementedError
    350 
    351     def _process_events(self, event_list):
    352         """Process selector events."""
    353         raise NotImplementedError
    354 
    355     def _check_closed(self):
    356         if self._closed:
    357             raise RuntimeError('Event loop is closed')
    358 
    359     def _asyncgen_finalizer_hook(self, agen):
    360         self._asyncgens.discard(agen)
    361         if not self.is_closed():
    362             self.create_task(agen.aclose())
    363             # Wake up the loop if the finalizer was called from
    364             # a different thread.
    365             self._write_to_self()
    366 
    367     def _asyncgen_firstiter_hook(self, agen):
    368         if self._asyncgens_shutdown_called:
    369             warnings.warn(
    370                 "asynchronous generator {!r} was scheduled after "
    371                 "loop.shutdown_asyncgens() call".format(agen),
    372                 ResourceWarning, source=self)
    373 
    374         self._asyncgens.add(agen)
    375 
    376     @coroutine
    377     def shutdown_asyncgens(self):
    378         """Shutdown all active asynchronous generators."""
    379         self._asyncgens_shutdown_called = True
    380 
    381         if self._asyncgens is None or not len(self._asyncgens):
    382             # If Python version is <3.6 or we don't have any asynchronous
    383             # generators alive.
    384             return
    385 
    386         closing_agens = list(self._asyncgens)
    387         self._asyncgens.clear()
    388 
    389         shutdown_coro = tasks.gather(
    390             *[ag.aclose() for ag in closing_agens],
    391             return_exceptions=True,
    392             loop=self)
    393 
    394         results = yield from shutdown_coro
    395         for result, agen in zip(results, closing_agens):
    396             if isinstance(result, Exception):
    397                 self.call_exception_handler({
    398                     'message': 'an error occurred during closing of '
    399                                'asynchronous generator {!r}'.format(agen),
    400                     'exception': result,
    401                     'asyncgen': agen
    402                 })
    403 
    404     def run_forever(self):
    405         """Run until stop() is called."""
    406         self._check_closed()
    407         if self.is_running():
    408             raise RuntimeError('This event loop is already running')
    409         if events._get_running_loop() is not None:
    410             raise RuntimeError(
    411                 'Cannot run the event loop while another loop is running')
    412         self._set_coroutine_wrapper(self._debug)
    413         self._thread_id = threading.get_ident()
    414         if self._asyncgens is not None:
    415             old_agen_hooks = sys.get_asyncgen_hooks()
    416             sys.set_asyncgen_hooks(firstiter=self._asyncgen_firstiter_hook,
    417                                    finalizer=self._asyncgen_finalizer_hook)
    418         try:
    419             events._set_running_loop(self)
    420             while True:
    421                 self._run_once()
    422                 if self._stopping:
    423                     break
    424         finally:
    425             self._stopping = False
    426             self._thread_id = None
    427             events._set_running_loop(None)
    428             self._set_coroutine_wrapper(False)
    429             if self._asyncgens is not None:
    430                 sys.set_asyncgen_hooks(*old_agen_hooks)
    431 
    432     def run_until_complete(self, future):
    433         """Run until the Future is done.
    434 
    435         If the argument is a coroutine, it is wrapped in a Task.
    436 
    437         WARNING: It would be disastrous to call run_until_complete()
    438         with the same coroutine twice -- it would wrap it in two
    439         different Tasks and that can't be good.
    440 
    441         Return the Future's result, or raise its exception.
    442         """
    443         self._check_closed()
    444 
    445         new_task = not futures.isfuture(future)
    446         future = tasks.ensure_future(future, loop=self)
    447         if new_task:
    448             # An exception is raised if the future didn't complete, so there
    449             # is no need to log the "destroy pending task" message
    450             future._log_destroy_pending = False
    451 
    452         future.add_done_callback(_run_until_complete_cb)
    453         try:
    454             self.run_forever()
    455         except:
    456             if new_task and future.done() and not future.cancelled():
    457                 # The coroutine raised a BaseException. Consume the exception
    458                 # to not log a warning, the caller doesn't have access to the
    459                 # local task.
    460                 future.exception()
    461             raise
    462         future.remove_done_callback(_run_until_complete_cb)
    463         if not future.done():
    464             raise RuntimeError('Event loop stopped before Future completed.')
    465 
    466         return future.result()
    467 
    468     def stop(self):
    469         """Stop running the event loop.
    470 
    471         Every callback already scheduled will still run.  This simply informs
    472         run_forever to stop looping after a complete iteration.
    473         """
    474         self._stopping = True
    475 
    476     def close(self):
    477         """Close the event loop.
    478 
    479         This clears the queues and shuts down the executor,
    480         but does not wait for the executor to finish.
    481 
    482         The event loop must not be running.
    483         """
    484         if self.is_running():
    485             raise RuntimeError("Cannot close a running event loop")
    486         if self._closed:
    487             return
    488         if self._debug:
    489             logger.debug("Close %r", self)
    490         self._closed = True
    491         self._ready.clear()
    492         self._scheduled.clear()
    493         executor = self._default_executor
    494         if executor is not None:
    495             self._default_executor = None
    496             executor.shutdown(wait=False)
    497 
    498     def is_closed(self):
    499         """Returns True if the event loop was closed."""
    500         return self._closed
    501 
    502     # On Python 3.3 and older, objects with a destructor part of a reference
    503     # cycle are never destroyed. It's not more the case on Python 3.4 thanks
    504     # to the PEP 442.
    505     if compat.PY34:
    506         def __del__(self):
    507             if not self.is_closed():
    508                 warnings.warn("unclosed event loop %r" % self, ResourceWarning,
    509                               source=self)
    510                 if not self.is_running():
    511                     self.close()
    512 
    513     def is_running(self):
    514         """Returns True if the event loop is running."""
    515         return (self._thread_id is not None)
    516 
    517     def time(self):
    518         """Return the time according to the event loop's clock.
    519 
    520         This is a float expressed in seconds since an epoch, but the
    521         epoch, precision, accuracy and drift are unspecified and may
    522         differ per event loop.
    523         """
    524         return time.monotonic()
    525 
    526     def call_later(self, delay, callback, *args):
    527         """Arrange for a callback to be called at a given time.
    528 
    529         Return a Handle: an opaque object with a cancel() method that
    530         can be used to cancel the call.
    531 
    532         The delay can be an int or float, expressed in seconds.  It is
    533         always relative to the current time.
    534 
    535         Each callback will be called exactly once.  If two callbacks
    536         are scheduled for exactly the same time, it undefined which
    537         will be called first.
    538 
    539         Any positional arguments after the callback will be passed to
    540         the callback when it is called.
    541         """
    542         timer = self.call_at(self.time() + delay, callback, *args)
    543         if timer._source_traceback:
    544             del timer._source_traceback[-1]
    545         return timer
    546 
    547     def call_at(self, when, callback, *args):
    548         """Like call_later(), but uses an absolute time.
    549 
    550         Absolute time corresponds to the event loop's time() method.
    551         """
    552         self._check_closed()
    553         if self._debug:
    554             self._check_thread()
    555             self._check_callback(callback, 'call_at')
    556         timer = events.TimerHandle(when, callback, args, self)
    557         if timer._source_traceback:
    558             del timer._source_traceback[-1]
    559         heapq.heappush(self._scheduled, timer)
    560         timer._scheduled = True
    561         return timer
    562 
    563     def call_soon(self, callback, *args):
    564         """Arrange for a callback to be called as soon as possible.
    565 
    566         This operates as a FIFO queue: callbacks are called in the
    567         order in which they are registered.  Each callback will be
    568         called exactly once.
    569 
    570         Any positional arguments after the callback will be passed to
    571         the callback when it is called.
    572         """
    573         self._check_closed()
    574         if self._debug:
    575             self._check_thread()
    576             self._check_callback(callback, 'call_soon')
    577         handle = self._call_soon(callback, args)
    578         if handle._source_traceback:
    579             del handle._source_traceback[-1]
    580         return handle
    581 
    582     def _check_callback(self, callback, method):
    583         if (coroutines.iscoroutine(callback) or
    584                 coroutines.iscoroutinefunction(callback)):
    585             raise TypeError(
    586                 "coroutines cannot be used with {}()".format(method))
    587         if not callable(callback):
    588             raise TypeError(
    589                 'a callable object was expected by {}(), got {!r}'.format(
    590                     method, callback))
    591 
    592 
    593     def _call_soon(self, callback, args):
    594         handle = events.Handle(callback, args, self)
    595         if handle._source_traceback:
    596             del handle._source_traceback[-1]
    597         self._ready.append(handle)
    598         return handle
    599 
    600     def _check_thread(self):
    601         """Check that the current thread is the thread running the event loop.
    602 
    603         Non-thread-safe methods of this class make this assumption and will
    604         likely behave incorrectly when the assumption is violated.
    605 
    606         Should only be called when (self._debug == True).  The caller is
    607         responsible for checking this condition for performance reasons.
    608         """
    609         if self._thread_id is None:
    610             return
    611         thread_id = threading.get_ident()
    612         if thread_id != self._thread_id:
    613             raise RuntimeError(
    614                 "Non-thread-safe operation invoked on an event loop other "
    615                 "than the current one")
    616 
    617     def call_soon_threadsafe(self, callback, *args):
    618         """Like call_soon(), but thread-safe."""
    619         self._check_closed()
    620         if self._debug:
    621             self._check_callback(callback, 'call_soon_threadsafe')
    622         handle = self._call_soon(callback, args)
    623         if handle._source_traceback:
    624             del handle._source_traceback[-1]
    625         self._write_to_self()
    626         return handle
    627 
    628     def run_in_executor(self, executor, func, *args):
    629         self._check_closed()
    630         if self._debug:
    631             self._check_callback(func, 'run_in_executor')
    632         if executor is None:
    633             executor = self._default_executor
    634             if executor is None:
    635                 executor = concurrent.futures.ThreadPoolExecutor()
    636                 self._default_executor = executor
    637         return futures.wrap_future(executor.submit(func, *args), loop=self)
    638 
    639     def set_default_executor(self, executor):
    640         self._default_executor = executor
    641 
    642     def _getaddrinfo_debug(self, host, port, family, type, proto, flags):
    643         msg = ["%s:%r" % (host, port)]
    644         if family:
    645             msg.append('family=%r' % family)
    646         if type:
    647             msg.append('type=%r' % type)
    648         if proto:
    649             msg.append('proto=%r' % proto)
    650         if flags:
    651             msg.append('flags=%r' % flags)
    652         msg = ', '.join(msg)
    653         logger.debug('Get address info %s', msg)
    654 
    655         t0 = self.time()
    656         addrinfo = socket.getaddrinfo(host, port, family, type, proto, flags)
    657         dt = self.time() - t0
    658 
    659         msg = ('Getting address info %s took %.3f ms: %r'
    660                % (msg, dt * 1e3, addrinfo))
    661         if dt >= self.slow_callback_duration:
    662             logger.info(msg)
    663         else:
    664             logger.debug(msg)
    665         return addrinfo
    666 
    667     def getaddrinfo(self, host, port, *,
    668                     family=0, type=0, proto=0, flags=0):
    669         if self._debug:
    670             return self.run_in_executor(None, self._getaddrinfo_debug,
    671                                         host, port, family, type, proto, flags)
    672         else:
    673             return self.run_in_executor(None, socket.getaddrinfo,
    674                                         host, port, family, type, proto, flags)
    675 
    676     def getnameinfo(self, sockaddr, flags=0):
    677         return self.run_in_executor(None, socket.getnameinfo, sockaddr, flags)
    678 
    679     @coroutine
    680     def create_connection(self, protocol_factory, host=None, port=None, *,
    681                           ssl=None, family=0, proto=0, flags=0, sock=None,
    682                           local_addr=None, server_hostname=None):
    683         """Connect to a TCP server.
    684 
    685         Create a streaming transport connection to a given Internet host and
    686         port: socket family AF_INET or socket.AF_INET6 depending on host (or
    687         family if specified), socket type SOCK_STREAM. protocol_factory must be
    688         a callable returning a protocol instance.
    689 
    690         This method is a coroutine which will try to establish the connection
    691         in the background.  When successful, the coroutine returns a
    692         (transport, protocol) pair.
    693         """
    694         if server_hostname is not None and not ssl:
    695             raise ValueError('server_hostname is only meaningful with ssl')
    696 
    697         if server_hostname is None and ssl:
    698             # Use host as default for server_hostname.  It is an error
    699             # if host is empty or not set, e.g. when an
    700             # already-connected socket was passed or when only a port
    701             # is given.  To avoid this error, you can pass
    702             # server_hostname='' -- this will bypass the hostname
    703             # check.  (This also means that if host is a numeric
    704             # IP/IPv6 address, we will attempt to verify that exact
    705             # address; this will probably fail, but it is possible to
    706             # create a certificate for a specific IP address, so we
    707             # don't judge it here.)
    708             if not host:
    709                 raise ValueError('You must set server_hostname '
    710                                  'when using ssl without a host')
    711             server_hostname = host
    712 
    713         if host is not None or port is not None:
    714             if sock is not None:
    715                 raise ValueError(
    716                     'host/port and sock can not be specified at the same time')
    717 
    718             f1 = _ensure_resolved((host, port), family=family,
    719                                   type=socket.SOCK_STREAM, proto=proto,
    720                                   flags=flags, loop=self)
    721             fs = [f1]
    722             if local_addr is not None:
    723                 f2 = _ensure_resolved(local_addr, family=family,
    724                                       type=socket.SOCK_STREAM, proto=proto,
    725                                       flags=flags, loop=self)
    726                 fs.append(f2)
    727             else:
    728                 f2 = None
    729 
    730             yield from tasks.wait(fs, loop=self)
    731 
    732             infos = f1.result()
    733             if not infos:
    734                 raise OSError('getaddrinfo() returned empty list')
    735             if f2 is not None:
    736                 laddr_infos = f2.result()
    737                 if not laddr_infos:
    738                     raise OSError('getaddrinfo() returned empty list')
    739 
    740             exceptions = []
    741             for family, type, proto, cname, address in infos:
    742                 try:
    743                     sock = socket.socket(family=family, type=type, proto=proto)
    744                     sock.setblocking(False)
    745                     if f2 is not None:
    746                         for _, _, _, _, laddr in laddr_infos:
    747                             try:
    748                                 sock.bind(laddr)
    749                                 break
    750                             except OSError as exc:
    751                                 exc = OSError(
    752                                     exc.errno, 'error while '
    753                                     'attempting to bind on address '
    754                                     '{!r}: {}'.format(
    755                                         laddr, exc.strerror.lower()))
    756                                 exceptions.append(exc)
    757                         else:
    758                             sock.close()
    759                             sock = None
    760                             continue
    761                     if self._debug:
    762                         logger.debug("connect %r to %r", sock, address)
    763                     yield from self.sock_connect(sock, address)
    764                 except OSError as exc:
    765                     if sock is not None:
    766                         sock.close()
    767                     exceptions.append(exc)
    768                 except:
    769                     if sock is not None:
    770                         sock.close()
    771                     raise
    772                 else:
    773                     break
    774             else:
    775                 if len(exceptions) == 1:
    776                     raise exceptions[0]
    777                 else:
    778                     # If they all have the same str(), raise one.
    779                     model = str(exceptions[0])
    780                     if all(str(exc) == model for exc in exceptions):
    781                         raise exceptions[0]
    782                     # Raise a combined exception so the user can see all
    783                     # the various error messages.
    784                     raise OSError('Multiple exceptions: {}'.format(
    785                         ', '.join(str(exc) for exc in exceptions)))
    786 
    787         else:
    788             if sock is None:
    789                 raise ValueError(
    790                     'host and port was not specified and no sock specified')
    791             if not _is_stream_socket(sock):
    792                 # We allow AF_INET, AF_INET6, AF_UNIX as long as they
    793                 # are SOCK_STREAM.
    794                 # We support passing AF_UNIX sockets even though we have
    795                 # a dedicated API for that: create_unix_connection.
    796                 # Disallowing AF_UNIX in this method, breaks backwards
    797                 # compatibility.
    798                 raise ValueError(
    799                     'A Stream Socket was expected, got {!r}'.format(sock))
    800 
    801         transport, protocol = yield from self._create_connection_transport(
    802             sock, protocol_factory, ssl, server_hostname)
    803         if self._debug:
    804             # Get the socket from the transport because SSL transport closes
    805             # the old socket and creates a new SSL socket
    806             sock = transport.get_extra_info('socket')
    807             logger.debug("%r connected to %s:%r: (%r, %r)",
    808                          sock, host, port, transport, protocol)
    809         return transport, protocol
    810 
    811     @coroutine
    812     def _create_connection_transport(self, sock, protocol_factory, ssl,
    813                                      server_hostname, server_side=False):
    814 
    815         sock.setblocking(False)
    816 
    817         protocol = protocol_factory()
    818         waiter = self.create_future()
    819         if ssl:
    820             sslcontext = None if isinstance(ssl, bool) else ssl
    821             transport = self._make_ssl_transport(
    822                 sock, protocol, sslcontext, waiter,
    823                 server_side=server_side, server_hostname=server_hostname)
    824         else:
    825             transport = self._make_socket_transport(sock, protocol, waiter)
    826 
    827         try:
    828             yield from waiter
    829         except:
    830             transport.close()
    831             raise
    832 
    833         return transport, protocol
    834 
    835     @coroutine
    836     def create_datagram_endpoint(self, protocol_factory,
    837                                  local_addr=None, remote_addr=None, *,
    838                                  family=0, proto=0, flags=0,
    839                                  reuse_address=None, reuse_port=None,
    840                                  allow_broadcast=None, sock=None):
    841         """Create datagram connection."""
    842         if sock is not None:
    843             if not _is_dgram_socket(sock):
    844                 raise ValueError(
    845                     'A UDP Socket was expected, got {!r}'.format(sock))
    846             if (local_addr or remote_addr or
    847                     family or proto or flags or
    848                     reuse_address or reuse_port or allow_broadcast):
    849                 # show the problematic kwargs in exception msg
    850                 opts = dict(local_addr=local_addr, remote_addr=remote_addr,
    851                             family=family, proto=proto, flags=flags,
    852                             reuse_address=reuse_address, reuse_port=reuse_port,
    853                             allow_broadcast=allow_broadcast)
    854                 problems = ', '.join(
    855                     '{}={}'.format(k, v) for k, v in opts.items() if v)
    856                 raise ValueError(
    857                     'socket modifier keyword arguments can not be used '
    858                     'when sock is specified. ({})'.format(problems))
    859             sock.setblocking(False)
    860             r_addr = None
    861         else:
    862             if not (local_addr or remote_addr):
    863                 if family == 0:
    864                     raise ValueError('unexpected address family')
    865                 addr_pairs_info = (((family, proto), (None, None)),)
    866             else:
    867                 # join address by (family, protocol)
    868                 addr_infos = collections.OrderedDict()
    869                 for idx, addr in ((0, local_addr), (1, remote_addr)):
    870                     if addr is not None:
    871                         assert isinstance(addr, tuple) and len(addr) == 2, (
    872                             '2-tuple is expected')
    873 
    874                         infos = yield from _ensure_resolved(
    875                             addr, family=family, type=socket.SOCK_DGRAM,
    876                             proto=proto, flags=flags, loop=self)
    877                         if not infos:
    878                             raise OSError('getaddrinfo() returned empty list')
    879 
    880                         for fam, _, pro, _, address in infos:
    881                             key = (fam, pro)
    882                             if key not in addr_infos:
    883                                 addr_infos[key] = [None, None]
    884                             addr_infos[key][idx] = address
    885 
    886                 # each addr has to have info for each (family, proto) pair
    887                 addr_pairs_info = [
    888                     (key, addr_pair) for key, addr_pair in addr_infos.items()
    889                     if not ((local_addr and addr_pair[0] is None) or
    890                             (remote_addr and addr_pair[1] is None))]
    891 
    892                 if not addr_pairs_info:
    893                     raise ValueError('can not get address information')
    894 
    895             exceptions = []
    896 
    897             if reuse_address is None:
    898                 reuse_address = os.name == 'posix' and sys.platform != 'cygwin'
    899 
    900             for ((family, proto),
    901                  (local_address, remote_address)) in addr_pairs_info:
    902                 sock = None
    903                 r_addr = None
    904                 try:
    905                     sock = socket.socket(
    906                         family=family, type=socket.SOCK_DGRAM, proto=proto)
    907                     if reuse_address:
    908                         sock.setsockopt(
    909                             socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    910                     if reuse_port:
    911                         _set_reuseport(sock)
    912                     if allow_broadcast:
    913                         sock.setsockopt(
    914                             socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
    915                     sock.setblocking(False)
    916 
    917                     if local_addr:
    918                         sock.bind(local_address)
    919                     if remote_addr:
    920                         yield from self.sock_connect(sock, remote_address)
    921                         r_addr = remote_address
    922                 except OSError as exc:
    923                     if sock is not None:
    924                         sock.close()
    925                     exceptions.append(exc)
    926                 except:
    927                     if sock is not None:
    928                         sock.close()
    929                     raise
    930                 else:
    931                     break
    932             else:
    933                 raise exceptions[0]
    934 
    935         protocol = protocol_factory()
    936         waiter = self.create_future()
    937         transport = self._make_datagram_transport(
    938             sock, protocol, r_addr, waiter)
    939         if self._debug:
    940             if local_addr:
    941                 logger.info("Datagram endpoint local_addr=%r remote_addr=%r "
    942                             "created: (%r, %r)",
    943                             local_addr, remote_addr, transport, protocol)
    944             else:
    945                 logger.debug("Datagram endpoint remote_addr=%r created: "
    946                              "(%r, %r)",
    947                              remote_addr, transport, protocol)
    948 
    949         try:
    950             yield from waiter
    951         except:
    952             transport.close()
    953             raise
    954 
    955         return transport, protocol
    956 
    957     @coroutine
    958     def _create_server_getaddrinfo(self, host, port, family, flags):
    959         infos = yield from _ensure_resolved((host, port), family=family,
    960                                             type=socket.SOCK_STREAM,
    961                                             flags=flags, loop=self)
    962         if not infos:
    963             raise OSError('getaddrinfo({!r}) returned empty list'.format(host))
    964         return infos
    965 
    966     @coroutine
    967     def create_server(self, protocol_factory, host=None, port=None,
    968                       *,
    969                       family=socket.AF_UNSPEC,
    970                       flags=socket.AI_PASSIVE,
    971                       sock=None,
    972                       backlog=100,
    973                       ssl=None,
    974                       reuse_address=None,
    975                       reuse_port=None):
    976         """Create a TCP server.
    977 
    978         The host parameter can be a string, in that case the TCP server is bound
    979         to host and port.
    980 
    981         The host parameter can also be a sequence of strings and in that case
    982         the TCP server is bound to all hosts of the sequence. If a host
    983         appears multiple times (possibly indirectly e.g. when hostnames
    984         resolve to the same IP address), the server is only bound once to that
    985         host.
    986 
    987         Return a Server object which can be used to stop the service.
    988 
    989         This method is a coroutine.
    990         """
    991         if isinstance(ssl, bool):
    992             raise TypeError('ssl argument must be an SSLContext or None')
    993         if host is not None or port is not None:
    994             if sock is not None:
    995                 raise ValueError(
    996                     'host/port and sock can not be specified at the same time')
    997 
    998             AF_INET6 = getattr(socket, 'AF_INET6', 0)
    999             if reuse_address is None:
   1000                 reuse_address = os.name == 'posix' and sys.platform != 'cygwin'
   1001             sockets = []
   1002             if host == '':
   1003                 hosts = [None]
   1004             elif (isinstance(host, str) or
   1005                   not isinstance(host, collections.Iterable)):
   1006                 hosts = [host]
   1007             else:
   1008                 hosts = host
   1009 
   1010             fs = [self._create_server_getaddrinfo(host, port, family=family,
   1011                                                   flags=flags)
   1012                   for host in hosts]
   1013             infos = yield from tasks.gather(*fs, loop=self)
   1014             infos = set(itertools.chain.from_iterable(infos))
   1015 
   1016             completed = False
   1017             try:
   1018                 for res in infos:
   1019                     af, socktype, proto, canonname, sa = res
   1020                     try:
   1021                         sock = socket.socket(af, socktype, proto)
   1022                     except socket.error:
   1023                         # Assume it's a bad family/type/protocol combination.
   1024                         if self._debug:
   1025                             logger.warning('create_server() failed to create '
   1026                                            'socket.socket(%r, %r, %r)',
   1027                                            af, socktype, proto, exc_info=True)
   1028                         continue
   1029                     sockets.append(sock)
   1030                     if reuse_address:
   1031                         sock.setsockopt(
   1032                             socket.SOL_SOCKET, socket.SO_REUSEADDR, True)
   1033                     if reuse_port:
   1034                         _set_reuseport(sock)
   1035                     # Disable IPv4/IPv6 dual stack support (enabled by
   1036                     # default on Linux) which makes a single socket
   1037                     # listen on both address families.
   1038                     if af == AF_INET6 and hasattr(socket, 'IPPROTO_IPV6'):
   1039                         sock.setsockopt(socket.IPPROTO_IPV6,
   1040                                         socket.IPV6_V6ONLY,
   1041                                         True)
   1042                     try:
   1043                         sock.bind(sa)
   1044                     except OSError as err:
   1045                         raise OSError(err.errno, 'error while attempting '
   1046                                       'to bind on address %r: %s'
   1047                                       % (sa, err.strerror.lower()))
   1048                 completed = True
   1049             finally:
   1050                 if not completed:
   1051                     for sock in sockets:
   1052                         sock.close()
   1053         else:
   1054             if sock is None:
   1055                 raise ValueError('Neither host/port nor sock were specified')
   1056             if not _is_stream_socket(sock):
   1057                 raise ValueError(
   1058                     'A Stream Socket was expected, got {!r}'.format(sock))
   1059             sockets = [sock]
   1060 
   1061         server = Server(self, sockets)
   1062         for sock in sockets:
   1063             sock.listen(backlog)
   1064             sock.setblocking(False)
   1065             self._start_serving(protocol_factory, sock, ssl, server, backlog)
   1066         if self._debug:
   1067             logger.info("%r is serving", server)
   1068         return server
   1069 
   1070     @coroutine
   1071     def connect_accepted_socket(self, protocol_factory, sock, *, ssl=None):
   1072         """Handle an accepted connection.
   1073 
   1074         This is used by servers that accept connections outside of
   1075         asyncio but that use asyncio to handle connections.
   1076 
   1077         This method is a coroutine.  When completed, the coroutine
   1078         returns a (transport, protocol) pair.
   1079         """
   1080         if not _is_stream_socket(sock):
   1081             raise ValueError(
   1082                 'A Stream Socket was expected, got {!r}'.format(sock))
   1083 
   1084         transport, protocol = yield from self._create_connection_transport(
   1085             sock, protocol_factory, ssl, '', server_side=True)
   1086         if self._debug:
   1087             # Get the socket from the transport because SSL transport closes
   1088             # the old socket and creates a new SSL socket
   1089             sock = transport.get_extra_info('socket')
   1090             logger.debug("%r handled: (%r, %r)", sock, transport, protocol)
   1091         return transport, protocol
   1092 
   1093     @coroutine
   1094     def connect_read_pipe(self, protocol_factory, pipe):
   1095         protocol = protocol_factory()
   1096         waiter = self.create_future()
   1097         transport = self._make_read_pipe_transport(pipe, protocol, waiter)
   1098 
   1099         try:
   1100             yield from waiter
   1101         except:
   1102             transport.close()
   1103             raise
   1104 
   1105         if self._debug:
   1106             logger.debug('Read pipe %r connected: (%r, %r)',
   1107                          pipe.fileno(), transport, protocol)
   1108         return transport, protocol
   1109 
   1110     @coroutine
   1111     def connect_write_pipe(self, protocol_factory, pipe):
   1112         protocol = protocol_factory()
   1113         waiter = self.create_future()
   1114         transport = self._make_write_pipe_transport(pipe, protocol, waiter)
   1115 
   1116         try:
   1117             yield from waiter
   1118         except:
   1119             transport.close()
   1120             raise
   1121 
   1122         if self._debug:
   1123             logger.debug('Write pipe %r connected: (%r, %r)',
   1124                          pipe.fileno(), transport, protocol)
   1125         return transport, protocol
   1126 
   1127     def _log_subprocess(self, msg, stdin, stdout, stderr):
   1128         info = [msg]
   1129         if stdin is not None:
   1130             info.append('stdin=%s' % _format_pipe(stdin))
   1131         if stdout is not None and stderr == subprocess.STDOUT:
   1132             info.append('stdout=stderr=%s' % _format_pipe(stdout))
   1133         else:
   1134             if stdout is not None:
   1135                 info.append('stdout=%s' % _format_pipe(stdout))
   1136             if stderr is not None:
   1137                 info.append('stderr=%s' % _format_pipe(stderr))
   1138         logger.debug(' '.join(info))
   1139 
   1140     @coroutine
   1141     def subprocess_shell(self, protocol_factory, cmd, *, stdin=subprocess.PIPE,
   1142                          stdout=subprocess.PIPE, stderr=subprocess.PIPE,
   1143                          universal_newlines=False, shell=True, bufsize=0,
   1144                          **kwargs):
   1145         if not isinstance(cmd, (bytes, str)):
   1146             raise ValueError("cmd must be a string")
   1147         if universal_newlines:
   1148             raise ValueError("universal_newlines must be False")
   1149         if not shell:
   1150             raise ValueError("shell must be True")
   1151         if bufsize != 0:
   1152             raise ValueError("bufsize must be 0")
   1153         protocol = protocol_factory()
   1154         if self._debug:
   1155             # don't log parameters: they may contain sensitive information
   1156             # (password) and may be too long
   1157             debug_log = 'run shell command %r' % cmd
   1158             self._log_subprocess(debug_log, stdin, stdout, stderr)
   1159         transport = yield from self._make_subprocess_transport(
   1160             protocol, cmd, True, stdin, stdout, stderr, bufsize, **kwargs)
   1161         if self._debug:
   1162             logger.info('%s: %r', debug_log, transport)
   1163         return transport, protocol
   1164 
   1165     @coroutine
   1166     def subprocess_exec(self, protocol_factory, program, *args,
   1167                         stdin=subprocess.PIPE, stdout=subprocess.PIPE,
   1168                         stderr=subprocess.PIPE, universal_newlines=False,
   1169                         shell=False, bufsize=0, **kwargs):
   1170         if universal_newlines:
   1171             raise ValueError("universal_newlines must be False")
   1172         if shell:
   1173             raise ValueError("shell must be False")
   1174         if bufsize != 0:
   1175             raise ValueError("bufsize must be 0")
   1176         popen_args = (program,) + args
   1177         for arg in popen_args:
   1178             if not isinstance(arg, (str, bytes)):
   1179                 raise TypeError("program arguments must be "
   1180                                 "a bytes or text string, not %s"
   1181                                 % type(arg).__name__)
   1182         protocol = protocol_factory()
   1183         if self._debug:
   1184             # don't log parameters: they may contain sensitive information
   1185             # (password) and may be too long
   1186             debug_log = 'execute program %r' % program
   1187             self._log_subprocess(debug_log, stdin, stdout, stderr)
   1188         transport = yield from self._make_subprocess_transport(
   1189             protocol, popen_args, False, stdin, stdout, stderr,
   1190             bufsize, **kwargs)
   1191         if self._debug:
   1192             logger.info('%s: %r', debug_log, transport)
   1193         return transport, protocol
   1194 
   1195     def get_exception_handler(self):
   1196         """Return an exception handler, or None if the default one is in use.
   1197         """
   1198         return self._exception_handler
   1199 
   1200     def set_exception_handler(self, handler):
   1201         """Set handler as the new event loop exception handler.
   1202 
   1203         If handler is None, the default exception handler will
   1204         be set.
   1205 
   1206         If handler is a callable object, it should have a
   1207         signature matching '(loop, context)', where 'loop'
   1208         will be a reference to the active event loop, 'context'
   1209         will be a dict object (see `call_exception_handler()`
   1210         documentation for details about context).
   1211         """
   1212         if handler is not None and not callable(handler):
   1213             raise TypeError('A callable object or None is expected, '
   1214                             'got {!r}'.format(handler))
   1215         self._exception_handler = handler
   1216 
   1217     def default_exception_handler(self, context):
   1218         """Default exception handler.
   1219 
   1220         This is called when an exception occurs and no exception
   1221         handler is set, and can be called by a custom exception
   1222         handler that wants to defer to the default behavior.
   1223 
   1224         The context parameter has the same meaning as in
   1225         `call_exception_handler()`.
   1226         """
   1227         message = context.get('message')
   1228         if not message:
   1229             message = 'Unhandled exception in event loop'
   1230 
   1231         exception = context.get('exception')
   1232         if exception is not None:
   1233             exc_info = (type(exception), exception, exception.__traceback__)
   1234         else:
   1235             exc_info = False
   1236 
   1237         if ('source_traceback' not in context
   1238         and self._current_handle is not None
   1239         and self._current_handle._source_traceback):
   1240             context['handle_traceback'] = self._current_handle._source_traceback
   1241 
   1242         log_lines = [message]
   1243         for key in sorted(context):
   1244             if key in {'message', 'exception'}:
   1245                 continue
   1246             value = context[key]
   1247             if key == 'source_traceback':
   1248                 tb = ''.join(traceback.format_list(value))
   1249                 value = 'Object created at (most recent call last):\n'
   1250                 value += tb.rstrip()
   1251             elif key == 'handle_traceback':
   1252                 tb = ''.join(traceback.format_list(value))
   1253                 value = 'Handle created at (most recent call last):\n'
   1254                 value += tb.rstrip()
   1255             else:
   1256                 value = repr(value)
   1257             log_lines.append('{}: {}'.format(key, value))
   1258 
   1259         logger.error('\n'.join(log_lines), exc_info=exc_info)
   1260 
   1261     def call_exception_handler(self, context):
   1262         """Call the current event loop's exception handler.
   1263 
   1264         The context argument is a dict containing the following keys:
   1265 
   1266         - 'message': Error message;
   1267         - 'exception' (optional): Exception object;
   1268         - 'future' (optional): Future instance;
   1269         - 'handle' (optional): Handle instance;
   1270         - 'protocol' (optional): Protocol instance;
   1271         - 'transport' (optional): Transport instance;
   1272         - 'socket' (optional): Socket instance;
   1273         - 'asyncgen' (optional): Asynchronous generator that caused
   1274                                  the exception.
   1275 
   1276         New keys maybe introduced in the future.
   1277 
   1278         Note: do not overload this method in an event loop subclass.
   1279         For custom exception handling, use the
   1280         `set_exception_handler()` method.
   1281         """
   1282         if self._exception_handler is None:
   1283             try:
   1284                 self.default_exception_handler(context)
   1285             except Exception:
   1286                 # Second protection layer for unexpected errors
   1287                 # in the default implementation, as well as for subclassed
   1288                 # event loops with overloaded "default_exception_handler".
   1289                 logger.error('Exception in default exception handler',
   1290                              exc_info=True)
   1291         else:
   1292             try:
   1293                 self._exception_handler(self, context)
   1294             except Exception as exc:
   1295                 # Exception in the user set custom exception handler.
   1296                 try:
   1297                     # Let's try default handler.
   1298                     self.default_exception_handler({
   1299                         'message': 'Unhandled error in exception handler',
   1300                         'exception': exc,
   1301                         'context': context,
   1302                     })
   1303                 except Exception:
   1304                     # Guard 'default_exception_handler' in case it is
   1305                     # overloaded.
   1306                     logger.error('Exception in default exception handler '
   1307                                  'while handling an unexpected error '
   1308                                  'in custom exception handler',
   1309                                  exc_info=True)
   1310 
   1311     def _add_callback(self, handle):
   1312         """Add a Handle to _scheduled (TimerHandle) or _ready."""
   1313         assert isinstance(handle, events.Handle), 'A Handle is required here'
   1314         if handle._cancelled:
   1315             return
   1316         assert not isinstance(handle, events.TimerHandle)
   1317         self._ready.append(handle)
   1318 
   1319     def _add_callback_signalsafe(self, handle):
   1320         """Like _add_callback() but called from a signal handler."""
   1321         self._add_callback(handle)
   1322         self._write_to_self()
   1323 
   1324     def _timer_handle_cancelled(self, handle):
   1325         """Notification that a TimerHandle has been cancelled."""
   1326         if handle._scheduled:
   1327             self._timer_cancelled_count += 1
   1328 
   1329     def _run_once(self):
   1330         """Run one full iteration of the event loop.
   1331 
   1332         This calls all currently ready callbacks, polls for I/O,
   1333         schedules the resulting callbacks, and finally schedules
   1334         'call_later' callbacks.
   1335         """
   1336 
   1337         sched_count = len(self._scheduled)
   1338         if (sched_count > _MIN_SCHEDULED_TIMER_HANDLES and
   1339             self._timer_cancelled_count / sched_count >
   1340                 _MIN_CANCELLED_TIMER_HANDLES_FRACTION):
   1341             # Remove delayed calls that were cancelled if their number
   1342             # is too high
   1343             new_scheduled = []
   1344             for handle in self._scheduled:
   1345                 if handle._cancelled:
   1346                     handle._scheduled = False
   1347                 else:
   1348                     new_scheduled.append(handle)
   1349 
   1350             heapq.heapify(new_scheduled)
   1351             self._scheduled = new_scheduled
   1352             self._timer_cancelled_count = 0
   1353         else:
   1354             # Remove delayed calls that were cancelled from head of queue.
   1355             while self._scheduled and self._scheduled[0]._cancelled:
   1356                 self._timer_cancelled_count -= 1
   1357                 handle = heapq.heappop(self._scheduled)
   1358                 handle._scheduled = False
   1359 
   1360         timeout = None
   1361         if self._ready or self._stopping:
   1362             timeout = 0
   1363         elif self._scheduled:
   1364             # Compute the desired timeout.
   1365             when = self._scheduled[0]._when
   1366             timeout = max(0, when - self.time())
   1367 
   1368         if self._debug and timeout != 0:
   1369             t0 = self.time()
   1370             event_list = self._selector.select(timeout)
   1371             dt = self.time() - t0
   1372             if dt >= 1.0:
   1373                 level = logging.INFO
   1374             else:
   1375                 level = logging.DEBUG
   1376             nevent = len(event_list)
   1377             if timeout is None:
   1378                 logger.log(level, 'poll took %.3f ms: %s events',
   1379                            dt * 1e3, nevent)
   1380             elif nevent:
   1381                 logger.log(level,
   1382                            'poll %.3f ms took %.3f ms: %s events',
   1383                            timeout * 1e3, dt * 1e3, nevent)
   1384             elif dt >= 1.0:
   1385                 logger.log(level,
   1386                            'poll %.3f ms took %.3f ms: timeout',
   1387                            timeout * 1e3, dt * 1e3)
   1388         else:
   1389             event_list = self._selector.select(timeout)
   1390         self._process_events(event_list)
   1391 
   1392         # Handle 'later' callbacks that are ready.
   1393         end_time = self.time() + self._clock_resolution
   1394         while self._scheduled:
   1395             handle = self._scheduled[0]
   1396             if handle._when >= end_time:
   1397                 break
   1398             handle = heapq.heappop(self._scheduled)
   1399             handle._scheduled = False
   1400             self._ready.append(handle)
   1401 
   1402         # This is the only place where callbacks are actually *called*.
   1403         # All other places just add them to ready.
   1404         # Note: We run all currently scheduled callbacks, but not any
   1405         # callbacks scheduled by callbacks run this time around --
   1406         # they will be run the next time (after another I/O poll).
   1407         # Use an idiom that is thread-safe without using locks.
   1408         ntodo = len(self._ready)
   1409         for i in range(ntodo):
   1410             handle = self._ready.popleft()
   1411             if handle._cancelled:
   1412                 continue
   1413             if self._debug:
   1414                 try:
   1415                     self._current_handle = handle
   1416                     t0 = self.time()
   1417                     handle._run()
   1418                     dt = self.time() - t0
   1419                     if dt >= self.slow_callback_duration:
   1420                         logger.warning('Executing %s took %.3f seconds',
   1421                                        _format_handle(handle), dt)
   1422                 finally:
   1423                     self._current_handle = None
   1424             else:
   1425                 handle._run()
   1426         handle = None  # Needed to break cycles when an exception occurs.
   1427 
   1428     def _set_coroutine_wrapper(self, enabled):
   1429         try:
   1430             set_wrapper = sys.set_coroutine_wrapper
   1431             get_wrapper = sys.get_coroutine_wrapper
   1432         except AttributeError:
   1433             return
   1434 
   1435         enabled = bool(enabled)
   1436         if self._coroutine_wrapper_set == enabled:
   1437             return
   1438 
   1439         wrapper = coroutines.debug_wrapper
   1440         current_wrapper = get_wrapper()
   1441 
   1442         if enabled:
   1443             if current_wrapper not in (None, wrapper):
   1444                 warnings.warn(
   1445                     "loop.set_debug(True): cannot set debug coroutine "
   1446                     "wrapper; another wrapper is already set %r" %
   1447                     current_wrapper, RuntimeWarning)
   1448             else:
   1449                 set_wrapper(wrapper)
   1450                 self._coroutine_wrapper_set = True
   1451         else:
   1452             if current_wrapper not in (None, wrapper):
   1453                 warnings.warn(
   1454                     "loop.set_debug(False): cannot unset debug coroutine "
   1455                     "wrapper; another wrapper was set %r" %
   1456                     current_wrapper, RuntimeWarning)
   1457             else:
   1458                 set_wrapper(None)
   1459                 self._coroutine_wrapper_set = False
   1460 
   1461     def get_debug(self):
   1462         return self._debug
   1463 
   1464     def set_debug(self, enabled):
   1465         self._debug = enabled
   1466 
   1467         if self.is_running():
   1468             self._set_coroutine_wrapper(enabled)
   1469