Home | History | Annotate | Download | only in test_email
      1 import io
      2 import types
      3 import textwrap
      4 import unittest
      5 import email.policy
      6 import email.parser
      7 import email.generator
      8 import email.message
      9 from email import headerregistry
     10 
     11 def make_defaults(base_defaults, differences):
     12     defaults = base_defaults.copy()
     13     defaults.update(differences)
     14     return defaults
     15 
     16 class PolicyAPITests(unittest.TestCase):
     17 
     18     longMessage = True
     19 
     20     # Base default values.
     21     compat32_defaults = {
     22         'max_line_length':          78,
     23         'linesep':                  '\n',
     24         'cte_type':                 '8bit',
     25         'raise_on_defect':          False,
     26         'mangle_from_':             True,
     27         'message_factory':          None,
     28         }
     29     # These default values are the ones set on email.policy.default.
     30     # If any of these defaults change, the docs must be updated.
     31     policy_defaults = compat32_defaults.copy()
     32     policy_defaults.update({
     33         'utf8':                     False,
     34         'raise_on_defect':          False,
     35         'header_factory':           email.policy.EmailPolicy.header_factory,
     36         'refold_source':            'long',
     37         'content_manager':          email.policy.EmailPolicy.content_manager,
     38         'mangle_from_':             False,
     39         'message_factory':          email.message.EmailMessage,
     40         })
     41 
     42     # For each policy under test, we give here what we expect the defaults to
     43     # be for that policy.  The second argument to make defaults is the
     44     # difference between the base defaults and that for the particular policy.
     45     new_policy = email.policy.EmailPolicy()
     46     policies = {
     47         email.policy.compat32: make_defaults(compat32_defaults, {}),
     48         email.policy.default: make_defaults(policy_defaults, {}),
     49         email.policy.SMTP: make_defaults(policy_defaults,
     50                                          {'linesep': '\r\n'}),
     51         email.policy.SMTPUTF8: make_defaults(policy_defaults,
     52                                              {'linesep': '\r\n',
     53                                               'utf8': True}),
     54         email.policy.HTTP: make_defaults(policy_defaults,
     55                                          {'linesep': '\r\n',
     56                                           'max_line_length': None}),
     57         email.policy.strict: make_defaults(policy_defaults,
     58                                            {'raise_on_defect': True}),
     59         new_policy: make_defaults(policy_defaults, {}),
     60         }
     61     # Creating a new policy creates a new header factory.  There is a test
     62     # later that proves this.
     63     policies[new_policy]['header_factory'] = new_policy.header_factory
     64 
     65     def test_defaults(self):
     66         for policy, expected in self.policies.items():
     67             for attr, value in expected.items():
     68                 with self.subTest(policy=policy, attr=attr):
     69                     self.assertEqual(getattr(policy, attr), value,
     70                                     ("change {} docs/docstrings if defaults have "
     71                                     "changed").format(policy))
     72 
     73     def test_all_attributes_covered(self):
     74         for policy, expected in self.policies.items():
     75             for attr in dir(policy):
     76                 with self.subTest(policy=policy, attr=attr):
     77                     if (attr.startswith('_') or
     78                             isinstance(getattr(email.policy.EmailPolicy, attr),
     79                                   types.FunctionType)):
     80                         continue
     81                     else:
     82                         self.assertIn(attr, expected,
     83                                       "{} is not fully tested".format(attr))
     84 
     85     def test_abc(self):
     86         with self.assertRaises(TypeError) as cm:
     87             email.policy.Policy()
     88         msg = str(cm.exception)
     89         abstract_methods = ('fold',
     90                             'fold_binary',
     91                             'header_fetch_parse',
     92                             'header_source_parse',
     93                             'header_store_parse')
     94         for method in abstract_methods:
     95             self.assertIn(method, msg)
     96 
     97     def test_policy_is_immutable(self):
     98         for policy, defaults in self.policies.items():
     99             for attr in defaults:
    100                 with self.assertRaisesRegex(AttributeError, attr+".*read-only"):
    101                     setattr(policy, attr, None)
    102             with self.assertRaisesRegex(AttributeError, 'no attribute.*foo'):
    103                 policy.foo = None
    104 
    105     def test_set_policy_attrs_when_cloned(self):
    106         # None of the attributes has a default value of None, so we set them
    107         # all to None in the clone call and check that it worked.
    108         for policyclass, defaults in self.policies.items():
    109             testattrdict = {attr: None for attr in defaults}
    110             policy = policyclass.clone(**testattrdict)
    111             for attr in defaults:
    112                 self.assertIsNone(getattr(policy, attr))
    113 
    114     def test_reject_non_policy_keyword_when_called(self):
    115         for policyclass in self.policies:
    116             with self.assertRaises(TypeError):
    117                 policyclass(this_keyword_should_not_be_valid=None)
    118             with self.assertRaises(TypeError):
    119                 policyclass(newtline=None)
    120 
    121     def test_policy_addition(self):
    122         expected = self.policy_defaults.copy()
    123         p1 = email.policy.default.clone(max_line_length=100)
    124         p2 = email.policy.default.clone(max_line_length=50)
    125         added = p1 + p2
    126         expected.update(max_line_length=50)
    127         for attr, value in expected.items():
    128             self.assertEqual(getattr(added, attr), value)
    129         added = p2 + p1
    130         expected.update(max_line_length=100)
    131         for attr, value in expected.items():
    132             self.assertEqual(getattr(added, attr), value)
    133         added = added + email.policy.default
    134         for attr, value in expected.items():
    135             self.assertEqual(getattr(added, attr), value)
    136 
    137     def test_register_defect(self):
    138         class Dummy:
    139             def __init__(self):
    140                 self.defects = []
    141         obj = Dummy()
    142         defect = object()
    143         policy = email.policy.EmailPolicy()
    144         policy.register_defect(obj, defect)
    145         self.assertEqual(obj.defects, [defect])
    146         defect2 = object()
    147         policy.register_defect(obj, defect2)
    148         self.assertEqual(obj.defects, [defect, defect2])
    149 
    150     class MyObj:
    151         def __init__(self):
    152             self.defects = []
    153 
    154     class MyDefect(Exception):
    155         pass
    156 
    157     def test_handle_defect_raises_on_strict(self):
    158         foo = self.MyObj()
    159         defect = self.MyDefect("the telly is broken")
    160         with self.assertRaisesRegex(self.MyDefect, "the telly is broken"):
    161             email.policy.strict.handle_defect(foo, defect)
    162 
    163     def test_handle_defect_registers_defect(self):
    164         foo = self.MyObj()
    165         defect1 = self.MyDefect("one")
    166         email.policy.default.handle_defect(foo, defect1)
    167         self.assertEqual(foo.defects, [defect1])
    168         defect2 = self.MyDefect("two")
    169         email.policy.default.handle_defect(foo, defect2)
    170         self.assertEqual(foo.defects, [defect1, defect2])
    171 
    172     class MyPolicy(email.policy.EmailPolicy):
    173         defects = None
    174         def __init__(self, *args, **kw):
    175             super().__init__(*args, defects=[], **kw)
    176         def register_defect(self, obj, defect):
    177             self.defects.append(defect)
    178 
    179     def test_overridden_register_defect_still_raises(self):
    180         foo = self.MyObj()
    181         defect = self.MyDefect("the telly is broken")
    182         with self.assertRaisesRegex(self.MyDefect, "the telly is broken"):
    183             self.MyPolicy(raise_on_defect=True).handle_defect(foo, defect)
    184 
    185     def test_overridden_register_defect_works(self):
    186         foo = self.MyObj()
    187         defect1 = self.MyDefect("one")
    188         my_policy = self.MyPolicy()
    189         my_policy.handle_defect(foo, defect1)
    190         self.assertEqual(my_policy.defects, [defect1])
    191         self.assertEqual(foo.defects, [])
    192         defect2 = self.MyDefect("two")
    193         my_policy.handle_defect(foo, defect2)
    194         self.assertEqual(my_policy.defects, [defect1, defect2])
    195         self.assertEqual(foo.defects, [])
    196 
    197     def test_default_header_factory(self):
    198         h = email.policy.default.header_factory('Test', 'test')
    199         self.assertEqual(h.name, 'Test')
    200         self.assertIsInstance(h, headerregistry.UnstructuredHeader)
    201         self.assertIsInstance(h, headerregistry.BaseHeader)
    202 
    203     class Foo:
    204         parse = headerregistry.UnstructuredHeader.parse
    205 
    206     def test_each_Policy_gets_unique_factory(self):
    207         policy1 = email.policy.EmailPolicy()
    208         policy2 = email.policy.EmailPolicy()
    209         policy1.header_factory.map_to_type('foo', self.Foo)
    210         h = policy1.header_factory('foo', 'test')
    211         self.assertIsInstance(h, self.Foo)
    212         self.assertNotIsInstance(h, headerregistry.UnstructuredHeader)
    213         h = policy2.header_factory('foo', 'test')
    214         self.assertNotIsInstance(h, self.Foo)
    215         self.assertIsInstance(h, headerregistry.UnstructuredHeader)
    216 
    217     def test_clone_copies_factory(self):
    218         policy1 = email.policy.EmailPolicy()
    219         policy2 = policy1.clone()
    220         policy1.header_factory.map_to_type('foo', self.Foo)
    221         h = policy1.header_factory('foo', 'test')
    222         self.assertIsInstance(h, self.Foo)
    223         h = policy2.header_factory('foo', 'test')
    224         self.assertIsInstance(h, self.Foo)
    225 
    226     def test_new_factory_overrides_default(self):
    227         mypolicy = email.policy.EmailPolicy()
    228         myfactory = mypolicy.header_factory
    229         newpolicy = mypolicy + email.policy.strict
    230         self.assertEqual(newpolicy.header_factory, myfactory)
    231         newpolicy = email.policy.strict + mypolicy
    232         self.assertEqual(newpolicy.header_factory, myfactory)
    233 
    234     def test_adding_default_policies_preserves_default_factory(self):
    235         newpolicy = email.policy.default + email.policy.strict
    236         self.assertEqual(newpolicy.header_factory,
    237                          email.policy.EmailPolicy.header_factory)
    238         self.assertEqual(newpolicy.__dict__, {'raise_on_defect': True})
    239 
    240     # XXX: Need subclassing tests.
    241     # For adding subclassed objects, make sure the usual rules apply (subclass
    242     # wins), but that the order still works (right overrides left).
    243 
    244 
    245 class TestException(Exception):
    246     pass
    247 
    248 class TestPolicyPropagation(unittest.TestCase):
    249 
    250     # The abstract methods are used by the parser but not by the wrapper
    251     # functions that call it, so if the exception gets raised we know that the
    252     # policy was actually propagated all the way to feedparser.
    253     class MyPolicy(email.policy.Policy):
    254         def badmethod(self, *args, **kw):
    255             raise TestException("test")
    256         fold = fold_binary = header_fetch_parser = badmethod
    257         header_source_parse = header_store_parse = badmethod
    258 
    259     def test_message_from_string(self):
    260         with self.assertRaisesRegex(TestException, "^test$"):
    261             email.message_from_string("Subject: test\n\n",
    262                                       policy=self.MyPolicy)
    263 
    264     def test_message_from_bytes(self):
    265         with self.assertRaisesRegex(TestException, "^test$"):
    266             email.message_from_bytes(b"Subject: test\n\n",
    267                                      policy=self.MyPolicy)
    268 
    269     def test_message_from_file(self):
    270         f = io.StringIO('Subject: test\n\n')
    271         with self.assertRaisesRegex(TestException, "^test$"):
    272             email.message_from_file(f, policy=self.MyPolicy)
    273 
    274     def test_message_from_binary_file(self):
    275         f = io.BytesIO(b'Subject: test\n\n')
    276         with self.assertRaisesRegex(TestException, "^test$"):
    277             email.message_from_binary_file(f, policy=self.MyPolicy)
    278 
    279     # These are redundant, but we need them for black-box completeness.
    280 
    281     def test_parser(self):
    282         p = email.parser.Parser(policy=self.MyPolicy)
    283         with self.assertRaisesRegex(TestException, "^test$"):
    284             p.parsestr('Subject: test\n\n')
    285 
    286     def test_bytes_parser(self):
    287         p = email.parser.BytesParser(policy=self.MyPolicy)
    288         with self.assertRaisesRegex(TestException, "^test$"):
    289             p.parsebytes(b'Subject: test\n\n')
    290 
    291     # Now that we've established that all the parse methods get the
    292     # policy in to feedparser, we can use message_from_string for
    293     # the rest of the propagation tests.
    294 
    295     def _make_msg(self, source='Subject: test\n\n', policy=None):
    296         self.policy = email.policy.default.clone() if policy is None else policy
    297         return email.message_from_string(source, policy=self.policy)
    298 
    299     def test_parser_propagates_policy_to_message(self):
    300         msg = self._make_msg()
    301         self.assertIs(msg.policy, self.policy)
    302 
    303     def test_parser_propagates_policy_to_sub_messages(self):
    304         msg = self._make_msg(textwrap.dedent("""\
    305             Subject: mime test
    306             MIME-Version: 1.0
    307             Content-Type: multipart/mixed, boundary="XXX"
    308 
    309             --XXX
    310             Content-Type: text/plain
    311 
    312             test
    313             --XXX
    314             Content-Type: text/plain
    315 
    316             test2
    317             --XXX--
    318             """))
    319         for part in msg.walk():
    320             self.assertIs(part.policy, self.policy)
    321 
    322     def test_message_policy_propagates_to_generator(self):
    323         msg = self._make_msg("Subject: test\nTo: foo\n\n",
    324                              policy=email.policy.default.clone(linesep='X'))
    325         s = io.StringIO()
    326         g = email.generator.Generator(s)
    327         g.flatten(msg)
    328         self.assertEqual(s.getvalue(), "Subject: testXTo: fooXX")
    329 
    330     def test_message_policy_used_by_as_string(self):
    331         msg = self._make_msg("Subject: test\nTo: foo\n\n",
    332                              policy=email.policy.default.clone(linesep='X'))
    333         self.assertEqual(msg.as_string(), "Subject: testXTo: fooXX")
    334 
    335 
    336 class TestConcretePolicies(unittest.TestCase):
    337 
    338     def test_header_store_parse_rejects_newlines(self):
    339         instance = email.policy.EmailPolicy()
    340         self.assertRaises(ValueError,
    341                           instance.header_store_parse,
    342                           'From', 'spam\negg (at] foo.py')
    343 
    344 
# Allow running this test module directly as a script.
if __name__ == '__main__':
    unittest.main()
    347