Home | History | Annotate | Download | only in test
      1 import os
      2 import base64
      3 import urlparse
      4 import urllib2
      5 import BaseHTTPServer
      6 import unittest
      7 import hashlib
      8 
      9 from test import test_support
     10 
     11 mimetools = test_support.import_module('mimetools', deprecated=True)
     12 threading = test_support.import_module('threading')
     13 
     14 try:
     15     import ssl
     16 except ImportError:
     17     ssl = None
     18 
     19 here = os.path.dirname(__file__)
     20 # Self-signed cert file for 'localhost'
     21 CERT_localhost = os.path.join(here, 'keycert.pem')
     22 # Self-signed cert file for 'fakehostname'
     23 CERT_fakehostname = os.path.join(here, 'keycert2.pem')
     24 
     25 # Loopback http server infrastructure
     26 
     27 class LoopbackHttpServer(BaseHTTPServer.HTTPServer):
     28     """HTTP server w/ a few modifications that make it useful for
     29     loopback testing purposes.
     30     """
     31 
     32     def __init__(self, server_address, RequestHandlerClass):
     33         BaseHTTPServer.HTTPServer.__init__(self,
     34                                            server_address,
     35                                            RequestHandlerClass)
     36 
     37         # Set the timeout of our listening socket really low so
     38         # that we can stop the server easily.
     39         self.socket.settimeout(0.1)
     40 
     41     def get_request(self):
     42         """BaseHTTPServer method, overridden."""
     43 
     44         request, client_address = self.socket.accept()
     45 
     46         # It's a loopback connection, so setting the timeout
     47         # really low shouldn't affect anything, but should make
     48         # deadlocks less likely to occur.
     49         request.settimeout(10.0)
     50 
     51         return (request, client_address)
     52 
     53 class LoopbackHttpServerThread(threading.Thread):
     54     """Stoppable thread that runs a loopback http server."""
     55 
     56     def __init__(self, request_handler):
     57         threading.Thread.__init__(self)
     58         self._stop = False
     59         self.ready = threading.Event()
     60         request_handler.protocol_version = "HTTP/1.0"
     61         self.httpd = LoopbackHttpServer(('127.0.0.1', 0),
     62                                         request_handler)
     63         #print "Serving HTTP on %s port %s" % (self.httpd.server_name,
     64         #                                      self.httpd.server_port)
     65         self.port = self.httpd.server_port
     66 
     67     def stop(self):
     68         """Stops the webserver if it's currently running."""
     69 
     70         # Set the stop flag.
     71         self._stop = True
     72 
     73         self.join()
     74 
     75     def run(self):
     76         self.ready.set()
     77         while not self._stop:
     78             self.httpd.handle_request()
     79 
     80 # Authentication infrastructure
     81 
     82 
     83 class BasicAuthHandler(BaseHTTPServer.BaseHTTPRequestHandler):
     84     """Handler for performing Basic Authentication."""
     85     # Server side values
     86     USER = "testUser"
     87     PASSWD = "testPass"
     88     REALM = "Test"
     89     USER_PASSWD = "%s:%s" % (USER, PASSWD)
     90     ENCODED_AUTH = base64.b64encode(USER_PASSWD)
     91 
     92     def __init__(self, *args, **kwargs):
     93         BaseHTTPServer.BaseHTTPRequestHandler.__init__(self, *args, **kwargs)
     94 
     95     def log_message(self, format, *args):
     96         # Suppress the HTTP Console log output
     97         pass
     98 
     99     def do_HEAD(self):
    100         self.send_response(200)
    101         self.send_header("Content-type", "text/html")
    102         self.end_headers()
    103 
    104     def do_AUTHHEAD(self):
    105         self.send_response(401)
    106         self.send_header("WWW-Authenticate", "Basic realm=\"%s\"" % self.REALM)
    107         self.send_header("Content-type", "text/html")
    108         self.end_headers()
    109 
    110     def do_GET(self):
    111         if self.headers.getheader("Authorization") == None:
    112             self.do_AUTHHEAD()
    113             self.wfile.write("No Auth Header Received")
    114         elif self.headers.getheader(
    115                 "Authorization") == "Basic " + self.ENCODED_AUTH:
    116             self.wfile.write("It works!")
    117         else:
    118             # Unauthorized Request
    119             self.do_AUTHHEAD()
    120 
    121 
    122 class DigestAuthHandler:
    123     """Handler for performing digest authentication."""
    124 
    125     def __init__(self):
    126         self._request_num = 0
    127         self._nonces = []
    128         self._users = {}
    129         self._realm_name = "Test Realm"
    130         self._qop = "auth"
    131 
    132     def set_qop(self, qop):
    133         self._qop = qop
    134 
    135     def set_users(self, users):
    136         assert isinstance(users, dict)
    137         self._users = users
    138 
    139     def set_realm(self, realm):
    140         self._realm_name = realm
    141 
    142     def _generate_nonce(self):
    143         self._request_num += 1
    144         nonce = hashlib.md5(str(self._request_num)).hexdigest()
    145         self._nonces.append(nonce)
    146         return nonce
    147 
    148     def _create_auth_dict(self, auth_str):
    149         first_space_index = auth_str.find(" ")
    150         auth_str = auth_str[first_space_index+1:]
    151 
    152         parts = auth_str.split(",")
    153 
    154         auth_dict = {}
    155         for part in parts:
    156             name, value = part.split("=")
    157             name = name.strip()
    158             if value[0] == '"' and value[-1] == '"':
    159                 value = value[1:-1]
    160             else:
    161                 value = value.strip()
    162             auth_dict[name] = value
    163         return auth_dict
    164 
    165     def _validate_auth(self, auth_dict, password, method, uri):
    166         final_dict = {}
    167         final_dict.update(auth_dict)
    168         final_dict["password"] = password
    169         final_dict["method"] = method
    170         final_dict["uri"] = uri
    171         HA1_str = "%(username)s:%(realm)s:%(password)s" % final_dict
    172         HA1 = hashlib.md5(HA1_str).hexdigest()
    173         HA2_str = "%(method)s:%(uri)s" % final_dict
    174         HA2 = hashlib.md5(HA2_str).hexdigest()
    175         final_dict["HA1"] = HA1
    176         final_dict["HA2"] = HA2
    177         response_str = "%(HA1)s:%(nonce)s:%(nc)s:" \
    178                        "%(cnonce)s:%(qop)s:%(HA2)s" % final_dict
    179         response = hashlib.md5(response_str).hexdigest()
    180 
    181         return response == auth_dict["response"]
    182 
    183     def _return_auth_challenge(self, request_handler):
    184         request_handler.send_response(407, "Proxy Authentication Required")
    185         request_handler.send_header("Content-Type", "text/html")
    186         request_handler.send_header(
    187             'Proxy-Authenticate', 'Digest realm="%s", '
    188             'qop="%s",'
    189             'nonce="%s", ' % \
    190             (self._realm_name, self._qop, self._generate_nonce()))
    191         # XXX: Not sure if we're supposed to add this next header or
    192         # not.
    193         #request_handler.send_header('Connection', 'close')
    194         request_handler.end_headers()
    195         request_handler.wfile.write("Proxy Authentication Required.")
    196         return False
    197 
    198     def handle_request(self, request_handler):
    199         """Performs digest authentication on the given HTTP request
    200         handler.  Returns True if authentication was successful, False
    201         otherwise.
    202 
    203         If no users have been set, then digest auth is effectively
    204         disabled and this method will always return True.
    205         """
    206 
    207         if len(self._users) == 0:
    208             return True
    209 
    210         if 'Proxy-Authorization' not in request_handler.headers:
    211             return self._return_auth_challenge(request_handler)
    212         else:
    213             auth_dict = self._create_auth_dict(
    214                 request_handler.headers['Proxy-Authorization']
    215                 )
    216             if auth_dict["username"] in self._users:
    217                 password = self._users[ auth_dict["username"] ]
    218             else:
    219                 return self._return_auth_challenge(request_handler)
    220             if not auth_dict.get("nonce") in self._nonces:
    221                 return self._return_auth_challenge(request_handler)
    222             else:
    223                 self._nonces.remove(auth_dict["nonce"])
    224 
    225             auth_validated = False
    226 
    227             # MSIE uses short_path in its validation, but Python's
    228             # urllib2 uses the full path, so we're going to see if
    229             # either of them works here.
    230 
    231             for path in [request_handler.path, request_handler.short_path]:
    232                 if self._validate_auth(auth_dict,
    233                                        password,
    234                                        request_handler.command,
    235                                        path):
    236                     auth_validated = True
    237 
    238             if not auth_validated:
    239                 return self._return_auth_challenge(request_handler)
    240             return True
    241 
    242 # Proxy test infrastructure
    243 
    244 class FakeProxyHandler(BaseHTTPServer.BaseHTTPRequestHandler):
    245     """This is a 'fake proxy' that makes it look like the entire
    246     internet has gone down due to a sudden zombie invasion.  It main
    247     utility is in providing us with authentication support for
    248     testing.
    249     """
    250 
    251     def __init__(self, digest_auth_handler, *args, **kwargs):
    252         # This has to be set before calling our parent's __init__(), which will
    253         # try to call do_GET().
    254         self.digest_auth_handler = digest_auth_handler
    255         BaseHTTPServer.BaseHTTPRequestHandler.__init__(self, *args, **kwargs)
    256 
    257     def log_message(self, format, *args):
    258         # Uncomment the next line for debugging.
    259         #sys.stderr.write(format % args)
    260         pass
    261 
    262     def do_GET(self):
    263         (scm, netloc, path, params, query, fragment) = urlparse.urlparse(
    264             self.path, 'http')
    265         self.short_path = path
    266         if self.digest_auth_handler.handle_request(self):
    267             self.send_response(200, "OK")
    268             self.send_header("Content-Type", "text/html")
    269             self.end_headers()
    270             self.wfile.write("You've reached %s!<BR>" % self.path)
    271             self.wfile.write("Our apologies, but our server is down due to "
    272                               "a sudden zombie invasion.")
    273 
    274 # Test cases
    275 
    276 class BaseTestCase(unittest.TestCase):
    277     def setUp(self):
    278         self._threads = test_support.threading_setup()
    279 
    280     def tearDown(self):
    281         self.doCleanups()
    282         test_support.threading_cleanup(*self._threads)
    283 
    284 
    285 class BasicAuthTests(BaseTestCase):
    286     USER = "testUser"
    287     PASSWD = "testPass"
    288     INCORRECT_PASSWD = "Incorrect"
    289     REALM = "Test"
    290 
    291     def setUp(self):
    292         super(BasicAuthTests, self).setUp()
    293         # With Basic Authentication
    294         def http_server_with_basic_auth_handler(*args, **kwargs):
    295             return BasicAuthHandler(*args, **kwargs)
    296         self.server = LoopbackHttpServerThread(http_server_with_basic_auth_handler)
    297         self.server_url = 'http://127.0.0.1:%s' % self.server.port
    298         self.server.start()
    299         self.server.ready.wait()
    300         self.addCleanup(self.server.stop)
    301 
    302     def test_basic_auth_success(self):
    303         ah = urllib2.HTTPBasicAuthHandler()
    304         ah.add_password(self.REALM, self.server_url, self.USER, self.PASSWD)
    305         urllib2.install_opener(urllib2.build_opener(ah))
    306         try:
    307             self.assertTrue(urllib2.urlopen(self.server_url))
    308         except urllib2.HTTPError:
    309             self.fail("Basic Auth Failed for url: %s" % self.server_url)
    310         except Exception as e:
    311             raise e
    312 
    313     def test_basic_auth_httperror(self):
    314         ah = urllib2.HTTPBasicAuthHandler()
    315         ah.add_password(self.REALM, self.server_url, self.USER,
    316                         self.INCORRECT_PASSWD)
    317         urllib2.install_opener(urllib2.build_opener(ah))
    318         self.assertRaises(urllib2.HTTPError, urllib2.urlopen, self.server_url)
    319 
    320 
    321 class ProxyAuthTests(BaseTestCase):
    322     URL = "http://localhost"
    323 
    324     USER = "tester"
    325     PASSWD = "test123"
    326     REALM = "TestRealm"
    327 
    328     def setUp(self):
    329         super(ProxyAuthTests, self).setUp()
    330         # Ignore proxy bypass settings in the environment.
    331         def restore_environ(old_environ):
    332             os.environ.clear()
    333             os.environ.update(old_environ)
    334         self.addCleanup(restore_environ, os.environ.copy())
    335         os.environ['NO_PROXY'] = ''
    336         os.environ['no_proxy'] = ''
    337 
    338         self.digest_auth_handler = DigestAuthHandler()
    339         self.digest_auth_handler.set_users({self.USER: self.PASSWD})
    340         self.digest_auth_handler.set_realm(self.REALM)
    341         # With Digest Authentication
    342         def create_fake_proxy_handler(*args, **kwargs):
    343             return FakeProxyHandler(self.digest_auth_handler, *args, **kwargs)
    344 
    345         self.server = LoopbackHttpServerThread(create_fake_proxy_handler)
    346         self.server.start()
    347         self.server.ready.wait()
    348         self.addCleanup(self.server.stop)
    349         proxy_url = "http://127.0.0.1:%d" % self.server.port
    350         handler = urllib2.ProxyHandler({"http" : proxy_url})
    351         self.proxy_digest_handler = urllib2.ProxyDigestAuthHandler()
    352         self.opener = urllib2.build_opener(handler, self.proxy_digest_handler)
    353 
    354     def test_proxy_with_bad_password_raises_httperror(self):
    355         self.proxy_digest_handler.add_password(self.REALM, self.URL,
    356                                                self.USER, self.PASSWD+"bad")
    357         self.digest_auth_handler.set_qop("auth")
    358         self.assertRaises(urllib2.HTTPError,
    359                           self.opener.open,
    360                           self.URL)
    361 
    362     def test_proxy_with_no_password_raises_httperror(self):
    363         self.digest_auth_handler.set_qop("auth")
    364         self.assertRaises(urllib2.HTTPError,
    365                           self.opener.open,
    366                           self.URL)
    367 
    368     def test_proxy_qop_auth_works(self):
    369         self.proxy_digest_handler.add_password(self.REALM, self.URL,
    370                                                self.USER, self.PASSWD)
    371         self.digest_auth_handler.set_qop("auth")
    372         result = self.opener.open(self.URL)
    373         while result.read():
    374             pass
    375         result.close()
    376 
    377     def test_proxy_qop_auth_int_works_or_throws_urlerror(self):
    378         self.proxy_digest_handler.add_password(self.REALM, self.URL,
    379                                                self.USER, self.PASSWD)
    380         self.digest_auth_handler.set_qop("auth-int")
    381         try:
    382             result = self.opener.open(self.URL)
    383         except urllib2.URLError:
    384             # It's okay if we don't support auth-int, but we certainly
    385             # shouldn't receive any kind of exception here other than
    386             # a URLError.
    387             result = None
    388         if result:
    389             while result.read():
    390                 pass
    391             result.close()
    392 
    393 
    394 def GetRequestHandler(responses):
    395 
    396     class FakeHTTPRequestHandler(BaseHTTPServer.BaseHTTPRequestHandler):
    397 
    398         server_version = "TestHTTP/"
    399         requests = []
    400         headers_received = []
    401         port = 80
    402 
    403         def do_GET(self):
    404             body = self.send_head()
    405             if body:
    406                 self.wfile.write(body)
    407 
    408         def do_POST(self):
    409             content_length = self.headers['Content-Length']
    410             post_data = self.rfile.read(int(content_length))
    411             self.do_GET()
    412             self.requests.append(post_data)
    413 
    414         def send_head(self):
    415             FakeHTTPRequestHandler.headers_received = self.headers
    416             self.requests.append(self.path)
    417             response_code, headers, body = responses.pop(0)
    418 
    419             self.send_response(response_code)
    420 
    421             for (header, value) in headers:
    422                 self.send_header(header, value % self.port)
    423             if body:
    424                 self.send_header('Content-type', 'text/plain')
    425                 self.end_headers()
    426                 return body
    427             self.end_headers()
    428 
    429         def log_message(self, *args):
    430             pass
    431 
    432 
    433     return FakeHTTPRequestHandler
    434 
    435 
    436 class TestUrlopen(BaseTestCase):
    437     """Tests urllib2.urlopen using the network.
    438 
    439     These tests are not exhaustive.  Assuming that testing using files does a
    440     good job overall of some of the basic interface features.  There are no
    441     tests exercising the optional 'data' and 'proxies' arguments.  No tests
    442     for transparent redirection have been written.
    443     """
    444 
    445     def setUp(self):
    446         proxy_handler = urllib2.ProxyHandler({})
    447         opener = urllib2.build_opener(proxy_handler)
    448         urllib2.install_opener(opener)
    449         super(TestUrlopen, self).setUp()
    450 
    451     def urlopen(self, url, data=None, **kwargs):
    452         l = []
    453         f = urllib2.urlopen(url, data, **kwargs)
    454         try:
    455             # Exercise various methods
    456             l.extend(f.readlines(200))
    457             l.append(f.readline())
    458             l.append(f.read(1024))
    459             l.append(f.read())
    460         finally:
    461             f.close()
    462         return b"".join(l)
    463 
    464     def start_server(self, responses):
    465         handler = GetRequestHandler(responses)
    466 
    467         self.server = LoopbackHttpServerThread(handler)
    468         self.server.start()
    469         self.server.ready.wait()
    470         self.addCleanup(self.server.stop)
    471         port = self.server.port
    472         handler.port = port
    473         return handler
    474 
    475     def start_https_server(self, responses=None, **kwargs):
    476         if not hasattr(urllib2, 'HTTPSHandler'):
    477             self.skipTest('ssl support required')
    478         from test.ssl_servers import make_https_server
    479         if responses is None:
    480             responses = [(200, [], b"we care a bit")]
    481         handler = GetRequestHandler(responses)
    482         server = make_https_server(self, handler_class=handler, **kwargs)
    483         handler.port = server.port
    484         return handler
    485 
    486     def test_redirection(self):
    487         expected_response = 'We got here...'
    488         responses = [
    489             (302, [('Location', 'http://localhost:%s/somewhere_else')], ''),
    490             (200, [], expected_response)
    491         ]
    492 
    493         handler = self.start_server(responses)
    494 
    495         f = urllib2.urlopen('http://localhost:%s/' % handler.port)
    496         data = f.read()
    497         f.close()
    498 
    499         self.assertEqual(data, expected_response)
    500         self.assertEqual(handler.requests, ['/', '/somewhere_else'])
    501 
    502 
    503     def test_404(self):
    504         expected_response = 'Bad bad bad...'
    505         handler = self.start_server([(404, [], expected_response)])
    506 
    507         try:
    508             urllib2.urlopen('http://localhost:%s/weeble' % handler.port)
    509         except urllib2.URLError, f:
    510             pass
    511         else:
    512             self.fail('404 should raise URLError')
    513 
    514         data = f.read()
    515         f.close()
    516 
    517         self.assertEqual(data, expected_response)
    518         self.assertEqual(handler.requests, ['/weeble'])
    519 
    520 
    521     def test_200(self):
    522         expected_response = 'pycon 2008...'
    523         handler = self.start_server([(200, [], expected_response)])
    524 
    525         f = urllib2.urlopen('http://localhost:%s/bizarre' % handler.port)
    526         data = f.read()
    527         f.close()
    528 
    529         self.assertEqual(data, expected_response)
    530         self.assertEqual(handler.requests, ['/bizarre'])
    531 
    532     def test_200_with_parameters(self):
    533         expected_response = 'pycon 2008...'
    534         handler = self.start_server([(200, [], expected_response)])
    535 
    536         f = urllib2.urlopen('http://localhost:%s/bizarre' % handler.port, 'get=with_feeling')
    537         data = f.read()
    538         f.close()
    539 
    540         self.assertEqual(data, expected_response)
    541         self.assertEqual(handler.requests, ['/bizarre', 'get=with_feeling'])
    542 
    543     def test_https(self):
    544         handler = self.start_https_server()
    545         context = ssl.create_default_context(cafile=CERT_localhost)
    546         data = self.urlopen("https://localhost:%s/bizarre" % handler.port, context=context)
    547         self.assertEqual(data, b"we care a bit")
    548 
    549     def test_https_with_cafile(self):
    550         handler = self.start_https_server(certfile=CERT_localhost)
    551         # Good cert
    552         data = self.urlopen("https://localhost:%s/bizarre" % handler.port,
    553                             cafile=CERT_localhost)
    554         self.assertEqual(data, b"we care a bit")
    555         # Bad cert
    556         with self.assertRaises(urllib2.URLError):
    557             self.urlopen("https://localhost:%s/bizarre" % handler.port,
    558                          cafile=CERT_fakehostname)
    559         # Good cert, but mismatching hostname
    560         handler = self.start_https_server(certfile=CERT_fakehostname)
    561         with self.assertRaises(ssl.CertificateError):
    562             self.urlopen("https://localhost:%s/bizarre" % handler.port,
    563                          cafile=CERT_fakehostname)
    564 
    565     def test_https_with_cadefault(self):
    566         handler = self.start_https_server(certfile=CERT_localhost)
    567         # Self-signed cert should fail verification with system certificate store
    568         with self.assertRaises(urllib2.URLError):
    569             self.urlopen("https://localhost:%s/bizarre" % handler.port,
    570                          cadefault=True)
    571 
    572     def test_https_sni(self):
    573         if ssl is None:
    574             self.skipTest("ssl module required")
    575         if not ssl.HAS_SNI:
    576             self.skipTest("SNI support required in OpenSSL")
    577         sni_name = [None]
    578         def cb_sni(ssl_sock, server_name, initial_context):
    579             sni_name[0] = server_name
    580         context = ssl.SSLContext(ssl.PROTOCOL_TLS)
    581         context.set_servername_callback(cb_sni)
    582         handler = self.start_https_server(context=context, certfile=CERT_localhost)
    583         context = ssl.create_default_context(cafile=CERT_localhost)
    584         self.urlopen("https://localhost:%s" % handler.port, context=context)
    585         self.assertEqual(sni_name[0], "localhost")
    586 
    587     def test_sending_headers(self):
    588         handler = self.start_server([(200, [], "we don't care")])
    589 
    590         req = urllib2.Request("http://localhost:%s/" % handler.port,
    591                               headers={'Range': 'bytes=20-39'})
    592         urllib2.urlopen(req)
    593         self.assertEqual(handler.headers_received['Range'], 'bytes=20-39')
    594 
    595     def test_basic(self):
    596         handler = self.start_server([(200, [], "we don't care")])
    597 
    598         open_url = urllib2.urlopen("http://localhost:%s" % handler.port)
    599         for attr in ("read", "close", "info", "geturl"):
    600             self.assertTrue(hasattr(open_url, attr), "object returned from "
    601                          "urlopen lacks the %s attribute" % attr)
    602         try:
    603             self.assertTrue(open_url.read(), "calling 'read' failed")
    604         finally:
    605             open_url.close()
    606 
    607     def test_info(self):
    608         handler = self.start_server([(200, [], "we don't care")])
    609 
    610         open_url = urllib2.urlopen("http://localhost:%s" % handler.port)
    611         info_obj = open_url.info()
    612         self.assertIsInstance(info_obj, mimetools.Message,
    613                               "object returned by 'info' is not an "
    614                               "instance of mimetools.Message")
    615         self.assertEqual(info_obj.getsubtype(), "plain")
    616 
    617     def test_geturl(self):
    618         # Make sure same URL as opened is returned by geturl.
    619         handler = self.start_server([(200, [], "we don't care")])
    620 
    621         open_url = urllib2.urlopen("http://localhost:%s" % handler.port)
    622         url = open_url.geturl()
    623         self.assertEqual(url, "http://localhost:%s" % handler.port)
    624 
    625 
    626     def test_bad_address(self):
    627         # Make sure proper exception is raised when connecting to a bogus
    628         # address.
    629 
    630         # as indicated by the comment below, this might fail with some ISP,
    631         # so we run the test only when -unetwork/-uall is specified to
    632         # mitigate the problem a bit (see #17564)
    633         test_support.requires('network')
    634         self.assertRaises(IOError,
    635                           # Given that both VeriSign and various ISPs have in
    636                           # the past or are presently hijacking various invalid
    637                           # domain name requests in an attempt to boost traffic
    638                           # to their own sites, finding a domain name to use
    639                           # for this test is difficult.  RFC2606 leads one to
    640                           # believe that '.invalid' should work, but experience
    641                           # seemed to indicate otherwise.  Single character
    642                           # TLDs are likely to remain invalid, so this seems to
    643                           # be the best choice. The trailing '.' prevents a
    644                           # related problem: The normal DNS resolver appends
    645                           # the domain names from the search path if there is
    646                           # no '.' the end and, and if one of those domains
    647                           # implements a '*' rule a result is returned.
    648                           # However, none of this will prevent the test from
    649                           # failing if the ISP hijacks all invalid domain
    650                           # requests.  The real solution would be to be able to
    651                           # parameterize the framework with a mock resolver.
    652                           urllib2.urlopen, "http://sadflkjsasf.i.nvali.d./")
    653 
    654     def test_iteration(self):
    655         expected_response = "pycon 2008..."
    656         handler = self.start_server([(200, [], expected_response)])
    657 
    658         data = urllib2.urlopen("http://localhost:%s" % handler.port)
    659         for line in data:
    660             self.assertEqual(line, expected_response)
    661 
    662     def ztest_line_iteration(self):
    663         lines = ["We\n", "got\n", "here\n", "verylong " * 8192 + "\n"]
    664         expected_response = "".join(lines)
    665         handler = self.start_server([(200, [], expected_response)])
    666         data = urllib2.urlopen("http://localhost:%s" % handler.port)
    667         for index, line in enumerate(data):
    668             self.assertEqual(line, lines[index],
    669                              "Fetched line number %s doesn't match expected:\n"
    670                              "    Expected length was %s, got %s" %
    671                              (index, len(lines[index]), len(line)))
    672         self.assertEqual(index + 1, len(lines))
    673 
    674 def test_main():
    675     # We will NOT depend on the network resource flag
    676     # (Lib/test/regrtest.py -u network) since all tests here are only
    677     # localhost.  However, if this is a bad rationale, then uncomment
    678     # the next line.
    679     #test_support.requires("network")
    680 
    681     test_support.run_unittest(BasicAuthTests, ProxyAuthTests, TestUrlopen)
    682 
    683 if __name__ == "__main__":
    684     test_main()
    685