Home | History | Annotate | Download | only in test
      1 import os
      2 import sys
      3 import time
      4 import stat
      5 import socket
      6 import email
      7 import email.message
      8 import re
      9 import io
     10 import tempfile
     11 from test import support
     12 import unittest
     13 import textwrap
     14 import mailbox
     15 import glob
     16 
     17 
     18 class TestBase:
     19 
     20     all_mailbox_types = (mailbox.Message, mailbox.MaildirMessage,
     21                          mailbox.mboxMessage, mailbox.MHMessage,
     22                          mailbox.BabylMessage, mailbox.MMDFMessage)
     23 
     24     def _check_sample(self, msg):
     25         # Inspect a mailbox.Message representation of the sample message
     26         self.assertIsInstance(msg, email.message.Message)
     27         self.assertIsInstance(msg, mailbox.Message)
     28         for key, value in _sample_headers.items():
     29             self.assertIn(value, msg.get_all(key))
     30         self.assertTrue(msg.is_multipart())
     31         self.assertEqual(len(msg.get_payload()), len(_sample_payloads))
     32         for i, payload in enumerate(_sample_payloads):
     33             part = msg.get_payload(i)
     34             self.assertIsInstance(part, email.message.Message)
     35             self.assertNotIsInstance(part, mailbox.Message)
     36             self.assertEqual(part.get_payload(), payload)
     37 
     38     def _delete_recursively(self, target):
     39         # Delete a file or delete a directory recursively
     40         if os.path.isdir(target):
     41             support.rmtree(target)
     42         elif os.path.exists(target):
     43             support.unlink(target)
     44 
     45 
     46 class TestMailbox(TestBase):
     47 
     48     maxDiff = None
     49 
     50     _factory = None     # Overridden by subclasses to reuse tests
     51     _template = 'From: foo\n\n%s\n'
     52 
     53     def setUp(self):
     54         self._path = support.TESTFN
     55         self._delete_recursively(self._path)
     56         self._box = self._factory(self._path)
     57 
     58     def tearDown(self):
     59         self._box.close()
     60         self._delete_recursively(self._path)
     61 
     62     def test_add(self):
     63         # Add copies of a sample message
     64         keys = []
     65         keys.append(self._box.add(self._template % 0))
     66         self.assertEqual(len(self._box), 1)
     67         keys.append(self._box.add(mailbox.Message(_sample_message)))
     68         self.assertEqual(len(self._box), 2)
     69         keys.append(self._box.add(email.message_from_string(_sample_message)))
     70         self.assertEqual(len(self._box), 3)
     71         keys.append(self._box.add(io.BytesIO(_bytes_sample_message)))
     72         self.assertEqual(len(self._box), 4)
     73         keys.append(self._box.add(_sample_message))
     74         self.assertEqual(len(self._box), 5)
     75         keys.append(self._box.add(_bytes_sample_message))
     76         self.assertEqual(len(self._box), 6)
     77         with self.assertWarns(DeprecationWarning):
     78             keys.append(self._box.add(
     79                 io.TextIOWrapper(io.BytesIO(_bytes_sample_message))))
     80         self.assertEqual(len(self._box), 7)
     81         self.assertEqual(self._box.get_string(keys[0]), self._template % 0)
     82         for i in (1, 2, 3, 4, 5, 6):
     83             self._check_sample(self._box[keys[i]])
     84 
     85     _nonascii_msg = textwrap.dedent("""\
     86             From: foo
     87             Subject: Falinaptr hzhozszlltssal. Mr rendeltl?
     88 
     89             0
     90             """)
     91 
     92     def test_add_invalid_8bit_bytes_header(self):
     93         key = self._box.add(self._nonascii_msg.encode('latin-1'))
     94         self.assertEqual(len(self._box), 1)
     95         self.assertEqual(self._box.get_bytes(key),
     96             self._nonascii_msg.encode('latin-1'))
     97 
     98     def test_invalid_nonascii_header_as_string(self):
     99         subj = self._nonascii_msg.splitlines()[1]
    100         key = self._box.add(subj.encode('latin-1'))
    101         self.assertEqual(self._box.get_string(key),
    102             'Subject: =?unknown-8bit?b?RmFsaW5hcHThciBo4Xpob3pzeuFsbO104XNz'
    103             'YWwuIE3hciByZW5kZWx06Ww/?=\n\n')
    104 
    105     def test_add_nonascii_string_header_raises(self):
    106         with self.assertRaisesRegex(ValueError, "ASCII-only"):
    107             self._box.add(self._nonascii_msg)
    108         self._box.flush()
    109         self.assertEqual(len(self._box), 0)
    110         self.assertMailboxEmpty()
    111 
    112     def test_add_that_raises_leaves_mailbox_empty(self):
    113         def raiser(*args, **kw):
    114             raise Exception("a fake error")
    115         support.patch(self, email.generator.BytesGenerator, 'flatten', raiser)
    116         with self.assertRaises(Exception):
    117             self._box.add(email.message_from_string("From: Alphso"))
    118         self.assertEqual(len(self._box), 0)
    119         self._box.close()
    120         self.assertMailboxEmpty()
    121 
    122     _non_latin_bin_msg = textwrap.dedent("""\
    123         From: foo (at] bar.com
    124         To: bz
    125         Subject: Maintenant je vous prsente mon collgue, le pouf clbre
    126         \tJean de Baddie
    127         Mime-Version: 1.0
    128         Content-Type: text/plain; charset="utf-8"
    129         Content-Transfer-Encoding: 8bit
    130 
    131         ,  .
    132         """).encode('utf-8')
    133 
    134     def test_add_8bit_body(self):
    135         key = self._box.add(self._non_latin_bin_msg)
    136         self.assertEqual(self._box.get_bytes(key),
    137                          self._non_latin_bin_msg)
    138         with self._box.get_file(key) as f:
    139             self.assertEqual(f.read(),
    140                              self._non_latin_bin_msg.replace(b'\n',
    141                                 os.linesep.encode()))
    142         self.assertEqual(self._box[key].get_payload(),
    143                         ",  .\n")
    144 
    145     def test_add_binary_file(self):
    146         with tempfile.TemporaryFile('wb+') as f:
    147             f.write(_bytes_sample_message)
    148             f.seek(0)
    149             key = self._box.add(f)
    150         self.assertEqual(self._box.get_bytes(key).split(b'\n'),
    151             _bytes_sample_message.split(b'\n'))
    152 
    153     def test_add_binary_nonascii_file(self):
    154         with tempfile.TemporaryFile('wb+') as f:
    155             f.write(self._non_latin_bin_msg)
    156             f.seek(0)
    157             key = self._box.add(f)
    158         self.assertEqual(self._box.get_bytes(key).split(b'\n'),
    159             self._non_latin_bin_msg.split(b'\n'))
    160 
    161     def test_add_text_file_warns(self):
    162         with tempfile.TemporaryFile('w+') as f:
    163             f.write(_sample_message)
    164             f.seek(0)
    165             with self.assertWarns(DeprecationWarning):
    166                 key = self._box.add(f)
    167         self.assertEqual(self._box.get_bytes(key).split(b'\n'),
    168             _bytes_sample_message.split(b'\n'))
    169 
    170     def test_add_StringIO_warns(self):
    171         with self.assertWarns(DeprecationWarning):
    172             key = self._box.add(io.StringIO(self._template % "0"))
    173         self.assertEqual(self._box.get_string(key), self._template % "0")
    174 
    175     def test_add_nonascii_StringIO_raises(self):
    176         with self.assertWarns(DeprecationWarning):
    177             with self.assertRaisesRegex(ValueError, "ASCII-only"):
    178                 self._box.add(io.StringIO(self._nonascii_msg))
    179         self.assertEqual(len(self._box), 0)
    180         self._box.close()
    181         self.assertMailboxEmpty()
    182 
    183     def test_remove(self):
    184         # Remove messages using remove()
    185         self._test_remove_or_delitem(self._box.remove)
    186 
    187     def test_delitem(self):
    188         # Remove messages using __delitem__()
    189         self._test_remove_or_delitem(self._box.__delitem__)
    190 
    191     def _test_remove_or_delitem(self, method):
    192         # (Used by test_remove() and test_delitem().)
    193         key0 = self._box.add(self._template % 0)
    194         key1 = self._box.add(self._template % 1)
    195         self.assertEqual(len(self._box), 2)
    196         method(key0)
    197         self.assertEqual(len(self._box), 1)
    198         self.assertRaises(KeyError, lambda: self._box[key0])
    199         self.assertRaises(KeyError, lambda: method(key0))
    200         self.assertEqual(self._box.get_string(key1), self._template % 1)
    201         key2 = self._box.add(self._template % 2)
    202         self.assertEqual(len(self._box), 2)
    203         method(key2)
    204         self.assertEqual(len(self._box), 1)
    205         self.assertRaises(KeyError, lambda: self._box[key2])
    206         self.assertRaises(KeyError, lambda: method(key2))
    207         self.assertEqual(self._box.get_string(key1), self._template % 1)
    208         method(key1)
    209         self.assertEqual(len(self._box), 0)
    210         self.assertRaises(KeyError, lambda: self._box[key1])
    211         self.assertRaises(KeyError, lambda: method(key1))
    212 
    213     def test_discard(self, repetitions=10):
    214         # Discard messages
    215         key0 = self._box.add(self._template % 0)
    216         key1 = self._box.add(self._template % 1)
    217         self.assertEqual(len(self._box), 2)
    218         self._box.discard(key0)
    219         self.assertEqual(len(self._box), 1)
    220         self.assertRaises(KeyError, lambda: self._box[key0])
    221         self._box.discard(key0)
    222         self.assertEqual(len(self._box), 1)
    223         self.assertRaises(KeyError, lambda: self._box[key0])
    224 
    225     def test_get(self):
    226         # Retrieve messages using get()
    227         key0 = self._box.add(self._template % 0)
    228         msg = self._box.get(key0)
    229         self.assertEqual(msg['from'], 'foo')
    230         self.assertEqual(msg.get_payload(), '0\n')
    231         self.assertIsNone(self._box.get('foo'))
    232         self.assertIs(self._box.get('foo', False), False)
    233         self._box.close()
    234         self._box = self._factory(self._path)
    235         key1 = self._box.add(self._template % 1)
    236         msg = self._box.get(key1)
    237         self.assertEqual(msg['from'], 'foo')
    238         self.assertEqual(msg.get_payload(), '1\n')
    239 
    240     def test_getitem(self):
    241         # Retrieve message using __getitem__()
    242         key0 = self._box.add(self._template % 0)
    243         msg = self._box[key0]
    244         self.assertEqual(msg['from'], 'foo')
    245         self.assertEqual(msg.get_payload(), '0\n')
    246         self.assertRaises(KeyError, lambda: self._box['foo'])
    247         self._box.discard(key0)
    248         self.assertRaises(KeyError, lambda: self._box[key0])
    249 
    250     def test_get_message(self):
    251         # Get Message representations of messages
    252         key0 = self._box.add(self._template % 0)
    253         key1 = self._box.add(_sample_message)
    254         msg0 = self._box.get_message(key0)
    255         self.assertIsInstance(msg0, mailbox.Message)
    256         self.assertEqual(msg0['from'], 'foo')
    257         self.assertEqual(msg0.get_payload(), '0\n')
    258         self._check_sample(self._box.get_message(key1))
    259 
    260     def test_get_bytes(self):
    261         # Get bytes representations of messages
    262         key0 = self._box.add(self._template % 0)
    263         key1 = self._box.add(_sample_message)
    264         self.assertEqual(self._box.get_bytes(key0),
    265             (self._template % 0).encode('ascii'))
    266         self.assertEqual(self._box.get_bytes(key1), _bytes_sample_message)
    267 
    268     def test_get_string(self):
    269         # Get string representations of messages
    270         key0 = self._box.add(self._template % 0)
    271         key1 = self._box.add(_sample_message)
    272         self.assertEqual(self._box.get_string(key0), self._template % 0)
    273         self.assertEqual(self._box.get_string(key1).split('\n'),
    274                          _sample_message.split('\n'))
    275 
    276     def test_get_file(self):
    277         # Get file representations of messages
    278         key0 = self._box.add(self._template % 0)
    279         key1 = self._box.add(_sample_message)
    280         with self._box.get_file(key0) as file:
    281             data0 = file.read()
    282         with self._box.get_file(key1) as file:
    283             data1 = file.read()
    284         self.assertEqual(data0.decode('ascii').replace(os.linesep, '\n'),
    285                          self._template % 0)
    286         self.assertEqual(data1.decode('ascii').replace(os.linesep, '\n'),
    287                          _sample_message)
    288 
    289     def test_get_file_can_be_closed_twice(self):
    290         # Issue 11700
    291         key = self._box.add(_sample_message)
    292         f = self._box.get_file(key)
    293         f.close()
    294         f.close()
    295 
    296     def test_iterkeys(self):
    297         # Get keys using iterkeys()
    298         self._check_iteration(self._box.iterkeys, do_keys=True, do_values=False)
    299 
    300     def test_keys(self):
    301         # Get keys using keys()
    302         self._check_iteration(self._box.keys, do_keys=True, do_values=False)
    303 
    304     def test_itervalues(self):
    305         # Get values using itervalues()
    306         self._check_iteration(self._box.itervalues, do_keys=False,
    307                               do_values=True)
    308 
    309     def test_iter(self):
    310         # Get values using __iter__()
    311         self._check_iteration(self._box.__iter__, do_keys=False,
    312                               do_values=True)
    313 
    314     def test_values(self):
    315         # Get values using values()
    316         self._check_iteration(self._box.values, do_keys=False, do_values=True)
    317 
    318     def test_iteritems(self):
    319         # Get keys and values using iteritems()
    320         self._check_iteration(self._box.iteritems, do_keys=True,
    321                               do_values=True)
    322 
    323     def test_items(self):
    324         # Get keys and values using items()
    325         self._check_iteration(self._box.items, do_keys=True, do_values=True)
    326 
    327     def _check_iteration(self, method, do_keys, do_values, repetitions=10):
    328         for value in method():
    329             self.fail("Not empty")
    330         keys, values = [], []
    331         for i in range(repetitions):
    332             keys.append(self._box.add(self._template % i))
    333             values.append(self._template % i)
    334         if do_keys and not do_values:
    335             returned_keys = list(method())
    336         elif do_values and not do_keys:
    337             returned_values = list(method())
    338         else:
    339             returned_keys, returned_values = [], []
    340             for key, value in method():
    341                 returned_keys.append(key)
    342                 returned_values.append(value)
    343         if do_keys:
    344             self.assertEqual(len(keys), len(returned_keys))
    345             self.assertEqual(set(keys), set(returned_keys))
    346         if do_values:
    347             count = 0
    348             for value in returned_values:
    349                 self.assertEqual(value['from'], 'foo')
    350                 self.assertLess(int(value.get_payload()), repetitions)
    351                 count += 1
    352             self.assertEqual(len(values), count)
    353 
    354     def test_contains(self):
    355         # Check existence of keys using __contains__()
    356         self.assertNotIn('foo', self._box)
    357         key0 = self._box.add(self._template % 0)
    358         self.assertIn(key0, self._box)
    359         self.assertNotIn('foo', self._box)
    360         key1 = self._box.add(self._template % 1)
    361         self.assertIn(key1, self._box)
    362         self.assertIn(key0, self._box)
    363         self.assertNotIn('foo', self._box)
    364         self._box.remove(key0)
    365         self.assertNotIn(key0, self._box)
    366         self.assertIn(key1, self._box)
    367         self.assertNotIn('foo', self._box)
    368         self._box.remove(key1)
    369         self.assertNotIn(key1, self._box)
    370         self.assertNotIn(key0, self._box)
    371         self.assertNotIn('foo', self._box)
    372 
    373     def test_len(self, repetitions=10):
    374         # Get message count
    375         keys = []
    376         for i in range(repetitions):
    377             self.assertEqual(len(self._box), i)
    378             keys.append(self._box.add(self._template % i))
    379             self.assertEqual(len(self._box), i + 1)
    380         for i in range(repetitions):
    381             self.assertEqual(len(self._box), repetitions - i)
    382             self._box.remove(keys[i])
    383             self.assertEqual(len(self._box), repetitions - i - 1)
    384 
    385     def test_set_item(self):
    386         # Modify messages using __setitem__()
    387         key0 = self._box.add(self._template % 'original 0')
    388         self.assertEqual(self._box.get_string(key0),
    389                          self._template % 'original 0')
    390         key1 = self._box.add(self._template % 'original 1')
    391         self.assertEqual(self._box.get_string(key1),
    392                          self._template % 'original 1')
    393         self._box[key0] = self._template % 'changed 0'
    394         self.assertEqual(self._box.get_string(key0),
    395                          self._template % 'changed 0')
    396         self._box[key1] = self._template % 'changed 1'
    397         self.assertEqual(self._box.get_string(key1),
    398                          self._template % 'changed 1')
    399         self._box[key0] = _sample_message
    400         self._check_sample(self._box[key0])
    401         self._box[key1] = self._box[key0]
    402         self._check_sample(self._box[key1])
    403         self._box[key0] = self._template % 'original 0'
    404         self.assertEqual(self._box.get_string(key0),
    405                      self._template % 'original 0')
    406         self._check_sample(self._box[key1])
    407         self.assertRaises(KeyError,
    408                           lambda: self._box.__setitem__('foo', 'bar'))
    409         self.assertRaises(KeyError, lambda: self._box['foo'])
    410         self.assertEqual(len(self._box), 2)
    411 
    412     def test_clear(self, iterations=10):
    413         # Remove all messages using clear()
    414         keys = []
    415         for i in range(iterations):
    416             self._box.add(self._template % i)
    417         for i, key in enumerate(keys):
    418             self.assertEqual(self._box.get_string(key), self._template % i)
    419         self._box.clear()
    420         self.assertEqual(len(self._box), 0)
    421         for i, key in enumerate(keys):
    422             self.assertRaises(KeyError, lambda: self._box.get_string(key))
    423 
    424     def test_pop(self):
    425         # Get and remove a message using pop()
    426         key0 = self._box.add(self._template % 0)
    427         self.assertIn(key0, self._box)
    428         key1 = self._box.add(self._template % 1)
    429         self.assertIn(key1, self._box)
    430         self.assertEqual(self._box.pop(key0).get_payload(), '0\n')
    431         self.assertNotIn(key0, self._box)
    432         self.assertIn(key1, self._box)
    433         key2 = self._box.add(self._template % 2)
    434         self.assertIn(key2, self._box)
    435         self.assertEqual(self._box.pop(key2).get_payload(), '2\n')
    436         self.assertNotIn(key2, self._box)
    437         self.assertIn(key1, self._box)
    438         self.assertEqual(self._box.pop(key1).get_payload(), '1\n')
    439         self.assertNotIn(key1, self._box)
    440         self.assertEqual(len(self._box), 0)
    441 
    442     def test_popitem(self, iterations=10):
    443         # Get and remove an arbitrary (key, message) using popitem()
    444         keys = []
    445         for i in range(10):
    446             keys.append(self._box.add(self._template % i))
    447         seen = []
    448         for i in range(10):
    449             key, msg = self._box.popitem()
    450             self.assertIn(key, keys)
    451             self.assertNotIn(key, seen)
    452             seen.append(key)
    453             self.assertEqual(int(msg.get_payload()), keys.index(key))
    454         self.assertEqual(len(self._box), 0)
    455         for key in keys:
    456             self.assertRaises(KeyError, lambda: self._box[key])
    457 
    458     def test_update(self):
    459         # Modify multiple messages using update()
    460         key0 = self._box.add(self._template % 'original 0')
    461         key1 = self._box.add(self._template % 'original 1')
    462         key2 = self._box.add(self._template % 'original 2')
    463         self._box.update({key0: self._template % 'changed 0',
    464                           key2: _sample_message})
    465         self.assertEqual(len(self._box), 3)
    466         self.assertEqual(self._box.get_string(key0),
    467                      self._template % 'changed 0')
    468         self.assertEqual(self._box.get_string(key1),
    469                      self._template % 'original 1')
    470         self._check_sample(self._box[key2])
    471         self._box.update([(key2, self._template % 'changed 2'),
    472                     (key1, self._template % 'changed 1'),
    473                     (key0, self._template % 'original 0')])
    474         self.assertEqual(len(self._box), 3)
    475         self.assertEqual(self._box.get_string(key0),
    476                      self._template % 'original 0')
    477         self.assertEqual(self._box.get_string(key1),
    478                      self._template % 'changed 1')
    479         self.assertEqual(self._box.get_string(key2),
    480                      self._template % 'changed 2')
    481         self.assertRaises(KeyError,
    482                           lambda: self._box.update({'foo': 'bar',
    483                                           key0: self._template % "changed 0"}))
    484         self.assertEqual(len(self._box), 3)
    485         self.assertEqual(self._box.get_string(key0),
    486                      self._template % "changed 0")
    487         self.assertEqual(self._box.get_string(key1),
    488                      self._template % "changed 1")
    489         self.assertEqual(self._box.get_string(key2),
    490                      self._template % "changed 2")
    491 
    492     def test_flush(self):
    493         # Write changes to disk
    494         self._test_flush_or_close(self._box.flush, True)
    495 
    496     def test_popitem_and_flush_twice(self):
    497         # See #15036.
    498         self._box.add(self._template % 0)
    499         self._box.add(self._template % 1)
    500         self._box.flush()
    501 
    502         self._box.popitem()
    503         self._box.flush()
    504         self._box.popitem()
    505         self._box.flush()
    506 
    507     def test_lock_unlock(self):
    508         # Lock and unlock the mailbox
    509         self.assertFalse(os.path.exists(self._get_lock_path()))
    510         self._box.lock()
    511         self.assertTrue(os.path.exists(self._get_lock_path()))
    512         self._box.unlock()
    513         self.assertFalse(os.path.exists(self._get_lock_path()))
    514 
    515     def test_close(self):
    516         # Close mailbox and flush changes to disk
    517         self._test_flush_or_close(self._box.close, False)
    518 
    519     def _test_flush_or_close(self, method, should_call_close):
    520         contents = [self._template % i for i in range(3)]
    521         self._box.add(contents[0])
    522         self._box.add(contents[1])
    523         self._box.add(contents[2])
    524         oldbox = self._box
    525         method()
    526         if should_call_close:
    527             self._box.close()
    528         self._box = self._factory(self._path)
    529         keys = self._box.keys()
    530         self.assertEqual(len(keys), 3)
    531         for key in keys:
    532             self.assertIn(self._box.get_string(key), contents)
    533         oldbox.close()
    534 
    535     def test_dump_message(self):
    536         # Write message representations to disk
    537         for input in (email.message_from_string(_sample_message),
    538                       _sample_message, io.BytesIO(_bytes_sample_message)):
    539             output = io.BytesIO()
    540             self._box._dump_message(input, output)
    541             self.assertEqual(output.getvalue(),
    542                 _bytes_sample_message.replace(b'\n', os.linesep.encode()))
    543         output = io.BytesIO()
    544         self.assertRaises(TypeError,
    545                           lambda: self._box._dump_message(None, output))
    546 
    547     def _get_lock_path(self):
    548         # Return the path of the dot lock file. May be overridden.
    549         return self._path + '.lock'
    550 
    551 
    552 class TestMailboxSuperclass(TestBase, unittest.TestCase):
    553 
    554     def test_notimplemented(self):
    555         # Test that all Mailbox methods raise NotImplementedException.
    556         box = mailbox.Mailbox('path')
    557         self.assertRaises(NotImplementedError, lambda: box.add(''))
    558         self.assertRaises(NotImplementedError, lambda: box.remove(''))
    559         self.assertRaises(NotImplementedError, lambda: box.__delitem__(''))
    560         self.assertRaises(NotImplementedError, lambda: box.discard(''))
    561         self.assertRaises(NotImplementedError, lambda: box.__setitem__('', ''))
    562         self.assertRaises(NotImplementedError, lambda: box.iterkeys())
    563         self.assertRaises(NotImplementedError, lambda: box.keys())
    564         self.assertRaises(NotImplementedError, lambda: box.itervalues().__next__())
    565         self.assertRaises(NotImplementedError, lambda: box.__iter__().__next__())
    566         self.assertRaises(NotImplementedError, lambda: box.values())
    567         self.assertRaises(NotImplementedError, lambda: box.iteritems().__next__())
    568         self.assertRaises(NotImplementedError, lambda: box.items())
    569         self.assertRaises(NotImplementedError, lambda: box.get(''))
    570         self.assertRaises(NotImplementedError, lambda: box.__getitem__(''))
    571         self.assertRaises(NotImplementedError, lambda: box.get_message(''))
    572         self.assertRaises(NotImplementedError, lambda: box.get_string(''))
    573         self.assertRaises(NotImplementedError, lambda: box.get_bytes(''))
    574         self.assertRaises(NotImplementedError, lambda: box.get_file(''))
    575         self.assertRaises(NotImplementedError, lambda: '' in box)
    576         self.assertRaises(NotImplementedError, lambda: box.__contains__(''))
    577         self.assertRaises(NotImplementedError, lambda: box.__len__())
    578         self.assertRaises(NotImplementedError, lambda: box.clear())
    579         self.assertRaises(NotImplementedError, lambda: box.pop(''))
    580         self.assertRaises(NotImplementedError, lambda: box.popitem())
    581         self.assertRaises(NotImplementedError, lambda: box.update((('', ''),)))
    582         self.assertRaises(NotImplementedError, lambda: box.flush())
    583         self.assertRaises(NotImplementedError, lambda: box.lock())
    584         self.assertRaises(NotImplementedError, lambda: box.unlock())
    585         self.assertRaises(NotImplementedError, lambda: box.close())
    586 
    587 
    588 class TestMaildir(TestMailbox, unittest.TestCase):
    589 
    590     _factory = lambda self, path, factory=None: mailbox.Maildir(path, factory)
    591 
    592     def setUp(self):
    593         TestMailbox.setUp(self)
    594         if (os.name == 'nt') or (sys.platform == 'cygwin'):
    595             self._box.colon = '!'
    596 
    597     def assertMailboxEmpty(self):
    598         self.assertEqual(os.listdir(os.path.join(self._path, 'tmp')), [])
    599 
    600     def test_add_MM(self):
    601         # Add a MaildirMessage instance
    602         msg = mailbox.MaildirMessage(self._template % 0)
    603         msg.set_subdir('cur')
    604         msg.set_info('foo')
    605         key = self._box.add(msg)
    606         self.assertTrue(os.path.exists(os.path.join(self._path, 'cur', '%s%sfoo' %
    607                                                  (key, self._box.colon))))
    608 
    609     def test_get_MM(self):
    610         # Get a MaildirMessage instance
    611         msg = mailbox.MaildirMessage(self._template % 0)
    612         msg.set_subdir('cur')
    613         msg.set_flags('RF')
    614         key = self._box.add(msg)
    615         msg_returned = self._box.get_message(key)
    616         self.assertIsInstance(msg_returned, mailbox.MaildirMessage)
    617         self.assertEqual(msg_returned.get_subdir(), 'cur')
    618         self.assertEqual(msg_returned.get_flags(), 'FR')
    619 
    620     def test_set_MM(self):
    621         # Set with a MaildirMessage instance
    622         msg0 = mailbox.MaildirMessage(self._template % 0)
    623         msg0.set_flags('TP')
    624         key = self._box.add(msg0)
    625         msg_returned = self._box.get_message(key)
    626         self.assertEqual(msg_returned.get_subdir(), 'new')
    627         self.assertEqual(msg_returned.get_flags(), 'PT')
    628         msg1 = mailbox.MaildirMessage(self._template % 1)
    629         self._box[key] = msg1
    630         msg_returned = self._box.get_message(key)
    631         self.assertEqual(msg_returned.get_subdir(), 'new')
    632         self.assertEqual(msg_returned.get_flags(), '')
    633         self.assertEqual(msg_returned.get_payload(), '1\n')
    634         msg2 = mailbox.MaildirMessage(self._template % 2)
    635         msg2.set_info('2,S')
    636         self._box[key] = msg2
    637         self._box[key] = self._template % 3
    638         msg_returned = self._box.get_message(key)
    639         self.assertEqual(msg_returned.get_subdir(), 'new')
    640         self.assertEqual(msg_returned.get_flags(), 'S')
    641         self.assertEqual(msg_returned.get_payload(), '3\n')
    642 
    643     def test_consistent_factory(self):
    644         # Add a message.
    645         msg = mailbox.MaildirMessage(self._template % 0)
    646         msg.set_subdir('cur')
    647         msg.set_flags('RF')
    648         key = self._box.add(msg)
    649 
    650         # Create new mailbox with
    651         class FakeMessage(mailbox.MaildirMessage):
    652             pass
    653         box = mailbox.Maildir(self._path, factory=FakeMessage)
    654         box.colon = self._box.colon
    655         msg2 = box.get_message(key)
    656         self.assertIsInstance(msg2, FakeMessage)
    657 
    658     def test_initialize_new(self):
    659         # Initialize a non-existent mailbox
    660         self.tearDown()
    661         self._box = mailbox.Maildir(self._path)
    662         self._check_basics()
    663         self._delete_recursively(self._path)
    664         self._box = self._factory(self._path, factory=None)
    665         self._check_basics()
    666 
    667     def test_initialize_existing(self):
    668         # Initialize an existing mailbox
    669         self.tearDown()
    670         for subdir in '', 'tmp', 'new', 'cur':
    671             os.mkdir(os.path.normpath(os.path.join(self._path, subdir)))
    672         self._box = mailbox.Maildir(self._path)
    673         self._check_basics()
    674 
    675     def _check_basics(self, factory=None):
    676         # (Used by test_open_new() and test_open_existing().)
    677         self.assertEqual(self._box._path, os.path.abspath(self._path))
    678         self.assertEqual(self._box._factory, factory)
    679         for subdir in '', 'tmp', 'new', 'cur':
    680             path = os.path.join(self._path, subdir)
    681             mode = os.stat(path)[stat.ST_MODE]
    682             self.assertTrue(stat.S_ISDIR(mode), "Not a directory: '%s'" % path)
    683 
    684     def test_list_folders(self):
    685         # List folders
    686         self._box.add_folder('one')
    687         self._box.add_folder('two')
    688         self._box.add_folder('three')
    689         self.assertEqual(len(self._box.list_folders()), 3)
    690         self.assertEqual(set(self._box.list_folders()),
    691                      set(('one', 'two', 'three')))
    692 
    693     def test_get_folder(self):
    694         # Open folders
    695         self._box.add_folder('foo.bar')
    696         folder0 = self._box.get_folder('foo.bar')
    697         folder0.add(self._template % 'bar')
    698         self.assertTrue(os.path.isdir(os.path.join(self._path, '.foo.bar')))
    699         folder1 = self._box.get_folder('foo.bar')
    700         self.assertEqual(folder1.get_string(folder1.keys()[0]),
    701                          self._template % 'bar')
    702 
    703     def test_add_and_remove_folders(self):
    704         # Delete folders
    705         self._box.add_folder('one')
    706         self._box.add_folder('two')
    707         self.assertEqual(len(self._box.list_folders()), 2)
    708         self.assertEqual(set(self._box.list_folders()), set(('one', 'two')))
    709         self._box.remove_folder('one')
    710         self.assertEqual(len(self._box.list_folders()), 1)
    711         self.assertEqual(set(self._box.list_folders()), set(('two',)))
    712         self._box.add_folder('three')
    713         self.assertEqual(len(self._box.list_folders()), 2)
    714         self.assertEqual(set(self._box.list_folders()), set(('two', 'three')))
    715         self._box.remove_folder('three')
    716         self.assertEqual(len(self._box.list_folders()), 1)
    717         self.assertEqual(set(self._box.list_folders()), set(('two',)))
    718         self._box.remove_folder('two')
    719         self.assertEqual(len(self._box.list_folders()), 0)
    720         self.assertEqual(self._box.list_folders(), [])
    721 
    722     def test_clean(self):
    723         # Remove old files from 'tmp'
    724         foo_path = os.path.join(self._path, 'tmp', 'foo')
    725         bar_path = os.path.join(self._path, 'tmp', 'bar')
    726         with open(foo_path, 'w') as f:
    727             f.write("@")
    728         with open(bar_path, 'w') as f:
    729             f.write("@")
    730         self._box.clean()
    731         self.assertTrue(os.path.exists(foo_path))
    732         self.assertTrue(os.path.exists(bar_path))
    733         foo_stat = os.stat(foo_path)
    734         os.utime(foo_path, (time.time() - 129600 - 2,
    735                             foo_stat.st_mtime))
    736         self._box.clean()
    737         self.assertFalse(os.path.exists(foo_path))
    738         self.assertTrue(os.path.exists(bar_path))
    739 
    740     def test_create_tmp(self, repetitions=10):
    741         # Create files in tmp directory
    742         hostname = socket.gethostname()
    743         if '/' in hostname:
    744             hostname = hostname.replace('/', r'\057')
    745         if ':' in hostname:
    746             hostname = hostname.replace(':', r'\072')
    747         pid = os.getpid()
    748         pattern = re.compile(r"(?P<time>\d+)\.M(?P<M>\d{1,6})P(?P<P>\d+)"
    749                              r"Q(?P<Q>\d+)\.(?P<host>[^:/]+)")
    750         previous_groups = None
    751         for x in range(repetitions):
    752             tmp_file = self._box._create_tmp()
    753             head, tail = os.path.split(tmp_file.name)
    754             self.assertEqual(head, os.path.abspath(os.path.join(self._path,
    755                                                                 "tmp")),
    756                              "File in wrong location: '%s'" % head)
    757             match = pattern.match(tail)
    758             self.assertIsNotNone(match, "Invalid file name: '%s'" % tail)
    759             groups = match.groups()
    760             if previous_groups is not None:
    761                 self.assertGreaterEqual(int(groups[0]), int(previous_groups[0]),
    762                              "Non-monotonic seconds: '%s' before '%s'" %
    763                              (previous_groups[0], groups[0]))
    764                 if int(groups[0]) == int(previous_groups[0]):
    765                     self.assertGreaterEqual(int(groups[1]), int(previous_groups[1]),
    766                                 "Non-monotonic milliseconds: '%s' before '%s'" %
    767                                 (previous_groups[1], groups[1]))
    768                 self.assertEqual(int(groups[2]), pid,
    769                              "Process ID mismatch: '%s' should be '%s'" %
    770                              (groups[2], pid))
    771                 self.assertEqual(int(groups[3]), int(previous_groups[3]) + 1,
    772                              "Non-sequential counter: '%s' before '%s'" %
    773                              (previous_groups[3], groups[3]))
    774                 self.assertEqual(groups[4], hostname,
    775                              "Host name mismatch: '%s' should be '%s'" %
    776                              (groups[4], hostname))
    777             previous_groups = groups
    778             tmp_file.write(_bytes_sample_message)
    779             tmp_file.seek(0)
    780             self.assertEqual(tmp_file.read(), _bytes_sample_message)
    781             tmp_file.close()
    782         file_count = len(os.listdir(os.path.join(self._path, "tmp")))
    783         self.assertEqual(file_count, repetitions,
    784                      "Wrong file count: '%s' should be '%s'" %
    785                      (file_count, repetitions))
    786 
    787     def test_refresh(self):
    788         # Update the table of contents
    789         self.assertEqual(self._box._toc, {})
    790         key0 = self._box.add(self._template % 0)
    791         key1 = self._box.add(self._template % 1)
    792         self.assertEqual(self._box._toc, {})
    793         self._box._refresh()
    794         self.assertEqual(self._box._toc, {key0: os.path.join('new', key0),
    795                                           key1: os.path.join('new', key1)})
    796         key2 = self._box.add(self._template % 2)
    797         self.assertEqual(self._box._toc, {key0: os.path.join('new', key0),
    798                                           key1: os.path.join('new', key1)})
    799         self._box._refresh()
    800         self.assertEqual(self._box._toc, {key0: os.path.join('new', key0),
    801                                           key1: os.path.join('new', key1),
    802                                           key2: os.path.join('new', key2)})
    803 
    804     def test_refresh_after_safety_period(self):
    805         # Issue #13254: Call _refresh after the "file system safety
    806         # period" of 2 seconds has passed; _toc should still be
    807         # updated because this is the first call to _refresh.
    808         key0 = self._box.add(self._template % 0)
    809         key1 = self._box.add(self._template % 1)
    810 
    811         self._box = self._factory(self._path)
    812         self.assertEqual(self._box._toc, {})
    813 
    814         # Emulate sleeping. Instead of sleeping for 2 seconds, use the
    815         # skew factor to make _refresh think that the filesystem
    816         # safety period has passed and re-reading the _toc is only
    817         # required if mtimes differ.
    818         self._box._skewfactor = -3
    819 
    820         self._box._refresh()
    821         self.assertEqual(sorted(self._box._toc.keys()), sorted([key0, key1]))
    822 
    823     def test_lookup(self):
    824         # Look up message subpaths in the TOC
    825         self.assertRaises(KeyError, lambda: self._box._lookup('foo'))
    826         key0 = self._box.add(self._template % 0)
    827         self.assertEqual(self._box._lookup(key0), os.path.join('new', key0))
    828         os.remove(os.path.join(self._path, 'new', key0))
    829         self.assertEqual(self._box._toc, {key0: os.path.join('new', key0)})
    830         # Be sure that the TOC is read back from disk (see issue #6896
    831         # about bad mtime behaviour on some systems).
    832         self._box.flush()
    833         self.assertRaises(KeyError, lambda: self._box._lookup(key0))
    834         self.assertEqual(self._box._toc, {})
    835 
    836     def test_lock_unlock(self):
    837         # Lock and unlock the mailbox. For Maildir, this does nothing.
    838         self._box.lock()
    839         self._box.unlock()
    840 
    841     def test_folder (self):
    842         # Test for bug #1569790: verify that folders returned by .get_folder()
    843         # use the same factory function.
    844         def dummy_factory (s):
    845             return None
    846         box = self._factory(self._path, factory=dummy_factory)
    847         folder = box.add_folder('folder1')
    848         self.assertIs(folder._factory, dummy_factory)
    849 
    850         folder1_alias = box.get_folder('folder1')
    851         self.assertIs(folder1_alias._factory, dummy_factory)
    852 
    853     def test_directory_in_folder (self):
    854         # Test that mailboxes still work if there's a stray extra directory
    855         # in a folder.
    856         for i in range(10):
    857             self._box.add(mailbox.Message(_sample_message))
    858 
    859         # Create a stray directory
    860         os.mkdir(os.path.join(self._path, 'cur', 'stray-dir'))
    861 
    862         # Check that looping still works with the directory present.
    863         for msg in self._box:
    864             pass
    865 
    866     @unittest.skipUnless(hasattr(os, 'umask'), 'test needs os.umask()')
    867     @unittest.skipUnless(hasattr(os, 'stat'), 'test needs os.stat()')
    868     def test_file_permissions(self):
    869         # Verify that message files are created without execute permissions
    870         msg = mailbox.MaildirMessage(self._template % 0)
    871         orig_umask = os.umask(0)
    872         try:
    873             key = self._box.add(msg)
    874         finally:
    875             os.umask(orig_umask)
    876         path = os.path.join(self._path, self._box._lookup(key))
    877         mode = os.stat(path).st_mode
    878         self.assertFalse(mode & 0o111)
    879 
    880     @unittest.skipUnless(hasattr(os, 'umask'), 'test needs os.umask()')
    881     @unittest.skipUnless(hasattr(os, 'stat'), 'test needs os.stat()')
    882     def test_folder_file_perms(self):
    883         # From bug #3228, we want to verify that the file created inside a Maildir
    884         # subfolder isn't marked as executable.
    885         orig_umask = os.umask(0)
    886         try:
    887             subfolder = self._box.add_folder('subfolder')
    888         finally:
    889             os.umask(orig_umask)
    890 
    891         path = os.path.join(subfolder._path, 'maildirfolder')
    892         st = os.stat(path)
    893         perms = st.st_mode
    894         self.assertFalse((perms & 0o111)) # Execute bits should all be off.
    895 
    896     def test_reread(self):
    897         # Do an initial unconditional refresh
    898         self._box._refresh()
    899 
    900         # Put the last modified times more than two seconds into the past
    901         # (because mtime may have a two second granularity)
    902         for subdir in ('cur', 'new'):
    903             os.utime(os.path.join(self._box._path, subdir),
    904                      (time.time()-5,)*2)
    905 
    906         # Because mtime has a two second granularity in worst case (FAT), a
    907         # refresh is done unconditionally if called for within
    908         # two-second-plus-a-bit of the last one, just in case the mbox has
    909         # changed; so now we have to wait for that interval to expire.
    910         #
    911         # Because this is a test, emulate sleeping. Instead of
    912         # sleeping for 2 seconds, use the skew factor to make _refresh
    913         # think that 2 seconds have passed and re-reading the _toc is
    914         # only required if mtimes differ.
    915         self._box._skewfactor = -3
    916 
    917         # Re-reading causes the ._toc attribute to be assigned a new dictionary
    918         # object, so we'll check that the ._toc attribute isn't a different
    919         # object.
    920         orig_toc = self._box._toc
    921         def refreshed():
    922             return self._box._toc is not orig_toc
    923 
    924         self._box._refresh()
    925         self.assertFalse(refreshed())
    926 
    927         # Now, write something into cur and remove it.  This changes
    928         # the mtime and should cause a re-read. Note that "sleep
    929         # emulation" is still in effect, as skewfactor is -3.
    930         filename = os.path.join(self._path, 'cur', 'stray-file')
    931         support.create_empty_file(filename)
    932         os.unlink(filename)
    933         self._box._refresh()
    934         self.assertTrue(refreshed())
    935 
    936 
    937 class _TestSingleFile(TestMailbox):
    938     '''Common tests for single-file mailboxes'''
    939 
    940     def test_add_doesnt_rewrite(self):
    941         # When only adding messages, flush() should not rewrite the
    942         # mailbox file. See issue #9559.
    943 
    944         # Inode number changes if the contents are written to another
    945         # file which is then renamed over the original file. So we
    946         # must check that the inode number doesn't change.
    947         inode_before = os.stat(self._path).st_ino
    948 
    949         self._box.add(self._template % 0)
    950         self._box.flush()
    951 
    952         inode_after = os.stat(self._path).st_ino
    953         self.assertEqual(inode_before, inode_after)
    954 
    955         # Make sure the message was really added
    956         self._box.close()
    957         self._box = self._factory(self._path)
    958         self.assertEqual(len(self._box), 1)
    959 
    960     def test_permissions_after_flush(self):
    961         # See issue #5346
    962 
    963         # Make the mailbox world writable. It's unlikely that the new
    964         # mailbox file would have these permissions after flush(),
    965         # because umask usually prevents it.
    966         mode = os.stat(self._path).st_mode | 0o666
    967         os.chmod(self._path, mode)
    968 
    969         self._box.add(self._template % 0)
    970         i = self._box.add(self._template % 1)
    971         # Need to remove one message to make flush() create a new file
    972         self._box.remove(i)
    973         self._box.flush()
    974 
    975         self.assertEqual(os.stat(self._path).st_mode, mode)
    976 
    977 
    978 class _TestMboxMMDF(_TestSingleFile):
    979 
    980     def tearDown(self):
    981         super().tearDown()
    982         self._box.close()
    983         self._delete_recursively(self._path)
    984         for lock_remnant in glob.glob(self._path + '.*'):
    985             support.unlink(lock_remnant)
    986 
    987     def assertMailboxEmpty(self):
    988         with open(self._path) as f:
    989             self.assertEqual(f.readlines(), [])
    990 
    991     def test_add_from_string(self):
    992         # Add a string starting with 'From ' to the mailbox
    993         key = self._box.add('From foo@bar blah\nFrom: foo\n\n0\n')
    994         self.assertEqual(self._box[key].get_from(), 'foo@bar blah')
    995         self.assertEqual(self._box[key].get_payload(), '0\n')
    996 
    997     def test_add_from_bytes(self):
    998         # Add a byte string starting with 'From ' to the mailbox
    999         key = self._box.add(b'From foo@bar blah\nFrom: foo\n\n0\n')
   1000         self.assertEqual(self._box[key].get_from(), 'foo@bar blah')
   1001         self.assertEqual(self._box[key].get_payload(), '0\n')
   1002 
   1003     def test_add_mbox_or_mmdf_message(self):
   1004         # Add an mboxMessage or MMDFMessage
   1005         for class_ in (mailbox.mboxMessage, mailbox.MMDFMessage):
   1006             msg = class_('From foo@bar blah\nFrom: foo\n\n0\n')
   1007             key = self._box.add(msg)
   1008 
   1009     def test_open_close_open(self):
   1010         # Open and inspect previously-created mailbox
   1011         values = [self._template % i for i in range(3)]
   1012         for value in values:
   1013             self._box.add(value)
   1014         self._box.close()
   1015         mtime = os.path.getmtime(self._path)
   1016         self._box = self._factory(self._path)
   1017         self.assertEqual(len(self._box), 3)
   1018         for key in self._box.iterkeys():
   1019             self.assertIn(self._box.get_string(key), values)
   1020         self._box.close()
   1021         self.assertEqual(mtime, os.path.getmtime(self._path))
   1022 
   1023     def test_add_and_close(self):
   1024         # Verifying that closing a mailbox doesn't change added items
   1025         self._box.add(_sample_message)
   1026         for i in range(3):
   1027             self._box.add(self._template % i)
   1028         self._box.add(_sample_message)
   1029         self._box._file.flush()
   1030         self._box._file.seek(0)
   1031         contents = self._box._file.read()
   1032         self._box.close()
   1033         with open(self._path, 'rb') as f:
   1034             self.assertEqual(contents, f.read())
   1035         self._box = self._factory(self._path)
   1036 
   1037     @unittest.skipUnless(hasattr(os, 'fork'), "Test needs fork().")
   1038     @unittest.skipUnless(hasattr(socket, 'socketpair'), "Test needs socketpair().")
   1039     def test_lock_conflict(self):
   1040         # Fork off a child process that will lock the mailbox temporarily,
   1041         # unlock it and exit.
   1042         c, p = socket.socketpair()
   1043         self.addCleanup(c.close)
   1044         self.addCleanup(p.close)
   1045 
   1046         pid = os.fork()
   1047         if pid == 0:
   1048             # child
   1049             try:
   1050                 # lock the mailbox, and signal the parent it can proceed
   1051                 self._box.lock()
   1052                 c.send(b'c')
   1053 
   1054                 # wait until the parent is done, and unlock the mailbox
   1055                 c.recv(1)
   1056                 self._box.unlock()
   1057             finally:
   1058                 os._exit(0)
   1059 
   1060         # In the parent, wait until the child signals it locked the mailbox.
   1061         p.recv(1)
   1062         try:
   1063             self.assertRaises(mailbox.ExternalClashError,
   1064                               self._box.lock)
   1065         finally:
   1066             # Signal the child it can now release the lock and exit.
   1067             p.send(b'p')
   1068             # Wait for child to exit.  Locking should now succeed.
   1069             exited_pid, status = os.waitpid(pid, 0)
   1070 
   1071         self._box.lock()
   1072         self._box.unlock()
   1073 
   1074     def test_relock(self):
   1075         # Test case for bug #1575506: the mailbox class was locking the
   1076         # wrong file object in its flush() method.
   1077         msg = "Subject: sub\n\nbody\n"
   1078         key1 = self._box.add(msg)
   1079         self._box.flush()
   1080         self._box.close()
   1081 
   1082         self._box = self._factory(self._path)
   1083         self._box.lock()
   1084         key2 = self._box.add(msg)
   1085         self._box.flush()
   1086         self.assertTrue(self._box._locked)
   1087         self._box.close()
   1088 
   1089 
   1090 class TestMbox(_TestMboxMMDF, unittest.TestCase):
   1091 
   1092     _factory = lambda self, path, factory=None: mailbox.mbox(path, factory)
   1093 
   1094     @unittest.skipUnless(hasattr(os, 'umask'), 'test needs os.umask()')
   1095     @unittest.skipUnless(hasattr(os, 'stat'), 'test needs os.stat()')
   1096     def test_file_perms(self):
   1097         # From bug #3228, we want to verify that the mailbox file isn't executable,
   1098         # even if the umask is set to something that would leave executable bits set.
   1099         # We only run this test on platforms that support umask.
   1100         try:
   1101             old_umask = os.umask(0o077)
   1102             self._box.close()
   1103             os.unlink(self._path)
   1104             self._box = mailbox.mbox(self._path, create=True)
   1105             self._box.add('')
   1106             self._box.close()
   1107         finally:
   1108             os.umask(old_umask)
   1109 
   1110         st = os.stat(self._path)
   1111         perms = st.st_mode
   1112         self.assertFalse((perms & 0o111)) # Execute bits should all be off.
   1113 
   1114     def test_terminating_newline(self):
   1115         message = email.message.Message()
   1116         message['From'] = 'john (at] example.com'
   1117         message.set_payload('No newline at the end')
   1118         i = self._box.add(message)
   1119 
   1120         # A newline should have been appended to the payload
   1121         message = self._box.get(i)
   1122         self.assertEqual(message.get_payload(), 'No newline at the end\n')
   1123 
   1124     def test_message_separator(self):
   1125         # Check there's always a single blank line after each message
   1126         self._box.add('From: foo\n\n0')  # No newline at the end
   1127         with open(self._path) as f:
   1128             data = f.read()
   1129             self.assertEqual(data[-3:], '0\n\n')
   1130 
   1131         self._box.add('From: foo\n\n0\n')  # Newline at the end
   1132         with open(self._path) as f:
   1133             data = f.read()
   1134             self.assertEqual(data[-3:], '0\n\n')
   1135 
   1136 
   1137 class TestMMDF(_TestMboxMMDF, unittest.TestCase):
   1138 
   1139     _factory = lambda self, path, factory=None: mailbox.MMDF(path, factory)
   1140 
   1141 
   1142 class TestMH(TestMailbox, unittest.TestCase):
   1143 
   1144     _factory = lambda self, path, factory=None: mailbox.MH(path, factory)
   1145 
   1146     def assertMailboxEmpty(self):
   1147         self.assertEqual(os.listdir(self._path), ['.mh_sequences'])
   1148 
   1149     def test_list_folders(self):
   1150         # List folders
   1151         self._box.add_folder('one')
   1152         self._box.add_folder('two')
   1153         self._box.add_folder('three')
   1154         self.assertEqual(len(self._box.list_folders()), 3)
   1155         self.assertEqual(set(self._box.list_folders()),
   1156                      set(('one', 'two', 'three')))
   1157 
   1158     def test_get_folder(self):
   1159         # Open folders
   1160         def dummy_factory (s):
   1161             return None
   1162         self._box = self._factory(self._path, dummy_factory)
   1163 
   1164         new_folder = self._box.add_folder('foo.bar')
   1165         folder0 = self._box.get_folder('foo.bar')
   1166         folder0.add(self._template % 'bar')
   1167         self.assertTrue(os.path.isdir(os.path.join(self._path, 'foo.bar')))
   1168         folder1 = self._box.get_folder('foo.bar')
   1169         self.assertEqual(folder1.get_string(folder1.keys()[0]),
   1170                          self._template % 'bar')
   1171 
   1172         # Test for bug #1569790: verify that folders returned by .get_folder()
   1173         # use the same factory function.
   1174         self.assertIs(new_folder._factory, self._box._factory)
   1175         self.assertIs(folder0._factory, self._box._factory)
   1176 
   1177     def test_add_and_remove_folders(self):
   1178         # Delete folders
   1179         self._box.add_folder('one')
   1180         self._box.add_folder('two')
   1181         self.assertEqual(len(self._box.list_folders()), 2)
   1182         self.assertEqual(set(self._box.list_folders()), set(('one', 'two')))
   1183         self._box.remove_folder('one')
   1184         self.assertEqual(len(self._box.list_folders()), 1)
   1185         self.assertEqual(set(self._box.list_folders()), set(('two',)))
   1186         self._box.add_folder('three')
   1187         self.assertEqual(len(self._box.list_folders()), 2)
   1188         self.assertEqual(set(self._box.list_folders()), set(('two', 'three')))
   1189         self._box.remove_folder('three')
   1190         self.assertEqual(len(self._box.list_folders()), 1)
   1191         self.assertEqual(set(self._box.list_folders()), set(('two',)))
   1192         self._box.remove_folder('two')
   1193         self.assertEqual(len(self._box.list_folders()), 0)
   1194         self.assertEqual(self._box.list_folders(), [])
   1195 
   1196     def test_sequences(self):
   1197         # Get and set sequences
   1198         self.assertEqual(self._box.get_sequences(), {})
   1199         msg0 = mailbox.MHMessage(self._template % 0)
   1200         msg0.add_sequence('foo')
   1201         key0 = self._box.add(msg0)
   1202         self.assertEqual(self._box.get_sequences(), {'foo':[key0]})
   1203         msg1 = mailbox.MHMessage(self._template % 1)
   1204         msg1.set_sequences(['bar', 'replied', 'foo'])
   1205         key1 = self._box.add(msg1)
   1206         self.assertEqual(self._box.get_sequences(),
   1207                      {'foo':[key0, key1], 'bar':[key1], 'replied':[key1]})
   1208         msg0.set_sequences(['flagged'])
   1209         self._box[key0] = msg0
   1210         self.assertEqual(self._box.get_sequences(),
   1211                      {'foo':[key1], 'bar':[key1], 'replied':[key1],
   1212                       'flagged':[key0]})
   1213         self._box.remove(key1)
   1214         self.assertEqual(self._box.get_sequences(), {'flagged':[key0]})
   1215 
   1216     def test_issue2625(self):
   1217         msg0 = mailbox.MHMessage(self._template % 0)
   1218         msg0.add_sequence('foo')
   1219         key0 = self._box.add(msg0)
   1220         refmsg0 = self._box.get_message(key0)
   1221 
   1222     def test_issue7627(self):
   1223         msg0 = mailbox.MHMessage(self._template % 0)
   1224         key0 = self._box.add(msg0)
   1225         self._box.lock()
   1226         self._box.remove(key0)
   1227         self._box.unlock()
   1228 
   1229     def test_pack(self):
   1230         # Pack the contents of the mailbox
   1231         msg0 = mailbox.MHMessage(self._template % 0)
   1232         msg1 = mailbox.MHMessage(self._template % 1)
   1233         msg2 = mailbox.MHMessage(self._template % 2)
   1234         msg3 = mailbox.MHMessage(self._template % 3)
   1235         msg0.set_sequences(['foo', 'unseen'])
   1236         msg1.set_sequences(['foo'])
   1237         msg2.set_sequences(['foo', 'flagged'])
   1238         msg3.set_sequences(['foo', 'bar', 'replied'])
   1239         key0 = self._box.add(msg0)
   1240         key1 = self._box.add(msg1)
   1241         key2 = self._box.add(msg2)
   1242         key3 = self._box.add(msg3)
   1243         self.assertEqual(self._box.get_sequences(),
   1244                      {'foo':[key0,key1,key2,key3], 'unseen':[key0],
   1245                       'flagged':[key2], 'bar':[key3], 'replied':[key3]})
   1246         self._box.remove(key2)
   1247         self.assertEqual(self._box.get_sequences(),
   1248                      {'foo':[key0,key1,key3], 'unseen':[key0], 'bar':[key3],
   1249                       'replied':[key3]})
   1250         self._box.pack()
   1251         self.assertEqual(self._box.keys(), [1, 2, 3])
   1252         key0 = key0
   1253         key1 = key0 + 1
   1254         key2 = key1 + 1
   1255         self.assertEqual(self._box.get_sequences(),
   1256                      {'foo':[1, 2, 3], 'unseen':[1], 'bar':[3], 'replied':[3]})
   1257 
   1258         # Test case for packing while holding the mailbox locked.
   1259         key0 = self._box.add(msg1)
   1260         key1 = self._box.add(msg1)
   1261         key2 = self._box.add(msg1)
   1262         key3 = self._box.add(msg1)
   1263 
   1264         self._box.remove(key0)
   1265         self._box.remove(key2)
   1266         self._box.lock()
   1267         self._box.pack()
   1268         self._box.unlock()
   1269         self.assertEqual(self._box.get_sequences(),
   1270                      {'foo':[1, 2, 3, 4, 5],
   1271                       'unseen':[1], 'bar':[3], 'replied':[3]})
   1272 
   1273     def _get_lock_path(self):
   1274         return os.path.join(self._path, '.mh_sequences.lock')
   1275 
   1276 
   1277 class TestBabyl(_TestSingleFile, unittest.TestCase):
   1278 
   1279     _factory = lambda self, path, factory=None: mailbox.Babyl(path, factory)
   1280 
   1281     def assertMailboxEmpty(self):
   1282         with open(self._path) as f:
   1283             self.assertEqual(f.readlines(), [])
   1284 
   1285     def tearDown(self):
   1286         super().tearDown()
   1287         self._box.close()
   1288         self._delete_recursively(self._path)
   1289         for lock_remnant in glob.glob(self._path + '.*'):
   1290             support.unlink(lock_remnant)
   1291 
   1292     def test_labels(self):
   1293         # Get labels from the mailbox
   1294         self.assertEqual(self._box.get_labels(), [])
   1295         msg0 = mailbox.BabylMessage(self._template % 0)
   1296         msg0.add_label('foo')
   1297         key0 = self._box.add(msg0)
   1298         self.assertEqual(self._box.get_labels(), ['foo'])
   1299         msg1 = mailbox.BabylMessage(self._template % 1)
   1300         msg1.set_labels(['bar', 'answered', 'foo'])
   1301         key1 = self._box.add(msg1)
   1302         self.assertEqual(set(self._box.get_labels()), set(['foo', 'bar']))
   1303         msg0.set_labels(['blah', 'filed'])
   1304         self._box[key0] = msg0
   1305         self.assertEqual(set(self._box.get_labels()),
   1306                      set(['foo', 'bar', 'blah']))
   1307         self._box.remove(key1)
   1308         self.assertEqual(set(self._box.get_labels()), set(['blah']))
   1309 
   1310 
   1311 class FakeFileLikeObject:
   1312 
   1313     def __init__(self):
   1314         self.closed = False
   1315 
   1316     def close(self):
   1317         self.closed = True
   1318 
   1319 
   1320 class FakeMailBox(mailbox.Mailbox):
   1321 
   1322     def __init__(self):
   1323         mailbox.Mailbox.__init__(self, '', lambda file: None)
   1324         self.files = [FakeFileLikeObject() for i in range(10)]
   1325 
   1326     def get_file(self, key):
   1327         return self.files[key]
   1328 
   1329 
   1330 class TestFakeMailBox(unittest.TestCase):
   1331 
   1332     def test_closing_fd(self):
   1333         box = FakeMailBox()
   1334         for i in range(10):
   1335             self.assertFalse(box.files[i].closed)
   1336         for i in range(10):
   1337             box[i]
   1338         for i in range(10):
   1339             self.assertTrue(box.files[i].closed)
   1340 
   1341 
   1342 class TestMessage(TestBase, unittest.TestCase):
   1343 
   1344     _factory = mailbox.Message      # Overridden by subclasses to reuse tests
   1345 
   1346     def setUp(self):
   1347         self._path = support.TESTFN
   1348 
   1349     def tearDown(self):
   1350         self._delete_recursively(self._path)
   1351 
   1352     def test_initialize_with_eMM(self):
   1353         # Initialize based on email.message.Message instance
   1354         eMM = email.message_from_string(_sample_message)
   1355         msg = self._factory(eMM)
   1356         self._post_initialize_hook(msg)
   1357         self._check_sample(msg)
   1358 
   1359     def test_initialize_with_string(self):
   1360         # Initialize based on string
   1361         msg = self._factory(_sample_message)
   1362         self._post_initialize_hook(msg)
   1363         self._check_sample(msg)
   1364 
   1365     def test_initialize_with_file(self):
   1366         # Initialize based on contents of file
   1367         with open(self._path, 'w+') as f:
   1368             f.write(_sample_message)
   1369             f.seek(0)
   1370             msg = self._factory(f)
   1371             self._post_initialize_hook(msg)
   1372             self._check_sample(msg)
   1373 
   1374     def test_initialize_with_binary_file(self):
   1375         # Initialize based on contents of binary file
   1376         with open(self._path, 'wb+') as f:
   1377             f.write(_bytes_sample_message)
   1378             f.seek(0)
   1379             msg = self._factory(f)
   1380             self._post_initialize_hook(msg)
   1381             self._check_sample(msg)
   1382 
   1383     def test_initialize_with_nothing(self):
   1384         # Initialize without arguments
   1385         msg = self._factory()
   1386         self._post_initialize_hook(msg)
   1387         self.assertIsInstance(msg, email.message.Message)
   1388         self.assertIsInstance(msg, mailbox.Message)
   1389         self.assertIsInstance(msg, self._factory)
   1390         self.assertEqual(msg.keys(), [])
   1391         self.assertFalse(msg.is_multipart())
   1392         self.assertIsNone(msg.get_payload())
   1393 
   1394     def test_initialize_incorrectly(self):
   1395         # Initialize with invalid argument
   1396         self.assertRaises(TypeError, lambda: self._factory(object()))
   1397 
   1398     def test_all_eMM_attribues_exist(self):
   1399         # Issue 12537
   1400         eMM = email.message_from_string(_sample_message)
   1401         msg = self._factory(_sample_message)
   1402         for attr in eMM.__dict__:
   1403             self.assertIn(attr, msg.__dict__,
   1404                 '{} attribute does not exist'.format(attr))
   1405 
   1406     def test_become_message(self):
   1407         # Take on the state of another message
   1408         eMM = email.message_from_string(_sample_message)
   1409         msg = self._factory()
   1410         msg._become_message(eMM)
   1411         self._check_sample(msg)
   1412 
   1413     def test_explain_to(self):
   1414         # Copy self's format-specific data to other message formats.
   1415         # This test is superficial; better ones are in TestMessageConversion.
   1416         msg = self._factory()
   1417         for class_ in self.all_mailbox_types:
   1418             other_msg = class_()
   1419             msg._explain_to(other_msg)
   1420         other_msg = email.message.Message()
   1421         self.assertRaises(TypeError, lambda: msg._explain_to(other_msg))
   1422 
   1423     def _post_initialize_hook(self, msg):
   1424         # Overridden by subclasses to check extra things after initialization
   1425         pass
   1426 
   1427 
   1428 class TestMaildirMessage(TestMessage, unittest.TestCase):
   1429 
   1430     _factory = mailbox.MaildirMessage
   1431 
   1432     def _post_initialize_hook(self, msg):
   1433         self.assertEqual(msg._subdir, 'new')
   1434         self.assertEqual(msg._info, '')
   1435 
   1436     def test_subdir(self):
   1437         # Use get_subdir() and set_subdir()
   1438         msg = mailbox.MaildirMessage(_sample_message)
   1439         self.assertEqual(msg.get_subdir(), 'new')
   1440         msg.set_subdir('cur')
   1441         self.assertEqual(msg.get_subdir(), 'cur')
   1442         msg.set_subdir('new')
   1443         self.assertEqual(msg.get_subdir(), 'new')
   1444         self.assertRaises(ValueError, lambda: msg.set_subdir('tmp'))
   1445         self.assertEqual(msg.get_subdir(), 'new')
   1446         msg.set_subdir('new')
   1447         self.assertEqual(msg.get_subdir(), 'new')
   1448         self._check_sample(msg)
   1449 
   1450     def test_flags(self):
   1451         # Use get_flags(), set_flags(), add_flag(), remove_flag()
   1452         msg = mailbox.MaildirMessage(_sample_message)
   1453         self.assertEqual(msg.get_flags(), '')
   1454         self.assertEqual(msg.get_subdir(), 'new')
   1455         msg.set_flags('F')
   1456         self.assertEqual(msg.get_subdir(), 'new')
   1457         self.assertEqual(msg.get_flags(), 'F')
   1458         msg.set_flags('SDTP')
   1459         self.assertEqual(msg.get_flags(), 'DPST')
   1460         msg.add_flag('FT')
   1461         self.assertEqual(msg.get_flags(), 'DFPST')
   1462         msg.remove_flag('TDRP')
   1463         self.assertEqual(msg.get_flags(), 'FS')
   1464         self.assertEqual(msg.get_subdir(), 'new')
   1465         self._check_sample(msg)
   1466 
   1467     def test_date(self):
   1468         # Use get_date() and set_date()
   1469         msg = mailbox.MaildirMessage(_sample_message)
   1470         self.assertLess(abs(msg.get_date() - time.time()), 60)
   1471         msg.set_date(0.0)
   1472         self.assertEqual(msg.get_date(), 0.0)
   1473 
   1474     def test_info(self):
   1475         # Use get_info() and set_info()
   1476         msg = mailbox.MaildirMessage(_sample_message)
   1477         self.assertEqual(msg.get_info(), '')
   1478         msg.set_info('1,foo=bar')
   1479         self.assertEqual(msg.get_info(), '1,foo=bar')
   1480         self.assertRaises(TypeError, lambda: msg.set_info(None))
   1481         self._check_sample(msg)
   1482 
   1483     def test_info_and_flags(self):
   1484         # Test interaction of info and flag methods
   1485         msg = mailbox.MaildirMessage(_sample_message)
   1486         self.assertEqual(msg.get_info(), '')
   1487         msg.set_flags('SF')
   1488         self.assertEqual(msg.get_flags(), 'FS')
   1489         self.assertEqual(msg.get_info(), '2,FS')
   1490         msg.set_info('1,')
   1491         self.assertEqual(msg.get_flags(), '')
   1492         self.assertEqual(msg.get_info(), '1,')
   1493         msg.remove_flag('RPT')
   1494         self.assertEqual(msg.get_flags(), '')
   1495         self.assertEqual(msg.get_info(), '1,')
   1496         msg.add_flag('D')
   1497         self.assertEqual(msg.get_flags(), 'D')
   1498         self.assertEqual(msg.get_info(), '2,D')
   1499         self._check_sample(msg)
   1500 
   1501 
   1502 class _TestMboxMMDFMessage:
   1503 
   1504     _factory = mailbox._mboxMMDFMessage
   1505 
   1506     def _post_initialize_hook(self, msg):
   1507         self._check_from(msg)
   1508 
   1509     def test_initialize_with_unixfrom(self):
   1510         # Initialize with a message that already has a _unixfrom attribute
   1511         msg = mailbox.Message(_sample_message)
   1512         msg.set_unixfrom('From foo@bar blah')
   1513         msg = mailbox.mboxMessage(msg)
   1514         self.assertEqual(msg.get_from(), 'foo@bar blah', msg.get_from())
   1515 
   1516     def test_from(self):
   1517         # Get and set "From " line
   1518         msg = mailbox.mboxMessage(_sample_message)
   1519         self._check_from(msg)
   1520         msg.set_from('foo bar')
   1521         self.assertEqual(msg.get_from(), 'foo bar')
   1522         msg.set_from('foo@bar', True)
   1523         self._check_from(msg, 'foo@bar')
   1524         msg.set_from('blah@temp', time.localtime())
   1525         self._check_from(msg, 'blah@temp')
   1526 
   1527     def test_flags(self):
   1528         # Use get_flags(), set_flags(), add_flag(), remove_flag()
   1529         msg = mailbox.mboxMessage(_sample_message)
   1530         self.assertEqual(msg.get_flags(), '')
   1531         msg.set_flags('F')
   1532         self.assertEqual(msg.get_flags(), 'F')
   1533         msg.set_flags('XODR')
   1534         self.assertEqual(msg.get_flags(), 'RODX')
   1535         msg.add_flag('FA')
   1536         self.assertEqual(msg.get_flags(), 'RODFAX')
   1537         msg.remove_flag('FDXA')
   1538         self.assertEqual(msg.get_flags(), 'RO')
   1539         self._check_sample(msg)
   1540 
   1541     def _check_from(self, msg, sender=None):
   1542         # Check contents of "From " line
   1543         if sender is None:
   1544             sender = "MAILER-DAEMON"
   1545         self.assertIsNotNone(re.match(
   1546                 sender + r" \w{3} \w{3} [\d ]\d [\d ]\d:\d{2}:\d{2} \d{4}",
   1547                 msg.get_from()))
   1548 
   1549 
   1550 class TestMboxMessage(_TestMboxMMDFMessage, TestMessage):
   1551 
   1552     _factory = mailbox.mboxMessage
   1553 
   1554 
   1555 class TestMHMessage(TestMessage, unittest.TestCase):
   1556 
   1557     _factory = mailbox.MHMessage
   1558 
   1559     def _post_initialize_hook(self, msg):
   1560         self.assertEqual(msg._sequences, [])
   1561 
   1562     def test_sequences(self):
   1563         # Get, set, join, and leave sequences
   1564         msg = mailbox.MHMessage(_sample_message)
   1565         self.assertEqual(msg.get_sequences(), [])
   1566         msg.set_sequences(['foobar'])
   1567         self.assertEqual(msg.get_sequences(), ['foobar'])
   1568         msg.set_sequences([])
   1569         self.assertEqual(msg.get_sequences(), [])
   1570         msg.add_sequence('unseen')
   1571         self.assertEqual(msg.get_sequences(), ['unseen'])
   1572         msg.add_sequence('flagged')
   1573         self.assertEqual(msg.get_sequences(), ['unseen', 'flagged'])
   1574         msg.add_sequence('flagged')
   1575         self.assertEqual(msg.get_sequences(), ['unseen', 'flagged'])
   1576         msg.remove_sequence('unseen')
   1577         self.assertEqual(msg.get_sequences(), ['flagged'])
   1578         msg.add_sequence('foobar')
   1579         self.assertEqual(msg.get_sequences(), ['flagged', 'foobar'])
   1580         msg.remove_sequence('replied')
   1581         self.assertEqual(msg.get_sequences(), ['flagged', 'foobar'])
   1582         msg.set_sequences(['foobar', 'replied'])
   1583         self.assertEqual(msg.get_sequences(), ['foobar', 'replied'])
   1584 
   1585 
   1586 class TestBabylMessage(TestMessage, unittest.TestCase):
   1587 
   1588     _factory = mailbox.BabylMessage
   1589 
   1590     def _post_initialize_hook(self, msg):
   1591         self.assertEqual(msg._labels, [])
   1592 
   1593     def test_labels(self):
   1594         # Get, set, join, and leave labels
   1595         msg = mailbox.BabylMessage(_sample_message)
   1596         self.assertEqual(msg.get_labels(), [])
   1597         msg.set_labels(['foobar'])
   1598         self.assertEqual(msg.get_labels(), ['foobar'])
   1599         msg.set_labels([])
   1600         self.assertEqual(msg.get_labels(), [])
   1601         msg.add_label('filed')
   1602         self.assertEqual(msg.get_labels(), ['filed'])
   1603         msg.add_label('resent')
   1604         self.assertEqual(msg.get_labels(), ['filed', 'resent'])
   1605         msg.add_label('resent')
   1606         self.assertEqual(msg.get_labels(), ['filed', 'resent'])
   1607         msg.remove_label('filed')
   1608         self.assertEqual(msg.get_labels(), ['resent'])
   1609         msg.add_label('foobar')
   1610         self.assertEqual(msg.get_labels(), ['resent', 'foobar'])
   1611         msg.remove_label('unseen')
   1612         self.assertEqual(msg.get_labels(), ['resent', 'foobar'])
   1613         msg.set_labels(['foobar', 'answered'])
   1614         self.assertEqual(msg.get_labels(), ['foobar', 'answered'])
   1615 
   1616     def test_visible(self):
   1617         # Get, set, and update visible headers
   1618         msg = mailbox.BabylMessage(_sample_message)
   1619         visible = msg.get_visible()
   1620         self.assertEqual(visible.keys(), [])
   1621         self.assertIsNone(visible.get_payload())
   1622         visible['User-Agent'] = 'FooBar 1.0'
   1623         visible['X-Whatever'] = 'Blah'
   1624         self.assertEqual(msg.get_visible().keys(), [])
   1625         msg.set_visible(visible)
   1626         visible = msg.get_visible()
   1627         self.assertEqual(visible.keys(), ['User-Agent', 'X-Whatever'])
   1628         self.assertEqual(visible['User-Agent'], 'FooBar 1.0')
   1629         self.assertEqual(visible['X-Whatever'], 'Blah')
   1630         self.assertIsNone(visible.get_payload())
   1631         msg.update_visible()
   1632         self.assertEqual(visible.keys(), ['User-Agent', 'X-Whatever'])
   1633         self.assertIsNone(visible.get_payload())
   1634         visible = msg.get_visible()
   1635         self.assertEqual(visible.keys(), ['User-Agent', 'Date', 'From', 'To',
   1636                                           'Subject'])
   1637         for header in ('User-Agent', 'Date', 'From', 'To', 'Subject'):
   1638             self.assertEqual(visible[header], msg[header])
   1639 
   1640 
   1641 class TestMMDFMessage(_TestMboxMMDFMessage, TestMessage):
   1642 
   1643     _factory = mailbox.MMDFMessage
   1644 
   1645 
   1646 class TestMessageConversion(TestBase, unittest.TestCase):
   1647 
   1648     def test_plain_to_x(self):
   1649         # Convert Message to all formats
   1650         for class_ in self.all_mailbox_types:
   1651             msg_plain = mailbox.Message(_sample_message)
   1652             msg = class_(msg_plain)
   1653             self._check_sample(msg)
   1654 
   1655     def test_x_to_plain(self):
   1656         # Convert all formats to Message
   1657         for class_ in self.all_mailbox_types:
   1658             msg = class_(_sample_message)
   1659             msg_plain = mailbox.Message(msg)
   1660             self._check_sample(msg_plain)
   1661 
   1662     def test_x_from_bytes(self):
   1663         # Convert all formats to Message
   1664         for class_ in self.all_mailbox_types:
   1665             msg = class_(_bytes_sample_message)
   1666             self._check_sample(msg)
   1667 
   1668     def test_x_to_invalid(self):
   1669         # Convert all formats to an invalid format
   1670         for class_ in self.all_mailbox_types:
   1671             self.assertRaises(TypeError, lambda: class_(False))
   1672 
   1673     def test_type_specific_attributes_removed_on_conversion(self):
   1674         reference = {class_: class_(_sample_message).__dict__
   1675                         for class_ in self.all_mailbox_types}
   1676         for class1 in self.all_mailbox_types:
   1677             for class2 in self.all_mailbox_types:
   1678                 if class1 is class2:
   1679                     continue
   1680                 source = class1(_sample_message)
   1681                 target = class2(source)
   1682                 type_specific = [a for a in reference[class1]
   1683                                    if a not in reference[class2]]
   1684                 for attr in type_specific:
   1685                     self.assertNotIn(attr, target.__dict__,
   1686                         "while converting {} to {}".format(class1, class2))
   1687 
   1688     def test_maildir_to_maildir(self):
   1689         # Convert MaildirMessage to MaildirMessage
   1690         msg_maildir = mailbox.MaildirMessage(_sample_message)
   1691         msg_maildir.set_flags('DFPRST')
   1692         msg_maildir.set_subdir('cur')
   1693         date = msg_maildir.get_date()
   1694         msg = mailbox.MaildirMessage(msg_maildir)
   1695         self._check_sample(msg)
   1696         self.assertEqual(msg.get_flags(), 'DFPRST')
   1697         self.assertEqual(msg.get_subdir(), 'cur')
   1698         self.assertEqual(msg.get_date(), date)
   1699 
   1700     def test_maildir_to_mboxmmdf(self):
   1701         # Convert MaildirMessage to mboxmessage and MMDFMessage
   1702         pairs = (('D', ''), ('F', 'F'), ('P', ''), ('R', 'A'), ('S', 'R'),
   1703                  ('T', 'D'), ('DFPRST', 'RDFA'))
   1704         for class_ in (mailbox.mboxMessage, mailbox.MMDFMessage):
   1705             msg_maildir = mailbox.MaildirMessage(_sample_message)
   1706             msg_maildir.set_date(0.0)
   1707             for setting, result in pairs:
   1708                 msg_maildir.set_flags(setting)
   1709                 msg = class_(msg_maildir)
   1710                 self.assertEqual(msg.get_flags(), result)
   1711                 self.assertEqual(msg.get_from(), 'MAILER-DAEMON %s' %
   1712                              time.asctime(time.gmtime(0.0)))
   1713             msg_maildir.set_subdir('cur')
   1714             self.assertEqual(class_(msg_maildir).get_flags(), 'RODFA')
   1715 
   1716     def test_maildir_to_mh(self):
   1717         # Convert MaildirMessage to MHMessage
   1718         msg_maildir = mailbox.MaildirMessage(_sample_message)
   1719         pairs = (('D', ['unseen']), ('F', ['unseen', 'flagged']),
   1720                  ('P', ['unseen']), ('R', ['unseen', 'replied']), ('S', []),
   1721                  ('T', ['unseen']), ('DFPRST', ['replied', 'flagged']))
   1722         for setting, result in pairs:
   1723             msg_maildir.set_flags(setting)
   1724             self.assertEqual(mailbox.MHMessage(msg_maildir).get_sequences(),
   1725                              result)
   1726 
   1727     def test_maildir_to_babyl(self):
   1728         # Convert MaildirMessage to Babyl
   1729         msg_maildir = mailbox.MaildirMessage(_sample_message)
   1730         pairs = (('D', ['unseen']), ('F', ['unseen']),
   1731                  ('P', ['unseen', 'forwarded']), ('R', ['unseen', 'answered']),
   1732                  ('S', []), ('T', ['unseen', 'deleted']),
   1733                  ('DFPRST', ['deleted', 'answered', 'forwarded']))
   1734         for setting, result in pairs:
   1735             msg_maildir.set_flags(setting)
   1736             self.assertEqual(mailbox.BabylMessage(msg_maildir).get_labels(),
   1737                              result)
   1738 
   1739     def test_mboxmmdf_to_maildir(self):
   1740         # Convert mboxMessage and MMDFMessage to MaildirMessage
   1741         for class_ in (mailbox.mboxMessage, mailbox.MMDFMessage):
   1742             msg_mboxMMDF = class_(_sample_message)
   1743             msg_mboxMMDF.set_from('foo@bar', time.gmtime(0.0))
   1744             pairs = (('R', 'S'), ('O', ''), ('D', 'T'), ('F', 'F'), ('A', 'R'),
   1745                      ('RODFA', 'FRST'))
   1746             for setting, result in pairs:
   1747                 msg_mboxMMDF.set_flags(setting)
   1748                 msg = mailbox.MaildirMessage(msg_mboxMMDF)
   1749                 self.assertEqual(msg.get_flags(), result)
   1750                 self.assertEqual(msg.get_date(), 0.0)
   1751             msg_mboxMMDF.set_flags('O')
   1752             self.assertEqual(mailbox.MaildirMessage(msg_mboxMMDF).get_subdir(),
   1753                              'cur')
   1754 
   1755     def test_mboxmmdf_to_mboxmmdf(self):
   1756         # Convert mboxMessage and MMDFMessage to mboxMessage and MMDFMessage
   1757         for class_ in (mailbox.mboxMessage, mailbox.MMDFMessage):
   1758             msg_mboxMMDF = class_(_sample_message)
   1759             msg_mboxMMDF.set_flags('RODFA')
   1760             msg_mboxMMDF.set_from('foo@bar')
   1761             for class2_ in (mailbox.mboxMessage, mailbox.MMDFMessage):
   1762                 msg2 = class2_(msg_mboxMMDF)
   1763                 self.assertEqual(msg2.get_flags(), 'RODFA')
   1764                 self.assertEqual(msg2.get_from(), 'foo@bar')
   1765 
   1766     def test_mboxmmdf_to_mh(self):
   1767         # Convert mboxMessage and MMDFMessage to MHMessage
   1768         for class_ in (mailbox.mboxMessage, mailbox.MMDFMessage):
   1769             msg_mboxMMDF = class_(_sample_message)
   1770             pairs = (('R', []), ('O', ['unseen']), ('D', ['unseen']),
   1771                      ('F', ['unseen', 'flagged']),
   1772                      ('A', ['unseen', 'replied']),
   1773                      ('RODFA', ['replied', 'flagged']))
   1774             for setting, result in pairs:
   1775                 msg_mboxMMDF.set_flags(setting)
   1776                 self.assertEqual(mailbox.MHMessage(msg_mboxMMDF).get_sequences(),
   1777                                  result)
   1778 
   1779     def test_mboxmmdf_to_babyl(self):
   1780         # Convert mboxMessage and MMDFMessage to BabylMessage
   1781         for class_ in (mailbox.mboxMessage, mailbox.MMDFMessage):
   1782             msg = class_(_sample_message)
   1783             pairs = (('R', []), ('O', ['unseen']),
   1784                      ('D', ['unseen', 'deleted']), ('F', ['unseen']),
   1785                      ('A', ['unseen', 'answered']),
   1786                      ('RODFA', ['deleted', 'answered']))
   1787             for setting, result in pairs:
   1788                 msg.set_flags(setting)
   1789                 self.assertEqual(mailbox.BabylMessage(msg).get_labels(), result)
   1790 
   1791     def test_mh_to_maildir(self):
   1792         # Convert MHMessage to MaildirMessage
   1793         pairs = (('unseen', ''), ('replied', 'RS'), ('flagged', 'FS'))
   1794         for setting, result in pairs:
   1795             msg = mailbox.MHMessage(_sample_message)
   1796             msg.add_sequence(setting)
   1797             self.assertEqual(mailbox.MaildirMessage(msg).get_flags(), result)
   1798             self.assertEqual(mailbox.MaildirMessage(msg).get_subdir(), 'cur')
   1799         msg = mailbox.MHMessage(_sample_message)
   1800         msg.add_sequence('unseen')
   1801         msg.add_sequence('replied')
   1802         msg.add_sequence('flagged')
   1803         self.assertEqual(mailbox.MaildirMessage(msg).get_flags(), 'FR')
   1804         self.assertEqual(mailbox.MaildirMessage(msg).get_subdir(), 'cur')
   1805 
   1806     def test_mh_to_mboxmmdf(self):
   1807         # Convert MHMessage to mboxMessage and MMDFMessage
   1808         pairs = (('unseen', 'O'), ('replied', 'ROA'), ('flagged', 'ROF'))
   1809         for setting, result in pairs:
   1810             msg = mailbox.MHMessage(_sample_message)
   1811             msg.add_sequence(setting)
   1812             for class_ in (mailbox.mboxMessage, mailbox.MMDFMessage):
   1813                 self.assertEqual(class_(msg).get_flags(), result)
   1814         msg = mailbox.MHMessage(_sample_message)
   1815         msg.add_sequence('unseen')
   1816         msg.add_sequence('replied')
   1817         msg.add_sequence('flagged')
   1818         for class_ in (mailbox.mboxMessage, mailbox.MMDFMessage):
   1819             self.assertEqual(class_(msg).get_flags(), 'OFA')
   1820 
   1821     def test_mh_to_mh(self):
   1822         # Convert MHMessage to MHMessage
   1823         msg = mailbox.MHMessage(_sample_message)
   1824         msg.add_sequence('unseen')
   1825         msg.add_sequence('replied')
   1826         msg.add_sequence('flagged')
   1827         self.assertEqual(mailbox.MHMessage(msg).get_sequences(),
   1828                          ['unseen', 'replied', 'flagged'])
   1829 
   1830     def test_mh_to_babyl(self):
   1831         # Convert MHMessage to BabylMessage
   1832         pairs = (('unseen', ['unseen']), ('replied', ['answered']),
   1833                  ('flagged', []))
   1834         for setting, result in pairs:
   1835             msg = mailbox.MHMessage(_sample_message)
   1836             msg.add_sequence(setting)
   1837             self.assertEqual(mailbox.BabylMessage(msg).get_labels(), result)
   1838         msg = mailbox.MHMessage(_sample_message)
   1839         msg.add_sequence('unseen')
   1840         msg.add_sequence('replied')
   1841         msg.add_sequence('flagged')
   1842         self.assertEqual(mailbox.BabylMessage(msg).get_labels(),
   1843                          ['unseen', 'answered'])
   1844 
   1845     def test_babyl_to_maildir(self):
   1846         # Convert BabylMessage to MaildirMessage
   1847         pairs = (('unseen', ''), ('deleted', 'ST'), ('filed', 'S'),
   1848                  ('answered', 'RS'), ('forwarded', 'PS'), ('edited', 'S'),
   1849                  ('resent', 'PS'))
   1850         for setting, result in pairs:
   1851             msg = mailbox.BabylMessage(_sample_message)
   1852             msg.add_label(setting)
   1853             self.assertEqual(mailbox.MaildirMessage(msg).get_flags(), result)
   1854             self.assertEqual(mailbox.MaildirMessage(msg).get_subdir(), 'cur')
   1855         msg = mailbox.BabylMessage(_sample_message)
   1856         for label in ('unseen', 'deleted', 'filed', 'answered', 'forwarded',
   1857                       'edited', 'resent'):
   1858             msg.add_label(label)
   1859         self.assertEqual(mailbox.MaildirMessage(msg).get_flags(), 'PRT')
   1860         self.assertEqual(mailbox.MaildirMessage(msg).get_subdir(), 'cur')
   1861 
   1862     def test_babyl_to_mboxmmdf(self):
   1863         # Convert BabylMessage to mboxMessage and MMDFMessage
   1864         pairs = (('unseen', 'O'), ('deleted', 'ROD'), ('filed', 'RO'),
   1865                  ('answered', 'ROA'), ('forwarded', 'RO'), ('edited', 'RO'),
   1866                  ('resent', 'RO'))
   1867         for setting, result in pairs:
   1868             for class_ in (mailbox.mboxMessage, mailbox.MMDFMessage):
   1869                 msg = mailbox.BabylMessage(_sample_message)
   1870                 msg.add_label(setting)
   1871                 self.assertEqual(class_(msg).get_flags(), result)
   1872         msg = mailbox.BabylMessage(_sample_message)
   1873         for label in ('unseen', 'deleted', 'filed', 'answered', 'forwarded',
   1874                       'edited', 'resent'):
   1875             msg.add_label(label)
   1876         for class_ in (mailbox.mboxMessage, mailbox.MMDFMessage):
   1877             self.assertEqual(class_(msg).get_flags(), 'ODA')
   1878 
   1879     def test_babyl_to_mh(self):
   1880         # Convert BabylMessage to MHMessage
   1881         pairs = (('unseen', ['unseen']), ('deleted', []), ('filed', []),
   1882                  ('answered', ['replied']), ('forwarded', []), ('edited', []),
   1883                  ('resent', []))
   1884         for setting, result in pairs:
   1885             msg = mailbox.BabylMessage(_sample_message)
   1886             msg.add_label(setting)
   1887             self.assertEqual(mailbox.MHMessage(msg).get_sequences(), result)
   1888         msg = mailbox.BabylMessage(_sample_message)
   1889         for label in ('unseen', 'deleted', 'filed', 'answered', 'forwarded',
   1890                       'edited', 'resent'):
   1891             msg.add_label(label)
   1892         self.assertEqual(mailbox.MHMessage(msg).get_sequences(),
   1893                          ['unseen', 'replied'])
   1894 
   1895     def test_babyl_to_babyl(self):
   1896         # Convert BabylMessage to BabylMessage
   1897         msg = mailbox.BabylMessage(_sample_message)
   1898         msg.update_visible()
   1899         for label in ('unseen', 'deleted', 'filed', 'answered', 'forwarded',
   1900                       'edited', 'resent'):
   1901             msg.add_label(label)
   1902         msg2 = mailbox.BabylMessage(msg)
   1903         self.assertEqual(msg2.get_labels(), ['unseen', 'deleted', 'filed',
   1904                                              'answered', 'forwarded', 'edited',
   1905                                              'resent'])
   1906         self.assertEqual(msg.get_visible().keys(), msg2.get_visible().keys())
   1907         for key in msg.get_visible().keys():
   1908             self.assertEqual(msg.get_visible()[key], msg2.get_visible()[key])
   1909 
   1910 
   1911 class TestProxyFileBase(TestBase):
   1912 
   1913     def _test_read(self, proxy):
   1914         # Read by byte
   1915         proxy.seek(0)
   1916         self.assertEqual(proxy.read(), b'bar')
   1917         proxy.seek(1)
   1918         self.assertEqual(proxy.read(), b'ar')
   1919         proxy.seek(0)
   1920         self.assertEqual(proxy.read(2), b'ba')
   1921         proxy.seek(1)
   1922         self.assertEqual(proxy.read(-1), b'ar')
   1923         proxy.seek(2)
   1924         self.assertEqual(proxy.read(1000), b'r')
   1925 
   1926     def _test_readline(self, proxy):
   1927         # Read by line
   1928         linesep = os.linesep.encode()
   1929         proxy.seek(0)
   1930         self.assertEqual(proxy.readline(), b'foo' + linesep)
   1931         self.assertEqual(proxy.readline(), b'bar' + linesep)
   1932         self.assertEqual(proxy.readline(), b'fred' + linesep)
   1933         self.assertEqual(proxy.readline(), b'bob')
   1934         proxy.seek(2)
   1935         self.assertEqual(proxy.readline(), b'o' + linesep)
   1936         proxy.seek(6 + 2 * len(os.linesep))
   1937         self.assertEqual(proxy.readline(), b'fred' + linesep)
   1938         proxy.seek(6 + 2 * len(os.linesep))
   1939         self.assertEqual(proxy.readline(2), b'fr')
   1940         self.assertEqual(proxy.readline(-10), b'ed' + linesep)
   1941 
   1942     def _test_readlines(self, proxy):
   1943         # Read multiple lines
   1944         linesep = os.linesep.encode()
   1945         proxy.seek(0)
   1946         self.assertEqual(proxy.readlines(), [b'foo' + linesep,
   1947                                            b'bar' + linesep,
   1948                                            b'fred' + linesep, b'bob'])
   1949         proxy.seek(0)
   1950         self.assertEqual(proxy.readlines(2), [b'foo' + linesep])
   1951         proxy.seek(3 + len(linesep))
   1952         self.assertEqual(proxy.readlines(4 + len(linesep)),
   1953                      [b'bar' + linesep, b'fred' + linesep])
   1954         proxy.seek(3)
   1955         self.assertEqual(proxy.readlines(1000), [linesep, b'bar' + linesep,
   1956                                                b'fred' + linesep, b'bob'])
   1957 
   1958     def _test_iteration(self, proxy):
   1959         # Iterate by line
   1960         linesep = os.linesep.encode()
   1961         proxy.seek(0)
   1962         iterator = iter(proxy)
   1963         self.assertEqual(next(iterator), b'foo' + linesep)
   1964         self.assertEqual(next(iterator), b'bar' + linesep)
   1965         self.assertEqual(next(iterator), b'fred' + linesep)
   1966         self.assertEqual(next(iterator), b'bob')
   1967         self.assertRaises(StopIteration, next, iterator)
   1968 
   1969     def _test_seek_and_tell(self, proxy):
   1970         # Seek and use tell to check position
   1971         linesep = os.linesep.encode()
   1972         proxy.seek(3)
   1973         self.assertEqual(proxy.tell(), 3)
   1974         self.assertEqual(proxy.read(len(linesep)), linesep)
   1975         proxy.seek(2, 1)
   1976         self.assertEqual(proxy.read(1 + len(linesep)), b'r' + linesep)
   1977         proxy.seek(-3 - len(linesep), 2)
   1978         self.assertEqual(proxy.read(3), b'bar')
   1979         proxy.seek(2, 0)
   1980         self.assertEqual(proxy.read(), b'o' + linesep + b'bar' + linesep)
   1981         proxy.seek(100)
   1982         self.assertFalse(proxy.read())
   1983 
   1984     def _test_close(self, proxy):
   1985         # Close a file
   1986         self.assertFalse(proxy.closed)
   1987         proxy.close()
   1988         self.assertTrue(proxy.closed)
   1989         # Issue 11700 subsequent closes should be a no-op.
   1990         proxy.close()
   1991         self.assertTrue(proxy.closed)
   1992 
   1993 
   1994 class TestProxyFile(TestProxyFileBase, unittest.TestCase):
   1995 
   1996     def setUp(self):
   1997         self._path = support.TESTFN
   1998         self._file = open(self._path, 'wb+')
   1999 
   2000     def tearDown(self):
   2001         self._file.close()
   2002         self._delete_recursively(self._path)
   2003 
   2004     def test_initialize(self):
   2005         # Initialize and check position
   2006         self._file.write(b'foo')
   2007         pos = self._file.tell()
   2008         proxy0 = mailbox._ProxyFile(self._file)
   2009         self.assertEqual(proxy0.tell(), pos)
   2010         self.assertEqual(self._file.tell(), pos)
   2011         proxy1 = mailbox._ProxyFile(self._file, 0)
   2012         self.assertEqual(proxy1.tell(), 0)
   2013         self.assertEqual(self._file.tell(), pos)
   2014 
   2015     def test_read(self):
   2016         self._file.write(b'bar')
   2017         self._test_read(mailbox._ProxyFile(self._file))
   2018 
   2019     def test_readline(self):
   2020         self._file.write(bytes('foo%sbar%sfred%sbob' % (os.linesep, os.linesep,
   2021                                                   os.linesep), 'ascii'))
   2022         self._test_readline(mailbox._ProxyFile(self._file))
   2023 
   2024     def test_readlines(self):
   2025         self._file.write(bytes('foo%sbar%sfred%sbob' % (os.linesep, os.linesep,
   2026                                                   os.linesep), 'ascii'))
   2027         self._test_readlines(mailbox._ProxyFile(self._file))
   2028 
   2029     def test_iteration(self):
   2030         self._file.write(bytes('foo%sbar%sfred%sbob' % (os.linesep, os.linesep,
   2031                                                   os.linesep), 'ascii'))
   2032         self._test_iteration(mailbox._ProxyFile(self._file))
   2033 
   2034     def test_seek_and_tell(self):
   2035         self._file.write(bytes('foo%sbar%s' % (os.linesep, os.linesep), 'ascii'))
   2036         self._test_seek_and_tell(mailbox._ProxyFile(self._file))
   2037 
   2038     def test_close(self):
   2039         self._file.write(bytes('foo%sbar%s' % (os.linesep, os.linesep), 'ascii'))
   2040         self._test_close(mailbox._ProxyFile(self._file))
   2041 
   2042 
   2043 class TestPartialFile(TestProxyFileBase, unittest.TestCase):
   2044 
   2045     def setUp(self):
   2046         self._path = support.TESTFN
   2047         self._file = open(self._path, 'wb+')
   2048 
   2049     def tearDown(self):
   2050         self._file.close()
   2051         self._delete_recursively(self._path)
   2052 
   2053     def test_initialize(self):
   2054         # Initialize and check position
   2055         self._file.write(bytes('foo' + os.linesep + 'bar', 'ascii'))
   2056         pos = self._file.tell()
   2057         proxy = mailbox._PartialFile(self._file, 2, 5)
   2058         self.assertEqual(proxy.tell(), 0)
   2059         self.assertEqual(self._file.tell(), pos)
   2060 
   2061     def test_read(self):
   2062         self._file.write(bytes('***bar***', 'ascii'))
   2063         self._test_read(mailbox._PartialFile(self._file, 3, 6))
   2064 
   2065     def test_readline(self):
   2066         self._file.write(bytes('!!!!!foo%sbar%sfred%sbob!!!!!' %
   2067                          (os.linesep, os.linesep, os.linesep), 'ascii'))
   2068         self._test_readline(mailbox._PartialFile(self._file, 5,
   2069                                                  18 + 3 * len(os.linesep)))
   2070 
   2071     def test_readlines(self):
   2072         self._file.write(bytes('foo%sbar%sfred%sbob?????' %
   2073                          (os.linesep, os.linesep, os.linesep), 'ascii'))
   2074         self._test_readlines(mailbox._PartialFile(self._file, 0,
   2075                                                   13 + 3 * len(os.linesep)))
   2076 
   2077     def test_iteration(self):
   2078         self._file.write(bytes('____foo%sbar%sfred%sbob####' %
   2079                          (os.linesep, os.linesep, os.linesep), 'ascii'))
   2080         self._test_iteration(mailbox._PartialFile(self._file, 4,
   2081                                                   17 + 3 * len(os.linesep)))
   2082 
   2083     def test_seek_and_tell(self):
   2084         self._file.write(bytes('(((foo%sbar%s$$$' % (os.linesep, os.linesep), 'ascii'))
   2085         self._test_seek_and_tell(mailbox._PartialFile(self._file, 3,
   2086                                                       9 + 2 * len(os.linesep)))
   2087 
   2088     def test_close(self):
   2089         self._file.write(bytes('&foo%sbar%s^' % (os.linesep, os.linesep), 'ascii'))
   2090         self._test_close(mailbox._PartialFile(self._file, 1,
   2091                                               6 + 3 * len(os.linesep)))
   2092 
   2093 
   2094 ## Start: tests from the original module (for backward compatibility).
   2095 
   2096 FROM_ = "From some.body (at] dummy.domain  Sat Jul 24 13:43:35 2004\n"
   2097 DUMMY_MESSAGE = """\
   2098 From: some.body (at] dummy.domain
   2099 To: me (at] my.domain
   2100 Subject: Simple Test
   2101 
   2102 This is a dummy message.
   2103 """
   2104 
   2105 class MaildirTestCase(unittest.TestCase):
   2106 
   2107     def setUp(self):
   2108         # create a new maildir mailbox to work with:
   2109         self._dir = support.TESTFN
   2110         if os.path.isdir(self._dir):
   2111             support.rmtree(self._dir)
   2112         elif os.path.isfile(self._dir):
   2113             support.unlink(self._dir)
   2114         os.mkdir(self._dir)
   2115         os.mkdir(os.path.join(self._dir, "cur"))
   2116         os.mkdir(os.path.join(self._dir, "tmp"))
   2117         os.mkdir(os.path.join(self._dir, "new"))
   2118         self._counter = 1
   2119         self._msgfiles = []
   2120 
   2121     def tearDown(self):
   2122         list(map(os.unlink, self._msgfiles))
   2123         support.rmdir(os.path.join(self._dir, "cur"))
   2124         support.rmdir(os.path.join(self._dir, "tmp"))
   2125         support.rmdir(os.path.join(self._dir, "new"))
   2126         support.rmdir(self._dir)
   2127 
   2128     def createMessage(self, dir, mbox=False):
   2129         t = int(time.time() % 1000000)
   2130         pid = self._counter
   2131         self._counter += 1
   2132         filename = ".".join((str(t), str(pid), "myhostname", "mydomain"))
   2133         tmpname = os.path.join(self._dir, "tmp", filename)
   2134         newname = os.path.join(self._dir, dir, filename)
   2135         with open(tmpname, "w") as fp:
   2136             self._msgfiles.append(tmpname)
   2137             if mbox:
   2138                 fp.write(FROM_)
   2139             fp.write(DUMMY_MESSAGE)
   2140         try:
   2141             os.link(tmpname, newname)
   2142         except (AttributeError, PermissionError):
   2143             with open(newname, "w") as fp:
   2144                 fp.write(DUMMY_MESSAGE)
   2145         self._msgfiles.append(newname)
   2146         return tmpname
   2147 
   2148     def test_empty_maildir(self):
   2149         """Test an empty maildir mailbox"""
   2150         # Test for regression on bug #117490:
   2151         # Make sure the boxes attribute actually gets set.
   2152         self.mbox = mailbox.Maildir(support.TESTFN)
   2153         #self.assertTrue(hasattr(self.mbox, "boxes"))
   2154         #self.assertEqual(len(self.mbox.boxes), 0)
   2155         self.assertIsNone(self.mbox.next())
   2156         self.assertIsNone(self.mbox.next())
   2157 
   2158     def test_nonempty_maildir_cur(self):
   2159         self.createMessage("cur")
   2160         self.mbox = mailbox.Maildir(support.TESTFN)
   2161         #self.assertEqual(len(self.mbox.boxes), 1)
   2162         self.assertIsNotNone(self.mbox.next())
   2163         self.assertIsNone(self.mbox.next())
   2164         self.assertIsNone(self.mbox.next())
   2165 
   2166     def test_nonempty_maildir_new(self):
   2167         self.createMessage("new")
   2168         self.mbox = mailbox.Maildir(support.TESTFN)
   2169         #self.assertEqual(len(self.mbox.boxes), 1)
   2170         self.assertIsNotNone(self.mbox.next())
   2171         self.assertIsNone(self.mbox.next())
   2172         self.assertIsNone(self.mbox.next())
   2173 
   2174     def test_nonempty_maildir_both(self):
   2175         self.createMessage("cur")
   2176         self.createMessage("new")
   2177         self.mbox = mailbox.Maildir(support.TESTFN)
   2178         #self.assertEqual(len(self.mbox.boxes), 2)
   2179         self.assertIsNotNone(self.mbox.next())
   2180         self.assertIsNotNone(self.mbox.next())
   2181         self.assertIsNone(self.mbox.next())
   2182         self.assertIsNone(self.mbox.next())
   2183 
   2184 ## End: tests from the original module (for backward compatibility).
   2185 
   2186 
   2187 _sample_message = """\
   2188 Return-Path: <gkj (at] gregorykjohnson.com>
   2189 X-Original-To: gkj+person@localhost
   2190 Delivered-To: gkj+person@localhost
   2191 Received: from localhost (localhost [127.0.0.1])
   2192         by andy.gregorykjohnson.com (Postfix) with ESMTP id 356ED9DD17
   2193         for <gkj+person@localhost>; Wed, 13 Jul 2005 17:23:16 -0400 (EDT)
   2194 Delivered-To: gkj (at] sundance.gregorykjohnson.com
   2195 Received: from localhost [127.0.0.1]
   2196         by localhost with POP3 (fetchmail-6.2.5)
   2197         for gkj+person@localhost (single-drop); Wed, 13 Jul 2005 17:23:16 -0400 (EDT)
   2198 Received: from andy.gregorykjohnson.com (andy.gregorykjohnson.com [64.32.235.228])
   2199         by sundance.gregorykjohnson.com (Postfix) with ESMTP id 5B056316746
   2200         for <gkj (at] gregorykjohnson.com>; Wed, 13 Jul 2005 17:23:11 -0400 (EDT)
   2201 Received: by andy.gregorykjohnson.com (Postfix, from userid 1000)
   2202         id 490CD9DD17; Wed, 13 Jul 2005 17:23:11 -0400 (EDT)
   2203 Date: Wed, 13 Jul 2005 17:23:11 -0400
   2204 From: "Gregory K. Johnson" <gkj (at] gregorykjohnson.com>
   2205 To: gkj (at] gregorykjohnson.com
   2206 Subject: Sample message
   2207 Message-ID: <20050713212311.GC4701 (at] andy.gregorykjohnson.com>
   2208 Mime-Version: 1.0
   2209 Content-Type: multipart/mixed; boundary="NMuMz9nt05w80d4+"
   2210 Content-Disposition: inline
   2211 User-Agent: Mutt/1.5.9i
   2212 
   2213 
   2214 --NMuMz9nt05w80d4+
   2215 Content-Type: text/plain; charset=us-ascii
   2216 Content-Disposition: inline
   2217 
   2218 This is a sample message.
   2219 
   2220 --
   2221 Gregory K. Johnson
   2222 
   2223 --NMuMz9nt05w80d4+
   2224 Content-Type: application/octet-stream
   2225 Content-Disposition: attachment; filename="text.gz"
   2226 Content-Transfer-Encoding: base64
   2227 
   2228 H4sICM2D1UIAA3RleHQAC8nILFYAokSFktSKEoW0zJxUPa7wzJIMhZLyfIWczLzUYj0uAHTs
   2229 3FYlAAAA
   2230 
   2231 --NMuMz9nt05w80d4+--
   2232 """
   2233 
   2234 _bytes_sample_message = _sample_message.encode('ascii')
   2235 
   2236 _sample_headers = {
   2237     "Return-Path":"<gkj (at] gregorykjohnson.com>",
   2238     "X-Original-To":"gkj+person@localhost",
   2239     "Delivered-To":"gkj+person@localhost",
   2240     "Received":"""from localhost (localhost [127.0.0.1])
   2241         by andy.gregorykjohnson.com (Postfix) with ESMTP id 356ED9DD17
   2242         for <gkj+person@localhost>; Wed, 13 Jul 2005 17:23:16 -0400 (EDT)""",
   2243     "Delivered-To":"gkj (at] sundance.gregorykjohnson.com",
   2244     "Received":"""from localhost [127.0.0.1]
   2245         by localhost with POP3 (fetchmail-6.2.5)
   2246         for gkj+person@localhost (single-drop); Wed, 13 Jul 2005 17:23:16 -0400 (EDT)""",
   2247     "Received":"""from andy.gregorykjohnson.com (andy.gregorykjohnson.com [64.32.235.228])
   2248         by sundance.gregorykjohnson.com (Postfix) with ESMTP id 5B056316746
   2249         for <gkj (at] gregorykjohnson.com>; Wed, 13 Jul 2005 17:23:11 -0400 (EDT)""",
   2250     "Received":"""by andy.gregorykjohnson.com (Postfix, from userid 1000)
   2251         id 490CD9DD17; Wed, 13 Jul 2005 17:23:11 -0400 (EDT)""",
   2252     "Date":"Wed, 13 Jul 2005 17:23:11 -0400",
   2253     "From":""""Gregory K. Johnson" <gkj (at] gregorykjohnson.com>""",
   2254     "To":"gkj (at] gregorykjohnson.com",
   2255     "Subject":"Sample message",
   2256     "Mime-Version":"1.0",
   2257     "Content-Type":"""multipart/mixed; boundary="NMuMz9nt05w80d4+\"""",
   2258     "Content-Disposition":"inline",
   2259     "User-Agent": "Mutt/1.5.9i" }
   2260 
   2261 _sample_payloads = ("""This is a sample message.
   2262 
   2263 --
   2264 Gregory K. Johnson
   2265 """,
   2266 """H4sICM2D1UIAA3RleHQAC8nILFYAokSFktSKEoW0zJxUPa7wzJIMhZLyfIWczLzUYj0uAHTs
   2267 3FYlAAAA
   2268 """)
   2269 
   2270 
   2271 class MiscTestCase(unittest.TestCase):
   2272     def test__all__(self):
   2273         blacklist = {"linesep", "fcntl"}
   2274         support.check__all__(self, mailbox, blacklist=blacklist)
   2275 
   2276 
   2277 def test_main():
   2278     tests = (TestMailboxSuperclass, TestMaildir, TestMbox, TestMMDF, TestMH,
   2279              TestBabyl, TestMessage, TestMaildirMessage, TestMboxMessage,
   2280              TestMHMessage, TestBabylMessage, TestMMDFMessage,
   2281              TestMessageConversion, TestProxyFile, TestPartialFile,
   2282              MaildirTestCase, TestFakeMailBox, MiscTestCase)
   2283     support.run_unittest(*tests)
   2284     support.reap_children()
   2285 
   2286 
   2287 if __name__ == '__main__':
   2288     test_main()
   2289