Home | History | Annotate | Download | only in releasetools
      1 #
      2 # Copyright (C) 2015 The Android Open Source Project
      3 #
      4 # Licensed under the Apache License, Version 2.0 (the "License");
      5 # you may not use this file except in compliance with the License.
      6 # You may obtain a copy of the License at
      7 #
      8 #      http://www.apache.org/licenses/LICENSE-2.0
      9 #
     10 # Unless required by applicable law or agreed to in writing, software
     11 # distributed under the License is distributed on an "AS IS" BASIS,
     12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     13 # See the License for the specific language governing permissions and
     14 # limitations under the License.
     15 #
     16 
     17 import copy
     18 import os
     19 import subprocess
     20 import tempfile
     21 import time
     22 import zipfile
     23 from hashlib import sha1
     24 
     25 import common
     26 import test_utils
     27 import validate_target_files
     28 from rangelib import RangeSet
     29 
     30 from blockimgdiff import EmptyImage, DataImage
     31 
     32 KiB = 1024
     33 MiB = 1024 * KiB
     34 GiB = 1024 * MiB
     35 
     36 
     37 def get_2gb_string():
     38   size = int(2 * GiB + 1)
     39   block_size = 4 * KiB
     40   step_size = 4 * MiB
     41   # Generate a long string with holes, e.g. 'xyz\x00abc\x00...'.
     42   for _ in range(0, size, step_size):
     43     yield os.urandom(block_size)
     44     yield '\0' * (step_size - block_size)
     45 
     46 
     47 class CommonZipTest(test_utils.ReleaseToolsTestCase):
     48 
     49   def _verify(self, zip_file, zip_file_name, arcname, expected_hash,
     50               test_file_name=None, expected_stat=None, expected_mode=0o644,
     51               expected_compress_type=zipfile.ZIP_STORED):
     52     # Verify the stat if present.
     53     if test_file_name is not None:
     54       new_stat = os.stat(test_file_name)
     55       self.assertEqual(int(expected_stat.st_mode), int(new_stat.st_mode))
     56       self.assertEqual(int(expected_stat.st_mtime), int(new_stat.st_mtime))
     57 
     58     # Reopen the zip file to verify.
     59     zip_file = zipfile.ZipFile(zip_file_name, "r")
     60 
     61     # Verify the timestamp.
     62     info = zip_file.getinfo(arcname)
     63     self.assertEqual(info.date_time, (2009, 1, 1, 0, 0, 0))
     64 
     65     # Verify the file mode.
     66     mode = (info.external_attr >> 16) & 0o777
     67     self.assertEqual(mode, expected_mode)
     68 
     69     # Verify the compress type.
     70     self.assertEqual(info.compress_type, expected_compress_type)
     71 
     72     # Verify the zip contents.
     73     entry = zip_file.open(arcname)
     74     sha1_hash = sha1()
     75     for chunk in iter(lambda: entry.read(4 * MiB), ''):
     76       sha1_hash.update(chunk)
     77     self.assertEqual(expected_hash, sha1_hash.hexdigest())
     78     self.assertIsNone(zip_file.testzip())
     79 
     80   def _test_ZipWrite(self, contents, extra_zipwrite_args=None):
     81     extra_zipwrite_args = dict(extra_zipwrite_args or {})
     82 
     83     test_file = tempfile.NamedTemporaryFile(delete=False)
     84     test_file_name = test_file.name
     85 
     86     zip_file = tempfile.NamedTemporaryFile(delete=False)
     87     zip_file_name = zip_file.name
     88 
     89     # File names within an archive strip the leading slash.
     90     arcname = extra_zipwrite_args.get("arcname", test_file_name)
     91     if arcname[0] == "/":
     92       arcname = arcname[1:]
     93 
     94     zip_file.close()
     95     zip_file = zipfile.ZipFile(zip_file_name, "w")
     96 
     97     try:
     98       sha1_hash = sha1()
     99       for data in contents:
    100         sha1_hash.update(data)
    101         test_file.write(data)
    102       test_file.close()
    103 
    104       expected_stat = os.stat(test_file_name)
    105       expected_mode = extra_zipwrite_args.get("perms", 0o644)
    106       expected_compress_type = extra_zipwrite_args.get("compress_type",
    107                                                        zipfile.ZIP_STORED)
    108       time.sleep(5)  # Make sure the atime/mtime will change measurably.
    109 
    110       common.ZipWrite(zip_file, test_file_name, **extra_zipwrite_args)
    111       common.ZipClose(zip_file)
    112 
    113       self._verify(zip_file, zip_file_name, arcname, sha1_hash.hexdigest(),
    114                    test_file_name, expected_stat, expected_mode,
    115                    expected_compress_type)
    116     finally:
    117       os.remove(test_file_name)
    118       os.remove(zip_file_name)
    119 
    120   def _test_ZipWriteStr(self, zinfo_or_arcname, contents, extra_args=None):
    121     extra_args = dict(extra_args or {})
    122 
    123     zip_file = tempfile.NamedTemporaryFile(delete=False)
    124     zip_file_name = zip_file.name
    125     zip_file.close()
    126 
    127     zip_file = zipfile.ZipFile(zip_file_name, "w")
    128 
    129     try:
    130       expected_compress_type = extra_args.get("compress_type",
    131                                               zipfile.ZIP_STORED)
    132       time.sleep(5)  # Make sure the atime/mtime will change measurably.
    133 
    134       if not isinstance(zinfo_or_arcname, zipfile.ZipInfo):
    135         arcname = zinfo_or_arcname
    136         expected_mode = extra_args.get("perms", 0o644)
    137       else:
    138         arcname = zinfo_or_arcname.filename
    139         expected_mode = extra_args.get("perms",
    140                                        zinfo_or_arcname.external_attr >> 16)
    141 
    142       common.ZipWriteStr(zip_file, zinfo_or_arcname, contents, **extra_args)
    143       common.ZipClose(zip_file)
    144 
    145       self._verify(zip_file, zip_file_name, arcname, sha1(contents).hexdigest(),
    146                    expected_mode=expected_mode,
    147                    expected_compress_type=expected_compress_type)
    148     finally:
    149       os.remove(zip_file_name)
    150 
    151   def _test_ZipWriteStr_large_file(self, large, small, extra_args=None):
    152     extra_args = dict(extra_args or {})
    153 
    154     zip_file = tempfile.NamedTemporaryFile(delete=False)
    155     zip_file_name = zip_file.name
    156 
    157     test_file = tempfile.NamedTemporaryFile(delete=False)
    158     test_file_name = test_file.name
    159 
    160     arcname_large = test_file_name
    161     arcname_small = "bar"
    162 
    163     # File names within an archive strip the leading slash.
    164     if arcname_large[0] == "/":
    165       arcname_large = arcname_large[1:]
    166 
    167     zip_file.close()
    168     zip_file = zipfile.ZipFile(zip_file_name, "w")
    169 
    170     try:
    171       sha1_hash = sha1()
    172       for data in large:
    173         sha1_hash.update(data)
    174         test_file.write(data)
    175       test_file.close()
    176 
    177       expected_stat = os.stat(test_file_name)
    178       expected_mode = 0o644
    179       expected_compress_type = extra_args.get("compress_type",
    180                                               zipfile.ZIP_STORED)
    181       time.sleep(5)  # Make sure the atime/mtime will change measurably.
    182 
    183       common.ZipWrite(zip_file, test_file_name, **extra_args)
    184       common.ZipWriteStr(zip_file, arcname_small, small, **extra_args)
    185       common.ZipClose(zip_file)
    186 
    187       # Verify the contents written by ZipWrite().
    188       self._verify(zip_file, zip_file_name, arcname_large,
    189                    sha1_hash.hexdigest(), test_file_name, expected_stat,
    190                    expected_mode, expected_compress_type)
    191 
    192       # Verify the contents written by ZipWriteStr().
    193       self._verify(zip_file, zip_file_name, arcname_small,
    194                    sha1(small).hexdigest(),
    195                    expected_compress_type=expected_compress_type)
    196     finally:
    197       os.remove(zip_file_name)
    198       os.remove(test_file_name)
    199 
    200   def _test_reset_ZIP64_LIMIT(self, func, *args):
    201     default_limit = (1 << 31) - 1
    202     self.assertEqual(default_limit, zipfile.ZIP64_LIMIT)
    203     func(*args)
    204     self.assertEqual(default_limit, zipfile.ZIP64_LIMIT)
    205 
    206   def test_ZipWrite(self):
    207     file_contents = os.urandom(1024)
    208     self._test_ZipWrite(file_contents)
    209 
    210   def test_ZipWrite_with_opts(self):
    211     file_contents = os.urandom(1024)
    212     self._test_ZipWrite(file_contents, {
    213         "arcname": "foobar",
    214         "perms": 0o777,
    215         "compress_type": zipfile.ZIP_DEFLATED,
    216     })
    217     self._test_ZipWrite(file_contents, {
    218         "arcname": "foobar",
    219         "perms": 0o700,
    220         "compress_type": zipfile.ZIP_STORED,
    221     })
    222 
    223   def test_ZipWrite_large_file(self):
    224     file_contents = get_2gb_string()
    225     self._test_ZipWrite(file_contents, {
    226         "compress_type": zipfile.ZIP_DEFLATED,
    227     })
    228 
    229   def test_ZipWrite_resets_ZIP64_LIMIT(self):
    230     self._test_reset_ZIP64_LIMIT(self._test_ZipWrite, "")
    231 
    232   def test_ZipWriteStr(self):
    233     random_string = os.urandom(1024)
    234     # Passing arcname
    235     self._test_ZipWriteStr("foo", random_string)
    236 
    237     # Passing zinfo
    238     zinfo = zipfile.ZipInfo(filename="foo")
    239     self._test_ZipWriteStr(zinfo, random_string)
    240 
    241     # Timestamp in the zinfo should be overwritten.
    242     zinfo.date_time = (2015, 3, 1, 15, 30, 0)
    243     self._test_ZipWriteStr(zinfo, random_string)
    244 
    245   def test_ZipWriteStr_with_opts(self):
    246     random_string = os.urandom(1024)
    247     # Passing arcname
    248     self._test_ZipWriteStr("foo", random_string, {
    249         "perms": 0o700,
    250         "compress_type": zipfile.ZIP_DEFLATED,
    251     })
    252     self._test_ZipWriteStr("bar", random_string, {
    253         "compress_type": zipfile.ZIP_STORED,
    254     })
    255 
    256     # Passing zinfo
    257     zinfo = zipfile.ZipInfo(filename="foo")
    258     self._test_ZipWriteStr(zinfo, random_string, {
    259         "compress_type": zipfile.ZIP_DEFLATED,
    260     })
    261     self._test_ZipWriteStr(zinfo, random_string, {
    262         "perms": 0o600,
    263         "compress_type": zipfile.ZIP_STORED,
    264     })
    265 
    266   def test_ZipWriteStr_large_file(self):
    267     # zipfile.writestr() doesn't work when the str size is over 2GiB even with
    268     # the workaround. We will only test the case of writing a string into a
    269     # large archive.
    270     long_string = get_2gb_string()
    271     short_string = os.urandom(1024)
    272     self._test_ZipWriteStr_large_file(long_string, short_string, {
    273         "compress_type": zipfile.ZIP_DEFLATED,
    274     })
    275 
    276   def test_ZipWriteStr_resets_ZIP64_LIMIT(self):
    277     self._test_reset_ZIP64_LIMIT(self._test_ZipWriteStr, "foo", "")
    278     zinfo = zipfile.ZipInfo(filename="foo")
    279     self._test_reset_ZIP64_LIMIT(self._test_ZipWriteStr, zinfo, "")
    280 
    281   def test_bug21309935(self):
    282     zip_file = tempfile.NamedTemporaryFile(delete=False)
    283     zip_file_name = zip_file.name
    284     zip_file.close()
    285 
    286     try:
    287       random_string = os.urandom(1024)
    288       zip_file = zipfile.ZipFile(zip_file_name, "w")
    289       # Default perms should be 0o644 when passing the filename.
    290       common.ZipWriteStr(zip_file, "foo", random_string)
    291       # Honor the specified perms.
    292       common.ZipWriteStr(zip_file, "bar", random_string, perms=0o755)
    293       # The perms in zinfo should be untouched.
    294       zinfo = zipfile.ZipInfo(filename="baz")
    295       zinfo.external_attr = 0o740 << 16
    296       common.ZipWriteStr(zip_file, zinfo, random_string)
    297       # Explicitly specified perms has the priority.
    298       zinfo = zipfile.ZipInfo(filename="qux")
    299       zinfo.external_attr = 0o700 << 16
    300       common.ZipWriteStr(zip_file, zinfo, random_string, perms=0o400)
    301       common.ZipClose(zip_file)
    302 
    303       self._verify(zip_file, zip_file_name, "foo",
    304                    sha1(random_string).hexdigest(),
    305                    expected_mode=0o644)
    306       self._verify(zip_file, zip_file_name, "bar",
    307                    sha1(random_string).hexdigest(),
    308                    expected_mode=0o755)
    309       self._verify(zip_file, zip_file_name, "baz",
    310                    sha1(random_string).hexdigest(),
    311                    expected_mode=0o740)
    312       self._verify(zip_file, zip_file_name, "qux",
    313                    sha1(random_string).hexdigest(),
    314                    expected_mode=0o400)
    315     finally:
    316       os.remove(zip_file_name)
    317 
    318   def test_ZipDelete(self):
    319     zip_file = tempfile.NamedTemporaryFile(delete=False, suffix='.zip')
    320     output_zip = zipfile.ZipFile(zip_file.name, 'w',
    321                                  compression=zipfile.ZIP_DEFLATED)
    322     with tempfile.NamedTemporaryFile() as entry_file:
    323       entry_file.write(os.urandom(1024))
    324       common.ZipWrite(output_zip, entry_file.name, arcname='Test1')
    325       common.ZipWrite(output_zip, entry_file.name, arcname='Test2')
    326       common.ZipWrite(output_zip, entry_file.name, arcname='Test3')
    327       common.ZipClose(output_zip)
    328     zip_file.close()
    329 
    330     try:
    331       common.ZipDelete(zip_file.name, 'Test2')
    332       with zipfile.ZipFile(zip_file.name, 'r') as check_zip:
    333         entries = check_zip.namelist()
    334         self.assertTrue('Test1' in entries)
    335         self.assertFalse('Test2' in entries)
    336         self.assertTrue('Test3' in entries)
    337 
    338       self.assertRaises(
    339           common.ExternalError, common.ZipDelete, zip_file.name, 'Test2')
    340       with zipfile.ZipFile(zip_file.name, 'r') as check_zip:
    341         entries = check_zip.namelist()
    342         self.assertTrue('Test1' in entries)
    343         self.assertFalse('Test2' in entries)
    344         self.assertTrue('Test3' in entries)
    345 
    346       common.ZipDelete(zip_file.name, ['Test3'])
    347       with zipfile.ZipFile(zip_file.name, 'r') as check_zip:
    348         entries = check_zip.namelist()
    349         self.assertTrue('Test1' in entries)
    350         self.assertFalse('Test2' in entries)
    351         self.assertFalse('Test3' in entries)
    352 
    353       common.ZipDelete(zip_file.name, ['Test1', 'Test2'])
    354       with zipfile.ZipFile(zip_file.name, 'r') as check_zip:
    355         entries = check_zip.namelist()
    356         self.assertFalse('Test1' in entries)
    357         self.assertFalse('Test2' in entries)
    358         self.assertFalse('Test3' in entries)
    359     finally:
    360       os.remove(zip_file.name)
    361 
    362   @staticmethod
    363   def _test_UnzipTemp_createZipFile():
    364     zip_file = common.MakeTempFile(suffix='.zip')
    365     output_zip = zipfile.ZipFile(
    366         zip_file, 'w', compression=zipfile.ZIP_DEFLATED)
    367     contents = os.urandom(1024)
    368     with tempfile.NamedTemporaryFile() as entry_file:
    369       entry_file.write(contents)
    370       common.ZipWrite(output_zip, entry_file.name, arcname='Test1')
    371       common.ZipWrite(output_zip, entry_file.name, arcname='Test2')
    372       common.ZipWrite(output_zip, entry_file.name, arcname='Foo3')
    373       common.ZipWrite(output_zip, entry_file.name, arcname='Bar4')
    374       common.ZipWrite(output_zip, entry_file.name, arcname='Dir5/Baz5')
    375       common.ZipClose(output_zip)
    376     common.ZipClose(output_zip)
    377     return zip_file
    378 
    379   def test_UnzipTemp(self):
    380     zip_file = self._test_UnzipTemp_createZipFile()
    381     unzipped_dir = common.UnzipTemp(zip_file)
    382     self.assertTrue(os.path.exists(os.path.join(unzipped_dir, 'Test1')))
    383     self.assertTrue(os.path.exists(os.path.join(unzipped_dir, 'Test2')))
    384     self.assertTrue(os.path.exists(os.path.join(unzipped_dir, 'Foo3')))
    385     self.assertTrue(os.path.exists(os.path.join(unzipped_dir, 'Bar4')))
    386     self.assertTrue(os.path.exists(os.path.join(unzipped_dir, 'Dir5/Baz5')))
    387 
    388   def test_UnzipTemp_withPatterns(self):
    389     zip_file = self._test_UnzipTemp_createZipFile()
    390 
    391     unzipped_dir = common.UnzipTemp(zip_file, ['Test1'])
    392     self.assertTrue(os.path.exists(os.path.join(unzipped_dir, 'Test1')))
    393     self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Test2')))
    394     self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Foo3')))
    395     self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Bar4')))
    396     self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Dir5/Baz5')))
    397 
    398     unzipped_dir = common.UnzipTemp(zip_file, ['Test1', 'Foo3'])
    399     self.assertTrue(os.path.exists(os.path.join(unzipped_dir, 'Test1')))
    400     self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Test2')))
    401     self.assertTrue(os.path.exists(os.path.join(unzipped_dir, 'Foo3')))
    402     self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Bar4')))
    403     self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Dir5/Baz5')))
    404 
    405     unzipped_dir = common.UnzipTemp(zip_file, ['Test*', 'Foo3*'])
    406     self.assertTrue(os.path.exists(os.path.join(unzipped_dir, 'Test1')))
    407     self.assertTrue(os.path.exists(os.path.join(unzipped_dir, 'Test2')))
    408     self.assertTrue(os.path.exists(os.path.join(unzipped_dir, 'Foo3')))
    409     self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Bar4')))
    410     self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Dir5/Baz5')))
    411 
    412     unzipped_dir = common.UnzipTemp(zip_file, ['*Test1', '*Baz*'])
    413     self.assertTrue(os.path.exists(os.path.join(unzipped_dir, 'Test1')))
    414     self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Test2')))
    415     self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Foo3')))
    416     self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Bar4')))
    417     self.assertTrue(os.path.exists(os.path.join(unzipped_dir, 'Dir5/Baz5')))
    418 
    419   def test_UnzipTemp_withEmptyPatterns(self):
    420     zip_file = self._test_UnzipTemp_createZipFile()
    421     unzipped_dir = common.UnzipTemp(zip_file, [])
    422     self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Test1')))
    423     self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Test2')))
    424     self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Foo3')))
    425     self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Bar4')))
    426     self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Dir5/Baz5')))
    427 
    428   def test_UnzipTemp_withPartiallyMatchingPatterns(self):
    429     zip_file = self._test_UnzipTemp_createZipFile()
    430     unzipped_dir = common.UnzipTemp(zip_file, ['Test*', 'Nonexistent*'])
    431     self.assertTrue(os.path.exists(os.path.join(unzipped_dir, 'Test1')))
    432     self.assertTrue(os.path.exists(os.path.join(unzipped_dir, 'Test2')))
    433     self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Foo3')))
    434     self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Bar4')))
    435     self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Dir5/Baz5')))
    436 
    437   def test_UnzipTemp_withNoMatchingPatterns(self):
    438     zip_file = self._test_UnzipTemp_createZipFile()
    439     unzipped_dir = common.UnzipTemp(zip_file, ['Foo4', 'Nonexistent*'])
    440     self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Test1')))
    441     self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Test2')))
    442     self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Foo3')))
    443     self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Bar4')))
    444     self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Dir5/Baz5')))
    445 
    446 
    447 class CommonApkUtilsTest(test_utils.ReleaseToolsTestCase):
    448   """Tests the APK utils related functions."""
    449 
  # Fixtures come in pairs: APKCERTS_TXTn is the text written to
  # META/apkcerts.txt, and APKCERTS_CERTMAPn is the certmap the tests expect
  # common.ReadApkCerts() to return for it.

  # Entries without a "compressed" attribute, including a PRESIGNED one.
  APKCERTS_TXT1 = (
      'name="RecoveryLocalizer.apk" certificate="certs/devkey.x509.pem"'
      ' private_key="certs/devkey.pk8"\n'
      'name="Settings.apk"'
      ' certificate="build/target/product/security/platform.x509.pem"'
      ' private_key="build/target/product/security/platform.pk8"\n'
      'name="TV.apk" certificate="PRESIGNED" private_key=""\n'
  )

  APKCERTS_CERTMAP1 = {
      'RecoveryLocalizer.apk' : 'certs/devkey',
      'Settings.apk' : 'build/target/product/security/platform',
      'TV.apk' : 'PRESIGNED',
  }

  # Entries that all carry compressed="gz".
  APKCERTS_TXT2 = (
      'name="Compressed1.apk" certificate="certs/compressed1.x509.pem"'
      ' private_key="certs/compressed1.pk8" compressed="gz"\n'
      'name="Compressed2a.apk" certificate="certs/compressed2.x509.pem"'
      ' private_key="certs/compressed2.pk8" compressed="gz"\n'
      'name="Compressed2b.apk" certificate="certs/compressed2.x509.pem"'
      ' private_key="certs/compressed2.pk8" compressed="gz"\n'
      'name="Compressed3.apk" certificate="certs/compressed3.x509.pem"'
      ' private_key="certs/compressed3.pk8" compressed="gz"\n'
  )

  APKCERTS_CERTMAP2 = {
      'Compressed1.apk' : 'certs/compressed1',
      'Compressed2a.apk' : 'certs/compressed2',
      'Compressed2b.apk' : 'certs/compressed2',
      'Compressed3.apk' : 'certs/compressed3',
  }

  # A single entry that carries compressed="xz".
  APKCERTS_TXT3 = (
      'name="Compressed4.apk" certificate="certs/compressed4.x509.pem"'
      ' private_key="certs/compressed4.pk8" compressed="xz"\n'
  )

  APKCERTS_CERTMAP3 = {
      'Compressed4.apk' : 'certs/compressed4',
  }
    491 
    492   def setUp(self):
    493     self.testdata_dir = test_utils.get_testdata_dir()
    494 
    495   @staticmethod
    496   def _write_apkcerts_txt(apkcerts_txt, additional=None):
    497     if additional is None:
    498       additional = []
    499     target_files = common.MakeTempFile(suffix='.zip')
    500     with zipfile.ZipFile(target_files, 'w') as target_files_zip:
    501       target_files_zip.writestr('META/apkcerts.txt', apkcerts_txt)
    502       for entry in additional:
    503         target_files_zip.writestr(entry, '')
    504     return target_files
    505 
    506   def test_ReadApkCerts_NoncompressedApks(self):
    507     target_files = self._write_apkcerts_txt(self.APKCERTS_TXT1)
    508     with zipfile.ZipFile(target_files, 'r') as input_zip:
    509       certmap, ext = common.ReadApkCerts(input_zip)
    510 
    511     self.assertDictEqual(self.APKCERTS_CERTMAP1, certmap)
    512     self.assertIsNone(ext)
    513 
    514   def test_ReadApkCerts_CompressedApks(self):
    515     # We have "installed" Compressed1.apk.gz only. Note that Compressed3.apk is
    516     # not stored in '.gz' format, so it shouldn't be considered as installed.
    517     target_files = self._write_apkcerts_txt(
    518         self.APKCERTS_TXT2,
    519         ['Compressed1.apk.gz', 'Compressed3.apk'])
    520 
    521     with zipfile.ZipFile(target_files, 'r') as input_zip:
    522       certmap, ext = common.ReadApkCerts(input_zip)
    523 
    524     self.assertDictEqual(self.APKCERTS_CERTMAP2, certmap)
    525     self.assertEqual('.gz', ext)
    526 
    527     # Alternative case with '.xz'.
    528     target_files = self._write_apkcerts_txt(
    529         self.APKCERTS_TXT3, ['Compressed4.apk.xz'])
    530 
    531     with zipfile.ZipFile(target_files, 'r') as input_zip:
    532       certmap, ext = common.ReadApkCerts(input_zip)
    533 
    534     self.assertDictEqual(self.APKCERTS_CERTMAP3, certmap)
    535     self.assertEqual('.xz', ext)
    536 
    537   def test_ReadApkCerts_CompressedAndNoncompressedApks(self):
    538     target_files = self._write_apkcerts_txt(
    539         self.APKCERTS_TXT1 + self.APKCERTS_TXT2,
    540         ['Compressed1.apk.gz', 'Compressed3.apk'])
    541 
    542     with zipfile.ZipFile(target_files, 'r') as input_zip:
    543       certmap, ext = common.ReadApkCerts(input_zip)
    544 
    545     certmap_merged = self.APKCERTS_CERTMAP1.copy()
    546     certmap_merged.update(self.APKCERTS_CERTMAP2)
    547     self.assertDictEqual(certmap_merged, certmap)
    548     self.assertEqual('.gz', ext)
    549 
    550   def test_ReadApkCerts_MultipleCompressionMethods(self):
    551     target_files = self._write_apkcerts_txt(
    552         self.APKCERTS_TXT2 + self.APKCERTS_TXT3,
    553         ['Compressed1.apk.gz', 'Compressed4.apk.xz'])
    554 
    555     with zipfile.ZipFile(target_files, 'r') as input_zip:
    556       self.assertRaises(ValueError, common.ReadApkCerts, input_zip)
    557 
    558   def test_ReadApkCerts_MismatchingKeys(self):
    559     malformed_apkcerts_txt = (
    560         'name="App1.apk" certificate="certs/cert1.x509.pem"'
    561         ' private_key="certs/cert2.pk8"\n'
    562     )
    563     target_files = self._write_apkcerts_txt(malformed_apkcerts_txt)
    564 
    565     with zipfile.ZipFile(target_files, 'r') as input_zip:
    566       self.assertRaises(ValueError, common.ReadApkCerts, input_zip)
    567 
    568   def test_ExtractPublicKey(self):
    569     cert = os.path.join(self.testdata_dir, 'testkey.x509.pem')
    570     pubkey = os.path.join(self.testdata_dir, 'testkey.pubkey.pem')
    571     with open(pubkey, 'rb') as pubkey_fp:
    572       self.assertEqual(pubkey_fp.read(), common.ExtractPublicKey(cert))
    573 
    574   def test_ExtractPublicKey_invalidInput(self):
    575     wrong_input = os.path.join(self.testdata_dir, 'testkey.pk8')
    576     self.assertRaises(AssertionError, common.ExtractPublicKey, wrong_input)
    577 
    578   def test_ExtractAvbPublicKey(self):
    579     privkey = os.path.join(self.testdata_dir, 'testkey.key')
    580     pubkey = os.path.join(self.testdata_dir, 'testkey.pubkey.pem')
    581     with open(common.ExtractAvbPublicKey(privkey)) as privkey_fp, \
    582         open(common.ExtractAvbPublicKey(pubkey)) as pubkey_fp:
    583       self.assertEqual(privkey_fp.read(), pubkey_fp.read())
    584 
    585   def test_ParseCertificate(self):
    586     cert = os.path.join(self.testdata_dir, 'testkey.x509.pem')
    587 
    588     cmd = ['openssl', 'x509', '-in', cert, '-outform', 'DER']
    589     proc = common.Run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    590     expected, _ = proc.communicate()
    591     self.assertEqual(0, proc.returncode)
    592 
    593     with open(cert) as cert_fp:
    594       actual = common.ParseCertificate(cert_fp.read())
    595     self.assertEqual(expected, actual)
    596 
    597   def test_GetMinSdkVersion(self):
    598     test_app = os.path.join(self.testdata_dir, 'TestApp.apk')
    599     self.assertEqual('24', common.GetMinSdkVersion(test_app))
    600 
    601   def test_GetMinSdkVersion_invalidInput(self):
    602     self.assertRaises(
    603         common.ExternalError, common.GetMinSdkVersion, 'does-not-exist.apk')
    604 
    605   def test_GetMinSdkVersionInt(self):
    606     test_app = os.path.join(self.testdata_dir, 'TestApp.apk')
    607     self.assertEqual(24, common.GetMinSdkVersionInt(test_app, {}))
    608 
    609   def test_GetMinSdkVersionInt_invalidInput(self):
    610     self.assertRaises(
    611         common.ExternalError, common.GetMinSdkVersionInt, 'does-not-exist.apk',
    612         {})
    613 
    614 
    615 class CommonUtilsTest(test_utils.ReleaseToolsTestCase):
    616 
    617   def setUp(self):
    618     self.testdata_dir = test_utils.get_testdata_dir()
    619 
    620   def test_GetSparseImage_emptyBlockMapFile(self):
    621     target_files = common.MakeTempFile(prefix='target_files-', suffix='.zip')
    622     with zipfile.ZipFile(target_files, 'w') as target_files_zip:
    623       target_files_zip.write(
    624           test_utils.construct_sparse_image([
    625               (0xCAC1, 6),
    626               (0xCAC3, 3),
    627               (0xCAC1, 4)]),
    628           arcname='IMAGES/system.img')
    629       target_files_zip.writestr('IMAGES/system.map', '')
    630       target_files_zip.writestr('SYSTEM/file1', os.urandom(4096 * 8))
    631       target_files_zip.writestr('SYSTEM/file2', os.urandom(4096 * 3))
    632 
    633     tempdir = common.UnzipTemp(target_files)
    634     with zipfile.ZipFile(target_files, 'r') as input_zip:
    635       sparse_image = common.GetSparseImage('system', tempdir, input_zip, False)
    636 
    637     self.assertDictEqual(
    638         {
    639             '__COPY': RangeSet("0"),
    640             '__NONZERO-0': RangeSet("1-5 9-12"),
    641         },
    642         sparse_image.file_map)
    643 
    644   def test_GetSparseImage_missingImageFile(self):
    645     self.assertRaises(
    646         AssertionError, common.GetSparseImage, 'system2', self.testdata_dir,
    647         None, False)
    648     self.assertRaises(
    649         AssertionError, common.GetSparseImage, 'unknown', self.testdata_dir,
    650         None, False)
    651 
    652   def test_GetSparseImage_missingBlockMapFile(self):
    653     target_files = common.MakeTempFile(prefix='target_files-', suffix='.zip')
    654     with zipfile.ZipFile(target_files, 'w') as target_files_zip:
    655       target_files_zip.write(
    656           test_utils.construct_sparse_image([
    657               (0xCAC1, 6),
    658               (0xCAC3, 3),
    659               (0xCAC1, 4)]),
    660           arcname='IMAGES/system.img')
    661       target_files_zip.writestr('SYSTEM/file1', os.urandom(4096 * 8))
    662       target_files_zip.writestr('SYSTEM/file2', os.urandom(4096 * 3))
    663 
    664     tempdir = common.UnzipTemp(target_files)
    665     with zipfile.ZipFile(target_files, 'r') as input_zip:
    666       self.assertRaises(
    667           AssertionError, common.GetSparseImage, 'system', tempdir, input_zip,
    668           False)
    669 
    670   def test_GetSparseImage_sharedBlocks_notAllowed(self):
    671     """Tests the case of having overlapping blocks but disallowed."""
    672     target_files = common.MakeTempFile(prefix='target_files-', suffix='.zip')
    673     with zipfile.ZipFile(target_files, 'w') as target_files_zip:
    674       target_files_zip.write(
    675           test_utils.construct_sparse_image([(0xCAC2, 16)]),
    676           arcname='IMAGES/system.img')
    677       # Block 10 is shared between two files.
    678       target_files_zip.writestr(
    679           'IMAGES/system.map',
    680           '\n'.join([
    681               '/system/file1 1-5 9-10',
    682               '/system/file2 10-12']))
    683       target_files_zip.writestr('SYSTEM/file1', os.urandom(4096 * 7))
    684       target_files_zip.writestr('SYSTEM/file2', os.urandom(4096 * 3))
    685 
    686     tempdir = common.UnzipTemp(target_files)
    687     with zipfile.ZipFile(target_files, 'r') as input_zip:
    688       self.assertRaises(
    689           AssertionError, common.GetSparseImage, 'system', tempdir, input_zip,
    690           False)
    691 
    692   def test_GetSparseImage_sharedBlocks_allowed(self):
    693     """Tests the case for target using BOARD_EXT4_SHARE_DUP_BLOCKS := true."""
    694     target_files = common.MakeTempFile(prefix='target_files-', suffix='.zip')
    695     with zipfile.ZipFile(target_files, 'w') as target_files_zip:
    696       # Construct an image with a care_map of "0-5 9-12".
    697       target_files_zip.write(
    698           test_utils.construct_sparse_image([(0xCAC2, 16)]),
    699           arcname='IMAGES/system.img')
    700       # Block 10 is shared between two files.
    701       target_files_zip.writestr(
    702           'IMAGES/system.map',
    703           '\n'.join([
    704               '/system/file1 1-5 9-10',
    705               '/system/file2 10-12']))
    706       target_files_zip.writestr('SYSTEM/file1', os.urandom(4096 * 7))
    707       target_files_zip.writestr('SYSTEM/file2', os.urandom(4096 * 3))
    708 
    709     tempdir = common.UnzipTemp(target_files)
    710     with zipfile.ZipFile(target_files, 'r') as input_zip:
    711       sparse_image = common.GetSparseImage('system', tempdir, input_zip, True)
    712 
    713     self.assertDictEqual(
    714         {
    715             '__COPY': RangeSet("0"),
    716             '__NONZERO-0': RangeSet("6-8 13-15"),
    717             '/system/file1': RangeSet("1-5 9-10"),
    718             '/system/file2': RangeSet("11-12"),
    719         },
    720         sparse_image.file_map)
    721 
    722     # '/system/file2' should be marked with 'uses_shared_blocks', but not with
    723     # 'incomplete'.
    724     self.assertTrue(
    725         sparse_image.file_map['/system/file2'].extra['uses_shared_blocks'])
    726     self.assertNotIn(
    727         'incomplete', sparse_image.file_map['/system/file2'].extra)
    728 
    729     # All other entries should look normal without any tags.
    730     self.assertFalse(sparse_image.file_map['__COPY'].extra)
    731     self.assertFalse(sparse_image.file_map['__NONZERO-0'].extra)
    732     self.assertFalse(sparse_image.file_map['/system/file1'].extra)
    733 
    734   def test_GetSparseImage_incompleteRanges(self):
    735     """Tests the case of ext4 images with holes."""
    736     target_files = common.MakeTempFile(prefix='target_files-', suffix='.zip')
    737     with zipfile.ZipFile(target_files, 'w') as target_files_zip:
    738       target_files_zip.write(
    739           test_utils.construct_sparse_image([(0xCAC2, 16)]),
    740           arcname='IMAGES/system.img')
    741       target_files_zip.writestr(
    742           'IMAGES/system.map',
    743           '\n'.join([
    744               '/system/file1 1-5 9-10',
    745               '/system/file2 11-12']))
    746       target_files_zip.writestr('SYSTEM/file1', os.urandom(4096 * 7))
    747       # '/system/file2' has less blocks listed (2) than actual (3).
    748       target_files_zip.writestr('SYSTEM/file2', os.urandom(4096 * 3))
    749 
    750     tempdir = common.UnzipTemp(target_files)
    751     with zipfile.ZipFile(target_files, 'r') as input_zip:
    752       sparse_image = common.GetSparseImage('system', tempdir, input_zip, False)
    753 
    754     self.assertFalse(sparse_image.file_map['/system/file1'].extra)
    755     self.assertTrue(sparse_image.file_map['/system/file2'].extra['incomplete'])
    756 
    757   def test_GetSparseImage_systemRootImage_filenameWithExtraLeadingSlash(self):
    758     target_files = common.MakeTempFile(prefix='target_files-', suffix='.zip')
    759     with zipfile.ZipFile(target_files, 'w') as target_files_zip:
    760       target_files_zip.write(
    761           test_utils.construct_sparse_image([(0xCAC2, 16)]),
    762           arcname='IMAGES/system.img')
    763       target_files_zip.writestr(
    764           'IMAGES/system.map',
    765           '\n'.join([
    766               '//system/file1 1-5 9-10',
    767               '//system/file2 11-12',
    768               '/system/app/file3 13-15']))
    769       target_files_zip.writestr('SYSTEM/file1', os.urandom(4096 * 7))
    770       # '/system/file2' has less blocks listed (2) than actual (3).
    771       target_files_zip.writestr('SYSTEM/file2', os.urandom(4096 * 3))
    772       # '/system/app/file3' has less blocks listed (3) than actual (4).
    773       target_files_zip.writestr('SYSTEM/app/file3', os.urandom(4096 * 4))
    774 
    775     tempdir = common.UnzipTemp(target_files)
    776     with zipfile.ZipFile(target_files, 'r') as input_zip:
    777       sparse_image = common.GetSparseImage('system', tempdir, input_zip, False)
    778 
    779     self.assertFalse(sparse_image.file_map['//system/file1'].extra)
    780     self.assertTrue(sparse_image.file_map['//system/file2'].extra['incomplete'])
    781     self.assertTrue(
    782         sparse_image.file_map['/system/app/file3'].extra['incomplete'])
    783 
    784   def test_GetSparseImage_systemRootImage_nonSystemFiles(self):
    785     target_files = common.MakeTempFile(prefix='target_files-', suffix='.zip')
    786     with zipfile.ZipFile(target_files, 'w') as target_files_zip:
    787       target_files_zip.write(
    788           test_utils.construct_sparse_image([(0xCAC2, 16)]),
    789           arcname='IMAGES/system.img')
    790       target_files_zip.writestr(
    791           'IMAGES/system.map',
    792           '\n'.join([
    793               '//system/file1 1-5 9-10',
    794               '//init.rc 13-15']))
    795       target_files_zip.writestr('SYSTEM/file1', os.urandom(4096 * 7))
    796       # '/init.rc' has less blocks listed (3) than actual (4).
    797       target_files_zip.writestr('ROOT/init.rc', os.urandom(4096 * 4))
    798 
    799     tempdir = common.UnzipTemp(target_files)
    800     with zipfile.ZipFile(target_files, 'r') as input_zip:
    801       sparse_image = common.GetSparseImage('system', tempdir, input_zip, False)
    802 
    803     self.assertFalse(sparse_image.file_map['//system/file1'].extra)
    804     self.assertTrue(sparse_image.file_map['//init.rc'].extra['incomplete'])
    805 
    806   def test_GetSparseImage_fileNotFound(self):
    807     target_files = common.MakeTempFile(prefix='target_files-', suffix='.zip')
    808     with zipfile.ZipFile(target_files, 'w') as target_files_zip:
    809       target_files_zip.write(
    810           test_utils.construct_sparse_image([(0xCAC2, 16)]),
    811           arcname='IMAGES/system.img')
    812       target_files_zip.writestr(
    813           'IMAGES/system.map',
    814           '\n'.join([
    815               '//system/file1 1-5 9-10',
    816               '//system/file2 11-12']))
    817       target_files_zip.writestr('SYSTEM/file1', os.urandom(4096 * 7))
    818 
    819     tempdir = common.UnzipTemp(target_files)
    820     with zipfile.ZipFile(target_files, 'r') as input_zip:
    821       self.assertRaises(
    822           AssertionError, common.GetSparseImage, 'system', tempdir, input_zip,
    823           False)
    824 
    825   def test_GetAvbChainedPartitionArg(self):
    826     pubkey = os.path.join(self.testdata_dir, 'testkey.pubkey.pem')
    827     info_dict = {
    828         'avb_avbtool': 'avbtool',
    829         'avb_system_key_path': pubkey,
    830         'avb_system_rollback_index_location': 2,
    831     }
    832     args = common.GetAvbChainedPartitionArg('system', info_dict).split(':')
    833     self.assertEqual(3, len(args))
    834     self.assertEqual('system', args[0])
    835     self.assertEqual('2', args[1])
    836     self.assertTrue(os.path.exists(args[2]))
    837 
    838   def test_GetAvbChainedPartitionArg_withPrivateKey(self):
    839     key = os.path.join(self.testdata_dir, 'testkey.key')
    840     info_dict = {
    841         'avb_avbtool': 'avbtool',
    842         'avb_product_key_path': key,
    843         'avb_product_rollback_index_location': 2,
    844     }
    845     args = common.GetAvbChainedPartitionArg('product', info_dict).split(':')
    846     self.assertEqual(3, len(args))
    847     self.assertEqual('product', args[0])
    848     self.assertEqual('2', args[1])
    849     self.assertTrue(os.path.exists(args[2]))
    850 
    851   def test_GetAvbChainedPartitionArg_withSpecifiedKey(self):
    852     info_dict = {
    853         'avb_avbtool': 'avbtool',
    854         'avb_system_key_path': 'does-not-exist',
    855         'avb_system_rollback_index_location': 2,
    856     }
    857     pubkey = os.path.join(self.testdata_dir, 'testkey.pubkey.pem')
    858     args = common.GetAvbChainedPartitionArg(
    859         'system', info_dict, pubkey).split(':')
    860     self.assertEqual(3, len(args))
    861     self.assertEqual('system', args[0])
    862     self.assertEqual('2', args[1])
    863     self.assertTrue(os.path.exists(args[2]))
    864 
    865   def test_GetAvbChainedPartitionArg_invalidKey(self):
    866     pubkey = os.path.join(self.testdata_dir, 'testkey_with_passwd.x509.pem')
    867     info_dict = {
    868         'avb_avbtool': 'avbtool',
    869         'avb_system_key_path': pubkey,
    870         'avb_system_rollback_index_location': 2,
    871     }
    872     self.assertRaises(
    873         common.ExternalError, common.GetAvbChainedPartitionArg, 'system',
    874         info_dict)
    875 
    876   INFO_DICT_DEFAULT = {
    877       'recovery_api_version': 3,
    878       'fstab_version': 2,
    879       'system_root_image': 'true',
    880       'no_recovery' : 'true',
    881       'recovery_as_boot': 'true',
    882   }
    883 
    884   @staticmethod
    885   def _test_LoadInfoDict_createTargetFiles(info_dict, fstab_path):
    886     target_files = common.MakeTempFile(prefix='target_files-', suffix='.zip')
    887     with zipfile.ZipFile(target_files, 'w') as target_files_zip:
    888       info_values = ''.join(
    889           ['{}={}\n'.format(k, v) for k, v in sorted(info_dict.iteritems())])
    890       common.ZipWriteStr(target_files_zip, 'META/misc_info.txt', info_values)
    891 
    892       FSTAB_TEMPLATE = "/dev/block/system {} ext4 ro,barrier=1 defaults"
    893       if info_dict.get('system_root_image') == 'true':
    894         fstab_values = FSTAB_TEMPLATE.format('/')
    895       else:
    896         fstab_values = FSTAB_TEMPLATE.format('/system')
    897       common.ZipWriteStr(target_files_zip, fstab_path, fstab_values)
    898 
    899       common.ZipWriteStr(
    900           target_files_zip, 'META/file_contexts', 'file-contexts')
    901     return target_files
    902 
    903   def test_LoadInfoDict(self):
    904     target_files = self._test_LoadInfoDict_createTargetFiles(
    905         self.INFO_DICT_DEFAULT,
    906         'BOOT/RAMDISK/system/etc/recovery.fstab')
    907     with zipfile.ZipFile(target_files, 'r') as target_files_zip:
    908       loaded_dict = common.LoadInfoDict(target_files_zip)
    909       self.assertEqual(3, loaded_dict['recovery_api_version'])
    910       self.assertEqual(2, loaded_dict['fstab_version'])
    911       self.assertIn('/', loaded_dict['fstab'])
    912       self.assertIn('/system', loaded_dict['fstab'])
    913 
    914   def test_LoadInfoDict_legacyRecoveryFstabPath(self):
    915     target_files = self._test_LoadInfoDict_createTargetFiles(
    916         self.INFO_DICT_DEFAULT,
    917         'BOOT/RAMDISK/etc/recovery.fstab')
    918     with zipfile.ZipFile(target_files, 'r') as target_files_zip:
    919       loaded_dict = common.LoadInfoDict(target_files_zip)
    920       self.assertEqual(3, loaded_dict['recovery_api_version'])
    921       self.assertEqual(2, loaded_dict['fstab_version'])
    922       self.assertIn('/', loaded_dict['fstab'])
    923       self.assertIn('/system', loaded_dict['fstab'])
    924 
    925   def test_LoadInfoDict_dirInput(self):
    926     target_files = self._test_LoadInfoDict_createTargetFiles(
    927         self.INFO_DICT_DEFAULT,
    928         'BOOT/RAMDISK/system/etc/recovery.fstab')
    929     unzipped = common.UnzipTemp(target_files)
    930     loaded_dict = common.LoadInfoDict(unzipped)
    931     self.assertEqual(3, loaded_dict['recovery_api_version'])
    932     self.assertEqual(2, loaded_dict['fstab_version'])
    933     self.assertIn('/', loaded_dict['fstab'])
    934     self.assertIn('/system', loaded_dict['fstab'])
    935 
    936   def test_LoadInfoDict_dirInput_legacyRecoveryFstabPath(self):
    937     target_files = self._test_LoadInfoDict_createTargetFiles(
    938         self.INFO_DICT_DEFAULT,
    939         'BOOT/RAMDISK/system/etc/recovery.fstab')
    940     unzipped = common.UnzipTemp(target_files)
    941     loaded_dict = common.LoadInfoDict(unzipped)
    942     self.assertEqual(3, loaded_dict['recovery_api_version'])
    943     self.assertEqual(2, loaded_dict['fstab_version'])
    944     self.assertIn('/', loaded_dict['fstab'])
    945     self.assertIn('/system', loaded_dict['fstab'])
    946 
    947   def test_LoadInfoDict_systemRootImageFalse(self):
    948     # Devices not using system-as-root nor recovery-as-boot. Non-A/B devices
    949     # launched prior to P will likely have this config.
    950     info_dict = copy.copy(self.INFO_DICT_DEFAULT)
    951     del info_dict['no_recovery']
    952     del info_dict['system_root_image']
    953     del info_dict['recovery_as_boot']
    954     target_files = self._test_LoadInfoDict_createTargetFiles(
    955         info_dict,
    956         'RECOVERY/RAMDISK/system/etc/recovery.fstab')
    957     with zipfile.ZipFile(target_files, 'r') as target_files_zip:
    958       loaded_dict = common.LoadInfoDict(target_files_zip)
    959       self.assertEqual(3, loaded_dict['recovery_api_version'])
    960       self.assertEqual(2, loaded_dict['fstab_version'])
    961       self.assertNotIn('/', loaded_dict['fstab'])
    962       self.assertIn('/system', loaded_dict['fstab'])
    963 
    964   def test_LoadInfoDict_recoveryAsBootFalse(self):
    965     # Devices using system-as-root, but with standalone recovery image. Non-A/B
    966     # devices launched since P will likely have this config.
    967     info_dict = copy.copy(self.INFO_DICT_DEFAULT)
    968     del info_dict['no_recovery']
    969     del info_dict['recovery_as_boot']
    970     target_files = self._test_LoadInfoDict_createTargetFiles(
    971         info_dict,
    972         'RECOVERY/RAMDISK/system/etc/recovery.fstab')
    973     with zipfile.ZipFile(target_files, 'r') as target_files_zip:
    974       loaded_dict = common.LoadInfoDict(target_files_zip)
    975       self.assertEqual(3, loaded_dict['recovery_api_version'])
    976       self.assertEqual(2, loaded_dict['fstab_version'])
    977       self.assertIn('/', loaded_dict['fstab'])
    978       self.assertIn('/system', loaded_dict['fstab'])
    979 
    980   def test_LoadInfoDict_noRecoveryTrue(self):
    981     # Device doesn't have a recovery partition at all.
    982     info_dict = copy.copy(self.INFO_DICT_DEFAULT)
    983     del info_dict['recovery_as_boot']
    984     target_files = self._test_LoadInfoDict_createTargetFiles(
    985         info_dict,
    986         'RECOVERY/RAMDISK/system/etc/recovery.fstab')
    987     with zipfile.ZipFile(target_files, 'r') as target_files_zip:
    988       loaded_dict = common.LoadInfoDict(target_files_zip)
    989       self.assertEqual(3, loaded_dict['recovery_api_version'])
    990       self.assertEqual(2, loaded_dict['fstab_version'])
    991       self.assertIsNone(loaded_dict['fstab'])
    992 
    993   def test_LoadInfoDict_missingMetaMiscInfoTxt(self):
    994     target_files = self._test_LoadInfoDict_createTargetFiles(
    995         self.INFO_DICT_DEFAULT,
    996         'BOOT/RAMDISK/system/etc/recovery.fstab')
    997     common.ZipDelete(target_files, 'META/misc_info.txt')
    998     with zipfile.ZipFile(target_files, 'r') as target_files_zip:
    999       self.assertRaises(ValueError, common.LoadInfoDict, target_files_zip)
   1000 
   1001   def test_LoadInfoDict_repacking(self):
   1002     target_files = self._test_LoadInfoDict_createTargetFiles(
   1003         self.INFO_DICT_DEFAULT,
   1004         'BOOT/RAMDISK/system/etc/recovery.fstab')
   1005     unzipped = common.UnzipTemp(target_files)
   1006     loaded_dict = common.LoadInfoDict(unzipped, True)
   1007     self.assertEqual(3, loaded_dict['recovery_api_version'])
   1008     self.assertEqual(2, loaded_dict['fstab_version'])
   1009     self.assertIn('/', loaded_dict['fstab'])
   1010     self.assertIn('/system', loaded_dict['fstab'])
   1011     self.assertEqual(
   1012         os.path.join(unzipped, 'ROOT'), loaded_dict['root_dir'])
   1013     self.assertEqual(
   1014         os.path.join(unzipped, 'META', 'root_filesystem_config.txt'),
   1015         loaded_dict['root_fs_config'])
   1016 
   1017   def test_LoadInfoDict_repackingWithZipFileInput(self):
   1018     target_files = self._test_LoadInfoDict_createTargetFiles(
   1019         self.INFO_DICT_DEFAULT,
   1020         'BOOT/RAMDISK/system/etc/recovery.fstab')
   1021     with zipfile.ZipFile(target_files, 'r') as target_files_zip:
   1022       self.assertRaises(
   1023           AssertionError, common.LoadInfoDict, target_files_zip, True)
   1024 
   1025 
   1026 class InstallRecoveryScriptFormatTest(test_utils.ReleaseToolsTestCase):
   1027   """Checks the format of install-recovery.sh.
   1028 
   1029   Its format should match between common.py and validate_target_files.py.
   1030   """
   1031 
   1032   def setUp(self):
   1033     self._tempdir = common.MakeTempDir()
   1034     # Create a dummy dict that contains the fstab info for boot&recovery.
   1035     self._info = {"fstab" : {}}
   1036     dummy_fstab = [
   1037         "/dev/soc.0/by-name/boot /boot emmc defaults defaults",
   1038         "/dev/soc.0/by-name/recovery /recovery emmc defaults defaults"]
   1039     self._info["fstab"] = common.LoadRecoveryFSTab("\n".join, 2, dummy_fstab)
   1040     # Construct the gzipped recovery.img and boot.img
   1041     self.recovery_data = bytearray([
   1042         0x1f, 0x8b, 0x08, 0x00, 0x81, 0x11, 0x02, 0x5a, 0x00, 0x03, 0x2b, 0x4a,
   1043         0x4d, 0xce, 0x2f, 0x4b, 0x2d, 0xaa, 0x04, 0x00, 0xc9, 0x93, 0x43, 0xf3,
   1044         0x08, 0x00, 0x00, 0x00
   1045     ])
   1046     # echo -n "boot" | gzip -f | hd
   1047     self.boot_data = bytearray([
   1048         0x1f, 0x8b, 0x08, 0x00, 0x8c, 0x12, 0x02, 0x5a, 0x00, 0x03, 0x4b, 0xca,
   1049         0xcf, 0x2f, 0x01, 0x00, 0xc4, 0xae, 0xed, 0x46, 0x04, 0x00, 0x00, 0x00
   1050     ])
   1051 
   1052   def _out_tmp_sink(self, name, data, prefix="SYSTEM"):
   1053     loc = os.path.join(self._tempdir, prefix, name)
   1054     if not os.path.exists(os.path.dirname(loc)):
   1055       os.makedirs(os.path.dirname(loc))
   1056     with open(loc, "w+") as f:
   1057       f.write(data)
   1058 
   1059   def test_full_recovery(self):
   1060     recovery_image = common.File("recovery.img", self.recovery_data)
   1061     boot_image = common.File("boot.img", self.boot_data)
   1062     self._info["full_recovery_image"] = "true"
   1063 
   1064     common.MakeRecoveryPatch(self._tempdir, self._out_tmp_sink,
   1065                              recovery_image, boot_image, self._info)
   1066     validate_target_files.ValidateInstallRecoveryScript(self._tempdir,
   1067                                                         self._info)
   1068 
   1069   def test_recovery_from_boot(self):
   1070     recovery_image = common.File("recovery.img", self.recovery_data)
   1071     self._out_tmp_sink("recovery.img", recovery_image.data, "IMAGES")
   1072     boot_image = common.File("boot.img", self.boot_data)
   1073     self._out_tmp_sink("boot.img", boot_image.data, "IMAGES")
   1074 
   1075     common.MakeRecoveryPatch(self._tempdir, self._out_tmp_sink,
   1076                              recovery_image, boot_image, self._info)
   1077     validate_target_files.ValidateInstallRecoveryScript(self._tempdir,
   1078                                                         self._info)
   1079     # Validate 'recovery-from-boot' with bonus argument.
   1080     self._out_tmp_sink("etc/recovery-resource.dat", "bonus", "SYSTEM")
   1081     common.MakeRecoveryPatch(self._tempdir, self._out_tmp_sink,
   1082                              recovery_image, boot_image, self._info)
   1083     validate_target_files.ValidateInstallRecoveryScript(self._tempdir,
   1084                                                         self._info)
   1085 
   1086 
   1087 class MockScriptWriter(object):
   1088   """A class that mocks edify_generator.EdifyGenerator.
   1089   """
   1090   def __init__(self, enable_comments=False):
   1091     self.lines = []
   1092     self.enable_comments = enable_comments
   1093   def Comment(self, comment):
   1094     if self.enable_comments:
   1095       self.lines.append("# {}".format(comment))
   1096   def AppendExtra(self, extra):
   1097     self.lines.append(extra)
   1098   def __str__(self):
   1099     return "\n".join(self.lines)
   1100 
   1101 
   1102 class MockBlockDifference(object):
   1103   def __init__(self, partition, tgt, src=None):
   1104     self.partition = partition
   1105     self.tgt = tgt
   1106     self.src = src
   1107   def WriteScript(self, script, _, progress=None,
   1108                   write_verify_script=False):
   1109     if progress:
   1110       script.AppendExtra("progress({})".format(progress))
   1111     script.AppendExtra("patch({});".format(self.partition))
   1112     if write_verify_script:
   1113       self.WritePostInstallVerifyScript(script)
   1114   def WritePostInstallVerifyScript(self, script):
   1115     script.AppendExtra("verify({});".format(self.partition))
   1116 
   1117 
   1118 class FakeSparseImage(object):
   1119   def __init__(self, size):
   1120     self.blocksize = 4096
   1121     self.total_blocks = size // 4096
   1122     assert size % 4096 == 0, "{} is not a multiple of 4096".format(size)
   1123 
   1124 
   1125 class DynamicPartitionsDifferenceTest(test_utils.ReleaseToolsTestCase):
   1126   @staticmethod
   1127   def get_op_list(output_path):
   1128     with zipfile.ZipFile(output_path, 'r') as output_zip:
   1129       with output_zip.open("dynamic_partitions_op_list") as op_list:
   1130         return [line.strip() for line in op_list.readlines()
   1131                 if not line.startswith("#")]
   1132 
   1133   def setUp(self):
   1134     self.script = MockScriptWriter()
   1135     self.output_path = common.MakeTempFile(suffix='.zip')
   1136 
   1137   def test_full(self):
   1138     target_info = common.LoadDictionaryFromLines("""
   1139 dynamic_partition_list=system vendor
   1140 super_partition_groups=group_foo
   1141 super_group_foo_group_size={group_size}
   1142 super_group_foo_partition_list=system vendor
   1143 """.format(group_size=4 * GiB).split("\n"))
   1144     block_diffs = [MockBlockDifference("system", FakeSparseImage(3 * GiB)),
   1145                    MockBlockDifference("vendor", FakeSparseImage(1 * GiB))]
   1146 
   1147     dp_diff = common.DynamicPartitionsDifference(target_info, block_diffs)
   1148     with zipfile.ZipFile(self.output_path, 'w') as output_zip:
   1149       dp_diff.WriteScript(self.script, output_zip, write_verify_script=True)
   1150 
   1151     self.assertEqual(str(self.script).strip(), """
   1152 assert(update_dynamic_partitions(package_extract_file("dynamic_partitions_op_list")));
   1153 patch(vendor);
   1154 verify(vendor);
   1155 unmap_partition("vendor");
   1156 patch(system);
   1157 verify(system);
   1158 unmap_partition("system");
   1159 """.strip())
   1160 
   1161     lines = self.get_op_list(self.output_path)
   1162 
   1163     remove_all_groups = lines.index("remove_all_groups")
   1164     add_group = lines.index("add_group group_foo 4294967296")
   1165     add_vendor = lines.index("add vendor group_foo")
   1166     add_system = lines.index("add system group_foo")
   1167     resize_vendor = lines.index("resize vendor 1073741824")
   1168     resize_system = lines.index("resize system 3221225472")
   1169 
   1170     self.assertLess(remove_all_groups, add_group,
   1171                     "Should add groups after removing all groups")
   1172     self.assertLess(add_group, min(add_vendor, add_system),
   1173                     "Should add partitions after adding group")
   1174     self.assertLess(add_system, resize_system,
   1175                     "Should resize system after adding it")
   1176     self.assertLess(add_vendor, resize_vendor,
   1177                     "Should resize vendor after adding it")
   1178 
   1179   def test_inc_groups(self):
   1180     source_info = common.LoadDictionaryFromLines("""
   1181 super_partition_groups=group_foo group_bar group_baz
   1182 super_group_foo_group_size={group_foo_size}
   1183 super_group_bar_group_size={group_bar_size}
   1184 """.format(group_foo_size=4 * GiB, group_bar_size=3 * GiB).split("\n"))
   1185     target_info = common.LoadDictionaryFromLines("""
   1186 super_partition_groups=group_foo group_baz group_qux
   1187 super_group_foo_group_size={group_foo_size}
   1188 super_group_baz_group_size={group_baz_size}
   1189 super_group_qux_group_size={group_qux_size}
   1190 """.format(group_foo_size=3 * GiB, group_baz_size=4 * GiB,
   1191            group_qux_size=1 * GiB).split("\n"))
   1192 
   1193     dp_diff = common.DynamicPartitionsDifference(target_info,
   1194                                                  block_diffs=[],
   1195                                                  source_info_dict=source_info)
   1196     with zipfile.ZipFile(self.output_path, 'w') as output_zip:
   1197       dp_diff.WriteScript(self.script, output_zip, write_verify_script=True)
   1198 
   1199     lines = self.get_op_list(self.output_path)
   1200 
   1201     removed = lines.index("remove_group group_bar")
   1202     shrunk = lines.index("resize_group group_foo 3221225472")
   1203     grown = lines.index("resize_group group_baz 4294967296")
   1204     added = lines.index("add_group group_qux 1073741824")
   1205 
   1206     self.assertLess(max(removed, shrunk) < min(grown, added),
   1207                     "ops that remove / shrink partitions must precede ops that "
   1208                     "grow / add partitions")
   1209 
   1210   def test_incremental(self):
   1211     source_info = common.LoadDictionaryFromLines("""
   1212 dynamic_partition_list=system vendor product product_services
   1213 super_partition_groups=group_foo
   1214 super_group_foo_group_size={group_foo_size}
   1215 super_group_foo_partition_list=system vendor product product_services
   1216 """.format(group_foo_size=4 * GiB).split("\n"))
   1217     target_info = common.LoadDictionaryFromLines("""
   1218 dynamic_partition_list=system vendor product odm
   1219 super_partition_groups=group_foo group_bar
   1220 super_group_foo_group_size={group_foo_size}
   1221 super_group_foo_partition_list=system vendor odm
   1222 super_group_bar_group_size={group_bar_size}
   1223 super_group_bar_partition_list=product
   1224 """.format(group_foo_size=3 * GiB, group_bar_size=1 * GiB).split("\n"))
   1225 
   1226     block_diffs = [MockBlockDifference("system", FakeSparseImage(1536 * MiB),
   1227                                        src=FakeSparseImage(1024 * MiB)),
   1228                    MockBlockDifference("vendor", FakeSparseImage(512 * MiB),
   1229                                        src=FakeSparseImage(1024 * MiB)),
   1230                    MockBlockDifference("product", FakeSparseImage(1024 * MiB),
   1231                                        src=FakeSparseImage(1024 * MiB)),
   1232                    MockBlockDifference("product_services", None,
   1233                                        src=FakeSparseImage(1024 * MiB)),
   1234                    MockBlockDifference("odm", FakeSparseImage(1024 * MiB),
   1235                                        src=None)]
   1236 
   1237     dp_diff = common.DynamicPartitionsDifference(target_info, block_diffs,
   1238                                                  source_info_dict=source_info)
   1239     with zipfile.ZipFile(self.output_path, 'w') as output_zip:
   1240       dp_diff.WriteScript(self.script, output_zip, write_verify_script=True)
   1241 
   1242     metadata_idx = self.script.lines.index(
   1243         'assert(update_dynamic_partitions(package_extract_file('
   1244         '"dynamic_partitions_op_list")));')
   1245     self.assertLess(self.script.lines.index('patch(vendor);'), metadata_idx)
   1246     self.assertLess(metadata_idx, self.script.lines.index('verify(vendor);'))
   1247     for p in ("product", "system", "odm"):
   1248       patch_idx = self.script.lines.index("patch({});".format(p))
   1249       verify_idx = self.script.lines.index("verify({});".format(p))
   1250       self.assertLess(metadata_idx, patch_idx,
   1251                       "Should patch {} after updating metadata".format(p))
   1252       self.assertLess(patch_idx, verify_idx,
   1253                       "Should verify {} after patching".format(p))
   1254 
   1255     self.assertNotIn("patch(product_services);", self.script.lines)
   1256 
   1257     lines = self.get_op_list(self.output_path)
   1258 
   1259     remove = lines.index("remove product_services")
   1260     move_product_out = lines.index("move product default")
   1261     shrink = lines.index("resize vendor 536870912")
   1262     shrink_group = lines.index("resize_group group_foo 3221225472")
   1263     add_group_bar = lines.index("add_group group_bar 1073741824")
   1264     add_odm = lines.index("add odm group_foo")
   1265     grow_existing = lines.index("resize system 1610612736")
   1266     grow_added = lines.index("resize odm 1073741824")
   1267     move_product_in = lines.index("move product group_bar")
   1268 
   1269     max_idx_move_partition_out_foo = max(remove, move_product_out, shrink)
   1270     min_idx_move_partition_in_foo = min(add_odm, grow_existing, grow_added)
   1271 
   1272     self.assertLess(max_idx_move_partition_out_foo, shrink_group,
   1273                     "Must shrink group after partitions inside group are shrunk"
   1274                     " / removed")
   1275 
   1276     self.assertLess(add_group_bar, move_product_in,
   1277                     "Must add partitions to group after group is added")
   1278 
   1279     self.assertLess(max_idx_move_partition_out_foo,
   1280                     min_idx_move_partition_in_foo,
   1281                     "Must shrink partitions / remove partitions from group"
   1282                     "before adding / moving partitions into group")
   1283 
   1284   def test_remove_partition(self):
   1285     source_info = common.LoadDictionaryFromLines("""
   1286 blockimgdiff_versions=3,4
   1287 use_dynamic_partitions=true
   1288 dynamic_partition_list=foo
   1289 super_partition_groups=group_foo
   1290 super_group_foo_group_size={group_foo_size}
   1291 super_group_foo_partition_list=foo
   1292 """.format(group_foo_size=4 * GiB).split("\n"))
   1293     target_info = common.LoadDictionaryFromLines("""
   1294 blockimgdiff_versions=3,4
   1295 use_dynamic_partitions=true
   1296 super_partition_groups=group_foo
   1297 super_group_foo_group_size={group_foo_size}
   1298 """.format(group_foo_size=4 * GiB).split("\n"))
   1299 
   1300     common.OPTIONS.info_dict = target_info
   1301     common.OPTIONS.target_info_dict = target_info
   1302     common.OPTIONS.source_info_dict = source_info
   1303     common.OPTIONS.cache_size = 4 * 4096
   1304 
   1305     block_diffs = [common.BlockDifference("foo", EmptyImage(),
   1306                                           src=DataImage("source", pad=True))]
   1307 
   1308     dp_diff = common.DynamicPartitionsDifference(target_info, block_diffs,
   1309                                                  source_info_dict=source_info)
   1310     with zipfile.ZipFile(self.output_path, 'w') as output_zip:
   1311       dp_diff.WriteScript(self.script, output_zip, write_verify_script=True)
   1312 
   1313     self.assertNotIn("block_image_update", str(self.script),
   1314                      "Removed partition should not be patched.")
   1315 
   1316     lines = self.get_op_list(self.output_path)
   1317     self.assertEqual(lines, ["remove foo"])
   1318