Home | History | Annotate | Download | only in event
      1 """
      2 Test lldb Python event APIs.
      3 """
      4 
      5 import os, time
      6 import re
      7 import unittest2
      8 import lldb, lldbutil
      9 from lldbtest import *
     10 
     11 class EventAPITestCase(TestBase):
     12 
     13     mydir = os.path.join("python_api", "event")
     14 
     15     @unittest2.skipUnless(sys.platform.startswith("darwin"), "requires Darwin")
     16     @python_api_test
     17     @dsym_test
     18     def test_listen_for_and_print_event_with_dsym(self):
     19         """Exercise SBEvent API."""
     20         self.buildDsym()
     21         self.do_listen_for_and_print_event()
     22 
     23     @python_api_test
     24     @dwarf_test
     25     def test_listen_for_and_print_event_with_dwarf(self):
     26         """Exercise SBEvent API."""
     27         self.buildDwarf()
     28         self.do_listen_for_and_print_event()
     29 
     30     @unittest2.skipUnless(sys.platform.startswith("darwin"), "requires Darwin")
     31     @python_api_test
     32     @dsym_test
     33     def test_wait_for_event_with_dsym(self):
     34         """Exercise SBListener.WaitForEvent() API."""
     35         self.buildDsym()
     36         self.do_wait_for_event()
     37 
     38     @python_api_test
     39     @dwarf_test
     40     def test_wait_for_event_with_dwarf(self):
     41         """Exercise SBListener.WaitForEvent() API."""
     42         self.buildDwarf()
     43         self.do_wait_for_event()
     44 
     45     @unittest2.skipUnless(sys.platform.startswith("darwin"), "requires Darwin")
     46     @python_api_test
     47     @dsym_test
     48     def test_add_listener_to_broadcaster_with_dsym(self):
     49         """Exercise some SBBroadcaster APIs."""
     50         self.buildDsym()
     51         self.do_add_listener_to_broadcaster()
     52 
     53     @python_api_test
     54     @dwarf_test
     55     def test_add_listener_to_broadcaster_with_dwarf(self):
     56         """Exercise some SBBroadcaster APIs."""
     57         self.buildDwarf()
     58         self.do_add_listener_to_broadcaster()
     59 
     60     def setUp(self):
     61         # Call super's setUp().
     62         TestBase.setUp(self)
     63         # Find the line number to of function 'c'.
     64         self.line = line_number('main.c', '// Find the line number of function "c" here.')
     65 
     66     def do_listen_for_and_print_event(self):
     67         """Create a listener and use SBEvent API to print the events received."""
     68         exe = os.path.join(os.getcwd(), "a.out")
     69 
     70         # Create a target by the debugger.
     71         target = self.dbg.CreateTarget(exe)
     72         self.assertTrue(target, VALID_TARGET)
     73 
     74         # Now create a breakpoint on main.c by name 'c'.
     75         breakpoint = target.BreakpointCreateByName('c', 'a.out')
     76 
     77         # Now launch the process, and do not stop at the entry point.
     78         process = target.LaunchSimple(None, None, os.getcwd())
     79         self.assertTrue(process.GetState() == lldb.eStateStopped,
     80                         PROCESS_STOPPED)
     81 
     82         # Get a handle on the process's broadcaster.
     83         broadcaster = process.GetBroadcaster()
     84 
     85         # Create an empty event object.
     86         event = lldb.SBEvent()
     87 
     88         # Create a listener object and register with the broadcaster.
     89         listener = lldb.SBListener("my listener")
     90         rc = broadcaster.AddListener(listener, lldb.SBProcess.eBroadcastBitStateChanged)
     91         self.assertTrue(rc, "AddListener successfully retruns")
     92 
     93         traceOn = self.TraceOn()
     94         if traceOn:
     95             lldbutil.print_stacktraces(process)
     96 
     97         # Create MyListeningThread class to wait for any kind of event.
     98         import threading
     99         class MyListeningThread(threading.Thread):
    100             def run(self):
    101                 count = 0
    102                 # Let's only try at most 4 times to retrieve any kind of event.
    103                 # After that, the thread exits.
    104                 while not count > 3:
    105                     if traceOn:
    106                         print "Try wait for event..."
    107                     if listener.WaitForEventForBroadcasterWithType(5,
    108                                                                    broadcaster,
    109                                                                    lldb.SBProcess.eBroadcastBitStateChanged,
    110                                                                    event):
    111                         if traceOn:
    112                             desc = lldbutil.get_description(event)
    113                             print "Event description:", desc
    114                             print "Event data flavor:", event.GetDataFlavor()
    115                             print "Process state:", lldbutil.state_type_to_str(process.GetState())
    116                             print
    117                     else:
    118                         if traceOn:
    119                             print "timeout occurred waiting for event..."
    120                     count = count + 1
    121                 return
    122 
    123         # Let's start the listening thread to retrieve the events.
    124         my_thread = MyListeningThread()
    125         my_thread.start()
    126 
    127         # Use Python API to continue the process.  The listening thread should be
    128         # able to receive the state changed events.
    129         process.Continue()
    130 
    131         # Use Python API to kill the process.  The listening thread should be
    132         # able to receive the state changed event, too.
    133         process.Kill()
    134 
    135         # Wait until the 'MyListeningThread' terminates.
    136         my_thread.join()
    137 
    138     def do_wait_for_event(self):
    139         """Get the listener associated with the debugger and exercise WaitForEvent API."""
    140         exe = os.path.join(os.getcwd(), "a.out")
    141 
    142         # Create a target by the debugger.
    143         target = self.dbg.CreateTarget(exe)
    144         self.assertTrue(target, VALID_TARGET)
    145 
    146         # Now create a breakpoint on main.c by name 'c'.
    147         breakpoint = target.BreakpointCreateByName('c', 'a.out')
    148         #print "breakpoint:", breakpoint
    149         self.assertTrue(breakpoint and
    150                         breakpoint.GetNumLocations() == 1,
    151                         VALID_BREAKPOINT)
    152 
    153         # Get the debugger listener.
    154         listener = self.dbg.GetListener()
    155 
    156         # Now launch the process, and do not stop at entry point.
    157         error = lldb.SBError()
    158         process = target.Launch (listener, None, None, None, None, None, None, 0, False, error)
    159         self.assertTrue(error.Success() and process, PROCESS_IS_VALID)
    160 
    161         # Get a handle on the process's broadcaster.
    162         broadcaster = process.GetBroadcaster()
    163         self.assertTrue(broadcaster, "Process with valid broadcaster")
    164 
    165         # Create an empty event object.
    166         event = lldb.SBEvent()
    167         self.assertFalse(event, "Event should not be valid initially")
    168 
    169         # Create MyListeningThread to wait for any kind of event.
    170         import threading
    171         class MyListeningThread(threading.Thread):
    172             def run(self):
    173                 count = 0
    174                 # Let's only try at most 3 times to retrieve any kind of event.
    175                 while not count > 3:
    176                     if listener.WaitForEvent(5, event):
    177                         #print "Got a valid event:", event
    178                         #print "Event data flavor:", event.GetDataFlavor()
    179                         #print "Event type:", lldbutil.state_type_to_str(event.GetType())
    180                         return
    181                     count = count + 1
    182                     print "Timeout: listener.WaitForEvent"
    183 
    184                 return
    185 
    186         # Use Python API to kill the process.  The listening thread should be
    187         # able to receive a state changed event.
    188         process.Kill()
    189 
    190         # Let's start the listening thread to retrieve the event.
    191         my_thread = MyListeningThread()
    192         my_thread.start()
    193 
    194         # Wait until the 'MyListeningThread' terminates.
    195         my_thread.join()
    196 
    197         self.assertTrue(event,
    198                         "My listening thread successfully received an event")
    199 
    200     def do_add_listener_to_broadcaster(self):
    201         """Get the broadcaster associated with the process and wait for broadcaster events."""
    202         exe = os.path.join(os.getcwd(), "a.out")
    203 
    204         # Create a target by the debugger.
    205         target = self.dbg.CreateTarget(exe)
    206         self.assertTrue(target, VALID_TARGET)
    207 
    208         # Now create a breakpoint on main.c by name 'c'.
    209         breakpoint = target.BreakpointCreateByName('c', 'a.out')
    210         #print "breakpoint:", breakpoint
    211         self.assertTrue(breakpoint and
    212                         breakpoint.GetNumLocations() == 1,
    213                         VALID_BREAKPOINT)
    214 
    215         # Now launch the process, and do not stop at the entry point.
    216         process = target.LaunchSimple(None, None, os.getcwd())
    217         self.assertTrue(process.GetState() == lldb.eStateStopped,
    218                         PROCESS_STOPPED)
    219 
    220         # Get a handle on the process's broadcaster.
    221         broadcaster = process.GetBroadcaster()
    222         self.assertTrue(broadcaster, "Process with valid broadcaster")
    223 
    224         # Create an empty event object.
    225         event = lldb.SBEvent()
    226         self.assertFalse(event, "Event should not be valid initially")
    227 
    228         # Create a listener object and register with the broadcaster.
    229         listener = lldb.SBListener("TestEvents.listener")
    230         rc = broadcaster.AddListener(listener, lldb.SBProcess.eBroadcastBitStateChanged)
    231         self.assertTrue(rc, "AddListener successfully retruns")
    232 
    233         # The finite state machine for our custom listening thread, with an
    234         # initail state of 0, which means a "running" event is expected.
    235         # It changes to 1 after "running" is received.
    236         # It cahnges to 2 after "stopped" is received.
    237         # 2 will be our final state and the test is complete.
    238         self.state = 0 
    239 
    240         # Create MyListeningThread to wait for state changed events.
    241         # By design, a "running" event is expected following by a "stopped" event.
    242         import threading
    243         class MyListeningThread(threading.Thread):
    244             def run(self):
    245                 #print "Running MyListeningThread:", self
    246 
    247                 # Regular expression pattern for the event description.
    248                 pattern = re.compile("data = {.*, state = (.*)}$")
    249 
    250                 # Let's only try at most 6 times to retrieve our events.
    251                 count = 0
    252                 while True:
    253                     if listener.WaitForEventForBroadcasterWithType(5,
    254                                                                    broadcaster,
    255                                                                    lldb.SBProcess.eBroadcastBitStateChanged,
    256                                                                    event):
    257                         desc = lldbutil.get_description(event)
    258                         #print "Event description:", desc
    259                         match = pattern.search(desc)
    260                         if not match:
    261                             break;
    262                         if self.context.state == 0 and match.group(1) == 'running':
    263                             self.context.state = 1
    264                             continue
    265                         elif self.context.state == 1 and match.group(1) == 'stopped':
    266                             # Whoopee, both events have been received!
    267                             self.context.state = 2
    268                             break
    269                         else:
    270                             break
    271                     print "Timeout: listener.WaitForEvent"
    272                     count = count + 1
    273                     if count > 6:
    274                         break
    275 
    276                 return
    277 
    278         # Use Python API to continue the process.  The listening thread should be
    279         # able to receive the state changed events.
    280         process.Continue()
    281 
    282         # Start the listening thread to receive the "running" followed by the
    283         # "stopped" events.
    284         my_thread = MyListeningThread()
    285         # Supply the enclosing context so that our listening thread can access
    286         # the 'state' variable.
    287         my_thread.context = self
    288         my_thread.start()
    289 
    290         # Wait until the 'MyListeningThread' terminates.
    291         my_thread.join()
    292 
    293         # We are no longer interested in receiving state changed events.
    294         # Remove our custom listener before the inferior is killed.
    295         broadcaster.RemoveListener(listener, lldb.SBProcess.eBroadcastBitStateChanged)
    296 
    297         # The final judgement. :-)
    298         self.assertTrue(self.state == 2,
    299                         "Both expected state changed events received")
    300 
    301         
    302 if __name__ == '__main__':
    303     import atexit
    304     lldb.SBDebugger.Initialize()
    305     atexit.register(lambda: lldb.SBDebugger.Terminate())
    306     unittest2.main()
    307