Home | History | Annotate | Download | only in test
      1 import unittest
      2 from test import test_support
      3 from test.test_urllib2 import sanepathname2url
      4 
      5 import socket
      6 import urllib2
      7 import os
      8 import sys
      9 
# Timeout, in seconds, passed to each urlopen() call made by
# OtherNetworkTests._test_urls().
TIMEOUT = 60  # seconds
     11 
     12 
     13 def _retry_thrice(func, exc, *args, **kwargs):
     14     for i in range(3):
     15         try:
     16             return func(*args, **kwargs)
     17         except exc, last_exc:
     18             continue
     19         except:
     20             raise
     21     raise last_exc
     22 
     23 def _wrap_with_retry_thrice(func, exc):
     24     def wrapped(*args, **kwargs):
     25         return _retry_thrice(func, exc, *args, **kwargs)
     26     return wrapped
     27 
# Connecting to remote hosts is flaky.  Make it more robust by retrying
# urllib2.urlopen() up to three times whenever it raises urllib2.URLError.
_urlopen_with_retry = _wrap_with_retry_thrice(urllib2.urlopen, urllib2.URLError)
     31 
     32 
class AuthTests(unittest.TestCase):
    """Tests urllib2 authentication features.

    The class currently defines no active test methods; its only test is
    commented out below.
    """

## Disabled at the moment since there is no page under python.org which
## could be used to test HTTP authentication.
#
#    def test_basic_auth(self):
#        import httplib
#
#        test_url = "http://www.python.org/test/test_urllib2/basic_auth"
#        test_hostport = "www.python.org"
#        test_realm = 'Test Realm'
#        test_user = 'test.test_urllib2net'
#        test_password = 'blah'
#
#        # failure
#        try:
#            _urlopen_with_retry(test_url)
#        except urllib2.HTTPError, exc:
#            self.assertEqual(exc.code, 401)
#        else:
#            self.fail("urlopen() should have failed with 401")
#
#        # success
#        auth_handler = urllib2.HTTPBasicAuthHandler()
#        auth_handler.add_password(test_realm, test_hostport,
#                                  test_user, test_password)
#        opener = urllib2.build_opener(auth_handler)
#        f = opener.open('http://localhost/')
#        response = _urlopen_with_retry("http://www.python.org/")
#
#        # The 'userinfo' URL component is deprecated by RFC 3986 for security
#        # reasons, let's not implement it!  (it's already implemented for proxy
#        # specification strings (that is, URLs or authorities specifying a
#        # proxy), so we must keep that)
#        self.assertRaises(httplib.InvalidURL,
#                          urllib2.urlopen, "http://evil:thing@example.com")
     70 
     71 
     72 class CloseSocketTest(unittest.TestCase):
     73 
     74     def test_close(self):
     75         import httplib
     76 
     77         # calling .close() on urllib2's response objects should close the
     78         # underlying socket
     79 
     80         # delve deep into response to fetch socket._socketobject
     81         response = _urlopen_with_retry("http://www.example.com/")
     82         abused_fileobject = response.fp
     83         self.assertIs(abused_fileobject.__class__, socket._fileobject)
     84         httpresponse = abused_fileobject._sock
     85         self.assertIs(httpresponse.__class__, httplib.HTTPResponse)
     86         fileobject = httpresponse.fp
     87         self.assertIs(fileobject.__class__, socket._fileobject)
     88 
     89         self.assertTrue(not fileobject.closed)
     90         response.close()
     91         self.assertTrue(fileobject.closed)
     92 
     93 class OtherNetworkTests(unittest.TestCase):
     94     def setUp(self):
     95         if 0:  # for debugging
     96             import logging
     97             logger = logging.getLogger("test_urllib2net")
     98             logger.addHandler(logging.StreamHandler())
     99 
    100     # XXX The rest of these tests aren't very good -- they don't check much.
    101     # They do sometimes catch some major disasters, though.
    102 
    103     def test_ftp(self):
    104         urls = [
    105             'ftp://ftp.debian.org/debian/README',
    106             ('ftp://ftp.debian.org/debian/non-existent-file',
    107              None, urllib2.URLError),
    108             ]
    109         self._test_urls(urls, self._extra_handlers())
    110 
    111     def test_file(self):
    112         TESTFN = test_support.TESTFN
    113         f = open(TESTFN, 'w')
    114         try:
    115             f.write('hi there\n')
    116             f.close()
    117             urls = [
    118                 'file:'+sanepathname2url(os.path.abspath(TESTFN)),
    119                 ('file:///nonsensename/etc/passwd', None, urllib2.URLError),
    120                 ]
    121             self._test_urls(urls, self._extra_handlers(), retry=True)
    122         finally:
    123             os.remove(TESTFN)
    124 
    125         self.assertRaises(ValueError, urllib2.urlopen,'./relative_path/to/file')
    126 
    127     # XXX Following test depends on machine configurations that are internal
    128     # to CNRI.  Need to set up a public server with the right authentication
    129     # configuration for test purposes.
    130 
    131 ##     def test_cnri(self):
    132 ##         if socket.gethostname() == 'bitdiddle':
    133 ##             localhost = 'bitdiddle.cnri.reston.va.us'
    134 ##         elif socket.gethostname() == 'bitdiddle.concentric.net':
    135 ##             localhost = 'localhost'
    136 ##         else:
    137 ##             localhost = None
    138 ##         if localhost is not None:
    139 ##             urls = [
    140 ##                 'file://%s/etc/passwd' % localhost,
    141 ##                 'http://%s/simple/' % localhost,
    142 ##                 'http://%s/digest/' % localhost,
    143 ##                 'http://%s/not/found.h' % localhost,
    144 ##                 ]
    145 
    146 ##             bauth = HTTPBasicAuthHandler()
    147 ##             bauth.add_password('basic_test_realm', localhost, 'jhylton',
    148 ##                                'password')
    149 ##             dauth = HTTPDigestAuthHandler()
    150 ##             dauth.add_password('digest_test_realm', localhost, 'jhylton',
    151 ##                                'password')
    152 
    153 ##             self._test_urls(urls, self._extra_handlers()+[bauth, dauth])
    154 
    155     def test_urlwithfrag(self):
    156         urlwith_frag = "http://www.pythontest.net/index.html#frag"
    157         with test_support.transient_internet(urlwith_frag):
    158             req = urllib2.Request(urlwith_frag)
    159             res = urllib2.urlopen(req)
    160             self.assertEqual(res.geturl(),
    161                     "http://www.pythontest.net/index.html#frag")
    162 
    163     def test_fileno(self):
    164         req = urllib2.Request("http://www.example.com")
    165         opener = urllib2.build_opener()
    166         res = opener.open(req)
    167         try:
    168             res.fileno()
    169         except AttributeError:
    170             self.fail("HTTPResponse object should return a valid fileno")
    171         finally:
    172             res.close()
    173 
    174     def test_custom_headers(self):
    175         url = "http://www.example.com"
    176         with test_support.transient_internet(url):
    177             opener = urllib2.build_opener()
    178             request = urllib2.Request(url)
    179             self.assertFalse(request.header_items())
    180             opener.open(request)
    181             self.assertTrue(request.header_items())
    182             self.assertTrue(request.has_header('User-agent'))
    183             request.add_header('User-Agent','Test-Agent')
    184             opener.open(request)
    185             self.assertEqual(request.get_header('User-agent'),'Test-Agent')
    186 
    187     def test_sites_no_connection_close(self):
    188         # Some sites do not send Connection: close header.
    189         # Verify that those work properly. (#issue12576)
    190 
    191         URL = 'http://www.imdb.com' # No Connection:close
    192         with test_support.transient_internet(URL):
    193             req = urllib2.urlopen(URL)
    194             res = req.read()
    195             self.assertTrue(res)
    196 
    197     def _test_urls(self, urls, handlers, retry=True):
    198         import time
    199         import logging
    200         debug = logging.getLogger("test_urllib2").debug
    201 
    202         urlopen = urllib2.build_opener(*handlers).open
    203         if retry:
    204             urlopen = _wrap_with_retry_thrice(urlopen, urllib2.URLError)
    205 
    206         for url in urls:
    207             if isinstance(url, tuple):
    208                 url, req, expected_err = url
    209             else:
    210                 req = expected_err = None
    211             with test_support.transient_internet(url):
    212                 debug(url)
    213                 try:
    214                     f = urlopen(url, req, TIMEOUT)
    215                 except EnvironmentError as err:
    216                     debug(err)
    217                     if expected_err:
    218                         msg = ("Didn't get expected error(s) %s for %s %s, got %s: %s" %
    219                                (expected_err, url, req, type(err), err))
    220                         self.assertIsInstance(err, expected_err, msg)
    221                 except urllib2.URLError as err:
    222                     if isinstance(err[0], socket.timeout):
    223                         print >>sys.stderr, "<timeout: %s>" % url
    224                         continue
    225                     else:
    226                         raise
    227                 else:
    228                     try:
    229                         with test_support.transient_internet(url):
    230                             buf = f.read()
    231                             debug("read %d bytes" % len(buf))
    232                     except socket.timeout:
    233                         print >>sys.stderr, "<timeout: %s>" % url
    234                     f.close()
    235             debug("******** next url coming up...")
    236             time.sleep(0.1)
    237 
    238     def _extra_handlers(self):
    239         handlers = []
    240 
    241         cfh = urllib2.CacheFTPHandler()
    242         self.addCleanup(cfh.clear_cache)
    243         cfh.setTimeout(1)
    244         handlers.append(cfh)
    245 
    246         return handlers
    247 
    248 
    249 class TimeoutTest(unittest.TestCase):
    250     def test_http_basic(self):
    251         self.assertIsNone(socket.getdefaulttimeout())
    252         url = "http://www.example.com"
    253         with test_support.transient_internet(url, timeout=None):
    254             u = _urlopen_with_retry(url)
    255             self.assertIsNone(u.fp._sock.fp._sock.gettimeout())
    256 
    257     def test_http_default_timeout(self):
    258         self.assertIsNone(socket.getdefaulttimeout())
    259         url = "http://www.example.com"
    260         with test_support.transient_internet(url):
    261             socket.setdefaulttimeout(60)
    262             try:
    263                 u = _urlopen_with_retry(url)
    264             finally:
    265                 socket.setdefaulttimeout(None)
    266             self.assertEqual(u.fp._sock.fp._sock.gettimeout(), 60)
    267 
    268     def test_http_no_timeout(self):
    269         self.assertIsNone(socket.getdefaulttimeout())
    270         url = "http://www.example.com"
    271         with test_support.transient_internet(url):
    272             socket.setdefaulttimeout(60)
    273             try:
    274                 u = _urlopen_with_retry(url, timeout=None)
    275             finally:
    276                 socket.setdefaulttimeout(None)
    277             self.assertIsNone(u.fp._sock.fp._sock.gettimeout())
    278 
    279     def test_http_timeout(self):
    280         url = "http://www.example.com"
    281         with test_support.transient_internet(url):
    282             u = _urlopen_with_retry(url, timeout=120)
    283             self.assertEqual(u.fp._sock.fp._sock.gettimeout(), 120)
    284 
    285     FTP_HOST = 'ftp://ftp.debian.org/debian/'
    286 
    287     def test_ftp_basic(self):
    288         self.assertIsNone(socket.getdefaulttimeout())
    289         with test_support.transient_internet(self.FTP_HOST, timeout=None):
    290             u = _urlopen_with_retry(self.FTP_HOST)
    291             self.assertIsNone(u.fp.fp._sock.gettimeout())
    292 
    293     def test_ftp_default_timeout(self):
    294         self.assertIsNone(socket.getdefaulttimeout())
    295         with test_support.transient_internet(self.FTP_HOST):
    296             socket.setdefaulttimeout(60)
    297             try:
    298                 u = _urlopen_with_retry(self.FTP_HOST)
    299             finally:
    300                 socket.setdefaulttimeout(None)
    301             self.assertEqual(u.fp.fp._sock.gettimeout(), 60)
    302 
    303     def test_ftp_no_timeout(self):
    304         self.assertIsNone(socket.getdefaulttimeout(),)
    305         with test_support.transient_internet(self.FTP_HOST):
    306             socket.setdefaulttimeout(60)
    307             try:
    308                 u = _urlopen_with_retry(self.FTP_HOST, timeout=None)
    309             finally:
    310                 socket.setdefaulttimeout(None)
    311             self.assertIsNone(u.fp.fp._sock.gettimeout())
    312 
    313     def test_ftp_timeout(self):
    314         with test_support.transient_internet(self.FTP_HOST):
    315             u = _urlopen_with_retry(self.FTP_HOST, timeout=60)
    316             self.assertEqual(u.fp.fp._sock.gettimeout(), 60)
    317 
    318 
    319 def test_main():
    320     test_support.requires("network")
    321     test_support.run_unittest(AuthTests,
    322                               OtherNetworkTests,
    323                               CloseSocketTest,
    324                               TimeoutTest,
    325                               )
    326 
# Run the tests through test_main() when this file is executed as a script.
if __name__ == "__main__":
    test_main()
    329