1 # (c) 2005 Ian Bicking and contributors; written for Paste (http://pythonpaste.org) 2 # Licensed under the MIT license: http://www.opensource.org/licenses/mit-license.php 3 # (c) 2005 Clark C. Evans 4 # This module is part of the Python Paste Project and is released under 5 # the MIT License: http://www.opensource.org/licenses/mit-license.php 6 # This code was written with funding by http://prometheusresearch.com 7 """ 8 WSGI HTTP Server 9 10 This is a minimalistic WSGI server using Python's built-in BaseHTTPServer; 11 if pyOpenSSL is installed, it also provides SSL capabilities. 12 """ 13 14 # @@: add in protection against HTTP/1.0 clients who claim to 15 # be 1.1 but do not send a Content-Length 16 17 # @@: add support for chunked encoding, this is not a 1.1 server 18 # till this is completed. 19 20 from __future__ import print_function 21 import atexit 22 import traceback 23 import socket, sys, threading 24 import posixpath 25 import six 26 import time 27 import os 28 from itertools import count 29 from six.moves import _thread 30 from six.moves import queue 31 from six.moves.BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer 32 from six.moves.socketserver import ThreadingMixIn 33 from six.moves.urllib.parse import unquote, urlsplit 34 from paste.util import converters 35 import logging 36 try: 37 from paste.util import killthread 38 except ImportError: 39 # Not available, probably no ctypes 40 killthread = None 41 42 __all__ = ['WSGIHandlerMixin', 'WSGIServer', 'WSGIHandler', 'serve'] 43 __version__ = "0.5" 44 45 46 def _get_headers(headers, k): 47 """ 48 Private function for abstracting differences in getting HTTP request 49 headers on Python 2 vs. Python 3 50 """ 51 52 if hasattr(headers, 'get_all'): 53 return headers.get_all(k) # Python 3 - email.message.Message 54 else: 55 return headers.getheaders(k) # Python 2 - mimetools.Message 56 57 58 class ContinueHook(object): 59 """ 60 When a client request includes a 'Expect: 100-continue' header, then 61 it is the responsibility of the server to send 100 Continue when it 62 is ready for the content body. This allows authentication, access 63 levels, and other exceptions to be detected *before* bandwith is 64 spent on the request body. 65 66 This is a rfile wrapper that implements this functionality by 67 sending 100 Continue to the client immediately after the user 68 requests the content via a read() operation on the rfile stream. 69 After this response is sent, it becomes a pass-through object. 70 """ 71 72 def __init__(self, rfile, write): 73 self._ContinueFile_rfile = rfile 74 self._ContinueFile_write = write 75 for attr in ('close', 'closed', 'fileno', 'flush', 76 'mode', 'bufsize', 'softspace'): 77 if hasattr(rfile, attr): 78 setattr(self, attr, getattr(rfile, attr)) 79 for attr in ('read', 'readline', 'readlines'): 80 if hasattr(rfile, attr): 81 setattr(self, attr, getattr(self, '_ContinueFile_' + attr)) 82 83 def _ContinueFile_send(self): 84 self._ContinueFile_write("HTTP/1.1 100 Continue\r\n\r\n") 85 rfile = self._ContinueFile_rfile 86 for attr in ('read', 'readline', 'readlines'): 87 if hasattr(rfile, attr): 88 setattr(self, attr, getattr(rfile, attr)) 89 90 def _ContinueFile_read(self, size=-1): 91 self._ContinueFile_send() 92 return self._ContinueFile_rfile.read(size) 93 94 def _ContinueFile_readline(self, size=-1): 95 self._ContinueFile_send() 96 return self._ContinueFile_rfile.readline(size) 97 98 def _ContinueFile_readlines(self, sizehint=0): 99 self._ContinueFile_send() 100 return self._ContinueFile_rfile.readlines(sizehint) 101 102 class WSGIHandlerMixin: 103 """ 104 WSGI mix-in for HTTPRequestHandler 105 106 This class is a mix-in to provide WSGI functionality to any 107 HTTPRequestHandler derivative (as provided in Python's BaseHTTPServer). 108 This assumes a ``wsgi_application`` handler on ``self.server``. 109 """ 110 lookup_addresses = True 111 112 def log_request(self, *args, **kwargs): 113 """ disable success request logging 114 115 Logging transactions should not be part of a WSGI server, 116 if you want logging; look at paste.translogger 117 """ 118 pass 119 120 def log_message(self, *args, **kwargs): 121 """ disable error message logging 122 123 Logging transactions should not be part of a WSGI server, 124 if you want logging; look at paste.translogger 125 """ 126 pass 127 128 def version_string(self): 129 """ behavior that BaseHTTPServer should have had """ 130 if not self.sys_version: 131 return self.server_version 132 else: 133 return self.server_version + ' ' + self.sys_version 134 135 def wsgi_write_chunk(self, chunk): 136 """ 137 Write a chunk of the output stream; send headers if they 138 have not already been sent. 139 """ 140 if not self.wsgi_headers_sent and not self.wsgi_curr_headers: 141 raise RuntimeError( 142 "Content returned before start_response called") 143 if not self.wsgi_headers_sent: 144 self.wsgi_headers_sent = True 145 (status, headers) = self.wsgi_curr_headers 146 code, message = status.split(" ", 1) 147 self.send_response(int(code), message) 148 # 149 # HTTP/1.1 compliance; either send Content-Length or 150 # signal that the connection is being closed. 151 # 152 send_close = True 153 for (k, v) in headers: 154 lk = k.lower() 155 if 'content-length' == lk: 156 send_close = False 157 if 'connection' == lk: 158 if 'close' == v.lower(): 159 self.close_connection = 1 160 send_close = False 161 self.send_header(k, v) 162 if send_close: 163 self.close_connection = 1 164 self.send_header('Connection', 'close') 165 166 self.end_headers() 167 self.wfile.write(chunk) 168 169 def wsgi_start_response(self, status, response_headers, exc_info=None): 170 if exc_info: 171 try: 172 if self.wsgi_headers_sent: 173 six.reraise(exc_info[0], exc_info[1], exc_info[2]) 174 else: 175 # In this case, we're going to assume that the 176 # higher-level code is currently handling the 177 # issue and returning a resonable response. 178 # self.log_error(repr(exc_info)) 179 pass 180 finally: 181 exc_info = None 182 elif self.wsgi_curr_headers: 183 assert 0, "Attempt to set headers a second time w/o an exc_info" 184 self.wsgi_curr_headers = (status, response_headers) 185 return self.wsgi_write_chunk 186 187 def wsgi_setup(self, environ=None): 188 """ 189 Setup the member variables used by this WSGI mixin, including 190 the ``environ`` and status member variables. 191 192 After the basic environment is created; the optional ``environ`` 193 argument can be used to override any settings. 194 """ 195 196 dummy_url = 'http://dummy%s' % (self.path,) 197 (scheme, netloc, path, query, fragment) = urlsplit(dummy_url) 198 path = unquote(path) 199 endslash = path.endswith('/') 200 path = posixpath.normpath(path) 201 if endslash and path != '/': 202 # Put the slash back... 203 path += '/' 204 (server_name, server_port) = self.server.server_address[:2] 205 206 rfile = self.rfile 207 # We can put in the protection to keep from over-reading the 208 # file 209 try: 210 content_length = int(self.headers.get('Content-Length', '0')) 211 except ValueError: 212 content_length = 0 213 if '100-continue' == self.headers.get('Expect','').lower(): 214 rfile = LimitedLengthFile(ContinueHook(rfile, self.wfile.write), content_length) 215 else: 216 if not hasattr(self.connection, 'get_context'): 217 # @@: LimitedLengthFile is currently broken in connection 218 # with SSL (sporatic errors that are diffcult to trace, but 219 # ones that go away when you don't use LimitedLengthFile) 220 rfile = LimitedLengthFile(rfile, content_length) 221 222 remote_address = self.client_address[0] 223 self.wsgi_environ = { 224 'wsgi.version': (1,0) 225 ,'wsgi.url_scheme': 'http' 226 ,'wsgi.input': rfile 227 ,'wsgi.errors': sys.stderr 228 ,'wsgi.multithread': True 229 ,'wsgi.multiprocess': False 230 ,'wsgi.run_once': False 231 # CGI variables required by PEP-333 232 ,'REQUEST_METHOD': self.command 233 ,'SCRIPT_NAME': '' # application is root of server 234 ,'PATH_INFO': path 235 ,'QUERY_STRING': query 236 ,'CONTENT_TYPE': self.headers.get('Content-Type', '') 237 ,'CONTENT_LENGTH': self.headers.get('Content-Length', '0') 238 ,'SERVER_NAME': server_name 239 ,'SERVER_PORT': str(server_port) 240 ,'SERVER_PROTOCOL': self.request_version 241 # CGI not required by PEP-333 242 ,'REMOTE_ADDR': remote_address 243 } 244 if scheme: 245 self.wsgi_environ['paste.httpserver.proxy.scheme'] = scheme 246 if netloc: 247 self.wsgi_environ['paste.httpserver.proxy.host'] = netloc 248 249 if self.lookup_addresses: 250 # @@: make lookup_addreses actually work, at this point 251 # it has been address_string() is overriden down in 252 # file and hence is a noop 253 if remote_address.startswith("192.168.") \ 254 or remote_address.startswith("10.") \ 255 or remote_address.startswith("172.16."): 256 pass 257 else: 258 address_string = None # self.address_string() 259 if address_string: 260 self.wsgi_environ['REMOTE_HOST'] = address_string 261 262 if hasattr(self.server, 'thread_pool'): 263 # Now that we know what the request was for, we should 264 # tell the thread pool what its worker is working on 265 self.server.thread_pool.worker_tracker[_thread.get_ident()][1] = self.wsgi_environ 266 self.wsgi_environ['paste.httpserver.thread_pool'] = self.server.thread_pool 267 268 for k, v in self.headers.items(): 269 key = 'HTTP_' + k.replace("-","_").upper() 270 if key in ('HTTP_CONTENT_TYPE','HTTP_CONTENT_LENGTH'): 271 continue 272 self.wsgi_environ[key] = ','.join(_get_headers(self.headers, k)) 273 274 if hasattr(self.connection,'get_context'): 275 self.wsgi_environ['wsgi.url_scheme'] = 'https' 276 # @@: extract other SSL parameters from pyOpenSSL at... 277 # http://www.modssl.org/docs/2.8/ssl_reference.html#ToC25 278 279 if environ: 280 assert isinstance(environ, dict) 281 self.wsgi_environ.update(environ) 282 if 'on' == environ.get('HTTPS'): 283 self.wsgi_environ['wsgi.url_scheme'] = 'https' 284 285 self.wsgi_curr_headers = None 286 self.wsgi_headers_sent = False 287 288 def wsgi_connection_drop(self, exce, environ=None): 289 """ 290 Override this if you're interested in socket exceptions, such 291 as when the user clicks 'Cancel' during a file download. 292 """ 293 pass 294 295 def wsgi_execute(self, environ=None): 296 """ 297 Invoke the server's ``wsgi_application``. 298 """ 299 300 self.wsgi_setup(environ) 301 302 try: 303 result = self.server.wsgi_application(self.wsgi_environ, 304 self.wsgi_start_response) 305 try: 306 for chunk in result: 307 self.wsgi_write_chunk(chunk) 308 if not self.wsgi_headers_sent: 309 self.wsgi_write_chunk('') 310 finally: 311 if hasattr(result,'close'): 312 result.close() 313 result = None 314 except socket.error as exce: 315 self.wsgi_connection_drop(exce, environ) 316 return 317 except: 318 if not self.wsgi_headers_sent: 319 error_msg = "Internal Server Error\n" 320 self.wsgi_curr_headers = ( 321 '500 Internal Server Error', 322 [('Content-type', 'text/plain'), 323 ('Content-length', str(len(error_msg)))]) 324 self.wsgi_write_chunk("Internal Server Error\n") 325 raise 326 327 # 328 # SSL Functionality 329 # 330 # This implementation was motivated by Sebastien Martini's SSL example 331 # http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/442473 332 # 333 try: 334 from OpenSSL import SSL, tsafe 335 SocketErrors = (socket.error, SSL.ZeroReturnError, SSL.SysCallError) 336 except ImportError: 337 # Do not require pyOpenSSL to be installed, but disable SSL 338 # functionality in that case. 339 SSL = None 340 SocketErrors = (socket.error,) 341 class SecureHTTPServer(HTTPServer): 342 def __init__(self, server_address, RequestHandlerClass, 343 ssl_context=None, request_queue_size=None): 344 assert not ssl_context, "pyOpenSSL not installed" 345 HTTPServer.__init__(self, server_address, RequestHandlerClass) 346 if request_queue_size: 347 self.socket.listen(request_queue_size) 348 else: 349 350 class _ConnFixer(object): 351 """ wraps a socket connection so it implements makefile """ 352 def __init__(self, conn): 353 self.__conn = conn 354 def makefile(self, mode, bufsize): 355 return socket._fileobject(self.__conn, mode, bufsize) 356 def __getattr__(self, attrib): 357 return getattr(self.__conn, attrib) 358 359 class SecureHTTPServer(HTTPServer): 360 """ 361 Provides SSL server functionality on top of the BaseHTTPServer 362 by overriding _private_ members of Python's standard 363 distribution. The interface for this instance only changes by 364 adding a an optional ssl_context attribute to the constructor: 365 366 cntx = SSL.Context(SSL.SSLv23_METHOD) 367 cntx.use_privatekey_file("host.pem") 368 cntx.use_certificate_file("host.pem") 369 370 """ 371 372 def __init__(self, server_address, RequestHandlerClass, 373 ssl_context=None, request_queue_size=None): 374 # This overrides the implementation of __init__ in python's 375 # SocketServer.TCPServer (which BaseHTTPServer.HTTPServer 376 # does not override, thankfully). 377 HTTPServer.__init__(self, server_address, RequestHandlerClass) 378 self.socket = socket.socket(self.address_family, 379 self.socket_type) 380 self.ssl_context = ssl_context 381 if ssl_context: 382 class TSafeConnection(tsafe.Connection): 383 def settimeout(self, *args): 384 self._lock.acquire() 385 try: 386 return self._ssl_conn.settimeout(*args) 387 finally: 388 self._lock.release() 389 def gettimeout(self): 390 self._lock.acquire() 391 try: 392 return self._ssl_conn.gettimeout() 393 finally: 394 self._lock.release() 395 self.socket = TSafeConnection(ssl_context, self.socket) 396 self.server_bind() 397 if request_queue_size: 398 self.socket.listen(request_queue_size) 399 self.server_activate() 400 401 def get_request(self): 402 # The default SSL request object does not seem to have a 403 # ``makefile(mode, bufsize)`` method as expected by 404 # Socketserver.StreamRequestHandler. 405 (conn, info) = self.socket.accept() 406 if self.ssl_context: 407 conn = _ConnFixer(conn) 408 return (conn, info) 409 410 def _auto_ssl_context(): 411 import OpenSSL, random 412 pkey = OpenSSL.crypto.PKey() 413 pkey.generate_key(OpenSSL.crypto.TYPE_RSA, 768) 414 415 cert = OpenSSL.crypto.X509() 416 417 cert.set_serial_number(random.randint(0, sys.maxint)) 418 cert.gmtime_adj_notBefore(0) 419 cert.gmtime_adj_notAfter(60 * 60 * 24 * 365) 420 cert.get_subject().CN = '*' 421 cert.get_subject().O = 'Dummy Certificate' 422 cert.get_issuer().CN = 'Untrusted Authority' 423 cert.get_issuer().O = 'Self-Signed' 424 cert.set_pubkey(pkey) 425 cert.sign(pkey, 'md5') 426 427 ctx = SSL.Context(SSL.SSLv23_METHOD) 428 ctx.use_privatekey(pkey) 429 ctx.use_certificate(cert) 430 431 return ctx 432 433 class WSGIHandler(WSGIHandlerMixin, BaseHTTPRequestHandler): 434 """ 435 A WSGI handler that overrides POST, GET and HEAD to delegate 436 requests to the server's ``wsgi_application``. 437 """ 438 server_version = 'PasteWSGIServer/' + __version__ 439 440 def handle_one_request(self): 441 """Handle a single HTTP request. 442 443 You normally don't need to override this method; see the class 444 __doc__ string for information on how to handle specific HTTP 445 commands such as GET and POST. 446 447 """ 448 self.raw_requestline = self.rfile.readline() 449 if not self.raw_requestline: 450 self.close_connection = 1 451 return 452 if not self.parse_request(): # An error code has been sent, just exit 453 return 454 self.wsgi_execute() 455 456 def handle(self): 457 # don't bother logging disconnects while handling a request 458 try: 459 BaseHTTPRequestHandler.handle(self) 460 except SocketErrors as exce: 461 self.wsgi_connection_drop(exce) 462 463 def address_string(self): 464 """Return the client address formatted for logging. 465 466 This is overridden so that no hostname lookup is done. 467 """ 468 return '' 469 470 class LimitedLengthFile(object): 471 def __init__(self, file, length): 472 self.file = file 473 self.length = length 474 self._consumed = 0 475 if hasattr(self.file, 'seek'): 476 self.seek = self._seek 477 478 def __repr__(self): 479 base_repr = repr(self.file) 480 return base_repr[:-1] + ' length=%s>' % self.length 481 482 def read(self, length=None): 483 left = self.length - self._consumed 484 if length is None: 485 length = left 486 else: 487 length = min(length, left) 488 # next two lines are hnecessary only if read(0) blocks 489 if not left: 490 return '' 491 data = self.file.read(length) 492 self._consumed += len(data) 493 return data 494 495 def readline(self, *args): 496 max_read = self.length - self._consumed 497 if len(args): 498 max_read = min(args[0], max_read) 499 data = self.file.readline(max_read) 500 self._consumed += len(data) 501 return data 502 503 def readlines(self, hint=None): 504 data = self.file.readlines(hint) 505 for chunk in data: 506 self._consumed += len(chunk) 507 return data 508 509 def __iter__(self): 510 return self 511 512 def next(self): 513 if self.length - self._consumed <= 0: 514 raise StopIteration 515 return self.readline() 516 517 ## Optional methods ## 518 519 def _seek(self, place): 520 self.file.seek(place) 521 self._consumed = place 522 523 def tell(self): 524 if hasattr(self.file, 'tell'): 525 return self.file.tell() 526 else: 527 return self._consumed 528 529 class ThreadPool(object): 530 """ 531 Generic thread pool with a queue of callables to consume. 532 533 Keeps a notion of the status of its worker threads: 534 535 idle: worker thread with nothing to do 536 537 busy: worker thread doing its job 538 539 hung: worker thread that's been doing a job for too long 540 541 dying: a hung thread that has been killed, but hasn't died quite 542 yet. 543 544 zombie: what was a worker thread that we've tried to kill but 545 isn't dead yet. 546 547 At any time you can call track_threads, to get a dictionary with 548 these keys and lists of thread_ids that fall in that status. All 549 keys will be present, even if they point to emty lists. 550 551 hung threads are threads that have been busy more than 552 hung_thread_limit seconds. Hung threads are killed when they live 553 longer than kill_thread_limit seconds. A thread is then 554 considered dying for dying_limit seconds, if it is still alive 555 after that it is considered a zombie. 556 557 When there are no idle workers and a request comes in, another 558 worker *may* be spawned. If there are less than spawn_if_under 559 threads in the busy state, another thread will be spawned. So if 560 the limit is 5, and there are 4 hung threads and 6 busy threads, 561 no thread will be spawned. 562 563 When there are more than max_zombie_threads_before_die zombie 564 threads, a SystemExit exception will be raised, stopping the 565 server. Use 0 or None to never raise this exception. Zombie 566 threads *should* get cleaned up, but killing threads is no 567 necessarily reliable. This is turned off by default, since it is 568 only a good idea if you've deployed the server with some process 569 watching from above (something similar to daemontools or zdaemon). 570 571 Each worker thread only processes ``max_requests`` tasks before it 572 dies and replaces itself with a new worker thread. 573 """ 574 575 576 SHUTDOWN = object() 577 578 def __init__( 579 self, nworkers, name="ThreadPool", daemon=False, 580 max_requests=100, # threads are killed after this many requests 581 hung_thread_limit=30, # when a thread is marked "hung" 582 kill_thread_limit=1800, # when you kill that hung thread 583 dying_limit=300, # seconds that a kill should take to go into effect (longer than this and the thread is a "zombie") 584 spawn_if_under=5, # spawn if there's too many hung threads 585 max_zombie_threads_before_die=0, # when to give up on the process 586 hung_check_period=100, # every 100 requests check for hung workers 587 logger=None, # Place to log messages to 588 error_email=None, # Person(s) to notify if serious problem occurs 589 ): 590 """ 591 Create thread pool with `nworkers` worker threads. 592 """ 593 self.nworkers = nworkers 594 self.max_requests = max_requests 595 self.name = name 596 self.queue = queue.Queue() 597 self.workers = [] 598 self.daemon = daemon 599 if logger is None: 600 logger = logging.getLogger('paste.httpserver.ThreadPool') 601 if isinstance(logger, six.string_types): 602 logger = logging.getLogger(logger) 603 self.logger = logger 604 self.error_email = error_email 605 self._worker_count = count() 606 607 assert (not kill_thread_limit 608 or kill_thread_limit >= hung_thread_limit), ( 609 "kill_thread_limit (%s) should be higher than hung_thread_limit (%s)" 610 % (kill_thread_limit, hung_thread_limit)) 611 if not killthread: 612 kill_thread_limit = 0 613 self.logger.info( 614 "Cannot use kill_thread_limit as ctypes/killthread is not available") 615 self.kill_thread_limit = kill_thread_limit 616 self.dying_limit = dying_limit 617 self.hung_thread_limit = hung_thread_limit 618 assert spawn_if_under <= nworkers, ( 619 "spawn_if_under (%s) should be less than nworkers (%s)" 620 % (spawn_if_under, nworkers)) 621 self.spawn_if_under = spawn_if_under 622 self.max_zombie_threads_before_die = max_zombie_threads_before_die 623 self.hung_check_period = hung_check_period 624 self.requests_since_last_hung_check = 0 625 # Used to keep track of what worker is doing what: 626 self.worker_tracker = {} 627 # Used to keep track of the workers not doing anything: 628 self.idle_workers = [] 629 # Used to keep track of threads that have been killed, but maybe aren't dead yet: 630 self.dying_threads = {} 631 # This is used to track when we last had to add idle workers; 632 # we shouldn't cull extra workers until some time has passed 633 # (hung_thread_limit) since workers were added: 634 self._last_added_new_idle_workers = 0 635 if not daemon: 636 atexit.register(self.shutdown) 637 for i in range(self.nworkers): 638 self.add_worker_thread(message='Initial worker pool') 639 640 def add_task(self, task): 641 """ 642 Add a task to the queue 643 """ 644 self.logger.debug('Added task (%i tasks queued)', self.queue.qsize()) 645 if self.hung_check_period: 646 self.requests_since_last_hung_check += 1 647 if self.requests_since_last_hung_check > self.hung_check_period: 648 self.requests_since_last_hung_check = 0 649 self.kill_hung_threads() 650 if not self.idle_workers and self.spawn_if_under: 651 # spawn_if_under can come into effect... 652 busy = 0 653 now = time.time() 654 self.logger.debug('No idle workers for task; checking if we need to make more workers') 655 for worker in self.workers: 656 if not hasattr(worker, 'thread_id'): 657 # Not initialized 658 continue 659 time_started, info = self.worker_tracker.get(worker.thread_id, 660 (None, None)) 661 if time_started is not None: 662 if now - time_started < self.hung_thread_limit: 663 busy += 1 664 if busy < self.spawn_if_under: 665 self.logger.info( 666 'No idle tasks, and only %s busy tasks; adding %s more ' 667 'workers', busy, self.spawn_if_under-busy) 668 self._last_added_new_idle_workers = time.time() 669 for i in range(self.spawn_if_under - busy): 670 self.add_worker_thread(message='Response to lack of idle workers') 671 else: 672 self.logger.debug( 673 'No extra workers needed (%s busy workers)', 674 busy) 675 if (len(self.workers) > self.nworkers 676 and len(self.idle_workers) > 3 677 and time.time()-self._last_added_new_idle_workers > self.hung_thread_limit): 678 # We've spawned worers in the past, but they aren't needed 679 # anymore; kill off some 680 self.logger.info( 681 'Culling %s extra workers (%s idle workers present)', 682 len(self.workers)-self.nworkers, len(self.idle_workers)) 683 self.logger.debug( 684 'Idle workers: %s', self.idle_workers) 685 for i in range(len(self.workers) - self.nworkers): 686 self.queue.put(self.SHUTDOWN) 687 self.queue.put(task) 688 689 def track_threads(self): 690 """ 691 Return a dict summarizing the threads in the pool (as 692 described in the ThreadPool docstring). 693 """ 694 result = dict(idle=[], busy=[], hung=[], dying=[], zombie=[]) 695 now = time.time() 696 for worker in self.workers: 697 if not hasattr(worker, 'thread_id'): 698 # The worker hasn't fully started up, we should just 699 # ignore it 700 continue 701 time_started, info = self.worker_tracker.get(worker.thread_id, 702 (None, None)) 703 if time_started is not None: 704 if now - time_started > self.hung_thread_limit: 705 result['hung'].append(worker) 706 else: 707 result['busy'].append(worker) 708 else: 709 result['idle'].append(worker) 710 for thread_id, (time_killed, worker) in self.dying_threads.items(): 711 if not self.thread_exists(thread_id): 712 # Cull dying threads that are actually dead and gone 713 self.logger.info('Killed thread %s no longer around', 714 thread_id) 715 try: 716 del self.dying_threads[thread_id] 717 except KeyError: 718 pass 719 continue 720 if now - time_killed > self.dying_limit: 721 result['zombie'].append(worker) 722 else: 723 result['dying'].append(worker) 724 return result 725 726 def kill_worker(self, thread_id): 727 """ 728 Removes the worker with the given thread_id from the pool, and 729 replaces it with a new worker thread. 730 731 This should only be done for mis-behaving workers. 732 """ 733 if killthread is None: 734 raise RuntimeError( 735 "Cannot kill worker; killthread/ctypes not available") 736 thread_obj = threading._active.get(thread_id) 737 killthread.async_raise(thread_id, SystemExit) 738 try: 739 del self.worker_tracker[thread_id] 740 except KeyError: 741 pass 742 self.logger.info('Killing thread %s', thread_id) 743 if thread_obj in self.workers: 744 self.workers.remove(thread_obj) 745 self.dying_threads[thread_id] = (time.time(), thread_obj) 746 self.add_worker_thread(message='Replacement for killed thread %s' % thread_id) 747 748 def thread_exists(self, thread_id): 749 """ 750 Returns true if a thread with this id is still running 751 """ 752 return thread_id in threading._active 753 754 def add_worker_thread(self, *args, **kwargs): 755 index = six.next(self._worker_count) 756 worker = threading.Thread(target=self.worker_thread_callback, 757 args=args, kwargs=kwargs, 758 name=("worker %d" % index)) 759 worker.setDaemon(self.daemon) 760 worker.start() 761 762 def kill_hung_threads(self): 763 """ 764 Tries to kill any hung threads 765 """ 766 if not self.kill_thread_limit: 767 # No killing should occur 768 return 769 now = time.time() 770 max_time = 0 771 total_time = 0 772 idle_workers = 0 773 starting_workers = 0 774 working_workers = 0 775 killed_workers = 0 776 for worker in self.workers: 777 if not hasattr(worker, 'thread_id'): 778 # Not setup yet 779 starting_workers += 1 780 continue 781 time_started, info = self.worker_tracker.get(worker.thread_id, 782 (None, None)) 783 if time_started is None: 784 # Must be idle 785 idle_workers += 1 786 continue 787 working_workers += 1 788 max_time = max(max_time, now-time_started) 789 total_time += now-time_started 790 if now - time_started > self.kill_thread_limit: 791 self.logger.warning( 792 'Thread %s hung (working on task for %i seconds)', 793 worker.thread_id, now - time_started) 794 try: 795 import pprint 796 info_desc = pprint.pformat(info) 797 except: 798 out = six.StringIO() 799 traceback.print_exc(file=out) 800 info_desc = 'Error:\n%s' % out.getvalue() 801 self.notify_problem( 802 "Killing worker thread (id=%(thread_id)s) because it has been \n" 803 "working on task for %(time)s seconds (limit is %(limit)s)\n" 804 "Info on task:\n" 805 "%(info)s" 806 % dict(thread_id=worker.thread_id, 807 time=now - time_started, 808 limit=self.kill_thread_limit, 809 info=info_desc)) 810 self.kill_worker(worker.thread_id) 811 killed_workers += 1 812 if working_workers: 813 ave_time = float(total_time) / working_workers 814 ave_time = '%.2fsec' % ave_time 815 else: 816 ave_time = 'N/A' 817 self.logger.info( 818 "kill_hung_threads status: %s threads (%s working, %s idle, %s starting) " 819 "ave time %s, max time %.2fsec, killed %s workers" 820 % (idle_workers + starting_workers + working_workers, 821 working_workers, idle_workers, starting_workers, 822 ave_time, max_time, killed_workers)) 823 self.check_max_zombies() 824 825 def check_max_zombies(self): 826 """ 827 Check if we've reached max_zombie_threads_before_die; if so 828 then kill the entire process. 829 """ 830 if not self.max_zombie_threads_before_die: 831 return 832 found = [] 833 now = time.time() 834 for thread_id, (time_killed, worker) in self.dying_threads.items(): 835 if not self.thread_exists(thread_id): 836 # Cull dying threads that are actually dead and gone 837 try: 838 del self.dying_threads[thread_id] 839 except KeyError: 840 pass 841 continue 842 if now - time_killed > self.dying_limit: 843 found.append(thread_id) 844 if found: 845 self.logger.info('Found %s zombie threads', found) 846 if len(found) > self.max_zombie_threads_before_die: 847 self.logger.fatal( 848 'Exiting process because %s zombie threads is more than %s limit', 849 len(found), self.max_zombie_threads_before_die) 850 self.notify_problem( 851 "Exiting process because %(found)s zombie threads " 852 "(more than limit of %(limit)s)\n" 853 "Bad threads (ids):\n" 854 " %(ids)s\n" 855 % dict(found=len(found), 856 limit=self.max_zombie_threads_before_die, 857 ids="\n ".join(map(str, found))), 858 subject="Process restart (too many zombie threads)") 859 self.shutdown(10) 860 print('Shutting down', threading.currentThread()) 861 raise ServerExit(3) 862 863 def worker_thread_callback(self, message=None): 864 """ 865 Worker thread should call this method to get and process queued 866 callables. 867 """ 868 thread_obj = threading.currentThread() 869 thread_id = thread_obj.thread_id = _thread.get_ident() 870 self.workers.append(thread_obj) 871 self.idle_workers.append(thread_id) 872 requests_processed = 0 873 add_replacement_worker = False 874 self.logger.debug('Started new worker %s: %s', thread_id, message) 875 try: 876 while True: 877 if self.max_requests and self.max_requests < requests_processed: 878 # Replace this thread then die 879 self.logger.debug('Thread %s processed %i requests (limit %s); stopping thread' 880 % (thread_id, requests_processed, self.max_requests)) 881 add_replacement_worker = True 882 break 883 runnable = self.queue.get() 884 if runnable is ThreadPool.SHUTDOWN: 885 self.logger.debug('Worker %s asked to SHUTDOWN', thread_id) 886 break 887 try: 888 self.idle_workers.remove(thread_id) 889 except ValueError: 890 pass 891 self.worker_tracker[thread_id] = [time.time(), None] 892 requests_processed += 1 893 try: 894 try: 895 runnable() 896 except: 897 # We are later going to call sys.exc_clear(), 898 # removing all remnants of any exception, so 899 # we should log it now. But ideally no 900 # exception should reach this level 901 print('Unexpected exception in worker %r' % runnable, 902 file=sys.stderr) 903 traceback.print_exc() 904 if thread_id in self.dying_threads: 905 # That last exception was intended to kill me 906 break 907 finally: 908 try: 909 del self.worker_tracker[thread_id] 910 except KeyError: 911 pass 912 if six.PY2: 913 sys.exc_clear() 914 self.idle_workers.append(thread_id) 915 finally: 916 try: 917 del self.worker_tracker[thread_id] 918 except KeyError: 919 pass 920 try: 921 self.idle_workers.remove(thread_id) 922 except ValueError: 923 pass 924 try: 925 self.workers.remove(thread_obj) 926 except ValueError: 927 pass 928 try: 929 del self.dying_threads[thread_id] 930 except KeyError: 931 pass 932 if add_replacement_worker: 933 self.add_worker_thread(message='Voluntary replacement for thread %s' % thread_id) 934 935 def shutdown(self, force_quit_timeout=0): 936 """ 937 Shutdown the queue (after finishing any pending requests). 938 """ 939 self.logger.info('Shutting down threadpool') 940 # Add a shutdown request for every worker 941 for i in range(len(self.workers)): 942 self.queue.put(ThreadPool.SHUTDOWN) 943 # Wait for each thread to terminate 944 hung_workers = [] 945 for worker in self.workers: 946 worker.join(0.5) 947 if worker.isAlive(): 948 hung_workers.append(worker) 949 zombies = [] 950 for thread_id in self.dying_threads: 951 if self.thread_exists(thread_id): 952 zombies.append(thread_id) 953 if hung_workers or zombies: 954 self.logger.info("%s workers didn't stop properly, and %s zombies", 955 len(hung_workers), len(zombies)) 956 if hung_workers: 957 for worker in hung_workers: 958 self.kill_worker(worker.thread_id) 959 self.logger.info('Workers killed forcefully') 960 if force_quit_timeout: 961 timed_out = False 962 need_force_quit = bool(zombies) 963 for worker in self.workers: 964 if not timed_out and worker.isAlive(): 965 timed_out = True 966 worker.join(force_quit_timeout) 967 if worker.isAlive(): 968 print("Worker %s won't die" % worker) 969 need_force_quit = True 970 if need_force_quit: 971 import atexit 972 # Remove the threading atexit callback 973 for callback in list(atexit._exithandlers): 974 func = getattr(callback[0], 'im_func', None) 975 if not func: 976 continue 977 globs = getattr(func, 'func_globals', {}) 978 mod = globs.get('__name__') 979 if mod == 'threading': 980 atexit._exithandlers.remove(callback) 981 atexit._run_exitfuncs() 982 print('Forcefully exiting process') 983 os._exit(3) 984 else: 985 self.logger.info('All workers eventually killed') 986 else: 987 self.logger.info('All workers stopped') 988 989 def notify_problem(self, msg, subject=None, spawn_thread=True): 990 """ 991 Called when there's a substantial problem. msg contains the 992 body of the notification, subject the summary. 993 994 If spawn_thread is true, then the email will be send in 995 another thread (so this doesn't block). 996 """ 997 if not self.error_email: 998 return 999 if spawn_thread: 1000 t = threading.Thread( 1001 target=self.notify_problem, 1002 args=(msg, subject, False)) 1003 t.start() 1004 return 1005 from_address = 'errors@localhost' 1006 if not subject: 1007 subject = msg.strip().splitlines()[0] 1008 subject = subject[:50] 1009 subject = '[http threadpool] %s' % subject 1010 headers = [ 1011 "To: %s" % self.error_email, 1012 "From: %s" % from_address, 1013 "Subject: %s" % subject, 1014 ] 1015 try: 1016 system = ' '.join(os.uname()) 1017 except: 1018 system = '(unknown)' 1019 body = ( 1020 "An error has occurred in the paste.httpserver.ThreadPool\n" 1021 "Error:\n" 1022 " %(msg)s\n" 1023 "Occurred at: %(time)s\n" 1024 "PID: %(pid)s\n" 1025 "System: %(system)s\n" 1026 "Server .py file: %(file)s\n" 1027 % dict(msg=msg, 1028 time=time.strftime("%c"), 1029 pid=os.getpid(), 1030 system=system, 1031 file=os.path.abspath(__file__), 1032 )) 1033 message = '\n'.join(headers) + "\n\n" + body 1034 import smtplib 1035 server = smtplib.SMTP('localhost') 1036 error_emails = [ 1037 e.strip() for e in self.error_email.split(",") 1038 if e.strip()] 1039 server.sendmail(from_address, error_emails, message) 1040 server.quit() 1041 print('email sent to', error_emails, message) 1042 1043 class ThreadPoolMixIn(object): 1044 """ 1045 Mix-in class to process requests from a thread pool 1046 """ 1047 def __init__(self, nworkers, daemon=False, **threadpool_options): 1048 # Create and start the workers 1049 self.running = True 1050 assert nworkers > 0, "ThreadPoolMixIn servers must have at least one worker" 1051 self.thread_pool = ThreadPool( 1052 nworkers, 1053 "ThreadPoolMixIn HTTP server on %s:%d" 1054 % (self.server_name, self.server_port), 1055 daemon, 1056 **threadpool_options) 1057 1058 def process_request(self, request, client_address): 1059 """ 1060 Queue the request to be processed by on of the thread pool threads 1061 """ 1062 # This sets the socket to blocking mode (and no timeout) since it 1063 # may take the thread pool a little while to get back to it. (This 1064 # is the default but since we set a timeout on the parent socket so 1065 # that we can trap interrupts we need to restore this,.) 1066 request.setblocking(1) 1067 # Queue processing of the request 1068 self.thread_pool.add_task( 1069 lambda: self.process_request_in_thread(request, client_address)) 1070 1071 def handle_error(self, request, client_address): 1072 exc_class, exc, tb = sys.exc_info() 1073 if exc_class is ServerExit: 1074 # This is actually a request to stop the server 1075 raise 1076 return super(ThreadPoolMixIn, self).handle_error(request, client_address) 1077 1078 def process_request_in_thread(self, request, client_address): 1079 """ 1080 The worker thread should call back here to do the rest of the 1081 request processing. Error handling normaller done in 'handle_request' 1082 must be done here. 1083 """ 1084 try: 1085 self.finish_request(request, client_address) 1086 self.close_request(request) 1087 except: 1088 self.handle_error(request, client_address) 1089 self.close_request(request) 1090 exc = sys.exc_info()[1] 1091 if isinstance(exc, (MemoryError, KeyboardInterrupt)): 1092 raise 1093 1094 def serve_forever(self): 1095 """ 1096 Overrides `serve_forever` to shut the threadpool down cleanly. 1097 """ 1098 try: 1099 while self.running: 1100 try: 1101 self.handle_request() 1102 except socket.timeout: 1103 # Timeout is expected, gives interrupts a chance to 1104 # propogate, just keep handling 1105 pass 1106 finally: 1107 if hasattr(self, 'thread_pool'): 1108 self.thread_pool.shutdown() 1109 1110 def server_activate(self): 1111 """ 1112 Overrides server_activate to set timeout on our listener socket. 1113 """ 1114 # We set the timeout here so that we can trap interrupts on windows 1115 self.socket.settimeout(1) 1116 1117 def server_close(self): 1118 """ 1119 Finish pending requests and shutdown the server. 1120 """ 1121 self.running = False 1122 self.socket.close() 1123 if hasattr(self, 'thread_pool'): 1124 self.thread_pool.shutdown(60) 1125 1126 class WSGIServerBase(SecureHTTPServer): 1127 def __init__(self, wsgi_application, server_address, 1128 RequestHandlerClass=None, ssl_context=None, 1129 request_queue_size=None): 1130 SecureHTTPServer.__init__(self, server_address, 1131 RequestHandlerClass, ssl_context, 1132 request_queue_size=request_queue_size) 1133 self.wsgi_application = wsgi_application 1134 self.wsgi_socket_timeout = None 1135 1136 def get_request(self): 1137 # If there is a socket_timeout, set it on the accepted 1138 (conn,info) = SecureHTTPServer.get_request(self) 1139 if self.wsgi_socket_timeout: 1140 conn.settimeout(self.wsgi_socket_timeout) 1141 return (conn, info) 1142 1143 class WSGIServer(ThreadingMixIn, WSGIServerBase): 1144 daemon_threads = False 1145 1146 class WSGIThreadPoolServer(ThreadPoolMixIn, WSGIServerBase): 1147 def __init__(self, wsgi_application, server_address, 1148 RequestHandlerClass=None, ssl_context=None, 1149 nworkers=10, daemon_threads=False, 1150 threadpool_options=None, request_queue_size=None): 1151 WSGIServerBase.__init__(self, wsgi_application, server_address, 1152 RequestHandlerClass, ssl_context, 1153 request_queue_size=request_queue_size) 1154 if threadpool_options is None: 1155 threadpool_options = {} 1156 ThreadPoolMixIn.__init__(self, nworkers, daemon_threads, 1157 **threadpool_options) 1158 1159 class ServerExit(SystemExit): 1160 """ 1161 Raised to tell the server to really exit (SystemExit is normally 1162 caught) 1163 """ 1164 1165 def serve(application, host=None, port=None, handler=None, ssl_pem=None, 1166 ssl_context=None, server_version=None, protocol_version=None, 1167 start_loop=True, daemon_threads=None, socket_timeout=None, 1168 use_threadpool=None, threadpool_workers=10, 1169 threadpool_options=None, request_queue_size=5): 1170 """ 1171 Serves your ``application`` over HTTP(S) via WSGI interface 1172 1173 ``host`` 1174 1175 This is the ipaddress to bind to (or a hostname if your 1176 nameserver is properly configured). This defaults to 1177 127.0.0.1, which is not a public interface. 1178 1179 ``port`` 1180 1181 The port to run on, defaults to 8080 for HTTP, or 4443 for 1182 HTTPS. This can be a string or an integer value. 1183 1184 ``handler`` 1185 1186 This is the HTTP request handler to use, it defaults to 1187 ``WSGIHandler`` in this module. 1188 1189 ``ssl_pem`` 1190 1191 This an optional SSL certificate file (via OpenSSL). You can 1192 supply ``*`` and a development-only certificate will be 1193 created for you, or you can generate a self-signed test PEM 1194 certificate file as follows:: 1195 1196 $ openssl genrsa 1024 > host.key 1197 $ chmod 400 host.key 1198 $ openssl req -new -x509 -nodes -sha1 -days 365 \\ 1199 -key host.key > host.cert 1200 $ cat host.cert host.key > host.pem 1201 $ chmod 400 host.pem 1202 1203 ``ssl_context`` 1204 1205 This an optional SSL context object for the server. A SSL 1206 context will be automatically constructed for you if you supply 1207 ``ssl_pem``. Supply this to use a context of your own 1208 construction. 1209 1210 ``server_version`` 1211 1212 The version of the server as reported in HTTP response line. This 1213 defaults to something like "PasteWSGIServer/0.5". Many servers 1214 hide their code-base identity with a name like 'Amnesiac/1.0' 1215 1216 ``protocol_version`` 1217 1218 This sets the protocol used by the server, by default 1219 ``HTTP/1.0``. There is some support for ``HTTP/1.1``, which 1220 defaults to nicer keep-alive connections. This server supports 1221 ``100 Continue``, but does not yet support HTTP/1.1 Chunked 1222 Encoding. Hence, if you use HTTP/1.1, you're somewhat in error 1223 since chunked coding is a mandatory requirement of a HTTP/1.1 1224 server. If you specify HTTP/1.1, every response *must* have a 1225 ``Content-Length`` and you must be careful not to read past the 1226 end of the socket. 1227 1228 ``start_loop`` 1229 1230 This specifies if the server loop (aka ``server.serve_forever()``) 1231 should be called; it defaults to ``True``. 1232 1233 ``daemon_threads`` 1234 1235 This flag specifies if when your webserver terminates all 1236 in-progress client connections should be droppped. It defaults 1237 to ``False``. You might want to set this to ``True`` if you 1238 are using ``HTTP/1.1`` and don't set a ``socket_timeout``. 1239 1240 ``socket_timeout`` 1241 1242 This specifies the maximum amount of time that a connection to a 1243 given client will be kept open. At this time, it is a rude 1244 disconnect, but at a later time it might follow the RFC a bit 1245 more closely. 1246 1247 ``use_threadpool`` 1248 1249 Server requests from a pool of worker threads (``threadpool_workers``) 1250 rather than creating a new thread for each request. This can 1251 substantially reduce latency since there is a high cost associated 1252 with thread creation. 1253 1254 ``threadpool_workers`` 1255 1256 Number of worker threads to create when ``use_threadpool`` is true. This 1257 can be a string or an integer value. 1258 1259 ``threadpool_options`` 1260 1261 A dictionary of options to be used when instantiating the 1262 threadpool. See paste.httpserver.ThreadPool for specific 1263 options (``threadpool_workers`` is a specific option that can 1264 also go here). 1265 1266 ``request_queue_size`` 1267 1268 The 'backlog' argument to socket.listen(); specifies the 1269 maximum number of queued connections. 1270 1271 """ 1272 is_ssl = False 1273 if ssl_pem or ssl_context: 1274 assert SSL, "pyOpenSSL is not installed" 1275 is_ssl = True 1276 port = int(port or 4443) 1277 if not ssl_context: 1278 if ssl_pem == '*': 1279 ssl_context = _auto_ssl_context() 1280 else: 1281 ssl_context = SSL.Context(SSL.SSLv23_METHOD) 1282 ssl_context.use_privatekey_file(ssl_pem) 1283 ssl_context.use_certificate_chain_file(ssl_pem) 1284 1285 host = host or '127.0.0.1' 1286 if port is None: 1287 if ':' in host: 1288 host, port = host.split(':', 1) 1289 else: 1290 port = 8080 1291 server_address = (host, int(port)) 1292 1293 if not handler: 1294 handler = WSGIHandler 1295 if server_version: 1296 handler.server_version = server_version 1297 handler.sys_version = None 1298 if protocol_version: 1299 assert protocol_version in ('HTTP/0.9', 'HTTP/1.0', 'HTTP/1.1') 1300 handler.protocol_version = protocol_version 1301 1302 if use_threadpool is None: 1303 use_threadpool = True 1304 1305 if converters.asbool(use_threadpool): 1306 server = WSGIThreadPoolServer(application, server_address, handler, 1307 ssl_context, int(threadpool_workers), 1308 daemon_threads, 1309 threadpool_options=threadpool_options, 1310 request_queue_size=request_queue_size) 1311 else: 1312 server = WSGIServer(application, server_address, handler, ssl_context, 1313 request_queue_size=request_queue_size) 1314 if daemon_threads: 1315 server.daemon_threads = daemon_threads 1316 1317 if socket_timeout: 1318 server.wsgi_socket_timeout = int(socket_timeout) 1319 1320 if converters.asbool(start_loop): 1321 protocol = is_ssl and 'https' or 'http' 1322 host, port = server.server_address[:2] 1323 if host == '0.0.0.0': 1324 print('serving on 0.0.0.0:%s view at %s://127.0.0.1:%s' 1325 % (port, protocol, port)) 1326 else: 1327 print("serving on %s://%s:%s" % (protocol, host, port)) 1328 try: 1329 server.serve_forever() 1330 except KeyboardInterrupt: 1331 # allow CTRL+C to shutdown 1332 pass 1333 return server 1334 1335 # For paste.deploy server instantiation (egg:Paste#http) 1336 # Note: this gets a separate function because it has to expect string 1337 # arguments (though that's not much of an issue yet, ever?) 1338 def server_runner(wsgi_app, global_conf, **kwargs): 1339 from paste.deploy.converters import asbool 1340 for name in ['port', 'socket_timeout', 'threadpool_workers', 1341 'threadpool_hung_thread_limit', 1342 'threadpool_kill_thread_limit', 1343 'threadpool_dying_limit', 'threadpool_spawn_if_under', 1344 'threadpool_max_zombie_threads_before_die', 1345 'threadpool_hung_check_period', 1346 'threadpool_max_requests', 'request_queue_size']: 1347 if name in kwargs: 1348 kwargs[name] = int(kwargs[name]) 1349 for name in ['use_threadpool', 'daemon_threads']: 1350 if name in kwargs: 1351 kwargs[name] = asbool(kwargs[name]) 1352 threadpool_options = {} 1353 for name, value in kwargs.items(): 1354 if name.startswith('threadpool_') and name != 'threadpool_workers': 1355 threadpool_options[name[len('threadpool_'):]] = value 1356 del kwargs[name] 1357 if ('error_email' not in threadpool_options 1358 and 'error_email' in global_conf): 1359 threadpool_options['error_email'] = global_conf['error_email'] 1360 kwargs['threadpool_options'] = threadpool_options 1361 serve(wsgi_app, **kwargs) 1362 1363 server_runner.__doc__ = (serve.__doc__ or '') + """ 1364 1365 You can also set these threadpool options: 1366 1367 ``threadpool_max_requests``: 1368 1369 The maximum number of requests a worker thread will process 1370 before dying (and replacing itself with a new worker thread). 1371 Default 100. 1372 1373 ``threadpool_hung_thread_limit``: 1374 1375 The number of seconds a thread can work on a task before it is 1376 considered hung (stuck). Default 30 seconds. 1377 1378 ``threadpool_kill_thread_limit``: 1379 1380 The number of seconds a thread can work before you should kill it 1381 (assuming it will never finish). Default 600 seconds (10 minutes). 1382 1383 ``threadpool_dying_limit``: 1384 1385 The length of time after killing a thread that it should actually 1386 disappear. If it lives longer than this, it is considered a 1387 "zombie". Note that even in easy situations killing a thread can 1388 be very slow. Default 300 seconds (5 minutes). 1389 1390 ``threadpool_spawn_if_under``: 1391 1392 If there are no idle threads and a request comes in, and there are 1393 less than this number of *busy* threads, then add workers to the 1394 pool. Busy threads are threads that have taken less than 1395 ``threadpool_hung_thread_limit`` seconds so far. So if you get 1396 *lots* of requests but they complete in a reasonable amount of time, 1397 the requests will simply queue up (adding more threads probably 1398 wouldn't speed them up). But if you have lots of hung threads and 1399 one more request comes in, this will add workers to handle it. 1400 Default 5. 1401 1402 ``threadpool_max_zombie_threads_before_die``: 1403 1404 If there are more zombies than this, just kill the process. This is 1405 only good if you have a monitor that will automatically restart 1406 the server. This can clean up the mess. Default 0 (disabled). 1407 1408 `threadpool_hung_check_period``: 1409 1410 Every X requests, check for hung threads that need to be killed, 1411 or for zombie threads that should cause a restart. Default 100 1412 requests. 1413 1414 ``threadpool_logger``: 1415 1416 Logging messages will go the logger named here. 1417 1418 ``threadpool_error_email`` (or global ``error_email`` setting): 1419 1420 When threads are killed or the process restarted, this email 1421 address will be contacted (using an SMTP server on localhost). 1422 1423 """ 1424 1425 1426 if __name__ == '__main__': 1427 from paste.wsgilib import dump_environ 1428 #serve(dump_environ, ssl_pem="test.pem") 1429 serve(dump_environ, server_version="Wombles/1.0", 1430 protocol_version="HTTP/1.1", port="8888") 1431