1 #!/usr/bin/env python 2 # 3 # Copyright (C) 2009 The Android Open Source Project 4 # 5 # Licensed under the Apache License, Version 2.0 (the "License"); 6 # you may not use this file except in compliance with the License. 7 # You may obtain a copy of the License at 8 # 9 # http://www.apache.org/licenses/LICENSE-2.0 10 # 11 # Unless required by applicable law or agreed to in writing, software 12 # distributed under the License is distributed on an "AS IS" BASIS, 13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 # See the License for the specific language governing permissions and 15 # limitations under the License. 16 17 """ 18 Check the signatures of all APKs in a target_files .zip file. With 19 -c, compare the signatures of each package to the ones in a separate 20 target_files (usually a previously distributed build for the same 21 device) and flag any changes. 22 23 Usage: check_target_file_signatures [flags] target_files 24 25 -c (--compare_with) <other_target_files> 26 Look for compatibility problems between the two sets of target 27 files (eg., packages whose keys have changed). 28 29 -l (--local_cert_dirs) <dir,dir,...> 30 Comma-separated list of top-level directories to scan for 31 .x509.pem files. Defaults to "vendor,build". Where cert files 32 can be found that match APK signatures, the filename will be 33 printed as the cert name, otherwise a hash of the cert plus its 34 subject string will be printed instead. 35 36 -t (--text) 37 Dump the certificate information for both packages in comparison 38 mode (this output is normally suppressed). 39 40 """ 41 42 import sys 43 44 if sys.hexversion < 0x02070000: 45 print >> sys.stderr, "Python 2.7 or newer is required." 46 sys.exit(1) 47 48 import os 49 import re 50 import shutil 51 import subprocess 52 import zipfile 53 54 import common 55 56 # Work around a bug in python's zipfile module that prevents opening 57 # of zipfiles if any entry has an extra field of between 1 and 3 bytes 58 # (which is common with zipaligned APKs). This overrides the 59 # ZipInfo._decodeExtra() method (which contains the bug) with an empty 60 # version (since we don't need to decode the extra field anyway). 61 class MyZipInfo(zipfile.ZipInfo): 62 def _decodeExtra(self): 63 pass 64 zipfile.ZipInfo = MyZipInfo 65 66 OPTIONS = common.OPTIONS 67 68 OPTIONS.text = False 69 OPTIONS.compare_with = None 70 OPTIONS.local_cert_dirs = ("vendor", "build") 71 72 PROBLEMS = [] 73 PROBLEM_PREFIX = [] 74 75 def AddProblem(msg): 76 PROBLEMS.append(" ".join(PROBLEM_PREFIX) + " " + msg) 77 def Push(msg): 78 PROBLEM_PREFIX.append(msg) 79 def Pop(): 80 PROBLEM_PREFIX.pop() 81 82 83 def Banner(msg): 84 print "-" * 70 85 print " ", msg 86 print "-" * 70 87 88 89 def GetCertSubject(cert): 90 p = common.Run(["openssl", "x509", "-inform", "DER", "-text"], 91 stdin=subprocess.PIPE, 92 stdout=subprocess.PIPE) 93 out, err = p.communicate(cert) 94 if err and not err.strip(): 95 return "(error reading cert subject)" 96 for line in out.split("\n"): 97 line = line.strip() 98 if line.startswith("Subject:"): 99 return line[8:].strip() 100 return "(unknown cert subject)" 101 102 103 class CertDB(object): 104 def __init__(self): 105 self.certs = {} 106 107 def Add(self, cert, name=None): 108 if cert in self.certs: 109 if name: 110 self.certs[cert] = self.certs[cert] + "," + name 111 else: 112 if name is None: 113 name = "unknown cert %s (%s)" % (common.sha1(cert).hexdigest()[:12], 114 GetCertSubject(cert)) 115 self.certs[cert] = name 116 117 def Get(self, cert): 118 """Return the name for a given cert.""" 119 return self.certs.get(cert, None) 120 121 def FindLocalCerts(self): 122 to_load = [] 123 for top in OPTIONS.local_cert_dirs: 124 for dirpath, _, filenames in os.walk(top): 125 certs = [os.path.join(dirpath, i) 126 for i in filenames if i.endswith(".x509.pem")] 127 if certs: 128 to_load.extend(certs) 129 130 for i in to_load: 131 f = open(i) 132 cert = common.ParseCertificate(f.read()) 133 f.close() 134 name, _ = os.path.splitext(i) 135 name, _ = os.path.splitext(name) 136 self.Add(cert, name) 137 138 ALL_CERTS = CertDB() 139 140 141 def CertFromPKCS7(data, filename): 142 """Read the cert out of a PKCS#7-format file (which is what is 143 stored in a signed .apk).""" 144 Push(filename + ":") 145 try: 146 p = common.Run(["openssl", "pkcs7", 147 "-inform", "DER", 148 "-outform", "PEM", 149 "-print_certs"], 150 stdin=subprocess.PIPE, 151 stdout=subprocess.PIPE) 152 out, err = p.communicate(data) 153 if err and not err.strip(): 154 AddProblem("error reading cert:\n" + err) 155 return None 156 157 cert = common.ParseCertificate(out) 158 if not cert: 159 AddProblem("error parsing cert output") 160 return None 161 return cert 162 finally: 163 Pop() 164 165 166 class APK(object): 167 def __init__(self, full_filename, filename): 168 self.filename = filename 169 self.certs = None 170 self.shared_uid = None 171 self.package = None 172 173 Push(filename+":") 174 try: 175 self.RecordCerts(full_filename) 176 self.ReadManifest(full_filename) 177 finally: 178 Pop() 179 180 def RecordCerts(self, full_filename): 181 out = set() 182 try: 183 f = open(full_filename) 184 apk = zipfile.ZipFile(f, "r") 185 pkcs7 = None 186 for info in apk.infolist(): 187 if info.filename.startswith("META-INF/") and \ 188 (info.filename.endswith(".DSA") or info.filename.endswith(".RSA")): 189 pkcs7 = apk.read(info.filename) 190 cert = CertFromPKCS7(pkcs7, info.filename) 191 out.add(cert) 192 ALL_CERTS.Add(cert) 193 if not pkcs7: 194 AddProblem("no signature") 195 finally: 196 f.close() 197 self.certs = frozenset(out) 198 199 def ReadManifest(self, full_filename): 200 p = common.Run(["aapt", "dump", "xmltree", full_filename, 201 "AndroidManifest.xml"], 202 stdout=subprocess.PIPE) 203 manifest, err = p.communicate() 204 if err: 205 AddProblem("failed to read manifest") 206 return 207 208 self.shared_uid = None 209 self.package = None 210 211 for line in manifest.split("\n"): 212 line = line.strip() 213 m = re.search(r'A: (\S*?)(?:\(0x[0-9a-f]+\))?="(.*?)" \(Raw', line) 214 if m: 215 name = m.group(1) 216 if name == "android:sharedUserId": 217 if self.shared_uid is not None: 218 AddProblem("multiple sharedUserId declarations") 219 self.shared_uid = m.group(2) 220 elif name == "package": 221 if self.package is not None: 222 AddProblem("multiple package declarations") 223 self.package = m.group(2) 224 225 if self.package is None: 226 AddProblem("no package declaration") 227 228 229 class TargetFiles(object): 230 def __init__(self): 231 self.max_pkg_len = 30 232 self.max_fn_len = 20 233 self.apks = None 234 self.apks_by_basename = None 235 self.certmap = None 236 237 def LoadZipFile(self, filename): 238 # First read the APK certs file to figure out whether there are compressed 239 # APKs in the archive. If we do have compressed APKs in the archive, then we 240 # must decompress them individually before we perform any analysis. 241 242 # This is the list of wildcards of files we extract from |filename|. 243 apk_extensions = ['*.apk'] 244 245 self.certmap, compressed_extension = common.ReadApkCerts(zipfile.ZipFile(filename, "r")) 246 if compressed_extension: 247 apk_extensions.append("*.apk" + compressed_extension) 248 249 d, z = common.UnzipTemp(filename, apk_extensions) 250 try: 251 self.apks = {} 252 self.apks_by_basename = {} 253 for dirpath, _, filenames in os.walk(d): 254 for fn in filenames: 255 # Decompress compressed APKs before we begin processing them. 256 if compressed_extension and fn.endswith(compressed_extension): 257 # First strip the compressed extension from the file. 258 uncompressed_fn = fn[:-len(compressed_extension)] 259 260 # Decompress the compressed file to the output file. 261 common.Gunzip(os.path.join(dirpath, fn), 262 os.path.join(dirpath, uncompressed_fn)) 263 264 # Finally, delete the compressed file and use the uncompressed file 265 # for further processing. Note that the deletion is not strictly required, 266 # but is done here to ensure that we're not using too much space in 267 # the temporary directory. 268 os.remove(os.path.join(dirpath, fn)) 269 fn = uncompressed_fn 270 271 272 if fn.endswith(".apk"): 273 fullname = os.path.join(dirpath, fn) 274 displayname = fullname[len(d)+1:] 275 apk = APK(fullname, displayname) 276 self.apks[apk.filename] = apk 277 self.apks_by_basename[os.path.basename(apk.filename)] = apk 278 279 self.max_pkg_len = max(self.max_pkg_len, len(apk.package)) 280 self.max_fn_len = max(self.max_fn_len, len(apk.filename)) 281 finally: 282 shutil.rmtree(d) 283 284 z.close() 285 286 def CheckSharedUids(self): 287 """Look for any instances where packages signed with different 288 certs request the same sharedUserId.""" 289 apks_by_uid = {} 290 for apk in self.apks.itervalues(): 291 if apk.shared_uid: 292 apks_by_uid.setdefault(apk.shared_uid, []).append(apk) 293 294 for uid in sorted(apks_by_uid.keys()): 295 apks = apks_by_uid[uid] 296 for apk in apks[1:]: 297 if apk.certs != apks[0].certs: 298 break 299 else: 300 # all packages have the same set of certs; this uid is fine. 301 continue 302 303 AddProblem("different cert sets for packages with uid %s" % (uid,)) 304 305 print "uid %s is shared by packages with different cert sets:" % (uid,) 306 for apk in apks: 307 print "%-*s [%s]" % (self.max_pkg_len, apk.package, apk.filename) 308 for cert in apk.certs: 309 print " ", ALL_CERTS.Get(cert) 310 print 311 312 def CheckExternalSignatures(self): 313 for apk_filename, certname in self.certmap.iteritems(): 314 if certname == "EXTERNAL": 315 # Apps marked EXTERNAL should be signed with the test key 316 # during development, then manually re-signed after 317 # predexopting. Consider it an error if this app is now 318 # signed with any key that is present in our tree. 319 apk = self.apks_by_basename[apk_filename] 320 name = ALL_CERTS.Get(apk.cert) 321 if not name.startswith("unknown "): 322 Push(apk.filename) 323 AddProblem("hasn't been signed with EXTERNAL cert") 324 Pop() 325 326 def PrintCerts(self): 327 """Display a table of packages grouped by cert.""" 328 by_cert = {} 329 for apk in self.apks.itervalues(): 330 for cert in apk.certs: 331 by_cert.setdefault(cert, []).append((apk.package, apk)) 332 333 order = [(-len(v), k) for (k, v) in by_cert.iteritems()] 334 order.sort() 335 336 for _, cert in order: 337 print "%s:" % (ALL_CERTS.Get(cert),) 338 apks = by_cert[cert] 339 apks.sort() 340 for _, apk in apks: 341 if apk.shared_uid: 342 print " %-*s %-*s [%s]" % (self.max_fn_len, apk.filename, 343 self.max_pkg_len, apk.package, 344 apk.shared_uid) 345 else: 346 print " %-*s %s" % (self.max_fn_len, apk.filename, apk.package) 347 print 348 349 def CompareWith(self, other): 350 """Look for instances where a given package that exists in both 351 self and other have different certs.""" 352 353 all_apks = set(self.apks.keys()) 354 all_apks.update(other.apks.keys()) 355 356 max_pkg_len = max(self.max_pkg_len, other.max_pkg_len) 357 358 by_certpair = {} 359 360 for i in all_apks: 361 if i in self.apks: 362 if i in other.apks: 363 # in both; should have same set of certs 364 if self.apks[i].certs != other.apks[i].certs: 365 by_certpair.setdefault((other.apks[i].certs, 366 self.apks[i].certs), []).append(i) 367 else: 368 print "%s [%s]: new APK (not in comparison target_files)" % ( 369 i, self.apks[i].filename) 370 else: 371 if i in other.apks: 372 print "%s [%s]: removed APK (only in comparison target_files)" % ( 373 i, other.apks[i].filename) 374 375 if by_certpair: 376 AddProblem("some APKs changed certs") 377 Banner("APK signing differences") 378 for (old, new), packages in sorted(by_certpair.items()): 379 for i, o in enumerate(old): 380 if i == 0: 381 print "was", ALL_CERTS.Get(o) 382 else: 383 print " ", ALL_CERTS.Get(o) 384 for i, n in enumerate(new): 385 if i == 0: 386 print "now", ALL_CERTS.Get(n) 387 else: 388 print " ", ALL_CERTS.Get(n) 389 for i in sorted(packages): 390 old_fn = other.apks[i].filename 391 new_fn = self.apks[i].filename 392 if old_fn == new_fn: 393 print " %-*s [%s]" % (max_pkg_len, i, old_fn) 394 else: 395 print " %-*s [was: %s; now: %s]" % (max_pkg_len, i, 396 old_fn, new_fn) 397 print 398 399 400 def main(argv): 401 def option_handler(o, a): 402 if o in ("-c", "--compare_with"): 403 OPTIONS.compare_with = a 404 elif o in ("-l", "--local_cert_dirs"): 405 OPTIONS.local_cert_dirs = [i.strip() for i in a.split(",")] 406 elif o in ("-t", "--text"): 407 OPTIONS.text = True 408 else: 409 return False 410 return True 411 412 args = common.ParseOptions(argv, __doc__, 413 extra_opts="c:l:t", 414 extra_long_opts=["compare_with=", 415 "local_cert_dirs="], 416 extra_option_handler=option_handler) 417 418 if len(args) != 1: 419 common.Usage(__doc__) 420 sys.exit(1) 421 422 ALL_CERTS.FindLocalCerts() 423 424 Push("input target_files:") 425 try: 426 target_files = TargetFiles() 427 target_files.LoadZipFile(args[0]) 428 finally: 429 Pop() 430 431 compare_files = None 432 if OPTIONS.compare_with: 433 Push("comparison target_files:") 434 try: 435 compare_files = TargetFiles() 436 compare_files.LoadZipFile(OPTIONS.compare_with) 437 finally: 438 Pop() 439 440 if OPTIONS.text or not compare_files: 441 Banner("target files") 442 target_files.PrintCerts() 443 target_files.CheckSharedUids() 444 target_files.CheckExternalSignatures() 445 if compare_files: 446 if OPTIONS.text: 447 Banner("comparison files") 448 compare_files.PrintCerts() 449 target_files.CompareWith(compare_files) 450 451 if PROBLEMS: 452 print "%d problem(s) found:\n" % (len(PROBLEMS),) 453 for p in PROBLEMS: 454 print p 455 return 1 456 457 return 0 458 459 460 if __name__ == '__main__': 461 try: 462 r = main(sys.argv[1:]) 463 sys.exit(r) 464 except common.ExternalError as e: 465 print 466 print " ERROR: %s" % (e,) 467 print 468 sys.exit(1) 469