Home | History | Annotate | Download | only in utils
      1 #!/usr/bin/env python
      2 
      3 """
      4 This is a generic fuzz testing tool, see --help for more information.
      5 """
      6 
      7 import os
      8 import sys
      9 import random
     10 import subprocess
     11 import itertools
     12 
     13 class TestGenerator:
     14     def __init__(self, inputs, delete, insert, replace,
     15                  insert_strings, pick_input):
     16         self.inputs = [(s, open(s).read()) for s in inputs]
     17 
     18         self.delete = bool(delete)
     19         self.insert = bool(insert)
     20         self.replace = bool(replace)
     21         self.pick_input = bool(pick_input)
     22         self.insert_strings = list(insert_strings)
     23 
     24         self.num_positions = sum([len(d) for _,d in self.inputs])
     25         self.num_insert_strings = len(insert_strings)
     26         self.num_tests = ((delete + (insert + replace)*self.num_insert_strings)
     27                           * self.num_positions)
     28         self.num_tests += 1
     29 
     30         if self.pick_input:
     31             self.num_tests *= self.num_positions
     32 
     33     def position_to_source_index(self, position):
     34         for i,(s,d) in enumerate(self.inputs):
     35             n = len(d)
     36             if position < n:
     37                 return (i,position)
     38             position -= n
     39         raise ValueError,'Invalid position.'
     40 
     41     def get_test(self, index):
     42         assert 0 <= index < self.num_tests
     43 
     44         picked_position = None
     45         if self.pick_input:
     46             index,picked_position = divmod(index, self.num_positions)
     47             picked_position = self.position_to_source_index(picked_position)
     48 
     49         if index == 0:
     50             return ('nothing', None, None, picked_position)
     51 
     52         index -= 1
     53         index,position = divmod(index, self.num_positions)
     54         position = self.position_to_source_index(position)
     55         if self.delete:
     56             if index == 0:
     57                 return ('delete', position, None, picked_position)
     58             index -= 1
     59 
     60         index,insert_index = divmod(index, self.num_insert_strings)
     61         insert_str = self.insert_strings[insert_index]
     62         if self.insert:
     63             if index == 0:
     64                 return ('insert', position, insert_str, picked_position)
     65             index -= 1
     66 
     67         assert self.replace
     68         assert index == 0
     69         return ('replace', position, insert_str, picked_position)
     70 
     71 class TestApplication:
     72     def __init__(self, tg, test):
     73         self.tg = tg
     74         self.test = test
     75 
     76     def apply(self):
     77         if self.test[0] == 'nothing':
     78             pass
     79         else:
     80             i,j = self.test[1]
     81             name,data = self.tg.inputs[i]
     82             if self.test[0] == 'delete':
     83                 data = data[:j] + data[j+1:]
     84             elif self.test[0] == 'insert':
     85                 data = data[:j] + self.test[2] + data[j:]
     86             elif self.test[0] == 'replace':
     87                 data = data[:j] + self.test[2] + data[j+1:]
     88             else:
     89                 raise ValueError,'Invalid test %r' % self.test
     90             open(name,'wb').write(data)
     91 
     92     def revert(self):
     93         if self.test[0] != 'nothing':
     94             i,j = self.test[1]
     95             name,data = self.tg.inputs[i]
     96             open(name,'wb').write(data)
     97 
     98 def quote(str):
     99     return '"' + str + '"'
    100         
    101 def run_one_test(test_application, index, input_files, args):
    102     test = test_application.test
    103 
    104     # Interpolate arguments.
    105     options = { 'index' : index,
    106                 'inputs' : ' '.join(quote(f) for f in input_files) }
    107 
    108     # Add picked input interpolation arguments, if used.
    109     if test[3] is not None:
    110         pos = test[3][1]
    111         options['picked_input'] = input_files[test[3][0]]
    112         options['picked_input_pos'] = pos
    113         # Compute the line and column.
    114         file_data = test_application.tg.inputs[test[3][0]][1]
    115         line = column = 1
    116         for i in range(pos):
    117             c = file_data[i]
    118             if c == '\n':
    119                 line += 1
    120                 column = 1
    121             else:
    122                 column += 1
    123         options['picked_input_line'] = line
    124         options['picked_input_col'] = column
    125         
    126     test_args = [a % options for a in args]
    127     if opts.verbose:
    128         print '%s: note: executing %r' % (sys.argv[0], test_args)
    129 
    130     stdout = None
    131     stderr = None
    132     if opts.log_dir:
    133         stdout_log_path = os.path.join(opts.log_dir, '%s.out' % index)
    134         stderr_log_path = os.path.join(opts.log_dir, '%s.err' % index)
    135         stdout = open(stdout_log_path, 'wb')
    136         stderr = open(stderr_log_path, 'wb')
    137     else:
    138         sys.stdout.flush()
    139     p = subprocess.Popen(test_args, stdout=stdout, stderr=stderr)
    140     p.communicate()
    141     exit_code = p.wait()
    142 
    143     test_result = (exit_code == opts.expected_exit_code or
    144                    exit_code in opts.extra_exit_codes)
    145 
    146     if stdout is not None:
    147         stdout.close()
    148         stderr.close()
    149 
    150         # Remove the logs for passes, unless logging all results.
    151         if not opts.log_all and test_result:
    152             os.remove(stdout_log_path)
    153             os.remove(stderr_log_path)
    154 
    155     if not test_result:
    156         print 'FAIL: %d' % index
    157     elif not opts.succinct:
    158         print 'PASS: %d' % index
    159 
    160 def main():
    161     global opts
    162     from optparse import OptionParser, OptionGroup
    163     parser = OptionParser("""%prog [options] ... test command args ...
    164 
    165 %prog is a tool for fuzzing inputs and testing them.
    166 
    167 The most basic usage is something like:
    168 
    169   $ %prog --file foo.txt ./test.sh
    170 
    171 which will run a default list of fuzzing strategies on the input. For each
    172 fuzzed input, it will overwrite the input files (in place), run the test script,
    173 then restore the files back to their original contents.
    174 
    175 NOTE: You should make sure you have a backup copy of your inputs, in case
    176 something goes wrong!!!
    177 
    178 You can cause the fuzzing to not restore the original files with
    179 '--no-revert'. Generally this is used with '--test <index>' to run one failing
    180 test and then leave the fuzzed inputs in place to examine the failure.
    181 
    182 For each fuzzed input, %prog will run the test command given on the command
    183 line. Each argument in the command is subject to string interpolation before
    184 being executed. The syntax is "%(VARIABLE)FORMAT" where FORMAT is a standard
    185 printf format, and VARIBLE is one of:
    186 
    187   'index' - the test index being run
    188   'inputs' - the full list of test inputs
    189   'picked_input'      - (with --pick-input) the selected input file
    190   'picked_input_pos'  - (with --pick-input) the selected input position
    191   'picked_input_line' - (with --pick-input) the selected input line
    192   'picked_input_col'  - (with --pick-input) the selected input column
    193 
    194 By default, the script will run forever continually picking new tests to
    195 run. You can limit the number of tests that are run with '--max-tests <number>',
    196 and you can run a particular test with '--test <index>'.
    197 """)
    198     parser.add_option("-v", "--verbose", help="Show more output",
    199                       action='store_true', dest="verbose", default=False)
    200     parser.add_option("-s", "--succinct",  help="Reduce amount of output",
    201                       action="store_true", dest="succinct", default=False)
    202 
    203     group = OptionGroup(parser, "Test Execution")
    204     group.add_option("", "--expected-exit-code", help="Set expected exit code",
    205                      type=int, dest="expected_exit_code",
    206                      default=0)
    207     group.add_option("", "--extra-exit-code",
    208                      help="Set additional expected exit code",
    209                      type=int, action="append", dest="extra_exit_codes",
    210                      default=[])
    211     group.add_option("", "--log-dir",
    212                      help="Capture test logs to an output directory",
    213                      type=str, dest="log_dir",
    214                      default=None)
    215     group.add_option("", "--log-all",
    216                      help="Log all outputs (not just failures)",
    217                      action="store_true", dest="log_all", default=False)
    218     parser.add_option_group(group)
    219 
    220     group = OptionGroup(parser, "Input Files")
    221     group.add_option("", "--file", metavar="PATH",
    222                      help="Add an input file to fuzz",
    223                      type=str, action="append", dest="input_files", default=[])
    224     group.add_option("", "--filelist", metavar="LIST",
    225                      help="Add a list of inputs files to fuzz (one per line)",
    226                      type=int, action="append", dest="filelists", default=[])
    227     parser.add_option_group(group)
    228 
    229     group = OptionGroup(parser, "Fuzz Options")
    230     group.add_option("", "--replacement-chars", dest="replacement_chars",
    231                      help="Characters to insert/replace",
    232                      default="0{}[]<>\;@#$^%& ")
    233     group.add_option("", "--replacement-string", dest="replacement_strings",
    234                      action="append", help="Add a replacement string to use",
    235                      default=[])
    236     group.add_option("", "--replacement-list", dest="replacement_lists",
    237                      help="Add a list of replacement strings (one per line)",
    238                      action="append", default=[])
    239     group.add_option("", "--no-delete", help="Don't delete characters",
    240                      action='store_false', dest="enable_delete", default=True)
    241     group.add_option("", "--no-insert", help="Don't insert strings",
    242                      action='store_false', dest="enable_insert", default=True)
    243     group.add_option("", "--no-replace", help="Don't replace strings",
    244                      action='store_false', dest="enable_replace", default=True)
    245     group.add_option("", "--no-revert", help="Don't revert changes",
    246                      action='store_false', dest="revert", default=True)
    247     parser.add_option_group(group)
    248 
    249     group = OptionGroup(parser, "Test Selection")
    250     group.add_option("", "--test", help="Run a particular test",
    251                      type=int, dest="test", default=None, metavar="INDEX")
    252     group.add_option("", "--max-tests", help="Maximum number of tests",
    253                      type=int, dest="max_tests", default=10, metavar="COUNT")
    254     group.add_option("", "--pick-input",
    255                      help="Randomly select an input byte as well as fuzzing",
    256                      action='store_true', dest="pick_input", default=False)
    257     parser.add_option_group(group)
    258 
    259     parser.disable_interspersed_args()
    260 
    261     (opts, args) = parser.parse_args()
    262 
    263     if not args:
    264         parser.error("Invalid number of arguments")
    265 
    266     # Collect the list of inputs.
    267     input_files = list(opts.input_files)
    268     for filelist in opts.filelists:
    269         f = open(filelist)
    270         try:
    271             for ln in f:
    272                 ln = ln.strip()
    273                 if ln:
    274                     input_files.append(ln)
    275         finally:
    276             f.close()
    277     input_files.sort()
    278 
    279     if not input_files:
    280         parser.error("No input files!")
    281 
    282     print '%s: note: fuzzing %d files.' % (sys.argv[0], len(input_files))
    283 
    284     # Make sure the log directory exists if used.
    285     if opts.log_dir:
    286         if not os.path.exists(opts.log_dir):
    287             try:
    288                 os.mkdir(opts.log_dir)
    289             except OSError:
    290                 print "%s: error: log directory couldn't be created!" % (
    291                     sys.argv[0],)
    292                 raise SystemExit,1
    293 
    294     # Get the list if insert/replacement strings.
    295     replacements = list(opts.replacement_chars)
    296     replacements.extend(opts.replacement_strings)
    297     for replacement_list in opts.replacement_lists:
    298         f = open(replacement_list)
    299         try:
    300             for ln in f:
    301                 ln = ln[:-1]
    302                 if ln:
    303                     replacements.append(ln)
    304         finally:
    305             f.close()
    306 
    307     # Unique and order the replacement list.
    308     replacements = list(set(replacements))
    309     replacements.sort()
    310 
    311     # Create the test generator.
    312     tg = TestGenerator(input_files, opts.enable_delete, opts.enable_insert,
    313                        opts.enable_replace, replacements, opts.pick_input)
    314 
    315     print '%s: note: %d input bytes.' % (sys.argv[0], tg.num_positions)
    316     print '%s: note: %d total tests.' % (sys.argv[0], tg.num_tests)
    317     if opts.test is not None:
    318         it = [opts.test]
    319     elif opts.max_tests is not None:
    320         it = itertools.imap(random.randrange,
    321                             itertools.repeat(tg.num_tests, opts.max_tests))
    322     else:
    323         it = itertools.imap(random.randrange, itertools.repeat(tg.num_tests))
    324     for test in it:
    325         t = tg.get_test(test)
    326 
    327         if opts.verbose:
    328             print '%s: note: running test %d: %r' % (sys.argv[0], test, t)
    329         ta = TestApplication(tg, t)
    330         try:
    331             ta.apply()
    332             run_one_test(ta, test, input_files, args)
    333         finally:
    334             if opts.revert:
    335                 ta.revert()
    336 
    337         sys.stdout.flush()
    338 
    339 if __name__ == '__main__':
    340     main()
    341