Home | History | Annotate | Download | only in adb
      1 #!/usr/bin/env python
      2 # -*- coding: utf-8 -*-
      3 #
      4 # Copyright (C) 2015 The Android Open Source Project
      5 #
      6 # Licensed under the Apache License, Version 2.0 (the "License");
      7 # you may not use this file except in compliance with the License.
      8 # You may obtain a copy of the License at
      9 #
     10 #      http://www.apache.org/licenses/LICENSE-2.0
     11 #
     12 # Unless required by applicable law or agreed to in writing, software
     13 # distributed under the License is distributed on an "AS IS" BASIS,
     14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     15 # See the License for the specific language governing permissions and
     16 # limitations under the License.
     17 #
     18 from __future__ import print_function
     19 
     20 import contextlib
     21 import hashlib
     22 import os
     23 import posixpath
     24 import random
     25 import re
     26 import shlex
     27 import shutil
     28 import signal
     29 import socket
     30 import string
     31 import subprocess
     32 import sys
     33 import tempfile
     34 import unittest
     35 
     36 import mock
     37 
     38 import adb
     39 
     40 
     41 def requires_root(func):
     42     def wrapper(self, *args):
     43         if self.device.get_prop('ro.debuggable') != '1':
     44             raise unittest.SkipTest('requires rootable build')
     45 
     46         was_root = self.device.shell(['id', '-un'])[0].strip() == 'root'
     47         if not was_root:
     48             self.device.root()
     49             self.device.wait()
     50 
     51         try:
     52             func(self, *args)
     53         finally:
     54             if not was_root:
     55                 self.device.unroot()
     56                 self.device.wait()
     57 
     58     return wrapper
     59 
     60 
     61 def requires_non_root(func):
     62     def wrapper(self, *args):
     63         was_root = self.device.shell(['id', '-un'])[0].strip() == 'root'
     64         if was_root:
     65             self.device.unroot()
     66             self.device.wait()
     67 
     68         try:
     69             func(self, *args)
     70         finally:
     71             if was_root:
     72                 self.device.root()
     73                 self.device.wait()
     74 
     75     return wrapper
     76 
     77 
     78 class GetDeviceTest(unittest.TestCase):
     79     def setUp(self):
     80         self.android_serial = os.getenv('ANDROID_SERIAL')
     81         if 'ANDROID_SERIAL' in os.environ:
     82             del os.environ['ANDROID_SERIAL']
     83 
     84     def tearDown(self):
     85         if self.android_serial is not None:
     86             os.environ['ANDROID_SERIAL'] = self.android_serial
     87         else:
     88             if 'ANDROID_SERIAL' in os.environ:
     89                 del os.environ['ANDROID_SERIAL']
     90 
     91     @mock.patch('adb.device.get_devices')
     92     def test_explicit(self, mock_get_devices):
     93         mock_get_devices.return_value = ['foo', 'bar']
     94         device = adb.get_device('foo')
     95         self.assertEqual(device.serial, 'foo')
     96 
     97     @mock.patch('adb.device.get_devices')
     98     def test_from_env(self, mock_get_devices):
     99         mock_get_devices.return_value = ['foo', 'bar']
    100         os.environ['ANDROID_SERIAL'] = 'foo'
    101         device = adb.get_device()
    102         self.assertEqual(device.serial, 'foo')
    103 
    104     @mock.patch('adb.device.get_devices')
    105     def test_arg_beats_env(self, mock_get_devices):
    106         mock_get_devices.return_value = ['foo', 'bar']
    107         os.environ['ANDROID_SERIAL'] = 'bar'
    108         device = adb.get_device('foo')
    109         self.assertEqual(device.serial, 'foo')
    110 
    111     @mock.patch('adb.device.get_devices')
    112     def test_no_such_device(self, mock_get_devices):
    113         mock_get_devices.return_value = ['foo', 'bar']
    114         self.assertRaises(adb.DeviceNotFoundError, adb.get_device, ['baz'])
    115 
    116         os.environ['ANDROID_SERIAL'] = 'baz'
    117         self.assertRaises(adb.DeviceNotFoundError, adb.get_device)
    118 
    119     @mock.patch('adb.device.get_devices')
    120     def test_unique_device(self, mock_get_devices):
    121         mock_get_devices.return_value = ['foo']
    122         device = adb.get_device()
    123         self.assertEqual(device.serial, 'foo')
    124 
    125     @mock.patch('adb.device.get_devices')
    126     def test_no_unique_device(self, mock_get_devices):
    127         mock_get_devices.return_value = ['foo', 'bar']
    128         self.assertRaises(adb.NoUniqueDeviceError, adb.get_device)
    129 
    130 
class DeviceTest(unittest.TestCase):
    """Base class for tests that talk to a device through ``self.device``."""

    def setUp(self):
        """Bind ``self.device`` to the result of adb.get_device()."""
        self.device = adb.get_device()
    134 
    135 
    136 class ForwardReverseTest(DeviceTest):
    137     def _test_no_rebind(self, description, direction_list, direction,
    138                        direction_no_rebind, direction_remove_all):
    139         msg = direction_list()
    140         self.assertEqual('', msg.strip(),
    141                          description + ' list must be empty to run this test.')
    142 
    143         # Use --no-rebind with no existing binding
    144         direction_no_rebind('tcp:5566', 'tcp:6655')
    145         msg = direction_list()
    146         self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg))
    147 
    148         # Use --no-rebind with existing binding
    149         with self.assertRaises(subprocess.CalledProcessError):
    150             direction_no_rebind('tcp:5566', 'tcp:6677')
    151         msg = direction_list()
    152         self.assertFalse(re.search(r'tcp:5566.+tcp:6677', msg))
    153         self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg))
    154 
    155         # Use the absence of --no-rebind with existing binding
    156         direction('tcp:5566', 'tcp:6677')
    157         msg = direction_list()
    158         self.assertFalse(re.search(r'tcp:5566.+tcp:6655', msg))
    159         self.assertTrue(re.search(r'tcp:5566.+tcp:6677', msg))
    160 
    161         direction_remove_all()
    162         msg = direction_list()
    163         self.assertEqual('', msg.strip())
    164 
    165     def test_forward_no_rebind(self):
    166         self._test_no_rebind('forward', self.device.forward_list,
    167                             self.device.forward, self.device.forward_no_rebind,
    168                             self.device.forward_remove_all)
    169 
    170     def test_reverse_no_rebind(self):
    171         self._test_no_rebind('reverse', self.device.reverse_list,
    172                             self.device.reverse, self.device.reverse_no_rebind,
    173                             self.device.reverse_remove_all)
    174 
    175     def test_forward(self):
    176         msg = self.device.forward_list()
    177         self.assertEqual('', msg.strip(),
    178                          'Forwarding list must be empty to run this test.')
    179         self.device.forward('tcp:5566', 'tcp:6655')
    180         msg = self.device.forward_list()
    181         self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg))
    182         self.device.forward('tcp:7788', 'tcp:8877')
    183         msg = self.device.forward_list()
    184         self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg))
    185         self.assertTrue(re.search(r'tcp:7788.+tcp:8877', msg))
    186         self.device.forward_remove('tcp:5566')
    187         msg = self.device.forward_list()
    188         self.assertFalse(re.search(r'tcp:5566.+tcp:6655', msg))
    189         self.assertTrue(re.search(r'tcp:7788.+tcp:8877', msg))
    190         self.device.forward_remove_all()
    191         msg = self.device.forward_list()
    192         self.assertEqual('', msg.strip())
    193 
    194     def test_reverse(self):
    195         msg = self.device.reverse_list()
    196         self.assertEqual('', msg.strip(),
    197                          'Reverse forwarding list must be empty to run this test.')
    198         self.device.reverse('tcp:5566', 'tcp:6655')
    199         msg = self.device.reverse_list()
    200         self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg))
    201         self.device.reverse('tcp:7788', 'tcp:8877')
    202         msg = self.device.reverse_list()
    203         self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg))
    204         self.assertTrue(re.search(r'tcp:7788.+tcp:8877', msg))
    205         self.device.reverse_remove('tcp:5566')
    206         msg = self.device.reverse_list()
    207         self.assertFalse(re.search(r'tcp:5566.+tcp:6655', msg))
    208         self.assertTrue(re.search(r'tcp:7788.+tcp:8877', msg))
    209         self.device.reverse_remove_all()
    210         msg = self.device.reverse_list()
    211         self.assertEqual('', msg.strip())
    212 
    213     # Note: If you run this test when adb connect'd to a physical device over
    214     # TCP, it will fail in adb reverse due to https://code.google.com/p/android/issues/detail?id=189821
    215     def test_forward_reverse_echo(self):
    216         """Send data through adb forward and read it back via adb reverse"""
    217         forward_port = 12345
    218         reverse_port = forward_port + 1
    219         forward_spec = 'tcp:' + str(forward_port)
    220         reverse_spec = 'tcp:' + str(reverse_port)
    221         forward_setup = False
    222         reverse_setup = False
    223 
    224         try:
    225             # listen on localhost:forward_port, connect to remote:forward_port
    226             self.device.forward(forward_spec, forward_spec)
    227             forward_setup = True
    228             # listen on remote:forward_port, connect to localhost:reverse_port
    229             self.device.reverse(forward_spec, reverse_spec)
    230             reverse_setup = True
    231 
    232             listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    233             with contextlib.closing(listener):
    234                 # Use SO_REUSEADDR so that subsequent runs of the test can grab
    235                 # the port even if it is in TIME_WAIT.
    236                 listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    237 
    238                 # Listen on localhost:reverse_port before connecting to
    239                 # localhost:forward_port because that will cause adb to connect
    240                 # back to localhost:reverse_port.
    241                 listener.bind(('127.0.0.1', reverse_port))
    242                 listener.listen(4)
    243 
    244                 client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    245                 with contextlib.closing(client):
    246                     # Connect to the listener.
    247                     client.connect(('127.0.0.1', forward_port))
    248 
    249                     # Accept the client connection.
    250                     accepted_connection, addr = listener.accept()
    251                     with contextlib.closing(accepted_connection) as server:
    252                         data = 'hello'
    253 
    254                         # Send data into the port setup by adb forward.
    255                         client.sendall(data)
    256                         # Explicitly close() so that server gets EOF.
    257                         client.close()
    258 
    259                         # Verify that the data came back via adb reverse.
    260                         self.assertEqual(data, server.makefile().read())
    261         finally:
    262             if reverse_setup:
    263                 self.device.reverse_remove(forward_spec)
    264             if forward_setup:
    265                 self.device.forward_remove(forward_spec)
    266 
    267 
    268 class ShellTest(DeviceTest):
    269     def _interactive_shell(self, shell_args, input):
    270         """Runs an interactive adb shell.
    271 
    272         Args:
    273           shell_args: List of string arguments to `adb shell`.
    274           input: String input to send to the interactive shell.
    275 
    276         Returns:
    277           The remote exit code.
    278 
    279         Raises:
    280           unittest.SkipTest: The device doesn't support exit codes.
    281         """
    282         if self.device.SHELL_PROTOCOL_FEATURE not in self.device.features:
    283             raise unittest.SkipTest('exit codes are unavailable on this device')
    284 
    285         proc = subprocess.Popen(
    286                 self.device.adb_cmd + ['shell'] + shell_args,
    287                 stdin=subprocess.PIPE, stdout=subprocess.PIPE,
    288                 stderr=subprocess.PIPE)
    289         # Closing host-side stdin doesn't trigger a PTY shell to exit so we need
    290         # to explicitly add an exit command to close the session from the device
    291         # side, plus the necessary newline to complete the interactive command.
    292         proc.communicate(input + '; exit\n')
    293         return proc.returncode
    294 
    295     def test_cat(self):
    296         """Check that we can at least cat a file."""
    297         out = self.device.shell(['cat', '/proc/uptime'])[0].strip()
    298         elements = out.split()
    299         self.assertEqual(len(elements), 2)
    300 
    301         uptime, idle = elements
    302         self.assertGreater(float(uptime), 0.0)
    303         self.assertGreater(float(idle), 0.0)
    304 
    305     def test_throws_on_failure(self):
    306         self.assertRaises(adb.ShellError, self.device.shell, ['false'])
    307 
    308     def test_output_not_stripped(self):
    309         out = self.device.shell(['echo', 'foo'])[0]
    310         self.assertEqual(out, 'foo' + self.device.linesep)
    311 
    312     def test_shell_nocheck_failure(self):
    313         rc, out, _ = self.device.shell_nocheck(['false'])
    314         self.assertNotEqual(rc, 0)
    315         self.assertEqual(out, '')
    316 
    317     def test_shell_nocheck_output_not_stripped(self):
    318         rc, out, _ = self.device.shell_nocheck(['echo', 'foo'])
    319         self.assertEqual(rc, 0)
    320         self.assertEqual(out, 'foo' + self.device.linesep)
    321 
    322     def test_can_distinguish_tricky_results(self):
    323         # If result checking on ADB shell is naively implemented as
    324         # `adb shell <cmd>; echo $?`, we would be unable to distinguish the
    325         # output from the result for a cmd of `echo -n 1`.
    326         rc, out, _ = self.device.shell_nocheck(['echo', '-n', '1'])
    327         self.assertEqual(rc, 0)
    328         self.assertEqual(out, '1')
    329 
    330     def test_line_endings(self):
    331         """Ensure that line ending translation is not happening in the pty.
    332 
    333         Bug: http://b/19735063
    334         """
    335         output = self.device.shell(['uname'])[0]
    336         self.assertEqual(output, 'Linux' + self.device.linesep)
    337 
    338     def test_pty_logic(self):
    339         """Tests that a PTY is allocated when it should be.
    340 
    341         PTY allocation behavior should match ssh; some behavior requires
    342         a terminal stdin to test so this test will be skipped if stdin
    343         is not a terminal.
    344         """
    345         if self.device.SHELL_PROTOCOL_FEATURE not in self.device.features:
    346             raise unittest.SkipTest('PTY arguments unsupported on this device')
    347         if not os.isatty(sys.stdin.fileno()):
    348             raise unittest.SkipTest('PTY tests require stdin terminal')
    349 
    350         def check_pty(args):
    351             """Checks adb shell PTY allocation.
    352 
    353             Tests |args| for terminal and non-terminal stdin.
    354 
    355             Args:
    356                 args: -Tt args in a list (e.g. ['-t', '-t']).
    357 
    358             Returns:
    359                 A tuple (<terminal>, <non-terminal>). True indicates
    360                 the corresponding shell allocated a remote PTY.
    361             """
    362             test_cmd = self.device.adb_cmd + ['shell'] + args + ['[ -t 0 ]']
    363 
    364             terminal = subprocess.Popen(
    365                     test_cmd, stdin=None,
    366                     stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    367             terminal.communicate()
    368 
    369             non_terminal = subprocess.Popen(
    370                     test_cmd, stdin=subprocess.PIPE,
    371                     stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    372             non_terminal.communicate()
    373 
    374             return (terminal.returncode == 0, non_terminal.returncode == 0)
    375 
    376         # -T: never allocate PTY.
    377         self.assertEqual((False, False), check_pty(['-T']))
    378 
    379         # No args: PTY only if stdin is a terminal and shell is interactive,
    380         # which is difficult to reliably test from a script.
    381         self.assertEqual((False, False), check_pty([]))
    382 
    383         # -t: PTY if stdin is a terminal.
    384         self.assertEqual((True, False), check_pty(['-t']))
    385 
    386         # -t -t: always allocate PTY.
    387         self.assertEqual((True, True), check_pty(['-t', '-t']))
    388 
    389     def test_shell_protocol(self):
    390         """Tests the shell protocol on the device.
    391 
    392         If the device supports shell protocol, this gives us the ability
    393         to separate stdout/stderr and return the exit code directly.
    394 
    395         Bug: http://b/19734861
    396         """
    397         if self.device.SHELL_PROTOCOL_FEATURE not in self.device.features:
    398             raise unittest.SkipTest('shell protocol unsupported on this device')
    399 
    400         # Shell protocol should be used by default.
    401         result = self.device.shell_nocheck(
    402                 shlex.split('echo foo; echo bar >&2; exit 17'))
    403         self.assertEqual(17, result[0])
    404         self.assertEqual('foo' + self.device.linesep, result[1])
    405         self.assertEqual('bar' + self.device.linesep, result[2])
    406 
    407         self.assertEqual(17, self._interactive_shell([], 'exit 17'))
    408 
    409         # -x flag should disable shell protocol.
    410         result = self.device.shell_nocheck(
    411                 shlex.split('-x echo foo; echo bar >&2; exit 17'))
    412         self.assertEqual(0, result[0])
    413         self.assertEqual('foo{0}bar{0}'.format(self.device.linesep), result[1])
    414         self.assertEqual('', result[2])
    415 
    416         self.assertEqual(0, self._interactive_shell(['-x'], 'exit 17'))
    417 
    418     def test_non_interactive_sigint(self):
    419         """Tests that SIGINT in a non-interactive shell kills the process.
    420 
    421         This requires the shell protocol in order to detect the broken
    422         pipe; raw data transfer mode will only see the break once the
    423         subprocess tries to read or write.
    424 
    425         Bug: http://b/23825725
    426         """
    427         if self.device.SHELL_PROTOCOL_FEATURE not in self.device.features:
    428             raise unittest.SkipTest('shell protocol unsupported on this device')
    429 
    430         # Start a long-running process.
    431         sleep_proc = subprocess.Popen(
    432                 self.device.adb_cmd + shlex.split('shell echo $$; sleep 60'),
    433                 stdin=subprocess.PIPE, stdout=subprocess.PIPE,
    434                 stderr=subprocess.STDOUT)
    435         remote_pid = sleep_proc.stdout.readline().strip()
    436         self.assertIsNone(sleep_proc.returncode, 'subprocess terminated early')
    437         proc_query = shlex.split('ps {0} | grep {0}'.format(remote_pid))
    438 
    439         # Verify that the process is running, send signal, verify it stopped.
    440         self.device.shell(proc_query)
    441         os.kill(sleep_proc.pid, signal.SIGINT)
    442         sleep_proc.communicate()
    443         self.assertEqual(1, self.device.shell_nocheck(proc_query)[0],
    444                          'subprocess failed to terminate')
    445 
    446     def test_non_interactive_stdin(self):
    447         """Tests that non-interactive shells send stdin."""
    448         if self.device.SHELL_PROTOCOL_FEATURE not in self.device.features:
    449             raise unittest.SkipTest('non-interactive stdin unsupported '
    450                                     'on this device')
    451 
    452         # Test both small and large inputs.
    453         small_input = 'foo'
    454         large_input = '\n'.join(c * 100 for c in (string.ascii_letters +
    455                                                   string.digits))
    456 
    457         for input in (small_input, large_input):
    458             proc = subprocess.Popen(self.device.adb_cmd + ['shell', 'cat'],
    459                                     stdin=subprocess.PIPE,
    460                                     stdout=subprocess.PIPE,
    461                                     stderr=subprocess.PIPE)
    462             stdout, stderr = proc.communicate(input)
    463             self.assertEqual(input.splitlines(), stdout.splitlines())
    464             self.assertEqual('', stderr)
    465 
    466 
    467 class ArgumentEscapingTest(DeviceTest):
    468     def test_shell_escaping(self):
    469         """Make sure that argument escaping is somewhat sane."""
    470 
    471         # http://b/19734868
    472         # Note that this actually matches ssh(1)'s behavior --- it's
    473         # converted to `sh -c echo hello; echo world` which sh interprets
    474         # as `sh -c echo` (with an argument to that shell of "hello"),
    475         # and then `echo world` back in the first shell.
    476         result = self.device.shell(
    477             shlex.split("sh -c 'echo hello; echo world'"))[0]
    478         result = result.splitlines()
    479         self.assertEqual(['', 'world'], result)
    480         # If you really wanted "hello" and "world", here's what you'd do:
    481         result = self.device.shell(
    482             shlex.split(r'echo hello\;echo world'))[0].splitlines()
    483         self.assertEqual(['hello', 'world'], result)
    484 
    485         # http://b/15479704
    486         result = self.device.shell(shlex.split("'true && echo t'"))[0].strip()
    487         self.assertEqual('t', result)
    488         result = self.device.shell(
    489             shlex.split("sh -c 'true && echo t'"))[0].strip()
    490         self.assertEqual('t', result)
    491 
    492         # http://b/20564385
    493         result = self.device.shell(shlex.split('FOO=a BAR=b echo t'))[0].strip()
    494         self.assertEqual('t', result)
    495         result = self.device.shell(
    496             shlex.split(r'echo -n 123\;uname'))[0].strip()
    497         self.assertEqual('123Linux', result)
    498 
    499     def test_install_argument_escaping(self):
    500         """Make sure that install argument escaping works."""
    501         # http://b/20323053, http://b/3090932.
    502         for file_suffix in ('-text;ls;1.apk', "-Live Hold'em.apk"):
    503             tf = tempfile.NamedTemporaryFile('wb', suffix=file_suffix,
    504                                              delete=False)
    505             tf.close()
    506 
    507             # Installing bogus .apks fails if the device supports exit codes.
    508             try:
    509                 output = self.device.install(tf.name)
    510             except subprocess.CalledProcessError as e:
    511                 output = e.output
    512 
    513             self.assertIn(file_suffix, output)
    514             os.remove(tf.name)
    515 
    516 
    517 class RootUnrootTest(DeviceTest):
    518     def _test_root(self):
    519         message = self.device.root()
    520         if 'adbd cannot run as root in production builds' in message:
    521             return
    522         self.device.wait()
    523         self.assertEqual('root', self.device.shell(['id', '-un'])[0].strip())
    524 
    525     def _test_unroot(self):
    526         self.device.unroot()
    527         self.device.wait()
    528         self.assertEqual('shell', self.device.shell(['id', '-un'])[0].strip())
    529 
    530     def test_root_unroot(self):
    531         """Make sure that adb root and adb unroot work, using id(1)."""
    532         if self.device.get_prop('ro.debuggable') != '1':
    533             raise unittest.SkipTest('requires rootable build')
    534 
    535         original_user = self.device.shell(['id', '-un'])[0].strip()
    536         try:
    537             if original_user == 'root':
    538                 self._test_unroot()
    539                 self._test_root()
    540             elif original_user == 'shell':
    541                 self._test_root()
    542                 self._test_unroot()
    543         finally:
    544             if original_user == 'root':
    545                 self.device.root()
    546             else:
    547                 self.device.unroot()
    548             self.device.wait()
    549 
    550 
    551 class TcpIpTest(DeviceTest):
    552     def test_tcpip_failure_raises(self):
    553         """adb tcpip requires a port.
    554 
    555         Bug: http://b/22636927
    556         """
    557         self.assertRaises(
    558             subprocess.CalledProcessError, self.device.tcpip, '')
    559         self.assertRaises(
    560             subprocess.CalledProcessError, self.device.tcpip, 'foo')
    561 
    562 
    563 class SystemPropertiesTest(DeviceTest):
    564     def test_get_prop(self):
    565         self.assertEqual(self.device.get_prop('init.svc.adbd'), 'running')
    566 
    567     @requires_root
    568     def test_set_prop(self):
    569         prop_name = 'foo.bar'
    570         self.device.shell(['setprop', prop_name, '""'])
    571 
    572         self.device.set_prop(prop_name, 'qux')
    573         self.assertEqual(
    574             self.device.shell(['getprop', prop_name])[0].strip(), 'qux')
    575 
    576 
    577 def compute_md5(string):
    578     hsh = hashlib.md5()
    579     hsh.update(string)
    580     return hsh.hexdigest()
    581 
    582 
    583 def get_md5_prog(device):
    584     """Older platforms (pre-L) had the name md5 rather than md5sum."""
    585     try:
    586         device.shell(['md5sum', '/proc/uptime'])
    587         return 'md5sum'
    588     except adb.ShellError:
    589         return 'md5'
    590 
    591 
    592 class HostFile(object):
    593     def __init__(self, handle, checksum):
    594         self.handle = handle
    595         self.checksum = checksum
    596         self.full_path = handle.name
    597         self.base_name = os.path.basename(self.full_path)
    598 
    599 
    600 class DeviceFile(object):
    601     def __init__(self, checksum, full_path):
    602         self.checksum = checksum
    603         self.full_path = full_path
    604         self.base_name = posixpath.basename(self.full_path)
    605 
    606 
    607 def make_random_host_files(in_dir, num_files):
    608     min_size = 1 * (1 << 10)
    609     max_size = 16 * (1 << 10)
    610 
    611     files = []
    612     for _ in xrange(num_files):
    613         file_handle = tempfile.NamedTemporaryFile(dir=in_dir, delete=False)
    614 
    615         size = random.randrange(min_size, max_size, 1024)
    616         rand_str = os.urandom(size)
    617         file_handle.write(rand_str)
    618         file_handle.flush()
    619         file_handle.close()
    620 
    621         md5 = compute_md5(rand_str)
    622         files.append(HostFile(file_handle, md5))
    623     return files
    624 
    625 
    626 def make_random_device_files(device, in_dir, num_files, prefix='device_tmpfile'):
    627     min_size = 1 * (1 << 10)
    628     max_size = 16 * (1 << 10)
    629 
    630     files = []
    631     for file_num in xrange(num_files):
    632         size = random.randrange(min_size, max_size, 1024)
    633 
    634         base_name = prefix + str(file_num)
    635         full_path = posixpath.join(in_dir, base_name)
    636 
    637         device.shell(['dd', 'if=/dev/urandom', 'of={}'.format(full_path),
    638                       'bs={}'.format(size), 'count=1'])
    639         dev_md5, _ = device.shell([get_md5_prog(device), full_path])[0].split()
    640 
    641         files.append(DeviceFile(dev_md5, full_path))
    642     return files
    643 
    644 
    645 class FileOperationsTest(DeviceTest):
    646     SCRATCH_DIR = '/data/local/tmp'
    647     DEVICE_TEMP_FILE = SCRATCH_DIR + '/adb_test_file'
    648     DEVICE_TEMP_DIR = SCRATCH_DIR + '/adb_test_dir'
    649 
    def _verify_remote(self, checksum, remote_path):
        """Asserts that the device file's MD5 equals the given checksum.

        Args:
          checksum: Expected MD5 hex digest.
          remote_path: Path of the file on the device.
        """
        md5_cmd = [get_md5_prog(self.device), remote_path]
        md5_stdout = self.device.shell(md5_cmd)[0]
        # The output is split into exactly two fields; the first is the
        # checksum.
        dev_md5, _ = md5_stdout.split()
        self.assertEqual(checksum, dev_md5)
    654 
    def _verify_local(self, checksum, local_path):
        """Asserts that the host file's MD5 equals the given checksum.

        Args:
          checksum: Expected MD5 hex digest.
          local_path: Path of the file on the host.
        """
        with open(local_path, 'rb') as host_file:
            contents = host_file.read()
        self.assertEqual(compute_md5(contents), checksum)
    659 
    def test_push(self):
        """Push a randomly generated file to specified device."""
        kbytes = 512
        rand_str = os.urandom(1024 * kbytes)

        # delete=False: the file must outlive close() so adb can read it by
        # name; it is removed explicitly in the finally block.
        tmp = tempfile.NamedTemporaryFile(mode='wb', delete=False)
        try:
            tmp.write(rand_str)
            tmp.close()

            self.device.shell(['rm', '-rf', self.DEVICE_TEMP_FILE])
            self.device.push(local=tmp.name, remote=self.DEVICE_TEMP_FILE)

            self._verify_remote(compute_md5(rand_str), self.DEVICE_TEMP_FILE)
            self.device.shell(['rm', '-f', self.DEVICE_TEMP_FILE])
        finally:
            # Remove the host file even when the push or the verification
            # fails.  close() is a no-op if the handle is already closed.
            tmp.close()
            os.remove(tmp.name)
    675 
    def test_push_dir(self):
        """Push a randomly generated directory of files to the device."""
        self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
        self.device.shell(['mkdir', self.DEVICE_TEMP_DIR])

        # Created before the try block so the finally clause always has a
        # directory to remove; if mkdtemp itself fails, its own error
        # propagates instead of a NameError from the cleanup code.
        host_dir = tempfile.mkdtemp()
        try:
            # Make sure the temp directory isn't setuid, or else adb will complain.
            os.chmod(host_dir, 0o700)

            # Create 32 random files.
            temp_files = make_random_host_files(in_dir=host_dir, num_files=32)
            self.device.push(host_dir, self.DEVICE_TEMP_DIR)

            # The files are expected under a subdirectory of the target
            # named after the pushed host directory.
            for temp_file in temp_files:
                remote_path = posixpath.join(self.DEVICE_TEMP_DIR,
                                             os.path.basename(host_dir),
                                             temp_file.base_name)
                self._verify_remote(temp_file.checksum, remote_path)
            self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
        finally:
            shutil.rmtree(host_dir)
    700 
    @unittest.expectedFailure # b/25566053
    def test_push_empty(self):
        """Push a directory containing an empty directory to the device."""
        self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
        self.device.shell(['mkdir', self.DEVICE_TEMP_DIR])

        # Bound before the try block so that the finally clause sees None
        # (rather than an unbound name) if mkdtemp() itself raises.
        host_dir = None
        try:
            host_dir = tempfile.mkdtemp()

            # Make sure the temp directory isn't setuid, or else adb will
            # complain.
            os.chmod(host_dir, 0o700)

            # Create an empty directory.
            os.mkdir(os.path.join(host_dir, 'empty'))

            self.device.push(host_dir, self.DEVICE_TEMP_DIR)

            # Device paths are always POSIX paths, whatever the host OS.
            # host_dir is pushed into the already existing DEVICE_TEMP_DIR,
            # so look for the empty directory under host_dir's basename (the
            # same layout test_push_dir verifies for pushed files).
            remote_empty = posixpath.join(self.DEVICE_TEMP_DIR,
                                          os.path.basename(host_dir),
                                          'empty')

            # `[` needs its closing `]`; without it the command fails no
            # matter whether the directory exists.
            test_empty_cmd = ['[', '-d', remote_empty, ']']
            rc, _, _ = self.device.shell_nocheck(test_empty_cmd)
            self.assertEqual(rc, 0)
            self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
        finally:
            if host_dir is not None:
                shutil.rmtree(host_dir)
    726 
    def test_multiple_push(self):
        """Push multiple files to the device in one adb push command.

        Bug: http://b/25324823
        """

        self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
        self.device.shell(['mkdir', self.DEVICE_TEMP_DIR])

        # Bound before the try block so that the finally clause sees None
        # (rather than an unbound name) if mkdtemp() itself raises.
        host_dir = None
        try:
            host_dir = tempfile.mkdtemp()

            # Create some random files and a subdirectory containing more
            # files.
            temp_files = make_random_host_files(in_dir=host_dir, num_files=4)

            subdir = os.path.join(host_dir, 'subdir')
            os.mkdir(subdir)
            subdir_temp_files = make_random_host_files(in_dir=subdir,
                                                       num_files=4)

            # A real list (not a lazy map object) so that the subdirectory
            # can be appended on Python 3 as well.
            paths = [temp_file.full_path for temp_file in temp_files]
            paths.append(subdir)
            self.device._simple_call(['push'] + paths + [self.DEVICE_TEMP_DIR])

            for temp_file in temp_files:
                remote_path = posixpath.join(self.DEVICE_TEMP_DIR,
                                             temp_file.base_name)
                self._verify_remote(temp_file.checksum, remote_path)

            # Verify the subdirectory's own files; using the loop variable
            # here matters, otherwise the last top-level file is simply
            # re-checked.
            for subdir_temp_file in subdir_temp_files:
                remote_path = posixpath.join(self.DEVICE_TEMP_DIR,
                                             # BROKEN: http://b/25394682
                                             # 'subdir';
                                             subdir_temp_file.base_name)
                self._verify_remote(subdir_temp_file.checksum, remote_path)

            self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
        finally:
            if host_dir is not None:
                shutil.rmtree(host_dir)
    768 
    @requires_non_root
    def test_push_error_reporting(self):
        """Make sure that errors that occur while pushing a file get reported

        Bug: http://b/26816782
        """
        with tempfile.NamedTemporaryFile() as tmp_file:
            # NamedTemporaryFile opens in binary mode by default, so write a
            # bytes literal; a text string is rejected on Python 3.
            tmp_file.write(b'\0' * 1024 * 1024)
            # Flush so adb sees the full contents when it opens the file by
            # name.
            tmp_file.flush()
            try:
                self.device.push(local=tmp_file.name, remote='/system/')
                self.fail('push should not have succeeded')
            except subprocess.CalledProcessError as e:
                output = e.output

            self.assertIn('Permission denied', output)
    785 
    def _test_pull(self, remote_file, checksum):
        """Pull remote_file to a temporary host file and check its MD5.

        Args:
            remote_file: Path on the device of the file to pull.
            checksum: Expected digest; compared with compute_md5() of the
                pulled contents.
        """
        # Only a unique host path is needed; close the handle right away so
        # that adb can write to the file by name.
        tmp_write = tempfile.NamedTemporaryFile(mode='wb', delete=False)
        tmp_write.close()
        try:
            self.device.pull(remote=remote_file, local=tmp_write.name)
            with open(tmp_write.name, 'rb') as tmp_read:
                host_md5 = compute_md5(tmp_read.read())
            self.assertEqual(checksum, host_md5)
        finally:
            # Clean up even when the pull or the comparison fails. The
            # existence check keeps a missing file from masking the original
            # error.
            if os.path.exists(tmp_write.name):
                os.remove(tmp_write.name)
    795 
    @requires_non_root
    def test_pull_error_reporting(self):
        """Make sure that errors that occur while pulling a file get reported."""
        # Create a device file that nobody has permission to read.
        self.device.shell(['touch', self.DEVICE_TEMP_FILE])
        self.device.shell(['chmod', 'a-rwx', self.DEVICE_TEMP_FILE])

        try:
            try:
                output = self.device.pull(remote=self.DEVICE_TEMP_FILE,
                                          local='x')
            except subprocess.CalledProcessError as e:
                output = e.output

            self.assertIn('Permission denied', output)
        finally:
            # Remove the unreadable file even if the assertion fails.
            self.device.shell(['rm', '-f', self.DEVICE_TEMP_FILE])
    809 
    def test_pull(self):
        """Create a random file on the device and pull it back to the host."""
        num_kbytes = 512
        remote_file = self.DEVICE_TEMP_FILE

        # Start from a clean slate, then fill the file from /dev/urandom.
        self.device.shell(['rm', '-rf', remote_file])
        dd_cmd = [
            'dd',
            'if=/dev/urandom',
            'of={}'.format(remote_file),
            'bs=1024',
            'count={}'.format(num_kbytes),
        ]
        self.device.shell(dd_cmd)

        # The checksum tool's stdout is split into exactly two fields; the
        # first one is used as the device-side digest.
        md5_cmd = [get_md5_prog(self.device), remote_file]
        md5_stdout = self.device.shell(md5_cmd)[0]
        dev_md5, _ = md5_stdout.split()

        self._test_pull(remote_file, dev_md5)
        self.device.shell_nocheck(['rm', remote_file])
    822 
    def test_pull_dir(self):
        """Pull a randomly generated directory of files from the device."""
        # Bound before the try block so that the finally clause sees None
        # (rather than an unbound name) if mkdtemp() itself raises.
        host_dir = None
        try:
            host_dir = tempfile.mkdtemp()

            self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
            self.device.shell(['mkdir', '-p', self.DEVICE_TEMP_DIR])

            # Populate device directory with random files.
            temp_files = make_random_device_files(
                self.device, in_dir=self.DEVICE_TEMP_DIR, num_files=32)

            self.device.pull(remote=self.DEVICE_TEMP_DIR, local=host_dir)

            # Each file is looked up under
            # host_dir/<basename of DEVICE_TEMP_DIR>/ on the host.
            for temp_file in temp_files:
                host_path = os.path.join(
                    host_dir, posixpath.basename(self.DEVICE_TEMP_DIR),
                    temp_file.base_name)
                self._verify_local(temp_file.checksum, host_path)

            self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
        finally:
            if host_dir is not None:
                shutil.rmtree(host_dir)
    847 
    def test_pull_dir_symlink(self):
        """Pull a directory into a symlink to a directory.

        Bug: http://b/27362811
        """
        if os.name != 'posix':
            raise unittest.SkipTest('requires POSIX')

        # Bound before the try block so that the finally clause sees None
        # (rather than an unbound name) if mkdtemp() itself raises.
        host_dir = None
        try:
            host_dir = tempfile.mkdtemp()
            real_dir = os.path.join(host_dir, 'dir')
            symlink = os.path.join(host_dir, 'symlink')
            os.mkdir(real_dir)
            os.symlink(real_dir, symlink)

            self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
            self.device.shell(['mkdir', '-p', self.DEVICE_TEMP_DIR])

            # Populate device directory with random files.
            temp_files = make_random_device_files(
                self.device, in_dir=self.DEVICE_TEMP_DIR, num_files=32)

            # Pull through the symlink; the files are then checked inside
            # the directory the symlink points at.
            self.device.pull(remote=self.DEVICE_TEMP_DIR, local=symlink)

            for temp_file in temp_files:
                host_path = os.path.join(
                    real_dir, posixpath.basename(self.DEVICE_TEMP_DIR),
                    temp_file.base_name)
                self._verify_local(temp_file.checksum, host_path)

            self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
        finally:
            if host_dir is not None:
                shutil.rmtree(host_dir)
    882 
    def test_pull_dir_symlink_collision(self):
        """Pull a directory into a colliding symlink to directory."""
        if os.name != 'posix':
            raise unittest.SkipTest('requires POSIX')

        # Bound before the try block so that the finally clause sees None
        # (rather than an unbound name) if mkdtemp() itself raises.
        host_dir = None
        try:
            host_dir = tempfile.mkdtemp()
            real_dir = os.path.join(host_dir, 'real')
            # DEVICE_TEMP_DIR is a device (POSIX) path, so take its basename
            # with posixpath. The symlink is given that same name so that it
            # collides with the directory being pulled into host_dir.
            tmp_dirname = posixpath.basename(self.DEVICE_TEMP_DIR)
            symlink = os.path.join(host_dir, tmp_dirname)
            os.mkdir(real_dir)
            os.symlink(real_dir, symlink)

            self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
            self.device.shell(['mkdir', '-p', self.DEVICE_TEMP_DIR])

            # Populate device directory with random files.
            temp_files = make_random_device_files(
                self.device, in_dir=self.DEVICE_TEMP_DIR, num_files=32)

            self.device.pull(remote=self.DEVICE_TEMP_DIR, local=host_dir)

            # The files are expected in the symlink's target directory.
            for temp_file in temp_files:
                host_path = os.path.join(real_dir, temp_file.base_name)
                self._verify_local(temp_file.checksum, host_path)

            self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
        finally:
            if host_dir is not None:
                shutil.rmtree(host_dir)
    913 
    def test_pull_dir_nonexistent(self):
        """Pull a directory of files from the device to a nonexistent path."""
        # Bound before the try block so that the finally clause sees None
        # (rather than an unbound name) if mkdtemp() itself raises.
        host_dir = None
        try:
            host_dir = tempfile.mkdtemp()
            # Never created by the test itself, so the pull target does not
            # exist beforehand.
            dest_dir = os.path.join(host_dir, 'dest')

            self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
            self.device.shell(['mkdir', '-p', self.DEVICE_TEMP_DIR])

            # Populate device directory with random files.
            temp_files = make_random_device_files(
                self.device, in_dir=self.DEVICE_TEMP_DIR, num_files=32)

            self.device.pull(remote=self.DEVICE_TEMP_DIR, local=dest_dir)

            # The files are expected directly inside dest_dir.
            for temp_file in temp_files:
                host_path = os.path.join(dest_dir, temp_file.base_name)
                self._verify_local(temp_file.checksum, host_path)

            self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
        finally:
            if host_dir is not None:
                shutil.rmtree(host_dir)
    937 
    def test_pull_symlink_dir(self):
        """Pull a symlink to a directory of symlinks to files."""
        # Bound before the try block so that the finally clause sees None
        # (rather than an unbound name) if mkdtemp() itself raises.
        host_dir = None
        try:
            host_dir = tempfile.mkdtemp()

            # Device layout built below:
            #   contents/  the real files
            #   links/     one relative symlink per file in contents/
            #   symlink    a symlink to links/
            remote_dir = posixpath.join(self.DEVICE_TEMP_DIR, 'contents')
            remote_links = posixpath.join(self.DEVICE_TEMP_DIR, 'links')
            remote_symlink = posixpath.join(self.DEVICE_TEMP_DIR, 'symlink')

            self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
            self.device.shell(['mkdir', '-p', remote_dir, remote_links])
            self.device.shell(['ln', '-s', remote_links, remote_symlink])

            # Populate device directory with random files.
            temp_files = make_random_device_files(
                self.device, in_dir=remote_dir, num_files=32)

            for temp_file in temp_files:
                self.device.shell(
                    ['ln', '-s', '../contents/{}'.format(temp_file.base_name),
                     posixpath.join(remote_links, temp_file.base_name)])

            self.device.pull(remote=remote_symlink, local=host_dir)

            # The pulled files are expected under host_dir/symlink/ with the
            # contents of the files the links point at.
            for temp_file in temp_files:
                host_path = os.path.join(
                    host_dir, 'symlink', temp_file.base_name)
                self._verify_local(temp_file.checksum, host_path)

            self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
        finally:
            if host_dir is not None:
                shutil.rmtree(host_dir)
    971 
    def test_pull_empty(self):
        """Pull a directory containing an empty directory from the device."""
        # Bound before the try block so that the finally clause sees None
        # (rather than an unbound name) if mkdtemp() itself raises.
        host_dir = None
        try:
            host_dir = tempfile.mkdtemp()

            remote_empty_path = posixpath.join(self.DEVICE_TEMP_DIR, 'empty')
            self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
            self.device.shell(['mkdir', '-p', remote_empty_path])

            self.device.pull(remote=remote_empty_path, local=host_dir)
            self.assertTrue(os.path.isdir(os.path.join(host_dir, 'empty')))

            # Leave the device clean, like the other pull tests do.
            self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
        finally:
            if host_dir is not None:
                shutil.rmtree(host_dir)
    986 
    def test_multiple_pull(self):
        """Pull multiple files and a directory in one adb pull command."""

        # Bound before the try block so that the finally clause sees None
        # (rather than an unbound name) if mkdtemp() itself raises.
        host_dir = None
        try:
            host_dir = tempfile.mkdtemp()

            subdir = posixpath.join(self.DEVICE_TEMP_DIR, 'subdir')
            self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
            self.device.shell(['mkdir', '-p', subdir])

            # Create some random files and a subdirectory containing more
            # files.
            temp_files = make_random_device_files(
                self.device, in_dir=self.DEVICE_TEMP_DIR, num_files=4)

            subdir_temp_files = make_random_device_files(
                self.device, in_dir=subdir, num_files=4, prefix='subdir_')

            # A real list (not a lazy map object) so that the subdirectory
            # can be appended on Python 3 as well.
            paths = [temp_file.full_path for temp_file in temp_files]
            paths.append(subdir)
            self.device._simple_call(['pull'] + paths + [host_dir])

            # Top-level files are expected directly inside host_dir.
            for temp_file in temp_files:
                local_path = os.path.join(host_dir, temp_file.base_name)
                self._verify_local(temp_file.checksum, local_path)

            # Files from the device subdirectory are expected under
            # host_dir/subdir/.
            for subdir_temp_file in subdir_temp_files:
                local_path = os.path.join(host_dir,
                                          'subdir',
                                          subdir_temp_file.base_name)
                self._verify_local(subdir_temp_file.checksum, local_path)

            self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
        finally:
            if host_dir is not None:
                shutil.rmtree(host_dir)
   1022 
   1023     def test_sync(self):
   1024         """Sync a randomly generated directory of files to specified device."""
   1025 
   1026         try:
   1027             base_dir = tempfile.mkdtemp()
   1028 
   1029             # Create mirror device directory hierarchy within base_dir.
   1030             full_dir_path = base_dir + self.DEVICE_TEMP_DIR
   1031             os.makedirs(full_dir_path)
   1032 
   1033             # Create 32 random files within the host mirror.
   1034             temp_files = make_random_host_files(in_dir=full_dir_path, num_files=32)
   1035 
   1036             # Clean up any trash on the device.
   1037             device = adb.get_device(product=base_dir)
   1038             device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
   1039 
   1040             device.sync('data')
   1041 
   1042             # Confirm that every file on the device mirrors that on the host.
   1043             for temp_file in temp_files:
   1044                 device_full_path = posixpath.join(self.DEVICE_TEMP_DIR,
   1045                                                   temp_file.base_name)
   1046                 dev_md5, _ = device.shell(
   1047                     [get_md5_prog(self.device), device_full_path])[0].split()
   1048                 self.assertEqual(temp_file.checksum, dev_md5)
   1049 
   1050             self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
   1051         finally:
   1052             if base_dir is not None:
   1053                 shutil.rmtree(base_dir)
   1054 
   1055     def test_unicode_paths(self):
   1056         """Ensure that we can support non-ASCII paths, even on Windows."""
   1057         name = u' '
   1058 
   1059         self.device.shell(['rm', '-f', '/data/local/tmp/adb-test-*'])
   1060         remote_path = u'/data/local/tmp/adb-test-{}'.format(name)
   1061 
   1062         ## push.
   1063         tf = tempfile.NamedTemporaryFile('wb', suffix=name, delete=False)
   1064         tf.close()
   1065         self.device.push(tf.name, remote_path)
   1066         os.remove(tf.name)
   1067         self.assertFalse(os.path.exists(tf.name))
   1068 
   1069         # Verify that the device ended up with the expected UTF-8 path
   1070         output = self.device.shell(
   1071                 ['ls', '/data/local/tmp/adb-test-*'])[0].strip()
   1072         self.assertEqual(remote_path.encode('utf-8'), output)
   1073 
   1074         # pull.
   1075         self.device.pull(remote_path, tf.name)
   1076         self.assertTrue(os.path.exists(tf.name))
   1077         os.remove(tf.name)
   1078         self.device.shell(['rm', '-f', '/data/local/tmp/adb-test-*'])
   1079 
   1080 
   1081 def main():
   1082     random.seed(0)
   1083     if len(adb.get_devices()) > 0:
   1084         suite = unittest.TestLoader().loadTestsFromName(__name__)
   1085         unittest.TextTestRunner(verbosity=3).run(suite)
   1086     else:
   1087         print('Test suite must be run with attached devices')
   1088 
   1089 
   1090 if __name__ == '__main__':
   1091     main()
   1092