1 """Unittests for the various HTTPServer modules. 2 3 Written by Cody A.W. Somerville <cody-somerville (at] ubuntu.com>, 4 Josip Dzolonga, and Michael Otteneder for the 2007/08 GHOP contest. 5 """ 6 7 import os 8 import sys 9 import re 10 import base64 11 import shutil 12 import urllib 13 import httplib 14 import tempfile 15 import unittest 16 import CGIHTTPServer 17 18 19 from BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer 20 from SimpleHTTPServer import SimpleHTTPRequestHandler 21 from CGIHTTPServer import CGIHTTPRequestHandler 22 from StringIO import StringIO 23 from test import test_support 24 25 26 threading = test_support.import_module('threading') 27 28 29 class NoLogRequestHandler: 30 def log_message(self, *args): 31 # don't write log messages to stderr 32 pass 33 34 class SocketlessRequestHandler(SimpleHTTPRequestHandler): 35 def __init__(self): 36 self.get_called = False 37 self.protocol_version = "HTTP/1.1" 38 39 def do_GET(self): 40 self.get_called = True 41 self.send_response(200) 42 self.send_header('Content-Type', 'text/html') 43 self.end_headers() 44 self.wfile.write(b'<html><body>Data</body></html>\r\n') 45 46 def log_message(self, fmt, *args): 47 pass 48 49 50 class TestServerThread(threading.Thread): 51 def __init__(self, test_object, request_handler): 52 threading.Thread.__init__(self) 53 self.request_handler = request_handler 54 self.test_object = test_object 55 56 def run(self): 57 self.server = HTTPServer(('', 0), self.request_handler) 58 self.test_object.PORT = self.server.socket.getsockname()[1] 59 self.test_object.server_started.set() 60 self.test_object = None 61 try: 62 self.server.serve_forever(0.05) 63 finally: 64 self.server.server_close() 65 66 def stop(self): 67 self.server.shutdown() 68 69 70 class BaseTestCase(unittest.TestCase): 71 def setUp(self): 72 self._threads = test_support.threading_setup() 73 os.environ = test_support.EnvironmentVarGuard() 74 self.server_started = threading.Event() 75 self.thread = TestServerThread(self, self.request_handler) 76 self.thread.start() 77 self.server_started.wait() 78 79 def tearDown(self): 80 self.thread.stop() 81 os.environ.__exit__() 82 test_support.threading_cleanup(*self._threads) 83 84 def request(self, uri, method='GET', body=None, headers={}): 85 self.connection = httplib.HTTPConnection('localhost', self.PORT) 86 self.connection.request(method, uri, body, headers) 87 return self.connection.getresponse() 88 89 class BaseHTTPRequestHandlerTestCase(unittest.TestCase): 90 """Test the functionality of the BaseHTTPServer focussing on 91 BaseHTTPRequestHandler. 92 """ 93 94 HTTPResponseMatch = re.compile('HTTP/1.[0-9]+ 200 OK') 95 96 def setUp (self): 97 self.handler = SocketlessRequestHandler() 98 99 def send_typical_request(self, message): 100 input_msg = StringIO(message) 101 output = StringIO() 102 self.handler.rfile = input_msg 103 self.handler.wfile = output 104 self.handler.handle_one_request() 105 output.seek(0) 106 return output.readlines() 107 108 def verify_get_called(self): 109 self.assertTrue(self.handler.get_called) 110 111 def verify_expected_headers(self, headers): 112 for fieldName in 'Server: ', 'Date: ', 'Content-Type: ': 113 self.assertEqual(sum(h.startswith(fieldName) for h in headers), 1) 114 115 def verify_http_server_response(self, response): 116 match = self.HTTPResponseMatch.search(response) 117 self.assertTrue(match is not None) 118 119 def test_http_1_1(self): 120 result = self.send_typical_request('GET / HTTP/1.1\r\n\r\n') 121 self.verify_http_server_response(result[0]) 122 self.verify_expected_headers(result[1:-1]) 123 self.verify_get_called() 124 self.assertEqual(result[-1], '<html><body>Data</body></html>\r\n') 125 126 def test_http_1_0(self): 127 result = self.send_typical_request('GET / HTTP/1.0\r\n\r\n') 128 self.verify_http_server_response(result[0]) 129 self.verify_expected_headers(result[1:-1]) 130 self.verify_get_called() 131 self.assertEqual(result[-1], '<html><body>Data</body></html>\r\n') 132 133 def test_http_0_9(self): 134 result = self.send_typical_request('GET / HTTP/0.9\r\n\r\n') 135 self.assertEqual(len(result), 1) 136 self.assertEqual(result[0], '<html><body>Data</body></html>\r\n') 137 self.verify_get_called() 138 139 def test_with_continue_1_0(self): 140 result = self.send_typical_request('GET / HTTP/1.0\r\nExpect: 100-continue\r\n\r\n') 141 self.verify_http_server_response(result[0]) 142 self.verify_expected_headers(result[1:-1]) 143 self.verify_get_called() 144 self.assertEqual(result[-1], '<html><body>Data</body></html>\r\n') 145 146 def test_request_length(self): 147 # Issue #10714: huge request lines are discarded, to avoid Denial 148 # of Service attacks. 149 result = self.send_typical_request(b'GET ' + b'x' * 65537) 150 self.assertEqual(result[0], b'HTTP/1.1 414 Request-URI Too Long\r\n') 151 self.assertFalse(self.handler.get_called) 152 153 154 class BaseHTTPServerTestCase(BaseTestCase): 155 class request_handler(NoLogRequestHandler, BaseHTTPRequestHandler): 156 protocol_version = 'HTTP/1.1' 157 default_request_version = 'HTTP/1.1' 158 159 def do_TEST(self): 160 self.send_response(204) 161 self.send_header('Content-Type', 'text/html') 162 self.send_header('Connection', 'close') 163 self.end_headers() 164 165 def do_KEEP(self): 166 self.send_response(204) 167 self.send_header('Content-Type', 'text/html') 168 self.send_header('Connection', 'keep-alive') 169 self.end_headers() 170 171 def do_KEYERROR(self): 172 self.send_error(999) 173 174 def do_CUSTOM(self): 175 self.send_response(999) 176 self.send_header('Content-Type', 'text/html') 177 self.send_header('Connection', 'close') 178 self.end_headers() 179 180 def setUp(self): 181 BaseTestCase.setUp(self) 182 self.con = httplib.HTTPConnection('localhost', self.PORT) 183 self.con.connect() 184 185 def test_command(self): 186 self.con.request('GET', '/') 187 res = self.con.getresponse() 188 self.assertEqual(res.status, 501) 189 190 def test_request_line_trimming(self): 191 self.con._http_vsn_str = 'HTTP/1.1\n' 192 self.con.putrequest('GET', '/') 193 self.con.endheaders() 194 res = self.con.getresponse() 195 self.assertEqual(res.status, 501) 196 197 def test_version_bogus(self): 198 self.con._http_vsn_str = 'FUBAR' 199 self.con.putrequest('GET', '/') 200 self.con.endheaders() 201 res = self.con.getresponse() 202 self.assertEqual(res.status, 400) 203 204 def test_version_digits(self): 205 self.con._http_vsn_str = 'HTTP/9.9.9' 206 self.con.putrequest('GET', '/') 207 self.con.endheaders() 208 res = self.con.getresponse() 209 self.assertEqual(res.status, 400) 210 211 def test_version_none_get(self): 212 self.con._http_vsn_str = '' 213 self.con.putrequest('GET', '/') 214 self.con.endheaders() 215 res = self.con.getresponse() 216 self.assertEqual(res.status, 501) 217 218 def test_version_none(self): 219 self.con._http_vsn_str = '' 220 self.con.putrequest('PUT', '/') 221 self.con.endheaders() 222 res = self.con.getresponse() 223 self.assertEqual(res.status, 400) 224 225 def test_version_invalid(self): 226 self.con._http_vsn = 99 227 self.con._http_vsn_str = 'HTTP/9.9' 228 self.con.putrequest('GET', '/') 229 self.con.endheaders() 230 res = self.con.getresponse() 231 self.assertEqual(res.status, 505) 232 233 def test_send_blank(self): 234 self.con._http_vsn_str = '' 235 self.con.putrequest('', '') 236 self.con.endheaders() 237 res = self.con.getresponse() 238 self.assertEqual(res.status, 400) 239 240 def test_header_close(self): 241 self.con.putrequest('GET', '/') 242 self.con.putheader('Connection', 'close') 243 self.con.endheaders() 244 res = self.con.getresponse() 245 self.assertEqual(res.status, 501) 246 247 def test_head_keep_alive(self): 248 self.con._http_vsn_str = 'HTTP/1.1' 249 self.con.putrequest('GET', '/') 250 self.con.putheader('Connection', 'keep-alive') 251 self.con.endheaders() 252 res = self.con.getresponse() 253 self.assertEqual(res.status, 501) 254 255 def test_handler(self): 256 self.con.request('TEST', '/') 257 res = self.con.getresponse() 258 self.assertEqual(res.status, 204) 259 260 def test_return_header_keep_alive(self): 261 self.con.request('KEEP', '/') 262 res = self.con.getresponse() 263 self.assertEqual(res.getheader('Connection'), 'keep-alive') 264 self.con.request('TEST', '/') 265 self.addCleanup(self.con.close) 266 267 def test_internal_key_error(self): 268 self.con.request('KEYERROR', '/') 269 res = self.con.getresponse() 270 self.assertEqual(res.status, 999) 271 272 def test_return_custom_status(self): 273 self.con.request('CUSTOM', '/') 274 res = self.con.getresponse() 275 self.assertEqual(res.status, 999) 276 277 278 class SimpleHTTPServerTestCase(BaseTestCase): 279 class request_handler(NoLogRequestHandler, SimpleHTTPRequestHandler): 280 pass 281 282 def setUp(self): 283 BaseTestCase.setUp(self) 284 self.cwd = os.getcwd() 285 basetempdir = tempfile.gettempdir() 286 os.chdir(basetempdir) 287 self.data = 'We are the knights who say Ni!' 288 self.tempdir = tempfile.mkdtemp(dir=basetempdir) 289 self.tempdir_name = os.path.basename(self.tempdir) 290 temp = open(os.path.join(self.tempdir, 'test'), 'wb') 291 temp.write(self.data) 292 temp.close() 293 294 def tearDown(self): 295 try: 296 os.chdir(self.cwd) 297 try: 298 shutil.rmtree(self.tempdir) 299 except OSError: 300 pass 301 finally: 302 BaseTestCase.tearDown(self) 303 304 def check_status_and_reason(self, response, status, data=None): 305 body = response.read() 306 self.assertTrue(response) 307 self.assertEqual(response.status, status) 308 self.assertIsNotNone(response.reason) 309 if data: 310 self.assertEqual(data, body) 311 312 def test_get(self): 313 #constructs the path relative to the root directory of the HTTPServer 314 response = self.request(self.tempdir_name + '/test') 315 self.check_status_and_reason(response, 200, data=self.data) 316 response = self.request(self.tempdir_name + '/') 317 self.check_status_and_reason(response, 200) 318 response = self.request(self.tempdir_name) 319 self.check_status_and_reason(response, 301) 320 response = self.request('/ThisDoesNotExist') 321 self.check_status_and_reason(response, 404) 322 response = self.request('/' + 'ThisDoesNotExist' + '/') 323 self.check_status_and_reason(response, 404) 324 f = open(os.path.join(self.tempdir_name, 'index.html'), 'w') 325 response = self.request('/' + self.tempdir_name + '/') 326 self.check_status_and_reason(response, 200) 327 328 # chmod() doesn't work as expected on Windows, and filesystem 329 # permissions are ignored by root on Unix. 330 if os.name == 'posix' and os.geteuid() != 0: 331 os.chmod(self.tempdir, 0) 332 response = self.request(self.tempdir_name + '/') 333 self.check_status_and_reason(response, 404) 334 os.chmod(self.tempdir, 0755) 335 336 def test_head(self): 337 response = self.request( 338 self.tempdir_name + '/test', method='HEAD') 339 self.check_status_and_reason(response, 200) 340 self.assertEqual(response.getheader('content-length'), 341 str(len(self.data))) 342 self.assertEqual(response.getheader('content-type'), 343 'application/octet-stream') 344 345 def test_invalid_requests(self): 346 response = self.request('/', method='FOO') 347 self.check_status_and_reason(response, 501) 348 # requests must be case sensitive,so this should fail too 349 response = self.request('/', method='get') 350 self.check_status_and_reason(response, 501) 351 response = self.request('/', method='GETs') 352 self.check_status_and_reason(response, 501) 353 354 355 cgi_file1 = """\ 356 #!%s 357 358 print "Content-type: text/html" 359 print 360 print "Hello World" 361 """ 362 363 cgi_file2 = """\ 364 #!%s 365 import cgi 366 367 print "Content-type: text/html" 368 print 369 370 form = cgi.FieldStorage() 371 print "%%s, %%s, %%s" %% (form.getfirst("spam"), form.getfirst("eggs"), 372 form.getfirst("bacon")) 373 """ 374 375 376 @unittest.skipIf(hasattr(os, 'geteuid') and os.geteuid() == 0, 377 "This test can't be run reliably as root (issue #13308).") 378 class CGIHTTPServerTestCase(BaseTestCase): 379 class request_handler(NoLogRequestHandler, CGIHTTPRequestHandler): 380 pass 381 382 def setUp(self): 383 BaseTestCase.setUp(self) 384 self.parent_dir = tempfile.mkdtemp() 385 self.cgi_dir = os.path.join(self.parent_dir, 'cgi-bin') 386 os.mkdir(self.cgi_dir) 387 388 # The shebang line should be pure ASCII: use symlink if possible. 389 # See issue #7668. 390 if hasattr(os, 'symlink'): 391 self.pythonexe = os.path.join(self.parent_dir, 'python') 392 os.symlink(sys.executable, self.pythonexe) 393 else: 394 self.pythonexe = sys.executable 395 396 self.file1_path = os.path.join(self.cgi_dir, 'file1.py') 397 with open(self.file1_path, 'w') as file1: 398 file1.write(cgi_file1 % self.pythonexe) 399 os.chmod(self.file1_path, 0777) 400 401 self.file2_path = os.path.join(self.cgi_dir, 'file2.py') 402 with open(self.file2_path, 'w') as file2: 403 file2.write(cgi_file2 % self.pythonexe) 404 os.chmod(self.file2_path, 0777) 405 406 self.cwd = os.getcwd() 407 os.chdir(self.parent_dir) 408 409 def tearDown(self): 410 try: 411 os.chdir(self.cwd) 412 if self.pythonexe != sys.executable: 413 os.remove(self.pythonexe) 414 os.remove(self.file1_path) 415 os.remove(self.file2_path) 416 os.rmdir(self.cgi_dir) 417 os.rmdir(self.parent_dir) 418 finally: 419 BaseTestCase.tearDown(self) 420 421 def test_url_collapse_path(self): 422 # verify tail is the last portion and head is the rest on proper urls 423 test_vectors = { 424 '': '//', 425 '..': IndexError, 426 '/.//..': IndexError, 427 '/': '//', 428 '//': '//', 429 '/\\': '//\\', 430 '/.//': '//', 431 'cgi-bin/file1.py': '/cgi-bin/file1.py', 432 '/cgi-bin/file1.py': '/cgi-bin/file1.py', 433 'a': '//a', 434 '/a': '//a', 435 '//a': '//a', 436 './a': '//a', 437 './C:/': '/C:/', 438 '/a/b': '/a/b', 439 '/a/b/': '/a/b/', 440 '/a/b/.': '/a/b/', 441 '/a/b/c/..': '/a/b/', 442 '/a/b/c/../d': '/a/b/d', 443 '/a/b/c/../d/e/../f': '/a/b/d/f', 444 '/a/b/c/../d/e/../../f': '/a/b/f', 445 '/a/b/c/../d/e/.././././..//f': '/a/b/f', 446 '../a/b/c/../d/e/.././././..//f': IndexError, 447 '/a/b/c/../d/e/../../../f': '/a/f', 448 '/a/b/c/../d/e/../../../../f': '//f', 449 '/a/b/c/../d/e/../../../../../f': IndexError, 450 '/a/b/c/../d/e/../../../../f/..': '//', 451 '/a/b/c/../d/e/../../../../f/../.': '//', 452 } 453 for path, expected in test_vectors.iteritems(): 454 if isinstance(expected, type) and issubclass(expected, Exception): 455 self.assertRaises(expected, 456 CGIHTTPServer._url_collapse_path, path) 457 else: 458 actual = CGIHTTPServer._url_collapse_path(path) 459 self.assertEqual(expected, actual, 460 msg='path = %r\nGot: %r\nWanted: %r' % 461 (path, actual, expected)) 462 463 def test_headers_and_content(self): 464 res = self.request('/cgi-bin/file1.py') 465 self.assertEqual(('Hello World\n', 'text/html', 200), 466 (res.read(), res.getheader('Content-type'), res.status)) 467 468 def test_post(self): 469 params = urllib.urlencode({'spam' : 1, 'eggs' : 'python', 'bacon' : 123456}) 470 headers = {'Content-type' : 'application/x-www-form-urlencoded'} 471 res = self.request('/cgi-bin/file2.py', 'POST', params, headers) 472 473 self.assertEqual(res.read(), '1, python, 123456\n') 474 475 def test_invaliduri(self): 476 res = self.request('/cgi-bin/invalid') 477 res.read() 478 self.assertEqual(res.status, 404) 479 480 def test_authorization(self): 481 headers = {'Authorization' : 'Basic %s' % 482 base64.b64encode('username:pass')} 483 res = self.request('/cgi-bin/file1.py', 'GET', headers=headers) 484 self.assertEqual(('Hello World\n', 'text/html', 200), 485 (res.read(), res.getheader('Content-type'), res.status)) 486 487 def test_no_leading_slash(self): 488 # http://bugs.python.org/issue2254 489 res = self.request('cgi-bin/file1.py') 490 self.assertEqual(('Hello World\n', 'text/html', 200), 491 (res.read(), res.getheader('Content-type'), res.status)) 492 493 def test_os_environ_is_not_altered(self): 494 signature = "Test CGI Server" 495 os.environ['SERVER_SOFTWARE'] = signature 496 res = self.request('/cgi-bin/file1.py') 497 self.assertEqual((b'Hello World\n', 'text/html', 200), 498 (res.read(), res.getheader('Content-type'), res.status)) 499 self.assertEqual(os.environ['SERVER_SOFTWARE'], signature) 500 501 502 class SimpleHTTPRequestHandlerTestCase(unittest.TestCase): 503 """ Test url parsing """ 504 def setUp(self): 505 self.translated = os.getcwd() 506 self.translated = os.path.join(self.translated, 'filename') 507 self.handler = SocketlessRequestHandler() 508 509 def test_query_arguments(self): 510 path = self.handler.translate_path('/filename') 511 self.assertEqual(path, self.translated) 512 path = self.handler.translate_path('/filename?foo=bar') 513 self.assertEqual(path, self.translated) 514 path = self.handler.translate_path('/filename?a=b&spam=eggs#zot') 515 self.assertEqual(path, self.translated) 516 517 def test_start_with_double_slash(self): 518 path = self.handler.translate_path('//filename') 519 self.assertEqual(path, self.translated) 520 path = self.handler.translate_path('//filename?foo=bar') 521 self.assertEqual(path, self.translated) 522 523 524 def test_main(verbose=None): 525 try: 526 cwd = os.getcwd() 527 test_support.run_unittest(BaseHTTPRequestHandlerTestCase, 528 SimpleHTTPRequestHandlerTestCase, 529 BaseHTTPServerTestCase, 530 SimpleHTTPServerTestCase, 531 CGIHTTPServerTestCase 532 ) 533 finally: 534 os.chdir(cwd) 535 536 if __name__ == '__main__': 537 test_main() 538