Home | History | Annotate | Download | only in test
      1 import base64
      2 import os
      3 import email
      4 import urllib.parse
      5 import urllib.request
      6 import http.server
      7 import threading
      8 import unittest
      9 import hashlib
     10 
     11 from test import support
     12 
     13 try:
     14     import ssl
     15 except ImportError:
     16     ssl = None
     17 
     18 here = os.path.dirname(__file__)
     19 # Self-signed cert file for 'localhost'
     20 CERT_localhost = os.path.join(here, 'keycert.pem')
     21 # Self-signed cert file for 'fakehostname'
     22 CERT_fakehostname = os.path.join(here, 'keycert2.pem')
     23 
     24 
     25 # Loopback http server infrastructure
     26 
     27 class LoopbackHttpServer(http.server.HTTPServer):
     28     """HTTP server w/ a few modifications that make it useful for
     29     loopback testing purposes.
     30     """
     31 
     32     def __init__(self, server_address, RequestHandlerClass):
     33         http.server.HTTPServer.__init__(self,
     34                                         server_address,
     35                                         RequestHandlerClass)
     36 
     37         # Set the timeout of our listening socket really low so
     38         # that we can stop the server easily.
     39         self.socket.settimeout(0.1)
     40 
     41     def get_request(self):
     42         """HTTPServer method, overridden."""
     43 
     44         request, client_address = self.socket.accept()
     45 
     46         # It's a loopback connection, so setting the timeout
     47         # really low shouldn't affect anything, but should make
     48         # deadlocks less likely to occur.
     49         request.settimeout(10.0)
     50 
     51         return (request, client_address)
     52 
     53 class LoopbackHttpServerThread(threading.Thread):
     54     """Stoppable thread that runs a loopback http server."""
     55 
     56     def __init__(self, request_handler):
     57         threading.Thread.__init__(self)
     58         self._stop_server = False
     59         self.ready = threading.Event()
     60         request_handler.protocol_version = "HTTP/1.0"
     61         self.httpd = LoopbackHttpServer(("127.0.0.1", 0),
     62                                         request_handler)
     63         self.port = self.httpd.server_port
     64 
     65     def stop(self):
     66         """Stops the webserver if it's currently running."""
     67 
     68         self._stop_server = True
     69 
     70         self.join()
     71         self.httpd.server_close()
     72 
     73     def run(self):
     74         self.ready.set()
     75         while not self._stop_server:
     76             self.httpd.handle_request()
     77 
     78 # Authentication infrastructure
     79 
     80 class DigestAuthHandler:
     81     """Handler for performing digest authentication."""
     82 
     83     def __init__(self):
     84         self._request_num = 0
     85         self._nonces = []
     86         self._users = {}
     87         self._realm_name = "Test Realm"
     88         self._qop = "auth"
     89 
     90     def set_qop(self, qop):
     91         self._qop = qop
     92 
     93     def set_users(self, users):
     94         assert isinstance(users, dict)
     95         self._users = users
     96 
     97     def set_realm(self, realm):
     98         self._realm_name = realm
     99 
    100     def _generate_nonce(self):
    101         self._request_num += 1
    102         nonce = hashlib.md5(str(self._request_num).encode("ascii")).hexdigest()
    103         self._nonces.append(nonce)
    104         return nonce
    105 
    106     def _create_auth_dict(self, auth_str):
    107         first_space_index = auth_str.find(" ")
    108         auth_str = auth_str[first_space_index+1:]
    109 
    110         parts = auth_str.split(",")
    111 
    112         auth_dict = {}
    113         for part in parts:
    114             name, value = part.split("=")
    115             name = name.strip()
    116             if value[0] == '"' and value[-1] == '"':
    117                 value = value[1:-1]
    118             else:
    119                 value = value.strip()
    120             auth_dict[name] = value
    121         return auth_dict
    122 
    123     def _validate_auth(self, auth_dict, password, method, uri):
    124         final_dict = {}
    125         final_dict.update(auth_dict)
    126         final_dict["password"] = password
    127         final_dict["method"] = method
    128         final_dict["uri"] = uri
    129         HA1_str = "%(username)s:%(realm)s:%(password)s" % final_dict
    130         HA1 = hashlib.md5(HA1_str.encode("ascii")).hexdigest()
    131         HA2_str = "%(method)s:%(uri)s" % final_dict
    132         HA2 = hashlib.md5(HA2_str.encode("ascii")).hexdigest()
    133         final_dict["HA1"] = HA1
    134         final_dict["HA2"] = HA2
    135         response_str = "%(HA1)s:%(nonce)s:%(nc)s:" \
    136                        "%(cnonce)s:%(qop)s:%(HA2)s" % final_dict
    137         response = hashlib.md5(response_str.encode("ascii")).hexdigest()
    138 
    139         return response == auth_dict["response"]
    140 
    141     def _return_auth_challenge(self, request_handler):
    142         request_handler.send_response(407, "Proxy Authentication Required")
    143         request_handler.send_header("Content-Type", "text/html")
    144         request_handler.send_header(
    145             'Proxy-Authenticate', 'Digest realm="%s", '
    146             'qop="%s",'
    147             'nonce="%s", ' % \
    148             (self._realm_name, self._qop, self._generate_nonce()))
    149         # XXX: Not sure if we're supposed to add this next header or
    150         # not.
    151         #request_handler.send_header('Connection', 'close')
    152         request_handler.end_headers()
    153         request_handler.wfile.write(b"Proxy Authentication Required.")
    154         return False
    155 
    156     def handle_request(self, request_handler):
    157         """Performs digest authentication on the given HTTP request
    158         handler.  Returns True if authentication was successful, False
    159         otherwise.
    160 
    161         If no users have been set, then digest auth is effectively
    162         disabled and this method will always return True.
    163         """
    164 
    165         if len(self._users) == 0:
    166             return True
    167 
    168         if "Proxy-Authorization" not in request_handler.headers:
    169             return self._return_auth_challenge(request_handler)
    170         else:
    171             auth_dict = self._create_auth_dict(
    172                 request_handler.headers["Proxy-Authorization"]
    173                 )
    174             if auth_dict["username"] in self._users:
    175                 password = self._users[ auth_dict["username"] ]
    176             else:
    177                 return self._return_auth_challenge(request_handler)
    178             if not auth_dict.get("nonce") in self._nonces:
    179                 return self._return_auth_challenge(request_handler)
    180             else:
    181                 self._nonces.remove(auth_dict["nonce"])
    182 
    183             auth_validated = False
    184 
    185             # MSIE uses short_path in its validation, but Python's
    186             # urllib.request uses the full path, so we're going to see if
    187             # either of them works here.
    188 
    189             for path in [request_handler.path, request_handler.short_path]:
    190                 if self._validate_auth(auth_dict,
    191                                        password,
    192                                        request_handler.command,
    193                                        path):
    194                     auth_validated = True
    195 
    196             if not auth_validated:
    197                 return self._return_auth_challenge(request_handler)
    198             return True
    199 
    200 
    201 class BasicAuthHandler(http.server.BaseHTTPRequestHandler):
    202     """Handler for performing basic authentication."""
    203     # Server side values
    204     USER = 'testUser'
    205     PASSWD = 'testPass'
    206     REALM = 'Test'
    207     USER_PASSWD = "%s:%s" % (USER, PASSWD)
    208     ENCODED_AUTH = base64.b64encode(USER_PASSWD.encode('ascii')).decode('ascii')
    209 
    210     def __init__(self, *args, **kwargs):
    211         http.server.BaseHTTPRequestHandler.__init__(self, *args, **kwargs)
    212 
    213     def log_message(self, format, *args):
    214         # Suppress console log message
    215         pass
    216 
    217     def do_HEAD(self):
    218         self.send_response(200)
    219         self.send_header("Content-type", "text/html")
    220         self.end_headers()
    221 
    222     def do_AUTHHEAD(self):
    223         self.send_response(401)
    224         self.send_header("WWW-Authenticate", "Basic realm=\"%s\"" % self.REALM)
    225         self.send_header("Content-type", "text/html")
    226         self.end_headers()
    227 
    228     def do_GET(self):
    229         if not self.headers.get("Authorization", ""):
    230             self.do_AUTHHEAD()
    231             self.wfile.write(b"No Auth header received")
    232         elif self.headers.get(
    233                 "Authorization", "") == "Basic " + self.ENCODED_AUTH:
    234             self.send_response(200)
    235             self.end_headers()
    236             self.wfile.write(b"It works")
    237         else:
    238             # Request Unauthorized
    239             self.do_AUTHHEAD()
    240 
    241 
    242 
    243 # Proxy test infrastructure
    244 
    245 class FakeProxyHandler(http.server.BaseHTTPRequestHandler):
    246     """This is a 'fake proxy' that makes it look like the entire
    247     internet has gone down due to a sudden zombie invasion.  It main
    248     utility is in providing us with authentication support for
    249     testing.
    250     """
    251 
    252     def __init__(self, digest_auth_handler, *args, **kwargs):
    253         # This has to be set before calling our parent's __init__(), which will
    254         # try to call do_GET().
    255         self.digest_auth_handler = digest_auth_handler
    256         http.server.BaseHTTPRequestHandler.__init__(self, *args, **kwargs)
    257 
    258     def log_message(self, format, *args):
    259         # Uncomment the next line for debugging.
    260         # sys.stderr.write(format % args)
    261         pass
    262 
    263     def do_GET(self):
    264         (scm, netloc, path, params, query, fragment) = urllib.parse.urlparse(
    265             self.path, "http")
    266         self.short_path = path
    267         if self.digest_auth_handler.handle_request(self):
    268             self.send_response(200, "OK")
    269             self.send_header("Content-Type", "text/html")
    270             self.end_headers()
    271             self.wfile.write(bytes("You've reached %s!<BR>" % self.path,
    272                                    "ascii"))
    273             self.wfile.write(b"Our apologies, but our server is down due to "
    274                              b"a sudden zombie invasion.")
    275 
    276 # Test cases
    277 
    278 class BasicAuthTests(unittest.TestCase):
    279     USER = "testUser"
    280     PASSWD = "testPass"
    281     INCORRECT_PASSWD = "Incorrect"
    282     REALM = "Test"
    283 
    284     def setUp(self):
    285         super(BasicAuthTests, self).setUp()
    286         # With Basic Authentication
    287         def http_server_with_basic_auth_handler(*args, **kwargs):
    288             return BasicAuthHandler(*args, **kwargs)
    289         self.server = LoopbackHttpServerThread(http_server_with_basic_auth_handler)
    290         self.addCleanup(self.stop_server)
    291         self.server_url = 'http://127.0.0.1:%s' % self.server.port
    292         self.server.start()
    293         self.server.ready.wait()
    294 
    295     def stop_server(self):
    296         self.server.stop()
    297         self.server = None
    298 
    299     def tearDown(self):
    300         super(BasicAuthTests, self).tearDown()
    301 
    302     def test_basic_auth_success(self):
    303         ah = urllib.request.HTTPBasicAuthHandler()
    304         ah.add_password(self.REALM, self.server_url, self.USER, self.PASSWD)
    305         urllib.request.install_opener(urllib.request.build_opener(ah))
    306         try:
    307             self.assertTrue(urllib.request.urlopen(self.server_url))
    308         except urllib.error.HTTPError:
    309             self.fail("Basic auth failed for the url: %s" % self.server_url)
    310 
    311     def test_basic_auth_httperror(self):
    312         ah = urllib.request.HTTPBasicAuthHandler()
    313         ah.add_password(self.REALM, self.server_url, self.USER, self.INCORRECT_PASSWD)
    314         urllib.request.install_opener(urllib.request.build_opener(ah))
    315         self.assertRaises(urllib.error.HTTPError, urllib.request.urlopen, self.server_url)
    316 
    317 
    318 class ProxyAuthTests(unittest.TestCase):
    319     URL = "http://localhost"
    320 
    321     USER = "tester"
    322     PASSWD = "test123"
    323     REALM = "TestRealm"
    324 
    325     def setUp(self):
    326         super(ProxyAuthTests, self).setUp()
    327         # Ignore proxy bypass settings in the environment.
    328         def restore_environ(old_environ):
    329             os.environ.clear()
    330             os.environ.update(old_environ)
    331         self.addCleanup(restore_environ, os.environ.copy())
    332         os.environ['NO_PROXY'] = ''
    333         os.environ['no_proxy'] = ''
    334 
    335         self.digest_auth_handler = DigestAuthHandler()
    336         self.digest_auth_handler.set_users({self.USER: self.PASSWD})
    337         self.digest_auth_handler.set_realm(self.REALM)
    338         # With Digest Authentication.
    339         def create_fake_proxy_handler(*args, **kwargs):
    340             return FakeProxyHandler(self.digest_auth_handler, *args, **kwargs)
    341 
    342         self.server = LoopbackHttpServerThread(create_fake_proxy_handler)
    343         self.addCleanup(self.stop_server)
    344         self.server.start()
    345         self.server.ready.wait()
    346         proxy_url = "http://127.0.0.1:%d" % self.server.port
    347         handler = urllib.request.ProxyHandler({"http" : proxy_url})
    348         self.proxy_digest_handler = urllib.request.ProxyDigestAuthHandler()
    349         self.opener = urllib.request.build_opener(
    350             handler, self.proxy_digest_handler)
    351 
    352     def stop_server(self):
    353         self.server.stop()
    354         self.server = None
    355 
    356     def test_proxy_with_bad_password_raises_httperror(self):
    357         self.proxy_digest_handler.add_password(self.REALM, self.URL,
    358                                                self.USER, self.PASSWD+"bad")
    359         self.digest_auth_handler.set_qop("auth")
    360         self.assertRaises(urllib.error.HTTPError,
    361                           self.opener.open,
    362                           self.URL)
    363 
    364     def test_proxy_with_no_password_raises_httperror(self):
    365         self.digest_auth_handler.set_qop("auth")
    366         self.assertRaises(urllib.error.HTTPError,
    367                           self.opener.open,
    368                           self.URL)
    369 
    370     def test_proxy_qop_auth_works(self):
    371         self.proxy_digest_handler.add_password(self.REALM, self.URL,
    372                                                self.USER, self.PASSWD)
    373         self.digest_auth_handler.set_qop("auth")
    374         result = self.opener.open(self.URL)
    375         while result.read():
    376             pass
    377         result.close()
    378 
    379     def test_proxy_qop_auth_int_works_or_throws_urlerror(self):
    380         self.proxy_digest_handler.add_password(self.REALM, self.URL,
    381                                                self.USER, self.PASSWD)
    382         self.digest_auth_handler.set_qop("auth-int")
    383         try:
    384             result = self.opener.open(self.URL)
    385         except urllib.error.URLError:
    386             # It's okay if we don't support auth-int, but we certainly
    387             # shouldn't receive any kind of exception here other than
    388             # a URLError.
    389             result = None
    390         if result:
    391             while result.read():
    392                 pass
    393             result.close()
    394 
    395 
    396 def GetRequestHandler(responses):
    397 
    398     class FakeHTTPRequestHandler(http.server.BaseHTTPRequestHandler):
    399 
    400         server_version = "TestHTTP/"
    401         requests = []
    402         headers_received = []
    403         port = 80
    404 
    405         def do_GET(self):
    406             body = self.send_head()
    407             while body:
    408                 done = self.wfile.write(body)
    409                 body = body[done:]
    410 
    411         def do_POST(self):
    412             content_length = self.headers["Content-Length"]
    413             post_data = self.rfile.read(int(content_length))
    414             self.do_GET()
    415             self.requests.append(post_data)
    416 
    417         def send_head(self):
    418             FakeHTTPRequestHandler.headers_received = self.headers
    419             self.requests.append(self.path)
    420             response_code, headers, body = responses.pop(0)
    421 
    422             self.send_response(response_code)
    423 
    424             for (header, value) in headers:
    425                 self.send_header(header, value % {'port':self.port})
    426             if body:
    427                 self.send_header("Content-type", "text/plain")
    428                 self.end_headers()
    429                 return body
    430             self.end_headers()
    431 
    432         def log_message(self, *args):
    433             pass
    434 
    435 
    436     return FakeHTTPRequestHandler
    437 
    438 
    439 class TestUrlopen(unittest.TestCase):
    440     """Tests urllib.request.urlopen using the network.
    441 
    442     These tests are not exhaustive.  Assuming that testing using files does a
    443     good job overall of some of the basic interface features.  There are no
    444     tests exercising the optional 'data' and 'proxies' arguments.  No tests
    445     for transparent redirection have been written.
    446     """
    447 
    448     def setUp(self):
    449         super(TestUrlopen, self).setUp()
    450 
    451         # Ignore proxies for localhost tests.
    452         def restore_environ(old_environ):
    453             os.environ.clear()
    454             os.environ.update(old_environ)
    455         self.addCleanup(restore_environ, os.environ.copy())
    456         os.environ['NO_PROXY'] = '*'
    457         os.environ['no_proxy'] = '*'
    458 
    459     def urlopen(self, url, data=None, **kwargs):
    460         l = []
    461         f = urllib.request.urlopen(url, data, **kwargs)
    462         try:
    463             # Exercise various methods
    464             l.extend(f.readlines(200))
    465             l.append(f.readline())
    466             l.append(f.read(1024))
    467             l.append(f.read())
    468         finally:
    469             f.close()
    470         return b"".join(l)
    471 
    472     def stop_server(self):
    473         self.server.stop()
    474         self.server = None
    475 
    476     def start_server(self, responses=None):
    477         if responses is None:
    478             responses = [(200, [], b"we don't care")]
    479         handler = GetRequestHandler(responses)
    480 
    481         self.server = LoopbackHttpServerThread(handler)
    482         self.addCleanup(self.stop_server)
    483         self.server.start()
    484         self.server.ready.wait()
    485         port = self.server.port
    486         handler.port = port
    487         return handler
    488 
    489     def start_https_server(self, responses=None, **kwargs):
    490         if not hasattr(urllib.request, 'HTTPSHandler'):
    491             self.skipTest('ssl support required')
    492         from test.ssl_servers import make_https_server
    493         if responses is None:
    494             responses = [(200, [], b"we care a bit")]
    495         handler = GetRequestHandler(responses)
    496         server = make_https_server(self, handler_class=handler, **kwargs)
    497         handler.port = server.port
    498         return handler
    499 
    500     def test_redirection(self):
    501         expected_response = b"We got here..."
    502         responses = [
    503             (302, [("Location", "http://localhost:%(port)s/somewhere_else")],
    504              ""),
    505             (200, [], expected_response)
    506         ]
    507 
    508         handler = self.start_server(responses)
    509         data = self.urlopen("http://localhost:%s/" % handler.port)
    510         self.assertEqual(data, expected_response)
    511         self.assertEqual(handler.requests, ["/", "/somewhere_else"])
    512 
    513     def test_chunked(self):
    514         expected_response = b"hello world"
    515         chunked_start = (
    516                         b'a\r\n'
    517                         b'hello worl\r\n'
    518                         b'1\r\n'
    519                         b'd\r\n'
    520                         b'0\r\n'
    521                         )
    522         response = [(200, [("Transfer-Encoding", "chunked")], chunked_start)]
    523         handler = self.start_server(response)
    524         data = self.urlopen("http://localhost:%s/" % handler.port)
    525         self.assertEqual(data, expected_response)
    526 
    527     def test_404(self):
    528         expected_response = b"Bad bad bad..."
    529         handler = self.start_server([(404, [], expected_response)])
    530 
    531         try:
    532             self.urlopen("http://localhost:%s/weeble" % handler.port)
    533         except urllib.error.URLError as f:
    534             data = f.read()
    535             f.close()
    536         else:
    537             self.fail("404 should raise URLError")
    538 
    539         self.assertEqual(data, expected_response)
    540         self.assertEqual(handler.requests, ["/weeble"])
    541 
    542     def test_200(self):
    543         expected_response = b"pycon 2008..."
    544         handler = self.start_server([(200, [], expected_response)])
    545         data = self.urlopen("http://localhost:%s/bizarre" % handler.port)
    546         self.assertEqual(data, expected_response)
    547         self.assertEqual(handler.requests, ["/bizarre"])
    548 
    549     def test_200_with_parameters(self):
    550         expected_response = b"pycon 2008..."
    551         handler = self.start_server([(200, [], expected_response)])
    552         data = self.urlopen("http://localhost:%s/bizarre" % handler.port,
    553                              b"get=with_feeling")
    554         self.assertEqual(data, expected_response)
    555         self.assertEqual(handler.requests, ["/bizarre", b"get=with_feeling"])
    556 
    557     def test_https(self):
    558         handler = self.start_https_server()
    559         context = ssl.create_default_context(cafile=CERT_localhost)
    560         data = self.urlopen("https://localhost:%s/bizarre" % handler.port, context=context)
    561         self.assertEqual(data, b"we care a bit")
    562 
    563     def test_https_with_cafile(self):
    564         handler = self.start_https_server(certfile=CERT_localhost)
    565         with support.check_warnings(('', DeprecationWarning)):
    566             # Good cert
    567             data = self.urlopen("https://localhost:%s/bizarre" % handler.port,
    568                                 cafile=CERT_localhost)
    569             self.assertEqual(data, b"we care a bit")
    570             # Bad cert
    571             with self.assertRaises(urllib.error.URLError) as cm:
    572                 self.urlopen("https://localhost:%s/bizarre" % handler.port,
    573                              cafile=CERT_fakehostname)
    574             # Good cert, but mismatching hostname
    575             handler = self.start_https_server(certfile=CERT_fakehostname)
    576             with self.assertRaises(urllib.error.URLError) as cm:
    577                 self.urlopen("https://localhost:%s/bizarre" % handler.port,
    578                              cafile=CERT_fakehostname)
    579 
    580     def test_https_with_cadefault(self):
    581         handler = self.start_https_server(certfile=CERT_localhost)
    582         # Self-signed cert should fail verification with system certificate store
    583         with support.check_warnings(('', DeprecationWarning)):
    584             with self.assertRaises(urllib.error.URLError) as cm:
    585                 self.urlopen("https://localhost:%s/bizarre" % handler.port,
    586                              cadefault=True)
    587 
    588     def test_https_sni(self):
    589         if ssl is None:
    590             self.skipTest("ssl module required")
    591         if not ssl.HAS_SNI:
    592             self.skipTest("SNI support required in OpenSSL")
    593         sni_name = None
    594         def cb_sni(ssl_sock, server_name, initial_context):
    595             nonlocal sni_name
    596             sni_name = server_name
    597         context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    598         context.set_servername_callback(cb_sni)
    599         handler = self.start_https_server(context=context, certfile=CERT_localhost)
    600         context = ssl.create_default_context(cafile=CERT_localhost)
    601         self.urlopen("https://localhost:%s" % handler.port, context=context)
    602         self.assertEqual(sni_name, "localhost")
    603 
    604     def test_sending_headers(self):
    605         handler = self.start_server()
    606         req = urllib.request.Request("http://localhost:%s/" % handler.port,
    607                                      headers={"Range": "bytes=20-39"})
    608         with urllib.request.urlopen(req):
    609             pass
    610         self.assertEqual(handler.headers_received["Range"], "bytes=20-39")
    611 
    612     def test_basic(self):
    613         handler = self.start_server()
    614         open_url = urllib.request.urlopen("http://localhost:%s" % handler.port)
    615         for attr in ("read", "close", "info", "geturl"):
    616             self.assertTrue(hasattr(open_url, attr), "object returned from "
    617                          "urlopen lacks the %s attribute" % attr)
    618         try:
    619             self.assertTrue(open_url.read(), "calling 'read' failed")
    620         finally:
    621             open_url.close()
    622 
    623     def test_info(self):
    624         handler = self.start_server()
    625         open_url = urllib.request.urlopen(
    626             "http://localhost:%s" % handler.port)
    627         with open_url:
    628             info_obj = open_url.info()
    629         self.assertIsInstance(info_obj, email.message.Message,
    630                               "object returned by 'info' is not an "
    631                               "instance of email.message.Message")
    632         self.assertEqual(info_obj.get_content_subtype(), "plain")
    633 
    634     def test_geturl(self):
    635         # Make sure same URL as opened is returned by geturl.
    636         handler = self.start_server()
    637         open_url = urllib.request.urlopen("http://localhost:%s" % handler.port)
    638         with open_url:
    639             url = open_url.geturl()
    640         self.assertEqual(url, "http://localhost:%s" % handler.port)
    641 
    642     def test_iteration(self):
    643         expected_response = b"pycon 2008..."
    644         handler = self.start_server([(200, [], expected_response)])
    645         data = urllib.request.urlopen("http://localhost:%s" % handler.port)
    646         for line in data:
    647             self.assertEqual(line, expected_response)
    648 
    649     def test_line_iteration(self):
    650         lines = [b"We\n", b"got\n", b"here\n", b"verylong " * 8192 + b"\n"]
    651         expected_response = b"".join(lines)
    652         handler = self.start_server([(200, [], expected_response)])
    653         data = urllib.request.urlopen("http://localhost:%s" % handler.port)
    654         for index, line in enumerate(data):
    655             self.assertEqual(line, lines[index],
    656                              "Fetched line number %s doesn't match expected:\n"
    657                              "    Expected length was %s, got %s" %
    658                              (index, len(lines[index]), len(line)))
    659         self.assertEqual(index + 1, len(lines))
    660 
    661 
    662 threads_key = None
    663 
    664 def setUpModule():
    665     # Store the threading_setup in a key and ensure that it is cleaned up
    666     # in the tearDown
    667     global threads_key
    668     threads_key = support.threading_setup()
    669 
    670 def tearDownModule():
    671     if threads_key:
    672         support.threading_cleanup(*threads_key)
    673 
    674 if __name__ == "__main__":
    675     unittest.main()
    676