1 import unittest 2 from test import test_support 3 4 import os 5 import socket 6 import StringIO 7 8 import urllib2 9 from urllib2 import Request, OpenerDirector 10 11 # XXX 12 # Request 13 # CacheFTPHandler (hard to write) 14 # parse_keqv_list, parse_http_list, HTTPDigestAuthHandler 15 16 class TrivialTests(unittest.TestCase): 17 def test_trivial(self): 18 # A couple trivial tests 19 20 self.assertRaises(ValueError, urllib2.urlopen, 'bogus url') 21 22 # XXX Name hacking to get this to work on Windows. 23 fname = os.path.abspath(urllib2.__file__).replace('\\', '/') 24 25 # And more hacking to get it to work on MacOS. This assumes 26 # urllib.pathname2url works, unfortunately... 27 if os.name == 'riscos': 28 import string 29 fname = os.expand(fname) 30 fname = fname.translate(string.maketrans("/.", "./")) 31 32 if os.name == 'nt': 33 file_url = "file:///%s" % fname 34 else: 35 file_url = "file://%s" % fname 36 37 f = urllib2.urlopen(file_url) 38 39 buf = f.read() 40 f.close() 41 42 def test_parse_http_list(self): 43 tests = [('a,b,c', ['a', 'b', 'c']), 44 ('path"o,l"og"i"cal, example', ['path"o,l"og"i"cal', 'example']), 45 ('a, b, "c", "d", "e,f", g, h', ['a', 'b', '"c"', '"d"', '"e,f"', 'g', 'h']), 46 ('a="b\\"c", d="e\\,f", g="h\\\\i"', ['a="b"c"', 'd="e,f"', 'g="h\\i"'])] 47 for string, list in tests: 48 self.assertEqual(urllib2.parse_http_list(string), list) 49 50 51 def test_request_headers_dict(): 52 """ 53 The Request.headers dictionary is not a documented interface. It should 54 stay that way, because the complete set of headers are only accessible 55 through the .get_header(), .has_header(), .header_items() interface. 56 However, .headers pre-dates those methods, and so real code will be using 57 the dictionary. 58 59 The introduction in 2.4 of those methods was a mistake for the same reason: 60 code that previously saw all (urllib2 user)-provided headers in .headers 61 now sees only a subset (and the function interface is ugly and incomplete). 62 A better change would have been to replace .headers dict with a dict 63 subclass (or UserDict.DictMixin instance?) that preserved the .headers 64 interface and also provided access to the "unredirected" headers. It's 65 probably too late to fix that, though. 66 67 68 Check .capitalize() case normalization: 69 70 >>> url = "http://example.com" 71 >>> Request(url, headers={"Spam-eggs": "blah"}).headers["Spam-eggs"] 72 'blah' 73 >>> Request(url, headers={"spam-EggS": "blah"}).headers["Spam-eggs"] 74 'blah' 75 76 Currently, Request(url, "Spam-eggs").headers["Spam-Eggs"] raises KeyError, 77 but that could be changed in future. 78 79 """ 80 81 def test_request_headers_methods(): 82 """ 83 Note the case normalization of header names here, to .capitalize()-case. 84 This should be preserved for backwards-compatibility. (In the HTTP case, 85 normalization to .title()-case is done by urllib2 before sending headers to 86 httplib). 87 88 >>> url = "http://example.com" 89 >>> r = Request(url, headers={"Spam-eggs": "blah"}) 90 >>> r.has_header("Spam-eggs") 91 True 92 >>> r.header_items() 93 [('Spam-eggs', 'blah')] 94 >>> r.add_header("Foo-Bar", "baz") 95 >>> items = r.header_items() 96 >>> items.sort() 97 >>> items 98 [('Foo-bar', 'baz'), ('Spam-eggs', 'blah')] 99 100 Note that e.g. r.has_header("spam-EggS") is currently False, and 101 r.get_header("spam-EggS") returns None, but that could be changed in 102 future. 103 104 >>> r.has_header("Not-there") 105 False 106 >>> print r.get_header("Not-there") 107 None 108 >>> r.get_header("Not-there", "default") 109 'default' 110 111 """ 112 113 114 def test_password_manager(self): 115 """ 116 >>> mgr = urllib2.HTTPPasswordMgr() 117 >>> add = mgr.add_password 118 >>> add("Some Realm", "http://example.com/", "joe", "password") 119 >>> add("Some Realm", "http://example.com/ni", "ni", "ni") 120 >>> add("c", "http://example.com/foo", "foo", "ni") 121 >>> add("c", "http://example.com/bar", "bar", "nini") 122 >>> add("b", "http://example.com/", "first", "blah") 123 >>> add("b", "http://example.com/", "second", "spam") 124 >>> add("a", "http://example.com", "1", "a") 125 >>> add("Some Realm", "http://c.example.com:3128", "3", "c") 126 >>> add("Some Realm", "d.example.com", "4", "d") 127 >>> add("Some Realm", "e.example.com:3128", "5", "e") 128 129 >>> mgr.find_user_password("Some Realm", "example.com") 130 ('joe', 'password') 131 >>> mgr.find_user_password("Some Realm", "http://example.com") 132 ('joe', 'password') 133 >>> mgr.find_user_password("Some Realm", "http://example.com/") 134 ('joe', 'password') 135 >>> mgr.find_user_password("Some Realm", "http://example.com/spam") 136 ('joe', 'password') 137 >>> mgr.find_user_password("Some Realm", "http://example.com/spam/spam") 138 ('joe', 'password') 139 >>> mgr.find_user_password("c", "http://example.com/foo") 140 ('foo', 'ni') 141 >>> mgr.find_user_password("c", "http://example.com/bar") 142 ('bar', 'nini') 143 144 Actually, this is really undefined ATM 145 ## Currently, we use the highest-level path where more than one match: 146 147 ## >>> mgr.find_user_password("Some Realm", "http://example.com/ni") 148 ## ('joe', 'password') 149 150 Use latest add_password() in case of conflict: 151 152 >>> mgr.find_user_password("b", "http://example.com/") 153 ('second', 'spam') 154 155 No special relationship between a.example.com and example.com: 156 157 >>> mgr.find_user_password("a", "http://example.com/") 158 ('1', 'a') 159 >>> mgr.find_user_password("a", "http://a.example.com/") 160 (None, None) 161 162 Ports: 163 164 >>> mgr.find_user_password("Some Realm", "c.example.com") 165 (None, None) 166 >>> mgr.find_user_password("Some Realm", "c.example.com:3128") 167 ('3', 'c') 168 >>> mgr.find_user_password("Some Realm", "http://c.example.com:3128") 169 ('3', 'c') 170 >>> mgr.find_user_password("Some Realm", "d.example.com") 171 ('4', 'd') 172 >>> mgr.find_user_password("Some Realm", "e.example.com:3128") 173 ('5', 'e') 174 175 """ 176 pass 177 178 179 def test_password_manager_default_port(self): 180 """ 181 >>> mgr = urllib2.HTTPPasswordMgr() 182 >>> add = mgr.add_password 183 184 The point to note here is that we can't guess the default port if there's 185 no scheme. This applies to both add_password and find_user_password. 186 187 >>> add("f", "http://g.example.com:80", "10", "j") 188 >>> add("g", "http://h.example.com", "11", "k") 189 >>> add("h", "i.example.com:80", "12", "l") 190 >>> add("i", "j.example.com", "13", "m") 191 >>> mgr.find_user_password("f", "g.example.com:100") 192 (None, None) 193 >>> mgr.find_user_password("f", "g.example.com:80") 194 ('10', 'j') 195 >>> mgr.find_user_password("f", "g.example.com") 196 (None, None) 197 >>> mgr.find_user_password("f", "http://g.example.com:100") 198 (None, None) 199 >>> mgr.find_user_password("f", "http://g.example.com:80") 200 ('10', 'j') 201 >>> mgr.find_user_password("f", "http://g.example.com") 202 ('10', 'j') 203 >>> mgr.find_user_password("g", "h.example.com") 204 ('11', 'k') 205 >>> mgr.find_user_password("g", "h.example.com:80") 206 ('11', 'k') 207 >>> mgr.find_user_password("g", "http://h.example.com:80") 208 ('11', 'k') 209 >>> mgr.find_user_password("h", "i.example.com") 210 (None, None) 211 >>> mgr.find_user_password("h", "i.example.com:80") 212 ('12', 'l') 213 >>> mgr.find_user_password("h", "http://i.example.com:80") 214 ('12', 'l') 215 >>> mgr.find_user_password("i", "j.example.com") 216 ('13', 'm') 217 >>> mgr.find_user_password("i", "j.example.com:80") 218 (None, None) 219 >>> mgr.find_user_password("i", "http://j.example.com") 220 ('13', 'm') 221 >>> mgr.find_user_password("i", "http://j.example.com:80") 222 (None, None) 223 224 """ 225 226 class MockOpener: 227 addheaders = [] 228 def open(self, req, data=None,timeout=socket._GLOBAL_DEFAULT_TIMEOUT): 229 self.req, self.data, self.timeout = req, data, timeout 230 def error(self, proto, *args): 231 self.proto, self.args = proto, args 232 233 class MockFile: 234 def read(self, count=None): pass 235 def readline(self, count=None): pass 236 def close(self): pass 237 238 class MockHeaders(dict): 239 def getheaders(self, name): 240 return self.values() 241 242 class MockResponse(StringIO.StringIO): 243 def __init__(self, code, msg, headers, data, url=None): 244 StringIO.StringIO.__init__(self, data) 245 self.code, self.msg, self.headers, self.url = code, msg, headers, url 246 def info(self): 247 return self.headers 248 def geturl(self): 249 return self.url 250 251 class MockCookieJar: 252 def add_cookie_header(self, request): 253 self.ach_req = request 254 def extract_cookies(self, response, request): 255 self.ec_req, self.ec_r = request, response 256 257 class FakeMethod: 258 def __init__(self, meth_name, action, handle): 259 self.meth_name = meth_name 260 self.handle = handle 261 self.action = action 262 def __call__(self, *args): 263 return self.handle(self.meth_name, self.action, *args) 264 265 class MockHTTPResponse: 266 def __init__(self, fp, msg, status, reason): 267 self.fp = fp 268 self.msg = msg 269 self.status = status 270 self.reason = reason 271 def read(self): 272 return '' 273 274 class MockHTTPClass: 275 def __init__(self): 276 self.req_headers = [] 277 self.data = None 278 self.raise_on_endheaders = False 279 self._tunnel_headers = {} 280 281 def __call__(self, host, timeout=socket._GLOBAL_DEFAULT_TIMEOUT): 282 self.host = host 283 self.timeout = timeout 284 return self 285 286 def set_debuglevel(self, level): 287 self.level = level 288 289 def set_tunnel(self, host, port=None, headers=None): 290 self._tunnel_host = host 291 self._tunnel_port = port 292 if headers: 293 self._tunnel_headers = headers 294 else: 295 self._tunnel_headers.clear() 296 297 def request(self, method, url, body=None, headers=None): 298 self.method = method 299 self.selector = url 300 if headers is not None: 301 self.req_headers += headers.items() 302 self.req_headers.sort() 303 if body: 304 self.data = body 305 if self.raise_on_endheaders: 306 import socket 307 raise socket.error() 308 309 def getresponse(self): 310 return MockHTTPResponse(MockFile(), {}, 200, "OK") 311 312 def close(self): 313 pass 314 315 class MockHandler: 316 # useful for testing handler machinery 317 # see add_ordered_mock_handlers() docstring 318 handler_order = 500 319 def __init__(self, methods): 320 self._define_methods(methods) 321 def _define_methods(self, methods): 322 for spec in methods: 323 if len(spec) == 2: name, action = spec 324 else: name, action = spec, None 325 meth = FakeMethod(name, action, self.handle) 326 setattr(self.__class__, name, meth) 327 def handle(self, fn_name, action, *args, **kwds): 328 self.parent.calls.append((self, fn_name, args, kwds)) 329 if action is None: 330 return None 331 elif action == "return self": 332 return self 333 elif action == "return response": 334 res = MockResponse(200, "OK", {}, "") 335 return res 336 elif action == "return request": 337 return Request("http://blah/") 338 elif action.startswith("error"): 339 code = action[action.rfind(" ")+1:] 340 try: 341 code = int(code) 342 except ValueError: 343 pass 344 res = MockResponse(200, "OK", {}, "") 345 return self.parent.error("http", args[0], res, code, "", {}) 346 elif action == "raise": 347 raise urllib2.URLError("blah") 348 assert False 349 def close(self): pass 350 def add_parent(self, parent): 351 self.parent = parent 352 self.parent.calls = [] 353 def __lt__(self, other): 354 if not hasattr(other, "handler_order"): 355 # No handler_order, leave in original order. Yuck. 356 return True 357 return self.handler_order < other.handler_order 358 359 def add_ordered_mock_handlers(opener, meth_spec): 360 """Create MockHandlers and add them to an OpenerDirector. 361 362 meth_spec: list of lists of tuples and strings defining methods to define 363 on handlers. eg: 364 365 [["http_error", "ftp_open"], ["http_open"]] 366 367 defines methods .http_error() and .ftp_open() on one handler, and 368 .http_open() on another. These methods just record their arguments and 369 return None. Using a tuple instead of a string causes the method to 370 perform some action (see MockHandler.handle()), eg: 371 372 [["http_error"], [("http_open", "return request")]] 373 374 defines .http_error() on one handler (which simply returns None), and 375 .http_open() on another handler, which returns a Request object. 376 377 """ 378 handlers = [] 379 count = 0 380 for meths in meth_spec: 381 class MockHandlerSubclass(MockHandler): pass 382 h = MockHandlerSubclass(meths) 383 h.handler_order += count 384 h.add_parent(opener) 385 count = count + 1 386 handlers.append(h) 387 opener.add_handler(h) 388 return handlers 389 390 def build_test_opener(*handler_instances): 391 opener = OpenerDirector() 392 for h in handler_instances: 393 opener.add_handler(h) 394 return opener 395 396 class MockHTTPHandler(urllib2.BaseHandler): 397 # useful for testing redirections and auth 398 # sends supplied headers and code as first response 399 # sends 200 OK as second response 400 def __init__(self, code, headers): 401 self.code = code 402 self.headers = headers 403 self.reset() 404 def reset(self): 405 self._count = 0 406 self.requests = [] 407 def http_open(self, req): 408 import mimetools, httplib, copy 409 from StringIO import StringIO 410 self.requests.append(copy.deepcopy(req)) 411 if self._count == 0: 412 self._count = self._count + 1 413 name = httplib.responses[self.code] 414 msg = mimetools.Message(StringIO(self.headers)) 415 return self.parent.error( 416 "http", req, MockFile(), self.code, name, msg) 417 else: 418 self.req = req 419 msg = mimetools.Message(StringIO("\r\n\r\n")) 420 return MockResponse(200, "OK", msg, "", req.get_full_url()) 421 422 class MockHTTPSHandler(urllib2.AbstractHTTPHandler): 423 # Useful for testing the Proxy-Authorization request by verifying the 424 # properties of httpcon 425 426 def __init__(self): 427 urllib2.AbstractHTTPHandler.__init__(self) 428 self.httpconn = MockHTTPClass() 429 430 def https_open(self, req): 431 return self.do_open(self.httpconn, req) 432 433 class MockPasswordManager: 434 def add_password(self, realm, uri, user, password): 435 self.realm = realm 436 self.url = uri 437 self.user = user 438 self.password = password 439 def find_user_password(self, realm, authuri): 440 self.target_realm = realm 441 self.target_url = authuri 442 return self.user, self.password 443 444 445 class OpenerDirectorTests(unittest.TestCase): 446 447 def test_add_non_handler(self): 448 class NonHandler(object): 449 pass 450 self.assertRaises(TypeError, 451 OpenerDirector().add_handler, NonHandler()) 452 453 def test_badly_named_methods(self): 454 # test work-around for three methods that accidentally follow the 455 # naming conventions for handler methods 456 # (*_open() / *_request() / *_response()) 457 458 # These used to call the accidentally-named methods, causing a 459 # TypeError in real code; here, returning self from these mock 460 # methods would either cause no exception, or AttributeError. 461 462 from urllib2 import URLError 463 464 o = OpenerDirector() 465 meth_spec = [ 466 [("do_open", "return self"), ("proxy_open", "return self")], 467 [("redirect_request", "return self")], 468 ] 469 handlers = add_ordered_mock_handlers(o, meth_spec) 470 o.add_handler(urllib2.UnknownHandler()) 471 for scheme in "do", "proxy", "redirect": 472 self.assertRaises(URLError, o.open, scheme+"://example.com/") 473 474 def test_handled(self): 475 # handler returning non-None means no more handlers will be called 476 o = OpenerDirector() 477 meth_spec = [ 478 ["http_open", "ftp_open", "http_error_302"], 479 ["ftp_open"], 480 [("http_open", "return self")], 481 [("http_open", "return self")], 482 ] 483 handlers = add_ordered_mock_handlers(o, meth_spec) 484 485 req = Request("http://example.com/") 486 r = o.open(req) 487 # Second .http_open() gets called, third doesn't, since second returned 488 # non-None. Handlers without .http_open() never get any methods called 489 # on them. 490 # In fact, second mock handler defining .http_open() returns self 491 # (instead of response), which becomes the OpenerDirector's return 492 # value. 493 self.assertEqual(r, handlers[2]) 494 calls = [(handlers[0], "http_open"), (handlers[2], "http_open")] 495 for expected, got in zip(calls, o.calls): 496 handler, name, args, kwds = got 497 self.assertEqual((handler, name), expected) 498 self.assertEqual(args, (req,)) 499 500 def test_handler_order(self): 501 o = OpenerDirector() 502 handlers = [] 503 for meths, handler_order in [ 504 ([("http_open", "return self")], 500), 505 (["http_open"], 0), 506 ]: 507 class MockHandlerSubclass(MockHandler): pass 508 h = MockHandlerSubclass(meths) 509 h.handler_order = handler_order 510 handlers.append(h) 511 o.add_handler(h) 512 513 r = o.open("http://example.com/") 514 # handlers called in reverse order, thanks to their sort order 515 self.assertEqual(o.calls[0][0], handlers[1]) 516 self.assertEqual(o.calls[1][0], handlers[0]) 517 518 def test_raise(self): 519 # raising URLError stops processing of request 520 o = OpenerDirector() 521 meth_spec = [ 522 [("http_open", "raise")], 523 [("http_open", "return self")], 524 ] 525 handlers = add_ordered_mock_handlers(o, meth_spec) 526 527 req = Request("http://example.com/") 528 self.assertRaises(urllib2.URLError, o.open, req) 529 self.assertEqual(o.calls, [(handlers[0], "http_open", (req,), {})]) 530 531 ## def test_error(self): 532 ## # XXX this doesn't actually seem to be used in standard library, 533 ## # but should really be tested anyway... 534 535 def test_http_error(self): 536 # XXX http_error_default 537 # http errors are a special case 538 o = OpenerDirector() 539 meth_spec = [ 540 [("http_open", "error 302")], 541 [("http_error_400", "raise"), "http_open"], 542 [("http_error_302", "return response"), "http_error_303", 543 "http_error"], 544 [("http_error_302")], 545 ] 546 handlers = add_ordered_mock_handlers(o, meth_spec) 547 548 class Unknown: 549 def __eq__(self, other): return True 550 551 req = Request("http://example.com/") 552 r = o.open(req) 553 assert len(o.calls) == 2 554 calls = [(handlers[0], "http_open", (req,)), 555 (handlers[2], "http_error_302", 556 (req, Unknown(), 302, "", {}))] 557 for expected, got in zip(calls, o.calls): 558 handler, method_name, args = expected 559 self.assertEqual((handler, method_name), got[:2]) 560 self.assertEqual(args, got[2]) 561 562 def test_processors(self): 563 # *_request / *_response methods get called appropriately 564 o = OpenerDirector() 565 meth_spec = [ 566 [("http_request", "return request"), 567 ("http_response", "return response")], 568 [("http_request", "return request"), 569 ("http_response", "return response")], 570 ] 571 handlers = add_ordered_mock_handlers(o, meth_spec) 572 573 req = Request("http://example.com/") 574 r = o.open(req) 575 # processor methods are called on *all* handlers that define them, 576 # not just the first handler that handles the request 577 calls = [ 578 (handlers[0], "http_request"), (handlers[1], "http_request"), 579 (handlers[0], "http_response"), (handlers[1], "http_response")] 580 581 for i, (handler, name, args, kwds) in enumerate(o.calls): 582 if i < 2: 583 # *_request 584 self.assertEqual((handler, name), calls[i]) 585 self.assertEqual(len(args), 1) 586 self.assertIsInstance(args[0], Request) 587 else: 588 # *_response 589 self.assertEqual((handler, name), calls[i]) 590 self.assertEqual(len(args), 2) 591 self.assertIsInstance(args[0], Request) 592 # response from opener.open is None, because there's no 593 # handler that defines http_open to handle it 594 self.assertTrue(args[1] is None or 595 isinstance(args[1], MockResponse)) 596 597 598 def sanepathname2url(path): 599 import urllib 600 urlpath = urllib.pathname2url(path) 601 if os.name == "nt" and urlpath.startswith("///"): 602 urlpath = urlpath[2:] 603 # XXX don't ask me about the mac... 604 return urlpath 605 606 class HandlerTests(unittest.TestCase): 607 608 def test_ftp(self): 609 class MockFTPWrapper: 610 def __init__(self, data): self.data = data 611 def retrfile(self, filename, filetype): 612 self.filename, self.filetype = filename, filetype 613 return StringIO.StringIO(self.data), len(self.data) 614 def close(self): pass 615 616 class NullFTPHandler(urllib2.FTPHandler): 617 def __init__(self, data): self.data = data 618 def connect_ftp(self, user, passwd, host, port, dirs, 619 timeout=socket._GLOBAL_DEFAULT_TIMEOUT): 620 self.user, self.passwd = user, passwd 621 self.host, self.port = host, port 622 self.dirs = dirs 623 self.ftpwrapper = MockFTPWrapper(self.data) 624 return self.ftpwrapper 625 626 import ftplib 627 data = "rheum rhaponicum" 628 h = NullFTPHandler(data) 629 o = h.parent = MockOpener() 630 631 for url, host, port, user, passwd, type_, dirs, filename, mimetype in [ 632 ("ftp://localhost/foo/bar/baz.html", 633 "localhost", ftplib.FTP_PORT, "", "", "I", 634 ["foo", "bar"], "baz.html", "text/html"), 635 ("ftp://parrot@localhost/foo/bar/baz.html", 636 "localhost", ftplib.FTP_PORT, "parrot", "", "I", 637 ["foo", "bar"], "baz.html", "text/html"), 638 ("ftp://%25parrot@localhost/foo/bar/baz.html", 639 "localhost", ftplib.FTP_PORT, "%parrot", "", "I", 640 ["foo", "bar"], "baz.html", "text/html"), 641 ("ftp://%2542parrot@localhost/foo/bar/baz.html", 642 "localhost", ftplib.FTP_PORT, "%42parrot", "", "I", 643 ["foo", "bar"], "baz.html", "text/html"), 644 ("ftp://localhost:80/foo/bar/", 645 "localhost", 80, "", "", "D", 646 ["foo", "bar"], "", None), 647 ("ftp://localhost/baz.gif;type=a", 648 "localhost", ftplib.FTP_PORT, "", "", "A", 649 [], "baz.gif", None), # XXX really this should guess image/gif 650 ]: 651 req = Request(url) 652 req.timeout = None 653 r = h.ftp_open(req) 654 # ftp authentication not yet implemented by FTPHandler 655 self.assertEqual(h.user, user) 656 self.assertEqual(h.passwd, passwd) 657 self.assertEqual(h.host, socket.gethostbyname(host)) 658 self.assertEqual(h.port, port) 659 self.assertEqual(h.dirs, dirs) 660 self.assertEqual(h.ftpwrapper.filename, filename) 661 self.assertEqual(h.ftpwrapper.filetype, type_) 662 headers = r.info() 663 self.assertEqual(headers.get("Content-type"), mimetype) 664 self.assertEqual(int(headers["Content-length"]), len(data)) 665 666 def test_file(self): 667 import rfc822, socket 668 h = urllib2.FileHandler() 669 o = h.parent = MockOpener() 670 671 TESTFN = test_support.TESTFN 672 urlpath = sanepathname2url(os.path.abspath(TESTFN)) 673 towrite = "hello, world\n" 674 urls = [ 675 "file://localhost%s" % urlpath, 676 "file://%s" % urlpath, 677 "file://%s%s" % (socket.gethostbyname('localhost'), urlpath), 678 ] 679 try: 680 localaddr = socket.gethostbyname(socket.gethostname()) 681 except socket.gaierror: 682 localaddr = '' 683 if localaddr: 684 urls.append("file://%s%s" % (localaddr, urlpath)) 685 686 for url in urls: 687 f = open(TESTFN, "wb") 688 try: 689 try: 690 f.write(towrite) 691 finally: 692 f.close() 693 694 r = h.file_open(Request(url)) 695 try: 696 data = r.read() 697 headers = r.info() 698 respurl = r.geturl() 699 finally: 700 r.close() 701 stats = os.stat(TESTFN) 702 modified = rfc822.formatdate(stats.st_mtime) 703 finally: 704 os.remove(TESTFN) 705 self.assertEqual(data, towrite) 706 self.assertEqual(headers["Content-type"], "text/plain") 707 self.assertEqual(headers["Content-length"], "13") 708 self.assertEqual(headers["Last-modified"], modified) 709 self.assertEqual(respurl, url) 710 711 for url in [ 712 "file://localhost:80%s" % urlpath, 713 "file:///file_does_not_exist.txt", 714 "file://%s:80%s/%s" % (socket.gethostbyname('localhost'), 715 os.getcwd(), TESTFN), 716 "file://somerandomhost.ontheinternet.com%s/%s" % 717 (os.getcwd(), TESTFN), 718 ]: 719 try: 720 f = open(TESTFN, "wb") 721 try: 722 f.write(towrite) 723 finally: 724 f.close() 725 726 self.assertRaises(urllib2.URLError, 727 h.file_open, Request(url)) 728 finally: 729 os.remove(TESTFN) 730 731 h = urllib2.FileHandler() 732 o = h.parent = MockOpener() 733 # XXXX why does // mean ftp (and /// mean not ftp!), and where 734 # is file: scheme specified? I think this is really a bug, and 735 # what was intended was to distinguish between URLs like: 736 # file:/blah.txt (a file) 737 # file://localhost/blah.txt (a file) 738 # file:///blah.txt (a file) 739 # file://ftp.example.com/blah.txt (an ftp URL) 740 for url, ftp in [ 741 ("file://ftp.example.com//foo.txt", True), 742 ("file://ftp.example.com///foo.txt", False), 743 # XXXX bug: fails with OSError, should be URLError 744 ("file://ftp.example.com/foo.txt", False), 745 ("file://somehost//foo/something.txt", True), 746 ("file://localhost//foo/something.txt", False), 747 ]: 748 req = Request(url) 749 try: 750 h.file_open(req) 751 # XXXX remove OSError when bug fixed 752 except (urllib2.URLError, OSError): 753 self.assertTrue(not ftp) 754 else: 755 self.assertTrue(o.req is req) 756 self.assertEqual(req.type, "ftp") 757 self.assertEqual(req.type == "ftp", ftp) 758 759 def test_http(self): 760 761 h = urllib2.AbstractHTTPHandler() 762 o = h.parent = MockOpener() 763 764 url = "http://example.com/" 765 for method, data in [("GET", None), ("POST", "blah")]: 766 req = Request(url, data, {"Foo": "bar"}) 767 req.timeout = None 768 req.add_unredirected_header("Spam", "eggs") 769 http = MockHTTPClass() 770 r = h.do_open(http, req) 771 772 # result attributes 773 r.read; r.readline # wrapped MockFile methods 774 r.info; r.geturl # addinfourl methods 775 r.code, r.msg == 200, "OK" # added from MockHTTPClass.getreply() 776 hdrs = r.info() 777 hdrs.get; hdrs.has_key # r.info() gives dict from .getreply() 778 self.assertEqual(r.geturl(), url) 779 780 self.assertEqual(http.host, "example.com") 781 self.assertEqual(http.level, 0) 782 self.assertEqual(http.method, method) 783 self.assertEqual(http.selector, "/") 784 self.assertEqual(http.req_headers, 785 [("Connection", "close"), 786 ("Foo", "bar"), ("Spam", "eggs")]) 787 self.assertEqual(http.data, data) 788 789 # check socket.error converted to URLError 790 http.raise_on_endheaders = True 791 self.assertRaises(urllib2.URLError, h.do_open, http, req) 792 793 # check adding of standard headers 794 o.addheaders = [("Spam", "eggs")] 795 for data in "", None: # POST, GET 796 req = Request("http://example.com/", data) 797 r = MockResponse(200, "OK", {}, "") 798 newreq = h.do_request_(req) 799 if data is None: # GET 800 self.assertNotIn("Content-length", req.unredirected_hdrs) 801 self.assertNotIn("Content-type", req.unredirected_hdrs) 802 else: # POST 803 self.assertEqual(req.unredirected_hdrs["Content-length"], "0") 804 self.assertEqual(req.unredirected_hdrs["Content-type"], 805 "application/x-www-form-urlencoded") 806 # XXX the details of Host could be better tested 807 self.assertEqual(req.unredirected_hdrs["Host"], "example.com") 808 self.assertEqual(req.unredirected_hdrs["Spam"], "eggs") 809 810 # don't clobber existing headers 811 req.add_unredirected_header("Content-length", "foo") 812 req.add_unredirected_header("Content-type", "bar") 813 req.add_unredirected_header("Host", "baz") 814 req.add_unredirected_header("Spam", "foo") 815 newreq = h.do_request_(req) 816 self.assertEqual(req.unredirected_hdrs["Content-length"], "foo") 817 self.assertEqual(req.unredirected_hdrs["Content-type"], "bar") 818 self.assertEqual(req.unredirected_hdrs["Host"], "baz") 819 self.assertEqual(req.unredirected_hdrs["Spam"], "foo") 820 821 def test_http_doubleslash(self): 822 # Checks that the presence of an unnecessary double slash in a url doesn't break anything 823 # Previously, a double slash directly after the host could cause incorrect parsing of the url 824 h = urllib2.AbstractHTTPHandler() 825 o = h.parent = MockOpener() 826 827 data = "" 828 ds_urls = [ 829 "http://example.com/foo/bar/baz.html", 830 "http://example.com//foo/bar/baz.html", 831 "http://example.com/foo//bar/baz.html", 832 "http://example.com/foo/bar//baz.html", 833 ] 834 835 for ds_url in ds_urls: 836 ds_req = Request(ds_url, data) 837 838 # Check whether host is determined correctly if there is no proxy 839 np_ds_req = h.do_request_(ds_req) 840 self.assertEqual(np_ds_req.unredirected_hdrs["Host"],"example.com") 841 842 # Check whether host is determined correctly if there is a proxy 843 ds_req.set_proxy("someproxy:3128",None) 844 p_ds_req = h.do_request_(ds_req) 845 self.assertEqual(p_ds_req.unredirected_hdrs["Host"],"example.com") 846 847 def test_fixpath_in_weirdurls(self): 848 # Issue4493: urllib2 to supply '/' when to urls where path does not 849 # start with'/' 850 851 h = urllib2.AbstractHTTPHandler() 852 o = h.parent = MockOpener() 853 854 weird_url = 'http://www.python.org?getspam' 855 req = Request(weird_url) 856 newreq = h.do_request_(req) 857 self.assertEqual(newreq.get_host(),'www.python.org') 858 self.assertEqual(newreq.get_selector(),'/?getspam') 859 860 url_without_path = 'http://www.python.org' 861 req = Request(url_without_path) 862 newreq = h.do_request_(req) 863 self.assertEqual(newreq.get_host(),'www.python.org') 864 self.assertEqual(newreq.get_selector(),'') 865 866 def test_errors(self): 867 h = urllib2.HTTPErrorProcessor() 868 o = h.parent = MockOpener() 869 870 url = "http://example.com/" 871 req = Request(url) 872 # all 2xx are passed through 873 r = MockResponse(200, "OK", {}, "", url) 874 newr = h.http_response(req, r) 875 self.assertTrue(r is newr) 876 self.assertTrue(not hasattr(o, "proto")) # o.error not called 877 r = MockResponse(202, "Accepted", {}, "", url) 878 newr = h.http_response(req, r) 879 self.assertTrue(r is newr) 880 self.assertTrue(not hasattr(o, "proto")) # o.error not called 881 r = MockResponse(206, "Partial content", {}, "", url) 882 newr = h.http_response(req, r) 883 self.assertTrue(r is newr) 884 self.assertTrue(not hasattr(o, "proto")) # o.error not called 885 # anything else calls o.error (and MockOpener returns None, here) 886 r = MockResponse(502, "Bad gateway", {}, "", url) 887 self.assertTrue(h.http_response(req, r) is None) 888 self.assertEqual(o.proto, "http") # o.error called 889 self.assertEqual(o.args, (req, r, 502, "Bad gateway", {})) 890 891 def test_cookies(self): 892 cj = MockCookieJar() 893 h = urllib2.HTTPCookieProcessor(cj) 894 o = h.parent = MockOpener() 895 896 req = Request("http://example.com/") 897 r = MockResponse(200, "OK", {}, "") 898 newreq = h.http_request(req) 899 self.assertTrue(cj.ach_req is req is newreq) 900 self.assertEqual(req.get_origin_req_host(), "example.com") 901 self.assertTrue(not req.is_unverifiable()) 902 newr = h.http_response(req, r) 903 self.assertTrue(cj.ec_req is req) 904 self.assertTrue(cj.ec_r is r is newr) 905 906 def test_redirect(self): 907 from_url = "http://example.com/a.html" 908 to_url = "http://example.com/b.html" 909 h = urllib2.HTTPRedirectHandler() 910 o = h.parent = MockOpener() 911 912 # ordinary redirect behaviour 913 for code in 301, 302, 303, 307: 914 for data in None, "blah\nblah\n": 915 method = getattr(h, "http_error_%s" % code) 916 req = Request(from_url, data) 917 req.add_header("Nonsense", "viking=withhold") 918 req.timeout = socket._GLOBAL_DEFAULT_TIMEOUT 919 if data is not None: 920 req.add_header("Content-Length", str(len(data))) 921 req.add_unredirected_header("Spam", "spam") 922 try: 923 method(req, MockFile(), code, "Blah", 924 MockHeaders({"location": to_url})) 925 except urllib2.HTTPError: 926 # 307 in response to POST requires user OK 927 self.assertTrue(code == 307 and data is not None) 928 self.assertEqual(o.req.get_full_url(), to_url) 929 try: 930 self.assertEqual(o.req.get_method(), "GET") 931 except AttributeError: 932 self.assertTrue(not o.req.has_data()) 933 934 # now it's a GET, there should not be headers regarding content 935 # (possibly dragged from before being a POST) 936 headers = [x.lower() for x in o.req.headers] 937 self.assertNotIn("content-length", headers) 938 self.assertNotIn("content-type", headers) 939 940 self.assertEqual(o.req.headers["Nonsense"], 941 "viking=withhold") 942 self.assertNotIn("Spam", o.req.headers) 943 self.assertNotIn("Spam", o.req.unredirected_hdrs) 944 945 # loop detection 946 req = Request(from_url) 947 req.timeout = socket._GLOBAL_DEFAULT_TIMEOUT 948 def redirect(h, req, url=to_url): 949 h.http_error_302(req, MockFile(), 302, "Blah", 950 MockHeaders({"location": url})) 951 # Note that the *original* request shares the same record of 952 # redirections with the sub-requests caused by the redirections. 953 954 # detect infinite loop redirect of a URL to itself 955 req = Request(from_url, origin_req_host="example.com") 956 count = 0 957 req.timeout = socket._GLOBAL_DEFAULT_TIMEOUT 958 try: 959 while 1: 960 redirect(h, req, "http://example.com/") 961 count = count + 1 962 except urllib2.HTTPError: 963 # don't stop until max_repeats, because cookies may introduce state 964 self.assertEqual(count, urllib2.HTTPRedirectHandler.max_repeats) 965 966 # detect endless non-repeating chain of redirects 967 req = Request(from_url, origin_req_host="example.com") 968 count = 0 969 req.timeout = socket._GLOBAL_DEFAULT_TIMEOUT 970 try: 971 while 1: 972 redirect(h, req, "http://example.com/%d" % count) 973 count = count + 1 974 except urllib2.HTTPError: 975 self.assertEqual(count, 976 urllib2.HTTPRedirectHandler.max_redirections) 977 978 def test_invalid_redirect(self): 979 from_url = "http://example.com/a.html" 980 valid_schemes = ['http', 'https', 'ftp'] 981 invalid_schemes = ['file', 'imap', 'ldap'] 982 schemeless_url = "example.com/b.html" 983 h = urllib2.HTTPRedirectHandler() 984 o = h.parent = MockOpener() 985 req = Request(from_url) 986 req.timeout = socket._GLOBAL_DEFAULT_TIMEOUT 987 988 for scheme in invalid_schemes: 989 invalid_url = scheme + '://' + schemeless_url 990 self.assertRaises(urllib2.HTTPError, h.http_error_302, 991 req, MockFile(), 302, "Security Loophole", 992 MockHeaders({"location": invalid_url})) 993 994 for scheme in valid_schemes: 995 valid_url = scheme + '://' + schemeless_url 996 h.http_error_302(req, MockFile(), 302, "That's fine", 997 MockHeaders({"location": valid_url})) 998 self.assertEqual(o.req.get_full_url(), valid_url) 999 1000 def test_cookie_redirect(self): 1001 # cookies shouldn't leak into redirected requests 1002 from cookielib import CookieJar 1003 1004 from test.test_cookielib import interact_netscape 1005 1006 cj = CookieJar() 1007 interact_netscape(cj, "http://www.example.com/", "spam=eggs") 1008 hh = MockHTTPHandler(302, "Location: http://www.cracker.com/\r\n\r\n") 1009 hdeh = urllib2.HTTPDefaultErrorHandler() 1010 hrh = urllib2.HTTPRedirectHandler() 1011 cp = urllib2.HTTPCookieProcessor(cj) 1012 o = build_test_opener(hh, hdeh, hrh, cp) 1013 o.open("http://www.example.com/") 1014 self.assertTrue(not hh.req.has_header("Cookie")) 1015 1016 def test_redirect_fragment(self): 1017 redirected_url = 'http://www.example.com/index.html#OK\r\n\r\n' 1018 hh = MockHTTPHandler(302, 'Location: ' + redirected_url) 1019 hdeh = urllib2.HTTPDefaultErrorHandler() 1020 hrh = urllib2.HTTPRedirectHandler() 1021 o = build_test_opener(hh, hdeh, hrh) 1022 fp = o.open('http://www.example.com') 1023 self.assertEqual(fp.geturl(), redirected_url.strip()) 1024 1025 def test_proxy(self): 1026 o = OpenerDirector() 1027 ph = urllib2.ProxyHandler(dict(http="proxy.example.com:3128")) 1028 o.add_handler(ph) 1029 meth_spec = [ 1030 [("http_open", "return response")] 1031 ] 1032 handlers = add_ordered_mock_handlers(o, meth_spec) 1033 1034 req = Request("http://acme.example.com/") 1035 self.assertEqual(req.get_host(), "acme.example.com") 1036 r = o.open(req) 1037 self.assertEqual(req.get_host(), "proxy.example.com:3128") 1038 1039 self.assertEqual([(handlers[0], "http_open")], 1040 [tup[0:2] for tup in o.calls]) 1041 1042 def test_proxy_no_proxy(self): 1043 os.environ['no_proxy'] = 'python.org' 1044 o = OpenerDirector() 1045 ph = urllib2.ProxyHandler(dict(http="proxy.example.com")) 1046 o.add_handler(ph) 1047 req = Request("http://www.perl.org/") 1048 self.assertEqual(req.get_host(), "www.perl.org") 1049 r = o.open(req) 1050 self.assertEqual(req.get_host(), "proxy.example.com") 1051 req = Request("http://www.python.org") 1052 self.assertEqual(req.get_host(), "www.python.org") 1053 r = o.open(req) 1054 self.assertEqual(req.get_host(), "www.python.org") 1055 del os.environ['no_proxy'] 1056 1057 1058 def test_proxy_https(self): 1059 o = OpenerDirector() 1060 ph = urllib2.ProxyHandler(dict(https='proxy.example.com:3128')) 1061 o.add_handler(ph) 1062 meth_spec = [ 1063 [("https_open","return response")] 1064 ] 1065 handlers = add_ordered_mock_handlers(o, meth_spec) 1066 req = Request("https://www.example.com/") 1067 self.assertEqual(req.get_host(), "www.example.com") 1068 r = o.open(req) 1069 self.assertEqual(req.get_host(), "proxy.example.com:3128") 1070 self.assertEqual([(handlers[0], "https_open")], 1071 [tup[0:2] for tup in o.calls]) 1072 1073 def test_proxy_https_proxy_authorization(self): 1074 o = OpenerDirector() 1075 ph = urllib2.ProxyHandler(dict(https='proxy.example.com:3128')) 1076 o.add_handler(ph) 1077 https_handler = MockHTTPSHandler() 1078 o.add_handler(https_handler) 1079 req = Request("https://www.example.com/") 1080 req.add_header("Proxy-Authorization","FooBar") 1081 req.add_header("User-Agent","Grail") 1082 self.assertEqual(req.get_host(), "www.example.com") 1083 self.assertIsNone(req._tunnel_host) 1084 r = o.open(req) 1085 # Verify Proxy-Authorization gets tunneled to request. 1086 # httpsconn req_headers do not have the Proxy-Authorization header but 1087 # the req will have. 1088 self.assertNotIn(("Proxy-Authorization","FooBar"), 1089 https_handler.httpconn.req_headers) 1090 self.assertIn(("User-Agent","Grail"), 1091 https_handler.httpconn.req_headers) 1092 self.assertIsNotNone(req._tunnel_host) 1093 self.assertEqual(req.get_host(), "proxy.example.com:3128") 1094 self.assertEqual(req.get_header("Proxy-authorization"),"FooBar") 1095 1096 def test_basic_auth(self, quote_char='"'): 1097 opener = OpenerDirector() 1098 password_manager = MockPasswordManager() 1099 auth_handler = urllib2.HTTPBasicAuthHandler(password_manager) 1100 realm = "ACME Widget Store" 1101 http_handler = MockHTTPHandler( 1102 401, 'WWW-Authenticate: Basic realm=%s%s%s\r\n\r\n' % 1103 (quote_char, realm, quote_char) ) 1104 opener.add_handler(auth_handler) 1105 opener.add_handler(http_handler) 1106 self._test_basic_auth(opener, auth_handler, "Authorization", 1107 realm, http_handler, password_manager, 1108 "http://acme.example.com/protected", 1109 "http://acme.example.com/protected" 1110 ) 1111 1112 def test_basic_auth_with_single_quoted_realm(self): 1113 self.test_basic_auth(quote_char="'") 1114 1115 def test_basic_auth_with_unquoted_realm(self): 1116 opener = OpenerDirector() 1117 password_manager = MockPasswordManager() 1118 auth_handler = urllib2.HTTPBasicAuthHandler(password_manager) 1119 realm = "ACME Widget Store" 1120 http_handler = MockHTTPHandler( 1121 401, 'WWW-Authenticate: Basic realm=%s\r\n\r\n' % realm) 1122 opener.add_handler(auth_handler) 1123 opener.add_handler(http_handler) 1124 msg = "Basic Auth Realm was unquoted" 1125 with test_support.check_warnings((msg, UserWarning)): 1126 self._test_basic_auth(opener, auth_handler, "Authorization", 1127 realm, http_handler, password_manager, 1128 "http://acme.example.com/protected", 1129 "http://acme.example.com/protected" 1130 ) 1131 1132 1133 def test_proxy_basic_auth(self): 1134 opener = OpenerDirector() 1135 ph = urllib2.ProxyHandler(dict(http="proxy.example.com:3128")) 1136 opener.add_handler(ph) 1137 password_manager = MockPasswordManager() 1138 auth_handler = urllib2.ProxyBasicAuthHandler(password_manager) 1139 realm = "ACME Networks" 1140 http_handler = MockHTTPHandler( 1141 407, 'Proxy-Authenticate: Basic realm="%s"\r\n\r\n' % realm) 1142 opener.add_handler(auth_handler) 1143 opener.add_handler(http_handler) 1144 self._test_basic_auth(opener, auth_handler, "Proxy-authorization", 1145 realm, http_handler, password_manager, 1146 "http://acme.example.com:3128/protected", 1147 "proxy.example.com:3128", 1148 ) 1149 1150 def test_basic_and_digest_auth_handlers(self): 1151 # HTTPDigestAuthHandler raised an exception if it couldn't handle a 40* 1152 # response (http://python.org/sf/1479302), where it should instead 1153 # return None to allow another handler (especially 1154 # HTTPBasicAuthHandler) to handle the response. 1155 1156 # Also (http://python.org/sf/14797027, RFC 2617 section 1.2), we must 1157 # try digest first (since it's the strongest auth scheme), so we record 1158 # order of calls here to check digest comes first: 1159 class RecordingOpenerDirector(OpenerDirector): 1160 def __init__(self): 1161 OpenerDirector.__init__(self) 1162 self.recorded = [] 1163 def record(self, info): 1164 self.recorded.append(info) 1165 class TestDigestAuthHandler(urllib2.HTTPDigestAuthHandler): 1166 def http_error_401(self, *args, **kwds): 1167 self.parent.record("digest") 1168 urllib2.HTTPDigestAuthHandler.http_error_401(self, 1169 *args, **kwds) 1170 class TestBasicAuthHandler(urllib2.HTTPBasicAuthHandler): 1171 def http_error_401(self, *args, **kwds): 1172 self.parent.record("basic") 1173 urllib2.HTTPBasicAuthHandler.http_error_401(self, 1174 *args, **kwds) 1175 1176 opener = RecordingOpenerDirector() 1177 password_manager = MockPasswordManager() 1178 digest_handler = TestDigestAuthHandler(password_manager) 1179 basic_handler = TestBasicAuthHandler(password_manager) 1180 realm = "ACME Networks" 1181 http_handler = MockHTTPHandler( 1182 401, 'WWW-Authenticate: Basic realm="%s"\r\n\r\n' % realm) 1183 opener.add_handler(basic_handler) 1184 opener.add_handler(digest_handler) 1185 opener.add_handler(http_handler) 1186 1187 # check basic auth isn't blocked by digest handler failing 1188 self._test_basic_auth(opener, basic_handler, "Authorization", 1189 realm, http_handler, password_manager, 1190 "http://acme.example.com/protected", 1191 "http://acme.example.com/protected", 1192 ) 1193 # check digest was tried before basic (twice, because 1194 # _test_basic_auth called .open() twice) 1195 self.assertEqual(opener.recorded, ["digest", "basic"]*2) 1196 1197 def _test_basic_auth(self, opener, auth_handler, auth_header, 1198 realm, http_handler, password_manager, 1199 request_url, protected_url): 1200 import base64 1201 user, password = "wile", "coyote" 1202 1203 # .add_password() fed through to password manager 1204 auth_handler.add_password(realm, request_url, user, password) 1205 self.assertEqual(realm, password_manager.realm) 1206 self.assertEqual(request_url, password_manager.url) 1207 self.assertEqual(user, password_manager.user) 1208 self.assertEqual(password, password_manager.password) 1209 1210 r = opener.open(request_url) 1211 1212 # should have asked the password manager for the username/password 1213 self.assertEqual(password_manager.target_realm, realm) 1214 self.assertEqual(password_manager.target_url, protected_url) 1215 1216 # expect one request without authorization, then one with 1217 self.assertEqual(len(http_handler.requests), 2) 1218 self.assertFalse(http_handler.requests[0].has_header(auth_header)) 1219 userpass = '%s:%s' % (user, password) 1220 auth_hdr_value = 'Basic '+base64.encodestring(userpass).strip() 1221 self.assertEqual(http_handler.requests[1].get_header(auth_header), 1222 auth_hdr_value) 1223 self.assertEqual(http_handler.requests[1].unredirected_hdrs[auth_header], 1224 auth_hdr_value) 1225 # if the password manager can't find a password, the handler won't 1226 # handle the HTTP auth error 1227 password_manager.user = password_manager.password = None 1228 http_handler.reset() 1229 r = opener.open(request_url) 1230 self.assertEqual(len(http_handler.requests), 1) 1231 self.assertFalse(http_handler.requests[0].has_header(auth_header)) 1232 1233 class MiscTests(unittest.TestCase): 1234 1235 def test_build_opener(self): 1236 class MyHTTPHandler(urllib2.HTTPHandler): pass 1237 class FooHandler(urllib2.BaseHandler): 1238 def foo_open(self): pass 1239 class BarHandler(urllib2.BaseHandler): 1240 def bar_open(self): pass 1241 1242 build_opener = urllib2.build_opener 1243 1244 o = build_opener(FooHandler, BarHandler) 1245 self.opener_has_handler(o, FooHandler) 1246 self.opener_has_handler(o, BarHandler) 1247 1248 # can take a mix of classes and instances 1249 o = build_opener(FooHandler, BarHandler()) 1250 self.opener_has_handler(o, FooHandler) 1251 self.opener_has_handler(o, BarHandler) 1252 1253 # subclasses of default handlers override default handlers 1254 o = build_opener(MyHTTPHandler) 1255 self.opener_has_handler(o, MyHTTPHandler) 1256 1257 # a particular case of overriding: default handlers can be passed 1258 # in explicitly 1259 o = build_opener() 1260 self.opener_has_handler(o, urllib2.HTTPHandler) 1261 o = build_opener(urllib2.HTTPHandler) 1262 self.opener_has_handler(o, urllib2.HTTPHandler) 1263 o = build_opener(urllib2.HTTPHandler()) 1264 self.opener_has_handler(o, urllib2.HTTPHandler) 1265 1266 # Issue2670: multiple handlers sharing the same base class 1267 class MyOtherHTTPHandler(urllib2.HTTPHandler): pass 1268 o = build_opener(MyHTTPHandler, MyOtherHTTPHandler) 1269 self.opener_has_handler(o, MyHTTPHandler) 1270 self.opener_has_handler(o, MyOtherHTTPHandler) 1271 1272 def opener_has_handler(self, opener, handler_class): 1273 for h in opener.handlers: 1274 if h.__class__ == handler_class: 1275 break 1276 else: 1277 self.assertTrue(False) 1278 1279 class RequestTests(unittest.TestCase): 1280 1281 def setUp(self): 1282 self.get = urllib2.Request("http://www.python.org/~jeremy/") 1283 self.post = urllib2.Request("http://www.python.org/~jeremy/", 1284 "data", 1285 headers={"X-Test": "test"}) 1286 1287 def test_method(self): 1288 self.assertEqual("POST", self.post.get_method()) 1289 self.assertEqual("GET", self.get.get_method()) 1290 1291 def test_add_data(self): 1292 self.assertTrue(not self.get.has_data()) 1293 self.assertEqual("GET", self.get.get_method()) 1294 self.get.add_data("spam") 1295 self.assertTrue(self.get.has_data()) 1296 self.assertEqual("POST", self.get.get_method()) 1297 1298 def test_get_full_url(self): 1299 self.assertEqual("http://www.python.org/~jeremy/", 1300 self.get.get_full_url()) 1301 1302 def test_selector(self): 1303 self.assertEqual("/~jeremy/", self.get.get_selector()) 1304 req = urllib2.Request("http://www.python.org/") 1305 self.assertEqual("/", req.get_selector()) 1306 1307 def test_get_type(self): 1308 self.assertEqual("http", self.get.get_type()) 1309 1310 def test_get_host(self): 1311 self.assertEqual("www.python.org", self.get.get_host()) 1312 1313 def test_get_host_unquote(self): 1314 req = urllib2.Request("http://www.%70ython.org/") 1315 self.assertEqual("www.python.org", req.get_host()) 1316 1317 def test_proxy(self): 1318 self.assertTrue(not self.get.has_proxy()) 1319 self.get.set_proxy("www.perl.org", "http") 1320 self.assertTrue(self.get.has_proxy()) 1321 self.assertEqual("www.python.org", self.get.get_origin_req_host()) 1322 self.assertEqual("www.perl.org", self.get.get_host()) 1323 1324 def test_wrapped_url(self): 1325 req = Request("<URL:http://www.python.org>") 1326 self.assertEqual("www.python.org", req.get_host()) 1327 1328 def test_url_fragment(self): 1329 req = Request("http://www.python.org/?qs=query#fragment=true") 1330 self.assertEqual("/?qs=query", req.get_selector()) 1331 req = Request("http://www.python.org/#fun=true") 1332 self.assertEqual("/", req.get_selector()) 1333 1334 # Issue 11703: geturl() omits fragment in the original URL. 1335 url = 'http://docs.python.org/library/urllib2.html#OK' 1336 req = Request(url) 1337 self.assertEqual(req.get_full_url(), url) 1338 1339 def test_HTTPError_interface(self): 1340 """ 1341 Issue 13211 reveals that HTTPError didn't implement the URLError 1342 interface even though HTTPError is a subclass of URLError. 1343 1344 >>> err = urllib2.HTTPError(msg='something bad happened', url=None, code=None, hdrs=None, fp=None) 1345 >>> assert hasattr(err, 'reason') 1346 >>> err.reason 1347 'something bad happened' 1348 """ 1349 1350 def test_HTTPError_interface_call(self): 1351 """ 1352 Issue 15701= - HTTPError interface has info method available from URLError. 1353 """ 1354 err = urllib2.HTTPError(msg='something bad happened', url=None, 1355 code=None, hdrs='Content-Length:42', fp=None) 1356 self.assertTrue(hasattr(err, 'reason')) 1357 assert hasattr(err, 'reason') 1358 assert hasattr(err, 'info') 1359 assert callable(err.info) 1360 try: 1361 err.info() 1362 except AttributeError: 1363 self.fail("err.info() failed") 1364 self.assertEqual(err.info(), "Content-Length:42") 1365 1366 def test_main(verbose=None): 1367 from test import test_urllib2 1368 test_support.run_doctest(test_urllib2, verbose) 1369 test_support.run_doctest(urllib2, verbose) 1370 tests = (TrivialTests, 1371 OpenerDirectorTests, 1372 HandlerTests, 1373 MiscTests, 1374 RequestTests) 1375 test_support.run_unittest(*tests) 1376 1377 if __name__ == "__main__": 1378 test_main(verbose=True) 1379