Home | History | Annotate | Download | only in test
      1 # Copyright (c) 2001-2006 Twisted Matrix Laboratories.

      2 #

      3 # Permission is hereby granted, free of charge, to any person obtaining

      4 # a copy of this software and associated documentation files (the

      5 # "Software"), to deal in the Software without restriction, including

      6 # without limitation the rights to use, copy, modify, merge, publish,

      7 # distribute, sublicense, and/or sell copies of the Software, and to

      8 # permit persons to whom the Software is furnished to do so, subject to

      9 # the following conditions:

     10 #

     11 # The above copyright notice and this permission notice shall be

     12 # included in all copies or substantial portions of the Software.

     13 #

     14 # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,

     15 # EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF

     16 # MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND

     17 # NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE

     18 # LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION

     19 # OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION

     20 # WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

     21 """
     22 Tests for epoll wrapper.
     23 """
     24 import socket
     25 import errno
     26 import time
     27 import select
     28 import unittest
     29 
     30 from test import test_support
     31 if not hasattr(select, "epoll"):
     32     raise unittest.SkipTest("test works only on Linux 2.6")
     33 
     34 try:
     35     select.epoll()
     36 except IOError, e:
     37     if e.errno == errno.ENOSYS:
     38         raise unittest.SkipTest("kernel doesn't support epoll()")
     39 
     40 class TestEPoll(unittest.TestCase):
     41 
     42     def setUp(self):
     43         self.serverSocket = socket.socket()
     44         self.serverSocket.bind(('127.0.0.1', 0))
     45         self.serverSocket.listen(1)
     46         self.connections = [self.serverSocket]
     47 
     48 
     49     def tearDown(self):
     50         for skt in self.connections:
     51             skt.close()
     52 
     53     def _connected_pair(self):
     54         client = socket.socket()
     55         client.setblocking(False)
     56         try:
     57             client.connect(('127.0.0.1', self.serverSocket.getsockname()[1]))
     58         except socket.error, e:
     59             self.assertEqual(e.args[0], errno.EINPROGRESS)
     60         else:
     61             raise AssertionError("Connect should have raised EINPROGRESS")
     62         server, addr = self.serverSocket.accept()
     63 
     64         self.connections.extend((client, server))
     65         return client, server
     66 
     67     def test_create(self):
     68         try:
     69             ep = select.epoll(16)
     70         except OSError, e:
     71             raise AssertionError(str(e))
     72         self.assertTrue(ep.fileno() > 0, ep.fileno())
     73         self.assertTrue(not ep.closed)
     74         ep.close()
     75         self.assertTrue(ep.closed)
     76         self.assertRaises(ValueError, ep.fileno)
     77 
     78     def test_badcreate(self):
     79         self.assertRaises(TypeError, select.epoll, 1, 2, 3)
     80         self.assertRaises(TypeError, select.epoll, 'foo')
     81         self.assertRaises(TypeError, select.epoll, None)
     82         self.assertRaises(TypeError, select.epoll, ())
     83         self.assertRaises(TypeError, select.epoll, ['foo'])
     84         self.assertRaises(TypeError, select.epoll, {})
     85 
     86     def test_add(self):
     87         server, client = self._connected_pair()
     88 
     89         ep = select.epoll(2)
     90         try:
     91             ep.register(server.fileno(), select.EPOLLIN | select.EPOLLOUT)
     92             ep.register(client.fileno(), select.EPOLLIN | select.EPOLLOUT)
     93         finally:
     94             ep.close()
     95 
     96         # adding by object w/ fileno works, too.

     97         ep = select.epoll(2)
     98         try:
     99             ep.register(server, select.EPOLLIN | select.EPOLLOUT)
    100             ep.register(client, select.EPOLLIN | select.EPOLLOUT)
    101         finally:
    102             ep.close()
    103 
    104         ep = select.epoll(2)
    105         try:
    106             # TypeError: argument must be an int, or have a fileno() method.

    107             self.assertRaises(TypeError, ep.register, object(),
    108                 select.EPOLLIN | select.EPOLLOUT)
    109             self.assertRaises(TypeError, ep.register, None,
    110                 select.EPOLLIN | select.EPOLLOUT)
    111             # ValueError: file descriptor cannot be a negative integer (-1)

    112             self.assertRaises(ValueError, ep.register, -1,
    113                 select.EPOLLIN | select.EPOLLOUT)
    114             # IOError: [Errno 9] Bad file descriptor

    115             self.assertRaises(IOError, ep.register, 10000,
    116                 select.EPOLLIN | select.EPOLLOUT)
    117             # registering twice also raises an exception

    118             ep.register(server, select.EPOLLIN | select.EPOLLOUT)
    119             self.assertRaises(IOError, ep.register, server,
    120                 select.EPOLLIN | select.EPOLLOUT)
    121         finally:
    122             ep.close()
    123 
    124     def test_fromfd(self):
    125         server, client = self._connected_pair()
    126 
    127         ep = select.epoll(2)
    128         ep2 = select.epoll.fromfd(ep.fileno())
    129 
    130         ep2.register(server.fileno(), select.EPOLLIN | select.EPOLLOUT)
    131         ep2.register(client.fileno(), select.EPOLLIN | select.EPOLLOUT)
    132 
    133         events = ep.poll(1, 4)
    134         events2 = ep2.poll(0.9, 4)
    135         self.assertEqual(len(events), 2)
    136         self.assertEqual(len(events2), 2)
    137 
    138         ep.close()
    139         try:
    140             ep2.poll(1, 4)
    141         except IOError, e:
    142             self.assertEqual(e.args[0], errno.EBADF, e)
    143         else:
    144             self.fail("epoll on closed fd didn't raise EBADF")
    145 
    146     def test_control_and_wait(self):
    147         client, server = self._connected_pair()
    148 
    149         ep = select.epoll(16)
    150         ep.register(server.fileno(),
    151                    select.EPOLLIN | select.EPOLLOUT | select.EPOLLET)
    152         ep.register(client.fileno(),
    153                    select.EPOLLIN | select.EPOLLOUT | select.EPOLLET)
    154 
    155         now = time.time()
    156         events = ep.poll(1, 4)
    157         then = time.time()
    158         self.assertFalse(then - now > 0.1, then - now)
    159 
    160         events.sort()
    161         expected = [(client.fileno(), select.EPOLLOUT),
    162                     (server.fileno(), select.EPOLLOUT)]
    163         expected.sort()
    164 
    165         self.assertEqual(events, expected)
    166         self.assertFalse(then - now > 0.01, then - now)
    167 
    168         now = time.time()
    169         events = ep.poll(timeout=2.1, maxevents=4)
    170         then = time.time()
    171         self.assertFalse(events)
    172 
    173         client.send("Hello!")
    174         server.send("world!!!")
    175 
    176         now = time.time()
    177         events = ep.poll(1, 4)
    178         then = time.time()
    179         self.assertFalse(then - now > 0.01)
    180 
    181         events.sort()
    182         expected = [(client.fileno(), select.EPOLLIN | select.EPOLLOUT),
    183                     (server.fileno(), select.EPOLLIN | select.EPOLLOUT)]
    184         expected.sort()
    185 
    186         self.assertEqual(events, expected)
    187 
    188         ep.unregister(client.fileno())
    189         ep.modify(server.fileno(), select.EPOLLOUT)
    190         now = time.time()
    191         events = ep.poll(1, 4)
    192         then = time.time()
    193         self.assertFalse(then - now > 0.01)
    194 
    195         expected = [(server.fileno(), select.EPOLLOUT)]
    196         self.assertEqual(events, expected)
    197 
    198     def test_errors(self):
    199         self.assertRaises(ValueError, select.epoll, -2)
    200         self.assertRaises(ValueError, select.epoll().register, -1,
    201                           select.EPOLLIN)
    202 
    203     def test_unregister_closed(self):
    204         server, client = self._connected_pair()
    205         fd = server.fileno()
    206         ep = select.epoll(16)
    207         ep.register(server)
    208 
    209         now = time.time()
    210         events = ep.poll(1, 4)
    211         then = time.time()
    212         self.assertFalse(then - now > 0.01)
    213 
    214         server.close()
    215         ep.unregister(fd)
    216 
    217 def test_main():
    218     test_support.run_unittest(TestEPoll)
    219 
    220 if __name__ == "__main__":
    221     test_main()
    222