Home | History | Annotate | Download | only in test
      1 import asyncore
      2 import base64
      3 import email.mime.text
      4 from email.message import EmailMessage
      5 from email.base64mime import body_encode as encode_base64
      6 import email.utils
      7 import hmac
      8 import socket
      9 import smtpd
     10 import smtplib
     11 import io
     12 import re
     13 import sys
     14 import time
     15 import select
     16 import errno
     17 import textwrap
     18 
     19 import unittest
     20 from test import support, mock_socket
     21 
     22 try:
     23     import threading
     24 except ImportError:
     25     threading = None
     26 
     27 HOST = support.HOST
     28 
     29 if sys.platform == 'darwin':
     30     # select.poll returns a select.POLLHUP at the end of the tests
     31     # on darwin, so just ignore it
     32     def handle_expt(self):
     33         pass
     34     smtpd.SMTPChannel.handle_expt = handle_expt
     35 
     36 
     37 def server(evt, buf, serv):
     38     serv.listen()
     39     evt.set()
     40     try:
     41         conn, addr = serv.accept()
     42     except socket.timeout:
     43         pass
     44     else:
     45         n = 500
     46         while buf and n > 0:
     47             r, w, e = select.select([], [conn], [])
     48             if w:
     49                 sent = conn.send(buf)
     50                 buf = buf[sent:]
     51 
     52             n -= 1
     53 
     54         conn.close()
     55     finally:
     56         serv.close()
     57         evt.set()
     58 
     59 class GeneralTests(unittest.TestCase):
     60 
     61     def setUp(self):
     62         smtplib.socket = mock_socket
     63         self.port = 25
     64 
     65     def tearDown(self):
     66         smtplib.socket = socket
     67 
     68     # This method is no longer used but is retained for backward compatibility,
     69     # so test to make sure it still works.
     70     def testQuoteData(self):
     71         teststr  = "abc\n.jkl\rfoo\r\n..blue"
     72         expected = "abc\r\n..jkl\r\nfoo\r\n...blue"
     73         self.assertEqual(expected, smtplib.quotedata(teststr))
     74 
     75     def testBasic1(self):
     76         mock_socket.reply_with(b"220 Hola mundo")
     77         # connects
     78         smtp = smtplib.SMTP(HOST, self.port)
     79         smtp.close()
     80 
     81     def testSourceAddress(self):
     82         mock_socket.reply_with(b"220 Hola mundo")
     83         # connects
     84         smtp = smtplib.SMTP(HOST, self.port,
     85                 source_address=('127.0.0.1',19876))
     86         self.assertEqual(smtp.source_address, ('127.0.0.1', 19876))
     87         smtp.close()
     88 
     89     def testBasic2(self):
     90         mock_socket.reply_with(b"220 Hola mundo")
     91         # connects, include port in host name
     92         smtp = smtplib.SMTP("%s:%s" % (HOST, self.port))
     93         smtp.close()
     94 
     95     def testLocalHostName(self):
     96         mock_socket.reply_with(b"220 Hola mundo")
     97         # check that supplied local_hostname is used
     98         smtp = smtplib.SMTP(HOST, self.port, local_hostname="testhost")
     99         self.assertEqual(smtp.local_hostname, "testhost")
    100         smtp.close()
    101 
    102     def testTimeoutDefault(self):
    103         mock_socket.reply_with(b"220 Hola mundo")
    104         self.assertIsNone(mock_socket.getdefaulttimeout())
    105         mock_socket.setdefaulttimeout(30)
    106         self.assertEqual(mock_socket.getdefaulttimeout(), 30)
    107         try:
    108             smtp = smtplib.SMTP(HOST, self.port)
    109         finally:
    110             mock_socket.setdefaulttimeout(None)
    111         self.assertEqual(smtp.sock.gettimeout(), 30)
    112         smtp.close()
    113 
    114     def testTimeoutNone(self):
    115         mock_socket.reply_with(b"220 Hola mundo")
    116         self.assertIsNone(socket.getdefaulttimeout())
    117         socket.setdefaulttimeout(30)
    118         try:
    119             smtp = smtplib.SMTP(HOST, self.port, timeout=None)
    120         finally:
    121             socket.setdefaulttimeout(None)
    122         self.assertIsNone(smtp.sock.gettimeout())
    123         smtp.close()
    124 
    125     def testTimeoutValue(self):
    126         mock_socket.reply_with(b"220 Hola mundo")
    127         smtp = smtplib.SMTP(HOST, self.port, timeout=30)
    128         self.assertEqual(smtp.sock.gettimeout(), 30)
    129         smtp.close()
    130 
    131     def test_debuglevel(self):
    132         mock_socket.reply_with(b"220 Hello world")
    133         smtp = smtplib.SMTP()
    134         smtp.set_debuglevel(1)
    135         with support.captured_stderr() as stderr:
    136             smtp.connect(HOST, self.port)
    137         smtp.close()
    138         expected = re.compile(r"^connect:", re.MULTILINE)
    139         self.assertRegex(stderr.getvalue(), expected)
    140 
    141     def test_debuglevel_2(self):
    142         mock_socket.reply_with(b"220 Hello world")
    143         smtp = smtplib.SMTP()
    144         smtp.set_debuglevel(2)
    145         with support.captured_stderr() as stderr:
    146             smtp.connect(HOST, self.port)
    147         smtp.close()
    148         expected = re.compile(r"^\d{2}:\d{2}:\d{2}\.\d{6} connect: ",
    149                               re.MULTILINE)
    150         self.assertRegex(stderr.getvalue(), expected)
    151 
    152 
    153 # Test server thread using the specified SMTP server class
    154 def debugging_server(serv, serv_evt, client_evt):
    155     serv_evt.set()
    156 
    157     try:
    158         if hasattr(select, 'poll'):
    159             poll_fun = asyncore.poll2
    160         else:
    161             poll_fun = asyncore.poll
    162 
    163         n = 1000
    164         while asyncore.socket_map and n > 0:
    165             poll_fun(0.01, asyncore.socket_map)
    166 
    167             # when the client conversation is finished, it will
    168             # set client_evt, and it's then ok to kill the server
    169             if client_evt.is_set():
    170                 serv.close()
    171                 break
    172 
    173             n -= 1
    174 
    175     except socket.timeout:
    176         pass
    177     finally:
    178         if not client_evt.is_set():
    179             # allow some time for the client to read the result
    180             time.sleep(0.5)
    181             serv.close()
    182         asyncore.close_all()
    183         serv_evt.set()
    184 
    185 MSG_BEGIN = '---------- MESSAGE FOLLOWS ----------\n'
    186 MSG_END = '------------ END MESSAGE ------------\n'
    187 
    188 # NOTE: Some SMTP objects in the tests below are created with a non-default
    189 # local_hostname argument to the constructor, since (on some systems) the FQDN
    190 # lookup caused by the default local_hostname sometimes takes so long that the
    191 # test server times out, causing the test to fail.
    192 
    193 # Test behavior of smtpd.DebuggingServer
    194 @unittest.skipUnless(threading, 'Threading required for this test.')
    195 class DebuggingServerTests(unittest.TestCase):
    196 
    197     maxDiff = None
    198 
    199     def setUp(self):
    200         self.real_getfqdn = socket.getfqdn
    201         socket.getfqdn = mock_socket.getfqdn
    202         # temporarily replace sys.stdout to capture DebuggingServer output
    203         self.old_stdout = sys.stdout
    204         self.output = io.StringIO()
    205         sys.stdout = self.output
    206 
    207         self.serv_evt = threading.Event()
    208         self.client_evt = threading.Event()
    209         # Capture SMTPChannel debug output
    210         self.old_DEBUGSTREAM = smtpd.DEBUGSTREAM
    211         smtpd.DEBUGSTREAM = io.StringIO()
    212         # Pick a random unused port by passing 0 for the port number
    213         self.serv = smtpd.DebuggingServer((HOST, 0), ('nowhere', -1),
    214                                           decode_data=True)
    215         # Keep a note of what port was assigned
    216         self.port = self.serv.socket.getsockname()[1]
    217         serv_args = (self.serv, self.serv_evt, self.client_evt)
    218         self.thread = threading.Thread(target=debugging_server, args=serv_args)
    219         self.thread.start()
    220 
    221         # wait until server thread has assigned a port number
    222         self.serv_evt.wait()
    223         self.serv_evt.clear()
    224 
    225     def tearDown(self):
    226         socket.getfqdn = self.real_getfqdn
    227         # indicate that the client is finished
    228         self.client_evt.set()
    229         # wait for the server thread to terminate
    230         self.serv_evt.wait()
    231         self.thread.join()
    232         # restore sys.stdout
    233         sys.stdout = self.old_stdout
    234         # restore DEBUGSTREAM
    235         smtpd.DEBUGSTREAM.close()
    236         smtpd.DEBUGSTREAM = self.old_DEBUGSTREAM
    237 
    238     def testBasic(self):
    239         # connect
    240         smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3)
    241         smtp.quit()
    242 
    243     def testSourceAddress(self):
    244         # connect
    245         port = support.find_unused_port()
    246         try:
    247             smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
    248                     timeout=3, source_address=('127.0.0.1', port))
    249             self.assertEqual(smtp.source_address, ('127.0.0.1', port))
    250             self.assertEqual(smtp.local_hostname, 'localhost')
    251             smtp.quit()
    252         except OSError as e:
    253             if e.errno == errno.EADDRINUSE:
    254                 self.skipTest("couldn't bind to port %d" % port)
    255             raise
    256 
    257     def testNOOP(self):
    258         smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3)
    259         expected = (250, b'OK')
    260         self.assertEqual(smtp.noop(), expected)
    261         smtp.quit()
    262 
    263     def testRSET(self):
    264         smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3)
    265         expected = (250, b'OK')
    266         self.assertEqual(smtp.rset(), expected)
    267         smtp.quit()
    268 
    269     def testELHO(self):
    270         # EHLO isn't implemented in DebuggingServer
    271         smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3)
    272         expected = (250, b'\nSIZE 33554432\nHELP')
    273         self.assertEqual(smtp.ehlo(), expected)
    274         smtp.quit()
    275 
    276     def testEXPNNotImplemented(self):
    277         # EXPN isn't implemented in DebuggingServer
    278         smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3)
    279         expected = (502, b'EXPN not implemented')
    280         smtp.putcmd('EXPN')
    281         self.assertEqual(smtp.getreply(), expected)
    282         smtp.quit()
    283 
    284     def testVRFY(self):
    285         smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3)
    286         expected = (252, b'Cannot VRFY user, but will accept message ' + \
    287                          b'and attempt delivery')
    288         self.assertEqual(smtp.vrfy('nobody (at] nowhere.com'), expected)
    289         self.assertEqual(smtp.verify('nobody (at] nowhere.com'), expected)
    290         smtp.quit()
    291 
    292     def testSecondHELO(self):
    293         # check that a second HELO returns a message that it's a duplicate
    294         # (this behavior is specific to smtpd.SMTPChannel)
    295         smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3)
    296         smtp.helo()
    297         expected = (503, b'Duplicate HELO/EHLO')
    298         self.assertEqual(smtp.helo(), expected)
    299         smtp.quit()
    300 
    301     def testHELP(self):
    302         smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3)
    303         self.assertEqual(smtp.help(), b'Supported commands: EHLO HELO MAIL ' + \
    304                                       b'RCPT DATA RSET NOOP QUIT VRFY')
    305         smtp.quit()
    306 
    307     def testSend(self):
    308         # connect and send mail
    309         m = 'A test message'
    310         smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3)
    311         smtp.sendmail('John', 'Sally', m)
    312         # XXX(nnorwitz): this test is flaky and dies with a bad file descriptor
    313         # in asyncore.  This sleep might help, but should really be fixed
    314         # properly by using an Event variable.
    315         time.sleep(0.01)
    316         smtp.quit()
    317 
    318         self.client_evt.set()
    319         self.serv_evt.wait()
    320         self.output.flush()
    321         mexpect = '%s%s\n%s' % (MSG_BEGIN, m, MSG_END)
    322         self.assertEqual(self.output.getvalue(), mexpect)
    323 
    324     def testSendBinary(self):
    325         m = b'A test message'
    326         smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3)
    327         smtp.sendmail('John', 'Sally', m)
    328         # XXX (see comment in testSend)
    329         time.sleep(0.01)
    330         smtp.quit()
    331 
    332         self.client_evt.set()
    333         self.serv_evt.wait()
    334         self.output.flush()
    335         mexpect = '%s%s\n%s' % (MSG_BEGIN, m.decode('ascii'), MSG_END)
    336         self.assertEqual(self.output.getvalue(), mexpect)
    337 
    338     def testSendNeedingDotQuote(self):
    339         # Issue 12283
    340         m = '.A test\n.mes.sage.'
    341         smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3)
    342         smtp.sendmail('John', 'Sally', m)
    343         # XXX (see comment in testSend)
    344         time.sleep(0.01)
    345         smtp.quit()
    346 
    347         self.client_evt.set()
    348         self.serv_evt.wait()
    349         self.output.flush()
    350         mexpect = '%s%s\n%s' % (MSG_BEGIN, m, MSG_END)
    351         self.assertEqual(self.output.getvalue(), mexpect)
    352 
    353     def testSendNullSender(self):
    354         m = 'A test message'
    355         smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3)
    356         smtp.sendmail('<>', 'Sally', m)
    357         # XXX (see comment in testSend)
    358         time.sleep(0.01)
    359         smtp.quit()
    360 
    361         self.client_evt.set()
    362         self.serv_evt.wait()
    363         self.output.flush()
    364         mexpect = '%s%s\n%s' % (MSG_BEGIN, m, MSG_END)
    365         self.assertEqual(self.output.getvalue(), mexpect)
    366         debugout = smtpd.DEBUGSTREAM.getvalue()
    367         sender = re.compile("^sender: <>$", re.MULTILINE)
    368         self.assertRegex(debugout, sender)
    369 
    370     def testSendMessage(self):
    371         m = email.mime.text.MIMEText('A test message')
    372         smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3)
    373         smtp.send_message(m, from_addr='John', to_addrs='Sally')
    374         # XXX (see comment in testSend)
    375         time.sleep(0.01)
    376         smtp.quit()
    377 
    378         self.client_evt.set()
    379         self.serv_evt.wait()
    380         self.output.flush()
    381         # Add the X-Peer header that DebuggingServer adds
    382         m['X-Peer'] = socket.gethostbyname('localhost')
    383         mexpect = '%s%s\n%s' % (MSG_BEGIN, m.as_string(), MSG_END)
    384         self.assertEqual(self.output.getvalue(), mexpect)
    385 
    386     def testSendMessageWithAddresses(self):
    387         m = email.mime.text.MIMEText('A test message')
    388         m['From'] = 'foo (at] bar.com'
    389         m['To'] = 'John'
    390         m['CC'] = 'Sally, Fred'
    391         m['Bcc'] = 'John Root <root@localhost>, "Dinsdale" <warped (at] silly.walks.com>'
    392         smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3)
    393         smtp.send_message(m)
    394         # XXX (see comment in testSend)
    395         time.sleep(0.01)
    396         smtp.quit()
    397         # make sure the Bcc header is still in the message.
    398         self.assertEqual(m['Bcc'], 'John Root <root@localhost>, "Dinsdale" '
    399                                     '<warped (at] silly.walks.com>')
    400 
    401         self.client_evt.set()
    402         self.serv_evt.wait()
    403         self.output.flush()
    404         # Add the X-Peer header that DebuggingServer adds
    405         m['X-Peer'] = socket.gethostbyname('localhost')
    406         # The Bcc header should not be transmitted.
    407         del m['Bcc']
    408         mexpect = '%s%s\n%s' % (MSG_BEGIN, m.as_string(), MSG_END)
    409         self.assertEqual(self.output.getvalue(), mexpect)
    410         debugout = smtpd.DEBUGSTREAM.getvalue()
    411         sender = re.compile("^sender: foo (at] bar.com$", re.MULTILINE)
    412         self.assertRegex(debugout, sender)
    413         for addr in ('John', 'Sally', 'Fred', 'root@localhost',
    414                      'warped (at] silly.walks.com'):
    415             to_addr = re.compile(r"^recips: .*'{}'.*$".format(addr),
    416                                  re.MULTILINE)
    417             self.assertRegex(debugout, to_addr)
    418 
    419     def testSendMessageWithSomeAddresses(self):
    420         # Make sure nothing breaks if not all of the three 'to' headers exist
    421         m = email.mime.text.MIMEText('A test message')
    422         m['From'] = 'foo (at] bar.com'
    423         m['To'] = 'John, Dinsdale'
    424         smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3)
    425         smtp.send_message(m)
    426         # XXX (see comment in testSend)
    427         time.sleep(0.01)
    428         smtp.quit()
    429 
    430         self.client_evt.set()
    431         self.serv_evt.wait()
    432         self.output.flush()
    433         # Add the X-Peer header that DebuggingServer adds
    434         m['X-Peer'] = socket.gethostbyname('localhost')
    435         mexpect = '%s%s\n%s' % (MSG_BEGIN, m.as_string(), MSG_END)
    436         self.assertEqual(self.output.getvalue(), mexpect)
    437         debugout = smtpd.DEBUGSTREAM.getvalue()
    438         sender = re.compile("^sender: foo (at] bar.com$", re.MULTILINE)
    439         self.assertRegex(debugout, sender)
    440         for addr in ('John', 'Dinsdale'):
    441             to_addr = re.compile(r"^recips: .*'{}'.*$".format(addr),
    442                                  re.MULTILINE)
    443             self.assertRegex(debugout, to_addr)
    444 
    445     def testSendMessageWithSpecifiedAddresses(self):
    446         # Make sure addresses specified in call override those in message.
    447         m = email.mime.text.MIMEText('A test message')
    448         m['From'] = 'foo (at] bar.com'
    449         m['To'] = 'John, Dinsdale'
    450         smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3)
    451         smtp.send_message(m, from_addr='joe (at] example.com', to_addrs='foo (at] example.net')
    452         # XXX (see comment in testSend)
    453         time.sleep(0.01)
    454         smtp.quit()
    455 
    456         self.client_evt.set()
    457         self.serv_evt.wait()
    458         self.output.flush()
    459         # Add the X-Peer header that DebuggingServer adds
    460         m['X-Peer'] = socket.gethostbyname('localhost')
    461         mexpect = '%s%s\n%s' % (MSG_BEGIN, m.as_string(), MSG_END)
    462         self.assertEqual(self.output.getvalue(), mexpect)
    463         debugout = smtpd.DEBUGSTREAM.getvalue()
    464         sender = re.compile("^sender: joe (at] example.com$", re.MULTILINE)
    465         self.assertRegex(debugout, sender)
    466         for addr in ('John', 'Dinsdale'):
    467             to_addr = re.compile(r"^recips: .*'{}'.*$".format(addr),
    468                                  re.MULTILINE)
    469             self.assertNotRegex(debugout, to_addr)
    470         recip = re.compile(r"^recips: .*'foo (at] example.net'.*$", re.MULTILINE)
    471         self.assertRegex(debugout, recip)
    472 
    473     def testSendMessageWithMultipleFrom(self):
    474         # Sender overrides To
    475         m = email.mime.text.MIMEText('A test message')
    476         m['From'] = 'Bernard, Bianca'
    477         m['Sender'] = 'the_rescuers (at] Rescue-Aid-Society.com'
    478         m['To'] = 'John, Dinsdale'
    479         smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3)
    480         smtp.send_message(m)
    481         # XXX (see comment in testSend)
    482         time.sleep(0.01)
    483         smtp.quit()
    484 
    485         self.client_evt.set()
    486         self.serv_evt.wait()
    487         self.output.flush()
    488         # Add the X-Peer header that DebuggingServer adds
    489         m['X-Peer'] = socket.gethostbyname('localhost')
    490         mexpect = '%s%s\n%s' % (MSG_BEGIN, m.as_string(), MSG_END)
    491         self.assertEqual(self.output.getvalue(), mexpect)
    492         debugout = smtpd.DEBUGSTREAM.getvalue()
    493         sender = re.compile("^sender: the_rescuers (at] Rescue-Aid-Society.com$", re.MULTILINE)
    494         self.assertRegex(debugout, sender)
    495         for addr in ('John', 'Dinsdale'):
    496             to_addr = re.compile(r"^recips: .*'{}'.*$".format(addr),
    497                                  re.MULTILINE)
    498             self.assertRegex(debugout, to_addr)
    499 
    500     def testSendMessageResent(self):
    501         m = email.mime.text.MIMEText('A test message')
    502         m['From'] = 'foo (at] bar.com'
    503         m['To'] = 'John'
    504         m['CC'] = 'Sally, Fred'
    505         m['Bcc'] = 'John Root <root@localhost>, "Dinsdale" <warped (at] silly.walks.com>'
    506         m['Resent-Date'] = 'Thu, 1 Jan 1970 17:42:00 +0000'
    507         m['Resent-From'] = 'holy (at] grail.net'
    508         m['Resent-To'] = 'Martha <my_mom (at] great.cooker.com>, Jeff'
    509         m['Resent-Bcc'] = 'doe (at] losthope.net'
    510         smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3)
    511         smtp.send_message(m)
    512         # XXX (see comment in testSend)
    513         time.sleep(0.01)
    514         smtp.quit()
    515 
    516         self.client_evt.set()
    517         self.serv_evt.wait()
    518         self.output.flush()
    519         # The Resent-Bcc headers are deleted before serialization.
    520         del m['Bcc']
    521         del m['Resent-Bcc']
    522         # Add the X-Peer header that DebuggingServer adds
    523         m['X-Peer'] = socket.gethostbyname('localhost')
    524         mexpect = '%s%s\n%s' % (MSG_BEGIN, m.as_string(), MSG_END)
    525         self.assertEqual(self.output.getvalue(), mexpect)
    526         debugout = smtpd.DEBUGSTREAM.getvalue()
    527         sender = re.compile("^sender: holy (at] grail.net$", re.MULTILINE)
    528         self.assertRegex(debugout, sender)
    529         for addr in ('my_mom (at] great.cooker.com', 'Jeff', 'doe (at] losthope.net'):
    530             to_addr = re.compile(r"^recips: .*'{}'.*$".format(addr),
    531                                  re.MULTILINE)
    532             self.assertRegex(debugout, to_addr)
    533 
    534     def testSendMessageMultipleResentRaises(self):
    535         m = email.mime.text.MIMEText('A test message')
    536         m['From'] = 'foo (at] bar.com'
    537         m['To'] = 'John'
    538         m['CC'] = 'Sally, Fred'
    539         m['Bcc'] = 'John Root <root@localhost>, "Dinsdale" <warped (at] silly.walks.com>'
    540         m['Resent-Date'] = 'Thu, 1 Jan 1970 17:42:00 +0000'
    541         m['Resent-From'] = 'holy (at] grail.net'
    542         m['Resent-To'] = 'Martha <my_mom (at] great.cooker.com>, Jeff'
    543         m['Resent-Bcc'] = 'doe (at] losthope.net'
    544         m['Resent-Date'] = 'Thu, 2 Jan 1970 17:42:00 +0000'
    545         m['Resent-To'] = 'holy (at] grail.net'
    546         m['Resent-From'] = 'Martha <my_mom (at] great.cooker.com>, Jeff'
    547         smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3)
    548         with self.assertRaises(ValueError):
    549             smtp.send_message(m)
    550         smtp.close()
    551 
    552 class NonConnectingTests(unittest.TestCase):
    553 
    554     def testNotConnected(self):
    555         # Test various operations on an unconnected SMTP object that
    556         # should raise exceptions (at present the attempt in SMTP.send
    557         # to reference the nonexistent 'sock' attribute of the SMTP object
    558         # causes an AttributeError)
    559         smtp = smtplib.SMTP()
    560         self.assertRaises(smtplib.SMTPServerDisconnected, smtp.ehlo)
    561         self.assertRaises(smtplib.SMTPServerDisconnected,
    562                           smtp.send, 'test msg')
    563 
    564     def testNonnumericPort(self):
    565         # check that non-numeric port raises OSError
    566         self.assertRaises(OSError, smtplib.SMTP,
    567                           "localhost", "bogus")
    568         self.assertRaises(OSError, smtplib.SMTP,
    569                           "localhost:bogus")
    570 
    571 
    572 # test response of client to a non-successful HELO message
    573 @unittest.skipUnless(threading, 'Threading required for this test.')
    574 class BadHELOServerTests(unittest.TestCase):
    575 
    576     def setUp(self):
    577         smtplib.socket = mock_socket
    578         mock_socket.reply_with(b"199 no hello for you!")
    579         self.old_stdout = sys.stdout
    580         self.output = io.StringIO()
    581         sys.stdout = self.output
    582         self.port = 25
    583 
    584     def tearDown(self):
    585         smtplib.socket = socket
    586         sys.stdout = self.old_stdout
    587 
    588     def testFailingHELO(self):
    589         self.assertRaises(smtplib.SMTPConnectError, smtplib.SMTP,
    590                             HOST, self.port, 'localhost', 3)
    591 
    592 
    593 @unittest.skipUnless(threading, 'Threading required for this test.')
    594 class TooLongLineTests(unittest.TestCase):
    595     respdata = b'250 OK' + (b'.' * smtplib._MAXLINE * 2) + b'\n'
    596 
    597     def setUp(self):
    598         self.old_stdout = sys.stdout
    599         self.output = io.StringIO()
    600         sys.stdout = self.output
    601 
    602         self.evt = threading.Event()
    603         self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    604         self.sock.settimeout(15)
    605         self.port = support.bind_port(self.sock)
    606         servargs = (self.evt, self.respdata, self.sock)
    607         threading.Thread(target=server, args=servargs).start()
    608         self.evt.wait()
    609         self.evt.clear()
    610 
    611     def tearDown(self):
    612         self.evt.wait()
    613         sys.stdout = self.old_stdout
    614 
    615     def testLineTooLong(self):
    616         self.assertRaises(smtplib.SMTPResponseException, smtplib.SMTP,
    617                           HOST, self.port, 'localhost', 3)
    618 
    619 
    620 sim_users = {'Mr.A (at] somewhere.com':'John A',
    621              'Ms.B (at] xn--fo-fka.com':'Sally B',
    622              'Mrs.C (at] somewhereesle.com':'Ruth C',
    623             }
    624 
    625 sim_auth = ('Mr.A (at] somewhere.com', 'somepassword')
    626 sim_cram_md5_challenge = ('PENCeUxFREJoU0NnbmhNWitOMjNGNn'
    627                           'dAZWx3b29kLmlubm9zb2Z0LmNvbT4=')
    628 sim_lists = {'list-1':['Mr.A (at] somewhere.com','Mrs.C (at] somewhereesle.com'],
    629              'list-2':['Ms.B (at] xn--fo-fka.com',],
    630             }
    631 
    632 # Simulated SMTP channel & server
    633 class ResponseException(Exception): pass
    634 class SimSMTPChannel(smtpd.SMTPChannel):
    635 
    636     quit_response = None
    637     mail_response = None
    638     rcpt_response = None
    639     data_response = None
    640     rcpt_count = 0
    641     rset_count = 0
    642     disconnect = 0
    643     AUTH = 99    # Add protocol state to enable auth testing.
    644     authenticated_user = None
    645 
    646     def __init__(self, extra_features, *args, **kw):
    647         self._extrafeatures = ''.join(
    648             [ "250-{0}\r\n".format(x) for x in extra_features ])
    649         super(SimSMTPChannel, self).__init__(*args, **kw)
    650 
    651     # AUTH related stuff.  It would be nice if support for this were in smtpd.
    652     def found_terminator(self):
    653         if self.smtp_state == self.AUTH:
    654             line = self._emptystring.join(self.received_lines)
    655             print('Data:', repr(line), file=smtpd.DEBUGSTREAM)
    656             self.received_lines = []
    657             try:
    658                 self.auth_object(line)
    659             except ResponseException as e:
    660                 self.smtp_state = self.COMMAND
    661                 self.push('%s %s' % (e.smtp_code, e.smtp_error))
    662                 return
    663         super().found_terminator()
    664 
    665 
    666     def smtp_AUTH(self, arg):
    667         if not self.seen_greeting:
    668             self.push('503 Error: send EHLO first')
    669             return
    670         if not self.extended_smtp or 'AUTH' not in self._extrafeatures:
    671             self.push('500 Error: command "AUTH" not recognized')
    672             return
    673         if self.authenticated_user is not None:
    674             self.push(
    675                 '503 Bad sequence of commands: already authenticated')
    676             return
    677         args = arg.split()
    678         if len(args) not in [1, 2]:
    679             self.push('501 Syntax: AUTH <mechanism> [initial-response]')
    680             return
    681         auth_object_name = '_auth_%s' % args[0].lower().replace('-', '_')
    682         try:
    683             self.auth_object = getattr(self, auth_object_name)
    684         except AttributeError:
    685             self.push('504 Command parameter not implemented: unsupported '
    686                       ' authentication mechanism {!r}'.format(auth_object_name))
    687             return
    688         self.smtp_state = self.AUTH
    689         self.auth_object(args[1] if len(args) == 2 else None)
    690 
    691     def _authenticated(self, user, valid):
    692         if valid:
    693             self.authenticated_user = user
    694             self.push('235 Authentication Succeeded')
    695         else:
    696             self.push('535 Authentication credentials invalid')
    697         self.smtp_state = self.COMMAND
    698 
    699     def _decode_base64(self, string):
    700         return base64.decodebytes(string.encode('ascii')).decode('utf-8')
    701 
    702     def _auth_plain(self, arg=None):
    703         if arg is None:
    704             self.push('334 ')
    705         else:
    706             logpass = self._decode_base64(arg)
    707             try:
    708                 *_, user, password = logpass.split('\0')
    709             except ValueError as e:
    710                 self.push('535 Splitting response {!r} into user and password'
    711                           ' failed: {}'.format(logpass, e))
    712                 return
    713             self._authenticated(user, password == sim_auth[1])
    714 
    715     def _auth_login(self, arg=None):
    716         if arg is None:
    717             # base64 encoded 'Username:'
    718             self.push('334 VXNlcm5hbWU6')
    719         elif not hasattr(self, '_auth_login_user'):
    720             self._auth_login_user = self._decode_base64(arg)
    721             # base64 encoded 'Password:'
    722             self.push('334 UGFzc3dvcmQ6')
    723         else:
    724             password = self._decode_base64(arg)
    725             self._authenticated(self._auth_login_user, password == sim_auth[1])
    726             del self._auth_login_user
    727 
    728     def _auth_cram_md5(self, arg=None):
    729         if arg is None:
    730             self.push('334 {}'.format(sim_cram_md5_challenge))
    731         else:
    732             logpass = self._decode_base64(arg)
    733             try:
    734                 user, hashed_pass = logpass.split()
    735             except ValueError as e:
    736                 self.push('535 Splitting response {!r} into user and password'
    737                           'failed: {}'.format(logpass, e))
    738                 return False
    739             valid_hashed_pass = hmac.HMAC(
    740                 sim_auth[1].encode('ascii'),
    741                 self._decode_base64(sim_cram_md5_challenge).encode('ascii'),
    742                 'md5').hexdigest()
    743             self._authenticated(user, hashed_pass == valid_hashed_pass)
    744     # end AUTH related stuff.
    745 
    746     def smtp_EHLO(self, arg):
    747         resp = ('250-testhost\r\n'
    748                 '250-EXPN\r\n'
    749                 '250-SIZE 20000000\r\n'
    750                 '250-STARTTLS\r\n'
    751                 '250-DELIVERBY\r\n')
    752         resp = resp + self._extrafeatures + '250 HELP'
    753         self.push(resp)
    754         self.seen_greeting = arg
    755         self.extended_smtp = True
    756 
    757     def smtp_VRFY(self, arg):
    758         # For max compatibility smtplib should be sending the raw address.
    759         if arg in sim_users:
    760             self.push('250 %s %s' % (sim_users[arg], smtplib.quoteaddr(arg)))
    761         else:
    762             self.push('550 No such user: %s' % arg)
    763 
    764     def smtp_EXPN(self, arg):
    765         list_name = arg.lower()
    766         if list_name in sim_lists:
    767             user_list = sim_lists[list_name]
    768             for n, user_email in enumerate(user_list):
    769                 quoted_addr = smtplib.quoteaddr(user_email)
    770                 if n < len(user_list) - 1:
    771                     self.push('250-%s %s' % (sim_users[user_email], quoted_addr))
    772                 else:
    773                     self.push('250 %s %s' % (sim_users[user_email], quoted_addr))
    774         else:
    775             self.push('550 No access for you!')
    776 
    777     def smtp_QUIT(self, arg):
    778         if self.quit_response is None:
    779             super(SimSMTPChannel, self).smtp_QUIT(arg)
    780         else:
    781             self.push(self.quit_response)
    782             self.close_when_done()
    783 
    784     def smtp_MAIL(self, arg):
    785         if self.mail_response is None:
    786             super().smtp_MAIL(arg)
    787         else:
    788             self.push(self.mail_response)
    789             if self.disconnect:
    790                 self.close_when_done()
    791 
    792     def smtp_RCPT(self, arg):
    793         if self.rcpt_response is None:
    794             super().smtp_RCPT(arg)
    795             return
    796         self.rcpt_count += 1
    797         self.push(self.rcpt_response[self.rcpt_count-1])
    798 
    799     def smtp_RSET(self, arg):
    800         self.rset_count += 1
    801         super().smtp_RSET(arg)
    802 
    803     def smtp_DATA(self, arg):
    804         if self.data_response is None:
    805             super().smtp_DATA(arg)
    806         else:
    807             self.push(self.data_response)
    808 
    809     def handle_error(self):
    810         raise
    811 
    812 
    813 class SimSMTPServer(smtpd.SMTPServer):
    814 
    815     channel_class = SimSMTPChannel
    816 
    817     def __init__(self, *args, **kw):
    818         self._extra_features = []
    819         smtpd.SMTPServer.__init__(self, *args, **kw)
    820 
    821     def handle_accepted(self, conn, addr):
    822         self._SMTPchannel = self.channel_class(
    823             self._extra_features, self, conn, addr,
    824             decode_data=self._decode_data)
    825 
    826     def process_message(self, peer, mailfrom, rcpttos, data):
    827         pass
    828 
    829     def add_feature(self, feature):
    830         self._extra_features.append(feature)
    831 
    832     def handle_error(self):
    833         raise
    834 
    835 
    836 # Test various SMTP & ESMTP commands/behaviors that require a simulated server
    837 # (i.e., something with more features than DebuggingServer)
    838 @unittest.skipUnless(threading, 'Threading required for this test.')
    839 class SMTPSimTests(unittest.TestCase):
    840 
    841     def setUp(self):
    842         self.real_getfqdn = socket.getfqdn
    843         socket.getfqdn = mock_socket.getfqdn
    844         self.serv_evt = threading.Event()
    845         self.client_evt = threading.Event()
    846         # Pick a random unused port by passing 0 for the port number
    847         self.serv = SimSMTPServer((HOST, 0), ('nowhere', -1), decode_data=True)
    848         # Keep a note of what port was assigned
    849         self.port = self.serv.socket.getsockname()[1]
    850         serv_args = (self.serv, self.serv_evt, self.client_evt)
    851         self.thread = threading.Thread(target=debugging_server, args=serv_args)
    852         self.thread.start()
    853 
    854         # wait until server thread has assigned a port number
    855         self.serv_evt.wait()
    856         self.serv_evt.clear()
    857 
    858     def tearDown(self):
    859         socket.getfqdn = self.real_getfqdn
    860         # indicate that the client is finished
    861         self.client_evt.set()
    862         # wait for the server thread to terminate
    863         self.serv_evt.wait()
    864         self.thread.join()
    865 
    866     def testBasic(self):
    867         # smoke test
    868         smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15)
    869         smtp.quit()
    870 
    871     def testEHLO(self):
    872         smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15)
    873 
    874         # no features should be present before the EHLO
    875         self.assertEqual(smtp.esmtp_features, {})
    876 
    877         # features expected from the test server
    878         expected_features = {'expn':'',
    879                              'size': '20000000',
    880                              'starttls': '',
    881                              'deliverby': '',
    882                              'help': '',
    883                              }
    884 
    885         smtp.ehlo()
    886         self.assertEqual(smtp.esmtp_features, expected_features)
    887         for k in expected_features:
    888             self.assertTrue(smtp.has_extn(k))
    889         self.assertFalse(smtp.has_extn('unsupported-feature'))
    890         smtp.quit()
    891 
    892     def testVRFY(self):
    893         smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15)
    894 
    895         for addr_spec, name in sim_users.items():
    896             expected_known = (250, bytes('%s %s' %
    897                                          (name, smtplib.quoteaddr(addr_spec)),
    898                                          "ascii"))
    899             self.assertEqual(smtp.vrfy(addr_spec), expected_known)
    900 
    901         u = 'nobody (at] nowhere.com'
    902         expected_unknown = (550, ('No such user: %s' % u).encode('ascii'))
    903         self.assertEqual(smtp.vrfy(u), expected_unknown)
    904         smtp.quit()
    905 
    906     def testEXPN(self):
    907         smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15)
    908 
    909         for listname, members in sim_lists.items():
    910             users = []
    911             for m in members:
    912                 users.append('%s %s' % (sim_users[m], smtplib.quoteaddr(m)))
    913             expected_known = (250, bytes('\n'.join(users), "ascii"))
    914             self.assertEqual(smtp.expn(listname), expected_known)
    915 
    916         u = 'PSU-Members-List'
    917         expected_unknown = (550, b'No access for you!')
    918         self.assertEqual(smtp.expn(u), expected_unknown)
    919         smtp.quit()
    920 
    921     def testAUTH_PLAIN(self):
    922         self.serv.add_feature("AUTH PLAIN")
    923         smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15)
    924         resp = smtp.login(sim_auth[0], sim_auth[1])
    925         self.assertEqual(resp, (235, b'Authentication Succeeded'))
    926         smtp.close()
    927 
    928     def testAUTH_LOGIN(self):
    929         self.serv.add_feature("AUTH LOGIN")
    930         smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15)
    931         resp = smtp.login(sim_auth[0], sim_auth[1])
    932         self.assertEqual(resp, (235, b'Authentication Succeeded'))
    933         smtp.close()
    934 
    935     def testAUTH_CRAM_MD5(self):
    936         self.serv.add_feature("AUTH CRAM-MD5")
    937         smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15)
    938         resp = smtp.login(sim_auth[0], sim_auth[1])
    939         self.assertEqual(resp, (235, b'Authentication Succeeded'))
    940         smtp.close()
    941 
    942     def testAUTH_multiple(self):
    943         # Test that multiple authentication methods are tried.
    944         self.serv.add_feature("AUTH BOGUS PLAIN LOGIN CRAM-MD5")
    945         smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15)
    946         resp = smtp.login(sim_auth[0], sim_auth[1])
    947         self.assertEqual(resp, (235, b'Authentication Succeeded'))
    948         smtp.close()
    949 
    950     def test_auth_function(self):
    951         supported = {'CRAM-MD5', 'PLAIN', 'LOGIN'}
    952         for mechanism in supported:
    953             self.serv.add_feature("AUTH {}".format(mechanism))
    954         for mechanism in supported:
    955             with self.subTest(mechanism=mechanism):
    956                 smtp = smtplib.SMTP(HOST, self.port,
    957                                     local_hostname='localhost', timeout=15)
    958                 smtp.ehlo('foo')
    959                 smtp.user, smtp.password = sim_auth[0], sim_auth[1]
    960                 method = 'auth_' + mechanism.lower().replace('-', '_')
    961                 resp = smtp.auth(mechanism, getattr(smtp, method))
    962                 self.assertEqual(resp, (235, b'Authentication Succeeded'))
    963                 smtp.close()
    964 
    965     def test_quit_resets_greeting(self):
    966         smtp = smtplib.SMTP(HOST, self.port,
    967                             local_hostname='localhost',
    968                             timeout=15)
    969         code, message = smtp.ehlo()
    970         self.assertEqual(code, 250)
    971         self.assertIn('size', smtp.esmtp_features)
    972         smtp.quit()
    973         self.assertNotIn('size', smtp.esmtp_features)
    974         smtp.connect(HOST, self.port)
    975         self.assertNotIn('size', smtp.esmtp_features)
    976         smtp.ehlo_or_helo_if_needed()
    977         self.assertIn('size', smtp.esmtp_features)
    978         smtp.quit()
    979 
    980     def test_with_statement(self):
    981         with smtplib.SMTP(HOST, self.port) as smtp:
    982             code, message = smtp.noop()
    983             self.assertEqual(code, 250)
    984         self.assertRaises(smtplib.SMTPServerDisconnected, smtp.send, b'foo')
    985         with smtplib.SMTP(HOST, self.port) as smtp:
    986             smtp.close()
    987         self.assertRaises(smtplib.SMTPServerDisconnected, smtp.send, b'foo')
    988 
    989     def test_with_statement_QUIT_failure(self):
    990         with self.assertRaises(smtplib.SMTPResponseException) as error:
    991             with smtplib.SMTP(HOST, self.port) as smtp:
    992                 smtp.noop()
    993                 self.serv._SMTPchannel.quit_response = '421 QUIT FAILED'
    994         self.assertEqual(error.exception.smtp_code, 421)
    995         self.assertEqual(error.exception.smtp_error, b'QUIT FAILED')
    996 
    997     #TODO: add tests for correct AUTH method fallback now that the
    998     #test infrastructure can support it.
    999 
   1000     # Issue 17498: make sure _rset does not raise SMTPServerDisconnected exception
   1001     def test__rest_from_mail_cmd(self):
   1002         smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15)
   1003         smtp.noop()
   1004         self.serv._SMTPchannel.mail_response = '451 Requested action aborted'
   1005         self.serv._SMTPchannel.disconnect = True
   1006         with self.assertRaises(smtplib.SMTPSenderRefused):
   1007             smtp.sendmail('John', 'Sally', 'test message')
   1008         self.assertIsNone(smtp.sock)
   1009 
   1010     # Issue 5713: make sure close, not rset, is called if we get a 421 error
   1011     def test_421_from_mail_cmd(self):
   1012         smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15)
   1013         smtp.noop()
   1014         self.serv._SMTPchannel.mail_response = '421 closing connection'
   1015         with self.assertRaises(smtplib.SMTPSenderRefused):
   1016             smtp.sendmail('John', 'Sally', 'test message')
   1017         self.assertIsNone(smtp.sock)
   1018         self.assertEqual(self.serv._SMTPchannel.rset_count, 0)
   1019 
   1020     def test_421_from_rcpt_cmd(self):
   1021         smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15)
   1022         smtp.noop()
   1023         self.serv._SMTPchannel.rcpt_response = ['250 accepted', '421 closing']
   1024         with self.assertRaises(smtplib.SMTPRecipientsRefused) as r:
   1025             smtp.sendmail('John', ['Sally', 'Frank', 'George'], 'test message')
   1026         self.assertIsNone(smtp.sock)
   1027         self.assertEqual(self.serv._SMTPchannel.rset_count, 0)
   1028         self.assertDictEqual(r.exception.args[0], {'Frank': (421, b'closing')})
   1029 
   1030     def test_421_from_data_cmd(self):
   1031         class MySimSMTPChannel(SimSMTPChannel):
   1032             def found_terminator(self):
   1033                 if self.smtp_state == self.DATA:
   1034                     self.push('421 closing')
   1035                 else:
   1036                     super().found_terminator()
   1037         self.serv.channel_class = MySimSMTPChannel
   1038         smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15)
   1039         smtp.noop()
   1040         with self.assertRaises(smtplib.SMTPDataError):
   1041             smtp.sendmail('John (at] foo.org', ['Sally (at] foo.org'], 'test message')
   1042         self.assertIsNone(smtp.sock)
   1043         self.assertEqual(self.serv._SMTPchannel.rcpt_count, 0)
   1044 
   1045     def test_smtputf8_NotSupportedError_if_no_server_support(self):
   1046         smtp = smtplib.SMTP(
   1047             HOST, self.port, local_hostname='localhost', timeout=3)
   1048         self.addCleanup(smtp.close)
   1049         smtp.ehlo()
   1050         self.assertTrue(smtp.does_esmtp)
   1051         self.assertFalse(smtp.has_extn('smtputf8'))
   1052         self.assertRaises(
   1053             smtplib.SMTPNotSupportedError,
   1054             smtp.sendmail,
   1055             'John', 'Sally', '', mail_options=['BODY=8BITMIME', 'SMTPUTF8'])
   1056         self.assertRaises(
   1057             smtplib.SMTPNotSupportedError,
   1058             smtp.mail, 'John', options=['BODY=8BITMIME', 'SMTPUTF8'])
   1059 
   1060     def test_send_unicode_without_SMTPUTF8(self):
   1061         smtp = smtplib.SMTP(
   1062             HOST, self.port, local_hostname='localhost', timeout=3)
   1063         self.addCleanup(smtp.close)
   1064         self.assertRaises(UnicodeEncodeError, smtp.sendmail, 'Alice', 'Bb', '')
   1065         self.assertRaises(UnicodeEncodeError, smtp.mail, 'lice')
   1066 
   1067 
   1068 class SimSMTPUTF8Server(SimSMTPServer):
   1069 
   1070     def __init__(self, *args, **kw):
   1071         # The base SMTP server turns these on automatically, but our test
   1072         # server is set up to munge the EHLO response, so we need to provide
   1073         # them as well.  And yes, the call is to SMTPServer not SimSMTPServer.
   1074         self._extra_features = ['SMTPUTF8', '8BITMIME']
   1075         smtpd.SMTPServer.__init__(self, *args, **kw)
   1076 
   1077     def handle_accepted(self, conn, addr):
   1078         self._SMTPchannel = self.channel_class(
   1079             self._extra_features, self, conn, addr,
   1080             decode_data=self._decode_data,
   1081             enable_SMTPUTF8=self.enable_SMTPUTF8,
   1082         )
   1083 
   1084     def process_message(self, peer, mailfrom, rcpttos, data, mail_options=None,
   1085                                                              rcpt_options=None):
   1086         self.last_peer = peer
   1087         self.last_mailfrom = mailfrom
   1088         self.last_rcpttos = rcpttos
   1089         self.last_message = data
   1090         self.last_mail_options = mail_options
   1091         self.last_rcpt_options = rcpt_options
   1092 
   1093 
   1094 @unittest.skipUnless(threading, 'Threading required for this test.')
   1095 class SMTPUTF8SimTests(unittest.TestCase):
   1096 
   1097     maxDiff = None
   1098 
   1099     def setUp(self):
   1100         self.real_getfqdn = socket.getfqdn
   1101         socket.getfqdn = mock_socket.getfqdn
   1102         self.serv_evt = threading.Event()
   1103         self.client_evt = threading.Event()
   1104         # Pick a random unused port by passing 0 for the port number
   1105         self.serv = SimSMTPUTF8Server((HOST, 0), ('nowhere', -1),
   1106                                       decode_data=False,
   1107                                       enable_SMTPUTF8=True)
   1108         # Keep a note of what port was assigned
   1109         self.port = self.serv.socket.getsockname()[1]
   1110         serv_args = (self.serv, self.serv_evt, self.client_evt)
   1111         self.thread = threading.Thread(target=debugging_server, args=serv_args)
   1112         self.thread.start()
   1113 
   1114         # wait until server thread has assigned a port number
   1115         self.serv_evt.wait()
   1116         self.serv_evt.clear()
   1117 
   1118     def tearDown(self):
   1119         socket.getfqdn = self.real_getfqdn
   1120         # indicate that the client is finished
   1121         self.client_evt.set()
   1122         # wait for the server thread to terminate
   1123         self.serv_evt.wait()
   1124         self.thread.join()
   1125 
   1126     def test_test_server_supports_extensions(self):
   1127         smtp = smtplib.SMTP(
   1128             HOST, self.port, local_hostname='localhost', timeout=3)
   1129         self.addCleanup(smtp.close)
   1130         smtp.ehlo()
   1131         self.assertTrue(smtp.does_esmtp)
   1132         self.assertTrue(smtp.has_extn('smtputf8'))
   1133 
   1134     def test_send_unicode_with_SMTPUTF8_via_sendmail(self):
   1135         m = 'a test message containing unicode!'.encode('utf-8')
   1136         smtp = smtplib.SMTP(
   1137             HOST, self.port, local_hostname='localhost', timeout=3)
   1138         self.addCleanup(smtp.close)
   1139         smtp.sendmail('Jhn', 'Slly', m,
   1140                       mail_options=['BODY=8BITMIME', 'SMTPUTF8'])
   1141         self.assertEqual(self.serv.last_mailfrom, 'Jhn')
   1142         self.assertEqual(self.serv.last_rcpttos, ['Slly'])
   1143         self.assertEqual(self.serv.last_message, m)
   1144         self.assertIn('BODY=8BITMIME', self.serv.last_mail_options)
   1145         self.assertIn('SMTPUTF8', self.serv.last_mail_options)
   1146         self.assertEqual(self.serv.last_rcpt_options, [])
   1147 
   1148     def test_send_unicode_with_SMTPUTF8_via_low_level_API(self):
   1149         m = 'a test message containing unicode!'.encode('utf-8')
   1150         smtp = smtplib.SMTP(
   1151             HOST, self.port, local_hostname='localhost', timeout=3)
   1152         self.addCleanup(smtp.close)
   1153         smtp.ehlo()
   1154         self.assertEqual(
   1155             smtp.mail('J', options=['BODY=8BITMIME', 'SMTPUTF8']),
   1156             (250, b'OK'))
   1157         self.assertEqual(smtp.rcpt('Jnos'), (250, b'OK'))
   1158         self.assertEqual(smtp.data(m), (250, b'OK'))
   1159         self.assertEqual(self.serv.last_mailfrom, 'J')
   1160         self.assertEqual(self.serv.last_rcpttos, ['Jnos'])
   1161         self.assertEqual(self.serv.last_message, m)
   1162         self.assertIn('BODY=8BITMIME', self.serv.last_mail_options)
   1163         self.assertIn('SMTPUTF8', self.serv.last_mail_options)
   1164         self.assertEqual(self.serv.last_rcpt_options, [])
   1165 
   1166     def test_send_message_uses_smtputf8_if_addrs_non_ascii(self):
   1167         msg = EmailMessage()
   1168         msg['From'] = "Polo <fo (at] bar.com>"
   1169         msg['To'] = 'Dinsdale'
   1170         msg['Subject'] = 'Nudge nudge, wink, wink \u1F609'
   1171         # XXX I don't know why I need two \n's here, but this is an existing
   1172         # bug (if it is one) and not a problem with the new functionality.
   1173         msg.set_content("oh l l, know what I mean, know what I mean?\n\n")
   1174         # XXX smtpd converts received /r/n to /n, so we can't easily test that
   1175         # we are successfully sending /r/n :(.
   1176         expected = textwrap.dedent("""\
   1177             From: Polo <fo (at] bar.com>
   1178             To: Dinsdale
   1179             Subject: Nudge nudge, wink, wink \u1F609
   1180             Content-Type: text/plain; charset="utf-8"
   1181             Content-Transfer-Encoding: 8bit
   1182             MIME-Version: 1.0
   1183 
   1184             oh l l, know what I mean, know what I mean?
   1185             """)
   1186         smtp = smtplib.SMTP(
   1187             HOST, self.port, local_hostname='localhost', timeout=3)
   1188         self.addCleanup(smtp.close)
   1189         self.assertEqual(smtp.send_message(msg), {})
   1190         self.assertEqual(self.serv.last_mailfrom, 'fo (at] bar.com')
   1191         self.assertEqual(self.serv.last_rcpttos, ['Dinsdale'])
   1192         self.assertEqual(self.serv.last_message.decode(), expected)
   1193         self.assertIn('BODY=8BITMIME', self.serv.last_mail_options)
   1194         self.assertIn('SMTPUTF8', self.serv.last_mail_options)
   1195         self.assertEqual(self.serv.last_rcpt_options, [])
   1196 
   1197     def test_send_message_error_on_non_ascii_addrs_if_no_smtputf8(self):
   1198         msg = EmailMessage()
   1199         msg['From'] = "Polo <fo (at] bar.com>"
   1200         msg['To'] = 'Dinsdale'
   1201         msg['Subject'] = 'Nudge nudge, wink, wink \u1F609'
   1202         smtp = smtplib.SMTP(
   1203             HOST, self.port, local_hostname='localhost', timeout=3)
   1204         self.addCleanup(smtp.close)
   1205         self.assertRaises(smtplib.SMTPNotSupportedError,
   1206                           smtp.send_message(msg))
   1207 
   1208 
   1209 EXPECTED_RESPONSE = encode_base64(b'\0psu\0doesnotexist', eol='')
   1210 
   1211 class SimSMTPAUTHInitialResponseChannel(SimSMTPChannel):
   1212     def smtp_AUTH(self, arg):
   1213         # RFC 4954's AUTH command allows for an optional initial-response.
   1214         # Not all AUTH methods support this; some require a challenge.  AUTH
   1215         # PLAIN does those, so test that here.  See issue #15014.
   1216         args = arg.split()
   1217         if args[0].lower() == 'plain':
   1218             if len(args) == 2:
   1219                 # AUTH PLAIN <initial-response> with the response base 64
   1220                 # encoded.  Hard code the expected response for the test.
   1221                 if args[1] == EXPECTED_RESPONSE:
   1222                     self.push('235 Ok')
   1223                     return
   1224         self.push('571 Bad authentication')
   1225 
   1226 class SimSMTPAUTHInitialResponseServer(SimSMTPServer):
   1227     channel_class = SimSMTPAUTHInitialResponseChannel
   1228 
   1229 
   1230 @unittest.skipUnless(threading, 'Threading required for this test.')
   1231 class SMTPAUTHInitialResponseSimTests(unittest.TestCase):
   1232     def setUp(self):
   1233         self.real_getfqdn = socket.getfqdn
   1234         socket.getfqdn = mock_socket.getfqdn
   1235         self.serv_evt = threading.Event()
   1236         self.client_evt = threading.Event()
   1237         # Pick a random unused port by passing 0 for the port number
   1238         self.serv = SimSMTPAUTHInitialResponseServer(
   1239             (HOST, 0), ('nowhere', -1), decode_data=True)
   1240         # Keep a note of what port was assigned
   1241         self.port = self.serv.socket.getsockname()[1]
   1242         serv_args = (self.serv, self.serv_evt, self.client_evt)
   1243         self.thread = threading.Thread(target=debugging_server, args=serv_args)
   1244         self.thread.start()
   1245 
   1246         # wait until server thread has assigned a port number
   1247         self.serv_evt.wait()
   1248         self.serv_evt.clear()
   1249 
   1250     def tearDown(self):
   1251         socket.getfqdn = self.real_getfqdn
   1252         # indicate that the client is finished
   1253         self.client_evt.set()
   1254         # wait for the server thread to terminate
   1255         self.serv_evt.wait()
   1256         self.thread.join()
   1257 
   1258     def testAUTH_PLAIN_initial_response_login(self):
   1259         self.serv.add_feature('AUTH PLAIN')
   1260         smtp = smtplib.SMTP(HOST, self.port,
   1261                             local_hostname='localhost', timeout=15)
   1262         smtp.login('psu', 'doesnotexist')
   1263         smtp.close()
   1264 
   1265     def testAUTH_PLAIN_initial_response_auth(self):
   1266         self.serv.add_feature('AUTH PLAIN')
   1267         smtp = smtplib.SMTP(HOST, self.port,
   1268                             local_hostname='localhost', timeout=15)
   1269         smtp.user = 'psu'
   1270         smtp.password = 'doesnotexist'
   1271         code, response = smtp.auth('plain', smtp.auth_plain)
   1272         smtp.close()
   1273         self.assertEqual(code, 235)
   1274 
   1275 
   1276 @support.reap_threads
   1277 def test_main(verbose=None):
   1278     support.run_unittest(
   1279         BadHELOServerTests,
   1280         DebuggingServerTests,
   1281         GeneralTests,
   1282         NonConnectingTests,
   1283         SMTPAUTHInitialResponseSimTests,
   1284         SMTPSimTests,
   1285         TooLongLineTests,
   1286         )
   1287 
   1288 
   1289 if __name__ == '__main__':
   1290     test_main()
   1291