Home | History | Annotate | Download | only in testserver
      1 # Copyright (c) 2012 The Chromium Authors. All rights reserved.
      2 # Use of this source code is governed by a BSD-style license that can be
      3 # found in the LICENSE file.
      4 
      5 import asn1
      6 import hashlib
      7 import os
      8 
      9 
     10 # This file implements very minimal certificate and OCSP generation. It's
     11 # designed to test revocation checking.
     12 
     13 def RandomNumber(length_in_bytes):
     14   '''RandomNumber returns a random number of length 8*|length_in_bytes| bits'''
     15   rand = os.urandom(length_in_bytes)
     16   n = 0
     17   for x in rand:
     18     n <<= 8
     19     n |= ord(x)
     20   return n
     21 
     22 
     23 def ModExp(n, e, p):
     24   '''ModExp returns n^e mod p'''
     25   r = 1
     26   while e != 0:
     27     if e & 1:
     28       r = (r*n) % p
     29     e >>= 1
     30     n = (n*n) % p
     31   return r
     32 
     33 # PKCS1v15_SHA1_PREFIX is the ASN.1 prefix for a SHA1 signature.
     34 PKCS1v15_SHA1_PREFIX = '3021300906052b0e03021a05000414'.decode('hex')
     35 
     36 class RSA(object):
     37   def __init__(self, modulus, e, d):
     38     self.m = modulus
     39     self.e = e
     40     self.d = d
     41 
     42     self.modlen = 0
     43     m = modulus
     44     while m != 0:
     45       self.modlen += 1
     46       m >>= 8
     47 
     48   def Sign(self, message):
     49     digest = hashlib.sha1(message).digest()
     50     prefix = PKCS1v15_SHA1_PREFIX
     51 
     52     em = ['\xff'] * (self.modlen - 1 - len(prefix) - len(digest))
     53     em[0] = '\x00'
     54     em[1] = '\x01'
     55     em += "\x00" + prefix + digest
     56 
     57     n = 0
     58     for x in em:
     59       n <<= 8
     60       n |= ord(x)
     61 
     62     s = ModExp(n, self.d, self.m)
     63     out = []
     64     while s != 0:
     65       out.append(s & 0xff)
     66       s >>= 8
     67     out.reverse()
     68     return '\x00' * (self.modlen - len(out)) + asn1.ToBytes(out)
     69 
     70   def ToDER(self):
     71     return asn1.ToDER(asn1.SEQUENCE([self.m, self.e]))
     72 
     73 
     74 def Name(cn = None, c = None, o = None):
     75   names = asn1.SEQUENCE([])
     76 
     77   if cn is not None:
     78     names.children.append(
     79       asn1.SET([
     80         asn1.SEQUENCE([
     81           COMMON_NAME, cn,
     82         ])
     83       ])
     84     )
     85 
     86   if c is not None:
     87     names.children.append(
     88       asn1.SET([
     89         asn1.SEQUENCE([
     90           COUNTRY, c,
     91         ])
     92       ])
     93     )
     94 
     95   if o is not None:
     96     names.children.append(
     97       asn1.SET([
     98         asn1.SEQUENCE([
     99           ORGANIZATION, o,
    100         ])
    101       ])
    102     )
    103 
    104   return names
    105 
    106 
    107 # The private key and root certificate name are hard coded here:
    108 
    109 # This is the private key
    110 KEY = RSA(0x00a71998f2930bfe73d031a87f133d2f378eeeeed52a77e44d0fc9ff6f07ff32cbf3da999de4ed65832afcb0807f98787506539d258a0ce3c2c77967653099a9034a9b115a876c39a8c4e4ed4acd0c64095946fb39eeeb47a0704dbb018acf48c3a1c4b895fc409fb4a340a986b1afc45519ab9eca47c30185c771c64aa5ecf07d,
    111           3,
    112           0x6f6665f70cb2a9a28acbc5aa0cd374cfb49f49e371a542de0a86aa4a0554cc87f7e71113edf399021ca875aaffbafaf8aee268c3b15ded2c84fb9a4375bbc6011d841e57833bc6f998d25daf6fa7f166b233e3e54a4bae7a5aaaba21431324967d5ff3e1d4f413827994262115ca54396e7068d0afa7af787a5782bc7040e6d3)
    113 
    114 # And the same thing in PEM format
    115 KEY_PEM = '''-----BEGIN RSA PRIVATE KEY-----
    116 MIICXAIBAAKBgQCnGZjykwv+c9AxqH8TPS83ju7u1Sp35E0Pyf9vB/8yy/PamZ3k
    117 7WWDKvywgH+YeHUGU50ligzjwsd5Z2UwmakDSpsRWodsOajE5O1KzQxkCVlG+znu
    118 60egcE27AYrPSMOhxLiV/ECftKNAqYaxr8RVGaueykfDAYXHccZKpezwfQIBAwKB
    119 gG9mZfcMsqmiisvFqgzTdM+0n0njcaVC3gqGqkoFVMyH9+cRE+3zmQIcqHWq/7r6
    120 +K7iaMOxXe0shPuaQ3W7xgEdhB5XgzvG+ZjSXa9vp/FmsjPj5UpLrnpaqrohQxMk
    121 ln1f8+HU9BOCeZQmIRXKVDlucGjQr6eveHpXgrxwQObTAkEA2wBAfuduw5G0/VfN
    122 Wx66D5fbPccfYFqLM5LuTimLmNqzK2gIKXckB2sm44gJZ6wVlumaB1CSNug2LNYx
    123 3cAjUwJBAMNUo1hbI8ugqqwI9kpxv9+2Heea4BlnXbS6tYF8pvkHMoliuxNbXmmB
    124 u4zNB5iZ6V0ZZ4nvtUNo2cGr/h/Lcu8CQQCSACr/RPSCYSNTj948vya1D+d+hL+V
    125 kbIiYfQ0G7Jl5yIc8AVw+hgE8hntBVuacrkPRmaviwwkms7IjsvpKsI3AkEAgjhs
    126 5ZIX3RXHHVtO3EvVP86+mmdAEO+TzdHOVlMZ+1ohsOx8t5I+8QEnszNaZbvw6Lua
    127 W/UjgkXmgR1UFTJMnwJBAKErmAw21/g3SST0a4wlyaGT/MbXL8Ouwnb5IOKQVe55
    128 CZdeVeSh6cJ4hAcQKfr2s1JaZTJFIBPGKAif5HqpydA=
    129 -----END RSA PRIVATE KEY-----
    130 '''
    131 
    132 # Root certificate CN
    133 ISSUER_CN = "Testing CA"
    134 
    135 # All certificates are issued under this policy OID, in the Google arc:
    136 CERT_POLICY_OID = asn1.OID([1, 3, 6, 1, 4, 1, 11129, 2, 4, 1])
    137 
    138 # These result in the following root certificate:
    139 # -----BEGIN CERTIFICATE-----
    140 # MIIB0TCCATqgAwIBAgIBATANBgkqhkiG9w0BAQUFADAVMRMwEQYDVQQDEwpUZXN0aW5nIENBMB4X
    141 # DTEwMDEwMTA2MDAwMFoXDTMyMTIwMTA2MDAwMFowFTETMBEGA1UEAxMKVGVzdGluZyBDQTCBnTAN
    142 # BgkqhkiG9w0BAQEFAAOBiwAwgYcCgYEApxmY8pML/nPQMah/Ez0vN47u7tUqd+RND8n/bwf/Msvz
    143 # 2pmd5O1lgyr8sIB/mHh1BlOdJYoM48LHeWdlMJmpA0qbEVqHbDmoxOTtSs0MZAlZRvs57utHoHBN
    144 # uwGKz0jDocS4lfxAn7SjQKmGsa/EVRmrnspHwwGFx3HGSqXs8H0CAQOjMzAxMBIGA1UdEwEB/wQI
    145 # MAYBAf8CAQAwGwYDVR0gAQEABBEwDzANBgsrBgEEAdZ5AgHODzANBgkqhkiG9w0BAQUFAAOBgQA/
    146 # STb40A6D+93jMfLGQzXc997IsaJZdoPt7tYa8PqGJBL62EiTj+erd/H5pDZx/2/bcpOG4m9J56yg
    147 # wOohbllw2TM+oeEd8syzV6X+1SIPnGI56JRrm3UXcHYx1Rq5loM9WKAiz/WmIWmskljsEQ7+542p
    148 # q0pkHjs8nuXovSkUYA==
    149 # -----END CERTIFICATE-----
    150 
    151 # If you update any of the above, you can generate a new root with the
    152 # following line:
    153 #   print DERToPEM(MakeCertificate(ISSUER_CN, ISSUER_CN, 1, KEY, KEY, None))
    154 
    155 
    156 # Various OIDs
    157 
    158 AIA_OCSP = asn1.OID([1, 3, 6, 1, 5, 5, 7, 48, 1])
    159 AUTHORITY_INFORMATION_ACCESS = asn1.OID([1, 3, 6, 1, 5, 5, 7, 1, 1])
    160 BASIC_CONSTRAINTS = asn1.OID([2, 5, 29, 19])
    161 CERT_POLICIES = asn1.OID([2, 5, 29, 32])
    162 COMMON_NAME = asn1.OID([2, 5, 4, 3])
    163 COUNTRY = asn1.OID([2, 5, 4, 6])
    164 HASH_SHA1 = asn1.OID([1, 3, 14, 3, 2, 26])
    165 OCSP_TYPE_BASIC = asn1.OID([1, 3, 6, 1, 5, 5, 7, 48, 1, 1])
    166 ORGANIZATION = asn1.OID([2, 5, 4, 10])
    167 PUBLIC_KEY_RSA = asn1.OID([1, 2, 840, 113549, 1, 1, 1])
    168 SHA1_WITH_RSA_ENCRYPTION = asn1.OID([1, 2, 840, 113549, 1, 1, 5])
    169 
    170 
    171 def MakeCertificate(
    172     issuer_cn, subject_cn, serial, pubkey, privkey, ocsp_url = None):
    173   '''MakeCertificate returns a DER encoded certificate, signed by privkey.'''
    174   extensions = asn1.SEQUENCE([])
    175 
    176   # Default subject name fields
    177   c = "XX"
    178   o = "Testing Org"
    179 
    180   if issuer_cn == subject_cn:
    181     # Root certificate.
    182     c = None
    183     o = None
    184     extensions.children.append(
    185       asn1.SEQUENCE([
    186         basic_constraints,
    187         True,
    188         asn1.OCTETSTRING(asn1.ToDER(asn1.SEQUENCE([
    189           True, # IsCA
    190           0, # Path len
    191         ]))),
    192       ]))
    193 
    194   if ocsp_url is not None:
    195     extensions.children.append(
    196       asn1.SEQUENCE([
    197         AUTHORITY_INFORMATION_ACCESS,
    198         False,
    199         asn1.OCTETSTRING(asn1.ToDER(asn1.SEQUENCE([
    200           asn1.SEQUENCE([
    201             AIA_OCSP,
    202             asn1.Raw(asn1.TagAndLength(0x86, len(ocsp_url)) + ocsp_url),
    203           ]),
    204         ]))),
    205       ]))
    206 
    207   extensions.children.append(
    208     asn1.SEQUENCE([
    209       CERT_POLICIES,
    210       False,
    211       asn1.OCTETSTRING(asn1.ToDER(asn1.SEQUENCE([
    212         asn1.SEQUENCE([ # PolicyInformation
    213           CERT_POLICY_OID,
    214         ]),
    215       ]))),
    216     ])
    217   )
    218 
    219   tbsCert = asn1.ToDER(asn1.SEQUENCE([
    220       asn1.Explicit(0, 2), # Version
    221       serial,
    222       asn1.SEQUENCE([SHA1_WITH_RSA_ENCRYPTION, None]), # SignatureAlgorithm
    223       Name(cn = issuer_cn), # Issuer
    224       asn1.SEQUENCE([ # Validity
    225         asn1.UTCTime("100101060000Z"), # NotBefore
    226         asn1.UTCTime("321201060000Z"), # NotAfter
    227       ]),
    228       Name(cn = subject_cn, c = c, o = o), # Subject
    229       asn1.SEQUENCE([ # SubjectPublicKeyInfo
    230         asn1.SEQUENCE([ # Algorithm
    231           PUBLIC_KEY_RSA,
    232           None,
    233         ]),
    234         asn1.BitString(asn1.ToDER(pubkey)),
    235       ]),
    236       asn1.Explicit(3, extensions),
    237     ]))
    238 
    239   return asn1.ToDER(asn1.SEQUENCE([
    240     asn1.Raw(tbsCert),
    241     asn1.SEQUENCE([
    242       SHA1_WITH_RSA_ENCRYPTION,
    243       None,
    244     ]),
    245     asn1.BitString(privkey.Sign(tbsCert)),
    246   ]))
    247 
    248 
    249 def MakeOCSPResponse(issuer_cn, issuer_key, serial, ocsp_state):
    250   # https://tools.ietf.org/html/rfc2560
    251   issuer_name_hash = asn1.OCTETSTRING(
    252       hashlib.sha1(asn1.ToDER(Name(cn = issuer_cn))).digest())
    253 
    254   issuer_key_hash = asn1.OCTETSTRING(
    255       hashlib.sha1(asn1.ToDER(issuer_key)).digest())
    256 
    257   cert_status = None
    258   if ocsp_state == OCSP_STATE_REVOKED:
    259     cert_status = asn1.Explicit(1, asn1.GeneralizedTime("20100101060000Z"))
    260   elif ocsp_state == OCSP_STATE_UNKNOWN:
    261     cert_status = asn1.Raw(asn1.TagAndLength(0x80 | 2, 0))
    262   elif ocsp_state == OCSP_STATE_GOOD:
    263     cert_status = asn1.Raw(asn1.TagAndLength(0x80 | 0, 0))
    264   else:
    265     raise ValueError('Bad OCSP state: ' + str(ocsp_state))
    266 
    267   basic_resp_data_der = asn1.ToDER(asn1.SEQUENCE([
    268     asn1.Explicit(2, issuer_key_hash),
    269     asn1.GeneralizedTime("20100101060000Z"), # producedAt
    270     asn1.SEQUENCE([
    271       asn1.SEQUENCE([ # SingleResponse
    272         asn1.SEQUENCE([ # CertID
    273           asn1.SEQUENCE([ # hashAlgorithm
    274             HASH_SHA1,
    275             None,
    276           ]),
    277           issuer_name_hash,
    278           issuer_key_hash,
    279           serial,
    280         ]),
    281         cert_status,
    282         asn1.GeneralizedTime("20100101060000Z"), # thisUpdate
    283         asn1.Explicit(0, asn1.GeneralizedTime("20300101060000Z")), # nextUpdate
    284       ]),
    285     ]),
    286   ]))
    287 
    288   basic_resp = asn1.SEQUENCE([
    289     asn1.Raw(basic_resp_data_der),
    290     asn1.SEQUENCE([
    291       SHA1_WITH_RSA_ENCRYPTION,
    292       None,
    293     ]),
    294     asn1.BitString(issuer_key.Sign(basic_resp_data_der)),
    295   ])
    296 
    297   resp = asn1.SEQUENCE([
    298     asn1.ENUMERATED(0),
    299     asn1.Explicit(0, asn1.SEQUENCE([
    300       OCSP_TYPE_BASIC,
    301       asn1.OCTETSTRING(asn1.ToDER(basic_resp)),
    302     ]))
    303   ])
    304 
    305   return asn1.ToDER(resp)
    306 
    307 
    308 def DERToPEM(der):
    309   pem = '-----BEGIN CERTIFICATE-----\n'
    310   pem += der.encode('base64')
    311   pem += '-----END CERTIFICATE-----\n'
    312   return pem
    313 
    314 OCSP_STATE_GOOD = 1
    315 OCSP_STATE_REVOKED = 2
    316 OCSP_STATE_INVALID = 3
    317 OCSP_STATE_UNAUTHORIZED = 4
    318 OCSP_STATE_UNKNOWN = 5
    319 
    320 # unauthorizedDER is an OCSPResponse with a status of 6:
    321 # SEQUENCE { ENUM(6) }
    322 unauthorizedDER = '30030a0106'.decode('hex')
    323 
    324 def GenerateCertKeyAndOCSP(subject = "127.0.0.1",
    325                            ocsp_url = "http://127.0.0.1",
    326                            ocsp_state = OCSP_STATE_GOOD,
    327                            serial = 0):
    328   '''GenerateCertKeyAndOCSP returns a (cert_and_key_pem, ocsp_der) where:
    329        * cert_and_key_pem contains a certificate and private key in PEM format
    330          with the given subject common name and OCSP URL.
    331        * ocsp_der contains a DER encoded OCSP response or None if ocsp_url is
    332          None'''
    333 
    334   if serial == 0:
    335     serial = RandomNumber(16)
    336   cert_der = MakeCertificate(ISSUER_CN, bytes(subject), serial, KEY, KEY,
    337                              bytes(ocsp_url))
    338   cert_pem = DERToPEM(cert_der)
    339 
    340   ocsp_der = None
    341   if ocsp_url is not None:
    342     if ocsp_state == OCSP_STATE_UNAUTHORIZED:
    343       ocsp_der = unauthorizedDER
    344     elif ocsp_state == OCSP_STATE_INVALID:
    345       ocsp_der = '3'
    346     else:
    347       ocsp_der = MakeOCSPResponse(ISSUER_CN, KEY, serial, ocsp_state)
    348 
    349   return (cert_pem + KEY_PEM, ocsp_der)
    350