1 import asyncore 2 import base64 3 import email.mime.text 4 from email.message import EmailMessage 5 from email.base64mime import body_encode as encode_base64 6 import email.utils 7 import hmac 8 import socket 9 import smtpd 10 import smtplib 11 import io 12 import re 13 import sys 14 import time 15 import select 16 import errno 17 import textwrap 18 import threading 19 20 import unittest 21 from test import support, mock_socket 22 from test.support import HOST, HOSTv4, HOSTv6 23 from unittest.mock import Mock 24 25 26 if sys.platform == 'darwin': 27 # select.poll returns a select.POLLHUP at the end of the tests 28 # on darwin, so just ignore it 29 def handle_expt(self): 30 pass 31 smtpd.SMTPChannel.handle_expt = handle_expt 32 33 34 def server(evt, buf, serv): 35 serv.listen() 36 evt.set() 37 try: 38 conn, addr = serv.accept() 39 except socket.timeout: 40 pass 41 else: 42 n = 500 43 while buf and n > 0: 44 r, w, e = select.select([], [conn], []) 45 if w: 46 sent = conn.send(buf) 47 buf = buf[sent:] 48 49 n -= 1 50 51 conn.close() 52 finally: 53 serv.close() 54 evt.set() 55 56 class GeneralTests(unittest.TestCase): 57 58 def setUp(self): 59 smtplib.socket = mock_socket 60 self.port = 25 61 62 def tearDown(self): 63 smtplib.socket = socket 64 65 # This method is no longer used but is retained for backward compatibility, 66 # so test to make sure it still works. 67 def testQuoteData(self): 68 teststr = "abc\n.jkl\rfoo\r\n..blue" 69 expected = "abc\r\n..jkl\r\nfoo\r\n...blue" 70 self.assertEqual(expected, smtplib.quotedata(teststr)) 71 72 def testBasic1(self): 73 mock_socket.reply_with(b"220 Hola mundo") 74 # connects 75 smtp = smtplib.SMTP(HOST, self.port) 76 smtp.close() 77 78 def testSourceAddress(self): 79 mock_socket.reply_with(b"220 Hola mundo") 80 # connects 81 smtp = smtplib.SMTP(HOST, self.port, 82 source_address=('127.0.0.1',19876)) 83 self.assertEqual(smtp.source_address, ('127.0.0.1', 19876)) 84 smtp.close() 85 86 def testBasic2(self): 87 mock_socket.reply_with(b"220 Hola mundo") 88 # connects, include port in host name 89 smtp = smtplib.SMTP("%s:%s" % (HOST, self.port)) 90 smtp.close() 91 92 def testLocalHostName(self): 93 mock_socket.reply_with(b"220 Hola mundo") 94 # check that supplied local_hostname is used 95 smtp = smtplib.SMTP(HOST, self.port, local_hostname="testhost") 96 self.assertEqual(smtp.local_hostname, "testhost") 97 smtp.close() 98 99 def testTimeoutDefault(self): 100 mock_socket.reply_with(b"220 Hola mundo") 101 self.assertIsNone(mock_socket.getdefaulttimeout()) 102 mock_socket.setdefaulttimeout(30) 103 self.assertEqual(mock_socket.getdefaulttimeout(), 30) 104 try: 105 smtp = smtplib.SMTP(HOST, self.port) 106 finally: 107 mock_socket.setdefaulttimeout(None) 108 self.assertEqual(smtp.sock.gettimeout(), 30) 109 smtp.close() 110 111 def testTimeoutNone(self): 112 mock_socket.reply_with(b"220 Hola mundo") 113 self.assertIsNone(socket.getdefaulttimeout()) 114 socket.setdefaulttimeout(30) 115 try: 116 smtp = smtplib.SMTP(HOST, self.port, timeout=None) 117 finally: 118 socket.setdefaulttimeout(None) 119 self.assertIsNone(smtp.sock.gettimeout()) 120 smtp.close() 121 122 def testTimeoutValue(self): 123 mock_socket.reply_with(b"220 Hola mundo") 124 smtp = smtplib.SMTP(HOST, self.port, timeout=30) 125 self.assertEqual(smtp.sock.gettimeout(), 30) 126 smtp.close() 127 128 def test_debuglevel(self): 129 mock_socket.reply_with(b"220 Hello world") 130 smtp = smtplib.SMTP() 131 smtp.set_debuglevel(1) 132 with support.captured_stderr() as stderr: 133 smtp.connect(HOST, self.port) 134 smtp.close() 135 expected = re.compile(r"^connect:", re.MULTILINE) 136 self.assertRegex(stderr.getvalue(), expected) 137 138 def test_debuglevel_2(self): 139 mock_socket.reply_with(b"220 Hello world") 140 smtp = smtplib.SMTP() 141 smtp.set_debuglevel(2) 142 with support.captured_stderr() as stderr: 143 smtp.connect(HOST, self.port) 144 smtp.close() 145 expected = re.compile(r"^\d{2}:\d{2}:\d{2}\.\d{6} connect: ", 146 re.MULTILINE) 147 self.assertRegex(stderr.getvalue(), expected) 148 149 150 # Test server thread using the specified SMTP server class 151 def debugging_server(serv, serv_evt, client_evt): 152 serv_evt.set() 153 154 try: 155 if hasattr(select, 'poll'): 156 poll_fun = asyncore.poll2 157 else: 158 poll_fun = asyncore.poll 159 160 n = 1000 161 while asyncore.socket_map and n > 0: 162 poll_fun(0.01, asyncore.socket_map) 163 164 # when the client conversation is finished, it will 165 # set client_evt, and it's then ok to kill the server 166 if client_evt.is_set(): 167 serv.close() 168 break 169 170 n -= 1 171 172 except socket.timeout: 173 pass 174 finally: 175 if not client_evt.is_set(): 176 # allow some time for the client to read the result 177 time.sleep(0.5) 178 serv.close() 179 asyncore.close_all() 180 serv_evt.set() 181 182 MSG_BEGIN = '---------- MESSAGE FOLLOWS ----------\n' 183 MSG_END = '------------ END MESSAGE ------------\n' 184 185 # NOTE: Some SMTP objects in the tests below are created with a non-default 186 # local_hostname argument to the constructor, since (on some systems) the FQDN 187 # lookup caused by the default local_hostname sometimes takes so long that the 188 # test server times out, causing the test to fail. 189 190 # Test behavior of smtpd.DebuggingServer 191 class DebuggingServerTests(unittest.TestCase): 192 193 maxDiff = None 194 195 def setUp(self): 196 self.real_getfqdn = socket.getfqdn 197 socket.getfqdn = mock_socket.getfqdn 198 # temporarily replace sys.stdout to capture DebuggingServer output 199 self.old_stdout = sys.stdout 200 self.output = io.StringIO() 201 sys.stdout = self.output 202 203 self.serv_evt = threading.Event() 204 self.client_evt = threading.Event() 205 # Capture SMTPChannel debug output 206 self.old_DEBUGSTREAM = smtpd.DEBUGSTREAM 207 smtpd.DEBUGSTREAM = io.StringIO() 208 # Pick a random unused port by passing 0 for the port number 209 self.serv = smtpd.DebuggingServer((HOST, 0), ('nowhere', -1), 210 decode_data=True) 211 # Keep a note of what server host and port were assigned 212 self.host, self.port = self.serv.socket.getsockname()[:2] 213 serv_args = (self.serv, self.serv_evt, self.client_evt) 214 self.thread = threading.Thread(target=debugging_server, args=serv_args) 215 self.thread.start() 216 217 # wait until server thread has assigned a port number 218 self.serv_evt.wait() 219 self.serv_evt.clear() 220 221 def tearDown(self): 222 socket.getfqdn = self.real_getfqdn 223 # indicate that the client is finished 224 self.client_evt.set() 225 # wait for the server thread to terminate 226 self.serv_evt.wait() 227 self.thread.join() 228 # restore sys.stdout 229 sys.stdout = self.old_stdout 230 # restore DEBUGSTREAM 231 smtpd.DEBUGSTREAM.close() 232 smtpd.DEBUGSTREAM = self.old_DEBUGSTREAM 233 234 def get_output_without_xpeer(self): 235 test_output = self.output.getvalue() 236 return re.sub(r'(.*?)^X-Peer:\s*\S+\n(.*)', r'\1\2', 237 test_output, flags=re.MULTILINE|re.DOTALL) 238 239 def testBasic(self): 240 # connect 241 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3) 242 smtp.quit() 243 244 def testSourceAddress(self): 245 # connect 246 src_port = support.find_unused_port() 247 try: 248 smtp = smtplib.SMTP(self.host, self.port, local_hostname='localhost', 249 timeout=3, source_address=(self.host, src_port)) 250 self.assertEqual(smtp.source_address, (self.host, src_port)) 251 self.assertEqual(smtp.local_hostname, 'localhost') 252 smtp.quit() 253 except OSError as e: 254 if e.errno == errno.EADDRINUSE: 255 self.skipTest("couldn't bind to source port %d" % src_port) 256 raise 257 258 def testNOOP(self): 259 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3) 260 expected = (250, b'OK') 261 self.assertEqual(smtp.noop(), expected) 262 smtp.quit() 263 264 def testRSET(self): 265 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3) 266 expected = (250, b'OK') 267 self.assertEqual(smtp.rset(), expected) 268 smtp.quit() 269 270 def testELHO(self): 271 # EHLO isn't implemented in DebuggingServer 272 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3) 273 expected = (250, b'\nSIZE 33554432\nHELP') 274 self.assertEqual(smtp.ehlo(), expected) 275 smtp.quit() 276 277 def testEXPNNotImplemented(self): 278 # EXPN isn't implemented in DebuggingServer 279 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3) 280 expected = (502, b'EXPN not implemented') 281 smtp.putcmd('EXPN') 282 self.assertEqual(smtp.getreply(), expected) 283 smtp.quit() 284 285 def testVRFY(self): 286 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3) 287 expected = (252, b'Cannot VRFY user, but will accept message ' + \ 288 b'and attempt delivery') 289 self.assertEqual(smtp.vrfy('nobody (at] nowhere.com'), expected) 290 self.assertEqual(smtp.verify('nobody (at] nowhere.com'), expected) 291 smtp.quit() 292 293 def testSecondHELO(self): 294 # check that a second HELO returns a message that it's a duplicate 295 # (this behavior is specific to smtpd.SMTPChannel) 296 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3) 297 smtp.helo() 298 expected = (503, b'Duplicate HELO/EHLO') 299 self.assertEqual(smtp.helo(), expected) 300 smtp.quit() 301 302 def testHELP(self): 303 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3) 304 self.assertEqual(smtp.help(), b'Supported commands: EHLO HELO MAIL ' + \ 305 b'RCPT DATA RSET NOOP QUIT VRFY') 306 smtp.quit() 307 308 def testSend(self): 309 # connect and send mail 310 m = 'A test message' 311 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3) 312 smtp.sendmail('John', 'Sally', m) 313 # XXX(nnorwitz): this test is flaky and dies with a bad file descriptor 314 # in asyncore. This sleep might help, but should really be fixed 315 # properly by using an Event variable. 316 time.sleep(0.01) 317 smtp.quit() 318 319 self.client_evt.set() 320 self.serv_evt.wait() 321 self.output.flush() 322 mexpect = '%s%s\n%s' % (MSG_BEGIN, m, MSG_END) 323 self.assertEqual(self.output.getvalue(), mexpect) 324 325 def testSendBinary(self): 326 m = b'A test message' 327 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3) 328 smtp.sendmail('John', 'Sally', m) 329 # XXX (see comment in testSend) 330 time.sleep(0.01) 331 smtp.quit() 332 333 self.client_evt.set() 334 self.serv_evt.wait() 335 self.output.flush() 336 mexpect = '%s%s\n%s' % (MSG_BEGIN, m.decode('ascii'), MSG_END) 337 self.assertEqual(self.output.getvalue(), mexpect) 338 339 def testSendNeedingDotQuote(self): 340 # Issue 12283 341 m = '.A test\n.mes.sage.' 342 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3) 343 smtp.sendmail('John', 'Sally', m) 344 # XXX (see comment in testSend) 345 time.sleep(0.01) 346 smtp.quit() 347 348 self.client_evt.set() 349 self.serv_evt.wait() 350 self.output.flush() 351 mexpect = '%s%s\n%s' % (MSG_BEGIN, m, MSG_END) 352 self.assertEqual(self.output.getvalue(), mexpect) 353 354 def testSendNullSender(self): 355 m = 'A test message' 356 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3) 357 smtp.sendmail('<>', 'Sally', m) 358 # XXX (see comment in testSend) 359 time.sleep(0.01) 360 smtp.quit() 361 362 self.client_evt.set() 363 self.serv_evt.wait() 364 self.output.flush() 365 mexpect = '%s%s\n%s' % (MSG_BEGIN, m, MSG_END) 366 self.assertEqual(self.output.getvalue(), mexpect) 367 debugout = smtpd.DEBUGSTREAM.getvalue() 368 sender = re.compile("^sender: <>$", re.MULTILINE) 369 self.assertRegex(debugout, sender) 370 371 def testSendMessage(self): 372 m = email.mime.text.MIMEText('A test message') 373 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3) 374 smtp.send_message(m, from_addr='John', to_addrs='Sally') 375 # XXX (see comment in testSend) 376 time.sleep(0.01) 377 smtp.quit() 378 379 self.client_evt.set() 380 self.serv_evt.wait() 381 self.output.flush() 382 # Remove the X-Peer header that DebuggingServer adds as figuring out 383 # exactly what IP address format is put there is not easy (and 384 # irrelevant to our test). Typically 127.0.0.1 or ::1, but it is 385 # not always the same as socket.gethostbyname(HOST). :( 386 test_output = self.get_output_without_xpeer() 387 del m['X-Peer'] 388 mexpect = '%s%s\n%s' % (MSG_BEGIN, m.as_string(), MSG_END) 389 self.assertEqual(test_output, mexpect) 390 391 def testSendMessageWithAddresses(self): 392 m = email.mime.text.MIMEText('A test message') 393 m['From'] = 'foo (at] bar.com' 394 m['To'] = 'John' 395 m['CC'] = 'Sally, Fred' 396 m['Bcc'] = 'John Root <root@localhost>, "Dinsdale" <warped (at] silly.walks.com>' 397 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3) 398 smtp.send_message(m) 399 # XXX (see comment in testSend) 400 time.sleep(0.01) 401 smtp.quit() 402 # make sure the Bcc header is still in the message. 403 self.assertEqual(m['Bcc'], 'John Root <root@localhost>, "Dinsdale" ' 404 '<warped (at] silly.walks.com>') 405 406 self.client_evt.set() 407 self.serv_evt.wait() 408 self.output.flush() 409 # Remove the X-Peer header that DebuggingServer adds. 410 test_output = self.get_output_without_xpeer() 411 del m['X-Peer'] 412 # The Bcc header should not be transmitted. 413 del m['Bcc'] 414 mexpect = '%s%s\n%s' % (MSG_BEGIN, m.as_string(), MSG_END) 415 self.assertEqual(test_output, mexpect) 416 debugout = smtpd.DEBUGSTREAM.getvalue() 417 sender = re.compile("^sender: foo (at] bar.com$", re.MULTILINE) 418 self.assertRegex(debugout, sender) 419 for addr in ('John', 'Sally', 'Fred', 'root@localhost', 420 'warped (at] silly.walks.com'): 421 to_addr = re.compile(r"^recips: .*'{}'.*$".format(addr), 422 re.MULTILINE) 423 self.assertRegex(debugout, to_addr) 424 425 def testSendMessageWithSomeAddresses(self): 426 # Make sure nothing breaks if not all of the three 'to' headers exist 427 m = email.mime.text.MIMEText('A test message') 428 m['From'] = 'foo (at] bar.com' 429 m['To'] = 'John, Dinsdale' 430 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3) 431 smtp.send_message(m) 432 # XXX (see comment in testSend) 433 time.sleep(0.01) 434 smtp.quit() 435 436 self.client_evt.set() 437 self.serv_evt.wait() 438 self.output.flush() 439 # Remove the X-Peer header that DebuggingServer adds. 440 test_output = self.get_output_without_xpeer() 441 del m['X-Peer'] 442 mexpect = '%s%s\n%s' % (MSG_BEGIN, m.as_string(), MSG_END) 443 self.assertEqual(test_output, mexpect) 444 debugout = smtpd.DEBUGSTREAM.getvalue() 445 sender = re.compile("^sender: foo (at] bar.com$", re.MULTILINE) 446 self.assertRegex(debugout, sender) 447 for addr in ('John', 'Dinsdale'): 448 to_addr = re.compile(r"^recips: .*'{}'.*$".format(addr), 449 re.MULTILINE) 450 self.assertRegex(debugout, to_addr) 451 452 def testSendMessageWithSpecifiedAddresses(self): 453 # Make sure addresses specified in call override those in message. 454 m = email.mime.text.MIMEText('A test message') 455 m['From'] = 'foo (at] bar.com' 456 m['To'] = 'John, Dinsdale' 457 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3) 458 smtp.send_message(m, from_addr='joe (at] example.com', to_addrs='foo (at] example.net') 459 # XXX (see comment in testSend) 460 time.sleep(0.01) 461 smtp.quit() 462 463 self.client_evt.set() 464 self.serv_evt.wait() 465 self.output.flush() 466 # Remove the X-Peer header that DebuggingServer adds. 467 test_output = self.get_output_without_xpeer() 468 del m['X-Peer'] 469 mexpect = '%s%s\n%s' % (MSG_BEGIN, m.as_string(), MSG_END) 470 self.assertEqual(test_output, mexpect) 471 debugout = smtpd.DEBUGSTREAM.getvalue() 472 sender = re.compile("^sender: joe (at] example.com$", re.MULTILINE) 473 self.assertRegex(debugout, sender) 474 for addr in ('John', 'Dinsdale'): 475 to_addr = re.compile(r"^recips: .*'{}'.*$".format(addr), 476 re.MULTILINE) 477 self.assertNotRegex(debugout, to_addr) 478 recip = re.compile(r"^recips: .*'foo (at] example.net'.*$", re.MULTILINE) 479 self.assertRegex(debugout, recip) 480 481 def testSendMessageWithMultipleFrom(self): 482 # Sender overrides To 483 m = email.mime.text.MIMEText('A test message') 484 m['From'] = 'Bernard, Bianca' 485 m['Sender'] = 'the_rescuers (at] Rescue-Aid-Society.com' 486 m['To'] = 'John, Dinsdale' 487 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3) 488 smtp.send_message(m) 489 # XXX (see comment in testSend) 490 time.sleep(0.01) 491 smtp.quit() 492 493 self.client_evt.set() 494 self.serv_evt.wait() 495 self.output.flush() 496 # Remove the X-Peer header that DebuggingServer adds. 497 test_output = self.get_output_without_xpeer() 498 del m['X-Peer'] 499 mexpect = '%s%s\n%s' % (MSG_BEGIN, m.as_string(), MSG_END) 500 self.assertEqual(test_output, mexpect) 501 debugout = smtpd.DEBUGSTREAM.getvalue() 502 sender = re.compile("^sender: the_rescuers (at] Rescue-Aid-Society.com$", re.MULTILINE) 503 self.assertRegex(debugout, sender) 504 for addr in ('John', 'Dinsdale'): 505 to_addr = re.compile(r"^recips: .*'{}'.*$".format(addr), 506 re.MULTILINE) 507 self.assertRegex(debugout, to_addr) 508 509 def testSendMessageResent(self): 510 m = email.mime.text.MIMEText('A test message') 511 m['From'] = 'foo (at] bar.com' 512 m['To'] = 'John' 513 m['CC'] = 'Sally, Fred' 514 m['Bcc'] = 'John Root <root@localhost>, "Dinsdale" <warped (at] silly.walks.com>' 515 m['Resent-Date'] = 'Thu, 1 Jan 1970 17:42:00 +0000' 516 m['Resent-From'] = 'holy (at] grail.net' 517 m['Resent-To'] = 'Martha <my_mom (at] great.cooker.com>, Jeff' 518 m['Resent-Bcc'] = 'doe (at] losthope.net' 519 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3) 520 smtp.send_message(m) 521 # XXX (see comment in testSend) 522 time.sleep(0.01) 523 smtp.quit() 524 525 self.client_evt.set() 526 self.serv_evt.wait() 527 self.output.flush() 528 # The Resent-Bcc headers are deleted before serialization. 529 del m['Bcc'] 530 del m['Resent-Bcc'] 531 # Remove the X-Peer header that DebuggingServer adds. 532 test_output = self.get_output_without_xpeer() 533 del m['X-Peer'] 534 mexpect = '%s%s\n%s' % (MSG_BEGIN, m.as_string(), MSG_END) 535 self.assertEqual(test_output, mexpect) 536 debugout = smtpd.DEBUGSTREAM.getvalue() 537 sender = re.compile("^sender: holy (at] grail.net$", re.MULTILINE) 538 self.assertRegex(debugout, sender) 539 for addr in ('my_mom (at] great.cooker.com', 'Jeff', 'doe (at] losthope.net'): 540 to_addr = re.compile(r"^recips: .*'{}'.*$".format(addr), 541 re.MULTILINE) 542 self.assertRegex(debugout, to_addr) 543 544 def testSendMessageMultipleResentRaises(self): 545 m = email.mime.text.MIMEText('A test message') 546 m['From'] = 'foo (at] bar.com' 547 m['To'] = 'John' 548 m['CC'] = 'Sally, Fred' 549 m['Bcc'] = 'John Root <root@localhost>, "Dinsdale" <warped (at] silly.walks.com>' 550 m['Resent-Date'] = 'Thu, 1 Jan 1970 17:42:00 +0000' 551 m['Resent-From'] = 'holy (at] grail.net' 552 m['Resent-To'] = 'Martha <my_mom (at] great.cooker.com>, Jeff' 553 m['Resent-Bcc'] = 'doe (at] losthope.net' 554 m['Resent-Date'] = 'Thu, 2 Jan 1970 17:42:00 +0000' 555 m['Resent-To'] = 'holy (at] grail.net' 556 m['Resent-From'] = 'Martha <my_mom (at] great.cooker.com>, Jeff' 557 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3) 558 with self.assertRaises(ValueError): 559 smtp.send_message(m) 560 smtp.close() 561 562 class NonConnectingTests(unittest.TestCase): 563 564 def testNotConnected(self): 565 # Test various operations on an unconnected SMTP object that 566 # should raise exceptions (at present the attempt in SMTP.send 567 # to reference the nonexistent 'sock' attribute of the SMTP object 568 # causes an AttributeError) 569 smtp = smtplib.SMTP() 570 self.assertRaises(smtplib.SMTPServerDisconnected, smtp.ehlo) 571 self.assertRaises(smtplib.SMTPServerDisconnected, 572 smtp.send, 'test msg') 573 574 def testNonnumericPort(self): 575 # check that non-numeric port raises OSError 576 self.assertRaises(OSError, smtplib.SMTP, 577 "localhost", "bogus") 578 self.assertRaises(OSError, smtplib.SMTP, 579 "localhost:bogus") 580 581 582 class DefaultArgumentsTests(unittest.TestCase): 583 584 def setUp(self): 585 self.msg = EmailMessage() 586 self.msg['From'] = 'Polo <fo (at] bar.com>' 587 self.smtp = smtplib.SMTP() 588 self.smtp.ehlo = Mock(return_value=(200, 'OK')) 589 self.smtp.has_extn, self.smtp.sendmail = Mock(), Mock() 590 591 def testSendMessage(self): 592 expected_mail_options = ('SMTPUTF8', 'BODY=8BITMIME') 593 self.smtp.send_message(self.msg) 594 self.smtp.send_message(self.msg) 595 self.assertEqual(self.smtp.sendmail.call_args_list[0][0][3], 596 expected_mail_options) 597 self.assertEqual(self.smtp.sendmail.call_args_list[1][0][3], 598 expected_mail_options) 599 600 def testSendMessageWithMailOptions(self): 601 mail_options = ['STARTTLS'] 602 expected_mail_options = ('STARTTLS', 'SMTPUTF8', 'BODY=8BITMIME') 603 self.smtp.send_message(self.msg, None, None, mail_options) 604 self.assertEqual(mail_options, ['STARTTLS']) 605 self.assertEqual(self.smtp.sendmail.call_args_list[0][0][3], 606 expected_mail_options) 607 608 609 # test response of client to a non-successful HELO message 610 class BadHELOServerTests(unittest.TestCase): 611 612 def setUp(self): 613 smtplib.socket = mock_socket 614 mock_socket.reply_with(b"199 no hello for you!") 615 self.old_stdout = sys.stdout 616 self.output = io.StringIO() 617 sys.stdout = self.output 618 self.port = 25 619 620 def tearDown(self): 621 smtplib.socket = socket 622 sys.stdout = self.old_stdout 623 624 def testFailingHELO(self): 625 self.assertRaises(smtplib.SMTPConnectError, smtplib.SMTP, 626 HOST, self.port, 'localhost', 3) 627 628 629 class TooLongLineTests(unittest.TestCase): 630 respdata = b'250 OK' + (b'.' * smtplib._MAXLINE * 2) + b'\n' 631 632 def setUp(self): 633 self.old_stdout = sys.stdout 634 self.output = io.StringIO() 635 sys.stdout = self.output 636 637 self.evt = threading.Event() 638 self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 639 self.sock.settimeout(15) 640 self.port = support.bind_port(self.sock) 641 servargs = (self.evt, self.respdata, self.sock) 642 thread = threading.Thread(target=server, args=servargs) 643 thread.start() 644 self.addCleanup(thread.join) 645 self.evt.wait() 646 self.evt.clear() 647 648 def tearDown(self): 649 self.evt.wait() 650 sys.stdout = self.old_stdout 651 652 def testLineTooLong(self): 653 self.assertRaises(smtplib.SMTPResponseException, smtplib.SMTP, 654 HOST, self.port, 'localhost', 3) 655 656 657 sim_users = {'Mr.A (at] somewhere.com':'John A', 658 'Ms.B (at] xn--fo-fka.com':'Sally B', 659 'Mrs.C (at] somewhereesle.com':'Ruth C', 660 } 661 662 sim_auth = ('Mr.A (at] somewhere.com', 'somepassword') 663 sim_cram_md5_challenge = ('PENCeUxFREJoU0NnbmhNWitOMjNGNn' 664 'dAZWx3b29kLmlubm9zb2Z0LmNvbT4=') 665 sim_lists = {'list-1':['Mr.A (at] somewhere.com','Mrs.C (at] somewhereesle.com'], 666 'list-2':['Ms.B (at] xn--fo-fka.com',], 667 } 668 669 # Simulated SMTP channel & server 670 class ResponseException(Exception): pass 671 class SimSMTPChannel(smtpd.SMTPChannel): 672 673 quit_response = None 674 mail_response = None 675 rcpt_response = None 676 data_response = None 677 rcpt_count = 0 678 rset_count = 0 679 disconnect = 0 680 AUTH = 99 # Add protocol state to enable auth testing. 681 authenticated_user = None 682 683 def __init__(self, extra_features, *args, **kw): 684 self._extrafeatures = ''.join( 685 [ "250-{0}\r\n".format(x) for x in extra_features ]) 686 super(SimSMTPChannel, self).__init__(*args, **kw) 687 688 # AUTH related stuff. It would be nice if support for this were in smtpd. 689 def found_terminator(self): 690 if self.smtp_state == self.AUTH: 691 line = self._emptystring.join(self.received_lines) 692 print('Data:', repr(line), file=smtpd.DEBUGSTREAM) 693 self.received_lines = [] 694 try: 695 self.auth_object(line) 696 except ResponseException as e: 697 self.smtp_state = self.COMMAND 698 self.push('%s %s' % (e.smtp_code, e.smtp_error)) 699 return 700 super().found_terminator() 701 702 703 def smtp_AUTH(self, arg): 704 if not self.seen_greeting: 705 self.push('503 Error: send EHLO first') 706 return 707 if not self.extended_smtp or 'AUTH' not in self._extrafeatures: 708 self.push('500 Error: command "AUTH" not recognized') 709 return 710 if self.authenticated_user is not None: 711 self.push( 712 '503 Bad sequence of commands: already authenticated') 713 return 714 args = arg.split() 715 if len(args) not in [1, 2]: 716 self.push('501 Syntax: AUTH <mechanism> [initial-response]') 717 return 718 auth_object_name = '_auth_%s' % args[0].lower().replace('-', '_') 719 try: 720 self.auth_object = getattr(self, auth_object_name) 721 except AttributeError: 722 self.push('504 Command parameter not implemented: unsupported ' 723 ' authentication mechanism {!r}'.format(auth_object_name)) 724 return 725 self.smtp_state = self.AUTH 726 self.auth_object(args[1] if len(args) == 2 else None) 727 728 def _authenticated(self, user, valid): 729 if valid: 730 self.authenticated_user = user 731 self.push('235 Authentication Succeeded') 732 else: 733 self.push('535 Authentication credentials invalid') 734 self.smtp_state = self.COMMAND 735 736 def _decode_base64(self, string): 737 return base64.decodebytes(string.encode('ascii')).decode('utf-8') 738 739 def _auth_plain(self, arg=None): 740 if arg is None: 741 self.push('334 ') 742 else: 743 logpass = self._decode_base64(arg) 744 try: 745 *_, user, password = logpass.split('\0') 746 except ValueError as e: 747 self.push('535 Splitting response {!r} into user and password' 748 ' failed: {}'.format(logpass, e)) 749 return 750 self._authenticated(user, password == sim_auth[1]) 751 752 def _auth_login(self, arg=None): 753 if arg is None: 754 # base64 encoded 'Username:' 755 self.push('334 VXNlcm5hbWU6') 756 elif not hasattr(self, '_auth_login_user'): 757 self._auth_login_user = self._decode_base64(arg) 758 # base64 encoded 'Password:' 759 self.push('334 UGFzc3dvcmQ6') 760 else: 761 password = self._decode_base64(arg) 762 self._authenticated(self._auth_login_user, password == sim_auth[1]) 763 del self._auth_login_user 764 765 def _auth_cram_md5(self, arg=None): 766 if arg is None: 767 self.push('334 {}'.format(sim_cram_md5_challenge)) 768 else: 769 logpass = self._decode_base64(arg) 770 try: 771 user, hashed_pass = logpass.split() 772 except ValueError as e: 773 self.push('535 Splitting response {!r} into user and password ' 774 'failed: {}'.format(logpass, e)) 775 return False 776 valid_hashed_pass = hmac.HMAC( 777 sim_auth[1].encode('ascii'), 778 self._decode_base64(sim_cram_md5_challenge).encode('ascii'), 779 'md5').hexdigest() 780 self._authenticated(user, hashed_pass == valid_hashed_pass) 781 # end AUTH related stuff. 782 783 def smtp_EHLO(self, arg): 784 resp = ('250-testhost\r\n' 785 '250-EXPN\r\n' 786 '250-SIZE 20000000\r\n' 787 '250-STARTTLS\r\n' 788 '250-DELIVERBY\r\n') 789 resp = resp + self._extrafeatures + '250 HELP' 790 self.push(resp) 791 self.seen_greeting = arg 792 self.extended_smtp = True 793 794 def smtp_VRFY(self, arg): 795 # For max compatibility smtplib should be sending the raw address. 796 if arg in sim_users: 797 self.push('250 %s %s' % (sim_users[arg], smtplib.quoteaddr(arg))) 798 else: 799 self.push('550 No such user: %s' % arg) 800 801 def smtp_EXPN(self, arg): 802 list_name = arg.lower() 803 if list_name in sim_lists: 804 user_list = sim_lists[list_name] 805 for n, user_email in enumerate(user_list): 806 quoted_addr = smtplib.quoteaddr(user_email) 807 if n < len(user_list) - 1: 808 self.push('250-%s %s' % (sim_users[user_email], quoted_addr)) 809 else: 810 self.push('250 %s %s' % (sim_users[user_email], quoted_addr)) 811 else: 812 self.push('550 No access for you!') 813 814 def smtp_QUIT(self, arg): 815 if self.quit_response is None: 816 super(SimSMTPChannel, self).smtp_QUIT(arg) 817 else: 818 self.push(self.quit_response) 819 self.close_when_done() 820 821 def smtp_MAIL(self, arg): 822 if self.mail_response is None: 823 super().smtp_MAIL(arg) 824 else: 825 self.push(self.mail_response) 826 if self.disconnect: 827 self.close_when_done() 828 829 def smtp_RCPT(self, arg): 830 if self.rcpt_response is None: 831 super().smtp_RCPT(arg) 832 return 833 self.rcpt_count += 1 834 self.push(self.rcpt_response[self.rcpt_count-1]) 835 836 def smtp_RSET(self, arg): 837 self.rset_count += 1 838 super().smtp_RSET(arg) 839 840 def smtp_DATA(self, arg): 841 if self.data_response is None: 842 super().smtp_DATA(arg) 843 else: 844 self.push(self.data_response) 845 846 def handle_error(self): 847 raise 848 849 850 class SimSMTPServer(smtpd.SMTPServer): 851 852 channel_class = SimSMTPChannel 853 854 def __init__(self, *args, **kw): 855 self._extra_features = [] 856 self._addresses = {} 857 smtpd.SMTPServer.__init__(self, *args, **kw) 858 859 def handle_accepted(self, conn, addr): 860 self._SMTPchannel = self.channel_class( 861 self._extra_features, self, conn, addr, 862 decode_data=self._decode_data) 863 864 def process_message(self, peer, mailfrom, rcpttos, data): 865 self._addresses['from'] = mailfrom 866 self._addresses['tos'] = rcpttos 867 868 def add_feature(self, feature): 869 self._extra_features.append(feature) 870 871 def handle_error(self): 872 raise 873 874 875 # Test various SMTP & ESMTP commands/behaviors that require a simulated server 876 # (i.e., something with more features than DebuggingServer) 877 class SMTPSimTests(unittest.TestCase): 878 879 def setUp(self): 880 self.real_getfqdn = socket.getfqdn 881 socket.getfqdn = mock_socket.getfqdn 882 self.serv_evt = threading.Event() 883 self.client_evt = threading.Event() 884 # Pick a random unused port by passing 0 for the port number 885 self.serv = SimSMTPServer((HOST, 0), ('nowhere', -1), decode_data=True) 886 # Keep a note of what port was assigned 887 self.port = self.serv.socket.getsockname()[1] 888 serv_args = (self.serv, self.serv_evt, self.client_evt) 889 self.thread = threading.Thread(target=debugging_server, args=serv_args) 890 self.thread.start() 891 892 # wait until server thread has assigned a port number 893 self.serv_evt.wait() 894 self.serv_evt.clear() 895 896 def tearDown(self): 897 socket.getfqdn = self.real_getfqdn 898 # indicate that the client is finished 899 self.client_evt.set() 900 # wait for the server thread to terminate 901 self.serv_evt.wait() 902 self.thread.join() 903 904 def testBasic(self): 905 # smoke test 906 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15) 907 smtp.quit() 908 909 def testEHLO(self): 910 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15) 911 912 # no features should be present before the EHLO 913 self.assertEqual(smtp.esmtp_features, {}) 914 915 # features expected from the test server 916 expected_features = {'expn':'', 917 'size': '20000000', 918 'starttls': '', 919 'deliverby': '', 920 'help': '', 921 } 922 923 smtp.ehlo() 924 self.assertEqual(smtp.esmtp_features, expected_features) 925 for k in expected_features: 926 self.assertTrue(smtp.has_extn(k)) 927 self.assertFalse(smtp.has_extn('unsupported-feature')) 928 smtp.quit() 929 930 def testVRFY(self): 931 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15) 932 933 for addr_spec, name in sim_users.items(): 934 expected_known = (250, bytes('%s %s' % 935 (name, smtplib.quoteaddr(addr_spec)), 936 "ascii")) 937 self.assertEqual(smtp.vrfy(addr_spec), expected_known) 938 939 u = 'nobody (at] nowhere.com' 940 expected_unknown = (550, ('No such user: %s' % u).encode('ascii')) 941 self.assertEqual(smtp.vrfy(u), expected_unknown) 942 smtp.quit() 943 944 def testEXPN(self): 945 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15) 946 947 for listname, members in sim_lists.items(): 948 users = [] 949 for m in members: 950 users.append('%s %s' % (sim_users[m], smtplib.quoteaddr(m))) 951 expected_known = (250, bytes('\n'.join(users), "ascii")) 952 self.assertEqual(smtp.expn(listname), expected_known) 953 954 u = 'PSU-Members-List' 955 expected_unknown = (550, b'No access for you!') 956 self.assertEqual(smtp.expn(u), expected_unknown) 957 smtp.quit() 958 959 def testAUTH_PLAIN(self): 960 self.serv.add_feature("AUTH PLAIN") 961 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15) 962 resp = smtp.login(sim_auth[0], sim_auth[1]) 963 self.assertEqual(resp, (235, b'Authentication Succeeded')) 964 smtp.close() 965 966 def testAUTH_LOGIN(self): 967 self.serv.add_feature("AUTH LOGIN") 968 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15) 969 resp = smtp.login(sim_auth[0], sim_auth[1]) 970 self.assertEqual(resp, (235, b'Authentication Succeeded')) 971 smtp.close() 972 973 def testAUTH_CRAM_MD5(self): 974 self.serv.add_feature("AUTH CRAM-MD5") 975 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15) 976 resp = smtp.login(sim_auth[0], sim_auth[1]) 977 self.assertEqual(resp, (235, b'Authentication Succeeded')) 978 smtp.close() 979 980 def testAUTH_multiple(self): 981 # Test that multiple authentication methods are tried. 982 self.serv.add_feature("AUTH BOGUS PLAIN LOGIN CRAM-MD5") 983 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15) 984 resp = smtp.login(sim_auth[0], sim_auth[1]) 985 self.assertEqual(resp, (235, b'Authentication Succeeded')) 986 smtp.close() 987 988 def test_auth_function(self): 989 supported = {'CRAM-MD5', 'PLAIN', 'LOGIN'} 990 for mechanism in supported: 991 self.serv.add_feature("AUTH {}".format(mechanism)) 992 for mechanism in supported: 993 with self.subTest(mechanism=mechanism): 994 smtp = smtplib.SMTP(HOST, self.port, 995 local_hostname='localhost', timeout=15) 996 smtp.ehlo('foo') 997 smtp.user, smtp.password = sim_auth[0], sim_auth[1] 998 method = 'auth_' + mechanism.lower().replace('-', '_') 999 resp = smtp.auth(mechanism, getattr(smtp, method)) 1000 self.assertEqual(resp, (235, b'Authentication Succeeded')) 1001 smtp.close() 1002 1003 def test_quit_resets_greeting(self): 1004 smtp = smtplib.SMTP(HOST, self.port, 1005 local_hostname='localhost', 1006 timeout=15) 1007 code, message = smtp.ehlo() 1008 self.assertEqual(code, 250) 1009 self.assertIn('size', smtp.esmtp_features) 1010 smtp.quit() 1011 self.assertNotIn('size', smtp.esmtp_features) 1012 smtp.connect(HOST, self.port) 1013 self.assertNotIn('size', smtp.esmtp_features) 1014 smtp.ehlo_or_helo_if_needed() 1015 self.assertIn('size', smtp.esmtp_features) 1016 smtp.quit() 1017 1018 def test_with_statement(self): 1019 with smtplib.SMTP(HOST, self.port) as smtp: 1020 code, message = smtp.noop() 1021 self.assertEqual(code, 250) 1022 self.assertRaises(smtplib.SMTPServerDisconnected, smtp.send, b'foo') 1023 with smtplib.SMTP(HOST, self.port) as smtp: 1024 smtp.close() 1025 self.assertRaises(smtplib.SMTPServerDisconnected, smtp.send, b'foo') 1026 1027 def test_with_statement_QUIT_failure(self): 1028 with self.assertRaises(smtplib.SMTPResponseException) as error: 1029 with smtplib.SMTP(HOST, self.port) as smtp: 1030 smtp.noop() 1031 self.serv._SMTPchannel.quit_response = '421 QUIT FAILED' 1032 self.assertEqual(error.exception.smtp_code, 421) 1033 self.assertEqual(error.exception.smtp_error, b'QUIT FAILED') 1034 1035 #TODO: add tests for correct AUTH method fallback now that the 1036 #test infrastructure can support it. 1037 1038 # Issue 17498: make sure _rset does not raise SMTPServerDisconnected exception 1039 def test__rest_from_mail_cmd(self): 1040 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15) 1041 smtp.noop() 1042 self.serv._SMTPchannel.mail_response = '451 Requested action aborted' 1043 self.serv._SMTPchannel.disconnect = True 1044 with self.assertRaises(smtplib.SMTPSenderRefused): 1045 smtp.sendmail('John', 'Sally', 'test message') 1046 self.assertIsNone(smtp.sock) 1047 1048 # Issue 5713: make sure close, not rset, is called if we get a 421 error 1049 def test_421_from_mail_cmd(self): 1050 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15) 1051 smtp.noop() 1052 self.serv._SMTPchannel.mail_response = '421 closing connection' 1053 with self.assertRaises(smtplib.SMTPSenderRefused): 1054 smtp.sendmail('John', 'Sally', 'test message') 1055 self.assertIsNone(smtp.sock) 1056 self.assertEqual(self.serv._SMTPchannel.rset_count, 0) 1057 1058 def test_421_from_rcpt_cmd(self): 1059 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15) 1060 smtp.noop() 1061 self.serv._SMTPchannel.rcpt_response = ['250 accepted', '421 closing'] 1062 with self.assertRaises(smtplib.SMTPRecipientsRefused) as r: 1063 smtp.sendmail('John', ['Sally', 'Frank', 'George'], 'test message') 1064 self.assertIsNone(smtp.sock) 1065 self.assertEqual(self.serv._SMTPchannel.rset_count, 0) 1066 self.assertDictEqual(r.exception.args[0], {'Frank': (421, b'closing')}) 1067 1068 def test_421_from_data_cmd(self): 1069 class MySimSMTPChannel(SimSMTPChannel): 1070 def found_terminator(self): 1071 if self.smtp_state == self.DATA: 1072 self.push('421 closing') 1073 else: 1074 super().found_terminator() 1075 self.serv.channel_class = MySimSMTPChannel 1076 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15) 1077 smtp.noop() 1078 with self.assertRaises(smtplib.SMTPDataError): 1079 smtp.sendmail('John (at] foo.org', ['Sally (at] foo.org'], 'test message') 1080 self.assertIsNone(smtp.sock) 1081 self.assertEqual(self.serv._SMTPchannel.rcpt_count, 0) 1082 1083 def test_smtputf8_NotSupportedError_if_no_server_support(self): 1084 smtp = smtplib.SMTP( 1085 HOST, self.port, local_hostname='localhost', timeout=3) 1086 self.addCleanup(smtp.close) 1087 smtp.ehlo() 1088 self.assertTrue(smtp.does_esmtp) 1089 self.assertFalse(smtp.has_extn('smtputf8')) 1090 self.assertRaises( 1091 smtplib.SMTPNotSupportedError, 1092 smtp.sendmail, 1093 'John', 'Sally', '', mail_options=['BODY=8BITMIME', 'SMTPUTF8']) 1094 self.assertRaises( 1095 smtplib.SMTPNotSupportedError, 1096 smtp.mail, 'John', options=['BODY=8BITMIME', 'SMTPUTF8']) 1097 1098 def test_send_unicode_without_SMTPUTF8(self): 1099 smtp = smtplib.SMTP( 1100 HOST, self.port, local_hostname='localhost', timeout=3) 1101 self.addCleanup(smtp.close) 1102 self.assertRaises(UnicodeEncodeError, smtp.sendmail, 'Alice', 'Bb', '') 1103 self.assertRaises(UnicodeEncodeError, smtp.mail, 'lice') 1104 1105 def test_send_message_error_on_non_ascii_addrs_if_no_smtputf8(self): 1106 # This test is located here and not in the SMTPUTF8SimTests 1107 # class because it needs a "regular" SMTP server to work 1108 msg = EmailMessage() 1109 msg['From'] = "Polo <fo (at] bar.com>" 1110 msg['To'] = 'Dinsdale' 1111 msg['Subject'] = 'Nudge nudge, wink, wink \u1F609' 1112 smtp = smtplib.SMTP( 1113 HOST, self.port, local_hostname='localhost', timeout=3) 1114 self.addCleanup(smtp.close) 1115 with self.assertRaises(smtplib.SMTPNotSupportedError): 1116 smtp.send_message(msg) 1117 1118 def test_name_field_not_included_in_envelop_addresses(self): 1119 smtp = smtplib.SMTP( 1120 HOST, self.port, local_hostname='localhost', timeout=3 1121 ) 1122 self.addCleanup(smtp.close) 1123 1124 message = EmailMessage() 1125 message['From'] = email.utils.formataddr(('Michal', 'michael (at] example.com')) 1126 message['To'] = email.utils.formataddr(('Ren', 'rene (at] example.com')) 1127 1128 self.assertDictEqual(smtp.send_message(message), {}) 1129 1130 self.assertEqual(self.serv._addresses['from'], 'michael (at] example.com') 1131 self.assertEqual(self.serv._addresses['tos'], ['rene (at] example.com']) 1132 1133 1134 class SimSMTPUTF8Server(SimSMTPServer): 1135 1136 def __init__(self, *args, **kw): 1137 # The base SMTP server turns these on automatically, but our test 1138 # server is set up to munge the EHLO response, so we need to provide 1139 # them as well. And yes, the call is to SMTPServer not SimSMTPServer. 1140 self._extra_features = ['SMTPUTF8', '8BITMIME'] 1141 smtpd.SMTPServer.__init__(self, *args, **kw) 1142 1143 def handle_accepted(self, conn, addr): 1144 self._SMTPchannel = self.channel_class( 1145 self._extra_features, self, conn, addr, 1146 decode_data=self._decode_data, 1147 enable_SMTPUTF8=self.enable_SMTPUTF8, 1148 ) 1149 1150 def process_message(self, peer, mailfrom, rcpttos, data, mail_options=None, 1151 rcpt_options=None): 1152 self.last_peer = peer 1153 self.last_mailfrom = mailfrom 1154 self.last_rcpttos = rcpttos 1155 self.last_message = data 1156 self.last_mail_options = mail_options 1157 self.last_rcpt_options = rcpt_options 1158 1159 1160 class SMTPUTF8SimTests(unittest.TestCase): 1161 1162 maxDiff = None 1163 1164 def setUp(self): 1165 self.real_getfqdn = socket.getfqdn 1166 socket.getfqdn = mock_socket.getfqdn 1167 self.serv_evt = threading.Event() 1168 self.client_evt = threading.Event() 1169 # Pick a random unused port by passing 0 for the port number 1170 self.serv = SimSMTPUTF8Server((HOST, 0), ('nowhere', -1), 1171 decode_data=False, 1172 enable_SMTPUTF8=True) 1173 # Keep a note of what port was assigned 1174 self.port = self.serv.socket.getsockname()[1] 1175 serv_args = (self.serv, self.serv_evt, self.client_evt) 1176 self.thread = threading.Thread(target=debugging_server, args=serv_args) 1177 self.thread.start() 1178 1179 # wait until server thread has assigned a port number 1180 self.serv_evt.wait() 1181 self.serv_evt.clear() 1182 1183 def tearDown(self): 1184 socket.getfqdn = self.real_getfqdn 1185 # indicate that the client is finished 1186 self.client_evt.set() 1187 # wait for the server thread to terminate 1188 self.serv_evt.wait() 1189 self.thread.join() 1190 1191 def test_test_server_supports_extensions(self): 1192 smtp = smtplib.SMTP( 1193 HOST, self.port, local_hostname='localhost', timeout=3) 1194 self.addCleanup(smtp.close) 1195 smtp.ehlo() 1196 self.assertTrue(smtp.does_esmtp) 1197 self.assertTrue(smtp.has_extn('smtputf8')) 1198 1199 def test_send_unicode_with_SMTPUTF8_via_sendmail(self): 1200 m = 'a test message containing unicode!'.encode('utf-8') 1201 smtp = smtplib.SMTP( 1202 HOST, self.port, local_hostname='localhost', timeout=3) 1203 self.addCleanup(smtp.close) 1204 smtp.sendmail('Jhn', 'Slly', m, 1205 mail_options=['BODY=8BITMIME', 'SMTPUTF8']) 1206 self.assertEqual(self.serv.last_mailfrom, 'Jhn') 1207 self.assertEqual(self.serv.last_rcpttos, ['Slly']) 1208 self.assertEqual(self.serv.last_message, m) 1209 self.assertIn('BODY=8BITMIME', self.serv.last_mail_options) 1210 self.assertIn('SMTPUTF8', self.serv.last_mail_options) 1211 self.assertEqual(self.serv.last_rcpt_options, []) 1212 1213 def test_send_unicode_with_SMTPUTF8_via_low_level_API(self): 1214 m = 'a test message containing unicode!'.encode('utf-8') 1215 smtp = smtplib.SMTP( 1216 HOST, self.port, local_hostname='localhost', timeout=3) 1217 self.addCleanup(smtp.close) 1218 smtp.ehlo() 1219 self.assertEqual( 1220 smtp.mail('J', options=['BODY=8BITMIME', 'SMTPUTF8']), 1221 (250, b'OK')) 1222 self.assertEqual(smtp.rcpt('Jnos'), (250, b'OK')) 1223 self.assertEqual(smtp.data(m), (250, b'OK')) 1224 self.assertEqual(self.serv.last_mailfrom, 'J') 1225 self.assertEqual(self.serv.last_rcpttos, ['Jnos']) 1226 self.assertEqual(self.serv.last_message, m) 1227 self.assertIn('BODY=8BITMIME', self.serv.last_mail_options) 1228 self.assertIn('SMTPUTF8', self.serv.last_mail_options) 1229 self.assertEqual(self.serv.last_rcpt_options, []) 1230 1231 def test_send_message_uses_smtputf8_if_addrs_non_ascii(self): 1232 msg = EmailMessage() 1233 msg['From'] = "Polo <fo (at] bar.com>" 1234 msg['To'] = 'Dinsdale' 1235 msg['Subject'] = 'Nudge nudge, wink, wink \u1F609' 1236 # XXX I don't know why I need two \n's here, but this is an existing 1237 # bug (if it is one) and not a problem with the new functionality. 1238 msg.set_content("oh l l, know what I mean, know what I mean?\n\n") 1239 # XXX smtpd converts received /r/n to /n, so we can't easily test that 1240 # we are successfully sending /r/n :(. 1241 expected = textwrap.dedent("""\ 1242 From: Polo <fo (at] bar.com> 1243 To: Dinsdale 1244 Subject: Nudge nudge, wink, wink \u1F609 1245 Content-Type: text/plain; charset="utf-8" 1246 Content-Transfer-Encoding: 8bit 1247 MIME-Version: 1.0 1248 1249 oh l l, know what I mean, know what I mean? 1250 """) 1251 smtp = smtplib.SMTP( 1252 HOST, self.port, local_hostname='localhost', timeout=3) 1253 self.addCleanup(smtp.close) 1254 self.assertEqual(smtp.send_message(msg), {}) 1255 self.assertEqual(self.serv.last_mailfrom, 'fo (at] bar.com') 1256 self.assertEqual(self.serv.last_rcpttos, ['Dinsdale']) 1257 self.assertEqual(self.serv.last_message.decode(), expected) 1258 self.assertIn('BODY=8BITMIME', self.serv.last_mail_options) 1259 self.assertIn('SMTPUTF8', self.serv.last_mail_options) 1260 self.assertEqual(self.serv.last_rcpt_options, []) 1261 1262 1263 EXPECTED_RESPONSE = encode_base64(b'\0psu\0doesnotexist', eol='') 1264 1265 class SimSMTPAUTHInitialResponseChannel(SimSMTPChannel): 1266 def smtp_AUTH(self, arg): 1267 # RFC 4954's AUTH command allows for an optional initial-response. 1268 # Not all AUTH methods support this; some require a challenge. AUTH 1269 # PLAIN does those, so test that here. See issue #15014. 1270 args = arg.split() 1271 if args[0].lower() == 'plain': 1272 if len(args) == 2: 1273 # AUTH PLAIN <initial-response> with the response base 64 1274 # encoded. Hard code the expected response for the test. 1275 if args[1] == EXPECTED_RESPONSE: 1276 self.push('235 Ok') 1277 return 1278 self.push('571 Bad authentication') 1279 1280 class SimSMTPAUTHInitialResponseServer(SimSMTPServer): 1281 channel_class = SimSMTPAUTHInitialResponseChannel 1282 1283 1284 class SMTPAUTHInitialResponseSimTests(unittest.TestCase): 1285 def setUp(self): 1286 self.real_getfqdn = socket.getfqdn 1287 socket.getfqdn = mock_socket.getfqdn 1288 self.serv_evt = threading.Event() 1289 self.client_evt = threading.Event() 1290 # Pick a random unused port by passing 0 for the port number 1291 self.serv = SimSMTPAUTHInitialResponseServer( 1292 (HOST, 0), ('nowhere', -1), decode_data=True) 1293 # Keep a note of what port was assigned 1294 self.port = self.serv.socket.getsockname()[1] 1295 serv_args = (self.serv, self.serv_evt, self.client_evt) 1296 self.thread = threading.Thread(target=debugging_server, args=serv_args) 1297 self.thread.start() 1298 1299 # wait until server thread has assigned a port number 1300 self.serv_evt.wait() 1301 self.serv_evt.clear() 1302 1303 def tearDown(self): 1304 socket.getfqdn = self.real_getfqdn 1305 # indicate that the client is finished 1306 self.client_evt.set() 1307 # wait for the server thread to terminate 1308 self.serv_evt.wait() 1309 self.thread.join() 1310 1311 def testAUTH_PLAIN_initial_response_login(self): 1312 self.serv.add_feature('AUTH PLAIN') 1313 smtp = smtplib.SMTP(HOST, self.port, 1314 local_hostname='localhost', timeout=15) 1315 smtp.login('psu', 'doesnotexist') 1316 smtp.close() 1317 1318 def testAUTH_PLAIN_initial_response_auth(self): 1319 self.serv.add_feature('AUTH PLAIN') 1320 smtp = smtplib.SMTP(HOST, self.port, 1321 local_hostname='localhost', timeout=15) 1322 smtp.user = 'psu' 1323 smtp.password = 'doesnotexist' 1324 code, response = smtp.auth('plain', smtp.auth_plain) 1325 smtp.close() 1326 self.assertEqual(code, 235) 1327 1328 1329 if __name__ == '__main__': 1330 unittest.main() 1331