Home | History | Annotate | Download | only in test
      1 # Test case for the select.devpoll() function
      2 
      3 # Initial tests are copied as is from "test_poll.py"
      4 
      5 import os
      6 import random
      7 import select
      8 import unittest
      9 from test.support import run_unittest, cpython_only
     10 
     11 if not hasattr(select, 'devpoll') :
     12     raise unittest.SkipTest('test works only on Solaris OS family')
     13 
     14 
     15 def find_ready_matching(ready, flag):
     16     match = []
     17     for fd, mode in ready:
     18         if mode & flag:
     19             match.append(fd)
     20     return match
     21 
     22 class DevPollTests(unittest.TestCase):
     23 
     24     def test_devpoll1(self):
     25         # Basic functional test of poll object
     26         # Create a bunch of pipe and test that poll works with them.
     27 
     28         p = select.devpoll()
     29 
     30         NUM_PIPES = 12
     31         MSG = b" This is a test."
     32         MSG_LEN = len(MSG)
     33         readers = []
     34         writers = []
     35         r2w = {}
     36         w2r = {}
     37 
     38         for i in range(NUM_PIPES):
     39             rd, wr = os.pipe()
     40             p.register(rd)
     41             p.modify(rd, select.POLLIN)
     42             p.register(wr, select.POLLOUT)
     43             readers.append(rd)
     44             writers.append(wr)
     45             r2w[rd] = wr
     46             w2r[wr] = rd
     47 
     48         bufs = []
     49 
     50         while writers:
     51             ready = p.poll()
     52             ready_writers = find_ready_matching(ready, select.POLLOUT)
     53             if not ready_writers:
     54                 self.fail("no pipes ready for writing")
     55             wr = random.choice(ready_writers)
     56             os.write(wr, MSG)
     57 
     58             ready = p.poll()
     59             ready_readers = find_ready_matching(ready, select.POLLIN)
     60             if not ready_readers:
     61                 self.fail("no pipes ready for reading")
     62             self.assertEqual([w2r[wr]], ready_readers)
     63             rd = ready_readers[0]
     64             buf = os.read(rd, MSG_LEN)
     65             self.assertEqual(len(buf), MSG_LEN)
     66             bufs.append(buf)
     67             os.close(r2w[rd]) ; os.close(rd)
     68             p.unregister(r2w[rd])
     69             p.unregister(rd)
     70             writers.remove(r2w[rd])
     71 
     72         self.assertEqual(bufs, [MSG] * NUM_PIPES)
     73 
     74     def test_timeout_overflow(self):
     75         pollster = select.devpoll()
     76         w, r = os.pipe()
     77         pollster.register(w)
     78 
     79         pollster.poll(-1)
     80         self.assertRaises(OverflowError, pollster.poll, -2)
     81         self.assertRaises(OverflowError, pollster.poll, -1 << 31)
     82         self.assertRaises(OverflowError, pollster.poll, -1 << 64)
     83 
     84         pollster.poll(0)
     85         pollster.poll(1)
     86         pollster.poll(1 << 30)
     87         self.assertRaises(OverflowError, pollster.poll, 1 << 31)
     88         self.assertRaises(OverflowError, pollster.poll, 1 << 63)
     89         self.assertRaises(OverflowError, pollster.poll, 1 << 64)
     90 
     91     def test_close(self):
     92         open_file = open(__file__, "rb")
     93         self.addCleanup(open_file.close)
     94         fd = open_file.fileno()
     95         devpoll = select.devpoll()
     96 
     97         # test fileno() method and closed attribute
     98         self.assertIsInstance(devpoll.fileno(), int)
     99         self.assertFalse(devpoll.closed)
    100 
    101         # test close()
    102         devpoll.close()
    103         self.assertTrue(devpoll.closed)
    104         self.assertRaises(ValueError, devpoll.fileno)
    105 
    106         # close() can be called more than once
    107         devpoll.close()
    108 
    109         # operations must fail with ValueError("I/O operation on closed ...")
    110         self.assertRaises(ValueError, devpoll.modify, fd, select.POLLIN)
    111         self.assertRaises(ValueError, devpoll.poll)
    112         self.assertRaises(ValueError, devpoll.register, fd, fd, select.POLLIN)
    113         self.assertRaises(ValueError, devpoll.unregister, fd)
    114 
    115     def test_fd_non_inheritable(self):
    116         devpoll = select.devpoll()
    117         self.addCleanup(devpoll.close)
    118         self.assertEqual(os.get_inheritable(devpoll.fileno()), False)
    119 
    120     def test_events_mask_overflow(self):
    121         pollster = select.devpoll()
    122         w, r = os.pipe()
    123         pollster.register(w)
    124         # Issue #17919
    125         self.assertRaises(OverflowError, pollster.register, 0, -1)
    126         self.assertRaises(OverflowError, pollster.register, 0, 1 << 64)
    127         self.assertRaises(OverflowError, pollster.modify, 1, -1)
    128         self.assertRaises(OverflowError, pollster.modify, 1, 1 << 64)
    129 
    130     @cpython_only
    131     def test_events_mask_overflow_c_limits(self):
    132         from _testcapi import USHRT_MAX
    133         pollster = select.devpoll()
    134         w, r = os.pipe()
    135         pollster.register(w)
    136         # Issue #17919
    137         self.assertRaises(OverflowError, pollster.register, 0, USHRT_MAX + 1)
    138         self.assertRaises(OverflowError, pollster.modify, 1, USHRT_MAX + 1)
    139 
    140 
    141 def test_main():
    142     run_unittest(DevPollTests)
    143 
# Run the tests when this file is executed directly as a script.
if __name__ == '__main__':
    test_main()
    146