Home | History | Annotate | Download | only in tests
      1 #!/usr/bin/env python2
      2 """Simple conformance test for adb.
      3 
      4 This script will use the available adb in path and run simple
      5 tests that attempt to touch all accessible attached devices.
      6 """
      7 import hashlib
      8 import os
      9 import pipes
     10 import random
     11 import re
     12 import shlex
     13 import subprocess
     14 import sys
     15 import tempfile
     16 import unittest
     17 
     18 
     19 def trace(cmd):
     20     """Print debug message if tracing enabled."""
     21     if False:
     22         print >> sys.stderr, cmd
     23 
     24 
     25 def call(cmd_str):
     26     """Run process and return output tuple (stdout, stderr, ret code)."""
     27     trace(cmd_str)
     28     process = subprocess.Popen(shlex.split(cmd_str),
     29                                stdout=subprocess.PIPE,
     30                                stderr=subprocess.PIPE)
     31     stdout, stderr = process.communicate()
     32     return stdout, stderr, process.returncode
     33 
     34 
     35 def call_combined(cmd_str):
     36     """Run process and return output tuple (stdout+stderr, ret code)."""
     37     trace(cmd_str)
     38     process = subprocess.Popen(shlex.split(cmd_str),
     39                                stdout=subprocess.PIPE,
     40                                stderr=subprocess.STDOUT)
     41     stdout, _ = process.communicate()
     42     return stdout, process.returncode
     43 
     44 
     45 def call_checked(cmd_str):
     46     """Run process and get stdout+stderr, raise an exception on trouble."""
     47     trace(cmd_str)
     48     return subprocess.check_output(shlex.split(cmd_str),
     49                                    stderr=subprocess.STDOUT)
     50 
     51 
     52 def call_checked_list(cmd_str):
     53     return call_checked(cmd_str).split('\n')
     54 
     55 
     56 def call_checked_list_skip(cmd_str):
     57     out_list = call_checked_list(cmd_str)
     58 
     59     def is_init_line(line):
     60         if (len(line) >= 3) and (line[0] == "*") and (line[-2] == "*"):
     61             return True
     62         else:
     63             return False
     64 
     65     return [line for line in out_list if not is_init_line(line)]
     66 
     67 
     68 def get_device_list():
     69     output = call_checked_list_skip("adb devices")
     70     dev_list = []
     71     for line in output[1:]:
     72         if line.strip() == "":
     73             continue
     74         device, _ = line.split()
     75         dev_list.append(device)
     76     return dev_list
     77 
     78 
     79 def get_attached_device_count():
     80     return len(get_device_list())
     81 
     82 
     83 def compute_md5(string):
     84     hsh = hashlib.md5()
     85     hsh.update(string)
     86     return hsh.hexdigest()
     87 
     88 
     89 class HostFile(object):
     90     def __init__(self, handle, md5):
     91         self.handle = handle
     92         self.md5 = md5
     93         self.full_path = handle.name
     94         self.base_name = os.path.basename(self.full_path)
     95 
     96 
     97 class DeviceFile(object):
     98     def __init__(self, md5, full_path):
     99         self.md5 = md5
    100         self.full_path = full_path
    101         self.base_name = os.path.basename(self.full_path)
    102 
    103 
    104 def make_random_host_files(in_dir, num_files, rand_size=True):
    105     files = {}
    106     min_size = 1 * (1 << 10)
    107     max_size = 16 * (1 << 10)
    108     fixed_size = min_size
    109 
    110     for _ in range(num_files):
    111         file_handle = tempfile.NamedTemporaryFile(dir=in_dir)
    112 
    113         if rand_size:
    114             size = random.randrange(min_size, max_size, 1024)
    115         else:
    116             size = fixed_size
    117         rand_str = os.urandom(size)
    118         file_handle.write(rand_str)
    119         file_handle.flush()
    120 
    121         md5 = compute_md5(rand_str)
    122         files[file_handle.name] = HostFile(file_handle, md5)
    123     return files
    124 
    125 
    126 def make_random_device_files(adb, in_dir, num_files, rand_size=True):
    127     files = {}
    128     min_size = 1 * (1 << 10)
    129     max_size = 16 * (1 << 10)
    130     fixed_size = min_size
    131 
    132     for i in range(num_files):
    133         if rand_size:
    134             size = random.randrange(min_size, max_size, 1024)
    135         else:
    136             size = fixed_size
    137 
    138         base_name = "device_tmpfile" + str(i)
    139         full_path = in_dir + "/" + base_name
    140 
    141         adb.shell("dd if=/dev/urandom of={} bs={} count=1".format(full_path,
    142                                                                   size))
    143         dev_md5, _ = adb.shell("md5sum {}".format(full_path)).split()
    144 
    145         files[full_path] = DeviceFile(dev_md5, full_path)
    146     return files
    147 
    148 
    149 class AdbWrapper(object):
    150     """Convenience wrapper object for the adb command."""
    151     def __init__(self, device=None, out_dir=None):
    152         self.device = device
    153         self.out_dir = out_dir
    154         self.adb_cmd = "adb "
    155         if self.device:
    156             self.adb_cmd += "-s {} ".format(device)
    157         if self.out_dir:
    158             self.adb_cmd += "-p {} ".format(out_dir)
    159 
    160     def shell(self, cmd):
    161         return call_checked(self.adb_cmd + "shell " + cmd)
    162 
    163     def shell_nocheck(self, cmd):
    164         return call_combined(self.adb_cmd + "shell " + cmd)
    165 
    166     def install(self, filename):
    167         return call_checked(self.adb_cmd + "install {}".format(pipes.quote(filename)))
    168 
    169     def push(self, local, remote):
    170         return call_checked(self.adb_cmd + "push {} {}".format(local, remote))
    171 
    172     def pull(self, remote, local):
    173         return call_checked(self.adb_cmd + "pull {} {}".format(remote, local))
    174 
    175     def sync(self, directory=""):
    176         return call_checked(self.adb_cmd + "sync {}".format(directory))
    177 
    178     def forward(self, local, remote):
    179         return call_checked(self.adb_cmd + "forward {} {}".format(local,
    180                                                                   remote))
    181 
    182     def tcpip(self, port):
    183         return call_checked(self.adb_cmd + "tcpip {}".format(port))
    184 
    185     def usb(self):
    186         return call_checked(self.adb_cmd + "usb")
    187 
    188     def root(self):
    189         return call_checked(self.adb_cmd + "root")
    190 
    191     def unroot(self):
    192         return call_checked(self.adb_cmd + "unroot")
    193 
    194     def forward_remove(self, local):
    195         return call_checked(self.adb_cmd + "forward --remove {}".format(local))
    196 
    197     def forward_remove_all(self):
    198         return call_checked(self.adb_cmd + "forward --remove-all")
    199 
    200     def connect(self, host):
    201         return call_checked(self.adb_cmd + "connect {}".format(host))
    202 
    203     def disconnect(self, host):
    204         return call_checked(self.adb_cmd + "disconnect {}".format(host))
    205 
    206     def reverse(self, remote, local):
    207         return call_checked(self.adb_cmd + "reverse {} {}".format(remote,
    208                                                                   local))
    209 
    210     def reverse_remove_all(self):
    211         return call_checked(self.adb_cmd + "reverse --remove-all")
    212 
    213     def reverse_remove(self, remote):
    214         return call_checked(
    215             self.adb_cmd + "reverse --remove {}".format(remote))
    216 
    217     def wait(self):
    218         return call_checked(self.adb_cmd + "wait-for-device")
    219 
    220 
    221 class AdbBasic(unittest.TestCase):
    222     def test_shell(self):
    223         """Check that we can at least cat a file."""
    224         adb = AdbWrapper()
    225         out = adb.shell("cat /proc/uptime")
    226         self.assertEqual(len(out.split()), 2)
    227         self.assertGreater(float(out.split()[0]), 0.0)
    228         self.assertGreater(float(out.split()[1]), 0.0)
    229 
    230     def test_help(self):
    231         """Make sure we get _something_ out of help."""
    232         out = call_checked("adb help")
    233         self.assertTrue(len(out) > 0)
    234 
    235     def test_version(self):
    236         """Get a version number out of the output of adb."""
    237         out = call_checked("adb version").split()
    238         version_num = False
    239         for item in out:
    240             if re.match(r"[\d+\.]*\d", item):
    241                 version_num = True
    242         self.assertTrue(version_num)
    243 
    244     def _test_root(self):
    245         adb = AdbWrapper()
    246         adb.root()
    247         adb.wait()
    248         self.assertEqual("root", adb.shell("id -un").strip())
    249 
    250     def _test_unroot(self):
    251         adb = AdbWrapper()
    252         adb.unroot()
    253         adb.wait()
    254         self.assertEqual("shell", adb.shell("id -un").strip())
    255 
    256     def test_root_unroot(self):
    257         """Make sure that adb root and adb unroot work, using id(1)."""
    258         adb = AdbWrapper()
    259         original_user = adb.shell("id -un").strip()
    260         try:
    261             if original_user == "root":
    262                 self._test_unroot()
    263                 self._test_root()
    264             elif original_user == "shell":
    265                 self._test_root()
    266                 self._test_unroot()
    267         finally:
    268             if original_user == "root":
    269                 adb.root()
    270             else:
    271                 adb.unroot()
    272             adb.wait()
    273 
    274     def test_argument_escaping(self):
    275         """Make sure that argument escaping is somewhat sane."""
    276         adb = AdbWrapper()
    277 
    278         # http://b/19734868
    279         # Note that this actually matches ssh(1)'s behavior --- it's
    280         # converted to "sh -c echo hello; echo world" which sh interprets
    281         # as "sh -c echo" (with an argument to that shell of "hello"),
    282         # and then "echo world" back in the first shell.
    283         result = adb.shell("sh -c 'echo hello; echo world'").splitlines()
    284         self.assertEqual(["", "world"], result)
    285         # If you really wanted "hello" and "world", here's what you'd do:
    286         result = adb.shell("echo hello\;echo world").splitlines()
    287         self.assertEqual(["hello", "world"], result)
    288 
    289         # http://b/15479704
    290         self.assertEqual('t', adb.shell("'true && echo t'").strip())
    291         self.assertEqual('t', adb.shell("sh -c 'true && echo t'").strip())
    292 
    293         # http://b/20564385
    294         self.assertEqual('t', adb.shell("FOO=a BAR=b echo t").strip())
    295         self.assertEqual('123Linux', adb.shell("echo -n 123\;uname").strip())
    296 
    297     def test_install_argument_escaping(self):
    298         """Make sure that install argument escaping works."""
    299         adb = AdbWrapper()
    300 
    301         # http://b/20323053
    302         tf = tempfile.NamedTemporaryFile("w", suffix="-text;ls;1.apk")
    303         self.assertIn("-text;ls;1.apk", adb.install(tf.name))
    304 
    305         # http://b/3090932
    306         tf = tempfile.NamedTemporaryFile("w", suffix="-Live Hold'em.apk")
    307         self.assertIn("-Live Hold'em.apk", adb.install(tf.name))
    308 
    309 
    310 class AdbFile(unittest.TestCase):
    311     SCRATCH_DIR = "/data/local/tmp"
    312     DEVICE_TEMP_FILE = SCRATCH_DIR + "/adb_test_file"
    313     DEVICE_TEMP_DIR = SCRATCH_DIR + "/adb_test_dir"
    314 
    315     def test_push(self):
    316         """Push a randomly generated file to specified device."""
    317         kbytes = 512
    318         adb = AdbWrapper()
    319         with tempfile.NamedTemporaryFile(mode="w") as tmp:
    320             rand_str = os.urandom(1024 * kbytes)
    321             tmp.write(rand_str)
    322             tmp.flush()
    323 
    324             host_md5 = compute_md5(rand_str)
    325             adb.shell_nocheck("rm -r {}".format(AdbFile.DEVICE_TEMP_FILE))
    326             try:
    327                 adb.push(local=tmp.name, remote=AdbFile.DEVICE_TEMP_FILE)
    328                 dev_md5, _ = adb.shell(
    329                     "md5sum {}".format(AdbFile.DEVICE_TEMP_FILE)).split()
    330                 self.assertEqual(host_md5, dev_md5)
    331             finally:
    332                 adb.shell_nocheck("rm {}".format(AdbFile.DEVICE_TEMP_FILE))
    333 
    334     # TODO: write push directory test.
    335 
    336     def test_pull(self):
    337         """Pull a randomly generated file from specified device."""
    338         kbytes = 512
    339         adb = AdbWrapper()
    340         adb.shell_nocheck("rm -r {}".format(AdbFile.DEVICE_TEMP_FILE))
    341         try:
    342             adb.shell("dd if=/dev/urandom of={} bs=1024 count={}".format(
    343                 AdbFile.DEVICE_TEMP_FILE, kbytes))
    344             dev_md5, _ = adb.shell(
    345                 "md5sum {}".format(AdbFile.DEVICE_TEMP_FILE)).split()
    346 
    347             with tempfile.NamedTemporaryFile(mode="w") as tmp_write:
    348                 adb.pull(remote=AdbFile.DEVICE_TEMP_FILE, local=tmp_write.name)
    349                 with open(tmp_write.name) as tmp_read:
    350                     host_contents = tmp_read.read()
    351                     host_md5 = compute_md5(host_contents)
    352                 self.assertEqual(dev_md5, host_md5)
    353         finally:
    354             adb.shell_nocheck("rm {}".format(AdbFile.DEVICE_TEMP_FILE))
    355 
    356     def test_pull_dir(self):
    357         """Pull a randomly generated directory of files from the device."""
    358         adb = AdbWrapper()
    359         temp_files = {}
    360         host_dir = None
    361         try:
    362             # create temporary host directory
    363             host_dir = tempfile.mkdtemp()
    364 
    365             # create temporary dir on device
    366             adb.shell_nocheck("rm -r {}".format(AdbFile.DEVICE_TEMP_DIR))
    367             adb.shell("mkdir -p {}".format(AdbFile.DEVICE_TEMP_DIR))
    368 
    369             # populate device dir with random files
    370             temp_files = make_random_device_files(
    371                 adb, in_dir=AdbFile.DEVICE_TEMP_DIR, num_files=32)
    372 
    373             adb.pull(remote=AdbFile.DEVICE_TEMP_DIR, local=host_dir)
    374 
    375             for device_full_path in temp_files:
    376                 host_path = os.path.join(
    377                     host_dir, temp_files[device_full_path].base_name)
    378                 with open(host_path) as host_file:
    379                     host_md5 = compute_md5(host_file.read())
    380                     self.assertEqual(host_md5,
    381                                      temp_files[device_full_path].md5)
    382         finally:
    383             for dev_file in temp_files.values():
    384                 host_path = os.path.join(host_dir, dev_file.base_name)
    385                 os.remove(host_path)
    386             adb.shell_nocheck("rm -r {}".format(AdbFile.DEVICE_TEMP_DIR))
    387             if host_dir:
    388                 os.removedirs(host_dir)
    389 
    390     def test_sync(self):
    391         """Sync a randomly generated directory of files to specified device."""
    392         try:
    393             adb = AdbWrapper()
    394             temp_files = {}
    395 
    396             # create temporary host directory
    397             base_dir = tempfile.mkdtemp()
    398 
    399             # create mirror device directory hierarchy within base_dir
    400             full_dir_path = base_dir + AdbFile.DEVICE_TEMP_DIR
    401             os.makedirs(full_dir_path)
    402 
    403             # create 32 random files within the host mirror
    404             temp_files = make_random_host_files(in_dir=full_dir_path,
    405                                                 num_files=32)
    406 
    407             # clean up any trash on the device
    408             adb = AdbWrapper(out_dir=base_dir)
    409             adb.shell_nocheck("rm -r {}".format(AdbFile.DEVICE_TEMP_DIR))
    410 
    411             # issue the sync
    412             adb.sync("data")
    413 
    414             # confirm that every file on the device mirrors that on the host
    415             for host_full_path in temp_files.keys():
    416                 device_full_path = os.path.join(
    417                     AdbFile.DEVICE_TEMP_DIR,
    418                     temp_files[host_full_path].base_name)
    419                 dev_md5, _ = adb.shell(
    420                     "md5sum {}".format(device_full_path)).split()
    421                 self.assertEqual(temp_files[host_full_path].md5, dev_md5)
    422 
    423         finally:
    424             adb.shell_nocheck("rm -r {}".format(AdbFile.DEVICE_TEMP_DIR))
    425             if temp_files:
    426                 for tf in temp_files.values():
    427                     tf.handle.close()
    428             if base_dir:
    429                 os.removedirs(base_dir + AdbFile.DEVICE_TEMP_DIR)
    430 
    431 
    432 if __name__ == '__main__':
    433     random.seed(0)
    434     dev_count = get_attached_device_count()
    435     if dev_count:
    436         suite = unittest.TestLoader().loadTestsFromName(__name__)
    437         unittest.TextTestRunner(verbosity=3).run(suite)
    438     else:
    439         print "Test suite must be run with attached devices"
    440