#!/usr/bin/python
# -*- coding: utf-8 -*-
"""Interactive test client for the BlueZ health D-Bus interfaces.

The script registers an application through ``org.bluez.HealthManager``,
optionally connects to a remote device through ``org.bluez.HealthDevice``
(with an optional echo test and a data channel), and then sits in a GLib
main loop printing every ``org.bluez.HealthDevice`` signal it receives.
The application is destroyed again on every exit path.
"""

import sys

import dbus
import dbus.service
import gobject
from dbus.mainloop.glib import DBusGMainLoop

# The GLib main loop integration must be installed before the bus connection
# is created, otherwise signals are never dispatched to Python callbacks.
DBusGMainLoop(set_as_default=True)
loop = gobject.MainLoop()

bus = dbus.SystemBus()


def sig_received(*args, **kwargs):
    """Print the name, object path and arguments of a received signal.

    Registered with ``path_keyword="path"`` and ``member_keyword="member"``,
    so dbus-python supplies those two entries in ``kwargs``.  The call is
    ignored when either of them is missing.  Every positional signal
    argument is printed on its own line, whatever the signal's arity, so an
    unexpected argument count can no longer raise inside the callback.
    """
    if "member" not in kwargs or "path" not in kwargs:
        return
    print(kwargs["member"])
    print(kwargs["path"])
    for value in args:
        print(value)


def enter_mainloop():
    """Subscribe to HealthDevice signals and run the main loop until Ctrl+C.

    Uses the module-level ``bus`` and ``loop`` objects.  A
    ``KeyboardInterrupt`` ends the loop quietly; the farewell message is
    printed however the loop terminates.
    """
    bus.add_signal_receiver(sig_received, bus_name="org.bluez",
                            dbus_interface="org.bluez.HealthDevice",
                            path_keyword="path",
                            member_keyword="member",
                            interface_keyword="interface")

    try:
        print("Entering main lopp, push Ctrl+C for finish")
        loop.run()
    except KeyboardInterrupt:
        pass
    finally:
        print("Exiting, bye")


def _show(text):
    """Write a prompt without a trailing newline and flush it immediately."""
    sys.stdout.write(text)
    sys.stdout.flush()


def _read_line():
    """Return one stripped line from stdin; exit on Ctrl+C or end of input."""
    try:
        line = sys.stdin.readline()
    except KeyboardInterrupt:
        sys.exit()
    if not line:
        # readline() returns "" only at EOF.  Without this check every
        # prompt loop would spin forever once stdin is closed.
        sys.exit()
    return line.strip()


def _ask_int(prompt, low, high):
    """Prompt until the user enters an integer in ``[low, high]``.

    The prompt is shown once; each invalid answer prints a retry message
    and reads again.
    """
    _show(prompt)
    while True:
        try:
            value = int(_read_line())
        except ValueError:
            value = None
        if value is not None and low <= value <= high:
            return value
        _show("Wrong selection, try again: ")


def _ask_yes_no(prompt):
    """Prompt until the user answers y/yes or n/no; return a bool.

    Answers are compared case-insensitively after stripping whitespace.
    """
    while True:
        _show(prompt)
        answer = _read_line().lower()
        if answer in ("y", "yes"):
            return True
        if answer in ("n", "no"):
            return False
        print("Wrong selection, try again.")


def _select(paths, prompt):
    """Print ``paths`` as a 1-based numbered list and return the chosen one.

    ``paths`` must be non-empty; callers check that before calling.
    """
    for index, path in enumerate(paths, 1):
        print("%d. %s" % (index, path))
    return paths[_ask_int(prompt, 1, len(paths)) - 1]


def _create_application(hdp_manager):
    """Ask for role, data type and channel preference; register the app.

    Returns a ``(app_path, role)`` tuple where ``role`` is ``"Source"`` or
    ``"Sink"``.  The channel type is only asked for, and only sent, when the
    role is ``"Source"``.
    """
    role = ("Source", "Sink")[
        _ask_int("Select 1. source or 2. sink: ", 1, 2) - 1]
    # The value is sent as a D-Bus UInt16, hence the 0..65535 range.
    dtype = _ask_int("Select a data type: ", 0, 65535)

    if role == "Source":
        pref = ("Reliable", "Streaming")[
            _ask_int("Select a prefered data channel type 1. "
                     "reliable 2. streaming: ", 1, 2) - 1]
        app_path = hdp_manager.CreateApplication({
            "DataType": dbus.types.UInt16(dtype),
            "Role": role,
            "Description": "Test Source",
            "ChannelType": pref})
    else:
        app_path = hdp_manager.CreateApplication({
            "DataType": dbus.types.UInt16(dtype),
            "Description": "Test sink",
            "Role": role})

    print("New application created: %s" % app_path)
    return app_path, role


def _run_session(app_path, role):
    """Optionally connect to a remote device, then wait for signals.

    Returns early (without entering the main loop) when there is no adapter
    or device to choose from, or when the echo test fails.
    """
    if not _ask_yes_no("Connect to a remote device (y/n)? "):
        enter_mainloop()
        return

    manager = dbus.Interface(bus.get_object("org.bluez", "/"),
                             "org.bluez.Manager")
    adapters = manager.ListAdapters()
    if not adapters:
        # With an empty list no answer could ever be valid, so the
        # selection prompt would never terminate.
        print("No adapters available")
        return
    adapter_path = _select(adapters, "Select an adapter: ")

    adapter = dbus.Interface(bus.get_object("org.bluez", adapter_path),
                             "org.bluez.Adapter")
    devices = adapter.ListDevices()
    if not devices:
        print("No devices available")
        return
    device_path = _select(devices, "Select a device: ")

    device = dbus.Interface(bus.get_object("org.bluez", device_path),
                            "org.bluez.HealthDevice")

    if _ask_yes_no("Perform an echo (y/n)? "):
        if device.Echo():
            print("Echo was ok")
        else:
            print("Echo war wrong, exiting")
            return

    print("Connecting to device %s" % (device_path))

    # NOTE(review): the channel type preference chosen for a source is only
    # passed to CreateApplication; the channel itself is always requested as
    # "Reliable".  Presumably deliberate - confirm against the BlueZ health
    # API before changing.
    if role == "Source":
        chan = device.CreateChannel(app_path, "Reliable")
    else:
        chan = device.CreateChannel(app_path, "Any")

    print(chan)

    enter_mainloop()


def main():
    """Run the interactive test and always destroy the created application."""
    hdp_manager = dbus.Interface(bus.get_object("org.bluez", "/org/bluez"),
                                 "org.bluez.HealthManager")
    app_path, role = _create_application(hdp_manager)
    try:
        _run_session(app_path, role)
    finally:
        # Runs on normal return and on sys.exit() alike, so the application
        # is also removed when the user declines to connect, the echo test
        # fails, or a prompt is interrupted.
        hdp_manager.DestroyApplication(app_path)


if __name__ == "__main__":
    main()