"""Unittests for the various HTTPServer modules.

Written by Cody A.W. Somerville <cody-somerville@ubuntu.com>,
Josip Dzolonga, and Michael Otteneder for the 2007/08 GHOP contest.
"""
      6 
      7 from http.server import BaseHTTPRequestHandler, HTTPServer, \
      8      SimpleHTTPRequestHandler, CGIHTTPRequestHandler
      9 from http import server, HTTPStatus
     10 
     11 import os
     12 import sys
     13 import re
     14 import base64
     15 import ntpath
     16 import shutil
     17 import urllib.parse
     18 import html
     19 import http.client
     20 import tempfile
     21 import time
     22 from io import BytesIO
     23 
     24 import unittest
     25 from test import support
     26 threading = support.import_module('threading')
     27 
     28 class NoLogRequestHandler:
     29     def log_message(self, *args):
     30         # don't write log messages to stderr
     31         pass
     32 
     33     def read(self, n=None):
     34         return ''
     35 
     36 
     37 class TestServerThread(threading.Thread):
     38     def __init__(self, test_object, request_handler):
     39         threading.Thread.__init__(self)
     40         self.request_handler = request_handler
     41         self.test_object = test_object
     42 
     43     def run(self):
     44         self.server = HTTPServer(('localhost', 0), self.request_handler)
     45         self.test_object.HOST, self.test_object.PORT = self.server.socket.getsockname()
     46         self.test_object.server_started.set()
     47         self.test_object = None
     48         try:
     49             self.server.serve_forever(0.05)
     50         finally:
     51             self.server.server_close()
     52 
     53     def stop(self):
     54         self.server.shutdown()
     55 
     56 
     57 class BaseTestCase(unittest.TestCase):
     58     def setUp(self):
     59         self._threads = support.threading_setup()
     60         os.environ = support.EnvironmentVarGuard()
     61         self.server_started = threading.Event()
     62         self.thread = TestServerThread(self, self.request_handler)
     63         self.thread.start()
     64         self.server_started.wait()
     65 
     66     def tearDown(self):
     67         self.thread.stop()
     68         self.thread = None
     69         os.environ.__exit__()
     70         support.threading_cleanup(*self._threads)
     71 
     72     def request(self, uri, method='GET', body=None, headers={}):
     73         self.connection = http.client.HTTPConnection(self.HOST, self.PORT)
     74         self.connection.request(method, uri, body, headers)
     75         return self.connection.getresponse()
     76 
     77 
     78 class BaseHTTPServerTestCase(BaseTestCase):
     79     class request_handler(NoLogRequestHandler, BaseHTTPRequestHandler):
     80         protocol_version = 'HTTP/1.1'
     81         default_request_version = 'HTTP/1.1'
     82 
     83         def do_TEST(self):
     84             self.send_response(HTTPStatus.NO_CONTENT)
     85             self.send_header('Content-Type', 'text/html')
     86             self.send_header('Connection', 'close')
     87             self.end_headers()
     88 
     89         def do_KEEP(self):
     90             self.send_response(HTTPStatus.NO_CONTENT)
     91             self.send_header('Content-Type', 'text/html')
     92             self.send_header('Connection', 'keep-alive')
     93             self.end_headers()
     94 
     95         def do_KEYERROR(self):
     96             self.send_error(999)
     97 
     98         def do_NOTFOUND(self):
     99             self.send_error(HTTPStatus.NOT_FOUND)
    100 
    101         def do_EXPLAINERROR(self):
    102             self.send_error(999, "Short Message",
    103                             "This is a long \n explanation")
    104 
    105         def do_CUSTOM(self):
    106             self.send_response(999)
    107             self.send_header('Content-Type', 'text/html')
    108             self.send_header('Connection', 'close')
    109             self.end_headers()
    110 
    111         def do_LATINONEHEADER(self):
    112             self.send_response(999)
    113             self.send_header('X-Special', 'Dngerous Mind')
    114             self.send_header('Connection', 'close')
    115             self.end_headers()
    116             body = self.headers['x-special-incoming'].encode('utf-8')
    117             self.wfile.write(body)
    118 
    119         def do_SEND_ERROR(self):
    120             self.send_error(int(self.path[1:]))
    121 
    122         def do_HEAD(self):
    123             self.send_error(int(self.path[1:]))
    124 
    125     def setUp(self):
    126         BaseTestCase.setUp(self)
    127         self.con = http.client.HTTPConnection(self.HOST, self.PORT)
    128         self.con.connect()
    129 
    130     def test_command(self):
    131         self.con.request('GET', '/')
    132         res = self.con.getresponse()
    133         self.assertEqual(res.status, HTTPStatus.NOT_IMPLEMENTED)
    134 
    135     def test_request_line_trimming(self):
    136         self.con._http_vsn_str = 'HTTP/1.1\n'
    137         self.con.putrequest('XYZBOGUS', '/')
    138         self.con.endheaders()
    139         res = self.con.getresponse()
    140         self.assertEqual(res.status, HTTPStatus.NOT_IMPLEMENTED)
    141 
    142     def test_version_bogus(self):
    143         self.con._http_vsn_str = 'FUBAR'
    144         self.con.putrequest('GET', '/')
    145         self.con.endheaders()
    146         res = self.con.getresponse()
    147         self.assertEqual(res.status, HTTPStatus.BAD_REQUEST)
    148 
    149     def test_version_digits(self):
    150         self.con._http_vsn_str = 'HTTP/9.9.9'
    151         self.con.putrequest('GET', '/')
    152         self.con.endheaders()
    153         res = self.con.getresponse()
    154         self.assertEqual(res.status, HTTPStatus.BAD_REQUEST)
    155 
    156     def test_version_none_get(self):
    157         self.con._http_vsn_str = ''
    158         self.con.putrequest('GET', '/')
    159         self.con.endheaders()
    160         res = self.con.getresponse()
    161         self.assertEqual(res.status, HTTPStatus.NOT_IMPLEMENTED)
    162 
    163     def test_version_none(self):
    164         # Test that a valid method is rejected when not HTTP/1.x
    165         self.con._http_vsn_str = ''
    166         self.con.putrequest('CUSTOM', '/')
    167         self.con.endheaders()
    168         res = self.con.getresponse()
    169         self.assertEqual(res.status, HTTPStatus.BAD_REQUEST)
    170 
    171     def test_version_invalid(self):
    172         self.con._http_vsn = 99
    173         self.con._http_vsn_str = 'HTTP/9.9'
    174         self.con.putrequest('GET', '/')
    175         self.con.endheaders()
    176         res = self.con.getresponse()
    177         self.assertEqual(res.status, HTTPStatus.HTTP_VERSION_NOT_SUPPORTED)
    178 
    179     def test_send_blank(self):
    180         self.con._http_vsn_str = ''
    181         self.con.putrequest('', '')
    182         self.con.endheaders()
    183         res = self.con.getresponse()
    184         self.assertEqual(res.status, HTTPStatus.BAD_REQUEST)
    185 
    186     def test_header_close(self):
    187         self.con.putrequest('GET', '/')
    188         self.con.putheader('Connection', 'close')
    189         self.con.endheaders()
    190         res = self.con.getresponse()
    191         self.assertEqual(res.status, HTTPStatus.NOT_IMPLEMENTED)
    192 
    193     def test_header_keep_alive(self):
    194         self.con._http_vsn_str = 'HTTP/1.1'
    195         self.con.putrequest('GET', '/')
    196         self.con.putheader('Connection', 'keep-alive')
    197         self.con.endheaders()
    198         res = self.con.getresponse()
    199         self.assertEqual(res.status, HTTPStatus.NOT_IMPLEMENTED)
    200 
    201     def test_handler(self):
    202         self.con.request('TEST', '/')
    203         res = self.con.getresponse()
    204         self.assertEqual(res.status, HTTPStatus.NO_CONTENT)
    205 
    206     def test_return_header_keep_alive(self):
    207         self.con.request('KEEP', '/')
    208         res = self.con.getresponse()
    209         self.assertEqual(res.getheader('Connection'), 'keep-alive')
    210         self.con.request('TEST', '/')
    211         self.addCleanup(self.con.close)
    212 
    213     def test_internal_key_error(self):
    214         self.con.request('KEYERROR', '/')
    215         res = self.con.getresponse()
    216         self.assertEqual(res.status, 999)
    217 
    218     def test_return_custom_status(self):
    219         self.con.request('CUSTOM', '/')
    220         res = self.con.getresponse()
    221         self.assertEqual(res.status, 999)
    222 
    223     def test_return_explain_error(self):
    224         self.con.request('EXPLAINERROR', '/')
    225         res = self.con.getresponse()
    226         self.assertEqual(res.status, 999)
    227         self.assertTrue(int(res.getheader('Content-Length')))
    228 
    229     def test_latin1_header(self):
    230         self.con.request('LATINONEHEADER', '/', headers={
    231             'X-Special-Incoming':       'rger mit Unicode'
    232         })
    233         res = self.con.getresponse()
    234         self.assertEqual(res.getheader('X-Special'), 'Dngerous Mind')
    235         self.assertEqual(res.read(), 'rger mit Unicode'.encode('utf-8'))
    236 
    237     def test_error_content_length(self):
    238         # Issue #16088: standard error responses should have a content-length
    239         self.con.request('NOTFOUND', '/')
    240         res = self.con.getresponse()
    241         self.assertEqual(res.status, HTTPStatus.NOT_FOUND)
    242 
    243         data = res.read()
    244         self.assertEqual(int(res.getheader('Content-Length')), len(data))
    245 
    246     def test_send_error(self):
    247         allow_transfer_encoding_codes = (HTTPStatus.NOT_MODIFIED,
    248                                          HTTPStatus.RESET_CONTENT)
    249         for code in (HTTPStatus.NO_CONTENT, HTTPStatus.NOT_MODIFIED,
    250                      HTTPStatus.PROCESSING, HTTPStatus.RESET_CONTENT,
    251                      HTTPStatus.SWITCHING_PROTOCOLS):
    252             self.con.request('SEND_ERROR', '/{}'.format(code))
    253             res = self.con.getresponse()
    254             self.assertEqual(code, res.status)
    255             self.assertEqual(None, res.getheader('Content-Length'))
    256             self.assertEqual(None, res.getheader('Content-Type'))
    257             if code not in allow_transfer_encoding_codes:
    258                 self.assertEqual(None, res.getheader('Transfer-Encoding'))
    259 
    260             data = res.read()
    261             self.assertEqual(b'', data)
    262 
    263     def test_head_via_send_error(self):
    264         allow_transfer_encoding_codes = (HTTPStatus.NOT_MODIFIED,
    265                                          HTTPStatus.RESET_CONTENT)
    266         for code in (HTTPStatus.OK, HTTPStatus.NO_CONTENT,
    267                      HTTPStatus.NOT_MODIFIED, HTTPStatus.RESET_CONTENT,
    268                      HTTPStatus.SWITCHING_PROTOCOLS):
    269             self.con.request('HEAD', '/{}'.format(code))
    270             res = self.con.getresponse()
    271             self.assertEqual(code, res.status)
    272             if code == HTTPStatus.OK:
    273                 self.assertTrue(int(res.getheader('Content-Length')) > 0)
    274                 self.assertIn('text/html', res.getheader('Content-Type'))
    275             else:
    276                 self.assertEqual(None, res.getheader('Content-Length'))
    277                 self.assertEqual(None, res.getheader('Content-Type'))
    278             if code not in allow_transfer_encoding_codes:
    279                 self.assertEqual(None, res.getheader('Transfer-Encoding'))
    280 
    281             data = res.read()
    282             self.assertEqual(b'', data)
    283 
    284 
    285 class RequestHandlerLoggingTestCase(BaseTestCase):
    286     class request_handler(BaseHTTPRequestHandler):
    287         protocol_version = 'HTTP/1.1'
    288         default_request_version = 'HTTP/1.1'
    289 
    290         def do_GET(self):
    291             self.send_response(HTTPStatus.OK)
    292             self.end_headers()
    293 
    294         def do_ERROR(self):
    295             self.send_error(HTTPStatus.NOT_FOUND, 'File not found')
    296 
    297     def test_get(self):
    298         self.con = http.client.HTTPConnection(self.HOST, self.PORT)
    299         self.con.connect()
    300 
    301         with support.captured_stderr() as err:
    302             self.con.request('GET', '/')
    303             self.con.getresponse()
    304 
    305         self.assertTrue(
    306             err.getvalue().endswith('"GET / HTTP/1.1" 200 -\n'))
    307 
    308     def test_err(self):
    309         self.con = http.client.HTTPConnection(self.HOST, self.PORT)
    310         self.con.connect()
    311 
    312         with support.captured_stderr() as err:
    313             self.con.request('ERROR', '/')
    314             self.con.getresponse()
    315 
    316         lines = err.getvalue().split('\n')
    317         self.assertTrue(lines[0].endswith('code 404, message File not found'))
    318         self.assertTrue(lines[1].endswith('"ERROR / HTTP/1.1" 404 -'))
    319 
    320 
    321 class SimpleHTTPServerTestCase(BaseTestCase):
    322     class request_handler(NoLogRequestHandler, SimpleHTTPRequestHandler):
    323         pass
    324 
    325     def setUp(self):
    326         BaseTestCase.setUp(self)
    327         self.cwd = os.getcwd()
    328         basetempdir = tempfile.gettempdir()
    329         os.chdir(basetempdir)
    330         self.data = b'We are the knights who say Ni!'
    331         self.tempdir = tempfile.mkdtemp(dir=basetempdir)
    332         self.tempdir_name = os.path.basename(self.tempdir)
    333         self.base_url = '/' + self.tempdir_name
    334         with open(os.path.join(self.tempdir, 'test'), 'wb') as temp:
    335             temp.write(self.data)
    336 
    337     def tearDown(self):
    338         try:
    339             os.chdir(self.cwd)
    340             try:
    341                 shutil.rmtree(self.tempdir)
    342             except:
    343                 pass
    344         finally:
    345             BaseTestCase.tearDown(self)
    346 
    347     def check_status_and_reason(self, response, status, data=None):
    348         def close_conn():
    349             """Don't close reader yet so we can check if there was leftover
    350             buffered input"""
    351             nonlocal reader
    352             reader = response.fp
    353             response.fp = None
    354         reader = None
    355         response._close_conn = close_conn
    356 
    357         body = response.read()
    358         self.assertTrue(response)
    359         self.assertEqual(response.status, status)
    360         self.assertIsNotNone(response.reason)
    361         if data:
    362             self.assertEqual(data, body)
    363         # Ensure the server has not set up a persistent connection, and has
    364         # not sent any extra data
    365         self.assertEqual(response.version, 10)
    366         self.assertEqual(response.msg.get("Connection", "close"), "close")
    367         self.assertEqual(reader.read(30), b'', 'Connection should be closed')
    368 
    369         reader.close()
    370         return body
    371 
    372     @support.requires_mac_ver(10, 5)
    373     @unittest.skipIf(sys.platform == 'win32',
    374                      'undecodable name cannot be decoded on win32')
    375     @unittest.skipUnless(support.TESTFN_UNDECODABLE,
    376                          'need support.TESTFN_UNDECODABLE')
    377     def test_undecodable_filename(self):
    378         enc = sys.getfilesystemencoding()
    379         filename = os.fsdecode(support.TESTFN_UNDECODABLE) + '.txt'
    380         with open(os.path.join(self.tempdir, filename), 'wb') as f:
    381             f.write(support.TESTFN_UNDECODABLE)
    382         response = self.request(self.base_url + '/')
    383         if sys.platform == 'darwin':
    384             # On Mac OS the HFS+ filesystem replaces bytes that aren't valid
    385             # UTF-8 into a percent-encoded value.
    386             for name in os.listdir(self.tempdir):
    387                 if name != 'test': # Ignore a filename created in setUp().
    388                     filename = name
    389                     break
    390         body = self.check_status_and_reason(response, HTTPStatus.OK)
    391         quotedname = urllib.parse.quote(filename, errors='surrogatepass')
    392         self.assertIn(('href="%s"' % quotedname)
    393                       .encode(enc, 'surrogateescape'), body)
    394         self.assertIn(('>%s<' % html.escape(filename, quote=False))
    395                       .encode(enc, 'surrogateescape'), body)
    396         response = self.request(self.base_url + '/' + quotedname)
    397         self.check_status_and_reason(response, HTTPStatus.OK,
    398                                      data=support.TESTFN_UNDECODABLE)
    399 
    400     def test_get(self):
    401         #constructs the path relative to the root directory of the HTTPServer
    402         response = self.request(self.base_url + '/test')
    403         self.check_status_and_reason(response, HTTPStatus.OK, data=self.data)
    404         # check for trailing "/" which should return 404. See Issue17324
    405         response = self.request(self.base_url + '/test/')
    406         self.check_status_and_reason(response, HTTPStatus.NOT_FOUND)
    407         response = self.request(self.base_url + '/')
    408         self.check_status_and_reason(response, HTTPStatus.OK)
    409         response = self.request(self.base_url)
    410         self.check_status_and_reason(response, HTTPStatus.MOVED_PERMANENTLY)
    411         response = self.request(self.base_url + '/?hi=2')
    412         self.check_status_and_reason(response, HTTPStatus.OK)
    413         response = self.request(self.base_url + '?hi=1')
    414         self.check_status_and_reason(response, HTTPStatus.MOVED_PERMANENTLY)
    415         self.assertEqual(response.getheader("Location"),
    416                          self.base_url + "/?hi=1")
    417         response = self.request('/ThisDoesNotExist')
    418         self.check_status_and_reason(response, HTTPStatus.NOT_FOUND)
    419         response = self.request('/' + 'ThisDoesNotExist' + '/')
    420         self.check_status_and_reason(response, HTTPStatus.NOT_FOUND)
    421 
    422         data = b"Dummy index file\r\n"
    423         with open(os.path.join(self.tempdir_name, 'index.html'), 'wb') as f:
    424             f.write(data)
    425         response = self.request(self.base_url + '/')
    426         self.check_status_and_reason(response, HTTPStatus.OK, data)
    427 
    428         # chmod() doesn't work as expected on Windows, and filesystem
    429         # permissions are ignored by root on Unix.
    430         if os.name == 'posix' and os.geteuid() != 0:
    431             os.chmod(self.tempdir, 0)
    432             try:
    433                 response = self.request(self.base_url + '/')
    434                 self.check_status_and_reason(response, HTTPStatus.NOT_FOUND)
    435             finally:
    436                 os.chmod(self.tempdir, 0o755)
    437 
    438     def test_head(self):
    439         response = self.request(
    440             self.base_url + '/test', method='HEAD')
    441         self.check_status_and_reason(response, HTTPStatus.OK)
    442         self.assertEqual(response.getheader('content-length'),
    443                          str(len(self.data)))
    444         self.assertEqual(response.getheader('content-type'),
    445                          'application/octet-stream')
    446 
    447     def test_invalid_requests(self):
    448         response = self.request('/', method='FOO')
    449         self.check_status_and_reason(response, HTTPStatus.NOT_IMPLEMENTED)
    450         # requests must be case sensitive,so this should fail too
    451         response = self.request('/', method='custom')
    452         self.check_status_and_reason(response, HTTPStatus.NOT_IMPLEMENTED)
    453         response = self.request('/', method='GETs')
    454         self.check_status_and_reason(response, HTTPStatus.NOT_IMPLEMENTED)
    455 
    456     def test_path_without_leading_slash(self):
    457         response = self.request(self.tempdir_name + '/test')
    458         self.check_status_and_reason(response, HTTPStatus.OK, data=self.data)
    459         response = self.request(self.tempdir_name + '/test/')
    460         self.check_status_and_reason(response, HTTPStatus.NOT_FOUND)
    461         response = self.request(self.tempdir_name + '/')
    462         self.check_status_and_reason(response, HTTPStatus.OK)
    463         response = self.request(self.tempdir_name)
    464         self.check_status_and_reason(response, HTTPStatus.MOVED_PERMANENTLY)
    465         response = self.request(self.tempdir_name + '/?hi=2')
    466         self.check_status_and_reason(response, HTTPStatus.OK)
    467         response = self.request(self.tempdir_name + '?hi=1')
    468         self.check_status_and_reason(response, HTTPStatus.MOVED_PERMANENTLY)
    469         self.assertEqual(response.getheader("Location"),
    470                          self.tempdir_name + "/?hi=1")
    471 
    472     def test_html_escape_filename(self):
    473         filename = '<test&>.txt'
    474         fullpath = os.path.join(self.tempdir, filename)
    475 
    476         try:
    477             open(fullpath, 'w').close()
    478         except OSError:
    479             raise unittest.SkipTest('Can not create file %s on current file '
    480                                     'system' % filename)
    481 
    482         try:
    483             response = self.request(self.base_url + '/')
    484             body = self.check_status_and_reason(response, HTTPStatus.OK)
    485             enc = response.headers.get_content_charset()
    486         finally:
    487             os.unlink(fullpath)  # avoid affecting test_undecodable_filename
    488 
    489         self.assertIsNotNone(enc)
    490         html_text = '>%s<' % html.escape(filename, quote=False)
    491         self.assertIn(html_text.encode(enc), body)
    492 
    493 
# Templates for the CGI scripts that CGIHTTPServerTestCase.setUp() writes to
# disk.  In each one the first %s is filled with the interpreter path for the
# "#!" line.

# Prints a text/html header followed by "Hello World".
cgi_file1 = """\
#!%s

print("Content-type: text/html")
print()
print("Hello World")
"""

# Prints the "spam", "eggs" and "bacon" form fields.  The doubled %% survive
# the template's own %-formatting as single % characters in the script.
cgi_file2 = """\
#!%s
import cgi

print("Content-type: text/html")
print()

form = cgi.FieldStorage()
print("%%s, %%s, %%s" %% (form.getfirst("spam"), form.getfirst("eggs"),
                          form.getfirst("bacon")))
"""

# Prints one environment variable; the second %s is the variable's name.
cgi_file4 = """\
#!%s
import os

print("Content-type: text/html")
print()

print(os.environ["%s"])
"""
    523 
    524 
    525 @unittest.skipIf(hasattr(os, 'geteuid') and os.geteuid() == 0,
    526         "This test can't be run reliably as root (issue #13308).")
    527 class CGIHTTPServerTestCase(BaseTestCase):
    528     class request_handler(NoLogRequestHandler, CGIHTTPRequestHandler):
    529         pass
    530 
    531     linesep = os.linesep.encode('ascii')
    532 
    533     def setUp(self):
    534         BaseTestCase.setUp(self)
    535         self.cwd = os.getcwd()
    536         self.parent_dir = tempfile.mkdtemp()
    537         self.cgi_dir = os.path.join(self.parent_dir, 'cgi-bin')
    538         self.cgi_child_dir = os.path.join(self.cgi_dir, 'child-dir')
    539         os.mkdir(self.cgi_dir)
    540         os.mkdir(self.cgi_child_dir)
    541         self.nocgi_path = None
    542         self.file1_path = None
    543         self.file2_path = None
    544         self.file3_path = None
    545         self.file4_path = None
    546 
    547         # The shebang line should be pure ASCII: use symlink if possible.
    548         # See issue #7668.
    549         if support.can_symlink():
    550             self.pythonexe = os.path.join(self.parent_dir, 'python')
    551             os.symlink(sys.executable, self.pythonexe)
    552         else:
    553             self.pythonexe = sys.executable
    554 
    555         try:
    556             # The python executable path is written as the first line of the
    557             # CGI Python script. The encoding cookie cannot be used, and so the
    558             # path should be encodable to the default script encoding (utf-8)
    559             self.pythonexe.encode('utf-8')
    560         except UnicodeEncodeError:
    561             self.tearDown()
    562             self.skipTest("Python executable path is not encodable to utf-8")
    563 
    564         self.nocgi_path = os.path.join(self.parent_dir, 'nocgi.py')
    565         with open(self.nocgi_path, 'w') as fp:
    566             fp.write(cgi_file1 % self.pythonexe)
    567         os.chmod(self.nocgi_path, 0o777)
    568 
    569         self.file1_path = os.path.join(self.cgi_dir, 'file1.py')
    570         with open(self.file1_path, 'w', encoding='utf-8') as file1:
    571             file1.write(cgi_file1 % self.pythonexe)
    572         os.chmod(self.file1_path, 0o777)
    573 
    574         self.file2_path = os.path.join(self.cgi_dir, 'file2.py')
    575         with open(self.file2_path, 'w', encoding='utf-8') as file2:
    576             file2.write(cgi_file2 % self.pythonexe)
    577         os.chmod(self.file2_path, 0o777)
    578 
    579         self.file3_path = os.path.join(self.cgi_child_dir, 'file3.py')
    580         with open(self.file3_path, 'w', encoding='utf-8') as file3:
    581             file3.write(cgi_file1 % self.pythonexe)
    582         os.chmod(self.file3_path, 0o777)
    583 
    584         self.file4_path = os.path.join(self.cgi_dir, 'file4.py')
    585         with open(self.file4_path, 'w', encoding='utf-8') as file4:
    586             file4.write(cgi_file4 % (self.pythonexe, 'QUERY_STRING'))
    587         os.chmod(self.file4_path, 0o777)
    588 
    589         os.chdir(self.parent_dir)
    590 
    591     def tearDown(self):
    592         try:
    593             os.chdir(self.cwd)
    594             if self.pythonexe != sys.executable:
    595                 os.remove(self.pythonexe)
    596             if self.nocgi_path:
    597                 os.remove(self.nocgi_path)
    598             if self.file1_path:
    599                 os.remove(self.file1_path)
    600             if self.file2_path:
    601                 os.remove(self.file2_path)
    602             if self.file3_path:
    603                 os.remove(self.file3_path)
    604             if self.file4_path:
    605                 os.remove(self.file4_path)
    606             os.rmdir(self.cgi_child_dir)
    607             os.rmdir(self.cgi_dir)
    608             os.rmdir(self.parent_dir)
    609         finally:
    610             BaseTestCase.tearDown(self)
    611 
    612     def test_url_collapse_path(self):
    613         # verify tail is the last portion and head is the rest on proper urls
    614         test_vectors = {
    615             '': '//',
    616             '..': IndexError,
    617             '/.//..': IndexError,
    618             '/': '//',
    619             '//': '//',
    620             '/\\': '//\\',
    621             '/.//': '//',
    622             'cgi-bin/file1.py': '/cgi-bin/file1.py',
    623             '/cgi-bin/file1.py': '/cgi-bin/file1.py',
    624             'a': '//a',
    625             '/a': '//a',
    626             '//a': '//a',
    627             './a': '//a',
    628             './C:/': '/C:/',
    629             '/a/b': '/a/b',
    630             '/a/b/': '/a/b/',
    631             '/a/b/.': '/a/b/',
    632             '/a/b/c/..': '/a/b/',
    633             '/a/b/c/../d': '/a/b/d',
    634             '/a/b/c/../d/e/../f': '/a/b/d/f',
    635             '/a/b/c/../d/e/../../f': '/a/b/f',
    636             '/a/b/c/../d/e/.././././..//f': '/a/b/f',
    637             '../a/b/c/../d/e/.././././..//f': IndexError,
    638             '/a/b/c/../d/e/../../../f': '/a/f',
    639             '/a/b/c/../d/e/../../../../f': '//f',
    640             '/a/b/c/../d/e/../../../../../f': IndexError,
    641             '/a/b/c/../d/e/../../../../f/..': '//',
    642             '/a/b/c/../d/e/../../../../f/../.': '//',
    643         }
    644         for path, expected in test_vectors.items():
    645             if isinstance(expected, type) and issubclass(expected, Exception):
    646                 self.assertRaises(expected,
    647                                   server._url_collapse_path, path)
    648             else:
    649                 actual = server._url_collapse_path(path)
    650                 self.assertEqual(expected, actual,
    651                                  msg='path = %r\nGot:    %r\nWanted: %r' %
    652                                  (path, actual, expected))
    653 
    654     def test_headers_and_content(self):
    655         res = self.request('/cgi-bin/file1.py')
    656         self.assertEqual(
    657             (res.read(), res.getheader('Content-type'), res.status),
    658             (b'Hello World' + self.linesep, 'text/html', HTTPStatus.OK))
    659 
    660     def test_issue19435(self):
    661         res = self.request('///////////nocgi.py/../cgi-bin/nothere.sh')
    662         self.assertEqual(res.status, HTTPStatus.NOT_FOUND)
    663 
    664     def test_post(self):
    665         params = urllib.parse.urlencode(
    666             {'spam' : 1, 'eggs' : 'python', 'bacon' : 123456})
    667         headers = {'Content-type' : 'application/x-www-form-urlencoded'}
    668         res = self.request('/cgi-bin/file2.py', 'POST', params, headers)
    669 
    670         self.assertEqual(res.read(), b'1, python, 123456' + self.linesep)
    671 
    672     def test_invaliduri(self):
    673         res = self.request('/cgi-bin/invalid')
    674         res.read()
    675         self.assertEqual(res.status, HTTPStatus.NOT_FOUND)
    676 
    677     def test_authorization(self):
    678         headers = {b'Authorization' : b'Basic ' +
    679                    base64.b64encode(b'username:pass')}
    680         res = self.request('/cgi-bin/file1.py', 'GET', headers=headers)
    681         self.assertEqual(
    682             (b'Hello World' + self.linesep, 'text/html', HTTPStatus.OK),
    683             (res.read(), res.getheader('Content-type'), res.status))
    684 
    685     def test_no_leading_slash(self):
    686         # http://bugs.python.org/issue2254
    687         res = self.request('cgi-bin/file1.py')
    688         self.assertEqual(
    689             (b'Hello World' + self.linesep, 'text/html', HTTPStatus.OK),
    690             (res.read(), res.getheader('Content-type'), res.status))
    691 
    def test_os_environ_is_not_altered(self):
        # Plant a marker in os.environ['SERVER_SOFTWARE'], serve a CGI
        # request, and check that the marker is still there afterwards.
        signature = "Test CGI Server"
        # Remember the previous value (None when unset) so this test does
        # not leak its marker into the environment seen by later tests.
        previous = os.environ.get('SERVER_SOFTWARE')
        os.environ['SERVER_SOFTWARE'] = signature
        try:
            res = self.request('/cgi-bin/file1.py')
            self.assertEqual(
                (b'Hello World' + self.linesep, 'text/html', HTTPStatus.OK),
                (res.read(), res.getheader('Content-type'), res.status))
            self.assertEqual(os.environ['SERVER_SOFTWARE'], signature)
        finally:
            # Restore the environment whether or not the assertions passed.
            if previous is None:
                os.environ.pop('SERVER_SOFTWARE', None)
            else:
                os.environ['SERVER_SOFTWARE'] = previous
    700 
    def test_urlquote_decoding_in_cgi_check(self):
        # The "/" between cgi-bin and the script name is sent as "%2f".
        quoted_path = '/cgi-bin%2ffile1.py'
        response = self.request(quoted_path)
        expected = (b'Hello World' + self.linesep, 'text/html', HTTPStatus.OK)
        observed = (response.read(),
                    response.getheader('Content-type'),
                    response.status)
        self.assertEqual(expected, observed)
    706 
    def test_nested_cgi_path_issue21323(self):
        # The script lives in a sub-directory of cgi-bin.
        nested_script = '/cgi-bin/child-dir/file3.py'
        response = self.request(nested_script)
        expected = (b'Hello World' + self.linesep, 'text/html', HTTPStatus.OK)
        observed = (response.read(),
                    response.getheader('Content-type'),
                    response.status)
        self.assertEqual(expected, observed)
    712 
    def test_query_with_multiple_question_mark(self):
        # Everything after the first "?" is expected back from the script,
        # including the second "?".
        response = self.request('/cgi-bin/file4.py?a=b?c=d')
        expected = (b'a=b?c=d' + self.linesep, 'text/html', HTTPStatus.OK)
        observed = (response.read(),
                    response.getheader('Content-type'),
                    response.status)
        self.assertEqual(expected, observed)
    718 
    def test_query_with_continuous_slashes(self):
        # Runs of "/" inside the query string are expected back unchanged.
        response = self.request(
            '/cgi-bin/file4.py?k=aa%2F%2Fbb&//q//p//=//a//b//')
        expected_body = b'k=aa%2F%2Fbb&//q//p//=//a//b//' + self.linesep
        expected = (expected_body, 'text/html', HTTPStatus.OK)
        observed = (response.read(),
                    response.getheader('Content-type'),
                    response.status)
        self.assertEqual(expected, observed)
    725 
    726 
    727 class SocketlessRequestHandler(SimpleHTTPRequestHandler):
    728     def __init__(self):
    729         self.get_called = False
    730         self.protocol_version = "HTTP/1.1"
    731 
    732     def do_GET(self):
    733         self.get_called = True
    734         self.send_response(HTTPStatus.OK)
    735         self.send_header('Content-Type', 'text/html')
    736         self.end_headers()
    737         self.wfile.write(b'<html><body>Data</body></html>\r\n')
    738 
    739     def log_message(self, format, *args):
    740         pass
    741 
    742 class RejectingSocketlessRequestHandler(SocketlessRequestHandler):
    743     def handle_expect_100(self):
    744         self.send_error(HTTPStatus.EXPECTATION_FAILED)
    745         return False
    746 
    747 
    748 class AuditableBytesIO:
    749 
    750     def __init__(self):
    751         self.datas = []
    752 
    753     def write(self, data):
    754         self.datas.append(data)
    755 
    756     def getData(self):
    757         return b''.join(self.datas)
    758 
    759     @property
    760     def numWrites(self):
    761         return len(self.datas)
    762 
    763 
    764 class BaseHTTPRequestHandlerTestCase(unittest.TestCase):
    765     """Test the functionality of the BaseHTTPServer.
    766 
    767        Test the support for the Expect 100-continue header.
    768        """
    769 
    770     HTTPResponseMatch = re.compile(b'HTTP/1.[0-9]+ 200 OK')
    771 
    772     def setUp (self):
    773         self.handler = SocketlessRequestHandler()
    774 
    775     def send_typical_request(self, message):
    776         input = BytesIO(message)
    777         output = BytesIO()
    778         self.handler.rfile = input
    779         self.handler.wfile = output
    780         self.handler.handle_one_request()
    781         output.seek(0)
    782         return output.readlines()
    783 
    784     def verify_get_called(self):
    785         self.assertTrue(self.handler.get_called)
    786 
    787     def verify_expected_headers(self, headers):
    788         for fieldName in b'Server: ', b'Date: ', b'Content-Type: ':
    789             self.assertEqual(sum(h.startswith(fieldName) for h in headers), 1)
    790 
    791     def verify_http_server_response(self, response):
    792         match = self.HTTPResponseMatch.search(response)
    793         self.assertIsNotNone(match)
    794 
    795     def test_http_1_1(self):
    796         result = self.send_typical_request(b'GET / HTTP/1.1\r\n\r\n')
    797         self.verify_http_server_response(result[0])
    798         self.verify_expected_headers(result[1:-1])
    799         self.verify_get_called()
    800         self.assertEqual(result[-1], b'<html><body>Data</body></html>\r\n')
    801         self.assertEqual(self.handler.requestline, 'GET / HTTP/1.1')
    802         self.assertEqual(self.handler.command, 'GET')
    803         self.assertEqual(self.handler.path, '/')
    804         self.assertEqual(self.handler.request_version, 'HTTP/1.1')
    805         self.assertSequenceEqual(self.handler.headers.items(), ())
    806 
    807     def test_http_1_0(self):
    808         result = self.send_typical_request(b'GET / HTTP/1.0\r\n\r\n')
    809         self.verify_http_server_response(result[0])
    810         self.verify_expected_headers(result[1:-1])
    811         self.verify_get_called()
    812         self.assertEqual(result[-1], b'<html><body>Data</body></html>\r\n')
    813         self.assertEqual(self.handler.requestline, 'GET / HTTP/1.0')
    814         self.assertEqual(self.handler.command, 'GET')
    815         self.assertEqual(self.handler.path, '/')
    816         self.assertEqual(self.handler.request_version, 'HTTP/1.0')
    817         self.assertSequenceEqual(self.handler.headers.items(), ())
    818 
    819     def test_http_0_9(self):
    820         result = self.send_typical_request(b'GET / HTTP/0.9\r\n\r\n')
    821         self.assertEqual(len(result), 1)
    822         self.assertEqual(result[0], b'<html><body>Data</body></html>\r\n')
    823         self.verify_get_called()
    824 
    825     def test_with_continue_1_0(self):
    826         result = self.send_typical_request(b'GET / HTTP/1.0\r\nExpect: 100-continue\r\n\r\n')
    827         self.verify_http_server_response(result[0])
    828         self.verify_expected_headers(result[1:-1])
    829         self.verify_get_called()
    830         self.assertEqual(result[-1], b'<html><body>Data</body></html>\r\n')
    831         self.assertEqual(self.handler.requestline, 'GET / HTTP/1.0')
    832         self.assertEqual(self.handler.command, 'GET')
    833         self.assertEqual(self.handler.path, '/')
    834         self.assertEqual(self.handler.request_version, 'HTTP/1.0')
    835         headers = (("Expect", "100-continue"),)
    836         self.assertSequenceEqual(self.handler.headers.items(), headers)
    837 
    838     def test_with_continue_1_1(self):
    839         result = self.send_typical_request(b'GET / HTTP/1.1\r\nExpect: 100-continue\r\n\r\n')
    840         self.assertEqual(result[0], b'HTTP/1.1 100 Continue\r\n')
    841         self.assertEqual(result[1], b'\r\n')
    842         self.assertEqual(result[2], b'HTTP/1.1 200 OK\r\n')
    843         self.verify_expected_headers(result[2:-1])
    844         self.verify_get_called()
    845         self.assertEqual(result[-1], b'<html><body>Data</body></html>\r\n')
    846         self.assertEqual(self.handler.requestline, 'GET / HTTP/1.1')
    847         self.assertEqual(self.handler.command, 'GET')
    848         self.assertEqual(self.handler.path, '/')
    849         self.assertEqual(self.handler.request_version, 'HTTP/1.1')
    850         headers = (("Expect", "100-continue"),)
    851         self.assertSequenceEqual(self.handler.headers.items(), headers)
    852 
    853     def test_header_buffering_of_send_error(self):
    854 
    855         input = BytesIO(b'GET / HTTP/1.1\r\n\r\n')
    856         output = AuditableBytesIO()
    857         handler = SocketlessRequestHandler()
    858         handler.rfile = input
    859         handler.wfile = output
    860         handler.request_version = 'HTTP/1.1'
    861         handler.requestline = ''
    862         handler.command = None
    863 
    864         handler.send_error(418)
    865         self.assertEqual(output.numWrites, 2)
    866 
    867     def test_header_buffering_of_send_response_only(self):
    868 
    869         input = BytesIO(b'GET / HTTP/1.1\r\n\r\n')
    870         output = AuditableBytesIO()
    871         handler = SocketlessRequestHandler()
    872         handler.rfile = input
    873         handler.wfile = output
    874         handler.request_version = 'HTTP/1.1'
    875 
    876         handler.send_response_only(418)
    877         self.assertEqual(output.numWrites, 0)
    878         handler.end_headers()
    879         self.assertEqual(output.numWrites, 1)
    880 
    881     def test_header_buffering_of_send_header(self):
    882 
    883         input = BytesIO(b'GET / HTTP/1.1\r\n\r\n')
    884         output = AuditableBytesIO()
    885         handler = SocketlessRequestHandler()
    886         handler.rfile = input
    887         handler.wfile = output
    888         handler.request_version = 'HTTP/1.1'
    889 
    890         handler.send_header('Foo', 'foo')
    891         handler.send_header('bar', 'bar')
    892         self.assertEqual(output.numWrites, 0)
    893         handler.end_headers()
    894         self.assertEqual(output.getData(), b'Foo: foo\r\nbar: bar\r\n\r\n')
    895         self.assertEqual(output.numWrites, 1)
    896 
    897     def test_header_unbuffered_when_continue(self):
    898 
    899         def _readAndReseek(f):
    900             pos = f.tell()
    901             f.seek(0)
    902             data = f.read()
    903             f.seek(pos)
    904             return data
    905 
    906         input = BytesIO(b'GET / HTTP/1.1\r\nExpect: 100-continue\r\n\r\n')
    907         output = BytesIO()
    908         self.handler.rfile = input
    909         self.handler.wfile = output
    910         self.handler.request_version = 'HTTP/1.1'
    911 
    912         self.handler.handle_one_request()
    913         self.assertNotEqual(_readAndReseek(output), b'')
    914         result = _readAndReseek(output).split(b'\r\n')
    915         self.assertEqual(result[0], b'HTTP/1.1 100 Continue')
    916         self.assertEqual(result[1], b'')
    917         self.assertEqual(result[2], b'HTTP/1.1 200 OK')
    918 
    919     def test_with_continue_rejected(self):
    920         usual_handler = self.handler        # Save to avoid breaking any subsequent tests.
    921         self.handler = RejectingSocketlessRequestHandler()
    922         result = self.send_typical_request(b'GET / HTTP/1.1\r\nExpect: 100-continue\r\n\r\n')
    923         self.assertEqual(result[0], b'HTTP/1.1 417 Expectation Failed\r\n')
    924         self.verify_expected_headers(result[1:-1])
    925         # The expect handler should short circuit the usual get method by
    926         # returning false here, so get_called should be false
    927         self.assertFalse(self.handler.get_called)
    928         self.assertEqual(sum(r == b'Connection: close\r\n' for r in result[1:-1]), 1)
    929         self.handler = usual_handler        # Restore to avoid breaking any subsequent tests.
    930 
    931     def test_request_length(self):
    932         # Issue #10714: huge request lines are discarded, to avoid Denial
    933         # of Service attacks.
    934         result = self.send_typical_request(b'GET ' + b'x' * 65537)
    935         self.assertEqual(result[0], b'HTTP/1.1 414 Request-URI Too Long\r\n')
    936         self.assertFalse(self.handler.get_called)
    937         self.assertIsInstance(self.handler.requestline, str)
    938 
    939     def test_header_length(self):
    940         # Issue #6791: same for headers
    941         result = self.send_typical_request(
    942             b'GET / HTTP/1.1\r\nX-Foo: bar' + b'r' * 65537 + b'\r\n\r\n')
    943         self.assertEqual(result[0], b'HTTP/1.1 431 Line too long\r\n')
    944         self.assertFalse(self.handler.get_called)
    945         self.assertEqual(self.handler.requestline, 'GET / HTTP/1.1')
    946 
    947     def test_too_many_headers(self):
    948         result = self.send_typical_request(
    949             b'GET / HTTP/1.1\r\n' + b'X-Foo: bar\r\n' * 101 + b'\r\n')
    950         self.assertEqual(result[0], b'HTTP/1.1 431 Too many headers\r\n')
    951         self.assertFalse(self.handler.get_called)
    952         self.assertEqual(self.handler.requestline, 'GET / HTTP/1.1')
    953 
    954     def test_html_escape_on_error(self):
    955         result = self.send_typical_request(
    956             b'<script>alert("hello")</script> / HTTP/1.1')
    957         result = b''.join(result)
    958         text = '<script>alert("hello")</script>'
    959         self.assertIn(html.escape(text, quote=False).encode('ascii'), result)
    960 
    961     def test_close_connection(self):
    962         # handle_one_request() should be repeatedly called until
    963         # it sets close_connection
    964         def handle_one_request():
    965             self.handler.close_connection = next(close_values)
    966         self.handler.handle_one_request = handle_one_request
    967 
    968         close_values = iter((True,))
    969         self.handler.handle()
    970         self.assertRaises(StopIteration, next, close_values)
    971 
    972         close_values = iter((False, False, True))
    973         self.handler.handle()
    974         self.assertRaises(StopIteration, next, close_values)
    975 
    976     def test_date_time_string(self):
    977         now = time.time()
    978         # this is the old code that formats the timestamp
    979         year, month, day, hh, mm, ss, wd, y, z = time.gmtime(now)
    980         expected = "%s, %02d %3s %4d %02d:%02d:%02d GMT" % (
    981             self.handler.weekdayname[wd],
    982             day,
    983             self.handler.monthname[month],
    984             year, hh, mm, ss
    985         )
    986         self.assertEqual(self.handler.date_time_string(timestamp=now), expected)
    987 
    988 
    989 class SimpleHTTPRequestHandlerTestCase(unittest.TestCase):
    990     """ Test url parsing """
    991     def setUp(self):
    992         self.translated = os.getcwd()
    993         self.translated = os.path.join(self.translated, 'filename')
    994         self.handler = SocketlessRequestHandler()
    995 
    996     def test_query_arguments(self):
    997         path = self.handler.translate_path('/filename')
    998         self.assertEqual(path, self.translated)
    999         path = self.handler.translate_path('/filename?foo=bar')
   1000         self.assertEqual(path, self.translated)
   1001         path = self.handler.translate_path('/filename?a=b&spam=eggs#zot')
   1002         self.assertEqual(path, self.translated)
   1003 
   1004     def test_start_with_double_slash(self):
   1005         path = self.handler.translate_path('//filename')
   1006         self.assertEqual(path, self.translated)
   1007         path = self.handler.translate_path('//filename?foo=bar')
   1008         self.assertEqual(path, self.translated)
   1009 
   1010     def test_windows_colon(self):
   1011         with support.swap_attr(server.os, 'path', ntpath):
   1012             path = self.handler.translate_path('c:c:c:foo/filename')
   1013             path = path.replace(ntpath.sep, os.sep)
   1014             self.assertEqual(path, self.translated)
   1015 
   1016             path = self.handler.translate_path('\\c:../filename')
   1017             path = path.replace(ntpath.sep, os.sep)
   1018             self.assertEqual(path, self.translated)
   1019 
   1020             path = self.handler.translate_path('c:\\c:..\\foo/filename')
   1021             path = path.replace(ntpath.sep, os.sep)
   1022             self.assertEqual(path, self.translated)
   1023 
   1024             path = self.handler.translate_path('c:c:foo\\c:c:bar/filename')
   1025             path = path.replace(ntpath.sep, os.sep)
   1026             self.assertEqual(path, self.translated)
   1027 
   1028 
   1029 class MiscTestCase(unittest.TestCase):
   1030     def test_all(self):
   1031         expected = []
   1032         blacklist = {'executable', 'nobody_uid', 'test'}
   1033         for name in dir(server):
   1034             if name.startswith('_') or name in blacklist:
   1035                 continue
   1036             module_object = getattr(server, name)
   1037             if getattr(module_object, '__module__', None) == 'http.server':
   1038                 expected.append(name)
   1039         self.assertCountEqual(server.__all__, expected)
   1040 
   1041 
   1042 def test_main(verbose=None):
   1043     cwd = os.getcwd()
   1044     try:
   1045         support.run_unittest(
   1046             RequestHandlerLoggingTestCase,
   1047             BaseHTTPRequestHandlerTestCase,
   1048             BaseHTTPServerTestCase,
   1049             SimpleHTTPServerTestCase,
   1050             CGIHTTPServerTestCase,
   1051             SimpleHTTPRequestHandlerTestCase,
   1052             MiscTestCase,
   1053         )
   1054     finally:
   1055         os.chdir(cwd)
   1056 
# Run the whole suite when this file is executed directly as a script.
if __name__ == '__main__':
    test_main()
   1059