#
# Copyright (C) 2016 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import logging
import os

from vts.runners.host import asserts
from vts.runners.host import base_test
from vts.runners.host import const
from vts.runners.host import keys
from vts.runners.host import test_runner
from vts.utils.python.controllers import adb
# Needed by setUpClass(), which registers the android_device controller.
from vts.utils.python.controllers import android_device

from vts.utils.python.common import list_utils
from vts.utils.python.os import path_utils

from vts.testcases.template.llvmfuzzer_test import llvmfuzzer_test_config as config


class LLVMFuzzerTest(base_test.BaseTestClass):
    """Runs fuzzer tests on target.

    Attributes:
        _dut: AndroidDevice, the device under test as config
        _testcases: string list, list of testcases to run
    """

    def setUpClass(self):
        """Reads user params, registers the device and creates the test dir."""
        required_params = [
            keys.ConfigKeys.IKEY_DATA_FILE_PATH,
            config.ConfigKeys.FUZZER_CONFIGS
        ]
        # data_file_path and fuzzer_configs are read from self right after
        # this call; presumably getUserParams() sets them as attributes.
        self.getUserParams(required_params)

        # A real list (not a lazy map object) so that it logs readably and
        # can be iterated more than once on both Python 2 and Python 3.
        self._testcases = [str(name) for name in self.fuzzer_configs]

        logging.info("Testcases: %s", self._testcases)
        logging.info("%s: %s", keys.ConfigKeys.IKEY_DATA_FILE_PATH,
                     self.data_file_path)
        logging.info("%s: %s", config.ConfigKeys.FUZZER_CONFIGS,
                     self.fuzzer_configs)

        self._dut = self.registerController(android_device, False)[0]
        self._dut.adb.shell("mkdir %s -p" % config.FUZZER_TEST_DIR)

    def tearDownClass(self):
        """Deletes all copied data."""
        # setUpClass() can fail before _dut is assigned; in that case there
        # is nothing on the device to clean up.
        dut = getattr(self, "_dut", None)
        if dut is None:
            return
        dut.adb.shell("rm -rf %s" % config.FUZZER_TEST_DIR)

    def PushFiles(self, testcase):
        """adb pushes testcase file to target.

        Args:
            testcase: string, path to executable fuzzer.
        """
        push_src = os.path.join(self.data_file_path, config.FUZZER_SRC_DIR,
                                testcase)
        self._dut.adb.push("%s %s" % (push_src, config.FUZZER_TEST_DIR))
        logging.info("Adb pushed: %s", testcase)

    def CreateFuzzerFlags(self, fuzzer_config):
        """Creates flags for the fuzzer executable.

        The result has the form "<vts flags> -- <llvm fuzzer flags>".

        Args:
            fuzzer_config: dict, contains configuration for the fuzzer.

        Returns:
            string, command line flags for fuzzer executable.
        """

        def _SerializeVTSFuzzerParams(params):
            """Creates VTS command line flags for fuzzer executable.

            Args:
                params: dict, contains flags and their values.

            Returns:
                string, of form "--<flag0>=<val0> --<flag1>=<val1> ... "
            """
            VTS_SPEC_FILES = "vts_spec_files"
            VTS_EXEC_SIZE = "vts_exec_size"
            DELIMITER = ":"

            # vts_spec_files is a string list, will be serialized like this:
            # [a, b, c] -> "a:b:c"
            vts_spec_files = params.get(VTS_SPEC_FILES, [])
            target_vts_spec_files = DELIMITER.join(
                path_utils.JoinTargetPath(config.FUZZER_SPEC_DIR, spec_file)
                for spec_file in vts_spec_files)
            flags = "--%s=\"%s\" " % (VTS_SPEC_FILES, target_vts_spec_files)

            # NOTE(review): when vts_exec_size is absent the flag is emitted
            # as "--vts_exec_size={}". Kept as-is to preserve the command
            # line; confirm whether the flag should be omitted instead.
            vts_exec_size = params.get(VTS_EXEC_SIZE, {})
            flags += "--%s=%s" % (VTS_EXEC_SIZE, vts_exec_size)
            return flags

        def _SerializeLLVMFuzzerParams(params):
            """Creates LLVM libfuzzer command line flags for fuzzer executable.

            Args:
                params: dict, contains flags and their values.

            Returns:
                string, of form "-<flag0>=<val0> -<flag1>=<val1> ... "
            """
            return " ".join(["-%s=%s" % (k, v) for k, v in params.items()])

        vts_fuzzer_params = fuzzer_config.get("vts_fuzzer_params", {})

        # Start from a copy of the template defaults so that per-fuzzer
        # overrides never leak into config.FUZZER_PARAMS.
        llvmfuzzer_params = config.FUZZER_PARAMS.copy()
        llvmfuzzer_params.update(fuzzer_config.get("llvmfuzzer_params", {}))

        vts_fuzzer_flags = _SerializeVTSFuzzerParams(vts_fuzzer_params)
        llvmfuzzer_flags = _SerializeLLVMFuzzerParams(llvmfuzzer_params)

        return vts_fuzzer_flags + " -- " + llvmfuzzer_flags

    def CreateCorpus(self, fuzzer, fuzzer_config):
        """Creates a corpus directory on target.

        Each entry of fuzzer_config["corpus"] is written to its own file
        named "input<index>" inside the corpus directory.

        Args:
            fuzzer: string, name of the fuzzer executable.
            fuzzer_config: dict, contains configuration for the fuzzer.

        Returns:
            string, path to corpus directory on the target.
        """
        corpus = fuzzer_config.get("corpus", [])
        corpus_dir = path_utils.JoinTargetPath(config.FUZZER_TEST_DIR,
                                               "%s_corpus" % fuzzer)

        self._dut.adb.shell("mkdir %s -p" % corpus_dir)
        for idx, corpus_entry in enumerate(corpus):
            # Every "x" becomes "\x" so that `echo -e` on the device expands
            # the entry as hex escapes (presumably entries look like
            # "x01x02" in the config -- confirm against the config files).
            corpus_entry = corpus_entry.replace("x", "\\x")
            corpus_entry_file = path_utils.JoinTargetPath(
                corpus_dir, "input%s" % idx)
            cmd = "echo -ne '%s' > %s" % (str(corpus_entry), corpus_entry_file)
            # Vts shell drive doesn't play nicely with escape characters,
            # so we use adb shell.
            self._dut.adb.shell("\"%s\"" % cmd)

        return corpus_dir

    def RunTestcase(self, fuzzer):
        """Runs the given testcase and asserts the result.

        Pushes the fuzzer binary, prepares its flags and corpus, runs it via
        adb shell and hands the collected result to AssertTestResult().

        Args:
            fuzzer: string, name of fuzzer executable.
        """
        self.PushFiles(fuzzer)

        fuzzer_config = self.fuzzer_configs.get(fuzzer, {})
        test_flags = self.CreateFuzzerFlags(fuzzer_config)
        corpus_dir = self.CreateCorpus(fuzzer, fuzzer_config)

        chmod_cmd = "chmod -R 755 %s" % path_utils.JoinTargetPath(
            config.FUZZER_TEST_DIR, fuzzer)
        self._dut.adb.shell(chmod_cmd)

        cd_cmd = "cd %s" % config.FUZZER_TEST_DIR
        ld_path = ("LD_LIBRARY_PATH=/data/local/tmp/64:/data/local/tmp/32:"
                   "$LD_LIBRARY_PATH")
        test_cmd = "./%s" % fuzzer

        fuzz_cmd = "%s && %s %s %s %s > /dev/null" % (cd_cmd, ld_path,
                                                      test_cmd, corpus_dir,
                                                      test_flags)
        logging.info("Executing: %s", fuzz_cmd)
        # TODO(trong): vts shell doesn't handle timeouts properly, change this after it does.
        try:
            stdout = self._dut.adb.shell("'%s'" % fuzz_cmd)
            # No AdbError was raised, so the run is recorded as exit code 0.
            result = {
                const.STDOUT: stdout,
                const.STDERR: "",
                const.EXIT_CODE: 0
            }
        except adb.AdbError as e:
            result = {
                const.STDOUT: e.stdout,
                const.STDERR: e.stderr,
                const.EXIT_CODE: e.ret_code
            }
        self.AssertTestResult(fuzzer, result)

    def LogCrashReport(self, fuzzer):
        r"""Logs crash-causing fuzzer input.

        Reads the crash report file and logs the contents in format:
        "\x01\x23\x45\x67\x89\xab\xcd\xef"

        Args:
            fuzzer: string, name of fuzzer executable.
        """
        cmd = "xxd -p %s" % config.FUZZER_TEST_CRASH_REPORT

        # output is string of a hexdump from crash report file.
        # From the example above, output would be "0123456789abcdef".
        output = self._dut.adb.shell(cmd)
        remove_chars = ["\r", "\t", "\n", " "]
        for char in remove_chars:
            output = output.replace(char, "")

        # output is guaranteed to be even in length since its a hexdump;
        # prefix every two-character byte with "\x".
        crash_report = "".join(
            "\\x%s" % output[offset:offset + 2]
            for offset in range(0, len(output), 2))

        logging.info('FUZZER_TEST_CRASH_REPORT for %s: "%s"', fuzzer,
                     crash_report)

    # TODO(trong): differentiate between crashes and sanitizer rule violations.
    def AssertTestResult(self, fuzzer, result):
        """Asserts that testcase finished as expected.

        Checks that device is in responsive state. If not, waits for boot
        then reports test as failure. If it is, asserts that all test commands
        returned exit code 0.

        Args:
            fuzzer: string, name of fuzzer executable.
            result: dict(str, str, int), command results from shell.
        """
        logging.info("Test result: %s", result)
        if not self._dut.hasBooted():
            # Wait first so the device is usable again for later testcases.
            self._dut.waitForBootCompletion()
            asserts.fail("%s left the device in unresponsive state." % fuzzer)

        exit_code = result[const.EXIT_CODE]
        if exit_code == config.ExitCode.FUZZER_TEST_FAIL:
            self.LogCrashReport(fuzzer)
            asserts.fail("%s failed normally." % fuzzer)
        elif exit_code != config.ExitCode.FUZZER_TEST_PASS:
            asserts.fail("%s failed abnormally." % fuzzer)

    def generateFuzzerTests(self):
        """Runs fuzzer tests."""
        self.runGeneratedTests(
            test_func=self.RunTestcase,
            settings=self._testcases,
            # Name each generated test after the fuzzer's basename.
            name_func=lambda x: x.split("/")[-1])


if __name__ == "__main__":
    test_runner.main()