Home | History | Annotate | Download | only in releasetools
      1 #
      2 # Copyright (C) 2018 The Android Open Source Project
      3 #
      4 # Licensed under the Apache License, Version 2.0 (the "License");
      5 # you may not use this file except in compliance with the License.
      6 # You may obtain a copy of the License at
      7 #
      8 #      http://www.apache.org/licenses/LICENSE-2.0
      9 #
     10 # Unless required by applicable law or agreed to in writing, software
     11 # distributed under the License is distributed on an "AS IS" BASIS,
     12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     13 # See the License for the specific language governing permissions and
     14 # limitations under the License.
     15 #
     16 
     17 import copy
     18 import os
     19 import os.path
     20 import zipfile
     21 
     22 import common
     23 import test_utils
     24 from ota_from_target_files import (
     25     _LoadOemDicts, AbOtaPropertyFiles, BuildInfo, FinalizeMetadata,
     26     GetPackageMetadata, GetTargetFilesZipForSecondaryImages,
     27     GetTargetFilesZipWithoutPostinstallConfig, NonAbOtaPropertyFiles,
     28     Payload, PayloadSigner, POSTINSTALL_CONFIG, PropertyFiles,
     29     StreamingPropertyFiles, WriteFingerprintAssertion)
     30 
     31 
     32 def construct_target_files(secondary=False):
     33   """Returns a target-files.zip file for generating OTA packages."""
     34   target_files = common.MakeTempFile(prefix='target_files-', suffix='.zip')
     35   with zipfile.ZipFile(target_files, 'w') as target_files_zip:
     36     # META/update_engine_config.txt
     37     target_files_zip.writestr(
     38         'META/update_engine_config.txt',
     39         "PAYLOAD_MAJOR_VERSION=2\nPAYLOAD_MINOR_VERSION=4\n")
     40 
     41     # META/postinstall_config.txt
     42     target_files_zip.writestr(
     43         POSTINSTALL_CONFIG,
     44         '\n'.join([
     45             "RUN_POSTINSTALL_system=true",
     46             "POSTINSTALL_PATH_system=system/bin/otapreopt_script",
     47             "FILESYSTEM_TYPE_system=ext4",
     48             "POSTINSTALL_OPTIONAL_system=true",
     49         ]))
     50 
     51     ab_partitions = [
     52         ('IMAGES', 'boot'),
     53         ('IMAGES', 'system'),
     54         ('IMAGES', 'vendor'),
     55         ('RADIO', 'bootloader'),
     56         ('RADIO', 'modem'),
     57     ]
     58     # META/ab_partitions.txt
     59     target_files_zip.writestr(
     60         'META/ab_partitions.txt',
     61         '\n'.join([partition[1] for partition in ab_partitions]))
     62 
     63     # Create dummy images for each of them.
     64     for path, partition in ab_partitions:
     65       target_files_zip.writestr(
     66           '{}/{}.img'.format(path, partition),
     67           os.urandom(len(partition)))
     68 
     69     # system_other shouldn't appear in META/ab_partitions.txt.
     70     if secondary:
     71       target_files_zip.writestr('IMAGES/system_other.img',
     72                                 os.urandom(len("system_other")))
     73 
     74   return target_files
     75 
     76 
     77 class MockScriptWriter(object):
     78   """A class that mocks edify_generator.EdifyGenerator.
     79 
     80   It simply pushes the incoming arguments onto script stack, which is to assert
     81   the calls to EdifyGenerator functions.
     82   """
     83 
     84   def __init__(self):
     85     self.script = []
     86 
     87   def Mount(self, *args):
     88     self.script.append(('Mount',) + args)
     89 
     90   def AssertDevice(self, *args):
     91     self.script.append(('AssertDevice',) + args)
     92 
     93   def AssertOemProperty(self, *args):
     94     self.script.append(('AssertOemProperty',) + args)
     95 
     96   def AssertFingerprintOrThumbprint(self, *args):
     97     self.script.append(('AssertFingerprintOrThumbprint',) + args)
     98 
     99   def AssertSomeFingerprint(self, *args):
    100     self.script.append(('AssertSomeFingerprint',) + args)
    101 
    102   def AssertSomeThumbprint(self, *args):
    103     self.script.append(('AssertSomeThumbprint',) + args)
    104 
    105 
    106 class BuildInfoTest(test_utils.ReleaseToolsTestCase):
    107 
    108   TEST_INFO_DICT = {
    109       'build.prop' : {
    110           'ro.product.device' : 'product-device',
    111           'ro.product.name' : 'product-name',
    112           'ro.build.fingerprint' : 'build-fingerprint',
    113           'ro.build.foo' : 'build-foo',
    114       },
    115       'vendor.build.prop' : {
    116           'ro.vendor.build.fingerprint' : 'vendor-build-fingerprint',
    117       },
    118       'property1' : 'value1',
    119       'property2' : 4096,
    120   }
    121 
    122   TEST_INFO_DICT_USES_OEM_PROPS = {
    123       'build.prop' : {
    124           'ro.product.name' : 'product-name',
    125           'ro.build.thumbprint' : 'build-thumbprint',
    126           'ro.build.bar' : 'build-bar',
    127       },
    128       'vendor.build.prop' : {
    129           'ro.vendor.build.fingerprint' : 'vendor-build-fingerprint',
    130       },
    131       'property1' : 'value1',
    132       'property2' : 4096,
    133       'oem_fingerprint_properties' : 'ro.product.device ro.product.brand',
    134   }
    135 
    136   TEST_OEM_DICTS = [
    137       {
    138           'ro.product.brand' : 'brand1',
    139           'ro.product.device' : 'device1',
    140       },
    141       {
    142           'ro.product.brand' : 'brand2',
    143           'ro.product.device' : 'device2',
    144       },
    145       {
    146           'ro.product.brand' : 'brand3',
    147           'ro.product.device' : 'device3',
    148       },
    149   ]
    150 
    151   def test_init(self):
    152     target_info = BuildInfo(self.TEST_INFO_DICT, None)
    153     self.assertEqual('product-device', target_info.device)
    154     self.assertEqual('build-fingerprint', target_info.fingerprint)
    155     self.assertFalse(target_info.is_ab)
    156     self.assertIsNone(target_info.oem_props)
    157 
    158   def test_init_with_oem_props(self):
    159     target_info = BuildInfo(self.TEST_INFO_DICT_USES_OEM_PROPS,
    160                             self.TEST_OEM_DICTS)
    161     self.assertEqual('device1', target_info.device)
    162     self.assertEqual('brand1/product-name/device1:build-thumbprint',
    163                      target_info.fingerprint)
    164 
    165     # Swap the order in oem_dicts, which would lead to different BuildInfo.
    166     oem_dicts = copy.copy(self.TEST_OEM_DICTS)
    167     oem_dicts[0], oem_dicts[2] = oem_dicts[2], oem_dicts[0]
    168     target_info = BuildInfo(self.TEST_INFO_DICT_USES_OEM_PROPS, oem_dicts)
    169     self.assertEqual('device3', target_info.device)
    170     self.assertEqual('brand3/product-name/device3:build-thumbprint',
    171                      target_info.fingerprint)
    172 
    173     # Missing oem_dict should be rejected.
    174     self.assertRaises(AssertionError, BuildInfo,
    175                       self.TEST_INFO_DICT_USES_OEM_PROPS, None)
    176 
    177   def test___getitem__(self):
    178     target_info = BuildInfo(self.TEST_INFO_DICT, None)
    179     self.assertEqual('value1', target_info['property1'])
    180     self.assertEqual(4096, target_info['property2'])
    181     self.assertEqual('build-foo', target_info['build.prop']['ro.build.foo'])
    182 
    183   def test___getitem__with_oem_props(self):
    184     target_info = BuildInfo(self.TEST_INFO_DICT_USES_OEM_PROPS,
    185                             self.TEST_OEM_DICTS)
    186     self.assertEqual('value1', target_info['property1'])
    187     self.assertEqual(4096, target_info['property2'])
    188     self.assertRaises(KeyError,
    189                       lambda: target_info['build.prop']['ro.build.foo'])
    190 
    191   def test___setitem__(self):
    192     target_info = BuildInfo(copy.deepcopy(self.TEST_INFO_DICT), None)
    193     self.assertEqual('value1', target_info['property1'])
    194     target_info['property1'] = 'value2'
    195     self.assertEqual('value2', target_info['property1'])
    196 
    197     self.assertEqual('build-foo', target_info['build.prop']['ro.build.foo'])
    198     target_info['build.prop']['ro.build.foo'] = 'build-bar'
    199     self.assertEqual('build-bar', target_info['build.prop']['ro.build.foo'])
    200 
    201   def test_get(self):
    202     target_info = BuildInfo(self.TEST_INFO_DICT, None)
    203     self.assertEqual('value1', target_info.get('property1'))
    204     self.assertEqual(4096, target_info.get('property2'))
    205     self.assertEqual(4096, target_info.get('property2', 1024))
    206     self.assertEqual(1024, target_info.get('property-nonexistent', 1024))
    207     self.assertEqual('build-foo', target_info.get('build.prop')['ro.build.foo'])
    208 
    209   def test_get_with_oem_props(self):
    210     target_info = BuildInfo(self.TEST_INFO_DICT_USES_OEM_PROPS,
    211                             self.TEST_OEM_DICTS)
    212     self.assertEqual('value1', target_info.get('property1'))
    213     self.assertEqual(4096, target_info.get('property2'))
    214     self.assertEqual(4096, target_info.get('property2', 1024))
    215     self.assertEqual(1024, target_info.get('property-nonexistent', 1024))
    216     self.assertIsNone(target_info.get('build.prop').get('ro.build.foo'))
    217     self.assertRaises(KeyError,
    218                       lambda: target_info.get('build.prop')['ro.build.foo'])
    219 
    220   def test_items(self):
    221     target_info = BuildInfo(self.TEST_INFO_DICT, None)
    222     items = target_info.items()
    223     self.assertIn(('property1', 'value1'), items)
    224     self.assertIn(('property2', 4096), items)
    225 
    226   def test_GetBuildProp(self):
    227     target_info = BuildInfo(self.TEST_INFO_DICT, None)
    228     self.assertEqual('build-foo', target_info.GetBuildProp('ro.build.foo'))
    229     self.assertRaises(common.ExternalError, target_info.GetBuildProp,
    230                       'ro.build.nonexistent')
    231 
    232   def test_GetBuildProp_with_oem_props(self):
    233     target_info = BuildInfo(self.TEST_INFO_DICT_USES_OEM_PROPS,
    234                             self.TEST_OEM_DICTS)
    235     self.assertEqual('build-bar', target_info.GetBuildProp('ro.build.bar'))
    236     self.assertRaises(common.ExternalError, target_info.GetBuildProp,
    237                       'ro.build.nonexistent')
    238 
    239   def test_GetVendorBuildProp(self):
    240     target_info = BuildInfo(self.TEST_INFO_DICT, None)
    241     self.assertEqual('vendor-build-fingerprint',
    242                      target_info.GetVendorBuildProp(
    243                          'ro.vendor.build.fingerprint'))
    244     self.assertRaises(common.ExternalError, target_info.GetVendorBuildProp,
    245                       'ro.build.nonexistent')
    246 
    247   def test_GetVendorBuildProp_with_oem_props(self):
    248     target_info = BuildInfo(self.TEST_INFO_DICT_USES_OEM_PROPS,
    249                             self.TEST_OEM_DICTS)
    250     self.assertEqual('vendor-build-fingerprint',
    251                      target_info.GetVendorBuildProp(
    252                          'ro.vendor.build.fingerprint'))
    253     self.assertRaises(common.ExternalError, target_info.GetVendorBuildProp,
    254                       'ro.build.nonexistent')
    255 
    256   def test_vendor_fingerprint(self):
    257     target_info = BuildInfo(self.TEST_INFO_DICT, None)
    258     self.assertEqual('vendor-build-fingerprint',
    259                      target_info.vendor_fingerprint)
    260 
    261   def test_vendor_fingerprint_blacklisted(self):
    262     target_info_dict = copy.deepcopy(self.TEST_INFO_DICT_USES_OEM_PROPS)
    263     del target_info_dict['vendor.build.prop']['ro.vendor.build.fingerprint']
    264     target_info = BuildInfo(target_info_dict, self.TEST_OEM_DICTS)
    265     self.assertIsNone(target_info.vendor_fingerprint)
    266 
    267   def test_vendor_fingerprint_without_vendor_build_prop(self):
    268     target_info_dict = copy.deepcopy(self.TEST_INFO_DICT_USES_OEM_PROPS)
    269     del target_info_dict['vendor.build.prop']
    270     target_info = BuildInfo(target_info_dict, self.TEST_OEM_DICTS)
    271     self.assertIsNone(target_info.vendor_fingerprint)
    272 
    273   def test_WriteMountOemScript(self):
    274     target_info = BuildInfo(self.TEST_INFO_DICT_USES_OEM_PROPS,
    275                             self.TEST_OEM_DICTS)
    276     script_writer = MockScriptWriter()
    277     target_info.WriteMountOemScript(script_writer)
    278     self.assertEqual([('Mount', '/oem', None)], script_writer.script)
    279 
    280   def test_WriteDeviceAssertions(self):
    281     target_info = BuildInfo(self.TEST_INFO_DICT, None)
    282     script_writer = MockScriptWriter()
    283     target_info.WriteDeviceAssertions(script_writer, False)
    284     self.assertEqual([('AssertDevice', 'product-device')], script_writer.script)
    285 
    286   def test_WriteDeviceAssertions_with_oem_props(self):
    287     target_info = BuildInfo(self.TEST_INFO_DICT_USES_OEM_PROPS,
    288                             self.TEST_OEM_DICTS)
    289     script_writer = MockScriptWriter()
    290     target_info.WriteDeviceAssertions(script_writer, False)
    291     self.assertEqual(
    292         [
    293             ('AssertOemProperty', 'ro.product.device',
    294              ['device1', 'device2', 'device3'], False),
    295             ('AssertOemProperty', 'ro.product.brand',
    296              ['brand1', 'brand2', 'brand3'], False),
    297         ],
    298         script_writer.script)
    299 
    300   def test_WriteFingerprintAssertion_without_oem_props(self):
    301     target_info = BuildInfo(self.TEST_INFO_DICT, None)
    302     source_info_dict = copy.deepcopy(self.TEST_INFO_DICT)
    303     source_info_dict['build.prop']['ro.build.fingerprint'] = (
    304         'source-build-fingerprint')
    305     source_info = BuildInfo(source_info_dict, None)
    306 
    307     script_writer = MockScriptWriter()
    308     WriteFingerprintAssertion(script_writer, target_info, source_info)
    309     self.assertEqual(
    310         [('AssertSomeFingerprint', 'source-build-fingerprint',
    311           'build-fingerprint')],
    312         script_writer.script)
    313 
    314   def test_WriteFingerprintAssertion_with_source_oem_props(self):
    315     target_info = BuildInfo(self.TEST_INFO_DICT, None)
    316     source_info = BuildInfo(self.TEST_INFO_DICT_USES_OEM_PROPS,
    317                             self.TEST_OEM_DICTS)
    318 
    319     script_writer = MockScriptWriter()
    320     WriteFingerprintAssertion(script_writer, target_info, source_info)
    321     self.assertEqual(
    322         [('AssertFingerprintOrThumbprint', 'build-fingerprint',
    323           'build-thumbprint')],
    324         script_writer.script)
    325 
    326   def test_WriteFingerprintAssertion_with_target_oem_props(self):
    327     target_info = BuildInfo(self.TEST_INFO_DICT_USES_OEM_PROPS,
    328                             self.TEST_OEM_DICTS)
    329     source_info = BuildInfo(self.TEST_INFO_DICT, None)
    330 
    331     script_writer = MockScriptWriter()
    332     WriteFingerprintAssertion(script_writer, target_info, source_info)
    333     self.assertEqual(
    334         [('AssertFingerprintOrThumbprint', 'build-fingerprint',
    335           'build-thumbprint')],
    336         script_writer.script)
    337 
    338   def test_WriteFingerprintAssertion_with_both_oem_props(self):
    339     target_info = BuildInfo(self.TEST_INFO_DICT_USES_OEM_PROPS,
    340                             self.TEST_OEM_DICTS)
    341     source_info_dict = copy.deepcopy(self.TEST_INFO_DICT_USES_OEM_PROPS)
    342     source_info_dict['build.prop']['ro.build.thumbprint'] = (
    343         'source-build-thumbprint')
    344     source_info = BuildInfo(source_info_dict, self.TEST_OEM_DICTS)
    345 
    346     script_writer = MockScriptWriter()
    347     WriteFingerprintAssertion(script_writer, target_info, source_info)
    348     self.assertEqual(
    349         [('AssertSomeThumbprint', 'build-thumbprint',
    350           'source-build-thumbprint')],
    351         script_writer.script)
    352 
    353 
    354 class LoadOemDictsTest(test_utils.ReleaseToolsTestCase):
    355 
    356   def test_NoneDict(self):
    357     self.assertIsNone(_LoadOemDicts(None))
    358 
    359   def test_SingleDict(self):
    360     dict_file = common.MakeTempFile()
    361     with open(dict_file, 'w') as dict_fp:
    362       dict_fp.write('abc=1\ndef=2\nxyz=foo\na.b.c=bar\n')
    363 
    364     oem_dicts = _LoadOemDicts([dict_file])
    365     self.assertEqual(1, len(oem_dicts))
    366     self.assertEqual('foo', oem_dicts[0]['xyz'])
    367     self.assertEqual('bar', oem_dicts[0]['a.b.c'])
    368 
    369   def test_MultipleDicts(self):
    370     oem_source = []
    371     for i in range(3):
    372       dict_file = common.MakeTempFile()
    373       with open(dict_file, 'w') as dict_fp:
    374         dict_fp.write(
    375             'ro.build.index={}\ndef=2\nxyz=foo\na.b.c=bar\n'.format(i))
    376       oem_source.append(dict_file)
    377 
    378     oem_dicts = _LoadOemDicts(oem_source)
    379     self.assertEqual(3, len(oem_dicts))
    380     for i, oem_dict in enumerate(oem_dicts):
    381       self.assertEqual('2', oem_dict['def'])
    382       self.assertEqual('foo', oem_dict['xyz'])
    383       self.assertEqual('bar', oem_dict['a.b.c'])
    384       self.assertEqual('{}'.format(i), oem_dict['ro.build.index'])
    385 
    386 
    387 class OtaFromTargetFilesTest(test_utils.ReleaseToolsTestCase):
    388 
    389   TEST_TARGET_INFO_DICT = {
    390       'build.prop' : {
    391           'ro.product.device' : 'product-device',
    392           'ro.build.fingerprint' : 'build-fingerprint-target',
    393           'ro.build.version.incremental' : 'build-version-incremental-target',
    394           'ro.build.version.sdk' : '27',
    395           'ro.build.version.security_patch' : '2017-12-01',
    396           'ro.build.date.utc' : '1500000000',
    397       },
    398   }
    399 
    400   TEST_SOURCE_INFO_DICT = {
    401       'build.prop' : {
    402           'ro.product.device' : 'product-device',
    403           'ro.build.fingerprint' : 'build-fingerprint-source',
    404           'ro.build.version.incremental' : 'build-version-incremental-source',
    405           'ro.build.version.sdk' : '25',
    406           'ro.build.version.security_patch' : '2016-12-01',
    407           'ro.build.date.utc' : '1400000000',
    408       },
    409   }
    410 
    411   def setUp(self):
    412     self.testdata_dir = test_utils.get_testdata_dir()
    413     self.assertTrue(os.path.exists(self.testdata_dir))
    414 
    415     # Reset the global options as in ota_from_target_files.py.
    416     common.OPTIONS.incremental_source = None
    417     common.OPTIONS.downgrade = False
    418     common.OPTIONS.retrofit_dynamic_partitions = False
    419     common.OPTIONS.timestamp = False
    420     common.OPTIONS.wipe_user_data = False
    421     common.OPTIONS.no_signing = False
    422     common.OPTIONS.package_key = os.path.join(self.testdata_dir, 'testkey')
    423     common.OPTIONS.key_passwords = {
    424         common.OPTIONS.package_key : None,
    425     }
    426 
    427     common.OPTIONS.search_path = test_utils.get_search_path()
    428     self.assertIsNotNone(common.OPTIONS.search_path)
    429 
    430   def test_GetPackageMetadata_abOta_full(self):
    431     target_info_dict = copy.deepcopy(self.TEST_TARGET_INFO_DICT)
    432     target_info_dict['ab_update'] = 'true'
    433     target_info = BuildInfo(target_info_dict, None)
    434     metadata = GetPackageMetadata(target_info)
    435     self.assertDictEqual(
    436         {
    437             'ota-type' : 'AB',
    438             'ota-required-cache' : '0',
    439             'post-build' : 'build-fingerprint-target',
    440             'post-build-incremental' : 'build-version-incremental-target',
    441             'post-sdk-level' : '27',
    442             'post-security-patch-level' : '2017-12-01',
    443             'post-timestamp' : '1500000000',
    444             'pre-device' : 'product-device',
    445         },
    446         metadata)
    447 
    448   def test_GetPackageMetadata_abOta_incremental(self):
    449     target_info_dict = copy.deepcopy(self.TEST_TARGET_INFO_DICT)
    450     target_info_dict['ab_update'] = 'true'
    451     target_info = BuildInfo(target_info_dict, None)
    452     source_info = BuildInfo(self.TEST_SOURCE_INFO_DICT, None)
    453     common.OPTIONS.incremental_source = ''
    454     metadata = GetPackageMetadata(target_info, source_info)
    455     self.assertDictEqual(
    456         {
    457             'ota-type' : 'AB',
    458             'ota-required-cache' : '0',
    459             'post-build' : 'build-fingerprint-target',
    460             'post-build-incremental' : 'build-version-incremental-target',
    461             'post-sdk-level' : '27',
    462             'post-security-patch-level' : '2017-12-01',
    463             'post-timestamp' : '1500000000',
    464             'pre-device' : 'product-device',
    465             'pre-build' : 'build-fingerprint-source',
    466             'pre-build-incremental' : 'build-version-incremental-source',
    467         },
    468         metadata)
    469 
    470   def test_GetPackageMetadata_nonAbOta_full(self):
    471     target_info = BuildInfo(self.TEST_TARGET_INFO_DICT, None)
    472     metadata = GetPackageMetadata(target_info)
    473     self.assertDictEqual(
    474         {
    475             'ota-type' : 'BLOCK',
    476             'post-build' : 'build-fingerprint-target',
    477             'post-build-incremental' : 'build-version-incremental-target',
    478             'post-sdk-level' : '27',
    479             'post-security-patch-level' : '2017-12-01',
    480             'post-timestamp' : '1500000000',
    481             'pre-device' : 'product-device',
    482         },
    483         metadata)
    484 
    485   def test_GetPackageMetadata_nonAbOta_incremental(self):
    486     target_info = BuildInfo(self.TEST_TARGET_INFO_DICT, None)
    487     source_info = BuildInfo(self.TEST_SOURCE_INFO_DICT, None)
    488     common.OPTIONS.incremental_source = ''
    489     metadata = GetPackageMetadata(target_info, source_info)
    490     self.assertDictEqual(
    491         {
    492             'ota-type' : 'BLOCK',
    493             'post-build' : 'build-fingerprint-target',
    494             'post-build-incremental' : 'build-version-incremental-target',
    495             'post-sdk-level' : '27',
    496             'post-security-patch-level' : '2017-12-01',
    497             'post-timestamp' : '1500000000',
    498             'pre-device' : 'product-device',
    499             'pre-build' : 'build-fingerprint-source',
    500             'pre-build-incremental' : 'build-version-incremental-source',
    501         },
    502         metadata)
    503 
    504   def test_GetPackageMetadata_wipe(self):
    505     target_info = BuildInfo(self.TEST_TARGET_INFO_DICT, None)
    506     common.OPTIONS.wipe_user_data = True
    507     metadata = GetPackageMetadata(target_info)
    508     self.assertDictEqual(
    509         {
    510             'ota-type' : 'BLOCK',
    511             'ota-wipe' : 'yes',
    512             'post-build' : 'build-fingerprint-target',
    513             'post-build-incremental' : 'build-version-incremental-target',
    514             'post-sdk-level' : '27',
    515             'post-security-patch-level' : '2017-12-01',
    516             'post-timestamp' : '1500000000',
    517             'pre-device' : 'product-device',
    518         },
    519         metadata)
    520 
    521   def test_GetPackageMetadata_retrofitDynamicPartitions(self):
    522     target_info = BuildInfo(self.TEST_TARGET_INFO_DICT, None)
    523     common.OPTIONS.retrofit_dynamic_partitions = True
    524     metadata = GetPackageMetadata(target_info)
    525     self.assertDictEqual(
    526         {
    527             'ota-retrofit-dynamic-partitions' : 'yes',
    528             'ota-type' : 'BLOCK',
    529             'post-build' : 'build-fingerprint-target',
    530             'post-build-incremental' : 'build-version-incremental-target',
    531             'post-sdk-level' : '27',
    532             'post-security-patch-level' : '2017-12-01',
    533             'post-timestamp' : '1500000000',
    534             'pre-device' : 'product-device',
    535         },
    536         metadata)
    537 
    538   @staticmethod
    539   def _test_GetPackageMetadata_swapBuildTimestamps(target_info, source_info):
    540     (target_info['build.prop']['ro.build.date.utc'],
    541      source_info['build.prop']['ro.build.date.utc']) = (
    542          source_info['build.prop']['ro.build.date.utc'],
    543          target_info['build.prop']['ro.build.date.utc'])
    544 
    545   def test_GetPackageMetadata_unintentionalDowngradeDetected(self):
    546     target_info_dict = copy.deepcopy(self.TEST_TARGET_INFO_DICT)
    547     source_info_dict = copy.deepcopy(self.TEST_SOURCE_INFO_DICT)
    548     self._test_GetPackageMetadata_swapBuildTimestamps(
    549         target_info_dict, source_info_dict)
    550 
    551     target_info = BuildInfo(target_info_dict, None)
    552     source_info = BuildInfo(source_info_dict, None)
    553     common.OPTIONS.incremental_source = ''
    554     self.assertRaises(RuntimeError, GetPackageMetadata, target_info,
    555                       source_info)
    556 
    557   def test_GetPackageMetadata_downgrade(self):
    558     target_info_dict = copy.deepcopy(self.TEST_TARGET_INFO_DICT)
    559     source_info_dict = copy.deepcopy(self.TEST_SOURCE_INFO_DICT)
    560     self._test_GetPackageMetadata_swapBuildTimestamps(
    561         target_info_dict, source_info_dict)
    562 
    563     target_info = BuildInfo(target_info_dict, None)
    564     source_info = BuildInfo(source_info_dict, None)
    565     common.OPTIONS.incremental_source = ''
    566     common.OPTIONS.downgrade = True
    567     common.OPTIONS.wipe_user_data = True
    568     metadata = GetPackageMetadata(target_info, source_info)
    569     self.assertDictEqual(
    570         {
    571             'ota-downgrade' : 'yes',
    572             'ota-type' : 'BLOCK',
    573             'ota-wipe' : 'yes',
    574             'post-build' : 'build-fingerprint-target',
    575             'post-build-incremental' : 'build-version-incremental-target',
    576             'post-sdk-level' : '27',
    577             'post-security-patch-level' : '2017-12-01',
    578             'post-timestamp' : '1400000000',
    579             'pre-device' : 'product-device',
    580             'pre-build' : 'build-fingerprint-source',
    581             'pre-build-incremental' : 'build-version-incremental-source',
    582         },
    583         metadata)
    584 
    585   def test_GetTargetFilesZipForSecondaryImages(self):
    586     input_file = construct_target_files(secondary=True)
    587     target_file = GetTargetFilesZipForSecondaryImages(input_file)
    588 
    589     with zipfile.ZipFile(target_file) as verify_zip:
    590       namelist = verify_zip.namelist()
    591 
    592     self.assertIn('META/ab_partitions.txt', namelist)
    593     self.assertIn('IMAGES/boot.img', namelist)
    594     self.assertIn('IMAGES/system.img', namelist)
    595     self.assertIn('IMAGES/vendor.img', namelist)
    596     self.assertIn('RADIO/bootloader.img', namelist)
    597     self.assertIn('RADIO/modem.img', namelist)
    598     self.assertIn(POSTINSTALL_CONFIG, namelist)
    599 
    600     self.assertNotIn('IMAGES/system_other.img', namelist)
    601     self.assertNotIn('IMAGES/system.map', namelist)
    602 
    603   def test_GetTargetFilesZipForSecondaryImages_skipPostinstall(self):
    604     input_file = construct_target_files(secondary=True)
    605     target_file = GetTargetFilesZipForSecondaryImages(
    606         input_file, skip_postinstall=True)
    607 
    608     with zipfile.ZipFile(target_file) as verify_zip:
    609       namelist = verify_zip.namelist()
    610 
    611     self.assertIn('META/ab_partitions.txt', namelist)
    612     self.assertIn('IMAGES/boot.img', namelist)
    613     self.assertIn('IMAGES/system.img', namelist)
    614     self.assertIn('IMAGES/vendor.img', namelist)
    615     self.assertIn('RADIO/bootloader.img', namelist)
    616     self.assertIn('RADIO/modem.img', namelist)
    617 
    618     self.assertNotIn('IMAGES/system_other.img', namelist)
    619     self.assertNotIn('IMAGES/system.map', namelist)
    620     self.assertNotIn(POSTINSTALL_CONFIG, namelist)
    621 
    622   def test_GetTargetFilesZipForSecondaryImages_withoutRadioImages(self):
    623     input_file = construct_target_files(secondary=True)
    624     common.ZipDelete(input_file, 'RADIO/bootloader.img')
    625     common.ZipDelete(input_file, 'RADIO/modem.img')
    626     target_file = GetTargetFilesZipForSecondaryImages(input_file)
    627 
    628     with zipfile.ZipFile(target_file) as verify_zip:
    629       namelist = verify_zip.namelist()
    630 
    631     self.assertIn('META/ab_partitions.txt', namelist)
    632     self.assertIn('IMAGES/boot.img', namelist)
    633     self.assertIn('IMAGES/system.img', namelist)
    634     self.assertIn('IMAGES/vendor.img', namelist)
    635     self.assertIn(POSTINSTALL_CONFIG, namelist)
    636 
    637     self.assertNotIn('IMAGES/system_other.img', namelist)
    638     self.assertNotIn('IMAGES/system.map', namelist)
    639     self.assertNotIn('RADIO/bootloader.img', namelist)
    640     self.assertNotIn('RADIO/modem.img', namelist)
    641 
    642   def test_GetTargetFilesZipWithoutPostinstallConfig(self):
    643     input_file = construct_target_files()
    644     target_file = GetTargetFilesZipWithoutPostinstallConfig(input_file)
    645     with zipfile.ZipFile(target_file) as verify_zip:
    646       self.assertNotIn(POSTINSTALL_CONFIG, verify_zip.namelist())
    647 
    648   def test_GetTargetFilesZipWithoutPostinstallConfig_missingEntry(self):
    649     input_file = construct_target_files()
    650     common.ZipDelete(input_file, POSTINSTALL_CONFIG)
    651     target_file = GetTargetFilesZipWithoutPostinstallConfig(input_file)
    652     with zipfile.ZipFile(target_file) as verify_zip:
    653       self.assertNotIn(POSTINSTALL_CONFIG, verify_zip.namelist())
    654 
    655   def _test_FinalizeMetadata(self, large_entry=False):
    656     entries = [
    657         'required-entry1',
    658         'required-entry2',
    659     ]
    660     zip_file = PropertyFilesTest.construct_zip_package(entries)
    661     # Add a large entry of 1 GiB if requested.
    662     if large_entry:
    663       with zipfile.ZipFile(zip_file, 'a') as zip_fp:
    664         zip_fp.writestr(
    665             # Using 'zoo' so that the entry stays behind others after signing.
    666             'zoo',
    667             'A' * 1024 * 1024 * 1024,
    668             zipfile.ZIP_STORED)
    669 
    670     metadata = {}
    671     output_file = common.MakeTempFile(suffix='.zip')
    672     needed_property_files = (
    673         TestPropertyFiles(),
    674     )
    675     FinalizeMetadata(metadata, zip_file, output_file, needed_property_files)
    676     self.assertIn('ota-test-property-files', metadata)
    677 
    678   def test_FinalizeMetadata(self):
    679     self._test_FinalizeMetadata()
    680 
    681   def test_FinalizeMetadata_withNoSigning(self):
    682     common.OPTIONS.no_signing = True
    683     self._test_FinalizeMetadata()
    684 
    685   def test_FinalizeMetadata_largeEntry(self):
    686     self._test_FinalizeMetadata(large_entry=True)
    687 
    688   def test_FinalizeMetadata_largeEntry_withNoSigning(self):
    689     common.OPTIONS.no_signing = True
    690     self._test_FinalizeMetadata(large_entry=True)
    691 
    692   def test_FinalizeMetadata_insufficientSpace(self):
    693     entries = [
    694         'required-entry1',
    695         'required-entry2',
    696         'optional-entry1',
    697         'optional-entry2',
    698     ]
    699     zip_file = PropertyFilesTest.construct_zip_package(entries)
    700     with zipfile.ZipFile(zip_file, 'a') as zip_fp:
    701       zip_fp.writestr(
    702           # 'foo-entry1' will appear ahead of all other entries (in alphabetical
    703           # order) after the signing, which will in turn trigger the
    704           # InsufficientSpaceException and an automatic retry.
    705           'foo-entry1',
    706           'A' * 1024 * 1024,
    707           zipfile.ZIP_STORED)
    708 
    709     metadata = {}
    710     needed_property_files = (
    711         TestPropertyFiles(),
    712     )
    713     output_file = common.MakeTempFile(suffix='.zip')
    714     FinalizeMetadata(metadata, zip_file, output_file, needed_property_files)
    715     self.assertIn('ota-test-property-files', metadata)
    716 
    717 
    718 class TestPropertyFiles(PropertyFiles):
    719   """A class that extends PropertyFiles for testing purpose."""
    720 
    721   def __init__(self):
    722     super(TestPropertyFiles, self).__init__()
    723     self.name = 'ota-test-property-files'
    724     self.required = (
    725         'required-entry1',
    726         'required-entry2',
    727     )
    728     self.optional = (
    729         'optional-entry1',
    730         'optional-entry2',
    731     )
    732 
    733 
    734 class PropertyFilesTest(test_utils.ReleaseToolsTestCase):
    735 
    736   def setUp(self):
    737     common.OPTIONS.no_signing = False
    738 
    739   @staticmethod
    740   def construct_zip_package(entries):
    741     zip_file = common.MakeTempFile(suffix='.zip')
    742     with zipfile.ZipFile(zip_file, 'w') as zip_fp:
    743       for entry in entries:
    744         zip_fp.writestr(
    745             entry,
    746             entry.replace('.', '-').upper(),
    747             zipfile.ZIP_STORED)
    748     return zip_file
    749 
    750   @staticmethod
    751   def _parse_property_files_string(data):
    752     result = {}
    753     for token in data.split(','):
    754       name, info = token.split(':', 1)
    755       result[name] = info
    756     return result
    757 
    758   def _verify_entries(self, input_file, tokens, entries):
    759     for entry in entries:
    760       offset, size = map(int, tokens[entry].split(':'))
    761       with open(input_file, 'rb') as input_fp:
    762         input_fp.seek(offset)
    763         if entry == 'metadata':
    764           expected = b'META-INF/COM/ANDROID/METADATA'
    765         else:
    766           expected = entry.replace('.', '-').upper().encode()
    767         self.assertEqual(expected, input_fp.read(size))
    768 
    769   def test_Compute(self):
    770     entries = (
    771         'required-entry1',
    772         'required-entry2',
    773     )
    774     zip_file = self.construct_zip_package(entries)
    775     property_files = TestPropertyFiles()
    776     with zipfile.ZipFile(zip_file, 'r') as zip_fp:
    777       property_files_string = property_files.Compute(zip_fp)
    778 
    779     tokens = self._parse_property_files_string(property_files_string)
    780     self.assertEqual(3, len(tokens))
    781     self._verify_entries(zip_file, tokens, entries)
    782 
    783   def test_Compute_withOptionalEntries(self):
    784     entries = (
    785         'required-entry1',
    786         'required-entry2',
    787         'optional-entry1',
    788         'optional-entry2',
    789     )
    790     zip_file = self.construct_zip_package(entries)
    791     property_files = TestPropertyFiles()
    792     with zipfile.ZipFile(zip_file, 'r') as zip_fp:
    793       property_files_string = property_files.Compute(zip_fp)
    794 
    795     tokens = self._parse_property_files_string(property_files_string)
    796     self.assertEqual(5, len(tokens))
    797     self._verify_entries(zip_file, tokens, entries)
    798 
    799   def test_Compute_missingRequiredEntry(self):
    800     entries = (
    801         'required-entry2',
    802     )
    803     zip_file = self.construct_zip_package(entries)
    804     property_files = TestPropertyFiles()
    805     with zipfile.ZipFile(zip_file, 'r') as zip_fp:
    806       self.assertRaises(KeyError, property_files.Compute, zip_fp)
    807 
    808   def test_Finalize(self):
    809     entries = [
    810         'required-entry1',
    811         'required-entry2',
    812         'META-INF/com/android/metadata',
    813     ]
    814     zip_file = self.construct_zip_package(entries)
    815     property_files = TestPropertyFiles()
    816     with zipfile.ZipFile(zip_file, 'r') as zip_fp:
    817       raw_metadata = property_files.GetPropertyFilesString(
    818           zip_fp, reserve_space=False)
    819       streaming_metadata = property_files.Finalize(zip_fp, len(raw_metadata))
    820     tokens = self._parse_property_files_string(streaming_metadata)
    821 
    822     self.assertEqual(3, len(tokens))
    823     # 'META-INF/com/android/metadata' will be key'd as 'metadata' in the
    824     # streaming metadata.
    825     entries[2] = 'metadata'
    826     self._verify_entries(zip_file, tokens, entries)
    827 
    828   def test_Finalize_assertReservedLength(self):
    829     entries = (
    830         'required-entry1',
    831         'required-entry2',
    832         'optional-entry1',
    833         'optional-entry2',
    834         'META-INF/com/android/metadata',
    835     )
    836     zip_file = self.construct_zip_package(entries)
    837     property_files = TestPropertyFiles()
    838     with zipfile.ZipFile(zip_file, 'r') as zip_fp:
    839       # First get the raw metadata string (i.e. without padding space).
    840       raw_metadata = property_files.GetPropertyFilesString(
    841           zip_fp, reserve_space=False)
    842       raw_length = len(raw_metadata)
    843 
    844       # Now pass in the exact expected length.
    845       streaming_metadata = property_files.Finalize(zip_fp, raw_length)
    846       self.assertEqual(raw_length, len(streaming_metadata))
    847 
    848       # Or pass in insufficient length.
    849       self.assertRaises(
    850           PropertyFiles.InsufficientSpaceException,
    851           property_files.Finalize,
    852           zip_fp,
    853           raw_length - 1)
    854 
    855       # Or pass in a much larger size.
    856       streaming_metadata = property_files.Finalize(
    857           zip_fp,
    858           raw_length + 20)
    859       self.assertEqual(raw_length + 20, len(streaming_metadata))
    860       self.assertEqual(' ' * 20, streaming_metadata[raw_length:])
    861 
    862   def test_Verify(self):
    863     entries = (
    864         'required-entry1',
    865         'required-entry2',
    866         'optional-entry1',
    867         'optional-entry2',
    868         'META-INF/com/android/metadata',
    869     )
    870     zip_file = self.construct_zip_package(entries)
    871     property_files = TestPropertyFiles()
    872     with zipfile.ZipFile(zip_file, 'r') as zip_fp:
    873       # First get the raw metadata string (i.e. without padding space).
    874       raw_metadata = property_files.GetPropertyFilesString(
    875           zip_fp, reserve_space=False)
    876 
    877       # Should pass the test if verification passes.
    878       property_files.Verify(zip_fp, raw_metadata)
    879 
    880       # Or raise on verification failure.
    881       self.assertRaises(
    882           AssertionError, property_files.Verify, zip_fp, raw_metadata + 'x')
    883 
    884 
    885 class StreamingPropertyFilesTest(PropertyFilesTest):
    886   """Additional sanity checks specialized for StreamingPropertyFiles."""
    887 
    888   def test_init(self):
    889     property_files = StreamingPropertyFiles()
    890     self.assertEqual('ota-streaming-property-files', property_files.name)
    891     self.assertEqual(
    892         (
    893             'payload.bin',
    894             'payload_properties.txt',
    895         ),
    896         property_files.required)
    897     self.assertEqual(
    898         (
    899             'care_map.pb',
    900             'care_map.txt',
    901             'compatibility.zip',
    902         ),
    903         property_files.optional)
    904 
    905   def test_Compute(self):
    906     entries = (
    907         'payload.bin',
    908         'payload_properties.txt',
    909         'care_map.txt',
    910         'compatibility.zip',
    911     )
    912     zip_file = self.construct_zip_package(entries)
    913     property_files = StreamingPropertyFiles()
    914     with zipfile.ZipFile(zip_file, 'r') as zip_fp:
    915       property_files_string = property_files.Compute(zip_fp)
    916 
    917     tokens = self._parse_property_files_string(property_files_string)
    918     self.assertEqual(5, len(tokens))
    919     self._verify_entries(zip_file, tokens, entries)
    920 
    921   def test_Finalize(self):
    922     entries = [
    923         'payload.bin',
    924         'payload_properties.txt',
    925         'care_map.txt',
    926         'compatibility.zip',
    927         'META-INF/com/android/metadata',
    928     ]
    929     zip_file = self.construct_zip_package(entries)
    930     property_files = StreamingPropertyFiles()
    931     with zipfile.ZipFile(zip_file, 'r') as zip_fp:
    932       raw_metadata = property_files.GetPropertyFilesString(
    933           zip_fp, reserve_space=False)
    934       streaming_metadata = property_files.Finalize(zip_fp, len(raw_metadata))
    935     tokens = self._parse_property_files_string(streaming_metadata)
    936 
    937     self.assertEqual(5, len(tokens))
    938     # 'META-INF/com/android/metadata' will be key'd as 'metadata' in the
    939     # streaming metadata.
    940     entries[4] = 'metadata'
    941     self._verify_entries(zip_file, tokens, entries)
    942 
    943   def test_Verify(self):
    944     entries = (
    945         'payload.bin',
    946         'payload_properties.txt',
    947         'care_map.txt',
    948         'compatibility.zip',
    949         'META-INF/com/android/metadata',
    950     )
    951     zip_file = self.construct_zip_package(entries)
    952     property_files = StreamingPropertyFiles()
    953     with zipfile.ZipFile(zip_file, 'r') as zip_fp:
    954       # First get the raw metadata string (i.e. without padding space).
    955       raw_metadata = property_files.GetPropertyFilesString(
    956           zip_fp, reserve_space=False)
    957 
    958       # Should pass the test if verification passes.
    959       property_files.Verify(zip_fp, raw_metadata)
    960 
    961       # Or raise on verification failure.
    962       self.assertRaises(
    963           AssertionError, property_files.Verify, zip_fp, raw_metadata + 'x')
    964 
    965 
    966 class AbOtaPropertyFilesTest(PropertyFilesTest):
    967   """Additional sanity checks specialized for AbOtaPropertyFiles."""
    968 
    969   # The size for payload and metadata signature size.
    970   SIGNATURE_SIZE = 256
    971 
    972   def setUp(self):
    973     self.testdata_dir = test_utils.get_testdata_dir()
    974     self.assertTrue(os.path.exists(self.testdata_dir))
    975 
    976     common.OPTIONS.wipe_user_data = False
    977     common.OPTIONS.payload_signer = None
    978     common.OPTIONS.payload_signer_args = None
    979     common.OPTIONS.package_key = os.path.join(self.testdata_dir, 'testkey')
    980     common.OPTIONS.key_passwords = {
    981         common.OPTIONS.package_key : None,
    982     }
    983 
    984   def test_init(self):
    985     property_files = AbOtaPropertyFiles()
    986     self.assertEqual('ota-property-files', property_files.name)
    987     self.assertEqual(
    988         (
    989             'payload.bin',
    990             'payload_properties.txt',
    991         ),
    992         property_files.required)
    993     self.assertEqual(
    994         (
    995             'care_map.pb',
    996             'care_map.txt',
    997             'compatibility.zip',
    998         ),
    999         property_files.optional)
   1000 
   1001   def test_GetPayloadMetadataOffsetAndSize(self):
   1002     target_file = construct_target_files()
   1003     payload = Payload()
   1004     payload.Generate(target_file)
   1005 
   1006     payload_signer = PayloadSigner()
   1007     payload.Sign(payload_signer)
   1008 
   1009     output_file = common.MakeTempFile(suffix='.zip')
   1010     with zipfile.ZipFile(output_file, 'w') as output_zip:
   1011       payload.WriteToZip(output_zip)
   1012 
   1013     # Find out the payload metadata offset and size.
   1014     property_files = AbOtaPropertyFiles()
   1015     with zipfile.ZipFile(output_file) as input_zip:
   1016       # pylint: disable=protected-access
   1017       payload_offset, metadata_total = (
   1018           property_files._GetPayloadMetadataOffsetAndSize(input_zip))
   1019 
   1020     # Read in the metadata signature directly.
   1021     with open(output_file, 'rb') as verify_fp:
   1022       verify_fp.seek(payload_offset + metadata_total - self.SIGNATURE_SIZE)
   1023       metadata_signature = verify_fp.read(self.SIGNATURE_SIZE)
   1024 
   1025     # Now we extract the metadata hash via brillo_update_payload script, which
   1026     # will serve as the oracle result.
   1027     payload_sig_file = common.MakeTempFile(prefix="sig-", suffix=".bin")
   1028     metadata_sig_file = common.MakeTempFile(prefix="sig-", suffix=".bin")
   1029     cmd = ['brillo_update_payload', 'hash',
   1030            '--unsigned_payload', payload.payload_file,
   1031            '--signature_size', str(self.SIGNATURE_SIZE),
   1032            '--metadata_hash_file', metadata_sig_file,
   1033            '--payload_hash_file', payload_sig_file]
   1034     proc = common.Run(cmd)
   1035     stdoutdata, _ = proc.communicate()
   1036     self.assertEqual(
   1037         0, proc.returncode,
   1038         'Failed to run brillo_update_payload:\n{}'.format(stdoutdata))
   1039 
   1040     signed_metadata_sig_file = payload_signer.Sign(metadata_sig_file)
   1041 
   1042     # Finally we can compare the two signatures.
   1043     with open(signed_metadata_sig_file, 'rb') as verify_fp:
   1044       self.assertEqual(verify_fp.read(), metadata_signature)
   1045 
   1046   @staticmethod
   1047   def construct_zip_package_withValidPayload(with_metadata=False):
   1048     # Cannot use construct_zip_package() since we need a "valid" payload.bin.
   1049     target_file = construct_target_files()
   1050     payload = Payload()
   1051     payload.Generate(target_file)
   1052 
   1053     payload_signer = PayloadSigner()
   1054     payload.Sign(payload_signer)
   1055 
   1056     zip_file = common.MakeTempFile(suffix='.zip')
   1057     with zipfile.ZipFile(zip_file, 'w') as zip_fp:
   1058       # 'payload.bin',
   1059       payload.WriteToZip(zip_fp)
   1060 
   1061       # Other entries.
   1062       entries = ['care_map.txt', 'compatibility.zip']
   1063 
   1064       # Put META-INF/com/android/metadata if needed.
   1065       if with_metadata:
   1066         entries.append('META-INF/com/android/metadata')
   1067 
   1068       for entry in entries:
   1069         zip_fp.writestr(
   1070             entry, entry.replace('.', '-').upper(), zipfile.ZIP_STORED)
   1071 
   1072     return zip_file
   1073 
   1074   def test_Compute(self):
   1075     zip_file = self.construct_zip_package_withValidPayload()
   1076     property_files = AbOtaPropertyFiles()
   1077     with zipfile.ZipFile(zip_file, 'r') as zip_fp:
   1078       property_files_string = property_files.Compute(zip_fp)
   1079 
   1080     tokens = self._parse_property_files_string(property_files_string)
   1081     # "6" indcludes the four entries above, one metadata entry, and one entry
   1082     # for payload-metadata.bin.
   1083     self.assertEqual(6, len(tokens))
   1084     self._verify_entries(
   1085         zip_file, tokens, ('care_map.txt', 'compatibility.zip'))
   1086 
   1087   def test_Finalize(self):
   1088     zip_file = self.construct_zip_package_withValidPayload(with_metadata=True)
   1089     property_files = AbOtaPropertyFiles()
   1090     with zipfile.ZipFile(zip_file, 'r') as zip_fp:
   1091       raw_metadata = property_files.GetPropertyFilesString(
   1092           zip_fp, reserve_space=False)
   1093       property_files_string = property_files.Finalize(zip_fp, len(raw_metadata))
   1094 
   1095     tokens = self._parse_property_files_string(property_files_string)
   1096     # "6" indcludes the four entries above, one metadata entry, and one entry
   1097     # for payload-metadata.bin.
   1098     self.assertEqual(6, len(tokens))
   1099     self._verify_entries(
   1100         zip_file, tokens, ('care_map.txt', 'compatibility.zip'))
   1101 
   1102   def test_Verify(self):
   1103     zip_file = self.construct_zip_package_withValidPayload(with_metadata=True)
   1104     property_files = AbOtaPropertyFiles()
   1105     with zipfile.ZipFile(zip_file, 'r') as zip_fp:
   1106       raw_metadata = property_files.GetPropertyFilesString(
   1107           zip_fp, reserve_space=False)
   1108 
   1109       property_files.Verify(zip_fp, raw_metadata)
   1110 
   1111 
   1112 class NonAbOtaPropertyFilesTest(PropertyFilesTest):
   1113   """Additional sanity checks specialized for NonAbOtaPropertyFiles."""
   1114 
   1115   def test_init(self):
   1116     property_files = NonAbOtaPropertyFiles()
   1117     self.assertEqual('ota-property-files', property_files.name)
   1118     self.assertEqual((), property_files.required)
   1119     self.assertEqual((), property_files.optional)
   1120 
   1121   def test_Compute(self):
   1122     entries = ()
   1123     zip_file = self.construct_zip_package(entries)
   1124     property_files = NonAbOtaPropertyFiles()
   1125     with zipfile.ZipFile(zip_file) as zip_fp:
   1126       property_files_string = property_files.Compute(zip_fp)
   1127 
   1128     tokens = self._parse_property_files_string(property_files_string)
   1129     self.assertEqual(1, len(tokens))
   1130     self._verify_entries(zip_file, tokens, entries)
   1131 
   1132   def test_Finalize(self):
   1133     entries = [
   1134         'META-INF/com/android/metadata',
   1135     ]
   1136     zip_file = self.construct_zip_package(entries)
   1137     property_files = NonAbOtaPropertyFiles()
   1138     with zipfile.ZipFile(zip_file) as zip_fp:
   1139       raw_metadata = property_files.GetPropertyFilesString(
   1140           zip_fp, reserve_space=False)
   1141       property_files_string = property_files.Finalize(zip_fp, len(raw_metadata))
   1142     tokens = self._parse_property_files_string(property_files_string)
   1143 
   1144     self.assertEqual(1, len(tokens))
   1145     # 'META-INF/com/android/metadata' will be key'd as 'metadata'.
   1146     entries[0] = 'metadata'
   1147     self._verify_entries(zip_file, tokens, entries)
   1148 
   1149   def test_Verify(self):
   1150     entries = (
   1151         'META-INF/com/android/metadata',
   1152     )
   1153     zip_file = self.construct_zip_package(entries)
   1154     property_files = NonAbOtaPropertyFiles()
   1155     with zipfile.ZipFile(zip_file) as zip_fp:
   1156       raw_metadata = property_files.GetPropertyFilesString(
   1157           zip_fp, reserve_space=False)
   1158 
   1159       property_files.Verify(zip_fp, raw_metadata)
   1160 
   1161 
   1162 class PayloadSignerTest(test_utils.ReleaseToolsTestCase):
   1163 
   1164   SIGFILE = 'sigfile.bin'
   1165   SIGNED_SIGFILE = 'signed-sigfile.bin'
   1166 
   1167   def setUp(self):
   1168     self.testdata_dir = test_utils.get_testdata_dir()
   1169     self.assertTrue(os.path.exists(self.testdata_dir))
   1170 
   1171     common.OPTIONS.payload_signer = None
   1172     common.OPTIONS.payload_signer_args = []
   1173     common.OPTIONS.package_key = os.path.join(self.testdata_dir, 'testkey')
   1174     common.OPTIONS.key_passwords = {
   1175         common.OPTIONS.package_key : None,
   1176     }
   1177 
   1178   def _assertFilesEqual(self, file1, file2):
   1179     with open(file1, 'rb') as fp1, open(file2, 'rb') as fp2:
   1180       self.assertEqual(fp1.read(), fp2.read())
   1181 
   1182   def test_init(self):
   1183     payload_signer = PayloadSigner()
   1184     self.assertEqual('openssl', payload_signer.signer)
   1185     self.assertEqual(256, payload_signer.key_size)
   1186 
   1187   def test_init_withPassword(self):
   1188     common.OPTIONS.package_key = os.path.join(
   1189         self.testdata_dir, 'testkey_with_passwd')
   1190     common.OPTIONS.key_passwords = {
   1191         common.OPTIONS.package_key : 'foo',
   1192     }
   1193     payload_signer = PayloadSigner()
   1194     self.assertEqual('openssl', payload_signer.signer)
   1195 
   1196   def test_init_withExternalSigner(self):
   1197     common.OPTIONS.payload_signer = 'abc'
   1198     common.OPTIONS.payload_signer_args = ['arg1', 'arg2']
   1199     common.OPTIONS.payload_signer_key_size = '512'
   1200     payload_signer = PayloadSigner()
   1201     self.assertEqual('abc', payload_signer.signer)
   1202     self.assertEqual(['arg1', 'arg2'], payload_signer.signer_args)
   1203     self.assertEqual(512, payload_signer.key_size)
   1204 
   1205   def test_GetKeySizeInBytes_512Bytes(self):
   1206     signing_key = os.path.join(self.testdata_dir, 'testkey_RSA4096.key')
   1207     key_size = PayloadSigner._GetKeySizeInBytes(signing_key)
   1208     self.assertEqual(512, key_size)
   1209 
   1210   def test_Sign(self):
   1211     payload_signer = PayloadSigner()
   1212     input_file = os.path.join(self.testdata_dir, self.SIGFILE)
   1213     signed_file = payload_signer.Sign(input_file)
   1214 
   1215     verify_file = os.path.join(self.testdata_dir, self.SIGNED_SIGFILE)
   1216     self._assertFilesEqual(verify_file, signed_file)
   1217 
   1218   def test_Sign_withExternalSigner_openssl(self):
   1219     """Uses openssl as the external payload signer."""
   1220     common.OPTIONS.payload_signer = 'openssl'
   1221     common.OPTIONS.payload_signer_args = [
   1222         'pkeyutl', '-sign', '-keyform', 'DER', '-inkey',
   1223         os.path.join(self.testdata_dir, 'testkey.pk8'),
   1224         '-pkeyopt', 'digest:sha256']
   1225     payload_signer = PayloadSigner()
   1226     input_file = os.path.join(self.testdata_dir, self.SIGFILE)
   1227     signed_file = payload_signer.Sign(input_file)
   1228 
   1229     verify_file = os.path.join(self.testdata_dir, self.SIGNED_SIGFILE)
   1230     self._assertFilesEqual(verify_file, signed_file)
   1231 
   1232   def test_Sign_withExternalSigner_script(self):
   1233     """Uses testdata/payload_signer.sh as the external payload signer."""
   1234     common.OPTIONS.payload_signer = os.path.join(
   1235         self.testdata_dir, 'payload_signer.sh')
   1236     common.OPTIONS.payload_signer_args = [
   1237         os.path.join(self.testdata_dir, 'testkey.pk8')]
   1238     payload_signer = PayloadSigner()
   1239     input_file = os.path.join(self.testdata_dir, self.SIGFILE)
   1240     signed_file = payload_signer.Sign(input_file)
   1241 
   1242     verify_file = os.path.join(self.testdata_dir, self.SIGNED_SIGFILE)
   1243     self._assertFilesEqual(verify_file, signed_file)
   1244 
   1245 
   1246 class PayloadTest(test_utils.ReleaseToolsTestCase):
   1247 
   1248   def setUp(self):
   1249     self.testdata_dir = test_utils.get_testdata_dir()
   1250     self.assertTrue(os.path.exists(self.testdata_dir))
   1251 
   1252     common.OPTIONS.wipe_user_data = False
   1253     common.OPTIONS.payload_signer = None
   1254     common.OPTIONS.payload_signer_args = None
   1255     common.OPTIONS.package_key = os.path.join(self.testdata_dir, 'testkey')
   1256     common.OPTIONS.key_passwords = {
   1257         common.OPTIONS.package_key : None,
   1258     }
   1259 
   1260   @staticmethod
   1261   def _create_payload_full(secondary=False):
   1262     target_file = construct_target_files(secondary)
   1263     payload = Payload(secondary)
   1264     payload.Generate(target_file)
   1265     return payload
   1266 
   1267   @staticmethod
   1268   def _create_payload_incremental():
   1269     target_file = construct_target_files()
   1270     source_file = construct_target_files()
   1271     payload = Payload()
   1272     payload.Generate(target_file, source_file)
   1273     return payload
   1274 
   1275   def test_Generate_full(self):
   1276     payload = self._create_payload_full()
   1277     self.assertTrue(os.path.exists(payload.payload_file))
   1278 
   1279   def test_Generate_incremental(self):
   1280     payload = self._create_payload_incremental()
   1281     self.assertTrue(os.path.exists(payload.payload_file))
   1282 
   1283   def test_Generate_additionalArgs(self):
   1284     target_file = construct_target_files()
   1285     source_file = construct_target_files()
   1286     payload = Payload()
   1287     # This should work the same as calling payload.Generate(target_file,
   1288     # source_file).
   1289     payload.Generate(
   1290         target_file, additional_args=["--source_image", source_file])
   1291     self.assertTrue(os.path.exists(payload.payload_file))
   1292 
   1293   def test_Generate_invalidInput(self):
   1294     target_file = construct_target_files()
   1295     common.ZipDelete(target_file, 'IMAGES/vendor.img')
   1296     payload = Payload()
   1297     self.assertRaises(common.ExternalError, payload.Generate, target_file)
   1298 
   1299   def test_Sign_full(self):
   1300     payload = self._create_payload_full()
   1301     payload.Sign(PayloadSigner())
   1302 
   1303     output_file = common.MakeTempFile(suffix='.zip')
   1304     with zipfile.ZipFile(output_file, 'w') as output_zip:
   1305       payload.WriteToZip(output_zip)
   1306 
   1307     import check_ota_package_signature
   1308     check_ota_package_signature.VerifyAbOtaPayload(
   1309         os.path.join(self.testdata_dir, 'testkey.x509.pem'),
   1310         output_file)
   1311 
   1312   def test_Sign_incremental(self):
   1313     payload = self._create_payload_incremental()
   1314     payload.Sign(PayloadSigner())
   1315 
   1316     output_file = common.MakeTempFile(suffix='.zip')
   1317     with zipfile.ZipFile(output_file, 'w') as output_zip:
   1318       payload.WriteToZip(output_zip)
   1319 
   1320     import check_ota_package_signature
   1321     check_ota_package_signature.VerifyAbOtaPayload(
   1322         os.path.join(self.testdata_dir, 'testkey.x509.pem'),
   1323         output_file)
   1324 
   1325   def test_Sign_withDataWipe(self):
   1326     common.OPTIONS.wipe_user_data = True
   1327     payload = self._create_payload_full()
   1328     payload.Sign(PayloadSigner())
   1329 
   1330     with open(payload.payload_properties) as properties_fp:
   1331       self.assertIn("POWERWASH=1", properties_fp.read())
   1332 
   1333   def test_Sign_secondary(self):
   1334     payload = self._create_payload_full(secondary=True)
   1335     payload.Sign(PayloadSigner())
   1336 
   1337     with open(payload.payload_properties) as properties_fp:
   1338       self.assertIn("SWITCH_SLOT_ON_REBOOT=0", properties_fp.read())
   1339 
   1340   def test_Sign_badSigner(self):
   1341     """Tests that signing failure can be captured."""
   1342     payload = self._create_payload_full()
   1343     payload_signer = PayloadSigner()
   1344     payload_signer.signer_args.append('bad-option')
   1345     self.assertRaises(common.ExternalError, payload.Sign, payload_signer)
   1346 
   1347   def test_WriteToZip(self):
   1348     payload = self._create_payload_full()
   1349     payload.Sign(PayloadSigner())
   1350 
   1351     output_file = common.MakeTempFile(suffix='.zip')
   1352     with zipfile.ZipFile(output_file, 'w') as output_zip:
   1353       payload.WriteToZip(output_zip)
   1354 
   1355     with zipfile.ZipFile(output_file) as verify_zip:
   1356       # First make sure we have the essential entries.
   1357       namelist = verify_zip.namelist()
   1358       self.assertIn(Payload.PAYLOAD_BIN, namelist)
   1359       self.assertIn(Payload.PAYLOAD_PROPERTIES_TXT, namelist)
   1360 
   1361       # Then assert these entries are stored.
   1362       for entry_info in verify_zip.infolist():
   1363         if entry_info.filename not in (Payload.PAYLOAD_BIN,
   1364                                        Payload.PAYLOAD_PROPERTIES_TXT):
   1365           continue
   1366         self.assertEqual(zipfile.ZIP_STORED, entry_info.compress_type)
   1367 
   1368   def test_WriteToZip_unsignedPayload(self):
   1369     """Unsigned payloads should not be allowed to be written to zip."""
   1370     payload = self._create_payload_full()
   1371 
   1372     output_file = common.MakeTempFile(suffix='.zip')
   1373     with zipfile.ZipFile(output_file, 'w') as output_zip:
   1374       self.assertRaises(AssertionError, payload.WriteToZip, output_zip)
   1375 
   1376     # Also test with incremental payload.
   1377     payload = self._create_payload_incremental()
   1378 
   1379     output_file = common.MakeTempFile(suffix='.zip')
   1380     with zipfile.ZipFile(output_file, 'w') as output_zip:
   1381       self.assertRaises(AssertionError, payload.WriteToZip, output_zip)
   1382 
   1383   def test_WriteToZip_secondary(self):
   1384     payload = self._create_payload_full(secondary=True)
   1385     payload.Sign(PayloadSigner())
   1386 
   1387     output_file = common.MakeTempFile(suffix='.zip')
   1388     with zipfile.ZipFile(output_file, 'w') as output_zip:
   1389       payload.WriteToZip(output_zip)
   1390 
   1391     with zipfile.ZipFile(output_file) as verify_zip:
   1392       # First make sure we have the essential entries.
   1393       namelist = verify_zip.namelist()
   1394       self.assertIn(Payload.SECONDARY_PAYLOAD_BIN, namelist)
   1395       self.assertIn(Payload.SECONDARY_PAYLOAD_PROPERTIES_TXT, namelist)
   1396 
   1397       # Then assert these entries are stored.
   1398       for entry_info in verify_zip.infolist():
   1399         if entry_info.filename not in (
   1400             Payload.SECONDARY_PAYLOAD_BIN,
   1401             Payload.SECONDARY_PAYLOAD_PROPERTIES_TXT):
   1402           continue
   1403         self.assertEqual(zipfile.ZIP_STORED, entry_info.compress_type)
   1404