Home | History | Annotate | Download | only in test
      1 """Unittests for the various HTTPServer modules.
      2 
Written by Cody A.W. Somerville <cody-somerville@ubuntu.com>,
      4 Josip Dzolonga, and Michael Otteneder for the 2007/08 GHOP contest.
      5 """
      6 
      7 import os
      8 import sys
      9 import re
     10 import base64
     11 import shutil
     12 import urllib
     13 import httplib
     14 import tempfile
     15 import unittest
     16 import CGIHTTPServer
     17 
     18 
     19 from BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer
     20 from SimpleHTTPServer import SimpleHTTPRequestHandler
     21 from CGIHTTPServer import CGIHTTPRequestHandler
     22 from StringIO import StringIO
     23 from test import test_support
     24 
     25 
# threading is obtained through test_support.import_module() instead of a
# plain ``import`` statement.
# NOTE(review): presumably so the whole module is skipped on builds without
# thread support -- confirm against test_support.import_module.
threading = test_support.import_module('threading')
     27 
     28 
     29 class NoLogRequestHandler:
     30     def log_message(self, *args):
     31         # don't write log messages to stderr
     32         pass
     33 
     34 class SocketlessRequestHandler(SimpleHTTPRequestHandler):
     35     def __init__(self):
     36         self.get_called = False
     37         self.protocol_version = "HTTP/1.1"
     38 
     39     def do_GET(self):
     40         self.get_called = True
     41         self.send_response(200)
     42         self.send_header('Content-Type', 'text/html')
     43         self.end_headers()
     44         self.wfile.write(b'<html><body>Data</body></html>\r\n')
     45 
     46     def log_message(self, fmt, *args):
     47         pass
     48 
     49 
     50 class TestServerThread(threading.Thread):
     51     def __init__(self, test_object, request_handler):
     52         threading.Thread.__init__(self)
     53         self.request_handler = request_handler
     54         self.test_object = test_object
     55 
     56     def run(self):
     57         self.server = HTTPServer(('', 0), self.request_handler)
     58         self.test_object.PORT = self.server.socket.getsockname()[1]
     59         self.test_object.server_started.set()
     60         self.test_object = None
     61         try:
     62             self.server.serve_forever(0.05)
     63         finally:
     64             self.server.server_close()
     65 
     66     def stop(self):
     67         self.server.shutdown()
     68 
     69 
     70 class BaseTestCase(unittest.TestCase):
     71     def setUp(self):
     72         self._threads = test_support.threading_setup()
     73         os.environ = test_support.EnvironmentVarGuard()
     74         self.server_started = threading.Event()
     75         self.thread = TestServerThread(self, self.request_handler)
     76         self.thread.start()
     77         self.server_started.wait()
     78 
     79     def tearDown(self):
     80         self.thread.stop()
     81         os.environ.__exit__()
     82         test_support.threading_cleanup(*self._threads)
     83 
     84     def request(self, uri, method='GET', body=None, headers={}):
     85         self.connection = httplib.HTTPConnection('localhost', self.PORT)
     86         self.connection.request(method, uri, body, headers)
     87         return self.connection.getresponse()
     88 
     89 class BaseHTTPRequestHandlerTestCase(unittest.TestCase):
     90     """Test the functionality of the BaseHTTPServer focussing on
     91     BaseHTTPRequestHandler.
     92     """
     93 
     94     HTTPResponseMatch = re.compile('HTTP/1.[0-9]+ 200 OK')
     95 
     96     def setUp (self):
     97         self.handler = SocketlessRequestHandler()
     98 
     99     def send_typical_request(self, message):
    100         input_msg = StringIO(message)
    101         output = StringIO()
    102         self.handler.rfile = input_msg
    103         self.handler.wfile = output
    104         self.handler.handle_one_request()
    105         output.seek(0)
    106         return output.readlines()
    107 
    108     def verify_get_called(self):
    109         self.assertTrue(self.handler.get_called)
    110 
    111     def verify_expected_headers(self, headers):
    112         for fieldName in 'Server: ', 'Date: ', 'Content-Type: ':
    113             self.assertEqual(sum(h.startswith(fieldName) for h in headers), 1)
    114 
    115     def verify_http_server_response(self, response):
    116         match = self.HTTPResponseMatch.search(response)
    117         self.assertTrue(match is not None)
    118 
    119     def test_http_1_1(self):
    120         result = self.send_typical_request('GET / HTTP/1.1\r\n\r\n')
    121         self.verify_http_server_response(result[0])
    122         self.verify_expected_headers(result[1:-1])
    123         self.verify_get_called()
    124         self.assertEqual(result[-1], '<html><body>Data</body></html>\r\n')
    125 
    126     def test_http_1_0(self):
    127         result = self.send_typical_request('GET / HTTP/1.0\r\n\r\n')
    128         self.verify_http_server_response(result[0])
    129         self.verify_expected_headers(result[1:-1])
    130         self.verify_get_called()
    131         self.assertEqual(result[-1], '<html><body>Data</body></html>\r\n')
    132 
    133     def test_http_0_9(self):
    134         result = self.send_typical_request('GET / HTTP/0.9\r\n\r\n')
    135         self.assertEqual(len(result), 1)
    136         self.assertEqual(result[0], '<html><body>Data</body></html>\r\n')
    137         self.verify_get_called()
    138 
    139     def test_with_continue_1_0(self):
    140         result = self.send_typical_request('GET / HTTP/1.0\r\nExpect: 100-continue\r\n\r\n')
    141         self.verify_http_server_response(result[0])
    142         self.verify_expected_headers(result[1:-1])
    143         self.verify_get_called()
    144         self.assertEqual(result[-1], '<html><body>Data</body></html>\r\n')
    145 
    146     def test_request_length(self):
    147         # Issue #10714: huge request lines are discarded, to avoid Denial
    148         # of Service attacks.
    149         result = self.send_typical_request(b'GET ' + b'x' * 65537)
    150         self.assertEqual(result[0], b'HTTP/1.1 414 Request-URI Too Long\r\n')
    151         self.assertFalse(self.handler.get_called)
    152 
    153 
    154 class BaseHTTPServerTestCase(BaseTestCase):
    155     class request_handler(NoLogRequestHandler, BaseHTTPRequestHandler):
    156         protocol_version = 'HTTP/1.1'
    157         default_request_version = 'HTTP/1.1'
    158 
    159         def do_TEST(self):
    160             self.send_response(204)
    161             self.send_header('Content-Type', 'text/html')
    162             self.send_header('Connection', 'close')
    163             self.end_headers()
    164 
    165         def do_KEEP(self):
    166             self.send_response(204)
    167             self.send_header('Content-Type', 'text/html')
    168             self.send_header('Connection', 'keep-alive')
    169             self.end_headers()
    170 
    171         def do_KEYERROR(self):
    172             self.send_error(999)
    173 
    174         def do_CUSTOM(self):
    175             self.send_response(999)
    176             self.send_header('Content-Type', 'text/html')
    177             self.send_header('Connection', 'close')
    178             self.end_headers()
    179 
    180     def setUp(self):
    181         BaseTestCase.setUp(self)
    182         self.con = httplib.HTTPConnection('localhost', self.PORT)
    183         self.con.connect()
    184 
    185     def test_command(self):
    186         self.con.request('GET', '/')
    187         res = self.con.getresponse()
    188         self.assertEqual(res.status, 501)
    189 
    190     def test_request_line_trimming(self):
    191         self.con._http_vsn_str = 'HTTP/1.1\n'
    192         self.con.putrequest('GET', '/')
    193         self.con.endheaders()
    194         res = self.con.getresponse()
    195         self.assertEqual(res.status, 501)
    196 
    197     def test_version_bogus(self):
    198         self.con._http_vsn_str = 'FUBAR'
    199         self.con.putrequest('GET', '/')
    200         self.con.endheaders()
    201         res = self.con.getresponse()
    202         self.assertEqual(res.status, 400)
    203 
    204     def test_version_digits(self):
    205         self.con._http_vsn_str = 'HTTP/9.9.9'
    206         self.con.putrequest('GET', '/')
    207         self.con.endheaders()
    208         res = self.con.getresponse()
    209         self.assertEqual(res.status, 400)
    210 
    211     def test_version_none_get(self):
    212         self.con._http_vsn_str = ''
    213         self.con.putrequest('GET', '/')
    214         self.con.endheaders()
    215         res = self.con.getresponse()
    216         self.assertEqual(res.status, 501)
    217 
    218     def test_version_none(self):
    219         self.con._http_vsn_str = ''
    220         self.con.putrequest('PUT', '/')
    221         self.con.endheaders()
    222         res = self.con.getresponse()
    223         self.assertEqual(res.status, 400)
    224 
    225     def test_version_invalid(self):
    226         self.con._http_vsn = 99
    227         self.con._http_vsn_str = 'HTTP/9.9'
    228         self.con.putrequest('GET', '/')
    229         self.con.endheaders()
    230         res = self.con.getresponse()
    231         self.assertEqual(res.status, 505)
    232 
    233     def test_send_blank(self):
    234         self.con._http_vsn_str = ''
    235         self.con.putrequest('', '')
    236         self.con.endheaders()
    237         res = self.con.getresponse()
    238         self.assertEqual(res.status, 400)
    239 
    240     def test_header_close(self):
    241         self.con.putrequest('GET', '/')
    242         self.con.putheader('Connection', 'close')
    243         self.con.endheaders()
    244         res = self.con.getresponse()
    245         self.assertEqual(res.status, 501)
    246 
    247     def test_head_keep_alive(self):
    248         self.con._http_vsn_str = 'HTTP/1.1'
    249         self.con.putrequest('GET', '/')
    250         self.con.putheader('Connection', 'keep-alive')
    251         self.con.endheaders()
    252         res = self.con.getresponse()
    253         self.assertEqual(res.status, 501)
    254 
    255     def test_handler(self):
    256         self.con.request('TEST', '/')
    257         res = self.con.getresponse()
    258         self.assertEqual(res.status, 204)
    259 
    260     def test_return_header_keep_alive(self):
    261         self.con.request('KEEP', '/')
    262         res = self.con.getresponse()
    263         self.assertEqual(res.getheader('Connection'), 'keep-alive')
    264         self.con.request('TEST', '/')
    265         self.addCleanup(self.con.close)
    266 
    267     def test_internal_key_error(self):
    268         self.con.request('KEYERROR', '/')
    269         res = self.con.getresponse()
    270         self.assertEqual(res.status, 999)
    271 
    272     def test_return_custom_status(self):
    273         self.con.request('CUSTOM', '/')
    274         res = self.con.getresponse()
    275         self.assertEqual(res.status, 999)
    276 
    277 
    278 class SimpleHTTPServerTestCase(BaseTestCase):
    279     class request_handler(NoLogRequestHandler, SimpleHTTPRequestHandler):
    280         pass
    281 
    282     def setUp(self):
    283         BaseTestCase.setUp(self)
    284         self.cwd = os.getcwd()
    285         basetempdir = tempfile.gettempdir()
    286         os.chdir(basetempdir)
    287         self.data = 'We are the knights who say Ni!'
    288         self.tempdir = tempfile.mkdtemp(dir=basetempdir)
    289         self.tempdir_name = os.path.basename(self.tempdir)
    290         temp = open(os.path.join(self.tempdir, 'test'), 'wb')
    291         temp.write(self.data)
    292         temp.close()
    293 
    294     def tearDown(self):
    295         try:
    296             os.chdir(self.cwd)
    297             try:
    298                 shutil.rmtree(self.tempdir)
    299             except OSError:
    300                 pass
    301         finally:
    302             BaseTestCase.tearDown(self)
    303 
    304     def check_status_and_reason(self, response, status, data=None):
    305         body = response.read()
    306         self.assertTrue(response)
    307         self.assertEqual(response.status, status)
    308         self.assertIsNotNone(response.reason)
    309         if data:
    310             self.assertEqual(data, body)
    311 
    312     def test_get(self):
    313         #constructs the path relative to the root directory of the HTTPServer
    314         response = self.request(self.tempdir_name + '/test')
    315         self.check_status_and_reason(response, 200, data=self.data)
    316         response = self.request(self.tempdir_name + '/')
    317         self.check_status_and_reason(response, 200)
    318         response = self.request(self.tempdir_name)
    319         self.check_status_and_reason(response, 301)
    320         response = self.request('/ThisDoesNotExist')
    321         self.check_status_and_reason(response, 404)
    322         response = self.request('/' + 'ThisDoesNotExist' + '/')
    323         self.check_status_and_reason(response, 404)
    324         f = open(os.path.join(self.tempdir_name, 'index.html'), 'w')
    325         response = self.request('/' + self.tempdir_name + '/')
    326         self.check_status_and_reason(response, 200)
    327 
    328         # chmod() doesn't work as expected on Windows, and filesystem
    329         # permissions are ignored by root on Unix.
    330         if os.name == 'posix' and os.geteuid() != 0:
    331             os.chmod(self.tempdir, 0)
    332             response = self.request(self.tempdir_name + '/')
    333             self.check_status_and_reason(response, 404)
    334             os.chmod(self.tempdir, 0755)
    335 
    336     def test_head(self):
    337         response = self.request(
    338             self.tempdir_name + '/test', method='HEAD')
    339         self.check_status_and_reason(response, 200)
    340         self.assertEqual(response.getheader('content-length'),
    341                          str(len(self.data)))
    342         self.assertEqual(response.getheader('content-type'),
    343                          'application/octet-stream')
    344 
    345     def test_invalid_requests(self):
    346         response = self.request('/', method='FOO')
    347         self.check_status_and_reason(response, 501)
    348         # requests must be case sensitive,so this should fail too
    349         response = self.request('/', method='get')
    350         self.check_status_and_reason(response, 501)
    351         response = self.request('/', method='GETs')
    352         self.check_status_and_reason(response, 501)
    353 
    354 
# Template for a minimal CGI script.  The single %s placeholder is replaced
# with the interpreter path for the shebang line when
# CGIHTTPServerTestCase.setUp() writes the script to disk.  The generated
# script prints a Content-type header, a blank line and "Hello World".
cgi_file1 = """\
#!%s

print "Content-type: text/html"
print
print "Hello World"
"""

# Template for a CGI script that echoes three form fields (spam, eggs,
# bacon).  As above, %s receives the interpreter path; every doubled %%
# becomes a single literal % in the generated script, because the template
# itself is filled in with the % operator.
cgi_file2 = """\
#!%s
import cgi

print "Content-type: text/html"
print

form = cgi.FieldStorage()
print "%%s, %%s, %%s" %% (form.getfirst("spam"), form.getfirst("eggs"),
                          form.getfirst("bacon"))
"""
    374 
    375 
    376 @unittest.skipIf(hasattr(os, 'geteuid') and os.geteuid() == 0,
    377         "This test can't be run reliably as root (issue #13308).")
    378 class CGIHTTPServerTestCase(BaseTestCase):
    379     class request_handler(NoLogRequestHandler, CGIHTTPRequestHandler):
    380         pass
    381 
    382     def setUp(self):
    383         BaseTestCase.setUp(self)
    384         self.parent_dir = tempfile.mkdtemp()
    385         self.cgi_dir = os.path.join(self.parent_dir, 'cgi-bin')
    386         os.mkdir(self.cgi_dir)
    387 
    388         # The shebang line should be pure ASCII: use symlink if possible.
    389         # See issue #7668.
    390         if hasattr(os, 'symlink'):
    391             self.pythonexe = os.path.join(self.parent_dir, 'python')
    392             os.symlink(sys.executable, self.pythonexe)
    393         else:
    394             self.pythonexe = sys.executable
    395 
    396         self.file1_path = os.path.join(self.cgi_dir, 'file1.py')
    397         with open(self.file1_path, 'w') as file1:
    398             file1.write(cgi_file1 % self.pythonexe)
    399         os.chmod(self.file1_path, 0777)
    400 
    401         self.file2_path = os.path.join(self.cgi_dir, 'file2.py')
    402         with open(self.file2_path, 'w') as file2:
    403             file2.write(cgi_file2 % self.pythonexe)
    404         os.chmod(self.file2_path, 0777)
    405 
    406         self.cwd = os.getcwd()
    407         os.chdir(self.parent_dir)
    408 
    409     def tearDown(self):
    410         try:
    411             os.chdir(self.cwd)
    412             if self.pythonexe != sys.executable:
    413                 os.remove(self.pythonexe)
    414             os.remove(self.file1_path)
    415             os.remove(self.file2_path)
    416             os.rmdir(self.cgi_dir)
    417             os.rmdir(self.parent_dir)
    418         finally:
    419             BaseTestCase.tearDown(self)
    420 
    421     def test_url_collapse_path(self):
    422         # verify tail is the last portion and head is the rest on proper urls
    423         test_vectors = {
    424             '': '//',
    425             '..': IndexError,
    426             '/.//..': IndexError,
    427             '/': '//',
    428             '//': '//',
    429             '/\\': '//\\',
    430             '/.//': '//',
    431             'cgi-bin/file1.py': '/cgi-bin/file1.py',
    432             '/cgi-bin/file1.py': '/cgi-bin/file1.py',
    433             'a': '//a',
    434             '/a': '//a',
    435             '//a': '//a',
    436             './a': '//a',
    437             './C:/': '/C:/',
    438             '/a/b': '/a/b',
    439             '/a/b/': '/a/b/',
    440             '/a/b/.': '/a/b/',
    441             '/a/b/c/..': '/a/b/',
    442             '/a/b/c/../d': '/a/b/d',
    443             '/a/b/c/../d/e/../f': '/a/b/d/f',
    444             '/a/b/c/../d/e/../../f': '/a/b/f',
    445             '/a/b/c/../d/e/.././././..//f': '/a/b/f',
    446             '../a/b/c/../d/e/.././././..//f': IndexError,
    447             '/a/b/c/../d/e/../../../f': '/a/f',
    448             '/a/b/c/../d/e/../../../../f': '//f',
    449             '/a/b/c/../d/e/../../../../../f': IndexError,
    450             '/a/b/c/../d/e/../../../../f/..': '//',
    451             '/a/b/c/../d/e/../../../../f/../.': '//',
    452         }
    453         for path, expected in test_vectors.iteritems():
    454             if isinstance(expected, type) and issubclass(expected, Exception):
    455                 self.assertRaises(expected,
    456                                   CGIHTTPServer._url_collapse_path, path)
    457             else:
    458                 actual = CGIHTTPServer._url_collapse_path(path)
    459                 self.assertEqual(expected, actual,
    460                                  msg='path = %r\nGot:    %r\nWanted: %r' %
    461                                  (path, actual, expected))
    462 
    463     def test_headers_and_content(self):
    464         res = self.request('/cgi-bin/file1.py')
    465         self.assertEqual(('Hello World\n', 'text/html', 200),
    466             (res.read(), res.getheader('Content-type'), res.status))
    467 
    468     def test_post(self):
    469         params = urllib.urlencode({'spam' : 1, 'eggs' : 'python', 'bacon' : 123456})
    470         headers = {'Content-type' : 'application/x-www-form-urlencoded'}
    471         res = self.request('/cgi-bin/file2.py', 'POST', params, headers)
    472 
    473         self.assertEqual(res.read(), '1, python, 123456\n')
    474 
    475     def test_invaliduri(self):
    476         res = self.request('/cgi-bin/invalid')
    477         res.read()
    478         self.assertEqual(res.status, 404)
    479 
    480     def test_authorization(self):
    481         headers = {'Authorization' : 'Basic %s' %
    482                    base64.b64encode('username:pass')}
    483         res = self.request('/cgi-bin/file1.py', 'GET', headers=headers)
    484         self.assertEqual(('Hello World\n', 'text/html', 200),
    485                 (res.read(), res.getheader('Content-type'), res.status))
    486 
    487     def test_no_leading_slash(self):
    488         # http://bugs.python.org/issue2254
    489         res = self.request('cgi-bin/file1.py')
    490         self.assertEqual(('Hello World\n', 'text/html', 200),
    491              (res.read(), res.getheader('Content-type'), res.status))
    492 
    493     def test_os_environ_is_not_altered(self):
    494         signature = "Test CGI Server"
    495         os.environ['SERVER_SOFTWARE'] = signature
    496         res = self.request('/cgi-bin/file1.py')
    497         self.assertEqual((b'Hello World\n', 'text/html', 200),
    498                 (res.read(), res.getheader('Content-type'), res.status))
    499         self.assertEqual(os.environ['SERVER_SOFTWARE'], signature)
    500 
    501 
    502 class SimpleHTTPRequestHandlerTestCase(unittest.TestCase):
    503     """ Test url parsing """
    504     def setUp(self):
    505         self.translated = os.getcwd()
    506         self.translated = os.path.join(self.translated, 'filename')
    507         self.handler = SocketlessRequestHandler()
    508 
    509     def test_query_arguments(self):
    510         path = self.handler.translate_path('/filename')
    511         self.assertEqual(path, self.translated)
    512         path = self.handler.translate_path('/filename?foo=bar')
    513         self.assertEqual(path, self.translated)
    514         path = self.handler.translate_path('/filename?a=b&spam=eggs#zot')
    515         self.assertEqual(path, self.translated)
    516 
    517     def test_start_with_double_slash(self):
    518         path = self.handler.translate_path('//filename')
    519         self.assertEqual(path, self.translated)
    520         path = self.handler.translate_path('//filename?foo=bar')
    521         self.assertEqual(path, self.translated)
    522 
    523 
    524 def test_main(verbose=None):
    525     try:
    526         cwd = os.getcwd()
    527         test_support.run_unittest(BaseHTTPRequestHandlerTestCase,
    528                                   SimpleHTTPRequestHandlerTestCase,
    529                                   BaseHTTPServerTestCase,
    530                                   SimpleHTTPServerTestCase,
    531                                   CGIHTTPServerTestCase
    532                                  )
    533     finally:
    534         os.chdir(cwd)
    535 
# Run the full suite when this file is executed directly as a script.
if __name__ == '__main__':
    test_main()
    538