Home | History | Annotate | Download | only in test
      1 #!/usr/bin/env python
      2 
      3 import urlparse
      4 import urllib2
      5 import BaseHTTPServer
      6 import unittest
      7 import hashlib
      8 
      9 from test import test_support
     10 
     11 mimetools = test_support.import_module('mimetools', deprecated=True)
     12 threading = test_support.import_module('threading')
     13 
     14 # Loopback http server infrastructure
     15 
     16 class LoopbackHttpServer(BaseHTTPServer.HTTPServer):
     17     """HTTP server w/ a few modifications that make it useful for
     18     loopback testing purposes.
     19     """
     20 
     21     def __init__(self, server_address, RequestHandlerClass):
     22         BaseHTTPServer.HTTPServer.__init__(self,
     23                                            server_address,
     24                                            RequestHandlerClass)
     25 
     26         # Set the timeout of our listening socket really low so
     27         # that we can stop the server easily.
     28         self.socket.settimeout(1.0)
     29 
     30     def get_request(self):
     31         """BaseHTTPServer method, overridden."""
     32 
     33         request, client_address = self.socket.accept()
     34 
     35         # It's a loopback connection, so setting the timeout
     36         # really low shouldn't affect anything, but should make
     37         # deadlocks less likely to occur.
     38         request.settimeout(10.0)
     39 
     40         return (request, client_address)
     41 
     42 class LoopbackHttpServerThread(threading.Thread):
     43     """Stoppable thread that runs a loopback http server."""
     44 
     45     def __init__(self, request_handler):
     46         threading.Thread.__init__(self)
     47         self._stop = False
     48         self.ready = threading.Event()
     49         request_handler.protocol_version = "HTTP/1.0"
     50         self.httpd = LoopbackHttpServer(('127.0.0.1', 0),
     51                                         request_handler)
     52         #print "Serving HTTP on %s port %s" % (self.httpd.server_name,
     53         #                                      self.httpd.server_port)
     54         self.port = self.httpd.server_port
     55 
     56     def stop(self):
     57         """Stops the webserver if it's currently running."""
     58 
     59         # Set the stop flag.
     60         self._stop = True
     61 
     62         self.join()
     63 
     64     def run(self):
     65         self.ready.set()
     66         while not self._stop:
     67             self.httpd.handle_request()
     68 
     69 # Authentication infrastructure
     70 
     71 class DigestAuthHandler:
     72     """Handler for performing digest authentication."""
     73 
     74     def __init__(self):
     75         self._request_num = 0
     76         self._nonces = []
     77         self._users = {}
     78         self._realm_name = "Test Realm"
     79         self._qop = "auth"
     80 
     81     def set_qop(self, qop):
     82         self._qop = qop
     83 
     84     def set_users(self, users):
     85         assert isinstance(users, dict)
     86         self._users = users
     87 
     88     def set_realm(self, realm):
     89         self._realm_name = realm
     90 
     91     def _generate_nonce(self):
     92         self._request_num += 1
     93         nonce = hashlib.md5(str(self._request_num)).hexdigest()
     94         self._nonces.append(nonce)
     95         return nonce
     96 
     97     def _create_auth_dict(self, auth_str):
     98         first_space_index = auth_str.find(" ")
     99         auth_str = auth_str[first_space_index+1:]
    100 
    101         parts = auth_str.split(",")
    102 
    103         auth_dict = {}
    104         for part in parts:
    105             name, value = part.split("=")
    106             name = name.strip()
    107             if value[0] == '"' and value[-1] == '"':
    108                 value = value[1:-1]
    109             else:
    110                 value = value.strip()
    111             auth_dict[name] = value
    112         return auth_dict
    113 
    114     def _validate_auth(self, auth_dict, password, method, uri):
    115         final_dict = {}
    116         final_dict.update(auth_dict)
    117         final_dict["password"] = password
    118         final_dict["method"] = method
    119         final_dict["uri"] = uri
    120         HA1_str = "%(username)s:%(realm)s:%(password)s" % final_dict
    121         HA1 = hashlib.md5(HA1_str).hexdigest()
    122         HA2_str = "%(method)s:%(uri)s" % final_dict
    123         HA2 = hashlib.md5(HA2_str).hexdigest()
    124         final_dict["HA1"] = HA1
    125         final_dict["HA2"] = HA2
    126         response_str = "%(HA1)s:%(nonce)s:%(nc)s:" \
    127                        "%(cnonce)s:%(qop)s:%(HA2)s" % final_dict
    128         response = hashlib.md5(response_str).hexdigest()
    129 
    130         return response == auth_dict["response"]
    131 
    132     def _return_auth_challenge(self, request_handler):
    133         request_handler.send_response(407, "Proxy Authentication Required")
    134         request_handler.send_header("Content-Type", "text/html")
    135         request_handler.send_header(
    136             'Proxy-Authenticate', 'Digest realm="%s", '
    137             'qop="%s",'
    138             'nonce="%s", ' % \
    139             (self._realm_name, self._qop, self._generate_nonce()))
    140         # XXX: Not sure if we're supposed to add this next header or
    141         # not.
    142         #request_handler.send_header('Connection', 'close')
    143         request_handler.end_headers()
    144         request_handler.wfile.write("Proxy Authentication Required.")
    145         return False
    146 
    147     def handle_request(self, request_handler):
    148         """Performs digest authentication on the given HTTP request
    149         handler.  Returns True if authentication was successful, False
    150         otherwise.
    151 
    152         If no users have been set, then digest auth is effectively
    153         disabled and this method will always return True.
    154         """
    155 
    156         if len(self._users) == 0:
    157             return True
    158 
    159         if 'Proxy-Authorization' not in request_handler.headers:
    160             return self._return_auth_challenge(request_handler)
    161         else:
    162             auth_dict = self._create_auth_dict(
    163                 request_handler.headers['Proxy-Authorization']
    164                 )
    165             if auth_dict["username"] in self._users:
    166                 password = self._users[ auth_dict["username"] ]
    167             else:
    168                 return self._return_auth_challenge(request_handler)
    169             if not auth_dict.get("nonce") in self._nonces:
    170                 return self._return_auth_challenge(request_handler)
    171             else:
    172                 self._nonces.remove(auth_dict["nonce"])
    173 
    174             auth_validated = False
    175 
    176             # MSIE uses short_path in its validation, but Python's
    177             # urllib2 uses the full path, so we're going to see if
    178             # either of them works here.
    179 
    180             for path in [request_handler.path, request_handler.short_path]:
    181                 if self._validate_auth(auth_dict,
    182                                        password,
    183                                        request_handler.command,
    184                                        path):
    185                     auth_validated = True
    186 
    187             if not auth_validated:
    188                 return self._return_auth_challenge(request_handler)
    189             return True
    190 
    191 # Proxy test infrastructure
    192 
    193 class FakeProxyHandler(BaseHTTPServer.BaseHTTPRequestHandler):
    194     """This is a 'fake proxy' that makes it look like the entire
    195     internet has gone down due to a sudden zombie invasion.  It main
    196     utility is in providing us with authentication support for
    197     testing.
    198     """
    199 
    200     def __init__(self, digest_auth_handler, *args, **kwargs):
    201         # This has to be set before calling our parent's __init__(), which will
    202         # try to call do_GET().
    203         self.digest_auth_handler = digest_auth_handler
    204         BaseHTTPServer.BaseHTTPRequestHandler.__init__(self, *args, **kwargs)
    205 
    206     def log_message(self, format, *args):
    207         # Uncomment the next line for debugging.
    208         #sys.stderr.write(format % args)
    209         pass
    210 
    211     def do_GET(self):
    212         (scm, netloc, path, params, query, fragment) = urlparse.urlparse(
    213             self.path, 'http')
    214         self.short_path = path
    215         if self.digest_auth_handler.handle_request(self):
    216             self.send_response(200, "OK")
    217             self.send_header("Content-Type", "text/html")
    218             self.end_headers()
    219             self.wfile.write("You've reached %s!<BR>" % self.path)
    220             self.wfile.write("Our apologies, but our server is down due to "
    221                               "a sudden zombie invasion.")
    222 
    223 # Test cases
    224 
class BaseTestCase(unittest.TestCase):
    """Shared fixture: snapshot the live-thread state in setUp() and
    verify in tearDown() that the test left no stray threads behind."""

    def setUp(self):
        self._threads = test_support.threading_setup()

    def tearDown(self):
        test_support.threading_cleanup(*self._threads)
    231 
    232 
class ProxyAuthTests(BaseTestCase):
    """Exercise urllib2's ProxyDigestAuthHandler against the local fake
    proxy, which demands digest authentication for every request."""

    URL = "http://localhost"

    # Credentials registered with the fake proxy's digest handler.
    USER = "tester"
    PASSWD = "test123"
    REALM = "TestRealm"

    def setUp(self):
        super(ProxyAuthTests, self).setUp()
        self.digest_auth_handler = DigestAuthHandler()
        self.digest_auth_handler.set_users({self.USER: self.PASSWD})
        self.digest_auth_handler.set_realm(self.REALM)
        # Closure injecting the shared digest handler into every
        # FakeProxyHandler instance the HTTP server creates.
        def create_fake_proxy_handler(*args, **kwargs):
            return FakeProxyHandler(self.digest_auth_handler, *args, **kwargs)

        self.server = LoopbackHttpServerThread(create_fake_proxy_handler)
        self.server.start()
        self.server.ready.wait()
        proxy_url = "http://127.0.0.1:%d" % self.server.port
        handler = urllib2.ProxyHandler({"http" : proxy_url})
        self.proxy_digest_handler = urllib2.ProxyDigestAuthHandler()
        self.opener = urllib2.build_opener(handler, self.proxy_digest_handler)

    def tearDown(self):
        self.server.stop()
        super(ProxyAuthTests, self).tearDown()

    def test_proxy_with_bad_password_raises_httperror(self):
        self.proxy_digest_handler.add_password(self.REALM, self.URL,
                                               self.USER, self.PASSWD+"bad")
        self.digest_auth_handler.set_qop("auth")
        self.assertRaises(urllib2.HTTPError,
                          self.opener.open,
                          self.URL)

    def test_proxy_with_no_password_raises_httperror(self):
        self.digest_auth_handler.set_qop("auth")
        self.assertRaises(urllib2.HTTPError,
                          self.opener.open,
                          self.URL)

    def test_proxy_qop_auth_works(self):
        self.proxy_digest_handler.add_password(self.REALM, self.URL,
                                               self.USER, self.PASSWD)
        self.digest_auth_handler.set_qop("auth")
        result = self.opener.open(self.URL)
        # Drain the response fully before closing so the server side
        # finishes cleanly.
        while result.read():
            pass
        result.close()

    def test_proxy_qop_auth_int_works_or_throws_urlerror(self):
        self.proxy_digest_handler.add_password(self.REALM, self.URL,
                                               self.USER, self.PASSWD)
        self.digest_auth_handler.set_qop("auth-int")
        try:
            result = self.opener.open(self.URL)
        except urllib2.URLError:
            # It's okay if we don't support auth-int, but we certainly
            # shouldn't receive any kind of exception here other than
            # a URLError.
            result = None
        if result:
            while result.read():
                pass
            result.close()
    298 
    299 
    300 def GetRequestHandler(responses):
    301 
    302     class FakeHTTPRequestHandler(BaseHTTPServer.BaseHTTPRequestHandler):
    303 
    304         server_version = "TestHTTP/"
    305         requests = []
    306         headers_received = []
    307         port = 80
    308 
    309         def do_GET(self):
    310             body = self.send_head()
    311             if body:
    312                 self.wfile.write(body)
    313 
    314         def do_POST(self):
    315             content_length = self.headers['Content-Length']
    316             post_data = self.rfile.read(int(content_length))
    317             self.do_GET()
    318             self.requests.append(post_data)
    319 
    320         def send_head(self):
    321             FakeHTTPRequestHandler.headers_received = self.headers
    322             self.requests.append(self.path)
    323             response_code, headers, body = responses.pop(0)
    324 
    325             self.send_response(response_code)
    326 
    327             for (header, value) in headers:
    328                 self.send_header(header, value % self.port)
    329             if body:
    330                 self.send_header('Content-type', 'text/plain')
    331                 self.end_headers()
    332                 return body
    333             self.end_headers()
    334 
    335         def log_message(self, *args):
    336             pass
    337 
    338 
    339     return FakeHTTPRequestHandler
    340 
    341 
class TestUrlopen(BaseTestCase):
    """Tests urllib2.urlopen using the network.

    These tests are not exhaustive.  Assuming that testing using files does a
    good job overall of some of the basic interface features.  There are no
    tests exercising the optional 'data' and 'proxies' arguments.  No tests
    for transparent redirection have been written.
    """

    def setUp(self):
        # Install an opener with an empty proxy map so environment proxy
        # settings cannot redirect these loopback requests.
        proxy_handler = urllib2.ProxyHandler({})
        opener = urllib2.build_opener(proxy_handler)
        urllib2.install_opener(opener)
        super(TestUrlopen, self).setUp()

    def start_server(self, responses):
        """Start a loopback server replaying *responses*; return the
        handler class so tests can inspect what it recorded."""
        handler = GetRequestHandler(responses)

        self.server = LoopbackHttpServerThread(handler)
        self.server.start()
        self.server.ready.wait()
        port = self.server.port
        # The handler needs the real port to fill '%s' placeholders in
        # canned header values (e.g. Location).
        handler.port = port
        return handler


    def test_redirection(self):
        expected_response = 'We got here...'
        responses = [
            (302, [('Location', 'http://localhost:%s/somewhere_else')], ''),
            (200, [], expected_response)
        ]

        handler = self.start_server(responses)

        try:
            f = urllib2.urlopen('http://localhost:%s/' % handler.port)
            data = f.read()
            f.close()

            self.assertEqual(data, expected_response)
            # urlopen should have followed the 302 transparently.
            self.assertEqual(handler.requests, ['/', '/somewhere_else'])
        finally:
            self.server.stop()


    def test_404(self):
        expected_response = 'Bad bad bad...'
        handler = self.start_server([(404, [], expected_response)])

        try:
            try:
                urllib2.urlopen('http://localhost:%s/weeble' % handler.port)
            except urllib2.URLError, f:
                # HTTPError (a URLError subclass) doubles as a file-like
                # response object, read from below.
                pass
            else:
                self.fail('404 should raise URLError')

            data = f.read()
            f.close()

            self.assertEqual(data, expected_response)
            self.assertEqual(handler.requests, ['/weeble'])
        finally:
            self.server.stop()


    def test_200(self):
        expected_response = 'pycon 2008...'
        handler = self.start_server([(200, [], expected_response)])

        try:
            f = urllib2.urlopen('http://localhost:%s/bizarre' % handler.port)
            data = f.read()
            f.close()

            self.assertEqual(data, expected_response)
            self.assertEqual(handler.requests, ['/bizarre'])
        finally:
            self.server.stop()

    def test_200_with_parameters(self):
        expected_response = 'pycon 2008...'
        handler = self.start_server([(200, [], expected_response)])

        try:
            # Passing data makes this a POST; the handler records the
            # body after the path.
            f = urllib2.urlopen('http://localhost:%s/bizarre' % handler.port, 'get=with_feeling')
            data = f.read()
            f.close()

            self.assertEqual(data, expected_response)
            self.assertEqual(handler.requests, ['/bizarre', 'get=with_feeling'])
        finally:
            self.server.stop()


    def test_sending_headers(self):
        handler = self.start_server([(200, [], "we don't care")])

        try:
            req = urllib2.Request("http://localhost:%s/" % handler.port,
                                  headers={'Range': 'bytes=20-39'})
            urllib2.urlopen(req)
            self.assertEqual(handler.headers_received['Range'], 'bytes=20-39')
        finally:
            self.server.stop()

    def test_basic(self):
        handler = self.start_server([(200, [], "we don't care")])

        try:
            open_url = urllib2.urlopen("http://localhost:%s" % handler.port)
            # The returned object must expose the file-like interface.
            for attr in ("read", "close", "info", "geturl"):
                self.assertTrue(hasattr(open_url, attr), "object returned from "
                             "urlopen lacks the %s attribute" % attr)
            try:
                self.assertTrue(open_url.read(), "calling 'read' failed")
            finally:
                open_url.close()
        finally:
            self.server.stop()

    def test_info(self):
        handler = self.start_server([(200, [], "we don't care")])

        try:
            open_url = urllib2.urlopen("http://localhost:%s" % handler.port)
            info_obj = open_url.info()
            self.assertIsInstance(info_obj, mimetools.Message,
                                  "object returned by 'info' is not an "
                                  "instance of mimetools.Message")
            # Handler sends 'Content-type: text/plain' with any body.
            self.assertEqual(info_obj.getsubtype(), "plain")
        finally:
            self.server.stop()

    def test_geturl(self):
        # Make sure same URL as opened is returned by geturl.
        handler = self.start_server([(200, [], "we don't care")])

        try:
            open_url = urllib2.urlopen("http://localhost:%s" % handler.port)
            url = open_url.geturl()
            self.assertEqual(url, "http://localhost:%s" % handler.port)
        finally:
            self.server.stop()


    def test_bad_address(self):
        # Make sure proper exception is raised when connecting to a bogus
        # address.

        # as indicated by the comment below, this might fail with some ISP,
        # so we run the test only when -unetwork/-uall is specified to
        # mitigate the problem a bit (see #17564)
        test_support.requires('network')
        self.assertRaises(IOError,
                          # Given that both VeriSign and various ISPs have in
                          # the past or are presently hijacking various invalid
                          # domain name requests in an attempt to boost traffic
                          # to their own sites, finding a domain name to use
                          # for this test is difficult.  RFC2606 leads one to
                          # believe that '.invalid' should work, but experience
                          # seemed to indicate otherwise.  Single character
                          # TLDs are likely to remain invalid, so this seems to
                          # be the best choice. The trailing '.' prevents a
                          # related problem: The normal DNS resolver appends
                          # the domain names from the search path if there is
                          # no '.' the end and, and if one of those domains
                          # implements a '*' rule a result is returned.
                          # However, none of this will prevent the test from
                          # failing if the ISP hijacks all invalid domain
                          # requests.  The real solution would be to be able to
                          # parameterize the framework with a mock resolver.
                          urllib2.urlopen, "http://sadflkjsasf.i.nvali.d./")

    def test_iteration(self):
        expected_response = "pycon 2008..."
        handler = self.start_server([(200, [], expected_response)])
        try:
            data = urllib2.urlopen("http://localhost:%s" % handler.port)
            # The body has no newline, so iteration yields it as one line.
            for line in data:
                self.assertEqual(line, expected_response)
        finally:
            self.server.stop()

    def ztest_line_iteration(self):
        # NOTE: disabled (the 'z' prefix keeps unittest from picking
        # it up as a test).
        lines = ["We\n", "got\n", "here\n", "verylong " * 8192 + "\n"]
        expected_response = "".join(lines)
        handler = self.start_server([(200, [], expected_response)])
        try:
            data = urllib2.urlopen("http://localhost:%s" % handler.port)
            for index, line in enumerate(data):
                self.assertEqual(line, lines[index],
                                 "Fetched line number %s doesn't match expected:\n"
                                 "    Expected length was %s, got %s" %
                                 (index, len(lines[index]), len(line)))
        finally:
            self.server.stop()
        self.assertEqual(index + 1, len(lines))
    541 
def test_main():
    """Run both localhost-only test groups under regrtest."""
    # We will NOT depend on the network resource flag
    # (Lib/test/regrtest.py -u network) since all tests here are only
    # localhost.  However, if this is a bad rationale, then uncomment
    # the next line.
    #test_support.requires("network")

    test_support.run_unittest(ProxyAuthTests, TestUrlopen)
    550 
# Allow running this test file directly, outside of regrtest.
if __name__ == "__main__":
    test_main()
    553