"""Tests for how message parsing reports defects under different policies."""

import textwrap
import unittest
import contextlib
from email import policy
from email import errors
from test.test_email import TestEmailBase


class TestDefectsBase:
    """Defect test cases shared by the concrete test classes below.

    This class is a mixin and is not a TestCase itself.  Concrete subclasses
    must provide ``get_defects(obj)``, returning the list of defects to
    check, and may override ``policy``, ``raise_expected`` and
    ``_raise_point``.  ``_str_msg`` and ``assertDefectsEqual`` are not
    defined in this file; they are presumably inherited from TestEmailBase.
    """

    # In a class body the right-hand side still resolves to the imported
    # ``policy`` module, because the class attribute does not exist yet.
    policy = policy.default
    # True when the policy in use turns defects into raised exceptions.
    raise_expected = False

    @contextlib.contextmanager
    def _raise_point(self, defect):
        """Mark the code that triggers *defect*; a no-op in this base class."""
        yield

    def test_same_boundary_inner_outer(self):
        source = textwrap.dedent("""\
            Subject: XX
            From: xx (at] xx.dk
            To: XX
            Mime-version: 1.0
            Content-type: multipart/mixed;
                boundary="MS_Mac_OE_3071477847_720252_MIME_Part"

            --MS_Mac_OE_3071477847_720252_MIME_Part
            Content-type: multipart/alternative;
                boundary="MS_Mac_OE_3071477847_720252_MIME_Part"

            --MS_Mac_OE_3071477847_720252_MIME_Part
            Content-type: text/plain; charset="ISO-8859-1"
            Content-transfer-encoding: quoted-printable

            text

            --MS_Mac_OE_3071477847_720252_MIME_Part
            Content-type: text/html; charset="ISO-8859-1"
            Content-transfer-encoding: quoted-printable

            <HTML></HTML>

            --MS_Mac_OE_3071477847_720252_MIME_Part--

            --MS_Mac_OE_3071477847_720252_MIME_Part
            Content-type: image/gif; name="xx.gif";
            Content-disposition: attachment
            Content-transfer-encoding: base64

            Some removed base64 encoded chars.

            --MS_Mac_OE_3071477847_720252_MIME_Part--

            """)
        # XXX better would be to actually detect the duplicate.
        with self._raise_point(errors.StartBoundaryNotFoundDefect):
            msg = self._str_msg(source)
        if self.raise_expected:
            # Parsing raised, so ``msg`` was never bound.
            return
        inner = msg.get_payload(0)
        self.assertTrue(hasattr(inner, 'defects'))
        self.assertEqual(len(self.get_defects(inner)), 1)
        self.assertIsInstance(self.get_defects(inner)[0],
                              errors.StartBoundaryNotFoundDefect)

    def test_multipart_no_boundary(self):
        source = textwrap.dedent("""\
            Date: Fri, 6 Apr 2001 09:23:06 -0800 (GMT-0800)
            From: foobar
            Subject: broken mail
            MIME-Version: 1.0
            Content-Type: multipart/report; report-type=delivery-status;

            --JAB03225.986577786/zinfandel.lacita.com

            One part

            --JAB03225.986577786/zinfandel.lacita.com
            Content-Type: message/delivery-status

            Header: Another part

            --JAB03225.986577786/zinfandel.lacita.com--
            """)
        with self._raise_point(errors.NoBoundaryInMultipartDefect):
            msg = self._str_msg(source)
        if self.raise_expected:
            return
        self.assertIsInstance(msg.get_payload(), str)
        self.assertEqual(len(self.get_defects(msg)), 2)
        self.assertIsInstance(self.get_defects(msg)[0],
                              errors.NoBoundaryInMultipartDefect)
        self.assertIsInstance(self.get_defects(msg)[1],
                              errors.MultipartInvariantViolationDefect)

    # Template for the Content-Transfer-Encoding tests below: the ``{}``
    # placeholder is filled with an extra top-level header line, or with ''.
    multipart_msg = textwrap.dedent("""\
        Date: Wed, 14 Nov 2007 12:56:23 GMT
        From: foo (at] bar.invalid
        To: foo (at] bar.invalid
        Subject: Content-Transfer-Encoding: base64 and multipart
        MIME-Version: 1.0
        Content-Type: multipart/mixed;
            boundary="===============3344438784458119861=="{}

        --===============3344438784458119861==
        Content-Type: text/plain

        Test message

        --===============3344438784458119861==
        Content-Type: application/octet-stream
        Content-Transfer-Encoding: base64

        YWJj

        --===============3344438784458119861==--
        """)

    def test_multipart_invalid_cte(self):
        with self._raise_point(
                errors.InvalidMultipartContentTransferEncodingDefect):
            msg = self._str_msg(
                self.multipart_msg.format(
                    "\nContent-Transfer-Encoding: base64"))
        if self.raise_expected:
            return
        self.assertEqual(len(self.get_defects(msg)), 1)
        self.assertIsInstance(
            self.get_defects(msg)[0],
            errors.InvalidMultipartContentTransferEncodingDefect)

    def test_multipart_no_cte_no_defect(self):
        # No defect is expected, so there is nothing to check when the
        # policy raises on defects.
        if self.raise_expected:
            return
        msg = self._str_msg(self.multipart_msg.format(''))
        self.assertEqual(len(self.get_defects(msg)), 0)

    def test_multipart_valid_cte_no_defect(self):
        # No defect is expected, so there is nothing to check when the
        # policy raises on defects.
        if self.raise_expected:
            return
        for cte in ('7bit', '8bit', 'BINary'):
            # subTest reports each encoding separately instead of stopping
            # at the first failing one.
            with self.subTest(cte=cte):
                msg = self._str_msg(
                    self.multipart_msg.format(
                        "\nContent-Transfer-Encoding: " + cte))
                self.assertEqual(
                    len(self.get_defects(msg)), 0, "cte=" + cte)

    def test_lying_multipart(self):
        source = textwrap.dedent("""\
            From: "Allison Dunlap" <xxx (at] example.com>
            To: yyy (at] example.com
            Subject: 64423
            Date: Sun, 11 Jul 2004 16:09:27 -0300
            MIME-Version: 1.0
            Content-Type: multipart/alternative;

            Blah blah blah
            """)
        with self._raise_point(errors.NoBoundaryInMultipartDefect):
            msg = self._str_msg(source)
        if self.raise_expected:
            return
        self.assertTrue(hasattr(msg, 'defects'))
        self.assertEqual(len(self.get_defects(msg)), 2)
        self.assertIsInstance(self.get_defects(msg)[0],
                              errors.NoBoundaryInMultipartDefect)
        self.assertIsInstance(self.get_defects(msg)[1],
                              errors.MultipartInvariantViolationDefect)

    def test_missing_start_boundary(self):
        source = textwrap.dedent("""\
            Content-Type: multipart/mixed; boundary="AAA"
            From: Mail Delivery Subsystem <xxx (at] example.com>
            To: yyy (at] example.com

            --AAA

            Stuff

            --AAA
            Content-Type: message/rfc822

            From: webmaster (at] python.org
            To: zzz (at] example.com
            Content-Type: multipart/mixed; boundary="BBB"

            --BBB--

            --AAA--

            """)
        # The message structure is:
        #
        # multipart/mixed
        #    text/plain
        #    message/rfc822
        #        multipart/mixed [*]
        #
        # [*] This message is missing its start boundary
        with self._raise_point(errors.StartBoundaryNotFoundDefect):
            outer = self._str_msg(source)
        if self.raise_expected:
            return
        bad = outer.get_payload(1).get_payload(0)
        self.assertEqual(len(self.get_defects(bad)), 1)
        self.assertIsInstance(self.get_defects(bad)[0],
                              errors.StartBoundaryNotFoundDefect)

    def test_first_line_is_continuation_header(self):
        with self._raise_point(errors.FirstHeaderLineIsContinuationDefect):
            msg = self._str_msg(' Line 1\nSubject: test\n\nbody')
        if self.raise_expected:
            return
        self.assertEqual(msg.keys(), ['Subject'])
        self.assertEqual(msg.get_payload(), 'body')
        self.assertEqual(len(self.get_defects(msg)), 1)
        self.assertDefectsEqual(self.get_defects(msg),
                                [errors.FirstHeaderLineIsContinuationDefect])
        self.assertEqual(self.get_defects(msg)[0].line, ' Line 1\n')

    def test_missing_header_body_separator(self):
        # Our heuristic if we see a line that doesn't look like a header (no
        # leading whitespace but no ':') is to assume that the blank line that
        # separates the header from the body is missing, and to stop parsing
        # headers and start parsing the body.
        with self._raise_point(errors.MissingHeaderBodySeparatorDefect):
            msg = self._str_msg('Subject: test\nnot a header\nTo: abc\n\nb\n')
        if self.raise_expected:
            return
        self.assertEqual(msg.keys(), ['Subject'])
        self.assertEqual(msg.get_payload(), 'not a header\nTo: abc\n\nb\n')
        self.assertDefectsEqual(self.get_defects(msg),
                                [errors.MissingHeaderBodySeparatorDefect])

    def test_bad_padding_in_base64_payload(self):
        source = textwrap.dedent("""\
            Subject: test
            MIME-Version: 1.0
            Content-Type: text/plain; charset="utf-8"
            Content-Transfer-Encoding: base64

            dmk
            """)
        msg = self._str_msg(source)
        # Here the raise point is the decode, not the parse.
        with self._raise_point(errors.InvalidBase64PaddingDefect):
            payload = msg.get_payload(decode=True)
        if self.raise_expected:
            return
        self.assertEqual(payload, b'vi')
        self.assertDefectsEqual(self.get_defects(msg),
                                [errors.InvalidBase64PaddingDefect])

    def test_invalid_chars_in_base64_payload(self):
        source = textwrap.dedent("""\
            Subject: test
            MIME-Version: 1.0
            Content-Type: text/plain; charset="utf-8"
            Content-Transfer-Encoding: base64

            dm\x01k===
            """)
        msg = self._str_msg(source)
        # Here the raise point is the decode, not the parse.
        with self._raise_point(errors.InvalidBase64CharactersDefect):
            payload = msg.get_payload(decode=True)
        if self.raise_expected:
            return
        self.assertEqual(payload, b'vi')
        self.assertDefectsEqual(self.get_defects(msg),
                                [errors.InvalidBase64CharactersDefect])

    def test_invalid_length_of_base64_payload(self):
        source = textwrap.dedent("""\
            Subject: test
            MIME-Version: 1.0
            Content-Type: text/plain; charset="utf-8"
            Content-Transfer-Encoding: base64

            abcde
            """)
        msg = self._str_msg(source)
        # Here the raise point is the decode, not the parse.
        with self._raise_point(errors.InvalidBase64LengthDefect):
            payload = msg.get_payload(decode=True)
        if self.raise_expected:
            return
        self.assertEqual(payload, b'abcde')
        self.assertDefectsEqual(self.get_defects(msg),
                                [errors.InvalidBase64LengthDefect])

    def test_missing_ending_boundary(self):
        source = textwrap.dedent("""\
            To: 1 (at] harrydomain4.com
            Subject: Fwd: 1
            MIME-Version: 1.0
            Content-Type: multipart/alternative;
                boundary="------------000101020201080900040301"

            --------------000101020201080900040301
            Content-Type: text/plain; charset=ISO-8859-1
            Content-Transfer-Encoding: 7bit

            Alternative 1

            --------------000101020201080900040301
            Content-Type: text/html; charset=ISO-8859-1
            Content-Transfer-Encoding: 7bit

            Alternative 2

            """)
        with self._raise_point(errors.CloseBoundaryNotFoundDefect):
            msg = self._str_msg(source)
        if self.raise_expected:
            return
        self.assertEqual(len(msg.get_payload()), 2)
        self.assertEqual(msg.get_payload(1).get_payload(), 'Alternative 2\n')
        self.assertDefectsEqual(self.get_defects(msg),
                                [errors.CloseBoundaryNotFoundDefect])


class TestDefectDetection(TestDefectsBase, TestEmailBase):
    """Run the shared tests reading defects from each object's ``defects``."""

    def get_defects(self, obj):
        """Return the defects recorded on *obj* itself."""
        return obj.defects


class TestDefectCapture(TestDefectsBase, TestEmailBase):
    """Run the shared tests with a policy that collects every defect."""

    class CapturePolicy(policy.EmailPolicy):
        """Policy whose register_defect appends to ``captured``."""

        # Declared on the class so ``captured=...`` can be given to the
        # constructor (presumably required by the policy base class; confirm
        # against the email.policy documentation).
        captured = None

        def register_defect(self, obj, defect):
            """Append *defect* to ``captured``; *obj* is not used."""
            self.captured.append(defect)

    def setUp(self):
        """Give each test its own policy with an empty capture list."""
        super().setUp()
        self.policy = self.CapturePolicy(captured=[])

    def get_defects(self, obj):
        """Return all defects captured by the policy, whatever *obj* is."""
        return self.policy.captured


class TestDefectRaising(TestDefectsBase, TestEmailBase):
    """Run the shared tests with a policy cloned with raise_on_defect=True."""

    # Start from the base-class attribute: inside a class body the bare name
    # ``policy`` would otherwise resolve to the imported module.
    policy = TestDefectsBase.policy.clone(raise_on_defect=True)
    raise_expected = True

    @contextlib.contextmanager
    def _raise_point(self, defect):
        """Assert that *defect* is raised inside the ``with`` block."""
        with self.assertRaises(defect):
            yield


if __name__ == '__main__':
    unittest.main()