Home | History | Annotate | Download | only in scripts
      1 #!/usr/bin/env python
      2 # Copyright (c) 2014 The Chromium Authors. All rights reserved.
      3 # Use of this source code is governed by a BSD-style license that can be
      4 # found in the LICENSE file.
      5 
      6 """
      7 This utility takes a JSON input that describes a CRLSet and produces a
      8 CRLSet from it.
      9 
     10 The input is taken on stdin and is a dict with the following keys:
     11   - BlockedBySPKI: An array of strings, where each string is a filename
     12       containing a PEM certificate, from which an SPKI will be extracted.
     13   - BlockedByHash: A dict of string to an array of ints, where the string is
     14       a filename containing a PEM format certificate, and the ints are the
     15       serial numbers. The listed serial numbers will be blocked when issued by
     16       the given certificate.
     17 
     18 For example:
     19 
     20 {
     21   "BlockedBySPKI": ["/tmp/blocked-certificate"],
     22   "BlockedByHash": {
     23     "/tmp/intermediate-certificate": [1, 2, 3]
     24   }
     25 }
     26 """
     27 
import base64
import hashlib
import json
import optparse
import struct
import sys
     33 
     34 
     35 def _pem_cert_to_binary(pem_filename):
     36   """Decodes the first PEM-encoded certificate in a given file into binary
     37 
     38   Args:
     39     pem_filename: A filename that contains a PEM-encoded certificate. It may
     40         contain additional data (keys, textual representation) which will be
     41         ignored
     42 
     43   Returns:
     44     A byte array containing the decoded certificate data
     45   """
     46   base64 = ""
     47   started = False
     48 
     49   with open(pem_filename, 'r') as pem_file:
     50     for line in pem_file:
     51       if not started:
     52         if line.startswith('-----BEGIN CERTIFICATE'):
     53           started = True
     54       else:
     55         if line.startswith('-----END CERTIFICATE'):
     56           break
     57         base64 += line[:-1].strip()
     58 
     59   return base64.decode('base64')
     60 
     61 
     62 def _parse_asn1_element(der_bytes):
     63   """Parses a DER-encoded tag/Length/Value into its component parts
     64 
     65   Args:
     66     der_bytes: A DER-encoded ASN.1 data type
     67 
     68   Returns:
     69     A tuple of the ASN.1 tag value, the length of the ASN.1 header that was
     70     read, the sequence of bytes for the value, and then any data from der_bytes
     71     that was not part of the tag/Length/Value.
     72   """
     73   tag = ord(der_bytes[0])
     74   length = ord(der_bytes[1])
     75   header_length = 2
     76 
     77   if length & 0x80:
     78     num_length_bytes = length & 0x7f
     79     length = 0
     80     for i in xrange(2, 2 + num_length_bytes):
     81       length <<= 8
     82       length += ord(der_bytes[i])
     83     header_length = 2 + num_length_bytes
     84 
     85   contents = der_bytes[:header_length + length]
     86   rest = der_bytes[header_length + length:]
     87 
     88   return (tag, header_length, contents, rest)
     89 
     90 
     91 class ASN1Iterator(object):
     92   """Iterator that parses and iterates through a ASN.1 DER structure"""
     93 
     94   def __init__(self, contents):
     95     self._tag = 0
     96     self._header_length = 0
     97     self._rest = None
     98     self._contents = contents
     99     self.step_into()
    100 
    101   def step_into(self):
    102     """Begins processing the inner contents of the next ASN.1 element"""
    103     (self._tag, self._header_length, self._contents, self._rest) = (
    104         _parse_asn1_element(self._contents[self._header_length:]))
    105 
    106   def step_over(self):
    107     """Skips/ignores the next ASN.1 element"""
    108     (self._tag, self._header_length, self._contents, self._rest) = (
    109         _parse_asn1_element(self._rest))
    110 
    111   def tag(self):
    112     """Returns the ASN.1 tag of the current element"""
    113     return self._tag
    114 
    115   def contents(self):
    116     """Returns the raw data of the current element"""
    117     return self._contents
    118 
    119 
    120 def _der_cert_to_spki(der_bytes):
    121   """Returns the subjectPublicKeyInfo of a DER-encoded certificate
    122 
    123   Args:
    124     der_bytes: A DER-encoded certificate (RFC 5280)
    125 
    126   Returns:
    127     A byte array containing the subjectPublicKeyInfo
    128   """
    129   iterator = ASN1Iterator(der_bytes)
    130   iterator.step_into()  # enter certificate structure
    131   iterator.step_into()  # enter TBSCertificate
    132   iterator.step_over()  # over version
    133   iterator.step_over()  # over serial
    134   iterator.step_over()  # over signature algorithm
    135   iterator.step_over()  # over issuer name
    136   iterator.step_over()  # over validity
    137   iterator.step_over()  # over subject name
    138   return iterator.contents()
    139 
    140 
    141 def pem_cert_file_to_spki_hash(pem_filename):
    142   """Gets the SHA-256 hash of the subjectPublicKeyInfo of a cert in a file
    143 
    144   Args:
    145     pem_filename: A file containing a PEM-encoded certificate.
    146 
    147   Returns:
    148     The SHA-256 hash of the first certificate in the file, as a byte sequence
    149   """
    150   return hashlib.sha256(
    151     _der_cert_to_spki(_pem_cert_to_binary(pem_filename))).digest()
    152 
    153 
    154 def main():
    155   parser = optparse.OptionParser(description=sys.modules[__name__].__doc__)
    156   parser.add_option('-o', '--output',
    157                     help='Specifies the output file. The default is stdout.')
    158   options, _ = parser.parse_args()
    159   outfile = sys.stdout
    160   if options.output and options.output != '-':
    161     outfile = open(options.output, 'wb')
    162 
    163   config = json.load(sys.stdin)
    164   blocked_spkis = [
    165       pem_cert_file_to_spki_hash(pem_file).encode('base64').strip()
    166       for pem_file in config.get('BlockedBySPKI', [])]
    167   parents = {
    168     pem_cert_file_to_spki_hash(pem_file): serials
    169     for pem_file, serials in config.get('BlockedByHash', {}).iteritems()
    170   }
    171   header_json = {
    172       'Version': 0,
    173       'ContentType': 'CRLSet',
    174       'Sequence': 0,
    175       'DeltaFrom': 0,
    176       'NumParents': len(parents),
    177       'BlockedSPKIs': blocked_spkis,
    178   }
    179   header = json.dumps(header_json)
    180   outfile.write(struct.pack('<H', len(header)))
    181   outfile.write(header)
    182   for spki, serials in sorted(parents.iteritems()):
    183     outfile.write(spki)
    184     outfile.write(struct.pack('<I', len(serials)))
    185     for serial in serials:
    186       raw_serial = []
    187       if not serial:
    188         raw_serial = ['\x00']
    189       else:
    190         while serial:
    191           raw_serial.insert(0, chr(serial & 0xff))
    192           serial >>= 8
    193 
    194     outfile.write(struct.pack('<B', len(raw_serial)))
    195     outfile.write(''.join(raw_serial))
    196   return 0
    197 
    198 
    199 if __name__ == '__main__':
    200   sys.exit(main())
    201