Home | History | Annotate | Download | only in python3
      1 #!/usr/bin/env python3
      2 """A set of unit tests for httplib2.py."""
      3 
      4 __author__ = "Joe Gregorio (joe (at] bitworking.org)"
      5 __copyright__ = "Copyright 2006, Joe Gregorio"
      6 __contributors__ = ["Mark Pilgrim"]
      7 __license__ = "MIT"
      8 __version__ = "0.2 ($Rev: 118 $)"
      9 
     10 import base64
     11 import http.client
     12 import httplib2
     13 import io
     14 import os
     15 import pickle
     16 import socket
     17 import ssl
     18 import sys
     19 import time
     20 import unittest
     21 import urllib.parse
     22 
     23 base = "http://bitworking.org/projects/httplib2/test/"
     24 cacheDirName = ".cache"
     25 
     26 
     27 class CredentialsTest(unittest.TestCase):
     28     def test(self):
     29         c = httplib2.Credentials()
     30         c.add("joe", "password")
     31         self.assertEqual(("joe", "password"), list(c.iter("bitworking.org"))[0])
     32         self.assertEqual(("joe", "password"), list(c.iter(""))[0])
     33         c.add("fred", "password2", "wellformedweb.org")
     34         self.assertEqual(("joe", "password"), list(c.iter("bitworking.org"))[0])
     35         self.assertEqual(1, len(list(c.iter("bitworking.org"))))
     36         self.assertEqual(2, len(list(c.iter("wellformedweb.org"))))
     37         self.assertTrue(("fred", "password2") in list(c.iter("wellformedweb.org")))
     38         c.clear()
     39         self.assertEqual(0, len(list(c.iter("bitworking.org"))))
     40         c.add("fred", "password2", "wellformedweb.org")
     41         self.assertTrue(("fred", "password2") in list(c.iter("wellformedweb.org")))
     42         self.assertEqual(0, len(list(c.iter("bitworking.org"))))
     43         self.assertEqual(0, len(list(c.iter(""))))
     44 
     45 
     46 class ParserTest(unittest.TestCase):
     47     def testFromStd66(self):
     48         self.assertEqual(
     49             ("http", "example.com", "", None, None),
     50             httplib2.parse_uri("http://example.com"),
     51         )
     52         self.assertEqual(
     53             ("https", "example.com", "", None, None),
     54             httplib2.parse_uri("https://example.com"),
     55         )
     56         self.assertEqual(
     57             ("https", "example.com:8080", "", None, None),
     58             httplib2.parse_uri("https://example.com:8080"),
     59         )
     60         self.assertEqual(
     61             ("http", "example.com", "/", None, None),
     62             httplib2.parse_uri("http://example.com/"),
     63         )
     64         self.assertEqual(
     65             ("http", "example.com", "/path", None, None),
     66             httplib2.parse_uri("http://example.com/path"),
     67         )
     68         self.assertEqual(
     69             ("http", "example.com", "/path", "a=1&b=2", None),
     70             httplib2.parse_uri("http://example.com/path?a=1&b=2"),
     71         )
     72         self.assertEqual(
     73             ("http", "example.com", "/path", "a=1&b=2", "fred"),
     74             httplib2.parse_uri("http://example.com/path?a=1&b=2#fred"),
     75         )
     76         self.assertEqual(
     77             ("http", "example.com", "/path", "a=1&b=2", "fred"),
     78             httplib2.parse_uri("http://example.com/path?a=1&b=2#fred"),
     79         )
     80 
     81 
     82 class UrlNormTest(unittest.TestCase):
     83     def test(self):
     84         self.assertEqual(
     85             "http://example.org/", httplib2.urlnorm("http://example.org")[-1]
     86         )
     87         self.assertEqual(
     88             "http://example.org/", httplib2.urlnorm("http://EXAMple.org")[-1]
     89         )
     90         self.assertEqual(
     91             "http://example.org/?=b", httplib2.urlnorm("http://EXAMple.org?=b")[-1]
     92         )
     93         self.assertEqual(
     94             "http://example.org/mypath?a=b",
     95             httplib2.urlnorm("http://EXAMple.org/mypath?a=b")[-1],
     96         )
     97         self.assertEqual(
     98             "http://localhost:80/", httplib2.urlnorm("http://localhost:80")[-1]
     99         )
    100         self.assertEqual(
    101             httplib2.urlnorm("http://localhost:80/"),
    102             httplib2.urlnorm("HTTP://LOCALHOST:80"),
    103         )
    104         try:
    105             httplib2.urlnorm("/")
    106             self.fail("Non-absolute URIs should raise an exception")
    107         except httplib2.RelativeURIError:
    108             pass
    109 
    110 
    111 class UrlSafenameTest(unittest.TestCase):
    112     def test(self):
    113         # Test that different URIs end up generating different safe names
    114         self.assertEqual(
    115             "example.org,fred,a=b,58489f63a7a83c3b7794a6a398ee8b1f",
    116             httplib2.safename("http://example.org/fred/?a=b"),
    117         )
    118         self.assertEqual(
    119             "example.org,fred,a=b,8c5946d56fec453071f43329ff0be46b",
    120             httplib2.safename("http://example.org/fred?/a=b"),
    121         )
    122         self.assertEqual(
    123             "www.example.org,fred,a=b,499c44b8d844a011b67ea2c015116968",
    124             httplib2.safename("http://www.example.org/fred?/a=b"),
    125         )
    126         self.assertEqual(
    127             httplib2.safename(httplib2.urlnorm("http://www")[-1]),
    128             httplib2.safename(httplib2.urlnorm("http://WWW")[-1]),
    129         )
    130         self.assertEqual(
    131             "www.example.org,fred,a=b,692e843a333484ce0095b070497ab45d",
    132             httplib2.safename("https://www.example.org/fred?/a=b"),
    133         )
    134         self.assertNotEqual(
    135             httplib2.safename("http://www"), httplib2.safename("https://www")
    136         )
    137         # Test the max length limits
    138         uri = "http://" + ("w" * 200) + ".org"
    139         uri2 = "http://" + ("w" * 201) + ".org"
    140         self.assertNotEqual(httplib2.safename(uri2), httplib2.safename(uri))
    141         # Max length should be 200 + 1 (",") + 32
    142         self.assertEqual(233, len(httplib2.safename(uri2)))
    143         self.assertEqual(233, len(httplib2.safename(uri)))
    144         # Unicode
    145         if sys.version_info >= (2, 3):
    146             self.assertEqual(
    147                 "xn--http,-4y1d.org,fred,a=b,579924c35db315e5a32e3d9963388193",
    148                 httplib2.safename("http://\u2304.org/fred/?a=b"),
    149             )
    150 
    151 
    152 class _MyResponse(io.BytesIO):
    153     def __init__(self, body, **kwargs):
    154         io.BytesIO.__init__(self, body)
    155         self.headers = kwargs
    156 
    157     def items(self):
    158         return self.headers.items()
    159 
    160     def iteritems(self):
    161         return iter(self.headers.items())
    162 
    163 
    164 class _MyHTTPConnection(object):
    165     "This class is just a mock of httplib.HTTPConnection used for testing"
    166 
    167     def __init__(
    168         self,
    169         host,
    170         port=None,
    171         key_file=None,
    172         cert_file=None,
    173         strict=None,
    174         timeout=None,
    175         proxy_info=None,
    176     ):
    177         self.host = host
    178         self.port = port
    179         self.timeout = timeout
    180         self.log = ""
    181         self.sock = None
    182 
    183     def set_debuglevel(self, level):
    184         pass
    185 
    186     def connect(self):
    187         "Connect to a host on a given port."
    188         pass
    189 
    190     def close(self):
    191         pass
    192 
    193     def request(self, method, request_uri, body, headers):
    194         pass
    195 
    196     def getresponse(self):
    197         return _MyResponse(b"the body", status="200")
    198 
    199 
    200 class _MyHTTPBadStatusConnection(object):
    201     "Mock of httplib.HTTPConnection that raises BadStatusLine."
    202 
    203     num_calls = 0
    204 
    205     def __init__(
    206         self,
    207         host,
    208         port=None,
    209         key_file=None,
    210         cert_file=None,
    211         strict=None,
    212         timeout=None,
    213         proxy_info=None,
    214     ):
    215         self.host = host
    216         self.port = port
    217         self.timeout = timeout
    218         self.log = ""
    219         self.sock = None
    220         _MyHTTPBadStatusConnection.num_calls = 0
    221 
    222     def set_debuglevel(self, level):
    223         pass
    224 
    225     def connect(self):
    226         pass
    227 
    228     def close(self):
    229         pass
    230 
    231     def request(self, method, request_uri, body, headers):
    232         pass
    233 
    234     def getresponse(self):
    235         _MyHTTPBadStatusConnection.num_calls += 1
    236         raise http.client.BadStatusLine("")
    237 
    238 
    239 class HttpTest(unittest.TestCase):
    240     def setUp(self):
    241         if os.path.exists(cacheDirName):
    242             [
    243                 os.remove(os.path.join(cacheDirName, file))
    244                 for file in os.listdir(cacheDirName)
    245             ]
    246         self.http = httplib2.Http(cacheDirName)
    247         self.http.clear_credentials()
    248 
    249     def testIPv6NoSSL(self):
    250         try:
    251             self.http.request("http://[::1]/")
    252         except socket.gaierror:
    253             self.fail("should get the address family right for IPv6")
    254         except socket.error:
    255             # Even if IPv6 isn't installed on a machine it should just raise socket.error
    256             pass
    257 
    258     def testIPv6SSL(self):
    259         try:
    260             self.http.request("https://[::1]/")
    261         except socket.gaierror:
    262             self.fail("should get the address family right for IPv6")
    263         except socket.error:
    264             # Even if IPv6 isn't installed on a machine it should just raise socket.error
    265             pass
    266 
    267     def testConnectionType(self):
    268         self.http.force_exception_to_status_code = False
    269         response, content = self.http.request(
    270             "http://bitworking.org", connection_type=_MyHTTPConnection
    271         )
    272         self.assertEqual(response["content-location"], "http://bitworking.org")
    273         self.assertEqual(content, b"the body")
    274 
    275     def testBadStatusLineRetry(self):
    276         old_retries = httplib2.RETRIES
    277         httplib2.RETRIES = 1
    278         self.http.force_exception_to_status_code = False
    279         try:
    280             response, content = self.http.request(
    281                 "http://bitworking.org", connection_type=_MyHTTPBadStatusConnection
    282             )
    283         except http.client.BadStatusLine:
    284             self.assertEqual(2, _MyHTTPBadStatusConnection.num_calls)
    285         httplib2.RETRIES = old_retries
    286 
    287     def testGetUnknownServer(self):
    288         self.http.force_exception_to_status_code = False
    289         try:
    290             self.http.request("http://fred.bitworking.org/")
    291             self.fail(
    292                 "An httplib2.ServerNotFoundError Exception must be thrown on an unresolvable server."
    293             )
    294         except httplib2.ServerNotFoundError:
    295             pass
    296 
    297         # Now test with exceptions turned off
    298         self.http.force_exception_to_status_code = True
    299 
    300         (response, content) = self.http.request("http://fred.bitworking.org/")
    301         self.assertEqual(response["content-type"], "text/plain")
    302         self.assertTrue(content.startswith(b"Unable to find"))
    303         self.assertEqual(response.status, 400)
    304 
    305     def testGetConnectionRefused(self):
    306         self.http.force_exception_to_status_code = False
    307         try:
    308             self.http.request("http://localhost:7777/")
    309             self.fail("An socket.error exception must be thrown on Connection Refused.")
    310         except socket.error:
    311             pass
    312 
    313         # Now test with exceptions turned off
    314         self.http.force_exception_to_status_code = True
    315 
    316         (response, content) = self.http.request("http://localhost:7777/")
    317         self.assertEqual(response["content-type"], "text/plain")
    318         self.assertTrue(b"Connection refused" in content)
    319         self.assertEqual(response.status, 400)
    320 
    321     def testGetIRI(self):
    322         if sys.version_info >= (2, 3):
    323             uri = urllib.parse.urljoin(
    324                 base, "reflector/reflector.cgi?d=\N{CYRILLIC CAPITAL LETTER DJE}"
    325             )
    326             (response, content) = self.http.request(uri, "GET")
    327             d = self.reflector(content)
    328             self.assertTrue("QUERY_STRING" in d)
    329             self.assertTrue(d["QUERY_STRING"].find("%D0%82") > 0)
    330 
    331     def testGetIsDefaultMethod(self):
    332         # Test that GET is the default method
    333         uri = urllib.parse.urljoin(base, "methods/method_reflector.cgi")
    334         (response, content) = self.http.request(uri)
    335         self.assertEqual(response["x-method"], "GET")
    336 
    337     def testDifferentMethods(self):
    338         # Test that all methods can be used
    339         uri = urllib.parse.urljoin(base, "methods/method_reflector.cgi")
    340         for method in ["GET", "PUT", "DELETE", "POST"]:
    341             (response, content) = self.http.request(uri, method, body=b" ")
    342             self.assertEqual(response["x-method"], method)
    343 
    344     def testHeadRead(self):
    345         # Test that we don't try to read the response of a HEAD request
    346         # since httplib blocks response.read() for HEAD requests.
    347         # Oddly enough this doesn't appear as a problem when doing HEAD requests
    348         # against Apache servers.
    349         uri = "http://www.google.com/"
    350         (response, content) = self.http.request(uri, "HEAD")
    351         self.assertEqual(response.status, 200)
    352         self.assertEqual(content, b"")
    353 
    354     def testGetNoCache(self):
    355         # Test that can do a GET w/o the cache turned on.
    356         http = httplib2.Http()
    357         uri = urllib.parse.urljoin(base, "304/test_etag.txt")
    358         (response, content) = http.request(uri, "GET")
    359         self.assertEqual(response.status, 200)
    360         self.assertEqual(response.previous, None)
    361 
    362     def testGetOnlyIfCachedCacheHit(self):
    363         # Test that can do a GET with cache and 'only-if-cached'
    364         uri = urllib.parse.urljoin(base, "304/test_etag.txt")
    365         (response, content) = self.http.request(uri, "GET")
    366         (response, content) = self.http.request(
    367             uri, "GET", headers={"cache-control": "only-if-cached"}
    368         )
    369         self.assertEqual(response.fromcache, True)
    370         self.assertEqual(response.status, 200)
    371 
    372     def testGetOnlyIfCachedCacheMiss(self):
    373         # Test that can do a GET with no cache with 'only-if-cached'
    374         uri = urllib.parse.urljoin(base, "304/test_etag.txt")
    375         (response, content) = self.http.request(
    376             uri, "GET", headers={"cache-control": "only-if-cached"}
    377         )
    378         self.assertEqual(response.fromcache, False)
    379         self.assertEqual(response.status, 504)
    380 
    381     def testGetOnlyIfCachedNoCacheAtAll(self):
    382         # Test that can do a GET with no cache with 'only-if-cached'
    383         # Of course, there might be an intermediary beyond us
    384         # that responds to the 'only-if-cached', so this
    385         # test can't really be guaranteed to pass.
    386         http = httplib2.Http()
    387         uri = urllib.parse.urljoin(base, "304/test_etag.txt")
    388         (response, content) = http.request(
    389             uri, "GET", headers={"cache-control": "only-if-cached"}
    390         )
    391         self.assertEqual(response.fromcache, False)
    392         self.assertEqual(response.status, 504)
    393 
    394     def testUserAgent(self):
    395         # Test that we provide a default user-agent
    396         uri = urllib.parse.urljoin(base, "user-agent/test.cgi")
    397         (response, content) = self.http.request(uri, "GET")
    398         self.assertEqual(response.status, 200)
    399         self.assertTrue(content.startswith(b"Python-httplib2/"))
    400 
    401     def testUserAgentNonDefault(self):
    402         # Test that the default user-agent can be over-ridden
    403 
    404         uri = urllib.parse.urljoin(base, "user-agent/test.cgi")
    405         (response, content) = self.http.request(
    406             uri, "GET", headers={"User-Agent": "fred/1.0"}
    407         )
    408         self.assertEqual(response.status, 200)
    409         self.assertTrue(content.startswith(b"fred/1.0"))
    410 
    411     def testGet300WithLocation(self):
    412         # Test the we automatically follow 300 redirects if a Location: header is provided
    413         uri = urllib.parse.urljoin(base, "300/with-location-header.asis")
    414         (response, content) = self.http.request(uri, "GET")
    415         self.assertEqual(response.status, 200)
    416         self.assertEqual(content, b"This is the final destination.\n")
    417         self.assertEqual(response.previous.status, 300)
    418         self.assertEqual(response.previous.fromcache, False)
    419 
    420         # Confirm that the intermediate 300 is not cached
    421         (response, content) = self.http.request(uri, "GET")
    422         self.assertEqual(response.status, 200)
    423         self.assertEqual(content, b"This is the final destination.\n")
    424         self.assertEqual(response.previous.status, 300)
    425         self.assertEqual(response.previous.fromcache, False)
    426 
    427     def testGet300WithLocationNoRedirect(self):
    428         # Test the we automatically follow 300 redirects if a Location: header is provided
    429         self.http.follow_redirects = False
    430         uri = urllib.parse.urljoin(base, "300/with-location-header.asis")
    431         (response, content) = self.http.request(uri, "GET")
    432         self.assertEqual(response.status, 300)
    433 
    434     def testGet300WithoutLocation(self):
    435         # Not giving a Location: header in a 300 response is acceptable
    436         # In which case we just return the 300 response
    437         uri = urllib.parse.urljoin(base, "300/without-location-header.asis")
    438         (response, content) = self.http.request(uri, "GET")
    439         self.assertEqual(response.status, 300)
    440         self.assertTrue(response["content-type"].startswith("text/html"))
    441         self.assertEqual(response.previous, None)
    442 
    443     def testGet301(self):
    444         # Test that we automatically follow 301 redirects
    445         # and that we cache the 301 response
    446         uri = urllib.parse.urljoin(base, "301/onestep.asis")
    447         destination = urllib.parse.urljoin(base, "302/final-destination.txt")
    448         (response, content) = self.http.request(uri, "GET")
    449         self.assertEqual(response.status, 200)
    450         self.assertTrue("content-location" in response)
    451         self.assertEqual(response["content-location"], destination)
    452         self.assertEqual(content, b"This is the final destination.\n")
    453         self.assertEqual(response.previous.status, 301)
    454         self.assertEqual(response.previous.fromcache, False)
    455 
    456         (response, content) = self.http.request(uri, "GET")
    457         self.assertEqual(response.status, 200)
    458         self.assertEqual(response["content-location"], destination)
    459         self.assertEqual(content, b"This is the final destination.\n")
    460         self.assertEqual(response.previous.status, 301)
    461         self.assertEqual(response.previous.fromcache, True)
    462 
    463     def testHead301(self):
    464         # Test that we automatically follow 301 redirects
    465         uri = urllib.parse.urljoin(base, "301/onestep.asis")
    466         (response, content) = self.http.request(uri, "HEAD")
    467         self.assertEqual(response.status, 200)
    468         self.assertEqual(response.previous.status, 301)
    469         self.assertEqual(response.previous.fromcache, False)
    470 
    471     def testGet301NoRedirect(self):
    472         # Test that we automatically follow 301 redirects
    473         # and that we cache the 301 response
    474         self.http.follow_redirects = False
    475         uri = urllib.parse.urljoin(base, "301/onestep.asis")
    476         destination = urllib.parse.urljoin(base, "302/final-destination.txt")
    477         (response, content) = self.http.request(uri, "GET")
    478         self.assertEqual(response.status, 301)
    479 
    480     def testGet302(self):
    481         # Test that we automatically follow 302 redirects
    482         # and that we DO NOT cache the 302 response
    483         uri = urllib.parse.urljoin(base, "302/onestep.asis")
    484         destination = urllib.parse.urljoin(base, "302/final-destination.txt")
    485         (response, content) = self.http.request(uri, "GET")
    486         self.assertEqual(response.status, 200)
    487         self.assertEqual(response["content-location"], destination)
    488         self.assertEqual(content, b"This is the final destination.\n")
    489         self.assertEqual(response.previous.status, 302)
    490         self.assertEqual(response.previous.fromcache, False)
    491 
    492         uri = urllib.parse.urljoin(base, "302/onestep.asis")
    493         (response, content) = self.http.request(uri, "GET")
    494         self.assertEqual(response.status, 200)
    495         self.assertEqual(response.fromcache, True)
    496         self.assertEqual(response["content-location"], destination)
    497         self.assertEqual(content, b"This is the final destination.\n")
    498         self.assertEqual(response.previous.status, 302)
    499         self.assertEqual(response.previous.fromcache, False)
    500         self.assertEqual(response.previous["content-location"], uri)
    501 
    502         uri = urllib.parse.urljoin(base, "302/twostep.asis")
    503 
    504         (response, content) = self.http.request(uri, "GET")
    505         self.assertEqual(response.status, 200)
    506         self.assertEqual(response.fromcache, True)
    507         self.assertEqual(content, b"This is the final destination.\n")
    508         self.assertEqual(response.previous.status, 302)
    509         self.assertEqual(response.previous.fromcache, False)
    510 
    511     def testGet302RedirectionLimit(self):
    512         # Test that we can set a lower redirection limit
    513         # and that we raise an exception when we exceed
    514         # that limit.
    515         self.http.force_exception_to_status_code = False
    516 
    517         uri = urllib.parse.urljoin(base, "302/twostep.asis")
    518         try:
    519             (response, content) = self.http.request(uri, "GET", redirections=1)
    520             self.fail("This should not happen")
    521         except httplib2.RedirectLimit:
    522             pass
    523         except Exception as e:
    524             self.fail("Threw wrong kind of exception ")
    525 
    526         # Re-run the test with out the exceptions
    527         self.http.force_exception_to_status_code = True
    528 
    529         (response, content) = self.http.request(uri, "GET", redirections=1)
    530         self.assertEqual(response.status, 500)
    531         self.assertTrue(response.reason.startswith("Redirected more"))
    532         self.assertEqual("302", response["status"])
    533         self.assertTrue(content.startswith(b"<html>"))
    534         self.assertTrue(response.previous != None)
    535 
    536     def testGet302NoLocation(self):
    537         # Test that we throw an exception when we get
    538         # a 302 with no Location: header.
    539         self.http.force_exception_to_status_code = False
    540         uri = urllib.parse.urljoin(base, "302/no-location.asis")
    541         try:
    542             (response, content) = self.http.request(uri, "GET")
    543             self.fail("Should never reach here")
    544         except httplib2.RedirectMissingLocation:
    545             pass
    546         except Exception as e:
    547             self.fail("Threw wrong kind of exception ")
    548 
    549         # Re-run the test with out the exceptions
    550         self.http.force_exception_to_status_code = True
    551 
    552         (response, content) = self.http.request(uri, "GET")
    553         self.assertEqual(response.status, 500)
    554         self.assertTrue(response.reason.startswith("Redirected but"))
    555         self.assertEqual("302", response["status"])
    556         self.assertTrue(content.startswith(b"This is content"))
    557 
    558     def testGet301ViaHttps(self):
    559         # Google always redirects to http://google.com
    560         (response, content) = self.http.request("https://code.google.com/apis/", "GET")
    561         self.assertEqual(200, response.status)
    562         self.assertEqual(301, response.previous.status)
    563 
    564     def testGetViaHttps(self):
    565         # Test that we can handle HTTPS
    566         (response, content) = self.http.request("https://google.com/adsense/", "GET")
    567         self.assertEqual(200, response.status)
    568 
    569     def testGetViaHttpsSpecViolationOnLocation(self):
    570         # Test that we follow redirects through HTTPS
    571         # even if they violate the spec by including
    572         # a relative Location: header instead of an
    573         # absolute one.
    574         (response, content) = self.http.request("https://google.com/adsense", "GET")
    575         self.assertEqual(200, response.status)
    576         self.assertNotEqual(None, response.previous)
    577 
    578     def testGetViaHttpsKeyCert(self):
    579         #  At this point I can only test
    580         #  that the key and cert files are passed in
    581         #  correctly to httplib. It would be nice to have
    582         #  a real https endpoint to test against.
    583         http = httplib2.Http(timeout=2)
    584 
    585         http.add_certificate("akeyfile", "acertfile", "bitworking.org")
    586         try:
    587             (response, content) = http.request("https://bitworking.org", "GET")
    588         except AttributeError:
    589             self.assertEqual(
    590                 http.connections["https:bitworking.org"].key_file, "akeyfile"
    591             )
    592             self.assertEqual(
    593                 http.connections["https:bitworking.org"].cert_file, "acertfile"
    594             )
    595         except IOError:
    596             # Skip on 3.2
    597             pass
    598 
    599         try:
    600             (response, content) = http.request("https://notthere.bitworking.org", "GET")
    601         except httplib2.ServerNotFoundError:
    602             self.assertEqual(
    603                 http.connections["https:notthere.bitworking.org"].key_file, None
    604             )
    605             self.assertEqual(
    606                 http.connections["https:notthere.bitworking.org"].cert_file, None
    607             )
    608         except IOError:
    609             # Skip on 3.2
    610             pass
    611 
    612     def testSslCertValidation(self):
    613         # Test that we get an ssl.SSLError when specifying a non-existent CA
    614         # certs file.
    615         http = httplib2.Http(ca_certs="/nosuchfile")
    616         self.assertRaises(IOError, http.request, "https://www.google.com/", "GET")
    617 
    618         # Test that we get a SSLHandshakeError if we try to access
    619         # https://www.google.com, using a CA cert file that doesn't contain
    620         # the CA Google uses (i.e., simulating a cert that's not signed by a
    621         # trusted CA).
    622         other_ca_certs = os.path.join(
    623             os.path.dirname(os.path.abspath(httplib2.__file__)),
    624             "test",
    625             "other_cacerts.txt",
    626         )
    627         http = httplib2.Http(ca_certs=other_ca_certs)
    628         self.assertRaises(ssl.SSLError, http.request, "https://www.google.com/", "GET")
    629 
    630     def testSniHostnameValidation(self):
    631         self.http.request("https://google.com/", method="GET")
    632 
    633     def testGet303(self):
    634         # Do a follow-up GET on a Location: header
    635         # returned from a POST that gave a 303.
    636         uri = urllib.parse.urljoin(base, "303/303.cgi")
    637         (response, content) = self.http.request(uri, "POST", " ")
    638         self.assertEqual(response.status, 200)
    639         self.assertEqual(content, b"This is the final destination.\n")
    640         self.assertEqual(response.previous.status, 303)
    641 
    642     def testGet303NoRedirect(self):
    643         # Do a follow-up GET on a Location: header
    644         # returned from a POST that gave a 303.
    645         self.http.follow_redirects = False
    646         uri = urllib.parse.urljoin(base, "303/303.cgi")
    647         (response, content) = self.http.request(uri, "POST", " ")
    648         self.assertEqual(response.status, 303)
    649 
    650     def test303ForDifferentMethods(self):
    651         # Test that all methods can be used
    652         uri = urllib.parse.urljoin(base, "303/redirect-to-reflector.cgi")
    653         for (method, method_on_303) in [
    654             ("PUT", "GET"),
    655             ("DELETE", "GET"),
    656             ("POST", "GET"),
    657             ("GET", "GET"),
    658             ("HEAD", "GET"),
    659         ]:
    660             (response, content) = self.http.request(uri, method, body=b" ")
    661             self.assertEqual(response["x-method"], method_on_303)
    662 
    663     def testGet304(self):
    664         # Test that we use ETags properly to validate our cache
    665         uri = urllib.parse.urljoin(base, "304/test_etag.txt")
    666         (response, content) = self.http.request(
    667             uri, "GET", headers={"accept-encoding": "identity"}
    668         )
    669         self.assertNotEqual(response["etag"], "")
    670 
    671         (response, content) = self.http.request(
    672             uri, "GET", headers={"accept-encoding": "identity"}
    673         )
    674         (response, content) = self.http.request(
    675             uri,
    676             "GET",
    677             headers={"accept-encoding": "identity", "cache-control": "must-revalidate"},
    678         )
    679         self.assertEqual(response.status, 200)
    680         self.assertEqual(response.fromcache, True)
    681 
    682         cache_file_name = os.path.join(
    683             cacheDirName, httplib2.safename(httplib2.urlnorm(uri)[-1])
    684         )
    685         f = open(cache_file_name, "r")
    686         status_line = f.readline()
    687         f.close()
    688 
    689         self.assertTrue(status_line.startswith("status:"))
    690 
    691         (response, content) = self.http.request(
    692             uri, "HEAD", headers={"accept-encoding": "identity"}
    693         )
    694         self.assertEqual(response.status, 200)
    695         self.assertEqual(response.fromcache, True)
    696 
    697         (response, content) = self.http.request(
    698             uri, "GET", headers={"accept-encoding": "identity", "range": "bytes=0-0"}
    699         )
    700         self.assertEqual(response.status, 206)
    701         self.assertEqual(response.fromcache, False)
    702 
    703     def testGetIgnoreEtag(self):
    704         # Test that we can forcibly ignore ETags
    705         uri = urllib.parse.urljoin(base, "reflector/reflector.cgi")
    706         (response, content) = self.http.request(
    707             uri, "GET", headers={"accept-encoding": "identity"}
    708         )
    709         self.assertNotEqual(response["etag"], "")
    710 
    711         (response, content) = self.http.request(
    712             uri,
    713             "GET",
    714             headers={"accept-encoding": "identity", "cache-control": "max-age=0"},
    715         )
    716         d = self.reflector(content)
    717         self.assertTrue("HTTP_IF_NONE_MATCH" in d)
    718 
    719         self.http.ignore_etag = True
    720         (response, content) = self.http.request(
    721             uri,
    722             "GET",
    723             headers={"accept-encoding": "identity", "cache-control": "max-age=0"},
    724         )
    725         d = self.reflector(content)
    726         self.assertEqual(response.fromcache, False)
    727         self.assertFalse("HTTP_IF_NONE_MATCH" in d)
    728 
    729     def testOverrideEtag(self):
    730         # Test that we can forcibly ignore ETags
    731         uri = urllib.parse.urljoin(base, "reflector/reflector.cgi")
    732         (response, content) = self.http.request(
    733             uri, "GET", headers={"accept-encoding": "identity"}
    734         )
    735         self.assertNotEqual(response["etag"], "")
    736 
    737         (response, content) = self.http.request(
    738             uri,
    739             "GET",
    740             headers={"accept-encoding": "identity", "cache-control": "max-age=0"},
    741         )
    742         d = self.reflector(content)
    743         self.assertTrue("HTTP_IF_NONE_MATCH" in d)
    744         self.assertNotEqual(d["HTTP_IF_NONE_MATCH"], "fred")
    745 
    746         (response, content) = self.http.request(
    747             uri,
    748             "GET",
    749             headers={
    750                 "accept-encoding": "identity",
    751                 "cache-control": "max-age=0",
    752                 "if-none-match": "fred",
    753             },
    754         )
    755         d = self.reflector(content)
    756         self.assertTrue("HTTP_IF_NONE_MATCH" in d)
    757         self.assertEqual(d["HTTP_IF_NONE_MATCH"], "fred")
    758 
    759     # MAP-commented this out because it consistently fails
    760     #    def testGet304EndToEnd(self):
    761     #       # Test that end to end headers get overwritten in the cache
    762     #        uri = urllib.parse.urljoin(base, "304/end2end.cgi")
    763     #        (response, content) = self.http.request(uri, "GET")
    764     #        self.assertNotEqual(response['etag'], "")
    765     #        old_date = response['date']
    766     #        time.sleep(2)
    767     #
    768     #        (response, content) = self.http.request(uri, "GET", headers = {'Cache-Control': 'max-age=0'})
    769     #        # The response should be from the cache, but the Date: header should be updated.
    770     #        new_date = response['date']
    771     #        self.assertNotEqual(new_date, old_date)
    772     #        self.assertEqual(response.status, 200)
    773     #        self.assertEqual(response.fromcache, True)
    774 
    775     def testGet304LastModified(self):
    776         # Test that we can still handle a 304
    777         # by only using the last-modified cache validator.
    778         uri = urllib.parse.urljoin(
    779             base, "304/last-modified-only/last-modified-only.txt"
    780         )
    781         (response, content) = self.http.request(uri, "GET")
    782 
    783         self.assertNotEqual(response["last-modified"], "")
    784         (response, content) = self.http.request(uri, "GET")
    785         (response, content) = self.http.request(uri, "GET")
    786         self.assertEqual(response.status, 200)
    787         self.assertEqual(response.fromcache, True)
    788 
    789     def testGet307(self):
    790         # Test that we do follow 307 redirects but
    791         # do not cache the 307
    792         uri = urllib.parse.urljoin(base, "307/onestep.asis")
    793         (response, content) = self.http.request(uri, "GET")
    794         self.assertEqual(response.status, 200)
    795         self.assertEqual(content, b"This is the final destination.\n")
    796         self.assertEqual(response.previous.status, 307)
    797         self.assertEqual(response.previous.fromcache, False)
    798 
    799         (response, content) = self.http.request(uri, "GET")
    800         self.assertEqual(response.status, 200)
    801         self.assertEqual(response.fromcache, True)
    802         self.assertEqual(content, b"This is the final destination.\n")
    803         self.assertEqual(response.previous.status, 307)
    804         self.assertEqual(response.previous.fromcache, False)
    805 
    806     def testGet410(self):
    807         # Test that we pass 410's through
    808         uri = urllib.parse.urljoin(base, "410/410.asis")
    809         (response, content) = self.http.request(uri, "GET")
    810         self.assertEqual(response.status, 410)
    811 
    812     def testVaryHeaderSimple(self):
    813         """RFC 2616 13.6 When the cache receives a subsequent request whose Request-URI specifies one or more cache entries including a Vary header field, the cache MUST NOT use such a cache entry to construct a response to the new request unless all of the selecting request-headers present in the new request match the corresponding stored request-headers in the original request.
    814 
    815         """
    816         # test that the vary header is sent
    817         uri = urllib.parse.urljoin(base, "vary/accept.asis")
    818         (response, content) = self.http.request(
    819             uri, "GET", headers={"Accept": "text/plain"}
    820         )
    821         self.assertEqual(response.status, 200)
    822         self.assertTrue("vary" in response)
    823 
    824         # get the resource again, from the cache since accept header in this
    825         # request is the same as the request
    826         (response, content) = self.http.request(
    827             uri, "GET", headers={"Accept": "text/plain"}
    828         )
    829         self.assertEqual(response.status, 200)
    830         self.assertEqual(response.fromcache, True, msg="Should be from cache")
    831 
    832         # get the resource again, not from cache since Accept headers does not match
    833         (response, content) = self.http.request(
    834             uri, "GET", headers={"Accept": "text/html"}
    835         )
    836         self.assertEqual(response.status, 200)
    837         self.assertEqual(response.fromcache, False, msg="Should not be from cache")
    838 
    839         # get the resource again, without any Accept header, so again no match
    840         (response, content) = self.http.request(uri, "GET")
    841         self.assertEqual(response.status, 200)
    842         self.assertEqual(response.fromcache, False, msg="Should not be from cache")
    843 
    844     def testNoVary(self):
    845         pass
    846         # when there is no vary, a different Accept header (e.g.) should not
    847         # impact if the cache is used
    848         # test that the vary header is not sent
    849         # uri = urllib.parse.urljoin(base, "vary/no-vary.asis")
    850         # (response, content) = self.http.request(uri, "GET", headers={'Accept': 'text/plain'})
    851         # self.assertEqual(response.status, 200)
    852         # self.assertFalse('vary' in response)
    853         #
    854         # (response, content) = self.http.request(uri, "GET", headers={'Accept': 'text/plain'})
    855         # self.assertEqual(response.status, 200)
    856         # self.assertEqual(response.fromcache, True, msg="Should be from cache")
    857         #
    858         # (response, content) = self.http.request(uri, "GET", headers={'Accept': 'text/html'})
    859         # self.assertEqual(response.status, 200)
    860         # self.assertEqual(response.fromcache, True, msg="Should be from cache")
    861 
    862     def testVaryHeaderDouble(self):
    863         uri = urllib.parse.urljoin(base, "vary/accept-double.asis")
    864         (response, content) = self.http.request(
    865             uri,
    866             "GET",
    867             headers={
    868                 "Accept": "text/plain",
    869                 "Accept-Language": "da, en-gb;q=0.8, en;q=0.7",
    870             },
    871         )
    872         self.assertEqual(response.status, 200)
    873         self.assertTrue("vary" in response)
    874 
    875         # we are from cache
    876         (response, content) = self.http.request(
    877             uri,
    878             "GET",
    879             headers={
    880                 "Accept": "text/plain",
    881                 "Accept-Language": "da, en-gb;q=0.8, en;q=0.7",
    882             },
    883         )
    884         self.assertEqual(response.fromcache, True, msg="Should be from cache")
    885 
    886         (response, content) = self.http.request(
    887             uri, "GET", headers={"Accept": "text/plain"}
    888         )
    889         self.assertEqual(response.status, 200)
    890         self.assertEqual(response.fromcache, False)
    891 
    892         # get the resource again, not from cache, varied headers don't match exact
    893         (response, content) = self.http.request(
    894             uri, "GET", headers={"Accept-Language": "da"}
    895         )
    896         self.assertEqual(response.status, 200)
    897         self.assertEqual(response.fromcache, False, msg="Should not be from cache")
    898 
    899     def testVaryUnusedHeader(self):
    900         # A header's value is not considered to vary if it's not used at all.
    901         uri = urllib.parse.urljoin(base, "vary/unused-header.asis")
    902         (response, content) = self.http.request(
    903             uri, "GET", headers={"Accept": "text/plain"}
    904         )
    905         self.assertEqual(response.status, 200)
    906         self.assertTrue("vary" in response)
    907 
    908         # we are from cache
    909         (response, content) = self.http.request(
    910             uri, "GET", headers={"Accept": "text/plain"}
    911         )
    912         self.assertEqual(response.fromcache, True, msg="Should be from cache")
    913 
    914     def testHeadGZip(self):
    915         # Test that we don't try to decompress a HEAD response
    916         uri = urllib.parse.urljoin(base, "gzip/final-destination.txt")
    917         (response, content) = self.http.request(uri, "HEAD")
    918         self.assertEqual(response.status, 200)
    919         self.assertNotEqual(int(response["content-length"]), 0)
    920         self.assertEqual(content, b"")
    921 
    922     def testGetGZip(self):
    923         # Test that we support gzip compression
    924         uri = urllib.parse.urljoin(base, "gzip/final-destination.txt")
    925         (response, content) = self.http.request(uri, "GET")
    926         self.assertEqual(response.status, 200)
    927         self.assertFalse("content-encoding" in response)
    928         self.assertTrue("-content-encoding" in response)
    929         self.assertEqual(
    930             int(response["content-length"]), len(b"This is the final destination.\n")
    931         )
    932         self.assertEqual(content, b"This is the final destination.\n")
    933 
    934     def testPostAndGZipResponse(self):
    935         uri = urllib.parse.urljoin(base, "gzip/post.cgi")
    936         (response, content) = self.http.request(uri, "POST", body=" ")
    937         self.assertEqual(response.status, 200)
    938         self.assertFalse("content-encoding" in response)
    939         self.assertTrue("-content-encoding" in response)
    940 
    941     def testGetGZipFailure(self):
    942         # Test that we raise a good exception when the gzip fails
    943         self.http.force_exception_to_status_code = False
    944         uri = urllib.parse.urljoin(base, "gzip/failed-compression.asis")
    945         try:
    946             (response, content) = self.http.request(uri, "GET")
    947             self.fail("Should never reach here")
    948         except httplib2.FailedToDecompressContent:
    949             pass
    950         except Exception:
    951             self.fail("Threw wrong kind of exception")
    952 
    953         # Re-run the test with out the exceptions
    954         self.http.force_exception_to_status_code = True
    955 
    956         (response, content) = self.http.request(uri, "GET")
    957         self.assertEqual(response.status, 500)
    958         self.assertTrue(response.reason.startswith("Content purported"))
    959 
    960     def testIndividualTimeout(self):
    961         uri = urllib.parse.urljoin(base, "timeout/timeout.cgi")
    962         http = httplib2.Http(timeout=1)
    963         http.force_exception_to_status_code = True
    964 
    965         (response, content) = http.request(uri)
    966         self.assertEqual(response.status, 408)
    967         self.assertTrue(response.reason.startswith("Request Timeout"))
    968         self.assertTrue(content.startswith(b"Request Timeout"))
    969 
    970     def testGetDeflate(self):
    971         # Test that we support deflate compression
    972         uri = urllib.parse.urljoin(base, "deflate/deflated.asis")
    973         (response, content) = self.http.request(uri, "GET")
    974         self.assertEqual(response.status, 200)
    975         self.assertFalse("content-encoding" in response)
    976         self.assertEqual(
    977             int(response["content-length"]), len("This is the final destination.")
    978         )
    979         self.assertEqual(content, b"This is the final destination.")
    980 
    981     def testGetDeflateFailure(self):
    982         # Test that we raise a good exception when the deflate fails
    983         self.http.force_exception_to_status_code = False
    984 
    985         uri = urllib.parse.urljoin(base, "deflate/failed-compression.asis")
    986         try:
    987             (response, content) = self.http.request(uri, "GET")
    988             self.fail("Should never reach here")
    989         except httplib2.FailedToDecompressContent:
    990             pass
    991         except Exception:
    992             self.fail("Threw wrong kind of exception")
    993 
    994         # Re-run the test with out the exceptions
    995         self.http.force_exception_to_status_code = True
    996 
    997         (response, content) = self.http.request(uri, "GET")
    998         self.assertEqual(response.status, 500)
    999         self.assertTrue(response.reason.startswith("Content purported"))
   1000 
   1001     def testGetDuplicateHeaders(self):
   1002         # Test that duplicate headers get concatenated via ','
   1003         uri = urllib.parse.urljoin(base, "duplicate-headers/multilink.asis")
   1004         (response, content) = self.http.request(uri, "GET")
   1005         self.assertEqual(response.status, 200)
   1006         self.assertEqual(content, b"This is content\n")
   1007         self.assertEqual(
   1008             response["link"].split(",")[0],
   1009             '<http://bitworking.org>; rel="home"; title="BitWorking"',
   1010         )
   1011 
   1012     def testGetCacheControlNoCache(self):
   1013         # Test Cache-Control: no-cache on requests
   1014         uri = urllib.parse.urljoin(base, "304/test_etag.txt")
   1015         (response, content) = self.http.request(
   1016             uri, "GET", headers={"accept-encoding": "identity"}
   1017         )
   1018         self.assertNotEqual(response["etag"], "")
   1019         (response, content) = self.http.request(
   1020             uri, "GET", headers={"accept-encoding": "identity"}
   1021         )
   1022         self.assertEqual(response.status, 200)
   1023         self.assertEqual(response.fromcache, True)
   1024 
   1025         (response, content) = self.http.request(
   1026             uri,
   1027             "GET",
   1028             headers={"accept-encoding": "identity", "Cache-Control": "no-cache"},
   1029         )
   1030         self.assertEqual(response.status, 200)
   1031         self.assertEqual(response.fromcache, False)
   1032 
   1033     def testGetCacheControlPragmaNoCache(self):
   1034         # Test Pragma: no-cache on requests
   1035         uri = urllib.parse.urljoin(base, "304/test_etag.txt")
   1036         (response, content) = self.http.request(
   1037             uri, "GET", headers={"accept-encoding": "identity"}
   1038         )
   1039         self.assertNotEqual(response["etag"], "")
   1040         (response, content) = self.http.request(
   1041             uri, "GET", headers={"accept-encoding": "identity"}
   1042         )
   1043         self.assertEqual(response.status, 200)
   1044         self.assertEqual(response.fromcache, True)
   1045 
   1046         (response, content) = self.http.request(
   1047             uri, "GET", headers={"accept-encoding": "identity", "Pragma": "no-cache"}
   1048         )
   1049         self.assertEqual(response.status, 200)
   1050         self.assertEqual(response.fromcache, False)
   1051 
   1052     def testGetCacheControlNoStoreRequest(self):
   1053         # A no-store request means that the response should not be stored.
   1054         uri = urllib.parse.urljoin(base, "304/test_etag.txt")
   1055 
   1056         (response, content) = self.http.request(
   1057             uri, "GET", headers={"Cache-Control": "no-store"}
   1058         )
   1059         self.assertEqual(response.status, 200)
   1060         self.assertEqual(response.fromcache, False)
   1061 
   1062         (response, content) = self.http.request(
   1063             uri, "GET", headers={"Cache-Control": "no-store"}
   1064         )
   1065         self.assertEqual(response.status, 200)
   1066         self.assertEqual(response.fromcache, False)
   1067 
   1068     def testGetCacheControlNoStoreResponse(self):
   1069         # A no-store response means that the response should not be stored.
   1070         uri = urllib.parse.urljoin(base, "no-store/no-store.asis")
   1071 
   1072         (response, content) = self.http.request(uri, "GET")
   1073         self.assertEqual(response.status, 200)
   1074         self.assertEqual(response.fromcache, False)
   1075 
   1076         (response, content) = self.http.request(uri, "GET")
   1077         self.assertEqual(response.status, 200)
   1078         self.assertEqual(response.fromcache, False)
   1079 
   1080     def testGetCacheControlNoCacheNoStoreRequest(self):
   1081         # Test that a no-store, no-cache clears the entry from the cache
   1082         # even if it was cached previously.
   1083         uri = urllib.parse.urljoin(base, "304/test_etag.txt")
   1084 
   1085         (response, content) = self.http.request(uri, "GET")
   1086         (response, content) = self.http.request(uri, "GET")
   1087         self.assertEqual(response.fromcache, True)
   1088         (response, content) = self.http.request(
   1089             uri, "GET", headers={"Cache-Control": "no-store, no-cache"}
   1090         )
   1091         (response, content) = self.http.request(
   1092             uri, "GET", headers={"Cache-Control": "no-store, no-cache"}
   1093         )
   1094         self.assertEqual(response.status, 200)
   1095         self.assertEqual(response.fromcache, False)
   1096 
   1097     def testUpdateInvalidatesCache(self):
   1098         # Test that calling PUT or DELETE on a
   1099         # URI that is cache invalidates that cache.
   1100         uri = urllib.parse.urljoin(base, "304/test_etag.txt")
   1101 
   1102         (response, content) = self.http.request(uri, "GET")
   1103         (response, content) = self.http.request(uri, "GET")
   1104         self.assertEqual(response.fromcache, True)
   1105         (response, content) = self.http.request(uri, "DELETE")
   1106         self.assertEqual(response.status, 405)
   1107 
   1108         (response, content) = self.http.request(uri, "GET")
   1109         self.assertEqual(response.fromcache, False)
   1110 
   1111     def testUpdateUsesCachedETag(self):
   1112         # Test that we natively support http://www.w3.org/1999/04/Editing/
   1113         uri = urllib.parse.urljoin(base, "conditional-updates/test.cgi")
   1114 
   1115         (response, content) = self.http.request(uri, "GET")
   1116         self.assertEqual(response.status, 200)
   1117         self.assertEqual(response.fromcache, False)
   1118         (response, content) = self.http.request(uri, "GET")
   1119         self.assertEqual(response.status, 200)
   1120         self.assertEqual(response.fromcache, True)
   1121         (response, content) = self.http.request(uri, "PUT", body="foo")
   1122         self.assertEqual(response.status, 200)
   1123         (response, content) = self.http.request(uri, "PUT", body="foo")
   1124         self.assertEqual(response.status, 412)
   1125 
   1126     def testUpdatePatchUsesCachedETag(self):
   1127         # Test that we natively support http://www.w3.org/1999/04/Editing/
   1128         uri = urllib.parse.urljoin(base, "conditional-updates/test.cgi")
   1129 
   1130         (response, content) = self.http.request(uri, "GET")
   1131         self.assertEqual(response.status, 200)
   1132         self.assertEqual(response.fromcache, False)
   1133         (response, content) = self.http.request(uri, "GET")
   1134         self.assertEqual(response.status, 200)
   1135         self.assertEqual(response.fromcache, True)
   1136         (response, content) = self.http.request(uri, "PATCH", body="foo")
   1137         self.assertEqual(response.status, 200)
   1138         (response, content) = self.http.request(uri, "PATCH", body="foo")
   1139         self.assertEqual(response.status, 412)
   1140 
   1141     def testUpdateUsesCachedETagAndOCMethod(self):
   1142         # Test that we natively support http://www.w3.org/1999/04/Editing/
   1143         uri = urllib.parse.urljoin(base, "conditional-updates/test.cgi")
   1144 
   1145         (response, content) = self.http.request(uri, "GET")
   1146         self.assertEqual(response.status, 200)
   1147         self.assertEqual(response.fromcache, False)
   1148         (response, content) = self.http.request(uri, "GET")
   1149         self.assertEqual(response.status, 200)
   1150         self.assertEqual(response.fromcache, True)
   1151         self.http.optimistic_concurrency_methods.append("DELETE")
   1152         (response, content) = self.http.request(uri, "DELETE")
   1153         self.assertEqual(response.status, 200)
   1154 
   1155     def testUpdateUsesCachedETagOverridden(self):
   1156         # Test that we natively support http://www.w3.org/1999/04/Editing/
   1157         uri = urllib.parse.urljoin(base, "conditional-updates/test.cgi")
   1158 
   1159         (response, content) = self.http.request(uri, "GET")
   1160         self.assertEqual(response.status, 200)
   1161         self.assertEqual(response.fromcache, False)
   1162         (response, content) = self.http.request(uri, "GET")
   1163         self.assertEqual(response.status, 200)
   1164         self.assertEqual(response.fromcache, True)
   1165         (response, content) = self.http.request(
   1166             uri, "PUT", body="foo", headers={"if-match": "fred"}
   1167         )
   1168         self.assertEqual(response.status, 412)
   1169 
   1170     def testBasicAuth(self):
   1171         # Test Basic Authentication
   1172         uri = urllib.parse.urljoin(base, "basic/file.txt")
   1173         (response, content) = self.http.request(uri, "GET")
   1174         self.assertEqual(response.status, 401)
   1175 
   1176         uri = urllib.parse.urljoin(base, "basic/")
   1177         (response, content) = self.http.request(uri, "GET")
   1178         self.assertEqual(response.status, 401)
   1179 
   1180         self.http.add_credentials("joe", "password")
   1181         (response, content) = self.http.request(uri, "GET")
   1182         self.assertEqual(response.status, 200)
   1183 
   1184         uri = urllib.parse.urljoin(base, "basic/file.txt")
   1185         (response, content) = self.http.request(uri, "GET")
   1186         self.assertEqual(response.status, 200)
   1187 
   1188     def testBasicAuthWithDomain(self):
   1189         # Test Basic Authentication
   1190         uri = urllib.parse.urljoin(base, "basic/file.txt")
   1191         (response, content) = self.http.request(uri, "GET")
   1192         self.assertEqual(response.status, 401)
   1193 
   1194         uri = urllib.parse.urljoin(base, "basic/")
   1195         (response, content) = self.http.request(uri, "GET")
   1196         self.assertEqual(response.status, 401)
   1197 
   1198         self.http.add_credentials("joe", "password", "example.org")
   1199         (response, content) = self.http.request(uri, "GET")
   1200         self.assertEqual(response.status, 401)
   1201 
   1202         uri = urllib.parse.urljoin(base, "basic/file.txt")
   1203         (response, content) = self.http.request(uri, "GET")
   1204         self.assertEqual(response.status, 401)
   1205 
   1206         domain = urllib.parse.urlparse(base)[1]
   1207         self.http.add_credentials("joe", "password", domain)
   1208         (response, content) = self.http.request(uri, "GET")
   1209         self.assertEqual(response.status, 200)
   1210 
   1211         uri = urllib.parse.urljoin(base, "basic/file.txt")
   1212         (response, content) = self.http.request(uri, "GET")
   1213         self.assertEqual(response.status, 200)
   1214 
   1215     def testBasicAuthTwoDifferentCredentials(self):
   1216         # Test Basic Authentication with multiple sets of credentials
   1217         uri = urllib.parse.urljoin(base, "basic2/file.txt")
   1218         (response, content) = self.http.request(uri, "GET")
   1219         self.assertEqual(response.status, 401)
   1220 
   1221         uri = urllib.parse.urljoin(base, "basic2/")
   1222         (response, content) = self.http.request(uri, "GET")
   1223         self.assertEqual(response.status, 401)
   1224 
   1225         self.http.add_credentials("fred", "barney")
   1226         (response, content) = self.http.request(uri, "GET")
   1227         self.assertEqual(response.status, 200)
   1228 
   1229         uri = urllib.parse.urljoin(base, "basic2/file.txt")
   1230         (response, content) = self.http.request(uri, "GET")
   1231         self.assertEqual(response.status, 200)
   1232 
   1233     def testBasicAuthNested(self):
   1234         # Test Basic Authentication with resources
   1235         # that are nested
   1236         uri = urllib.parse.urljoin(base, "basic-nested/")
   1237         (response, content) = self.http.request(uri, "GET")
   1238         self.assertEqual(response.status, 401)
   1239 
   1240         uri = urllib.parse.urljoin(base, "basic-nested/subdir")
   1241         (response, content) = self.http.request(uri, "GET")
   1242         self.assertEqual(response.status, 401)
   1243 
   1244         # Now add in credentials one at a time and test.
   1245         self.http.add_credentials("joe", "password")
   1246 
   1247         uri = urllib.parse.urljoin(base, "basic-nested/")
   1248         (response, content) = self.http.request(uri, "GET")
   1249         self.assertEqual(response.status, 200)
   1250 
   1251         uri = urllib.parse.urljoin(base, "basic-nested/subdir")
   1252         (response, content) = self.http.request(uri, "GET")
   1253         self.assertEqual(response.status, 401)
   1254 
   1255         self.http.add_credentials("fred", "barney")
   1256 
   1257         uri = urllib.parse.urljoin(base, "basic-nested/")
   1258         (response, content) = self.http.request(uri, "GET")
   1259         self.assertEqual(response.status, 200)
   1260 
   1261         uri = urllib.parse.urljoin(base, "basic-nested/subdir")
   1262         (response, content) = self.http.request(uri, "GET")
   1263         self.assertEqual(response.status, 200)
   1264 
   1265     def testDigestAuth(self):
   1266         # Test that we support Digest Authentication
   1267         uri = urllib.parse.urljoin(base, "digest/")
   1268         (response, content) = self.http.request(uri, "GET")
   1269         self.assertEqual(response.status, 401)
   1270 
   1271         self.http.add_credentials("joe", "password")
   1272         (response, content) = self.http.request(uri, "GET")
   1273         self.assertEqual(response.status, 200)
   1274 
   1275         uri = urllib.parse.urljoin(base, "digest/file.txt")
   1276         (response, content) = self.http.request(uri, "GET")
   1277 
   1278     def testDigestAuthNextNonceAndNC(self):
   1279         # Test that if the server sets nextnonce that we reset
   1280         # the nonce count back to 1
   1281         uri = urllib.parse.urljoin(base, "digest/file.txt")
   1282         self.http.add_credentials("joe", "password")
   1283         (response, content) = self.http.request(
   1284             uri, "GET", headers={"cache-control": "no-cache"}
   1285         )
   1286         info = httplib2._parse_www_authenticate(response, "authentication-info")
   1287         self.assertEqual(response.status, 200)
   1288         (response, content) = self.http.request(
   1289             uri, "GET", headers={"cache-control": "no-cache"}
   1290         )
   1291         info2 = httplib2._parse_www_authenticate(response, "authentication-info")
   1292         self.assertEqual(response.status, 200)
   1293 
   1294         if "nextnonce" in info:
   1295             self.assertEqual(info2["nc"], 1)
   1296 
   1297     def testDigestAuthStale(self):
   1298         # Test that we can handle a nonce becoming stale
   1299         uri = urllib.parse.urljoin(base, "digest-expire/file.txt")
   1300         self.http.add_credentials("joe", "password")
   1301         (response, content) = self.http.request(
   1302             uri, "GET", headers={"cache-control": "no-cache"}
   1303         )
   1304         info = httplib2._parse_www_authenticate(response, "authentication-info")
   1305         self.assertEqual(response.status, 200)
   1306 
   1307         time.sleep(3)
   1308         # Sleep long enough that the nonce becomes stale
   1309 
   1310         (response, content) = self.http.request(
   1311             uri, "GET", headers={"cache-control": "no-cache"}
   1312         )
   1313         self.assertFalse(response.fromcache)
   1314         self.assertTrue(response._stale_digest)
   1315         info3 = httplib2._parse_www_authenticate(response, "authentication-info")
   1316         self.assertEqual(response.status, 200)
   1317 
   1318     def reflector(self, content):
   1319         return dict(
   1320             [
   1321                 tuple(x.split("=", 1))
   1322                 for x in content.decode("utf-8").strip().split("\n")
   1323             ]
   1324         )
   1325 
   1326     def testReflector(self):
   1327         uri = urllib.parse.urljoin(base, "reflector/reflector.cgi")
   1328         (response, content) = self.http.request(uri, "GET")
   1329         d = self.reflector(content)
   1330         self.assertTrue("HTTP_USER_AGENT" in d)
   1331 
   1332     def testConnectionClose(self):
   1333         uri = "http://www.google.com/"
   1334         (response, content) = self.http.request(uri, "GET")
   1335         for c in self.http.connections.values():
   1336             self.assertNotEqual(None, c.sock)
   1337         (response, content) = self.http.request(
   1338             uri, "GET", headers={"connection": "close"}
   1339         )
   1340         for c in self.http.connections.values():
   1341             self.assertEqual(None, c.sock)
   1342 
   1343     def testPickleHttp(self):
   1344         pickled_http = pickle.dumps(self.http)
   1345         new_http = pickle.loads(pickled_http)
   1346 
   1347         self.assertEqual(
   1348             sorted(new_http.__dict__.keys()), sorted(self.http.__dict__.keys())
   1349         )
   1350         for key in new_http.__dict__:
   1351             if key in ("certificates", "credentials"):
   1352                 self.assertEqual(
   1353                     new_http.__dict__[key].credentials,
   1354                     self.http.__dict__[key].credentials,
   1355                 )
   1356             elif key == "cache":
   1357                 self.assertEqual(
   1358                     new_http.__dict__[key].cache, self.http.__dict__[key].cache
   1359                 )
   1360             else:
   1361                 self.assertEqual(new_http.__dict__[key], self.http.__dict__[key])
   1362 
   1363     def testPickleHttpWithConnection(self):
   1364         self.http.request("http://bitworking.org", connection_type=_MyHTTPConnection)
   1365         pickled_http = pickle.dumps(self.http)
   1366         new_http = pickle.loads(pickled_http)
   1367 
   1368         self.assertEqual(list(self.http.connections.keys()), ["http:bitworking.org"])
   1369         self.assertEqual(new_http.connections, {})
   1370 
   1371     def testPickleCustomRequestHttp(self):
   1372         def dummy_request(*args, **kwargs):
   1373             return new_request(*args, **kwargs)
   1374 
   1375         dummy_request.dummy_attr = "dummy_value"
   1376 
   1377         self.http.request = dummy_request
   1378         pickled_http = pickle.dumps(self.http)
   1379         self.assertFalse(b"S'request'" in pickled_http)
   1380 
   1381 
   1382 try:
   1383     import memcache
   1384 
   1385     class HttpTestMemCached(HttpTest):
   1386         def setUp(self):
   1387             self.cache = memcache.Client(["127.0.0.1:11211"], debug=0)
   1388             # self.cache = memcache.Client(['10.0.0.4:11211'], debug=1)
   1389             self.http = httplib2.Http(self.cache)
   1390             self.cache.flush_all()
   1391             # Not exactly sure why the sleep is needed here, but
   1392             # if not present then some unit tests that rely on caching
   1393             # fail. Memcached seems to lose some sets immediately
   1394             # after a flush_all if the set is to a value that
   1395             # was previously cached. (Maybe the flush is handled async?)
   1396             time.sleep(1)
   1397             self.http.clear_credentials()
   1398 
   1399 
   1400 except:
   1401     pass
   1402 
   1403 # ------------------------------------------------------------------------
   1404 
   1405 
   1406 class HttpPrivateTest(unittest.TestCase):
   1407     def testParseCacheControl(self):
   1408         # Test that we can parse the Cache-Control header
   1409         self.assertEqual({}, httplib2._parse_cache_control({}))
   1410         self.assertEqual(
   1411             {"no-cache": 1},
   1412             httplib2._parse_cache_control({"cache-control": " no-cache"}),
   1413         )
   1414         cc = httplib2._parse_cache_control(
   1415             {"cache-control": " no-cache, max-age = 7200"}
   1416         )
   1417         self.assertEqual(cc["no-cache"], 1)
   1418         self.assertEqual(cc["max-age"], "7200")
   1419         cc = httplib2._parse_cache_control({"cache-control": " , "})
   1420         self.assertEqual(cc[""], 1)
   1421 
   1422         try:
   1423             cc = httplib2._parse_cache_control(
   1424                 {"cache-control": "Max-age=3600;post-check=1800,pre-check=3600"}
   1425             )
   1426             self.assertTrue("max-age" in cc)
   1427         except:
   1428             self.fail("Should not throw exception")
   1429 
   1430     def testNormalizeHeaders(self):
   1431         # Test that we normalize headers to lowercase
   1432         h = httplib2._normalize_headers({"Cache-Control": "no-cache", "Other": "Stuff"})
   1433         self.assertTrue("cache-control" in h)
   1434         self.assertTrue("other" in h)
   1435         self.assertEqual("Stuff", h["other"])
   1436 
   1437     def testConvertByteStr(self):
   1438         with self.assertRaises(TypeError):
   1439             httplib2._convert_byte_str(4)
   1440         self.assertEqual("Hello World", httplib2._convert_byte_str(b"Hello World"))
   1441         self.assertEqual("Bye World", httplib2._convert_byte_str("Bye World"))
   1442 
   1443     def testExpirationModelTransparent(self):
   1444         # Test that no-cache makes our request TRANSPARENT
   1445         response_headers = {"cache-control": "max-age=7200"}
   1446         request_headers = {"cache-control": "no-cache"}
   1447         self.assertEqual(
   1448             "TRANSPARENT",
   1449             httplib2._entry_disposition(response_headers, request_headers),
   1450         )
   1451 
   1452     def testMaxAgeNonNumeric(self):
   1453         # Test that no-cache makes our request TRANSPARENT
   1454         response_headers = {"cache-control": "max-age=fred, min-fresh=barney"}
   1455         request_headers = {}
   1456         self.assertEqual(
   1457             "STALE", httplib2._entry_disposition(response_headers, request_headers)
   1458         )
   1459 
   1460     def testExpirationModelNoCacheResponse(self):
   1461         # The date and expires point to an entry that should be
   1462         # FRESH, but the no-cache over-rides that.
   1463         now = time.time()
   1464         response_headers = {
   1465             "date": time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now)),
   1466             "expires": time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now + 4)),
   1467             "cache-control": "no-cache",
   1468         }
   1469         request_headers = {}
   1470         self.assertEqual(
   1471             "STALE", httplib2._entry_disposition(response_headers, request_headers)
   1472         )
   1473 
   1474     def testExpirationModelStaleRequestMustReval(self):
   1475         # must-revalidate forces STALE
   1476         self.assertEqual(
   1477             "STALE",
   1478             httplib2._entry_disposition({}, {"cache-control": "must-revalidate"}),
   1479         )
   1480 
   1481     def testExpirationModelStaleResponseMustReval(self):
   1482         # must-revalidate forces STALE
   1483         self.assertEqual(
   1484             "STALE",
   1485             httplib2._entry_disposition({"cache-control": "must-revalidate"}, {}),
   1486         )
   1487 
   1488     def testExpirationModelFresh(self):
   1489         response_headers = {
   1490             "date": time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime()),
   1491             "cache-control": "max-age=2",
   1492         }
   1493         request_headers = {}
   1494         self.assertEqual(
   1495             "FRESH", httplib2._entry_disposition(response_headers, request_headers)
   1496         )
   1497         time.sleep(3)
   1498         self.assertEqual(
   1499             "STALE", httplib2._entry_disposition(response_headers, request_headers)
   1500         )
   1501 
   1502     def testExpirationMaxAge0(self):
   1503         response_headers = {
   1504             "date": time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime()),
   1505             "cache-control": "max-age=0",
   1506         }
   1507         request_headers = {}
   1508         self.assertEqual(
   1509             "STALE", httplib2._entry_disposition(response_headers, request_headers)
   1510         )
   1511 
   1512     def testExpirationModelDateAndExpires(self):
   1513         now = time.time()
   1514         response_headers = {
   1515             "date": time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now)),
   1516             "expires": time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now + 2)),
   1517         }
   1518         request_headers = {}
   1519         self.assertEqual(
   1520             "FRESH", httplib2._entry_disposition(response_headers, request_headers)
   1521         )
   1522         time.sleep(3)
   1523         self.assertEqual(
   1524             "STALE", httplib2._entry_disposition(response_headers, request_headers)
   1525         )
   1526 
   1527     def testExpiresZero(self):
   1528         now = time.time()
   1529         response_headers = {
   1530             "date": time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now)),
   1531             "expires": "0",
   1532         }
   1533         request_headers = {}
   1534         self.assertEqual(
   1535             "STALE", httplib2._entry_disposition(response_headers, request_headers)
   1536         )
   1537 
   1538     def testExpirationModelDateOnly(self):
   1539         now = time.time()
   1540         response_headers = {
   1541             "date": time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now + 3))
   1542         }
   1543         request_headers = {}
   1544         self.assertEqual(
   1545             "STALE", httplib2._entry_disposition(response_headers, request_headers)
   1546         )
   1547 
   1548     def testExpirationModelOnlyIfCached(self):
   1549         response_headers = {}
   1550         request_headers = {"cache-control": "only-if-cached"}
   1551         self.assertEqual(
   1552             "FRESH", httplib2._entry_disposition(response_headers, request_headers)
   1553         )
   1554 
   1555     def testExpirationModelMaxAgeBoth(self):
   1556         now = time.time()
   1557         response_headers = {
   1558             "date": time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now)),
   1559             "cache-control": "max-age=2",
   1560         }
   1561         request_headers = {"cache-control": "max-age=0"}
   1562         self.assertEqual(
   1563             "STALE", httplib2._entry_disposition(response_headers, request_headers)
   1564         )
   1565 
   1566     def testExpirationModelDateAndExpiresMinFresh1(self):
   1567         now = time.time()
   1568         response_headers = {
   1569             "date": time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now)),
   1570             "expires": time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now + 2)),
   1571         }
   1572         request_headers = {"cache-control": "min-fresh=2"}
   1573         self.assertEqual(
   1574             "STALE", httplib2._entry_disposition(response_headers, request_headers)
   1575         )
   1576 
   1577     def testExpirationModelDateAndExpiresMinFresh2(self):
   1578         now = time.time()
   1579         response_headers = {
   1580             "date": time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now)),
   1581             "expires": time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now + 4)),
   1582         }
   1583         request_headers = {"cache-control": "min-fresh=2"}
   1584         self.assertEqual(
   1585             "FRESH", httplib2._entry_disposition(response_headers, request_headers)
   1586         )
   1587 
   1588     def testParseWWWAuthenticateEmpty(self):
   1589         res = httplib2._parse_www_authenticate({})
   1590         self.assertEqual(len(list(res.keys())), 0)
   1591 
   1592     def testParseWWWAuthenticate(self):
   1593         # different uses of spaces around commas
   1594         res = httplib2._parse_www_authenticate(
   1595             {
   1596                 "www-authenticate": 'Test realm="test realm" , foo=foo ,bar="bar", baz=baz,qux=qux'
   1597             }
   1598         )
   1599         self.assertEqual(len(list(res.keys())), 1)
   1600         self.assertEqual(len(list(res["test"].keys())), 5)
   1601 
   1602         # tokens with non-alphanum
   1603         res = httplib2._parse_www_authenticate(
   1604             {"www-authenticate": 'T*!%#st realm=to*!%#en, to*!%#en="quoted string"'}
   1605         )
   1606         self.assertEqual(len(list(res.keys())), 1)
   1607         self.assertEqual(len(list(res["t*!%#st"].keys())), 2)
   1608 
   1609         # quoted string with quoted pairs
   1610         res = httplib2._parse_www_authenticate(
   1611             {"www-authenticate": 'Test realm="a \\"test\\" realm"'}
   1612         )
   1613         self.assertEqual(len(list(res.keys())), 1)
   1614         self.assertEqual(res["test"]["realm"], 'a "test" realm')
   1615 
   1616     def testParseWWWAuthenticateStrict(self):
   1617         httplib2.USE_WWW_AUTH_STRICT_PARSING = 1
   1618         self.testParseWWWAuthenticate()
   1619         httplib2.USE_WWW_AUTH_STRICT_PARSING = 0
   1620 
   1621     def testParseWWWAuthenticateBasic(self):
   1622         res = httplib2._parse_www_authenticate({"www-authenticate": 'Basic realm="me"'})
   1623         basic = res["basic"]
   1624         self.assertEqual("me", basic["realm"])
   1625 
   1626         res = httplib2._parse_www_authenticate(
   1627             {"www-authenticate": 'Basic realm="me", algorithm="MD5"'}
   1628         )
   1629         basic = res["basic"]
   1630         self.assertEqual("me", basic["realm"])
   1631         self.assertEqual("MD5", basic["algorithm"])
   1632 
   1633         res = httplib2._parse_www_authenticate(
   1634             {"www-authenticate": 'Basic realm="me", algorithm=MD5'}
   1635         )
   1636         basic = res["basic"]
   1637         self.assertEqual("me", basic["realm"])
   1638         self.assertEqual("MD5", basic["algorithm"])
   1639 
   1640     def testParseWWWAuthenticateBasic2(self):
   1641         res = httplib2._parse_www_authenticate(
   1642             {"www-authenticate": 'Basic realm="me",other="fred" '}
   1643         )
   1644         basic = res["basic"]
   1645         self.assertEqual("me", basic["realm"])
   1646         self.assertEqual("fred", basic["other"])
   1647 
   1648     def testParseWWWAuthenticateBasic3(self):
   1649         res = httplib2._parse_www_authenticate(
   1650             {"www-authenticate": 'Basic REAlm="me" '}
   1651         )
   1652         basic = res["basic"]
   1653         self.assertEqual("me", basic["realm"])
   1654 
   1655     def testParseWWWAuthenticateDigest(self):
   1656         res = httplib2._parse_www_authenticate(
   1657             {
   1658                 "www-authenticate": 'Digest realm="testrealm (at] host.com", qop="auth,auth-int", nonce="dcd98b7102dd2f0e8b11d0f600bfb0c093", opaque="5ccc069c403ebaf9f0171e9517f40e41"'
   1659             }
   1660         )
   1661         digest = res["digest"]
   1662         self.assertEqual("testrealm (at] host.com", digest["realm"])
   1663         self.assertEqual("auth,auth-int", digest["qop"])
   1664 
   1665     def testParseWWWAuthenticateMultiple(self):
   1666         res = httplib2._parse_www_authenticate(
   1667             {
   1668                 "www-authenticate": 'Digest realm="testrealm (at] host.com", qop="auth,auth-int", nonce="dcd98b7102dd2f0e8b11d0f600bfb0c093", opaque="5ccc069c403ebaf9f0171e9517f40e41" Basic REAlm="me" '
   1669             }
   1670         )
   1671         digest = res["digest"]
   1672         self.assertEqual("testrealm (at] host.com", digest["realm"])
   1673         self.assertEqual("auth,auth-int", digest["qop"])
   1674         self.assertEqual("dcd98b7102dd2f0e8b11d0f600bfb0c093", digest["nonce"])
   1675         self.assertEqual("5ccc069c403ebaf9f0171e9517f40e41", digest["opaque"])
   1676         basic = res["basic"]
   1677         self.assertEqual("me", basic["realm"])
   1678 
   1679     def testParseWWWAuthenticateMultiple2(self):
   1680         # Handle an added comma between challenges, which might get thrown in if the challenges were
   1681         # originally sent in separate www-authenticate headers.
   1682         res = httplib2._parse_www_authenticate(
   1683             {
   1684                 "www-authenticate": 'Digest realm="testrealm (at] host.com", qop="auth,auth-int", nonce="dcd98b7102dd2f0e8b11d0f600bfb0c093", opaque="5ccc069c403ebaf9f0171e9517f40e41", Basic REAlm="me" '
   1685             }
   1686         )
   1687         digest = res["digest"]
   1688         self.assertEqual("testrealm (at] host.com", digest["realm"])
   1689         self.assertEqual("auth,auth-int", digest["qop"])
   1690         self.assertEqual("dcd98b7102dd2f0e8b11d0f600bfb0c093", digest["nonce"])
   1691         self.assertEqual("5ccc069c403ebaf9f0171e9517f40e41", digest["opaque"])
   1692         basic = res["basic"]
   1693         self.assertEqual("me", basic["realm"])
   1694 
   1695     def testParseWWWAuthenticateMultiple3(self):
   1696         # Handle an added comma between challenges, which might get thrown in if the challenges were
   1697         # originally sent in separate www-authenticate headers.
   1698         res = httplib2._parse_www_authenticate(
   1699             {
   1700                 "www-authenticate": 'Digest realm="testrealm (at] host.com", qop="auth,auth-int", nonce="dcd98b7102dd2f0e8b11d0f600bfb0c093", opaque="5ccc069c403ebaf9f0171e9517f40e41", Basic REAlm="me", WSSE realm="foo", profile="UsernameToken"'
   1701             }
   1702         )
   1703         digest = res["digest"]
   1704         self.assertEqual("testrealm (at] host.com", digest["realm"])
   1705         self.assertEqual("auth,auth-int", digest["qop"])
   1706         self.assertEqual("dcd98b7102dd2f0e8b11d0f600bfb0c093", digest["nonce"])
   1707         self.assertEqual("5ccc069c403ebaf9f0171e9517f40e41", digest["opaque"])
   1708         basic = res["basic"]
   1709         self.assertEqual("me", basic["realm"])
   1710         wsse = res["wsse"]
   1711         self.assertEqual("foo", wsse["realm"])
   1712         self.assertEqual("UsernameToken", wsse["profile"])
   1713 
   1714     def testParseWWWAuthenticateMultiple4(self):
   1715         res = httplib2._parse_www_authenticate(
   1716             {
   1717                 "www-authenticate": 'Digest realm="test-real.m (at] host.com", qop \t=\t"\tauth,auth-int", nonce="(*)&^&$%#",opaque="5ccc069c403ebaf9f0171e9517f40e41", Basic REAlm="me", WSSE realm="foo", profile="UsernameToken"'
   1718             }
   1719         )
   1720         digest = res["digest"]
   1721         self.assertEqual("test-real.m (at] host.com", digest["realm"])
   1722         self.assertEqual("\tauth,auth-int", digest["qop"])
   1723         self.assertEqual("(*)&^&$%#", digest["nonce"])
   1724 
   1725     def testParseWWWAuthenticateMoreQuoteCombos(self):
   1726         res = httplib2._parse_www_authenticate(
   1727             {
   1728                 "www-authenticate": 'Digest realm="myrealm", nonce="Ygk86AsKBAA=3516200d37f9a3230352fde99977bd6d472d4306", algorithm=MD5, qop="auth", stale=true'
   1729             }
   1730         )
   1731         digest = res["digest"]
   1732         self.assertEqual("myrealm", digest["realm"])
   1733 
   1734     def testParseWWWAuthenticateMalformed(self):
   1735         try:
   1736             res = httplib2._parse_www_authenticate(
   1737                 {
   1738                     "www-authenticate": 'OAuth "Facebook Platform" "invalid_token" "Invalid OAuth access token."'
   1739                 }
   1740             )
   1741             self.fail("should raise an exception")
   1742         except httplib2.MalformedHeader:
   1743             pass
   1744 
   1745     def testDigestObject(self):
   1746         credentials = ("joe", "password")
   1747         host = None
   1748         request_uri = "/projects/httplib2/test/digest/"
   1749         headers = {}
   1750         response = {
   1751             "www-authenticate": 'Digest realm="myrealm", '
   1752             'nonce="Ygk86AsKBAA=3516200d37f9a3230352fde99977bd6d472d4306", '
   1753             'algorithm=MD5, qop="auth"'
   1754         }
   1755         content = b""
   1756 
   1757         d = httplib2.DigestAuthentication(
   1758             credentials, host, request_uri, headers, response, content, None
   1759         )
   1760         d.request("GET", request_uri, headers, content, cnonce="33033375ec278a46")
   1761         our_request = "authorization: %s" % headers["authorization"]
   1762         working_request = (
   1763             'authorization: Digest username="joe", realm="myrealm", '
   1764             'nonce="Ygk86AsKBAA=3516200d37f9a3230352fde99977bd6d472d4306",'
   1765             ' uri="/projects/httplib2/test/digest/", algorithm=MD5, '
   1766             'response="97ed129401f7cdc60e5db58a80f3ea8b", qop=auth, '
   1767             'nc=00000001, cnonce="33033375ec278a46"'
   1768         )
   1769         self.assertEqual(our_request, working_request)
   1770 
   1771     def testDigestObjectWithOpaque(self):
   1772         credentials = ("joe", "password")
   1773         host = None
   1774         request_uri = "/projects/httplib2/test/digest/"
   1775         headers = {}
   1776         response = {
   1777             "www-authenticate": 'Digest realm="myrealm", '
   1778             'nonce="Ygk86AsKBAA=3516200d37f9a3230352fde99977bd6d472d4306", '
   1779             'algorithm=MD5, qop="auth", opaque="atestopaque"'
   1780         }
   1781         content = ""
   1782 
   1783         d = httplib2.DigestAuthentication(
   1784             credentials, host, request_uri, headers, response, content, None
   1785         )
   1786         d.request("GET", request_uri, headers, content, cnonce="33033375ec278a46")
   1787         our_request = "authorization: %s" % headers["authorization"]
   1788         working_request = (
   1789             'authorization: Digest username="joe", realm="myrealm", '
   1790             'nonce="Ygk86AsKBAA=3516200d37f9a3230352fde99977bd6d472d4306",'
   1791             ' uri="/projects/httplib2/test/digest/", algorithm=MD5, '
   1792             'response="97ed129401f7cdc60e5db58a80f3ea8b", qop=auth, '
   1793             'nc=00000001, cnonce="33033375ec278a46", '
   1794             'opaque="atestopaque"'
   1795         )
   1796         self.assertEqual(our_request, working_request)
   1797 
   1798     def testDigestObjectStale(self):
   1799         credentials = ("joe", "password")
   1800         host = None
   1801         request_uri = "/projects/httplib2/test/digest/"
   1802         headers = {}
   1803         response = httplib2.Response({})
   1804         response["www-authenticate"] = (
   1805             'Digest realm="myrealm", '
   1806             'nonce="Ygk86AsKBAA=3516200d37f9a3230352fde99977bd6d472d4306",'
   1807             ' algorithm=MD5, qop="auth", stale=true'
   1808         )
   1809         response.status = 401
   1810         content = b""
   1811         d = httplib2.DigestAuthentication(
   1812             credentials, host, request_uri, headers, response, content, None
   1813         )
   1814         # Returns true to force a retry
   1815         self.assertTrue(d.response(response, content))
   1816 
   1817     def testDigestObjectAuthInfo(self):
   1818         credentials = ("joe", "password")
   1819         host = None
   1820         request_uri = "/projects/httplib2/test/digest/"
   1821         headers = {}
   1822         response = httplib2.Response({})
   1823         response["www-authenticate"] = (
   1824             'Digest realm="myrealm", '
   1825             'nonce="Ygk86AsKBAA=3516200d37f9a3230352fde99977bd6d472d4306",'
   1826             ' algorithm=MD5, qop="auth", stale=true'
   1827         )
   1828         response["authentication-info"] = 'nextnonce="fred"'
   1829         content = b""
   1830         d = httplib2.DigestAuthentication(
   1831             credentials, host, request_uri, headers, response, content, None
   1832         )
   1833         # Returns true to force a retry
   1834         self.assertFalse(d.response(response, content))
   1835         self.assertEqual("fred", d.challenge["nonce"])
   1836         self.assertEqual(1, d.challenge["nc"])
   1837 
   1838     def testWsseAlgorithm(self):
   1839         digest = httplib2._wsse_username_token(
   1840             "d36e316282959a9ed4c89851497a717f", "2003-12-15T14:43:07Z", "taadtaadpstcsm"
   1841         )
   1842         expected = b"quR/EWLAV4xLf9Zqyw4pDmfV9OY="
   1843         self.assertEqual(expected, digest)
   1844 
   1845     def testEnd2End(self):
   1846         # one end to end header
   1847         response = {"content-type": "application/atom+xml", "te": "deflate"}
   1848         end2end = httplib2._get_end2end_headers(response)
   1849         self.assertTrue("content-type" in end2end)
   1850         self.assertTrue("te" not in end2end)
   1851         self.assertTrue("connection" not in end2end)
   1852 
   1853         # one end to end header that gets eliminated
   1854         response = {
   1855             "connection": "content-type",
   1856             "content-type": "application/atom+xml",
   1857             "te": "deflate",
   1858         }
   1859         end2end = httplib2._get_end2end_headers(response)
   1860         self.assertTrue("content-type" not in end2end)
   1861         self.assertTrue("te" not in end2end)
   1862         self.assertTrue("connection" not in end2end)
   1863 
   1864         # Degenerate case of no headers
   1865         response = {}
   1866         end2end = httplib2._get_end2end_headers(response)
   1867         self.assertEqual(0, len(end2end))
   1868 
   1869         # Degenerate case of connection referrring to a header not passed in
   1870         response = {"connection": "content-type"}
   1871         end2end = httplib2._get_end2end_headers(response)
   1872         self.assertEqual(0, len(end2end))
   1873 
   1874 
   1875 class TestProxyInfo(unittest.TestCase):
   1876     def setUp(self):
   1877         self.orig_env = dict(os.environ)
   1878 
   1879     def tearDown(self):
   1880         os.environ.clear()
   1881         os.environ.update(self.orig_env)
   1882 
   1883     def test_from_url(self):
   1884         pi = httplib2.proxy_info_from_url("http://myproxy.example.com")
   1885         self.assertEqual(pi.proxy_host, "myproxy.example.com")
   1886         self.assertEqual(pi.proxy_port, 80)
   1887         self.assertEqual(pi.proxy_user, None)
   1888 
   1889     def test_from_url_ident(self):
   1890         pi = httplib2.proxy_info_from_url("http://zoidberg:fish@someproxy:99")
   1891         self.assertEqual(pi.proxy_host, "someproxy")
   1892         self.assertEqual(pi.proxy_port, 99)
   1893         self.assertEqual(pi.proxy_user, "zoidberg")
   1894         self.assertEqual(pi.proxy_pass, "fish")
   1895 
   1896     def test_from_env(self):
   1897         os.environ["http_proxy"] = "http://myproxy.example.com:8080"
   1898         pi = httplib2.proxy_info_from_environment()
   1899         self.assertEqual(pi.proxy_host, "myproxy.example.com")
   1900         self.assertEqual(pi.proxy_port, 8080)
   1901 
   1902     def test_from_env_no_proxy(self):
   1903         os.environ["http_proxy"] = "http://myproxy.example.com:80"
   1904         os.environ["https_proxy"] = "http://myproxy.example.com:81"
   1905         pi = httplib2.proxy_info_from_environment("https")
   1906         self.assertEqual(pi.proxy_host, "myproxy.example.com")
   1907         self.assertEqual(pi.proxy_port, 81)
   1908 
   1909     def test_from_env_none(self):
   1910         os.environ.clear()
   1911         pi = httplib2.proxy_info_from_environment()
   1912         self.assertEqual(pi, None)
   1913 
   1914     def test_proxy_headers(self):
   1915         headers = {"key0": "val0", "key1": "val1"}
   1916         pi = httplib2.ProxyInfo(
   1917             httplib2.socks.PROXY_TYPE_HTTP, "localhost", 1234, proxy_headers=headers
   1918         )
   1919         self.assertEqual(pi.proxy_headers, headers)
   1920 
   1921     # regression: ensure that httplib2.HTTPConnectionWithTimeout initializes when proxy_info is not supplied
   1922     def test_proxy_init(self):
   1923         connection = httplib2.HTTPConnectionWithTimeout("www.google.com", 80)
   1924         connection.request("GET", "/")
   1925         connection.close()
   1926 
   1927 
   1928 if __name__ == "__main__":
   1929     unittest.main()
   1930