Home | History | Annotate | Download | only in test
      1 from test import test_support as support
      2 # If we end up with a significant number of tests that don't require
      3 # threading, this test module should be split.  Right now we skip
      4 # them all if we don't have threading.
      5 threading = support.import_module('threading')
      6 
      7 from contextlib import contextmanager
      8 import imaplib
      9 import os.path
     10 import SocketServer
     11 import time
     12 
     13 from test_support import reap_threads, verbose, transient_internet
     14 import unittest
     15 
     16 try:
     17     import ssl
     18 except ImportError:
     19     ssl = None
     20 
     21 CERTFILE = None
     22 
     23 
     24 class TestImaplib(unittest.TestCase):
     25 
     26     def test_that_Time2Internaldate_returns_a_result(self):
     27         # We can check only that it successfully produces a result,
     28         # not the correctness of the result itself, since the result
     29         # depends on the timezone the machine is in.
     30         timevalues = [2000000000, 2000000000.0, time.localtime(2000000000),
     31                       '"18-May-2033 05:33:20 +0200"']
     32 
     33         for t in timevalues:
     34             imaplib.Time2Internaldate(t)
     35 
     36 
     37 if ssl:
     38 
     39     class SecureTCPServer(SocketServer.TCPServer):
     40 
     41         def get_request(self):
     42             newsocket, fromaddr = self.socket.accept()
     43             connstream = ssl.wrap_socket(newsocket,
     44                                          server_side=True,
     45                                          certfile=CERTFILE)
     46             return connstream, fromaddr
     47 
     48     IMAP4_SSL = imaplib.IMAP4_SSL
     49 
     50 else:
     51 
     52     class SecureTCPServer:
     53         pass
     54 
     55     IMAP4_SSL = None
     56 
     57 
     58 class SimpleIMAPHandler(SocketServer.StreamRequestHandler):
     59 
     60     timeout = 1
     61 
     62     def _send(self, message):
     63         if verbose: print "SENT:", message.strip()
     64         self.wfile.write(message)
     65 
     66     def handle(self):
     67         # Send a welcome message.
     68         self._send('* OK IMAP4rev1\r\n')
     69         while 1:
     70             # Gather up input until we receive a line terminator or we timeout.
     71             # Accumulate read(1) because it's simpler to handle the differences
     72             # between naked sockets and SSL sockets.
     73             line = ''
     74             while 1:
     75                 try:
     76                     part = self.rfile.read(1)
     77                     if part == '':
     78                         # Naked sockets return empty strings..
     79                         return
     80                     line += part
     81                 except IOError:
     82                     # ..but SSLSockets raise exceptions.
     83                     return
     84                 if line.endswith('\r\n'):
     85                     break
     86 
     87             if verbose: print 'GOT:', line.strip()
     88             splitline = line.split()
     89             tag = splitline[0]
     90             cmd = splitline[1]
     91             args = splitline[2:]
     92 
     93             if hasattr(self, 'cmd_%s' % (cmd,)):
     94                 getattr(self, 'cmd_%s' % (cmd,))(tag, args)
     95             else:
     96                 self._send('%s BAD %s unknown\r\n' % (tag, cmd))
     97 
     98     def cmd_CAPABILITY(self, tag, args):
     99         self._send('* CAPABILITY IMAP4rev1\r\n')
    100         self._send('%s OK CAPABILITY completed\r\n' % (tag,))
    101 
    102 
    103 class BaseThreadedNetworkedTests(unittest.TestCase):
    104 
    105     def make_server(self, addr, hdlr):
    106 
    107         class MyServer(self.server_class):
    108             def handle_error(self, request, client_address):
    109                 self.close_request(request)
    110                 self.server_close()
    111                 raise
    112 
    113         if verbose: print "creating server"
    114         server = MyServer(addr, hdlr)
    115         self.assertEqual(server.server_address, server.socket.getsockname())
    116 
    117         if verbose:
    118             print "server created"
    119             print "ADDR =", addr
    120             print "CLASS =", self.server_class
    121             print "HDLR =", server.RequestHandlerClass
    122 
    123         t = threading.Thread(
    124             name='%s serving' % self.server_class,
    125             target=server.serve_forever,
    126             # Short poll interval to make the test finish quickly.
    127             # Time between requests is short enough that we won't wake
    128             # up spuriously too many times.
    129             kwargs={'poll_interval':0.01})
    130         t.daemon = True  # In case this function raises.
    131         t.start()
    132         if verbose: print "server running"
    133         return server, t
    134 
    135     def reap_server(self, server, thread):
    136         if verbose: print "waiting for server"
    137         server.shutdown()
    138         thread.join()
    139         if verbose: print "done"
    140 
    141     @contextmanager
    142     def reaped_server(self, hdlr):
    143         server, thread = self.make_server((support.HOST, 0), hdlr)
    144         try:
    145             yield server
    146         finally:
    147             self.reap_server(server, thread)
    148 
    149     @reap_threads
    150     def test_connect(self):
    151         with self.reaped_server(SimpleIMAPHandler) as server:
    152             client = self.imap_class(*server.server_address)
    153             client.shutdown()
    154 
    155     @reap_threads
    156     def test_issue5949(self):
    157 
    158         class EOFHandler(SocketServer.StreamRequestHandler):
    159             def handle(self):
    160                 # EOF without sending a complete welcome message.
    161                 self.wfile.write('* OK')
    162 
    163         with self.reaped_server(EOFHandler) as server:
    164             self.assertRaises(imaplib.IMAP4.abort,
    165                               self.imap_class, *server.server_address)
    166 
    167 
    168     def test_linetoolong(self):
    169         class TooLongHandler(SimpleIMAPHandler):
    170             def handle(self):
    171                 # Send a very long response line
    172                 self.wfile.write('* OK ' + imaplib._MAXLINE*'x' + '\r\n')
    173 
    174         with self.reaped_server(TooLongHandler) as server:
    175             self.assertRaises(imaplib.IMAP4.error,
    176                               self.imap_class, *server.server_address)
    177 
    178 class ThreadedNetworkedTests(BaseThreadedNetworkedTests):
    179 
    180     server_class = SocketServer.TCPServer
    181     imap_class = imaplib.IMAP4
    182 
    183 
    184 @unittest.skipUnless(ssl, "SSL not available")
    185 class ThreadedNetworkedTestsSSL(BaseThreadedNetworkedTests):
    186 
    187     server_class = SecureTCPServer
    188     imap_class = IMAP4_SSL
    189 
    190     def test_linetoolong(self):
    191         raise unittest.SkipTest("test is not reliable on 2.7; see issue 20118")
    192 
    193 
    194 class RemoteIMAPTest(unittest.TestCase):
    195     host = 'cyrus.andrew.cmu.edu'
    196     port = 143
    197     username = 'anonymous'
    198     password = 'pass'
    199     imap_class = imaplib.IMAP4
    200 
    201     def setUp(self):
    202         with transient_internet(self.host):
    203             self.server = self.imap_class(self.host, self.port)
    204 
    205     def tearDown(self):
    206         if self.server is not None:
    207             self.server.logout()
    208 
    209     def test_logincapa(self):
    210         self.assertTrue('LOGINDISABLED' in self.server.capabilities)
    211 
    212     def test_anonlogin(self):
    213         self.assertTrue('AUTH=ANONYMOUS' in self.server.capabilities)
    214         rs = self.server.login(self.username, self.password)
    215         self.assertEqual(rs[0], 'OK')
    216 
    217     def test_logout(self):
    218         rs = self.server.logout()
    219         self.server = None
    220         self.assertEqual(rs[0], 'BYE')
    221 
    222 
    223 @unittest.skipUnless(ssl, "SSL not available")
    224 class RemoteIMAP_SSLTest(RemoteIMAPTest):
    225     port = 993
    226     imap_class = IMAP4_SSL
    227 
    228     def test_logincapa(self):
    229         self.assertFalse('LOGINDISABLED' in self.server.capabilities)
    230         self.assertTrue('AUTH=PLAIN' in self.server.capabilities)
    231 
    232 
    233 def test_main():
    234     tests = [TestImaplib]
    235 
    236     if support.is_resource_enabled('network'):
    237         if ssl:
    238             global CERTFILE
    239             CERTFILE = os.path.join(os.path.dirname(__file__) or os.curdir,
    240                                     "keycert.pem")
    241             if not os.path.exists(CERTFILE):
    242                 raise support.TestFailed("Can't read certificate files!")
    243         tests.extend([
    244             ThreadedNetworkedTests, ThreadedNetworkedTestsSSL,
    245             RemoteIMAPTest, RemoteIMAP_SSLTest,
    246         ])
    247 
    248     support.run_unittest(*tests)
    249 
    250 
    251 if __name__ == "__main__":
    252     test_main()
    253