1 """Base implementation of event loop. 2 3 The event loop can be broken up into a multiplexer (the part 4 responsible for notifying us of I/O events) and the event loop proper, 5 which wraps a multiplexer with functionality for scheduling callbacks, 6 immediately or at a given time in the future. 7 8 Whenever a public API takes a callback, subsequent positional 9 arguments will be passed to the callback if/when it is called. This 10 avoids the proliferation of trivial lambdas implementing closures. 11 Keyword arguments for the callback are not supported; this is a 12 conscious design decision, leaving the door open for keyword arguments 13 to modify the meaning of the API call itself. 14 """ 15 16 import collections 17 import concurrent.futures 18 import heapq 19 import inspect 20 import itertools 21 import logging 22 import os 23 import socket 24 import subprocess 25 import threading 26 import time 27 import traceback 28 import sys 29 import warnings 30 import weakref 31 32 from . import compat 33 from . import coroutines 34 from . import events 35 from . import futures 36 from . import tasks 37 from .coroutines import coroutine 38 from .log import logger 39 40 41 __all__ = ['BaseEventLoop'] 42 43 44 # Minimum number of _scheduled timer handles before cleanup of 45 # cancelled handles is performed. 46 _MIN_SCHEDULED_TIMER_HANDLES = 100 47 48 # Minimum fraction of _scheduled timer handles that are cancelled 49 # before cleanup of cancelled handles is performed. 50 _MIN_CANCELLED_TIMER_HANDLES_FRACTION = 0.5 51 52 # Exceptions which must not call the exception handler in fatal error 53 # methods (_fatal_error()) 54 _FATAL_ERROR_IGNORE = (BrokenPipeError, 55 ConnectionResetError, ConnectionAbortedError) 56 57 58 def _format_handle(handle): 59 cb = handle._callback 60 if isinstance(getattr(cb, '__self__', None), tasks.Task): 61 # format the task 62 return repr(cb.__self__) 63 else: 64 return str(handle) 65 66 67 def _format_pipe(fd): 68 if fd == subprocess.PIPE: 69 return '<pipe>' 70 elif fd == subprocess.STDOUT: 71 return '<stdout>' 72 else: 73 return repr(fd) 74 75 76 def _set_reuseport(sock): 77 if not hasattr(socket, 'SO_REUSEPORT'): 78 raise ValueError('reuse_port not supported by socket module') 79 else: 80 try: 81 sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1) 82 except OSError: 83 raise ValueError('reuse_port not supported by socket module, ' 84 'SO_REUSEPORT defined but not implemented.') 85 86 87 def _is_stream_socket(sock): 88 # Linux's socket.type is a bitmask that can include extra info 89 # about socket, therefore we can't do simple 90 # `sock_type == socket.SOCK_STREAM`. 91 return (sock.type & socket.SOCK_STREAM) == socket.SOCK_STREAM 92 93 94 def _is_dgram_socket(sock): 95 # Linux's socket.type is a bitmask that can include extra info 96 # about socket, therefore we can't do simple 97 # `sock_type == socket.SOCK_DGRAM`. 98 return (sock.type & socket.SOCK_DGRAM) == socket.SOCK_DGRAM 99 100 101 def _ipaddr_info(host, port, family, type, proto): 102 # Try to skip getaddrinfo if "host" is already an IP. Users might have 103 # handled name resolution in their own code and pass in resolved IPs. 104 if not hasattr(socket, 'inet_pton'): 105 return 106 107 if proto not in {0, socket.IPPROTO_TCP, socket.IPPROTO_UDP} or \ 108 host is None: 109 return None 110 111 if type == socket.SOCK_STREAM: 112 # Linux only: 113 # getaddrinfo() can raise when socket.type is a bit mask. 114 # So if socket.type is a bit mask of SOCK_STREAM, and say 115 # SOCK_NONBLOCK, we simply return None, which will trigger 116 # a call to getaddrinfo() letting it process this request. 117 proto = socket.IPPROTO_TCP 118 elif type == socket.SOCK_DGRAM: 119 proto = socket.IPPROTO_UDP 120 else: 121 return None 122 123 if port is None: 124 port = 0 125 elif isinstance(port, bytes) and port == b'': 126 port = 0 127 elif isinstance(port, str) and port == '': 128 port = 0 129 else: 130 # If port's a service name like "http", don't skip getaddrinfo. 131 try: 132 port = int(port) 133 except (TypeError, ValueError): 134 return None 135 136 if family == socket.AF_UNSPEC: 137 afs = [socket.AF_INET] 138 if hasattr(socket, 'AF_INET6'): 139 afs.append(socket.AF_INET6) 140 else: 141 afs = [family] 142 143 if isinstance(host, bytes): 144 host = host.decode('idna') 145 if '%' in host: 146 # Linux's inet_pton doesn't accept an IPv6 zone index after host, 147 # like '::1%lo0'. 148 return None 149 150 for af in afs: 151 try: 152 socket.inet_pton(af, host) 153 # The host has already been resolved. 154 return af, type, proto, '', (host, port) 155 except OSError: 156 pass 157 158 # "host" is not an IP address. 159 return None 160 161 162 def _ensure_resolved(address, *, family=0, type=socket.SOCK_STREAM, proto=0, 163 flags=0, loop): 164 host, port = address[:2] 165 info = _ipaddr_info(host, port, family, type, proto) 166 if info is not None: 167 # "host" is already a resolved IP. 168 fut = loop.create_future() 169 fut.set_result([info]) 170 return fut 171 else: 172 return loop.getaddrinfo(host, port, family=family, type=type, 173 proto=proto, flags=flags) 174 175 176 def _run_until_complete_cb(fut): 177 exc = fut._exception 178 if (isinstance(exc, BaseException) 179 and not isinstance(exc, Exception)): 180 # Issue #22429: run_forever() already finished, no need to 181 # stop it. 182 return 183 fut._loop.stop() 184 185 186 class Server(events.AbstractServer): 187 188 def __init__(self, loop, sockets): 189 self._loop = loop 190 self.sockets = sockets 191 self._active_count = 0 192 self._waiters = [] 193 194 def __repr__(self): 195 return '<%s sockets=%r>' % (self.__class__.__name__, self.sockets) 196 197 def _attach(self): 198 assert self.sockets is not None 199 self._active_count += 1 200 201 def _detach(self): 202 assert self._active_count > 0 203 self._active_count -= 1 204 if self._active_count == 0 and self.sockets is None: 205 self._wakeup() 206 207 def close(self): 208 sockets = self.sockets 209 if sockets is None: 210 return 211 self.sockets = None 212 for sock in sockets: 213 self._loop._stop_serving(sock) 214 if self._active_count == 0: 215 self._wakeup() 216 217 def _wakeup(self): 218 waiters = self._waiters 219 self._waiters = None 220 for waiter in waiters: 221 if not waiter.done(): 222 waiter.set_result(waiter) 223 224 @coroutine 225 def wait_closed(self): 226 if self.sockets is None or self._waiters is None: 227 return 228 waiter = self._loop.create_future() 229 self._waiters.append(waiter) 230 yield from waiter 231 232 233 class BaseEventLoop(events.AbstractEventLoop): 234 235 def __init__(self): 236 self._timer_cancelled_count = 0 237 self._closed = False 238 self._stopping = False 239 self._ready = collections.deque() 240 self._scheduled = [] 241 self._default_executor = None 242 self._internal_fds = 0 243 # Identifier of the thread running the event loop, or None if the 244 # event loop is not running 245 self._thread_id = None 246 self._clock_resolution = time.get_clock_info('monotonic').resolution 247 self._exception_handler = None 248 self.set_debug((not sys.flags.ignore_environment 249 and bool(os.environ.get('PYTHONASYNCIODEBUG')))) 250 # In debug mode, if the execution of a callback or a step of a task 251 # exceed this duration in seconds, the slow callback/task is logged. 252 self.slow_callback_duration = 0.1 253 self._current_handle = None 254 self._task_factory = None 255 self._coroutine_wrapper_set = False 256 257 if hasattr(sys, 'get_asyncgen_hooks'): 258 # Python >= 3.6 259 # A weak set of all asynchronous generators that are 260 # being iterated by the loop. 261 self._asyncgens = weakref.WeakSet() 262 else: 263 self._asyncgens = None 264 265 # Set to True when `loop.shutdown_asyncgens` is called. 266 self._asyncgens_shutdown_called = False 267 268 def __repr__(self): 269 return ('<%s running=%s closed=%s debug=%s>' 270 % (self.__class__.__name__, self.is_running(), 271 self.is_closed(), self.get_debug())) 272 273 def create_future(self): 274 """Create a Future object attached to the loop.""" 275 return futures.Future(loop=self) 276 277 def create_task(self, coro): 278 """Schedule a coroutine object. 279 280 Return a task object. 281 """ 282 self._check_closed() 283 if self._task_factory is None: 284 task = tasks.Task(coro, loop=self) 285 if task._source_traceback: 286 del task._source_traceback[-1] 287 else: 288 task = self._task_factory(self, coro) 289 return task 290 291 def set_task_factory(self, factory): 292 """Set a task factory that will be used by loop.create_task(). 293 294 If factory is None the default task factory will be set. 295 296 If factory is a callable, it should have a signature matching 297 '(loop, coro)', where 'loop' will be a reference to the active 298 event loop, 'coro' will be a coroutine object. The callable 299 must return a Future. 300 """ 301 if factory is not None and not callable(factory): 302 raise TypeError('task factory must be a callable or None') 303 self._task_factory = factory 304 305 def get_task_factory(self): 306 """Return a task factory, or None if the default one is in use.""" 307 return self._task_factory 308 309 def _make_socket_transport(self, sock, protocol, waiter=None, *, 310 extra=None, server=None): 311 """Create socket transport.""" 312 raise NotImplementedError 313 314 def _make_ssl_transport(self, rawsock, protocol, sslcontext, waiter=None, 315 *, server_side=False, server_hostname=None, 316 extra=None, server=None): 317 """Create SSL transport.""" 318 raise NotImplementedError 319 320 def _make_datagram_transport(self, sock, protocol, 321 address=None, waiter=None, extra=None): 322 """Create datagram transport.""" 323 raise NotImplementedError 324 325 def _make_read_pipe_transport(self, pipe, protocol, waiter=None, 326 extra=None): 327 """Create read pipe transport.""" 328 raise NotImplementedError 329 330 def _make_write_pipe_transport(self, pipe, protocol, waiter=None, 331 extra=None): 332 """Create write pipe transport.""" 333 raise NotImplementedError 334 335 @coroutine 336 def _make_subprocess_transport(self, protocol, args, shell, 337 stdin, stdout, stderr, bufsize, 338 extra=None, **kwargs): 339 """Create subprocess transport.""" 340 raise NotImplementedError 341 342 def _write_to_self(self): 343 """Write a byte to self-pipe, to wake up the event loop. 344 345 This may be called from a different thread. 346 347 The subclass is responsible for implementing the self-pipe. 348 """ 349 raise NotImplementedError 350 351 def _process_events(self, event_list): 352 """Process selector events.""" 353 raise NotImplementedError 354 355 def _check_closed(self): 356 if self._closed: 357 raise RuntimeError('Event loop is closed') 358 359 def _asyncgen_finalizer_hook(self, agen): 360 self._asyncgens.discard(agen) 361 if not self.is_closed(): 362 self.create_task(agen.aclose()) 363 # Wake up the loop if the finalizer was called from 364 # a different thread. 365 self._write_to_self() 366 367 def _asyncgen_firstiter_hook(self, agen): 368 if self._asyncgens_shutdown_called: 369 warnings.warn( 370 "asynchronous generator {!r} was scheduled after " 371 "loop.shutdown_asyncgens() call".format(agen), 372 ResourceWarning, source=self) 373 374 self._asyncgens.add(agen) 375 376 @coroutine 377 def shutdown_asyncgens(self): 378 """Shutdown all active asynchronous generators.""" 379 self._asyncgens_shutdown_called = True 380 381 if self._asyncgens is None or not len(self._asyncgens): 382 # If Python version is <3.6 or we don't have any asynchronous 383 # generators alive. 384 return 385 386 closing_agens = list(self._asyncgens) 387 self._asyncgens.clear() 388 389 shutdown_coro = tasks.gather( 390 *[ag.aclose() for ag in closing_agens], 391 return_exceptions=True, 392 loop=self) 393 394 results = yield from shutdown_coro 395 for result, agen in zip(results, closing_agens): 396 if isinstance(result, Exception): 397 self.call_exception_handler({ 398 'message': 'an error occurred during closing of ' 399 'asynchronous generator {!r}'.format(agen), 400 'exception': result, 401 'asyncgen': agen 402 }) 403 404 def run_forever(self): 405 """Run until stop() is called.""" 406 self._check_closed() 407 if self.is_running(): 408 raise RuntimeError('This event loop is already running') 409 if events._get_running_loop() is not None: 410 raise RuntimeError( 411 'Cannot run the event loop while another loop is running') 412 self._set_coroutine_wrapper(self._debug) 413 self._thread_id = threading.get_ident() 414 if self._asyncgens is not None: 415 old_agen_hooks = sys.get_asyncgen_hooks() 416 sys.set_asyncgen_hooks(firstiter=self._asyncgen_firstiter_hook, 417 finalizer=self._asyncgen_finalizer_hook) 418 try: 419 events._set_running_loop(self) 420 while True: 421 self._run_once() 422 if self._stopping: 423 break 424 finally: 425 self._stopping = False 426 self._thread_id = None 427 events._set_running_loop(None) 428 self._set_coroutine_wrapper(False) 429 if self._asyncgens is not None: 430 sys.set_asyncgen_hooks(*old_agen_hooks) 431 432 def run_until_complete(self, future): 433 """Run until the Future is done. 434 435 If the argument is a coroutine, it is wrapped in a Task. 436 437 WARNING: It would be disastrous to call run_until_complete() 438 with the same coroutine twice -- it would wrap it in two 439 different Tasks and that can't be good. 440 441 Return the Future's result, or raise its exception. 442 """ 443 self._check_closed() 444 445 new_task = not futures.isfuture(future) 446 future = tasks.ensure_future(future, loop=self) 447 if new_task: 448 # An exception is raised if the future didn't complete, so there 449 # is no need to log the "destroy pending task" message 450 future._log_destroy_pending = False 451 452 future.add_done_callback(_run_until_complete_cb) 453 try: 454 self.run_forever() 455 except: 456 if new_task and future.done() and not future.cancelled(): 457 # The coroutine raised a BaseException. Consume the exception 458 # to not log a warning, the caller doesn't have access to the 459 # local task. 460 future.exception() 461 raise 462 future.remove_done_callback(_run_until_complete_cb) 463 if not future.done(): 464 raise RuntimeError('Event loop stopped before Future completed.') 465 466 return future.result() 467 468 def stop(self): 469 """Stop running the event loop. 470 471 Every callback already scheduled will still run. This simply informs 472 run_forever to stop looping after a complete iteration. 473 """ 474 self._stopping = True 475 476 def close(self): 477 """Close the event loop. 478 479 This clears the queues and shuts down the executor, 480 but does not wait for the executor to finish. 481 482 The event loop must not be running. 483 """ 484 if self.is_running(): 485 raise RuntimeError("Cannot close a running event loop") 486 if self._closed: 487 return 488 if self._debug: 489 logger.debug("Close %r", self) 490 self._closed = True 491 self._ready.clear() 492 self._scheduled.clear() 493 executor = self._default_executor 494 if executor is not None: 495 self._default_executor = None 496 executor.shutdown(wait=False) 497 498 def is_closed(self): 499 """Returns True if the event loop was closed.""" 500 return self._closed 501 502 # On Python 3.3 and older, objects with a destructor part of a reference 503 # cycle are never destroyed. It's not more the case on Python 3.4 thanks 504 # to the PEP 442. 505 if compat.PY34: 506 def __del__(self): 507 if not self.is_closed(): 508 warnings.warn("unclosed event loop %r" % self, ResourceWarning, 509 source=self) 510 if not self.is_running(): 511 self.close() 512 513 def is_running(self): 514 """Returns True if the event loop is running.""" 515 return (self._thread_id is not None) 516 517 def time(self): 518 """Return the time according to the event loop's clock. 519 520 This is a float expressed in seconds since an epoch, but the 521 epoch, precision, accuracy and drift are unspecified and may 522 differ per event loop. 523 """ 524 return time.monotonic() 525 526 def call_later(self, delay, callback, *args): 527 """Arrange for a callback to be called at a given time. 528 529 Return a Handle: an opaque object with a cancel() method that 530 can be used to cancel the call. 531 532 The delay can be an int or float, expressed in seconds. It is 533 always relative to the current time. 534 535 Each callback will be called exactly once. If two callbacks 536 are scheduled for exactly the same time, it undefined which 537 will be called first. 538 539 Any positional arguments after the callback will be passed to 540 the callback when it is called. 541 """ 542 timer = self.call_at(self.time() + delay, callback, *args) 543 if timer._source_traceback: 544 del timer._source_traceback[-1] 545 return timer 546 547 def call_at(self, when, callback, *args): 548 """Like call_later(), but uses an absolute time. 549 550 Absolute time corresponds to the event loop's time() method. 551 """ 552 self._check_closed() 553 if self._debug: 554 self._check_thread() 555 self._check_callback(callback, 'call_at') 556 timer = events.TimerHandle(when, callback, args, self) 557 if timer._source_traceback: 558 del timer._source_traceback[-1] 559 heapq.heappush(self._scheduled, timer) 560 timer._scheduled = True 561 return timer 562 563 def call_soon(self, callback, *args): 564 """Arrange for a callback to be called as soon as possible. 565 566 This operates as a FIFO queue: callbacks are called in the 567 order in which they are registered. Each callback will be 568 called exactly once. 569 570 Any positional arguments after the callback will be passed to 571 the callback when it is called. 572 """ 573 self._check_closed() 574 if self._debug: 575 self._check_thread() 576 self._check_callback(callback, 'call_soon') 577 handle = self._call_soon(callback, args) 578 if handle._source_traceback: 579 del handle._source_traceback[-1] 580 return handle 581 582 def _check_callback(self, callback, method): 583 if (coroutines.iscoroutine(callback) or 584 coroutines.iscoroutinefunction(callback)): 585 raise TypeError( 586 "coroutines cannot be used with {}()".format(method)) 587 if not callable(callback): 588 raise TypeError( 589 'a callable object was expected by {}(), got {!r}'.format( 590 method, callback)) 591 592 593 def _call_soon(self, callback, args): 594 handle = events.Handle(callback, args, self) 595 if handle._source_traceback: 596 del handle._source_traceback[-1] 597 self._ready.append(handle) 598 return handle 599 600 def _check_thread(self): 601 """Check that the current thread is the thread running the event loop. 602 603 Non-thread-safe methods of this class make this assumption and will 604 likely behave incorrectly when the assumption is violated. 605 606 Should only be called when (self._debug == True). The caller is 607 responsible for checking this condition for performance reasons. 608 """ 609 if self._thread_id is None: 610 return 611 thread_id = threading.get_ident() 612 if thread_id != self._thread_id: 613 raise RuntimeError( 614 "Non-thread-safe operation invoked on an event loop other " 615 "than the current one") 616 617 def call_soon_threadsafe(self, callback, *args): 618 """Like call_soon(), but thread-safe.""" 619 self._check_closed() 620 if self._debug: 621 self._check_callback(callback, 'call_soon_threadsafe') 622 handle = self._call_soon(callback, args) 623 if handle._source_traceback: 624 del handle._source_traceback[-1] 625 self._write_to_self() 626 return handle 627 628 def run_in_executor(self, executor, func, *args): 629 self._check_closed() 630 if self._debug: 631 self._check_callback(func, 'run_in_executor') 632 if executor is None: 633 executor = self._default_executor 634 if executor is None: 635 executor = concurrent.futures.ThreadPoolExecutor() 636 self._default_executor = executor 637 return futures.wrap_future(executor.submit(func, *args), loop=self) 638 639 def set_default_executor(self, executor): 640 self._default_executor = executor 641 642 def _getaddrinfo_debug(self, host, port, family, type, proto, flags): 643 msg = ["%s:%r" % (host, port)] 644 if family: 645 msg.append('family=%r' % family) 646 if type: 647 msg.append('type=%r' % type) 648 if proto: 649 msg.append('proto=%r' % proto) 650 if flags: 651 msg.append('flags=%r' % flags) 652 msg = ', '.join(msg) 653 logger.debug('Get address info %s', msg) 654 655 t0 = self.time() 656 addrinfo = socket.getaddrinfo(host, port, family, type, proto, flags) 657 dt = self.time() - t0 658 659 msg = ('Getting address info %s took %.3f ms: %r' 660 % (msg, dt * 1e3, addrinfo)) 661 if dt >= self.slow_callback_duration: 662 logger.info(msg) 663 else: 664 logger.debug(msg) 665 return addrinfo 666 667 def getaddrinfo(self, host, port, *, 668 family=0, type=0, proto=0, flags=0): 669 if self._debug: 670 return self.run_in_executor(None, self._getaddrinfo_debug, 671 host, port, family, type, proto, flags) 672 else: 673 return self.run_in_executor(None, socket.getaddrinfo, 674 host, port, family, type, proto, flags) 675 676 def getnameinfo(self, sockaddr, flags=0): 677 return self.run_in_executor(None, socket.getnameinfo, sockaddr, flags) 678 679 @coroutine 680 def create_connection(self, protocol_factory, host=None, port=None, *, 681 ssl=None, family=0, proto=0, flags=0, sock=None, 682 local_addr=None, server_hostname=None): 683 """Connect to a TCP server. 684 685 Create a streaming transport connection to a given Internet host and 686 port: socket family AF_INET or socket.AF_INET6 depending on host (or 687 family if specified), socket type SOCK_STREAM. protocol_factory must be 688 a callable returning a protocol instance. 689 690 This method is a coroutine which will try to establish the connection 691 in the background. When successful, the coroutine returns a 692 (transport, protocol) pair. 693 """ 694 if server_hostname is not None and not ssl: 695 raise ValueError('server_hostname is only meaningful with ssl') 696 697 if server_hostname is None and ssl: 698 # Use host as default for server_hostname. It is an error 699 # if host is empty or not set, e.g. when an 700 # already-connected socket was passed or when only a port 701 # is given. To avoid this error, you can pass 702 # server_hostname='' -- this will bypass the hostname 703 # check. (This also means that if host is a numeric 704 # IP/IPv6 address, we will attempt to verify that exact 705 # address; this will probably fail, but it is possible to 706 # create a certificate for a specific IP address, so we 707 # don't judge it here.) 708 if not host: 709 raise ValueError('You must set server_hostname ' 710 'when using ssl without a host') 711 server_hostname = host 712 713 if host is not None or port is not None: 714 if sock is not None: 715 raise ValueError( 716 'host/port and sock can not be specified at the same time') 717 718 f1 = _ensure_resolved((host, port), family=family, 719 type=socket.SOCK_STREAM, proto=proto, 720 flags=flags, loop=self) 721 fs = [f1] 722 if local_addr is not None: 723 f2 = _ensure_resolved(local_addr, family=family, 724 type=socket.SOCK_STREAM, proto=proto, 725 flags=flags, loop=self) 726 fs.append(f2) 727 else: 728 f2 = None 729 730 yield from tasks.wait(fs, loop=self) 731 732 infos = f1.result() 733 if not infos: 734 raise OSError('getaddrinfo() returned empty list') 735 if f2 is not None: 736 laddr_infos = f2.result() 737 if not laddr_infos: 738 raise OSError('getaddrinfo() returned empty list') 739 740 exceptions = [] 741 for family, type, proto, cname, address in infos: 742 try: 743 sock = socket.socket(family=family, type=type, proto=proto) 744 sock.setblocking(False) 745 if f2 is not None: 746 for _, _, _, _, laddr in laddr_infos: 747 try: 748 sock.bind(laddr) 749 break 750 except OSError as exc: 751 exc = OSError( 752 exc.errno, 'error while ' 753 'attempting to bind on address ' 754 '{!r}: {}'.format( 755 laddr, exc.strerror.lower())) 756 exceptions.append(exc) 757 else: 758 sock.close() 759 sock = None 760 continue 761 if self._debug: 762 logger.debug("connect %r to %r", sock, address) 763 yield from self.sock_connect(sock, address) 764 except OSError as exc: 765 if sock is not None: 766 sock.close() 767 exceptions.append(exc) 768 except: 769 if sock is not None: 770 sock.close() 771 raise 772 else: 773 break 774 else: 775 if len(exceptions) == 1: 776 raise exceptions[0] 777 else: 778 # If they all have the same str(), raise one. 779 model = str(exceptions[0]) 780 if all(str(exc) == model for exc in exceptions): 781 raise exceptions[0] 782 # Raise a combined exception so the user can see all 783 # the various error messages. 784 raise OSError('Multiple exceptions: {}'.format( 785 ', '.join(str(exc) for exc in exceptions))) 786 787 else: 788 if sock is None: 789 raise ValueError( 790 'host and port was not specified and no sock specified') 791 if not _is_stream_socket(sock): 792 # We allow AF_INET, AF_INET6, AF_UNIX as long as they 793 # are SOCK_STREAM. 794 # We support passing AF_UNIX sockets even though we have 795 # a dedicated API for that: create_unix_connection. 796 # Disallowing AF_UNIX in this method, breaks backwards 797 # compatibility. 798 raise ValueError( 799 'A Stream Socket was expected, got {!r}'.format(sock)) 800 801 transport, protocol = yield from self._create_connection_transport( 802 sock, protocol_factory, ssl, server_hostname) 803 if self._debug: 804 # Get the socket from the transport because SSL transport closes 805 # the old socket and creates a new SSL socket 806 sock = transport.get_extra_info('socket') 807 logger.debug("%r connected to %s:%r: (%r, %r)", 808 sock, host, port, transport, protocol) 809 return transport, protocol 810 811 @coroutine 812 def _create_connection_transport(self, sock, protocol_factory, ssl, 813 server_hostname, server_side=False): 814 815 sock.setblocking(False) 816 817 protocol = protocol_factory() 818 waiter = self.create_future() 819 if ssl: 820 sslcontext = None if isinstance(ssl, bool) else ssl 821 transport = self._make_ssl_transport( 822 sock, protocol, sslcontext, waiter, 823 server_side=server_side, server_hostname=server_hostname) 824 else: 825 transport = self._make_socket_transport(sock, protocol, waiter) 826 827 try: 828 yield from waiter 829 except: 830 transport.close() 831 raise 832 833 return transport, protocol 834 835 @coroutine 836 def create_datagram_endpoint(self, protocol_factory, 837 local_addr=None, remote_addr=None, *, 838 family=0, proto=0, flags=0, 839 reuse_address=None, reuse_port=None, 840 allow_broadcast=None, sock=None): 841 """Create datagram connection.""" 842 if sock is not None: 843 if not _is_dgram_socket(sock): 844 raise ValueError( 845 'A UDP Socket was expected, got {!r}'.format(sock)) 846 if (local_addr or remote_addr or 847 family or proto or flags or 848 reuse_address or reuse_port or allow_broadcast): 849 # show the problematic kwargs in exception msg 850 opts = dict(local_addr=local_addr, remote_addr=remote_addr, 851 family=family, proto=proto, flags=flags, 852 reuse_address=reuse_address, reuse_port=reuse_port, 853 allow_broadcast=allow_broadcast) 854 problems = ', '.join( 855 '{}={}'.format(k, v) for k, v in opts.items() if v) 856 raise ValueError( 857 'socket modifier keyword arguments can not be used ' 858 'when sock is specified. ({})'.format(problems)) 859 sock.setblocking(False) 860 r_addr = None 861 else: 862 if not (local_addr or remote_addr): 863 if family == 0: 864 raise ValueError('unexpected address family') 865 addr_pairs_info = (((family, proto), (None, None)),) 866 else: 867 # join address by (family, protocol) 868 addr_infos = collections.OrderedDict() 869 for idx, addr in ((0, local_addr), (1, remote_addr)): 870 if addr is not None: 871 assert isinstance(addr, tuple) and len(addr) == 2, ( 872 '2-tuple is expected') 873 874 infos = yield from _ensure_resolved( 875 addr, family=family, type=socket.SOCK_DGRAM, 876 proto=proto, flags=flags, loop=self) 877 if not infos: 878 raise OSError('getaddrinfo() returned empty list') 879 880 for fam, _, pro, _, address in infos: 881 key = (fam, pro) 882 if key not in addr_infos: 883 addr_infos[key] = [None, None] 884 addr_infos[key][idx] = address 885 886 # each addr has to have info for each (family, proto) pair 887 addr_pairs_info = [ 888 (key, addr_pair) for key, addr_pair in addr_infos.items() 889 if not ((local_addr and addr_pair[0] is None) or 890 (remote_addr and addr_pair[1] is None))] 891 892 if not addr_pairs_info: 893 raise ValueError('can not get address information') 894 895 exceptions = [] 896 897 if reuse_address is None: 898 reuse_address = os.name == 'posix' and sys.platform != 'cygwin' 899 900 for ((family, proto), 901 (local_address, remote_address)) in addr_pairs_info: 902 sock = None 903 r_addr = None 904 try: 905 sock = socket.socket( 906 family=family, type=socket.SOCK_DGRAM, proto=proto) 907 if reuse_address: 908 sock.setsockopt( 909 socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) 910 if reuse_port: 911 _set_reuseport(sock) 912 if allow_broadcast: 913 sock.setsockopt( 914 socket.SOL_SOCKET, socket.SO_BROADCAST, 1) 915 sock.setblocking(False) 916 917 if local_addr: 918 sock.bind(local_address) 919 if remote_addr: 920 yield from self.sock_connect(sock, remote_address) 921 r_addr = remote_address 922 except OSError as exc: 923 if sock is not None: 924 sock.close() 925 exceptions.append(exc) 926 except: 927 if sock is not None: 928 sock.close() 929 raise 930 else: 931 break 932 else: 933 raise exceptions[0] 934 935 protocol = protocol_factory() 936 waiter = self.create_future() 937 transport = self._make_datagram_transport( 938 sock, protocol, r_addr, waiter) 939 if self._debug: 940 if local_addr: 941 logger.info("Datagram endpoint local_addr=%r remote_addr=%r " 942 "created: (%r, %r)", 943 local_addr, remote_addr, transport, protocol) 944 else: 945 logger.debug("Datagram endpoint remote_addr=%r created: " 946 "(%r, %r)", 947 remote_addr, transport, protocol) 948 949 try: 950 yield from waiter 951 except: 952 transport.close() 953 raise 954 955 return transport, protocol 956 957 @coroutine 958 def _create_server_getaddrinfo(self, host, port, family, flags): 959 infos = yield from _ensure_resolved((host, port), family=family, 960 type=socket.SOCK_STREAM, 961 flags=flags, loop=self) 962 if not infos: 963 raise OSError('getaddrinfo({!r}) returned empty list'.format(host)) 964 return infos 965 966 @coroutine 967 def create_server(self, protocol_factory, host=None, port=None, 968 *, 969 family=socket.AF_UNSPEC, 970 flags=socket.AI_PASSIVE, 971 sock=None, 972 backlog=100, 973 ssl=None, 974 reuse_address=None, 975 reuse_port=None): 976 """Create a TCP server. 977 978 The host parameter can be a string, in that case the TCP server is bound 979 to host and port. 980 981 The host parameter can also be a sequence of strings and in that case 982 the TCP server is bound to all hosts of the sequence. If a host 983 appears multiple times (possibly indirectly e.g. when hostnames 984 resolve to the same IP address), the server is only bound once to that 985 host. 986 987 Return a Server object which can be used to stop the service. 988 989 This method is a coroutine. 990 """ 991 if isinstance(ssl, bool): 992 raise TypeError('ssl argument must be an SSLContext or None') 993 if host is not None or port is not None: 994 if sock is not None: 995 raise ValueError( 996 'host/port and sock can not be specified at the same time') 997 998 AF_INET6 = getattr(socket, 'AF_INET6', 0) 999 if reuse_address is None: 1000 reuse_address = os.name == 'posix' and sys.platform != 'cygwin' 1001 sockets = [] 1002 if host == '': 1003 hosts = [None] 1004 elif (isinstance(host, str) or 1005 not isinstance(host, collections.Iterable)): 1006 hosts = [host] 1007 else: 1008 hosts = host 1009 1010 fs = [self._create_server_getaddrinfo(host, port, family=family, 1011 flags=flags) 1012 for host in hosts] 1013 infos = yield from tasks.gather(*fs, loop=self) 1014 infos = set(itertools.chain.from_iterable(infos)) 1015 1016 completed = False 1017 try: 1018 for res in infos: 1019 af, socktype, proto, canonname, sa = res 1020 try: 1021 sock = socket.socket(af, socktype, proto) 1022 except socket.error: 1023 # Assume it's a bad family/type/protocol combination. 1024 if self._debug: 1025 logger.warning('create_server() failed to create ' 1026 'socket.socket(%r, %r, %r)', 1027 af, socktype, proto, exc_info=True) 1028 continue 1029 sockets.append(sock) 1030 if reuse_address: 1031 sock.setsockopt( 1032 socket.SOL_SOCKET, socket.SO_REUSEADDR, True) 1033 if reuse_port: 1034 _set_reuseport(sock) 1035 # Disable IPv4/IPv6 dual stack support (enabled by 1036 # default on Linux) which makes a single socket 1037 # listen on both address families. 1038 if af == AF_INET6 and hasattr(socket, 'IPPROTO_IPV6'): 1039 sock.setsockopt(socket.IPPROTO_IPV6, 1040 socket.IPV6_V6ONLY, 1041 True) 1042 try: 1043 sock.bind(sa) 1044 except OSError as err: 1045 raise OSError(err.errno, 'error while attempting ' 1046 'to bind on address %r: %s' 1047 % (sa, err.strerror.lower())) 1048 completed = True 1049 finally: 1050 if not completed: 1051 for sock in sockets: 1052 sock.close() 1053 else: 1054 if sock is None: 1055 raise ValueError('Neither host/port nor sock were specified') 1056 if not _is_stream_socket(sock): 1057 raise ValueError( 1058 'A Stream Socket was expected, got {!r}'.format(sock)) 1059 sockets = [sock] 1060 1061 server = Server(self, sockets) 1062 for sock in sockets: 1063 sock.listen(backlog) 1064 sock.setblocking(False) 1065 self._start_serving(protocol_factory, sock, ssl, server, backlog) 1066 if self._debug: 1067 logger.info("%r is serving", server) 1068 return server 1069 1070 @coroutine 1071 def connect_accepted_socket(self, protocol_factory, sock, *, ssl=None): 1072 """Handle an accepted connection. 1073 1074 This is used by servers that accept connections outside of 1075 asyncio but that use asyncio to handle connections. 1076 1077 This method is a coroutine. When completed, the coroutine 1078 returns a (transport, protocol) pair. 1079 """ 1080 if not _is_stream_socket(sock): 1081 raise ValueError( 1082 'A Stream Socket was expected, got {!r}'.format(sock)) 1083 1084 transport, protocol = yield from self._create_connection_transport( 1085 sock, protocol_factory, ssl, '', server_side=True) 1086 if self._debug: 1087 # Get the socket from the transport because SSL transport closes 1088 # the old socket and creates a new SSL socket 1089 sock = transport.get_extra_info('socket') 1090 logger.debug("%r handled: (%r, %r)", sock, transport, protocol) 1091 return transport, protocol 1092 1093 @coroutine 1094 def connect_read_pipe(self, protocol_factory, pipe): 1095 protocol = protocol_factory() 1096 waiter = self.create_future() 1097 transport = self._make_read_pipe_transport(pipe, protocol, waiter) 1098 1099 try: 1100 yield from waiter 1101 except: 1102 transport.close() 1103 raise 1104 1105 if self._debug: 1106 logger.debug('Read pipe %r connected: (%r, %r)', 1107 pipe.fileno(), transport, protocol) 1108 return transport, protocol 1109 1110 @coroutine 1111 def connect_write_pipe(self, protocol_factory, pipe): 1112 protocol = protocol_factory() 1113 waiter = self.create_future() 1114 transport = self._make_write_pipe_transport(pipe, protocol, waiter) 1115 1116 try: 1117 yield from waiter 1118 except: 1119 transport.close() 1120 raise 1121 1122 if self._debug: 1123 logger.debug('Write pipe %r connected: (%r, %r)', 1124 pipe.fileno(), transport, protocol) 1125 return transport, protocol 1126 1127 def _log_subprocess(self, msg, stdin, stdout, stderr): 1128 info = [msg] 1129 if stdin is not None: 1130 info.append('stdin=%s' % _format_pipe(stdin)) 1131 if stdout is not None and stderr == subprocess.STDOUT: 1132 info.append('stdout=stderr=%s' % _format_pipe(stdout)) 1133 else: 1134 if stdout is not None: 1135 info.append('stdout=%s' % _format_pipe(stdout)) 1136 if stderr is not None: 1137 info.append('stderr=%s' % _format_pipe(stderr)) 1138 logger.debug(' '.join(info)) 1139 1140 @coroutine 1141 def subprocess_shell(self, protocol_factory, cmd, *, stdin=subprocess.PIPE, 1142 stdout=subprocess.PIPE, stderr=subprocess.PIPE, 1143 universal_newlines=False, shell=True, bufsize=0, 1144 **kwargs): 1145 if not isinstance(cmd, (bytes, str)): 1146 raise ValueError("cmd must be a string") 1147 if universal_newlines: 1148 raise ValueError("universal_newlines must be False") 1149 if not shell: 1150 raise ValueError("shell must be True") 1151 if bufsize != 0: 1152 raise ValueError("bufsize must be 0") 1153 protocol = protocol_factory() 1154 if self._debug: 1155 # don't log parameters: they may contain sensitive information 1156 # (password) and may be too long 1157 debug_log = 'run shell command %r' % cmd 1158 self._log_subprocess(debug_log, stdin, stdout, stderr) 1159 transport = yield from self._make_subprocess_transport( 1160 protocol, cmd, True, stdin, stdout, stderr, bufsize, **kwargs) 1161 if self._debug: 1162 logger.info('%s: %r', debug_log, transport) 1163 return transport, protocol 1164 1165 @coroutine 1166 def subprocess_exec(self, protocol_factory, program, *args, 1167 stdin=subprocess.PIPE, stdout=subprocess.PIPE, 1168 stderr=subprocess.PIPE, universal_newlines=False, 1169 shell=False, bufsize=0, **kwargs): 1170 if universal_newlines: 1171 raise ValueError("universal_newlines must be False") 1172 if shell: 1173 raise ValueError("shell must be False") 1174 if bufsize != 0: 1175 raise ValueError("bufsize must be 0") 1176 popen_args = (program,) + args 1177 for arg in popen_args: 1178 if not isinstance(arg, (str, bytes)): 1179 raise TypeError("program arguments must be " 1180 "a bytes or text string, not %s" 1181 % type(arg).__name__) 1182 protocol = protocol_factory() 1183 if self._debug: 1184 # don't log parameters: they may contain sensitive information 1185 # (password) and may be too long 1186 debug_log = 'execute program %r' % program 1187 self._log_subprocess(debug_log, stdin, stdout, stderr) 1188 transport = yield from self._make_subprocess_transport( 1189 protocol, popen_args, False, stdin, stdout, stderr, 1190 bufsize, **kwargs) 1191 if self._debug: 1192 logger.info('%s: %r', debug_log, transport) 1193 return transport, protocol 1194 1195 def get_exception_handler(self): 1196 """Return an exception handler, or None if the default one is in use. 1197 """ 1198 return self._exception_handler 1199 1200 def set_exception_handler(self, handler): 1201 """Set handler as the new event loop exception handler. 1202 1203 If handler is None, the default exception handler will 1204 be set. 1205 1206 If handler is a callable object, it should have a 1207 signature matching '(loop, context)', where 'loop' 1208 will be a reference to the active event loop, 'context' 1209 will be a dict object (see `call_exception_handler()` 1210 documentation for details about context). 1211 """ 1212 if handler is not None and not callable(handler): 1213 raise TypeError('A callable object or None is expected, ' 1214 'got {!r}'.format(handler)) 1215 self._exception_handler = handler 1216 1217 def default_exception_handler(self, context): 1218 """Default exception handler. 1219 1220 This is called when an exception occurs and no exception 1221 handler is set, and can be called by a custom exception 1222 handler that wants to defer to the default behavior. 1223 1224 The context parameter has the same meaning as in 1225 `call_exception_handler()`. 1226 """ 1227 message = context.get('message') 1228 if not message: 1229 message = 'Unhandled exception in event loop' 1230 1231 exception = context.get('exception') 1232 if exception is not None: 1233 exc_info = (type(exception), exception, exception.__traceback__) 1234 else: 1235 exc_info = False 1236 1237 if ('source_traceback' not in context 1238 and self._current_handle is not None 1239 and self._current_handle._source_traceback): 1240 context['handle_traceback'] = self._current_handle._source_traceback 1241 1242 log_lines = [message] 1243 for key in sorted(context): 1244 if key in {'message', 'exception'}: 1245 continue 1246 value = context[key] 1247 if key == 'source_traceback': 1248 tb = ''.join(traceback.format_list(value)) 1249 value = 'Object created at (most recent call last):\n' 1250 value += tb.rstrip() 1251 elif key == 'handle_traceback': 1252 tb = ''.join(traceback.format_list(value)) 1253 value = 'Handle created at (most recent call last):\n' 1254 value += tb.rstrip() 1255 else: 1256 value = repr(value) 1257 log_lines.append('{}: {}'.format(key, value)) 1258 1259 logger.error('\n'.join(log_lines), exc_info=exc_info) 1260 1261 def call_exception_handler(self, context): 1262 """Call the current event loop's exception handler. 1263 1264 The context argument is a dict containing the following keys: 1265 1266 - 'message': Error message; 1267 - 'exception' (optional): Exception object; 1268 - 'future' (optional): Future instance; 1269 - 'handle' (optional): Handle instance; 1270 - 'protocol' (optional): Protocol instance; 1271 - 'transport' (optional): Transport instance; 1272 - 'socket' (optional): Socket instance; 1273 - 'asyncgen' (optional): Asynchronous generator that caused 1274 the exception. 1275 1276 New keys maybe introduced in the future. 1277 1278 Note: do not overload this method in an event loop subclass. 1279 For custom exception handling, use the 1280 `set_exception_handler()` method. 1281 """ 1282 if self._exception_handler is None: 1283 try: 1284 self.default_exception_handler(context) 1285 except Exception: 1286 # Second protection layer for unexpected errors 1287 # in the default implementation, as well as for subclassed 1288 # event loops with overloaded "default_exception_handler". 1289 logger.error('Exception in default exception handler', 1290 exc_info=True) 1291 else: 1292 try: 1293 self._exception_handler(self, context) 1294 except Exception as exc: 1295 # Exception in the user set custom exception handler. 1296 try: 1297 # Let's try default handler. 1298 self.default_exception_handler({ 1299 'message': 'Unhandled error in exception handler', 1300 'exception': exc, 1301 'context': context, 1302 }) 1303 except Exception: 1304 # Guard 'default_exception_handler' in case it is 1305 # overloaded. 1306 logger.error('Exception in default exception handler ' 1307 'while handling an unexpected error ' 1308 'in custom exception handler', 1309 exc_info=True) 1310 1311 def _add_callback(self, handle): 1312 """Add a Handle to _scheduled (TimerHandle) or _ready.""" 1313 assert isinstance(handle, events.Handle), 'A Handle is required here' 1314 if handle._cancelled: 1315 return 1316 assert not isinstance(handle, events.TimerHandle) 1317 self._ready.append(handle) 1318 1319 def _add_callback_signalsafe(self, handle): 1320 """Like _add_callback() but called from a signal handler.""" 1321 self._add_callback(handle) 1322 self._write_to_self() 1323 1324 def _timer_handle_cancelled(self, handle): 1325 """Notification that a TimerHandle has been cancelled.""" 1326 if handle._scheduled: 1327 self._timer_cancelled_count += 1 1328 1329 def _run_once(self): 1330 """Run one full iteration of the event loop. 1331 1332 This calls all currently ready callbacks, polls for I/O, 1333 schedules the resulting callbacks, and finally schedules 1334 'call_later' callbacks. 1335 """ 1336 1337 sched_count = len(self._scheduled) 1338 if (sched_count > _MIN_SCHEDULED_TIMER_HANDLES and 1339 self._timer_cancelled_count / sched_count > 1340 _MIN_CANCELLED_TIMER_HANDLES_FRACTION): 1341 # Remove delayed calls that were cancelled if their number 1342 # is too high 1343 new_scheduled = [] 1344 for handle in self._scheduled: 1345 if handle._cancelled: 1346 handle._scheduled = False 1347 else: 1348 new_scheduled.append(handle) 1349 1350 heapq.heapify(new_scheduled) 1351 self._scheduled = new_scheduled 1352 self._timer_cancelled_count = 0 1353 else: 1354 # Remove delayed calls that were cancelled from head of queue. 1355 while self._scheduled and self._scheduled[0]._cancelled: 1356 self._timer_cancelled_count -= 1 1357 handle = heapq.heappop(self._scheduled) 1358 handle._scheduled = False 1359 1360 timeout = None 1361 if self._ready or self._stopping: 1362 timeout = 0 1363 elif self._scheduled: 1364 # Compute the desired timeout. 1365 when = self._scheduled[0]._when 1366 timeout = max(0, when - self.time()) 1367 1368 if self._debug and timeout != 0: 1369 t0 = self.time() 1370 event_list = self._selector.select(timeout) 1371 dt = self.time() - t0 1372 if dt >= 1.0: 1373 level = logging.INFO 1374 else: 1375 level = logging.DEBUG 1376 nevent = len(event_list) 1377 if timeout is None: 1378 logger.log(level, 'poll took %.3f ms: %s events', 1379 dt * 1e3, nevent) 1380 elif nevent: 1381 logger.log(level, 1382 'poll %.3f ms took %.3f ms: %s events', 1383 timeout * 1e3, dt * 1e3, nevent) 1384 elif dt >= 1.0: 1385 logger.log(level, 1386 'poll %.3f ms took %.3f ms: timeout', 1387 timeout * 1e3, dt * 1e3) 1388 else: 1389 event_list = self._selector.select(timeout) 1390 self._process_events(event_list) 1391 1392 # Handle 'later' callbacks that are ready. 1393 end_time = self.time() + self._clock_resolution 1394 while self._scheduled: 1395 handle = self._scheduled[0] 1396 if handle._when >= end_time: 1397 break 1398 handle = heapq.heappop(self._scheduled) 1399 handle._scheduled = False 1400 self._ready.append(handle) 1401 1402 # This is the only place where callbacks are actually *called*. 1403 # All other places just add them to ready. 1404 # Note: We run all currently scheduled callbacks, but not any 1405 # callbacks scheduled by callbacks run this time around -- 1406 # they will be run the next time (after another I/O poll). 1407 # Use an idiom that is thread-safe without using locks. 1408 ntodo = len(self._ready) 1409 for i in range(ntodo): 1410 handle = self._ready.popleft() 1411 if handle._cancelled: 1412 continue 1413 if self._debug: 1414 try: 1415 self._current_handle = handle 1416 t0 = self.time() 1417 handle._run() 1418 dt = self.time() - t0 1419 if dt >= self.slow_callback_duration: 1420 logger.warning('Executing %s took %.3f seconds', 1421 _format_handle(handle), dt) 1422 finally: 1423 self._current_handle = None 1424 else: 1425 handle._run() 1426 handle = None # Needed to break cycles when an exception occurs. 1427 1428 def _set_coroutine_wrapper(self, enabled): 1429 try: 1430 set_wrapper = sys.set_coroutine_wrapper 1431 get_wrapper = sys.get_coroutine_wrapper 1432 except AttributeError: 1433 return 1434 1435 enabled = bool(enabled) 1436 if self._coroutine_wrapper_set == enabled: 1437 return 1438 1439 wrapper = coroutines.debug_wrapper 1440 current_wrapper = get_wrapper() 1441 1442 if enabled: 1443 if current_wrapper not in (None, wrapper): 1444 warnings.warn( 1445 "loop.set_debug(True): cannot set debug coroutine " 1446 "wrapper; another wrapper is already set %r" % 1447 current_wrapper, RuntimeWarning) 1448 else: 1449 set_wrapper(wrapper) 1450 self._coroutine_wrapper_set = True 1451 else: 1452 if current_wrapper not in (None, wrapper): 1453 warnings.warn( 1454 "loop.set_debug(False): cannot unset debug coroutine " 1455 "wrapper; another wrapper was set %r" % 1456 current_wrapper, RuntimeWarning) 1457 else: 1458 set_wrapper(None) 1459 self._coroutine_wrapper_set = False 1460 1461 def get_debug(self): 1462 return self._debug 1463 1464 def set_debug(self, enabled): 1465 self._debug = enabled 1466 1467 if self.is_running(): 1468 self._set_coroutine_wrapper(enabled) 1469