Home | History | Annotate | Download | only in tests
      1 # Copyright (C) 2015 The Android Open Source Project
      2 #
      3 # Licensed under the Apache License, Version 2.0 (the "License");
      4 # you may not use this file except in compliance with the License.
      5 # You may obtain a copy of the License at
      6 #
      7 #      http://www.apache.org/licenses/LICENSE-2.0
      8 #
      9 # Unless required by applicable law or agreed to in writing, software
     10 # distributed under the License is distributed on an "AS IS" BASIS,
     11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     12 # See the License for the specific language governing permissions and
     13 # limitations under the License.
     14 
     15 import math
     16 import os
     17 import random
     18 import shutil
     19 import subprocess
     20 import sys
     21 import tempfile
     22 
     23 blocksize = 4096
     24 roots = 2
     25 
     26 def corrupt(image, offset, length):
     27     print "corrupting %d bytes at offset %d" % (length, offset)
     28     f = os.open(image, os.O_WRONLY)
     29     os.lseek(f, offset, os.SEEK_SET)
     30     os.write(f, os.urandom(length))
     31     os.close(f)
     32 
     33 def corruptmax(image, roots):
     34     size = os.stat(image).st_size
     35 
     36     blocks = int(math.ceil(float(size) / blocksize))
     37     rounds = int(math.ceil(float(blocks) / (255 - roots)))
     38 
     39     max_errors = int(math.floor(rounds * roots / 2)) * blocksize
     40     offset = random.randrange(0, size - max_errors)
     41 
     42     corrupt(image, offset, max_errors)
     43 
     44 def encode(image, fec, roots):
     45     if subprocess.call([ "fec", "--roots= " + str(roots), image, fec ]) != 0:
     46         raise Exception("encoding failed")
     47 
     48 def decode(image, fec, output):
     49     return subprocess.call([ "fec", "--decode", image, fec, output ])
     50 
     51 def compare(a, b):
     52     return subprocess.call([ "cmp", "-s", a, b ])
     53 
     54 def simg2img(image, output):
     55     print "creating a non-sparse copy of '%s' to '%s'" % (image, output)
     56     if subprocess.call([ "simg2img", image, output]) != 0:
     57         raise Exception("simg2img failed")
     58 
     59 def main(argv):
     60     image = argv[0]
     61 
     62     temp_img = tempfile.NamedTemporaryFile()
     63     temp_cor = tempfile.NamedTemporaryFile()
     64     temp_fec = tempfile.NamedTemporaryFile()
     65     temp_out = tempfile.NamedTemporaryFile()
     66 
     67     simg2img(image, temp_img.name)
     68     simg2img(image, temp_cor.name)
     69 
     70     encode(image, temp_fec.name, roots)
     71     corruptmax(temp_cor.name, roots)
     72 
     73     if decode(temp_cor.name, temp_fec.name, temp_out.name) != 0:
     74         raise Exception("FAILED: failed to correct maximum expected errors")
     75 
     76     if compare(temp_img.name, temp_out.name) != 0:
     77         raise Exception("FAILED: corrected file not identical")
     78     else:
     79         print "corrected content matches original"
     80 
     81     corrupt(temp_cor.name, 0, blocksize)
     82 
     83     if decode(temp_cor.name, temp_fec.name, temp_out.name) == 0:
     84         raise Exception("FAILED: corrected more than maximum number of errors?")
     85 
     86     print "PASSED"
     87 
     88 if __name__ == '__main__':
     89     main(sys.argv[1:])
     90