Home | History | Annotate | Download | only in test
      1 import unittest
      2 from test import support
      3 
      4 import contextlib
      5 import socket
      6 import urllib.request
      7 import os
      8 import email.message
      9 import time
     10 
     11 
     12 support.requires('network')
     13 
     14 class URLTimeoutTest(unittest.TestCase):
     15     # XXX this test doesn't seem to test anything useful.
     16 
     17     TIMEOUT = 30.0
     18 
     19     def setUp(self):
     20         socket.setdefaulttimeout(self.TIMEOUT)
     21 
     22     def tearDown(self):
     23         socket.setdefaulttimeout(None)
     24 
     25     def testURLread(self):
     26         with support.transient_internet("www.example.com"):
     27             f = urllib.request.urlopen("http://www.example.com/")
     28             x = f.read()
     29 
     30 
     31 class urlopenNetworkTests(unittest.TestCase):
     32     """Tests urllib.reqest.urlopen using the network.
     33 
     34     These tests are not exhaustive.  Assuming that testing using files does a
     35     good job overall of some of the basic interface features.  There are no
     36     tests exercising the optional 'data' and 'proxies' arguments.  No tests
     37     for transparent redirection have been written.
     38 
     39     setUp is not used for always constructing a connection to
     40     http://www.pythontest.net/ since there a few tests that don't use that address
     41     and making a connection is expensive enough to warrant minimizing unneeded
     42     connections.
     43 
     44     """
     45 
     46     url = 'http://www.pythontest.net/'
     47 
     48     @contextlib.contextmanager
     49     def urlopen(self, *args, **kwargs):
     50         resource = args[0]
     51         with support.transient_internet(resource):
     52             r = urllib.request.urlopen(*args, **kwargs)
     53             try:
     54                 yield r
     55             finally:
     56                 r.close()
     57 
     58     def test_basic(self):
     59         # Simple test expected to pass.
     60         with self.urlopen(self.url) as open_url:
     61             for attr in ("read", "readline", "readlines", "fileno", "close",
     62                          "info", "geturl"):
     63                 self.assertTrue(hasattr(open_url, attr), "object returned from "
     64                                 "urlopen lacks the %s attribute" % attr)
     65             self.assertTrue(open_url.read(), "calling 'read' failed")
     66 
     67     def test_readlines(self):
     68         # Test both readline and readlines.
     69         with self.urlopen(self.url) as open_url:
     70             self.assertIsInstance(open_url.readline(), bytes,
     71                                   "readline did not return a string")
     72             self.assertIsInstance(open_url.readlines(), list,
     73                                   "readlines did not return a list")
     74 
     75     def test_info(self):
     76         # Test 'info'.
     77         with self.urlopen(self.url) as open_url:
     78             info_obj = open_url.info()
     79             self.assertIsInstance(info_obj, email.message.Message,
     80                                   "object returned by 'info' is not an "
     81                                   "instance of email.message.Message")
     82             self.assertEqual(info_obj.get_content_subtype(), "html")
     83 
     84     def test_geturl(self):
     85         # Make sure same URL as opened is returned by geturl.
     86         with self.urlopen(self.url) as open_url:
     87             gotten_url = open_url.geturl()
     88             self.assertEqual(gotten_url, self.url)
     89 
     90     def test_getcode(self):
     91         # test getcode() with the fancy opener to get 404 error codes
     92         URL = self.url + "XXXinvalidXXX"
     93         with support.transient_internet(URL):
     94             with self.assertWarns(DeprecationWarning):
     95                 open_url = urllib.request.FancyURLopener().open(URL)
     96             try:
     97                 code = open_url.getcode()
     98             finally:
     99                 open_url.close()
    100             self.assertEqual(code, 404)
    101 
    102     def test_bad_address(self):
    103         # Make sure proper exception is raised when connecting to a bogus
    104         # address.
    105 
    106         # Given that both VeriSign and various ISPs have in
    107         # the past or are presently hijacking various invalid
    108         # domain name requests in an attempt to boost traffic
    109         # to their own sites, finding a domain name to use
    110         # for this test is difficult.  RFC2606 leads one to
    111         # believe that '.invalid' should work, but experience
    112         # seemed to indicate otherwise.  Single character
    113         # TLDs are likely to remain invalid, so this seems to
    114         # be the best choice. The trailing '.' prevents a
    115         # related problem: The normal DNS resolver appends
    116         # the domain names from the search path if there is
    117         # no '.' the end and, and if one of those domains
    118         # implements a '*' rule a result is returned.
    119         # However, none of this will prevent the test from
    120         # failing if the ISP hijacks all invalid domain
    121         # requests.  The real solution would be to be able to
    122         # parameterize the framework with a mock resolver.
    123         bogus_domain = "sadflkjsasf.i.nvali.d."
    124         try:
    125             socket.gethostbyname(bogus_domain)
    126         except OSError:
    127             # socket.gaierror is too narrow, since getaddrinfo() may also
    128             # fail with EAI_SYSTEM and ETIMEDOUT (seen on Ubuntu 13.04),
    129             # i.e. Python's TimeoutError.
    130             pass
    131         else:
    132             # This happens with some overzealous DNS providers such as OpenDNS
    133             self.skipTest("%r should not resolve for test to work" % bogus_domain)
    134         failure_explanation = ('opening an invalid URL did not raise OSError; '
    135                                'can be caused by a broken DNS server '
    136                                '(e.g. returns 404 or hijacks page)')
    137         with self.assertRaises(OSError, msg=failure_explanation):
    138             urllib.request.urlopen("http://{}/".format(bogus_domain))
    139 
    140 
    141 class urlretrieveNetworkTests(unittest.TestCase):
    142     """Tests urllib.request.urlretrieve using the network."""
    143 
    144     @contextlib.contextmanager
    145     def urlretrieve(self, *args, **kwargs):
    146         resource = args[0]
    147         with support.transient_internet(resource):
    148             file_location, info = urllib.request.urlretrieve(*args, **kwargs)
    149             try:
    150                 yield file_location, info
    151             finally:
    152                 support.unlink(file_location)
    153 
    154     def test_basic(self):
    155         # Test basic functionality.
    156         with self.urlretrieve(self.logo) as (file_location, info):
    157             self.assertTrue(os.path.exists(file_location), "file location returned by"
    158                             " urlretrieve is not a valid path")
    159             with open(file_location, 'rb') as f:
    160                 self.assertTrue(f.read(), "reading from the file location returned"
    161                                 " by urlretrieve failed")
    162 
    163     def test_specified_path(self):
    164         # Make sure that specifying the location of the file to write to works.
    165         with self.urlretrieve(self.logo,
    166                               support.TESTFN) as (file_location, info):
    167             self.assertEqual(file_location, support.TESTFN)
    168             self.assertTrue(os.path.exists(file_location))
    169             with open(file_location, 'rb') as f:
    170                 self.assertTrue(f.read(), "reading from temporary file failed")
    171 
    172     def test_header(self):
    173         # Make sure header returned as 2nd value from urlretrieve is good.
    174         with self.urlretrieve(self.logo) as (file_location, info):
    175             self.assertIsInstance(info, email.message.Message,
    176                                   "info is not an instance of email.message.Message")
    177 
    178     logo = "http://www.pythontest.net/"
    179 
    180     def test_data_header(self):
    181         with self.urlretrieve(self.logo) as (file_location, fileheaders):
    182             datevalue = fileheaders.get('Date')
    183             dateformat = '%a, %d %b %Y %H:%M:%S GMT'
    184             try:
    185                 time.strptime(datevalue, dateformat)
    186             except ValueError:
    187                 self.fail('Date value not in %r format' % dateformat)
    188 
    189     def test_reporthook(self):
    190         records = []
    191         def recording_reporthook(blocks, block_size, total_size):
    192             records.append((blocks, block_size, total_size))
    193 
    194         with self.urlretrieve(self.logo, reporthook=recording_reporthook) as (
    195                 file_location, fileheaders):
    196             expected_size = int(fileheaders['Content-Length'])
    197 
    198         records_repr = repr(records)  # For use in error messages.
    199         self.assertGreater(len(records), 1, msg="There should always be two "
    200                            "calls; the first one before the transfer starts.")
    201         self.assertEqual(records[0][0], 0)
    202         self.assertGreater(records[0][1], 0,
    203                            msg="block size can't be 0 in %s" % records_repr)
    204         self.assertEqual(records[0][2], expected_size)
    205         self.assertEqual(records[-1][2], expected_size)
    206 
    207         block_sizes = {block_size for _, block_size, _ in records}
    208         self.assertEqual({records[0][1]}, block_sizes,
    209                          msg="block sizes in %s must be equal" % records_repr)
    210         self.assertGreaterEqual(records[-1][0]*records[0][1], expected_size,
    211                                 msg="number of blocks * block size must be"
    212                                 " >= total size in %s" % records_repr)
    213 
    214 
    215 if __name__ == "__main__":
    216     unittest.main()
    217