Home | History | Annotate | Download | only in test
      1 #!/usr/bin/python
      2 # -*- coding: utf-8 -*-
      3 
      4 import dbus
      5 import dbus.service
      6 import gobject
      7 from dbus.mainloop.glib import DBusGMainLoop
      8 import sys
      9 
     10 DBusGMainLoop(set_as_default=True)
     11 loop = gobject.MainLoop()
     12 
     13 bus = dbus.SystemBus()
     14 
     15 def sig_received(*args, **kwargs):
     16 	if "member" not in kwargs:
     17 		return
     18 	if "path" not in kwargs:
     19 		return;
     20 	sig_name = kwargs["member"]
     21 	path = kwargs["path"]
     22 	print sig_name
     23 	print path
     24 	if sig_name == "PropertyChanged":
     25 		k, v = args
     26 		print k
     27 		print v
     28 	else:
     29 		ob = args[0]
     30 		print ob
     31 
     32 
     33 def enter_mainloop():
     34 	bus.add_signal_receiver(sig_received, bus_name="org.bluez",
     35 				dbus_interface = "org.bluez.HealthDevice",
     36 				path_keyword="path",
     37 				member_keyword="member",
     38 				interface_keyword="interface")
     39 
     40 	try:
     41 		print "Entering main lopp, push Ctrl+C for finish"
     42 
     43 		mainloop = gobject.MainLoop()
     44 		mainloop.run()
     45 	except KeyboardInterrupt:
     46 		pass
     47 	finally:
     48 		print "Exiting, bye"
     49 
     50 hdp_manager = dbus.Interface(bus.get_object("org.bluez", "/org/bluez"),
     51 						"org.bluez.HealthManager")
     52 
     53 role = None
     54 while role == None:
     55 	print "Select 1. source or 2. sink: ",
     56 	try:
     57 		sel = int(sys.stdin.readline())
     58 		if sel == 1:
     59 			role = "Source"
     60 		elif sel == 2:
     61 			role = "Sink"
     62 		else:
     63 			raise ValueError
     64 	except (TypeError, ValueError):
     65 		print "Wrong selection, try again: ",
     66 	except KeyboardInterrupt:
     67 		sys.exit()
     68 
     69 dtype = None
     70 while dtype == None:
     71 	print "Select a data type: ",
     72 	try:
     73 		sel = int(sys.stdin.readline())
     74 		if (sel < 0) or (sel > 65535):
     75 			raise ValueError
     76 		dtype = sel;
     77 	except (TypeError, ValueError):
     78 		print "Wrong selection, try again: ",
     79 	except KeyboardInterrupt:
     80 		sys.exit()
     81 
     82 pref = None
     83 if role == "Source":
     84 	while pref == None:
     85 		try:
     86 			print "Select a prefered data channel type 1.",
     87 			print "reliable 2. streaming: ",
     88 			sel = int(sys.stdin.readline())
     89 			if sel == 1:
     90 				pref = "Reliable"
     91 			elif sel == 2:
     92 				pref = "Streaming"
     93 			else:
     94 				raise ValueError
     95 
     96 		except (TypeError, ValueError):
     97 			print "Wrong selection, try again"
     98 		except KeyboardInterrupt:
     99 			sys.exit()
    100 
    101 	app_path = hdp_manager.CreateApplication({
    102 					"DataType": dbus.types.UInt16(dtype),
    103 					"Role": role,
    104 					"Description": "Test Source",
    105 					"ChannelType": pref})
    106 else:
    107 	app_path = hdp_manager.CreateApplication({
    108 					"DataType": dbus.types.UInt16(dtype),
    109 					"Description": "Test sink",
    110 					"Role": role})
    111 
    112 print "New application created:", app_path
    113 
    114 con = None
    115 while con == None:
    116 	try:
    117 		print "Connect to a remote device (y/n)? ",
    118 		sel = sys.stdin.readline()
    119 		if sel in ("y\n", "yes\n", "Y\n", "YES\n"):
    120 			con = True
    121 		elif sel in ("n\n", "no\n", "N\n", "NO\n"):
    122 			con = False
    123 		else:
    124 			print "Wrong selection, try again."
    125 	except KeyboardInterrupt:
    126 		sys.exit()
    127 
    128 if not con:
    129 	enter_mainloop()
    130 	sys.exit()
    131 
    132 manager = dbus.Interface(bus.get_object("org.bluez", "/"),
    133 						"org.bluez.Manager")
    134 
    135 adapters = manager.ListAdapters()
    136 
    137 i = 1
    138 for ad in adapters:
    139 	print "%d. %s" % (i, ad)
    140 	i = i + 1
    141 
    142 print "Select an adapter: ",
    143 select = None
    144 while select == None:
    145 	try:
    146 		pos = int(sys.stdin.readline()) - 1
    147 		if pos < 0:
    148 			raise TypeError
    149 		select = adapters[pos]
    150 	except (TypeError, IndexError, ValueError):
    151 		print "Wrong selection, try again: ",
    152 	except KeyboardInterrupt:
    153 		sys.exit()
    154 
    155 adapter =  dbus.Interface(bus.get_object("org.bluez", select),
    156 						"org.bluez.Adapter")
    157 
    158 devices = adapter.ListDevices()
    159 
    160 if len(devices) == 0:
    161 	print "No devices available"
    162 	sys.exit()
    163 
    164 i = 1
    165 for dev in devices:
    166 	print "%d. %s" % (i, dev)
    167 	i = i + 1
    168 
    169 print "Select a device: ",
    170 select = None
    171 while select == None:
    172 	try:
    173 		pos = int(sys.stdin.readline()) - 1
    174 		if pos < 0:
    175 			raise TypeError
    176 		select = devices[pos]
    177 	except (TypeError, IndexError, ValueError):
    178 		print "Wrong selection, try again: ",
    179 	except KeyboardInterrupt:
    180 		sys.exit()
    181 
    182 device = dbus.Interface(bus.get_object("org.bluez", select),
    183 					"org.bluez.HealthDevice")
    184 
    185 echo = None
    186 while echo == None:
    187 	try:
    188 		print "Perform an echo (y/n)? ",
    189 		sel = sys.stdin.readline()
    190 		if sel in ("y\n", "yes\n", "Y\n", "YES\n"):
    191 			echo = True
    192 		elif sel in ("n\n", "no\n", "N\n", "NO\n"):
    193 			echo = False
    194 		else:
    195 			print "Wrong selection, try again."
    196 	except KeyboardInterrupt:
    197 		sys.exit()
    198 
    199 if echo:
    200 	if device.Echo():
    201 		print "Echo was ok"
    202 	else:
    203 		print "Echo war wrong, exiting"
    204 		sys.exit()
    205 
    206 print "Connecting to device %s" % (select)
    207 
    208 if role == "Source":
    209 	chan = device.CreateChannel(app_path, "Reliable")
    210 else:
    211 	chan = device.CreateChannel(app_path, "Any")
    212 
    213 print chan
    214 
    215 enter_mainloop()
    216 
    217 hdp_manager.DestroyApplication(app_path)
    218