Home | History | Annotate | Download | only in test
      1 import unittest
      2 from test import support
      3 from test.test_urllib2 import sanepathname2url
      4 
      5 import os
      6 import socket
      7 import urllib.error
      8 import urllib.request
      9 import sys
     10 
     11 support.requires("network")
     12 
     13 TIMEOUT = 60  # seconds
     14 
     15 
     16 def _retry_thrice(func, exc, *args, **kwargs):
     17     for i in range(3):
     18         try:
     19             return func(*args, **kwargs)
     20         except exc as e:
     21             last_exc = e
     22             continue
     23     raise last_exc
     24 
     25 def _wrap_with_retry_thrice(func, exc):
     26     def wrapped(*args, **kwargs):
     27         return _retry_thrice(func, exc, *args, **kwargs)
     28     return wrapped
     29 
     30 # Connecting to remote hosts is flaky.  Make it more robust by retrying
     31 # the connection several times.
     32 _urlopen_with_retry = _wrap_with_retry_thrice(urllib.request.urlopen,
     33                                               urllib.error.URLError)
     34 
     35 
     36 class AuthTests(unittest.TestCase):
     37     """Tests urllib2 authentication features."""
     38 
     39 ## Disabled at the moment since there is no page under python.org which
     40 ## could be used to HTTP authentication.
     41 #
     42 #    def test_basic_auth(self):
     43 #        import http.client
     44 #
     45 #        test_url = "http://www.python.org/test/test_urllib2/basic_auth"
     46 #        test_hostport = "www.python.org"
     47 #        test_realm = 'Test Realm'
     48 #        test_user = 'test.test_urllib2net'
     49 #        test_password = 'blah'
     50 #
     51 #        # failure
     52 #        try:
     53 #            _urlopen_with_retry(test_url)
     54 #        except urllib2.HTTPError, exc:
     55 #            self.assertEqual(exc.code, 401)
     56 #        else:
     57 #            self.fail("urlopen() should have failed with 401")
     58 #
     59 #        # success
     60 #        auth_handler = urllib2.HTTPBasicAuthHandler()
     61 #        auth_handler.add_password(test_realm, test_hostport,
     62 #                                  test_user, test_password)
     63 #        opener = urllib2.build_opener(auth_handler)
     64 #        f = opener.open('http://localhost/')
     65 #        response = _urlopen_with_retry("http://www.python.org/")
     66 #
     67 #        # The 'userinfo' URL component is deprecated by RFC 3986 for security
     68 #        # reasons, let's not implement it!  (it's already implemented for proxy
     69 #        # specification strings (that is, URLs or authorities specifying a
     70 #        # proxy), so we must keep that)
     71 #        self.assertRaises(http.client.InvalidURL,
     72 #                          urllib2.urlopen, "http://evil:thing@example.com")
     73 
     74 
     75 class CloseSocketTest(unittest.TestCase):
     76 
     77     def test_close(self):
     78         # calling .close() on urllib2's response objects should close the
     79         # underlying socket
     80         url = "http://www.example.com/"
     81         with support.transient_internet(url):
     82             response = _urlopen_with_retry(url)
     83             sock = response.fp
     84             self.assertFalse(sock.closed)
     85             response.close()
     86             self.assertTrue(sock.closed)
     87 
     88 class OtherNetworkTests(unittest.TestCase):
     89     def setUp(self):
     90         if 0:  # for debugging
     91             import logging
     92             logger = logging.getLogger("test_urllib2net")
     93             logger.addHandler(logging.StreamHandler())
     94 
     95     # XXX The rest of these tests aren't very good -- they don't check much.
     96     # They do sometimes catch some major disasters, though.
     97 
     98     def test_ftp(self):
     99         urls = [
    100             'ftp://ftp.debian.org/debian/README',
    101             ('ftp://ftp.debian.org/debian/non-existent-file',
    102              None, urllib.error.URLError),
    103             ]
    104         self._test_urls(urls, self._extra_handlers())
    105 
    106     def test_file(self):
    107         TESTFN = support.TESTFN
    108         f = open(TESTFN, 'w')
    109         try:
    110             f.write('hi there\n')
    111             f.close()
    112             urls = [
    113                 'file:' + sanepathname2url(os.path.abspath(TESTFN)),
    114                 ('file:///nonsensename/etc/passwd', None,
    115                  urllib.error.URLError),
    116                 ]
    117             self._test_urls(urls, self._extra_handlers(), retry=True)
    118         finally:
    119             os.remove(TESTFN)
    120 
    121         self.assertRaises(ValueError, urllib.request.urlopen,'./relative_path/to/file')
    122 
    123     # XXX Following test depends on machine configurations that are internal
    124     # to CNRI.  Need to set up a public server with the right authentication
    125     # configuration for test purposes.
    126 
    127 ##     def test_cnri(self):
    128 ##         if socket.gethostname() == 'bitdiddle':
    129 ##             localhost = 'bitdiddle.cnri.reston.va.us'
    130 ##         elif socket.gethostname() == 'bitdiddle.concentric.net':
    131 ##             localhost = 'localhost'
    132 ##         else:
    133 ##             localhost = None
    134 ##         if localhost is not None:
    135 ##             urls = [
    136 ##                 'file://%s/etc/passwd' % localhost,
    137 ##                 'http://%s/simple/' % localhost,
    138 ##                 'http://%s/digest/' % localhost,
    139 ##                 'http://%s/not/found.h' % localhost,
    140 ##                 ]
    141 
    142 ##             bauth = HTTPBasicAuthHandler()
    143 ##             bauth.add_password('basic_test_realm', localhost, 'jhylton',
    144 ##                                'password')
    145 ##             dauth = HTTPDigestAuthHandler()
    146 ##             dauth.add_password('digest_test_realm', localhost, 'jhylton',
    147 ##                                'password')
    148 
    149 ##             self._test_urls(urls, self._extra_handlers()+[bauth, dauth])
    150 
    151     def test_urlwithfrag(self):
    152         urlwith_frag = "http://www.pythontest.net/index.html#frag"
    153         with support.transient_internet(urlwith_frag):
    154             req = urllib.request.Request(urlwith_frag)
    155             res = urllib.request.urlopen(req)
    156             self.assertEqual(res.geturl(),
    157                     "http://www.pythontest.net/index.html#frag")
    158 
    159     def test_redirect_url_withfrag(self):
    160         redirect_url_with_frag = "http://www.pythontest.net/redir/with_frag/"
    161         with support.transient_internet(redirect_url_with_frag):
    162             req = urllib.request.Request(redirect_url_with_frag)
    163             res = urllib.request.urlopen(req)
    164             self.assertEqual(res.geturl(),
    165                     "http://www.pythontest.net/elsewhere/#frag")
    166 
    167     def test_custom_headers(self):
    168         url = "http://www.example.com"
    169         with support.transient_internet(url):
    170             opener = urllib.request.build_opener()
    171             request = urllib.request.Request(url)
    172             self.assertFalse(request.header_items())
    173             opener.open(request)
    174             self.assertTrue(request.header_items())
    175             self.assertTrue(request.has_header('User-agent'))
    176             request.add_header('User-Agent','Test-Agent')
    177             opener.open(request)
    178             self.assertEqual(request.get_header('User-agent'),'Test-Agent')
    179 
    180     def test_sites_no_connection_close(self):
    181         # Some sites do not send Connection: close header.
    182         # Verify that those work properly. (#issue12576)
    183 
    184         URL = 'http://www.imdb.com' # mangles Connection:close
    185 
    186         with support.transient_internet(URL):
    187             try:
    188                 with urllib.request.urlopen(URL) as res:
    189                     pass
    190             except ValueError as e:
    191                 self.fail("urlopen failed for site not sending \
    192                            Connection:close")
    193             else:
    194                 self.assertTrue(res)
    195 
    196             req = urllib.request.urlopen(URL)
    197             res = req.read()
    198             self.assertTrue(res)
    199 
    200     def _test_urls(self, urls, handlers, retry=True):
    201         import time
    202         import logging
    203         debug = logging.getLogger("test_urllib2").debug
    204 
    205         urlopen = urllib.request.build_opener(*handlers).open
    206         if retry:
    207             urlopen = _wrap_with_retry_thrice(urlopen, urllib.error.URLError)
    208 
    209         for url in urls:
    210             with self.subTest(url=url):
    211                 if isinstance(url, tuple):
    212                     url, req, expected_err = url
    213                 else:
    214                     req = expected_err = None
    215 
    216                 with support.transient_internet(url):
    217                     try:
    218                         f = urlopen(url, req, TIMEOUT)
    219                     # urllib.error.URLError is a subclass of OSError
    220                     except OSError as err:
    221                         if expected_err:
    222                             msg = ("Didn't get expected error(s) %s for %s %s, got %s: %s" %
    223                                    (expected_err, url, req, type(err), err))
    224                             self.assertIsInstance(err, expected_err, msg)
    225                         else:
    226                             raise
    227                     else:
    228                         try:
    229                             with support.time_out, \
    230                                  support.socket_peer_reset, \
    231                                  support.ioerror_peer_reset:
    232                                 buf = f.read()
    233                                 debug("read %d bytes" % len(buf))
    234                         except socket.timeout:
    235                             print("<timeout: %s>" % url, file=sys.stderr)
    236                         f.close()
    237                 time.sleep(0.1)
    238 
    239     def _extra_handlers(self):
    240         handlers = []
    241 
    242         cfh = urllib.request.CacheFTPHandler()
    243         self.addCleanup(cfh.clear_cache)
    244         cfh.setTimeout(1)
    245         handlers.append(cfh)
    246 
    247         return handlers
    248 
    249 
    250 class TimeoutTest(unittest.TestCase):
    251     def test_http_basic(self):
    252         self.assertIsNone(socket.getdefaulttimeout())
    253         url = "http://www.example.com"
    254         with support.transient_internet(url, timeout=None):
    255             u = _urlopen_with_retry(url)
    256             self.addCleanup(u.close)
    257             self.assertIsNone(u.fp.raw._sock.gettimeout())
    258 
    259     def test_http_default_timeout(self):
    260         self.assertIsNone(socket.getdefaulttimeout())
    261         url = "http://www.example.com"
    262         with support.transient_internet(url):
    263             socket.setdefaulttimeout(60)
    264             try:
    265                 u = _urlopen_with_retry(url)
    266                 self.addCleanup(u.close)
    267             finally:
    268                 socket.setdefaulttimeout(None)
    269             self.assertEqual(u.fp.raw._sock.gettimeout(), 60)
    270 
    271     def test_http_no_timeout(self):
    272         self.assertIsNone(socket.getdefaulttimeout())
    273         url = "http://www.example.com"
    274         with support.transient_internet(url):
    275             socket.setdefaulttimeout(60)
    276             try:
    277                 u = _urlopen_with_retry(url, timeout=None)
    278                 self.addCleanup(u.close)
    279             finally:
    280                 socket.setdefaulttimeout(None)
    281             self.assertIsNone(u.fp.raw._sock.gettimeout())
    282 
    283     def test_http_timeout(self):
    284         url = "http://www.example.com"
    285         with support.transient_internet(url):
    286             u = _urlopen_with_retry(url, timeout=120)
    287             self.addCleanup(u.close)
    288             self.assertEqual(u.fp.raw._sock.gettimeout(), 120)
    289 
    290     FTP_HOST = 'ftp://ftp.debian.org/debian/'
    291 
    292     def test_ftp_basic(self):
    293         self.assertIsNone(socket.getdefaulttimeout())
    294         with support.transient_internet(self.FTP_HOST, timeout=None):
    295             u = _urlopen_with_retry(self.FTP_HOST)
    296             self.addCleanup(u.close)
    297             self.assertIsNone(u.fp.fp.raw._sock.gettimeout())
    298 
    299     def test_ftp_default_timeout(self):
    300         self.assertIsNone(socket.getdefaulttimeout())
    301         with support.transient_internet(self.FTP_HOST):
    302             socket.setdefaulttimeout(60)
    303             try:
    304                 u = _urlopen_with_retry(self.FTP_HOST)
    305                 self.addCleanup(u.close)
    306             finally:
    307                 socket.setdefaulttimeout(None)
    308             self.assertEqual(u.fp.fp.raw._sock.gettimeout(), 60)
    309 
    310     def test_ftp_no_timeout(self):
    311         self.assertIsNone(socket.getdefaulttimeout())
    312         with support.transient_internet(self.FTP_HOST):
    313             socket.setdefaulttimeout(60)
    314             try:
    315                 u = _urlopen_with_retry(self.FTP_HOST, timeout=None)
    316                 self.addCleanup(u.close)
    317             finally:
    318                 socket.setdefaulttimeout(None)
    319             self.assertIsNone(u.fp.fp.raw._sock.gettimeout())
    320 
    321     def test_ftp_timeout(self):
    322         with support.transient_internet(self.FTP_HOST):
    323             u = _urlopen_with_retry(self.FTP_HOST, timeout=60)
    324             self.addCleanup(u.close)
    325             self.assertEqual(u.fp.fp.raw._sock.gettimeout(), 60)
    326 
    327 
    328 if __name__ == "__main__":
    329     unittest.main()
    330