Home | History | Annotate | Download | only in test
      1 import errno
      2 from http import client
      3 import io
      4 import itertools
      5 import os
      6 import array
      7 import socket
      8 
      9 import unittest
     10 TestCase = unittest.TestCase
     11 
     12 from test import support
     13 
     14 here = os.path.dirname(__file__)
     15 # Self-signed cert file for 'localhost'
     16 CERT_localhost = os.path.join(here, 'keycert.pem')
     17 # Self-signed cert file for 'fakehostname'
     18 CERT_fakehostname = os.path.join(here, 'keycert2.pem')
     19 # Self-signed cert file for self-signed.pythontest.net
     20 CERT_selfsigned_pythontestdotnet = os.path.join(here, 'selfsigned_pythontestdotnet.pem')
     21 
     22 # constants for testing chunked encoding
     23 chunked_start = (
     24     'HTTP/1.1 200 OK\r\n'
     25     'Transfer-Encoding: chunked\r\n\r\n'
     26     'a\r\n'
     27     'hello worl\r\n'
     28     '3\r\n'
     29     'd! \r\n'
     30     '8\r\n'
     31     'and now \r\n'
     32     '22\r\n'
     33     'for something completely different\r\n'
     34 )
     35 chunked_expected = b'hello world! and now for something completely different'
     36 chunk_extension = ";foo=bar"
     37 last_chunk = "0\r\n"
     38 last_chunk_extended = "0" + chunk_extension + "\r\n"
     39 trailers = "X-Dummy: foo\r\nX-Dumm2: bar\r\n"
     40 chunked_end = "\r\n"
     41 
     42 HOST = support.HOST
     43 
     44 class FakeSocket:
     45     def __init__(self, text, fileclass=io.BytesIO, host=None, port=None):
     46         if isinstance(text, str):
     47             text = text.encode("ascii")
     48         self.text = text
     49         self.fileclass = fileclass
     50         self.data = b''
     51         self.sendall_calls = 0
     52         self.file_closed = False
     53         self.host = host
     54         self.port = port
     55 
     56     def sendall(self, data):
     57         self.sendall_calls += 1
     58         self.data += data
     59 
     60     def makefile(self, mode, bufsize=None):
     61         if mode != 'r' and mode != 'rb':
     62             raise client.UnimplementedFileMode()
     63         # keep the file around so we can check how much was read from it
     64         self.file = self.fileclass(self.text)
     65         self.file.close = self.file_close #nerf close ()
     66         return self.file
     67 
     68     def file_close(self):
     69         self.file_closed = True
     70 
     71     def close(self):
     72         pass
     73 
     74     def setsockopt(self, level, optname, value):
     75         pass
     76 
     77 class EPipeSocket(FakeSocket):
     78 
     79     def __init__(self, text, pipe_trigger):
     80         # When sendall() is called with pipe_trigger, raise EPIPE.
     81         FakeSocket.__init__(self, text)
     82         self.pipe_trigger = pipe_trigger
     83 
     84     def sendall(self, data):
     85         if self.pipe_trigger in data:
     86             raise OSError(errno.EPIPE, "gotcha")
     87         self.data += data
     88 
     89     def close(self):
     90         pass
     91 
     92 class NoEOFBytesIO(io.BytesIO):
     93     """Like BytesIO, but raises AssertionError on EOF.
     94 
     95     This is used below to test that http.client doesn't try to read
     96     more from the underlying file than it should.
     97     """
     98     def read(self, n=-1):
     99         data = io.BytesIO.read(self, n)
    100         if data == b'':
    101             raise AssertionError('caller tried to read past EOF')
    102         return data
    103 
    104     def readline(self, length=None):
    105         data = io.BytesIO.readline(self, length)
    106         if data == b'':
    107             raise AssertionError('caller tried to read past EOF')
    108         return data
    109 
    110 class FakeSocketHTTPConnection(client.HTTPConnection):
    111     """HTTPConnection subclass using FakeSocket; counts connect() calls"""
    112 
    113     def __init__(self, *args):
    114         self.connections = 0
    115         super().__init__('example.com')
    116         self.fake_socket_args = args
    117         self._create_connection = self.create_connection
    118 
    119     def connect(self):
    120         """Count the number of times connect() is invoked"""
    121         self.connections += 1
    122         return super().connect()
    123 
    124     def create_connection(self, *pos, **kw):
    125         return FakeSocket(*self.fake_socket_args)
    126 
    127 class HeaderTests(TestCase):
    128     def test_auto_headers(self):
    129         # Some headers are added automatically, but should not be added by
    130         # .request() if they are explicitly set.
    131 
    132         class HeaderCountingBuffer(list):
    133             def __init__(self):
    134                 self.count = {}
    135             def append(self, item):
    136                 kv = item.split(b':')
    137                 if len(kv) > 1:
    138                     # item is a 'Key: Value' header string
    139                     lcKey = kv[0].decode('ascii').lower()
    140                     self.count.setdefault(lcKey, 0)
    141                     self.count[lcKey] += 1
    142                 list.append(self, item)
    143 
    144         for explicit_header in True, False:
    145             for header in 'Content-length', 'Host', 'Accept-encoding':
    146                 conn = client.HTTPConnection('example.com')
    147                 conn.sock = FakeSocket('blahblahblah')
    148                 conn._buffer = HeaderCountingBuffer()
    149 
    150                 body = 'spamspamspam'
    151                 headers = {}
    152                 if explicit_header:
    153                     headers[header] = str(len(body))
    154                 conn.request('POST', '/', body, headers)
    155                 self.assertEqual(conn._buffer.count[header.lower()], 1)
    156 
    157     def test_content_length_0(self):
    158 
    159         class ContentLengthChecker(list):
    160             def __init__(self):
    161                 list.__init__(self)
    162                 self.content_length = None
    163             def append(self, item):
    164                 kv = item.split(b':', 1)
    165                 if len(kv) > 1 and kv[0].lower() == b'content-length':
    166                     self.content_length = kv[1].strip()
    167                 list.append(self, item)
    168 
    169         # Here, we're testing that methods expecting a body get a
    170         # content-length set to zero if the body is empty (either None or '')
    171         bodies = (None, '')
    172         methods_with_body = ('PUT', 'POST', 'PATCH')
    173         for method, body in itertools.product(methods_with_body, bodies):
    174             conn = client.HTTPConnection('example.com')
    175             conn.sock = FakeSocket(None)
    176             conn._buffer = ContentLengthChecker()
    177             conn.request(method, '/', body)
    178             self.assertEqual(
    179                 conn._buffer.content_length, b'0',
    180                 'Header Content-Length incorrect on {}'.format(method)
    181             )
    182 
    183         # For these methods, we make sure that content-length is not set when
    184         # the body is None because it might cause unexpected behaviour on the
    185         # server.
    186         methods_without_body = (
    187              'GET', 'CONNECT', 'DELETE', 'HEAD', 'OPTIONS', 'TRACE',
    188         )
    189         for method in methods_without_body:
    190             conn = client.HTTPConnection('example.com')
    191             conn.sock = FakeSocket(None)
    192             conn._buffer = ContentLengthChecker()
    193             conn.request(method, '/', None)
    194             self.assertEqual(
    195                 conn._buffer.content_length, None,
    196                 'Header Content-Length set for empty body on {}'.format(method)
    197             )
    198 
    199         # If the body is set to '', that's considered to be "present but
    200         # empty" rather than "missing", so content length would be set, even
    201         # for methods that don't expect a body.
    202         for method in methods_without_body:
    203             conn = client.HTTPConnection('example.com')
    204             conn.sock = FakeSocket(None)
    205             conn._buffer = ContentLengthChecker()
    206             conn.request(method, '/', '')
    207             self.assertEqual(
    208                 conn._buffer.content_length, b'0',
    209                 'Header Content-Length incorrect on {}'.format(method)
    210             )
    211 
    212         # If the body is set, make sure Content-Length is set.
    213         for method in itertools.chain(methods_without_body, methods_with_body):
    214             conn = client.HTTPConnection('example.com')
    215             conn.sock = FakeSocket(None)
    216             conn._buffer = ContentLengthChecker()
    217             conn.request(method, '/', ' ')
    218             self.assertEqual(
    219                 conn._buffer.content_length, b'1',
    220                 'Header Content-Length incorrect on {}'.format(method)
    221             )
    222 
    223     def test_putheader(self):
    224         conn = client.HTTPConnection('example.com')
    225         conn.sock = FakeSocket(None)
    226         conn.putrequest('GET','/')
    227         conn.putheader('Content-length', 42)
    228         self.assertIn(b'Content-length: 42', conn._buffer)
    229 
    230         conn.putheader('Foo', ' bar ')
    231         self.assertIn(b'Foo:  bar ', conn._buffer)
    232         conn.putheader('Bar', '\tbaz\t')
    233         self.assertIn(b'Bar: \tbaz\t', conn._buffer)
    234         conn.putheader('Authorization', 'Bearer mytoken')
    235         self.assertIn(b'Authorization: Bearer mytoken', conn._buffer)
    236         conn.putheader('IterHeader', 'IterA', 'IterB')
    237         self.assertIn(b'IterHeader: IterA\r\n\tIterB', conn._buffer)
    238         conn.putheader('LatinHeader', b'\xFF')
    239         self.assertIn(b'LatinHeader: \xFF', conn._buffer)
    240         conn.putheader('Utf8Header', b'\xc3\x80')
    241         self.assertIn(b'Utf8Header: \xc3\x80', conn._buffer)
    242         conn.putheader('C1-Control', b'next\x85line')
    243         self.assertIn(b'C1-Control: next\x85line', conn._buffer)
    244         conn.putheader('Embedded-Fold-Space', 'is\r\n allowed')
    245         self.assertIn(b'Embedded-Fold-Space: is\r\n allowed', conn._buffer)
    246         conn.putheader('Embedded-Fold-Tab', 'is\r\n\tallowed')
    247         self.assertIn(b'Embedded-Fold-Tab: is\r\n\tallowed', conn._buffer)
    248         conn.putheader('Key Space', 'value')
    249         self.assertIn(b'Key Space: value', conn._buffer)
    250         conn.putheader('KeySpace ', 'value')
    251         self.assertIn(b'KeySpace : value', conn._buffer)
    252         conn.putheader(b'Nonbreak\xa0Space', 'value')
    253         self.assertIn(b'Nonbreak\xa0Space: value', conn._buffer)
    254         conn.putheader(b'\xa0NonbreakSpace', 'value')
    255         self.assertIn(b'\xa0NonbreakSpace: value', conn._buffer)
    256 
    257     def test_ipv6host_header(self):
    258         # Default host header on IPv6 transaction should be wrapped by [] if
    259         # it is an IPv6 address
    260         expected = b'GET /foo HTTP/1.1\r\nHost: [2001::]:81\r\n' \
    261                    b'Accept-Encoding: identity\r\n\r\n'
    262         conn = client.HTTPConnection('[2001::]:81')
    263         sock = FakeSocket('')
    264         conn.sock = sock
    265         conn.request('GET', '/foo')
    266         self.assertTrue(sock.data.startswith(expected))
    267 
    268         expected = b'GET /foo HTTP/1.1\r\nHost: [2001:102A::]\r\n' \
    269                    b'Accept-Encoding: identity\r\n\r\n'
    270         conn = client.HTTPConnection('[2001:102A::]')
    271         sock = FakeSocket('')
    272         conn.sock = sock
    273         conn.request('GET', '/foo')
    274         self.assertTrue(sock.data.startswith(expected))
    275 
    276     def test_malformed_headers_coped_with(self):
    277         # Issue 19996
    278         body = "HTTP/1.1 200 OK\r\nFirst: val\r\n: nval\r\nSecond: val\r\n\r\n"
    279         sock = FakeSocket(body)
    280         resp = client.HTTPResponse(sock)
    281         resp.begin()
    282 
    283         self.assertEqual(resp.getheader('First'), 'val')
    284         self.assertEqual(resp.getheader('Second'), 'val')
    285 
    286     def test_parse_all_octets(self):
    287         # Ensure no valid header field octet breaks the parser
    288         body = (
    289             b'HTTP/1.1 200 OK\r\n'
    290             b"!#$%&'*+-.^_`|~: value\r\n"  # Special token characters
    291             b'VCHAR: ' + bytes(range(0x21, 0x7E + 1)) + b'\r\n'
    292             b'obs-text: ' + bytes(range(0x80, 0xFF + 1)) + b'\r\n'
    293             b'obs-fold: text\r\n'
    294             b' folded with space\r\n'
    295             b'\tfolded with tab\r\n'
    296             b'Content-Length: 0\r\n'
    297             b'\r\n'
    298         )
    299         sock = FakeSocket(body)
    300         resp = client.HTTPResponse(sock)
    301         resp.begin()
    302         self.assertEqual(resp.getheader('Content-Length'), '0')
    303         self.assertEqual(resp.msg['Content-Length'], '0')
    304         self.assertEqual(resp.getheader("!#$%&'*+-.^_`|~"), 'value')
    305         self.assertEqual(resp.msg["!#$%&'*+-.^_`|~"], 'value')
    306         vchar = ''.join(map(chr, range(0x21, 0x7E + 1)))
    307         self.assertEqual(resp.getheader('VCHAR'), vchar)
    308         self.assertEqual(resp.msg['VCHAR'], vchar)
    309         self.assertIsNotNone(resp.getheader('obs-text'))
    310         self.assertIn('obs-text', resp.msg)
    311         for folded in (resp.getheader('obs-fold'), resp.msg['obs-fold']):
    312             self.assertTrue(folded.startswith('text'))
    313             self.assertIn(' folded with space', folded)
    314             self.assertTrue(folded.endswith('folded with tab'))
    315 
    316     def test_invalid_headers(self):
    317         conn = client.HTTPConnection('example.com')
    318         conn.sock = FakeSocket('')
    319         conn.putrequest('GET', '/')
    320 
    321         # http://tools.ietf.org/html/rfc7230#section-3.2.4, whitespace is no
    322         # longer allowed in header names
    323         cases = (
    324             (b'Invalid\r\nName', b'ValidValue'),
    325             (b'Invalid\rName', b'ValidValue'),
    326             (b'Invalid\nName', b'ValidValue'),
    327             (b'\r\nInvalidName', b'ValidValue'),
    328             (b'\rInvalidName', b'ValidValue'),
    329             (b'\nInvalidName', b'ValidValue'),
    330             (b' InvalidName', b'ValidValue'),
    331             (b'\tInvalidName', b'ValidValue'),
    332             (b'Invalid:Name', b'ValidValue'),
    333             (b':InvalidName', b'ValidValue'),
    334             (b'ValidName', b'Invalid\r\nValue'),
    335             (b'ValidName', b'Invalid\rValue'),
    336             (b'ValidName', b'Invalid\nValue'),
    337             (b'ValidName', b'InvalidValue\r\n'),
    338             (b'ValidName', b'InvalidValue\r'),
    339             (b'ValidName', b'InvalidValue\n'),
    340         )
    341         for name, value in cases:
    342             with self.subTest((name, value)):
    343                 with self.assertRaisesRegex(ValueError, 'Invalid header'):
    344                     conn.putheader(name, value)
    345 
    346 
    347 class TransferEncodingTest(TestCase):
    348     expected_body = b"It's just a flesh wound"
    349 
    350     def test_endheaders_chunked(self):
    351         conn = client.HTTPConnection('example.com')
    352         conn.sock = FakeSocket(b'')
    353         conn.putrequest('POST', '/')
    354         conn.endheaders(self._make_body(), encode_chunked=True)
    355 
    356         _, _, body = self._parse_request(conn.sock.data)
    357         body = self._parse_chunked(body)
    358         self.assertEqual(body, self.expected_body)
    359 
    360     def test_explicit_headers(self):
    361         # explicit chunked
    362         conn = client.HTTPConnection('example.com')
    363         conn.sock = FakeSocket(b'')
    364         # this shouldn't actually be automatically chunk-encoded because the
    365         # calling code has explicitly stated that it's taking care of it
    366         conn.request(
    367             'POST', '/', self._make_body(), {'Transfer-Encoding': 'chunked'})
    368 
    369         _, headers, body = self._parse_request(conn.sock.data)
    370         self.assertNotIn('content-length', [k.lower() for k in headers.keys()])
    371         self.assertEqual(headers['Transfer-Encoding'], 'chunked')
    372         self.assertEqual(body, self.expected_body)
    373 
    374         # explicit chunked, string body
    375         conn = client.HTTPConnection('example.com')
    376         conn.sock = FakeSocket(b'')
    377         conn.request(
    378             'POST', '/', self.expected_body.decode('latin-1'),
    379             {'Transfer-Encoding': 'chunked'})
    380 
    381         _, headers, body = self._parse_request(conn.sock.data)
    382         self.assertNotIn('content-length', [k.lower() for k in headers.keys()])
    383         self.assertEqual(headers['Transfer-Encoding'], 'chunked')
    384         self.assertEqual(body, self.expected_body)
    385 
    386         # User-specified TE, but request() does the chunk encoding
    387         conn = client.HTTPConnection('example.com')
    388         conn.sock = FakeSocket(b'')
    389         conn.request('POST', '/',
    390             headers={'Transfer-Encoding': 'gzip, chunked'},
    391             encode_chunked=True,
    392             body=self._make_body())
    393         _, headers, body = self._parse_request(conn.sock.data)
    394         self.assertNotIn('content-length', [k.lower() for k in headers])
    395         self.assertEqual(headers['Transfer-Encoding'], 'gzip, chunked')
    396         self.assertEqual(self._parse_chunked(body), self.expected_body)
    397 
    398     def test_request(self):
    399         for empty_lines in (False, True,):
    400             conn = client.HTTPConnection('example.com')
    401             conn.sock = FakeSocket(b'')
    402             conn.request(
    403                 'POST', '/', self._make_body(empty_lines=empty_lines))
    404 
    405             _, headers, body = self._parse_request(conn.sock.data)
    406             body = self._parse_chunked(body)
    407             self.assertEqual(body, self.expected_body)
    408             self.assertEqual(headers['Transfer-Encoding'], 'chunked')
    409 
    410             # Content-Length and Transfer-Encoding SHOULD not be sent in the
    411             # same request
    412             self.assertNotIn('content-length', [k.lower() for k in headers])
    413 
    414     def test_empty_body(self):
    415         # Zero-length iterable should be treated like any other iterable
    416         conn = client.HTTPConnection('example.com')
    417         conn.sock = FakeSocket(b'')
    418         conn.request('POST', '/', ())
    419         _, headers, body = self._parse_request(conn.sock.data)
    420         self.assertEqual(headers['Transfer-Encoding'], 'chunked')
    421         self.assertNotIn('content-length', [k.lower() for k in headers])
    422         self.assertEqual(body, b"0\r\n\r\n")
    423 
    424     def _make_body(self, empty_lines=False):
    425         lines = self.expected_body.split(b' ')
    426         for idx, line in enumerate(lines):
    427             # for testing handling empty lines
    428             if empty_lines and idx % 2:
    429                 yield b''
    430             if idx < len(lines) - 1:
    431                 yield line + b' '
    432             else:
    433                 yield line
    434 
    435     def _parse_request(self, data):
    436         lines = data.split(b'\r\n')
    437         request = lines[0]
    438         headers = {}
    439         n = 1
    440         while n < len(lines) and len(lines[n]) > 0:
    441             key, val = lines[n].split(b':')
    442             key = key.decode('latin-1').strip()
    443             headers[key] = val.decode('latin-1').strip()
    444             n += 1
    445 
    446         return request, headers, b'\r\n'.join(lines[n + 1:])
    447 
    448     def _parse_chunked(self, data):
    449         body = []
    450         trailers = {}
    451         n = 0
    452         lines = data.split(b'\r\n')
    453         # parse body
    454         while True:
    455             size, chunk = lines[n:n+2]
    456             size = int(size, 16)
    457 
    458             if size == 0:
    459                 n += 1
    460                 break
    461 
    462             self.assertEqual(size, len(chunk))
    463             body.append(chunk)
    464 
    465             n += 2
    466             # we /should/ hit the end chunk, but check against the size of
    467             # lines so we're not stuck in an infinite loop should we get
    468             # malformed data
    469             if n > len(lines):
    470                 break
    471 
    472         return b''.join(body)
    473 
    474 
    475 class BasicTest(TestCase):
    476     def test_status_lines(self):
    477         # Test HTTP status lines
    478 
    479         body = "HTTP/1.1 200 Ok\r\n\r\nText"
    480         sock = FakeSocket(body)
    481         resp = client.HTTPResponse(sock)
    482         resp.begin()
    483         self.assertEqual(resp.read(0), b'')  # Issue #20007
    484         self.assertFalse(resp.isclosed())
    485         self.assertFalse(resp.closed)
    486         self.assertEqual(resp.read(), b"Text")
    487         self.assertTrue(resp.isclosed())
    488         self.assertFalse(resp.closed)
    489         resp.close()
    490         self.assertTrue(resp.closed)
    491 
    492         body = "HTTP/1.1 400.100 Not Ok\r\n\r\nText"
    493         sock = FakeSocket(body)
    494         resp = client.HTTPResponse(sock)
    495         self.assertRaises(client.BadStatusLine, resp.begin)
    496 
    497     def test_bad_status_repr(self):
    498         exc = client.BadStatusLine('')
    499         self.assertEqual(repr(exc), '''BadStatusLine("\'\'",)''')
    500 
    501     def test_partial_reads(self):
    502         # if we have Content-Length, HTTPResponse knows when to close itself,
    503         # the same behaviour as when we read the whole thing with read()
    504         body = "HTTP/1.1 200 Ok\r\nContent-Length: 4\r\n\r\nText"
    505         sock = FakeSocket(body)
    506         resp = client.HTTPResponse(sock)
    507         resp.begin()
    508         self.assertEqual(resp.read(2), b'Te')
    509         self.assertFalse(resp.isclosed())
    510         self.assertEqual(resp.read(2), b'xt')
    511         self.assertTrue(resp.isclosed())
    512         self.assertFalse(resp.closed)
    513         resp.close()
    514         self.assertTrue(resp.closed)
    515 
    516     def test_mixed_reads(self):
    517         # readline() should update the remaining length, so that read() knows
    518         # how much data is left and does not raise IncompleteRead
    519         body = "HTTP/1.1 200 Ok\r\nContent-Length: 13\r\n\r\nText\r\nAnother"
    520         sock = FakeSocket(body)
    521         resp = client.HTTPResponse(sock)
    522         resp.begin()
    523         self.assertEqual(resp.readline(), b'Text\r\n')
    524         self.assertFalse(resp.isclosed())
    525         self.assertEqual(resp.read(), b'Another')
    526         self.assertTrue(resp.isclosed())
    527         self.assertFalse(resp.closed)
    528         resp.close()
    529         self.assertTrue(resp.closed)
    530 
    531     def test_partial_readintos(self):
    532         # if we have Content-Length, HTTPResponse knows when to close itself,
    533         # the same behaviour as when we read the whole thing with read()
    534         body = "HTTP/1.1 200 Ok\r\nContent-Length: 4\r\n\r\nText"
    535         sock = FakeSocket(body)
    536         resp = client.HTTPResponse(sock)
    537         resp.begin()
    538         b = bytearray(2)
    539         n = resp.readinto(b)
    540         self.assertEqual(n, 2)
    541         self.assertEqual(bytes(b), b'Te')
    542         self.assertFalse(resp.isclosed())
    543         n = resp.readinto(b)
    544         self.assertEqual(n, 2)
    545         self.assertEqual(bytes(b), b'xt')
    546         self.assertTrue(resp.isclosed())
    547         self.assertFalse(resp.closed)
    548         resp.close()
    549         self.assertTrue(resp.closed)
    550 
    551     def test_partial_reads_no_content_length(self):
    552         # when no length is present, the socket should be gracefully closed when
    553         # all data was read
    554         body = "HTTP/1.1 200 Ok\r\n\r\nText"
    555         sock = FakeSocket(body)
    556         resp = client.HTTPResponse(sock)
    557         resp.begin()
    558         self.assertEqual(resp.read(2), b'Te')
    559         self.assertFalse(resp.isclosed())
    560         self.assertEqual(resp.read(2), b'xt')
    561         self.assertEqual(resp.read(1), b'')
    562         self.assertTrue(resp.isclosed())
    563         self.assertFalse(resp.closed)
    564         resp.close()
    565         self.assertTrue(resp.closed)
    566 
    567     def test_partial_readintos_no_content_length(self):
    568         # when no length is present, the socket should be gracefully closed when
    569         # all data was read
    570         body = "HTTP/1.1 200 Ok\r\n\r\nText"
    571         sock = FakeSocket(body)
    572         resp = client.HTTPResponse(sock)
    573         resp.begin()
    574         b = bytearray(2)
    575         n = resp.readinto(b)
    576         self.assertEqual(n, 2)
    577         self.assertEqual(bytes(b), b'Te')
    578         self.assertFalse(resp.isclosed())
    579         n = resp.readinto(b)
    580         self.assertEqual(n, 2)
    581         self.assertEqual(bytes(b), b'xt')
    582         n = resp.readinto(b)
    583         self.assertEqual(n, 0)
    584         self.assertTrue(resp.isclosed())
    585 
    586     def test_partial_reads_incomplete_body(self):
    587         # if the server shuts down the connection before the whole
    588         # content-length is delivered, the socket is gracefully closed
    589         body = "HTTP/1.1 200 Ok\r\nContent-Length: 10\r\n\r\nText"
    590         sock = FakeSocket(body)
    591         resp = client.HTTPResponse(sock)
    592         resp.begin()
    593         self.assertEqual(resp.read(2), b'Te')
    594         self.assertFalse(resp.isclosed())
    595         self.assertEqual(resp.read(2), b'xt')
    596         self.assertEqual(resp.read(1), b'')
    597         self.assertTrue(resp.isclosed())
    598 
    599     def test_partial_readintos_incomplete_body(self):
    600         # if the server shuts down the connection before the whole
    601         # content-length is delivered, the socket is gracefully closed
    602         body = "HTTP/1.1 200 Ok\r\nContent-Length: 10\r\n\r\nText"
    603         sock = FakeSocket(body)
    604         resp = client.HTTPResponse(sock)
    605         resp.begin()
    606         b = bytearray(2)
    607         n = resp.readinto(b)
    608         self.assertEqual(n, 2)
    609         self.assertEqual(bytes(b), b'Te')
    610         self.assertFalse(resp.isclosed())
    611         n = resp.readinto(b)
    612         self.assertEqual(n, 2)
    613         self.assertEqual(bytes(b), b'xt')
    614         n = resp.readinto(b)
    615         self.assertEqual(n, 0)
    616         self.assertTrue(resp.isclosed())
    617         self.assertFalse(resp.closed)
    618         resp.close()
    619         self.assertTrue(resp.closed)
    620 
    621     def test_host_port(self):
    622         # Check invalid host_port
    623 
    624         for hp in ("www.python.org:abc", "user:password (at] www.python.org"):
    625             self.assertRaises(client.InvalidURL, client.HTTPConnection, hp)
    626 
    627         for hp, h, p in (("[fe80::207:e9ff:fe9b]:8000",
    628                           "fe80::207:e9ff:fe9b", 8000),
    629                          ("www.python.org:80", "www.python.org", 80),
    630                          ("www.python.org:", "www.python.org", 80),
    631                          ("www.python.org", "www.python.org", 80),
    632                          ("[fe80::207:e9ff:fe9b]", "fe80::207:e9ff:fe9b", 80),
    633                          ("[fe80::207:e9ff:fe9b]:", "fe80::207:e9ff:fe9b", 80)):
    634             c = client.HTTPConnection(hp)
    635             self.assertEqual(h, c.host)
    636             self.assertEqual(p, c.port)
    637 
    638     def test_response_headers(self):
    639         # test response with multiple message headers with the same field name.
    640         text = ('HTTP/1.1 200 OK\r\n'
    641                 'Set-Cookie: Customer="WILE_E_COYOTE"; '
    642                 'Version="1"; Path="/acme"\r\n'
    643                 'Set-Cookie: Part_Number="Rocket_Launcher_0001"; Version="1";'
    644                 ' Path="/acme"\r\n'
    645                 '\r\n'
    646                 'No body\r\n')
    647         hdr = ('Customer="WILE_E_COYOTE"; Version="1"; Path="/acme"'
    648                ', '
    649                'Part_Number="Rocket_Launcher_0001"; Version="1"; Path="/acme"')
    650         s = FakeSocket(text)
    651         r = client.HTTPResponse(s)
    652         r.begin()
    653         cookies = r.getheader("Set-Cookie")
    654         self.assertEqual(cookies, hdr)
    655 
    656     def test_read_head(self):
    657         # Test that the library doesn't attempt to read any data
    658         # from a HEAD request.  (Tickles SF bug #622042.)
    659         sock = FakeSocket(
    660             'HTTP/1.1 200 OK\r\n'
    661             'Content-Length: 14432\r\n'
    662             '\r\n',
    663             NoEOFBytesIO)
    664         resp = client.HTTPResponse(sock, method="HEAD")
    665         resp.begin()
    666         if resp.read():
    667             self.fail("Did not expect response from HEAD request")
    668 
    669     def test_readinto_head(self):
    670         # Test that the library doesn't attempt to read any data
    671         # from a HEAD request.  (Tickles SF bug #622042.)
    672         sock = FakeSocket(
    673             'HTTP/1.1 200 OK\r\n'
    674             'Content-Length: 14432\r\n'
    675             '\r\n',
    676             NoEOFBytesIO)
    677         resp = client.HTTPResponse(sock, method="HEAD")
    678         resp.begin()
    679         b = bytearray(5)
    680         if resp.readinto(b) != 0:
    681             self.fail("Did not expect response from HEAD request")
    682         self.assertEqual(bytes(b), b'\x00'*5)
    683 
    684     def test_too_many_headers(self):
    685         headers = '\r\n'.join('Header%d: foo' % i
    686                               for i in range(client._MAXHEADERS + 1)) + '\r\n'
    687         text = ('HTTP/1.1 200 OK\r\n' + headers)
    688         s = FakeSocket(text)
    689         r = client.HTTPResponse(s)
    690         self.assertRaisesRegex(client.HTTPException,
    691                                r"got more than \d+ headers", r.begin)
    692 
    693     def test_send_file(self):
    694         expected = (b'GET /foo HTTP/1.1\r\nHost: example.com\r\n'
    695                     b'Accept-Encoding: identity\r\n'
    696                     b'Transfer-Encoding: chunked\r\n'
    697                     b'\r\n')
    698 
    699         with open(__file__, 'rb') as body:
    700             conn = client.HTTPConnection('example.com')
    701             sock = FakeSocket(body)
    702             conn.sock = sock
    703             conn.request('GET', '/foo', body)
    704             self.assertTrue(sock.data.startswith(expected), '%r != %r' %
    705                     (sock.data[:len(expected)], expected))
    706 
    707     def test_send(self):
    708         expected = b'this is a test this is only a test'
    709         conn = client.HTTPConnection('example.com')
    710         sock = FakeSocket(None)
    711         conn.sock = sock
    712         conn.send(expected)
    713         self.assertEqual(expected, sock.data)
    714         sock.data = b''
    715         conn.send(array.array('b', expected))
    716         self.assertEqual(expected, sock.data)
    717         sock.data = b''
    718         conn.send(io.BytesIO(expected))
    719         self.assertEqual(expected, sock.data)
    720 
    721     def test_send_updating_file(self):
    722         def data():
    723             yield 'data'
    724             yield None
    725             yield 'data_two'
    726 
    727         class UpdatingFile(io.TextIOBase):
    728             mode = 'r'
    729             d = data()
    730             def read(self, blocksize=-1):
    731                 return next(self.d)
    732 
    733         expected = b'data'
    734 
    735         conn = client.HTTPConnection('example.com')
    736         sock = FakeSocket("")
    737         conn.sock = sock
    738         conn.send(UpdatingFile())
    739         self.assertEqual(sock.data, expected)
    740 
    741 
    742     def test_send_iter(self):
    743         expected = b'GET /foo HTTP/1.1\r\nHost: example.com\r\n' \
    744                    b'Accept-Encoding: identity\r\nContent-Length: 11\r\n' \
    745                    b'\r\nonetwothree'
    746 
    747         def body():
    748             yield b"one"
    749             yield b"two"
    750             yield b"three"
    751 
    752         conn = client.HTTPConnection('example.com')
    753         sock = FakeSocket("")
    754         conn.sock = sock
    755         conn.request('GET', '/foo', body(), {'Content-Length': '11'})
    756         self.assertEqual(sock.data, expected)
    757 
    758     def test_send_type_error(self):
    759         # See: Issue #12676
    760         conn = client.HTTPConnection('example.com')
    761         conn.sock = FakeSocket('')
    762         with self.assertRaises(TypeError):
    763             conn.request('POST', 'test', conn)
    764 
    765     def test_chunked(self):
    766         expected = chunked_expected
    767         sock = FakeSocket(chunked_start + last_chunk + chunked_end)
    768         resp = client.HTTPResponse(sock, method="GET")
    769         resp.begin()
    770         self.assertEqual(resp.read(), expected)
    771         resp.close()
    772 
    773         # Various read sizes
    774         for n in range(1, 12):
    775             sock = FakeSocket(chunked_start + last_chunk + chunked_end)
    776             resp = client.HTTPResponse(sock, method="GET")
    777             resp.begin()
    778             self.assertEqual(resp.read(n) + resp.read(n) + resp.read(), expected)
    779             resp.close()
    780 
    781         for x in ('', 'foo\r\n'):
    782             sock = FakeSocket(chunked_start + x)
    783             resp = client.HTTPResponse(sock, method="GET")
    784             resp.begin()
    785             try:
    786                 resp.read()
    787             except client.IncompleteRead as i:
    788                 self.assertEqual(i.partial, expected)
    789                 expected_message = 'IncompleteRead(%d bytes read)' % len(expected)
    790                 self.assertEqual(repr(i), expected_message)
    791                 self.assertEqual(str(i), expected_message)
    792             else:
    793                 self.fail('IncompleteRead expected')
    794             finally:
    795                 resp.close()
    796 
    797     def test_readinto_chunked(self):
    798 
    799         expected = chunked_expected
    800         nexpected = len(expected)
    801         b = bytearray(128)
    802 
    803         sock = FakeSocket(chunked_start + last_chunk + chunked_end)
    804         resp = client.HTTPResponse(sock, method="GET")
    805         resp.begin()
    806         n = resp.readinto(b)
    807         self.assertEqual(b[:nexpected], expected)
    808         self.assertEqual(n, nexpected)
    809         resp.close()
    810 
    811         # Various read sizes
    812         for n in range(1, 12):
    813             sock = FakeSocket(chunked_start + last_chunk + chunked_end)
    814             resp = client.HTTPResponse(sock, method="GET")
    815             resp.begin()
    816             m = memoryview(b)
    817             i = resp.readinto(m[0:n])
    818             i += resp.readinto(m[i:n + i])
    819             i += resp.readinto(m[i:])
    820             self.assertEqual(b[:nexpected], expected)
    821             self.assertEqual(i, nexpected)
    822             resp.close()
    823 
    824         for x in ('', 'foo\r\n'):
    825             sock = FakeSocket(chunked_start + x)
    826             resp = client.HTTPResponse(sock, method="GET")
    827             resp.begin()
    828             try:
    829                 n = resp.readinto(b)
    830             except client.IncompleteRead as i:
    831                 self.assertEqual(i.partial, expected)
    832                 expected_message = 'IncompleteRead(%d bytes read)' % len(expected)
    833                 self.assertEqual(repr(i), expected_message)
    834                 self.assertEqual(str(i), expected_message)
    835             else:
    836                 self.fail('IncompleteRead expected')
    837             finally:
    838                 resp.close()
    839 
    840     def test_chunked_head(self):
    841         chunked_start = (
    842             'HTTP/1.1 200 OK\r\n'
    843             'Transfer-Encoding: chunked\r\n\r\n'
    844             'a\r\n'
    845             'hello world\r\n'
    846             '1\r\n'
    847             'd\r\n'
    848         )
    849         sock = FakeSocket(chunked_start + last_chunk + chunked_end)
    850         resp = client.HTTPResponse(sock, method="HEAD")
    851         resp.begin()
    852         self.assertEqual(resp.read(), b'')
    853         self.assertEqual(resp.status, 200)
    854         self.assertEqual(resp.reason, 'OK')
    855         self.assertTrue(resp.isclosed())
    856         self.assertFalse(resp.closed)
    857         resp.close()
    858         self.assertTrue(resp.closed)
    859 
    860     def test_readinto_chunked_head(self):
    861         chunked_start = (
    862             'HTTP/1.1 200 OK\r\n'
    863             'Transfer-Encoding: chunked\r\n\r\n'
    864             'a\r\n'
    865             'hello world\r\n'
    866             '1\r\n'
    867             'd\r\n'
    868         )
    869         sock = FakeSocket(chunked_start + last_chunk + chunked_end)
    870         resp = client.HTTPResponse(sock, method="HEAD")
    871         resp.begin()
    872         b = bytearray(5)
    873         n = resp.readinto(b)
    874         self.assertEqual(n, 0)
    875         self.assertEqual(bytes(b), b'\x00'*5)
    876         self.assertEqual(resp.status, 200)
    877         self.assertEqual(resp.reason, 'OK')
    878         self.assertTrue(resp.isclosed())
    879         self.assertFalse(resp.closed)
    880         resp.close()
    881         self.assertTrue(resp.closed)
    882 
    883     def test_negative_content_length(self):
    884         sock = FakeSocket(
    885             'HTTP/1.1 200 OK\r\nContent-Length: -1\r\n\r\nHello\r\n')
    886         resp = client.HTTPResponse(sock, method="GET")
    887         resp.begin()
    888         self.assertEqual(resp.read(), b'Hello\r\n')
    889         self.assertTrue(resp.isclosed())
    890 
    891     def test_incomplete_read(self):
    892         sock = FakeSocket('HTTP/1.1 200 OK\r\nContent-Length: 10\r\n\r\nHello\r\n')
    893         resp = client.HTTPResponse(sock, method="GET")
    894         resp.begin()
    895         try:
    896             resp.read()
    897         except client.IncompleteRead as i:
    898             self.assertEqual(i.partial, b'Hello\r\n')
    899             self.assertEqual(repr(i),
    900                              "IncompleteRead(7 bytes read, 3 more expected)")
    901             self.assertEqual(str(i),
    902                              "IncompleteRead(7 bytes read, 3 more expected)")
    903             self.assertTrue(resp.isclosed())
    904         else:
    905             self.fail('IncompleteRead expected')
    906 
    907     def test_epipe(self):
    908         sock = EPipeSocket(
    909             "HTTP/1.0 401 Authorization Required\r\n"
    910             "Content-type: text/html\r\n"
    911             "WWW-Authenticate: Basic realm=\"example\"\r\n",
    912             b"Content-Length")
    913         conn = client.HTTPConnection("example.com")
    914         conn.sock = sock
    915         self.assertRaises(OSError,
    916                           lambda: conn.request("PUT", "/url", "body"))
    917         resp = conn.getresponse()
    918         self.assertEqual(401, resp.status)
    919         self.assertEqual("Basic realm=\"example\"",
    920                          resp.getheader("www-authenticate"))
    921 
    922     # Test lines overflowing the max line size (_MAXLINE in http.client)
    923 
    924     def test_overflowing_status_line(self):
    925         body = "HTTP/1.1 200 Ok" + "k" * 65536 + "\r\n"
    926         resp = client.HTTPResponse(FakeSocket(body))
    927         self.assertRaises((client.LineTooLong, client.BadStatusLine), resp.begin)
    928 
    929     def test_overflowing_header_line(self):
    930         body = (
    931             'HTTP/1.1 200 OK\r\n'
    932             'X-Foo: bar' + 'r' * 65536 + '\r\n\r\n'
    933         )
    934         resp = client.HTTPResponse(FakeSocket(body))
    935         self.assertRaises(client.LineTooLong, resp.begin)
    936 
    937     def test_overflowing_chunked_line(self):
    938         body = (
    939             'HTTP/1.1 200 OK\r\n'
    940             'Transfer-Encoding: chunked\r\n\r\n'
    941             + '0' * 65536 + 'a\r\n'
    942             'hello world\r\n'
    943             '0\r\n'
    944             '\r\n'
    945         )
    946         resp = client.HTTPResponse(FakeSocket(body))
    947         resp.begin()
    948         self.assertRaises(client.LineTooLong, resp.read)
    949 
    950     def test_early_eof(self):
    951         # Test httpresponse with no \r\n termination,
    952         body = "HTTP/1.1 200 Ok"
    953         sock = FakeSocket(body)
    954         resp = client.HTTPResponse(sock)
    955         resp.begin()
    956         self.assertEqual(resp.read(), b'')
    957         self.assertTrue(resp.isclosed())
    958         self.assertFalse(resp.closed)
    959         resp.close()
    960         self.assertTrue(resp.closed)
    961 
    962     def test_error_leak(self):
    963         # Test that the socket is not leaked if getresponse() fails
    964         conn = client.HTTPConnection('example.com')
    965         response = None
    966         class Response(client.HTTPResponse):
    967             def __init__(self, *pos, **kw):
    968                 nonlocal response
    969                 response = self  # Avoid garbage collector closing the socket
    970                 client.HTTPResponse.__init__(self, *pos, **kw)
    971         conn.response_class = Response
    972         conn.sock = FakeSocket('Invalid status line')
    973         conn.request('GET', '/')
    974         self.assertRaises(client.BadStatusLine, conn.getresponse)
    975         self.assertTrue(response.closed)
    976         self.assertTrue(conn.sock.file_closed)
    977 
    978     def test_chunked_extension(self):
    979         extra = '3;foo=bar\r\n' + 'abc\r\n'
    980         expected = chunked_expected + b'abc'
    981 
    982         sock = FakeSocket(chunked_start + extra + last_chunk_extended + chunked_end)
    983         resp = client.HTTPResponse(sock, method="GET")
    984         resp.begin()
    985         self.assertEqual(resp.read(), expected)
    986         resp.close()
    987 
    988     def test_chunked_missing_end(self):
    989         """some servers may serve up a short chunked encoding stream"""
    990         expected = chunked_expected
    991         sock = FakeSocket(chunked_start + last_chunk)  #no terminating crlf
    992         resp = client.HTTPResponse(sock, method="GET")
    993         resp.begin()
    994         self.assertEqual(resp.read(), expected)
    995         resp.close()
    996 
    997     def test_chunked_trailers(self):
    998         """See that trailers are read and ignored"""
    999         expected = chunked_expected
   1000         sock = FakeSocket(chunked_start + last_chunk + trailers + chunked_end)
   1001         resp = client.HTTPResponse(sock, method="GET")
   1002         resp.begin()
   1003         self.assertEqual(resp.read(), expected)
   1004         # we should have reached the end of the file
   1005         self.assertEqual(sock.file.read(), b"") #we read to the end
   1006         resp.close()
   1007 
   1008     def test_chunked_sync(self):
   1009         """Check that we don't read past the end of the chunked-encoding stream"""
   1010         expected = chunked_expected
   1011         extradata = "extradata"
   1012         sock = FakeSocket(chunked_start + last_chunk + trailers + chunked_end + extradata)
   1013         resp = client.HTTPResponse(sock, method="GET")
   1014         resp.begin()
   1015         self.assertEqual(resp.read(), expected)
   1016         # the file should now have our extradata ready to be read
   1017         self.assertEqual(sock.file.read(), extradata.encode("ascii")) #we read to the end
   1018         resp.close()
   1019 
   1020     def test_content_length_sync(self):
   1021         """Check that we don't read past the end of the Content-Length stream"""
   1022         extradata = b"extradata"
   1023         expected = b"Hello123\r\n"
   1024         sock = FakeSocket(b'HTTP/1.1 200 OK\r\nContent-Length: 10\r\n\r\n' + expected + extradata)
   1025         resp = client.HTTPResponse(sock, method="GET")
   1026         resp.begin()
   1027         self.assertEqual(resp.read(), expected)
   1028         # the file should now have our extradata ready to be read
   1029         self.assertEqual(sock.file.read(), extradata) #we read to the end
   1030         resp.close()
   1031 
   1032     def test_readlines_content_length(self):
   1033         extradata = b"extradata"
   1034         expected = b"Hello123\r\n"
   1035         sock = FakeSocket(b'HTTP/1.1 200 OK\r\nContent-Length: 10\r\n\r\n' + expected + extradata)
   1036         resp = client.HTTPResponse(sock, method="GET")
   1037         resp.begin()
   1038         self.assertEqual(resp.readlines(2000), [expected])
   1039         # the file should now have our extradata ready to be read
   1040         self.assertEqual(sock.file.read(), extradata) #we read to the end
   1041         resp.close()
   1042 
   1043     def test_read1_content_length(self):
   1044         extradata = b"extradata"
   1045         expected = b"Hello123\r\n"
   1046         sock = FakeSocket(b'HTTP/1.1 200 OK\r\nContent-Length: 10\r\n\r\n' + expected + extradata)
   1047         resp = client.HTTPResponse(sock, method="GET")
   1048         resp.begin()
   1049         self.assertEqual(resp.read1(2000), expected)
   1050         # the file should now have our extradata ready to be read
   1051         self.assertEqual(sock.file.read(), extradata) #we read to the end
   1052         resp.close()
   1053 
   1054     def test_readline_bound_content_length(self):
   1055         extradata = b"extradata"
   1056         expected = b"Hello123\r\n"
   1057         sock = FakeSocket(b'HTTP/1.1 200 OK\r\nContent-Length: 10\r\n\r\n' + expected + extradata)
   1058         resp = client.HTTPResponse(sock, method="GET")
   1059         resp.begin()
   1060         self.assertEqual(resp.readline(10), expected)
   1061         self.assertEqual(resp.readline(10), b"")
   1062         # the file should now have our extradata ready to be read
   1063         self.assertEqual(sock.file.read(), extradata) #we read to the end
   1064         resp.close()
   1065 
   1066     def test_read1_bound_content_length(self):
   1067         extradata = b"extradata"
   1068         expected = b"Hello123\r\n"
   1069         sock = FakeSocket(b'HTTP/1.1 200 OK\r\nContent-Length: 30\r\n\r\n' + expected*3 + extradata)
   1070         resp = client.HTTPResponse(sock, method="GET")
   1071         resp.begin()
   1072         self.assertEqual(resp.read1(20), expected*2)
   1073         self.assertEqual(resp.read(), expected)
   1074         # the file should now have our extradata ready to be read
   1075         self.assertEqual(sock.file.read(), extradata) #we read to the end
   1076         resp.close()
   1077 
   1078     def test_response_fileno(self):
   1079         # Make sure fd returned by fileno is valid.
   1080         threading = support.import_module("threading")
   1081 
   1082         serv = socket.socket(
   1083             socket.AF_INET, socket.SOCK_STREAM, socket.IPPROTO_TCP)
   1084         self.addCleanup(serv.close)
   1085         serv.bind((HOST, 0))
   1086         serv.listen()
   1087 
   1088         result = None
   1089         def run_server():
   1090             [conn, address] = serv.accept()
   1091             with conn, conn.makefile("rb") as reader:
   1092                 # Read the request header until a blank line
   1093                 while True:
   1094                     line = reader.readline()
   1095                     if not line.rstrip(b"\r\n"):
   1096                         break
   1097                 conn.sendall(b"HTTP/1.1 200 Connection established\r\n\r\n")
   1098                 nonlocal result
   1099                 result = reader.read()
   1100 
   1101         thread = threading.Thread(target=run_server)
   1102         thread.start()
   1103         self.addCleanup(thread.join, float(1))
   1104         conn = client.HTTPConnection(*serv.getsockname())
   1105         conn.request("CONNECT", "dummy:1234")
   1106         response = conn.getresponse()
   1107         try:
   1108             self.assertEqual(response.status, client.OK)
   1109             s = socket.socket(fileno=response.fileno())
   1110             try:
   1111                 s.sendall(b"proxied data\n")
   1112             finally:
   1113                 s.detach()
   1114         finally:
   1115             response.close()
   1116             conn.close()
   1117         thread.join()
   1118         self.assertEqual(result, b"proxied data\n")
   1119 
   1120 class ExtendedReadTest(TestCase):
   1121     """
   1122     Test peek(), read1(), readline()
   1123     """
   1124     lines = (
   1125         'HTTP/1.1 200 OK\r\n'
   1126         '\r\n'
   1127         'hello world!\n'
   1128         'and now \n'
   1129         'for something completely different\n'
   1130         'foo'
   1131         )
   1132     lines_expected = lines[lines.find('hello'):].encode("ascii")
   1133     lines_chunked = (
   1134         'HTTP/1.1 200 OK\r\n'
   1135         'Transfer-Encoding: chunked\r\n\r\n'
   1136         'a\r\n'
   1137         'hello worl\r\n'
   1138         '3\r\n'
   1139         'd!\n\r\n'
   1140         '9\r\n'
   1141         'and now \n\r\n'
   1142         '23\r\n'
   1143         'for something completely different\n\r\n'
   1144         '3\r\n'
   1145         'foo\r\n'
   1146         '0\r\n' # terminating chunk
   1147         '\r\n'  # end of trailers
   1148     )
   1149 
   1150     def setUp(self):
   1151         sock = FakeSocket(self.lines)
   1152         resp = client.HTTPResponse(sock, method="GET")
   1153         resp.begin()
   1154         resp.fp = io.BufferedReader(resp.fp)
   1155         self.resp = resp
   1156 
   1157 
   1158 
   1159     def test_peek(self):
   1160         resp = self.resp
   1161         # patch up the buffered peek so that it returns not too much stuff
   1162         oldpeek = resp.fp.peek
   1163         def mypeek(n=-1):
   1164             p = oldpeek(n)
   1165             if n >= 0:
   1166                 return p[:n]
   1167             return p[:10]
   1168         resp.fp.peek = mypeek
   1169 
   1170         all = []
   1171         while True:
   1172             # try a short peek
   1173             p = resp.peek(3)
   1174             if p:
   1175                 self.assertGreater(len(p), 0)
   1176                 # then unbounded peek
   1177                 p2 = resp.peek()
   1178                 self.assertGreaterEqual(len(p2), len(p))
   1179                 self.assertTrue(p2.startswith(p))
   1180                 next = resp.read(len(p2))
   1181                 self.assertEqual(next, p2)
   1182             else:
   1183                 next = resp.read()
   1184                 self.assertFalse(next)
   1185             all.append(next)
   1186             if not next:
   1187                 break
   1188         self.assertEqual(b"".join(all), self.lines_expected)
   1189 
   1190     def test_readline(self):
   1191         resp = self.resp
   1192         self._verify_readline(self.resp.readline, self.lines_expected)
   1193 
   1194     def _verify_readline(self, readline, expected):
   1195         all = []
   1196         while True:
   1197             # short readlines
   1198             line = readline(5)
   1199             if line and line != b"foo":
   1200                 if len(line) < 5:
   1201                     self.assertTrue(line.endswith(b"\n"))
   1202             all.append(line)
   1203             if not line:
   1204                 break
   1205         self.assertEqual(b"".join(all), expected)
   1206 
   1207     def test_read1(self):
   1208         resp = self.resp
   1209         def r():
   1210             res = resp.read1(4)
   1211             self.assertLessEqual(len(res), 4)
   1212             return res
   1213         readliner = Readliner(r)
   1214         self._verify_readline(readliner.readline, self.lines_expected)
   1215 
   1216     def test_read1_unbounded(self):
   1217         resp = self.resp
   1218         all = []
   1219         while True:
   1220             data = resp.read1()
   1221             if not data:
   1222                 break
   1223             all.append(data)
   1224         self.assertEqual(b"".join(all), self.lines_expected)
   1225 
   1226     def test_read1_bounded(self):
   1227         resp = self.resp
   1228         all = []
   1229         while True:
   1230             data = resp.read1(10)
   1231             if not data:
   1232                 break
   1233             self.assertLessEqual(len(data), 10)
   1234             all.append(data)
   1235         self.assertEqual(b"".join(all), self.lines_expected)
   1236 
   1237     def test_read1_0(self):
   1238         self.assertEqual(self.resp.read1(0), b"")
   1239 
   1240     def test_peek_0(self):
   1241         p = self.resp.peek(0)
   1242         self.assertLessEqual(0, len(p))
   1243 
   1244 class ExtendedReadTestChunked(ExtendedReadTest):
   1245     """
   1246     Test peek(), read1(), readline() in chunked mode
   1247     """
   1248     lines = (
   1249         'HTTP/1.1 200 OK\r\n'
   1250         'Transfer-Encoding: chunked\r\n\r\n'
   1251         'a\r\n'
   1252         'hello worl\r\n'
   1253         '3\r\n'
   1254         'd!\n\r\n'
   1255         '9\r\n'
   1256         'and now \n\r\n'
   1257         '23\r\n'
   1258         'for something completely different\n\r\n'
   1259         '3\r\n'
   1260         'foo\r\n'
   1261         '0\r\n' # terminating chunk
   1262         '\r\n'  # end of trailers
   1263     )
   1264 
   1265 
   1266 class Readliner:
   1267     """
   1268     a simple readline class that uses an arbitrary read function and buffering
   1269     """
   1270     def __init__(self, readfunc):
   1271         self.readfunc = readfunc
   1272         self.remainder = b""
   1273 
   1274     def readline(self, limit):
   1275         data = []
   1276         datalen = 0
   1277         read = self.remainder
   1278         try:
   1279             while True:
   1280                 idx = read.find(b'\n')
   1281                 if idx != -1:
   1282                     break
   1283                 if datalen + len(read) >= limit:
   1284                     idx = limit - datalen - 1
   1285                 # read more data
   1286                 data.append(read)
   1287                 read = self.readfunc()
   1288                 if not read:
   1289                     idx = 0 #eof condition
   1290                     break
   1291             idx += 1
   1292             data.append(read[:idx])
   1293             self.remainder = read[idx:]
   1294             return b"".join(data)
   1295         except:
   1296             self.remainder = b"".join(data)
   1297             raise
   1298 
   1299 
   1300 class OfflineTest(TestCase):
   1301     def test_all(self):
   1302         # Documented objects defined in the module should be in __all__
   1303         expected = {"responses"}  # White-list documented dict() object
   1304         # HTTPMessage, parse_headers(), and the HTTP status code constants are
   1305         # intentionally omitted for simplicity
   1306         blacklist = {"HTTPMessage", "parse_headers"}
   1307         for name in dir(client):
   1308             if name.startswith("_") or name in blacklist:
   1309                 continue
   1310             module_object = getattr(client, name)
   1311             if getattr(module_object, "__module__", None) == "http.client":
   1312                 expected.add(name)
   1313         self.assertCountEqual(client.__all__, expected)
   1314 
   1315     def test_responses(self):
   1316         self.assertEqual(client.responses[client.NOT_FOUND], "Not Found")
   1317 
   1318     def test_client_constants(self):
   1319         # Make sure we don't break backward compatibility with 3.4
   1320         expected = [
   1321             'CONTINUE',
   1322             'SWITCHING_PROTOCOLS',
   1323             'PROCESSING',
   1324             'OK',
   1325             'CREATED',
   1326             'ACCEPTED',
   1327             'NON_AUTHORITATIVE_INFORMATION',
   1328             'NO_CONTENT',
   1329             'RESET_CONTENT',
   1330             'PARTIAL_CONTENT',
   1331             'MULTI_STATUS',
   1332             'IM_USED',
   1333             'MULTIPLE_CHOICES',
   1334             'MOVED_PERMANENTLY',
   1335             'FOUND',
   1336             'SEE_OTHER',
   1337             'NOT_MODIFIED',
   1338             'USE_PROXY',
   1339             'TEMPORARY_REDIRECT',
   1340             'BAD_REQUEST',
   1341             'UNAUTHORIZED',
   1342             'PAYMENT_REQUIRED',
   1343             'FORBIDDEN',
   1344             'NOT_FOUND',
   1345             'METHOD_NOT_ALLOWED',
   1346             'NOT_ACCEPTABLE',
   1347             'PROXY_AUTHENTICATION_REQUIRED',
   1348             'REQUEST_TIMEOUT',
   1349             'CONFLICT',
   1350             'GONE',
   1351             'LENGTH_REQUIRED',
   1352             'PRECONDITION_FAILED',
   1353             'REQUEST_ENTITY_TOO_LARGE',
   1354             'REQUEST_URI_TOO_LONG',
   1355             'UNSUPPORTED_MEDIA_TYPE',
   1356             'REQUESTED_RANGE_NOT_SATISFIABLE',
   1357             'EXPECTATION_FAILED',
   1358             'UNPROCESSABLE_ENTITY',
   1359             'LOCKED',
   1360             'FAILED_DEPENDENCY',
   1361             'UPGRADE_REQUIRED',
   1362             'PRECONDITION_REQUIRED',
   1363             'TOO_MANY_REQUESTS',
   1364             'REQUEST_HEADER_FIELDS_TOO_LARGE',
   1365             'INTERNAL_SERVER_ERROR',
   1366             'NOT_IMPLEMENTED',
   1367             'BAD_GATEWAY',
   1368             'SERVICE_UNAVAILABLE',
   1369             'GATEWAY_TIMEOUT',
   1370             'HTTP_VERSION_NOT_SUPPORTED',
   1371             'INSUFFICIENT_STORAGE',
   1372             'NOT_EXTENDED',
   1373             'NETWORK_AUTHENTICATION_REQUIRED',
   1374         ]
   1375         for const in expected:
   1376             with self.subTest(constant=const):
   1377                 self.assertTrue(hasattr(client, const))
   1378 
   1379 
   1380 class SourceAddressTest(TestCase):
   1381     def setUp(self):
   1382         self.serv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
   1383         self.port = support.bind_port(self.serv)
   1384         self.source_port = support.find_unused_port()
   1385         self.serv.listen()
   1386         self.conn = None
   1387 
   1388     def tearDown(self):
   1389         if self.conn:
   1390             self.conn.close()
   1391             self.conn = None
   1392         self.serv.close()
   1393         self.serv = None
   1394 
   1395     def testHTTPConnectionSourceAddress(self):
   1396         self.conn = client.HTTPConnection(HOST, self.port,
   1397                 source_address=('', self.source_port))
   1398         self.conn.connect()
   1399         self.assertEqual(self.conn.sock.getsockname()[1], self.source_port)
   1400 
   1401     @unittest.skipIf(not hasattr(client, 'HTTPSConnection'),
   1402                      'http.client.HTTPSConnection not defined')
   1403     def testHTTPSConnectionSourceAddress(self):
   1404         self.conn = client.HTTPSConnection(HOST, self.port,
   1405                 source_address=('', self.source_port))
   1406         # We don't test anything here other than the constructor not barfing as
   1407         # this code doesn't deal with setting up an active running SSL server
   1408         # for an ssl_wrapped connect() to actually return from.
   1409 
   1410 
   1411 class TimeoutTest(TestCase):
   1412     PORT = None
   1413 
   1414     def setUp(self):
   1415         self.serv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
   1416         TimeoutTest.PORT = support.bind_port(self.serv)
   1417         self.serv.listen()
   1418 
   1419     def tearDown(self):
   1420         self.serv.close()
   1421         self.serv = None
   1422 
   1423     def testTimeoutAttribute(self):
   1424         # This will prove that the timeout gets through HTTPConnection
   1425         # and into the socket.
   1426 
   1427         # default -- use global socket timeout
   1428         self.assertIsNone(socket.getdefaulttimeout())
   1429         socket.setdefaulttimeout(30)
   1430         try:
   1431             httpConn = client.HTTPConnection(HOST, TimeoutTest.PORT)
   1432             httpConn.connect()
   1433         finally:
   1434             socket.setdefaulttimeout(None)
   1435         self.assertEqual(httpConn.sock.gettimeout(), 30)
   1436         httpConn.close()
   1437 
   1438         # no timeout -- do not use global socket default
   1439         self.assertIsNone(socket.getdefaulttimeout())
   1440         socket.setdefaulttimeout(30)
   1441         try:
   1442             httpConn = client.HTTPConnection(HOST, TimeoutTest.PORT,
   1443                                               timeout=None)
   1444             httpConn.connect()
   1445         finally:
   1446             socket.setdefaulttimeout(None)
   1447         self.assertEqual(httpConn.sock.gettimeout(), None)
   1448         httpConn.close()
   1449 
   1450         # a value
   1451         httpConn = client.HTTPConnection(HOST, TimeoutTest.PORT, timeout=30)
   1452         httpConn.connect()
   1453         self.assertEqual(httpConn.sock.gettimeout(), 30)
   1454         httpConn.close()
   1455 
   1456 
   1457 class PersistenceTest(TestCase):
   1458 
   1459     def test_reuse_reconnect(self):
   1460         # Should reuse or reconnect depending on header from server
   1461         tests = (
   1462             ('1.0', '', False),
   1463             ('1.0', 'Connection: keep-alive\r\n', True),
   1464             ('1.1', '', True),
   1465             ('1.1', 'Connection: close\r\n', False),
   1466             ('1.0', 'Connection: keep-ALIVE\r\n', True),
   1467             ('1.1', 'Connection: cloSE\r\n', False),
   1468         )
   1469         for version, header, reuse in tests:
   1470             with self.subTest(version=version, header=header):
   1471                 msg = (
   1472                     'HTTP/{} 200 OK\r\n'
   1473                     '{}'
   1474                     'Content-Length: 12\r\n'
   1475                     '\r\n'
   1476                     'Dummy body\r\n'
   1477                 ).format(version, header)
   1478                 conn = FakeSocketHTTPConnection(msg)
   1479                 self.assertIsNone(conn.sock)
   1480                 conn.request('GET', '/open-connection')
   1481                 with conn.getresponse() as response:
   1482                     self.assertEqual(conn.sock is None, not reuse)
   1483                     response.read()
   1484                 self.assertEqual(conn.sock is None, not reuse)
   1485                 self.assertEqual(conn.connections, 1)
   1486                 conn.request('GET', '/subsequent-request')
   1487                 self.assertEqual(conn.connections, 1 if reuse else 2)
   1488 
   1489     def test_disconnected(self):
   1490 
   1491         def make_reset_reader(text):
   1492             """Return BufferedReader that raises ECONNRESET at EOF"""
   1493             stream = io.BytesIO(text)
   1494             def readinto(buffer):
   1495                 size = io.BytesIO.readinto(stream, buffer)
   1496                 if size == 0:
   1497                     raise ConnectionResetError()
   1498                 return size
   1499             stream.readinto = readinto
   1500             return io.BufferedReader(stream)
   1501 
   1502         tests = (
   1503             (io.BytesIO, client.RemoteDisconnected),
   1504             (make_reset_reader, ConnectionResetError),
   1505         )
   1506         for stream_factory, exception in tests:
   1507             with self.subTest(exception=exception):
   1508                 conn = FakeSocketHTTPConnection(b'', stream_factory)
   1509                 conn.request('GET', '/eof-response')
   1510                 self.assertRaises(exception, conn.getresponse)
   1511                 self.assertIsNone(conn.sock)
   1512                 # HTTPConnection.connect() should be automatically invoked
   1513                 conn.request('GET', '/reconnect')
   1514                 self.assertEqual(conn.connections, 2)
   1515 
   1516     def test_100_close(self):
   1517         conn = FakeSocketHTTPConnection(
   1518             b'HTTP/1.1 100 Continue\r\n'
   1519             b'\r\n'
   1520             # Missing final response
   1521         )
   1522         conn.request('GET', '/', headers={'Expect': '100-continue'})
   1523         self.assertRaises(client.RemoteDisconnected, conn.getresponse)
   1524         self.assertIsNone(conn.sock)
   1525         conn.request('GET', '/reconnect')
   1526         self.assertEqual(conn.connections, 2)
   1527 
   1528 
   1529 class HTTPSTest(TestCase):
   1530 
   1531     def setUp(self):
   1532         if not hasattr(client, 'HTTPSConnection'):
   1533             self.skipTest('ssl support required')
   1534 
   1535     def make_server(self, certfile):
   1536         from test.ssl_servers import make_https_server
   1537         return make_https_server(self, certfile=certfile)
   1538 
   1539     def test_attributes(self):
   1540         # simple test to check it's storing the timeout
   1541         h = client.HTTPSConnection(HOST, TimeoutTest.PORT, timeout=30)
   1542         self.assertEqual(h.timeout, 30)
   1543 
   1544     def test_networked(self):
   1545         # Default settings: requires a valid cert from a trusted CA
   1546         import ssl
   1547         support.requires('network')
   1548         with support.transient_internet('self-signed.pythontest.net'):
   1549             h = client.HTTPSConnection('self-signed.pythontest.net', 443)
   1550             with self.assertRaises(ssl.SSLError) as exc_info:
   1551                 h.request('GET', '/')
   1552             self.assertEqual(exc_info.exception.reason, 'CERTIFICATE_VERIFY_FAILED')
   1553 
   1554     def test_networked_noverification(self):
   1555         # Switch off cert verification
   1556         import ssl
   1557         support.requires('network')
   1558         with support.transient_internet('self-signed.pythontest.net'):
   1559             context = ssl._create_unverified_context()
   1560             h = client.HTTPSConnection('self-signed.pythontest.net', 443,
   1561                                        context=context)
   1562             h.request('GET', '/')
   1563             resp = h.getresponse()
   1564             h.close()
   1565             self.assertIn('nginx', resp.getheader('server'))
   1566             resp.close()
   1567 
   1568     @support.system_must_validate_cert
   1569     def test_networked_trusted_by_default_cert(self):
   1570         # Default settings: requires a valid cert from a trusted CA
   1571         support.requires('network')
   1572         with support.transient_internet('www.python.org'):
   1573             h = client.HTTPSConnection('www.python.org', 443)
   1574             h.request('GET', '/')
   1575             resp = h.getresponse()
   1576             content_type = resp.getheader('content-type')
   1577             resp.close()
   1578             h.close()
   1579             self.assertIn('text/html', content_type)
   1580 
   1581     def test_networked_good_cert(self):
   1582         # We feed the server's cert as a validating cert
   1583         import ssl
   1584         support.requires('network')
   1585         with support.transient_internet('self-signed.pythontest.net'):
   1586             context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
   1587             context.verify_mode = ssl.CERT_REQUIRED
   1588             context.load_verify_locations(CERT_selfsigned_pythontestdotnet)
   1589             h = client.HTTPSConnection('self-signed.pythontest.net', 443, context=context)
   1590             h.request('GET', '/')
   1591             resp = h.getresponse()
   1592             server_string = resp.getheader('server')
   1593             resp.close()
   1594             h.close()
   1595             self.assertIn('nginx', server_string)
   1596 
   1597     def test_networked_bad_cert(self):
   1598         # We feed a "CA" cert that is unrelated to the server's cert
   1599         import ssl
   1600         support.requires('network')
   1601         with support.transient_internet('self-signed.pythontest.net'):
   1602             context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
   1603             context.verify_mode = ssl.CERT_REQUIRED
   1604             context.load_verify_locations(CERT_localhost)
   1605             h = client.HTTPSConnection('self-signed.pythontest.net', 443, context=context)
   1606             with self.assertRaises(ssl.SSLError) as exc_info:
   1607                 h.request('GET', '/')
   1608             self.assertEqual(exc_info.exception.reason, 'CERTIFICATE_VERIFY_FAILED')
   1609 
   1610     def test_local_unknown_cert(self):
   1611         # The custom cert isn't known to the default trust bundle
   1612         import ssl
   1613         server = self.make_server(CERT_localhost)
   1614         h = client.HTTPSConnection('localhost', server.port)
   1615         with self.assertRaises(ssl.SSLError) as exc_info:
   1616             h.request('GET', '/')
   1617         self.assertEqual(exc_info.exception.reason, 'CERTIFICATE_VERIFY_FAILED')
   1618 
   1619     def test_local_good_hostname(self):
   1620         # The (valid) cert validates the HTTP hostname
   1621         import ssl
   1622         server = self.make_server(CERT_localhost)
   1623         context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
   1624         context.verify_mode = ssl.CERT_REQUIRED
   1625         context.load_verify_locations(CERT_localhost)
   1626         h = client.HTTPSConnection('localhost', server.port, context=context)
   1627         self.addCleanup(h.close)
   1628         h.request('GET', '/nonexistent')
   1629         resp = h.getresponse()
   1630         self.addCleanup(resp.close)
   1631         self.assertEqual(resp.status, 404)
   1632 
   1633     def test_local_bad_hostname(self):
   1634         # The (valid) cert doesn't validate the HTTP hostname
   1635         import ssl
   1636         server = self.make_server(CERT_fakehostname)
   1637         context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
   1638         context.verify_mode = ssl.CERT_REQUIRED
   1639         context.check_hostname = True
   1640         context.load_verify_locations(CERT_fakehostname)
   1641         h = client.HTTPSConnection('localhost', server.port, context=context)
   1642         with self.assertRaises(ssl.CertificateError):
   1643             h.request('GET', '/')
   1644         # Same with explicit check_hostname=True
   1645         with support.check_warnings(('', DeprecationWarning)):
   1646             h = client.HTTPSConnection('localhost', server.port,
   1647                                        context=context, check_hostname=True)
   1648         with self.assertRaises(ssl.CertificateError):
   1649             h.request('GET', '/')
   1650         # With check_hostname=False, the mismatching is ignored
   1651         context.check_hostname = False
   1652         with support.check_warnings(('', DeprecationWarning)):
   1653             h = client.HTTPSConnection('localhost', server.port,
   1654                                        context=context, check_hostname=False)
   1655         h.request('GET', '/nonexistent')
   1656         resp = h.getresponse()
   1657         resp.close()
   1658         h.close()
   1659         self.assertEqual(resp.status, 404)
   1660         # The context's check_hostname setting is used if one isn't passed to
   1661         # HTTPSConnection.
   1662         context.check_hostname = False
   1663         h = client.HTTPSConnection('localhost', server.port, context=context)
   1664         h.request('GET', '/nonexistent')
   1665         resp = h.getresponse()
   1666         self.assertEqual(resp.status, 404)
   1667         resp.close()
   1668         h.close()
   1669         # Passing check_hostname to HTTPSConnection should override the
   1670         # context's setting.
   1671         with support.check_warnings(('', DeprecationWarning)):
   1672             h = client.HTTPSConnection('localhost', server.port,
   1673                                        context=context, check_hostname=True)
   1674         with self.assertRaises(ssl.CertificateError):
   1675             h.request('GET', '/')
   1676 
   1677     @unittest.skipIf(not hasattr(client, 'HTTPSConnection'),
   1678                      'http.client.HTTPSConnection not available')
   1679     def test_host_port(self):
   1680         # Check invalid host_port
   1681 
   1682         for hp in ("www.python.org:abc", "user:password (at] www.python.org"):
   1683             self.assertRaises(client.InvalidURL, client.HTTPSConnection, hp)
   1684 
   1685         for hp, h, p in (("[fe80::207:e9ff:fe9b]:8000",
   1686                           "fe80::207:e9ff:fe9b", 8000),
   1687                          ("www.python.org:443", "www.python.org", 443),
   1688                          ("www.python.org:", "www.python.org", 443),
   1689                          ("www.python.org", "www.python.org", 443),
   1690                          ("[fe80::207:e9ff:fe9b]", "fe80::207:e9ff:fe9b", 443),
   1691                          ("[fe80::207:e9ff:fe9b]:", "fe80::207:e9ff:fe9b",
   1692                              443)):
   1693             c = client.HTTPSConnection(hp)
   1694             self.assertEqual(h, c.host)
   1695             self.assertEqual(p, c.port)
   1696 
   1697 
   1698 class RequestBodyTest(TestCase):
   1699     """Test cases where a request includes a message body."""
   1700 
   1701     def setUp(self):
   1702         self.conn = client.HTTPConnection('example.com')
   1703         self.conn.sock = self.sock = FakeSocket("")
   1704         self.conn.sock = self.sock
   1705 
   1706     def get_headers_and_fp(self):
   1707         f = io.BytesIO(self.sock.data)
   1708         f.readline()  # read the request line
   1709         message = client.parse_headers(f)
   1710         return message, f
   1711 
   1712     def test_list_body(self):
   1713         # Note that no content-length is automatically calculated for
   1714         # an iterable.  The request will fall back to send chunked
   1715         # transfer encoding.
   1716         cases = (
   1717             ([b'foo', b'bar'], b'3\r\nfoo\r\n3\r\nbar\r\n0\r\n\r\n'),
   1718             ((b'foo', b'bar'), b'3\r\nfoo\r\n3\r\nbar\r\n0\r\n\r\n'),
   1719         )
   1720         for body, expected in cases:
   1721             with self.subTest(body):
   1722                 self.conn = client.HTTPConnection('example.com')
   1723                 self.conn.sock = self.sock = FakeSocket('')
   1724 
   1725                 self.conn.request('PUT', '/url', body)
   1726                 msg, f = self.get_headers_and_fp()
   1727                 self.assertNotIn('Content-Type', msg)
   1728                 self.assertNotIn('Content-Length', msg)
   1729                 self.assertEqual(msg.get('Transfer-Encoding'), 'chunked')
   1730                 self.assertEqual(expected, f.read())
   1731 
   1732     def test_manual_content_length(self):
   1733         # Set an incorrect content-length so that we can verify that
   1734         # it will not be over-ridden by the library.
   1735         self.conn.request("PUT", "/url", "body",
   1736                           {"Content-Length": "42"})
   1737         message, f = self.get_headers_and_fp()
   1738         self.assertEqual("42", message.get("content-length"))
   1739         self.assertEqual(4, len(f.read()))
   1740 
   1741     def test_ascii_body(self):
   1742         self.conn.request("PUT", "/url", "body")
   1743         message, f = self.get_headers_and_fp()
   1744         self.assertEqual("text/plain", message.get_content_type())
   1745         self.assertIsNone(message.get_charset())
   1746         self.assertEqual("4", message.get("content-length"))
   1747         self.assertEqual(b'body', f.read())
   1748 
   1749     def test_latin1_body(self):
   1750         self.conn.request("PUT", "/url", "body\xc1")
   1751         message, f = self.get_headers_and_fp()
   1752         self.assertEqual("text/plain", message.get_content_type())
   1753         self.assertIsNone(message.get_charset())
   1754         self.assertEqual("5", message.get("content-length"))
   1755         self.assertEqual(b'body\xc1', f.read())
   1756 
   1757     def test_bytes_body(self):
   1758         self.conn.request("PUT", "/url", b"body\xc1")
   1759         message, f = self.get_headers_and_fp()
   1760         self.assertEqual("text/plain", message.get_content_type())
   1761         self.assertIsNone(message.get_charset())
   1762         self.assertEqual("5", message.get("content-length"))
   1763         self.assertEqual(b'body\xc1', f.read())
   1764 
   1765     def test_text_file_body(self):
   1766         self.addCleanup(support.unlink, support.TESTFN)
   1767         with open(support.TESTFN, "w") as f:
   1768             f.write("body")
   1769         with open(support.TESTFN) as f:
   1770             self.conn.request("PUT", "/url", f)
   1771             message, f = self.get_headers_and_fp()
   1772             self.assertEqual("text/plain", message.get_content_type())
   1773             self.assertIsNone(message.get_charset())
   1774             # No content-length will be determined for files; the body
   1775             # will be sent using chunked transfer encoding instead.
   1776             self.assertIsNone(message.get("content-length"))
   1777             self.assertEqual("chunked", message.get("transfer-encoding"))
   1778             self.assertEqual(b'4\r\nbody\r\n0\r\n\r\n', f.read())
   1779 
   1780     def test_binary_file_body(self):
   1781         self.addCleanup(support.unlink, support.TESTFN)
   1782         with open(support.TESTFN, "wb") as f:
   1783             f.write(b"body\xc1")
   1784         with open(support.TESTFN, "rb") as f:
   1785             self.conn.request("PUT", "/url", f)
   1786             message, f = self.get_headers_and_fp()
   1787             self.assertEqual("text/plain", message.get_content_type())
   1788             self.assertIsNone(message.get_charset())
   1789             self.assertEqual("chunked", message.get("Transfer-Encoding"))
   1790             self.assertNotIn("Content-Length", message)
   1791             self.assertEqual(b'5\r\nbody\xc1\r\n0\r\n\r\n', f.read())
   1792 
   1793 
   1794 class HTTPResponseTest(TestCase):
   1795 
   1796     def setUp(self):
   1797         body = "HTTP/1.1 200 Ok\r\nMy-Header: first-value\r\nMy-Header: \
   1798                 second-value\r\n\r\nText"
   1799         sock = FakeSocket(body)
   1800         self.resp = client.HTTPResponse(sock)
   1801         self.resp.begin()
   1802 
   1803     def test_getting_header(self):
   1804         header = self.resp.getheader('My-Header')
   1805         self.assertEqual(header, 'first-value, second-value')
   1806 
   1807         header = self.resp.getheader('My-Header', 'some default')
   1808         self.assertEqual(header, 'first-value, second-value')
   1809 
   1810     def test_getting_nonexistent_header_with_string_default(self):
   1811         header = self.resp.getheader('No-Such-Header', 'default-value')
   1812         self.assertEqual(header, 'default-value')
   1813 
   1814     def test_getting_nonexistent_header_with_iterable_default(self):
   1815         header = self.resp.getheader('No-Such-Header', ['default', 'values'])
   1816         self.assertEqual(header, 'default, values')
   1817 
   1818         header = self.resp.getheader('No-Such-Header', ('default', 'values'))
   1819         self.assertEqual(header, 'default, values')
   1820 
   1821     def test_getting_nonexistent_header_without_default(self):
   1822         header = self.resp.getheader('No-Such-Header')
   1823         self.assertEqual(header, None)
   1824 
   1825     def test_getting_header_defaultint(self):
   1826         header = self.resp.getheader('No-Such-Header',default=42)
   1827         self.assertEqual(header, 42)
   1828 
   1829 class TunnelTests(TestCase):
   1830     def setUp(self):
   1831         response_text = (
   1832             'HTTP/1.0 200 OK\r\n\r\n' # Reply to CONNECT
   1833             'HTTP/1.1 200 OK\r\n' # Reply to HEAD
   1834             'Content-Length: 42\r\n\r\n'
   1835         )
   1836         self.host = 'proxy.com'
   1837         self.conn = client.HTTPConnection(self.host)
   1838         self.conn._create_connection = self._create_connection(response_text)
   1839 
   1840     def tearDown(self):
   1841         self.conn.close()
   1842 
   1843     def _create_connection(self, response_text):
   1844         def create_connection(address, timeout=None, source_address=None):
   1845             return FakeSocket(response_text, host=address[0], port=address[1])
   1846         return create_connection
   1847 
   1848     def test_set_tunnel_host_port_headers(self):
   1849         tunnel_host = 'destination.com'
   1850         tunnel_port = 8888
   1851         tunnel_headers = {'User-Agent': 'Mozilla/5.0 (compatible, MSIE 11)'}
   1852         self.conn.set_tunnel(tunnel_host, port=tunnel_port,
   1853                              headers=tunnel_headers)
   1854         self.conn.request('HEAD', '/', '')
   1855         self.assertEqual(self.conn.sock.host, self.host)
   1856         self.assertEqual(self.conn.sock.port, client.HTTP_PORT)
   1857         self.assertEqual(self.conn._tunnel_host, tunnel_host)
   1858         self.assertEqual(self.conn._tunnel_port, tunnel_port)
   1859         self.assertEqual(self.conn._tunnel_headers, tunnel_headers)
   1860 
   1861     def test_disallow_set_tunnel_after_connect(self):
   1862         # Once connected, we shouldn't be able to tunnel anymore
   1863         self.conn.connect()
   1864         self.assertRaises(RuntimeError, self.conn.set_tunnel,
   1865                           'destination.com')
   1866 
   1867     def test_connect_with_tunnel(self):
   1868         self.conn.set_tunnel('destination.com')
   1869         self.conn.request('HEAD', '/', '')
   1870         self.assertEqual(self.conn.sock.host, self.host)
   1871         self.assertEqual(self.conn.sock.port, client.HTTP_PORT)
   1872         self.assertIn(b'CONNECT destination.com', self.conn.sock.data)
   1873         # issue22095
   1874         self.assertNotIn(b'Host: destination.com:None', self.conn.sock.data)
   1875         self.assertIn(b'Host: destination.com', self.conn.sock.data)
   1876 
   1877         # This test should be removed when CONNECT gets the HTTP/1.1 blessing
   1878         self.assertNotIn(b'Host: proxy.com', self.conn.sock.data)
   1879 
   1880     def test_connect_put_request(self):
   1881         self.conn.set_tunnel('destination.com')
   1882         self.conn.request('PUT', '/', '')
   1883         self.assertEqual(self.conn.sock.host, self.host)
   1884         self.assertEqual(self.conn.sock.port, client.HTTP_PORT)
   1885         self.assertIn(b'CONNECT destination.com', self.conn.sock.data)
   1886         self.assertIn(b'Host: destination.com', self.conn.sock.data)
   1887 
   1888     def test_tunnel_debuglog(self):
   1889         expected_header = 'X-Dummy: 1'
   1890         response_text = 'HTTP/1.0 200 OK\r\n{}\r\n\r\n'.format(expected_header)
   1891 
   1892         self.conn.set_debuglevel(1)
   1893         self.conn._create_connection = self._create_connection(response_text)
   1894         self.conn.set_tunnel('destination.com')
   1895 
   1896         with support.captured_stdout() as output:
   1897             self.conn.request('PUT', '/', '')
   1898         lines = output.getvalue().splitlines()
   1899         self.assertIn('header: {}'.format(expected_header), lines)
   1900 
   1901 
   1902 if __name__ == '__main__':
   1903     unittest.main(verbosity=2)
   1904