Home | History | Annotate | Download | only in test
      1 import base64
      2 import os
      3 import email
      4 import urllib.parse
      5 import urllib.request
      6 import http.server
      7 import unittest
      8 import hashlib
      9 
     10 from test import support
     11 
     12 threading = support.import_module('threading')
     13 
     14 try:
     15     import ssl
     16 except ImportError:
     17     ssl = None
     18 
     19 here = os.path.dirname(__file__)
     20 # Self-signed cert file for 'localhost'
     21 CERT_localhost = os.path.join(here, 'keycert.pem')
     22 # Self-signed cert file for 'fakehostname'
     23 CERT_fakehostname = os.path.join(here, 'keycert2.pem')
     24 
     25 
     26 # Loopback http server infrastructure
     27 
     28 class LoopbackHttpServer(http.server.HTTPServer):
     29     """HTTP server w/ a few modifications that make it useful for
     30     loopback testing purposes.
     31     """
     32 
     33     def __init__(self, server_address, RequestHandlerClass):
     34         http.server.HTTPServer.__init__(self,
     35                                         server_address,
     36                                         RequestHandlerClass)
     37 
     38         # Set the timeout of our listening socket really low so
     39         # that we can stop the server easily.
     40         self.socket.settimeout(0.1)
     41 
     42     def get_request(self):
     43         """HTTPServer method, overridden."""
     44 
     45         request, client_address = self.socket.accept()
     46 
     47         # It's a loopback connection, so setting the timeout
     48         # really low shouldn't affect anything, but should make
     49         # deadlocks less likely to occur.
     50         request.settimeout(10.0)
     51 
     52         return (request, client_address)
     53 
     54 class LoopbackHttpServerThread(threading.Thread):
     55     """Stoppable thread that runs a loopback http server."""
     56 
     57     def __init__(self, request_handler):
     58         threading.Thread.__init__(self)
     59         self._stop_server = False
     60         self.ready = threading.Event()
     61         request_handler.protocol_version = "HTTP/1.0"
     62         self.httpd = LoopbackHttpServer(("127.0.0.1", 0),
     63                                         request_handler)
     64         self.port = self.httpd.server_port
     65 
     66     def stop(self):
     67         """Stops the webserver if it's currently running."""
     68 
     69         self._stop_server = True
     70 
     71         self.join()
     72         self.httpd.server_close()
     73 
     74     def run(self):
     75         self.ready.set()
     76         while not self._stop_server:
     77             self.httpd.handle_request()
     78 
     79 # Authentication infrastructure
     80 
     81 class DigestAuthHandler:
     82     """Handler for performing digest authentication."""
     83 
     84     def __init__(self):
     85         self._request_num = 0
     86         self._nonces = []
     87         self._users = {}
     88         self._realm_name = "Test Realm"
     89         self._qop = "auth"
     90 
     91     def set_qop(self, qop):
     92         self._qop = qop
     93 
     94     def set_users(self, users):
     95         assert isinstance(users, dict)
     96         self._users = users
     97 
     98     def set_realm(self, realm):
     99         self._realm_name = realm
    100 
    101     def _generate_nonce(self):
    102         self._request_num += 1
    103         nonce = hashlib.md5(str(self._request_num).encode("ascii")).hexdigest()
    104         self._nonces.append(nonce)
    105         return nonce
    106 
    107     def _create_auth_dict(self, auth_str):
    108         first_space_index = auth_str.find(" ")
    109         auth_str = auth_str[first_space_index+1:]
    110 
    111         parts = auth_str.split(",")
    112 
    113         auth_dict = {}
    114         for part in parts:
    115             name, value = part.split("=")
    116             name = name.strip()
    117             if value[0] == '"' and value[-1] == '"':
    118                 value = value[1:-1]
    119             else:
    120                 value = value.strip()
    121             auth_dict[name] = value
    122         return auth_dict
    123 
    124     def _validate_auth(self, auth_dict, password, method, uri):
    125         final_dict = {}
    126         final_dict.update(auth_dict)
    127         final_dict["password"] = password
    128         final_dict["method"] = method
    129         final_dict["uri"] = uri
    130         HA1_str = "%(username)s:%(realm)s:%(password)s" % final_dict
    131         HA1 = hashlib.md5(HA1_str.encode("ascii")).hexdigest()
    132         HA2_str = "%(method)s:%(uri)s" % final_dict
    133         HA2 = hashlib.md5(HA2_str.encode("ascii")).hexdigest()
    134         final_dict["HA1"] = HA1
    135         final_dict["HA2"] = HA2
    136         response_str = "%(HA1)s:%(nonce)s:%(nc)s:" \
    137                        "%(cnonce)s:%(qop)s:%(HA2)s" % final_dict
    138         response = hashlib.md5(response_str.encode("ascii")).hexdigest()
    139 
    140         return response == auth_dict["response"]
    141 
    142     def _return_auth_challenge(self, request_handler):
    143         request_handler.send_response(407, "Proxy Authentication Required")
    144         request_handler.send_header("Content-Type", "text/html")
    145         request_handler.send_header(
    146             'Proxy-Authenticate', 'Digest realm="%s", '
    147             'qop="%s",'
    148             'nonce="%s", ' % \
    149             (self._realm_name, self._qop, self._generate_nonce()))
    150         # XXX: Not sure if we're supposed to add this next header or
    151         # not.
    152         #request_handler.send_header('Connection', 'close')
    153         request_handler.end_headers()
    154         request_handler.wfile.write(b"Proxy Authentication Required.")
    155         return False
    156 
    157     def handle_request(self, request_handler):
    158         """Performs digest authentication on the given HTTP request
    159         handler.  Returns True if authentication was successful, False
    160         otherwise.
    161 
    162         If no users have been set, then digest auth is effectively
    163         disabled and this method will always return True.
    164         """
    165 
    166         if len(self._users) == 0:
    167             return True
    168 
    169         if "Proxy-Authorization" not in request_handler.headers:
    170             return self._return_auth_challenge(request_handler)
    171         else:
    172             auth_dict = self._create_auth_dict(
    173                 request_handler.headers["Proxy-Authorization"]
    174                 )
    175             if auth_dict["username"] in self._users:
    176                 password = self._users[ auth_dict["username"] ]
    177             else:
    178                 return self._return_auth_challenge(request_handler)
    179             if not auth_dict.get("nonce") in self._nonces:
    180                 return self._return_auth_challenge(request_handler)
    181             else:
    182                 self._nonces.remove(auth_dict["nonce"])
    183 
    184             auth_validated = False
    185 
    186             # MSIE uses short_path in its validation, but Python's
    187             # urllib.request uses the full path, so we're going to see if
    188             # either of them works here.
    189 
    190             for path in [request_handler.path, request_handler.short_path]:
    191                 if self._validate_auth(auth_dict,
    192                                        password,
    193                                        request_handler.command,
    194                                        path):
    195                     auth_validated = True
    196 
    197             if not auth_validated:
    198                 return self._return_auth_challenge(request_handler)
    199             return True
    200 
    201 
    202 class BasicAuthHandler(http.server.BaseHTTPRequestHandler):
    203     """Handler for performing basic authentication."""
    204     # Server side values
    205     USER = 'testUser'
    206     PASSWD = 'testPass'
    207     REALM = 'Test'
    208     USER_PASSWD = "%s:%s" % (USER, PASSWD)
    209     ENCODED_AUTH = base64.b64encode(USER_PASSWD.encode('ascii')).decode('ascii')
    210 
    211     def __init__(self, *args, **kwargs):
    212         http.server.BaseHTTPRequestHandler.__init__(self, *args, **kwargs)
    213 
    214     def log_message(self, format, *args):
    215         # Suppress console log message
    216         pass
    217 
    218     def do_HEAD(self):
    219         self.send_response(200)
    220         self.send_header("Content-type", "text/html")
    221         self.end_headers()
    222 
    223     def do_AUTHHEAD(self):
    224         self.send_response(401)
    225         self.send_header("WWW-Authenticate", "Basic realm=\"%s\"" % self.REALM)
    226         self.send_header("Content-type", "text/html")
    227         self.end_headers()
    228 
    229     def do_GET(self):
    230         if not self.headers.get("Authorization", ""):
    231             self.do_AUTHHEAD()
    232             self.wfile.write(b"No Auth header received")
    233         elif self.headers.get(
    234                 "Authorization", "") == "Basic " + self.ENCODED_AUTH:
    235             self.send_response(200)
    236             self.end_headers()
    237             self.wfile.write(b"It works")
    238         else:
    239             # Request Unauthorized
    240             self.do_AUTHHEAD()
    241 
    242 
    243 
    244 # Proxy test infrastructure
    245 
    246 class FakeProxyHandler(http.server.BaseHTTPRequestHandler):
    247     """This is a 'fake proxy' that makes it look like the entire
    248     internet has gone down due to a sudden zombie invasion.  It main
    249     utility is in providing us with authentication support for
    250     testing.
    251     """
    252 
    253     def __init__(self, digest_auth_handler, *args, **kwargs):
    254         # This has to be set before calling our parent's __init__(), which will
    255         # try to call do_GET().
    256         self.digest_auth_handler = digest_auth_handler
    257         http.server.BaseHTTPRequestHandler.__init__(self, *args, **kwargs)
    258 
    259     def log_message(self, format, *args):
    260         # Uncomment the next line for debugging.
    261         # sys.stderr.write(format % args)
    262         pass
    263 
    264     def do_GET(self):
    265         (scm, netloc, path, params, query, fragment) = urllib.parse.urlparse(
    266             self.path, "http")
    267         self.short_path = path
    268         if self.digest_auth_handler.handle_request(self):
    269             self.send_response(200, "OK")
    270             self.send_header("Content-Type", "text/html")
    271             self.end_headers()
    272             self.wfile.write(bytes("You've reached %s!<BR>" % self.path,
    273                                    "ascii"))
    274             self.wfile.write(b"Our apologies, but our server is down due to "
    275                              b"a sudden zombie invasion.")
    276 
    277 # Test cases
    278 
    279 @unittest.skipUnless(threading, "Threading required for this test.")
    280 class BasicAuthTests(unittest.TestCase):
    281     USER = "testUser"
    282     PASSWD = "testPass"
    283     INCORRECT_PASSWD = "Incorrect"
    284     REALM = "Test"
    285 
    286     def setUp(self):
    287         super(BasicAuthTests, self).setUp()
    288         # With Basic Authentication
    289         def http_server_with_basic_auth_handler(*args, **kwargs):
    290             return BasicAuthHandler(*args, **kwargs)
    291         self.server = LoopbackHttpServerThread(http_server_with_basic_auth_handler)
    292         self.addCleanup(self.server.stop)
    293         self.server_url = 'http://127.0.0.1:%s' % self.server.port
    294         self.server.start()
    295         self.server.ready.wait()
    296 
    297     def tearDown(self):
    298         super(BasicAuthTests, self).tearDown()
    299 
    300     def test_basic_auth_success(self):
    301         ah = urllib.request.HTTPBasicAuthHandler()
    302         ah.add_password(self.REALM, self.server_url, self.USER, self.PASSWD)
    303         urllib.request.install_opener(urllib.request.build_opener(ah))
    304         try:
    305             self.assertTrue(urllib.request.urlopen(self.server_url))
    306         except urllib.error.HTTPError:
    307             self.fail("Basic auth failed for the url: %s", self.server_url)
    308 
    309     def test_basic_auth_httperror(self):
    310         ah = urllib.request.HTTPBasicAuthHandler()
    311         ah.add_password(self.REALM, self.server_url, self.USER, self.INCORRECT_PASSWD)
    312         urllib.request.install_opener(urllib.request.build_opener(ah))
    313         self.assertRaises(urllib.error.HTTPError, urllib.request.urlopen, self.server_url)
    314 
    315 
    316 @unittest.skipUnless(threading, "Threading required for this test.")
    317 class ProxyAuthTests(unittest.TestCase):
    318     URL = "http://localhost"
    319 
    320     USER = "tester"
    321     PASSWD = "test123"
    322     REALM = "TestRealm"
    323 
    324     def setUp(self):
    325         super(ProxyAuthTests, self).setUp()
    326         # Ignore proxy bypass settings in the environment.
    327         def restore_environ(old_environ):
    328             os.environ.clear()
    329             os.environ.update(old_environ)
    330         self.addCleanup(restore_environ, os.environ.copy())
    331         os.environ['NO_PROXY'] = ''
    332         os.environ['no_proxy'] = ''
    333 
    334         self.digest_auth_handler = DigestAuthHandler()
    335         self.digest_auth_handler.set_users({self.USER: self.PASSWD})
    336         self.digest_auth_handler.set_realm(self.REALM)
    337         # With Digest Authentication.
    338         def create_fake_proxy_handler(*args, **kwargs):
    339             return FakeProxyHandler(self.digest_auth_handler, *args, **kwargs)
    340 
    341         self.server = LoopbackHttpServerThread(create_fake_proxy_handler)
    342         self.server.start()
    343         self.server.ready.wait()
    344         proxy_url = "http://127.0.0.1:%d" % self.server.port
    345         handler = urllib.request.ProxyHandler({"http" : proxy_url})
    346         self.proxy_digest_handler = urllib.request.ProxyDigestAuthHandler()
    347         self.opener = urllib.request.build_opener(
    348             handler, self.proxy_digest_handler)
    349 
    350     def tearDown(self):
    351         self.server.stop()
    352         super(ProxyAuthTests, self).tearDown()
    353 
    354     def test_proxy_with_bad_password_raises_httperror(self):
    355         self.proxy_digest_handler.add_password(self.REALM, self.URL,
    356                                                self.USER, self.PASSWD+"bad")
    357         self.digest_auth_handler.set_qop("auth")
    358         self.assertRaises(urllib.error.HTTPError,
    359                           self.opener.open,
    360                           self.URL)
    361 
    362     def test_proxy_with_no_password_raises_httperror(self):
    363         self.digest_auth_handler.set_qop("auth")
    364         self.assertRaises(urllib.error.HTTPError,
    365                           self.opener.open,
    366                           self.URL)
    367 
    368     def test_proxy_qop_auth_works(self):
    369         self.proxy_digest_handler.add_password(self.REALM, self.URL,
    370                                                self.USER, self.PASSWD)
    371         self.digest_auth_handler.set_qop("auth")
    372         result = self.opener.open(self.URL)
    373         while result.read():
    374             pass
    375         result.close()
    376 
    377     def test_proxy_qop_auth_int_works_or_throws_urlerror(self):
    378         self.proxy_digest_handler.add_password(self.REALM, self.URL,
    379                                                self.USER, self.PASSWD)
    380         self.digest_auth_handler.set_qop("auth-int")
    381         try:
    382             result = self.opener.open(self.URL)
    383         except urllib.error.URLError:
    384             # It's okay if we don't support auth-int, but we certainly
    385             # shouldn't receive any kind of exception here other than
    386             # a URLError.
    387             result = None
    388         if result:
    389             while result.read():
    390                 pass
    391             result.close()
    392 
    393 
    394 def GetRequestHandler(responses):
    395 
    396     class FakeHTTPRequestHandler(http.server.BaseHTTPRequestHandler):
    397 
    398         server_version = "TestHTTP/"
    399         requests = []
    400         headers_received = []
    401         port = 80
    402 
    403         def do_GET(self):
    404             body = self.send_head()
    405             while body:
    406                 done = self.wfile.write(body)
    407                 body = body[done:]
    408 
    409         def do_POST(self):
    410             content_length = self.headers["Content-Length"]
    411             post_data = self.rfile.read(int(content_length))
    412             self.do_GET()
    413             self.requests.append(post_data)
    414 
    415         def send_head(self):
    416             FakeHTTPRequestHandler.headers_received = self.headers
    417             self.requests.append(self.path)
    418             response_code, headers, body = responses.pop(0)
    419 
    420             self.send_response(response_code)
    421 
    422             for (header, value) in headers:
    423                 self.send_header(header, value % {'port':self.port})
    424             if body:
    425                 self.send_header("Content-type", "text/plain")
    426                 self.end_headers()
    427                 return body
    428             self.end_headers()
    429 
    430         def log_message(self, *args):
    431             pass
    432 
    433 
    434     return FakeHTTPRequestHandler
    435 
    436 
    437 @unittest.skipUnless(threading, "Threading required for this test.")
    438 class TestUrlopen(unittest.TestCase):
    439     """Tests urllib.request.urlopen using the network.
    440 
    441     These tests are not exhaustive.  Assuming that testing using files does a
    442     good job overall of some of the basic interface features.  There are no
    443     tests exercising the optional 'data' and 'proxies' arguments.  No tests
    444     for transparent redirection have been written.
    445     """
    446 
    447     def setUp(self):
    448         super(TestUrlopen, self).setUp()
    449 
    450         # Ignore proxies for localhost tests.
    451         def restore_environ(old_environ):
    452             os.environ.clear()
    453             os.environ.update(old_environ)
    454         self.addCleanup(restore_environ, os.environ.copy())
    455         os.environ['NO_PROXY'] = '*'
    456         os.environ['no_proxy'] = '*'
    457 
    458     def urlopen(self, url, data=None, **kwargs):
    459         l = []
    460         f = urllib.request.urlopen(url, data, **kwargs)
    461         try:
    462             # Exercise various methods
    463             l.extend(f.readlines(200))
    464             l.append(f.readline())
    465             l.append(f.read(1024))
    466             l.append(f.read())
    467         finally:
    468             f.close()
    469         return b"".join(l)
    470 
    471     def start_server(self, responses=None):
    472         if responses is None:
    473             responses = [(200, [], b"we don't care")]
    474         handler = GetRequestHandler(responses)
    475 
    476         self.server = LoopbackHttpServerThread(handler)
    477         self.addCleanup(self.server.stop)
    478         self.server.start()
    479         self.server.ready.wait()
    480         port = self.server.port
    481         handler.port = port
    482         return handler
    483 
    484     def start_https_server(self, responses=None, **kwargs):
    485         if not hasattr(urllib.request, 'HTTPSHandler'):
    486             self.skipTest('ssl support required')
    487         from test.ssl_servers import make_https_server
    488         if responses is None:
    489             responses = [(200, [], b"we care a bit")]
    490         handler = GetRequestHandler(responses)
    491         server = make_https_server(self, handler_class=handler, **kwargs)
    492         handler.port = server.port
    493         return handler
    494 
    495     def test_redirection(self):
    496         expected_response = b"We got here..."
    497         responses = [
    498             (302, [("Location", "http://localhost:%(port)s/somewhere_else")],
    499              ""),
    500             (200, [], expected_response)
    501         ]
    502 
    503         handler = self.start_server(responses)
    504         data = self.urlopen("http://localhost:%s/" % handler.port)
    505         self.assertEqual(data, expected_response)
    506         self.assertEqual(handler.requests, ["/", "/somewhere_else"])
    507 
    508     def test_chunked(self):
    509         expected_response = b"hello world"
    510         chunked_start = (
    511                         b'a\r\n'
    512                         b'hello worl\r\n'
    513                         b'1\r\n'
    514                         b'd\r\n'
    515                         b'0\r\n'
    516                         )
    517         response = [(200, [("Transfer-Encoding", "chunked")], chunked_start)]
    518         handler = self.start_server(response)
    519         data = self.urlopen("http://localhost:%s/" % handler.port)
    520         self.assertEqual(data, expected_response)
    521 
    522     def test_404(self):
    523         expected_response = b"Bad bad bad..."
    524         handler = self.start_server([(404, [], expected_response)])
    525 
    526         try:
    527             self.urlopen("http://localhost:%s/weeble" % handler.port)
    528         except urllib.error.URLError as f:
    529             data = f.read()
    530             f.close()
    531         else:
    532             self.fail("404 should raise URLError")
    533 
    534         self.assertEqual(data, expected_response)
    535         self.assertEqual(handler.requests, ["/weeble"])
    536 
    537     def test_200(self):
    538         expected_response = b"pycon 2008..."
    539         handler = self.start_server([(200, [], expected_response)])
    540         data = self.urlopen("http://localhost:%s/bizarre" % handler.port)
    541         self.assertEqual(data, expected_response)
    542         self.assertEqual(handler.requests, ["/bizarre"])
    543 
    544     def test_200_with_parameters(self):
    545         expected_response = b"pycon 2008..."
    546         handler = self.start_server([(200, [], expected_response)])
    547         data = self.urlopen("http://localhost:%s/bizarre" % handler.port,
    548                              b"get=with_feeling")
    549         self.assertEqual(data, expected_response)
    550         self.assertEqual(handler.requests, ["/bizarre", b"get=with_feeling"])
    551 
    552     def test_https(self):
    553         handler = self.start_https_server()
    554         context = ssl.create_default_context(cafile=CERT_localhost)
    555         data = self.urlopen("https://localhost:%s/bizarre" % handler.port, context=context)
    556         self.assertEqual(data, b"we care a bit")
    557 
    558     def test_https_with_cafile(self):
    559         handler = self.start_https_server(certfile=CERT_localhost)
    560         with support.check_warnings(('', DeprecationWarning)):
    561             # Good cert
    562             data = self.urlopen("https://localhost:%s/bizarre" % handler.port,
    563                                 cafile=CERT_localhost)
    564             self.assertEqual(data, b"we care a bit")
    565             # Bad cert
    566             with self.assertRaises(urllib.error.URLError) as cm:
    567                 self.urlopen("https://localhost:%s/bizarre" % handler.port,
    568                              cafile=CERT_fakehostname)
    569             # Good cert, but mismatching hostname
    570             handler = self.start_https_server(certfile=CERT_fakehostname)
    571             with self.assertRaises(ssl.CertificateError) as cm:
    572                 self.urlopen("https://localhost:%s/bizarre" % handler.port,
    573                              cafile=CERT_fakehostname)
    574 
    575     def test_https_with_cadefault(self):
    576         handler = self.start_https_server(certfile=CERT_localhost)
    577         # Self-signed cert should fail verification with system certificate store
    578         with support.check_warnings(('', DeprecationWarning)):
    579             with self.assertRaises(urllib.error.URLError) as cm:
    580                 self.urlopen("https://localhost:%s/bizarre" % handler.port,
    581                              cadefault=True)
    582 
    583     def test_https_sni(self):
    584         if ssl is None:
    585             self.skipTest("ssl module required")
    586         if not ssl.HAS_SNI:
    587             self.skipTest("SNI support required in OpenSSL")
    588         sni_name = None
    589         def cb_sni(ssl_sock, server_name, initial_context):
    590             nonlocal sni_name
    591             sni_name = server_name
    592         context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    593         context.set_servername_callback(cb_sni)
    594         handler = self.start_https_server(context=context, certfile=CERT_localhost)
    595         context = ssl.create_default_context(cafile=CERT_localhost)
    596         self.urlopen("https://localhost:%s" % handler.port, context=context)
    597         self.assertEqual(sni_name, "localhost")
    598 
    599     def test_sending_headers(self):
    600         handler = self.start_server()
    601         req = urllib.request.Request("http://localhost:%s/" % handler.port,
    602                                      headers={"Range": "bytes=20-39"})
    603         with urllib.request.urlopen(req):
    604             pass
    605         self.assertEqual(handler.headers_received["Range"], "bytes=20-39")
    606 
    607     def test_basic(self):
    608         handler = self.start_server()
    609         open_url = urllib.request.urlopen("http://localhost:%s" % handler.port)
    610         for attr in ("read", "close", "info", "geturl"):
    611             self.assertTrue(hasattr(open_url, attr), "object returned from "
    612                          "urlopen lacks the %s attribute" % attr)
    613         try:
    614             self.assertTrue(open_url.read(), "calling 'read' failed")
    615         finally:
    616             open_url.close()
    617 
    618     def test_info(self):
    619         handler = self.start_server()
    620         open_url = urllib.request.urlopen(
    621             "http://localhost:%s" % handler.port)
    622         with open_url:
    623             info_obj = open_url.info()
    624         self.assertIsInstance(info_obj, email.message.Message,
    625                               "object returned by 'info' is not an "
    626                               "instance of email.message.Message")
    627         self.assertEqual(info_obj.get_content_subtype(), "plain")
    628 
    629     def test_geturl(self):
    630         # Make sure same URL as opened is returned by geturl.
    631         handler = self.start_server()
    632         open_url = urllib.request.urlopen("http://localhost:%s" % handler.port)
    633         with open_url:
    634             url = open_url.geturl()
    635         self.assertEqual(url, "http://localhost:%s" % handler.port)
    636 
    637     def test_iteration(self):
    638         expected_response = b"pycon 2008..."
    639         handler = self.start_server([(200, [], expected_response)])
    640         data = urllib.request.urlopen("http://localhost:%s" % handler.port)
    641         for line in data:
    642             self.assertEqual(line, expected_response)
    643 
    644     def test_line_iteration(self):
    645         lines = [b"We\n", b"got\n", b"here\n", b"verylong " * 8192 + b"\n"]
    646         expected_response = b"".join(lines)
    647         handler = self.start_server([(200, [], expected_response)])
    648         data = urllib.request.urlopen("http://localhost:%s" % handler.port)
    649         for index, line in enumerate(data):
    650             self.assertEqual(line, lines[index],
    651                              "Fetched line number %s doesn't match expected:\n"
    652                              "    Expected length was %s, got %s" %
    653                              (index, len(lines[index]), len(line)))
    654         self.assertEqual(index + 1, len(lines))
    655 
    656 
    657 threads_key = None
    658 
    659 def setUpModule():
    660     # Store the threading_setup in a key and ensure that it is cleaned up
    661     # in the tearDown
    662     global threads_key
    663     threads_key = support.threading_setup()
    664 
    665 def tearDownModule():
    666     if threads_key:
    667         support.threading_cleanup(threads_key)
    668 
    669 if __name__ == "__main__":
    670     unittest.main()
    671