1 #!/usr/bin/env python 2 # -*- coding: utf-8 -*- 3 # 4 # Copyright (C) 2015 The Android Open Source Project 5 # 6 # Licensed under the Apache License, Version 2.0 (the "License"); 7 # you may not use this file except in compliance with the License. 8 # You may obtain a copy of the License at 9 # 10 # http://www.apache.org/licenses/LICENSE-2.0 11 # 12 # Unless required by applicable law or agreed to in writing, software 13 # distributed under the License is distributed on an "AS IS" BASIS, 14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15 # See the License for the specific language governing permissions and 16 # limitations under the License. 17 # 18 from __future__ import print_function 19 20 import contextlib 21 import hashlib 22 import os 23 import posixpath 24 import random 25 import re 26 import shlex 27 import shutil 28 import signal 29 import socket 30 import string 31 import subprocess 32 import sys 33 import tempfile 34 import unittest 35 36 import mock 37 38 import adb 39 40 41 def requires_root(func): 42 def wrapper(self, *args): 43 if self.device.get_prop('ro.debuggable') != '1': 44 raise unittest.SkipTest('requires rootable build') 45 46 was_root = self.device.shell(['id', '-un'])[0].strip() == 'root' 47 if not was_root: 48 self.device.root() 49 self.device.wait() 50 51 try: 52 func(self, *args) 53 finally: 54 if not was_root: 55 self.device.unroot() 56 self.device.wait() 57 58 return wrapper 59 60 61 def requires_non_root(func): 62 def wrapper(self, *args): 63 was_root = self.device.shell(['id', '-un'])[0].strip() == 'root' 64 if was_root: 65 self.device.unroot() 66 self.device.wait() 67 68 try: 69 func(self, *args) 70 finally: 71 if was_root: 72 self.device.root() 73 self.device.wait() 74 75 return wrapper 76 77 78 class GetDeviceTest(unittest.TestCase): 79 def setUp(self): 80 self.android_serial = os.getenv('ANDROID_SERIAL') 81 if 'ANDROID_SERIAL' in os.environ: 82 del os.environ['ANDROID_SERIAL'] 83 84 def tearDown(self): 85 if self.android_serial is not None: 86 os.environ['ANDROID_SERIAL'] = self.android_serial 87 else: 88 if 'ANDROID_SERIAL' in os.environ: 89 del os.environ['ANDROID_SERIAL'] 90 91 @mock.patch('adb.device.get_devices') 92 def test_explicit(self, mock_get_devices): 93 mock_get_devices.return_value = ['foo', 'bar'] 94 device = adb.get_device('foo') 95 self.assertEqual(device.serial, 'foo') 96 97 @mock.patch('adb.device.get_devices') 98 def test_from_env(self, mock_get_devices): 99 mock_get_devices.return_value = ['foo', 'bar'] 100 os.environ['ANDROID_SERIAL'] = 'foo' 101 device = adb.get_device() 102 self.assertEqual(device.serial, 'foo') 103 104 @mock.patch('adb.device.get_devices') 105 def test_arg_beats_env(self, mock_get_devices): 106 mock_get_devices.return_value = ['foo', 'bar'] 107 os.environ['ANDROID_SERIAL'] = 'bar' 108 device = adb.get_device('foo') 109 self.assertEqual(device.serial, 'foo') 110 111 @mock.patch('adb.device.get_devices') 112 def test_no_such_device(self, mock_get_devices): 113 mock_get_devices.return_value = ['foo', 'bar'] 114 self.assertRaises(adb.DeviceNotFoundError, adb.get_device, ['baz']) 115 116 os.environ['ANDROID_SERIAL'] = 'baz' 117 self.assertRaises(adb.DeviceNotFoundError, adb.get_device) 118 119 @mock.patch('adb.device.get_devices') 120 def test_unique_device(self, mock_get_devices): 121 mock_get_devices.return_value = ['foo'] 122 device = adb.get_device() 123 self.assertEqual(device.serial, 'foo') 124 125 @mock.patch('adb.device.get_devices') 126 def test_no_unique_device(self, mock_get_devices): 127 mock_get_devices.return_value = ['foo', 'bar'] 128 self.assertRaises(adb.NoUniqueDeviceError, adb.get_device) 129 130 131 class DeviceTest(unittest.TestCase): 132 def setUp(self): 133 self.device = adb.get_device() 134 135 136 class ForwardReverseTest(DeviceTest): 137 def _test_no_rebind(self, description, direction_list, direction, 138 direction_no_rebind, direction_remove_all): 139 msg = direction_list() 140 self.assertEqual('', msg.strip(), 141 description + ' list must be empty to run this test.') 142 143 # Use --no-rebind with no existing binding 144 direction_no_rebind('tcp:5566', 'tcp:6655') 145 msg = direction_list() 146 self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg)) 147 148 # Use --no-rebind with existing binding 149 with self.assertRaises(subprocess.CalledProcessError): 150 direction_no_rebind('tcp:5566', 'tcp:6677') 151 msg = direction_list() 152 self.assertFalse(re.search(r'tcp:5566.+tcp:6677', msg)) 153 self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg)) 154 155 # Use the absence of --no-rebind with existing binding 156 direction('tcp:5566', 'tcp:6677') 157 msg = direction_list() 158 self.assertFalse(re.search(r'tcp:5566.+tcp:6655', msg)) 159 self.assertTrue(re.search(r'tcp:5566.+tcp:6677', msg)) 160 161 direction_remove_all() 162 msg = direction_list() 163 self.assertEqual('', msg.strip()) 164 165 def test_forward_no_rebind(self): 166 self._test_no_rebind('forward', self.device.forward_list, 167 self.device.forward, self.device.forward_no_rebind, 168 self.device.forward_remove_all) 169 170 def test_reverse_no_rebind(self): 171 self._test_no_rebind('reverse', self.device.reverse_list, 172 self.device.reverse, self.device.reverse_no_rebind, 173 self.device.reverse_remove_all) 174 175 def test_forward(self): 176 msg = self.device.forward_list() 177 self.assertEqual('', msg.strip(), 178 'Forwarding list must be empty to run this test.') 179 self.device.forward('tcp:5566', 'tcp:6655') 180 msg = self.device.forward_list() 181 self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg)) 182 self.device.forward('tcp:7788', 'tcp:8877') 183 msg = self.device.forward_list() 184 self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg)) 185 self.assertTrue(re.search(r'tcp:7788.+tcp:8877', msg)) 186 self.device.forward_remove('tcp:5566') 187 msg = self.device.forward_list() 188 self.assertFalse(re.search(r'tcp:5566.+tcp:6655', msg)) 189 self.assertTrue(re.search(r'tcp:7788.+tcp:8877', msg)) 190 self.device.forward_remove_all() 191 msg = self.device.forward_list() 192 self.assertEqual('', msg.strip()) 193 194 def test_reverse(self): 195 msg = self.device.reverse_list() 196 self.assertEqual('', msg.strip(), 197 'Reverse forwarding list must be empty to run this test.') 198 self.device.reverse('tcp:5566', 'tcp:6655') 199 msg = self.device.reverse_list() 200 self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg)) 201 self.device.reverse('tcp:7788', 'tcp:8877') 202 msg = self.device.reverse_list() 203 self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg)) 204 self.assertTrue(re.search(r'tcp:7788.+tcp:8877', msg)) 205 self.device.reverse_remove('tcp:5566') 206 msg = self.device.reverse_list() 207 self.assertFalse(re.search(r'tcp:5566.+tcp:6655', msg)) 208 self.assertTrue(re.search(r'tcp:7788.+tcp:8877', msg)) 209 self.device.reverse_remove_all() 210 msg = self.device.reverse_list() 211 self.assertEqual('', msg.strip()) 212 213 # Note: If you run this test when adb connect'd to a physical device over 214 # TCP, it will fail in adb reverse due to https://code.google.com/p/android/issues/detail?id=189821 215 def test_forward_reverse_echo(self): 216 """Send data through adb forward and read it back via adb reverse""" 217 forward_port = 12345 218 reverse_port = forward_port + 1 219 forward_spec = 'tcp:' + str(forward_port) 220 reverse_spec = 'tcp:' + str(reverse_port) 221 forward_setup = False 222 reverse_setup = False 223 224 try: 225 # listen on localhost:forward_port, connect to remote:forward_port 226 self.device.forward(forward_spec, forward_spec) 227 forward_setup = True 228 # listen on remote:forward_port, connect to localhost:reverse_port 229 self.device.reverse(forward_spec, reverse_spec) 230 reverse_setup = True 231 232 listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 233 with contextlib.closing(listener): 234 # Use SO_REUSEADDR so that subsequent runs of the test can grab 235 # the port even if it is in TIME_WAIT. 236 listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) 237 238 # Listen on localhost:reverse_port before connecting to 239 # localhost:forward_port because that will cause adb to connect 240 # back to localhost:reverse_port. 241 listener.bind(('127.0.0.1', reverse_port)) 242 listener.listen(4) 243 244 client = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 245 with contextlib.closing(client): 246 # Connect to the listener. 247 client.connect(('127.0.0.1', forward_port)) 248 249 # Accept the client connection. 250 accepted_connection, addr = listener.accept() 251 with contextlib.closing(accepted_connection) as server: 252 data = 'hello' 253 254 # Send data into the port setup by adb forward. 255 client.sendall(data) 256 # Explicitly close() so that server gets EOF. 257 client.close() 258 259 # Verify that the data came back via adb reverse. 260 self.assertEqual(data, server.makefile().read()) 261 finally: 262 if reverse_setup: 263 self.device.reverse_remove(forward_spec) 264 if forward_setup: 265 self.device.forward_remove(forward_spec) 266 267 268 class ShellTest(DeviceTest): 269 def _interactive_shell(self, shell_args, input): 270 """Runs an interactive adb shell. 271 272 Args: 273 shell_args: List of string arguments to `adb shell`. 274 input: String input to send to the interactive shell. 275 276 Returns: 277 The remote exit code. 278 279 Raises: 280 unittest.SkipTest: The device doesn't support exit codes. 281 """ 282 if self.device.SHELL_PROTOCOL_FEATURE not in self.device.features: 283 raise unittest.SkipTest('exit codes are unavailable on this device') 284 285 proc = subprocess.Popen( 286 self.device.adb_cmd + ['shell'] + shell_args, 287 stdin=subprocess.PIPE, stdout=subprocess.PIPE, 288 stderr=subprocess.PIPE) 289 # Closing host-side stdin doesn't trigger a PTY shell to exit so we need 290 # to explicitly add an exit command to close the session from the device 291 # side, plus the necessary newline to complete the interactive command. 292 proc.communicate(input + '; exit\n') 293 return proc.returncode 294 295 def test_cat(self): 296 """Check that we can at least cat a file.""" 297 out = self.device.shell(['cat', '/proc/uptime'])[0].strip() 298 elements = out.split() 299 self.assertEqual(len(elements), 2) 300 301 uptime, idle = elements 302 self.assertGreater(float(uptime), 0.0) 303 self.assertGreater(float(idle), 0.0) 304 305 def test_throws_on_failure(self): 306 self.assertRaises(adb.ShellError, self.device.shell, ['false']) 307 308 def test_output_not_stripped(self): 309 out = self.device.shell(['echo', 'foo'])[0] 310 self.assertEqual(out, 'foo' + self.device.linesep) 311 312 def test_shell_nocheck_failure(self): 313 rc, out, _ = self.device.shell_nocheck(['false']) 314 self.assertNotEqual(rc, 0) 315 self.assertEqual(out, '') 316 317 def test_shell_nocheck_output_not_stripped(self): 318 rc, out, _ = self.device.shell_nocheck(['echo', 'foo']) 319 self.assertEqual(rc, 0) 320 self.assertEqual(out, 'foo' + self.device.linesep) 321 322 def test_can_distinguish_tricky_results(self): 323 # If result checking on ADB shell is naively implemented as 324 # `adb shell <cmd>; echo $?`, we would be unable to distinguish the 325 # output from the result for a cmd of `echo -n 1`. 326 rc, out, _ = self.device.shell_nocheck(['echo', '-n', '1']) 327 self.assertEqual(rc, 0) 328 self.assertEqual(out, '1') 329 330 def test_line_endings(self): 331 """Ensure that line ending translation is not happening in the pty. 332 333 Bug: http://b/19735063 334 """ 335 output = self.device.shell(['uname'])[0] 336 self.assertEqual(output, 'Linux' + self.device.linesep) 337 338 def test_pty_logic(self): 339 """Tests that a PTY is allocated when it should be. 340 341 PTY allocation behavior should match ssh; some behavior requires 342 a terminal stdin to test so this test will be skipped if stdin 343 is not a terminal. 344 """ 345 if self.device.SHELL_PROTOCOL_FEATURE not in self.device.features: 346 raise unittest.SkipTest('PTY arguments unsupported on this device') 347 if not os.isatty(sys.stdin.fileno()): 348 raise unittest.SkipTest('PTY tests require stdin terminal') 349 350 def check_pty(args): 351 """Checks adb shell PTY allocation. 352 353 Tests |args| for terminal and non-terminal stdin. 354 355 Args: 356 args: -Tt args in a list (e.g. ['-t', '-t']). 357 358 Returns: 359 A tuple (<terminal>, <non-terminal>). True indicates 360 the corresponding shell allocated a remote PTY. 361 """ 362 test_cmd = self.device.adb_cmd + ['shell'] + args + ['[ -t 0 ]'] 363 364 terminal = subprocess.Popen( 365 test_cmd, stdin=None, 366 stdout=subprocess.PIPE, stderr=subprocess.PIPE) 367 terminal.communicate() 368 369 non_terminal = subprocess.Popen( 370 test_cmd, stdin=subprocess.PIPE, 371 stdout=subprocess.PIPE, stderr=subprocess.PIPE) 372 non_terminal.communicate() 373 374 return (terminal.returncode == 0, non_terminal.returncode == 0) 375 376 # -T: never allocate PTY. 377 self.assertEqual((False, False), check_pty(['-T'])) 378 379 # No args: PTY only if stdin is a terminal and shell is interactive, 380 # which is difficult to reliably test from a script. 381 self.assertEqual((False, False), check_pty([])) 382 383 # -t: PTY if stdin is a terminal. 384 self.assertEqual((True, False), check_pty(['-t'])) 385 386 # -t -t: always allocate PTY. 387 self.assertEqual((True, True), check_pty(['-t', '-t'])) 388 389 def test_shell_protocol(self): 390 """Tests the shell protocol on the device. 391 392 If the device supports shell protocol, this gives us the ability 393 to separate stdout/stderr and return the exit code directly. 394 395 Bug: http://b/19734861 396 """ 397 if self.device.SHELL_PROTOCOL_FEATURE not in self.device.features: 398 raise unittest.SkipTest('shell protocol unsupported on this device') 399 400 # Shell protocol should be used by default. 401 result = self.device.shell_nocheck( 402 shlex.split('echo foo; echo bar >&2; exit 17')) 403 self.assertEqual(17, result[0]) 404 self.assertEqual('foo' + self.device.linesep, result[1]) 405 self.assertEqual('bar' + self.device.linesep, result[2]) 406 407 self.assertEqual(17, self._interactive_shell([], 'exit 17')) 408 409 # -x flag should disable shell protocol. 410 result = self.device.shell_nocheck( 411 shlex.split('-x echo foo; echo bar >&2; exit 17')) 412 self.assertEqual(0, result[0]) 413 self.assertEqual('foo{0}bar{0}'.format(self.device.linesep), result[1]) 414 self.assertEqual('', result[2]) 415 416 self.assertEqual(0, self._interactive_shell(['-x'], 'exit 17')) 417 418 def test_non_interactive_sigint(self): 419 """Tests that SIGINT in a non-interactive shell kills the process. 420 421 This requires the shell protocol in order to detect the broken 422 pipe; raw data transfer mode will only see the break once the 423 subprocess tries to read or write. 424 425 Bug: http://b/23825725 426 """ 427 if self.device.SHELL_PROTOCOL_FEATURE not in self.device.features: 428 raise unittest.SkipTest('shell protocol unsupported on this device') 429 430 # Start a long-running process. 431 sleep_proc = subprocess.Popen( 432 self.device.adb_cmd + shlex.split('shell echo $$; sleep 60'), 433 stdin=subprocess.PIPE, stdout=subprocess.PIPE, 434 stderr=subprocess.STDOUT) 435 remote_pid = sleep_proc.stdout.readline().strip() 436 self.assertIsNone(sleep_proc.returncode, 'subprocess terminated early') 437 proc_query = shlex.split('ps {0} | grep {0}'.format(remote_pid)) 438 439 # Verify that the process is running, send signal, verify it stopped. 440 self.device.shell(proc_query) 441 os.kill(sleep_proc.pid, signal.SIGINT) 442 sleep_proc.communicate() 443 self.assertEqual(1, self.device.shell_nocheck(proc_query)[0], 444 'subprocess failed to terminate') 445 446 def test_non_interactive_stdin(self): 447 """Tests that non-interactive shells send stdin.""" 448 if self.device.SHELL_PROTOCOL_FEATURE not in self.device.features: 449 raise unittest.SkipTest('non-interactive stdin unsupported ' 450 'on this device') 451 452 # Test both small and large inputs. 453 small_input = 'foo' 454 large_input = '\n'.join(c * 100 for c in (string.ascii_letters + 455 string.digits)) 456 457 for input in (small_input, large_input): 458 proc = subprocess.Popen(self.device.adb_cmd + ['shell', 'cat'], 459 stdin=subprocess.PIPE, 460 stdout=subprocess.PIPE, 461 stderr=subprocess.PIPE) 462 stdout, stderr = proc.communicate(input) 463 self.assertEqual(input.splitlines(), stdout.splitlines()) 464 self.assertEqual('', stderr) 465 466 467 class ArgumentEscapingTest(DeviceTest): 468 def test_shell_escaping(self): 469 """Make sure that argument escaping is somewhat sane.""" 470 471 # http://b/19734868 472 # Note that this actually matches ssh(1)'s behavior --- it's 473 # converted to `sh -c echo hello; echo world` which sh interprets 474 # as `sh -c echo` (with an argument to that shell of "hello"), 475 # and then `echo world` back in the first shell. 476 result = self.device.shell( 477 shlex.split("sh -c 'echo hello; echo world'"))[0] 478 result = result.splitlines() 479 self.assertEqual(['', 'world'], result) 480 # If you really wanted "hello" and "world", here's what you'd do: 481 result = self.device.shell( 482 shlex.split(r'echo hello\;echo world'))[0].splitlines() 483 self.assertEqual(['hello', 'world'], result) 484 485 # http://b/15479704 486 result = self.device.shell(shlex.split("'true && echo t'"))[0].strip() 487 self.assertEqual('t', result) 488 result = self.device.shell( 489 shlex.split("sh -c 'true && echo t'"))[0].strip() 490 self.assertEqual('t', result) 491 492 # http://b/20564385 493 result = self.device.shell(shlex.split('FOO=a BAR=b echo t'))[0].strip() 494 self.assertEqual('t', result) 495 result = self.device.shell( 496 shlex.split(r'echo -n 123\;uname'))[0].strip() 497 self.assertEqual('123Linux', result) 498 499 def test_install_argument_escaping(self): 500 """Make sure that install argument escaping works.""" 501 # http://b/20323053, http://b/3090932. 502 for file_suffix in ('-text;ls;1.apk', "-Live Hold'em.apk"): 503 tf = tempfile.NamedTemporaryFile('wb', suffix=file_suffix, 504 delete=False) 505 tf.close() 506 507 # Installing bogus .apks fails if the device supports exit codes. 508 try: 509 output = self.device.install(tf.name) 510 except subprocess.CalledProcessError as e: 511 output = e.output 512 513 self.assertIn(file_suffix, output) 514 os.remove(tf.name) 515 516 517 class RootUnrootTest(DeviceTest): 518 def _test_root(self): 519 message = self.device.root() 520 if 'adbd cannot run as root in production builds' in message: 521 return 522 self.device.wait() 523 self.assertEqual('root', self.device.shell(['id', '-un'])[0].strip()) 524 525 def _test_unroot(self): 526 self.device.unroot() 527 self.device.wait() 528 self.assertEqual('shell', self.device.shell(['id', '-un'])[0].strip()) 529 530 def test_root_unroot(self): 531 """Make sure that adb root and adb unroot work, using id(1).""" 532 if self.device.get_prop('ro.debuggable') != '1': 533 raise unittest.SkipTest('requires rootable build') 534 535 original_user = self.device.shell(['id', '-un'])[0].strip() 536 try: 537 if original_user == 'root': 538 self._test_unroot() 539 self._test_root() 540 elif original_user == 'shell': 541 self._test_root() 542 self._test_unroot() 543 finally: 544 if original_user == 'root': 545 self.device.root() 546 else: 547 self.device.unroot() 548 self.device.wait() 549 550 551 class TcpIpTest(DeviceTest): 552 def test_tcpip_failure_raises(self): 553 """adb tcpip requires a port. 554 555 Bug: http://b/22636927 556 """ 557 self.assertRaises( 558 subprocess.CalledProcessError, self.device.tcpip, '') 559 self.assertRaises( 560 subprocess.CalledProcessError, self.device.tcpip, 'foo') 561 562 563 class SystemPropertiesTest(DeviceTest): 564 def test_get_prop(self): 565 self.assertEqual(self.device.get_prop('init.svc.adbd'), 'running') 566 567 @requires_root 568 def test_set_prop(self): 569 prop_name = 'foo.bar' 570 self.device.shell(['setprop', prop_name, '""']) 571 572 self.device.set_prop(prop_name, 'qux') 573 self.assertEqual( 574 self.device.shell(['getprop', prop_name])[0].strip(), 'qux') 575 576 577 def compute_md5(string): 578 hsh = hashlib.md5() 579 hsh.update(string) 580 return hsh.hexdigest() 581 582 583 def get_md5_prog(device): 584 """Older platforms (pre-L) had the name md5 rather than md5sum.""" 585 try: 586 device.shell(['md5sum', '/proc/uptime']) 587 return 'md5sum' 588 except adb.ShellError: 589 return 'md5' 590 591 592 class HostFile(object): 593 def __init__(self, handle, checksum): 594 self.handle = handle 595 self.checksum = checksum 596 self.full_path = handle.name 597 self.base_name = os.path.basename(self.full_path) 598 599 600 class DeviceFile(object): 601 def __init__(self, checksum, full_path): 602 self.checksum = checksum 603 self.full_path = full_path 604 self.base_name = posixpath.basename(self.full_path) 605 606 607 def make_random_host_files(in_dir, num_files): 608 min_size = 1 * (1 << 10) 609 max_size = 16 * (1 << 10) 610 611 files = [] 612 for _ in xrange(num_files): 613 file_handle = tempfile.NamedTemporaryFile(dir=in_dir, delete=False) 614 615 size = random.randrange(min_size, max_size, 1024) 616 rand_str = os.urandom(size) 617 file_handle.write(rand_str) 618 file_handle.flush() 619 file_handle.close() 620 621 md5 = compute_md5(rand_str) 622 files.append(HostFile(file_handle, md5)) 623 return files 624 625 626 def make_random_device_files(device, in_dir, num_files, prefix='device_tmpfile'): 627 min_size = 1 * (1 << 10) 628 max_size = 16 * (1 << 10) 629 630 files = [] 631 for file_num in xrange(num_files): 632 size = random.randrange(min_size, max_size, 1024) 633 634 base_name = prefix + str(file_num) 635 full_path = posixpath.join(in_dir, base_name) 636 637 device.shell(['dd', 'if=/dev/urandom', 'of={}'.format(full_path), 638 'bs={}'.format(size), 'count=1']) 639 dev_md5, _ = device.shell([get_md5_prog(device), full_path])[0].split() 640 641 files.append(DeviceFile(dev_md5, full_path)) 642 return files 643 644 645 class FileOperationsTest(DeviceTest): 646 SCRATCH_DIR = '/data/local/tmp' 647 DEVICE_TEMP_FILE = SCRATCH_DIR + '/adb_test_file' 648 DEVICE_TEMP_DIR = SCRATCH_DIR + '/adb_test_dir' 649 650 def _verify_remote(self, checksum, remote_path): 651 dev_md5, _ = self.device.shell([get_md5_prog(self.device), 652 remote_path])[0].split() 653 self.assertEqual(checksum, dev_md5) 654 655 def _verify_local(self, checksum, local_path): 656 with open(local_path, 'rb') as host_file: 657 host_md5 = compute_md5(host_file.read()) 658 self.assertEqual(host_md5, checksum) 659 660 def test_push(self): 661 """Push a randomly generated file to specified device.""" 662 kbytes = 512 663 tmp = tempfile.NamedTemporaryFile(mode='wb', delete=False) 664 rand_str = os.urandom(1024 * kbytes) 665 tmp.write(rand_str) 666 tmp.close() 667 668 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_FILE]) 669 self.device.push(local=tmp.name, remote=self.DEVICE_TEMP_FILE) 670 671 self._verify_remote(compute_md5(rand_str), self.DEVICE_TEMP_FILE) 672 self.device.shell(['rm', '-f', self.DEVICE_TEMP_FILE]) 673 674 os.remove(tmp.name) 675 676 def test_push_dir(self): 677 """Push a randomly generated directory of files to the device.""" 678 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 679 self.device.shell(['mkdir', self.DEVICE_TEMP_DIR]) 680 681 try: 682 host_dir = tempfile.mkdtemp() 683 684 # Make sure the temp directory isn't setuid, or else adb will complain. 685 os.chmod(host_dir, 0o700) 686 687 # Create 32 random files. 688 temp_files = make_random_host_files(in_dir=host_dir, num_files=32) 689 self.device.push(host_dir, self.DEVICE_TEMP_DIR) 690 691 for temp_file in temp_files: 692 remote_path = posixpath.join(self.DEVICE_TEMP_DIR, 693 os.path.basename(host_dir), 694 temp_file.base_name) 695 self._verify_remote(temp_file.checksum, remote_path) 696 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 697 finally: 698 if host_dir is not None: 699 shutil.rmtree(host_dir) 700 701 @unittest.expectedFailure # b/25566053 702 def test_push_empty(self): 703 """Push a directory containing an empty directory to the device.""" 704 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 705 self.device.shell(['mkdir', self.DEVICE_TEMP_DIR]) 706 707 try: 708 host_dir = tempfile.mkdtemp() 709 710 # Make sure the temp directory isn't setuid, or else adb will complain. 711 os.chmod(host_dir, 0o700) 712 713 # Create an empty directory. 714 os.mkdir(os.path.join(host_dir, 'empty')) 715 716 self.device.push(host_dir, self.DEVICE_TEMP_DIR) 717 718 test_empty_cmd = ['[', '-d', 719 os.path.join(self.DEVICE_TEMP_DIR, 'empty')] 720 rc, _, _ = self.device.shell_nocheck(test_empty_cmd) 721 self.assertEqual(rc, 0) 722 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 723 finally: 724 if host_dir is not None: 725 shutil.rmtree(host_dir) 726 727 def test_multiple_push(self): 728 """Push multiple files to the device in one adb push command. 729 730 Bug: http://b/25324823 731 """ 732 733 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 734 self.device.shell(['mkdir', self.DEVICE_TEMP_DIR]) 735 736 try: 737 host_dir = tempfile.mkdtemp() 738 739 # Create some random files and a subdirectory containing more files. 740 temp_files = make_random_host_files(in_dir=host_dir, num_files=4) 741 742 subdir = os.path.join(host_dir, 'subdir') 743 os.mkdir(subdir) 744 subdir_temp_files = make_random_host_files(in_dir=subdir, 745 num_files=4) 746 747 paths = map(lambda temp_file: temp_file.full_path, temp_files) 748 paths.append(subdir) 749 self.device._simple_call(['push'] + paths + [self.DEVICE_TEMP_DIR]) 750 751 for temp_file in temp_files: 752 remote_path = posixpath.join(self.DEVICE_TEMP_DIR, 753 temp_file.base_name) 754 self._verify_remote(temp_file.checksum, remote_path) 755 756 for subdir_temp_file in subdir_temp_files: 757 remote_path = posixpath.join(self.DEVICE_TEMP_DIR, 758 # BROKEN: http://b/25394682 759 # 'subdir'; 760 temp_file.base_name) 761 self._verify_remote(temp_file.checksum, remote_path) 762 763 764 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 765 finally: 766 if host_dir is not None: 767 shutil.rmtree(host_dir) 768 769 @requires_non_root 770 def test_push_error_reporting(self): 771 """Make sure that errors that occur while pushing a file get reported 772 773 Bug: http://b/26816782 774 """ 775 with tempfile.NamedTemporaryFile() as tmp_file: 776 tmp_file.write('\0' * 1024 * 1024) 777 tmp_file.flush() 778 try: 779 self.device.push(local=tmp_file.name, remote='/system/') 780 self.fail('push should not have succeeded') 781 except subprocess.CalledProcessError as e: 782 output = e.output 783 784 self.assertIn('Permission denied', output) 785 786 def _test_pull(self, remote_file, checksum): 787 tmp_write = tempfile.NamedTemporaryFile(mode='wb', delete=False) 788 tmp_write.close() 789 self.device.pull(remote=remote_file, local=tmp_write.name) 790 with open(tmp_write.name, 'rb') as tmp_read: 791 host_contents = tmp_read.read() 792 host_md5 = compute_md5(host_contents) 793 self.assertEqual(checksum, host_md5) 794 os.remove(tmp_write.name) 795 796 @requires_non_root 797 def test_pull_error_reporting(self): 798 self.device.shell(['touch', self.DEVICE_TEMP_FILE]) 799 self.device.shell(['chmod', 'a-rwx', self.DEVICE_TEMP_FILE]) 800 801 try: 802 output = self.device.pull(remote=self.DEVICE_TEMP_FILE, local='x') 803 except subprocess.CalledProcessError as e: 804 output = e.output 805 806 self.assertIn('Permission denied', output) 807 808 self.device.shell(['rm', '-f', self.DEVICE_TEMP_FILE]) 809 810 def test_pull(self): 811 """Pull a randomly generated file from specified device.""" 812 kbytes = 512 813 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_FILE]) 814 cmd = ['dd', 'if=/dev/urandom', 815 'of={}'.format(self.DEVICE_TEMP_FILE), 'bs=1024', 816 'count={}'.format(kbytes)] 817 self.device.shell(cmd) 818 dev_md5, _ = self.device.shell( 819 [get_md5_prog(self.device), self.DEVICE_TEMP_FILE])[0].split() 820 self._test_pull(self.DEVICE_TEMP_FILE, dev_md5) 821 self.device.shell_nocheck(['rm', self.DEVICE_TEMP_FILE]) 822 823 def test_pull_dir(self): 824 """Pull a randomly generated directory of files from the device.""" 825 try: 826 host_dir = tempfile.mkdtemp() 827 828 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 829 self.device.shell(['mkdir', '-p', self.DEVICE_TEMP_DIR]) 830 831 # Populate device directory with random files. 832 temp_files = make_random_device_files( 833 self.device, in_dir=self.DEVICE_TEMP_DIR, num_files=32) 834 835 self.device.pull(remote=self.DEVICE_TEMP_DIR, local=host_dir) 836 837 for temp_file in temp_files: 838 host_path = os.path.join( 839 host_dir, posixpath.basename(self.DEVICE_TEMP_DIR), 840 temp_file.base_name) 841 self._verify_local(temp_file.checksum, host_path) 842 843 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 844 finally: 845 if host_dir is not None: 846 shutil.rmtree(host_dir) 847 848 def test_pull_dir_symlink(self): 849 """Pull a directory into a symlink to a directory. 850 851 Bug: http://b/27362811 852 """ 853 if os.name != 'posix': 854 raise unittest.SkipTest('requires POSIX') 855 856 try: 857 host_dir = tempfile.mkdtemp() 858 real_dir = os.path.join(host_dir, 'dir') 859 symlink = os.path.join(host_dir, 'symlink') 860 os.mkdir(real_dir) 861 os.symlink(real_dir, symlink) 862 863 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 864 self.device.shell(['mkdir', '-p', self.DEVICE_TEMP_DIR]) 865 866 # Populate device directory with random files. 867 temp_files = make_random_device_files( 868 self.device, in_dir=self.DEVICE_TEMP_DIR, num_files=32) 869 870 self.device.pull(remote=self.DEVICE_TEMP_DIR, local=symlink) 871 872 for temp_file in temp_files: 873 host_path = os.path.join( 874 real_dir, posixpath.basename(self.DEVICE_TEMP_DIR), 875 temp_file.base_name) 876 self._verify_local(temp_file.checksum, host_path) 877 878 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 879 finally: 880 if host_dir is not None: 881 shutil.rmtree(host_dir) 882 883 def test_pull_dir_symlink_collision(self): 884 """Pull a directory into a colliding symlink to directory.""" 885 if os.name != 'posix': 886 raise unittest.SkipTest('requires POSIX') 887 888 try: 889 host_dir = tempfile.mkdtemp() 890 real_dir = os.path.join(host_dir, 'real') 891 tmp_dirname = os.path.basename(self.DEVICE_TEMP_DIR) 892 symlink = os.path.join(host_dir, tmp_dirname) 893 os.mkdir(real_dir) 894 os.symlink(real_dir, symlink) 895 896 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 897 self.device.shell(['mkdir', '-p', self.DEVICE_TEMP_DIR]) 898 899 # Populate device directory with random files. 900 temp_files = make_random_device_files( 901 self.device, in_dir=self.DEVICE_TEMP_DIR, num_files=32) 902 903 self.device.pull(remote=self.DEVICE_TEMP_DIR, local=host_dir) 904 905 for temp_file in temp_files: 906 host_path = os.path.join(real_dir, temp_file.base_name) 907 self._verify_local(temp_file.checksum, host_path) 908 909 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 910 finally: 911 if host_dir is not None: 912 shutil.rmtree(host_dir) 913 914 def test_pull_dir_nonexistent(self): 915 """Pull a directory of files from the device to a nonexistent path.""" 916 try: 917 host_dir = tempfile.mkdtemp() 918 dest_dir = os.path.join(host_dir, 'dest') 919 920 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 921 self.device.shell(['mkdir', '-p', self.DEVICE_TEMP_DIR]) 922 923 # Populate device directory with random files. 924 temp_files = make_random_device_files( 925 self.device, in_dir=self.DEVICE_TEMP_DIR, num_files=32) 926 927 self.device.pull(remote=self.DEVICE_TEMP_DIR, local=dest_dir) 928 929 for temp_file in temp_files: 930 host_path = os.path.join(dest_dir, temp_file.base_name) 931 self._verify_local(temp_file.checksum, host_path) 932 933 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 934 finally: 935 if host_dir is not None: 936 shutil.rmtree(host_dir) 937 938 def test_pull_symlink_dir(self): 939 """Pull a symlink to a directory of symlinks to files.""" 940 try: 941 host_dir = tempfile.mkdtemp() 942 943 remote_dir = posixpath.join(self.DEVICE_TEMP_DIR, 'contents') 944 remote_links = posixpath.join(self.DEVICE_TEMP_DIR, 'links') 945 remote_symlink = posixpath.join(self.DEVICE_TEMP_DIR, 'symlink') 946 947 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 948 self.device.shell(['mkdir', '-p', remote_dir, remote_links]) 949 self.device.shell(['ln', '-s', remote_links, remote_symlink]) 950 951 # Populate device directory with random files. 952 temp_files = make_random_device_files( 953 self.device, in_dir=remote_dir, num_files=32) 954 955 for temp_file in temp_files: 956 self.device.shell( 957 ['ln', '-s', '../contents/{}'.format(temp_file.base_name), 958 posixpath.join(remote_links, temp_file.base_name)]) 959 960 self.device.pull(remote=remote_symlink, local=host_dir) 961 962 for temp_file in temp_files: 963 host_path = os.path.join( 964 host_dir, 'symlink', temp_file.base_name) 965 self._verify_local(temp_file.checksum, host_path) 966 967 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 968 finally: 969 if host_dir is not None: 970 shutil.rmtree(host_dir) 971 972 def test_pull_empty(self): 973 """Pull a directory containing an empty directory from the device.""" 974 try: 975 host_dir = tempfile.mkdtemp() 976 977 remote_empty_path = posixpath.join(self.DEVICE_TEMP_DIR, 'empty') 978 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 979 self.device.shell(['mkdir', '-p', remote_empty_path]) 980 981 self.device.pull(remote=remote_empty_path, local=host_dir) 982 self.assertTrue(os.path.isdir(os.path.join(host_dir, 'empty'))) 983 finally: 984 if host_dir is not None: 985 shutil.rmtree(host_dir) 986 987 def test_multiple_pull(self): 988 """Pull a randomly generated directory of files from the device.""" 989 990 try: 991 host_dir = tempfile.mkdtemp() 992 993 subdir = posixpath.join(self.DEVICE_TEMP_DIR, 'subdir') 994 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 995 self.device.shell(['mkdir', '-p', subdir]) 996 997 # Create some random files and a subdirectory containing more files. 998 temp_files = make_random_device_files( 999 self.device, in_dir=self.DEVICE_TEMP_DIR, num_files=4) 1000 1001 subdir_temp_files = make_random_device_files( 1002 self.device, in_dir=subdir, num_files=4, prefix='subdir_') 1003 1004 paths = map(lambda temp_file: temp_file.full_path, temp_files) 1005 paths.append(subdir) 1006 self.device._simple_call(['pull'] + paths + [host_dir]) 1007 1008 for temp_file in temp_files: 1009 local_path = os.path.join(host_dir, temp_file.base_name) 1010 self._verify_local(temp_file.checksum, local_path) 1011 1012 for subdir_temp_file in subdir_temp_files: 1013 local_path = os.path.join(host_dir, 1014 'subdir', 1015 subdir_temp_file.base_name) 1016 self._verify_local(subdir_temp_file.checksum, local_path) 1017 1018 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 1019 finally: 1020 if host_dir is not None: 1021 shutil.rmtree(host_dir) 1022 1023 def test_sync(self): 1024 """Sync a randomly generated directory of files to specified device.""" 1025 1026 try: 1027 base_dir = tempfile.mkdtemp() 1028 1029 # Create mirror device directory hierarchy within base_dir. 1030 full_dir_path = base_dir + self.DEVICE_TEMP_DIR 1031 os.makedirs(full_dir_path) 1032 1033 # Create 32 random files within the host mirror. 1034 temp_files = make_random_host_files(in_dir=full_dir_path, num_files=32) 1035 1036 # Clean up any trash on the device. 1037 device = adb.get_device(product=base_dir) 1038 device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 1039 1040 device.sync('data') 1041 1042 # Confirm that every file on the device mirrors that on the host. 1043 for temp_file in temp_files: 1044 device_full_path = posixpath.join(self.DEVICE_TEMP_DIR, 1045 temp_file.base_name) 1046 dev_md5, _ = device.shell( 1047 [get_md5_prog(self.device), device_full_path])[0].split() 1048 self.assertEqual(temp_file.checksum, dev_md5) 1049 1050 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 1051 finally: 1052 if base_dir is not None: 1053 shutil.rmtree(base_dir) 1054 1055 def test_unicode_paths(self): 1056 """Ensure that we can support non-ASCII paths, even on Windows.""" 1057 name = u' ' 1058 1059 self.device.shell(['rm', '-f', '/data/local/tmp/adb-test-*']) 1060 remote_path = u'/data/local/tmp/adb-test-{}'.format(name) 1061 1062 ## push. 1063 tf = tempfile.NamedTemporaryFile('wb', suffix=name, delete=False) 1064 tf.close() 1065 self.device.push(tf.name, remote_path) 1066 os.remove(tf.name) 1067 self.assertFalse(os.path.exists(tf.name)) 1068 1069 # Verify that the device ended up with the expected UTF-8 path 1070 output = self.device.shell( 1071 ['ls', '/data/local/tmp/adb-test-*'])[0].strip() 1072 self.assertEqual(remote_path.encode('utf-8'), output) 1073 1074 # pull. 1075 self.device.pull(remote_path, tf.name) 1076 self.assertTrue(os.path.exists(tf.name)) 1077 os.remove(tf.name) 1078 self.device.shell(['rm', '-f', '/data/local/tmp/adb-test-*']) 1079 1080 1081 def main(): 1082 random.seed(0) 1083 if len(adb.get_devices()) > 0: 1084 suite = unittest.TestLoader().loadTestsFromName(__name__) 1085 unittest.TextTestRunner(verbosity=3).run(suite) 1086 else: 1087 print('Test suite must be run with attached devices') 1088 1089 1090 if __name__ == '__main__': 1091 main() 1092