# -*- Mode: Python -*-
#   Id: asyncore.py,v 2.51 2000/09/07 22:29:26 rushing Exp
#   Author: Sam Rushing <rushing@nightmare.com>

# ======================================================================
# Copyright 1996 by Sam Rushing
#
#                         All Rights Reserved
#
# Permission to use, copy, modify, and distribute this software and
# its documentation for any purpose and without fee is hereby
# granted, provided that the above copyright notice appear in all
# copies and that both that copyright notice and this permission
# notice appear in supporting documentation, and that the name of Sam
# Rushing not be used in advertising or publicity pertaining to
# distribution of the software without specific, written prior
# permission.
#
# SAM RUSHING DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE,
# INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS, IN
# NO EVENT SHALL SAM RUSHING BE LIABLE FOR ANY SPECIAL, INDIRECT OR
# CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS
# OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT,
# NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN
# CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
# ======================================================================

"""Basic infrastructure for asynchronous socket service clients and servers.

There are only two ways to have a program on a single processor do "more
than one thing at a time".  Multi-threaded programming is the simplest and
most popular way to do it, but there is another very different technique,
that lets you have nearly all the advantages of multi-threading, without
actually using multiple threads.  It's really only practical if your program
is largely I/O bound.  If your program is CPU bound, then pre-emptive
scheduled threads are probably what you really need.  Network servers are
rarely CPU-bound, however.

If your operating system supports the select() system call in its I/O
library (and nearly all do), then you can use it to juggle multiple
communication channels at once; doing other work while your I/O is taking
place in the "background."  Although this strategy can seem strange and
complex, especially at first, it is in many ways easier to understand and
control than multi-threaded programming.  The module documented here solves
many of the difficult problems for you, making the task of building
sophisticated high-performance network servers and clients a snap.
"""

import select
import socket
import sys
import time
import warnings

import os
from errno import EALREADY, EINPROGRESS, EWOULDBLOCK, ECONNRESET, EINVAL, \
     ENOTCONN, ESHUTDOWN, EISCONN, EBADF, ECONNABORTED, EPIPE, EAGAIN, \
     errorcode

# errno values that are treated as "the peer is gone": send()/recv() and
# readwrite() close the channel instead of reporting an error.
_DISCONNECTED = frozenset({ECONNRESET, ENOTCONN, ESHUTDOWN, ECONNABORTED, EPIPE,
                           EBADF})

# Keep an already existing socket_map (e.g. when the module source is
# re-executed in the same namespace); otherwise start with an empty one.
try:
    socket_map
except NameError:
    socket_map = {}


def _strerror(err):
    """Return a message string for the errno value *err*.

    Uses os.strerror() when possible; otherwise falls back to the symbolic
    name from errno.errorcode, and finally to an "Unknown error" string.
    """
    try:
        return os.strerror(err)
    except (ValueError, OverflowError, NameError):
        if err in errorcode:
            return errorcode[err]
        return "Unknown error %s" %err


class ExitNow(Exception):
    """Exception that the event dispatch helpers re-raise instead of
    routing to handle_error()."""
    pass


# Exceptions that must propagate out of the dispatch helpers untouched.
_reraised_exceptions = (ExitNow, KeyboardInterrupt, SystemExit)


def read(obj):
    """Deliver a read event to *obj*; any exception other than the
    re-raised ones is passed to obj.handle_error()."""
    try:
        obj.handle_read_event()
    except _reraised_exceptions:
        raise
    except:
        obj.handle_error()


def write(obj):
    """Deliver a write event to *obj*; any exception other than the
    re-raised ones is passed to obj.handle_error()."""
    try:
        obj.handle_write_event()
    except _reraised_exceptions:
        raise
    except:
        obj.handle_error()


def _exception(obj):
    """Deliver an exceptional-condition event to *obj*; any exception other
    than the re-raised ones is passed to obj.handle_error()."""
    try:
        obj.handle_expt_event()
    except _reraised_exceptions:
        raise
    except:
        obj.handle_error()


def readwrite(obj, flags):
    """Dispatch the poll() event mask *flags* to the handlers of *obj*.

    POLLIN, POLLOUT and POLLPRI map to the read, write and expt event
    handlers; POLLHUP, POLLERR or POLLNVAL close the channel.  An OSError
    whose errno is in _DISCONNECTED closes the channel, any other error
    goes to obj.handle_error().
    """
    try:
        if flags & select.POLLIN:
            obj.handle_read_event()
        if flags & select.POLLOUT:
            obj.handle_write_event()
        if flags & select.POLLPRI:
            obj.handle_expt_event()
        if flags & (select.POLLHUP | select.POLLERR | select.POLLNVAL):
            obj.handle_close()
    except OSError as e:
        if e.args[0] not in _DISCONNECTED:
            obj.handle_error()
        else:
            obj.handle_close()
    except _reraised_exceptions:
        raise
    except:
        obj.handle_error()


def poll(timeout=0.0, map=None):
    """Run one select()-based pass over the channels in *map*.

    *timeout* is in seconds and is handed to select.select() (or to
    time.sleep() when no channel wants to be watched).  *map* defaults to
    the module-level socket_map.  Does nothing when the map is empty.
    """
    if map is None:
        map = socket_map
    if map:
        r, w, e = [], [], []
        # Iterate over a snapshot: readable()/writable() are user code and
        # may add or remove channels.
        for fd, obj in list(map.items()):
            is_r = obj.readable()
            is_w = obj.writable()
            if is_r:
                r.append(fd)
            # accepting sockets should not be writable
            if is_w and not obj.accepting:
                w.append(fd)
            if is_r or is_w:
                e.append(fd)
        if not (r or w or e):
            # Nothing to select on; just wait out the timeout.
            time.sleep(timeout)
            return

        r, w, e = select.select(r, w, e, timeout)

        # A handler may close other channels, so re-check the map for
        # every descriptor before dispatching to it.
        for fd in r:
            obj = map.get(fd)
            if obj is None:
                continue
            read(obj)

        for fd in w:
            obj = map.get(fd)
            if obj is None:
                continue
            write(obj)

        for fd in e:
            obj = map.get(fd)
            if obj is None:
                continue
            _exception(obj)


def poll2(timeout=0.0, map=None):
    """Run one select.poll()-based pass over the channels in *map*.

    *timeout* is given in seconds and converted to whole milliseconds;
    None is passed through unchanged.  *map* defaults to socket_map.
    """
    # Use the poll() support added to the select module in Python 2.0
    if map is None:
        map = socket_map
    if timeout is not None:
        # timeout is in milliseconds
        timeout = int(timeout*1000)
    pollster = select.poll()
    if map:
        for fd, obj in list(map.items()):
            flags = 0
            if obj.readable():
                flags |= select.POLLIN | select.POLLPRI
            # accepting sockets should not be writable
            if obj.writable() and not obj.accepting:
                flags |= select.POLLOUT
            if flags:
                pollster.register(fd, flags)

        r = pollster.poll(timeout)
        for fd, flags in r:
            obj = map.get(fd)
            if obj is None:
                continue
            readwrite(obj, flags)


poll3 = poll2                           # Alias for backward compatibility


def loop(timeout=30.0, use_poll=False, map=None, count=None):
    """Poll repeatedly until *map* is empty or *count* passes have run.

    timeout  -- passed to every poll call.
    use_poll -- use poll2() when true and select.poll is available,
                otherwise use poll().
    map      -- channel map; defaults to socket_map.
    count    -- maximum number of passes, or None for no limit.
    """
    if map is None:
        map = socket_map

    if use_poll and hasattr(select, 'poll'):
        poll_fun = poll2
    else:
        poll_fun = poll

    if count is None:
        while map:
            poll_fun(timeout, map)

    else:
        while map and count > 0:
            poll_fun(timeout, map)
            count = count - 1


class dispatcher:
    """Thin wrapper around a non-blocking socket registered in a channel map.

    The poll functions call readable()/writable() to decide what to watch
    and then the handle_*_event() methods, which translate low-level events
    into the overridable handle_read/handle_write/handle_connect/
    handle_accept/handle_close/handle_expt callbacks.
    """

    debug = False
    connected = False
    accepting = False
    connecting = False
    closing = False
    addr = None
    # log_info() message types that are silently dropped.
    ignore_log_types = frozenset({'warning'})

    def __init__(self, sock=None, map=None):
        """Create a channel, optionally adopting the socket *sock*.

        *map* is the channel map to register in (default: socket_map).
        Re-raises the OSError from getpeername() when it is neither
        ENOTCONN nor EINVAL, after removing the channel from the map.
        """
        if map is None:
            self._map = socket_map
        else:
            self._map = map

        self._fileno = None

        if sock:
            # Set to nonblocking just to make sure for cases where we
            # get a socket from a blocking source.
            sock.setblocking(False)
            self.set_socket(sock, map)
            self.connected = True
            # The constructor no longer requires that the socket
            # passed be connected.
            try:
                self.addr = sock.getpeername()
            except OSError as err:
                if err.args[0] in (ENOTCONN, EINVAL):
                    # To handle the case where we got an unconnected
                    # socket.
                    self.connected = False
                else:
                    # The socket is broken in some unknown way, alert
                    # the user and remove it from the map (to prevent
                    # polling of broken sockets).
                    self.del_channel(map)
                    raise
        else:
            self.socket = None

    def __repr__(self):
        """Return '<module.Class [listening|connected] [addr] at 0x...>'."""
        status = [self.__class__.__module__+"."+self.__class__.__qualname__]
        if self.accepting and self.addr:
            status.append('listening')
        elif self.connected:
            status.append('connected')
        if self.addr is not None:
            try:
                status.append('%s:%d' % self.addr)
            except TypeError:
                # addr is not a (host, port) pair
                status.append(repr(self.addr))
        return '<%s at %#x>' % (' '.join(status), id(self))

    __str__ = __repr__

    def add_channel(self, map=None):
        """Register this channel in *map* (default: its own map) under its
        file descriptor."""
        if map is None:
            map = self._map
        map[self._fileno] = self

    def del_channel(self, map=None):
        """Remove this channel from *map* (default: its own map) and forget
        its file descriptor."""
        fd = self._fileno
        if map is None:
            map = self._map
        if fd in map:
            del map[fd]
        self._fileno = None

    def create_socket(self, family=socket.AF_INET, type=socket.SOCK_STREAM):
        """Create a new non-blocking socket and adopt it via set_socket()."""
        self.family_and_type = family, type
        sock = socket.socket(family, type)
        sock.setblocking(False)
        self.set_socket(sock)

    def set_socket(self, sock, map=None):
        """Adopt *sock* as this channel's socket and register it in *map*."""
        self.socket = sock
        self._fileno = sock.fileno()
        self.add_channel(map)

    def set_reuse_addr(self):
        """Turn on SO_REUSEADDR; an OSError from the socket is ignored."""
        # try to re-use a server port if possible
        try:
            self.socket.setsockopt(
                socket.SOL_SOCKET, socket.SO_REUSEADDR,
                self.socket.getsockopt(socket.SOL_SOCKET,
                                       socket.SO_REUSEADDR) | 1
                )
        except OSError:
            pass

    # ==================================================
    # predicates for select()
    # these are used as filters for the lists of sockets
    # to pass to select().
    # ==================================================

    def readable(self):
        """Return True if the channel wants read events (default: always)."""
        return True

    def writable(self):
        """Return True if the channel wants write events (default: always)."""
        return True

    # ==================================================
    # socket object methods.
    # ==================================================

    def listen(self, num):
        """Mark the channel as accepting and listen with backlog *num*
        (capped at 5 when os.name is 'nt')."""
        self.accepting = True
        if os.name == 'nt' and num > 5:
            num = 5
        return self.socket.listen(num)

    def bind(self, addr):
        """Remember *addr* and bind the socket to it."""
        self.addr = addr
        return self.socket.bind(addr)

    def connect(self, address):
        """Start a non-blocking connect to *address*.

        Returns immediately while the connection is in progress; when
        connect_ex() reports success (0 or EISCONN) the connect event is
        handled right away.  Any other result raises OSError carrying that
        errno.
        """
        self.connected = False
        self.connecting = True
        err = self.socket.connect_ex(address)
        if err in (EINPROGRESS, EALREADY, EWOULDBLOCK) \
        or err == EINVAL and os.name == 'nt':
            self.addr = address
            return
        if err in (0, EISCONN):
            self.addr = address
            self.handle_connect_event()
        else:
            # errorcode does not list every possible errno; indexing it
            # directly would replace the OSError with a KeyError.
            raise OSError(err, errorcode.get(err, "Unknown error %s" % err))

    def accept(self):
        """Accept a connection.

        Returns a (socket, address) pair, or None when no connection could
        be accepted right now.
        """
        # XXX can return either an address pair or None
        try:
            conn, addr = self.socket.accept()
        except TypeError:
            return None
        except OSError as why:
            if why.args[0] in (EWOULDBLOCK, ECONNABORTED, EAGAIN):
                return None
            else:
                raise
        else:
            return conn, addr

    def send(self, data):
        """Send *data* and return the number of bytes accepted.

        Returns 0 when the socket would block, and also after closing the
        channel because the peer is gone (errno in _DISCONNECTED).  Other
        OSErrors propagate.
        """
        try:
            result = self.socket.send(data)
            return result
        except OSError as why:
            # EAGAIN is accepted as well, matching accept().
            if why.args[0] in (EWOULDBLOCK, EAGAIN):
                return 0
            elif why.args[0] in _DISCONNECTED:
                self.handle_close()
                return 0
            else:
                raise

    def recv(self, buffer_size):
        """Receive up to *buffer_size* bytes.

        Returns b'' after closing the channel when the peer closed the
        connection or the errno is in _DISCONNECTED.  Other OSErrors
        propagate.
        """
        try:
            data = self.socket.recv(buffer_size)
            if not data:
                # a closed connection is indicated by signaling
                # a read condition, and having recv() return 0.
                self.handle_close()
                return b''
            else:
                return data
        except OSError as why:
            # winsock sometimes raises ENOTCONN
            if why.args[0] in _DISCONNECTED:
                self.handle_close()
                return b''
            else:
                raise

    def close(self):
        """Reset the state flags, unregister the channel and close the
        socket; ENOTCONN and EBADF from the socket are ignored."""
        self.connected = False
        self.accepting = False
        self.connecting = False
        self.del_channel()
        if self.socket is not None:
            try:
                self.socket.close()
            except OSError as why:
                if why.args[0] not in (ENOTCONN, EBADF):
                    raise

    # log and log_info may be overridden to provide more sophisticated
    # logging and warning methods. In general, log is for 'hit' logging
    # and 'log_info' is for informational, warning and error logging.

    def log(self, message):
        """Write *message* to sys.stderr with a 'log: ' prefix."""
        sys.stderr.write('log: %s\n' % str(message))

    def log_info(self, message, type='info'):
        """Print '<type>: <message>' unless *type* is in ignore_log_types."""
        if type not in self.ignore_log_types:
            print('%s: %s' % (type, message))

    def handle_read_event(self):
        """Translate a low-level read event into handle_accept(),
        handle_connect_event() and/or handle_read()."""
        if self.accepting:
            # accepting sockets are never connected, they "spawn" new
            # sockets that are connected
            self.handle_accept()
        elif not self.connected:
            if self.connecting:
                self.handle_connect_event()
            self.handle_read()
        else:
            self.handle_read()

    def handle_connect_event(self):
        """Finish a pending connect.

        Raises OSError when SO_ERROR reports a failure; otherwise calls
        handle_connect() and marks the channel as connected.
        """
        err = self.socket.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
        if err != 0:
            raise OSError(err, _strerror(err))
        self.handle_connect()
        self.connected = True
        self.connecting = False

    def handle_write_event(self):
        """Translate a low-level write event into handle_connect_event()
        and handle_write()."""
        if self.accepting:
            # Accepting sockets shouldn't get a write event.
            # We will pretend it didn't happen.
            return

        if not self.connected:
            if self.connecting:
                self.handle_connect_event()
        self.handle_write()

    def handle_expt_event(self):
        """Close the channel when SO_ERROR is set, otherwise call
        handle_expt()."""
        # handle_expt_event() is called if there might be an error on the
        # socket, or if there is OOB data
        # check for the error condition first
        err = self.socket.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
        if err != 0:
            # we can get here when select.select() says that there is an
            # exceptional condition on the socket
            # since there is an error, we'll go ahead and close the socket
            # like we would in a subclassed handle_read() that received no
            # data
            self.handle_close()
        else:
            self.handle_expt()

    def handle_error(self):
        """Log the exception currently being handled as an 'error' and
        close the channel."""
        nil, t, v, tbinfo = compact_traceback()

        # sometimes a user repr method will crash.
        try:
            self_repr = repr(self)
        except:
            self_repr = '<__repr__(self) failed for object at %0x>' % id(self)

        self.log_info(
            'uncaptured python exception, closing channel %s (%s:%s %s)' % (
                self_repr,
                t,
                v,
                tbinfo
                ),
            'error'
            )
        self.handle_close()

    def handle_expt(self):
        """Default expt callback: log a warning."""
        self.log_info('unhandled incoming priority event', 'warning')

    def handle_read(self):
        """Default read callback: log a warning."""
        self.log_info('unhandled read event', 'warning')

    def handle_write(self):
        """Default write callback: log a warning."""
        self.log_info('unhandled write event', 'warning')

    def handle_connect(self):
        """Default connect callback: log a warning."""
        self.log_info('unhandled connect event', 'warning')

    def handle_accept(self):
        """Accept a pending connection and pass it to handle_accepted()."""
        pair = self.accept()
        if pair is not None:
            self.handle_accepted(*pair)

    def handle_accepted(self, sock, addr):
        """Default accepted callback: close *sock* and log a warning."""
        sock.close()
        self.log_info('unhandled accepted event', 'warning')

    def handle_close(self):
        """Default close callback: log a warning and close the channel."""
        self.log_info('unhandled close event', 'warning')
        self.close()

# ---------------------------------------------------------------------------
# adds simple buffered output capability, useful for simple clients.
# [for more sophisticated usage use asynchat.async_chat]
# ---------------------------------------------------------------------------

class dispatcher_with_send(dispatcher):
    """dispatcher variant that queues outgoing bytes in out_buffer and
    drains the queue whenever the channel is writable."""

    def __init__(self, sock=None, map=None):
        """Initialise the base dispatcher and start with an empty buffer."""
        dispatcher.__init__(self, sock, map)
        self.out_buffer = b''

    def initiate_send(self):
        """Try to send up to 65536 buffered bytes and drop from the buffer
        whatever dispatcher.send() reports as sent."""
        chunk = self.out_buffer[:65536]
        sent = dispatcher.send(self, chunk)
        self.out_buffer = self.out_buffer[sent:]

    def handle_write(self):
        """Write callback: push more of the buffer out."""
        self.initiate_send()

    def writable(self):
        """Truthy while the channel is not connected or data is queued."""
        if not self.connected:
            return True
        return len(self.out_buffer)

    def send(self, data):
        """Append *data* to the buffer and attempt to send immediately."""
        if self.debug:
            self.log_info('sending %s' % repr(data))
        self.out_buffer = self.out_buffer + data
        self.initiate_send()

# ---------------------------------------------------------------------------
# used for debugging.
# ---------------------------------------------------------------------------

def compact_traceback():
    """Summarise the exception currently being handled.

    Returns ((file, function, line) of the innermost frame, exception type,
    exception value, one-line '[file|function|line] ...' trace).  Raises
    AssertionError when there is no traceback.
    """
    exc_type, exc_value, tb = sys.exc_info()
    if not tb: # Must have a traceback
        raise AssertionError("traceback does not exist")

    frames = []
    while tb:
        code = tb.tb_frame.f_code
        frames.append((code.co_filename, code.co_name, str(tb.tb_lineno)))
        tb = tb.tb_next

    # just to be safe
    del tb

    innermost = frames[-1]
    info = ' '.join('[%s|%s|%s]' % frame for frame in frames)
    return innermost, exc_type, exc_value, info

def close_all(map=None, ignore_all=False):
    """Close every channel in *map* (default: socket_map), then clear it.

    An OSError with errno EBADF is always ignored.  Other errors are
    ignored only when *ignore_all* is true; the exceptions listed in
    _reraised_exceptions always propagate.
    """
    if map is None:
        map = socket_map
    for channel in list(map.values()):
        try:
            channel.close()
        except OSError as err:
            if err.args[0] != EBADF and not ignore_all:
                raise
        except _reraised_exceptions:
            raise
        except:
            if not ignore_all:
                raise
    map.clear()

# Asynchronous File I/O:
#
# After a little research (reading man pages on various unixen, and
# digging through the linux kernel), I've determined that select()
# isn't meant for doing asynchronous file i/o.
# Heartening, though - reading linux/mm/filemap.c shows that linux
# supports asynchronous read-ahead.  So _MOST_ of the time, the data
# will be sitting in memory for us already when we go to read it.
#
# What other OS's (besides NT) support async file i/o?  [VMS?]
#
# Regardless, this is useful for pipes, and stdin/stdout...

if os.name == 'posix':
    class file_wrapper:
        """Make a file descriptor look enough like a socket for asyncore.

        The descriptor passed to the constructor is os.dup()'d; the
        wrapper owns and closes only the duplicate.
        """
        # Here we override just enough to make a file
        # look like a socket for the purposes of asyncore.
        # The passed fd is automatically os.dup()'d

        # Class-level default so that __del__() and close() stay safe on an
        # instance whose __init__() raised before assigning self.fd
        # (os.dup() fails for an invalid descriptor).
        fd = -1

        def __init__(self, fd):
            """Duplicate *fd* and wrap the duplicate."""
            self.fd = os.dup(fd)

        def __del__(self):
            """Warn with ResourceWarning if still open, then close."""
            if self.fd >= 0:
                warnings.warn("unclosed file %r" % self, ResourceWarning,
                              source=self)
            self.close()

        def recv(self, *args):
            """Read from the descriptor via os.read(fd, *args)."""
            return os.read(self.fd, *args)

        def send(self, *args):
            """Write to the descriptor via os.write(fd, *args)."""
            return os.write(self.fd, *args)

        def getsockopt(self, level, optname, buflen=None):
            """Answer the SO_ERROR query used by asyncore with 0.

            Raises NotImplementedError for any other option or when
            *buflen* is given.
            """
            if (level == socket.SOL_SOCKET and
                optname == socket.SO_ERROR and
                not buflen):
                return 0
            raise NotImplementedError("Only asyncore specific behaviour "
                                      "implemented.")

        read = recv
        write = send

        def close(self):
            """Close the duplicated descriptor; a no-op when already
            closed."""
            if self.fd < 0:
                return
            os.close(self.fd)
            self.fd = -1

        def fileno(self):
            """Return the wrapped descriptor (-1 once closed)."""
            return self.fd

    class file_dispatcher(dispatcher):
        """dispatcher whose "socket" is a file_wrapper around a file
        descriptor."""

        def __init__(self, fd, map=None):
            """Wrap *fd* (an integer descriptor or an object with a
            fileno() method) and switch it to non-blocking mode."""
            dispatcher.__init__(self, None, map)
            self.connected = True
            try:
                fd = fd.fileno()
            except AttributeError:
                # already an integer descriptor
                pass
            self.set_file(fd)
            # set it to non-blocking mode
            os.set_blocking(fd, False)

        def set_file(self, fd):
            """Install a file_wrapper for *fd* as self.socket and register
            the channel under the wrapper's descriptor."""
            self.socket = file_wrapper(fd)
            self._fileno = self.socket.fileno()
            self.add_channel()