# (source-browser navigation residue: Home | History | Annotate | Download | only in test)
      1 # -*- coding: utf-8 -*-
      2 # Test the support for SSL and sockets
      3 
      4 import sys
      5 import unittest
      6 from test import test_support as support
      7 from test.script_helper import assert_python_ok
      8 import asyncore
      9 import socket
     10 import select
     11 import time
     12 import datetime
     13 import gc
     14 import os
     15 import errno
     16 import pprint
     17 import tempfile
     18 import urllib2
     19 import traceback
     20 import weakref
     21 import platform
     22 import functools
     23 from contextlib import closing
     24 
# Import ssl through the test-support helper instead of a plain import.
# NOTE(review): presumably this skips the whole module when ssl is
# unavailable -- confirm against test_support.import_module.
ssl = support.import_module("ssl")

# Sorted protocol identifiers known to the ssl module.
PROTOCOLS = sorted(ssl._PROTOCOL_NAMES)
HOST = support.HOST
# True when the version string reports LibreSSL rather than OpenSSL.
IS_LIBRESSL = ssl.OPENSSL_VERSION.startswith('LibreSSL')
# True for genuine OpenSSL (not LibreSSL) at version 1.1.0 or newer.
IS_OPENSSL_1_1 = not IS_LIBRESSL and ssl.OPENSSL_VERSION_INFO >= (1, 1, 0)
     31 
     32 
     33 def data_file(*name):
     34     return os.path.join(os.path.dirname(__file__), *name)
     35 
# The custom key and certificate files used in test_ssl are generated
# using Lib/test/make_ssl_certs.py.
# Other certificates are simply fetched from the Internet servers they
# are meant to authenticate.

# All paths below are resolved relative to this module's directory via
# data_file().  The BYTES_* names are the same paths encoded with the
# filesystem encoding.
CERTFILE = data_file("keycert.pem")
BYTES_CERTFILE = CERTFILE.encode(sys.getfilesystemencoding())
ONLYCERT = data_file("ssl_cert.pem")
ONLYKEY = data_file("ssl_key.pem")
BYTES_ONLYCERT = ONLYCERT.encode(sys.getfilesystemencoding())
BYTES_ONLYKEY = ONLYKEY.encode(sys.getfilesystemencoding())
CERTFILE_PROTECTED = data_file("keycert.passwd.pem")
ONLYKEY_PROTECTED = data_file("ssl_key.passwd.pem")
# NOTE(review): presumably the password for the *.passwd.pem files above --
# confirm against make_ssl_certs.py.
KEY_PASSWORD = "somepass"
CAPATH = data_file("capath")
BYTES_CAPATH = CAPATH.encode(sys.getfilesystemencoding())
# Individual certificate files stored inside the CAPATH directory.
CAFILE_NEURONIO = data_file("capath", "4e1295a3.0")
CAFILE_CACERT = data_file("capath", "5ed36f99.0")


# empty CRL
CRLFILE = data_file("revocation.crl")

# Two keys and certs signed by the same CA (for SNI tests)
SIGNED_CERTFILE = data_file("keycert3.pem")
SIGNED_CERTFILE2 = data_file("keycert4.pem")
SIGNING_CA = data_file("pycacert.pem")
# cert with all kinds of subject alt names
ALLSANFILE = data_file("allsans.pem")

# Remote host and the certificate file associated with it.
REMOTE_HOST = "self-signed.pythontest.net"
REMOTE_ROOT_CERT = data_file("selfsigned_pythontestdotnet.pem")

# Deliberately unusable inputs: empty, malformed or missing files.
EMPTYCERT = data_file("nullcert.pem")
BADCERT = data_file("badcert.pem")
NONEXISTINGCERT = data_file("XXXnonexisting.pem")
BADKEY = data_file("badkey.pem")
NOKIACERT = data_file("nokia.pem")
NULLBYTECERT = data_file("nullbytecert.pem")

DHFILE = data_file("dh1024.pem")
BYTES_DHFILE = DHFILE.encode(sys.getfilesystemencoding())
     78 
     79 
     80 def handle_error(prefix):
     81     exc_format = ' '.join(traceback.format_exception(*sys.exc_info()))
     82     if support.verbose:
     83         sys.stdout.write(prefix + exc_format)
     84 
     85 
     86 class BasicTests(unittest.TestCase):
     87 
     88     def test_sslwrap_simple(self):
     89         # A crude test for the legacy API
     90         try:
     91             ssl.sslwrap_simple(socket.socket(socket.AF_INET))
     92         except IOError, e:
     93             if e.errno == 32: # broken pipe when ssl_sock.do_handshake(), this test doesn't care about that
     94                 pass
     95             else:
     96                 raise
     97         try:
     98             ssl.sslwrap_simple(socket.socket(socket.AF_INET)._sock)
     99         except IOError, e:
    100             if e.errno == 32: # broken pipe when ssl_sock.do_handshake(), this test doesn't care about that
    101                 pass
    102             else:
    103                 raise
    104 
    105 
    106 def can_clear_options():
    107     # 0.9.8m or higher
    108     return ssl._OPENSSL_API_VERSION >= (0, 9, 8, 13, 15)
    109 
    110 def no_sslv2_implies_sslv3_hello():
    111     # 0.9.7h or higher
    112     return ssl.OPENSSL_VERSION_INFO >= (0, 9, 7, 8, 15)
    113 
    114 def have_verify_flags():
    115     # 0.9.8 or higher
    116     return ssl.OPENSSL_VERSION_INFO >= (0, 9, 8, 0, 15)
    117 
    118 def utc_offset(): #NOTE: ignore issues like #1647654
    119     # local time = utc time + utc offset
    120     if time.daylight and time.localtime().tm_isdst > 0:
    121         return -time.altzone  # seconds
    122     return -time.timezone
    123 
    124 def asn1time(cert_time):
    125     # Some versions of OpenSSL ignore seconds, see #18207
    126     # 0.9.8.i
    127     if ssl._OPENSSL_API_VERSION == (0, 9, 8, 9, 15):
    128         fmt = "%b %d %H:%M:%S %Y GMT"
    129         dt = datetime.datetime.strptime(cert_time, fmt)
    130         dt = dt.replace(second=0)
    131         cert_time = dt.strftime(fmt)
    132         # %d adds leading zero but ASN1_TIME_print() uses leading space
    133         if cert_time[4] == "0":
    134             cert_time = cert_time[:4] + " " + cert_time[5:]
    135 
    136     return cert_time
    137 
    138 # Issue #9415: Ubuntu hijacks their OpenSSL and forcefully disables SSLv2
    139 def skip_if_broken_ubuntu_ssl(func):
    140     if hasattr(ssl, 'PROTOCOL_SSLv2'):
    141         @functools.wraps(func)
    142         def f(*args, **kwargs):
    143             try:
    144                 ssl.SSLContext(ssl.PROTOCOL_SSLv2)
    145             except ssl.SSLError:
    146                 if (ssl.OPENSSL_VERSION_INFO == (0, 9, 8, 15, 15) and
    147                     platform.linux_distribution() == ('debian', 'squeeze/sid', '')):
    148                     raise unittest.SkipTest("Patched Ubuntu OpenSSL breaks behaviour")
    149             return func(*args, **kwargs)
    150         return f
    151     else:
    152         return func
    153 
# Decorator: skip the decorated test unless ssl.HAS_SNI is true.
needs_sni = unittest.skipUnless(ssl.HAS_SNI, "SNI support needed for this test")
    155 
    156 
    157 class BasicSocketTests(unittest.TestCase):
    158 
    def test_constants(self):
        # Looking a constant up is the whole check: a missing name raises
        # AttributeError and fails the test.
        always_present = ('CERT_NONE', 'CERT_OPTIONAL', 'CERT_REQUIRED',
                          'OP_CIPHER_SERVER_PREFERENCE', 'OP_SINGLE_DH_USE')
        for constant_name in always_present:
            getattr(ssl, constant_name)
        if ssl.HAS_ECDH:
            getattr(ssl, 'OP_SINGLE_ECDH_USE')
        if ssl.OPENSSL_VERSION_INFO >= (1, 0):
            getattr(ssl, 'OP_NO_COMPRESSION')
        # The feature flags must be real booleans.
        for feature_flag in (ssl.HAS_SNI, ssl.HAS_ECDH):
            self.assertIn(feature_flag, {True, False})
    171 
    def test_random(self):
        # Exercise RAND_status(), RAND_egd() argument checking (when the
        # function exists) and RAND_add().
        status = ssl.RAND_status()
        if support.verbose:
            if status:
                verdict = "sufficient randomness"
            else:
                verdict = "insufficient randomness"
            sys.stdout.write("\n RAND_status is %d (%s)\n"
                             % (status, verdict))
        if hasattr(ssl, 'RAND_egd'):
            for bad_args in ((1,), ('foo', 1)):
                self.assertRaises(TypeError, ssl.RAND_egd, *bad_args)
        ssl.RAND_add("this is a random string", 75.0)
    182 
    def test_parse_cert(self):
        # note that this uses an 'unofficial' function in _ssl.c,
        # provided solely for this test, to exercise the certificate
        # parsing code
        def decode(path):
            # Decode the certificate and dump it when running verbosely.
            decoded = ssl._ssl._test_decode_cert(path)
            if support.verbose:
                sys.stdout.write("\n" + pprint.pformat(decoded) + "\n")
            return decoded

        # Issuer and subject of the test certificate are identical.
        localhost_name = ((('countryName', 'XY'),),
                          (('localityName', 'Castle Anthrax'),),
                          (('organizationName', 'Python Software Foundation'),),
                          (('commonName', 'localhost'),))

        cert = decode(CERTFILE)
        self.assertEqual(cert['issuer'], localhost_name)
        # Note the next three asserts will fail if the keys are regenerated
        self.assertEqual(cert['notAfter'], asn1time('Oct  5 23:01:56 2020 GMT'))
        self.assertEqual(cert['notBefore'], asn1time('Oct  8 23:01:56 2010 GMT'))
        self.assertEqual(cert['serialNumber'], 'D7C7381919AFC24E')
        self.assertEqual(cert['subject'], localhost_name)
        self.assertEqual(cert['subjectAltName'], (('DNS', 'localhost'),))

        # Issue #13034: the subjectAltName in some certificates
        # (notably projects.developer.nokia.com:443) wasn't parsed
        nokia = decode(NOKIACERT)
        expected_san = (('DNS', 'projects.developer.nokia.com'),
                        ('DNS', 'projects.forum.nokia.com'))
        self.assertEqual(nokia['subjectAltName'], expected_san)
        # extra OCSP and AIA fields
        self.assertEqual(nokia['OCSP'], ('http://ocsp.verisign.com',))
        self.assertEqual(nokia['caIssuers'],
                         ('http://SVRIntl-G3-aia.verisign.com/SVRIntlG3.cer',))
        self.assertEqual(nokia['crlDistributionPoints'],
                         ('http://SVRIntl-G3-crl.verisign.com/SVRIntlG3.crl',))
    222 
    def test_parse_cert_CVE_2013_4238(self):
        # Decode a certificate whose names contain embedded NUL bytes and
        # check that every value comes back in full, NUL byte included.
        p = ssl._ssl._test_decode_cert(NULLBYTECERT)
        if support.verbose:
            sys.stdout.write("\n" + pprint.pformat(p) + "\n")
        subject = ((('countryName', 'US'),),
                   (('stateOrProvinceName', 'Oregon'),),
                   (('localityName', 'Beaverton'),),
                   (('organizationName', 'Python Software Foundation'),),
                   (('organizationalUnitName', 'Python Core Development'),),
                   (('commonName', 'null.python.org\x00example.org'),),
                   (('emailAddress', 'python-dev@python.org'),))
        self.assertEqual(p['subject'], subject)
        self.assertEqual(p['issuer'], subject)

        # Entries shared by every OpenSSL version; only the rendering of
        # the IPv6 address differs.
        san = (('DNS', 'altnull.python.org\x00example.com'),
               ('email', 'null@python.org\x00user@example.org'),
               ('URI', 'http://null.python.org\x00http://example.org'),
               ('IP Address', '192.0.2.1'))
        if ssl._OPENSSL_API_VERSION >= (0, 9, 8):
            san += (('IP Address', '2001:DB8:0:0:0:0:0:1\n'),)
        else:
            # OpenSSL 0.9.7 doesn't support IPv6 addresses in subjectAltName
            san += (('IP Address', '<invalid>'),)

        self.assertEqual(p['subjectAltName'], san)
    251 
    def test_parse_all_sans(self):
        # Decode the certificate that carries every kind of subject
        # alternative name and compare the full decoded tuple.
        p = ssl._ssl._test_decode_cert(ALLSANFILE)
        self.assertEqual(p['subjectAltName'],
            (
                ('DNS', 'allsans'),
                ('othername', '<unsupported>'),
                ('othername', '<unsupported>'),
                ('email', 'user@example.org'),
                ('DNS', 'www.example.org'),
                ('DirName',
                    ((('countryName', 'XY'),),
                    (('localityName', 'Castle Anthrax'),),
                    (('organizationName', 'Python Software Foundation'),),
                    (('commonName', 'dirname example'),))),
                ('URI', 'https://www.python.org/'),
                ('IP Address', '127.0.0.1'),
                ('IP Address', '0:0:0:0:0:0:0:1\n'),
                ('Registered ID', '1.2.3.4.5')
            )
        )
    272 
    def test_DER_to_PEM(self):
        # PEM -> DER -> PEM -> DER must give the same DER bytes, and the
        # regenerated PEM text must start and end with the PEM markers.
        with open(CAFILE_CACERT, 'r') as pem_file:
            original_pem = pem_file.read()
        der = ssl.PEM_cert_to_DER_cert(original_pem)
        regenerated_pem = ssl.DER_cert_to_PEM_cert(der)
        der_again = ssl.PEM_cert_to_DER_cert(regenerated_pem)
        self.assertEqual(der, der_again)

        expected_header = ssl.PEM_HEADER + '\n'
        expected_footer = '\n' + ssl.PEM_FOOTER + '\n'
        if not regenerated_pem.startswith(expected_header):
            self.fail("DER-to-PEM didn't include correct header:\n%r\n" % regenerated_pem)
        if not regenerated_pem.endswith(expected_footer):
            self.fail("DER-to-PEM didn't include correct footer:\n%r\n" % regenerated_pem)
    284 
    def test_openssl_version(self):
        # Sanity-check the three ways the ssl module reports its version.
        version_number = ssl.OPENSSL_VERSION_NUMBER
        version_info = ssl.OPENSSL_VERSION_INFO
        version_string = ssl.OPENSSL_VERSION
        self.assertIsInstance(version_number, (int, long))
        self.assertIsInstance(version_info, tuple)
        self.assertIsInstance(version_string, str)
        # Some sanity checks follow
        # >= 0.9
        self.assertGreaterEqual(version_number, 0x900000)
        # < 3.0
        self.assertLess(version_number, 0x30000000)
        major, minor, fix, patch, status = version_info
        # Each component is non-negative and below (or at most) its limit.
        component_limits = (
            (major, self.assertLess, 3),
            (minor, self.assertLess, 256),
            (fix, self.assertLess, 256),
            (patch, self.assertLessEqual, 63),
            (status, self.assertLessEqual, 15),
        )
        for component, check_upper, limit in component_limits:
            self.assertGreaterEqual(component, 0)
            check_upper(component, limit)
        # Version string as returned by {Open,Libre}SSL, the format might change
        if IS_LIBRESSL:
            expected_prefix = "LibreSSL {:d}".format(major)
            self.assertTrue(version_string.startswith(expected_prefix),
                            (version_string, version_info, hex(version_number)))
        else:
            expected_prefix = "OpenSSL {:d}.{:d}.{:d}".format(major, minor, fix)
            self.assertTrue(version_string.startswith(expected_prefix),
                            (version_string, version_info))
    315 
    @support.cpython_only
    def test_refcycle(self):
        # Issue #7943: an SSL object doesn't create reference cycles with
        # itself.  Dropping the only strong reference must therefore clear
        # the weak reference right away.
        plain_sock = socket.socket(socket.AF_INET)
        wrapped = ssl.wrap_socket(plain_sock)
        weak = weakref.ref(wrapped)
        del wrapped
        self.assertEqual(weak(), None)
    325 
    def test_wrapped_unconnected(self):
        # Methods on an unconnected SSLSocket propagate the original
        # socket.error raised by the underlying socket object.
        plain_sock = socket.socket(socket.AF_INET)
        with closing(ssl.wrap_socket(plain_sock)) as ss:
            io_calls = (
                (ss.recv, (1,)),
                (ss.recv_into, (bytearray(b'x'),)),
                (ss.recvfrom, (1,)),
                (ss.recvfrom_into, (bytearray(b'x'), 1)),
                (ss.send, (b'x',)),
                (ss.sendto, (b'x', ('0.0.0.0', 0))),
            )
            for method, args in io_calls:
                self.assertRaises(socket.error, method, *args)
    337 
    def test_timeout(self):
        # Issue #8524: when creating an SSL socket, the timeout of the
        # original socket should be retained.
        for expected_timeout in (None, 0.0, 5.0):
            plain_sock = socket.socket(socket.AF_INET)
            plain_sock.settimeout(expected_timeout)
            with closing(ssl.wrap_socket(plain_sock)) as wrapped:
                self.assertEqual(expected_timeout, wrapped.gettimeout())
    346 
    def test_errors(self):
        # Invalid argument combinations for wrap_socket() raise ValueError;
        # missing certificate or key files raise IOError with ENOENT.
        sock = socket.socket()
        server_side_msg = "certfile must be specified for server-side operations"
        invalid_calls = (
            ("certfile must be specified", {'keyfile': CERTFILE}),
            (server_side_msg, {'server_side': True}),
            (server_side_msg, {'server_side': True, 'certfile': ""}),
        )
        for message, kwargs in invalid_calls:
            self.assertRaisesRegexp(ValueError, message,
                                    ssl.wrap_socket, sock, **kwargs)

        with closing(ssl.wrap_socket(sock, server_side=True, certfile=CERTFILE)) as s:
            self.assertRaisesRegexp(ValueError, "can't connect in server-side mode",
                                    s.connect, (HOST, 8080))

        missing_file_kwargs = (
            {'certfile': NONEXISTINGCERT},
            {'certfile': CERTFILE, 'keyfile': NONEXISTINGCERT},
            {'certfile': NONEXISTINGCERT, 'keyfile': NONEXISTINGCERT},
        )
        for kwargs in missing_file_kwargs:
            with self.assertRaises(IOError) as cm:
                with closing(socket.socket()) as sock:
                    ssl.wrap_socket(sock, **kwargs)
            self.assertEqual(cm.exception.errno, errno.ENOENT)
    375 
    def bad_cert_test(self, certfile):
        """Check that trying to use the given client certificate fails"""
        # Resolve the name relative to this module's directory, falling
        # back to the current directory when __file__ has no directory part.
        base_dir = os.path.dirname(__file__) or os.curdir
        cert_path = os.path.join(base_dir, certfile)
        sock = socket.socket()
        self.addCleanup(sock.close)
        self.assertRaises(ssl.SSLError, ssl.wrap_socket, sock,
                          certfile=cert_path,
                          ssl_version=ssl.PROTOCOL_TLSv1)
    386 
    def test_empty_cert(self):
        """Wrapping with an empty cert file"""
        # nullcert.pem lives next to this module; see bad_cert_test().
        self.bad_cert_test(certfile="nullcert.pem")
    390 
    def test_malformed_cert(self):
        """Wrapping with a badly formatted certificate (syntax error)"""
        # badcert.pem lives next to this module; see bad_cert_test().
        self.bad_cert_test(certfile="badcert.pem")
    394 
    def test_malformed_key(self):
        """Wrapping with a badly formatted key (syntax error)"""
        # badkey.pem lives next to this module; see bad_cert_test().
        self.bad_cert_test(certfile="badkey.pem")
    398 
    def test_match_hostname(self):
        # Exercise ssl.match_hostname() against hand-built certificate
        # dicts: ok() expects a match, fail() expects CertificateError.
        def ok(cert, hostname):
            ssl.match_hostname(cert, hostname)
        def fail(cert, hostname):
            self.assertRaises(ssl.CertificateError,
                              ssl.match_hostname, cert, hostname)

        def idna_ascii(name):
            # IDNA-encode a unicode host name and return it as ASCII text.
            return name.encode("idna").decode("ascii")

        cert = {'subject': ((('commonName', 'example.com'),),)}
        ok(cert, 'example.com')
        ok(cert, 'ExAmple.cOm')
        fail(cert, 'www.example.com')
        fail(cert, '.example.com')
        fail(cert, 'example.org')
        fail(cert, 'exampleXcom')

        cert = {'subject': ((('commonName', '*.a.com'),),)}
        ok(cert, 'foo.a.com')
        fail(cert, 'bar.foo.a.com')
        fail(cert, 'a.com')
        fail(cert, 'Xa.com')
        fail(cert, '.a.com')

        # only match one left-most wildcard
        cert = {'subject': ((('commonName', 'f*.com'),),)}
        ok(cert, 'foo.com')
        ok(cert, 'f.com')
        fail(cert, 'bar.com')
        fail(cert, 'foo.a.com')
        fail(cert, 'bar.foo.com')

        # NULL bytes are bad, CVE-2013-4073
        cert = {'subject': ((('commonName',
                              'null.python.org\x00example.org'),),)}
        ok(cert, 'null.python.org\x00example.org') # or raise an error?
        fail(cert, 'example.org')
        fail(cert, 'null.python.org')

        # error cases with wildcards
        cert = {'subject': ((('commonName', '*.*.a.com'),),)}
        fail(cert, 'bar.foo.a.com')
        fail(cert, 'a.com')
        fail(cert, 'Xa.com')
        fail(cert, '.a.com')

        cert = {'subject': ((('commonName', 'a.*.com'),),)}
        fail(cert, 'a.foo.com')
        fail(cert, 'a..com')
        fail(cert, 'a.com')

        # wildcard doesn't match IDNA prefix 'xn--'
        # The host name must contain a non-ASCII character so that its
        # IDNA form really starts with 'xn--'; it is spelled with an escape
        # (u-umlaut) to keep the source ASCII-only.
        idna = idna_ascii(u'p\xfcthon.python.org')
        cert = {'subject': ((('commonName', idna),),)}
        ok(cert, idna)
        cert = {'subject': ((('commonName', 'x*.python.org'),),)}
        fail(cert, idna)
        cert = {'subject': ((('commonName', 'xn--p*.python.org'),),)}
        fail(cert, idna)

        # wildcard in first fragment and  IDNA A-labels in sequent fragments
        # are supported.  (o-umlaut written as an escape.)
        idna = idna_ascii(u'www*.pyth\xf6n.org')
        cert = {'subject': ((('commonName', idna),),)}
        ok(cert, idna_ascii(u'www.pyth\xf6n.org'))
        ok(cert, idna_ascii(u'www1.pyth\xf6n.org'))
        fail(cert, idna_ascii(u'ftp.pyth\xf6n.org'))
        fail(cert, idna_ascii(u'pyth\xf6n.org'))

        # Slightly fake real-world example
        cert = {'notAfter': 'Jun 26 21:41:46 2011 GMT',
                'subject': ((('commonName', 'linuxfrz.org'),),),
                'subjectAltName': (('DNS', 'linuxfr.org'),
                                   ('DNS', 'linuxfr.com'),
                                   ('othername', '<unsupported>'))}
        ok(cert, 'linuxfr.org')
        ok(cert, 'linuxfr.com')
        # Not a "DNS" entry
        fail(cert, '<unsupported>')
        # When there is a subjectAltName, commonName isn't used
        fail(cert, 'linuxfrz.org')

        # A pristine real-world example
        cert = {'notAfter': 'Dec 18 23:59:59 2011 GMT',
                'subject': ((('countryName', 'US'),),
                            (('stateOrProvinceName', 'California'),),
                            (('localityName', 'Mountain View'),),
                            (('organizationName', 'Google Inc'),),
                            (('commonName', 'mail.google.com'),))}
        ok(cert, 'mail.google.com')
        fail(cert, 'gmail.com')
        # Only commonName is considered
        fail(cert, 'California')

        # Neither commonName nor subjectAltName
        cert = {'notAfter': 'Dec 18 23:59:59 2011 GMT',
                'subject': ((('countryName', 'US'),),
                            (('stateOrProvinceName', 'California'),),
                            (('localityName', 'Mountain View'),),
                            (('organizationName', 'Google Inc'),))}
        fail(cert, 'mail.google.com')

        # No DNS entry in subjectAltName but a commonName
        cert = {'notAfter': 'Dec 18 23:59:59 2099 GMT',
                'subject': ((('countryName', 'US'),),
                            (('stateOrProvinceName', 'California'),),
                            (('localityName', 'Mountain View'),),
                            (('commonName', 'mail.google.com'),)),
                'subjectAltName': (('othername', 'blabla'), )}
        ok(cert, 'mail.google.com')

        # No DNS entry subjectAltName and no commonName
        cert = {'notAfter': 'Dec 18 23:59:59 2099 GMT',
                'subject': ((('countryName', 'US'),),
                            (('stateOrProvinceName', 'California'),),
                            (('localityName', 'Mountain View'),),
                            (('organizationName', 'Google Inc'),)),
                'subjectAltName': (('othername', 'blabla'),)}
        fail(cert, 'google.com')

        # Empty cert / no cert
        self.assertRaises(ValueError, ssl.match_hostname, None, 'example.com')
        self.assertRaises(ValueError, ssl.match_hostname, {}, 'example.com')

        # Issue #17980: avoid denials of service by refusing more than one
        # wildcard per fragment.
        cert = {'subject': ((('commonName', 'a*b.com'),),)}
        ok(cert, 'axxb.com')
        cert = {'subject': ((('commonName', 'a*b.co*'),),)}
        fail(cert, 'axxb.com')
        cert = {'subject': ((('commonName', 'a*b*.com'),),)}
        with self.assertRaises(ssl.CertificateError) as cm:
            ssl.match_hostname(cert, 'axxbxxc.com')
        self.assertIn("too many wildcards", str(cm.exception))
    531 
    def test_server_side(self):
        # server_hostname doesn't work for server sockets
        context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
        with closing(socket.socket()) as plain_sock:
            with self.assertRaises(ValueError):
                # Second positional argument True selects server-side mode.
                context.wrap_socket(plain_sock, True,
                                    server_hostname="some.hostname")
    538 
    def test_unknown_channel_binding(self):
        # should raise ValueError for unknown type
        plain_sock = socket.socket(socket.AF_INET)
        with closing(ssl.wrap_socket(plain_sock)) as wrapped:
            self.assertRaises(ValueError,
                              wrapped.get_channel_binding, "unknown-type")
    545 
    @unittest.skipUnless("tls-unique" in ssl.CHANNEL_BINDING_TYPES,
                         "'tls-unique' channel binding not available")
    def test_tls_unique_channel_binding(self):
        # unconnected should return None for known type, first for a
        # client-side socket and then the same for server-side
        wrap_options = (
            {},
            {'server_side': True, 'certfile': CERTFILE},
        )
        for options in wrap_options:
            plain_sock = socket.socket(socket.AF_INET)
            with closing(ssl.wrap_socket(plain_sock, **options)) as wrapped:
                self.assertIsNone(wrapped.get_channel_binding("tls-unique"))
    557 
    def test_get_default_verify_paths(self):
        # The result is a six-field DefaultVerifyPaths, and the
        # SSL_CERT_DIR / SSL_CERT_FILE environment variables override the
        # reported capath / cafile.
        default_paths = ssl.get_default_verify_paths()
        self.assertEqual(len(default_paths), 6)
        self.assertIsInstance(default_paths, ssl.DefaultVerifyPaths)

        with support.EnvironmentVarGuard() as env:
            env["SSL_CERT_DIR"] = CAPATH
            env["SSL_CERT_FILE"] = CERTFILE
            overridden = ssl.get_default_verify_paths()
            self.assertEqual(overridden.cafile, CERTFILE)
            self.assertEqual(overridden.capath, CAPATH)
    569 
    @unittest.skipUnless(sys.platform == "win32", "Windows specific")
    def test_enum_certificates(self):
        # Both system stores must be non-empty, and bad arguments rejected.
        store_names = ("CA", "ROOT")
        for storename in store_names:
            self.assertTrue(ssl.enum_certificates(storename))

        self.assertRaises(TypeError, ssl.enum_certificates)
        self.assertRaises(WindowsError, ssl.enum_certificates, "")

        # Every entry is a (cert bytes, encoding name, trust) triple; collect
        # the OIDs from entries whose trust value is a set.
        known_encodings = {"x509_asn", "pkcs_7_asn"}
        trust_oids = set()
        for storename in store_names:
            store = ssl.enum_certificates(storename)
            self.assertIsInstance(store, list)
            for entry in store:
                self.assertIsInstance(entry, tuple)
                self.assertEqual(len(entry), 3)
                cert, encoding, trust = entry
                self.assertIsInstance(cert, bytes)
                self.assertIn(encoding, known_encodings)
                self.assertIsInstance(trust, (set, bool))
                if isinstance(trust, set):
                    trust_oids |= trust

        serverAuth = "1.3.6.1.5.5.7.3.1"
        self.assertIn(serverAuth, trust_oids)
    594 
    @unittest.skipUnless(sys.platform == "win32", "Windows specific")
    def test_enum_crls(self):
        # The "CA" store must yield CRLs, and bad arguments are rejected.
        self.assertTrue(ssl.enum_crls("CA"))
        self.assertRaises(TypeError, ssl.enum_crls)
        self.assertRaises(WindowsError, ssl.enum_crls, "")

        # Every entry is a (crl bytes, encoding name) pair.
        known_encodings = {"x509_asn", "pkcs_7_asn"}
        crl_list = ssl.enum_crls("CA")
        self.assertIsInstance(crl_list, list)
        for entry in crl_list:
            self.assertIsInstance(entry, tuple)
            self.assertEqual(len(entry), 2)
            self.assertIsInstance(entry[0], bytes)
            self.assertIn(entry[1], known_encodings)

    608 
    609 
    def test_asn1object(self):
        # _ASN1Object can be built from an OID, a NID or a name, and always
        # compares equal to the (nid, shortname, longname, oid) tuple.
        asn1 = ssl._ASN1Object
        server_auth_oid = '1.3.6.1.5.5.7.3.1'
        expected = (129, 'serverAuth', 'TLS Web Server Authentication',
                    server_auth_oid)

        # Construction from a dotted OID string.
        by_oid = asn1(server_auth_oid)
        self.assertEqual(by_oid, expected)
        self.assertEqual(by_oid.nid, 129)
        self.assertEqual(by_oid.shortname, 'serverAuth')
        self.assertEqual(by_oid.longname, 'TLS Web Server Authentication')
        self.assertEqual(by_oid.oid, server_auth_oid)
        self.assertIsInstance(by_oid, asn1)
        self.assertRaises(ValueError, asn1, 'serverAuth')

        # Construction from a numeric NID.
        by_nid = asn1.fromnid(129)
        self.assertEqual(by_nid, expected)
        self.assertIsInstance(by_nid, asn1)
        self.assertRaises(ValueError, asn1.fromnid, -1)
        with self.assertRaisesRegexp(ValueError, "unknown NID 100000"):
            asn1.fromnid(100000)
        for nid in range(1000):
            try:
                obj = asn1.fromnid(nid)
            except ValueError:
                # Not every NID in the range is defined.
                continue
            self.assertIsInstance(obj.nid, int)
            self.assertIsInstance(obj.shortname, str)
            self.assertIsInstance(obj.longname, str)
            self.assertIsInstance(obj.oid, (str, type(None)))

        # Construction from a long name, short name or OID string.
        by_name = asn1.fromname('TLS Web Server Authentication')
        self.assertEqual(by_name, expected)
        self.assertIsInstance(by_name, asn1)
        self.assertEqual(asn1.fromname('serverAuth'), expected)
        self.assertEqual(asn1.fromname(server_auth_oid), expected)
        with self.assertRaisesRegexp(ValueError, "unknown object 'serverauth'"):
            asn1.fromname('serverauth')
    648 
    649     def test_purpose_enum(self):
    650         val = ssl._ASN1Object('1.3.6.1.5.5.7.3.1')
    651         self.assertIsInstance(ssl.Purpose.SERVER_AUTH, ssl._ASN1Object)
    652         self.assertEqual(ssl.Purpose.SERVER_AUTH, val)
    653         self.assertEqual(ssl.Purpose.SERVER_AUTH.nid, 129)
    654         self.assertEqual(ssl.Purpose.SERVER_AUTH.shortname, 'serverAuth')
    655         self.assertEqual(ssl.Purpose.SERVER_AUTH.oid,
    656                               '1.3.6.1.5.5.7.3.1')
    657 
    658         val = ssl._ASN1Object('1.3.6.1.5.5.7.3.2')
    659         self.assertIsInstance(ssl.Purpose.CLIENT_AUTH, ssl._ASN1Object)
    660         self.assertEqual(ssl.Purpose.CLIENT_AUTH, val)
    661         self.assertEqual(ssl.Purpose.CLIENT_AUTH.nid, 130)
    662         self.assertEqual(ssl.Purpose.CLIENT_AUTH.shortname, 'clientAuth')
    663         self.assertEqual(ssl.Purpose.CLIENT_AUTH.oid,
    664                               '1.3.6.1.5.5.7.3.2')
    665 
    def test_unsupported_dtls(self):
        # A datagram socket cannot be wrapped, neither through the
        # module-level helper nor through a context.
        message = "only stream sockets are supported"
        udp = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        self.addCleanup(udp.close)

        with self.assertRaises(NotImplementedError) as caught:
            ssl.wrap_socket(udp, cert_reqs=ssl.CERT_NONE)
        self.assertEqual(str(caught.exception), message)

        context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
        with self.assertRaises(NotImplementedError) as caught:
            context.wrap_socket(udp)
        self.assertEqual(str(caught.exception), message)
    676 
    def cert_time_ok(self, timestring, timestamp):
        # Helper: *timestring* must convert to exactly *timestamp*.
        seconds = ssl.cert_time_to_seconds(timestring)
        self.assertEqual(seconds, timestamp)
    679 
    def cert_time_fail(self, timestring):
        # Helper: converting *timestring* must raise ValueError.
        self.assertRaises(ValueError, ssl.cert_time_to_seconds, timestring)
    683 
    @unittest.skipUnless(utc_offset(),
                         'local time needs to be different from UTC')
    def test_cert_time_to_seconds_timezone(self):
        # Issue #19940: ssl.cert_time_to_seconds() returns wrong
        #               results if local timezone is not UTC
        known_times = (
            ("May  9 00:00:00 2007 GMT", 1178668800.0),
            ("Jan  5 09:34:43 2018 GMT", 1515144883.0),
        )
        for text, seconds in known_times:
            self.cert_time_ok(text, seconds)
    691 
    def test_cert_time_to_seconds(self):
        # Accepted and rejected spellings of a certificate time string.
        reference = "Jan  5 09:34:43 2018 GMT"
        reference_ts = 1515144883.0
        self.cert_time_ok(reference, reference_ts)
        # accept keyword parameter, assert its name
        self.assertEqual(ssl.cert_time_to_seconds(cert_time=reference),
                         reference_ts)
        accepted_variants = (
            # accept both %e and %d (space or zero generated by strftime)
            "Jan 05 09:34:43 2018 GMT",
            # case-insensitive
            "JaN  5 09:34:43 2018 GmT",
        )
        for text in accepted_variants:
            self.cert_time_ok(text, reference_ts)

        rejected = (
            "Jan  5 09:34 2018 GMT",      # no seconds
            "Jan  5 09:34:43 2018",       # no GMT
            "Jan  5 09:34:43 2018 UTC",   # not GMT timezone
            "Jan 35 09:34:43 2018 GMT",   # invalid day
            "Jon  5 09:34:43 2018 GMT",   # invalid month
            "Jan  5 24:00:00 2018 GMT",   # invalid hour
            "Jan  5 09:60:43 2018 GMT",   # invalid minute
        )
        for text in rejected:
            self.cert_time_fail(text)

        # A leap second at the end of 2008 maps to the same timestamp
        # as the first second of 2009.
        newyear_ts = 1230768000.0
        for text in ("Dec 31 23:59:60 2008 GMT", "Jan  1 00:00:00 2009 GMT"):
            self.cert_time_ok(text, newyear_ts)

        seconds_field = (
            ("Jan  5 09:34:59 2018 GMT", 1515144899),
            # allow 60th second (even if it is not a leap second)
            ("Jan  5 09:34:60 2018 GMT", 1515144900),
            # allow 2nd leap second for compatibility with time.strptime()
            ("Jan  5 09:34:61 2018 GMT", 1515144901),
        )
        for text, seconds in seconds_field:
            self.cert_time_ok(text, seconds)
        self.cert_time_fail("Jan  5 09:34:62 2018 GMT")  # invalid seconds

        # no special treatment for the special value:
        #   99991231235959Z (rfc 5280)
        self.cert_time_ok("Dec 31 23:59:59 9999 GMT", 253402300799.0)
    726 
    @support.run_with_locale('LC_ALL', '')
    def test_cert_time_to_seconds_locale(self):
        # `cert_time_to_seconds()` should be locale independent

        # Abbreviated name of February as strftime() renders it under
        # the locale this test runs with.
        february = time.strftime('%b', (1, 2, 3, 4, 5, 6, 0, 0, 0))
        if february.lower() == 'feb':
            self.skipTest("locale-specific month name needs to be "
                          "different from C locale")

        # The C-locale month name is understood ...
        self.cert_time_ok("Feb  9 00:00:00 2007 GMT", 1170979200.0)
        # ... while the localized one is not.
        self.cert_time_fail(february + "  9 00:00:00 2007 GMT")
    741 
    742 
    743 class ContextTests(unittest.TestCase):
    744 
    @skip_if_broken_ubuntu_ssl
    def test_constructor(self):
        # Every known protocol constant builds a context; a missing or
        # unknown protocol value is rejected.
        for known in PROTOCOLS:
            ssl.SSLContext(known)
        with self.assertRaises(TypeError):
            ssl.SSLContext()
        for bogus in (-1, 42):
            with self.assertRaises(ValueError):
                ssl.SSLContext(bogus)
    752 
    @skip_if_broken_ubuntu_ssl
    def test_protocol(self):
        # A context reports back the protocol it was created with.
        for requested in PROTOCOLS:
            context = ssl.SSLContext(requested)
            self.assertEqual(context.protocol, requested)
    758 
    def test_ciphers(self):
        # Well-known cipher strings are accepted; a string that selects
        # nothing raises SSLError.
        context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        for spec in ("ALL", "DEFAULT"):
            context.set_ciphers(spec)
        self.assertRaisesRegexp(ssl.SSLError, "No cipher can be selected",
                                context.set_ciphers, "^$:,;?*'dorothyx")
    765 
    @skip_if_broken_ubuntu_ssl
    def test_options(self):
        # Check the default option mask of a fresh context, and that
        # option bits can be set and -- where supported -- cleared again.
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        # OP_ALL | OP_NO_SSLv2 | OP_NO_SSLv3 is the default value
        default = (ssl.OP_ALL | ssl.OP_NO_SSLv2 | ssl.OP_NO_SSLv3)
        # Use the module-level flag rather than repeating the
        # "not LibreSSL and OpenSSL >= 1.1.0" check inline.
        if IS_OPENSSL_1_1:
            default |= ssl.OP_NO_COMPRESSION
        self.assertEqual(default, ctx.options)
        ctx.options |= ssl.OP_NO_TLSv1
        self.assertEqual(default | ssl.OP_NO_TLSv1, ctx.options)
        if can_clear_options():
            # Clearing a single bit, then all of them.
            ctx.options = (ctx.options & ~ssl.OP_NO_TLSv1)
            self.assertEqual(default, ctx.options)
            ctx.options = 0
            self.assertEqual(0, ctx.options)
        else:
            # Without clearing support, resetting the mask is refused.
            with self.assertRaises(ValueError):
                ctx.options = 0
    784 
    def test_verify_mode(self):
        context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        # Default value
        self.assertEqual(context.verify_mode, ssl.CERT_NONE)
        # Every valid mode can be assigned and is read back unchanged.
        for mode in (ssl.CERT_OPTIONAL, ssl.CERT_REQUIRED, ssl.CERT_NONE):
            context.verify_mode = mode
            self.assertEqual(context.verify_mode, mode)
        # A wrong type and an unknown value fail with different errors.
        for bad_value, error in ((None, TypeError), (42, ValueError)):
            with self.assertRaises(error):
                context.verify_mode = bad_value
    799 
    @unittest.skipUnless(have_verify_flags(),
                         "verify_flags need OpenSSL > 0.9.8")
    def test_verify_flags(self):
        context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        # default value; VERIFY_X509_TRUSTED_FIRST is included only
        # when the ssl module defines it
        trusted_first = getattr(ssl, "VERIFY_X509_TRUSTED_FIRST", 0)
        self.assertEqual(context.verify_flags,
                         ssl.VERIFY_DEFAULT | trusted_first)
        assignable = (
            ssl.VERIFY_CRL_CHECK_LEAF,
            ssl.VERIFY_CRL_CHECK_CHAIN,
            ssl.VERIFY_DEFAULT,
            # supports any value
            ssl.VERIFY_CRL_CHECK_LEAF | ssl.VERIFY_X509_STRICT,
        )
        for flags in assignable:
            context.verify_flags = flags
            self.assertEqual(context.verify_flags, flags)
        with self.assertRaises(TypeError):
            context.verify_flags = None
    819 
    def test_load_cert_chain(self):
        # Exercise load_cert_chain() with combined and separate key/cert
        # files, a mismatching pair, and every accepted password form.
        def new_context():
            return ssl.SSLContext(ssl.PROTOCOL_TLSv1)

        def password_variants():
            # The password as-is, encoded, and wrapped in a bytearray.
            return (KEY_PASSWORD,
                    KEY_PASSWORD.encode(),
                    bytearray(KEY_PASSWORD.encode()))

        # Combined key and cert in a single file
        ctx = new_context()
        ctx.load_cert_chain(CERTFILE, keyfile=None)
        ctx.load_cert_chain(CERTFILE, keyfile=CERTFILE)
        with self.assertRaises(TypeError):
            ctx.load_cert_chain(keyfile=CERTFILE)
        with self.assertRaises(IOError) as missing:
            ctx.load_cert_chain(NONEXISTINGCERT)
        self.assertEqual(missing.exception.errno, errno.ENOENT)
        for unusable in (BADCERT, EMPTYCERT):
            with self.assertRaisesRegexp(ssl.SSLError, "PEM lib"):
                ctx.load_cert_chain(unusable)

        # Separate key and cert
        ctx = new_context()
        ctx.load_cert_chain(ONLYCERT, ONLYKEY)
        ctx.load_cert_chain(certfile=ONLYCERT, keyfile=ONLYKEY)
        ctx.load_cert_chain(certfile=BYTES_ONLYCERT, keyfile=BYTES_ONLYKEY)
        incomplete = (
            dict(certfile=ONLYCERT),
            dict(certfile=ONLYKEY),
            dict(certfile=ONLYKEY, keyfile=ONLYCERT),
        )
        for arguments in incomplete:
            with self.assertRaisesRegexp(ssl.SSLError, "PEM lib"):
                ctx.load_cert_chain(**arguments)

        # Mismatching key and cert
        ctx = new_context()
        with self.assertRaisesRegexp(ssl.SSLError, "key values mismatch"):
            ctx.load_cert_chain(CAFILE_CACERT, ONLYKEY)

        # Password protected key and cert
        for secret in password_variants():
            ctx.load_cert_chain(CERTFILE_PROTECTED, password=secret)
        for secret in password_variants():
            ctx.load_cert_chain(ONLYCERT, ONLYKEY_PROTECTED, secret)
        with self.assertRaisesRegexp(TypeError, "should be a string"):
            ctx.load_cert_chain(CERTFILE_PROTECTED, password=True)
        with self.assertRaises(ssl.SSLError):
            ctx.load_cert_chain(CERTFILE_PROTECTED, password="badpass")
        with self.assertRaisesRegexp(ValueError, "cannot be longer"):
            # openssl has a fixed limit on the password buffer.
            # PEM_BUFSIZE is generally set to 1kb.
            # Return a string larger than this.
            ctx.load_cert_chain(CERTFILE_PROTECTED, password=b'a' * 102400)

        # Password callback
        def supply_text():
            return KEY_PASSWORD

        def supply_encoded():
            return KEY_PASSWORD.encode()

        def supply_bytearray():
            return bytearray(KEY_PASSWORD.encode())

        def supply_wrong():
            return "badpass"

        def supply_huge():
            return b'a' * (1024 * 1024)

        def supply_number():
            return 9

        def supply_error():
            raise Exception('getpass error')

        class PasswordProvider:
            # Usable both as a callable instance and via a bound method.
            def __call__(self):
                return KEY_PASSWORD

            def getpass(self):
                return KEY_PASSWORD

        working = (supply_text, supply_encoded, supply_bytearray,
                   PasswordProvider(), PasswordProvider().getpass)
        for callback in working:
            ctx.load_cert_chain(CERTFILE_PROTECTED, password=callback)

        with self.assertRaises(ssl.SSLError):
            ctx.load_cert_chain(CERTFILE_PROTECTED, password=supply_wrong)
        failing = (
            (ValueError, "cannot be longer", supply_huge),
            (TypeError, "must return a string", supply_number),
            (Exception, "getpass error", supply_error),
        )
        for error, pattern, callback in failing:
            with self.assertRaisesRegexp(error, pattern):
                ctx.load_cert_chain(CERTFILE_PROTECTED, password=callback)

        # Make sure the password function isn't called if it isn't needed
        ctx.load_cert_chain(CERTFILE, password=supply_error)
    902 
    def test_load_verify_locations(self):
        # load_verify_locations() with the various path spellings, plus
        # its argument validation and file errors.
        context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        load = context.load_verify_locations

        for cafile in (CERTFILE, BYTES_CERTFILE):
            load(cafile)
            load(cafile=cafile, capath=None)
        load(cafile=BYTES_CERTFILE.decode('utf-8'))

        # Calling with nothing, or with only None values, is a TypeError.
        with self.assertRaises(TypeError):
            load()
        with self.assertRaises(TypeError):
            load(None, None, None)

        with self.assertRaises(IOError) as missing:
            load(NONEXISTINGCERT)
        self.assertEqual(missing.exception.errno, errno.ENOENT)
        self.assertRaises(IOError, load, u'')
        self.assertRaisesRegexp(ssl.SSLError, "PEM lib", load, BADCERT)

        # A CA file together with a CA directory.
        for capath in (CAPATH, BYTES_CAPATH):
            load(CERTFILE, capath=capath)

        # Issue #10989: crash if the second argument type is invalid
        with self.assertRaises(TypeError):
            load(None, True)
    924 
    def test_load_verify_cadata(self):
        # Feed CA certificates to load_verify_locations() through the
        # cadata argument, as PEM text and as DER data.
        def pem_and_der(path):
            with open(path) as stream:
                pem = stream.read().decode("ascii")
            return pem, ssl.PEM_cert_to_DER_cert(pem)

        def new_context():
            return ssl.SSLContext(ssl.PROTOCOL_TLSv1)

        def ca_count(context):
            return context.cert_store_stats()["x509_ca"]

        cacert_pem, cacert_der = pem_and_der(CAFILE_CACERT)
        neuronio_pem, neuronio_der = pem_and_der(CAFILE_NEURONIO)

        # test PEM
        ctx = new_context()
        self.assertEqual(ca_count(ctx), 0)
        pem_steps = (
            (cacert_pem, 1),
            (neuronio_pem, 2),
            # cert already in hash table
            (neuronio_pem, 2),
        )
        for pem, total in pem_steps:
            ctx.load_verify_locations(cadata=pem)
            self.assertEqual(ca_count(ctx), total)

        # combined
        ctx = new_context()
        ctx.load_verify_locations(cadata="\n".join((cacert_pem, neuronio_pem)))
        self.assertEqual(ca_count(ctx), 2)

        # with junk around the certs
        ctx = new_context()
        pieces = ["head", cacert_pem, "other", neuronio_pem, "again",
                  neuronio_pem, "tail"]
        ctx.load_verify_locations(cadata="\n".join(pieces))
        self.assertEqual(ca_count(ctx), 2)

        # test DER
        ctx = new_context()
        for der in (cacert_der, neuronio_der):
            ctx.load_verify_locations(cadata=der)
        self.assertEqual(ca_count(ctx), 2)
        # cert already in hash table
        ctx.load_verify_locations(cadata=cacert_der)
        self.assertEqual(ca_count(ctx), 2)

        # combined
        ctx = new_context()
        ctx.load_verify_locations(cadata=b"".join((cacert_der, neuronio_der)))
        self.assertEqual(ca_count(ctx), 2)

        # error cases
        ctx = new_context()
        with self.assertRaises(TypeError):
            ctx.load_verify_locations(cadata=object)

        malformed = (
            ("no start line", u"broken"),
            ("not enough data", b"broken"),
        )
        for pattern, cadata in malformed:
            with self.assertRaisesRegexp(ssl.SSLError, pattern):
                ctx.load_verify_locations(cadata=cadata)
    981 
    982 
    def test_load_dh_params(self):
        # load_dh_params() takes a path to a DH parameter file and
        # reports a missing file and unusable content differently.
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        ctx.load_dh_params(DHFILE)
        if os.name != 'nt':
            # The bytes spelling of the path is not exercised on Windows.
            ctx.load_dh_params(BYTES_DHFILE)
        self.assertRaises(TypeError, ctx.load_dh_params)
        self.assertRaises(TypeError, ctx.load_dh_params, None)
        with self.assertRaises(IOError) as cm:
            ctx.load_dh_params(NONEXISTINGCERT)
        self.assertEqual(cm.exception.errno, errno.ENOENT)
        # Loading CERTFILE as DH parameters must fail.  The exception
        # is not inspected, so it is not bound to a name (the original
        # rebound, and never read, `cm` here).
        with self.assertRaises(ssl.SSLError):
            ctx.load_dh_params(CERTFILE)
    995 
    @skip_if_broken_ubuntu_ssl
    def test_session_stats(self):
        # A context that has never been used reports zero for every
        # session counter, whatever its protocol.
        counters = (
            'number',
            'connect',
            'connect_good',
            'connect_renegotiate',
            'accept',
            'accept_good',
            'accept_renegotiate',
            'hits',
            'misses',
            'timeouts',
            'cache_full',
        )
        all_zero = dict.fromkeys(counters, 0)
        for proto in PROTOCOLS:
            context = ssl.SSLContext(proto)
            self.assertEqual(context.session_stats(), all_zero)
   1013 
   1014     def test_set_default_verify_paths(self):
   1015         # There's not much we can do to test that it acts as expected,
   1016         # so just check it doesn't crash or raise an exception.
   1017         ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
   1018         ctx.set_default_verify_paths()
   1019 
   1020     @unittest.skipUnless(ssl.HAS_ECDH, "ECDH disabled on this OpenSSL build")
   1021     def test_set_ecdh_curve(self):
   1022         ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
   1023         ctx.set_ecdh_curve("prime256v1")
   1024         ctx.set_ecdh_curve(b"prime256v1")
   1025         self.assertRaises(TypeError, ctx.set_ecdh_curve)
   1026         self.assertRaises(TypeError, ctx.set_ecdh_curve, None)
   1027         self.assertRaises(ValueError, ctx.set_ecdh_curve, "foo")
   1028         self.assertRaises(ValueError, ctx.set_ecdh_curve, b"foo")
   1029 
   1030     @needs_sni
   1031     def test_sni_callback(self):
   1032         ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
   1033 
   1034         # set_servername_callback expects a callable, or None
   1035         self.assertRaises(TypeError, ctx.set_servername_callback)
   1036         self.assertRaises(TypeError, ctx.set_servername_callback, 4)
   1037         self.assertRaises(TypeError, ctx.set_servername_callback, "")
   1038         self.assertRaises(TypeError, ctx.set_servername_callback, ctx)
   1039 
   1040         def dummycallback(sock, servername, ctx):
   1041             pass
   1042         ctx.set_servername_callback(None)
   1043         ctx.set_servername_callback(dummycallback)
   1044 
   1045     @needs_sni
   1046     def test_sni_callback_refcycle(self):
   1047         # Reference cycles through the servername callback are detected
   1048         # and cleared.
   1049         ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
   1050         def dummycallback(sock, servername, ctx, cycle=ctx):
   1051             pass
   1052         ctx.set_servername_callback(dummycallback)
   1053         wr = weakref.ref(ctx)
   1054         del ctx, dummycallback
   1055         gc.collect()
   1056         self.assertIs(wr(), None)
   1057 
   1058     def test_cert_store_stats(self):
   1059         ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
   1060         self.assertEqual(ctx.cert_store_stats(),
   1061             {'x509_ca': 0, 'crl': 0, 'x509': 0})
   1062         ctx.load_cert_chain(CERTFILE)
   1063         self.assertEqual(ctx.cert_store_stats(),
   1064             {'x509_ca': 0, 'crl': 0, 'x509': 0})
   1065         ctx.load_verify_locations(CERTFILE)
   1066         self.assertEqual(ctx.cert_store_stats(),
   1067             {'x509_ca': 0, 'crl': 0, 'x509': 1})
   1068         ctx.load_verify_locations(CAFILE_CACERT)
   1069         self.assertEqual(ctx.cert_store_stats(),
   1070             {'x509_ca': 1, 'crl': 0, 'x509': 2})
   1071 
   1072     def test_get_ca_certs(self):
   1073         ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
   1074         self.assertEqual(ctx.get_ca_certs(), [])
   1075         # CERTFILE is not flagged as X509v3 Basic Constraints: CA:TRUE
   1076         ctx.load_verify_locations(CERTFILE)
   1077         self.assertEqual(ctx.get_ca_certs(), [])
   1078         # but CAFILE_CACERT is a CA cert
   1079         ctx.load_verify_locations(CAFILE_CACERT)
   1080         self.assertEqual(ctx.get_ca_certs(),
   1081             [{'issuer': ((('organizationName', 'Root CA'),),
   1082                          (('organizationalUnitName', 'http://www.cacert.org'),),
   1083                          (('commonName', 'CA Cert Signing Authority'),),
   1084                          (('emailAddress', 'support (at] cacert.org'),)),
   1085               'notAfter': asn1time('Mar 29 12:29:49 2033 GMT'),
   1086               'notBefore': asn1time('Mar 30 12:29:49 2003 GMT'),
   1087               'serialNumber': '00',
   1088               'crlDistributionPoints': ('https://www.cacert.org/revoke.crl',),
   1089               'subject': ((('organizationName', 'Root CA'),),
   1090                           (('organizationalUnitName', 'http://www.cacert.org'),),
   1091                           (('commonName', 'CA Cert Signing Authority'),),
   1092                           (('emailAddress', 'support (at] cacert.org'),)),
   1093               'version': 3}])
   1094 
   1095         with open(CAFILE_CACERT) as f:
   1096             pem = f.read()
   1097         der = ssl.PEM_cert_to_DER_cert(pem)
   1098         self.assertEqual(ctx.get_ca_certs(True), [der])
   1099 
   1100     def test_load_default_certs(self):
   1101         ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
   1102         ctx.load_default_certs()
   1103 
   1104         ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
   1105         ctx.load_default_certs(ssl.Purpose.SERVER_AUTH)
   1106         ctx.load_default_certs()
   1107 
   1108         ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
   1109         ctx.load_default_certs(ssl.Purpose.CLIENT_AUTH)
   1110 
   1111         ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
   1112         self.assertRaises(TypeError, ctx.load_default_certs, None)
   1113         self.assertRaises(TypeError, ctx.load_default_certs, 'SERVER_AUTH')
   1114 
   1115     @unittest.skipIf(sys.platform == "win32", "not-Windows specific")
   1116     @unittest.skipIf(IS_LIBRESSL, "LibreSSL doesn't support env vars")
   1117     def test_load_default_certs_env(self):
   1118         ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
   1119         with support.EnvironmentVarGuard() as env:
   1120             env["SSL_CERT_DIR"] = CAPATH
   1121             env["SSL_CERT_FILE"] = CERTFILE
   1122             ctx.load_default_certs()
   1123             self.assertEqual(ctx.cert_store_stats(), {"crl": 0, "x509": 1, "x509_ca": 0})
   1124 
   1125     @unittest.skipUnless(sys.platform == "win32", "Windows specific")
   1126     def test_load_default_certs_env_windows(self):
   1127         ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
   1128         ctx.load_default_certs()
   1129         stats = ctx.cert_store_stats()
   1130 
   1131         ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
   1132         with support.EnvironmentVarGuard() as env:
   1133             env["SSL_CERT_DIR"] = CAPATH
   1134             env["SSL_CERT_FILE"] = CERTFILE
   1135             ctx.load_default_certs()
   1136             stats["x509"] += 1
   1137             self.assertEqual(ctx.cert_store_stats(), stats)
   1138 
   1139     def test_create_default_context(self):
   1140         ctx = ssl.create_default_context()
   1141         self.assertEqual(ctx.protocol, ssl.PROTOCOL_SSLv23)
   1142         self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
   1143         self.assertTrue(ctx.check_hostname)
   1144         self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2)
   1145         self.assertEqual(
   1146             ctx.options & getattr(ssl, "OP_NO_COMPRESSION", 0),
   1147             getattr(ssl, "OP_NO_COMPRESSION", 0),
   1148         )
   1149 
   1150         with open(SIGNING_CA) as f:
   1151             cadata = f.read().decode("ascii")
   1152         ctx = ssl.create_default_context(cafile=SIGNING_CA, capath=CAPATH,
   1153                                          cadata=cadata)
   1154         self.assertEqual(ctx.protocol, ssl.PROTOCOL_SSLv23)
   1155         self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
   1156         self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2)
   1157         self.assertEqual(
   1158             ctx.options & getattr(ssl, "OP_NO_COMPRESSION", 0),
   1159             getattr(ssl, "OP_NO_COMPRESSION", 0),
   1160         )
   1161 
   1162         ctx = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
   1163         self.assertEqual(ctx.protocol, ssl.PROTOCOL_SSLv23)
   1164         self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
   1165         self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2)
   1166         self.assertEqual(
   1167             ctx.options & getattr(ssl, "OP_NO_COMPRESSION", 0),
   1168             getattr(ssl, "OP_NO_COMPRESSION", 0),
   1169         )
   1170         self.assertEqual(
   1171             ctx.options & getattr(ssl, "OP_SINGLE_DH_USE", 0),
   1172             getattr(ssl, "OP_SINGLE_DH_USE", 0),
   1173         )
   1174         self.assertEqual(
   1175             ctx.options & getattr(ssl, "OP_SINGLE_ECDH_USE", 0),
   1176             getattr(ssl, "OP_SINGLE_ECDH_USE", 0),
   1177         )
   1178 
   1179     def test__create_stdlib_context(self):
   1180         ctx = ssl._create_stdlib_context()
   1181         self.assertEqual(ctx.protocol, ssl.PROTOCOL_SSLv23)
   1182         self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
   1183         self.assertFalse(ctx.check_hostname)
   1184         self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2)
   1185 
   1186         ctx = ssl._create_stdlib_context(ssl.PROTOCOL_TLSv1)
   1187         self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLSv1)
   1188         self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
   1189         self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2)
   1190 
   1191         ctx = ssl._create_stdlib_context(ssl.PROTOCOL_TLSv1,
   1192                                          cert_reqs=ssl.CERT_REQUIRED,
   1193                                          check_hostname=True)
   1194         self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLSv1)
   1195         self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
   1196         self.assertTrue(ctx.check_hostname)
   1197         self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2)
   1198 
   1199         ctx = ssl._create_stdlib_context(purpose=ssl.Purpose.CLIENT_AUTH)
   1200         self.assertEqual(ctx.protocol, ssl.PROTOCOL_SSLv23)
   1201         self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
   1202         self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2)
   1203 
   1204     def test__https_verify_certificates(self):
   1205         # Unit test to check the contect factory mapping
   1206         # The factories themselves are tested above
   1207         # This test will fail by design if run under PYTHONHTTPSVERIFY=0
   1208         # (as will various test_httplib tests)
   1209 
   1210         # Uses a fresh SSL module to avoid affecting the real one
   1211         local_ssl = support.import_fresh_module("ssl")
   1212         # Certificate verification is enabled by default
   1213         self.assertIs(local_ssl._create_default_https_context,
   1214                       local_ssl.create_default_context)
   1215         # Turn default verification off
   1216         local_ssl._https_verify_certificates(enable=False)
   1217         self.assertIs(local_ssl._create_default_https_context,
   1218                       local_ssl._create_unverified_context)
   1219         # And back on
   1220         local_ssl._https_verify_certificates(enable=True)
   1221         self.assertIs(local_ssl._create_default_https_context,
   1222                       local_ssl.create_default_context)
   1223         # The default behaviour is to enable
   1224         local_ssl._https_verify_certificates(enable=False)
   1225         local_ssl._https_verify_certificates()
   1226         self.assertIs(local_ssl._create_default_https_context,
   1227                       local_ssl.create_default_context)
   1228 
   1229     def test__https_verify_envvar(self):
   1230         # Unit test to check the PYTHONHTTPSVERIFY handling
   1231         # Need to use a subprocess so it can still be run under -E
   1232         https_is_verified = """import ssl, sys; \
   1233             status = "Error: _create_default_https_context does not verify certs" \
   1234                        if ssl._create_default_https_context is \
   1235                           ssl._create_unverified_context \
   1236                      else None; \
   1237             sys.exit(status)"""
   1238         https_is_not_verified = """import ssl, sys; \
   1239             status = "Error: _create_default_https_context verifies certs" \
   1240                        if ssl._create_default_https_context is \
   1241                           ssl.create_default_context \
   1242                      else None; \
   1243             sys.exit(status)"""
   1244         extra_env = {}
   1245         # Omitting it leaves verification on
   1246         assert_python_ok("-c", https_is_verified, **extra_env)
   1247         # Setting it to zero turns verification off
   1248         extra_env[ssl._https_verify_envvar] = "0"
   1249         assert_python_ok("-c", https_is_not_verified, **extra_env)
   1250         # Any other value should also leave it on
   1251         for setting in ("", "1", "enabled", "foo"):
   1252             extra_env[ssl._https_verify_envvar] = setting
   1253             assert_python_ok("-c", https_is_verified, **extra_env)
   1254 
   1255     def test_check_hostname(self):
   1256         ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
   1257         self.assertFalse(ctx.check_hostname)
   1258 
   1259         # Requires CERT_REQUIRED or CERT_OPTIONAL
   1260         with self.assertRaises(ValueError):
   1261             ctx.check_hostname = True
   1262         ctx.verify_mode = ssl.CERT_REQUIRED
   1263         self.assertFalse(ctx.check_hostname)
   1264         ctx.check_hostname = True
   1265         self.assertTrue(ctx.check_hostname)
   1266 
   1267         ctx.verify_mode = ssl.CERT_OPTIONAL
   1268         ctx.check_hostname = True
   1269         self.assertTrue(ctx.check_hostname)
   1270 
   1271         # Cannot set CERT_NONE with check_hostname enabled
   1272         with self.assertRaises(ValueError):
   1273             ctx.verify_mode = ssl.CERT_NONE
   1274         ctx.check_hostname = False
   1275         self.assertFalse(ctx.check_hostname)
   1276 
   1277 
   1278 class SSLErrorTests(unittest.TestCase):
   1279 
   1280     def test_str(self):
   1281         # The str() of a SSLError doesn't include the errno
   1282         e = ssl.SSLError(1, "foo")
   1283         self.assertEqual(str(e), "foo")
   1284         self.assertEqual(e.errno, 1)
   1285         # Same for a subclass
   1286         e = ssl.SSLZeroReturnError(1, "foo")
   1287         self.assertEqual(str(e), "foo")
   1288         self.assertEqual(e.errno, 1)
   1289 
   1290     def test_lib_reason(self):
   1291         # Test the library and reason attributes
   1292         ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
   1293         with self.assertRaises(ssl.SSLError) as cm:
   1294             ctx.load_dh_params(CERTFILE)
   1295         self.assertEqual(cm.exception.library, 'PEM')
   1296         self.assertEqual(cm.exception.reason, 'NO_START_LINE')
   1297         s = str(cm.exception)
   1298         self.assertTrue(s.startswith("[PEM: NO_START_LINE] no start line"), s)
   1299 
   1300     def test_subclass(self):
   1301         # Check that the appropriate SSLError subclass is raised
   1302         # (this only tests one of them)
   1303         ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
   1304         with closing(socket.socket()) as s:
   1305             s.bind(("127.0.0.1", 0))
   1306             s.listen(5)
   1307             c = socket.socket()
   1308             c.connect(s.getsockname())
   1309             c.setblocking(False)
   1310             with closing(ctx.wrap_socket(c, False, do_handshake_on_connect=False)) as c:
   1311                 with self.assertRaises(ssl.SSLWantReadError) as cm:
   1312                     c.do_handshake()
   1313                 s = str(cm.exception)
   1314                 self.assertTrue(s.startswith("The operation did not complete (read)"), s)
   1315                 # For compatibility
   1316                 self.assertEqual(cm.exception.errno, ssl.SSL_ERROR_WANT_READ)
   1317 
   1318 
   1319 class NetworkedTests(unittest.TestCase):
   1320 
   1321     def test_connect(self):
   1322         with support.transient_internet(REMOTE_HOST):
   1323             s = ssl.wrap_socket(socket.socket(socket.AF_INET),
   1324                                 cert_reqs=ssl.CERT_NONE)
   1325             try:
   1326                 s.connect((REMOTE_HOST, 443))
   1327                 self.assertEqual({}, s.getpeercert())
   1328             finally:
   1329                 s.close()
   1330 
   1331             # this should fail because we have no verification certs
   1332             s = ssl.wrap_socket(socket.socket(socket.AF_INET),
   1333                                 cert_reqs=ssl.CERT_REQUIRED)
   1334             self.assertRaisesRegexp(ssl.SSLError, "certificate verify failed",
   1335                                    s.connect, (REMOTE_HOST, 443))
   1336             s.close()
   1337 
   1338             # this should succeed because we specify the root cert
   1339             s = ssl.wrap_socket(socket.socket(socket.AF_INET),
   1340                                 cert_reqs=ssl.CERT_REQUIRED,
   1341                                 ca_certs=REMOTE_ROOT_CERT)
   1342             try:
   1343                 s.connect((REMOTE_HOST, 443))
   1344                 self.assertTrue(s.getpeercert())
   1345             finally:
   1346                 s.close()
   1347 
   1348     def test_connect_ex(self):
   1349         # Issue #11326: check connect_ex() implementation
   1350         with support.transient_internet(REMOTE_HOST):
   1351             s = ssl.wrap_socket(socket.socket(socket.AF_INET),
   1352                                 cert_reqs=ssl.CERT_REQUIRED,
   1353                                 ca_certs=REMOTE_ROOT_CERT)
   1354             try:
   1355                 self.assertEqual(0, s.connect_ex((REMOTE_HOST, 443)))
   1356                 self.assertTrue(s.getpeercert())
   1357             finally:
   1358                 s.close()
   1359 
   1360     def test_non_blocking_connect_ex(self):
   1361         # Issue #11326: non-blocking connect_ex() should allow handshake
   1362         # to proceed after the socket gets ready.
   1363         with support.transient_internet(REMOTE_HOST):
   1364             s = ssl.wrap_socket(socket.socket(socket.AF_INET),
   1365                                 cert_reqs=ssl.CERT_REQUIRED,
   1366                                 ca_certs=REMOTE_ROOT_CERT,
   1367                                 do_handshake_on_connect=False)
   1368             try:
   1369                 s.setblocking(False)
   1370                 rc = s.connect_ex((REMOTE_HOST, 443))
   1371                 # EWOULDBLOCK under Windows, EINPROGRESS elsewhere
   1372                 self.assertIn(rc, (0, errno.EINPROGRESS, errno.EWOULDBLOCK))
   1373                 # Wait for connect to finish
   1374                 select.select([], [s], [], 5.0)
   1375                 # Non-blocking handshake
   1376                 while True:
   1377                     try:
   1378                         s.do_handshake()
   1379                         break
   1380                     except ssl.SSLWantReadError:
   1381                         select.select([s], [], [], 5.0)
   1382                     except ssl.SSLWantWriteError:
   1383                         select.select([], [s], [], 5.0)
   1384                 # SSL established
   1385                 self.assertTrue(s.getpeercert())
   1386             finally:
   1387                 s.close()
   1388 
   1389     def test_timeout_connect_ex(self):
   1390         # Issue #12065: on a timeout, connect_ex() should return the original
   1391         # errno (mimicking the behaviour of non-SSL sockets).
   1392         with support.transient_internet(REMOTE_HOST):
   1393             s = ssl.wrap_socket(socket.socket(socket.AF_INET),
   1394                                 cert_reqs=ssl.CERT_REQUIRED,
   1395                                 ca_certs=REMOTE_ROOT_CERT,
   1396                                 do_handshake_on_connect=False)
   1397             try:
   1398                 s.settimeout(0.0000001)
   1399                 rc = s.connect_ex((REMOTE_HOST, 443))
   1400                 if rc == 0:
   1401                     self.skipTest("REMOTE_HOST responded too quickly")
   1402                 self.assertIn(rc, (errno.EAGAIN, errno.EWOULDBLOCK))
   1403             finally:
   1404                 s.close()
   1405 
   1406     def test_connect_ex_error(self):
   1407         with support.transient_internet(REMOTE_HOST):
   1408             s = ssl.wrap_socket(socket.socket(socket.AF_INET),
   1409                                 cert_reqs=ssl.CERT_REQUIRED,
   1410                                 ca_certs=REMOTE_ROOT_CERT)
   1411             try:
   1412                 rc = s.connect_ex((REMOTE_HOST, 444))
   1413                 # Issue #19919: Windows machines or VMs hosted on Windows
   1414                 # machines sometimes return EWOULDBLOCK.
   1415                 errors = (
   1416                     errno.ECONNREFUSED, errno.EHOSTUNREACH, errno.ETIMEDOUT,
   1417                     errno.EWOULDBLOCK,
   1418                 )
   1419                 self.assertIn(rc, errors)
   1420             finally:
   1421                 s.close()
   1422 
   1423     def test_connect_with_context(self):
   1424         with support.transient_internet(REMOTE_HOST):
   1425             # Same as test_connect, but with a separately created context
   1426             ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
   1427             s = ctx.wrap_socket(socket.socket(socket.AF_INET))
   1428             s.connect((REMOTE_HOST, 443))
   1429             try:
   1430                 self.assertEqual({}, s.getpeercert())
   1431             finally:
   1432                 s.close()
   1433             # Same with a server hostname
   1434             s = ctx.wrap_socket(socket.socket(socket.AF_INET),
   1435                                 server_hostname=REMOTE_HOST)
   1436             s.connect((REMOTE_HOST, 443))
   1437             s.close()
   1438             # This should fail because we have no verification certs
   1439             ctx.verify_mode = ssl.CERT_REQUIRED
   1440             s = ctx.wrap_socket(socket.socket(socket.AF_INET))
   1441             self.assertRaisesRegexp(ssl.SSLError, "certificate verify failed",
   1442                                     s.connect, (REMOTE_HOST, 443))
   1443             s.close()
   1444             # This should succeed because we specify the root cert
   1445             ctx.load_verify_locations(REMOTE_ROOT_CERT)
   1446             s = ctx.wrap_socket(socket.socket(socket.AF_INET))
   1447             s.connect((REMOTE_HOST, 443))
   1448             try:
   1449                 cert = s.getpeercert()
   1450                 self.assertTrue(cert)
   1451             finally:
   1452                 s.close()
   1453 
   1454     def test_connect_capath(self):
   1455         # Verify server certificates using the `capath` argument
   1456         # NOTE: the subject hashing algorithm has been changed between
   1457         # OpenSSL 0.9.8n and 1.0.0, as a result the capath directory must
   1458         # contain both versions of each certificate (same content, different
   1459         # filename) for this test to be portable across OpenSSL releases.
   1460         with support.transient_internet(REMOTE_HOST):
   1461             ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
   1462             ctx.verify_mode = ssl.CERT_REQUIRED
   1463             ctx.load_verify_locations(capath=CAPATH)
   1464             s = ctx.wrap_socket(socket.socket(socket.AF_INET))
   1465             s.connect((REMOTE_HOST, 443))
   1466             try:
   1467                 cert = s.getpeercert()
   1468                 self.assertTrue(cert)
   1469             finally:
   1470                 s.close()
   1471             # Same with a bytes `capath` argument
   1472             ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
   1473             ctx.verify_mode = ssl.CERT_REQUIRED
   1474             ctx.load_verify_locations(capath=BYTES_CAPATH)
   1475             s = ctx.wrap_socket(socket.socket(socket.AF_INET))
   1476             s.connect((REMOTE_HOST, 443))
   1477             try:
   1478                 cert = s.getpeercert()
   1479                 self.assertTrue(cert)
   1480             finally:
   1481                 s.close()
   1482 
   1483     def test_connect_cadata(self):
   1484         with open(REMOTE_ROOT_CERT) as f:
   1485             pem = f.read().decode('ascii')
   1486         der = ssl.PEM_cert_to_DER_cert(pem)
   1487         with support.transient_internet(REMOTE_HOST):
   1488             ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
   1489             ctx.verify_mode = ssl.CERT_REQUIRED
   1490             ctx.load_verify_locations(cadata=pem)
   1491             with closing(ctx.wrap_socket(socket.socket(socket.AF_INET))) as s:
   1492                 s.connect((REMOTE_HOST, 443))
   1493                 cert = s.getpeercert()
   1494                 self.assertTrue(cert)
   1495 
   1496             # same with DER
   1497             ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
   1498             ctx.verify_mode = ssl.CERT_REQUIRED
   1499             ctx.load_verify_locations(cadata=der)
   1500             with closing(ctx.wrap_socket(socket.socket(socket.AF_INET))) as s:
   1501                 s.connect((REMOTE_HOST, 443))
   1502                 cert = s.getpeercert()
   1503                 self.assertTrue(cert)
   1504 
   1505     @unittest.skipIf(os.name == "nt", "Can't use a socket as a file under Windows")
   1506     def test_makefile_close(self):
   1507         # Issue #5238: creating a file-like object with makefile() shouldn't
   1508         # delay closing the underlying "real socket" (here tested with its
   1509         # file descriptor, hence skipping the test under Windows).
   1510         with support.transient_internet(REMOTE_HOST):
   1511             ss = ssl.wrap_socket(socket.socket(socket.AF_INET))
   1512             ss.connect((REMOTE_HOST, 443))
   1513             fd = ss.fileno()
   1514             f = ss.makefile()
   1515             f.close()
   1516             # The fd is still open
   1517             os.read(fd, 0)
   1518             # Closing the SSL socket should close the fd too
   1519             ss.close()
   1520             gc.collect()
   1521             with self.assertRaises(OSError) as e:
   1522                 os.read(fd, 0)
   1523             self.assertEqual(e.exception.errno, errno.EBADF)
   1524 
   1525     def test_non_blocking_handshake(self):
   1526         with support.transient_internet(REMOTE_HOST):
   1527             s = socket.socket(socket.AF_INET)
   1528             s.connect((REMOTE_HOST, 443))
   1529             s.setblocking(False)
   1530             s = ssl.wrap_socket(s,
   1531                                 cert_reqs=ssl.CERT_NONE,
   1532                                 do_handshake_on_connect=False)
   1533             count = 0
   1534             while True:
   1535                 try:
   1536                     count += 1
   1537                     s.do_handshake()
   1538                     break
   1539                 except ssl.SSLWantReadError:
   1540                     select.select([s], [], [])
   1541                 except ssl.SSLWantWriteError:
   1542                     select.select([], [s], [])
   1543             s.close()
   1544             if support.verbose:
   1545                 sys.stdout.write("\nNeeded %d calls to do_handshake() to establish session.\n" % count)
   1546 
   1547     def test_get_server_certificate(self):
   1548         def _test_get_server_certificate(host, port, cert=None):
   1549             with support.transient_internet(host):
   1550                 pem = ssl.get_server_certificate((host, port))
   1551                 if not pem:
   1552                     self.fail("No server certificate on %s:%s!" % (host, port))
   1553 
   1554                 try:
   1555                     pem = ssl.get_server_certificate((host, port),
   1556                                                      ca_certs=CERTFILE)
   1557                 except ssl.SSLError as x:
   1558                     #should fail
   1559                     if support.verbose:
   1560                         sys.stdout.write("%s\n" % x)
   1561                 else:
   1562                     self.fail("Got server certificate %s for %s:%s!" % (pem, host, port))
   1563                 pem = ssl.get_server_certificate((host, port),
   1564                                                  ca_certs=cert)
   1565                 if not pem:
   1566                     self.fail("No server certificate on %s:%s!" % (host, port))
   1567                 if support.verbose:
   1568                     sys.stdout.write("\nVerified certificate for %s:%s is\n%s\n" % (host, port ,pem))
   1569 
   1570         _test_get_server_certificate(REMOTE_HOST, 443, REMOTE_ROOT_CERT)
   1571         if support.IPV6_ENABLED:
   1572             _test_get_server_certificate('ipv6.google.com', 443)
   1573 
   1574     def test_ciphers(self):
   1575         remote = (REMOTE_HOST, 443)
   1576         with support.transient_internet(remote[0]):
   1577             with closing(ssl.wrap_socket(socket.socket(socket.AF_INET),
   1578                                          cert_reqs=ssl.CERT_NONE, ciphers="ALL")) as s:
   1579                 s.connect(remote)
   1580             with closing(ssl.wrap_socket(socket.socket(socket.AF_INET),
   1581                                          cert_reqs=ssl.CERT_NONE, ciphers="DEFAULT")) as s:
   1582                 s.connect(remote)
   1583             # Error checking can happen at instantiation or when connecting
   1584             with self.assertRaisesRegexp(ssl.SSLError, "No cipher can be selected"):
   1585                 with closing(socket.socket(socket.AF_INET)) as sock:
   1586                     s = ssl.wrap_socket(sock,
   1587                                         cert_reqs=ssl.CERT_NONE, ciphers="^$:,;?*'dorothyx")
   1588                     s.connect(remote)
   1589 
   1590     def test_algorithms(self):
   1591         # Issue #8484: all algorithms should be available when verifying a
   1592         # certificate.
   1593         # SHA256 was added in OpenSSL 0.9.8
   1594         if ssl.OPENSSL_VERSION_INFO < (0, 9, 8, 0, 15):
   1595             self.skipTest("SHA256 not available on %r" % ssl.OPENSSL_VERSION)
   1596         # sha256.tbs-internet.com needs SNI to use the correct certificate
   1597         if not ssl.HAS_SNI:
   1598             self.skipTest("SNI needed for this test")
   1599         # https://sha2.hboeck.de/ was used until 2011-01-08 (no route to host)
   1600         remote = ("sha256.tbs-internet.com", 443)
   1601         sha256_cert = os.path.join(os.path.dirname(__file__), "sha256.pem")
   1602         with support.transient_internet("sha256.tbs-internet.com"):
   1603             ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
   1604             ctx.verify_mode = ssl.CERT_REQUIRED
   1605             ctx.load_verify_locations(sha256_cert)
   1606             s = ctx.wrap_socket(socket.socket(socket.AF_INET),
   1607                                 server_hostname="sha256.tbs-internet.com")
   1608             try:
   1609                 s.connect(remote)
   1610                 if support.verbose:
   1611                     sys.stdout.write("\nCipher with %r is %r\n" %
   1612                                      (remote, s.cipher()))
   1613                     sys.stdout.write("Certificate is:\n%s\n" %
   1614                                      pprint.pformat(s.getpeercert()))
   1615             finally:
   1616                 s.close()
   1617 
   1618     def test_get_ca_certs_capath(self):
   1619         # capath certs are loaded on request
   1620         with support.transient_internet(REMOTE_HOST):
   1621             ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
   1622             ctx.verify_mode = ssl.CERT_REQUIRED
   1623             ctx.load_verify_locations(capath=CAPATH)
   1624             self.assertEqual(ctx.get_ca_certs(), [])
   1625             s = ctx.wrap_socket(socket.socket(socket.AF_INET))
   1626             s.connect((REMOTE_HOST, 443))
   1627             try:
   1628                 cert = s.getpeercert()
   1629                 self.assertTrue(cert)
   1630             finally:
   1631                 s.close()
   1632             self.assertEqual(len(ctx.get_ca_certs()), 1)
   1633 
   1634     @needs_sni
   1635     def test_context_setget(self):
   1636         # Check that the context of a connected socket can be replaced.
   1637         with support.transient_internet(REMOTE_HOST):
   1638             ctx1 = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
   1639             ctx2 = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
   1640             s = socket.socket(socket.AF_INET)
   1641             with closing(ctx1.wrap_socket(s)) as ss:
   1642                 ss.connect((REMOTE_HOST, 443))
   1643                 self.assertIs(ss.context, ctx1)
   1644                 self.assertIs(ss._sslobj.context, ctx1)
   1645                 ss.context = ctx2
   1646                 self.assertIs(ss.context, ctx2)
   1647                 self.assertIs(ss._sslobj.context, ctx2)
   1648 
   1649 try:
   1650     import threading
   1651 except ImportError:
   1652     _have_threads = False
   1653 else:
   1654     _have_threads = True
   1655 
   1656     from test.ssl_servers import make_https_server
   1657 
   1658     class ThreadedEchoServer(threading.Thread):
   1659 
   1660         class ConnectionHandler(threading.Thread):
   1661 
   1662             """A mildly complicated class, because we want it to work both
   1663             with and without the SSL wrapper around the socket connection, so
   1664             that we can test the STARTTLS functionality."""
   1665 
   1666             def __init__(self, server, connsock, addr):
   1667                 self.server = server
   1668                 self.running = False
   1669                 self.sock = connsock
   1670                 self.addr = addr
   1671                 self.sock.setblocking(1)
   1672                 self.sslconn = None
   1673                 threading.Thread.__init__(self)
   1674                 self.daemon = True
   1675 
   1676             def wrap_conn(self):
   1677                 try:
   1678                     self.sslconn = self.server.context.wrap_socket(
   1679                         self.sock, server_side=True)
   1680                     self.server.selected_npn_protocols.append(self.sslconn.selected_npn_protocol())
   1681                     self.server.selected_alpn_protocols.append(self.sslconn.selected_alpn_protocol())
   1682                 except socket.error as e:
   1683                     # We treat ConnectionResetError as though it were an
   1684                     # SSLError - OpenSSL on Ubuntu abruptly closes the
   1685                     # connection when asked to use an unsupported protocol.
   1686                     #
   1687                     # XXX Various errors can have happened here, for example
   1688                     # a mismatching protocol version, an invalid certificate,
   1689                     # or a low-level bug. This should be made more discriminating.
   1690                     if not isinstance(e, ssl.SSLError) and e.errno != errno.ECONNRESET:
   1691                         raise
   1692                     self.server.conn_errors.append(e)
   1693                     if self.server.chatty:
   1694                         handle_error("\n server:  bad connection attempt from " + repr(self.addr) + ":\n")
   1695                     self.running = False
   1696                     self.server.stop()
   1697                     self.close()
   1698                     return False
   1699                 else:
   1700                     if self.server.context.verify_mode == ssl.CERT_REQUIRED:
   1701                         cert = self.sslconn.getpeercert()
   1702                         if support.verbose and self.server.chatty:
   1703                             sys.stdout.write(" client cert is " + pprint.pformat(cert) + "\n")
   1704                         cert_binary = self.sslconn.getpeercert(True)
   1705                         if support.verbose and self.server.chatty:
   1706                             sys.stdout.write(" cert binary is " + str(len(cert_binary)) + " bytes\n")
   1707                     cipher = self.sslconn.cipher()
   1708                     if support.verbose and self.server.chatty:
   1709                         sys.stdout.write(" server: connection cipher is now " + str(cipher) + "\n")
   1710                         sys.stdout.write(" server: selected protocol is now "
   1711                                 + str(self.sslconn.selected_npn_protocol()) + "\n")
   1712                     return True
   1713 
   1714             def read(self):
   1715                 if self.sslconn:
   1716                     return self.sslconn.read()
   1717                 else:
   1718                     return self.sock.recv(1024)
   1719 
   1720             def write(self, bytes):
   1721                 if self.sslconn:
   1722                     return self.sslconn.write(bytes)
   1723                 else:
   1724                     return self.sock.send(bytes)
   1725 
   1726             def close(self):
   1727                 if self.sslconn:
   1728                     self.sslconn.close()
   1729                 else:
   1730                     self.sock.close()
   1731 
   1732             def run(self):
   1733                 self.running = True
   1734                 if not self.server.starttls_server:
   1735                     if not self.wrap_conn():
   1736                         return
   1737                 while self.running:
   1738                     try:
   1739                         msg = self.read()
   1740                         stripped = msg.strip()
   1741                         if not stripped:
   1742                             # eof, so quit this handler
   1743                             self.running = False
   1744                             self.close()
   1745                         elif stripped == b'over':
   1746                             if support.verbose and self.server.connectionchatty:
   1747                                 sys.stdout.write(" server: client closed connection\n")
   1748                             self.close()
   1749                             return
   1750                         elif (self.server.starttls_server and
   1751                               stripped == b'STARTTLS'):
   1752                             if support.verbose and self.server.connectionchatty:
   1753                                 sys.stdout.write(" server: read STARTTLS from client, sending OK...\n")
   1754                             self.write(b"OK\n")
   1755                             if not self.wrap_conn():
   1756                                 return
   1757                         elif (self.server.starttls_server and self.sslconn
   1758                               and stripped == b'ENDTLS'):
   1759                             if support.verbose and self.server.connectionchatty:
   1760                                 sys.stdout.write(" server: read ENDTLS from client, sending OK...\n")
   1761                             self.write(b"OK\n")
   1762                             self.sock = self.sslconn.unwrap()
   1763                             self.sslconn = None
   1764                             if support.verbose and self.server.connectionchatty:
   1765                                 sys.stdout.write(" server: connection is now unencrypted...\n")
   1766                         elif stripped == b'CB tls-unique':
   1767                             if support.verbose and self.server.connectionchatty:
   1768                                 sys.stdout.write(" server: read CB tls-unique from client, sending our CB data...\n")
   1769                             data = self.sslconn.get_channel_binding("tls-unique")
   1770                             self.write(repr(data).encode("us-ascii") + b"\n")
   1771                         else:
   1772                             if (support.verbose and
   1773                                 self.server.connectionchatty):
   1774                                 ctype = (self.sslconn and "encrypted") or "unencrypted"
   1775                                 sys.stdout.write(" server: read %r (%s), sending back %r (%s)...\n"
   1776                                                  % (msg, ctype, msg.lower(), ctype))
   1777                             self.write(msg.lower())
   1778                     except ssl.SSLError:
   1779                         if self.server.chatty:
   1780                             handle_error("Test server failure:\n")
   1781                         self.close()
   1782                         self.running = False
   1783                         # normally, we'd just stop here, but for the test
   1784                         # harness, we want to stop the server
   1785                         self.server.stop()
   1786 
   1787         def __init__(self, certificate=None, ssl_version=None,
   1788                      certreqs=None, cacerts=None,
   1789                      chatty=True, connectionchatty=False, starttls_server=False,
   1790                      npn_protocols=None, alpn_protocols=None,
   1791                      ciphers=None, context=None):
   1792             if context:
   1793                 self.context = context
   1794             else:
   1795                 self.context = ssl.SSLContext(ssl_version
   1796                                               if ssl_version is not None
   1797                                               else ssl.PROTOCOL_TLSv1)
   1798                 self.context.verify_mode = (certreqs if certreqs is not None
   1799                                             else ssl.CERT_NONE)
   1800                 if cacerts:
   1801                     self.context.load_verify_locations(cacerts)
   1802                 if certificate:
   1803                     self.context.load_cert_chain(certificate)
   1804                 if npn_protocols:
   1805                     self.context.set_npn_protocols(npn_protocols)
   1806                 if alpn_protocols:
   1807                     self.context.set_alpn_protocols(alpn_protocols)
   1808                 if ciphers:
   1809                     self.context.set_ciphers(ciphers)
   1810             self.chatty = chatty
   1811             self.connectionchatty = connectionchatty
   1812             self.starttls_server = starttls_server
   1813             self.sock = socket.socket()
   1814             self.port = support.bind_port(self.sock)
   1815             self.flag = None
   1816             self.active = False
   1817             self.selected_npn_protocols = []
   1818             self.selected_alpn_protocols = []
   1819             self.conn_errors = []
   1820             threading.Thread.__init__(self)
   1821             self.daemon = True
   1822 
   1823         def __enter__(self):
   1824             self.start(threading.Event())
   1825             self.flag.wait()
   1826             return self
   1827 
   1828         def __exit__(self, *args):
   1829             self.stop()
   1830             self.join()
   1831 
   1832         def start(self, flag=None):
   1833             self.flag = flag
   1834             threading.Thread.start(self)
   1835 
   1836         def run(self):
   1837             self.sock.settimeout(0.05)
   1838             self.sock.listen(5)
   1839             self.active = True
   1840             if self.flag:
   1841                 # signal an event
   1842                 self.flag.set()
   1843             while self.active:
   1844                 try:
   1845                     newconn, connaddr = self.sock.accept()
   1846                     if support.verbose and self.chatty:
   1847                         sys.stdout.write(' server:  new connection from '
   1848                                          + repr(connaddr) + '\n')
   1849                     handler = self.ConnectionHandler(self, newconn, connaddr)
   1850                     handler.start()
   1851                     handler.join()
   1852                 except socket.timeout:
   1853                     pass
   1854                 except KeyboardInterrupt:
   1855                     self.stop()
   1856             self.sock.close()
   1857 
        def stop(self):
            """Ask run() to leave its accept loop by clearing ``self.active``."""
            self.active = False
   1860 
   1861     class AsyncoreEchoServer(threading.Thread):
   1862 
   1863         class EchoServer(asyncore.dispatcher):
   1864 
   1865             class ConnectionHandler(asyncore.dispatcher_with_send):
   1866 
   1867                 def __init__(self, conn, certfile):
   1868                     self.socket = ssl.wrap_socket(conn, server_side=True,
   1869                                                   certfile=certfile,
   1870                                                   do_handshake_on_connect=False)
   1871                     asyncore.dispatcher_with_send.__init__(self, self.socket)
   1872                     self._ssl_accepting = True
   1873                     self._do_ssl_handshake()
   1874 
   1875                 def readable(self):
   1876                     if isinstance(self.socket, ssl.SSLSocket):
   1877                         while self.socket.pending() > 0:
   1878                             self.handle_read_event()
   1879                     return True
   1880 
   1881                 def _do_ssl_handshake(self):
   1882                     try:
   1883                         self.socket.do_handshake()
   1884                     except (ssl.SSLWantReadError, ssl.SSLWantWriteError):
   1885                         return
   1886                     except ssl.SSLEOFError:
   1887                         return self.handle_close()
   1888                     except ssl.SSLError:
   1889                         raise
   1890                     except socket.error, err:
   1891                         if err.args[0] == errno.ECONNABORTED:
   1892                             return self.handle_close()
   1893                     else:
   1894                         self._ssl_accepting = False
   1895 
   1896                 def handle_read(self):
   1897                     if self._ssl_accepting:
   1898                         self._do_ssl_handshake()
   1899                     else:
   1900                         data = self.recv(1024)
   1901                         if support.verbose:
   1902                             sys.stdout.write(" server:  read %s from client\n" % repr(data))
   1903                         if not data:
   1904                             self.close()
   1905                         else:
   1906                             self.send(data.lower())
   1907 
   1908                 def handle_close(self):
   1909                     self.close()
   1910                     if support.verbose:
   1911                         sys.stdout.write(" server:  closed connection %s\n" % self.socket)
   1912 
   1913                 def handle_error(self):
   1914                     raise
   1915 
   1916             def __init__(self, certfile):
   1917                 self.certfile = certfile
   1918                 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
   1919                 self.port = support.bind_port(sock, '')
   1920                 asyncore.dispatcher.__init__(self, sock)
   1921                 self.listen(5)
   1922 
   1923             def handle_accept(self):
   1924                 sock_obj, addr = self.accept()
   1925                 if support.verbose:
   1926                     sys.stdout.write(" server:  new connection from %s:%s\n" %addr)
   1927                 self.ConnectionHandler(sock_obj, self.certfile)
   1928 
   1929             def handle_error(self):
   1930                 raise
   1931 
   1932         def __init__(self, certfile):
   1933             self.flag = None
   1934             self.active = False
   1935             self.server = self.EchoServer(certfile)
   1936             self.port = self.server.port
   1937             threading.Thread.__init__(self)
   1938             self.daemon = True
   1939 
   1940         def __str__(self):
   1941             return "<%s %s>" % (self.__class__.__name__, self.server)
   1942 
   1943         def __enter__(self):
   1944             self.start(threading.Event())
   1945             self.flag.wait()
   1946             return self
   1947 
   1948         def __exit__(self, *args):
   1949             if support.verbose:
   1950                 sys.stdout.write(" cleanup: stopping server.\n")
   1951             self.stop()
   1952             if support.verbose:
   1953                 sys.stdout.write(" cleanup: joining server thread.\n")
   1954             self.join()
   1955             if support.verbose:
   1956                 sys.stdout.write(" cleanup: successfully joined.\n")
   1957 
   1958         def start(self, flag=None):
   1959             self.flag = flag
   1960             threading.Thread.start(self)
   1961 
   1962         def run(self):
   1963             self.active = True
   1964             if self.flag:
   1965                 self.flag.set()
   1966             while self.active:
   1967                 try:
   1968                     asyncore.loop(1)
   1969                 except:
   1970                     pass
   1971 
   1972         def stop(self):
   1973             self.active = False
   1974             self.server.close()
   1975 
   1976     def server_params_test(client_context, server_context, indata=b"FOO\n",
   1977                            chatty=True, connectionchatty=False, sni_name=None):
   1978         """
   1979         Launch a server, connect a client to it and try various reads
   1980         and writes.
   1981         """
   1982         stats = {}
   1983         server = ThreadedEchoServer(context=server_context,
   1984                                     chatty=chatty,
   1985                                     connectionchatty=False)
   1986         with server:
   1987             with closing(client_context.wrap_socket(socket.socket(),
   1988                     server_hostname=sni_name)) as s:
   1989                 s.connect((HOST, server.port))
   1990                 for arg in [indata, bytearray(indata), memoryview(indata)]:
   1991                     if connectionchatty:
   1992                         if support.verbose:
   1993                             sys.stdout.write(
   1994                                 " client:  sending %r...\n" % indata)
   1995                     s.write(arg)
   1996                     outdata = s.read()
   1997                     if connectionchatty:
   1998                         if support.verbose:
   1999                             sys.stdout.write(" client:  read %r\n" % outdata)
   2000                     if outdata != indata.lower():
   2001                         raise AssertionError(
   2002                             "bad data <<%r>> (%d) received; expected <<%r>> (%d)\n"
   2003                             % (outdata[:20], len(outdata),
   2004                                indata[:20].lower(), len(indata)))
   2005                 s.write(b"over\n")
   2006                 if connectionchatty:
   2007                     if support.verbose:
   2008                         sys.stdout.write(" client:  closing connection.\n")
   2009                 stats.update({
   2010                     'compression': s.compression(),
   2011                     'cipher': s.cipher(),
   2012                     'peercert': s.getpeercert(),
   2013                     'client_alpn_protocol': s.selected_alpn_protocol(),
   2014                     'client_npn_protocol': s.selected_npn_protocol(),
   2015                     'version': s.version(),
   2016                 })
   2017                 s.close()
   2018             stats['server_alpn_protocols'] = server.selected_alpn_protocols
   2019             stats['server_npn_protocols'] = server.selected_npn_protocols
   2020         return stats
   2021 
   2022     def try_protocol_combo(server_protocol, client_protocol, expect_success,
   2023                            certsreqs=None, server_options=0, client_options=0):
   2024         """
   2025         Try to SSL-connect using *client_protocol* to *server_protocol*.
   2026         If *expect_success* is true, assert that the connection succeeds,
   2027         if it's false, assert that the connection fails.
   2028         Also, if *expect_success* is a string, assert that it is the protocol
   2029         version actually used by the connection.
   2030         """
   2031         if certsreqs is None:
   2032             certsreqs = ssl.CERT_NONE
   2033         certtype = {
   2034             ssl.CERT_NONE: "CERT_NONE",
   2035             ssl.CERT_OPTIONAL: "CERT_OPTIONAL",
   2036             ssl.CERT_REQUIRED: "CERT_REQUIRED",
   2037         }[certsreqs]
   2038         if support.verbose:
   2039             formatstr = (expect_success and " %s->%s %s\n") or " {%s->%s} %s\n"
   2040             sys.stdout.write(formatstr %
   2041                              (ssl.get_protocol_name(client_protocol),
   2042                               ssl.get_protocol_name(server_protocol),
   2043                               certtype))
   2044         client_context = ssl.SSLContext(client_protocol)
   2045         client_context.options |= client_options
   2046         server_context = ssl.SSLContext(server_protocol)
   2047         server_context.options |= server_options
   2048 
   2049         # NOTE: we must enable "ALL" ciphers on the client, otherwise an
   2050         # SSLv23 client will send an SSLv3 hello (rather than SSLv2)
   2051         # starting from OpenSSL 1.0.0 (see issue #8322).
   2052         if client_context.protocol == ssl.PROTOCOL_SSLv23:
   2053             client_context.set_ciphers("ALL")
   2054 
   2055         for ctx in (client_context, server_context):
   2056             ctx.verify_mode = certsreqs
   2057             ctx.load_cert_chain(CERTFILE)
   2058             ctx.load_verify_locations(CERTFILE)
   2059         try:
   2060             stats = server_params_test(client_context, server_context,
   2061                                        chatty=False, connectionchatty=False)
   2062         # Protocol mismatch can result in either an SSLError, or a
   2063         # "Connection reset by peer" error.
   2064         except ssl.SSLError:
   2065             if expect_success:
   2066                 raise
   2067         except socket.error as e:
   2068             if expect_success or e.errno != errno.ECONNRESET:
   2069                 raise
   2070         else:
   2071             if not expect_success:
   2072                 raise AssertionError(
   2073                     "Client protocol %s succeeded with server protocol %s!"
   2074                     % (ssl.get_protocol_name(client_protocol),
   2075                        ssl.get_protocol_name(server_protocol)))
   2076             elif (expect_success is not True
   2077                   and expect_success != stats['version']):
   2078                 raise AssertionError("version mismatch: expected %r, got %r"
   2079                                      % (expect_success, stats['version']))
   2080 
   2081 
   2082     class ThreadedTests(unittest.TestCase):
   2083 
   2084         @skip_if_broken_ubuntu_ssl
   2085         def test_echo(self):
   2086             """Basic test of an SSL client connecting to a server"""
   2087             if support.verbose:
   2088                 sys.stdout.write("\n")
   2089             for protocol in PROTOCOLS:
   2090                 context = ssl.SSLContext(protocol)
   2091                 context.load_cert_chain(CERTFILE)
   2092                 server_params_test(context, context,
   2093                                    chatty=True, connectionchatty=True)
   2094 
   2095         def test_getpeercert(self):
   2096             if support.verbose:
   2097                 sys.stdout.write("\n")
   2098             context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
   2099             context.verify_mode = ssl.CERT_REQUIRED
   2100             context.load_verify_locations(CERTFILE)
   2101             context.load_cert_chain(CERTFILE)
   2102             server = ThreadedEchoServer(context=context, chatty=False)
   2103             with server:
   2104                 s = context.wrap_socket(socket.socket(),
   2105                                         do_handshake_on_connect=False)
   2106                 s.connect((HOST, server.port))
   2107                 # getpeercert() raise ValueError while the handshake isn't
   2108                 # done.
   2109                 with self.assertRaises(ValueError):
   2110                     s.getpeercert()
   2111                 s.do_handshake()
   2112                 cert = s.getpeercert()
   2113                 self.assertTrue(cert, "Can't get peer certificate.")
   2114                 cipher = s.cipher()
   2115                 if support.verbose:
   2116                     sys.stdout.write(pprint.pformat(cert) + '\n')
   2117                     sys.stdout.write("Connection cipher is " + str(cipher) + '.\n')
   2118                 if 'subject' not in cert:
   2119                     self.fail("No subject field in certificate: %s." %
   2120                               pprint.pformat(cert))
   2121                 if ((('organizationName', 'Python Software Foundation'),)
   2122                     not in cert['subject']):
   2123                     self.fail(
   2124                         "Missing or invalid 'organizationName' field in certificate subject; "
   2125                         "should be 'Python Software Foundation'.")
   2126                 self.assertIn('notBefore', cert)
   2127                 self.assertIn('notAfter', cert)
   2128                 before = ssl.cert_time_to_seconds(cert['notBefore'])
   2129                 after = ssl.cert_time_to_seconds(cert['notAfter'])
   2130                 self.assertLess(before, after)
   2131                 s.close()
   2132 
   2133         @unittest.skipUnless(have_verify_flags(),
   2134                             "verify_flags need OpenSSL > 0.9.8")
   2135         def test_crl_check(self):
   2136             if support.verbose:
   2137                 sys.stdout.write("\n")
   2138 
   2139             server_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
   2140             server_context.load_cert_chain(SIGNED_CERTFILE)
   2141 
   2142             context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
   2143             context.verify_mode = ssl.CERT_REQUIRED
   2144             context.load_verify_locations(SIGNING_CA)
   2145             tf = getattr(ssl, "VERIFY_X509_TRUSTED_FIRST", 0)
   2146             self.assertEqual(context.verify_flags, ssl.VERIFY_DEFAULT | tf)
   2147 
   2148             # VERIFY_DEFAULT should pass
   2149             server = ThreadedEchoServer(context=server_context, chatty=True)
   2150             with server:
   2151                 with closing(context.wrap_socket(socket.socket())) as s:
   2152                     s.connect((HOST, server.port))
   2153                     cert = s.getpeercert()
   2154                     self.assertTrue(cert, "Can't get peer certificate.")
   2155 
   2156             # VERIFY_CRL_CHECK_LEAF without a loaded CRL file fails
   2157             context.verify_flags |= ssl.VERIFY_CRL_CHECK_LEAF
   2158 
   2159             server = ThreadedEchoServer(context=server_context, chatty=True)
   2160             with server:
   2161                 with closing(context.wrap_socket(socket.socket())) as s:
   2162                     with self.assertRaisesRegexp(ssl.SSLError,
   2163                                                 "certificate verify failed"):
   2164                         s.connect((HOST, server.port))
   2165 
   2166             # now load a CRL file. The CRL file is signed by the CA.
   2167             context.load_verify_locations(CRLFILE)
   2168 
   2169             server = ThreadedEchoServer(context=server_context, chatty=True)
   2170             with server:
   2171                 with closing(context.wrap_socket(socket.socket())) as s:
   2172                     s.connect((HOST, server.port))
   2173                     cert = s.getpeercert()
   2174                     self.assertTrue(cert, "Can't get peer certificate.")
   2175 
   2176         def test_check_hostname(self):
   2177             if support.verbose:
   2178                 sys.stdout.write("\n")
   2179 
   2180             server_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
   2181             server_context.load_cert_chain(SIGNED_CERTFILE)
   2182 
   2183             context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
   2184             context.verify_mode = ssl.CERT_REQUIRED
   2185             context.check_hostname = True
   2186             context.load_verify_locations(SIGNING_CA)
   2187 
   2188             # correct hostname should verify
   2189             server = ThreadedEchoServer(context=server_context, chatty=True)
   2190             with server:
   2191                 with closing(context.wrap_socket(socket.socket(),
   2192                                                  server_hostname="localhost")) as s:
   2193                     s.connect((HOST, server.port))
   2194                     cert = s.getpeercert()
   2195                     self.assertTrue(cert, "Can't get peer certificate.")
   2196 
   2197             # incorrect hostname should raise an exception
   2198             server = ThreadedEchoServer(context=server_context, chatty=True)
   2199             with server:
   2200                 with closing(context.wrap_socket(socket.socket(),
   2201                                                  server_hostname="invalid")) as s:
   2202                     with self.assertRaisesRegexp(ssl.CertificateError,
   2203                                                 "hostname 'invalid' doesn't match u?'localhost'"):
   2204                         s.connect((HOST, server.port))
   2205 
   2206             # missing server_hostname arg should cause an exception, too
   2207             server = ThreadedEchoServer(context=server_context, chatty=True)
   2208             with server:
   2209                 with closing(socket.socket()) as s:
   2210                     with self.assertRaisesRegexp(ValueError,
   2211                                                 "check_hostname requires server_hostname"):
   2212                         context.wrap_socket(s)
   2213 
   2214         def test_wrong_cert(self):
   2215             """Connecting when the server rejects the client's certificate
   2216 
   2217             Launch a server with CERT_REQUIRED, and check that trying to
   2218             connect to it with a wrong client certificate fails.
   2219             """
   2220             certfile = os.path.join(os.path.dirname(__file__) or os.curdir,
   2221                                        "wrongcert.pem")
   2222             server = ThreadedEchoServer(CERTFILE,
   2223                                         certreqs=ssl.CERT_REQUIRED,
   2224                                         cacerts=CERTFILE, chatty=False,
   2225                                         connectionchatty=False)
   2226             with server, \
   2227                     closing(socket.socket()) as sock, \
   2228                     closing(ssl.wrap_socket(sock,
   2229                                         certfile=certfile,
   2230                                         ssl_version=ssl.PROTOCOL_TLSv1)) as s:
   2231                 try:
   2232                     # Expect either an SSL error about the server rejecting
   2233                     # the connection, or a low-level connection reset (which
   2234                     # sometimes happens on Windows)
   2235                     s.connect((HOST, server.port))
   2236                 except ssl.SSLError as e:
   2237                     if support.verbose:
   2238                         sys.stdout.write("\nSSLError is %r\n" % e)
   2239                 except socket.error as e:
   2240                     if e.errno != errno.ECONNRESET:
   2241                         raise
   2242                     if support.verbose:
   2243                         sys.stdout.write("\nsocket.error is %r\n" % e)
   2244                 else:
   2245                     self.fail("Use of invalid cert should have failed!")
   2246 
   2247         def test_rude_shutdown(self):
   2248             """A brutal shutdown of an SSL server should raise an OSError
   2249             in the client when attempting handshake.
   2250             """
   2251             listener_ready = threading.Event()
   2252             listener_gone = threading.Event()
   2253 
   2254             s = socket.socket()
   2255             port = support.bind_port(s, HOST)
   2256 
   2257             # `listener` runs in a thread.  It sits in an accept() until
   2258             # the main thread connects.  Then it rudely closes the socket,
   2259             # and sets Event `listener_gone` to let the main thread know
   2260             # the socket is gone.
   2261             def listener():
   2262                 s.listen(5)
   2263                 listener_ready.set()
   2264                 newsock, addr = s.accept()
   2265                 newsock.close()
   2266                 s.close()
   2267                 listener_gone.set()
   2268 
   2269             def connector():
   2270                 listener_ready.wait()
   2271                 with closing(socket.socket()) as c:
   2272                     c.connect((HOST, port))
   2273                     listener_gone.wait()
   2274                     try:
   2275                         ssl_sock = ssl.wrap_socket(c)
   2276                     except socket.error:
   2277                         pass
   2278                     else:
   2279                         self.fail('connecting to closed SSL socket should have failed')
   2280 
   2281             t = threading.Thread(target=listener)
   2282             t.start()
   2283             try:
   2284                 connector()
   2285             finally:
   2286                 t.join()
   2287 
   2288         @skip_if_broken_ubuntu_ssl
   2289         @unittest.skipUnless(hasattr(ssl, 'PROTOCOL_SSLv2'),
   2290                              "OpenSSL is compiled without SSLv2 support")
   2291         def test_protocol_sslv2(self):
   2292             """Connecting to an SSLv2 server with various client options"""
   2293             if support.verbose:
   2294                 sys.stdout.write("\n")
   2295             try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True)
   2296             try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True, ssl.CERT_OPTIONAL)
   2297             try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True, ssl.CERT_REQUIRED)
   2298             try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv23, False)
   2299             try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv3, False)
   2300             try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_TLSv1, False)
   2301             # SSLv23 client with specific SSL options
   2302             if no_sslv2_implies_sslv3_hello():
   2303                 # No SSLv2 => client will use an SSLv3 hello on recent OpenSSLs
   2304                 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv23, False,
   2305                                    client_options=ssl.OP_NO_SSLv2)
   2306             try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv23, False,
   2307                                client_options=ssl.OP_NO_SSLv3)
   2308             try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv23, False,
   2309                                client_options=ssl.OP_NO_TLSv1)
   2310 
   2311         @skip_if_broken_ubuntu_ssl
   2312         def test_protocol_sslv23(self):
   2313             """Connecting to an SSLv23 server with various client options"""
   2314             if support.verbose:
   2315                 sys.stdout.write("\n")
   2316             if hasattr(ssl, 'PROTOCOL_SSLv2'):
   2317                 try:
   2318                     try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv2, True)
   2319                 except socket.error as x:
   2320                     # this fails on some older versions of OpenSSL (0.9.7l, for instance)
   2321                     if support.verbose:
   2322                         sys.stdout.write(
   2323                             " SSL2 client to SSL23 server test unexpectedly failed:\n %s\n"
   2324                             % str(x))
   2325             if hasattr(ssl, 'PROTOCOL_SSLv3'):
   2326                 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv3, False)
   2327             try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv23, True)
   2328             try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_TLSv1, 'TLSv1')
   2329 
   2330             if hasattr(ssl, 'PROTOCOL_SSLv3'):
   2331                 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv3, False, ssl.CERT_OPTIONAL)
   2332             try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv23, True, ssl.CERT_OPTIONAL)
   2333             try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_TLSv1, 'TLSv1', ssl.CERT_OPTIONAL)
   2334 
   2335             if hasattr(ssl, 'PROTOCOL_SSLv3'):
   2336                 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv3, False, ssl.CERT_REQUIRED)
   2337             try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv23, True, ssl.CERT_REQUIRED)
   2338             try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_TLSv1, 'TLSv1', ssl.CERT_REQUIRED)
   2339 
   2340             # Server with specific SSL options
   2341             if hasattr(ssl, 'PROTOCOL_SSLv3'):
   2342                 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv3, False,
   2343                                server_options=ssl.OP_NO_SSLv3)
   2344             # Will choose TLSv1
   2345             try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv23, True,
   2346                                server_options=ssl.OP_NO_SSLv2 | ssl.OP_NO_SSLv3)
   2347             try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_TLSv1, False,
   2348                                server_options=ssl.OP_NO_TLSv1)
   2349 
   2350 
   2351         @skip_if_broken_ubuntu_ssl
   2352         @unittest.skipUnless(hasattr(ssl, 'PROTOCOL_SSLv3'),
   2353                              "OpenSSL is compiled without SSLv3 support")
   2354         def test_protocol_sslv3(self):
   2355             """Connecting to an SSLv3 server with various client options"""
   2356             if support.verbose:
   2357                 sys.stdout.write("\n")
   2358             try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv3, 'SSLv3')
   2359             try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv3, 'SSLv3', ssl.CERT_OPTIONAL)
   2360             try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv3, 'SSLv3', ssl.CERT_REQUIRED)
   2361             if hasattr(ssl, 'PROTOCOL_SSLv2'):
   2362                 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv2, False)
   2363             try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv23, False,
   2364                                client_options=ssl.OP_NO_SSLv3)
   2365             try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_TLSv1, False)
   2366             if no_sslv2_implies_sslv3_hello():
   2367                 # No SSLv2 => client will use an SSLv3 hello on recent OpenSSLs
   2368                 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv23,
   2369                                    False, client_options=ssl.OP_NO_SSLv2)
   2370 
   2371         @skip_if_broken_ubuntu_ssl
   2372         def test_protocol_tlsv1(self):
   2373             """Connecting to a TLSv1 server with various client options"""
   2374             if support.verbose:
   2375                 sys.stdout.write("\n")
   2376             try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1, 'TLSv1')
   2377             try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1, 'TLSv1', ssl.CERT_OPTIONAL)
   2378             try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1, 'TLSv1', ssl.CERT_REQUIRED)
   2379             if hasattr(ssl, 'PROTOCOL_SSLv2'):
   2380                 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_SSLv2, False)
   2381             if hasattr(ssl, 'PROTOCOL_SSLv3'):
   2382                 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_SSLv3, False)
   2383             try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_SSLv23, False,
   2384                                client_options=ssl.OP_NO_TLSv1)
   2385 
   2386         @skip_if_broken_ubuntu_ssl
   2387         @unittest.skipUnless(hasattr(ssl, "PROTOCOL_TLSv1_1"),
   2388                              "TLS version 1.1 not supported.")
   2389         def test_protocol_tlsv1_1(self):
   2390             """Connecting to a TLSv1.1 server with various client options.
   2391                Testing against older TLS versions."""
   2392             if support.verbose:
   2393                 sys.stdout.write("\n")
   2394             try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_TLSv1_1, 'TLSv1.1')
   2395             if hasattr(ssl, 'PROTOCOL_SSLv2'):
   2396                 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_SSLv2, False)
   2397             if hasattr(ssl, 'PROTOCOL_SSLv3'):
   2398                 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_SSLv3, False)
   2399             try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_SSLv23, False,
   2400                                client_options=ssl.OP_NO_TLSv1_1)
   2401 
   2402             try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_TLSv1_1, 'TLSv1.1')
   2403             try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_TLSv1, False)
   2404             try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1_1, False)
   2405 
   2406 
   2407         @skip_if_broken_ubuntu_ssl
   2408         @unittest.skipUnless(hasattr(ssl, "PROTOCOL_TLSv1_2"),
   2409                              "TLS version 1.2 not supported.")
   2410         def test_protocol_tlsv1_2(self):
   2411             """Connecting to a TLSv1.2 server with various client options.
   2412                Testing against older TLS versions."""
   2413             if support.verbose:
   2414                 sys.stdout.write("\n")
   2415             try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_TLSv1_2, 'TLSv1.2',
   2416                                server_options=ssl.OP_NO_SSLv3|ssl.OP_NO_SSLv2,
   2417                                client_options=ssl.OP_NO_SSLv3|ssl.OP_NO_SSLv2,)
   2418             if hasattr(ssl, 'PROTOCOL_SSLv2'):
   2419                 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_SSLv2, False)
   2420             if hasattr(ssl, 'PROTOCOL_SSLv3'):
   2421                 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_SSLv3, False)
   2422             try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_SSLv23, False,
   2423                                client_options=ssl.OP_NO_TLSv1_2)
   2424 
   2425             try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_TLSv1_2, 'TLSv1.2')
   2426             try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_TLSv1, False)
   2427             try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1_2, False)
   2428             try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_TLSv1_1, False)
   2429             try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_TLSv1_2, False)
   2430 
   2431         def test_starttls(self):
   2432             """Switching from clear text to encrypted and back again."""
   2433             msgs = (b"msg 1", b"MSG 2", b"STARTTLS", b"MSG 3", b"msg 4", b"ENDTLS", b"msg 5", b"msg 6")
   2434 
   2435             server = ThreadedEchoServer(CERTFILE,
   2436                                         ssl_version=ssl.PROTOCOL_TLSv1,
   2437                                         starttls_server=True,
   2438                                         chatty=True,
   2439                                         connectionchatty=True)
   2440             wrapped = False
   2441             with server:
   2442                 s = socket.socket()
   2443                 s.setblocking(1)
   2444                 s.connect((HOST, server.port))
   2445                 if support.verbose:
   2446                     sys.stdout.write("\n")
   2447                 for indata in msgs:
   2448                     if support.verbose:
   2449                         sys.stdout.write(
   2450                             " client:  sending %r...\n" % indata)
   2451                     if wrapped:
   2452                         conn.write(indata)
   2453                         outdata = conn.read()
   2454                     else:
   2455                         s.send(indata)
   2456                         outdata = s.recv(1024)
   2457                     msg = outdata.strip().lower()
   2458                     if indata == b"STARTTLS" and msg.startswith(b"ok"):
   2459                         # STARTTLS ok, switch to secure mode
   2460                         if support.verbose:
   2461                             sys.stdout.write(
   2462                                 " client:  read %r from server, starting TLS...\n"
   2463                                 % msg)
   2464                         conn = ssl.wrap_socket(s, ssl_version=ssl.PROTOCOL_TLSv1)
   2465                         wrapped = True
   2466                     elif indata == b"ENDTLS" and msg.startswith(b"ok"):
   2467                         # ENDTLS ok, switch back to clear text
   2468                         if support.verbose:
   2469                             sys.stdout.write(
   2470                                 " client:  read %r from server, ending TLS...\n"
   2471                                 % msg)
   2472                         s = conn.unwrap()
   2473                         wrapped = False
   2474                     else:
   2475                         if support.verbose:
   2476                             sys.stdout.write(
   2477                                 " client:  read %r from server\n" % msg)
   2478                 if support.verbose:
   2479                     sys.stdout.write(" client:  closing connection.\n")
   2480                 if wrapped:
   2481                     conn.write(b"over\n")
   2482                 else:
   2483                     s.send(b"over\n")
   2484                 if wrapped:
   2485                     conn.close()
   2486                 else:
   2487                     s.close()
   2488 
   2489         def test_socketserver(self):
   2490             """Using a SocketServer to create and manage SSL connections."""
   2491             server = make_https_server(self, certfile=CERTFILE)
   2492             # try to connect
   2493             if support.verbose:
   2494                 sys.stdout.write('\n')
   2495             with open(CERTFILE, 'rb') as f:
   2496                 d1 = f.read()
   2497             d2 = ''
   2498             # now fetch the same data from the HTTPS server
   2499             url = 'https://localhost:%d/%s' % (
   2500                 server.port, os.path.split(CERTFILE)[1])
   2501             context = ssl.create_default_context(cafile=CERTFILE)
   2502             f = urllib2.urlopen(url, context=context)
   2503             try:
   2504                 dlen = f.info().getheader("content-length")
   2505                 if dlen and (int(dlen) > 0):
   2506                     d2 = f.read(int(dlen))
   2507                     if support.verbose:
   2508                         sys.stdout.write(
   2509                             " client: read %d bytes from remote server '%s'\n"
   2510                             % (len(d2), server))
   2511             finally:
   2512                 f.close()
   2513             self.assertEqual(d1, d2)
   2514 
   2515         def test_asyncore_server(self):
   2516             """Check the example asyncore integration."""
   2517             if support.verbose:
   2518                 sys.stdout.write("\n")
   2519 
   2520             indata = b"FOO\n"
   2521             server = AsyncoreEchoServer(CERTFILE)
   2522             with server:
   2523                 s = ssl.wrap_socket(socket.socket())
   2524                 s.connect(('127.0.0.1', server.port))
   2525                 if support.verbose:
   2526                     sys.stdout.write(
   2527                         " client:  sending %r...\n" % indata)
   2528                 s.write(indata)
   2529                 outdata = s.read()
   2530                 if support.verbose:
   2531                     sys.stdout.write(" client:  read %r\n" % outdata)
   2532                 if outdata != indata.lower():
   2533                     self.fail(
   2534                         "bad data <<%r>> (%d) received; expected <<%r>> (%d)\n"
   2535                         % (outdata[:20], len(outdata),
   2536                            indata[:20].lower(), len(indata)))
   2537                 s.write(b"over\n")
   2538                 if support.verbose:
   2539                     sys.stdout.write(" client:  closing connection.\n")
   2540                 s.close()
   2541                 if support.verbose:
   2542                     sys.stdout.write(" client:  connection closed.\n")
   2543 
   2544         def test_recv_send(self):
   2545             """Test recv(), send() and friends."""
   2546             if support.verbose:
   2547                 sys.stdout.write("\n")
   2548 
   2549             server = ThreadedEchoServer(CERTFILE,
   2550                                         certreqs=ssl.CERT_NONE,
   2551                                         ssl_version=ssl.PROTOCOL_TLSv1,
   2552                                         cacerts=CERTFILE,
   2553                                         chatty=True,
   2554                                         connectionchatty=False)
   2555             with server:
   2556                 s = ssl.wrap_socket(socket.socket(),
   2557                                     server_side=False,
   2558                                     certfile=CERTFILE,
   2559                                     ca_certs=CERTFILE,
   2560                                     cert_reqs=ssl.CERT_NONE,
   2561                                     ssl_version=ssl.PROTOCOL_TLSv1)
   2562                 s.connect((HOST, server.port))
   2563                 # helper methods for standardising recv* method signatures
   2564                 def _recv_into():
   2565                     b = bytearray(b"\0"*100)
   2566                     count = s.recv_into(b)
   2567                     return b[:count]
   2568 
   2569                 def _recvfrom_into():
   2570                     b = bytearray(b"\0"*100)
   2571                     count, addr = s.recvfrom_into(b)
   2572                     return b[:count]
   2573 
   2574                 # (name, method, whether to expect success, *args)
   2575                 send_methods = [
   2576                     ('send', s.send, True, []),
   2577                     ('sendto', s.sendto, False, ["some.address"]),
   2578                     ('sendall', s.sendall, True, []),
   2579                 ]
   2580                 recv_methods = [
   2581                     ('recv', s.recv, True, []),
   2582                     ('recvfrom', s.recvfrom, False, ["some.address"]),
   2583                     ('recv_into', _recv_into, True, []),
   2584                     ('recvfrom_into', _recvfrom_into, False, []),
   2585                 ]
   2586                 data_prefix = u"PREFIX_"
   2587 
   2588                 for meth_name, send_meth, expect_success, args in send_methods:
   2589                     indata = (data_prefix + meth_name).encode('ascii')
   2590                     try:
   2591                         send_meth(indata, *args)
   2592                         outdata = s.read()
   2593                         if outdata != indata.lower():
   2594                             self.fail(
   2595                                 "While sending with <<{name:s}>> bad data "
   2596                                 "<<{outdata:r}>> ({nout:d}) received; "
   2597                                 "expected <<{indata:r}>> ({nin:d})\n".format(
   2598                                     name=meth_name, outdata=outdata[:20],
   2599                                     nout=len(outdata),
   2600                                     indata=indata[:20], nin=len(indata)
   2601                                 )
   2602                             )
   2603                     except ValueError as e:
   2604                         if expect_success:
   2605                             self.fail(
   2606                                 "Failed to send with method <<{name:s}>>; "
   2607                                 "expected to succeed.\n".format(name=meth_name)
   2608                             )
   2609                         if not str(e).startswith(meth_name):
   2610                             self.fail(
   2611                                 "Method <<{name:s}>> failed with unexpected "
   2612                                 "exception message: {exp:s}\n".format(
   2613                                     name=meth_name, exp=e
   2614                                 )
   2615                             )
   2616 
   2617                 for meth_name, recv_meth, expect_success, args in recv_methods:
   2618                     indata = (data_prefix + meth_name).encode('ascii')
   2619                     try:
   2620                         s.send(indata)
   2621                         outdata = recv_meth(*args)
   2622                         if outdata != indata.lower():
   2623                             self.fail(
   2624                                 "While receiving with <<{name:s}>> bad data "
   2625                                 "<<{outdata:r}>> ({nout:d}) received; "
   2626                                 "expected <<{indata:r}>> ({nin:d})\n".format(
   2627                                     name=meth_name, outdata=outdata[:20],
   2628                                     nout=len(outdata),
   2629                                     indata=indata[:20], nin=len(indata)
   2630                                 )
   2631                             )
   2632                     except ValueError as e:
   2633                         if expect_success:
   2634                             self.fail(
   2635                                 "Failed to receive with method <<{name:s}>>; "
   2636                                 "expected to succeed.\n".format(name=meth_name)
   2637                             )
   2638                         if not str(e).startswith(meth_name):
   2639                             self.fail(
   2640                                 "Method <<{name:s}>> failed with unexpected "
   2641                                 "exception message: {exp:s}\n".format(
   2642                                     name=meth_name, exp=e
   2643                                 )
   2644                             )
   2645                         # consume data
   2646                         s.read()
   2647 
   2648                 # read(-1, buffer) is supported, even though read(-1) is not
   2649                 data = b"data"
   2650                 s.send(data)
   2651                 buffer = bytearray(len(data))
   2652                 self.assertEqual(s.read(-1, buffer), len(data))
   2653                 self.assertEqual(buffer, data)
   2654 
   2655                 s.write(b"over\n")
   2656 
   2657                 self.assertRaises(ValueError, s.recv, -1)
   2658                 self.assertRaises(ValueError, s.read, -1)
   2659 
   2660                 s.close()
   2661 
   2662         def test_recv_zero(self):
   2663             server = ThreadedEchoServer(CERTFILE)
   2664             server.__enter__()
   2665             self.addCleanup(server.__exit__, None, None)
   2666             s = socket.create_connection((HOST, server.port))
   2667             self.addCleanup(s.close)
   2668             s = ssl.wrap_socket(s, suppress_ragged_eofs=False)
   2669             self.addCleanup(s.close)
   2670 
   2671             # recv/read(0) should return no data
   2672             s.send(b"data")
   2673             self.assertEqual(s.recv(0), b"")
   2674             self.assertEqual(s.read(0), b"")
   2675             self.assertEqual(s.read(), b"data")
   2676 
   2677             # Should not block if the other end sends no data
   2678             s.setblocking(False)
   2679             self.assertEqual(s.recv(0), b"")
   2680             self.assertEqual(s.recv_into(bytearray()), 0)
   2681 
   2682         def test_handshake_timeout(self):
   2683             # Issue #5103: SSL handshake must respect the socket timeout
   2684             server = socket.socket(socket.AF_INET)
   2685             host = "127.0.0.1"
   2686             port = support.bind_port(server)
   2687             started = threading.Event()
   2688             finish = False
   2689 
   2690             def serve():
   2691                 server.listen(5)
   2692                 started.set()
   2693                 conns = []
   2694                 while not finish:
   2695                     r, w, e = select.select([server], [], [], 0.1)
   2696                     if server in r:
   2697                         # Let the socket hang around rather than having
   2698                         # it closed by garbage collection.
   2699                         conns.append(server.accept()[0])
   2700                 for sock in conns:
   2701                     sock.close()
   2702 
   2703             t = threading.Thread(target=serve)
   2704             t.start()
   2705             started.wait()
   2706 
   2707             try:
   2708                 try:
   2709                     c = socket.socket(socket.AF_INET)
   2710                     c.settimeout(0.2)
   2711                     c.connect((host, port))
   2712                     # Will attempt handshake and time out
   2713                     self.assertRaisesRegexp(ssl.SSLError, "timed out",
   2714                                             ssl.wrap_socket, c)
   2715                 finally:
   2716                     c.close()
   2717                 try:
   2718                     c = socket.socket(socket.AF_INET)
   2719                     c = ssl.wrap_socket(c)
   2720                     c.settimeout(0.2)
   2721                     # Will attempt handshake and time out
   2722                     self.assertRaisesRegexp(ssl.SSLError, "timed out",
   2723                                             c.connect, (host, port))
   2724                 finally:
   2725                     c.close()
   2726             finally:
   2727                 finish = True
   2728                 t.join()
   2729                 server.close()
   2730 
   2731         def test_server_accept(self):
   2732             # Issue #16357: accept() on a SSLSocket created through
   2733             # SSLContext.wrap_socket().
   2734             context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
   2735             context.verify_mode = ssl.CERT_REQUIRED
   2736             context.load_verify_locations(CERTFILE)
   2737             context.load_cert_chain(CERTFILE)
   2738             server = socket.socket(socket.AF_INET)
   2739             host = "127.0.0.1"
   2740             port = support.bind_port(server)
   2741             server = context.wrap_socket(server, server_side=True)
   2742 
   2743             evt = threading.Event()
   2744             remote = [None]
   2745             peer = [None]
   2746             def serve():
   2747                 server.listen(5)
   2748                 # Block on the accept and wait on the connection to close.
   2749                 evt.set()
   2750                 remote[0], peer[0] = server.accept()
   2751                 remote[0].recv(1)
   2752 
   2753             t = threading.Thread(target=serve)
   2754             t.start()
   2755             # Client wait until server setup and perform a connect.
   2756             evt.wait()
   2757             client = context.wrap_socket(socket.socket())
   2758             client.connect((host, port))
   2759             client_addr = client.getsockname()
   2760             client.close()
   2761             t.join()
   2762             remote[0].close()
   2763             server.close()
   2764             # Sanity checks.
   2765             self.assertIsInstance(remote[0], ssl.SSLSocket)
   2766             self.assertEqual(peer[0], client_addr)
   2767 
   2768         def test_getpeercert_enotconn(self):
   2769             context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
   2770             with closing(context.wrap_socket(socket.socket())) as sock:
   2771                 with self.assertRaises(socket.error) as cm:
   2772                     sock.getpeercert()
   2773                 self.assertEqual(cm.exception.errno, errno.ENOTCONN)
   2774 
   2775         def test_do_handshake_enotconn(self):
   2776             context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
   2777             with closing(context.wrap_socket(socket.socket())) as sock:
   2778                 with self.assertRaises(socket.error) as cm:
   2779                     sock.do_handshake()
   2780                 self.assertEqual(cm.exception.errno, errno.ENOTCONN)
   2781 
   2782         def test_default_ciphers(self):
   2783             context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
   2784             try:
   2785                 # Force a set of weak ciphers on our client context
   2786                 context.set_ciphers("DES")
   2787             except ssl.SSLError:
   2788                 self.skipTest("no DES cipher available")
   2789             with ThreadedEchoServer(CERTFILE,
   2790                                     ssl_version=ssl.PROTOCOL_SSLv23,
   2791                                     chatty=False) as server:
   2792                 with closing(context.wrap_socket(socket.socket())) as s:
   2793                     with self.assertRaises(ssl.SSLError):
   2794                         s.connect((HOST, server.port))
   2795             self.assertIn("no shared cipher", str(server.conn_errors[0]))
   2796 
   2797         def test_version_basic(self):
   2798             """
   2799             Basic tests for SSLSocket.version().
   2800             More tests are done in the test_protocol_*() methods.
   2801             """
   2802             context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
   2803             with ThreadedEchoServer(CERTFILE,
   2804                                     ssl_version=ssl.PROTOCOL_TLSv1,
   2805                                     chatty=False) as server:
   2806                 with closing(context.wrap_socket(socket.socket())) as s:
   2807                     self.assertIs(s.version(), None)
   2808                     s.connect((HOST, server.port))
   2809                     self.assertEqual(s.version(), 'TLSv1')
   2810                 self.assertIs(s.version(), None)
   2811 
   2812         @unittest.skipUnless(ssl.HAS_ECDH, "test requires ECDH-enabled OpenSSL")
   2813         def test_default_ecdh_curve(self):
   2814             # Issue #21015: elliptic curve-based Diffie Hellman key exchange
   2815             # should be enabled by default on SSL contexts.
   2816             context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
   2817             context.load_cert_chain(CERTFILE)
   2818             # Prior to OpenSSL 1.0.0, ECDH ciphers have to be enabled
   2819             # explicitly using the 'ECCdraft' cipher alias.  Otherwise,
   2820             # our default cipher list should prefer ECDH-based ciphers
   2821             # automatically.
   2822             if ssl.OPENSSL_VERSION_INFO < (1, 0, 0):
   2823                 context.set_ciphers("ECCdraft:ECDH")
   2824             with ThreadedEchoServer(context=context) as server:
   2825                 with closing(context.wrap_socket(socket.socket())) as s:
   2826                     s.connect((HOST, server.port))
   2827                     self.assertIn("ECDH", s.cipher()[0])
   2828 
   2829         @unittest.skipUnless("tls-unique" in ssl.CHANNEL_BINDING_TYPES,
   2830                              "'tls-unique' channel binding not available")
   2831         def test_tls_unique_channel_binding(self):
   2832             """Test tls-unique channel binding."""
   2833             if support.verbose:
   2834                 sys.stdout.write("\n")
   2835 
   2836             server = ThreadedEchoServer(CERTFILE,
   2837                                         certreqs=ssl.CERT_NONE,
   2838                                         ssl_version=ssl.PROTOCOL_TLSv1,
   2839                                         cacerts=CERTFILE,
   2840                                         chatty=True,
   2841                                         connectionchatty=False)
   2842             with server:
   2843                 s = ssl.wrap_socket(socket.socket(),
   2844                                     server_side=False,
   2845                                     certfile=CERTFILE,
   2846                                     ca_certs=CERTFILE,
   2847                                     cert_reqs=ssl.CERT_NONE,
   2848                                     ssl_version=ssl.PROTOCOL_TLSv1)
   2849                 s.connect((HOST, server.port))
   2850                 # get the data
   2851                 cb_data = s.get_channel_binding("tls-unique")
   2852                 if support.verbose:
   2853                     sys.stdout.write(" got channel binding data: {0!r}\n"
   2854                                      .format(cb_data))
   2855 
   2856                 # check if it is sane
   2857                 self.assertIsNotNone(cb_data)
   2858                 self.assertEqual(len(cb_data), 12) # True for TLSv1
   2859 
   2860                 # and compare with the peers version
   2861                 s.write(b"CB tls-unique\n")
   2862                 peer_data_repr = s.read().strip()
   2863                 self.assertEqual(peer_data_repr,
   2864                                  repr(cb_data).encode("us-ascii"))
   2865                 s.close()
   2866 
   2867                 # now, again
   2868                 s = ssl.wrap_socket(socket.socket(),
   2869                                     server_side=False,
   2870                                     certfile=CERTFILE,
   2871                                     ca_certs=CERTFILE,
   2872                                     cert_reqs=ssl.CERT_NONE,
   2873                                     ssl_version=ssl.PROTOCOL_TLSv1)
   2874                 s.connect((HOST, server.port))
   2875                 new_cb_data = s.get_channel_binding("tls-unique")
   2876                 if support.verbose:
   2877                     sys.stdout.write(" got another channel binding data: {0!r}\n"
   2878                                      .format(new_cb_data))
   2879                 # is it really unique
   2880                 self.assertNotEqual(cb_data, new_cb_data)
   2881                 self.assertIsNotNone(cb_data)
   2882                 self.assertEqual(len(cb_data), 12) # True for TLSv1
   2883                 s.write(b"CB tls-unique\n")
   2884                 peer_data_repr = s.read().strip()
   2885                 self.assertEqual(peer_data_repr,
   2886                                  repr(new_cb_data).encode("us-ascii"))
   2887                 s.close()
   2888 
   2889         def test_compression(self):
   2890             context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
   2891             context.load_cert_chain(CERTFILE)
   2892             stats = server_params_test(context, context,
   2893                                        chatty=True, connectionchatty=True)
   2894             if support.verbose:
   2895                 sys.stdout.write(" got compression: {!r}\n".format(stats['compression']))
   2896             self.assertIn(stats['compression'], { None, 'ZLIB', 'RLE' })
   2897 
   2898         @unittest.skipUnless(hasattr(ssl, 'OP_NO_COMPRESSION'),
   2899                              "ssl.OP_NO_COMPRESSION needed for this test")
   2900         def test_compression_disabled(self):
   2901             context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
   2902             context.load_cert_chain(CERTFILE)
   2903             context.options |= ssl.OP_NO_COMPRESSION
   2904             stats = server_params_test(context, context,
   2905                                        chatty=True, connectionchatty=True)
   2906             self.assertIs(stats['compression'], None)
   2907 
   2908         def test_dh_params(self):
   2909             # Check we can get a connection with ephemeral Diffie-Hellman
   2910             context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
   2911             context.load_cert_chain(CERTFILE)
   2912             context.load_dh_params(DHFILE)
   2913             context.set_ciphers("kEDH")
   2914             stats = server_params_test(context, context,
   2915                                        chatty=True, connectionchatty=True)
   2916             cipher = stats["cipher"][0]
   2917             parts = cipher.split("-")
   2918             if "ADH" not in parts and "EDH" not in parts and "DHE" not in parts:
   2919                 self.fail("Non-DH cipher: " + cipher[0])
   2920 
   2921         def test_selected_alpn_protocol(self):
   2922             # selected_alpn_protocol() is None unless ALPN is used.
   2923             context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
   2924             context.load_cert_chain(CERTFILE)
   2925             stats = server_params_test(context, context,
   2926                                        chatty=True, connectionchatty=True)
   2927             self.assertIs(stats['client_alpn_protocol'], None)
   2928 
   2929         @unittest.skipUnless(ssl.HAS_ALPN, "ALPN support required")
   2930         def test_selected_alpn_protocol_if_server_uses_alpn(self):
   2931             # selected_alpn_protocol() is None unless ALPN is used by the client.
   2932             client_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
   2933             client_context.load_verify_locations(CERTFILE)
   2934             server_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
   2935             server_context.load_cert_chain(CERTFILE)
   2936             server_context.set_alpn_protocols(['foo', 'bar'])
   2937             stats = server_params_test(client_context, server_context,
   2938                                        chatty=True, connectionchatty=True)
   2939             self.assertIs(stats['client_alpn_protocol'], None)
   2940 
   2941         @unittest.skipUnless(ssl.HAS_ALPN, "ALPN support needed for this test")
   2942         def test_alpn_protocols(self):
   2943             server_protocols = ['foo', 'bar', 'milkshake']
   2944             protocol_tests = [
   2945                 (['foo', 'bar'], 'foo'),
   2946                 (['bar', 'foo'], 'foo'),
   2947                 (['milkshake'], 'milkshake'),
   2948                 (['http/3.0', 'http/4.0'], None)
   2949             ]
   2950             for client_protocols, expected in protocol_tests:
   2951                 server_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2)
   2952                 server_context.load_cert_chain(CERTFILE)
   2953                 server_context.set_alpn_protocols(server_protocols)
   2954                 client_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2)
   2955                 client_context.load_cert_chain(CERTFILE)
   2956                 client_context.set_alpn_protocols(client_protocols)
   2957 
   2958                 try:
   2959                     stats = server_params_test(client_context,
   2960                                                server_context,
   2961                                                chatty=True,
   2962                                                connectionchatty=True)
   2963                 except ssl.SSLError as e:
   2964                     stats = e
   2965 
   2966                 if expected is None and IS_OPENSSL_1_1:
   2967                     # OpenSSL 1.1.0 raises handshake error
   2968                     self.assertIsInstance(stats, ssl.SSLError)
   2969                 else:
   2970                     msg = "failed trying %s (s) and %s (c).\n" \
   2971                         "was expecting %s, but got %%s from the %%s" \
   2972                             % (str(server_protocols), str(client_protocols),
   2973                                 str(expected))
   2974                     client_result = stats['client_alpn_protocol']
   2975                     self.assertEqual(client_result, expected,
   2976                                      msg % (client_result, "client"))
   2977                     server_result = stats['server_alpn_protocols'][-1] \
   2978                         if len(stats['server_alpn_protocols']) else 'nothing'
   2979                     self.assertEqual(server_result, expected,
   2980                                      msg % (server_result, "server"))
   2981 
   2982         def test_selected_npn_protocol(self):
   2983             # selected_npn_protocol() is None unless NPN is used
   2984             context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
   2985             context.load_cert_chain(CERTFILE)
   2986             stats = server_params_test(context, context,
   2987                                        chatty=True, connectionchatty=True)
   2988             self.assertIs(stats['client_npn_protocol'], None)
   2989 
   2990         @unittest.skipUnless(ssl.HAS_NPN, "NPN support needed for this test")
   2991         def test_npn_protocols(self):
   2992             server_protocols = ['http/1.1', 'spdy/2']
   2993             protocol_tests = [
   2994                 (['http/1.1', 'spdy/2'], 'http/1.1'),
   2995                 (['spdy/2', 'http/1.1'], 'http/1.1'),
   2996                 (['spdy/2', 'test'], 'spdy/2'),
   2997                 (['abc', 'def'], 'abc')
   2998             ]
   2999             for client_protocols, expected in protocol_tests:
   3000                 server_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
   3001                 server_context.load_cert_chain(CERTFILE)
   3002                 server_context.set_npn_protocols(server_protocols)
   3003                 client_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
   3004                 client_context.load_cert_chain(CERTFILE)
   3005                 client_context.set_npn_protocols(client_protocols)
   3006                 stats = server_params_test(client_context, server_context,
   3007                                            chatty=True, connectionchatty=True)
   3008 
   3009                 msg = "failed trying %s (s) and %s (c).\n" \
   3010                       "was expecting %s, but got %%s from the %%s" \
   3011                           % (str(server_protocols), str(client_protocols),
   3012                              str(expected))
   3013                 client_result = stats['client_npn_protocol']
   3014                 self.assertEqual(client_result, expected, msg % (client_result, "client"))
   3015                 server_result = stats['server_npn_protocols'][-1] \
   3016                     if len(stats['server_npn_protocols']) else 'nothing'
   3017                 self.assertEqual(server_result, expected, msg % (server_result, "server"))
   3018 
   3019         def sni_contexts(self):
   3020             server_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
   3021             server_context.load_cert_chain(SIGNED_CERTFILE)
   3022             other_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
   3023             other_context.load_cert_chain(SIGNED_CERTFILE2)
   3024             client_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
   3025             client_context.verify_mode = ssl.CERT_REQUIRED
   3026             client_context.load_verify_locations(SIGNING_CA)
   3027             return server_context, other_context, client_context
   3028 
   3029         def check_common_name(self, stats, name):
   3030             cert = stats['peercert']
   3031             self.assertIn((('commonName', name),), cert['subject'])
   3032 
   3033         @needs_sni
   3034         def test_sni_callback(self):
   3035             calls = []
   3036             server_context, other_context, client_context = self.sni_contexts()
   3037 
   3038             def servername_cb(ssl_sock, server_name, initial_context):
   3039                 calls.append((server_name, initial_context))
   3040                 if server_name is not None:
   3041                     ssl_sock.context = other_context
   3042             server_context.set_servername_callback(servername_cb)
   3043 
   3044             stats = server_params_test(client_context, server_context,
   3045                                        chatty=True,
   3046                                        sni_name='supermessage')
   3047             # The hostname was fetched properly, and the certificate was
   3048             # changed for the connection.
   3049             self.assertEqual(calls, [("supermessage", server_context)])
   3050             # CERTFILE4 was selected
   3051             self.check_common_name(stats, 'fakehostname')
   3052 
   3053             calls = []
   3054             # The callback is called with server_name=None
   3055             stats = server_params_test(client_context, server_context,
   3056                                        chatty=True,
   3057                                        sni_name=None)
   3058             self.assertEqual(calls, [(None, server_context)])
   3059             self.check_common_name(stats, 'localhost')
   3060 
   3061             # Check disabling the callback
   3062             calls = []
   3063             server_context.set_servername_callback(None)
   3064 
   3065             stats = server_params_test(client_context, server_context,
   3066                                        chatty=True,
   3067                                        sni_name='notfunny')
   3068             # Certificate didn't change
   3069             self.check_common_name(stats, 'localhost')
   3070             self.assertEqual(calls, [])
   3071 
   3072         @needs_sni
   3073         def test_sni_callback_alert(self):
   3074             # Returning a TLS alert is reflected to the connecting client
   3075             server_context, other_context, client_context = self.sni_contexts()
   3076 
   3077             def cb_returning_alert(ssl_sock, server_name, initial_context):
   3078                 return ssl.ALERT_DESCRIPTION_ACCESS_DENIED
   3079             server_context.set_servername_callback(cb_returning_alert)
   3080 
   3081             with self.assertRaises(ssl.SSLError) as cm:
   3082                 stats = server_params_test(client_context, server_context,
   3083                                            chatty=False,
   3084                                            sni_name='supermessage')
   3085             self.assertEqual(cm.exception.reason, 'TLSV1_ALERT_ACCESS_DENIED')
   3086 
   3087         @needs_sni
   3088         def test_sni_callback_raising(self):
   3089             # Raising fails the connection with a TLS handshake failure alert.
   3090             server_context, other_context, client_context = self.sni_contexts()
   3091 
   3092             def cb_raising(ssl_sock, server_name, initial_context):
   3093                 1.0/0.0
   3094             server_context.set_servername_callback(cb_raising)
   3095 
   3096             with self.assertRaises(ssl.SSLError) as cm, \
   3097                  support.captured_stderr() as stderr:
   3098                 stats = server_params_test(client_context, server_context,
   3099                                            chatty=False,
   3100                                            sni_name='supermessage')
   3101             self.assertEqual(cm.exception.reason, 'SSLV3_ALERT_HANDSHAKE_FAILURE')
   3102             self.assertIn("ZeroDivisionError", stderr.getvalue())
   3103 
   3104         @needs_sni
   3105         def test_sni_callback_wrong_return_type(self):
   3106             # Returning the wrong return type terminates the TLS connection
   3107             # with an internal error alert.
   3108             server_context, other_context, client_context = self.sni_contexts()
   3109 
   3110             def cb_wrong_return_type(ssl_sock, server_name, initial_context):
   3111                 return "foo"
   3112             server_context.set_servername_callback(cb_wrong_return_type)
   3113 
   3114             with self.assertRaises(ssl.SSLError) as cm, \
   3115                  support.captured_stderr() as stderr:
   3116                 stats = server_params_test(client_context, server_context,
   3117                                            chatty=False,
   3118                                            sni_name='supermessage')
   3119             self.assertEqual(cm.exception.reason, 'TLSV1_ALERT_INTERNAL_ERROR')
   3120             self.assertIn("TypeError", stderr.getvalue())
   3121 
   3122         def test_read_write_after_close_raises_valuerror(self):
   3123             context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
   3124             context.verify_mode = ssl.CERT_REQUIRED
   3125             context.load_verify_locations(CERTFILE)
   3126             context.load_cert_chain(CERTFILE)
   3127             server = ThreadedEchoServer(context=context, chatty=False)
   3128 
   3129             with server:
   3130                 s = context.wrap_socket(socket.socket())
   3131                 s.connect((HOST, server.port))
   3132                 s.close()
   3133 
   3134                 self.assertRaises(ValueError, s.read, 1024)
   3135                 self.assertRaises(ValueError, s.write, b'hello')
   3136 
   3137 
   3138 def test_main(verbose=False):
   3139     if support.verbose:
   3140         plats = {
   3141             'Linux': platform.linux_distribution,
   3142             'Mac': platform.mac_ver,
   3143             'Windows': platform.win32_ver,
   3144         }
   3145         for name, func in plats.items():
   3146             plat = func()
   3147             if plat and plat[0]:
   3148                 plat = '%s %r' % (name, plat)
   3149                 break
   3150         else:
   3151             plat = repr(platform.platform())
   3152         print("test_ssl: testing with %r %r" %
   3153             (ssl.OPENSSL_VERSION, ssl.OPENSSL_VERSION_INFO))
   3154         print("          under %s" % plat)
   3155         print("          HAS_SNI = %r" % ssl.HAS_SNI)
   3156         print("          OP_ALL = 0x%8x" % ssl.OP_ALL)
   3157         try:
   3158             print("          OP_NO_TLSv1_1 = 0x%8x" % ssl.OP_NO_TLSv1_1)
   3159         except AttributeError:
   3160             pass
   3161 
   3162     for filename in [
   3163         CERTFILE, REMOTE_ROOT_CERT, BYTES_CERTFILE,
   3164         ONLYCERT, ONLYKEY, BYTES_ONLYCERT, BYTES_ONLYKEY,
   3165         SIGNED_CERTFILE, SIGNED_CERTFILE2, SIGNING_CA,
   3166         BADCERT, BADKEY, EMPTYCERT]:
   3167         if not os.path.exists(filename):
   3168             raise support.TestFailed("Can't read certificate file %r" % filename)
   3169 
   3170     tests = [ContextTests, BasicTests, BasicSocketTests, SSLErrorTests]
   3171 
   3172     if support.is_resource_enabled('network'):
   3173         tests.append(NetworkedTests)
   3174 
   3175     if _have_threads:
   3176         thread_info = support.threading_setup()
   3177         if thread_info:
   3178             tests.append(ThreadedTests)
   3179 
   3180     try:
   3181         support.run_unittest(*tests)
   3182     finally:
   3183         if _have_threads:
   3184             support.threading_cleanup(*thread_info)
   3185 
# Run the whole suite via test_main() when this file is executed as a script.
if __name__ == "__main__":
    test_main()
   3188