Home | History | Annotate | Download | only in test_email
      1 import unittest
      2 from test.test_email import TestEmailBase, parameterize
      3 import textwrap
      4 from email import policy
      5 from email.message import EmailMessage
      6 from email.contentmanager import ContentManager, raw_data_manager
      7 
      8 
      9 @parameterize
     10 class TestContentManager(TestEmailBase):
     11 
     12     policy = policy.default
     13     message = EmailMessage
     14 
     15     get_key_params = {
     16         'full_type':        (1, 'text/plain',),
     17         'maintype_only':    (2, 'text',),
     18         'null_key':         (3, '',),
     19         }
     20 
     21     def get_key_as_get_content_key(self, order, key):
     22         def foo_getter(msg, foo=None):
     23             bar = msg['X-Bar-Header']
     24             return foo, bar
     25         cm = ContentManager()
     26         cm.add_get_handler(key, foo_getter)
     27         m = self._make_message()
     28         m['Content-Type'] = 'text/plain'
     29         m['X-Bar-Header'] = 'foo'
     30         self.assertEqual(cm.get_content(m, foo='bar'), ('bar', 'foo'))
     31 
     32     def get_key_as_get_content_key_order(self, order, key):
     33         def bar_getter(msg):
     34             return msg['X-Bar-Header']
     35         def foo_getter(msg):
     36             return msg['X-Foo-Header']
     37         cm = ContentManager()
     38         cm.add_get_handler(key, foo_getter)
     39         for precedence, key in self.get_key_params.values():
     40             if precedence > order:
     41                 cm.add_get_handler(key, bar_getter)
     42         m = self._make_message()
     43         m['Content-Type'] = 'text/plain'
     44         m['X-Bar-Header'] = 'bar'
     45         m['X-Foo-Header'] = 'foo'
     46         self.assertEqual(cm.get_content(m), ('foo'))
     47 
     48     def test_get_content_raises_if_unknown_mimetype_and_no_default(self):
     49         cm = ContentManager()
     50         m = self._make_message()
     51         m['Content-Type'] = 'text/plain'
     52         with self.assertRaisesRegex(KeyError, 'text/plain'):
     53             cm.get_content(m)
     54 
     55     class BaseThing(str):
     56         pass
     57     baseobject_full_path = __name__ + '.' + 'TestContentManager.BaseThing'
     58     class Thing(BaseThing):
     59         pass
     60     testobject_full_path = __name__ + '.' + 'TestContentManager.Thing'
     61 
     62     set_key_params = {
     63         'type':             (0,  Thing,),
     64         'full_path':        (1,  testobject_full_path,),
     65         'qualname':         (2,  'TestContentManager.Thing',),
     66         'name':             (3,  'Thing',),
     67         'base_type':        (4,  BaseThing,),
     68         'base_full_path':   (5,  baseobject_full_path,),
     69         'base_qualname':    (6,  'TestContentManager.BaseThing',),
     70         'base_name':        (7,  'BaseThing',),
     71         'str_type':         (8,  str,),
     72         'str_full_path':    (9,  'builtins.str',),
     73         'str_name':         (10, 'str',),   # str name and qualname are the same
     74         'null_key':         (11, None,),
     75         }
     76 
     77     def set_key_as_set_content_key(self, order, key):
     78         def foo_setter(msg, obj, foo=None):
     79             msg['X-Foo-Header'] = foo
     80             msg.set_payload(obj)
     81         cm = ContentManager()
     82         cm.add_set_handler(key, foo_setter)
     83         m = self._make_message()
     84         msg_obj = self.Thing()
     85         cm.set_content(m, msg_obj, foo='bar')
     86         self.assertEqual(m['X-Foo-Header'], 'bar')
     87         self.assertEqual(m.get_payload(), msg_obj)
     88 
     89     def set_key_as_set_content_key_order(self, order, key):
     90         def foo_setter(msg, obj):
     91             msg['X-FooBar-Header'] = 'foo'
     92             msg.set_payload(obj)
     93         def bar_setter(msg, obj):
     94             msg['X-FooBar-Header'] = 'bar'
     95         cm = ContentManager()
     96         cm.add_set_handler(key, foo_setter)
     97         for precedence, key in self.get_key_params.values():
     98             if precedence > order:
     99                 cm.add_set_handler(key, bar_setter)
    100         m = self._make_message()
    101         msg_obj = self.Thing()
    102         cm.set_content(m, msg_obj)
    103         self.assertEqual(m['X-FooBar-Header'], 'foo')
    104         self.assertEqual(m.get_payload(), msg_obj)
    105 
    106     def test_set_content_raises_if_unknown_type_and_no_default(self):
    107         cm = ContentManager()
    108         m = self._make_message()
    109         msg_obj = self.Thing()
    110         with self.assertRaisesRegex(KeyError, self.testobject_full_path):
    111             cm.set_content(m, msg_obj)
    112 
    113     def test_set_content_raises_if_called_on_multipart(self):
    114         cm = ContentManager()
    115         m = self._make_message()
    116         m['Content-Type'] = 'multipart/foo'
    117         with self.assertRaises(TypeError):
    118             cm.set_content(m, 'test')
    119 
    120     def test_set_content_calls_clear_content(self):
    121         m = self._make_message()
    122         m['Content-Foo'] = 'bar'
    123         m['Content-Type'] = 'text/html'
    124         m['To'] = 'test'
    125         m.set_payload('abc')
    126         cm = ContentManager()
    127         cm.add_set_handler(str, lambda *args, **kw: None)
    128         m.set_content('xyz', content_manager=cm)
    129         self.assertIsNone(m['Content-Foo'])
    130         self.assertIsNone(m['Content-Type'])
    131         self.assertEqual(m['To'], 'test')
    132         self.assertIsNone(m.get_payload())
    133 
    134 
    135 @parameterize
    136 class TestRawDataManager(TestEmailBase):
    137     # Note: these tests are dependent on the order in which headers are added
    138     # to the message objects by the code.  There's no defined ordering in
    139     # RFC5322/MIME, so this makes the tests more fragile than the standards
    140     # require.  However, if the header order changes it is best to understand
    141     # *why*, and make sure it isn't a subtle bug in whatever change was
    142     # applied.
    143 
    144     policy = policy.default.clone(max_line_length=60,
    145                                   content_manager=raw_data_manager)
    146     message = EmailMessage
    147 
    148     def test_get_text_plain(self):
    149         m = self._str_msg(textwrap.dedent("""\
    150             Content-Type: text/plain
    151 
    152             Basic text.
    153             """))
    154         self.assertEqual(raw_data_manager.get_content(m), "Basic text.\n")
    155 
    156     def test_get_text_html(self):
    157         m = self._str_msg(textwrap.dedent("""\
    158             Content-Type: text/html
    159 
    160             <p>Basic text.</p>
    161             """))
    162         self.assertEqual(raw_data_manager.get_content(m),
    163                          "<p>Basic text.</p>\n")
    164 
    165     def test_get_text_plain_latin1(self):
    166         m = self._bytes_msg(textwrap.dedent("""\
    167             Content-Type: text/plain; charset=latin1
    168 
    169             Basc txt.
    170             """).encode('latin1'))
    171         self.assertEqual(raw_data_manager.get_content(m), "Basc txt.\n")
    172 
    173     def test_get_text_plain_latin1_quoted_printable(self):
    174         m = self._str_msg(textwrap.dedent("""\
    175             Content-Type: text/plain; charset="latin-1"
    176             Content-Transfer-Encoding: quoted-printable
    177 
    178             Bas=ECc t=EBxt.
    179             """))
    180         self.assertEqual(raw_data_manager.get_content(m), "Basc txt.\n")
    181 
    182     def test_get_text_plain_utf8_base64(self):
    183         m = self._str_msg(textwrap.dedent("""\
    184             Content-Type: text/plain; charset="utf8"
    185             Content-Transfer-Encoding: base64
    186 
    187             QmFzw6xjIHTDq3h0Lgo=
    188             """))
    189         self.assertEqual(raw_data_manager.get_content(m), "Basc txt.\n")
    190 
    191     def test_get_text_plain_bad_utf8_quoted_printable(self):
    192         m = self._str_msg(textwrap.dedent("""\
    193             Content-Type: text/plain; charset="utf8"
    194             Content-Transfer-Encoding: quoted-printable
    195 
    196             Bas=c3=acc t=c3=abxt=fd.
    197             """))
    198         self.assertEqual(raw_data_manager.get_content(m), "Basc txt.\n")
    199 
    200     def test_get_text_plain_bad_utf8_quoted_printable_ignore_errors(self):
    201         m = self._str_msg(textwrap.dedent("""\
    202             Content-Type: text/plain; charset="utf8"
    203             Content-Transfer-Encoding: quoted-printable
    204 
    205             Bas=c3=acc t=c3=abxt=fd.
    206             """))
    207         self.assertEqual(raw_data_manager.get_content(m, errors='ignore'),
    208                          "Basc txt.\n")
    209 
    210     def test_get_text_plain_utf8_base64_recoverable_bad_CTE_data(self):
    211         m = self._str_msg(textwrap.dedent("""\
    212             Content-Type: text/plain; charset="utf8"
    213             Content-Transfer-Encoding: base64
    214 
    215             QmFzw6xjIHTDq3h0Lgo\xFF=
    216             """))
    217         self.assertEqual(raw_data_manager.get_content(m, errors='ignore'),
    218                          "Basc txt.\n")
    219 
    220     def test_get_text_invalid_keyword(self):
    221         m = self._str_msg(textwrap.dedent("""\
    222             Content-Type: text/plain
    223 
    224             Basic text.
    225             """))
    226         with self.assertRaises(TypeError):
    227             raw_data_manager.get_content(m, foo='ignore')
    228 
    229     def test_get_non_text(self):
    230         template = textwrap.dedent("""\
    231             Content-Type: {}
    232             Content-Transfer-Encoding: base64
    233 
    234             Ym9ndXMgZGF0YQ==
    235             """)
    236         for maintype in 'audio image video application'.split():
    237             with self.subTest(maintype=maintype):
    238                 m = self._str_msg(template.format(maintype+'/foo'))
    239                 self.assertEqual(raw_data_manager.get_content(m), b"bogus data")
    240 
    241     def test_get_non_text_invalid_keyword(self):
    242         m = self._str_msg(textwrap.dedent("""\
    243             Content-Type: image/jpg
    244             Content-Transfer-Encoding: base64
    245 
    246             Ym9ndXMgZGF0YQ==
    247             """))
    248         with self.assertRaises(TypeError):
    249             raw_data_manager.get_content(m, errors='ignore')
    250 
    251     def test_get_raises_on_multipart(self):
    252         m = self._str_msg(textwrap.dedent("""\
    253             Content-Type: multipart/mixed; boundary="==="
    254 
    255             --===
    256             --===--
    257             """))
    258         with self.assertRaises(KeyError):
    259             raw_data_manager.get_content(m)
    260 
    261     def test_get_message_rfc822_and_external_body(self):
    262         template = textwrap.dedent("""\
    263             Content-Type: message/{}
    264 
    265             To: foo (at] example.com
    266             From: bar (at] example.com
    267             Subject: example
    268 
    269             an example message
    270             """)
    271         for subtype in 'rfc822 external-body'.split():
    272             with self.subTest(subtype=subtype):
    273                 m = self._str_msg(template.format(subtype))
    274                 sub_msg = raw_data_manager.get_content(m)
    275                 self.assertIsInstance(sub_msg, self.message)
    276                 self.assertEqual(raw_data_manager.get_content(sub_msg),
    277                                  "an example message\n")
    278                 self.assertEqual(sub_msg['to'], 'foo (at] example.com')
    279                 self.assertEqual(sub_msg['from'].addresses[0].username, 'bar')
    280 
    281     def test_get_message_non_rfc822_or_external_body_yields_bytes(self):
    282         m = self._str_msg(textwrap.dedent("""\
    283             Content-Type: message/partial
    284 
    285             To: foo (at] example.com
    286             From: bar (at] example.com
    287             Subject: example
    288 
    289             The real body is in another message.
    290             """))
    291         self.assertEqual(raw_data_manager.get_content(m)[:10], b'To: foo@ex')
    292 
    293     def test_set_text_plain(self):
    294         m = self._make_message()
    295         content = "Simple message.\n"
    296         raw_data_manager.set_content(m, content)
    297         self.assertEqual(str(m), textwrap.dedent("""\
    298             Content-Type: text/plain; charset="utf-8"
    299             Content-Transfer-Encoding: 7bit
    300 
    301             Simple message.
    302             """))
    303         self.assertEqual(m.get_payload(decode=True).decode('utf-8'), content)
    304         self.assertEqual(m.get_content(), content)
    305 
    306     def test_set_text_html(self):
    307         m = self._make_message()
    308         content = "<p>Simple message.</p>\n"
    309         raw_data_manager.set_content(m, content, subtype='html')
    310         self.assertEqual(str(m), textwrap.dedent("""\
    311             Content-Type: text/html; charset="utf-8"
    312             Content-Transfer-Encoding: 7bit
    313 
    314             <p>Simple message.</p>
    315             """))
    316         self.assertEqual(m.get_payload(decode=True).decode('utf-8'), content)
    317         self.assertEqual(m.get_content(), content)
    318 
    319     def test_set_text_charset_latin_1(self):
    320         m = self._make_message()
    321         content = "Simple message.\n"
    322         raw_data_manager.set_content(m, content, charset='latin-1')
    323         self.assertEqual(str(m), textwrap.dedent("""\
    324             Content-Type: text/plain; charset="iso-8859-1"
    325             Content-Transfer-Encoding: 7bit
    326 
    327             Simple message.
    328             """))
    329         self.assertEqual(m.get_payload(decode=True).decode('utf-8'), content)
    330         self.assertEqual(m.get_content(), content)
    331 
    332     def test_set_text_short_line_minimal_non_ascii_heuristics(self):
    333         m = self._make_message()
    334         content = "et l il est mont sur moi et il commence  m'to.\n"
    335         raw_data_manager.set_content(m, content)
    336         self.assertEqual(bytes(m), textwrap.dedent("""\
    337             Content-Type: text/plain; charset="utf-8"
    338             Content-Transfer-Encoding: 8bit
    339 
    340             et l il est mont sur moi et il commence  m'to.
    341             """).encode('utf-8'))
    342         self.assertEqual(m.get_payload(decode=True).decode('utf-8'), content)
    343         self.assertEqual(m.get_content(), content)
    344 
    345     def test_set_text_long_line_minimal_non_ascii_heuristics(self):
    346         m = self._make_message()
    347         content = ("j'ai un problme de python. il est sorti de son"
    348                    " vivarium.  et l il est mont sur moi et il commence"
    349                    "  m'to.\n")
    350         raw_data_manager.set_content(m, content)
    351         self.assertEqual(bytes(m), textwrap.dedent("""\
    352             Content-Type: text/plain; charset="utf-8"
    353             Content-Transfer-Encoding: quoted-printable
    354 
    355             j'ai un probl=C3=A8me de python. il est sorti de son vivari=
    356             um.  et l=C3=A0 il est mont=C3=A9 sur moi et il commence =
    357             =C3=A0 m'=C3=A9to.
    358             """).encode('utf-8'))
    359         self.assertEqual(m.get_payload(decode=True).decode('utf-8'), content)
    360         self.assertEqual(m.get_content(), content)
    361 
    362     def test_set_text_11_lines_long_line_minimal_non_ascii_heuristics(self):
    363         m = self._make_message()
    364         content = '\n'*10 + (
    365                   "j'ai un problme de python. il est sorti de son"
    366                   " vivarium.  et l il est mont sur moi et il commence"
    367                   "  m'to.\n")
    368         raw_data_manager.set_content(m, content)
    369         self.assertEqual(bytes(m), textwrap.dedent("""\
    370             Content-Type: text/plain; charset="utf-8"
    371             Content-Transfer-Encoding: quoted-printable
    372             """ + '\n'*10 + """
    373             j'ai un probl=C3=A8me de python. il est sorti de son vivari=
    374             um.  et l=C3=A0 il est mont=C3=A9 sur moi et il commence =
    375             =C3=A0 m'=C3=A9to.
    376             """).encode('utf-8'))
    377         self.assertEqual(m.get_payload(decode=True).decode('utf-8'), content)
    378         self.assertEqual(m.get_content(), content)
    379 
    380     def test_set_text_maximal_non_ascii_heuristics(self):
    381         m = self._make_message()
    382         content = ".\n"
    383         raw_data_manager.set_content(m, content)
    384         self.assertEqual(bytes(m), textwrap.dedent("""\
    385             Content-Type: text/plain; charset="utf-8"
    386             Content-Transfer-Encoding: 8bit
    387 
    388             .
    389             """).encode('utf-8'))
    390         self.assertEqual(m.get_payload(decode=True).decode('utf-8'), content)
    391         self.assertEqual(m.get_content(), content)
    392 
    393     def test_set_text_11_lines_maximal_non_ascii_heuristics(self):
    394         m = self._make_message()
    395         content = '\n'*10 + ".\n"
    396         raw_data_manager.set_content(m, content)
    397         self.assertEqual(bytes(m), textwrap.dedent("""\
    398             Content-Type: text/plain; charset="utf-8"
    399             Content-Transfer-Encoding: 8bit
    400             """ + '\n'*10 + """
    401             .
    402             """).encode('utf-8'))
    403         self.assertEqual(m.get_payload(decode=True).decode('utf-8'), content)
    404         self.assertEqual(m.get_content(), content)
    405 
    406     def test_set_text_long_line_maximal_non_ascii_heuristics(self):
    407         m = self._make_message()
    408         content = (""
    409                    ""
    410                    ".\n")
    411         raw_data_manager.set_content(m, content)
    412         self.assertEqual(bytes(m), textwrap.dedent("""\
    413             Content-Type: text/plain; charset="utf-8"
    414             Content-Transfer-Encoding: base64
    415 
    416             w6HDoMOkw6nDqMSZw7bFkcOhw6DDpMOpw6jEmcO2xZHDocOgw6TDqcOoxJnD
    417             tsWRw6HDoMOkw6nDqMSZw7bFkcOhw6DDpMOpw6jEmcO2xZHDocOgw6TDqcOo
    418             xJnDtsWRw6HDoMOkw6nDqMSZw7bFkcOhw6DDpMOpw6jEmcO2xZHDocOgw6TD
    419             qcOoxJnDtsWRw6HDoMOkw6nDqMSZw7bFkcOhw6DDpMOpw6jEmcO2xZHDocOg
    420             w6TDqcOoxJnDtsWRLgo=
    421             """).encode('utf-8'))
    422         self.assertEqual(m.get_payload(decode=True).decode('utf-8'), content)
    423         self.assertEqual(m.get_content(), content)
    424 
    425     def test_set_text_11_lines_long_line_maximal_non_ascii_heuristics(self):
    426         # Yes, it chooses "wrong" here.  It's a heuristic.  So this result
    427         # could change if we come up with a better heuristic.
    428         m = self._make_message()
    429         content = ('\n'*10 +
    430                    ""
    431                    ""
    432                    ".\n")
    433         raw_data_manager.set_content(m, "\n"*10 +
    434                                         ""
    435                                         ""
    436                                         ".\n")
    437         self.assertEqual(bytes(m), textwrap.dedent("""\
    438             Content-Type: text/plain; charset="utf-8"
    439             Content-Transfer-Encoding: quoted-printable
    440             """ + '\n'*10 + """
    441             =C3=A1=C3=A0=C3=A4=C3=A9=C3=A8=C4=99=C3=B6=C5=91=C3=A1=C3=
    442             =A0=C3=A4=C3=A9=C3=A8=C4=99=C3=B6=C5=91=C3=A1=C3=A0=C3=A4=
    443             =C3=A9=C3=A8=C4=99=C3=B6=C5=91=C3=A1=C3=A0=C3=A4=C3=A9=C3=
    444             =A8=C4=99=C3=B6=C5=91=C3=A1=C3=A0=C3=A4=C3=A9=C3=A8=C4=99=
    445             =C3=B6=C5=91=C3=A1=C3=A0=C3=A4=C3=A9=C3=A8=C4=99=C3=B6=C5=
    446             =91=C3=A1=C3=A0=C3=A4=C3=A9=C3=A8=C4=99=C3=B6=C5=91=C3=A1=
    447             =C3=A0=C3=A4=C3=A9=C3=A8=C4=99=C3=B6=C5=91=C3=A1=C3=A0=C3=
    448             =A4=C3=A9=C3=A8=C4=99=C3=B6=C5=91=C3=A1=C3=A0=C3=A4=C3=A9=
    449             =C3=A8=C4=99=C3=B6=C5=91=C3=A1=C3=A0=C3=A4=C3=A9=C3=A8=C4=
    450             =99=C3=B6=C5=91=C3=A1=C3=A0=C3=A4=C3=A9=C3=A8=C4=99=C3=B6=
    451             =C5=91.
    452             """).encode('utf-8'))
    453         self.assertEqual(m.get_payload(decode=True).decode('utf-8'), content)
    454         self.assertEqual(m.get_content(), content)
    455 
    456     def test_set_text_non_ascii_with_cte_7bit_raises(self):
    457         m = self._make_message()
    458         with self.assertRaises(UnicodeError):
    459             raw_data_manager.set_content(m,".\n", cte='7bit')
    460 
    461     def test_set_text_non_ascii_with_charset_ascii_raises(self):
    462         m = self._make_message()
    463         with self.assertRaises(UnicodeError):
    464             raw_data_manager.set_content(m,".\n", charset='ascii')
    465 
    466     def test_set_text_non_ascii_with_cte_7bit_and_charset_ascii_raises(self):
    467         m = self._make_message()
    468         with self.assertRaises(UnicodeError):
    469             raw_data_manager.set_content(m,".\n", cte='7bit', charset='ascii')
    470 
    471     def test_set_message(self):
    472         m = self._make_message()
    473         m['Subject'] = "Forwarded message"
    474         content = self._make_message()
    475         content['To'] = 'python (at] vivarium.org'
    476         content['From'] = 'police (at] monty.org'
    477         content['Subject'] = "get back in your box"
    478         content.set_content("Or face the comfy chair.")
    479         raw_data_manager.set_content(m, content)
    480         self.assertEqual(str(m), textwrap.dedent("""\
    481             Subject: Forwarded message
    482             Content-Type: message/rfc822
    483             Content-Transfer-Encoding: 8bit
    484 
    485             To: python (at] vivarium.org
    486             From: police (at] monty.org
    487             Subject: get back in your box
    488             Content-Type: text/plain; charset="utf-8"
    489             Content-Transfer-Encoding: 7bit
    490             MIME-Version: 1.0
    491 
    492             Or face the comfy chair.
    493             """))
    494         payload = m.get_payload(0)
    495         self.assertIsInstance(payload, self.message)
    496         self.assertEqual(str(payload), str(content))
    497         self.assertIsInstance(m.get_content(), self.message)
    498         self.assertEqual(str(m.get_content()), str(content))
    499 
    500     def test_set_message_with_non_ascii_and_coercion_to_7bit(self):
    501         m = self._make_message()
    502         m['Subject'] = "Escape report"
    503         content = self._make_message()
    504         content['To'] = 'police (at] monty.org'
    505         content['From'] = 'victim (at] monty.org'
    506         content['Subject'] = "Help"
    507         content.set_content("j'ai un problme de python. il est sorti de son"
    508                             " vivarium.")
    509         raw_data_manager.set_content(m, content)
    510         self.assertEqual(bytes(m), textwrap.dedent("""\
    511             Subject: Escape report
    512             Content-Type: message/rfc822
    513             Content-Transfer-Encoding: 8bit
    514 
    515             To: police (at] monty.org
    516             From: victim (at] monty.org
    517             Subject: Help
    518             Content-Type: text/plain; charset="utf-8"
    519             Content-Transfer-Encoding: 8bit
    520             MIME-Version: 1.0
    521 
    522             j'ai un problme de python. il est sorti de son vivarium.
    523             """).encode('utf-8'))
    524         # The choice of base64 for the body encoding is because generator
    525         # doesn't bother with heuristics and uses it unconditionally for utf-8
    526         # text.
    527         # XXX: the first cte should be 7bit, too...that's a generator bug.
    528         # XXX: the line length in the body also looks like a generator bug.
    529         self.assertEqual(m.as_string(maxheaderlen=self.policy.max_line_length),
    530                          textwrap.dedent("""\
    531             Subject: Escape report
    532             Content-Type: message/rfc822
    533             Content-Transfer-Encoding: 8bit
    534 
    535             To: police (at] monty.org
    536             From: victim (at] monty.org
    537             Subject: Help
    538             Content-Type: text/plain; charset="utf-8"
    539             Content-Transfer-Encoding: base64
    540             MIME-Version: 1.0
    541 
    542             aidhaSB1biBwcm9ibMOobWUgZGUgcHl0aG9uLiBpbCBlc3Qgc29ydGkgZGUgc29uIHZpdmFyaXVt
    543             Lgo=
    544             """))
    545         self.assertIsInstance(m.get_content(), self.message)
    546         self.assertEqual(str(m.get_content()), str(content))
    547 
    548     def test_set_message_invalid_cte_raises(self):
    549         m = self._make_message()
    550         content = self._make_message()
    551         for cte in 'quoted-printable base64'.split():
    552             for subtype in 'rfc822 external-body'.split():
    553                 with self.subTest(cte=cte, subtype=subtype):
    554                     with self.assertRaises(ValueError) as ar:
    555                         m.set_content(content, subtype, cte=cte)
    556                     exc = str(ar.exception)
    557                     self.assertIn(cte, exc)
    558                     self.assertIn(subtype, exc)
    559         subtype = 'external-body'
    560         for cte in '8bit binary'.split():
    561             with self.subTest(cte=cte, subtype=subtype):
    562                 with self.assertRaises(ValueError) as ar:
    563                     m.set_content(content, subtype, cte=cte)
    564                 exc = str(ar.exception)
    565                 self.assertIn(cte, exc)
    566                 self.assertIn(subtype, exc)
    567 
    568     def test_set_image_jpg(self):
    569         for content in (b"bogus content",
    570                         bytearray(b"bogus content"),
    571                         memoryview(b"bogus content")):
    572             with self.subTest(content=content):
    573                 m = self._make_message()
    574                 raw_data_manager.set_content(m, content, 'image', 'jpeg')
    575                 self.assertEqual(str(m), textwrap.dedent("""\
    576                     Content-Type: image/jpeg
    577                     Content-Transfer-Encoding: base64
    578 
    579                     Ym9ndXMgY29udGVudA==
    580                     """))
    581                 self.assertEqual(m.get_payload(decode=True), content)
    582                 self.assertEqual(m.get_content(), content)
    583 
    584     def test_set_audio_aif_with_quoted_printable_cte(self):
    585         # Why you would use qp, I don't know, but it is technically supported.
    586         # XXX: the incorrect line length is because binascii.b2a_qp doesn't
    587         # support a line length parameter, but we must use it to get newline
    588         # encoding.
    589         # XXX: what about that lack of tailing newline?  Do we actually handle
    590         # that correctly in all cases?  That is, if the *source* has an
    591         # unencoded newline, do we add an extra newline to the returned payload
    592         # or not?  And can that actually be disambiguated based on the RFC?
    593         m = self._make_message()
    594         content = b'b\xFFgus\tcon\nt\rent ' + b'z'*100
    595         m.set_content(content, 'audio', 'aif', cte='quoted-printable')
    596         self.assertEqual(bytes(m), textwrap.dedent("""\
    597             Content-Type: audio/aif
    598             Content-Transfer-Encoding: quoted-printable
    599             MIME-Version: 1.0
    600 
    601             b=FFgus=09con=0At=0Dent=20zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz=
    602             zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz""").encode('latin-1'))
    603         self.assertEqual(m.get_payload(decode=True), content)
    604         self.assertEqual(m.get_content(), content)
    605 
    606     def test_set_video_mpeg_with_binary_cte(self):
    607         m = self._make_message()
    608         content = b'b\xFFgus\tcon\nt\rent ' + b'z'*100
    609         m.set_content(content, 'video', 'mpeg', cte='binary')
    610         self.assertEqual(bytes(m), textwrap.dedent("""\
    611             Content-Type: video/mpeg
    612             Content-Transfer-Encoding: binary
    613             MIME-Version: 1.0
    614 
    615             """).encode('ascii') +
    616             # XXX: the second \n ought to be a \r, but generator gets it wrong.
    617             # THIS MEANS WE DON'T ACTUALLY SUPPORT THE 'binary' CTE.
    618             b'b\xFFgus\tcon\nt\nent zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz' +
    619             b'zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz')
    620         self.assertEqual(m.get_payload(decode=True), content)
    621         self.assertEqual(m.get_content(), content)
    622 
    623     def test_set_application_octet_stream_with_8bit_cte(self):
    624         # In 8bit mode, universal line end logic applies.  It is up to the
    625         # application to make sure the lines are short enough; we don't check.
    626         m = self._make_message()
    627         content = b'b\xFFgus\tcon\nt\rent\n' + b'z'*60 + b'\n'
    628         m.set_content(content, 'application', 'octet-stream', cte='8bit')
    629         self.assertEqual(bytes(m), textwrap.dedent("""\
    630             Content-Type: application/octet-stream
    631             Content-Transfer-Encoding: 8bit
    632             MIME-Version: 1.0
    633 
    634             """).encode('ascii') +
    635             b'b\xFFgus\tcon\nt\nent\n' +
    636             b'zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz\n')
    637         self.assertEqual(m.get_payload(decode=True), content)
    638         self.assertEqual(m.get_content(), content)
    639 
    640     def test_set_headers_from_header_objects(self):
    641         m = self._make_message()
    642         content = "Simple message.\n"
    643         header_factory = self.policy.header_factory
    644         raw_data_manager.set_content(m, content, headers=(
    645             header_factory("To", "foo (at] example.com"),
    646             header_factory("From", "foo (at] example.com"),
    647             header_factory("Subject", "I'm talking to myself.")))
    648         self.assertEqual(str(m), textwrap.dedent("""\
    649             Content-Type: text/plain; charset="utf-8"
    650             To: foo (at] example.com
    651             From: foo (at] example.com
    652             Subject: I'm talking to myself.
    653             Content-Transfer-Encoding: 7bit
    654 
    655             Simple message.
    656             """))
    657 
    658     def test_set_headers_from_strings(self):
    659         m = self._make_message()
    660         content = "Simple message.\n"
    661         raw_data_manager.set_content(m, content, headers=(
    662             "X-Foo-Header: foo",
    663             "X-Bar-Header: bar",))
    664         self.assertEqual(str(m), textwrap.dedent("""\
    665             Content-Type: text/plain; charset="utf-8"
    666             X-Foo-Header: foo
    667             X-Bar-Header: bar
    668             Content-Transfer-Encoding: 7bit
    669 
    670             Simple message.
    671             """))
    672 
    673     def test_set_headers_with_invalid_duplicate_string_header_raises(self):
    674         m = self._make_message()
    675         content = "Simple message.\n"
    676         with self.assertRaisesRegex(ValueError, 'Content-Type'):
    677             raw_data_manager.set_content(m, content, headers=(
    678                 "Content-Type: foo/bar",)
    679                 )
    680 
    681     def test_set_headers_with_invalid_duplicate_header_header_raises(self):
    682         m = self._make_message()
    683         content = "Simple message.\n"
    684         header_factory = self.policy.header_factory
    685         with self.assertRaisesRegex(ValueError, 'Content-Type'):
    686             raw_data_manager.set_content(m, content, headers=(
    687                 header_factory("Content-Type", " foo/bar"),)
    688                 )
    689 
    690     def test_set_headers_with_defective_string_header_raises(self):
    691         m = self._make_message()
    692         content = "Simple message.\n"
    693         with self.assertRaisesRegex(ValueError, 'a@fairly@@invalid@address'):
    694             raw_data_manager.set_content(m, content, headers=(
    695                 'To: a@fairly@@invalid@address',)
    696                 )
    697             print(m['To'].defects)
    698 
    699     def test_set_headers_with_defective_header_header_raises(self):
    700         m = self._make_message()
    701         content = "Simple message.\n"
    702         header_factory = self.policy.header_factory
    703         with self.assertRaisesRegex(ValueError, 'a@fairly@@invalid@address'):
    704             raw_data_manager.set_content(m, content, headers=(
    705                 header_factory('To', 'a@fairly@@invalid@address'),)
    706                 )
    707             print(m['To'].defects)
    708 
    709     def test_set_disposition_inline(self):
    710         m = self._make_message()
    711         m.set_content('foo', disposition='inline')
    712         self.assertEqual(m['Content-Disposition'], 'inline')
    713 
    714     def test_set_disposition_attachment(self):
    715         m = self._make_message()
    716         m.set_content('foo', disposition='attachment')
    717         self.assertEqual(m['Content-Disposition'], 'attachment')
    718 
    719     def test_set_disposition_foo(self):
    720         m = self._make_message()
    721         m.set_content('foo', disposition='foo')
    722         self.assertEqual(m['Content-Disposition'], 'foo')
    723 
    724     # XXX: we should have a 'strict' policy mode (beyond raise_on_defect) that
    725     # would cause 'foo' above to raise.
    726 
    727     def test_set_filename(self):
    728         m = self._make_message()
    729         m.set_content('foo', filename='bar.txt')
    730         self.assertEqual(m['Content-Disposition'],
    731                          'attachment; filename="bar.txt"')
    732 
    733     def test_set_filename_and_disposition_inline(self):
    734         m = self._make_message()
    735         m.set_content('foo', disposition='inline', filename='bar.txt')
    736         self.assertEqual(m['Content-Disposition'], 'inline; filename="bar.txt"')
    737 
    738     def test_set_non_ascii_filename(self):
    739         m = self._make_message()
    740         m.set_content('foo', filename='br.txt')
    741         self.assertEqual(bytes(m), textwrap.dedent("""\
    742             Content-Type: text/plain; charset="utf-8"
    743             Content-Transfer-Encoding: 7bit
    744             Content-Disposition: attachment;
    745              filename*=utf-8''%C3%A1b%C3%A1r%C3%AE.txt
    746             MIME-Version: 1.0
    747 
    748             foo
    749             """).encode('ascii'))
    750 
    751     content_object_params = {
    752         'text_plain': ('content', ()),
    753         'text_html': ('content', ('html',)),
    754         'application_octet_stream': (b'content',
    755                                      ('application', 'octet_stream')),
    756         'image_jpeg': (b'content', ('image', 'jpeg')),
    757         'message_rfc822': (message(), ()),
    758         'message_external_body': (message(), ('external-body',)),
    759         }
    760 
    761     def content_object_as_header_receiver(self, obj, mimetype):
    762         m = self._make_message()
    763         m.set_content(obj, *mimetype, headers=(
    764             'To: foo (at] example.com',
    765             'From: bar (at] simple.net'))
    766         self.assertEqual(m['to'], 'foo (at] example.com')
    767         self.assertEqual(m['from'], 'bar (at] simple.net')
    768 
    769     def content_object_as_disposition_inline_receiver(self, obj, mimetype):
    770         m = self._make_message()
    771         m.set_content(obj, *mimetype, disposition='inline')
    772         self.assertEqual(m['Content-Disposition'], 'inline')
    773 
    774     def content_object_as_non_ascii_filename_receiver(self, obj, mimetype):
    775         m = self._make_message()
    776         m.set_content(obj, *mimetype, disposition='inline', filename='br.txt')
    777         self.assertEqual(m['Content-Disposition'], 'inline; filename="br.txt"')
    778         self.assertEqual(m.get_filename(), "br.txt")
    779         self.assertEqual(m['Content-Disposition'].params['filename'], "br.txt")
    780 
    781     def content_object_as_cid_receiver(self, obj, mimetype):
    782         m = self._make_message()
    783         m.set_content(obj, *mimetype, cid='some_random_stuff')
    784         self.assertEqual(m['Content-ID'], 'some_random_stuff')
    785 
    786     def content_object_as_params_receiver(self, obj, mimetype):
    787         m = self._make_message()
    788         params = {'foo': 'br', 'abc': 'xyz'}
    789         m.set_content(obj, *mimetype, params=params)
    790         if isinstance(obj, str):
    791             params['charset'] = 'utf-8'
    792         self.assertEqual(m['Content-Type'].params, params)
    793 
    794 
    795 if __name__ == '__main__':
    796     unittest.main()
    797