Home | History | Annotate | Download | only in test
      1 from test import support
      2 # If we end up with a significant number of tests that don't require
      3 # threading, this test module should be split.  Right now we skip
      4 # them all if we don't have threading.
      5 threading = support.import_module('threading')
      6 
      7 from contextlib import contextmanager
      8 import imaplib
      9 import os.path
     10 import socketserver
     11 import time
     12 import calendar
     13 import inspect
     14 
     15 from test.support import (reap_threads, verbose, transient_internet,
     16                           run_with_tz, run_with_locale)
     17 import unittest
     18 from unittest import mock
     19 from datetime import datetime, timezone, timedelta
     20 try:
     21     import ssl
     22 except ImportError:
     23     ssl = None
     24 
     25 CERTFILE = os.path.join(os.path.dirname(__file__) or os.curdir, "keycert3.pem")
     26 CAFILE = os.path.join(os.path.dirname(__file__) or os.curdir, "pycacert.pem")
     27 
     28 
     29 class TestImaplib(unittest.TestCase):
     30 
     31     def test_Internaldate2tuple(self):
     32         t0 = calendar.timegm((2000, 1, 1, 0, 0, 0, -1, -1, -1))
     33         tt = imaplib.Internaldate2tuple(
     34             b'25 (INTERNALDATE "01-Jan-2000 00:00:00 +0000")')
     35         self.assertEqual(time.mktime(tt), t0)
     36         tt = imaplib.Internaldate2tuple(
     37             b'25 (INTERNALDATE "01-Jan-2000 11:30:00 +1130")')
     38         self.assertEqual(time.mktime(tt), t0)
     39         tt = imaplib.Internaldate2tuple(
     40             b'25 (INTERNALDATE "31-Dec-1999 12:30:00 -1130")')
     41         self.assertEqual(time.mktime(tt), t0)
     42 
     43     @run_with_tz('MST+07MDT,M4.1.0,M10.5.0')
     44     def test_Internaldate2tuple_issue10941(self):
     45         self.assertNotEqual(imaplib.Internaldate2tuple(
     46             b'25 (INTERNALDATE "02-Apr-2000 02:30:00 +0000")'),
     47             imaplib.Internaldate2tuple(
     48                 b'25 (INTERNALDATE "02-Apr-2000 03:30:00 +0000")'))
     49 
     50     def timevalues(self):
     51         return [2000000000, 2000000000.0, time.localtime(2000000000),
     52                 (2033, 5, 18, 5, 33, 20, -1, -1, -1),
     53                 (2033, 5, 18, 5, 33, 20, -1, -1, 1),
     54                 datetime.fromtimestamp(2000000000,
     55                                        timezone(timedelta(0, 2 * 60 * 60))),
     56                 '"18-May-2033 05:33:20 +0200"']
     57 
     58     @run_with_locale('LC_ALL', 'de_DE', 'fr_FR')
     59     # DST rules included to work around quirk where the Gnu C library may not
     60     # otherwise restore the previous time zone
     61     @run_with_tz('STD-1DST,M3.2.0,M11.1.0')
     62     def test_Time2Internaldate(self):
     63         expected = '"18-May-2033 05:33:20 +0200"'
     64 
     65         for t in self.timevalues():
     66             internal = imaplib.Time2Internaldate(t)
     67             self.assertEqual(internal, expected)
     68 
     69     def test_that_Time2Internaldate_returns_a_result(self):
     70         # Without tzset, we can check only that it successfully
     71         # produces a result, not the correctness of the result itself,
     72         # since the result depends on the timezone the machine is in.
     73         for t in self.timevalues():
     74             imaplib.Time2Internaldate(t)
     75 
     76 
     77 if ssl:
     78     class SecureTCPServer(socketserver.TCPServer):
     79 
     80         def get_request(self):
     81             newsocket, fromaddr = self.socket.accept()
     82             context = ssl.SSLContext()
     83             context.load_cert_chain(CERTFILE)
     84             connstream = context.wrap_socket(newsocket, server_side=True)
     85             return connstream, fromaddr
     86 
     87     IMAP4_SSL = imaplib.IMAP4_SSL
     88 
     89 else:
     90 
     91     class SecureTCPServer:
     92         pass
     93 
     94     IMAP4_SSL = None
     95 
     96 
     97 class SimpleIMAPHandler(socketserver.StreamRequestHandler):
     98     timeout = 1
     99     continuation = None
    100     capabilities = ''
    101 
    102     def setup(self):
    103         super().setup()
    104         self.server.logged = None
    105 
    106     def _send(self, message):
    107         if verbose:
    108             print("SENT: %r" % message.strip())
    109         self.wfile.write(message)
    110 
    111     def _send_line(self, message):
    112         self._send(message + b'\r\n')
    113 
    114     def _send_textline(self, message):
    115         self._send_line(message.encode('ASCII'))
    116 
    117     def _send_tagged(self, tag, code, message):
    118         self._send_textline(' '.join((tag, code, message)))
    119 
    120     def handle(self):
    121         # Send a welcome message.
    122         self._send_textline('* OK IMAP4rev1')
    123         while 1:
    124             # Gather up input until we receive a line terminator or we timeout.
    125             # Accumulate read(1) because it's simpler to handle the differences
    126             # between naked sockets and SSL sockets.
    127             line = b''
    128             while 1:
    129                 try:
    130                     part = self.rfile.read(1)
    131                     if part == b'':
    132                         # Naked sockets return empty strings..
    133                         return
    134                     line += part
    135                 except OSError:
    136                     # ..but SSLSockets raise exceptions.
    137                     return
    138                 if line.endswith(b'\r\n'):
    139                     break
    140 
    141             if verbose:
    142                 print('GOT: %r' % line.strip())
    143             if self.continuation:
    144                 try:
    145                     self.continuation.send(line)
    146                 except StopIteration:
    147                     self.continuation = None
    148                 continue
    149             splitline = line.decode('ASCII').split()
    150             tag = splitline[0]
    151             cmd = splitline[1]
    152             args = splitline[2:]
    153 
    154             if hasattr(self, 'cmd_' + cmd):
    155                 continuation = getattr(self, 'cmd_' + cmd)(tag, args)
    156                 if continuation:
    157                     self.continuation = continuation
    158                     next(continuation)
    159             else:
    160                 self._send_tagged(tag, 'BAD', cmd + ' unknown')
    161 
    162     def cmd_CAPABILITY(self, tag, args):
    163         caps = ('IMAP4rev1 ' + self.capabilities
    164                 if self.capabilities
    165                 else 'IMAP4rev1')
    166         self._send_textline('* CAPABILITY ' + caps)
    167         self._send_tagged(tag, 'OK', 'CAPABILITY completed')
    168 
    169     def cmd_LOGOUT(self, tag, args):
    170         self.server.logged = None
    171         self._send_textline('* BYE IMAP4ref1 Server logging out')
    172         self._send_tagged(tag, 'OK', 'LOGOUT completed')
    173 
    174     def cmd_LOGIN(self, tag, args):
    175         self.server.logged = args[0]
    176         self._send_tagged(tag, 'OK', 'LOGIN completed')
    177 
    178 
    179 class NewIMAPTestsMixin():
    180     client = None
    181 
    182     def _setup(self, imap_handler, connect=True):
    183         """
    184         Sets up imap_handler for tests. imap_handler should inherit from either:
    185         - SimpleIMAPHandler - for testing IMAP commands,
    186         - socketserver.StreamRequestHandler - if raw access to stream is needed.
    187         Returns (client, server).
    188         """
    189         class TestTCPServer(self.server_class):
    190             def handle_error(self, request, client_address):
    191                 """
    192                 End request and raise the error if one occurs.
    193                 """
    194                 self.close_request(request)
    195                 self.server_close()
    196                 raise
    197 
    198         self.addCleanup(self._cleanup)
    199         self.server = self.server_class((support.HOST, 0), imap_handler)
    200         self.thread = threading.Thread(
    201             name=self._testMethodName+'-server',
    202             target=self.server.serve_forever,
    203             # Short poll interval to make the test finish quickly.
    204             # Time between requests is short enough that we won't wake
    205             # up spuriously too many times.
    206             kwargs={'poll_interval': 0.01})
    207         self.thread.daemon = True  # In case this function raises.
    208         self.thread.start()
    209 
    210         if connect:
    211             self.client = self.imap_class(*self.server.server_address)
    212 
    213         return self.client, self.server
    214 
    215     def _cleanup(self):
    216         """
    217         Cleans up the test server. This method should not be called manually,
    218         it is added to the cleanup queue in the _setup method already.
    219         """
    220         # if logout was called already we'd raise an exception trying to
    221         # shutdown the client once again
    222         if self.client is not None and self.client.state != 'LOGOUT':
    223             self.client.shutdown()
    224         # cleanup the server
    225         self.server.shutdown()
    226         self.server.server_close()
    227         self.thread.join(3.0)
    228 
    229     def test_EOF_without_complete_welcome_message(self):
    230         # http://bugs.python.org/issue5949
    231         class EOFHandler(socketserver.StreamRequestHandler):
    232             def handle(self):
    233                 self.wfile.write(b'* OK')
    234         _, server = self._setup(EOFHandler, connect=False)
    235         self.assertRaises(imaplib.IMAP4.abort, self.imap_class,
    236                           *server.server_address)
    237 
    238     def test_line_termination(self):
    239         class BadNewlineHandler(SimpleIMAPHandler):
    240             def cmd_CAPABILITY(self, tag, args):
    241                 self._send(b'* CAPABILITY IMAP4rev1 AUTH\n')
    242                 self._send_tagged(tag, 'OK', 'CAPABILITY completed')
    243         _, server = self._setup(BadNewlineHandler, connect=False)
    244         self.assertRaises(imaplib.IMAP4.abort, self.imap_class,
    245                           *server.server_address)
    246 
    247     def test_enable_raises_error_if_not_AUTH(self):
    248         class EnableHandler(SimpleIMAPHandler):
    249             capabilities = 'AUTH ENABLE UTF8=ACCEPT'
    250         client, _ = self._setup(EnableHandler)
    251         self.assertFalse(client.utf8_enabled)
    252         with self.assertRaisesRegex(imaplib.IMAP4.error, 'ENABLE.*NONAUTH'):
    253             client.enable('foo')
    254         self.assertFalse(client.utf8_enabled)
    255 
    256     def test_enable_raises_error_if_no_capability(self):
    257         client, _ = self._setup(SimpleIMAPHandler)
    258         with self.assertRaisesRegex(imaplib.IMAP4.error,
    259                 'does not support ENABLE'):
    260             client.enable('foo')
    261 
    262     def test_enable_UTF8_raises_error_if_not_supported(self):
    263         client, _ = self._setup(SimpleIMAPHandler)
    264         typ, data = client.login('user', 'pass')
    265         self.assertEqual(typ, 'OK')
    266         with self.assertRaisesRegex(imaplib.IMAP4.error,
    267                 'does not support ENABLE'):
    268             client.enable('UTF8=ACCEPT')
    269 
    270     def test_enable_UTF8_True_append(self):
    271         class UTF8AppendServer(SimpleIMAPHandler):
    272             capabilities = 'ENABLE UTF8=ACCEPT'
    273             def cmd_ENABLE(self, tag, args):
    274                 self._send_tagged(tag, 'OK', 'ENABLE successful')
    275             def cmd_AUTHENTICATE(self, tag, args):
    276                 self._send_textline('+')
    277                 self.server.response = yield
    278                 self._send_tagged(tag, 'OK', 'FAKEAUTH successful')
    279             def cmd_APPEND(self, tag, args):
    280                 self._send_textline('+')
    281                 self.server.response = yield
    282                 self._send_tagged(tag, 'OK', 'okay')
    283         client, server = self._setup(UTF8AppendServer)
    284         self.assertEqual(client._encoding, 'ascii')
    285         code, _ = client.authenticate('MYAUTH', lambda x: b'fake')
    286         self.assertEqual(code, 'OK')
    287         self.assertEqual(server.response, b'ZmFrZQ==\r\n')  # b64 encoded 'fake'
    288         code, _ = client.enable('UTF8=ACCEPT')
    289         self.assertEqual(code, 'OK')
    290         self.assertEqual(client._encoding, 'utf-8')
    291         msg_string = 'Subject: '
    292         typ, data = client.append(None, None, None, msg_string.encode('utf-8'))
    293         self.assertEqual(typ, 'OK')
    294         self.assertEqual(server.response,
    295             ('UTF8 (%s)\r\n' % msg_string).encode('utf-8'))
    296 
    297     def test_search_disallows_charset_in_utf8_mode(self):
    298         class UTF8Server(SimpleIMAPHandler):
    299             capabilities = 'AUTH ENABLE UTF8=ACCEPT'
    300             def cmd_ENABLE(self, tag, args):
    301                 self._send_tagged(tag, 'OK', 'ENABLE successful')
    302             def cmd_AUTHENTICATE(self, tag, args):
    303                 self._send_textline('+')
    304                 self.server.response = yield
    305                 self._send_tagged(tag, 'OK', 'FAKEAUTH successful')
    306         client, _ = self._setup(UTF8Server)
    307         typ, _ = client.authenticate('MYAUTH', lambda x: b'fake')
    308         self.assertEqual(typ, 'OK')
    309         typ, _ = client.enable('UTF8=ACCEPT')
    310         self.assertEqual(typ, 'OK')
    311         self.assertTrue(client.utf8_enabled)
    312         with self.assertRaisesRegex(imaplib.IMAP4.error, 'charset.*UTF8'):
    313             client.search('foo', 'bar')
    314 
    315     def test_bad_auth_name(self):
    316         class MyServer(SimpleIMAPHandler):
    317             def cmd_AUTHENTICATE(self, tag, args):
    318                 self._send_tagged(tag, 'NO',
    319                     'unrecognized authentication type {}'.format(args[0]))
    320         client, _ = self._setup(MyServer)
    321         with self.assertRaisesRegex(imaplib.IMAP4.error,
    322                 'unrecognized authentication type METHOD'):
    323             client.authenticate('METHOD', lambda: 1)
    324 
    325     def test_invalid_authentication(self):
    326         class MyServer(SimpleIMAPHandler):
    327             def cmd_AUTHENTICATE(self, tag, args):
    328                 self._send_textline('+')
    329                 self.response = yield
    330                 self._send_tagged(tag, 'NO', '[AUTHENTICATIONFAILED] invalid')
    331         client, _ = self._setup(MyServer)
    332         with self.assertRaisesRegex(imaplib.IMAP4.error,
    333                 r'\[AUTHENTICATIONFAILED\] invalid'):
    334             client.authenticate('MYAUTH', lambda x: b'fake')
    335 
    336     def test_valid_authentication_bytes(self):
    337         class MyServer(SimpleIMAPHandler):
    338             def cmd_AUTHENTICATE(self, tag, args):
    339                 self._send_textline('+')
    340                 self.server.response = yield
    341                 self._send_tagged(tag, 'OK', 'FAKEAUTH successful')
    342         client, server = self._setup(MyServer)
    343         code, _ = client.authenticate('MYAUTH', lambda x: b'fake')
    344         self.assertEqual(code, 'OK')
    345         self.assertEqual(server.response, b'ZmFrZQ==\r\n')  # b64 encoded 'fake'
    346 
    347     def test_valid_authentication_plain_text(self):
    348         class MyServer(SimpleIMAPHandler):
    349             def cmd_AUTHENTICATE(self, tag, args):
    350                 self._send_textline('+')
    351                 self.server.response = yield
    352                 self._send_tagged(tag, 'OK', 'FAKEAUTH successful')
    353         client, server = self._setup(MyServer)
    354         code, _ = client.authenticate('MYAUTH', lambda x: 'fake')
    355         self.assertEqual(code, 'OK')
    356         self.assertEqual(server.response, b'ZmFrZQ==\r\n')  # b64 encoded 'fake'
    357 
    358     def test_login_cram_md5_bytes(self):
    359         class AuthHandler(SimpleIMAPHandler):
    360             capabilities = 'LOGINDISABLED AUTH=CRAM-MD5'
    361             def cmd_AUTHENTICATE(self, tag, args):
    362                 self._send_textline('+ PDE4OTYuNjk3MTcwOTUyQHBvc3RvZmZpY2Uucm'
    363                                     'VzdG9uLm1jaS5uZXQ=')
    364                 r = yield
    365                 if (r == b'dGltIGYxY2E2YmU0NjRiOWVmYT'
    366                          b'FjY2E2ZmZkNmNmMmQ5ZjMy\r\n'):
    367                     self._send_tagged(tag, 'OK', 'CRAM-MD5 successful')
    368                 else:
    369                     self._send_tagged(tag, 'NO', 'No access')
    370         client, _ = self._setup(AuthHandler)
    371         self.assertTrue('AUTH=CRAM-MD5' in client.capabilities)
    372         ret, _ = client.login_cram_md5("tim", b"tanstaaftanstaaf")
    373         self.assertEqual(ret, "OK")
    374 
    375     def test_login_cram_md5_plain_text(self):
    376         class AuthHandler(SimpleIMAPHandler):
    377             capabilities = 'LOGINDISABLED AUTH=CRAM-MD5'
    378             def cmd_AUTHENTICATE(self, tag, args):
    379                 self._send_textline('+ PDE4OTYuNjk3MTcwOTUyQHBvc3RvZmZpY2Uucm'
    380                                     'VzdG9uLm1jaS5uZXQ=')
    381                 r = yield
    382                 if (r == b'dGltIGYxY2E2YmU0NjRiOWVmYT'
    383                          b'FjY2E2ZmZkNmNmMmQ5ZjMy\r\n'):
    384                     self._send_tagged(tag, 'OK', 'CRAM-MD5 successful')
    385                 else:
    386                     self._send_tagged(tag, 'NO', 'No access')
    387         client, _ = self._setup(AuthHandler)
    388         self.assertTrue('AUTH=CRAM-MD5' in client.capabilities)
    389         ret, _ = client.login_cram_md5("tim", "tanstaaftanstaaf")
    390         self.assertEqual(ret, "OK")
    391 
    392     def test_aborted_authentication(self):
    393         class MyServer(SimpleIMAPHandler):
    394             def cmd_AUTHENTICATE(self, tag, args):
    395                 self._send_textline('+')
    396                 self.response = yield
    397                 if self.response == b'*\r\n':
    398                     self._send_tagged(
    399                         tag,
    400                         'NO',
    401                         '[AUTHENTICATIONFAILED] aborted')
    402                 else:
    403                     self._send_tagged(tag, 'OK', 'MYAUTH successful')
    404         client, _ = self._setup(MyServer)
    405         with self.assertRaisesRegex(imaplib.IMAP4.error,
    406                 r'\[AUTHENTICATIONFAILED\] aborted'):
    407             client.authenticate('MYAUTH', lambda x: None)
    408 
    409     @mock.patch('imaplib._MAXLINE', 10)
    410     def test_linetoolong(self):
    411         class TooLongHandler(SimpleIMAPHandler):
    412             def handle(self):
    413                 # send response line longer than the limit set in the next line
    414                 self.wfile.write(b'* OK ' + 11 * b'x' + b'\r\n')
    415         _, server = self._setup(TooLongHandler, connect=False)
    416         with self.assertRaisesRegex(imaplib.IMAP4.error,
    417                 'got more than 10 bytes'):
    418             self.imap_class(*server.server_address)
    419 
    420     def test_simple_with_statement(self):
    421         _, server = self._setup(SimpleIMAPHandler, connect=False)
    422         with self.imap_class(*server.server_address):
    423             pass
    424 
    425     def test_with_statement(self):
    426         _, server = self._setup(SimpleIMAPHandler, connect=False)
    427         with self.imap_class(*server.server_address) as imap:
    428             imap.login('user', 'pass')
    429             self.assertEqual(server.logged, 'user')
    430         self.assertIsNone(server.logged)
    431 
    432     def test_with_statement_logout(self):
    433         # It is legal to log out explicitly inside the with block
    434         _, server = self._setup(SimpleIMAPHandler, connect=False)
    435         with self.imap_class(*server.server_address) as imap:
    436             imap.login('user', 'pass')
    437             self.assertEqual(server.logged, 'user')
    438             imap.logout()
    439             self.assertIsNone(server.logged)
    440         self.assertIsNone(server.logged)
    441 
    442     # command tests
    443 
    444     def test_login(self):
    445         client, _ = self._setup(SimpleIMAPHandler)
    446         typ, data = client.login('user', 'pass')
    447         self.assertEqual(typ, 'OK')
    448         self.assertEqual(data[0], b'LOGIN completed')
    449         self.assertEqual(client.state, 'AUTH')
    450 
    451     def test_logout(self):
    452         client, _ = self._setup(SimpleIMAPHandler)
    453         typ, data = client.login('user', 'pass')
    454         self.assertEqual(typ, 'OK')
    455         self.assertEqual(data[0], b'LOGIN completed')
    456         typ, data = client.logout()
    457         self.assertEqual(typ, 'BYE')
    458         self.assertEqual(data[0], b'IMAP4ref1 Server logging out')
    459         self.assertEqual(client.state, 'LOGOUT')
    460 
    461     def test_lsub(self):
    462         class LsubCmd(SimpleIMAPHandler):
    463             def cmd_LSUB(self, tag, args):
    464                 self._send_textline('* LSUB () "." directoryA')
    465                 return self._send_tagged(tag, 'OK', 'LSUB completed')
    466         client, _ = self._setup(LsubCmd)
    467         client.login('user', 'pass')
    468         typ, data = client.lsub()
    469         self.assertEqual(typ, 'OK')
    470         self.assertEqual(data[0], b'() "." directoryA')
    471 
    472 
    473 class NewIMAPTests(NewIMAPTestsMixin, unittest.TestCase):
    474     imap_class = imaplib.IMAP4
    475     server_class = socketserver.TCPServer
    476 
    477 
    478 @unittest.skipUnless(ssl, "SSL not available")
    479 class NewIMAPSSLTests(NewIMAPTestsMixin, unittest.TestCase):
    480     imap_class = IMAP4_SSL
    481     server_class = SecureTCPServer
    482 
    483     def test_ssl_raises(self):
    484         ssl_context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
    485         ssl_context.verify_mode = ssl.CERT_REQUIRED
    486         ssl_context.check_hostname = True
    487         ssl_context.load_verify_locations(CAFILE)
    488 
    489         with self.assertRaisesRegex(ssl.CertificateError,
    490                 "hostname '127.0.0.1' doesn't match 'localhost'"):
    491             _, server = self._setup(SimpleIMAPHandler)
    492             client = self.imap_class(*server.server_address,
    493                                      ssl_context=ssl_context)
    494             client.shutdown()
    495 
    496     def test_ssl_verified(self):
    497         ssl_context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
    498         ssl_context.verify_mode = ssl.CERT_REQUIRED
    499         ssl_context.check_hostname = True
    500         ssl_context.load_verify_locations(CAFILE)
    501 
    502         _, server = self._setup(SimpleIMAPHandler)
    503         client = self.imap_class("localhost", server.server_address[1],
    504                                  ssl_context=ssl_context)
    505         client.shutdown()
    506 
    507 class ThreadedNetworkedTests(unittest.TestCase):
    508     server_class = socketserver.TCPServer
    509     imap_class = imaplib.IMAP4
    510 
    511     def make_server(self, addr, hdlr):
    512 
    513         class MyServer(self.server_class):
    514             def handle_error(self, request, client_address):
    515                 self.close_request(request)
    516                 self.server_close()
    517                 raise
    518 
    519         if verbose:
    520             print("creating server")
    521         server = MyServer(addr, hdlr)
    522         self.assertEqual(server.server_address, server.socket.getsockname())
    523 
    524         if verbose:
    525             print("server created")
    526             print("ADDR =", addr)
    527             print("CLASS =", self.server_class)
    528             print("HDLR =", server.RequestHandlerClass)
    529 
    530         t = threading.Thread(
    531             name='%s serving' % self.server_class,
    532             target=server.serve_forever,
    533             # Short poll interval to make the test finish quickly.
    534             # Time between requests is short enough that we won't wake
    535             # up spuriously too many times.
    536             kwargs={'poll_interval': 0.01})
    537         t.daemon = True  # In case this function raises.
    538         t.start()
    539         if verbose:
    540             print("server running")
    541         return server, t
    542 
    543     def reap_server(self, server, thread):
    544         if verbose:
    545             print("waiting for server")
    546         server.shutdown()
    547         server.server_close()
    548         thread.join()
    549         if verbose:
    550             print("done")
    551 
    552     @contextmanager
    553     def reaped_server(self, hdlr):
    554         server, thread = self.make_server((support.HOST, 0), hdlr)
    555         try:
    556             yield server
    557         finally:
    558             self.reap_server(server, thread)
    559 
    560     @contextmanager
    561     def reaped_pair(self, hdlr):
    562         with self.reaped_server(hdlr) as server:
    563             client = self.imap_class(*server.server_address)
    564             try:
    565                 yield server, client
    566             finally:
    567                 client.logout()
    568 
    569     @reap_threads
    570     def test_connect(self):
    571         with self.reaped_server(SimpleIMAPHandler) as server:
    572             client = self.imap_class(*server.server_address)
    573             client.shutdown()
    574 
    575     @reap_threads
    576     def test_bracket_flags(self):
    577 
    578         # This violates RFC 3501, which disallows ']' characters in tag names,
    579         # but imaplib has allowed producing such tags forever, other programs
    580         # also produce them (eg: OtherInbox's Organizer app as of 20140716),
    581         # and Gmail, for example, accepts them and produces them.  So we
    582         # support them.  See issue #21815.
    583 
    584         class BracketFlagHandler(SimpleIMAPHandler):
    585 
    586             def handle(self):
    587                 self.flags = ['Answered', 'Flagged', 'Deleted', 'Seen', 'Draft']
    588                 super().handle()
    589 
    590             def cmd_AUTHENTICATE(self, tag, args):
    591                 self._send_textline('+')
    592                 self.server.response = yield
    593                 self._send_tagged(tag, 'OK', 'FAKEAUTH successful')
    594 
    595             def cmd_SELECT(self, tag, args):
    596                 flag_msg = ' \\'.join(self.flags)
    597                 self._send_line(('* FLAGS (%s)' % flag_msg).encode('ascii'))
    598                 self._send_line(b'* 2 EXISTS')
    599                 self._send_line(b'* 0 RECENT')
    600                 msg = ('* OK [PERMANENTFLAGS %s \\*)] Flags permitted.'
    601                         % flag_msg)
    602                 self._send_line(msg.encode('ascii'))
    603                 self._send_tagged(tag, 'OK', '[READ-WRITE] SELECT completed.')
    604 
    605             def cmd_STORE(self, tag, args):
    606                 new_flags = args[2].strip('(').strip(')').split()
    607                 self.flags.extend(new_flags)
    608                 flags_msg = '(FLAGS (%s))' % ' \\'.join(self.flags)
    609                 msg = '* %s FETCH %s' % (args[0], flags_msg)
    610                 self._send_line(msg.encode('ascii'))
    611                 self._send_tagged(tag, 'OK', 'STORE completed.')
    612 
    613         with self.reaped_pair(BracketFlagHandler) as (server, client):
    614             code, data = client.authenticate('MYAUTH', lambda x: b'fake')
    615             self.assertEqual(code, 'OK')
    616             self.assertEqual(server.response, b'ZmFrZQ==\r\n')
    617             client.select('test')
    618             typ, [data] = client.store(b'1', "+FLAGS", "[test]")
    619             self.assertIn(b'[test]', data)
    620             client.select('test')
    621             typ, [data] = client.response('PERMANENTFLAGS')
    622             self.assertIn(b'[test]', data)
    623 
    624     @reap_threads
    625     def test_issue5949(self):
    626 
    627         class EOFHandler(socketserver.StreamRequestHandler):
    628             def handle(self):
    629                 # EOF without sending a complete welcome message.
    630                 self.wfile.write(b'* OK')
    631 
    632         with self.reaped_server(EOFHandler) as server:
    633             self.assertRaises(imaplib.IMAP4.abort,
    634                               self.imap_class, *server.server_address)
    635 
    636     @reap_threads
    637     def test_line_termination(self):
    638 
    639         class BadNewlineHandler(SimpleIMAPHandler):
    640 
    641             def cmd_CAPABILITY(self, tag, args):
    642                 self._send(b'* CAPABILITY IMAP4rev1 AUTH\n')
    643                 self._send_tagged(tag, 'OK', 'CAPABILITY completed')
    644 
    645         with self.reaped_server(BadNewlineHandler) as server:
    646             self.assertRaises(imaplib.IMAP4.abort,
    647                               self.imap_class, *server.server_address)
    648 
    649     class UTF8Server(SimpleIMAPHandler):
    650         capabilities = 'AUTH ENABLE UTF8=ACCEPT'
    651 
    652         def cmd_ENABLE(self, tag, args):
    653             self._send_tagged(tag, 'OK', 'ENABLE successful')
    654 
    655         def cmd_AUTHENTICATE(self, tag, args):
    656             self._send_textline('+')
    657             self.server.response = yield
    658             self._send_tagged(tag, 'OK', 'FAKEAUTH successful')
    659 
    660     @reap_threads
    661     def test_enable_raises_error_if_not_AUTH(self):
    662         with self.reaped_pair(self.UTF8Server) as (server, client):
    663             self.assertFalse(client.utf8_enabled)
    664             self.assertRaises(imaplib.IMAP4.error, client.enable, 'foo')
    665             self.assertFalse(client.utf8_enabled)
    666 
    667     # XXX Also need a test that enable after SELECT raises an error.
    668 
    669     @reap_threads
    670     def test_enable_raises_error_if_no_capability(self):
    671         class NoEnableServer(self.UTF8Server):
    672             capabilities = 'AUTH'
    673         with self.reaped_pair(NoEnableServer) as (server, client):
    674             self.assertRaises(imaplib.IMAP4.error, client.enable, 'foo')
    675 
    676     @reap_threads
    677     def test_enable_UTF8_raises_error_if_not_supported(self):
    678         class NonUTF8Server(SimpleIMAPHandler):
    679             pass
    680         with self.assertRaises(imaplib.IMAP4.error):
    681             with self.reaped_pair(NonUTF8Server) as (server, client):
    682                 typ, data = client.login('user', 'pass')
    683                 self.assertEqual(typ, 'OK')
    684                 client.enable('UTF8=ACCEPT')
    685                 pass
    686 
    687     @reap_threads
    688     def test_enable_UTF8_True_append(self):
    689 
    690         class UTF8AppendServer(self.UTF8Server):
    691             def cmd_APPEND(self, tag, args):
    692                 self._send_textline('+')
    693                 self.server.response = yield
    694                 self._send_tagged(tag, 'OK', 'okay')
    695 
    696         with self.reaped_pair(UTF8AppendServer) as (server, client):
    697             self.assertEqual(client._encoding, 'ascii')
    698             code, _ = client.authenticate('MYAUTH', lambda x: b'fake')
    699             self.assertEqual(code, 'OK')
    700             self.assertEqual(server.response,
    701                              b'ZmFrZQ==\r\n')  # b64 encoded 'fake'
    702             code, _ = client.enable('UTF8=ACCEPT')
    703             self.assertEqual(code, 'OK')
    704             self.assertEqual(client._encoding, 'utf-8')
    705             msg_string = 'Subject: '
    706             typ, data = client.append(
    707                 None, None, None, msg_string.encode('utf-8'))
    708             self.assertEqual(typ, 'OK')
    709             self.assertEqual(
    710                 server.response,
    711                 ('UTF8 (%s)\r\n' % msg_string).encode('utf-8')
    712             )
    713 
    714     # XXX also need a test that makes sure that the Literal and Untagged_status
    715     # regexes uses unicode in UTF8 mode instead of the default ASCII.
    716 
    717     @reap_threads
    718     def test_search_disallows_charset_in_utf8_mode(self):
    719         with self.reaped_pair(self.UTF8Server) as (server, client):
    720             typ, _ = client.authenticate('MYAUTH', lambda x: b'fake')
    721             self.assertEqual(typ, 'OK')
    722             typ, _ = client.enable('UTF8=ACCEPT')
    723             self.assertEqual(typ, 'OK')
    724             self.assertTrue(client.utf8_enabled)
    725             self.assertRaises(imaplib.IMAP4.error, client.search, 'foo', 'bar')
    726 
    727     @reap_threads
    728     def test_bad_auth_name(self):
    729 
    730         class MyServer(SimpleIMAPHandler):
    731 
    732             def cmd_AUTHENTICATE(self, tag, args):
    733                 self._send_tagged(tag, 'NO', 'unrecognized authentication '
    734                                   'type {}'.format(args[0]))
    735 
    736         with self.reaped_pair(MyServer) as (server, client):
    737             with self.assertRaises(imaplib.IMAP4.error):
    738                 client.authenticate('METHOD', lambda: 1)
    739 
    740     @reap_threads
    741     def test_invalid_authentication(self):
    742 
    743         class MyServer(SimpleIMAPHandler):
    744 
    745             def cmd_AUTHENTICATE(self, tag, args):
    746                 self._send_textline('+')
    747                 self.response = yield
    748                 self._send_tagged(tag, 'NO', '[AUTHENTICATIONFAILED] invalid')
    749 
    750         with self.reaped_pair(MyServer) as (server, client):
    751             with self.assertRaises(imaplib.IMAP4.error):
    752                 code, data = client.authenticate('MYAUTH', lambda x: b'fake')
    753 
    754     @reap_threads
    755     def test_valid_authentication(self):
    756 
    757         class MyServer(SimpleIMAPHandler):
    758 
    759             def cmd_AUTHENTICATE(self, tag, args):
    760                 self._send_textline('+')
    761                 self.server.response = yield
    762                 self._send_tagged(tag, 'OK', 'FAKEAUTH successful')
    763 
    764         with self.reaped_pair(MyServer) as (server, client):
    765             code, data = client.authenticate('MYAUTH', lambda x: b'fake')
    766             self.assertEqual(code, 'OK')
    767             self.assertEqual(server.response,
    768                              b'ZmFrZQ==\r\n')  # b64 encoded 'fake'
    769 
    770         with self.reaped_pair(MyServer) as (server, client):
    771             code, data = client.authenticate('MYAUTH', lambda x: 'fake')
    772             self.assertEqual(code, 'OK')
    773             self.assertEqual(server.response,
    774                              b'ZmFrZQ==\r\n')  # b64 encoded 'fake'
    775 
    776     @reap_threads
    777     def test_login_cram_md5(self):
    778 
    779         class AuthHandler(SimpleIMAPHandler):
    780 
    781             capabilities = 'LOGINDISABLED AUTH=CRAM-MD5'
    782 
    783             def cmd_AUTHENTICATE(self, tag, args):
    784                 self._send_textline('+ PDE4OTYuNjk3MTcwOTUyQHBvc3RvZmZpY2Uucm'
    785                                     'VzdG9uLm1jaS5uZXQ=')
    786                 r = yield
    787                 if (r == b'dGltIGYxY2E2YmU0NjRiOWVmYT'
    788                          b'FjY2E2ZmZkNmNmMmQ5ZjMy\r\n'):
    789                     self._send_tagged(tag, 'OK', 'CRAM-MD5 successful')
    790                 else:
    791                     self._send_tagged(tag, 'NO', 'No access')
    792 
    793         with self.reaped_pair(AuthHandler) as (server, client):
    794             self.assertTrue('AUTH=CRAM-MD5' in client.capabilities)
    795             ret, data = client.login_cram_md5("tim", "tanstaaftanstaaf")
    796             self.assertEqual(ret, "OK")
    797 
    798         with self.reaped_pair(AuthHandler) as (server, client):
    799             self.assertTrue('AUTH=CRAM-MD5' in client.capabilities)
    800             ret, data = client.login_cram_md5("tim", b"tanstaaftanstaaf")
    801             self.assertEqual(ret, "OK")
    802 
    803 
    804     @reap_threads
    805     def test_aborted_authentication(self):
    806 
    807         class MyServer(SimpleIMAPHandler):
    808 
    809             def cmd_AUTHENTICATE(self, tag, args):
    810                 self._send_textline('+')
    811                 self.response = yield
    812 
    813                 if self.response == b'*\r\n':
    814                     self._send_tagged(tag, 'NO', '[AUTHENTICATIONFAILED] aborted')
    815                 else:
    816                     self._send_tagged(tag, 'OK', 'MYAUTH successful')
    817 
    818         with self.reaped_pair(MyServer) as (server, client):
    819             with self.assertRaises(imaplib.IMAP4.error):
    820                 code, data = client.authenticate('MYAUTH', lambda x: None)
    821 
    822 
    823     def test_linetoolong(self):
    824         class TooLongHandler(SimpleIMAPHandler):
    825             def handle(self):
    826                 # Send a very long response line
    827                 self.wfile.write(b'* OK ' + imaplib._MAXLINE * b'x' + b'\r\n')
    828 
    829         with self.reaped_server(TooLongHandler) as server:
    830             self.assertRaises(imaplib.IMAP4.error,
    831                               self.imap_class, *server.server_address)
    832 
    833     @reap_threads
    834     def test_simple_with_statement(self):
    835         # simplest call
    836         with self.reaped_server(SimpleIMAPHandler) as server:
    837             with self.imap_class(*server.server_address):
    838                 pass
    839 
    840     @reap_threads
    841     def test_with_statement(self):
    842         with self.reaped_server(SimpleIMAPHandler) as server:
    843             with self.imap_class(*server.server_address) as imap:
    844                 imap.login('user', 'pass')
    845                 self.assertEqual(server.logged, 'user')
    846             self.assertIsNone(server.logged)
    847 
    848     @reap_threads
    849     def test_with_statement_logout(self):
    850         # what happens if already logout in the block?
    851         with self.reaped_server(SimpleIMAPHandler) as server:
    852             with self.imap_class(*server.server_address) as imap:
    853                 imap.login('user', 'pass')
    854                 self.assertEqual(server.logged, 'user')
    855                 imap.logout()
    856                 self.assertIsNone(server.logged)
    857             self.assertIsNone(server.logged)
    858 
    859 
    860 @unittest.skipUnless(ssl, "SSL not available")
    861 class ThreadedNetworkedTestsSSL(ThreadedNetworkedTests):
    862     server_class = SecureTCPServer
    863     imap_class = IMAP4_SSL
    864 
    865     @reap_threads
    866     def test_ssl_verified(self):
    867         ssl_context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
    868         ssl_context.verify_mode = ssl.CERT_REQUIRED
    869         ssl_context.check_hostname = True
    870         ssl_context.load_verify_locations(CAFILE)
    871 
    872         with self.assertRaisesRegex(
    873                 ssl.CertificateError,
    874                 "hostname '127.0.0.1' doesn't match 'localhost'"):
    875             with self.reaped_server(SimpleIMAPHandler) as server:
    876                 client = self.imap_class(*server.server_address,
    877                                          ssl_context=ssl_context)
    878                 client.shutdown()
    879 
    880         with self.reaped_server(SimpleIMAPHandler) as server:
    881             client = self.imap_class("localhost", server.server_address[1],
    882                                      ssl_context=ssl_context)
    883             client.shutdown()
    884 
    885 
    886 @unittest.skipUnless(
    887     support.is_resource_enabled('network'), 'network resource disabled')
    888 class RemoteIMAPTest(unittest.TestCase):
    889     host = 'cyrus.andrew.cmu.edu'
    890     port = 143
    891     username = 'anonymous'
    892     password = 'pass'
    893     imap_class = imaplib.IMAP4
    894 
    895     def setUp(self):
    896         with transient_internet(self.host):
    897             self.server = self.imap_class(self.host, self.port)
    898 
    899     def tearDown(self):
    900         if self.server is not None:
    901             with transient_internet(self.host):
    902                 self.server.logout()
    903 
    904     def test_logincapa(self):
    905         with transient_internet(self.host):
    906             for cap in self.server.capabilities:
    907                 self.assertIsInstance(cap, str)
    908             self.assertIn('LOGINDISABLED', self.server.capabilities)
    909             self.assertIn('AUTH=ANONYMOUS', self.server.capabilities)
    910             rs = self.server.login(self.username, self.password)
    911             self.assertEqual(rs[0], 'OK')
    912 
    913     def test_logout(self):
    914         with transient_internet(self.host):
    915             rs = self.server.logout()
    916             self.server = None
    917             self.assertEqual(rs[0], 'BYE')
    918 
    919 
    920 @unittest.skipUnless(ssl, "SSL not available")
    921 @unittest.skipUnless(
    922     support.is_resource_enabled('network'), 'network resource disabled')
    923 class RemoteIMAP_STARTTLSTest(RemoteIMAPTest):
    924 
    925     def setUp(self):
    926         super().setUp()
    927         with transient_internet(self.host):
    928             rs = self.server.starttls()
    929             self.assertEqual(rs[0], 'OK')
    930 
    931     def test_logincapa(self):
    932         for cap in self.server.capabilities:
    933             self.assertIsInstance(cap, str)
    934         self.assertNotIn('LOGINDISABLED', self.server.capabilities)
    935 
    936 
    937 @unittest.skipUnless(ssl, "SSL not available")
    938 class RemoteIMAP_SSLTest(RemoteIMAPTest):
    939     port = 993
    940     imap_class = IMAP4_SSL
    941 
    942     def setUp(self):
    943         pass
    944 
    945     def tearDown(self):
    946         pass
    947 
    948     def create_ssl_context(self):
    949         ssl_context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
    950         ssl_context.load_cert_chain(CERTFILE)
    951         return ssl_context
    952 
    953     def check_logincapa(self, server):
    954         try:
    955             for cap in server.capabilities:
    956                 self.assertIsInstance(cap, str)
    957             self.assertNotIn('LOGINDISABLED', server.capabilities)
    958             self.assertIn('AUTH=PLAIN', server.capabilities)
    959             rs = server.login(self.username, self.password)
    960             self.assertEqual(rs[0], 'OK')
    961         finally:
    962             server.logout()
    963 
    964     def test_logincapa(self):
    965         with transient_internet(self.host):
    966             _server = self.imap_class(self.host, self.port)
    967             self.check_logincapa(_server)
    968 
    969     def test_logincapa_with_client_certfile(self):
    970         with transient_internet(self.host):
    971             with support.check_warnings(('', DeprecationWarning)):
    972                 _server = self.imap_class(self.host, self.port,
    973                                           certfile=CERTFILE)
    974                 self.check_logincapa(_server)
    975 
    976     def test_logincapa_with_client_ssl_context(self):
    977         with transient_internet(self.host):
    978             _server = self.imap_class(
    979                 self.host, self.port, ssl_context=self.create_ssl_context())
    980             self.check_logincapa(_server)
    981 
    982     def test_logout(self):
    983         with transient_internet(self.host):
    984             _server = self.imap_class(self.host, self.port)
    985             rs = _server.logout()
    986             self.assertEqual(rs[0], 'BYE')
    987 
    988     def test_ssl_context_certfile_exclusive(self):
    989         with transient_internet(self.host):
    990             self.assertRaises(
    991                 ValueError, self.imap_class, self.host, self.port,
    992                 certfile=CERTFILE, ssl_context=self.create_ssl_context())
    993 
    994     def test_ssl_context_keyfile_exclusive(self):
    995         with transient_internet(self.host):
    996             self.assertRaises(
    997                 ValueError, self.imap_class, self.host, self.port,
    998                 keyfile=CERTFILE, ssl_context=self.create_ssl_context())
    999 
   1000 
   1001 if __name__ == "__main__":
   1002     unittest.main()
   1003