1 import os 2 import sys 3 import time 4 import stat 5 import socket 6 import email 7 import email.message 8 import re 9 import shutil 10 import StringIO 11 import tempfile 12 from test import test_support 13 import unittest 14 import mailbox 15 import glob 16 try: 17 import fcntl 18 except ImportError: 19 pass 20 21 # Silence Py3k warning 22 rfc822 = test_support.import_module('rfc822', deprecated=True) 23 24 class TestBase: 25 26 def _check_sample(self, msg): 27 # Inspect a mailbox.Message representation of the sample message 28 self.assertIsInstance(msg, email.message.Message) 29 self.assertIsInstance(msg, mailbox.Message) 30 for key, value in _sample_headers.iteritems(): 31 self.assertIn(value, msg.get_all(key)) 32 self.assertTrue(msg.is_multipart()) 33 self.assertEqual(len(msg.get_payload()), len(_sample_payloads)) 34 for i, payload in enumerate(_sample_payloads): 35 part = msg.get_payload(i) 36 self.assertIsInstance(part, email.message.Message) 37 self.assertNotIsInstance(part, mailbox.Message) 38 self.assertEqual(part.get_payload(), payload) 39 40 def _delete_recursively(self, target): 41 # Delete a file or delete a directory recursively 42 if os.path.isdir(target): 43 test_support.rmtree(target) 44 elif os.path.exists(target): 45 test_support.unlink(target) 46 47 48 class TestMailbox(TestBase): 49 50 _factory = None # Overridden by subclasses to reuse tests 51 _template = 'From: foo\n\n%s\n' 52 53 def setUp(self): 54 self._path = test_support.TESTFN 55 self._delete_recursively(self._path) 56 self._box = self._factory(self._path) 57 58 def tearDown(self): 59 self._box.close() 60 self._delete_recursively(self._path) 61 62 def test_add(self): 63 # Add copies of a sample message 64 keys = [] 65 keys.append(self._box.add(self._template % 0)) 66 self.assertEqual(len(self._box), 1) 67 keys.append(self._box.add(mailbox.Message(_sample_message))) 68 self.assertEqual(len(self._box), 2) 69 keys.append(self._box.add(email.message_from_string(_sample_message))) 70 
self.assertEqual(len(self._box), 3) 71 keys.append(self._box.add(StringIO.StringIO(_sample_message))) 72 self.assertEqual(len(self._box), 4) 73 keys.append(self._box.add(_sample_message)) 74 self.assertEqual(len(self._box), 5) 75 self.assertEqual(self._box.get_string(keys[0]), self._template % 0) 76 for i in (1, 2, 3, 4): 77 self._check_sample(self._box[keys[i]]) 78 79 def test_add_file(self): 80 with tempfile.TemporaryFile('w+') as f: 81 f.write(_sample_message) 82 f.seek(0) 83 key = self._box.add(f) 84 self.assertEqual(self._box.get_string(key).split('\n'), 85 _sample_message.split('\n')) 86 87 def test_add_StringIO(self): 88 key = self._box.add(StringIO.StringIO(self._template % "0")) 89 self.assertEqual(self._box.get_string(key), self._template % "0") 90 91 def test_remove(self): 92 # Remove messages using remove() 93 self._test_remove_or_delitem(self._box.remove) 94 95 def test_delitem(self): 96 # Remove messages using __delitem__() 97 self._test_remove_or_delitem(self._box.__delitem__) 98 99 def _test_remove_or_delitem(self, method): 100 # (Used by test_remove() and test_delitem().) 
101 key0 = self._box.add(self._template % 0) 102 key1 = self._box.add(self._template % 1) 103 self.assertEqual(len(self._box), 2) 104 method(key0) 105 l = len(self._box) 106 self.assertEqual(l, 1) 107 self.assertRaises(KeyError, lambda: self._box[key0]) 108 self.assertRaises(KeyError, lambda: method(key0)) 109 self.assertEqual(self._box.get_string(key1), self._template % 1) 110 key2 = self._box.add(self._template % 2) 111 self.assertEqual(len(self._box), 2) 112 method(key2) 113 l = len(self._box) 114 self.assertEqual(l, 1) 115 self.assertRaises(KeyError, lambda: self._box[key2]) 116 self.assertRaises(KeyError, lambda: method(key2)) 117 self.assertEqual(self._box.get_string(key1), self._template % 1) 118 method(key1) 119 self.assertEqual(len(self._box), 0) 120 self.assertRaises(KeyError, lambda: self._box[key1]) 121 self.assertRaises(KeyError, lambda: method(key1)) 122 123 def test_discard(self, repetitions=10): 124 # Discard messages 125 key0 = self._box.add(self._template % 0) 126 key1 = self._box.add(self._template % 1) 127 self.assertEqual(len(self._box), 2) 128 self._box.discard(key0) 129 self.assertEqual(len(self._box), 1) 130 self.assertRaises(KeyError, lambda: self._box[key0]) 131 self._box.discard(key0) 132 self.assertEqual(len(self._box), 1) 133 self.assertRaises(KeyError, lambda: self._box[key0]) 134 135 def test_get(self): 136 # Retrieve messages using get() 137 key0 = self._box.add(self._template % 0) 138 msg = self._box.get(key0) 139 self.assertEqual(msg['from'], 'foo') 140 self.assertEqual(msg.get_payload(), '0\n') 141 self.assertIs(self._box.get('foo'), None) 142 self.assertFalse(self._box.get('foo', False)) 143 self._box.close() 144 self._box = self._factory(self._path, factory=rfc822.Message) 145 key1 = self._box.add(self._template % 1) 146 msg = self._box.get(key1) 147 self.assertEqual(msg['from'], 'foo') 148 self.assertEqual(msg.fp.read(), '1' + os.linesep) 149 msg.fp.close() 150 151 def test_getitem(self): 152 # Retrieve message using 
__getitem__() 153 key0 = self._box.add(self._template % 0) 154 msg = self._box[key0] 155 self.assertEqual(msg['from'], 'foo') 156 self.assertEqual(msg.get_payload(), '0\n') 157 self.assertRaises(KeyError, lambda: self._box['foo']) 158 self._box.discard(key0) 159 self.assertRaises(KeyError, lambda: self._box[key0]) 160 161 def test_get_message(self): 162 # Get Message representations of messages 163 key0 = self._box.add(self._template % 0) 164 key1 = self._box.add(_sample_message) 165 msg0 = self._box.get_message(key0) 166 self.assertIsInstance(msg0, mailbox.Message) 167 self.assertEqual(msg0['from'], 'foo') 168 self.assertEqual(msg0.get_payload(), '0\n') 169 self._check_sample(self._box.get_message(key1)) 170 171 def test_get_string(self): 172 # Get string representations of messages 173 key0 = self._box.add(self._template % 0) 174 key1 = self._box.add(_sample_message) 175 self.assertEqual(self._box.get_string(key0), self._template % 0) 176 self.assertEqual(self._box.get_string(key1), _sample_message) 177 178 def test_get_file(self): 179 # Get file representations of messages 180 key0 = self._box.add(self._template % 0) 181 key1 = self._box.add(_sample_message) 182 msg0 = self._box.get_file(key0) 183 self.assertEqual(msg0.read().replace(os.linesep, '\n'), 184 self._template % 0) 185 msg1 = self._box.get_file(key1) 186 self.assertEqual(msg1.read().replace(os.linesep, '\n'), 187 _sample_message) 188 msg0.close() 189 msg1.close() 190 191 def test_get_file_can_be_closed_twice(self): 192 # Issue 11700 193 key = self._box.add(_sample_message) 194 f = self._box.get_file(key) 195 f.close() 196 f.close() 197 198 def test_iterkeys(self): 199 # Get keys using iterkeys() 200 self._check_iteration(self._box.iterkeys, do_keys=True, do_values=False) 201 202 def test_keys(self): 203 # Get keys using keys() 204 self._check_iteration(self._box.keys, do_keys=True, do_values=False) 205 206 def test_itervalues(self): 207 # Get values using itervalues() 208 
self._check_iteration(self._box.itervalues, do_keys=False, 209 do_values=True) 210 211 def test_iter(self): 212 # Get values using __iter__() 213 self._check_iteration(self._box.__iter__, do_keys=False, 214 do_values=True) 215 216 def test_values(self): 217 # Get values using values() 218 self._check_iteration(self._box.values, do_keys=False, do_values=True) 219 220 def test_iteritems(self): 221 # Get keys and values using iteritems() 222 self._check_iteration(self._box.iteritems, do_keys=True, 223 do_values=True) 224 225 def test_items(self): 226 # Get keys and values using items() 227 self._check_iteration(self._box.items, do_keys=True, do_values=True) 228 229 def _check_iteration(self, method, do_keys, do_values, repetitions=10): 230 for value in method(): 231 self.fail("Not empty") 232 keys, values = [], [] 233 for i in xrange(repetitions): 234 keys.append(self._box.add(self._template % i)) 235 values.append(self._template % i) 236 if do_keys and not do_values: 237 returned_keys = list(method()) 238 elif do_values and not do_keys: 239 returned_values = list(method()) 240 else: 241 returned_keys, returned_values = [], [] 242 for key, value in method(): 243 returned_keys.append(key) 244 returned_values.append(value) 245 if do_keys: 246 self.assertEqual(len(keys), len(returned_keys)) 247 self.assertEqual(set(keys), set(returned_keys)) 248 if do_values: 249 count = 0 250 for value in returned_values: 251 self.assertEqual(value['from'], 'foo') 252 self.assertTrue(int(value.get_payload()) < repetitions, 253 (value.get_payload(), repetitions)) 254 count += 1 255 self.assertEqual(len(values), count) 256 257 def test_has_key(self): 258 # Check existence of keys using has_key() 259 self._test_has_key_or_contains(self._box.has_key) 260 261 def test_contains(self): 262 # Check existence of keys using __contains__() 263 self._test_has_key_or_contains(self._box.__contains__) 264 265 def _test_has_key_or_contains(self, method): 266 # (Used by test_has_key() and 
test_contains().) 267 self.assertFalse(method('foo')) 268 key0 = self._box.add(self._template % 0) 269 self.assertTrue(method(key0)) 270 self.assertFalse(method('foo')) 271 key1 = self._box.add(self._template % 1) 272 self.assertTrue(method(key1)) 273 self.assertTrue(method(key0)) 274 self.assertFalse(method('foo')) 275 self._box.remove(key0) 276 self.assertFalse(method(key0)) 277 self.assertTrue(method(key1)) 278 self.assertFalse(method('foo')) 279 self._box.remove(key1) 280 self.assertFalse(method(key1)) 281 self.assertFalse(method(key0)) 282 self.assertFalse(method('foo')) 283 284 def test_len(self, repetitions=10): 285 # Get message count 286 keys = [] 287 for i in xrange(repetitions): 288 self.assertEqual(len(self._box), i) 289 keys.append(self._box.add(self._template % i)) 290 self.assertEqual(len(self._box), i + 1) 291 for i in xrange(repetitions): 292 self.assertEqual(len(self._box), repetitions - i) 293 self._box.remove(keys[i]) 294 self.assertEqual(len(self._box), repetitions - i - 1) 295 296 def test_set_item(self): 297 # Modify messages using __setitem__() 298 key0 = self._box.add(self._template % 'original 0') 299 self.assertEqual(self._box.get_string(key0), 300 self._template % 'original 0') 301 key1 = self._box.add(self._template % 'original 1') 302 self.assertEqual(self._box.get_string(key1), 303 self._template % 'original 1') 304 self._box[key0] = self._template % 'changed 0' 305 self.assertEqual(self._box.get_string(key0), 306 self._template % 'changed 0') 307 self._box[key1] = self._template % 'changed 1' 308 self.assertEqual(self._box.get_string(key1), 309 self._template % 'changed 1') 310 self._box[key0] = _sample_message 311 self._check_sample(self._box[key0]) 312 self._box[key1] = self._box[key0] 313 self._check_sample(self._box[key1]) 314 self._box[key0] = self._template % 'original 0' 315 self.assertEqual(self._box.get_string(key0), 316 self._template % 'original 0') 317 self._check_sample(self._box[key1]) 318 self.assertRaises(KeyError, 
319 lambda: self._box.__setitem__('foo', 'bar')) 320 self.assertRaises(KeyError, lambda: self._box['foo']) 321 self.assertEqual(len(self._box), 2) 322 323 def test_clear(self, iterations=10): 324 # Remove all messages using clear() 325 keys = [] 326 for i in xrange(iterations): 327 self._box.add(self._template % i) 328 for i, key in enumerate(keys): 329 self.assertEqual(self._box.get_string(key), self._template % i) 330 self._box.clear() 331 self.assertEqual(len(self._box), 0) 332 for i, key in enumerate(keys): 333 self.assertRaises(KeyError, lambda: self._box.get_string(key)) 334 335 def test_pop(self): 336 # Get and remove a message using pop() 337 key0 = self._box.add(self._template % 0) 338 self.assertIn(key0, self._box) 339 key1 = self._box.add(self._template % 1) 340 self.assertIn(key1, self._box) 341 self.assertEqual(self._box.pop(key0).get_payload(), '0\n') 342 self.assertNotIn(key0, self._box) 343 self.assertIn(key1, self._box) 344 key2 = self._box.add(self._template % 2) 345 self.assertIn(key2, self._box) 346 self.assertEqual(self._box.pop(key2).get_payload(), '2\n') 347 self.assertNotIn(key2, self._box) 348 self.assertIn(key1, self._box) 349 self.assertEqual(self._box.pop(key1).get_payload(), '1\n') 350 self.assertNotIn(key1, self._box) 351 self.assertEqual(len(self._box), 0) 352 353 def test_popitem(self, iterations=10): 354 # Get and remove an arbitrary (key, message) using popitem() 355 keys = [] 356 for i in xrange(10): 357 keys.append(self._box.add(self._template % i)) 358 seen = [] 359 for i in xrange(10): 360 key, msg = self._box.popitem() 361 self.assertIn(key, keys) 362 self.assertNotIn(key, seen) 363 seen.append(key) 364 self.assertEqual(int(msg.get_payload()), keys.index(key)) 365 self.assertEqual(len(self._box), 0) 366 for key in keys: 367 self.assertRaises(KeyError, lambda: self._box[key]) 368 369 def test_update(self): 370 # Modify multiple messages using update() 371 key0 = self._box.add(self._template % 'original 0') 372 key1 = 
self._box.add(self._template % 'original 1') 373 key2 = self._box.add(self._template % 'original 2') 374 self._box.update({key0: self._template % 'changed 0', 375 key2: _sample_message}) 376 self.assertEqual(len(self._box), 3) 377 self.assertEqual(self._box.get_string(key0), 378 self._template % 'changed 0') 379 self.assertEqual(self._box.get_string(key1), 380 self._template % 'original 1') 381 self._check_sample(self._box[key2]) 382 self._box.update([(key2, self._template % 'changed 2'), 383 (key1, self._template % 'changed 1'), 384 (key0, self._template % 'original 0')]) 385 self.assertEqual(len(self._box), 3) 386 self.assertEqual(self._box.get_string(key0), 387 self._template % 'original 0') 388 self.assertEqual(self._box.get_string(key1), 389 self._template % 'changed 1') 390 self.assertEqual(self._box.get_string(key2), 391 self._template % 'changed 2') 392 self.assertRaises(KeyError, 393 lambda: self._box.update({'foo': 'bar', 394 key0: self._template % "changed 0"})) 395 self.assertEqual(len(self._box), 3) 396 self.assertEqual(self._box.get_string(key0), 397 self._template % "changed 0") 398 self.assertEqual(self._box.get_string(key1), 399 self._template % "changed 1") 400 self.assertEqual(self._box.get_string(key2), 401 self._template % "changed 2") 402 403 def test_flush(self): 404 # Write changes to disk 405 self._test_flush_or_close(self._box.flush, True) 406 407 def test_popitem_and_flush_twice(self): 408 # See #15036. 
409 self._box.add(self._template % 0) 410 self._box.add(self._template % 1) 411 self._box.flush() 412 413 self._box.popitem() 414 self._box.flush() 415 self._box.popitem() 416 self._box.flush() 417 418 def test_lock_unlock(self): 419 # Lock and unlock the mailbox 420 self.assertFalse(os.path.exists(self._get_lock_path())) 421 self._box.lock() 422 self.assertTrue(os.path.exists(self._get_lock_path())) 423 self._box.unlock() 424 self.assertFalse(os.path.exists(self._get_lock_path())) 425 426 def test_close(self): 427 # Close mailbox and flush changes to disk 428 self._test_flush_or_close(self._box.close, False) 429 430 def _test_flush_or_close(self, method, should_call_close): 431 contents = [self._template % i for i in xrange(3)] 432 self._box.add(contents[0]) 433 self._box.add(contents[1]) 434 self._box.add(contents[2]) 435 oldbox = self._box 436 method() 437 if should_call_close: 438 self._box.close() 439 self._box = self._factory(self._path) 440 keys = self._box.keys() 441 self.assertEqual(len(keys), 3) 442 for key in keys: 443 self.assertIn(self._box.get_string(key), contents) 444 oldbox.close() 445 446 def test_dump_message(self): 447 # Write message representations to disk 448 for input in (email.message_from_string(_sample_message), 449 _sample_message, StringIO.StringIO(_sample_message)): 450 output = StringIO.StringIO() 451 self._box._dump_message(input, output) 452 self.assertEqual(output.getvalue(), 453 _sample_message.replace('\n', os.linesep)) 454 output = StringIO.StringIO() 455 self.assertRaises(TypeError, 456 lambda: self._box._dump_message(None, output)) 457 458 def _get_lock_path(self): 459 # Return the path of the dot lock file. May be overridden. 460 return self._path + '.lock' 461 462 463 class TestMailboxSuperclass(TestBase, unittest.TestCase): 464 465 def test_notimplemented(self): 466 # Test that all Mailbox methods raise NotImplementedException. 
467 box = mailbox.Mailbox('path') 468 self.assertRaises(NotImplementedError, lambda: box.add('')) 469 self.assertRaises(NotImplementedError, lambda: box.remove('')) 470 self.assertRaises(NotImplementedError, lambda: box.__delitem__('')) 471 self.assertRaises(NotImplementedError, lambda: box.discard('')) 472 self.assertRaises(NotImplementedError, lambda: box.__setitem__('', '')) 473 self.assertRaises(NotImplementedError, lambda: box.iterkeys()) 474 self.assertRaises(NotImplementedError, lambda: box.keys()) 475 self.assertRaises(NotImplementedError, lambda: box.itervalues().next()) 476 self.assertRaises(NotImplementedError, lambda: box.__iter__().next()) 477 self.assertRaises(NotImplementedError, lambda: box.values()) 478 self.assertRaises(NotImplementedError, lambda: box.iteritems().next()) 479 self.assertRaises(NotImplementedError, lambda: box.items()) 480 self.assertRaises(NotImplementedError, lambda: box.get('')) 481 self.assertRaises(NotImplementedError, lambda: box.__getitem__('')) 482 self.assertRaises(NotImplementedError, lambda: box.get_message('')) 483 self.assertRaises(NotImplementedError, lambda: box.get_string('')) 484 self.assertRaises(NotImplementedError, lambda: box.get_file('')) 485 self.assertRaises(NotImplementedError, lambda: box.has_key('')) 486 self.assertRaises(NotImplementedError, lambda: box.__contains__('')) 487 self.assertRaises(NotImplementedError, lambda: box.__len__()) 488 self.assertRaises(NotImplementedError, lambda: box.clear()) 489 self.assertRaises(NotImplementedError, lambda: box.pop('')) 490 self.assertRaises(NotImplementedError, lambda: box.popitem()) 491 self.assertRaises(NotImplementedError, lambda: box.update((('', ''),))) 492 self.assertRaises(NotImplementedError, lambda: box.flush()) 493 self.assertRaises(NotImplementedError, lambda: box.lock()) 494 self.assertRaises(NotImplementedError, lambda: box.unlock()) 495 self.assertRaises(NotImplementedError, lambda: box.close()) 496 497 498 class TestMaildir(TestMailbox, 
unittest.TestCase): 499 500 _factory = lambda self, path, factory=None: mailbox.Maildir(path, factory) 501 502 def setUp(self): 503 TestMailbox.setUp(self) 504 if os.name in ('nt', 'os2') or sys.platform == 'cygwin': 505 self._box.colon = '!' 506 507 def test_add_MM(self): 508 # Add a MaildirMessage instance 509 msg = mailbox.MaildirMessage(self._template % 0) 510 msg.set_subdir('cur') 511 msg.set_info('foo') 512 key = self._box.add(msg) 513 self.assertTrue(os.path.exists(os.path.join(self._path, 'cur', '%s%sfoo' % 514 (key, self._box.colon)))) 515 516 def test_get_MM(self): 517 # Get a MaildirMessage instance 518 msg = mailbox.MaildirMessage(self._template % 0) 519 msg.set_subdir('cur') 520 msg.set_flags('RF') 521 key = self._box.add(msg) 522 msg_returned = self._box.get_message(key) 523 self.assertIsInstance(msg_returned, mailbox.MaildirMessage) 524 self.assertEqual(msg_returned.get_subdir(), 'cur') 525 self.assertEqual(msg_returned.get_flags(), 'FR') 526 527 def test_set_MM(self): 528 # Set with a MaildirMessage instance 529 msg0 = mailbox.MaildirMessage(self._template % 0) 530 msg0.set_flags('TP') 531 key = self._box.add(msg0) 532 msg_returned = self._box.get_message(key) 533 self.assertEqual(msg_returned.get_subdir(), 'new') 534 self.assertEqual(msg_returned.get_flags(), 'PT') 535 msg1 = mailbox.MaildirMessage(self._template % 1) 536 self._box[key] = msg1 537 msg_returned = self._box.get_message(key) 538 self.assertEqual(msg_returned.get_subdir(), 'new') 539 self.assertEqual(msg_returned.get_flags(), '') 540 self.assertEqual(msg_returned.get_payload(), '1\n') 541 msg2 = mailbox.MaildirMessage(self._template % 2) 542 msg2.set_info('2,S') 543 self._box[key] = msg2 544 self._box[key] = self._template % 3 545 msg_returned = self._box.get_message(key) 546 self.assertEqual(msg_returned.get_subdir(), 'new') 547 self.assertEqual(msg_returned.get_flags(), 'S') 548 self.assertEqual(msg_returned.get_payload(), '3\n') 549 550 def test_consistent_factory(self): 551 # Add a 
message. 552 msg = mailbox.MaildirMessage(self._template % 0) 553 msg.set_subdir('cur') 554 msg.set_flags('RF') 555 key = self._box.add(msg) 556 557 # Create new mailbox with 558 class FakeMessage(mailbox.MaildirMessage): 559 pass 560 box = mailbox.Maildir(self._path, factory=FakeMessage) 561 box.colon = self._box.colon 562 msg2 = box.get_message(key) 563 self.assertIsInstance(msg2, FakeMessage) 564 565 def test_initialize_new(self): 566 # Initialize a non-existent mailbox 567 self.tearDown() 568 self._box = mailbox.Maildir(self._path) 569 self._check_basics(factory=rfc822.Message) 570 self._delete_recursively(self._path) 571 self._box = self._factory(self._path, factory=None) 572 self._check_basics() 573 574 def test_initialize_existing(self): 575 # Initialize an existing mailbox 576 self.tearDown() 577 for subdir in '', 'tmp', 'new', 'cur': 578 os.mkdir(os.path.normpath(os.path.join(self._path, subdir))) 579 self._box = mailbox.Maildir(self._path) 580 self._check_basics(factory=rfc822.Message) 581 self._box = mailbox.Maildir(self._path, factory=None) 582 self._check_basics() 583 584 def _check_basics(self, factory=None): 585 # (Used by test_open_new() and test_open_existing().) 
586 self.assertEqual(self._box._path, os.path.abspath(self._path)) 587 self.assertEqual(self._box._factory, factory) 588 for subdir in '', 'tmp', 'new', 'cur': 589 path = os.path.join(self._path, subdir) 590 mode = os.stat(path)[stat.ST_MODE] 591 self.assertTrue(stat.S_ISDIR(mode), "Not a directory: '%s'" % path) 592 593 def test_list_folders(self): 594 # List folders 595 self._box.add_folder('one') 596 self._box.add_folder('two') 597 self._box.add_folder('three') 598 self.assertEqual(len(self._box.list_folders()), 3) 599 self.assertEqual(set(self._box.list_folders()), 600 set(('one', 'two', 'three'))) 601 602 def test_get_folder(self): 603 # Open folders 604 self._box.add_folder('foo.bar') 605 folder0 = self._box.get_folder('foo.bar') 606 folder0.add(self._template % 'bar') 607 self.assertTrue(os.path.isdir(os.path.join(self._path, '.foo.bar'))) 608 folder1 = self._box.get_folder('foo.bar') 609 self.assertEqual(folder1.get_string(folder1.keys()[0]), 610 self._template % 'bar') 611 612 def test_add_and_remove_folders(self): 613 # Delete folders 614 self._box.add_folder('one') 615 self._box.add_folder('two') 616 self.assertEqual(len(self._box.list_folders()), 2) 617 self.assertEqual(set(self._box.list_folders()), set(('one', 'two'))) 618 self._box.remove_folder('one') 619 self.assertEqual(len(self._box.list_folders()), 1) 620 self.assertEqual(set(self._box.list_folders()), set(('two',))) 621 self._box.add_folder('three') 622 self.assertEqual(len(self._box.list_folders()), 2) 623 self.assertEqual(set(self._box.list_folders()), set(('two', 'three'))) 624 self._box.remove_folder('three') 625 self.assertEqual(len(self._box.list_folders()), 1) 626 self.assertEqual(set(self._box.list_folders()), set(('two',))) 627 self._box.remove_folder('two') 628 self.assertEqual(len(self._box.list_folders()), 0) 629 self.assertEqual(self._box.list_folders(), []) 630 631 def test_clean(self): 632 # Remove old files from 'tmp' 633 foo_path = os.path.join(self._path, 'tmp', 'foo') 634 
bar_path = os.path.join(self._path, 'tmp', 'bar') 635 with open(foo_path, 'w') as f: 636 f.write("@") 637 with open(bar_path, 'w') as f: 638 f.write("@") 639 self._box.clean() 640 self.assertTrue(os.path.exists(foo_path)) 641 self.assertTrue(os.path.exists(bar_path)) 642 foo_stat = os.stat(foo_path) 643 os.utime(foo_path, (time.time() - 129600 - 2, 644 foo_stat.st_mtime)) 645 self._box.clean() 646 self.assertFalse(os.path.exists(foo_path)) 647 self.assertTrue(os.path.exists(bar_path)) 648 649 def test_create_tmp(self, repetitions=10): 650 # Create files in tmp directory 651 hostname = socket.gethostname() 652 if '/' in hostname: 653 hostname = hostname.replace('/', r'\057') 654 if ':' in hostname: 655 hostname = hostname.replace(':', r'\072') 656 pid = os.getpid() 657 pattern = re.compile(r"(?P<time>\d+)\.M(?P<M>\d{1,6})P(?P<P>\d+)" 658 r"Q(?P<Q>\d+)\.(?P<host>[^:/]+)") 659 previous_groups = None 660 for x in xrange(repetitions): 661 tmp_file = self._box._create_tmp() 662 head, tail = os.path.split(tmp_file.name) 663 self.assertEqual(head, os.path.abspath(os.path.join(self._path, 664 "tmp")), 665 "File in wrong location: '%s'" % head) 666 match = pattern.match(tail) 667 self.assertTrue(match is not None, "Invalid file name: '%s'" % tail) 668 groups = match.groups() 669 if previous_groups is not None: 670 self.assertGreaterEqual(int(groups[0]), int(previous_groups[0]), 671 "Non-monotonic seconds: '%s' before '%s'" % 672 (previous_groups[0], groups[0])) 673 if int(groups[0]) == int(previous_groups[0]): 674 self.assertGreaterEqual(int(groups[1]), int(previous_groups[1]), 675 "Non-monotonic milliseconds: '%s' before '%s'" % 676 (previous_groups[1], groups[1])) 677 self.assertTrue(int(groups[2]) == pid, 678 "Process ID mismatch: '%s' should be '%s'" % 679 (groups[2], pid)) 680 self.assertTrue(int(groups[3]) == int(previous_groups[3]) + 1, 681 "Non-sequential counter: '%s' before '%s'" % 682 (previous_groups[3], groups[3])) 683 self.assertTrue(groups[4] == hostname, 684 
"Host name mismatch: '%s' should be '%s'" % 685 (groups[4], hostname)) 686 previous_groups = groups 687 tmp_file.write(_sample_message) 688 tmp_file.seek(0) 689 self.assertTrue(tmp_file.read() == _sample_message) 690 tmp_file.close() 691 file_count = len(os.listdir(os.path.join(self._path, "tmp"))) 692 self.assertTrue(file_count == repetitions, 693 "Wrong file count: '%s' should be '%s'" % 694 (file_count, repetitions)) 695 696 def test_refresh(self): 697 # Update the table of contents 698 self.assertEqual(self._box._toc, {}) 699 key0 = self._box.add(self._template % 0) 700 key1 = self._box.add(self._template % 1) 701 self.assertEqual(self._box._toc, {}) 702 self._box._refresh() 703 self.assertEqual(self._box._toc, {key0: os.path.join('new', key0), 704 key1: os.path.join('new', key1)}) 705 key2 = self._box.add(self._template % 2) 706 self.assertEqual(self._box._toc, {key0: os.path.join('new', key0), 707 key1: os.path.join('new', key1)}) 708 self._box._refresh() 709 self.assertEqual(self._box._toc, {key0: os.path.join('new', key0), 710 key1: os.path.join('new', key1), 711 key2: os.path.join('new', key2)}) 712 713 def test_refresh_after_safety_period(self): 714 # Issue #13254: Call _refresh after the "file system safety 715 # period" of 2 seconds has passed; _toc should still be 716 # updated because this is the first call to _refresh. 717 key0 = self._box.add(self._template % 0) 718 key1 = self._box.add(self._template % 1) 719 720 self._box = self._factory(self._path) 721 self.assertEqual(self._box._toc, {}) 722 723 # Emulate sleeping. Instead of sleeping for 2 seconds, use the 724 # skew factor to make _refresh think that the filesystem 725 # safety period has passed and re-reading the _toc is only 726 # required if mtimes differ. 
727 self._box._skewfactor = -3 728 729 self._box._refresh() 730 self.assertEqual(sorted(self._box._toc.keys()), sorted([key0, key1])) 731 732 def test_lookup(self): 733 # Look up message subpaths in the TOC 734 self.assertRaises(KeyError, lambda: self._box._lookup('foo')) 735 key0 = self._box.add(self._template % 0) 736 self.assertEqual(self._box._lookup(key0), os.path.join('new', key0)) 737 os.remove(os.path.join(self._path, 'new', key0)) 738 self.assertEqual(self._box._toc, {key0: os.path.join('new', key0)}) 739 # Be sure that the TOC is read back from disk (see issue #6896 740 # about bad mtime behaviour on some systems). 741 self._box.flush() 742 self.assertRaises(KeyError, lambda: self._box._lookup(key0)) 743 self.assertEqual(self._box._toc, {}) 744 745 def test_lock_unlock(self): 746 # Lock and unlock the mailbox. For Maildir, this does nothing. 747 self._box.lock() 748 self._box.unlock() 749 750 def test_folder (self): 751 # Test for bug #1569790: verify that folders returned by .get_folder() 752 # use the same factory function. 753 def dummy_factory (s): 754 return None 755 box = self._factory(self._path, factory=dummy_factory) 756 folder = box.add_folder('folder1') 757 self.assertIs(folder._factory, dummy_factory) 758 759 folder1_alias = box.get_folder('folder1') 760 self.assertIs(folder1_alias._factory, dummy_factory) 761 762 def test_directory_in_folder (self): 763 # Test that mailboxes still work if there's a stray extra directory 764 # in a folder. 765 for i in range(10): 766 self._box.add(mailbox.Message(_sample_message)) 767 768 # Create a stray directory 769 os.mkdir(os.path.join(self._path, 'cur', 'stray-dir')) 770 771 # Check that looping still works with the directory present. 
772 for msg in self._box: 773 pass 774 775 def test_file_permissions(self): 776 # Verify that message files are created without execute permissions 777 if not hasattr(os, "stat") or not hasattr(os, "umask"): 778 return 779 msg = mailbox.MaildirMessage(self._template % 0) 780 orig_umask = os.umask(0) 781 try: 782 key = self._box.add(msg) 783 finally: 784 os.umask(orig_umask) 785 path = os.path.join(self._path, self._box._lookup(key)) 786 mode = os.stat(path).st_mode 787 self.assertEqual(mode & 0111, 0) 788 789 def test_folder_file_perms(self): 790 # From bug #3228, we want to verify that the file created inside a Maildir 791 # subfolder isn't marked as executable. 792 if not hasattr(os, "stat") or not hasattr(os, "umask"): 793 return 794 795 orig_umask = os.umask(0) 796 try: 797 subfolder = self._box.add_folder('subfolder') 798 finally: 799 os.umask(orig_umask) 800 801 path = os.path.join(subfolder._path, 'maildirfolder') 802 st = os.stat(path) 803 perms = st.st_mode 804 self.assertFalse((perms & 0111)) # Execute bits should all be off. 805 806 def test_reread(self): 807 # Do an initial unconditional refresh 808 self._box._refresh() 809 810 # Put the last modified times more than two seconds into the past 811 # (because mtime may have only a two second granularity). 812 for subdir in ('cur', 'new'): 813 os.utime(os.path.join(self._box._path, subdir), 814 (time.time()-5,)*2) 815 816 # Because mtime has a two second granularity in worst case (FAT), a 817 # refresh is done unconditionally if called for within 818 # two-second-plus-a-bit of the last one, just in case the mbox has 819 # changed; so now we have to wait for that interval to expire. 820 # 821 # Because this is a test, emulate sleeping. Instead of 822 # sleeping for 2 seconds, use the skew factor to make _refresh 823 # think that 2 seconds have passed and re-reading the _toc is 824 # only required if mtimes differ. 
825 self._box._skewfactor = -3 826 827 # Re-reading causes the ._toc attribute to be assigned a new dictionary 828 # object, so we'll check that the ._toc attribute isn't a different 829 # object. 830 orig_toc = self._box._toc 831 def refreshed(): 832 return self._box._toc is not orig_toc 833 834 self._box._refresh() 835 self.assertFalse(refreshed()) 836 837 # Now, write something into cur and remove it. This changes 838 # the mtime and should cause a re-read. Note that "sleep 839 # emulation" is still in effect, as skewfactor is -3. 840 filename = os.path.join(self._path, 'cur', 'stray-file') 841 f = open(filename, 'w') 842 f.close() 843 os.unlink(filename) 844 self._box._refresh() 845 self.assertTrue(refreshed()) 846 847 848 class _TestSingleFile(TestMailbox): 849 '''Common tests for single-file mailboxes''' 850 851 def test_add_doesnt_rewrite(self): 852 # When only adding messages, flush() should not rewrite the 853 # mailbox file. See issue #9559. 854 855 # Inode number changes if the contents are written to another 856 # file which is then renamed over the original file. So we 857 # must check that the inode number doesn't change. 858 inode_before = os.stat(self._path).st_ino 859 860 self._box.add(self._template % 0) 861 self._box.flush() 862 863 inode_after = os.stat(self._path).st_ino 864 self.assertEqual(inode_before, inode_after) 865 866 # Make sure the message was really added 867 self._box.close() 868 self._box = self._factory(self._path) 869 self.assertEqual(len(self._box), 1) 870 871 def test_permissions_after_flush(self): 872 # See issue #5346 873 874 # Make the mailbox world writable. It's unlikely that the new 875 # mailbox file would have these permissions after flush(), 876 # because umask usually prevents it. 
877 mode = os.stat(self._path).st_mode | 0o666 878 os.chmod(self._path, mode) 879 880 self._box.add(self._template % 0) 881 i = self._box.add(self._template % 1) 882 # Need to remove one message to make flush() create a new file 883 self._box.remove(i) 884 self._box.flush() 885 886 self.assertEqual(os.stat(self._path).st_mode, mode) 887 888 889 class _TestMboxMMDF(_TestSingleFile): 890 891 def tearDown(self): 892 self._box.close() 893 self._delete_recursively(self._path) 894 for lock_remnant in glob.glob(self._path + '.*'): 895 test_support.unlink(lock_remnant) 896 897 def test_add_from_string(self): 898 # Add a string starting with 'From ' to the mailbox 899 key = self._box.add('From foo@bar blah\nFrom: foo\n\n0\n') 900 self.assertEqual(self._box[key].get_from(), 'foo@bar blah') 901 self.assertEqual(self._box[key].get_payload(), '0\n') 902 903 def test_add_mbox_or_mmdf_message(self): 904 # Add an mboxMessage or MMDFMessage 905 for class_ in (mailbox.mboxMessage, mailbox.MMDFMessage): 906 msg = class_('From foo@bar blah\nFrom: foo\n\n0\n') 907 key = self._box.add(msg) 908 909 def test_open_close_open(self): 910 # Open and inspect previously-created mailbox 911 values = [self._template % i for i in xrange(3)] 912 for value in values: 913 self._box.add(value) 914 self._box.close() 915 mtime = os.path.getmtime(self._path) 916 self._box = self._factory(self._path) 917 self.assertEqual(len(self._box), 3) 918 for key in self._box.iterkeys(): 919 self.assertIn(self._box.get_string(key), values) 920 self._box.close() 921 self.assertEqual(mtime, os.path.getmtime(self._path)) 922 923 def test_add_and_close(self): 924 # Verifying that closing a mailbox doesn't change added items 925 self._box.add(_sample_message) 926 for i in xrange(3): 927 self._box.add(self._template % i) 928 self._box.add(_sample_message) 929 self._box._file.flush() 930 self._box._file.seek(0) 931 contents = self._box._file.read() 932 self._box.close() 933 with open(self._path, 'rb') as f: 934 
self.assertEqual(contents, f.read()) 935 self._box = self._factory(self._path) 936 937 @unittest.skipUnless(hasattr(os, 'fork'), "Test needs fork().") 938 @unittest.skipUnless(hasattr(socket, 'socketpair'), "Test needs socketpair().") 939 def test_lock_conflict(self): 940 # Fork off a child process that will lock the mailbox temporarily, 941 # unlock it and exit. 942 c, p = socket.socketpair() 943 self.addCleanup(c.close) 944 self.addCleanup(p.close) 945 946 pid = os.fork() 947 if pid == 0: 948 # child 949 try: 950 # lock the mailbox, and signal the parent it can proceed 951 self._box.lock() 952 c.send(b'c') 953 954 # wait until the parent is done, and unlock the mailbox 955 c.recv(1) 956 self._box.unlock() 957 finally: 958 os._exit(0) 959 960 # In the parent, wait until the child signals it locked the mailbox. 961 p.recv(1) 962 try: 963 self.assertRaises(mailbox.ExternalClashError, 964 self._box.lock) 965 finally: 966 # Signal the child it can now release the lock and exit. 967 p.send(b'p') 968 # Wait for child to exit. Locking should now succeed. 969 exited_pid, status = os.waitpid(pid, 0) 970 971 self._box.lock() 972 self._box.unlock() 973 974 def test_relock(self): 975 # Test case for bug #1575506: the mailbox class was locking the 976 # wrong file object in its flush() method. 977 msg = "Subject: sub\n\nbody\n" 978 key1 = self._box.add(msg) 979 self._box.flush() 980 self._box.close() 981 982 self._box = self._factory(self._path) 983 self._box.lock() 984 key2 = self._box.add(msg) 985 self._box.flush() 986 self.assertTrue(self._box._locked) 987 self._box.close() 988 989 990 class TestMbox(_TestMboxMMDF, unittest.TestCase): 991 992 _factory = lambda self, path, factory=None: mailbox.mbox(path, factory) 993 994 def test_file_perms(self): 995 # From bug #3228, we want to verify that the mailbox file isn't executable, 996 # even if the umask is set to something that would leave executable bits set. 997 # We only run this test on platforms that support umask. 
998 if hasattr(os, 'umask') and hasattr(os, 'stat'): 999 try: 1000 old_umask = os.umask(0077) 1001 self._box.close() 1002 os.unlink(self._path) 1003 self._box = mailbox.mbox(self._path, create=True) 1004 self._box.add('') 1005 self._box.close() 1006 finally: 1007 os.umask(old_umask) 1008 1009 st = os.stat(self._path) 1010 perms = st.st_mode 1011 self.assertFalse((perms & 0111)) # Execute bits should all be off. 1012 1013 def test_terminating_newline(self): 1014 message = email.message.Message() 1015 message['From'] = 'john (at] example.com' 1016 message.set_payload('No newline at the end') 1017 i = self._box.add(message) 1018 1019 # A newline should have been appended to the payload 1020 message = self._box.get(i) 1021 self.assertEqual(message.get_payload(), 'No newline at the end\n') 1022 1023 def test_message_separator(self): 1024 # Check there's always a single blank line after each message 1025 self._box.add('From: foo\n\n0') # No newline at the end 1026 with open(self._path) as f: 1027 data = f.read() 1028 self.assertEqual(data[-3:], '0\n\n') 1029 1030 self._box.add('From: foo\n\n0\n') # Newline at the end 1031 with open(self._path) as f: 1032 data = f.read() 1033 self.assertEqual(data[-3:], '0\n\n') 1034 1035 1036 class TestMMDF(_TestMboxMMDF, unittest.TestCase): 1037 1038 _factory = lambda self, path, factory=None: mailbox.MMDF(path, factory) 1039 1040 1041 class TestMH(TestMailbox, unittest.TestCase): 1042 1043 _factory = lambda self, path, factory=None: mailbox.MH(path, factory) 1044 1045 def test_list_folders(self): 1046 # List folders 1047 self._box.add_folder('one') 1048 self._box.add_folder('two') 1049 self._box.add_folder('three') 1050 self.assertEqual(len(self._box.list_folders()), 3) 1051 self.assertEqual(set(self._box.list_folders()), 1052 set(('one', 'two', 'three'))) 1053 1054 def test_get_folder(self): 1055 # Open folders 1056 def dummy_factory (s): 1057 return None 1058 self._box = self._factory(self._path, dummy_factory) 1059 1060 new_folder = 
self._box.add_folder('foo.bar') 1061 folder0 = self._box.get_folder('foo.bar') 1062 folder0.add(self._template % 'bar') 1063 self.assertTrue(os.path.isdir(os.path.join(self._path, 'foo.bar'))) 1064 folder1 = self._box.get_folder('foo.bar') 1065 self.assertEqual(folder1.get_string(folder1.keys()[0]), 1066 self._template % 'bar') 1067 1068 # Test for bug #1569790: verify that folders returned by .get_folder() 1069 # use the same factory function. 1070 self.assertIs(new_folder._factory, self._box._factory) 1071 self.assertIs(folder0._factory, self._box._factory) 1072 1073 def test_add_and_remove_folders(self): 1074 # Delete folders 1075 self._box.add_folder('one') 1076 self._box.add_folder('two') 1077 self.assertEqual(len(self._box.list_folders()), 2) 1078 self.assertEqual(set(self._box.list_folders()), set(('one', 'two'))) 1079 self._box.remove_folder('one') 1080 self.assertEqual(len(self._box.list_folders()), 1) 1081 self.assertEqual(set(self._box.list_folders()), set(('two', ))) 1082 self._box.add_folder('three') 1083 self.assertEqual(len(self._box.list_folders()), 2) 1084 self.assertEqual(set(self._box.list_folders()), set(('two', 'three'))) 1085 self._box.remove_folder('three') 1086 self.assertEqual(len(self._box.list_folders()), 1) 1087 self.assertEqual(set(self._box.list_folders()), set(('two', ))) 1088 self._box.remove_folder('two') 1089 self.assertEqual(len(self._box.list_folders()), 0) 1090 self.assertEqual(self._box.list_folders(), []) 1091 1092 def test_sequences(self): 1093 # Get and set sequences 1094 self.assertEqual(self._box.get_sequences(), {}) 1095 msg0 = mailbox.MHMessage(self._template % 0) 1096 msg0.add_sequence('foo') 1097 key0 = self._box.add(msg0) 1098 self.assertEqual(self._box.get_sequences(), {'foo':[key0]}) 1099 msg1 = mailbox.MHMessage(self._template % 1) 1100 msg1.set_sequences(['bar', 'replied', 'foo']) 1101 key1 = self._box.add(msg1) 1102 self.assertEqual(self._box.get_sequences(), 1103 {'foo':[key0, key1], 'bar':[key1], 
'replied':[key1]}) 1104 msg0.set_sequences(['flagged']) 1105 self._box[key0] = msg0 1106 self.assertEqual(self._box.get_sequences(), 1107 {'foo':[key1], 'bar':[key1], 'replied':[key1], 1108 'flagged':[key0]}) 1109 self._box.remove(key1) 1110 self.assertEqual(self._box.get_sequences(), {'flagged':[key0]}) 1111 1112 def test_issue2625(self): 1113 msg0 = mailbox.MHMessage(self._template % 0) 1114 msg0.add_sequence('foo') 1115 key0 = self._box.add(msg0) 1116 refmsg0 = self._box.get_message(key0) 1117 1118 def test_issue7627(self): 1119 msg0 = mailbox.MHMessage(self._template % 0) 1120 key0 = self._box.add(msg0) 1121 self._box.lock() 1122 self._box.remove(key0) 1123 self._box.unlock() 1124 1125 def test_pack(self): 1126 # Pack the contents of the mailbox 1127 msg0 = mailbox.MHMessage(self._template % 0) 1128 msg1 = mailbox.MHMessage(self._template % 1) 1129 msg2 = mailbox.MHMessage(self._template % 2) 1130 msg3 = mailbox.MHMessage(self._template % 3) 1131 msg0.set_sequences(['foo', 'unseen']) 1132 msg1.set_sequences(['foo']) 1133 msg2.set_sequences(['foo', 'flagged']) 1134 msg3.set_sequences(['foo', 'bar', 'replied']) 1135 key0 = self._box.add(msg0) 1136 key1 = self._box.add(msg1) 1137 key2 = self._box.add(msg2) 1138 key3 = self._box.add(msg3) 1139 self.assertEqual(self._box.get_sequences(), 1140 {'foo':[key0,key1,key2,key3], 'unseen':[key0], 1141 'flagged':[key2], 'bar':[key3], 'replied':[key3]}) 1142 self._box.remove(key2) 1143 self.assertEqual(self._box.get_sequences(), 1144 {'foo':[key0,key1,key3], 'unseen':[key0], 'bar':[key3], 1145 'replied':[key3]}) 1146 self._box.pack() 1147 self.assertEqual(self._box.keys(), [1, 2, 3]) 1148 key0 = key0 1149 key1 = key0 + 1 1150 key2 = key1 + 1 1151 self.assertEqual(self._box.get_sequences(), 1152 {'foo':[1, 2, 3], 'unseen':[1], 'bar':[3], 'replied':[3]}) 1153 1154 # Test case for packing while holding the mailbox locked. 
1155 key0 = self._box.add(msg1) 1156 key1 = self._box.add(msg1) 1157 key2 = self._box.add(msg1) 1158 key3 = self._box.add(msg1) 1159 1160 self._box.remove(key0) 1161 self._box.remove(key2) 1162 self._box.lock() 1163 self._box.pack() 1164 self._box.unlock() 1165 self.assertEqual(self._box.get_sequences(), 1166 {'foo':[1, 2, 3, 4, 5], 1167 'unseen':[1], 'bar':[3], 'replied':[3]}) 1168 1169 def _get_lock_path(self): 1170 return os.path.join(self._path, '.mh_sequences.lock') 1171 1172 1173 class TestBabyl(_TestSingleFile, unittest.TestCase): 1174 1175 _factory = lambda self, path, factory=None: mailbox.Babyl(path, factory) 1176 1177 def tearDown(self): 1178 self._box.close() 1179 self._delete_recursively(self._path) 1180 for lock_remnant in glob.glob(self._path + '.*'): 1181 test_support.unlink(lock_remnant) 1182 1183 def test_labels(self): 1184 # Get labels from the mailbox 1185 self.assertEqual(self._box.get_labels(), []) 1186 msg0 = mailbox.BabylMessage(self._template % 0) 1187 msg0.add_label('foo') 1188 key0 = self._box.add(msg0) 1189 self.assertEqual(self._box.get_labels(), ['foo']) 1190 msg1 = mailbox.BabylMessage(self._template % 1) 1191 msg1.set_labels(['bar', 'answered', 'foo']) 1192 key1 = self._box.add(msg1) 1193 self.assertEqual(set(self._box.get_labels()), set(['foo', 'bar'])) 1194 msg0.set_labels(['blah', 'filed']) 1195 self._box[key0] = msg0 1196 self.assertEqual(set(self._box.get_labels()), 1197 set(['foo', 'bar', 'blah'])) 1198 self._box.remove(key1) 1199 self.assertEqual(set(self._box.get_labels()), set(['blah'])) 1200 1201 1202 class TestMessage(TestBase, unittest.TestCase): 1203 1204 _factory = mailbox.Message # Overridden by subclasses to reuse tests 1205 1206 def setUp(self): 1207 self._path = test_support.TESTFN 1208 1209 def tearDown(self): 1210 self._delete_recursively(self._path) 1211 1212 def test_initialize_with_eMM(self): 1213 # Initialize based on email.message.Message instance 1214 eMM = email.message_from_string(_sample_message) 1215 msg 
= self._factory(eMM) 1216 self._post_initialize_hook(msg) 1217 self._check_sample(msg) 1218 1219 def test_initialize_with_string(self): 1220 # Initialize based on string 1221 msg = self._factory(_sample_message) 1222 self._post_initialize_hook(msg) 1223 self._check_sample(msg) 1224 1225 def test_initialize_with_file(self): 1226 # Initialize based on contents of file 1227 with open(self._path, 'w+') as f: 1228 f.write(_sample_message) 1229 f.seek(0) 1230 msg = self._factory(f) 1231 self._post_initialize_hook(msg) 1232 self._check_sample(msg) 1233 1234 def test_initialize_with_nothing(self): 1235 # Initialize without arguments 1236 msg = self._factory() 1237 self._post_initialize_hook(msg) 1238 self.assertIsInstance(msg, email.message.Message) 1239 self.assertIsInstance(msg, mailbox.Message) 1240 self.assertIsInstance(msg, self._factory) 1241 self.assertEqual(msg.keys(), []) 1242 self.assertFalse(msg.is_multipart()) 1243 self.assertEqual(msg.get_payload(), None) 1244 1245 def test_initialize_incorrectly(self): 1246 # Initialize with invalid argument 1247 self.assertRaises(TypeError, lambda: self._factory(object())) 1248 1249 def test_become_message(self): 1250 # Take on the state of another message 1251 eMM = email.message_from_string(_sample_message) 1252 msg = self._factory() 1253 msg._become_message(eMM) 1254 self._check_sample(msg) 1255 1256 def test_explain_to(self): 1257 # Copy self's format-specific data to other message formats. 1258 # This test is superficial; better ones are in TestMessageConversion. 
1259 msg = self._factory() 1260 for class_ in (mailbox.Message, mailbox.MaildirMessage, 1261 mailbox.mboxMessage, mailbox.MHMessage, 1262 mailbox.BabylMessage, mailbox.MMDFMessage): 1263 other_msg = class_() 1264 msg._explain_to(other_msg) 1265 other_msg = email.message.Message() 1266 self.assertRaises(TypeError, lambda: msg._explain_to(other_msg)) 1267 1268 def _post_initialize_hook(self, msg): 1269 # Overridden by subclasses to check extra things after initialization 1270 pass 1271 1272 1273 class TestMaildirMessage(TestMessage, unittest.TestCase): 1274 1275 _factory = mailbox.MaildirMessage 1276 1277 def _post_initialize_hook(self, msg): 1278 self.assertEqual(msg._subdir, 'new') 1279 self.assertEqual(msg._info,'') 1280 1281 def test_subdir(self): 1282 # Use get_subdir() and set_subdir() 1283 msg = mailbox.MaildirMessage(_sample_message) 1284 self.assertEqual(msg.get_subdir(), 'new') 1285 msg.set_subdir('cur') 1286 self.assertEqual(msg.get_subdir(), 'cur') 1287 msg.set_subdir('new') 1288 self.assertEqual(msg.get_subdir(), 'new') 1289 self.assertRaises(ValueError, lambda: msg.set_subdir('tmp')) 1290 self.assertEqual(msg.get_subdir(), 'new') 1291 msg.set_subdir('new') 1292 self.assertEqual(msg.get_subdir(), 'new') 1293 self._check_sample(msg) 1294 1295 def test_flags(self): 1296 # Use get_flags(), set_flags(), add_flag(), remove_flag() 1297 msg = mailbox.MaildirMessage(_sample_message) 1298 self.assertEqual(msg.get_flags(), '') 1299 self.assertEqual(msg.get_subdir(), 'new') 1300 msg.set_flags('F') 1301 self.assertEqual(msg.get_subdir(), 'new') 1302 self.assertEqual(msg.get_flags(), 'F') 1303 msg.set_flags('SDTP') 1304 self.assertEqual(msg.get_flags(), 'DPST') 1305 msg.add_flag('FT') 1306 self.assertEqual(msg.get_flags(), 'DFPST') 1307 msg.remove_flag('TDRP') 1308 self.assertEqual(msg.get_flags(), 'FS') 1309 self.assertEqual(msg.get_subdir(), 'new') 1310 self._check_sample(msg) 1311 1312 def test_date(self): 1313 # Use get_date() and set_date() 1314 msg = 
mailbox.MaildirMessage(_sample_message) 1315 diff = msg.get_date() - time.time() 1316 self.assertTrue(abs(diff) < 60, diff) 1317 msg.set_date(0.0) 1318 self.assertEqual(msg.get_date(), 0.0) 1319 1320 def test_info(self): 1321 # Use get_info() and set_info() 1322 msg = mailbox.MaildirMessage(_sample_message) 1323 self.assertEqual(msg.get_info(), '') 1324 msg.set_info('1,foo=bar') 1325 self.assertEqual(msg.get_info(), '1,foo=bar') 1326 self.assertRaises(TypeError, lambda: msg.set_info(None)) 1327 self._check_sample(msg) 1328 1329 def test_info_and_flags(self): 1330 # Test interaction of info and flag methods 1331 msg = mailbox.MaildirMessage(_sample_message) 1332 self.assertEqual(msg.get_info(), '') 1333 msg.set_flags('SF') 1334 self.assertEqual(msg.get_flags(), 'FS') 1335 self.assertEqual(msg.get_info(), '2,FS') 1336 msg.set_info('1,') 1337 self.assertEqual(msg.get_flags(), '') 1338 self.assertEqual(msg.get_info(), '1,') 1339 msg.remove_flag('RPT') 1340 self.assertEqual(msg.get_flags(), '') 1341 self.assertEqual(msg.get_info(), '1,') 1342 msg.add_flag('D') 1343 self.assertEqual(msg.get_flags(), 'D') 1344 self.assertEqual(msg.get_info(), '2,D') 1345 self._check_sample(msg) 1346 1347 1348 class _TestMboxMMDFMessage: 1349 1350 _factory = mailbox._mboxMMDFMessage 1351 1352 def _post_initialize_hook(self, msg): 1353 self._check_from(msg) 1354 1355 def test_initialize_with_unixfrom(self): 1356 # Initialize with a message that already has a _unixfrom attribute 1357 msg = mailbox.Message(_sample_message) 1358 msg.set_unixfrom('From foo@bar blah') 1359 msg = mailbox.mboxMessage(msg) 1360 self.assertEqual(msg.get_from(), 'foo@bar blah') 1361 1362 def test_from(self): 1363 # Get and set "From " line 1364 msg = mailbox.mboxMessage(_sample_message) 1365 self._check_from(msg) 1366 msg.set_from('foo bar') 1367 self.assertEqual(msg.get_from(), 'foo bar') 1368 msg.set_from('foo@bar', True) 1369 self._check_from(msg, 'foo@bar') 1370 msg.set_from('blah@temp', time.localtime()) 1371 
self._check_from(msg, 'blah@temp') 1372 1373 def test_flags(self): 1374 # Use get_flags(), set_flags(), add_flag(), remove_flag() 1375 msg = mailbox.mboxMessage(_sample_message) 1376 self.assertEqual(msg.get_flags(), '') 1377 msg.set_flags('F') 1378 self.assertEqual(msg.get_flags(), 'F') 1379 msg.set_flags('XODR') 1380 self.assertEqual(msg.get_flags(), 'RODX') 1381 msg.add_flag('FA') 1382 self.assertEqual(msg.get_flags(), 'RODFAX') 1383 msg.remove_flag('FDXA') 1384 self.assertEqual(msg.get_flags(), 'RO') 1385 self._check_sample(msg) 1386 1387 def _check_from(self, msg, sender=None): 1388 # Check contents of "From " line 1389 if sender is None: 1390 sender = "MAILER-DAEMON" 1391 self.assertTrue(re.match(sender + r" \w{3} \w{3} [\d ]\d [\d ]\d:\d{2}:" 1392 r"\d{2} \d{4}", msg.get_from())) 1393 1394 1395 class TestMboxMessage(_TestMboxMMDFMessage, TestMessage): 1396 1397 _factory = mailbox.mboxMessage 1398 1399 1400 class TestMHMessage(TestMessage, unittest.TestCase): 1401 1402 _factory = mailbox.MHMessage 1403 1404 def _post_initialize_hook(self, msg): 1405 self.assertEqual(msg._sequences, []) 1406 1407 def test_sequences(self): 1408 # Get, set, join, and leave sequences 1409 msg = mailbox.MHMessage(_sample_message) 1410 self.assertEqual(msg.get_sequences(), []) 1411 msg.set_sequences(['foobar']) 1412 self.assertEqual(msg.get_sequences(), ['foobar']) 1413 msg.set_sequences([]) 1414 self.assertEqual(msg.get_sequences(), []) 1415 msg.add_sequence('unseen') 1416 self.assertEqual(msg.get_sequences(), ['unseen']) 1417 msg.add_sequence('flagged') 1418 self.assertEqual(msg.get_sequences(), ['unseen', 'flagged']) 1419 msg.add_sequence('flagged') 1420 self.assertEqual(msg.get_sequences(), ['unseen', 'flagged']) 1421 msg.remove_sequence('unseen') 1422 self.assertEqual(msg.get_sequences(), ['flagged']) 1423 msg.add_sequence('foobar') 1424 self.assertEqual(msg.get_sequences(), ['flagged', 'foobar']) 1425 msg.remove_sequence('replied') 1426 self.assertEqual(msg.get_sequences(), 
['flagged', 'foobar']) 1427 msg.set_sequences(['foobar', 'replied']) 1428 self.assertEqual(msg.get_sequences(), ['foobar', 'replied']) 1429 1430 1431 class TestBabylMessage(TestMessage, unittest.TestCase): 1432 1433 _factory = mailbox.BabylMessage 1434 1435 def _post_initialize_hook(self, msg): 1436 self.assertEqual(msg._labels, []) 1437 1438 def test_labels(self): 1439 # Get, set, join, and leave labels 1440 msg = mailbox.BabylMessage(_sample_message) 1441 self.assertEqual(msg.get_labels(), []) 1442 msg.set_labels(['foobar']) 1443 self.assertEqual(msg.get_labels(), ['foobar']) 1444 msg.set_labels([]) 1445 self.assertEqual(msg.get_labels(), []) 1446 msg.add_label('filed') 1447 self.assertEqual(msg.get_labels(), ['filed']) 1448 msg.add_label('resent') 1449 self.assertEqual(msg.get_labels(), ['filed', 'resent']) 1450 msg.add_label('resent') 1451 self.assertEqual(msg.get_labels(), ['filed', 'resent']) 1452 msg.remove_label('filed') 1453 self.assertEqual(msg.get_labels(), ['resent']) 1454 msg.add_label('foobar') 1455 self.assertEqual(msg.get_labels(), ['resent', 'foobar']) 1456 msg.remove_label('unseen') 1457 self.assertEqual(msg.get_labels(), ['resent', 'foobar']) 1458 msg.set_labels(['foobar', 'answered']) 1459 self.assertEqual(msg.get_labels(), ['foobar', 'answered']) 1460 1461 def test_visible(self): 1462 # Get, set, and update visible headers 1463 msg = mailbox.BabylMessage(_sample_message) 1464 visible = msg.get_visible() 1465 self.assertEqual(visible.keys(), []) 1466 self.assertIs(visible.get_payload(), None) 1467 visible['User-Agent'] = 'FooBar 1.0' 1468 visible['X-Whatever'] = 'Blah' 1469 self.assertEqual(msg.get_visible().keys(), []) 1470 msg.set_visible(visible) 1471 visible = msg.get_visible() 1472 self.assertEqual(visible.keys(), ['User-Agent', 'X-Whatever']) 1473 self.assertEqual(visible['User-Agent'], 'FooBar 1.0') 1474 self.assertEqual(visible['X-Whatever'], 'Blah') 1475 self.assertIs(visible.get_payload(), None) 1476 msg.update_visible() 1477 
self.assertEqual(visible.keys(), ['User-Agent', 'X-Whatever']) 1478 self.assertIs(visible.get_payload(), None) 1479 visible = msg.get_visible() 1480 self.assertEqual(visible.keys(), ['User-Agent', 'Date', 'From', 'To', 1481 'Subject']) 1482 for header in ('User-Agent', 'Date', 'From', 'To', 'Subject'): 1483 self.assertEqual(visible[header], msg[header]) 1484 1485 1486 class TestMMDFMessage(_TestMboxMMDFMessage, TestMessage): 1487 1488 _factory = mailbox.MMDFMessage 1489 1490 1491 class TestMessageConversion(TestBase, unittest.TestCase): 1492 1493 def test_plain_to_x(self): 1494 # Convert Message to all formats 1495 for class_ in (mailbox.Message, mailbox.MaildirMessage, 1496 mailbox.mboxMessage, mailbox.MHMessage, 1497 mailbox.BabylMessage, mailbox.MMDFMessage): 1498 msg_plain = mailbox.Message(_sample_message) 1499 msg = class_(msg_plain) 1500 self._check_sample(msg) 1501 1502 def test_x_to_plain(self): 1503 # Convert all formats to Message 1504 for class_ in (mailbox.Message, mailbox.MaildirMessage, 1505 mailbox.mboxMessage, mailbox.MHMessage, 1506 mailbox.BabylMessage, mailbox.MMDFMessage): 1507 msg = class_(_sample_message) 1508 msg_plain = mailbox.Message(msg) 1509 self._check_sample(msg_plain) 1510 1511 def test_x_to_invalid(self): 1512 # Convert all formats to an invalid format 1513 for class_ in (mailbox.Message, mailbox.MaildirMessage, 1514 mailbox.mboxMessage, mailbox.MHMessage, 1515 mailbox.BabylMessage, mailbox.MMDFMessage): 1516 self.assertRaises(TypeError, lambda: class_(False)) 1517 1518 def test_maildir_to_maildir(self): 1519 # Convert MaildirMessage to MaildirMessage 1520 msg_maildir = mailbox.MaildirMessage(_sample_message) 1521 msg_maildir.set_flags('DFPRST') 1522 msg_maildir.set_subdir('cur') 1523 date = msg_maildir.get_date() 1524 msg = mailbox.MaildirMessage(msg_maildir) 1525 self._check_sample(msg) 1526 self.assertEqual(msg.get_flags(), 'DFPRST') 1527 self.assertEqual(msg.get_subdir(), 'cur') 1528 self.assertEqual(msg.get_date(), date) 1529 
1530 def test_maildir_to_mboxmmdf(self): 1531 # Convert MaildirMessage to mboxmessage and MMDFMessage 1532 pairs = (('D', ''), ('F', 'F'), ('P', ''), ('R', 'A'), ('S', 'R'), 1533 ('T', 'D'), ('DFPRST', 'RDFA')) 1534 for class_ in (mailbox.mboxMessage, mailbox.MMDFMessage): 1535 msg_maildir = mailbox.MaildirMessage(_sample_message) 1536 msg_maildir.set_date(0.0) 1537 for setting, result in pairs: 1538 msg_maildir.set_flags(setting) 1539 msg = class_(msg_maildir) 1540 self.assertEqual(msg.get_flags(), result) 1541 self.assertEqual(msg.get_from(), 'MAILER-DAEMON %s' % 1542 time.asctime(time.gmtime(0.0))) 1543 msg_maildir.set_subdir('cur') 1544 self.assertEqual(class_(msg_maildir).get_flags(), 'RODFA') 1545 1546 def test_maildir_to_mh(self): 1547 # Convert MaildirMessage to MHMessage 1548 msg_maildir = mailbox.MaildirMessage(_sample_message) 1549 pairs = (('D', ['unseen']), ('F', ['unseen', 'flagged']), 1550 ('P', ['unseen']), ('R', ['unseen', 'replied']), ('S', []), 1551 ('T', ['unseen']), ('DFPRST', ['replied', 'flagged'])) 1552 for setting, result in pairs: 1553 msg_maildir.set_flags(setting) 1554 self.assertEqual(mailbox.MHMessage(msg_maildir).get_sequences(), 1555 result) 1556 1557 def test_maildir_to_babyl(self): 1558 # Convert MaildirMessage to Babyl 1559 msg_maildir = mailbox.MaildirMessage(_sample_message) 1560 pairs = (('D', ['unseen']), ('F', ['unseen']), 1561 ('P', ['unseen', 'forwarded']), ('R', ['unseen', 'answered']), 1562 ('S', []), ('T', ['unseen', 'deleted']), 1563 ('DFPRST', ['deleted', 'answered', 'forwarded'])) 1564 for setting, result in pairs: 1565 msg_maildir.set_flags(setting) 1566 self.assertEqual(mailbox.BabylMessage(msg_maildir).get_labels(), 1567 result) 1568 1569 def test_mboxmmdf_to_maildir(self): 1570 # Convert mboxMessage and MMDFMessage to MaildirMessage 1571 for class_ in (mailbox.mboxMessage, mailbox.MMDFMessage): 1572 msg_mboxMMDF = class_(_sample_message) 1573 msg_mboxMMDF.set_from('foo@bar', time.gmtime(0.0)) 1574 pairs = (('R', 
'S'), ('O', ''), ('D', 'T'), ('F', 'F'), ('A', 'R'), 1575 ('RODFA', 'FRST')) 1576 for setting, result in pairs: 1577 msg_mboxMMDF.set_flags(setting) 1578 msg = mailbox.MaildirMessage(msg_mboxMMDF) 1579 self.assertEqual(msg.get_flags(), result) 1580 self.assertEqual(msg.get_date(), 0.0) 1581 msg_mboxMMDF.set_flags('O') 1582 self.assertEqual(mailbox.MaildirMessage(msg_mboxMMDF).get_subdir(), 1583 'cur') 1584 1585 def test_mboxmmdf_to_mboxmmdf(self): 1586 # Convert mboxMessage and MMDFMessage to mboxMessage and MMDFMessage 1587 for class_ in (mailbox.mboxMessage, mailbox.MMDFMessage): 1588 msg_mboxMMDF = class_(_sample_message) 1589 msg_mboxMMDF.set_flags('RODFA') 1590 msg_mboxMMDF.set_from('foo@bar') 1591 for class2_ in (mailbox.mboxMessage, mailbox.MMDFMessage): 1592 msg2 = class2_(msg_mboxMMDF) 1593 self.assertEqual(msg2.get_flags(), 'RODFA') 1594 self.assertEqual(msg2.get_from(), 'foo@bar') 1595 1596 def test_mboxmmdf_to_mh(self): 1597 # Convert mboxMessage and MMDFMessage to MHMessage 1598 for class_ in (mailbox.mboxMessage, mailbox.MMDFMessage): 1599 msg_mboxMMDF = class_(_sample_message) 1600 pairs = (('R', []), ('O', ['unseen']), ('D', ['unseen']), 1601 ('F', ['unseen', 'flagged']), 1602 ('A', ['unseen', 'replied']), 1603 ('RODFA', ['replied', 'flagged'])) 1604 for setting, result in pairs: 1605 msg_mboxMMDF.set_flags(setting) 1606 self.assertEqual(mailbox.MHMessage(msg_mboxMMDF).get_sequences(), 1607 result) 1608 1609 def test_mboxmmdf_to_babyl(self): 1610 # Convert mboxMessage and MMDFMessage to BabylMessage 1611 for class_ in (mailbox.mboxMessage, mailbox.MMDFMessage): 1612 msg = class_(_sample_message) 1613 pairs = (('R', []), ('O', ['unseen']), 1614 ('D', ['unseen', 'deleted']), ('F', ['unseen']), 1615 ('A', ['unseen', 'answered']), 1616 ('RODFA', ['deleted', 'answered'])) 1617 for setting, result in pairs: 1618 msg.set_flags(setting) 1619 self.assertEqual(mailbox.BabylMessage(msg).get_labels(), result) 1620 1621 def test_mh_to_maildir(self): 1622 # 
Convert MHMessage to MaildirMessage 1623 pairs = (('unseen', ''), ('replied', 'RS'), ('flagged', 'FS')) 1624 for setting, result in pairs: 1625 msg = mailbox.MHMessage(_sample_message) 1626 msg.add_sequence(setting) 1627 self.assertEqual(mailbox.MaildirMessage(msg).get_flags(), result) 1628 self.assertEqual(mailbox.MaildirMessage(msg).get_subdir(), 'cur') 1629 msg = mailbox.MHMessage(_sample_message) 1630 msg.add_sequence('unseen') 1631 msg.add_sequence('replied') 1632 msg.add_sequence('flagged') 1633 self.assertEqual(mailbox.MaildirMessage(msg).get_flags(), 'FR') 1634 self.assertEqual(mailbox.MaildirMessage(msg).get_subdir(), 'cur') 1635 1636 def test_mh_to_mboxmmdf(self): 1637 # Convert MHMessage to mboxMessage and MMDFMessage 1638 pairs = (('unseen', 'O'), ('replied', 'ROA'), ('flagged', 'ROF')) 1639 for setting, result in pairs: 1640 msg = mailbox.MHMessage(_sample_message) 1641 msg.add_sequence(setting) 1642 for class_ in (mailbox.mboxMessage, mailbox.MMDFMessage): 1643 self.assertEqual(class_(msg).get_flags(), result) 1644 msg = mailbox.MHMessage(_sample_message) 1645 msg.add_sequence('unseen') 1646 msg.add_sequence('replied') 1647 msg.add_sequence('flagged') 1648 for class_ in (mailbox.mboxMessage, mailbox.MMDFMessage): 1649 self.assertEqual(class_(msg).get_flags(), 'OFA') 1650 1651 def test_mh_to_mh(self): 1652 # Convert MHMessage to MHMessage 1653 msg = mailbox.MHMessage(_sample_message) 1654 msg.add_sequence('unseen') 1655 msg.add_sequence('replied') 1656 msg.add_sequence('flagged') 1657 self.assertEqual(mailbox.MHMessage(msg).get_sequences(), 1658 ['unseen', 'replied', 'flagged']) 1659 1660 def test_mh_to_babyl(self): 1661 # Convert MHMessage to BabylMessage 1662 pairs = (('unseen', ['unseen']), ('replied', ['answered']), 1663 ('flagged', [])) 1664 for setting, result in pairs: 1665 msg = mailbox.MHMessage(_sample_message) 1666 msg.add_sequence(setting) 1667 self.assertEqual(mailbox.BabylMessage(msg).get_labels(), result) 1668 msg = 
mailbox.MHMessage(_sample_message) 1669 msg.add_sequence('unseen') 1670 msg.add_sequence('replied') 1671 msg.add_sequence('flagged') 1672 self.assertEqual(mailbox.BabylMessage(msg).get_labels(), 1673 ['unseen', 'answered']) 1674 1675 def test_babyl_to_maildir(self): 1676 # Convert BabylMessage to MaildirMessage 1677 pairs = (('unseen', ''), ('deleted', 'ST'), ('filed', 'S'), 1678 ('answered', 'RS'), ('forwarded', 'PS'), ('edited', 'S'), 1679 ('resent', 'PS')) 1680 for setting, result in pairs: 1681 msg = mailbox.BabylMessage(_sample_message) 1682 msg.add_label(setting) 1683 self.assertEqual(mailbox.MaildirMessage(msg).get_flags(), result) 1684 self.assertEqual(mailbox.MaildirMessage(msg).get_subdir(), 'cur') 1685 msg = mailbox.BabylMessage(_sample_message) 1686 for label in ('unseen', 'deleted', 'filed', 'answered', 'forwarded', 1687 'edited', 'resent'): 1688 msg.add_label(label) 1689 self.assertEqual(mailbox.MaildirMessage(msg).get_flags(), 'PRT') 1690 self.assertEqual(mailbox.MaildirMessage(msg).get_subdir(), 'cur') 1691 1692 def test_babyl_to_mboxmmdf(self): 1693 # Convert BabylMessage to mboxMessage and MMDFMessage 1694 pairs = (('unseen', 'O'), ('deleted', 'ROD'), ('filed', 'RO'), 1695 ('answered', 'ROA'), ('forwarded', 'RO'), ('edited', 'RO'), 1696 ('resent', 'RO')) 1697 for setting, result in pairs: 1698 for class_ in (mailbox.mboxMessage, mailbox.MMDFMessage): 1699 msg = mailbox.BabylMessage(_sample_message) 1700 msg.add_label(setting) 1701 self.assertEqual(class_(msg).get_flags(), result) 1702 msg = mailbox.BabylMessage(_sample_message) 1703 for label in ('unseen', 'deleted', 'filed', 'answered', 'forwarded', 1704 'edited', 'resent'): 1705 msg.add_label(label) 1706 for class_ in (mailbox.mboxMessage, mailbox.MMDFMessage): 1707 self.assertEqual(class_(msg).get_flags(), 'ODA') 1708 1709 def test_babyl_to_mh(self): 1710 # Convert BabylMessage to MHMessage 1711 pairs = (('unseen', ['unseen']), ('deleted', []), ('filed', []), 1712 ('answered', ['replied']), 
('forwarded', []), ('edited', []), 1713 ('resent', [])) 1714 for setting, result in pairs: 1715 msg = mailbox.BabylMessage(_sample_message) 1716 msg.add_label(setting) 1717 self.assertEqual(mailbox.MHMessage(msg).get_sequences(), result) 1718 msg = mailbox.BabylMessage(_sample_message) 1719 for label in ('unseen', 'deleted', 'filed', 'answered', 'forwarded', 1720 'edited', 'resent'): 1721 msg.add_label(label) 1722 self.assertEqual(mailbox.MHMessage(msg).get_sequences(), 1723 ['unseen', 'replied']) 1724 1725 def test_babyl_to_babyl(self): 1726 # Convert BabylMessage to BabylMessage 1727 msg = mailbox.BabylMessage(_sample_message) 1728 msg.update_visible() 1729 for label in ('unseen', 'deleted', 'filed', 'answered', 'forwarded', 1730 'edited', 'resent'): 1731 msg.add_label(label) 1732 msg2 = mailbox.BabylMessage(msg) 1733 self.assertEqual(msg2.get_labels(), ['unseen', 'deleted', 'filed', 1734 'answered', 'forwarded', 'edited', 1735 'resent']) 1736 self.assertEqual(msg.get_visible().keys(), msg2.get_visible().keys()) 1737 for key in msg.get_visible().keys(): 1738 self.assertEqual(msg.get_visible()[key], msg2.get_visible()[key]) 1739 1740 1741 class TestProxyFileBase(TestBase): 1742 1743 def _test_read(self, proxy): 1744 # Read by byte 1745 proxy.seek(0) 1746 self.assertEqual(proxy.read(), 'bar') 1747 proxy.seek(1) 1748 self.assertEqual(proxy.read(), 'ar') 1749 proxy.seek(0) 1750 self.assertEqual(proxy.read(2), 'ba') 1751 proxy.seek(1) 1752 self.assertEqual(proxy.read(-1), 'ar') 1753 proxy.seek(2) 1754 self.assertEqual(proxy.read(1000), 'r') 1755 1756 def _test_readline(self, proxy): 1757 # Read by line 1758 proxy.seek(0) 1759 self.assertEqual(proxy.readline(), 'foo' + os.linesep) 1760 self.assertEqual(proxy.readline(), 'bar' + os.linesep) 1761 self.assertEqual(proxy.readline(), 'fred' + os.linesep) 1762 self.assertEqual(proxy.readline(), 'bob') 1763 proxy.seek(2) 1764 self.assertEqual(proxy.readline(), 'o' + os.linesep) 1765 proxy.seek(6 + 2 * len(os.linesep)) 1766 
self.assertEqual(proxy.readline(), 'fred' + os.linesep) 1767 proxy.seek(6 + 2 * len(os.linesep)) 1768 self.assertEqual(proxy.readline(2), 'fr') 1769 self.assertEqual(proxy.readline(-10), 'ed' + os.linesep) 1770 1771 def _test_readlines(self, proxy): 1772 # Read multiple lines 1773 proxy.seek(0) 1774 self.assertEqual(proxy.readlines(), ['foo' + os.linesep, 1775 'bar' + os.linesep, 1776 'fred' + os.linesep, 'bob']) 1777 proxy.seek(0) 1778 self.assertEqual(proxy.readlines(2), ['foo' + os.linesep]) 1779 proxy.seek(3 + len(os.linesep)) 1780 self.assertEqual(proxy.readlines(4 + len(os.linesep)), 1781 ['bar' + os.linesep, 'fred' + os.linesep]) 1782 proxy.seek(3) 1783 self.assertEqual(proxy.readlines(1000), [os.linesep, 'bar' + os.linesep, 1784 'fred' + os.linesep, 'bob']) 1785 1786 def _test_iteration(self, proxy): 1787 # Iterate by line 1788 proxy.seek(0) 1789 iterator = iter(proxy) 1790 self.assertEqual(list(iterator), 1791 ['foo' + os.linesep, 'bar' + os.linesep, 'fred' + os.linesep, 'bob']) 1792 1793 def _test_seek_and_tell(self, proxy): 1794 # Seek and use tell to check position 1795 proxy.seek(3) 1796 self.assertEqual(proxy.tell(), 3) 1797 self.assertEqual(proxy.read(len(os.linesep)), os.linesep) 1798 proxy.seek(2, 1) 1799 self.assertEqual(proxy.read(1 + len(os.linesep)), 'r' + os.linesep) 1800 proxy.seek(-3 - len(os.linesep), 2) 1801 self.assertEqual(proxy.read(3), 'bar') 1802 proxy.seek(2, 0) 1803 self.assertEqual(proxy.read(), 'o' + os.linesep + 'bar' + os.linesep) 1804 proxy.seek(100) 1805 self.assertEqual(proxy.read(), '') 1806 1807 def _test_close(self, proxy): 1808 # Close a file 1809 proxy.close() 1810 # Issue 11700 subsequent closes should be a no-op, not an error. 
1811 proxy.close() 1812 1813 1814 class TestProxyFile(TestProxyFileBase, unittest.TestCase): 1815 1816 def setUp(self): 1817 self._path = test_support.TESTFN 1818 self._file = open(self._path, 'wb+') 1819 1820 def tearDown(self): 1821 self._file.close() 1822 self._delete_recursively(self._path) 1823 1824 def test_initialize(self): 1825 # Initialize and check position 1826 self._file.write('foo') 1827 pos = self._file.tell() 1828 proxy0 = mailbox._ProxyFile(self._file) 1829 self.assertEqual(proxy0.tell(), pos) 1830 self.assertEqual(self._file.tell(), pos) 1831 proxy1 = mailbox._ProxyFile(self._file, 0) 1832 self.assertEqual(proxy1.tell(), 0) 1833 self.assertEqual(self._file.tell(), pos) 1834 1835 def test_read(self): 1836 self._file.write('bar') 1837 self._test_read(mailbox._ProxyFile(self._file)) 1838 1839 def test_readline(self): 1840 self._file.write('foo%sbar%sfred%sbob' % (os.linesep, os.linesep, 1841 os.linesep)) 1842 self._test_readline(mailbox._ProxyFile(self._file)) 1843 1844 def test_readlines(self): 1845 self._file.write('foo%sbar%sfred%sbob' % (os.linesep, os.linesep, 1846 os.linesep)) 1847 self._test_readlines(mailbox._ProxyFile(self._file)) 1848 1849 def test_iteration(self): 1850 self._file.write('foo%sbar%sfred%sbob' % (os.linesep, os.linesep, 1851 os.linesep)) 1852 self._test_iteration(mailbox._ProxyFile(self._file)) 1853 1854 def test_seek_and_tell(self): 1855 self._file.write('foo%sbar%s' % (os.linesep, os.linesep)) 1856 self._test_seek_and_tell(mailbox._ProxyFile(self._file)) 1857 1858 def test_close(self): 1859 self._file.write('foo%sbar%s' % (os.linesep, os.linesep)) 1860 self._test_close(mailbox._ProxyFile(self._file)) 1861 1862 1863 class TestPartialFile(TestProxyFileBase, unittest.TestCase): 1864 1865 def setUp(self): 1866 self._path = test_support.TESTFN 1867 self._file = open(self._path, 'wb+') 1868 1869 def tearDown(self): 1870 self._file.close() 1871 self._delete_recursively(self._path) 1872 1873 def test_initialize(self): 1874 # 
Initialize and check position 1875 self._file.write('foo' + os.linesep + 'bar') 1876 pos = self._file.tell() 1877 proxy = mailbox._PartialFile(self._file, 2, 5) 1878 self.assertEqual(proxy.tell(), 0) 1879 self.assertEqual(self._file.tell(), pos) 1880 1881 def test_read(self): 1882 self._file.write('***bar***') 1883 self._test_read(mailbox._PartialFile(self._file, 3, 6)) 1884 1885 def test_readline(self): 1886 self._file.write('!!!!!foo%sbar%sfred%sbob!!!!!' % 1887 (os.linesep, os.linesep, os.linesep)) 1888 self._test_readline(mailbox._PartialFile(self._file, 5, 1889 18 + 3 * len(os.linesep))) 1890 1891 def test_readlines(self): 1892 self._file.write('foo%sbar%sfred%sbob?????' % 1893 (os.linesep, os.linesep, os.linesep)) 1894 self._test_readlines(mailbox._PartialFile(self._file, 0, 1895 13 + 3 * len(os.linesep))) 1896 1897 def test_iteration(self): 1898 self._file.write('____foo%sbar%sfred%sbob####' % 1899 (os.linesep, os.linesep, os.linesep)) 1900 self._test_iteration(mailbox._PartialFile(self._file, 4, 1901 17 + 3 * len(os.linesep))) 1902 1903 def test_seek_and_tell(self): 1904 self._file.write('(((foo%sbar%s$$$' % (os.linesep, os.linesep)) 1905 self._test_seek_and_tell(mailbox._PartialFile(self._file, 3, 1906 9 + 2 * len(os.linesep))) 1907 1908 def test_close(self): 1909 self._file.write('&foo%sbar%s^' % (os.linesep, os.linesep)) 1910 self._test_close(mailbox._PartialFile(self._file, 1, 1911 6 + 3 * len(os.linesep))) 1912 1913 1914 ## Start: tests from the original module (for backward compatibility). 1915 1916 FROM_ = "From some.body (at] dummy.domain Sat Jul 24 13:43:35 2004\n" 1917 DUMMY_MESSAGE = """\ 1918 From: some.body (at] dummy.domain 1919 To: me (at] my.domain 1920 Subject: Simple Test 1921 1922 This is a dummy message. 
1923 """ 1924 1925 class MaildirTestCase(unittest.TestCase): 1926 1927 def setUp(self): 1928 # create a new maildir mailbox to work with: 1929 self._dir = test_support.TESTFN 1930 if os.path.isdir(self._dir): 1931 test_support.rmtree(self._dir) 1932 if os.path.isfile(self._dir): 1933 test_support.unlink(self._dir) 1934 os.mkdir(self._dir) 1935 os.mkdir(os.path.join(self._dir, "cur")) 1936 os.mkdir(os.path.join(self._dir, "tmp")) 1937 os.mkdir(os.path.join(self._dir, "new")) 1938 self._counter = 1 1939 self._msgfiles = [] 1940 1941 def tearDown(self): 1942 map(os.unlink, self._msgfiles) 1943 test_support.rmdir(os.path.join(self._dir, "cur")) 1944 test_support.rmdir(os.path.join(self._dir, "tmp")) 1945 test_support.rmdir(os.path.join(self._dir, "new")) 1946 test_support.rmdir(self._dir) 1947 1948 def createMessage(self, dir, mbox=False): 1949 t = int(time.time() % 1000000) 1950 pid = self._counter 1951 self._counter += 1 1952 filename = os.extsep.join((str(t), str(pid), "myhostname", "mydomain")) 1953 tmpname = os.path.join(self._dir, "tmp", filename) 1954 newname = os.path.join(self._dir, dir, filename) 1955 with open(tmpname, "w") as fp: 1956 self._msgfiles.append(tmpname) 1957 if mbox: 1958 fp.write(FROM_) 1959 fp.write(DUMMY_MESSAGE) 1960 if hasattr(os, "link"): 1961 os.link(tmpname, newname) 1962 else: 1963 with open(newname, "w") as fp: 1964 fp.write(DUMMY_MESSAGE) 1965 self._msgfiles.append(newname) 1966 return tmpname 1967 1968 def test_empty_maildir(self): 1969 """Test an empty maildir mailbox""" 1970 # Test for regression on bug #117490: 1971 # Make sure the boxes attribute actually gets set. 
1972 self.mbox = mailbox.Maildir(test_support.TESTFN) 1973 #self.assertTrue(hasattr(self.mbox, "boxes")) 1974 #self.assertTrue(len(self.mbox.boxes) == 0) 1975 self.assertIs(self.mbox.next(), None) 1976 self.assertIs(self.mbox.next(), None) 1977 1978 def test_nonempty_maildir_cur(self): 1979 self.createMessage("cur") 1980 self.mbox = mailbox.Maildir(test_support.TESTFN) 1981 #self.assertTrue(len(self.mbox.boxes) == 1) 1982 msg = self.mbox.next() 1983 self.assertIsNot(msg, None) 1984 msg.fp.close() 1985 self.assertIs(self.mbox.next(), None) 1986 self.assertIs(self.mbox.next(), None) 1987 1988 def test_nonempty_maildir_new(self): 1989 self.createMessage("new") 1990 self.mbox = mailbox.Maildir(test_support.TESTFN) 1991 #self.assertTrue(len(self.mbox.boxes) == 1) 1992 msg = self.mbox.next() 1993 self.assertIsNot(msg, None) 1994 msg.fp.close() 1995 self.assertIs(self.mbox.next(), None) 1996 self.assertIs(self.mbox.next(), None) 1997 1998 def test_nonempty_maildir_both(self): 1999 self.createMessage("cur") 2000 self.createMessage("new") 2001 self.mbox = mailbox.Maildir(test_support.TESTFN) 2002 #self.assertTrue(len(self.mbox.boxes) == 2) 2003 msg = self.mbox.next() 2004 self.assertIsNot(msg, None) 2005 msg.fp.close() 2006 msg = self.mbox.next() 2007 self.assertIsNot(msg, None) 2008 msg.fp.close() 2009 self.assertIs(self.mbox.next(), None) 2010 self.assertIs(self.mbox.next(), None) 2011 2012 def test_unix_mbox(self): 2013 ### should be better! 2014 import email.parser 2015 fname = self.createMessage("cur", True) 2016 n = 0 2017 fid = open(fname) 2018 for msg in mailbox.PortableUnixMailbox(fid, 2019 email.parser.Parser().parse): 2020 n += 1 2021 self.assertEqual(msg["subject"], "Simple Test") 2022 self.assertEqual(len(str(msg)), len(FROM_)+len(DUMMY_MESSAGE)) 2023 fid.close() 2024 self.assertEqual(n, 1) 2025 2026 ## End: classes from the original module (for backward compatibility). 
# Multipart sample message used as a fixture throughout these tests.
# TestBase._check_sample() compares a parsed copy of it against
# _sample_headers and _sample_payloads below, so the three literals must be
# kept in sync.
_sample_message = """\
Return-Path: <gkj (at] gregorykjohnson.com>
X-Original-To: gkj+person@localhost
Delivered-To: gkj+person@localhost
Received: from localhost (localhost [127.0.0.1])
        by andy.gregorykjohnson.com (Postfix) with ESMTP id 356ED9DD17
        for <gkj+person@localhost>; Wed, 13 Jul 2005 17:23:16 -0400 (EDT)
Delivered-To: gkj (at] sundance.gregorykjohnson.com
Received: from localhost [127.0.0.1]
        by localhost with POP3 (fetchmail-6.2.5)
        for gkj+person@localhost (single-drop); Wed, 13 Jul 2005 17:23:16 -0400 (EDT)
Received: from andy.gregorykjohnson.com (andy.gregorykjohnson.com [64.32.235.228])
        by sundance.gregorykjohnson.com (Postfix) with ESMTP id 5B056316746
        for <gkj (at] gregorykjohnson.com>; Wed, 13 Jul 2005 17:23:11 -0400 (EDT)
Received: by andy.gregorykjohnson.com (Postfix, from userid 1000)
        id 490CD9DD17; Wed, 13 Jul 2005 17:23:11 -0400 (EDT)
Date: Wed, 13 Jul 2005 17:23:11 -0400
From: "Gregory K. Johnson" <gkj (at] gregorykjohnson.com>
To: gkj (at] gregorykjohnson.com
Subject: Sample message
Message-ID: <20050713212311.GC4701 (at] andy.gregorykjohnson.com>
Mime-Version: 1.0
Content-Type: multipart/mixed; boundary="NMuMz9nt05w80d4+"
Content-Disposition: inline
User-Agent: Mutt/1.5.9i


--NMuMz9nt05w80d4+
Content-Type: text/plain; charset=us-ascii
Content-Disposition: inline

This is a sample message.

--
Gregory K. Johnson

--NMuMz9nt05w80d4+
Content-Type: application/octet-stream
Content-Disposition: attachment; filename="text.gz"
Content-Transfer-Encoding: base64

H4sICM2D1UIAA3RleHQAC8nILFYAokSFktSKEoW0zJxUPa7wzJIMhZLyfIWczLzUYj0uAHTs
3FYlAAAA

--NMuMz9nt05w80d4+--
"""

# Expected header values for _sample_message.
# NOTE: this dict literal repeats the "Delivered-To" and "Received" keys.
# Only the last value given for a repeated key is kept, so the earlier
# entries for those keys are not checked by _check_sample().
_sample_headers = {
    "Return-Path":"<gkj (at] gregorykjohnson.com>",
    "X-Original-To":"gkj+person@localhost",
    "Delivered-To":"gkj+person@localhost",
    "Received":"""from localhost (localhost [127.0.0.1])
        by andy.gregorykjohnson.com (Postfix) with ESMTP id 356ED9DD17
        for <gkj+person@localhost>; Wed, 13 Jul 2005 17:23:16 -0400 (EDT)""",
    "Delivered-To":"gkj (at] sundance.gregorykjohnson.com",
    "Received":"""from localhost [127.0.0.1]
        by localhost with POP3 (fetchmail-6.2.5)
        for gkj+person@localhost (single-drop); Wed, 13 Jul 2005 17:23:16 -0400 (EDT)""",
    "Received":"""from andy.gregorykjohnson.com (andy.gregorykjohnson.com [64.32.235.228])
        by sundance.gregorykjohnson.com (Postfix) with ESMTP id 5B056316746
        for <gkj (at] gregorykjohnson.com>; Wed, 13 Jul 2005 17:23:11 -0400 (EDT)""",
    "Received":"""by andy.gregorykjohnson.com (Postfix, from userid 1000)
        id 490CD9DD17; Wed, 13 Jul 2005 17:23:11 -0400 (EDT)""",
    "Date":"Wed, 13 Jul 2005 17:23:11 -0400",
    "From":""""Gregory K. Johnson" <gkj (at] gregorykjohnson.com>""",
    "To":"gkj (at] gregorykjohnson.com",
    "Subject":"Sample message",
    "Mime-Version":"1.0",
    "Content-Type":"""multipart/mixed; boundary="NMuMz9nt05w80d4+\"""",
    "Content-Disposition":"inline",
    "User-Agent": "Mutt/1.5.9i" }

# Expected payload of each part of _sample_message, in order.
_sample_payloads = ("""This is a sample message.

--
Gregory K. Johnson
""",
"""H4sICM2D1UIAA3RleHQAC8nILFYAokSFktSKEoW0zJxUPa7wzJIMhZLyfIWczLzUYj0uAHTs
3FYlAAAA
""")


def test_main():
    """Run every test case class listed here, then reap child processes."""
    tests = (TestMailboxSuperclass, TestMaildir, TestMbox, TestMMDF, TestMH,
             TestBabyl, TestMessage, TestMaildirMessage, TestMboxMessage,
             TestMHMessage, TestBabylMessage, TestMMDFMessage,
             TestMessageConversion, TestProxyFile, TestPartialFile,
             MaildirTestCase)
    test_support.run_unittest(*tests)
    test_support.reap_children()


if __name__ == '__main__':
    test_main()