Home | History | Annotate | Download | only in test
      1 import unittest
      2 from test import test_support
      3 
      4 import os
      5 import socket
      6 import StringIO
      7 
      8 import urllib2
      9 from urllib2 import Request, OpenerDirector
     10 
     11 # XXX

     12 # Request

     13 # CacheFTPHandler (hard to write)

     14 # parse_keqv_list, parse_http_list, HTTPDigestAuthHandler

     15 
     16 class TrivialTests(unittest.TestCase):
     17     def test_trivial(self):
     18         # A couple trivial tests

     19 
     20         self.assertRaises(ValueError, urllib2.urlopen, 'bogus url')
     21 
     22         # XXX Name hacking to get this to work on Windows.

     23         fname = os.path.abspath(urllib2.__file__).replace('\\', '/')
     24 
     25         # And more hacking to get it to work on MacOS. This assumes
     26         # urllib.pathname2url works, unfortunately...
     27         if os.name == 'riscos':
     28             import string
     29             fname = os.expand(fname)
     30             fname = fname.translate(string.maketrans("/.", "./"))
     31 
     32         if os.name == 'nt':
     33             file_url = "file:///%s" % fname
     34         else:
     35             file_url = "file://%s" % fname
     36 
     37         f = urllib2.urlopen(file_url)
     38 
     39         buf = f.read()
     40         f.close()
     41 
     42     def test_parse_http_list(self):
     43         tests = [('a,b,c', ['a', 'b', 'c']),
     44                  ('path"o,l"og"i"cal, example', ['path"o,l"og"i"cal', 'example']),
     45                  ('a, b, "c", "d", "e,f", g, h', ['a', 'b', '"c"', '"d"', '"e,f"', 'g', 'h']),
     46                  ('a="b\\"c", d="e\\,f", g="h\\\\i"', ['a="b"c"', 'd="e,f"', 'g="h\\i"'])]
     47         for string, list in tests:
     48             self.assertEqual(urllib2.parse_http_list(string), list)
     49 
     50 
def test_request_headers_dict():
    """
    Doctest for the Request.headers dictionary.

    The Request.headers dictionary is not a documented interface.  It should
    stay that way, because the complete set of headers is only accessible
    through the .get_header(), .has_header(), .header_items() interface.
    However, .headers pre-dates those methods, and so real code will be using
    the dictionary.

    The introduction in 2.4 of those methods was a mistake for the same reason:
    code that previously saw all (urllib2 user)-provided headers in .headers
    now sees only a subset (and the function interface is ugly and incomplete).
    A better change would have been to replace the .headers dict with a dict
    subclass (or UserDict.DictMixin instance?) that preserved the .headers
    interface and also provided access to the "unredirected" headers.  It's
    probably too late to fix that, though.


    Check .capitalize() case normalization:

    >>> url = "http://example.com"
    >>> Request(url, headers={"Spam-eggs": "blah"}).headers["Spam-eggs"]
    'blah'
    >>> Request(url, headers={"spam-EggS": "blah"}).headers["Spam-eggs"]
    'blah'

    Currently, Request(url, "Spam-eggs").headers["Spam-Eggs"] raises KeyError,
    but that could be changed in future.

    """
     80 
def test_request_headers_methods():
    """
    Doctest for the Request header methods: has_header(), header_items(),
    add_header() and get_header().

    Note the case normalization of header names here, to .capitalize()-case.
    This should be preserved for backwards-compatibility.  (In the HTTP case,
    normalization to .title()-case is done by urllib2 before sending headers to
    httplib).

    >>> url = "http://example.com"
    >>> r = Request(url, headers={"Spam-eggs": "blah"})
    >>> r.has_header("Spam-eggs")
    True
    >>> r.header_items()
    [('Spam-eggs', 'blah')]
    >>> r.add_header("Foo-Bar", "baz")
    >>> items = r.header_items()
    >>> items.sort()
    >>> items
    [('Foo-bar', 'baz'), ('Spam-eggs', 'blah')]

    Note that e.g. r.has_header("spam-EggS") is currently False, and
    r.get_header("spam-EggS") returns None, but that could be changed in
    future.

    >>> r.has_header("Not-there")
    False
    >>> print r.get_header("Not-there")
    None
    >>> r.get_header("Not-there", "default")
    'default'

    """
    112 
    113 
def test_password_manager(self):
    """
    Doctest for HTTPPasswordMgr.add_password() / find_user_password().

    >>> mgr = urllib2.HTTPPasswordMgr()
    >>> add = mgr.add_password
    >>> add("Some Realm", "http://example.com/", "joe", "password")
    >>> add("Some Realm", "http://example.com/ni", "ni", "ni")
    >>> add("c", "http://example.com/foo", "foo", "ni")
    >>> add("c", "http://example.com/bar", "bar", "nini")
    >>> add("b", "http://example.com/", "first", "blah")
    >>> add("b", "http://example.com/", "second", "spam")
    >>> add("a", "http://example.com", "1", "a")
    >>> add("Some Realm", "http://c.example.com:3128", "3", "c")
    >>> add("Some Realm", "d.example.com", "4", "d")
    >>> add("Some Realm", "e.example.com:3128", "5", "e")

    >>> mgr.find_user_password("Some Realm", "example.com")
    ('joe', 'password')
    >>> mgr.find_user_password("Some Realm", "http://example.com")
    ('joe', 'password')
    >>> mgr.find_user_password("Some Realm", "http://example.com/")
    ('joe', 'password')
    >>> mgr.find_user_password("Some Realm", "http://example.com/spam")
    ('joe', 'password')
    >>> mgr.find_user_password("Some Realm", "http://example.com/spam/spam")
    ('joe', 'password')
    >>> mgr.find_user_password("c", "http://example.com/foo")
    ('foo', 'ni')
    >>> mgr.find_user_password("c", "http://example.com/bar")
    ('bar', 'nini')

    Actually, this is really undefined ATM (the example below is disabled):
##     Currently, we use the highest-level path where more than one match:

##     >>> mgr.find_user_password("Some Realm", "http://example.com/ni")
##     ('joe', 'password')

    Use latest add_password() in case of conflict:

    >>> mgr.find_user_password("b", "http://example.com/")
    ('second', 'spam')

    No special relationship between a.example.com and example.com:

    >>> mgr.find_user_password("a", "http://example.com/")
    ('1', 'a')
    >>> mgr.find_user_password("a", "http://a.example.com/")
    (None, None)

    Ports:

    >>> mgr.find_user_password("Some Realm", "c.example.com")
    (None, None)
    >>> mgr.find_user_password("Some Realm", "c.example.com:3128")
    ('3', 'c')
    >>> mgr.find_user_password("Some Realm", "http://c.example.com:3128")
    ('3', 'c')
    >>> mgr.find_user_password("Some Realm", "d.example.com")
    ('4', 'd')
    >>> mgr.find_user_password("Some Realm", "e.example.com:3128")
    ('5', 'e')

    """
    pass
    177 
    178 
def test_password_manager_default_port(self):
    """
    Doctest for how HTTPPasswordMgr treats explicit and default ports.

    >>> mgr = urllib2.HTTPPasswordMgr()
    >>> add = mgr.add_password

    The point to note here is that we can't guess the default port if there's
    no scheme.  This applies to both add_password and find_user_password.

    >>> add("f", "http://g.example.com:80", "10", "j")
    >>> add("g", "http://h.example.com", "11", "k")
    >>> add("h", "i.example.com:80", "12", "l")
    >>> add("i", "j.example.com", "13", "m")
    >>> mgr.find_user_password("f", "g.example.com:100")
    (None, None)
    >>> mgr.find_user_password("f", "g.example.com:80")
    ('10', 'j')
    >>> mgr.find_user_password("f", "g.example.com")
    (None, None)
    >>> mgr.find_user_password("f", "http://g.example.com:100")
    (None, None)
    >>> mgr.find_user_password("f", "http://g.example.com:80")
    ('10', 'j')
    >>> mgr.find_user_password("f", "http://g.example.com")
    ('10', 'j')
    >>> mgr.find_user_password("g", "h.example.com")
    ('11', 'k')
    >>> mgr.find_user_password("g", "h.example.com:80")
    ('11', 'k')
    >>> mgr.find_user_password("g", "http://h.example.com:80")
    ('11', 'k')
    >>> mgr.find_user_password("h", "i.example.com")
    (None, None)
    >>> mgr.find_user_password("h", "i.example.com:80")
    ('12', 'l')
    >>> mgr.find_user_password("h", "http://i.example.com:80")
    ('12', 'l')
    >>> mgr.find_user_password("i", "j.example.com")
    ('13', 'm')
    >>> mgr.find_user_password("i", "j.example.com:80")
    (None, None)
    >>> mgr.find_user_password("i", "http://j.example.com")
    ('13', 'm')
    >>> mgr.find_user_password("i", "http://j.example.com:80")
    (None, None)

    """
    225 
    226 class MockOpener:
    227     addheaders = []
    228     def open(self, req, data=None,timeout=socket._GLOBAL_DEFAULT_TIMEOUT):
    229         self.req, self.data, self.timeout  = req, data, timeout
    230     def error(self, proto, *args):
    231         self.proto, self.args = proto, args
    232 
    233 class MockFile:
    234     def read(self, count=None): pass
    235     def readline(self, count=None): pass
    236     def close(self): pass
    237 
    238 class MockHeaders(dict):
    239     def getheaders(self, name):
    240         return self.values()
    241 
    242 class MockResponse(StringIO.StringIO):
    243     def __init__(self, code, msg, headers, data, url=None):
    244         StringIO.StringIO.__init__(self, data)
    245         self.code, self.msg, self.headers, self.url = code, msg, headers, url
    246     def info(self):
    247         return self.headers
    248     def geturl(self):
    249         return self.url
    250 
    251 class MockCookieJar:
    252     def add_cookie_header(self, request):
    253         self.ach_req = request
    254     def extract_cookies(self, response, request):
    255         self.ec_req, self.ec_r = request, response
    256 
    257 class FakeMethod:
    258     def __init__(self, meth_name, action, handle):
    259         self.meth_name = meth_name
    260         self.handle = handle
    261         self.action = action
    262     def __call__(self, *args):
    263         return self.handle(self.meth_name, self.action, *args)
    264 
    265 class MockHTTPResponse:
    266     def __init__(self, fp, msg, status, reason):
    267         self.fp = fp
    268         self.msg = msg
    269         self.status = status
    270         self.reason = reason
    271     def read(self):
    272         return ''
    273 
    274 class MockHTTPClass:
    275     def __init__(self):
    276         self.req_headers = []
    277         self.data = None
    278         self.raise_on_endheaders = False
    279         self._tunnel_headers = {}
    280 
    281     def __call__(self, host, timeout=socket._GLOBAL_DEFAULT_TIMEOUT):
    282         self.host = host
    283         self.timeout = timeout
    284         return self
    285 
    286     def set_debuglevel(self, level):
    287         self.level = level
    288 
    289     def set_tunnel(self, host, port=None, headers=None):
    290         self._tunnel_host = host
    291         self._tunnel_port = port
    292         if headers:
    293             self._tunnel_headers = headers
    294         else:
    295             self._tunnel_headers.clear()
    296     def request(self, method, url, body=None, headers=None):
    297         self.method = method
    298         self.selector = url
    299         if headers is not None:
    300             self.req_headers += headers.items()
    301         self.req_headers.sort()
    302         if body:
    303             self.data = body
    304         if self.raise_on_endheaders:
    305             import socket
    306             raise socket.error()
    307     def getresponse(self):
    308         return MockHTTPResponse(MockFile(), {}, 200, "OK")
    309 
    310 class MockHandler:
    311     # useful for testing handler machinery
    312     # see add_ordered_mock_handlers() docstring
    313     handler_order = 500
    314     def __init__(self, methods):
    315         self._define_methods(methods)
    316     def _define_methods(self, methods):
    317         for spec in methods:
    318             if len(spec) == 2: name, action = spec
    319             else: name, action = spec, None
    320             meth = FakeMethod(name, action, self.handle)
    321             setattr(self.__class__, name, meth)
    322     def handle(self, fn_name, action, *args, **kwds):
    323         self.parent.calls.append((self, fn_name, args, kwds))
    324         if action is None:
    325             return None
    326         elif action == "return self":
    327             return self
    328         elif action == "return response":
    329             res = MockResponse(200, "OK", {}, "")
    330             return res
    331         elif action == "return request":
    332             return Request("http://blah/")
    333         elif action.startswith("error"):
    334             code = action[action.rfind(" ")+1:]
    335             try:
    336                 code = int(code)
    337             except ValueError:
    338                 pass
    339             res = MockResponse(200, "OK", {}, "")
    340             return self.parent.error("http", args[0], res, code, "", {})
    341         elif action == "raise":
    342             raise urllib2.URLError("blah")
    343         assert False
    344     def close(self): pass
    345     def add_parent(self, parent):
    346         self.parent = parent
    347         self.parent.calls = []
    348     def __lt__(self, other):
    349         if not hasattr(other, "handler_order"):
    350             # No handler_order, leave in original order.  Yuck.
    351             return True
    352         return self.handler_order < other.handler_order
    353 
    354 def add_ordered_mock_handlers(opener, meth_spec):
    355     """Create MockHandlers and add them to an OpenerDirector.
    356 
    357     meth_spec: list of lists of tuples and strings defining methods to define
    358     on handlers.  eg:
    359 
    360     [["http_error", "ftp_open"], ["http_open"]]
    361 
    362     defines methods .http_error() and .ftp_open() on one handler, and
    363     .http_open() on another.  These methods just record their arguments and
    364     return None.  Using a tuple instead of a string causes the method to
    365     perform some action (see MockHandler.handle()), eg:
    366 
    367     [["http_error"], [("http_open", "return request")]]
    368 
    369     defines .http_error() on one handler (which simply returns None), and
    370     .http_open() on another handler, which returns a Request object.
    371 
    372     """
    373     handlers = []
    374     count = 0
    375     for meths in meth_spec:
    376         class MockHandlerSubclass(MockHandler): pass
    377         h = MockHandlerSubclass(meths)
    378         h.handler_order += count
    379         h.add_parent(opener)
    380         count = count + 1
    381         handlers.append(h)
    382         opener.add_handler(h)
    383     return handlers
    384 
    385 def build_test_opener(*handler_instances):
    386     opener = OpenerDirector()
    387     for h in handler_instances:
    388         opener.add_handler(h)
    389     return opener
    390 
    391 class MockHTTPHandler(urllib2.BaseHandler):
    392     # useful for testing redirections and auth
    393     # sends supplied headers and code as first response
    394     # sends 200 OK as second response
    395     def __init__(self, code, headers):
    396         self.code = code
    397         self.headers = headers
    398         self.reset()
    399     def reset(self):
    400         self._count = 0
    401         self.requests = []
    402     def http_open(self, req):
    403         import mimetools, httplib, copy
    404         from StringIO import StringIO
    405         self.requests.append(copy.deepcopy(req))
    406         if self._count == 0:
    407             self._count = self._count + 1
    408             name = httplib.responses[self.code]
    409             msg = mimetools.Message(StringIO(self.headers))
    410             return self.parent.error(
    411                 "http", req, MockFile(), self.code, name, msg)
    412         else:
    413             self.req = req
    414             msg = mimetools.Message(StringIO("\r\n\r\n"))
    415             return MockResponse(200, "OK", msg, "", req.get_full_url())
    416 
    417 class MockHTTPSHandler(urllib2.AbstractHTTPHandler):
    418     # Useful for testing the Proxy-Authorization request by verifying the
    419     # properties of httpcon
    420 
    421     def __init__(self):
    422         urllib2.AbstractHTTPHandler.__init__(self)
    423         self.httpconn = MockHTTPClass()
    424 
    425     def https_open(self, req):
    426         return self.do_open(self.httpconn, req)
    427 
    428 class MockPasswordManager:
    429     def add_password(self, realm, uri, user, password):
    430         self.realm = realm
    431         self.url = uri
    432         self.user = user
    433         self.password = password
    434     def find_user_password(self, realm, authuri):
    435         self.target_realm = realm
    436         self.target_url = authuri
    437         return self.user, self.password
    438 
    439 
    440 class OpenerDirectorTests(unittest.TestCase):
    441 
    442     def test_add_non_handler(self):
    443         class NonHandler(object):
    444             pass
    445         self.assertRaises(TypeError,
    446                           OpenerDirector().add_handler, NonHandler())
    447 
    448     def test_badly_named_methods(self):
    449         # test work-around for three methods that accidentally follow the
    450         # naming conventions for handler methods
    451         # (*_open() / *_request() / *_response())
    452 
    453         # These used to call the accidentally-named methods, causing a
    454         # TypeError in real code; here, returning self from these mock
    455         # methods would either cause no exception, or AttributeError.
    456 
    457         from urllib2 import URLError
    458 
    459         o = OpenerDirector()
    460         meth_spec = [
    461             [("do_open", "return self"), ("proxy_open", "return self")],
    462             [("redirect_request", "return self")],
    463             ]
    464         handlers = add_ordered_mock_handlers(o, meth_spec)
    465         o.add_handler(urllib2.UnknownHandler())
    466         for scheme in "do", "proxy", "redirect":
    467             self.assertRaises(URLError, o.open, scheme+"://example.com/")
    468 
    469     def test_handled(self):
    470         # handler returning non-None means no more handlers will be called
    471         o = OpenerDirector()
    472         meth_spec = [
    473             ["http_open", "ftp_open", "http_error_302"],
    474             ["ftp_open"],
    475             [("http_open", "return self")],
    476             [("http_open", "return self")],
    477             ]
    478         handlers = add_ordered_mock_handlers(o, meth_spec)
    479 
    480         req = Request("http://example.com/")
    481         r = o.open(req)
    482         # Second .http_open() gets called, third doesn't, since second returned
    483         # non-None.  Handlers without .http_open() never get any methods called
    484         # on them.
    485         # In fact, second mock handler defining .http_open() returns self
    486         # (instead of response), which becomes the OpenerDirector's return
    487         # value.
    488         self.assertEqual(r, handlers[2])
    489         calls = [(handlers[0], "http_open"), (handlers[2], "http_open")]
    490         for expected, got in zip(calls, o.calls):
    491             handler, name, args, kwds = got
    492             self.assertEqual((handler, name), expected)
    493             self.assertEqual(args, (req,))
    494 
    495     def test_handler_order(self):
    496         o = OpenerDirector()
    497         handlers = []
    498         for meths, handler_order in [
    499             ([("http_open", "return self")], 500),
    500             (["http_open"], 0),
    501             ]:
    502             class MockHandlerSubclass(MockHandler): pass
    503             h = MockHandlerSubclass(meths)
    504             h.handler_order = handler_order
    505             handlers.append(h)
    506             o.add_handler(h)
    507 
    508         r = o.open("http://example.com/")
    509         # handlers called in reverse order, thanks to their sort order
    510         self.assertEqual(o.calls[0][0], handlers[1])
    511         self.assertEqual(o.calls[1][0], handlers[0])
    512 
    513     def test_raise(self):
    514         # raising URLError stops processing of request
    515         o = OpenerDirector()
    516         meth_spec = [
    517             [("http_open", "raise")],
    518             [("http_open", "return self")],
    519             ]
    520         handlers = add_ordered_mock_handlers(o, meth_spec)
    521 
    522         req = Request("http://example.com/")
    523         self.assertRaises(urllib2.URLError, o.open, req)
    524         self.assertEqual(o.calls, [(handlers[0], "http_open", (req,), {})])
    525 
    526 ##     def test_error(self):
    527 ##         # XXX this doesn't actually seem to be used in standard library,
    528 ##         #  but should really be tested anyway...
    529 
    530     def test_http_error(self):
    531         # XXX http_error_default
    532         # http errors are a special case
    533         o = OpenerDirector()
    534         meth_spec = [
    535             [("http_open", "error 302")],
    536             [("http_error_400", "raise"), "http_open"],
    537             [("http_error_302", "return response"), "http_error_303",
    538              "http_error"],
    539             [("http_error_302")],
    540             ]
    541         handlers = add_ordered_mock_handlers(o, meth_spec)
    542 
    543         class Unknown:
    544             def __eq__(self, other): return True
    545 
    546         req = Request("http://example.com/")
    547         r = o.open(req)
    548         assert len(o.calls) == 2
    549         calls = [(handlers[0], "http_open", (req,)),
    550                  (handlers[2], "http_error_302",
    551                   (req, Unknown(), 302, "", {}))]
    552         for expected, got in zip(calls, o.calls):
    553             handler, method_name, args = expected
    554             self.assertEqual((handler, method_name), got[:2])
    555             self.assertEqual(args, got[2])
    556 
    557     def test_processors(self):
    558         # *_request / *_response methods get called appropriately
    559         o = OpenerDirector()
    560         meth_spec = [
    561             [("http_request", "return request"),
    562              ("http_response", "return response")],
    563             [("http_request", "return request"),
    564              ("http_response", "return response")],
    565             ]
    566         handlers = add_ordered_mock_handlers(o, meth_spec)
    567 
    568         req = Request("http://example.com/")
    569         r = o.open(req)
    570         # processor methods are called on *all* handlers that define them,
    571         # not just the first handler that handles the request
    572         calls = [
    573             (handlers[0], "http_request"), (handlers[1], "http_request"),
    574             (handlers[0], "http_response"), (handlers[1], "http_response")]
    575 
    576         for i, (handler, name, args, kwds) in enumerate(o.calls):
    577             if i < 2:
    578                 # *_request
    579                 self.assertEqual((handler, name), calls[i])
    580                 self.assertEqual(len(args), 1)
    581                 self.assertIsInstance(args[0], Request)
    582             else:
    583                 # *_response
    584                 self.assertEqual((handler, name), calls[i])
    585                 self.assertEqual(len(args), 2)
    586                 self.assertIsInstance(args[0], Request)
    587                 # response from opener.open is None, because there's no
    588                 # handler that defines http_open to handle it
    589                 self.assertTrue(args[1] is None or
    590                              isinstance(args[1], MockResponse))
    591 
    592 
    593 def sanepathname2url(path):
    594     import urllib
    595     urlpath = urllib.pathname2url(path)
    596     if os.name == "nt" and urlpath.startswith("///"):
    597         urlpath = urlpath[2:]
    598     # XXX don't ask me about the mac...
    599     return urlpath
    600 
    601 class HandlerTests(unittest.TestCase):
    602 
    def test_ftp(self):
        # Drive FTPHandler.ftp_open() with a connect_ftp() override that
        # records its arguments, then check what each URL was parsed into.
        class MockFTPWrapper:
            def __init__(self, data):
                self.data = data

            def retrfile(self, filename, filetype):
                self.filename = filename
                self.filetype = filetype
                return StringIO.StringIO(self.data), len(self.data)

        class NullFTPHandler(urllib2.FTPHandler):
            def __init__(self, data):
                self.data = data

            def connect_ftp(self, user, passwd, host, port, dirs,
                            timeout=socket._GLOBAL_DEFAULT_TIMEOUT):
                self.user, self.passwd = user, passwd
                self.host, self.port = host, port
                self.dirs = dirs
                self.ftpwrapper = MockFTPWrapper(self.data)
                return self.ftpwrapper

        import ftplib
        data = "rheum rhaponicum"
        h = NullFTPHandler(data)
        h.parent = MockOpener()

        # (url, host, port, user, passwd, type, dirs, filename, mimetype)
        cases = [
            ("ftp://localhost/foo/bar/baz.html",
             "localhost", ftplib.FTP_PORT, "", "", "I",
             ["foo", "bar"], "baz.html", "text/html"),
            ("ftp://parrot@localhost/foo/bar/baz.html",
             "localhost", ftplib.FTP_PORT, "parrot", "", "I",
             ["foo", "bar"], "baz.html", "text/html"),
            ("ftp://%25parrot@localhost/foo/bar/baz.html",
             "localhost", ftplib.FTP_PORT, "%parrot", "", "I",
             ["foo", "bar"], "baz.html", "text/html"),
            ("ftp://%2542parrot@localhost/foo/bar/baz.html",
             "localhost", ftplib.FTP_PORT, "%42parrot", "", "I",
             ["foo", "bar"], "baz.html", "text/html"),
            ("ftp://localhost:80/foo/bar/",
             "localhost", 80, "", "", "D",
             ["foo", "bar"], "", None),
            ("ftp://localhost/baz.gif;type=a",
             "localhost", ftplib.FTP_PORT, "", "", "A",
             [], "baz.gif", None),  # XXX really this should guess image/gif
            ]
        for case in cases:
            (url, host, port, user, passwd,
             type_, dirs, filename, mimetype) = case
            req = Request(url)
            req.timeout = None
            r = h.ftp_open(req)
            # ftp authentication not yet implemented by FTPHandler
            self.assertEqual(h.user, user)
            self.assertEqual(h.passwd, passwd)
            self.assertEqual(h.host, socket.gethostbyname(host))
            self.assertEqual(h.port, port)
            self.assertEqual(h.dirs, dirs)
            wrapper = h.ftpwrapper
            self.assertEqual(wrapper.filename, filename)
            self.assertEqual(wrapper.filetype, type_)
            headers = r.info()
            self.assertEqual(headers.get("Content-type"), mimetype)
            self.assertEqual(int(headers["Content-length"]), len(data))
    659 
    def test_file(self):
        # Exercises FileHandler.file_open() in three stages:
        #   1. file: URLs that should open the local test file,
        #   2. file: URLs that must raise URLError,
        #   3. file: URLs that may be handed on as ftp requests.
        import rfc822, socket
        h = urllib2.FileHandler()
        o = h.parent = MockOpener()

        TESTFN = test_support.TESTFN
        urlpath = sanepathname2url(os.path.abspath(TESTFN))
        towrite = "hello, world\n"
        # Different spellings of the local host that should all work.
        urls = [
            "file://localhost%s" % urlpath,
            "file://%s" % urlpath,
            "file://%s%s" % (socket.gethostbyname('localhost'), urlpath),
            ]
        # Also try this machine's own address, when it can be resolved.
        try:
            localaddr = socket.gethostbyname(socket.gethostname())
        except socket.gaierror:
            localaddr = ''
        if localaddr:
            urls.append("file://%s%s" % (localaddr, urlpath))

        # Stage 1: each URL must yield the file's contents and headers.
        for url in urls:
            f = open(TESTFN, "wb")
            try:
                try:
                    f.write(towrite)
                finally:
                    f.close()

                r = h.file_open(Request(url))
                try:
                    data = r.read()
                    headers = r.info()
                    respurl = r.geturl()
                finally:
                    r.close()
                # Capture the mtime before the file is removed below.
                stats = os.stat(TESTFN)
                modified = rfc822.formatdate(stats.st_mtime)
            finally:
                # The test file is recreated on every iteration.
                os.remove(TESTFN)
            self.assertEqual(data, towrite)
            self.assertEqual(headers["Content-type"], "text/plain")
            # 13 is len(towrite).
            self.assertEqual(headers["Content-length"], "13")
            self.assertEqual(headers["Last-modified"], modified)
            self.assertEqual(respurl, url)

        # Stage 2: each of these URLs must be rejected with URLError.
        for url in [
            "file://localhost:80%s" % urlpath,
            "file:///file_does_not_exist.txt",
            "file://%s:80%s/%s" % (socket.gethostbyname('localhost'),
                                   os.getcwd(), TESTFN),
            "file://somerandomhost.ontheinternet.com%s/%s" %
            (os.getcwd(), TESTFN),
            ]:
            try:
                f = open(TESTFN, "wb")
                try:
                    f.write(towrite)
                finally:
                    f.close()

                self.assertRaises(urllib2.URLError,
                                  h.file_open, Request(url))
            finally:
                os.remove(TESTFN)

        # Stage 3: use a fresh handler and a fresh recording opener.
        h = urllib2.FileHandler()
        o = h.parent = MockOpener()
        # XXXX why does // mean ftp (and /// mean not ftp!), and where
        #  is file: scheme specified?  I think this is really a bug, and
        #  what was intended was to distinguish between URLs like:
        # file:/blah.txt (a file)
        # file://localhost/blah.txt (a file)
        # file:///blah.txt (a file)
        # file://ftp.example.com/blah.txt (an ftp URL)
        # Each entry is (url, whether the request is expected to become ftp).
        for url, ftp in [
            ("file://ftp.example.com//foo.txt", True),
            ("file://ftp.example.com///foo.txt", False),
            # XXXX bug: fails with OSError, should be URLError
            ("file://ftp.example.com/foo.txt", False),
            ("file://somehost//foo/something.txt", True),
            ("file://localhost//foo/something.txt", False),
            ]:
            req = Request(url)
            try:
                h.file_open(req)
            # XXXX remove OSError when bug fixed
            except (urllib2.URLError, OSError):
                # An error is only acceptable for a non-ftp URL.
                self.assertTrue(not ftp)
            else:
                # No error: the same request object must have been passed to
                # the parent opener, now typed as ftp.
                self.assertTrue(o.req is req)
                self.assertEqual(req.type, "ftp")
            self.assertEqual(req.type == "ftp", ftp)
    752 
    def test_http(self):
        """Check AbstractHTTPHandler.do_open() and do_request_() against mocks.

        First, do_open() is driven with a MockHTTPClass connection for a GET
        and a POST, and both the returned response and the values recorded on
        the mock connection are inspected.  Then do_request_() is checked for
        adding the standard headers without clobbering ones already present.
        """
        h = urllib2.AbstractHTTPHandler()
        o = h.parent = MockOpener()

        url = "http://example.com/"
        for method, data in [("GET", None), ("POST", "blah")]:
            req = Request(url, data, {"Foo": "bar"})
            req.timeout = None
            req.add_unredirected_header("Spam", "eggs")
            http = MockHTTPClass()
            r = h.do_open(http, req)

            # result attributes: bare attribute access raises AttributeError
            # if the response object lacks any of them
            r.read; r.readline  # wrapped MockFile methods
            r.info; r.geturl  # addinfourl methods
            # Status and reason come from the mock connection.  These used to
            # be written as the tuple expression `r.code, r.msg == 200, "OK"`,
            # which evaluated to a throwaway value and asserted nothing.
            self.assertEqual(r.code, 200)
            self.assertEqual(r.msg, "OK")
            hdrs = r.info()
            hdrs.get; hdrs.has_key  # r.info() gives dict from .getreply()
            self.assertEqual(r.geturl(), url)

            self.assertEqual(http.host, "example.com")
            self.assertEqual(http.level, 0)
            self.assertEqual(http.method, method)
            self.assertEqual(http.selector, "/")
            self.assertEqual(http.req_headers,
                             [("Connection", "close"),
                              ("Foo", "bar"), ("Spam", "eggs")])
            self.assertEqual(http.data, data)

        # check socket.error converted to URLError
        # (reuses the connection and request left over from the last loop pass)
        http.raise_on_endheaders = True
        self.assertRaises(urllib2.URLError, h.do_open, http, req)

        # check adding of standard headers
        o.addheaders = [("Spam", "eggs")]
        for data in "", None:  # POST, GET
            req = Request("http://example.com/", data)
            r = MockResponse(200, "OK", {}, "")
            h.do_request_(req)
            if data is None:  # GET
                self.assertNotIn("Content-length", req.unredirected_hdrs)
                self.assertNotIn("Content-type", req.unredirected_hdrs)
            else:  # POST
                self.assertEqual(req.unredirected_hdrs["Content-length"], "0")
                self.assertEqual(req.unredirected_hdrs["Content-type"],
                                 "application/x-www-form-urlencoded")
            # XXX the details of Host could be better tested
            self.assertEqual(req.unredirected_hdrs["Host"], "example.com")
            self.assertEqual(req.unredirected_hdrs["Spam"], "eggs")

            # don't clobber existing headers
            req.add_unredirected_header("Content-length", "foo")
            req.add_unredirected_header("Content-type", "bar")
            req.add_unredirected_header("Host", "baz")
            req.add_unredirected_header("Spam", "foo")
            h.do_request_(req)
            self.assertEqual(req.unredirected_hdrs["Content-length"], "foo")
            self.assertEqual(req.unredirected_hdrs["Content-type"], "bar")
            self.assertEqual(req.unredirected_hdrs["Host"], "baz")
            self.assertEqual(req.unredirected_hdrs["Spam"], "foo")
    814 
    def test_http_doubleslash(self):
        """The Host header must survive an unnecessary double slash in the URL.

        A double slash directly after the host previously caused the URL to
        be parsed incorrectly; each URL is checked without and with a proxy.
        """
        handler = urllib2.AbstractHTTPHandler()
        handler.parent = MockOpener()

        body = ""
        for candidate in ("http://example.com/foo/bar/baz.html",
                          "http://example.com//foo/bar/baz.html",
                          "http://example.com/foo//bar/baz.html",
                          "http://example.com/foo/bar//baz.html"):
            request = Request(candidate, body)

            # Host must be determined correctly when there is no proxy ...
            direct = handler.do_request_(request)
            self.assertEqual(direct.unredirected_hdrs["Host"], "example.com")

            # ... and also once a proxy has been set on the same request.
            request.set_proxy("someproxy:3128", None)
            proxied = handler.do_request_(request)
            self.assertEqual(proxied.unredirected_hdrs["Host"], "example.com")
    840 
    def test_fixpath_in_weirdurls(self):
        # Issue4493: check the selector urllib2 produces for URLs whose path
        # does not start with '/'.
        handler = urllib2.AbstractHTTPHandler()
        handler.parent = MockOpener()

        expectations = [
            # (url, expected selector)
            ('http://www.python.org?getspam', '/?getspam'),
            ('http://www.python.org', ''),
        ]
        for url, selector in expectations:
            prepared = handler.do_request_(Request(url))
            self.assertEqual(prepared.get_host(), 'www.python.org')
            self.assertEqual(prepared.get_selector(), selector)
    859 
    def test_errors(self):
        """HTTPErrorProcessor passes 2xx responses through and reports others."""
        processor = urllib2.HTTPErrorProcessor()
        opener = processor.parent = MockOpener()

        url = "http://example.com/"
        request = Request(url)

        # all 2xx are passed through
        passed_through = [(200, "OK"), (202, "Accepted"),
                          (206, "Partial content")]
        for status, reason in passed_through:
            response = MockResponse(status, reason, {}, "", url)
            returned = processor.http_response(request, response)
            self.assertIs(response, returned)
            # opener.error was not called, so it never recorded a protocol
            self.assertFalse(hasattr(opener, "proto"))

        # anything else calls opener.error (and MockOpener returns None, here)
        response = MockResponse(502, "Bad gateway", {}, "", url)
        self.assertIsNone(processor.http_response(request, response))
        self.assertEqual(opener.proto, "http")  # opener.error called
        self.assertEqual(opener.args,
                         (request, response, 502, "Bad gateway", {}))
    884 
    def test_cookies(self):
        """HTTPCookieProcessor hands requests and responses to its cookie jar."""
        jar = MockCookieJar()
        processor = urllib2.HTTPCookieProcessor(jar)
        processor.parent = MockOpener()

        request = Request("http://example.com/")
        response = MockResponse(200, "OK", {}, "")

        # outgoing side: the jar sees the request, which is returned unchanged
        returned_request = processor.http_request(request)
        self.assertIs(jar.ach_req, request)
        self.assertIs(request, returned_request)
        self.assertEqual(request.get_origin_req_host(), "example.com")
        self.assertFalse(request.is_unverifiable())

        # incoming side: the jar sees request and response, response unchanged
        returned_response = processor.http_response(request, response)
        self.assertIs(jar.ec_req, request)
        self.assertIs(jar.ec_r, response)
        self.assertIs(response, returned_response)
    899 
    def test_redirect(self):
        """Drive HTTPRedirectHandler's http_error_3xx methods via a MockOpener.

        Covers ordinary redirects for codes 301, 302, 303 and 307, each with
        and without request data, and then the two loop-detection limits
        (max_repeats and max_redirections).
        """
        from_url = "http://example.com/a.html"
        to_url = "http://example.com/b.html"
        h = urllib2.HTTPRedirectHandler()
        o = h.parent = MockOpener()

        # ordinary redirect behaviour
        for code in 301, 302, 303, 307:
            for data in None, "blah\nblah\n":
                method = getattr(h, "http_error_%s" % code)
                req = Request(from_url, data)
                req.add_header("Nonsense", "viking=withhold")
                req.timeout = socket._GLOBAL_DEFAULT_TIMEOUT
                if data is not None:
                    req.add_header("Content-Length", str(len(data)))
                req.add_unredirected_header("Spam", "spam")
                try:
                    method(req, MockFile(), code, "Blah",
                           MockHeaders({"location": to_url}))
                except urllib2.HTTPError:
                    # 307 in response to POST requires user OK
                    self.assertTrue(code == 307 and data is not None)
                # o.req is whatever request the mock opener last recorded
                self.assertEqual(o.req.get_full_url(), to_url)
                try:
                    self.assertEqual(o.req.get_method(), "GET")
                except AttributeError:
                    # fallback when the recorded request has no get_method()
                    self.assertTrue(not o.req.has_data())

                # now it's a GET, there should not be headers regarding content
                # (possibly dragged from before being a POST)
                headers = [x.lower() for x in o.req.headers]
                self.assertNotIn("content-length", headers)
                self.assertNotIn("content-type", headers)

                # regular headers are carried over, unredirected ones are not
                self.assertEqual(o.req.headers["Nonsense"],
                                 "viking=withhold")
                self.assertNotIn("Spam", o.req.headers)
                self.assertNotIn("Spam", o.req.unredirected_hdrs)

        # loop detection
        # NOTE(review): this request is replaced by a fresh one before each of
        # the loops below, so the instance created here is never used.
        req = Request(from_url)
        req.timeout = socket._GLOBAL_DEFAULT_TIMEOUT
        def redirect(h, req, url=to_url):
            # issue a single 302 redirect of `req` to `url`
            h.http_error_302(req, MockFile(), 302, "Blah",
                             MockHeaders({"location": url}))
        # Note that the *original* request shares the same record of
        # redirections with the sub-requests caused by the redirections.

        # detect infinite loop redirect of a URL to itself
        req = Request(from_url, origin_req_host="example.com")
        count = 0
        req.timeout = socket._GLOBAL_DEFAULT_TIMEOUT
        try:
            # the loop only ends when the handler raises HTTPError
            while 1:
                redirect(h, req, "http://example.com/")
                count = count + 1
        except urllib2.HTTPError:
            # don't stop until max_repeats, because cookies may introduce state
            self.assertEqual(count, urllib2.HTTPRedirectHandler.max_repeats)

        # detect endless non-repeating chain of redirects
        req = Request(from_url, origin_req_host="example.com")
        count = 0
        req.timeout = socket._GLOBAL_DEFAULT_TIMEOUT
        try:
            # every target URL is distinct, so only the total count can trip
            while 1:
                redirect(h, req, "http://example.com/%d" % count)
                count = count + 1
        except urllib2.HTTPError:
            self.assertEqual(count,
                             urllib2.HTTPRedirectHandler.max_redirections)
    971 
    def test_invalid_redirect(self):
        """Redirects to file/imap/ldap raise HTTPError; http/https/ftp pass."""
        origin = "http://example.com/a.html"
        target_tail = "example.com/b.html"
        handler = urllib2.HTTPRedirectHandler()
        opener = handler.parent = MockOpener()
        request = Request(origin)
        request.timeout = socket._GLOBAL_DEFAULT_TIMEOUT

        # schemes that must be refused
        for scheme in ('file', 'imap', 'ldap'):
            target = scheme + '://' + target_tail
            self.assertRaises(urllib2.HTTPError, handler.http_error_302,
                              request, MockFile(), 302, "Security Loophole",
                              MockHeaders({"location": target}))

        # schemes that must be followed; the opener records the new request
        for scheme in ('http', 'https', 'ftp'):
            target = scheme + '://' + target_tail
            handler.http_error_302(request, MockFile(), 302, "That's fine",
                                   MockHeaders({"location": target}))
            self.assertEqual(opener.req.get_full_url(), target)
    993 
    def test_cookie_redirect(self):
        """Cookies set for one host must not leak into a redirected request."""
        from cookielib import CookieJar
        from test.test_cookielib import interact_netscape

        jar = CookieJar()
        interact_netscape(jar, "http://www.example.com/", "spam=eggs")
        http_handler = MockHTTPHandler(
            302, "Location: http://www.cracker.com/\r\n\r\n")
        opener = build_test_opener(
            http_handler,
            urllib2.HTTPDefaultErrorHandler(),
            urllib2.HTTPRedirectHandler(),
            urllib2.HTTPCookieProcessor(jar))
        opener.open("http://www.example.com/")
        # the last request the mock handler recorded must carry no Cookie
        self.assertFalse(http_handler.req.has_header("Cookie"))
   1009 
   1010     def test_redirect_fragment(self):
   1011         redirected_url = 'http://www.example.com/index.html#OK\r\n\r\n'
   1012         hh = MockHTTPHandler(302, 'Location: ' + redirected_url)
   1013         hdeh = urllib2.HTTPDefaultErrorHandler()
   1014         hrh = urllib2.HTTPRedirectHandler()
   1015         o = build_test_opener(hh, hdeh, hrh)
   1016         fp = o.open('http://www.example.com')
   1017         self.assertEqual(fp.geturl(), redirected_url.strip())
   1018 
   1019     def test_proxy(self):
   1020         o = OpenerDirector()
   1021         ph = urllib2.ProxyHandler(dict(http="proxy.example.com:3128"))
   1022         o.add_handler(ph)
   1023         meth_spec = [
   1024             [("http_open", "return response")]
   1025             ]
   1026         handlers = add_ordered_mock_handlers(o, meth_spec)
   1027 
   1028         req = Request("http://acme.example.com/")
   1029         self.assertEqual(req.get_host(), "acme.example.com")
   1030         r = o.open(req)
   1031         self.assertEqual(req.get_host(), "proxy.example.com:3128")
   1032 
   1033         self.assertEqual([(handlers[0], "http_open")],
   1034                          [tup[0:2] for tup in o.calls])
   1035 
   1036     def test_proxy_no_proxy(self):
   1037         os.environ['no_proxy'] = 'python.org'
   1038         o = OpenerDirector()
   1039         ph = urllib2.ProxyHandler(dict(http="proxy.example.com"))
   1040         o.add_handler(ph)
   1041         req = Request("http://www.perl.org/")
   1042         self.assertEqual(req.get_host(), "www.perl.org")
   1043         r = o.open(req)
   1044         self.assertEqual(req.get_host(), "proxy.example.com")
   1045         req = Request("http://www.python.org")
   1046         self.assertEqual(req.get_host(), "www.python.org")
   1047         r = o.open(req)
   1048         self.assertEqual(req.get_host(), "www.python.org")
   1049         del os.environ['no_proxy']
   1050 
   1051 
   1052     def test_proxy_https(self):
   1053         o = OpenerDirector()
   1054         ph = urllib2.ProxyHandler(dict(https='proxy.example.com:3128'))
   1055         o.add_handler(ph)
   1056         meth_spec = [
   1057             [("https_open","return response")]
   1058         ]
   1059         handlers = add_ordered_mock_handlers(o, meth_spec)
   1060         req = Request("https://www.example.com/")
   1061         self.assertEqual(req.get_host(), "www.example.com")
   1062         r = o.open(req)
   1063         self.assertEqual(req.get_host(), "proxy.example.com:3128")
   1064         self.assertEqual([(handlers[0], "https_open")],
   1065                          [tup[0:2] for tup in o.calls])
   1066 
   1067     def test_proxy_https_proxy_authorization(self):
   1068         o = OpenerDirector()
   1069         ph = urllib2.ProxyHandler(dict(https='proxy.example.com:3128'))
   1070         o.add_handler(ph)
   1071         https_handler = MockHTTPSHandler()
   1072         o.add_handler(https_handler)
   1073         req = Request("https://www.example.com/")
   1074         req.add_header("Proxy-Authorization","FooBar")
   1075         req.add_header("User-Agent","Grail")
   1076         self.assertEqual(req.get_host(), "www.example.com")
   1077         self.assertIsNone(req._tunnel_host)
   1078         r = o.open(req)
   1079         # Verify Proxy-Authorization gets tunneled to request.

   1080         # httpsconn req_headers do not have the Proxy-Authorization header but

   1081         # the req will have.

   1082         self.assertNotIn(("Proxy-Authorization","FooBar"),
   1083                          https_handler.httpconn.req_headers)
   1084         self.assertIn(("User-Agent","Grail"),
   1085                       https_handler.httpconn.req_headers)
   1086         self.assertIsNotNone(req._tunnel_host)
   1087         self.assertEqual(req.get_host(), "proxy.example.com:3128")
   1088         self.assertEqual(req.get_header("Proxy-authorization"),"FooBar")
   1089 
   1090     def test_basic_auth(self, quote_char='"'):
   1091         opener = OpenerDirector()
   1092         password_manager = MockPasswordManager()
   1093         auth_handler = urllib2.HTTPBasicAuthHandler(password_manager)
   1094         realm = "ACME Widget Store"
   1095         http_handler = MockHTTPHandler(
   1096             401, 'WWW-Authenticate: Basic realm=%s%s%s\r\n\r\n' %
   1097             (quote_char, realm, quote_char) )
   1098         opener.add_handler(auth_handler)
   1099         opener.add_handler(http_handler)
   1100         self._test_basic_auth(opener, auth_handler, "Authorization",
   1101                               realm, http_handler, password_manager,
   1102                               "http://acme.example.com/protected",
   1103                               "http://acme.example.com/protected",
   1104                               )
   1105 
   1106     def test_basic_auth_with_single_quoted_realm(self):
   1107         self.test_basic_auth(quote_char="'")
   1108 
   1109     def test_proxy_basic_auth(self):
   1110         opener = OpenerDirector()
   1111         ph = urllib2.ProxyHandler(dict(http="proxy.example.com:3128"))
   1112         opener.add_handler(ph)
   1113         password_manager = MockPasswordManager()
   1114         auth_handler = urllib2.ProxyBasicAuthHandler(password_manager)
   1115         realm = "ACME Networks"
   1116         http_handler = MockHTTPHandler(
   1117             407, 'Proxy-Authenticate: Basic realm="%s"\r\n\r\n' % realm)
   1118         opener.add_handler(auth_handler)
   1119         opener.add_handler(http_handler)
   1120         self._test_basic_auth(opener, auth_handler, "Proxy-authorization",
   1121                               realm, http_handler, password_manager,
   1122                               "http://acme.example.com:3128/protected",
   1123                               "proxy.example.com:3128",
   1124                               )
   1125 
   1126     def test_basic_and_digest_auth_handlers(self):
   1127         # HTTPDigestAuthHandler threw an exception if it couldn't handle a 40*

   1128         # response (http://python.org/sf/1479302), where it should instead

   1129         # return None to allow another handler (especially

   1130         # HTTPBasicAuthHandler) to handle the response.

   1131 
   1132         # Also (http://python.org/sf/14797027, RFC 2617 section 1.2), we must

   1133         # try digest first (since it's the strongest auth scheme), so we record

   1134         # order of calls here to check digest comes first:

   1135         class RecordingOpenerDirector(OpenerDirector):
   1136             def __init__(self):
   1137                 OpenerDirector.__init__(self)
   1138                 self.recorded = []
   1139             def record(self, info):
   1140                 self.recorded.append(info)
   1141         class TestDigestAuthHandler(urllib2.HTTPDigestAuthHandler):
   1142             def http_error_401(self, *args, **kwds):
   1143                 self.parent.record("digest")
   1144                 urllib2.HTTPDigestAuthHandler.http_error_401(self,
   1145                                                              *args, **kwds)
   1146         class TestBasicAuthHandler(urllib2.HTTPBasicAuthHandler):
   1147             def http_error_401(self, *args, **kwds):
   1148                 self.parent.record("basic")
   1149                 urllib2.HTTPBasicAuthHandler.http_error_401(self,
   1150                                                             *args, **kwds)
   1151 
   1152         opener = RecordingOpenerDirector()
   1153         password_manager = MockPasswordManager()
   1154         digest_handler = TestDigestAuthHandler(password_manager)
   1155         basic_handler = TestBasicAuthHandler(password_manager)
   1156         realm = "ACME Networks"
   1157         http_handler = MockHTTPHandler(
   1158             401, 'WWW-Authenticate: Basic realm="%s"\r\n\r\n' % realm)
   1159         opener.add_handler(basic_handler)
   1160         opener.add_handler(digest_handler)
   1161         opener.add_handler(http_handler)
   1162 
   1163         # check basic auth isn't blocked by digest handler failing

   1164         self._test_basic_auth(opener, basic_handler, "Authorization",
   1165                               realm, http_handler, password_manager,
   1166                               "http://acme.example.com/protected",
   1167                               "http://acme.example.com/protected",
   1168                               )
   1169         # check digest was tried before basic (twice, because

   1170         # _test_basic_auth called .open() twice)

   1171         self.assertEqual(opener.recorded, ["digest", "basic"]*2)
   1172 
   1173     def _test_basic_auth(self, opener, auth_handler, auth_header,
   1174                          realm, http_handler, password_manager,
   1175                          request_url, protected_url):
   1176         import base64
   1177         user, password = "wile", "coyote"
   1178 
   1179         # .add_password() fed through to password manager

   1180         auth_handler.add_password(realm, request_url, user, password)
   1181         self.assertEqual(realm, password_manager.realm)
   1182         self.assertEqual(request_url, password_manager.url)
   1183         self.assertEqual(user, password_manager.user)
   1184         self.assertEqual(password, password_manager.password)
   1185 
   1186         r = opener.open(request_url)
   1187 
   1188         # should have asked the password manager for the username/password

   1189         self.assertEqual(password_manager.target_realm, realm)
   1190         self.assertEqual(password_manager.target_url, protected_url)
   1191 
   1192         # expect one request without authorization, then one with

   1193         self.assertEqual(len(http_handler.requests), 2)
   1194         self.assertFalse(http_handler.requests[0].has_header(auth_header))
   1195         userpass = '%s:%s' % (user, password)
   1196         auth_hdr_value = 'Basic '+base64.encodestring(userpass).strip()
   1197         self.assertEqual(http_handler.requests[1].get_header(auth_header),
   1198                          auth_hdr_value)
   1199         self.assertEqual(http_handler.requests[1].unredirected_hdrs[auth_header],
   1200                          auth_hdr_value)
   1201         # if the password manager can't find a password, the handler won't

   1202         # handle the HTTP auth error

   1203         password_manager.user = password_manager.password = None
   1204         http_handler.reset()
   1205         r = opener.open(request_url)
   1206         self.assertEqual(len(http_handler.requests), 1)
   1207         self.assertFalse(http_handler.requests[0].has_header(auth_header))
   1208 
   1209 class MiscTests(unittest.TestCase):
   1210 
   1211     def test_build_opener(self):
   1212         class MyHTTPHandler(urllib2.HTTPHandler): pass
   1213         class FooHandler(urllib2.BaseHandler):
   1214             def foo_open(self): pass
   1215         class BarHandler(urllib2.BaseHandler):
   1216             def bar_open(self): pass
   1217 
   1218         build_opener = urllib2.build_opener
   1219 
   1220         o = build_opener(FooHandler, BarHandler)
   1221         self.opener_has_handler(o, FooHandler)
   1222         self.opener_has_handler(o, BarHandler)
   1223 
   1224         # can take a mix of classes and instances

   1225         o = build_opener(FooHandler, BarHandler())
   1226         self.opener_has_handler(o, FooHandler)
   1227         self.opener_has_handler(o, BarHandler)
   1228 
   1229         # subclasses of default handlers override default handlers

   1230         o = build_opener(MyHTTPHandler)
   1231         self.opener_has_handler(o, MyHTTPHandler)
   1232 
   1233         # a particular case of overriding: default handlers can be passed

   1234         # in explicitly

   1235         o = build_opener()
   1236         self.opener_has_handler(o, urllib2.HTTPHandler)
   1237         o = build_opener(urllib2.HTTPHandler)
   1238         self.opener_has_handler(o, urllib2.HTTPHandler)
   1239         o = build_opener(urllib2.HTTPHandler())
   1240         self.opener_has_handler(o, urllib2.HTTPHandler)
   1241 
   1242         # Issue2670: multiple handlers sharing the same base class

   1243         class MyOtherHTTPHandler(urllib2.HTTPHandler): pass
   1244         o = build_opener(MyHTTPHandler, MyOtherHTTPHandler)
   1245         self.opener_has_handler(o, MyHTTPHandler)
   1246         self.opener_has_handler(o, MyOtherHTTPHandler)
   1247 
   1248     def opener_has_handler(self, opener, handler_class):
   1249         for h in opener.handlers:
   1250             if h.__class__ == handler_class:
   1251                 break
   1252         else:
   1253             self.assertTrue(False)
   1254 
   1255 class RequestTests(unittest.TestCase):
   1256 
   1257     def setUp(self):
   1258         self.get = urllib2.Request("http://www.python.org/~jeremy/")
   1259         self.post = urllib2.Request("http://www.python.org/~jeremy/",
   1260                                     "data",
   1261                                     headers={"X-Test": "test"})
   1262 
   1263     def test_method(self):
   1264         self.assertEqual("POST", self.post.get_method())
   1265         self.assertEqual("GET", self.get.get_method())
   1266 
   1267     def test_add_data(self):
   1268         self.assertTrue(not self.get.has_data())
   1269         self.assertEqual("GET", self.get.get_method())
   1270         self.get.add_data("spam")
   1271         self.assertTrue(self.get.has_data())
   1272         self.assertEqual("POST", self.get.get_method())
   1273 
   1274     def test_get_full_url(self):
   1275         self.assertEqual("http://www.python.org/~jeremy/",
   1276                          self.get.get_full_url())
   1277 
   1278     def test_selector(self):
   1279         self.assertEqual("/~jeremy/", self.get.get_selector())
   1280         req = urllib2.Request("http://www.python.org/")
   1281         self.assertEqual("/", req.get_selector())
   1282 
   1283     def test_get_type(self):
   1284         self.assertEqual("http", self.get.get_type())
   1285 
   1286     def test_get_host(self):
   1287         self.assertEqual("www.python.org", self.get.get_host())
   1288 
   1289     def test_get_host_unquote(self):
   1290         req = urllib2.Request("http://www.%70ython.org/")
   1291         self.assertEqual("www.python.org", req.get_host())
   1292 
   1293     def test_proxy(self):
   1294         self.assertTrue(not self.get.has_proxy())
   1295         self.get.set_proxy("www.perl.org", "http")
   1296         self.assertTrue(self.get.has_proxy())
   1297         self.assertEqual("www.python.org", self.get.get_origin_req_host())
   1298         self.assertEqual("www.perl.org", self.get.get_host())
   1299 
   1300     def test_wrapped_url(self):
   1301         req = Request("<URL:http://www.python.org>")
   1302         self.assertEqual("www.python.org", req.get_host())
   1303 
   1304     def test_url_fragment(self):
   1305         req = Request("http://www.python.org/?qs=query#fragment=true")
   1306         self.assertEqual("/?qs=query", req.get_selector())
   1307         req = Request("http://www.python.org/#fun=true")
   1308         self.assertEqual("/", req.get_selector())
   1309 
   1310         # Issue 11703: geturl() omits fragment in the original URL.

   1311         url = 'http://docs.python.org/library/urllib2.html#OK'
   1312         req = Request(url)
   1313         self.assertEqual(req.get_full_url(), url)
   1314 
   1315 def test_main(verbose=None):
   1316     from test import test_urllib2
   1317     test_support.run_doctest(test_urllib2, verbose)
   1318     test_support.run_doctest(urllib2, verbose)
   1319     tests = (TrivialTests,
   1320              OpenerDirectorTests,
   1321              HandlerTests,
   1322              MiscTests,
   1323              RequestTests)
   1324     test_support.run_unittest(*tests)
   1325 
# When the file is executed as a script, run everything with verbose output.
if __name__ == "__main__":
    test_main(verbose=True)
   1328