Home | History | Annotate | Download | only in test
      1 import os
      2 import sys
      3 import time
      4 import stat
      5 import socket
      6 import email
      7 import email.message
      8 import re
      9 import StringIO
     10 from test import test_support
     11 import unittest
     12 import mailbox
     13 import glob
     14 try:
     15     import fcntl
     16 except ImportError:
     17     pass
     18 
     19 # Silence Py3k warning

     20 rfc822 = test_support.import_module('rfc822', deprecated=True)
     21 
     22 class TestBase(unittest.TestCase):
     23 
     24     def _check_sample(self, msg):
     25         # Inspect a mailbox.Message representation of the sample message

     26         self.assertIsInstance(msg, email.message.Message)
     27         self.assertIsInstance(msg, mailbox.Message)
     28         for key, value in _sample_headers.iteritems():
     29             self.assertIn(value, msg.get_all(key))
     30         self.assertTrue(msg.is_multipart())
     31         self.assertEqual(len(msg.get_payload()), len(_sample_payloads))
     32         for i, payload in enumerate(_sample_payloads):
     33             part = msg.get_payload(i)
     34             self.assertIsInstance(part, email.message.Message)
     35             self.assertNotIsInstance(part, mailbox.Message)
     36             self.assertEqual(part.get_payload(), payload)
     37 
     38     def _delete_recursively(self, target):
     39         # Delete a file or delete a directory recursively

     40         if os.path.isdir(target):
     41             for path, dirs, files in os.walk(target, topdown=False):
     42                 for name in files:
     43                     os.remove(os.path.join(path, name))
     44                 for name in dirs:
     45                     os.rmdir(os.path.join(path, name))
     46             os.rmdir(target)
     47         elif os.path.exists(target):
     48             os.remove(target)
     49 
     50 
     51 class TestMailbox(TestBase):
     52 
     53     _factory = None     # Overridden by subclasses to reuse tests

     54     _template = 'From: foo\n\n%s'
     55 
     56     def setUp(self):
     57         self._path = test_support.TESTFN
     58         self._delete_recursively(self._path)
     59         self._box = self._factory(self._path)
     60 
     61     def tearDown(self):
     62         self._box.close()
     63         self._delete_recursively(self._path)
     64 
     65     def test_add(self):
     66         # Add copies of a sample message

     67         keys = []
     68         keys.append(self._box.add(self._template % 0))
     69         self.assertEqual(len(self._box), 1)
     70         keys.append(self._box.add(mailbox.Message(_sample_message)))
     71         self.assertEqual(len(self._box), 2)
     72         keys.append(self._box.add(email.message_from_string(_sample_message)))
     73         self.assertEqual(len(self._box), 3)
     74         keys.append(self._box.add(StringIO.StringIO(_sample_message)))
     75         self.assertEqual(len(self._box), 4)
     76         keys.append(self._box.add(_sample_message))
     77         self.assertEqual(len(self._box), 5)
     78         self.assertEqual(self._box.get_string(keys[0]), self._template % 0)
     79         for i in (1, 2, 3, 4):
     80             self._check_sample(self._box[keys[i]])
     81 
     82     def test_remove(self):
     83         # Remove messages using remove()

     84         self._test_remove_or_delitem(self._box.remove)
     85 
     86     def test_delitem(self):
     87         # Remove messages using __delitem__()

     88         self._test_remove_or_delitem(self._box.__delitem__)
     89 
     90     def _test_remove_or_delitem(self, method):
     91         # (Used by test_remove() and test_delitem().)

     92         key0 = self._box.add(self._template % 0)
     93         key1 = self._box.add(self._template % 1)
     94         self.assertEqual(len(self._box), 2)
     95         method(key0)
     96         l = len(self._box)
     97         self.assertEqual(l, 1)
     98         self.assertRaises(KeyError, lambda: self._box[key0])
     99         self.assertRaises(KeyError, lambda: method(key0))
    100         self.assertEqual(self._box.get_string(key1), self._template % 1)
    101         key2 = self._box.add(self._template % 2)
    102         self.assertEqual(len(self._box), 2)
    103         method(key2)
    104         l = len(self._box)
    105         self.assertEqual(l, 1)
    106         self.assertRaises(KeyError, lambda: self._box[key2])
    107         self.assertRaises(KeyError, lambda: method(key2))
    108         self.assertEqual(self._box.get_string(key1), self._template % 1)
    109         method(key1)
    110         self.assertEqual(len(self._box), 0)
    111         self.assertRaises(KeyError, lambda: self._box[key1])
    112         self.assertRaises(KeyError, lambda: method(key1))
    113 
    114     def test_discard(self, repetitions=10):
    115         # Discard messages

    116         key0 = self._box.add(self._template % 0)
    117         key1 = self._box.add(self._template % 1)
    118         self.assertEqual(len(self._box), 2)
    119         self._box.discard(key0)
    120         self.assertEqual(len(self._box), 1)
    121         self.assertRaises(KeyError, lambda: self._box[key0])
    122         self._box.discard(key0)
    123         self.assertEqual(len(self._box), 1)
    124         self.assertRaises(KeyError, lambda: self._box[key0])
    125 
    126     def test_get(self):
    127         # Retrieve messages using get()

    128         key0 = self._box.add(self._template % 0)
    129         msg = self._box.get(key0)
    130         self.assertEqual(msg['from'], 'foo')
    131         self.assertEqual(msg.get_payload(), '0')
    132         self.assertIs(self._box.get('foo'), None)
    133         self.assertFalse(self._box.get('foo', False))
    134         self._box.close()
    135         self._box = self._factory(self._path, factory=rfc822.Message)
    136         key1 = self._box.add(self._template % 1)
    137         msg = self._box.get(key1)
    138         self.assertEqual(msg['from'], 'foo')
    139         self.assertEqual(msg.fp.read(), '1')
    140 
    141     def test_getitem(self):
    142         # Retrieve message using __getitem__()

    143         key0 = self._box.add(self._template % 0)
    144         msg = self._box[key0]
    145         self.assertEqual(msg['from'], 'foo')
    146         self.assertEqual(msg.get_payload(), '0')
    147         self.assertRaises(KeyError, lambda: self._box['foo'])
    148         self._box.discard(key0)
    149         self.assertRaises(KeyError, lambda: self._box[key0])
    150 
    151     def test_get_message(self):
    152         # Get Message representations of messages

    153         key0 = self._box.add(self._template % 0)
    154         key1 = self._box.add(_sample_message)
    155         msg0 = self._box.get_message(key0)
    156         self.assertIsInstance(msg0, mailbox.Message)
    157         self.assertEqual(msg0['from'], 'foo')
    158         self.assertEqual(msg0.get_payload(), '0')
    159         self._check_sample(self._box.get_message(key1))
    160 
    161     def test_get_string(self):
    162         # Get string representations of messages

    163         key0 = self._box.add(self._template % 0)
    164         key1 = self._box.add(_sample_message)
    165         self.assertEqual(self._box.get_string(key0), self._template % 0)
    166         self.assertEqual(self._box.get_string(key1), _sample_message)
    167 
    168     def test_get_file(self):
    169         # Get file representations of messages

    170         key0 = self._box.add(self._template % 0)
    171         key1 = self._box.add(_sample_message)
    172         self.assertEqual(self._box.get_file(key0).read().replace(os.linesep, '\n'),
    173                          self._template % 0)
    174         self.assertEqual(self._box.get_file(key1).read().replace(os.linesep, '\n'),
    175                          _sample_message)
    176 
    177     def test_iterkeys(self):
    178         # Get keys using iterkeys()

    179         self._check_iteration(self._box.iterkeys, do_keys=True, do_values=False)
    180 
    181     def test_keys(self):
    182         # Get keys using keys()

    183         self._check_iteration(self._box.keys, do_keys=True, do_values=False)
    184 
    185     def test_itervalues(self):
    186         # Get values using itervalues()

    187         self._check_iteration(self._box.itervalues, do_keys=False,
    188                               do_values=True)
    189 
    190     def test_iter(self):
    191         # Get values using __iter__()

    192         self._check_iteration(self._box.__iter__, do_keys=False,
    193                               do_values=True)
    194 
    195     def test_values(self):
    196         # Get values using values()

    197         self._check_iteration(self._box.values, do_keys=False, do_values=True)
    198 
    199     def test_iteritems(self):
    200         # Get keys and values using iteritems()

    201         self._check_iteration(self._box.iteritems, do_keys=True,
    202                               do_values=True)
    203 
    204     def test_items(self):
    205         # Get keys and values using items()

    206         self._check_iteration(self._box.items, do_keys=True, do_values=True)
    207 
    208     def _check_iteration(self, method, do_keys, do_values, repetitions=10):
    209         for value in method():
    210             self.fail("Not empty")
    211         keys, values = [], []
    212         for i in xrange(repetitions):
    213             keys.append(self._box.add(self._template % i))
    214             values.append(self._template % i)
    215         if do_keys and not do_values:
    216             returned_keys = list(method())
    217         elif do_values and not do_keys:
    218             returned_values = list(method())
    219         else:
    220             returned_keys, returned_values = [], []
    221             for key, value in method():
    222                 returned_keys.append(key)
    223                 returned_values.append(value)
    224         if do_keys:
    225             self.assertEqual(len(keys), len(returned_keys))
    226             self.assertEqual(set(keys), set(returned_keys))
    227         if do_values:
    228             count = 0
    229             for value in returned_values:
    230                 self.assertEqual(value['from'], 'foo')
    231                 self.assertTrue(int(value.get_payload()) < repetitions,
    232                                 (value.get_payload(), repetitions))
    233                 count += 1
    234             self.assertEqual(len(values), count)
    235 
    236     def test_has_key(self):
    237         # Check existence of keys using has_key()

    238         self._test_has_key_or_contains(self._box.has_key)
    239 
    240     def test_contains(self):
    241         # Check existence of keys using __contains__()

    242         self._test_has_key_or_contains(self._box.__contains__)
    243 
    244     def _test_has_key_or_contains(self, method):
    245         # (Used by test_has_key() and test_contains().)

    246         self.assertFalse(method('foo'))
    247         key0 = self._box.add(self._template % 0)
    248         self.assertTrue(method(key0))
    249         self.assertFalse(method('foo'))
    250         key1 = self._box.add(self._template % 1)
    251         self.assertTrue(method(key1))
    252         self.assertTrue(method(key0))
    253         self.assertFalse(method('foo'))
    254         self._box.remove(key0)
    255         self.assertFalse(method(key0))
    256         self.assertTrue(method(key1))
    257         self.assertFalse(method('foo'))
    258         self._box.remove(key1)
    259         self.assertFalse(method(key1))
    260         self.assertFalse(method(key0))
    261         self.assertFalse(method('foo'))
    262 
    263     def test_len(self, repetitions=10):
    264         # Get message count

    265         keys = []
    266         for i in xrange(repetitions):
    267             self.assertEqual(len(self._box), i)
    268             keys.append(self._box.add(self._template % i))
    269             self.assertEqual(len(self._box),  i + 1)
    270         for i in xrange(repetitions):
    271             self.assertEqual(len(self._box), repetitions - i)
    272             self._box.remove(keys[i])
    273             self.assertEqual(len(self._box), repetitions - i - 1)
    274 
    275     def test_set_item(self):
    276         # Modify messages using __setitem__()

    277         key0 = self._box.add(self._template % 'original 0')
    278         self.assertEqual(self._box.get_string(key0),
    279                          self._template % 'original 0')
    280         key1 = self._box.add(self._template % 'original 1')
    281         self.assertEqual(self._box.get_string(key1),
    282                          self._template % 'original 1')
    283         self._box[key0] = self._template % 'changed 0'
    284         self.assertEqual(self._box.get_string(key0),
    285                          self._template % 'changed 0')
    286         self._box[key1] = self._template % 'changed 1'
    287         self.assertEqual(self._box.get_string(key1),
    288                          self._template % 'changed 1')
    289         self._box[key0] = _sample_message
    290         self._check_sample(self._box[key0])
    291         self._box[key1] = self._box[key0]
    292         self._check_sample(self._box[key1])
    293         self._box[key0] = self._template % 'original 0'
    294         self.assertEqual(self._box.get_string(key0),
    295                          self._template % 'original 0')
    296         self._check_sample(self._box[key1])
    297         self.assertRaises(KeyError,
    298                           lambda: self._box.__setitem__('foo', 'bar'))
    299         self.assertRaises(KeyError, lambda: self._box['foo'])
    300         self.assertEqual(len(self._box), 2)
    301 
    302     def test_clear(self, iterations=10):
    303         # Remove all messages using clear()

    304         keys = []
    305         for i in xrange(iterations):
    306             self._box.add(self._template % i)
    307         for i, key in enumerate(keys):
    308             self.assertEqual(self._box.get_string(key), self._template % i)
    309         self._box.clear()
    310         self.assertEqual(len(self._box), 0)
    311         for i, key in enumerate(keys):
    312             self.assertRaises(KeyError, lambda: self._box.get_string(key))
    313 
    314     def test_pop(self):
    315         # Get and remove a message using pop()

    316         key0 = self._box.add(self._template % 0)
    317         self.assertIn(key0, self._box)
    318         key1 = self._box.add(self._template % 1)
    319         self.assertIn(key1, self._box)
    320         self.assertEqual(self._box.pop(key0).get_payload(), '0')
    321         self.assertNotIn(key0, self._box)
    322         self.assertIn(key1, self._box)
    323         key2 = self._box.add(self._template % 2)
    324         self.assertIn(key2, self._box)
    325         self.assertEqual(self._box.pop(key2).get_payload(), '2')
    326         self.assertNotIn(key2, self._box)
    327         self.assertIn(key1, self._box)
    328         self.assertEqual(self._box.pop(key1).get_payload(), '1')
    329         self.assertNotIn(key1, self._box)
    330         self.assertEqual(len(self._box), 0)
    331 
    332     def test_popitem(self, iterations=10):
    333         # Get and remove an arbitrary (key, message) using popitem()

    334         keys = []
    335         for i in xrange(10):
    336             keys.append(self._box.add(self._template % i))
    337         seen = []
    338         for i in xrange(10):
    339             key, msg = self._box.popitem()
    340             self.assertIn(key, keys)
    341             self.assertNotIn(key, seen)
    342             seen.append(key)
    343             self.assertEqual(int(msg.get_payload()), keys.index(key))
    344         self.assertEqual(len(self._box), 0)
    345         for key in keys:
    346             self.assertRaises(KeyError, lambda: self._box[key])
    347 
    348     def test_update(self):
    349         # Modify multiple messages using update()

    350         key0 = self._box.add(self._template % 'original 0')
    351         key1 = self._box.add(self._template % 'original 1')
    352         key2 = self._box.add(self._template % 'original 2')
    353         self._box.update({key0: self._template % 'changed 0',
    354                           key2: _sample_message})
    355         self.assertEqual(len(self._box), 3)
    356         self.assertEqual(self._box.get_string(key0),
    357                          self._template % 'changed 0')
    358         self.assertEqual(self._box.get_string(key1),
    359                          self._template % 'original 1')
    360         self._check_sample(self._box[key2])
    361         self._box.update([(key2, self._template % 'changed 2'),
    362                     (key1, self._template % 'changed 1'),
    363                     (key0, self._template % 'original 0')])
    364         self.assertEqual(len(self._box), 3)
    365         self.assertEqual(self._box.get_string(key0),
    366                          self._template % 'original 0')
    367         self.assertEqual(self._box.get_string(key1),
    368                          self._template % 'changed 1')
    369         self.assertEqual(self._box.get_string(key2),
    370                          self._template % 'changed 2')
    371         self.assertRaises(KeyError,
    372                           lambda: self._box.update({'foo': 'bar',
    373                                           key0: self._template % "changed 0"}))
    374         self.assertEqual(len(self._box), 3)
    375         self.assertEqual(self._box.get_string(key0),
    376                          self._template % "changed 0")
    377         self.assertEqual(self._box.get_string(key1),
    378                          self._template % "changed 1")
    379         self.assertEqual(self._box.get_string(key2),
    380                          self._template % "changed 2")
    381 
    382     def test_flush(self):
    383         # Write changes to disk

    384         self._test_flush_or_close(self._box.flush, True)
    385 
    386     def test_lock_unlock(self):
    387         # Lock and unlock the mailbox

    388         self.assertFalse(os.path.exists(self._get_lock_path()))
    389         self._box.lock()
    390         self.assertTrue(os.path.exists(self._get_lock_path()))
    391         self._box.unlock()
    392         self.assertFalse(os.path.exists(self._get_lock_path()))
    393 
    394     def test_close(self):
    395         # Close mailbox and flush changes to disk

    396         self._test_flush_or_close(self._box.close, False)
    397 
    398     def _test_flush_or_close(self, method, should_call_close):
    399         contents = [self._template % i for i in xrange(3)]
    400         self._box.add(contents[0])
    401         self._box.add(contents[1])
    402         self._box.add(contents[2])
    403         method()
    404         if should_call_close:
    405             self._box.close()
    406         self._box = self._factory(self._path)
    407         keys = self._box.keys()
    408         self.assertEqual(len(keys), 3)
    409         for key in keys:
    410             self.assertIn(self._box.get_string(key), contents)
    411 
    412     def test_dump_message(self):
    413         # Write message representations to disk

    414         for input in (email.message_from_string(_sample_message),
    415                       _sample_message, StringIO.StringIO(_sample_message)):
    416             output = StringIO.StringIO()
    417             self._box._dump_message(input, output)
    418             self.assertEqual(output.getvalue(),
    419                              _sample_message.replace('\n', os.linesep))
    420         output = StringIO.StringIO()
    421         self.assertRaises(TypeError,
    422                           lambda: self._box._dump_message(None, output))
    423 
    424     def _get_lock_path(self):
    425         # Return the path of the dot lock file. May be overridden.

    426         return self._path + '.lock'
    427 
    428 
    429 class TestMailboxSuperclass(TestBase):
    430 
    431     def test_notimplemented(self):
    432         # Test that all Mailbox methods raise NotImplementedException.

    433         box = mailbox.Mailbox('path')
    434         self.assertRaises(NotImplementedError, lambda: box.add(''))
    435         self.assertRaises(NotImplementedError, lambda: box.remove(''))
    436         self.assertRaises(NotImplementedError, lambda: box.__delitem__(''))
    437         self.assertRaises(NotImplementedError, lambda: box.discard(''))
    438         self.assertRaises(NotImplementedError, lambda: box.__setitem__('', ''))
    439         self.assertRaises(NotImplementedError, lambda: box.iterkeys())
    440         self.assertRaises(NotImplementedError, lambda: box.keys())
    441         self.assertRaises(NotImplementedError, lambda: box.itervalues().next())
    442         self.assertRaises(NotImplementedError, lambda: box.__iter__().next())
    443         self.assertRaises(NotImplementedError, lambda: box.values())
    444         self.assertRaises(NotImplementedError, lambda: box.iteritems().next())
    445         self.assertRaises(NotImplementedError, lambda: box.items())
    446         self.assertRaises(NotImplementedError, lambda: box.get(''))
    447         self.assertRaises(NotImplementedError, lambda: box.__getitem__(''))
    448         self.assertRaises(NotImplementedError, lambda: box.get_message(''))
    449         self.assertRaises(NotImplementedError, lambda: box.get_string(''))
    450         self.assertRaises(NotImplementedError, lambda: box.get_file(''))
    451         self.assertRaises(NotImplementedError, lambda: box.has_key(''))
    452         self.assertRaises(NotImplementedError, lambda: box.__contains__(''))
    453         self.assertRaises(NotImplementedError, lambda: box.__len__())
    454         self.assertRaises(NotImplementedError, lambda: box.clear())
    455         self.assertRaises(NotImplementedError, lambda: box.pop(''))
    456         self.assertRaises(NotImplementedError, lambda: box.popitem())
    457         self.assertRaises(NotImplementedError, lambda: box.update((('', ''),)))
    458         self.assertRaises(NotImplementedError, lambda: box.flush())
    459         self.assertRaises(NotImplementedError, lambda: box.lock())
    460         self.assertRaises(NotImplementedError, lambda: box.unlock())
    461         self.assertRaises(NotImplementedError, lambda: box.close())
    462 
    463 
    464 class TestMaildir(TestMailbox):
    465 
    466     _factory = lambda self, path, factory=None: mailbox.Maildir(path, factory)
    467 
    468     def setUp(self):
    469         TestMailbox.setUp(self)
    470         if os.name in ('nt', 'os2') or sys.platform == 'cygwin':
    471             self._box.colon = '!'
    472 
    473     def test_add_MM(self):
    474         # Add a MaildirMessage instance

    475         msg = mailbox.MaildirMessage(self._template % 0)
    476         msg.set_subdir('cur')
    477         msg.set_info('foo')
    478         key = self._box.add(msg)
    479         self.assertTrue(os.path.exists(os.path.join(self._path, 'cur', '%s%sfoo' %
    480                                                  (key, self._box.colon))))
    481 
    482     def test_get_MM(self):
    483         # Get a MaildirMessage instance

    484         msg = mailbox.MaildirMessage(self._template % 0)
    485         msg.set_subdir('cur')
    486         msg.set_flags('RF')
    487         key = self._box.add(msg)
    488         msg_returned = self._box.get_message(key)
    489         self.assertIsInstance(msg_returned, mailbox.MaildirMessage)
    490         self.assertEqual(msg_returned.get_subdir(), 'cur')
    491         self.assertEqual(msg_returned.get_flags(), 'FR')
    492 
    493     def test_set_MM(self):
    494         # Set with a MaildirMessage instance

    495         msg0 = mailbox.MaildirMessage(self._template % 0)
    496         msg0.set_flags('TP')
    497         key = self._box.add(msg0)
    498         msg_returned = self._box.get_message(key)
    499         self.assertEqual(msg_returned.get_subdir(), 'new')
    500         self.assertEqual(msg_returned.get_flags(), 'PT')
    501         msg1 = mailbox.MaildirMessage(self._template % 1)
    502         self._box[key] = msg1
    503         msg_returned = self._box.get_message(key)
    504         self.assertEqual(msg_returned.get_subdir(), 'new')
    505         self.assertEqual(msg_returned.get_flags(), '')
    506         self.assertEqual(msg_returned.get_payload(), '1')
    507         msg2 = mailbox.MaildirMessage(self._template % 2)
    508         msg2.set_info('2,S')
    509         self._box[key] = msg2
    510         self._box[key] = self._template % 3
    511         msg_returned = self._box.get_message(key)
    512         self.assertEqual(msg_returned.get_subdir(), 'new')
    513         self.assertEqual(msg_returned.get_flags(), 'S')
    514         self.assertEqual(msg_returned.get_payload(), '3')
    515 
    516     def test_consistent_factory(self):
    517         # Add a message.

    518         msg = mailbox.MaildirMessage(self._template % 0)
    519         msg.set_subdir('cur')
    520         msg.set_flags('RF')
    521         key = self._box.add(msg)
    522 
    523         # Create new mailbox with

    524         class FakeMessage(mailbox.MaildirMessage):
    525             pass
    526         box = mailbox.Maildir(self._path, factory=FakeMessage)
    527         box.colon = self._box.colon
    528         msg2 = box.get_message(key)
    529         self.assertIsInstance(msg2, FakeMessage)
    530 
    531     def test_initialize_new(self):
    532         # Initialize a non-existent mailbox

    533         self.tearDown()
    534         self._box = mailbox.Maildir(self._path)
    535         self._check_basics(factory=rfc822.Message)
    536         self._delete_recursively(self._path)
    537         self._box = self._factory(self._path, factory=None)
    538         self._check_basics()
    539 
    540     def test_initialize_existing(self):
    541         # Initialize an existing mailbox

    542         self.tearDown()
    543         for subdir in '', 'tmp', 'new', 'cur':
    544             os.mkdir(os.path.normpath(os.path.join(self._path, subdir)))
    545         self._box = mailbox.Maildir(self._path)
    546         self._check_basics(factory=rfc822.Message)
    547         self._box = mailbox.Maildir(self._path, factory=None)
    548         self._check_basics()
    549 
    550     def _check_basics(self, factory=None):
    551         # (Used by test_open_new() and test_open_existing().)

    552         self.assertEqual(self._box._path, os.path.abspath(self._path))
    553         self.assertEqual(self._box._factory, factory)
    554         for subdir in '', 'tmp', 'new', 'cur':
    555             path = os.path.join(self._path, subdir)
    556             mode = os.stat(path)[stat.ST_MODE]
    557             self.assertTrue(stat.S_ISDIR(mode), "Not a directory: '%s'" % path)
    558 
    559     def test_list_folders(self):
    560         # List folders

    561         self._box.add_folder('one')
    562         self._box.add_folder('two')
    563         self._box.add_folder('three')
    564         self.assertEqual(len(self._box.list_folders()), 3)
    565         self.assertEqual(set(self._box.list_folders()),
    566                          set(('one', 'two', 'three')))
    567 
    568     def test_get_folder(self):
    569         # Open folders

    570         self._box.add_folder('foo.bar')
    571         folder0 = self._box.get_folder('foo.bar')
    572         folder0.add(self._template % 'bar')
    573         self.assertTrue(os.path.isdir(os.path.join(self._path, '.foo.bar')))
    574         folder1 = self._box.get_folder('foo.bar')
    575         self.assertEqual(folder1.get_string(folder1.keys()[0]),
    576                          self._template % 'bar')
    577 
    578     def test_add_and_remove_folders(self):
    579         # Delete folders

    580         self._box.add_folder('one')
    581         self._box.add_folder('two')
    582         self.assertEqual(len(self._box.list_folders()), 2)
    583         self.assertEqual(set(self._box.list_folders()), set(('one', 'two')))
    584         self._box.remove_folder('one')
    585         self.assertEqual(len(self._box.list_folders()), 1)
    586         self.assertEqual(set(self._box.list_folders()), set(('two',)))
    587         self._box.add_folder('three')
    588         self.assertEqual(len(self._box.list_folders()), 2)
    589         self.assertEqual(set(self._box.list_folders()), set(('two', 'three')))
    590         self._box.remove_folder('three')
    591         self.assertEqual(len(self._box.list_folders()), 1)
    592         self.assertEqual(set(self._box.list_folders()), set(('two',)))
    593         self._box.remove_folder('two')
    594         self.assertEqual(len(self._box.list_folders()), 0)
    595         self.assertEqual(self._box.list_folders(), [])
    596 
    597     def test_clean(self):
    598         # Remove old files from 'tmp'

    599         foo_path = os.path.join(self._path, 'tmp', 'foo')
    600         bar_path = os.path.join(self._path, 'tmp', 'bar')
    601         with open(foo_path, 'w') as f:
    602             f.write("@")
    603         with open(bar_path, 'w') as f:
    604             f.write("@")
    605         self._box.clean()
    606         self.assertTrue(os.path.exists(foo_path))
    607         self.assertTrue(os.path.exists(bar_path))
    608         foo_stat = os.stat(foo_path)
    609         os.utime(foo_path, (time.time() - 129600 - 2,
    610                             foo_stat.st_mtime))
    611         self._box.clean()
    612         self.assertFalse(os.path.exists(foo_path))
    613         self.assertTrue(os.path.exists(bar_path))
    614 
    615     def test_create_tmp(self, repetitions=10):
    616         # Create files in tmp directory

    617         hostname = socket.gethostname()
    618         if '/' in hostname:
    619             hostname = hostname.replace('/', r'\057')
    620         if ':' in hostname:
    621             hostname = hostname.replace(':', r'\072')
    622         pid = os.getpid()
    623         pattern = re.compile(r"(?P<time>\d+)\.M(?P<M>\d{1,6})P(?P<P>\d+)"
    624                              r"Q(?P<Q>\d+)\.(?P<host>[^:/]+)")
    625         previous_groups = None
    626         for x in xrange(repetitions):
    627             tmp_file = self._box._create_tmp()
    628             head, tail = os.path.split(tmp_file.name)
    629             self.assertEqual(head, os.path.abspath(os.path.join(self._path,
    630                                                                 "tmp")),
    631                              "File in wrong location: '%s'" % head)
    632             match = pattern.match(tail)
    633             self.assertTrue(match is not None, "Invalid file name: '%s'" % tail)
    634             groups = match.groups()
    635             if previous_groups is not None:
    636                 self.assertTrue(int(groups[0] >= previous_groups[0]),
    637                              "Non-monotonic seconds: '%s' before '%s'" %
    638                              (previous_groups[0], groups[0]))
    639                 self.assertTrue(int(groups[1] >= previous_groups[1]) or
    640                              groups[0] != groups[1],
    641                              "Non-monotonic milliseconds: '%s' before '%s'" %
    642                              (previous_groups[1], groups[1]))
    643                 self.assertTrue(int(groups[2]) == pid,
    644                              "Process ID mismatch: '%s' should be '%s'" %
    645                              (groups[2], pid))
    646                 self.assertTrue(int(groups[3]) == int(previous_groups[3]) + 1,
    647                              "Non-sequential counter: '%s' before '%s'" %
    648                              (previous_groups[3], groups[3]))
    649                 self.assertTrue(groups[4] == hostname,
    650                              "Host name mismatch: '%s' should be '%s'" %
    651                              (groups[4], hostname))
    652             previous_groups = groups
    653             tmp_file.write(_sample_message)
    654             tmp_file.seek(0)
    655             self.assertTrue(tmp_file.read() == _sample_message)
    656             tmp_file.close()
    657         file_count = len(os.listdir(os.path.join(self._path, "tmp")))
    658         self.assertTrue(file_count == repetitions,
    659                      "Wrong file count: '%s' should be '%s'" %
    660                      (file_count, repetitions))
    661 
    662     def test_refresh(self):
    663         # Update the table of contents

    664         self.assertEqual(self._box._toc, {})
    665         key0 = self._box.add(self._template % 0)
    666         key1 = self._box.add(self._template % 1)
    667         self.assertEqual(self._box._toc, {})
    668         self._box._refresh()
    669         self.assertEqual(self._box._toc, {key0: os.path.join('new', key0),
    670                                           key1: os.path.join('new', key1)})
    671         key2 = self._box.add(self._template % 2)
    672         self.assertEqual(self._box._toc, {key0: os.path.join('new', key0),
    673                                           key1: os.path.join('new', key1)})
    674         self._box._refresh()
    675         self.assertEqual(self._box._toc, {key0: os.path.join('new', key0),
    676                                           key1: os.path.join('new', key1),
    677                                           key2: os.path.join('new', key2)})
    678 
    679     def test_lookup(self):
    680         # Look up message subpaths in the TOC

    681         self.assertRaises(KeyError, lambda: self._box._lookup('foo'))
    682         key0 = self._box.add(self._template % 0)
    683         self.assertEqual(self._box._lookup(key0), os.path.join('new', key0))
    684         os.remove(os.path.join(self._path, 'new', key0))
    685         self.assertEqual(self._box._toc, {key0: os.path.join('new', key0)})
    686         # Be sure that the TOC is read back from disk (see issue #6896

    687         # about bad mtime behaviour on some systems).

    688         self._box.flush()
    689         self.assertRaises(KeyError, lambda: self._box._lookup(key0))
    690         self.assertEqual(self._box._toc, {})
    691 
    692     def test_lock_unlock(self):
    693         # Lock and unlock the mailbox. For Maildir, this does nothing.

    694         self._box.lock()
    695         self._box.unlock()
    696 
    697     def test_folder (self):
    698         # Test for bug #1569790: verify that folders returned by .get_folder()

    699         # use the same factory function.

    700         def dummy_factory (s):
    701             return None
    702         box = self._factory(self._path, factory=dummy_factory)
    703         folder = box.add_folder('folder1')
    704         self.assertIs(folder._factory, dummy_factory)
    705 
    706         folder1_alias = box.get_folder('folder1')
    707         self.assertIs(folder1_alias._factory, dummy_factory)
    708 
    709     def test_directory_in_folder (self):
    710         # Test that mailboxes still work if there's a stray extra directory

    711         # in a folder.

    712         for i in range(10):
    713             self._box.add(mailbox.Message(_sample_message))
    714 
    715         # Create a stray directory

    716         os.mkdir(os.path.join(self._path, 'cur', 'stray-dir'))
    717 
    718         # Check that looping still works with the directory present.

    719         for msg in self._box:
    720             pass
    721 
    722     def test_file_permissions(self):
    723         # Verify that message files are created without execute permissions

    724         if not hasattr(os, "stat") or not hasattr(os, "umask"):
    725             return
    726         msg = mailbox.MaildirMessage(self._template % 0)
    727         orig_umask = os.umask(0)
    728         try:
    729             key = self._box.add(msg)
    730         finally:
    731             os.umask(orig_umask)
    732         path = os.path.join(self._path, self._box._lookup(key))
    733         mode = os.stat(path).st_mode
    734         self.assertEqual(mode & 0111, 0)
    735 
    736     def test_folder_file_perms(self):
    737         # From bug #3228, we want to verify that the file created inside a Maildir

    738         # subfolder isn't marked as executable.

    739         if not hasattr(os, "stat") or not hasattr(os, "umask"):
    740             return
    741 
    742         orig_umask = os.umask(0)
    743         try:
    744             subfolder = self._box.add_folder('subfolder')
    745         finally:
    746             os.umask(orig_umask)
    747 
    748         path = os.path.join(subfolder._path, 'maildirfolder')
    749         st = os.stat(path)
    750         perms = st.st_mode
    751         self.assertFalse((perms & 0111)) # Execute bits should all be off.

    752 
    753     def test_reread(self):
    754 
    755         # Put the last modified times more than two seconds into the past

    756         # (because mtime may have only a two second granularity).

    757         for subdir in ('cur', 'new'):
    758             os.utime(os.path.join(self._box._path, subdir),
    759                      (time.time()-5,)*2)
    760 
    761         # Because mtime has a two second granularity in worst case (FAT), a

    762         # refresh is done unconditionally if called for within

    763         # two-second-plus-a-bit of the last one, just in case the mbox has

    764         # changed; so now we have to wait for that interval to expire.

    765         time.sleep(2.01 + self._box._skewfactor)
    766 
    767         # Re-reading causes the ._toc attribute to be assigned a new dictionary

    768         # object, so we'll check that the ._toc attribute isn't a different

    769         # object.

    770         orig_toc = self._box._toc
    771         def refreshed():
    772             return self._box._toc is not orig_toc
    773 
    774         self._box._refresh()
    775         self.assertFalse(refreshed())
    776 
    777         # Now, write something into cur and remove it.  This changes

    778         # the mtime and should cause a re-read.

    779         filename = os.path.join(self._path, 'cur', 'stray-file')
    780         f = open(filename, 'w')
    781         f.close()
    782         os.unlink(filename)
    783         self._box._refresh()
    784         self.assertTrue(refreshed())
    785 
    786 class _TestMboxMMDF(TestMailbox):
    787 
    788     def tearDown(self):
    789         self._box.close()
    790         self._delete_recursively(self._path)
    791         for lock_remnant in glob.glob(self._path + '.*'):
    792             test_support.unlink(lock_remnant)
    793 
    794     def test_add_from_string(self):
    795         # Add a string starting with 'From ' to the mailbox

    796         key = self._box.add('From foo@bar blah\nFrom: foo\n\n0')
    797         self.assertEqual(self._box[key].get_from(), 'foo@bar blah')
    798         self.assertEqual(self._box[key].get_payload(), '0')
    799 
    800     def test_add_mbox_or_mmdf_message(self):
    801         # Add an mboxMessage or MMDFMessage

    802         for class_ in (mailbox.mboxMessage, mailbox.MMDFMessage):
    803             msg = class_('From foo@bar blah\nFrom: foo\n\n0')
    804             key = self._box.add(msg)
    805 
    806     def test_open_close_open(self):
    807         # Open and inspect previously-created mailbox

    808         values = [self._template % i for i in xrange(3)]
    809         for value in values:
    810             self._box.add(value)
    811         self._box.close()
    812         mtime = os.path.getmtime(self._path)
    813         self._box = self._factory(self._path)
    814         self.assertEqual(len(self._box), 3)
    815         for key in self._box.iterkeys():
    816             self.assertIn(self._box.get_string(key), values)
    817         self._box.close()
    818         self.assertEqual(mtime, os.path.getmtime(self._path))
    819 
    820     def test_add_and_close(self):
    821         # Verifying that closing a mailbox doesn't change added items

    822         self._box.add(_sample_message)
    823         for i in xrange(3):
    824             self._box.add(self._template % i)
    825         self._box.add(_sample_message)
    826         self._box._file.flush()
    827         self._box._file.seek(0)
    828         contents = self._box._file.read()
    829         self._box.close()
    830         with open(self._path, 'rb') as f:
    831             self.assertEqual(contents, f.read())
    832         self._box = self._factory(self._path)
    833 
    834     def test_lock_conflict(self):
    835         # Fork off a subprocess that will lock the file for 2 seconds,

    836         # unlock it, and then exit.

    837         if not hasattr(os, 'fork'):
    838             return
    839         pid = os.fork()
    840         if pid == 0:
    841             # In the child, lock the mailbox.

    842             self._box.lock()
    843             time.sleep(2)
    844             self._box.unlock()
    845             os._exit(0)
    846 
    847         # In the parent, sleep a bit to give the child time to acquire

    848         # the lock.

    849         time.sleep(0.5)
    850         try:
    851             self.assertRaises(mailbox.ExternalClashError,
    852                               self._box.lock)
    853         finally:
    854             # Wait for child to exit.  Locking should now succeed.

    855             exited_pid, status = os.waitpid(pid, 0)
    856 
    857         self._box.lock()
    858         self._box.unlock()
    859 
    860     def test_relock(self):
    861         # Test case for bug #1575506: the mailbox class was locking the

    862         # wrong file object in its flush() method.

    863         msg = "Subject: sub\n\nbody\n"
    864         key1 = self._box.add(msg)
    865         self._box.flush()
    866         self._box.close()
    867 
    868         self._box = self._factory(self._path)
    869         self._box.lock()
    870         key2 = self._box.add(msg)
    871         self._box.flush()
    872         self.assertTrue(self._box._locked)
    873         self._box.close()
    874 
    875 
    876 class TestMbox(_TestMboxMMDF):
    877 
    878     _factory = lambda self, path, factory=None: mailbox.mbox(path, factory)
    879 
    880     def test_file_perms(self):
    881         # From bug #3228, we want to verify that the mailbox file isn't executable,

    882         # even if the umask is set to something that would leave executable bits set.

    883         # We only run this test on platforms that support umask.

    884         if hasattr(os, 'umask') and hasattr(os, 'stat'):
    885             try:
    886                 old_umask = os.umask(0077)
    887                 self._box.close()
    888                 os.unlink(self._path)
    889                 self._box = mailbox.mbox(self._path, create=True)
    890                 self._box.add('')
    891                 self._box.close()
    892             finally:
    893                 os.umask(old_umask)
    894 
    895             st = os.stat(self._path)
    896             perms = st.st_mode
    897             self.assertFalse((perms & 0111)) # Execute bits should all be off.

    898 
    899 class TestMMDF(_TestMboxMMDF):
    900 
    901     _factory = lambda self, path, factory=None: mailbox.MMDF(path, factory)
    902 
    903 
    904 class TestMH(TestMailbox):
    905 
    906     _factory = lambda self, path, factory=None: mailbox.MH(path, factory)
    907 
    908     def test_list_folders(self):
    909         # List folders

    910         self._box.add_folder('one')
    911         self._box.add_folder('two')
    912         self._box.add_folder('three')
    913         self.assertEqual(len(self._box.list_folders()), 3)
    914         self.assertEqual(set(self._box.list_folders()),
    915                          set(('one', 'two', 'three')))
    916 
    917     def test_get_folder(self):
    918         # Open folders

    919         def dummy_factory (s):
    920             return None
    921         self._box = self._factory(self._path, dummy_factory)
    922 
    923         new_folder = self._box.add_folder('foo.bar')
    924         folder0 = self._box.get_folder('foo.bar')
    925         folder0.add(self._template % 'bar')
    926         self.assertTrue(os.path.isdir(os.path.join(self._path, 'foo.bar')))
    927         folder1 = self._box.get_folder('foo.bar')
    928         self.assertEqual(folder1.get_string(folder1.keys()[0]),
    929                          self._template % 'bar')
    930 
    931         # Test for bug #1569790: verify that folders returned by .get_folder()

    932         # use the same factory function.

    933         self.assertIs(new_folder._factory, self._box._factory)
    934         self.assertIs(folder0._factory, self._box._factory)
    935 
    936     def test_add_and_remove_folders(self):
    937         # Delete folders

    938         self._box.add_folder('one')
    939         self._box.add_folder('two')
    940         self.assertEqual(len(self._box.list_folders()), 2)
    941         self.assertEqual(set(self._box.list_folders()), set(('one', 'two')))
    942         self._box.remove_folder('one')
    943         self.assertEqual(len(self._box.list_folders()), 1)
    944         self.assertEqual(set(self._box.list_folders()), set(('two', )))
    945         self._box.add_folder('three')
    946         self.assertEqual(len(self._box.list_folders()), 2)
    947         self.assertEqual(set(self._box.list_folders()), set(('two', 'three')))
    948         self._box.remove_folder('three')
    949         self.assertEqual(len(self._box.list_folders()), 1)
    950         self.assertEqual(set(self._box.list_folders()), set(('two', )))
    951         self._box.remove_folder('two')
    952         self.assertEqual(len(self._box.list_folders()), 0)
    953         self.assertEqual(self._box.list_folders(), [])
    954 
    955     def test_sequences(self):
    956         # Get and set sequences

    957         self.assertEqual(self._box.get_sequences(), {})
    958         msg0 = mailbox.MHMessage(self._template % 0)
    959         msg0.add_sequence('foo')
    960         key0 = self._box.add(msg0)
    961         self.assertEqual(self._box.get_sequences(), {'foo':[key0]})
    962         msg1 = mailbox.MHMessage(self._template % 1)
    963         msg1.set_sequences(['bar', 'replied', 'foo'])
    964         key1 = self._box.add(msg1)
    965         self.assertEqual(self._box.get_sequences(),
    966                          {'foo':[key0, key1], 'bar':[key1], 'replied':[key1]})
    967         msg0.set_sequences(['flagged'])
    968         self._box[key0] = msg0
    969         self.assertEqual(self._box.get_sequences(),
    970                          {'foo':[key1], 'bar':[key1], 'replied':[key1],
    971                           'flagged':[key0]})
    972         self._box.remove(key1)
    973         self.assertEqual(self._box.get_sequences(), {'flagged':[key0]})
    974 
    975     def test_issue2625(self):
    976         msg0 = mailbox.MHMessage(self._template % 0)
    977         msg0.add_sequence('foo')
    978         key0 = self._box.add(msg0)
    979         refmsg0 = self._box.get_message(key0)
    980 
    981     def test_issue7627(self):
    982         msg0 = mailbox.MHMessage(self._template % 0)
    983         key0 = self._box.add(msg0)
    984         self._box.lock()
    985         self._box.remove(key0)
    986         self._box.unlock()
    987 
    988     def test_pack(self):
    989         # Pack the contents of the mailbox

    990         msg0 = mailbox.MHMessage(self._template % 0)
    991         msg1 = mailbox.MHMessage(self._template % 1)
    992         msg2 = mailbox.MHMessage(self._template % 2)
    993         msg3 = mailbox.MHMessage(self._template % 3)
    994         msg0.set_sequences(['foo', 'unseen'])
    995         msg1.set_sequences(['foo'])
    996         msg2.set_sequences(['foo', 'flagged'])
    997         msg3.set_sequences(['foo', 'bar', 'replied'])
    998         key0 = self._box.add(msg0)
    999         key1 = self._box.add(msg1)
   1000         key2 = self._box.add(msg2)
   1001         key3 = self._box.add(msg3)
   1002         self.assertEqual(self._box.get_sequences(),
   1003                          {'foo':[key0,key1,key2,key3], 'unseen':[key0],
   1004                           'flagged':[key2], 'bar':[key3], 'replied':[key3]})
   1005         self._box.remove(key2)
   1006         self.assertEqual(self._box.get_sequences(),
   1007                          {'foo':[key0,key1,key3], 'unseen':[key0], 'bar':[key3],
   1008                           'replied':[key3]})
   1009         self._box.pack()
   1010         self.assertEqual(self._box.keys(), [1, 2, 3])
   1011         key0 = key0
   1012         key1 = key0 + 1
   1013         key2 = key1 + 1
   1014         self.assertEqual(self._box.get_sequences(),
   1015                      {'foo':[1, 2, 3], 'unseen':[1], 'bar':[3], 'replied':[3]})
   1016 
   1017         # Test case for packing while holding the mailbox locked.

   1018         key0 = self._box.add(msg1)
   1019         key1 = self._box.add(msg1)
   1020         key2 = self._box.add(msg1)
   1021         key3 = self._box.add(msg1)
   1022 
   1023         self._box.remove(key0)
   1024         self._box.remove(key2)
   1025         self._box.lock()
   1026         self._box.pack()
   1027         self._box.unlock()
   1028         self.assertEqual(self._box.get_sequences(),
   1029                          {'foo':[1, 2, 3, 4, 5],
   1030                           'unseen':[1], 'bar':[3], 'replied':[3]})
   1031 
   1032     def _get_lock_path(self):
   1033         return os.path.join(self._path, '.mh_sequences.lock')
   1034 
   1035 
   1036 class TestBabyl(TestMailbox):
   1037 
   1038     _factory = lambda self, path, factory=None: mailbox.Babyl(path, factory)
   1039 
   1040     def tearDown(self):
   1041         self._box.close()
   1042         self._delete_recursively(self._path)
   1043         for lock_remnant in glob.glob(self._path + '.*'):
   1044             test_support.unlink(lock_remnant)
   1045 
   1046     def test_labels(self):
   1047         # Get labels from the mailbox

   1048         self.assertEqual(self._box.get_labels(), [])
   1049         msg0 = mailbox.BabylMessage(self._template % 0)
   1050         msg0.add_label('foo')
   1051         key0 = self._box.add(msg0)
   1052         self.assertEqual(self._box.get_labels(), ['foo'])
   1053         msg1 = mailbox.BabylMessage(self._template % 1)
   1054         msg1.set_labels(['bar', 'answered', 'foo'])
   1055         key1 = self._box.add(msg1)
   1056         self.assertEqual(set(self._box.get_labels()), set(['foo', 'bar']))
   1057         msg0.set_labels(['blah', 'filed'])
   1058         self._box[key0] = msg0
   1059         self.assertEqual(set(self._box.get_labels()),
   1060                          set(['foo', 'bar', 'blah']))
   1061         self._box.remove(key1)
   1062         self.assertEqual(set(self._box.get_labels()), set(['blah']))
   1063 
   1064 
   1065 class TestMessage(TestBase):
   1066 
   1067     _factory = mailbox.Message      # Overridden by subclasses to reuse tests

   1068 
   1069     def setUp(self):
   1070         self._path = test_support.TESTFN
   1071 
   1072     def tearDown(self):
   1073         self._delete_recursively(self._path)
   1074 
   1075     def test_initialize_with_eMM(self):
   1076         # Initialize based on email.message.Message instance

   1077         eMM = email.message_from_string(_sample_message)
   1078         msg = self._factory(eMM)
   1079         self._post_initialize_hook(msg)
   1080         self._check_sample(msg)
   1081 
   1082     def test_initialize_with_string(self):
   1083         # Initialize based on string

   1084         msg = self._factory(_sample_message)
   1085         self._post_initialize_hook(msg)
   1086         self._check_sample(msg)
   1087 
   1088     def test_initialize_with_file(self):
   1089         # Initialize based on contents of file

   1090         with open(self._path, 'w+') as f:
   1091             f.write(_sample_message)
   1092             f.seek(0)
   1093             msg = self._factory(f)
   1094             self._post_initialize_hook(msg)
   1095             self._check_sample(msg)
   1096 
   1097     def test_initialize_with_nothing(self):
   1098         # Initialize without arguments

   1099         msg = self._factory()
   1100         self._post_initialize_hook(msg)
   1101         self.assertIsInstance(msg, email.message.Message)
   1102         self.assertIsInstance(msg, mailbox.Message)
   1103         self.assertIsInstance(msg, self._factory)
   1104         self.assertEqual(msg.keys(), [])
   1105         self.assertFalse(msg.is_multipart())
   1106         self.assertEqual(msg.get_payload(), None)
   1107 
   1108     def test_initialize_incorrectly(self):
   1109         # Initialize with invalid argument

   1110         self.assertRaises(TypeError, lambda: self._factory(object()))
   1111 
   1112     def test_become_message(self):
   1113         # Take on the state of another message

   1114         eMM = email.message_from_string(_sample_message)
   1115         msg = self._factory()
   1116         msg._become_message(eMM)
   1117         self._check_sample(msg)
   1118 
   1119     def test_explain_to(self):
   1120         # Copy self's format-specific data to other message formats.

   1121         # This test is superficial; better ones are in TestMessageConversion.

   1122         msg = self._factory()
   1123         for class_ in (mailbox.Message, mailbox.MaildirMessage,
   1124                        mailbox.mboxMessage, mailbox.MHMessage,
   1125                        mailbox.BabylMessage, mailbox.MMDFMessage):
   1126             other_msg = class_()
   1127             msg._explain_to(other_msg)
   1128         other_msg = email.message.Message()
   1129         self.assertRaises(TypeError, lambda: msg._explain_to(other_msg))
   1130 
   1131     def _post_initialize_hook(self, msg):
   1132         # Overridden by subclasses to check extra things after initialization

   1133         pass
   1134 
   1135 
   1136 class TestMaildirMessage(TestMessage):
   1137 
   1138     _factory = mailbox.MaildirMessage
   1139 
   1140     def _post_initialize_hook(self, msg):
   1141         self.assertEqual(msg._subdir, 'new')
   1142         self.assertEqual(msg._info,'')
   1143 
   1144     def test_subdir(self):
   1145         # Use get_subdir() and set_subdir()

   1146         msg = mailbox.MaildirMessage(_sample_message)
   1147         self.assertEqual(msg.get_subdir(), 'new')
   1148         msg.set_subdir('cur')
   1149         self.assertEqual(msg.get_subdir(), 'cur')
   1150         msg.set_subdir('new')
   1151         self.assertEqual(msg.get_subdir(), 'new')
   1152         self.assertRaises(ValueError, lambda: msg.set_subdir('tmp'))
   1153         self.assertEqual(msg.get_subdir(), 'new')
   1154         msg.set_subdir('new')
   1155         self.assertEqual(msg.get_subdir(), 'new')
   1156         self._check_sample(msg)
   1157 
   1158     def test_flags(self):
   1159         # Use get_flags(), set_flags(), add_flag(), remove_flag()

   1160         msg = mailbox.MaildirMessage(_sample_message)
   1161         self.assertEqual(msg.get_flags(), '')
   1162         self.assertEqual(msg.get_subdir(), 'new')
   1163         msg.set_flags('F')
   1164         self.assertEqual(msg.get_subdir(), 'new')
   1165         self.assertEqual(msg.get_flags(), 'F')
   1166         msg.set_flags('SDTP')
   1167         self.assertEqual(msg.get_flags(), 'DPST')
   1168         msg.add_flag('FT')
   1169         self.assertEqual(msg.get_flags(), 'DFPST')
   1170         msg.remove_flag('TDRP')
   1171         self.assertEqual(msg.get_flags(), 'FS')
   1172         self.assertEqual(msg.get_subdir(), 'new')
   1173         self._check_sample(msg)
   1174 
   1175     def test_date(self):
   1176         # Use get_date() and set_date()

   1177         msg = mailbox.MaildirMessage(_sample_message)
   1178         diff = msg.get_date() - time.time()
   1179         self.assertTrue(abs(diff) < 60, diff)
   1180         msg.set_date(0.0)
   1181         self.assertEqual(msg.get_date(), 0.0)
   1182 
   1183     def test_info(self):
   1184         # Use get_info() and set_info()

   1185         msg = mailbox.MaildirMessage(_sample_message)
   1186         self.assertEqual(msg.get_info(), '')
   1187         msg.set_info('1,foo=bar')
   1188         self.assertEqual(msg.get_info(), '1,foo=bar')
   1189         self.assertRaises(TypeError, lambda: msg.set_info(None))
   1190         self._check_sample(msg)
   1191 
   1192     def test_info_and_flags(self):
   1193         # Test interaction of info and flag methods

   1194         msg = mailbox.MaildirMessage(_sample_message)
   1195         self.assertEqual(msg.get_info(), '')
   1196         msg.set_flags('SF')
   1197         self.assertEqual(msg.get_flags(), 'FS')
   1198         self.assertEqual(msg.get_info(), '2,FS')
   1199         msg.set_info('1,')
   1200         self.assertEqual(msg.get_flags(), '')
   1201         self.assertEqual(msg.get_info(), '1,')
   1202         msg.remove_flag('RPT')
   1203         self.assertEqual(msg.get_flags(), '')
   1204         self.assertEqual(msg.get_info(), '1,')
   1205         msg.add_flag('D')
   1206         self.assertEqual(msg.get_flags(), 'D')
   1207         self.assertEqual(msg.get_info(), '2,D')
   1208         self._check_sample(msg)
   1209 
   1210 
   1211 class _TestMboxMMDFMessage(TestMessage):
   1212 
   1213     _factory = mailbox._mboxMMDFMessage
   1214 
   1215     def _post_initialize_hook(self, msg):
   1216         self._check_from(msg)
   1217 
   1218     def test_initialize_with_unixfrom(self):
   1219         # Initialize with a message that already has a _unixfrom attribute

   1220         msg = mailbox.Message(_sample_message)
   1221         msg.set_unixfrom('From foo@bar blah')
   1222         msg = mailbox.mboxMessage(msg)
   1223         self.assertEqual(msg.get_from(), 'foo@bar blah')
   1224 
   1225     def test_from(self):
   1226         # Get and set "From " line

   1227         msg = mailbox.mboxMessage(_sample_message)
   1228         self._check_from(msg)
   1229         msg.set_from('foo bar')
   1230         self.assertEqual(msg.get_from(), 'foo bar')
   1231         msg.set_from('foo@bar', True)
   1232         self._check_from(msg, 'foo@bar')
   1233         msg.set_from('blah@temp', time.localtime())
   1234         self._check_from(msg, 'blah@temp')
   1235 
   1236     def test_flags(self):
   1237         # Use get_flags(), set_flags(), add_flag(), remove_flag()

   1238         msg = mailbox.mboxMessage(_sample_message)
   1239         self.assertEqual(msg.get_flags(), '')
   1240         msg.set_flags('F')
   1241         self.assertEqual(msg.get_flags(), 'F')
   1242         msg.set_flags('XODR')
   1243         self.assertEqual(msg.get_flags(), 'RODX')
   1244         msg.add_flag('FA')
   1245         self.assertEqual(msg.get_flags(), 'RODFAX')
   1246         msg.remove_flag('FDXA')
   1247         self.assertEqual(msg.get_flags(), 'RO')
   1248         self._check_sample(msg)
   1249 
   1250     def _check_from(self, msg, sender=None):
   1251         # Check contents of "From " line

   1252         if sender is None:
   1253             sender = "MAILER-DAEMON"
   1254         self.assertTrue(re.match(sender + r" \w{3} \w{3} [\d ]\d [\d ]\d:\d{2}:"
   1255                                  r"\d{2} \d{4}", msg.get_from()))
   1256 
   1257 
   1258 class TestMboxMessage(_TestMboxMMDFMessage):
   1259 
   1260     _factory = mailbox.mboxMessage
   1261 
   1262 
   1263 class TestMHMessage(TestMessage):
   1264 
   1265     _factory = mailbox.MHMessage
   1266 
   1267     def _post_initialize_hook(self, msg):
   1268         self.assertEqual(msg._sequences, [])
   1269 
   1270     def test_sequences(self):
   1271         # Get, set, join, and leave sequences

   1272         msg = mailbox.MHMessage(_sample_message)
   1273         self.assertEqual(msg.get_sequences(), [])
   1274         msg.set_sequences(['foobar'])
   1275         self.assertEqual(msg.get_sequences(), ['foobar'])
   1276         msg.set_sequences([])
   1277         self.assertEqual(msg.get_sequences(), [])
   1278         msg.add_sequence('unseen')
   1279         self.assertEqual(msg.get_sequences(), ['unseen'])
   1280         msg.add_sequence('flagged')
   1281         self.assertEqual(msg.get_sequences(), ['unseen', 'flagged'])
   1282         msg.add_sequence('flagged')
   1283         self.assertEqual(msg.get_sequences(), ['unseen', 'flagged'])
   1284         msg.remove_sequence('unseen')
   1285         self.assertEqual(msg.get_sequences(), ['flagged'])
   1286         msg.add_sequence('foobar')
   1287         self.assertEqual(msg.get_sequences(), ['flagged', 'foobar'])
   1288         msg.remove_sequence('replied')
   1289         self.assertEqual(msg.get_sequences(), ['flagged', 'foobar'])
   1290         msg.set_sequences(['foobar', 'replied'])
   1291         self.assertEqual(msg.get_sequences(), ['foobar', 'replied'])
   1292 
   1293 
   1294 class TestBabylMessage(TestMessage):
   1295 
   1296     _factory = mailbox.BabylMessage
   1297 
   1298     def _post_initialize_hook(self, msg):
   1299         self.assertEqual(msg._labels, [])
   1300 
   1301     def test_labels(self):
   1302         # Get, set, join, and leave labels

   1303         msg = mailbox.BabylMessage(_sample_message)
   1304         self.assertEqual(msg.get_labels(), [])
   1305         msg.set_labels(['foobar'])
   1306         self.assertEqual(msg.get_labels(), ['foobar'])
   1307         msg.set_labels([])
   1308         self.assertEqual(msg.get_labels(), [])
   1309         msg.add_label('filed')
   1310         self.assertEqual(msg.get_labels(), ['filed'])
   1311         msg.add_label('resent')
   1312         self.assertEqual(msg.get_labels(), ['filed', 'resent'])
   1313         msg.add_label('resent')
   1314         self.assertEqual(msg.get_labels(), ['filed', 'resent'])
   1315         msg.remove_label('filed')
   1316         self.assertEqual(msg.get_labels(), ['resent'])
   1317         msg.add_label('foobar')
   1318         self.assertEqual(msg.get_labels(), ['resent', 'foobar'])
   1319         msg.remove_label('unseen')
   1320         self.assertEqual(msg.get_labels(), ['resent', 'foobar'])
   1321         msg.set_labels(['foobar', 'answered'])
   1322         self.assertEqual(msg.get_labels(), ['foobar', 'answered'])
   1323 
   1324     def test_visible(self):
   1325         # Get, set, and update visible headers

   1326         msg = mailbox.BabylMessage(_sample_message)
   1327         visible = msg.get_visible()
   1328         self.assertEqual(visible.keys(), [])
   1329         self.assertIs(visible.get_payload(), None)
   1330         visible['User-Agent'] = 'FooBar 1.0'
   1331         visible['X-Whatever'] = 'Blah'
   1332         self.assertEqual(msg.get_visible().keys(), [])
   1333         msg.set_visible(visible)
   1334         visible = msg.get_visible()
   1335         self.assertEqual(visible.keys(), ['User-Agent', 'X-Whatever'])
   1336         self.assertEqual(visible['User-Agent'], 'FooBar 1.0')
   1337         self.assertEqual(visible['X-Whatever'], 'Blah')
   1338         self.assertIs(visible.get_payload(), None)
   1339         msg.update_visible()
   1340         self.assertEqual(visible.keys(), ['User-Agent', 'X-Whatever'])
   1341         self.assertIs(visible.get_payload(), None)
   1342         visible = msg.get_visible()
   1343         self.assertEqual(visible.keys(), ['User-Agent', 'Date', 'From', 'To',
   1344                                           'Subject'])
   1345         for header in ('User-Agent', 'Date', 'From', 'To', 'Subject'):
   1346             self.assertEqual(visible[header], msg[header])
   1347 
   1348 
   1349 class TestMMDFMessage(_TestMboxMMDFMessage):
   1350 
   1351     _factory = mailbox.MMDFMessage
   1352 
   1353 
   1354 class TestMessageConversion(TestBase):
   1355 
   1356     def test_plain_to_x(self):
   1357         # Convert Message to all formats

   1358         for class_ in (mailbox.Message, mailbox.MaildirMessage,
   1359                        mailbox.mboxMessage, mailbox.MHMessage,
   1360                        mailbox.BabylMessage, mailbox.MMDFMessage):
   1361             msg_plain = mailbox.Message(_sample_message)
   1362             msg = class_(msg_plain)
   1363             self._check_sample(msg)
   1364 
   1365     def test_x_to_plain(self):
   1366         # Convert all formats to Message

   1367         for class_ in (mailbox.Message, mailbox.MaildirMessage,
   1368                        mailbox.mboxMessage, mailbox.MHMessage,
   1369                        mailbox.BabylMessage, mailbox.MMDFMessage):
   1370             msg = class_(_sample_message)
   1371             msg_plain = mailbox.Message(msg)
   1372             self._check_sample(msg_plain)
   1373 
   1374     def test_x_to_invalid(self):
   1375         # Convert all formats to an invalid format

   1376         for class_ in (mailbox.Message, mailbox.MaildirMessage,
   1377                        mailbox.mboxMessage, mailbox.MHMessage,
   1378                        mailbox.BabylMessage, mailbox.MMDFMessage):
   1379             self.assertRaises(TypeError, lambda: class_(False))
   1380 
   1381     def test_maildir_to_maildir(self):
   1382         # Convert MaildirMessage to MaildirMessage

   1383         msg_maildir = mailbox.MaildirMessage(_sample_message)
   1384         msg_maildir.set_flags('DFPRST')
   1385         msg_maildir.set_subdir('cur')
   1386         date = msg_maildir.get_date()
   1387         msg = mailbox.MaildirMessage(msg_maildir)
   1388         self._check_sample(msg)
   1389         self.assertEqual(msg.get_flags(), 'DFPRST')
   1390         self.assertEqual(msg.get_subdir(), 'cur')
   1391         self.assertEqual(msg.get_date(), date)
   1392 
   1393     def test_maildir_to_mboxmmdf(self):
   1394         # Convert MaildirMessage to mboxmessage and MMDFMessage

   1395         pairs = (('D', ''), ('F', 'F'), ('P', ''), ('R', 'A'), ('S', 'R'),
   1396                  ('T', 'D'), ('DFPRST', 'RDFA'))
   1397         for class_ in (mailbox.mboxMessage, mailbox.MMDFMessage):
   1398             msg_maildir = mailbox.MaildirMessage(_sample_message)
   1399             msg_maildir.set_date(0.0)
   1400             for setting, result in pairs:
   1401                 msg_maildir.set_flags(setting)
   1402                 msg = class_(msg_maildir)
   1403                 self.assertEqual(msg.get_flags(), result)
   1404                 self.assertEqual(msg.get_from(), 'MAILER-DAEMON %s' %
   1405                                  time.asctime(time.gmtime(0.0)))
   1406             msg_maildir.set_subdir('cur')
   1407             self.assertEqual(class_(msg_maildir).get_flags(), 'RODFA')
   1408 
   1409     def test_maildir_to_mh(self):
   1410         # Convert MaildirMessage to MHMessage

   1411         msg_maildir = mailbox.MaildirMessage(_sample_message)
   1412         pairs = (('D', ['unseen']), ('F', ['unseen', 'flagged']),
   1413                  ('P', ['unseen']), ('R', ['unseen', 'replied']), ('S', []),
   1414                  ('T', ['unseen']), ('DFPRST', ['replied', 'flagged']))
   1415         for setting, result in pairs:
   1416             msg_maildir.set_flags(setting)
   1417             self.assertEqual(mailbox.MHMessage(msg_maildir).get_sequences(),
   1418                              result)
   1419 
   1420     def test_maildir_to_babyl(self):
   1421         # Convert MaildirMessage to Babyl

   1422         msg_maildir = mailbox.MaildirMessage(_sample_message)
   1423         pairs = (('D', ['unseen']), ('F', ['unseen']),
   1424                  ('P', ['unseen', 'forwarded']), ('R', ['unseen', 'answered']),
   1425                  ('S', []), ('T', ['unseen', 'deleted']),
   1426                  ('DFPRST', ['deleted', 'answered', 'forwarded']))
   1427         for setting, result in pairs:
   1428             msg_maildir.set_flags(setting)
   1429             self.assertEqual(mailbox.BabylMessage(msg_maildir).get_labels(),
   1430                              result)
   1431 
   1432     def test_mboxmmdf_to_maildir(self):
   1433         # Convert mboxMessage and MMDFMessage to MaildirMessage

   1434         for class_ in (mailbox.mboxMessage, mailbox.MMDFMessage):
   1435             msg_mboxMMDF = class_(_sample_message)
   1436             msg_mboxMMDF.set_from('foo@bar', time.gmtime(0.0))
   1437             pairs = (('R', 'S'), ('O', ''), ('D', 'T'), ('F', 'F'), ('A', 'R'),
   1438                      ('RODFA', 'FRST'))
   1439             for setting, result in pairs:
   1440                 msg_mboxMMDF.set_flags(setting)
   1441                 msg = mailbox.MaildirMessage(msg_mboxMMDF)
   1442                 self.assertEqual(msg.get_flags(), result)
   1443                 self.assertEqual(msg.get_date(), 0.0)
   1444             msg_mboxMMDF.set_flags('O')
   1445             self.assertEqual(mailbox.MaildirMessage(msg_mboxMMDF).get_subdir(),
   1446                              'cur')
   1447 
   1448     def test_mboxmmdf_to_mboxmmdf(self):
   1449         # Convert mboxMessage and MMDFMessage to mboxMessage and MMDFMessage

   1450         for class_ in (mailbox.mboxMessage, mailbox.MMDFMessage):
   1451             msg_mboxMMDF = class_(_sample_message)
   1452             msg_mboxMMDF.set_flags('RODFA')
   1453             msg_mboxMMDF.set_from('foo@bar')
   1454             for class2_ in (mailbox.mboxMessage, mailbox.MMDFMessage):
   1455                 msg2 = class2_(msg_mboxMMDF)
   1456                 self.assertEqual(msg2.get_flags(), 'RODFA')
   1457                 self.assertEqual(msg2.get_from(), 'foo@bar')
   1458 
   1459     def test_mboxmmdf_to_mh(self):
   1460         # Convert mboxMessage and MMDFMessage to MHMessage

   1461         for class_ in (mailbox.mboxMessage, mailbox.MMDFMessage):
   1462             msg_mboxMMDF = class_(_sample_message)
   1463             pairs = (('R', []), ('O', ['unseen']), ('D', ['unseen']),
   1464                      ('F', ['unseen', 'flagged']),
   1465                      ('A', ['unseen', 'replied']),
   1466                      ('RODFA', ['replied', 'flagged']))
   1467             for setting, result in pairs:
   1468                 msg_mboxMMDF.set_flags(setting)
   1469                 self.assertEqual(mailbox.MHMessage(msg_mboxMMDF).get_sequences(),
   1470                                  result)
   1471 
   1472     def test_mboxmmdf_to_babyl(self):
   1473         # Convert mboxMessage and MMDFMessage to BabylMessage

   1474         for class_ in (mailbox.mboxMessage, mailbox.MMDFMessage):
   1475             msg = class_(_sample_message)
   1476             pairs = (('R', []), ('O', ['unseen']),
   1477                      ('D', ['unseen', 'deleted']), ('F', ['unseen']),
   1478                      ('A', ['unseen', 'answered']),
   1479                      ('RODFA', ['deleted', 'answered']))
   1480             for setting, result in pairs:
   1481                 msg.set_flags(setting)
   1482                 self.assertEqual(mailbox.BabylMessage(msg).get_labels(), result)
   1483 
   1484     def test_mh_to_maildir(self):
   1485         # Convert MHMessage to MaildirMessage

   1486         pairs = (('unseen', ''), ('replied', 'RS'), ('flagged', 'FS'))
   1487         for setting, result in pairs:
   1488             msg = mailbox.MHMessage(_sample_message)
   1489             msg.add_sequence(setting)
   1490             self.assertEqual(mailbox.MaildirMessage(msg).get_flags(), result)
   1491             self.assertEqual(mailbox.MaildirMessage(msg).get_subdir(), 'cur')
   1492         msg = mailbox.MHMessage(_sample_message)
   1493         msg.add_sequence('unseen')
   1494         msg.add_sequence('replied')
   1495         msg.add_sequence('flagged')
   1496         self.assertEqual(mailbox.MaildirMessage(msg).get_flags(), 'FR')
   1497         self.assertEqual(mailbox.MaildirMessage(msg).get_subdir(), 'cur')
   1498 
   1499     def test_mh_to_mboxmmdf(self):
   1500         # Convert MHMessage to mboxMessage and MMDFMessage

   1501         pairs = (('unseen', 'O'), ('replied', 'ROA'), ('flagged', 'ROF'))
   1502         for setting, result in pairs:
   1503             msg = mailbox.MHMessage(_sample_message)
   1504             msg.add_sequence(setting)
   1505             for class_ in (mailbox.mboxMessage, mailbox.MMDFMessage):
   1506                 self.assertEqual(class_(msg).get_flags(), result)
   1507         msg = mailbox.MHMessage(_sample_message)
   1508         msg.add_sequence('unseen')
   1509         msg.add_sequence('replied')
   1510         msg.add_sequence('flagged')
   1511         for class_ in (mailbox.mboxMessage, mailbox.MMDFMessage):
   1512             self.assertEqual(class_(msg).get_flags(), 'OFA')
   1513 
   1514     def test_mh_to_mh(self):
   1515         # Convert MHMessage to MHMessage

   1516         msg = mailbox.MHMessage(_sample_message)
   1517         msg.add_sequence('unseen')
   1518         msg.add_sequence('replied')
   1519         msg.add_sequence('flagged')
   1520         self.assertEqual(mailbox.MHMessage(msg).get_sequences(),
   1521                          ['unseen', 'replied', 'flagged'])
   1522 
   1523     def test_mh_to_babyl(self):
   1524         # Convert MHMessage to BabylMessage

   1525         pairs = (('unseen', ['unseen']), ('replied', ['answered']),
   1526                  ('flagged', []))
   1527         for setting, result in pairs:
   1528             msg = mailbox.MHMessage(_sample_message)
   1529             msg.add_sequence(setting)
   1530             self.assertEqual(mailbox.BabylMessage(msg).get_labels(), result)
   1531         msg = mailbox.MHMessage(_sample_message)
   1532         msg.add_sequence('unseen')
   1533         msg.add_sequence('replied')
   1534         msg.add_sequence('flagged')
   1535         self.assertEqual(mailbox.BabylMessage(msg).get_labels(),
   1536                          ['unseen', 'answered'])
   1537 
   1538     def test_babyl_to_maildir(self):
   1539         # Convert BabylMessage to MaildirMessage

   1540         pairs = (('unseen', ''), ('deleted', 'ST'), ('filed', 'S'),
   1541                  ('answered', 'RS'), ('forwarded', 'PS'), ('edited', 'S'),
   1542                  ('resent', 'PS'))
   1543         for setting, result in pairs:
   1544             msg = mailbox.BabylMessage(_sample_message)
   1545             msg.add_label(setting)
   1546             self.assertEqual(mailbox.MaildirMessage(msg).get_flags(), result)
   1547             self.assertEqual(mailbox.MaildirMessage(msg).get_subdir(), 'cur')
   1548         msg = mailbox.BabylMessage(_sample_message)
   1549         for label in ('unseen', 'deleted', 'filed', 'answered', 'forwarded',
   1550                       'edited', 'resent'):
   1551             msg.add_label(label)
   1552         self.assertEqual(mailbox.MaildirMessage(msg).get_flags(), 'PRT')
   1553         self.assertEqual(mailbox.MaildirMessage(msg).get_subdir(), 'cur')
   1554 
   1555     def test_babyl_to_mboxmmdf(self):
   1556         # Convert BabylMessage to mboxMessage and MMDFMessage

   1557         pairs = (('unseen', 'O'), ('deleted', 'ROD'), ('filed', 'RO'),
   1558                  ('answered', 'ROA'), ('forwarded', 'RO'), ('edited', 'RO'),
   1559                  ('resent', 'RO'))
   1560         for setting, result in pairs:
   1561             for class_ in (mailbox.mboxMessage, mailbox.MMDFMessage):
   1562                 msg = mailbox.BabylMessage(_sample_message)
   1563                 msg.add_label(setting)
   1564                 self.assertEqual(class_(msg).get_flags(), result)
   1565         msg = mailbox.BabylMessage(_sample_message)
   1566         for label in ('unseen', 'deleted', 'filed', 'answered', 'forwarded',
   1567                       'edited', 'resent'):
   1568             msg.add_label(label)
   1569         for class_ in (mailbox.mboxMessage, mailbox.MMDFMessage):
   1570             self.assertEqual(class_(msg).get_flags(), 'ODA')
   1571 
   1572     def test_babyl_to_mh(self):
   1573         # Convert BabylMessage to MHMessage

   1574         pairs = (('unseen', ['unseen']), ('deleted', []), ('filed', []),
   1575                  ('answered', ['replied']), ('forwarded', []), ('edited', []),
   1576                  ('resent', []))
   1577         for setting, result in pairs:
   1578             msg = mailbox.BabylMessage(_sample_message)
   1579             msg.add_label(setting)
   1580             self.assertEqual(mailbox.MHMessage(msg).get_sequences(), result)
   1581         msg = mailbox.BabylMessage(_sample_message)
   1582         for label in ('unseen', 'deleted', 'filed', 'answered', 'forwarded',
   1583                       'edited', 'resent'):
   1584             msg.add_label(label)
   1585         self.assertEqual(mailbox.MHMessage(msg).get_sequences(),
   1586                          ['unseen', 'replied'])
   1587 
   1588     def test_babyl_to_babyl(self):
   1589         # Convert BabylMessage to BabylMessage

   1590         msg = mailbox.BabylMessage(_sample_message)
   1591         msg.update_visible()
   1592         for label in ('unseen', 'deleted', 'filed', 'answered', 'forwarded',
   1593                       'edited', 'resent'):
   1594             msg.add_label(label)
   1595         msg2 = mailbox.BabylMessage(msg)
   1596         self.assertEqual(msg2.get_labels(), ['unseen', 'deleted', 'filed',
   1597                                              'answered', 'forwarded', 'edited',
   1598                                              'resent'])
   1599         self.assertEqual(msg.get_visible().keys(), msg2.get_visible().keys())
   1600         for key in msg.get_visible().keys():
   1601             self.assertEqual(msg.get_visible()[key], msg2.get_visible()[key])
   1602 
   1603 
   1604 class TestProxyFileBase(TestBase):
   1605 
   1606     def _test_read(self, proxy):
   1607         # Read by byte

   1608         proxy.seek(0)
   1609         self.assertEqual(proxy.read(), 'bar')
   1610         proxy.seek(1)
   1611         self.assertEqual(proxy.read(), 'ar')
   1612         proxy.seek(0)
   1613         self.assertEqual(proxy.read(2), 'ba')
   1614         proxy.seek(1)
   1615         self.assertEqual(proxy.read(-1), 'ar')
   1616         proxy.seek(2)
   1617         self.assertEqual(proxy.read(1000), 'r')
   1618 
   1619     def _test_readline(self, proxy):
   1620         # Read by line

   1621         proxy.seek(0)
   1622         self.assertEqual(proxy.readline(), 'foo' + os.linesep)
   1623         self.assertEqual(proxy.readline(), 'bar' + os.linesep)
   1624         self.assertEqual(proxy.readline(), 'fred' + os.linesep)
   1625         self.assertEqual(proxy.readline(), 'bob')
   1626         proxy.seek(2)
   1627         self.assertEqual(proxy.readline(), 'o' + os.linesep)
   1628         proxy.seek(6 + 2 * len(os.linesep))
   1629         self.assertEqual(proxy.readline(), 'fred' + os.linesep)
   1630         proxy.seek(6 + 2 * len(os.linesep))
   1631         self.assertEqual(proxy.readline(2), 'fr')
   1632         self.assertEqual(proxy.readline(-10), 'ed' + os.linesep)
   1633 
   1634     def _test_readlines(self, proxy):
   1635         # Read multiple lines

   1636         proxy.seek(0)
   1637         self.assertEqual(proxy.readlines(), ['foo' + os.linesep,
   1638                                             'bar' + os.linesep,
   1639                                             'fred' + os.linesep, 'bob'])
   1640         proxy.seek(0)
   1641         self.assertEqual(proxy.readlines(2), ['foo' + os.linesep])
   1642         proxy.seek(3 + len(os.linesep))
   1643         self.assertEqual(proxy.readlines(4 + len(os.linesep)),
   1644                          ['bar' + os.linesep, 'fred' + os.linesep])
   1645         proxy.seek(3)
   1646         self.assertEqual(proxy.readlines(1000), [os.linesep, 'bar' + os.linesep,
   1647                                                  'fred' + os.linesep, 'bob'])
   1648 
   1649     def _test_iteration(self, proxy):
   1650         # Iterate by line

   1651         proxy.seek(0)
   1652         iterator = iter(proxy)
   1653         self.assertEqual(list(iterator),
   1654             ['foo' + os.linesep, 'bar' + os.linesep, 'fred' + os.linesep, 'bob'])
   1655 
   1656     def _test_seek_and_tell(self, proxy):
   1657         # Seek and use tell to check position

   1658         proxy.seek(3)
   1659         self.assertEqual(proxy.tell(), 3)
   1660         self.assertEqual(proxy.read(len(os.linesep)), os.linesep)
   1661         proxy.seek(2, 1)
   1662         self.assertEqual(proxy.read(1 + len(os.linesep)), 'r' + os.linesep)
   1663         proxy.seek(-3 - len(os.linesep), 2)
   1664         self.assertEqual(proxy.read(3), 'bar')
   1665         proxy.seek(2, 0)
   1666         self.assertEqual(proxy.read(), 'o' + os.linesep + 'bar' + os.linesep)
   1667         proxy.seek(100)
   1668         self.assertEqual(proxy.read(), '')
   1669 
   1670     def _test_close(self, proxy):
   1671         # Close a file

   1672         proxy.close()
   1673         self.assertRaises(AttributeError, lambda: proxy.close())
   1674 
   1675 
   1676 class TestProxyFile(TestProxyFileBase):
   1677 
   1678     def setUp(self):
   1679         self._path = test_support.TESTFN
   1680         self._file = open(self._path, 'wb+')
   1681 
   1682     def tearDown(self):
   1683         self._file.close()
   1684         self._delete_recursively(self._path)
   1685 
   1686     def test_initialize(self):
   1687         # Initialize and check position

   1688         self._file.write('foo')
   1689         pos = self._file.tell()
   1690         proxy0 = mailbox._ProxyFile(self._file)
   1691         self.assertEqual(proxy0.tell(), pos)
   1692         self.assertEqual(self._file.tell(), pos)
   1693         proxy1 = mailbox._ProxyFile(self._file, 0)
   1694         self.assertEqual(proxy1.tell(), 0)
   1695         self.assertEqual(self._file.tell(), pos)
   1696 
   1697     def test_read(self):
   1698         self._file.write('bar')
   1699         self._test_read(mailbox._ProxyFile(self._file))
   1700 
   1701     def test_readline(self):
   1702         self._file.write('foo%sbar%sfred%sbob' % (os.linesep, os.linesep,
   1703                                                   os.linesep))
   1704         self._test_readline(mailbox._ProxyFile(self._file))
   1705 
   1706     def test_readlines(self):
   1707         self._file.write('foo%sbar%sfred%sbob' % (os.linesep, os.linesep,
   1708                                                   os.linesep))
   1709         self._test_readlines(mailbox._ProxyFile(self._file))
   1710 
   1711     def test_iteration(self):
   1712         self._file.write('foo%sbar%sfred%sbob' % (os.linesep, os.linesep,
   1713                                                   os.linesep))
   1714         self._test_iteration(mailbox._ProxyFile(self._file))
   1715 
   1716     def test_seek_and_tell(self):
   1717         self._file.write('foo%sbar%s' % (os.linesep, os.linesep))
   1718         self._test_seek_and_tell(mailbox._ProxyFile(self._file))
   1719 
   1720     def test_close(self):
   1721         self._file.write('foo%sbar%s' % (os.linesep, os.linesep))
   1722         self._test_close(mailbox._ProxyFile(self._file))
   1723 
   1724 
   1725 class TestPartialFile(TestProxyFileBase):
   1726 
   1727     def setUp(self):
   1728         self._path = test_support.TESTFN
   1729         self._file = open(self._path, 'wb+')
   1730 
   1731     def tearDown(self):
   1732         self._file.close()
   1733         self._delete_recursively(self._path)
   1734 
   1735     def test_initialize(self):
   1736         # Initialize and check position

   1737         self._file.write('foo' + os.linesep + 'bar')
   1738         pos = self._file.tell()
   1739         proxy = mailbox._PartialFile(self._file, 2, 5)
   1740         self.assertEqual(proxy.tell(), 0)
   1741         self.assertEqual(self._file.tell(), pos)
   1742 
   1743     def test_read(self):
   1744         self._file.write('***bar***')
   1745         self._test_read(mailbox._PartialFile(self._file, 3, 6))
   1746 
   1747     def test_readline(self):
   1748         self._file.write('!!!!!foo%sbar%sfred%sbob!!!!!' %
   1749                          (os.linesep, os.linesep, os.linesep))
   1750         self._test_readline(mailbox._PartialFile(self._file, 5,
   1751                                                  18 + 3 * len(os.linesep)))
   1752 
   1753     def test_readlines(self):
   1754         self._file.write('foo%sbar%sfred%sbob?????' %
   1755                          (os.linesep, os.linesep, os.linesep))
   1756         self._test_readlines(mailbox._PartialFile(self._file, 0,
   1757                                                   13 + 3 * len(os.linesep)))
   1758 
   1759     def test_iteration(self):
   1760         self._file.write('____foo%sbar%sfred%sbob####' %
   1761                          (os.linesep, os.linesep, os.linesep))
   1762         self._test_iteration(mailbox._PartialFile(self._file, 4,
   1763                                                   17 + 3 * len(os.linesep)))
   1764 
   1765     def test_seek_and_tell(self):
   1766         self._file.write('(((foo%sbar%s$$$' % (os.linesep, os.linesep))
   1767         self._test_seek_and_tell(mailbox._PartialFile(self._file, 3,
   1768                                                       9 + 2 * len(os.linesep)))
   1769 
   1770     def test_close(self):
   1771         self._file.write('&foo%sbar%s^' % (os.linesep, os.linesep))
   1772         self._test_close(mailbox._PartialFile(self._file, 1,
   1773                                               6 + 3 * len(os.linesep)))
   1774 
   1775 
   1776 ## Start: tests from the original module (for backward compatibility).

   1777 
   1778 FROM_ = "From some.body (at] dummy.domain  Sat Jul 24 13:43:35 2004\n"
   1779 DUMMY_MESSAGE = """\
   1780 From: some.body (at] dummy.domain
   1781 To: me (at] my.domain
   1782 Subject: Simple Test
   1783 
   1784 This is a dummy message.
   1785 """
   1786 
   1787 class MaildirTestCase(unittest.TestCase):
   1788 
   1789     def setUp(self):
   1790         # create a new maildir mailbox to work with:

   1791         self._dir = test_support.TESTFN
   1792         os.mkdir(self._dir)
   1793         os.mkdir(os.path.join(self._dir, "cur"))
   1794         os.mkdir(os.path.join(self._dir, "tmp"))
   1795         os.mkdir(os.path.join(self._dir, "new"))
   1796         self._counter = 1
   1797         self._msgfiles = []
   1798 
   1799     def tearDown(self):
   1800         map(os.unlink, self._msgfiles)
   1801         os.rmdir(os.path.join(self._dir, "cur"))
   1802         os.rmdir(os.path.join(self._dir, "tmp"))
   1803         os.rmdir(os.path.join(self._dir, "new"))
   1804         os.rmdir(self._dir)
   1805 
   1806     def createMessage(self, dir, mbox=False):
   1807         t = int(time.time() % 1000000)
   1808         pid = self._counter
   1809         self._counter += 1
   1810         filename = os.extsep.join((str(t), str(pid), "myhostname", "mydomain"))
   1811         tmpname = os.path.join(self._dir, "tmp", filename)
   1812         newname = os.path.join(self._dir, dir, filename)
   1813         with open(tmpname, "w") as fp:
   1814             self._msgfiles.append(tmpname)
   1815             if mbox:
   1816                 fp.write(FROM_)
   1817             fp.write(DUMMY_MESSAGE)
   1818         if hasattr(os, "link"):
   1819             os.link(tmpname, newname)
   1820         else:
   1821             with open(newname, "w") as fp:
   1822                 fp.write(DUMMY_MESSAGE)
   1823         self._msgfiles.append(newname)
   1824         return tmpname
   1825 
   1826     def test_empty_maildir(self):
   1827         """Test an empty maildir mailbox"""
   1828         # Test for regression on bug #117490:

   1829         # Make sure the boxes attribute actually gets set.

   1830         self.mbox = mailbox.Maildir(test_support.TESTFN)
   1831         #self.assertTrue(hasattr(self.mbox, "boxes"))

   1832         #self.assertTrue(len(self.mbox.boxes) == 0)

   1833         self.assertIs(self.mbox.next(), None)
   1834         self.assertIs(self.mbox.next(), None)
   1835 
   1836     def test_nonempty_maildir_cur(self):
   1837         self.createMessage("cur")
   1838         self.mbox = mailbox.Maildir(test_support.TESTFN)
   1839         #self.assertTrue(len(self.mbox.boxes) == 1)

   1840         self.assertIsNot(self.mbox.next(), None)
   1841         self.assertIs(self.mbox.next(), None)
   1842         self.assertIs(self.mbox.next(), None)
   1843 
   1844     def test_nonempty_maildir_new(self):
   1845         self.createMessage("new")
   1846         self.mbox = mailbox.Maildir(test_support.TESTFN)
   1847         #self.assertTrue(len(self.mbox.boxes) == 1)

   1848         self.assertIsNot(self.mbox.next(), None)
   1849         self.assertIs(self.mbox.next(), None)
   1850         self.assertIs(self.mbox.next(), None)
   1851 
   1852     def test_nonempty_maildir_both(self):
   1853         self.createMessage("cur")
   1854         self.createMessage("new")
   1855         self.mbox = mailbox.Maildir(test_support.TESTFN)
   1856         #self.assertTrue(len(self.mbox.boxes) == 2)

   1857         self.assertIsNot(self.mbox.next(), None)
   1858         self.assertIsNot(self.mbox.next(), None)
   1859         self.assertIs(self.mbox.next(), None)
   1860         self.assertIs(self.mbox.next(), None)
   1861 
   1862     def test_unix_mbox(self):
   1863         ### should be better!

   1864         import email.parser
   1865         fname = self.createMessage("cur", True)
   1866         n = 0
   1867         for msg in mailbox.PortableUnixMailbox(open(fname),
   1868                                                email.parser.Parser().parse):
   1869             n += 1
   1870             self.assertEqual(msg["subject"], "Simple Test")
   1871             self.assertEqual(len(str(msg)), len(FROM_)+len(DUMMY_MESSAGE))
   1872         self.assertEqual(n, 1)
   1873 
   1874 ## End: classes from the original module (for backward compatibility).

   1875 
   1876 
   1877 _sample_message = """\
   1878 Return-Path: <gkj (at] gregorykjohnson.com>
   1879 X-Original-To: gkj+person@localhost
   1880 Delivered-To: gkj+person@localhost
   1881 Received: from localhost (localhost [127.0.0.1])
   1882         by andy.gregorykjohnson.com (Postfix) with ESMTP id 356ED9DD17
   1883         for <gkj+person@localhost>; Wed, 13 Jul 2005 17:23:16 -0400 (EDT)
   1884 Delivered-To: gkj (at] sundance.gregorykjohnson.com
   1885 Received: from localhost [127.0.0.1]
   1886         by localhost with POP3 (fetchmail-6.2.5)
   1887         for gkj+person@localhost (single-drop); Wed, 13 Jul 2005 17:23:16 -0400 (EDT)
   1888 Received: from andy.gregorykjohnson.com (andy.gregorykjohnson.com [64.32.235.228])
   1889         by sundance.gregorykjohnson.com (Postfix) with ESMTP id 5B056316746
   1890         for <gkj (at] gregorykjohnson.com>; Wed, 13 Jul 2005 17:23:11 -0400 (EDT)
   1891 Received: by andy.gregorykjohnson.com (Postfix, from userid 1000)
   1892         id 490CD9DD17; Wed, 13 Jul 2005 17:23:11 -0400 (EDT)
   1893 Date: Wed, 13 Jul 2005 17:23:11 -0400
   1894 From: "Gregory K. Johnson" <gkj (at] gregorykjohnson.com>
   1895 To: gkj (at] gregorykjohnson.com
   1896 Subject: Sample message
   1897 Message-ID: <20050713212311.GC4701 (at] andy.gregorykjohnson.com>
   1898 Mime-Version: 1.0
   1899 Content-Type: multipart/mixed; boundary="NMuMz9nt05w80d4+"
   1900 Content-Disposition: inline
   1901 User-Agent: Mutt/1.5.9i
   1902 
   1903 
   1904 --NMuMz9nt05w80d4+
   1905 Content-Type: text/plain; charset=us-ascii
   1906 Content-Disposition: inline
   1907 
   1908 This is a sample message.
   1909 
   1910 --
   1911 Gregory K. Johnson
   1912 
   1913 --NMuMz9nt05w80d4+
   1914 Content-Type: application/octet-stream
   1915 Content-Disposition: attachment; filename="text.gz"
   1916 Content-Transfer-Encoding: base64
   1917 
   1918 H4sICM2D1UIAA3RleHQAC8nILFYAokSFktSKEoW0zJxUPa7wzJIMhZLyfIWczLzUYj0uAHTs
   1919 3FYlAAAA
   1920 
   1921 --NMuMz9nt05w80d4+--
   1922 """
   1923 
   1924 _sample_headers = {
   1925     "Return-Path":"<gkj (at] gregorykjohnson.com>",
   1926     "X-Original-To":"gkj+person@localhost",
   1927     "Delivered-To":"gkj+person@localhost",
   1928     "Received":"""from localhost (localhost [127.0.0.1])
   1929         by andy.gregorykjohnson.com (Postfix) with ESMTP id 356ED9DD17
   1930         for <gkj+person@localhost>; Wed, 13 Jul 2005 17:23:16 -0400 (EDT)""",
   1931     "Delivered-To":"gkj (at] sundance.gregorykjohnson.com",
   1932     "Received":"""from localhost [127.0.0.1]
   1933         by localhost with POP3 (fetchmail-6.2.5)
   1934         for gkj+person@localhost (single-drop); Wed, 13 Jul 2005 17:23:16 -0400 (EDT)""",
   1935     "Received":"""from andy.gregorykjohnson.com (andy.gregorykjohnson.com [64.32.235.228])
   1936         by sundance.gregorykjohnson.com (Postfix) with ESMTP id 5B056316746
   1937         for <gkj (at] gregorykjohnson.com>; Wed, 13 Jul 2005 17:23:11 -0400 (EDT)""",
   1938     "Received":"""by andy.gregorykjohnson.com (Postfix, from userid 1000)
   1939         id 490CD9DD17; Wed, 13 Jul 2005 17:23:11 -0400 (EDT)""",
   1940     "Date":"Wed, 13 Jul 2005 17:23:11 -0400",
   1941     "From":""""Gregory K. Johnson" <gkj (at] gregorykjohnson.com>""",
   1942     "To":"gkj (at] gregorykjohnson.com",
   1943     "Subject":"Sample message",
   1944     "Mime-Version":"1.0",
   1945     "Content-Type":"""multipart/mixed; boundary="NMuMz9nt05w80d4+\"""",
   1946     "Content-Disposition":"inline",
   1947     "User-Agent": "Mutt/1.5.9i" }
   1948 
   1949 _sample_payloads = ("""This is a sample message.
   1950 
   1951 --
   1952 Gregory K. Johnson
   1953 """,
   1954 """H4sICM2D1UIAA3RleHQAC8nILFYAokSFktSKEoW0zJxUPa7wzJIMhZLyfIWczLzUYj0uAHTs
   1955 3FYlAAAA
   1956 """)
   1957 
   1958 
   1959 def test_main():
   1960     tests = (TestMailboxSuperclass, TestMaildir, TestMbox, TestMMDF, TestMH,
   1961              TestBabyl, TestMessage, TestMaildirMessage, TestMboxMessage,
   1962              TestMHMessage, TestBabylMessage, TestMMDFMessage,
   1963              TestMessageConversion, TestProxyFile, TestPartialFile,
   1964              MaildirTestCase)
   1965     test_support.run_unittest(*tests)
   1966     test_support.reap_children()
   1967 
   1968 
   1969 if __name__ == '__main__':
   1970     test_main()
   1971