# Home | History | Annotate | Download | only in test  (code-browser navigation residue)
      1 #!/usr/bin/env python

      2 
      3 import urlparse
      4 import urllib2
      5 import BaseHTTPServer
      6 import unittest
      7 import hashlib
      8 from test import test_support
      9 mimetools = test_support.import_module('mimetools', deprecated=True)
     10 threading = test_support.import_module('threading')
     11 
     12 # Loopback http server infrastructure

     13 
     14 class LoopbackHttpServer(BaseHTTPServer.HTTPServer):
     15     """HTTP server w/ a few modifications that make it useful for
     16     loopback testing purposes.
     17     """
     18 
     19     def __init__(self, server_address, RequestHandlerClass):
     20         BaseHTTPServer.HTTPServer.__init__(self,
     21                                            server_address,
     22                                            RequestHandlerClass)
     23 
     24         # Set the timeout of our listening socket really low so

     25         # that we can stop the server easily.

     26         self.socket.settimeout(1.0)
     27 
     28     def get_request(self):
     29         """BaseHTTPServer method, overridden."""
     30 
     31         request, client_address = self.socket.accept()
     32 
     33         # It's a loopback connection, so setting the timeout

     34         # really low shouldn't affect anything, but should make

     35         # deadlocks less likely to occur.

     36         request.settimeout(10.0)
     37 
     38         return (request, client_address)
     39 
class LoopbackHttpServerThread(threading.Thread):
    """Stoppable thread that runs a loopback http server."""

    def __init__(self, request_handler):
        threading.Thread.__init__(self)
        # Polled by run(); set to True by stop().
        self._stop = False
        # Set by run() just before the serving loop starts, so callers
        # can wait until the server is accepting requests.
        self.ready = threading.Event()
        request_handler.protocol_version = "HTTP/1.0"
        # Port 0 asks the OS for any free port; record the one chosen.
        self.httpd = LoopbackHttpServer(('127.0.0.1', 0),
                                        request_handler)
        #print "Serving HTTP on %s port %s" % (self.httpd.server_name,
        #                                      self.httpd.server_port)
        self.port = self.httpd.server_port

    def stop(self):
        """Stops the webserver if it's currently running."""

        # Set the stop flag.
        self._stop = True

        # handle_request() wakes up within ~1s (see the listening-socket
        # timeout in LoopbackHttpServer), so this join returns promptly.
        self.join()

    def run(self):
        """Thread body: announce readiness, then serve until stopped."""
        self.ready.set()
        while not self._stop:
            self.httpd.handle_request()
     66 
     67 # Authentication infrastructure

     68 
class DigestAuthHandler:
    """Handler for performing digest authentication (RFC 2617 style)."""

    def __init__(self):
        # Counter used to derive a unique nonce per challenge.
        self._request_num = 0
        # Nonces issued but not yet consumed by a client response.
        self._nonces = []
        # Maps username -> password; an empty dict disables auth.
        self._users = {}
        self._realm_name = "Test Realm"
        # qop = "quality of protection"; "auth" or "auth-int".
        self._qop = "auth"

    def set_qop(self, qop):
        self._qop = qop

    def set_users(self, users):
        assert isinstance(users, dict)
        self._users = users

    def set_realm(self, realm):
        self._realm_name = realm

    def _generate_nonce(self):
        # Fresh nonce from a monotonically increasing counter; remember
        # it so the client's response can be checked against it later.
        self._request_num += 1
        nonce = hashlib.md5(str(self._request_num)).hexdigest()
        self._nonces.append(nonce)
        return nonce

    def _create_auth_dict(self, auth_str):
        """Parse a Proxy-Authorization header value into a dict.

        Drops the leading scheme token (e.g. "Digest") and splits the
        rest on commas into name=value pairs, unquoting quoted values.
        NOTE(review): a quoted value containing a comma would be split
        incorrectly; the values urllib2 sends don't contain commas.
        """
        first_space_index = auth_str.find(" ")
        auth_str = auth_str[first_space_index+1:]

        parts = auth_str.split(",")

        auth_dict = {}
        for part in parts:
            name, value = part.split("=")
            name = name.strip()
            if value[0] == '"' and value[-1] == '"':
                value = value[1:-1]
            else:
                value = value.strip()
            auth_dict[name] = value
        return auth_dict

    def _validate_auth(self, auth_dict, password, method, uri):
        """Recompute the digest response and compare to the client's.

        response = MD5(HA1:nonce:nc:cnonce:qop:HA2), where
        HA1 = MD5(username:realm:password) and HA2 = MD5(method:uri).
        """
        final_dict = {}
        final_dict.update(auth_dict)
        final_dict["password"] = password
        final_dict["method"] = method
        final_dict["uri"] = uri
        HA1_str = "%(username)s:%(realm)s:%(password)s" % final_dict
        HA1 = hashlib.md5(HA1_str).hexdigest()
        HA2_str = "%(method)s:%(uri)s" % final_dict
        HA2 = hashlib.md5(HA2_str).hexdigest()
        final_dict["HA1"] = HA1
        final_dict["HA2"] = HA2
        response_str = "%(HA1)s:%(nonce)s:%(nc)s:" \
                       "%(cnonce)s:%(qop)s:%(HA2)s" % final_dict
        response = hashlib.md5(response_str).hexdigest()

        return response == auth_dict["response"]

    def _return_auth_challenge(self, request_handler):
        """Send a 407 challenge carrying a fresh nonce; returns False."""
        request_handler.send_response(407, "Proxy Authentication Required")
        request_handler.send_header("Content-Type", "text/html")
        request_handler.send_header(
            'Proxy-Authenticate', 'Digest realm="%s", '
            'qop="%s",'
            'nonce="%s", ' % \
            (self._realm_name, self._qop, self._generate_nonce()))
        # XXX: Not sure if we're supposed to add this next header or
        # not.
        #request_handler.send_header('Connection', 'close')
        request_handler.end_headers()
        request_handler.wfile.write("Proxy Authentication Required.")
        return False

    def handle_request(self, request_handler):
        """Performs digest authentication on the given HTTP request
        handler.  Returns True if authentication was successful, False
        otherwise.

        If no users have been set, then digest auth is effectively
        disabled and this method will always return True.
        """

        if len(self._users) == 0:
            return True

        if 'Proxy-Authorization' not in request_handler.headers:
            return self._return_auth_challenge(request_handler)
        else:
            auth_dict = self._create_auth_dict(
                request_handler.headers['Proxy-Authorization']
                )
            if auth_dict["username"] in self._users:
                password = self._users[ auth_dict["username"] ]
            else:
                return self._return_auth_challenge(request_handler)
            # A nonce may be used at most once: unknown or replayed
            # nonces get a fresh challenge instead.
            if not auth_dict.get("nonce") in self._nonces:
                return self._return_auth_challenge(request_handler)
            else:
                self._nonces.remove(auth_dict["nonce"])

            auth_validated = False

            # MSIE uses short_path in its validation, but Python's
            # urllib2 uses the full path, so we're going to see if
            # either of them works here.

            for path in [request_handler.path, request_handler.short_path]:
                if self._validate_auth(auth_dict,
                                       password,
                                       request_handler.command,
                                       path):
                    auth_validated = True

            if not auth_validated:
                return self._return_auth_challenge(request_handler)
            return True
    188 
    189 # Proxy test infrastructure

    190 
class FakeProxyHandler(BaseHTTPServer.BaseHTTPRequestHandler):
    """This is a 'fake proxy' that makes it look like the entire
    internet has gone down due to a sudden zombie invasion.  Its main
    utility is in providing us with authentication support for
    testing.
    """

    def __init__(self, digest_auth_handler, *args, **kwargs):
        # This has to be set before calling our parent's __init__(), which will
        # try to call do_GET() (the base handler dispatches the request
        # during construction).
        self.digest_auth_handler = digest_auth_handler
        BaseHTTPServer.BaseHTTPRequestHandler.__init__(self, *args, **kwargs)

    def log_message(self, format, *args):
        # Uncomment the next line for debugging.
        #sys.stderr.write(format % args)
        pass

    def do_GET(self):
        # A proxy receives an absolute URL in the request line; also
        # keep just the path component, which some clients (MSIE) use
        # when computing their digest (see DigestAuthHandler).
        (scm, netloc, path, params, query, fragment) = urlparse.urlparse(
            self.path, 'http')
        self.short_path = path
        if self.digest_auth_handler.handle_request(self):
            self.send_response(200, "OK")
            self.send_header("Content-Type", "text/html")
            self.end_headers()
            self.wfile.write("You've reached %s!<BR>" % self.path)
            self.wfile.write("Our apologies, but our server is down due to "
                              "a sudden zombie invasion.")
    220 
    221 # Test cases

    222 
class BaseTestCase(unittest.TestCase):
    """Snapshots the live threads around each test so stray server
    threads are detected and cleaned up by the test framework."""

    def setUp(self):
        self._threads = test_support.threading_setup()

    def tearDown(self):
        test_support.threading_cleanup(*self._threads)
    229 
    230 
    231 class ProxyAuthTests(BaseTestCase):
    232     URL = "http://localhost"
    233 
    234     USER = "tester"
    235     PASSWD = "test123"
    236     REALM = "TestRealm"
    237 
    238     def setUp(self):
    239         super(ProxyAuthTests, self).setUp()
    240         self.digest_auth_handler = DigestAuthHandler()
    241         self.digest_auth_handler.set_users({self.USER: self.PASSWD})
    242         self.digest_auth_handler.set_realm(self.REALM)
    243         def create_fake_proxy_handler(*args, **kwargs):
    244             return FakeProxyHandler(self.digest_auth_handler, *args, **kwargs)
    245 
    246         self.server = LoopbackHttpServerThread(create_fake_proxy_handler)
    247         self.server.start()
    248         self.server.ready.wait()
    249         proxy_url = "http://127.0.0.1:%d" % self.server.port
    250         handler = urllib2.ProxyHandler({"http" : proxy_url})
    251         self.proxy_digest_handler = urllib2.ProxyDigestAuthHandler()
    252         self.opener = urllib2.build_opener(handler, self.proxy_digest_handler)
    253 
    254     def tearDown(self):
    255         self.server.stop()
    256         super(ProxyAuthTests, self).tearDown()
    257 
    258     def test_proxy_with_bad_password_raises_httperror(self):
    259         self.proxy_digest_handler.add_password(self.REALM, self.URL,
    260                                                self.USER, self.PASSWD+"bad")
    261         self.digest_auth_handler.set_qop("auth")
    262         self.assertRaises(urllib2.HTTPError,
    263                           self.opener.open,
    264                           self.URL)
    265 
    266     def test_proxy_with_no_password_raises_httperror(self):
    267         self.digest_auth_handler.set_qop("auth")
    268         self.assertRaises(urllib2.HTTPError,
    269                           self.opener.open,
    270                           self.URL)
    271 
    272     def test_proxy_qop_auth_works(self):
    273         self.proxy_digest_handler.add_password(self.REALM, self.URL,
    274                                                self.USER, self.PASSWD)
    275         self.digest_auth_handler.set_qop("auth")
    276         result = self.opener.open(self.URL)
    277         while result.read():
    278             pass
    279         result.close()
    280 
    281     def test_proxy_qop_auth_int_works_or_throws_urlerror(self):
    282         self.proxy_digest_handler.add_password(self.REALM, self.URL,
    283                                                self.USER, self.PASSWD)
    284         self.digest_auth_handler.set_qop("auth-int")
    285         try:
    286             result = self.opener.open(self.URL)
    287         except urllib2.URLError:
    288             # It's okay if we don't support auth-int, but we certainly

    289             # shouldn't receive any kind of exception here other than

    290             # a URLError.

    291             result = None
    292         if result:
    293             while result.read():
    294                 pass
    295             result.close()
    296 
    297 
def GetRequestHandler(responses):
    """Build a request-handler class that replays *responses*.

    *responses* is a list of (status_code, headers, body) tuples that
    the handler pops, one per request, in order.  Class attributes let
    tests inspect what was received:

      requests         -- paths (and POST bodies) seen, in order
      headers_received -- headers of the most recent request
      port             -- patched by the test to the real server port;
                          header values are %-formatted with it
    """

    class FakeHTTPRequestHandler(BaseHTTPServer.BaseHTTPRequestHandler):

        server_version = "TestHTTP/"
        # Class-level, so state is shared across all requests handled
        # by one generated class (i.e. within a single test).
        requests = []
        headers_received = []
        port = 80

        def do_GET(self):
            body = self.send_head()
            if body:
                self.wfile.write(body)

        def do_POST(self):
            content_length = self.headers['Content-Length']
            post_data = self.rfile.read(int(content_length))
            # Reuse the GET path for the response, then record the body.
            self.do_GET()
            self.requests.append(post_data)

        def send_head(self):
            # Record the request, then emit the next canned response.
            FakeHTTPRequestHandler.headers_received = self.headers
            self.requests.append(self.path)
            response_code, headers, body = responses.pop(0)

            self.send_response(response_code)

            for (header, value) in headers:
                # Header values may embed the server port (e.g. the
                # Location header in the redirection test).
                self.send_header(header, value % self.port)
            if body:
                self.send_header('Content-type', 'text/plain')
                self.end_headers()
                return body
            self.end_headers()

        def log_message(self, *args):
            # Keep test output quiet.
            pass


    return FakeHTTPRequestHandler
    338 
    339 
class TestUrlopen(BaseTestCase):
    """Tests urllib2.urlopen using the network.

    These tests are not exhaustive.  Assuming that testing using files does a
    good job overall of some of the basic interface features.  There are no
    tests exercising the optional 'data' and 'proxies' arguments.  No tests
    for transparent redirection have been written.
    """

    def start_server(self, responses):
        """Start a loopback server replaying *responses*; return the
        handler class, with its 'port' attribute patched to the real
        port so tests can build URLs and expected header values."""
        handler = GetRequestHandler(responses)

        self.server = LoopbackHttpServerThread(handler)
        self.server.start()
        self.server.ready.wait()
        port = self.server.port
        handler.port = port
        return handler


    def test_redirection(self):
        # A 302 with a Location header must be followed transparently.
        expected_response = 'We got here...'
        responses = [
            (302, [('Location', 'http://localhost:%s/somewhere_else')], ''),
            (200, [], expected_response)
        ]

        handler = self.start_server(responses)

        try:
            f = urllib2.urlopen('http://localhost:%s/' % handler.port)
            data = f.read()
            f.close()

            self.assertEqual(data, expected_response)
            self.assertEqual(handler.requests, ['/', '/somewhere_else'])
        finally:
            self.server.stop()


    def test_404(self):
        # A 404 surfaces as a URLError whose body remains readable.
        expected_response = 'Bad bad bad...'
        handler = self.start_server([(404, [], expected_response)])

        try:
            try:
                urllib2.urlopen('http://localhost:%s/weeble' % handler.port)
            except urllib2.URLError, f:
                pass
            else:
                self.fail('404 should raise URLError')

            data = f.read()
            f.close()

            self.assertEqual(data, expected_response)
            self.assertEqual(handler.requests, ['/weeble'])
        finally:
            self.server.stop()


    def test_200(self):
        expected_response = 'pycon 2008...'
        handler = self.start_server([(200, [], expected_response)])

        try:
            f = urllib2.urlopen('http://localhost:%s/bizarre' % handler.port)
            data = f.read()
            f.close()

            self.assertEqual(data, expected_response)
            self.assertEqual(handler.requests, ['/bizarre'])
        finally:
            self.server.stop()

    def test_200_with_parameters(self):
        # Supplying 'data' makes urlopen POST; the handler records the
        # posted body after the path (see FakeHTTPRequestHandler.do_POST).
        expected_response = 'pycon 2008...'
        handler = self.start_server([(200, [], expected_response)])

        try:
            f = urllib2.urlopen('http://localhost:%s/bizarre' % handler.port, 'get=with_feeling')
            data = f.read()
            f.close()

            self.assertEqual(data, expected_response)
            self.assertEqual(handler.requests, ['/bizarre', 'get=with_feeling'])
        finally:
            self.server.stop()


    def test_sending_headers(self):
        # Headers passed via a Request object must reach the server.
        handler = self.start_server([(200, [], "we don't care")])

        try:
            req = urllib2.Request("http://localhost:%s/" % handler.port,
                                  headers={'Range': 'bytes=20-39'})
            urllib2.urlopen(req)
            self.assertEqual(handler.headers_received['Range'], 'bytes=20-39')
        finally:
            self.server.stop()

    def test_basic(self):
        # The object urlopen returns supports the file-like API.
        handler = self.start_server([(200, [], "we don't care")])

        try:
            open_url = urllib2.urlopen("http://localhost:%s" % handler.port)
            for attr in ("read", "close", "info", "geturl"):
                self.assertTrue(hasattr(open_url, attr), "object returned from "
                             "urlopen lacks the %s attribute" % attr)
            try:
                self.assertTrue(open_url.read(), "calling 'read' failed")
            finally:
                open_url.close()
        finally:
            self.server.stop()

    def test_info(self):
        # info() returns the parsed response headers as a Message.
        handler = self.start_server([(200, [], "we don't care")])

        try:
            open_url = urllib2.urlopen("http://localhost:%s" % handler.port)
            info_obj = open_url.info()
            self.assertIsInstance(info_obj, mimetools.Message,
                                  "object returned by 'info' is not an "
                                  "instance of mimetools.Message")
            self.assertEqual(info_obj.getsubtype(), "plain")
        finally:
            self.server.stop()

    def test_geturl(self):
        # Make sure same URL as opened is returned by geturl.
        handler = self.start_server([(200, [], "we don't care")])

        try:
            open_url = urllib2.urlopen("http://localhost:%s" % handler.port)
            url = open_url.geturl()
            self.assertEqual(url, "http://localhost:%s" % handler.port)
        finally:
            self.server.stop()


    def test_bad_address(self):
        # Make sure proper exception is raised when connecting to a bogus
        # address.
        self.assertRaises(IOError,
                          # Given that both VeriSign and various ISPs have in
                          # the past or are presently hijacking various invalid
                          # domain name requests in an attempt to boost traffic
                          # to their own sites, finding a domain name to use
                          # for this test is difficult.  RFC2606 leads one to
                          # believe that '.invalid' should work, but experience
                          # seemed to indicate otherwise.  Single character
                          # TLDs are likely to remain invalid, so this seems to
                          # be the best choice. The trailing '.' prevents a
                          # related problem: The normal DNS resolver appends
                          # the domain names from the search path if there is
                          # no '.' the end and, and if one of those domains
                          # implements a '*' rule a result is returned.
                          # However, none of this will prevent the test from
                          # failing if the ISP hijacks all invalid domain
                          # requests.  The real solution would be to be able to
                          # parameterize the framework with a mock resolver.
                          urllib2.urlopen, "http://sadflkjsasf.i.nvali.d./")

    def test_iteration(self):
        # Iterating the response object yields its lines.
        expected_response = "pycon 2008..."
        handler = self.start_server([(200, [], expected_response)])
        try:
            data = urllib2.urlopen("http://localhost:%s" % handler.port)
            for line in data:
                self.assertEqual(line, expected_response)
        finally:
            self.server.stop()

    def ztest_line_iteration(self):
        # NOTE: the 'ztest_' prefix keeps this out of the default run.
        lines = ["We\n", "got\n", "here\n", "verylong " * 8192 + "\n"]
        expected_response = "".join(lines)
        handler = self.start_server([(200, [], expected_response)])
        try:
            data = urllib2.urlopen("http://localhost:%s" % handler.port)
            for index, line in enumerate(data):
                self.assertEqual(line, lines[index],
                                 "Fetched line number %s doesn't match expected:\n"
                                 "    Expected length was %s, got %s" %
                                 (index, len(lines[index]), len(line)))
        finally:
            self.server.stop()
        self.assertEqual(index + 1, len(lines))
    528 
def test_main():
    """Entry point used by regrtest: run both test classes."""
    # We will NOT depend on the network resource flag
    # (Lib/test/regrtest.py -u network) since all tests here are only
    # localhost.  However, if this is a bad rationale, then uncomment
    # the next line.
    #test_support.requires("network")

    test_support.run_unittest(ProxyAuthTests, TestUrlopen)
    537 
# Allow running this test file directly, outside of regrtest.
if __name__ == "__main__":
    test_main()
    540