1 import errno 2 from http import client 3 import io 4 import itertools 5 import os 6 import array 7 import socket 8 9 import unittest 10 TestCase = unittest.TestCase 11 12 from test import support 13 14 here = os.path.dirname(__file__) 15 # Self-signed cert file for 'localhost' 16 CERT_localhost = os.path.join(here, 'keycert.pem') 17 # Self-signed cert file for 'fakehostname' 18 CERT_fakehostname = os.path.join(here, 'keycert2.pem') 19 # Self-signed cert file for self-signed.pythontest.net 20 CERT_selfsigned_pythontestdotnet = os.path.join(here, 'selfsigned_pythontestdotnet.pem') 21 22 # constants for testing chunked encoding 23 chunked_start = ( 24 'HTTP/1.1 200 OK\r\n' 25 'Transfer-Encoding: chunked\r\n\r\n' 26 'a\r\n' 27 'hello worl\r\n' 28 '3\r\n' 29 'd! \r\n' 30 '8\r\n' 31 'and now \r\n' 32 '22\r\n' 33 'for something completely different\r\n' 34 ) 35 chunked_expected = b'hello world! and now for something completely different' 36 chunk_extension = ";foo=bar" 37 last_chunk = "0\r\n" 38 last_chunk_extended = "0" + chunk_extension + "\r\n" 39 trailers = "X-Dummy: foo\r\nX-Dumm2: bar\r\n" 40 chunked_end = "\r\n" 41 42 HOST = support.HOST 43 44 class FakeSocket: 45 def __init__(self, text, fileclass=io.BytesIO, host=None, port=None): 46 if isinstance(text, str): 47 text = text.encode("ascii") 48 self.text = text 49 self.fileclass = fileclass 50 self.data = b'' 51 self.sendall_calls = 0 52 self.file_closed = False 53 self.host = host 54 self.port = port 55 56 def sendall(self, data): 57 self.sendall_calls += 1 58 self.data += data 59 60 def makefile(self, mode, bufsize=None): 61 if mode != 'r' and mode != 'rb': 62 raise client.UnimplementedFileMode() 63 # keep the file around so we can check how much was read from it 64 self.file = self.fileclass(self.text) 65 self.file.close = self.file_close #nerf close () 66 return self.file 67 68 def file_close(self): 69 self.file_closed = True 70 71 def close(self): 72 pass 73 74 def setsockopt(self, level, optname, value): 75 pass 76 77 class EPipeSocket(FakeSocket): 78 79 def __init__(self, text, pipe_trigger): 80 # When sendall() is called with pipe_trigger, raise EPIPE. 81 FakeSocket.__init__(self, text) 82 self.pipe_trigger = pipe_trigger 83 84 def sendall(self, data): 85 if self.pipe_trigger in data: 86 raise OSError(errno.EPIPE, "gotcha") 87 self.data += data 88 89 def close(self): 90 pass 91 92 class NoEOFBytesIO(io.BytesIO): 93 """Like BytesIO, but raises AssertionError on EOF. 94 95 This is used below to test that http.client doesn't try to read 96 more from the underlying file than it should. 97 """ 98 def read(self, n=-1): 99 data = io.BytesIO.read(self, n) 100 if data == b'': 101 raise AssertionError('caller tried to read past EOF') 102 return data 103 104 def readline(self, length=None): 105 data = io.BytesIO.readline(self, length) 106 if data == b'': 107 raise AssertionError('caller tried to read past EOF') 108 return data 109 110 class FakeSocketHTTPConnection(client.HTTPConnection): 111 """HTTPConnection subclass using FakeSocket; counts connect() calls""" 112 113 def __init__(self, *args): 114 self.connections = 0 115 super().__init__('example.com') 116 self.fake_socket_args = args 117 self._create_connection = self.create_connection 118 119 def connect(self): 120 """Count the number of times connect() is invoked""" 121 self.connections += 1 122 return super().connect() 123 124 def create_connection(self, *pos, **kw): 125 return FakeSocket(*self.fake_socket_args) 126 127 class HeaderTests(TestCase): 128 def test_auto_headers(self): 129 # Some headers are added automatically, but should not be added by 130 # .request() if they are explicitly set. 131 132 class HeaderCountingBuffer(list): 133 def __init__(self): 134 self.count = {} 135 def append(self, item): 136 kv = item.split(b':') 137 if len(kv) > 1: 138 # item is a 'Key: Value' header string 139 lcKey = kv[0].decode('ascii').lower() 140 self.count.setdefault(lcKey, 0) 141 self.count[lcKey] += 1 142 list.append(self, item) 143 144 for explicit_header in True, False: 145 for header in 'Content-length', 'Host', 'Accept-encoding': 146 conn = client.HTTPConnection('example.com') 147 conn.sock = FakeSocket('blahblahblah') 148 conn._buffer = HeaderCountingBuffer() 149 150 body = 'spamspamspam' 151 headers = {} 152 if explicit_header: 153 headers[header] = str(len(body)) 154 conn.request('POST', '/', body, headers) 155 self.assertEqual(conn._buffer.count[header.lower()], 1) 156 157 def test_content_length_0(self): 158 159 class ContentLengthChecker(list): 160 def __init__(self): 161 list.__init__(self) 162 self.content_length = None 163 def append(self, item): 164 kv = item.split(b':', 1) 165 if len(kv) > 1 and kv[0].lower() == b'content-length': 166 self.content_length = kv[1].strip() 167 list.append(self, item) 168 169 # Here, we're testing that methods expecting a body get a 170 # content-length set to zero if the body is empty (either None or '') 171 bodies = (None, '') 172 methods_with_body = ('PUT', 'POST', 'PATCH') 173 for method, body in itertools.product(methods_with_body, bodies): 174 conn = client.HTTPConnection('example.com') 175 conn.sock = FakeSocket(None) 176 conn._buffer = ContentLengthChecker() 177 conn.request(method, '/', body) 178 self.assertEqual( 179 conn._buffer.content_length, b'0', 180 'Header Content-Length incorrect on {}'.format(method) 181 ) 182 183 # For these methods, we make sure that content-length is not set when 184 # the body is None because it might cause unexpected behaviour on the 185 # server. 186 methods_without_body = ( 187 'GET', 'CONNECT', 'DELETE', 'HEAD', 'OPTIONS', 'TRACE', 188 ) 189 for method in methods_without_body: 190 conn = client.HTTPConnection('example.com') 191 conn.sock = FakeSocket(None) 192 conn._buffer = ContentLengthChecker() 193 conn.request(method, '/', None) 194 self.assertEqual( 195 conn._buffer.content_length, None, 196 'Header Content-Length set for empty body on {}'.format(method) 197 ) 198 199 # If the body is set to '', that's considered to be "present but 200 # empty" rather than "missing", so content length would be set, even 201 # for methods that don't expect a body. 202 for method in methods_without_body: 203 conn = client.HTTPConnection('example.com') 204 conn.sock = FakeSocket(None) 205 conn._buffer = ContentLengthChecker() 206 conn.request(method, '/', '') 207 self.assertEqual( 208 conn._buffer.content_length, b'0', 209 'Header Content-Length incorrect on {}'.format(method) 210 ) 211 212 # If the body is set, make sure Content-Length is set. 213 for method in itertools.chain(methods_without_body, methods_with_body): 214 conn = client.HTTPConnection('example.com') 215 conn.sock = FakeSocket(None) 216 conn._buffer = ContentLengthChecker() 217 conn.request(method, '/', ' ') 218 self.assertEqual( 219 conn._buffer.content_length, b'1', 220 'Header Content-Length incorrect on {}'.format(method) 221 ) 222 223 def test_putheader(self): 224 conn = client.HTTPConnection('example.com') 225 conn.sock = FakeSocket(None) 226 conn.putrequest('GET','/') 227 conn.putheader('Content-length', 42) 228 self.assertIn(b'Content-length: 42', conn._buffer) 229 230 conn.putheader('Foo', ' bar ') 231 self.assertIn(b'Foo: bar ', conn._buffer) 232 conn.putheader('Bar', '\tbaz\t') 233 self.assertIn(b'Bar: \tbaz\t', conn._buffer) 234 conn.putheader('Authorization', 'Bearer mytoken') 235 self.assertIn(b'Authorization: Bearer mytoken', conn._buffer) 236 conn.putheader('IterHeader', 'IterA', 'IterB') 237 self.assertIn(b'IterHeader: IterA\r\n\tIterB', conn._buffer) 238 conn.putheader('LatinHeader', b'\xFF') 239 self.assertIn(b'LatinHeader: \xFF', conn._buffer) 240 conn.putheader('Utf8Header', b'\xc3\x80') 241 self.assertIn(b'Utf8Header: \xc3\x80', conn._buffer) 242 conn.putheader('C1-Control', b'next\x85line') 243 self.assertIn(b'C1-Control: next\x85line', conn._buffer) 244 conn.putheader('Embedded-Fold-Space', 'is\r\n allowed') 245 self.assertIn(b'Embedded-Fold-Space: is\r\n allowed', conn._buffer) 246 conn.putheader('Embedded-Fold-Tab', 'is\r\n\tallowed') 247 self.assertIn(b'Embedded-Fold-Tab: is\r\n\tallowed', conn._buffer) 248 conn.putheader('Key Space', 'value') 249 self.assertIn(b'Key Space: value', conn._buffer) 250 conn.putheader('KeySpace ', 'value') 251 self.assertIn(b'KeySpace : value', conn._buffer) 252 conn.putheader(b'Nonbreak\xa0Space', 'value') 253 self.assertIn(b'Nonbreak\xa0Space: value', conn._buffer) 254 conn.putheader(b'\xa0NonbreakSpace', 'value') 255 self.assertIn(b'\xa0NonbreakSpace: value', conn._buffer) 256 257 def test_ipv6host_header(self): 258 # Default host header on IPv6 transaction should be wrapped by [] if 259 # it is an IPv6 address 260 expected = b'GET /foo HTTP/1.1\r\nHost: [2001::]:81\r\n' \ 261 b'Accept-Encoding: identity\r\n\r\n' 262 conn = client.HTTPConnection('[2001::]:81') 263 sock = FakeSocket('') 264 conn.sock = sock 265 conn.request('GET', '/foo') 266 self.assertTrue(sock.data.startswith(expected)) 267 268 expected = b'GET /foo HTTP/1.1\r\nHost: [2001:102A::]\r\n' \ 269 b'Accept-Encoding: identity\r\n\r\n' 270 conn = client.HTTPConnection('[2001:102A::]') 271 sock = FakeSocket('') 272 conn.sock = sock 273 conn.request('GET', '/foo') 274 self.assertTrue(sock.data.startswith(expected)) 275 276 def test_malformed_headers_coped_with(self): 277 # Issue 19996 278 body = "HTTP/1.1 200 OK\r\nFirst: val\r\n: nval\r\nSecond: val\r\n\r\n" 279 sock = FakeSocket(body) 280 resp = client.HTTPResponse(sock) 281 resp.begin() 282 283 self.assertEqual(resp.getheader('First'), 'val') 284 self.assertEqual(resp.getheader('Second'), 'val') 285 286 def test_parse_all_octets(self): 287 # Ensure no valid header field octet breaks the parser 288 body = ( 289 b'HTTP/1.1 200 OK\r\n' 290 b"!#$%&'*+-.^_`|~: value\r\n" # Special token characters 291 b'VCHAR: ' + bytes(range(0x21, 0x7E + 1)) + b'\r\n' 292 b'obs-text: ' + bytes(range(0x80, 0xFF + 1)) + b'\r\n' 293 b'obs-fold: text\r\n' 294 b' folded with space\r\n' 295 b'\tfolded with tab\r\n' 296 b'Content-Length: 0\r\n' 297 b'\r\n' 298 ) 299 sock = FakeSocket(body) 300 resp = client.HTTPResponse(sock) 301 resp.begin() 302 self.assertEqual(resp.getheader('Content-Length'), '0') 303 self.assertEqual(resp.msg['Content-Length'], '0') 304 self.assertEqual(resp.getheader("!#$%&'*+-.^_`|~"), 'value') 305 self.assertEqual(resp.msg["!#$%&'*+-.^_`|~"], 'value') 306 vchar = ''.join(map(chr, range(0x21, 0x7E + 1))) 307 self.assertEqual(resp.getheader('VCHAR'), vchar) 308 self.assertEqual(resp.msg['VCHAR'], vchar) 309 self.assertIsNotNone(resp.getheader('obs-text')) 310 self.assertIn('obs-text', resp.msg) 311 for folded in (resp.getheader('obs-fold'), resp.msg['obs-fold']): 312 self.assertTrue(folded.startswith('text')) 313 self.assertIn(' folded with space', folded) 314 self.assertTrue(folded.endswith('folded with tab')) 315 316 def test_invalid_headers(self): 317 conn = client.HTTPConnection('example.com') 318 conn.sock = FakeSocket('') 319 conn.putrequest('GET', '/') 320 321 # http://tools.ietf.org/html/rfc7230#section-3.2.4, whitespace is no 322 # longer allowed in header names 323 cases = ( 324 (b'Invalid\r\nName', b'ValidValue'), 325 (b'Invalid\rName', b'ValidValue'), 326 (b'Invalid\nName', b'ValidValue'), 327 (b'\r\nInvalidName', b'ValidValue'), 328 (b'\rInvalidName', b'ValidValue'), 329 (b'\nInvalidName', b'ValidValue'), 330 (b' InvalidName', b'ValidValue'), 331 (b'\tInvalidName', b'ValidValue'), 332 (b'Invalid:Name', b'ValidValue'), 333 (b':InvalidName', b'ValidValue'), 334 (b'ValidName', b'Invalid\r\nValue'), 335 (b'ValidName', b'Invalid\rValue'), 336 (b'ValidName', b'Invalid\nValue'), 337 (b'ValidName', b'InvalidValue\r\n'), 338 (b'ValidName', b'InvalidValue\r'), 339 (b'ValidName', b'InvalidValue\n'), 340 ) 341 for name, value in cases: 342 with self.subTest((name, value)): 343 with self.assertRaisesRegex(ValueError, 'Invalid header'): 344 conn.putheader(name, value) 345 346 347 class TransferEncodingTest(TestCase): 348 expected_body = b"It's just a flesh wound" 349 350 def test_endheaders_chunked(self): 351 conn = client.HTTPConnection('example.com') 352 conn.sock = FakeSocket(b'') 353 conn.putrequest('POST', '/') 354 conn.endheaders(self._make_body(), encode_chunked=True) 355 356 _, _, body = self._parse_request(conn.sock.data) 357 body = self._parse_chunked(body) 358 self.assertEqual(body, self.expected_body) 359 360 def test_explicit_headers(self): 361 # explicit chunked 362 conn = client.HTTPConnection('example.com') 363 conn.sock = FakeSocket(b'') 364 # this shouldn't actually be automatically chunk-encoded because the 365 # calling code has explicitly stated that it's taking care of it 366 conn.request( 367 'POST', '/', self._make_body(), {'Transfer-Encoding': 'chunked'}) 368 369 _, headers, body = self._parse_request(conn.sock.data) 370 self.assertNotIn('content-length', [k.lower() for k in headers.keys()]) 371 self.assertEqual(headers['Transfer-Encoding'], 'chunked') 372 self.assertEqual(body, self.expected_body) 373 374 # explicit chunked, string body 375 conn = client.HTTPConnection('example.com') 376 conn.sock = FakeSocket(b'') 377 conn.request( 378 'POST', '/', self.expected_body.decode('latin-1'), 379 {'Transfer-Encoding': 'chunked'}) 380 381 _, headers, body = self._parse_request(conn.sock.data) 382 self.assertNotIn('content-length', [k.lower() for k in headers.keys()]) 383 self.assertEqual(headers['Transfer-Encoding'], 'chunked') 384 self.assertEqual(body, self.expected_body) 385 386 # User-specified TE, but request() does the chunk encoding 387 conn = client.HTTPConnection('example.com') 388 conn.sock = FakeSocket(b'') 389 conn.request('POST', '/', 390 headers={'Transfer-Encoding': 'gzip, chunked'}, 391 encode_chunked=True, 392 body=self._make_body()) 393 _, headers, body = self._parse_request(conn.sock.data) 394 self.assertNotIn('content-length', [k.lower() for k in headers]) 395 self.assertEqual(headers['Transfer-Encoding'], 'gzip, chunked') 396 self.assertEqual(self._parse_chunked(body), self.expected_body) 397 398 def test_request(self): 399 for empty_lines in (False, True,): 400 conn = client.HTTPConnection('example.com') 401 conn.sock = FakeSocket(b'') 402 conn.request( 403 'POST', '/', self._make_body(empty_lines=empty_lines)) 404 405 _, headers, body = self._parse_request(conn.sock.data) 406 body = self._parse_chunked(body) 407 self.assertEqual(body, self.expected_body) 408 self.assertEqual(headers['Transfer-Encoding'], 'chunked') 409 410 # Content-Length and Transfer-Encoding SHOULD not be sent in the 411 # same request 412 self.assertNotIn('content-length', [k.lower() for k in headers]) 413 414 def test_empty_body(self): 415 # Zero-length iterable should be treated like any other iterable 416 conn = client.HTTPConnection('example.com') 417 conn.sock = FakeSocket(b'') 418 conn.request('POST', '/', ()) 419 _, headers, body = self._parse_request(conn.sock.data) 420 self.assertEqual(headers['Transfer-Encoding'], 'chunked') 421 self.assertNotIn('content-length', [k.lower() for k in headers]) 422 self.assertEqual(body, b"0\r\n\r\n") 423 424 def _make_body(self, empty_lines=False): 425 lines = self.expected_body.split(b' ') 426 for idx, line in enumerate(lines): 427 # for testing handling empty lines 428 if empty_lines and idx % 2: 429 yield b'' 430 if idx < len(lines) - 1: 431 yield line + b' ' 432 else: 433 yield line 434 435 def _parse_request(self, data): 436 lines = data.split(b'\r\n') 437 request = lines[0] 438 headers = {} 439 n = 1 440 while n < len(lines) and len(lines[n]) > 0: 441 key, val = lines[n].split(b':') 442 key = key.decode('latin-1').strip() 443 headers[key] = val.decode('latin-1').strip() 444 n += 1 445 446 return request, headers, b'\r\n'.join(lines[n + 1:]) 447 448 def _parse_chunked(self, data): 449 body = [] 450 trailers = {} 451 n = 0 452 lines = data.split(b'\r\n') 453 # parse body 454 while True: 455 size, chunk = lines[n:n+2] 456 size = int(size, 16) 457 458 if size == 0: 459 n += 1 460 break 461 462 self.assertEqual(size, len(chunk)) 463 body.append(chunk) 464 465 n += 2 466 # we /should/ hit the end chunk, but check against the size of 467 # lines so we're not stuck in an infinite loop should we get 468 # malformed data 469 if n > len(lines): 470 break 471 472 return b''.join(body) 473 474 475 class BasicTest(TestCase): 476 def test_status_lines(self): 477 # Test HTTP status lines 478 479 body = "HTTP/1.1 200 Ok\r\n\r\nText" 480 sock = FakeSocket(body) 481 resp = client.HTTPResponse(sock) 482 resp.begin() 483 self.assertEqual(resp.read(0), b'') # Issue #20007 484 self.assertFalse(resp.isclosed()) 485 self.assertFalse(resp.closed) 486 self.assertEqual(resp.read(), b"Text") 487 self.assertTrue(resp.isclosed()) 488 self.assertFalse(resp.closed) 489 resp.close() 490 self.assertTrue(resp.closed) 491 492 body = "HTTP/1.1 400.100 Not Ok\r\n\r\nText" 493 sock = FakeSocket(body) 494 resp = client.HTTPResponse(sock) 495 self.assertRaises(client.BadStatusLine, resp.begin) 496 497 def test_bad_status_repr(self): 498 exc = client.BadStatusLine('') 499 self.assertEqual(repr(exc), '''BadStatusLine("\'\'",)''') 500 501 def test_partial_reads(self): 502 # if we have Content-Length, HTTPResponse knows when to close itself, 503 # the same behaviour as when we read the whole thing with read() 504 body = "HTTP/1.1 200 Ok\r\nContent-Length: 4\r\n\r\nText" 505 sock = FakeSocket(body) 506 resp = client.HTTPResponse(sock) 507 resp.begin() 508 self.assertEqual(resp.read(2), b'Te') 509 self.assertFalse(resp.isclosed()) 510 self.assertEqual(resp.read(2), b'xt') 511 self.assertTrue(resp.isclosed()) 512 self.assertFalse(resp.closed) 513 resp.close() 514 self.assertTrue(resp.closed) 515 516 def test_mixed_reads(self): 517 # readline() should update the remaining length, so that read() knows 518 # how much data is left and does not raise IncompleteRead 519 body = "HTTP/1.1 200 Ok\r\nContent-Length: 13\r\n\r\nText\r\nAnother" 520 sock = FakeSocket(body) 521 resp = client.HTTPResponse(sock) 522 resp.begin() 523 self.assertEqual(resp.readline(), b'Text\r\n') 524 self.assertFalse(resp.isclosed()) 525 self.assertEqual(resp.read(), b'Another') 526 self.assertTrue(resp.isclosed()) 527 self.assertFalse(resp.closed) 528 resp.close() 529 self.assertTrue(resp.closed) 530 531 def test_partial_readintos(self): 532 # if we have Content-Length, HTTPResponse knows when to close itself, 533 # the same behaviour as when we read the whole thing with read() 534 body = "HTTP/1.1 200 Ok\r\nContent-Length: 4\r\n\r\nText" 535 sock = FakeSocket(body) 536 resp = client.HTTPResponse(sock) 537 resp.begin() 538 b = bytearray(2) 539 n = resp.readinto(b) 540 self.assertEqual(n, 2) 541 self.assertEqual(bytes(b), b'Te') 542 self.assertFalse(resp.isclosed()) 543 n = resp.readinto(b) 544 self.assertEqual(n, 2) 545 self.assertEqual(bytes(b), b'xt') 546 self.assertTrue(resp.isclosed()) 547 self.assertFalse(resp.closed) 548 resp.close() 549 self.assertTrue(resp.closed) 550 551 def test_partial_reads_no_content_length(self): 552 # when no length is present, the socket should be gracefully closed when 553 # all data was read 554 body = "HTTP/1.1 200 Ok\r\n\r\nText" 555 sock = FakeSocket(body) 556 resp = client.HTTPResponse(sock) 557 resp.begin() 558 self.assertEqual(resp.read(2), b'Te') 559 self.assertFalse(resp.isclosed()) 560 self.assertEqual(resp.read(2), b'xt') 561 self.assertEqual(resp.read(1), b'') 562 self.assertTrue(resp.isclosed()) 563 self.assertFalse(resp.closed) 564 resp.close() 565 self.assertTrue(resp.closed) 566 567 def test_partial_readintos_no_content_length(self): 568 # when no length is present, the socket should be gracefully closed when 569 # all data was read 570 body = "HTTP/1.1 200 Ok\r\n\r\nText" 571 sock = FakeSocket(body) 572 resp = client.HTTPResponse(sock) 573 resp.begin() 574 b = bytearray(2) 575 n = resp.readinto(b) 576 self.assertEqual(n, 2) 577 self.assertEqual(bytes(b), b'Te') 578 self.assertFalse(resp.isclosed()) 579 n = resp.readinto(b) 580 self.assertEqual(n, 2) 581 self.assertEqual(bytes(b), b'xt') 582 n = resp.readinto(b) 583 self.assertEqual(n, 0) 584 self.assertTrue(resp.isclosed()) 585 586 def test_partial_reads_incomplete_body(self): 587 # if the server shuts down the connection before the whole 588 # content-length is delivered, the socket is gracefully closed 589 body = "HTTP/1.1 200 Ok\r\nContent-Length: 10\r\n\r\nText" 590 sock = FakeSocket(body) 591 resp = client.HTTPResponse(sock) 592 resp.begin() 593 self.assertEqual(resp.read(2), b'Te') 594 self.assertFalse(resp.isclosed()) 595 self.assertEqual(resp.read(2), b'xt') 596 self.assertEqual(resp.read(1), b'') 597 self.assertTrue(resp.isclosed()) 598 599 def test_partial_readintos_incomplete_body(self): 600 # if the server shuts down the connection before the whole 601 # content-length is delivered, the socket is gracefully closed 602 body = "HTTP/1.1 200 Ok\r\nContent-Length: 10\r\n\r\nText" 603 sock = FakeSocket(body) 604 resp = client.HTTPResponse(sock) 605 resp.begin() 606 b = bytearray(2) 607 n = resp.readinto(b) 608 self.assertEqual(n, 2) 609 self.assertEqual(bytes(b), b'Te') 610 self.assertFalse(resp.isclosed()) 611 n = resp.readinto(b) 612 self.assertEqual(n, 2) 613 self.assertEqual(bytes(b), b'xt') 614 n = resp.readinto(b) 615 self.assertEqual(n, 0) 616 self.assertTrue(resp.isclosed()) 617 self.assertFalse(resp.closed) 618 resp.close() 619 self.assertTrue(resp.closed) 620 621 def test_host_port(self): 622 # Check invalid host_port 623 624 for hp in ("www.python.org:abc", "user:password (at] www.python.org"): 625 self.assertRaises(client.InvalidURL, client.HTTPConnection, hp) 626 627 for hp, h, p in (("[fe80::207:e9ff:fe9b]:8000", 628 "fe80::207:e9ff:fe9b", 8000), 629 ("www.python.org:80", "www.python.org", 80), 630 ("www.python.org:", "www.python.org", 80), 631 ("www.python.org", "www.python.org", 80), 632 ("[fe80::207:e9ff:fe9b]", "fe80::207:e9ff:fe9b", 80), 633 ("[fe80::207:e9ff:fe9b]:", "fe80::207:e9ff:fe9b", 80)): 634 c = client.HTTPConnection(hp) 635 self.assertEqual(h, c.host) 636 self.assertEqual(p, c.port) 637 638 def test_response_headers(self): 639 # test response with multiple message headers with the same field name. 640 text = ('HTTP/1.1 200 OK\r\n' 641 'Set-Cookie: Customer="WILE_E_COYOTE"; ' 642 'Version="1"; Path="/acme"\r\n' 643 'Set-Cookie: Part_Number="Rocket_Launcher_0001"; Version="1";' 644 ' Path="/acme"\r\n' 645 '\r\n' 646 'No body\r\n') 647 hdr = ('Customer="WILE_E_COYOTE"; Version="1"; Path="/acme"' 648 ', ' 649 'Part_Number="Rocket_Launcher_0001"; Version="1"; Path="/acme"') 650 s = FakeSocket(text) 651 r = client.HTTPResponse(s) 652 r.begin() 653 cookies = r.getheader("Set-Cookie") 654 self.assertEqual(cookies, hdr) 655 656 def test_read_head(self): 657 # Test that the library doesn't attempt to read any data 658 # from a HEAD request. (Tickles SF bug #622042.) 659 sock = FakeSocket( 660 'HTTP/1.1 200 OK\r\n' 661 'Content-Length: 14432\r\n' 662 '\r\n', 663 NoEOFBytesIO) 664 resp = client.HTTPResponse(sock, method="HEAD") 665 resp.begin() 666 if resp.read(): 667 self.fail("Did not expect response from HEAD request") 668 669 def test_readinto_head(self): 670 # Test that the library doesn't attempt to read any data 671 # from a HEAD request. (Tickles SF bug #622042.) 672 sock = FakeSocket( 673 'HTTP/1.1 200 OK\r\n' 674 'Content-Length: 14432\r\n' 675 '\r\n', 676 NoEOFBytesIO) 677 resp = client.HTTPResponse(sock, method="HEAD") 678 resp.begin() 679 b = bytearray(5) 680 if resp.readinto(b) != 0: 681 self.fail("Did not expect response from HEAD request") 682 self.assertEqual(bytes(b), b'\x00'*5) 683 684 def test_too_many_headers(self): 685 headers = '\r\n'.join('Header%d: foo' % i 686 for i in range(client._MAXHEADERS + 1)) + '\r\n' 687 text = ('HTTP/1.1 200 OK\r\n' + headers) 688 s = FakeSocket(text) 689 r = client.HTTPResponse(s) 690 self.assertRaisesRegex(client.HTTPException, 691 r"got more than \d+ headers", r.begin) 692 693 def test_send_file(self): 694 expected = (b'GET /foo HTTP/1.1\r\nHost: example.com\r\n' 695 b'Accept-Encoding: identity\r\n' 696 b'Transfer-Encoding: chunked\r\n' 697 b'\r\n') 698 699 with open(__file__, 'rb') as body: 700 conn = client.HTTPConnection('example.com') 701 sock = FakeSocket(body) 702 conn.sock = sock 703 conn.request('GET', '/foo', body) 704 self.assertTrue(sock.data.startswith(expected), '%r != %r' % 705 (sock.data[:len(expected)], expected)) 706 707 def test_send(self): 708 expected = b'this is a test this is only a test' 709 conn = client.HTTPConnection('example.com') 710 sock = FakeSocket(None) 711 conn.sock = sock 712 conn.send(expected) 713 self.assertEqual(expected, sock.data) 714 sock.data = b'' 715 conn.send(array.array('b', expected)) 716 self.assertEqual(expected, sock.data) 717 sock.data = b'' 718 conn.send(io.BytesIO(expected)) 719 self.assertEqual(expected, sock.data) 720 721 def test_send_updating_file(self): 722 def data(): 723 yield 'data' 724 yield None 725 yield 'data_two' 726 727 class UpdatingFile(io.TextIOBase): 728 mode = 'r' 729 d = data() 730 def read(self, blocksize=-1): 731 return next(self.d) 732 733 expected = b'data' 734 735 conn = client.HTTPConnection('example.com') 736 sock = FakeSocket("") 737 conn.sock = sock 738 conn.send(UpdatingFile()) 739 self.assertEqual(sock.data, expected) 740 741 742 def test_send_iter(self): 743 expected = b'GET /foo HTTP/1.1\r\nHost: example.com\r\n' \ 744 b'Accept-Encoding: identity\r\nContent-Length: 11\r\n' \ 745 b'\r\nonetwothree' 746 747 def body(): 748 yield b"one" 749 yield b"two" 750 yield b"three" 751 752 conn = client.HTTPConnection('example.com') 753 sock = FakeSocket("") 754 conn.sock = sock 755 conn.request('GET', '/foo', body(), {'Content-Length': '11'}) 756 self.assertEqual(sock.data, expected) 757 758 def test_send_type_error(self): 759 # See: Issue #12676 760 conn = client.HTTPConnection('example.com') 761 conn.sock = FakeSocket('') 762 with self.assertRaises(TypeError): 763 conn.request('POST', 'test', conn) 764 765 def test_chunked(self): 766 expected = chunked_expected 767 sock = FakeSocket(chunked_start + last_chunk + chunked_end) 768 resp = client.HTTPResponse(sock, method="GET") 769 resp.begin() 770 self.assertEqual(resp.read(), expected) 771 resp.close() 772 773 # Various read sizes 774 for n in range(1, 12): 775 sock = FakeSocket(chunked_start + last_chunk + chunked_end) 776 resp = client.HTTPResponse(sock, method="GET") 777 resp.begin() 778 self.assertEqual(resp.read(n) + resp.read(n) + resp.read(), expected) 779 resp.close() 780 781 for x in ('', 'foo\r\n'): 782 sock = FakeSocket(chunked_start + x) 783 resp = client.HTTPResponse(sock, method="GET") 784 resp.begin() 785 try: 786 resp.read() 787 except client.IncompleteRead as i: 788 self.assertEqual(i.partial, expected) 789 expected_message = 'IncompleteRead(%d bytes read)' % len(expected) 790 self.assertEqual(repr(i), expected_message) 791 self.assertEqual(str(i), expected_message) 792 else: 793 self.fail('IncompleteRead expected') 794 finally: 795 resp.close() 796 797 def test_readinto_chunked(self): 798 799 expected = chunked_expected 800 nexpected = len(expected) 801 b = bytearray(128) 802 803 sock = FakeSocket(chunked_start + last_chunk + chunked_end) 804 resp = client.HTTPResponse(sock, method="GET") 805 resp.begin() 806 n = resp.readinto(b) 807 self.assertEqual(b[:nexpected], expected) 808 self.assertEqual(n, nexpected) 809 resp.close() 810 811 # Various read sizes 812 for n in range(1, 12): 813 sock = FakeSocket(chunked_start + last_chunk + chunked_end) 814 resp = client.HTTPResponse(sock, method="GET") 815 resp.begin() 816 m = memoryview(b) 817 i = resp.readinto(m[0:n]) 818 i += resp.readinto(m[i:n + i]) 819 i += resp.readinto(m[i:]) 820 self.assertEqual(b[:nexpected], expected) 821 self.assertEqual(i, nexpected) 822 resp.close() 823 824 for x in ('', 'foo\r\n'): 825 sock = FakeSocket(chunked_start + x) 826 resp = client.HTTPResponse(sock, method="GET") 827 resp.begin() 828 try: 829 n = resp.readinto(b) 830 except client.IncompleteRead as i: 831 self.assertEqual(i.partial, expected) 832 expected_message = 'IncompleteRead(%d bytes read)' % len(expected) 833 self.assertEqual(repr(i), expected_message) 834 self.assertEqual(str(i), expected_message) 835 else: 836 self.fail('IncompleteRead expected') 837 finally: 838 resp.close() 839 840 def test_chunked_head(self): 841 chunked_start = ( 842 'HTTP/1.1 200 OK\r\n' 843 'Transfer-Encoding: chunked\r\n\r\n' 844 'a\r\n' 845 'hello world\r\n' 846 '1\r\n' 847 'd\r\n' 848 ) 849 sock = FakeSocket(chunked_start + last_chunk + chunked_end) 850 resp = client.HTTPResponse(sock, method="HEAD") 851 resp.begin() 852 self.assertEqual(resp.read(), b'') 853 self.assertEqual(resp.status, 200) 854 self.assertEqual(resp.reason, 'OK') 855 self.assertTrue(resp.isclosed()) 856 self.assertFalse(resp.closed) 857 resp.close() 858 self.assertTrue(resp.closed) 859 860 def test_readinto_chunked_head(self): 861 chunked_start = ( 862 'HTTP/1.1 200 OK\r\n' 863 'Transfer-Encoding: chunked\r\n\r\n' 864 'a\r\n' 865 'hello world\r\n' 866 '1\r\n' 867 'd\r\n' 868 ) 869 sock = FakeSocket(chunked_start + last_chunk + chunked_end) 870 resp = client.HTTPResponse(sock, method="HEAD") 871 resp.begin() 872 b = bytearray(5) 873 n = resp.readinto(b) 874 self.assertEqual(n, 0) 875 self.assertEqual(bytes(b), b'\x00'*5) 876 self.assertEqual(resp.status, 200) 877 self.assertEqual(resp.reason, 'OK') 878 self.assertTrue(resp.isclosed()) 879 self.assertFalse(resp.closed) 880 resp.close() 881 self.assertTrue(resp.closed) 882 883 def test_negative_content_length(self): 884 sock = FakeSocket( 885 'HTTP/1.1 200 OK\r\nContent-Length: -1\r\n\r\nHello\r\n') 886 resp = client.HTTPResponse(sock, method="GET") 887 resp.begin() 888 self.assertEqual(resp.read(), b'Hello\r\n') 889 self.assertTrue(resp.isclosed()) 890 891 def test_incomplete_read(self): 892 sock = FakeSocket('HTTP/1.1 200 OK\r\nContent-Length: 10\r\n\r\nHello\r\n') 893 resp = client.HTTPResponse(sock, method="GET") 894 resp.begin() 895 try: 896 resp.read() 897 except client.IncompleteRead as i: 898 self.assertEqual(i.partial, b'Hello\r\n') 899 self.assertEqual(repr(i), 900 "IncompleteRead(7 bytes read, 3 more expected)") 901 self.assertEqual(str(i), 902 "IncompleteRead(7 bytes read, 3 more expected)") 903 self.assertTrue(resp.isclosed()) 904 else: 905 self.fail('IncompleteRead expected') 906 907 def test_epipe(self): 908 sock = EPipeSocket( 909 "HTTP/1.0 401 Authorization Required\r\n" 910 "Content-type: text/html\r\n" 911 "WWW-Authenticate: Basic realm=\"example\"\r\n", 912 b"Content-Length") 913 conn = client.HTTPConnection("example.com") 914 conn.sock = sock 915 self.assertRaises(OSError, 916 lambda: conn.request("PUT", "/url", "body")) 917 resp = conn.getresponse() 918 self.assertEqual(401, resp.status) 919 self.assertEqual("Basic realm=\"example\"", 920 resp.getheader("www-authenticate")) 921 922 # Test lines overflowing the max line size (_MAXLINE in http.client) 923 924 def test_overflowing_status_line(self): 925 body = "HTTP/1.1 200 Ok" + "k" * 65536 + "\r\n" 926 resp = client.HTTPResponse(FakeSocket(body)) 927 self.assertRaises((client.LineTooLong, client.BadStatusLine), resp.begin) 928 929 def test_overflowing_header_line(self): 930 body = ( 931 'HTTP/1.1 200 OK\r\n' 932 'X-Foo: bar' + 'r' * 65536 + '\r\n\r\n' 933 ) 934 resp = client.HTTPResponse(FakeSocket(body)) 935 self.assertRaises(client.LineTooLong, resp.begin) 936 937 def test_overflowing_chunked_line(self): 938 body = ( 939 'HTTP/1.1 200 OK\r\n' 940 'Transfer-Encoding: chunked\r\n\r\n' 941 + '0' * 65536 + 'a\r\n' 942 'hello world\r\n' 943 '0\r\n' 944 '\r\n' 945 ) 946 resp = client.HTTPResponse(FakeSocket(body)) 947 resp.begin() 948 self.assertRaises(client.LineTooLong, resp.read) 949 950 def test_early_eof(self): 951 # Test httpresponse with no \r\n termination, 952 body = "HTTP/1.1 200 Ok" 953 sock = FakeSocket(body) 954 resp = client.HTTPResponse(sock) 955 resp.begin() 956 self.assertEqual(resp.read(), b'') 957 self.assertTrue(resp.isclosed()) 958 self.assertFalse(resp.closed) 959 resp.close() 960 self.assertTrue(resp.closed) 961 962 def test_error_leak(self): 963 # Test that the socket is not leaked if getresponse() fails 964 conn = client.HTTPConnection('example.com') 965 response = None 966 class Response(client.HTTPResponse): 967 def __init__(self, *pos, **kw): 968 nonlocal response 969 response = self # Avoid garbage collector closing the socket 970 client.HTTPResponse.__init__(self, *pos, **kw) 971 conn.response_class = Response 972 conn.sock = FakeSocket('Invalid status line') 973 conn.request('GET', '/') 974 self.assertRaises(client.BadStatusLine, conn.getresponse) 975 self.assertTrue(response.closed) 976 self.assertTrue(conn.sock.file_closed) 977 978 def test_chunked_extension(self): 979 extra = '3;foo=bar\r\n' + 'abc\r\n' 980 expected = chunked_expected + b'abc' 981 982 sock = FakeSocket(chunked_start + extra + last_chunk_extended + chunked_end) 983 resp = client.HTTPResponse(sock, method="GET") 984 resp.begin() 985 self.assertEqual(resp.read(), expected) 986 resp.close() 987 988 def test_chunked_missing_end(self): 989 """some servers may serve up a short chunked encoding stream""" 990 expected = chunked_expected 991 sock = FakeSocket(chunked_start + last_chunk) #no terminating crlf 992 resp = client.HTTPResponse(sock, method="GET") 993 resp.begin() 994 self.assertEqual(resp.read(), expected) 995 resp.close() 996 997 def test_chunked_trailers(self): 998 """See that trailers are read and ignored""" 999 expected = chunked_expected 1000 sock = FakeSocket(chunked_start + last_chunk + trailers + chunked_end) 1001 resp = client.HTTPResponse(sock, method="GET") 1002 resp.begin() 1003 self.assertEqual(resp.read(), expected) 1004 # we should have reached the end of the file 1005 self.assertEqual(sock.file.read(), b"") #we read to the end 1006 resp.close() 1007 1008 def test_chunked_sync(self): 1009 """Check that we don't read past the end of the chunked-encoding stream""" 1010 expected = chunked_expected 1011 extradata = "extradata" 1012 sock = FakeSocket(chunked_start + last_chunk + trailers + chunked_end + extradata) 1013 resp = client.HTTPResponse(sock, method="GET") 1014 resp.begin() 1015 self.assertEqual(resp.read(), expected) 1016 # the file should now have our extradata ready to be read 1017 self.assertEqual(sock.file.read(), extradata.encode("ascii")) #we read to the end 1018 resp.close() 1019 1020 def test_content_length_sync(self): 1021 """Check that we don't read past the end of the Content-Length stream""" 1022 extradata = b"extradata" 1023 expected = b"Hello123\r\n" 1024 sock = FakeSocket(b'HTTP/1.1 200 OK\r\nContent-Length: 10\r\n\r\n' + expected + extradata) 1025 resp = client.HTTPResponse(sock, method="GET") 1026 resp.begin() 1027 self.assertEqual(resp.read(), expected) 1028 # the file should now have our extradata ready to be read 1029 self.assertEqual(sock.file.read(), extradata) #we read to the end 1030 resp.close() 1031 1032 def test_readlines_content_length(self): 1033 extradata = b"extradata" 1034 expected = b"Hello123\r\n" 1035 sock = FakeSocket(b'HTTP/1.1 200 OK\r\nContent-Length: 10\r\n\r\n' + expected + extradata) 1036 resp = client.HTTPResponse(sock, method="GET") 1037 resp.begin() 1038 self.assertEqual(resp.readlines(2000), [expected]) 1039 # the file should now have our extradata ready to be read 1040 self.assertEqual(sock.file.read(), extradata) #we read to the end 1041 resp.close() 1042 1043 def test_read1_content_length(self): 1044 extradata = b"extradata" 1045 expected = b"Hello123\r\n" 1046 sock = FakeSocket(b'HTTP/1.1 200 OK\r\nContent-Length: 10\r\n\r\n' + expected + extradata) 1047 resp = client.HTTPResponse(sock, method="GET") 1048 resp.begin() 1049 self.assertEqual(resp.read1(2000), expected) 1050 # the file should now have our extradata ready to be read 1051 self.assertEqual(sock.file.read(), extradata) #we read to the end 1052 resp.close() 1053 1054 def test_readline_bound_content_length(self): 1055 extradata = b"extradata" 1056 expected = b"Hello123\r\n" 1057 sock = FakeSocket(b'HTTP/1.1 200 OK\r\nContent-Length: 10\r\n\r\n' + expected + extradata) 1058 resp = client.HTTPResponse(sock, method="GET") 1059 resp.begin() 1060 self.assertEqual(resp.readline(10), expected) 1061 self.assertEqual(resp.readline(10), b"") 1062 # the file should now have our extradata ready to be read 1063 self.assertEqual(sock.file.read(), extradata) #we read to the end 1064 resp.close() 1065 1066 def test_read1_bound_content_length(self): 1067 extradata = b"extradata" 1068 expected = b"Hello123\r\n" 1069 sock = FakeSocket(b'HTTP/1.1 200 OK\r\nContent-Length: 30\r\n\r\n' + expected*3 + extradata) 1070 resp = client.HTTPResponse(sock, method="GET") 1071 resp.begin() 1072 self.assertEqual(resp.read1(20), expected*2) 1073 self.assertEqual(resp.read(), expected) 1074 # the file should now have our extradata ready to be read 1075 self.assertEqual(sock.file.read(), extradata) #we read to the end 1076 resp.close() 1077 1078 def test_response_fileno(self): 1079 # Make sure fd returned by fileno is valid. 1080 threading = support.import_module("threading") 1081 1082 serv = socket.socket( 1083 socket.AF_INET, socket.SOCK_STREAM, socket.IPPROTO_TCP) 1084 self.addCleanup(serv.close) 1085 serv.bind((HOST, 0)) 1086 serv.listen() 1087 1088 result = None 1089 def run_server(): 1090 [conn, address] = serv.accept() 1091 with conn, conn.makefile("rb") as reader: 1092 # Read the request header until a blank line 1093 while True: 1094 line = reader.readline() 1095 if not line.rstrip(b"\r\n"): 1096 break 1097 conn.sendall(b"HTTP/1.1 200 Connection established\r\n\r\n") 1098 nonlocal result 1099 result = reader.read() 1100 1101 thread = threading.Thread(target=run_server) 1102 thread.start() 1103 self.addCleanup(thread.join, float(1)) 1104 conn = client.HTTPConnection(*serv.getsockname()) 1105 conn.request("CONNECT", "dummy:1234") 1106 response = conn.getresponse() 1107 try: 1108 self.assertEqual(response.status, client.OK) 1109 s = socket.socket(fileno=response.fileno()) 1110 try: 1111 s.sendall(b"proxied data\n") 1112 finally: 1113 s.detach() 1114 finally: 1115 response.close() 1116 conn.close() 1117 thread.join() 1118 self.assertEqual(result, b"proxied data\n") 1119 1120 class ExtendedReadTest(TestCase): 1121 """ 1122 Test peek(), read1(), readline() 1123 """ 1124 lines = ( 1125 'HTTP/1.1 200 OK\r\n' 1126 '\r\n' 1127 'hello world!\n' 1128 'and now \n' 1129 'for something completely different\n' 1130 'foo' 1131 ) 1132 lines_expected = lines[lines.find('hello'):].encode("ascii") 1133 lines_chunked = ( 1134 'HTTP/1.1 200 OK\r\n' 1135 'Transfer-Encoding: chunked\r\n\r\n' 1136 'a\r\n' 1137 'hello worl\r\n' 1138 '3\r\n' 1139 'd!\n\r\n' 1140 '9\r\n' 1141 'and now \n\r\n' 1142 '23\r\n' 1143 'for something completely different\n\r\n' 1144 '3\r\n' 1145 'foo\r\n' 1146 '0\r\n' # terminating chunk 1147 '\r\n' # end of trailers 1148 ) 1149 1150 def setUp(self): 1151 sock = FakeSocket(self.lines) 1152 resp = client.HTTPResponse(sock, method="GET") 1153 resp.begin() 1154 resp.fp = io.BufferedReader(resp.fp) 1155 self.resp = resp 1156 1157 1158 1159 def test_peek(self): 1160 resp = self.resp 1161 # patch up the buffered peek so that it returns not too much stuff 1162 oldpeek = resp.fp.peek 1163 def mypeek(n=-1): 1164 p = oldpeek(n) 1165 if n >= 0: 1166 return p[:n] 1167 return p[:10] 1168 resp.fp.peek = mypeek 1169 1170 all = [] 1171 while True: 1172 # try a short peek 1173 p = resp.peek(3) 1174 if p: 1175 self.assertGreater(len(p), 0) 1176 # then unbounded peek 1177 p2 = resp.peek() 1178 self.assertGreaterEqual(len(p2), len(p)) 1179 self.assertTrue(p2.startswith(p)) 1180 next = resp.read(len(p2)) 1181 self.assertEqual(next, p2) 1182 else: 1183 next = resp.read() 1184 self.assertFalse(next) 1185 all.append(next) 1186 if not next: 1187 break 1188 self.assertEqual(b"".join(all), self.lines_expected) 1189 1190 def test_readline(self): 1191 resp = self.resp 1192 self._verify_readline(self.resp.readline, self.lines_expected) 1193 1194 def _verify_readline(self, readline, expected): 1195 all = [] 1196 while True: 1197 # short readlines 1198 line = readline(5) 1199 if line and line != b"foo": 1200 if len(line) < 5: 1201 self.assertTrue(line.endswith(b"\n")) 1202 all.append(line) 1203 if not line: 1204 break 1205 self.assertEqual(b"".join(all), expected) 1206 1207 def test_read1(self): 1208 resp = self.resp 1209 def r(): 1210 res = resp.read1(4) 1211 self.assertLessEqual(len(res), 4) 1212 return res 1213 readliner = Readliner(r) 1214 self._verify_readline(readliner.readline, self.lines_expected) 1215 1216 def test_read1_unbounded(self): 1217 resp = self.resp 1218 all = [] 1219 while True: 1220 data = resp.read1() 1221 if not data: 1222 break 1223 all.append(data) 1224 self.assertEqual(b"".join(all), self.lines_expected) 1225 1226 def test_read1_bounded(self): 1227 resp = self.resp 1228 all = [] 1229 while True: 1230 data = resp.read1(10) 1231 if not data: 1232 break 1233 self.assertLessEqual(len(data), 10) 1234 all.append(data) 1235 self.assertEqual(b"".join(all), self.lines_expected) 1236 1237 def test_read1_0(self): 1238 self.assertEqual(self.resp.read1(0), b"") 1239 1240 def test_peek_0(self): 1241 p = self.resp.peek(0) 1242 self.assertLessEqual(0, len(p)) 1243 1244 class ExtendedReadTestChunked(ExtendedReadTest): 1245 """ 1246 Test peek(), read1(), readline() in chunked mode 1247 """ 1248 lines = ( 1249 'HTTP/1.1 200 OK\r\n' 1250 'Transfer-Encoding: chunked\r\n\r\n' 1251 'a\r\n' 1252 'hello worl\r\n' 1253 '3\r\n' 1254 'd!\n\r\n' 1255 '9\r\n' 1256 'and now \n\r\n' 1257 '23\r\n' 1258 'for something completely different\n\r\n' 1259 '3\r\n' 1260 'foo\r\n' 1261 '0\r\n' # terminating chunk 1262 '\r\n' # end of trailers 1263 ) 1264 1265 1266 class Readliner: 1267 """ 1268 a simple readline class that uses an arbitrary read function and buffering 1269 """ 1270 def __init__(self, readfunc): 1271 self.readfunc = readfunc 1272 self.remainder = b"" 1273 1274 def readline(self, limit): 1275 data = [] 1276 datalen = 0 1277 read = self.remainder 1278 try: 1279 while True: 1280 idx = read.find(b'\n') 1281 if idx != -1: 1282 break 1283 if datalen + len(read) >= limit: 1284 idx = limit - datalen - 1 1285 # read more data 1286 data.append(read) 1287 read = self.readfunc() 1288 if not read: 1289 idx = 0 #eof condition 1290 break 1291 idx += 1 1292 data.append(read[:idx]) 1293 self.remainder = read[idx:] 1294 return b"".join(data) 1295 except: 1296 self.remainder = b"".join(data) 1297 raise 1298 1299 1300 class OfflineTest(TestCase): 1301 def test_all(self): 1302 # Documented objects defined in the module should be in __all__ 1303 expected = {"responses"} # White-list documented dict() object 1304 # HTTPMessage, parse_headers(), and the HTTP status code constants are 1305 # intentionally omitted for simplicity 1306 blacklist = {"HTTPMessage", "parse_headers"} 1307 for name in dir(client): 1308 if name.startswith("_") or name in blacklist: 1309 continue 1310 module_object = getattr(client, name) 1311 if getattr(module_object, "__module__", None) == "http.client": 1312 expected.add(name) 1313 self.assertCountEqual(client.__all__, expected) 1314 1315 def test_responses(self): 1316 self.assertEqual(client.responses[client.NOT_FOUND], "Not Found") 1317 1318 def test_client_constants(self): 1319 # Make sure we don't break backward compatibility with 3.4 1320 expected = [ 1321 'CONTINUE', 1322 'SWITCHING_PROTOCOLS', 1323 'PROCESSING', 1324 'OK', 1325 'CREATED', 1326 'ACCEPTED', 1327 'NON_AUTHORITATIVE_INFORMATION', 1328 'NO_CONTENT', 1329 'RESET_CONTENT', 1330 'PARTIAL_CONTENT', 1331 'MULTI_STATUS', 1332 'IM_USED', 1333 'MULTIPLE_CHOICES', 1334 'MOVED_PERMANENTLY', 1335 'FOUND', 1336 'SEE_OTHER', 1337 'NOT_MODIFIED', 1338 'USE_PROXY', 1339 'TEMPORARY_REDIRECT', 1340 'BAD_REQUEST', 1341 'UNAUTHORIZED', 1342 'PAYMENT_REQUIRED', 1343 'FORBIDDEN', 1344 'NOT_FOUND', 1345 'METHOD_NOT_ALLOWED', 1346 'NOT_ACCEPTABLE', 1347 'PROXY_AUTHENTICATION_REQUIRED', 1348 'REQUEST_TIMEOUT', 1349 'CONFLICT', 1350 'GONE', 1351 'LENGTH_REQUIRED', 1352 'PRECONDITION_FAILED', 1353 'REQUEST_ENTITY_TOO_LARGE', 1354 'REQUEST_URI_TOO_LONG', 1355 'UNSUPPORTED_MEDIA_TYPE', 1356 'REQUESTED_RANGE_NOT_SATISFIABLE', 1357 'EXPECTATION_FAILED', 1358 'UNPROCESSABLE_ENTITY', 1359 'LOCKED', 1360 'FAILED_DEPENDENCY', 1361 'UPGRADE_REQUIRED', 1362 'PRECONDITION_REQUIRED', 1363 'TOO_MANY_REQUESTS', 1364 'REQUEST_HEADER_FIELDS_TOO_LARGE', 1365 'INTERNAL_SERVER_ERROR', 1366 'NOT_IMPLEMENTED', 1367 'BAD_GATEWAY', 1368 'SERVICE_UNAVAILABLE', 1369 'GATEWAY_TIMEOUT', 1370 'HTTP_VERSION_NOT_SUPPORTED', 1371 'INSUFFICIENT_STORAGE', 1372 'NOT_EXTENDED', 1373 'NETWORK_AUTHENTICATION_REQUIRED', 1374 ] 1375 for const in expected: 1376 with self.subTest(constant=const): 1377 self.assertTrue(hasattr(client, const)) 1378 1379 1380 class SourceAddressTest(TestCase): 1381 def setUp(self): 1382 self.serv = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 1383 self.port = support.bind_port(self.serv) 1384 self.source_port = support.find_unused_port() 1385 self.serv.listen() 1386 self.conn = None 1387 1388 def tearDown(self): 1389 if self.conn: 1390 self.conn.close() 1391 self.conn = None 1392 self.serv.close() 1393 self.serv = None 1394 1395 def testHTTPConnectionSourceAddress(self): 1396 self.conn = client.HTTPConnection(HOST, self.port, 1397 source_address=('', self.source_port)) 1398 self.conn.connect() 1399 self.assertEqual(self.conn.sock.getsockname()[1], self.source_port) 1400 1401 @unittest.skipIf(not hasattr(client, 'HTTPSConnection'), 1402 'http.client.HTTPSConnection not defined') 1403 def testHTTPSConnectionSourceAddress(self): 1404 self.conn = client.HTTPSConnection(HOST, self.port, 1405 source_address=('', self.source_port)) 1406 # We don't test anything here other than the constructor not barfing as 1407 # this code doesn't deal with setting up an active running SSL server 1408 # for an ssl_wrapped connect() to actually return from. 1409 1410 1411 class TimeoutTest(TestCase): 1412 PORT = None 1413 1414 def setUp(self): 1415 self.serv = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 1416 TimeoutTest.PORT = support.bind_port(self.serv) 1417 self.serv.listen() 1418 1419 def tearDown(self): 1420 self.serv.close() 1421 self.serv = None 1422 1423 def testTimeoutAttribute(self): 1424 # This will prove that the timeout gets through HTTPConnection 1425 # and into the socket. 1426 1427 # default -- use global socket timeout 1428 self.assertIsNone(socket.getdefaulttimeout()) 1429 socket.setdefaulttimeout(30) 1430 try: 1431 httpConn = client.HTTPConnection(HOST, TimeoutTest.PORT) 1432 httpConn.connect() 1433 finally: 1434 socket.setdefaulttimeout(None) 1435 self.assertEqual(httpConn.sock.gettimeout(), 30) 1436 httpConn.close() 1437 1438 # no timeout -- do not use global socket default 1439 self.assertIsNone(socket.getdefaulttimeout()) 1440 socket.setdefaulttimeout(30) 1441 try: 1442 httpConn = client.HTTPConnection(HOST, TimeoutTest.PORT, 1443 timeout=None) 1444 httpConn.connect() 1445 finally: 1446 socket.setdefaulttimeout(None) 1447 self.assertEqual(httpConn.sock.gettimeout(), None) 1448 httpConn.close() 1449 1450 # a value 1451 httpConn = client.HTTPConnection(HOST, TimeoutTest.PORT, timeout=30) 1452 httpConn.connect() 1453 self.assertEqual(httpConn.sock.gettimeout(), 30) 1454 httpConn.close() 1455 1456 1457 class PersistenceTest(TestCase): 1458 1459 def test_reuse_reconnect(self): 1460 # Should reuse or reconnect depending on header from server 1461 tests = ( 1462 ('1.0', '', False), 1463 ('1.0', 'Connection: keep-alive\r\n', True), 1464 ('1.1', '', True), 1465 ('1.1', 'Connection: close\r\n', False), 1466 ('1.0', 'Connection: keep-ALIVE\r\n', True), 1467 ('1.1', 'Connection: cloSE\r\n', False), 1468 ) 1469 for version, header, reuse in tests: 1470 with self.subTest(version=version, header=header): 1471 msg = ( 1472 'HTTP/{} 200 OK\r\n' 1473 '{}' 1474 'Content-Length: 12\r\n' 1475 '\r\n' 1476 'Dummy body\r\n' 1477 ).format(version, header) 1478 conn = FakeSocketHTTPConnection(msg) 1479 self.assertIsNone(conn.sock) 1480 conn.request('GET', '/open-connection') 1481 with conn.getresponse() as response: 1482 self.assertEqual(conn.sock is None, not reuse) 1483 response.read() 1484 self.assertEqual(conn.sock is None, not reuse) 1485 self.assertEqual(conn.connections, 1) 1486 conn.request('GET', '/subsequent-request') 1487 self.assertEqual(conn.connections, 1 if reuse else 2) 1488 1489 def test_disconnected(self): 1490 1491 def make_reset_reader(text): 1492 """Return BufferedReader that raises ECONNRESET at EOF""" 1493 stream = io.BytesIO(text) 1494 def readinto(buffer): 1495 size = io.BytesIO.readinto(stream, buffer) 1496 if size == 0: 1497 raise ConnectionResetError() 1498 return size 1499 stream.readinto = readinto 1500 return io.BufferedReader(stream) 1501 1502 tests = ( 1503 (io.BytesIO, client.RemoteDisconnected), 1504 (make_reset_reader, ConnectionResetError), 1505 ) 1506 for stream_factory, exception in tests: 1507 with self.subTest(exception=exception): 1508 conn = FakeSocketHTTPConnection(b'', stream_factory) 1509 conn.request('GET', '/eof-response') 1510 self.assertRaises(exception, conn.getresponse) 1511 self.assertIsNone(conn.sock) 1512 # HTTPConnection.connect() should be automatically invoked 1513 conn.request('GET', '/reconnect') 1514 self.assertEqual(conn.connections, 2) 1515 1516 def test_100_close(self): 1517 conn = FakeSocketHTTPConnection( 1518 b'HTTP/1.1 100 Continue\r\n' 1519 b'\r\n' 1520 # Missing final response 1521 ) 1522 conn.request('GET', '/', headers={'Expect': '100-continue'}) 1523 self.assertRaises(client.RemoteDisconnected, conn.getresponse) 1524 self.assertIsNone(conn.sock) 1525 conn.request('GET', '/reconnect') 1526 self.assertEqual(conn.connections, 2) 1527 1528 1529 class HTTPSTest(TestCase): 1530 1531 def setUp(self): 1532 if not hasattr(client, 'HTTPSConnection'): 1533 self.skipTest('ssl support required') 1534 1535 def make_server(self, certfile): 1536 from test.ssl_servers import make_https_server 1537 return make_https_server(self, certfile=certfile) 1538 1539 def test_attributes(self): 1540 # simple test to check it's storing the timeout 1541 h = client.HTTPSConnection(HOST, TimeoutTest.PORT, timeout=30) 1542 self.assertEqual(h.timeout, 30) 1543 1544 def test_networked(self): 1545 # Default settings: requires a valid cert from a trusted CA 1546 import ssl 1547 support.requires('network') 1548 with support.transient_internet('self-signed.pythontest.net'): 1549 h = client.HTTPSConnection('self-signed.pythontest.net', 443) 1550 with self.assertRaises(ssl.SSLError) as exc_info: 1551 h.request('GET', '/') 1552 self.assertEqual(exc_info.exception.reason, 'CERTIFICATE_VERIFY_FAILED') 1553 1554 def test_networked_noverification(self): 1555 # Switch off cert verification 1556 import ssl 1557 support.requires('network') 1558 with support.transient_internet('self-signed.pythontest.net'): 1559 context = ssl._create_unverified_context() 1560 h = client.HTTPSConnection('self-signed.pythontest.net', 443, 1561 context=context) 1562 h.request('GET', '/') 1563 resp = h.getresponse() 1564 h.close() 1565 self.assertIn('nginx', resp.getheader('server')) 1566 resp.close() 1567 1568 @support.system_must_validate_cert 1569 def test_networked_trusted_by_default_cert(self): 1570 # Default settings: requires a valid cert from a trusted CA 1571 support.requires('network') 1572 with support.transient_internet('www.python.org'): 1573 h = client.HTTPSConnection('www.python.org', 443) 1574 h.request('GET', '/') 1575 resp = h.getresponse() 1576 content_type = resp.getheader('content-type') 1577 resp.close() 1578 h.close() 1579 self.assertIn('text/html', content_type) 1580 1581 def test_networked_good_cert(self): 1582 # We feed the server's cert as a validating cert 1583 import ssl 1584 support.requires('network') 1585 with support.transient_internet('self-signed.pythontest.net'): 1586 context = ssl.SSLContext(ssl.PROTOCOL_TLSv1) 1587 context.verify_mode = ssl.CERT_REQUIRED 1588 context.load_verify_locations(CERT_selfsigned_pythontestdotnet) 1589 h = client.HTTPSConnection('self-signed.pythontest.net', 443, context=context) 1590 h.request('GET', '/') 1591 resp = h.getresponse() 1592 server_string = resp.getheader('server') 1593 resp.close() 1594 h.close() 1595 self.assertIn('nginx', server_string) 1596 1597 def test_networked_bad_cert(self): 1598 # We feed a "CA" cert that is unrelated to the server's cert 1599 import ssl 1600 support.requires('network') 1601 with support.transient_internet('self-signed.pythontest.net'): 1602 context = ssl.SSLContext(ssl.PROTOCOL_TLSv1) 1603 context.verify_mode = ssl.CERT_REQUIRED 1604 context.load_verify_locations(CERT_localhost) 1605 h = client.HTTPSConnection('self-signed.pythontest.net', 443, context=context) 1606 with self.assertRaises(ssl.SSLError) as exc_info: 1607 h.request('GET', '/') 1608 self.assertEqual(exc_info.exception.reason, 'CERTIFICATE_VERIFY_FAILED') 1609 1610 def test_local_unknown_cert(self): 1611 # The custom cert isn't known to the default trust bundle 1612 import ssl 1613 server = self.make_server(CERT_localhost) 1614 h = client.HTTPSConnection('localhost', server.port) 1615 with self.assertRaises(ssl.SSLError) as exc_info: 1616 h.request('GET', '/') 1617 self.assertEqual(exc_info.exception.reason, 'CERTIFICATE_VERIFY_FAILED') 1618 1619 def test_local_good_hostname(self): 1620 # The (valid) cert validates the HTTP hostname 1621 import ssl 1622 server = self.make_server(CERT_localhost) 1623 context = ssl.SSLContext(ssl.PROTOCOL_TLSv1) 1624 context.verify_mode = ssl.CERT_REQUIRED 1625 context.load_verify_locations(CERT_localhost) 1626 h = client.HTTPSConnection('localhost', server.port, context=context) 1627 self.addCleanup(h.close) 1628 h.request('GET', '/nonexistent') 1629 resp = h.getresponse() 1630 self.addCleanup(resp.close) 1631 self.assertEqual(resp.status, 404) 1632 1633 def test_local_bad_hostname(self): 1634 # The (valid) cert doesn't validate the HTTP hostname 1635 import ssl 1636 server = self.make_server(CERT_fakehostname) 1637 context = ssl.SSLContext(ssl.PROTOCOL_TLSv1) 1638 context.verify_mode = ssl.CERT_REQUIRED 1639 context.check_hostname = True 1640 context.load_verify_locations(CERT_fakehostname) 1641 h = client.HTTPSConnection('localhost', server.port, context=context) 1642 with self.assertRaises(ssl.CertificateError): 1643 h.request('GET', '/') 1644 # Same with explicit check_hostname=True 1645 with support.check_warnings(('', DeprecationWarning)): 1646 h = client.HTTPSConnection('localhost', server.port, 1647 context=context, check_hostname=True) 1648 with self.assertRaises(ssl.CertificateError): 1649 h.request('GET', '/') 1650 # With check_hostname=False, the mismatching is ignored 1651 context.check_hostname = False 1652 with support.check_warnings(('', DeprecationWarning)): 1653 h = client.HTTPSConnection('localhost', server.port, 1654 context=context, check_hostname=False) 1655 h.request('GET', '/nonexistent') 1656 resp = h.getresponse() 1657 resp.close() 1658 h.close() 1659 self.assertEqual(resp.status, 404) 1660 # The context's check_hostname setting is used if one isn't passed to 1661 # HTTPSConnection. 1662 context.check_hostname = False 1663 h = client.HTTPSConnection('localhost', server.port, context=context) 1664 h.request('GET', '/nonexistent') 1665 resp = h.getresponse() 1666 self.assertEqual(resp.status, 404) 1667 resp.close() 1668 h.close() 1669 # Passing check_hostname to HTTPSConnection should override the 1670 # context's setting. 1671 with support.check_warnings(('', DeprecationWarning)): 1672 h = client.HTTPSConnection('localhost', server.port, 1673 context=context, check_hostname=True) 1674 with self.assertRaises(ssl.CertificateError): 1675 h.request('GET', '/') 1676 1677 @unittest.skipIf(not hasattr(client, 'HTTPSConnection'), 1678 'http.client.HTTPSConnection not available') 1679 def test_host_port(self): 1680 # Check invalid host_port 1681 1682 for hp in ("www.python.org:abc", "user:password (at] www.python.org"): 1683 self.assertRaises(client.InvalidURL, client.HTTPSConnection, hp) 1684 1685 for hp, h, p in (("[fe80::207:e9ff:fe9b]:8000", 1686 "fe80::207:e9ff:fe9b", 8000), 1687 ("www.python.org:443", "www.python.org", 443), 1688 ("www.python.org:", "www.python.org", 443), 1689 ("www.python.org", "www.python.org", 443), 1690 ("[fe80::207:e9ff:fe9b]", "fe80::207:e9ff:fe9b", 443), 1691 ("[fe80::207:e9ff:fe9b]:", "fe80::207:e9ff:fe9b", 1692 443)): 1693 c = client.HTTPSConnection(hp) 1694 self.assertEqual(h, c.host) 1695 self.assertEqual(p, c.port) 1696 1697 1698 class RequestBodyTest(TestCase): 1699 """Test cases where a request includes a message body.""" 1700 1701 def setUp(self): 1702 self.conn = client.HTTPConnection('example.com') 1703 self.conn.sock = self.sock = FakeSocket("") 1704 self.conn.sock = self.sock 1705 1706 def get_headers_and_fp(self): 1707 f = io.BytesIO(self.sock.data) 1708 f.readline() # read the request line 1709 message = client.parse_headers(f) 1710 return message, f 1711 1712 def test_list_body(self): 1713 # Note that no content-length is automatically calculated for 1714 # an iterable. The request will fall back to send chunked 1715 # transfer encoding. 1716 cases = ( 1717 ([b'foo', b'bar'], b'3\r\nfoo\r\n3\r\nbar\r\n0\r\n\r\n'), 1718 ((b'foo', b'bar'), b'3\r\nfoo\r\n3\r\nbar\r\n0\r\n\r\n'), 1719 ) 1720 for body, expected in cases: 1721 with self.subTest(body): 1722 self.conn = client.HTTPConnection('example.com') 1723 self.conn.sock = self.sock = FakeSocket('') 1724 1725 self.conn.request('PUT', '/url', body) 1726 msg, f = self.get_headers_and_fp() 1727 self.assertNotIn('Content-Type', msg) 1728 self.assertNotIn('Content-Length', msg) 1729 self.assertEqual(msg.get('Transfer-Encoding'), 'chunked') 1730 self.assertEqual(expected, f.read()) 1731 1732 def test_manual_content_length(self): 1733 # Set an incorrect content-length so that we can verify that 1734 # it will not be over-ridden by the library. 1735 self.conn.request("PUT", "/url", "body", 1736 {"Content-Length": "42"}) 1737 message, f = self.get_headers_and_fp() 1738 self.assertEqual("42", message.get("content-length")) 1739 self.assertEqual(4, len(f.read())) 1740 1741 def test_ascii_body(self): 1742 self.conn.request("PUT", "/url", "body") 1743 message, f = self.get_headers_and_fp() 1744 self.assertEqual("text/plain", message.get_content_type()) 1745 self.assertIsNone(message.get_charset()) 1746 self.assertEqual("4", message.get("content-length")) 1747 self.assertEqual(b'body', f.read()) 1748 1749 def test_latin1_body(self): 1750 self.conn.request("PUT", "/url", "body\xc1") 1751 message, f = self.get_headers_and_fp() 1752 self.assertEqual("text/plain", message.get_content_type()) 1753 self.assertIsNone(message.get_charset()) 1754 self.assertEqual("5", message.get("content-length")) 1755 self.assertEqual(b'body\xc1', f.read()) 1756 1757 def test_bytes_body(self): 1758 self.conn.request("PUT", "/url", b"body\xc1") 1759 message, f = self.get_headers_and_fp() 1760 self.assertEqual("text/plain", message.get_content_type()) 1761 self.assertIsNone(message.get_charset()) 1762 self.assertEqual("5", message.get("content-length")) 1763 self.assertEqual(b'body\xc1', f.read()) 1764 1765 def test_text_file_body(self): 1766 self.addCleanup(support.unlink, support.TESTFN) 1767 with open(support.TESTFN, "w") as f: 1768 f.write("body") 1769 with open(support.TESTFN) as f: 1770 self.conn.request("PUT", "/url", f) 1771 message, f = self.get_headers_and_fp() 1772 self.assertEqual("text/plain", message.get_content_type()) 1773 self.assertIsNone(message.get_charset()) 1774 # No content-length will be determined for files; the body 1775 # will be sent using chunked transfer encoding instead. 1776 self.assertIsNone(message.get("content-length")) 1777 self.assertEqual("chunked", message.get("transfer-encoding")) 1778 self.assertEqual(b'4\r\nbody\r\n0\r\n\r\n', f.read()) 1779 1780 def test_binary_file_body(self): 1781 self.addCleanup(support.unlink, support.TESTFN) 1782 with open(support.TESTFN, "wb") as f: 1783 f.write(b"body\xc1") 1784 with open(support.TESTFN, "rb") as f: 1785 self.conn.request("PUT", "/url", f) 1786 message, f = self.get_headers_and_fp() 1787 self.assertEqual("text/plain", message.get_content_type()) 1788 self.assertIsNone(message.get_charset()) 1789 self.assertEqual("chunked", message.get("Transfer-Encoding")) 1790 self.assertNotIn("Content-Length", message) 1791 self.assertEqual(b'5\r\nbody\xc1\r\n0\r\n\r\n', f.read()) 1792 1793 1794 class HTTPResponseTest(TestCase): 1795 1796 def setUp(self): 1797 body = "HTTP/1.1 200 Ok\r\nMy-Header: first-value\r\nMy-Header: \ 1798 second-value\r\n\r\nText" 1799 sock = FakeSocket(body) 1800 self.resp = client.HTTPResponse(sock) 1801 self.resp.begin() 1802 1803 def test_getting_header(self): 1804 header = self.resp.getheader('My-Header') 1805 self.assertEqual(header, 'first-value, second-value') 1806 1807 header = self.resp.getheader('My-Header', 'some default') 1808 self.assertEqual(header, 'first-value, second-value') 1809 1810 def test_getting_nonexistent_header_with_string_default(self): 1811 header = self.resp.getheader('No-Such-Header', 'default-value') 1812 self.assertEqual(header, 'default-value') 1813 1814 def test_getting_nonexistent_header_with_iterable_default(self): 1815 header = self.resp.getheader('No-Such-Header', ['default', 'values']) 1816 self.assertEqual(header, 'default, values') 1817 1818 header = self.resp.getheader('No-Such-Header', ('default', 'values')) 1819 self.assertEqual(header, 'default, values') 1820 1821 def test_getting_nonexistent_header_without_default(self): 1822 header = self.resp.getheader('No-Such-Header') 1823 self.assertEqual(header, None) 1824 1825 def test_getting_header_defaultint(self): 1826 header = self.resp.getheader('No-Such-Header',default=42) 1827 self.assertEqual(header, 42) 1828 1829 class TunnelTests(TestCase): 1830 def setUp(self): 1831 response_text = ( 1832 'HTTP/1.0 200 OK\r\n\r\n' # Reply to CONNECT 1833 'HTTP/1.1 200 OK\r\n' # Reply to HEAD 1834 'Content-Length: 42\r\n\r\n' 1835 ) 1836 self.host = 'proxy.com' 1837 self.conn = client.HTTPConnection(self.host) 1838 self.conn._create_connection = self._create_connection(response_text) 1839 1840 def tearDown(self): 1841 self.conn.close() 1842 1843 def _create_connection(self, response_text): 1844 def create_connection(address, timeout=None, source_address=None): 1845 return FakeSocket(response_text, host=address[0], port=address[1]) 1846 return create_connection 1847 1848 def test_set_tunnel_host_port_headers(self): 1849 tunnel_host = 'destination.com' 1850 tunnel_port = 8888 1851 tunnel_headers = {'User-Agent': 'Mozilla/5.0 (compatible, MSIE 11)'} 1852 self.conn.set_tunnel(tunnel_host, port=tunnel_port, 1853 headers=tunnel_headers) 1854 self.conn.request('HEAD', '/', '') 1855 self.assertEqual(self.conn.sock.host, self.host) 1856 self.assertEqual(self.conn.sock.port, client.HTTP_PORT) 1857 self.assertEqual(self.conn._tunnel_host, tunnel_host) 1858 self.assertEqual(self.conn._tunnel_port, tunnel_port) 1859 self.assertEqual(self.conn._tunnel_headers, tunnel_headers) 1860 1861 def test_disallow_set_tunnel_after_connect(self): 1862 # Once connected, we shouldn't be able to tunnel anymore 1863 self.conn.connect() 1864 self.assertRaises(RuntimeError, self.conn.set_tunnel, 1865 'destination.com') 1866 1867 def test_connect_with_tunnel(self): 1868 self.conn.set_tunnel('destination.com') 1869 self.conn.request('HEAD', '/', '') 1870 self.assertEqual(self.conn.sock.host, self.host) 1871 self.assertEqual(self.conn.sock.port, client.HTTP_PORT) 1872 self.assertIn(b'CONNECT destination.com', self.conn.sock.data) 1873 # issue22095 1874 self.assertNotIn(b'Host: destination.com:None', self.conn.sock.data) 1875 self.assertIn(b'Host: destination.com', self.conn.sock.data) 1876 1877 # This test should be removed when CONNECT gets the HTTP/1.1 blessing 1878 self.assertNotIn(b'Host: proxy.com', self.conn.sock.data) 1879 1880 def test_connect_put_request(self): 1881 self.conn.set_tunnel('destination.com') 1882 self.conn.request('PUT', '/', '') 1883 self.assertEqual(self.conn.sock.host, self.host) 1884 self.assertEqual(self.conn.sock.port, client.HTTP_PORT) 1885 self.assertIn(b'CONNECT destination.com', self.conn.sock.data) 1886 self.assertIn(b'Host: destination.com', self.conn.sock.data) 1887 1888 def test_tunnel_debuglog(self): 1889 expected_header = 'X-Dummy: 1' 1890 response_text = 'HTTP/1.0 200 OK\r\n{}\r\n\r\n'.format(expected_header) 1891 1892 self.conn.set_debuglevel(1) 1893 self.conn._create_connection = self._create_connection(response_text) 1894 self.conn.set_tunnel('destination.com') 1895 1896 with support.captured_stdout() as output: 1897 self.conn.request('PUT', '/', '') 1898 lines = output.getvalue().splitlines() 1899 self.assertIn('header: {}'.format(expected_header), lines) 1900 1901 1902 if __name__ == '__main__': 1903 unittest.main(verbosity=2) 1904