Home | History | Annotate | Download | only in test
      1 # Test the support for SSL and sockets
      2 
      3 import sys
      4 import unittest
      5 from test import test_support
      6 import asyncore
      7 import socket
      8 import select
      9 import time
     10 import gc
     11 import os
     12 import errno
     13 import pprint
     14 import urllib, urlparse
     15 import traceback
     16 import weakref
     17 import functools
     18 import platform
     19 
     20 from BaseHTTPServer import HTTPServer
     21 from SimpleHTTPServer import SimpleHTTPRequestHandler
     22 
     23 ssl = test_support.import_module("ssl")
     24 
     25 HOST = test_support.HOST
     26 CERTFILE = None
     27 SVN_PYTHON_ORG_ROOT_CERT = None
     28 
     29 def handle_error(prefix):
     30     exc_format = ' '.join(traceback.format_exception(*sys.exc_info()))
     31     if test_support.verbose:
     32         sys.stdout.write(prefix + exc_format)
     33 
     34 
     35 class BasicTests(unittest.TestCase):
     36 
     37     def test_sslwrap_simple(self):
     38         # A crude test for the legacy API
     39         try:
     40             ssl.sslwrap_simple(socket.socket(socket.AF_INET))
     41         except IOError, e:
     42             if e.errno == 32: # broken pipe when ssl_sock.do_handshake(), this test doesn't care about that
     43                 pass
     44             else:
     45                 raise
     46         try:
     47             ssl.sslwrap_simple(socket.socket(socket.AF_INET)._sock)
     48         except IOError, e:
     49             if e.errno == 32: # broken pipe when ssl_sock.do_handshake(), this test doesn't care about that
     50                 pass
     51             else:
     52                 raise
     53 
     54 # Issue #9415: Ubuntu hijacks their OpenSSL and forcefully disables SSLv2
     55 def skip_if_broken_ubuntu_ssl(func):
     56     if hasattr(ssl, 'PROTOCOL_SSLv2'):
     57         # We need to access the lower-level wrapper in order to create an
     58         # implicit SSL context without trying to connect or listen.
     59         try:
     60             import _ssl
     61         except ImportError:
     62             # The returned function won't get executed, just ignore the error
     63             pass
     64         @functools.wraps(func)
     65         def f(*args, **kwargs):
     66             try:
     67                 s = socket.socket(socket.AF_INET)
     68                 _ssl.sslwrap(s._sock, 0, None, None,
     69                              ssl.CERT_NONE, ssl.PROTOCOL_SSLv2, None, None)
     70             except ssl.SSLError as e:
     71                 if (ssl.OPENSSL_VERSION_INFO == (0, 9, 8, 15, 15) and
     72                     platform.linux_distribution() == ('debian', 'squeeze/sid', '')
     73                     and 'Invalid SSL protocol variant specified' in str(e)):
     74                     raise unittest.SkipTest("Patched Ubuntu OpenSSL breaks behaviour")
     75             return func(*args, **kwargs)
     76         return f
     77     else:
     78         return func
     79 
     80 
     81 class BasicSocketTests(unittest.TestCase):
     82 
     83     def test_constants(self):
     84         #ssl.PROTOCOL_SSLv2
     85         ssl.PROTOCOL_SSLv23
     86         ssl.PROTOCOL_SSLv3
     87         ssl.PROTOCOL_TLSv1
     88         ssl.CERT_NONE
     89         ssl.CERT_OPTIONAL
     90         ssl.CERT_REQUIRED
     91 
     92     def test_random(self):
     93         v = ssl.RAND_status()
     94         if test_support.verbose:
     95             sys.stdout.write("\n RAND_status is %d (%s)\n"
     96                              % (v, (v and "sufficient randomness") or
     97                                 "insufficient randomness"))
     98         self.assertRaises(TypeError, ssl.RAND_egd, 1)
     99         self.assertRaises(TypeError, ssl.RAND_egd, 'foo', 1)
    100         ssl.RAND_add("this is a random string", 75.0)
    101 
    102     def test_parse_cert(self):
    103         # note that this uses an 'unofficial' function in _ssl.c,
    104         # provided solely for this test, to exercise the certificate
    105         # parsing code
    106         p = ssl._ssl._test_decode_cert(CERTFILE, False)
    107         if test_support.verbose:
    108             sys.stdout.write("\n" + pprint.pformat(p) + "\n")
    109         self.assertEqual(p['subject'],
    110                          ((('countryName', 'XY'),),
    111                           (('localityName', 'Castle Anthrax'),),
    112                           (('organizationName', 'Python Software Foundation'),),
    113                           (('commonName', 'localhost'),))
    114                         )
    115         self.assertEqual(p['subjectAltName'], (('DNS', 'localhost'),))
    116         # Issue #13034: the subjectAltName in some certificates
    117         # (notably projects.developer.nokia.com:443) wasn't parsed
    118         p = ssl._ssl._test_decode_cert(NOKIACERT)
    119         if test_support.verbose:
    120             sys.stdout.write("\n" + pprint.pformat(p) + "\n")
    121         self.assertEqual(p['subjectAltName'],
    122                          (('DNS', 'projects.developer.nokia.com'),
    123                           ('DNS', 'projects.forum.nokia.com'))
    124                         )
    125 
    126     def test_DER_to_PEM(self):
    127         with open(SVN_PYTHON_ORG_ROOT_CERT, 'r') as f:
    128             pem = f.read()
    129         d1 = ssl.PEM_cert_to_DER_cert(pem)
    130         p2 = ssl.DER_cert_to_PEM_cert(d1)
    131         d2 = ssl.PEM_cert_to_DER_cert(p2)
    132         self.assertEqual(d1, d2)
    133         if not p2.startswith(ssl.PEM_HEADER + '\n'):
    134             self.fail("DER-to-PEM didn't include correct header:\n%r\n" % p2)
    135         if not p2.endswith('\n' + ssl.PEM_FOOTER + '\n'):
    136             self.fail("DER-to-PEM didn't include correct footer:\n%r\n" % p2)
    137 
    138     def test_openssl_version(self):
    139         n = ssl.OPENSSL_VERSION_NUMBER
    140         t = ssl.OPENSSL_VERSION_INFO
    141         s = ssl.OPENSSL_VERSION
    142         self.assertIsInstance(n, (int, long))
    143         self.assertIsInstance(t, tuple)
    144         self.assertIsInstance(s, str)
    145         # Some sanity checks follow
    146         # >= 0.9
    147         self.assertGreaterEqual(n, 0x900000)
    148         # < 2.0
    149         self.assertLess(n, 0x20000000)
    150         major, minor, fix, patch, status = t
    151         self.assertGreaterEqual(major, 0)
    152         self.assertLess(major, 2)
    153         self.assertGreaterEqual(minor, 0)
    154         self.assertLess(minor, 256)
    155         self.assertGreaterEqual(fix, 0)
    156         self.assertLess(fix, 256)
    157         self.assertGreaterEqual(patch, 0)
    158         self.assertLessEqual(patch, 26)
    159         self.assertGreaterEqual(status, 0)
    160         self.assertLessEqual(status, 15)
    161         # Version string as returned by OpenSSL, the format might change
    162         self.assertTrue(s.startswith("OpenSSL {:d}.{:d}.{:d}".format(major, minor, fix)),
    163                         (s, t))
    164 
    165     def test_ciphers(self):
    166         if not test_support.is_resource_enabled('network'):
    167             return
    168         remote = ("svn.python.org", 443)
    169         with test_support.transient_internet(remote[0]):
    170             s = ssl.wrap_socket(socket.socket(socket.AF_INET),
    171                                 cert_reqs=ssl.CERT_NONE, ciphers="ALL")
    172             s.connect(remote)
    173             s = ssl.wrap_socket(socket.socket(socket.AF_INET),
    174                                 cert_reqs=ssl.CERT_NONE, ciphers="DEFAULT")
    175             s.connect(remote)
    176             # Error checking occurs when connecting, because the SSL context
    177             # isn't created before.
    178             s = ssl.wrap_socket(socket.socket(socket.AF_INET),
    179                                 cert_reqs=ssl.CERT_NONE, ciphers="^$:,;?*'dorothyx")
    180             with self.assertRaisesRegexp(ssl.SSLError, "No cipher can be selected"):
    181                 s.connect(remote)
    182 
    183     @test_support.cpython_only
    184     def test_refcycle(self):
    185         # Issue #7943: an SSL object doesn't create reference cycles with
    186         # itself.
    187         s = socket.socket(socket.AF_INET)
    188         ss = ssl.wrap_socket(s)
    189         wr = weakref.ref(ss)
    190         del ss
    191         self.assertEqual(wr(), None)
    192 
    193     def test_wrapped_unconnected(self):
    194         # The _delegate_methods in socket.py are correctly delegated to by an
    195         # unconnected SSLSocket, so they will raise a socket.error rather than
    196         # something unexpected like TypeError.
    197         s = socket.socket(socket.AF_INET)
    198         ss = ssl.wrap_socket(s)
    199         self.assertRaises(socket.error, ss.recv, 1)
    200         self.assertRaises(socket.error, ss.recv_into, bytearray(b'x'))
    201         self.assertRaises(socket.error, ss.recvfrom, 1)
    202         self.assertRaises(socket.error, ss.recvfrom_into, bytearray(b'x'), 1)
    203         self.assertRaises(socket.error, ss.send, b'x')
    204         self.assertRaises(socket.error, ss.sendto, b'x', ('0.0.0.0', 0))
    205 
    206 
    207 class NetworkedTests(unittest.TestCase):
    208 
    209     def test_connect(self):
    210         with test_support.transient_internet("svn.python.org"):
    211             s = ssl.wrap_socket(socket.socket(socket.AF_INET),
    212                                 cert_reqs=ssl.CERT_NONE)
    213             s.connect(("svn.python.org", 443))
    214             c = s.getpeercert()
    215             if c:
    216                 self.fail("Peer cert %s shouldn't be here!")
    217             s.close()
    218 
    219             # this should fail because we have no verification certs
    220             s = ssl.wrap_socket(socket.socket(socket.AF_INET),
    221                                 cert_reqs=ssl.CERT_REQUIRED)
    222             try:
    223                 s.connect(("svn.python.org", 443))
    224             except ssl.SSLError:
    225                 pass
    226             finally:
    227                 s.close()
    228 
    229             # this should succeed because we specify the root cert
    230             s = ssl.wrap_socket(socket.socket(socket.AF_INET),
    231                                 cert_reqs=ssl.CERT_REQUIRED,
    232                                 ca_certs=SVN_PYTHON_ORG_ROOT_CERT)
    233             try:
    234                 s.connect(("svn.python.org", 443))
    235             finally:
    236                 s.close()
    237 
    238     def test_connect_ex(self):
    239         # Issue #11326: check connect_ex() implementation
    240         with test_support.transient_internet("svn.python.org"):
    241             s = ssl.wrap_socket(socket.socket(socket.AF_INET),
    242                                 cert_reqs=ssl.CERT_REQUIRED,
    243                                 ca_certs=SVN_PYTHON_ORG_ROOT_CERT)
    244             try:
    245                 self.assertEqual(0, s.connect_ex(("svn.python.org", 443)))
    246                 self.assertTrue(s.getpeercert())
    247             finally:
    248                 s.close()
    249 
    250     def test_non_blocking_connect_ex(self):
    251         # Issue #11326: non-blocking connect_ex() should allow handshake
    252         # to proceed after the socket gets ready.
    253         with test_support.transient_internet("svn.python.org"):
    254             s = ssl.wrap_socket(socket.socket(socket.AF_INET),
    255                                 cert_reqs=ssl.CERT_REQUIRED,
    256                                 ca_certs=SVN_PYTHON_ORG_ROOT_CERT,
    257                                 do_handshake_on_connect=False)
    258             try:
    259                 s.setblocking(False)
    260                 rc = s.connect_ex(('svn.python.org', 443))
    261                 # EWOULDBLOCK under Windows, EINPROGRESS elsewhere
    262                 self.assertIn(rc, (0, errno.EINPROGRESS, errno.EWOULDBLOCK))
    263                 # Wait for connect to finish
    264                 select.select([], [s], [], 5.0)
    265                 # Non-blocking handshake
    266                 while True:
    267                     try:
    268                         s.do_handshake()
    269                         break
    270                     except ssl.SSLError as err:
    271                         if err.args[0] == ssl.SSL_ERROR_WANT_READ:
    272                             select.select([s], [], [], 5.0)
    273                         elif err.args[0] == ssl.SSL_ERROR_WANT_WRITE:
    274                             select.select([], [s], [], 5.0)
    275                         else:
    276                             raise
    277                 # SSL established
    278                 self.assertTrue(s.getpeercert())
    279             finally:
    280                 s.close()
    281 
    282     def test_timeout_connect_ex(self):
    283         # Issue #12065: on a timeout, connect_ex() should return the original
    284         # errno (mimicking the behaviour of non-SSL sockets).
    285         with test_support.transient_internet("svn.python.org"):
    286             s = ssl.wrap_socket(socket.socket(socket.AF_INET),
    287                                 cert_reqs=ssl.CERT_REQUIRED,
    288                                 ca_certs=SVN_PYTHON_ORG_ROOT_CERT,
    289                                 do_handshake_on_connect=False)
    290             try:
    291                 s.settimeout(0.0000001)
    292                 rc = s.connect_ex(('svn.python.org', 443))
    293                 if rc == 0:
    294                     self.skipTest("svn.python.org responded too quickly")
    295                 self.assertIn(rc, (errno.EAGAIN, errno.EWOULDBLOCK))
    296             finally:
    297                 s.close()
    298 
    299     def test_connect_ex_error(self):
    300         with test_support.transient_internet("svn.python.org"):
    301             s = ssl.wrap_socket(socket.socket(socket.AF_INET),
    302                                 cert_reqs=ssl.CERT_REQUIRED,
    303                                 ca_certs=SVN_PYTHON_ORG_ROOT_CERT)
    304             try:
    305                 self.assertEqual(errno.ECONNREFUSED,
    306                                  s.connect_ex(("svn.python.org", 444)))
    307             finally:
    308                 s.close()
    309 
    310     @unittest.skipIf(os.name == "nt", "Can't use a socket as a file under Windows")
    311     def test_makefile_close(self):
    312         # Issue #5238: creating a file-like object with makefile() shouldn't
    313         # delay closing the underlying "real socket" (here tested with its
    314         # file descriptor, hence skipping the test under Windows).
    315         with test_support.transient_internet("svn.python.org"):
    316             ss = ssl.wrap_socket(socket.socket(socket.AF_INET))
    317             ss.connect(("svn.python.org", 443))
    318             fd = ss.fileno()
    319             f = ss.makefile()
    320             f.close()
    321             # The fd is still open
    322             os.read(fd, 0)
    323             # Closing the SSL socket should close the fd too
    324             ss.close()
    325             gc.collect()
    326             with self.assertRaises(OSError) as e:
    327                 os.read(fd, 0)
    328             self.assertEqual(e.exception.errno, errno.EBADF)
    329 
    330     def test_non_blocking_handshake(self):
    331         with test_support.transient_internet("svn.python.org"):
    332             s = socket.socket(socket.AF_INET)
    333             s.connect(("svn.python.org", 443))
    334             s.setblocking(False)
    335             s = ssl.wrap_socket(s,
    336                                 cert_reqs=ssl.CERT_NONE,
    337                                 do_handshake_on_connect=False)
    338             count = 0
    339             while True:
    340                 try:
    341                     count += 1
    342                     s.do_handshake()
    343                     break
    344                 except ssl.SSLError, err:
    345                     if err.args[0] == ssl.SSL_ERROR_WANT_READ:
    346                         select.select([s], [], [])
    347                     elif err.args[0] == ssl.SSL_ERROR_WANT_WRITE:
    348                         select.select([], [s], [])
    349                     else:
    350                         raise
    351             s.close()
    352             if test_support.verbose:
    353                 sys.stdout.write("\nNeeded %d calls to do_handshake() to establish session.\n" % count)
    354 
    355     def test_get_server_certificate(self):
    356         with test_support.transient_internet("svn.python.org"):
    357             pem = ssl.get_server_certificate(("svn.python.org", 443))
    358             if not pem:
    359                 self.fail("No server certificate on svn.python.org:443!")
    360 
    361             try:
    362                 pem = ssl.get_server_certificate(("svn.python.org", 443), ca_certs=CERTFILE)
    363             except ssl.SSLError:
    364                 #should fail
    365                 pass
    366             else:
    367                 self.fail("Got server certificate %s for svn.python.org!" % pem)
    368 
    369             pem = ssl.get_server_certificate(("svn.python.org", 443), ca_certs=SVN_PYTHON_ORG_ROOT_CERT)
    370             if not pem:
    371                 self.fail("No server certificate on svn.python.org:443!")
    372             if test_support.verbose:
    373                 sys.stdout.write("\nVerified certificate for svn.python.org:443 is\n%s\n" % pem)
    374 
    375     def test_algorithms(self):
    376         # Issue #8484: all algorithms should be available when verifying a
    377         # certificate.
    378         # SHA256 was added in OpenSSL 0.9.8
    379         if ssl.OPENSSL_VERSION_INFO < (0, 9, 8, 0, 15):
    380             self.skipTest("SHA256 not available on %r" % ssl.OPENSSL_VERSION)
    381         self.skipTest("remote host needs SNI, only available on Python 3.2+")
    382         # NOTE: https://sha2.hboeck.de is another possible test host
    383         remote = ("sha256.tbs-internet.com", 443)
    384         sha256_cert = os.path.join(os.path.dirname(__file__), "sha256.pem")
    385         with test_support.transient_internet("sha256.tbs-internet.com"):
    386             s = ssl.wrap_socket(socket.socket(socket.AF_INET),
    387                                 cert_reqs=ssl.CERT_REQUIRED,
    388                                 ca_certs=sha256_cert,)
    389             try:
    390                 s.connect(remote)
    391                 if test_support.verbose:
    392                     sys.stdout.write("\nCipher with %r is %r\n" %
    393                                      (remote, s.cipher()))
    394                     sys.stdout.write("Certificate is:\n%s\n" %
    395                                      pprint.pformat(s.getpeercert()))
    396             finally:
    397                 s.close()
    398 
    399 
    400 try:
    401     import threading
    402 except ImportError:
    403     _have_threads = False
    404 else:
    405     _have_threads = True
    406 
    407     class ThreadedEchoServer(threading.Thread):
    408 
    409         class ConnectionHandler(threading.Thread):
    410 
    411             """A mildly complicated class, because we want it to work both
    412             with and without the SSL wrapper around the socket connection, so
    413             that we can test the STARTTLS functionality."""
    414 
    415             def __init__(self, server, connsock):
    416                 self.server = server
    417                 self.running = False
    418                 self.sock = connsock
    419                 self.sock.setblocking(1)
    420                 self.sslconn = None
    421                 threading.Thread.__init__(self)
    422                 self.daemon = True
    423 
    424             def show_conn_details(self):
    425                 if self.server.certreqs == ssl.CERT_REQUIRED:
    426                     cert = self.sslconn.getpeercert()
    427                     if test_support.verbose and self.server.chatty:
    428                         sys.stdout.write(" client cert is " + pprint.pformat(cert) + "\n")
    429                     cert_binary = self.sslconn.getpeercert(True)
    430                     if test_support.verbose and self.server.chatty:
    431                         sys.stdout.write(" cert binary is " + str(len(cert_binary)) + " bytes\n")
    432                 cipher = self.sslconn.cipher()
    433                 if test_support.verbose and self.server.chatty:
    434                     sys.stdout.write(" server: connection cipher is now " + str(cipher) + "\n")
    435 
    436             def wrap_conn(self):
    437                 try:
    438                     self.sslconn = ssl.wrap_socket(self.sock, server_side=True,
    439                                                    certfile=self.server.certificate,
    440                                                    ssl_version=self.server.protocol,
    441                                                    ca_certs=self.server.cacerts,
    442                                                    cert_reqs=self.server.certreqs,
    443                                                    ciphers=self.server.ciphers)
    444                 except ssl.SSLError as e:
    445                     # XXX Various errors can have happened here, for example
    446                     # a mismatching protocol version, an invalid certificate,
    447                     # or a low-level bug. This should be made more discriminating.
    448                     self.server.conn_errors.append(e)
    449                     if self.server.chatty:
    450                         handle_error("\n server:  bad connection attempt from " +
    451                                      str(self.sock.getpeername()) + ":\n")
    452                     self.close()
    453                     self.running = False
    454                     self.server.stop()
    455                     return False
    456                 else:
    457                     return True
    458 
    459             def read(self):
    460                 if self.sslconn:
    461                     return self.sslconn.read()
    462                 else:
    463                     return self.sock.recv(1024)
    464 
    465             def write(self, bytes):
    466                 if self.sslconn:
    467                     return self.sslconn.write(bytes)
    468                 else:
    469                     return self.sock.send(bytes)
    470 
    471             def close(self):
    472                 if self.sslconn:
    473                     self.sslconn.close()
    474                 else:
    475                     self.sock._sock.close()
    476 
    477             def run(self):
    478                 self.running = True
    479                 if not self.server.starttls_server:
    480                     if isinstance(self.sock, ssl.SSLSocket):
    481                         self.sslconn = self.sock
    482                     elif not self.wrap_conn():
    483                         return
    484                     self.show_conn_details()
    485                 while self.running:
    486                     try:
    487                         msg = self.read()
    488                         if not msg:
    489                             # eof, so quit this handler
    490                             self.running = False
    491                             self.close()
    492                         elif msg.strip() == 'over':
    493                             if test_support.verbose and self.server.connectionchatty:
    494                                 sys.stdout.write(" server: client closed connection\n")
    495                             self.close()
    496                             return
    497                         elif self.server.starttls_server and msg.strip() == 'STARTTLS':
    498                             if test_support.verbose and self.server.connectionchatty:
    499                                 sys.stdout.write(" server: read STARTTLS from client, sending OK...\n")
    500                             self.write("OK\n")
    501                             if not self.wrap_conn():
    502                                 return
    503                         elif self.server.starttls_server and self.sslconn and msg.strip() == 'ENDTLS':
    504                             if test_support.verbose and self.server.connectionchatty:
    505                                 sys.stdout.write(" server: read ENDTLS from client, sending OK...\n")
    506                             self.write("OK\n")
    507                             self.sslconn.unwrap()
    508                             self.sslconn = None
    509                             if test_support.verbose and self.server.connectionchatty:
    510                                 sys.stdout.write(" server: connection is now unencrypted...\n")
    511                         else:
    512                             if (test_support.verbose and
    513                                 self.server.connectionchatty):
    514                                 ctype = (self.sslconn and "encrypted") or "unencrypted"
    515                                 sys.stdout.write(" server: read %s (%s), sending back %s (%s)...\n"
    516                                                  % (repr(msg), ctype, repr(msg.lower()), ctype))
    517                             self.write(msg.lower())
    518                     except ssl.SSLError:
    519                         if self.server.chatty:
    520                             handle_error("Test server failure:\n")
    521                         self.close()
    522                         self.running = False
    523                         # normally, we'd just stop here, but for the test
    524                         # harness, we want to stop the server
    525                         self.server.stop()
    526 
    527         def __init__(self, certificate, ssl_version=None,
    528                      certreqs=None, cacerts=None,
    529                      chatty=True, connectionchatty=False, starttls_server=False,
    530                      wrap_accepting_socket=False, ciphers=None):
    531 
    532             if ssl_version is None:
    533                 ssl_version = ssl.PROTOCOL_TLSv1
    534             if certreqs is None:
    535                 certreqs = ssl.CERT_NONE
    536             self.certificate = certificate
    537             self.protocol = ssl_version
    538             self.certreqs = certreqs
    539             self.cacerts = cacerts
    540             self.ciphers = ciphers
    541             self.chatty = chatty
    542             self.connectionchatty = connectionchatty
    543             self.starttls_server = starttls_server
    544             self.sock = socket.socket()
    545             self.flag = None
    546             if wrap_accepting_socket:
    547                 self.sock = ssl.wrap_socket(self.sock, server_side=True,
    548                                             certfile=self.certificate,
    549                                             cert_reqs = self.certreqs,
    550                                             ca_certs = self.cacerts,
    551                                             ssl_version = self.protocol,
    552                                             ciphers = self.ciphers)
    553                 if test_support.verbose and self.chatty:
    554                     sys.stdout.write(' server:  wrapped server socket as %s\n' % str(self.sock))
    555             self.port = test_support.bind_port(self.sock)
    556             self.active = False
    557             self.conn_errors = []
    558             threading.Thread.__init__(self)
    559             self.daemon = True
    560 
    561         def __enter__(self):
    562             self.start(threading.Event())
    563             self.flag.wait()
    564             return self
    565 
    566         def __exit__(self, *args):
    567             self.stop()
    568             self.join()
    569 
    570         def start(self, flag=None):
    571             self.flag = flag
    572             threading.Thread.start(self)
    573 
    574         def run(self):
    575             self.sock.settimeout(0.05)
    576             self.sock.listen(5)
    577             self.active = True
    578             if self.flag:
    579                 # signal an event
    580                 self.flag.set()
    581             while self.active:
    582                 try:
    583                     newconn, connaddr = self.sock.accept()
    584                     if test_support.verbose and self.chatty:
    585                         sys.stdout.write(' server:  new connection from '
    586                                          + str(connaddr) + '\n')
    587                     handler = self.ConnectionHandler(self, newconn)
    588                     handler.start()
    589                     handler.join()
    590                 except socket.timeout:
    591                     pass
    592                 except KeyboardInterrupt:
    593                     self.stop()
    594             self.sock.close()
    595 
    596         def stop(self):
    597             self.active = False
    598 
    599     class AsyncoreEchoServer(threading.Thread):
    600 
    601         class EchoServer(asyncore.dispatcher):
    602 
    603             class ConnectionHandler(asyncore.dispatcher_with_send):
    604 
    605                 def __init__(self, conn, certfile):
    606                     asyncore.dispatcher_with_send.__init__(self, conn)
    607                     self.socket = ssl.wrap_socket(conn, server_side=True,
    608                                                   certfile=certfile,
    609                                                   do_handshake_on_connect=False)
    610                     self._ssl_accepting = True
    611 
    612                 def readable(self):
    613                     if isinstance(self.socket, ssl.SSLSocket):
    614                         while self.socket.pending() > 0:
    615                             self.handle_read_event()
    616                     return True
    617 
    618                 def _do_ssl_handshake(self):
    619                     try:
    620                         self.socket.do_handshake()
    621                     except ssl.SSLError, err:
    622                         if err.args[0] in (ssl.SSL_ERROR_WANT_READ,
    623                                            ssl.SSL_ERROR_WANT_WRITE):
    624                             return
    625                         elif err.args[0] == ssl.SSL_ERROR_EOF:
    626                             return self.handle_close()
    627                         raise
    628                     except socket.error, err:
    629                         if err.args[0] == errno.ECONNABORTED:
    630                             return self.handle_close()
    631                     else:
    632                         self._ssl_accepting = False
    633 
    634                 def handle_read(self):
    635                     if self._ssl_accepting:
    636                         self._do_ssl_handshake()
    637                     else:
    638                         data = self.recv(1024)
    639                         if data and data.strip() != 'over':
    640                             self.send(data.lower())
    641 
    642                 def handle_close(self):
    643                     self.close()
    644                     if test_support.verbose:
    645                         sys.stdout.write(" server:  closed connection %s\n" % self.socket)
    646 
    647                 def handle_error(self):
    648                     raise
    649 
    650             def __init__(self, certfile):
    651                 self.certfile = certfile
    652                 asyncore.dispatcher.__init__(self)
    653                 self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
    654                 self.port = test_support.bind_port(self.socket)
    655                 self.listen(5)
    656 
    657             def handle_accept(self):
    658                 sock_obj, addr = self.accept()
    659                 if test_support.verbose:
    660                     sys.stdout.write(" server:  new connection from %s:%s\n" %addr)
    661                 self.ConnectionHandler(sock_obj, self.certfile)
    662 
    663             def handle_error(self):
    664                 raise
    665 
    666         def __init__(self, certfile):
    667             self.flag = None
    668             self.active = False
    669             self.server = self.EchoServer(certfile)
    670             self.port = self.server.port
    671             threading.Thread.__init__(self)
    672             self.daemon = True
    673 
    674         def __str__(self):
    675             return "<%s %s>" % (self.__class__.__name__, self.server)
    676 
    677         def __enter__(self):
    678             self.start(threading.Event())
    679             self.flag.wait()
    680             return self
    681 
    682         def __exit__(self, *args):
    683             if test_support.verbose:
    684                 sys.stdout.write(" cleanup: stopping server.\n")
    685             self.stop()
    686             if test_support.verbose:
    687                 sys.stdout.write(" cleanup: joining server thread.\n")
    688             self.join()
    689             if test_support.verbose:
    690                 sys.stdout.write(" cleanup: successfully joined.\n")
    691 
    692         def start(self, flag=None):
    693             self.flag = flag
    694             threading.Thread.start(self)
    695 
    696         def run(self):
    697             self.active = True
    698             if self.flag:
    699                 self.flag.set()
    700             while self.active:
    701                 asyncore.loop(0.05)
    702 
    703         def stop(self):
    704             self.active = False
    705             self.server.close()
    706 
    707     class SocketServerHTTPSServer(threading.Thread):
    708 
    709         class HTTPSServer(HTTPServer):
    710 
    711             def __init__(self, server_address, RequestHandlerClass, certfile):
    712                 HTTPServer.__init__(self, server_address, RequestHandlerClass)
    713                 # we assume the certfile contains both private key and certificate
    714                 self.certfile = certfile
    715                 self.allow_reuse_address = True
    716 
    717             def __str__(self):
    718                 return ('<%s %s:%s>' %
    719                         (self.__class__.__name__,
    720                          self.server_name,
    721                          self.server_port))
    722 
    723             def get_request(self):
    724                 # override this to wrap socket with SSL
    725                 sock, addr = self.socket.accept()
    726                 sslconn = ssl.wrap_socket(sock, server_side=True,
    727                                           certfile=self.certfile)
    728                 return sslconn, addr
    729 
    730         class RootedHTTPRequestHandler(SimpleHTTPRequestHandler):
    731             # need to override translate_path to get a known root,
    732             # instead of using os.curdir, since the test could be
    733             # run from anywhere
    734 
    735             server_version = "TestHTTPS/1.0"
    736 
    737             root = None
    738 
    739             def translate_path(self, path):
    740                 """Translate a /-separated PATH to the local filename syntax.
    741 
    742                 Components that mean special things to the local file system
    743                 (e.g. drive or directory names) are ignored.  (XXX They should
    744                 probably be diagnosed.)
    745 
    746                 """
    747                 # abandon query parameters
    748                 path = urlparse.urlparse(path)[2]
    749                 path = os.path.normpath(urllib.unquote(path))
    750                 words = path.split('/')
    751                 words = filter(None, words)
    752                 path = self.root
    753                 for word in words:
    754                     drive, word = os.path.splitdrive(word)
    755                     head, word = os.path.split(word)
    756                     if word in self.root: continue
    757                     path = os.path.join(path, word)
    758                 return path
    759 
    760             def log_message(self, format, *args):
    761 
    762                 # we override this to suppress logging unless "verbose"
    763 
    764                 if test_support.verbose:
    765                     sys.stdout.write(" server (%s:%d %s):\n   [%s] %s\n" %
    766                                      (self.server.server_address,
    767                                       self.server.server_port,
    768                                       self.request.cipher(),
    769                                       self.log_date_time_string(),
    770                                       format%args))
    771 
    772 
    773         def __init__(self, certfile):
    774             self.flag = None
    775             self.RootedHTTPRequestHandler.root = os.path.split(CERTFILE)[0]
    776             self.server = self.HTTPSServer(
    777                 (HOST, 0), self.RootedHTTPRequestHandler, certfile)
    778             self.port = self.server.server_port
    779             threading.Thread.__init__(self)
    780             self.daemon = True
    781 
    782         def __str__(self):
    783             return "<%s %s>" % (self.__class__.__name__, self.server)
    784 
    785         def start(self, flag=None):
    786             self.flag = flag
    787             threading.Thread.start(self)
    788 
    789         def run(self):
    790             if self.flag:
    791                 self.flag.set()
    792             self.server.serve_forever(0.05)
    793 
    794         def stop(self):
    795             self.server.shutdown()
    796 
    797 
    798     def bad_cert_test(certfile):
    799         """
    800         Launch a server with CERT_REQUIRED, and check that trying to
    801         connect to it with the given client certificate fails.
    802         """
    803         server = ThreadedEchoServer(CERTFILE,
    804                                     certreqs=ssl.CERT_REQUIRED,
    805                                     cacerts=CERTFILE, chatty=False)
    806         with server:
    807             try:
    808                 s = ssl.wrap_socket(socket.socket(),
    809                                     certfile=certfile,
    810                                     ssl_version=ssl.PROTOCOL_TLSv1)
    811                 s.connect((HOST, server.port))
    812             except ssl.SSLError, x:
    813                 if test_support.verbose:
    814                     sys.stdout.write("\nSSLError is %s\n" % x[1])
    815             except socket.error, x:
    816                 if test_support.verbose:
    817                     sys.stdout.write("\nsocket.error is %s\n" % x[1])
    818             else:
    819                 raise AssertionError("Use of invalid cert should have failed!")
    820 
    821     def server_params_test(certfile, protocol, certreqs, cacertsfile,
    822                            client_certfile, client_protocol=None, indata="FOO\n",
    823                            ciphers=None, chatty=True, connectionchatty=False,
    824                            wrap_accepting_socket=False):
    825         """
    826         Launch a server, connect a client to it and try various reads
    827         and writes.
    828         """
    829         server = ThreadedEchoServer(certfile,
    830                                     certreqs=certreqs,
    831                                     ssl_version=protocol,
    832                                     cacerts=cacertsfile,
    833                                     ciphers=ciphers,
    834                                     chatty=chatty,
    835                                     connectionchatty=connectionchatty,
    836                                     wrap_accepting_socket=wrap_accepting_socket)
    837         with server:
    838             # try to connect
    839             if client_protocol is None:
    840                 client_protocol = protocol
    841             s = ssl.wrap_socket(socket.socket(),
    842                                 certfile=client_certfile,
    843                                 ca_certs=cacertsfile,
    844                                 ciphers=ciphers,
    845                                 cert_reqs=certreqs,
    846                                 ssl_version=client_protocol)
    847             s.connect((HOST, server.port))
    848             for arg in [indata, bytearray(indata), memoryview(indata)]:
    849                 if connectionchatty:
    850                     if test_support.verbose:
    851                         sys.stdout.write(
    852                             " client:  sending %s...\n" % (repr(arg)))
    853                 s.write(arg)
    854                 outdata = s.read()
    855                 if connectionchatty:
    856                     if test_support.verbose:
    857                         sys.stdout.write(" client:  read %s\n" % repr(outdata))
    858                 if outdata != indata.lower():
    859                     raise AssertionError(
    860                         "bad data <<%s>> (%d) received; expected <<%s>> (%d)\n"
    861                         % (outdata[:min(len(outdata),20)], len(outdata),
    862                            indata[:min(len(indata),20)].lower(), len(indata)))
    863             s.write("over\n")
    864             if connectionchatty:
    865                 if test_support.verbose:
    866                     sys.stdout.write(" client:  closing connection.\n")
    867             s.close()
    868 
    869     def try_protocol_combo(server_protocol,
    870                            client_protocol,
    871                            expect_success,
    872                            certsreqs=None):
    873         if certsreqs is None:
    874             certsreqs = ssl.CERT_NONE
    875         certtype = {
    876             ssl.CERT_NONE: "CERT_NONE",
    877             ssl.CERT_OPTIONAL: "CERT_OPTIONAL",
    878             ssl.CERT_REQUIRED: "CERT_REQUIRED",
    879         }[certsreqs]
    880         if test_support.verbose:
    881             formatstr = (expect_success and " %s->%s %s\n") or " {%s->%s} %s\n"
    882             sys.stdout.write(formatstr %
    883                              (ssl.get_protocol_name(client_protocol),
    884                               ssl.get_protocol_name(server_protocol),
    885                               certtype))
    886         try:
    887             # NOTE: we must enable "ALL" ciphers, otherwise an SSLv23 client
    888             # will send an SSLv3 hello (rather than SSLv2) starting from
    889             # OpenSSL 1.0.0 (see issue #8322).
    890             server_params_test(CERTFILE, server_protocol, certsreqs,
    891                                CERTFILE, CERTFILE, client_protocol,
    892                                ciphers="ALL", chatty=False)
    893         # Protocol mismatch can result in either an SSLError, or a
    894         # "Connection reset by peer" error.
    895         except ssl.SSLError:
    896             if expect_success:
    897                 raise
    898         except socket.error as e:
    899             if expect_success or e.errno != errno.ECONNRESET:
    900                 raise
    901         else:
    902             if not expect_success:
    903                 raise AssertionError(
    904                     "Client protocol %s succeeded with server protocol %s!"
    905                     % (ssl.get_protocol_name(client_protocol),
    906                        ssl.get_protocol_name(server_protocol)))
    907 
    908 
    909     class ThreadedTests(unittest.TestCase):
    910 
    911         def test_rude_shutdown(self):
    912             """A brutal shutdown of an SSL server should raise an IOError
    913             in the client when attempting handshake.
    914             """
    915             listener_ready = threading.Event()
    916             listener_gone = threading.Event()
    917 
    918             s = socket.socket()
    919             port = test_support.bind_port(s, HOST)
    920 
    921             # `listener` runs in a thread.  It sits in an accept() until
    922             # the main thread connects.  Then it rudely closes the socket,
    923             # and sets Event `listener_gone` to let the main thread know
    924             # the socket is gone.
    925             def listener():
    926                 s.listen(5)
    927                 listener_ready.set()
    928                 s.accept()
    929                 s.close()
    930                 listener_gone.set()
    931 
    932             def connector():
    933                 listener_ready.wait()
    934                 c = socket.socket()
    935                 c.connect((HOST, port))
    936                 listener_gone.wait()
    937                 try:
    938                     ssl_sock = ssl.wrap_socket(c)
    939                 except IOError:
    940                     pass
    941                 else:
    942                     self.fail('connecting to closed SSL socket should have failed')
    943 
    944             t = threading.Thread(target=listener)
    945             t.start()
    946             try:
    947                 connector()
    948             finally:
    949                 t.join()
    950 
    951         @skip_if_broken_ubuntu_ssl
    952         def test_echo(self):
    953             """Basic test of an SSL client connecting to a server"""
    954             if test_support.verbose:
    955                 sys.stdout.write("\n")
    956             server_params_test(CERTFILE, ssl.PROTOCOL_TLSv1, ssl.CERT_NONE,
    957                                CERTFILE, CERTFILE, ssl.PROTOCOL_TLSv1,
    958                                chatty=True, connectionchatty=True)
    959 
    960         def test_getpeercert(self):
    961             if test_support.verbose:
    962                 sys.stdout.write("\n")
    963             s2 = socket.socket()
    964             server = ThreadedEchoServer(CERTFILE,
    965                                         certreqs=ssl.CERT_NONE,
    966                                         ssl_version=ssl.PROTOCOL_SSLv23,
    967                                         cacerts=CERTFILE,
    968                                         chatty=False)
    969             with server:
    970                 s = ssl.wrap_socket(socket.socket(),
    971                                     certfile=CERTFILE,
    972                                     ca_certs=CERTFILE,
    973                                     cert_reqs=ssl.CERT_REQUIRED,
    974                                     ssl_version=ssl.PROTOCOL_SSLv23)
    975                 s.connect((HOST, server.port))
    976                 cert = s.getpeercert()
    977                 self.assertTrue(cert, "Can't get peer certificate.")
    978                 cipher = s.cipher()
    979                 if test_support.verbose:
    980                     sys.stdout.write(pprint.pformat(cert) + '\n')
    981                     sys.stdout.write("Connection cipher is " + str(cipher) + '.\n')
    982                 if 'subject' not in cert:
    983                     self.fail("No subject field in certificate: %s." %
    984                               pprint.pformat(cert))
    985                 if ((('organizationName', 'Python Software Foundation'),)
    986                     not in cert['subject']):
    987                     self.fail(
    988                         "Missing or invalid 'organizationName' field in certificate subject; "
    989                         "should be 'Python Software Foundation'.")
    990                 s.close()
    991 
    992         def test_empty_cert(self):
    993             """Connecting with an empty cert file"""
    994             bad_cert_test(os.path.join(os.path.dirname(__file__) or os.curdir,
    995                                       "nullcert.pem"))
    996         def test_malformed_cert(self):
    997             """Connecting with a badly formatted certificate (syntax error)"""
    998             bad_cert_test(os.path.join(os.path.dirname(__file__) or os.curdir,
    999                                        "badcert.pem"))
   1000         def test_nonexisting_cert(self):
   1001             """Connecting with a non-existing cert file"""
   1002             bad_cert_test(os.path.join(os.path.dirname(__file__) or os.curdir,
   1003                                        "wrongcert.pem"))
   1004         def test_malformed_key(self):
   1005             """Connecting with a badly formatted key (syntax error)"""
   1006             bad_cert_test(os.path.join(os.path.dirname(__file__) or os.curdir,
   1007                                        "badkey.pem"))
   1008 
   1009         @skip_if_broken_ubuntu_ssl
   1010         def test_protocol_sslv2(self):
   1011             """Connecting to an SSLv2 server with various client options"""
   1012             if test_support.verbose:
   1013                 sys.stdout.write("\n")
   1014             if not hasattr(ssl, 'PROTOCOL_SSLv2'):
   1015                 self.skipTest("PROTOCOL_SSLv2 needed")
   1016             try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True)
   1017             try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True, ssl.CERT_OPTIONAL)
   1018             try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True, ssl.CERT_REQUIRED)
   1019             try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv23, True)
   1020             try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv3, False)
   1021             try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_TLSv1, False)
   1022 
   1023         @skip_if_broken_ubuntu_ssl
   1024         def test_protocol_sslv23(self):
   1025             """Connecting to an SSLv23 server with various client options"""
   1026             if test_support.verbose:
   1027                 sys.stdout.write("\n")
   1028             try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv3, True)
   1029             try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv23, True)
   1030             try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_TLSv1, True)
   1031 
   1032             try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv3, True, ssl.CERT_OPTIONAL)
   1033             try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv23, True, ssl.CERT_OPTIONAL)
   1034             try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_TLSv1, True, ssl.CERT_OPTIONAL)
   1035 
   1036             try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv3, True, ssl.CERT_REQUIRED)
   1037             try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv23, True, ssl.CERT_REQUIRED)
   1038             try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_TLSv1, True, ssl.CERT_REQUIRED)
   1039 
   1040         @skip_if_broken_ubuntu_ssl
   1041         def test_protocol_sslv3(self):
   1042             """Connecting to an SSLv3 server with various client options"""
   1043             if test_support.verbose:
   1044                 sys.stdout.write("\n")
   1045             try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv3, True)
   1046             try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv3, True, ssl.CERT_OPTIONAL)
   1047             try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv3, True, ssl.CERT_REQUIRED)
   1048             if hasattr(ssl, 'PROTOCOL_SSLv2'):
   1049                 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv2, False)
   1050             try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_TLSv1, False)
   1051 
   1052         @skip_if_broken_ubuntu_ssl
   1053         def test_protocol_tlsv1(self):
   1054             """Connecting to a TLSv1 server with various client options"""
   1055             if test_support.verbose:
   1056                 sys.stdout.write("\n")
   1057             try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1, True)
   1058             try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1, True, ssl.CERT_OPTIONAL)
   1059             try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1, True, ssl.CERT_REQUIRED)
   1060             if hasattr(ssl, 'PROTOCOL_SSLv2'):
   1061                 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_SSLv2, False)
   1062             try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_SSLv3, False)
   1063 
   1064         def test_starttls(self):
   1065             """Switching from clear text to encrypted and back again."""
   1066             msgs = ("msg 1", "MSG 2", "STARTTLS", "MSG 3", "msg 4", "ENDTLS", "msg 5", "msg 6")
   1067 
   1068             server = ThreadedEchoServer(CERTFILE,
   1069                                         ssl_version=ssl.PROTOCOL_TLSv1,
   1070                                         starttls_server=True,
   1071                                         chatty=True,
   1072                                         connectionchatty=True)
   1073             wrapped = False
   1074             with server:
   1075                 s = socket.socket()
   1076                 s.setblocking(1)
   1077                 s.connect((HOST, server.port))
   1078                 if test_support.verbose:
   1079                     sys.stdout.write("\n")
   1080                 for indata in msgs:
   1081                     if test_support.verbose:
   1082                         sys.stdout.write(
   1083                             " client:  sending %s...\n" % repr(indata))
   1084                     if wrapped:
   1085                         conn.write(indata)
   1086                         outdata = conn.read()
   1087                     else:
   1088                         s.send(indata)
   1089                         outdata = s.recv(1024)
   1090                     if (indata == "STARTTLS" and
   1091                         outdata.strip().lower().startswith("ok")):
   1092                         # STARTTLS ok, switch to secure mode
   1093                         if test_support.verbose:
   1094                             sys.stdout.write(
   1095                                 " client:  read %s from server, starting TLS...\n"
   1096                                 % repr(outdata))
   1097                         conn = ssl.wrap_socket(s, ssl_version=ssl.PROTOCOL_TLSv1)
   1098                         wrapped = True
   1099                     elif (indata == "ENDTLS" and
   1100                         outdata.strip().lower().startswith("ok")):
   1101                         # ENDTLS ok, switch back to clear text
   1102                         if test_support.verbose:
   1103                             sys.stdout.write(
   1104                                 " client:  read %s from server, ending TLS...\n"
   1105                                 % repr(outdata))
   1106                         s = conn.unwrap()
   1107                         wrapped = False
   1108                     else:
   1109                         if test_support.verbose:
   1110                             sys.stdout.write(
   1111                                 " client:  read %s from server\n" % repr(outdata))
   1112                 if test_support.verbose:
   1113                     sys.stdout.write(" client:  closing connection.\n")
   1114                 if wrapped:
   1115                     conn.write("over\n")
   1116                 else:
   1117                     s.send("over\n")
   1118                 s.close()
   1119 
   1120         def test_socketserver(self):
   1121             """Using a SocketServer to create and manage SSL connections."""
   1122             server = SocketServerHTTPSServer(CERTFILE)
   1123             flag = threading.Event()
   1124             server.start(flag)
   1125             # wait for it to start
   1126             flag.wait()
   1127             # try to connect
   1128             try:
   1129                 if test_support.verbose:
   1130                     sys.stdout.write('\n')
   1131                 with open(CERTFILE, 'rb') as f:
   1132                     d1 = f.read()
   1133                 d2 = ''
   1134                 # now fetch the same data from the HTTPS server
   1135                 url = 'https://127.0.0.1:%d/%s' % (
   1136                     server.port, os.path.split(CERTFILE)[1])
   1137                 with test_support.check_py3k_warnings():
   1138                     f = urllib.urlopen(url)
   1139                 dlen = f.info().getheader("content-length")
   1140                 if dlen and (int(dlen) > 0):
   1141                     d2 = f.read(int(dlen))
   1142                     if test_support.verbose:
   1143                         sys.stdout.write(
   1144                             " client: read %d bytes from remote server '%s'\n"
   1145                             % (len(d2), server))
   1146                 f.close()
   1147                 self.assertEqual(d1, d2)
   1148             finally:
   1149                 server.stop()
   1150                 server.join()
   1151 
   1152         def test_wrapped_accept(self):
   1153             """Check the accept() method on SSL sockets."""
   1154             if test_support.verbose:
   1155                 sys.stdout.write("\n")
   1156             server_params_test(CERTFILE, ssl.PROTOCOL_SSLv23, ssl.CERT_REQUIRED,
   1157                                CERTFILE, CERTFILE, ssl.PROTOCOL_SSLv23,
   1158                                chatty=True, connectionchatty=True,
   1159                                wrap_accepting_socket=True)
   1160 
   1161         def test_asyncore_server(self):
   1162             """Check the example asyncore integration."""
   1163             indata = "TEST MESSAGE of mixed case\n"
   1164 
   1165             if test_support.verbose:
   1166                 sys.stdout.write("\n")
   1167             server = AsyncoreEchoServer(CERTFILE)
   1168             with server:
   1169                 s = ssl.wrap_socket(socket.socket())
   1170                 s.connect(('127.0.0.1', server.port))
   1171                 if test_support.verbose:
   1172                     sys.stdout.write(
   1173                         " client:  sending %s...\n" % (repr(indata)))
   1174                 s.write(indata)
   1175                 outdata = s.read()
   1176                 if test_support.verbose:
   1177                     sys.stdout.write(" client:  read %s\n" % repr(outdata))
   1178                 if outdata != indata.lower():
   1179                     self.fail(
   1180                         "bad data <<%s>> (%d) received; expected <<%s>> (%d)\n"
   1181                         % (outdata[:min(len(outdata),20)], len(outdata),
   1182                            indata[:min(len(indata),20)].lower(), len(indata)))
   1183                 s.write("over\n")
   1184                 if test_support.verbose:
   1185                     sys.stdout.write(" client:  closing connection.\n")
   1186                 s.close()
   1187 
   1188         def test_recv_send(self):
   1189             """Test recv(), send() and friends."""
   1190             if test_support.verbose:
   1191                 sys.stdout.write("\n")
   1192 
   1193             server = ThreadedEchoServer(CERTFILE,
   1194                                         certreqs=ssl.CERT_NONE,
   1195                                         ssl_version=ssl.PROTOCOL_TLSv1,
   1196                                         cacerts=CERTFILE,
   1197                                         chatty=True,
   1198                                         connectionchatty=False)
   1199             with server:
   1200                 s = ssl.wrap_socket(socket.socket(),
   1201                                     server_side=False,
   1202                                     certfile=CERTFILE,
   1203                                     ca_certs=CERTFILE,
   1204                                     cert_reqs=ssl.CERT_NONE,
   1205                                     ssl_version=ssl.PROTOCOL_TLSv1)
   1206                 s.connect((HOST, server.port))
   1207                 # helper methods for standardising recv* method signatures
   1208                 def _recv_into():
   1209                     b = bytearray("\0"*100)
   1210                     count = s.recv_into(b)
   1211                     return b[:count]
   1212 
   1213                 def _recvfrom_into():
   1214                     b = bytearray("\0"*100)
   1215                     count, addr = s.recvfrom_into(b)
   1216                     return b[:count]
   1217 
   1218                 # (name, method, whether to expect success, *args)
   1219                 send_methods = [
   1220                     ('send', s.send, True, []),
   1221                     ('sendto', s.sendto, False, ["some.address"]),
   1222                     ('sendall', s.sendall, True, []),
   1223                 ]
   1224                 recv_methods = [
   1225                     ('recv', s.recv, True, []),
   1226                     ('recvfrom', s.recvfrom, False, ["some.address"]),
   1227                     ('recv_into', _recv_into, True, []),
   1228                     ('recvfrom_into', _recvfrom_into, False, []),
   1229                 ]
   1230                 data_prefix = u"PREFIX_"
   1231 
   1232                 for meth_name, send_meth, expect_success, args in send_methods:
   1233                     indata = data_prefix + meth_name
   1234                     try:
   1235                         send_meth(indata.encode('ASCII', 'strict'), *args)
   1236                         outdata = s.read()
   1237                         outdata = outdata.decode('ASCII', 'strict')
   1238                         if outdata != indata.lower():
   1239                             self.fail(
   1240                                 "While sending with <<%s>> bad data "
   1241                                 "<<%r>> (%d) received; "
   1242                                 "expected <<%r>> (%d)\n" % (
   1243                                     meth_name, outdata[:20], len(outdata),
   1244                                     indata[:20], len(indata)
   1245                                 )
   1246                             )
   1247                     except ValueError as e:
   1248                         if expect_success:
   1249                             self.fail(
   1250                                 "Failed to send with method <<%s>>; "
   1251                                 "expected to succeed.\n" % (meth_name,)
   1252                             )
   1253                         if not str(e).startswith(meth_name):
   1254                             self.fail(
   1255                                 "Method <<%s>> failed with unexpected "
   1256                                 "exception message: %s\n" % (
   1257                                     meth_name, e
   1258                                 )
   1259                             )
   1260 
   1261                 for meth_name, recv_meth, expect_success, args in recv_methods:
   1262                     indata = data_prefix + meth_name
   1263                     try:
   1264                         s.send(indata.encode('ASCII', 'strict'))
   1265                         outdata = recv_meth(*args)
   1266                         outdata = outdata.decode('ASCII', 'strict')
   1267                         if outdata != indata.lower():
   1268                             self.fail(
   1269                                 "While receiving with <<%s>> bad data "
   1270                                 "<<%r>> (%d) received; "
   1271                                 "expected <<%r>> (%d)\n" % (
   1272                                     meth_name, outdata[:20], len(outdata),
   1273                                     indata[:20], len(indata)
   1274                                 )
   1275                             )
   1276                     except ValueError as e:
   1277                         if expect_success:
   1278                             self.fail(
   1279                                 "Failed to receive with method <<%s>>; "
   1280                                 "expected to succeed.\n" % (meth_name,)
   1281                             )
   1282                         if not str(e).startswith(meth_name):
   1283                             self.fail(
   1284                                 "Method <<%s>> failed with unexpected "
   1285                                 "exception message: %s\n" % (
   1286                                     meth_name, e
   1287                                 )
   1288                             )
   1289                         # consume data
   1290                         s.read()
   1291 
   1292                 s.write("over\n".encode("ASCII", "strict"))
   1293                 s.close()
   1294 
   1295         def test_handshake_timeout(self):
   1296             # Issue #5103: SSL handshake must respect the socket timeout
   1297             server = socket.socket(socket.AF_INET)
   1298             host = "127.0.0.1"
   1299             port = test_support.bind_port(server)
   1300             started = threading.Event()
   1301             finish = False
   1302 
   1303             def serve():
   1304                 server.listen(5)
   1305                 started.set()
   1306                 conns = []
   1307                 while not finish:
   1308                     r, w, e = select.select([server], [], [], 0.1)
   1309                     if server in r:
   1310                         # Let the socket hang around rather than having
   1311                         # it closed by garbage collection.
   1312                         conns.append(server.accept()[0])
   1313 
   1314             t = threading.Thread(target=serve)
   1315             t.start()
   1316             started.wait()
   1317 
   1318             try:
   1319                 try:
   1320                     c = socket.socket(socket.AF_INET)
   1321                     c.settimeout(0.2)
   1322                     c.connect((host, port))
   1323                     # Will attempt handshake and time out
   1324                     self.assertRaisesRegexp(ssl.SSLError, "timed out",
   1325                                             ssl.wrap_socket, c)
   1326                 finally:
   1327                     c.close()
   1328                 try:
   1329                     c = socket.socket(socket.AF_INET)
   1330                     c.settimeout(0.2)
   1331                     c = ssl.wrap_socket(c)
   1332                     # Will attempt handshake and time out
   1333                     self.assertRaisesRegexp(ssl.SSLError, "timed out",
   1334                                             c.connect, (host, port))
   1335                 finally:
   1336                     c.close()
   1337             finally:
   1338                 finish = True
   1339                 t.join()
   1340                 server.close()
   1341 
   1342         def test_default_ciphers(self):
   1343             with ThreadedEchoServer(CERTFILE,
   1344                                     ssl_version=ssl.PROTOCOL_SSLv23,
   1345                                     chatty=False) as server:
   1346                 sock = socket.socket()
   1347                 try:
   1348                     # Force a set of weak ciphers on our client socket
   1349                     try:
   1350                         s = ssl.wrap_socket(sock,
   1351                                             ssl_version=ssl.PROTOCOL_SSLv23,
   1352                                             ciphers="DES")
   1353                     except ssl.SSLError:
   1354                         self.skipTest("no DES cipher available")
   1355                     with self.assertRaises((OSError, ssl.SSLError)):
   1356                         s.connect((HOST, server.port))
   1357                 finally:
   1358                     sock.close()
   1359             self.assertIn("no shared cipher", str(server.conn_errors[0]))
   1360 
   1361 
   1362 def test_main(verbose=False):
   1363     global CERTFILE, SVN_PYTHON_ORG_ROOT_CERT, NOKIACERT
   1364     CERTFILE = os.path.join(os.path.dirname(__file__) or os.curdir,
   1365                             "keycert.pem")
   1366     SVN_PYTHON_ORG_ROOT_CERT = os.path.join(
   1367         os.path.dirname(__file__) or os.curdir,
   1368         "https_svn_python_org_root.pem")
   1369     NOKIACERT = os.path.join(os.path.dirname(__file__) or os.curdir,
   1370                              "nokia.pem")
   1371 
   1372     if (not os.path.exists(CERTFILE) or
   1373         not os.path.exists(SVN_PYTHON_ORG_ROOT_CERT) or
   1374         not os.path.exists(NOKIACERT)):
   1375         raise test_support.TestFailed("Can't read certificate files!")
   1376 
   1377     tests = [BasicTests, BasicSocketTests]
   1378 
   1379     if test_support.is_resource_enabled('network'):
   1380         tests.append(NetworkedTests)
   1381 
   1382     if _have_threads:
   1383         thread_info = test_support.threading_setup()
   1384         if thread_info and test_support.is_resource_enabled('network'):
   1385             tests.append(ThreadedTests)
   1386 
   1387     try:
   1388         test_support.run_unittest(*tests)
   1389     finally:
   1390         if _have_threads:
   1391             test_support.threading_cleanup(*thread_info)
   1392 
   1393 if __name__ == "__main__":
   1394     test_main()
   1395