1 import os 2 import sys 3 import time 4 import stat 5 import socket 6 import email 7 import email.message 8 import re 9 import io 10 import tempfile 11 from test import support 12 import unittest 13 import textwrap 14 import mailbox 15 import glob 16 17 18 class TestBase: 19 20 all_mailbox_types = (mailbox.Message, mailbox.MaildirMessage, 21 mailbox.mboxMessage, mailbox.MHMessage, 22 mailbox.BabylMessage, mailbox.MMDFMessage) 23 24 def _check_sample(self, msg): 25 # Inspect a mailbox.Message representation of the sample message 26 self.assertIsInstance(msg, email.message.Message) 27 self.assertIsInstance(msg, mailbox.Message) 28 for key, value in _sample_headers.items(): 29 self.assertIn(value, msg.get_all(key)) 30 self.assertTrue(msg.is_multipart()) 31 self.assertEqual(len(msg.get_payload()), len(_sample_payloads)) 32 for i, payload in enumerate(_sample_payloads): 33 part = msg.get_payload(i) 34 self.assertIsInstance(part, email.message.Message) 35 self.assertNotIsInstance(part, mailbox.Message) 36 self.assertEqual(part.get_payload(), payload) 37 38 def _delete_recursively(self, target): 39 # Delete a file or delete a directory recursively 40 if os.path.isdir(target): 41 support.rmtree(target) 42 elif os.path.exists(target): 43 support.unlink(target) 44 45 46 class TestMailbox(TestBase): 47 48 maxDiff = None 49 50 _factory = None # Overridden by subclasses to reuse tests 51 _template = 'From: foo\n\n%s\n' 52 53 def setUp(self): 54 self._path = support.TESTFN 55 self._delete_recursively(self._path) 56 self._box = self._factory(self._path) 57 58 def tearDown(self): 59 self._box.close() 60 self._delete_recursively(self._path) 61 62 def test_add(self): 63 # Add copies of a sample message 64 keys = [] 65 keys.append(self._box.add(self._template % 0)) 66 self.assertEqual(len(self._box), 1) 67 keys.append(self._box.add(mailbox.Message(_sample_message))) 68 self.assertEqual(len(self._box), 2) 69 keys.append(self._box.add(email.message_from_string(_sample_message))) 70 self.assertEqual(len(self._box), 3) 71 keys.append(self._box.add(io.BytesIO(_bytes_sample_message))) 72 self.assertEqual(len(self._box), 4) 73 keys.append(self._box.add(_sample_message)) 74 self.assertEqual(len(self._box), 5) 75 keys.append(self._box.add(_bytes_sample_message)) 76 self.assertEqual(len(self._box), 6) 77 with self.assertWarns(DeprecationWarning): 78 keys.append(self._box.add( 79 io.TextIOWrapper(io.BytesIO(_bytes_sample_message)))) 80 self.assertEqual(len(self._box), 7) 81 self.assertEqual(self._box.get_string(keys[0]), self._template % 0) 82 for i in (1, 2, 3, 4, 5, 6): 83 self._check_sample(self._box[keys[i]]) 84 85 _nonascii_msg = textwrap.dedent("""\ 86 From: foo 87 Subject: Falinaptr hzhozszlltssal. Mr rendeltl? 88 89 0 90 """) 91 92 def test_add_invalid_8bit_bytes_header(self): 93 key = self._box.add(self._nonascii_msg.encode('latin-1')) 94 self.assertEqual(len(self._box), 1) 95 self.assertEqual(self._box.get_bytes(key), 96 self._nonascii_msg.encode('latin-1')) 97 98 def test_invalid_nonascii_header_as_string(self): 99 subj = self._nonascii_msg.splitlines()[1] 100 key = self._box.add(subj.encode('latin-1')) 101 self.assertEqual(self._box.get_string(key), 102 'Subject: =?unknown-8bit?b?RmFsaW5hcHThciBo4Xpob3pzeuFsbO104XNz' 103 'YWwuIE3hciByZW5kZWx06Ww/?=\n\n') 104 105 def test_add_nonascii_string_header_raises(self): 106 with self.assertRaisesRegex(ValueError, "ASCII-only"): 107 self._box.add(self._nonascii_msg) 108 self._box.flush() 109 self.assertEqual(len(self._box), 0) 110 self.assertMailboxEmpty() 111 112 def test_add_that_raises_leaves_mailbox_empty(self): 113 def raiser(*args, **kw): 114 raise Exception("a fake error") 115 support.patch(self, email.generator.BytesGenerator, 'flatten', raiser) 116 with self.assertRaises(Exception): 117 self._box.add(email.message_from_string("From: Alphso")) 118 self.assertEqual(len(self._box), 0) 119 self._box.close() 120 self.assertMailboxEmpty() 121 122 _non_latin_bin_msg = textwrap.dedent("""\ 123 From: foo (at] bar.com 124 To: bz 125 Subject: Maintenant je vous prsente mon collgue, le pouf clbre 126 \tJean de Baddie 127 Mime-Version: 1.0 128 Content-Type: text/plain; charset="utf-8" 129 Content-Transfer-Encoding: 8bit 130 131 , . 132 """).encode('utf-8') 133 134 def test_add_8bit_body(self): 135 key = self._box.add(self._non_latin_bin_msg) 136 self.assertEqual(self._box.get_bytes(key), 137 self._non_latin_bin_msg) 138 with self._box.get_file(key) as f: 139 self.assertEqual(f.read(), 140 self._non_latin_bin_msg.replace(b'\n', 141 os.linesep.encode())) 142 self.assertEqual(self._box[key].get_payload(), 143 ", .\n") 144 145 def test_add_binary_file(self): 146 with tempfile.TemporaryFile('wb+') as f: 147 f.write(_bytes_sample_message) 148 f.seek(0) 149 key = self._box.add(f) 150 self.assertEqual(self._box.get_bytes(key).split(b'\n'), 151 _bytes_sample_message.split(b'\n')) 152 153 def test_add_binary_nonascii_file(self): 154 with tempfile.TemporaryFile('wb+') as f: 155 f.write(self._non_latin_bin_msg) 156 f.seek(0) 157 key = self._box.add(f) 158 self.assertEqual(self._box.get_bytes(key).split(b'\n'), 159 self._non_latin_bin_msg.split(b'\n')) 160 161 def test_add_text_file_warns(self): 162 with tempfile.TemporaryFile('w+') as f: 163 f.write(_sample_message) 164 f.seek(0) 165 with self.assertWarns(DeprecationWarning): 166 key = self._box.add(f) 167 self.assertEqual(self._box.get_bytes(key).split(b'\n'), 168 _bytes_sample_message.split(b'\n')) 169 170 def test_add_StringIO_warns(self): 171 with self.assertWarns(DeprecationWarning): 172 key = self._box.add(io.StringIO(self._template % "0")) 173 self.assertEqual(self._box.get_string(key), self._template % "0") 174 175 def test_add_nonascii_StringIO_raises(self): 176 with self.assertWarns(DeprecationWarning): 177 with self.assertRaisesRegex(ValueError, "ASCII-only"): 178 self._box.add(io.StringIO(self._nonascii_msg)) 179 self.assertEqual(len(self._box), 0) 180 self._box.close() 181 self.assertMailboxEmpty() 182 183 def test_remove(self): 184 # Remove messages using remove() 185 self._test_remove_or_delitem(self._box.remove) 186 187 def test_delitem(self): 188 # Remove messages using __delitem__() 189 self._test_remove_or_delitem(self._box.__delitem__) 190 191 def _test_remove_or_delitem(self, method): 192 # (Used by test_remove() and test_delitem().) 193 key0 = self._box.add(self._template % 0) 194 key1 = self._box.add(self._template % 1) 195 self.assertEqual(len(self._box), 2) 196 method(key0) 197 self.assertEqual(len(self._box), 1) 198 self.assertRaises(KeyError, lambda: self._box[key0]) 199 self.assertRaises(KeyError, lambda: method(key0)) 200 self.assertEqual(self._box.get_string(key1), self._template % 1) 201 key2 = self._box.add(self._template % 2) 202 self.assertEqual(len(self._box), 2) 203 method(key2) 204 self.assertEqual(len(self._box), 1) 205 self.assertRaises(KeyError, lambda: self._box[key2]) 206 self.assertRaises(KeyError, lambda: method(key2)) 207 self.assertEqual(self._box.get_string(key1), self._template % 1) 208 method(key1) 209 self.assertEqual(len(self._box), 0) 210 self.assertRaises(KeyError, lambda: self._box[key1]) 211 self.assertRaises(KeyError, lambda: method(key1)) 212 213 def test_discard(self, repetitions=10): 214 # Discard messages 215 key0 = self._box.add(self._template % 0) 216 key1 = self._box.add(self._template % 1) 217 self.assertEqual(len(self._box), 2) 218 self._box.discard(key0) 219 self.assertEqual(len(self._box), 1) 220 self.assertRaises(KeyError, lambda: self._box[key0]) 221 self._box.discard(key0) 222 self.assertEqual(len(self._box), 1) 223 self.assertRaises(KeyError, lambda: self._box[key0]) 224 225 def test_get(self): 226 # Retrieve messages using get() 227 key0 = self._box.add(self._template % 0) 228 msg = self._box.get(key0) 229 self.assertEqual(msg['from'], 'foo') 230 self.assertEqual(msg.get_payload(), '0\n') 231 self.assertIsNone(self._box.get('foo')) 232 self.assertIs(self._box.get('foo', False), False) 233 self._box.close() 234 self._box = self._factory(self._path) 235 key1 = self._box.add(self._template % 1) 236 msg = self._box.get(key1) 237 self.assertEqual(msg['from'], 'foo') 238 self.assertEqual(msg.get_payload(), '1\n') 239 240 def test_getitem(self): 241 # Retrieve message using __getitem__() 242 key0 = self._box.add(self._template % 0) 243 msg = self._box[key0] 244 self.assertEqual(msg['from'], 'foo') 245 self.assertEqual(msg.get_payload(), '0\n') 246 self.assertRaises(KeyError, lambda: self._box['foo']) 247 self._box.discard(key0) 248 self.assertRaises(KeyError, lambda: self._box[key0]) 249 250 def test_get_message(self): 251 # Get Message representations of messages 252 key0 = self._box.add(self._template % 0) 253 key1 = self._box.add(_sample_message) 254 msg0 = self._box.get_message(key0) 255 self.assertIsInstance(msg0, mailbox.Message) 256 self.assertEqual(msg0['from'], 'foo') 257 self.assertEqual(msg0.get_payload(), '0\n') 258 self._check_sample(self._box.get_message(key1)) 259 260 def test_get_bytes(self): 261 # Get bytes representations of messages 262 key0 = self._box.add(self._template % 0) 263 key1 = self._box.add(_sample_message) 264 self.assertEqual(self._box.get_bytes(key0), 265 (self._template % 0).encode('ascii')) 266 self.assertEqual(self._box.get_bytes(key1), _bytes_sample_message) 267 268 def test_get_string(self): 269 # Get string representations of messages 270 key0 = self._box.add(self._template % 0) 271 key1 = self._box.add(_sample_message) 272 self.assertEqual(self._box.get_string(key0), self._template % 0) 273 self.assertEqual(self._box.get_string(key1).split('\n'), 274 _sample_message.split('\n')) 275 276 def test_get_file(self): 277 # Get file representations of messages 278 key0 = self._box.add(self._template % 0) 279 key1 = self._box.add(_sample_message) 280 with self._box.get_file(key0) as file: 281 data0 = file.read() 282 with self._box.get_file(key1) as file: 283 data1 = file.read() 284 self.assertEqual(data0.decode('ascii').replace(os.linesep, '\n'), 285 self._template % 0) 286 self.assertEqual(data1.decode('ascii').replace(os.linesep, '\n'), 287 _sample_message) 288 289 def test_get_file_can_be_closed_twice(self): 290 # Issue 11700 291 key = self._box.add(_sample_message) 292 f = self._box.get_file(key) 293 f.close() 294 f.close() 295 296 def test_iterkeys(self): 297 # Get keys using iterkeys() 298 self._check_iteration(self._box.iterkeys, do_keys=True, do_values=False) 299 300 def test_keys(self): 301 # Get keys using keys() 302 self._check_iteration(self._box.keys, do_keys=True, do_values=False) 303 304 def test_itervalues(self): 305 # Get values using itervalues() 306 self._check_iteration(self._box.itervalues, do_keys=False, 307 do_values=True) 308 309 def test_iter(self): 310 # Get values using __iter__() 311 self._check_iteration(self._box.__iter__, do_keys=False, 312 do_values=True) 313 314 def test_values(self): 315 # Get values using values() 316 self._check_iteration(self._box.values, do_keys=False, do_values=True) 317 318 def test_iteritems(self): 319 # Get keys and values using iteritems() 320 self._check_iteration(self._box.iteritems, do_keys=True, 321 do_values=True) 322 323 def test_items(self): 324 # Get keys and values using items() 325 self._check_iteration(self._box.items, do_keys=True, do_values=True) 326 327 def _check_iteration(self, method, do_keys, do_values, repetitions=10): 328 for value in method(): 329 self.fail("Not empty") 330 keys, values = [], [] 331 for i in range(repetitions): 332 keys.append(self._box.add(self._template % i)) 333 values.append(self._template % i) 334 if do_keys and not do_values: 335 returned_keys = list(method()) 336 elif do_values and not do_keys: 337 returned_values = list(method()) 338 else: 339 returned_keys, returned_values = [], [] 340 for key, value in method(): 341 returned_keys.append(key) 342 returned_values.append(value) 343 if do_keys: 344 self.assertEqual(len(keys), len(returned_keys)) 345 self.assertEqual(set(keys), set(returned_keys)) 346 if do_values: 347 count = 0 348 for value in returned_values: 349 self.assertEqual(value['from'], 'foo') 350 self.assertLess(int(value.get_payload()), repetitions) 351 count += 1 352 self.assertEqual(len(values), count) 353 354 def test_contains(self): 355 # Check existence of keys using __contains__() 356 self.assertNotIn('foo', self._box) 357 key0 = self._box.add(self._template % 0) 358 self.assertIn(key0, self._box) 359 self.assertNotIn('foo', self._box) 360 key1 = self._box.add(self._template % 1) 361 self.assertIn(key1, self._box) 362 self.assertIn(key0, self._box) 363 self.assertNotIn('foo', self._box) 364 self._box.remove(key0) 365 self.assertNotIn(key0, self._box) 366 self.assertIn(key1, self._box) 367 self.assertNotIn('foo', self._box) 368 self._box.remove(key1) 369 self.assertNotIn(key1, self._box) 370 self.assertNotIn(key0, self._box) 371 self.assertNotIn('foo', self._box) 372 373 def test_len(self, repetitions=10): 374 # Get message count 375 keys = [] 376 for i in range(repetitions): 377 self.assertEqual(len(self._box), i) 378 keys.append(self._box.add(self._template % i)) 379 self.assertEqual(len(self._box), i + 1) 380 for i in range(repetitions): 381 self.assertEqual(len(self._box), repetitions - i) 382 self._box.remove(keys[i]) 383 self.assertEqual(len(self._box), repetitions - i - 1) 384 385 def test_set_item(self): 386 # Modify messages using __setitem__() 387 key0 = self._box.add(self._template % 'original 0') 388 self.assertEqual(self._box.get_string(key0), 389 self._template % 'original 0') 390 key1 = self._box.add(self._template % 'original 1') 391 self.assertEqual(self._box.get_string(key1), 392 self._template % 'original 1') 393 self._box[key0] = self._template % 'changed 0' 394 self.assertEqual(self._box.get_string(key0), 395 self._template % 'changed 0') 396 self._box[key1] = self._template % 'changed 1' 397 self.assertEqual(self._box.get_string(key1), 398 self._template % 'changed 1') 399 self._box[key0] = _sample_message 400 self._check_sample(self._box[key0]) 401 self._box[key1] = self._box[key0] 402 self._check_sample(self._box[key1]) 403 self._box[key0] = self._template % 'original 0' 404 self.assertEqual(self._box.get_string(key0), 405 self._template % 'original 0') 406 self._check_sample(self._box[key1]) 407 self.assertRaises(KeyError, 408 lambda: self._box.__setitem__('foo', 'bar')) 409 self.assertRaises(KeyError, lambda: self._box['foo']) 410 self.assertEqual(len(self._box), 2) 411 412 def test_clear(self, iterations=10): 413 # Remove all messages using clear() 414 keys = [] 415 for i in range(iterations): 416 self._box.add(self._template % i) 417 for i, key in enumerate(keys): 418 self.assertEqual(self._box.get_string(key), self._template % i) 419 self._box.clear() 420 self.assertEqual(len(self._box), 0) 421 for i, key in enumerate(keys): 422 self.assertRaises(KeyError, lambda: self._box.get_string(key)) 423 424 def test_pop(self): 425 # Get and remove a message using pop() 426 key0 = self._box.add(self._template % 0) 427 self.assertIn(key0, self._box) 428 key1 = self._box.add(self._template % 1) 429 self.assertIn(key1, self._box) 430 self.assertEqual(self._box.pop(key0).get_payload(), '0\n') 431 self.assertNotIn(key0, self._box) 432 self.assertIn(key1, self._box) 433 key2 = self._box.add(self._template % 2) 434 self.assertIn(key2, self._box) 435 self.assertEqual(self._box.pop(key2).get_payload(), '2\n') 436 self.assertNotIn(key2, self._box) 437 self.assertIn(key1, self._box) 438 self.assertEqual(self._box.pop(key1).get_payload(), '1\n') 439 self.assertNotIn(key1, self._box) 440 self.assertEqual(len(self._box), 0) 441 442 def test_popitem(self, iterations=10): 443 # Get and remove an arbitrary (key, message) using popitem() 444 keys = [] 445 for i in range(10): 446 keys.append(self._box.add(self._template % i)) 447 seen = [] 448 for i in range(10): 449 key, msg = self._box.popitem() 450 self.assertIn(key, keys) 451 self.assertNotIn(key, seen) 452 seen.append(key) 453 self.assertEqual(int(msg.get_payload()), keys.index(key)) 454 self.assertEqual(len(self._box), 0) 455 for key in keys: 456 self.assertRaises(KeyError, lambda: self._box[key]) 457 458 def test_update(self): 459 # Modify multiple messages using update() 460 key0 = self._box.add(self._template % 'original 0') 461 key1 = self._box.add(self._template % 'original 1') 462 key2 = self._box.add(self._template % 'original 2') 463 self._box.update({key0: self._template % 'changed 0', 464 key2: _sample_message}) 465 self.assertEqual(len(self._box), 3) 466 self.assertEqual(self._box.get_string(key0), 467 self._template % 'changed 0') 468 self.assertEqual(self._box.get_string(key1), 469 self._template % 'original 1') 470 self._check_sample(self._box[key2]) 471 self._box.update([(key2, self._template % 'changed 2'), 472 (key1, self._template % 'changed 1'), 473 (key0, self._template % 'original 0')]) 474 self.assertEqual(len(self._box), 3) 475 self.assertEqual(self._box.get_string(key0), 476 self._template % 'original 0') 477 self.assertEqual(self._box.get_string(key1), 478 self._template % 'changed 1') 479 self.assertEqual(self._box.get_string(key2), 480 self._template % 'changed 2') 481 self.assertRaises(KeyError, 482 lambda: self._box.update({'foo': 'bar', 483 key0: self._template % "changed 0"})) 484 self.assertEqual(len(self._box), 3) 485 self.assertEqual(self._box.get_string(key0), 486 self._template % "changed 0") 487 self.assertEqual(self._box.get_string(key1), 488 self._template % "changed 1") 489 self.assertEqual(self._box.get_string(key2), 490 self._template % "changed 2") 491 492 def test_flush(self): 493 # Write changes to disk 494 self._test_flush_or_close(self._box.flush, True) 495 496 def test_popitem_and_flush_twice(self): 497 # See #15036. 498 self._box.add(self._template % 0) 499 self._box.add(self._template % 1) 500 self._box.flush() 501 502 self._box.popitem() 503 self._box.flush() 504 self._box.popitem() 505 self._box.flush() 506 507 def test_lock_unlock(self): 508 # Lock and unlock the mailbox 509 self.assertFalse(os.path.exists(self._get_lock_path())) 510 self._box.lock() 511 self.assertTrue(os.path.exists(self._get_lock_path())) 512 self._box.unlock() 513 self.assertFalse(os.path.exists(self._get_lock_path())) 514 515 def test_close(self): 516 # Close mailbox and flush changes to disk 517 self._test_flush_or_close(self._box.close, False) 518 519 def _test_flush_or_close(self, method, should_call_close): 520 contents = [self._template % i for i in range(3)] 521 self._box.add(contents[0]) 522 self._box.add(contents[1]) 523 self._box.add(contents[2]) 524 oldbox = self._box 525 method() 526 if should_call_close: 527 self._box.close() 528 self._box = self._factory(self._path) 529 keys = self._box.keys() 530 self.assertEqual(len(keys), 3) 531 for key in keys: 532 self.assertIn(self._box.get_string(key), contents) 533 oldbox.close() 534 535 def test_dump_message(self): 536 # Write message representations to disk 537 for input in (email.message_from_string(_sample_message), 538 _sample_message, io.BytesIO(_bytes_sample_message)): 539 output = io.BytesIO() 540 self._box._dump_message(input, output) 541 self.assertEqual(output.getvalue(), 542 _bytes_sample_message.replace(b'\n', os.linesep.encode())) 543 output = io.BytesIO() 544 self.assertRaises(TypeError, 545 lambda: self._box._dump_message(None, output)) 546 547 def _get_lock_path(self): 548 # Return the path of the dot lock file. May be overridden. 549 return self._path + '.lock' 550 551 552 class TestMailboxSuperclass(TestBase, unittest.TestCase): 553 554 def test_notimplemented(self): 555 # Test that all Mailbox methods raise NotImplementedException. 556 box = mailbox.Mailbox('path') 557 self.assertRaises(NotImplementedError, lambda: box.add('')) 558 self.assertRaises(NotImplementedError, lambda: box.remove('')) 559 self.assertRaises(NotImplementedError, lambda: box.__delitem__('')) 560 self.assertRaises(NotImplementedError, lambda: box.discard('')) 561 self.assertRaises(NotImplementedError, lambda: box.__setitem__('', '')) 562 self.assertRaises(NotImplementedError, lambda: box.iterkeys()) 563 self.assertRaises(NotImplementedError, lambda: box.keys()) 564 self.assertRaises(NotImplementedError, lambda: box.itervalues().__next__()) 565 self.assertRaises(NotImplementedError, lambda: box.__iter__().__next__()) 566 self.assertRaises(NotImplementedError, lambda: box.values()) 567 self.assertRaises(NotImplementedError, lambda: box.iteritems().__next__()) 568 self.assertRaises(NotImplementedError, lambda: box.items()) 569 self.assertRaises(NotImplementedError, lambda: box.get('')) 570 self.assertRaises(NotImplementedError, lambda: box.__getitem__('')) 571 self.assertRaises(NotImplementedError, lambda: box.get_message('')) 572 self.assertRaises(NotImplementedError, lambda: box.get_string('')) 573 self.assertRaises(NotImplementedError, lambda: box.get_bytes('')) 574 self.assertRaises(NotImplementedError, lambda: box.get_file('')) 575 self.assertRaises(NotImplementedError, lambda: '' in box) 576 self.assertRaises(NotImplementedError, lambda: box.__contains__('')) 577 self.assertRaises(NotImplementedError, lambda: box.__len__()) 578 self.assertRaises(NotImplementedError, lambda: box.clear()) 579 self.assertRaises(NotImplementedError, lambda: box.pop('')) 580 self.assertRaises(NotImplementedError, lambda: box.popitem()) 581 self.assertRaises(NotImplementedError, lambda: box.update((('', ''),))) 582 self.assertRaises(NotImplementedError, lambda: box.flush()) 583 self.assertRaises(NotImplementedError, lambda: box.lock()) 584 self.assertRaises(NotImplementedError, lambda: box.unlock()) 585 self.assertRaises(NotImplementedError, lambda: box.close()) 586 587 588 class TestMaildir(TestMailbox, unittest.TestCase): 589 590 _factory = lambda self, path, factory=None: mailbox.Maildir(path, factory) 591 592 def setUp(self): 593 TestMailbox.setUp(self) 594 if (os.name == 'nt') or (sys.platform == 'cygwin'): 595 self._box.colon = '!' 596 597 def assertMailboxEmpty(self): 598 self.assertEqual(os.listdir(os.path.join(self._path, 'tmp')), []) 599 600 def test_add_MM(self): 601 # Add a MaildirMessage instance 602 msg = mailbox.MaildirMessage(self._template % 0) 603 msg.set_subdir('cur') 604 msg.set_info('foo') 605 key = self._box.add(msg) 606 self.assertTrue(os.path.exists(os.path.join(self._path, 'cur', '%s%sfoo' % 607 (key, self._box.colon)))) 608 609 def test_get_MM(self): 610 # Get a MaildirMessage instance 611 msg = mailbox.MaildirMessage(self._template % 0) 612 msg.set_subdir('cur') 613 msg.set_flags('RF') 614 key = self._box.add(msg) 615 msg_returned = self._box.get_message(key) 616 self.assertIsInstance(msg_returned, mailbox.MaildirMessage) 617 self.assertEqual(msg_returned.get_subdir(), 'cur') 618 self.assertEqual(msg_returned.get_flags(), 'FR') 619 620 def test_set_MM(self): 621 # Set with a MaildirMessage instance 622 msg0 = mailbox.MaildirMessage(self._template % 0) 623 msg0.set_flags('TP') 624 key = self._box.add(msg0) 625 msg_returned = self._box.get_message(key) 626 self.assertEqual(msg_returned.get_subdir(), 'new') 627 self.assertEqual(msg_returned.get_flags(), 'PT') 628 msg1 = mailbox.MaildirMessage(self._template % 1) 629 self._box[key] = msg1 630 msg_returned = self._box.get_message(key) 631 self.assertEqual(msg_returned.get_subdir(), 'new') 632 self.assertEqual(msg_returned.get_flags(), '') 633 self.assertEqual(msg_returned.get_payload(), '1\n') 634 msg2 = mailbox.MaildirMessage(self._template % 2) 635 msg2.set_info('2,S') 636 self._box[key] = msg2 637 self._box[key] = self._template % 3 638 msg_returned = self._box.get_message(key) 639 self.assertEqual(msg_returned.get_subdir(), 'new') 640 self.assertEqual(msg_returned.get_flags(), 'S') 641 self.assertEqual(msg_returned.get_payload(), '3\n') 642 643 def test_consistent_factory(self): 644 # Add a message. 645 msg = mailbox.MaildirMessage(self._template % 0) 646 msg.set_subdir('cur') 647 msg.set_flags('RF') 648 key = self._box.add(msg) 649 650 # Create new mailbox with 651 class FakeMessage(mailbox.MaildirMessage): 652 pass 653 box = mailbox.Maildir(self._path, factory=FakeMessage) 654 box.colon = self._box.colon 655 msg2 = box.get_message(key) 656 self.assertIsInstance(msg2, FakeMessage) 657 658 def test_initialize_new(self): 659 # Initialize a non-existent mailbox 660 self.tearDown() 661 self._box = mailbox.Maildir(self._path) 662 self._check_basics() 663 self._delete_recursively(self._path) 664 self._box = self._factory(self._path, factory=None) 665 self._check_basics() 666 667 def test_initialize_existing(self): 668 # Initialize an existing mailbox 669 self.tearDown() 670 for subdir in '', 'tmp', 'new', 'cur': 671 os.mkdir(os.path.normpath(os.path.join(self._path, subdir))) 672 self._box = mailbox.Maildir(self._path) 673 self._check_basics() 674 675 def _check_basics(self, factory=None): 676 # (Used by test_open_new() and test_open_existing().) 677 self.assertEqual(self._box._path, os.path.abspath(self._path)) 678 self.assertEqual(self._box._factory, factory) 679 for subdir in '', 'tmp', 'new', 'cur': 680 path = os.path.join(self._path, subdir) 681 mode = os.stat(path)[stat.ST_MODE] 682 self.assertTrue(stat.S_ISDIR(mode), "Not a directory: '%s'" % path) 683 684 def test_list_folders(self): 685 # List folders 686 self._box.add_folder('one') 687 self._box.add_folder('two') 688 self._box.add_folder('three') 689 self.assertEqual(len(self._box.list_folders()), 3) 690 self.assertEqual(set(self._box.list_folders()), 691 set(('one', 'two', 'three'))) 692 693 def test_get_folder(self): 694 # Open folders 695 self._box.add_folder('foo.bar') 696 folder0 = self._box.get_folder('foo.bar') 697 folder0.add(self._template % 'bar') 698 self.assertTrue(os.path.isdir(os.path.join(self._path, '.foo.bar'))) 699 folder1 = self._box.get_folder('foo.bar') 700 self.assertEqual(folder1.get_string(folder1.keys()[0]), 701 self._template % 'bar') 702 703 def test_add_and_remove_folders(self): 704 # Delete folders 705 self._box.add_folder('one') 706 self._box.add_folder('two') 707 self.assertEqual(len(self._box.list_folders()), 2) 708 self.assertEqual(set(self._box.list_folders()), set(('one', 'two'))) 709 self._box.remove_folder('one') 710 self.assertEqual(len(self._box.list_folders()), 1) 711 self.assertEqual(set(self._box.list_folders()), set(('two',))) 712 self._box.add_folder('three') 713 self.assertEqual(len(self._box.list_folders()), 2) 714 self.assertEqual(set(self._box.list_folders()), set(('two', 'three'))) 715 self._box.remove_folder('three') 716 self.assertEqual(len(self._box.list_folders()), 1) 717 self.assertEqual(set(self._box.list_folders()), set(('two',))) 718 self._box.remove_folder('two') 719 self.assertEqual(len(self._box.list_folders()), 0) 720 self.assertEqual(self._box.list_folders(), []) 721 722 def test_clean(self): 723 # Remove old files from 'tmp' 724 foo_path = os.path.join(self._path, 'tmp', 'foo') 725 bar_path = os.path.join(self._path, 'tmp', 'bar') 726 with open(foo_path, 'w') as f: 727 f.write("@") 728 with open(bar_path, 'w') as f: 729 f.write("@") 730 self._box.clean() 731 self.assertTrue(os.path.exists(foo_path)) 732 self.assertTrue(os.path.exists(bar_path)) 733 foo_stat = os.stat(foo_path) 734 os.utime(foo_path, (time.time() - 129600 - 2, 735 foo_stat.st_mtime)) 736 self._box.clean() 737 self.assertFalse(os.path.exists(foo_path)) 738 self.assertTrue(os.path.exists(bar_path)) 739 740 def test_create_tmp(self, repetitions=10): 741 # Create files in tmp directory 742 hostname = socket.gethostname() 743 if '/' in hostname: 744 hostname = hostname.replace('/', r'\057') 745 if ':' in hostname: 746 hostname = hostname.replace(':', r'\072') 747 pid = os.getpid() 748 pattern = re.compile(r"(?P<time>\d+)\.M(?P<M>\d{1,6})P(?P<P>\d+)" 749 r"Q(?P<Q>\d+)\.(?P<host>[^:/]+)") 750 previous_groups = None 751 for x in range(repetitions): 752 tmp_file = self._box._create_tmp() 753 head, tail = os.path.split(tmp_file.name) 754 self.assertEqual(head, os.path.abspath(os.path.join(self._path, 755 "tmp")), 756 "File in wrong location: '%s'" % head) 757 match = pattern.match(tail) 758 self.assertIsNotNone(match, "Invalid file name: '%s'" % tail) 759 groups = match.groups() 760 if previous_groups is not None: 761 self.assertGreaterEqual(int(groups[0]), int(previous_groups[0]), 762 "Non-monotonic seconds: '%s' before '%s'" % 763 (previous_groups[0], groups[0])) 764 if int(groups[0]) == int(previous_groups[0]): 765 self.assertGreaterEqual(int(groups[1]), int(previous_groups[1]), 766 "Non-monotonic milliseconds: '%s' before '%s'" % 767 (previous_groups[1], groups[1])) 768 self.assertEqual(int(groups[2]), pid, 769 "Process ID mismatch: '%s' should be '%s'" % 770 (groups[2], pid)) 771 self.assertEqual(int(groups[3]), int(previous_groups[3]) + 1, 772 "Non-sequential counter: '%s' before '%s'" % 773 (previous_groups[3], groups[3])) 774 self.assertEqual(groups[4], hostname, 775 "Host name mismatch: '%s' should be '%s'" % 776 (groups[4], hostname)) 777 previous_groups = groups 778 tmp_file.write(_bytes_sample_message) 779 tmp_file.seek(0) 780 self.assertEqual(tmp_file.read(), _bytes_sample_message) 781 tmp_file.close() 782 file_count = len(os.listdir(os.path.join(self._path, "tmp"))) 783 self.assertEqual(file_count, repetitions, 784 "Wrong file count: '%s' should be '%s'" % 785 (file_count, repetitions)) 786 787 def test_refresh(self): 788 # Update the table of contents 789 self.assertEqual(self._box._toc, {}) 790 key0 = self._box.add(self._template % 0) 791 key1 = self._box.add(self._template % 1) 792 self.assertEqual(self._box._toc, {}) 793 self._box._refresh() 794 self.assertEqual(self._box._toc, {key0: os.path.join('new', key0), 795 key1: os.path.join('new', key1)}) 796 key2 = self._box.add(self._template % 2) 797 self.assertEqual(self._box._toc, {key0: os.path.join('new', key0), 798 key1: os.path.join('new', key1)}) 799 self._box._refresh() 800 self.assertEqual(self._box._toc, {key0: os.path.join('new', key0), 801 key1: os.path.join('new', key1), 802 key2: os.path.join('new', key2)}) 803 804 def test_refresh_after_safety_period(self): 805 # Issue #13254: Call _refresh after the "file system safety 806 # period" of 2 seconds has passed; _toc should still be 807 # updated because this is the first call to _refresh. 808 key0 = self._box.add(self._template % 0) 809 key1 = self._box.add(self._template % 1) 810 811 self._box = self._factory(self._path) 812 self.assertEqual(self._box._toc, {}) 813 814 # Emulate sleeping. Instead of sleeping for 2 seconds, use the 815 # skew factor to make _refresh think that the filesystem 816 # safety period has passed and re-reading the _toc is only 817 # required if mtimes differ. 818 self._box._skewfactor = -3 819 820 self._box._refresh() 821 self.assertEqual(sorted(self._box._toc.keys()), sorted([key0, key1])) 822 823 def test_lookup(self): 824 # Look up message subpaths in the TOC 825 self.assertRaises(KeyError, lambda: self._box._lookup('foo')) 826 key0 = self._box.add(self._template % 0) 827 self.assertEqual(self._box._lookup(key0), os.path.join('new', key0)) 828 os.remove(os.path.join(self._path, 'new', key0)) 829 self.assertEqual(self._box._toc, {key0: os.path.join('new', key0)}) 830 # Be sure that the TOC is read back from disk (see issue #6896 831 # about bad mtime behaviour on some systems). 832 self._box.flush() 833 self.assertRaises(KeyError, lambda: self._box._lookup(key0)) 834 self.assertEqual(self._box._toc, {}) 835 836 def test_lock_unlock(self): 837 # Lock and unlock the mailbox. For Maildir, this does nothing. 838 self._box.lock() 839 self._box.unlock() 840 841 def test_folder (self): 842 # Test for bug #1569790: verify that folders returned by .get_folder() 843 # use the same factory function. 844 def dummy_factory (s): 845 return None 846 box = self._factory(self._path, factory=dummy_factory) 847 folder = box.add_folder('folder1') 848 self.assertIs(folder._factory, dummy_factory) 849 850 folder1_alias = box.get_folder('folder1') 851 self.assertIs(folder1_alias._factory, dummy_factory) 852 853 def test_directory_in_folder (self): 854 # Test that mailboxes still work if there's a stray extra directory 855 # in a folder. 856 for i in range(10): 857 self._box.add(mailbox.Message(_sample_message)) 858 859 # Create a stray directory 860 os.mkdir(os.path.join(self._path, 'cur', 'stray-dir')) 861 862 # Check that looping still works with the directory present. 863 for msg in self._box: 864 pass 865 866 @unittest.skipUnless(hasattr(os, 'umask'), 'test needs os.umask()') 867 @unittest.skipUnless(hasattr(os, 'stat'), 'test needs os.stat()') 868 def test_file_permissions(self): 869 # Verify that message files are created without execute permissions 870 msg = mailbox.MaildirMessage(self._template % 0) 871 orig_umask = os.umask(0) 872 try: 873 key = self._box.add(msg) 874 finally: 875 os.umask(orig_umask) 876 path = os.path.join(self._path, self._box._lookup(key)) 877 mode = os.stat(path).st_mode 878 self.assertFalse(mode & 0o111) 879 880 @unittest.skipUnless(hasattr(os, 'umask'), 'test needs os.umask()') 881 @unittest.skipUnless(hasattr(os, 'stat'), 'test needs os.stat()') 882 def test_folder_file_perms(self): 883 # From bug #3228, we want to verify that the file created inside a Maildir 884 # subfolder isn't marked as executable. 885 orig_umask = os.umask(0) 886 try: 887 subfolder = self._box.add_folder('subfolder') 888 finally: 889 os.umask(orig_umask) 890 891 path = os.path.join(subfolder._path, 'maildirfolder') 892 st = os.stat(path) 893 perms = st.st_mode 894 self.assertFalse((perms & 0o111)) # Execute bits should all be off. 895 896 def test_reread(self): 897 # Do an initial unconditional refresh 898 self._box._refresh() 899 900 # Put the last modified times more than two seconds into the past 901 # (because mtime may have a two second granularity) 902 for subdir in ('cur', 'new'): 903 os.utime(os.path.join(self._box._path, subdir), 904 (time.time()-5,)*2) 905 906 # Because mtime has a two second granularity in worst case (FAT), a 907 # refresh is done unconditionally if called for within 908 # two-second-plus-a-bit of the last one, just in case the mbox has 909 # changed; so now we have to wait for that interval to expire. 910 # 911 # Because this is a test, emulate sleeping. Instead of 912 # sleeping for 2 seconds, use the skew factor to make _refresh 913 # think that 2 seconds have passed and re-reading the _toc is 914 # only required if mtimes differ. 915 self._box._skewfactor = -3 916 917 # Re-reading causes the ._toc attribute to be assigned a new dictionary 918 # object, so we'll check that the ._toc attribute isn't a different 919 # object. 920 orig_toc = self._box._toc 921 def refreshed(): 922 return self._box._toc is not orig_toc 923 924 self._box._refresh() 925 self.assertFalse(refreshed()) 926 927 # Now, write something into cur and remove it. This changes 928 # the mtime and should cause a re-read. Note that "sleep 929 # emulation" is still in effect, as skewfactor is -3. 930 filename = os.path.join(self._path, 'cur', 'stray-file') 931 support.create_empty_file(filename) 932 os.unlink(filename) 933 self._box._refresh() 934 self.assertTrue(refreshed()) 935 936 937 class _TestSingleFile(TestMailbox): 938 '''Common tests for single-file mailboxes''' 939 940 def test_add_doesnt_rewrite(self): 941 # When only adding messages, flush() should not rewrite the 942 # mailbox file. See issue #9559. 943 944 # Inode number changes if the contents are written to another 945 # file which is then renamed over the original file. So we 946 # must check that the inode number doesn't change. 947 inode_before = os.stat(self._path).st_ino 948 949 self._box.add(self._template % 0) 950 self._box.flush() 951 952 inode_after = os.stat(self._path).st_ino 953 self.assertEqual(inode_before, inode_after) 954 955 # Make sure the message was really added 956 self._box.close() 957 self._box = self._factory(self._path) 958 self.assertEqual(len(self._box), 1) 959 960 def test_permissions_after_flush(self): 961 # See issue #5346 962 963 # Make the mailbox world writable. It's unlikely that the new 964 # mailbox file would have these permissions after flush(), 965 # because umask usually prevents it. 966 mode = os.stat(self._path).st_mode | 0o666 967 os.chmod(self._path, mode) 968 969 self._box.add(self._template % 0) 970 i = self._box.add(self._template % 1) 971 # Need to remove one message to make flush() create a new file 972 self._box.remove(i) 973 self._box.flush() 974 975 self.assertEqual(os.stat(self._path).st_mode, mode) 976 977 978 class _TestMboxMMDF(_TestSingleFile): 979 980 def tearDown(self): 981 super().tearDown() 982 self._box.close() 983 self._delete_recursively(self._path) 984 for lock_remnant in glob.glob(self._path + '.*'): 985 support.unlink(lock_remnant) 986 987 def assertMailboxEmpty(self): 988 with open(self._path) as f: 989 self.assertEqual(f.readlines(), []) 990 991 def test_add_from_string(self): 992 # Add a string starting with 'From ' to the mailbox 993 key = self._box.add('From foo@bar blah\nFrom: foo\n\n0\n') 994 self.assertEqual(self._box[key].get_from(), 'foo@bar blah') 995 self.assertEqual(self._box[key].get_payload(), '0\n') 996 997 def test_add_from_bytes(self): 998 # Add a byte string starting with 'From ' to the mailbox 999 key = self._box.add(b'From foo@bar blah\nFrom: foo\n\n0\n') 1000 self.assertEqual(self._box[key].get_from(), 'foo@bar blah') 1001 self.assertEqual(self._box[key].get_payload(), '0\n') 1002 1003 def test_add_mbox_or_mmdf_message(self): 1004 # Add an mboxMessage or MMDFMessage 1005 for class_ in (mailbox.mboxMessage, mailbox.MMDFMessage): 1006 msg = class_('From foo@bar blah\nFrom: foo\n\n0\n') 1007 key = self._box.add(msg) 1008 1009 def test_open_close_open(self): 1010 # Open and inspect previously-created mailbox 1011 values = [self._template % i for i in range(3)] 1012 for value in values: 1013 self._box.add(value) 1014 self._box.close() 1015 mtime = os.path.getmtime(self._path) 1016 self._box = self._factory(self._path) 1017 self.assertEqual(len(self._box), 3) 1018 for key in self._box.iterkeys(): 1019 self.assertIn(self._box.get_string(key), values) 1020 self._box.close() 1021 self.assertEqual(mtime, os.path.getmtime(self._path)) 1022 1023 def test_add_and_close(self): 1024 # Verifying that closing a mailbox doesn't change added items 1025 self._box.add(_sample_message) 1026 for i in range(3): 1027 self._box.add(self._template % i) 1028 self._box.add(_sample_message) 1029 self._box._file.flush() 1030 self._box._file.seek(0) 1031 contents = self._box._file.read() 1032 self._box.close() 1033 with open(self._path, 'rb') as f: 1034 self.assertEqual(contents, f.read()) 1035 self._box = self._factory(self._path) 1036 1037 @unittest.skipUnless(hasattr(os, 'fork'), "Test needs fork().") 1038 @unittest.skipUnless(hasattr(socket, 'socketpair'), "Test needs socketpair().") 1039 def test_lock_conflict(self): 1040 # Fork off a child process that will lock the mailbox temporarily, 1041 # unlock it and exit. 1042 c, p = socket.socketpair() 1043 self.addCleanup(c.close) 1044 self.addCleanup(p.close) 1045 1046 pid = os.fork() 1047 if pid == 0: 1048 # child 1049 try: 1050 # lock the mailbox, and signal the parent it can proceed 1051 self._box.lock() 1052 c.send(b'c') 1053 1054 # wait until the parent is done, and unlock the mailbox 1055 c.recv(1) 1056 self._box.unlock() 1057 finally: 1058 os._exit(0) 1059 1060 # In the parent, wait until the child signals it locked the mailbox. 1061 p.recv(1) 1062 try: 1063 self.assertRaises(mailbox.ExternalClashError, 1064 self._box.lock) 1065 finally: 1066 # Signal the child it can now release the lock and exit. 1067 p.send(b'p') 1068 # Wait for child to exit. Locking should now succeed. 1069 exited_pid, status = os.waitpid(pid, 0) 1070 1071 self._box.lock() 1072 self._box.unlock() 1073 1074 def test_relock(self): 1075 # Test case for bug #1575506: the mailbox class was locking the 1076 # wrong file object in its flush() method. 1077 msg = "Subject: sub\n\nbody\n" 1078 key1 = self._box.add(msg) 1079 self._box.flush() 1080 self._box.close() 1081 1082 self._box = self._factory(self._path) 1083 self._box.lock() 1084 key2 = self._box.add(msg) 1085 self._box.flush() 1086 self.assertTrue(self._box._locked) 1087 self._box.close() 1088 1089 1090 class TestMbox(_TestMboxMMDF, unittest.TestCase): 1091 1092 _factory = lambda self, path, factory=None: mailbox.mbox(path, factory) 1093 1094 @unittest.skipUnless(hasattr(os, 'umask'), 'test needs os.umask()') 1095 @unittest.skipUnless(hasattr(os, 'stat'), 'test needs os.stat()') 1096 def test_file_perms(self): 1097 # From bug #3228, we want to verify that the mailbox file isn't executable, 1098 # even if the umask is set to something that would leave executable bits set. 1099 # We only run this test on platforms that support umask. 1100 try: 1101 old_umask = os.umask(0o077) 1102 self._box.close() 1103 os.unlink(self._path) 1104 self._box = mailbox.mbox(self._path, create=True) 1105 self._box.add('') 1106 self._box.close() 1107 finally: 1108 os.umask(old_umask) 1109 1110 st = os.stat(self._path) 1111 perms = st.st_mode 1112 self.assertFalse((perms & 0o111)) # Execute bits should all be off. 1113 1114 def test_terminating_newline(self): 1115 message = email.message.Message() 1116 message['From'] = 'john (at] example.com' 1117 message.set_payload('No newline at the end') 1118 i = self._box.add(message) 1119 1120 # A newline should have been appended to the payload 1121 message = self._box.get(i) 1122 self.assertEqual(message.get_payload(), 'No newline at the end\n') 1123 1124 def test_message_separator(self): 1125 # Check there's always a single blank line after each message 1126 self._box.add('From: foo\n\n0') # No newline at the end 1127 with open(self._path) as f: 1128 data = f.read() 1129 self.assertEqual(data[-3:], '0\n\n') 1130 1131 self._box.add('From: foo\n\n0\n') # Newline at the end 1132 with open(self._path) as f: 1133 data = f.read() 1134 self.assertEqual(data[-3:], '0\n\n') 1135 1136 1137 class TestMMDF(_TestMboxMMDF, unittest.TestCase): 1138 1139 _factory = lambda self, path, factory=None: mailbox.MMDF(path, factory) 1140 1141 1142 class TestMH(TestMailbox, unittest.TestCase): 1143 1144 _factory = lambda self, path, factory=None: mailbox.MH(path, factory) 1145 1146 def assertMailboxEmpty(self): 1147 self.assertEqual(os.listdir(self._path), ['.mh_sequences']) 1148 1149 def test_list_folders(self): 1150 # List folders 1151 self._box.add_folder('one') 1152 self._box.add_folder('two') 1153 self._box.add_folder('three') 1154 self.assertEqual(len(self._box.list_folders()), 3) 1155 self.assertEqual(set(self._box.list_folders()), 1156 set(('one', 'two', 'three'))) 1157 1158 def test_get_folder(self): 1159 # Open folders 1160 def dummy_factory (s): 1161 return None 1162 self._box = self._factory(self._path, dummy_factory) 1163 1164 new_folder = self._box.add_folder('foo.bar') 1165 folder0 = self._box.get_folder('foo.bar') 1166 folder0.add(self._template % 'bar') 1167 self.assertTrue(os.path.isdir(os.path.join(self._path, 'foo.bar'))) 1168 folder1 = self._box.get_folder('foo.bar') 1169 self.assertEqual(folder1.get_string(folder1.keys()[0]), 1170 self._template % 'bar') 1171 1172 # Test for bug #1569790: verify that folders returned by .get_folder() 1173 # use the same factory function. 1174 self.assertIs(new_folder._factory, self._box._factory) 1175 self.assertIs(folder0._factory, self._box._factory) 1176 1177 def test_add_and_remove_folders(self): 1178 # Delete folders 1179 self._box.add_folder('one') 1180 self._box.add_folder('two') 1181 self.assertEqual(len(self._box.list_folders()), 2) 1182 self.assertEqual(set(self._box.list_folders()), set(('one', 'two'))) 1183 self._box.remove_folder('one') 1184 self.assertEqual(len(self._box.list_folders()), 1) 1185 self.assertEqual(set(self._box.list_folders()), set(('two',))) 1186 self._box.add_folder('three') 1187 self.assertEqual(len(self._box.list_folders()), 2) 1188 self.assertEqual(set(self._box.list_folders()), set(('two', 'three'))) 1189 self._box.remove_folder('three') 1190 self.assertEqual(len(self._box.list_folders()), 1) 1191 self.assertEqual(set(self._box.list_folders()), set(('two',))) 1192 self._box.remove_folder('two') 1193 self.assertEqual(len(self._box.list_folders()), 0) 1194 self.assertEqual(self._box.list_folders(), []) 1195 1196 def test_sequences(self): 1197 # Get and set sequences 1198 self.assertEqual(self._box.get_sequences(), {}) 1199 msg0 = mailbox.MHMessage(self._template % 0) 1200 msg0.add_sequence('foo') 1201 key0 = self._box.add(msg0) 1202 self.assertEqual(self._box.get_sequences(), {'foo':[key0]}) 1203 msg1 = mailbox.MHMessage(self._template % 1) 1204 msg1.set_sequences(['bar', 'replied', 'foo']) 1205 key1 = self._box.add(msg1) 1206 self.assertEqual(self._box.get_sequences(), 1207 {'foo':[key0, key1], 'bar':[key1], 'replied':[key1]}) 1208 msg0.set_sequences(['flagged']) 1209 self._box[key0] = msg0 1210 self.assertEqual(self._box.get_sequences(), 1211 {'foo':[key1], 'bar':[key1], 'replied':[key1], 1212 'flagged':[key0]}) 1213 self._box.remove(key1) 1214 self.assertEqual(self._box.get_sequences(), {'flagged':[key0]}) 1215 1216 def test_issue2625(self): 1217 msg0 = mailbox.MHMessage(self._template % 0) 1218 msg0.add_sequence('foo') 1219 key0 = self._box.add(msg0) 1220 refmsg0 = self._box.get_message(key0) 1221 1222 def test_issue7627(self): 1223 msg0 = mailbox.MHMessage(self._template % 0) 1224 key0 = self._box.add(msg0) 1225 self._box.lock() 1226 self._box.remove(key0) 1227 self._box.unlock() 1228 1229 def test_pack(self): 1230 # Pack the contents of the mailbox 1231 msg0 = mailbox.MHMessage(self._template % 0) 1232 msg1 = mailbox.MHMessage(self._template % 1) 1233 msg2 = mailbox.MHMessage(self._template % 2) 1234 msg3 = mailbox.MHMessage(self._template % 3) 1235 msg0.set_sequences(['foo', 'unseen']) 1236 msg1.set_sequences(['foo']) 1237 msg2.set_sequences(['foo', 'flagged']) 1238 msg3.set_sequences(['foo', 'bar', 'replied']) 1239 key0 = self._box.add(msg0) 1240 key1 = self._box.add(msg1) 1241 key2 = self._box.add(msg2) 1242 key3 = self._box.add(msg3) 1243 self.assertEqual(self._box.get_sequences(), 1244 {'foo':[key0,key1,key2,key3], 'unseen':[key0], 1245 'flagged':[key2], 'bar':[key3], 'replied':[key3]}) 1246 self._box.remove(key2) 1247 self.assertEqual(self._box.get_sequences(), 1248 {'foo':[key0,key1,key3], 'unseen':[key0], 'bar':[key3], 1249 'replied':[key3]}) 1250 self._box.pack() 1251 self.assertEqual(self._box.keys(), [1, 2, 3]) 1252 key0 = key0 1253 key1 = key0 + 1 1254 key2 = key1 + 1 1255 self.assertEqual(self._box.get_sequences(), 1256 {'foo':[1, 2, 3], 'unseen':[1], 'bar':[3], 'replied':[3]}) 1257 1258 # Test case for packing while holding the mailbox locked. 1259 key0 = self._box.add(msg1) 1260 key1 = self._box.add(msg1) 1261 key2 = self._box.add(msg1) 1262 key3 = self._box.add(msg1) 1263 1264 self._box.remove(key0) 1265 self._box.remove(key2) 1266 self._box.lock() 1267 self._box.pack() 1268 self._box.unlock() 1269 self.assertEqual(self._box.get_sequences(), 1270 {'foo':[1, 2, 3, 4, 5], 1271 'unseen':[1], 'bar':[3], 'replied':[3]}) 1272 1273 def _get_lock_path(self): 1274 return os.path.join(self._path, '.mh_sequences.lock') 1275 1276 1277 class TestBabyl(_TestSingleFile, unittest.TestCase): 1278 1279 _factory = lambda self, path, factory=None: mailbox.Babyl(path, factory) 1280 1281 def assertMailboxEmpty(self): 1282 with open(self._path) as f: 1283 self.assertEqual(f.readlines(), []) 1284 1285 def tearDown(self): 1286 super().tearDown() 1287 self._box.close() 1288 self._delete_recursively(self._path) 1289 for lock_remnant in glob.glob(self._path + '.*'): 1290 support.unlink(lock_remnant) 1291 1292 def test_labels(self): 1293 # Get labels from the mailbox 1294 self.assertEqual(self._box.get_labels(), []) 1295 msg0 = mailbox.BabylMessage(self._template % 0) 1296 msg0.add_label('foo') 1297 key0 = self._box.add(msg0) 1298 self.assertEqual(self._box.get_labels(), ['foo']) 1299 msg1 = mailbox.BabylMessage(self._template % 1) 1300 msg1.set_labels(['bar', 'answered', 'foo']) 1301 key1 = self._box.add(msg1) 1302 self.assertEqual(set(self._box.get_labels()), set(['foo', 'bar'])) 1303 msg0.set_labels(['blah', 'filed']) 1304 self._box[key0] = msg0 1305 self.assertEqual(set(self._box.get_labels()), 1306 set(['foo', 'bar', 'blah'])) 1307 self._box.remove(key1) 1308 self.assertEqual(set(self._box.get_labels()), set(['blah'])) 1309 1310 1311 class FakeFileLikeObject: 1312 1313 def __init__(self): 1314 self.closed = False 1315 1316 def close(self): 1317 self.closed = True 1318 1319 1320 class FakeMailBox(mailbox.Mailbox): 1321 1322 def __init__(self): 1323 mailbox.Mailbox.__init__(self, '', lambda file: None) 1324 self.files = [FakeFileLikeObject() for i in range(10)] 1325 1326 def get_file(self, key): 1327 return self.files[key] 1328 1329 1330 class TestFakeMailBox(unittest.TestCase): 1331 1332 def test_closing_fd(self): 1333 box = FakeMailBox() 1334 for i in range(10): 1335 self.assertFalse(box.files[i].closed) 1336 for i in range(10): 1337 box[i] 1338 for i in range(10): 1339 self.assertTrue(box.files[i].closed) 1340 1341 1342 class TestMessage(TestBase, unittest.TestCase): 1343 1344 _factory = mailbox.Message # Overridden by subclasses to reuse tests 1345 1346 def setUp(self): 1347 self._path = support.TESTFN 1348 1349 def tearDown(self): 1350 self._delete_recursively(self._path) 1351 1352 def test_initialize_with_eMM(self): 1353 # Initialize based on email.message.Message instance 1354 eMM = email.message_from_string(_sample_message) 1355 msg = self._factory(eMM) 1356 self._post_initialize_hook(msg) 1357 self._check_sample(msg) 1358 1359 def test_initialize_with_string(self): 1360 # Initialize based on string 1361 msg = self._factory(_sample_message) 1362 self._post_initialize_hook(msg) 1363 self._check_sample(msg) 1364 1365 def test_initialize_with_file(self): 1366 # Initialize based on contents of file 1367 with open(self._path, 'w+') as f: 1368 f.write(_sample_message) 1369 f.seek(0) 1370 msg = self._factory(f) 1371 self._post_initialize_hook(msg) 1372 self._check_sample(msg) 1373 1374 def test_initialize_with_binary_file(self): 1375 # Initialize based on contents of binary file 1376 with open(self._path, 'wb+') as f: 1377 f.write(_bytes_sample_message) 1378 f.seek(0) 1379 msg = self._factory(f) 1380 self._post_initialize_hook(msg) 1381 self._check_sample(msg) 1382 1383 def test_initialize_with_nothing(self): 1384 # Initialize without arguments 1385 msg = self._factory() 1386 self._post_initialize_hook(msg) 1387 self.assertIsInstance(msg, email.message.Message) 1388 self.assertIsInstance(msg, mailbox.Message) 1389 self.assertIsInstance(msg, self._factory) 1390 self.assertEqual(msg.keys(), []) 1391 self.assertFalse(msg.is_multipart()) 1392 self.assertIsNone(msg.get_payload()) 1393 1394 def test_initialize_incorrectly(self): 1395 # Initialize with invalid argument 1396 self.assertRaises(TypeError, lambda: self._factory(object())) 1397 1398 def test_all_eMM_attribues_exist(self): 1399 # Issue 12537 1400 eMM = email.message_from_string(_sample_message) 1401 msg = self._factory(_sample_message) 1402 for attr in eMM.__dict__: 1403 self.assertIn(attr, msg.__dict__, 1404 '{} attribute does not exist'.format(attr)) 1405 1406 def test_become_message(self): 1407 # Take on the state of another message 1408 eMM = email.message_from_string(_sample_message) 1409 msg = self._factory() 1410 msg._become_message(eMM) 1411 self._check_sample(msg) 1412 1413 def test_explain_to(self): 1414 # Copy self's format-specific data to other message formats. 1415 # This test is superficial; better ones are in TestMessageConversion. 1416 msg = self._factory() 1417 for class_ in self.all_mailbox_types: 1418 other_msg = class_() 1419 msg._explain_to(other_msg) 1420 other_msg = email.message.Message() 1421 self.assertRaises(TypeError, lambda: msg._explain_to(other_msg)) 1422 1423 def _post_initialize_hook(self, msg): 1424 # Overridden by subclasses to check extra things after initialization 1425 pass 1426 1427 1428 class TestMaildirMessage(TestMessage, unittest.TestCase): 1429 1430 _factory = mailbox.MaildirMessage 1431 1432 def _post_initialize_hook(self, msg): 1433 self.assertEqual(msg._subdir, 'new') 1434 self.assertEqual(msg._info, '') 1435 1436 def test_subdir(self): 1437 # Use get_subdir() and set_subdir() 1438 msg = mailbox.MaildirMessage(_sample_message) 1439 self.assertEqual(msg.get_subdir(), 'new') 1440 msg.set_subdir('cur') 1441 self.assertEqual(msg.get_subdir(), 'cur') 1442 msg.set_subdir('new') 1443 self.assertEqual(msg.get_subdir(), 'new') 1444 self.assertRaises(ValueError, lambda: msg.set_subdir('tmp')) 1445 self.assertEqual(msg.get_subdir(), 'new') 1446 msg.set_subdir('new') 1447 self.assertEqual(msg.get_subdir(), 'new') 1448 self._check_sample(msg) 1449 1450 def test_flags(self): 1451 # Use get_flags(), set_flags(), add_flag(), remove_flag() 1452 msg = mailbox.MaildirMessage(_sample_message) 1453 self.assertEqual(msg.get_flags(), '') 1454 self.assertEqual(msg.get_subdir(), 'new') 1455 msg.set_flags('F') 1456 self.assertEqual(msg.get_subdir(), 'new') 1457 self.assertEqual(msg.get_flags(), 'F') 1458 msg.set_flags('SDTP') 1459 self.assertEqual(msg.get_flags(), 'DPST') 1460 msg.add_flag('FT') 1461 self.assertEqual(msg.get_flags(), 'DFPST') 1462 msg.remove_flag('TDRP') 1463 self.assertEqual(msg.get_flags(), 'FS') 1464 self.assertEqual(msg.get_subdir(), 'new') 1465 self._check_sample(msg) 1466 1467 def test_date(self): 1468 # Use get_date() and set_date() 1469 msg = mailbox.MaildirMessage(_sample_message) 1470 self.assertLess(abs(msg.get_date() - time.time()), 60) 1471 msg.set_date(0.0) 1472 self.assertEqual(msg.get_date(), 0.0) 1473 1474 def test_info(self): 1475 # Use get_info() and set_info() 1476 msg = mailbox.MaildirMessage(_sample_message) 1477 self.assertEqual(msg.get_info(), '') 1478 msg.set_info('1,foo=bar') 1479 self.assertEqual(msg.get_info(), '1,foo=bar') 1480 self.assertRaises(TypeError, lambda: msg.set_info(None)) 1481 self._check_sample(msg) 1482 1483 def test_info_and_flags(self): 1484 # Test interaction of info and flag methods 1485 msg = mailbox.MaildirMessage(_sample_message) 1486 self.assertEqual(msg.get_info(), '') 1487 msg.set_flags('SF') 1488 self.assertEqual(msg.get_flags(), 'FS') 1489 self.assertEqual(msg.get_info(), '2,FS') 1490 msg.set_info('1,') 1491 self.assertEqual(msg.get_flags(), '') 1492 self.assertEqual(msg.get_info(), '1,') 1493 msg.remove_flag('RPT') 1494 self.assertEqual(msg.get_flags(), '') 1495 self.assertEqual(msg.get_info(), '1,') 1496 msg.add_flag('D') 1497 self.assertEqual(msg.get_flags(), 'D') 1498 self.assertEqual(msg.get_info(), '2,D') 1499 self._check_sample(msg) 1500 1501 1502 class _TestMboxMMDFMessage: 1503 1504 _factory = mailbox._mboxMMDFMessage 1505 1506 def _post_initialize_hook(self, msg): 1507 self._check_from(msg) 1508 1509 def test_initialize_with_unixfrom(self): 1510 # Initialize with a message that already has a _unixfrom attribute 1511 msg = mailbox.Message(_sample_message) 1512 msg.set_unixfrom('From foo@bar blah') 1513 msg = mailbox.mboxMessage(msg) 1514 self.assertEqual(msg.get_from(), 'foo@bar blah', msg.get_from()) 1515 1516 def test_from(self): 1517 # Get and set "From " line 1518 msg = mailbox.mboxMessage(_sample_message) 1519 self._check_from(msg) 1520 msg.set_from('foo bar') 1521 self.assertEqual(msg.get_from(), 'foo bar') 1522 msg.set_from('foo@bar', True) 1523 self._check_from(msg, 'foo@bar') 1524 msg.set_from('blah@temp', time.localtime()) 1525 self._check_from(msg, 'blah@temp') 1526 1527 def test_flags(self): 1528 # Use get_flags(), set_flags(), add_flag(), remove_flag() 1529 msg = mailbox.mboxMessage(_sample_message) 1530 self.assertEqual(msg.get_flags(), '') 1531 msg.set_flags('F') 1532 self.assertEqual(msg.get_flags(), 'F') 1533 msg.set_flags('XODR') 1534 self.assertEqual(msg.get_flags(), 'RODX') 1535 msg.add_flag('FA') 1536 self.assertEqual(msg.get_flags(), 'RODFAX') 1537 msg.remove_flag('FDXA') 1538 self.assertEqual(msg.get_flags(), 'RO') 1539 self._check_sample(msg) 1540 1541 def _check_from(self, msg, sender=None): 1542 # Check contents of "From " line 1543 if sender is None: 1544 sender = "MAILER-DAEMON" 1545 self.assertIsNotNone(re.match( 1546 sender + r" \w{3} \w{3} [\d ]\d [\d ]\d:\d{2}:\d{2} \d{4}", 1547 msg.get_from())) 1548 1549 1550 class TestMboxMessage(_TestMboxMMDFMessage, TestMessage): 1551 1552 _factory = mailbox.mboxMessage 1553 1554 1555 class TestMHMessage(TestMessage, unittest.TestCase): 1556 1557 _factory = mailbox.MHMessage 1558 1559 def _post_initialize_hook(self, msg): 1560 self.assertEqual(msg._sequences, []) 1561 1562 def test_sequences(self): 1563 # Get, set, join, and leave sequences 1564 msg = mailbox.MHMessage(_sample_message) 1565 self.assertEqual(msg.get_sequences(), []) 1566 msg.set_sequences(['foobar']) 1567 self.assertEqual(msg.get_sequences(), ['foobar']) 1568 msg.set_sequences([]) 1569 self.assertEqual(msg.get_sequences(), []) 1570 msg.add_sequence('unseen') 1571 self.assertEqual(msg.get_sequences(), ['unseen']) 1572 msg.add_sequence('flagged') 1573 self.assertEqual(msg.get_sequences(), ['unseen', 'flagged']) 1574 msg.add_sequence('flagged') 1575 self.assertEqual(msg.get_sequences(), ['unseen', 'flagged']) 1576 msg.remove_sequence('unseen') 1577 self.assertEqual(msg.get_sequences(), ['flagged']) 1578 msg.add_sequence('foobar') 1579 self.assertEqual(msg.get_sequences(), ['flagged', 'foobar']) 1580 msg.remove_sequence('replied') 1581 self.assertEqual(msg.get_sequences(), ['flagged', 'foobar']) 1582 msg.set_sequences(['foobar', 'replied']) 1583 self.assertEqual(msg.get_sequences(), ['foobar', 'replied']) 1584 1585 1586 class TestBabylMessage(TestMessage, unittest.TestCase): 1587 1588 _factory = mailbox.BabylMessage 1589 1590 def _post_initialize_hook(self, msg): 1591 self.assertEqual(msg._labels, []) 1592 1593 def test_labels(self): 1594 # Get, set, join, and leave labels 1595 msg = mailbox.BabylMessage(_sample_message) 1596 self.assertEqual(msg.get_labels(), []) 1597 msg.set_labels(['foobar']) 1598 self.assertEqual(msg.get_labels(), ['foobar']) 1599 msg.set_labels([]) 1600 self.assertEqual(msg.get_labels(), []) 1601 msg.add_label('filed') 1602 self.assertEqual(msg.get_labels(), ['filed']) 1603 msg.add_label('resent') 1604 self.assertEqual(msg.get_labels(), ['filed', 'resent']) 1605 msg.add_label('resent') 1606 self.assertEqual(msg.get_labels(), ['filed', 'resent']) 1607 msg.remove_label('filed') 1608 self.assertEqual(msg.get_labels(), ['resent']) 1609 msg.add_label('foobar') 1610 self.assertEqual(msg.get_labels(), ['resent', 'foobar']) 1611 msg.remove_label('unseen') 1612 self.assertEqual(msg.get_labels(), ['resent', 'foobar']) 1613 msg.set_labels(['foobar', 'answered']) 1614 self.assertEqual(msg.get_labels(), ['foobar', 'answered']) 1615 1616 def test_visible(self): 1617 # Get, set, and update visible headers 1618 msg = mailbox.BabylMessage(_sample_message) 1619 visible = msg.get_visible() 1620 self.assertEqual(visible.keys(), []) 1621 self.assertIsNone(visible.get_payload()) 1622 visible['User-Agent'] = 'FooBar 1.0' 1623 visible['X-Whatever'] = 'Blah' 1624 self.assertEqual(msg.get_visible().keys(), []) 1625 msg.set_visible(visible) 1626 visible = msg.get_visible() 1627 self.assertEqual(visible.keys(), ['User-Agent', 'X-Whatever']) 1628 self.assertEqual(visible['User-Agent'], 'FooBar 1.0') 1629 self.assertEqual(visible['X-Whatever'], 'Blah') 1630 self.assertIsNone(visible.get_payload()) 1631 msg.update_visible() 1632 self.assertEqual(visible.keys(), ['User-Agent', 'X-Whatever']) 1633 self.assertIsNone(visible.get_payload()) 1634 visible = msg.get_visible() 1635 self.assertEqual(visible.keys(), ['User-Agent', 'Date', 'From', 'To', 1636 'Subject']) 1637 for header in ('User-Agent', 'Date', 'From', 'To', 'Subject'): 1638 self.assertEqual(visible[header], msg[header]) 1639 1640 1641 class TestMMDFMessage(_TestMboxMMDFMessage, TestMessage): 1642 1643 _factory = mailbox.MMDFMessage 1644 1645 1646 class TestMessageConversion(TestBase, unittest.TestCase): 1647 1648 def test_plain_to_x(self): 1649 # Convert Message to all formats 1650 for class_ in self.all_mailbox_types: 1651 msg_plain = mailbox.Message(_sample_message) 1652 msg = class_(msg_plain) 1653 self._check_sample(msg) 1654 1655 def test_x_to_plain(self): 1656 # Convert all formats to Message 1657 for class_ in self.all_mailbox_types: 1658 msg = class_(_sample_message) 1659 msg_plain = mailbox.Message(msg) 1660 self._check_sample(msg_plain) 1661 1662 def test_x_from_bytes(self): 1663 # Convert all formats to Message 1664 for class_ in self.all_mailbox_types: 1665 msg = class_(_bytes_sample_message) 1666 self._check_sample(msg) 1667 1668 def test_x_to_invalid(self): 1669 # Convert all formats to an invalid format 1670 for class_ in self.all_mailbox_types: 1671 self.assertRaises(TypeError, lambda: class_(False)) 1672 1673 def test_type_specific_attributes_removed_on_conversion(self): 1674 reference = {class_: class_(_sample_message).__dict__ 1675 for class_ in self.all_mailbox_types} 1676 for class1 in self.all_mailbox_types: 1677 for class2 in self.all_mailbox_types: 1678 if class1 is class2: 1679 continue 1680 source = class1(_sample_message) 1681 target = class2(source) 1682 type_specific = [a for a in reference[class1] 1683 if a not in reference[class2]] 1684 for attr in type_specific: 1685 self.assertNotIn(attr, target.__dict__, 1686 "while converting {} to {}".format(class1, class2)) 1687 1688 def test_maildir_to_maildir(self): 1689 # Convert MaildirMessage to MaildirMessage 1690 msg_maildir = mailbox.MaildirMessage(_sample_message) 1691 msg_maildir.set_flags('DFPRST') 1692 msg_maildir.set_subdir('cur') 1693 date = msg_maildir.get_date() 1694 msg = mailbox.MaildirMessage(msg_maildir) 1695 self._check_sample(msg) 1696 self.assertEqual(msg.get_flags(), 'DFPRST') 1697 self.assertEqual(msg.get_subdir(), 'cur') 1698 self.assertEqual(msg.get_date(), date) 1699 1700 def test_maildir_to_mboxmmdf(self): 1701 # Convert MaildirMessage to mboxmessage and MMDFMessage 1702 pairs = (('D', ''), ('F', 'F'), ('P', ''), ('R', 'A'), ('S', 'R'), 1703 ('T', 'D'), ('DFPRST', 'RDFA')) 1704 for class_ in (mailbox.mboxMessage, mailbox.MMDFMessage): 1705 msg_maildir = mailbox.MaildirMessage(_sample_message) 1706 msg_maildir.set_date(0.0) 1707 for setting, result in pairs: 1708 msg_maildir.set_flags(setting) 1709 msg = class_(msg_maildir) 1710 self.assertEqual(msg.get_flags(), result) 1711 self.assertEqual(msg.get_from(), 'MAILER-DAEMON %s' % 1712 time.asctime(time.gmtime(0.0))) 1713 msg_maildir.set_subdir('cur') 1714 self.assertEqual(class_(msg_maildir).get_flags(), 'RODFA') 1715 1716 def test_maildir_to_mh(self): 1717 # Convert MaildirMessage to MHMessage 1718 msg_maildir = mailbox.MaildirMessage(_sample_message) 1719 pairs = (('D', ['unseen']), ('F', ['unseen', 'flagged']), 1720 ('P', ['unseen']), ('R', ['unseen', 'replied']), ('S', []), 1721 ('T', ['unseen']), ('DFPRST', ['replied', 'flagged'])) 1722 for setting, result in pairs: 1723 msg_maildir.set_flags(setting) 1724 self.assertEqual(mailbox.MHMessage(msg_maildir).get_sequences(), 1725 result) 1726 1727 def test_maildir_to_babyl(self): 1728 # Convert MaildirMessage to Babyl 1729 msg_maildir = mailbox.MaildirMessage(_sample_message) 1730 pairs = (('D', ['unseen']), ('F', ['unseen']), 1731 ('P', ['unseen', 'forwarded']), ('R', ['unseen', 'answered']), 1732 ('S', []), ('T', ['unseen', 'deleted']), 1733 ('DFPRST', ['deleted', 'answered', 'forwarded'])) 1734 for setting, result in pairs: 1735 msg_maildir.set_flags(setting) 1736 self.assertEqual(mailbox.BabylMessage(msg_maildir).get_labels(), 1737 result) 1738 1739 def test_mboxmmdf_to_maildir(self): 1740 # Convert mboxMessage and MMDFMessage to MaildirMessage 1741 for class_ in (mailbox.mboxMessage, mailbox.MMDFMessage): 1742 msg_mboxMMDF = class_(_sample_message) 1743 msg_mboxMMDF.set_from('foo@bar', time.gmtime(0.0)) 1744 pairs = (('R', 'S'), ('O', ''), ('D', 'T'), ('F', 'F'), ('A', 'R'), 1745 ('RODFA', 'FRST')) 1746 for setting, result in pairs: 1747 msg_mboxMMDF.set_flags(setting) 1748 msg = mailbox.MaildirMessage(msg_mboxMMDF) 1749 self.assertEqual(msg.get_flags(), result) 1750 self.assertEqual(msg.get_date(), 0.0) 1751 msg_mboxMMDF.set_flags('O') 1752 self.assertEqual(mailbox.MaildirMessage(msg_mboxMMDF).get_subdir(), 1753 'cur') 1754 1755 def test_mboxmmdf_to_mboxmmdf(self): 1756 # Convert mboxMessage and MMDFMessage to mboxMessage and MMDFMessage 1757 for class_ in (mailbox.mboxMessage, mailbox.MMDFMessage): 1758 msg_mboxMMDF = class_(_sample_message) 1759 msg_mboxMMDF.set_flags('RODFA') 1760 msg_mboxMMDF.set_from('foo@bar') 1761 for class2_ in (mailbox.mboxMessage, mailbox.MMDFMessage): 1762 msg2 = class2_(msg_mboxMMDF) 1763 self.assertEqual(msg2.get_flags(), 'RODFA') 1764 self.assertEqual(msg2.get_from(), 'foo@bar') 1765 1766 def test_mboxmmdf_to_mh(self): 1767 # Convert mboxMessage and MMDFMessage to MHMessage 1768 for class_ in (mailbox.mboxMessage, mailbox.MMDFMessage): 1769 msg_mboxMMDF = class_(_sample_message) 1770 pairs = (('R', []), ('O', ['unseen']), ('D', ['unseen']), 1771 ('F', ['unseen', 'flagged']), 1772 ('A', ['unseen', 'replied']), 1773 ('RODFA', ['replied', 'flagged'])) 1774 for setting, result in pairs: 1775 msg_mboxMMDF.set_flags(setting) 1776 self.assertEqual(mailbox.MHMessage(msg_mboxMMDF).get_sequences(), 1777 result) 1778 1779 def test_mboxmmdf_to_babyl(self): 1780 # Convert mboxMessage and MMDFMessage to BabylMessage 1781 for class_ in (mailbox.mboxMessage, mailbox.MMDFMessage): 1782 msg = class_(_sample_message) 1783 pairs = (('R', []), ('O', ['unseen']), 1784 ('D', ['unseen', 'deleted']), ('F', ['unseen']), 1785 ('A', ['unseen', 'answered']), 1786 ('RODFA', ['deleted', 'answered'])) 1787 for setting, result in pairs: 1788 msg.set_flags(setting) 1789 self.assertEqual(mailbox.BabylMessage(msg).get_labels(), result) 1790 1791 def test_mh_to_maildir(self): 1792 # Convert MHMessage to MaildirMessage 1793 pairs = (('unseen', ''), ('replied', 'RS'), ('flagged', 'FS')) 1794 for setting, result in pairs: 1795 msg = mailbox.MHMessage(_sample_message) 1796 msg.add_sequence(setting) 1797 self.assertEqual(mailbox.MaildirMessage(msg).get_flags(), result) 1798 self.assertEqual(mailbox.MaildirMessage(msg).get_subdir(), 'cur') 1799 msg = mailbox.MHMessage(_sample_message) 1800 msg.add_sequence('unseen') 1801 msg.add_sequence('replied') 1802 msg.add_sequence('flagged') 1803 self.assertEqual(mailbox.MaildirMessage(msg).get_flags(), 'FR') 1804 self.assertEqual(mailbox.MaildirMessage(msg).get_subdir(), 'cur') 1805 1806 def test_mh_to_mboxmmdf(self): 1807 # Convert MHMessage to mboxMessage and MMDFMessage 1808 pairs = (('unseen', 'O'), ('replied', 'ROA'), ('flagged', 'ROF')) 1809 for setting, result in pairs: 1810 msg = mailbox.MHMessage(_sample_message) 1811 msg.add_sequence(setting) 1812 for class_ in (mailbox.mboxMessage, mailbox.MMDFMessage): 1813 self.assertEqual(class_(msg).get_flags(), result) 1814 msg = mailbox.MHMessage(_sample_message) 1815 msg.add_sequence('unseen') 1816 msg.add_sequence('replied') 1817 msg.add_sequence('flagged') 1818 for class_ in (mailbox.mboxMessage, mailbox.MMDFMessage): 1819 self.assertEqual(class_(msg).get_flags(), 'OFA') 1820 1821 def test_mh_to_mh(self): 1822 # Convert MHMessage to MHMessage 1823 msg = mailbox.MHMessage(_sample_message) 1824 msg.add_sequence('unseen') 1825 msg.add_sequence('replied') 1826 msg.add_sequence('flagged') 1827 self.assertEqual(mailbox.MHMessage(msg).get_sequences(), 1828 ['unseen', 'replied', 'flagged']) 1829 1830 def test_mh_to_babyl(self): 1831 # Convert MHMessage to BabylMessage 1832 pairs = (('unseen', ['unseen']), ('replied', ['answered']), 1833 ('flagged', [])) 1834 for setting, result in pairs: 1835 msg = mailbox.MHMessage(_sample_message) 1836 msg.add_sequence(setting) 1837 self.assertEqual(mailbox.BabylMessage(msg).get_labels(), result) 1838 msg = mailbox.MHMessage(_sample_message) 1839 msg.add_sequence('unseen') 1840 msg.add_sequence('replied') 1841 msg.add_sequence('flagged') 1842 self.assertEqual(mailbox.BabylMessage(msg).get_labels(), 1843 ['unseen', 'answered']) 1844 1845 def test_babyl_to_maildir(self): 1846 # Convert BabylMessage to MaildirMessage 1847 pairs = (('unseen', ''), ('deleted', 'ST'), ('filed', 'S'), 1848 ('answered', 'RS'), ('forwarded', 'PS'), ('edited', 'S'), 1849 ('resent', 'PS')) 1850 for setting, result in pairs: 1851 msg = mailbox.BabylMessage(_sample_message) 1852 msg.add_label(setting) 1853 self.assertEqual(mailbox.MaildirMessage(msg).get_flags(), result) 1854 self.assertEqual(mailbox.MaildirMessage(msg).get_subdir(), 'cur') 1855 msg = mailbox.BabylMessage(_sample_message) 1856 for label in ('unseen', 'deleted', 'filed', 'answered', 'forwarded', 1857 'edited', 'resent'): 1858 msg.add_label(label) 1859 self.assertEqual(mailbox.MaildirMessage(msg).get_flags(), 'PRT') 1860 self.assertEqual(mailbox.MaildirMessage(msg).get_subdir(), 'cur') 1861 1862 def test_babyl_to_mboxmmdf(self): 1863 # Convert BabylMessage to mboxMessage and MMDFMessage 1864 pairs = (('unseen', 'O'), ('deleted', 'ROD'), ('filed', 'RO'), 1865 ('answered', 'ROA'), ('forwarded', 'RO'), ('edited', 'RO'), 1866 ('resent', 'RO')) 1867 for setting, result in pairs: 1868 for class_ in (mailbox.mboxMessage, mailbox.MMDFMessage): 1869 msg = mailbox.BabylMessage(_sample_message) 1870 msg.add_label(setting) 1871 self.assertEqual(class_(msg).get_flags(), result) 1872 msg = mailbox.BabylMessage(_sample_message) 1873 for label in ('unseen', 'deleted', 'filed', 'answered', 'forwarded', 1874 'edited', 'resent'): 1875 msg.add_label(label) 1876 for class_ in (mailbox.mboxMessage, mailbox.MMDFMessage): 1877 self.assertEqual(class_(msg).get_flags(), 'ODA') 1878 1879 def test_babyl_to_mh(self): 1880 # Convert BabylMessage to MHMessage 1881 pairs = (('unseen', ['unseen']), ('deleted', []), ('filed', []), 1882 ('answered', ['replied']), ('forwarded', []), ('edited', []), 1883 ('resent', [])) 1884 for setting, result in pairs: 1885 msg = mailbox.BabylMessage(_sample_message) 1886 msg.add_label(setting) 1887 self.assertEqual(mailbox.MHMessage(msg).get_sequences(), result) 1888 msg = mailbox.BabylMessage(_sample_message) 1889 for label in ('unseen', 'deleted', 'filed', 'answered', 'forwarded', 1890 'edited', 'resent'): 1891 msg.add_label(label) 1892 self.assertEqual(mailbox.MHMessage(msg).get_sequences(), 1893 ['unseen', 'replied']) 1894 1895 def test_babyl_to_babyl(self): 1896 # Convert BabylMessage to BabylMessage 1897 msg = mailbox.BabylMessage(_sample_message) 1898 msg.update_visible() 1899 for label in ('unseen', 'deleted', 'filed', 'answered', 'forwarded', 1900 'edited', 'resent'): 1901 msg.add_label(label) 1902 msg2 = mailbox.BabylMessage(msg) 1903 self.assertEqual(msg2.get_labels(), ['unseen', 'deleted', 'filed', 1904 'answered', 'forwarded', 'edited', 1905 'resent']) 1906 self.assertEqual(msg.get_visible().keys(), msg2.get_visible().keys()) 1907 for key in msg.get_visible().keys(): 1908 self.assertEqual(msg.get_visible()[key], msg2.get_visible()[key]) 1909 1910 1911 class TestProxyFileBase(TestBase): 1912 1913 def _test_read(self, proxy): 1914 # Read by byte 1915 proxy.seek(0) 1916 self.assertEqual(proxy.read(), b'bar') 1917 proxy.seek(1) 1918 self.assertEqual(proxy.read(), b'ar') 1919 proxy.seek(0) 1920 self.assertEqual(proxy.read(2), b'ba') 1921 proxy.seek(1) 1922 self.assertEqual(proxy.read(-1), b'ar') 1923 proxy.seek(2) 1924 self.assertEqual(proxy.read(1000), b'r') 1925 1926 def _test_readline(self, proxy): 1927 # Read by line 1928 linesep = os.linesep.encode() 1929 proxy.seek(0) 1930 self.assertEqual(proxy.readline(), b'foo' + linesep) 1931 self.assertEqual(proxy.readline(), b'bar' + linesep) 1932 self.assertEqual(proxy.readline(), b'fred' + linesep) 1933 self.assertEqual(proxy.readline(), b'bob') 1934 proxy.seek(2) 1935 self.assertEqual(proxy.readline(), b'o' + linesep) 1936 proxy.seek(6 + 2 * len(os.linesep)) 1937 self.assertEqual(proxy.readline(), b'fred' + linesep) 1938 proxy.seek(6 + 2 * len(os.linesep)) 1939 self.assertEqual(proxy.readline(2), b'fr') 1940 self.assertEqual(proxy.readline(-10), b'ed' + linesep) 1941 1942 def _test_readlines(self, proxy): 1943 # Read multiple lines 1944 linesep = os.linesep.encode() 1945 proxy.seek(0) 1946 self.assertEqual(proxy.readlines(), [b'foo' + linesep, 1947 b'bar' + linesep, 1948 b'fred' + linesep, b'bob']) 1949 proxy.seek(0) 1950 self.assertEqual(proxy.readlines(2), [b'foo' + linesep]) 1951 proxy.seek(3 + len(linesep)) 1952 self.assertEqual(proxy.readlines(4 + len(linesep)), 1953 [b'bar' + linesep, b'fred' + linesep]) 1954 proxy.seek(3) 1955 self.assertEqual(proxy.readlines(1000), [linesep, b'bar' + linesep, 1956 b'fred' + linesep, b'bob']) 1957 1958 def _test_iteration(self, proxy): 1959 # Iterate by line 1960 linesep = os.linesep.encode() 1961 proxy.seek(0) 1962 iterator = iter(proxy) 1963 self.assertEqual(next(iterator), b'foo' + linesep) 1964 self.assertEqual(next(iterator), b'bar' + linesep) 1965 self.assertEqual(next(iterator), b'fred' + linesep) 1966 self.assertEqual(next(iterator), b'bob') 1967 self.assertRaises(StopIteration, next, iterator) 1968 1969 def _test_seek_and_tell(self, proxy): 1970 # Seek and use tell to check position 1971 linesep = os.linesep.encode() 1972 proxy.seek(3) 1973 self.assertEqual(proxy.tell(), 3) 1974 self.assertEqual(proxy.read(len(linesep)), linesep) 1975 proxy.seek(2, 1) 1976 self.assertEqual(proxy.read(1 + len(linesep)), b'r' + linesep) 1977 proxy.seek(-3 - len(linesep), 2) 1978 self.assertEqual(proxy.read(3), b'bar') 1979 proxy.seek(2, 0) 1980 self.assertEqual(proxy.read(), b'o' + linesep + b'bar' + linesep) 1981 proxy.seek(100) 1982 self.assertFalse(proxy.read()) 1983 1984 def _test_close(self, proxy): 1985 # Close a file 1986 self.assertFalse(proxy.closed) 1987 proxy.close() 1988 self.assertTrue(proxy.closed) 1989 # Issue 11700 subsequent closes should be a no-op. 1990 proxy.close() 1991 self.assertTrue(proxy.closed) 1992 1993 1994 class TestProxyFile(TestProxyFileBase, unittest.TestCase): 1995 1996 def setUp(self): 1997 self._path = support.TESTFN 1998 self._file = open(self._path, 'wb+') 1999 2000 def tearDown(self): 2001 self._file.close() 2002 self._delete_recursively(self._path) 2003 2004 def test_initialize(self): 2005 # Initialize and check position 2006 self._file.write(b'foo') 2007 pos = self._file.tell() 2008 proxy0 = mailbox._ProxyFile(self._file) 2009 self.assertEqual(proxy0.tell(), pos) 2010 self.assertEqual(self._file.tell(), pos) 2011 proxy1 = mailbox._ProxyFile(self._file, 0) 2012 self.assertEqual(proxy1.tell(), 0) 2013 self.assertEqual(self._file.tell(), pos) 2014 2015 def test_read(self): 2016 self._file.write(b'bar') 2017 self._test_read(mailbox._ProxyFile(self._file)) 2018 2019 def test_readline(self): 2020 self._file.write(bytes('foo%sbar%sfred%sbob' % (os.linesep, os.linesep, 2021 os.linesep), 'ascii')) 2022 self._test_readline(mailbox._ProxyFile(self._file)) 2023 2024 def test_readlines(self): 2025 self._file.write(bytes('foo%sbar%sfred%sbob' % (os.linesep, os.linesep, 2026 os.linesep), 'ascii')) 2027 self._test_readlines(mailbox._ProxyFile(self._file)) 2028 2029 def test_iteration(self): 2030 self._file.write(bytes('foo%sbar%sfred%sbob' % (os.linesep, os.linesep, 2031 os.linesep), 'ascii')) 2032 self._test_iteration(mailbox._ProxyFile(self._file)) 2033 2034 def test_seek_and_tell(self): 2035 self._file.write(bytes('foo%sbar%s' % (os.linesep, os.linesep), 'ascii')) 2036 self._test_seek_and_tell(mailbox._ProxyFile(self._file)) 2037 2038 def test_close(self): 2039 self._file.write(bytes('foo%sbar%s' % (os.linesep, os.linesep), 'ascii')) 2040 self._test_close(mailbox._ProxyFile(self._file)) 2041 2042 2043 class TestPartialFile(TestProxyFileBase, unittest.TestCase): 2044 2045 def setUp(self): 2046 self._path = support.TESTFN 2047 self._file = open(self._path, 'wb+') 2048 2049 def tearDown(self): 2050 self._file.close() 2051 self._delete_recursively(self._path) 2052 2053 def test_initialize(self): 2054 # Initialize and check position 2055 self._file.write(bytes('foo' + os.linesep + 'bar', 'ascii')) 2056 pos = self._file.tell() 2057 proxy = mailbox._PartialFile(self._file, 2, 5) 2058 self.assertEqual(proxy.tell(), 0) 2059 self.assertEqual(self._file.tell(), pos) 2060 2061 def test_read(self): 2062 self._file.write(bytes('***bar***', 'ascii')) 2063 self._test_read(mailbox._PartialFile(self._file, 3, 6)) 2064 2065 def test_readline(self): 2066 self._file.write(bytes('!!!!!foo%sbar%sfred%sbob!!!!!' % 2067 (os.linesep, os.linesep, os.linesep), 'ascii')) 2068 self._test_readline(mailbox._PartialFile(self._file, 5, 2069 18 + 3 * len(os.linesep))) 2070 2071 def test_readlines(self): 2072 self._file.write(bytes('foo%sbar%sfred%sbob?????' % 2073 (os.linesep, os.linesep, os.linesep), 'ascii')) 2074 self._test_readlines(mailbox._PartialFile(self._file, 0, 2075 13 + 3 * len(os.linesep))) 2076 2077 def test_iteration(self): 2078 self._file.write(bytes('____foo%sbar%sfred%sbob####' % 2079 (os.linesep, os.linesep, os.linesep), 'ascii')) 2080 self._test_iteration(mailbox._PartialFile(self._file, 4, 2081 17 + 3 * len(os.linesep))) 2082 2083 def test_seek_and_tell(self): 2084 self._file.write(bytes('(((foo%sbar%s$$$' % (os.linesep, os.linesep), 'ascii')) 2085 self._test_seek_and_tell(mailbox._PartialFile(self._file, 3, 2086 9 + 2 * len(os.linesep))) 2087 2088 def test_close(self): 2089 self._file.write(bytes('&foo%sbar%s^' % (os.linesep, os.linesep), 'ascii')) 2090 self._test_close(mailbox._PartialFile(self._file, 1, 2091 6 + 3 * len(os.linesep))) 2092 2093 2094 ## Start: tests from the original module (for backward compatibility). 2095 2096 FROM_ = "From some.body (at] dummy.domain Sat Jul 24 13:43:35 2004\n" 2097 DUMMY_MESSAGE = """\ 2098 From: some.body (at] dummy.domain 2099 To: me (at] my.domain 2100 Subject: Simple Test 2101 2102 This is a dummy message. 2103 """ 2104 2105 class MaildirTestCase(unittest.TestCase): 2106 2107 def setUp(self): 2108 # create a new maildir mailbox to work with: 2109 self._dir = support.TESTFN 2110 if os.path.isdir(self._dir): 2111 support.rmtree(self._dir) 2112 elif os.path.isfile(self._dir): 2113 support.unlink(self._dir) 2114 os.mkdir(self._dir) 2115 os.mkdir(os.path.join(self._dir, "cur")) 2116 os.mkdir(os.path.join(self._dir, "tmp")) 2117 os.mkdir(os.path.join(self._dir, "new")) 2118 self._counter = 1 2119 self._msgfiles = [] 2120 2121 def tearDown(self): 2122 list(map(os.unlink, self._msgfiles)) 2123 support.rmdir(os.path.join(self._dir, "cur")) 2124 support.rmdir(os.path.join(self._dir, "tmp")) 2125 support.rmdir(os.path.join(self._dir, "new")) 2126 support.rmdir(self._dir) 2127 2128 def createMessage(self, dir, mbox=False): 2129 t = int(time.time() % 1000000) 2130 pid = self._counter 2131 self._counter += 1 2132 filename = ".".join((str(t), str(pid), "myhostname", "mydomain")) 2133 tmpname = os.path.join(self._dir, "tmp", filename) 2134 newname = os.path.join(self._dir, dir, filename) 2135 with open(tmpname, "w") as fp: 2136 self._msgfiles.append(tmpname) 2137 if mbox: 2138 fp.write(FROM_) 2139 fp.write(DUMMY_MESSAGE) 2140 try: 2141 os.link(tmpname, newname) 2142 except (AttributeError, PermissionError): 2143 with open(newname, "w") as fp: 2144 fp.write(DUMMY_MESSAGE) 2145 self._msgfiles.append(newname) 2146 return tmpname 2147 2148 def test_empty_maildir(self): 2149 """Test an empty maildir mailbox""" 2150 # Test for regression on bug #117490: 2151 # Make sure the boxes attribute actually gets set. 2152 self.mbox = mailbox.Maildir(support.TESTFN) 2153 #self.assertTrue(hasattr(self.mbox, "boxes")) 2154 #self.assertEqual(len(self.mbox.boxes), 0) 2155 self.assertIsNone(self.mbox.next()) 2156 self.assertIsNone(self.mbox.next()) 2157 2158 def test_nonempty_maildir_cur(self): 2159 self.createMessage("cur") 2160 self.mbox = mailbox.Maildir(support.TESTFN) 2161 #self.assertEqual(len(self.mbox.boxes), 1) 2162 self.assertIsNotNone(self.mbox.next()) 2163 self.assertIsNone(self.mbox.next()) 2164 self.assertIsNone(self.mbox.next()) 2165 2166 def test_nonempty_maildir_new(self): 2167 self.createMessage("new") 2168 self.mbox = mailbox.Maildir(support.TESTFN) 2169 #self.assertEqual(len(self.mbox.boxes), 1) 2170 self.assertIsNotNone(self.mbox.next()) 2171 self.assertIsNone(self.mbox.next()) 2172 self.assertIsNone(self.mbox.next()) 2173 2174 def test_nonempty_maildir_both(self): 2175 self.createMessage("cur") 2176 self.createMessage("new") 2177 self.mbox = mailbox.Maildir(support.TESTFN) 2178 #self.assertEqual(len(self.mbox.boxes), 2) 2179 self.assertIsNotNone(self.mbox.next()) 2180 self.assertIsNotNone(self.mbox.next()) 2181 self.assertIsNone(self.mbox.next()) 2182 self.assertIsNone(self.mbox.next()) 2183 2184 ## End: tests from the original module (for backward compatibility). 2185 2186 2187 _sample_message = """\ 2188 Return-Path: <gkj (at] gregorykjohnson.com> 2189 X-Original-To: gkj+person@localhost 2190 Delivered-To: gkj+person@localhost 2191 Received: from localhost (localhost [127.0.0.1]) 2192 by andy.gregorykjohnson.com (Postfix) with ESMTP id 356ED9DD17 2193 for <gkj+person@localhost>; Wed, 13 Jul 2005 17:23:16 -0400 (EDT) 2194 Delivered-To: gkj (at] sundance.gregorykjohnson.com 2195 Received: from localhost [127.0.0.1] 2196 by localhost with POP3 (fetchmail-6.2.5) 2197 for gkj+person@localhost (single-drop); Wed, 13 Jul 2005 17:23:16 -0400 (EDT) 2198 Received: from andy.gregorykjohnson.com (andy.gregorykjohnson.com [64.32.235.228]) 2199 by sundance.gregorykjohnson.com (Postfix) with ESMTP id 5B056316746 2200 for <gkj (at] gregorykjohnson.com>; Wed, 13 Jul 2005 17:23:11 -0400 (EDT) 2201 Received: by andy.gregorykjohnson.com (Postfix, from userid 1000) 2202 id 490CD9DD17; Wed, 13 Jul 2005 17:23:11 -0400 (EDT) 2203 Date: Wed, 13 Jul 2005 17:23:11 -0400 2204 From: "Gregory K. Johnson" <gkj (at] gregorykjohnson.com> 2205 To: gkj (at] gregorykjohnson.com 2206 Subject: Sample message 2207 Message-ID: <20050713212311.GC4701 (at] andy.gregorykjohnson.com> 2208 Mime-Version: 1.0 2209 Content-Type: multipart/mixed; boundary="NMuMz9nt05w80d4+" 2210 Content-Disposition: inline 2211 User-Agent: Mutt/1.5.9i 2212 2213 2214 --NMuMz9nt05w80d4+ 2215 Content-Type: text/plain; charset=us-ascii 2216 Content-Disposition: inline 2217 2218 This is a sample message. 2219 2220 -- 2221 Gregory K. Johnson 2222 2223 --NMuMz9nt05w80d4+ 2224 Content-Type: application/octet-stream 2225 Content-Disposition: attachment; filename="text.gz" 2226 Content-Transfer-Encoding: base64 2227 2228 H4sICM2D1UIAA3RleHQAC8nILFYAokSFktSKEoW0zJxUPa7wzJIMhZLyfIWczLzUYj0uAHTs 2229 3FYlAAAA 2230 2231 --NMuMz9nt05w80d4+-- 2232 """ 2233 2234 _bytes_sample_message = _sample_message.encode('ascii') 2235 2236 _sample_headers = { 2237 "Return-Path":"<gkj (at] gregorykjohnson.com>", 2238 "X-Original-To":"gkj+person@localhost", 2239 "Delivered-To":"gkj+person@localhost", 2240 "Received":"""from localhost (localhost [127.0.0.1]) 2241 by andy.gregorykjohnson.com (Postfix) with ESMTP id 356ED9DD17 2242 for <gkj+person@localhost>; Wed, 13 Jul 2005 17:23:16 -0400 (EDT)""", 2243 "Delivered-To":"gkj (at] sundance.gregorykjohnson.com", 2244 "Received":"""from localhost [127.0.0.1] 2245 by localhost with POP3 (fetchmail-6.2.5) 2246 for gkj+person@localhost (single-drop); Wed, 13 Jul 2005 17:23:16 -0400 (EDT)""", 2247 "Received":"""from andy.gregorykjohnson.com (andy.gregorykjohnson.com [64.32.235.228]) 2248 by sundance.gregorykjohnson.com (Postfix) with ESMTP id 5B056316746 2249 for <gkj (at] gregorykjohnson.com>; Wed, 13 Jul 2005 17:23:11 -0400 (EDT)""", 2250 "Received":"""by andy.gregorykjohnson.com (Postfix, from userid 1000) 2251 id 490CD9DD17; Wed, 13 Jul 2005 17:23:11 -0400 (EDT)""", 2252 "Date":"Wed, 13 Jul 2005 17:23:11 -0400", 2253 "From":""""Gregory K. Johnson" <gkj (at] gregorykjohnson.com>""", 2254 "To":"gkj (at] gregorykjohnson.com", 2255 "Subject":"Sample message", 2256 "Mime-Version":"1.0", 2257 "Content-Type":"""multipart/mixed; boundary="NMuMz9nt05w80d4+\"""", 2258 "Content-Disposition":"inline", 2259 "User-Agent": "Mutt/1.5.9i" } 2260 2261 _sample_payloads = ("""This is a sample message. 2262 2263 -- 2264 Gregory K. Johnson 2265 """, 2266 """H4sICM2D1UIAA3RleHQAC8nILFYAokSFktSKEoW0zJxUPa7wzJIMhZLyfIWczLzUYj0uAHTs 2267 3FYlAAAA 2268 """) 2269 2270 2271 class MiscTestCase(unittest.TestCase): 2272 def test__all__(self): 2273 blacklist = {"linesep", "fcntl"} 2274 support.check__all__(self, mailbox, blacklist=blacklist) 2275 2276 2277 def test_main(): 2278 tests = (TestMailboxSuperclass, TestMaildir, TestMbox, TestMMDF, TestMH, 2279 TestBabyl, TestMessage, TestMaildirMessage, TestMboxMessage, 2280 TestMHMessage, TestBabylMessage, TestMMDFMessage, 2281 TestMessageConversion, TestProxyFile, TestPartialFile, 2282 MaildirTestCase, TestFakeMailBox, MiscTestCase) 2283 support.run_unittest(*tests) 2284 support.reap_children() 2285 2286 2287 if __name__ == '__main__': 2288 test_main() 2289