Home | History | Annotate | Download | only in test
      1 import unittest
      2 import textwrap
      3 from test import support, mock_socket
      4 import socket
      5 import io
      6 import smtpd
      7 import asyncore
      8 
      9 
     10 class DummyServer(smtpd.SMTPServer):
     11     def __init__(self, *args, **kwargs):
     12         smtpd.SMTPServer.__init__(self, *args, **kwargs)
     13         self.messages = []
     14         if self._decode_data:
     15             self.return_status = 'return status'
     16         else:
     17             self.return_status = b'return status'
     18 
     19     def process_message(self, peer, mailfrom, rcpttos, data, **kw):
     20         self.messages.append((peer, mailfrom, rcpttos, data))
     21         if data == self.return_status:
     22             return '250 Okish'
     23         if 'mail_options' in kw and 'SMTPUTF8' in kw['mail_options']:
     24             return '250 SMTPUTF8 message okish'
     25 
     26 
     27 class DummyDispatcherBroken(Exception):
     28     pass
     29 
     30 
     31 class BrokenDummyServer(DummyServer):
     32     def listen(self, num):
     33         raise DummyDispatcherBroken()
     34 
     35 
     36 class SMTPDServerTest(unittest.TestCase):
     37     def setUp(self):
     38         smtpd.socket = asyncore.socket = mock_socket
     39 
     40     def test_process_message_unimplemented(self):
     41         server = smtpd.SMTPServer((support.HOST, 0), ('b', 0),
     42                                   decode_data=True)
     43         conn, addr = server.accept()
     44         channel = smtpd.SMTPChannel(server, conn, addr, decode_data=True)
     45 
     46         def write_line(line):
     47             channel.socket.queue_recv(line)
     48             channel.handle_read()
     49 
     50         write_line(b'HELO example')
     51         write_line(b'MAIL From:eggs@example')
     52         write_line(b'RCPT To:spam@example')
     53         write_line(b'DATA')
     54         self.assertRaises(NotImplementedError, write_line, b'spam\r\n.\r\n')
     55 
     56     def test_decode_data_and_enable_SMTPUTF8_raises(self):
     57         self.assertRaises(
     58             ValueError,
     59             smtpd.SMTPServer,
     60             (support.HOST, 0),
     61             ('b', 0),
     62             enable_SMTPUTF8=True,
     63             decode_data=True)
     64 
     65     def tearDown(self):
     66         asyncore.close_all()
     67         asyncore.socket = smtpd.socket = socket
     68 
     69 
     70 class DebuggingServerTest(unittest.TestCase):
     71 
     72     def setUp(self):
     73         smtpd.socket = asyncore.socket = mock_socket
     74 
     75     def send_data(self, channel, data, enable_SMTPUTF8=False):
     76         def write_line(line):
     77             channel.socket.queue_recv(line)
     78             channel.handle_read()
     79         write_line(b'EHLO example')
     80         if enable_SMTPUTF8:
     81             write_line(b'MAIL From:eggs@example BODY=8BITMIME SMTPUTF8')
     82         else:
     83             write_line(b'MAIL From:eggs@example')
     84         write_line(b'RCPT To:spam@example')
     85         write_line(b'DATA')
     86         write_line(data)
     87         write_line(b'.')
     88 
     89     def test_process_message_with_decode_data_true(self):
     90         server = smtpd.DebuggingServer((support.HOST, 0), ('b', 0),
     91                                        decode_data=True)
     92         conn, addr = server.accept()
     93         channel = smtpd.SMTPChannel(server, conn, addr, decode_data=True)
     94         with support.captured_stdout() as s:
     95             self.send_data(channel, b'From: test\n\nhello\n')
     96         stdout = s.getvalue()
     97         self.assertEqual(stdout, textwrap.dedent("""\
     98              ---------- MESSAGE FOLLOWS ----------
     99              From: test
    100              X-Peer: peer-address
    101 
    102              hello
    103              ------------ END MESSAGE ------------
    104              """))
    105 
    106     def test_process_message_with_decode_data_false(self):
    107         server = smtpd.DebuggingServer((support.HOST, 0), ('b', 0))
    108         conn, addr = server.accept()
    109         channel = smtpd.SMTPChannel(server, conn, addr)
    110         with support.captured_stdout() as s:
    111             self.send_data(channel, b'From: test\n\nh\xc3\xa9llo\xff\n')
    112         stdout = s.getvalue()
    113         self.assertEqual(stdout, textwrap.dedent("""\
    114              ---------- MESSAGE FOLLOWS ----------
    115              b'From: test'
    116              b'X-Peer: peer-address'
    117              b''
    118              b'h\\xc3\\xa9llo\\xff'
    119              ------------ END MESSAGE ------------
    120              """))
    121 
    122     def test_process_message_with_enable_SMTPUTF8_true(self):
    123         server = smtpd.DebuggingServer((support.HOST, 0), ('b', 0),
    124                                        enable_SMTPUTF8=True)
    125         conn, addr = server.accept()
    126         channel = smtpd.SMTPChannel(server, conn, addr, enable_SMTPUTF8=True)
    127         with support.captured_stdout() as s:
    128             self.send_data(channel, b'From: test\n\nh\xc3\xa9llo\xff\n')
    129         stdout = s.getvalue()
    130         self.assertEqual(stdout, textwrap.dedent("""\
    131              ---------- MESSAGE FOLLOWS ----------
    132              b'From: test'
    133              b'X-Peer: peer-address'
    134              b''
    135              b'h\\xc3\\xa9llo\\xff'
    136              ------------ END MESSAGE ------------
    137              """))
    138 
    139     def test_process_SMTPUTF8_message_with_enable_SMTPUTF8_true(self):
    140         server = smtpd.DebuggingServer((support.HOST, 0), ('b', 0),
    141                                        enable_SMTPUTF8=True)
    142         conn, addr = server.accept()
    143         channel = smtpd.SMTPChannel(server, conn, addr, enable_SMTPUTF8=True)
    144         with support.captured_stdout() as s:
    145             self.send_data(channel, b'From: test\n\nh\xc3\xa9llo\xff\n',
    146                            enable_SMTPUTF8=True)
    147         stdout = s.getvalue()
    148         self.assertEqual(stdout, textwrap.dedent("""\
    149              ---------- MESSAGE FOLLOWS ----------
    150              mail options: ['BODY=8BITMIME', 'SMTPUTF8']
    151              b'From: test'
    152              b'X-Peer: peer-address'
    153              b''
    154              b'h\\xc3\\xa9llo\\xff'
    155              ------------ END MESSAGE ------------
    156              """))
    157 
    158     def tearDown(self):
    159         asyncore.close_all()
    160         asyncore.socket = smtpd.socket = socket
    161 
    162 
    163 class TestFamilyDetection(unittest.TestCase):
    164     def setUp(self):
    165         smtpd.socket = asyncore.socket = mock_socket
    166 
    167     def tearDown(self):
    168         asyncore.close_all()
    169         asyncore.socket = smtpd.socket = socket
    170 
    171     @unittest.skipUnless(support.IPV6_ENABLED, "IPv6 not enabled")
    172     def test_socket_uses_IPv6(self):
    173         server = smtpd.SMTPServer((support.HOSTv6, 0), (support.HOST, 0))
    174         self.assertEqual(server.socket.family, socket.AF_INET6)
    175 
    176     def test_socket_uses_IPv4(self):
    177         server = smtpd.SMTPServer((support.HOST, 0), (support.HOSTv6, 0))
    178         self.assertEqual(server.socket.family, socket.AF_INET)
    179 
    180 
    181 class TestRcptOptionParsing(unittest.TestCase):
    182     error_response = (b'555 RCPT TO parameters not recognized or not '
    183                       b'implemented\r\n')
    184 
    185     def setUp(self):
    186         smtpd.socket = asyncore.socket = mock_socket
    187         self.old_debugstream = smtpd.DEBUGSTREAM
    188         self.debug = smtpd.DEBUGSTREAM = io.StringIO()
    189 
    190     def tearDown(self):
    191         asyncore.close_all()
    192         asyncore.socket = smtpd.socket = socket
    193         smtpd.DEBUGSTREAM = self.old_debugstream
    194 
    195     def write_line(self, channel, line):
    196         channel.socket.queue_recv(line)
    197         channel.handle_read()
    198 
    199     def test_params_rejected(self):
    200         server = DummyServer((support.HOST, 0), ('b', 0))
    201         conn, addr = server.accept()
    202         channel = smtpd.SMTPChannel(server, conn, addr)
    203         self.write_line(channel, b'EHLO example')
    204         self.write_line(channel, b'MAIL from: <foo (at] example.com> size=20')
    205         self.write_line(channel, b'RCPT to: <foo (at] example.com> foo=bar')
    206         self.assertEqual(channel.socket.last, self.error_response)
    207 
    208     def test_nothing_accepted(self):
    209         server = DummyServer((support.HOST, 0), ('b', 0))
    210         conn, addr = server.accept()
    211         channel = smtpd.SMTPChannel(server, conn, addr)
    212         self.write_line(channel, b'EHLO example')
    213         self.write_line(channel, b'MAIL from: <foo (at] example.com> size=20')
    214         self.write_line(channel, b'RCPT to: <foo (at] example.com>')
    215         self.assertEqual(channel.socket.last, b'250 OK\r\n')
    216 
    217 
    218 class TestMailOptionParsing(unittest.TestCase):
    219     error_response = (b'555 MAIL FROM parameters not recognized or not '
    220                       b'implemented\r\n')
    221 
    222     def setUp(self):
    223         smtpd.socket = asyncore.socket = mock_socket
    224         self.old_debugstream = smtpd.DEBUGSTREAM
    225         self.debug = smtpd.DEBUGSTREAM = io.StringIO()
    226 
    227     def tearDown(self):
    228         asyncore.close_all()
    229         asyncore.socket = smtpd.socket = socket
    230         smtpd.DEBUGSTREAM = self.old_debugstream
    231 
    232     def write_line(self, channel, line):
    233         channel.socket.queue_recv(line)
    234         channel.handle_read()
    235 
    236     def test_with_decode_data_true(self):
    237         server = DummyServer((support.HOST, 0), ('b', 0), decode_data=True)
    238         conn, addr = server.accept()
    239         channel = smtpd.SMTPChannel(server, conn, addr, decode_data=True)
    240         self.write_line(channel, b'EHLO example')
    241         for line in [
    242             b'MAIL from: <foo (at] example.com> size=20 SMTPUTF8',
    243             b'MAIL from: <foo (at] example.com> size=20 SMTPUTF8 BODY=8BITMIME',
    244             b'MAIL from: <foo (at] example.com> size=20 BODY=UNKNOWN',
    245             b'MAIL from: <foo (at] example.com> size=20 body=8bitmime',
    246         ]:
    247             self.write_line(channel, line)
    248             self.assertEqual(channel.socket.last, self.error_response)
    249         self.write_line(channel, b'MAIL from: <foo (at] example.com> size=20')
    250         self.assertEqual(channel.socket.last, b'250 OK\r\n')
    251 
    252     def test_with_decode_data_false(self):
    253         server = DummyServer((support.HOST, 0), ('b', 0))
    254         conn, addr = server.accept()
    255         channel = smtpd.SMTPChannel(server, conn, addr)
    256         self.write_line(channel, b'EHLO example')
    257         for line in [
    258             b'MAIL from: <foo (at] example.com> size=20 SMTPUTF8',
    259             b'MAIL from: <foo (at] example.com> size=20 SMTPUTF8 BODY=8BITMIME',
    260         ]:
    261             self.write_line(channel, line)
    262             self.assertEqual(channel.socket.last, self.error_response)
    263         self.write_line(
    264             channel,
    265             b'MAIL from: <foo (at] example.com> size=20 SMTPUTF8 BODY=UNKNOWN')
    266         self.assertEqual(
    267             channel.socket.last,
    268             b'501 Error: BODY can only be one of 7BIT, 8BITMIME\r\n')
    269         self.write_line(
    270             channel, b'MAIL from: <foo (at] example.com> size=20 body=8bitmime')
    271         self.assertEqual(channel.socket.last, b'250 OK\r\n')
    272 
    273     def test_with_enable_smtputf8_true(self):
    274         server = DummyServer((support.HOST, 0), ('b', 0), enable_SMTPUTF8=True)
    275         conn, addr = server.accept()
    276         channel = smtpd.SMTPChannel(server, conn, addr, enable_SMTPUTF8=True)
    277         self.write_line(channel, b'EHLO example')
    278         self.write_line(
    279             channel,
    280             b'MAIL from: <foo (at] example.com> size=20 body=8bitmime smtputf8')
    281         self.assertEqual(channel.socket.last, b'250 OK\r\n')
    282 
    283 
    284 class SMTPDChannelTest(unittest.TestCase):
    285     def setUp(self):
    286         smtpd.socket = asyncore.socket = mock_socket
    287         self.old_debugstream = smtpd.DEBUGSTREAM
    288         self.debug = smtpd.DEBUGSTREAM = io.StringIO()
    289         self.server = DummyServer((support.HOST, 0), ('b', 0),
    290                                   decode_data=True)
    291         conn, addr = self.server.accept()
    292         self.channel = smtpd.SMTPChannel(self.server, conn, addr,
    293                                          decode_data=True)
    294 
    295     def tearDown(self):
    296         asyncore.close_all()
    297         asyncore.socket = smtpd.socket = socket
    298         smtpd.DEBUGSTREAM = self.old_debugstream
    299 
    300     def write_line(self, line):
    301         self.channel.socket.queue_recv(line)
    302         self.channel.handle_read()
    303 
    304     def test_broken_connect(self):
    305         self.assertRaises(
    306             DummyDispatcherBroken, BrokenDummyServer,
    307             (support.HOST, 0), ('b', 0), decode_data=True)
    308 
    309     def test_decode_data_and_enable_SMTPUTF8_raises(self):
    310         self.assertRaises(
    311             ValueError, smtpd.SMTPChannel,
    312             self.server, self.channel.conn, self.channel.addr,
    313             enable_SMTPUTF8=True, decode_data=True)
    314 
    315     def test_server_accept(self):
    316         self.server.handle_accept()
    317 
    318     def test_missing_data(self):
    319         self.write_line(b'')
    320         self.assertEqual(self.channel.socket.last,
    321                          b'500 Error: bad syntax\r\n')
    322 
    323     def test_EHLO(self):
    324         self.write_line(b'EHLO example')
    325         self.assertEqual(self.channel.socket.last, b'250 HELP\r\n')
    326 
    327     def test_EHLO_bad_syntax(self):
    328         self.write_line(b'EHLO')
    329         self.assertEqual(self.channel.socket.last,
    330                          b'501 Syntax: EHLO hostname\r\n')
    331 
    332     def test_EHLO_duplicate(self):
    333         self.write_line(b'EHLO example')
    334         self.write_line(b'EHLO example')
    335         self.assertEqual(self.channel.socket.last,
    336                          b'503 Duplicate HELO/EHLO\r\n')
    337 
    338     def test_EHLO_HELO_duplicate(self):
    339         self.write_line(b'EHLO example')
    340         self.write_line(b'HELO example')
    341         self.assertEqual(self.channel.socket.last,
    342                          b'503 Duplicate HELO/EHLO\r\n')
    343 
    344     def test_HELO(self):
    345         name = smtpd.socket.getfqdn()
    346         self.write_line(b'HELO example')
    347         self.assertEqual(self.channel.socket.last,
    348                          '250 {}\r\n'.format(name).encode('ascii'))
    349 
    350     def test_HELO_EHLO_duplicate(self):
    351         self.write_line(b'HELO example')
    352         self.write_line(b'EHLO example')
    353         self.assertEqual(self.channel.socket.last,
    354                          b'503 Duplicate HELO/EHLO\r\n')
    355 
    356     def test_HELP(self):
    357         self.write_line(b'HELP')
    358         self.assertEqual(self.channel.socket.last,
    359                          b'250 Supported commands: EHLO HELO MAIL RCPT ' + \
    360                          b'DATA RSET NOOP QUIT VRFY\r\n')
    361 
    362     def test_HELP_command(self):
    363         self.write_line(b'HELP MAIL')
    364         self.assertEqual(self.channel.socket.last,
    365                          b'250 Syntax: MAIL FROM: <address>\r\n')
    366 
    367     def test_HELP_command_unknown(self):
    368         self.write_line(b'HELP SPAM')
    369         self.assertEqual(self.channel.socket.last,
    370                          b'501 Supported commands: EHLO HELO MAIL RCPT ' + \
    371                          b'DATA RSET NOOP QUIT VRFY\r\n')
    372 
    373     def test_HELO_bad_syntax(self):
    374         self.write_line(b'HELO')
    375         self.assertEqual(self.channel.socket.last,
    376                          b'501 Syntax: HELO hostname\r\n')
    377 
    378     def test_HELO_duplicate(self):
    379         self.write_line(b'HELO example')
    380         self.write_line(b'HELO example')
    381         self.assertEqual(self.channel.socket.last,
    382                          b'503 Duplicate HELO/EHLO\r\n')
    383 
    384     def test_HELO_parameter_rejected_when_extensions_not_enabled(self):
    385         self.extended_smtp = False
    386         self.write_line(b'HELO example')
    387         self.write_line(b'MAIL from:<foo (at] example.com> SIZE=1234')
    388         self.assertEqual(self.channel.socket.last,
    389                          b'501 Syntax: MAIL FROM: <address>\r\n')
    390 
    391     def test_MAIL_allows_space_after_colon(self):
    392         self.write_line(b'HELO example')
    393         self.write_line(b'MAIL from:   <foo (at] example.com>')
    394         self.assertEqual(self.channel.socket.last,
    395                          b'250 OK\r\n')
    396 
    397     def test_extended_MAIL_allows_space_after_colon(self):
    398         self.write_line(b'EHLO example')
    399         self.write_line(b'MAIL from:   <foo (at] example.com> size=20')
    400         self.assertEqual(self.channel.socket.last,
    401                          b'250 OK\r\n')
    402 
    403     def test_NOOP(self):
    404         self.write_line(b'NOOP')
    405         self.assertEqual(self.channel.socket.last, b'250 OK\r\n')
    406 
    407     def test_HELO_NOOP(self):
    408         self.write_line(b'HELO example')
    409         self.write_line(b'NOOP')
    410         self.assertEqual(self.channel.socket.last, b'250 OK\r\n')
    411 
    412     def test_NOOP_bad_syntax(self):
    413         self.write_line(b'NOOP hi')
    414         self.assertEqual(self.channel.socket.last,
    415                          b'501 Syntax: NOOP\r\n')
    416 
    417     def test_QUIT(self):
    418         self.write_line(b'QUIT')
    419         self.assertEqual(self.channel.socket.last, b'221 Bye\r\n')
    420 
    421     def test_HELO_QUIT(self):
    422         self.write_line(b'HELO example')
    423         self.write_line(b'QUIT')
    424         self.assertEqual(self.channel.socket.last, b'221 Bye\r\n')
    425 
    426     def test_QUIT_arg_ignored(self):
    427         self.write_line(b'QUIT bye bye')
    428         self.assertEqual(self.channel.socket.last, b'221 Bye\r\n')
    429 
    430     def test_bad_state(self):
    431         self.channel.smtp_state = 'BAD STATE'
    432         self.write_line(b'HELO example')
    433         self.assertEqual(self.channel.socket.last,
    434                          b'451 Internal confusion\r\n')
    435 
    436     def test_command_too_long(self):
    437         self.write_line(b'HELO example')
    438         self.write_line(b'MAIL from: ' +
    439                         b'a' * self.channel.command_size_limit +
    440                         b'@example')
    441         self.assertEqual(self.channel.socket.last,
    442                          b'500 Error: line too long\r\n')
    443 
    444     def test_MAIL_command_limit_extended_with_SIZE(self):
    445         self.write_line(b'EHLO example')
    446         fill_len = self.channel.command_size_limit - len('MAIL from:<@example>')
    447         self.write_line(b'MAIL from:<' +
    448                         b'a' * fill_len +
    449                         b'@example> SIZE=1234')
    450         self.assertEqual(self.channel.socket.last, b'250 OK\r\n')
    451 
    452         self.write_line(b'MAIL from:<' +
    453                         b'a' * (fill_len + 26) +
    454                         b'@example> SIZE=1234')
    455         self.assertEqual(self.channel.socket.last,
    456                          b'500 Error: line too long\r\n')
    457 
    458     def test_MAIL_command_rejects_SMTPUTF8_by_default(self):
    459         self.write_line(b'EHLO example')
    460         self.write_line(
    461             b'MAIL from: <naive (at] example.com> BODY=8BITMIME SMTPUTF8')
    462         self.assertEqual(self.channel.socket.last[0:1], b'5')
    463 
    464     def test_data_longer_than_default_data_size_limit(self):
    465         # Hack the default so we don't have to generate so much data.
    466         self.channel.data_size_limit = 1048
    467         self.write_line(b'HELO example')
    468         self.write_line(b'MAIL From:eggs@example')
    469         self.write_line(b'RCPT To:spam@example')
    470         self.write_line(b'DATA')
    471         self.write_line(b'A' * self.channel.data_size_limit +
    472                         b'A\r\n.')
    473         self.assertEqual(self.channel.socket.last,
    474                          b'552 Error: Too much mail data\r\n')
    475 
    476     def test_MAIL_size_parameter(self):
    477         self.write_line(b'EHLO example')
    478         self.write_line(b'MAIL FROM:<eggs@example> SIZE=512')
    479         self.assertEqual(self.channel.socket.last,
    480                          b'250 OK\r\n')
    481 
    482     def test_MAIL_invalid_size_parameter(self):
    483         self.write_line(b'EHLO example')
    484         self.write_line(b'MAIL FROM:<eggs@example> SIZE=invalid')
    485         self.assertEqual(self.channel.socket.last,
    486             b'501 Syntax: MAIL FROM: <address> [SP <mail-parameters>]\r\n')
    487 
    488     def test_MAIL_RCPT_unknown_parameters(self):
    489         self.write_line(b'EHLO example')
    490         self.write_line(b'MAIL FROM:<eggs@example> ham=green')
    491         self.assertEqual(self.channel.socket.last,
    492             b'555 MAIL FROM parameters not recognized or not implemented\r\n')
    493 
    494         self.write_line(b'MAIL FROM:<eggs@example>')
    495         self.write_line(b'RCPT TO:<eggs@example> ham=green')
    496         self.assertEqual(self.channel.socket.last,
    497             b'555 RCPT TO parameters not recognized or not implemented\r\n')
    498 
    499     def test_MAIL_size_parameter_larger_than_default_data_size_limit(self):
    500         self.channel.data_size_limit = 1048
    501         self.write_line(b'EHLO example')
    502         self.write_line(b'MAIL FROM:<eggs@example> SIZE=2096')
    503         self.assertEqual(self.channel.socket.last,
    504             b'552 Error: message size exceeds fixed maximum message size\r\n')
    505 
    506     def test_need_MAIL(self):
    507         self.write_line(b'HELO example')
    508         self.write_line(b'RCPT to:spam@example')
    509         self.assertEqual(self.channel.socket.last,
    510             b'503 Error: need MAIL command\r\n')
    511 
    512     def test_MAIL_syntax_HELO(self):
    513         self.write_line(b'HELO example')
    514         self.write_line(b'MAIL from eggs@example')
    515         self.assertEqual(self.channel.socket.last,
    516             b'501 Syntax: MAIL FROM: <address>\r\n')
    517 
    518     def test_MAIL_syntax_EHLO(self):
    519         self.write_line(b'EHLO example')
    520         self.write_line(b'MAIL from eggs@example')
    521         self.assertEqual(self.channel.socket.last,
    522             b'501 Syntax: MAIL FROM: <address> [SP <mail-parameters>]\r\n')
    523 
    524     def test_MAIL_missing_address(self):
    525         self.write_line(b'HELO example')
    526         self.write_line(b'MAIL from:')
    527         self.assertEqual(self.channel.socket.last,
    528             b'501 Syntax: MAIL FROM: <address>\r\n')
    529 
    530     def test_MAIL_chevrons(self):
    531         self.write_line(b'HELO example')
    532         self.write_line(b'MAIL from:<eggs@example>')
    533         self.assertEqual(self.channel.socket.last, b'250 OK\r\n')
    534 
    535     def test_MAIL_empty_chevrons(self):
    536         self.write_line(b'EHLO example')
    537         self.write_line(b'MAIL from:<>')
    538         self.assertEqual(self.channel.socket.last, b'250 OK\r\n')
    539 
    540     def test_MAIL_quoted_localpart(self):
    541         self.write_line(b'EHLO example')
    542         self.write_line(b'MAIL from: <"Fred Blogs"@example.com>')
    543         self.assertEqual(self.channel.socket.last, b'250 OK\r\n')
    544         self.assertEqual(self.channel.mailfrom, '"Fred Blogs"@example.com')
    545 
    546     def test_MAIL_quoted_localpart_no_angles(self):
    547         self.write_line(b'EHLO example')
    548         self.write_line(b'MAIL from: "Fred Blogs"@example.com')
    549         self.assertEqual(self.channel.socket.last, b'250 OK\r\n')
    550         self.assertEqual(self.channel.mailfrom, '"Fred Blogs"@example.com')
    551 
    552     def test_MAIL_quoted_localpart_with_size(self):
    553         self.write_line(b'EHLO example')
    554         self.write_line(b'MAIL from: <"Fred Blogs"@example.com> SIZE=1000')
    555         self.assertEqual(self.channel.socket.last, b'250 OK\r\n')
    556         self.assertEqual(self.channel.mailfrom, '"Fred Blogs"@example.com')
    557 
    558     def test_MAIL_quoted_localpart_with_size_no_angles(self):
    559         self.write_line(b'EHLO example')
    560         self.write_line(b'MAIL from: "Fred Blogs"@example.com SIZE=1000')
    561         self.assertEqual(self.channel.socket.last, b'250 OK\r\n')
    562         self.assertEqual(self.channel.mailfrom, '"Fred Blogs"@example.com')
    563 
    564     def test_nested_MAIL(self):
    565         self.write_line(b'HELO example')
    566         self.write_line(b'MAIL from:eggs@example')
    567         self.write_line(b'MAIL from:spam@example')
    568         self.assertEqual(self.channel.socket.last,
    569             b'503 Error: nested MAIL command\r\n')
    570 
    571     def test_VRFY(self):
    572         self.write_line(b'VRFY eggs@example')
    573         self.assertEqual(self.channel.socket.last,
    574             b'252 Cannot VRFY user, but will accept message and attempt ' + \
    575             b'delivery\r\n')
    576 
    577     def test_VRFY_syntax(self):
    578         self.write_line(b'VRFY')
    579         self.assertEqual(self.channel.socket.last,
    580             b'501 Syntax: VRFY <address>\r\n')
    581 
    582     def test_EXPN_not_implemented(self):
    583         self.write_line(b'EXPN')
    584         self.assertEqual(self.channel.socket.last,
    585             b'502 EXPN not implemented\r\n')
    586 
    587     def test_no_HELO_MAIL(self):
    588         self.write_line(b'MAIL from:<foo (at] example.com>')
    589         self.assertEqual(self.channel.socket.last,
    590                          b'503 Error: send HELO first\r\n')
    591 
    592     def test_need_RCPT(self):
    593         self.write_line(b'HELO example')
    594         self.write_line(b'MAIL From:eggs@example')
    595         self.write_line(b'DATA')
    596         self.assertEqual(self.channel.socket.last,
    597             b'503 Error: need RCPT command\r\n')
    598 
    599     def test_RCPT_syntax_HELO(self):
    600         self.write_line(b'HELO example')
    601         self.write_line(b'MAIL From: eggs@example')
    602         self.write_line(b'RCPT to eggs@example')
    603         self.assertEqual(self.channel.socket.last,
    604             b'501 Syntax: RCPT TO: <address>\r\n')
    605 
    606     def test_RCPT_syntax_EHLO(self):
    607         self.write_line(b'EHLO example')
    608         self.write_line(b'MAIL From: eggs@example')
    609         self.write_line(b'RCPT to eggs@example')
    610         self.assertEqual(self.channel.socket.last,
    611             b'501 Syntax: RCPT TO: <address> [SP <mail-parameters>]\r\n')
    612 
    613     def test_RCPT_lowercase_to_OK(self):
    614         self.write_line(b'HELO example')
    615         self.write_line(b'MAIL From: eggs@example')
    616         self.write_line(b'RCPT to: <eggs@example>')
    617         self.assertEqual(self.channel.socket.last, b'250 OK\r\n')
    618 
    619     def test_no_HELO_RCPT(self):
    620         self.write_line(b'RCPT to eggs@example')
    621         self.assertEqual(self.channel.socket.last,
    622                          b'503 Error: send HELO first\r\n')
    623 
    624     def test_data_dialog(self):
    625         self.write_line(b'HELO example')
    626         self.write_line(b'MAIL From:eggs@example')
    627         self.assertEqual(self.channel.socket.last, b'250 OK\r\n')
    628         self.write_line(b'RCPT To:spam@example')
    629         self.assertEqual(self.channel.socket.last, b'250 OK\r\n')
    630 
    631         self.write_line(b'DATA')
    632         self.assertEqual(self.channel.socket.last,
    633             b'354 End data with <CR><LF>.<CR><LF>\r\n')
    634         self.write_line(b'data\r\nmore\r\n.')
    635         self.assertEqual(self.channel.socket.last, b'250 OK\r\n')
    636         self.assertEqual(self.server.messages,
    637             [(('peer-address', 'peer-port'),
    638               'eggs@example',
    639               ['spam@example'],
    640               'data\nmore')])
    641 
    642     def test_DATA_syntax(self):
    643         self.write_line(b'HELO example')
    644         self.write_line(b'MAIL From:eggs@example')
    645         self.write_line(b'RCPT To:spam@example')
    646         self.write_line(b'DATA spam')
    647         self.assertEqual(self.channel.socket.last, b'501 Syntax: DATA\r\n')
    648 
    649     def test_no_HELO_DATA(self):
    650         self.write_line(b'DATA spam')
    651         self.assertEqual(self.channel.socket.last,
    652                          b'503 Error: send HELO first\r\n')
    653 
    654     def test_data_transparency_section_4_5_2(self):
    655         self.write_line(b'HELO example')
    656         self.write_line(b'MAIL From:eggs@example')
    657         self.write_line(b'RCPT To:spam@example')
    658         self.write_line(b'DATA')
    659         self.write_line(b'..\r\n.\r\n')
    660         self.assertEqual(self.channel.received_data, '.')
    661 
    662     def test_multiple_RCPT(self):
    663         self.write_line(b'HELO example')
    664         self.write_line(b'MAIL From:eggs@example')
    665         self.write_line(b'RCPT To:spam@example')
    666         self.write_line(b'RCPT To:ham@example')
    667         self.write_line(b'DATA')
    668         self.write_line(b'data\r\n.')
    669         self.assertEqual(self.server.messages,
    670             [(('peer-address', 'peer-port'),
    671               'eggs@example',
    672               ['spam@example','ham@example'],
    673               'data')])
    674 
    675     def test_manual_status(self):
    676         # checks that the Channel is able to return a custom status message
    677         self.write_line(b'HELO example')
    678         self.write_line(b'MAIL From:eggs@example')
    679         self.write_line(b'RCPT To:spam@example')
    680         self.write_line(b'DATA')
    681         self.write_line(b'return status\r\n.')
    682         self.assertEqual(self.channel.socket.last, b'250 Okish\r\n')
    683 
    684     def test_RSET(self):
    685         self.write_line(b'HELO example')
    686         self.write_line(b'MAIL From:eggs@example')
    687         self.write_line(b'RCPT To:spam@example')
    688         self.write_line(b'RSET')
    689         self.assertEqual(self.channel.socket.last, b'250 OK\r\n')
    690         self.write_line(b'MAIL From:foo@example')
    691         self.write_line(b'RCPT To:eggs@example')
    692         self.write_line(b'DATA')
    693         self.write_line(b'data\r\n.')
    694         self.assertEqual(self.server.messages,
    695             [(('peer-address', 'peer-port'),
    696                'foo@example',
    697                ['eggs@example'],
    698                'data')])
    699 
    700     def test_HELO_RSET(self):
    701         self.write_line(b'HELO example')
    702         self.write_line(b'RSET')
    703         self.assertEqual(self.channel.socket.last, b'250 OK\r\n')
    704 
    705     def test_RSET_syntax(self):
    706         self.write_line(b'RSET hi')
    707         self.assertEqual(self.channel.socket.last, b'501 Syntax: RSET\r\n')
    708 
    709     def test_unknown_command(self):
    710         self.write_line(b'UNKNOWN_CMD')
    711         self.assertEqual(self.channel.socket.last,
    712                          b'500 Error: command "UNKNOWN_CMD" not ' + \
    713                          b'recognized\r\n')
    714 
    715     def test_attribute_deprecations(self):
    716         with support.check_warnings(('', DeprecationWarning)):
    717             spam = self.channel._SMTPChannel__server
    718         with support.check_warnings(('', DeprecationWarning)):
    719             self.channel._SMTPChannel__server = 'spam'
    720         with support.check_warnings(('', DeprecationWarning)):
    721             spam = self.channel._SMTPChannel__line
    722         with support.check_warnings(('', DeprecationWarning)):
    723             self.channel._SMTPChannel__line = 'spam'
    724         with support.check_warnings(('', DeprecationWarning)):
    725             spam = self.channel._SMTPChannel__state
    726         with support.check_warnings(('', DeprecationWarning)):
    727             self.channel._SMTPChannel__state = 'spam'
    728         with support.check_warnings(('', DeprecationWarning)):
    729             spam = self.channel._SMTPChannel__greeting
    730         with support.check_warnings(('', DeprecationWarning)):
    731             self.channel._SMTPChannel__greeting = 'spam'
    732         with support.check_warnings(('', DeprecationWarning)):
    733             spam = self.channel._SMTPChannel__mailfrom
    734         with support.check_warnings(('', DeprecationWarning)):
    735             self.channel._SMTPChannel__mailfrom = 'spam'
    736         with support.check_warnings(('', DeprecationWarning)):
    737             spam = self.channel._SMTPChannel__rcpttos
    738         with support.check_warnings(('', DeprecationWarning)):
    739             self.channel._SMTPChannel__rcpttos = 'spam'
    740         with support.check_warnings(('', DeprecationWarning)):
    741             spam = self.channel._SMTPChannel__data
    742         with support.check_warnings(('', DeprecationWarning)):
    743             self.channel._SMTPChannel__data = 'spam'
    744         with support.check_warnings(('', DeprecationWarning)):
    745             spam = self.channel._SMTPChannel__fqdn
    746         with support.check_warnings(('', DeprecationWarning)):
    747             self.channel._SMTPChannel__fqdn = 'spam'
    748         with support.check_warnings(('', DeprecationWarning)):
    749             spam = self.channel._SMTPChannel__peer
    750         with support.check_warnings(('', DeprecationWarning)):
    751             self.channel._SMTPChannel__peer = 'spam'
    752         with support.check_warnings(('', DeprecationWarning)):
    753             spam = self.channel._SMTPChannel__conn
    754         with support.check_warnings(('', DeprecationWarning)):
    755             self.channel._SMTPChannel__conn = 'spam'
    756         with support.check_warnings(('', DeprecationWarning)):
    757             spam = self.channel._SMTPChannel__addr
    758         with support.check_warnings(('', DeprecationWarning)):
    759             self.channel._SMTPChannel__addr = 'spam'
    760 
    761 @unittest.skipUnless(support.IPV6_ENABLED, "IPv6 not enabled")
    762 class SMTPDChannelIPv6Test(SMTPDChannelTest):
    763     def setUp(self):
    764         smtpd.socket = asyncore.socket = mock_socket
    765         self.old_debugstream = smtpd.DEBUGSTREAM
    766         self.debug = smtpd.DEBUGSTREAM = io.StringIO()
    767         self.server = DummyServer((support.HOSTv6, 0), ('b', 0),
    768                                   decode_data=True)
    769         conn, addr = self.server.accept()
    770         self.channel = smtpd.SMTPChannel(self.server, conn, addr,
    771                                          decode_data=True)
    772 
    773 class SMTPDChannelWithDataSizeLimitTest(unittest.TestCase):
    774 
    775     def setUp(self):
    776         smtpd.socket = asyncore.socket = mock_socket
    777         self.old_debugstream = smtpd.DEBUGSTREAM
    778         self.debug = smtpd.DEBUGSTREAM = io.StringIO()
    779         self.server = DummyServer((support.HOST, 0), ('b', 0),
    780                                   decode_data=True)
    781         conn, addr = self.server.accept()
    782         # Set DATA size limit to 32 bytes for easy testing
    783         self.channel = smtpd.SMTPChannel(self.server, conn, addr, 32,
    784                                          decode_data=True)
    785 
    786     def tearDown(self):
    787         asyncore.close_all()
    788         asyncore.socket = smtpd.socket = socket
    789         smtpd.DEBUGSTREAM = self.old_debugstream
    790 
    791     def write_line(self, line):
    792         self.channel.socket.queue_recv(line)
    793         self.channel.handle_read()
    794 
    795     def test_data_limit_dialog(self):
    796         self.write_line(b'HELO example')
    797         self.write_line(b'MAIL From:eggs@example')
    798         self.assertEqual(self.channel.socket.last, b'250 OK\r\n')
    799         self.write_line(b'RCPT To:spam@example')
    800         self.assertEqual(self.channel.socket.last, b'250 OK\r\n')
    801 
    802         self.write_line(b'DATA')
    803         self.assertEqual(self.channel.socket.last,
    804             b'354 End data with <CR><LF>.<CR><LF>\r\n')
    805         self.write_line(b'data\r\nmore\r\n.')
    806         self.assertEqual(self.channel.socket.last, b'250 OK\r\n')
    807         self.assertEqual(self.server.messages,
    808             [(('peer-address', 'peer-port'),
    809               'eggs@example',
    810               ['spam@example'],
    811               'data\nmore')])
    812 
    813     def test_data_limit_dialog_too_much_data(self):
    814         self.write_line(b'HELO example')
    815         self.write_line(b'MAIL From:eggs@example')
    816         self.assertEqual(self.channel.socket.last, b'250 OK\r\n')
    817         self.write_line(b'RCPT To:spam@example')
    818         self.assertEqual(self.channel.socket.last, b'250 OK\r\n')
    819 
    820         self.write_line(b'DATA')
    821         self.assertEqual(self.channel.socket.last,
    822             b'354 End data with <CR><LF>.<CR><LF>\r\n')
    823         self.write_line(b'This message is longer than 32 bytes\r\n.')
    824         self.assertEqual(self.channel.socket.last,
    825                          b'552 Error: Too much mail data\r\n')
    826 
    827 
    828 class SMTPDChannelWithDecodeDataFalse(unittest.TestCase):
    829 
    830     def setUp(self):
    831         smtpd.socket = asyncore.socket = mock_socket
    832         self.old_debugstream = smtpd.DEBUGSTREAM
    833         self.debug = smtpd.DEBUGSTREAM = io.StringIO()
    834         self.server = DummyServer((support.HOST, 0), ('b', 0))
    835         conn, addr = self.server.accept()
    836         self.channel = smtpd.SMTPChannel(self.server, conn, addr)
    837 
    838     def tearDown(self):
    839         asyncore.close_all()
    840         asyncore.socket = smtpd.socket = socket
    841         smtpd.DEBUGSTREAM = self.old_debugstream
    842 
    843     def write_line(self, line):
    844         self.channel.socket.queue_recv(line)
    845         self.channel.handle_read()
    846 
    847     def test_ascii_data(self):
    848         self.write_line(b'HELO example')
    849         self.write_line(b'MAIL From:eggs@example')
    850         self.write_line(b'RCPT To:spam@example')
    851         self.write_line(b'DATA')
    852         self.write_line(b'plain ascii text')
    853         self.write_line(b'.')
    854         self.assertEqual(self.channel.received_data, b'plain ascii text')
    855 
    856     def test_utf8_data(self):
    857         self.write_line(b'HELO example')
    858         self.write_line(b'MAIL From:eggs@example')
    859         self.write_line(b'RCPT To:spam@example')
    860         self.write_line(b'DATA')
    861         self.write_line(b'utf8 enriched text: \xc5\xbc\xc5\xba\xc4\x87')
    862         self.write_line(b'and some plain ascii')
    863         self.write_line(b'.')
    864         self.assertEqual(
    865             self.channel.received_data,
    866             b'utf8 enriched text: \xc5\xbc\xc5\xba\xc4\x87\n'
    867                 b'and some plain ascii')
    868 
    869 
    870 class SMTPDChannelWithDecodeDataTrue(unittest.TestCase):
    871 
    872     def setUp(self):
    873         smtpd.socket = asyncore.socket = mock_socket
    874         self.old_debugstream = smtpd.DEBUGSTREAM
    875         self.debug = smtpd.DEBUGSTREAM = io.StringIO()
    876         self.server = DummyServer((support.HOST, 0), ('b', 0),
    877                                   decode_data=True)
    878         conn, addr = self.server.accept()
    879         # Set decode_data to True
    880         self.channel = smtpd.SMTPChannel(self.server, conn, addr,
    881                 decode_data=True)
    882 
    883     def tearDown(self):
    884         asyncore.close_all()
    885         asyncore.socket = smtpd.socket = socket
    886         smtpd.DEBUGSTREAM = self.old_debugstream
    887 
    888     def write_line(self, line):
    889         self.channel.socket.queue_recv(line)
    890         self.channel.handle_read()
    891 
    892     def test_ascii_data(self):
    893         self.write_line(b'HELO example')
    894         self.write_line(b'MAIL From:eggs@example')
    895         self.write_line(b'RCPT To:spam@example')
    896         self.write_line(b'DATA')
    897         self.write_line(b'plain ascii text')
    898         self.write_line(b'.')
    899         self.assertEqual(self.channel.received_data, 'plain ascii text')
    900 
    901     def test_utf8_data(self):
    902         self.write_line(b'HELO example')
    903         self.write_line(b'MAIL From:eggs@example')
    904         self.write_line(b'RCPT To:spam@example')
    905         self.write_line(b'DATA')
    906         self.write_line(b'utf8 enriched text: \xc5\xbc\xc5\xba\xc4\x87')
    907         self.write_line(b'and some plain ascii')
    908         self.write_line(b'.')
    909         self.assertEqual(
    910             self.channel.received_data,
    911             'utf8 enriched text: \nand some plain ascii')
    912 
    913 
    914 class SMTPDChannelTestWithEnableSMTPUTF8True(unittest.TestCase):
    915     def setUp(self):
    916         smtpd.socket = asyncore.socket = mock_socket
    917         self.old_debugstream = smtpd.DEBUGSTREAM
    918         self.debug = smtpd.DEBUGSTREAM = io.StringIO()
    919         self.server = DummyServer((support.HOST, 0), ('b', 0),
    920                                   enable_SMTPUTF8=True)
    921         conn, addr = self.server.accept()
    922         self.channel = smtpd.SMTPChannel(self.server, conn, addr,
    923                                          enable_SMTPUTF8=True)
    924 
    925     def tearDown(self):
    926         asyncore.close_all()
    927         asyncore.socket = smtpd.socket = socket
    928         smtpd.DEBUGSTREAM = self.old_debugstream
    929 
    930     def write_line(self, line):
    931         self.channel.socket.queue_recv(line)
    932         self.channel.handle_read()
    933 
    934     def test_MAIL_command_accepts_SMTPUTF8_when_announced(self):
    935         self.write_line(b'EHLO example')
    936         self.write_line(
    937             'MAIL from: <naive (at] example.com> BODY=8BITMIME SMTPUTF8'.encode(
    938                 'utf-8')
    939         )
    940         self.assertEqual(self.channel.socket.last, b'250 OK\r\n')
    941 
    942     def test_process_smtputf8_message(self):
    943         self.write_line(b'EHLO example')
    944         for mail_parameters in [b'', b'BODY=8BITMIME SMTPUTF8']:
    945             self.write_line(b'MAIL from: <a@example> ' + mail_parameters)
    946             self.assertEqual(self.channel.socket.last[0:3], b'250')
    947             self.write_line(b'rcpt to:<b (at] example.com>')
    948             self.assertEqual(self.channel.socket.last[0:3], b'250')
    949             self.write_line(b'data')
    950             self.assertEqual(self.channel.socket.last[0:3], b'354')
    951             self.write_line(b'c\r\n.')
    952             if mail_parameters == b'':
    953                 self.assertEqual(self.channel.socket.last, b'250 OK\r\n')
    954             else:
    955                 self.assertEqual(self.channel.socket.last,
    956                                  b'250 SMTPUTF8 message okish\r\n')
    957 
    958     def test_utf8_data(self):
    959         self.write_line(b'EHLO example')
    960         self.write_line(
    961             'MAIL From: naive@exampl BODY=8BITMIME SMTPUTF8'.encode('utf-8'))
    962         self.assertEqual(self.channel.socket.last[0:3], b'250')
    963         self.write_line('RCPT To:spm@exampl'.encode('utf-8'))
    964         self.assertEqual(self.channel.socket.last[0:3], b'250')
    965         self.write_line(b'DATA')
    966         self.assertEqual(self.channel.socket.last[0:3], b'354')
    967         self.write_line(b'utf8 enriched text: \xc5\xbc\xc5\xba\xc4\x87')
    968         self.write_line(b'.')
    969         self.assertEqual(
    970             self.channel.received_data,
    971             b'utf8 enriched text: \xc5\xbc\xc5\xba\xc4\x87')
    972 
    973     def test_MAIL_command_limit_extended_with_SIZE_and_SMTPUTF8(self):
    974         self.write_line(b'ehlo example')
    975         fill_len = (512 + 26 + 10) - len('mail from:<@example>')
    976         self.write_line(b'MAIL from:<' +
    977                         b'a' * (fill_len + 1) +
    978                         b'@example>')
    979         self.assertEqual(self.channel.socket.last,
    980                          b'500 Error: line too long\r\n')
    981         self.write_line(b'MAIL from:<' +
    982                         b'a' * fill_len +
    983                         b'@example>')
    984         self.assertEqual(self.channel.socket.last, b'250 OK\r\n')
    985 
    986     def test_multiple_emails_with_extended_command_length(self):
    987         self.write_line(b'ehlo example')
    988         fill_len = (512 + 26 + 10) - len('mail from:<@example>')
    989         for char in [b'a', b'b', b'c']:
    990             self.write_line(b'MAIL from:<' + char * fill_len + b'a@example>')
    991             self.assertEqual(self.channel.socket.last[0:3], b'500')
    992             self.write_line(b'MAIL from:<' + char * fill_len + b'@example>')
    993             self.assertEqual(self.channel.socket.last[0:3], b'250')
    994             self.write_line(b'rcpt to:<hans (at] example.com>')
    995             self.assertEqual(self.channel.socket.last[0:3], b'250')
    996             self.write_line(b'data')
    997             self.assertEqual(self.channel.socket.last[0:3], b'354')
    998             self.write_line(b'test\r\n.')
    999             self.assertEqual(self.channel.socket.last[0:3], b'250')
   1000 
   1001 
   1002 class MiscTestCase(unittest.TestCase):
   1003     def test__all__(self):
   1004         blacklist = {
   1005             "program", "Devnull", "DEBUGSTREAM", "NEWLINE", "COMMASPACE",
   1006             "DATA_SIZE_DEFAULT", "usage", "Options", "parseargs",
   1007 
   1008         }
   1009         support.check__all__(self, smtpd, blacklist=blacklist)
   1010 
   1011 
   1012 if __name__ == "__main__":
   1013     unittest.main()
   1014