Home | History | Annotate | Download | only in releasetools
      1 #!/usr/bin/env python
      2 #
      3 # Copyright (C) 2009 The Android Open Source Project
      4 #
      5 # Licensed under the Apache License, Version 2.0 (the "License");
      6 # you may not use this file except in compliance with the License.
      7 # You may obtain a copy of the License at
      8 #
      9 #      http://www.apache.org/licenses/LICENSE-2.0
     10 #
     11 # Unless required by applicable law or agreed to in writing, software
     12 # distributed under the License is distributed on an "AS IS" BASIS,
     13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     14 # See the License for the specific language governing permissions and
     15 # limitations under the License.
     16 
     17 """
     18 Check the signatures of all APKs in a target_files .zip file.  With
     19 -c, compare the signatures of each package to the ones in a separate
     20 target_files (usually a previously distributed build for the same
     21 device) and flag any changes.
     22 
     23 Usage:  check_target_file_signatures [flags] target_files
     24 
     25   -c  (--compare_with)  <other_target_files>
     26       Look for compatibility problems between the two sets of target
     27       files (eg., packages whose keys have changed).
     28 
     29   -l  (--local_cert_dirs)  <dir,dir,...>
     30       Comma-separated list of top-level directories to scan for
     31       .x509.pem files.  Defaults to "vendor,build".  Where cert files
     32       can be found that match APK signatures, the filename will be
     33       printed as the cert name, otherwise a hash of the cert plus its
     34       subject string will be printed instead.
     35 
     36   -t  (--text)
     37       Dump the certificate information for both packages in comparison
     38       mode (this output is normally suppressed).
     39 
     40 """
     41 
     42 import logging
     43 import os
     44 import re
     45 import subprocess
     46 import sys
     47 import zipfile
     48 
     49 import common
     50 
     51 if sys.hexversion < 0x02070000:
     52   print >> sys.stderr, "Python 2.7 or newer is required."
     53   sys.exit(1)
     54 
     55 
     56 logger = logging.getLogger(__name__)
     57 
     58 # Work around a bug in Python's zipfile module that prevents opening of zipfiles
     59 # if any entry has an extra field of between 1 and 3 bytes (which is common with
     60 # zipaligned APKs). This overrides the ZipInfo._decodeExtra() method (which
     61 # contains the bug) with an empty version (since we don't need to decode the
     62 # extra field anyway).
     63 # Issue #14315: https://bugs.python.org/issue14315, fixed in Python 2.7.8 and
     64 # Python 3.5.0 alpha 1.
     65 class MyZipInfo(zipfile.ZipInfo):
     66   def _decodeExtra(self):
     67     pass
     68 zipfile.ZipInfo = MyZipInfo
     69 
     70 OPTIONS = common.OPTIONS
     71 
     72 OPTIONS.text = False
     73 OPTIONS.compare_with = None
     74 OPTIONS.local_cert_dirs = ("vendor", "build")
     75 
     76 PROBLEMS = []
     77 PROBLEM_PREFIX = []
     78 
     79 def AddProblem(msg):
     80   PROBLEMS.append(" ".join(PROBLEM_PREFIX) + " " + msg)
     81 def Push(msg):
     82   PROBLEM_PREFIX.append(msg)
     83 def Pop():
     84   PROBLEM_PREFIX.pop()
     85 
     86 
     87 def Banner(msg):
     88   print "-" * 70
     89   print "  ", msg
     90   print "-" * 70
     91 
     92 
     93 def GetCertSubject(cert):
     94   p = common.Run(["openssl", "x509", "-inform", "DER", "-text"],
     95                  stdin=subprocess.PIPE,
     96                  stdout=subprocess.PIPE)
     97   out, err = p.communicate(cert)
     98   if err and not err.strip():
     99     return "(error reading cert subject)"
    100   for line in out.split("\n"):
    101     line = line.strip()
    102     if line.startswith("Subject:"):
    103       return line[8:].strip()
    104   return "(unknown cert subject)"
    105 
    106 
    107 class CertDB(object):
    108   def __init__(self):
    109     self.certs = {}
    110 
    111   def Add(self, cert, name=None):
    112     if cert in self.certs:
    113       if name:
    114         self.certs[cert] = self.certs[cert] + "," + name
    115     else:
    116       if name is None:
    117         name = "unknown cert %s (%s)" % (common.sha1(cert).hexdigest()[:12],
    118                                          GetCertSubject(cert))
    119       self.certs[cert] = name
    120 
    121   def Get(self, cert):
    122     """Return the name for a given cert."""
    123     return self.certs.get(cert, None)
    124 
    125   def FindLocalCerts(self):
    126     to_load = []
    127     for top in OPTIONS.local_cert_dirs:
    128       for dirpath, _, filenames in os.walk(top):
    129         certs = [os.path.join(dirpath, i)
    130                  for i in filenames if i.endswith(".x509.pem")]
    131         if certs:
    132           to_load.extend(certs)
    133 
    134     for i in to_load:
    135       f = open(i)
    136       cert = common.ParseCertificate(f.read())
    137       f.close()
    138       name, _ = os.path.splitext(i)
    139       name, _ = os.path.splitext(name)
    140       self.Add(cert, name)
    141 
    142 ALL_CERTS = CertDB()
    143 
    144 
    145 def CertFromPKCS7(data, filename):
    146   """Read the cert out of a PKCS#7-format file (which is what is
    147   stored in a signed .apk)."""
    148   Push(filename + ":")
    149   try:
    150     p = common.Run(["openssl", "pkcs7",
    151                     "-inform", "DER",
    152                     "-outform", "PEM",
    153                     "-print_certs"],
    154                    stdin=subprocess.PIPE,
    155                    stdout=subprocess.PIPE)
    156     out, err = p.communicate(data)
    157     if err and not err.strip():
    158       AddProblem("error reading cert:\n" + err)
    159       return None
    160 
    161     cert = common.ParseCertificate(out)
    162     if not cert:
    163       AddProblem("error parsing cert output")
    164       return None
    165     return cert
    166   finally:
    167     Pop()
    168 
    169 
    170 class APK(object):
    171 
    172   def __init__(self, full_filename, filename):
    173     self.filename = filename
    174     self.certs = None
    175     self.shared_uid = None
    176     self.package = None
    177 
    178     Push(filename+":")
    179     try:
    180       self.RecordCerts(full_filename)
    181       self.ReadManifest(full_filename)
    182     finally:
    183       Pop()
    184 
    185   def RecordCerts(self, full_filename):
    186     out = set()
    187     try:
    188       f = open(full_filename)
    189       apk = zipfile.ZipFile(f, "r")
    190       pkcs7 = None
    191       for info in apk.infolist():
    192         if info.filename.startswith("META-INF/") and \
    193            (info.filename.endswith(".DSA") or info.filename.endswith(".RSA")):
    194           pkcs7 = apk.read(info.filename)
    195           cert = CertFromPKCS7(pkcs7, info.filename)
    196           out.add(cert)
    197           ALL_CERTS.Add(cert)
    198       if not pkcs7:
    199         AddProblem("no signature")
    200     finally:
    201       f.close()
    202       self.certs = frozenset(out)
    203 
    204   def ReadManifest(self, full_filename):
    205     p = common.Run(["aapt", "dump", "xmltree", full_filename,
    206                     "AndroidManifest.xml"],
    207                    stdout=subprocess.PIPE)
    208     manifest, err = p.communicate()
    209     if err:
    210       AddProblem("failed to read manifest")
    211       return
    212 
    213     self.shared_uid = None
    214     self.package = None
    215 
    216     for line in manifest.split("\n"):
    217       line = line.strip()
    218       m = re.search(r'A: (\S*?)(?:\(0x[0-9a-f]+\))?="(.*?)" \(Raw', line)
    219       if m:
    220         name = m.group(1)
    221         if name == "android:sharedUserId":
    222           if self.shared_uid is not None:
    223             AddProblem("multiple sharedUserId declarations")
    224           self.shared_uid = m.group(2)
    225         elif name == "package":
    226           if self.package is not None:
    227             AddProblem("multiple package declarations")
    228           self.package = m.group(2)
    229 
    230     if self.package is None:
    231       AddProblem("no package declaration")
    232 
    233 
    234 class TargetFiles(object):
    235   def __init__(self):
    236     self.max_pkg_len = 30
    237     self.max_fn_len = 20
    238     self.apks = None
    239     self.apks_by_basename = None
    240     self.certmap = None
    241 
    242   def LoadZipFile(self, filename):
    243     # First read the APK certs file to figure out whether there are compressed
    244     # APKs in the archive. If we do have compressed APKs in the archive, then we
    245     # must decompress them individually before we perform any analysis.
    246 
    247     # This is the list of wildcards of files we extract from |filename|.
    248     apk_extensions = ['*.apk', '*.apex']
    249 
    250     self.certmap, compressed_extension = common.ReadApkCerts(
    251         zipfile.ZipFile(filename))
    252     if compressed_extension:
    253       apk_extensions.append('*.apk' + compressed_extension)
    254 
    255     d = common.UnzipTemp(filename, apk_extensions)
    256     self.apks = {}
    257     self.apks_by_basename = {}
    258     for dirpath, _, filenames in os.walk(d):
    259       for fn in filenames:
    260         # Decompress compressed APKs before we begin processing them.
    261         if compressed_extension and fn.endswith(compressed_extension):
    262           # First strip the compressed extension from the file.
    263           uncompressed_fn = fn[:-len(compressed_extension)]
    264 
    265           # Decompress the compressed file to the output file.
    266           common.Gunzip(os.path.join(dirpath, fn),
    267                         os.path.join(dirpath, uncompressed_fn))
    268 
    269           # Finally, delete the compressed file and use the uncompressed file
    270           # for further processing. Note that the deletion is not strictly
    271           # required, but is done here to ensure that we're not using too much
    272           # space in the temporary directory.
    273           os.remove(os.path.join(dirpath, fn))
    274           fn = uncompressed_fn
    275 
    276         if fn.endswith(('.apk', '.apex')):
    277           fullname = os.path.join(dirpath, fn)
    278           displayname = fullname[len(d)+1:]
    279           apk = APK(fullname, displayname)
    280           self.apks[apk.filename] = apk
    281           self.apks_by_basename[os.path.basename(apk.filename)] = apk
    282 
    283           self.max_pkg_len = max(self.max_pkg_len, len(apk.package))
    284           self.max_fn_len = max(self.max_fn_len, len(apk.filename))
    285 
    286   def CheckSharedUids(self):
    287     """Look for any instances where packages signed with different
    288     certs request the same sharedUserId."""
    289     apks_by_uid = {}
    290     for apk in self.apks.itervalues():
    291       if apk.shared_uid:
    292         apks_by_uid.setdefault(apk.shared_uid, []).append(apk)
    293 
    294     for uid in sorted(apks_by_uid):
    295       apks = apks_by_uid[uid]
    296       for apk in apks[1:]:
    297         if apk.certs != apks[0].certs:
    298           break
    299       else:
    300         # all packages have the same set of certs; this uid is fine.
    301         continue
    302 
    303       AddProblem("different cert sets for packages with uid %s" % (uid,))
    304 
    305       print "uid %s is shared by packages with different cert sets:" % (uid,)
    306       for apk in apks:
    307         print "%-*s  [%s]" % (self.max_pkg_len, apk.package, apk.filename)
    308         for cert in apk.certs:
    309           print "   ", ALL_CERTS.Get(cert)
    310       print
    311 
    312   def CheckExternalSignatures(self):
    313     for apk_filename, certname in self.certmap.iteritems():
    314       if certname == "EXTERNAL":
    315         # Apps marked EXTERNAL should be signed with the test key
    316         # during development, then manually re-signed after
    317         # predexopting.  Consider it an error if this app is now
    318         # signed with any key that is present in our tree.
    319         apk = self.apks_by_basename[apk_filename]
    320         name = ALL_CERTS.Get(apk.cert)
    321         if not name.startswith("unknown "):
    322           Push(apk.filename)
    323           AddProblem("hasn't been signed with EXTERNAL cert")
    324           Pop()
    325 
    326   def PrintCerts(self):
    327     """Display a table of packages grouped by cert."""
    328     by_cert = {}
    329     for apk in self.apks.itervalues():
    330       for cert in apk.certs:
    331         by_cert.setdefault(cert, []).append((apk.package, apk))
    332 
    333     order = [(-len(v), k) for (k, v) in by_cert.iteritems()]
    334     order.sort()
    335 
    336     for _, cert in order:
    337       print "%s:" % (ALL_CERTS.Get(cert),)
    338       apks = by_cert[cert]
    339       apks.sort()
    340       for _, apk in apks:
    341         if apk.shared_uid:
    342           print "  %-*s  %-*s  [%s]" % (self.max_fn_len, apk.filename,
    343                                         self.max_pkg_len, apk.package,
    344                                         apk.shared_uid)
    345         else:
    346           print "  %-*s  %s" % (self.max_fn_len, apk.filename, apk.package)
    347       print
    348 
    349   def CompareWith(self, other):
    350     """Look for instances where a given package that exists in both
    351     self and other have different certs."""
    352 
    353     all_apks = set(self.apks.keys())
    354     all_apks.update(other.apks.keys())
    355 
    356     max_pkg_len = max(self.max_pkg_len, other.max_pkg_len)
    357 
    358     by_certpair = {}
    359 
    360     for i in all_apks:
    361       if i in self.apks:
    362         if i in other.apks:
    363           # in both; should have same set of certs
    364           if self.apks[i].certs != other.apks[i].certs:
    365             by_certpair.setdefault((other.apks[i].certs,
    366                                     self.apks[i].certs), []).append(i)
    367         else:
    368           print "%s [%s]: new APK (not in comparison target_files)" % (
    369               i, self.apks[i].filename)
    370       else:
    371         if i in other.apks:
    372           print "%s [%s]: removed APK (only in comparison target_files)" % (
    373               i, other.apks[i].filename)
    374 
    375     if by_certpair:
    376       AddProblem("some APKs changed certs")
    377       Banner("APK signing differences")
    378       for (old, new), packages in sorted(by_certpair.items()):
    379         for i, o in enumerate(old):
    380           if i == 0:
    381             print "was", ALL_CERTS.Get(o)
    382           else:
    383             print "   ", ALL_CERTS.Get(o)
    384         for i, n in enumerate(new):
    385           if i == 0:
    386             print "now", ALL_CERTS.Get(n)
    387           else:
    388             print "   ", ALL_CERTS.Get(n)
    389         for i in sorted(packages):
    390           old_fn = other.apks[i].filename
    391           new_fn = self.apks[i].filename
    392           if old_fn == new_fn:
    393             print "  %-*s  [%s]" % (max_pkg_len, i, old_fn)
    394           else:
    395             print "  %-*s  [was: %s; now: %s]" % (max_pkg_len, i,
    396                                                   old_fn, new_fn)
    397         print
    398 
    399 
    400 def main(argv):
    401   def option_handler(o, a):
    402     if o in ("-c", "--compare_with"):
    403       OPTIONS.compare_with = a
    404     elif o in ("-l", "--local_cert_dirs"):
    405       OPTIONS.local_cert_dirs = [i.strip() for i in a.split(",")]
    406     elif o in ("-t", "--text"):
    407       OPTIONS.text = True
    408     else:
    409       return False
    410     return True
    411 
    412   args = common.ParseOptions(argv, __doc__,
    413                              extra_opts="c:l:t",
    414                              extra_long_opts=["compare_with=",
    415                                               "local_cert_dirs="],
    416                              extra_option_handler=option_handler)
    417 
    418   if len(args) != 1:
    419     common.Usage(__doc__)
    420     sys.exit(1)
    421 
    422   common.InitLogging()
    423 
    424   ALL_CERTS.FindLocalCerts()
    425 
    426   Push("input target_files:")
    427   try:
    428     target_files = TargetFiles()
    429     target_files.LoadZipFile(args[0])
    430   finally:
    431     Pop()
    432 
    433   compare_files = None
    434   if OPTIONS.compare_with:
    435     Push("comparison target_files:")
    436     try:
    437       compare_files = TargetFiles()
    438       compare_files.LoadZipFile(OPTIONS.compare_with)
    439     finally:
    440       Pop()
    441 
    442   if OPTIONS.text or not compare_files:
    443     Banner("target files")
    444     target_files.PrintCerts()
    445   target_files.CheckSharedUids()
    446   target_files.CheckExternalSignatures()
    447   if compare_files:
    448     if OPTIONS.text:
    449       Banner("comparison files")
    450       compare_files.PrintCerts()
    451     target_files.CompareWith(compare_files)
    452 
    453   if PROBLEMS:
    454     print "%d problem(s) found:\n" % (len(PROBLEMS),)
    455     for p in PROBLEMS:
    456       print p
    457     return 1
    458 
    459   return 0
    460 
    461 
    462 if __name__ == '__main__':
    463   try:
    464     r = main(sys.argv[1:])
    465     sys.exit(r)
    466   except common.ExternalError as e:
    467     print
    468     print "   ERROR: %s" % (e,)
    469     print
    470     sys.exit(1)
    471   finally:
    472     common.Cleanup()
    473