1 # -*- Mode: Python -*- 2 # Id: asyncore.py,v 2.51 2000/09/07 22:29:26 rushing Exp 3 # Author: Sam Rushing <rushing (at] nightmare.com> 4 5 # ====================================================================== 6 # Copyright 1996 by Sam Rushing 7 # 8 # All Rights Reserved 9 # 10 # Permission to use, copy, modify, and distribute this software and 11 # its documentation for any purpose and without fee is hereby 12 # granted, provided that the above copyright notice appear in all 13 # copies and that both that copyright notice and this permission 14 # notice appear in supporting documentation, and that the name of Sam 15 # Rushing not be used in advertising or publicity pertaining to 16 # distribution of the software without specific, written prior 17 # permission. 18 # 19 # SAM RUSHING DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE, 20 # INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS, IN 21 # NO EVENT SHALL SAM RUSHING BE LIABLE FOR ANY SPECIAL, INDIRECT OR 22 # CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS 23 # OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, 24 # NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN 25 # CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. 26 # ====================================================================== 27 28 """Basic infrastructure for asynchronous socket service clients and servers. 29 30 There are only two ways to have a program on a single processor do "more 31 than one thing at a time". Multi-threaded programming is the simplest and 32 most popular way to do it, but there is another very different technique, 33 that lets you have nearly all the advantages of multi-threading, without 34 actually using multiple threads. it's really only practical if your program 35 is largely I/O bound. If your program is CPU bound, then pre-emptive 36 scheduled threads are probably what you really need. Network servers are 37 rarely CPU-bound, however. 38 39 If your operating system supports the select() system call in its I/O 40 library (and nearly all do), then you can use it to juggle multiple 41 communication channels at once; doing other work while your I/O is taking 42 place in the "background." Although this strategy can seem strange and 43 complex, especially at first, it is in many ways easier to understand and 44 control than multi-threaded programming. The module documented here solves 45 many of the difficult problems for you, making the task of building 46 sophisticated high-performance network servers and clients a snap. 47 """ 48 49 import select 50 import socket 51 import sys 52 import time 53 import warnings 54 55 import os 56 from errno import EALREADY, EINPROGRESS, EWOULDBLOCK, ECONNRESET, EINVAL, \ 57 ENOTCONN, ESHUTDOWN, EINTR, EISCONN, EBADF, ECONNABORTED, EPIPE, EAGAIN, \ 58 errorcode 59 60 _DISCONNECTED = frozenset((ECONNRESET, ENOTCONN, ESHUTDOWN, ECONNABORTED, EPIPE, 61 EBADF)) 62 63 try: 64 socket_map 65 except NameError: 66 socket_map = {} 67 68 def _strerror(err): 69 try: 70 return os.strerror(err) 71 except (ValueError, OverflowError, NameError): 72 if err in errorcode: 73 return errorcode[err] 74 return "Unknown error %s" %err 75 76 class ExitNow(Exception): 77 pass 78 79 _reraised_exceptions = (ExitNow, KeyboardInterrupt, SystemExit) 80 81 def read(obj): 82 try: 83 obj.handle_read_event() 84 except _reraised_exceptions: 85 raise 86 except: 87 obj.handle_error() 88 89 def write(obj): 90 try: 91 obj.handle_write_event() 92 except _reraised_exceptions: 93 raise 94 except: 95 obj.handle_error() 96 97 def _exception(obj): 98 try: 99 obj.handle_expt_event() 100 except _reraised_exceptions: 101 raise 102 except: 103 obj.handle_error() 104 105 def readwrite(obj, flags): 106 try: 107 if flags & select.POLLIN: 108 obj.handle_read_event() 109 if flags & select.POLLOUT: 110 obj.handle_write_event() 111 if flags & select.POLLPRI: 112 obj.handle_expt_event() 113 if flags & (select.POLLHUP | select.POLLERR | select.POLLNVAL): 114 obj.handle_close() 115 except socket.error, e: 116 if e.args[0] not in _DISCONNECTED: 117 obj.handle_error() 118 else: 119 obj.handle_close() 120 except _reraised_exceptions: 121 raise 122 except: 123 obj.handle_error() 124 125 def poll(timeout=0.0, map=None): 126 if map is None: 127 map = socket_map 128 if map: 129 r = []; w = []; e = [] 130 for fd, obj in map.items(): 131 is_r = obj.readable() 132 is_w = obj.writable() 133 if is_r: 134 r.append(fd) 135 # accepting sockets should not be writable 136 if is_w and not obj.accepting: 137 w.append(fd) 138 if is_r or is_w: 139 e.append(fd) 140 if [] == r == w == e: 141 time.sleep(timeout) 142 return 143 144 try: 145 r, w, e = select.select(r, w, e, timeout) 146 except select.error, err: 147 if err.args[0] != EINTR: 148 raise 149 else: 150 return 151 152 for fd in r: 153 obj = map.get(fd) 154 if obj is None: 155 continue 156 read(obj) 157 158 for fd in w: 159 obj = map.get(fd) 160 if obj is None: 161 continue 162 write(obj) 163 164 for fd in e: 165 obj = map.get(fd) 166 if obj is None: 167 continue 168 _exception(obj) 169 170 def poll2(timeout=0.0, map=None): 171 # Use the poll() support added to the select module in Python 2.0 172 if map is None: 173 map = socket_map 174 if timeout is not None: 175 # timeout is in milliseconds 176 timeout = int(timeout*1000) 177 pollster = select.poll() 178 if map: 179 for fd, obj in map.items(): 180 flags = 0 181 if obj.readable(): 182 flags |= select.POLLIN | select.POLLPRI 183 # accepting sockets should not be writable 184 if obj.writable() and not obj.accepting: 185 flags |= select.POLLOUT 186 if flags: 187 # Only check for exceptions if object was either readable 188 # or writable. 189 flags |= select.POLLERR | select.POLLHUP | select.POLLNVAL 190 pollster.register(fd, flags) 191 try: 192 r = pollster.poll(timeout) 193 except select.error, err: 194 if err.args[0] != EINTR: 195 raise 196 r = [] 197 for fd, flags in r: 198 obj = map.get(fd) 199 if obj is None: 200 continue 201 readwrite(obj, flags) 202 203 poll3 = poll2 # Alias for backward compatibility 204 205 def loop(timeout=30.0, use_poll=False, map=None, count=None): 206 if map is None: 207 map = socket_map 208 209 if use_poll and hasattr(select, 'poll'): 210 poll_fun = poll2 211 else: 212 poll_fun = poll 213 214 if count is None: 215 while map: 216 poll_fun(timeout, map) 217 218 else: 219 while map and count > 0: 220 poll_fun(timeout, map) 221 count = count - 1 222 223 class dispatcher: 224 225 debug = False 226 connected = False 227 accepting = False 228 connecting = False 229 closing = False 230 addr = None 231 ignore_log_types = frozenset(['warning']) 232 233 def __init__(self, sock=None, map=None): 234 if map is None: 235 self._map = socket_map 236 else: 237 self._map = map 238 239 self._fileno = None 240 241 if sock: 242 # Set to nonblocking just to make sure for cases where we 243 # get a socket from a blocking source. 244 sock.setblocking(0) 245 self.set_socket(sock, map) 246 self.connected = True 247 # The constructor no longer requires that the socket 248 # passed be connected. 249 try: 250 self.addr = sock.getpeername() 251 except socket.error, err: 252 if err.args[0] in (ENOTCONN, EINVAL): 253 # To handle the case where we got an unconnected 254 # socket. 255 self.connected = False 256 else: 257 # The socket is broken in some unknown way, alert 258 # the user and remove it from the map (to prevent 259 # polling of broken sockets). 260 self.del_channel(map) 261 raise 262 else: 263 self.socket = None 264 265 def __repr__(self): 266 status = [self.__class__.__module__+"."+self.__class__.__name__] 267 if self.accepting and self.addr: 268 status.append('listening') 269 elif self.connected: 270 status.append('connected') 271 if self.addr is not None: 272 try: 273 status.append('%s:%d' % self.addr) 274 except TypeError: 275 status.append(repr(self.addr)) 276 return '<%s at %#x>' % (' '.join(status), id(self)) 277 278 __str__ = __repr__ 279 280 def add_channel(self, map=None): 281 #self.log_info('adding channel %s' % self) 282 if map is None: 283 map = self._map 284 map[self._fileno] = self 285 286 def del_channel(self, map=None): 287 fd = self._fileno 288 if map is None: 289 map = self._map 290 if fd in map: 291 #self.log_info('closing channel %d:%s' % (fd, self)) 292 del map[fd] 293 self._fileno = None 294 295 def create_socket(self, family, type): 296 self.family_and_type = family, type 297 sock = socket.socket(family, type) 298 sock.setblocking(0) 299 self.set_socket(sock) 300 301 def set_socket(self, sock, map=None): 302 self.socket = sock 303 ## self.__dict__['socket'] = sock 304 self._fileno = sock.fileno() 305 self.add_channel(map) 306 307 def set_reuse_addr(self): 308 # try to re-use a server port if possible 309 try: 310 self.socket.setsockopt( 311 socket.SOL_SOCKET, socket.SO_REUSEADDR, 312 self.socket.getsockopt(socket.SOL_SOCKET, 313 socket.SO_REUSEADDR) | 1 314 ) 315 except socket.error: 316 pass 317 318 # ================================================== 319 # predicates for select() 320 # these are used as filters for the lists of sockets 321 # to pass to select(). 322 # ================================================== 323 324 def readable(self): 325 return True 326 327 def writable(self): 328 return True 329 330 # ================================================== 331 # socket object methods. 332 # ================================================== 333 334 def listen(self, num): 335 self.accepting = True 336 if os.name == 'nt' and num > 5: 337 num = 5 338 return self.socket.listen(num) 339 340 def bind(self, addr): 341 self.addr = addr 342 return self.socket.bind(addr) 343 344 def connect(self, address): 345 self.connected = False 346 self.connecting = True 347 err = self.socket.connect_ex(address) 348 if err in (EINPROGRESS, EALREADY, EWOULDBLOCK) \ 349 or err == EINVAL and os.name in ('nt', 'ce'): 350 self.addr = address 351 return 352 if err in (0, EISCONN): 353 self.addr = address 354 self.handle_connect_event() 355 else: 356 raise socket.error(err, errorcode[err]) 357 358 def accept(self): 359 # XXX can return either an address pair or None 360 try: 361 conn, addr = self.socket.accept() 362 except TypeError: 363 return None 364 except socket.error as why: 365 if why.args[0] in (EWOULDBLOCK, ECONNABORTED, EAGAIN): 366 return None 367 else: 368 raise 369 else: 370 return conn, addr 371 372 def send(self, data): 373 try: 374 result = self.socket.send(data) 375 return result 376 except socket.error, why: 377 if why.args[0] == EWOULDBLOCK: 378 return 0 379 elif why.args[0] in _DISCONNECTED: 380 self.handle_close() 381 return 0 382 else: 383 raise 384 385 def recv(self, buffer_size): 386 try: 387 data = self.socket.recv(buffer_size) 388 if not data: 389 # a closed connection is indicated by signaling 390 # a read condition, and having recv() return 0. 391 self.handle_close() 392 return '' 393 else: 394 return data 395 except socket.error, why: 396 # winsock sometimes raises ENOTCONN 397 if why.args[0] in _DISCONNECTED: 398 self.handle_close() 399 return '' 400 else: 401 raise 402 403 def close(self): 404 self.connected = False 405 self.accepting = False 406 self.connecting = False 407 self.del_channel() 408 try: 409 self.socket.close() 410 except socket.error, why: 411 if why.args[0] not in (ENOTCONN, EBADF): 412 raise 413 414 # cheap inheritance, used to pass all other attribute 415 # references to the underlying socket object. 416 def __getattr__(self, attr): 417 try: 418 retattr = getattr(self.socket, attr) 419 except AttributeError: 420 raise AttributeError("%s instance has no attribute '%s'" 421 %(self.__class__.__name__, attr)) 422 else: 423 msg = "%(me)s.%(attr)s is deprecated. Use %(me)s.socket.%(attr)s " \ 424 "instead." % {'me': self.__class__.__name__, 'attr':attr} 425 warnings.warn(msg, DeprecationWarning, stacklevel=2) 426 return retattr 427 428 # log and log_info may be overridden to provide more sophisticated 429 # logging and warning methods. In general, log is for 'hit' logging 430 # and 'log_info' is for informational, warning and error logging. 431 432 def log(self, message): 433 sys.stderr.write('log: %s\n' % str(message)) 434 435 def log_info(self, message, type='info'): 436 if type not in self.ignore_log_types: 437 print '%s: %s' % (type, message) 438 439 def handle_read_event(self): 440 if self.accepting: 441 # accepting sockets are never connected, they "spawn" new 442 # sockets that are connected 443 self.handle_accept() 444 elif not self.connected: 445 if self.connecting: 446 self.handle_connect_event() 447 self.handle_read() 448 else: 449 self.handle_read() 450 451 def handle_connect_event(self): 452 err = self.socket.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR) 453 if err != 0: 454 raise socket.error(err, _strerror(err)) 455 self.handle_connect() 456 self.connected = True 457 self.connecting = False 458 459 def handle_write_event(self): 460 if self.accepting: 461 # Accepting sockets shouldn't get a write event. 462 # We will pretend it didn't happen. 463 return 464 465 if not self.connected: 466 if self.connecting: 467 self.handle_connect_event() 468 self.handle_write() 469 470 def handle_expt_event(self): 471 # handle_expt_event() is called if there might be an error on the 472 # socket, or if there is OOB data 473 # check for the error condition first 474 err = self.socket.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR) 475 if err != 0: 476 # we can get here when select.select() says that there is an 477 # exceptional condition on the socket 478 # since there is an error, we'll go ahead and close the socket 479 # like we would in a subclassed handle_read() that received no 480 # data 481 self.handle_close() 482 else: 483 self.handle_expt() 484 485 def handle_error(self): 486 nil, t, v, tbinfo = compact_traceback() 487 488 # sometimes a user repr method will crash. 489 try: 490 self_repr = repr(self) 491 except: 492 self_repr = '<__repr__(self) failed for object at %0x>' % id(self) 493 494 self.log_info( 495 'uncaptured python exception, closing channel %s (%s:%s %s)' % ( 496 self_repr, 497 t, 498 v, 499 tbinfo 500 ), 501 'error' 502 ) 503 self.handle_close() 504 505 def handle_expt(self): 506 self.log_info('unhandled incoming priority event', 'warning') 507 508 def handle_read(self): 509 self.log_info('unhandled read event', 'warning') 510 511 def handle_write(self): 512 self.log_info('unhandled write event', 'warning') 513 514 def handle_connect(self): 515 self.log_info('unhandled connect event', 'warning') 516 517 def handle_accept(self): 518 self.log_info('unhandled accept event', 'warning') 519 520 def handle_close(self): 521 self.log_info('unhandled close event', 'warning') 522 self.close() 523 524 # --------------------------------------------------------------------------- 525 # adds simple buffered output capability, useful for simple clients. 526 # [for more sophisticated usage use asynchat.async_chat] 527 # --------------------------------------------------------------------------- 528 529 class dispatcher_with_send(dispatcher): 530 531 def __init__(self, sock=None, map=None): 532 dispatcher.__init__(self, sock, map) 533 self.out_buffer = '' 534 535 def initiate_send(self): 536 num_sent = 0 537 num_sent = dispatcher.send(self, self.out_buffer[:512]) 538 self.out_buffer = self.out_buffer[num_sent:] 539 540 def handle_write(self): 541 self.initiate_send() 542 543 def writable(self): 544 return (not self.connected) or len(self.out_buffer) 545 546 def send(self, data): 547 if self.debug: 548 self.log_info('sending %s' % repr(data)) 549 self.out_buffer = self.out_buffer + data 550 self.initiate_send() 551 552 # --------------------------------------------------------------------------- 553 # used for debugging. 554 # --------------------------------------------------------------------------- 555 556 def compact_traceback(): 557 t, v, tb = sys.exc_info() 558 tbinfo = [] 559 if not tb: # Must have a traceback 560 raise AssertionError("traceback does not exist") 561 while tb: 562 tbinfo.append(( 563 tb.tb_frame.f_code.co_filename, 564 tb.tb_frame.f_code.co_name, 565 str(tb.tb_lineno) 566 )) 567 tb = tb.tb_next 568 569 # just to be safe 570 del tb 571 572 file, function, line = tbinfo[-1] 573 info = ' '.join(['[%s|%s|%s]' % x for x in tbinfo]) 574 return (file, function, line), t, v, info 575 576 def close_all(map=None, ignore_all=False): 577 if map is None: 578 map = socket_map 579 for x in map.values(): 580 try: 581 x.close() 582 except OSError, x: 583 if x.args[0] == EBADF: 584 pass 585 elif not ignore_all: 586 raise 587 except _reraised_exceptions: 588 raise 589 except: 590 if not ignore_all: 591 raise 592 map.clear() 593 594 # Asynchronous File I/O: 595 # 596 # After a little research (reading man pages on various unixen, and 597 # digging through the linux kernel), I've determined that select() 598 # isn't meant for doing asynchronous file i/o. 599 # Heartening, though - reading linux/mm/filemap.c shows that linux 600 # supports asynchronous read-ahead. So _MOST_ of the time, the data 601 # will be sitting in memory for us already when we go to read it. 602 # 603 # What other OS's (besides NT) support async file i/o? [VMS?] 604 # 605 # Regardless, this is useful for pipes, and stdin/stdout... 606 607 if os.name == 'posix': 608 import fcntl 609 610 class file_wrapper: 611 # Here we override just enough to make a file 612 # look like a socket for the purposes of asyncore. 613 # The passed fd is automatically os.dup()'d 614 615 def __init__(self, fd): 616 self.fd = os.dup(fd) 617 618 def recv(self, *args): 619 return os.read(self.fd, *args) 620 621 def send(self, *args): 622 return os.write(self.fd, *args) 623 624 def getsockopt(self, level, optname, buflen=None): 625 if (level == socket.SOL_SOCKET and 626 optname == socket.SO_ERROR and 627 not buflen): 628 return 0 629 raise NotImplementedError("Only asyncore specific behaviour " 630 "implemented.") 631 632 read = recv 633 write = send 634 635 def close(self): 636 if self.fd < 0: 637 return 638 fd = self.fd 639 self.fd = -1 640 os.close(fd) 641 642 def fileno(self): 643 return self.fd 644 645 class file_dispatcher(dispatcher): 646 647 def __init__(self, fd, map=None): 648 dispatcher.__init__(self, None, map) 649 self.connected = True 650 try: 651 fd = fd.fileno() 652 except AttributeError: 653 pass 654 self.set_file(fd) 655 # set it to non-blocking mode 656 flags = fcntl.fcntl(fd, fcntl.F_GETFL, 0) 657 flags = flags | os.O_NONBLOCK 658 fcntl.fcntl(fd, fcntl.F_SETFL, flags) 659 660 def set_file(self, fd): 661 self.socket = file_wrapper(fd) 662 self._fileno = self.socket.fileno() 663 self.add_channel() 664