Home | History | Annotate | Download | only in test
      1 #!/usr/bin/env python
      2 
      3 import dbus
      4 import dbus.decorators
      5 import dbus.glib
      6 import gobject
      7 import sys
      8 import getopt
      9 from signal import *
     10 
     11 mgr_cmds = [ "InterfaceVersion", "ListAdapters", "DefaultAdapter" ]
     12 mgr_signals = [ "AdapterAdded", "AdapterRemoved" ]
     13 
     14 dev_cmds = [ "GetAddress",
     15              "GetVersion",
     16              "GetRevision",
     17              "GetManufacturer",
     18              "GetCompany",
     19              "GetMode",
     20              "SetMode",
     21              "GetDiscoverableTimeout",
     22              "SetDiscoverableTimeout",
     23              "IsConnectable",
     24              "IsDiscoverable",
     25              "IsConnected",
     26              "ListConnections",
     27              "GetMajorClass",
     28              "ListAvailableMinorClasses",
     29              "GetMinorClass",
     30              "SetMinorClass",
     31              "GetServiceClasses",
     32              "GetName",
     33              "SetName",
     34              "GetRemoteVersion",
     35              "GetRemoteRevision",
     36              "GetRemoteManufacturer",
     37              "GetRemoteCompany",
     38              "GetRemoteMajorClass",
     39              "GetRemoteMinorClass",
     40              "GetRemoteServiceClasses",
     41              "GetRemoteClass",
     42              "GetRemoteName",
     43              "GetRemoteAlias",
     44              "SetRemoteAlias",
     45              "ClearRemoteAlias",
     46              "LastSeen",
     47              "LastUsed",
     48              "DisconnectRemoteDevice",
     49              "CreateBonding",
     50              "CancelBondingProcess",
     51              "RemoveBonding",
     52              "HasBonding",
     53              "ListBondings",
     54              "GetPinCodeLength",
     55              "GetEncryptionKeySize",
     56              "DiscoverDevices",
     57              "DiscoverDevicesWithoutNameResolving",
     58              "CancelDiscovery",
     59              "ListRemoteDevices",
     60              "ListRecentRemoteDevices" ]
     61 dev_signals = [ "ModeChanged",
     62                 "NameChanged",
     63                 "MinorClassChanged",
     64                 "DiscoveryStarted",
     65                 "DiscoveryCompleted",
     66                 "RemoteDeviceFound",
     67                 "RemoteNameUpdated",
     68                 "RemoteNameFailed",
     69                 "RemoteAliasChanged"
     70                 "RemoteAliasCleared",
     71                 "RemoteDeviceConnected",
     72                 "RemoteDeviceDisconnectRequested",
     73                 "RemoteDeviceDisconnected",
     74                 "BondingCreated",
     75                 "BondingRemoved" ]
     76 
     77 dev_signals_filter = [ "/org/bluez/hci0", "/org/bluez/hci1",
     78                        "/org/bluez/hci2", "/org/bluez/hci3",
     79                        "/org/bluez/hci4", "/org/bluez/hci5",
     80                        "/org/bluez/hci6", "/org/bluez/hci7" ]
     81 
     82 class Tester:
     83     exit_events = []
     84     dev_path = None
     85     need_dev = False
     86     listen = False
     87     at_interrupt = None
     88 
     89     def __init__(self, argv):
     90         self.name = argv[0]
     91 
     92         self.parse_args(argv[1:])
     93 
     94         try:
     95             self.dbus_setup()
     96         except dbus.DBusException, e:
     97             print 'Failed to do D-Bus setup: %s' % e
     98             sys.exit(1)
     99 
    100     def parse_args(self, argv):
    101         try:
    102             opts, args = getopt.getopt(argv, "hli:")
    103         except getopt.GetoptError:
    104             self.usage()
    105             sys.exit(1)
    106 
    107         for o, a in opts:
    108             if o == "-h":
    109                 self.usage()
    110                 sys.exit()
    111             elif o == "-l":
    112                 self.listen = True
    113             elif o == "-i":
    114                 if a[0] == '/':
    115                     self.dev_path = a
    116                 else:
    117                     self.dev_path = '/org/bluez/%s' % a
    118 
    119         if not (args or self.listen):
    120             self.usage()
    121             sys.exit(1)
    122 
    123         if args:
    124             self.cmd = args[0]
    125             self.cmd_args = args[1:]
    126 
    127     def dbus_dev_setup(self):
    128         if not self.dev_path:
    129             try:
    130                 self.dbus_mgr_setup()
    131                 self.dev_path = self.manager.DefaultAdapter()
    132             except dbus.DBusException, e:
    133                 print 'Failed to get default device: %s' % e
    134                 sys.exit(1)
    135         try:
    136             obj = self.bus.get_object('org.bluez', self.dev_path)
    137             self.device = dbus.Interface(obj, 'org.bluez.Adapter')
    138         except dbus.DBusException, e:
    139             print 'Failed to setup device path: %s' % e
    140             sys.exit(1)
    141 
    142     def dbus_dev_sig_setup(self):
    143         try:
    144            for signal in dev_signals:
    145                 for path in dev_signals_filter:
    146                     self.bus.add_signal_receiver(self.dev_signal_handler,
    147                                              signal, 'org.bluez.Adapter',
    148                                              'org.bluez', path,
    149                                              message_keyword='dbus_message')
    150         except dbus.DBusException, e:
    151             print 'Failed to setup signal handler for device path: %s' % e
    152             sys.exit(1)
    153 
    154     def dbus_mgr_sig_setup(self):
    155         try:
    156             for signal in mgr_signals:
    157                 self.bus.add_signal_receiver(self.mgr_signal_handler,
    158                                          signal,'org.bluez.Manager',
    159                                          'org.bluez', '/org/bluez')
    160         except dbus.DBusException, e:
    161             print 'Failed to setup signal handler for manager path: %s' % e
    162             sys.exit(1)
    163 
    164     def dbus_mgr_setup(self):
    165         self.manager_obj = self.bus.get_object('org.bluez', '/org/bluez')
    166         self.manager = dbus.Interface(self.manager_obj, 'org.bluez.Manager')
    167 
    168     def dbus_setup(self):
    169         self.bus = dbus.SystemBus()
    170 
    171     def usage(self):
    172         print 'Usage: %s [-i <dev>] [-l] [-h] <cmd> [arg1..]' % self.name
    173         print '  -i <dev>   Specify device (e.g. "hci0" or "/org/bluez/hci0")'
    174         print '  -l         Listen for events (no command required)'
    175         print '  -h         Show this help'
    176         print 'Manager commands:'
    177         for cmd in mgr_cmds:
    178             print '\t%s' % cmd
    179         print 'Adapter commands:'
    180         for cmd in dev_cmds:
    181             print '\t%s' % cmd
    182 
    183     #@dbus.decorators.explicitly_pass_message
    184     def dev_signal_handler(*args, **keywords):
    185         dbus_message = keywords["dbus_message"]
    186         print '%s - %s: ' % (dbus_message.get_member(), dbus_message.get_path()),
    187         for arg in args[1:]:
    188             print '%s   ' % arg,
    189         print
    190 
    191     #@dbus.decorators.explicitly_pass_message
    192     def mgr_signal_handler(*args, **keywords):
    193         dbus_message = keywords["dbus_message"]
    194         print '%s: ' % dbus_message.get_member()
    195         for arg in args[1:]:
    196             print '%s   ' % arg,
    197         print
    198 
    199     def signal_cb(self, sig, frame):
    200         print 'Caught signal, exiting'
    201         if self.at_interrupt:
    202             self.at_interrupt()
    203         self.main_loop.quit()
    204 
    205     def call_mgr_dbus_func(self):
    206         if self.cmd == 'InterfaceVersion':
    207             try:
    208                 print self.manager.InterfaceVersion()
    209             except dbus.DBusException, e:
    210                 print 'Sending %s failed: %s' % (self.cmd, e)
    211         if self.cmd == 'ListAdapters':
    212             try:
    213                 devices = self.manager.ListAdapters()
    214             except dbus.DBusException, e:
    215                 print 'Sending %s failed: %s' % (self.cmd, e)
    216                 sys.exit(1)
    217             for device in devices:
    218                 print device
    219         elif self.cmd == 'DefaultAdapter':
    220             try:
    221                 print self.manager.DefaultAdapter()
    222             except dbus.DBusException, e:
    223                 print 'Sending %s failed: %s' % (self.cmd, e)
    224                 sys.exit(1)
    225 
    226     def call_dev_dbus_func(self):
    227        try:
    228            if self.cmd == 'GetAddress':
    229                print self.device.GetAddress()
    230            elif self.cmd == 'GetManufacturer':
    231                print self.device.GetManufacturer()
    232            elif self.cmd == 'GetVersion':
    233                print self.device.GetVersion()
    234            elif self.cmd == 'GetRevision':
    235                print self.device.GetRevision()
    236            elif self.cmd == 'GetCompany':
    237                print self.device.GetCompany()
    238            elif self.cmd == 'GetMode':
    239                print self.device.GetMode()
    240            elif self.cmd == 'SetMode':
    241                if len(self.cmd_args) == 1:
    242                    self.device.SetMode(self.cmd_args[0])
    243                else:
    244                    print 'Usage: %s -i <dev> SetMode scan_mode' % self.name
    245            elif self.cmd == 'GetDiscoverableTimeout':
    246                print '%u' % (self.device.GetDiscoverableTimeout())
    247            elif self.cmd == 'SetDiscoverableTimeout':
    248                if len(self.cmd_args) == 1:
    249                    self.device.SetDiscoverableTimeout(dbus.UInt32(self.cmd_args[0]))
    250                else:
    251                    print 'Usage: %s -i <dev> SetDiscoverableTimeout timeout' % self.name
    252            elif self.cmd == 'IsConnectable':
    253                print self.device.IsConnectable()
    254            elif self.cmd == 'IsDiscoverable':
    255                print self.device.IsDiscoverable()
    256            elif self.cmd == 'IsConnected':
    257                if len(self.cmd_args) == 1:
    258                    print self.device.IsConnected(self.cmd_args[0])
    259                else:
    260                    print 'Usage: %s -i <dev> IsConnected address' % self.name
    261            elif self.cmd == 'ListConnections':
    262                print self.device.ListConnections()
    263            elif self.cmd == 'GetMajorClass':
    264                print self.device.GetMajorClass()
    265            elif self.cmd == 'ListAvailableMinorClasses':
    266                print self.device.ListAvailableMinorClasses()
    267            elif self.cmd == 'GetMinorClass':
    268                print self.device.GetMinorClass()
    269            elif self.cmd == 'SetMinorClass':
    270                if len(self.cmd_args) == 1:
    271                    self.device.SetMinorClass(self.cmd_args[0])
    272                else:
    273                    print 'Usage: %s -i <dev> SetMinorClass minor' % self.name
    274            elif self.cmd == 'GetServiceClasses':
    275                classes = self.device.GetServiceClasses()
    276                for clas in classes: 
    277                    print clas,
    278            elif self.cmd == 'GetName':
    279                print self.device.GetName()
    280            elif self.cmd == 'SetName':
    281                if len(self.cmd_args) == 1:
    282                    self.device.SetName(self.cmd_args[0])
    283                else:
    284                    print 'Usage: %s -i <dev> SetName newname' % self.name
    285            elif self.cmd == 'GetRemoteName':
    286                if len(self.cmd_args) == 1:
    287                    print self.device.GetRemoteName(self.cmd_args[0])
    288                else:
    289                    print 'Usage: %s -i <dev> GetRemoteName address' % self.name
    290            elif self.cmd == 'GetRemoteVersion':
    291                if len(self.cmd_args) == 1:
    292                    print self.device.GetRemoteVersion(self.cmd_args[0])
    293                else:
    294                    print 'Usage: %s -i <dev> GetRemoteVersion address' % self.name
    295            elif self.cmd == 'GetRemoteRevision':
    296                if len(self.cmd_args) == 1:
    297                    print self.device.GetRemoteRevision(self.cmd_args[0])
    298                else:
    299                    print 'Usage: %s -i <dev> GetRemoteRevision address' % self.name
    300            elif self.cmd == 'GetRemoteManufacturer':
    301                if len(self.cmd_args) == 1:
    302                    print self.device.GetRemoteManufacturer(self.cmd_args[0])
    303                else:
    304                    print 'Usage: %s -i <dev> GetRemoteManufacturer address' % self.name
    305            elif self.cmd == 'GetRemoteCompany':
    306                if len(self.cmd_args) == 1:
    307                    print self.device.GetRemoteCompany(self.cmd_args[0])
    308                else:
    309                    print 'Usage: %s -i <dev> GetRemoteCompany address' % self.name
    310            elif self.cmd == 'GetRemoteAlias':
    311                if len(self.cmd_args) == 1:
    312                    print self.device.GetRemoteAlias(self.cmd_args[0])
    313                else:
    314                    print 'Usage: %s -i <dev> GetRemoteAlias address' % self.name
    315            elif self.cmd == 'GetRemoteMajorClass':
    316                if len(self.cmd_args) == 1:
    317                    print self.device.GetRemoteMajorClass(self.cmd_args[0])
    318                else:
    319                    print 'Usage: %s -i <dev> GetRemoteMajorClass address' % self.name
    320            elif self.cmd == 'GetRemoteMinorClass':
    321                if len(self.cmd_args) == 1:
    322                    print self.device.GetRemoteMinorClass(self.cmd_args[0])
    323                else:
    324                    print 'Usage: %s -i <dev> GetRemoteMinorClass address' % self.name
    325            elif self.cmd == 'GetRemoteServiceClasses':
    326                if len(self.cmd_args) == 1:
    327                    print self.device.GetRemoteServiceClasses(self.cmd_args[0])
    328                else:
    329                    print 'Usage: %s -i <dev> GetRemoteServiceClasses address' % self.name
    330            elif self.cmd == 'SetRemoteAlias':
    331                if len(self.cmd_args) == 2:
    332                    self.device.SetRemoteAlias(self.cmd_args[0], self.cmd_args[1])
    333                else:
    334                    print 'Usage: %s -i <dev> SetRemoteAlias address alias' % self.name
    335            elif self.cmd == 'ClearRemoteAlias':
    336                if len(self.cmd_args) == 1:
    337                    print self.device.ClearRemoteAlias(self.cmd_args[0])
    338                else:
    339                    print 'Usage: %s -i <dev> ClearRemoteAlias address' % self.name
    340            elif self.cmd == 'LastSeen':
    341                if len(self.cmd_args) == 1:
    342                    print self.device.LastSeen(self.cmd_args[0])
    343                else:
    344                    print 'Usage: %s -i <dev> LastSeen address' % self.name
    345            elif self.cmd == 'LastUsed':
    346                if len(self.cmd_args) == 1:
    347                    print self.device.LastUsed(self.cmd_args[0])
    348                else:
    349                    print 'Usage: %s -i <dev> LastUsed address' % self.name
    350            elif self.cmd == 'DisconnectRemoteDevice':
    351                if len(self.cmd_args) == 1:
    352                    print self.device.LastUsed(self.cmd_args[0])
    353                else:
    354                    print 'Usage: %s -i <dev> DisconnectRemoteDevice address' % self.name
    355            elif self.cmd == 'CreateBonding':
    356                if len(self.cmd_args) == 1:
    357                    print self.device.CreateBonding(self.cmd_args[0])
    358                else:
    359                    print 'Usage: %s -i <dev> CreateBonding address' % self.name
    360            elif self.cmd == 'RemoveBonding':
    361                if len(self.cmd_args) == 1:
    362                    print self.device.RemoveBonding(self.cmd_args[0])
    363                else:
    364                    print 'Usage: %s -i <dev> RemoveBonding address' % self.name
    365            elif self.cmd == 'CancelBondingProcess':
    366                if len(self.cmd_args) == 1:
    367                    print self.device.CancelBondingProcess(self.cmd_args[0])
    368                else:
    369                    print 'Usage: %s -i <dev> CancelBondingProcess address' % self.name
    370            elif self.cmd == 'HasBonding':
    371                if len(self.cmd_args) == 1:
    372                    print self.device.HasBonding(self.cmd_args[0])
    373                else:
    374                    print 'Usage: %s -i <dev> HasBonding address' % self.name
    375            elif self.cmd == 'ListBondings':
    376                bondings = self.device.ListBondings()
    377                for bond in bondings: 
    378                    print bond,
    379            elif self.cmd == 'GetPinCodeLength':
    380                if len(self.cmd_args) == 1:
    381                    print self.device.GetPinCodeLength(self.cmd_args[0])
    382                else:
    383                    print 'Usage: %s -i <dev> GetPinCodeLength address' % self.name
    384            elif self.cmd == 'GetEncryptionKeySize':
    385                if len(self.cmd_args) == 1:
    386                    print self.device.GetEncryptionKeySize(self.cmd_args[0])
    387                else:
    388                    print 'Usage: %s -i <dev> GetEncryptionKeySize address' % self.name
    389            elif self.cmd == 'DiscoverDevices':
    390                print self.device.DiscoverDevices()
    391            elif self.cmd == 'DiscoverDevicesWithoutNameResolving':
    392                print self.device.DiscoverDevicesWithoutNameResolving()
    393            elif self.cmd == 'ListRemoteDevices':
    394                devices = self.device.ListRemoteDevices()
    395                for device in devices: 
    396                    print device,
    397            elif self.cmd == 'ListRecentRemoteDevices':
    398                if len(self.cmd_args) == 1:
    399                    devices = self.device.ListRecentRemoteDevices(self.cmd_args[0])
    400                    for device in devices: 
    401                        print device,
    402                else:
    403                    print 'Usage: %s -i <dev> ListRecentRemoteDevices date' % self.name
    404            else:
    405                 # FIXME: remove at future version
    406                 print 'Script Error: Method %s not found. Maybe a mispelled word.' % (self.cmd_args)
    407        except dbus.DBusException, e:
    408            print '%s failed: %s' % (self.cmd, e)
    409            sys.exit(1)
    410 
    411     def run(self):
    412         # Manager methods
    413         if self.listen:
    414             self.dbus_mgr_sig_setup()
    415             self.dbus_dev_sig_setup()
    416             print 'Listening for events...'
    417 
    418         if self.cmd in mgr_cmds:
    419             try:
    420                 self.dbus_mgr_setup()
    421             except dbus.DBusException, e:
    422                 print 'Failed to setup manager interface: %s' % e
    423                 sys.exit(1)
    424             self.call_mgr_dbus_func()
    425         elif self.cmd in dev_cmds:
    426             try:
    427                 self.dbus_dev_setup()
    428             except dbus.DBusException, e:
    429                 print 'Failed to setup device interface: %s' % e
    430                 sys.exit(1)
    431             self.call_dev_dbus_func()
    432         elif not self.listen:
    433             print 'Unknown command: %s' % self.cmd
    434             self.usage()
    435             sys.exit(1)
    436 
    437         if self.listen:
    438             signal(SIGINT, self.signal_cb)
    439             signal(SIGTERM, self.signal_cb)
    440             self.main_loop = gobject.MainLoop()
    441             self.main_loop.run()
    442 
    443 if __name__ == '__main__':
    444     gobject.threads_init()
    445     dbus.glib.init_threads()
    446 
    447     tester = Tester(sys.argv)
    448     tester.run()
    449