Home | History | Annotate | Download | only in web-page-replay
      1 # Copyright 2014 Google Inc. All Rights Reserved.
      2 #
      3 # Licensed under the Apache License, Version 2.0 (the "License");
      4 # you may not use this file except in compliance with the License.
      5 # You may obtain a copy of the License at
      6 #
      7 #      http://www.apache.org/licenses/LICENSE-2.0
      8 #
      9 # Unless required by applicable law or agreed to in writing, software
     10 # distributed under the License is distributed on an "AS IS" BASIS,
     11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     12 # See the License for the specific language governing permissions and
     13 # limitations under the License.
     14 
     15 """Routines to generate root and server certificates.
     16 
     17 Certificate Naming Conventions:
     18   ca_cert:  crypto.X509 for the certificate authority (w/ both the pub &
     19                 priv keys)
     20   cert:  a crypto.X509 certificate (w/ just the pub key)
     21   cert_str:  a certificate string (w/ just the pub cert)
     22   key:  a private crypto.PKey  (from ca or pem)
     23   ca_cert_str:  a certificae authority string (w/ both the pub & priv certs)
     24 """
     25 
     26 import logging
     27 import os
     28 import platform
     29 import socket
     30 import subprocess
     31 import time
     32 
     33 openssl_import_error = None
     34 
     35 Error = None
     36 SSL_METHOD = None
     37 SysCallError = None
     38 VERIFY_PEER = None
     39 ZeroReturnError = None
     40 FILETYPE_PEM = None
     41 
     42 try:
     43   from OpenSSL import crypto, SSL
     44 
     45   Error = SSL.Error
     46   SSL_METHOD = SSL.SSLv23_METHOD
     47   SysCallError = SSL.SysCallError
     48   VERIFY_PEER = SSL.VERIFY_PEER
     49   ZeroReturnError = SSL.ZeroReturnError
     50   FILETYPE_PEM = crypto.FILETYPE_PEM
     51 except ImportError, e:
     52   openssl_import_error = e
     53 
     54 
     55 def get_ssl_context(method=SSL_METHOD):
     56   # One of: One of SSLv2_METHOD, SSLv3_METHOD, SSLv23_METHOD, or TLSv1_METHOD
     57   if openssl_import_error:
     58     raise openssl_import_error  # pylint: disable=raising-bad-type
     59   return SSL.Context(method)
     60 
     61 
     62 class WrappedConnection(object):
     63 
     64   def __init__(self, obj):
     65     self._wrapped_obj = obj
     66 
     67   def __getattr__(self, attr):
     68     if attr in self.__dict__:
     69       return getattr(self, attr)
     70     return getattr(self._wrapped_obj, attr)
     71 
     72   def recv(self, buflen=1024, flags=0):
     73     try:
     74       return self._wrapped_obj.recv(buflen, flags)
     75     except SSL.SysCallError, e:
     76       if e.args[1] == 'Unexpected EOF':
     77         return ''
     78       raise
     79     except SSL.ZeroReturnError:
     80       return ''
     81 
     82 
     83 def get_ssl_connection(context, connection):
     84   return WrappedConnection(SSL.Connection(context, connection))
     85 
     86 
     87 def load_privatekey(key, filetype=FILETYPE_PEM):
     88   """Loads obj private key object from string."""
     89   return crypto.load_privatekey(filetype, key)
     90 
     91 
     92 def load_cert(cert_str, filetype=FILETYPE_PEM):
     93   """Loads obj cert object from string."""
     94   return crypto.load_certificate(filetype, cert_str)
     95 
     96 
     97 def _dump_privatekey(key, filetype=FILETYPE_PEM):
     98   """Dumps obj private key object to string."""
     99   return crypto.dump_privatekey(filetype, key)
    100 
    101 
    102 def _dump_cert(cert, filetype=FILETYPE_PEM):
    103   """Dumps obj cert object to string."""
    104   return crypto.dump_certificate(filetype, cert)
    105 
    106 
    107 def generate_dummy_ca_cert(subject='_WebPageReplayCert'):
    108   """Generates dummy certificate authority.
    109 
    110   Args:
    111     subject: a string representing the desired root cert issuer
    112   Returns:
    113     A tuple of the public key and the private key strings for the root
    114     certificate
    115   """
    116   if openssl_import_error:
    117     raise openssl_import_error  # pylint: disable=raising-bad-type
    118 
    119   key = crypto.PKey()
    120   key.generate_key(crypto.TYPE_RSA, 1024)
    121 
    122   ca_cert = crypto.X509()
    123   ca_cert.set_serial_number(int(time.time()*10000))
    124   ca_cert.set_version(2)
    125   ca_cert.get_subject().CN = subject
    126   ca_cert.get_subject().O = subject
    127   ca_cert.gmtime_adj_notBefore(-60 * 60 * 24 * 365 * 2)
    128   ca_cert.gmtime_adj_notAfter(60 * 60 * 24 * 365 * 2)
    129   ca_cert.set_issuer(ca_cert.get_subject())
    130   ca_cert.set_pubkey(key)
    131   ca_cert.add_extensions([
    132       crypto.X509Extension('basicConstraints', True, 'CA:TRUE'),
    133       crypto.X509Extension('nsCertType', True, 'sslCA'),
    134       crypto.X509Extension('extendedKeyUsage', True,
    135                            ('serverAuth,clientAuth,emailProtection,'
    136                             'timeStamping,msCodeInd,msCodeCom,msCTLSign,'
    137                             'msSGC,msEFS,nsSGC')),
    138       crypto.X509Extension('keyUsage', False, 'keyCertSign, cRLSign'),
    139       crypto.X509Extension('subjectKeyIdentifier', False, 'hash',
    140                            subject=ca_cert),
    141       ])
    142   ca_cert.sign(key, 'sha256')
    143   key_str = _dump_privatekey(key)
    144   ca_cert_str = _dump_cert(ca_cert)
    145   return ca_cert_str, key_str
    146 
    147 
    148 def get_host_cert(host, port=443):
    149   """Contacts the host and returns its certificate."""
    150   host_certs = []
    151   def verify_cb(conn, cert, errnum, depth, ok):
    152     host_certs.append(cert)
    153     # Return True to indicates that the certificate was ok.
    154     return True
    155 
    156   context = SSL.Context(SSL.SSLv23_METHOD)
    157   context.set_verify(SSL.VERIFY_PEER, verify_cb)  # Demand a certificate
    158   s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    159   connection = SSL.Connection(context, s)
    160   try:
    161     connection.connect((host, port))
    162     connection.send('')
    163   except SSL.SysCallError:
    164     pass
    165   except socket.gaierror:
    166     logging.debug('Host name is not valid')
    167   finally:
    168     connection.shutdown()
    169     connection.close()
    170   if not host_certs:
    171     logging.warning('Unable to get host certificate from %s:%s', host, port)
    172     return ''
    173   return _dump_cert(host_certs[-1])
    174 
    175 
    176 def write_dummy_ca_cert(ca_cert_str, key_str, cert_path):
    177   """Writes four certificate files.
    178 
    179   For example, if cert_path is "mycert.pem":
    180       mycert.pem - CA plus private key
    181       mycert-cert.pem - CA in PEM format
    182       mycert-cert.cer - CA for Android
    183       mycert-cert.p12 - CA in PKCS12 format for Windows devices
    184   Args:
    185     cert_path: path string such as "mycert.pem"
    186     ca_cert_str: certificate string
    187     key_str: private key string
    188   """
    189   dirname = os.path.dirname(cert_path)
    190   if dirname and not os.path.exists(dirname):
    191     os.makedirs(dirname)
    192 
    193   root_path = os.path.splitext(cert_path)[0]
    194   ca_cert_path = root_path + '-cert.pem'
    195   android_cer_path = root_path + '-cert.cer'
    196   windows_p12_path = root_path + '-cert.p12'
    197 
    198   # Dump the CA plus private key
    199   with open(cert_path, 'w') as f:
    200     f.write(key_str)
    201     f.write(ca_cert_str)
    202 
    203   # Dump the certificate in PEM format
    204   with open(ca_cert_path, 'w') as f:
    205     f.write(ca_cert_str)
    206 
    207   # Create a .cer file with the same contents for Android
    208   with open(android_cer_path, 'w') as f:
    209     f.write(ca_cert_str)
    210 
    211   ca_cert = load_cert(ca_cert_str)
    212   key = load_privatekey(key_str)
    213   # Dump the certificate in PKCS12 format for Windows devices
    214   with open(windows_p12_path, 'w') as f:
    215     p12 = crypto.PKCS12()
    216     p12.set_certificate(ca_cert)
    217     p12.set_privatekey(key)
    218     f.write(p12.export())
    219 
    220 
    221 def generate_cert(root_ca_cert_str, server_cert_str, server_host):
    222   """Generates a cert_str with the sni field in server_cert_str signed by the
    223   root_ca_cert_str.
    224 
    225   Args:
    226     root_ca_cert_str: PEM formatted string representing the root cert
    227     server_cert_str: PEM formatted string representing cert
    228     server_host: host name to use if there is no server_cert_str
    229   Returns:
    230     a PEM formatted certificate string
    231   """
    232   EXTENSION_WHITELIST = set(['subjectAltName'])
    233 
    234   if openssl_import_error:
    235     raise openssl_import_error  # pylint: disable=raising-bad-type
    236 
    237   common_name = server_host
    238   reused_extensions = []
    239   if server_cert_str:
    240     original_cert = load_cert(server_cert_str)
    241     common_name = original_cert.get_subject().commonName
    242     for i in xrange(original_cert.get_extension_count()):
    243       original_cert_extension = original_cert.get_extension(i)
    244       if original_cert_extension.get_short_name() in EXTENSION_WHITELIST:
    245         reused_extensions.append(original_cert_extension)
    246 
    247   ca_cert = load_cert(root_ca_cert_str)
    248   ca_key = load_privatekey(root_ca_cert_str)
    249 
    250   cert = crypto.X509()
    251   cert.get_subject().CN = common_name
    252   cert.gmtime_adj_notBefore(-60 * 60)
    253   cert.gmtime_adj_notAfter(60 * 60 * 24 * 30)
    254   cert.set_issuer(ca_cert.get_subject())
    255   cert.set_serial_number(int(time.time()*10000))
    256   cert.set_pubkey(ca_key)
    257   cert.add_extensions(reused_extensions)
    258   cert.sign(ca_key, 'sha256')
    259 
    260   return _dump_cert(cert)
    261 
    262 
    263 def install_cert_in_nssdb(home_directory_path, certificate_path):
    264   """Installs a certificate into the ~/.pki/nssdb database.
    265 
    266   Args:
    267     home_directory_path: Path of the home directory where to install
    268     certificate_path: Path of a CA in PEM format
    269   """
    270   assert os.path.isdir(home_directory_path)
    271   assert platform.system() == 'Linux', \
    272       'SSL certification authority has only been tested for linux.'
    273   if (os.path.abspath(home_directory_path) ==
    274           os.path.abspath(os.environ['HOME'])):
    275     raise Exception('Modifying $HOME/.pki/nssdb compromises your machine.')
    276 
    277   cert_database_path = os.path.join(home_directory_path, '.pki', 'nssdb')
    278   def certutil(args):
    279     cmd = ['certutil', '--empty-password', '-d', 'sql:' + cert_database_path]
    280     cmd.extend(args)
    281     logging.info(subprocess.list2cmdline(cmd))
    282     subprocess.check_call(cmd)
    283 
    284   if not os.path.isdir(cert_database_path):
    285     os.makedirs(cert_database_path)
    286     certutil(['-N'])
    287 
    288   certutil(['-A', '-t', 'PC,,', '-n', certificate_path, '-i', certificate_path])
    289