Home | History | Annotate | Download | only in tests
      1 # Copyright 2014 The Chromium Authors. All rights reserved.
      2 # Use of this source code is governed by a BSD-style license that can be
      3 # found in the LICENSE file.
      4 
      5 import unittest
      6 
      7 # pylint: disable=F0401
      8 import mojo.embedder
      9 from mojo import system
     10 
     11 
     12 class AsyncWaitTest(unittest.TestCase):
     13 
     14   def setUp(self):
     15     mojo.embedder.Init()
     16     self.loop = system.RunLoop()
     17     self.array = []
     18     self.handles = system.MessagePipe()
     19     self.cancel = self.handles.handle0.AsyncWait(system.HANDLE_SIGNAL_READABLE,
     20                                                  system.DEADLINE_INDEFINITE,
     21                                                  self._OnResult)
     22 
     23   def tearDown(self):
     24     self.cancel()
     25     self.handles = None
     26     self.array = None
     27     self.loop = None
     28 
     29   def _OnResult(self, value):
     30     self.array.append(value)
     31 
     32   def _WriteToHandle(self):
     33     self.handles.handle1.WriteMessage()
     34 
     35   def _PostWriteAndRun(self):
     36     self.loop.PostDelayedTask(self._WriteToHandle, 0)
     37     self.loop.RunUntilIdle()
     38 
     39   def testAsyncWait(self):
     40     self._PostWriteAndRun()
     41     self.assertEquals(len(self.array), 1)
     42     self.assertEquals(system.RESULT_OK, self.array[0])
     43 
     44   def testAsyncWaitCancel(self):
     45     self.loop.PostDelayedTask(self.cancel, 0)
     46     self._PostWriteAndRun()
     47     self.assertEquals(len(self.array), 0)
     48 
     49   def testAsyncWaitImmediateCancel(self):
     50     self.cancel()
     51     self._PostWriteAndRun()
     52     self.assertEquals(len(self.array), 0)
     53