1 """Unittests for the various HTTPServer modules. 2 3 Written by Cody A.W. Somerville <cody-somerville (at] ubuntu.com>, 4 Josip Dzolonga, and Michael Otteneder for the 2007/08 GHOP contest. 5 """ 6 7 from BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer 8 from SimpleHTTPServer import SimpleHTTPRequestHandler 9 from CGIHTTPServer import CGIHTTPRequestHandler 10 import CGIHTTPServer 11 12 import os 13 import sys 14 import re 15 import base64 16 import shutil 17 import urllib 18 import httplib 19 import tempfile 20 21 import unittest 22 23 from StringIO import StringIO 24 25 from test import test_support 26 threading = test_support.import_module('threading') 27 28 29 class NoLogRequestHandler: 30 def log_message(self, *args): 31 # don't write log messages to stderr 32 pass 33 34 class SocketlessRequestHandler(SimpleHTTPRequestHandler): 35 def __init__(self): 36 self.get_called = False 37 self.protocol_version = "HTTP/1.1" 38 39 def do_GET(self): 40 self.get_called = True 41 self.send_response(200) 42 self.send_header('Content-Type', 'text/html') 43 self.end_headers() 44 self.wfile.write(b'<html><body>Data</body></html>\r\n') 45 46 def log_message(self, format, *args): 47 pass 48 49 50 class TestServerThread(threading.Thread): 51 def __init__(self, test_object, request_handler): 52 threading.Thread.__init__(self) 53 self.request_handler = request_handler 54 self.test_object = test_object 55 56 def run(self): 57 self.server = HTTPServer(('', 0), self.request_handler) 58 self.test_object.PORT = self.server.socket.getsockname()[1] 59 self.test_object.server_started.set() 60 self.test_object = None 61 try: 62 self.server.serve_forever(0.05) 63 finally: 64 self.server.server_close() 65 66 def stop(self): 67 self.server.shutdown() 68 69 70 class BaseTestCase(unittest.TestCase): 71 def setUp(self): 72 self._threads = test_support.threading_setup() 73 os.environ = test_support.EnvironmentVarGuard() 74 self.server_started = threading.Event() 75 self.thread = TestServerThread(self, self.request_handler) 76 self.thread.start() 77 self.server_started.wait() 78 79 def tearDown(self): 80 self.thread.stop() 81 os.environ.__exit__() 82 test_support.threading_cleanup(*self._threads) 83 84 def request(self, uri, method='GET', body=None, headers={}): 85 self.connection = httplib.HTTPConnection('localhost', self.PORT) 86 self.connection.request(method, uri, body, headers) 87 return self.connection.getresponse() 88 89 class BaseHTTPRequestHandlerTestCase(unittest.TestCase): 90 """Test the functionality of the BaseHTTPServer focussing on 91 BaseHTTPRequestHandler. 92 """ 93 94 HTTPResponseMatch = re.compile('HTTP/1.[0-9]+ 200 OK') 95 96 def setUp (self): 97 self.handler = SocketlessRequestHandler() 98 99 def send_typical_request(self, message): 100 input = StringIO(message) 101 output = StringIO() 102 self.handler.rfile = input 103 self.handler.wfile = output 104 self.handler.handle_one_request() 105 output.seek(0) 106 return output.readlines() 107 108 def verify_get_called(self): 109 self.assertTrue(self.handler.get_called) 110 111 def verify_expected_headers(self, headers): 112 for fieldName in 'Server: ', 'Date: ', 'Content-Type: ': 113 self.assertEqual(sum(h.startswith(fieldName) for h in headers), 1) 114 115 def verify_http_server_response(self, response): 116 match = self.HTTPResponseMatch.search(response) 117 self.assertTrue(match is not None) 118 119 def test_http_1_1(self): 120 result = self.send_typical_request('GET / HTTP/1.1\r\n\r\n') 121 self.verify_http_server_response(result[0]) 122 self.verify_expected_headers(result[1:-1]) 123 self.verify_get_called() 124 self.assertEqual(result[-1], '<html><body>Data</body></html>\r\n') 125 126 def test_http_1_0(self): 127 result = self.send_typical_request('GET / HTTP/1.0\r\n\r\n') 128 self.verify_http_server_response(result[0]) 129 self.verify_expected_headers(result[1:-1]) 130 self.verify_get_called() 131 self.assertEqual(result[-1], '<html><body>Data</body></html>\r\n') 132 133 def test_http_0_9(self): 134 result = self.send_typical_request('GET / HTTP/0.9\r\n\r\n') 135 self.assertEqual(len(result), 1) 136 self.assertEqual(result[0], '<html><body>Data</body></html>\r\n') 137 self.verify_get_called() 138 139 def test_with_continue_1_0(self): 140 result = self.send_typical_request('GET / HTTP/1.0\r\nExpect: 100-continue\r\n\r\n') 141 self.verify_http_server_response(result[0]) 142 self.verify_expected_headers(result[1:-1]) 143 self.verify_get_called() 144 self.assertEqual(result[-1], '<html><body>Data</body></html>\r\n') 145 146 def test_request_length(self): 147 # Issue #10714: huge request lines are discarded, to avoid Denial 148 # of Service attacks. 149 result = self.send_typical_request(b'GET ' + b'x' * 65537) 150 self.assertEqual(result[0], b'HTTP/1.1 414 Request-URI Too Long\r\n') 151 self.assertFalse(self.handler.get_called) 152 153 154 class BaseHTTPServerTestCase(BaseTestCase): 155 class request_handler(NoLogRequestHandler, BaseHTTPRequestHandler): 156 protocol_version = 'HTTP/1.1' 157 default_request_version = 'HTTP/1.1' 158 159 def do_TEST(self): 160 self.send_response(204) 161 self.send_header('Content-Type', 'text/html') 162 self.send_header('Connection', 'close') 163 self.end_headers() 164 165 def do_KEEP(self): 166 self.send_response(204) 167 self.send_header('Content-Type', 'text/html') 168 self.send_header('Connection', 'keep-alive') 169 self.end_headers() 170 171 def do_KEYERROR(self): 172 self.send_error(999) 173 174 def do_CUSTOM(self): 175 self.send_response(999) 176 self.send_header('Content-Type', 'text/html') 177 self.send_header('Connection', 'close') 178 self.end_headers() 179 180 def setUp(self): 181 BaseTestCase.setUp(self) 182 self.con = httplib.HTTPConnection('localhost', self.PORT) 183 self.con.connect() 184 185 def test_command(self): 186 self.con.request('GET', '/') 187 res = self.con.getresponse() 188 self.assertEqual(res.status, 501) 189 190 def test_request_line_trimming(self): 191 self.con._http_vsn_str = 'HTTP/1.1\n' 192 self.con.putrequest('GET', '/') 193 self.con.endheaders() 194 res = self.con.getresponse() 195 self.assertEqual(res.status, 501) 196 197 def test_version_bogus(self): 198 self.con._http_vsn_str = 'FUBAR' 199 self.con.putrequest('GET', '/') 200 self.con.endheaders() 201 res = self.con.getresponse() 202 self.assertEqual(res.status, 400) 203 204 def test_version_digits(self): 205 self.con._http_vsn_str = 'HTTP/9.9.9' 206 self.con.putrequest('GET', '/') 207 self.con.endheaders() 208 res = self.con.getresponse() 209 self.assertEqual(res.status, 400) 210 211 def test_version_none_get(self): 212 self.con._http_vsn_str = '' 213 self.con.putrequest('GET', '/') 214 self.con.endheaders() 215 res = self.con.getresponse() 216 self.assertEqual(res.status, 501) 217 218 def test_version_none(self): 219 self.con._http_vsn_str = '' 220 self.con.putrequest('PUT', '/') 221 self.con.endheaders() 222 res = self.con.getresponse() 223 self.assertEqual(res.status, 400) 224 225 def test_version_invalid(self): 226 self.con._http_vsn = 99 227 self.con._http_vsn_str = 'HTTP/9.9' 228 self.con.putrequest('GET', '/') 229 self.con.endheaders() 230 res = self.con.getresponse() 231 self.assertEqual(res.status, 505) 232 233 def test_send_blank(self): 234 self.con._http_vsn_str = '' 235 self.con.putrequest('', '') 236 self.con.endheaders() 237 res = self.con.getresponse() 238 self.assertEqual(res.status, 400) 239 240 def test_header_close(self): 241 self.con.putrequest('GET', '/') 242 self.con.putheader('Connection', 'close') 243 self.con.endheaders() 244 res = self.con.getresponse() 245 self.assertEqual(res.status, 501) 246 247 def test_head_keep_alive(self): 248 self.con._http_vsn_str = 'HTTP/1.1' 249 self.con.putrequest('GET', '/') 250 self.con.putheader('Connection', 'keep-alive') 251 self.con.endheaders() 252 res = self.con.getresponse() 253 self.assertEqual(res.status, 501) 254 255 def test_handler(self): 256 self.con.request('TEST', '/') 257 res = self.con.getresponse() 258 self.assertEqual(res.status, 204) 259 260 def test_return_header_keep_alive(self): 261 self.con.request('KEEP', '/') 262 res = self.con.getresponse() 263 self.assertEqual(res.getheader('Connection'), 'keep-alive') 264 self.con.request('TEST', '/') 265 self.addCleanup(self.con.close) 266 267 def test_internal_key_error(self): 268 self.con.request('KEYERROR', '/') 269 res = self.con.getresponse() 270 self.assertEqual(res.status, 999) 271 272 def test_return_custom_status(self): 273 self.con.request('CUSTOM', '/') 274 res = self.con.getresponse() 275 self.assertEqual(res.status, 999) 276 277 278 class SimpleHTTPServerTestCase(BaseTestCase): 279 class request_handler(NoLogRequestHandler, SimpleHTTPRequestHandler): 280 pass 281 282 def setUp(self): 283 BaseTestCase.setUp(self) 284 self.cwd = os.getcwd() 285 basetempdir = tempfile.gettempdir() 286 os.chdir(basetempdir) 287 self.data = 'We are the knights who say Ni!' 288 self.tempdir = tempfile.mkdtemp(dir=basetempdir) 289 self.tempdir_name = os.path.basename(self.tempdir) 290 temp = open(os.path.join(self.tempdir, 'test'), 'wb') 291 temp.write(self.data) 292 temp.close() 293 294 def tearDown(self): 295 try: 296 os.chdir(self.cwd) 297 try: 298 shutil.rmtree(self.tempdir) 299 except: 300 pass 301 finally: 302 BaseTestCase.tearDown(self) 303 304 def check_status_and_reason(self, response, status, data=None): 305 body = response.read() 306 self.assertTrue(response) 307 self.assertEqual(response.status, status) 308 self.assertIsNotNone(response.reason) 309 if data: 310 self.assertEqual(data, body) 311 312 def test_get(self): 313 #constructs the path relative to the root directory of the HTTPServer 314 response = self.request(self.tempdir_name + '/test') 315 self.check_status_and_reason(response, 200, data=self.data) 316 response = self.request(self.tempdir_name + '/') 317 self.check_status_and_reason(response, 200) 318 response = self.request(self.tempdir_name) 319 self.check_status_and_reason(response, 301) 320 response = self.request('/ThisDoesNotExist') 321 self.check_status_and_reason(response, 404) 322 response = self.request('/' + 'ThisDoesNotExist' + '/') 323 self.check_status_and_reason(response, 404) 324 f = open(os.path.join(self.tempdir_name, 'index.html'), 'w') 325 response = self.request('/' + self.tempdir_name + '/') 326 self.check_status_and_reason(response, 200) 327 if os.name == 'posix': 328 # chmod won't work as expected on Windows platforms 329 os.chmod(self.tempdir, 0) 330 response = self.request(self.tempdir_name + '/') 331 self.check_status_and_reason(response, 404) 332 os.chmod(self.tempdir, 0755) 333 334 def test_head(self): 335 response = self.request( 336 self.tempdir_name + '/test', method='HEAD') 337 self.check_status_and_reason(response, 200) 338 self.assertEqual(response.getheader('content-length'), 339 str(len(self.data))) 340 self.assertEqual(response.getheader('content-type'), 341 'application/octet-stream') 342 343 def test_invalid_requests(self): 344 response = self.request('/', method='FOO') 345 self.check_status_and_reason(response, 501) 346 # requests must be case sensitive,so this should fail too 347 response = self.request('/', method='get') 348 self.check_status_and_reason(response, 501) 349 response = self.request('/', method='GETs') 350 self.check_status_and_reason(response, 501) 351 352 353 cgi_file1 = """\ 354 #!%s 355 356 print "Content-type: text/html" 357 print 358 print "Hello World" 359 """ 360 361 cgi_file2 = """\ 362 #!%s 363 import cgi 364 365 print "Content-type: text/html" 366 print 367 368 form = cgi.FieldStorage() 369 print "%%s, %%s, %%s" %% (form.getfirst("spam"), form.getfirst("eggs"), 370 form.getfirst("bacon")) 371 """ 372 373 class CGIHTTPServerTestCase(BaseTestCase): 374 class request_handler(NoLogRequestHandler, CGIHTTPRequestHandler): 375 pass 376 377 def setUp(self): 378 BaseTestCase.setUp(self) 379 self.parent_dir = tempfile.mkdtemp() 380 self.cgi_dir = os.path.join(self.parent_dir, 'cgi-bin') 381 os.mkdir(self.cgi_dir) 382 383 # The shebang line should be pure ASCII: use symlink if possible. 384 # See issue #7668. 385 if hasattr(os, 'symlink'): 386 self.pythonexe = os.path.join(self.parent_dir, 'python') 387 os.symlink(sys.executable, self.pythonexe) 388 else: 389 self.pythonexe = sys.executable 390 391 self.file1_path = os.path.join(self.cgi_dir, 'file1.py') 392 with open(self.file1_path, 'w') as file1: 393 file1.write(cgi_file1 % self.pythonexe) 394 os.chmod(self.file1_path, 0777) 395 396 self.file2_path = os.path.join(self.cgi_dir, 'file2.py') 397 with open(self.file2_path, 'w') as file2: 398 file2.write(cgi_file2 % self.pythonexe) 399 os.chmod(self.file2_path, 0777) 400 401 self.cwd = os.getcwd() 402 os.chdir(self.parent_dir) 403 404 def tearDown(self): 405 try: 406 os.chdir(self.cwd) 407 if self.pythonexe != sys.executable: 408 os.remove(self.pythonexe) 409 os.remove(self.file1_path) 410 os.remove(self.file2_path) 411 os.rmdir(self.cgi_dir) 412 os.rmdir(self.parent_dir) 413 finally: 414 BaseTestCase.tearDown(self) 415 416 def test_url_collapse_path_split(self): 417 test_vectors = { 418 '': ('/', ''), 419 '..': IndexError, 420 '/.//..': IndexError, 421 '/': ('/', ''), 422 '//': ('/', ''), 423 '/\\': ('/', '\\'), 424 '/.//': ('/', ''), 425 'cgi-bin/file1.py': ('/cgi-bin', 'file1.py'), 426 '/cgi-bin/file1.py': ('/cgi-bin', 'file1.py'), 427 'a': ('/', 'a'), 428 '/a': ('/', 'a'), 429 '//a': ('/', 'a'), 430 './a': ('/', 'a'), 431 './C:/': ('/C:', ''), 432 '/a/b': ('/a', 'b'), 433 '/a/b/': ('/a/b', ''), 434 '/a/b/c/..': ('/a/b', ''), 435 '/a/b/c/../d': ('/a/b', 'd'), 436 '/a/b/c/../d/e/../f': ('/a/b/d', 'f'), 437 '/a/b/c/../d/e/../../f': ('/a/b', 'f'), 438 '/a/b/c/../d/e/.././././..//f': ('/a/b', 'f'), 439 '../a/b/c/../d/e/.././././..//f': IndexError, 440 '/a/b/c/../d/e/../../../f': ('/a', 'f'), 441 '/a/b/c/../d/e/../../../../f': ('/', 'f'), 442 '/a/b/c/../d/e/../../../../../f': IndexError, 443 '/a/b/c/../d/e/../../../../f/..': ('/', ''), 444 } 445 for path, expected in test_vectors.iteritems(): 446 if isinstance(expected, type) and issubclass(expected, Exception): 447 self.assertRaises(expected, 448 CGIHTTPServer._url_collapse_path_split, path) 449 else: 450 actual = CGIHTTPServer._url_collapse_path_split(path) 451 self.assertEqual(expected, actual, 452 msg='path = %r\nGot: %r\nWanted: %r' % 453 (path, actual, expected)) 454 455 def test_headers_and_content(self): 456 res = self.request('/cgi-bin/file1.py') 457 self.assertEqual(('Hello World\n', 'text/html', 200), 458 (res.read(), res.getheader('Content-type'), res.status)) 459 460 def test_post(self): 461 params = urllib.urlencode({'spam' : 1, 'eggs' : 'python', 'bacon' : 123456}) 462 headers = {'Content-type' : 'application/x-www-form-urlencoded'} 463 res = self.request('/cgi-bin/file2.py', 'POST', params, headers) 464 465 self.assertEqual(res.read(), '1, python, 123456\n') 466 467 def test_invaliduri(self): 468 res = self.request('/cgi-bin/invalid') 469 res.read() 470 self.assertEqual(res.status, 404) 471 472 def test_authorization(self): 473 headers = {'Authorization' : 'Basic %s' % 474 base64.b64encode('username:pass')} 475 res = self.request('/cgi-bin/file1.py', 'GET', headers=headers) 476 self.assertEqual(('Hello World\n', 'text/html', 200), 477 (res.read(), res.getheader('Content-type'), res.status)) 478 479 def test_no_leading_slash(self): 480 # http://bugs.python.org/issue2254 481 res = self.request('cgi-bin/file1.py') 482 self.assertEqual(('Hello World\n', 'text/html', 200), 483 (res.read(), res.getheader('Content-type'), res.status)) 484 485 def test_os_environ_is_not_altered(self): 486 signature = "Test CGI Server" 487 os.environ['SERVER_SOFTWARE'] = signature 488 res = self.request('/cgi-bin/file1.py') 489 self.assertEqual((b'Hello World\n', 'text/html', 200), 490 (res.read(), res.getheader('Content-type'), res.status)) 491 self.assertEqual(os.environ['SERVER_SOFTWARE'], signature) 492 493 494 class SimpleHTTPRequestHandlerTestCase(unittest.TestCase): 495 """ Test url parsing """ 496 def setUp(self): 497 self.translated = os.getcwd() 498 self.translated = os.path.join(self.translated, 'filename') 499 self.handler = SocketlessRequestHandler() 500 501 def test_query_arguments(self): 502 path = self.handler.translate_path('/filename') 503 self.assertEqual(path, self.translated) 504 path = self.handler.translate_path('/filename?foo=bar') 505 self.assertEqual(path, self.translated) 506 path = self.handler.translate_path('/filename?a=b&spam=eggs#zot') 507 self.assertEqual(path, self.translated) 508 509 def test_start_with_double_slash(self): 510 path = self.handler.translate_path('//filename') 511 self.assertEqual(path, self.translated) 512 path = self.handler.translate_path('//filename?foo=bar') 513 self.assertEqual(path, self.translated) 514 515 516 def test_main(verbose=None): 517 try: 518 cwd = os.getcwd() 519 test_support.run_unittest(BaseHTTPRequestHandlerTestCase, 520 SimpleHTTPRequestHandlerTestCase, 521 BaseHTTPServerTestCase, 522 SimpleHTTPServerTestCase, 523 CGIHTTPServerTestCase 524 ) 525 finally: 526 os.chdir(cwd) 527 528 if __name__ == '__main__': 529 test_main() 530