import httplib
import array
import StringIO
import socket
import errno

import unittest
TestCase = unittest.TestCase

from test import test_support

HOST = test_support.HOST


class FakeSocket:
    """In-memory stand-in for a socket.

    makefile() serves the canned ``text`` through ``fileclass``; everything
    passed to sendall() is accumulated in ``self.data`` for later inspection.
    """

    def __init__(self, text, fileclass=StringIO.StringIO):
        self.text = text
        self.fileclass = fileclass
        self.data = ''

    def sendall(self, data):
        """Record outgoing data instead of sending it."""
        # join() flattens any iterable of strings into one str.
        # NOTE(review): presumably needed so payloads such as
        # array.array('c', ...) (see test_send) can be recorded -- confirm.
        self.data += ''.join(data)

    def makefile(self, mode, bufsize=None):
        """Return a readable file object over the canned text.

        Raises httplib.UnimplementedFileMode for any mode other than
        'r' or 'rb'.
        """
        if mode != 'r' and mode != 'rb':
            raise httplib.UnimplementedFileMode()
        return self.fileclass(self.text)


class EPipeSocket(FakeSocket):
    """FakeSocket whose sendall() fails with EPIPE on a trigger string."""

    def __init__(self, text, pipe_trigger):
        # When sendall() is called with pipe_trigger, raise EPIPE.
        FakeSocket.__init__(self, text)
        self.pipe_trigger = pipe_trigger

    def sendall(self, data):
        """Record data, or raise socket.error(EPIPE) if it has the trigger."""
        if self.pipe_trigger in data:
            raise socket.error(errno.EPIPE, "gotcha")
        self.data += data

    def close(self):
        # No-op: the fake owns no OS-level resource.
        # NOTE(review): presumably present because httplib closes the socket
        # after the failed send -- confirm.
        pass


class NoEOFStringIO(StringIO.StringIO):
    """Like StringIO, but raises AssertionError on EOF.

    This is used below to test that httplib doesn't try to read
    more from the underlying file than it should.
    """
    def read(self, n=-1):
        data = StringIO.StringIO.read(self, n)
        if data == '':
            raise AssertionError('caller tried to read past EOF')
        return data

    def readline(self, length=None):
        data = StringIO.StringIO.readline(self, length)
        if data == '':
            raise AssertionError('caller tried to read past EOF')
        return data


class HeaderTests(TestCase):
    """Tests for the request headers emitted by HTTPConnection."""

    def test_auto_headers(self):
        # Some headers are added automatically, but should not be added by
        # .request() if they are explicitly set.

        class HeaderCountingBuffer(list):
            """List that also counts appended headers by lower-cased name."""
            def __init__(self):
                self.count = {}
            def append(self, item):
                kv = item.split(':')
                if len(kv) > 1:
                    # item is a 'Key: Value' header string
                    lcKey = kv[0].lower()
                    self.count.setdefault(lcKey, 0)
                    self.count[lcKey] += 1
                list.append(self, item)

        # Each header must appear exactly once, whether it was supplied
        # explicitly by the caller or generated automatically.
        for explicit_header in True, False:
            for header in 'Content-length', 'Host', 'Accept-encoding':
                conn = httplib.HTTPConnection('example.com')
                conn.sock = FakeSocket('blahblahblah')
                conn._buffer = HeaderCountingBuffer()

                body = 'spamspamspam'
                headers = {}
                if explicit_header:
                    headers[header] = str(len(body))
                conn.request('POST', '/', body, headers)
                self.assertEqual(conn._buffer.count[header.lower()], 1)

    def test_putheader(self):
        # A non-string header value (an int here) must be rendered as text.
        conn = httplib.HTTPConnection('example.com')
        conn.sock = FakeSocket(None)
        conn.putrequest('GET','/')
        conn.putheader('Content-length',42)
        self.assertIn('Content-length: 42', conn._buffer)

    def test_ipv6host_header(self):
        # The default Host header of an IPv6 transaction should be wrapped
        # in [] if the host is an actual IPv6 address.
        expected = 'GET /foo HTTP/1.1\r\nHost: [2001::]:81\r\n' \
                   'Accept-Encoding: identity\r\n\r\n'
        conn = httplib.HTTPConnection('[2001::]:81')
        sock = FakeSocket('')
        conn.sock = sock
        conn.request('GET', '/foo')
        self.assertTrue(sock.data.startswith(expected))

        expected = 'GET /foo HTTP/1.1\r\nHost: [2001:102A::]\r\n' \
                   'Accept-Encoding: identity\r\n\r\n'
        conn = httplib.HTTPConnection('[2001:102A::]')
        sock = FakeSocket('')
        conn.sock = sock
        conn.request('GET', '/foo')
        self.assertTrue(sock.data.startswith(expected))


class BasicTest(TestCase):
    """Response parsing and request sending tests run against fake sockets."""

    def test_status_lines(self):
        # Test HTTP status lines

        body = "HTTP/1.1 200 Ok\r\n\r\nText"
        sock = FakeSocket(body)
        resp = httplib.HTTPResponse(sock)
        resp.begin()
        self.assertEqual(resp.read(), 'Text')
        self.assertTrue(resp.isclosed())

        # A non-integer status code must be rejected.
        body = "HTTP/1.1 400.100 Not Ok\r\n\r\nText"
        sock = FakeSocket(body)
        resp = httplib.HTTPResponse(sock)
        self.assertRaises(httplib.BadStatusLine, resp.begin)

    def test_bad_status_repr(self):
        # The repr of BadStatusLine('') must show the empty line quoted.
        exc = httplib.BadStatusLine('')
        self.assertEqual(repr(exc), '''BadStatusLine("\'\'",)''')

    def test_partial_reads(self):
        # If we have a length, the system knows when to close itself;
        # same behaviour as when we read the whole thing with read().
        body = "HTTP/1.1 200 Ok\r\nContent-Length: 4\r\n\r\nText"
        sock = FakeSocket(body)
        resp = httplib.HTTPResponse(sock)
        resp.begin()
        self.assertEqual(resp.read(2), 'Te')
        self.assertFalse(resp.isclosed())
        self.assertEqual(resp.read(2), 'xt')
        self.assertTrue(resp.isclosed())

    def test_host_port(self):
        # Check invalid host_port

        for hp in ("www.python.org:abc", "www.python.org:"):
            self.assertRaises(httplib.InvalidURL, httplib.HTTP, hp)

        # Valid host[:port] strings: (input, expected host, expected port).
        for hp, h, p in (("[fe80::207:e9ff:fe9b]:8000", "fe80::207:e9ff:fe9b",
                          8000),
                         ("www.python.org:80", "www.python.org", 80),
                         ("www.python.org", "www.python.org", 80),
                         ("[fe80::207:e9ff:fe9b]", "fe80::207:e9ff:fe9b", 80)):
            http = httplib.HTTP(hp)
            c = http._conn
            if h != c.host:
                self.fail("Host incorrectly parsed: %s != %s" % (h, c.host))
            if p != c.port:
                # Report the parsed port (not the host) on a port mismatch.
                self.fail("Port incorrectly parsed: %s != %s" % (p, c.port))

    def test_response_headers(self):
        # test response with multiple message headers with the same field name.
        text = ('HTTP/1.1 200 OK\r\n'
                'Set-Cookie: Customer="WILE_E_COYOTE";'
                ' Version="1"; Path="/acme"\r\n'
                'Set-Cookie: Part_Number="Rocket_Launcher_0001"; Version="1";'
                ' Path="/acme"\r\n'
                '\r\n'
                'No body\r\n')
        hdr = ('Customer="WILE_E_COYOTE"; Version="1"; Path="/acme"'
               ', '
               'Part_Number="Rocket_Launcher_0001"; Version="1"; Path="/acme"')
        s = FakeSocket(text)
        r = httplib.HTTPResponse(s)
        r.begin()
        cookies = r.getheader("Set-Cookie")
        if cookies != hdr:
            self.fail("multiple headers not combined properly")

    def test_read_head(self):
        # Test that the library doesn't attempt to read any data
        # from a HEAD request.  (Tickles SF bug #622042.)
        sock = FakeSocket(
            'HTTP/1.1 200 OK\r\n'
            'Content-Length: 14432\r\n'
            '\r\n',
            NoEOFStringIO)
        resp = httplib.HTTPResponse(sock, method="HEAD")
        resp.begin()
        if resp.read() != "":
            self.fail("Did not expect response from HEAD request")

    def test_send_file(self):
        expected = 'GET /foo HTTP/1.1\r\nHost: example.com\r\n' \
                   'Accept-Encoding: identity\r\nContent-Length:'

        # This test file itself serves as the file-like request body; the
        # with-block guarantees the handle is closed even if the test fails.
        with open(__file__, 'rb') as body:
            conn = httplib.HTTPConnection('example.com')
            sock = FakeSocket(body)
            conn.sock = sock
            conn.request('GET', '/foo', body)
            self.assertTrue(sock.data.startswith(expected))

    def test_send(self):
        # send() must deliver the same bytes for a str, a char array and a
        # file-like object.
        expected = 'this is a test this is only a test'
        conn = httplib.HTTPConnection('example.com')
        sock = FakeSocket(None)
        conn.sock = sock
        conn.send(expected)
        self.assertEqual(expected, sock.data)
        sock.data = ''
        conn.send(array.array('c', expected))
        self.assertEqual(expected, sock.data)
        sock.data = ''
        conn.send(StringIO.StringIO(expected))
        self.assertEqual(expected, sock.data)

    def test_chunked(self):
        chunked_start = (
            'HTTP/1.1 200 OK\r\n'
            'Transfer-Encoding: chunked\r\n\r\n'
            'a\r\n'
            'hello worl\r\n'
            '1\r\n'
            'd\r\n'
        )
        # A properly terminated chunked body reads back whole.
        sock = FakeSocket(chunked_start + '0\r\n')
        resp = httplib.HTTPResponse(sock, method="GET")
        resp.begin()
        self.assertEqual(resp.read(), 'hello world')
        resp.close()

        # Without a valid terminating chunk, read() must raise
        # IncompleteRead carrying the data received so far.
        for x in ('', 'foo\r\n'):
            sock = FakeSocket(chunked_start + x)
            resp = httplib.HTTPResponse(sock, method="GET")
            resp.begin()
            try:
                resp.read()
            except httplib.IncompleteRead as i:
                self.assertEqual(i.partial, 'hello world')
                self.assertEqual(repr(i),'IncompleteRead(11 bytes read)')
                self.assertEqual(str(i),'IncompleteRead(11 bytes read)')
            else:
                self.fail('IncompleteRead expected')
            finally:
                resp.close()

    def test_chunked_head(self):
        # For a HEAD request the chunked body must not be read at all.
        chunked_start = (
            'HTTP/1.1 200 OK\r\n'
            'Transfer-Encoding: chunked\r\n\r\n'
            'a\r\n'
            'hello world\r\n'
            '1\r\n'
            'd\r\n'
        )
        sock = FakeSocket(chunked_start + '0\r\n')
        resp = httplib.HTTPResponse(sock, method="HEAD")
        resp.begin()
        self.assertEqual(resp.read(), '')
        self.assertEqual(resp.status, 200)
        self.assertEqual(resp.reason, 'OK')
        self.assertTrue(resp.isclosed())

    def test_negative_content_length(self):
        # With a negative Content-Length the whole body is still read.
        sock = FakeSocket('HTTP/1.1 200 OK\r\n'
                          'Content-Length: -1\r\n\r\nHello\r\n')
        resp = httplib.HTTPResponse(sock, method="GET")
        resp.begin()
        self.assertEqual(resp.read(), 'Hello\r\n')
        resp.close()

    def test_incomplete_read(self):
        # A body shorter than Content-Length must raise IncompleteRead that
        # reports both the bytes read and the bytes still expected.
        sock = FakeSocket('HTTP/1.1 200 OK\r\nContent-Length: 10\r\n\r\nHello\r\n')
        resp = httplib.HTTPResponse(sock, method="GET")
        resp.begin()
        try:
            resp.read()
        except httplib.IncompleteRead as i:
            self.assertEqual(i.partial, 'Hello\r\n')
            self.assertEqual(repr(i),
                             "IncompleteRead(7 bytes read, 3 more expected)")
            self.assertEqual(str(i),
                             "IncompleteRead(7 bytes read, 3 more expected)")
        else:
            self.fail('IncompleteRead expected')
        finally:
            resp.close()

    def test_epipe(self):
        # The send fails with EPIPE once the Content-Length header is
        # written, yet the canned response must still be readable afterwards.
        sock = EPipeSocket(
            "HTTP/1.0 401 Authorization Required\r\n"
            "Content-type: text/html\r\n"
            "WWW-Authenticate: Basic realm=\"example\"\r\n",
            b"Content-Length")
        conn = httplib.HTTPConnection("example.com")
        conn.sock = sock
        self.assertRaises(socket.error,
                          lambda: conn.request("PUT", "/url", "body"))
        resp = conn.getresponse()
        self.assertEqual(401, resp.status)
        self.assertEqual("Basic realm=\"example\"",
                         resp.getheader("www-authenticate"))

    def test_filenoattr(self):
        # Just test the fileno attribute in the HTTPResponse Object.
        body = "HTTP/1.1 200 Ok\r\n\r\nText"
        sock = FakeSocket(body)
        resp = httplib.HTTPResponse(sock)
        self.assertTrue(hasattr(resp,'fileno'),
                        'HTTPResponse should expose a fileno attribute')

    # Test lines overflowing the max line size (_MAXLINE in http.client)

    def test_overflowing_status_line(self):
        # Skipped unconditionally; the statements below are kept for
        # reference but never run.
        self.skipTest("disabled for HTTP 0.9 support")
        body = "HTTP/1.1 200 Ok" + "k" * 65536 + "\r\n"
        resp = httplib.HTTPResponse(FakeSocket(body))
        self.assertRaises((httplib.LineTooLong, httplib.BadStatusLine), resp.begin)

    def test_overflowing_header_line(self):
        body = (
            'HTTP/1.1 200 OK\r\n'
            'X-Foo: bar' + 'r' * 65536 + '\r\n\r\n'
        )
        resp = httplib.HTTPResponse(FakeSocket(body))
        self.assertRaises(httplib.LineTooLong, resp.begin)

    def test_overflowing_chunked_line(self):
        body = (
            'HTTP/1.1 200 OK\r\n'
            'Transfer-Encoding: chunked\r\n\r\n'
            + '0' * 65536 + 'a\r\n'
            'hello world\r\n'
            '0\r\n'
        )
        resp = httplib.HTTPResponse(FakeSocket(body))
        resp.begin()
        self.assertRaises(httplib.LineTooLong, resp.read)


class OfflineTest(TestCase):
    """Checks that need neither a fake nor a real socket."""

    def test_responses(self):
        self.assertEqual(httplib.responses[httplib.NOT_FOUND], "Not Found")


class SourceAddressTest(TestCase):
    """Checks that the source_address argument is honoured."""

    def setUp(self):
        # A real listening socket to connect to, plus a separate unused
        # port to request as the client-side source port.
        self.serv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.port = test_support.bind_port(self.serv)
        self.source_port = test_support.find_unused_port()
        self.serv.listen(5)
        self.conn = None

    def tearDown(self):
        if self.conn:
            self.conn.close()
            self.conn = None
        self.serv.close()
        self.serv = None

    def testHTTPConnectionSourceAddress(self):
        # The connected socket's local port must be the requested one.
        self.conn = httplib.HTTPConnection(HOST, self.port,
                source_address=('', self.source_port))
        self.conn.connect()
        self.assertEqual(self.conn.sock.getsockname()[1], self.source_port)

    @unittest.skipIf(not hasattr(httplib, 'HTTPSConnection'),
                     'httplib.HTTPSConnection not defined')
    def testHTTPSConnectionSourceAddress(self):
        self.conn = httplib.HTTPSConnection(HOST, self.port,
                source_address=('', self.source_port))
        # We don't test anything here other the constructor not barfing as
        # this code doesn't deal with setting up an active running SSL server
        # for an ssl_wrapped connect() to actually return from.


class TimeoutTest(TestCase):
    """Checks how the timeout argument reaches the underlying socket."""

    # Port of the listening socket; assigned in setUp() and also read by
    # HTTPSTimeoutTest.
    PORT = None

    def setUp(self):
        self.serv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        TimeoutTest.PORT = test_support.bind_port(self.serv)
        self.serv.listen(5)

    def tearDown(self):
        self.serv.close()
        self.serv = None

    def testTimeoutAttribute(self):
        '''This will prove that the timeout gets through
        HTTPConnection and into the socket.
        '''
        # default -- use global socket timeout
        self.assertIsNone(socket.getdefaulttimeout())
        socket.setdefaulttimeout(30)
        try:
            httpConn = httplib.HTTPConnection(HOST, TimeoutTest.PORT)
            httpConn.connect()
        finally:
            # Always restore the global default so other tests are unaffected.
            socket.setdefaulttimeout(None)
        self.assertEqual(httpConn.sock.gettimeout(), 30)
        httpConn.close()

        # no timeout -- do not use global socket default
        self.assertIsNone(socket.getdefaulttimeout())
        socket.setdefaulttimeout(30)
        try:
            httpConn = httplib.HTTPConnection(HOST, TimeoutTest.PORT,
                                              timeout=None)
            httpConn.connect()
        finally:
            socket.setdefaulttimeout(None)
        self.assertEqual(httpConn.sock.gettimeout(), None)
        httpConn.close()

        # a value
        httpConn = httplib.HTTPConnection(HOST, TimeoutTest.PORT, timeout=30)
        httpConn.connect()
        self.assertEqual(httpConn.sock.gettimeout(), 30)
        httpConn.close()


class HTTPSTimeoutTest(TestCase):
# XXX Here should be tests for HTTPS, there isn't any right now!

    def test_attributes(self):
        # simple test to check it's storing it
        # Only the constructor is exercised; no connection is attempted.
        if hasattr(httplib, 'HTTPSConnection'):
            h = httplib.HTTPSConnection(HOST, TimeoutTest.PORT, timeout=30)
            self.assertEqual(h.timeout, 30)


def test_main(verbose=None):
    """Run every TestCase class of this module through test_support."""
    test_support.run_unittest(HeaderTests, OfflineTest, BasicTest, TimeoutTest,
                              HTTPSTimeoutTest, SourceAddressTest)

if __name__ == '__main__':
    test_main()