Home | History | Annotate | Download | only in Lib
      1 """Generic socket server classes.
      2 
      3 This module tries to capture the various aspects of defining a server:
      4 
      5 For socket-based servers:
      6 
      7 - address family:
      8         - AF_INET{,6}: IP (Internet Protocol) sockets (default)
      9         - AF_UNIX: Unix domain sockets
     10         - others, e.g. AF_DECNET are conceivable (see <socket.h>
     11 - socket type:
     12         - SOCK_STREAM (reliable stream, e.g. TCP)
     13         - SOCK_DGRAM (datagrams, e.g. UDP)
     14 
     15 For request-based servers (including socket-based):
     16 
     17 - client address verification before further looking at the request
     18         (This is actually a hook for any processing that needs to look
     19          at the request before anything else, e.g. logging)
     20 - how to handle multiple requests:
     21         - synchronous (one request is handled at a time)
     22         - forking (each request is handled by a new process)
     23         - threading (each request is handled by a new thread)
     24 
     25 The classes in this module favor the server type that is simplest to
     26 write: a synchronous TCP/IP server.  This is bad class design, but
     27 save some typing.  (There's also the issue that a deep class hierarchy
     28 slows down method lookups.)
     29 
     30 There are five classes in an inheritance diagram, four of which represent
     31 synchronous servers of four types:
     32 
     33         +------------+
     34         | BaseServer |
     35         +------------+
     36               |
     37               v
     38         +-----------+        +------------------+
     39         | TCPServer |------->| UnixStreamServer |
     40         +-----------+        +------------------+
     41               |
     42               v
     43         +-----------+        +--------------------+
     44         | UDPServer |------->| UnixDatagramServer |
     45         +-----------+        +--------------------+
     46 
     47 Note that UnixDatagramServer derives from UDPServer, not from
     48 UnixStreamServer -- the only difference between an IP and a Unix
     49 stream server is the address family, which is simply repeated in both
     50 unix server classes.
     51 
     52 Forking and threading versions of each type of server can be created
     53 using the ForkingMixIn and ThreadingMixIn mix-in classes.  For
     54 instance, a threading UDP server class is created as follows:
     55 
     56         class ThreadingUDPServer(ThreadingMixIn, UDPServer): pass
     57 
     58 The Mix-in class must come first, since it overrides a method defined
     59 in UDPServer! Setting the various member variables also changes
     60 the behavior of the underlying server mechanism.
     61 
     62 To implement a service, you must derive a class from
     63 BaseRequestHandler and redefine its handle() method.  You can then run
     64 various versions of the service by combining one of the server classes
     65 with your request handler class.
     66 
     67 The request handler class must be different for datagram or stream
     68 services.  This can be hidden by using the request handler
     69 subclasses StreamRequestHandler or DatagramRequestHandler.
     70 
     71 Of course, you still have to use your head!
     72 
     73 For instance, it makes no sense to use a forking server if the service
     74 contains state in memory that can be modified by requests (since the
     75 modifications in the child process would never reach the initial state
     76 kept in the parent process and passed to each child).  In this case,
     77 you can use a threading server, but you will probably have to use
     78 locks to avoid two requests that come in nearly simultaneous to apply
     79 conflicting changes to the server state.
     80 
     81 On the other hand, if you are building e.g. an HTTP server, where all
     82 data is stored externally (e.g. in the file system), a synchronous
     83 class will essentially render the service "deaf" while one request is
     84 being handled -- which may be for a very long time if a client is slow
     85 to read all the data it has requested.  Here a threading or forking
     86 server is appropriate.
     87 
     88 In some cases, it may be appropriate to process part of a request
     89 synchronously, but to finish processing in a forked child depending on
     90 the request data.  This can be implemented by using a synchronous
     91 server and doing an explicit fork in the request handler class
     92 handle() method.
     93 
     94 Another approach to handling multiple simultaneous requests in an
     95 environment that supports neither threads nor fork (or where these are
     96 too expensive or inappropriate for the service) is to maintain an
     97 explicit table of partially finished requests and to use a selector to
     98 decide which request to work on next (or whether to handle a new
     99 incoming request).  This is particularly important for stream services
    100 where each client can potentially be connected for a long time (if
    101 threads or subprocesses cannot be used).
    102 
    103 Future work:
    104 - Standard classes for Sun RPC (which uses either UDP or TCP)
    105 - Standard mix-in classes to implement various authentication
    106   and encryption schemes
    107 
    108 XXX Open problems:
    109 - What to do with out-of-band data?
    110 
    111 BaseServer:
    112 - split generic "request" functionality out into BaseServer class.
    113   Copyright (C) 2000  Luke Kenneth Casson Leighton <lkcl (at] samba.org>
    114 
    115   example: read entries from a SQL database (requires overriding
    116   get_request() to return a table entry from the database).
    117   entry is processed by a RequestHandlerClass.
    118 
    119 """
    120 
    121 # Author of the BaseServer patch: Luke Kenneth Casson Leighton
    122 
    123 __version__ = "0.4"
    124 
    125 
    126 import socket
    127 import selectors
    128 import os
    129 import sys
    130 import threading
    131 from io import BufferedIOBase
    132 from time import monotonic as time
    133 
    134 __all__ = ["BaseServer", "TCPServer", "UDPServer",
    135            "ThreadingUDPServer", "ThreadingTCPServer",
    136            "BaseRequestHandler", "StreamRequestHandler",
    137            "DatagramRequestHandler", "ThreadingMixIn"]
    138 if hasattr(os, "fork"):
    139     __all__.extend(["ForkingUDPServer","ForkingTCPServer", "ForkingMixIn"])
    140 if hasattr(socket, "AF_UNIX"):
    141     __all__.extend(["UnixStreamServer","UnixDatagramServer",
    142                     "ThreadingUnixStreamServer",
    143                     "ThreadingUnixDatagramServer"])
    144 
    145 # poll/select have the advantage of not requiring any extra file descriptor,
    146 # contrarily to epoll/kqueue (also, they require a single syscall).
    147 if hasattr(selectors, 'PollSelector'):
    148     _ServerSelector = selectors.PollSelector
    149 else:
    150     _ServerSelector = selectors.SelectSelector
    151 
    152 
    153 class BaseServer:
    154 
    155     """Base class for server classes.
    156 
    157     Methods for the caller:
    158 
    159     - __init__(server_address, RequestHandlerClass)
    160     - serve_forever(poll_interval=0.5)
    161     - shutdown()
    162     - handle_request()  # if you do not use serve_forever()
    163     - fileno() -> int   # for selector
    164 
    165     Methods that may be overridden:
    166 
    167     - server_bind()
    168     - server_activate()
    169     - get_request() -> request, client_address
    170     - handle_timeout()
    171     - verify_request(request, client_address)
    172     - server_close()
    173     - process_request(request, client_address)
    174     - shutdown_request(request)
    175     - close_request(request)
    176     - service_actions()
    177     - handle_error()
    178 
    179     Methods for derived classes:
    180 
    181     - finish_request(request, client_address)
    182 
    183     Class variables that may be overridden by derived classes or
    184     instances:
    185 
    186     - timeout
    187     - address_family
    188     - socket_type
    189     - allow_reuse_address
    190 
    191     Instance variables:
    192 
    193     - RequestHandlerClass
    194     - socket
    195 
    196     """
    197 
    198     timeout = None
    199 
    200     def __init__(self, server_address, RequestHandlerClass):
    201         """Constructor.  May be extended, do not override."""
    202         self.server_address = server_address
    203         self.RequestHandlerClass = RequestHandlerClass
    204         self.__is_shut_down = threading.Event()
    205         self.__shutdown_request = False
    206 
    207     def server_activate(self):
    208         """Called by constructor to activate the server.
    209 
    210         May be overridden.
    211 
    212         """
    213         pass
    214 
    215     def serve_forever(self, poll_interval=0.5):
    216         """Handle one request at a time until shutdown.
    217 
    218         Polls for shutdown every poll_interval seconds. Ignores
    219         self.timeout. If you need to do periodic tasks, do them in
    220         another thread.
    221         """
    222         self.__is_shut_down.clear()
    223         try:
    224             # XXX: Consider using another file descriptor or connecting to the
    225             # socket to wake this up instead of polling. Polling reduces our
    226             # responsiveness to a shutdown request and wastes cpu at all other
    227             # times.
    228             with _ServerSelector() as selector:
    229                 selector.register(self, selectors.EVENT_READ)
    230 
    231                 while not self.__shutdown_request:
    232                     ready = selector.select(poll_interval)
    233                     # bpo-35017: shutdown() called during select(), exit immediately.
    234                     if self.__shutdown_request:
    235                         break
    236                     if ready:
    237                         self._handle_request_noblock()
    238 
    239                     self.service_actions()
    240         finally:
    241             self.__shutdown_request = False
    242             self.__is_shut_down.set()
    243 
    244     def shutdown(self):
    245         """Stops the serve_forever loop.
    246 
    247         Blocks until the loop has finished. This must be called while
    248         serve_forever() is running in another thread, or it will
    249         deadlock.
    250         """
    251         self.__shutdown_request = True
    252         self.__is_shut_down.wait()
    253 
    254     def service_actions(self):
    255         """Called by the serve_forever() loop.
    256 
    257         May be overridden by a subclass / Mixin to implement any code that
    258         needs to be run during the loop.
    259         """
    260         pass
    261 
    262     # The distinction between handling, getting, processing and finishing a
    263     # request is fairly arbitrary.  Remember:
    264     #
    265     # - handle_request() is the top-level call.  It calls selector.select(),
    266     #   get_request(), verify_request() and process_request()
    267     # - get_request() is different for stream or datagram sockets
    268     # - process_request() is the place that may fork a new process or create a
    269     #   new thread to finish the request
    270     # - finish_request() instantiates the request handler class; this
    271     #   constructor will handle the request all by itself
    272 
    273     def handle_request(self):
    274         """Handle one request, possibly blocking.
    275 
    276         Respects self.timeout.
    277         """
    278         # Support people who used socket.settimeout() to escape
    279         # handle_request before self.timeout was available.
    280         timeout = self.socket.gettimeout()
    281         if timeout is None:
    282             timeout = self.timeout
    283         elif self.timeout is not None:
    284             timeout = min(timeout, self.timeout)
    285         if timeout is not None:
    286             deadline = time() + timeout
    287 
    288         # Wait until a request arrives or the timeout expires - the loop is
    289         # necessary to accommodate early wakeups due to EINTR.
    290         with _ServerSelector() as selector:
    291             selector.register(self, selectors.EVENT_READ)
    292 
    293             while True:
    294                 ready = selector.select(timeout)
    295                 if ready:
    296                     return self._handle_request_noblock()
    297                 else:
    298                     if timeout is not None:
    299                         timeout = deadline - time()
    300                         if timeout < 0:
    301                             return self.handle_timeout()
    302 
    303     def _handle_request_noblock(self):
    304         """Handle one request, without blocking.
    305 
    306         I assume that selector.select() has returned that the socket is
    307         readable before this function was called, so there should be no risk of
    308         blocking in get_request().
    309         """
    310         try:
    311             request, client_address = self.get_request()
    312         except OSError:
    313             return
    314         if self.verify_request(request, client_address):
    315             try:
    316                 self.process_request(request, client_address)
    317             except Exception:
    318                 self.handle_error(request, client_address)
    319                 self.shutdown_request(request)
    320             except:
    321                 self.shutdown_request(request)
    322                 raise
    323         else:
    324             self.shutdown_request(request)
    325 
    326     def handle_timeout(self):
    327         """Called if no new request arrives within self.timeout.
    328 
    329         Overridden by ForkingMixIn.
    330         """
    331         pass
    332 
    333     def verify_request(self, request, client_address):
    334         """Verify the request.  May be overridden.
    335 
    336         Return True if we should proceed with this request.
    337 
    338         """
    339         return True
    340 
    341     def process_request(self, request, client_address):
    342         """Call finish_request.
    343 
    344         Overridden by ForkingMixIn and ThreadingMixIn.
    345 
    346         """
    347         self.finish_request(request, client_address)
    348         self.shutdown_request(request)
    349 
    350     def server_close(self):
    351         """Called to clean-up the server.
    352 
    353         May be overridden.
    354 
    355         """
    356         pass
    357 
    358     def finish_request(self, request, client_address):
    359         """Finish one request by instantiating RequestHandlerClass."""
    360         self.RequestHandlerClass(request, client_address, self)
    361 
    362     def shutdown_request(self, request):
    363         """Called to shutdown and close an individual request."""
    364         self.close_request(request)
    365 
    366     def close_request(self, request):
    367         """Called to clean up an individual request."""
    368         pass
    369 
    370     def handle_error(self, request, client_address):
    371         """Handle an error gracefully.  May be overridden.
    372 
    373         The default is to print a traceback and continue.
    374 
    375         """
    376         print('-'*40, file=sys.stderr)
    377         print('Exception happened during processing of request from',
    378             client_address, file=sys.stderr)
    379         import traceback
    380         traceback.print_exc()
    381         print('-'*40, file=sys.stderr)
    382 
    383     def __enter__(self):
    384         return self
    385 
    386     def __exit__(self, *args):
    387         self.server_close()
    388 
    389 
    390 class TCPServer(BaseServer):
    391 
    392     """Base class for various socket-based server classes.
    393 
    394     Defaults to synchronous IP stream (i.e., TCP).
    395 
    396     Methods for the caller:
    397 
    398     - __init__(server_address, RequestHandlerClass, bind_and_activate=True)
    399     - serve_forever(poll_interval=0.5)
    400     - shutdown()
    401     - handle_request()  # if you don't use serve_forever()
    402     - fileno() -> int   # for selector
    403 
    404     Methods that may be overridden:
    405 
    406     - server_bind()
    407     - server_activate()
    408     - get_request() -> request, client_address
    409     - handle_timeout()
    410     - verify_request(request, client_address)
    411     - process_request(request, client_address)
    412     - shutdown_request(request)
    413     - close_request(request)
    414     - handle_error()
    415 
    416     Methods for derived classes:
    417 
    418     - finish_request(request, client_address)
    419 
    420     Class variables that may be overridden by derived classes or
    421     instances:
    422 
    423     - timeout
    424     - address_family
    425     - socket_type
    426     - request_queue_size (only for stream sockets)
    427     - allow_reuse_address
    428 
    429     Instance variables:
    430 
    431     - server_address
    432     - RequestHandlerClass
    433     - socket
    434 
    435     """
    436 
    437     address_family = socket.AF_INET
    438 
    439     socket_type = socket.SOCK_STREAM
    440 
    441     request_queue_size = 5
    442 
    443     allow_reuse_address = False
    444 
    445     def __init__(self, server_address, RequestHandlerClass, bind_and_activate=True):
    446         """Constructor.  May be extended, do not override."""
    447         BaseServer.__init__(self, server_address, RequestHandlerClass)
    448         self.socket = socket.socket(self.address_family,
    449                                     self.socket_type)
    450         if bind_and_activate:
    451             try:
    452                 self.server_bind()
    453                 self.server_activate()
    454             except:
    455                 self.server_close()
    456                 raise
    457 
    458     def server_bind(self):
    459         """Called by constructor to bind the socket.
    460 
    461         May be overridden.
    462 
    463         """
    464         if self.allow_reuse_address:
    465             self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    466         self.socket.bind(self.server_address)
    467         self.server_address = self.socket.getsockname()
    468 
    469     def server_activate(self):
    470         """Called by constructor to activate the server.
    471 
    472         May be overridden.
    473 
    474         """
    475         self.socket.listen(self.request_queue_size)
    476 
    477     def server_close(self):
    478         """Called to clean-up the server.
    479 
    480         May be overridden.
    481 
    482         """
    483         self.socket.close()
    484 
    485     def fileno(self):
    486         """Return socket file number.
    487 
    488         Interface required by selector.
    489 
    490         """
    491         return self.socket.fileno()
    492 
    493     def get_request(self):
    494         """Get the request and client address from the socket.
    495 
    496         May be overridden.
    497 
    498         """
    499         return self.socket.accept()
    500 
    501     def shutdown_request(self, request):
    502         """Called to shutdown and close an individual request."""
    503         try:
    504             #explicitly shutdown.  socket.close() merely releases
    505             #the socket and waits for GC to perform the actual close.
    506             request.shutdown(socket.SHUT_WR)
    507         except OSError:
    508             pass #some platforms may raise ENOTCONN here
    509         self.close_request(request)
    510 
    511     def close_request(self, request):
    512         """Called to clean up an individual request."""
    513         request.close()
    514 
    515 
    516 class UDPServer(TCPServer):
    517 
    518     """UDP server class."""
    519 
    520     allow_reuse_address = False
    521 
    522     socket_type = socket.SOCK_DGRAM
    523 
    524     max_packet_size = 8192
    525 
    526     def get_request(self):
    527         data, client_addr = self.socket.recvfrom(self.max_packet_size)
    528         return (data, self.socket), client_addr
    529 
    530     def server_activate(self):
    531         # No need to call listen() for UDP.
    532         pass
    533 
    534     def shutdown_request(self, request):
    535         # No need to shutdown anything.
    536         self.close_request(request)
    537 
    538     def close_request(self, request):
    539         # No need to close anything.
    540         pass
    541 
    542 if hasattr(os, "fork"):
    543     class ForkingMixIn:
    544         """Mix-in class to handle each request in a new process."""
    545 
    546         timeout = 300
    547         active_children = None
    548         max_children = 40
    549         # If true, server_close() waits until all child processes complete.
    550         block_on_close = True
    551 
    552         def collect_children(self, *, blocking=False):
    553             """Internal routine to wait for children that have exited."""
    554             if self.active_children is None:
    555                 return
    556 
    557             # If we're above the max number of children, wait and reap them until
    558             # we go back below threshold. Note that we use waitpid(-1) below to be
    559             # able to collect children in size(<defunct children>) syscalls instead
    560             # of size(<children>): the downside is that this might reap children
    561             # which we didn't spawn, which is why we only resort to this when we're
    562             # above max_children.
    563             while len(self.active_children) >= self.max_children:
    564                 try:
    565                     pid, _ = os.waitpid(-1, 0)
    566                     self.active_children.discard(pid)
    567                 except ChildProcessError:
    568                     # we don't have any children, we're done
    569                     self.active_children.clear()
    570                 except OSError:
    571                     break
    572 
    573             # Now reap all defunct children.
    574             for pid in self.active_children.copy():
    575                 try:
    576                     flags = 0 if blocking else os.WNOHANG
    577                     pid, _ = os.waitpid(pid, flags)
    578                     # if the child hasn't exited yet, pid will be 0 and ignored by
    579                     # discard() below
    580                     self.active_children.discard(pid)
    581                 except ChildProcessError:
    582                     # someone else reaped it
    583                     self.active_children.discard(pid)
    584                 except OSError:
    585                     pass
    586 
    587         def handle_timeout(self):
    588             """Wait for zombies after self.timeout seconds of inactivity.
    589 
    590             May be extended, do not override.
    591             """
    592             self.collect_children()
    593 
    594         def service_actions(self):
    595             """Collect the zombie child processes regularly in the ForkingMixIn.
    596 
    597             service_actions is called in the BaseServer's serve_forever loop.
    598             """
    599             self.collect_children()
    600 
    601         def process_request(self, request, client_address):
    602             """Fork a new subprocess to process the request."""
    603             pid = os.fork()
    604             if pid:
    605                 # Parent process
    606                 if self.active_children is None:
    607                     self.active_children = set()
    608                 self.active_children.add(pid)
    609                 self.close_request(request)
    610                 return
    611             else:
    612                 # Child process.
    613                 # This must never return, hence os._exit()!
    614                 status = 1
    615                 try:
    616                     self.finish_request(request, client_address)
    617                     status = 0
    618                 except Exception:
    619                     self.handle_error(request, client_address)
    620                 finally:
    621                     try:
    622                         self.shutdown_request(request)
    623                     finally:
    624                         os._exit(status)
    625 
    626         def server_close(self):
    627             super().server_close()
    628             self.collect_children(blocking=self.block_on_close)
    629 
    630 
    631 class ThreadingMixIn:
    632     """Mix-in class to handle each request in a new thread."""
    633 
    634     # Decides how threads will act upon termination of the
    635     # main process
    636     daemon_threads = False
    637     # If true, server_close() waits until all non-daemonic threads terminate.
    638     block_on_close = True
    639     # For non-daemonic threads, list of threading.Threading objects
    640     # used by server_close() to wait for all threads completion.
    641     _threads = None
    642 
    643     def process_request_thread(self, request, client_address):
    644         """Same as in BaseServer but as a thread.
    645 
    646         In addition, exception handling is done here.
    647 
    648         """
    649         try:
    650             self.finish_request(request, client_address)
    651         except Exception:
    652             self.handle_error(request, client_address)
    653         finally:
    654             self.shutdown_request(request)
    655 
    656     def process_request(self, request, client_address):
    657         """Start a new thread to process the request."""
    658         t = threading.Thread(target = self.process_request_thread,
    659                              args = (request, client_address))
    660         t.daemon = self.daemon_threads
    661         if not t.daemon and self.block_on_close:
    662             if self._threads is None:
    663                 self._threads = []
    664             self._threads.append(t)
    665         t.start()
    666 
    667     def server_close(self):
    668         super().server_close()
    669         if self.block_on_close:
    670             threads = self._threads
    671             self._threads = None
    672             if threads:
    673                 for thread in threads:
    674                     thread.join()
    675 
    676 
    677 if hasattr(os, "fork"):
    678     class ForkingUDPServer(ForkingMixIn, UDPServer): pass
    679     class ForkingTCPServer(ForkingMixIn, TCPServer): pass
    680 
    681 class ThreadingUDPServer(ThreadingMixIn, UDPServer): pass
    682 class ThreadingTCPServer(ThreadingMixIn, TCPServer): pass
    683 
    684 if hasattr(socket, 'AF_UNIX'):
    685 
    686     class UnixStreamServer(TCPServer):
    687         address_family = socket.AF_UNIX
    688 
    689     class UnixDatagramServer(UDPServer):
    690         address_family = socket.AF_UNIX
    691 
    692     class ThreadingUnixStreamServer(ThreadingMixIn, UnixStreamServer): pass
    693 
    694     class ThreadingUnixDatagramServer(ThreadingMixIn, UnixDatagramServer): pass
    695 
    696 class BaseRequestHandler:
    697 
    698     """Base class for request handler classes.
    699 
    700     This class is instantiated for each request to be handled.  The
    701     constructor sets the instance variables request, client_address
    702     and server, and then calls the handle() method.  To implement a
    703     specific service, all you need to do is to derive a class which
    704     defines a handle() method.
    705 
    706     The handle() method can find the request as self.request, the
    707     client address as self.client_address, and the server (in case it
    708     needs access to per-server information) as self.server.  Since a
    709     separate instance is created for each request, the handle() method
    710     can define other arbitrary instance variables.
    711 
    712     """
    713 
    714     def __init__(self, request, client_address, server):
    715         self.request = request
    716         self.client_address = client_address
    717         self.server = server
    718         self.setup()
    719         try:
    720             self.handle()
    721         finally:
    722             self.finish()
    723 
    724     def setup(self):
    725         pass
    726 
    727     def handle(self):
    728         pass
    729 
    730     def finish(self):
    731         pass
    732 
    733 
    734 # The following two classes make it possible to use the same service
    735 # class for stream or datagram servers.
    736 # Each class sets up these instance variables:
    737 # - rfile: a file object from which receives the request is read
    738 # - wfile: a file object to which the reply is written
    739 # When the handle() method returns, wfile is flushed properly
    740 
    741 
    742 class StreamRequestHandler(BaseRequestHandler):
    743 
    744     """Define self.rfile and self.wfile for stream sockets."""
    745 
    746     # Default buffer sizes for rfile, wfile.
    747     # We default rfile to buffered because otherwise it could be
    748     # really slow for large data (a getc() call per byte); we make
    749     # wfile unbuffered because (a) often after a write() we want to
    750     # read and we need to flush the line; (b) big writes to unbuffered
    751     # files are typically optimized by stdio even when big reads
    752     # aren't.
    753     rbufsize = -1
    754     wbufsize = 0
    755 
    756     # A timeout to apply to the request socket, if not None.
    757     timeout = None
    758 
    759     # Disable nagle algorithm for this socket, if True.
    760     # Use only when wbufsize != 0, to avoid small packets.
    761     disable_nagle_algorithm = False
    762 
    763     def setup(self):
    764         self.connection = self.request
    765         if self.timeout is not None:
    766             self.connection.settimeout(self.timeout)
    767         if self.disable_nagle_algorithm:
    768             self.connection.setsockopt(socket.IPPROTO_TCP,
    769                                        socket.TCP_NODELAY, True)
    770         self.rfile = self.connection.makefile('rb', self.rbufsize)
    771         if self.wbufsize == 0:
    772             self.wfile = _SocketWriter(self.connection)
    773         else:
    774             self.wfile = self.connection.makefile('wb', self.wbufsize)
    775 
    776     def finish(self):
    777         if not self.wfile.closed:
    778             try:
    779                 self.wfile.flush()
    780             except socket.error:
    781                 # A final socket error may have occurred here, such as
    782                 # the local error ECONNABORTED.
    783                 pass
    784         self.wfile.close()
    785         self.rfile.close()
    786 
    787 class _SocketWriter(BufferedIOBase):
    788     """Simple writable BufferedIOBase implementation for a socket
    789 
    790     Does not hold data in a buffer, avoiding any need to call flush()."""
    791 
    792     def __init__(self, sock):
    793         self._sock = sock
    794 
    795     def writable(self):
    796         return True
    797 
    798     def write(self, b):
    799         self._sock.sendall(b)
    800         with memoryview(b) as view:
    801             return view.nbytes
    802 
    803     def fileno(self):
    804         return self._sock.fileno()
    805 
    806 class DatagramRequestHandler(BaseRequestHandler):
    807 
    808     """Define self.rfile and self.wfile for datagram sockets."""
    809 
    810     def setup(self):
    811         from io import BytesIO
    812         self.packet, self.socket = self.request
    813         self.rfile = BytesIO(self.packet)
    814         self.wfile = BytesIO()
    815 
    816     def finish(self):
    817         self.socket.sendto(self.wfile.getvalue(), self.client_address)
    818