Home | History | Annotate | Download | only in test
      1 # Copyright (c) 2001-2006 Twisted Matrix Laboratories.
      2 #
      3 # Permission is hereby granted, free of charge, to any person obtaining
      4 # a copy of this software and associated documentation files (the
      5 # "Software"), to deal in the Software without restriction, including
      6 # without limitation the rights to use, copy, modify, merge, publish,
      7 # distribute, sublicense, and/or sell copies of the Software, and to
      8 # permit persons to whom the Software is furnished to do so, subject to
      9 # the following conditions:
     10 #
     11 # The above copyright notice and this permission notice shall be
     12 # included in all copies or substantial portions of the Software.
     13 #
     14 # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
     15 # EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
     16 # MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
     17 # NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
     18 # LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
     19 # OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
     20 # WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
     21 """
     22 Tests for epoll wrapper.
     23 """
     24 import errno
     25 import os
     26 import select
     27 import socket
     28 import time
     29 import unittest
     30 
     31 if not hasattr(select, "epoll"):
     32     raise unittest.SkipTest("test works only on Linux 2.6")
     33 
     34 try:
     35     select.epoll()
     36 except OSError as e:
     37     if e.errno == errno.ENOSYS:
     38         raise unittest.SkipTest("kernel doesn't support epoll()")
     39     raise
     40 
     41 class TestEPoll(unittest.TestCase):
     42 
     43     def setUp(self):
     44         self.serverSocket = socket.socket()
     45         self.serverSocket.bind(('127.0.0.1', 0))
     46         self.serverSocket.listen()
     47         self.connections = [self.serverSocket]
     48 
     49     def tearDown(self):
     50         for skt in self.connections:
     51             skt.close()
     52 
     53     def _connected_pair(self):
     54         client = socket.socket()
     55         client.setblocking(False)
     56         try:
     57             client.connect(('127.0.0.1', self.serverSocket.getsockname()[1]))
     58         except OSError as e:
     59             self.assertEqual(e.args[0], errno.EINPROGRESS)
     60         else:
     61             raise AssertionError("Connect should have raised EINPROGRESS")
     62         server, addr = self.serverSocket.accept()
     63 
     64         self.connections.extend((client, server))
     65         return client, server
     66 
     67     def test_create(self):
     68         try:
     69             ep = select.epoll(16)
     70         except OSError as e:
     71             raise AssertionError(str(e))
     72         self.assertTrue(ep.fileno() > 0, ep.fileno())
     73         self.assertTrue(not ep.closed)
     74         ep.close()
     75         self.assertTrue(ep.closed)
     76         self.assertRaises(ValueError, ep.fileno)
     77         if hasattr(select, "EPOLL_CLOEXEC"):
     78             select.epoll(select.EPOLL_CLOEXEC).close()
     79             self.assertRaises(OSError, select.epoll, flags=12356)
     80 
     81     def test_badcreate(self):
     82         self.assertRaises(TypeError, select.epoll, 1, 2, 3)
     83         self.assertRaises(TypeError, select.epoll, 'foo')
     84         self.assertRaises(TypeError, select.epoll, None)
     85         self.assertRaises(TypeError, select.epoll, ())
     86         self.assertRaises(TypeError, select.epoll, ['foo'])
     87         self.assertRaises(TypeError, select.epoll, {})
     88 
     89     def test_context_manager(self):
     90         with select.epoll(16) as ep:
     91             self.assertGreater(ep.fileno(), 0)
     92             self.assertFalse(ep.closed)
     93         self.assertTrue(ep.closed)
     94         self.assertRaises(ValueError, ep.fileno)
     95 
     96     def test_add(self):
     97         server, client = self._connected_pair()
     98 
     99         ep = select.epoll(2)
    100         try:
    101             ep.register(server.fileno(), select.EPOLLIN | select.EPOLLOUT)
    102             ep.register(client.fileno(), select.EPOLLIN | select.EPOLLOUT)
    103         finally:
    104             ep.close()
    105 
    106         # adding by object w/ fileno works, too.
    107         ep = select.epoll(2)
    108         try:
    109             ep.register(server, select.EPOLLIN | select.EPOLLOUT)
    110             ep.register(client, select.EPOLLIN | select.EPOLLOUT)
    111         finally:
    112             ep.close()
    113 
    114         ep = select.epoll(2)
    115         try:
    116             # TypeError: argument must be an int, or have a fileno() method.
    117             self.assertRaises(TypeError, ep.register, object(),
    118                 select.EPOLLIN | select.EPOLLOUT)
    119             self.assertRaises(TypeError, ep.register, None,
    120                 select.EPOLLIN | select.EPOLLOUT)
    121             # ValueError: file descriptor cannot be a negative integer (-1)
    122             self.assertRaises(ValueError, ep.register, -1,
    123                 select.EPOLLIN | select.EPOLLOUT)
    124             # OSError: [Errno 9] Bad file descriptor
    125             self.assertRaises(OSError, ep.register, 10000,
    126                 select.EPOLLIN | select.EPOLLOUT)
    127             # registering twice also raises an exception
    128             ep.register(server, select.EPOLLIN | select.EPOLLOUT)
    129             self.assertRaises(OSError, ep.register, server,
    130                 select.EPOLLIN | select.EPOLLOUT)
    131         finally:
    132             ep.close()
    133 
    134     def test_fromfd(self):
    135         server, client = self._connected_pair()
    136 
    137         ep = select.epoll(2)
    138         ep2 = select.epoll.fromfd(ep.fileno())
    139 
    140         ep2.register(server.fileno(), select.EPOLLIN | select.EPOLLOUT)
    141         ep2.register(client.fileno(), select.EPOLLIN | select.EPOLLOUT)
    142 
    143         events = ep.poll(1, 4)
    144         events2 = ep2.poll(0.9, 4)
    145         self.assertEqual(len(events), 2)
    146         self.assertEqual(len(events2), 2)
    147 
    148         ep.close()
    149         try:
    150             ep2.poll(1, 4)
    151         except OSError as e:
    152             self.assertEqual(e.args[0], errno.EBADF, e)
    153         else:
    154             self.fail("epoll on closed fd didn't raise EBADF")
    155 
    156     def test_control_and_wait(self):
    157         client, server = self._connected_pair()
    158 
    159         ep = select.epoll(16)
    160         ep.register(server.fileno(),
    161                    select.EPOLLIN | select.EPOLLOUT | select.EPOLLET)
    162         ep.register(client.fileno(),
    163                    select.EPOLLIN | select.EPOLLOUT | select.EPOLLET)
    164 
    165         now = time.monotonic()
    166         events = ep.poll(1, 4)
    167         then = time.monotonic()
    168         self.assertFalse(then - now > 0.1, then - now)
    169 
    170         events.sort()
    171         expected = [(client.fileno(), select.EPOLLOUT),
    172                     (server.fileno(), select.EPOLLOUT)]
    173         expected.sort()
    174 
    175         self.assertEqual(events, expected)
    176 
    177         events = ep.poll(timeout=2.1, maxevents=4)
    178         self.assertFalse(events)
    179 
    180         client.send(b"Hello!")
    181         server.send(b"world!!!")
    182 
    183         now = time.monotonic()
    184         events = ep.poll(1, 4)
    185         then = time.monotonic()
    186         self.assertFalse(then - now > 0.01)
    187 
    188         events.sort()
    189         expected = [(client.fileno(), select.EPOLLIN | select.EPOLLOUT),
    190                     (server.fileno(), select.EPOLLIN | select.EPOLLOUT)]
    191         expected.sort()
    192 
    193         self.assertEqual(events, expected)
    194 
    195         ep.unregister(client.fileno())
    196         ep.modify(server.fileno(), select.EPOLLOUT)
    197         now = time.monotonic()
    198         events = ep.poll(1, 4)
    199         then = time.monotonic()
    200         self.assertFalse(then - now > 0.01)
    201 
    202         expected = [(server.fileno(), select.EPOLLOUT)]
    203         self.assertEqual(events, expected)
    204 
    205     def test_errors(self):
    206         self.assertRaises(ValueError, select.epoll, -2)
    207         self.assertRaises(ValueError, select.epoll().register, -1,
    208                           select.EPOLLIN)
    209 
    210     def test_unregister_closed(self):
    211         server, client = self._connected_pair()
    212         fd = server.fileno()
    213         ep = select.epoll(16)
    214         ep.register(server)
    215 
    216         now = time.monotonic()
    217         events = ep.poll(1, 4)
    218         then = time.monotonic()
    219         self.assertFalse(then - now > 0.01)
    220 
    221         server.close()
    222         ep.unregister(fd)
    223 
    224     def test_close(self):
    225         open_file = open(__file__, "rb")
    226         self.addCleanup(open_file.close)
    227         fd = open_file.fileno()
    228         epoll = select.epoll()
    229 
    230         # test fileno() method and closed attribute
    231         self.assertIsInstance(epoll.fileno(), int)
    232         self.assertFalse(epoll.closed)
    233 
    234         # test close()
    235         epoll.close()
    236         self.assertTrue(epoll.closed)
    237         self.assertRaises(ValueError, epoll.fileno)
    238 
    239         # close() can be called more than once
    240         epoll.close()
    241 
    242         # operations must fail with ValueError("I/O operation on closed ...")
    243         self.assertRaises(ValueError, epoll.modify, fd, select.EPOLLIN)
    244         self.assertRaises(ValueError, epoll.poll, 1.0)
    245         self.assertRaises(ValueError, epoll.register, fd, select.EPOLLIN)
    246         self.assertRaises(ValueError, epoll.unregister, fd)
    247 
    248     def test_fd_non_inheritable(self):
    249         epoll = select.epoll()
    250         self.addCleanup(epoll.close)
    251         self.assertEqual(os.get_inheritable(epoll.fileno()), False)
    252 
    253 
    254 if __name__ == "__main__":
    255     unittest.main()
    256