Home | History | Annotate | Download | only in asyncio
      1 """Selector and proactor event loops for Windows."""
      2 
      3 import _winapi
      4 import errno
      5 import math
      6 import socket
      7 import struct
      8 import weakref
      9 
     10 from . import events
     11 from . import base_subprocess
     12 from . import futures
     13 from . import proactor_events
     14 from . import selector_events
     15 from . import tasks
     16 from . import windows_utils
     17 from . import _overlapped
     18 from .coroutines import coroutine
     19 from .log import logger
     20 
     21 
     22 __all__ = ['SelectorEventLoop', 'ProactorEventLoop', 'IocpProactor',
     23            'DefaultEventLoopPolicy',
     24            ]
     25 
     26 
     27 NULL = 0
     28 INFINITE = 0xffffffff
     29 ERROR_CONNECTION_REFUSED = 1225
     30 ERROR_CONNECTION_ABORTED = 1236
     31 
     32 # Initial delay in seconds for connect_pipe() before retrying to connect
     33 CONNECT_PIPE_INIT_DELAY = 0.001
     34 
     35 # Maximum delay in seconds for connect_pipe() before retrying to connect
     36 CONNECT_PIPE_MAX_DELAY = 0.100
     37 
     38 
     39 class _OverlappedFuture(futures.Future):
     40     """Subclass of Future which represents an overlapped operation.
     41 
     42     Cancelling it will immediately cancel the overlapped operation.
     43     """
     44 
     45     def __init__(self, ov, *, loop=None):
     46         super().__init__(loop=loop)
     47         if self._source_traceback:
     48             del self._source_traceback[-1]
     49         self._ov = ov
     50 
     51     def _repr_info(self):
     52         info = super()._repr_info()
     53         if self._ov is not None:
     54             state = 'pending' if self._ov.pending else 'completed'
     55             info.insert(1, 'overlapped=<%s, %#x>' % (state, self._ov.address))
     56         return info
     57 
     58     def _cancel_overlapped(self):
     59         if self._ov is None:
     60             return
     61         try:
     62             self._ov.cancel()
     63         except OSError as exc:
     64             context = {
     65                 'message': 'Cancelling an overlapped future failed',
     66                 'exception': exc,
     67                 'future': self,
     68             }
     69             if self._source_traceback:
     70                 context['source_traceback'] = self._source_traceback
     71             self._loop.call_exception_handler(context)
     72         self._ov = None
     73 
     74     def cancel(self):
     75         self._cancel_overlapped()
     76         return super().cancel()
     77 
     78     def set_exception(self, exception):
     79         super().set_exception(exception)
     80         self._cancel_overlapped()
     81 
     82     def set_result(self, result):
     83         super().set_result(result)
     84         self._ov = None
     85 
     86 
     87 class _BaseWaitHandleFuture(futures.Future):
     88     """Subclass of Future which represents a wait handle."""
     89 
     90     def __init__(self, ov, handle, wait_handle, *, loop=None):
     91         super().__init__(loop=loop)
     92         if self._source_traceback:
     93             del self._source_traceback[-1]
     94         # Keep a reference to the Overlapped object to keep it alive until the
     95         # wait is unregistered
     96         self._ov = ov
     97         self._handle = handle
     98         self._wait_handle = wait_handle
     99 
    100         # Should we call UnregisterWaitEx() if the wait completes
    101         # or is cancelled?
    102         self._registered = True
    103 
    104     def _poll(self):
    105         # non-blocking wait: use a timeout of 0 millisecond
    106         return (_winapi.WaitForSingleObject(self._handle, 0) ==
    107                 _winapi.WAIT_OBJECT_0)
    108 
    109     def _repr_info(self):
    110         info = super()._repr_info()
    111         info.append('handle=%#x' % self._handle)
    112         if self._handle is not None:
    113             state = 'signaled' if self._poll() else 'waiting'
    114             info.append(state)
    115         if self._wait_handle is not None:
    116             info.append('wait_handle=%#x' % self._wait_handle)
    117         return info
    118 
    119     def _unregister_wait_cb(self, fut):
    120         # The wait was unregistered: it's not safe to destroy the Overlapped
    121         # object
    122         self._ov = None
    123 
    124     def _unregister_wait(self):
    125         if not self._registered:
    126             return
    127         self._registered = False
    128 
    129         wait_handle = self._wait_handle
    130         self._wait_handle = None
    131         try:
    132             _overlapped.UnregisterWait(wait_handle)
    133         except OSError as exc:
    134             if exc.winerror != _overlapped.ERROR_IO_PENDING:
    135                 context = {
    136                     'message': 'Failed to unregister the wait handle',
    137                     'exception': exc,
    138                     'future': self,
    139                 }
    140                 if self._source_traceback:
    141                     context['source_traceback'] = self._source_traceback
    142                 self._loop.call_exception_handler(context)
    143                 return
    144             # ERROR_IO_PENDING means that the unregister is pending
    145 
    146         self._unregister_wait_cb(None)
    147 
    148     def cancel(self):
    149         self._unregister_wait()
    150         return super().cancel()
    151 
    152     def set_exception(self, exception):
    153         self._unregister_wait()
    154         super().set_exception(exception)
    155 
    156     def set_result(self, result):
    157         self._unregister_wait()
    158         super().set_result(result)
    159 
    160 
    161 class _WaitCancelFuture(_BaseWaitHandleFuture):
    162     """Subclass of Future which represents a wait for the cancellation of a
    163     _WaitHandleFuture using an event.
    164     """
    165 
    166     def __init__(self, ov, event, wait_handle, *, loop=None):
    167         super().__init__(ov, event, wait_handle, loop=loop)
    168 
    169         self._done_callback = None
    170 
    171     def cancel(self):
    172         raise RuntimeError("_WaitCancelFuture must not be cancelled")
    173 
    174     def set_result(self, result):
    175         super().set_result(result)
    176         if self._done_callback is not None:
    177             self._done_callback(self)
    178 
    179     def set_exception(self, exception):
    180         super().set_exception(exception)
    181         if self._done_callback is not None:
    182             self._done_callback(self)
    183 
    184 
    185 class _WaitHandleFuture(_BaseWaitHandleFuture):
    186     def __init__(self, ov, handle, wait_handle, proactor, *, loop=None):
    187         super().__init__(ov, handle, wait_handle, loop=loop)
    188         self._proactor = proactor
    189         self._unregister_proactor = True
    190         self._event = _overlapped.CreateEvent(None, True, False, None)
    191         self._event_fut = None
    192 
    193     def _unregister_wait_cb(self, fut):
    194         if self._event is not None:
    195             _winapi.CloseHandle(self._event)
    196             self._event = None
    197             self._event_fut = None
    198 
    199         # If the wait was cancelled, the wait may never be signalled, so
    200         # it's required to unregister it. Otherwise, IocpProactor.close() will
    201         # wait forever for an event which will never come.
    202         #
    203         # If the IocpProactor already received the event, it's safe to call
    204         # _unregister() because we kept a reference to the Overlapped object
    205         # which is used as a unique key.
    206         self._proactor._unregister(self._ov)
    207         self._proactor = None
    208 
    209         super()._unregister_wait_cb(fut)
    210 
    211     def _unregister_wait(self):
    212         if not self._registered:
    213             return
    214         self._registered = False
    215 
    216         wait_handle = self._wait_handle
    217         self._wait_handle = None
    218         try:
    219             _overlapped.UnregisterWaitEx(wait_handle, self._event)
    220         except OSError as exc:
    221             if exc.winerror != _overlapped.ERROR_IO_PENDING:
    222                 context = {
    223                     'message': 'Failed to unregister the wait handle',
    224                     'exception': exc,
    225                     'future': self,
    226                 }
    227                 if self._source_traceback:
    228                     context['source_traceback'] = self._source_traceback
    229                 self._loop.call_exception_handler(context)
    230                 return
    231             # ERROR_IO_PENDING is not an error, the wait was unregistered
    232 
    233         self._event_fut = self._proactor._wait_cancel(self._event,
    234                                                       self._unregister_wait_cb)
    235 
    236 
    237 class PipeServer(object):
    238     """Class representing a pipe server.
    239 
    240     This is much like a bound, listening socket.
    241     """
    242     def __init__(self, address):
    243         self._address = address
    244         self._free_instances = weakref.WeakSet()
    245         # initialize the pipe attribute before calling _server_pipe_handle()
    246         # because this function can raise an exception and the destructor calls
    247         # the close() method
    248         self._pipe = None
    249         self._accept_pipe_future = None
    250         self._pipe = self._server_pipe_handle(True)
    251 
    252     def _get_unconnected_pipe(self):
    253         # Create new instance and return previous one.  This ensures
    254         # that (until the server is closed) there is always at least
    255         # one pipe handle for address.  Therefore if a client attempt
    256         # to connect it will not fail with FileNotFoundError.
    257         tmp, self._pipe = self._pipe, self._server_pipe_handle(False)
    258         return tmp
    259 
    260     def _server_pipe_handle(self, first):
    261         # Return a wrapper for a new pipe handle.
    262         if self.closed():
    263             return None
    264         flags = _winapi.PIPE_ACCESS_DUPLEX | _winapi.FILE_FLAG_OVERLAPPED
    265         if first:
    266             flags |= _winapi.FILE_FLAG_FIRST_PIPE_INSTANCE
    267         h = _winapi.CreateNamedPipe(
    268             self._address, flags,
    269             _winapi.PIPE_TYPE_MESSAGE | _winapi.PIPE_READMODE_MESSAGE |
    270             _winapi.PIPE_WAIT,
    271             _winapi.PIPE_UNLIMITED_INSTANCES,
    272             windows_utils.BUFSIZE, windows_utils.BUFSIZE,
    273             _winapi.NMPWAIT_WAIT_FOREVER, _winapi.NULL)
    274         pipe = windows_utils.PipeHandle(h)
    275         self._free_instances.add(pipe)
    276         return pipe
    277 
    278     def closed(self):
    279         return (self._address is None)
    280 
    281     def close(self):
    282         if self._accept_pipe_future is not None:
    283             self._accept_pipe_future.cancel()
    284             self._accept_pipe_future = None
    285         # Close all instances which have not been connected to by a client.
    286         if self._address is not None:
    287             for pipe in self._free_instances:
    288                 pipe.close()
    289             self._pipe = None
    290             self._address = None
    291             self._free_instances.clear()
    292 
    293     __del__ = close
    294 
    295 
    296 class _WindowsSelectorEventLoop(selector_events.BaseSelectorEventLoop):
    297     """Windows version of selector event loop."""
    298 
    299     def _socketpair(self):
    300         return windows_utils.socketpair()
    301 
    302 
    303 class ProactorEventLoop(proactor_events.BaseProactorEventLoop):
    304     """Windows version of proactor event loop using IOCP."""
    305 
    306     def __init__(self, proactor=None):
    307         if proactor is None:
    308             proactor = IocpProactor()
    309         super().__init__(proactor)
    310 
    311     def _socketpair(self):
    312         return windows_utils.socketpair()
    313 
    314     @coroutine
    315     def create_pipe_connection(self, protocol_factory, address):
    316         f = self._proactor.connect_pipe(address)
    317         pipe = yield from f
    318         protocol = protocol_factory()
    319         trans = self._make_duplex_pipe_transport(pipe, protocol,
    320                                                  extra={'addr': address})
    321         return trans, protocol
    322 
    323     @coroutine
    324     def start_serving_pipe(self, protocol_factory, address):
    325         server = PipeServer(address)
    326 
    327         def loop_accept_pipe(f=None):
    328             pipe = None
    329             try:
    330                 if f:
    331                     pipe = f.result()
    332                     server._free_instances.discard(pipe)
    333 
    334                     if server.closed():
    335                         # A client connected before the server was closed:
    336                         # drop the client (close the pipe) and exit
    337                         pipe.close()
    338                         return
    339 
    340                     protocol = protocol_factory()
    341                     self._make_duplex_pipe_transport(
    342                         pipe, protocol, extra={'addr': address})
    343 
    344                 pipe = server._get_unconnected_pipe()
    345                 if pipe is None:
    346                     return
    347 
    348                 f = self._proactor.accept_pipe(pipe)
    349             except OSError as exc:
    350                 if pipe and pipe.fileno() != -1:
    351                     self.call_exception_handler({
    352                         'message': 'Pipe accept failed',
    353                         'exception': exc,
    354                         'pipe': pipe,
    355                     })
    356                     pipe.close()
    357                 elif self._debug:
    358                     logger.warning("Accept pipe failed on pipe %r",
    359                                    pipe, exc_info=True)
    360             except futures.CancelledError:
    361                 if pipe:
    362                     pipe.close()
    363             else:
    364                 server._accept_pipe_future = f
    365                 f.add_done_callback(loop_accept_pipe)
    366 
    367         self.call_soon(loop_accept_pipe)
    368         return [server]
    369 
    370     @coroutine
    371     def _make_subprocess_transport(self, protocol, args, shell,
    372                                    stdin, stdout, stderr, bufsize,
    373                                    extra=None, **kwargs):
    374         waiter = self.create_future()
    375         transp = _WindowsSubprocessTransport(self, protocol, args, shell,
    376                                              stdin, stdout, stderr, bufsize,
    377                                              waiter=waiter, extra=extra,
    378                                              **kwargs)
    379         try:
    380             yield from waiter
    381         except Exception as exc:
    382             # Workaround CPython bug #23353: using yield/yield-from in an
    383             # except block of a generator doesn't clear properly sys.exc_info()
    384             err = exc
    385         else:
    386             err = None
    387 
    388         if err is not None:
    389             transp.close()
    390             yield from transp._wait()
    391             raise err
    392 
    393         return transp
    394 
    395 
    396 class IocpProactor:
    397     """Proactor implementation using IOCP."""
    398 
    399     def __init__(self, concurrency=0xffffffff):
    400         self._loop = None
    401         self._results = []
    402         self._iocp = _overlapped.CreateIoCompletionPort(
    403             _overlapped.INVALID_HANDLE_VALUE, NULL, 0, concurrency)
    404         self._cache = {}
    405         self._registered = weakref.WeakSet()
    406         self._unregistered = []
    407         self._stopped_serving = weakref.WeakSet()
    408 
    409     def __repr__(self):
    410         return ('<%s overlapped#=%s result#=%s>'
    411                 % (self.__class__.__name__, len(self._cache),
    412                    len(self._results)))
    413 
    414     def set_loop(self, loop):
    415         self._loop = loop
    416 
    417     def select(self, timeout=None):
    418         if not self._results:
    419             self._poll(timeout)
    420         tmp = self._results
    421         self._results = []
    422         return tmp
    423 
    424     def _result(self, value):
    425         fut = self._loop.create_future()
    426         fut.set_result(value)
    427         return fut
    428 
    429     def recv(self, conn, nbytes, flags=0):
    430         self._register_with_iocp(conn)
    431         ov = _overlapped.Overlapped(NULL)
    432         try:
    433             if isinstance(conn, socket.socket):
    434                 ov.WSARecv(conn.fileno(), nbytes, flags)
    435             else:
    436                 ov.ReadFile(conn.fileno(), nbytes)
    437         except BrokenPipeError:
    438             return self._result(b'')
    439 
    440         def finish_recv(trans, key, ov):
    441             try:
    442                 return ov.getresult()
    443             except OSError as exc:
    444                 if exc.winerror == _overlapped.ERROR_NETNAME_DELETED:
    445                     raise ConnectionResetError(*exc.args)
    446                 else:
    447                     raise
    448 
    449         return self._register(ov, conn, finish_recv)
    450 
    451     def send(self, conn, buf, flags=0):
    452         self._register_with_iocp(conn)
    453         ov = _overlapped.Overlapped(NULL)
    454         if isinstance(conn, socket.socket):
    455             ov.WSASend(conn.fileno(), buf, flags)
    456         else:
    457             ov.WriteFile(conn.fileno(), buf)
    458 
    459         def finish_send(trans, key, ov):
    460             try:
    461                 return ov.getresult()
    462             except OSError as exc:
    463                 if exc.winerror == _overlapped.ERROR_NETNAME_DELETED:
    464                     raise ConnectionResetError(*exc.args)
    465                 else:
    466                     raise
    467 
    468         return self._register(ov, conn, finish_send)
    469 
    470     def accept(self, listener):
    471         self._register_with_iocp(listener)
    472         conn = self._get_accept_socket(listener.family)
    473         ov = _overlapped.Overlapped(NULL)
    474         ov.AcceptEx(listener.fileno(), conn.fileno())
    475 
    476         def finish_accept(trans, key, ov):
    477             ov.getresult()
    478             # Use SO_UPDATE_ACCEPT_CONTEXT so getsockname() etc work.
    479             buf = struct.pack('@P', listener.fileno())
    480             conn.setsockopt(socket.SOL_SOCKET,
    481                             _overlapped.SO_UPDATE_ACCEPT_CONTEXT, buf)
    482             conn.settimeout(listener.gettimeout())
    483             return conn, conn.getpeername()
    484 
    485         @coroutine
    486         def accept_coro(future, conn):
    487             # Coroutine closing the accept socket if the future is cancelled
    488             try:
    489                 yield from future
    490             except futures.CancelledError:
    491                 conn.close()
    492                 raise
    493 
    494         future = self._register(ov, listener, finish_accept)
    495         coro = accept_coro(future, conn)
    496         tasks.ensure_future(coro, loop=self._loop)
    497         return future
    498 
    499     def connect(self, conn, address):
    500         self._register_with_iocp(conn)
    501         # The socket needs to be locally bound before we call ConnectEx().
    502         try:
    503             _overlapped.BindLocal(conn.fileno(), conn.family)
    504         except OSError as e:
    505             if e.winerror != errno.WSAEINVAL:
    506                 raise
    507             # Probably already locally bound; check using getsockname().
    508             if conn.getsockname()[1] == 0:
    509                 raise
    510         ov = _overlapped.Overlapped(NULL)
    511         ov.ConnectEx(conn.fileno(), address)
    512 
    513         def finish_connect(trans, key, ov):
    514             ov.getresult()
    515             # Use SO_UPDATE_CONNECT_CONTEXT so getsockname() etc work.
    516             conn.setsockopt(socket.SOL_SOCKET,
    517                             _overlapped.SO_UPDATE_CONNECT_CONTEXT, 0)
    518             return conn
    519 
    520         return self._register(ov, conn, finish_connect)
    521 
    522     def accept_pipe(self, pipe):
    523         self._register_with_iocp(pipe)
    524         ov = _overlapped.Overlapped(NULL)
    525         connected = ov.ConnectNamedPipe(pipe.fileno())
    526 
    527         if connected:
    528             # ConnectNamePipe() failed with ERROR_PIPE_CONNECTED which means
    529             # that the pipe is connected. There is no need to wait for the
    530             # completion of the connection.
    531             return self._result(pipe)
    532 
    533         def finish_accept_pipe(trans, key, ov):
    534             ov.getresult()
    535             return pipe
    536 
    537         return self._register(ov, pipe, finish_accept_pipe)
    538 
    539     @coroutine
    540     def connect_pipe(self, address):
    541         delay = CONNECT_PIPE_INIT_DELAY
    542         while True:
    543             # Unfortunately there is no way to do an overlapped connect to a pipe.
    544             # Call CreateFile() in a loop until it doesn't fail with
    545             # ERROR_PIPE_BUSY
    546             try:
    547                 handle = _overlapped.ConnectPipe(address)
    548                 break
    549             except OSError as exc:
    550                 if exc.winerror != _overlapped.ERROR_PIPE_BUSY:
    551                     raise
    552 
    553             # ConnectPipe() failed with ERROR_PIPE_BUSY: retry later
    554             delay = min(delay * 2, CONNECT_PIPE_MAX_DELAY)
    555             yield from tasks.sleep(delay, loop=self._loop)
    556 
    557         return windows_utils.PipeHandle(handle)
    558 
    559     def wait_for_handle(self, handle, timeout=None):
    560         """Wait for a handle.
    561 
    562         Return a Future object. The result of the future is True if the wait
    563         completed, or False if the wait did not complete (on timeout).
    564         """
    565         return self._wait_for_handle(handle, timeout, False)
    566 
    567     def _wait_cancel(self, event, done_callback):
    568         fut = self._wait_for_handle(event, None, True)
    569         # add_done_callback() cannot be used because the wait may only complete
    570         # in IocpProactor.close(), while the event loop is not running.
    571         fut._done_callback = done_callback
    572         return fut
    573 
    574     def _wait_for_handle(self, handle, timeout, _is_cancel):
    575         if timeout is None:
    576             ms = _winapi.INFINITE
    577         else:
    578             # RegisterWaitForSingleObject() has a resolution of 1 millisecond,
    579             # round away from zero to wait *at least* timeout seconds.
    580             ms = math.ceil(timeout * 1e3)
    581 
    582         # We only create ov so we can use ov.address as a key for the cache.
    583         ov = _overlapped.Overlapped(NULL)
    584         wait_handle = _overlapped.RegisterWaitWithQueue(
    585             handle, self._iocp, ov.address, ms)
    586         if _is_cancel:
    587             f = _WaitCancelFuture(ov, handle, wait_handle, loop=self._loop)
    588         else:
    589             f = _WaitHandleFuture(ov, handle, wait_handle, self,
    590                                   loop=self._loop)
    591         if f._source_traceback:
    592             del f._source_traceback[-1]
    593 
    594         def finish_wait_for_handle(trans, key, ov):
    595             # Note that this second wait means that we should only use
    596             # this with handles types where a successful wait has no
    597             # effect.  So events or processes are all right, but locks
    598             # or semaphores are not.  Also note if the handle is
    599             # signalled and then quickly reset, then we may return
    600             # False even though we have not timed out.
    601             return f._poll()
    602 
    603         self._cache[ov.address] = (f, ov, 0, finish_wait_for_handle)
    604         return f
    605 
    606     def _register_with_iocp(self, obj):
    607         # To get notifications of finished ops on this objects sent to the
    608         # completion port, were must register the handle.
    609         if obj not in self._registered:
    610             self._registered.add(obj)
    611             _overlapped.CreateIoCompletionPort(obj.fileno(), self._iocp, 0, 0)
    612             # XXX We could also use SetFileCompletionNotificationModes()
    613             # to avoid sending notifications to completion port of ops
    614             # that succeed immediately.
    615 
    616     def _register(self, ov, obj, callback):
    617         # Return a future which will be set with the result of the
    618         # operation when it completes.  The future's value is actually
    619         # the value returned by callback().
    620         f = _OverlappedFuture(ov, loop=self._loop)
    621         if f._source_traceback:
    622             del f._source_traceback[-1]
    623         if not ov.pending:
    624             # The operation has completed, so no need to postpone the
    625             # work.  We cannot take this short cut if we need the
    626             # NumberOfBytes, CompletionKey values returned by
    627             # PostQueuedCompletionStatus().
    628             try:
    629                 value = callback(None, None, ov)
    630             except OSError as e:
    631                 f.set_exception(e)
    632             else:
    633                 f.set_result(value)
    634             # Even if GetOverlappedResult() was called, we have to wait for the
    635             # notification of the completion in GetQueuedCompletionStatus().
    636             # Register the overlapped operation to keep a reference to the
    637             # OVERLAPPED object, otherwise the memory is freed and Windows may
    638             # read uninitialized memory.
    639 
    640         # Register the overlapped operation for later.  Note that
    641         # we only store obj to prevent it from being garbage
    642         # collected too early.
    643         self._cache[ov.address] = (f, ov, obj, callback)
    644         return f
    645 
    646     def _unregister(self, ov):
    647         """Unregister an overlapped object.
    648 
    649         Call this method when its future has been cancelled. The event can
    650         already be signalled (pending in the proactor event queue). It is also
    651         safe if the event is never signalled (because it was cancelled).
    652         """
    653         self._unregistered.append(ov)
    654 
    655     def _get_accept_socket(self, family):
    656         s = socket.socket(family)
    657         s.settimeout(0)
    658         return s
    659 
    660     def _poll(self, timeout=None):
    661         if timeout is None:
    662             ms = INFINITE
    663         elif timeout < 0:
    664             raise ValueError("negative timeout")
    665         else:
    666             # GetQueuedCompletionStatus() has a resolution of 1 millisecond,
    667             # round away from zero to wait *at least* timeout seconds.
    668             ms = math.ceil(timeout * 1e3)
    669             if ms >= INFINITE:
    670                 raise ValueError("timeout too big")
    671 
    672         while True:
    673             status = _overlapped.GetQueuedCompletionStatus(self._iocp, ms)
    674             if status is None:
    675                 break
    676             ms = 0
    677 
    678             err, transferred, key, address = status
    679             try:
    680                 f, ov, obj, callback = self._cache.pop(address)
    681             except KeyError:
    682                 if self._loop.get_debug():
    683                     self._loop.call_exception_handler({
    684                         'message': ('GetQueuedCompletionStatus() returned an '
    685                                     'unexpected event'),
    686                         'status': ('err=%s transferred=%s key=%#x address=%#x'
    687                                    % (err, transferred, key, address)),
    688                     })
    689 
    690                 # key is either zero, or it is used to return a pipe
    691                 # handle which should be closed to avoid a leak.
    692                 if key not in (0, _overlapped.INVALID_HANDLE_VALUE):
    693                     _winapi.CloseHandle(key)
    694                 continue
    695 
    696             if obj in self._stopped_serving:
    697                 f.cancel()
    698             # Don't call the callback if _register() already read the result or
    699             # if the overlapped has been cancelled
    700             elif not f.done():
    701                 try:
    702                     value = callback(transferred, key, ov)
    703                 except OSError as e:
    704                     f.set_exception(e)
    705                     self._results.append(f)
    706                 else:
    707                     f.set_result(value)
    708                     self._results.append(f)
    709 
    710         # Remove unregisted futures
    711         for ov in self._unregistered:
    712             self._cache.pop(ov.address, None)
    713         self._unregistered.clear()
    714 
    715     def _stop_serving(self, obj):
    716         # obj is a socket or pipe handle.  It will be closed in
    717         # BaseProactorEventLoop._stop_serving() which will make any
    718         # pending operations fail quickly.
    719         self._stopped_serving.add(obj)
    720 
    721     def close(self):
    722         # Cancel remaining registered operations.
    723         for address, (fut, ov, obj, callback) in list(self._cache.items()):
    724             if fut.cancelled():
    725                 # Nothing to do with cancelled futures
    726                 pass
    727             elif isinstance(fut, _WaitCancelFuture):
    728                 # _WaitCancelFuture must not be cancelled
    729                 pass
    730             else:
    731                 try:
    732                     fut.cancel()
    733                 except OSError as exc:
    734                     if self._loop is not None:
    735                         context = {
    736                             'message': 'Cancelling a future failed',
    737                             'exception': exc,
    738                             'future': fut,
    739                         }
    740                         if fut._source_traceback:
    741                             context['source_traceback'] = fut._source_traceback
    742                         self._loop.call_exception_handler(context)
    743 
    744         while self._cache:
    745             if not self._poll(1):
    746                 logger.debug('taking long time to close proactor')
    747 
    748         self._results = []
    749         if self._iocp is not None:
    750             _winapi.CloseHandle(self._iocp)
    751             self._iocp = None
    752 
    753     def __del__(self):
    754         self.close()
    755 
    756 
    757 class _WindowsSubprocessTransport(base_subprocess.BaseSubprocessTransport):
    758 
    759     def _start(self, args, shell, stdin, stdout, stderr, bufsize, **kwargs):
    760         self._proc = windows_utils.Popen(
    761             args, shell=shell, stdin=stdin, stdout=stdout, stderr=stderr,
    762             bufsize=bufsize, **kwargs)
    763 
    764         def callback(f):
    765             returncode = self._proc.poll()
    766             self._process_exited(returncode)
    767 
    768         f = self._loop._proactor.wait_for_handle(int(self._proc._handle))
    769         f.add_done_callback(callback)
    770 
    771 
    772 SelectorEventLoop = _WindowsSelectorEventLoop
    773 
    774 
    775 class _WindowsDefaultEventLoopPolicy(events.BaseDefaultEventLoopPolicy):
    776     _loop_factory = SelectorEventLoop
    777 
    778 
    779 DefaultEventLoopPolicy = _WindowsDefaultEventLoopPolicy
    780