1 from test import test_support as support 2 # If we end up with a significant number of tests that don't require 3 # threading, this test module should be split. Right now we skip 4 # them all if we don't have threading. 5 threading = support.import_module('threading') 6 7 from contextlib import contextmanager 8 import imaplib 9 import os.path 10 import SocketServer 11 import time 12 13 from test_support import reap_threads, verbose, transient_internet 14 import unittest 15 16 try: 17 import ssl 18 except ImportError: 19 ssl = None 20 21 CERTFILE = None 22 23 24 class TestImaplib(unittest.TestCase): 25 26 def test_that_Time2Internaldate_returns_a_result(self): 27 # We can check only that it successfully produces a result, 28 # not the correctness of the result itself, since the result 29 # depends on the timezone the machine is in. 30 timevalues = [2000000000, 2000000000.0, time.localtime(2000000000), 31 '"18-May-2033 05:33:20 +0200"'] 32 33 for t in timevalues: 34 imaplib.Time2Internaldate(t) 35 36 37 if ssl: 38 39 class SecureTCPServer(SocketServer.TCPServer): 40 41 def get_request(self): 42 newsocket, fromaddr = self.socket.accept() 43 connstream = ssl.wrap_socket(newsocket, 44 server_side=True, 45 certfile=CERTFILE) 46 return connstream, fromaddr 47 48 IMAP4_SSL = imaplib.IMAP4_SSL 49 50 else: 51 52 class SecureTCPServer: 53 pass 54 55 IMAP4_SSL = None 56 57 58 class SimpleIMAPHandler(SocketServer.StreamRequestHandler): 59 60 timeout = 1 61 62 def _send(self, message): 63 if verbose: print "SENT:", message.strip() 64 self.wfile.write(message) 65 66 def handle(self): 67 # Send a welcome message. 68 self._send('* OK IMAP4rev1\r\n') 69 while 1: 70 # Gather up input until we receive a line terminator or we timeout. 71 # Accumulate read(1) because it's simpler to handle the differences 72 # between naked sockets and SSL sockets. 73 line = '' 74 while 1: 75 try: 76 part = self.rfile.read(1) 77 if part == '': 78 # Naked sockets return empty strings.. 79 return 80 line += part 81 except IOError: 82 # ..but SSLSockets raise exceptions. 83 return 84 if line.endswith('\r\n'): 85 break 86 87 if verbose: print 'GOT:', line.strip() 88 splitline = line.split() 89 tag = splitline[0] 90 cmd = splitline[1] 91 args = splitline[2:] 92 93 if hasattr(self, 'cmd_%s' % (cmd,)): 94 getattr(self, 'cmd_%s' % (cmd,))(tag, args) 95 else: 96 self._send('%s BAD %s unknown\r\n' % (tag, cmd)) 97 98 def cmd_CAPABILITY(self, tag, args): 99 self._send('* CAPABILITY IMAP4rev1\r\n') 100 self._send('%s OK CAPABILITY completed\r\n' % (tag,)) 101 102 103 class BaseThreadedNetworkedTests(unittest.TestCase): 104 105 def make_server(self, addr, hdlr): 106 107 class MyServer(self.server_class): 108 def handle_error(self, request, client_address): 109 self.close_request(request) 110 self.server_close() 111 raise 112 113 if verbose: print "creating server" 114 server = MyServer(addr, hdlr) 115 self.assertEqual(server.server_address, server.socket.getsockname()) 116 117 if verbose: 118 print "server created" 119 print "ADDR =", addr 120 print "CLASS =", self.server_class 121 print "HDLR =", server.RequestHandlerClass 122 123 t = threading.Thread( 124 name='%s serving' % self.server_class, 125 target=server.serve_forever, 126 # Short poll interval to make the test finish quickly. 127 # Time between requests is short enough that we won't wake 128 # up spuriously too many times. 129 kwargs={'poll_interval':0.01}) 130 t.daemon = True # In case this function raises. 131 t.start() 132 if verbose: print "server running" 133 return server, t 134 135 def reap_server(self, server, thread): 136 if verbose: print "waiting for server" 137 server.shutdown() 138 thread.join() 139 if verbose: print "done" 140 141 @contextmanager 142 def reaped_server(self, hdlr): 143 server, thread = self.make_server((support.HOST, 0), hdlr) 144 try: 145 yield server 146 finally: 147 self.reap_server(server, thread) 148 149 @reap_threads 150 def test_connect(self): 151 with self.reaped_server(SimpleIMAPHandler) as server: 152 client = self.imap_class(*server.server_address) 153 client.shutdown() 154 155 @reap_threads 156 def test_issue5949(self): 157 158 class EOFHandler(SocketServer.StreamRequestHandler): 159 def handle(self): 160 # EOF without sending a complete welcome message. 161 self.wfile.write('* OK') 162 163 with self.reaped_server(EOFHandler) as server: 164 self.assertRaises(imaplib.IMAP4.abort, 165 self.imap_class, *server.server_address) 166 167 168 class ThreadedNetworkedTests(BaseThreadedNetworkedTests): 169 170 server_class = SocketServer.TCPServer 171 imap_class = imaplib.IMAP4 172 173 174 @unittest.skipUnless(ssl, "SSL not available") 175 class ThreadedNetworkedTestsSSL(BaseThreadedNetworkedTests): 176 177 server_class = SecureTCPServer 178 imap_class = IMAP4_SSL 179 180 181 class RemoteIMAPTest(unittest.TestCase): 182 host = 'cyrus.andrew.cmu.edu' 183 port = 143 184 username = 'anonymous' 185 password = 'pass' 186 imap_class = imaplib.IMAP4 187 188 def setUp(self): 189 with transient_internet(self.host): 190 self.server = self.imap_class(self.host, self.port) 191 192 def tearDown(self): 193 if self.server is not None: 194 self.server.logout() 195 196 def test_logincapa(self): 197 self.assertTrue('LOGINDISABLED' in self.server.capabilities) 198 199 def test_anonlogin(self): 200 self.assertTrue('AUTH=ANONYMOUS' in self.server.capabilities) 201 rs = self.server.login(self.username, self.password) 202 self.assertEqual(rs[0], 'OK') 203 204 def test_logout(self): 205 rs = self.server.logout() 206 self.server = None 207 self.assertEqual(rs[0], 'BYE') 208 209 210 @unittest.skipUnless(ssl, "SSL not available") 211 class RemoteIMAP_SSLTest(RemoteIMAPTest): 212 port = 993 213 imap_class = IMAP4_SSL 214 215 def test_logincapa(self): 216 self.assertFalse('LOGINDISABLED' in self.server.capabilities) 217 self.assertTrue('AUTH=PLAIN' in self.server.capabilities) 218 219 220 def test_main(): 221 tests = [TestImaplib] 222 223 if support.is_resource_enabled('network'): 224 if ssl: 225 global CERTFILE 226 CERTFILE = os.path.join(os.path.dirname(__file__) or os.curdir, 227 "keycert.pem") 228 if not os.path.exists(CERTFILE): 229 raise support.TestFailed("Can't read certificate files!") 230 tests.extend([ 231 ThreadedNetworkedTests, ThreadedNetworkedTestsSSL, 232 RemoteIMAPTest, RemoteIMAP_SSLTest, 233 ]) 234 235 support.run_unittest(*tests) 236 237 238 if __name__ == "__main__": 239 support.use_resources = ['network'] 240 test_main() 241