1 #!/usr/bin/env python 2 3 import urlparse 4 import urllib2 5 import BaseHTTPServer 6 import unittest 7 import hashlib 8 9 from test import test_support 10 11 mimetools = test_support.import_module('mimetools', deprecated=True) 12 threading = test_support.import_module('threading') 13 14 # Loopback http server infrastructure 15 16 class LoopbackHttpServer(BaseHTTPServer.HTTPServer): 17 """HTTP server w/ a few modifications that make it useful for 18 loopback testing purposes. 19 """ 20 21 def __init__(self, server_address, RequestHandlerClass): 22 BaseHTTPServer.HTTPServer.__init__(self, 23 server_address, 24 RequestHandlerClass) 25 26 # Set the timeout of our listening socket really low so 27 # that we can stop the server easily. 28 self.socket.settimeout(1.0) 29 30 def get_request(self): 31 """BaseHTTPServer method, overridden.""" 32 33 request, client_address = self.socket.accept() 34 35 # It's a loopback connection, so setting the timeout 36 # really low shouldn't affect anything, but should make 37 # deadlocks less likely to occur. 38 request.settimeout(10.0) 39 40 return (request, client_address) 41 42 class LoopbackHttpServerThread(threading.Thread): 43 """Stoppable thread that runs a loopback http server.""" 44 45 def __init__(self, request_handler): 46 threading.Thread.__init__(self) 47 self._stop = False 48 self.ready = threading.Event() 49 request_handler.protocol_version = "HTTP/1.0" 50 self.httpd = LoopbackHttpServer(('127.0.0.1', 0), 51 request_handler) 52 #print "Serving HTTP on %s port %s" % (self.httpd.server_name, 53 # self.httpd.server_port) 54 self.port = self.httpd.server_port 55 56 def stop(self): 57 """Stops the webserver if it's currently running.""" 58 59 # Set the stop flag. 
60 self._stop = True 61 62 self.join() 63 64 def run(self): 65 self.ready.set() 66 while not self._stop: 67 self.httpd.handle_request() 68 69 # Authentication infrastructure 70 71 class DigestAuthHandler: 72 """Handler for performing digest authentication.""" 73 74 def __init__(self): 75 self._request_num = 0 76 self._nonces = [] 77 self._users = {} 78 self._realm_name = "Test Realm" 79 self._qop = "auth" 80 81 def set_qop(self, qop): 82 self._qop = qop 83 84 def set_users(self, users): 85 assert isinstance(users, dict) 86 self._users = users 87 88 def set_realm(self, realm): 89 self._realm_name = realm 90 91 def _generate_nonce(self): 92 self._request_num += 1 93 nonce = hashlib.md5(str(self._request_num)).hexdigest() 94 self._nonces.append(nonce) 95 return nonce 96 97 def _create_auth_dict(self, auth_str): 98 first_space_index = auth_str.find(" ") 99 auth_str = auth_str[first_space_index+1:] 100 101 parts = auth_str.split(",") 102 103 auth_dict = {} 104 for part in parts: 105 name, value = part.split("=") 106 name = name.strip() 107 if value[0] == '"' and value[-1] == '"': 108 value = value[1:-1] 109 else: 110 value = value.strip() 111 auth_dict[name] = value 112 return auth_dict 113 114 def _validate_auth(self, auth_dict, password, method, uri): 115 final_dict = {} 116 final_dict.update(auth_dict) 117 final_dict["password"] = password 118 final_dict["method"] = method 119 final_dict["uri"] = uri 120 HA1_str = "%(username)s:%(realm)s:%(password)s" % final_dict 121 HA1 = hashlib.md5(HA1_str).hexdigest() 122 HA2_str = "%(method)s:%(uri)s" % final_dict 123 HA2 = hashlib.md5(HA2_str).hexdigest() 124 final_dict["HA1"] = HA1 125 final_dict["HA2"] = HA2 126 response_str = "%(HA1)s:%(nonce)s:%(nc)s:" \ 127 "%(cnonce)s:%(qop)s:%(HA2)s" % final_dict 128 response = hashlib.md5(response_str).hexdigest() 129 130 return response == auth_dict["response"] 131 132 def _return_auth_challenge(self, request_handler): 133 request_handler.send_response(407, "Proxy Authentication 
Required") 134 request_handler.send_header("Content-Type", "text/html") 135 request_handler.send_header( 136 'Proxy-Authenticate', 'Digest realm="%s", ' 137 'qop="%s",' 138 'nonce="%s", ' % \ 139 (self._realm_name, self._qop, self._generate_nonce())) 140 # XXX: Not sure if we're supposed to add this next header or 141 # not. 142 #request_handler.send_header('Connection', 'close') 143 request_handler.end_headers() 144 request_handler.wfile.write("Proxy Authentication Required.") 145 return False 146 147 def handle_request(self, request_handler): 148 """Performs digest authentication on the given HTTP request 149 handler. Returns True if authentication was successful, False 150 otherwise. 151 152 If no users have been set, then digest auth is effectively 153 disabled and this method will always return True. 154 """ 155 156 if len(self._users) == 0: 157 return True 158 159 if 'Proxy-Authorization' not in request_handler.headers: 160 return self._return_auth_challenge(request_handler) 161 else: 162 auth_dict = self._create_auth_dict( 163 request_handler.headers['Proxy-Authorization'] 164 ) 165 if auth_dict["username"] in self._users: 166 password = self._users[ auth_dict["username"] ] 167 else: 168 return self._return_auth_challenge(request_handler) 169 if not auth_dict.get("nonce") in self._nonces: 170 return self._return_auth_challenge(request_handler) 171 else: 172 self._nonces.remove(auth_dict["nonce"]) 173 174 auth_validated = False 175 176 # MSIE uses short_path in its validation, but Python's 177 # urllib2 uses the full path, so we're going to see if 178 # either of them works here. 
179 180 for path in [request_handler.path, request_handler.short_path]: 181 if self._validate_auth(auth_dict, 182 password, 183 request_handler.command, 184 path): 185 auth_validated = True 186 187 if not auth_validated: 188 return self._return_auth_challenge(request_handler) 189 return True 190 191 # Proxy test infrastructure 192 193 class FakeProxyHandler(BaseHTTPServer.BaseHTTPRequestHandler): 194 """This is a 'fake proxy' that makes it look like the entire 195 internet has gone down due to a sudden zombie invasion. It main 196 utility is in providing us with authentication support for 197 testing. 198 """ 199 200 def __init__(self, digest_auth_handler, *args, **kwargs): 201 # This has to be set before calling our parent's __init__(), which will 202 # try to call do_GET(). 203 self.digest_auth_handler = digest_auth_handler 204 BaseHTTPServer.BaseHTTPRequestHandler.__init__(self, *args, **kwargs) 205 206 def log_message(self, format, *args): 207 # Uncomment the next line for debugging. 
        #sys.stderr.write(format % args)
        pass

    def do_GET(self):
        # Remember the bare path component: the digest handler tries to
        # validate against both the full request target and this short
        # path (see DigestAuthHandler.handle_request).
        (scm, netloc, path, params, query, fragment) = urlparse.urlparse(
            self.path, 'http')
        self.short_path = path
        # Only serve the canned page once authentication succeeds;
        # otherwise handle_request has already sent the 407 challenge.
        if self.digest_auth_handler.handle_request(self):
            self.send_response(200, "OK")
            self.send_header("Content-Type", "text/html")
            self.end_headers()
            self.wfile.write("You've reached %s!<BR>" % self.path)
            self.wfile.write("Our apologies, but our server is down due to "
                             "a sudden zombie invasion.")

# Test cases

class BaseTestCase(unittest.TestCase):
    # Shared setUp/tearDown that records live threads so any thread
    # leaked by a test is reported/cleaned by test_support.

    def setUp(self):
        self._threads = test_support.threading_setup()

    def tearDown(self):
        test_support.threading_cleanup(*self._threads)


class ProxyAuthTests(BaseTestCase):
    URL = "http://localhost"

    USER = "tester"
    PASSWD = "test123"
    REALM = "TestRealm"

    def setUp(self):
        super(ProxyAuthTests, self).setUp()
        # A loopback proxy that demands digest auth for USER/PASSWD in
        # REALM.
        self.digest_auth_handler = DigestAuthHandler()
        self.digest_auth_handler.set_users({self.USER: self.PASSWD})
        self.digest_auth_handler.set_realm(self.REALM)
        def create_fake_proxy_handler(*args, **kwargs):
            return FakeProxyHandler(self.digest_auth_handler, *args, **kwargs)

        self.server = LoopbackHttpServerThread(create_fake_proxy_handler)
        self.server.start()
        self.server.ready.wait()
        # Point an opener at the loopback proxy, with a digest auth
        # handler to answer its 407 challenges.
        proxy_url = "http://127.0.0.1:%d" % self.server.port
        handler = urllib2.ProxyHandler({"http" : proxy_url})
        self.proxy_digest_handler = urllib2.ProxyDigestAuthHandler()
        self.opener = urllib2.build_opener(handler, self.proxy_digest_handler)

    def tearDown(self):
        self.server.stop()
        super(ProxyAuthTests, self).tearDown()

    def test_proxy_with_bad_password_raises_httperror(self):
        self.proxy_digest_handler.add_password(self.REALM, self.URL,
                                               self.USER, self.PASSWD+"bad")
        self.digest_auth_handler.set_qop("auth")
        self.assertRaises(urllib2.HTTPError,
                          self.opener.open,
                          self.URL)

    def test_proxy_with_no_password_raises_httperror(self):
        self.digest_auth_handler.set_qop("auth")
        self.assertRaises(urllib2.HTTPError,
                          self.opener.open,
                          self.URL)

    def test_proxy_qop_auth_works(self):
        self.proxy_digest_handler.add_password(self.REALM, self.URL,
                                               self.USER, self.PASSWD)
        self.digest_auth_handler.set_qop("auth")
        result = self.opener.open(self.URL)
        # Drain the response before closing it.
        while result.read():
            pass
        result.close()

    def test_proxy_qop_auth_int_works_or_throws_urlerror(self):
        self.proxy_digest_handler.add_password(self.REALM, self.URL,
                                               self.USER, self.PASSWD)
        self.digest_auth_handler.set_qop("auth-int")
        try:
            result = self.opener.open(self.URL)
        except urllib2.URLError:
            # It's okay if we don't support auth-int, but we certainly
            # shouldn't receive any kind of exception here other than
            # a URLError.
            result = None
        if result:
            while result.read():
                pass
            result.close()


def GetRequestHandler(responses):
    """Return a BaseHTTPRequestHandler subclass that replays the given
    list of (response_code, headers, body) tuples, one tuple per
    incoming request, recording each request path (and POST body) in
    its class-level 'requests' list."""

    class FakeHTTPRequestHandler(BaseHTTPServer.BaseHTTPRequestHandler):

        server_version = "TestHTTP/"
        # Class-level so tests can inspect what was received via the
        # handler class returned by this factory.
        requests = []
        headers_received = []
        port = 80

        def do_GET(self):
            body = self.send_head()
            if body:
                self.wfile.write(body)

        def do_POST(self):
            content_length = self.headers['Content-Length']
            post_data = self.rfile.read(int(content_length))
            # Reuse do_GET for the response; the path is recorded there
            # (in send_head) before the POST body is appended here.
            self.do_GET()
            self.requests.append(post_data)

        def send_head(self):
            FakeHTTPRequestHandler.headers_received = self.headers
            self.requests.append(self.path)
            response_code, headers, body = responses.pop(0)

            self.send_response(response_code)

            for (header, value) in headers:
                # '%s' placeholders in canned header values are filled
                # with the actual server port.
                self.send_header(header, value % self.port)
            if body:
                self.send_header('Content-type', 'text/plain')
                self.end_headers()
                return body
            self.end_headers()

        def log_message(self, *args):
            pass


    return FakeHTTPRequestHandler


class TestUrlopen(BaseTestCase):
    """Tests urllib2.urlopen using the network.

    These tests are not exhaustive.  Assuming that testing using files does a
    good job overall of some of the basic interface features.  There are no
    tests exercising the optional 'data' and 'proxies' arguments.  No tests
    for transparent redirection have been written.
    """

    def setUp(self):
        # Install an opener with an empty proxy map so environment
        # proxy settings cannot interfere with the loopback tests.
        proxy_handler = urllib2.ProxyHandler({})
        opener = urllib2.build_opener(proxy_handler)
        urllib2.install_opener(opener)
        super(TestUrlopen, self).setUp()

    def start_server(self, responses):
        # Build a handler that replays 'responses' and serve it from a
        # background loopback thread; callers stop self.server themselves.
        handler = GetRequestHandler(responses)

        self.server = LoopbackHttpServerThread(handler)
        self.server.start()
        self.server.ready.wait()
        port = self.server.port
        handler.port = port
        return handler


    def test_redirection(self):
        expected_response = 'We got here...'
        responses = [
            (302, [('Location', 'http://localhost:%s/somewhere_else')], ''),
            (200, [], expected_response)
        ]

        handler = self.start_server(responses)

        try:
            f = urllib2.urlopen('http://localhost:%s/' % handler.port)
            data = f.read()
            f.close()

            self.assertEqual(data, expected_response)
            # Both the original path and the redirect target must have
            # been requested, in order.
            self.assertEqual(handler.requests, ['/', '/somewhere_else'])
        finally:
            self.server.stop()


    def test_404(self):
        expected_response = 'Bad bad bad...'
        handler = self.start_server([(404, [], expected_response)])

        try:
            try:
                urllib2.urlopen('http://localhost:%s/weeble' % handler.port)
            except urllib2.URLError, f:
                pass
            else:
                self.fail('404 should raise URLError')

            # The raised error doubles as a response object: its body
            # must still be readable.
            data = f.read()
            f.close()

            self.assertEqual(data, expected_response)
            self.assertEqual(handler.requests, ['/weeble'])
        finally:
            self.server.stop()


    def test_200(self):
        expected_response = 'pycon 2008...'
        handler = self.start_server([(200, [], expected_response)])

        try:
            f = urllib2.urlopen('http://localhost:%s/bizarre' % handler.port)
            data = f.read()
            f.close()

            self.assertEqual(data, expected_response)
            self.assertEqual(handler.requests, ['/bizarre'])
        finally:
            self.server.stop()

    def test_200_with_parameters(self):
        expected_response = 'pycon 2008...'
        handler = self.start_server([(200, [], expected_response)])

        try:
            # Passing a data argument makes this a POST; the handler
            # records the path first, then the POST body.
            f = urllib2.urlopen('http://localhost:%s/bizarre' % handler.port, 'get=with_feeling')
            data = f.read()
            f.close()

            self.assertEqual(data, expected_response)
            self.assertEqual(handler.requests, ['/bizarre', 'get=with_feeling'])
        finally:
            self.server.stop()


    def test_sending_headers(self):
        handler = self.start_server([(200, [], "we don't care")])

        try:
            req = urllib2.Request("http://localhost:%s/" % handler.port,
                                  headers={'Range': 'bytes=20-39'})
            urllib2.urlopen(req)
            # The custom header must arrive verbatim at the server.
            self.assertEqual(handler.headers_received['Range'], 'bytes=20-39')
        finally:
            self.server.stop()

    def test_basic(self):
        handler = self.start_server([(200, [], "we don't care")])

        try:
            open_url = urllib2.urlopen("http://localhost:%s" % handler.port)
            for attr in ("read", "close", "info", "geturl"):
                self.assertTrue(hasattr(open_url, attr), "object returned from "
                             "urlopen lacks the %s attribute" % attr)
            try:
                self.assertTrue(open_url.read(), "calling 'read' failed")
            finally:
                open_url.close()
        finally:
            self.server.stop()

    def test_info(self):
        handler = self.start_server([(200, [], "we don't care")])

        try:
            open_url = urllib2.urlopen("http://localhost:%s" % handler.port)
            info_obj = open_url.info()
            self.assertIsInstance(info_obj, mimetools.Message,
                                  "object returned by 'info' is not an "
                                  "instance of mimetools.Message")
            self.assertEqual(info_obj.getsubtype(), "plain")

        finally:
            self.server.stop()

    def test_geturl(self):
        # Make sure same URL as opened is returned by geturl.
        handler = self.start_server([(200, [], "we don't care")])

        try:
            open_url = urllib2.urlopen("http://localhost:%s" % handler.port)
            url = open_url.geturl()
            self.assertEqual(url, "http://localhost:%s" % handler.port)
        finally:
            self.server.stop()


    def test_bad_address(self):
        # Make sure proper exception is raised when connecting to a bogus
        # address.

        # as indicated by the comment below, this might fail with some ISP,
        # so we run the test only when -unetwork/-uall is specified to
        # mitigate the problem a bit (see #17564)
        test_support.requires('network')
        self.assertRaises(IOError,
                          # Given that both VeriSign and various ISPs have in
                          # the past or are presently hijacking various invalid
                          # domain name requests in an attempt to boost traffic
                          # to their own sites, finding a domain name to use
                          # for this test is difficult.  RFC2606 leads one to
                          # believe that '.invalid' should work, but experience
                          # seemed to indicate otherwise.  Single character
                          # TLDs are likely to remain invalid, so this seems to
                          # be the best choice. The trailing '.' prevents a
                          # related problem: The normal DNS resolver appends
                          # the domain names from the search path if there is
                          # no '.' the end and, and if one of those domains
                          # implements a '*' rule a result is returned.
                          # However, none of this will prevent the test from
                          # failing if the ISP hijacks all invalid domain
                          # requests.  The real solution would be to be able to
                          # parameterize the framework with a mock resolver.
                          urllib2.urlopen, "http://sadflkjsasf.i.nvali.d./")

    def test_iteration(self):
        expected_response = "pycon 2008..."
        handler = self.start_server([(200, [], expected_response)])
        try:
            data = urllib2.urlopen("http://localhost:%s" % handler.port)
            # The body contains no newline, so iterating yields it as a
            # single line.
            for line in data:
                self.assertEqual(line, expected_response)
        finally:
            self.server.stop()

    def ztest_line_iteration(self):
        # NOTE(review): the 'z' prefix means the name does not start
        # with 'test', so unittest's default loader skips it —
        # presumably disabled deliberately; confirm before re-enabling.
        lines = ["We\n", "got\n", "here\n", "verylong " * 8192 + "\n"]
        expected_response = "".join(lines)
        handler = self.start_server([(200, [], expected_response)])
        try:
            data = urllib2.urlopen("http://localhost:%s" % handler.port)
            for index, line in enumerate(data):
                self.assertEqual(line, lines[index],
                                 "Fetched line number %s doesn't match expected:\n"
                                 "    Expected length was %s, got %s" %
                                 (index, len(lines[index]), len(line)))
        finally:
            self.server.stop()
        self.assertEqual(index + 1, len(lines))

def test_main():
    # We will NOT depend on the network resource flag
    # (Lib/test/regrtest.py -u network) since all tests here are only
    # localhost.  However, if this is a bad rationale, then uncomment
    # the next line.
    #test_support.requires("network")

    test_support.run_unittest(ProxyAuthTests, TestUrlopen)

if __name__ == "__main__":
    test_main()