1 #!/usr/bin/env python3 2 """A set of unit tests for httplib2.py.""" 3 4 __author__ = "Joe Gregorio (joe (at] bitworking.org)" 5 __copyright__ = "Copyright 2006, Joe Gregorio" 6 __contributors__ = ["Mark Pilgrim"] 7 __license__ = "MIT" 8 __version__ = "0.2 ($Rev: 118 $)" 9 10 import base64 11 import http.client 12 import httplib2 13 import io 14 import os 15 import pickle 16 import socket 17 import ssl 18 import sys 19 import time 20 import unittest 21 import urllib.parse 22 23 base = "http://bitworking.org/projects/httplib2/test/" 24 cacheDirName = ".cache" 25 26 27 class CredentialsTest(unittest.TestCase): 28 def test(self): 29 c = httplib2.Credentials() 30 c.add("joe", "password") 31 self.assertEqual(("joe", "password"), list(c.iter("bitworking.org"))[0]) 32 self.assertEqual(("joe", "password"), list(c.iter(""))[0]) 33 c.add("fred", "password2", "wellformedweb.org") 34 self.assertEqual(("joe", "password"), list(c.iter("bitworking.org"))[0]) 35 self.assertEqual(1, len(list(c.iter("bitworking.org")))) 36 self.assertEqual(2, len(list(c.iter("wellformedweb.org")))) 37 self.assertTrue(("fred", "password2") in list(c.iter("wellformedweb.org"))) 38 c.clear() 39 self.assertEqual(0, len(list(c.iter("bitworking.org")))) 40 c.add("fred", "password2", "wellformedweb.org") 41 self.assertTrue(("fred", "password2") in list(c.iter("wellformedweb.org"))) 42 self.assertEqual(0, len(list(c.iter("bitworking.org")))) 43 self.assertEqual(0, len(list(c.iter("")))) 44 45 46 class ParserTest(unittest.TestCase): 47 def testFromStd66(self): 48 self.assertEqual( 49 ("http", "example.com", "", None, None), 50 httplib2.parse_uri("http://example.com"), 51 ) 52 self.assertEqual( 53 ("https", "example.com", "", None, None), 54 httplib2.parse_uri("https://example.com"), 55 ) 56 self.assertEqual( 57 ("https", "example.com:8080", "", None, None), 58 httplib2.parse_uri("https://example.com:8080"), 59 ) 60 self.assertEqual( 61 ("http", "example.com", "/", None, None), 62 httplib2.parse_uri("http://example.com/"), 63 ) 64 self.assertEqual( 65 ("http", "example.com", "/path", None, None), 66 httplib2.parse_uri("http://example.com/path"), 67 ) 68 self.assertEqual( 69 ("http", "example.com", "/path", "a=1&b=2", None), 70 httplib2.parse_uri("http://example.com/path?a=1&b=2"), 71 ) 72 self.assertEqual( 73 ("http", "example.com", "/path", "a=1&b=2", "fred"), 74 httplib2.parse_uri("http://example.com/path?a=1&b=2#fred"), 75 ) 76 self.assertEqual( 77 ("http", "example.com", "/path", "a=1&b=2", "fred"), 78 httplib2.parse_uri("http://example.com/path?a=1&b=2#fred"), 79 ) 80 81 82 class UrlNormTest(unittest.TestCase): 83 def test(self): 84 self.assertEqual( 85 "http://example.org/", httplib2.urlnorm("http://example.org")[-1] 86 ) 87 self.assertEqual( 88 "http://example.org/", httplib2.urlnorm("http://EXAMple.org")[-1] 89 ) 90 self.assertEqual( 91 "http://example.org/?=b", httplib2.urlnorm("http://EXAMple.org?=b")[-1] 92 ) 93 self.assertEqual( 94 "http://example.org/mypath?a=b", 95 httplib2.urlnorm("http://EXAMple.org/mypath?a=b")[-1], 96 ) 97 self.assertEqual( 98 "http://localhost:80/", httplib2.urlnorm("http://localhost:80")[-1] 99 ) 100 self.assertEqual( 101 httplib2.urlnorm("http://localhost:80/"), 102 httplib2.urlnorm("HTTP://LOCALHOST:80"), 103 ) 104 try: 105 httplib2.urlnorm("/") 106 self.fail("Non-absolute URIs should raise an exception") 107 except httplib2.RelativeURIError: 108 pass 109 110 111 class UrlSafenameTest(unittest.TestCase): 112 def test(self): 113 # Test that different URIs end up generating different safe names 114 self.assertEqual( 115 "example.org,fred,a=b,58489f63a7a83c3b7794a6a398ee8b1f", 116 httplib2.safename("http://example.org/fred/?a=b"), 117 ) 118 self.assertEqual( 119 "example.org,fred,a=b,8c5946d56fec453071f43329ff0be46b", 120 httplib2.safename("http://example.org/fred?/a=b"), 121 ) 122 self.assertEqual( 123 "www.example.org,fred,a=b,499c44b8d844a011b67ea2c015116968", 124 httplib2.safename("http://www.example.org/fred?/a=b"), 125 ) 126 self.assertEqual( 127 httplib2.safename(httplib2.urlnorm("http://www")[-1]), 128 httplib2.safename(httplib2.urlnorm("http://WWW")[-1]), 129 ) 130 self.assertEqual( 131 "www.example.org,fred,a=b,692e843a333484ce0095b070497ab45d", 132 httplib2.safename("https://www.example.org/fred?/a=b"), 133 ) 134 self.assertNotEqual( 135 httplib2.safename("http://www"), httplib2.safename("https://www") 136 ) 137 # Test the max length limits 138 uri = "http://" + ("w" * 200) + ".org" 139 uri2 = "http://" + ("w" * 201) + ".org" 140 self.assertNotEqual(httplib2.safename(uri2), httplib2.safename(uri)) 141 # Max length should be 200 + 1 (",") + 32 142 self.assertEqual(233, len(httplib2.safename(uri2))) 143 self.assertEqual(233, len(httplib2.safename(uri))) 144 # Unicode 145 if sys.version_info >= (2, 3): 146 self.assertEqual( 147 "xn--http,-4y1d.org,fred,a=b,579924c35db315e5a32e3d9963388193", 148 httplib2.safename("http://\u2304.org/fred/?a=b"), 149 ) 150 151 152 class _MyResponse(io.BytesIO): 153 def __init__(self, body, **kwargs): 154 io.BytesIO.__init__(self, body) 155 self.headers = kwargs 156 157 def items(self): 158 return self.headers.items() 159 160 def iteritems(self): 161 return iter(self.headers.items()) 162 163 164 class _MyHTTPConnection(object): 165 "This class is just a mock of httplib.HTTPConnection used for testing" 166 167 def __init__( 168 self, 169 host, 170 port=None, 171 key_file=None, 172 cert_file=None, 173 strict=None, 174 timeout=None, 175 proxy_info=None, 176 ): 177 self.host = host 178 self.port = port 179 self.timeout = timeout 180 self.log = "" 181 self.sock = None 182 183 def set_debuglevel(self, level): 184 pass 185 186 def connect(self): 187 "Connect to a host on a given port." 188 pass 189 190 def close(self): 191 pass 192 193 def request(self, method, request_uri, body, headers): 194 pass 195 196 def getresponse(self): 197 return _MyResponse(b"the body", status="200") 198 199 200 class _MyHTTPBadStatusConnection(object): 201 "Mock of httplib.HTTPConnection that raises BadStatusLine." 202 203 num_calls = 0 204 205 def __init__( 206 self, 207 host, 208 port=None, 209 key_file=None, 210 cert_file=None, 211 strict=None, 212 timeout=None, 213 proxy_info=None, 214 ): 215 self.host = host 216 self.port = port 217 self.timeout = timeout 218 self.log = "" 219 self.sock = None 220 _MyHTTPBadStatusConnection.num_calls = 0 221 222 def set_debuglevel(self, level): 223 pass 224 225 def connect(self): 226 pass 227 228 def close(self): 229 pass 230 231 def request(self, method, request_uri, body, headers): 232 pass 233 234 def getresponse(self): 235 _MyHTTPBadStatusConnection.num_calls += 1 236 raise http.client.BadStatusLine("") 237 238 239 class HttpTest(unittest.TestCase): 240 def setUp(self): 241 if os.path.exists(cacheDirName): 242 [ 243 os.remove(os.path.join(cacheDirName, file)) 244 for file in os.listdir(cacheDirName) 245 ] 246 self.http = httplib2.Http(cacheDirName) 247 self.http.clear_credentials() 248 249 def testIPv6NoSSL(self): 250 try: 251 self.http.request("http://[::1]/") 252 except socket.gaierror: 253 self.fail("should get the address family right for IPv6") 254 except socket.error: 255 # Even if IPv6 isn't installed on a machine it should just raise socket.error 256 pass 257 258 def testIPv6SSL(self): 259 try: 260 self.http.request("https://[::1]/") 261 except socket.gaierror: 262 self.fail("should get the address family right for IPv6") 263 except socket.error: 264 # Even if IPv6 isn't installed on a machine it should just raise socket.error 265 pass 266 267 def testConnectionType(self): 268 self.http.force_exception_to_status_code = False 269 response, content = self.http.request( 270 "http://bitworking.org", connection_type=_MyHTTPConnection 271 ) 272 self.assertEqual(response["content-location"], "http://bitworking.org") 273 self.assertEqual(content, b"the body") 274 275 def testBadStatusLineRetry(self): 276 old_retries = httplib2.RETRIES 277 httplib2.RETRIES = 1 278 self.http.force_exception_to_status_code = False 279 try: 280 response, content = self.http.request( 281 "http://bitworking.org", connection_type=_MyHTTPBadStatusConnection 282 ) 283 except http.client.BadStatusLine: 284 self.assertEqual(2, _MyHTTPBadStatusConnection.num_calls) 285 httplib2.RETRIES = old_retries 286 287 def testGetUnknownServer(self): 288 self.http.force_exception_to_status_code = False 289 try: 290 self.http.request("http://fred.bitworking.org/") 291 self.fail( 292 "An httplib2.ServerNotFoundError Exception must be thrown on an unresolvable server." 293 ) 294 except httplib2.ServerNotFoundError: 295 pass 296 297 # Now test with exceptions turned off 298 self.http.force_exception_to_status_code = True 299 300 (response, content) = self.http.request("http://fred.bitworking.org/") 301 self.assertEqual(response["content-type"], "text/plain") 302 self.assertTrue(content.startswith(b"Unable to find")) 303 self.assertEqual(response.status, 400) 304 305 def testGetConnectionRefused(self): 306 self.http.force_exception_to_status_code = False 307 try: 308 self.http.request("http://localhost:7777/") 309 self.fail("An socket.error exception must be thrown on Connection Refused.") 310 except socket.error: 311 pass 312 313 # Now test with exceptions turned off 314 self.http.force_exception_to_status_code = True 315 316 (response, content) = self.http.request("http://localhost:7777/") 317 self.assertEqual(response["content-type"], "text/plain") 318 self.assertTrue(b"Connection refused" in content) 319 self.assertEqual(response.status, 400) 320 321 def testGetIRI(self): 322 if sys.version_info >= (2, 3): 323 uri = urllib.parse.urljoin( 324 base, "reflector/reflector.cgi?d=\N{CYRILLIC CAPITAL LETTER DJE}" 325 ) 326 (response, content) = self.http.request(uri, "GET") 327 d = self.reflector(content) 328 self.assertTrue("QUERY_STRING" in d) 329 self.assertTrue(d["QUERY_STRING"].find("%D0%82") > 0) 330 331 def testGetIsDefaultMethod(self): 332 # Test that GET is the default method 333 uri = urllib.parse.urljoin(base, "methods/method_reflector.cgi") 334 (response, content) = self.http.request(uri) 335 self.assertEqual(response["x-method"], "GET") 336 337 def testDifferentMethods(self): 338 # Test that all methods can be used 339 uri = urllib.parse.urljoin(base, "methods/method_reflector.cgi") 340 for method in ["GET", "PUT", "DELETE", "POST"]: 341 (response, content) = self.http.request(uri, method, body=b" ") 342 self.assertEqual(response["x-method"], method) 343 344 def testHeadRead(self): 345 # Test that we don't try to read the response of a HEAD request 346 # since httplib blocks response.read() for HEAD requests. 347 # Oddly enough this doesn't appear as a problem when doing HEAD requests 348 # against Apache servers. 349 uri = "http://www.google.com/" 350 (response, content) = self.http.request(uri, "HEAD") 351 self.assertEqual(response.status, 200) 352 self.assertEqual(content, b"") 353 354 def testGetNoCache(self): 355 # Test that can do a GET w/o the cache turned on. 356 http = httplib2.Http() 357 uri = urllib.parse.urljoin(base, "304/test_etag.txt") 358 (response, content) = http.request(uri, "GET") 359 self.assertEqual(response.status, 200) 360 self.assertEqual(response.previous, None) 361 362 def testGetOnlyIfCachedCacheHit(self): 363 # Test that can do a GET with cache and 'only-if-cached' 364 uri = urllib.parse.urljoin(base, "304/test_etag.txt") 365 (response, content) = self.http.request(uri, "GET") 366 (response, content) = self.http.request( 367 uri, "GET", headers={"cache-control": "only-if-cached"} 368 ) 369 self.assertEqual(response.fromcache, True) 370 self.assertEqual(response.status, 200) 371 372 def testGetOnlyIfCachedCacheMiss(self): 373 # Test that can do a GET with no cache with 'only-if-cached' 374 uri = urllib.parse.urljoin(base, "304/test_etag.txt") 375 (response, content) = self.http.request( 376 uri, "GET", headers={"cache-control": "only-if-cached"} 377 ) 378 self.assertEqual(response.fromcache, False) 379 self.assertEqual(response.status, 504) 380 381 def testGetOnlyIfCachedNoCacheAtAll(self): 382 # Test that can do a GET with no cache with 'only-if-cached' 383 # Of course, there might be an intermediary beyond us 384 # that responds to the 'only-if-cached', so this 385 # test can't really be guaranteed to pass. 386 http = httplib2.Http() 387 uri = urllib.parse.urljoin(base, "304/test_etag.txt") 388 (response, content) = http.request( 389 uri, "GET", headers={"cache-control": "only-if-cached"} 390 ) 391 self.assertEqual(response.fromcache, False) 392 self.assertEqual(response.status, 504) 393 394 def testUserAgent(self): 395 # Test that we provide a default user-agent 396 uri = urllib.parse.urljoin(base, "user-agent/test.cgi") 397 (response, content) = self.http.request(uri, "GET") 398 self.assertEqual(response.status, 200) 399 self.assertTrue(content.startswith(b"Python-httplib2/")) 400 401 def testUserAgentNonDefault(self): 402 # Test that the default user-agent can be over-ridden 403 404 uri = urllib.parse.urljoin(base, "user-agent/test.cgi") 405 (response, content) = self.http.request( 406 uri, "GET", headers={"User-Agent": "fred/1.0"} 407 ) 408 self.assertEqual(response.status, 200) 409 self.assertTrue(content.startswith(b"fred/1.0")) 410 411 def testGet300WithLocation(self): 412 # Test the we automatically follow 300 redirects if a Location: header is provided 413 uri = urllib.parse.urljoin(base, "300/with-location-header.asis") 414 (response, content) = self.http.request(uri, "GET") 415 self.assertEqual(response.status, 200) 416 self.assertEqual(content, b"This is the final destination.\n") 417 self.assertEqual(response.previous.status, 300) 418 self.assertEqual(response.previous.fromcache, False) 419 420 # Confirm that the intermediate 300 is not cached 421 (response, content) = self.http.request(uri, "GET") 422 self.assertEqual(response.status, 200) 423 self.assertEqual(content, b"This is the final destination.\n") 424 self.assertEqual(response.previous.status, 300) 425 self.assertEqual(response.previous.fromcache, False) 426 427 def testGet300WithLocationNoRedirect(self): 428 # Test the we automatically follow 300 redirects if a Location: header is provided 429 self.http.follow_redirects = False 430 uri = urllib.parse.urljoin(base, "300/with-location-header.asis") 431 (response, content) = self.http.request(uri, "GET") 432 self.assertEqual(response.status, 300) 433 434 def testGet300WithoutLocation(self): 435 # Not giving a Location: header in a 300 response is acceptable 436 # In which case we just return the 300 response 437 uri = urllib.parse.urljoin(base, "300/without-location-header.asis") 438 (response, content) = self.http.request(uri, "GET") 439 self.assertEqual(response.status, 300) 440 self.assertTrue(response["content-type"].startswith("text/html")) 441 self.assertEqual(response.previous, None) 442 443 def testGet301(self): 444 # Test that we automatically follow 301 redirects 445 # and that we cache the 301 response 446 uri = urllib.parse.urljoin(base, "301/onestep.asis") 447 destination = urllib.parse.urljoin(base, "302/final-destination.txt") 448 (response, content) = self.http.request(uri, "GET") 449 self.assertEqual(response.status, 200) 450 self.assertTrue("content-location" in response) 451 self.assertEqual(response["content-location"], destination) 452 self.assertEqual(content, b"This is the final destination.\n") 453 self.assertEqual(response.previous.status, 301) 454 self.assertEqual(response.previous.fromcache, False) 455 456 (response, content) = self.http.request(uri, "GET") 457 self.assertEqual(response.status, 200) 458 self.assertEqual(response["content-location"], destination) 459 self.assertEqual(content, b"This is the final destination.\n") 460 self.assertEqual(response.previous.status, 301) 461 self.assertEqual(response.previous.fromcache, True) 462 463 def testHead301(self): 464 # Test that we automatically follow 301 redirects 465 uri = urllib.parse.urljoin(base, "301/onestep.asis") 466 (response, content) = self.http.request(uri, "HEAD") 467 self.assertEqual(response.status, 200) 468 self.assertEqual(response.previous.status, 301) 469 self.assertEqual(response.previous.fromcache, False) 470 471 def testGet301NoRedirect(self): 472 # Test that we automatically follow 301 redirects 473 # and that we cache the 301 response 474 self.http.follow_redirects = False 475 uri = urllib.parse.urljoin(base, "301/onestep.asis") 476 destination = urllib.parse.urljoin(base, "302/final-destination.txt") 477 (response, content) = self.http.request(uri, "GET") 478 self.assertEqual(response.status, 301) 479 480 def testGet302(self): 481 # Test that we automatically follow 302 redirects 482 # and that we DO NOT cache the 302 response 483 uri = urllib.parse.urljoin(base, "302/onestep.asis") 484 destination = urllib.parse.urljoin(base, "302/final-destination.txt") 485 (response, content) = self.http.request(uri, "GET") 486 self.assertEqual(response.status, 200) 487 self.assertEqual(response["content-location"], destination) 488 self.assertEqual(content, b"This is the final destination.\n") 489 self.assertEqual(response.previous.status, 302) 490 self.assertEqual(response.previous.fromcache, False) 491 492 uri = urllib.parse.urljoin(base, "302/onestep.asis") 493 (response, content) = self.http.request(uri, "GET") 494 self.assertEqual(response.status, 200) 495 self.assertEqual(response.fromcache, True) 496 self.assertEqual(response["content-location"], destination) 497 self.assertEqual(content, b"This is the final destination.\n") 498 self.assertEqual(response.previous.status, 302) 499 self.assertEqual(response.previous.fromcache, False) 500 self.assertEqual(response.previous["content-location"], uri) 501 502 uri = urllib.parse.urljoin(base, "302/twostep.asis") 503 504 (response, content) = self.http.request(uri, "GET") 505 self.assertEqual(response.status, 200) 506 self.assertEqual(response.fromcache, True) 507 self.assertEqual(content, b"This is the final destination.\n") 508 self.assertEqual(response.previous.status, 302) 509 self.assertEqual(response.previous.fromcache, False) 510 511 def testGet302RedirectionLimit(self): 512 # Test that we can set a lower redirection limit 513 # and that we raise an exception when we exceed 514 # that limit. 515 self.http.force_exception_to_status_code = False 516 517 uri = urllib.parse.urljoin(base, "302/twostep.asis") 518 try: 519 (response, content) = self.http.request(uri, "GET", redirections=1) 520 self.fail("This should not happen") 521 except httplib2.RedirectLimit: 522 pass 523 except Exception as e: 524 self.fail("Threw wrong kind of exception ") 525 526 # Re-run the test with out the exceptions 527 self.http.force_exception_to_status_code = True 528 529 (response, content) = self.http.request(uri, "GET", redirections=1) 530 self.assertEqual(response.status, 500) 531 self.assertTrue(response.reason.startswith("Redirected more")) 532 self.assertEqual("302", response["status"]) 533 self.assertTrue(content.startswith(b"<html>")) 534 self.assertTrue(response.previous != None) 535 536 def testGet302NoLocation(self): 537 # Test that we throw an exception when we get 538 # a 302 with no Location: header. 539 self.http.force_exception_to_status_code = False 540 uri = urllib.parse.urljoin(base, "302/no-location.asis") 541 try: 542 (response, content) = self.http.request(uri, "GET") 543 self.fail("Should never reach here") 544 except httplib2.RedirectMissingLocation: 545 pass 546 except Exception as e: 547 self.fail("Threw wrong kind of exception ") 548 549 # Re-run the test with out the exceptions 550 self.http.force_exception_to_status_code = True 551 552 (response, content) = self.http.request(uri, "GET") 553 self.assertEqual(response.status, 500) 554 self.assertTrue(response.reason.startswith("Redirected but")) 555 self.assertEqual("302", response["status"]) 556 self.assertTrue(content.startswith(b"This is content")) 557 558 def testGet301ViaHttps(self): 559 # Google always redirects to http://google.com 560 (response, content) = self.http.request("https://code.google.com/apis/", "GET") 561 self.assertEqual(200, response.status) 562 self.assertEqual(301, response.previous.status) 563 564 def testGetViaHttps(self): 565 # Test that we can handle HTTPS 566 (response, content) = self.http.request("https://google.com/adsense/", "GET") 567 self.assertEqual(200, response.status) 568 569 def testGetViaHttpsSpecViolationOnLocation(self): 570 # Test that we follow redirects through HTTPS 571 # even if they violate the spec by including 572 # a relative Location: header instead of an 573 # absolute one. 574 (response, content) = self.http.request("https://google.com/adsense", "GET") 575 self.assertEqual(200, response.status) 576 self.assertNotEqual(None, response.previous) 577 578 def testGetViaHttpsKeyCert(self): 579 # At this point I can only test 580 # that the key and cert files are passed in 581 # correctly to httplib. It would be nice to have 582 # a real https endpoint to test against. 583 http = httplib2.Http(timeout=2) 584 585 http.add_certificate("akeyfile", "acertfile", "bitworking.org") 586 try: 587 (response, content) = http.request("https://bitworking.org", "GET") 588 except AttributeError: 589 self.assertEqual( 590 http.connections["https:bitworking.org"].key_file, "akeyfile" 591 ) 592 self.assertEqual( 593 http.connections["https:bitworking.org"].cert_file, "acertfile" 594 ) 595 except IOError: 596 # Skip on 3.2 597 pass 598 599 try: 600 (response, content) = http.request("https://notthere.bitworking.org", "GET") 601 except httplib2.ServerNotFoundError: 602 self.assertEqual( 603 http.connections["https:notthere.bitworking.org"].key_file, None 604 ) 605 self.assertEqual( 606 http.connections["https:notthere.bitworking.org"].cert_file, None 607 ) 608 except IOError: 609 # Skip on 3.2 610 pass 611 612 def testSslCertValidation(self): 613 # Test that we get an ssl.SSLError when specifying a non-existent CA 614 # certs file. 615 http = httplib2.Http(ca_certs="/nosuchfile") 616 self.assertRaises(IOError, http.request, "https://www.google.com/", "GET") 617 618 # Test that we get a SSLHandshakeError if we try to access 619 # https://www.google.com, using a CA cert file that doesn't contain 620 # the CA Google uses (i.e., simulating a cert that's not signed by a 621 # trusted CA). 622 other_ca_certs = os.path.join( 623 os.path.dirname(os.path.abspath(httplib2.__file__)), 624 "test", 625 "other_cacerts.txt", 626 ) 627 http = httplib2.Http(ca_certs=other_ca_certs) 628 self.assertRaises(ssl.SSLError, http.request, "https://www.google.com/", "GET") 629 630 def testSniHostnameValidation(self): 631 self.http.request("https://google.com/", method="GET") 632 633 def testGet303(self): 634 # Do a follow-up GET on a Location: header 635 # returned from a POST that gave a 303. 636 uri = urllib.parse.urljoin(base, "303/303.cgi") 637 (response, content) = self.http.request(uri, "POST", " ") 638 self.assertEqual(response.status, 200) 639 self.assertEqual(content, b"This is the final destination.\n") 640 self.assertEqual(response.previous.status, 303) 641 642 def testGet303NoRedirect(self): 643 # Do a follow-up GET on a Location: header 644 # returned from a POST that gave a 303. 645 self.http.follow_redirects = False 646 uri = urllib.parse.urljoin(base, "303/303.cgi") 647 (response, content) = self.http.request(uri, "POST", " ") 648 self.assertEqual(response.status, 303) 649 650 def test303ForDifferentMethods(self): 651 # Test that all methods can be used 652 uri = urllib.parse.urljoin(base, "303/redirect-to-reflector.cgi") 653 for (method, method_on_303) in [ 654 ("PUT", "GET"), 655 ("DELETE", "GET"), 656 ("POST", "GET"), 657 ("GET", "GET"), 658 ("HEAD", "GET"), 659 ]: 660 (response, content) = self.http.request(uri, method, body=b" ") 661 self.assertEqual(response["x-method"], method_on_303) 662 663 def testGet304(self): 664 # Test that we use ETags properly to validate our cache 665 uri = urllib.parse.urljoin(base, "304/test_etag.txt") 666 (response, content) = self.http.request( 667 uri, "GET", headers={"accept-encoding": "identity"} 668 ) 669 self.assertNotEqual(response["etag"], "") 670 671 (response, content) = self.http.request( 672 uri, "GET", headers={"accept-encoding": "identity"} 673 ) 674 (response, content) = self.http.request( 675 uri, 676 "GET", 677 headers={"accept-encoding": "identity", "cache-control": "must-revalidate"}, 678 ) 679 self.assertEqual(response.status, 200) 680 self.assertEqual(response.fromcache, True) 681 682 cache_file_name = os.path.join( 683 cacheDirName, httplib2.safename(httplib2.urlnorm(uri)[-1]) 684 ) 685 f = open(cache_file_name, "r") 686 status_line = f.readline() 687 f.close() 688 689 self.assertTrue(status_line.startswith("status:")) 690 691 (response, content) = self.http.request( 692 uri, "HEAD", headers={"accept-encoding": "identity"} 693 ) 694 self.assertEqual(response.status, 200) 695 self.assertEqual(response.fromcache, True) 696 697 (response, content) = self.http.request( 698 uri, "GET", headers={"accept-encoding": "identity", "range": "bytes=0-0"} 699 ) 700 self.assertEqual(response.status, 206) 701 self.assertEqual(response.fromcache, False) 702 703 def testGetIgnoreEtag(self): 704 # Test that we can forcibly ignore ETags 705 uri = urllib.parse.urljoin(base, "reflector/reflector.cgi") 706 (response, content) = self.http.request( 707 uri, "GET", headers={"accept-encoding": "identity"} 708 ) 709 self.assertNotEqual(response["etag"], "") 710 711 (response, content) = self.http.request( 712 uri, 713 "GET", 714 headers={"accept-encoding": "identity", "cache-control": "max-age=0"}, 715 ) 716 d = self.reflector(content) 717 self.assertTrue("HTTP_IF_NONE_MATCH" in d) 718 719 self.http.ignore_etag = True 720 (response, content) = self.http.request( 721 uri, 722 "GET", 723 headers={"accept-encoding": "identity", "cache-control": "max-age=0"}, 724 ) 725 d = self.reflector(content) 726 self.assertEqual(response.fromcache, False) 727 self.assertFalse("HTTP_IF_NONE_MATCH" in d) 728 729 def testOverrideEtag(self): 730 # Test that we can forcibly ignore ETags 731 uri = urllib.parse.urljoin(base, "reflector/reflector.cgi") 732 (response, content) = self.http.request( 733 uri, "GET", headers={"accept-encoding": "identity"} 734 ) 735 self.assertNotEqual(response["etag"], "") 736 737 (response, content) = self.http.request( 738 uri, 739 "GET", 740 headers={"accept-encoding": "identity", "cache-control": "max-age=0"}, 741 ) 742 d = self.reflector(content) 743 self.assertTrue("HTTP_IF_NONE_MATCH" in d) 744 self.assertNotEqual(d["HTTP_IF_NONE_MATCH"], "fred") 745 746 (response, content) = self.http.request( 747 uri, 748 "GET", 749 headers={ 750 "accept-encoding": "identity", 751 "cache-control": "max-age=0", 752 "if-none-match": "fred", 753 }, 754 ) 755 d = self.reflector(content) 756 self.assertTrue("HTTP_IF_NONE_MATCH" in d) 757 self.assertEqual(d["HTTP_IF_NONE_MATCH"], "fred") 758 759 # MAP-commented this out because it consistently fails 760 # def testGet304EndToEnd(self): 761 # # Test that end to end headers get overwritten in the cache 762 # uri = urllib.parse.urljoin(base, "304/end2end.cgi") 763 # (response, content) = self.http.request(uri, "GET") 764 # self.assertNotEqual(response['etag'], "") 765 # old_date = response['date'] 766 # time.sleep(2) 767 # 768 # (response, content) = self.http.request(uri, "GET", headers = {'Cache-Control': 'max-age=0'}) 769 # # The response should be from the cache, but the Date: header should be updated. 770 # new_date = response['date'] 771 # self.assertNotEqual(new_date, old_date) 772 # self.assertEqual(response.status, 200) 773 # self.assertEqual(response.fromcache, True) 774 775 def testGet304LastModified(self): 776 # Test that we can still handle a 304 777 # by only using the last-modified cache validator. 778 uri = urllib.parse.urljoin( 779 base, "304/last-modified-only/last-modified-only.txt" 780 ) 781 (response, content) = self.http.request(uri, "GET") 782 783 self.assertNotEqual(response["last-modified"], "") 784 (response, content) = self.http.request(uri, "GET") 785 (response, content) = self.http.request(uri, "GET") 786 self.assertEqual(response.status, 200) 787 self.assertEqual(response.fromcache, True) 788 789 def testGet307(self): 790 # Test that we do follow 307 redirects but 791 # do not cache the 307 792 uri = urllib.parse.urljoin(base, "307/onestep.asis") 793 (response, content) = self.http.request(uri, "GET") 794 self.assertEqual(response.status, 200) 795 self.assertEqual(content, b"This is the final destination.\n") 796 self.assertEqual(response.previous.status, 307) 797 self.assertEqual(response.previous.fromcache, False) 798 799 (response, content) = self.http.request(uri, "GET") 800 self.assertEqual(response.status, 200) 801 self.assertEqual(response.fromcache, True) 802 self.assertEqual(content, b"This is the final destination.\n") 803 self.assertEqual(response.previous.status, 307) 804 self.assertEqual(response.previous.fromcache, False) 805 806 def testGet410(self): 807 # Test that we pass 410's through 808 uri = urllib.parse.urljoin(base, "410/410.asis") 809 (response, content) = self.http.request(uri, "GET") 810 self.assertEqual(response.status, 410) 811 812 def testVaryHeaderSimple(self): 813 """RFC 2616 13.6 When the cache receives a subsequent request whose Request-URI specifies one or more cache entries including a Vary header field, the cache MUST NOT use such a cache entry to construct a response to the new request unless all of the selecting request-headers present in the new request match the corresponding stored request-headers in the original request. 814 815 """ 816 # test that the vary header is sent 817 uri = urllib.parse.urljoin(base, "vary/accept.asis") 818 (response, content) = self.http.request( 819 uri, "GET", headers={"Accept": "text/plain"} 820 ) 821 self.assertEqual(response.status, 200) 822 self.assertTrue("vary" in response) 823 824 # get the resource again, from the cache since accept header in this 825 # request is the same as the request 826 (response, content) = self.http.request( 827 uri, "GET", headers={"Accept": "text/plain"} 828 ) 829 self.assertEqual(response.status, 200) 830 self.assertEqual(response.fromcache, True, msg="Should be from cache") 831 832 # get the resource again, not from cache since Accept headers does not match 833 (response, content) = self.http.request( 834 uri, "GET", headers={"Accept": "text/html"} 835 ) 836 self.assertEqual(response.status, 200) 837 self.assertEqual(response.fromcache, False, msg="Should not be from cache") 838 839 # get the resource again, without any Accept header, so again no match 840 (response, content) = self.http.request(uri, "GET") 841 self.assertEqual(response.status, 200) 842 self.assertEqual(response.fromcache, False, msg="Should not be from cache") 843 844 def testNoVary(self): 845 pass 846 # when there is no vary, a different Accept header (e.g.) should not 847 # impact if the cache is used 848 # test that the vary header is not sent 849 # uri = urllib.parse.urljoin(base, "vary/no-vary.asis") 850 # (response, content) = self.http.request(uri, "GET", headers={'Accept': 'text/plain'}) 851 # self.assertEqual(response.status, 200) 852 # self.assertFalse('vary' in response) 853 # 854 # (response, content) = self.http.request(uri, "GET", headers={'Accept': 'text/plain'}) 855 # self.assertEqual(response.status, 200) 856 # self.assertEqual(response.fromcache, True, msg="Should be from cache") 857 # 858 # (response, content) = self.http.request(uri, "GET", headers={'Accept': 'text/html'}) 859 # self.assertEqual(response.status, 200) 860 # self.assertEqual(response.fromcache, True, msg="Should be from cache") 861 862 def testVaryHeaderDouble(self): 863 uri = urllib.parse.urljoin(base, "vary/accept-double.asis") 864 (response, content) = self.http.request( 865 uri, 866 "GET", 867 headers={ 868 "Accept": "text/plain", 869 "Accept-Language": "da, en-gb;q=0.8, en;q=0.7", 870 }, 871 ) 872 self.assertEqual(response.status, 200) 873 self.assertTrue("vary" in response) 874 875 # we are from cache 876 (response, content) = self.http.request( 877 uri, 878 "GET", 879 headers={ 880 "Accept": "text/plain", 881 "Accept-Language": "da, en-gb;q=0.8, en;q=0.7", 882 }, 883 ) 884 self.assertEqual(response.fromcache, True, msg="Should be from cache") 885 886 (response, content) = self.http.request( 887 uri, "GET", headers={"Accept": "text/plain"} 888 ) 889 self.assertEqual(response.status, 200) 890 self.assertEqual(response.fromcache, False) 891 892 # get the resource again, not from cache, varied headers don't match exact 893 (response, content) = self.http.request( 894 uri, "GET", headers={"Accept-Language": "da"} 895 ) 896 self.assertEqual(response.status, 200) 897 self.assertEqual(response.fromcache, False, msg="Should not be from cache") 898 899 def testVaryUnusedHeader(self): 900 # A header's value is not considered to vary if it's not used at all. 901 uri = urllib.parse.urljoin(base, "vary/unused-header.asis") 902 (response, content) = self.http.request( 903 uri, "GET", headers={"Accept": "text/plain"} 904 ) 905 self.assertEqual(response.status, 200) 906 self.assertTrue("vary" in response) 907 908 # we are from cache 909 (response, content) = self.http.request( 910 uri, "GET", headers={"Accept": "text/plain"} 911 ) 912 self.assertEqual(response.fromcache, True, msg="Should be from cache") 913 914 def testHeadGZip(self): 915 # Test that we don't try to decompress a HEAD response 916 uri = urllib.parse.urljoin(base, "gzip/final-destination.txt") 917 (response, content) = self.http.request(uri, "HEAD") 918 self.assertEqual(response.status, 200) 919 self.assertNotEqual(int(response["content-length"]), 0) 920 self.assertEqual(content, b"") 921 922 def testGetGZip(self): 923 # Test that we support gzip compression 924 uri = urllib.parse.urljoin(base, "gzip/final-destination.txt") 925 (response, content) = self.http.request(uri, "GET") 926 self.assertEqual(response.status, 200) 927 self.assertFalse("content-encoding" in response) 928 self.assertTrue("-content-encoding" in response) 929 self.assertEqual( 930 int(response["content-length"]), len(b"This is the final destination.\n") 931 ) 932 self.assertEqual(content, b"This is the final destination.\n") 933 934 def testPostAndGZipResponse(self): 935 uri = urllib.parse.urljoin(base, "gzip/post.cgi") 936 (response, content) = self.http.request(uri, "POST", body=" ") 937 self.assertEqual(response.status, 200) 938 self.assertFalse("content-encoding" in response) 939 self.assertTrue("-content-encoding" in response) 940 941 def testGetGZipFailure(self): 942 # Test that we raise a good exception when the gzip fails 943 self.http.force_exception_to_status_code = False 944 uri = urllib.parse.urljoin(base, "gzip/failed-compression.asis") 945 try: 946 (response, content) = self.http.request(uri, "GET") 947 self.fail("Should never reach here") 948 except httplib2.FailedToDecompressContent: 949 pass 950 except Exception: 951 self.fail("Threw wrong kind of exception") 952 953 # Re-run the test with out the exceptions 954 self.http.force_exception_to_status_code = True 955 956 (response, content) = self.http.request(uri, "GET") 957 self.assertEqual(response.status, 500) 958 self.assertTrue(response.reason.startswith("Content purported")) 959 960 def testIndividualTimeout(self): 961 uri = urllib.parse.urljoin(base, "timeout/timeout.cgi") 962 http = httplib2.Http(timeout=1) 963 http.force_exception_to_status_code = True 964 965 (response, content) = http.request(uri) 966 self.assertEqual(response.status, 408) 967 self.assertTrue(response.reason.startswith("Request Timeout")) 968 self.assertTrue(content.startswith(b"Request Timeout")) 969 970 def testGetDeflate(self): 971 # Test that we support deflate compression 972 uri = urllib.parse.urljoin(base, "deflate/deflated.asis") 973 (response, content) = self.http.request(uri, "GET") 974 self.assertEqual(response.status, 200) 975 self.assertFalse("content-encoding" in response) 976 self.assertEqual( 977 int(response["content-length"]), len("This is the final destination.") 978 ) 979 self.assertEqual(content, b"This is the final destination.") 980 981 def testGetDeflateFailure(self): 982 # Test that we raise a good exception when the deflate fails 983 self.http.force_exception_to_status_code = False 984 985 uri = urllib.parse.urljoin(base, "deflate/failed-compression.asis") 986 try: 987 (response, content) = self.http.request(uri, "GET") 988 self.fail("Should never reach here") 989 except httplib2.FailedToDecompressContent: 990 pass 991 except Exception: 992 self.fail("Threw wrong kind of exception") 993 994 # Re-run the test with out the exceptions 995 self.http.force_exception_to_status_code = True 996 997 (response, content) = self.http.request(uri, "GET") 998 self.assertEqual(response.status, 500) 999 self.assertTrue(response.reason.startswith("Content purported")) 1000 1001 def testGetDuplicateHeaders(self): 1002 # Test that duplicate headers get concatenated via ',' 1003 uri = urllib.parse.urljoin(base, "duplicate-headers/multilink.asis") 1004 (response, content) = self.http.request(uri, "GET") 1005 self.assertEqual(response.status, 200) 1006 self.assertEqual(content, b"This is content\n") 1007 self.assertEqual( 1008 response["link"].split(",")[0], 1009 '<http://bitworking.org>; rel="home"; title="BitWorking"', 1010 ) 1011 1012 def testGetCacheControlNoCache(self): 1013 # Test Cache-Control: no-cache on requests 1014 uri = urllib.parse.urljoin(base, "304/test_etag.txt") 1015 (response, content) = self.http.request( 1016 uri, "GET", headers={"accept-encoding": "identity"} 1017 ) 1018 self.assertNotEqual(response["etag"], "") 1019 (response, content) = self.http.request( 1020 uri, "GET", headers={"accept-encoding": "identity"} 1021 ) 1022 self.assertEqual(response.status, 200) 1023 self.assertEqual(response.fromcache, True) 1024 1025 (response, content) = self.http.request( 1026 uri, 1027 "GET", 1028 headers={"accept-encoding": "identity", "Cache-Control": "no-cache"}, 1029 ) 1030 self.assertEqual(response.status, 200) 1031 self.assertEqual(response.fromcache, False) 1032 1033 def testGetCacheControlPragmaNoCache(self): 1034 # Test Pragma: no-cache on requests 1035 uri = urllib.parse.urljoin(base, "304/test_etag.txt") 1036 (response, content) = self.http.request( 1037 uri, "GET", headers={"accept-encoding": "identity"} 1038 ) 1039 self.assertNotEqual(response["etag"], "") 1040 (response, content) = self.http.request( 1041 uri, "GET", headers={"accept-encoding": "identity"} 1042 ) 1043 self.assertEqual(response.status, 200) 1044 self.assertEqual(response.fromcache, True) 1045 1046 (response, content) = self.http.request( 1047 uri, "GET", headers={"accept-encoding": "identity", "Pragma": "no-cache"} 1048 ) 1049 self.assertEqual(response.status, 200) 1050 self.assertEqual(response.fromcache, False) 1051 1052 def testGetCacheControlNoStoreRequest(self): 1053 # A no-store request means that the response should not be stored. 1054 uri = urllib.parse.urljoin(base, "304/test_etag.txt") 1055 1056 (response, content) = self.http.request( 1057 uri, "GET", headers={"Cache-Control": "no-store"} 1058 ) 1059 self.assertEqual(response.status, 200) 1060 self.assertEqual(response.fromcache, False) 1061 1062 (response, content) = self.http.request( 1063 uri, "GET", headers={"Cache-Control": "no-store"} 1064 ) 1065 self.assertEqual(response.status, 200) 1066 self.assertEqual(response.fromcache, False) 1067 1068 def testGetCacheControlNoStoreResponse(self): 1069 # A no-store response means that the response should not be stored. 1070 uri = urllib.parse.urljoin(base, "no-store/no-store.asis") 1071 1072 (response, content) = self.http.request(uri, "GET") 1073 self.assertEqual(response.status, 200) 1074 self.assertEqual(response.fromcache, False) 1075 1076 (response, content) = self.http.request(uri, "GET") 1077 self.assertEqual(response.status, 200) 1078 self.assertEqual(response.fromcache, False) 1079 1080 def testGetCacheControlNoCacheNoStoreRequest(self): 1081 # Test that a no-store, no-cache clears the entry from the cache 1082 # even if it was cached previously. 1083 uri = urllib.parse.urljoin(base, "304/test_etag.txt") 1084 1085 (response, content) = self.http.request(uri, "GET") 1086 (response, content) = self.http.request(uri, "GET") 1087 self.assertEqual(response.fromcache, True) 1088 (response, content) = self.http.request( 1089 uri, "GET", headers={"Cache-Control": "no-store, no-cache"} 1090 ) 1091 (response, content) = self.http.request( 1092 uri, "GET", headers={"Cache-Control": "no-store, no-cache"} 1093 ) 1094 self.assertEqual(response.status, 200) 1095 self.assertEqual(response.fromcache, False) 1096 1097 def testUpdateInvalidatesCache(self): 1098 # Test that calling PUT or DELETE on a 1099 # URI that is cache invalidates that cache. 1100 uri = urllib.parse.urljoin(base, "304/test_etag.txt") 1101 1102 (response, content) = self.http.request(uri, "GET") 1103 (response, content) = self.http.request(uri, "GET") 1104 self.assertEqual(response.fromcache, True) 1105 (response, content) = self.http.request(uri, "DELETE") 1106 self.assertEqual(response.status, 405) 1107 1108 (response, content) = self.http.request(uri, "GET") 1109 self.assertEqual(response.fromcache, False) 1110 1111 def testUpdateUsesCachedETag(self): 1112 # Test that we natively support http://www.w3.org/1999/04/Editing/ 1113 uri = urllib.parse.urljoin(base, "conditional-updates/test.cgi") 1114 1115 (response, content) = self.http.request(uri, "GET") 1116 self.assertEqual(response.status, 200) 1117 self.assertEqual(response.fromcache, False) 1118 (response, content) = self.http.request(uri, "GET") 1119 self.assertEqual(response.status, 200) 1120 self.assertEqual(response.fromcache, True) 1121 (response, content) = self.http.request(uri, "PUT", body="foo") 1122 self.assertEqual(response.status, 200) 1123 (response, content) = self.http.request(uri, "PUT", body="foo") 1124 self.assertEqual(response.status, 412) 1125 1126 def testUpdatePatchUsesCachedETag(self): 1127 # Test that we natively support http://www.w3.org/1999/04/Editing/ 1128 uri = urllib.parse.urljoin(base, "conditional-updates/test.cgi") 1129 1130 (response, content) = self.http.request(uri, "GET") 1131 self.assertEqual(response.status, 200) 1132 self.assertEqual(response.fromcache, False) 1133 (response, content) = self.http.request(uri, "GET") 1134 self.assertEqual(response.status, 200) 1135 self.assertEqual(response.fromcache, True) 1136 (response, content) = self.http.request(uri, "PATCH", body="foo") 1137 self.assertEqual(response.status, 200) 1138 (response, content) = self.http.request(uri, "PATCH", body="foo") 1139 self.assertEqual(response.status, 412) 1140 1141 def testUpdateUsesCachedETagAndOCMethod(self): 1142 # Test that we natively support http://www.w3.org/1999/04/Editing/ 1143 uri = urllib.parse.urljoin(base, "conditional-updates/test.cgi") 1144 1145 (response, content) = self.http.request(uri, "GET") 1146 self.assertEqual(response.status, 200) 1147 self.assertEqual(response.fromcache, False) 1148 (response, content) = self.http.request(uri, "GET") 1149 self.assertEqual(response.status, 200) 1150 self.assertEqual(response.fromcache, True) 1151 self.http.optimistic_concurrency_methods.append("DELETE") 1152 (response, content) = self.http.request(uri, "DELETE") 1153 self.assertEqual(response.status, 200) 1154 1155 def testUpdateUsesCachedETagOverridden(self): 1156 # Test that we natively support http://www.w3.org/1999/04/Editing/ 1157 uri = urllib.parse.urljoin(base, "conditional-updates/test.cgi") 1158 1159 (response, content) = self.http.request(uri, "GET") 1160 self.assertEqual(response.status, 200) 1161 self.assertEqual(response.fromcache, False) 1162 (response, content) = self.http.request(uri, "GET") 1163 self.assertEqual(response.status, 200) 1164 self.assertEqual(response.fromcache, True) 1165 (response, content) = self.http.request( 1166 uri, "PUT", body="foo", headers={"if-match": "fred"} 1167 ) 1168 self.assertEqual(response.status, 412) 1169 1170 def testBasicAuth(self): 1171 # Test Basic Authentication 1172 uri = urllib.parse.urljoin(base, "basic/file.txt") 1173 (response, content) = self.http.request(uri, "GET") 1174 self.assertEqual(response.status, 401) 1175 1176 uri = urllib.parse.urljoin(base, "basic/") 1177 (response, content) = self.http.request(uri, "GET") 1178 self.assertEqual(response.status, 401) 1179 1180 self.http.add_credentials("joe", "password") 1181 (response, content) = self.http.request(uri, "GET") 1182 self.assertEqual(response.status, 200) 1183 1184 uri = urllib.parse.urljoin(base, "basic/file.txt") 1185 (response, content) = self.http.request(uri, "GET") 1186 self.assertEqual(response.status, 200) 1187 1188 def testBasicAuthWithDomain(self): 1189 # Test Basic Authentication 1190 uri = urllib.parse.urljoin(base, "basic/file.txt") 1191 (response, content) = self.http.request(uri, "GET") 1192 self.assertEqual(response.status, 401) 1193 1194 uri = urllib.parse.urljoin(base, "basic/") 1195 (response, content) = self.http.request(uri, "GET") 1196 self.assertEqual(response.status, 401) 1197 1198 self.http.add_credentials("joe", "password", "example.org") 1199 (response, content) = self.http.request(uri, "GET") 1200 self.assertEqual(response.status, 401) 1201 1202 uri = urllib.parse.urljoin(base, "basic/file.txt") 1203 (response, content) = self.http.request(uri, "GET") 1204 self.assertEqual(response.status, 401) 1205 1206 domain = urllib.parse.urlparse(base)[1] 1207 self.http.add_credentials("joe", "password", domain) 1208 (response, content) = self.http.request(uri, "GET") 1209 self.assertEqual(response.status, 200) 1210 1211 uri = urllib.parse.urljoin(base, "basic/file.txt") 1212 (response, content) = self.http.request(uri, "GET") 1213 self.assertEqual(response.status, 200) 1214 1215 def testBasicAuthTwoDifferentCredentials(self): 1216 # Test Basic Authentication with multiple sets of credentials 1217 uri = urllib.parse.urljoin(base, "basic2/file.txt") 1218 (response, content) = self.http.request(uri, "GET") 1219 self.assertEqual(response.status, 401) 1220 1221 uri = urllib.parse.urljoin(base, "basic2/") 1222 (response, content) = self.http.request(uri, "GET") 1223 self.assertEqual(response.status, 401) 1224 1225 self.http.add_credentials("fred", "barney") 1226 (response, content) = self.http.request(uri, "GET") 1227 self.assertEqual(response.status, 200) 1228 1229 uri = urllib.parse.urljoin(base, "basic2/file.txt") 1230 (response, content) = self.http.request(uri, "GET") 1231 self.assertEqual(response.status, 200) 1232 1233 def testBasicAuthNested(self): 1234 # Test Basic Authentication with resources 1235 # that are nested 1236 uri = urllib.parse.urljoin(base, "basic-nested/") 1237 (response, content) = self.http.request(uri, "GET") 1238 self.assertEqual(response.status, 401) 1239 1240 uri = urllib.parse.urljoin(base, "basic-nested/subdir") 1241 (response, content) = self.http.request(uri, "GET") 1242 self.assertEqual(response.status, 401) 1243 1244 # Now add in credentials one at a time and test. 1245 self.http.add_credentials("joe", "password") 1246 1247 uri = urllib.parse.urljoin(base, "basic-nested/") 1248 (response, content) = self.http.request(uri, "GET") 1249 self.assertEqual(response.status, 200) 1250 1251 uri = urllib.parse.urljoin(base, "basic-nested/subdir") 1252 (response, content) = self.http.request(uri, "GET") 1253 self.assertEqual(response.status, 401) 1254 1255 self.http.add_credentials("fred", "barney") 1256 1257 uri = urllib.parse.urljoin(base, "basic-nested/") 1258 (response, content) = self.http.request(uri, "GET") 1259 self.assertEqual(response.status, 200) 1260 1261 uri = urllib.parse.urljoin(base, "basic-nested/subdir") 1262 (response, content) = self.http.request(uri, "GET") 1263 self.assertEqual(response.status, 200) 1264 1265 def testDigestAuth(self): 1266 # Test that we support Digest Authentication 1267 uri = urllib.parse.urljoin(base, "digest/") 1268 (response, content) = self.http.request(uri, "GET") 1269 self.assertEqual(response.status, 401) 1270 1271 self.http.add_credentials("joe", "password") 1272 (response, content) = self.http.request(uri, "GET") 1273 self.assertEqual(response.status, 200) 1274 1275 uri = urllib.parse.urljoin(base, "digest/file.txt") 1276 (response, content) = self.http.request(uri, "GET") 1277 1278 def testDigestAuthNextNonceAndNC(self): 1279 # Test that if the server sets nextnonce that we reset 1280 # the nonce count back to 1 1281 uri = urllib.parse.urljoin(base, "digest/file.txt") 1282 self.http.add_credentials("joe", "password") 1283 (response, content) = self.http.request( 1284 uri, "GET", headers={"cache-control": "no-cache"} 1285 ) 1286 info = httplib2._parse_www_authenticate(response, "authentication-info") 1287 self.assertEqual(response.status, 200) 1288 (response, content) = self.http.request( 1289 uri, "GET", headers={"cache-control": "no-cache"} 1290 ) 1291 info2 = httplib2._parse_www_authenticate(response, "authentication-info") 1292 self.assertEqual(response.status, 200) 1293 1294 if "nextnonce" in info: 1295 self.assertEqual(info2["nc"], 1) 1296 1297 def testDigestAuthStale(self): 1298 # Test that we can handle a nonce becoming stale 1299 uri = urllib.parse.urljoin(base, "digest-expire/file.txt") 1300 self.http.add_credentials("joe", "password") 1301 (response, content) = self.http.request( 1302 uri, "GET", headers={"cache-control": "no-cache"} 1303 ) 1304 info = httplib2._parse_www_authenticate(response, "authentication-info") 1305 self.assertEqual(response.status, 200) 1306 1307 time.sleep(3) 1308 # Sleep long enough that the nonce becomes stale 1309 1310 (response, content) = self.http.request( 1311 uri, "GET", headers={"cache-control": "no-cache"} 1312 ) 1313 self.assertFalse(response.fromcache) 1314 self.assertTrue(response._stale_digest) 1315 info3 = httplib2._parse_www_authenticate(response, "authentication-info") 1316 self.assertEqual(response.status, 200) 1317 1318 def reflector(self, content): 1319 return dict( 1320 [ 1321 tuple(x.split("=", 1)) 1322 for x in content.decode("utf-8").strip().split("\n") 1323 ] 1324 ) 1325 1326 def testReflector(self): 1327 uri = urllib.parse.urljoin(base, "reflector/reflector.cgi") 1328 (response, content) = self.http.request(uri, "GET") 1329 d = self.reflector(content) 1330 self.assertTrue("HTTP_USER_AGENT" in d) 1331 1332 def testConnectionClose(self): 1333 uri = "http://www.google.com/" 1334 (response, content) = self.http.request(uri, "GET") 1335 for c in self.http.connections.values(): 1336 self.assertNotEqual(None, c.sock) 1337 (response, content) = self.http.request( 1338 uri, "GET", headers={"connection": "close"} 1339 ) 1340 for c in self.http.connections.values(): 1341 self.assertEqual(None, c.sock) 1342 1343 def testPickleHttp(self): 1344 pickled_http = pickle.dumps(self.http) 1345 new_http = pickle.loads(pickled_http) 1346 1347 self.assertEqual( 1348 sorted(new_http.__dict__.keys()), sorted(self.http.__dict__.keys()) 1349 ) 1350 for key in new_http.__dict__: 1351 if key in ("certificates", "credentials"): 1352 self.assertEqual( 1353 new_http.__dict__[key].credentials, 1354 self.http.__dict__[key].credentials, 1355 ) 1356 elif key == "cache": 1357 self.assertEqual( 1358 new_http.__dict__[key].cache, self.http.__dict__[key].cache 1359 ) 1360 else: 1361 self.assertEqual(new_http.__dict__[key], self.http.__dict__[key]) 1362 1363 def testPickleHttpWithConnection(self): 1364 self.http.request("http://bitworking.org", connection_type=_MyHTTPConnection) 1365 pickled_http = pickle.dumps(self.http) 1366 new_http = pickle.loads(pickled_http) 1367 1368 self.assertEqual(list(self.http.connections.keys()), ["http:bitworking.org"]) 1369 self.assertEqual(new_http.connections, {}) 1370 1371 def testPickleCustomRequestHttp(self): 1372 def dummy_request(*args, **kwargs): 1373 return new_request(*args, **kwargs) 1374 1375 dummy_request.dummy_attr = "dummy_value" 1376 1377 self.http.request = dummy_request 1378 pickled_http = pickle.dumps(self.http) 1379 self.assertFalse(b"S'request'" in pickled_http) 1380 1381 1382 try: 1383 import memcache 1384 1385 class HttpTestMemCached(HttpTest): 1386 def setUp(self): 1387 self.cache = memcache.Client(["127.0.0.1:11211"], debug=0) 1388 # self.cache = memcache.Client(['10.0.0.4:11211'], debug=1) 1389 self.http = httplib2.Http(self.cache) 1390 self.cache.flush_all() 1391 # Not exactly sure why the sleep is needed here, but 1392 # if not present then some unit tests that rely on caching 1393 # fail. Memcached seems to lose some sets immediately 1394 # after a flush_all if the set is to a value that 1395 # was previously cached. (Maybe the flush is handled async?) 1396 time.sleep(1) 1397 self.http.clear_credentials() 1398 1399 1400 except: 1401 pass 1402 1403 # ------------------------------------------------------------------------ 1404 1405 1406 class HttpPrivateTest(unittest.TestCase): 1407 def testParseCacheControl(self): 1408 # Test that we can parse the Cache-Control header 1409 self.assertEqual({}, httplib2._parse_cache_control({})) 1410 self.assertEqual( 1411 {"no-cache": 1}, 1412 httplib2._parse_cache_control({"cache-control": " no-cache"}), 1413 ) 1414 cc = httplib2._parse_cache_control( 1415 {"cache-control": " no-cache, max-age = 7200"} 1416 ) 1417 self.assertEqual(cc["no-cache"], 1) 1418 self.assertEqual(cc["max-age"], "7200") 1419 cc = httplib2._parse_cache_control({"cache-control": " , "}) 1420 self.assertEqual(cc[""], 1) 1421 1422 try: 1423 cc = httplib2._parse_cache_control( 1424 {"cache-control": "Max-age=3600;post-check=1800,pre-check=3600"} 1425 ) 1426 self.assertTrue("max-age" in cc) 1427 except: 1428 self.fail("Should not throw exception") 1429 1430 def testNormalizeHeaders(self): 1431 # Test that we normalize headers to lowercase 1432 h = httplib2._normalize_headers({"Cache-Control": "no-cache", "Other": "Stuff"}) 1433 self.assertTrue("cache-control" in h) 1434 self.assertTrue("other" in h) 1435 self.assertEqual("Stuff", h["other"]) 1436 1437 def testConvertByteStr(self): 1438 with self.assertRaises(TypeError): 1439 httplib2._convert_byte_str(4) 1440 self.assertEqual("Hello World", httplib2._convert_byte_str(b"Hello World")) 1441 self.assertEqual("Bye World", httplib2._convert_byte_str("Bye World")) 1442 1443 def testExpirationModelTransparent(self): 1444 # Test that no-cache makes our request TRANSPARENT 1445 response_headers = {"cache-control": "max-age=7200"} 1446 request_headers = {"cache-control": "no-cache"} 1447 self.assertEqual( 1448 "TRANSPARENT", 1449 httplib2._entry_disposition(response_headers, request_headers), 1450 ) 1451 1452 def testMaxAgeNonNumeric(self): 1453 # Test that no-cache makes our request TRANSPARENT 1454 response_headers = {"cache-control": "max-age=fred, min-fresh=barney"} 1455 request_headers = {} 1456 self.assertEqual( 1457 "STALE", httplib2._entry_disposition(response_headers, request_headers) 1458 ) 1459 1460 def testExpirationModelNoCacheResponse(self): 1461 # The date and expires point to an entry that should be 1462 # FRESH, but the no-cache over-rides that. 1463 now = time.time() 1464 response_headers = { 1465 "date": time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now)), 1466 "expires": time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now + 4)), 1467 "cache-control": "no-cache", 1468 } 1469 request_headers = {} 1470 self.assertEqual( 1471 "STALE", httplib2._entry_disposition(response_headers, request_headers) 1472 ) 1473 1474 def testExpirationModelStaleRequestMustReval(self): 1475 # must-revalidate forces STALE 1476 self.assertEqual( 1477 "STALE", 1478 httplib2._entry_disposition({}, {"cache-control": "must-revalidate"}), 1479 ) 1480 1481 def testExpirationModelStaleResponseMustReval(self): 1482 # must-revalidate forces STALE 1483 self.assertEqual( 1484 "STALE", 1485 httplib2._entry_disposition({"cache-control": "must-revalidate"}, {}), 1486 ) 1487 1488 def testExpirationModelFresh(self): 1489 response_headers = { 1490 "date": time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime()), 1491 "cache-control": "max-age=2", 1492 } 1493 request_headers = {} 1494 self.assertEqual( 1495 "FRESH", httplib2._entry_disposition(response_headers, request_headers) 1496 ) 1497 time.sleep(3) 1498 self.assertEqual( 1499 "STALE", httplib2._entry_disposition(response_headers, request_headers) 1500 ) 1501 1502 def testExpirationMaxAge0(self): 1503 response_headers = { 1504 "date": time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime()), 1505 "cache-control": "max-age=0", 1506 } 1507 request_headers = {} 1508 self.assertEqual( 1509 "STALE", httplib2._entry_disposition(response_headers, request_headers) 1510 ) 1511 1512 def testExpirationModelDateAndExpires(self): 1513 now = time.time() 1514 response_headers = { 1515 "date": time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now)), 1516 "expires": time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now + 2)), 1517 } 1518 request_headers = {} 1519 self.assertEqual( 1520 "FRESH", httplib2._entry_disposition(response_headers, request_headers) 1521 ) 1522 time.sleep(3) 1523 self.assertEqual( 1524 "STALE", httplib2._entry_disposition(response_headers, request_headers) 1525 ) 1526 1527 def testExpiresZero(self): 1528 now = time.time() 1529 response_headers = { 1530 "date": time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now)), 1531 "expires": "0", 1532 } 1533 request_headers = {} 1534 self.assertEqual( 1535 "STALE", httplib2._entry_disposition(response_headers, request_headers) 1536 ) 1537 1538 def testExpirationModelDateOnly(self): 1539 now = time.time() 1540 response_headers = { 1541 "date": time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now + 3)) 1542 } 1543 request_headers = {} 1544 self.assertEqual( 1545 "STALE", httplib2._entry_disposition(response_headers, request_headers) 1546 ) 1547 1548 def testExpirationModelOnlyIfCached(self): 1549 response_headers = {} 1550 request_headers = {"cache-control": "only-if-cached"} 1551 self.assertEqual( 1552 "FRESH", httplib2._entry_disposition(response_headers, request_headers) 1553 ) 1554 1555 def testExpirationModelMaxAgeBoth(self): 1556 now = time.time() 1557 response_headers = { 1558 "date": time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now)), 1559 "cache-control": "max-age=2", 1560 } 1561 request_headers = {"cache-control": "max-age=0"} 1562 self.assertEqual( 1563 "STALE", httplib2._entry_disposition(response_headers, request_headers) 1564 ) 1565 1566 def testExpirationModelDateAndExpiresMinFresh1(self): 1567 now = time.time() 1568 response_headers = { 1569 "date": time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now)), 1570 "expires": time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now + 2)), 1571 } 1572 request_headers = {"cache-control": "min-fresh=2"} 1573 self.assertEqual( 1574 "STALE", httplib2._entry_disposition(response_headers, request_headers) 1575 ) 1576 1577 def testExpirationModelDateAndExpiresMinFresh2(self): 1578 now = time.time() 1579 response_headers = { 1580 "date": time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now)), 1581 "expires": time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now + 4)), 1582 } 1583 request_headers = {"cache-control": "min-fresh=2"} 1584 self.assertEqual( 1585 "FRESH", httplib2._entry_disposition(response_headers, request_headers) 1586 ) 1587 1588 def testParseWWWAuthenticateEmpty(self): 1589 res = httplib2._parse_www_authenticate({}) 1590 self.assertEqual(len(list(res.keys())), 0) 1591 1592 def testParseWWWAuthenticate(self): 1593 # different uses of spaces around commas 1594 res = httplib2._parse_www_authenticate( 1595 { 1596 "www-authenticate": 'Test realm="test realm" , foo=foo ,bar="bar", baz=baz,qux=qux' 1597 } 1598 ) 1599 self.assertEqual(len(list(res.keys())), 1) 1600 self.assertEqual(len(list(res["test"].keys())), 5) 1601 1602 # tokens with non-alphanum 1603 res = httplib2._parse_www_authenticate( 1604 {"www-authenticate": 'T*!%#st realm=to*!%#en, to*!%#en="quoted string"'} 1605 ) 1606 self.assertEqual(len(list(res.keys())), 1) 1607 self.assertEqual(len(list(res["t*!%#st"].keys())), 2) 1608 1609 # quoted string with quoted pairs 1610 res = httplib2._parse_www_authenticate( 1611 {"www-authenticate": 'Test realm="a \\"test\\" realm"'} 1612 ) 1613 self.assertEqual(len(list(res.keys())), 1) 1614 self.assertEqual(res["test"]["realm"], 'a "test" realm') 1615 1616 def testParseWWWAuthenticateStrict(self): 1617 httplib2.USE_WWW_AUTH_STRICT_PARSING = 1 1618 self.testParseWWWAuthenticate() 1619 httplib2.USE_WWW_AUTH_STRICT_PARSING = 0 1620 1621 def testParseWWWAuthenticateBasic(self): 1622 res = httplib2._parse_www_authenticate({"www-authenticate": 'Basic realm="me"'}) 1623 basic = res["basic"] 1624 self.assertEqual("me", basic["realm"]) 1625 1626 res = httplib2._parse_www_authenticate( 1627 {"www-authenticate": 'Basic realm="me", algorithm="MD5"'} 1628 ) 1629 basic = res["basic"] 1630 self.assertEqual("me", basic["realm"]) 1631 self.assertEqual("MD5", basic["algorithm"]) 1632 1633 res = httplib2._parse_www_authenticate( 1634 {"www-authenticate": 'Basic realm="me", algorithm=MD5'} 1635 ) 1636 basic = res["basic"] 1637 self.assertEqual("me", basic["realm"]) 1638 self.assertEqual("MD5", basic["algorithm"]) 1639 1640 def testParseWWWAuthenticateBasic2(self): 1641 res = httplib2._parse_www_authenticate( 1642 {"www-authenticate": 'Basic realm="me",other="fred" '} 1643 ) 1644 basic = res["basic"] 1645 self.assertEqual("me", basic["realm"]) 1646 self.assertEqual("fred", basic["other"]) 1647 1648 def testParseWWWAuthenticateBasic3(self): 1649 res = httplib2._parse_www_authenticate( 1650 {"www-authenticate": 'Basic REAlm="me" '} 1651 ) 1652 basic = res["basic"] 1653 self.assertEqual("me", basic["realm"]) 1654 1655 def testParseWWWAuthenticateDigest(self): 1656 res = httplib2._parse_www_authenticate( 1657 { 1658 "www-authenticate": 'Digest realm="testrealm (at] host.com", qop="auth,auth-int", nonce="dcd98b7102dd2f0e8b11d0f600bfb0c093", opaque="5ccc069c403ebaf9f0171e9517f40e41"' 1659 } 1660 ) 1661 digest = res["digest"] 1662 self.assertEqual("testrealm (at] host.com", digest["realm"]) 1663 self.assertEqual("auth,auth-int", digest["qop"]) 1664 1665 def testParseWWWAuthenticateMultiple(self): 1666 res = httplib2._parse_www_authenticate( 1667 { 1668 "www-authenticate": 'Digest realm="testrealm (at] host.com", qop="auth,auth-int", nonce="dcd98b7102dd2f0e8b11d0f600bfb0c093", opaque="5ccc069c403ebaf9f0171e9517f40e41" Basic REAlm="me" ' 1669 } 1670 ) 1671 digest = res["digest"] 1672 self.assertEqual("testrealm (at] host.com", digest["realm"]) 1673 self.assertEqual("auth,auth-int", digest["qop"]) 1674 self.assertEqual("dcd98b7102dd2f0e8b11d0f600bfb0c093", digest["nonce"]) 1675 self.assertEqual("5ccc069c403ebaf9f0171e9517f40e41", digest["opaque"]) 1676 basic = res["basic"] 1677 self.assertEqual("me", basic["realm"]) 1678 1679 def testParseWWWAuthenticateMultiple2(self): 1680 # Handle an added comma between challenges, which might get thrown in if the challenges were 1681 # originally sent in separate www-authenticate headers. 1682 res = httplib2._parse_www_authenticate( 1683 { 1684 "www-authenticate": 'Digest realm="testrealm (at] host.com", qop="auth,auth-int", nonce="dcd98b7102dd2f0e8b11d0f600bfb0c093", opaque="5ccc069c403ebaf9f0171e9517f40e41", Basic REAlm="me" ' 1685 } 1686 ) 1687 digest = res["digest"] 1688 self.assertEqual("testrealm (at] host.com", digest["realm"]) 1689 self.assertEqual("auth,auth-int", digest["qop"]) 1690 self.assertEqual("dcd98b7102dd2f0e8b11d0f600bfb0c093", digest["nonce"]) 1691 self.assertEqual("5ccc069c403ebaf9f0171e9517f40e41", digest["opaque"]) 1692 basic = res["basic"] 1693 self.assertEqual("me", basic["realm"]) 1694 1695 def testParseWWWAuthenticateMultiple3(self): 1696 # Handle an added comma between challenges, which might get thrown in if the challenges were 1697 # originally sent in separate www-authenticate headers. 1698 res = httplib2._parse_www_authenticate( 1699 { 1700 "www-authenticate": 'Digest realm="testrealm (at] host.com", qop="auth,auth-int", nonce="dcd98b7102dd2f0e8b11d0f600bfb0c093", opaque="5ccc069c403ebaf9f0171e9517f40e41", Basic REAlm="me", WSSE realm="foo", profile="UsernameToken"' 1701 } 1702 ) 1703 digest = res["digest"] 1704 self.assertEqual("testrealm (at] host.com", digest["realm"]) 1705 self.assertEqual("auth,auth-int", digest["qop"]) 1706 self.assertEqual("dcd98b7102dd2f0e8b11d0f600bfb0c093", digest["nonce"]) 1707 self.assertEqual("5ccc069c403ebaf9f0171e9517f40e41", digest["opaque"]) 1708 basic = res["basic"] 1709 self.assertEqual("me", basic["realm"]) 1710 wsse = res["wsse"] 1711 self.assertEqual("foo", wsse["realm"]) 1712 self.assertEqual("UsernameToken", wsse["profile"]) 1713 1714 def testParseWWWAuthenticateMultiple4(self): 1715 res = httplib2._parse_www_authenticate( 1716 { 1717 "www-authenticate": 'Digest realm="test-real.m (at] host.com", qop \t=\t"\tauth,auth-int", nonce="(*)&^&$%#",opaque="5ccc069c403ebaf9f0171e9517f40e41", Basic REAlm="me", WSSE realm="foo", profile="UsernameToken"' 1718 } 1719 ) 1720 digest = res["digest"] 1721 self.assertEqual("test-real.m (at] host.com", digest["realm"]) 1722 self.assertEqual("\tauth,auth-int", digest["qop"]) 1723 self.assertEqual("(*)&^&$%#", digest["nonce"]) 1724 1725 def testParseWWWAuthenticateMoreQuoteCombos(self): 1726 res = httplib2._parse_www_authenticate( 1727 { 1728 "www-authenticate": 'Digest realm="myrealm", nonce="Ygk86AsKBAA=3516200d37f9a3230352fde99977bd6d472d4306", algorithm=MD5, qop="auth", stale=true' 1729 } 1730 ) 1731 digest = res["digest"] 1732 self.assertEqual("myrealm", digest["realm"]) 1733 1734 def testParseWWWAuthenticateMalformed(self): 1735 try: 1736 res = httplib2._parse_www_authenticate( 1737 { 1738 "www-authenticate": 'OAuth "Facebook Platform" "invalid_token" "Invalid OAuth access token."' 1739 } 1740 ) 1741 self.fail("should raise an exception") 1742 except httplib2.MalformedHeader: 1743 pass 1744 1745 def testDigestObject(self): 1746 credentials = ("joe", "password") 1747 host = None 1748 request_uri = "/projects/httplib2/test/digest/" 1749 headers = {} 1750 response = { 1751 "www-authenticate": 'Digest realm="myrealm", ' 1752 'nonce="Ygk86AsKBAA=3516200d37f9a3230352fde99977bd6d472d4306", ' 1753 'algorithm=MD5, qop="auth"' 1754 } 1755 content = b"" 1756 1757 d = httplib2.DigestAuthentication( 1758 credentials, host, request_uri, headers, response, content, None 1759 ) 1760 d.request("GET", request_uri, headers, content, cnonce="33033375ec278a46") 1761 our_request = "authorization: %s" % headers["authorization"] 1762 working_request = ( 1763 'authorization: Digest username="joe", realm="myrealm", ' 1764 'nonce="Ygk86AsKBAA=3516200d37f9a3230352fde99977bd6d472d4306",' 1765 ' uri="/projects/httplib2/test/digest/", algorithm=MD5, ' 1766 'response="97ed129401f7cdc60e5db58a80f3ea8b", qop=auth, ' 1767 'nc=00000001, cnonce="33033375ec278a46"' 1768 ) 1769 self.assertEqual(our_request, working_request) 1770 1771 def testDigestObjectWithOpaque(self): 1772 credentials = ("joe", "password") 1773 host = None 1774 request_uri = "/projects/httplib2/test/digest/" 1775 headers = {} 1776 response = { 1777 "www-authenticate": 'Digest realm="myrealm", ' 1778 'nonce="Ygk86AsKBAA=3516200d37f9a3230352fde99977bd6d472d4306", ' 1779 'algorithm=MD5, qop="auth", opaque="atestopaque"' 1780 } 1781 content = "" 1782 1783 d = httplib2.DigestAuthentication( 1784 credentials, host, request_uri, headers, response, content, None 1785 ) 1786 d.request("GET", request_uri, headers, content, cnonce="33033375ec278a46") 1787 our_request = "authorization: %s" % headers["authorization"] 1788 working_request = ( 1789 'authorization: Digest username="joe", realm="myrealm", ' 1790 'nonce="Ygk86AsKBAA=3516200d37f9a3230352fde99977bd6d472d4306",' 1791 ' uri="/projects/httplib2/test/digest/", algorithm=MD5, ' 1792 'response="97ed129401f7cdc60e5db58a80f3ea8b", qop=auth, ' 1793 'nc=00000001, cnonce="33033375ec278a46", ' 1794 'opaque="atestopaque"' 1795 ) 1796 self.assertEqual(our_request, working_request) 1797 1798 def testDigestObjectStale(self): 1799 credentials = ("joe", "password") 1800 host = None 1801 request_uri = "/projects/httplib2/test/digest/" 1802 headers = {} 1803 response = httplib2.Response({}) 1804 response["www-authenticate"] = ( 1805 'Digest realm="myrealm", ' 1806 'nonce="Ygk86AsKBAA=3516200d37f9a3230352fde99977bd6d472d4306",' 1807 ' algorithm=MD5, qop="auth", stale=true' 1808 ) 1809 response.status = 401 1810 content = b"" 1811 d = httplib2.DigestAuthentication( 1812 credentials, host, request_uri, headers, response, content, None 1813 ) 1814 # Returns true to force a retry 1815 self.assertTrue(d.response(response, content)) 1816 1817 def testDigestObjectAuthInfo(self): 1818 credentials = ("joe", "password") 1819 host = None 1820 request_uri = "/projects/httplib2/test/digest/" 1821 headers = {} 1822 response = httplib2.Response({}) 1823 response["www-authenticate"] = ( 1824 'Digest realm="myrealm", ' 1825 'nonce="Ygk86AsKBAA=3516200d37f9a3230352fde99977bd6d472d4306",' 1826 ' algorithm=MD5, qop="auth", stale=true' 1827 ) 1828 response["authentication-info"] = 'nextnonce="fred"' 1829 content = b"" 1830 d = httplib2.DigestAuthentication( 1831 credentials, host, request_uri, headers, response, content, None 1832 ) 1833 # Returns true to force a retry 1834 self.assertFalse(d.response(response, content)) 1835 self.assertEqual("fred", d.challenge["nonce"]) 1836 self.assertEqual(1, d.challenge["nc"]) 1837 1838 def testWsseAlgorithm(self): 1839 digest = httplib2._wsse_username_token( 1840 "d36e316282959a9ed4c89851497a717f", "2003-12-15T14:43:07Z", "taadtaadpstcsm" 1841 ) 1842 expected = b"quR/EWLAV4xLf9Zqyw4pDmfV9OY=" 1843 self.assertEqual(expected, digest) 1844 1845 def testEnd2End(self): 1846 # one end to end header 1847 response = {"content-type": "application/atom+xml", "te": "deflate"} 1848 end2end = httplib2._get_end2end_headers(response) 1849 self.assertTrue("content-type" in end2end) 1850 self.assertTrue("te" not in end2end) 1851 self.assertTrue("connection" not in end2end) 1852 1853 # one end to end header that gets eliminated 1854 response = { 1855 "connection": "content-type", 1856 "content-type": "application/atom+xml", 1857 "te": "deflate", 1858 } 1859 end2end = httplib2._get_end2end_headers(response) 1860 self.assertTrue("content-type" not in end2end) 1861 self.assertTrue("te" not in end2end) 1862 self.assertTrue("connection" not in end2end) 1863 1864 # Degenerate case of no headers 1865 response = {} 1866 end2end = httplib2._get_end2end_headers(response) 1867 self.assertEqual(0, len(end2end)) 1868 1869 # Degenerate case of connection referrring to a header not passed in 1870 response = {"connection": "content-type"} 1871 end2end = httplib2._get_end2end_headers(response) 1872 self.assertEqual(0, len(end2end)) 1873 1874 1875 class TestProxyInfo(unittest.TestCase): 1876 def setUp(self): 1877 self.orig_env = dict(os.environ) 1878 1879 def tearDown(self): 1880 os.environ.clear() 1881 os.environ.update(self.orig_env) 1882 1883 def test_from_url(self): 1884 pi = httplib2.proxy_info_from_url("http://myproxy.example.com") 1885 self.assertEqual(pi.proxy_host, "myproxy.example.com") 1886 self.assertEqual(pi.proxy_port, 80) 1887 self.assertEqual(pi.proxy_user, None) 1888 1889 def test_from_url_ident(self): 1890 pi = httplib2.proxy_info_from_url("http://zoidberg:fish@someproxy:99") 1891 self.assertEqual(pi.proxy_host, "someproxy") 1892 self.assertEqual(pi.proxy_port, 99) 1893 self.assertEqual(pi.proxy_user, "zoidberg") 1894 self.assertEqual(pi.proxy_pass, "fish") 1895 1896 def test_from_env(self): 1897 os.environ["http_proxy"] = "http://myproxy.example.com:8080" 1898 pi = httplib2.proxy_info_from_environment() 1899 self.assertEqual(pi.proxy_host, "myproxy.example.com") 1900 self.assertEqual(pi.proxy_port, 8080) 1901 1902 def test_from_env_no_proxy(self): 1903 os.environ["http_proxy"] = "http://myproxy.example.com:80" 1904 os.environ["https_proxy"] = "http://myproxy.example.com:81" 1905 pi = httplib2.proxy_info_from_environment("https") 1906 self.assertEqual(pi.proxy_host, "myproxy.example.com") 1907 self.assertEqual(pi.proxy_port, 81) 1908 1909 def test_from_env_none(self): 1910 os.environ.clear() 1911 pi = httplib2.proxy_info_from_environment() 1912 self.assertEqual(pi, None) 1913 1914 def test_proxy_headers(self): 1915 headers = {"key0": "val0", "key1": "val1"} 1916 pi = httplib2.ProxyInfo( 1917 httplib2.socks.PROXY_TYPE_HTTP, "localhost", 1234, proxy_headers=headers 1918 ) 1919 self.assertEqual(pi.proxy_headers, headers) 1920 1921 # regression: ensure that httplib2.HTTPConnectionWithTimeout initializes when proxy_info is not supplied 1922 def test_proxy_init(self): 1923 connection = httplib2.HTTPConnectionWithTimeout("www.google.com", 80) 1924 connection.request("GET", "/") 1925 connection.close() 1926 1927 1928 if __name__ == "__main__": 1929 unittest.main() 1930