Home | History | Annotate | Download | only in test
      1 import asyncore
      2 import unittest
      3 import select
      4 import os
      5 import socket
      6 import sys
      7 import time
      8 import warnings
      9 import errno
     10 
     11 from test import test_support
     12 from test.test_support import TESTFN, run_unittest, unlink
     13 from StringIO import StringIO
     14 
     15 try:
     16     import threading
     17 except ImportError:
     18     threading = None
     19 
# Host name the socket-based tests in this module bind to and connect to.
HOST = test_support.HOST
     21 
     22 class dummysocket:
     23     def __init__(self):
     24         self.closed = False
     25 
     26     def close(self):
     27         self.closed = True
     28 
     29     def fileno(self):
     30         return 42
     31 
     32 class dummychannel:
     33     def __init__(self):
     34         self.socket = dummysocket()
     35 
     36     def close(self):
     37         self.socket.close()
     38 
     39 class exitingdummy:
     40     def __init__(self):
     41         pass
     42 
     43     def handle_read_event(self):
     44         raise asyncore.ExitNow()
     45 
     46     handle_write_event = handle_read_event
     47     handle_close = handle_read_event
     48     handle_expt_event = handle_read_event
     49 
     50 class crashingdummy:
     51     def __init__(self):
     52         self.error_handled = False
     53 
     54     def handle_read_event(self):
     55         raise Exception()
     56 
     57     handle_write_event = handle_read_event
     58     handle_close = handle_read_event
     59     handle_expt_event = handle_read_event
     60 
     61     def handle_error(self):
     62         self.error_handled = True
     63 
     64 # used when testing senders; just collects what it gets until newline is sent

     65 def capture_server(evt, buf, serv):
     66     try:
     67         serv.listen(5)
     68         conn, addr = serv.accept()
     69     except socket.timeout:
     70         pass
     71     else:
     72         n = 200
     73         while n > 0:
     74             r, w, e = select.select([conn], [], [])
     75             if r:
     76                 data = conn.recv(10)
     77                 # keep everything except for the newline terminator

     78                 buf.write(data.replace('\n', ''))
     79                 if '\n' in data:
     80                     break
     81             n -= 1
     82             time.sleep(0.01)
     83 
     84         conn.close()
     85     finally:
     86         serv.close()
     87         evt.set()
     88 
     89 
     90 class HelperFunctionTests(unittest.TestCase):
     91     def test_readwriteexc(self):
     92         # Check exception handling behavior of read, write and _exception

     93 
     94         # check that ExitNow exceptions in the object handler method

     95         # bubbles all the way up through asyncore read/write/_exception calls

     96         tr1 = exitingdummy()
     97         self.assertRaises(asyncore.ExitNow, asyncore.read, tr1)
     98         self.assertRaises(asyncore.ExitNow, asyncore.write, tr1)
     99         self.assertRaises(asyncore.ExitNow, asyncore._exception, tr1)
    100 
    101         # check that an exception other than ExitNow in the object handler

    102         # method causes the handle_error method to get called

    103         tr2 = crashingdummy()
    104         asyncore.read(tr2)
    105         self.assertEqual(tr2.error_handled, True)
    106 
    107         tr2 = crashingdummy()
    108         asyncore.write(tr2)
    109         self.assertEqual(tr2.error_handled, True)
    110 
    111         tr2 = crashingdummy()
    112         asyncore._exception(tr2)
    113         self.assertEqual(tr2.error_handled, True)
    114 
    115     # asyncore.readwrite uses constants in the select module that

    116     # are not present in Windows systems (see this thread:

    117     # http://mail.python.org/pipermail/python-list/2001-October/109973.html)

    118     # These constants should be present as long as poll is available

    119 
    120     @unittest.skipUnless(hasattr(select, 'poll'), 'select.poll required')
    121     def test_readwrite(self):
    122         # Check that correct methods are called by readwrite()

    123 
    124         attributes = ('read', 'expt', 'write', 'closed', 'error_handled')
    125 
    126         expected = (
    127             (select.POLLIN, 'read'),
    128             (select.POLLPRI, 'expt'),
    129             (select.POLLOUT, 'write'),
    130             (select.POLLERR, 'closed'),
    131             (select.POLLHUP, 'closed'),
    132             (select.POLLNVAL, 'closed'),
    133             )
    134 
    135         class testobj:
    136             def __init__(self):
    137                 self.read = False
    138                 self.write = False
    139                 self.closed = False
    140                 self.expt = False
    141                 self.error_handled = False
    142 
    143             def handle_read_event(self):
    144                 self.read = True
    145 
    146             def handle_write_event(self):
    147                 self.write = True
    148 
    149             def handle_close(self):
    150                 self.closed = True
    151 
    152             def handle_expt_event(self):
    153                 self.expt = True
    154 
    155             def handle_error(self):
    156                 self.error_handled = True
    157 
    158         for flag, expectedattr in expected:
    159             tobj = testobj()
    160             self.assertEqual(getattr(tobj, expectedattr), False)
    161             asyncore.readwrite(tobj, flag)
    162 
    163             # Only the attribute modified by the routine we expect to be

    164             # called should be True.

    165             for attr in attributes:
    166                 self.assertEqual(getattr(tobj, attr), attr==expectedattr)
    167 
    168             # check that ExitNow exceptions in the object handler method

    169             # bubbles all the way up through asyncore readwrite call

    170             tr1 = exitingdummy()
    171             self.assertRaises(asyncore.ExitNow, asyncore.readwrite, tr1, flag)
    172 
    173             # check that an exception other than ExitNow in the object handler

    174             # method causes the handle_error method to get called

    175             tr2 = crashingdummy()
    176             self.assertEqual(tr2.error_handled, False)
    177             asyncore.readwrite(tr2, flag)
    178             self.assertEqual(tr2.error_handled, True)
    179 
    180     def test_closeall(self):
    181         self.closeall_check(False)
    182 
    183     def test_closeall_default(self):
    184         self.closeall_check(True)
    185 
    186     def closeall_check(self, usedefault):
    187         # Check that close_all() closes everything in a given map

    188 
    189         l = []
    190         testmap = {}
    191         for i in range(10):
    192             c = dummychannel()
    193             l.append(c)
    194             self.assertEqual(c.socket.closed, False)
    195             testmap[i] = c
    196 
    197         if usedefault:
    198             socketmap = asyncore.socket_map
    199             try:
    200                 asyncore.socket_map = testmap
    201                 asyncore.close_all()
    202             finally:
    203                 testmap, asyncore.socket_map = asyncore.socket_map, socketmap
    204         else:
    205             asyncore.close_all(testmap)
    206 
    207         self.assertEqual(len(testmap), 0)
    208 
    209         for c in l:
    210             self.assertEqual(c.socket.closed, True)
    211 
    212     def test_compact_traceback(self):
    213         try:
    214             raise Exception("I don't like spam!")
    215         except:
    216             real_t, real_v, real_tb = sys.exc_info()
    217             r = asyncore.compact_traceback()
    218         else:
    219             self.fail("Expected exception")
    220 
    221         (f, function, line), t, v, info = r
    222         self.assertEqual(os.path.split(f)[-1], 'test_asyncore.py')
    223         self.assertEqual(function, 'test_compact_traceback')
    224         self.assertEqual(t, real_t)
    225         self.assertEqual(v, real_v)
    226         self.assertEqual(info, '[%s|%s|%s]' % (f, function, line))
    227 
    228 
    229 class DispatcherTests(unittest.TestCase):
    230     def setUp(self):
    231         pass
    232 
    233     def tearDown(self):
    234         asyncore.close_all()
    235 
    236     def test_basic(self):
    237         d = asyncore.dispatcher()
    238         self.assertEqual(d.readable(), True)
    239         self.assertEqual(d.writable(), True)
    240 
    241     def test_repr(self):
    242         d = asyncore.dispatcher()
    243         self.assertEqual(repr(d), '<asyncore.dispatcher at %#x>' % id(d))
    244 
    245     def test_log(self):
    246         d = asyncore.dispatcher()
    247 
    248         # capture output of dispatcher.log() (to stderr)

    249         fp = StringIO()
    250         stderr = sys.stderr
    251         l1 = "Lovely spam! Wonderful spam!"
    252         l2 = "I don't like spam!"
    253         try:
    254             sys.stderr = fp
    255             d.log(l1)
    256             d.log(l2)
    257         finally:
    258             sys.stderr = stderr
    259 
    260         lines = fp.getvalue().splitlines()
    261         self.assertEqual(lines, ['log: %s' % l1, 'log: %s' % l2])
    262 
    263     def test_log_info(self):
    264         d = asyncore.dispatcher()
    265 
    266         # capture output of dispatcher.log_info() (to stdout via print)

    267         fp = StringIO()
    268         stdout = sys.stdout
    269         l1 = "Have you got anything without spam?"
    270         l2 = "Why can't she have egg bacon spam and sausage?"
    271         l3 = "THAT'S got spam in it!"
    272         try:
    273             sys.stdout = fp
    274             d.log_info(l1, 'EGGS')
    275             d.log_info(l2)
    276             d.log_info(l3, 'SPAM')
    277         finally:
    278             sys.stdout = stdout
    279 
    280         lines = fp.getvalue().splitlines()
    281         expected = ['EGGS: %s' % l1, 'info: %s' % l2, 'SPAM: %s' % l3]
    282 
    283         self.assertEqual(lines, expected)
    284 
    285     def test_unhandled(self):
    286         d = asyncore.dispatcher()
    287         d.ignore_log_types = ()
    288 
    289         # capture output of dispatcher.log_info() (to stdout via print)

    290         fp = StringIO()
    291         stdout = sys.stdout
    292         try:
    293             sys.stdout = fp
    294             d.handle_expt()
    295             d.handle_read()
    296             d.handle_write()
    297             d.handle_connect()
    298             d.handle_accept()
    299         finally:
    300             sys.stdout = stdout
    301 
    302         lines = fp.getvalue().splitlines()
    303         expected = ['warning: unhandled incoming priority event',
    304                     'warning: unhandled read event',
    305                     'warning: unhandled write event',
    306                     'warning: unhandled connect event',
    307                     'warning: unhandled accept event']
    308         self.assertEqual(lines, expected)
    309 
    310     def test_issue_8594(self):
    311         # XXX - this test is supposed to be removed in next major Python

    312         # version

    313         d = asyncore.dispatcher(socket.socket())
    314         # make sure the error message no longer refers to the socket

    315         # object but the dispatcher instance instead

    316         self.assertRaisesRegexp(AttributeError, 'dispatcher instance',
    317                                 getattr, d, 'foo')
    318         # cheap inheritance with the underlying socket is supposed

    319         # to still work but a DeprecationWarning is expected

    320         with warnings.catch_warnings(record=True) as w:
    321             warnings.simplefilter("always")
    322             family = d.family
    323             self.assertEqual(family, socket.AF_INET)
    324             self.assertEqual(len(w), 1)
    325             self.assertTrue(issubclass(w[0].category, DeprecationWarning))
    326 
    327     def test_strerror(self):
    328         # refers to bug #8573

    329         err = asyncore._strerror(errno.EPERM)
    330         if hasattr(os, 'strerror'):
    331             self.assertEqual(err, os.strerror(errno.EPERM))
    332         err = asyncore._strerror(-1)
    333         self.assertTrue(err != "")
    334 
    335 
    336 class dispatcherwithsend_noread(asyncore.dispatcher_with_send):
    337     def readable(self):
    338         return False
    339 
    340     def handle_connect(self):
    341         pass
    342 
    343 class DispatcherWithSendTests(unittest.TestCase):
    344     usepoll = False
    345 
    346     def setUp(self):
    347         pass
    348 
    349     def tearDown(self):
    350         asyncore.close_all()
    351 
    352     @unittest.skipUnless(threading, 'Threading required for this test.')
    353     @test_support.reap_threads
    354     def test_send(self):
    355         evt = threading.Event()
    356         sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    357         sock.settimeout(3)
    358         port = test_support.bind_port(sock)
    359 
    360         cap = StringIO()
    361         args = (evt, cap, sock)
    362         t = threading.Thread(target=capture_server, args=args)
    363         t.start()
    364         try:
    365             # wait a little longer for the server to initialize (it sometimes

    366             # refuses connections on slow machines without this wait)

    367             time.sleep(0.2)
    368 
    369             data = "Suppose there isn't a 16-ton weight?"
    370             d = dispatcherwithsend_noread()
    371             d.create_socket(socket.AF_INET, socket.SOCK_STREAM)
    372             d.connect((HOST, port))
    373 
    374             # give time for socket to connect

    375             time.sleep(0.1)
    376 
    377             d.send(data)
    378             d.send(data)
    379             d.send('\n')
    380 
    381             n = 1000
    382             while d.out_buffer and n > 0:
    383                 asyncore.poll()
    384                 n -= 1
    385 
    386             evt.wait()
    387 
    388             self.assertEqual(cap.getvalue(), data*2)
    389         finally:
    390             t.join()
    391 
    392 
    393 class DispatcherWithSendTests_UsePoll(DispatcherWithSendTests):
    394     usepoll = True
    395 
    396 @unittest.skipUnless(hasattr(asyncore, 'file_wrapper'),
    397                      'asyncore.file_wrapper required')
    398 class FileWrapperTest(unittest.TestCase):
    399     def setUp(self):
    400         self.d = "It's not dead, it's sleeping!"
    401         with file(TESTFN, 'w') as h:
    402             h.write(self.d)
    403 
    404     def tearDown(self):
    405         unlink(TESTFN)
    406 
    407     def test_recv(self):
    408         fd = os.open(TESTFN, os.O_RDONLY)
    409         w = asyncore.file_wrapper(fd)
    410         os.close(fd)
    411 
    412         self.assertNotEqual(w.fd, fd)
    413         self.assertNotEqual(w.fileno(), fd)
    414         self.assertEqual(w.recv(13), "It's not dead")
    415         self.assertEqual(w.read(6), ", it's")
    416         w.close()
    417         self.assertRaises(OSError, w.read, 1)
    418 
    419 
    420     def test_send(self):
    421         d1 = "Come again?"
    422         d2 = "I want to buy some cheese."
    423         fd = os.open(TESTFN, os.O_WRONLY | os.O_APPEND)
    424         w = asyncore.file_wrapper(fd)
    425         os.close(fd)
    426 
    427         w.write(d1)
    428         w.send(d2)
    429         w.close()
    430         self.assertEqual(file(TESTFN).read(), self.d + d1 + d2)
    431 
    432     @unittest.skipUnless(hasattr(asyncore, 'file_dispatcher'),
    433                          'asyncore.file_dispatcher required')
    434     def test_dispatcher(self):
    435         fd = os.open(TESTFN, os.O_RDONLY)
    436         data = []
    437         class FileDispatcher(asyncore.file_dispatcher):
    438             def handle_read(self):
    439                 data.append(self.recv(29))
    440         s = FileDispatcher(fd)
    441         os.close(fd)
    442         asyncore.loop(timeout=0.01, use_poll=True, count=2)
    443         self.assertEqual(b"".join(data), self.d)
    444 
    445 
    446 class BaseTestHandler(asyncore.dispatcher):
    447 
    448     def __init__(self, sock=None):
    449         asyncore.dispatcher.__init__(self, sock)
    450         self.flag = False
    451 
    452     def handle_accept(self):
    453         raise Exception("handle_accept not supposed to be called")
    454 
    455     def handle_connect(self):
    456         raise Exception("handle_connect not supposed to be called")
    457 
    458     def handle_expt(self):
    459         raise Exception("handle_expt not supposed to be called")
    460 
    461     def handle_close(self):
    462         raise Exception("handle_close not supposed to be called")
    463 
    464     def handle_error(self):
    465         raise
    466 
    467 
    468 class TCPServer(asyncore.dispatcher):
    469     """A server which listens on an address and dispatches the
    470     connection to a handler.
    471     """
    472 
    473     def __init__(self, handler=BaseTestHandler, host=HOST, port=0):
    474         asyncore.dispatcher.__init__(self)
    475         self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
    476         self.set_reuse_addr()
    477         self.bind((host, port))
    478         self.listen(5)
    479         self.handler = handler
    480 
    481     @property
    482     def address(self):
    483         return self.socket.getsockname()[:2]
    484 
    485     def handle_accept(self):
    486         sock, addr = self.accept()
    487         self.handler(sock)
    488 
    489     def handle_error(self):
    490         raise
    491 
    492 
    493 class BaseClient(BaseTestHandler):
    494 
    495     def __init__(self, address):
    496         BaseTestHandler.__init__(self)
    497         self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
    498         self.connect(address)
    499 
    500     def handle_connect(self):
    501         pass
    502 
    503 
    504 class BaseTestAPI(unittest.TestCase):
    505 
    506     def tearDown(self):
    507         asyncore.close_all()
    508 
    509     def loop_waiting_for_flag(self, instance, timeout=5):
    510         timeout = float(timeout) / 100
    511         count = 100
    512         while asyncore.socket_map and count > 0:
    513             asyncore.loop(timeout=0.01, count=1, use_poll=self.use_poll)
    514             if instance.flag:
    515                 return
    516             count -= 1
    517             time.sleep(timeout)
    518         self.fail("flag not set")
    519 
    520     def test_handle_connect(self):
    521         # make sure handle_connect is called on connect()

    522 
    523         class TestClient(BaseClient):
    524             def handle_connect(self):
    525                 self.flag = True
    526 
    527         server = TCPServer()
    528         client = TestClient(server.address)
    529         self.loop_waiting_for_flag(client)
    530 
    531     def test_handle_accept(self):
    532         # make sure handle_accept() is called when a client connects

    533 
    534         class TestListener(BaseTestHandler):
    535 
    536             def __init__(self):
    537                 BaseTestHandler.__init__(self)
    538                 self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
    539                 self.bind((HOST, 0))
    540                 self.listen(5)
    541                 self.address = self.socket.getsockname()[:2]
    542 
    543             def handle_accept(self):
    544                 self.flag = True
    545 
    546         server = TestListener()
    547         client = BaseClient(server.address)
    548         self.loop_waiting_for_flag(server)
    549 
    550     def test_handle_read(self):
    551         # make sure handle_read is called on data received

    552 
    553         class TestClient(BaseClient):
    554             def handle_read(self):
    555                 self.flag = True
    556 
    557         class TestHandler(BaseTestHandler):
    558             def __init__(self, conn):
    559                 BaseTestHandler.__init__(self, conn)
    560                 self.send('x' * 1024)
    561 
    562         server = TCPServer(TestHandler)
    563         client = TestClient(server.address)
    564         self.loop_waiting_for_flag(client)
    565 
    566     def test_handle_write(self):
    567         # make sure handle_write is called

    568 
    569         class TestClient(BaseClient):
    570             def handle_write(self):
    571                 self.flag = True
    572 
    573         server = TCPServer()
    574         client = TestClient(server.address)
    575         self.loop_waiting_for_flag(client)
    576 
    577     def test_handle_close(self):
    578         # make sure handle_close is called when the other end closes

    579         # the connection

    580 
    581         class TestClient(BaseClient):
    582 
    583             def handle_read(self):
    584                 # in order to make handle_close be called we are supposed

    585                 # to make at least one recv() call

    586                 self.recv(1024)
    587 
    588             def handle_close(self):
    589                 self.flag = True
    590                 self.close()
    591 
    592         class TestHandler(BaseTestHandler):
    593             def __init__(self, conn):
    594                 BaseTestHandler.__init__(self, conn)
    595                 self.close()
    596 
    597         server = TCPServer(TestHandler)
    598         client = TestClient(server.address)
    599         self.loop_waiting_for_flag(client)
    600 
    601     @unittest.skipIf(sys.platform.startswith("sunos"),
    602                      "OOB support is broken on Solaris")
    603     def test_handle_expt(self):
    604         # Make sure handle_expt is called on OOB data received.

    605         # Note: this might fail on some platforms as OOB data is

    606         # tenuously supported and rarely used.

    607 
    608         class TestClient(BaseClient):
    609             def handle_expt(self):
    610                 self.flag = True
    611 
    612         class TestHandler(BaseTestHandler):
    613             def __init__(self, conn):
    614                 BaseTestHandler.__init__(self, conn)
    615                 self.socket.send(chr(244), socket.MSG_OOB)
    616 
    617         server = TCPServer(TestHandler)
    618         client = TestClient(server.address)
    619         self.loop_waiting_for_flag(client)
    620 
    621     def test_handle_error(self):
    622 
    623         class TestClient(BaseClient):
    624             def handle_write(self):
    625                 1.0 / 0
    626             def handle_error(self):
    627                 self.flag = True
    628                 try:
    629                     raise
    630                 except ZeroDivisionError:
    631                     pass
    632                 else:
    633                     raise Exception("exception not raised")
    634 
    635         server = TCPServer()
    636         client = TestClient(server.address)
    637         self.loop_waiting_for_flag(client)
    638 
    639     def test_connection_attributes(self):
    640         server = TCPServer()
    641         client = BaseClient(server.address)
    642 
    643         # we start disconnected

    644         self.assertFalse(server.connected)
    645         self.assertTrue(server.accepting)
    646         # this can't be taken for granted across all platforms

    647         #self.assertFalse(client.connected)

    648         self.assertFalse(client.accepting)
    649 
    650         # execute some loops so that client connects to server

    651         asyncore.loop(timeout=0.01, use_poll=self.use_poll, count=100)
    652         self.assertFalse(server.connected)
    653         self.assertTrue(server.accepting)
    654         self.assertTrue(client.connected)
    655         self.assertFalse(client.accepting)
    656 
    657         # disconnect the client

    658         client.close()
    659         self.assertFalse(server.connected)
    660         self.assertTrue(server.accepting)
    661         self.assertFalse(client.connected)
    662         self.assertFalse(client.accepting)
    663 
    664         # stop serving

    665         server.close()
    666         self.assertFalse(server.connected)
    667         self.assertFalse(server.accepting)
    668 
    669     def test_create_socket(self):
    670         s = asyncore.dispatcher()
    671         s.create_socket(socket.AF_INET, socket.SOCK_STREAM)
    672         self.assertEqual(s.socket.family, socket.AF_INET)
    673         self.assertEqual(s.socket.type, socket.SOCK_STREAM)
    674 
    675     def test_bind(self):
    676         s1 = asyncore.dispatcher()
    677         s1.create_socket(socket.AF_INET, socket.SOCK_STREAM)
    678         s1.bind((HOST, 0))
    679         s1.listen(5)
    680         port = s1.socket.getsockname()[1]
    681 
    682         s2 = asyncore.dispatcher()
    683         s2.create_socket(socket.AF_INET, socket.SOCK_STREAM)
    684         # EADDRINUSE indicates the socket was correctly bound

    685         self.assertRaises(socket.error, s2.bind, (HOST, port))
    686 
    687     def test_set_reuse_addr(self):
    688         sock = socket.socket()
    689         try:
    690             sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    691         except socket.error:
    692             unittest.skip("SO_REUSEADDR not supported on this platform")
    693         else:
    694             # if SO_REUSEADDR succeeded for sock we expect asyncore

    695             # to do the same

    696             s = asyncore.dispatcher(socket.socket())
    697             self.assertFalse(s.socket.getsockopt(socket.SOL_SOCKET,
    698                                                  socket.SO_REUSEADDR))
    699             s.create_socket(socket.AF_INET, socket.SOCK_STREAM)
    700             s.set_reuse_addr()
    701             self.assertTrue(s.socket.getsockopt(socket.SOL_SOCKET,
    702                                                  socket.SO_REUSEADDR))
    703         finally:
    704             sock.close()
    705 
    706 
    707 class TestAPI_UseSelect(BaseTestAPI):
    708     use_poll = False
    709 
    710 @unittest.skipUnless(hasattr(select, 'poll'), 'select.poll required')
    711 class TestAPI_UsePoll(BaseTestAPI):
    712     use_poll = True
    713 
    714 
    715 def test_main():
    716     tests = [HelperFunctionTests, DispatcherTests, DispatcherWithSendTests,
    717              DispatcherWithSendTests_UsePoll, TestAPI_UseSelect,
    718              TestAPI_UsePoll, FileWrapperTest]
    719     run_unittest(*tests)
    720 
# Run the whole suite when this file is executed as a script.
if __name__ == "__main__":
    test_main()
    723