Home | History | Annotate | Download | only in test_email
      1 import textwrap
      2 import unittest
      3 import contextlib
      4 from email import policy
      5 from email import errors
      6 from test.test_email import TestEmailBase
      7 
      8 
      9 class TestDefectsBase:
     10 
     11     policy = policy.default
     12     raise_expected = False
     13 
     14     @contextlib.contextmanager
     15     def _raise_point(self, defect):
     16         yield
     17 
     18     def test_same_boundary_inner_outer(self):
     19         source = textwrap.dedent("""\
     20             Subject: XX
     21             From: xx (at] xx.dk
     22             To: XX
     23             Mime-version: 1.0
     24             Content-type: multipart/mixed;
     25                boundary="MS_Mac_OE_3071477847_720252_MIME_Part"
     26 
     27             --MS_Mac_OE_3071477847_720252_MIME_Part
     28             Content-type: multipart/alternative;
     29                boundary="MS_Mac_OE_3071477847_720252_MIME_Part"
     30 
     31             --MS_Mac_OE_3071477847_720252_MIME_Part
     32             Content-type: text/plain; charset="ISO-8859-1"
     33             Content-transfer-encoding: quoted-printable
     34 
     35             text
     36 
     37             --MS_Mac_OE_3071477847_720252_MIME_Part
     38             Content-type: text/html; charset="ISO-8859-1"
     39             Content-transfer-encoding: quoted-printable
     40 
     41             <HTML></HTML>
     42 
     43             --MS_Mac_OE_3071477847_720252_MIME_Part--
     44 
     45             --MS_Mac_OE_3071477847_720252_MIME_Part
     46             Content-type: image/gif; name="xx.gif";
     47             Content-disposition: attachment
     48             Content-transfer-encoding: base64
     49 
     50             Some removed base64 encoded chars.
     51 
     52             --MS_Mac_OE_3071477847_720252_MIME_Part--
     53 
     54             """)
     55         # XXX better would be to actually detect the duplicate.
     56         with self._raise_point(errors.StartBoundaryNotFoundDefect):
     57             msg = self._str_msg(source)
     58         if self.raise_expected: return
     59         inner = msg.get_payload(0)
     60         self.assertTrue(hasattr(inner, 'defects'))
     61         self.assertEqual(len(self.get_defects(inner)), 1)
     62         self.assertIsInstance(self.get_defects(inner)[0],
     63                               errors.StartBoundaryNotFoundDefect)
     64 
     65     def test_multipart_no_boundary(self):
     66         source = textwrap.dedent("""\
     67             Date: Fri, 6 Apr 2001 09:23:06 -0800 (GMT-0800)
     68             From: foobar
     69             Subject: broken mail
     70             MIME-Version: 1.0
     71             Content-Type: multipart/report; report-type=delivery-status;
     72 
     73             --JAB03225.986577786/zinfandel.lacita.com
     74 
     75             One part
     76 
     77             --JAB03225.986577786/zinfandel.lacita.com
     78             Content-Type: message/delivery-status
     79 
     80             Header: Another part
     81 
     82             --JAB03225.986577786/zinfandel.lacita.com--
     83             """)
     84         with self._raise_point(errors.NoBoundaryInMultipartDefect):
     85             msg = self._str_msg(source)
     86         if self.raise_expected: return
     87         self.assertIsInstance(msg.get_payload(), str)
     88         self.assertEqual(len(self.get_defects(msg)), 2)
     89         self.assertIsInstance(self.get_defects(msg)[0],
     90                               errors.NoBoundaryInMultipartDefect)
     91         self.assertIsInstance(self.get_defects(msg)[1],
     92                               errors.MultipartInvariantViolationDefect)
     93 
     94     multipart_msg = textwrap.dedent("""\
     95         Date: Wed, 14 Nov 2007 12:56:23 GMT
     96         From: foo (at] bar.invalid
     97         To: foo (at] bar.invalid
     98         Subject: Content-Transfer-Encoding: base64 and multipart
     99         MIME-Version: 1.0
    100         Content-Type: multipart/mixed;
    101             boundary="===============3344438784458119861=="{}
    102 
    103         --===============3344438784458119861==
    104         Content-Type: text/plain
    105 
    106         Test message
    107 
    108         --===============3344438784458119861==
    109         Content-Type: application/octet-stream
    110         Content-Transfer-Encoding: base64
    111 
    112         YWJj
    113 
    114         --===============3344438784458119861==--
    115         """)
    116 
    117     def test_multipart_invalid_cte(self):
    118         with self._raise_point(
    119                 errors.InvalidMultipartContentTransferEncodingDefect):
    120             msg = self._str_msg(
    121                     self.multipart_msg.format(
    122                         "\nContent-Transfer-Encoding: base64"))
    123         if self.raise_expected: return
    124         self.assertEqual(len(self.get_defects(msg)), 1)
    125         self.assertIsInstance(self.get_defects(msg)[0],
    126             errors.InvalidMultipartContentTransferEncodingDefect)
    127 
    128     def test_multipart_no_cte_no_defect(self):
    129         if self.raise_expected: return
    130         msg = self._str_msg(self.multipart_msg.format(''))
    131         self.assertEqual(len(self.get_defects(msg)), 0)
    132 
    133     def test_multipart_valid_cte_no_defect(self):
    134         if self.raise_expected: return
    135         for cte in ('7bit', '8bit', 'BINary'):
    136             msg = self._str_msg(
    137                 self.multipart_msg.format("\nContent-Transfer-Encoding: "+cte))
    138             self.assertEqual(len(self.get_defects(msg)), 0, "cte="+cte)
    139 
    140     def test_lying_multipart(self):
    141         source = textwrap.dedent("""\
    142             From: "Allison Dunlap" <xxx (at] example.com>
    143             To: yyy (at] example.com
    144             Subject: 64423
    145             Date: Sun, 11 Jul 2004 16:09:27 -0300
    146             MIME-Version: 1.0
    147             Content-Type: multipart/alternative;
    148 
    149             Blah blah blah
    150             """)
    151         with self._raise_point(errors.NoBoundaryInMultipartDefect):
    152             msg = self._str_msg(source)
    153         if self.raise_expected: return
    154         self.assertTrue(hasattr(msg, 'defects'))
    155         self.assertEqual(len(self.get_defects(msg)), 2)
    156         self.assertIsInstance(self.get_defects(msg)[0],
    157                               errors.NoBoundaryInMultipartDefect)
    158         self.assertIsInstance(self.get_defects(msg)[1],
    159                               errors.MultipartInvariantViolationDefect)
    160 
    161     def test_missing_start_boundary(self):
    162         source = textwrap.dedent("""\
    163             Content-Type: multipart/mixed; boundary="AAA"
    164             From: Mail Delivery Subsystem <xxx (at] example.com>
    165             To: yyy (at] example.com
    166 
    167             --AAA
    168 
    169             Stuff
    170 
    171             --AAA
    172             Content-Type: message/rfc822
    173 
    174             From: webmaster (at] python.org
    175             To: zzz (at] example.com
    176             Content-Type: multipart/mixed; boundary="BBB"
    177 
    178             --BBB--
    179 
    180             --AAA--
    181 
    182             """)
    183         # The message structure is:
    184         #
    185         # multipart/mixed
    186         #    text/plain
    187         #    message/rfc822
    188         #        multipart/mixed [*]
    189         #
    190         # [*] This message is missing its start boundary
    191         with self._raise_point(errors.StartBoundaryNotFoundDefect):
    192             outer = self._str_msg(source)
    193         if self.raise_expected: return
    194         bad = outer.get_payload(1).get_payload(0)
    195         self.assertEqual(len(self.get_defects(bad)), 1)
    196         self.assertIsInstance(self.get_defects(bad)[0],
    197                               errors.StartBoundaryNotFoundDefect)
    198 
    199     def test_first_line_is_continuation_header(self):
    200         with self._raise_point(errors.FirstHeaderLineIsContinuationDefect):
    201             msg = self._str_msg(' Line 1\nSubject: test\n\nbody')
    202         if self.raise_expected: return
    203         self.assertEqual(msg.keys(), ['Subject'])
    204         self.assertEqual(msg.get_payload(), 'body')
    205         self.assertEqual(len(self.get_defects(msg)), 1)
    206         self.assertDefectsEqual(self.get_defects(msg),
    207                                  [errors.FirstHeaderLineIsContinuationDefect])
    208         self.assertEqual(self.get_defects(msg)[0].line, ' Line 1\n')
    209 
    210     def test_missing_header_body_separator(self):
    211         # Our heuristic if we see a line that doesn't look like a header (no
    212         # leading whitespace but no ':') is to assume that the blank line that
    213         # separates the header from the body is missing, and to stop parsing
    214         # headers and start parsing the body.
    215         with self._raise_point(errors.MissingHeaderBodySeparatorDefect):
    216             msg = self._str_msg('Subject: test\nnot a header\nTo: abc\n\nb\n')
    217         if self.raise_expected: return
    218         self.assertEqual(msg.keys(), ['Subject'])
    219         self.assertEqual(msg.get_payload(), 'not a header\nTo: abc\n\nb\n')
    220         self.assertDefectsEqual(self.get_defects(msg),
    221                                 [errors.MissingHeaderBodySeparatorDefect])
    222 
    223     def test_bad_padding_in_base64_payload(self):
    224         source = textwrap.dedent("""\
    225             Subject: test
    226             MIME-Version: 1.0
    227             Content-Type: text/plain; charset="utf-8"
    228             Content-Transfer-Encoding: base64
    229 
    230             dmk
    231             """)
    232         msg = self._str_msg(source)
    233         with self._raise_point(errors.InvalidBase64PaddingDefect):
    234             payload = msg.get_payload(decode=True)
    235         if self.raise_expected: return
    236         self.assertEqual(payload, b'vi')
    237         self.assertDefectsEqual(self.get_defects(msg),
    238                                 [errors.InvalidBase64PaddingDefect])
    239 
    240     def test_invalid_chars_in_base64_payload(self):
    241         source = textwrap.dedent("""\
    242             Subject: test
    243             MIME-Version: 1.0
    244             Content-Type: text/plain; charset="utf-8"
    245             Content-Transfer-Encoding: base64
    246 
    247             dm\x01k===
    248             """)
    249         msg = self._str_msg(source)
    250         with self._raise_point(errors.InvalidBase64CharactersDefect):
    251             payload = msg.get_payload(decode=True)
    252         if self.raise_expected: return
    253         self.assertEqual(payload, b'vi')
    254         self.assertDefectsEqual(self.get_defects(msg),
    255                                 [errors.InvalidBase64CharactersDefect])
    256 
    257     def test_invalid_length_of_base64_payload(self):
    258         source = textwrap.dedent("""\
    259             Subject: test
    260             MIME-Version: 1.0
    261             Content-Type: text/plain; charset="utf-8"
    262             Content-Transfer-Encoding: base64
    263 
    264             abcde
    265             """)
    266         msg = self._str_msg(source)
    267         with self._raise_point(errors.InvalidBase64LengthDefect):
    268             payload = msg.get_payload(decode=True)
    269         if self.raise_expected: return
    270         self.assertEqual(payload, b'abcde')
    271         self.assertDefectsEqual(self.get_defects(msg),
    272                                 [errors.InvalidBase64LengthDefect])
    273 
    274     def test_missing_ending_boundary(self):
    275         source = textwrap.dedent("""\
    276             To: 1 (at] harrydomain4.com
    277             Subject: Fwd: 1
    278             MIME-Version: 1.0
    279             Content-Type: multipart/alternative;
    280              boundary="------------000101020201080900040301"
    281 
    282             --------------000101020201080900040301
    283             Content-Type: text/plain; charset=ISO-8859-1
    284             Content-Transfer-Encoding: 7bit
    285 
    286             Alternative 1
    287 
    288             --------------000101020201080900040301
    289             Content-Type: text/html; charset=ISO-8859-1
    290             Content-Transfer-Encoding: 7bit
    291 
    292             Alternative 2
    293 
    294             """)
    295         with self._raise_point(errors.CloseBoundaryNotFoundDefect):
    296             msg = self._str_msg(source)
    297         if self.raise_expected: return
    298         self.assertEqual(len(msg.get_payload()), 2)
    299         self.assertEqual(msg.get_payload(1).get_payload(), 'Alternative 2\n')
    300         self.assertDefectsEqual(self.get_defects(msg),
    301                                 [errors.CloseBoundaryNotFoundDefect])
    302 
    303 
    304 class TestDefectDetection(TestDefectsBase, TestEmailBase):
    305 
    306     def get_defects(self, obj):
    307         return obj.defects
    308 
    309 
    310 class TestDefectCapture(TestDefectsBase, TestEmailBase):
    311 
    312     class CapturePolicy(policy.EmailPolicy):
    313         captured = None
    314         def register_defect(self, obj, defect):
    315             self.captured.append(defect)
    316 
    317     def setUp(self):
    318         self.policy = self.CapturePolicy(captured=list())
    319 
    320     def get_defects(self, obj):
    321         return self.policy.captured
    322 
    323 
    324 class TestDefectRaising(TestDefectsBase, TestEmailBase):
    325 
    326     policy = TestDefectsBase.policy
    327     policy = policy.clone(raise_on_defect=True)
    328     raise_expected = True
    329 
    330     @contextlib.contextmanager
    331     def _raise_point(self, defect):
    332         with self.assertRaises(defect):
    333             yield
    334 
    335 
    336 if __name__ == '__main__':
    337     unittest.main()
    338