1 # Copyright (c) 2013 The Chromium OS Authors. All rights reserved. 2 # Use of this source code is governed by a BSD-style license that can be 3 # found in the LICENSE file. 4 5 """Utilities for unit testing.""" 6 7 from __future__ import print_function 8 9 import cStringIO 10 import hashlib 11 import os 12 import struct 13 import subprocess 14 15 from update_payload import common 16 from update_payload import payload 17 from update_payload import update_metadata_pb2 18 19 20 class TestError(Exception): 21 """An error during testing of update payload code.""" 22 23 24 # Private/public RSA keys used for testing. 25 _PRIVKEY_FILE_NAME = os.path.join(os.path.dirname(__file__), 26 'payload-test-key.pem') 27 _PUBKEY_FILE_NAME = os.path.join(os.path.dirname(__file__), 28 'payload-test-key.pub') 29 30 31 def KiB(count): 32 return count << 10 33 34 35 def MiB(count): 36 return count << 20 37 38 39 def GiB(count): 40 return count << 30 41 42 43 def _WriteInt(file_obj, size, is_unsigned, val): 44 """Writes a binary-encoded integer to a file. 45 46 It will do the correct conversion based on the reported size and whether or 47 not a signed number is expected. Assumes a network (big-endian) byte 48 ordering. 49 50 Args: 51 file_obj: a file object 52 size: the integer size in bytes (2, 4 or 8) 53 is_unsigned: whether it is signed or not 54 val: integer value to encode 55 56 Raises: 57 PayloadError if a write error occurred. 58 """ 59 try: 60 file_obj.write(struct.pack(common.IntPackingFmtStr(size, is_unsigned), val)) 61 except IOError, e: 62 raise payload.PayloadError('error writing to file (%s): %s' % 63 (file_obj.name, e)) 64 65 66 def _SetMsgField(msg, field_name, val): 67 """Sets or clears a field in a protobuf message.""" 68 if val is None: 69 msg.ClearField(field_name) 70 else: 71 setattr(msg, field_name, val) 72 73 74 def SignSha256(data, privkey_file_name): 75 """Signs the data's SHA256 hash with an RSA private key. 76 77 Args: 78 data: the data whose SHA256 hash we want to sign 79 privkey_file_name: private key used for signing data 80 81 Returns: 82 The signature string, prepended with an ASN1 header. 83 84 Raises: 85 TestError if something goes wrong. 86 """ 87 data_sha256_hash = common.SIG_ASN1_HEADER + hashlib.sha256(data).digest() 88 sign_cmd = ['openssl', 'rsautl', '-sign', '-inkey', privkey_file_name] 89 try: 90 sign_process = subprocess.Popen(sign_cmd, stdin=subprocess.PIPE, 91 stdout=subprocess.PIPE) 92 sig, _ = sign_process.communicate(input=data_sha256_hash) 93 except Exception as e: 94 raise TestError('signing subprocess failed: %s' % e) 95 96 return sig 97 98 99 class SignaturesGenerator(object): 100 """Generates a payload signatures data block.""" 101 102 def __init__(self): 103 self.sigs = update_metadata_pb2.Signatures() 104 105 def AddSig(self, version, data): 106 """Adds a signature to the signature sequence. 107 108 Args: 109 version: signature version (None means do not assign) 110 data: signature binary data (None means do not assign) 111 """ 112 sig = self.sigs.signatures.add() 113 if version is not None: 114 sig.version = version 115 if data is not None: 116 sig.data = data 117 118 def ToBinary(self): 119 """Returns the binary representation of the signature block.""" 120 return self.sigs.SerializeToString() 121 122 123 class PayloadGenerator(object): 124 """Generates an update payload allowing low-level control. 125 126 Attributes: 127 manifest: the protobuf containing the payload manifest 128 version: the payload version identifier 129 block_size: the block size pertaining to update operations 130 131 """ 132 133 def __init__(self, version=1): 134 self.manifest = update_metadata_pb2.DeltaArchiveManifest() 135 self.version = version 136 self.block_size = 0 137 138 @staticmethod 139 def _WriteExtent(ex, val): 140 """Returns an Extent message.""" 141 start_block, num_blocks = val 142 _SetMsgField(ex, 'start_block', start_block) 143 _SetMsgField(ex, 'num_blocks', num_blocks) 144 145 @staticmethod 146 def _AddValuesToRepeatedField(repeated_field, values, write_func): 147 """Adds values to a repeated message field.""" 148 if values: 149 for val in values: 150 new_item = repeated_field.add() 151 write_func(new_item, val) 152 153 @staticmethod 154 def _AddExtents(extents_field, values): 155 """Adds extents to an extents field.""" 156 PayloadGenerator._AddValuesToRepeatedField( 157 extents_field, values, PayloadGenerator._WriteExtent) 158 159 def SetBlockSize(self, block_size): 160 """Sets the payload's block size.""" 161 self.block_size = block_size 162 _SetMsgField(self.manifest, 'block_size', block_size) 163 164 def SetPartInfo(self, is_kernel, is_new, part_size, part_hash): 165 """Set the partition info entry. 166 167 Args: 168 is_kernel: whether this is kernel partition info 169 is_new: whether to set old (False) or new (True) info 170 part_size: the partition size (in fact, filesystem size) 171 part_hash: the partition hash 172 """ 173 if is_kernel: 174 part_info = (self.manifest.new_kernel_info if is_new 175 else self.manifest.old_kernel_info) 176 else: 177 part_info = (self.manifest.new_rootfs_info if is_new 178 else self.manifest.old_rootfs_info) 179 _SetMsgField(part_info, 'size', part_size) 180 _SetMsgField(part_info, 'hash', part_hash) 181 182 def AddOperation(self, is_kernel, op_type, data_offset=None, 183 data_length=None, src_extents=None, src_length=None, 184 dst_extents=None, dst_length=None, data_sha256_hash=None): 185 """Adds an InstallOperation entry.""" 186 operations = (self.manifest.kernel_install_operations if is_kernel 187 else self.manifest.install_operations) 188 189 op = operations.add() 190 op.type = op_type 191 192 _SetMsgField(op, 'data_offset', data_offset) 193 _SetMsgField(op, 'data_length', data_length) 194 195 self._AddExtents(op.src_extents, src_extents) 196 _SetMsgField(op, 'src_length', src_length) 197 198 self._AddExtents(op.dst_extents, dst_extents) 199 _SetMsgField(op, 'dst_length', dst_length) 200 201 _SetMsgField(op, 'data_sha256_hash', data_sha256_hash) 202 203 def SetSignatures(self, sigs_offset, sigs_size): 204 """Set the payload's signature block descriptors.""" 205 _SetMsgField(self.manifest, 'signatures_offset', sigs_offset) 206 _SetMsgField(self.manifest, 'signatures_size', sigs_size) 207 208 def SetMinorVersion(self, minor_version): 209 """Set the payload's minor version field.""" 210 _SetMsgField(self.manifest, 'minor_version', minor_version) 211 212 def _WriteHeaderToFile(self, file_obj, manifest_len): 213 """Writes a payload heaer to a file.""" 214 # We need to access protected members in Payload for writing the header. 215 # pylint: disable=W0212 216 file_obj.write(payload.Payload._PayloadHeader._MAGIC) 217 _WriteInt(file_obj, payload.Payload._PayloadHeader._VERSION_SIZE, True, 218 self.version) 219 _WriteInt(file_obj, payload.Payload._PayloadHeader._MANIFEST_LEN_SIZE, True, 220 manifest_len) 221 222 def WriteToFile(self, file_obj, manifest_len=-1, data_blobs=None, 223 sigs_data=None, padding=None): 224 """Writes the payload content to a file. 225 226 Args: 227 file_obj: a file object open for writing 228 manifest_len: manifest len to dump (otherwise computed automatically) 229 data_blobs: a list of data blobs to be concatenated to the payload 230 sigs_data: a binary Signatures message to be concatenated to the payload 231 padding: stuff to dump past the normal data blobs provided (optional) 232 """ 233 manifest = self.manifest.SerializeToString() 234 if manifest_len < 0: 235 manifest_len = len(manifest) 236 self._WriteHeaderToFile(file_obj, manifest_len) 237 file_obj.write(manifest) 238 if data_blobs: 239 for data_blob in data_blobs: 240 file_obj.write(data_blob) 241 if sigs_data: 242 file_obj.write(sigs_data) 243 if padding: 244 file_obj.write(padding) 245 246 247 class EnhancedPayloadGenerator(PayloadGenerator): 248 """Payload generator with automatic handling of data blobs. 249 250 Attributes: 251 data_blobs: a list of blobs, in the order they were added 252 curr_offset: the currently consumed offset of blobs added to the payload 253 """ 254 255 def __init__(self): 256 super(EnhancedPayloadGenerator, self).__init__() 257 self.data_blobs = [] 258 self.curr_offset = 0 259 260 def AddData(self, data_blob): 261 """Adds a (possibly orphan) data blob.""" 262 data_length = len(data_blob) 263 data_offset = self.curr_offset 264 self.curr_offset += data_length 265 self.data_blobs.append(data_blob) 266 return data_length, data_offset 267 268 def AddOperationWithData(self, is_kernel, op_type, src_extents=None, 269 src_length=None, dst_extents=None, dst_length=None, 270 data_blob=None, do_hash_data_blob=True): 271 """Adds an install operation and associated data blob. 272 273 This takes care of obtaining a hash of the data blob (if so instructed) 274 and appending it to the internally maintained list of blobs, including the 275 necessary offset/length accounting. 276 277 Args: 278 is_kernel: whether this is a kernel (True) or rootfs (False) operation 279 op_type: one of REPLACE, REPLACE_BZ, MOVE or BSDIFF 280 src_extents: list of (start, length) pairs indicating src block ranges 281 src_length: size of the src data in bytes (needed for BSDIFF) 282 dst_extents: list of (start, length) pairs indicating dst block ranges 283 dst_length: size of the dst data in bytes (needed for BSDIFF) 284 data_blob: a data blob associated with this operation 285 do_hash_data_blob: whether or not to compute and add a data blob hash 286 """ 287 data_offset = data_length = data_sha256_hash = None 288 if data_blob is not None: 289 if do_hash_data_blob: 290 data_sha256_hash = hashlib.sha256(data_blob).digest() 291 data_length, data_offset = self.AddData(data_blob) 292 293 self.AddOperation(is_kernel, op_type, data_offset=data_offset, 294 data_length=data_length, src_extents=src_extents, 295 src_length=src_length, dst_extents=dst_extents, 296 dst_length=dst_length, data_sha256_hash=data_sha256_hash) 297 298 def WriteToFileWithData(self, file_obj, sigs_data=None, 299 privkey_file_name=None, 300 do_add_pseudo_operation=False, 301 is_pseudo_in_kernel=False, padding=None): 302 """Writes the payload content to a file, optionally signing the content. 303 304 Args: 305 file_obj: a file object open for writing 306 sigs_data: signatures blob to be appended to the payload (optional; 307 payload signature fields assumed to be preset by the caller) 308 privkey_file_name: key used for signing the payload (optional; used only 309 if explicit signatures blob not provided) 310 do_add_pseudo_operation: whether a pseudo-operation should be added to 311 account for the signature blob 312 is_pseudo_in_kernel: whether the pseudo-operation should be added to 313 kernel (True) or rootfs (False) operations 314 padding: stuff to dump past the normal data blobs provided (optional) 315 316 Raises: 317 TestError: if arguments are inconsistent or something goes wrong. 318 """ 319 sigs_len = len(sigs_data) if sigs_data else 0 320 321 # Do we need to generate a genuine signatures blob? 322 do_generate_sigs_data = sigs_data is None and privkey_file_name 323 324 if do_generate_sigs_data: 325 # First, sign some arbitrary data to obtain the size of a signature blob. 326 fake_sig = SignSha256('fake-payload-data', privkey_file_name) 327 fake_sigs_gen = SignaturesGenerator() 328 fake_sigs_gen.AddSig(1, fake_sig) 329 sigs_len = len(fake_sigs_gen.ToBinary()) 330 331 # Update the payload with proper signature attributes. 332 self.SetSignatures(self.curr_offset, sigs_len) 333 334 # Add a pseudo-operation to account for the signature blob, if requested. 335 if do_add_pseudo_operation: 336 if not self.block_size: 337 raise TestError('cannot add pseudo-operation without knowing the ' 338 'payload block size') 339 self.AddOperation( 340 is_pseudo_in_kernel, common.OpType.REPLACE, 341 data_offset=self.curr_offset, data_length=sigs_len, 342 dst_extents=[(common.PSEUDO_EXTENT_MARKER, 343 (sigs_len + self.block_size - 1) / self.block_size)]) 344 345 if do_generate_sigs_data: 346 # Once all payload fields are updated, dump and sign it. 347 temp_payload_file = cStringIO.StringIO() 348 self.WriteToFile(temp_payload_file, data_blobs=self.data_blobs) 349 sig = SignSha256(temp_payload_file.getvalue(), privkey_file_name) 350 sigs_gen = SignaturesGenerator() 351 sigs_gen.AddSig(1, sig) 352 sigs_data = sigs_gen.ToBinary() 353 assert len(sigs_data) == sigs_len, 'signature blob lengths mismatch' 354 355 # Dump the whole thing, complete with data and signature blob, to a file. 356 self.WriteToFile(file_obj, data_blobs=self.data_blobs, sigs_data=sigs_data, 357 padding=padding) 358