Home | History | Annotate | Download | only in test
      1 #!/usr/bin/env python
      2 
      3 import unittest
      4 from test import test_support
      5 from test.test_urllib2 import sanepathname2url
      6 
      7 import socket
      8 import urllib2
      9 import os
     10 import sys
     11 
     12 TIMEOUT = 60  # seconds
     13 
     14 
     15 def _retry_thrice(func, exc, *args, **kwargs):
     16     for i in range(3):
     17         try:
     18             return func(*args, **kwargs)
     19         except exc, last_exc:
     20             continue
     21         except:
     22             raise
     23     raise last_exc
     24 
     25 def _wrap_with_retry_thrice(func, exc):
     26     def wrapped(*args, **kwargs):
     27         return _retry_thrice(func, exc, *args, **kwargs)
     28     return wrapped
     29 
# Connecting to remote hosts is flaky.  Make it more robust by attempting
# the connection up to three times whenever urllib2.URLError is raised.
_urlopen_with_retry = _wrap_with_retry_thrice(urllib2.urlopen, urllib2.URLError)
     33 
     34 
class AuthTests(unittest.TestCase):
    """Tests urllib2 authentication features.

    The only test in this class is currently commented out, so the class
    defines no active test methods.
    """

## Disabled at the moment since there is no page under python.org which
## could be used to test HTTP authentication.
#
#    def test_basic_auth(self):
#        import httplib
#
#        test_url = "http://www.python.org/test/test_urllib2/basic_auth"
#        test_hostport = "www.python.org"
#        test_realm = 'Test Realm'
#        test_user = 'test.test_urllib2net'
#        test_password = 'blah'
#
#        # failure
#        try:
#            _urlopen_with_retry(test_url)
#        except urllib2.HTTPError, exc:
#            self.assertEqual(exc.code, 401)
#        else:
#            self.fail("urlopen() should have failed with 401")
#
#        # success
#        auth_handler = urllib2.HTTPBasicAuthHandler()
#        auth_handler.add_password(test_realm, test_hostport,
#                                  test_user, test_password)
#        opener = urllib2.build_opener(auth_handler)
#        # NOTE(review): presumably this should open test_url through the
#        # authenticated opener rather than localhost -- confirm before
#        # re-enabling.
#        f = opener.open('http://localhost/')
#        response = _urlopen_with_retry("http://www.python.org/")
#
#        # The 'userinfo' URL component is deprecated by RFC 3986 for security
#        # reasons, let's not implement it!  (it's already implemented for proxy
#        # specification strings (that is, URLs or authorities specifying a
#        # proxy), so we must keep that)
#        self.assertRaises(httplib.InvalidURL,
#                          urllib2.urlopen, "http://evil:thing@example.com")
     72 
     73 
     74 class CloseSocketTest(unittest.TestCase):
     75 
     76     def test_close(self):
     77         import httplib
     78 
     79         # calling .close() on urllib2's response objects should close the
     80         # underlying socket
     81 
     82         # delve deep into response to fetch socket._socketobject
     83         response = _urlopen_with_retry("http://www.python.org/")
     84         abused_fileobject = response.fp
     85         self.assertTrue(abused_fileobject.__class__ is socket._fileobject)
     86         httpresponse = abused_fileobject._sock
     87         self.assertTrue(httpresponse.__class__ is httplib.HTTPResponse)
     88         fileobject = httpresponse.fp
     89         self.assertTrue(fileobject.__class__ is socket._fileobject)
     90 
     91         self.assertTrue(not fileobject.closed)
     92         response.close()
     93         self.assertTrue(fileobject.closed)
     94 
     95 class OtherNetworkTests(unittest.TestCase):
     96     def setUp(self):
     97         if 0:  # for debugging
     98             import logging
     99             logger = logging.getLogger("test_urllib2net")
    100             logger.addHandler(logging.StreamHandler())
    101 
    102     # XXX The rest of these tests aren't very good -- they don't check much.
    103     # They do sometimes catch some major disasters, though.
    104 
    105     def test_ftp(self):
    106         urls = [
    107             'ftp://ftp.kernel.org/pub/linux/kernel/README',
    108             'ftp://ftp.kernel.org/pub/linux/kernel/non-existent-file',
    109             #'ftp://ftp.kernel.org/pub/leenox/kernel/test',
    110             'ftp://gatekeeper.research.compaq.com/pub/DEC/SRC'
    111                 '/research-reports/00README-Legal-Rules-Regs',
    112             ]
    113         self._test_urls(urls, self._extra_handlers())
    114 
    115     def test_file(self):
    116         TESTFN = test_support.TESTFN
    117         f = open(TESTFN, 'w')
    118         try:
    119             f.write('hi there\n')
    120             f.close()
    121             urls = [
    122                 'file:'+sanepathname2url(os.path.abspath(TESTFN)),
    123                 ('file:///nonsensename/etc/passwd', None, urllib2.URLError),
    124                 ]
    125             self._test_urls(urls, self._extra_handlers(), retry=True)
    126         finally:
    127             os.remove(TESTFN)
    128 
    129         self.assertRaises(ValueError, urllib2.urlopen,'./relative_path/to/file')
    130 
    131     # XXX Following test depends on machine configurations that are internal
    132     # to CNRI.  Need to set up a public server with the right authentication
    133     # configuration for test purposes.
    134 
    135 ##     def test_cnri(self):
    136 ##         if socket.gethostname() == 'bitdiddle':
    137 ##             localhost = 'bitdiddle.cnri.reston.va.us'
    138 ##         elif socket.gethostname() == 'bitdiddle.concentric.net':
    139 ##             localhost = 'localhost'
    140 ##         else:
    141 ##             localhost = None
    142 ##         if localhost is not None:
    143 ##             urls = [
    144 ##                 'file://%s/etc/passwd' % localhost,
    145 ##                 'http://%s/simple/' % localhost,
    146 ##                 'http://%s/digest/' % localhost,
    147 ##                 'http://%s/not/found.h' % localhost,
    148 ##                 ]
    149 
    150 ##             bauth = HTTPBasicAuthHandler()
    151 ##             bauth.add_password('basic_test_realm', localhost, 'jhylton',
    152 ##                                'password')
    153 ##             dauth = HTTPDigestAuthHandler()
    154 ##             dauth.add_password('digest_test_realm', localhost, 'jhylton',
    155 ##                                'password')
    156 
    157 ##             self._test_urls(urls, self._extra_handlers()+[bauth, dauth])
    158 
    159     def test_urlwithfrag(self):
    160         urlwith_frag = "http://docs.python.org/2/glossary.html#glossary"
    161         with test_support.transient_internet(urlwith_frag):
    162             req = urllib2.Request(urlwith_frag)
    163             res = urllib2.urlopen(req)
    164             self.assertEqual(res.geturl(),
    165                     "http://docs.python.org/2/glossary.html#glossary")
    166 
    167     def test_fileno(self):
    168         req = urllib2.Request("http://www.python.org")
    169         opener = urllib2.build_opener()
    170         res = opener.open(req)
    171         try:
    172             res.fileno()
    173         except AttributeError:
    174             self.fail("HTTPResponse object should return a valid fileno")
    175         finally:
    176             res.close()
    177 
    178     def test_custom_headers(self):
    179         url = "http://www.example.com"
    180         with test_support.transient_internet(url):
    181             opener = urllib2.build_opener()
    182             request = urllib2.Request(url)
    183             self.assertFalse(request.header_items())
    184             opener.open(request)
    185             self.assertTrue(request.header_items())
    186             self.assertTrue(request.has_header('User-agent'))
    187             request.add_header('User-Agent','Test-Agent')
    188             opener.open(request)
    189             self.assertEqual(request.get_header('User-agent'),'Test-Agent')
    190 
    191     def test_sites_no_connection_close(self):
    192         # Some sites do not send Connection: close header.
    193         # Verify that those work properly. (#issue12576)
    194 
    195         URL = 'http://www.imdb.com' # No Connection:close
    196         with test_support.transient_internet(URL):
    197             req = urllib2.urlopen(URL)
    198             res = req.read()
    199             self.assertTrue(res)
    200 
    201     def _test_urls(self, urls, handlers, retry=True):
    202         import time
    203         import logging
    204         debug = logging.getLogger("test_urllib2").debug
    205 
    206         urlopen = urllib2.build_opener(*handlers).open
    207         if retry:
    208             urlopen = _wrap_with_retry_thrice(urlopen, urllib2.URLError)
    209 
    210         for url in urls:
    211             if isinstance(url, tuple):
    212                 url, req, expected_err = url
    213             else:
    214                 req = expected_err = None
    215             with test_support.transient_internet(url):
    216                 debug(url)
    217                 try:
    218                     f = urlopen(url, req, TIMEOUT)
    219                 except EnvironmentError as err:
    220                     debug(err)
    221                     if expected_err:
    222                         msg = ("Didn't get expected error(s) %s for %s %s, got %s: %s" %
    223                                (expected_err, url, req, type(err), err))
    224                         self.assertIsInstance(err, expected_err, msg)
    225                 except urllib2.URLError as err:
    226                     if isinstance(err[0], socket.timeout):
    227                         print >>sys.stderr, "<timeout: %s>" % url
    228                         continue
    229                     else:
    230                         raise
    231                 else:
    232                     try:
    233                         with test_support.transient_internet(url):
    234                             buf = f.read()
    235                             debug("read %d bytes" % len(buf))
    236                     except socket.timeout:
    237                         print >>sys.stderr, "<timeout: %s>" % url
    238                     f.close()
    239             debug("******** next url coming up...")
    240             time.sleep(0.1)
    241 
    242     def _extra_handlers(self):
    243         handlers = []
    244 
    245         cfh = urllib2.CacheFTPHandler()
    246         self.addCleanup(cfh.clear_cache)
    247         cfh.setTimeout(1)
    248         handlers.append(cfh)
    249 
    250         return handlers
    251 
    252 
    253 class TimeoutTest(unittest.TestCase):
    254     def test_http_basic(self):
    255         self.assertTrue(socket.getdefaulttimeout() is None)
    256         url = "http://www.python.org"
    257         with test_support.transient_internet(url, timeout=None):
    258             u = _urlopen_with_retry(url)
    259             self.assertTrue(u.fp._sock.fp._sock.gettimeout() is None)
    260 
    261     def test_http_default_timeout(self):
    262         self.assertTrue(socket.getdefaulttimeout() is None)
    263         url = "http://www.python.org"
    264         with test_support.transient_internet(url):
    265             socket.setdefaulttimeout(60)
    266             try:
    267                 u = _urlopen_with_retry(url)
    268             finally:
    269                 socket.setdefaulttimeout(None)
    270             self.assertEqual(u.fp._sock.fp._sock.gettimeout(), 60)
    271 
    272     def test_http_no_timeout(self):
    273         self.assertTrue(socket.getdefaulttimeout() is None)
    274         url = "http://www.python.org"
    275         with test_support.transient_internet(url):
    276             socket.setdefaulttimeout(60)
    277             try:
    278                 u = _urlopen_with_retry(url, timeout=None)
    279             finally:
    280                 socket.setdefaulttimeout(None)
    281             self.assertTrue(u.fp._sock.fp._sock.gettimeout() is None)
    282 
    283     def test_http_timeout(self):
    284         url = "http://www.python.org"
    285         with test_support.transient_internet(url):
    286             u = _urlopen_with_retry(url, timeout=120)
    287             self.assertEqual(u.fp._sock.fp._sock.gettimeout(), 120)
    288 
    289     FTP_HOST = "ftp://ftp.mirror.nl/pub/gnu/"
    290 
    291     def test_ftp_basic(self):
    292         self.assertTrue(socket.getdefaulttimeout() is None)
    293         with test_support.transient_internet(self.FTP_HOST, timeout=None):
    294             u = _urlopen_with_retry(self.FTP_HOST)
    295             self.assertTrue(u.fp.fp._sock.gettimeout() is None)
    296 
    297     def test_ftp_default_timeout(self):
    298         self.assertTrue(socket.getdefaulttimeout() is None)
    299         with test_support.transient_internet(self.FTP_HOST):
    300             socket.setdefaulttimeout(60)
    301             try:
    302                 u = _urlopen_with_retry(self.FTP_HOST)
    303             finally:
    304                 socket.setdefaulttimeout(None)
    305             self.assertEqual(u.fp.fp._sock.gettimeout(), 60)
    306 
    307     def test_ftp_no_timeout(self):
    308         self.assertTrue(socket.getdefaulttimeout() is None)
    309         with test_support.transient_internet(self.FTP_HOST):
    310             socket.setdefaulttimeout(60)
    311             try:
    312                 u = _urlopen_with_retry(self.FTP_HOST, timeout=None)
    313             finally:
    314                 socket.setdefaulttimeout(None)
    315             self.assertTrue(u.fp.fp._sock.gettimeout() is None)
    316 
    317     def test_ftp_timeout(self):
    318         with test_support.transient_internet(self.FTP_HOST):
    319             u = _urlopen_with_retry(self.FTP_HOST, timeout=60)
    320             self.assertEqual(u.fp.fp._sock.gettimeout(), 60)
    321 
    322 
    323 def test_main():
    324     test_support.requires("network")
    325     test_support.run_unittest(AuthTests,
    326                               OtherNetworkTests,
    327                               CloseSocketTest,
    328                               TimeoutTest,
    329                               )
    330 
    331 if __name__ == "__main__":
    332     test_main()
    333