Home | History | Annotate | Download | only in test
      1 import asyncore
      2 import base64
      3 import email.mime.text
      4 from email.message import EmailMessage
      5 from email.base64mime import body_encode as encode_base64
      6 import email.utils
      7 import hmac
      8 import socket
      9 import smtpd
     10 import smtplib
     11 import io
     12 import re
     13 import sys
     14 import time
     15 import select
     16 import errno
     17 import textwrap
     18 import threading
     19 
     20 import unittest
     21 from test import support, mock_socket
     22 from test.support import HOST, HOSTv4, HOSTv6
     23 from unittest.mock import Mock
     24 
     25 
     26 if sys.platform == 'darwin':
     27     # select.poll returns a select.POLLHUP at the end of the tests
     28     # on darwin, so just ignore it
     29     def handle_expt(self):
     30         pass
     31     smtpd.SMTPChannel.handle_expt = handle_expt
     32 
     33 
     34 def server(evt, buf, serv):
     35     serv.listen()
     36     evt.set()
     37     try:
     38         conn, addr = serv.accept()
     39     except socket.timeout:
     40         pass
     41     else:
     42         n = 500
     43         while buf and n > 0:
     44             r, w, e = select.select([], [conn], [])
     45             if w:
     46                 sent = conn.send(buf)
     47                 buf = buf[sent:]
     48 
     49             n -= 1
     50 
     51         conn.close()
     52     finally:
     53         serv.close()
     54         evt.set()
     55 
     56 class GeneralTests(unittest.TestCase):
     57 
     58     def setUp(self):
     59         smtplib.socket = mock_socket
     60         self.port = 25
     61 
     62     def tearDown(self):
     63         smtplib.socket = socket
     64 
     65     # This method is no longer used but is retained for backward compatibility,
     66     # so test to make sure it still works.
     67     def testQuoteData(self):
     68         teststr  = "abc\n.jkl\rfoo\r\n..blue"
     69         expected = "abc\r\n..jkl\r\nfoo\r\n...blue"
     70         self.assertEqual(expected, smtplib.quotedata(teststr))
     71 
     72     def testBasic1(self):
     73         mock_socket.reply_with(b"220 Hola mundo")
     74         # connects
     75         smtp = smtplib.SMTP(HOST, self.port)
     76         smtp.close()
     77 
     78     def testSourceAddress(self):
     79         mock_socket.reply_with(b"220 Hola mundo")
     80         # connects
     81         smtp = smtplib.SMTP(HOST, self.port,
     82                 source_address=('127.0.0.1',19876))
     83         self.assertEqual(smtp.source_address, ('127.0.0.1', 19876))
     84         smtp.close()
     85 
     86     def testBasic2(self):
     87         mock_socket.reply_with(b"220 Hola mundo")
     88         # connects, include port in host name
     89         smtp = smtplib.SMTP("%s:%s" % (HOST, self.port))
     90         smtp.close()
     91 
     92     def testLocalHostName(self):
     93         mock_socket.reply_with(b"220 Hola mundo")
     94         # check that supplied local_hostname is used
     95         smtp = smtplib.SMTP(HOST, self.port, local_hostname="testhost")
     96         self.assertEqual(smtp.local_hostname, "testhost")
     97         smtp.close()
     98 
     99     def testTimeoutDefault(self):
    100         mock_socket.reply_with(b"220 Hola mundo")
    101         self.assertIsNone(mock_socket.getdefaulttimeout())
    102         mock_socket.setdefaulttimeout(30)
    103         self.assertEqual(mock_socket.getdefaulttimeout(), 30)
    104         try:
    105             smtp = smtplib.SMTP(HOST, self.port)
    106         finally:
    107             mock_socket.setdefaulttimeout(None)
    108         self.assertEqual(smtp.sock.gettimeout(), 30)
    109         smtp.close()
    110 
    111     def testTimeoutNone(self):
    112         mock_socket.reply_with(b"220 Hola mundo")
    113         self.assertIsNone(socket.getdefaulttimeout())
    114         socket.setdefaulttimeout(30)
    115         try:
    116             smtp = smtplib.SMTP(HOST, self.port, timeout=None)
    117         finally:
    118             socket.setdefaulttimeout(None)
    119         self.assertIsNone(smtp.sock.gettimeout())
    120         smtp.close()
    121 
    122     def testTimeoutValue(self):
    123         mock_socket.reply_with(b"220 Hola mundo")
    124         smtp = smtplib.SMTP(HOST, self.port, timeout=30)
    125         self.assertEqual(smtp.sock.gettimeout(), 30)
    126         smtp.close()
    127 
    128     def test_debuglevel(self):
    129         mock_socket.reply_with(b"220 Hello world")
    130         smtp = smtplib.SMTP()
    131         smtp.set_debuglevel(1)
    132         with support.captured_stderr() as stderr:
    133             smtp.connect(HOST, self.port)
    134         smtp.close()
    135         expected = re.compile(r"^connect:", re.MULTILINE)
    136         self.assertRegex(stderr.getvalue(), expected)
    137 
    138     def test_debuglevel_2(self):
    139         mock_socket.reply_with(b"220 Hello world")
    140         smtp = smtplib.SMTP()
    141         smtp.set_debuglevel(2)
    142         with support.captured_stderr() as stderr:
    143             smtp.connect(HOST, self.port)
    144         smtp.close()
    145         expected = re.compile(r"^\d{2}:\d{2}:\d{2}\.\d{6} connect: ",
    146                               re.MULTILINE)
    147         self.assertRegex(stderr.getvalue(), expected)
    148 
    149 
    150 # Test server thread using the specified SMTP server class
    151 def debugging_server(serv, serv_evt, client_evt):
    152     serv_evt.set()
    153 
    154     try:
    155         if hasattr(select, 'poll'):
    156             poll_fun = asyncore.poll2
    157         else:
    158             poll_fun = asyncore.poll
    159 
    160         n = 1000
    161         while asyncore.socket_map and n > 0:
    162             poll_fun(0.01, asyncore.socket_map)
    163 
    164             # when the client conversation is finished, it will
    165             # set client_evt, and it's then ok to kill the server
    166             if client_evt.is_set():
    167                 serv.close()
    168                 break
    169 
    170             n -= 1
    171 
    172     except socket.timeout:
    173         pass
    174     finally:
    175         if not client_evt.is_set():
    176             # allow some time for the client to read the result
    177             time.sleep(0.5)
    178             serv.close()
    179         asyncore.close_all()
    180         serv_evt.set()
    181 
    182 MSG_BEGIN = '---------- MESSAGE FOLLOWS ----------\n'
    183 MSG_END = '------------ END MESSAGE ------------\n'
    184 
    185 # NOTE: Some SMTP objects in the tests below are created with a non-default
    186 # local_hostname argument to the constructor, since (on some systems) the FQDN
    187 # lookup caused by the default local_hostname sometimes takes so long that the
    188 # test server times out, causing the test to fail.
    189 
    190 # Test behavior of smtpd.DebuggingServer
    191 class DebuggingServerTests(unittest.TestCase):
    192 
    193     maxDiff = None
    194 
    195     def setUp(self):
    196         self.real_getfqdn = socket.getfqdn
    197         socket.getfqdn = mock_socket.getfqdn
    198         # temporarily replace sys.stdout to capture DebuggingServer output
    199         self.old_stdout = sys.stdout
    200         self.output = io.StringIO()
    201         sys.stdout = self.output
    202 
    203         self.serv_evt = threading.Event()
    204         self.client_evt = threading.Event()
    205         # Capture SMTPChannel debug output
    206         self.old_DEBUGSTREAM = smtpd.DEBUGSTREAM
    207         smtpd.DEBUGSTREAM = io.StringIO()
    208         # Pick a random unused port by passing 0 for the port number
    209         self.serv = smtpd.DebuggingServer((HOST, 0), ('nowhere', -1),
    210                                           decode_data=True)
    211         # Keep a note of what server host and port were assigned
    212         self.host, self.port = self.serv.socket.getsockname()[:2]
    213         serv_args = (self.serv, self.serv_evt, self.client_evt)
    214         self.thread = threading.Thread(target=debugging_server, args=serv_args)
    215         self.thread.start()
    216 
    217         # wait until server thread has assigned a port number
    218         self.serv_evt.wait()
    219         self.serv_evt.clear()
    220 
    221     def tearDown(self):
    222         socket.getfqdn = self.real_getfqdn
    223         # indicate that the client is finished
    224         self.client_evt.set()
    225         # wait for the server thread to terminate
    226         self.serv_evt.wait()
    227         self.thread.join()
    228         # restore sys.stdout
    229         sys.stdout = self.old_stdout
    230         # restore DEBUGSTREAM
    231         smtpd.DEBUGSTREAM.close()
    232         smtpd.DEBUGSTREAM = self.old_DEBUGSTREAM
    233 
    234     def get_output_without_xpeer(self):
    235         test_output = self.output.getvalue()
    236         return re.sub(r'(.*?)^X-Peer:\s*\S+\n(.*)', r'\1\2',
    237                       test_output, flags=re.MULTILINE|re.DOTALL)
    238 
    239     def testBasic(self):
    240         # connect
    241         smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3)
    242         smtp.quit()
    243 
    244     def testSourceAddress(self):
    245         # connect
    246         src_port = support.find_unused_port()
    247         try:
    248             smtp = smtplib.SMTP(self.host, self.port, local_hostname='localhost',
    249                                 timeout=3, source_address=(self.host, src_port))
    250             self.assertEqual(smtp.source_address, (self.host, src_port))
    251             self.assertEqual(smtp.local_hostname, 'localhost')
    252             smtp.quit()
    253         except OSError as e:
    254             if e.errno == errno.EADDRINUSE:
    255                 self.skipTest("couldn't bind to source port %d" % src_port)
    256             raise
    257 
    258     def testNOOP(self):
    259         smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3)
    260         expected = (250, b'OK')
    261         self.assertEqual(smtp.noop(), expected)
    262         smtp.quit()
    263 
    264     def testRSET(self):
    265         smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3)
    266         expected = (250, b'OK')
    267         self.assertEqual(smtp.rset(), expected)
    268         smtp.quit()
    269 
    270     def testELHO(self):
    271         # EHLO isn't implemented in DebuggingServer
    272         smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3)
    273         expected = (250, b'\nSIZE 33554432\nHELP')
    274         self.assertEqual(smtp.ehlo(), expected)
    275         smtp.quit()
    276 
    277     def testEXPNNotImplemented(self):
    278         # EXPN isn't implemented in DebuggingServer
    279         smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3)
    280         expected = (502, b'EXPN not implemented')
    281         smtp.putcmd('EXPN')
    282         self.assertEqual(smtp.getreply(), expected)
    283         smtp.quit()
    284 
    285     def testVRFY(self):
    286         smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3)
    287         expected = (252, b'Cannot VRFY user, but will accept message ' + \
    288                          b'and attempt delivery')
    289         self.assertEqual(smtp.vrfy('nobody (at] nowhere.com'), expected)
    290         self.assertEqual(smtp.verify('nobody (at] nowhere.com'), expected)
    291         smtp.quit()
    292 
    293     def testSecondHELO(self):
    294         # check that a second HELO returns a message that it's a duplicate
    295         # (this behavior is specific to smtpd.SMTPChannel)
    296         smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3)
    297         smtp.helo()
    298         expected = (503, b'Duplicate HELO/EHLO')
    299         self.assertEqual(smtp.helo(), expected)
    300         smtp.quit()
    301 
    302     def testHELP(self):
    303         smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3)
    304         self.assertEqual(smtp.help(), b'Supported commands: EHLO HELO MAIL ' + \
    305                                       b'RCPT DATA RSET NOOP QUIT VRFY')
    306         smtp.quit()
    307 
    308     def testSend(self):
    309         # connect and send mail
    310         m = 'A test message'
    311         smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3)
    312         smtp.sendmail('John', 'Sally', m)
    313         # XXX(nnorwitz): this test is flaky and dies with a bad file descriptor
    314         # in asyncore.  This sleep might help, but should really be fixed
    315         # properly by using an Event variable.
    316         time.sleep(0.01)
    317         smtp.quit()
    318 
    319         self.client_evt.set()
    320         self.serv_evt.wait()
    321         self.output.flush()
    322         mexpect = '%s%s\n%s' % (MSG_BEGIN, m, MSG_END)
    323         self.assertEqual(self.output.getvalue(), mexpect)
    324 
    325     def testSendBinary(self):
    326         m = b'A test message'
    327         smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3)
    328         smtp.sendmail('John', 'Sally', m)
    329         # XXX (see comment in testSend)
    330         time.sleep(0.01)
    331         smtp.quit()
    332 
    333         self.client_evt.set()
    334         self.serv_evt.wait()
    335         self.output.flush()
    336         mexpect = '%s%s\n%s' % (MSG_BEGIN, m.decode('ascii'), MSG_END)
    337         self.assertEqual(self.output.getvalue(), mexpect)
    338 
    339     def testSendNeedingDotQuote(self):
    340         # Issue 12283
    341         m = '.A test\n.mes.sage.'
    342         smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3)
    343         smtp.sendmail('John', 'Sally', m)
    344         # XXX (see comment in testSend)
    345         time.sleep(0.01)
    346         smtp.quit()
    347 
    348         self.client_evt.set()
    349         self.serv_evt.wait()
    350         self.output.flush()
    351         mexpect = '%s%s\n%s' % (MSG_BEGIN, m, MSG_END)
    352         self.assertEqual(self.output.getvalue(), mexpect)
    353 
    354     def testSendNullSender(self):
    355         m = 'A test message'
    356         smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3)
    357         smtp.sendmail('<>', 'Sally', m)
    358         # XXX (see comment in testSend)
    359         time.sleep(0.01)
    360         smtp.quit()
    361 
    362         self.client_evt.set()
    363         self.serv_evt.wait()
    364         self.output.flush()
    365         mexpect = '%s%s\n%s' % (MSG_BEGIN, m, MSG_END)
    366         self.assertEqual(self.output.getvalue(), mexpect)
    367         debugout = smtpd.DEBUGSTREAM.getvalue()
    368         sender = re.compile("^sender: <>$", re.MULTILINE)
    369         self.assertRegex(debugout, sender)
    370 
    371     def testSendMessage(self):
    372         m = email.mime.text.MIMEText('A test message')
    373         smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3)
    374         smtp.send_message(m, from_addr='John', to_addrs='Sally')
    375         # XXX (see comment in testSend)
    376         time.sleep(0.01)
    377         smtp.quit()
    378 
    379         self.client_evt.set()
    380         self.serv_evt.wait()
    381         self.output.flush()
    382         # Remove the X-Peer header that DebuggingServer adds as figuring out
    383         # exactly what IP address format is put there is not easy (and
    384         # irrelevant to our test).  Typically 127.0.0.1 or ::1, but it is
    385         # not always the same as socket.gethostbyname(HOST). :(
    386         test_output = self.get_output_without_xpeer()
    387         del m['X-Peer']
    388         mexpect = '%s%s\n%s' % (MSG_BEGIN, m.as_string(), MSG_END)
    389         self.assertEqual(test_output, mexpect)
    390 
    391     def testSendMessageWithAddresses(self):
    392         m = email.mime.text.MIMEText('A test message')
    393         m['From'] = 'foo (at] bar.com'
    394         m['To'] = 'John'
    395         m['CC'] = 'Sally, Fred'
    396         m['Bcc'] = 'John Root <root@localhost>, "Dinsdale" <warped (at] silly.walks.com>'
    397         smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3)
    398         smtp.send_message(m)
    399         # XXX (see comment in testSend)
    400         time.sleep(0.01)
    401         smtp.quit()
    402         # make sure the Bcc header is still in the message.
    403         self.assertEqual(m['Bcc'], 'John Root <root@localhost>, "Dinsdale" '
    404                                     '<warped (at] silly.walks.com>')
    405 
    406         self.client_evt.set()
    407         self.serv_evt.wait()
    408         self.output.flush()
    409         # Remove the X-Peer header that DebuggingServer adds.
    410         test_output = self.get_output_without_xpeer()
    411         del m['X-Peer']
    412         # The Bcc header should not be transmitted.
    413         del m['Bcc']
    414         mexpect = '%s%s\n%s' % (MSG_BEGIN, m.as_string(), MSG_END)
    415         self.assertEqual(test_output, mexpect)
    416         debugout = smtpd.DEBUGSTREAM.getvalue()
    417         sender = re.compile("^sender: foo (at] bar.com$", re.MULTILINE)
    418         self.assertRegex(debugout, sender)
    419         for addr in ('John', 'Sally', 'Fred', 'root@localhost',
    420                      'warped (at] silly.walks.com'):
    421             to_addr = re.compile(r"^recips: .*'{}'.*$".format(addr),
    422                                  re.MULTILINE)
    423             self.assertRegex(debugout, to_addr)
    424 
    425     def testSendMessageWithSomeAddresses(self):
    426         # Make sure nothing breaks if not all of the three 'to' headers exist
    427         m = email.mime.text.MIMEText('A test message')
    428         m['From'] = 'foo (at] bar.com'
    429         m['To'] = 'John, Dinsdale'
    430         smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3)
    431         smtp.send_message(m)
    432         # XXX (see comment in testSend)
    433         time.sleep(0.01)
    434         smtp.quit()
    435 
    436         self.client_evt.set()
    437         self.serv_evt.wait()
    438         self.output.flush()
    439         # Remove the X-Peer header that DebuggingServer adds.
    440         test_output = self.get_output_without_xpeer()
    441         del m['X-Peer']
    442         mexpect = '%s%s\n%s' % (MSG_BEGIN, m.as_string(), MSG_END)
    443         self.assertEqual(test_output, mexpect)
    444         debugout = smtpd.DEBUGSTREAM.getvalue()
    445         sender = re.compile("^sender: foo (at] bar.com$", re.MULTILINE)
    446         self.assertRegex(debugout, sender)
    447         for addr in ('John', 'Dinsdale'):
    448             to_addr = re.compile(r"^recips: .*'{}'.*$".format(addr),
    449                                  re.MULTILINE)
    450             self.assertRegex(debugout, to_addr)
    451 
    452     def testSendMessageWithSpecifiedAddresses(self):
    453         # Make sure addresses specified in call override those in message.
    454         m = email.mime.text.MIMEText('A test message')
    455         m['From'] = 'foo (at] bar.com'
    456         m['To'] = 'John, Dinsdale'
    457         smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3)
    458         smtp.send_message(m, from_addr='joe (at] example.com', to_addrs='foo (at] example.net')
    459         # XXX (see comment in testSend)
    460         time.sleep(0.01)
    461         smtp.quit()
    462 
    463         self.client_evt.set()
    464         self.serv_evt.wait()
    465         self.output.flush()
    466         # Remove the X-Peer header that DebuggingServer adds.
    467         test_output = self.get_output_without_xpeer()
    468         del m['X-Peer']
    469         mexpect = '%s%s\n%s' % (MSG_BEGIN, m.as_string(), MSG_END)
    470         self.assertEqual(test_output, mexpect)
    471         debugout = smtpd.DEBUGSTREAM.getvalue()
    472         sender = re.compile("^sender: joe (at] example.com$", re.MULTILINE)
    473         self.assertRegex(debugout, sender)
    474         for addr in ('John', 'Dinsdale'):
    475             to_addr = re.compile(r"^recips: .*'{}'.*$".format(addr),
    476                                  re.MULTILINE)
    477             self.assertNotRegex(debugout, to_addr)
    478         recip = re.compile(r"^recips: .*'foo (at] example.net'.*$", re.MULTILINE)
    479         self.assertRegex(debugout, recip)
    480 
    481     def testSendMessageWithMultipleFrom(self):
    482         # Sender overrides To
    483         m = email.mime.text.MIMEText('A test message')
    484         m['From'] = 'Bernard, Bianca'
    485         m['Sender'] = 'the_rescuers (at] Rescue-Aid-Society.com'
    486         m['To'] = 'John, Dinsdale'
    487         smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3)
    488         smtp.send_message(m)
    489         # XXX (see comment in testSend)
    490         time.sleep(0.01)
    491         smtp.quit()
    492 
    493         self.client_evt.set()
    494         self.serv_evt.wait()
    495         self.output.flush()
    496         # Remove the X-Peer header that DebuggingServer adds.
    497         test_output = self.get_output_without_xpeer()
    498         del m['X-Peer']
    499         mexpect = '%s%s\n%s' % (MSG_BEGIN, m.as_string(), MSG_END)
    500         self.assertEqual(test_output, mexpect)
    501         debugout = smtpd.DEBUGSTREAM.getvalue()
    502         sender = re.compile("^sender: the_rescuers (at] Rescue-Aid-Society.com$", re.MULTILINE)
    503         self.assertRegex(debugout, sender)
    504         for addr in ('John', 'Dinsdale'):
    505             to_addr = re.compile(r"^recips: .*'{}'.*$".format(addr),
    506                                  re.MULTILINE)
    507             self.assertRegex(debugout, to_addr)
    508 
    509     def testSendMessageResent(self):
    510         m = email.mime.text.MIMEText('A test message')
    511         m['From'] = 'foo (at] bar.com'
    512         m['To'] = 'John'
    513         m['CC'] = 'Sally, Fred'
    514         m['Bcc'] = 'John Root <root@localhost>, "Dinsdale" <warped (at] silly.walks.com>'
    515         m['Resent-Date'] = 'Thu, 1 Jan 1970 17:42:00 +0000'
    516         m['Resent-From'] = 'holy (at] grail.net'
    517         m['Resent-To'] = 'Martha <my_mom (at] great.cooker.com>, Jeff'
    518         m['Resent-Bcc'] = 'doe (at] losthope.net'
    519         smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3)
    520         smtp.send_message(m)
    521         # XXX (see comment in testSend)
    522         time.sleep(0.01)
    523         smtp.quit()
    524 
    525         self.client_evt.set()
    526         self.serv_evt.wait()
    527         self.output.flush()
    528         # The Resent-Bcc headers are deleted before serialization.
    529         del m['Bcc']
    530         del m['Resent-Bcc']
    531         # Remove the X-Peer header that DebuggingServer adds.
    532         test_output = self.get_output_without_xpeer()
    533         del m['X-Peer']
    534         mexpect = '%s%s\n%s' % (MSG_BEGIN, m.as_string(), MSG_END)
    535         self.assertEqual(test_output, mexpect)
    536         debugout = smtpd.DEBUGSTREAM.getvalue()
    537         sender = re.compile("^sender: holy (at] grail.net$", re.MULTILINE)
    538         self.assertRegex(debugout, sender)
    539         for addr in ('my_mom (at] great.cooker.com', 'Jeff', 'doe (at] losthope.net'):
    540             to_addr = re.compile(r"^recips: .*'{}'.*$".format(addr),
    541                                  re.MULTILINE)
    542             self.assertRegex(debugout, to_addr)
    543 
    544     def testSendMessageMultipleResentRaises(self):
    545         m = email.mime.text.MIMEText('A test message')
    546         m['From'] = 'foo (at] bar.com'
    547         m['To'] = 'John'
    548         m['CC'] = 'Sally, Fred'
    549         m['Bcc'] = 'John Root <root@localhost>, "Dinsdale" <warped (at] silly.walks.com>'
    550         m['Resent-Date'] = 'Thu, 1 Jan 1970 17:42:00 +0000'
    551         m['Resent-From'] = 'holy (at] grail.net'
    552         m['Resent-To'] = 'Martha <my_mom (at] great.cooker.com>, Jeff'
    553         m['Resent-Bcc'] = 'doe (at] losthope.net'
    554         m['Resent-Date'] = 'Thu, 2 Jan 1970 17:42:00 +0000'
    555         m['Resent-To'] = 'holy (at] grail.net'
    556         m['Resent-From'] = 'Martha <my_mom (at] great.cooker.com>, Jeff'
    557         smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3)
    558         with self.assertRaises(ValueError):
    559             smtp.send_message(m)
    560         smtp.close()
    561 
    562 class NonConnectingTests(unittest.TestCase):
    563 
    564     def testNotConnected(self):
    565         # Test various operations on an unconnected SMTP object that
    566         # should raise exceptions (at present the attempt in SMTP.send
    567         # to reference the nonexistent 'sock' attribute of the SMTP object
    568         # causes an AttributeError)
    569         smtp = smtplib.SMTP()
    570         self.assertRaises(smtplib.SMTPServerDisconnected, smtp.ehlo)
    571         self.assertRaises(smtplib.SMTPServerDisconnected,
    572                           smtp.send, 'test msg')
    573 
    574     def testNonnumericPort(self):
    575         # check that non-numeric port raises OSError
    576         self.assertRaises(OSError, smtplib.SMTP,
    577                           "localhost", "bogus")
    578         self.assertRaises(OSError, smtplib.SMTP,
    579                           "localhost:bogus")
    580 
    581 
    582 class DefaultArgumentsTests(unittest.TestCase):
    583 
    584     def setUp(self):
    585         self.msg = EmailMessage()
    586         self.msg['From'] = 'Polo <fo (at] bar.com>'
    587         self.smtp = smtplib.SMTP()
    588         self.smtp.ehlo = Mock(return_value=(200, 'OK'))
    589         self.smtp.has_extn, self.smtp.sendmail = Mock(), Mock()
    590 
    591     def testSendMessage(self):
    592         expected_mail_options = ('SMTPUTF8', 'BODY=8BITMIME')
    593         self.smtp.send_message(self.msg)
    594         self.smtp.send_message(self.msg)
    595         self.assertEqual(self.smtp.sendmail.call_args_list[0][0][3],
    596                          expected_mail_options)
    597         self.assertEqual(self.smtp.sendmail.call_args_list[1][0][3],
    598                          expected_mail_options)
    599 
    600     def testSendMessageWithMailOptions(self):
    601         mail_options = ['STARTTLS']
    602         expected_mail_options = ('STARTTLS', 'SMTPUTF8', 'BODY=8BITMIME')
    603         self.smtp.send_message(self.msg, None, None, mail_options)
    604         self.assertEqual(mail_options, ['STARTTLS'])
    605         self.assertEqual(self.smtp.sendmail.call_args_list[0][0][3],
    606                          expected_mail_options)
    607 
    608 
    609 # test response of client to a non-successful HELO message
    610 class BadHELOServerTests(unittest.TestCase):
    611 
    612     def setUp(self):
    613         smtplib.socket = mock_socket
    614         mock_socket.reply_with(b"199 no hello for you!")
    615         self.old_stdout = sys.stdout
    616         self.output = io.StringIO()
    617         sys.stdout = self.output
    618         self.port = 25
    619 
    620     def tearDown(self):
    621         smtplib.socket = socket
    622         sys.stdout = self.old_stdout
    623 
    624     def testFailingHELO(self):
    625         self.assertRaises(smtplib.SMTPConnectError, smtplib.SMTP,
    626                             HOST, self.port, 'localhost', 3)
    627 
    628 
    629 class TooLongLineTests(unittest.TestCase):
    630     respdata = b'250 OK' + (b'.' * smtplib._MAXLINE * 2) + b'\n'
    631 
    632     def setUp(self):
    633         self.old_stdout = sys.stdout
    634         self.output = io.StringIO()
    635         sys.stdout = self.output
    636 
    637         self.evt = threading.Event()
    638         self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    639         self.sock.settimeout(15)
    640         self.port = support.bind_port(self.sock)
    641         servargs = (self.evt, self.respdata, self.sock)
    642         thread = threading.Thread(target=server, args=servargs)
    643         thread.start()
    644         self.addCleanup(thread.join)
    645         self.evt.wait()
    646         self.evt.clear()
    647 
    648     def tearDown(self):
    649         self.evt.wait()
    650         sys.stdout = self.old_stdout
    651 
    652     def testLineTooLong(self):
    653         self.assertRaises(smtplib.SMTPResponseException, smtplib.SMTP,
    654                           HOST, self.port, 'localhost', 3)
    655 
    656 
    657 sim_users = {'Mr.A (at] somewhere.com':'John A',
    658              'Ms.B (at] xn--fo-fka.com':'Sally B',
    659              'Mrs.C (at] somewhereesle.com':'Ruth C',
    660             }
    661 
    662 sim_auth = ('Mr.A (at] somewhere.com', 'somepassword')
    663 sim_cram_md5_challenge = ('PENCeUxFREJoU0NnbmhNWitOMjNGNn'
    664                           'dAZWx3b29kLmlubm9zb2Z0LmNvbT4=')
    665 sim_lists = {'list-1':['Mr.A (at] somewhere.com','Mrs.C (at] somewhereesle.com'],
    666              'list-2':['Ms.B (at] xn--fo-fka.com',],
    667             }
    668 
    669 # Simulated SMTP channel & server
    670 class ResponseException(Exception): pass
    671 class SimSMTPChannel(smtpd.SMTPChannel):
    672 
    673     quit_response = None
    674     mail_response = None
    675     rcpt_response = None
    676     data_response = None
    677     rcpt_count = 0
    678     rset_count = 0
    679     disconnect = 0
    680     AUTH = 99    # Add protocol state to enable auth testing.
    681     authenticated_user = None
    682 
    683     def __init__(self, extra_features, *args, **kw):
    684         self._extrafeatures = ''.join(
    685             [ "250-{0}\r\n".format(x) for x in extra_features ])
    686         super(SimSMTPChannel, self).__init__(*args, **kw)
    687 
    688     # AUTH related stuff.  It would be nice if support for this were in smtpd.
    689     def found_terminator(self):
    690         if self.smtp_state == self.AUTH:
    691             line = self._emptystring.join(self.received_lines)
    692             print('Data:', repr(line), file=smtpd.DEBUGSTREAM)
    693             self.received_lines = []
    694             try:
    695                 self.auth_object(line)
    696             except ResponseException as e:
    697                 self.smtp_state = self.COMMAND
    698                 self.push('%s %s' % (e.smtp_code, e.smtp_error))
    699                 return
    700         super().found_terminator()
    701 
    702 
    703     def smtp_AUTH(self, arg):
    704         if not self.seen_greeting:
    705             self.push('503 Error: send EHLO first')
    706             return
    707         if not self.extended_smtp or 'AUTH' not in self._extrafeatures:
    708             self.push('500 Error: command "AUTH" not recognized')
    709             return
    710         if self.authenticated_user is not None:
    711             self.push(
    712                 '503 Bad sequence of commands: already authenticated')
    713             return
    714         args = arg.split()
    715         if len(args) not in [1, 2]:
    716             self.push('501 Syntax: AUTH <mechanism> [initial-response]')
    717             return
    718         auth_object_name = '_auth_%s' % args[0].lower().replace('-', '_')
    719         try:
    720             self.auth_object = getattr(self, auth_object_name)
    721         except AttributeError:
    722             self.push('504 Command parameter not implemented: unsupported '
    723                       ' authentication mechanism {!r}'.format(auth_object_name))
    724             return
    725         self.smtp_state = self.AUTH
    726         self.auth_object(args[1] if len(args) == 2 else None)
    727 
    728     def _authenticated(self, user, valid):
    729         if valid:
    730             self.authenticated_user = user
    731             self.push('235 Authentication Succeeded')
    732         else:
    733             self.push('535 Authentication credentials invalid')
    734         self.smtp_state = self.COMMAND
    735 
    736     def _decode_base64(self, string):
    737         return base64.decodebytes(string.encode('ascii')).decode('utf-8')
    738 
    739     def _auth_plain(self, arg=None):
    740         if arg is None:
    741             self.push('334 ')
    742         else:
    743             logpass = self._decode_base64(arg)
    744             try:
    745                 *_, user, password = logpass.split('\0')
    746             except ValueError as e:
    747                 self.push('535 Splitting response {!r} into user and password'
    748                           ' failed: {}'.format(logpass, e))
    749                 return
    750             self._authenticated(user, password == sim_auth[1])
    751 
    752     def _auth_login(self, arg=None):
    753         if arg is None:
    754             # base64 encoded 'Username:'
    755             self.push('334 VXNlcm5hbWU6')
    756         elif not hasattr(self, '_auth_login_user'):
    757             self._auth_login_user = self._decode_base64(arg)
    758             # base64 encoded 'Password:'
    759             self.push('334 UGFzc3dvcmQ6')
    760         else:
    761             password = self._decode_base64(arg)
    762             self._authenticated(self._auth_login_user, password == sim_auth[1])
    763             del self._auth_login_user
    764 
    765     def _auth_cram_md5(self, arg=None):
    766         if arg is None:
    767             self.push('334 {}'.format(sim_cram_md5_challenge))
    768         else:
    769             logpass = self._decode_base64(arg)
    770             try:
    771                 user, hashed_pass = logpass.split()
    772             except ValueError as e:
    773                 self.push('535 Splitting response {!r} into user and password '
    774                           'failed: {}'.format(logpass, e))
    775                 return False
    776             valid_hashed_pass = hmac.HMAC(
    777                 sim_auth[1].encode('ascii'),
    778                 self._decode_base64(sim_cram_md5_challenge).encode('ascii'),
    779                 'md5').hexdigest()
    780             self._authenticated(user, hashed_pass == valid_hashed_pass)
    781     # end AUTH related stuff.
    782 
    783     def smtp_EHLO(self, arg):
    784         resp = ('250-testhost\r\n'
    785                 '250-EXPN\r\n'
    786                 '250-SIZE 20000000\r\n'
    787                 '250-STARTTLS\r\n'
    788                 '250-DELIVERBY\r\n')
    789         resp = resp + self._extrafeatures + '250 HELP'
    790         self.push(resp)
    791         self.seen_greeting = arg
    792         self.extended_smtp = True
    793 
    794     def smtp_VRFY(self, arg):
    795         # For max compatibility smtplib should be sending the raw address.
    796         if arg in sim_users:
    797             self.push('250 %s %s' % (sim_users[arg], smtplib.quoteaddr(arg)))
    798         else:
    799             self.push('550 No such user: %s' % arg)
    800 
    801     def smtp_EXPN(self, arg):
    802         list_name = arg.lower()
    803         if list_name in sim_lists:
    804             user_list = sim_lists[list_name]
    805             for n, user_email in enumerate(user_list):
    806                 quoted_addr = smtplib.quoteaddr(user_email)
    807                 if n < len(user_list) - 1:
    808                     self.push('250-%s %s' % (sim_users[user_email], quoted_addr))
    809                 else:
    810                     self.push('250 %s %s' % (sim_users[user_email], quoted_addr))
    811         else:
    812             self.push('550 No access for you!')
    813 
    814     def smtp_QUIT(self, arg):
    815         if self.quit_response is None:
    816             super(SimSMTPChannel, self).smtp_QUIT(arg)
    817         else:
    818             self.push(self.quit_response)
    819             self.close_when_done()
    820 
    821     def smtp_MAIL(self, arg):
    822         if self.mail_response is None:
    823             super().smtp_MAIL(arg)
    824         else:
    825             self.push(self.mail_response)
    826             if self.disconnect:
    827                 self.close_when_done()
    828 
    829     def smtp_RCPT(self, arg):
    830         if self.rcpt_response is None:
    831             super().smtp_RCPT(arg)
    832             return
    833         self.rcpt_count += 1
    834         self.push(self.rcpt_response[self.rcpt_count-1])
    835 
    836     def smtp_RSET(self, arg):
    837         self.rset_count += 1
    838         super().smtp_RSET(arg)
    839 
    840     def smtp_DATA(self, arg):
    841         if self.data_response is None:
    842             super().smtp_DATA(arg)
    843         else:
    844             self.push(self.data_response)
    845 
    846     def handle_error(self):
    847         raise
    848 
    849 
    850 class SimSMTPServer(smtpd.SMTPServer):
    851 
    852     channel_class = SimSMTPChannel
    853 
    854     def __init__(self, *args, **kw):
    855         self._extra_features = []
    856         self._addresses = {}
    857         smtpd.SMTPServer.__init__(self, *args, **kw)
    858 
    859     def handle_accepted(self, conn, addr):
    860         self._SMTPchannel = self.channel_class(
    861             self._extra_features, self, conn, addr,
    862             decode_data=self._decode_data)
    863 
    864     def process_message(self, peer, mailfrom, rcpttos, data):
    865         self._addresses['from'] = mailfrom
    866         self._addresses['tos'] = rcpttos
    867 
    868     def add_feature(self, feature):
    869         self._extra_features.append(feature)
    870 
    871     def handle_error(self):
    872         raise
    873 
    874 
    875 # Test various SMTP & ESMTP commands/behaviors that require a simulated server
    876 # (i.e., something with more features than DebuggingServer)
    877 class SMTPSimTests(unittest.TestCase):
    878 
    879     def setUp(self):
    880         self.real_getfqdn = socket.getfqdn
    881         socket.getfqdn = mock_socket.getfqdn
    882         self.serv_evt = threading.Event()
    883         self.client_evt = threading.Event()
    884         # Pick a random unused port by passing 0 for the port number
    885         self.serv = SimSMTPServer((HOST, 0), ('nowhere', -1), decode_data=True)
    886         # Keep a note of what port was assigned
    887         self.port = self.serv.socket.getsockname()[1]
    888         serv_args = (self.serv, self.serv_evt, self.client_evt)
    889         self.thread = threading.Thread(target=debugging_server, args=serv_args)
    890         self.thread.start()
    891 
    892         # wait until server thread has assigned a port number
    893         self.serv_evt.wait()
    894         self.serv_evt.clear()
    895 
    896     def tearDown(self):
    897         socket.getfqdn = self.real_getfqdn
    898         # indicate that the client is finished
    899         self.client_evt.set()
    900         # wait for the server thread to terminate
    901         self.serv_evt.wait()
    902         self.thread.join()
    903 
    904     def testBasic(self):
    905         # smoke test
    906         smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15)
    907         smtp.quit()
    908 
    909     def testEHLO(self):
    910         smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15)
    911 
    912         # no features should be present before the EHLO
    913         self.assertEqual(smtp.esmtp_features, {})
    914 
    915         # features expected from the test server
    916         expected_features = {'expn':'',
    917                              'size': '20000000',
    918                              'starttls': '',
    919                              'deliverby': '',
    920                              'help': '',
    921                              }
    922 
    923         smtp.ehlo()
    924         self.assertEqual(smtp.esmtp_features, expected_features)
    925         for k in expected_features:
    926             self.assertTrue(smtp.has_extn(k))
    927         self.assertFalse(smtp.has_extn('unsupported-feature'))
    928         smtp.quit()
    929 
    930     def testVRFY(self):
    931         smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15)
    932 
    933         for addr_spec, name in sim_users.items():
    934             expected_known = (250, bytes('%s %s' %
    935                                          (name, smtplib.quoteaddr(addr_spec)),
    936                                          "ascii"))
    937             self.assertEqual(smtp.vrfy(addr_spec), expected_known)
    938 
    939         u = 'nobody (at] nowhere.com'
    940         expected_unknown = (550, ('No such user: %s' % u).encode('ascii'))
    941         self.assertEqual(smtp.vrfy(u), expected_unknown)
    942         smtp.quit()
    943 
    944     def testEXPN(self):
    945         smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15)
    946 
    947         for listname, members in sim_lists.items():
    948             users = []
    949             for m in members:
    950                 users.append('%s %s' % (sim_users[m], smtplib.quoteaddr(m)))
    951             expected_known = (250, bytes('\n'.join(users), "ascii"))
    952             self.assertEqual(smtp.expn(listname), expected_known)
    953 
    954         u = 'PSU-Members-List'
    955         expected_unknown = (550, b'No access for you!')
    956         self.assertEqual(smtp.expn(u), expected_unknown)
    957         smtp.quit()
    958 
    959     def testAUTH_PLAIN(self):
    960         self.serv.add_feature("AUTH PLAIN")
    961         smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15)
    962         resp = smtp.login(sim_auth[0], sim_auth[1])
    963         self.assertEqual(resp, (235, b'Authentication Succeeded'))
    964         smtp.close()
    965 
    966     def testAUTH_LOGIN(self):
    967         self.serv.add_feature("AUTH LOGIN")
    968         smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15)
    969         resp = smtp.login(sim_auth[0], sim_auth[1])
    970         self.assertEqual(resp, (235, b'Authentication Succeeded'))
    971         smtp.close()
    972 
    973     def testAUTH_CRAM_MD5(self):
    974         self.serv.add_feature("AUTH CRAM-MD5")
    975         smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15)
    976         resp = smtp.login(sim_auth[0], sim_auth[1])
    977         self.assertEqual(resp, (235, b'Authentication Succeeded'))
    978         smtp.close()
    979 
    980     def testAUTH_multiple(self):
    981         # Test that multiple authentication methods are tried.
    982         self.serv.add_feature("AUTH BOGUS PLAIN LOGIN CRAM-MD5")
    983         smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15)
    984         resp = smtp.login(sim_auth[0], sim_auth[1])
    985         self.assertEqual(resp, (235, b'Authentication Succeeded'))
    986         smtp.close()
    987 
    988     def test_auth_function(self):
    989         supported = {'CRAM-MD5', 'PLAIN', 'LOGIN'}
    990         for mechanism in supported:
    991             self.serv.add_feature("AUTH {}".format(mechanism))
    992         for mechanism in supported:
    993             with self.subTest(mechanism=mechanism):
    994                 smtp = smtplib.SMTP(HOST, self.port,
    995                                     local_hostname='localhost', timeout=15)
    996                 smtp.ehlo('foo')
    997                 smtp.user, smtp.password = sim_auth[0], sim_auth[1]
    998                 method = 'auth_' + mechanism.lower().replace('-', '_')
    999                 resp = smtp.auth(mechanism, getattr(smtp, method))
   1000                 self.assertEqual(resp, (235, b'Authentication Succeeded'))
   1001                 smtp.close()
   1002 
   1003     def test_quit_resets_greeting(self):
   1004         smtp = smtplib.SMTP(HOST, self.port,
   1005                             local_hostname='localhost',
   1006                             timeout=15)
   1007         code, message = smtp.ehlo()
   1008         self.assertEqual(code, 250)
   1009         self.assertIn('size', smtp.esmtp_features)
   1010         smtp.quit()
   1011         self.assertNotIn('size', smtp.esmtp_features)
   1012         smtp.connect(HOST, self.port)
   1013         self.assertNotIn('size', smtp.esmtp_features)
   1014         smtp.ehlo_or_helo_if_needed()
   1015         self.assertIn('size', smtp.esmtp_features)
   1016         smtp.quit()
   1017 
   1018     def test_with_statement(self):
   1019         with smtplib.SMTP(HOST, self.port) as smtp:
   1020             code, message = smtp.noop()
   1021             self.assertEqual(code, 250)
   1022         self.assertRaises(smtplib.SMTPServerDisconnected, smtp.send, b'foo')
   1023         with smtplib.SMTP(HOST, self.port) as smtp:
   1024             smtp.close()
   1025         self.assertRaises(smtplib.SMTPServerDisconnected, smtp.send, b'foo')
   1026 
   1027     def test_with_statement_QUIT_failure(self):
   1028         with self.assertRaises(smtplib.SMTPResponseException) as error:
   1029             with smtplib.SMTP(HOST, self.port) as smtp:
   1030                 smtp.noop()
   1031                 self.serv._SMTPchannel.quit_response = '421 QUIT FAILED'
   1032         self.assertEqual(error.exception.smtp_code, 421)
   1033         self.assertEqual(error.exception.smtp_error, b'QUIT FAILED')
   1034 
   1035     #TODO: add tests for correct AUTH method fallback now that the
   1036     #test infrastructure can support it.
   1037 
   1038     # Issue 17498: make sure _rset does not raise SMTPServerDisconnected exception
   1039     def test__rest_from_mail_cmd(self):
   1040         smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15)
   1041         smtp.noop()
   1042         self.serv._SMTPchannel.mail_response = '451 Requested action aborted'
   1043         self.serv._SMTPchannel.disconnect = True
   1044         with self.assertRaises(smtplib.SMTPSenderRefused):
   1045             smtp.sendmail('John', 'Sally', 'test message')
   1046         self.assertIsNone(smtp.sock)
   1047 
   1048     # Issue 5713: make sure close, not rset, is called if we get a 421 error
   1049     def test_421_from_mail_cmd(self):
   1050         smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15)
   1051         smtp.noop()
   1052         self.serv._SMTPchannel.mail_response = '421 closing connection'
   1053         with self.assertRaises(smtplib.SMTPSenderRefused):
   1054             smtp.sendmail('John', 'Sally', 'test message')
   1055         self.assertIsNone(smtp.sock)
   1056         self.assertEqual(self.serv._SMTPchannel.rset_count, 0)
   1057 
   1058     def test_421_from_rcpt_cmd(self):
   1059         smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15)
   1060         smtp.noop()
   1061         self.serv._SMTPchannel.rcpt_response = ['250 accepted', '421 closing']
   1062         with self.assertRaises(smtplib.SMTPRecipientsRefused) as r:
   1063             smtp.sendmail('John', ['Sally', 'Frank', 'George'], 'test message')
   1064         self.assertIsNone(smtp.sock)
   1065         self.assertEqual(self.serv._SMTPchannel.rset_count, 0)
   1066         self.assertDictEqual(r.exception.args[0], {'Frank': (421, b'closing')})
   1067 
   1068     def test_421_from_data_cmd(self):
   1069         class MySimSMTPChannel(SimSMTPChannel):
   1070             def found_terminator(self):
   1071                 if self.smtp_state == self.DATA:
   1072                     self.push('421 closing')
   1073                 else:
   1074                     super().found_terminator()
   1075         self.serv.channel_class = MySimSMTPChannel
   1076         smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15)
   1077         smtp.noop()
   1078         with self.assertRaises(smtplib.SMTPDataError):
   1079             smtp.sendmail('John (at] foo.org', ['Sally (at] foo.org'], 'test message')
   1080         self.assertIsNone(smtp.sock)
   1081         self.assertEqual(self.serv._SMTPchannel.rcpt_count, 0)
   1082 
   1083     def test_smtputf8_NotSupportedError_if_no_server_support(self):
   1084         smtp = smtplib.SMTP(
   1085             HOST, self.port, local_hostname='localhost', timeout=3)
   1086         self.addCleanup(smtp.close)
   1087         smtp.ehlo()
   1088         self.assertTrue(smtp.does_esmtp)
   1089         self.assertFalse(smtp.has_extn('smtputf8'))
   1090         self.assertRaises(
   1091             smtplib.SMTPNotSupportedError,
   1092             smtp.sendmail,
   1093             'John', 'Sally', '', mail_options=['BODY=8BITMIME', 'SMTPUTF8'])
   1094         self.assertRaises(
   1095             smtplib.SMTPNotSupportedError,
   1096             smtp.mail, 'John', options=['BODY=8BITMIME', 'SMTPUTF8'])
   1097 
   1098     def test_send_unicode_without_SMTPUTF8(self):
   1099         smtp = smtplib.SMTP(
   1100             HOST, self.port, local_hostname='localhost', timeout=3)
   1101         self.addCleanup(smtp.close)
   1102         self.assertRaises(UnicodeEncodeError, smtp.sendmail, 'Alice', 'Bb', '')
   1103         self.assertRaises(UnicodeEncodeError, smtp.mail, 'lice')
   1104 
   1105     def test_send_message_error_on_non_ascii_addrs_if_no_smtputf8(self):
   1106         # This test is located here and not in the SMTPUTF8SimTests
   1107         # class because it needs a "regular" SMTP server to work
   1108         msg = EmailMessage()
   1109         msg['From'] = "Polo <fo (at] bar.com>"
   1110         msg['To'] = 'Dinsdale'
   1111         msg['Subject'] = 'Nudge nudge, wink, wink \u1F609'
   1112         smtp = smtplib.SMTP(
   1113             HOST, self.port, local_hostname='localhost', timeout=3)
   1114         self.addCleanup(smtp.close)
   1115         with self.assertRaises(smtplib.SMTPNotSupportedError):
   1116             smtp.send_message(msg)
   1117 
   1118     def test_name_field_not_included_in_envelop_addresses(self):
   1119         smtp = smtplib.SMTP(
   1120             HOST, self.port, local_hostname='localhost', timeout=3
   1121         )
   1122         self.addCleanup(smtp.close)
   1123 
   1124         message = EmailMessage()
   1125         message['From'] = email.utils.formataddr(('Michal', 'michael (at] example.com'))
   1126         message['To'] = email.utils.formataddr(('Ren', 'rene (at] example.com'))
   1127 
   1128         self.assertDictEqual(smtp.send_message(message), {})
   1129 
   1130         self.assertEqual(self.serv._addresses['from'], 'michael (at] example.com')
   1131         self.assertEqual(self.serv._addresses['tos'], ['rene (at] example.com'])
   1132 
   1133 
   1134 class SimSMTPUTF8Server(SimSMTPServer):
   1135 
   1136     def __init__(self, *args, **kw):
   1137         # The base SMTP server turns these on automatically, but our test
   1138         # server is set up to munge the EHLO response, so we need to provide
   1139         # them as well.  And yes, the call is to SMTPServer not SimSMTPServer.
   1140         self._extra_features = ['SMTPUTF8', '8BITMIME']
   1141         smtpd.SMTPServer.__init__(self, *args, **kw)
   1142 
   1143     def handle_accepted(self, conn, addr):
   1144         self._SMTPchannel = self.channel_class(
   1145             self._extra_features, self, conn, addr,
   1146             decode_data=self._decode_data,
   1147             enable_SMTPUTF8=self.enable_SMTPUTF8,
   1148         )
   1149 
   1150     def process_message(self, peer, mailfrom, rcpttos, data, mail_options=None,
   1151                                                              rcpt_options=None):
   1152         self.last_peer = peer
   1153         self.last_mailfrom = mailfrom
   1154         self.last_rcpttos = rcpttos
   1155         self.last_message = data
   1156         self.last_mail_options = mail_options
   1157         self.last_rcpt_options = rcpt_options
   1158 
   1159 
   1160 class SMTPUTF8SimTests(unittest.TestCase):
   1161 
   1162     maxDiff = None
   1163 
   1164     def setUp(self):
   1165         self.real_getfqdn = socket.getfqdn
   1166         socket.getfqdn = mock_socket.getfqdn
   1167         self.serv_evt = threading.Event()
   1168         self.client_evt = threading.Event()
   1169         # Pick a random unused port by passing 0 for the port number
   1170         self.serv = SimSMTPUTF8Server((HOST, 0), ('nowhere', -1),
   1171                                       decode_data=False,
   1172                                       enable_SMTPUTF8=True)
   1173         # Keep a note of what port was assigned
   1174         self.port = self.serv.socket.getsockname()[1]
   1175         serv_args = (self.serv, self.serv_evt, self.client_evt)
   1176         self.thread = threading.Thread(target=debugging_server, args=serv_args)
   1177         self.thread.start()
   1178 
   1179         # wait until server thread has assigned a port number
   1180         self.serv_evt.wait()
   1181         self.serv_evt.clear()
   1182 
   1183     def tearDown(self):
   1184         socket.getfqdn = self.real_getfqdn
   1185         # indicate that the client is finished
   1186         self.client_evt.set()
   1187         # wait for the server thread to terminate
   1188         self.serv_evt.wait()
   1189         self.thread.join()
   1190 
   1191     def test_test_server_supports_extensions(self):
   1192         smtp = smtplib.SMTP(
   1193             HOST, self.port, local_hostname='localhost', timeout=3)
   1194         self.addCleanup(smtp.close)
   1195         smtp.ehlo()
   1196         self.assertTrue(smtp.does_esmtp)
   1197         self.assertTrue(smtp.has_extn('smtputf8'))
   1198 
   1199     def test_send_unicode_with_SMTPUTF8_via_sendmail(self):
   1200         m = 'a test message containing unicode!'.encode('utf-8')
   1201         smtp = smtplib.SMTP(
   1202             HOST, self.port, local_hostname='localhost', timeout=3)
   1203         self.addCleanup(smtp.close)
   1204         smtp.sendmail('Jhn', 'Slly', m,
   1205                       mail_options=['BODY=8BITMIME', 'SMTPUTF8'])
   1206         self.assertEqual(self.serv.last_mailfrom, 'Jhn')
   1207         self.assertEqual(self.serv.last_rcpttos, ['Slly'])
   1208         self.assertEqual(self.serv.last_message, m)
   1209         self.assertIn('BODY=8BITMIME', self.serv.last_mail_options)
   1210         self.assertIn('SMTPUTF8', self.serv.last_mail_options)
   1211         self.assertEqual(self.serv.last_rcpt_options, [])
   1212 
   1213     def test_send_unicode_with_SMTPUTF8_via_low_level_API(self):
   1214         m = 'a test message containing unicode!'.encode('utf-8')
   1215         smtp = smtplib.SMTP(
   1216             HOST, self.port, local_hostname='localhost', timeout=3)
   1217         self.addCleanup(smtp.close)
   1218         smtp.ehlo()
   1219         self.assertEqual(
   1220             smtp.mail('J', options=['BODY=8BITMIME', 'SMTPUTF8']),
   1221             (250, b'OK'))
   1222         self.assertEqual(smtp.rcpt('Jnos'), (250, b'OK'))
   1223         self.assertEqual(smtp.data(m), (250, b'OK'))
   1224         self.assertEqual(self.serv.last_mailfrom, 'J')
   1225         self.assertEqual(self.serv.last_rcpttos, ['Jnos'])
   1226         self.assertEqual(self.serv.last_message, m)
   1227         self.assertIn('BODY=8BITMIME', self.serv.last_mail_options)
   1228         self.assertIn('SMTPUTF8', self.serv.last_mail_options)
   1229         self.assertEqual(self.serv.last_rcpt_options, [])
   1230 
   1231     def test_send_message_uses_smtputf8_if_addrs_non_ascii(self):
   1232         msg = EmailMessage()
   1233         msg['From'] = "Polo <fo (at] bar.com>"
   1234         msg['To'] = 'Dinsdale'
   1235         msg['Subject'] = 'Nudge nudge, wink, wink \u1F609'
   1236         # XXX I don't know why I need two \n's here, but this is an existing
   1237         # bug (if it is one) and not a problem with the new functionality.
   1238         msg.set_content("oh l l, know what I mean, know what I mean?\n\n")
   1239         # XXX smtpd converts received /r/n to /n, so we can't easily test that
   1240         # we are successfully sending /r/n :(.
   1241         expected = textwrap.dedent("""\
   1242             From: Polo <fo (at] bar.com>
   1243             To: Dinsdale
   1244             Subject: Nudge nudge, wink, wink \u1F609
   1245             Content-Type: text/plain; charset="utf-8"
   1246             Content-Transfer-Encoding: 8bit
   1247             MIME-Version: 1.0
   1248 
   1249             oh l l, know what I mean, know what I mean?
   1250             """)
   1251         smtp = smtplib.SMTP(
   1252             HOST, self.port, local_hostname='localhost', timeout=3)
   1253         self.addCleanup(smtp.close)
   1254         self.assertEqual(smtp.send_message(msg), {})
   1255         self.assertEqual(self.serv.last_mailfrom, 'fo (at] bar.com')
   1256         self.assertEqual(self.serv.last_rcpttos, ['Dinsdale'])
   1257         self.assertEqual(self.serv.last_message.decode(), expected)
   1258         self.assertIn('BODY=8BITMIME', self.serv.last_mail_options)
   1259         self.assertIn('SMTPUTF8', self.serv.last_mail_options)
   1260         self.assertEqual(self.serv.last_rcpt_options, [])
   1261 
   1262 
   1263 EXPECTED_RESPONSE = encode_base64(b'\0psu\0doesnotexist', eol='')
   1264 
   1265 class SimSMTPAUTHInitialResponseChannel(SimSMTPChannel):
   1266     def smtp_AUTH(self, arg):
   1267         # RFC 4954's AUTH command allows for an optional initial-response.
   1268         # Not all AUTH methods support this; some require a challenge.  AUTH
   1269         # PLAIN does those, so test that here.  See issue #15014.
   1270         args = arg.split()
   1271         if args[0].lower() == 'plain':
   1272             if len(args) == 2:
   1273                 # AUTH PLAIN <initial-response> with the response base 64
   1274                 # encoded.  Hard code the expected response for the test.
   1275                 if args[1] == EXPECTED_RESPONSE:
   1276                     self.push('235 Ok')
   1277                     return
   1278         self.push('571 Bad authentication')
   1279 
   1280 class SimSMTPAUTHInitialResponseServer(SimSMTPServer):
   1281     channel_class = SimSMTPAUTHInitialResponseChannel
   1282 
   1283 
   1284 class SMTPAUTHInitialResponseSimTests(unittest.TestCase):
   1285     def setUp(self):
   1286         self.real_getfqdn = socket.getfqdn
   1287         socket.getfqdn = mock_socket.getfqdn
   1288         self.serv_evt = threading.Event()
   1289         self.client_evt = threading.Event()
   1290         # Pick a random unused port by passing 0 for the port number
   1291         self.serv = SimSMTPAUTHInitialResponseServer(
   1292             (HOST, 0), ('nowhere', -1), decode_data=True)
   1293         # Keep a note of what port was assigned
   1294         self.port = self.serv.socket.getsockname()[1]
   1295         serv_args = (self.serv, self.serv_evt, self.client_evt)
   1296         self.thread = threading.Thread(target=debugging_server, args=serv_args)
   1297         self.thread.start()
   1298 
   1299         # wait until server thread has assigned a port number
   1300         self.serv_evt.wait()
   1301         self.serv_evt.clear()
   1302 
   1303     def tearDown(self):
   1304         socket.getfqdn = self.real_getfqdn
   1305         # indicate that the client is finished
   1306         self.client_evt.set()
   1307         # wait for the server thread to terminate
   1308         self.serv_evt.wait()
   1309         self.thread.join()
   1310 
   1311     def testAUTH_PLAIN_initial_response_login(self):
   1312         self.serv.add_feature('AUTH PLAIN')
   1313         smtp = smtplib.SMTP(HOST, self.port,
   1314                             local_hostname='localhost', timeout=15)
   1315         smtp.login('psu', 'doesnotexist')
   1316         smtp.close()
   1317 
   1318     def testAUTH_PLAIN_initial_response_auth(self):
   1319         self.serv.add_feature('AUTH PLAIN')
   1320         smtp = smtplib.SMTP(HOST, self.port,
   1321                             local_hostname='localhost', timeout=15)
   1322         smtp.user = 'psu'
   1323         smtp.password = 'doesnotexist'
   1324         code, response = smtp.auth('plain', smtp.auth_plain)
   1325         smtp.close()
   1326         self.assertEqual(code, 235)
   1327 
   1328 
   1329 if __name__ == '__main__':
   1330     unittest.main()
   1331