Home | History | Annotate | Download | only in test
      1 import unittest
      2 from test import test_support
      3 from test import test_urllib
      4 
      5 import os
      6 import socket
      7 import StringIO
      8 
      9 import urllib2
     10 from urllib2 import Request, OpenerDirector, AbstractDigestAuthHandler
     11 import httplib
     12 
     13 try:
     14     import ssl
     15 except ImportError:
     16     ssl = None
     17 
     18 # XXX
     19 # Request
     20 # CacheFTPHandler (hard to write)
     21 # parse_keqv_list, parse_http_list, HTTPDigestAuthHandler
     22 
     23 class TrivialTests(unittest.TestCase):
     24     def test_trivial(self):
     25         # A couple trivial tests
     26 
     27         self.assertRaises(ValueError, urllib2.urlopen, 'bogus url')
     28 
     29         # XXX Name hacking to get this to work on Windows.
     30         fname = os.path.abspath(urllib2.__file__).replace(os.sep, '/')
     31 
     32         # And more hacking to get it to work on MacOS. This assumes
     33         # urllib.pathname2url works, unfortunately...
     34         if os.name == 'riscos':
     35             import string
     36             fname = os.expand(fname)
     37             fname = fname.translate(string.maketrans("/.", "./"))
     38 
     39         if os.name == 'nt':
     40             file_url = "file:///%s" % fname
     41         else:
     42             file_url = "file://%s" % fname
     43 
     44         f = urllib2.urlopen(file_url)
     45 
     46         buf = f.read()
     47         f.close()
     48 
     49     def test_parse_http_list(self):
     50         tests = [('a,b,c', ['a', 'b', 'c']),
     51                  ('path"o,l"og"i"cal, example', ['path"o,l"og"i"cal', 'example']),
     52                  ('a, b, "c", "d", "e,f", g, h', ['a', 'b', '"c"', '"d"', '"e,f"', 'g', 'h']),
     53                  ('a="b\\"c", d="e\\,f", g="h\\\\i"', ['a="b"c"', 'd="e,f"', 'g="h\\i"'])]
     54         for string, list in tests:
     55             self.assertEqual(urllib2.parse_http_list(string), list)
     56 
     57     @unittest.skipUnless(ssl, "ssl module required")
     58     def test_cafile_and_context(self):
     59         context = ssl.create_default_context()
     60         with self.assertRaises(ValueError):
     61             urllib2.urlopen(
     62                 "https://localhost", cafile="/nonexistent/path", context=context
     63             )
     64 
     65 
# Doctest carrier: this function has no body other than its docstring, and the
# ">>>" examples inside that docstring are the actual test.
# NOTE(review): the code that collects and runs these doctests is not visible
# in this part of the file -- confirm how they are executed.
def test_request_headers_dict():
    """
    The Request.headers dictionary is not a documented interface.  It should
    stay that way, because the complete set of headers are only accessible
    through the .get_header(), .has_header(), .header_items() interface.
    However, .headers pre-dates those methods, and so real code will be using
    the dictionary.

    The introduction in 2.4 of those methods was a mistake for the same reason:
    code that previously saw all (urllib2 user)-provided headers in .headers
    now sees only a subset (and the function interface is ugly and incomplete).
    A better change would have been to replace .headers dict with a dict
    subclass (or UserDict.DictMixin instance?)  that preserved the .headers
    interface and also provided access to the "unredirected" headers.  It's
    probably too late to fix that, though.


    Check .capitalize() case normalization:

    >>> url = "http://example.com"
    >>> Request(url, headers={"Spam-eggs": "blah"}).headers["Spam-eggs"]
    'blah'
    >>> Request(url, headers={"spam-EggS": "blah"}).headers["Spam-eggs"]
    'blah'

    Currently, Request(url, "Spam-eggs").headers["Spam-Eggs"] raises KeyError,
    but that could be changed in future.

    """
     95 
# Doctest carrier: the docstring's ">>>" examples exercise Request.has_header(),
# header_items(), add_header() and get_header(); the function itself has no
# executable body.
# NOTE(review): the doctest runner is not visible in this part of the file.
def test_request_headers_methods():
    """
    Note the case normalization of header names here, to .capitalize()-case.
    This should be preserved for backwards-compatibility.  (In the HTTP case,
    normalization to .title()-case is done by urllib2 before sending headers to
    httplib).

    >>> url = "http://example.com"
    >>> r = Request(url, headers={"Spam-eggs": "blah"})
    >>> r.has_header("Spam-eggs")
    True
    >>> r.header_items()
    [('Spam-eggs', 'blah')]
    >>> r.add_header("Foo-Bar", "baz")
    >>> items = r.header_items()
    >>> items.sort()
    >>> items
    [('Foo-bar', 'baz'), ('Spam-eggs', 'blah')]

    Note that e.g. r.has_header("spam-EggS") is currently False, and
    r.get_header("spam-EggS") returns None, but that could be changed in
    future.

    >>> r.has_header("Not-there")
    False
    >>> print r.get_header("Not-there")
    None
    >>> r.get_header("Not-there", "default")
    'default'

    """
    127 
    128 
# Doctest carrier for urllib2.HTTPPasswordMgr: the docstring registers several
# credentials with add_password() and then checks find_user_password() lookups.
# NOTE(review): the function declares a `self` parameter although it is defined
# at module level; nothing in the body uses it.  The doctest runner is not
# visible in this part of the file.
def test_password_manager(self):
    """
    >>> mgr = urllib2.HTTPPasswordMgr()
    >>> add = mgr.add_password
    >>> add("Some Realm", "http://example.com/", "joe", "password")
    >>> add("Some Realm", "http://example.com/ni", "ni", "ni")
    >>> add("c", "http://example.com/foo", "foo", "ni")
    >>> add("c", "http://example.com/bar", "bar", "nini")
    >>> add("b", "http://example.com/", "first", "blah")
    >>> add("b", "http://example.com/", "second", "spam")
    >>> add("a", "http://example.com", "1", "a")
    >>> add("Some Realm", "http://c.example.com:3128", "3", "c")
    >>> add("Some Realm", "d.example.com", "4", "d")
    >>> add("Some Realm", "e.example.com:3128", "5", "e")

    >>> mgr.find_user_password("Some Realm", "example.com")
    ('joe', 'password')
    >>> mgr.find_user_password("Some Realm", "http://example.com")
    ('joe', 'password')
    >>> mgr.find_user_password("Some Realm", "http://example.com/")
    ('joe', 'password')
    >>> mgr.find_user_password("Some Realm", "http://example.com/spam")
    ('joe', 'password')
    >>> mgr.find_user_password("Some Realm", "http://example.com/spam/spam")
    ('joe', 'password')
    >>> mgr.find_user_password("c", "http://example.com/foo")
    ('foo', 'ni')
    >>> mgr.find_user_password("c", "http://example.com/bar")
    ('bar', 'nini')

    Actually, this is really undefined ATM
##     Currently, we use the highest-level path where more than one match:

##     >>> mgr.find_user_password("Some Realm", "http://example.com/ni")
##     ('joe', 'password')

    Use latest add_password() in case of conflict:

    >>> mgr.find_user_password("b", "http://example.com/")
    ('second', 'spam')

    No special relationship between a.example.com and example.com:

    >>> mgr.find_user_password("a", "http://example.com/")
    ('1', 'a')
    >>> mgr.find_user_password("a", "http://a.example.com/")
    (None, None)

    Ports:

    >>> mgr.find_user_password("Some Realm", "c.example.com")
    (None, None)
    >>> mgr.find_user_password("Some Realm", "c.example.com:3128")
    ('3', 'c')
    >>> mgr.find_user_password("Some Realm", "http://c.example.com:3128")
    ('3', 'c')
    >>> mgr.find_user_password("Some Realm", "d.example.com")
    ('4', 'd')
    >>> mgr.find_user_password("Some Realm", "e.example.com:3128")
    ('5', 'e')

    """
    pass
    192 
    193 
# Doctest carrier for urllib2.HTTPPasswordMgr default-port handling: the
# docstring checks which lookups match when the scheme and/or port is omitted.
# NOTE(review): the function declares a `self` parameter although it is defined
# at module level; nothing in the body uses it.  The doctest runner is not
# visible in this part of the file.
def test_password_manager_default_port(self):
    """
    >>> mgr = urllib2.HTTPPasswordMgr()
    >>> add = mgr.add_password

    The point to note here is that we can't guess the default port if there's
    no scheme.  This applies to both add_password and find_user_password.

    >>> add("f", "http://g.example.com:80", "10", "j")
    >>> add("g", "http://h.example.com", "11", "k")
    >>> add("h", "i.example.com:80", "12", "l")
    >>> add("i", "j.example.com", "13", "m")
    >>> mgr.find_user_password("f", "g.example.com:100")
    (None, None)
    >>> mgr.find_user_password("f", "g.example.com:80")
    ('10', 'j')
    >>> mgr.find_user_password("f", "g.example.com")
    (None, None)
    >>> mgr.find_user_password("f", "http://g.example.com:100")
    (None, None)
    >>> mgr.find_user_password("f", "http://g.example.com:80")
    ('10', 'j')
    >>> mgr.find_user_password("f", "http://g.example.com")
    ('10', 'j')
    >>> mgr.find_user_password("g", "h.example.com")
    ('11', 'k')
    >>> mgr.find_user_password("g", "h.example.com:80")
    ('11', 'k')
    >>> mgr.find_user_password("g", "http://h.example.com:80")
    ('11', 'k')
    >>> mgr.find_user_password("h", "i.example.com")
    (None, None)
    >>> mgr.find_user_password("h", "i.example.com:80")
    ('12', 'l')
    >>> mgr.find_user_password("h", "http://i.example.com:80")
    ('12', 'l')
    >>> mgr.find_user_password("i", "j.example.com")
    ('13', 'm')
    >>> mgr.find_user_password("i", "j.example.com:80")
    (None, None)
    >>> mgr.find_user_password("i", "http://j.example.com")
    ('13', 'm')
    >>> mgr.find_user_password("i", "http://j.example.com:80")
    (None, None)

    """
    240 
    241 class MockOpener:
    242     addheaders = []
    243     def open(self, req, data=None,timeout=socket._GLOBAL_DEFAULT_TIMEOUT):
    244         self.req, self.data, self.timeout  = req, data, timeout
    245     def error(self, proto, *args):
    246         self.proto, self.args = proto, args
    247 
    248 class MockFile:
    249     def read(self, count=None): pass
    250     def readline(self, count=None): pass
    251     def close(self): pass
    252 
    253 class MockHeaders(dict):
    254     def getheaders(self, name):
    255         return self.values()
    256 
    257 class MockResponse(StringIO.StringIO):
    258     def __init__(self, code, msg, headers, data, url=None):
    259         StringIO.StringIO.__init__(self, data)
    260         self.code, self.msg, self.headers, self.url = code, msg, headers, url
    261     def info(self):
    262         return self.headers
    263     def geturl(self):
    264         return self.url
    265 
    266 class MockCookieJar:
    267     def add_cookie_header(self, request):
    268         self.ach_req = request
    269     def extract_cookies(self, response, request):
    270         self.ec_req, self.ec_r = request, response
    271 
    272 class FakeMethod:
    273     def __init__(self, meth_name, action, handle):
    274         self.meth_name = meth_name
    275         self.handle = handle
    276         self.action = action
    277     def __call__(self, *args):
    278         return self.handle(self.meth_name, self.action, *args)
    279 
    280 class MockHTTPResponse:
    281     def __init__(self, fp, msg, status, reason):
    282         self.fp = fp
    283         self.msg = msg
    284         self.status = status
    285         self.reason = reason
    286     def read(self):
    287         return ''
    288 
    289 class MockHTTPClass:
    290     def __init__(self):
    291         self.req_headers = []
    292         self.data = None
    293         self.raise_on_endheaders = False
    294         self._tunnel_headers = {}
    295 
    296     def __call__(self, host, timeout=socket._GLOBAL_DEFAULT_TIMEOUT):
    297         self.host = host
    298         self.timeout = timeout
    299         return self
    300 
    301     def set_debuglevel(self, level):
    302         self.level = level
    303 
    304     def set_tunnel(self, host, port=None, headers=None):
    305         self._tunnel_host = host
    306         self._tunnel_port = port
    307         if headers:
    308             self._tunnel_headers = headers
    309         else:
    310             self._tunnel_headers.clear()
    311 
    312     def request(self, method, url, body=None, headers=None):
    313         self.method = method
    314         self.selector = url
    315         if headers is not None:
    316             self.req_headers += headers.items()
    317         self.req_headers.sort()
    318         if body:
    319             self.data = body
    320         if self.raise_on_endheaders:
    321             import socket
    322             raise socket.error()
    323 
    324     def getresponse(self):
    325         return MockHTTPResponse(MockFile(), {}, 200, "OK")
    326 
    327     def close(self):
    328         pass
    329 
    330 class MockHandler:
    331     # useful for testing handler machinery
    332     # see add_ordered_mock_handlers() docstring
    333     handler_order = 500
    334     def __init__(self, methods):
    335         self._define_methods(methods)
    336     def _define_methods(self, methods):
    337         for spec in methods:
    338             if len(spec) == 2: name, action = spec
    339             else: name, action = spec, None
    340             meth = FakeMethod(name, action, self.handle)
    341             setattr(self.__class__, name, meth)
    342     def handle(self, fn_name, action, *args, **kwds):
    343         self.parent.calls.append((self, fn_name, args, kwds))
    344         if action is None:
    345             return None
    346         elif action == "return self":
    347             return self
    348         elif action == "return response":
    349             res = MockResponse(200, "OK", {}, "")
    350             return res
    351         elif action == "return request":
    352             return Request("http://blah/")
    353         elif action.startswith("error"):
    354             code = action[action.rfind(" ")+1:]
    355             try:
    356                 code = int(code)
    357             except ValueError:
    358                 pass
    359             res = MockResponse(200, "OK", {}, "")
    360             return self.parent.error("http", args[0], res, code, "", {})
    361         elif action == "raise":
    362             raise urllib2.URLError("blah")
    363         assert False
    364     def close(self): pass
    365     def add_parent(self, parent):
    366         self.parent = parent
    367         self.parent.calls = []
    368     def __lt__(self, other):
    369         if not hasattr(other, "handler_order"):
    370             # No handler_order, leave in original order.  Yuck.
    371             return True
    372         return self.handler_order < other.handler_order
    373 
    374 def add_ordered_mock_handlers(opener, meth_spec):
    375     """Create MockHandlers and add them to an OpenerDirector.
    376 
    377     meth_spec: list of lists of tuples and strings defining methods to define
    378     on handlers.  eg:
    379 
    380     [["http_error", "ftp_open"], ["http_open"]]
    381 
    382     defines methods .http_error() and .ftp_open() on one handler, and
    383     .http_open() on another.  These methods just record their arguments and
    384     return None.  Using a tuple instead of a string causes the method to
    385     perform some action (see MockHandler.handle()), eg:
    386 
    387     [["http_error"], [("http_open", "return request")]]
    388 
    389     defines .http_error() on one handler (which simply returns None), and
    390     .http_open() on another handler, which returns a Request object.
    391 
    392     """
    393     handlers = []
    394     count = 0
    395     for meths in meth_spec:
    396         class MockHandlerSubclass(MockHandler): pass
    397         h = MockHandlerSubclass(meths)
    398         h.handler_order += count
    399         h.add_parent(opener)
    400         count = count + 1
    401         handlers.append(h)
    402         opener.add_handler(h)
    403     return handlers
    404 
    405 def build_test_opener(*handler_instances):
    406     opener = OpenerDirector()
    407     for h in handler_instances:
    408         opener.add_handler(h)
    409     return opener
    410 
    411 class MockHTTPHandler(urllib2.BaseHandler):
    412     # useful for testing redirections and auth
    413     # sends supplied headers and code as first response
    414     # sends 200 OK as second response
    415     def __init__(self, code, headers):
    416         self.code = code
    417         self.headers = headers
    418         self.reset()
    419     def reset(self):
    420         self._count = 0
    421         self.requests = []
    422     def http_open(self, req):
    423         import mimetools, copy
    424         from StringIO import StringIO
    425         self.requests.append(copy.deepcopy(req))
    426         if self._count == 0:
    427             self._count = self._count + 1
    428             name = httplib.responses[self.code]
    429             msg = mimetools.Message(StringIO(self.headers))
    430             return self.parent.error(
    431                 "http", req, MockFile(), self.code, name, msg)
    432         else:
    433             self.req = req
    434             msg = mimetools.Message(StringIO("\r\n\r\n"))
    435             return MockResponse(200, "OK", msg, "", req.get_full_url())
    436 
    437 class MockHTTPSHandler(urllib2.AbstractHTTPHandler):
    438     # Useful for testing the Proxy-Authorization request by verifying the
    439     # properties of httpcon
    440 
    441     def __init__(self):
    442         urllib2.AbstractHTTPHandler.__init__(self)
    443         self.httpconn = MockHTTPClass()
    444 
    445     def https_open(self, req):
    446         return self.do_open(self.httpconn, req)
    447 
    448 class MockPasswordManager:
    449     def add_password(self, realm, uri, user, password):
    450         self.realm = realm
    451         self.url = uri
    452         self.user = user
    453         self.password = password
    454     def find_user_password(self, realm, authuri):
    455         self.target_realm = realm
    456         self.target_url = authuri
    457         return self.user, self.password
    458 
    459 
    460 class OpenerDirectorTests(unittest.TestCase):
    461 
    462     def test_add_non_handler(self):
    463         class NonHandler(object):
    464             pass
    465         self.assertRaises(TypeError,
    466                           OpenerDirector().add_handler, NonHandler())
    467 
    468     def test_badly_named_methods(self):
    469         # test work-around for three methods that accidentally follow the
    470         # naming conventions for handler methods
    471         # (*_open() / *_request() / *_response())
    472 
    473         # These used to call the accidentally-named methods, causing a
    474         # TypeError in real code; here, returning self from these mock
    475         # methods would either cause no exception, or AttributeError.
    476 
    477         from urllib2 import URLError
    478 
    479         o = OpenerDirector()
    480         meth_spec = [
    481             [("do_open", "return self"), ("proxy_open", "return self")],
    482             [("redirect_request", "return self")],
    483             ]
    484         handlers = add_ordered_mock_handlers(o, meth_spec)
    485         o.add_handler(urllib2.UnknownHandler())
    486         for scheme in "do", "proxy", "redirect":
    487             self.assertRaises(URLError, o.open, scheme+"://example.com/")
    488 
    489     def test_handled(self):
    490         # handler returning non-None means no more handlers will be called
    491         o = OpenerDirector()
    492         meth_spec = [
    493             ["http_open", "ftp_open", "http_error_302"],
    494             ["ftp_open"],
    495             [("http_open", "return self")],
    496             [("http_open", "return self")],
    497             ]
    498         handlers = add_ordered_mock_handlers(o, meth_spec)
    499 
    500         req = Request("http://example.com/")
    501         r = o.open(req)
    502         # Second .http_open() gets called, third doesn't, since second returned
    503         # non-None.  Handlers without .http_open() never get any methods called
    504         # on them.
    505         # In fact, second mock handler defining .http_open() returns self
    506         # (instead of response), which becomes the OpenerDirector's return
    507         # value.
    508         self.assertEqual(r, handlers[2])
    509         calls = [(handlers[0], "http_open"), (handlers[2], "http_open")]
    510         for expected, got in zip(calls, o.calls):
    511             handler, name, args, kwds = got
    512             self.assertEqual((handler, name), expected)
    513             self.assertEqual(args, (req,))
    514 
    515     def test_handler_order(self):
    516         o = OpenerDirector()
    517         handlers = []
    518         for meths, handler_order in [
    519             ([("http_open", "return self")], 500),
    520             (["http_open"], 0),
    521             ]:
    522             class MockHandlerSubclass(MockHandler): pass
    523             h = MockHandlerSubclass(meths)
    524             h.handler_order = handler_order
    525             handlers.append(h)
    526             o.add_handler(h)
    527 
    528         r = o.open("http://example.com/")
    529         # handlers called in reverse order, thanks to their sort order
    530         self.assertEqual(o.calls[0][0], handlers[1])
    531         self.assertEqual(o.calls[1][0], handlers[0])
    532 
    533     def test_raise(self):
    534         # raising URLError stops processing of request
    535         o = OpenerDirector()
    536         meth_spec = [
    537             [("http_open", "raise")],
    538             [("http_open", "return self")],
    539             ]
    540         handlers = add_ordered_mock_handlers(o, meth_spec)
    541 
    542         req = Request("http://example.com/")
    543         self.assertRaises(urllib2.URLError, o.open, req)
    544         self.assertEqual(o.calls, [(handlers[0], "http_open", (req,), {})])
    545 
    546 ##     def test_error(self):
    547 ##         # XXX this doesn't actually seem to be used in standard library,
    548 ##         #  but should really be tested anyway...
    549 
    550     def test_http_error(self):
    551         # XXX http_error_default
    552         # http errors are a special case
    553         o = OpenerDirector()
    554         meth_spec = [
    555             [("http_open", "error 302")],
    556             [("http_error_400", "raise"), "http_open"],
    557             [("http_error_302", "return response"), "http_error_303",
    558              "http_error"],
    559             [("http_error_302")],
    560             ]
    561         handlers = add_ordered_mock_handlers(o, meth_spec)
    562 
    563         class Unknown:
    564             def __eq__(self, other): return True
    565 
    566         req = Request("http://example.com/")
    567         r = o.open(req)
    568         assert len(o.calls) == 2
    569         calls = [(handlers[0], "http_open", (req,)),
    570                  (handlers[2], "http_error_302",
    571                   (req, Unknown(), 302, "", {}))]
    572         for expected, got in zip(calls, o.calls):
    573             handler, method_name, args = expected
    574             self.assertEqual((handler, method_name), got[:2])
    575             self.assertEqual(args, got[2])
    576 
    577     def test_processors(self):
    578         # *_request / *_response methods get called appropriately
    579         o = OpenerDirector()
    580         meth_spec = [
    581             [("http_request", "return request"),
    582              ("http_response", "return response")],
    583             [("http_request", "return request"),
    584              ("http_response", "return response")],
    585             ]
    586         handlers = add_ordered_mock_handlers(o, meth_spec)
    587 
    588         req = Request("http://example.com/")
    589         r = o.open(req)
    590         # processor methods are called on *all* handlers that define them,
    591         # not just the first handler that handles the request
    592         calls = [
    593             (handlers[0], "http_request"), (handlers[1], "http_request"),
    594             (handlers[0], "http_response"), (handlers[1], "http_response")]
    595 
    596         for i, (handler, name, args, kwds) in enumerate(o.calls):
    597             if i < 2:
    598                 # *_request
    599                 self.assertEqual((handler, name), calls[i])
    600                 self.assertEqual(len(args), 1)
    601                 self.assertIsInstance(args[0], Request)
    602             else:
    603                 # *_response
    604                 self.assertEqual((handler, name), calls[i])
    605                 self.assertEqual(len(args), 2)
    606                 self.assertIsInstance(args[0], Request)
    607                 # response from opener.open is None, because there's no
    608                 # handler that defines http_open to handle it
    609                 if args[1] is not None:
    610                     self.assertIsInstance(args[1], MockResponse)
    611 
    612 
    613 def sanepathname2url(path):
    614     import urllib
    615     urlpath = urllib.pathname2url(path)
    616     if os.name == "nt" and urlpath.startswith("///"):
    617         urlpath = urlpath[2:]
    618     # XXX don't ask me about the mac...
    619     return urlpath
    620 
    621 class HandlerTests(unittest.TestCase):
    622 
    623     def test_ftp(self):
    624         class MockFTPWrapper:
    625             def __init__(self, data): self.data = data
    626             def retrfile(self, filename, filetype):
    627                 self.filename, self.filetype = filename, filetype
    628                 return StringIO.StringIO(self.data), len(self.data)
    629             def close(self): pass
    630 
    631         class NullFTPHandler(urllib2.FTPHandler):
    632             def __init__(self, data): self.data = data
    633             def connect_ftp(self, user, passwd, host, port, dirs,
    634                             timeout=socket._GLOBAL_DEFAULT_TIMEOUT):
    635                 self.user, self.passwd = user, passwd
    636                 self.host, self.port = host, port
    637                 self.dirs = dirs
    638                 self.ftpwrapper = MockFTPWrapper(self.data)
    639                 return self.ftpwrapper
    640 
    641         import ftplib
    642         data = "rheum rhaponicum"
    643         h = NullFTPHandler(data)
    644         o = h.parent = MockOpener()
    645 
    646         for url, host, port, user, passwd, type_, dirs, filename, mimetype in [
    647             ("ftp://localhost/foo/bar/baz.html",
    648              "localhost", ftplib.FTP_PORT, "", "", "I",
    649              ["foo", "bar"], "baz.html", "text/html"),
    650             ("ftp://parrot@localhost/foo/bar/baz.html",
    651              "localhost", ftplib.FTP_PORT, "parrot", "", "I",
    652              ["foo", "bar"], "baz.html", "text/html"),
    653             ("ftp://%25parrot@localhost/foo/bar/baz.html",
    654              "localhost", ftplib.FTP_PORT, "%parrot", "", "I",
    655              ["foo", "bar"], "baz.html", "text/html"),
    656             ("ftp://%2542parrot@localhost/foo/bar/baz.html",
    657              "localhost", ftplib.FTP_PORT, "%42parrot", "", "I",
    658              ["foo", "bar"], "baz.html", "text/html"),
    659             ("ftp://localhost:80/foo/bar/",
    660              "localhost", 80, "", "", "D",
    661              ["foo", "bar"], "", None),
    662             ("ftp://localhost/baz.gif;type=a",
    663              "localhost", ftplib.FTP_PORT, "", "", "A",
    664              [], "baz.gif", None),  # XXX really this should guess image/gif
    665             ]:
    666             req = Request(url)
    667             req.timeout = None
    668             r = h.ftp_open(req)
    669             # ftp authentication not yet implemented by FTPHandler
    670             self.assertEqual(h.user, user)
    671             self.assertEqual(h.passwd, passwd)
    672             self.assertEqual(h.host, socket.gethostbyname(host))
    673             self.assertEqual(h.port, port)
    674             self.assertEqual(h.dirs, dirs)
    675             self.assertEqual(h.ftpwrapper.filename, filename)
    676             self.assertEqual(h.ftpwrapper.filetype, type_)
    677             headers = r.info()
    678             self.assertEqual(headers.get("Content-type"), mimetype)
    679             self.assertEqual(int(headers["Content-length"]), len(data))
    680 
    681     def test_file(self):
    682         import rfc822, socket
    683         h = urllib2.FileHandler()
    684         o = h.parent = MockOpener()
    685 
    686         TESTFN = test_support.TESTFN
    687         urlpath = sanepathname2url(os.path.abspath(TESTFN))
    688         towrite = "hello, world\n"
    689         urls = [
    690             "file://localhost%s" % urlpath,
    691             "file://%s" % urlpath,
    692             "file://%s%s" % (socket.gethostbyname('localhost'), urlpath),
    693             ]
    694         try:
    695             localaddr = socket.gethostbyname(socket.gethostname())
    696         except socket.gaierror:
    697             localaddr = ''
    698         if localaddr:
    699             urls.append("file://%s%s" % (localaddr, urlpath))
    700 
    701         for url in urls:
    702             f = open(TESTFN, "wb")
    703             try:
    704                 try:
    705                     f.write(towrite)
    706                 finally:
    707                     f.close()
    708 
    709                 r = h.file_open(Request(url))
    710                 try:
    711                     data = r.read()
    712                     headers = r.info()
    713                     respurl = r.geturl()
    714                 finally:
    715                     r.close()
    716                 stats = os.stat(TESTFN)
    717                 modified = rfc822.formatdate(stats.st_mtime)
    718             finally:
    719                 os.remove(TESTFN)
    720             self.assertEqual(data, towrite)
    721             self.assertEqual(headers["Content-type"], "text/plain")
    722             self.assertEqual(headers["Content-length"], "13")
    723             self.assertEqual(headers["Last-modified"], modified)
    724             self.assertEqual(respurl, url)
    725 
    726         for url in [
    727             "file://localhost:80%s" % urlpath,
    728             "file:///file_does_not_exist.txt",
    729             "file://%s:80%s/%s" % (socket.gethostbyname('localhost'),
    730                                    os.getcwd(), TESTFN),
    731             "file://somerandomhost.ontheinternet.com%s/%s" %
    732             (os.getcwd(), TESTFN),
    733             ]:
    734             try:
    735                 f = open(TESTFN, "wb")
    736                 try:
    737                     f.write(towrite)
    738                 finally:
    739                     f.close()
    740 
    741                 self.assertRaises(urllib2.URLError,
    742                                   h.file_open, Request(url))
    743             finally:
    744                 os.remove(TESTFN)
    745 
    746         h = urllib2.FileHandler()
    747         o = h.parent = MockOpener()
    748         # XXXX why does // mean ftp (and /// mean not ftp!), and where
    749         #  is file: scheme specified?  I think this is really a bug, and
    750         #  what was intended was to distinguish between URLs like:
    751         # file:/blah.txt (a file)
    752         # file://localhost/blah.txt (a file)
    753         # file:///blah.txt (a file)
    754         # file://ftp.example.com/blah.txt (an ftp URL)
    755         for url, ftp in [
    756             ("file://ftp.example.com//foo.txt", True),
    757             ("file://ftp.example.com///foo.txt", False),
    758 # XXXX bug: fails with OSError, should be URLError
    759             ("file://ftp.example.com/foo.txt", False),
    760             ("file://somehost//foo/something.txt", True),
    761             ("file://localhost//foo/something.txt", False),
    762             ]:
    763             req = Request(url)
    764             try:
    765                 h.file_open(req)
    766             # XXXX remove OSError when bug fixed
    767             except (urllib2.URLError, OSError):
    768                 self.assertTrue(not ftp)
    769             else:
    770                 self.assertTrue(o.req is req)
    771                 self.assertEqual(req.type, "ftp")
    772             self.assertEqual(req.type == "ftp", ftp)
    773 
    def test_http(self):
        # Exercises AbstractHTTPHandler in three stages: what do_open() hands
        # to the connection object, how a failure while sending the request is
        # reported, and which default headers do_request_() fills in.

        h = urllib2.AbstractHTTPHandler()
        o = h.parent = MockOpener()

        url = "http://example.com/"
        for method, data in [("GET", None), ("POST", "blah")]:
            req = Request(url, data, {"Foo": "bar"})
            req.timeout = None
            req.add_unredirected_header("Spam", "eggs")
            http = MockHTTPClass()
            r = h.do_open(http, req)

            # result attributes: merely touching them proves they exist
            r.read; r.readline  # wrapped MockFile methods
            r.info; r.geturl  # addinfourl methods
            # This used to be the bare expression `r.code, r.msg == 200, "OK"`,
            # which builds a tuple and discards it, so nothing was verified.
            self.assertEqual((r.code, r.msg), (200, "OK"))
            hdrs = r.info()
            hdrs.get; hdrs.has_key  # r.info() gives dict from .getreply()
            self.assertEqual(r.geturl(), url)

            self.assertEqual(http.host, "example.com")
            self.assertEqual(http.level, 0)
            self.assertEqual(http.method, method)
            self.assertEqual(http.selector, "/")
            self.assertEqual(http.req_headers,
                             [("Connection", "close"),
                              ("Foo", "bar"), ("Spam", "eggs")])
            self.assertEqual(http.data, data)

        # check socket.error converted to URLError
        # (reuses the http/req pair left over from the last loop iteration)
        http.raise_on_endheaders = True
        self.assertRaises(urllib2.URLError, h.do_open, http, req)

        # check adding of standard headers
        o.addheaders = [("Spam", "eggs")]
        for data in "", None:  # POST, GET
            req = Request("http://example.com/", data)
            # do_request_() is called for its effect on req; the assertions
            # below read the headers straight off req.
            h.do_request_(req)
            if data is None:  # GET
                self.assertNotIn("Content-length", req.unredirected_hdrs)
                self.assertNotIn("Content-type", req.unredirected_hdrs)
            else:  # POST
                self.assertEqual(req.unredirected_hdrs["Content-length"], "0")
                self.assertEqual(req.unredirected_hdrs["Content-type"],
                                 "application/x-www-form-urlencoded")
            # XXX the details of Host could be better tested
            self.assertEqual(req.unredirected_hdrs["Host"], "example.com")
            self.assertEqual(req.unredirected_hdrs["Spam"], "eggs")

            # don't clobber existing headers
            req.add_unredirected_header("Content-length", "foo")
            req.add_unredirected_header("Content-type", "bar")
            req.add_unredirected_header("Host", "baz")
            req.add_unredirected_header("Spam", "foo")
            h.do_request_(req)
            self.assertEqual(req.unredirected_hdrs["Content-length"], "foo")
            self.assertEqual(req.unredirected_hdrs["Content-type"], "bar")
            self.assertEqual(req.unredirected_hdrs["Host"], "baz")
            self.assertEqual(req.unredirected_hdrs["Spam"], "foo")
    835 
    def test_http_doubleslash(self):
        # A redundant "//" in the path must not disturb how the Host header is
        # derived.  A double slash straight after the host used to make the
        # URL parse incorrectly.
        handler = urllib2.AbstractHTTPHandler()
        handler.parent = MockOpener()

        body = ""
        candidates = (
            "http://example.com/foo/bar/baz.html",
            "http://example.com//foo/bar/baz.html",
            "http://example.com/foo//bar/baz.html",
            "http://example.com/foo/bar//baz.html",
        )

        for candidate in candidates:
            request = Request(candidate, body)

            # Without a proxy the Host header comes from the URL itself.
            direct = handler.do_request_(request)
            self.assertEqual(direct.unredirected_hdrs["Host"], "example.com")

            # With a proxy set, Host must still name the origin server.
            request.set_proxy("someproxy:3128", None)
            proxied = handler.do_request_(request)
            self.assertEqual(proxied.unredirected_hdrs["Host"], "example.com")
    861 
    def test_fixpath_in_weirdurls(self):
        # Issue4493: urllib2 supplies the leading '/' for URLs whose path
        # does not start with one.  A URL with no path and no query keeps an
        # empty selector.
        handler = urllib2.AbstractHTTPHandler()
        handler.parent = MockOpener()

        cases = [
            # (url, selector expected after do_request_)
            ('http://www.python.org?getspam', '/?getspam'),
            ('http://www.python.org', ''),
        ]
        for url, selector in cases:
            processed = handler.do_request_(Request(url))
            self.assertEqual(processed.get_host(), 'www.python.org')
            self.assertEqual(processed.get_selector(), selector)
    880 
    def test_errors(self):
        # HTTPErrorProcessor returns 2xx responses untouched and routes any
        # other status to the parent opener's error().
        processor = urllib2.HTTPErrorProcessor()
        opener = processor.parent = MockOpener()

        url = "http://example.com/"
        req = Request(url)

        # all 2xx are passed through
        successes = [(200, "OK"), (202, "Accepted"), (206, "Partial content")]
        for status, reason in successes:
            resp = MockResponse(status, reason, {}, "", url)
            returned = processor.http_response(req, resp)
            self.assertTrue(resp is returned)
            # opener.error not called, so it never recorded a protocol
            self.assertFalse(hasattr(opener, "proto"))

        # anything else calls opener.error (and MockOpener returns None, here)
        resp = MockResponse(502, "Bad gateway", {}, "", url)
        self.assertTrue(processor.http_response(req, resp) is None)
        self.assertEqual(opener.proto, "http")  # opener.error called
        self.assertEqual(opener.args, (req, resp, 502, "Bad gateway", {}))
    905 
    def test_cookies(self):
        # HTTPCookieProcessor must pass the very same request and response
        # objects to the cookie jar and hand them back unchanged.
        jar = MockCookieJar()
        processor = urllib2.HTTPCookieProcessor(jar)
        processor.parent = MockOpener()

        req = Request("http://example.com/")
        resp = MockResponse(200, "OK", {}, "")

        # Request side: the jar saw req, and req itself is what came back.
        processed_req = processor.http_request(req)
        self.assertTrue(jar.ach_req is req)
        self.assertTrue(req is processed_req)
        self.assertEqual(req.get_origin_req_host(), "example.com")
        self.assertFalse(req.is_unverifiable())

        # Response side: the jar saw (req, resp), and resp itself came back.
        processed_resp = processor.http_response(req, resp)
        self.assertTrue(jar.ec_req is req)
        self.assertTrue(jar.ec_r is resp)
        self.assertTrue(resp is processed_resp)
    920 
    def test_redirect(self):
        # Covers HTTPRedirectHandler in two parts: the request it builds for
        # an ordinary 301/302/303/307, and the two limits that stop runaway
        # redirection (max_repeats and max_redirections).
        source = "http://example.com/a.html"
        target = "http://example.com/b.html"
        h = urllib2.HTTPRedirectHandler()
        o = h.parent = MockOpener()

        # ordinary redirect behaviour
        for code in (301, 302, 303, 307):
            handle = getattr(h, "http_error_%s" % code)
            for data in (None, "blah\nblah\n"):
                req = Request(source, data)
                req.add_header("Nonsense", "viking=withhold")
                req.timeout = socket._GLOBAL_DEFAULT_TIMEOUT
                if data is not None:
                    req.add_header("Content-Length", str(len(data)))
                req.add_unredirected_header("Spam", "spam")
                try:
                    handle(req, MockFile(), code, "Blah",
                           MockHeaders({"location": target}))
                except urllib2.HTTPError:
                    # 307 in response to POST requires user OK
                    self.assertEqual(code, 307)
                    self.assertIsNotNone(data)

                redirected = o.req
                self.assertEqual(redirected.get_full_url(), target)
                try:
                    self.assertEqual(redirected.get_method(), "GET")
                except AttributeError:
                    self.assertFalse(redirected.has_data())

                # now it's a GET, there should not be headers regarding content
                # (possibly dragged from before being a POST)
                lowered = [name.lower() for name in redirected.headers]
                self.assertNotIn("content-length", lowered)
                self.assertNotIn("content-type", lowered)

                # Ordinary headers follow the redirect, unredirected ones
                # must not show up in either header collection.
                self.assertEqual(redirected.headers["Nonsense"],
                                 "viking=withhold")
                self.assertNotIn("Spam", redirected.headers)
                self.assertNotIn("Spam", redirected.unredirected_hdrs)

        # loop detection
        # (this request is replaced before each scenario below)
        req = Request(source)
        req.timeout = socket._GLOBAL_DEFAULT_TIMEOUT

        def redirect(h, req, url=target):
            h.http_error_302(req, MockFile(), 302, "Blah",
                             MockHeaders({"location": url}))
        # Note that the *original* request shares the same record of
        # redirections with the sub-requests caused by the redirections.

        # detect infinite loop redirect of a URL to itself
        req = Request(source, origin_req_host="example.com")
        followed = 0
        req.timeout = socket._GLOBAL_DEFAULT_TIMEOUT
        try:
            while True:
                redirect(h, req, "http://example.com/")
                followed += 1
        except urllib2.HTTPError:
            # don't stop until max_repeats, because cookies may introduce state
            self.assertEqual(followed,
                             urllib2.HTTPRedirectHandler.max_repeats)

        # detect endless non-repeating chain of redirects
        req = Request(source, origin_req_host="example.com")
        followed = 0
        req.timeout = socket._GLOBAL_DEFAULT_TIMEOUT
        try:
            while True:
                redirect(h, req, "http://example.com/%d" % followed)
                followed += 1
        except urllib2.HTTPError:
            self.assertEqual(followed,
                             urllib2.HTTPRedirectHandler.max_redirections)
    993 
    def test_invalid_redirect(self):
        # A redirect may only lead to http, https or ftp; any other scheme in
        # the Location header must be refused with HTTPError.
        origin = "http://example.com/a.html"
        accepted = ['http', 'https', 'ftp']
        refused = ['file', 'imap', 'ldap']
        remainder = "example.com/b.html"
        h = urllib2.HTTPRedirectHandler()
        o = h.parent = MockOpener()
        req = Request(origin)
        req.timeout = socket._GLOBAL_DEFAULT_TIMEOUT

        for scheme in refused:
            location = scheme + '://' + remainder
            with self.assertRaises(urllib2.HTTPError):
                h.http_error_302(req, MockFile(), 302, "Security Loophole",
                                 MockHeaders({"location": location}))

        for scheme in accepted:
            location = scheme + '://' + remainder
            h.http_error_302(req, MockFile(), 302, "That's fine",
                             MockHeaders({"location": location}))
            # The opener was asked to open exactly the redirect target.
            self.assertEqual(o.req.get_full_url(), location)
   1015 
   1016     def test_cookie_redirect(self):
   1017         # cookies shouldn't leak into redirected requests
   1018         from cookielib import CookieJar
   1019 
   1020         from test.test_cookielib import interact_netscape
   1021 
   1022         cj = CookieJar()
   1023         interact_netscape(cj, "http://www.example.com/", "spam=eggs")
   1024         hh = MockHTTPHandler(302, "Location: http://www.cracker.com/\r\n\r\n")
   1025         hdeh = urllib2.HTTPDefaultErrorHandler()
   1026         hrh = urllib2.HTTPRedirectHandler()
   1027         cp = urllib2.HTTPCookieProcessor(cj)
   1028         o = build_test_opener(hh, hdeh, hrh, cp)
   1029         o.open("http://www.example.com/")
   1030         self.assertTrue(not hh.req.has_header("Cookie"))
   1031 
   1032     def test_redirect_fragment(self):
   1033         redirected_url = 'http://www.example.com/index.html#OK\r\n\r\n'
   1034         hh = MockHTTPHandler(302, 'Location: ' + redirected_url)
   1035         hdeh = urllib2.HTTPDefaultErrorHandler()
   1036         hrh = urllib2.HTTPRedirectHandler()
   1037         o = build_test_opener(hh, hdeh, hrh)
   1038         fp = o.open('http://www.example.com')
   1039         self.assertEqual(fp.geturl(), redirected_url.strip())
   1040 
   1041     def test_redirect_no_path(self):
   1042         # Issue 14132: Relative redirect strips original path
   1043         real_class = httplib.HTTPConnection
   1044         response1 = b"HTTP/1.1 302 Found\r\nLocation: ?query\r\n\r\n"
   1045         httplib.HTTPConnection = test_urllib.fakehttp(response1)
   1046         self.addCleanup(setattr, httplib, "HTTPConnection", real_class)
   1047         urls = iter(("/path", "/path?query"))
   1048         def request(conn, method, url, *pos, **kw):
   1049             self.assertEqual(url, next(urls))
   1050             real_class.request(conn, method, url, *pos, **kw)
   1051             # Change response for subsequent connection
   1052             conn.__class__.fakedata = b"HTTP/1.1 200 OK\r\n\r\nHello!"
   1053         httplib.HTTPConnection.request = request
   1054         fp = urllib2.urlopen("http://python.org/path")
   1055         self.assertEqual(fp.geturl(), "http://python.org/path?query")
   1056 
   1057     def test_proxy(self):
   1058         o = OpenerDirector()
   1059         ph = urllib2.ProxyHandler(dict(http="proxy.example.com:3128"))
   1060         o.add_handler(ph)
   1061         meth_spec = [
   1062             [("http_open", "return response")]
   1063             ]
   1064         handlers = add_ordered_mock_handlers(o, meth_spec)
   1065 
   1066         req = Request("http://acme.example.com/")
   1067         self.assertEqual(req.get_host(), "acme.example.com")
   1068         r = o.open(req)
   1069         self.assertEqual(req.get_host(), "proxy.example.com:3128")
   1070 
   1071         self.assertEqual([(handlers[0], "http_open")],
   1072                          [tup[0:2] for tup in o.calls])
   1073 
   1074     def test_proxy_no_proxy(self):
   1075         os.environ['no_proxy'] = 'python.org'
   1076         o = OpenerDirector()
   1077         ph = urllib2.ProxyHandler(dict(http="proxy.example.com"))
   1078         o.add_handler(ph)
   1079         req = Request("http://www.perl.org/")
   1080         self.assertEqual(req.get_host(), "www.perl.org")
   1081         r = o.open(req)
   1082         self.assertEqual(req.get_host(), "proxy.example.com")
   1083         req = Request("http://www.python.org")
   1084         self.assertEqual(req.get_host(), "www.python.org")
   1085         r = o.open(req)
   1086         self.assertEqual(req.get_host(), "www.python.org")
   1087         del os.environ['no_proxy']
   1088 
   1089 
   1090     def test_proxy_https(self):
   1091         o = OpenerDirector()
   1092         ph = urllib2.ProxyHandler(dict(https='proxy.example.com:3128'))
   1093         o.add_handler(ph)
   1094         meth_spec = [
   1095             [("https_open","return response")]
   1096         ]
   1097         handlers = add_ordered_mock_handlers(o, meth_spec)
   1098         req = Request("https://www.example.com/")
   1099         self.assertEqual(req.get_host(), "www.example.com")
   1100         r = o.open(req)
   1101         self.assertEqual(req.get_host(), "proxy.example.com:3128")
   1102         self.assertEqual([(handlers[0], "https_open")],
   1103                          [tup[0:2] for tup in o.calls])
   1104 
   1105     def test_proxy_https_proxy_authorization(self):
   1106         o = OpenerDirector()
   1107         ph = urllib2.ProxyHandler(dict(https='proxy.example.com:3128'))
   1108         o.add_handler(ph)
   1109         https_handler = MockHTTPSHandler()
   1110         o.add_handler(https_handler)
   1111         req = Request("https://www.example.com/")
   1112         req.add_header("Proxy-Authorization","FooBar")
   1113         req.add_header("User-Agent","Grail")
   1114         self.assertEqual(req.get_host(), "www.example.com")
   1115         self.assertIsNone(req._tunnel_host)
   1116         r = o.open(req)
   1117         # Verify Proxy-Authorization gets tunneled to request.
   1118         # httpsconn req_headers do not have the Proxy-Authorization header but
   1119         # the req will have.
   1120         self.assertNotIn(("Proxy-Authorization","FooBar"),
   1121                          https_handler.httpconn.req_headers)
   1122         self.assertIn(("User-Agent","Grail"),
   1123                       https_handler.httpconn.req_headers)
   1124         self.assertIsNotNone(req._tunnel_host)
   1125         self.assertEqual(req.get_host(), "proxy.example.com:3128")
   1126         self.assertEqual(req.get_header("Proxy-authorization"),"FooBar")
   1127 
   1128     def test_basic_auth(self, quote_char='"'):
   1129         opener = OpenerDirector()
   1130         password_manager = MockPasswordManager()
   1131         auth_handler = urllib2.HTTPBasicAuthHandler(password_manager)
   1132         realm = "ACME Widget Store"
   1133         http_handler = MockHTTPHandler(
   1134             401, 'WWW-Authenticate: Basic realm=%s%s%s\r\n\r\n' %
   1135             (quote_char, realm, quote_char) )
   1136         opener.add_handler(auth_handler)
   1137         opener.add_handler(http_handler)
   1138         self._test_basic_auth(opener, auth_handler, "Authorization",
   1139                               realm, http_handler, password_manager,
   1140                               "http://acme.example.com/protected",
   1141                               "http://acme.example.com/protected"
   1142                              )
   1143 
   1144     def test_basic_auth_with_single_quoted_realm(self):
   1145         self.test_basic_auth(quote_char="'")
   1146 
   1147     def test_basic_auth_with_unquoted_realm(self):
   1148         opener = OpenerDirector()
   1149         password_manager = MockPasswordManager()
   1150         auth_handler = urllib2.HTTPBasicAuthHandler(password_manager)
   1151         realm = "ACME Widget Store"
   1152         http_handler = MockHTTPHandler(
   1153             401, 'WWW-Authenticate: Basic realm=%s\r\n\r\n' % realm)
   1154         opener.add_handler(auth_handler)
   1155         opener.add_handler(http_handler)
   1156         msg = "Basic Auth Realm was unquoted"
   1157         with test_support.check_warnings((msg, UserWarning)):
   1158             self._test_basic_auth(opener, auth_handler, "Authorization",
   1159                                   realm, http_handler, password_manager,
   1160                                   "http://acme.example.com/protected",
   1161                                   "http://acme.example.com/protected"
   1162                                  )
   1163 
   1164 
   1165     def test_proxy_basic_auth(self):
   1166         opener = OpenerDirector()
   1167         ph = urllib2.ProxyHandler(dict(http="proxy.example.com:3128"))
   1168         opener.add_handler(ph)
   1169         password_manager = MockPasswordManager()
   1170         auth_handler = urllib2.ProxyBasicAuthHandler(password_manager)
   1171         realm = "ACME Networks"
   1172         http_handler = MockHTTPHandler(
   1173             407, 'Proxy-Authenticate: Basic realm="%s"\r\n\r\n' % realm)
   1174         opener.add_handler(auth_handler)
   1175         opener.add_handler(http_handler)
   1176         self._test_basic_auth(opener, auth_handler, "Proxy-authorization",
   1177                               realm, http_handler, password_manager,
   1178                               "http://acme.example.com:3128/protected",
   1179                               "proxy.example.com:3128",
   1180                               )
   1181 
   1182     def test_basic_and_digest_auth_handlers(self):
   1183         # HTTPDigestAuthHandler raised an exception if it couldn't handle a 40*
   1184         # response (http://python.org/sf/1479302), where it should instead
   1185         # return None to allow another handler (especially
   1186         # HTTPBasicAuthHandler) to handle the response.
   1187 
   1188         # Also (http://python.org/sf/14797027, RFC 2617 section 1.2), we must
   1189         # try digest first (since it's the strongest auth scheme), so we record
   1190         # order of calls here to check digest comes first:
   1191         class RecordingOpenerDirector(OpenerDirector):
   1192             def __init__(self):
   1193                 OpenerDirector.__init__(self)
   1194                 self.recorded = []
   1195             def record(self, info):
   1196                 self.recorded.append(info)
   1197         class TestDigestAuthHandler(urllib2.HTTPDigestAuthHandler):
   1198             def http_error_401(self, *args, **kwds):
   1199                 self.parent.record("digest")
   1200                 urllib2.HTTPDigestAuthHandler.http_error_401(self,
   1201                                                              *args, **kwds)
   1202         class TestBasicAuthHandler(urllib2.HTTPBasicAuthHandler):
   1203             def http_error_401(self, *args, **kwds):
   1204                 self.parent.record("basic")
   1205                 urllib2.HTTPBasicAuthHandler.http_error_401(self,
   1206                                                             *args, **kwds)
   1207 
   1208         opener = RecordingOpenerDirector()
   1209         password_manager = MockPasswordManager()
   1210         digest_handler = TestDigestAuthHandler(password_manager)
   1211         basic_handler = TestBasicAuthHandler(password_manager)
   1212         realm = "ACME Networks"
   1213         http_handler = MockHTTPHandler(
   1214             401, 'WWW-Authenticate: Basic realm="%s"\r\n\r\n' % realm)
   1215         opener.add_handler(basic_handler)
   1216         opener.add_handler(digest_handler)
   1217         opener.add_handler(http_handler)
   1218 
   1219         # check basic auth isn't blocked by digest handler failing
   1220         self._test_basic_auth(opener, basic_handler, "Authorization",
   1221                               realm, http_handler, password_manager,
   1222                               "http://acme.example.com/protected",
   1223                               "http://acme.example.com/protected",
   1224                               )
   1225         # check digest was tried before basic (twice, because
   1226         # _test_basic_auth called .open() twice)
   1227         self.assertEqual(opener.recorded, ["digest", "basic"]*2)
   1228 
   1229     def _test_basic_auth(self, opener, auth_handler, auth_header,
   1230                          realm, http_handler, password_manager,
   1231                          request_url, protected_url):
   1232         import base64
   1233         user, password = "wile", "coyote"
   1234 
   1235         # .add_password() fed through to password manager
   1236         auth_handler.add_password(realm, request_url, user, password)
   1237         self.assertEqual(realm, password_manager.realm)
   1238         self.assertEqual(request_url, password_manager.url)
   1239         self.assertEqual(user, password_manager.user)
   1240         self.assertEqual(password, password_manager.password)
   1241 
   1242         r = opener.open(request_url)
   1243 
   1244         # should have asked the password manager for the username/password
   1245         self.assertEqual(password_manager.target_realm, realm)
   1246         self.assertEqual(password_manager.target_url, protected_url)
   1247 
   1248         # expect one request without authorization, then one with
   1249         self.assertEqual(len(http_handler.requests), 2)
   1250         self.assertFalse(http_handler.requests[0].has_header(auth_header))
   1251         userpass = '%s:%s' % (user, password)
   1252         auth_hdr_value = 'Basic '+base64.encodestring(userpass).strip()
   1253         self.assertEqual(http_handler.requests[1].get_header(auth_header),
   1254                          auth_hdr_value)
   1255         self.assertEqual(http_handler.requests[1].unredirected_hdrs[auth_header],
   1256                          auth_hdr_value)
   1257         # if the password manager can't find a password, the handler won't
   1258         # handle the HTTP auth error
   1259         password_manager.user = password_manager.password = None
   1260         http_handler.reset()
   1261         r = opener.open(request_url)
   1262         self.assertEqual(len(http_handler.requests), 1)
   1263         self.assertFalse(http_handler.requests[0].has_header(auth_header))
   1264 
   1265 class MiscTests(unittest.TestCase):
   1266 
   1267     def test_build_opener(self):
   1268         class MyHTTPHandler(urllib2.HTTPHandler): pass
   1269         class FooHandler(urllib2.BaseHandler):
   1270             def foo_open(self): pass
   1271         class BarHandler(urllib2.BaseHandler):
   1272             def bar_open(self): pass
   1273 
   1274         build_opener = urllib2.build_opener
   1275 
   1276         o = build_opener(FooHandler, BarHandler)
   1277         self.opener_has_handler(o, FooHandler)
   1278         self.opener_has_handler(o, BarHandler)
   1279 
   1280         # can take a mix of classes and instances
   1281         o = build_opener(FooHandler, BarHandler())
   1282         self.opener_has_handler(o, FooHandler)
   1283         self.opener_has_handler(o, BarHandler)
   1284 
   1285         # subclasses of default handlers override default handlers
   1286         o = build_opener(MyHTTPHandler)
   1287         self.opener_has_handler(o, MyHTTPHandler)
   1288 
   1289         # a particular case of overriding: default handlers can be passed
   1290         # in explicitly
   1291         o = build_opener()
   1292         self.opener_has_handler(o, urllib2.HTTPHandler)
   1293         o = build_opener(urllib2.HTTPHandler)
   1294         self.opener_has_handler(o, urllib2.HTTPHandler)
   1295         o = build_opener(urllib2.HTTPHandler())
   1296         self.opener_has_handler(o, urllib2.HTTPHandler)
   1297 
   1298         # Issue2670: multiple handlers sharing the same base class
   1299         class MyOtherHTTPHandler(urllib2.HTTPHandler): pass
   1300         o = build_opener(MyHTTPHandler, MyOtherHTTPHandler)
   1301         self.opener_has_handler(o, MyHTTPHandler)
   1302         self.opener_has_handler(o, MyOtherHTTPHandler)
   1303 
   1304     def opener_has_handler(self, opener, handler_class):
   1305         for h in opener.handlers:
   1306             if h.__class__ == handler_class:
   1307                 break
   1308         else:
   1309             self.assertTrue(False)
   1310 
   1311     def test_unsupported_algorithm(self):
   1312         handler = AbstractDigestAuthHandler()
   1313         with self.assertRaises(ValueError) as exc:
   1314             handler.get_algorithm_impls('invalid')
   1315         self.assertEqual(
   1316             str(exc.exception),
   1317             "Unsupported digest authentication algorithm 'invalid'"
   1318         )
   1319 
   1320 
   1321 class RequestTests(unittest.TestCase):
   1322 
   1323     def setUp(self):
   1324         self.get = urllib2.Request("http://www.python.org/~jeremy/")
   1325         self.post = urllib2.Request("http://www.python.org/~jeremy/",
   1326                                     "data",
   1327                                     headers={"X-Test": "test"})
   1328 
   1329     def test_method(self):
   1330         self.assertEqual("POST", self.post.get_method())
   1331         self.assertEqual("GET", self.get.get_method())
   1332 
   1333     def test_add_data(self):
   1334         self.assertTrue(not self.get.has_data())
   1335         self.assertEqual("GET", self.get.get_method())
   1336         self.get.add_data("spam")
   1337         self.assertTrue(self.get.has_data())
   1338         self.assertEqual("POST", self.get.get_method())
   1339 
   1340     def test_get_full_url(self):
   1341         self.assertEqual("http://www.python.org/~jeremy/",
   1342                          self.get.get_full_url())
   1343 
   1344     def test_selector(self):
   1345         self.assertEqual("/~jeremy/", self.get.get_selector())
   1346         req = urllib2.Request("http://www.python.org/")
   1347         self.assertEqual("/", req.get_selector())
   1348 
   1349     def test_get_type(self):
   1350         self.assertEqual("http", self.get.get_type())
   1351 
   1352     def test_get_host(self):
   1353         self.assertEqual("www.python.org", self.get.get_host())
   1354 
   1355     def test_get_host_unquote(self):
   1356         req = urllib2.Request("http://www.%70ython.org/")
   1357         self.assertEqual("www.python.org", req.get_host())
   1358 
   1359     def test_proxy(self):
   1360         self.assertTrue(not self.get.has_proxy())
   1361         self.get.set_proxy("www.perl.org", "http")
   1362         self.assertTrue(self.get.has_proxy())
   1363         self.assertEqual("www.python.org", self.get.get_origin_req_host())
   1364         self.assertEqual("www.perl.org", self.get.get_host())
   1365 
   1366     def test_wrapped_url(self):
   1367         req = Request("<URL:http://www.python.org>")
   1368         self.assertEqual("www.python.org", req.get_host())
   1369 
   1370     def test_url_fragment(self):
   1371         req = Request("http://www.python.org/?qs=query#fragment=true")
   1372         self.assertEqual("/?qs=query", req.get_selector())
   1373         req = Request("http://www.python.org/#fun=true")
   1374         self.assertEqual("/", req.get_selector())
   1375 
   1376         # Issue 11703: geturl() omits fragment in the original URL.
   1377         url = 'http://docs.python.org/library/urllib2.html#OK'
   1378         req = Request(url)
   1379         self.assertEqual(req.get_full_url(), url)
   1380 
   1381     def test_private_attributes(self):
   1382         self.assertFalse(hasattr(self.get, '_Request__r_xxx'))
   1383         # Issue #6500: infinite recursion
   1384         self.assertFalse(hasattr(self.get, '_Request__r_method'))
   1385 
   1386     def test_HTTPError_interface(self):
   1387         """
   1388         Issue 13211 reveals that HTTPError didn't implement the URLError
   1389         interface even though HTTPError is a subclass of URLError.
   1390 
   1391         >>> err = urllib2.HTTPError(msg='something bad happened', url=None, code=None, hdrs=None, fp=None)
   1392         >>> assert hasattr(err, 'reason')
   1393         >>> err.reason
   1394         'something bad happened'
   1395         """
   1396 
   1397     def test_HTTPError_interface_call(self):
   1398         """
   1399         Issue 15701= - HTTPError interface has info method available from URLError.
   1400         """
   1401         err = urllib2.HTTPError(msg='something bad happened', url=None,
   1402                                 code=None, hdrs='Content-Length:42', fp=None)
   1403         self.assertTrue(hasattr(err, 'reason'))
   1404         assert hasattr(err, 'reason')
   1405         assert hasattr(err, 'info')
   1406         assert callable(err.info)
   1407         try:
   1408             err.info()
   1409         except AttributeError:
   1410             self.fail("err.info() failed")
   1411         self.assertEqual(err.info(), "Content-Length:42")
   1412 
   1413 def test_main(verbose=None):
   1414     from test import test_urllib2
   1415     test_support.run_doctest(test_urllib2, verbose)
   1416     test_support.run_doctest(urllib2, verbose)
   1417     tests = (TrivialTests,
   1418              OpenerDirectorTests,
   1419              HandlerTests,
   1420              MiscTests,
   1421              RequestTests)
   1422     test_support.run_unittest(*tests)
   1423 
   1424 if __name__ == "__main__":
   1425     test_main(verbose=True)
   1426