1 #!/usr/bin/env python2 2 """Simple conformance test for adb. 3 4 This script will use the available adb in path and run simple 5 tests that attempt to touch all accessible attached devices. 6 """ 7 import hashlib 8 import os 9 import pipes 10 import random 11 import re 12 import shlex 13 import subprocess 14 import sys 15 import tempfile 16 import unittest 17 18 19 def trace(cmd): 20 """Print debug message if tracing enabled.""" 21 if False: 22 print >> sys.stderr, cmd 23 24 25 def call(cmd_str): 26 """Run process and return output tuple (stdout, stderr, ret code).""" 27 trace(cmd_str) 28 process = subprocess.Popen(shlex.split(cmd_str), 29 stdout=subprocess.PIPE, 30 stderr=subprocess.PIPE) 31 stdout, stderr = process.communicate() 32 return stdout, stderr, process.returncode 33 34 35 def call_combined(cmd_str): 36 """Run process and return output tuple (stdout+stderr, ret code).""" 37 trace(cmd_str) 38 process = subprocess.Popen(shlex.split(cmd_str), 39 stdout=subprocess.PIPE, 40 stderr=subprocess.STDOUT) 41 stdout, _ = process.communicate() 42 return stdout, process.returncode 43 44 45 def call_checked(cmd_str): 46 """Run process and get stdout+stderr, raise an exception on trouble.""" 47 trace(cmd_str) 48 return subprocess.check_output(shlex.split(cmd_str), 49 stderr=subprocess.STDOUT) 50 51 52 def call_checked_list(cmd_str): 53 return call_checked(cmd_str).split('\n') 54 55 56 def call_checked_list_skip(cmd_str): 57 out_list = call_checked_list(cmd_str) 58 59 def is_init_line(line): 60 if (len(line) >= 3) and (line[0] == "*") and (line[-2] == "*"): 61 return True 62 else: 63 return False 64 65 return [line for line in out_list if not is_init_line(line)] 66 67 68 def get_device_list(): 69 output = call_checked_list_skip("adb devices") 70 dev_list = [] 71 for line in output[1:]: 72 if line.strip() == "": 73 continue 74 device, _ = line.split() 75 dev_list.append(device) 76 return dev_list 77 78 79 def get_attached_device_count(): 80 return len(get_device_list()) 81 82 83 def compute_md5(string): 84 hsh = hashlib.md5() 85 hsh.update(string) 86 return hsh.hexdigest() 87 88 89 class HostFile(object): 90 def __init__(self, handle, md5): 91 self.handle = handle 92 self.md5 = md5 93 self.full_path = handle.name 94 self.base_name = os.path.basename(self.full_path) 95 96 97 class DeviceFile(object): 98 def __init__(self, md5, full_path): 99 self.md5 = md5 100 self.full_path = full_path 101 self.base_name = os.path.basename(self.full_path) 102 103 104 def make_random_host_files(in_dir, num_files, rand_size=True): 105 files = {} 106 min_size = 1 * (1 << 10) 107 max_size = 16 * (1 << 10) 108 fixed_size = min_size 109 110 for _ in range(num_files): 111 file_handle = tempfile.NamedTemporaryFile(dir=in_dir) 112 113 if rand_size: 114 size = random.randrange(min_size, max_size, 1024) 115 else: 116 size = fixed_size 117 rand_str = os.urandom(size) 118 file_handle.write(rand_str) 119 file_handle.flush() 120 121 md5 = compute_md5(rand_str) 122 files[file_handle.name] = HostFile(file_handle, md5) 123 return files 124 125 126 def make_random_device_files(adb, in_dir, num_files, rand_size=True): 127 files = {} 128 min_size = 1 * (1 << 10) 129 max_size = 16 * (1 << 10) 130 fixed_size = min_size 131 132 for i in range(num_files): 133 if rand_size: 134 size = random.randrange(min_size, max_size, 1024) 135 else: 136 size = fixed_size 137 138 base_name = "device_tmpfile" + str(i) 139 full_path = in_dir + "/" + base_name 140 141 adb.shell("dd if=/dev/urandom of={} bs={} count=1".format(full_path, 142 size)) 143 dev_md5, _ = adb.shell("md5sum {}".format(full_path)).split() 144 145 files[full_path] = DeviceFile(dev_md5, full_path) 146 return files 147 148 149 class AdbWrapper(object): 150 """Convenience wrapper object for the adb command.""" 151 def __init__(self, device=None, out_dir=None): 152 self.device = device 153 self.out_dir = out_dir 154 self.adb_cmd = "adb " 155 if self.device: 156 self.adb_cmd += "-s {} ".format(device) 157 if self.out_dir: 158 self.adb_cmd += "-p {} ".format(out_dir) 159 160 def shell(self, cmd): 161 return call_checked(self.adb_cmd + "shell " + cmd) 162 163 def shell_nocheck(self, cmd): 164 return call_combined(self.adb_cmd + "shell " + cmd) 165 166 def install(self, filename): 167 return call_checked(self.adb_cmd + "install {}".format(pipes.quote(filename))) 168 169 def push(self, local, remote): 170 return call_checked(self.adb_cmd + "push {} {}".format(local, remote)) 171 172 def pull(self, remote, local): 173 return call_checked(self.adb_cmd + "pull {} {}".format(remote, local)) 174 175 def sync(self, directory=""): 176 return call_checked(self.adb_cmd + "sync {}".format(directory)) 177 178 def forward(self, local, remote): 179 return call_checked(self.adb_cmd + "forward {} {}".format(local, 180 remote)) 181 182 def tcpip(self, port): 183 return call_checked(self.adb_cmd + "tcpip {}".format(port)) 184 185 def usb(self): 186 return call_checked(self.adb_cmd + "usb") 187 188 def root(self): 189 return call_checked(self.adb_cmd + "root") 190 191 def unroot(self): 192 return call_checked(self.adb_cmd + "unroot") 193 194 def forward_remove(self, local): 195 return call_checked(self.adb_cmd + "forward --remove {}".format(local)) 196 197 def forward_remove_all(self): 198 return call_checked(self.adb_cmd + "forward --remove-all") 199 200 def connect(self, host): 201 return call_checked(self.adb_cmd + "connect {}".format(host)) 202 203 def disconnect(self, host): 204 return call_checked(self.adb_cmd + "disconnect {}".format(host)) 205 206 def reverse(self, remote, local): 207 return call_checked(self.adb_cmd + "reverse {} {}".format(remote, 208 local)) 209 210 def reverse_remove_all(self): 211 return call_checked(self.adb_cmd + "reverse --remove-all") 212 213 def reverse_remove(self, remote): 214 return call_checked( 215 self.adb_cmd + "reverse --remove {}".format(remote)) 216 217 def wait(self): 218 return call_checked(self.adb_cmd + "wait-for-device") 219 220 221 class AdbBasic(unittest.TestCase): 222 def test_shell(self): 223 """Check that we can at least cat a file.""" 224 adb = AdbWrapper() 225 out = adb.shell("cat /proc/uptime") 226 self.assertEqual(len(out.split()), 2) 227 self.assertGreater(float(out.split()[0]), 0.0) 228 self.assertGreater(float(out.split()[1]), 0.0) 229 230 def test_help(self): 231 """Make sure we get _something_ out of help.""" 232 out = call_checked("adb help") 233 self.assertTrue(len(out) > 0) 234 235 def test_version(self): 236 """Get a version number out of the output of adb.""" 237 out = call_checked("adb version").split() 238 version_num = False 239 for item in out: 240 if re.match(r"[\d+\.]*\d", item): 241 version_num = True 242 self.assertTrue(version_num) 243 244 def _test_root(self): 245 adb = AdbWrapper() 246 adb.root() 247 adb.wait() 248 self.assertEqual("root", adb.shell("id -un").strip()) 249 250 def _test_unroot(self): 251 adb = AdbWrapper() 252 adb.unroot() 253 adb.wait() 254 self.assertEqual("shell", adb.shell("id -un").strip()) 255 256 def test_root_unroot(self): 257 """Make sure that adb root and adb unroot work, using id(1).""" 258 adb = AdbWrapper() 259 original_user = adb.shell("id -un").strip() 260 try: 261 if original_user == "root": 262 self._test_unroot() 263 self._test_root() 264 elif original_user == "shell": 265 self._test_root() 266 self._test_unroot() 267 finally: 268 if original_user == "root": 269 adb.root() 270 else: 271 adb.unroot() 272 adb.wait() 273 274 def test_argument_escaping(self): 275 """Make sure that argument escaping is somewhat sane.""" 276 adb = AdbWrapper() 277 278 # http://b/19734868 279 # Note that this actually matches ssh(1)'s behavior --- it's 280 # converted to "sh -c echo hello; echo world" which sh interprets 281 # as "sh -c echo" (with an argument to that shell of "hello"), 282 # and then "echo world" back in the first shell. 283 result = adb.shell("sh -c 'echo hello; echo world'").splitlines() 284 self.assertEqual(["", "world"], result) 285 # If you really wanted "hello" and "world", here's what you'd do: 286 result = adb.shell("echo hello\;echo world").splitlines() 287 self.assertEqual(["hello", "world"], result) 288 289 # http://b/15479704 290 self.assertEqual('t', adb.shell("'true && echo t'").strip()) 291 self.assertEqual('t', adb.shell("sh -c 'true && echo t'").strip()) 292 293 # http://b/20564385 294 self.assertEqual('t', adb.shell("FOO=a BAR=b echo t").strip()) 295 self.assertEqual('123Linux', adb.shell("echo -n 123\;uname").strip()) 296 297 def test_install_argument_escaping(self): 298 """Make sure that install argument escaping works.""" 299 adb = AdbWrapper() 300 301 # http://b/20323053 302 tf = tempfile.NamedTemporaryFile("w", suffix="-text;ls;1.apk") 303 self.assertIn("-text;ls;1.apk", adb.install(tf.name)) 304 305 # http://b/3090932 306 tf = tempfile.NamedTemporaryFile("w", suffix="-Live Hold'em.apk") 307 self.assertIn("-Live Hold'em.apk", adb.install(tf.name)) 308 309 310 class AdbFile(unittest.TestCase): 311 SCRATCH_DIR = "/data/local/tmp" 312 DEVICE_TEMP_FILE = SCRATCH_DIR + "/adb_test_file" 313 DEVICE_TEMP_DIR = SCRATCH_DIR + "/adb_test_dir" 314 315 def test_push(self): 316 """Push a randomly generated file to specified device.""" 317 kbytes = 512 318 adb = AdbWrapper() 319 with tempfile.NamedTemporaryFile(mode="w") as tmp: 320 rand_str = os.urandom(1024 * kbytes) 321 tmp.write(rand_str) 322 tmp.flush() 323 324 host_md5 = compute_md5(rand_str) 325 adb.shell_nocheck("rm -r {}".format(AdbFile.DEVICE_TEMP_FILE)) 326 try: 327 adb.push(local=tmp.name, remote=AdbFile.DEVICE_TEMP_FILE) 328 dev_md5, _ = adb.shell( 329 "md5sum {}".format(AdbFile.DEVICE_TEMP_FILE)).split() 330 self.assertEqual(host_md5, dev_md5) 331 finally: 332 adb.shell_nocheck("rm {}".format(AdbFile.DEVICE_TEMP_FILE)) 333 334 # TODO: write push directory test. 335 336 def test_pull(self): 337 """Pull a randomly generated file from specified device.""" 338 kbytes = 512 339 adb = AdbWrapper() 340 adb.shell_nocheck("rm -r {}".format(AdbFile.DEVICE_TEMP_FILE)) 341 try: 342 adb.shell("dd if=/dev/urandom of={} bs=1024 count={}".format( 343 AdbFile.DEVICE_TEMP_FILE, kbytes)) 344 dev_md5, _ = adb.shell( 345 "md5sum {}".format(AdbFile.DEVICE_TEMP_FILE)).split() 346 347 with tempfile.NamedTemporaryFile(mode="w") as tmp_write: 348 adb.pull(remote=AdbFile.DEVICE_TEMP_FILE, local=tmp_write.name) 349 with open(tmp_write.name) as tmp_read: 350 host_contents = tmp_read.read() 351 host_md5 = compute_md5(host_contents) 352 self.assertEqual(dev_md5, host_md5) 353 finally: 354 adb.shell_nocheck("rm {}".format(AdbFile.DEVICE_TEMP_FILE)) 355 356 def test_pull_dir(self): 357 """Pull a randomly generated directory of files from the device.""" 358 adb = AdbWrapper() 359 temp_files = {} 360 host_dir = None 361 try: 362 # create temporary host directory 363 host_dir = tempfile.mkdtemp() 364 365 # create temporary dir on device 366 adb.shell_nocheck("rm -r {}".format(AdbFile.DEVICE_TEMP_DIR)) 367 adb.shell("mkdir -p {}".format(AdbFile.DEVICE_TEMP_DIR)) 368 369 # populate device dir with random files 370 temp_files = make_random_device_files( 371 adb, in_dir=AdbFile.DEVICE_TEMP_DIR, num_files=32) 372 373 adb.pull(remote=AdbFile.DEVICE_TEMP_DIR, local=host_dir) 374 375 for device_full_path in temp_files: 376 host_path = os.path.join( 377 host_dir, temp_files[device_full_path].base_name) 378 with open(host_path) as host_file: 379 host_md5 = compute_md5(host_file.read()) 380 self.assertEqual(host_md5, 381 temp_files[device_full_path].md5) 382 finally: 383 for dev_file in temp_files.values(): 384 host_path = os.path.join(host_dir, dev_file.base_name) 385 os.remove(host_path) 386 adb.shell_nocheck("rm -r {}".format(AdbFile.DEVICE_TEMP_DIR)) 387 if host_dir: 388 os.removedirs(host_dir) 389 390 def test_sync(self): 391 """Sync a randomly generated directory of files to specified device.""" 392 try: 393 adb = AdbWrapper() 394 temp_files = {} 395 396 # create temporary host directory 397 base_dir = tempfile.mkdtemp() 398 399 # create mirror device directory hierarchy within base_dir 400 full_dir_path = base_dir + AdbFile.DEVICE_TEMP_DIR 401 os.makedirs(full_dir_path) 402 403 # create 32 random files within the host mirror 404 temp_files = make_random_host_files(in_dir=full_dir_path, 405 num_files=32) 406 407 # clean up any trash on the device 408 adb = AdbWrapper(out_dir=base_dir) 409 adb.shell_nocheck("rm -r {}".format(AdbFile.DEVICE_TEMP_DIR)) 410 411 # issue the sync 412 adb.sync("data") 413 414 # confirm that every file on the device mirrors that on the host 415 for host_full_path in temp_files.keys(): 416 device_full_path = os.path.join( 417 AdbFile.DEVICE_TEMP_DIR, 418 temp_files[host_full_path].base_name) 419 dev_md5, _ = adb.shell( 420 "md5sum {}".format(device_full_path)).split() 421 self.assertEqual(temp_files[host_full_path].md5, dev_md5) 422 423 finally: 424 adb.shell_nocheck("rm -r {}".format(AdbFile.DEVICE_TEMP_DIR)) 425 if temp_files: 426 for tf in temp_files.values(): 427 tf.handle.close() 428 if base_dir: 429 os.removedirs(base_dir + AdbFile.DEVICE_TEMP_DIR) 430 431 432 if __name__ == '__main__': 433 random.seed(0) 434 dev_count = get_attached_device_count() 435 if dev_count: 436 suite = unittest.TestLoader().loadTestsFromName(__name__) 437 unittest.TextTestRunner(verbosity=3).run(suite) 438 else: 439 print "Test suite must be run with attached devices" 440