"""Selector and proactor event loops for Windows."""

import _winapi
import errno
import math
import socket
import struct
import weakref

from . import events
from . import base_subprocess
from . import futures
from . import proactor_events
from . import selector_events
from . import tasks
from . import windows_utils
from . import _overlapped
from .coroutines import coroutine
from .log import logger


__all__ = ['SelectorEventLoop', 'ProactorEventLoop', 'IocpProactor',
           'DefaultEventLoopPolicy',
           ]


NULL = 0
INFINITE = 0xffffffff
ERROR_CONNECTION_REFUSED = 1225
ERROR_CONNECTION_ABORTED = 1236

# Initial delay in seconds for connect_pipe() before retrying to connect
CONNECT_PIPE_INIT_DELAY = 0.001

# Maximum delay in seconds for connect_pipe() before retrying to connect
CONNECT_PIPE_MAX_DELAY = 0.100


class _OverlappedFuture(futures.Future):
    """Subclass of Future which represents an overlapped operation.

    Cancelling it will immediately cancel the overlapped operation.
    """

    def __init__(self, ov, *, loop=None):
        super().__init__(loop=loop)
        if self._source_traceback:
            # Drop the frame of this constructor from the recorded traceback.
            del self._source_traceback[-1]
        self._ov = ov

    def _repr_info(self):
        """Extend the base repr info with the overlapped state and address."""
        info = super()._repr_info()
        if self._ov is not None:
            state = 'pending' if self._ov.pending else 'completed'
            info.insert(1, 'overlapped=<%s, %#x>' % (state, self._ov.address))
        return info

    def _cancel_overlapped(self):
        """Cancel the overlapped operation, if any, and drop the reference.

        An OSError raised by the cancellation is reported to the loop's
        exception handler instead of being propagated.
        """
        if self._ov is None:
            return
        try:
            self._ov.cancel()
        except OSError as exc:
            context = {
                'message': 'Cancelling an overlapped future failed',
                'exception': exc,
                'future': self,
            }
            if self._source_traceback:
                context['source_traceback'] = self._source_traceback
            self._loop.call_exception_handler(context)
        self._ov = None

    def cancel(self):
        """Cancel the overlapped operation, then cancel the future."""
        self._cancel_overlapped()
        return super().cancel()

    def set_exception(self, exception):
        """Set the exception, then cancel the overlapped operation."""
        super().set_exception(exception)
        self._cancel_overlapped()

    def set_result(self, result):
        """Set the result and release the reference to the overlapped."""
        super().set_result(result)
        self._ov = None


class _BaseWaitHandleFuture(futures.Future):
    """Subclass of Future which represents a wait handle."""

    def __init__(self, ov, handle, wait_handle, *, loop=None):
        super().__init__(loop=loop)
        if self._source_traceback:
            del self._source_traceback[-1]
        # Keep a reference to the Overlapped object to keep it alive until the
        # wait is unregistered
        self._ov = ov
        self._handle = handle
        self._wait_handle = wait_handle

        # Should we call UnregisterWaitEx() if the wait completes
        # or is cancelled?
        self._registered = True

    def _poll(self):
        """Return True if the handle is currently signaled."""
        # non-blocking wait: use a timeout of 0 millisecond
        return (_winapi.WaitForSingleObject(self._handle, 0) ==
                _winapi.WAIT_OBJECT_0)

    def _repr_info(self):
        """Extend the base repr info with the handle and wait-handle state."""
        info = super()._repr_info()
        # Only format (and poll) the handle when there is one: '%#x' % None
        # would raise TypeError and break repr().
        if self._handle is not None:
            info.append('handle=%#x' % self._handle)
            state = 'signaled' if self._poll() else 'waiting'
            info.append(state)
        if self._wait_handle is not None:
            info.append('wait_handle=%#x' % self._wait_handle)
        return info

    def _unregister_wait_cb(self, fut):
        # The wait was unregistered: it's now safe to destroy the Overlapped
        # object
        self._ov = None

    def _unregister_wait(self):
        """Unregister the wait handle once; later calls are no-ops.

        An OSError other than ERROR_IO_PENDING is reported to the loop's
        exception handler and the completion callback is not invoked.
        """
        if not self._registered:
            return
        self._registered = False

        wait_handle = self._wait_handle
        self._wait_handle = None
        try:
            _overlapped.UnregisterWait(wait_handle)
        except OSError as exc:
            if exc.winerror != _overlapped.ERROR_IO_PENDING:
                context = {
                    'message': 'Failed to unregister the wait handle',
                    'exception': exc,
                    'future': self,
                }
                if self._source_traceback:
                    context['source_traceback'] = self._source_traceback
                self._loop.call_exception_handler(context)
                return
            # ERROR_IO_PENDING means that the unregister is pending

        self._unregister_wait_cb(None)

    def cancel(self):
        """Unregister the wait, then cancel the future."""
        self._unregister_wait()
        return super().cancel()

    def set_exception(self, exception):
        """Unregister the wait, then set the exception."""
        self._unregister_wait()
        super().set_exception(exception)

    def set_result(self, result):
        """Unregister the wait, then set the result."""
        self._unregister_wait()
        super().set_result(result)


class _WaitCancelFuture(_BaseWaitHandleFuture):
    """Subclass of Future which represents a wait for the cancellation of a
    _WaitHandleFuture using an event.
    """

    def __init__(self, ov, event, wait_handle, *, loop=None):
        super().__init__(ov, event, wait_handle, loop=loop)

        # Called synchronously from set_result()/set_exception(); assigned by
        # IocpProactor._wait_cancel().
        self._done_callback = None

    def cancel(self):
        raise RuntimeError("_WaitCancelFuture must not be cancelled")

    def set_result(self, result):
        """Set the result, then call the done callback if one is set."""
        super().set_result(result)
        if self._done_callback is not None:
            self._done_callback(self)

    def set_exception(self, exception):
        """Set the exception, then call the done callback if one is set."""
        super().set_exception(exception)
        if self._done_callback is not None:
            self._done_callback(self)


class _WaitHandleFuture(_BaseWaitHandleFuture):
    """Wait-handle future registered with an IocpProactor.

    Unregistering the wait goes through UnregisterWaitEx() with a dedicated
    event; the proactor is asked to wait for that event before the
    Overlapped object is released.
    """

    def __init__(self, ov, handle, wait_handle, proactor, *, loop=None):
        super().__init__(ov, handle, wait_handle, loop=loop)
        self._proactor = proactor
        self._unregister_proactor = True
        self._event = _overlapped.CreateEvent(None, True, False, None)
        self._event_fut = None

    def _unregister_wait_cb(self, fut):
        if self._event is not None:
            _winapi.CloseHandle(self._event)
            self._event = None
            self._event_fut = None

        # If the wait was cancelled, the wait may never be signalled, so
        # it's required to unregister it. Otherwise, IocpProactor.close() will
        # wait forever for an event which will never come.
        #
        # If the IocpProactor already received the event, it's safe to call
        # _unregister() because we kept a reference to the Overlapped object
        # which is used as a unique key.
        self._proactor._unregister(self._ov)
        self._proactor = None

        super()._unregister_wait_cb(fut)

    def _unregister_wait(self):
        """Unregister the wait handle once; later calls are no-ops.

        On success (or ERROR_IO_PENDING) the proactor is asked to wait for
        the unregistration event and to call _unregister_wait_cb() then.
        """
        if not self._registered:
            return
        self._registered = False

        wait_handle = self._wait_handle
        self._wait_handle = None
        try:
            _overlapped.UnregisterWaitEx(wait_handle, self._event)
        except OSError as exc:
            if exc.winerror != _overlapped.ERROR_IO_PENDING:
                context = {
                    'message': 'Failed to unregister the wait handle',
                    'exception': exc,
                    'future': self,
                }
                if self._source_traceback:
                    context['source_traceback'] = self._source_traceback
                self._loop.call_exception_handler(context)
                return
            # ERROR_IO_PENDING is not an error, the wait was unregistered

        self._event_fut = self._proactor._wait_cancel(self._event,
                                                      self._unregister_wait_cb)


class PipeServer(object):
    """Class representing a pipe server.

    This is much like a bound, listening socket.
    """
    def __init__(self, address):
        self._address = address
        self._free_instances = weakref.WeakSet()
        # initialize the pipe attribute before calling _server_pipe_handle()
        # because this function can raise an exception and the destructor calls
        # the close() method
        self._pipe = None
        self._accept_pipe_future = None
        self._pipe = self._server_pipe_handle(True)

    def _get_unconnected_pipe(self):
        # Create new instance and return previous one.  This ensures
        # that (until the server is closed) there is always at least
        # one pipe handle for address.  Therefore if a client attempt
        # to connect it will not fail with FileNotFoundError.
        tmp, self._pipe = self._pipe, self._server_pipe_handle(False)
        return tmp

    def _server_pipe_handle(self, first):
        # Return a wrapper for a new pipe handle.
        if self.closed():
            return None
        flags = _winapi.PIPE_ACCESS_DUPLEX | _winapi.FILE_FLAG_OVERLAPPED
        if first:
            flags |= _winapi.FILE_FLAG_FIRST_PIPE_INSTANCE
        h = _winapi.CreateNamedPipe(
            self._address, flags,
            _winapi.PIPE_TYPE_MESSAGE | _winapi.PIPE_READMODE_MESSAGE |
            _winapi.PIPE_WAIT,
            _winapi.PIPE_UNLIMITED_INSTANCES,
            windows_utils.BUFSIZE, windows_utils.BUFSIZE,
            _winapi.NMPWAIT_WAIT_FOREVER, _winapi.NULL)
        pipe = windows_utils.PipeHandle(h)
        self._free_instances.add(pipe)
        return pipe

    def closed(self):
        """Return True once close() has cleared the server address."""
        return (self._address is None)

    def close(self):
        """Cancel the pending accept and close the unconnected instances."""
        if self._accept_pipe_future is not None:
            self._accept_pipe_future.cancel()
            self._accept_pipe_future = None
        # Close all instances which have not been connected to by a client.
        if self._address is not None:
            for pipe in self._free_instances:
                pipe.close()
            self._pipe = None
            self._address = None
            self._free_instances.clear()

    __del__ = close


class _WindowsSelectorEventLoop(selector_events.BaseSelectorEventLoop):
    """Windows version of selector event loop."""

    def _socketpair(self):
        return windows_utils.socketpair()


class ProactorEventLoop(proactor_events.BaseProactorEventLoop):
    """Windows version of proactor event loop using IOCP."""

    def __init__(self, proactor=None):
        if proactor is None:
            proactor = IocpProactor()
        super().__init__(proactor)

    def _socketpair(self):
        return windows_utils.socketpair()

    @coroutine
    def create_pipe_connection(self, protocol_factory, address):
        """Connect to the pipe at address; return (transport, protocol)."""
        f = self._proactor.connect_pipe(address)
        pipe = yield from f
        protocol = protocol_factory()
        trans = self._make_duplex_pipe_transport(pipe, protocol,
                                                 extra={'addr': address})
        return trans, protocol

    @coroutine
    def start_serving_pipe(self, protocol_factory, address):
        """Start accepting pipe clients at address.

        Return a list containing the PipeServer created for address.
        """
        server = PipeServer(address)

        def loop_accept_pipe(f=None):
            # Called once via call_soon() with f=None, then again as the done
            # callback of each accept future.
            pipe = None
            try:
                if f:
                    pipe = f.result()
                    server._free_instances.discard(pipe)

                    if server.closed():
                        # A client connected before the server was closed:
                        # drop the client (close the pipe) and exit
                        pipe.close()
                        return

                    protocol = protocol_factory()
                    self._make_duplex_pipe_transport(
                        pipe, protocol, extra={'addr': address})

                pipe = server._get_unconnected_pipe()
                if pipe is None:
                    return

                f = self._proactor.accept_pipe(pipe)
            except OSError as exc:
                if pipe and pipe.fileno() != -1:
                    self.call_exception_handler({
                        'message': 'Pipe accept failed',
                        'exception': exc,
                        'pipe': pipe,
                    })
                    pipe.close()
                elif self._debug:
                    logger.warning("Accept pipe failed on pipe %r",
                                   pipe, exc_info=True)
            except futures.CancelledError:
                if pipe:
                    pipe.close()
            else:
                server._accept_pipe_future = f
                f.add_done_callback(loop_accept_pipe)

        self.call_soon(loop_accept_pipe)
        return [server]

    @coroutine
    def _make_subprocess_transport(self, protocol, args, shell,
                                   stdin, stdout, stderr, bufsize,
                                   extra=None, **kwargs):
        """Create a subprocess transport and wait until its waiter is done.

        If waiting fails, the transport is closed and awaited before the
        exception is re-raised.
        """
        waiter = self.create_future()
        transp = _WindowsSubprocessTransport(self, protocol, args, shell,
                                             stdin, stdout, stderr, bufsize,
                                             waiter=waiter, extra=extra,
                                             **kwargs)
        try:
            yield from waiter
        except Exception as exc:
            # Workaround CPython bug #23353: using yield/yield-from in an
            # except block of a generator doesn't clear properly sys.exc_info()
            err = exc
        else:
            err = None

        if err is not None:
            transp.close()
            yield from transp._wait()
            raise err

        return transp


class IocpProactor:
    """Proactor implementation using IOCP."""

    def __init__(self, concurrency=0xffffffff):
        self._loop = None
        self._results = []
        self._iocp = _overlapped.CreateIoCompletionPort(
            _overlapped.INVALID_HANDLE_VALUE, NULL, 0, concurrency)
        # Maps ov.address -> (future, ov, obj, callback)
        self._cache = {}
        self._registered = weakref.WeakSet()
        self._unregistered = []
        self._stopped_serving = weakref.WeakSet()

    def __repr__(self):
        return ('<%s overlapped#=%s result#=%s>'
                % (self.__class__.__name__, len(self._cache),
                   len(self._results)))

    def set_loop(self, loop):
        self._loop = loop

    def select(self, timeout=None):
        """Return the list of completed futures and reset it.

        The completion port is only polled when no result is queued yet.
        """
        if not self._results:
            self._poll(timeout)
        tmp = self._results
        self._results = []
        return tmp

    def _result(self, value):
        """Return a new future of the loop already set to value."""
        fut = self._loop.create_future()
        fut.set_result(value)
        return fut

    def recv(self, conn, nbytes, flags=0):
        """Start an overlapped receive on conn and return its future.

        Sockets use WSARecv(), other objects ReadFile().  A BrokenPipeError
        raised when starting the operation yields a future set to b''.
        """
        self._register_with_iocp(conn)
        ov = _overlapped.Overlapped(NULL)
        try:
            if isinstance(conn, socket.socket):
                ov.WSARecv(conn.fileno(), nbytes, flags)
            else:
                ov.ReadFile(conn.fileno(), nbytes)
        except BrokenPipeError:
            return self._result(b'')

        def finish_recv(trans, key, ov):
            try:
                return ov.getresult()
            except OSError as exc:
                if exc.winerror == _overlapped.ERROR_NETNAME_DELETED:
                    raise ConnectionResetError(*exc.args)
                else:
                    raise

        return self._register(ov, conn, finish_recv)

    def send(self, conn, buf, flags=0):
        """Start an overlapped send of buf on conn and return its future.

        Sockets use WSASend(), other objects WriteFile().
        """
        self._register_with_iocp(conn)
        ov = _overlapped.Overlapped(NULL)
        if isinstance(conn, socket.socket):
            ov.WSASend(conn.fileno(), buf, flags)
        else:
            ov.WriteFile(conn.fileno(), buf)

        def finish_send(trans, key, ov):
            try:
                return ov.getresult()
            except OSError as exc:
                if exc.winerror == _overlapped.ERROR_NETNAME_DELETED:
                    raise ConnectionResetError(*exc.args)
                else:
                    raise

        return self._register(ov, conn, finish_send)

    def accept(self, listener):
        """Start an overlapped accept on listener and return its future.

        The future's result is (conn, conn.getpeername()).
        """
        self._register_with_iocp(listener)
        conn = self._get_accept_socket(listener.family)
        ov = _overlapped.Overlapped(NULL)
        ov.AcceptEx(listener.fileno(), conn.fileno())

        def finish_accept(trans, key, ov):
            ov.getresult()
            # Use SO_UPDATE_ACCEPT_CONTEXT so getsockname() etc work.
            buf = struct.pack('@P', listener.fileno())
            conn.setsockopt(socket.SOL_SOCKET,
                            _overlapped.SO_UPDATE_ACCEPT_CONTEXT, buf)
            conn.settimeout(listener.gettimeout())
            return conn, conn.getpeername()

        @coroutine
        def accept_coro(future, conn):
            # Coroutine closing the accept socket if the future is cancelled
            try:
                yield from future
            except futures.CancelledError:
                conn.close()
                raise

        future = self._register(ov, listener, finish_accept)
        coro = accept_coro(future, conn)
        tasks.ensure_future(coro, loop=self._loop)
        return future

    def connect(self, conn, address):
        """Start an overlapped connect of conn to address.

        Return a future whose result is conn.
        """
        self._register_with_iocp(conn)
        # The socket needs to be locally bound before we call ConnectEx().
        try:
            _overlapped.BindLocal(conn.fileno(), conn.family)
        except OSError as e:
            if e.winerror != errno.WSAEINVAL:
                raise
            # Probably already locally bound; check using getsockname().
            if conn.getsockname()[1] == 0:
                raise
        ov = _overlapped.Overlapped(NULL)
        ov.ConnectEx(conn.fileno(), address)

        def finish_connect(trans, key, ov):
            ov.getresult()
            # Use SO_UPDATE_CONNECT_CONTEXT so getsockname() etc work.
            conn.setsockopt(socket.SOL_SOCKET,
                            _overlapped.SO_UPDATE_CONNECT_CONTEXT, 0)
            return conn

        return self._register(ov, conn, finish_connect)

    def accept_pipe(self, pipe):
        """Wait for a client to connect to pipe.

        Return a future whose result is pipe.
        """
        self._register_with_iocp(pipe)
        ov = _overlapped.Overlapped(NULL)
        connected = ov.ConnectNamedPipe(pipe.fileno())

        if connected:
            # ConnectNamedPipe() failed with ERROR_PIPE_CONNECTED which means
            # that the pipe is connected. There is no need to wait for the
            # completion of the connection.
            return self._result(pipe)

        def finish_accept_pipe(trans, key, ov):
            ov.getresult()
            return pipe

        return self._register(ov, pipe, finish_accept_pipe)

    @coroutine
    def connect_pipe(self, address):
        """Connect to the pipe at address, retrying while it is busy.

        Return a windows_utils.PipeHandle wrapping the connected handle.
        OSErrors other than ERROR_PIPE_BUSY are propagated.
        """
        delay = CONNECT_PIPE_INIT_DELAY
        while True:
            # Unfortunately there is no way to do an overlapped connect to a
            # pipe.  Call CreateFile() in a loop until it doesn't fail with
            # ERROR_PIPE_BUSY
            try:
                handle = _overlapped.ConnectPipe(address)
                break
            except OSError as exc:
                if exc.winerror != _overlapped.ERROR_PIPE_BUSY:
                    raise

            # ConnectPipe() failed with ERROR_PIPE_BUSY: retry later
            delay = min(delay * 2, CONNECT_PIPE_MAX_DELAY)
            yield from tasks.sleep(delay, loop=self._loop)

        return windows_utils.PipeHandle(handle)

    def wait_for_handle(self, handle, timeout=None):
        """Wait for a handle.

        Return a Future object. The result of the future is True if the wait
        completed, or False if the wait did not complete (on timeout).
        """
        return self._wait_for_handle(handle, timeout, False)

    def _wait_cancel(self, event, done_callback):
        """Wait for event with no timeout; call done_callback on completion."""
        fut = self._wait_for_handle(event, None, True)
        # add_done_callback() cannot be used because the wait may only complete
        # in IocpProactor.close(), while the event loop is not running.
        fut._done_callback = done_callback
        return fut

    def _wait_for_handle(self, handle, timeout, _is_cancel):
        """Register a wait on handle and return the matching future.

        A _WaitCancelFuture is created when _is_cancel is true, otherwise a
        _WaitHandleFuture.  timeout is in seconds; None waits forever.
        """
        if timeout is None:
            ms = _winapi.INFINITE
        else:
            # RegisterWaitForSingleObject() has a resolution of 1 millisecond,
            # round away from zero to wait *at least* timeout seconds.
            ms = math.ceil(timeout * 1e3)

        # We only create ov so we can use ov.address as a key for the cache.
        ov = _overlapped.Overlapped(NULL)
        wait_handle = _overlapped.RegisterWaitWithQueue(
            handle, self._iocp, ov.address, ms)
        if _is_cancel:
            f = _WaitCancelFuture(ov, handle, wait_handle, loop=self._loop)
        else:
            f = _WaitHandleFuture(ov, handle, wait_handle, self,
                                  loop=self._loop)
        if f._source_traceback:
            del f._source_traceback[-1]

        def finish_wait_for_handle(trans, key, ov):
            # Note that this second wait means that we should only use
            # this with handles types where a successful wait has no
            # effect.  So events or processes are all right, but locks
            # or semaphores are not.  Also note if the handle is
            # signalled and then quickly reset, then we may return
            # False even though we have not timed out.
            return f._poll()

        self._cache[ov.address] = (f, ov, 0, finish_wait_for_handle)
        return f

    def _register_with_iocp(self, obj):
        # To get notifications of finished ops on this objects sent to the
        # completion port, we must register the handle.
        if obj not in self._registered:
            self._registered.add(obj)
            _overlapped.CreateIoCompletionPort(obj.fileno(), self._iocp, 0, 0)
            # XXX We could also use SetFileCompletionNotificationModes()
            # to avoid sending notifications to completion port of ops
            # that succeed immediately.

    def _register(self, ov, obj, callback):
        # Return a future which will be set with the result of the
        # operation when it completes.  The future's value is actually
        # the value returned by callback().
        f = _OverlappedFuture(ov, loop=self._loop)
        if f._source_traceback:
            del f._source_traceback[-1]
        if not ov.pending:
            # The operation has completed, so no need to postpone the
            # work.  We cannot take this short cut if we need the
            # NumberOfBytes, CompletionKey values returned by
            # PostQueuedCompletionStatus().
            try:
                value = callback(None, None, ov)
            except OSError as e:
                f.set_exception(e)
            else:
                f.set_result(value)
            # Even if GetOverlappedResult() was called, we have to wait for the
            # notification of the completion in GetQueuedCompletionStatus().
            # Register the overlapped operation to keep a reference to the
            # OVERLAPPED object, otherwise the memory is freed and Windows may
            # read uninitialized memory.

        # Register the overlapped operation for later.  Note that
        # we only store obj to prevent it from being garbage
        # collected too early.
        self._cache[ov.address] = (f, ov, obj, callback)
        return f

    def _unregister(self, ov):
        """Unregister an overlapped object.

        Call this method when its future has been cancelled. The event can
        already be signalled (pending in the proactor event queue). It is also
        safe if the event is never signalled (because it was cancelled).
        """
        self._unregistered.append(ov)

    def _get_accept_socket(self, family):
        """Return a new socket of the given family with a timeout of 0."""
        s = socket.socket(family)
        s.settimeout(0)
        return s

    def _poll(self, timeout=None):
        """Drain the completion port and complete the matching futures.

        timeout is in seconds (None waits forever) and only applies to the
        first dequeue; the following ones do not block.  Futures completed
        here are appended to self._results.  Always returns None.

        Raises ValueError if timeout is negative or too big.
        """
        if timeout is None:
            ms = INFINITE
        elif timeout < 0:
            raise ValueError("negative timeout")
        else:
            # GetQueuedCompletionStatus() has a resolution of 1 millisecond,
            # round away from zero to wait *at least* timeout seconds.
            ms = math.ceil(timeout * 1e3)
            if ms >= INFINITE:
                raise ValueError("timeout too big")

        while True:
            status = _overlapped.GetQueuedCompletionStatus(self._iocp, ms)
            if status is None:
                break
            ms = 0

            err, transferred, key, address = status
            try:
                f, ov, obj, callback = self._cache.pop(address)
            except KeyError:
                if self._loop.get_debug():
                    self._loop.call_exception_handler({
                        'message': ('GetQueuedCompletionStatus() returned an '
                                    'unexpected event'),
                        'status': ('err=%s transferred=%s key=%#x address=%#x'
                                   % (err, transferred, key, address)),
                    })

                # key is either zero, or it is used to return a pipe
                # handle which should be closed to avoid a leak.
                if key not in (0, _overlapped.INVALID_HANDLE_VALUE):
                    _winapi.CloseHandle(key)
                continue

            if obj in self._stopped_serving:
                f.cancel()
            # Don't call the callback if _register() already read the result or
            # if the overlapped has been cancelled
            elif not f.done():
                try:
                    value = callback(transferred, key, ov)
                except OSError as e:
                    f.set_exception(e)
                    self._results.append(f)
                else:
                    f.set_result(value)
                    self._results.append(f)

        # Remove unregistered futures
        for ov in self._unregistered:
            self._cache.pop(ov.address, None)
        self._unregistered.clear()

    def _stop_serving(self, obj):
        # obj is a socket or pipe handle.  It will be closed in
        # BaseProactorEventLoop._stop_serving() which will make any
        # pending operations fail quickly.
        self._stopped_serving.add(obj)

    def close(self):
        """Cancel pending operations, drain the port and close it.

        Calling close() again, or on an instance whose __init__() failed
        before the completion port was created, does nothing.
        """
        if getattr(self, '_iocp', None) is None:
            # Already closed (or never fully initialized): polling a closed
            # completion port would fail.
            return

        # Cancel remaining registered operations.
        for address, (fut, ov, obj, callback) in list(self._cache.items()):
            if fut.cancelled():
                # Nothing to do with cancelled futures
                pass
            elif isinstance(fut, _WaitCancelFuture):
                # _WaitCancelFuture must not be cancelled
                pass
            else:
                try:
                    fut.cancel()
                except OSError as exc:
                    if self._loop is not None:
                        context = {
                            'message': 'Cancelling a future failed',
                            'exception': exc,
                            'future': fut,
                        }
                        if fut._source_traceback:
                            context['source_traceback'] = fut._source_traceback
                        self._loop.call_exception_handler(context)

        while self._cache:
            # _poll() always returns None, so detect a stalled round by
            # checking whether the cache shrank.
            remaining = len(self._cache)
            self._poll(1)
            if len(self._cache) >= remaining:
                logger.debug('taking long time to close proactor')

        self._results = []
        _winapi.CloseHandle(self._iocp)
        self._iocp = None

    def __del__(self):
        self.close()


class _WindowsSubprocessTransport(base_subprocess.BaseSubprocessTransport):
    """Subprocess transport watching the process handle via the proactor."""

    def _start(self, args, shell, stdin, stdout, stderr, bufsize, **kwargs):
        self._proc = windows_utils.Popen(
            args, shell=shell, stdin=stdin, stdout=stdout, stderr=stderr,
            bufsize=bufsize, **kwargs)

        def callback(f):
            returncode = self._proc.poll()
            self._process_exited(returncode)

        f = self._loop._proactor.wait_for_handle(int(self._proc._handle))
        f.add_done_callback(callback)


SelectorEventLoop = _WindowsSelectorEventLoop


class _WindowsDefaultEventLoopPolicy(events.BaseDefaultEventLoopPolicy):
    _loop_factory = SelectorEventLoop


DefaultEventLoopPolicy = _WindowsDefaultEventLoopPolicy