Home | History | Annotate | Download | only in test
      1 from unittest import mock
      2 from test import support
      3 from test.test_httpservers import NoLogRequestHandler
      4 from unittest import TestCase
      5 from wsgiref.util import setup_testing_defaults
      6 from wsgiref.headers import Headers
      7 from wsgiref.handlers import BaseHandler, BaseCGIHandler, SimpleHandler
      8 from wsgiref import util
      9 from wsgiref.validate import validator
     10 from wsgiref.simple_server import WSGIServer, WSGIRequestHandler
     11 from wsgiref.simple_server import make_server
     12 from http.client import HTTPConnection
     13 from io import StringIO, BytesIO, BufferedReader
     14 from socketserver import BaseServer
     15 from platform import python_implementation
     16 
     17 import os
     18 import re
     19 import signal
     20 import sys
     21 import unittest
     22 
     23 
     24 class MockServer(WSGIServer):
     25     """Non-socket HTTP server"""
     26 
     27     def __init__(self, server_address, RequestHandlerClass):
     28         BaseServer.__init__(self, server_address, RequestHandlerClass)
     29         self.server_bind()
     30 
     31     def server_bind(self):
     32         host, port = self.server_address
     33         self.server_name = host
     34         self.server_port = port
     35         self.setup_environ()
     36 
     37 
     38 class MockHandler(WSGIRequestHandler):
     39     """Non-socket HTTP handler"""
     40     def setup(self):
     41         self.connection = self.request
     42         self.rfile, self.wfile = self.connection
     43 
     44     def finish(self):
     45         pass
     46 
     47 
     48 def hello_app(environ,start_response):
     49     start_response("200 OK", [
     50         ('Content-Type','text/plain'),
     51         ('Date','Mon, 05 Jun 2006 18:49:54 GMT')
     52     ])
     53     return [b"Hello, world!"]
     54 
     55 
     56 def header_app(environ, start_response):
     57     start_response("200 OK", [
     58         ('Content-Type', 'text/plain'),
     59         ('Date', 'Mon, 05 Jun 2006 18:49:54 GMT')
     60     ])
     61     return [';'.join([
     62         environ['HTTP_X_TEST_HEADER'], environ['QUERY_STRING'],
     63         environ['PATH_INFO']
     64     ]).encode('iso-8859-1')]
     65 
     66 
     67 def run_amock(app=hello_app, data=b"GET / HTTP/1.0\n\n"):
     68     server = make_server("", 80, app, MockServer, MockHandler)
     69     inp = BufferedReader(BytesIO(data))
     70     out = BytesIO()
     71     olderr = sys.stderr
     72     err = sys.stderr = StringIO()
     73 
     74     try:
     75         server.finish_request((inp, out), ("127.0.0.1",8888))
     76     finally:
     77         sys.stderr = olderr
     78 
     79     return out.getvalue(), err.getvalue()
     80 
     81 def compare_generic_iter(make_it,match):
     82     """Utility to compare a generic 2.1/2.2+ iterator with an iterable
     83 
     84     If running under Python 2.2+, this tests the iterator using iter()/next(),
     85     as well as __getitem__.  'make_it' must be a function returning a fresh
     86     iterator to be tested (since this may test the iterator twice)."""
     87 
     88     it = make_it()
     89     n = 0
     90     for item in match:
     91         if not it[n]==item: raise AssertionError
     92         n+=1
     93     try:
     94         it[n]
     95     except IndexError:
     96         pass
     97     else:
     98         raise AssertionError("Too many items from __getitem__",it)
     99 
    100     try:
    101         iter, StopIteration
    102     except NameError:
    103         pass
    104     else:
    105         # Only test iter mode under 2.2+
    106         it = make_it()
    107         if not iter(it) is it: raise AssertionError
    108         for item in match:
    109             if not next(it) == item: raise AssertionError
    110         try:
    111             next(it)
    112         except StopIteration:
    113             pass
    114         else:
    115             raise AssertionError("Too many items from .__next__()", it)
    116 
    117 
    118 class IntegrationTests(TestCase):
    119 
    120     def check_hello(self, out, has_length=True):
    121         pyver = (python_implementation() + "/" +
    122                 sys.version.split()[0])
    123         self.assertEqual(out,
    124             ("HTTP/1.0 200 OK\r\n"
    125             "Server: WSGIServer/0.2 " + pyver +"\r\n"
    126             "Content-Type: text/plain\r\n"
    127             "Date: Mon, 05 Jun 2006 18:49:54 GMT\r\n" +
    128             (has_length and  "Content-Length: 13\r\n" or "") +
    129             "\r\n"
    130             "Hello, world!").encode("iso-8859-1")
    131         )
    132 
    133     def test_plain_hello(self):
    134         out, err = run_amock()
    135         self.check_hello(out)
    136 
    137     def test_environ(self):
    138         request = (
    139             b"GET /p%61th/?query=test HTTP/1.0\n"
    140             b"X-Test-Header: Python test \n"
    141             b"X-Test-Header: Python test 2\n"
    142             b"Content-Length: 0\n\n"
    143         )
    144         out, err = run_amock(header_app, request)
    145         self.assertEqual(
    146             out.splitlines()[-1],
    147             b"Python test,Python test 2;query=test;/path/"
    148         )
    149 
    150     def test_request_length(self):
    151         out, err = run_amock(data=b"GET " + (b"x" * 65537) + b" HTTP/1.0\n\n")
    152         self.assertEqual(out.splitlines()[0],
    153                          b"HTTP/1.0 414 Request-URI Too Long")
    154 
    155     def test_validated_hello(self):
    156         out, err = run_amock(validator(hello_app))
    157         # the middleware doesn't support len(), so content-length isn't there
    158         self.check_hello(out, has_length=False)
    159 
    160     def test_simple_validation_error(self):
    161         def bad_app(environ,start_response):
    162             start_response("200 OK", ('Content-Type','text/plain'))
    163             return ["Hello, world!"]
    164         out, err = run_amock(validator(bad_app))
    165         self.assertTrue(out.endswith(
    166             b"A server error occurred.  Please contact the administrator."
    167         ))
    168         self.assertEqual(
    169             err.splitlines()[-2],
    170             "AssertionError: Headers (('Content-Type', 'text/plain')) must"
    171             " be of type list: <class 'tuple'>"
    172         )
    173 
    174     def test_status_validation_errors(self):
    175         def create_bad_app(status):
    176             def bad_app(environ, start_response):
    177                 start_response(status, [("Content-Type", "text/plain; charset=utf-8")])
    178                 return [b"Hello, world!"]
    179             return bad_app
    180 
    181         tests = [
    182             ('200', 'AssertionError: Status must be at least 4 characters'),
    183             ('20X OK', 'AssertionError: Status message must begin w/3-digit code'),
    184             ('200OK', 'AssertionError: Status message must have a space after code'),
    185         ]
    186 
    187         for status, exc_message in tests:
    188             with self.subTest(status=status):
    189                 out, err = run_amock(create_bad_app(status))
    190                 self.assertTrue(out.endswith(
    191                     b"A server error occurred.  Please contact the administrator."
    192                 ))
    193                 self.assertEqual(err.splitlines()[-2], exc_message)
    194 
    195     def test_wsgi_input(self):
    196         def bad_app(e,s):
    197             e["wsgi.input"].read()
    198             s("200 OK", [("Content-Type", "text/plain; charset=utf-8")])
    199             return [b"data"]
    200         out, err = run_amock(validator(bad_app))
    201         self.assertTrue(out.endswith(
    202             b"A server error occurred.  Please contact the administrator."
    203         ))
    204         self.assertEqual(
    205             err.splitlines()[-2], "AssertionError"
    206         )
    207 
    208     def test_bytes_validation(self):
    209         def app(e, s):
    210             s("200 OK", [
    211                 ("Content-Type", "text/plain; charset=utf-8"),
    212                 ("Date", "Wed, 24 Dec 2008 13:29:32 GMT"),
    213                 ])
    214             return [b"data"]
    215         out, err = run_amock(validator(app))
    216         self.assertTrue(err.endswith('"GET / HTTP/1.0" 200 4\n'))
    217         ver = sys.version.split()[0].encode('ascii')
    218         py  = python_implementation().encode('ascii')
    219         pyver = py + b"/" + ver
    220         self.assertEqual(
    221                 b"HTTP/1.0 200 OK\r\n"
    222                 b"Server: WSGIServer/0.2 "+ pyver + b"\r\n"
    223                 b"Content-Type: text/plain; charset=utf-8\r\n"
    224                 b"Date: Wed, 24 Dec 2008 13:29:32 GMT\r\n"
    225                 b"\r\n"
    226                 b"data",
    227                 out)
    228 
    229     def test_cp1252_url(self):
    230         def app(e, s):
    231             s("200 OK", [
    232                 ("Content-Type", "text/plain"),
    233                 ("Date", "Wed, 24 Dec 2008 13:29:32 GMT"),
    234                 ])
    235             # PEP3333 says environ variables are decoded as latin1.
    236             # Encode as latin1 to get original bytes
    237             return [e["PATH_INFO"].encode("latin1")]
    238 
    239         out, err = run_amock(
    240             validator(app), data=b"GET /\x80%80 HTTP/1.0")
    241         self.assertEqual(
    242             [
    243                 b"HTTP/1.0 200 OK",
    244                 mock.ANY,
    245                 b"Content-Type: text/plain",
    246                 b"Date: Wed, 24 Dec 2008 13:29:32 GMT",
    247                 b"",
    248                 b"/\x80\x80",
    249             ],
    250             out.splitlines())
    251 
    252     def test_interrupted_write(self):
    253         # BaseHandler._write() and _flush() have to write all data, even if
    254         # it takes multiple send() calls.  Test this by interrupting a send()
    255         # call with a Unix signal.
    256         threading = support.import_module("threading")
    257         pthread_kill = support.get_attribute(signal, "pthread_kill")
    258 
    259         def app(environ, start_response):
    260             start_response("200 OK", [])
    261             return [b'\0' * support.SOCK_MAX_SIZE]
    262 
    263         class WsgiHandler(NoLogRequestHandler, WSGIRequestHandler):
    264             pass
    265 
    266         server = make_server(support.HOST, 0, app, handler_class=WsgiHandler)
    267         self.addCleanup(server.server_close)
    268         interrupted = threading.Event()
    269 
    270         def signal_handler(signum, frame):
    271             interrupted.set()
    272 
    273         original = signal.signal(signal.SIGUSR1, signal_handler)
    274         self.addCleanup(signal.signal, signal.SIGUSR1, original)
    275         received = None
    276         main_thread = threading.get_ident()
    277 
    278         def run_client():
    279             http = HTTPConnection(*server.server_address)
    280             http.request("GET", "/")
    281             with http.getresponse() as response:
    282                 response.read(100)
    283                 # The main thread should now be blocking in a send() system
    284                 # call.  But in theory, it could get interrupted by other
    285                 # signals, and then retried.  So keep sending the signal in a
    286                 # loop, in case an earlier signal happens to be delivered at
    287                 # an inconvenient moment.
    288                 while True:
    289                     pthread_kill(main_thread, signal.SIGUSR1)
    290                     if interrupted.wait(timeout=float(1)):
    291                         break
    292                 nonlocal received
    293                 received = len(response.read())
    294             http.close()
    295 
    296         background = threading.Thread(target=run_client)
    297         background.start()
    298         server.handle_request()
    299         background.join()
    300         self.assertEqual(received, support.SOCK_MAX_SIZE - 100)
    301 
    302 
    303 class UtilityTests(TestCase):
    304 
    305     def checkShift(self,sn_in,pi_in,part,sn_out,pi_out):
    306         env = {'SCRIPT_NAME':sn_in,'PATH_INFO':pi_in}
    307         util.setup_testing_defaults(env)
    308         self.assertEqual(util.shift_path_info(env),part)
    309         self.assertEqual(env['PATH_INFO'],pi_out)
    310         self.assertEqual(env['SCRIPT_NAME'],sn_out)
    311         return env
    312 
    313     def checkDefault(self, key, value, alt=None):
    314         # Check defaulting when empty
    315         env = {}
    316         util.setup_testing_defaults(env)
    317         if isinstance(value, StringIO):
    318             self.assertIsInstance(env[key], StringIO)
    319         elif isinstance(value,BytesIO):
    320             self.assertIsInstance(env[key],BytesIO)
    321         else:
    322             self.assertEqual(env[key], value)
    323 
    324         # Check existing value
    325         env = {key:alt}
    326         util.setup_testing_defaults(env)
    327         self.assertIs(env[key], alt)
    328 
    329     def checkCrossDefault(self,key,value,**kw):
    330         util.setup_testing_defaults(kw)
    331         self.assertEqual(kw[key],value)
    332 
    333     def checkAppURI(self,uri,**kw):
    334         util.setup_testing_defaults(kw)
    335         self.assertEqual(util.application_uri(kw),uri)
    336 
    337     def checkReqURI(self,uri,query=1,**kw):
    338         util.setup_testing_defaults(kw)
    339         self.assertEqual(util.request_uri(kw,query),uri)
    340 
    341     def checkFW(self,text,size,match):
    342 
    343         def make_it(text=text,size=size):
    344             return util.FileWrapper(StringIO(text),size)
    345 
    346         compare_generic_iter(make_it,match)
    347 
    348         it = make_it()
    349         self.assertFalse(it.filelike.closed)
    350 
    351         for item in it:
    352             pass
    353 
    354         self.assertFalse(it.filelike.closed)
    355 
    356         it.close()
    357         self.assertTrue(it.filelike.closed)
    358 
    359     def testSimpleShifts(self):
    360         self.checkShift('','/', '', '/', '')
    361         self.checkShift('','/x', 'x', '/x', '')
    362         self.checkShift('/','', None, '/', '')
    363         self.checkShift('/a','/x/y', 'x', '/a/x', '/y')
    364         self.checkShift('/a','/x/',  'x', '/a/x', '/')
    365 
    366     def testNormalizedShifts(self):
    367         self.checkShift('/a/b', '/../y', '..', '/a', '/y')
    368         self.checkShift('', '/../y', '..', '', '/y')
    369         self.checkShift('/a/b', '//y', 'y', '/a/b/y', '')
    370         self.checkShift('/a/b', '//y/', 'y', '/a/b/y', '/')
    371         self.checkShift('/a/b', '/./y', 'y', '/a/b/y', '')
    372         self.checkShift('/a/b', '/./y/', 'y', '/a/b/y', '/')
    373         self.checkShift('/a/b', '///./..//y/.//', '..', '/a', '/y/')
    374         self.checkShift('/a/b', '///', '', '/a/b/', '')
    375         self.checkShift('/a/b', '/.//', '', '/a/b/', '')
    376         self.checkShift('/a/b', '/x//', 'x', '/a/b/x', '/')
    377         self.checkShift('/a/b', '/.', None, '/a/b', '')
    378 
    379     def testDefaults(self):
    380         for key, value in [
    381             ('SERVER_NAME','127.0.0.1'),
    382             ('SERVER_PORT', '80'),
    383             ('SERVER_PROTOCOL','HTTP/1.0'),
    384             ('HTTP_HOST','127.0.0.1'),
    385             ('REQUEST_METHOD','GET'),
    386             ('SCRIPT_NAME',''),
    387             ('PATH_INFO','/'),
    388             ('wsgi.version', (1,0)),
    389             ('wsgi.run_once', 0),
    390             ('wsgi.multithread', 0),
    391             ('wsgi.multiprocess', 0),
    392             ('wsgi.input', BytesIO()),
    393             ('wsgi.errors', StringIO()),
    394             ('wsgi.url_scheme','http'),
    395         ]:
    396             self.checkDefault(key,value)
    397 
    398     def testCrossDefaults(self):
    399         self.checkCrossDefault('HTTP_HOST',"foo.bar",SERVER_NAME="foo.bar")
    400         self.checkCrossDefault('wsgi.url_scheme',"https",HTTPS="on")
    401         self.checkCrossDefault('wsgi.url_scheme',"https",HTTPS="1")
    402         self.checkCrossDefault('wsgi.url_scheme',"https",HTTPS="yes")
    403         self.checkCrossDefault('wsgi.url_scheme',"http",HTTPS="foo")
    404         self.checkCrossDefault('SERVER_PORT',"80",HTTPS="foo")
    405         self.checkCrossDefault('SERVER_PORT',"443",HTTPS="on")
    406 
    407     def testGuessScheme(self):
    408         self.assertEqual(util.guess_scheme({}), "http")
    409         self.assertEqual(util.guess_scheme({'HTTPS':"foo"}), "http")
    410         self.assertEqual(util.guess_scheme({'HTTPS':"on"}), "https")
    411         self.assertEqual(util.guess_scheme({'HTTPS':"yes"}), "https")
    412         self.assertEqual(util.guess_scheme({'HTTPS':"1"}), "https")
    413 
    414     def testAppURIs(self):
    415         self.checkAppURI("http://127.0.0.1/")
    416         self.checkAppURI("http://127.0.0.1/spam", SCRIPT_NAME="/spam")
    417         self.checkAppURI("http://127.0.0.1/sp%E4m", SCRIPT_NAME="/sp\xe4m")
    418         self.checkAppURI("http://spam.example.com:2071/",
    419             HTTP_HOST="spam.example.com:2071", SERVER_PORT="2071")
    420         self.checkAppURI("http://spam.example.com/",
    421             SERVER_NAME="spam.example.com")
    422         self.checkAppURI("http://127.0.0.1/",
    423             HTTP_HOST="127.0.0.1", SERVER_NAME="spam.example.com")
    424         self.checkAppURI("https://127.0.0.1/", HTTPS="on")
    425         self.checkAppURI("http://127.0.0.1:8000/", SERVER_PORT="8000",
    426             HTTP_HOST=None)
    427 
    428     def testReqURIs(self):
    429         self.checkReqURI("http://127.0.0.1/")
    430         self.checkReqURI("http://127.0.0.1/spam", SCRIPT_NAME="/spam")
    431         self.checkReqURI("http://127.0.0.1/sp%E4m", SCRIPT_NAME="/sp\xe4m")
    432         self.checkReqURI("http://127.0.0.1/spammity/spam",
    433             SCRIPT_NAME="/spammity", PATH_INFO="/spam")
    434         self.checkReqURI("http://127.0.0.1/spammity/sp%E4m",
    435             SCRIPT_NAME="/spammity", PATH_INFO="/sp\xe4m")
    436         self.checkReqURI("http://127.0.0.1/spammity/spam;ham",
    437             SCRIPT_NAME="/spammity", PATH_INFO="/spam;ham")
    438         self.checkReqURI("http://127.0.0.1/spammity/spam;cookie=1234,5678",
    439             SCRIPT_NAME="/spammity", PATH_INFO="/spam;cookie=1234,5678")
    440         self.checkReqURI("http://127.0.0.1/spammity/spam?say=ni",
    441             SCRIPT_NAME="/spammity", PATH_INFO="/spam",QUERY_STRING="say=ni")
    442         self.checkReqURI("http://127.0.0.1/spammity/spam?s%E4y=ni",
    443             SCRIPT_NAME="/spammity", PATH_INFO="/spam",QUERY_STRING="s%E4y=ni")
    444         self.checkReqURI("http://127.0.0.1/spammity/spam", 0,
    445             SCRIPT_NAME="/spammity", PATH_INFO="/spam",QUERY_STRING="say=ni")
    446 
    447     def testFileWrapper(self):
    448         self.checkFW("xyz"*50, 120, ["xyz"*40,"xyz"*10])
    449 
    450     def testHopByHop(self):
    451         for hop in (
    452             "Connection Keep-Alive Proxy-Authenticate Proxy-Authorization "
    453             "TE Trailers Transfer-Encoding Upgrade"
    454         ).split():
    455             for alt in hop, hop.title(), hop.upper(), hop.lower():
    456                 self.assertTrue(util.is_hop_by_hop(alt))
    457 
    458         # Not comprehensive, just a few random header names
    459         for hop in (
    460             "Accept Cache-Control Date Pragma Trailer Via Warning"
    461         ).split():
    462             for alt in hop, hop.title(), hop.upper(), hop.lower():
    463                 self.assertFalse(util.is_hop_by_hop(alt))
    464 
    465 class HeaderTests(TestCase):
    466 
    467     def testMappingInterface(self):
    468         test = [('x','y')]
    469         self.assertEqual(len(Headers()), 0)
    470         self.assertEqual(len(Headers([])),0)
    471         self.assertEqual(len(Headers(test[:])),1)
    472         self.assertEqual(Headers(test[:]).keys(), ['x'])
    473         self.assertEqual(Headers(test[:]).values(), ['y'])
    474         self.assertEqual(Headers(test[:]).items(), test)
    475         self.assertIsNot(Headers(test).items(), test)  # must be copy!
    476 
    477         h = Headers()
    478         del h['foo']   # should not raise an error
    479 
    480         h['Foo'] = 'bar'
    481         for m in h.__contains__, h.get, h.get_all, h.__getitem__:
    482             self.assertTrue(m('foo'))
    483             self.assertTrue(m('Foo'))
    484             self.assertTrue(m('FOO'))
    485             self.assertFalse(m('bar'))
    486 
    487         self.assertEqual(h['foo'],'bar')
    488         h['foo'] = 'baz'
    489         self.assertEqual(h['FOO'],'baz')
    490         self.assertEqual(h.get_all('foo'),['baz'])
    491 
    492         self.assertEqual(h.get("foo","whee"), "baz")
    493         self.assertEqual(h.get("zoo","whee"), "whee")
    494         self.assertEqual(h.setdefault("foo","whee"), "baz")
    495         self.assertEqual(h.setdefault("zoo","whee"), "whee")
    496         self.assertEqual(h["foo"],"baz")
    497         self.assertEqual(h["zoo"],"whee")
    498 
    499     def testRequireList(self):
    500         self.assertRaises(TypeError, Headers, "foo")
    501 
    502     def testExtras(self):
    503         h = Headers()
    504         self.assertEqual(str(h),'\r\n')
    505 
    506         h.add_header('foo','bar',baz="spam")
    507         self.assertEqual(h['foo'], 'bar; baz="spam"')
    508         self.assertEqual(str(h),'foo: bar; baz="spam"\r\n\r\n')
    509 
    510         h.add_header('Foo','bar',cheese=None)
    511         self.assertEqual(h.get_all('foo'),
    512             ['bar; baz="spam"', 'bar; cheese'])
    513 
    514         self.assertEqual(str(h),
    515             'foo: bar; baz="spam"\r\n'
    516             'Foo: bar; cheese\r\n'
    517             '\r\n'
    518         )
    519 
    520 class ErrorHandler(BaseCGIHandler):
    521     """Simple handler subclass for testing BaseHandler"""
    522 
    523     # BaseHandler records the OS environment at import time, but envvars
    524     # might have been changed later by other tests, which trips up
    525     # HandlerTests.testEnviron().
    526     os_environ = dict(os.environ.items())
    527 
    528     def __init__(self,**kw):
    529         setup_testing_defaults(kw)
    530         BaseCGIHandler.__init__(
    531             self, BytesIO(), BytesIO(), StringIO(), kw,
    532             multithread=True, multiprocess=True
    533         )
    534 
    535 class TestHandler(ErrorHandler):
    536     """Simple handler subclass for testing BaseHandler, w/error passthru"""
    537 
    538     def handle_error(self):
    539         raise   # for testing, we want to see what's happening
    540 
    541 
    542 class HandlerTests(TestCase):
    543 
    544     def checkEnvironAttrs(self, handler):
    545         env = handler.environ
    546         for attr in [
    547             'version','multithread','multiprocess','run_once','file_wrapper'
    548         ]:
    549             if attr=='file_wrapper' and handler.wsgi_file_wrapper is None:
    550                 continue
    551             self.assertEqual(getattr(handler,'wsgi_'+attr),env['wsgi.'+attr])
    552 
    553     def checkOSEnviron(self,handler):
    554         empty = {}; setup_testing_defaults(empty)
    555         env = handler.environ
    556         from os import environ
    557         for k,v in environ.items():
    558             if k not in empty:
    559                 self.assertEqual(env[k],v)
    560         for k,v in empty.items():
    561             self.assertIn(k, env)
    562 
    563     def testEnviron(self):
    564         h = TestHandler(X="Y")
    565         h.setup_environ()
    566         self.checkEnvironAttrs(h)
    567         self.checkOSEnviron(h)
    568         self.assertEqual(h.environ["X"],"Y")
    569 
    570     def testCGIEnviron(self):
    571         h = BaseCGIHandler(None,None,None,{})
    572         h.setup_environ()
    573         for key in 'wsgi.url_scheme', 'wsgi.input', 'wsgi.errors':
    574             self.assertIn(key, h.environ)
    575 
    576     def testScheme(self):
    577         h=TestHandler(HTTPS="on"); h.setup_environ()
    578         self.assertEqual(h.environ['wsgi.url_scheme'],'https')
    579         h=TestHandler(); h.setup_environ()
    580         self.assertEqual(h.environ['wsgi.url_scheme'],'http')
    581 
    582     def testAbstractMethods(self):
    583         h = BaseHandler()
    584         for name in [
    585             '_flush','get_stdin','get_stderr','add_cgi_vars'
    586         ]:
    587             self.assertRaises(NotImplementedError, getattr(h,name))
    588         self.assertRaises(NotImplementedError, h._write, "test")
    589 
    590     def testContentLength(self):
    591         # Demo one reason iteration is better than write()...  ;)
    592 
    593         def trivial_app1(e,s):
    594             s('200 OK',[])
    595             return [e['wsgi.url_scheme'].encode('iso-8859-1')]
    596 
    597         def trivial_app2(e,s):
    598             s('200 OK',[])(e['wsgi.url_scheme'].encode('iso-8859-1'))
    599             return []
    600 
    601         def trivial_app3(e,s):
    602             s('200 OK',[])
    603             return ['\u0442\u0435\u0441\u0442'.encode("utf-8")]
    604 
    605         def trivial_app4(e,s):
    606             # Simulate a response to a HEAD request
    607             s('200 OK',[('Content-Length', '12345')])
    608             return []
    609 
    610         h = TestHandler()
    611         h.run(trivial_app1)
    612         self.assertEqual(h.stdout.getvalue(),
    613             ("Status: 200 OK\r\n"
    614             "Content-Length: 4\r\n"
    615             "\r\n"
    616             "http").encode("iso-8859-1"))
    617 
    618         h = TestHandler()
    619         h.run(trivial_app2)
    620         self.assertEqual(h.stdout.getvalue(),
    621             ("Status: 200 OK\r\n"
    622             "\r\n"
    623             "http").encode("iso-8859-1"))
    624 
    625         h = TestHandler()
    626         h.run(trivial_app3)
    627         self.assertEqual(h.stdout.getvalue(),
    628             b'Status: 200 OK\r\n'
    629             b'Content-Length: 8\r\n'
    630             b'\r\n'
    631             b'\xd1\x82\xd0\xb5\xd1\x81\xd1\x82')
    632 
    633         h = TestHandler()
    634         h.run(trivial_app4)
    635         self.assertEqual(h.stdout.getvalue(),
    636             b'Status: 200 OK\r\n'
    637             b'Content-Length: 12345\r\n'
    638             b'\r\n')
    639 
    640     def testBasicErrorOutput(self):
    641 
    642         def non_error_app(e,s):
    643             s('200 OK',[])
    644             return []
    645 
    646         def error_app(e,s):
    647             raise AssertionError("This should be caught by handler")
    648 
    649         h = ErrorHandler()
    650         h.run(non_error_app)
    651         self.assertEqual(h.stdout.getvalue(),
    652             ("Status: 200 OK\r\n"
    653             "Content-Length: 0\r\n"
    654             "\r\n").encode("iso-8859-1"))
    655         self.assertEqual(h.stderr.getvalue(),"")
    656 
    657         h = ErrorHandler()
    658         h.run(error_app)
    659         self.assertEqual(h.stdout.getvalue(),
    660             ("Status: %s\r\n"
    661             "Content-Type: text/plain\r\n"
    662             "Content-Length: %d\r\n"
    663             "\r\n" % (h.error_status,len(h.error_body))).encode('iso-8859-1')
    664             + h.error_body)
    665 
    666         self.assertIn("AssertionError", h.stderr.getvalue())
    667 
    668     def testErrorAfterOutput(self):
    669         MSG = b"Some output has been sent"
    670         def error_app(e,s):
    671             s("200 OK",[])(MSG)
    672             raise AssertionError("This should be caught by handler")
    673 
    674         h = ErrorHandler()
    675         h.run(error_app)
    676         self.assertEqual(h.stdout.getvalue(),
    677             ("Status: 200 OK\r\n"
    678             "\r\n".encode("iso-8859-1")+MSG))
    679         self.assertIn("AssertionError", h.stderr.getvalue())
    680 
    681     def testHeaderFormats(self):
    682 
    683         def non_error_app(e,s):
    684             s('200 OK',[])
    685             return []
    686 
    687         stdpat = (
    688             r"HTTP/%s 200 OK\r\n"
    689             r"Date: \w{3}, [ 0123]\d \w{3} \d{4} \d\d:\d\d:\d\d GMT\r\n"
    690             r"%s" r"Content-Length: 0\r\n" r"\r\n"
    691         )
    692         shortpat = (
    693             "Status: 200 OK\r\n" "Content-Length: 0\r\n" "\r\n"
    694         ).encode("iso-8859-1")
    695 
    696         for ssw in "FooBar/1.0", None:
    697             sw = ssw and "Server: %s\r\n" % ssw or ""
    698 
    699             for version in "1.0", "1.1":
    700                 for proto in "HTTP/0.9", "HTTP/1.0", "HTTP/1.1":
    701 
    702                     h = TestHandler(SERVER_PROTOCOL=proto)
    703                     h.origin_server = False
    704                     h.http_version = version
    705                     h.server_software = ssw
    706                     h.run(non_error_app)
    707                     self.assertEqual(shortpat,h.stdout.getvalue())
    708 
    709                     h = TestHandler(SERVER_PROTOCOL=proto)
    710                     h.origin_server = True
    711                     h.http_version = version
    712                     h.server_software = ssw
    713                     h.run(non_error_app)
    714                     if proto=="HTTP/0.9":
    715                         self.assertEqual(h.stdout.getvalue(),b"")
    716                     else:
    717                         self.assertTrue(
    718                             re.match((stdpat%(version,sw)).encode("iso-8859-1"),
    719                                 h.stdout.getvalue()),
    720                             ((stdpat%(version,sw)).encode("iso-8859-1"),
    721                                 h.stdout.getvalue())
    722                         )
    723 
    724     def testBytesData(self):
    725         def app(e, s):
    726             s("200 OK", [
    727                 ("Content-Type", "text/plain; charset=utf-8"),
    728                 ])
    729             return [b"data"]
    730 
    731         h = TestHandler()
    732         h.run(app)
    733         self.assertEqual(b"Status: 200 OK\r\n"
    734             b"Content-Type: text/plain; charset=utf-8\r\n"
    735             b"Content-Length: 4\r\n"
    736             b"\r\n"
    737             b"data",
    738             h.stdout.getvalue())
    739 
    740     def testCloseOnError(self):
    741         side_effects = {'close_called': False}
    742         MSG = b"Some output has been sent"
    743         def error_app(e,s):
    744             s("200 OK",[])(MSG)
    745             class CrashyIterable(object):
    746                 def __iter__(self):
    747                     while True:
    748                         yield b'blah'
    749                         raise AssertionError("This should be caught by handler")
    750                 def close(self):
    751                     side_effects['close_called'] = True
    752             return CrashyIterable()
    753 
    754         h = ErrorHandler()
    755         h.run(error_app)
    756         self.assertEqual(side_effects['close_called'], True)
    757 
    758     def testPartialWrite(self):
    759         written = bytearray()
    760 
    761         class PartialWriter:
    762             def write(self, b):
    763                 partial = b[:7]
    764                 written.extend(partial)
    765                 return len(partial)
    766 
    767             def flush(self):
    768                 pass
    769 
    770         environ = {"SERVER_PROTOCOL": "HTTP/1.0"}
    771         h = SimpleHandler(BytesIO(), PartialWriter(), sys.stderr, environ)
    772         msg = "should not do partial writes"
    773         with self.assertWarnsRegex(DeprecationWarning, msg):
    774             h.run(hello_app)
    775         self.assertEqual(b"HTTP/1.0 200 OK\r\n"
    776             b"Content-Type: text/plain\r\n"
    777             b"Date: Mon, 05 Jun 2006 18:49:54 GMT\r\n"
    778             b"Content-Length: 13\r\n"
    779             b"\r\n"
    780             b"Hello, world!",
    781             written)
    782 
    783 
    784 if __name__ == "__main__":
    785     unittest.main()
    786