1 import unittest 2 from test import support 3 from test.test_urllib2 import sanepathname2url 4 5 import os 6 import socket 7 import urllib.error 8 import urllib.request 9 import sys 10 11 support.requires("network") 12 13 TIMEOUT = 60 # seconds 14 15 16 def _retry_thrice(func, exc, *args, **kwargs): 17 for i in range(3): 18 try: 19 return func(*args, **kwargs) 20 except exc as e: 21 last_exc = e 22 continue 23 raise last_exc 24 25 def _wrap_with_retry_thrice(func, exc): 26 def wrapped(*args, **kwargs): 27 return _retry_thrice(func, exc, *args, **kwargs) 28 return wrapped 29 30 # Connecting to remote hosts is flaky. Make it more robust by retrying 31 # the connection several times. 32 _urlopen_with_retry = _wrap_with_retry_thrice(urllib.request.urlopen, 33 urllib.error.URLError) 34 35 36 class AuthTests(unittest.TestCase): 37 """Tests urllib2 authentication features.""" 38 39 ## Disabled at the moment since there is no page under python.org which 40 ## could be used to HTTP authentication. 41 # 42 # def test_basic_auth(self): 43 # import http.client 44 # 45 # test_url = "http://www.python.org/test/test_urllib2/basic_auth" 46 # test_hostport = "www.python.org" 47 # test_realm = 'Test Realm' 48 # test_user = 'test.test_urllib2net' 49 # test_password = 'blah' 50 # 51 # # failure 52 # try: 53 # _urlopen_with_retry(test_url) 54 # except urllib2.HTTPError, exc: 55 # self.assertEqual(exc.code, 401) 56 # else: 57 # self.fail("urlopen() should have failed with 401") 58 # 59 # # success 60 # auth_handler = urllib2.HTTPBasicAuthHandler() 61 # auth_handler.add_password(test_realm, test_hostport, 62 # test_user, test_password) 63 # opener = urllib2.build_opener(auth_handler) 64 # f = opener.open('http://localhost/') 65 # response = _urlopen_with_retry("http://www.python.org/") 66 # 67 # # The 'userinfo' URL component is deprecated by RFC 3986 for security 68 # # reasons, let's not implement it! (it's already implemented for proxy 69 # # specification strings (that is, URLs or authorities specifying a 70 # # proxy), so we must keep that) 71 # self.assertRaises(http.client.InvalidURL, 72 # urllib2.urlopen, "http://evil:thing@example.com") 73 74 75 class CloseSocketTest(unittest.TestCase): 76 77 def test_close(self): 78 # calling .close() on urllib2's response objects should close the 79 # underlying socket 80 url = "http://www.example.com/" 81 with support.transient_internet(url): 82 response = _urlopen_with_retry(url) 83 sock = response.fp 84 self.assertFalse(sock.closed) 85 response.close() 86 self.assertTrue(sock.closed) 87 88 class OtherNetworkTests(unittest.TestCase): 89 def setUp(self): 90 if 0: # for debugging 91 import logging 92 logger = logging.getLogger("test_urllib2net") 93 logger.addHandler(logging.StreamHandler()) 94 95 # XXX The rest of these tests aren't very good -- they don't check much. 96 # They do sometimes catch some major disasters, though. 97 98 def test_ftp(self): 99 urls = [ 100 'ftp://ftp.debian.org/debian/README', 101 ('ftp://ftp.debian.org/debian/non-existent-file', 102 None, urllib.error.URLError), 103 ] 104 self._test_urls(urls, self._extra_handlers()) 105 106 def test_file(self): 107 TESTFN = support.TESTFN 108 f = open(TESTFN, 'w') 109 try: 110 f.write('hi there\n') 111 f.close() 112 urls = [ 113 'file:' + sanepathname2url(os.path.abspath(TESTFN)), 114 ('file:///nonsensename/etc/passwd', None, 115 urllib.error.URLError), 116 ] 117 self._test_urls(urls, self._extra_handlers(), retry=True) 118 finally: 119 os.remove(TESTFN) 120 121 self.assertRaises(ValueError, urllib.request.urlopen,'./relative_path/to/file') 122 123 # XXX Following test depends on machine configurations that are internal 124 # to CNRI. Need to set up a public server with the right authentication 125 # configuration for test purposes. 126 127 ## def test_cnri(self): 128 ## if socket.gethostname() == 'bitdiddle': 129 ## localhost = 'bitdiddle.cnri.reston.va.us' 130 ## elif socket.gethostname() == 'bitdiddle.concentric.net': 131 ## localhost = 'localhost' 132 ## else: 133 ## localhost = None 134 ## if localhost is not None: 135 ## urls = [ 136 ## 'file://%s/etc/passwd' % localhost, 137 ## 'http://%s/simple/' % localhost, 138 ## 'http://%s/digest/' % localhost, 139 ## 'http://%s/not/found.h' % localhost, 140 ## ] 141 142 ## bauth = HTTPBasicAuthHandler() 143 ## bauth.add_password('basic_test_realm', localhost, 'jhylton', 144 ## 'password') 145 ## dauth = HTTPDigestAuthHandler() 146 ## dauth.add_password('digest_test_realm', localhost, 'jhylton', 147 ## 'password') 148 149 ## self._test_urls(urls, self._extra_handlers()+[bauth, dauth]) 150 151 def test_urlwithfrag(self): 152 urlwith_frag = "http://www.pythontest.net/index.html#frag" 153 with support.transient_internet(urlwith_frag): 154 req = urllib.request.Request(urlwith_frag) 155 res = urllib.request.urlopen(req) 156 self.assertEqual(res.geturl(), 157 "http://www.pythontest.net/index.html#frag") 158 159 def test_redirect_url_withfrag(self): 160 redirect_url_with_frag = "http://www.pythontest.net/redir/with_frag/" 161 with support.transient_internet(redirect_url_with_frag): 162 req = urllib.request.Request(redirect_url_with_frag) 163 res = urllib.request.urlopen(req) 164 self.assertEqual(res.geturl(), 165 "http://www.pythontest.net/elsewhere/#frag") 166 167 def test_custom_headers(self): 168 url = "http://www.example.com" 169 with support.transient_internet(url): 170 opener = urllib.request.build_opener() 171 request = urllib.request.Request(url) 172 self.assertFalse(request.header_items()) 173 opener.open(request) 174 self.assertTrue(request.header_items()) 175 self.assertTrue(request.has_header('User-agent')) 176 request.add_header('User-Agent','Test-Agent') 177 opener.open(request) 178 self.assertEqual(request.get_header('User-agent'),'Test-Agent') 179 180 def test_sites_no_connection_close(self): 181 # Some sites do not send Connection: close header. 182 # Verify that those work properly. (#issue12576) 183 184 URL = 'http://www.imdb.com' # mangles Connection:close 185 186 with support.transient_internet(URL): 187 try: 188 with urllib.request.urlopen(URL) as res: 189 pass 190 except ValueError as e: 191 self.fail("urlopen failed for site not sending \ 192 Connection:close") 193 else: 194 self.assertTrue(res) 195 196 req = urllib.request.urlopen(URL) 197 res = req.read() 198 self.assertTrue(res) 199 200 def _test_urls(self, urls, handlers, retry=True): 201 import time 202 import logging 203 debug = logging.getLogger("test_urllib2").debug 204 205 urlopen = urllib.request.build_opener(*handlers).open 206 if retry: 207 urlopen = _wrap_with_retry_thrice(urlopen, urllib.error.URLError) 208 209 for url in urls: 210 with self.subTest(url=url): 211 if isinstance(url, tuple): 212 url, req, expected_err = url 213 else: 214 req = expected_err = None 215 216 with support.transient_internet(url): 217 try: 218 f = urlopen(url, req, TIMEOUT) 219 # urllib.error.URLError is a subclass of OSError 220 except OSError as err: 221 if expected_err: 222 msg = ("Didn't get expected error(s) %s for %s %s, got %s: %s" % 223 (expected_err, url, req, type(err), err)) 224 self.assertIsInstance(err, expected_err, msg) 225 else: 226 raise 227 else: 228 try: 229 with support.time_out, \ 230 support.socket_peer_reset, \ 231 support.ioerror_peer_reset: 232 buf = f.read() 233 debug("read %d bytes" % len(buf)) 234 except socket.timeout: 235 print("<timeout: %s>" % url, file=sys.stderr) 236 f.close() 237 time.sleep(0.1) 238 239 def _extra_handlers(self): 240 handlers = [] 241 242 cfh = urllib.request.CacheFTPHandler() 243 self.addCleanup(cfh.clear_cache) 244 cfh.setTimeout(1) 245 handlers.append(cfh) 246 247 return handlers 248 249 250 class TimeoutTest(unittest.TestCase): 251 def test_http_basic(self): 252 self.assertIsNone(socket.getdefaulttimeout()) 253 url = "http://www.example.com" 254 with support.transient_internet(url, timeout=None): 255 u = _urlopen_with_retry(url) 256 self.addCleanup(u.close) 257 self.assertIsNone(u.fp.raw._sock.gettimeout()) 258 259 def test_http_default_timeout(self): 260 self.assertIsNone(socket.getdefaulttimeout()) 261 url = "http://www.example.com" 262 with support.transient_internet(url): 263 socket.setdefaulttimeout(60) 264 try: 265 u = _urlopen_with_retry(url) 266 self.addCleanup(u.close) 267 finally: 268 socket.setdefaulttimeout(None) 269 self.assertEqual(u.fp.raw._sock.gettimeout(), 60) 270 271 def test_http_no_timeout(self): 272 self.assertIsNone(socket.getdefaulttimeout()) 273 url = "http://www.example.com" 274 with support.transient_internet(url): 275 socket.setdefaulttimeout(60) 276 try: 277 u = _urlopen_with_retry(url, timeout=None) 278 self.addCleanup(u.close) 279 finally: 280 socket.setdefaulttimeout(None) 281 self.assertIsNone(u.fp.raw._sock.gettimeout()) 282 283 def test_http_timeout(self): 284 url = "http://www.example.com" 285 with support.transient_internet(url): 286 u = _urlopen_with_retry(url, timeout=120) 287 self.addCleanup(u.close) 288 self.assertEqual(u.fp.raw._sock.gettimeout(), 120) 289 290 FTP_HOST = 'ftp://ftp.debian.org/debian/' 291 292 def test_ftp_basic(self): 293 self.assertIsNone(socket.getdefaulttimeout()) 294 with support.transient_internet(self.FTP_HOST, timeout=None): 295 u = _urlopen_with_retry(self.FTP_HOST) 296 self.addCleanup(u.close) 297 self.assertIsNone(u.fp.fp.raw._sock.gettimeout()) 298 299 def test_ftp_default_timeout(self): 300 self.assertIsNone(socket.getdefaulttimeout()) 301 with support.transient_internet(self.FTP_HOST): 302 socket.setdefaulttimeout(60) 303 try: 304 u = _urlopen_with_retry(self.FTP_HOST) 305 self.addCleanup(u.close) 306 finally: 307 socket.setdefaulttimeout(None) 308 self.assertEqual(u.fp.fp.raw._sock.gettimeout(), 60) 309 310 def test_ftp_no_timeout(self): 311 self.assertIsNone(socket.getdefaulttimeout()) 312 with support.transient_internet(self.FTP_HOST): 313 socket.setdefaulttimeout(60) 314 try: 315 u = _urlopen_with_retry(self.FTP_HOST, timeout=None) 316 self.addCleanup(u.close) 317 finally: 318 socket.setdefaulttimeout(None) 319 self.assertIsNone(u.fp.fp.raw._sock.gettimeout()) 320 321 def test_ftp_timeout(self): 322 with support.transient_internet(self.FTP_HOST): 323 u = _urlopen_with_retry(self.FTP_HOST, timeout=60) 324 self.addCleanup(u.close) 325 self.assertEqual(u.fp.fp.raw._sock.gettimeout(), 60) 326 327 328 if __name__ == "__main__": 329 unittest.main() 330