Home | History | Annotate | Download | only in utils
      1 #!/usr/bin/env python
      2 
      3 """
      4 This is a generic fuzz testing tool, see --help for more information.
      5 """
      6 
      7 import os
      8 import sys
      9 import random
     10 import subprocess
     11 import itertools
     12 
     13 class TestGenerator:
     14     def __init__(self, inputs, delete, insert, replace,
     15                  insert_strings, pick_input):
     16         self.inputs = [(s, open(s).read()) for s in inputs]
     17 
     18         self.delete = bool(delete)
     19         self.insert = bool(insert)
     20         self.replace = bool(replace)
     21         self.pick_input = bool(pick_input)
     22         self.insert_strings = list(insert_strings)
     23 
     24         self.num_positions = sum([len(d) for _,d in self.inputs])
     25         self.num_insert_strings = len(insert_strings)
     26         self.num_tests = ((delete + (insert + replace)*self.num_insert_strings)
     27                           * self.num_positions)
     28         self.num_tests += 1
     29 
     30         if self.pick_input:
     31             self.num_tests *= self.num_positions
     32 
     33     def position_to_source_index(self, position):
     34         for i,(s,d) in enumerate(self.inputs):
     35             n = len(d)
     36             if position < n:
     37                 return (i,position)
     38             position -= n
     39         raise ValueError,'Invalid position.'
     40 
     41     def get_test(self, index):
     42         assert 0 <= index < self.num_tests
     43 
     44         picked_position = None
     45         if self.pick_input:
     46             index,picked_position = divmod(index, self.num_positions)
     47             picked_position = self.position_to_source_index(picked_position)
     48 
     49         if index == 0:
     50             return ('nothing', None, None, picked_position)
     51 
     52         index -= 1
     53         index,position = divmod(index, self.num_positions)
     54         position = self.position_to_source_index(position)
     55         if self.delete:
     56             if index == 0:
     57                 return ('delete', position, None, picked_position)
     58             index -= 1
     59 
     60         index,insert_index = divmod(index, self.num_insert_strings)
     61         insert_str = self.insert_strings[insert_index]
     62         if self.insert:
     63             if index == 0:
     64                 return ('insert', position, insert_str, picked_position)
     65             index -= 1
     66 
     67         assert self.replace
     68         assert index == 0
     69         return ('replace', position, insert_str, picked_position)
     70 
     71 class TestApplication:
     72     def __init__(self, tg, test):
     73         self.tg = tg
     74         self.test = test
     75 
     76     def apply(self):
     77         if self.test[0] == 'nothing':
     78             pass
     79         else:
     80             i,j = self.test[1]
     81             name,data = self.tg.inputs[i]
     82             if self.test[0] == 'delete':
     83                 data = data[:j] + data[j+1:]
     84             elif self.test[0] == 'insert':
     85                 data = data[:j] + self.test[2] + data[j:]
     86             elif self.test[0] == 'replace':
     87                 data = data[:j] + self.test[2] + data[j+1:]
     88             else:
     89                 raise ValueError,'Invalid test %r' % self.test
     90             open(name,'wb').write(data)
     91 
     92     def revert(self):
     93         if self.test[0] != 'nothing':
     94             i,j = self.test[1]
     95             name,data = self.tg.inputs[i]
     96             open(name,'wb').write(data)
     97 
     98 def quote(str):
     99     return '"' + str + '"'
    100         
    101 def run_one_test(test_application, index, input_files, args):
    102     test = test_application.test
    103 
    104     # Interpolate arguments.
    105     options = { 'index' : index,
    106                 'inputs' : ' '.join(quote(f) for f in input_files) }
    107 
    108     # Add picked input interpolation arguments, if used.
    109     if test[3] is not None:
    110         pos = test[3][1]
    111         options['picked_input'] = input_files[test[3][0]]
    112         options['picked_input_pos'] = pos
    113         # Compute the line and column.
    114         file_data = test_application.tg.inputs[test[3][0]][1]
    115         line = column = 1
    116         for i in range(pos):
    117             c = file_data[i]
    118             if c == '\n':
    119                 line += 1
    120                 column = 1
    121             else:
    122                 column += 1
    123         options['picked_input_line'] = line
    124         options['picked_input_col'] = column
    125         
    126     test_args = [a % options for a in args]
    127     if opts.verbose:
    128         print '%s: note: executing %r' % (sys.argv[0], test_args)
    129 
    130     stdout = None
    131     stderr = None
    132     if opts.log_dir:
    133         stdout_log_path = os.path.join(opts.log_dir, '%s.out' % index)
    134         stderr_log_path = os.path.join(opts.log_dir, '%s.err' % index)
    135         stdout = open(stdout_log_path, 'wb')
    136         stderr = open(stderr_log_path, 'wb')
    137     else:
    138         sys.stdout.flush()
    139     p = subprocess.Popen(test_args, stdout=stdout, stderr=stderr)
    140     p.communicate()
    141     exit_code = p.wait()
    142 
    143     test_result = (exit_code == opts.expected_exit_code or
    144                    exit_code in opts.extra_exit_codes)
    145 
    146     if stdout is not None:
    147         stdout.close()
    148         stderr.close()
    149 
    150         # Remove the logs for passes, unless logging all results.
    151         if not opts.log_all and test_result:
    152             os.remove(stdout_log_path)
    153             os.remove(stderr_log_path)
    154 
    155     if not test_result:
    156         print 'FAIL: %d' % index
    157     elif not opts.succinct:
    158         print 'PASS: %d' % index
    159     return test_result
    160 
    161 def main():
    162     global opts
    163     from optparse import OptionParser, OptionGroup
    164     parser = OptionParser("""%prog [options] ... test command args ...
    165 
    166 %prog is a tool for fuzzing inputs and testing them.
    167 
    168 The most basic usage is something like:
    169 
    170   $ %prog --file foo.txt ./test.sh
    171 
    172 which will run a default list of fuzzing strategies on the input. For each
    173 fuzzed input, it will overwrite the input files (in place), run the test script,
    174 then restore the files back to their original contents.
    175 
    176 NOTE: You should make sure you have a backup copy of your inputs, in case
    177 something goes wrong!!!
    178 
    179 You can cause the fuzzing to not restore the original files with
    180 '--no-revert'. Generally this is used with '--test <index>' to run one failing
    181 test and then leave the fuzzed inputs in place to examine the failure.
    182 
    183 For each fuzzed input, %prog will run the test command given on the command
    184 line. Each argument in the command is subject to string interpolation before
    185 being executed. The syntax is "%(VARIABLE)FORMAT" where FORMAT is a standard
    186 printf format, and VARIABLE is one of:
    187 
    188   'index' - the test index being run
    189   'inputs' - the full list of test inputs
    190   'picked_input'      - (with --pick-input) the selected input file
    191   'picked_input_pos'  - (with --pick-input) the selected input position
    192   'picked_input_line' - (with --pick-input) the selected input line
    193   'picked_input_col'  - (with --pick-input) the selected input column
    194 
    195 By default, the script will run forever continually picking new tests to
    196 run. You can limit the number of tests that are run with '--max-tests <number>',
    197 and you can run a particular test with '--test <index>'.
    198 
    199 You can specify '--stop-on-fail' to stop the script on the first failure
    200 without reverting the changes.
    201 
    202 """)
    203     parser.add_option("-v", "--verbose", help="Show more output",
    204                       action='store_true', dest="verbose", default=False)
    205     parser.add_option("-s", "--succinct",  help="Reduce amount of output",
    206                       action="store_true", dest="succinct", default=False)
    207 
    208     group = OptionGroup(parser, "Test Execution")
    209     group.add_option("", "--expected-exit-code", help="Set expected exit code",
    210                      type=int, dest="expected_exit_code",
    211                      default=0)
    212     group.add_option("", "--extra-exit-code",
    213                      help="Set additional expected exit code",
    214                      type=int, action="append", dest="extra_exit_codes",
    215                      default=[])
    216     group.add_option("", "--log-dir",
    217                      help="Capture test logs to an output directory",
    218                      type=str, dest="log_dir",
    219                      default=None)
    220     group.add_option("", "--log-all",
    221                      help="Log all outputs (not just failures)",
    222                      action="store_true", dest="log_all", default=False)
    223     parser.add_option_group(group)
    224 
    225     group = OptionGroup(parser, "Input Files")
    226     group.add_option("", "--file", metavar="PATH",
    227                      help="Add an input file to fuzz",
    228                      type=str, action="append", dest="input_files", default=[])
    229     group.add_option("", "--filelist", metavar="LIST",
    230                      help="Add a list of inputs files to fuzz (one per line)",
    231                      type=str, action="append", dest="filelists", default=[])
    232     parser.add_option_group(group)
    233 
    234     group = OptionGroup(parser, "Fuzz Options")
    235     group.add_option("", "--replacement-chars", dest="replacement_chars",
    236                      help="Characters to insert/replace",
    237                      default="0{}[]<>\;@#$^%& ")
    238     group.add_option("", "--replacement-string", dest="replacement_strings",
    239                      action="append", help="Add a replacement string to use",
    240                      default=[])
    241     group.add_option("", "--replacement-list", dest="replacement_lists",
    242                      help="Add a list of replacement strings (one per line)",
    243                      action="append", default=[])
    244     group.add_option("", "--no-delete", help="Don't delete characters",
    245                      action='store_false', dest="enable_delete", default=True)
    246     group.add_option("", "--no-insert", help="Don't insert strings",
    247                      action='store_false', dest="enable_insert", default=True)
    248     group.add_option("", "--no-replace", help="Don't replace strings",
    249                      action='store_false', dest="enable_replace", default=True)
    250     group.add_option("", "--no-revert", help="Don't revert changes",
    251                      action='store_false', dest="revert", default=True)
    252     group.add_option("", "--stop-on-fail", help="Stop on first failure",
    253                      action='store_true', dest="stop_on_fail", default=False)
    254     parser.add_option_group(group)
    255 
    256     group = OptionGroup(parser, "Test Selection")
    257     group.add_option("", "--test", help="Run a particular test",
    258                      type=int, dest="test", default=None, metavar="INDEX")
    259     group.add_option("", "--max-tests", help="Maximum number of tests",
    260                      type=int, dest="max_tests", default=None, metavar="COUNT")
    261     group.add_option("", "--pick-input",
    262                      help="Randomly select an input byte as well as fuzzing",
    263                      action='store_true', dest="pick_input", default=False)
    264     parser.add_option_group(group)
    265 
    266     parser.disable_interspersed_args()
    267 
    268     (opts, args) = parser.parse_args()
    269 
    270     if not args:
    271         parser.error("Invalid number of arguments")
    272 
    273     # Collect the list of inputs.
    274     input_files = list(opts.input_files)
    275     for filelist in opts.filelists:
    276         f = open(filelist)
    277         try:
    278             for ln in f:
    279                 ln = ln.strip()
    280                 if ln:
    281                     input_files.append(ln)
    282         finally:
    283             f.close()
    284     input_files.sort()
    285 
    286     if not input_files:
    287         parser.error("No input files!")
    288 
    289     print '%s: note: fuzzing %d files.' % (sys.argv[0], len(input_files))
    290 
    291     # Make sure the log directory exists if used.
    292     if opts.log_dir:
    293         if not os.path.exists(opts.log_dir):
    294             try:
    295                 os.mkdir(opts.log_dir)
    296             except OSError:
    297                 print "%s: error: log directory couldn't be created!" % (
    298                     sys.argv[0],)
    299                 raise SystemExit,1
    300 
    301     # Get the list if insert/replacement strings.
    302     replacements = list(opts.replacement_chars)
    303     replacements.extend(opts.replacement_strings)
    304     for replacement_list in opts.replacement_lists:
    305         f = open(replacement_list)
    306         try:
    307             for ln in f:
    308                 ln = ln[:-1]
    309                 if ln:
    310                     replacements.append(ln)
    311         finally:
    312             f.close()
    313 
    314     # Unique and order the replacement list.
    315     replacements = list(set(replacements))
    316     replacements.sort()
    317 
    318     # Create the test generator.
    319     tg = TestGenerator(input_files, opts.enable_delete, opts.enable_insert,
    320                        opts.enable_replace, replacements, opts.pick_input)
    321 
    322     print '%s: note: %d input bytes.' % (sys.argv[0], tg.num_positions)
    323     print '%s: note: %d total tests.' % (sys.argv[0], tg.num_tests)
    324     if opts.test is not None:
    325         it = [opts.test]
    326     elif opts.max_tests is not None:
    327         it = itertools.imap(random.randrange,
    328                             itertools.repeat(tg.num_tests, opts.max_tests))
    329     else:
    330         it = itertools.imap(random.randrange, itertools.repeat(tg.num_tests))
    331     for test in it:
    332         t = tg.get_test(test)
    333 
    334         if opts.verbose:
    335             print '%s: note: running test %d: %r' % (sys.argv[0], test, t)
    336         ta = TestApplication(tg, t)
    337         try:
    338             ta.apply()
    339             test_result = run_one_test(ta, test, input_files, args)
    340             if not test_result and opts.stop_on_fail:
    341                 opts.revert = False
    342                 sys.exit(1)
    343         finally:
    344             if opts.revert:
    345                 ta.revert()
    346 
    347         sys.stdout.flush()
    348 
    349 if __name__ == '__main__':
    350     main()
    351