#!/usr/bin/env python3.4
#
# Copyright (C) 2016 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import json
import logging
import os

from vts.runners.host import asserts
from vts.runners.host import base_test
from vts.runners.host import const
from vts.runners.host import keys
from vts.runners.host import test_runner
from vts.utils.python.controllers import adb
from vts.utils.python.controllers import android_device
from vts.utils.python.os import path_utils

from vts.testcases.security.poc.host import poc_test_config as config


class SecurityPoCKernelTest(base_test.BaseTestClass):
    """Runs security PoC kernel test cases.

    Attributes:
        _dut: AndroidDevice, the device under test as config
        _testcases: string list, list of testcases to run
    """

    def setUpClass(self):
        """Creates device under test instance and selects the test cases."""
        required_params = [
            keys.ConfigKeys.IKEY_DATA_FILE_PATH,
            keys.ConfigKeys.IKEY_ABI_BITNESS,
            config.ConfigKeys.RUN_STAGING
        ]
        # NOTE(review): getUserParams presumably exposes each requested key
        # as an attribute on self (data_file_path, run_staging, ... are read
        # below) -- confirm against base_test.BaseTestClass.
        self.getUserParams(required_params)

        logging.info("%s: %s", keys.ConfigKeys.IKEY_DATA_FILE_PATH,
                     self.data_file_path)

        self._dut = self.registerController(android_device, False)[0]

        # Work on a copy: "+=" extends a list in place, so aliasing the
        # module-level list would append the staging cases to
        # config.POC_TEST_CASES_STABLE itself and leak them into any later
        # user of that constant within the same process.
        self._testcases = list(config.POC_TEST_CASES_STABLE)
        if self.run_staging:
            self._testcases += config.POC_TEST_CASES_STAGING

    def tearDownClass(self):
        """Deletes all copied data."""
        self._dut.adb.shell("rm -rf %s" % config.POC_TEST_DIR)

    def PushFiles(self):
        """adb pushes related file to target."""
        self._dut.adb.shell("mkdir %s -p" % config.POC_TEST_DIR)

        # Binaries are read from "nativetest64" for a 64-bit ABI and from
        # "nativetest" for anything else.
        bitness = getattr(self, keys.ConfigKeys.IKEY_ABI_BITNESS)
        bitness_suffix = "64" if bitness == "64" else ""
        native_test_dir = "nativetest{0}".format(bitness_suffix)
        push_src = os.path.join(self.data_file_path, "DATA", native_test_dir,
                                "security", "poc", ".")
        self._dut.adb.push("%s %s" % (push_src, config.POC_TEST_DIR))

    def CreateHostInput(self, testcase):
        """Gathers information that will be passed to target-side code.

        Reads the device model via "getprop ro.product.model" and looks it up
        in the "target_models" section of the test case's poc.config file.

        Args:
            testcase: string, format testsuite/testname, specifies which
                      test case to examine.

        Returns:
            dict, information passed to native PoC test, contains info
            collected from device and config. If None, poc should be skipped.
        """
        out = self._dut.adb.shell("getprop ro.product.model")
        device_model = out.strip()
        testcase_path = os.path.join(*testcase.split("/"))
        test_config_path = os.path.join(
            self.data_file_path, "vts", "testcases", "security", "poc",
            "target", testcase_path, "poc.config")

        with open(test_config_path) as test_config_file:
            poc_config = json.load(test_config_file)["target_models"]

        # If dut model is not in the test config, test should be skipped.
        if device_model not in poc_config:
            return None

        # Start from a copy of the "default" parameters so that layering the
        # model-specific values on top does not modify the parsed config.
        params = dict(poc_config.get("default", {}))
        params.update(poc_config[device_model])

        return {
            "device_model": device_model,
            "params": params
        }

    def CreateTestFlags(self, host_input):
        """Packs host input info into command line flags.

        Args:
            host_input: dict, information passed to native PoC test.

        Returns:
            string, host_input packed into command-line flags.
        """
        device_model_flag = "--device_model=\"%s\"" % host_input["device_model"]

        # Parameters are flattened into a single "k1=v1,k2=v2" flag value.
        params = ",".join(
            "%s=%s" % (k, v) for k, v in host_input["params"].items())
        params_flag = "--params=\"%s\"" % params

        return " ".join([device_model_flag, params_flag])

    def RunTestcase(self, testcase):
        """Runs the given testcase and asserts the result.

        Args:
            testcase: string, format testsuite/testname, specifies which
                      test case to run.
        """
        host_input = self.CreateHostInput(testcase)
        asserts.skipIf(not host_input,
                       "%s not configured to run against this target model."
                       % testcase)

        # Only the part before the first "/" names the suite directory.
        testsuite = testcase.split("/", 1)[0]

        chmod_cmd = "chmod -R 755 %s" % path_utils.JoinTargetPath(
            config.POC_TEST_DIR, testsuite)
        logging.info("Executing: %s", chmod_cmd)
        self._dut.adb.shell(chmod_cmd)

        test_flags = self.CreateTestFlags(host_input)
        test_cmd = "%s %s" % (
            path_utils.JoinTargetPath(config.POC_TEST_DIR, testcase),
            test_flags)
        logging.info("Executing: %s", test_cmd)

        try:
            stdout = self._dut.adb.shell(test_cmd)
            # The shell call returned without raising, so the run is
            # recorded with exit code 0 and an empty stderr.
            result = {
                const.STDOUT: stdout,
                const.STDERR: "",
                const.EXIT_CODE: 0
            }
        except adb.AdbError as e:
            # The error object carries the output and return code of the
            # failed command; they are evaluated in AssertTestResult.
            result = {
                const.STDOUT: e.stdout,
                const.STDERR: e.stderr,
                const.EXIT_CODE: e.ret_code
            }
        logging.info("Test results:\n%s", result)

        self.AssertTestResult(result)

    def AssertTestResult(self, result):
        """Asserts that testcase finished as expected.

        Checks that device is in responsive state. If not, waits for boot,
        restores root adb and the pushed files, then reports the test as a
        failure. If it is, skips the test when the exit code is
        POC_TEST_SKIP and fails it when the exit code is POC_TEST_FAIL.

        Args:
            result: dict(str, str, int), stdout, stderr and return code
                    from test run.
        """
        if self._dut.hasBooted():
            exit_code = result[const.EXIT_CODE]
            asserts.skipIf(exit_code == config.ExitCode.POC_TEST_SKIP,
                           "Test case was skipped.")
            asserts.assertFalse(exit_code == config.ExitCode.POC_TEST_FAIL,
                                "Test case failed.")
        else:
            # Bring the device back to a usable state for the remaining
            # test cases before reporting this one as failed.
            self._dut.waitForBootCompletion()
            self._dut.rootAdb()
            self.PushFiles()
            asserts.fail("Test case left the device in unresponsive state.")

    def generateSecurityPoCTests(self):
        """Runs security PoC tests."""
        self.PushFiles()
        self.runGeneratedTests(
            test_func=self.RunTestcase,
            settings=self._testcases,
            # "/" in "testsuite/testname" is replaced to form the test name.
            name_func=lambda x: x.replace('/', '_'))


if __name__ == "__main__":
    test_runner.main()