Home | History | Annotate | Download | only in src
      1 #!/usr/bin/env python
      2 # Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
      3 # Use of this source code is governed by a BSD-style license that can be
      4 # found in the LICENSE file.
      5 
      6 import ctypes
      7 import select
      8 
      9 import xi2
     10 import xlib
     11 
     12 
     13 class XI2Reader(object):
     14     """A reader to create connection to X server and read x input events."""
     15     def __init__(self, display_name=':0'):
     16         """Constructor
     17 
     18         Args:
     19             display_name: The X window display name.
     20         """
     21         self._display = xlib.XOpenDisplay(display_name)
     22         self._window = xlib.XDefaultRootWindow(self._display)
     23         self._data = []
     24 
     25         self._register()
     26 
     27         # Consumes the very first traffic within the connection with X server.
     28         xlib.XFlush(self._display)
     29 
     30     def _register(self):
     31         """Registers device and events to listen on"""
     32         mask = xi2.XIEventMask()
     33         mask.deviceid = xi2.XIAllDevices
     34         mask.mask_len = xi2.XIMaskLen(xi2.XI_RawMotion)
     35         mask.mask = ctypes.cast((ctypes.c_ubyte * mask.mask_len)(),
     36                                 ctypes.POINTER(ctypes.c_ubyte))
     37 
     38         self._set_mask(mask.mask, xi2.XI_RawKeyPress)
     39         self._set_mask(mask.mask, xi2.XI_RawKeyRelease)
     40         self._set_mask(mask.mask, xi2.XI_RawButtonPress)
     41         self._set_mask(mask.mask, xi2.XI_RawButtonRelease)
     42         self._set_mask(mask.mask, xi2.XI_RawMotion)
     43 
     44         xi2.XISelectEvents(self._display, self._window, ctypes.pointer(mask), 1)
     45         xlib.XSelectInput(self._display, self._window, ctypes.c_long(0))
     46 
     47     def _set_mask(self, ptr, event):
     48         """Sets event mask"""
     49         val = xi2.XISetMask(ptr, event)
     50         ptr[event >> 3] = val
     51 
     52     def get_valuator_names(self, device_id):
     53         """Gets the valuator names for device.
     54 
     55         Return:
     56             An dictionary maps valuator index to descriptive names.
     57             Sample output:
     58             {
     59                 0: 'Rel X',
     60                 1: 'Rel Y',
     61                 2: 'Abs Start Timestamp',
     62                 3: 'Abs End Timestamp',
     63                 4: 'Rel Vert Wheel',
     64                 5: 'Rel Horiz Wheel'
     65             }
     66         """
     67         num_devices = ctypes.c_int()
     68         device = xi2.XIQueryDevice(self._display, device_id,
     69                 ctypes.pointer(num_devices)).contents
     70 
     71         valuator_names = []
     72         for i in range(device.num_classes):
     73             if device.classes[i].contents.type == xi2.XIValuatorClass:
     74                 valuator_class_info = ctypes.cast(device.classes[i],
     75                         ctypes.POINTER(xi2.XIValuatorClassInfo)).contents
     76                 valuator_names.append(xlib.XGetAtomName(reader._display,
     77                         valuator_class_info.label))
     78         valuator_names_dict = {}
     79         for i in range(len(valuator_names)):
     80             valuator_names_dict[i] = valuator_names[i]
     81         return valuator_names_dict
     82 
     83     def get_connection_number(self):
     84         """Gets the file descriptor number for the connection with X server"""
     85         return xlib.XConnectionNumber(reader._display)
     86 
     87     def read_pending_events(self):
     88         """Read all the new event datas.
     89 
     90         Return:
     91             An array contains all event data with event type and valuator
     92             values. Sample format:
     93             {
     94                 'deviceid': 11,
     95                 'evtype': 17,
     96                 'time': 406752437L,
     97                 'valuators': {
     98                     0: (396.0, -38.0),
     99                     1: (578.0, -21.0),
    100                     2: (22802890.0, 22802890.0),
    101                     3: (26145746.0, 26145746.0)
    102                 }
    103             }
    104         """
    105         data = []
    106         while xlib.XPending(self._display):
    107             xevent = xlib.XEvent()
    108             xlib.XNextEvent(self._display, ctypes.pointer(xevent))
    109             cookie = xevent.xcookie
    110 
    111             # Get event valuator_data
    112             result = xlib.XGetEventData(self._display, ctypes.pointer(cookie))
    113             if (not result or cookie.type != xlib.GenericEvent):
    114                 continue
    115 
    116             raw_event_ptr = ctypes.cast(cookie.data,
    117                     ctypes.POINTER(xi2.XIRawEvent))
    118             raw_event = raw_event_ptr.contents
    119             valuator_state = raw_event.valuators
    120 
    121             # Two value arrays
    122             val_ptr = valuator_state.values
    123             val_idx = 0
    124             raw_val_ptr = raw_event.raw_values
    125             raw_val_idx = 0
    126 
    127             valuator_data = {}
    128             for i in range(valuator_state.mask_len):
    129                 if xi2.XIMaskIsSet(valuator_state.mask, i):
    130                     valuator_data[i] = (val_ptr[val_idx],
    131                             raw_val_ptr[raw_val_idx])
    132                     val_idx += 1
    133                     raw_val_idx += 1
    134             data.append({'deviceid': raw_event.deviceid,
    135                          'evtype': cookie.evtype,
    136                          'time': raw_event.time,
    137                          'valuators': valuator_data})
    138         return data
    139 
    140 
    141 if __name__ == '__main__':
    142     reader = XI2Reader()
    143     fd = reader.get_connection_number()
    144 
    145     while True:
    146         rl, _, _ = select.select([fd], [], [])
    147         if fd not in rl:
    148             break
    149         print reader.read_pending_events()
    150