1 import unittest 2 import textwrap 3 from test import support, mock_socket 4 import socket 5 import io 6 import smtpd 7 import asyncore 8 9 10 class DummyServer(smtpd.SMTPServer): 11 def __init__(self, *args, **kwargs): 12 smtpd.SMTPServer.__init__(self, *args, **kwargs) 13 self.messages = [] 14 if self._decode_data: 15 self.return_status = 'return status' 16 else: 17 self.return_status = b'return status' 18 19 def process_message(self, peer, mailfrom, rcpttos, data, **kw): 20 self.messages.append((peer, mailfrom, rcpttos, data)) 21 if data == self.return_status: 22 return '250 Okish' 23 if 'mail_options' in kw and 'SMTPUTF8' in kw['mail_options']: 24 return '250 SMTPUTF8 message okish' 25 26 27 class DummyDispatcherBroken(Exception): 28 pass 29 30 31 class BrokenDummyServer(DummyServer): 32 def listen(self, num): 33 raise DummyDispatcherBroken() 34 35 36 class SMTPDServerTest(unittest.TestCase): 37 def setUp(self): 38 smtpd.socket = asyncore.socket = mock_socket 39 40 def test_process_message_unimplemented(self): 41 server = smtpd.SMTPServer((support.HOST, 0), ('b', 0), 42 decode_data=True) 43 conn, addr = server.accept() 44 channel = smtpd.SMTPChannel(server, conn, addr, decode_data=True) 45 46 def write_line(line): 47 channel.socket.queue_recv(line) 48 channel.handle_read() 49 50 write_line(b'HELO example') 51 write_line(b'MAIL From:eggs@example') 52 write_line(b'RCPT To:spam@example') 53 write_line(b'DATA') 54 self.assertRaises(NotImplementedError, write_line, b'spam\r\n.\r\n') 55 56 def test_decode_data_and_enable_SMTPUTF8_raises(self): 57 self.assertRaises( 58 ValueError, 59 smtpd.SMTPServer, 60 (support.HOST, 0), 61 ('b', 0), 62 enable_SMTPUTF8=True, 63 decode_data=True) 64 65 def tearDown(self): 66 asyncore.close_all() 67 asyncore.socket = smtpd.socket = socket 68 69 70 class DebuggingServerTest(unittest.TestCase): 71 72 def setUp(self): 73 smtpd.socket = asyncore.socket = mock_socket 74 75 def send_data(self, channel, data, enable_SMTPUTF8=False): 76 def write_line(line): 77 channel.socket.queue_recv(line) 78 channel.handle_read() 79 write_line(b'EHLO example') 80 if enable_SMTPUTF8: 81 write_line(b'MAIL From:eggs@example BODY=8BITMIME SMTPUTF8') 82 else: 83 write_line(b'MAIL From:eggs@example') 84 write_line(b'RCPT To:spam@example') 85 write_line(b'DATA') 86 write_line(data) 87 write_line(b'.') 88 89 def test_process_message_with_decode_data_true(self): 90 server = smtpd.DebuggingServer((support.HOST, 0), ('b', 0), 91 decode_data=True) 92 conn, addr = server.accept() 93 channel = smtpd.SMTPChannel(server, conn, addr, decode_data=True) 94 with support.captured_stdout() as s: 95 self.send_data(channel, b'From: test\n\nhello\n') 96 stdout = s.getvalue() 97 self.assertEqual(stdout, textwrap.dedent("""\ 98 ---------- MESSAGE FOLLOWS ---------- 99 From: test 100 X-Peer: peer-address 101 102 hello 103 ------------ END MESSAGE ------------ 104 """)) 105 106 def test_process_message_with_decode_data_false(self): 107 server = smtpd.DebuggingServer((support.HOST, 0), ('b', 0)) 108 conn, addr = server.accept() 109 channel = smtpd.SMTPChannel(server, conn, addr) 110 with support.captured_stdout() as s: 111 self.send_data(channel, b'From: test\n\nh\xc3\xa9llo\xff\n') 112 stdout = s.getvalue() 113 self.assertEqual(stdout, textwrap.dedent("""\ 114 ---------- MESSAGE FOLLOWS ---------- 115 b'From: test' 116 b'X-Peer: peer-address' 117 b'' 118 b'h\\xc3\\xa9llo\\xff' 119 ------------ END MESSAGE ------------ 120 """)) 121 122 def test_process_message_with_enable_SMTPUTF8_true(self): 123 server = smtpd.DebuggingServer((support.HOST, 0), ('b', 0), 124 enable_SMTPUTF8=True) 125 conn, addr = server.accept() 126 channel = smtpd.SMTPChannel(server, conn, addr, enable_SMTPUTF8=True) 127 with support.captured_stdout() as s: 128 self.send_data(channel, b'From: test\n\nh\xc3\xa9llo\xff\n') 129 stdout = s.getvalue() 130 self.assertEqual(stdout, textwrap.dedent("""\ 131 ---------- MESSAGE FOLLOWS ---------- 132 b'From: test' 133 b'X-Peer: peer-address' 134 b'' 135 b'h\\xc3\\xa9llo\\xff' 136 ------------ END MESSAGE ------------ 137 """)) 138 139 def test_process_SMTPUTF8_message_with_enable_SMTPUTF8_true(self): 140 server = smtpd.DebuggingServer((support.HOST, 0), ('b', 0), 141 enable_SMTPUTF8=True) 142 conn, addr = server.accept() 143 channel = smtpd.SMTPChannel(server, conn, addr, enable_SMTPUTF8=True) 144 with support.captured_stdout() as s: 145 self.send_data(channel, b'From: test\n\nh\xc3\xa9llo\xff\n', 146 enable_SMTPUTF8=True) 147 stdout = s.getvalue() 148 self.assertEqual(stdout, textwrap.dedent("""\ 149 ---------- MESSAGE FOLLOWS ---------- 150 mail options: ['BODY=8BITMIME', 'SMTPUTF8'] 151 b'From: test' 152 b'X-Peer: peer-address' 153 b'' 154 b'h\\xc3\\xa9llo\\xff' 155 ------------ END MESSAGE ------------ 156 """)) 157 158 def tearDown(self): 159 asyncore.close_all() 160 asyncore.socket = smtpd.socket = socket 161 162 163 class TestFamilyDetection(unittest.TestCase): 164 def setUp(self): 165 smtpd.socket = asyncore.socket = mock_socket 166 167 def tearDown(self): 168 asyncore.close_all() 169 asyncore.socket = smtpd.socket = socket 170 171 @unittest.skipUnless(support.IPV6_ENABLED, "IPv6 not enabled") 172 def test_socket_uses_IPv6(self): 173 server = smtpd.SMTPServer((support.HOSTv6, 0), (support.HOSTv4, 0)) 174 self.assertEqual(server.socket.family, socket.AF_INET6) 175 176 def test_socket_uses_IPv4(self): 177 server = smtpd.SMTPServer((support.HOSTv4, 0), (support.HOSTv6, 0)) 178 self.assertEqual(server.socket.family, socket.AF_INET) 179 180 181 class TestRcptOptionParsing(unittest.TestCase): 182 error_response = (b'555 RCPT TO parameters not recognized or not ' 183 b'implemented\r\n') 184 185 def setUp(self): 186 smtpd.socket = asyncore.socket = mock_socket 187 self.old_debugstream = smtpd.DEBUGSTREAM 188 self.debug = smtpd.DEBUGSTREAM = io.StringIO() 189 190 def tearDown(self): 191 asyncore.close_all() 192 asyncore.socket = smtpd.socket = socket 193 smtpd.DEBUGSTREAM = self.old_debugstream 194 195 def write_line(self, channel, line): 196 channel.socket.queue_recv(line) 197 channel.handle_read() 198 199 def test_params_rejected(self): 200 server = DummyServer((support.HOST, 0), ('b', 0)) 201 conn, addr = server.accept() 202 channel = smtpd.SMTPChannel(server, conn, addr) 203 self.write_line(channel, b'EHLO example') 204 self.write_line(channel, b'MAIL from: <foo (at] example.com> size=20') 205 self.write_line(channel, b'RCPT to: <foo (at] example.com> foo=bar') 206 self.assertEqual(channel.socket.last, self.error_response) 207 208 def test_nothing_accepted(self): 209 server = DummyServer((support.HOST, 0), ('b', 0)) 210 conn, addr = server.accept() 211 channel = smtpd.SMTPChannel(server, conn, addr) 212 self.write_line(channel, b'EHLO example') 213 self.write_line(channel, b'MAIL from: <foo (at] example.com> size=20') 214 self.write_line(channel, b'RCPT to: <foo (at] example.com>') 215 self.assertEqual(channel.socket.last, b'250 OK\r\n') 216 217 218 class TestMailOptionParsing(unittest.TestCase): 219 error_response = (b'555 MAIL FROM parameters not recognized or not ' 220 b'implemented\r\n') 221 222 def setUp(self): 223 smtpd.socket = asyncore.socket = mock_socket 224 self.old_debugstream = smtpd.DEBUGSTREAM 225 self.debug = smtpd.DEBUGSTREAM = io.StringIO() 226 227 def tearDown(self): 228 asyncore.close_all() 229 asyncore.socket = smtpd.socket = socket 230 smtpd.DEBUGSTREAM = self.old_debugstream 231 232 def write_line(self, channel, line): 233 channel.socket.queue_recv(line) 234 channel.handle_read() 235 236 def test_with_decode_data_true(self): 237 server = DummyServer((support.HOST, 0), ('b', 0), decode_data=True) 238 conn, addr = server.accept() 239 channel = smtpd.SMTPChannel(server, conn, addr, decode_data=True) 240 self.write_line(channel, b'EHLO example') 241 for line in [ 242 b'MAIL from: <foo (at] example.com> size=20 SMTPUTF8', 243 b'MAIL from: <foo (at] example.com> size=20 SMTPUTF8 BODY=8BITMIME', 244 b'MAIL from: <foo (at] example.com> size=20 BODY=UNKNOWN', 245 b'MAIL from: <foo (at] example.com> size=20 body=8bitmime', 246 ]: 247 self.write_line(channel, line) 248 self.assertEqual(channel.socket.last, self.error_response) 249 self.write_line(channel, b'MAIL from: <foo (at] example.com> size=20') 250 self.assertEqual(channel.socket.last, b'250 OK\r\n') 251 252 def test_with_decode_data_false(self): 253 server = DummyServer((support.HOST, 0), ('b', 0)) 254 conn, addr = server.accept() 255 channel = smtpd.SMTPChannel(server, conn, addr) 256 self.write_line(channel, b'EHLO example') 257 for line in [ 258 b'MAIL from: <foo (at] example.com> size=20 SMTPUTF8', 259 b'MAIL from: <foo (at] example.com> size=20 SMTPUTF8 BODY=8BITMIME', 260 ]: 261 self.write_line(channel, line) 262 self.assertEqual(channel.socket.last, self.error_response) 263 self.write_line( 264 channel, 265 b'MAIL from: <foo (at] example.com> size=20 SMTPUTF8 BODY=UNKNOWN') 266 self.assertEqual( 267 channel.socket.last, 268 b'501 Error: BODY can only be one of 7BIT, 8BITMIME\r\n') 269 self.write_line( 270 channel, b'MAIL from: <foo (at] example.com> size=20 body=8bitmime') 271 self.assertEqual(channel.socket.last, b'250 OK\r\n') 272 273 def test_with_enable_smtputf8_true(self): 274 server = DummyServer((support.HOST, 0), ('b', 0), enable_SMTPUTF8=True) 275 conn, addr = server.accept() 276 channel = smtpd.SMTPChannel(server, conn, addr, enable_SMTPUTF8=True) 277 self.write_line(channel, b'EHLO example') 278 self.write_line( 279 channel, 280 b'MAIL from: <foo (at] example.com> size=20 body=8bitmime smtputf8') 281 self.assertEqual(channel.socket.last, b'250 OK\r\n') 282 283 284 class SMTPDChannelTest(unittest.TestCase): 285 def setUp(self): 286 smtpd.socket = asyncore.socket = mock_socket 287 self.old_debugstream = smtpd.DEBUGSTREAM 288 self.debug = smtpd.DEBUGSTREAM = io.StringIO() 289 self.server = DummyServer((support.HOST, 0), ('b', 0), 290 decode_data=True) 291 conn, addr = self.server.accept() 292 self.channel = smtpd.SMTPChannel(self.server, conn, addr, 293 decode_data=True) 294 295 def tearDown(self): 296 asyncore.close_all() 297 asyncore.socket = smtpd.socket = socket 298 smtpd.DEBUGSTREAM = self.old_debugstream 299 300 def write_line(self, line): 301 self.channel.socket.queue_recv(line) 302 self.channel.handle_read() 303 304 def test_broken_connect(self): 305 self.assertRaises( 306 DummyDispatcherBroken, BrokenDummyServer, 307 (support.HOST, 0), ('b', 0), decode_data=True) 308 309 def test_decode_data_and_enable_SMTPUTF8_raises(self): 310 self.assertRaises( 311 ValueError, smtpd.SMTPChannel, 312 self.server, self.channel.conn, self.channel.addr, 313 enable_SMTPUTF8=True, decode_data=True) 314 315 def test_server_accept(self): 316 self.server.handle_accept() 317 318 def test_missing_data(self): 319 self.write_line(b'') 320 self.assertEqual(self.channel.socket.last, 321 b'500 Error: bad syntax\r\n') 322 323 def test_EHLO(self): 324 self.write_line(b'EHLO example') 325 self.assertEqual(self.channel.socket.last, b'250 HELP\r\n') 326 327 def test_EHLO_bad_syntax(self): 328 self.write_line(b'EHLO') 329 self.assertEqual(self.channel.socket.last, 330 b'501 Syntax: EHLO hostname\r\n') 331 332 def test_EHLO_duplicate(self): 333 self.write_line(b'EHLO example') 334 self.write_line(b'EHLO example') 335 self.assertEqual(self.channel.socket.last, 336 b'503 Duplicate HELO/EHLO\r\n') 337 338 def test_EHLO_HELO_duplicate(self): 339 self.write_line(b'EHLO example') 340 self.write_line(b'HELO example') 341 self.assertEqual(self.channel.socket.last, 342 b'503 Duplicate HELO/EHLO\r\n') 343 344 def test_HELO(self): 345 name = smtpd.socket.getfqdn() 346 self.write_line(b'HELO example') 347 self.assertEqual(self.channel.socket.last, 348 '250 {}\r\n'.format(name).encode('ascii')) 349 350 def test_HELO_EHLO_duplicate(self): 351 self.write_line(b'HELO example') 352 self.write_line(b'EHLO example') 353 self.assertEqual(self.channel.socket.last, 354 b'503 Duplicate HELO/EHLO\r\n') 355 356 def test_HELP(self): 357 self.write_line(b'HELP') 358 self.assertEqual(self.channel.socket.last, 359 b'250 Supported commands: EHLO HELO MAIL RCPT ' + \ 360 b'DATA RSET NOOP QUIT VRFY\r\n') 361 362 def test_HELP_command(self): 363 self.write_line(b'HELP MAIL') 364 self.assertEqual(self.channel.socket.last, 365 b'250 Syntax: MAIL FROM: <address>\r\n') 366 367 def test_HELP_command_unknown(self): 368 self.write_line(b'HELP SPAM') 369 self.assertEqual(self.channel.socket.last, 370 b'501 Supported commands: EHLO HELO MAIL RCPT ' + \ 371 b'DATA RSET NOOP QUIT VRFY\r\n') 372 373 def test_HELO_bad_syntax(self): 374 self.write_line(b'HELO') 375 self.assertEqual(self.channel.socket.last, 376 b'501 Syntax: HELO hostname\r\n') 377 378 def test_HELO_duplicate(self): 379 self.write_line(b'HELO example') 380 self.write_line(b'HELO example') 381 self.assertEqual(self.channel.socket.last, 382 b'503 Duplicate HELO/EHLO\r\n') 383 384 def test_HELO_parameter_rejected_when_extensions_not_enabled(self): 385 self.extended_smtp = False 386 self.write_line(b'HELO example') 387 self.write_line(b'MAIL from:<foo (at] example.com> SIZE=1234') 388 self.assertEqual(self.channel.socket.last, 389 b'501 Syntax: MAIL FROM: <address>\r\n') 390 391 def test_MAIL_allows_space_after_colon(self): 392 self.write_line(b'HELO example') 393 self.write_line(b'MAIL from: <foo (at] example.com>') 394 self.assertEqual(self.channel.socket.last, 395 b'250 OK\r\n') 396 397 def test_extended_MAIL_allows_space_after_colon(self): 398 self.write_line(b'EHLO example') 399 self.write_line(b'MAIL from: <foo (at] example.com> size=20') 400 self.assertEqual(self.channel.socket.last, 401 b'250 OK\r\n') 402 403 def test_NOOP(self): 404 self.write_line(b'NOOP') 405 self.assertEqual(self.channel.socket.last, b'250 OK\r\n') 406 407 def test_HELO_NOOP(self): 408 self.write_line(b'HELO example') 409 self.write_line(b'NOOP') 410 self.assertEqual(self.channel.socket.last, b'250 OK\r\n') 411 412 def test_NOOP_bad_syntax(self): 413 self.write_line(b'NOOP hi') 414 self.assertEqual(self.channel.socket.last, 415 b'501 Syntax: NOOP\r\n') 416 417 def test_QUIT(self): 418 self.write_line(b'QUIT') 419 self.assertEqual(self.channel.socket.last, b'221 Bye\r\n') 420 421 def test_HELO_QUIT(self): 422 self.write_line(b'HELO example') 423 self.write_line(b'QUIT') 424 self.assertEqual(self.channel.socket.last, b'221 Bye\r\n') 425 426 def test_QUIT_arg_ignored(self): 427 self.write_line(b'QUIT bye bye') 428 self.assertEqual(self.channel.socket.last, b'221 Bye\r\n') 429 430 def test_bad_state(self): 431 self.channel.smtp_state = 'BAD STATE' 432 self.write_line(b'HELO example') 433 self.assertEqual(self.channel.socket.last, 434 b'451 Internal confusion\r\n') 435 436 def test_command_too_long(self): 437 self.write_line(b'HELO example') 438 self.write_line(b'MAIL from: ' + 439 b'a' * self.channel.command_size_limit + 440 b'@example') 441 self.assertEqual(self.channel.socket.last, 442 b'500 Error: line too long\r\n') 443 444 def test_MAIL_command_limit_extended_with_SIZE(self): 445 self.write_line(b'EHLO example') 446 fill_len = self.channel.command_size_limit - len('MAIL from:<@example>') 447 self.write_line(b'MAIL from:<' + 448 b'a' * fill_len + 449 b'@example> SIZE=1234') 450 self.assertEqual(self.channel.socket.last, b'250 OK\r\n') 451 452 self.write_line(b'MAIL from:<' + 453 b'a' * (fill_len + 26) + 454 b'@example> SIZE=1234') 455 self.assertEqual(self.channel.socket.last, 456 b'500 Error: line too long\r\n') 457 458 def test_MAIL_command_rejects_SMTPUTF8_by_default(self): 459 self.write_line(b'EHLO example') 460 self.write_line( 461 b'MAIL from: <naive (at] example.com> BODY=8BITMIME SMTPUTF8') 462 self.assertEqual(self.channel.socket.last[0:1], b'5') 463 464 def test_data_longer_than_default_data_size_limit(self): 465 # Hack the default so we don't have to generate so much data. 466 self.channel.data_size_limit = 1048 467 self.write_line(b'HELO example') 468 self.write_line(b'MAIL From:eggs@example') 469 self.write_line(b'RCPT To:spam@example') 470 self.write_line(b'DATA') 471 self.write_line(b'A' * self.channel.data_size_limit + 472 b'A\r\n.') 473 self.assertEqual(self.channel.socket.last, 474 b'552 Error: Too much mail data\r\n') 475 476 def test_MAIL_size_parameter(self): 477 self.write_line(b'EHLO example') 478 self.write_line(b'MAIL FROM:<eggs@example> SIZE=512') 479 self.assertEqual(self.channel.socket.last, 480 b'250 OK\r\n') 481 482 def test_MAIL_invalid_size_parameter(self): 483 self.write_line(b'EHLO example') 484 self.write_line(b'MAIL FROM:<eggs@example> SIZE=invalid') 485 self.assertEqual(self.channel.socket.last, 486 b'501 Syntax: MAIL FROM: <address> [SP <mail-parameters>]\r\n') 487 488 def test_MAIL_RCPT_unknown_parameters(self): 489 self.write_line(b'EHLO example') 490 self.write_line(b'MAIL FROM:<eggs@example> ham=green') 491 self.assertEqual(self.channel.socket.last, 492 b'555 MAIL FROM parameters not recognized or not implemented\r\n') 493 494 self.write_line(b'MAIL FROM:<eggs@example>') 495 self.write_line(b'RCPT TO:<eggs@example> ham=green') 496 self.assertEqual(self.channel.socket.last, 497 b'555 RCPT TO parameters not recognized or not implemented\r\n') 498 499 def test_MAIL_size_parameter_larger_than_default_data_size_limit(self): 500 self.channel.data_size_limit = 1048 501 self.write_line(b'EHLO example') 502 self.write_line(b'MAIL FROM:<eggs@example> SIZE=2096') 503 self.assertEqual(self.channel.socket.last, 504 b'552 Error: message size exceeds fixed maximum message size\r\n') 505 506 def test_need_MAIL(self): 507 self.write_line(b'HELO example') 508 self.write_line(b'RCPT to:spam@example') 509 self.assertEqual(self.channel.socket.last, 510 b'503 Error: need MAIL command\r\n') 511 512 def test_MAIL_syntax_HELO(self): 513 self.write_line(b'HELO example') 514 self.write_line(b'MAIL from eggs@example') 515 self.assertEqual(self.channel.socket.last, 516 b'501 Syntax: MAIL FROM: <address>\r\n') 517 518 def test_MAIL_syntax_EHLO(self): 519 self.write_line(b'EHLO example') 520 self.write_line(b'MAIL from eggs@example') 521 self.assertEqual(self.channel.socket.last, 522 b'501 Syntax: MAIL FROM: <address> [SP <mail-parameters>]\r\n') 523 524 def test_MAIL_missing_address(self): 525 self.write_line(b'HELO example') 526 self.write_line(b'MAIL from:') 527 self.assertEqual(self.channel.socket.last, 528 b'501 Syntax: MAIL FROM: <address>\r\n') 529 530 def test_MAIL_chevrons(self): 531 self.write_line(b'HELO example') 532 self.write_line(b'MAIL from:<eggs@example>') 533 self.assertEqual(self.channel.socket.last, b'250 OK\r\n') 534 535 def test_MAIL_empty_chevrons(self): 536 self.write_line(b'EHLO example') 537 self.write_line(b'MAIL from:<>') 538 self.assertEqual(self.channel.socket.last, b'250 OK\r\n') 539 540 def test_MAIL_quoted_localpart(self): 541 self.write_line(b'EHLO example') 542 self.write_line(b'MAIL from: <"Fred Blogs"@example.com>') 543 self.assertEqual(self.channel.socket.last, b'250 OK\r\n') 544 self.assertEqual(self.channel.mailfrom, '"Fred Blogs"@example.com') 545 546 def test_MAIL_quoted_localpart_no_angles(self): 547 self.write_line(b'EHLO example') 548 self.write_line(b'MAIL from: "Fred Blogs"@example.com') 549 self.assertEqual(self.channel.socket.last, b'250 OK\r\n') 550 self.assertEqual(self.channel.mailfrom, '"Fred Blogs"@example.com') 551 552 def test_MAIL_quoted_localpart_with_size(self): 553 self.write_line(b'EHLO example') 554 self.write_line(b'MAIL from: <"Fred Blogs"@example.com> SIZE=1000') 555 self.assertEqual(self.channel.socket.last, b'250 OK\r\n') 556 self.assertEqual(self.channel.mailfrom, '"Fred Blogs"@example.com') 557 558 def test_MAIL_quoted_localpart_with_size_no_angles(self): 559 self.write_line(b'EHLO example') 560 self.write_line(b'MAIL from: "Fred Blogs"@example.com SIZE=1000') 561 self.assertEqual(self.channel.socket.last, b'250 OK\r\n') 562 self.assertEqual(self.channel.mailfrom, '"Fred Blogs"@example.com') 563 564 def test_nested_MAIL(self): 565 self.write_line(b'HELO example') 566 self.write_line(b'MAIL from:eggs@example') 567 self.write_line(b'MAIL from:spam@example') 568 self.assertEqual(self.channel.socket.last, 569 b'503 Error: nested MAIL command\r\n') 570 571 def test_VRFY(self): 572 self.write_line(b'VRFY eggs@example') 573 self.assertEqual(self.channel.socket.last, 574 b'252 Cannot VRFY user, but will accept message and attempt ' + \ 575 b'delivery\r\n') 576 577 def test_VRFY_syntax(self): 578 self.write_line(b'VRFY') 579 self.assertEqual(self.channel.socket.last, 580 b'501 Syntax: VRFY <address>\r\n') 581 582 def test_EXPN_not_implemented(self): 583 self.write_line(b'EXPN') 584 self.assertEqual(self.channel.socket.last, 585 b'502 EXPN not implemented\r\n') 586 587 def test_no_HELO_MAIL(self): 588 self.write_line(b'MAIL from:<foo (at] example.com>') 589 self.assertEqual(self.channel.socket.last, 590 b'503 Error: send HELO first\r\n') 591 592 def test_need_RCPT(self): 593 self.write_line(b'HELO example') 594 self.write_line(b'MAIL From:eggs@example') 595 self.write_line(b'DATA') 596 self.assertEqual(self.channel.socket.last, 597 b'503 Error: need RCPT command\r\n') 598 599 def test_RCPT_syntax_HELO(self): 600 self.write_line(b'HELO example') 601 self.write_line(b'MAIL From: eggs@example') 602 self.write_line(b'RCPT to eggs@example') 603 self.assertEqual(self.channel.socket.last, 604 b'501 Syntax: RCPT TO: <address>\r\n') 605 606 def test_RCPT_syntax_EHLO(self): 607 self.write_line(b'EHLO example') 608 self.write_line(b'MAIL From: eggs@example') 609 self.write_line(b'RCPT to eggs@example') 610 self.assertEqual(self.channel.socket.last, 611 b'501 Syntax: RCPT TO: <address> [SP <mail-parameters>]\r\n') 612 613 def test_RCPT_lowercase_to_OK(self): 614 self.write_line(b'HELO example') 615 self.write_line(b'MAIL From: eggs@example') 616 self.write_line(b'RCPT to: <eggs@example>') 617 self.assertEqual(self.channel.socket.last, b'250 OK\r\n') 618 619 def test_no_HELO_RCPT(self): 620 self.write_line(b'RCPT to eggs@example') 621 self.assertEqual(self.channel.socket.last, 622 b'503 Error: send HELO first\r\n') 623 624 def test_data_dialog(self): 625 self.write_line(b'HELO example') 626 self.write_line(b'MAIL From:eggs@example') 627 self.assertEqual(self.channel.socket.last, b'250 OK\r\n') 628 self.write_line(b'RCPT To:spam@example') 629 self.assertEqual(self.channel.socket.last, b'250 OK\r\n') 630 631 self.write_line(b'DATA') 632 self.assertEqual(self.channel.socket.last, 633 b'354 End data with <CR><LF>.<CR><LF>\r\n') 634 self.write_line(b'data\r\nmore\r\n.') 635 self.assertEqual(self.channel.socket.last, b'250 OK\r\n') 636 self.assertEqual(self.server.messages, 637 [(('peer-address', 'peer-port'), 638 'eggs@example', 639 ['spam@example'], 640 'data\nmore')]) 641 642 def test_DATA_syntax(self): 643 self.write_line(b'HELO example') 644 self.write_line(b'MAIL From:eggs@example') 645 self.write_line(b'RCPT To:spam@example') 646 self.write_line(b'DATA spam') 647 self.assertEqual(self.channel.socket.last, b'501 Syntax: DATA\r\n') 648 649 def test_no_HELO_DATA(self): 650 self.write_line(b'DATA spam') 651 self.assertEqual(self.channel.socket.last, 652 b'503 Error: send HELO first\r\n') 653 654 def test_data_transparency_section_4_5_2(self): 655 self.write_line(b'HELO example') 656 self.write_line(b'MAIL From:eggs@example') 657 self.write_line(b'RCPT To:spam@example') 658 self.write_line(b'DATA') 659 self.write_line(b'..\r\n.\r\n') 660 self.assertEqual(self.channel.received_data, '.') 661 662 def test_multiple_RCPT(self): 663 self.write_line(b'HELO example') 664 self.write_line(b'MAIL From:eggs@example') 665 self.write_line(b'RCPT To:spam@example') 666 self.write_line(b'RCPT To:ham@example') 667 self.write_line(b'DATA') 668 self.write_line(b'data\r\n.') 669 self.assertEqual(self.server.messages, 670 [(('peer-address', 'peer-port'), 671 'eggs@example', 672 ['spam@example','ham@example'], 673 'data')]) 674 675 def test_manual_status(self): 676 # checks that the Channel is able to return a custom status message 677 self.write_line(b'HELO example') 678 self.write_line(b'MAIL From:eggs@example') 679 self.write_line(b'RCPT To:spam@example') 680 self.write_line(b'DATA') 681 self.write_line(b'return status\r\n.') 682 self.assertEqual(self.channel.socket.last, b'250 Okish\r\n') 683 684 def test_RSET(self): 685 self.write_line(b'HELO example') 686 self.write_line(b'MAIL From:eggs@example') 687 self.write_line(b'RCPT To:spam@example') 688 self.write_line(b'RSET') 689 self.assertEqual(self.channel.socket.last, b'250 OK\r\n') 690 self.write_line(b'MAIL From:foo@example') 691 self.write_line(b'RCPT To:eggs@example') 692 self.write_line(b'DATA') 693 self.write_line(b'data\r\n.') 694 self.assertEqual(self.server.messages, 695 [(('peer-address', 'peer-port'), 696 'foo@example', 697 ['eggs@example'], 698 'data')]) 699 700 def test_HELO_RSET(self): 701 self.write_line(b'HELO example') 702 self.write_line(b'RSET') 703 self.assertEqual(self.channel.socket.last, b'250 OK\r\n') 704 705 def test_RSET_syntax(self): 706 self.write_line(b'RSET hi') 707 self.assertEqual(self.channel.socket.last, b'501 Syntax: RSET\r\n') 708 709 def test_unknown_command(self): 710 self.write_line(b'UNKNOWN_CMD') 711 self.assertEqual(self.channel.socket.last, 712 b'500 Error: command "UNKNOWN_CMD" not ' + \ 713 b'recognized\r\n') 714 715 def test_attribute_deprecations(self): 716 with support.check_warnings(('', DeprecationWarning)): 717 spam = self.channel._SMTPChannel__server 718 with support.check_warnings(('', DeprecationWarning)): 719 self.channel._SMTPChannel__server = 'spam' 720 with support.check_warnings(('', DeprecationWarning)): 721 spam = self.channel._SMTPChannel__line 722 with support.check_warnings(('', DeprecationWarning)): 723 self.channel._SMTPChannel__line = 'spam' 724 with support.check_warnings(('', DeprecationWarning)): 725 spam = self.channel._SMTPChannel__state 726 with support.check_warnings(('', DeprecationWarning)): 727 self.channel._SMTPChannel__state = 'spam' 728 with support.check_warnings(('', DeprecationWarning)): 729 spam = self.channel._SMTPChannel__greeting 730 with support.check_warnings(('', DeprecationWarning)): 731 self.channel._SMTPChannel__greeting = 'spam' 732 with support.check_warnings(('', DeprecationWarning)): 733 spam = self.channel._SMTPChannel__mailfrom 734 with support.check_warnings(('', DeprecationWarning)): 735 self.channel._SMTPChannel__mailfrom = 'spam' 736 with support.check_warnings(('', DeprecationWarning)): 737 spam = self.channel._SMTPChannel__rcpttos 738 with support.check_warnings(('', DeprecationWarning)): 739 self.channel._SMTPChannel__rcpttos = 'spam' 740 with support.check_warnings(('', DeprecationWarning)): 741 spam = self.channel._SMTPChannel__data 742 with support.check_warnings(('', DeprecationWarning)): 743 self.channel._SMTPChannel__data = 'spam' 744 with support.check_warnings(('', DeprecationWarning)): 745 spam = self.channel._SMTPChannel__fqdn 746 with support.check_warnings(('', DeprecationWarning)): 747 self.channel._SMTPChannel__fqdn = 'spam' 748 with support.check_warnings(('', DeprecationWarning)): 749 spam = self.channel._SMTPChannel__peer 750 with support.check_warnings(('', DeprecationWarning)): 751 self.channel._SMTPChannel__peer = 'spam' 752 with support.check_warnings(('', DeprecationWarning)): 753 spam = self.channel._SMTPChannel__conn 754 with support.check_warnings(('', DeprecationWarning)): 755 self.channel._SMTPChannel__conn = 'spam' 756 with support.check_warnings(('', DeprecationWarning)): 757 spam = self.channel._SMTPChannel__addr 758 with support.check_warnings(('', DeprecationWarning)): 759 self.channel._SMTPChannel__addr = 'spam' 760 761 @unittest.skipUnless(support.IPV6_ENABLED, "IPv6 not enabled") 762 class SMTPDChannelIPv6Test(SMTPDChannelTest): 763 def setUp(self): 764 smtpd.socket = asyncore.socket = mock_socket 765 self.old_debugstream = smtpd.DEBUGSTREAM 766 self.debug = smtpd.DEBUGSTREAM = io.StringIO() 767 self.server = DummyServer((support.HOSTv6, 0), ('b', 0), 768 decode_data=True) 769 conn, addr = self.server.accept() 770 self.channel = smtpd.SMTPChannel(self.server, conn, addr, 771 decode_data=True) 772 773 class SMTPDChannelWithDataSizeLimitTest(unittest.TestCase): 774 775 def setUp(self): 776 smtpd.socket = asyncore.socket = mock_socket 777 self.old_debugstream = smtpd.DEBUGSTREAM 778 self.debug = smtpd.DEBUGSTREAM = io.StringIO() 779 self.server = DummyServer((support.HOST, 0), ('b', 0), 780 decode_data=True) 781 conn, addr = self.server.accept() 782 # Set DATA size limit to 32 bytes for easy testing 783 self.channel = smtpd.SMTPChannel(self.server, conn, addr, 32, 784 decode_data=True) 785 786 def tearDown(self): 787 asyncore.close_all() 788 asyncore.socket = smtpd.socket = socket 789 smtpd.DEBUGSTREAM = self.old_debugstream 790 791 def write_line(self, line): 792 self.channel.socket.queue_recv(line) 793 self.channel.handle_read() 794 795 def test_data_limit_dialog(self): 796 self.write_line(b'HELO example') 797 self.write_line(b'MAIL From:eggs@example') 798 self.assertEqual(self.channel.socket.last, b'250 OK\r\n') 799 self.write_line(b'RCPT To:spam@example') 800 self.assertEqual(self.channel.socket.last, b'250 OK\r\n') 801 802 self.write_line(b'DATA') 803 self.assertEqual(self.channel.socket.last, 804 b'354 End data with <CR><LF>.<CR><LF>\r\n') 805 self.write_line(b'data\r\nmore\r\n.') 806 self.assertEqual(self.channel.socket.last, b'250 OK\r\n') 807 self.assertEqual(self.server.messages, 808 [(('peer-address', 'peer-port'), 809 'eggs@example', 810 ['spam@example'], 811 'data\nmore')]) 812 813 def test_data_limit_dialog_too_much_data(self): 814 self.write_line(b'HELO example') 815 self.write_line(b'MAIL From:eggs@example') 816 self.assertEqual(self.channel.socket.last, b'250 OK\r\n') 817 self.write_line(b'RCPT To:spam@example') 818 self.assertEqual(self.channel.socket.last, b'250 OK\r\n') 819 820 self.write_line(b'DATA') 821 self.assertEqual(self.channel.socket.last, 822 b'354 End data with <CR><LF>.<CR><LF>\r\n') 823 self.write_line(b'This message is longer than 32 bytes\r\n.') 824 self.assertEqual(self.channel.socket.last, 825 b'552 Error: Too much mail data\r\n') 826 827 828 class SMTPDChannelWithDecodeDataFalse(unittest.TestCase): 829 830 def setUp(self): 831 smtpd.socket = asyncore.socket = mock_socket 832 self.old_debugstream = smtpd.DEBUGSTREAM 833 self.debug = smtpd.DEBUGSTREAM = io.StringIO() 834 self.server = DummyServer((support.HOST, 0), ('b', 0)) 835 conn, addr = self.server.accept() 836 self.channel = smtpd.SMTPChannel(self.server, conn, addr) 837 838 def tearDown(self): 839 asyncore.close_all() 840 asyncore.socket = smtpd.socket = socket 841 smtpd.DEBUGSTREAM = self.old_debugstream 842 843 def write_line(self, line): 844 self.channel.socket.queue_recv(line) 845 self.channel.handle_read() 846 847 def test_ascii_data(self): 848 self.write_line(b'HELO example') 849 self.write_line(b'MAIL From:eggs@example') 850 self.write_line(b'RCPT To:spam@example') 851 self.write_line(b'DATA') 852 self.write_line(b'plain ascii text') 853 self.write_line(b'.') 854 self.assertEqual(self.channel.received_data, b'plain ascii text') 855 856 def test_utf8_data(self): 857 self.write_line(b'HELO example') 858 self.write_line(b'MAIL From:eggs@example') 859 self.write_line(b'RCPT To:spam@example') 860 self.write_line(b'DATA') 861 self.write_line(b'utf8 enriched text: \xc5\xbc\xc5\xba\xc4\x87') 862 self.write_line(b'and some plain ascii') 863 self.write_line(b'.') 864 self.assertEqual( 865 self.channel.received_data, 866 b'utf8 enriched text: \xc5\xbc\xc5\xba\xc4\x87\n' 867 b'and some plain ascii') 868 869 870 class SMTPDChannelWithDecodeDataTrue(unittest.TestCase): 871 872 def setUp(self): 873 smtpd.socket = asyncore.socket = mock_socket 874 self.old_debugstream = smtpd.DEBUGSTREAM 875 self.debug = smtpd.DEBUGSTREAM = io.StringIO() 876 self.server = DummyServer((support.HOST, 0), ('b', 0), 877 decode_data=True) 878 conn, addr = self.server.accept() 879 # Set decode_data to True 880 self.channel = smtpd.SMTPChannel(self.server, conn, addr, 881 decode_data=True) 882 883 def tearDown(self): 884 asyncore.close_all() 885 asyncore.socket = smtpd.socket = socket 886 smtpd.DEBUGSTREAM = self.old_debugstream 887 888 def write_line(self, line): 889 self.channel.socket.queue_recv(line) 890 self.channel.handle_read() 891 892 def test_ascii_data(self): 893 self.write_line(b'HELO example') 894 self.write_line(b'MAIL From:eggs@example') 895 self.write_line(b'RCPT To:spam@example') 896 self.write_line(b'DATA') 897 self.write_line(b'plain ascii text') 898 self.write_line(b'.') 899 self.assertEqual(self.channel.received_data, 'plain ascii text') 900 901 def test_utf8_data(self): 902 self.write_line(b'HELO example') 903 self.write_line(b'MAIL From:eggs@example') 904 self.write_line(b'RCPT To:spam@example') 905 self.write_line(b'DATA') 906 self.write_line(b'utf8 enriched text: \xc5\xbc\xc5\xba\xc4\x87') 907 self.write_line(b'and some plain ascii') 908 self.write_line(b'.') 909 self.assertEqual( 910 self.channel.received_data, 911 'utf8 enriched text: \nand some plain ascii') 912 913 914 class SMTPDChannelTestWithEnableSMTPUTF8True(unittest.TestCase): 915 def setUp(self): 916 smtpd.socket = asyncore.socket = mock_socket 917 self.old_debugstream = smtpd.DEBUGSTREAM 918 self.debug = smtpd.DEBUGSTREAM = io.StringIO() 919 self.server = DummyServer((support.HOST, 0), ('b', 0), 920 enable_SMTPUTF8=True) 921 conn, addr = self.server.accept() 922 self.channel = smtpd.SMTPChannel(self.server, conn, addr, 923 enable_SMTPUTF8=True) 924 925 def tearDown(self): 926 asyncore.close_all() 927 asyncore.socket = smtpd.socket = socket 928 smtpd.DEBUGSTREAM = self.old_debugstream 929 930 def write_line(self, line): 931 self.channel.socket.queue_recv(line) 932 self.channel.handle_read() 933 934 def test_MAIL_command_accepts_SMTPUTF8_when_announced(self): 935 self.write_line(b'EHLO example') 936 self.write_line( 937 'MAIL from: <naive (at] example.com> BODY=8BITMIME SMTPUTF8'.encode( 938 'utf-8') 939 ) 940 self.assertEqual(self.channel.socket.last, b'250 OK\r\n') 941 942 def test_process_smtputf8_message(self): 943 self.write_line(b'EHLO example') 944 for mail_parameters in [b'', b'BODY=8BITMIME SMTPUTF8']: 945 self.write_line(b'MAIL from: <a@example> ' + mail_parameters) 946 self.assertEqual(self.channel.socket.last[0:3], b'250') 947 self.write_line(b'rcpt to:<b (at] example.com>') 948 self.assertEqual(self.channel.socket.last[0:3], b'250') 949 self.write_line(b'data') 950 self.assertEqual(self.channel.socket.last[0:3], b'354') 951 self.write_line(b'c\r\n.') 952 if mail_parameters == b'': 953 self.assertEqual(self.channel.socket.last, b'250 OK\r\n') 954 else: 955 self.assertEqual(self.channel.socket.last, 956 b'250 SMTPUTF8 message okish\r\n') 957 958 def test_utf8_data(self): 959 self.write_line(b'EHLO example') 960 self.write_line( 961 'MAIL From: naive@exampl BODY=8BITMIME SMTPUTF8'.encode('utf-8')) 962 self.assertEqual(self.channel.socket.last[0:3], b'250') 963 self.write_line('RCPT To:spm@exampl'.encode('utf-8')) 964 self.assertEqual(self.channel.socket.last[0:3], b'250') 965 self.write_line(b'DATA') 966 self.assertEqual(self.channel.socket.last[0:3], b'354') 967 self.write_line(b'utf8 enriched text: \xc5\xbc\xc5\xba\xc4\x87') 968 self.write_line(b'.') 969 self.assertEqual( 970 self.channel.received_data, 971 b'utf8 enriched text: \xc5\xbc\xc5\xba\xc4\x87') 972 973 def test_MAIL_command_limit_extended_with_SIZE_and_SMTPUTF8(self): 974 self.write_line(b'ehlo example') 975 fill_len = (512 + 26 + 10) - len('mail from:<@example>') 976 self.write_line(b'MAIL from:<' + 977 b'a' * (fill_len + 1) + 978 b'@example>') 979 self.assertEqual(self.channel.socket.last, 980 b'500 Error: line too long\r\n') 981 self.write_line(b'MAIL from:<' + 982 b'a' * fill_len + 983 b'@example>') 984 self.assertEqual(self.channel.socket.last, b'250 OK\r\n') 985 986 def test_multiple_emails_with_extended_command_length(self): 987 self.write_line(b'ehlo example') 988 fill_len = (512 + 26 + 10) - len('mail from:<@example>') 989 for char in [b'a', b'b', b'c']: 990 self.write_line(b'MAIL from:<' + char * fill_len + b'a@example>') 991 self.assertEqual(self.channel.socket.last[0:3], b'500') 992 self.write_line(b'MAIL from:<' + char * fill_len + b'@example>') 993 self.assertEqual(self.channel.socket.last[0:3], b'250') 994 self.write_line(b'rcpt to:<hans (at] example.com>') 995 self.assertEqual(self.channel.socket.last[0:3], b'250') 996 self.write_line(b'data') 997 self.assertEqual(self.channel.socket.last[0:3], b'354') 998 self.write_line(b'test\r\n.') 999 self.assertEqual(self.channel.socket.last[0:3], b'250') 1000 1001 1002 class MiscTestCase(unittest.TestCase): 1003 def test__all__(self): 1004 blacklist = { 1005 "program", "Devnull", "DEBUGSTREAM", "NEWLINE", "COMMASPACE", 1006 "DATA_SIZE_DEFAULT", "usage", "Options", "parseargs", 1007 1008 } 1009 support.check__all__(self, smtpd, blacklist=blacklist) 1010 1011 1012 if __name__ == "__main__": 1013 unittest.main() 1014