#
# Copyright (C) 2015 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import copy
import os
import subprocess
import tempfile
import time
import zipfile
from hashlib import sha1

import common
import test_utils
import validate_target_files
from rangelib import RangeSet

from blockimgdiff import EmptyImage, DataImage

KiB = 1024
MiB = 1024 * KiB
GiB = 1024 * MiB


def get_2gb_string():
  """Yields byte chunks that together add up to slightly more than 2 GiB.

  Each 4 MiB step consists of one 4 KiB block of random bytes followed by a
  run of NUL bytes, so the resulting data is large but mostly holes.

  Yields:
    Byte strings; every chunk is bytes so that the stream can be fed to
    sha1().update() and to a file opened in binary mode.
  """
  size = int(2 * GiB + 1)
  block_size = 4 * KiB
  step_size = 4 * MiB
  # Generate a long string with holes, e.g. 'xyz\x00abc\x00...'.
  for _ in range(0, size, step_size):
    yield os.urandom(block_size)
    # Must be a bytes literal: a text '\0' cannot be mixed with the
    # os.urandom() output when hashing or writing in binary mode.
    yield b'\0' * (step_size - block_size)


class CommonZipTest(test_utils.ReleaseToolsTestCase):
  """Tests for the zip writing helpers in common (ZipWrite, ZipWriteStr)."""

  def _verify(self, zip_file, zip_file_name, arcname, expected_hash,
              test_file_name=None, expected_stat=None, expected_mode=0o644,
              expected_compress_type=zipfile.ZIP_STORED):
    """Checks one entry of the archive stored at zip_file_name.

    Args:
      zip_file: Unused; the archive is always reopened from zip_file_name.
          Kept so that existing callers keep working.
      zip_file_name: Path of the archive to reopen and inspect.
      arcname: Name of the entry to check.
      expected_hash: Expected SHA-1 hex digest of the entry contents.
      test_file_name: Optional path of the on-disk source file. When given,
          its current st_mode and st_mtime must equal those in expected_stat.
      expected_stat: The os.stat() result taken before the file was added.
          Required whenever test_file_name is given.
      expected_mode: Expected permission bits (lower 9 bits) of the entry.
      expected_compress_type: Expected zipfile compression constant.
    """
    # Verify the stat if present.
    if test_file_name is not None:
      new_stat = os.stat(test_file_name)
      self.assertEqual(int(expected_stat.st_mode), int(new_stat.st_mode))
      self.assertEqual(int(expected_stat.st_mtime), int(new_stat.st_mtime))

    # Reopen the zip file to verify. The context managers make sure neither
    # the archive nor the entry handle outlives this check.
    with zipfile.ZipFile(zip_file_name, "r") as verify_zip:
      # Verify the timestamp.
      info = verify_zip.getinfo(arcname)
      self.assertEqual(info.date_time, (2009, 1, 1, 0, 0, 0))

      # Verify the file mode.
      mode = (info.external_attr >> 16) & 0o777
      self.assertEqual(mode, expected_mode)

      # Verify the compress type.
      self.assertEqual(info.compress_type, expected_compress_type)

      # Verify the zip contents. The sentinel has to be bytes: read() returns
      # b'' at EOF, which never compares equal to a text '' on Python 3 and
      # would turn this loop into an endless one.
      sha1_hash = sha1()
      with verify_zip.open(arcname) as entry:
        for chunk in iter(lambda: entry.read(4 * MiB), b''):
          sha1_hash.update(chunk)
      self.assertEqual(expected_hash, sha1_hash.hexdigest())
      self.assertIsNone(verify_zip.testzip())

  def _test_ZipWrite(self, contents, extra_zipwrite_args=None):
    """Writes contents to a temp file, zips it via ZipWrite and verifies it.

    Args:
      contents: Either a single byte string or an iterable of byte chunks
          (e.g. the generator returned by get_2gb_string()).
      extra_zipwrite_args: Optional dict of keyword arguments forwarded to
          common.ZipWrite(). The keys "arcname", "perms" and "compress_type"
          also determine the expected values passed to _verify().
    """
    extra_zipwrite_args = dict(extra_zipwrite_args or {})

    # Treat a plain byte string as one chunk. Iterating it directly would
    # produce one-character writes on Python 2 and ints on Python 3, the
    # latter being rejected by sha1().update().
    if isinstance(contents, bytes):
      contents = [contents]

    test_file = tempfile.NamedTemporaryFile(delete=False)
    test_file_name = test_file.name

    zip_file = tempfile.NamedTemporaryFile(delete=False)
    zip_file_name = zip_file.name

    # File names within an archive strip the leading slash.
    arcname = extra_zipwrite_args.get("arcname", test_file_name)
    if arcname[0] == "/":
      arcname = arcname[1:]

    zip_file.close()
    zip_file = zipfile.ZipFile(zip_file_name, "w")

    try:
      sha1_hash = sha1()
      for data in contents:
        sha1_hash.update(data)
        test_file.write(data)
      test_file.close()

      expected_stat = os.stat(test_file_name)
      expected_mode = extra_zipwrite_args.get("perms", 0o644)
      expected_compress_type = extra_zipwrite_args.get("compress_type",
                                                       zipfile.ZIP_STORED)
      time.sleep(5)  # Make sure the atime/mtime will change measurably.

      common.ZipWrite(zip_file, test_file_name, **extra_zipwrite_args)
      common.ZipClose(zip_file)

      self._verify(zip_file, zip_file_name, arcname, sha1_hash.hexdigest(),
                   test_file_name, expected_stat, expected_mode,
                   expected_compress_type)
    finally:
      # close() is a no-op if the file was already closed above; this only
      # matters when the write loop failed part-way.
      test_file.close()
      os.remove(test_file_name)
      os.remove(zip_file_name)

  def _test_ZipWriteStr(self, zinfo_or_arcname, contents, extra_args=None):
    """Adds contents to a fresh archive via ZipWriteStr and verifies it.

    Args:
      zinfo_or_arcname: Either the entry name or a zipfile.ZipInfo object.
      contents: The byte string to store.
      extra_args: Optional dict of keyword arguments forwarded to
          common.ZipWriteStr(). "perms" and "compress_type" also determine
          the expected values passed to _verify().
    """
    extra_args = dict(extra_args or {})

    zip_file = tempfile.NamedTemporaryFile(delete=False)
    zip_file_name = zip_file.name
    zip_file.close()

    zip_file = zipfile.ZipFile(zip_file_name, "w")

    try:
      expected_compress_type = extra_args.get("compress_type",
                                              zipfile.ZIP_STORED)
      time.sleep(5)  # Make sure the atime/mtime will change measurably.

      if not isinstance(zinfo_or_arcname, zipfile.ZipInfo):
        arcname = zinfo_or_arcname
        expected_mode = extra_args.get("perms", 0o644)
      else:
        # Without an explicit "perms", the mode already recorded in the
        # ZipInfo is the one expected in the archive.
        arcname = zinfo_or_arcname.filename
        expected_mode = extra_args.get("perms",
                                       zinfo_or_arcname.external_attr >> 16)

      common.ZipWriteStr(zip_file, zinfo_or_arcname, contents, **extra_args)
      common.ZipClose(zip_file)

      self._verify(zip_file, zip_file_name, arcname, sha1(contents).hexdigest(),
                   expected_mode=expected_mode,
                   expected_compress_type=expected_compress_type)
    finally:
      os.remove(zip_file_name)

  def _test_ZipWriteStr_large_file(self, large, small, extra_args=None):
    """Writes a large file and then a small string into the same archive.

    Args:
      large: Iterable of byte chunks that is written to a temp file and
          added with common.ZipWrite().
      small: Byte string added afterwards with common.ZipWriteStr() under
          the entry name "bar".
      extra_args: Optional dict of keyword arguments forwarded to both
          common.ZipWrite() and common.ZipWriteStr().
    """
    extra_args = dict(extra_args or {})

    zip_file = tempfile.NamedTemporaryFile(delete=False)
    zip_file_name = zip_file.name

    test_file = tempfile.NamedTemporaryFile(delete=False)
    test_file_name = test_file.name

    arcname_large = test_file_name
    arcname_small = "bar"

    # File names within an archive strip the leading slash.
    if arcname_large[0] == "/":
      arcname_large = arcname_large[1:]

    zip_file.close()
    zip_file = zipfile.ZipFile(zip_file_name, "w")

    try:
      sha1_hash = sha1()
      for data in large:
        sha1_hash.update(data)
        test_file.write(data)
      test_file.close()

      expected_stat = os.stat(test_file_name)
      expected_mode = 0o644
      expected_compress_type = extra_args.get("compress_type",
                                              zipfile.ZIP_STORED)
      time.sleep(5)  # Make sure the atime/mtime will change measurably.

      common.ZipWrite(zip_file, test_file_name, **extra_args)
      common.ZipWriteStr(zip_file, arcname_small, small, **extra_args)
      common.ZipClose(zip_file)

      # Verify the contents written by ZipWrite().
      self._verify(zip_file, zip_file_name, arcname_large,
                   sha1_hash.hexdigest(), test_file_name, expected_stat,
                   expected_mode, expected_compress_type)

      # Verify the contents written by ZipWriteStr().
      self._verify(zip_file, zip_file_name, arcname_small,
                   sha1(small).hexdigest(),
                   expected_compress_type=expected_compress_type)
    finally:
      # close() is a no-op if the file was already closed above; this only
      # matters when the write loop failed part-way.
      test_file.close()
      os.remove(zip_file_name)
      os.remove(test_file_name)

  def _test_reset_ZIP64_LIMIT(self, func, *args):
    """Asserts that zipfile.ZIP64_LIMIT is (1 << 31) - 1 around func(*args)."""
    default_limit = (1 << 31) - 1
    self.assertEqual(default_limit, zipfile.ZIP64_LIMIT)
    func(*args)
    self.assertEqual(default_limit, zipfile.ZIP64_LIMIT)

  def test_ZipWrite(self):
    file_contents = os.urandom(1024)
    self._test_ZipWrite(file_contents)

  def test_ZipWrite_with_opts(self):
    file_contents = os.urandom(1024)
    self._test_ZipWrite(file_contents, {
        "arcname": "foobar",
        "perms": 0o777,
        "compress_type": zipfile.ZIP_DEFLATED,
    })
    self._test_ZipWrite(file_contents, {
        "arcname": "foobar",
        "perms": 0o700,
        "compress_type": zipfile.ZIP_STORED,
    })

  def test_ZipWrite_large_file(self):
    file_contents = get_2gb_string()
    self._test_ZipWrite(file_contents, {
        "compress_type": zipfile.ZIP_DEFLATED,
    })
228 229 def test_ZipWrite_resets_ZIP64_LIMIT(self): 230 self._test_reset_ZIP64_LIMIT(self._test_ZipWrite, "") 231 232 def test_ZipWriteStr(self): 233 random_string = os.urandom(1024) 234 # Passing arcname 235 self._test_ZipWriteStr("foo", random_string) 236 237 # Passing zinfo 238 zinfo = zipfile.ZipInfo(filename="foo") 239 self._test_ZipWriteStr(zinfo, random_string) 240 241 # Timestamp in the zinfo should be overwritten. 242 zinfo.date_time = (2015, 3, 1, 15, 30, 0) 243 self._test_ZipWriteStr(zinfo, random_string) 244 245 def test_ZipWriteStr_with_opts(self): 246 random_string = os.urandom(1024) 247 # Passing arcname 248 self._test_ZipWriteStr("foo", random_string, { 249 "perms": 0o700, 250 "compress_type": zipfile.ZIP_DEFLATED, 251 }) 252 self._test_ZipWriteStr("bar", random_string, { 253 "compress_type": zipfile.ZIP_STORED, 254 }) 255 256 # Passing zinfo 257 zinfo = zipfile.ZipInfo(filename="foo") 258 self._test_ZipWriteStr(zinfo, random_string, { 259 "compress_type": zipfile.ZIP_DEFLATED, 260 }) 261 self._test_ZipWriteStr(zinfo, random_string, { 262 "perms": 0o600, 263 "compress_type": zipfile.ZIP_STORED, 264 }) 265 266 def test_ZipWriteStr_large_file(self): 267 # zipfile.writestr() doesn't work when the str size is over 2GiB even with 268 # the workaround. We will only test the case of writing a string into a 269 # large archive. 
270 long_string = get_2gb_string() 271 short_string = os.urandom(1024) 272 self._test_ZipWriteStr_large_file(long_string, short_string, { 273 "compress_type": zipfile.ZIP_DEFLATED, 274 }) 275 276 def test_ZipWriteStr_resets_ZIP64_LIMIT(self): 277 self._test_reset_ZIP64_LIMIT(self._test_ZipWriteStr, "foo", "") 278 zinfo = zipfile.ZipInfo(filename="foo") 279 self._test_reset_ZIP64_LIMIT(self._test_ZipWriteStr, zinfo, "") 280 281 def test_bug21309935(self): 282 zip_file = tempfile.NamedTemporaryFile(delete=False) 283 zip_file_name = zip_file.name 284 zip_file.close() 285 286 try: 287 random_string = os.urandom(1024) 288 zip_file = zipfile.ZipFile(zip_file_name, "w") 289 # Default perms should be 0o644 when passing the filename. 290 common.ZipWriteStr(zip_file, "foo", random_string) 291 # Honor the specified perms. 292 common.ZipWriteStr(zip_file, "bar", random_string, perms=0o755) 293 # The perms in zinfo should be untouched. 294 zinfo = zipfile.ZipInfo(filename="baz") 295 zinfo.external_attr = 0o740 << 16 296 common.ZipWriteStr(zip_file, zinfo, random_string) 297 # Explicitly specified perms has the priority. 
298 zinfo = zipfile.ZipInfo(filename="qux") 299 zinfo.external_attr = 0o700 << 16 300 common.ZipWriteStr(zip_file, zinfo, random_string, perms=0o400) 301 common.ZipClose(zip_file) 302 303 self._verify(zip_file, zip_file_name, "foo", 304 sha1(random_string).hexdigest(), 305 expected_mode=0o644) 306 self._verify(zip_file, zip_file_name, "bar", 307 sha1(random_string).hexdigest(), 308 expected_mode=0o755) 309 self._verify(zip_file, zip_file_name, "baz", 310 sha1(random_string).hexdigest(), 311 expected_mode=0o740) 312 self._verify(zip_file, zip_file_name, "qux", 313 sha1(random_string).hexdigest(), 314 expected_mode=0o400) 315 finally: 316 os.remove(zip_file_name) 317 318 def test_ZipDelete(self): 319 zip_file = tempfile.NamedTemporaryFile(delete=False, suffix='.zip') 320 output_zip = zipfile.ZipFile(zip_file.name, 'w', 321 compression=zipfile.ZIP_DEFLATED) 322 with tempfile.NamedTemporaryFile() as entry_file: 323 entry_file.write(os.urandom(1024)) 324 common.ZipWrite(output_zip, entry_file.name, arcname='Test1') 325 common.ZipWrite(output_zip, entry_file.name, arcname='Test2') 326 common.ZipWrite(output_zip, entry_file.name, arcname='Test3') 327 common.ZipClose(output_zip) 328 zip_file.close() 329 330 try: 331 common.ZipDelete(zip_file.name, 'Test2') 332 with zipfile.ZipFile(zip_file.name, 'r') as check_zip: 333 entries = check_zip.namelist() 334 self.assertTrue('Test1' in entries) 335 self.assertFalse('Test2' in entries) 336 self.assertTrue('Test3' in entries) 337 338 self.assertRaises( 339 common.ExternalError, common.ZipDelete, zip_file.name, 'Test2') 340 with zipfile.ZipFile(zip_file.name, 'r') as check_zip: 341 entries = check_zip.namelist() 342 self.assertTrue('Test1' in entries) 343 self.assertFalse('Test2' in entries) 344 self.assertTrue('Test3' in entries) 345 346 common.ZipDelete(zip_file.name, ['Test3']) 347 with zipfile.ZipFile(zip_file.name, 'r') as check_zip: 348 entries = check_zip.namelist() 349 self.assertTrue('Test1' in entries) 350 
self.assertFalse('Test2' in entries) 351 self.assertFalse('Test3' in entries) 352 353 common.ZipDelete(zip_file.name, ['Test1', 'Test2']) 354 with zipfile.ZipFile(zip_file.name, 'r') as check_zip: 355 entries = check_zip.namelist() 356 self.assertFalse('Test1' in entries) 357 self.assertFalse('Test2' in entries) 358 self.assertFalse('Test3' in entries) 359 finally: 360 os.remove(zip_file.name) 361 362 @staticmethod 363 def _test_UnzipTemp_createZipFile(): 364 zip_file = common.MakeTempFile(suffix='.zip') 365 output_zip = zipfile.ZipFile( 366 zip_file, 'w', compression=zipfile.ZIP_DEFLATED) 367 contents = os.urandom(1024) 368 with tempfile.NamedTemporaryFile() as entry_file: 369 entry_file.write(contents) 370 common.ZipWrite(output_zip, entry_file.name, arcname='Test1') 371 common.ZipWrite(output_zip, entry_file.name, arcname='Test2') 372 common.ZipWrite(output_zip, entry_file.name, arcname='Foo3') 373 common.ZipWrite(output_zip, entry_file.name, arcname='Bar4') 374 common.ZipWrite(output_zip, entry_file.name, arcname='Dir5/Baz5') 375 common.ZipClose(output_zip) 376 common.ZipClose(output_zip) 377 return zip_file 378 379 def test_UnzipTemp(self): 380 zip_file = self._test_UnzipTemp_createZipFile() 381 unzipped_dir = common.UnzipTemp(zip_file) 382 self.assertTrue(os.path.exists(os.path.join(unzipped_dir, 'Test1'))) 383 self.assertTrue(os.path.exists(os.path.join(unzipped_dir, 'Test2'))) 384 self.assertTrue(os.path.exists(os.path.join(unzipped_dir, 'Foo3'))) 385 self.assertTrue(os.path.exists(os.path.join(unzipped_dir, 'Bar4'))) 386 self.assertTrue(os.path.exists(os.path.join(unzipped_dir, 'Dir5/Baz5'))) 387 388 def test_UnzipTemp_withPatterns(self): 389 zip_file = self._test_UnzipTemp_createZipFile() 390 391 unzipped_dir = common.UnzipTemp(zip_file, ['Test1']) 392 self.assertTrue(os.path.exists(os.path.join(unzipped_dir, 'Test1'))) 393 self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Test2'))) 394 self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 
'Foo3'))) 395 self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Bar4'))) 396 self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Dir5/Baz5'))) 397 398 unzipped_dir = common.UnzipTemp(zip_file, ['Test1', 'Foo3']) 399 self.assertTrue(os.path.exists(os.path.join(unzipped_dir, 'Test1'))) 400 self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Test2'))) 401 self.assertTrue(os.path.exists(os.path.join(unzipped_dir, 'Foo3'))) 402 self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Bar4'))) 403 self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Dir5/Baz5'))) 404 405 unzipped_dir = common.UnzipTemp(zip_file, ['Test*', 'Foo3*']) 406 self.assertTrue(os.path.exists(os.path.join(unzipped_dir, 'Test1'))) 407 self.assertTrue(os.path.exists(os.path.join(unzipped_dir, 'Test2'))) 408 self.assertTrue(os.path.exists(os.path.join(unzipped_dir, 'Foo3'))) 409 self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Bar4'))) 410 self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Dir5/Baz5'))) 411 412 unzipped_dir = common.UnzipTemp(zip_file, ['*Test1', '*Baz*']) 413 self.assertTrue(os.path.exists(os.path.join(unzipped_dir, 'Test1'))) 414 self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Test2'))) 415 self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Foo3'))) 416 self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Bar4'))) 417 self.assertTrue(os.path.exists(os.path.join(unzipped_dir, 'Dir5/Baz5'))) 418 419 def test_UnzipTemp_withEmptyPatterns(self): 420 zip_file = self._test_UnzipTemp_createZipFile() 421 unzipped_dir = common.UnzipTemp(zip_file, []) 422 self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Test1'))) 423 self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Test2'))) 424 self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Foo3'))) 425 self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Bar4'))) 426 self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 
'Dir5/Baz5'))) 427 428 def test_UnzipTemp_withPartiallyMatchingPatterns(self): 429 zip_file = self._test_UnzipTemp_createZipFile() 430 unzipped_dir = common.UnzipTemp(zip_file, ['Test*', 'Nonexistent*']) 431 self.assertTrue(os.path.exists(os.path.join(unzipped_dir, 'Test1'))) 432 self.assertTrue(os.path.exists(os.path.join(unzipped_dir, 'Test2'))) 433 self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Foo3'))) 434 self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Bar4'))) 435 self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Dir5/Baz5'))) 436 437 def test_UnzipTemp_withNoMatchingPatterns(self): 438 zip_file = self._test_UnzipTemp_createZipFile() 439 unzipped_dir = common.UnzipTemp(zip_file, ['Foo4', 'Nonexistent*']) 440 self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Test1'))) 441 self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Test2'))) 442 self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Foo3'))) 443 self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Bar4'))) 444 self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Dir5/Baz5'))) 445 446 447 class CommonApkUtilsTest(test_utils.ReleaseToolsTestCase): 448 """Tests the APK utils related functions.""" 449 450 APKCERTS_TXT1 = ( 451 'name="RecoveryLocalizer.apk" certificate="certs/devkey.x509.pem"' 452 ' private_key="certs/devkey.pk8"\n' 453 'name="Settings.apk"' 454 ' certificate="build/target/product/security/platform.x509.pem"' 455 ' private_key="build/target/product/security/platform.pk8"\n' 456 'name="TV.apk" certificate="PRESIGNED" private_key=""\n' 457 ) 458 459 APKCERTS_CERTMAP1 = { 460 'RecoveryLocalizer.apk' : 'certs/devkey', 461 'Settings.apk' : 'build/target/product/security/platform', 462 'TV.apk' : 'PRESIGNED', 463 } 464 465 APKCERTS_TXT2 = ( 466 'name="Compressed1.apk" certificate="certs/compressed1.x509.pem"' 467 ' private_key="certs/compressed1.pk8" compressed="gz"\n' 468 'name="Compressed2a.apk" 
certificate="certs/compressed2.x509.pem"' 469 ' private_key="certs/compressed2.pk8" compressed="gz"\n' 470 'name="Compressed2b.apk" certificate="certs/compressed2.x509.pem"' 471 ' private_key="certs/compressed2.pk8" compressed="gz"\n' 472 'name="Compressed3.apk" certificate="certs/compressed3.x509.pem"' 473 ' private_key="certs/compressed3.pk8" compressed="gz"\n' 474 ) 475 476 APKCERTS_CERTMAP2 = { 477 'Compressed1.apk' : 'certs/compressed1', 478 'Compressed2a.apk' : 'certs/compressed2', 479 'Compressed2b.apk' : 'certs/compressed2', 480 'Compressed3.apk' : 'certs/compressed3', 481 } 482 483 APKCERTS_TXT3 = ( 484 'name="Compressed4.apk" certificate="certs/compressed4.x509.pem"' 485 ' private_key="certs/compressed4.pk8" compressed="xz"\n' 486 ) 487 488 APKCERTS_CERTMAP3 = { 489 'Compressed4.apk' : 'certs/compressed4', 490 } 491 492 def setUp(self): 493 self.testdata_dir = test_utils.get_testdata_dir() 494 495 @staticmethod 496 def _write_apkcerts_txt(apkcerts_txt, additional=None): 497 if additional is None: 498 additional = [] 499 target_files = common.MakeTempFile(suffix='.zip') 500 with zipfile.ZipFile(target_files, 'w') as target_files_zip: 501 target_files_zip.writestr('META/apkcerts.txt', apkcerts_txt) 502 for entry in additional: 503 target_files_zip.writestr(entry, '') 504 return target_files 505 506 def test_ReadApkCerts_NoncompressedApks(self): 507 target_files = self._write_apkcerts_txt(self.APKCERTS_TXT1) 508 with zipfile.ZipFile(target_files, 'r') as input_zip: 509 certmap, ext = common.ReadApkCerts(input_zip) 510 511 self.assertDictEqual(self.APKCERTS_CERTMAP1, certmap) 512 self.assertIsNone(ext) 513 514 def test_ReadApkCerts_CompressedApks(self): 515 # We have "installed" Compressed1.apk.gz only. Note that Compressed3.apk is 516 # not stored in '.gz' format, so it shouldn't be considered as installed. 
517 target_files = self._write_apkcerts_txt( 518 self.APKCERTS_TXT2, 519 ['Compressed1.apk.gz', 'Compressed3.apk']) 520 521 with zipfile.ZipFile(target_files, 'r') as input_zip: 522 certmap, ext = common.ReadApkCerts(input_zip) 523 524 self.assertDictEqual(self.APKCERTS_CERTMAP2, certmap) 525 self.assertEqual('.gz', ext) 526 527 # Alternative case with '.xz'. 528 target_files = self._write_apkcerts_txt( 529 self.APKCERTS_TXT3, ['Compressed4.apk.xz']) 530 531 with zipfile.ZipFile(target_files, 'r') as input_zip: 532 certmap, ext = common.ReadApkCerts(input_zip) 533 534 self.assertDictEqual(self.APKCERTS_CERTMAP3, certmap) 535 self.assertEqual('.xz', ext) 536 537 def test_ReadApkCerts_CompressedAndNoncompressedApks(self): 538 target_files = self._write_apkcerts_txt( 539 self.APKCERTS_TXT1 + self.APKCERTS_TXT2, 540 ['Compressed1.apk.gz', 'Compressed3.apk']) 541 542 with zipfile.ZipFile(target_files, 'r') as input_zip: 543 certmap, ext = common.ReadApkCerts(input_zip) 544 545 certmap_merged = self.APKCERTS_CERTMAP1.copy() 546 certmap_merged.update(self.APKCERTS_CERTMAP2) 547 self.assertDictEqual(certmap_merged, certmap) 548 self.assertEqual('.gz', ext) 549 550 def test_ReadApkCerts_MultipleCompressionMethods(self): 551 target_files = self._write_apkcerts_txt( 552 self.APKCERTS_TXT2 + self.APKCERTS_TXT3, 553 ['Compressed1.apk.gz', 'Compressed4.apk.xz']) 554 555 with zipfile.ZipFile(target_files, 'r') as input_zip: 556 self.assertRaises(ValueError, common.ReadApkCerts, input_zip) 557 558 def test_ReadApkCerts_MismatchingKeys(self): 559 malformed_apkcerts_txt = ( 560 'name="App1.apk" certificate="certs/cert1.x509.pem"' 561 ' private_key="certs/cert2.pk8"\n' 562 ) 563 target_files = self._write_apkcerts_txt(malformed_apkcerts_txt) 564 565 with zipfile.ZipFile(target_files, 'r') as input_zip: 566 self.assertRaises(ValueError, common.ReadApkCerts, input_zip) 567 568 def test_ExtractPublicKey(self): 569 cert = os.path.join(self.testdata_dir, 'testkey.x509.pem') 570 pubkey = 
os.path.join(self.testdata_dir, 'testkey.pubkey.pem') 571 with open(pubkey, 'rb') as pubkey_fp: 572 self.assertEqual(pubkey_fp.read(), common.ExtractPublicKey(cert)) 573 574 def test_ExtractPublicKey_invalidInput(self): 575 wrong_input = os.path.join(self.testdata_dir, 'testkey.pk8') 576 self.assertRaises(AssertionError, common.ExtractPublicKey, wrong_input) 577 578 def test_ExtractAvbPublicKey(self): 579 privkey = os.path.join(self.testdata_dir, 'testkey.key') 580 pubkey = os.path.join(self.testdata_dir, 'testkey.pubkey.pem') 581 with open(common.ExtractAvbPublicKey(privkey)) as privkey_fp, \ 582 open(common.ExtractAvbPublicKey(pubkey)) as pubkey_fp: 583 self.assertEqual(privkey_fp.read(), pubkey_fp.read()) 584 585 def test_ParseCertificate(self): 586 cert = os.path.join(self.testdata_dir, 'testkey.x509.pem') 587 588 cmd = ['openssl', 'x509', '-in', cert, '-outform', 'DER'] 589 proc = common.Run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) 590 expected, _ = proc.communicate() 591 self.assertEqual(0, proc.returncode) 592 593 with open(cert) as cert_fp: 594 actual = common.ParseCertificate(cert_fp.read()) 595 self.assertEqual(expected, actual) 596 597 def test_GetMinSdkVersion(self): 598 test_app = os.path.join(self.testdata_dir, 'TestApp.apk') 599 self.assertEqual('24', common.GetMinSdkVersion(test_app)) 600 601 def test_GetMinSdkVersion_invalidInput(self): 602 self.assertRaises( 603 common.ExternalError, common.GetMinSdkVersion, 'does-not-exist.apk') 604 605 def test_GetMinSdkVersionInt(self): 606 test_app = os.path.join(self.testdata_dir, 'TestApp.apk') 607 self.assertEqual(24, common.GetMinSdkVersionInt(test_app, {})) 608 609 def test_GetMinSdkVersionInt_invalidInput(self): 610 self.assertRaises( 611 common.ExternalError, common.GetMinSdkVersionInt, 'does-not-exist.apk', 612 {}) 613 614 615 class CommonUtilsTest(test_utils.ReleaseToolsTestCase): 616 617 def setUp(self): 618 self.testdata_dir = test_utils.get_testdata_dir() 619 620 def 
test_GetSparseImage_emptyBlockMapFile(self): 621 target_files = common.MakeTempFile(prefix='target_files-', suffix='.zip') 622 with zipfile.ZipFile(target_files, 'w') as target_files_zip: 623 target_files_zip.write( 624 test_utils.construct_sparse_image([ 625 (0xCAC1, 6), 626 (0xCAC3, 3), 627 (0xCAC1, 4)]), 628 arcname='IMAGES/system.img') 629 target_files_zip.writestr('IMAGES/system.map', '') 630 target_files_zip.writestr('SYSTEM/file1', os.urandom(4096 * 8)) 631 target_files_zip.writestr('SYSTEM/file2', os.urandom(4096 * 3)) 632 633 tempdir = common.UnzipTemp(target_files) 634 with zipfile.ZipFile(target_files, 'r') as input_zip: 635 sparse_image = common.GetSparseImage('system', tempdir, input_zip, False) 636 637 self.assertDictEqual( 638 { 639 '__COPY': RangeSet("0"), 640 '__NONZERO-0': RangeSet("1-5 9-12"), 641 }, 642 sparse_image.file_map) 643 644 def test_GetSparseImage_missingImageFile(self): 645 self.assertRaises( 646 AssertionError, common.GetSparseImage, 'system2', self.testdata_dir, 647 None, False) 648 self.assertRaises( 649 AssertionError, common.GetSparseImage, 'unknown', self.testdata_dir, 650 None, False) 651 652 def test_GetSparseImage_missingBlockMapFile(self): 653 target_files = common.MakeTempFile(prefix='target_files-', suffix='.zip') 654 with zipfile.ZipFile(target_files, 'w') as target_files_zip: 655 target_files_zip.write( 656 test_utils.construct_sparse_image([ 657 (0xCAC1, 6), 658 (0xCAC3, 3), 659 (0xCAC1, 4)]), 660 arcname='IMAGES/system.img') 661 target_files_zip.writestr('SYSTEM/file1', os.urandom(4096 * 8)) 662 target_files_zip.writestr('SYSTEM/file2', os.urandom(4096 * 3)) 663 664 tempdir = common.UnzipTemp(target_files) 665 with zipfile.ZipFile(target_files, 'r') as input_zip: 666 self.assertRaises( 667 AssertionError, common.GetSparseImage, 'system', tempdir, input_zip, 668 False) 669 670 def test_GetSparseImage_sharedBlocks_notAllowed(self): 671 """Tests the case of having overlapping blocks but disallowed.""" 672 target_files = 
common.MakeTempFile(prefix='target_files-', suffix='.zip') 673 with zipfile.ZipFile(target_files, 'w') as target_files_zip: 674 target_files_zip.write( 675 test_utils.construct_sparse_image([(0xCAC2, 16)]), 676 arcname='IMAGES/system.img') 677 # Block 10 is shared between two files. 678 target_files_zip.writestr( 679 'IMAGES/system.map', 680 '\n'.join([ 681 '/system/file1 1-5 9-10', 682 '/system/file2 10-12'])) 683 target_files_zip.writestr('SYSTEM/file1', os.urandom(4096 * 7)) 684 target_files_zip.writestr('SYSTEM/file2', os.urandom(4096 * 3)) 685 686 tempdir = common.UnzipTemp(target_files) 687 with zipfile.ZipFile(target_files, 'r') as input_zip: 688 self.assertRaises( 689 AssertionError, common.GetSparseImage, 'system', tempdir, input_zip, 690 False) 691 692 def test_GetSparseImage_sharedBlocks_allowed(self): 693 """Tests the case for target using BOARD_EXT4_SHARE_DUP_BLOCKS := true.""" 694 target_files = common.MakeTempFile(prefix='target_files-', suffix='.zip') 695 with zipfile.ZipFile(target_files, 'w') as target_files_zip: 696 # Construct an image with a care_map of "0-5 9-12". 697 target_files_zip.write( 698 test_utils.construct_sparse_image([(0xCAC2, 16)]), 699 arcname='IMAGES/system.img') 700 # Block 10 is shared between two files. 
701 target_files_zip.writestr( 702 'IMAGES/system.map', 703 '\n'.join([ 704 '/system/file1 1-5 9-10', 705 '/system/file2 10-12'])) 706 target_files_zip.writestr('SYSTEM/file1', os.urandom(4096 * 7)) 707 target_files_zip.writestr('SYSTEM/file2', os.urandom(4096 * 3)) 708 709 tempdir = common.UnzipTemp(target_files) 710 with zipfile.ZipFile(target_files, 'r') as input_zip: 711 sparse_image = common.GetSparseImage('system', tempdir, input_zip, True) 712 713 self.assertDictEqual( 714 { 715 '__COPY': RangeSet("0"), 716 '__NONZERO-0': RangeSet("6-8 13-15"), 717 '/system/file1': RangeSet("1-5 9-10"), 718 '/system/file2': RangeSet("11-12"), 719 }, 720 sparse_image.file_map) 721 722 # '/system/file2' should be marked with 'uses_shared_blocks', but not with 723 # 'incomplete'. 724 self.assertTrue( 725 sparse_image.file_map['/system/file2'].extra['uses_shared_blocks']) 726 self.assertNotIn( 727 'incomplete', sparse_image.file_map['/system/file2'].extra) 728 729 # All other entries should look normal without any tags. 730 self.assertFalse(sparse_image.file_map['__COPY'].extra) 731 self.assertFalse(sparse_image.file_map['__NONZERO-0'].extra) 732 self.assertFalse(sparse_image.file_map['/system/file1'].extra) 733 734 def test_GetSparseImage_incompleteRanges(self): 735 """Tests the case of ext4 images with holes.""" 736 target_files = common.MakeTempFile(prefix='target_files-', suffix='.zip') 737 with zipfile.ZipFile(target_files, 'w') as target_files_zip: 738 target_files_zip.write( 739 test_utils.construct_sparse_image([(0xCAC2, 16)]), 740 arcname='IMAGES/system.img') 741 target_files_zip.writestr( 742 'IMAGES/system.map', 743 '\n'.join([ 744 '/system/file1 1-5 9-10', 745 '/system/file2 11-12'])) 746 target_files_zip.writestr('SYSTEM/file1', os.urandom(4096 * 7)) 747 # '/system/file2' has less blocks listed (2) than actual (3). 
748 target_files_zip.writestr('SYSTEM/file2', os.urandom(4096 * 3)) 749 750 tempdir = common.UnzipTemp(target_files) 751 with zipfile.ZipFile(target_files, 'r') as input_zip: 752 sparse_image = common.GetSparseImage('system', tempdir, input_zip, False) 753 754 self.assertFalse(sparse_image.file_map['/system/file1'].extra) 755 self.assertTrue(sparse_image.file_map['/system/file2'].extra['incomplete']) 756 757 def test_GetSparseImage_systemRootImage_filenameWithExtraLeadingSlash(self): 758 target_files = common.MakeTempFile(prefix='target_files-', suffix='.zip') 759 with zipfile.ZipFile(target_files, 'w') as target_files_zip: 760 target_files_zip.write( 761 test_utils.construct_sparse_image([(0xCAC2, 16)]), 762 arcname='IMAGES/system.img') 763 target_files_zip.writestr( 764 'IMAGES/system.map', 765 '\n'.join([ 766 '//system/file1 1-5 9-10', 767 '//system/file2 11-12', 768 '/system/app/file3 13-15'])) 769 target_files_zip.writestr('SYSTEM/file1', os.urandom(4096 * 7)) 770 # '/system/file2' has less blocks listed (2) than actual (3). 771 target_files_zip.writestr('SYSTEM/file2', os.urandom(4096 * 3)) 772 # '/system/app/file3' has less blocks listed (3) than actual (4). 
773 target_files_zip.writestr('SYSTEM/app/file3', os.urandom(4096 * 4)) 774 775 tempdir = common.UnzipTemp(target_files) 776 with zipfile.ZipFile(target_files, 'r') as input_zip: 777 sparse_image = common.GetSparseImage('system', tempdir, input_zip, False) 778 779 self.assertFalse(sparse_image.file_map['//system/file1'].extra) 780 self.assertTrue(sparse_image.file_map['//system/file2'].extra['incomplete']) 781 self.assertTrue( 782 sparse_image.file_map['/system/app/file3'].extra['incomplete']) 783 784 def test_GetSparseImage_systemRootImage_nonSystemFiles(self): 785 target_files = common.MakeTempFile(prefix='target_files-', suffix='.zip') 786 with zipfile.ZipFile(target_files, 'w') as target_files_zip: 787 target_files_zip.write( 788 test_utils.construct_sparse_image([(0xCAC2, 16)]), 789 arcname='IMAGES/system.img') 790 target_files_zip.writestr( 791 'IMAGES/system.map', 792 '\n'.join([ 793 '//system/file1 1-5 9-10', 794 '//init.rc 13-15'])) 795 target_files_zip.writestr('SYSTEM/file1', os.urandom(4096 * 7)) 796 # '/init.rc' has less blocks listed (3) than actual (4). 
797 target_files_zip.writestr('ROOT/init.rc', os.urandom(4096 * 4)) 798 799 tempdir = common.UnzipTemp(target_files) 800 with zipfile.ZipFile(target_files, 'r') as input_zip: 801 sparse_image = common.GetSparseImage('system', tempdir, input_zip, False) 802 803 self.assertFalse(sparse_image.file_map['//system/file1'].extra) 804 self.assertTrue(sparse_image.file_map['//init.rc'].extra['incomplete']) 805 806 def test_GetSparseImage_fileNotFound(self): 807 target_files = common.MakeTempFile(prefix='target_files-', suffix='.zip') 808 with zipfile.ZipFile(target_files, 'w') as target_files_zip: 809 target_files_zip.write( 810 test_utils.construct_sparse_image([(0xCAC2, 16)]), 811 arcname='IMAGES/system.img') 812 target_files_zip.writestr( 813 'IMAGES/system.map', 814 '\n'.join([ 815 '//system/file1 1-5 9-10', 816 '//system/file2 11-12'])) 817 target_files_zip.writestr('SYSTEM/file1', os.urandom(4096 * 7)) 818 819 tempdir = common.UnzipTemp(target_files) 820 with zipfile.ZipFile(target_files, 'r') as input_zip: 821 self.assertRaises( 822 AssertionError, common.GetSparseImage, 'system', tempdir, input_zip, 823 False) 824 825 def test_GetAvbChainedPartitionArg(self): 826 pubkey = os.path.join(self.testdata_dir, 'testkey.pubkey.pem') 827 info_dict = { 828 'avb_avbtool': 'avbtool', 829 'avb_system_key_path': pubkey, 830 'avb_system_rollback_index_location': 2, 831 } 832 args = common.GetAvbChainedPartitionArg('system', info_dict).split(':') 833 self.assertEqual(3, len(args)) 834 self.assertEqual('system', args[0]) 835 self.assertEqual('2', args[1]) 836 self.assertTrue(os.path.exists(args[2])) 837 838 def test_GetAvbChainedPartitionArg_withPrivateKey(self): 839 key = os.path.join(self.testdata_dir, 'testkey.key') 840 info_dict = { 841 'avb_avbtool': 'avbtool', 842 'avb_product_key_path': key, 843 'avb_product_rollback_index_location': 2, 844 } 845 args = common.GetAvbChainedPartitionArg('product', info_dict).split(':') 846 self.assertEqual(3, len(args)) 847 
self.assertEqual('product', args[0]) 848 self.assertEqual('2', args[1]) 849 self.assertTrue(os.path.exists(args[2])) 850 851 def test_GetAvbChainedPartitionArg_withSpecifiedKey(self): 852 info_dict = { 853 'avb_avbtool': 'avbtool', 854 'avb_system_key_path': 'does-not-exist', 855 'avb_system_rollback_index_location': 2, 856 } 857 pubkey = os.path.join(self.testdata_dir, 'testkey.pubkey.pem') 858 args = common.GetAvbChainedPartitionArg( 859 'system', info_dict, pubkey).split(':') 860 self.assertEqual(3, len(args)) 861 self.assertEqual('system', args[0]) 862 self.assertEqual('2', args[1]) 863 self.assertTrue(os.path.exists(args[2])) 864 865 def test_GetAvbChainedPartitionArg_invalidKey(self): 866 pubkey = os.path.join(self.testdata_dir, 'testkey_with_passwd.x509.pem') 867 info_dict = { 868 'avb_avbtool': 'avbtool', 869 'avb_system_key_path': pubkey, 870 'avb_system_rollback_index_location': 2, 871 } 872 self.assertRaises( 873 common.ExternalError, common.GetAvbChainedPartitionArg, 'system', 874 info_dict) 875 876 INFO_DICT_DEFAULT = { 877 'recovery_api_version': 3, 878 'fstab_version': 2, 879 'system_root_image': 'true', 880 'no_recovery' : 'true', 881 'recovery_as_boot': 'true', 882 } 883 884 @staticmethod 885 def _test_LoadInfoDict_createTargetFiles(info_dict, fstab_path): 886 target_files = common.MakeTempFile(prefix='target_files-', suffix='.zip') 887 with zipfile.ZipFile(target_files, 'w') as target_files_zip: 888 info_values = ''.join( 889 ['{}={}\n'.format(k, v) for k, v in sorted(info_dict.iteritems())]) 890 common.ZipWriteStr(target_files_zip, 'META/misc_info.txt', info_values) 891 892 FSTAB_TEMPLATE = "/dev/block/system {} ext4 ro,barrier=1 defaults" 893 if info_dict.get('system_root_image') == 'true': 894 fstab_values = FSTAB_TEMPLATE.format('/') 895 else: 896 fstab_values = FSTAB_TEMPLATE.format('/system') 897 common.ZipWriteStr(target_files_zip, fstab_path, fstab_values) 898 899 common.ZipWriteStr( 900 target_files_zip, 'META/file_contexts', 
'file-contexts') 901 return target_files 902 903 def test_LoadInfoDict(self): 904 target_files = self._test_LoadInfoDict_createTargetFiles( 905 self.INFO_DICT_DEFAULT, 906 'BOOT/RAMDISK/system/etc/recovery.fstab') 907 with zipfile.ZipFile(target_files, 'r') as target_files_zip: 908 loaded_dict = common.LoadInfoDict(target_files_zip) 909 self.assertEqual(3, loaded_dict['recovery_api_version']) 910 self.assertEqual(2, loaded_dict['fstab_version']) 911 self.assertIn('/', loaded_dict['fstab']) 912 self.assertIn('/system', loaded_dict['fstab']) 913 914 def test_LoadInfoDict_legacyRecoveryFstabPath(self): 915 target_files = self._test_LoadInfoDict_createTargetFiles( 916 self.INFO_DICT_DEFAULT, 917 'BOOT/RAMDISK/etc/recovery.fstab') 918 with zipfile.ZipFile(target_files, 'r') as target_files_zip: 919 loaded_dict = common.LoadInfoDict(target_files_zip) 920 self.assertEqual(3, loaded_dict['recovery_api_version']) 921 self.assertEqual(2, loaded_dict['fstab_version']) 922 self.assertIn('/', loaded_dict['fstab']) 923 self.assertIn('/system', loaded_dict['fstab']) 924 925 def test_LoadInfoDict_dirInput(self): 926 target_files = self._test_LoadInfoDict_createTargetFiles( 927 self.INFO_DICT_DEFAULT, 928 'BOOT/RAMDISK/system/etc/recovery.fstab') 929 unzipped = common.UnzipTemp(target_files) 930 loaded_dict = common.LoadInfoDict(unzipped) 931 self.assertEqual(3, loaded_dict['recovery_api_version']) 932 self.assertEqual(2, loaded_dict['fstab_version']) 933 self.assertIn('/', loaded_dict['fstab']) 934 self.assertIn('/system', loaded_dict['fstab']) 935 936 def test_LoadInfoDict_dirInput_legacyRecoveryFstabPath(self): 937 target_files = self._test_LoadInfoDict_createTargetFiles( 938 self.INFO_DICT_DEFAULT, 939 'BOOT/RAMDISK/system/etc/recovery.fstab') 940 unzipped = common.UnzipTemp(target_files) 941 loaded_dict = common.LoadInfoDict(unzipped) 942 self.assertEqual(3, loaded_dict['recovery_api_version']) 943 self.assertEqual(2, loaded_dict['fstab_version']) 944 self.assertIn('/', 
loaded_dict['fstab']) 945 self.assertIn('/system', loaded_dict['fstab']) 946 947 def test_LoadInfoDict_systemRootImageFalse(self): 948 # Devices not using system-as-root nor recovery-as-boot. Non-A/B devices 949 # launched prior to P will likely have this config. 950 info_dict = copy.copy(self.INFO_DICT_DEFAULT) 951 del info_dict['no_recovery'] 952 del info_dict['system_root_image'] 953 del info_dict['recovery_as_boot'] 954 target_files = self._test_LoadInfoDict_createTargetFiles( 955 info_dict, 956 'RECOVERY/RAMDISK/system/etc/recovery.fstab') 957 with zipfile.ZipFile(target_files, 'r') as target_files_zip: 958 loaded_dict = common.LoadInfoDict(target_files_zip) 959 self.assertEqual(3, loaded_dict['recovery_api_version']) 960 self.assertEqual(2, loaded_dict['fstab_version']) 961 self.assertNotIn('/', loaded_dict['fstab']) 962 self.assertIn('/system', loaded_dict['fstab']) 963 964 def test_LoadInfoDict_recoveryAsBootFalse(self): 965 # Devices using system-as-root, but with standalone recovery image. Non-A/B 966 # devices launched since P will likely have this config. 967 info_dict = copy.copy(self.INFO_DICT_DEFAULT) 968 del info_dict['no_recovery'] 969 del info_dict['recovery_as_boot'] 970 target_files = self._test_LoadInfoDict_createTargetFiles( 971 info_dict, 972 'RECOVERY/RAMDISK/system/etc/recovery.fstab') 973 with zipfile.ZipFile(target_files, 'r') as target_files_zip: 974 loaded_dict = common.LoadInfoDict(target_files_zip) 975 self.assertEqual(3, loaded_dict['recovery_api_version']) 976 self.assertEqual(2, loaded_dict['fstab_version']) 977 self.assertIn('/', loaded_dict['fstab']) 978 self.assertIn('/system', loaded_dict['fstab']) 979 980 def test_LoadInfoDict_noRecoveryTrue(self): 981 # Device doesn't have a recovery partition at all. 
982 info_dict = copy.copy(self.INFO_DICT_DEFAULT) 983 del info_dict['recovery_as_boot'] 984 target_files = self._test_LoadInfoDict_createTargetFiles( 985 info_dict, 986 'RECOVERY/RAMDISK/system/etc/recovery.fstab') 987 with zipfile.ZipFile(target_files, 'r') as target_files_zip: 988 loaded_dict = common.LoadInfoDict(target_files_zip) 989 self.assertEqual(3, loaded_dict['recovery_api_version']) 990 self.assertEqual(2, loaded_dict['fstab_version']) 991 self.assertIsNone(loaded_dict['fstab']) 992 993 def test_LoadInfoDict_missingMetaMiscInfoTxt(self): 994 target_files = self._test_LoadInfoDict_createTargetFiles( 995 self.INFO_DICT_DEFAULT, 996 'BOOT/RAMDISK/system/etc/recovery.fstab') 997 common.ZipDelete(target_files, 'META/misc_info.txt') 998 with zipfile.ZipFile(target_files, 'r') as target_files_zip: 999 self.assertRaises(ValueError, common.LoadInfoDict, target_files_zip) 1000 1001 def test_LoadInfoDict_repacking(self): 1002 target_files = self._test_LoadInfoDict_createTargetFiles( 1003 self.INFO_DICT_DEFAULT, 1004 'BOOT/RAMDISK/system/etc/recovery.fstab') 1005 unzipped = common.UnzipTemp(target_files) 1006 loaded_dict = common.LoadInfoDict(unzipped, True) 1007 self.assertEqual(3, loaded_dict['recovery_api_version']) 1008 self.assertEqual(2, loaded_dict['fstab_version']) 1009 self.assertIn('/', loaded_dict['fstab']) 1010 self.assertIn('/system', loaded_dict['fstab']) 1011 self.assertEqual( 1012 os.path.join(unzipped, 'ROOT'), loaded_dict['root_dir']) 1013 self.assertEqual( 1014 os.path.join(unzipped, 'META', 'root_filesystem_config.txt'), 1015 loaded_dict['root_fs_config']) 1016 1017 def test_LoadInfoDict_repackingWithZipFileInput(self): 1018 target_files = self._test_LoadInfoDict_createTargetFiles( 1019 self.INFO_DICT_DEFAULT, 1020 'BOOT/RAMDISK/system/etc/recovery.fstab') 1021 with zipfile.ZipFile(target_files, 'r') as target_files_zip: 1022 self.assertRaises( 1023 AssertionError, common.LoadInfoDict, target_files_zip, True) 1024 1025 1026 class 
InstallRecoveryScriptFormatTest(test_utils.ReleaseToolsTestCase): 1027 """Checks the format of install-recovery.sh. 1028 1029 Its format should match between common.py and validate_target_files.py. 1030 """ 1031 1032 def setUp(self): 1033 self._tempdir = common.MakeTempDir() 1034 # Create a dummy dict that contains the fstab info for boot&recovery. 1035 self._info = {"fstab" : {}} 1036 dummy_fstab = [ 1037 "/dev/soc.0/by-name/boot /boot emmc defaults defaults", 1038 "/dev/soc.0/by-name/recovery /recovery emmc defaults defaults"] 1039 self._info["fstab"] = common.LoadRecoveryFSTab("\n".join, 2, dummy_fstab) 1040 # Construct the gzipped recovery.img and boot.img 1041 self.recovery_data = bytearray([ 1042 0x1f, 0x8b, 0x08, 0x00, 0x81, 0x11, 0x02, 0x5a, 0x00, 0x03, 0x2b, 0x4a, 1043 0x4d, 0xce, 0x2f, 0x4b, 0x2d, 0xaa, 0x04, 0x00, 0xc9, 0x93, 0x43, 0xf3, 1044 0x08, 0x00, 0x00, 0x00 1045 ]) 1046 # echo -n "boot" | gzip -f | hd 1047 self.boot_data = bytearray([ 1048 0x1f, 0x8b, 0x08, 0x00, 0x8c, 0x12, 0x02, 0x5a, 0x00, 0x03, 0x4b, 0xca, 1049 0xcf, 0x2f, 0x01, 0x00, 0xc4, 0xae, 0xed, 0x46, 0x04, 0x00, 0x00, 0x00 1050 ]) 1051 1052 def _out_tmp_sink(self, name, data, prefix="SYSTEM"): 1053 loc = os.path.join(self._tempdir, prefix, name) 1054 if not os.path.exists(os.path.dirname(loc)): 1055 os.makedirs(os.path.dirname(loc)) 1056 with open(loc, "w+") as f: 1057 f.write(data) 1058 1059 def test_full_recovery(self): 1060 recovery_image = common.File("recovery.img", self.recovery_data) 1061 boot_image = common.File("boot.img", self.boot_data) 1062 self._info["full_recovery_image"] = "true" 1063 1064 common.MakeRecoveryPatch(self._tempdir, self._out_tmp_sink, 1065 recovery_image, boot_image, self._info) 1066 validate_target_files.ValidateInstallRecoveryScript(self._tempdir, 1067 self._info) 1068 1069 def test_recovery_from_boot(self): 1070 recovery_image = common.File("recovery.img", self.recovery_data) 1071 self._out_tmp_sink("recovery.img", recovery_image.data, "IMAGES") 1072 
boot_image = common.File("boot.img", self.boot_data) 1073 self._out_tmp_sink("boot.img", boot_image.data, "IMAGES") 1074 1075 common.MakeRecoveryPatch(self._tempdir, self._out_tmp_sink, 1076 recovery_image, boot_image, self._info) 1077 validate_target_files.ValidateInstallRecoveryScript(self._tempdir, 1078 self._info) 1079 # Validate 'recovery-from-boot' with bonus argument. 1080 self._out_tmp_sink("etc/recovery-resource.dat", "bonus", "SYSTEM") 1081 common.MakeRecoveryPatch(self._tempdir, self._out_tmp_sink, 1082 recovery_image, boot_image, self._info) 1083 validate_target_files.ValidateInstallRecoveryScript(self._tempdir, 1084 self._info) 1085 1086 1087 class MockScriptWriter(object): 1088 """A class that mocks edify_generator.EdifyGenerator. 1089 """ 1090 def __init__(self, enable_comments=False): 1091 self.lines = [] 1092 self.enable_comments = enable_comments 1093 def Comment(self, comment): 1094 if self.enable_comments: 1095 self.lines.append("# {}".format(comment)) 1096 def AppendExtra(self, extra): 1097 self.lines.append(extra) 1098 def __str__(self): 1099 return "\n".join(self.lines) 1100 1101 1102 class MockBlockDifference(object): 1103 def __init__(self, partition, tgt, src=None): 1104 self.partition = partition 1105 self.tgt = tgt 1106 self.src = src 1107 def WriteScript(self, script, _, progress=None, 1108 write_verify_script=False): 1109 if progress: 1110 script.AppendExtra("progress({})".format(progress)) 1111 script.AppendExtra("patch({});".format(self.partition)) 1112 if write_verify_script: 1113 self.WritePostInstallVerifyScript(script) 1114 def WritePostInstallVerifyScript(self, script): 1115 script.AppendExtra("verify({});".format(self.partition)) 1116 1117 1118 class FakeSparseImage(object): 1119 def __init__(self, size): 1120 self.blocksize = 4096 1121 self.total_blocks = size // 4096 1122 assert size % 4096 == 0, "{} is not a multiple of 4096".format(size) 1123 1124 1125 class DynamicPartitionsDifferenceTest(test_utils.ReleaseToolsTestCase): 
1126 @staticmethod 1127 def get_op_list(output_path): 1128 with zipfile.ZipFile(output_path, 'r') as output_zip: 1129 with output_zip.open("dynamic_partitions_op_list") as op_list: 1130 return [line.strip() for line in op_list.readlines() 1131 if not line.startswith("#")] 1132 1133 def setUp(self): 1134 self.script = MockScriptWriter() 1135 self.output_path = common.MakeTempFile(suffix='.zip') 1136 1137 def test_full(self): 1138 target_info = common.LoadDictionaryFromLines(""" 1139 dynamic_partition_list=system vendor 1140 super_partition_groups=group_foo 1141 super_group_foo_group_size={group_size} 1142 super_group_foo_partition_list=system vendor 1143 """.format(group_size=4 * GiB).split("\n")) 1144 block_diffs = [MockBlockDifference("system", FakeSparseImage(3 * GiB)), 1145 MockBlockDifference("vendor", FakeSparseImage(1 * GiB))] 1146 1147 dp_diff = common.DynamicPartitionsDifference(target_info, block_diffs) 1148 with zipfile.ZipFile(self.output_path, 'w') as output_zip: 1149 dp_diff.WriteScript(self.script, output_zip, write_verify_script=True) 1150 1151 self.assertEqual(str(self.script).strip(), """ 1152 assert(update_dynamic_partitions(package_extract_file("dynamic_partitions_op_list"))); 1153 patch(vendor); 1154 verify(vendor); 1155 unmap_partition("vendor"); 1156 patch(system); 1157 verify(system); 1158 unmap_partition("system"); 1159 """.strip()) 1160 1161 lines = self.get_op_list(self.output_path) 1162 1163 remove_all_groups = lines.index("remove_all_groups") 1164 add_group = lines.index("add_group group_foo 4294967296") 1165 add_vendor = lines.index("add vendor group_foo") 1166 add_system = lines.index("add system group_foo") 1167 resize_vendor = lines.index("resize vendor 1073741824") 1168 resize_system = lines.index("resize system 3221225472") 1169 1170 self.assertLess(remove_all_groups, add_group, 1171 "Should add groups after removing all groups") 1172 self.assertLess(add_group, min(add_vendor, add_system), 1173 "Should add partitions after adding 
group") 1174 self.assertLess(add_system, resize_system, 1175 "Should resize system after adding it") 1176 self.assertLess(add_vendor, resize_vendor, 1177 "Should resize vendor after adding it") 1178 1179 def test_inc_groups(self): 1180 source_info = common.LoadDictionaryFromLines(""" 1181 super_partition_groups=group_foo group_bar group_baz 1182 super_group_foo_group_size={group_foo_size} 1183 super_group_bar_group_size={group_bar_size} 1184 """.format(group_foo_size=4 * GiB, group_bar_size=3 * GiB).split("\n")) 1185 target_info = common.LoadDictionaryFromLines(""" 1186 super_partition_groups=group_foo group_baz group_qux 1187 super_group_foo_group_size={group_foo_size} 1188 super_group_baz_group_size={group_baz_size} 1189 super_group_qux_group_size={group_qux_size} 1190 """.format(group_foo_size=3 * GiB, group_baz_size=4 * GiB, 1191 group_qux_size=1 * GiB).split("\n")) 1192 1193 dp_diff = common.DynamicPartitionsDifference(target_info, 1194 block_diffs=[], 1195 source_info_dict=source_info) 1196 with zipfile.ZipFile(self.output_path, 'w') as output_zip: 1197 dp_diff.WriteScript(self.script, output_zip, write_verify_script=True) 1198 1199 lines = self.get_op_list(self.output_path) 1200 1201 removed = lines.index("remove_group group_bar") 1202 shrunk = lines.index("resize_group group_foo 3221225472") 1203 grown = lines.index("resize_group group_baz 4294967296") 1204 added = lines.index("add_group group_qux 1073741824") 1205 1206 self.assertLess(max(removed, shrunk) < min(grown, added), 1207 "ops that remove / shrink partitions must precede ops that " 1208 "grow / add partitions") 1209 1210 def test_incremental(self): 1211 source_info = common.LoadDictionaryFromLines(""" 1212 dynamic_partition_list=system vendor product product_services 1213 super_partition_groups=group_foo 1214 super_group_foo_group_size={group_foo_size} 1215 super_group_foo_partition_list=system vendor product product_services 1216 """.format(group_foo_size=4 * GiB).split("\n")) 1217 target_info = 
common.LoadDictionaryFromLines(""" 1218 dynamic_partition_list=system vendor product odm 1219 super_partition_groups=group_foo group_bar 1220 super_group_foo_group_size={group_foo_size} 1221 super_group_foo_partition_list=system vendor odm 1222 super_group_bar_group_size={group_bar_size} 1223 super_group_bar_partition_list=product 1224 """.format(group_foo_size=3 * GiB, group_bar_size=1 * GiB).split("\n")) 1225 1226 block_diffs = [MockBlockDifference("system", FakeSparseImage(1536 * MiB), 1227 src=FakeSparseImage(1024 * MiB)), 1228 MockBlockDifference("vendor", FakeSparseImage(512 * MiB), 1229 src=FakeSparseImage(1024 * MiB)), 1230 MockBlockDifference("product", FakeSparseImage(1024 * MiB), 1231 src=FakeSparseImage(1024 * MiB)), 1232 MockBlockDifference("product_services", None, 1233 src=FakeSparseImage(1024 * MiB)), 1234 MockBlockDifference("odm", FakeSparseImage(1024 * MiB), 1235 src=None)] 1236 1237 dp_diff = common.DynamicPartitionsDifference(target_info, block_diffs, 1238 source_info_dict=source_info) 1239 with zipfile.ZipFile(self.output_path, 'w') as output_zip: 1240 dp_diff.WriteScript(self.script, output_zip, write_verify_script=True) 1241 1242 metadata_idx = self.script.lines.index( 1243 'assert(update_dynamic_partitions(package_extract_file(' 1244 '"dynamic_partitions_op_list")));') 1245 self.assertLess(self.script.lines.index('patch(vendor);'), metadata_idx) 1246 self.assertLess(metadata_idx, self.script.lines.index('verify(vendor);')) 1247 for p in ("product", "system", "odm"): 1248 patch_idx = self.script.lines.index("patch({});".format(p)) 1249 verify_idx = self.script.lines.index("verify({});".format(p)) 1250 self.assertLess(metadata_idx, patch_idx, 1251 "Should patch {} after updating metadata".format(p)) 1252 self.assertLess(patch_idx, verify_idx, 1253 "Should verify {} after patching".format(p)) 1254 1255 self.assertNotIn("patch(product_services);", self.script.lines) 1256 1257 lines = self.get_op_list(self.output_path) 1258 1259 remove = 
lines.index("remove product_services") 1260 move_product_out = lines.index("move product default") 1261 shrink = lines.index("resize vendor 536870912") 1262 shrink_group = lines.index("resize_group group_foo 3221225472") 1263 add_group_bar = lines.index("add_group group_bar 1073741824") 1264 add_odm = lines.index("add odm group_foo") 1265 grow_existing = lines.index("resize system 1610612736") 1266 grow_added = lines.index("resize odm 1073741824") 1267 move_product_in = lines.index("move product group_bar") 1268 1269 max_idx_move_partition_out_foo = max(remove, move_product_out, shrink) 1270 min_idx_move_partition_in_foo = min(add_odm, grow_existing, grow_added) 1271 1272 self.assertLess(max_idx_move_partition_out_foo, shrink_group, 1273 "Must shrink group after partitions inside group are shrunk" 1274 " / removed") 1275 1276 self.assertLess(add_group_bar, move_product_in, 1277 "Must add partitions to group after group is added") 1278 1279 self.assertLess(max_idx_move_partition_out_foo, 1280 min_idx_move_partition_in_foo, 1281 "Must shrink partitions / remove partitions from group" 1282 "before adding / moving partitions into group") 1283 1284 def test_remove_partition(self): 1285 source_info = common.LoadDictionaryFromLines(""" 1286 blockimgdiff_versions=3,4 1287 use_dynamic_partitions=true 1288 dynamic_partition_list=foo 1289 super_partition_groups=group_foo 1290 super_group_foo_group_size={group_foo_size} 1291 super_group_foo_partition_list=foo 1292 """.format(group_foo_size=4 * GiB).split("\n")) 1293 target_info = common.LoadDictionaryFromLines(""" 1294 blockimgdiff_versions=3,4 1295 use_dynamic_partitions=true 1296 super_partition_groups=group_foo 1297 super_group_foo_group_size={group_foo_size} 1298 """.format(group_foo_size=4 * GiB).split("\n")) 1299 1300 common.OPTIONS.info_dict = target_info 1301 common.OPTIONS.target_info_dict = target_info 1302 common.OPTIONS.source_info_dict = source_info 1303 common.OPTIONS.cache_size = 4 * 4096 1304 1305 block_diffs = 
[common.BlockDifference("foo", EmptyImage(), 1306 src=DataImage("source", pad=True))] 1307 1308 dp_diff = common.DynamicPartitionsDifference(target_info, block_diffs, 1309 source_info_dict=source_info) 1310 with zipfile.ZipFile(self.output_path, 'w') as output_zip: 1311 dp_diff.WriteScript(self.script, output_zip, write_verify_script=True) 1312 1313 self.assertNotIn("block_image_update", str(self.script), 1314 "Removed partition should not be patched.") 1315 1316 lines = self.get_op_list(self.output_path) 1317 self.assertEqual(lines, ["remove foo"]) 1318