Home | History | Annotate | Download | only in utils
      1 # Copyright (c) 2010 The Chromium OS Authors. All rights reserved.
      2 # Use of this source code is governed by a BSD-style license that can be
      3 # found in the LICENSE file.
      4 
      5 """A module containing kernel handler class used by SAFT."""
      6 
      7 import hashlib
      8 import os
      9 import re
     10 
# Base name of the scratch file the kernel image is dumped into.
# KernelHandler.init() turns it into a full path via os_if.state_dir_file().
TMP_FILE_NAME = 'kernel_header_dump'

# Types of kernel modifications accepted by KernelHandler._modify_kernel().
KERNEL_BODY_MOD = 1  # Add a delta to the first byte of the kernel blob.
KERNEL_VERSION_MOD = 2  # Put a new version in the header and re-sign.
KERNEL_RESIGN_MOD = 3  # Re-sign the blob with keys from a key directory.
     17 
     18 
     19 class KernelHandlerError(Exception):
     20     pass
     21 
     22 
     23 class KernelHandler(object):
     24     """An object to provide ChromeOS kernel related actions.
     25 
     26     Mostly it allows to corrupt and restore a particular kernel partition
     27     (designated by the partition name, A or B.
     28     """
     29 
     30     # This value is used to alter contents of a byte in the appropriate kernel
     31     # image. First added to corrupt the image, then subtracted to restore the
     32     # image.
     33     DELTA = 1
     34 
     35     # The maximum kernel size in MB.
     36     KERNEL_SIZE_MB = 16
     37 
     38     def __init__(self):
     39         self.os_if = None
     40         self.dump_file_name = None
     41         self.partition_map = {}
     42         self.root_dev = None
     43 
     44     def _get_version(self, device):
     45         """Get version of the kernel hosted on the passed in partition."""
     46         # 16 K should be enough to include headers and keys
     47         data = self.os_if.read_partition(device, 0x4000)
     48         return self.os_if.retrieve_body_version(data)
     49 
     50     def _get_datakey_version(self,device):
     51         """Get datakey version of kernel hosted on the passed in partition."""
     52         # 16 K should be enought to include headers and keys
     53         data = self.os_if.read_partition(device, 0x4000)
     54         return self.os_if.retrieve_datakey_version(data)
     55 
     56     def _get_partition_map(self, internal_disk=True):
     57         """Scan `cgpt show <device> output to find kernel devices.
     58 
     59         Args:
     60           internal_disk - decide whether to use internal kernel disk.
     61         """
     62         if internal_disk:
     63             target_device = self.os_if.get_internal_disk(
     64                     self.os_if.get_root_part())
     65         else:
     66             target_device = self.root_dev
     67 
     68         kernel_partitions = re.compile('KERN-([AB])')
     69         disk_map = self.os_if.run_shell_command_get_output(
     70             'cgpt show %s' % target_device)
     71 
     72         for line in disk_map:
     73             matched_line = kernel_partitions.search(line)
     74             if not matched_line:
     75                 continue
     76             label = matched_line.group(1)
     77             part_info = {}
     78             device = self.os_if.join_part(target_device, line.split()[2])
     79             part_info['device'] = device
     80             part_info['version'] = self._get_version(device)
     81             part_info['datakey_version'] = self._get_datakey_version(device)
     82             self.partition_map[label] = part_info
     83 
     84     def dump_kernel(self, section, kernel_path):
     85         """Dump the specified kernel to a file.
     86 
     87         @param section: The kernel to dump. May be A or B.
     88         @param kernel_path: The path to the kernel image.
     89         """
     90         dev = self.partition_map[section.upper()]['device']
     91         cmd = 'dd if=%s of=%s bs=%dM count=1' % (
     92                 dev, kernel_path, self.KERNEL_SIZE_MB)
     93         self.os_if.run_shell_command(cmd)
     94 
     95     def write_kernel(self, section, kernel_path):
     96         """Write a kernel image to the specified section.
     97 
     98         @param section: The kernel to write. May be A or B.
     99         @param kernel_path: The path to the kernel image to write.
    100         """
    101         dev = self.partition_map[section.upper()]['device']
    102         cmd = 'dd if=%s of=%s bs=%dM count=1' % (
    103                 kernel_path, dev, self.KERNEL_SIZE_MB)
    104         self.os_if.run_shell_command(cmd)
    105 
    106     def _modify_kernel(self, section,
    107                        delta,
    108                        modification_type=KERNEL_BODY_MOD,
    109                        key_path=None):
    110         """Modify kernel image on a disk partition.
    111 
    112         This method supports three types of kernel modification. KERNEL_BODY_MOD
    113         just adds the value of delta to the first byte of the kernel blob.
    114         This might leave the kernel corrupted (as required by the test).
    115 
    116         The second type, KERNEL_VERSION_MOD - will use 'delta' as the new
    117         version number, it will put it in the kernel header, and then will
    118         resign the kernel blob.
    119 
    120         The third type. KERNEL_RESIGN_MOD - will resign the kernel with keys in
    121         argument key_path. If key_path is None, choose dev_key_path as resign
    122         key directory.
    123         """
    124         self.dump_kernel(section, self.dump_file_name)
    125         data = list(self.os_if.read_file(self.dump_file_name))
    126         if modification_type == KERNEL_BODY_MOD:
    127             data[0] = '%c' % ((ord(data[0]) + delta) % 0x100)
    128             self.os_if.write_file(self.dump_file_name, ''.join(data))
    129             kernel_to_write = self.dump_file_name
    130         elif modification_type == KERNEL_VERSION_MOD:
    131             new_version = delta
    132             kernel_to_write = self.dump_file_name + '.new'
    133             self.os_if.run_shell_command(
    134                 'vbutil_kernel --repack %s --version %d '
    135                 '--signprivate %s --oldblob %s' % (
    136                     kernel_to_write, new_version,
    137                     os.path.join(self.dev_key_path, 'kernel_data_key.vbprivk'),
    138                     self.dump_file_name))
    139         elif modification_type == KERNEL_RESIGN_MOD:
    140             if key_path and self.os_if.is_dir(key_path):
    141                 resign_key_path = key_path
    142             else:
    143                 resign_key_path = self.dev_key_path
    144 
    145             kernel_to_write = self.dump_file_name + '.new'
    146             self.os_if.run_shell_command(
    147                 'vbutil_kernel --repack %s '
    148                 '--signprivate %s --oldblob %s --keyblock %s' % (
    149                     kernel_to_write,
    150                     os.path.join(resign_key_path, 'kernel_data_key.vbprivk'),
    151                     self.dump_file_name,
    152                     os.path.join(resign_key_path, 'kernel.keyblock')))
    153         else:
    154             return  # Unsupported mode, ignore.
    155         self.write_kernel(section, kernel_to_write)
    156 
    157     def corrupt_kernel(self, section):
    158         """Corrupt a kernel section (add DELTA to the first byte)."""
    159         self._modify_kernel(section.upper(), self.DELTA)
    160 
    161     def restore_kernel(self, section):
    162         """Restore the previously corrupted kernel."""
    163         self._modify_kernel(section.upper(), -self.DELTA)
    164 
    165     def get_version(self, section):
    166         """Return version read from this section blob's header."""
    167         return self.partition_map[section.upper()]['version']
    168 
    169     def get_datakey_version(self, section):
    170         """Return datakey version read from this section blob's header."""
    171         return self.partition_map[section.upper()]['datakey_version']
    172 
    173     def get_sha(self, section):
    174         """Return the SHA1 hash of the section blob."""
    175         s = hashlib.sha1()
    176         dev = self.partition_map[section.upper()]['device']
    177         s.update(self.os_if.read_file(dev))
    178         return s.hexdigest()
    179 
    180     def set_version(self, section, version):
    181         """Set version of this kernel blob and re-sign it."""
    182         if version < 0:
    183             raise KernelHandlerError('Bad version value %d' % version)
    184         self._modify_kernel(section.upper(), version, KERNEL_VERSION_MOD)
    185 
    186     def resign_kernel(self, section, key_path=None):
    187         """Resign kernel with original kernel version and keys in key_path."""
    188         self._modify_kernel(section.upper(),
    189                             self.get_version(section),
    190                             KERNEL_RESIGN_MOD,
    191                             key_path)
    192 
    193     def init(self, os_if, dev_key_path='.', internal_disk=True):
    194         """Initialize the kernel handler object.
    195 
    196         Input argument is an OS interface object reference.
    197         """
    198         self.os_if = os_if
    199         self.dev_key_path = dev_key_path
    200         self.root_dev = os_if.get_root_dev()
    201         self.dump_file_name = os_if.state_dir_file(TMP_FILE_NAME)
    202         self._get_partition_map(internal_disk)
    203