Home | History | Annotate | Download | only in test
      1 #!/usr/bin/env python

      2 
      3 import unittest
      4 from test import test_support
      5 from test.test_urllib2 import sanepathname2url
      6 
      7 import socket
      8 import urllib2
      9 import os
     10 import sys
     11 
# Timeout handed to each urlopen() call made by
# OtherNetworkTests._test_urls().
TIMEOUT = 60  # seconds

     13 
     14 
     15 def _retry_thrice(func, exc, *args, **kwargs):
     16     for i in range(3):
     17         try:
     18             return func(*args, **kwargs)
     19         except exc, last_exc:
     20             continue
     21         except:
     22             raise
     23     raise last_exc
     24 
     25 def _wrap_with_retry_thrice(func, exc):
     26     def wrapped(*args, **kwargs):
     27         return _retry_thrice(func, exc, *args, **kwargs)
     28     return wrapped
     29 
# Connecting to remote hosts is flaky.  Make it more robust by retrying
# the connection several times: urllib2.urlopen is attempted up to three
# times for as long as it keeps raising urllib2.URLError.
_urlopen_with_retry = _wrap_with_retry_thrice(urllib2.urlopen, urllib2.URLError)
     33 
     34 
class AuthTests(unittest.TestCase):
    """Tests urllib2 authentication features.

    The class currently defines no test methods; its only test is kept
    below as a commented-out block.
    """

## Disabled at the moment since there is no page under python.org which
## could be used to test HTTP authentication.
#
#    def test_basic_auth(self):
#        import httplib
#
#        test_url = "http://www.python.org/test/test_urllib2/basic_auth"
#        test_hostport = "www.python.org"
#        test_realm = 'Test Realm'
#        test_user = 'test.test_urllib2net'
#        test_password = 'blah'
#
#        # failure
#        try:
#            _urlopen_with_retry(test_url)
#        except urllib2.HTTPError, exc:
#            self.assertEqual(exc.code, 401)
#        else:
#            self.fail("urlopen() should have failed with 401")
#
#        # success
#        auth_handler = urllib2.HTTPBasicAuthHandler()
#        auth_handler.add_password(test_realm, test_hostport,
#                                  test_user, test_password)
#        opener = urllib2.build_opener(auth_handler)
#        f = opener.open('http://localhost/')
#        response = _urlopen_with_retry("http://www.python.org/")
#
#        # The 'userinfo' URL component is deprecated by RFC 3986 for security
#        # reasons, let's not implement it!  (it's already implemented for proxy
#        # specification strings (that is, URLs or authorities specifying a
#        # proxy), so we must keep that)
#        self.assertRaises(httplib.InvalidURL,
#                          urllib2.urlopen, "http://evil:thing@example.com")

     72 
     73 
     74 class CloseSocketTest(unittest.TestCase):
     75 
     76     def test_close(self):
     77         import httplib
     78 
     79         # calling .close() on urllib2's response objects should close the

     80         # underlying socket

     81 
     82         # delve deep into response to fetch socket._socketobject

     83         response = _urlopen_with_retry("http://www.python.org/")
     84         abused_fileobject = response.fp
     85         self.assertTrue(abused_fileobject.__class__ is socket._fileobject)
     86         httpresponse = abused_fileobject._sock
     87         self.assertTrue(httpresponse.__class__ is httplib.HTTPResponse)
     88         fileobject = httpresponse.fp
     89         self.assertTrue(fileobject.__class__ is socket._fileobject)
     90 
     91         self.assertTrue(not fileobject.closed)
     92         response.close()
     93         self.assertTrue(fileobject.closed)
     94 
     95 class OtherNetworkTests(unittest.TestCase):
     96     def setUp(self):
     97         if 0:  # for debugging

     98             import logging
     99             logger = logging.getLogger("test_urllib2net")
    100             logger.addHandler(logging.StreamHandler())
    101 
    102     # XXX The rest of these tests aren't very good -- they don't check much.

    103     # They do sometimes catch some major disasters, though.

    104 
    105     def test_ftp(self):
    106         urls = [
    107             'ftp://ftp.kernel.org/pub/linux/kernel/README',
    108             'ftp://ftp.kernel.org/pub/linux/kernel/non-existent-file',
    109             #'ftp://ftp.kernel.org/pub/leenox/kernel/test',

    110             'ftp://gatekeeper.research.compaq.com/pub/DEC/SRC'
    111                 '/research-reports/00README-Legal-Rules-Regs',
    112             ]
    113         self._test_urls(urls, self._extra_handlers())
    114 
    115     def test_file(self):
    116         TESTFN = test_support.TESTFN
    117         f = open(TESTFN, 'w')
    118         try:
    119             f.write('hi there\n')
    120             f.close()
    121             urls = [
    122                 'file:'+sanepathname2url(os.path.abspath(TESTFN)),
    123                 ('file:///nonsensename/etc/passwd', None, urllib2.URLError),
    124                 ]
    125             self._test_urls(urls, self._extra_handlers(), retry=True)
    126         finally:
    127             os.remove(TESTFN)
    128 
    129     # XXX Following test depends on machine configurations that are internal

    130     # to CNRI.  Need to set up a public server with the right authentication

    131     # configuration for test purposes.

    132 
    133 ##     def test_cnri(self):

    134 ##         if socket.gethostname() == 'bitdiddle':

    135 ##             localhost = 'bitdiddle.cnri.reston.va.us'

    136 ##         elif socket.gethostname() == 'bitdiddle.concentric.net':

    137 ##             localhost = 'localhost'

    138 ##         else:

    139 ##             localhost = None

    140 ##         if localhost is not None:

    141 ##             urls = [

    142 ##                 'file://%s/etc/passwd' % localhost,

    143 ##                 'http://%s/simple/' % localhost,

    144 ##                 'http://%s/digest/' % localhost,

    145 ##                 'http://%s/not/found.h' % localhost,

    146 ##                 ]

    147 
    148 ##             bauth = HTTPBasicAuthHandler()

    149 ##             bauth.add_password('basic_test_realm', localhost, 'jhylton',

    150 ##                                'password')

    151 ##             dauth = HTTPDigestAuthHandler()

    152 ##             dauth.add_password('digest_test_realm', localhost, 'jhylton',

    153 ##                                'password')

    154 
    155 ##             self._test_urls(urls, self._extra_handlers()+[bauth, dauth])

    156 
    157     def test_urlwithfrag(self):
    158         urlwith_frag = "http://docs.python.org/glossary.html#glossary"
    159         with test_support.transient_internet(urlwith_frag):
    160             req = urllib2.Request(urlwith_frag)
    161             res = urllib2.urlopen(req)
    162             self.assertEqual(res.geturl(),
    163                     "http://docs.python.org/glossary.html#glossary")
    164 
    165     def test_fileno(self):
    166         req = urllib2.Request("http://www.python.org")
    167         opener = urllib2.build_opener()
    168         res = opener.open(req)
    169         try:
    170             res.fileno()
    171         except AttributeError:
    172             self.fail("HTTPResponse object should return a valid fileno")
    173         finally:
    174             res.close()
    175 
    176     def test_custom_headers(self):
    177         url = "http://www.example.com"
    178         with test_support.transient_internet(url):
    179             opener = urllib2.build_opener()
    180             request = urllib2.Request(url)
    181             self.assertFalse(request.header_items())
    182             opener.open(request)
    183             self.assertTrue(request.header_items())
    184             self.assertTrue(request.has_header('User-agent'))
    185             request.add_header('User-Agent','Test-Agent')
    186             opener.open(request)
    187             self.assertEqual(request.get_header('User-agent'),'Test-Agent')
    188 
    189     def _test_urls(self, urls, handlers, retry=True):
    190         import time
    191         import logging
    192         debug = logging.getLogger("test_urllib2").debug
    193 
    194         urlopen = urllib2.build_opener(*handlers).open
    195         if retry:
    196             urlopen = _wrap_with_retry_thrice(urlopen, urllib2.URLError)
    197 
    198         for url in urls:
    199             if isinstance(url, tuple):
    200                 url, req, expected_err = url
    201             else:
    202                 req = expected_err = None
    203             with test_support.transient_internet(url):
    204                 debug(url)
    205                 try:
    206                     f = urlopen(url, req, TIMEOUT)
    207                 except EnvironmentError as err:
    208                     debug(err)
    209                     if expected_err:
    210                         msg = ("Didn't get expected error(s) %s for %s %s, got %s: %s" %
    211                                (expected_err, url, req, type(err), err))
    212                         self.assertIsInstance(err, expected_err, msg)
    213                 except urllib2.URLError as err:
    214                     if isinstance(err[0], socket.timeout):
    215                         print >>sys.stderr, "<timeout: %s>" % url
    216                         continue
    217                     else:
    218                         raise
    219                 else:
    220                     try:
    221                         with test_support.transient_internet(url):
    222                             buf = f.read()
    223                             debug("read %d bytes" % len(buf))
    224                     except socket.timeout:
    225                         print >>sys.stderr, "<timeout: %s>" % url
    226                     f.close()
    227             debug("******** next url coming up...")
    228             time.sleep(0.1)
    229 
    230     def _extra_handlers(self):
    231         handlers = []
    232 
    233         cfh = urllib2.CacheFTPHandler()
    234         cfh.setTimeout(1)
    235         handlers.append(cfh)
    236 
    237         return handlers
    238 
    239 
    240 class TimeoutTest(unittest.TestCase):
    241     def test_http_basic(self):
    242         self.assertTrue(socket.getdefaulttimeout() is None)
    243         url = "http://www.python.org"
    244         with test_support.transient_internet(url, timeout=None):
    245             u = _urlopen_with_retry(url)
    246             self.assertTrue(u.fp._sock.fp._sock.gettimeout() is None)
    247 
    248     def test_http_default_timeout(self):
    249         self.assertTrue(socket.getdefaulttimeout() is None)
    250         url = "http://www.python.org"
    251         with test_support.transient_internet(url):
    252             socket.setdefaulttimeout(60)
    253             try:
    254                 u = _urlopen_with_retry(url)
    255             finally:
    256                 socket.setdefaulttimeout(None)
    257             self.assertEqual(u.fp._sock.fp._sock.gettimeout(), 60)
    258 
    259     def test_http_no_timeout(self):
    260         self.assertTrue(socket.getdefaulttimeout() is None)
    261         url = "http://www.python.org"
    262         with test_support.transient_internet(url):
    263             socket.setdefaulttimeout(60)
    264             try:
    265                 u = _urlopen_with_retry(url, timeout=None)
    266             finally:
    267                 socket.setdefaulttimeout(None)
    268             self.assertTrue(u.fp._sock.fp._sock.gettimeout() is None)
    269 
    270     def test_http_timeout(self):
    271         url = "http://www.python.org"
    272         with test_support.transient_internet(url):
    273             u = _urlopen_with_retry(url, timeout=120)
    274             self.assertEqual(u.fp._sock.fp._sock.gettimeout(), 120)
    275 
    276     FTP_HOST = "ftp://ftp.mirror.nl/pub/gnu/"
    277 
    278     def test_ftp_basic(self):
    279         self.assertTrue(socket.getdefaulttimeout() is None)
    280         with test_support.transient_internet(self.FTP_HOST, timeout=None):
    281             u = _urlopen_with_retry(self.FTP_HOST)
    282             self.assertTrue(u.fp.fp._sock.gettimeout() is None)
    283 
    284     def test_ftp_default_timeout(self):
    285         self.assertTrue(socket.getdefaulttimeout() is None)
    286         with test_support.transient_internet(self.FTP_HOST):
    287             socket.setdefaulttimeout(60)
    288             try:
    289                 u = _urlopen_with_retry(self.FTP_HOST)
    290             finally:
    291                 socket.setdefaulttimeout(None)
    292             self.assertEqual(u.fp.fp._sock.gettimeout(), 60)
    293 
    294     def test_ftp_no_timeout(self):
    295         self.assertTrue(socket.getdefaulttimeout() is None)
    296         with test_support.transient_internet(self.FTP_HOST):
    297             socket.setdefaulttimeout(60)
    298             try:
    299                 u = _urlopen_with_retry(self.FTP_HOST, timeout=None)
    300             finally:
    301                 socket.setdefaulttimeout(None)
    302             self.assertTrue(u.fp.fp._sock.gettimeout() is None)
    303 
    304     def test_ftp_timeout(self):
    305         with test_support.transient_internet(self.FTP_HOST):
    306             u = _urlopen_with_retry(self.FTP_HOST, timeout=60)
    307             self.assertEqual(u.fp.fp._sock.gettimeout(), 60)
    308 
    309 
    310 def test_main():
    311     test_support.requires("network")
    312     test_support.run_unittest(AuthTests,
    313                               OtherNetworkTests,
    314                               CloseSocketTest,
    315                               TimeoutTest,
    316                               )
    317 
    318 if __name__ == "__main__":
    319     test_main()
    320