Home | History | Annotate | Download | only in test
      1 from test import test_support as support
      2 # If we end up with a significant number of tests that don't require
      3 # threading, this test module should be split.  Right now we skip
      4 # them all if we don't have threading.
      5 threading = support.import_module('threading')
      6 
      7 from contextlib import contextmanager
      8 import imaplib
      9 import os.path
     10 import SocketServer
     11 import time
     12 
     13 from test_support import reap_threads, verbose, transient_internet
     14 import unittest
     15 
     16 try:
     17     import ssl
     18 except ImportError:
     19     ssl = None
     20 
     21 CERTFILE = None
     22 
     23 
     24 class TestImaplib(unittest.TestCase):
     25 
     26     def test_that_Time2Internaldate_returns_a_result(self):
     27         # We can check only that it successfully produces a result,
     28         # not the correctness of the result itself, since the result
     29         # depends on the timezone the machine is in.
     30         timevalues = [2000000000, 2000000000.0, time.localtime(2000000000),
     31                       '"18-May-2033 05:33:20 +0200"']
     32 
     33         for t in timevalues:
     34             imaplib.Time2Internaldate(t)
     35 
     36 
     37 if ssl:
     38 
     39     class SecureTCPServer(SocketServer.TCPServer):
     40 
     41         def get_request(self):
     42             newsocket, fromaddr = self.socket.accept()
     43             connstream = ssl.wrap_socket(newsocket,
     44                                          server_side=True,
     45                                          certfile=CERTFILE)
     46             return connstream, fromaddr
     47 
     48     IMAP4_SSL = imaplib.IMAP4_SSL
     49 
     50 else:
     51 
     52     class SecureTCPServer:
     53         pass
     54 
     55     IMAP4_SSL = None
     56 
     57 
     58 class SimpleIMAPHandler(SocketServer.StreamRequestHandler):
     59 
     60     timeout = 1
     61 
     62     def _send(self, message):
     63         if verbose: print "SENT:", message.strip()
     64         self.wfile.write(message)
     65 
     66     def handle(self):
     67         # Send a welcome message.
     68         self._send('* OK IMAP4rev1\r\n')
     69         while 1:
     70             # Gather up input until we receive a line terminator or we timeout.
     71             # Accumulate read(1) because it's simpler to handle the differences
     72             # between naked sockets and SSL sockets.
     73             line = ''
     74             while 1:
     75                 try:
     76                     part = self.rfile.read(1)
     77                     if part == '':
     78                         # Naked sockets return empty strings..
     79                         return
     80                     line += part
     81                 except IOError:
     82                     # ..but SSLSockets raise exceptions.
     83                     return
     84                 if line.endswith('\r\n'):
     85                     break
     86 
     87             if verbose: print 'GOT:', line.strip()
     88             splitline = line.split()
     89             tag = splitline[0]
     90             cmd = splitline[1]
     91             args = splitline[2:]
     92 
     93             if hasattr(self, 'cmd_%s' % (cmd,)):
     94                 getattr(self, 'cmd_%s' % (cmd,))(tag, args)
     95             else:
     96                 self._send('%s BAD %s unknown\r\n' % (tag, cmd))
     97 
     98     def cmd_CAPABILITY(self, tag, args):
     99         self._send('* CAPABILITY IMAP4rev1\r\n')
    100         self._send('%s OK CAPABILITY completed\r\n' % (tag,))
    101 
    102 
    103 class BaseThreadedNetworkedTests(unittest.TestCase):
    104 
    105     def make_server(self, addr, hdlr):
    106 
    107         class MyServer(self.server_class):
    108             def handle_error(self, request, client_address):
    109                 self.close_request(request)
    110                 self.server_close()
    111                 raise
    112 
    113         if verbose: print "creating server"
    114         server = MyServer(addr, hdlr)
    115         self.assertEqual(server.server_address, server.socket.getsockname())
    116 
    117         if verbose:
    118             print "server created"
    119             print "ADDR =", addr
    120             print "CLASS =", self.server_class
    121             print "HDLR =", server.RequestHandlerClass
    122 
    123         t = threading.Thread(
    124             name='%s serving' % self.server_class,
    125             target=server.serve_forever,
    126             # Short poll interval to make the test finish quickly.
    127             # Time between requests is short enough that we won't wake
    128             # up spuriously too many times.
    129             kwargs={'poll_interval':0.01})
    130         t.daemon = True  # In case this function raises.
    131         t.start()
    132         if verbose: print "server running"
    133         return server, t
    134 
    135     def reap_server(self, server, thread):
    136         if verbose: print "waiting for server"
    137         server.shutdown()
    138         thread.join()
    139         if verbose: print "done"
    140 
    141     @contextmanager
    142     def reaped_server(self, hdlr):
    143         server, thread = self.make_server((support.HOST, 0), hdlr)
    144         try:
    145             yield server
    146         finally:
    147             self.reap_server(server, thread)
    148 
    149     @reap_threads
    150     def test_connect(self):
    151         with self.reaped_server(SimpleIMAPHandler) as server:
    152             client = self.imap_class(*server.server_address)
    153             client.shutdown()
    154 
    155     @reap_threads
    156     def test_issue5949(self):
    157 
    158         class EOFHandler(SocketServer.StreamRequestHandler):
    159             def handle(self):
    160                 # EOF without sending a complete welcome message.
    161                 self.wfile.write('* OK')
    162 
    163         with self.reaped_server(EOFHandler) as server:
    164             self.assertRaises(imaplib.IMAP4.abort,
    165                               self.imap_class, *server.server_address)
    166 
    167 
    168 class ThreadedNetworkedTests(BaseThreadedNetworkedTests):
    169 
    170     server_class = SocketServer.TCPServer
    171     imap_class = imaplib.IMAP4
    172 
    173 
    174 @unittest.skipUnless(ssl, "SSL not available")
    175 class ThreadedNetworkedTestsSSL(BaseThreadedNetworkedTests):
    176 
    177     server_class = SecureTCPServer
    178     imap_class = IMAP4_SSL
    179 
    180 
    181 class RemoteIMAPTest(unittest.TestCase):
    182     host = 'cyrus.andrew.cmu.edu'
    183     port = 143
    184     username = 'anonymous'
    185     password = 'pass'
    186     imap_class = imaplib.IMAP4
    187 
    188     def setUp(self):
    189         with transient_internet(self.host):
    190             self.server = self.imap_class(self.host, self.port)
    191 
    192     def tearDown(self):
    193         if self.server is not None:
    194             self.server.logout()
    195 
    196     def test_logincapa(self):
    197         self.assertTrue('LOGINDISABLED' in self.server.capabilities)
    198 
    199     def test_anonlogin(self):
    200         self.assertTrue('AUTH=ANONYMOUS' in self.server.capabilities)
    201         rs = self.server.login(self.username, self.password)
    202         self.assertEqual(rs[0], 'OK')
    203 
    204     def test_logout(self):
    205         rs = self.server.logout()
    206         self.server = None
    207         self.assertEqual(rs[0], 'BYE')
    208 
    209 
    210 @unittest.skipUnless(ssl, "SSL not available")
    211 class RemoteIMAP_SSLTest(RemoteIMAPTest):
    212     port = 993
    213     imap_class = IMAP4_SSL
    214 
    215     def test_logincapa(self):
    216         self.assertFalse('LOGINDISABLED' in self.server.capabilities)
    217         self.assertTrue('AUTH=PLAIN' in self.server.capabilities)
    218 
    219 
    220 def test_main():
    221     tests = [TestImaplib]
    222 
    223     if support.is_resource_enabled('network'):
    224         if ssl:
    225             global CERTFILE
    226             CERTFILE = os.path.join(os.path.dirname(__file__) or os.curdir,
    227                                     "keycert.pem")
    228             if not os.path.exists(CERTFILE):
    229                 raise support.TestFailed("Can't read certificate files!")
    230         tests.extend([
    231             ThreadedNetworkedTests, ThreadedNetworkedTestsSSL,
    232             RemoteIMAPTest, RemoteIMAP_SSLTest,
    233         ])
    234 
    235     support.run_unittest(*tests)
    236 
    237 
    238 if __name__ == "__main__":
    239     support.use_resources = ['network']
    240     test_main()
    241