Home | History | Annotate | Download | only in releasetools
      1 #!/usr/bin/env python
      2 #
      3 # Copyright (C) 2009 The Android Open Source Project
      4 #
      5 # Licensed under the Apache License, Version 2.0 (the "License");
      6 # you may not use this file except in compliance with the License.
      7 # You may obtain a copy of the License at
      8 #
      9 #      http://www.apache.org/licenses/LICENSE-2.0
     10 #
     11 # Unless required by applicable law or agreed to in writing, software
     12 # distributed under the License is distributed on an "AS IS" BASIS,
     13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     14 # See the License for the specific language governing permissions and
     15 # limitations under the License.
     16 
     17 """
     18 Check the signatures of all APKs in a target_files .zip file.  With
     19 -c, compare the signatures of each package to the ones in a separate
     20 target_files (usually a previously distributed build for the same
     21 device) and flag any changes.
     22 
     23 Usage:  check_target_file_signatures [flags] target_files
     24 
     25   -c  (--compare_with)  <other_target_files>
     26       Look for compatibility problems between the two sets of target
     27       files (eg., packages whose keys have changed).
     28 
     29   -l  (--local_cert_dirs)  <dir,dir,...>
     30       Comma-separated list of top-level directories to scan for
     31       .x509.pem files.  Defaults to "vendor,build".  Where cert files
     32       can be found that match APK signatures, the filename will be
     33       printed as the cert name, otherwise a hash of the cert plus its
     34       subject string will be printed instead.
     35 
     36   -t  (--text)
     37       Dump the certificate information for both packages in comparison
     38       mode (this output is normally suppressed).
     39 
     40 """
     41 
     42 import os
     43 import re
     44 import subprocess
     45 import sys
     46 import zipfile
     47 
     48 import common
     49 
     50 if sys.hexversion < 0x02070000:
     51   print >> sys.stderr, "Python 2.7 or newer is required."
     52   sys.exit(1)
     53 
     54 
     55 # Work around a bug in Python's zipfile module that prevents opening of zipfiles
     56 # if any entry has an extra field of between 1 and 3 bytes (which is common with
     57 # zipaligned APKs). This overrides the ZipInfo._decodeExtra() method (which
     58 # contains the bug) with an empty version (since we don't need to decode the
     59 # extra field anyway).
     60 # Issue #14315: https://bugs.python.org/issue14315, fixed in Python 2.7.8 and
     61 # Python 3.5.0 alpha 1.
     62 class MyZipInfo(zipfile.ZipInfo):
     63   def _decodeExtra(self):
     64     pass
     65 zipfile.ZipInfo = MyZipInfo
     66 
# Option namespace shared with the common module. The attributes assigned
# below are this script's defaults; main()'s option handler overwrites them
# when the corresponding flag is given.
OPTIONS = common.OPTIONS

# -t / --text: set to True by the option handler.
OPTIONS.text = False
# -c / --compare_with: path of a second target_files to compare against.
OPTIONS.compare_with = None
# -l / --local_cert_dirs: directories scanned for .x509.pem files.
OPTIONS.local_cert_dirs = ("vendor", "build")

# Messages recorded by AddProblem(); main() prints them and returns 1 if the
# list is non-empty.
PROBLEMS = []
# Stack of context strings (managed by Push/Pop) that AddProblem() joins and
# prepends to every message.
PROBLEM_PREFIX = []
     75 
     76 def AddProblem(msg):
     77   PROBLEMS.append(" ".join(PROBLEM_PREFIX) + " " + msg)
     78 def Push(msg):
     79   PROBLEM_PREFIX.append(msg)
     80 def Pop():
     81   PROBLEM_PREFIX.pop()
     82 
     83 
     84 def Banner(msg):
     85   print "-" * 70
     86   print "  ", msg
     87   print "-" * 70
     88 
     89 
     90 def GetCertSubject(cert):
     91   p = common.Run(["openssl", "x509", "-inform", "DER", "-text"],
     92                  stdin=subprocess.PIPE,
     93                  stdout=subprocess.PIPE)
     94   out, err = p.communicate(cert)
     95   if err and not err.strip():
     96     return "(error reading cert subject)"
     97   for line in out.split("\n"):
     98     line = line.strip()
     99     if line.startswith("Subject:"):
    100       return line[8:].strip()
    101   return "(unknown cert subject)"
    102 
    103 
    104 class CertDB(object):
    105   def __init__(self):
    106     self.certs = {}
    107 
    108   def Add(self, cert, name=None):
    109     if cert in self.certs:
    110       if name:
    111         self.certs[cert] = self.certs[cert] + "," + name
    112     else:
    113       if name is None:
    114         name = "unknown cert %s (%s)" % (common.sha1(cert).hexdigest()[:12],
    115                                          GetCertSubject(cert))
    116       self.certs[cert] = name
    117 
    118   def Get(self, cert):
    119     """Return the name for a given cert."""
    120     return self.certs.get(cert, None)
    121 
    122   def FindLocalCerts(self):
    123     to_load = []
    124     for top in OPTIONS.local_cert_dirs:
    125       for dirpath, _, filenames in os.walk(top):
    126         certs = [os.path.join(dirpath, i)
    127                  for i in filenames if i.endswith(".x509.pem")]
    128         if certs:
    129           to_load.extend(certs)
    130 
    131     for i in to_load:
    132       f = open(i)
    133       cert = common.ParseCertificate(f.read())
    134       f.close()
    135       name, _ = os.path.splitext(i)
    136       name, _ = os.path.splitext(name)
    137       self.Add(cert, name)
    138 
# Module-level certificate registry: filled by CertDB.FindLocalCerts() and
# APK.RecordCerts(), and queried wherever a cert's display name is printed.
ALL_CERTS = CertDB()
    140 
    141 
    142 def CertFromPKCS7(data, filename):
    143   """Read the cert out of a PKCS#7-format file (which is what is
    144   stored in a signed .apk)."""
    145   Push(filename + ":")
    146   try:
    147     p = common.Run(["openssl", "pkcs7",
    148                     "-inform", "DER",
    149                     "-outform", "PEM",
    150                     "-print_certs"],
    151                    stdin=subprocess.PIPE,
    152                    stdout=subprocess.PIPE)
    153     out, err = p.communicate(data)
    154     if err and not err.strip():
    155       AddProblem("error reading cert:\n" + err)
    156       return None
    157 
    158     cert = common.ParseCertificate(out)
    159     if not cert:
    160       AddProblem("error parsing cert output")
    161       return None
    162     return cert
    163   finally:
    164     Pop()
    165 
    166 
    167 class APK(object):
    168   def __init__(self, full_filename, filename):
    169     self.filename = filename
    170     self.certs = None
    171     self.shared_uid = None
    172     self.package = None
    173 
    174     Push(filename+":")
    175     try:
    176       self.RecordCerts(full_filename)
    177       self.ReadManifest(full_filename)
    178     finally:
    179       Pop()
    180 
    181   def RecordCerts(self, full_filename):
    182     out = set()
    183     try:
    184       f = open(full_filename)
    185       apk = zipfile.ZipFile(f, "r")
    186       pkcs7 = None
    187       for info in apk.infolist():
    188         if info.filename.startswith("META-INF/") and \
    189            (info.filename.endswith(".DSA") or info.filename.endswith(".RSA")):
    190           pkcs7 = apk.read(info.filename)
    191           cert = CertFromPKCS7(pkcs7, info.filename)
    192           out.add(cert)
    193           ALL_CERTS.Add(cert)
    194       if not pkcs7:
    195         AddProblem("no signature")
    196     finally:
    197       f.close()
    198       self.certs = frozenset(out)
    199 
    200   def ReadManifest(self, full_filename):
    201     p = common.Run(["aapt", "dump", "xmltree", full_filename,
    202                     "AndroidManifest.xml"],
    203                    stdout=subprocess.PIPE)
    204     manifest, err = p.communicate()
    205     if err:
    206       AddProblem("failed to read manifest")
    207       return
    208 
    209     self.shared_uid = None
    210     self.package = None
    211 
    212     for line in manifest.split("\n"):
    213       line = line.strip()
    214       m = re.search(r'A: (\S*?)(?:\(0x[0-9a-f]+\))?="(.*?)" \(Raw', line)
    215       if m:
    216         name = m.group(1)
    217         if name == "android:sharedUserId":
    218           if self.shared_uid is not None:
    219             AddProblem("multiple sharedUserId declarations")
    220           self.shared_uid = m.group(2)
    221         elif name == "package":
    222           if self.package is not None:
    223             AddProblem("multiple package declarations")
    224           self.package = m.group(2)
    225 
    226     if self.package is None:
    227       AddProblem("no package declaration")
    228 
    229 
    230 class TargetFiles(object):
    231   def __init__(self):
    232     self.max_pkg_len = 30
    233     self.max_fn_len = 20
    234     self.apks = None
    235     self.apks_by_basename = None
    236     self.certmap = None
    237 
    238   def LoadZipFile(self, filename):
    239     # First read the APK certs file to figure out whether there are compressed
    240     # APKs in the archive. If we do have compressed APKs in the archive, then we
    241     # must decompress them individually before we perform any analysis.
    242 
    243     # This is the list of wildcards of files we extract from |filename|.
    244     apk_extensions = ['*.apk']
    245 
    246     self.certmap, compressed_extension = common.ReadApkCerts(
    247         zipfile.ZipFile(filename, "r"))
    248     if compressed_extension:
    249       apk_extensions.append("*.apk" + compressed_extension)
    250 
    251     d = common.UnzipTemp(filename, apk_extensions)
    252     self.apks = {}
    253     self.apks_by_basename = {}
    254     for dirpath, _, filenames in os.walk(d):
    255       for fn in filenames:
    256         # Decompress compressed APKs before we begin processing them.
    257         if compressed_extension and fn.endswith(compressed_extension):
    258           # First strip the compressed extension from the file.
    259           uncompressed_fn = fn[:-len(compressed_extension)]
    260 
    261           # Decompress the compressed file to the output file.
    262           common.Gunzip(os.path.join(dirpath, fn),
    263                         os.path.join(dirpath, uncompressed_fn))
    264 
    265           # Finally, delete the compressed file and use the uncompressed file
    266           # for further processing. Note that the deletion is not strictly
    267           # required, but is done here to ensure that we're not using too much
    268           # space in the temporary directory.
    269           os.remove(os.path.join(dirpath, fn))
    270           fn = uncompressed_fn
    271 
    272         if fn.endswith(".apk"):
    273           fullname = os.path.join(dirpath, fn)
    274           displayname = fullname[len(d)+1:]
    275           apk = APK(fullname, displayname)
    276           self.apks[apk.filename] = apk
    277           self.apks_by_basename[os.path.basename(apk.filename)] = apk
    278 
    279           self.max_pkg_len = max(self.max_pkg_len, len(apk.package))
    280           self.max_fn_len = max(self.max_fn_len, len(apk.filename))
    281 
    282   def CheckSharedUids(self):
    283     """Look for any instances where packages signed with different
    284     certs request the same sharedUserId."""
    285     apks_by_uid = {}
    286     for apk in self.apks.itervalues():
    287       if apk.shared_uid:
    288         apks_by_uid.setdefault(apk.shared_uid, []).append(apk)
    289 
    290     for uid in sorted(apks_by_uid):
    291       apks = apks_by_uid[uid]
    292       for apk in apks[1:]:
    293         if apk.certs != apks[0].certs:
    294           break
    295       else:
    296         # all packages have the same set of certs; this uid is fine.
    297         continue
    298 
    299       AddProblem("different cert sets for packages with uid %s" % (uid,))
    300 
    301       print "uid %s is shared by packages with different cert sets:" % (uid,)
    302       for apk in apks:
    303         print "%-*s  [%s]" % (self.max_pkg_len, apk.package, apk.filename)
    304         for cert in apk.certs:
    305           print "   ", ALL_CERTS.Get(cert)
    306       print
    307 
    308   def CheckExternalSignatures(self):
    309     for apk_filename, certname in self.certmap.iteritems():
    310       if certname == "EXTERNAL":
    311         # Apps marked EXTERNAL should be signed with the test key
    312         # during development, then manually re-signed after
    313         # predexopting.  Consider it an error if this app is now
    314         # signed with any key that is present in our tree.
    315         apk = self.apks_by_basename[apk_filename]
    316         name = ALL_CERTS.Get(apk.cert)
    317         if not name.startswith("unknown "):
    318           Push(apk.filename)
    319           AddProblem("hasn't been signed with EXTERNAL cert")
    320           Pop()
    321 
    322   def PrintCerts(self):
    323     """Display a table of packages grouped by cert."""
    324     by_cert = {}
    325     for apk in self.apks.itervalues():
    326       for cert in apk.certs:
    327         by_cert.setdefault(cert, []).append((apk.package, apk))
    328 
    329     order = [(-len(v), k) for (k, v) in by_cert.iteritems()]
    330     order.sort()
    331 
    332     for _, cert in order:
    333       print "%s:" % (ALL_CERTS.Get(cert),)
    334       apks = by_cert[cert]
    335       apks.sort()
    336       for _, apk in apks:
    337         if apk.shared_uid:
    338           print "  %-*s  %-*s  [%s]" % (self.max_fn_len, apk.filename,
    339                                         self.max_pkg_len, apk.package,
    340                                         apk.shared_uid)
    341         else:
    342           print "  %-*s  %s" % (self.max_fn_len, apk.filename, apk.package)
    343       print
    344 
    345   def CompareWith(self, other):
    346     """Look for instances where a given package that exists in both
    347     self and other have different certs."""
    348 
    349     all_apks = set(self.apks.keys())
    350     all_apks.update(other.apks.keys())
    351 
    352     max_pkg_len = max(self.max_pkg_len, other.max_pkg_len)
    353 
    354     by_certpair = {}
    355 
    356     for i in all_apks:
    357       if i in self.apks:
    358         if i in other.apks:
    359           # in both; should have same set of certs
    360           if self.apks[i].certs != other.apks[i].certs:
    361             by_certpair.setdefault((other.apks[i].certs,
    362                                     self.apks[i].certs), []).append(i)
    363         else:
    364           print "%s [%s]: new APK (not in comparison target_files)" % (
    365               i, self.apks[i].filename)
    366       else:
    367         if i in other.apks:
    368           print "%s [%s]: removed APK (only in comparison target_files)" % (
    369               i, other.apks[i].filename)
    370 
    371     if by_certpair:
    372       AddProblem("some APKs changed certs")
    373       Banner("APK signing differences")
    374       for (old, new), packages in sorted(by_certpair.items()):
    375         for i, o in enumerate(old):
    376           if i == 0:
    377             print "was", ALL_CERTS.Get(o)
    378           else:
    379             print "   ", ALL_CERTS.Get(o)
    380         for i, n in enumerate(new):
    381           if i == 0:
    382             print "now", ALL_CERTS.Get(n)
    383           else:
    384             print "   ", ALL_CERTS.Get(n)
    385         for i in sorted(packages):
    386           old_fn = other.apks[i].filename
    387           new_fn = self.apks[i].filename
    388           if old_fn == new_fn:
    389             print "  %-*s  [%s]" % (max_pkg_len, i, old_fn)
    390           else:
    391             print "  %-*s  [was: %s; now: %s]" % (max_pkg_len, i,
    392                                                   old_fn, new_fn)
    393         print
    394 
    395 
    396 def main(argv):
    397   def option_handler(o, a):
    398     if o in ("-c", "--compare_with"):
    399       OPTIONS.compare_with = a
    400     elif o in ("-l", "--local_cert_dirs"):
    401       OPTIONS.local_cert_dirs = [i.strip() for i in a.split(",")]
    402     elif o in ("-t", "--text"):
    403       OPTIONS.text = True
    404     else:
    405       return False
    406     return True
    407 
    408   args = common.ParseOptions(argv, __doc__,
    409                              extra_opts="c:l:t",
    410                              extra_long_opts=["compare_with=",
    411                                               "local_cert_dirs="],
    412                              extra_option_handler=option_handler)
    413 
    414   if len(args) != 1:
    415     common.Usage(__doc__)
    416     sys.exit(1)
    417 
    418   ALL_CERTS.FindLocalCerts()
    419 
    420   Push("input target_files:")
    421   try:
    422     target_files = TargetFiles()
    423     target_files.LoadZipFile(args[0])
    424   finally:
    425     Pop()
    426 
    427   compare_files = None
    428   if OPTIONS.compare_with:
    429     Push("comparison target_files:")
    430     try:
    431       compare_files = TargetFiles()
    432       compare_files.LoadZipFile(OPTIONS.compare_with)
    433     finally:
    434       Pop()
    435 
    436   if OPTIONS.text or not compare_files:
    437     Banner("target files")
    438     target_files.PrintCerts()
    439   target_files.CheckSharedUids()
    440   target_files.CheckExternalSignatures()
    441   if compare_files:
    442     if OPTIONS.text:
    443       Banner("comparison files")
    444       compare_files.PrintCerts()
    445     target_files.CompareWith(compare_files)
    446 
    447   if PROBLEMS:
    448     print "%d problem(s) found:\n" % (len(PROBLEMS),)
    449     for p in PROBLEMS:
    450       print p
    451     return 1
    452 
    453   return 0
    454 
    455 
    456 if __name__ == '__main__':
    457   try:
    458     r = main(sys.argv[1:])
    459     sys.exit(r)
    460   except common.ExternalError as e:
    461     print
    462     print "   ERROR: %s" % (e,)
    463     print
    464     sys.exit(1)
    465   finally:
    466     common.Cleanup()
    467