Home | History | Annotate | Download | only in releasetools
      1 #
      2 # Copyright (C) 2015 The Android Open Source Project
      3 #
      4 # Licensed under the Apache License, Version 2.0 (the "License");
      5 # you may not use this file except in compliance with the License.
      6 # You may obtain a copy of the License at
      7 #
      8 #      http://www.apache.org/licenses/LICENSE-2.0
      9 #
     10 # Unless required by applicable law or agreed to in writing, software
     11 # distributed under the License is distributed on an "AS IS" BASIS,
     12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     13 # See the License for the specific language governing permissions and
     14 # limitations under the License.
     15 #
     16 
     17 import os
     18 import subprocess
     19 import tempfile
     20 import time
     21 import unittest
     22 import zipfile
     23 from hashlib import sha1
     24 
     25 import common
     26 import test_utils
     27 import validate_target_files
     28 from rangelib import RangeSet
     29 
     30 
     31 KiB = 1024
     32 MiB = 1024 * KiB
     33 GiB = 1024 * MiB
     34 
     35 
     36 def get_2gb_string():
     37   size = int(2 * GiB + 1)
     38   block_size = 4 * KiB
     39   step_size = 4 * MiB
     40   # Generate a long string with holes, e.g. 'xyz\x00abc\x00...'.
     41   for _ in range(0, size, step_size):
     42     yield os.urandom(block_size)
     43     yield '\0' * (step_size - block_size)
     44 
     45 
     46 class CommonZipTest(unittest.TestCase):
     47   def _verify(self, zip_file, zip_file_name, arcname, expected_hash,
     48               test_file_name=None, expected_stat=None, expected_mode=0o644,
     49               expected_compress_type=zipfile.ZIP_STORED):
     50     # Verify the stat if present.
     51     if test_file_name is not None:
     52       new_stat = os.stat(test_file_name)
     53       self.assertEqual(int(expected_stat.st_mode), int(new_stat.st_mode))
     54       self.assertEqual(int(expected_stat.st_mtime), int(new_stat.st_mtime))
     55 
     56     # Reopen the zip file to verify.
     57     zip_file = zipfile.ZipFile(zip_file_name, "r")
     58 
     59     # Verify the timestamp.
     60     info = zip_file.getinfo(arcname)
     61     self.assertEqual(info.date_time, (2009, 1, 1, 0, 0, 0))
     62 
     63     # Verify the file mode.
     64     mode = (info.external_attr >> 16) & 0o777
     65     self.assertEqual(mode, expected_mode)
     66 
     67     # Verify the compress type.
     68     self.assertEqual(info.compress_type, expected_compress_type)
     69 
     70     # Verify the zip contents.
     71     entry = zip_file.open(arcname)
     72     sha1_hash = sha1()
     73     for chunk in iter(lambda: entry.read(4 * MiB), ''):
     74       sha1_hash.update(chunk)
     75     self.assertEqual(expected_hash, sha1_hash.hexdigest())
     76     self.assertIsNone(zip_file.testzip())
     77 
     78   def _test_ZipWrite(self, contents, extra_zipwrite_args=None):
     79     extra_zipwrite_args = dict(extra_zipwrite_args or {})
     80 
     81     test_file = tempfile.NamedTemporaryFile(delete=False)
     82     test_file_name = test_file.name
     83 
     84     zip_file = tempfile.NamedTemporaryFile(delete=False)
     85     zip_file_name = zip_file.name
     86 
     87     # File names within an archive strip the leading slash.
     88     arcname = extra_zipwrite_args.get("arcname", test_file_name)
     89     if arcname[0] == "/":
     90       arcname = arcname[1:]
     91 
     92     zip_file.close()
     93     zip_file = zipfile.ZipFile(zip_file_name, "w")
     94 
     95     try:
     96       sha1_hash = sha1()
     97       for data in contents:
     98         sha1_hash.update(data)
     99         test_file.write(data)
    100       test_file.close()
    101 
    102       expected_stat = os.stat(test_file_name)
    103       expected_mode = extra_zipwrite_args.get("perms", 0o644)
    104       expected_compress_type = extra_zipwrite_args.get("compress_type",
    105                                                        zipfile.ZIP_STORED)
    106       time.sleep(5)  # Make sure the atime/mtime will change measurably.
    107 
    108       common.ZipWrite(zip_file, test_file_name, **extra_zipwrite_args)
    109       common.ZipClose(zip_file)
    110 
    111       self._verify(zip_file, zip_file_name, arcname, sha1_hash.hexdigest(),
    112                    test_file_name, expected_stat, expected_mode,
    113                    expected_compress_type)
    114     finally:
    115       os.remove(test_file_name)
    116       os.remove(zip_file_name)
    117 
    118   def _test_ZipWriteStr(self, zinfo_or_arcname, contents, extra_args=None):
    119     extra_args = dict(extra_args or {})
    120 
    121     zip_file = tempfile.NamedTemporaryFile(delete=False)
    122     zip_file_name = zip_file.name
    123     zip_file.close()
    124 
    125     zip_file = zipfile.ZipFile(zip_file_name, "w")
    126 
    127     try:
    128       expected_compress_type = extra_args.get("compress_type",
    129                                               zipfile.ZIP_STORED)
    130       time.sleep(5)  # Make sure the atime/mtime will change measurably.
    131 
    132       if not isinstance(zinfo_or_arcname, zipfile.ZipInfo):
    133         arcname = zinfo_or_arcname
    134         expected_mode = extra_args.get("perms", 0o644)
    135       else:
    136         arcname = zinfo_or_arcname.filename
    137         expected_mode = extra_args.get("perms",
    138                                        zinfo_or_arcname.external_attr >> 16)
    139 
    140       common.ZipWriteStr(zip_file, zinfo_or_arcname, contents, **extra_args)
    141       common.ZipClose(zip_file)
    142 
    143       self._verify(zip_file, zip_file_name, arcname, sha1(contents).hexdigest(),
    144                    expected_mode=expected_mode,
    145                    expected_compress_type=expected_compress_type)
    146     finally:
    147       os.remove(zip_file_name)
    148 
    149   def _test_ZipWriteStr_large_file(self, large, small, extra_args=None):
    150     extra_args = dict(extra_args or {})
    151 
    152     zip_file = tempfile.NamedTemporaryFile(delete=False)
    153     zip_file_name = zip_file.name
    154 
    155     test_file = tempfile.NamedTemporaryFile(delete=False)
    156     test_file_name = test_file.name
    157 
    158     arcname_large = test_file_name
    159     arcname_small = "bar"
    160 
    161     # File names within an archive strip the leading slash.
    162     if arcname_large[0] == "/":
    163       arcname_large = arcname_large[1:]
    164 
    165     zip_file.close()
    166     zip_file = zipfile.ZipFile(zip_file_name, "w")
    167 
    168     try:
    169       sha1_hash = sha1()
    170       for data in large:
    171         sha1_hash.update(data)
    172         test_file.write(data)
    173       test_file.close()
    174 
    175       expected_stat = os.stat(test_file_name)
    176       expected_mode = 0o644
    177       expected_compress_type = extra_args.get("compress_type",
    178                                               zipfile.ZIP_STORED)
    179       time.sleep(5)  # Make sure the atime/mtime will change measurably.
    180 
    181       common.ZipWrite(zip_file, test_file_name, **extra_args)
    182       common.ZipWriteStr(zip_file, arcname_small, small, **extra_args)
    183       common.ZipClose(zip_file)
    184 
    185       # Verify the contents written by ZipWrite().
    186       self._verify(zip_file, zip_file_name, arcname_large,
    187                    sha1_hash.hexdigest(), test_file_name, expected_stat,
    188                    expected_mode, expected_compress_type)
    189 
    190       # Verify the contents written by ZipWriteStr().
    191       self._verify(zip_file, zip_file_name, arcname_small,
    192                    sha1(small).hexdigest(),
    193                    expected_compress_type=expected_compress_type)
    194     finally:
    195       os.remove(zip_file_name)
    196       os.remove(test_file_name)
    197 
    198   def _test_reset_ZIP64_LIMIT(self, func, *args):
    199     default_limit = (1 << 31) - 1
    200     self.assertEqual(default_limit, zipfile.ZIP64_LIMIT)
    201     func(*args)
    202     self.assertEqual(default_limit, zipfile.ZIP64_LIMIT)
    203 
    204   def test_ZipWrite(self):
    205     file_contents = os.urandom(1024)
    206     self._test_ZipWrite(file_contents)
    207 
    208   def test_ZipWrite_with_opts(self):
    209     file_contents = os.urandom(1024)
    210     self._test_ZipWrite(file_contents, {
    211         "arcname": "foobar",
    212         "perms": 0o777,
    213         "compress_type": zipfile.ZIP_DEFLATED,
    214     })
    215     self._test_ZipWrite(file_contents, {
    216         "arcname": "foobar",
    217         "perms": 0o700,
    218         "compress_type": zipfile.ZIP_STORED,
    219     })
    220 
    221   def test_ZipWrite_large_file(self):
    222     file_contents = get_2gb_string()
    223     self._test_ZipWrite(file_contents, {
    224         "compress_type": zipfile.ZIP_DEFLATED,
    225     })
    226 
    227   def test_ZipWrite_resets_ZIP64_LIMIT(self):
    228     self._test_reset_ZIP64_LIMIT(self._test_ZipWrite, "")
    229 
    230   def test_ZipWriteStr(self):
    231     random_string = os.urandom(1024)
    232     # Passing arcname
    233     self._test_ZipWriteStr("foo", random_string)
    234 
    235     # Passing zinfo
    236     zinfo = zipfile.ZipInfo(filename="foo")
    237     self._test_ZipWriteStr(zinfo, random_string)
    238 
    239     # Timestamp in the zinfo should be overwritten.
    240     zinfo.date_time = (2015, 3, 1, 15, 30, 0)
    241     self._test_ZipWriteStr(zinfo, random_string)
    242 
    243   def test_ZipWriteStr_with_opts(self):
    244     random_string = os.urandom(1024)
    245     # Passing arcname
    246     self._test_ZipWriteStr("foo", random_string, {
    247         "perms": 0o700,
    248         "compress_type": zipfile.ZIP_DEFLATED,
    249     })
    250     self._test_ZipWriteStr("bar", random_string, {
    251         "compress_type": zipfile.ZIP_STORED,
    252     })
    253 
    254     # Passing zinfo
    255     zinfo = zipfile.ZipInfo(filename="foo")
    256     self._test_ZipWriteStr(zinfo, random_string, {
    257         "compress_type": zipfile.ZIP_DEFLATED,
    258     })
    259     self._test_ZipWriteStr(zinfo, random_string, {
    260         "perms": 0o600,
    261         "compress_type": zipfile.ZIP_STORED,
    262     })
    263 
    264   def test_ZipWriteStr_large_file(self):
    265     # zipfile.writestr() doesn't work when the str size is over 2GiB even with
    266     # the workaround. We will only test the case of writing a string into a
    267     # large archive.
    268     long_string = get_2gb_string()
    269     short_string = os.urandom(1024)
    270     self._test_ZipWriteStr_large_file(long_string, short_string, {
    271         "compress_type": zipfile.ZIP_DEFLATED,
    272     })
    273 
    274   def test_ZipWriteStr_resets_ZIP64_LIMIT(self):
    275     self._test_reset_ZIP64_LIMIT(self._test_ZipWriteStr, "foo", "")
    276     zinfo = zipfile.ZipInfo(filename="foo")
    277     self._test_reset_ZIP64_LIMIT(self._test_ZipWriteStr, zinfo, "")
    278 
    279   def test_bug21309935(self):
    280     zip_file = tempfile.NamedTemporaryFile(delete=False)
    281     zip_file_name = zip_file.name
    282     zip_file.close()
    283 
    284     try:
    285       random_string = os.urandom(1024)
    286       zip_file = zipfile.ZipFile(zip_file_name, "w")
    287       # Default perms should be 0o644 when passing the filename.
    288       common.ZipWriteStr(zip_file, "foo", random_string)
    289       # Honor the specified perms.
    290       common.ZipWriteStr(zip_file, "bar", random_string, perms=0o755)
    291       # The perms in zinfo should be untouched.
    292       zinfo = zipfile.ZipInfo(filename="baz")
    293       zinfo.external_attr = 0o740 << 16
    294       common.ZipWriteStr(zip_file, zinfo, random_string)
    295       # Explicitly specified perms has the priority.
    296       zinfo = zipfile.ZipInfo(filename="qux")
    297       zinfo.external_attr = 0o700 << 16
    298       common.ZipWriteStr(zip_file, zinfo, random_string, perms=0o400)
    299       common.ZipClose(zip_file)
    300 
    301       self._verify(zip_file, zip_file_name, "foo",
    302                    sha1(random_string).hexdigest(),
    303                    expected_mode=0o644)
    304       self._verify(zip_file, zip_file_name, "bar",
    305                    sha1(random_string).hexdigest(),
    306                    expected_mode=0o755)
    307       self._verify(zip_file, zip_file_name, "baz",
    308                    sha1(random_string).hexdigest(),
    309                    expected_mode=0o740)
    310       self._verify(zip_file, zip_file_name, "qux",
    311                    sha1(random_string).hexdigest(),
    312                    expected_mode=0o400)
    313     finally:
    314       os.remove(zip_file_name)
    315 
    316   def test_ZipDelete(self):
    317     zip_file = tempfile.NamedTemporaryFile(delete=False, suffix='.zip')
    318     output_zip = zipfile.ZipFile(zip_file.name, 'w',
    319                                  compression=zipfile.ZIP_DEFLATED)
    320     with tempfile.NamedTemporaryFile() as entry_file:
    321       entry_file.write(os.urandom(1024))
    322       common.ZipWrite(output_zip, entry_file.name, arcname='Test1')
    323       common.ZipWrite(output_zip, entry_file.name, arcname='Test2')
    324       common.ZipWrite(output_zip, entry_file.name, arcname='Test3')
    325       common.ZipClose(output_zip)
    326     zip_file.close()
    327 
    328     try:
    329       common.ZipDelete(zip_file.name, 'Test2')
    330       with zipfile.ZipFile(zip_file.name, 'r') as check_zip:
    331         entries = check_zip.namelist()
    332         self.assertTrue('Test1' in entries)
    333         self.assertFalse('Test2' in entries)
    334         self.assertTrue('Test3' in entries)
    335 
    336       self.assertRaises(AssertionError, common.ZipDelete, zip_file.name,
    337                         'Test2')
    338       with zipfile.ZipFile(zip_file.name, 'r') as check_zip:
    339         entries = check_zip.namelist()
    340         self.assertTrue('Test1' in entries)
    341         self.assertFalse('Test2' in entries)
    342         self.assertTrue('Test3' in entries)
    343 
    344       common.ZipDelete(zip_file.name, ['Test3'])
    345       with zipfile.ZipFile(zip_file.name, 'r') as check_zip:
    346         entries = check_zip.namelist()
    347         self.assertTrue('Test1' in entries)
    348         self.assertFalse('Test2' in entries)
    349         self.assertFalse('Test3' in entries)
    350 
    351       common.ZipDelete(zip_file.name, ['Test1', 'Test2'])
    352       with zipfile.ZipFile(zip_file.name, 'r') as check_zip:
    353         entries = check_zip.namelist()
    354         self.assertFalse('Test1' in entries)
    355         self.assertFalse('Test2' in entries)
    356         self.assertFalse('Test3' in entries)
    357     finally:
    358       os.remove(zip_file.name)
    359 
    360 
    361 class CommonApkUtilsTest(unittest.TestCase):
    362   """Tests the APK utils related functions."""
    363 
    364   APKCERTS_TXT1 = (
    365       'name="RecoveryLocalizer.apk" certificate="certs/devkey.x509.pem"'
    366       ' private_key="certs/devkey.pk8"\n'
    367       'name="Settings.apk"'
    368       ' certificate="build/target/product/security/platform.x509.pem"'
    369       ' private_key="build/target/product/security/platform.pk8"\n'
    370       'name="TV.apk" certificate="PRESIGNED" private_key=""\n'
    371   )
    372 
    373   APKCERTS_CERTMAP1 = {
    374       'RecoveryLocalizer.apk' : 'certs/devkey',
    375       'Settings.apk' : 'build/target/product/security/platform',
    376       'TV.apk' : 'PRESIGNED',
    377   }
    378 
    379   APKCERTS_TXT2 = (
    380       'name="Compressed1.apk" certificate="certs/compressed1.x509.pem"'
    381       ' private_key="certs/compressed1.pk8" compressed="gz"\n'
    382       'name="Compressed2a.apk" certificate="certs/compressed2.x509.pem"'
    383       ' private_key="certs/compressed2.pk8" compressed="gz"\n'
    384       'name="Compressed2b.apk" certificate="certs/compressed2.x509.pem"'
    385       ' private_key="certs/compressed2.pk8" compressed="gz"\n'
    386       'name="Compressed3.apk" certificate="certs/compressed3.x509.pem"'
    387       ' private_key="certs/compressed3.pk8" compressed="gz"\n'
    388   )
    389 
    390   APKCERTS_CERTMAP2 = {
    391       'Compressed1.apk' : 'certs/compressed1',
    392       'Compressed2a.apk' : 'certs/compressed2',
    393       'Compressed2b.apk' : 'certs/compressed2',
    394       'Compressed3.apk' : 'certs/compressed3',
    395   }
    396 
    397   APKCERTS_TXT3 = (
    398       'name="Compressed4.apk" certificate="certs/compressed4.x509.pem"'
    399       ' private_key="certs/compressed4.pk8" compressed="xz"\n'
    400   )
    401 
    402   APKCERTS_CERTMAP3 = {
    403       'Compressed4.apk' : 'certs/compressed4',
    404   }
    405 
    406   def setUp(self):
    407     self.testdata_dir = test_utils.get_testdata_dir()
    408 
    409   def tearDown(self):
    410     common.Cleanup()
    411 
    412   @staticmethod
    413   def _write_apkcerts_txt(apkcerts_txt, additional=None):
    414     if additional is None:
    415       additional = []
    416     target_files = common.MakeTempFile(suffix='.zip')
    417     with zipfile.ZipFile(target_files, 'w') as target_files_zip:
    418       target_files_zip.writestr('META/apkcerts.txt', apkcerts_txt)
    419       for entry in additional:
    420         target_files_zip.writestr(entry, '')
    421     return target_files
    422 
    423   def test_ReadApkCerts_NoncompressedApks(self):
    424     target_files = self._write_apkcerts_txt(self.APKCERTS_TXT1)
    425     with zipfile.ZipFile(target_files, 'r') as input_zip:
    426       certmap, ext = common.ReadApkCerts(input_zip)
    427 
    428     self.assertDictEqual(self.APKCERTS_CERTMAP1, certmap)
    429     self.assertIsNone(ext)
    430 
    431   def test_ReadApkCerts_CompressedApks(self):
    432     # We have "installed" Compressed1.apk.gz only. Note that Compressed3.apk is
    433     # not stored in '.gz' format, so it shouldn't be considered as installed.
    434     target_files = self._write_apkcerts_txt(
    435         self.APKCERTS_TXT2,
    436         ['Compressed1.apk.gz', 'Compressed3.apk'])
    437 
    438     with zipfile.ZipFile(target_files, 'r') as input_zip:
    439       certmap, ext = common.ReadApkCerts(input_zip)
    440 
    441     self.assertDictEqual(self.APKCERTS_CERTMAP2, certmap)
    442     self.assertEqual('.gz', ext)
    443 
    444     # Alternative case with '.xz'.
    445     target_files = self._write_apkcerts_txt(
    446         self.APKCERTS_TXT3, ['Compressed4.apk.xz'])
    447 
    448     with zipfile.ZipFile(target_files, 'r') as input_zip:
    449       certmap, ext = common.ReadApkCerts(input_zip)
    450 
    451     self.assertDictEqual(self.APKCERTS_CERTMAP3, certmap)
    452     self.assertEqual('.xz', ext)
    453 
    454   def test_ReadApkCerts_CompressedAndNoncompressedApks(self):
    455     target_files = self._write_apkcerts_txt(
    456         self.APKCERTS_TXT1 + self.APKCERTS_TXT2,
    457         ['Compressed1.apk.gz', 'Compressed3.apk'])
    458 
    459     with zipfile.ZipFile(target_files, 'r') as input_zip:
    460       certmap, ext = common.ReadApkCerts(input_zip)
    461 
    462     certmap_merged = self.APKCERTS_CERTMAP1.copy()
    463     certmap_merged.update(self.APKCERTS_CERTMAP2)
    464     self.assertDictEqual(certmap_merged, certmap)
    465     self.assertEqual('.gz', ext)
    466 
    467   def test_ReadApkCerts_MultipleCompressionMethods(self):
    468     target_files = self._write_apkcerts_txt(
    469         self.APKCERTS_TXT2 + self.APKCERTS_TXT3,
    470         ['Compressed1.apk.gz', 'Compressed4.apk.xz'])
    471 
    472     with zipfile.ZipFile(target_files, 'r') as input_zip:
    473       self.assertRaises(ValueError, common.ReadApkCerts, input_zip)
    474 
    475   def test_ReadApkCerts_MismatchingKeys(self):
    476     malformed_apkcerts_txt = (
    477         'name="App1.apk" certificate="certs/cert1.x509.pem"'
    478         ' private_key="certs/cert2.pk8"\n'
    479     )
    480     target_files = self._write_apkcerts_txt(malformed_apkcerts_txt)
    481 
    482     with zipfile.ZipFile(target_files, 'r') as input_zip:
    483       self.assertRaises(ValueError, common.ReadApkCerts, input_zip)
    484 
    485   def test_ExtractPublicKey(self):
    486     cert = os.path.join(self.testdata_dir, 'testkey.x509.pem')
    487     pubkey = os.path.join(self.testdata_dir, 'testkey.pubkey.pem')
    488     with open(pubkey, 'rb') as pubkey_fp:
    489       self.assertEqual(pubkey_fp.read(), common.ExtractPublicKey(cert))
    490 
    491   def test_ExtractPublicKey_invalidInput(self):
    492     wrong_input = os.path.join(self.testdata_dir, 'testkey.pk8')
    493     self.assertRaises(AssertionError, common.ExtractPublicKey, wrong_input)
    494 
    495   def test_ParseCertificate(self):
    496     cert = os.path.join(self.testdata_dir, 'testkey.x509.pem')
    497 
    498     cmd = ['openssl', 'x509', '-in', cert, '-outform', 'DER']
    499     proc = common.Run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    500     expected, _ = proc.communicate()
    501     self.assertEqual(0, proc.returncode)
    502 
    503     with open(cert) as cert_fp:
    504       actual = common.ParseCertificate(cert_fp.read())
    505     self.assertEqual(expected, actual)
    506 
    507 
    508 class CommonUtilsTest(unittest.TestCase):
    509 
    510   def tearDown(self):
    511     common.Cleanup()
    512 
    513   def test_GetSparseImage_emptyBlockMapFile(self):
    514     target_files = common.MakeTempFile(prefix='target_files-', suffix='.zip')
    515     with zipfile.ZipFile(target_files, 'w') as target_files_zip:
    516       target_files_zip.write(
    517           test_utils.construct_sparse_image([
    518               (0xCAC1, 6),
    519               (0xCAC3, 3),
    520               (0xCAC1, 4)]),
    521           arcname='IMAGES/system.img')
    522       target_files_zip.writestr('IMAGES/system.map', '')
    523       target_files_zip.writestr('SYSTEM/file1', os.urandom(4096 * 8))
    524       target_files_zip.writestr('SYSTEM/file2', os.urandom(4096 * 3))
    525 
    526     tempdir = common.UnzipTemp(target_files)
    527     with zipfile.ZipFile(target_files, 'r') as input_zip:
    528       sparse_image = common.GetSparseImage('system', tempdir, input_zip, False)
    529 
    530     self.assertDictEqual(
    531         {
    532             '__COPY': RangeSet("0"),
    533             '__NONZERO-0': RangeSet("1-5 9-12"),
    534         },
    535         sparse_image.file_map)
    536 
    537   def test_GetSparseImage_invalidImageName(self):
    538     self.assertRaises(
    539         AssertionError, common.GetSparseImage, 'system2', None, None, False)
    540     self.assertRaises(
    541         AssertionError, common.GetSparseImage, 'unknown', None, None, False)
    542 
    543   def test_GetSparseImage_missingBlockMapFile(self):
    544     target_files = common.MakeTempFile(prefix='target_files-', suffix='.zip')
    545     with zipfile.ZipFile(target_files, 'w') as target_files_zip:
    546       target_files_zip.write(
    547           test_utils.construct_sparse_image([
    548               (0xCAC1, 6),
    549               (0xCAC3, 3),
    550               (0xCAC1, 4)]),
    551           arcname='IMAGES/system.img')
    552       target_files_zip.writestr('SYSTEM/file1', os.urandom(4096 * 8))
    553       target_files_zip.writestr('SYSTEM/file2', os.urandom(4096 * 3))
    554 
    555     tempdir = common.UnzipTemp(target_files)
    556     with zipfile.ZipFile(target_files, 'r') as input_zip:
    557       self.assertRaises(
    558           AssertionError, common.GetSparseImage, 'system', tempdir, input_zip,
    559           False)
    560 
    561   def test_GetSparseImage_sharedBlocks_notAllowed(self):
    562     """Tests the case of having overlapping blocks but disallowed."""
    563     target_files = common.MakeTempFile(prefix='target_files-', suffix='.zip')
    564     with zipfile.ZipFile(target_files, 'w') as target_files_zip:
    565       target_files_zip.write(
    566           test_utils.construct_sparse_image([(0xCAC2, 16)]),
    567           arcname='IMAGES/system.img')
    568       # Block 10 is shared between two files.
    569       target_files_zip.writestr(
    570           'IMAGES/system.map',
    571           '\n'.join([
    572               '/system/file1 1-5 9-10',
    573               '/system/file2 10-12']))
    574       target_files_zip.writestr('SYSTEM/file1', os.urandom(4096 * 7))
    575       target_files_zip.writestr('SYSTEM/file2', os.urandom(4096 * 3))
    576 
    577     tempdir = common.UnzipTemp(target_files)
    578     with zipfile.ZipFile(target_files, 'r') as input_zip:
    579       self.assertRaises(
    580           AssertionError, common.GetSparseImage, 'system', tempdir, input_zip,
    581           False)
    582 
    583   def test_GetSparseImage_sharedBlocks_allowed(self):
    584     """Tests the case for target using BOARD_EXT4_SHARE_DUP_BLOCKS := true."""
    585     target_files = common.MakeTempFile(prefix='target_files-', suffix='.zip')
    586     with zipfile.ZipFile(target_files, 'w') as target_files_zip:
    587       # Construct an image with a care_map of "0-5 9-12".
    588       target_files_zip.write(
    589           test_utils.construct_sparse_image([(0xCAC2, 16)]),
    590           arcname='IMAGES/system.img')
    591       # Block 10 is shared between two files.
    592       target_files_zip.writestr(
    593           'IMAGES/system.map',
    594           '\n'.join([
    595               '/system/file1 1-5 9-10',
    596               '/system/file2 10-12']))
    597       target_files_zip.writestr('SYSTEM/file1', os.urandom(4096 * 7))
    598       target_files_zip.writestr('SYSTEM/file2', os.urandom(4096 * 3))
    599 
    600     tempdir = common.UnzipTemp(target_files)
    601     with zipfile.ZipFile(target_files, 'r') as input_zip:
    602       sparse_image = common.GetSparseImage('system', tempdir, input_zip, True)
    603 
    604     self.assertDictEqual(
    605         {
    606             '__COPY': RangeSet("0"),
    607             '__NONZERO-0': RangeSet("6-8 13-15"),
    608             '/system/file1': RangeSet("1-5 9-10"),
    609             '/system/file2': RangeSet("11-12"),
    610         },
    611         sparse_image.file_map)
    612 
    613     # '/system/file2' should be marked with 'uses_shared_blocks', but not with
    614     # 'incomplete'.
    615     self.assertTrue(
    616         sparse_image.file_map['/system/file2'].extra['uses_shared_blocks'])
    617     self.assertNotIn(
    618         'incomplete', sparse_image.file_map['/system/file2'].extra)
    619 
    620     # All other entries should look normal without any tags.
    621     self.assertFalse(sparse_image.file_map['__COPY'].extra)
    622     self.assertFalse(sparse_image.file_map['__NONZERO-0'].extra)
    623     self.assertFalse(sparse_image.file_map['/system/file1'].extra)
    624 
    625   def test_GetSparseImage_incompleteRanges(self):
    626     """Tests the case of ext4 images with holes."""
    627     target_files = common.MakeTempFile(prefix='target_files-', suffix='.zip')
    628     with zipfile.ZipFile(target_files, 'w') as target_files_zip:
    629       target_files_zip.write(
    630           test_utils.construct_sparse_image([(0xCAC2, 16)]),
    631           arcname='IMAGES/system.img')
    632       target_files_zip.writestr(
    633           'IMAGES/system.map',
    634           '\n'.join([
    635               '/system/file1 1-5 9-10',
    636               '/system/file2 11-12']))
    637       target_files_zip.writestr('SYSTEM/file1', os.urandom(4096 * 7))
    638       # '/system/file2' has less blocks listed (2) than actual (3).
    639       target_files_zip.writestr('SYSTEM/file2', os.urandom(4096 * 3))
    640 
    641     tempdir = common.UnzipTemp(target_files)
    642     with zipfile.ZipFile(target_files, 'r') as input_zip:
    643       sparse_image = common.GetSparseImage('system', tempdir, input_zip, False)
    644 
    645     self.assertFalse(sparse_image.file_map['/system/file1'].extra)
    646     self.assertTrue(sparse_image.file_map['/system/file2'].extra['incomplete'])
    647 
    648 
    649 class InstallRecoveryScriptFormatTest(unittest.TestCase):
    650   """Checks the format of install-recovery.sh.
    651 
    652   Its format should match between common.py and validate_target_files.py.
    653   """
    654 
    655   def setUp(self):
    656     self._tempdir = common.MakeTempDir()
    657     # Create a dummy dict that contains the fstab info for boot&recovery.
    658     self._info = {"fstab" : {}}
    659     dummy_fstab = [
    660         "/dev/soc.0/by-name/boot /boot emmc defaults defaults",
    661         "/dev/soc.0/by-name/recovery /recovery emmc defaults defaults"]
    662     self._info["fstab"] = common.LoadRecoveryFSTab("\n".join, 2, dummy_fstab)
    663     # Construct the gzipped recovery.img and boot.img
    664     self.recovery_data = bytearray([
    665         0x1f, 0x8b, 0x08, 0x00, 0x81, 0x11, 0x02, 0x5a, 0x00, 0x03, 0x2b, 0x4a,
    666         0x4d, 0xce, 0x2f, 0x4b, 0x2d, 0xaa, 0x04, 0x00, 0xc9, 0x93, 0x43, 0xf3,
    667         0x08, 0x00, 0x00, 0x00
    668     ])
    669     # echo -n "boot" | gzip -f | hd
    670     self.boot_data = bytearray([
    671         0x1f, 0x8b, 0x08, 0x00, 0x8c, 0x12, 0x02, 0x5a, 0x00, 0x03, 0x4b, 0xca,
    672         0xcf, 0x2f, 0x01, 0x00, 0xc4, 0xae, 0xed, 0x46, 0x04, 0x00, 0x00, 0x00
    673     ])
    674 
    675   def _out_tmp_sink(self, name, data, prefix="SYSTEM"):
    676     loc = os.path.join(self._tempdir, prefix, name)
    677     if not os.path.exists(os.path.dirname(loc)):
    678       os.makedirs(os.path.dirname(loc))
    679     with open(loc, "w+") as f:
    680       f.write(data)
    681 
    682   def test_full_recovery(self):
    683     recovery_image = common.File("recovery.img", self.recovery_data)
    684     boot_image = common.File("boot.img", self.boot_data)
    685     self._info["full_recovery_image"] = "true"
    686 
    687     common.MakeRecoveryPatch(self._tempdir, self._out_tmp_sink,
    688                              recovery_image, boot_image, self._info)
    689     validate_target_files.ValidateInstallRecoveryScript(self._tempdir,
    690                                                         self._info)
    691 
    692   def test_recovery_from_boot(self):
    693     recovery_image = common.File("recovery.img", self.recovery_data)
    694     self._out_tmp_sink("recovery.img", recovery_image.data, "IMAGES")
    695     boot_image = common.File("boot.img", self.boot_data)
    696     self._out_tmp_sink("boot.img", boot_image.data, "IMAGES")
    697 
    698     common.MakeRecoveryPatch(self._tempdir, self._out_tmp_sink,
    699                              recovery_image, boot_image, self._info)
    700     validate_target_files.ValidateInstallRecoveryScript(self._tempdir,
    701                                                         self._info)
    702     # Validate 'recovery-from-boot' with bonus argument.
    703     self._out_tmp_sink("etc/recovery-resource.dat", "bonus", "SYSTEM")
    704     common.MakeRecoveryPatch(self._tempdir, self._out_tmp_sink,
    705                              recovery_image, boot_image, self._info)
    706     validate_target_files.ValidateInstallRecoveryScript(self._tempdir,
    707                                                         self._info)
    708 
    709   def tearDown(self):
    710     common.Cleanup()
    711