#!/usr/bin/env python

"""
This is a generic fuzz testing tool, see --help for more information.
"""

import os
import sys
import random
import subprocess
import itertools


def _read_file(path):
    """Return the raw bytes stored at `path`.

    Binary mode is used so the data later written back by
    TestApplication.revert() is exactly what was read.
    """
    with open(path, 'rb') as f:
        return f.read()


def _write_file(path, data):
    """Overwrite `path` with `data`, closing (and so flushing) the file."""
    with open(path, 'wb') as f:
        f.write(data)


def _to_bytes(value):
    """Return `value` as a byte string, encoding text as UTF-8."""
    if isinstance(value, bytes):
        return value
    return value.encode('utf-8')


def _line_and_column(data, pos):
    """Return the 1-based (line, column) of byte offset `pos` in `data`."""
    line = 1 + data.count(b'\n', 0, pos)
    # rfind() yields -1 when there is no earlier newline, which makes the
    # column pos + 1, i.e. 1-based on the first line.
    column = pos - data.rfind(b'\n', 0, pos)
    return line, column


class TestGenerator(object):
    """Enumerates every single-position mutation of a set of input files.

    A test is a tuple (kind, position, insert_str, picked_position):

      kind            - 'nothing', 'delete', 'insert' or 'replace'
      position        - (input index, byte offset), None for 'nothing'
      insert_str      - the bytes to insert/replace with, else None
      picked_position - (input index, byte offset) when pick_input is
                        enabled, else None
    """

    def __init__(self, inputs, delete, insert, replace,
                 insert_strings, pick_input):
        """Read the input files and compute the size of the test space.

        inputs         - paths of the files to fuzz
        delete         - enable single character deletion tests
        insert         - enable string insertion tests
        replace        - enable character replacement tests
        insert_strings - strings used by insertion/replacement tests
        pick_input     - additionally select an input position per test
        """
        self.inputs = [(path, _read_file(path)) for path in inputs]

        self.delete = bool(delete)
        self.insert = bool(insert)
        self.replace = bool(replace)
        self.pick_input = bool(pick_input)
        # Stored as bytes so they can be spliced into the file data.
        self.insert_strings = [_to_bytes(s) for s in insert_strings]

        self.num_positions = sum(len(data) for _, data in self.inputs)
        self.num_insert_strings = len(self.insert_strings)

        # Use the normalized flags so a truthy non-bool argument counts once.
        tests_per_position = (int(self.delete) +
                              (int(self.insert) + int(self.replace)) *
                              self.num_insert_strings)
        # The extra test is the 'nothing' test (index 0).
        self.num_tests = tests_per_position * self.num_positions + 1

        if self.pick_input:
            self.num_tests *= self.num_positions

    def position_to_source_index(self, position):
        """Map a global byte position to (input index, offset in input).

        Raises ValueError if `position` is past the end of the last input.
        """
        for i, (_, data) in enumerate(self.inputs):
            n = len(data)
            if position < n:
                return (i, position)
            position -= n
        raise ValueError('Invalid position.')

    def get_test(self, index):
        """Decode test number `index` into a test tuple (see class doc)."""
        assert 0 <= index < self.num_tests

        picked_position = None
        if self.pick_input:
            # The low "digit" of the index selects the picked position.
            index, picked_position = divmod(index, self.num_positions)
            picked_position = self.position_to_source_index(picked_position)

        if index == 0:
            return ('nothing', None, None, picked_position)

        index -= 1
        index, position = divmod(index, self.num_positions)
        position = self.position_to_source_index(position)
        if self.delete:
            if index == 0:
                return ('delete', position, None, picked_position)
            index -= 1

        index, insert_index = divmod(index, self.num_insert_strings)
        insert_str = self.insert_strings[insert_index]
        if self.insert:
            if index == 0:
                return ('insert', position, insert_str, picked_position)
            index -= 1

        assert self.replace
        assert index == 0
        return ('replace', position, insert_str, picked_position)


class TestApplication(object):
    """Applies one test from a TestGenerator to the input files on disk."""

    def __init__(self, tg, test):
        """Bind a test tuple `test` to the TestGenerator `tg` it came from."""
        self.tg = tg
        self.test = test

    def apply(self):
        """Overwrite the affected input file with its fuzzed contents.

        Raises ValueError if the test kind is not recognized.
        """
        kind = self.test[0]
        if kind == 'nothing':
            return

        i, j = self.test[1]
        name, data = self.tg.inputs[i]
        if kind == 'delete':
            data = data[:j] + data[j+1:]
        elif kind == 'insert':
            data = data[:j] + _to_bytes(self.test[2]) + data[j:]
        elif kind == 'replace':
            data = data[:j] + _to_bytes(self.test[2]) + data[j+1:]
        else:
            # self.test is a tuple, so it must be wrapped to format as one
            # value.
            raise ValueError('Invalid test %r' % (self.test,))
        _write_file(name, data)

    def revert(self):
        """Restore the affected input file to the contents read at start."""
        if self.test[0] != 'nothing':
            i, j = self.test[1]
            name, data = self.tg.inputs[i]
            _write_file(name, data)


def quote(str):
    """Return `str` wrapped in double quotes (no escaping is performed)."""
    return '"' + str + '"'


def run_one_test(test_application, index, input_files, args):
    """Run the test command for an already applied test and report it.

    test_application - the TestApplication whose test is being run
    index            - the test index, used for interpolation and log names
    input_files      - the list of input file paths
    args             - the command; each argument is %-interpolated

    Reads the global `opts` set by main(). Prints 'FAIL: <index>' when the
    exit code is not an expected one, and 'PASS: <index>' otherwise unless
    --succinct was given.
    """
    test = test_application.test

    # Interpolate arguments.
    options = {'index': index,
               'inputs': ' '.join(quote(f) for f in input_files)}

    # Add picked input interpolation arguments, if used.
    if test[3] is not None:
        file_index, pos = test[3]
        options['picked_input'] = input_files[file_index]
        options['picked_input_pos'] = pos
        # Compute the line and column.
        file_data = test_application.tg.inputs[file_index][1]
        line, column = _line_and_column(file_data, pos)
        options['picked_input_line'] = line
        options['picked_input_col'] = column

    test_args = [a % options for a in args]
    if opts.verbose:
        print('%s: note: executing %r' % (sys.argv[0], test_args))

    stdout = None
    stderr = None
    stdout_log_path = None
    stderr_log_path = None
    try:
        if opts.log_dir:
            stdout_log_path = os.path.join(opts.log_dir, '%s.out' % index)
            stderr_log_path = os.path.join(opts.log_dir, '%s.err' % index)
            stdout = open(stdout_log_path, 'wb')
            stderr = open(stderr_log_path, 'wb')
        else:
            # The child shares our stdout; flush so output stays ordered.
            sys.stdout.flush()
        p = subprocess.Popen(test_args, stdout=stdout, stderr=stderr)
        p.communicate()
        exit_code = p.wait()
    finally:
        # Close the logs even if the command could not be started.
        for log in (stdout, stderr):
            if log is not None:
                log.close()

    test_result = (exit_code == opts.expected_exit_code or
                   exit_code in opts.extra_exit_codes)

    # Remove the logs for passes, unless logging all results.
    if stdout_log_path is not None and not opts.log_all and test_result:
        os.remove(stdout_log_path)
        os.remove(stderr_log_path)

    if not test_result:
        print('FAIL: %d' % index)
    elif not opts.succinct:
        print('PASS: %d' % index)


def build_option_parser():
    """Return the optparse.OptionParser describing the command line."""
    from optparse import OptionParser, OptionGroup
    parser = OptionParser("""%prog [options] ... test command args ...

%prog is a tool for fuzzing inputs and testing them.

The most basic usage is something like:

  $ %prog --file foo.txt ./test.sh

which will run a default list of fuzzing strategies on the input. For each
fuzzed input, it will overwrite the input files (in place), run the test script,
then restore the files back to their original contents.

NOTE: You should make sure you have a backup copy of your inputs, in case
something goes wrong!!!

You can cause the fuzzing to not restore the original files with
'--no-revert'. Generally this is used with '--test <index>' to run one failing
test and then leave the fuzzed inputs in place to examine the failure.

For each fuzzed input, %prog will run the test command given on the command
line. Each argument in the command is subject to string interpolation before
being executed. The syntax is "%(VARIABLE)FORMAT" where FORMAT is a standard
printf format, and VARIBLE is one of:

   'index' - the test index being run
   'inputs' - the full list of test inputs
   'picked_input'      - (with --pick-input) the selected input file
   'picked_input_pos'  - (with --pick-input) the selected input position
   'picked_input_line' - (with --pick-input) the selected input line
   'picked_input_col'  - (with --pick-input) the selected input column

By default, the script will run forever continually picking new tests to
run. You can limit the number of tests that are run with '--max-tests <number>',
and you can run a particular test with '--test <index>'.
""")
    parser.add_option("-v", "--verbose", help="Show more output",
                      action='store_true', dest="verbose", default=False)
    parser.add_option("-s", "--succinct", help="Reduce amount of output",
                      action="store_true", dest="succinct", default=False)

    group = OptionGroup(parser, "Test Execution")
    group.add_option("", "--expected-exit-code", help="Set expected exit code",
                     type="int", dest="expected_exit_code",
                     default=0)
    group.add_option("", "--extra-exit-code",
                     help="Set additional expected exit code",
                     type="int", action="append", dest="extra_exit_codes",
                     default=[])
    group.add_option("", "--log-dir",
                     help="Capture test logs to an output directory",
                     type="string", dest="log_dir",
                     default=None)
    group.add_option("", "--log-all",
                     help="Log all outputs (not just failures)",
                     action="store_true", dest="log_all", default=False)
    parser.add_option_group(group)

    group = OptionGroup(parser, "Input Files")
    group.add_option("", "--file", metavar="PATH",
                     help="Add an input file to fuzz",
                     type="string", action="append", dest="input_files",
                     default=[])
    # The value is a path to a list file, so it must be parsed as a string.
    group.add_option("", "--filelist", metavar="LIST",
                     help="Add a list of inputs files to fuzz (one per line)",
                     type="string", action="append", dest="filelists",
                     default=[])
    parser.add_option_group(group)

    group = OptionGroup(parser, "Fuzz Options")
    group.add_option("", "--replacement-chars", dest="replacement_chars",
                     help="Characters to insert/replace",
                     default="0{}[]<>\\;@#$^%& ")
    group.add_option("", "--replacement-string", dest="replacement_strings",
                     action="append", help="Add a replacement string to use",
                     default=[])
    group.add_option("", "--replacement-list", dest="replacement_lists",
                     help="Add a list of replacement strings (one per line)",
                     action="append", default=[])
    group.add_option("", "--no-delete", help="Don't delete characters",
                     action='store_false', dest="enable_delete", default=True)
    group.add_option("", "--no-insert", help="Don't insert strings",
                     action='store_false', dest="enable_insert", default=True)
    group.add_option("", "--no-replace", help="Don't replace strings",
                     action='store_false', dest="enable_replace", default=True)
    group.add_option("", "--no-revert", help="Don't revert changes",
                     action='store_false', dest="revert", default=True)
    parser.add_option_group(group)

    group = OptionGroup(parser, "Test Selection")
    group.add_option("", "--test", help="Run a particular test",
                     type="int", dest="test", default=None, metavar="INDEX")
    # No default limit: as the usage text says, the tool runs forever unless
    # --max-tests or --test is given.
    group.add_option("", "--max-tests", help="Maximum number of tests",
                     type="int", dest="max_tests", default=None,
                     metavar="COUNT")
    group.add_option("", "--pick-input",
                     help="Randomly select an input byte as well as fuzzing",
                     action='store_true', dest="pick_input", default=False)
    parser.add_option_group(group)

    # Everything after the first positional argument is the test command.
    parser.disable_interspersed_args()
    return parser


def main():
    """Parse the command line and run the selected fuzz tests."""
    global opts
    parser = build_option_parser()

    (opts, args) = parser.parse_args()

    if not args:
        parser.error("Invalid number of arguments")

    # Collect the list of inputs.
    input_files = list(opts.input_files)
    for filelist in opts.filelists:
        with open(filelist) as f:
            for ln in f:
                ln = ln.strip()
                if ln:
                    input_files.append(ln)
    input_files.sort()

    if not input_files:
        parser.error("No input files!")

    print('%s: note: fuzzing %d files.' % (sys.argv[0], len(input_files)))

    # Make sure the log directory exists if used.
    if opts.log_dir:
        if not os.path.exists(opts.log_dir):
            try:
                os.mkdir(opts.log_dir)
            except OSError:
                print("%s: error: log directory couldn't be created!" % (
                    sys.argv[0],))
                raise SystemExit(1)

    # Get the list if insert/replacement strings.
    replacements = list(opts.replacement_chars)
    replacements.extend(opts.replacement_strings)
    for replacement_list in opts.replacement_lists:
        with open(replacement_list) as f:
            for ln in f:
                # Strip only the line terminator; other whitespace may be a
                # deliberate part of the replacement string.
                ln = ln.rstrip('\n')
                if ln:
                    replacements.append(ln)

    # Unique and order the replacement list.
    replacements = sorted(set(replacements))

    # Create the test generator.
    tg = TestGenerator(input_files, opts.enable_delete, opts.enable_insert,
                       opts.enable_replace, replacements, opts.pick_input)

    print('%s: note: %d input bytes.' % (sys.argv[0], tg.num_positions))
    print('%s: note: %d total tests.' % (sys.argv[0], tg.num_tests))

    if tg.num_tests <= 0:
        parser.error("No tests to run (are the input files empty?)")
    if opts.test is not None and not 0 <= opts.test < tg.num_tests:
        parser.error("Test index must be in the range [0, %d)" % tg.num_tests)

    if opts.test is not None:
        it = [opts.test]
    elif opts.max_tests is not None:
        it = (random.randrange(n)
              for n in itertools.repeat(tg.num_tests, opts.max_tests))
    else:
        it = (random.randrange(n) for n in itertools.repeat(tg.num_tests))
    for test in it:
        t = tg.get_test(test)

        if opts.verbose:
            print('%s: note: running test %d: %r' % (sys.argv[0], test, t))
        ta = TestApplication(tg, t)
        try:
            ta.apply()
            run_one_test(ta, test, input_files, args)
        finally:
            if opts.revert:
                ta.revert()

        sys.stdout.flush()


if __name__ == '__main__':
    main()