Home | History | Annotate | Download | only in releasetools
      1 #
      2 # Copyright (C) 2018 The Android Open Source Project
      3 #
      4 # Licensed under the Apache License, Version 2.0 (the "License");
      5 # you may not use this file except in compliance with the License.
      6 # You may obtain a copy of the License at
      7 #
      8 #      http://www.apache.org/licenses/LICENSE-2.0
      9 #
     10 # Unless required by applicable law or agreed to in writing, software
     11 # distributed under the License is distributed on an "AS IS" BASIS,
     12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     13 # See the License for the specific language governing permissions and
     14 # limitations under the License.
     15 #
     16 
     17 import copy
     18 import os
     19 import os.path
     20 import subprocess
     21 import unittest
     22 import zipfile
     23 
     24 import common
     25 import test_utils
     26 from ota_from_target_files import (
     27     _LoadOemDicts, AbOtaPropertyFiles, BuildInfo, FinalizeMetadata,
     28     GetPackageMetadata, GetTargetFilesZipForSecondaryImages,
     29     GetTargetFilesZipWithoutPostinstallConfig, NonAbOtaPropertyFiles,
     30     Payload, PayloadSigner, POSTINSTALL_CONFIG, PropertyFiles,
     31     StreamingPropertyFiles, WriteFingerprintAssertion)
     32 
     33 
     34 def construct_target_files(secondary=False):
     35   """Returns a target-files.zip file for generating OTA packages."""
     36   target_files = common.MakeTempFile(prefix='target_files-', suffix='.zip')
     37   with zipfile.ZipFile(target_files, 'w') as target_files_zip:
     38     # META/update_engine_config.txt
     39     target_files_zip.writestr(
     40         'META/update_engine_config.txt',
     41         "PAYLOAD_MAJOR_VERSION=2\nPAYLOAD_MINOR_VERSION=4\n")
     42 
     43     # META/postinstall_config.txt
     44     target_files_zip.writestr(
     45         POSTINSTALL_CONFIG,
     46         '\n'.join([
     47             "RUN_POSTINSTALL_system=true",
     48             "POSTINSTALL_PATH_system=system/bin/otapreopt_script",
     49             "FILESYSTEM_TYPE_system=ext4",
     50             "POSTINSTALL_OPTIONAL_system=true",
     51         ]))
     52 
     53     # META/ab_partitions.txt
     54     ab_partitions = ['boot', 'system', 'vendor']
     55     target_files_zip.writestr(
     56         'META/ab_partitions.txt',
     57         '\n'.join(ab_partitions))
     58 
     59     # Create dummy images for each of them.
     60     for partition in ab_partitions:
     61       target_files_zip.writestr('IMAGES/' + partition + '.img',
     62                                 os.urandom(len(partition)))
     63 
     64     if secondary:
     65       target_files_zip.writestr('IMAGES/system_other.img',
     66                                 os.urandom(len("system_other")))
     67 
     68   return target_files
     69 
     70 
     71 class MockScriptWriter(object):
     72   """A class that mocks edify_generator.EdifyGenerator.
     73 
     74   It simply pushes the incoming arguments onto script stack, which is to assert
     75   the calls to EdifyGenerator functions.
     76   """
     77 
     78   def __init__(self):
     79     self.script = []
     80 
     81   def Mount(self, *args):
     82     self.script.append(('Mount',) + args)
     83 
     84   def AssertDevice(self, *args):
     85     self.script.append(('AssertDevice',) + args)
     86 
     87   def AssertOemProperty(self, *args):
     88     self.script.append(('AssertOemProperty',) + args)
     89 
     90   def AssertFingerprintOrThumbprint(self, *args):
     91     self.script.append(('AssertFingerprintOrThumbprint',) + args)
     92 
     93   def AssertSomeFingerprint(self, *args):
     94     self.script.append(('AssertSomeFingerprint',) + args)
     95 
     96   def AssertSomeThumbprint(self, *args):
     97     self.script.append(('AssertSomeThumbprint',) + args)
     98 
     99 
    100 class BuildInfoTest(unittest.TestCase):
    101 
    102   TEST_INFO_DICT = {
    103       'build.prop' : {
    104           'ro.product.device' : 'product-device',
    105           'ro.product.name' : 'product-name',
    106           'ro.build.fingerprint' : 'build-fingerprint',
    107           'ro.build.foo' : 'build-foo',
    108       },
    109       'vendor.build.prop' : {
    110           'ro.vendor.build.fingerprint' : 'vendor-build-fingerprint',
    111       },
    112       'property1' : 'value1',
    113       'property2' : 4096,
    114   }
    115 
    116   TEST_INFO_DICT_USES_OEM_PROPS = {
    117       'build.prop' : {
    118           'ro.product.name' : 'product-name',
    119           'ro.build.thumbprint' : 'build-thumbprint',
    120           'ro.build.bar' : 'build-bar',
    121       },
    122       'vendor.build.prop' : {
    123           'ro.vendor.build.fingerprint' : 'vendor-build-fingerprint',
    124       },
    125       'property1' : 'value1',
    126       'property2' : 4096,
    127       'oem_fingerprint_properties' : 'ro.product.device ro.product.brand',
    128   }
    129 
    130   TEST_OEM_DICTS = [
    131       {
    132           'ro.product.brand' : 'brand1',
    133           'ro.product.device' : 'device1',
    134       },
    135       {
    136           'ro.product.brand' : 'brand2',
    137           'ro.product.device' : 'device2',
    138       },
    139       {
    140           'ro.product.brand' : 'brand3',
    141           'ro.product.device' : 'device3',
    142       },
    143   ]
    144 
    145   def test_init(self):
    146     target_info = BuildInfo(self.TEST_INFO_DICT, None)
    147     self.assertEqual('product-device', target_info.device)
    148     self.assertEqual('build-fingerprint', target_info.fingerprint)
    149     self.assertFalse(target_info.is_ab)
    150     self.assertIsNone(target_info.oem_props)
    151 
    152   def test_init_with_oem_props(self):
    153     target_info = BuildInfo(self.TEST_INFO_DICT_USES_OEM_PROPS,
    154                             self.TEST_OEM_DICTS)
    155     self.assertEqual('device1', target_info.device)
    156     self.assertEqual('brand1/product-name/device1:build-thumbprint',
    157                      target_info.fingerprint)
    158 
    159     # Swap the order in oem_dicts, which would lead to different BuildInfo.
    160     oem_dicts = copy.copy(self.TEST_OEM_DICTS)
    161     oem_dicts[0], oem_dicts[2] = oem_dicts[2], oem_dicts[0]
    162     target_info = BuildInfo(self.TEST_INFO_DICT_USES_OEM_PROPS, oem_dicts)
    163     self.assertEqual('device3', target_info.device)
    164     self.assertEqual('brand3/product-name/device3:build-thumbprint',
    165                      target_info.fingerprint)
    166 
    167     # Missing oem_dict should be rejected.
    168     self.assertRaises(AssertionError, BuildInfo,
    169                       self.TEST_INFO_DICT_USES_OEM_PROPS, None)
    170 
    171   def test___getitem__(self):
    172     target_info = BuildInfo(self.TEST_INFO_DICT, None)
    173     self.assertEqual('value1', target_info['property1'])
    174     self.assertEqual(4096, target_info['property2'])
    175     self.assertEqual('build-foo', target_info['build.prop']['ro.build.foo'])
    176 
    177   def test___getitem__with_oem_props(self):
    178     target_info = BuildInfo(self.TEST_INFO_DICT_USES_OEM_PROPS,
    179                             self.TEST_OEM_DICTS)
    180     self.assertEqual('value1', target_info['property1'])
    181     self.assertEqual(4096, target_info['property2'])
    182     self.assertRaises(KeyError,
    183                       lambda: target_info['build.prop']['ro.build.foo'])
    184 
    185   def test_get(self):
    186     target_info = BuildInfo(self.TEST_INFO_DICT, None)
    187     self.assertEqual('value1', target_info.get('property1'))
    188     self.assertEqual(4096, target_info.get('property2'))
    189     self.assertEqual(4096, target_info.get('property2', 1024))
    190     self.assertEqual(1024, target_info.get('property-nonexistent', 1024))
    191     self.assertEqual('build-foo', target_info.get('build.prop')['ro.build.foo'])
    192 
    193   def test_get_with_oem_props(self):
    194     target_info = BuildInfo(self.TEST_INFO_DICT_USES_OEM_PROPS,
    195                             self.TEST_OEM_DICTS)
    196     self.assertEqual('value1', target_info.get('property1'))
    197     self.assertEqual(4096, target_info.get('property2'))
    198     self.assertEqual(4096, target_info.get('property2', 1024))
    199     self.assertEqual(1024, target_info.get('property-nonexistent', 1024))
    200     self.assertIsNone(target_info.get('build.prop').get('ro.build.foo'))
    201     self.assertRaises(KeyError,
    202                       lambda: target_info.get('build.prop')['ro.build.foo'])
    203 
    204   def test_GetBuildProp(self):
    205     target_info = BuildInfo(self.TEST_INFO_DICT, None)
    206     self.assertEqual('build-foo', target_info.GetBuildProp('ro.build.foo'))
    207     self.assertRaises(common.ExternalError, target_info.GetBuildProp,
    208                       'ro.build.nonexistent')
    209 
    210   def test_GetBuildProp_with_oem_props(self):
    211     target_info = BuildInfo(self.TEST_INFO_DICT_USES_OEM_PROPS,
    212                             self.TEST_OEM_DICTS)
    213     self.assertEqual('build-bar', target_info.GetBuildProp('ro.build.bar'))
    214     self.assertRaises(common.ExternalError, target_info.GetBuildProp,
    215                       'ro.build.nonexistent')
    216 
    217   def test_GetVendorBuildProp(self):
    218     target_info = BuildInfo(self.TEST_INFO_DICT, None)
    219     self.assertEqual('vendor-build-fingerprint',
    220                      target_info.GetVendorBuildProp(
    221                          'ro.vendor.build.fingerprint'))
    222     self.assertRaises(common.ExternalError, target_info.GetVendorBuildProp,
    223                       'ro.build.nonexistent')
    224 
    225   def test_GetVendorBuildProp_with_oem_props(self):
    226     target_info = BuildInfo(self.TEST_INFO_DICT_USES_OEM_PROPS,
    227                             self.TEST_OEM_DICTS)
    228     self.assertEqual('vendor-build-fingerprint',
    229                      target_info.GetVendorBuildProp(
    230                          'ro.vendor.build.fingerprint'))
    231     self.assertRaises(common.ExternalError, target_info.GetVendorBuildProp,
    232                       'ro.build.nonexistent')
    233 
    234   def test_WriteMountOemScript(self):
    235     target_info = BuildInfo(self.TEST_INFO_DICT_USES_OEM_PROPS,
    236                             self.TEST_OEM_DICTS)
    237     script_writer = MockScriptWriter()
    238     target_info.WriteMountOemScript(script_writer)
    239     self.assertEqual([('Mount', '/oem', None)], script_writer.script)
    240 
    241   def test_WriteDeviceAssertions(self):
    242     target_info = BuildInfo(self.TEST_INFO_DICT, None)
    243     script_writer = MockScriptWriter()
    244     target_info.WriteDeviceAssertions(script_writer, False)
    245     self.assertEqual([('AssertDevice', 'product-device')], script_writer.script)
    246 
    247   def test_WriteDeviceAssertions_with_oem_props(self):
    248     target_info = BuildInfo(self.TEST_INFO_DICT_USES_OEM_PROPS,
    249                             self.TEST_OEM_DICTS)
    250     script_writer = MockScriptWriter()
    251     target_info.WriteDeviceAssertions(script_writer, False)
    252     self.assertEqual(
    253         [
    254             ('AssertOemProperty', 'ro.product.device',
    255              ['device1', 'device2', 'device3'], False),
    256             ('AssertOemProperty', 'ro.product.brand',
    257              ['brand1', 'brand2', 'brand3'], False),
    258         ],
    259         script_writer.script)
    260 
    261   def test_WriteFingerprintAssertion_without_oem_props(self):
    262     target_info = BuildInfo(self.TEST_INFO_DICT, None)
    263     source_info_dict = copy.deepcopy(self.TEST_INFO_DICT)
    264     source_info_dict['build.prop']['ro.build.fingerprint'] = (
    265         'source-build-fingerprint')
    266     source_info = BuildInfo(source_info_dict, None)
    267 
    268     script_writer = MockScriptWriter()
    269     WriteFingerprintAssertion(script_writer, target_info, source_info)
    270     self.assertEqual(
    271         [('AssertSomeFingerprint', 'source-build-fingerprint',
    272           'build-fingerprint')],
    273         script_writer.script)
    274 
    275   def test_WriteFingerprintAssertion_with_source_oem_props(self):
    276     target_info = BuildInfo(self.TEST_INFO_DICT, None)
    277     source_info = BuildInfo(self.TEST_INFO_DICT_USES_OEM_PROPS,
    278                             self.TEST_OEM_DICTS)
    279 
    280     script_writer = MockScriptWriter()
    281     WriteFingerprintAssertion(script_writer, target_info, source_info)
    282     self.assertEqual(
    283         [('AssertFingerprintOrThumbprint', 'build-fingerprint',
    284           'build-thumbprint')],
    285         script_writer.script)
    286 
    287   def test_WriteFingerprintAssertion_with_target_oem_props(self):
    288     target_info = BuildInfo(self.TEST_INFO_DICT_USES_OEM_PROPS,
    289                             self.TEST_OEM_DICTS)
    290     source_info = BuildInfo(self.TEST_INFO_DICT, None)
    291 
    292     script_writer = MockScriptWriter()
    293     WriteFingerprintAssertion(script_writer, target_info, source_info)
    294     self.assertEqual(
    295         [('AssertFingerprintOrThumbprint', 'build-fingerprint',
    296           'build-thumbprint')],
    297         script_writer.script)
    298 
    299   def test_WriteFingerprintAssertion_with_both_oem_props(self):
    300     target_info = BuildInfo(self.TEST_INFO_DICT_USES_OEM_PROPS,
    301                             self.TEST_OEM_DICTS)
    302     source_info_dict = copy.deepcopy(self.TEST_INFO_DICT_USES_OEM_PROPS)
    303     source_info_dict['build.prop']['ro.build.thumbprint'] = (
    304         'source-build-thumbprint')
    305     source_info = BuildInfo(source_info_dict, self.TEST_OEM_DICTS)
    306 
    307     script_writer = MockScriptWriter()
    308     WriteFingerprintAssertion(script_writer, target_info, source_info)
    309     self.assertEqual(
    310         [('AssertSomeThumbprint', 'build-thumbprint',
    311           'source-build-thumbprint')],
    312         script_writer.script)
    313 
    314 
    315 class LoadOemDictsTest(unittest.TestCase):
    316 
    317   def tearDown(self):
    318     common.Cleanup()
    319 
    320   def test_NoneDict(self):
    321     self.assertIsNone(_LoadOemDicts(None))
    322 
    323   def test_SingleDict(self):
    324     dict_file = common.MakeTempFile()
    325     with open(dict_file, 'w') as dict_fp:
    326       dict_fp.write('abc=1\ndef=2\nxyz=foo\na.b.c=bar\n')
    327 
    328     oem_dicts = _LoadOemDicts([dict_file])
    329     self.assertEqual(1, len(oem_dicts))
    330     self.assertEqual('foo', oem_dicts[0]['xyz'])
    331     self.assertEqual('bar', oem_dicts[0]['a.b.c'])
    332 
    333   def test_MultipleDicts(self):
    334     oem_source = []
    335     for i in range(3):
    336       dict_file = common.MakeTempFile()
    337       with open(dict_file, 'w') as dict_fp:
    338         dict_fp.write(
    339             'ro.build.index={}\ndef=2\nxyz=foo\na.b.c=bar\n'.format(i))
    340       oem_source.append(dict_file)
    341 
    342     oem_dicts = _LoadOemDicts(oem_source)
    343     self.assertEqual(3, len(oem_dicts))
    344     for i, oem_dict in enumerate(oem_dicts):
    345       self.assertEqual('2', oem_dict['def'])
    346       self.assertEqual('foo', oem_dict['xyz'])
    347       self.assertEqual('bar', oem_dict['a.b.c'])
    348       self.assertEqual('{}'.format(i), oem_dict['ro.build.index'])
    349 
    350 
    351 class OtaFromTargetFilesTest(unittest.TestCase):
    352 
    353   TEST_TARGET_INFO_DICT = {
    354       'build.prop' : {
    355           'ro.product.device' : 'product-device',
    356           'ro.build.fingerprint' : 'build-fingerprint-target',
    357           'ro.build.version.incremental' : 'build-version-incremental-target',
    358           'ro.build.version.sdk' : '27',
    359           'ro.build.version.security_patch' : '2017-12-01',
    360           'ro.build.date.utc' : '1500000000',
    361       },
    362   }
    363 
    364   TEST_SOURCE_INFO_DICT = {
    365       'build.prop' : {
    366           'ro.product.device' : 'product-device',
    367           'ro.build.fingerprint' : 'build-fingerprint-source',
    368           'ro.build.version.incremental' : 'build-version-incremental-source',
    369           'ro.build.version.sdk' : '25',
    370           'ro.build.version.security_patch' : '2016-12-01',
    371           'ro.build.date.utc' : '1400000000',
    372       },
    373   }
    374 
    375   def setUp(self):
    376     self.testdata_dir = test_utils.get_testdata_dir()
    377     self.assertTrue(os.path.exists(self.testdata_dir))
    378 
    379     # Reset the global options as in ota_from_target_files.py.
    380     common.OPTIONS.incremental_source = None
    381     common.OPTIONS.downgrade = False
    382     common.OPTIONS.timestamp = False
    383     common.OPTIONS.wipe_user_data = False
    384     common.OPTIONS.no_signing = False
    385     common.OPTIONS.package_key = os.path.join(self.testdata_dir, 'testkey')
    386     common.OPTIONS.key_passwords = {
    387         common.OPTIONS.package_key : None,
    388     }
    389 
    390     common.OPTIONS.search_path = test_utils.get_search_path()
    391     self.assertIsNotNone(common.OPTIONS.search_path)
    392 
    393   def tearDown(self):
    394     common.Cleanup()
    395 
    396   def test_GetPackageMetadata_abOta_full(self):
    397     target_info_dict = copy.deepcopy(self.TEST_TARGET_INFO_DICT)
    398     target_info_dict['ab_update'] = 'true'
    399     target_info = BuildInfo(target_info_dict, None)
    400     metadata = GetPackageMetadata(target_info)
    401     self.assertDictEqual(
    402         {
    403             'ota-type' : 'AB',
    404             'ota-required-cache' : '0',
    405             'post-build' : 'build-fingerprint-target',
    406             'post-build-incremental' : 'build-version-incremental-target',
    407             'post-sdk-level' : '27',
    408             'post-security-patch-level' : '2017-12-01',
    409             'post-timestamp' : '1500000000',
    410             'pre-device' : 'product-device',
    411         },
    412         metadata)
    413 
    414   def test_GetPackageMetadata_abOta_incremental(self):
    415     target_info_dict = copy.deepcopy(self.TEST_TARGET_INFO_DICT)
    416     target_info_dict['ab_update'] = 'true'
    417     target_info = BuildInfo(target_info_dict, None)
    418     source_info = BuildInfo(self.TEST_SOURCE_INFO_DICT, None)
    419     common.OPTIONS.incremental_source = ''
    420     metadata = GetPackageMetadata(target_info, source_info)
    421     self.assertDictEqual(
    422         {
    423             'ota-type' : 'AB',
    424             'ota-required-cache' : '0',
    425             'post-build' : 'build-fingerprint-target',
    426             'post-build-incremental' : 'build-version-incremental-target',
    427             'post-sdk-level' : '27',
    428             'post-security-patch-level' : '2017-12-01',
    429             'post-timestamp' : '1500000000',
    430             'pre-device' : 'product-device',
    431             'pre-build' : 'build-fingerprint-source',
    432             'pre-build-incremental' : 'build-version-incremental-source',
    433         },
    434         metadata)
    435 
    436   def test_GetPackageMetadata_nonAbOta_full(self):
    437     target_info = BuildInfo(self.TEST_TARGET_INFO_DICT, None)
    438     metadata = GetPackageMetadata(target_info)
    439     self.assertDictEqual(
    440         {
    441             'ota-type' : 'BLOCK',
    442             'post-build' : 'build-fingerprint-target',
    443             'post-build-incremental' : 'build-version-incremental-target',
    444             'post-sdk-level' : '27',
    445             'post-security-patch-level' : '2017-12-01',
    446             'post-timestamp' : '1500000000',
    447             'pre-device' : 'product-device',
    448         },
    449         metadata)
    450 
    451   def test_GetPackageMetadata_nonAbOta_incremental(self):
    452     target_info = BuildInfo(self.TEST_TARGET_INFO_DICT, None)
    453     source_info = BuildInfo(self.TEST_SOURCE_INFO_DICT, None)
    454     common.OPTIONS.incremental_source = ''
    455     metadata = GetPackageMetadata(target_info, source_info)
    456     self.assertDictEqual(
    457         {
    458             'ota-type' : 'BLOCK',
    459             'post-build' : 'build-fingerprint-target',
    460             'post-build-incremental' : 'build-version-incremental-target',
    461             'post-sdk-level' : '27',
    462             'post-security-patch-level' : '2017-12-01',
    463             'post-timestamp' : '1500000000',
    464             'pre-device' : 'product-device',
    465             'pre-build' : 'build-fingerprint-source',
    466             'pre-build-incremental' : 'build-version-incremental-source',
    467         },
    468         metadata)
    469 
    470   def test_GetPackageMetadata_wipe(self):
    471     target_info = BuildInfo(self.TEST_TARGET_INFO_DICT, None)
    472     common.OPTIONS.wipe_user_data = True
    473     metadata = GetPackageMetadata(target_info)
    474     self.assertDictEqual(
    475         {
    476             'ota-type' : 'BLOCK',
    477             'ota-wipe' : 'yes',
    478             'post-build' : 'build-fingerprint-target',
    479             'post-build-incremental' : 'build-version-incremental-target',
    480             'post-sdk-level' : '27',
    481             'post-security-patch-level' : '2017-12-01',
    482             'post-timestamp' : '1500000000',
    483             'pre-device' : 'product-device',
    484         },
    485         metadata)
    486 
    487   @staticmethod
    488   def _test_GetPackageMetadata_swapBuildTimestamps(target_info, source_info):
    489     (target_info['build.prop']['ro.build.date.utc'],
    490      source_info['build.prop']['ro.build.date.utc']) = (
    491          source_info['build.prop']['ro.build.date.utc'],
    492          target_info['build.prop']['ro.build.date.utc'])
    493 
    494   def test_GetPackageMetadata_unintentionalDowngradeDetected(self):
    495     target_info_dict = copy.deepcopy(self.TEST_TARGET_INFO_DICT)
    496     source_info_dict = copy.deepcopy(self.TEST_SOURCE_INFO_DICT)
    497     self._test_GetPackageMetadata_swapBuildTimestamps(
    498         target_info_dict, source_info_dict)
    499 
    500     target_info = BuildInfo(target_info_dict, None)
    501     source_info = BuildInfo(source_info_dict, None)
    502     common.OPTIONS.incremental_source = ''
    503     self.assertRaises(RuntimeError, GetPackageMetadata, target_info,
    504                       source_info)
    505 
    506   def test_GetPackageMetadata_downgrade(self):
    507     target_info_dict = copy.deepcopy(self.TEST_TARGET_INFO_DICT)
    508     source_info_dict = copy.deepcopy(self.TEST_SOURCE_INFO_DICT)
    509     self._test_GetPackageMetadata_swapBuildTimestamps(
    510         target_info_dict, source_info_dict)
    511 
    512     target_info = BuildInfo(target_info_dict, None)
    513     source_info = BuildInfo(source_info_dict, None)
    514     common.OPTIONS.incremental_source = ''
    515     common.OPTIONS.downgrade = True
    516     common.OPTIONS.wipe_user_data = True
    517     metadata = GetPackageMetadata(target_info, source_info)
    518     self.assertDictEqual(
    519         {
    520             'ota-downgrade' : 'yes',
    521             'ota-type' : 'BLOCK',
    522             'ota-wipe' : 'yes',
    523             'post-build' : 'build-fingerprint-target',
    524             'post-build-incremental' : 'build-version-incremental-target',
    525             'post-sdk-level' : '27',
    526             'post-security-patch-level' : '2017-12-01',
    527             'post-timestamp' : '1400000000',
    528             'pre-device' : 'product-device',
    529             'pre-build' : 'build-fingerprint-source',
    530             'pre-build-incremental' : 'build-version-incremental-source',
    531         },
    532         metadata)
    533 
    534   def test_GetTargetFilesZipForSecondaryImages(self):
    535     input_file = construct_target_files(secondary=True)
    536     target_file = GetTargetFilesZipForSecondaryImages(input_file)
    537 
    538     with zipfile.ZipFile(target_file) as verify_zip:
    539       namelist = verify_zip.namelist()
    540 
    541     self.assertIn('META/ab_partitions.txt', namelist)
    542     self.assertIn('IMAGES/boot.img', namelist)
    543     self.assertIn('IMAGES/system.img', namelist)
    544     self.assertIn('IMAGES/vendor.img', namelist)
    545     self.assertIn(POSTINSTALL_CONFIG, namelist)
    546 
    547     self.assertNotIn('IMAGES/system_other.img', namelist)
    548     self.assertNotIn('IMAGES/system.map', namelist)
    549 
    550   def test_GetTargetFilesZipForSecondaryImages_skipPostinstall(self):
    551     input_file = construct_target_files(secondary=True)
    552     target_file = GetTargetFilesZipForSecondaryImages(
    553         input_file, skip_postinstall=True)
    554 
    555     with zipfile.ZipFile(target_file) as verify_zip:
    556       namelist = verify_zip.namelist()
    557 
    558     self.assertIn('META/ab_partitions.txt', namelist)
    559     self.assertIn('IMAGES/boot.img', namelist)
    560     self.assertIn('IMAGES/system.img', namelist)
    561     self.assertIn('IMAGES/vendor.img', namelist)
    562 
    563     self.assertNotIn('IMAGES/system_other.img', namelist)
    564     self.assertNotIn('IMAGES/system.map', namelist)
    565     self.assertNotIn(POSTINSTALL_CONFIG, namelist)
    566 
    567   def test_GetTargetFilesZipWithoutPostinstallConfig(self):
    568     input_file = construct_target_files()
    569     target_file = GetTargetFilesZipWithoutPostinstallConfig(input_file)
    570     with zipfile.ZipFile(target_file) as verify_zip:
    571       self.assertNotIn(POSTINSTALL_CONFIG, verify_zip.namelist())
    572 
    573   def test_GetTargetFilesZipWithoutPostinstallConfig_missingEntry(self):
    574     input_file = construct_target_files()
    575     common.ZipDelete(input_file, POSTINSTALL_CONFIG)
    576     target_file = GetTargetFilesZipWithoutPostinstallConfig(input_file)
    577     with zipfile.ZipFile(target_file) as verify_zip:
    578       self.assertNotIn(POSTINSTALL_CONFIG, verify_zip.namelist())
    579 
    580   def _test_FinalizeMetadata(self, large_entry=False):
    581     entries = [
    582         'required-entry1',
    583         'required-entry2',
    584     ]
    585     zip_file = PropertyFilesTest.construct_zip_package(entries)
    586     # Add a large entry of 1 GiB if requested.
    587     if large_entry:
    588       with zipfile.ZipFile(zip_file, 'a') as zip_fp:
    589         zip_fp.writestr(
    590             # Using 'zoo' so that the entry stays behind others after signing.
    591             'zoo',
    592             'A' * 1024 * 1024 * 1024,
    593             zipfile.ZIP_STORED)
    594 
    595     metadata = {}
    596     output_file = common.MakeTempFile(suffix='.zip')
    597     needed_property_files = (
    598         TestPropertyFiles(),
    599     )
    600     FinalizeMetadata(metadata, zip_file, output_file, needed_property_files)
    601     self.assertIn('ota-test-property-files', metadata)
    602 
    603   def test_FinalizeMetadata(self):
    604     self._test_FinalizeMetadata()
    605 
    606   def test_FinalizeMetadata_withNoSigning(self):
    607     common.OPTIONS.no_signing = True
    608     self._test_FinalizeMetadata()
    609 
    610   def test_FinalizeMetadata_largeEntry(self):
    611     self._test_FinalizeMetadata(large_entry=True)
    612 
    613   def test_FinalizeMetadata_largeEntry_withNoSigning(self):
    614     common.OPTIONS.no_signing = True
    615     self._test_FinalizeMetadata(large_entry=True)
    616 
    617   def test_FinalizeMetadata_insufficientSpace(self):
    618     entries = [
    619         'required-entry1',
    620         'required-entry2',
    621         'optional-entry1',
    622         'optional-entry2',
    623     ]
    624     zip_file = PropertyFilesTest.construct_zip_package(entries)
    625     with zipfile.ZipFile(zip_file, 'a') as zip_fp:
    626       zip_fp.writestr(
    627           # 'foo-entry1' will appear ahead of all other entries (in alphabetical
    628           # order) after the signing, which will in turn trigger the
    629           # InsufficientSpaceException and an automatic retry.
    630           'foo-entry1',
    631           'A' * 1024 * 1024,
    632           zipfile.ZIP_STORED)
    633 
    634     metadata = {}
    635     needed_property_files = (
    636         TestPropertyFiles(),
    637     )
    638     output_file = common.MakeTempFile(suffix='.zip')
    639     FinalizeMetadata(metadata, zip_file, output_file, needed_property_files)
    640     self.assertIn('ota-test-property-files', metadata)
    641 
    642 
    643 class TestPropertyFiles(PropertyFiles):
    644   """A class that extends PropertyFiles for testing purpose."""
    645 
    646   def __init__(self):
    647     super(TestPropertyFiles, self).__init__()
    648     self.name = 'ota-test-property-files'
    649     self.required = (
    650         'required-entry1',
    651         'required-entry2',
    652     )
    653     self.optional = (
    654         'optional-entry1',
    655         'optional-entry2',
    656     )
    657 
    658 
    659 class PropertyFilesTest(unittest.TestCase):
    660 
  def setUp(self):
    """Turns the global no_signing option off before each test."""
    common.OPTIONS.no_signing = False
    663 
  def tearDown(self):
    """Calls common.Cleanup() after each test."""
    # Presumably this removes the temp files from common.MakeTempFile() --
    # TODO confirm against common.Cleanup().
    common.Cleanup()
    666 
    667   @staticmethod
    668   def construct_zip_package(entries):
    669     zip_file = common.MakeTempFile(suffix='.zip')
    670     with zipfile.ZipFile(zip_file, 'w') as zip_fp:
    671       for entry in entries:
    672         zip_fp.writestr(
    673             entry,
    674             entry.replace('.', '-').upper(),
    675             zipfile.ZIP_STORED)
    676     return zip_file
    677 
    678   @staticmethod
    679   def _parse_property_files_string(data):
    680     result = {}
    681     for token in data.split(','):
    682       name, info = token.split(':', 1)
    683       result[name] = info
    684     return result
    685 
    686   def _verify_entries(self, input_file, tokens, entries):
    687     for entry in entries:
    688       offset, size = map(int, tokens[entry].split(':'))
    689       with open(input_file, 'rb') as input_fp:
    690         input_fp.seek(offset)
    691         if entry == 'metadata':
    692           expected = b'META-INF/COM/ANDROID/METADATA'
    693         else:
    694           expected = entry.replace('.', '-').upper().encode()
    695         self.assertEqual(expected, input_fp.read(size))
    696 
    697   def test_Compute(self):
    698     entries = (
    699         'required-entry1',
    700         'required-entry2',
    701     )
    702     zip_file = self.construct_zip_package(entries)
    703     property_files = TestPropertyFiles()
    704     with zipfile.ZipFile(zip_file, 'r') as zip_fp:
    705       property_files_string = property_files.Compute(zip_fp)
    706 
    707     tokens = self._parse_property_files_string(property_files_string)
    708     self.assertEqual(3, len(tokens))
    709     self._verify_entries(zip_file, tokens, entries)
    710 
    711   def test_Compute_withOptionalEntries(self):
    712     entries = (
    713         'required-entry1',
    714         'required-entry2',
    715         'optional-entry1',
    716         'optional-entry2',
    717     )
    718     zip_file = self.construct_zip_package(entries)
    719     property_files = TestPropertyFiles()
    720     with zipfile.ZipFile(zip_file, 'r') as zip_fp:
    721       property_files_string = property_files.Compute(zip_fp)
    722 
    723     tokens = self._parse_property_files_string(property_files_string)
    724     self.assertEqual(5, len(tokens))
    725     self._verify_entries(zip_file, tokens, entries)
    726 
    727   def test_Compute_missingRequiredEntry(self):
    728     entries = (
    729         'required-entry2',
    730     )
    731     zip_file = self.construct_zip_package(entries)
    732     property_files = TestPropertyFiles()
    733     with zipfile.ZipFile(zip_file, 'r') as zip_fp:
    734       self.assertRaises(KeyError, property_files.Compute, zip_fp)
    735 
    736   def test_Finalize(self):
    737     entries = [
    738         'required-entry1',
    739         'required-entry2',
    740         'META-INF/com/android/metadata',
    741     ]
    742     zip_file = self.construct_zip_package(entries)
    743     property_files = TestPropertyFiles()
    744     with zipfile.ZipFile(zip_file, 'r') as zip_fp:
    745       # pylint: disable=protected-access
    746       raw_metadata = property_files._GetPropertyFilesString(
    747           zip_fp, reserve_space=False)
    748       streaming_metadata = property_files.Finalize(zip_fp, len(raw_metadata))
    749     tokens = self._parse_property_files_string(streaming_metadata)
    750 
    751     self.assertEqual(3, len(tokens))
    752     # 'META-INF/com/android/metadata' will be key'd as 'metadata' in the
    753     # streaming metadata.
    754     entries[2] = 'metadata'
    755     self._verify_entries(zip_file, tokens, entries)
    756 
    757   def test_Finalize_assertReservedLength(self):
    758     entries = (
    759         'required-entry1',
    760         'required-entry2',
    761         'optional-entry1',
    762         'optional-entry2',
    763         'META-INF/com/android/metadata',
    764     )
    765     zip_file = self.construct_zip_package(entries)
    766     property_files = TestPropertyFiles()
    767     with zipfile.ZipFile(zip_file, 'r') as zip_fp:
    768       # First get the raw metadata string (i.e. without padding space).
    769       # pylint: disable=protected-access
    770       raw_metadata = property_files._GetPropertyFilesString(
    771           zip_fp, reserve_space=False)
    772       raw_length = len(raw_metadata)
    773 
    774       # Now pass in the exact expected length.
    775       streaming_metadata = property_files.Finalize(zip_fp, raw_length)
    776       self.assertEqual(raw_length, len(streaming_metadata))
    777 
    778       # Or pass in insufficient length.
    779       self.assertRaises(
    780           PropertyFiles.InsufficientSpaceException,
    781           property_files.Finalize,
    782           zip_fp,
    783           raw_length - 1)
    784 
    785       # Or pass in a much larger size.
    786       streaming_metadata = property_files.Finalize(
    787           zip_fp,
    788           raw_length + 20)
    789       self.assertEqual(raw_length + 20, len(streaming_metadata))
    790       self.assertEqual(' ' * 20, streaming_metadata[raw_length:])
    791 
    792   def test_Verify(self):
    793     entries = (
    794         'required-entry1',
    795         'required-entry2',
    796         'optional-entry1',
    797         'optional-entry2',
    798         'META-INF/com/android/metadata',
    799     )
    800     zip_file = self.construct_zip_package(entries)
    801     property_files = TestPropertyFiles()
    802     with zipfile.ZipFile(zip_file, 'r') as zip_fp:
    803       # First get the raw metadata string (i.e. without padding space).
    804       # pylint: disable=protected-access
    805       raw_metadata = property_files._GetPropertyFilesString(
    806           zip_fp, reserve_space=False)
    807 
    808       # Should pass the test if verification passes.
    809       property_files.Verify(zip_fp, raw_metadata)
    810 
    811       # Or raise on verification failure.
    812       self.assertRaises(
    813           AssertionError, property_files.Verify, zip_fp, raw_metadata + 'x')
    814 
    815 
    816 class StreamingPropertyFilesTest(PropertyFilesTest):
    817   """Additional sanity checks specialized for StreamingPropertyFiles."""
    818 
    819   def test_init(self):
    820     property_files = StreamingPropertyFiles()
    821     self.assertEqual('ota-streaming-property-files', property_files.name)
    822     self.assertEqual(
    823         (
    824             'payload.bin',
    825             'payload_properties.txt',
    826         ),
    827         property_files.required)
    828     self.assertEqual(
    829         (
    830             'care_map.txt',
    831             'compatibility.zip',
    832         ),
    833         property_files.optional)
    834 
    835   def test_Compute(self):
    836     entries = (
    837         'payload.bin',
    838         'payload_properties.txt',
    839         'care_map.txt',
    840         'compatibility.zip',
    841     )
    842     zip_file = self.construct_zip_package(entries)
    843     property_files = StreamingPropertyFiles()
    844     with zipfile.ZipFile(zip_file, 'r') as zip_fp:
    845       property_files_string = property_files.Compute(zip_fp)
    846 
    847     tokens = self._parse_property_files_string(property_files_string)
    848     self.assertEqual(5, len(tokens))
    849     self._verify_entries(zip_file, tokens, entries)
    850 
    851   def test_Finalize(self):
    852     entries = [
    853         'payload.bin',
    854         'payload_properties.txt',
    855         'care_map.txt',
    856         'compatibility.zip',
    857         'META-INF/com/android/metadata',
    858     ]
    859     zip_file = self.construct_zip_package(entries)
    860     property_files = StreamingPropertyFiles()
    861     with zipfile.ZipFile(zip_file, 'r') as zip_fp:
    862       # pylint: disable=protected-access
    863       raw_metadata = property_files._GetPropertyFilesString(
    864           zip_fp, reserve_space=False)
    865       streaming_metadata = property_files.Finalize(zip_fp, len(raw_metadata))
    866     tokens = self._parse_property_files_string(streaming_metadata)
    867 
    868     self.assertEqual(5, len(tokens))
    869     # 'META-INF/com/android/metadata' will be key'd as 'metadata' in the
    870     # streaming metadata.
    871     entries[4] = 'metadata'
    872     self._verify_entries(zip_file, tokens, entries)
    873 
    874   def test_Verify(self):
    875     entries = (
    876         'payload.bin',
    877         'payload_properties.txt',
    878         'care_map.txt',
    879         'compatibility.zip',
    880         'META-INF/com/android/metadata',
    881     )
    882     zip_file = self.construct_zip_package(entries)
    883     property_files = StreamingPropertyFiles()
    884     with zipfile.ZipFile(zip_file, 'r') as zip_fp:
    885       # First get the raw metadata string (i.e. without padding space).
    886       # pylint: disable=protected-access
    887       raw_metadata = property_files._GetPropertyFilesString(
    888           zip_fp, reserve_space=False)
    889 
    890       # Should pass the test if verification passes.
    891       property_files.Verify(zip_fp, raw_metadata)
    892 
    893       # Or raise on verification failure.
    894       self.assertRaises(
    895           AssertionError, property_files.Verify, zip_fp, raw_metadata + 'x')
    896 
    897 
    898 class AbOtaPropertyFilesTest(PropertyFilesTest):
    899   """Additional sanity checks specialized for AbOtaPropertyFiles."""
    900 
    901   # The size for payload and metadata signature size.
    902   SIGNATURE_SIZE = 256
    903 
    904   def setUp(self):
    905     self.testdata_dir = test_utils.get_testdata_dir()
    906     self.assertTrue(os.path.exists(self.testdata_dir))
    907 
    908     common.OPTIONS.wipe_user_data = False
    909     common.OPTIONS.payload_signer = None
    910     common.OPTIONS.payload_signer_args = None
    911     common.OPTIONS.package_key = os.path.join(self.testdata_dir, 'testkey')
    912     common.OPTIONS.key_passwords = {
    913         common.OPTIONS.package_key : None,
    914     }
    915 
    916   def test_init(self):
    917     property_files = AbOtaPropertyFiles()
    918     self.assertEqual('ota-property-files', property_files.name)
    919     self.assertEqual(
    920         (
    921             'payload.bin',
    922             'payload_properties.txt',
    923         ),
    924         property_files.required)
    925     self.assertEqual(
    926         (
    927             'care_map.txt',
    928             'compatibility.zip',
    929         ),
    930         property_files.optional)
    931 
    932   def test_GetPayloadMetadataOffsetAndSize(self):
    933     target_file = construct_target_files()
    934     payload = Payload()
    935     payload.Generate(target_file)
    936 
    937     payload_signer = PayloadSigner()
    938     payload.Sign(payload_signer)
    939 
    940     output_file = common.MakeTempFile(suffix='.zip')
    941     with zipfile.ZipFile(output_file, 'w') as output_zip:
    942       payload.WriteToZip(output_zip)
    943 
    944     # Find out the payload metadata offset and size.
    945     property_files = AbOtaPropertyFiles()
    946     with zipfile.ZipFile(output_file) as input_zip:
    947       # pylint: disable=protected-access
    948       payload_offset, metadata_total = (
    949           property_files._GetPayloadMetadataOffsetAndSize(input_zip))
    950 
    951     # Read in the metadata signature directly.
    952     with open(output_file, 'rb') as verify_fp:
    953       verify_fp.seek(payload_offset + metadata_total - self.SIGNATURE_SIZE)
    954       metadata_signature = verify_fp.read(self.SIGNATURE_SIZE)
    955 
    956     # Now we extract the metadata hash via brillo_update_payload script, which
    957     # will serve as the oracle result.
    958     payload_sig_file = common.MakeTempFile(prefix="sig-", suffix=".bin")
    959     metadata_sig_file = common.MakeTempFile(prefix="sig-", suffix=".bin")
    960     cmd = ['brillo_update_payload', 'hash',
    961            '--unsigned_payload', payload.payload_file,
    962            '--signature_size', str(self.SIGNATURE_SIZE),
    963            '--metadata_hash_file', metadata_sig_file,
    964            '--payload_hash_file', payload_sig_file]
    965     proc = common.Run(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
    966     stdoutdata, _ = proc.communicate()
    967     self.assertEqual(
    968         0, proc.returncode,
    969         'Failed to run brillo_update_payload: {}'.format(stdoutdata))
    970 
    971     signed_metadata_sig_file = payload_signer.Sign(metadata_sig_file)
    972 
    973     # Finally we can compare the two signatures.
    974     with open(signed_metadata_sig_file, 'rb') as verify_fp:
    975       self.assertEqual(verify_fp.read(), metadata_signature)
    976 
    977   @staticmethod
    978   def construct_zip_package_withValidPayload(with_metadata=False):
    979     # Cannot use construct_zip_package() since we need a "valid" payload.bin.
    980     target_file = construct_target_files()
    981     payload = Payload()
    982     payload.Generate(target_file)
    983 
    984     payload_signer = PayloadSigner()
    985     payload.Sign(payload_signer)
    986 
    987     zip_file = common.MakeTempFile(suffix='.zip')
    988     with zipfile.ZipFile(zip_file, 'w') as zip_fp:
    989       # 'payload.bin',
    990       payload.WriteToZip(zip_fp)
    991 
    992       # Other entries.
    993       entries = ['care_map.txt', 'compatibility.zip']
    994 
    995       # Put META-INF/com/android/metadata if needed.
    996       if with_metadata:
    997         entries.append('META-INF/com/android/metadata')
    998 
    999       for entry in entries:
   1000         zip_fp.writestr(
   1001             entry, entry.replace('.', '-').upper(), zipfile.ZIP_STORED)
   1002 
   1003     return zip_file
   1004 
   1005   def test_Compute(self):
   1006     zip_file = self.construct_zip_package_withValidPayload()
   1007     property_files = AbOtaPropertyFiles()
   1008     with zipfile.ZipFile(zip_file, 'r') as zip_fp:
   1009       property_files_string = property_files.Compute(zip_fp)
   1010 
   1011     tokens = self._parse_property_files_string(property_files_string)
   1012     # "6" indcludes the four entries above, one metadata entry, and one entry
   1013     # for payload-metadata.bin.
   1014     self.assertEqual(6, len(tokens))
   1015     self._verify_entries(
   1016         zip_file, tokens, ('care_map.txt', 'compatibility.zip'))
   1017 
   1018   def test_Finalize(self):
   1019     zip_file = self.construct_zip_package_withValidPayload(with_metadata=True)
   1020     property_files = AbOtaPropertyFiles()
   1021     with zipfile.ZipFile(zip_file, 'r') as zip_fp:
   1022       # pylint: disable=protected-access
   1023       raw_metadata = property_files._GetPropertyFilesString(
   1024           zip_fp, reserve_space=False)
   1025       property_files_string = property_files.Finalize(zip_fp, len(raw_metadata))
   1026 
   1027     tokens = self._parse_property_files_string(property_files_string)
   1028     # "6" indcludes the four entries above, one metadata entry, and one entry
   1029     # for payload-metadata.bin.
   1030     self.assertEqual(6, len(tokens))
   1031     self._verify_entries(
   1032         zip_file, tokens, ('care_map.txt', 'compatibility.zip'))
   1033 
   1034   def test_Verify(self):
   1035     zip_file = self.construct_zip_package_withValidPayload(with_metadata=True)
   1036     property_files = AbOtaPropertyFiles()
   1037     with zipfile.ZipFile(zip_file, 'r') as zip_fp:
   1038       # pylint: disable=protected-access
   1039       raw_metadata = property_files._GetPropertyFilesString(
   1040           zip_fp, reserve_space=False)
   1041 
   1042       property_files.Verify(zip_fp, raw_metadata)
   1043 
   1044 
   1045 class NonAbOtaPropertyFilesTest(PropertyFilesTest):
   1046   """Additional sanity checks specialized for NonAbOtaPropertyFiles."""
   1047 
   1048   def test_init(self):
   1049     property_files = NonAbOtaPropertyFiles()
   1050     self.assertEqual('ota-property-files', property_files.name)
   1051     self.assertEqual((), property_files.required)
   1052     self.assertEqual((), property_files.optional)
   1053 
   1054   def test_Compute(self):
   1055     entries = ()
   1056     zip_file = self.construct_zip_package(entries)
   1057     property_files = NonAbOtaPropertyFiles()
   1058     with zipfile.ZipFile(zip_file) as zip_fp:
   1059       property_files_string = property_files.Compute(zip_fp)
   1060 
   1061     tokens = self._parse_property_files_string(property_files_string)
   1062     self.assertEqual(1, len(tokens))
   1063     self._verify_entries(zip_file, tokens, entries)
   1064 
   1065   def test_Finalize(self):
   1066     entries = [
   1067         'META-INF/com/android/metadata',
   1068     ]
   1069     zip_file = self.construct_zip_package(entries)
   1070     property_files = NonAbOtaPropertyFiles()
   1071     with zipfile.ZipFile(zip_file) as zip_fp:
   1072       # pylint: disable=protected-access
   1073       raw_metadata = property_files._GetPropertyFilesString(
   1074           zip_fp, reserve_space=False)
   1075       property_files_string = property_files.Finalize(zip_fp, len(raw_metadata))
   1076     tokens = self._parse_property_files_string(property_files_string)
   1077 
   1078     self.assertEqual(1, len(tokens))
   1079     # 'META-INF/com/android/metadata' will be key'd as 'metadata'.
   1080     entries[0] = 'metadata'
   1081     self._verify_entries(zip_file, tokens, entries)
   1082 
   1083   def test_Verify(self):
   1084     entries = (
   1085         'META-INF/com/android/metadata',
   1086     )
   1087     zip_file = self.construct_zip_package(entries)
   1088     property_files = NonAbOtaPropertyFiles()
   1089     with zipfile.ZipFile(zip_file) as zip_fp:
   1090       # pylint: disable=protected-access
   1091       raw_metadata = property_files._GetPropertyFilesString(
   1092           zip_fp, reserve_space=False)
   1093 
   1094       property_files.Verify(zip_fp, raw_metadata)
   1095 
   1096 
   1097 class PayloadSignerTest(unittest.TestCase):
   1098 
   1099   SIGFILE = 'sigfile.bin'
   1100   SIGNED_SIGFILE = 'signed-sigfile.bin'
   1101 
   1102   def setUp(self):
   1103     self.testdata_dir = test_utils.get_testdata_dir()
   1104     self.assertTrue(os.path.exists(self.testdata_dir))
   1105 
   1106     common.OPTIONS.payload_signer = None
   1107     common.OPTIONS.payload_signer_args = []
   1108     common.OPTIONS.package_key = os.path.join(self.testdata_dir, 'testkey')
   1109     common.OPTIONS.key_passwords = {
   1110         common.OPTIONS.package_key : None,
   1111     }
   1112 
   1113   def tearDown(self):
   1114     common.Cleanup()
   1115 
   1116   def _assertFilesEqual(self, file1, file2):
   1117     with open(file1, 'rb') as fp1, open(file2, 'rb') as fp2:
   1118       self.assertEqual(fp1.read(), fp2.read())
   1119 
   1120   def test_init(self):
   1121     payload_signer = PayloadSigner()
   1122     self.assertEqual('openssl', payload_signer.signer)
   1123 
   1124   def test_init_withPassword(self):
   1125     common.OPTIONS.package_key = os.path.join(
   1126         self.testdata_dir, 'testkey_with_passwd')
   1127     common.OPTIONS.key_passwords = {
   1128         common.OPTIONS.package_key : 'foo',
   1129     }
   1130     payload_signer = PayloadSigner()
   1131     self.assertEqual('openssl', payload_signer.signer)
   1132 
   1133   def test_init_withExternalSigner(self):
   1134     common.OPTIONS.payload_signer = 'abc'
   1135     common.OPTIONS.payload_signer_args = ['arg1', 'arg2']
   1136     payload_signer = PayloadSigner()
   1137     self.assertEqual('abc', payload_signer.signer)
   1138     self.assertEqual(['arg1', 'arg2'], payload_signer.signer_args)
   1139 
   1140   def test_Sign(self):
   1141     payload_signer = PayloadSigner()
   1142     input_file = os.path.join(self.testdata_dir, self.SIGFILE)
   1143     signed_file = payload_signer.Sign(input_file)
   1144 
   1145     verify_file = os.path.join(self.testdata_dir, self.SIGNED_SIGFILE)
   1146     self._assertFilesEqual(verify_file, signed_file)
   1147 
   1148   def test_Sign_withExternalSigner_openssl(self):
   1149     """Uses openssl as the external payload signer."""
   1150     common.OPTIONS.payload_signer = 'openssl'
   1151     common.OPTIONS.payload_signer_args = [
   1152         'pkeyutl', '-sign', '-keyform', 'DER', '-inkey',
   1153         os.path.join(self.testdata_dir, 'testkey.pk8'),
   1154         '-pkeyopt', 'digest:sha256']
   1155     payload_signer = PayloadSigner()
   1156     input_file = os.path.join(self.testdata_dir, self.SIGFILE)
   1157     signed_file = payload_signer.Sign(input_file)
   1158 
   1159     verify_file = os.path.join(self.testdata_dir, self.SIGNED_SIGFILE)
   1160     self._assertFilesEqual(verify_file, signed_file)
   1161 
   1162   def test_Sign_withExternalSigner_script(self):
   1163     """Uses testdata/payload_signer.sh as the external payload signer."""
   1164     common.OPTIONS.payload_signer = os.path.join(
   1165         self.testdata_dir, 'payload_signer.sh')
   1166     common.OPTIONS.payload_signer_args = [
   1167         os.path.join(self.testdata_dir, 'testkey.pk8')]
   1168     payload_signer = PayloadSigner()
   1169     input_file = os.path.join(self.testdata_dir, self.SIGFILE)
   1170     signed_file = payload_signer.Sign(input_file)
   1171 
   1172     verify_file = os.path.join(self.testdata_dir, self.SIGNED_SIGFILE)
   1173     self._assertFilesEqual(verify_file, signed_file)
   1174 
   1175 
   1176 class PayloadTest(unittest.TestCase):
   1177 
   1178   def setUp(self):
   1179     self.testdata_dir = test_utils.get_testdata_dir()
   1180     self.assertTrue(os.path.exists(self.testdata_dir))
   1181 
   1182     common.OPTIONS.wipe_user_data = False
   1183     common.OPTIONS.payload_signer = None
   1184     common.OPTIONS.payload_signer_args = None
   1185     common.OPTIONS.package_key = os.path.join(self.testdata_dir, 'testkey')
   1186     common.OPTIONS.key_passwords = {
   1187         common.OPTIONS.package_key : None,
   1188     }
   1189 
   1190   def tearDown(self):
   1191     common.Cleanup()
   1192 
   1193   @staticmethod
   1194   def _create_payload_full(secondary=False):
   1195     target_file = construct_target_files(secondary)
   1196     payload = Payload(secondary)
   1197     payload.Generate(target_file)
   1198     return payload
   1199 
   1200   @staticmethod
   1201   def _create_payload_incremental():
   1202     target_file = construct_target_files()
   1203     source_file = construct_target_files()
   1204     payload = Payload()
   1205     payload.Generate(target_file, source_file)
   1206     return payload
   1207 
   1208   def test_Generate_full(self):
   1209     payload = self._create_payload_full()
   1210     self.assertTrue(os.path.exists(payload.payload_file))
   1211 
   1212   def test_Generate_incremental(self):
   1213     payload = self._create_payload_incremental()
   1214     self.assertTrue(os.path.exists(payload.payload_file))
   1215 
   1216   def test_Generate_additionalArgs(self):
   1217     target_file = construct_target_files()
   1218     source_file = construct_target_files()
   1219     payload = Payload()
   1220     # This should work the same as calling payload.Generate(target_file,
   1221     # source_file).
   1222     payload.Generate(
   1223         target_file, additional_args=["--source_image", source_file])
   1224     self.assertTrue(os.path.exists(payload.payload_file))
   1225 
   1226   def test_Generate_invalidInput(self):
   1227     target_file = construct_target_files()
   1228     common.ZipDelete(target_file, 'IMAGES/vendor.img')
   1229     payload = Payload()
   1230     self.assertRaises(AssertionError, payload.Generate, target_file)
   1231 
   1232   def test_Sign_full(self):
   1233     payload = self._create_payload_full()
   1234     payload.Sign(PayloadSigner())
   1235 
   1236     output_file = common.MakeTempFile(suffix='.zip')
   1237     with zipfile.ZipFile(output_file, 'w') as output_zip:
   1238       payload.WriteToZip(output_zip)
   1239 
   1240     import check_ota_package_signature
   1241     check_ota_package_signature.VerifyAbOtaPayload(
   1242         os.path.join(self.testdata_dir, 'testkey.x509.pem'),
   1243         output_file)
   1244 
   1245   def test_Sign_incremental(self):
   1246     payload = self._create_payload_incremental()
   1247     payload.Sign(PayloadSigner())
   1248 
   1249     output_file = common.MakeTempFile(suffix='.zip')
   1250     with zipfile.ZipFile(output_file, 'w') as output_zip:
   1251       payload.WriteToZip(output_zip)
   1252 
   1253     import check_ota_package_signature
   1254     check_ota_package_signature.VerifyAbOtaPayload(
   1255         os.path.join(self.testdata_dir, 'testkey.x509.pem'),
   1256         output_file)
   1257 
   1258   def test_Sign_withDataWipe(self):
   1259     common.OPTIONS.wipe_user_data = True
   1260     payload = self._create_payload_full()
   1261     payload.Sign(PayloadSigner())
   1262 
   1263     with open(payload.payload_properties) as properties_fp:
   1264       self.assertIn("POWERWASH=1", properties_fp.read())
   1265 
   1266   def test_Sign_secondary(self):
   1267     payload = self._create_payload_full(secondary=True)
   1268     payload.Sign(PayloadSigner())
   1269 
   1270     with open(payload.payload_properties) as properties_fp:
   1271       self.assertIn("SWITCH_SLOT_ON_REBOOT=0", properties_fp.read())
   1272 
   1273   def test_Sign_badSigner(self):
   1274     """Tests that signing failure can be captured."""
   1275     payload = self._create_payload_full()
   1276     payload_signer = PayloadSigner()
   1277     payload_signer.signer_args.append('bad-option')
   1278     self.assertRaises(AssertionError, payload.Sign, payload_signer)
   1279 
   1280   def test_WriteToZip(self):
   1281     payload = self._create_payload_full()
   1282     payload.Sign(PayloadSigner())
   1283 
   1284     output_file = common.MakeTempFile(suffix='.zip')
   1285     with zipfile.ZipFile(output_file, 'w') as output_zip:
   1286       payload.WriteToZip(output_zip)
   1287 
   1288     with zipfile.ZipFile(output_file) as verify_zip:
   1289       # First make sure we have the essential entries.
   1290       namelist = verify_zip.namelist()
   1291       self.assertIn(Payload.PAYLOAD_BIN, namelist)
   1292       self.assertIn(Payload.PAYLOAD_PROPERTIES_TXT, namelist)
   1293 
   1294       # Then assert these entries are stored.
   1295       for entry_info in verify_zip.infolist():
   1296         if entry_info.filename not in (Payload.PAYLOAD_BIN,
   1297                                        Payload.PAYLOAD_PROPERTIES_TXT):
   1298           continue
   1299         self.assertEqual(zipfile.ZIP_STORED, entry_info.compress_type)
   1300 
   1301   def test_WriteToZip_unsignedPayload(self):
   1302     """Unsigned payloads should not be allowed to be written to zip."""
   1303     payload = self._create_payload_full()
   1304 
   1305     output_file = common.MakeTempFile(suffix='.zip')
   1306     with zipfile.ZipFile(output_file, 'w') as output_zip:
   1307       self.assertRaises(AssertionError, payload.WriteToZip, output_zip)
   1308 
   1309     # Also test with incremental payload.
   1310     payload = self._create_payload_incremental()
   1311 
   1312     output_file = common.MakeTempFile(suffix='.zip')
   1313     with zipfile.ZipFile(output_file, 'w') as output_zip:
   1314       self.assertRaises(AssertionError, payload.WriteToZip, output_zip)
   1315 
   1316   def test_WriteToZip_secondary(self):
   1317     payload = self._create_payload_full(secondary=True)
   1318     payload.Sign(PayloadSigner())
   1319 
   1320     output_file = common.MakeTempFile(suffix='.zip')
   1321     with zipfile.ZipFile(output_file, 'w') as output_zip:
   1322       payload.WriteToZip(output_zip)
   1323 
   1324     with zipfile.ZipFile(output_file) as verify_zip:
   1325       # First make sure we have the essential entries.
   1326       namelist = verify_zip.namelist()
   1327       self.assertIn(Payload.SECONDARY_PAYLOAD_BIN, namelist)
   1328       self.assertIn(Payload.SECONDARY_PAYLOAD_PROPERTIES_TXT, namelist)
   1329 
   1330       # Then assert these entries are stored.
   1331       for entry_info in verify_zip.infolist():
   1332         if entry_info.filename not in (
   1333             Payload.SECONDARY_PAYLOAD_BIN,
   1334             Payload.SECONDARY_PAYLOAD_PROPERTIES_TXT):
   1335           continue
   1336         self.assertEqual(zipfile.ZIP_STORED, entry_info.compress_type)
   1337