Home | History | Annotate | Download | only in gslib
      1 # -*- coding: utf-8 -*-
      2 # Copyright 2014 Google Inc. All Rights Reserved.
      3 #
      4 # Licensed under the Apache License, Version 2.0 (the "License");
      5 # you may not use this file except in compliance with the License.
      6 # You may obtain a copy of the License at
      7 #
      8 #     http://www.apache.org/licenses/LICENSE-2.0
      9 #
     10 # Unless required by applicable law or agreed to in writing, software
     11 # distributed under the License is distributed on an "AS IS" BASIS,
     12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     13 # See the License for the specific language governing permissions and
     14 # limitations under the License.
     15 """Helper functions for hashing functionality."""
     16 
     17 import base64
     18 import binascii
     19 from hashlib import md5
     20 import os
     21 
     22 from boto import config
     23 import crcmod
     24 
     25 from gslib.exception import CommandException
     26 from gslib.util import DEFAULT_FILE_BUFFER_SIZE
     27 from gslib.util import MIN_SIZE_COMPUTE_LOGGING
     28 from gslib.util import TRANSFER_BUFFER_SIZE
     29 from gslib.util import UsingCrcmodExtension
     30 
     31 
# Warning text (public, used by other gslib modules) shown when the user asks
# for checksumming but crcmod lacks its compiled C extension.
SLOW_CRCMOD_WARNING = """
WARNING: You have requested checksumming but your crcmod installation isn't
using the module's C extension, so checksumming will run very slowly. For help
installing the extension, please see:
  $ gsutil help crcmod
"""


# Logged by GetDownloadHashAlgs when check_hashes == 'always' and only the
# slow pure-Python CRC32c implementation is available.
_SLOW_CRCMOD_DOWNLOAD_WARNING = """
WARNING: Downloading this composite object requires integrity checking with
CRC32c, but your crcmod installation isn't using the module's C extension,
so the hash computation will likely throttle download performance. For help
installing the extension, please see:
  $ gsutil help crcmod
To disable slow integrity checking, see the "check_hashes" option in your
boto config file.
"""

# Raised as a CommandException by GetDownloadHashAlgs when check_hashes ==
# 'if_fast_else_fail' and only the slow pure-Python CRC32c is available.
_SLOW_CRC_EXCEPTION_TEXT = """
Downloading this composite object requires integrity checking with CRC32c,
but your crcmod installation isn't using the module's C extension, so the
hash computation will likely throttle download performance. For help
installing the extension, please see:

  $ gsutil help crcmod

To download regardless of crcmod performance or to skip slow integrity
checks, see the "check_hashes" option in your boto config file.

NOTE: It is strongly recommended that you not disable integrity checks. Doing so
could allow data corruption to go undetected during uploading/downloading."""


# Logged by GetDownloadHashAlgs when check_hashes == 'if_fast_else_skip' and
# validation is skipped because only the slow pure-Python CRC32c is available.
_NO_HASH_CHECK_WARNING = """
WARNING: This download will not be validated since your crcmod installation
doesn't use the module's C extension, so the hash computation would likely
throttle download performance. For help in installing the extension, please
see:
  $ gsutil help crcmod
To force integrity checking, see the "check_hashes" option in your boto config
file.
"""


# Configuration values for hashing.
# Valid values of the 'GSUtil:check_hashes' boto config option.
CHECK_HASH_IF_FAST_ELSE_FAIL = 'if_fast_else_fail'
CHECK_HASH_IF_FAST_ELSE_SKIP = 'if_fast_else_skip'
CHECK_HASH_ALWAYS = 'always'
CHECK_HASH_NEVER = 'never'

# Table storing polynomial values of x^(2^k) mod CASTAGNOLI_POLY for all k < 31,
# where x^(2^k) and CASTAGNOLI_POLY are both considered polynomials. This is
# sufficient since x^(2^31) mod CASTAGNOLI_POLY = x.
X_POW_2K_TABLE = [2, 4, 16, 256, 65536, 517762881, 984302966,
                  408362264, 1503875210, 2862076957, 3884826397, 1324787473,
                  621200174, 1758783527, 1416537776, 1180494764, 648569364,
                  2521473789, 994858823, 1728245375, 3498467999, 4059169852,
                  3345064394, 2828422810, 2429203150, 3336788029, 860151998,
                  2102628683, 1033187991, 4243778976, 1123580069]
# Castagnoli polynomial and its degree.
# 4812730177 == 0x11EDC6F41: the 33-bit representation (leading x^32 term
# included) of the degree-32 Castagnoli polynomial used by CRC32C.
CASTAGNOLI_POLY = 4812730177
DEGREE = 32
     94 
     95 
     96 def ConcatCrc32c(crc_a, crc_b, num_bytes_in_b):
     97   """Computes CRC32C for concat(A, B) given crc(A), crc(B) and len(B).
     98 
     99   An explanation of the algorithm can be found at
    100   crcutil.googlecode.com/files/crc-doc.1.0.pdf.
    101 
    102   Args:
    103     crc_a: A 32-bit integer representing crc(A) with least-significant
    104            coefficient first.
    105     crc_b: Same as crc_a.
    106     num_bytes_in_b: Length of B in bytes.
    107 
    108   Returns:
    109     CRC32C for concat(A, B)
    110   """
    111   if not num_bytes_in_b:
    112     return crc_a
    113 
    114   return _ExtendByZeros(crc_a, 8 * num_bytes_in_b) ^ crc_b
    115 
    116 
    117 def _CrcMultiply(p, q):
    118   """Multiplies two polynomials together modulo CASTAGNOLI_POLY.
    119 
    120   Args:
    121     p: The first polynomial.
    122     q: The second polynomial.
    123 
    124   Returns:
    125     Result of the multiplication.
    126   """
    127 
    128   result = 0
    129   top_bit = 1 << DEGREE
    130   for _ in range(DEGREE):
    131     if p & 1:
    132       result ^= q
    133     q <<= 1
    134     if q & top_bit:
    135       q ^= CASTAGNOLI_POLY
    136     p >>= 1
    137   return result
    138 
    139 
    140 def _ExtendByZeros(crc, num_bits):
    141   """Given crc representing polynomial P(x), compute P(x)*x^num_bits.
    142 
    143   Args:
    144     crc: crc respresenting polynomial P(x).
    145     num_bits: number of bits in crc.
    146 
    147   Returns:
    148     P(x)*x^num_bits
    149   """
    150   def _ReverseBits32(crc):
    151     return int('{0:032b}'.format(crc, width=32)[::-1], 2)
    152   crc = _ReverseBits32(crc)
    153   i = 0
    154 
    155   while num_bits != 0:
    156     if num_bits & 1:
    157       crc = _CrcMultiply(crc, X_POW_2K_TABLE[i % len(X_POW_2K_TABLE)])
    158     i += 1
    159     num_bits >>= 1
    160   crc = _ReverseBits32(crc)
    161   return crc
    162 
    163 
    164 def _CalculateHashFromContents(fp, hash_alg):
    165   """Calculates a base64 digest of the contents of a seekable stream.
    166 
    167   This function resets the file pointer to position 0.
    168 
    169   Args:
    170     fp: An already-open file object.
    171     hash_alg: Instance of hashing class initialized to start state.
    172 
    173   Returns:
    174     Hash of the stream in hex string format.
    175   """
    176   hash_dict = {'placeholder': hash_alg}
    177   fp.seek(0)
    178   CalculateHashesFromContents(fp, hash_dict)
    179   fp.seek(0)
    180   return hash_dict['placeholder'].hexdigest()
    181 
    182 
    183 def CalculateHashesFromContents(fp, hash_dict, callback_processor=None):
    184   """Calculates hashes of the contents of a file.
    185 
    186   Args:
    187     fp: An already-open file object (stream will be consumed).
    188     hash_dict: Dict of (string alg_name: initialized hashing class)
    189         Hashing class will be populated with digests upon return.
    190     callback_processor: Optional callback processing class that implements
    191         Progress(integer amount of bytes processed).
    192   """
    193   while True:
    194     data = fp.read(DEFAULT_FILE_BUFFER_SIZE)
    195     if not data:
    196       break
    197     for hash_alg in hash_dict.itervalues():
    198       hash_alg.update(data)
    199     if callback_processor:
    200       callback_processor.Progress(len(data))
    201 
    202 
    203 def CalculateB64EncodedCrc32cFromContents(fp):
    204   """Calculates a base64 CRC32c checksum of the contents of a seekable stream.
    205 
    206   This function sets the stream position 0 before and after calculation.
    207 
    208   Args:
    209     fp: An already-open file object.
    210 
    211   Returns:
    212     CRC32c checksum of the file in base64 format.
    213   """
    214   return _CalculateB64EncodedHashFromContents(
    215       fp, crcmod.predefined.Crc('crc-32c'))
    216 
    217 
    218 def CalculateB64EncodedMd5FromContents(fp):
    219   """Calculates a base64 MD5 digest of the contents of a seekable stream.
    220 
    221   This function sets the stream position 0 before and after calculation.
    222 
    223   Args:
    224     fp: An already-open file object.
    225 
    226   Returns:
    227     MD5 digest of the file in base64 format.
    228   """
    229   return _CalculateB64EncodedHashFromContents(fp, md5())
    230 
    231 
    232 def CalculateMd5FromContents(fp):
    233   """Calculates a base64 MD5 digest of the contents of a seekable stream.
    234 
    235   This function sets the stream position 0 before and after calculation.
    236 
    237   Args:
    238     fp: An already-open file object.
    239 
    240   Returns:
    241     MD5 digest of the file in hex format.
    242   """
    243   return _CalculateHashFromContents(fp, md5())
    244 
    245 
    246 def Base64EncodeHash(digest_value):
    247   """Returns the base64-encoded version of the input hex digest value."""
    248   return base64.encodestring(binascii.unhexlify(digest_value)).rstrip('\n')
    249 
    250 
    251 def Base64ToHexHash(base64_hash):
    252   """Returns the hex digest value of the input base64-encoded hash.
    253 
    254   Args:
    255     base64_hash: Base64-encoded hash, which may contain newlines and single or
    256         double quotes.
    257 
    258   Returns:
    259     Hex digest of the input argument.
    260   """
    261   return binascii.hexlify(base64.decodestring(base64_hash.strip('\n"\'')))
    262 
    263 
    264 def _CalculateB64EncodedHashFromContents(fp, hash_alg):
    265   """Calculates a base64 digest of the contents of a seekable stream.
    266 
    267   This function sets the stream position 0 before and after calculation.
    268 
    269   Args:
    270     fp: An already-open file object.
    271     hash_alg: Instance of hashing class initialized to start state.
    272 
    273   Returns:
    274     Hash of the stream in base64 format.
    275   """
    276   return Base64EncodeHash(_CalculateHashFromContents(fp, hash_alg))
    277 
    278 
    279 def GetUploadHashAlgs():
    280   """Returns a dict of hash algorithms for validating an uploaded object.
    281 
    282   This is for use only with single object uploads, not compose operations
    283   such as those used by parallel composite uploads (though it can be used to
    284   validate the individual components).
    285 
    286   Returns:
    287     dict of (algorithm_name: hash_algorithm)
    288   """
    289   check_hashes_config = config.get(
    290       'GSUtil', 'check_hashes', CHECK_HASH_IF_FAST_ELSE_FAIL)
    291   if check_hashes_config == 'never':
    292     return {}
    293   return {'md5': md5}
    294 
    295 
def GetDownloadHashAlgs(logger, consider_md5=False, consider_crc32c=False):
  """Returns a dict of hash algorithms for validating an object.

  Args:
    logger: logging.Logger for outputting log messages.
    consider_md5: If True, consider using a md5 hash.
    consider_crc32c: If True, consider using a crc32c hash.

  Returns:
    Dict of (string, hash algorithm).

  Raises:
    CommandException if hash algorithms satisfying the boto config file
    cannot be returned.
  """
  check_hashes_config = config.get(
      'GSUtil', 'check_hashes', CHECK_HASH_IF_FAST_ELSE_FAIL)
  if check_hashes_config == CHECK_HASH_NEVER:
    return {}

  hash_algs = {}
  # MD5 takes precedence: when it is available, CRC32c is never considered
  # (note the elif below), so slow-crcmod handling applies only to
  # CRC32c-only (composite) objects.
  if consider_md5:
    hash_algs['md5'] = md5
  elif consider_crc32c:
    # If the cloud provider supplies a CRC, we'll compute a checksum to
    # validate if we're using a native crcmod installation and MD5 isn't
    # offered as an alternative.
    if UsingCrcmodExtension(crcmod):
      hash_algs['crc32c'] = lambda: crcmod.predefined.Crc('crc-32c')
    elif not hash_algs:
      # NOTE(review): hash_algs is always empty when this branch is reached
      # (the md5 branch was not taken), so this condition is effectively an
      # 'else'.
      if check_hashes_config == CHECK_HASH_IF_FAST_ELSE_FAIL:
        raise CommandException(_SLOW_CRC_EXCEPTION_TEXT)
      elif check_hashes_config == CHECK_HASH_IF_FAST_ELSE_SKIP:
        logger.warn(_NO_HASH_CHECK_WARNING)
      elif check_hashes_config == CHECK_HASH_ALWAYS:
        # Slow CRC is acceptable under 'always': warn but still validate.
        logger.warn(_SLOW_CRCMOD_DOWNLOAD_WARNING)
        hash_algs['crc32c'] = lambda: crcmod.predefined.Crc('crc-32c')
      else:
        raise CommandException(
            'Your boto config \'check_hashes\' option is misconfigured.')

  return hash_algs
    338 
    339 
class HashingFileUploadWrapper(object):
  """Wraps an input stream in a hash digester and exposes a stream interface.

  This class provides integrity checking during file uploads via the
  following properties:

  Calls to read will appropriately update digesters with all bytes read.
  Calls to seek (assuming it is supported by the wrapped stream) using
      os.SEEK_SET will catch up / reset the digesters to the specified
      position. If seek is called with a different os.SEEK mode, the caller
      must return to the original position using os.SEEK_SET before further
      reads.
  Calls to seek are fast if the desired position is equal to the position at
      the beginning of the last read call (we only need to re-hash bytes
      from that point on).
  """

  def __init__(self, stream, digesters, hash_algs, src_url, logger):
    """Initializes the wrapper.

    Args:
      stream: Input stream.
      digesters: dict of {string: hash digester} containing digesters, where
          string is the name of the hash algorithm.
      hash_algs: dict of {string: hash algorithm} for resetting and
          recalculating digesters. String is the name of the hash algorithm.
      src_url: Source FileUrl that is being copied.
      logger: For outputting log messages.

    Raises:
      CommandException: if digesters or hash_algs is empty.
    """
    if not digesters:
      raise CommandException('HashingFileUploadWrapper used with no digesters.')
    elif not hash_algs:
      raise CommandException('HashingFileUploadWrapper used with no hash_algs.')

    self._orig_fp = stream
    self._digesters = digesters
    self._src_url = src_url
    self._logger = logger
    # Set to the wrapped stream's position after a non-SEEK_SET seek; None
    # means the digesters match the stream position and read() is safe.
    self._seek_away = None

    # Snapshot of each digester taken at _digesters_previous_mark, used to
    # rewind cheaply on a backwards seek to that mark.
    self._digesters_previous = {}
    for alg in self._digesters:
      self._digesters_previous[alg] = self._digesters[alg].copy()
    # Stream offset corresponding to the _digesters_previous snapshot.
    self._digesters_previous_mark = 0
    # Stream offset up to which _digesters have consumed bytes.
    self._digesters_current_mark = 0
    self._hash_algs = hash_algs

  def read(self, size=-1):  # pylint: disable=invalid-name
    """Reads from the wrapped file pointer and calculates hash digests.

    Args:
      size: The amount of bytes to read. If omitted or negative, the entire
          contents of the file will be read, hashed, and returned.

    Returns:
      Bytes from the wrapped stream.

    Raises:
      CommandException if the position of the wrapped stream is unknown.
    """
    if self._seek_away is not None:
      raise CommandException('Read called on hashing file pointer in an '
                             'unknown position; cannot correctly compute '
                             'digest.')

    data = self._orig_fp.read(size)
    # Snapshot the digests before updating so a later seek back to this
    # read's start position can restore them without rescanning.
    self._digesters_previous_mark = self._digesters_current_mark
    for alg in self._digesters:
      self._digesters_previous[alg] = self._digesters[alg].copy()
      self._digesters[alg].update(data)
    self._digesters_current_mark += len(data)
    return data

  def tell(self):  # pylint: disable=invalid-name
    """Returns the current stream position."""
    return self._orig_fp.tell()

  def seekable(self):  # pylint: disable=invalid-name
    """Returns true if the stream is seekable."""
    return self._orig_fp.seekable()

  def seek(self, offset, whence=os.SEEK_SET):  # pylint: disable=invalid-name
    """Seeks in the wrapped file pointer and catches up hash digests.

    Args:
      offset: The offset to seek to.
      whence: os.SEEK_SET, os.SEEK_CUR, or os.SEEK_END.

    Returns:
      Return value from the wrapped stream's seek call.
    """
    if whence != os.SEEK_SET:
      # We do not catch up hashes for non-absolute seeks, and rely on the
      # caller to seek to an absolute position before reading.
      self._seek_away = self._orig_fp.tell()

    else:
      # Hashes will be correct and it's safe to call read().
      self._seek_away = None
      if offset < self._digesters_previous_mark:
        # This is earlier than our earliest saved digest, so we need to
        # reset the digesters and scan from the beginning.
        for alg in self._digesters:
          self._digesters[alg] = self._hash_algs[alg]()
        self._digesters_current_mark = 0
        self._orig_fp.seek(0)
        self._CatchUp(offset)

      elif offset == self._digesters_previous_mark:
        # Just load the saved digests.
        self._digesters_current_mark = self._digesters_previous_mark
        for alg in self._digesters:
          self._digesters[alg] = self._digesters_previous[alg]

      elif offset < self._digesters_current_mark:
        # Reset the position to our previous digest and scan forward.
        self._digesters_current_mark = self._digesters_previous_mark
        for alg in self._digesters:
          self._digesters[alg] = self._digesters_previous[alg]
        self._orig_fp.seek(self._digesters_previous_mark)
        self._CatchUp(offset - self._digesters_previous_mark)

      else:
        # Scan forward from our current digest and position.
        self._orig_fp.seek(self._digesters_current_mark)
        self._CatchUp(offset - self._digesters_current_mark)

    return self._orig_fp.seek(offset, whence)

  def _CatchUp(self, bytes_to_read):
    """Catches up hashes, but does not return data and uses little memory.

    Before calling this function, digesters_current_mark should be updated
    to the current location of the original stream and the self._digesters
    should be current to that point (but no further).

    Args:
      bytes_to_read: Number of bytes to catch up from the original stream.

    Raises:
      CommandException: if the wrapped stream's position does not match
          _digesters_current_mark.
    """
    if self._orig_fp.tell() != self._digesters_current_mark:
      raise CommandException(
          'Invalid mark when catching up hashes. Stream position %s, hash '
          'position %s' % (self._orig_fp.tell(), self._digesters_current_mark))

    for alg in self._digesters:
      if bytes_to_read >= MIN_SIZE_COMPUTE_LOGGING:
        self._logger.info('Catching up %s for %s...', alg,
                          self._src_url.url_string)
      self._digesters_previous[alg] = self._digesters[alg].copy()

    self._digesters_previous_mark = self._digesters_current_mark
    bytes_remaining = bytes_to_read
    # Hash in TRANSFER_BUFFER_SIZE chunks so memory use stays bounded.
    bytes_this_round = min(bytes_remaining, TRANSFER_BUFFER_SIZE)
    while bytes_this_round:
      data = self._orig_fp.read(bytes_this_round)
      bytes_remaining -= bytes_this_round
      for alg in self._digesters:
        self._digesters[alg].update(data)
      bytes_this_round = min(bytes_remaining, TRANSFER_BUFFER_SIZE)
    self._digesters_current_mark += bytes_to_read
    500