Home | History | Annotate | Download | only in test
      1 import httplib
      2 import itertools
      3 import array
      4 import StringIO
      5 import socket
      6 import errno
      7 import os
      8 import tempfile
      9 
     10 import unittest
     11 TestCase = unittest.TestCase
     12 
     13 from test import test_support
     14 
     15 here = os.path.dirname(__file__)
     16 # Self-signed cert file for 'localhost'
     17 CERT_localhost = os.path.join(here, 'keycert.pem')
     18 # Self-signed cert file for 'fakehostname'
     19 CERT_fakehostname = os.path.join(here, 'keycert2.pem')
     20 # Self-signed cert file for self-signed.pythontest.net
     21 CERT_selfsigned_pythontestdotnet = os.path.join(here, 'selfsigned_pythontestdotnet.pem')
     22 
     23 HOST = test_support.HOST
     24 
     25 class FakeSocket:
     26     def __init__(self, text, fileclass=StringIO.StringIO, host=None, port=None):
     27         self.text = text
     28         self.fileclass = fileclass
     29         self.data = ''
     30         self.file_closed = False
     31         self.host = host
     32         self.port = port
     33 
     34     def sendall(self, data):
     35         self.data += ''.join(data)
     36 
     37     def makefile(self, mode, bufsize=None):
     38         if mode != 'r' and mode != 'rb':
     39             raise httplib.UnimplementedFileMode()
     40         # keep the file around so we can check how much was read from it
     41         self.file = self.fileclass(self.text)
     42         self.file.close = self.file_close #nerf close ()
     43         return self.file
     44 
     45     def file_close(self):
     46         self.file_closed = True
     47 
     48     def close(self):
     49         pass
     50 
     51 class EPipeSocket(FakeSocket):
     52 
     53     def __init__(self, text, pipe_trigger):
     54         # When sendall() is called with pipe_trigger, raise EPIPE.
     55         FakeSocket.__init__(self, text)
     56         self.pipe_trigger = pipe_trigger
     57 
     58     def sendall(self, data):
     59         if self.pipe_trigger in data:
     60             raise socket.error(errno.EPIPE, "gotcha")
     61         self.data += data
     62 
     63     def close(self):
     64         pass
     65 
     66 class NoEOFStringIO(StringIO.StringIO):
     67     """Like StringIO, but raises AssertionError on EOF.
     68 
     69     This is used below to test that httplib doesn't try to read
     70     more from the underlying file than it should.
     71     """
     72     def read(self, n=-1):
     73         data = StringIO.StringIO.read(self, n)
     74         if data == '':
     75             raise AssertionError('caller tried to read past EOF')
     76         return data
     77 
     78     def readline(self, length=None):
     79         data = StringIO.StringIO.readline(self, length)
     80         if data == '':
     81             raise AssertionError('caller tried to read past EOF')
     82         return data
     83 
     84 
     85 class HeaderTests(TestCase):
     86     def test_auto_headers(self):
     87         # Some headers are added automatically, but should not be added by
     88         # .request() if they are explicitly set.
     89 
     90         class HeaderCountingBuffer(list):
     91             def __init__(self):
     92                 self.count = {}
     93             def append(self, item):
     94                 kv = item.split(':')
     95                 if len(kv) > 1:
     96                     # item is a 'Key: Value' header string
     97                     lcKey = kv[0].lower()
     98                     self.count.setdefault(lcKey, 0)
     99                     self.count[lcKey] += 1
    100                 list.append(self, item)
    101 
    102         for explicit_header in True, False:
    103             for header in 'Content-length', 'Host', 'Accept-encoding':
    104                 conn = httplib.HTTPConnection('example.com')
    105                 conn.sock = FakeSocket('blahblahblah')
    106                 conn._buffer = HeaderCountingBuffer()
    107 
    108                 body = 'spamspamspam'
    109                 headers = {}
    110                 if explicit_header:
    111                     headers[header] = str(len(body))
    112                 conn.request('POST', '/', body, headers)
    113                 self.assertEqual(conn._buffer.count[header.lower()], 1)
    114 
    def test_content_length_0(self):

        class ContentLengthChecker(list):
            # Buffer that remembers the value of the most recently appended
            # Content-Length header line (None if there was none).
            def __init__(self):
                list.__init__(self)
                self.content_length = None

            def append(self, item):
                name, colon, value = item.partition(':')
                if colon and name.lower() == 'content-length':
                    self.content_length = value.strip()
                list.append(self, item)

        def content_length_for(method, body):
            # Issue one request on a fresh connection and report the
            # Content-Length value that reached the buffer.
            conn = httplib.HTTPConnection('example.com')
            conn.sock = FakeSocket(None)
            conn._buffer = ContentLengthChecker()
            conn.request(method, '/', body)
            return conn._buffer.content_length

        methods_with_body = ('PUT', 'POST', 'PATCH')
        methods_without_body = (
             'GET', 'CONNECT', 'DELETE', 'HEAD', 'OPTIONS', 'TRACE',
        )

        # Methods expecting a body get a content-length of zero when the
        # body is empty (either None or '').
        for method, body in itertools.product(methods_with_body, (None, '')):
            self.assertEqual(
                content_length_for(method, body), '0',
                'Header Content-Length incorrect on {}'.format(method)
            )

        # For methods that do not expect a body, content-length must not be
        # set when the body is None because it might cause unexpected
        # behaviour on the server.
        for method in methods_without_body:
            self.assertEqual(
                content_length_for(method, None), None,
                'Header Content-Length set for empty body on {}'.format(method)
            )

        # A body of '' counts as "present but empty" rather than "missing",
        # so content length is set even for methods that don't expect a body.
        for method in methods_without_body:
            self.assertEqual(
                content_length_for(method, ''), '0',
                'Header Content-Length incorrect on {}'.format(method)
            )

        # Whenever a non-empty body is given, Content-Length must be set.
        for method in itertools.chain(methods_without_body, methods_with_body):
            self.assertEqual(
                content_length_for(method, ' '), '1',
                'Header Content-Length incorrect on {}'.format(method)
            )
    180 
    def test_putheader(self):
        conn = httplib.HTTPConnection('example.com')
        conn.sock = FakeSocket(None)
        conn.putrequest('GET','/')

        # Each entry: (arguments for putheader(), line expected in the buffer)
        cases = (
            (('Content-length', 42), 'Content-length: 42'),
            (('Foo', ' bar '), b'Foo:  bar '),
            (('Bar', '\tbaz\t'), b'Bar: \tbaz\t'),
            (('Authorization', 'Bearer mytoken'),
             b'Authorization: Bearer mytoken'),
            (('IterHeader', 'IterA', 'IterB'),
             b'IterHeader: IterA\r\n\tIterB'),
            (('LatinHeader', b'\xFF'), b'LatinHeader: \xFF'),
            (('Utf8Header', b'\xc3\x80'), b'Utf8Header: \xc3\x80'),
            (('C1-Control', b'next\x85line'), b'C1-Control: next\x85line'),
            (('Embedded-Fold-Space', 'is\r\n allowed'),
             b'Embedded-Fold-Space: is\r\n allowed'),
            (('Embedded-Fold-Tab', 'is\r\n\tallowed'),
             b'Embedded-Fold-Tab: is\r\n\tallowed'),
            (('Key Space', 'value'), b'Key Space: value'),
            (('KeySpace ', 'value'), b'KeySpace : value'),
            ((b'Nonbreak\xa0Space', 'value'), b'Nonbreak\xa0Space: value'),
            ((b'\xa0NonbreakSpace', 'value'), b'\xa0NonbreakSpace: value'),
        )
        for args, expected_line in cases:
            conn.putheader(*args)
            self.assertIn(expected_line, conn._buffer)
    214 
    def test_ipv6host_header(self):
        # Default host header on IPv6 transaction should be wrapped by [] if
        # it is an IPv6 address
        cases = (
            ('[2001::]:81',
             'GET /foo HTTP/1.1\r\nHost: [2001::]:81\r\n'
             'Accept-Encoding: identity\r\n\r\n'),
            ('[2001:102A::]',
             'GET /foo HTTP/1.1\r\nHost: [2001:102A::]\r\n'
             'Accept-Encoding: identity\r\n\r\n'),
        )
        for hostport, expected in cases:
            conn = httplib.HTTPConnection(hostport)
            sock = FakeSocket('')
            conn.sock = sock
            conn.request('GET', '/foo')
            self.assertTrue(sock.data.startswith(expected))
    233 
    def test_malformed_headers_coped_with(self):
        # Issue 19996: a header line with an empty name sits between two
        # well-formed headers; both of those must still be readable.
        raw = "HTTP/1.1 200 OK\r\nFirst: val\r\n: nval\r\nSecond: val\r\n\r\n"
        resp = httplib.HTTPResponse(FakeSocket(raw))
        resp.begin()

        for name in ('First', 'Second'):
            self.assertEqual(resp.getheader(name), 'val')
    243 
    def test_malformed_truncation(self):
        # Other malformed header lines, especially without colons, used to
        # cause the rest of the header section to be truncated
        raw = (
            b'HTTP/1.1 200 OK\r\n'
            b'Public-Key-Pins: \n'
            b'pin-sha256="xxx=";\n'
            b'report-uri="https://..."\r\n'
            b'Transfer-Encoding: chunked\r\n'
            b'\r\n'
            b'4\r\nbody\r\n0\r\n\r\n'
        )
        response = httplib.HTTPResponse(FakeSocket(raw))
        response.begin()
        self.assertIsNotNone(response.getheader('Public-Key-Pins'))
        self.assertEqual(response.getheader('Transfer-Encoding'), 'chunked')
        self.assertEqual(response.read(), b'body')
    261 
    def test_blank_line_forms(self):
        def begun(raw):
            # Wrap the raw bytes in a response and parse its header section.
            response = httplib.HTTPResponse(FakeSocket(raw))
            response.begin()
            return response

        # Test that both CRLF and LF blank lines can terminate the header
        # section and start the body
        for blank in (b'\r\n', b'\n'):
            chunked = begun(
                b'HTTP/1.1 200 OK\r\n' b'Transfer-Encoding: chunked\r\n'
                + blank
                + b'4\r\nbody\r\n0\r\n\r\n'
            )
            self.assertEqual(chunked.getheader('Transfer-Encoding'), 'chunked')
            self.assertEqual(chunked.read(), b'body')

            plain = begun(b'HTTP/1.0 200 OK\r\n' + blank + b'body')
            self.assertEqual(plain.read(), b'body')

        # A blank line ending in CR is not treated as the end of the HTTP
        # header section, therefore header fields following it should be
        # parsed if possible
        after_cr = begun(
            b'HTTP/1.1 200 OK\r\n'
            b'\r'
            b'Name: value\r\n'
            b'Transfer-Encoding: chunked\r\n'
            b'\r\n'
            b'4\r\nbody\r\n0\r\n\r\n'
        )
        self.assertEqual(after_cr.getheader('Transfer-Encoding'), 'chunked')
        self.assertEqual(after_cr.read(), b'body')

        # No header fields nor blank line
        bare = begun(b'HTTP/1.0 200 OK\r\n')
        self.assertEqual(bare.read(), b'')
    300 
    def test_from_line(self):
        # The parser handles "From" lines specially, so test this does not
        # affect parsing the rest of the header section
        chunked_raw = (
            b'HTTP/1.1 200 OK\r\n'
            b'From start\r\n'
            b' continued\r\n'
            b'Name: value\r\n'
            b'From middle\r\n'
            b' continued\r\n'
            b'Transfer-Encoding: chunked\r\n'
            b'From end\r\n'
            b'\r\n'
            b'4\r\nbody\r\n0\r\n\r\n'
        )
        chunked = httplib.HTTPResponse(FakeSocket(chunked_raw))
        chunked.begin()
        self.assertIsNotNone(chunked.getheader('Name'))
        self.assertEqual(chunked.getheader('Transfer-Encoding'), 'chunked')
        self.assertEqual(chunked.read(), b'body')

        # A "From" line as the only content of the header section.
        lone_raw = (
            b'HTTP/1.0 200 OK\r\n'
            b'From alone\r\n'
            b'\r\n'
            b'body'
        )
        lone = httplib.HTTPResponse(FakeSocket(lone_raw))
        lone.begin()
        self.assertEqual(lone.read(), b'body')
    331 
    def test_parse_all_octets(self):
        # Ensure no valid header field octet breaks the parser
        raw = (
            b'HTTP/1.1 200 OK\r\n'
            b"!#$%&'*+-.^_`|~: value\r\n"  # Special token characters
            b'VCHAR: ' + bytearray(range(0x21, 0x7E + 1)) + b'\r\n'
            b'obs-text: ' + bytearray(range(0x80, 0xFF + 1)) + b'\r\n'
            b'obs-fold: text\r\n'
            b' folded with space\r\n'
            b'\tfolded with tab\r\n'
            b'Content-Length: 0\r\n'
            b'\r\n'
        )
        resp = httplib.HTTPResponse(FakeSocket(raw))
        resp.begin()
        self.assertEqual(resp.getheader('Content-Length'), '0')
        self.assertEqual(resp.getheader("!#$%&'*+-.^_`|~"), 'value')
        printable = ''.join(chr(octet) for octet in range(0x21, 0x7E + 1))
        self.assertEqual(resp.getheader('VCHAR'), printable)
        self.assertIsNotNone(resp.getheader('obs-text'))
        obs_fold = resp.getheader('obs-fold')
        self.assertTrue(obs_fold.startswith('text'))
        self.assertIn(' folded with space', obs_fold)
        self.assertTrue(obs_fold.endswith('folded with tab'))
    357 
    def test_invalid_headers(self):
        conn = httplib.HTTPConnection('example.com')
        conn.sock = FakeSocket('')
        conn.putrequest('GET', '/')

        # http://tools.ietf.org/html/rfc7230#section-3.2.4, whitespace is no
        # longer allowed in header names
        bad_names = (
            b'Invalid\r\nName',
            b'Invalid\rName',
            b'Invalid\nName',
            b'\r\nInvalidName',
            b'\rInvalidName',
            b'\nInvalidName',
            b' InvalidName',
            b'\tInvalidName',
            b'Invalid:Name',
            b':InvalidName',
        )
        # Values containing a bare or trailing CR / LF are rejected as well.
        bad_values = (
            b'Invalid\r\nValue',
            b'Invalid\rValue',
            b'Invalid\nValue',
            b'InvalidValue\r\n',
            b'InvalidValue\r',
            b'InvalidValue\n',
        )
        cases = [(name, b'ValidValue') for name in bad_names]
        cases += [(b'ValidName', value) for value in bad_values]
        for header_name, header_value in cases:
            with self.assertRaisesRegexp(ValueError, 'Invalid header'):
                conn.putheader(header_name, header_value)
    386 
    387 
    388 class BasicTest(TestCase):
    def test_status_lines(self):
        # Test HTTP status lines

        # A well-formed status line, followed by a body with no length.
        good = httplib.HTTPResponse(FakeSocket("HTTP/1.1 200 Ok\r\n\r\nText"))
        good.begin()
        self.assertEqual(good.read(0), '')  # Issue #20007
        self.assertFalse(good.isclosed())
        self.assertEqual(good.read(), 'Text')
        self.assertTrue(good.isclosed())

        # A non-integer status code must be rejected by begin().
        bad = httplib.HTTPResponse(
            FakeSocket("HTTP/1.1 400.100 Not Ok\r\n\r\nText"))
        self.assertRaises(httplib.BadStatusLine, bad.begin)
    405 
    def test_bad_status_repr(self):
        # repr() of a BadStatusLine built from an empty line shows the
        # quoted empty string.
        shown = repr(httplib.BadStatusLine(''))
        self.assertEqual(shown, '''BadStatusLine("\'\'",)''')
    409 
    def test_partial_reads(self):
        # if we have a length, the system knows when to close itself
        # same behaviour than when we read the whole thing with read()
        raw = "HTTP/1.1 200 Ok\r\nContent-Length: 4\r\n\r\nText"
        response = httplib.HTTPResponse(FakeSocket(raw))
        response.begin()
        self.assertEqual(response.read(2), 'Te')
        self.assertFalse(response.isclosed())
        self.assertEqual(response.read(2), 'xt')
        self.assertTrue(response.isclosed())
    421 
    def test_partial_reads_no_content_length(self):
        # when no length is present, the socket should be gracefully closed when
        # all data was read
        raw = "HTTP/1.1 200 Ok\r\n\r\nText"
        response = httplib.HTTPResponse(FakeSocket(raw))
        response.begin()
        self.assertEqual(response.read(2), 'Te')
        self.assertFalse(response.isclosed())
        self.assertEqual(response.read(2), 'xt')
        self.assertEqual(response.read(1), '')
        self.assertTrue(response.isclosed())
    434 
    def test_partial_reads_incomplete_body(self):
        # if the server shuts down the connection before the whole
        # content-length is delivered, the socket is gracefully closed
        raw = "HTTP/1.1 200 Ok\r\nContent-Length: 10\r\n\r\nText"
        response = httplib.HTTPResponse(FakeSocket(raw))
        response.begin()
        self.assertEqual(response.read(2), 'Te')
        self.assertFalse(response.isclosed())
        self.assertEqual(response.read(2), 'xt')
        self.assertEqual(response.read(1), '')
        self.assertTrue(response.isclosed())
    447 
    def test_host_port(self):
        # Check invalid host_port

        # Note that httplib does not accept user:password@ in the host-port.
        for hp in ("www.python.org:abc", "user:password@www.python.org"):
            self.assertRaises(httplib.InvalidURL, httplib.HTTP, hp)

        # Valid forms: each entry is (host-port string, expected host,
        # expected port).
        for hp, h, p in (("[fe80::207:e9ff:fe9b]:8000", "fe80::207:e9ff:fe9b",
                          8000),
                         ("www.python.org:80", "www.python.org", 80),
                         ("www.python.org", "www.python.org", 80),
                         ("www.python.org:", "www.python.org", 80),
                         ("[fe80::207:e9ff:fe9b]", "fe80::207:e9ff:fe9b", 80)):
            http = httplib.HTTP(hp)
            c = http._conn
            if h != c.host:
                self.fail("Host incorrectly parsed: %s != %s" % (h, c.host))
            if p != c.port:
                # Report the parsed port (not the host) in the message.
                self.fail("Port incorrectly parsed: %s != %s" % (p, c.port))
    467 
    def test_response_headers(self):
        # test response with multiple message headers with the same field name.
        text = ('HTTP/1.1 200 OK\r\n'
                'Set-Cookie: Customer="WILE_E_COYOTE";'
                ' Version="1"; Path="/acme"\r\n'
                'Set-Cookie: Part_Number="Rocket_Launcher_0001"; Version="1";'
                ' Path="/acme"\r\n'
                '\r\n'
                'No body\r\n')
        # The two Set-Cookie values are expected joined with ', '.
        hdr = ('Customer="WILE_E_COYOTE"; Version="1"; Path="/acme"'
               ', '
               'Part_Number="Rocket_Launcher_0001"; Version="1"; Path="/acme"')
        s = FakeSocket(text)
        r = httplib.HTTPResponse(s)
        r.begin()
        cookies = r.getheader("Set-Cookie")
        # assertEqual reports both values on failure, unlike a bare fail().
        self.assertEqual(cookies, hdr,
                         "multiple headers not combined properly")
    486 
    def test_read_head(self):
        # Test that the library doesn't attempt to read any data
        # from a HEAD request.  (Tickles SF bug #622042.)
        # NoEOFStringIO raises AssertionError if anything reads past the
        # header section.
        sock = FakeSocket(
            'HTTP/1.1 200 OK\r\n'
            'Content-Length: 14432\r\n'
            '\r\n',
            NoEOFStringIO)
        resp = httplib.HTTPResponse(sock, method="HEAD")
        resp.begin()
        # assertEqual shows the unexpected data on failure.
        self.assertEqual(resp.read(), "",
                         "Did not expect response from HEAD request")
    499 
    def test_too_many_headers(self):
        # A response carrying 200 header lines must be rejected by begin().
        header_lines = ['Header%d: foo' % index for index in xrange(200)]
        raw = 'HTTP/1.1 200 OK\r\n' + '\r\n'.join(header_lines) + '\r\n'
        response = httplib.HTTPResponse(FakeSocket(raw))
        self.assertRaises(httplib.HTTPException, response.begin)
    506 
    def test_send_file(self):
        # Send this very file as the request body and check that both the
        # automatic headers and the file content reach the socket.
        expected = 'GET /foo HTTP/1.1\r\nHost: example.com\r\n' \
                   'Accept-Encoding: identity\r\nContent-Length:'

        # The context manager closes the file even if request() raises.
        with open(__file__, 'rb') as body:
            conn = httplib.HTTPConnection('example.com')
            sock = FakeSocket(body)
            conn.sock = sock
            conn.request('GET', '/foo', body)
        self.assertTrue(sock.data.startswith(expected))
        self.assertIn('def test_send_file', sock.data)
    518 
    def test_send_tempfile(self):
        # The body comes from a temporary file; the complete request, with
        # a Content-Length of 9, is compared against what hit the socket.
        expected = ('GET /foo HTTP/1.1\r\nHost: example.com\r\n'
                    'Accept-Encoding: identity\r\nContent-Length: 9\r\n\r\n'
                    'fake\ndata')

        with tempfile.TemporaryFile() as payload:
            payload.write('fake\ndata')
            payload.seek(0)

            sock = FakeSocket(payload)
            conn = httplib.HTTPConnection('example.com')
            conn.sock = sock
            conn.request('GET', '/foo', payload)
        self.assertEqual(sock.data, expected)
    533 
    def test_send(self):
        expected = 'this is a test this is only a test'
        conn = httplib.HTTPConnection('example.com')
        sock = FakeSocket(None)
        conn.sock = sock
        # send() is exercised with a plain string, a character array and a
        # file-like object; each must put the same bytes on the socket.
        payloads = (
            expected,
            array.array('c', expected),
            StringIO.StringIO(expected),
        )
        for payload in payloads:
            sock.data = ''
            conn.send(payload)
            self.assertEqual(expected, sock.data)
    547 
    def test_chunked(self):
        # A complete chunked body is decoded; a body whose terminating
        # chunk is missing or malformed raises IncompleteRead carrying the
        # data decoded so far.
        chunked_start = (
            'HTTP/1.1 200 OK\r\n'
            'Transfer-Encoding: chunked\r\n\r\n'
            'a\r\n'
            'hello worl\r\n'
            '1\r\n'
            'd\r\n'
        )
        sock = FakeSocket(chunked_start + '0\r\n')
        resp = httplib.HTTPResponse(sock, method="GET")
        resp.begin()
        self.assertEqual(resp.read(), 'hello world')
        resp.close()

        # '' ends the stream early; 'foo\r\n' is not a valid chunk size.
        for x in ('', 'foo\r\n'):
            sock = FakeSocket(chunked_start + x)
            resp = httplib.HTTPResponse(sock, method="GET")
            resp.begin()
            try:
                resp.read()
            except httplib.IncompleteRead as i:
                self.assertEqual(i.partial, 'hello world')
                self.assertEqual(repr(i),'IncompleteRead(11 bytes read)')
                self.assertEqual(str(i),'IncompleteRead(11 bytes read)')
            else:
                self.fail('IncompleteRead expected')
            finally:
                resp.close()
    577 
    def test_chunked_head(self):
        # For a HEAD request the chunked body must be ignored: read()
        # returns nothing and the response ends up closed.
        raw = (
            'HTTP/1.1 200 OK\r\n'
            'Transfer-Encoding: chunked\r\n\r\n'
            'a\r\n'
            'hello world\r\n'
            '1\r\n'
            'd\r\n'
        ) + '0\r\n'
        response = httplib.HTTPResponse(FakeSocket(raw), method="HEAD")
        response.begin()
        self.assertEqual(response.read(), '')
        self.assertEqual(response.status, 200)
        self.assertEqual(response.reason, 'OK')
        self.assertTrue(response.isclosed())
    594 
    def test_negative_content_length(self):
        # With a Content-Length of -1 the whole remaining data is returned
        # by read() and the response is closed afterwards.
        raw = ('HTTP/1.1 200 OK\r\n'
               'Content-Length: -1\r\n\r\nHello\r\n')
        response = httplib.HTTPResponse(FakeSocket(raw), method="GET")
        response.begin()
        self.assertEqual(response.read(), 'Hello\r\n')
        self.assertTrue(response.isclosed())
    602 
    def test_incomplete_read(self):
        # The body is 7 bytes long although Content-Length promises 10, so
        # read() must raise IncompleteRead describing the shortfall.
        raw = 'HTTP/1.1 200 OK\r\nContent-Length: 10\r\n\r\nHello\r\n'
        resp = httplib.HTTPResponse(FakeSocket(raw), method="GET")
        resp.begin()
        try:
            resp.read()
        except httplib.IncompleteRead as err:
            self.assertEqual(err.partial, 'Hello\r\n')
            for render in (repr, str):
                self.assertEqual(
                    render(err),
                    "IncompleteRead(7 bytes read, 3 more expected)")
            self.assertTrue(resp.isclosed())
        else:
            self.fail('IncompleteRead expected')
    618 
    def test_epipe(self):
        # The socket raises EPIPE as soon as the Content-Length header is
        # sent; the canned 401 response must still be readable afterwards.
        canned = ("HTTP/1.0 401 Authorization Required\r\n"
                  "Content-type: text/html\r\n"
                  "WWW-Authenticate: Basic realm=\"example\"\r\n")
        conn = httplib.HTTPConnection("example.com")
        conn.sock = EPipeSocket(canned, b"Content-Length")
        self.assertRaises(socket.error, conn.request, "PUT", "/url", "body")
        resp = conn.getresponse()
        self.assertEqual(401, resp.status)
        self.assertEqual("Basic realm=\"example\"",
                         resp.getheader("www-authenticate"))
    633 
    def test_filenoattr(self):
        # Just test the fileno attribute in the HTTPResponse Object.
        resp = httplib.HTTPResponse(FakeSocket("HTTP/1.1 200 Ok\r\n\r\nText"))
        has_fileno = hasattr(resp, 'fileno')
        self.assertTrue(has_fileno,
                        'HTTPResponse should expose a fileno attribute')
    641 
    642     # Test lines overflowing the max line size (_MAXLINE in httplib)
    643 
    def test_overflowing_status_line(self):
        # skipTest() ends the test here; the statements below are kept for
        # reference and do not run.
        self.skipTest("disabled for HTTP 0.9 support")
        raw = "HTTP/1.1 200 Ok" + "k" * 65536 + "\r\n"
        response = httplib.HTTPResponse(FakeSocket(raw))
        expected_errors = (httplib.LineTooLong, httplib.BadStatusLine)
        self.assertRaises(expected_errors, response.begin)
    649 
    def test_overflowing_header_line(self):
        # A single header line of more than 65536 characters must make
        # begin() raise LineTooLong.
        raw = (
            'HTTP/1.1 200 OK\r\n'
            'X-Foo: bar' + 'r' * 65536 + '\r\n\r\n'
        )
        response = httplib.HTTPResponse(FakeSocket(raw))
        self.assertRaises(httplib.LineTooLong, response.begin)
    657 
    def test_overflowing_chunked_line(self):
        # The chunk-size line is padded with 65536 zeros; the header section
        # parses fine but read() must raise LineTooLong.
        raw = (
            'HTTP/1.1 200 OK\r\n'
            'Transfer-Encoding: chunked\r\n\r\n'
            + '0' * 65536 + 'a\r\n'
            'hello world\r\n'
            '0\r\n'
        )
        response = httplib.HTTPResponse(FakeSocket(raw))
        response.begin()
        self.assertRaises(httplib.LineTooLong, response.read)
    669 
    def test_early_eof(self):
        # Test httpresponse with no \r\n termination,
        response = httplib.HTTPResponse(FakeSocket("HTTP/1.1 200 Ok"))
        response.begin()
        self.assertEqual(response.read(), '')
        self.assertTrue(response.isclosed())
    678 
    def test_error_leak(self):
        # Test that the socket is not leaked if getresponse() fails
        created = []

        class Response(httplib.HTTPResponse):
            def __init__(self, *pos, **kw):
                # Holding a reference keeps the garbage collector from
                # closing the socket on our behalf.
                created.append(self)
                httplib.HTTPResponse.__init__(self, *pos, **kw)

        conn = httplib.HTTPConnection('example.com')
        conn.response_class = Response
        conn.sock = FakeSocket('')  # Emulate server dropping connection
        conn.request('GET', '/')
        self.assertRaises(httplib.BadStatusLine, conn.getresponse)
        self.assertTrue(created)
        # NOTE: the check on created[0].closed is disabled.
        self.assertTrue(conn.sock.file_closed)
    694 
    def test_proxy_tunnel_without_status_line(self):
        """Issue 17849: a tunnel reply lacking a status code must fail."""
        conn = httplib.HTTPConnection('example.com', strict=False)
        conn.set_tunnel('foo')
        # The fake proxy answers with plain text instead of a status line.
        conn.sock = FakeSocket('hello world')
        self.assertRaisesRegexp(socket.error, "Invalid response",
                                conn._tunnel)
    704 
    705 class OfflineTest(TestCase):
    706     def test_responses(self):
    707         self.assertEqual(httplib.responses[httplib.NOT_FOUND], "Not Found")
    708 
    709 
    710 class TestServerMixin:
    711     """A limited socket server mixin.
    712 
    713     This is used by test cases for testing http connection end points.
    714     """
    715     def setUp(self):
    716         self.serv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    717         self.port = test_support.bind_port(self.serv)
    718         self.source_port = test_support.find_unused_port()
    719         self.serv.listen(5)
    720         self.conn = None
    721 
    722     def tearDown(self):
    723         if self.conn:
    724             self.conn.close()
    725             self.conn = None
    726         self.serv.close()
    727         self.serv = None
    728 
    729 class SourceAddressTest(TestServerMixin, TestCase):
    730     def testHTTPConnectionSourceAddress(self):
    731         self.conn = httplib.HTTPConnection(HOST, self.port,
    732                 source_address=('', self.source_port))
    733         self.conn.connect()
    734         self.assertEqual(self.conn.sock.getsockname()[1], self.source_port)
    735 
    736     @unittest.skipIf(not hasattr(httplib, 'HTTPSConnection'),
    737                      'httplib.HTTPSConnection not defined')
    738     def testHTTPSConnectionSourceAddress(self):
    739         self.conn = httplib.HTTPSConnection(HOST, self.port,
    740                 source_address=('', self.source_port))
    741         # We don't test anything here other than the constructor not barfing as
    742         # this code doesn't deal with setting up an active running SSL server
    743         # for an ssl_wrapped connect() to actually return from.
    744 
    745 
    746 class HTTPTest(TestServerMixin, TestCase):
    747     def testHTTPConnection(self):
    748         self.conn = httplib.HTTP(host=HOST, port=self.port, strict=None)
    749         self.conn.connect()
    750         self.assertEqual(self.conn._conn.host, HOST)
    751         self.assertEqual(self.conn._conn.port, self.port)
    752 
    753     def testHTTPWithConnectHostPort(self):
    754         testhost = 'unreachable.test.domain'
    755         testport = '80'
    756         self.conn = httplib.HTTP(host=testhost, port=testport)
    757         self.conn.connect(host=HOST, port=self.port)
    758         self.assertNotEqual(self.conn._conn.host, testhost)
    759         self.assertNotEqual(self.conn._conn.port, testport)
    760         self.assertEqual(self.conn._conn.host, HOST)
    761         self.assertEqual(self.conn._conn.port, self.port)
    762 
    763 
    764 class TimeoutTest(TestCase):
    765     PORT = None
    766 
    767     def setUp(self):
    768         self.serv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    769         TimeoutTest.PORT = test_support.bind_port(self.serv)
    770         self.serv.listen(5)
    771 
    772     def tearDown(self):
    773         self.serv.close()
    774         self.serv = None
    775 
    776     def testTimeoutAttribute(self):
    777         '''This will prove that the timeout gets through
    778         HTTPConnection and into the socket.
    779         '''
    780         # default -- use global socket timeout
    781         self.assertIsNone(socket.getdefaulttimeout())
    782         socket.setdefaulttimeout(30)
    783         try:
    784             httpConn = httplib.HTTPConnection(HOST, TimeoutTest.PORT)
    785             httpConn.connect()
    786         finally:
    787             socket.setdefaulttimeout(None)
    788         self.assertEqual(httpConn.sock.gettimeout(), 30)
    789         httpConn.close()
    790 
    791         # no timeout -- do not use global socket default
    792         self.assertIsNone(socket.getdefaulttimeout())
    793         socket.setdefaulttimeout(30)
    794         try:
    795             httpConn = httplib.HTTPConnection(HOST, TimeoutTest.PORT,
    796                                               timeout=None)
    797             httpConn.connect()
    798         finally:
    799             socket.setdefaulttimeout(None)
    800         self.assertEqual(httpConn.sock.gettimeout(), None)
    801         httpConn.close()
    802 
    803         # a value
    804         httpConn = httplib.HTTPConnection(HOST, TimeoutTest.PORT, timeout=30)
    805         httpConn.connect()
    806         self.assertEqual(httpConn.sock.gettimeout(), 30)
    807         httpConn.close()
    808 
    809 
    810 class HTTPSTest(TestCase):
    811 
    812     def setUp(self):
    813         if not hasattr(httplib, 'HTTPSConnection'):
    814             self.skipTest('ssl support required')
    815 
    816     def make_server(self, certfile):
    817         from test.ssl_servers import make_https_server
    818         return make_https_server(self, certfile=certfile)
    819 
    820     def test_attributes(self):
    821         # simple test to check it's storing the timeout
    822         h = httplib.HTTPSConnection(HOST, TimeoutTest.PORT, timeout=30)
    823         self.assertEqual(h.timeout, 30)
    824 
    825     def test_networked(self):
    826         # Default settings: requires a valid cert from a trusted CA
    827         import ssl
    828         test_support.requires('network')
    829         with test_support.transient_internet('self-signed.pythontest.net'):
    830             h = httplib.HTTPSConnection('self-signed.pythontest.net', 443)
    831             with self.assertRaises(ssl.SSLError) as exc_info:
    832                 h.request('GET', '/')
    833             self.assertEqual(exc_info.exception.reason, 'CERTIFICATE_VERIFY_FAILED')
    834 
    835     def test_networked_noverification(self):
    836         # Switch off cert verification
    837         import ssl
    838         test_support.requires('network')
    839         with test_support.transient_internet('self-signed.pythontest.net'):
    840             context = ssl._create_stdlib_context()
    841             h = httplib.HTTPSConnection('self-signed.pythontest.net', 443,
    842                                         context=context)
    843             h.request('GET', '/')
    844             resp = h.getresponse()
    845             self.assertIn('nginx', resp.getheader('server'))
    846 
    847     @test_support.system_must_validate_cert
    848     def test_networked_trusted_by_default_cert(self):
    849         # Default settings: requires a valid cert from a trusted CA
    850         test_support.requires('network')
    851         with test_support.transient_internet('www.python.org'):
    852             h = httplib.HTTPSConnection('www.python.org', 443)
    853             h.request('GET', '/')
    854             resp = h.getresponse()
    855             content_type = resp.getheader('content-type')
    856             self.assertIn('text/html', content_type)
    857 
    858     def test_networked_good_cert(self):
    859         # We feed the server's cert as a validating cert
    860         import ssl
    861         test_support.requires('network')
    862         with test_support.transient_internet('self-signed.pythontest.net'):
    863             context = ssl.SSLContext(ssl.PROTOCOL_TLS)
    864             context.verify_mode = ssl.CERT_REQUIRED
    865             context.load_verify_locations(CERT_selfsigned_pythontestdotnet)
    866             h = httplib.HTTPSConnection('self-signed.pythontest.net', 443, context=context)
    867             h.request('GET', '/')
    868             resp = h.getresponse()
    869             server_string = resp.getheader('server')
    870             self.assertIn('nginx', server_string)
    871 
    872     def test_networked_bad_cert(self):
    873         # We feed a "CA" cert that is unrelated to the server's cert
    874         import ssl
    875         test_support.requires('network')
    876         with test_support.transient_internet('self-signed.pythontest.net'):
    877             context = ssl.SSLContext(ssl.PROTOCOL_TLS)
    878             context.verify_mode = ssl.CERT_REQUIRED
    879             context.load_verify_locations(CERT_localhost)
    880             h = httplib.HTTPSConnection('self-signed.pythontest.net', 443, context=context)
    881             with self.assertRaises(ssl.SSLError) as exc_info:
    882                 h.request('GET', '/')
    883             self.assertEqual(exc_info.exception.reason, 'CERTIFICATE_VERIFY_FAILED')
    884 
    885     def test_local_unknown_cert(self):
    886         # The custom cert isn't known to the default trust bundle
    887         import ssl
    888         server = self.make_server(CERT_localhost)
    889         h = httplib.HTTPSConnection('localhost', server.port)
    890         with self.assertRaises(ssl.SSLError) as exc_info:
    891             h.request('GET', '/')
    892         self.assertEqual(exc_info.exception.reason, 'CERTIFICATE_VERIFY_FAILED')
    893 
    894     def test_local_good_hostname(self):
    895         # The (valid) cert validates the HTTP hostname
    896         import ssl
    897         server = self.make_server(CERT_localhost)
    898         context = ssl.SSLContext(ssl.PROTOCOL_TLS)
    899         context.verify_mode = ssl.CERT_REQUIRED
    900         context.load_verify_locations(CERT_localhost)
    901         h = httplib.HTTPSConnection('localhost', server.port, context=context)
    902         h.request('GET', '/nonexistent')
    903         resp = h.getresponse()
    904         self.assertEqual(resp.status, 404)
    905 
    906     def test_local_bad_hostname(self):
    907         # The (valid) cert doesn't validate the HTTP hostname
    908         import ssl
    909         server = self.make_server(CERT_fakehostname)
    910         context = ssl.SSLContext(ssl.PROTOCOL_TLS)
    911         context.verify_mode = ssl.CERT_REQUIRED
    912         context.check_hostname = True
    913         context.load_verify_locations(CERT_fakehostname)
    914         h = httplib.HTTPSConnection('localhost', server.port, context=context)
    915         with self.assertRaises(ssl.CertificateError):
    916             h.request('GET', '/')
    917         h.close()
    918         # With context.check_hostname=False, the mismatching is ignored
    919         context.check_hostname = False
    920         h = httplib.HTTPSConnection('localhost', server.port, context=context)
    921         h.request('GET', '/nonexistent')
    922         resp = h.getresponse()
    923         self.assertEqual(resp.status, 404)
    924 
    925     def test_host_port(self):
    926         # Check invalid host_port
    927 
    928         for hp in ("www.python.org:abc", "user:password (at] www.python.org"):
    929             self.assertRaises(httplib.InvalidURL, httplib.HTTPSConnection, hp)
    930 
    931         for hp, h, p in (("[fe80::207:e9ff:fe9b]:8000",
    932                           "fe80::207:e9ff:fe9b", 8000),
    933                          ("www.python.org:443", "www.python.org", 443),
    934                          ("www.python.org:", "www.python.org", 443),
    935                          ("www.python.org", "www.python.org", 443),
    936                          ("[fe80::207:e9ff:fe9b]", "fe80::207:e9ff:fe9b", 443),
    937                          ("[fe80::207:e9ff:fe9b]:", "fe80::207:e9ff:fe9b",
    938                              443)):
    939             c = httplib.HTTPSConnection(hp)
    940             self.assertEqual(h, c.host)
    941             self.assertEqual(p, c.port)
    942 
    943 
    944 class TunnelTests(TestCase):
    945     def test_connect(self):
    946         response_text = (
    947             'HTTP/1.0 200 OK\r\n\r\n'   # Reply to CONNECT
    948             'HTTP/1.1 200 OK\r\n'       # Reply to HEAD
    949             'Content-Length: 42\r\n\r\n'
    950         )
    951 
    952         def create_connection(address, timeout=None, source_address=None):
    953             return FakeSocket(response_text, host=address[0], port=address[1])
    954 
    955         conn = httplib.HTTPConnection('proxy.com')
    956         conn._create_connection = create_connection
    957 
    958         # Once connected, we should not be able to tunnel anymore
    959         conn.connect()
    960         self.assertRaises(RuntimeError, conn.set_tunnel, 'destination.com')
    961 
    962         # But if close the connection, we are good.
    963         conn.close()
    964         conn.set_tunnel('destination.com')
    965         conn.request('HEAD', '/', '')
    966 
    967         self.assertEqual(conn.sock.host, 'proxy.com')
    968         self.assertEqual(conn.sock.port, 80)
    969         self.assertIn('CONNECT destination.com', conn.sock.data)
    970         # issue22095
    971         self.assertNotIn('Host: destination.com:None', conn.sock.data)
    972         self.assertIn('Host: destination.com', conn.sock.data)
    973 
    974         self.assertNotIn('Host: proxy.com', conn.sock.data)
    975 
    976         conn.close()
    977 
    978         conn.request('PUT', '/', '')
    979         self.assertEqual(conn.sock.host, 'proxy.com')
    980         self.assertEqual(conn.sock.port, 80)
    981         self.assertTrue('CONNECT destination.com' in conn.sock.data)
    982         self.assertTrue('Host: destination.com' in conn.sock.data)
    983 
    984 
    985 @test_support.reap_threads
    986 def test_main(verbose=None):
    987     test_support.run_unittest(HeaderTests, OfflineTest, BasicTest, TimeoutTest,
    988                               HTTPTest, HTTPSTest, SourceAddressTest,
    989                               TunnelTests)
    990 
# Run the full suite via test_main() when this file is executed as a script.
if __name__ == '__main__':
    test_main()
    993