1 #!/usr/bin/env python 2 # Copyright (c) 2014 The Chromium Authors. All rights reserved. 3 # Use of this source code is governed by a BSD-style license that can be 4 # found in the LICENSE file. 5 6 """ 7 This utility takes a JSON input that describes a CRLSet and produces a 8 CRLSet from it. 9 10 The input is taken on stdin and is a dict with the following keys: 11 - BlockedBySPKI: An array of strings, where each string is a filename 12 containing a PEM certificate, from which an SPKI will be extracted. 13 - BlockedByHash: A dict of string to an array of ints, where the string is 14 a filename containing a PEM format certificate, and the ints are the 15 serial numbers. The listed serial numbers will be blocked when issued by 16 the given certificate. 17 18 For example: 19 20 { 21 "BlockedBySPKI": ["/tmp/blocked-certificate"], 22 "BlockedByHash": { 23 "/tmp/intermediate-certificate": [1, 2, 3] 24 } 25 } 26 """ 27 28 import hashlib 29 import json 30 import optparse 31 import struct 32 import sys 33 34 35 def _pem_cert_to_binary(pem_filename): 36 """Decodes the first PEM-encoded certificate in a given file into binary 37 38 Args: 39 pem_filename: A filename that contains a PEM-encoded certificate. It may 40 contain additional data (keys, textual representation) which will be 41 ignored 42 43 Returns: 44 A byte array containing the decoded certificate data 45 """ 46 base64 = "" 47 started = False 48 49 with open(pem_filename, 'r') as pem_file: 50 for line in pem_file: 51 if not started: 52 if line.startswith('-----BEGIN CERTIFICATE'): 53 started = True 54 else: 55 if line.startswith('-----END CERTIFICATE'): 56 break 57 base64 += line[:-1].strip() 58 59 return base64.decode('base64') 60 61 62 def _parse_asn1_element(der_bytes): 63 """Parses a DER-encoded tag/Length/Value into its component parts 64 65 Args: 66 der_bytes: A DER-encoded ASN.1 data type 67 68 Returns: 69 A tuple of the ASN.1 tag value, the length of the ASN.1 header that was 70 read, the sequence of bytes for the value, and then any data from der_bytes 71 that was not part of the tag/Length/Value. 72 """ 73 tag = ord(der_bytes[0]) 74 length = ord(der_bytes[1]) 75 header_length = 2 76 77 if length & 0x80: 78 num_length_bytes = length & 0x7f 79 length = 0 80 for i in xrange(2, 2 + num_length_bytes): 81 length <<= 8 82 length += ord(der_bytes[i]) 83 header_length = 2 + num_length_bytes 84 85 contents = der_bytes[:header_length + length] 86 rest = der_bytes[header_length + length:] 87 88 return (tag, header_length, contents, rest) 89 90 91 class ASN1Iterator(object): 92 """Iterator that parses and iterates through a ASN.1 DER structure""" 93 94 def __init__(self, contents): 95 self._tag = 0 96 self._header_length = 0 97 self._rest = None 98 self._contents = contents 99 self.step_into() 100 101 def step_into(self): 102 """Begins processing the inner contents of the next ASN.1 element""" 103 (self._tag, self._header_length, self._contents, self._rest) = ( 104 _parse_asn1_element(self._contents[self._header_length:])) 105 106 def step_over(self): 107 """Skips/ignores the next ASN.1 element""" 108 (self._tag, self._header_length, self._contents, self._rest) = ( 109 _parse_asn1_element(self._rest)) 110 111 def tag(self): 112 """Returns the ASN.1 tag of the current element""" 113 return self._tag 114 115 def contents(self): 116 """Returns the raw data of the current element""" 117 return self._contents 118 119 120 def _der_cert_to_spki(der_bytes): 121 """Returns the subjectPublicKeyInfo of a DER-encoded certificate 122 123 Args: 124 der_bytes: A DER-encoded certificate (RFC 5280) 125 126 Returns: 127 A byte array containing the subjectPublicKeyInfo 128 """ 129 iterator = ASN1Iterator(der_bytes) 130 iterator.step_into() # enter certificate structure 131 iterator.step_into() # enter TBSCertificate 132 iterator.step_over() # over version 133 iterator.step_over() # over serial 134 iterator.step_over() # over signature algorithm 135 iterator.step_over() # over issuer name 136 iterator.step_over() # over validity 137 iterator.step_over() # over subject name 138 return iterator.contents() 139 140 141 def pem_cert_file_to_spki_hash(pem_filename): 142 """Gets the SHA-256 hash of the subjectPublicKeyInfo of a cert in a file 143 144 Args: 145 pem_filename: A file containing a PEM-encoded certificate. 146 147 Returns: 148 The SHA-256 hash of the first certificate in the file, as a byte sequence 149 """ 150 return hashlib.sha256( 151 _der_cert_to_spki(_pem_cert_to_binary(pem_filename))).digest() 152 153 154 def main(): 155 parser = optparse.OptionParser(description=sys.modules[__name__].__doc__) 156 parser.add_option('-o', '--output', 157 help='Specifies the output file. The default is stdout.') 158 options, _ = parser.parse_args() 159 outfile = sys.stdout 160 if options.output and options.output != '-': 161 outfile = open(options.output, 'wb') 162 163 config = json.load(sys.stdin) 164 blocked_spkis = [ 165 pem_cert_file_to_spki_hash(pem_file).encode('base64').strip() 166 for pem_file in config.get('BlockedBySPKI', [])] 167 parents = { 168 pem_cert_file_to_spki_hash(pem_file): serials 169 for pem_file, serials in config.get('BlockedByHash', {}).iteritems() 170 } 171 header_json = { 172 'Version': 0, 173 'ContentType': 'CRLSet', 174 'Sequence': 0, 175 'DeltaFrom': 0, 176 'NumParents': len(parents), 177 'BlockedSPKIs': blocked_spkis, 178 } 179 header = json.dumps(header_json) 180 outfile.write(struct.pack('<H', len(header))) 181 outfile.write(header) 182 for spki, serials in sorted(parents.iteritems()): 183 outfile.write(spki) 184 outfile.write(struct.pack('<I', len(serials))) 185 for serial in serials: 186 raw_serial = [] 187 if not serial: 188 raw_serial = ['\x00'] 189 else: 190 while serial: 191 raw_serial.insert(0, chr(serial & 0xff)) 192 serial >>= 8 193 194 outfile.write(struct.pack('<B', len(raw_serial))) 195 outfile.write(''.join(raw_serial)) 196 return 0 197 198 199 if __name__ == '__main__': 200 sys.exit(main()) 201