Home | History | Annotate | Download | only in test
      1 import os
      2 import base64
      3 import urlparse
      4 import urllib2
      5 import BaseHTTPServer
      6 import unittest
      7 import hashlib
      8 
      9 from test import test_support
     10 
     11 mimetools = test_support.import_module('mimetools', deprecated=True)
     12 threading = test_support.import_module('threading')
     13 
     14 try:
     15     import ssl
     16 except ImportError:
     17     ssl = None
     18 
     19 here = os.path.dirname(__file__)
     20 # Self-signed cert file for 'localhost'
     21 CERT_localhost = os.path.join(here, 'keycert.pem')
     22 # Self-signed cert file for 'fakehostname'
     23 CERT_fakehostname = os.path.join(here, 'keycert2.pem')
     24 
     25 # Loopback http server infrastructure
     26 
     27 class LoopbackHttpServer(BaseHTTPServer.HTTPServer):
     28     """HTTP server w/ a few modifications that make it useful for
     29     loopback testing purposes.
     30     """
     31 
     32     def __init__(self, server_address, RequestHandlerClass):
     33         BaseHTTPServer.HTTPServer.__init__(self,
     34                                            server_address,
     35                                            RequestHandlerClass)
     36 
     37         # Set the timeout of our listening socket really low so
     38         # that we can stop the server easily.
     39         self.socket.settimeout(0.1)
     40 
     41     def get_request(self):
     42         """BaseHTTPServer method, overridden."""
     43 
     44         request, client_address = self.socket.accept()
     45 
     46         # It's a loopback connection, so setting the timeout
     47         # really low shouldn't affect anything, but should make
     48         # deadlocks less likely to occur.
     49         request.settimeout(10.0)
     50 
     51         return (request, client_address)
     52 
     53 class LoopbackHttpServerThread(threading.Thread):
     54     """Stoppable thread that runs a loopback http server."""
     55 
     56     def __init__(self, request_handler):
     57         threading.Thread.__init__(self)
     58         self._stop = False
     59         self.ready = threading.Event()
     60         request_handler.protocol_version = "HTTP/1.0"
     61         self.httpd = LoopbackHttpServer(('127.0.0.1', 0),
     62                                         request_handler)
     63         #print "Serving HTTP on %s port %s" % (self.httpd.server_name,
     64         #                                      self.httpd.server_port)
     65         self.port = self.httpd.server_port
     66 
     67     def stop(self):
     68         """Stops the webserver if it's currently running."""
     69 
     70         # Set the stop flag.
     71         self._stop = True
     72 
     73         self.join()
     74 
     75     def run(self):
     76         self.ready.set()
     77         while not self._stop:
     78             self.httpd.handle_request()
     79 
     80 # Authentication infrastructure
     81 
     82 
     83 class BasicAuthHandler(BaseHTTPServer.BaseHTTPRequestHandler):
     84     """Handler for performing Basic Authentication."""
     85     # Server side values
     86     USER = "testUser"
     87     PASSWD = "testPass"
     88     REALM = "Test"
     89     USER_PASSWD = "%s:%s" % (USER, PASSWD)
     90     ENCODED_AUTH = base64.b64encode(USER_PASSWD)
     91 
     92     def __init__(self, *args, **kwargs):
     93         BaseHTTPServer.BaseHTTPRequestHandler.__init__(self, *args, **kwargs)
     94 
     95     def log_message(self, format, *args):
     96         # Suppress the HTTP Console log output
     97         pass
     98 
     99     def do_HEAD(self):
    100         self.send_response(200)
    101         self.send_header("Content-type", "text/html")
    102         self.end_headers()
    103 
    104     def do_AUTHHEAD(self):
    105         self.send_response(401)
    106         self.send_header("WWW-Authenticate", "Basic realm=\"%s\"" % self.REALM)
    107         self.send_header("Content-type", "text/html")
    108         self.end_headers()
    109 
    110     def do_GET(self):
    111         if self.headers.getheader("Authorization") == None:
    112             self.do_AUTHHEAD()
    113             self.wfile.write("No Auth Header Received")
    114         elif self.headers.getheader(
    115                 "Authorization") == "Basic " + self.ENCODED_AUTH:
    116             self.wfile.write("It works!")
    117         else:
    118             # Unauthorized Request
    119             self.do_AUTHHEAD()
    120 
    121 
    122 class DigestAuthHandler:
    123     """Handler for performing digest authentication."""
    124 
    125     def __init__(self):
    126         self._request_num = 0
    127         self._nonces = []
    128         self._users = {}
    129         self._realm_name = "Test Realm"
    130         self._qop = "auth"
    131 
    132     def set_qop(self, qop):
    133         self._qop = qop
    134 
    135     def set_users(self, users):
    136         assert isinstance(users, dict)
    137         self._users = users
    138 
    139     def set_realm(self, realm):
    140         self._realm_name = realm
    141 
    142     def _generate_nonce(self):
    143         self._request_num += 1
    144         nonce = hashlib.md5(str(self._request_num)).hexdigest()
    145         self._nonces.append(nonce)
    146         return nonce
    147 
    148     def _create_auth_dict(self, auth_str):
    149         first_space_index = auth_str.find(" ")
    150         auth_str = auth_str[first_space_index+1:]
    151 
    152         parts = auth_str.split(",")
    153 
    154         auth_dict = {}
    155         for part in parts:
    156             name, value = part.split("=")
    157             name = name.strip()
    158             if value[0] == '"' and value[-1] == '"':
    159                 value = value[1:-1]
    160             else:
    161                 value = value.strip()
    162             auth_dict[name] = value
    163         return auth_dict
    164 
    165     def _validate_auth(self, auth_dict, password, method, uri):
    166         final_dict = {}
    167         final_dict.update(auth_dict)
    168         final_dict["password"] = password
    169         final_dict["method"] = method
    170         final_dict["uri"] = uri
    171         HA1_str = "%(username)s:%(realm)s:%(password)s" % final_dict
    172         HA1 = hashlib.md5(HA1_str).hexdigest()
    173         HA2_str = "%(method)s:%(uri)s" % final_dict
    174         HA2 = hashlib.md5(HA2_str).hexdigest()
    175         final_dict["HA1"] = HA1
    176         final_dict["HA2"] = HA2
    177         response_str = "%(HA1)s:%(nonce)s:%(nc)s:" \
    178                        "%(cnonce)s:%(qop)s:%(HA2)s" % final_dict
    179         response = hashlib.md5(response_str).hexdigest()
    180 
    181         return response == auth_dict["response"]
    182 
    183     def _return_auth_challenge(self, request_handler):
    184         request_handler.send_response(407, "Proxy Authentication Required")
    185         request_handler.send_header("Content-Type", "text/html")
    186         request_handler.send_header(
    187             'Proxy-Authenticate', 'Digest realm="%s", '
    188             'qop="%s",'
    189             'nonce="%s", ' % \
    190             (self._realm_name, self._qop, self._generate_nonce()))
    191         # XXX: Not sure if we're supposed to add this next header or
    192         # not.
    193         #request_handler.send_header('Connection', 'close')
    194         request_handler.end_headers()
    195         request_handler.wfile.write("Proxy Authentication Required.")
    196         return False
    197 
    198     def handle_request(self, request_handler):
    199         """Performs digest authentication on the given HTTP request
    200         handler.  Returns True if authentication was successful, False
    201         otherwise.
    202 
    203         If no users have been set, then digest auth is effectively
    204         disabled and this method will always return True.
    205         """
    206 
    207         if len(self._users) == 0:
    208             return True
    209 
    210         if 'Proxy-Authorization' not in request_handler.headers:
    211             return self._return_auth_challenge(request_handler)
    212         else:
    213             auth_dict = self._create_auth_dict(
    214                 request_handler.headers['Proxy-Authorization']
    215                 )
    216             if auth_dict["username"] in self._users:
    217                 password = self._users[ auth_dict["username"] ]
    218             else:
    219                 return self._return_auth_challenge(request_handler)
    220             if not auth_dict.get("nonce") in self._nonces:
    221                 return self._return_auth_challenge(request_handler)
    222             else:
    223                 self._nonces.remove(auth_dict["nonce"])
    224 
    225             auth_validated = False
    226 
    227             # MSIE uses short_path in its validation, but Python's
    228             # urllib2 uses the full path, so we're going to see if
    229             # either of them works here.
    230 
    231             for path in [request_handler.path, request_handler.short_path]:
    232                 if self._validate_auth(auth_dict,
    233                                        password,
    234                                        request_handler.command,
    235                                        path):
    236                     auth_validated = True
    237 
    238             if not auth_validated:
    239                 return self._return_auth_challenge(request_handler)
    240             return True
    241 
    242 # Proxy test infrastructure
    243 
    244 class FakeProxyHandler(BaseHTTPServer.BaseHTTPRequestHandler):
    245     """This is a 'fake proxy' that makes it look like the entire
    246     internet has gone down due to a sudden zombie invasion.  It main
    247     utility is in providing us with authentication support for
    248     testing.
    249     """
    250 
    251     def __init__(self, digest_auth_handler, *args, **kwargs):
    252         # This has to be set before calling our parent's __init__(), which will
    253         # try to call do_GET().
    254         self.digest_auth_handler = digest_auth_handler
    255         BaseHTTPServer.BaseHTTPRequestHandler.__init__(self, *args, **kwargs)
    256 
    257     def log_message(self, format, *args):
    258         # Uncomment the next line for debugging.
    259         #sys.stderr.write(format % args)
    260         pass
    261 
    262     def do_GET(self):
    263         (scm, netloc, path, params, query, fragment) = urlparse.urlparse(
    264             self.path, 'http')
    265         self.short_path = path
    266         if self.digest_auth_handler.handle_request(self):
    267             self.send_response(200, "OK")
    268             self.send_header("Content-Type", "text/html")
    269             self.end_headers()
    270             self.wfile.write("You've reached %s!<BR>" % self.path)
    271             self.wfile.write("Our apologies, but our server is down due to "
    272                               "a sudden zombie invasion.")
    273 
    274 # Test cases
    275 
    276 class BaseTestCase(unittest.TestCase):
    277     def setUp(self):
    278         self._threads = test_support.threading_setup()
    279 
    280     def tearDown(self):
    281         test_support.threading_cleanup(*self._threads)
    282 
    283 
    284 class BasicAuthTests(BaseTestCase):
    285     USER = "testUser"
    286     PASSWD = "testPass"
    287     INCORRECT_PASSWD = "Incorrect"
    288     REALM = "Test"
    289 
    290     def setUp(self):
    291         super(BasicAuthTests, self).setUp()
    292         # With Basic Authentication
    293         def http_server_with_basic_auth_handler(*args, **kwargs):
    294             return BasicAuthHandler(*args, **kwargs)
    295         self.server = LoopbackHttpServerThread(http_server_with_basic_auth_handler)
    296         self.server_url = 'http://127.0.0.1:%s' % self.server.port
    297         self.server.start()
    298         self.server.ready.wait()
    299 
    300     def tearDown(self):
    301         self.server.stop()
    302         super(BasicAuthTests, self).tearDown()
    303 
    304     def test_basic_auth_success(self):
    305         ah = urllib2.HTTPBasicAuthHandler()
    306         ah.add_password(self.REALM, self.server_url, self.USER, self.PASSWD)
    307         urllib2.install_opener(urllib2.build_opener(ah))
    308         try:
    309             self.assertTrue(urllib2.urlopen(self.server_url))
    310         except urllib2.HTTPError:
    311             self.fail("Basic Auth Failed for url: %s" % self.server_url)
    312         except Exception as e:
    313             raise e
    314 
    315     def test_basic_auth_httperror(self):
    316         ah = urllib2.HTTPBasicAuthHandler()
    317         ah.add_password(self.REALM, self.server_url, self.USER,
    318                         self.INCORRECT_PASSWD)
    319         urllib2.install_opener(urllib2.build_opener(ah))
    320         self.assertRaises(urllib2.HTTPError, urllib2.urlopen, self.server_url)
    321 
    322 
    323 class ProxyAuthTests(BaseTestCase):
    324     URL = "http://localhost"
    325 
    326     USER = "tester"
    327     PASSWD = "test123"
    328     REALM = "TestRealm"
    329 
    330     def setUp(self):
    331         super(ProxyAuthTests, self).setUp()
    332         # Ignore proxy bypass settings in the environment.
    333         def restore_environ(old_environ):
    334             os.environ.clear()
    335             os.environ.update(old_environ)
    336         self.addCleanup(restore_environ, os.environ.copy())
    337         os.environ['NO_PROXY'] = ''
    338         os.environ['no_proxy'] = ''
    339 
    340         self.digest_auth_handler = DigestAuthHandler()
    341         self.digest_auth_handler.set_users({self.USER: self.PASSWD})
    342         self.digest_auth_handler.set_realm(self.REALM)
    343         # With Digest Authentication
    344         def create_fake_proxy_handler(*args, **kwargs):
    345             return FakeProxyHandler(self.digest_auth_handler, *args, **kwargs)
    346 
    347         self.server = LoopbackHttpServerThread(create_fake_proxy_handler)
    348         self.server.start()
    349         self.server.ready.wait()
    350         proxy_url = "http://127.0.0.1:%d" % self.server.port
    351         handler = urllib2.ProxyHandler({"http" : proxy_url})
    352         self.proxy_digest_handler = urllib2.ProxyDigestAuthHandler()
    353         self.opener = urllib2.build_opener(handler, self.proxy_digest_handler)
    354 
    355     def tearDown(self):
    356         self.server.stop()
    357         super(ProxyAuthTests, self).tearDown()
    358 
    359     def test_proxy_with_bad_password_raises_httperror(self):
    360         self.proxy_digest_handler.add_password(self.REALM, self.URL,
    361                                                self.USER, self.PASSWD+"bad")
    362         self.digest_auth_handler.set_qop("auth")
    363         self.assertRaises(urllib2.HTTPError,
    364                           self.opener.open,
    365                           self.URL)
    366 
    367     def test_proxy_with_no_password_raises_httperror(self):
    368         self.digest_auth_handler.set_qop("auth")
    369         self.assertRaises(urllib2.HTTPError,
    370                           self.opener.open,
    371                           self.URL)
    372 
    373     def test_proxy_qop_auth_works(self):
    374         self.proxy_digest_handler.add_password(self.REALM, self.URL,
    375                                                self.USER, self.PASSWD)
    376         self.digest_auth_handler.set_qop("auth")
    377         result = self.opener.open(self.URL)
    378         while result.read():
    379             pass
    380         result.close()
    381 
    382     def test_proxy_qop_auth_int_works_or_throws_urlerror(self):
    383         self.proxy_digest_handler.add_password(self.REALM, self.URL,
    384                                                self.USER, self.PASSWD)
    385         self.digest_auth_handler.set_qop("auth-int")
    386         try:
    387             result = self.opener.open(self.URL)
    388         except urllib2.URLError:
    389             # It's okay if we don't support auth-int, but we certainly
    390             # shouldn't receive any kind of exception here other than
    391             # a URLError.
    392             result = None
    393         if result:
    394             while result.read():
    395                 pass
    396             result.close()
    397 
    398 
    399 def GetRequestHandler(responses):
    400 
    401     class FakeHTTPRequestHandler(BaseHTTPServer.BaseHTTPRequestHandler):
    402 
    403         server_version = "TestHTTP/"
    404         requests = []
    405         headers_received = []
    406         port = 80
    407 
    408         def do_GET(self):
    409             body = self.send_head()
    410             if body:
    411                 self.wfile.write(body)
    412 
    413         def do_POST(self):
    414             content_length = self.headers['Content-Length']
    415             post_data = self.rfile.read(int(content_length))
    416             self.do_GET()
    417             self.requests.append(post_data)
    418 
    419         def send_head(self):
    420             FakeHTTPRequestHandler.headers_received = self.headers
    421             self.requests.append(self.path)
    422             response_code, headers, body = responses.pop(0)
    423 
    424             self.send_response(response_code)
    425 
    426             for (header, value) in headers:
    427                 self.send_header(header, value % self.port)
    428             if body:
    429                 self.send_header('Content-type', 'text/plain')
    430                 self.end_headers()
    431                 return body
    432             self.end_headers()
    433 
    434         def log_message(self, *args):
    435             pass
    436 
    437 
    438     return FakeHTTPRequestHandler
    439 
    440 
    441 class TestUrlopen(BaseTestCase):
    442     """Tests urllib2.urlopen using the network.
    443 
    444     These tests are not exhaustive.  Assuming that testing using files does a
    445     good job overall of some of the basic interface features.  There are no
    446     tests exercising the optional 'data' and 'proxies' arguments.  No tests
    447     for transparent redirection have been written.
    448     """
    449 
    450     def setUp(self):
    451         proxy_handler = urllib2.ProxyHandler({})
    452         opener = urllib2.build_opener(proxy_handler)
    453         urllib2.install_opener(opener)
    454         super(TestUrlopen, self).setUp()
    455 
    456     def urlopen(self, url, data=None, **kwargs):
    457         l = []
    458         f = urllib2.urlopen(url, data, **kwargs)
    459         try:
    460             # Exercise various methods
    461             l.extend(f.readlines(200))
    462             l.append(f.readline())
    463             l.append(f.read(1024))
    464             l.append(f.read())
    465         finally:
    466             f.close()
    467         return b"".join(l)
    468 
    469     def start_server(self, responses):
    470         handler = GetRequestHandler(responses)
    471 
    472         self.server = LoopbackHttpServerThread(handler)
    473         self.server.start()
    474         self.server.ready.wait()
    475         port = self.server.port
    476         handler.port = port
    477         return handler
    478 
    479     def start_https_server(self, responses=None, **kwargs):
    480         if not hasattr(urllib2, 'HTTPSHandler'):
    481             self.skipTest('ssl support required')
    482         from test.ssl_servers import make_https_server
    483         if responses is None:
    484             responses = [(200, [], b"we care a bit")]
    485         handler = GetRequestHandler(responses)
    486         server = make_https_server(self, handler_class=handler, **kwargs)
    487         handler.port = server.port
    488         return handler
    489 
    490     def test_redirection(self):
    491         expected_response = 'We got here...'
    492         responses = [
    493             (302, [('Location', 'http://localhost:%s/somewhere_else')], ''),
    494             (200, [], expected_response)
    495         ]
    496 
    497         handler = self.start_server(responses)
    498 
    499         try:
    500             f = urllib2.urlopen('http://localhost:%s/' % handler.port)
    501             data = f.read()
    502             f.close()
    503 
    504             self.assertEqual(data, expected_response)
    505             self.assertEqual(handler.requests, ['/', '/somewhere_else'])
    506         finally:
    507             self.server.stop()
    508 
    509 
    510     def test_404(self):
    511         expected_response = 'Bad bad bad...'
    512         handler = self.start_server([(404, [], expected_response)])
    513 
    514         try:
    515             try:
    516                 urllib2.urlopen('http://localhost:%s/weeble' % handler.port)
    517             except urllib2.URLError, f:
    518                 pass
    519             else:
    520                 self.fail('404 should raise URLError')
    521 
    522             data = f.read()
    523             f.close()
    524 
    525             self.assertEqual(data, expected_response)
    526             self.assertEqual(handler.requests, ['/weeble'])
    527         finally:
    528             self.server.stop()
    529 
    530 
    531     def test_200(self):
    532         expected_response = 'pycon 2008...'
    533         handler = self.start_server([(200, [], expected_response)])
    534 
    535         try:
    536             f = urllib2.urlopen('http://localhost:%s/bizarre' % handler.port)
    537             data = f.read()
    538             f.close()
    539 
    540             self.assertEqual(data, expected_response)
    541             self.assertEqual(handler.requests, ['/bizarre'])
    542         finally:
    543             self.server.stop()
    544 
    545     def test_200_with_parameters(self):
    546         expected_response = 'pycon 2008...'
    547         handler = self.start_server([(200, [], expected_response)])
    548 
    549         try:
    550             f = urllib2.urlopen('http://localhost:%s/bizarre' % handler.port, 'get=with_feeling')
    551             data = f.read()
    552             f.close()
    553 
    554             self.assertEqual(data, expected_response)
    555             self.assertEqual(handler.requests, ['/bizarre', 'get=with_feeling'])
    556         finally:
    557             self.server.stop()
    558 
    559     def test_https(self):
    560         handler = self.start_https_server()
    561         context = ssl.create_default_context(cafile=CERT_localhost)
    562         data = self.urlopen("https://localhost:%s/bizarre" % handler.port, context=context)
    563         self.assertEqual(data, b"we care a bit")
    564 
    565     def test_https_with_cafile(self):
    566         handler = self.start_https_server(certfile=CERT_localhost)
    567         # Good cert
    568         data = self.urlopen("https://localhost:%s/bizarre" % handler.port,
    569                             cafile=CERT_localhost)
    570         self.assertEqual(data, b"we care a bit")
    571         # Bad cert
    572         with self.assertRaises(urllib2.URLError):
    573             self.urlopen("https://localhost:%s/bizarre" % handler.port,
    574                          cafile=CERT_fakehostname)
    575         # Good cert, but mismatching hostname
    576         handler = self.start_https_server(certfile=CERT_fakehostname)
    577         with self.assertRaises(ssl.CertificateError):
    578             self.urlopen("https://localhost:%s/bizarre" % handler.port,
    579                          cafile=CERT_fakehostname)
    580 
    581     def test_https_with_cadefault(self):
    582         handler = self.start_https_server(certfile=CERT_localhost)
    583         # Self-signed cert should fail verification with system certificate store
    584         with self.assertRaises(urllib2.URLError):
    585             self.urlopen("https://localhost:%s/bizarre" % handler.port,
    586                          cadefault=True)
    587 
    588     def test_https_sni(self):
    589         if ssl is None:
    590             self.skipTest("ssl module required")
    591         if not ssl.HAS_SNI:
    592             self.skipTest("SNI support required in OpenSSL")
    593         sni_name = [None]
    594         def cb_sni(ssl_sock, server_name, initial_context):
    595             sni_name[0] = server_name
    596         context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    597         context.set_servername_callback(cb_sni)
    598         handler = self.start_https_server(context=context, certfile=CERT_localhost)
    599         context = ssl.create_default_context(cafile=CERT_localhost)
    600         self.urlopen("https://localhost:%s" % handler.port, context=context)
    601         self.assertEqual(sni_name[0], "localhost")
    602 
    603     def test_sending_headers(self):
    604         handler = self.start_server([(200, [], "we don't care")])
    605 
    606         try:
    607             req = urllib2.Request("http://localhost:%s/" % handler.port,
    608                                   headers={'Range': 'bytes=20-39'})
    609             urllib2.urlopen(req)
    610             self.assertEqual(handler.headers_received['Range'], 'bytes=20-39')
    611         finally:
    612             self.server.stop()
    613 
    614     def test_basic(self):
    615         handler = self.start_server([(200, [], "we don't care")])
    616 
    617         try:
    618             open_url = urllib2.urlopen("http://localhost:%s" % handler.port)
    619             for attr in ("read", "close", "info", "geturl"):
    620                 self.assertTrue(hasattr(open_url, attr), "object returned from "
    621                              "urlopen lacks the %s attribute" % attr)
    622             try:
    623                 self.assertTrue(open_url.read(), "calling 'read' failed")
    624             finally:
    625                 open_url.close()
    626         finally:
    627             self.server.stop()
    628 
    629     def test_info(self):
    630         handler = self.start_server([(200, [], "we don't care")])
    631 
    632         try:
    633             open_url = urllib2.urlopen("http://localhost:%s" % handler.port)
    634             info_obj = open_url.info()
    635             self.assertIsInstance(info_obj, mimetools.Message,
    636                                   "object returned by 'info' is not an "
    637                                   "instance of mimetools.Message")
    638             self.assertEqual(info_obj.getsubtype(), "plain")
    639         finally:
    640             self.server.stop()
    641 
    642     def test_geturl(self):
    643         # Make sure same URL as opened is returned by geturl.
    644         handler = self.start_server([(200, [], "we don't care")])
    645 
    646         try:
    647             open_url = urllib2.urlopen("http://localhost:%s" % handler.port)
    648             url = open_url.geturl()
    649             self.assertEqual(url, "http://localhost:%s" % handler.port)
    650         finally:
    651             self.server.stop()
    652 
    653 
    654     def test_bad_address(self):
    655         # Make sure proper exception is raised when connecting to a bogus
    656         # address.
    657 
    658         # as indicated by the comment below, this might fail with some ISP,
    659         # so we run the test only when -unetwork/-uall is specified to
    660         # mitigate the problem a bit (see #17564)
    661         test_support.requires('network')
    662         self.assertRaises(IOError,
    663                           # Given that both VeriSign and various ISPs have in
    664                           # the past or are presently hijacking various invalid
    665                           # domain name requests in an attempt to boost traffic
    666                           # to their own sites, finding a domain name to use
    667                           # for this test is difficult.  RFC2606 leads one to
    668                           # believe that '.invalid' should work, but experience
    669                           # seemed to indicate otherwise.  Single character
    670                           # TLDs are likely to remain invalid, so this seems to
    671                           # be the best choice. The trailing '.' prevents a
    672                           # related problem: The normal DNS resolver appends
    673                           # the domain names from the search path if there is
    674                           # no '.' the end and, and if one of those domains
    675                           # implements a '*' rule a result is returned.
    676                           # However, none of this will prevent the test from
    677                           # failing if the ISP hijacks all invalid domain
    678                           # requests.  The real solution would be to be able to
    679                           # parameterize the framework with a mock resolver.
    680                           urllib2.urlopen, "http://sadflkjsasf.i.nvali.d./")
    681 
    682     def test_iteration(self):
    683         expected_response = "pycon 2008..."
    684         handler = self.start_server([(200, [], expected_response)])
    685         try:
    686             data = urllib2.urlopen("http://localhost:%s" % handler.port)
    687             for line in data:
    688                 self.assertEqual(line, expected_response)
    689         finally:
    690             self.server.stop()
    691 
    692     def ztest_line_iteration(self):
    693         lines = ["We\n", "got\n", "here\n", "verylong " * 8192 + "\n"]
    694         expected_response = "".join(lines)
    695         handler = self.start_server([(200, [], expected_response)])
    696         try:
    697             data = urllib2.urlopen("http://localhost:%s" % handler.port)
    698             for index, line in enumerate(data):
    699                 self.assertEqual(line, lines[index],
    700                                  "Fetched line number %s doesn't match expected:\n"
    701                                  "    Expected length was %s, got %s" %
    702                                  (index, len(lines[index]), len(line)))
    703         finally:
    704             self.server.stop()
    705         self.assertEqual(index + 1, len(lines))
    706 
    707 def test_main():
    708     # We will NOT depend on the network resource flag
    709     # (Lib/test/regrtest.py -u network) since all tests here are only
    710     # localhost.  However, if this is a bad rationale, then uncomment
    711     # the next line.
    712     #test_support.requires("network")
    713 
    714     test_support.run_unittest(BasicAuthTests, ProxyAuthTests, TestUrlopen)
    715 
    716 if __name__ == "__main__":
    717     test_main()
    718