Home | History | Annotate | Download | only in gslib
      1 # -*- coding: utf-8 -*-
      2 # Copyright 2011 Google Inc. All Rights Reserved.
      3 # Copyright 2011, Nexenta Systems Inc.
      4 #
      5 # Licensed under the Apache License, Version 2.0 (the "License");
      6 # you may not use this file except in compliance with the License.
      7 # You may obtain a copy of the License at
      8 #
      9 #     http://www.apache.org/licenses/LICENSE-2.0
     10 #
     11 # Unless required by applicable law or agreed to in writing, software
     12 # distributed under the License is distributed on an "AS IS" BASIS,
     13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     14 # See the License for the specific language governing permissions and
     15 # limitations under the License.
     16 """Helper functions for copy functionality."""
     17 
     18 from __future__ import absolute_import
     19 
     20 import base64
     21 from collections import namedtuple
     22 import csv
     23 import datetime
     24 import errno
     25 import gzip
     26 from hashlib import md5
     27 import json
     28 import logging
     29 import mimetypes
     30 from operator import attrgetter
     31 import os
     32 import pickle
     33 import random
     34 import re
     35 import shutil
     36 import stat
     37 import subprocess
     38 import tempfile
     39 import textwrap
     40 import time
     41 import traceback
     42 
     43 from boto import config
     44 import crcmod
     45 
     46 import gslib
     47 from gslib.cloud_api import ArgumentException
     48 from gslib.cloud_api import CloudApi
     49 from gslib.cloud_api import NotFoundException
     50 from gslib.cloud_api import PreconditionException
     51 from gslib.cloud_api import Preconditions
     52 from gslib.cloud_api import ResumableDownloadException
     53 from gslib.cloud_api import ResumableUploadAbortException
     54 from gslib.cloud_api import ResumableUploadException
     55 from gslib.cloud_api import ResumableUploadStartOverException
     56 from gslib.cloud_api_helper import GetDownloadSerializationData
     57 from gslib.commands.compose import MAX_COMPOSE_ARITY
     58 from gslib.commands.config import DEFAULT_PARALLEL_COMPOSITE_UPLOAD_COMPONENT_SIZE
     59 from gslib.commands.config import DEFAULT_PARALLEL_COMPOSITE_UPLOAD_THRESHOLD
     60 from gslib.commands.config import DEFAULT_SLICED_OBJECT_DOWNLOAD_COMPONENT_SIZE
     61 from gslib.commands.config import DEFAULT_SLICED_OBJECT_DOWNLOAD_MAX_COMPONENTS
     62 from gslib.commands.config import DEFAULT_SLICED_OBJECT_DOWNLOAD_THRESHOLD
     63 from gslib.cs_api_map import ApiSelector
     64 from gslib.daisy_chain_wrapper import DaisyChainWrapper
     65 from gslib.exception import CommandException
     66 from gslib.exception import HashMismatchException
     67 from gslib.file_part import FilePart
     68 from gslib.hashing_helper import Base64EncodeHash
     69 from gslib.hashing_helper import CalculateB64EncodedMd5FromContents
     70 from gslib.hashing_helper import CalculateHashesFromContents
     71 from gslib.hashing_helper import CHECK_HASH_IF_FAST_ELSE_FAIL
     72 from gslib.hashing_helper import CHECK_HASH_NEVER
     73 from gslib.hashing_helper import ConcatCrc32c
     74 from gslib.hashing_helper import GetDownloadHashAlgs
     75 from gslib.hashing_helper import GetUploadHashAlgs
     76 from gslib.hashing_helper import HashingFileUploadWrapper
     77 from gslib.parallelism_framework_util import AtomicDict
     78 from gslib.progress_callback import ConstructAnnounceText
     79 from gslib.progress_callback import FileProgressCallbackHandler
     80 from gslib.progress_callback import ProgressCallbackWithBackoff
     81 from gslib.resumable_streaming_upload import ResumableStreamingJsonUploadWrapper
     82 from gslib.storage_url import ContainsWildcard
     83 from gslib.storage_url import StorageUrlFromString
     84 from gslib.third_party.storage_apitools import storage_v1_messages as apitools_messages
     85 from gslib.tracker_file import DeleteDownloadTrackerFiles
     86 from gslib.tracker_file import DeleteTrackerFile
     87 from gslib.tracker_file import GetTrackerFilePath
     88 from gslib.tracker_file import RaiseUnwritableTrackerFileException
     89 from gslib.tracker_file import ReadOrCreateDownloadTrackerFile
     90 from gslib.tracker_file import TrackerFileType
     91 from gslib.tracker_file import WriteDownloadComponentTrackerFile
     92 from gslib.translation_helper import AddS3MarkerAclToObjectMetadata
     93 from gslib.translation_helper import CopyObjectMetadata
     94 from gslib.translation_helper import DEFAULT_CONTENT_TYPE
     95 from gslib.translation_helper import GenerationFromUrlAndString
     96 from gslib.translation_helper import ObjectMetadataFromHeaders
     97 from gslib.translation_helper import PreconditionsFromHeaders
     98 from gslib.translation_helper import S3MarkerAclFromObjectMetadata
     99 from gslib.util import CheckFreeSpace
    100 from gslib.util import CheckMultiprocessingAvailableAndInit
    101 from gslib.util import CreateLock
    102 from gslib.util import DEFAULT_FILE_BUFFER_SIZE
    103 from gslib.util import DivideAndCeil
    104 from gslib.util import GetCloudApiInstance
    105 from gslib.util import GetFileSize
    106 from gslib.util import GetJsonResumableChunkSize
    107 from gslib.util import GetMaxRetryDelay
    108 from gslib.util import GetNumRetries
    109 from gslib.util import GetStreamFromFileUrl
    110 from gslib.util import HumanReadableToBytes
    111 from gslib.util import IS_WINDOWS
    112 from gslib.util import IsCloudSubdirPlaceholder
    113 from gslib.util import MakeHumanReadable
    114 from gslib.util import MIN_SIZE_COMPUTE_LOGGING
    115 from gslib.util import ResumableThreshold
    116 from gslib.util import TEN_MIB
    117 from gslib.util import UsingCrcmodExtension
    118 from gslib.util import UTF8
    119 from gslib.wildcard_iterator import CreateWildcardIterator
    120 
    121 # pylint: disable=g-import-not-at-top
    122 if IS_WINDOWS:
    123   import msvcrt
    124 
    125 # Declare copy_helper_opts as a global because namedtuple isn't aware of
    126 # assigning to a class member (which breaks pickling done by multiprocessing).
    127 # For details see
    128 # http://stackoverflow.com/questions/16377215/how-to-pickle-a-namedtuple-instance-correctly
    129 # pylint: disable=global-at-module-level
    130 global global_copy_helper_opts
    131 
    132 # In-memory map of local files that are currently opened for write. Used to
    133 # ensure that if we write to the same file twice (say, for example, because the
    134 # user specified two identical source URLs), the writes occur serially.
    135 global open_files_map, open_files_lock
    136 open_files_map = (
    137     AtomicDict() if not CheckMultiprocessingAvailableAndInit().is_available
    138     else AtomicDict(manager=gslib.util.manager))
    139 
    140 # We don't allow multiple processes on Windows, so using a process-safe lock
    141 # would be unnecessary.
    142 open_files_lock = CreateLock()
    143 
    144 # For debugging purposes; if True, files and objects that fail hash validation
    145 # will be saved with the below suffix appended.
    146 _RENAME_ON_HASH_MISMATCH = False
    147 _RENAME_ON_HASH_MISMATCH_SUFFIX = '_corrupt'
    148 
    149 PARALLEL_UPLOAD_TEMP_NAMESPACE = (
    150     u'/gsutil/tmp/parallel_composite_uploads/for_details_see/gsutil_help_cp/')
    151 
    152 PARALLEL_UPLOAD_STATIC_SALT = u"""
    153 PARALLEL_UPLOAD_SALT_TO_PREVENT_COLLISIONS.
    154 The theory is that no user will have prepended this to the front of
    155 one of their object names and then done an MD5 hash of the name, and
    156 then prepended PARALLEL_UPLOAD_TEMP_NAMESPACE to the front of their object
    157 name. Note that there will be no problems with object name length since we
    158 hash the original name.
    159 """
    160 
    161 # When uploading a file, get the following fields in the response for
    162 # filling in command output and manifests.
    163 UPLOAD_RETURN_FIELDS = ['crc32c', 'etag', 'generation', 'md5Hash', 'size']
    164 
    165 # This tuple is used only to encapsulate the arguments needed for
    166 # command.Apply() in the parallel composite upload case.
    167 # Note that content_type is used instead of a full apitools Object() because
    168 # apitools objects are not picklable.
    169 # filename: String name of file.
    170 # file_start: start byte of file (may be in the middle of a file for partitioned
    171 #             files).
    172 # file_length: length of upload (may not be the entire length of a file for
    173 #              partitioned files).
    174 # src_url: FileUrl describing the source file.
    175 # dst_url: CloudUrl describing the destination component file.
    176 # canned_acl: canned_acl to apply to the uploaded file/component.
    177 # content_type: content-type for final object, used for setting content-type
    178 #               of components and final object.
    179 # tracker_file: tracker file for this component.
    180 # tracker_file_lock: tracker file lock for tracker file(s).
    181 PerformParallelUploadFileToObjectArgs = namedtuple(
    182     'PerformParallelUploadFileToObjectArgs',
    183     'filename file_start file_length src_url dst_url canned_acl '
    184     'content_type tracker_file tracker_file_lock')
    185 
    186 PerformSlicedDownloadObjectToFileArgs = namedtuple(
    187     'PerformSlicedDownloadObjectToFileArgs',
    188     'component_num src_url src_obj_metadata dst_url download_file_name '
    189     'start_byte end_byte')
    190 
    191 PerformSlicedDownloadReturnValues = namedtuple(
    192     'PerformSlicedDownloadReturnValues',
    193     'component_num crc32c bytes_transferred server_encoding')
    194 
    195 ObjectFromTracker = namedtuple('ObjectFromTracker',
    196                                'object_name generation')
    197 
    198 # TODO: Refactor this file to be less cumbersome. In particular, some of the
    199 # different paths (e.g., uploading a file to an object vs. downloading an
    200 # object to a file) could be split into separate files.
    201 
    202 # Chunk size to use while zipping/unzipping gzip files.
    203 GZIP_CHUNK_SIZE = 8192
    204 
    205 # Number of bytes to wait before updating a sliced download component tracker
    206 # file.
    207 TRACKERFILE_UPDATE_THRESHOLD = TEN_MIB
    208 
    209 PARALLEL_COMPOSITE_SUGGESTION_THRESHOLD = 150 * 1024 * 1024
    210 
    211 # S3 requires special Multipart upload logic (that we currently don't implement)
    212 # for files > 5GiB in size.
    213 S3_MAX_UPLOAD_SIZE = 5 * 1024 * 1024 * 1024
    214 
    215 # TODO: Create a multiprocessing framework value allocator, then use it instead
    216 # of a dict.
    217 global suggested_sliced_transfers, suggested_sliced_transfers_lock
    218 suggested_sliced_transfers = (
    219     AtomicDict() if not CheckMultiprocessingAvailableAndInit().is_available
    220     else AtomicDict(manager=gslib.util.manager))
    221 suggested_sliced_transfers_lock = CreateLock()
    222 
    223 
    224 class FileConcurrencySkipError(Exception):
    225   """Raised when skipping a file due to a concurrent, duplicate copy."""
    226 
    227 
    228 def _RmExceptionHandler(cls, e):
    229   """Simple exception handler to allow post-completion status."""
    230   cls.logger.error(str(e))
    231 
    232 
    233 def _ParallelCopyExceptionHandler(cls, e):
    234   """Simple exception handler to allow post-completion status."""
    235   cls.logger.error(str(e))
    236   cls.op_failure_count += 1
    237   cls.logger.debug('\n\nEncountered exception while copying:\n%s\n',
    238                    traceback.format_exc())
    239 
    240 
    241 def _PerformParallelUploadFileToObject(cls, args, thread_state=None):
    242   """Function argument to Apply for performing parallel composite uploads.
    243 
    244   Args:
    245     cls: Calling Command class.
    246     args: PerformParallelUploadFileToObjectArgs tuple describing the target.
    247     thread_state: gsutil Cloud API instance to use for the operation.
    248 
    249   Returns:
    250     StorageUrl representing a successfully uploaded component.
    251   """
    252   fp = FilePart(args.filename, args.file_start, args.file_length)
    253   gsutil_api = GetCloudApiInstance(cls, thread_state=thread_state)
    254   with fp:
    255     # We take many precautions with the component names that make collisions
    256     # effectively impossible. Specifying preconditions will just allow us to
    257     # reach a state in which uploads will always fail on retries.
    258     preconditions = None
    259 
    260     # Fill in content type if one was provided.
    261     dst_object_metadata = apitools_messages.Object(
    262         name=args.dst_url.object_name,
    263         bucket=args.dst_url.bucket_name,
    264         contentType=args.content_type)
    265 
    266     try:
    267       if global_copy_helper_opts.canned_acl:
    268         # No canned ACL support in JSON, force XML API to be used for
    269         # upload/copy operations.
    270         orig_prefer_api = gsutil_api.prefer_api
    271         gsutil_api.prefer_api = ApiSelector.XML
    272       ret = _UploadFileToObject(args.src_url, fp, args.file_length,
    273                                 args.dst_url, dst_object_metadata,
    274                                 preconditions, gsutil_api, cls.logger, cls,
    275                                 _ParallelCopyExceptionHandler,
    276                                 gzip_exts=None, allow_splitting=False)
    277     finally:
    278       if global_copy_helper_opts.canned_acl:
    279         gsutil_api.prefer_api = orig_prefer_api
    280 
    281   component = ret[2]
    282   _AppendComponentTrackerToParallelUploadTrackerFile(
    283       args.tracker_file, component, args.tracker_file_lock)
    284   return ret
    285 
    286 
    287 CopyHelperOpts = namedtuple('CopyHelperOpts', [
    288     'perform_mv',
    289     'no_clobber',
    290     'daisy_chain',
    291     'read_args_from_stdin',
    292     'print_ver',
    293     'use_manifest',
    294     'preserve_acl',
    295     'canned_acl',
    296     'skip_unsupported_objects',
    297     'test_callback_file'])
    298 
    299 
    300 # pylint: disable=global-variable-undefined
    301 def CreateCopyHelperOpts(perform_mv=False, no_clobber=False, daisy_chain=False,
    302                          read_args_from_stdin=False, print_ver=False,
    303                          use_manifest=False, preserve_acl=False,
    304                          canned_acl=None, skip_unsupported_objects=False,
    305                          test_callback_file=None):
    306   """Creates CopyHelperOpts for passing options to CopyHelper."""
    307   # We create a tuple with union of options needed by CopyHelper and any
    308   # copy-related functionality in CpCommand, RsyncCommand, or Command class.
    309   global global_copy_helper_opts
    310   global_copy_helper_opts = CopyHelperOpts(
    311       perform_mv=perform_mv,
    312       no_clobber=no_clobber,
    313       daisy_chain=daisy_chain,
    314       read_args_from_stdin=read_args_from_stdin,
    315       print_ver=print_ver,
    316       use_manifest=use_manifest,
    317       preserve_acl=preserve_acl,
    318       canned_acl=canned_acl,
    319       skip_unsupported_objects=skip_unsupported_objects,
    320       test_callback_file=test_callback_file)
    321   return global_copy_helper_opts
    322 
    323 
    324 # pylint: disable=global-variable-undefined
    325 # pylint: disable=global-variable-not-assigned
    326 def GetCopyHelperOpts():
    327   """Returns namedtuple holding CopyHelper options."""
    328   global global_copy_helper_opts
    329   return global_copy_helper_opts
    330 
    331 
    332 def _SelectDownloadStrategy(dst_url):
    333   """Get download strategy based on the destination object.
    334 
    335   Args:
    336     dst_url: Destination StorageUrl.
    337 
    338   Returns:
    339     gsutil Cloud API DownloadStrategy.
    340   """
    341   dst_is_special = False
    342   if dst_url.IsFileUrl():
    343     # Check explicitly first because os.stat doesn't work on 'nul' in Windows.
    344     if dst_url.object_name == os.devnull:
    345       dst_is_special = True
    346     try:
    347       mode = os.stat(dst_url.object_name).st_mode
    348       if stat.S_ISCHR(mode):
    349         dst_is_special = True
    350     except OSError:
    351       pass
    352 
    353   if dst_is_special:
    354     return CloudApi.DownloadStrategy.ONE_SHOT
    355   else:
    356     return CloudApi.DownloadStrategy.RESUMABLE
    357 
    358 
    359 def _GetUploadTrackerData(tracker_file_name, logger):
    360   """Reads tracker data from an upload tracker file if it exists.
    361 
    362   Args:
    363     tracker_file_name: Tracker file name for this upload.
    364     logger: for outputting log messages.
    365 
    366   Returns:
    367     Serialization data if the tracker file already exists (resume existing
    368     upload), None otherwise.
    369   """
    370   tracker_file = None
    371 
    372   # If we already have a matching tracker file, get the serialization data
    373   # so that we can resume the upload.
    374   try:
    375     tracker_file = open(tracker_file_name, 'r')
    376     tracker_data = tracker_file.read()
    377     return tracker_data
    378   except IOError as e:
    379     # Ignore non-existent file (happens first time a upload is attempted on an
    380     # object, or when re-starting an upload after a
    381     # ResumableUploadStartOverException), but warn user for other errors.
    382     if e.errno != errno.ENOENT:
    383       logger.warn('Couldn\'t read upload tracker file (%s): %s. Restarting '
    384                   'upload from scratch.', tracker_file_name, e.strerror)
    385   finally:
    386     if tracker_file:
    387       tracker_file.close()
    388 
    389 
    390 def InsistDstUrlNamesContainer(exp_dst_url, have_existing_dst_container,
    391                                command_name):
    392   """Ensures the destination URL names a container.
    393 
    394   Acceptable containers include directory, bucket, bucket
    395   subdir, and non-existent bucket subdir.
    396 
    397   Args:
    398     exp_dst_url: Wildcard-expanded destination StorageUrl.
    399     have_existing_dst_container: bool indicator of whether exp_dst_url
    400       names a container (directory, bucket, or existing bucket subdir).
    401     command_name: Name of command making call. May not be the same as the
    402         calling class's self.command_name in the case of commands implemented
    403         atop other commands (like mv command).
    404 
    405   Raises:
    406     CommandException: if the URL being checked does not name a container.
    407   """
    408   if ((exp_dst_url.IsFileUrl() and not exp_dst_url.IsDirectory()) or
    409       (exp_dst_url.IsCloudUrl() and exp_dst_url.IsBucket()
    410        and not have_existing_dst_container)):
    411     raise CommandException('Destination URL must name a directory, bucket, '
    412                            'or bucket\nsubdirectory for the multiple '
    413                            'source form of the %s command.' % command_name)
    414 
    415 
    416 def _ShouldTreatDstUrlAsBucketSubDir(have_multiple_srcs, dst_url,
    417                                      have_existing_dest_subdir,
    418                                      src_url_names_container,
    419                                      recursion_requested):
    420   """Checks whether dst_url should be treated as a bucket "sub-directory".
    421 
    422   The decision about whether something constitutes a bucket "sub-directory"
    423   depends on whether there are multiple sources in this request and whether
    424   there is an existing bucket subdirectory. For example, when running the
    425   command:
    426     gsutil cp file gs://bucket/abc
    427   if there's no existing gs://bucket/abc bucket subdirectory we should copy
    428   file to the object gs://bucket/abc. In contrast, if
    429   there's an existing gs://bucket/abc bucket subdirectory we should copy
    430   file to gs://bucket/abc/file. And regardless of whether gs://bucket/abc
    431   exists, when running the command:
    432     gsutil cp file1 file2 gs://bucket/abc
    433   we should copy file1 to gs://bucket/abc/file1 (and similarly for file2).
    434   Finally, for recursive copies, if the source is a container then we should
    435   copy to a container as the target.  For example, when running the command:
    436     gsutil cp -r dir1 gs://bucket/dir2
    437   we should copy the subtree of dir1 to gs://bucket/dir2.
    438 
    439   Note that we don't disallow naming a bucket "sub-directory" where there's
    440   already an object at that URL. For example it's legitimate (albeit
    441   confusing) to have an object called gs://bucket/dir and
    442   then run the command
    443   gsutil cp file1 file2 gs://bucket/dir
    444   Doing so will end up with objects gs://bucket/dir, gs://bucket/dir/file1,
    445   and gs://bucket/dir/file2.
    446 
    447   Args:
    448     have_multiple_srcs: Bool indicator of whether this is a multi-source
    449         operation.
    450     dst_url: StorageUrl to check.
    451     have_existing_dest_subdir: bool indicator whether dest is an existing
    452       subdirectory.
    453     src_url_names_container: bool indicator of whether the source URL
    454       is a container.
    455     recursion_requested: True if a recursive operation has been requested.
    456 
    457   Returns:
    458     bool indicator.
    459   """
    460   if have_existing_dest_subdir:
    461     return True
    462   if dst_url.IsCloudUrl():
    463     return (have_multiple_srcs or
    464             (src_url_names_container and recursion_requested))
    465 
    466 
    467 def _ShouldTreatDstUrlAsSingleton(have_multiple_srcs,
    468                                   have_existing_dest_subdir, dst_url,
    469                                   recursion_requested):
    470   """Checks that dst_url names a single file/object after wildcard expansion.
    471 
    472   It is possible that an object path might name a bucket sub-directory.
    473 
    474   Args:
    475     have_multiple_srcs: Bool indicator of whether this is a multi-source
    476         operation.
    477     have_existing_dest_subdir: bool indicator whether dest is an existing
    478       subdirectory.
    479     dst_url: StorageUrl to check.
    480     recursion_requested: True if a recursive operation has been requested.
    481 
    482   Returns:
    483     bool indicator.
    484   """
    485   if recursion_requested:
    486     return False
    487   if dst_url.IsFileUrl():
    488     return not dst_url.IsDirectory()
    489   else:  # dst_url.IsCloudUrl()
    490     return (not have_multiple_srcs and
    491             not have_existing_dest_subdir and
    492             dst_url.IsObject())
    493 
    494 
    495 def ConstructDstUrl(src_url, exp_src_url, src_url_names_container,
    496                     have_multiple_srcs, exp_dst_url, have_existing_dest_subdir,
    497                     recursion_requested):
    498   """Constructs the destination URL for a given exp_src_url/exp_dst_url pair.
    499 
    500   Uses context-dependent naming rules that mimic Linux cp and mv behavior.
    501 
    502   Args:
    503     src_url: Source StorageUrl to be copied.
    504     exp_src_url: Single StorageUrl from wildcard expansion of src_url.
    505     src_url_names_container: True if src_url names a container (including the
    506         case of a wildcard-named bucket subdir (like gs://bucket/abc,
    507         where gs://bucket/abc/* matched some objects).
    508     have_multiple_srcs: True if this is a multi-source request. This can be
    509         true if src_url wildcard-expanded to multiple URLs or if there were
    510         multiple source URLs in the request.
    511     exp_dst_url: the expanded StorageUrl requested for the cp destination.
    512         Final written path is constructed from this plus a context-dependent
    513         variant of src_url.
    514     have_existing_dest_subdir: bool indicator whether dest is an existing
    515       subdirectory.
    516     recursion_requested: True if a recursive operation has been requested.
    517 
    518   Returns:
    519     StorageUrl to use for copy.
    520 
    521   Raises:
    522     CommandException if destination object name not specified for
    523     source and source is a stream.
    524   """
    525   if _ShouldTreatDstUrlAsSingleton(
    526       have_multiple_srcs, have_existing_dest_subdir, exp_dst_url,
    527       recursion_requested):
    528     # We're copying one file or object to one file or object.
    529     return exp_dst_url
    530 
    531   if exp_src_url.IsFileUrl() and exp_src_url.IsStream():
    532     if have_existing_dest_subdir:
    533       raise CommandException('Destination object name needed when '
    534                              'source is a stream')
    535     return exp_dst_url
    536 
    537   if not recursion_requested and not have_multiple_srcs:
    538     # We're copying one file or object to a subdirectory. Append final comp
    539     # of exp_src_url to exp_dst_url.
    540     src_final_comp = exp_src_url.object_name.rpartition(src_url.delim)[-1]
    541     return StorageUrlFromString('%s%s%s' % (
    542         exp_dst_url.url_string.rstrip(exp_dst_url.delim),
    543         exp_dst_url.delim, src_final_comp))
    544 
    545   # Else we're copying multiple sources to a directory, bucket, or a bucket
    546   # "sub-directory".
    547 
    548   # Ensure exp_dst_url ends in delim char if we're doing a multi-src copy or
    549   # a copy to a directory. (The check for copying to a directory needs
    550   # special-case handling so that the command:
    551   #   gsutil cp gs://bucket/obj dir
    552   # will turn into file://dir/ instead of file://dir -- the latter would cause
    553   # the file "dirobj" to be created.)
    554   # Note: need to check have_multiple_srcs or src_url.names_container()
    555   # because src_url could be a bucket containing a single object, named
    556   # as gs://bucket.
    557   if ((have_multiple_srcs or src_url_names_container or
    558        (exp_dst_url.IsFileUrl() and exp_dst_url.IsDirectory()))
    559       and not exp_dst_url.url_string.endswith(exp_dst_url.delim)):
    560     exp_dst_url = StorageUrlFromString('%s%s' % (exp_dst_url.url_string,
    561                                                  exp_dst_url.delim))
    562 
    563   # Making naming behavior match how things work with local Linux cp and mv
    564   # operations depends on many factors, including whether the destination is a
    565   # container, the plurality of the source(s), and whether the mv command is
    566   # being used:
    567   # 1. For the "mv" command that specifies a non-existent destination subdir,
    568   #    renaming should occur at the level of the src subdir, vs appending that
    569   #    subdir beneath the dst subdir like is done for copying. For example:
    570   #      gsutil rm -r gs://bucket
    571   #      gsutil cp -r dir1 gs://bucket
    572   #      gsutil cp -r dir2 gs://bucket/subdir1
    573   #      gsutil mv gs://bucket/subdir1 gs://bucket/subdir2
    574   #    would (if using cp naming behavior) end up with paths like:
    575   #      gs://bucket/subdir2/subdir1/dir2/.svn/all-wcprops
    576   #    whereas mv naming behavior should result in:
    577   #      gs://bucket/subdir2/dir2/.svn/all-wcprops
    578   # 2. Copying from directories, buckets, or bucket subdirs should result in
    579   #    objects/files mirroring the source directory hierarchy. For example:
    580   #      gsutil cp dir1/dir2 gs://bucket
    581   #    should create the object gs://bucket/dir2/file2, assuming dir1/dir2
    582   #    contains file2).
    583   #    To be consistent with Linux cp behavior, there's one more wrinkle when
    584   #    working with subdirs: The resulting object names depend on whether the
    585   #    destination subdirectory exists. For example, if gs://bucket/subdir
    586   #    exists, the command:
    587   #      gsutil cp -r dir1/dir2 gs://bucket/subdir
    588   #    should create objects named like gs://bucket/subdir/dir2/a/b/c. In
    589   #    contrast, if gs://bucket/subdir does not exist, this same command
    590   #    should create objects named like gs://bucket/subdir/a/b/c.
    591   # 3. Copying individual files or objects to dirs, buckets or bucket subdirs
    592   #    should result in objects/files named by the final source file name
    593   #    component. Example:
    594   #      gsutil cp dir1/*.txt gs://bucket
    595   #    should create the objects gs://bucket/f1.txt and gs://bucket/f2.txt,
    596   #    assuming dir1 contains f1.txt and f2.txt.
    597 
    598   recursive_move_to_new_subdir = False
    599   if (global_copy_helper_opts.perform_mv and recursion_requested
    600       and src_url_names_container and not have_existing_dest_subdir):
    601     # Case 1. Handle naming rules for bucket subdir mv. Here we want to
    602     # line up the src_url against its expansion, to find the base to build
    603     # the new name. For example, running the command:
    604     #   gsutil mv gs://bucket/abcd gs://bucket/xyz
    605     # when processing exp_src_url=gs://bucket/abcd/123
    606     # exp_src_url_tail should become /123
    607     # Note: mv.py code disallows wildcard specification of source URL.
    608     recursive_move_to_new_subdir = True
    609     exp_src_url_tail = (
    610         exp_src_url.url_string[len(src_url.url_string):])
    611     dst_key_name = '%s/%s' % (exp_dst_url.object_name.rstrip('/'),
    612                               exp_src_url_tail.strip('/'))
    613 
    614   elif src_url_names_container and (exp_dst_url.IsCloudUrl() or
    615                                     exp_dst_url.IsDirectory()):
    616     # Case 2.  Container copy to a destination other than a file.
    617     # Build dst_key_name from subpath of exp_src_url past
    618     # where src_url ends. For example, for src_url=gs://bucket/ and
    619     # exp_src_url=gs://bucket/src_subdir/obj, dst_key_name should be
    620     # src_subdir/obj.
    621     src_url_path_sans_final_dir = GetPathBeforeFinalDir(src_url)
    622     dst_key_name = exp_src_url.versionless_url_string[
    623         len(src_url_path_sans_final_dir):].lstrip(src_url.delim)
    624     # Handle case where dst_url is a non-existent subdir.
    625     if not have_existing_dest_subdir:
    626       dst_key_name = dst_key_name.partition(src_url.delim)[-1]
    627     # Handle special case where src_url was a directory named with '.' or
    628     # './', so that running a command like:
    629     #   gsutil cp -r . gs://dest
    630     # will produce obj names of the form gs://dest/abc instead of
    631     # gs://dest/./abc.
    632     if dst_key_name.startswith('.%s' % os.sep):
    633       dst_key_name = dst_key_name[2:]
    634 
    635   else:
    636     # Case 3.
    637     dst_key_name = exp_src_url.object_name.rpartition(src_url.delim)[-1]
    638 
    639   if (not recursive_move_to_new_subdir and (
    640       exp_dst_url.IsFileUrl() or _ShouldTreatDstUrlAsBucketSubDir(
    641           have_multiple_srcs, exp_dst_url, have_existing_dest_subdir,
    642           src_url_names_container, recursion_requested))):
    643     if exp_dst_url.object_name and exp_dst_url.object_name.endswith(
    644         exp_dst_url.delim):
    645       dst_key_name = '%s%s%s' % (
    646           exp_dst_url.object_name.rstrip(exp_dst_url.delim),
    647           exp_dst_url.delim, dst_key_name)
    648     else:
    649       delim = exp_dst_url.delim if exp_dst_url.object_name else ''
    650       dst_key_name = '%s%s%s' % (exp_dst_url.object_name or '',
    651                                  delim, dst_key_name)
    652 
    653   new_exp_dst_url = exp_dst_url.Clone()
    654   new_exp_dst_url.object_name = dst_key_name.replace(src_url.delim,
    655                                                      exp_dst_url.delim)
    656   return new_exp_dst_url
    657 
    658 
    659 def _CreateDigestsFromDigesters(digesters):
    660   digests = {}
    661   if digesters:
    662     for alg in digesters:
    663       digests[alg] = base64.encodestring(
    664           digesters[alg].digest()).rstrip('\n')
    665   return digests
    666 
    667 
    668 def _CreateDigestsFromLocalFile(logger, algs, file_name, final_file_name,
    669                                 src_obj_metadata):
    670   """Creates a base64 CRC32C and/or MD5 digest from file_name.
    671 
    672   Args:
    673     logger: For outputting log messages.
    674     algs: List of algorithms to compute.
    675     file_name: File to digest.
    676     final_file_name: Permanent location to be used for the downloaded file
    677                      after validation (used for logging).
    678     src_obj_metadata: Metadata of source object.
    679 
    680   Returns:
    681     Dict of algorithm name : base 64 encoded digest
    682   """
    683   hash_dict = {}
    684   if 'md5' in algs:
    685     hash_dict['md5'] = md5()
    686   if 'crc32c' in algs:
    687     hash_dict['crc32c'] = crcmod.predefined.Crc('crc-32c')
    688   with open(file_name, 'rb') as fp:
    689     CalculateHashesFromContents(
    690         fp, hash_dict, ProgressCallbackWithBackoff(
    691             src_obj_metadata.size,
    692             FileProgressCallbackHandler(
    693                 ConstructAnnounceText('Hashing', final_file_name),
    694                 logger).call))
    695   digests = {}
    696   for alg_name, digest in hash_dict.iteritems():
    697     digests[alg_name] = Base64EncodeHash(digest.hexdigest())
    698   return digests
    699 
    700 
    701 def _CheckCloudHashes(logger, src_url, dst_url, src_obj_metadata,
    702                       dst_obj_metadata):
    703   """Validates integrity of two cloud objects copied via daisy-chain.
    704 
    705   Args:
    706     logger: for outputting log messages.
    707     src_url: CloudUrl for source cloud object.
    708     dst_url: CloudUrl for destination cloud object.
    709     src_obj_metadata: Cloud Object metadata for object being downloaded from.
    710     dst_obj_metadata: Cloud Object metadata for object being uploaded to.
    711 
    712   Raises:
    713     CommandException: if cloud digests don't match local digests.
    714   """
    715   checked_one = False
    716   download_hashes = {}
    717   upload_hashes = {}
    718   if src_obj_metadata.md5Hash:
    719     download_hashes['md5'] = src_obj_metadata.md5Hash
    720   if src_obj_metadata.crc32c:
    721     download_hashes['crc32c'] = src_obj_metadata.crc32c
    722   if dst_obj_metadata.md5Hash:
    723     upload_hashes['md5'] = dst_obj_metadata.md5Hash
    724   if dst_obj_metadata.crc32c:
    725     upload_hashes['crc32c'] = dst_obj_metadata.crc32c
    726 
    727   for alg, upload_b64_digest in upload_hashes.iteritems():
    728     if alg not in download_hashes:
    729       continue
    730 
    731     download_b64_digest = download_hashes[alg]
    732     logger.debug(
    733         'Comparing source vs destination %s-checksum for %s. (%s/%s)', alg,
    734         dst_url, download_b64_digest, upload_b64_digest)
    735     if download_b64_digest != upload_b64_digest:
    736       raise HashMismatchException(
    737           '%s signature for source object (%s) doesn\'t match '
    738           'destination object digest (%s). Object (%s) will be deleted.' % (
    739               alg, download_b64_digest, upload_b64_digest, dst_url))
    740     checked_one = True
    741   if not checked_one:
    742     # One known way this can currently happen is when downloading objects larger
    743     # than 5 GiB from S3 (for which the etag is not an MD5).
    744     logger.warn(
    745         'WARNING: Found no hashes to validate object downloaded from %s and '
    746         'uploaded to %s. Integrity cannot be assured without hashes.',
    747         src_url, dst_url)
    748 
    749 
    750 def _CheckHashes(logger, obj_url, obj_metadata, file_name, digests,
    751                  is_upload=False):
    752   """Validates integrity by comparing cloud digest to local digest.
    753 
    754   Args:
    755     logger: for outputting log messages.
    756     obj_url: CloudUrl for cloud object.
    757     obj_metadata: Cloud Object being downloaded from or uploaded to.
    758     file_name: Local file name on disk being downloaded to or uploaded from
    759                (used only for logging).
    760     digests: Computed Digests for the object.
    761     is_upload: If true, comparing for an uploaded object (controls logging).
    762 
    763   Raises:
    764     CommandException: if cloud digests don't match local digests.
    765   """
    766   local_hashes = digests
    767   cloud_hashes = {}
    768   if obj_metadata.md5Hash:
    769     cloud_hashes['md5'] = obj_metadata.md5Hash.rstrip('\n')
    770   if obj_metadata.crc32c:
    771     cloud_hashes['crc32c'] = obj_metadata.crc32c.rstrip('\n')
    772 
    773   checked_one = False
    774   for alg in local_hashes:
    775     if alg not in cloud_hashes:
    776       continue
    777 
    778     local_b64_digest = local_hashes[alg]
    779     cloud_b64_digest = cloud_hashes[alg]
    780     logger.debug(
    781         'Comparing local vs cloud %s-checksum for %s. (%s/%s)', alg, file_name,
    782         local_b64_digest, cloud_b64_digest)
    783     if local_b64_digest != cloud_b64_digest:
    784 
    785       raise HashMismatchException(
    786           '%s signature computed for local file (%s) doesn\'t match '
    787           'cloud-supplied digest (%s). %s (%s) will be deleted.' % (
    788               alg, local_b64_digest, cloud_b64_digest,
    789               'Cloud object' if is_upload else 'Local file',
    790               obj_url if is_upload else file_name))
    791     checked_one = True
    792   if not checked_one:
    793     if is_upload:
    794       logger.warn(
    795           'WARNING: Found no hashes to validate object uploaded to %s. '
    796           'Integrity cannot be assured without hashes.', obj_url)
    797     else:
    798     # One known way this can currently happen is when downloading objects larger
    799     # than 5 GB from S3 (for which the etag is not an MD5).
    800       logger.warn(
    801           'WARNING: Found no hashes to validate object downloaded to %s. '
    802           'Integrity cannot be assured without hashes.', file_name)
    803 
    804 
    805 def IsNoClobberServerException(e):
    806   """Checks to see if the server attempted to clobber a file.
    807 
    808   In this case we specified via a precondition that we didn't want the file
    809   clobbered.
    810 
    811   Args:
    812     e: The Exception that was generated by a failed copy operation
    813 
    814   Returns:
    815     bool indicator - True indicates that the server did attempt to clobber
    816         an existing file.
    817   """
    818   return ((isinstance(e, PreconditionException)) or
    819           (isinstance(e, ResumableUploadException) and '412' in e.message))
    820 
    821 
    822 def CheckForDirFileConflict(exp_src_url, dst_url):
    823   """Checks whether copying exp_src_url into dst_url is not possible.
    824 
    825      This happens if a directory exists in local file system where a file
    826      needs to go or vice versa. In that case we print an error message and
    827      exits. Example: if the file "./x" exists and you try to do:
    828        gsutil cp gs://mybucket/x/y .
    829      the request can't succeed because it requires a directory where
    830      the file x exists.
    831 
    832      Note that we don't enforce any corresponding restrictions for buckets,
    833      because the flat namespace semantics for buckets doesn't prohibit such
    834      cases the way hierarchical file systems do. For example, if a bucket
    835      contains an object called gs://bucket/dir and then you run the command:
    836        gsutil cp file1 file2 gs://bucket/dir
    837      you'll end up with objects gs://bucket/dir, gs://bucket/dir/file1, and
    838      gs://bucket/dir/file2.
    839 
    840   Args:
    841     exp_src_url: Expanded source StorageUrl.
    842     dst_url: Destination StorageUrl.
    843 
    844   Raises:
    845     CommandException: if errors encountered.
    846   """
    847   if dst_url.IsCloudUrl():
    848     # The problem can only happen for file destination URLs.
    849     return
    850   dst_path = dst_url.object_name
    851   final_dir = os.path.dirname(dst_path)
    852   if os.path.isfile(final_dir):
    853     raise CommandException('Cannot retrieve %s because a file exists '
    854                            'where a directory needs to be created (%s).' %
    855                            (exp_src_url.url_string, final_dir))
    856   if os.path.isdir(dst_path):
    857     raise CommandException('Cannot retrieve %s because a directory exists '
    858                            '(%s) where the file needs to be created.' %
    859                            (exp_src_url.url_string, dst_path))
    860 
    861 
    862 def _PartitionFile(fp, file_size, src_url, content_type, canned_acl,
    863                    dst_bucket_url, random_prefix, tracker_file,
    864                    tracker_file_lock):
    865   """Partitions a file into FilePart objects to be uploaded and later composed.
    866 
    867   These objects, when composed, will match the original file. This entails
    868   splitting the file into parts, naming and forming a destination URL for each
    869   part, and also providing the PerformParallelUploadFileToObjectArgs
    870   corresponding to each part.
    871 
    872   Args:
    873     fp: The file object to be partitioned.
    874     file_size: The size of fp, in bytes.
    875     src_url: Source FileUrl from the original command.
    876     content_type: content type for the component and final objects.
    877     canned_acl: The user-provided canned_acl, if applicable.
    878     dst_bucket_url: CloudUrl for the destination bucket
    879     random_prefix: The randomly-generated prefix used to prevent collisions
    880                    among the temporary component names.
    881     tracker_file: The path to the parallel composite upload tracker file.
    882     tracker_file_lock: The lock protecting access to the tracker file.
    883 
    884   Returns:
    885     dst_args: The destination URIs for the temporary component objects.
    886   """
    887   parallel_composite_upload_component_size = HumanReadableToBytes(
    888       config.get('GSUtil', 'parallel_composite_upload_component_size',
    889                  DEFAULT_PARALLEL_COMPOSITE_UPLOAD_COMPONENT_SIZE))
    890   (num_components, component_size) = _GetPartitionInfo(
    891       file_size, MAX_COMPOSE_ARITY, parallel_composite_upload_component_size)
    892 
    893   dst_args = {}  # Arguments to create commands and pass to subprocesses.
    894   file_names = []  # Used for the 2-step process of forming dst_args.
    895   for i in range(num_components):
    896     # "Salt" the object name with something a user is very unlikely to have
    897     # used in an object name, then hash the extended name to make sure
    898     # we don't run into problems with name length. Using a deterministic
    899     # naming scheme for the temporary components allows users to take
    900     # advantage of resumable uploads for each component.
    901     encoded_name = (PARALLEL_UPLOAD_STATIC_SALT + fp.name).encode(UTF8)
    902     content_md5 = md5()
    903     content_md5.update(encoded_name)
    904     digest = content_md5.hexdigest()
    905     temp_file_name = (random_prefix + PARALLEL_UPLOAD_TEMP_NAMESPACE +
    906                       digest + '_' + str(i))
    907     tmp_dst_url = dst_bucket_url.Clone()
    908     tmp_dst_url.object_name = temp_file_name
    909 
    910     if i < (num_components - 1):
    911       # Every component except possibly the last is the same size.
    912       file_part_length = component_size
    913     else:
    914       # The last component just gets all of the remaining bytes.
    915       file_part_length = (file_size - ((num_components -1) * component_size))
    916     offset = i * component_size
    917     func_args = PerformParallelUploadFileToObjectArgs(
    918         fp.name, offset, file_part_length, src_url, tmp_dst_url, canned_acl,
    919         content_type, tracker_file, tracker_file_lock)
    920     file_names.append(temp_file_name)
    921     dst_args[temp_file_name] = func_args
    922 
    923   return dst_args
    924 
    925 
    926 def _DoParallelCompositeUpload(fp, src_url, dst_url, dst_obj_metadata,
    927                                canned_acl, file_size, preconditions, gsutil_api,
    928                                command_obj, copy_exception_handler):
    929   """Uploads a local file to a cloud object using parallel composite upload.
    930 
    931   The file is partitioned into parts, and then the parts are uploaded in
    932   parallel, composed to form the original destination object, and deleted.
    933 
    934   Args:
    935     fp: The file object to be uploaded.
    936     src_url: FileUrl representing the local file.
    937     dst_url: CloudUrl representing the destination file.
    938     dst_obj_metadata: apitools Object describing the destination object.
    939     canned_acl: The canned acl to apply to the object, if any.
    940     file_size: The size of the source file in bytes.
    941     preconditions: Cloud API Preconditions for the final object.
    942     gsutil_api: gsutil Cloud API instance to use.
    943     command_obj: Command object (for calling Apply).
    944     copy_exception_handler: Copy exception handler (for use in Apply).
    945 
    946   Returns:
    947     Elapsed upload time, uploaded Object with generation, crc32c, and size
    948     fields populated.
    949   """
    950   start_time = time.time()
    951   dst_bucket_url = StorageUrlFromString(dst_url.bucket_url_string)
    952   api_selector = gsutil_api.GetApiSelector(provider=dst_url.scheme)
    953   # Determine which components, if any, have already been successfully
    954   # uploaded.
    955   tracker_file = GetTrackerFilePath(dst_url, TrackerFileType.PARALLEL_UPLOAD,
    956                                     api_selector, src_url)
    957   tracker_file_lock = CreateLock()
    958   (random_prefix, existing_components) = (
    959       _ParseParallelUploadTrackerFile(tracker_file, tracker_file_lock))
    960 
    961   # Create the initial tracker file for the upload.
    962   _CreateParallelUploadTrackerFile(tracker_file, random_prefix,
    963                                    existing_components, tracker_file_lock)
    964 
    965   # Get the set of all components that should be uploaded.
    966   dst_args = _PartitionFile(
    967       fp, file_size, src_url, dst_obj_metadata.contentType, canned_acl,
    968       dst_bucket_url, random_prefix, tracker_file, tracker_file_lock)
    969 
    970   (components_to_upload, existing_components, existing_objects_to_delete) = (
    971       FilterExistingComponents(dst_args, existing_components, dst_bucket_url,
    972                                gsutil_api))
    973 
    974   # In parallel, copy all of the file parts that haven't already been
    975   # uploaded to temporary objects.
    976   cp_results = command_obj.Apply(
    977       _PerformParallelUploadFileToObject, components_to_upload,
    978       copy_exception_handler, ('op_failure_count', 'total_bytes_transferred'),
    979       arg_checker=gslib.command.DummyArgChecker,
    980       parallel_operations_override=True, should_return_results=True)
    981   uploaded_components = []
    982   for cp_result in cp_results:
    983     uploaded_components.append(cp_result[2])
    984   components = uploaded_components + existing_components
    985 
    986   if len(components) == len(dst_args):
    987     # Only try to compose if all of the components were uploaded successfully.
    988 
    989     def _GetComponentNumber(component):
    990       return int(component.object_name[component.object_name.rfind('_')+1:])
    991     # Sort the components so that they will be composed in the correct order.
    992     components = sorted(components, key=_GetComponentNumber)
    993 
    994     request_components = []
    995     for component_url in components:
    996       src_obj_metadata = (
    997           apitools_messages.ComposeRequest.SourceObjectsValueListEntry(
    998               name=component_url.object_name))
    999       if component_url.HasGeneration():
   1000         src_obj_metadata.generation = long(component_url.generation)
   1001       request_components.append(src_obj_metadata)
   1002 
   1003     composed_object = gsutil_api.ComposeObject(
   1004         request_components, dst_obj_metadata, preconditions=preconditions,
   1005         provider=dst_url.scheme, fields=['generation', 'crc32c', 'size'])
   1006 
   1007     try:
   1008       # Make sure only to delete things that we know were successfully
   1009       # uploaded (as opposed to all of the objects that we attempted to
   1010       # create) so that we don't delete any preexisting objects, except for
   1011       # those that were uploaded by a previous, failed run and have since
   1012       # changed (but still have an old generation lying around).
   1013       objects_to_delete = components + existing_objects_to_delete
   1014       command_obj.Apply(
   1015           _DeleteTempComponentObjectFn, objects_to_delete, _RmExceptionHandler,
   1016           arg_checker=gslib.command.DummyArgChecker,
   1017           parallel_operations_override=True)
   1018     except Exception:  # pylint: disable=broad-except
   1019       # If some of the delete calls fail, don't cause the whole command to
   1020       # fail. The copy was successful iff the compose call succeeded, so
   1021       # reduce this to a warning.
   1022       logging.warning(
   1023           'Failed to delete some of the following temporary objects:\n' +
   1024           '\n'.join(dst_args.keys()))
   1025     finally:
   1026       with tracker_file_lock:
   1027         if os.path.exists(tracker_file):
   1028           os.unlink(tracker_file)
   1029   else:
   1030     # Some of the components failed to upload. In this case, we want to exit
   1031     # without deleting the objects.
   1032     raise CommandException(
   1033         'Some temporary components were not uploaded successfully. '
   1034         'Please retry this upload.')
   1035 
   1036   elapsed_time = time.time() - start_time
   1037   return elapsed_time, composed_object
   1038 
   1039 
   1040 def _ShouldDoParallelCompositeUpload(logger, allow_splitting, src_url, dst_url,
   1041                                      file_size, canned_acl=None):
   1042   """Determines whether parallel composite upload strategy should be used.
   1043 
   1044   Args:
   1045     logger: for outputting log messages.
   1046     allow_splitting: If false, then this function returns false.
   1047     src_url: FileUrl corresponding to a local file.
   1048     dst_url: CloudUrl corresponding to destination cloud object.
   1049     file_size: The size of the source file, in bytes.
   1050     canned_acl: Canned ACL to apply to destination object, if any.
   1051 
   1052   Returns:
   1053     True iff a parallel upload should be performed on the source file.
   1054   """
   1055   global suggested_slice_transfers, suggested_sliced_transfers_lock
   1056   parallel_composite_upload_threshold = HumanReadableToBytes(config.get(
   1057       'GSUtil', 'parallel_composite_upload_threshold',
   1058       DEFAULT_PARALLEL_COMPOSITE_UPLOAD_THRESHOLD))
   1059 
   1060   all_factors_but_size = (
   1061       allow_splitting  # Don't split the pieces multiple times.
   1062       and not src_url.IsStream()  # We can't partition streams.
   1063       and dst_url.scheme == 'gs'  # Compose is only for gs.
   1064       and not canned_acl)  # TODO: Implement canned ACL support for compose.
   1065 
   1066   # Since parallel composite uploads are disabled by default, make user aware of
   1067   # them.
   1068   # TODO: Once compiled crcmod is being distributed by major Linux distributions
   1069   # remove this check.
   1070   if (all_factors_but_size and parallel_composite_upload_threshold == 0
   1071       and file_size >= PARALLEL_COMPOSITE_SUGGESTION_THRESHOLD):
   1072     with suggested_sliced_transfers_lock:
   1073       if not suggested_sliced_transfers.get('suggested'):
   1074         logger.info('\n'.join(textwrap.wrap(
   1075             '==> NOTE: You are uploading one or more large file(s), which '
   1076             'would run significantly faster if you enable parallel composite '
   1077             'uploads. This feature can be enabled by editing the '
   1078             '"parallel_composite_upload_threshold" value in your .boto '
   1079             'configuration file. However, note that if you do this you and any '
   1080             'users that download such composite files will need to have a '
   1081             'compiled crcmod installed (see "gsutil help crcmod").')) + '\n')
   1082         suggested_sliced_transfers['suggested'] = True
   1083 
   1084   return (all_factors_but_size
   1085           and parallel_composite_upload_threshold > 0
   1086           and file_size >= parallel_composite_upload_threshold)
   1087 
   1088 
   1089 def ExpandUrlToSingleBlr(url_str, gsutil_api, debug, project_id,
   1090                          treat_nonexistent_object_as_subdir=False):
   1091   """Expands wildcard if present in url_str.
   1092 
   1093   Args:
   1094     url_str: String representation of requested url.
   1095     gsutil_api: gsutil Cloud API instance to use.
   1096     debug: debug level to use (for iterators).
   1097     project_id: project ID to use (for iterators).
   1098     treat_nonexistent_object_as_subdir: indicates if should treat a non-existent
   1099                                         object as a subdir.
   1100 
   1101   Returns:
   1102       (exp_url, have_existing_dst_container)
   1103       where exp_url is a StorageUrl
   1104       and have_existing_dst_container is a bool indicating whether
   1105       exp_url names an existing directory, bucket, or bucket subdirectory.
   1106       In the case where we match a subdirectory AND an object, the
   1107       object is returned.
   1108 
   1109   Raises:
   1110     CommandException: if url_str matched more than 1 URL.
   1111   """
   1112   # Handle wildcarded url case.
   1113   if ContainsWildcard(url_str):
   1114     blr_expansion = list(CreateWildcardIterator(url_str, gsutil_api,
   1115                                                 debug=debug,
   1116                                                 project_id=project_id))
   1117     if len(blr_expansion) != 1:
   1118       raise CommandException('Destination (%s) must match exactly 1 URL' %
   1119                              url_str)
   1120     blr = blr_expansion[0]
   1121     # BLR is either an OBJECT, PREFIX, or BUCKET; the latter two represent
   1122     # directories.
   1123     return (StorageUrlFromString(blr.url_string), not blr.IsObject())
   1124 
   1125   storage_url = StorageUrlFromString(url_str)
   1126 
   1127   # Handle non-wildcarded URL.
   1128   if storage_url.IsFileUrl():
   1129     return (storage_url, storage_url.IsDirectory())
   1130 
   1131   # At this point we have a cloud URL.
   1132   if storage_url.IsBucket():
   1133     return (storage_url, True)
   1134 
   1135   # For object/prefix URLs, there are four cases that indicate the destination
   1136   # is a cloud subdirectory; these are always considered to be an existing
   1137   # container. Checking each case allows gsutil to provide Unix-like
   1138   # destination folder semantics, but requires up to three HTTP calls, noted
   1139   # below.
   1140 
   1141   # Case 1: If a placeholder object ending with '/' exists.
   1142   if IsCloudSubdirPlaceholder(storage_url):
   1143     return (storage_url, True)
   1144 
   1145   # HTTP call to make an eventually consistent check for a matching prefix,
   1146   # _$folder$, or empty listing.
   1147   expansion_empty = True
   1148   list_iterator = gsutil_api.ListObjects(
   1149       storage_url.bucket_name, prefix=storage_url.object_name, delimiter='/',
   1150       provider=storage_url.scheme, fields=['prefixes', 'items/name'])
   1151   for obj_or_prefix in list_iterator:
   1152     # To conserve HTTP calls for the common case, we make a single listing
   1153     # that covers prefixes and object names. Listing object names covers the
   1154     # _$folder$ case and the nonexistent-object-as-subdir case. However, if
   1155     # there are many existing objects for which the target URL is an exact
   1156     # prefix, this listing could be paginated and span multiple HTTP calls.
   1157     # If this case becomes common, we could heurestically abort the
   1158     # listing operation after the first page of results and just query for the
   1159     # _$folder$ object directly using GetObjectMetadata.
   1160     expansion_empty = False
   1161 
   1162     if obj_or_prefix.datatype == CloudApi.CsObjectOrPrefixType.PREFIX:
   1163       # Case 2: If there is a matching prefix when listing the destination URL.
   1164       return (storage_url, True)
   1165     elif (obj_or_prefix.datatype == CloudApi.CsObjectOrPrefixType.OBJECT and
   1166           obj_or_prefix.data.name == storage_url.object_name + '_$folder$'):
   1167       # Case 3: If a placeholder object matching destination + _$folder$
   1168       # exists.
   1169       return (storage_url, True)
   1170 
   1171   # Case 4: If no objects/prefixes matched, and nonexistent objects should be
   1172   # treated as subdirectories.
   1173   return (storage_url, expansion_empty and treat_nonexistent_object_as_subdir)
   1174 
   1175 
   1176 def FixWindowsNaming(src_url, dst_url):
   1177   """Translates Windows pathnames to cloud pathnames.
   1178 
   1179   Rewrites the destination URL built by ConstructDstUrl().
   1180 
   1181   Args:
   1182     src_url: Source StorageUrl to be copied.
   1183     dst_url: The destination StorageUrl built by ConstructDstUrl().
   1184 
   1185   Returns:
   1186     StorageUrl to use for copy.
   1187   """
   1188   if (src_url.IsFileUrl() and src_url.delim == '\\'
   1189       and dst_url.IsCloudUrl()):
   1190     trans_url_str = re.sub(r'\\', '/', dst_url.url_string)
   1191     dst_url = StorageUrlFromString(trans_url_str)
   1192   return dst_url
   1193 
   1194 
   1195 def SrcDstSame(src_url, dst_url):
   1196   """Checks if src_url and dst_url represent the same object or file.
   1197 
   1198   We don't handle anything about hard or symbolic links.
   1199 
   1200   Args:
   1201     src_url: Source StorageUrl.
   1202     dst_url: Destination StorageUrl.
   1203 
   1204   Returns:
   1205     Bool indicator.
   1206   """
   1207   if src_url.IsFileUrl() and dst_url.IsFileUrl():
   1208     # Translate a/b/./c to a/b/c, so src=dst comparison below works.
   1209     new_src_path = os.path.normpath(src_url.object_name)
   1210     new_dst_path = os.path.normpath(dst_url.object_name)
   1211     return new_src_path == new_dst_path
   1212   else:
   1213     return (src_url.url_string == dst_url.url_string and
   1214             src_url.generation == dst_url.generation)
   1215 
   1216 
   1217 def _LogCopyOperation(logger, src_url, dst_url, dst_obj_metadata):
   1218   """Logs copy operation, including Content-Type if appropriate.
   1219 
   1220   Args:
   1221     logger: logger instance to use for output.
   1222     src_url: Source StorageUrl.
   1223     dst_url: Destination StorageUrl.
   1224     dst_obj_metadata: Object-specific metadata that should be overidden during
   1225                       the copy.
   1226   """
   1227   if (dst_url.IsCloudUrl() and dst_obj_metadata and
   1228       dst_obj_metadata.contentType):
   1229     content_type_msg = ' [Content-Type=%s]' % dst_obj_metadata.contentType
   1230   else:
   1231     content_type_msg = ''
   1232   if src_url.IsFileUrl() and src_url.IsStream():
   1233     logger.info('Copying from <STDIN>%s...', content_type_msg)
   1234   else:
   1235     logger.info('Copying %s%s...', src_url.url_string, content_type_msg)
   1236 
   1237 
   1238 # pylint: disable=undefined-variable
   1239 def _CopyObjToObjInTheCloud(src_url, src_obj_metadata, dst_url,
   1240                             dst_obj_metadata, preconditions, gsutil_api,
   1241                             logger):
   1242   """Performs copy-in-the cloud from specified src to dest object.
   1243 
   1244   Args:
   1245     src_url: Source CloudUrl.
   1246     src_obj_metadata: Metadata for source object; must include etag and size.
   1247     dst_url: Destination CloudUrl.
   1248     dst_obj_metadata: Object-specific metadata that should be overidden during
   1249                       the copy.
   1250     preconditions: Preconditions to use for the copy.
   1251     gsutil_api: gsutil Cloud API instance to use for the copy.
   1252     logger: logging.Logger for log message output.
   1253 
   1254   Returns:
   1255     (elapsed_time, bytes_transferred, dst_url with generation,
   1256     md5 hash of destination) excluding overhead like initial GET.
   1257 
   1258   Raises:
   1259     CommandException: if errors encountered.
   1260   """
   1261   start_time = time.time()
   1262 
   1263   progress_callback = FileProgressCallbackHandler(
   1264       ConstructAnnounceText('Copying', dst_url.url_string), logger).call
   1265   if global_copy_helper_opts.test_callback_file:
   1266     with open(global_copy_helper_opts.test_callback_file, 'rb') as test_fp:
   1267       progress_callback = pickle.loads(test_fp.read()).call
   1268   dst_obj = gsutil_api.CopyObject(
   1269       src_obj_metadata, dst_obj_metadata, src_generation=src_url.generation,
   1270       canned_acl=global_copy_helper_opts.canned_acl,
   1271       preconditions=preconditions, progress_callback=progress_callback,
   1272       provider=dst_url.scheme, fields=UPLOAD_RETURN_FIELDS)
   1273 
   1274   end_time = time.time()
   1275 
   1276   result_url = dst_url.Clone()
   1277   result_url.generation = GenerationFromUrlAndString(result_url,
   1278                                                      dst_obj.generation)
   1279 
   1280   return (end_time - start_time, src_obj_metadata.size, result_url,
   1281           dst_obj.md5Hash)
   1282 
   1283 
   1284 def _SetContentTypeFromFile(src_url, dst_obj_metadata):
   1285   """Detects and sets Content-Type if src_url names a local file.
   1286 
   1287   Args:
   1288     src_url: Source StorageUrl.
   1289     dst_obj_metadata: Object-specific metadata that should be overidden during
   1290                      the copy.
   1291   """
   1292   # contentType == '' if user requested default type.
   1293   if (dst_obj_metadata.contentType is None and src_url.IsFileUrl()
   1294       and not src_url.IsStream()):
   1295     # Only do content type recognition if src_url is a file. Object-to-object
   1296     # copies with no -h Content-Type specified re-use the content type of the
   1297     # source object.
   1298     object_name = src_url.object_name
   1299     content_type = None
   1300     # Streams (denoted by '-') are expected to be 'application/octet-stream'
   1301     # and 'file' would partially consume them.
   1302     if object_name != '-':
   1303       if config.getbool('GSUtil', 'use_magicfile', False):
   1304         p = subprocess.Popen(['file', '--mime-type', object_name],
   1305                              stdout=subprocess.PIPE, stderr=subprocess.PIPE)
   1306         output, error = p.communicate()
   1307         p.stdout.close()
   1308         p.stderr.close()
   1309         if p.returncode != 0 or error:
   1310           raise CommandException(
   1311               'Encountered error running "file --mime-type %s" '
   1312               '(returncode=%d).\n%s' % (object_name, p.returncode, error))
   1313         # Parse output by removing line delimiter and splitting on last ":
   1314         content_type = output.rstrip().rpartition(': ')[2]
   1315       else:
   1316         content_type = mimetypes.guess_type(object_name)[0]
   1317     if not content_type:
   1318       content_type = DEFAULT_CONTENT_TYPE
   1319     dst_obj_metadata.contentType = content_type
   1320 
   1321 
   1322 # pylint: disable=undefined-variable
   1323 def _UploadFileToObjectNonResumable(src_url, src_obj_filestream,
   1324                                     src_obj_size, dst_url, dst_obj_metadata,
   1325                                     preconditions, gsutil_api, logger):
   1326   """Uploads the file using a non-resumable strategy.
   1327 
   1328   Args:
   1329     src_url: Source StorageUrl to upload.
   1330     src_obj_filestream: File pointer to uploadable bytes.
   1331     src_obj_size: Size of the source object.
   1332     dst_url: Destination StorageUrl for the upload.
   1333     dst_obj_metadata: Metadata for the target object.
   1334     preconditions: Preconditions for the upload, if any.
   1335     gsutil_api: gsutil Cloud API instance to use for the upload.
   1336     logger: For outputting log messages.
   1337 
   1338   Returns:
   1339     Elapsed upload time, uploaded Object with generation, md5, and size fields
   1340     populated.
   1341   """
   1342   progress_callback = FileProgressCallbackHandler(
   1343       ConstructAnnounceText('Uploading', dst_url.url_string), logger).call
   1344   if global_copy_helper_opts.test_callback_file:
   1345     with open(global_copy_helper_opts.test_callback_file, 'rb') as test_fp:
   1346       progress_callback = pickle.loads(test_fp.read()).call
   1347   start_time = time.time()
   1348 
   1349   if src_url.IsStream():
   1350     # TODO: gsutil-beta: Provide progress callbacks for streaming uploads.
   1351     uploaded_object = gsutil_api.UploadObjectStreaming(
   1352         src_obj_filestream, object_metadata=dst_obj_metadata,
   1353         canned_acl=global_copy_helper_opts.canned_acl,
   1354         preconditions=preconditions, progress_callback=progress_callback,
   1355         provider=dst_url.scheme, fields=UPLOAD_RETURN_FIELDS)
   1356   else:
   1357     uploaded_object = gsutil_api.UploadObject(
   1358         src_obj_filestream, object_metadata=dst_obj_metadata,
   1359         canned_acl=global_copy_helper_opts.canned_acl, size=src_obj_size,
   1360         preconditions=preconditions, progress_callback=progress_callback,
   1361         provider=dst_url.scheme, fields=UPLOAD_RETURN_FIELDS)
   1362   end_time = time.time()
   1363   elapsed_time = end_time - start_time
   1364 
   1365   return elapsed_time, uploaded_object
   1366 
   1367 
   1368 # pylint: disable=undefined-variable
   1369 def _UploadFileToObjectResumable(src_url, src_obj_filestream,
   1370                                  src_obj_size, dst_url, dst_obj_metadata,
   1371                                  preconditions, gsutil_api, logger):
   1372   """Uploads the file using a resumable strategy.
   1373 
   1374   Args:
   1375     src_url: Source FileUrl to upload.  Must not be a stream.
   1376     src_obj_filestream: File pointer to uploadable bytes.
   1377     src_obj_size: Size of the source object.
   1378     dst_url: Destination StorageUrl for the upload.
   1379     dst_obj_metadata: Metadata for the target object.
   1380     preconditions: Preconditions for the upload, if any.
   1381     gsutil_api: gsutil Cloud API instance to use for the upload.
   1382     logger: for outputting log messages.
   1383 
   1384   Returns:
   1385     Elapsed upload time, uploaded Object with generation, md5, and size fields
   1386     populated.
   1387   """
   1388   tracker_file_name = GetTrackerFilePath(
   1389       dst_url, TrackerFileType.UPLOAD,
   1390       gsutil_api.GetApiSelector(provider=dst_url.scheme))
   1391 
   1392   def _UploadTrackerCallback(serialization_data):
   1393     """Creates a new tracker file for starting an upload from scratch.
   1394 
   1395     This function is called by the gsutil Cloud API implementation and the
   1396     the serialization data is implementation-specific.
   1397 
   1398     Args:
   1399       serialization_data: Serialization data used in resuming the upload.
   1400     """
   1401     tracker_file = None
   1402     try:
   1403       tracker_file = open(tracker_file_name, 'w')
   1404       tracker_file.write(str(serialization_data))
   1405     except IOError as e:
   1406       RaiseUnwritableTrackerFileException(tracker_file_name, e.strerror)
   1407     finally:
   1408       if tracker_file:
   1409         tracker_file.close()
   1410 
   1411   # This contains the upload URL, which will uniquely identify the
   1412   # destination object.
   1413   tracker_data = _GetUploadTrackerData(tracker_file_name, logger)
   1414   if tracker_data:
   1415     logger.info(
   1416         'Resuming upload for %s', src_url.url_string)
   1417 
   1418   retryable = True
   1419 
   1420   progress_callback = FileProgressCallbackHandler(
   1421       ConstructAnnounceText('Uploading', dst_url.url_string), logger).call
   1422   if global_copy_helper_opts.test_callback_file:
   1423     with open(global_copy_helper_opts.test_callback_file, 'rb') as test_fp:
   1424       progress_callback = pickle.loads(test_fp.read()).call
   1425 
   1426   start_time = time.time()
   1427   num_startover_attempts = 0
   1428   # This loop causes us to retry when the resumable upload failed in a way that
   1429   # requires starting over with a new upload ID. Retries within a single upload
   1430   # ID within the current process are handled in
   1431   # gsutil_api.UploadObjectResumable, and retries within a single upload ID
   1432   # spanning processes happens if an exception occurs not caught below (which
   1433   # will leave the tracker file in place, and cause the upload ID to be reused
   1434   # the next time the user runs gsutil and attempts the same upload).
   1435   while retryable:
   1436     try:
   1437       uploaded_object = gsutil_api.UploadObjectResumable(
   1438           src_obj_filestream, object_metadata=dst_obj_metadata,
   1439           canned_acl=global_copy_helper_opts.canned_acl,
   1440           preconditions=preconditions, provider=dst_url.scheme,
   1441           size=src_obj_size, serialization_data=tracker_data,
   1442           fields=UPLOAD_RETURN_FIELDS,
   1443           tracker_callback=_UploadTrackerCallback,
   1444           progress_callback=progress_callback)
   1445       retryable = False
   1446     except ResumableUploadStartOverException, e:
   1447       # This can happen, for example, if the server sends a 410 response code.
   1448       # In that case the current resumable upload ID can't be reused, so delete
   1449       # the tracker file and try again up to max retries.
   1450       num_startover_attempts += 1
   1451       retryable = (num_startover_attempts < GetNumRetries())
   1452       if not retryable:
   1453         raise
   1454 
   1455       # If the server sends a 404 response code, then the upload should only
   1456       # be restarted if it was the object (and not the bucket) that was missing.
   1457       try:
   1458         gsutil_api.GetBucket(dst_obj_metadata.bucket, provider=dst_url.scheme)
   1459       except NotFoundException:
   1460         raise
   1461 
   1462       logger.info('Restarting upload from scratch after exception %s', e)
   1463       DeleteTrackerFile(tracker_file_name)
   1464       tracker_data = None
   1465       src_obj_filestream.seek(0)
   1466       # Reset the progress callback handler.
   1467       progress_callback = FileProgressCallbackHandler(
   1468           ConstructAnnounceText('Uploading', dst_url.url_string), logger).call
   1469       logger.info('\n'.join(textwrap.wrap(
   1470           'Resumable upload of %s failed with a response code indicating we '
   1471           'need to start over with a new resumable upload ID. Backing off '
   1472           'and retrying.' % src_url.url_string)))
   1473       time.sleep(min(random.random() * (2 ** num_startover_attempts),
   1474                      GetMaxRetryDelay()))
   1475     except ResumableUploadAbortException:
   1476       retryable = False
   1477       raise
   1478     finally:
   1479       if not retryable:
   1480         DeleteTrackerFile(tracker_file_name)
   1481 
   1482   end_time = time.time()
   1483   elapsed_time = end_time - start_time
   1484 
   1485   return (elapsed_time, uploaded_object)
   1486 
   1487 
   1488 def _CompressFileForUpload(src_url, src_obj_filestream, src_obj_size, logger):
   1489   """Compresses a to-be-uploaded local file to save bandwidth.
   1490 
   1491   Args:
   1492     src_url: Source FileUrl.
   1493     src_obj_filestream: Read stream of the source file - will be consumed
   1494                         and closed.
   1495     src_obj_size: Size of the source file.
   1496     logger: for outputting log messages.
   1497 
   1498   Returns:
   1499     StorageUrl path to compressed file, compressed file size.
   1500   """
   1501   # TODO: Compress using a streaming model as opposed to all at once here.
   1502   if src_obj_size >= MIN_SIZE_COMPUTE_LOGGING:
   1503     logger.info(
   1504         'Compressing %s (to tmp)...', src_url)
   1505   (gzip_fh, gzip_path) = tempfile.mkstemp()
   1506   gzip_fp = None
   1507   try:
   1508     # Check for temp space. Assume the compressed object is at most 2x
   1509     # the size of the object (normally should compress to smaller than
   1510     # the object)
   1511     if CheckFreeSpace(gzip_path) < 2*int(src_obj_size):
   1512       raise CommandException('Inadequate temp space available to compress '
   1513                              '%s. See the CHANGING TEMP DIRECTORIES section '
   1514                              'of "gsutil help cp" for more info.' % src_url)
   1515     gzip_fp = gzip.open(gzip_path, 'wb')
   1516     data = src_obj_filestream.read(GZIP_CHUNK_SIZE)
   1517     while data:
   1518       gzip_fp.write(data)
   1519       data = src_obj_filestream.read(GZIP_CHUNK_SIZE)
   1520   finally:
   1521     if gzip_fp:
   1522       gzip_fp.close()
   1523     os.close(gzip_fh)
   1524     src_obj_filestream.close()
   1525   gzip_size = os.path.getsize(gzip_path)
   1526   return StorageUrlFromString(gzip_path), gzip_size
   1527 
   1528 
   1529 def _UploadFileToObject(src_url, src_obj_filestream, src_obj_size,
   1530                         dst_url, dst_obj_metadata, preconditions, gsutil_api,
   1531                         logger, command_obj, copy_exception_handler,
   1532                         gzip_exts=None, allow_splitting=True):
   1533   """Uploads a local file to an object.
   1534 
   1535   Args:
   1536     src_url: Source FileUrl.
   1537     src_obj_filestream: Read stream of the source file to be read and closed.
   1538     src_obj_size: Size of the source file.
   1539     dst_url: Destination CloudUrl.
   1540     dst_obj_metadata: Metadata to be applied to the destination object.
   1541     preconditions: Preconditions to use for the copy.
   1542     gsutil_api: gsutil Cloud API to use for the copy.
   1543     logger: for outputting log messages.
   1544     command_obj: command object for use in Apply in parallel composite uploads.
   1545     copy_exception_handler: For handling copy exceptions during Apply.
   1546     gzip_exts: List of file extensions to gzip prior to upload, if any.
   1547     allow_splitting: Whether to allow the file to be split into component
   1548                      pieces for an parallel composite upload.
   1549 
   1550   Returns:
   1551     (elapsed_time, bytes_transferred, dst_url with generation,
   1552     md5 hash of destination) excluding overhead like initial GET.
   1553 
   1554   Raises:
   1555     CommandException: if errors encountered.
   1556   """
   1557   if not dst_obj_metadata or not dst_obj_metadata.contentLanguage:
   1558     content_language = config.get_value('GSUtil', 'content_language')
   1559     if content_language:
   1560       dst_obj_metadata.contentLanguage = content_language
   1561 
   1562   fname_parts = src_url.object_name.split('.')
   1563   upload_url = src_url
   1564   upload_stream = src_obj_filestream
   1565   upload_size = src_obj_size
   1566   zipped_file = False
   1567   if gzip_exts and len(fname_parts) > 1 and fname_parts[-1] in gzip_exts:
   1568     upload_url, upload_size = _CompressFileForUpload(
   1569         src_url, src_obj_filestream, src_obj_size, logger)
   1570     upload_stream = open(upload_url.object_name, 'rb')
   1571     dst_obj_metadata.contentEncoding = 'gzip'
   1572     zipped_file = True
   1573 
   1574   elapsed_time = None
   1575   uploaded_object = None
   1576   hash_algs = GetUploadHashAlgs()
   1577   digesters = dict((alg, hash_algs[alg]()) for alg in hash_algs or {})
   1578 
   1579   parallel_composite_upload = _ShouldDoParallelCompositeUpload(
   1580       logger, allow_splitting, upload_url, dst_url, src_obj_size,
   1581       canned_acl=global_copy_helper_opts.canned_acl)
   1582 
   1583   if (src_url.IsStream() and
   1584       gsutil_api.GetApiSelector(provider=dst_url.scheme) == ApiSelector.JSON):
   1585     orig_stream = upload_stream
   1586     # Add limited seekable properties to the stream via buffering.
   1587     upload_stream = ResumableStreamingJsonUploadWrapper(
   1588         orig_stream, GetJsonResumableChunkSize())
   1589 
   1590   if not parallel_composite_upload and len(hash_algs):
   1591     # Parallel composite uploads calculate hashes per-component in subsequent
   1592     # calls to this function, but the composition of the final object is a
   1593     # cloud-only operation.
   1594     wrapped_filestream = HashingFileUploadWrapper(upload_stream, digesters,
   1595                                                   hash_algs, upload_url, logger)
   1596   else:
   1597     wrapped_filestream = upload_stream
   1598 
   1599   try:
   1600     if parallel_composite_upload:
   1601       elapsed_time, uploaded_object = _DoParallelCompositeUpload(
   1602           upload_stream, upload_url, dst_url, dst_obj_metadata,
   1603           global_copy_helper_opts.canned_acl, upload_size, preconditions,
   1604           gsutil_api, command_obj, copy_exception_handler)
   1605     elif upload_size < ResumableThreshold() or src_url.IsStream():
   1606       elapsed_time, uploaded_object = _UploadFileToObjectNonResumable(
   1607           upload_url, wrapped_filestream, upload_size, dst_url,
   1608           dst_obj_metadata, preconditions, gsutil_api, logger)
   1609     else:
   1610       elapsed_time, uploaded_object = _UploadFileToObjectResumable(
   1611           upload_url, wrapped_filestream, upload_size, dst_url,
   1612           dst_obj_metadata, preconditions, gsutil_api, logger)
   1613 
   1614   finally:
   1615     if zipped_file:
   1616       try:
   1617         os.unlink(upload_url.object_name)
   1618       # Windows sometimes complains the temp file is locked when you try to
   1619       # delete it.
   1620       except Exception:  # pylint: disable=broad-except
   1621         logger.warning(
   1622             'Could not delete %s. This can occur in Windows because the '
   1623             'temporary file is still locked.', upload_url.object_name)
   1624     # In the gzip case, this is the gzip stream.  _CompressFileForUpload will
   1625     # have already closed the original source stream.
   1626     upload_stream.close()
   1627 
   1628   if not parallel_composite_upload:
   1629     try:
   1630       digests = _CreateDigestsFromDigesters(digesters)
   1631       _CheckHashes(logger, dst_url, uploaded_object, src_url.object_name,
   1632                    digests, is_upload=True)
   1633     except HashMismatchException:
   1634       if _RENAME_ON_HASH_MISMATCH:
   1635         corrupted_obj_metadata = apitools_messages.Object(
   1636             name=dst_obj_metadata.name,
   1637             bucket=dst_obj_metadata.bucket,
   1638             etag=uploaded_object.etag)
   1639         dst_obj_metadata.name = (dst_url.object_name +
   1640                                  _RENAME_ON_HASH_MISMATCH_SUFFIX)
   1641         gsutil_api.CopyObject(corrupted_obj_metadata,
   1642                               dst_obj_metadata, provider=dst_url.scheme)
   1643       # If the digest doesn't match, delete the object.
   1644       gsutil_api.DeleteObject(dst_url.bucket_name, dst_url.object_name,
   1645                               generation=uploaded_object.generation,
   1646                               provider=dst_url.scheme)
   1647       raise
   1648 
   1649   result_url = dst_url.Clone()
   1650 
   1651   result_url.generation = uploaded_object.generation
   1652   result_url.generation = GenerationFromUrlAndString(
   1653       result_url, uploaded_object.generation)
   1654 
   1655   return (elapsed_time, uploaded_object.size, result_url,
   1656           uploaded_object.md5Hash)
   1657 
   1658 
   1659 def _GetDownloadFile(dst_url, src_obj_metadata, logger):
   1660   """Creates a new download file, and deletes the file that will be replaced.
   1661 
   1662   Names and creates a temporary file for this download. Also, if there is an
   1663   existing file at the path where this file will be placed after the download
   1664   is completed, that file will be deleted.
   1665 
   1666   Args:
   1667     dst_url: Destination FileUrl.
   1668     src_obj_metadata: Metadata from the source object.
   1669     logger: for outputting log messages.
   1670 
   1671   Returns:
   1672     (download_file_name, need_to_unzip)
   1673     download_file_name: The name of the temporary file to which the object will
   1674                         be downloaded.
   1675     need_to_unzip: If true, a temporary zip file was used and must be
   1676                    uncompressed as part of validation.
   1677   """
   1678   dir_name = os.path.dirname(dst_url.object_name)
   1679   if dir_name and not os.path.exists(dir_name):
   1680     # Do dir creation in try block so can ignore case where dir already
   1681     # exists. This is needed to avoid a race condition when running gsutil
   1682     # -m cp.
   1683     try:
   1684       os.makedirs(dir_name)
   1685     except OSError, e:
   1686       if e.errno != errno.EEXIST:
   1687         raise
   1688 
   1689   need_to_unzip = False
   1690   # For gzipped objects download to a temp file and unzip. For the XML API,
   1691   # this represents the result of a HEAD request. For the JSON API, this is
   1692   # the stored encoding which the service may not respect. However, if the
   1693   # server sends decompressed bytes for a file that is stored compressed
   1694   # (double compressed case), there is no way we can validate the hash and
   1695   # we will fail our hash check for the object.
   1696   if (src_obj_metadata.contentEncoding and
   1697       src_obj_metadata.contentEncoding.lower().endswith('gzip')):
   1698     need_to_unzip = True
   1699     download_file_name = _GetDownloadTempZipFileName(dst_url)
   1700     logger.info(
   1701         'Downloading to temp gzip filename %s', download_file_name)
   1702   else:
   1703     download_file_name = _GetDownloadTempFileName(dst_url)
   1704 
   1705   # If a file exists at the permanent destination (where the file will be moved
   1706   # after the download is completed), delete it here to reduce disk space
   1707   # requirements.
   1708   if os.path.exists(dst_url.object_name):
   1709     os.unlink(dst_url.object_name)
   1710 
   1711   # Downloads open the temporary download file in r+b mode, which requires it
   1712   # to already exist, so we create it here if it doesn't exist already.
   1713   fp = open(download_file_name, 'ab')
   1714   fp.close()
   1715   return download_file_name, need_to_unzip
   1716 
   1717 
   1718 def _ShouldDoSlicedDownload(download_strategy, src_obj_metadata,
   1719                             allow_splitting, logger):
   1720   """Determines whether the sliced download strategy should be used.
   1721 
   1722   Args:
   1723     download_strategy: CloudApi download strategy.
   1724     src_obj_metadata: Metadata from the source object.
   1725     allow_splitting: If false, then this function returns false.
   1726     logger: logging.Logger for log message output.
   1727 
   1728   Returns:
   1729     True iff a sliced download should be performed on the source file.
   1730   """
   1731   sliced_object_download_threshold = HumanReadableToBytes(config.get(
   1732       'GSUtil', 'sliced_object_download_threshold',
   1733       DEFAULT_SLICED_OBJECT_DOWNLOAD_THRESHOLD))
   1734 
   1735   max_components = config.getint(
   1736       'GSUtil', 'sliced_object_download_max_components',
   1737       DEFAULT_SLICED_OBJECT_DOWNLOAD_MAX_COMPONENTS)
   1738 
   1739   # Don't use sliced download if it will prevent us from performing an
   1740   # integrity check.
   1741   check_hashes_config = config.get(
   1742       'GSUtil', 'check_hashes', CHECK_HASH_IF_FAST_ELSE_FAIL)
   1743   parallel_hashing = src_obj_metadata.crc32c and UsingCrcmodExtension(crcmod)
   1744   hashing_okay = parallel_hashing or check_hashes_config == CHECK_HASH_NEVER
   1745 
   1746   use_slice = (
   1747       allow_splitting
   1748       and download_strategy is not CloudApi.DownloadStrategy.ONE_SHOT
   1749       and max_components > 1
   1750       and hashing_okay
   1751       and sliced_object_download_threshold > 0
   1752       and src_obj_metadata.size >= sliced_object_download_threshold)
   1753 
   1754   if (not use_slice
   1755       and src_obj_metadata.size >= PARALLEL_COMPOSITE_SUGGESTION_THRESHOLD
   1756       and not UsingCrcmodExtension(crcmod)
   1757       and check_hashes_config != CHECK_HASH_NEVER):
   1758     with suggested_sliced_transfers_lock:
   1759       if not suggested_sliced_transfers.get('suggested'):
   1760         logger.info('\n'.join(textwrap.wrap(
   1761             '==> NOTE: You are downloading one or more large file(s), which '
   1762             'would run significantly faster if you enabled sliced object '
   1763             'uploads. This feature is enabled by default but requires that '
   1764             'compiled crcmod be installed (see "gsutil help crcmod").')) + '\n')
   1765         suggested_sliced_transfers['suggested'] = True
   1766 
   1767   return use_slice
   1768 
   1769 
   1770 def _PerformSlicedDownloadObjectToFile(cls, args, thread_state=None):
   1771   """Function argument to Apply for performing sliced downloads.
   1772 
   1773   Args:
   1774     cls: Calling Command class.
   1775     args: PerformSlicedDownloadObjectToFileArgs tuple describing the target.
   1776     thread_state: gsutil Cloud API instance to use for the operation.
   1777 
   1778   Returns:
   1779     PerformSlicedDownloadReturnValues named-tuple filled with:
   1780     component_num: The component number for this download.
   1781     crc32c: CRC32C hash value (integer) of the downloaded bytes.
   1782     bytes_transferred: The number of bytes transferred, potentially less
   1783                        than the component size if the download was resumed.
   1784   """
   1785   gsutil_api = GetCloudApiInstance(cls, thread_state=thread_state)
   1786   hash_algs = GetDownloadHashAlgs(
   1787       cls.logger, consider_crc32c=args.src_obj_metadata.crc32c)
   1788   digesters = dict((alg, hash_algs[alg]()) for alg in hash_algs or {})
   1789 
   1790   (bytes_transferred, server_encoding) = (
   1791       _DownloadObjectToFileResumable(args.src_url, args.src_obj_metadata,
   1792                                      args.dst_url, args.download_file_name,
   1793                                      gsutil_api, cls.logger, digesters,
   1794                                      component_num=args.component_num,
   1795                                      start_byte=args.start_byte,
   1796                                      end_byte=args.end_byte))
   1797 
   1798   crc32c_val = None
   1799   if 'crc32c' in digesters:
   1800     crc32c_val = digesters['crc32c'].crcValue
   1801   return PerformSlicedDownloadReturnValues(
   1802       args.component_num, crc32c_val, bytes_transferred, server_encoding)
   1803 
   1804 
   1805 def _MaintainSlicedDownloadTrackerFiles(src_obj_metadata, dst_url,
   1806                                         download_file_name, logger,
   1807                                         api_selector, num_components):
   1808   """Maintains sliced download tracker files in order to permit resumability.
   1809 
   1810   Reads or creates a sliced download tracker file representing this object
   1811   download. Upon an attempt at cross-process resumption, the contents of the
   1812   sliced download tracker file are verified to make sure a resumption is
   1813   possible and appropriate. In the case that a resumption should not be
   1814   attempted, existing component tracker files are deleted (to prevent child
   1815   processes from attempting resumption), and a new sliced download tracker
   1816   file is created.
   1817 
   1818   Args:
   1819     src_obj_metadata: Metadata from the source object. Must include etag and
   1820                       generation.
   1821     dst_url: Destination FileUrl.
   1822     download_file_name: Temporary file name to be used for the download.
   1823     logger: for outputting log messages.
   1824     api_selector: The Cloud API implementation used.
   1825     num_components: The number of components to perform this download with.
   1826   """
   1827   assert src_obj_metadata.etag
   1828   tracker_file = None
   1829 
   1830   # Only can happen if the resumable threshold is set higher than the
   1831   # parallel transfer threshold.
   1832   if src_obj_metadata.size < ResumableThreshold():
   1833     return
   1834 
   1835   tracker_file_name = GetTrackerFilePath(dst_url,
   1836                                          TrackerFileType.SLICED_DOWNLOAD,
   1837                                          api_selector)
   1838 
   1839   # Check to see if we should attempt resuming the download.
   1840   try:
   1841     fp = open(download_file_name, 'rb')
   1842     existing_file_size = GetFileSize(fp)
   1843     # A parallel resumption should be attempted only if the destination file
   1844     # size is exactly the same as the source size and the tracker file matches.
   1845     if existing_file_size == src_obj_metadata.size:
   1846       tracker_file = open(tracker_file_name, 'r')
   1847       tracker_file_data = json.load(tracker_file)
   1848       if (tracker_file_data['etag'] == src_obj_metadata.etag and
   1849           tracker_file_data['generation'] == src_obj_metadata.generation and
   1850           tracker_file_data['num_components'] == num_components):
   1851         return
   1852       else:
   1853         tracker_file.close()
   1854         logger.warn('Sliced download tracker file doesn\'t match for '
   1855                     'download of %s. Restarting download from scratch.' %
   1856                     dst_url.object_name)
   1857 
   1858   except (IOError, ValueError) as e:
   1859     # Ignore non-existent file (happens first time a download
   1860     # is attempted on an object), but warn user for other errors.
   1861     if isinstance(e, ValueError) or e.errno != errno.ENOENT:
   1862       logger.warn('Couldn\'t read sliced download tracker file (%s): %s. '
   1863                   'Restarting download from scratch.' %
   1864                   (tracker_file_name, str(e)))
   1865   finally:
   1866     if fp:
   1867       fp.close()
   1868     if tracker_file:
   1869       tracker_file.close()
   1870 
   1871   # Delete component tracker files to guarantee download starts from scratch.
   1872   DeleteDownloadTrackerFiles(dst_url, api_selector)
   1873 
   1874   # Create a new sliced download tracker file to represent this download.
   1875   try:
   1876     with open(tracker_file_name, 'w') as tracker_file:
   1877       tracker_file_data = {'etag': src_obj_metadata.etag,
   1878                            'generation': src_obj_metadata.generation,
   1879                            'num_components': num_components}
   1880       tracker_file.write(json.dumps(tracker_file_data))
   1881   except IOError as e:
   1882     RaiseUnwritableTrackerFileException(tracker_file_name, e.strerror)
   1883 
   1884 
   1885 class SlicedDownloadFileWrapper(object):
   1886   """Wraps a file object to be used in GetObjectMedia for sliced downloads.
   1887 
   1888   In order to allow resumability, the file object used by each thread in a
   1889   sliced object download should be wrapped using SlicedDownloadFileWrapper.
   1890   Passing a SlicedDownloadFileWrapper object to GetObjectMedia will allow the
   1891   download component tracker file for this component to be updated periodically,
   1892   while the downloaded bytes are normally written to file.
   1893   """
   1894 
   1895   def __init__(self, fp, tracker_file_name, src_obj_metadata, start_byte,
   1896                end_byte):
   1897     """Initializes the SlicedDownloadFileWrapper.
   1898 
   1899     Args:
   1900       fp: The already-open file object to be used for writing in
   1901           GetObjectMedia. Data will be written to file starting at the current
   1902           seek position.
   1903       tracker_file_name: The name of the tracker file for this component.
   1904       src_obj_metadata: Metadata from the source object. Must include etag and
   1905                         generation.
   1906       start_byte: The first byte to be downloaded for this parallel component.
   1907       end_byte: The last byte to be downloaded for this parallel component.
   1908     """
   1909     self._orig_fp = fp
   1910     self._tracker_file_name = tracker_file_name
   1911     self._src_obj_metadata = src_obj_metadata
   1912     self._last_tracker_file_byte = None
   1913     self._start_byte = start_byte
   1914     self._end_byte = end_byte
   1915 
   1916   def write(self, data):  # pylint: disable=invalid-name
   1917     current_file_pos = self._orig_fp.tell()
   1918     assert (self._start_byte <= current_file_pos and
   1919             current_file_pos + len(data) <= self._end_byte + 1)
   1920 
   1921     self._orig_fp.write(data)
   1922     current_file_pos = self._orig_fp.tell()
   1923 
   1924     threshold = TRACKERFILE_UPDATE_THRESHOLD
   1925     if (self._last_tracker_file_byte is None or
   1926         current_file_pos - self._last_tracker_file_byte > threshold or
   1927         current_file_pos == self._end_byte + 1):
   1928       WriteDownloadComponentTrackerFile(
   1929           self._tracker_file_name, self._src_obj_metadata, current_file_pos)
   1930       self._last_tracker_file_byte = current_file_pos
   1931 
   1932   def seek(self, offset, whence=os.SEEK_SET):  # pylint: disable=invalid-name
   1933     if whence == os.SEEK_END:
   1934       self._orig_fp.seek(offset + self._end_byte + 1)
   1935     else:
   1936       self._orig_fp.seek(offset, whence)
   1937     assert self._start_byte <= self._orig_fp.tell() <= self._end_byte + 1
   1938 
   1939   def tell(self):  # pylint: disable=invalid-name
   1940     return self._orig_fp.tell()
   1941 
   1942   def flush(self):  # pylint: disable=invalid-name
   1943     self._orig_fp.flush()
   1944 
   1945   def close(self):  # pylint: disable=invalid-name
   1946     if self._orig_fp:
   1947       self._orig_fp.close()
   1948 
   1949 
   1950 def _PartitionObject(src_url, src_obj_metadata, dst_url,
   1951                      download_file_name):
   1952   """Partitions an object into components to be downloaded.
   1953 
   1954   Each component is a byte range of the object. The byte ranges
   1955   of the returned components are mutually exclusive and collectively
   1956   exhaustive. The byte ranges are inclusive at both end points.
   1957 
   1958   Args:
   1959     src_url: Source CloudUrl.
   1960     src_obj_metadata: Metadata from the source object.
   1961     dst_url: Destination FileUrl.
   1962     download_file_name: Temporary file name to be used for the download.
   1963 
   1964   Returns:
   1965     components_to_download: A list of PerformSlicedDownloadObjectToFileArgs
   1966                             to be used in Apply for the sliced download.
   1967   """
   1968   sliced_download_component_size = HumanReadableToBytes(
   1969       config.get('GSUtil', 'sliced_object_download_component_size',
   1970                  DEFAULT_SLICED_OBJECT_DOWNLOAD_COMPONENT_SIZE))
   1971 
   1972   max_components = config.getint(
   1973       'GSUtil', 'sliced_object_download_max_components',
   1974       DEFAULT_SLICED_OBJECT_DOWNLOAD_MAX_COMPONENTS)
   1975 
   1976   num_components, component_size = _GetPartitionInfo(
   1977       src_obj_metadata.size, max_components, sliced_download_component_size)
   1978 
   1979   components_to_download = []
   1980   component_lengths = []
   1981   for i in range(num_components):
   1982     start_byte = i * component_size
   1983     end_byte = min((i + 1) * (component_size) - 1, src_obj_metadata.size - 1)
   1984     component_lengths.append(end_byte - start_byte + 1)
   1985     components_to_download.append(
   1986         PerformSlicedDownloadObjectToFileArgs(
   1987             i, src_url, src_obj_metadata, dst_url, download_file_name,
   1988             start_byte, end_byte))
   1989   return components_to_download, component_lengths
   1990 
   1991 
   1992 def _DoSlicedDownload(src_url, src_obj_metadata, dst_url, download_file_name,
   1993                       command_obj, logger, copy_exception_handler,
   1994                       api_selector):
   1995   """Downloads a cloud object to a local file using sliced download.
   1996 
   1997   Byte ranges are decided for each thread/process, and then the parts are
   1998   downloaded in parallel.
   1999 
   2000   Args:
   2001     src_url: Source CloudUrl.
   2002     src_obj_metadata: Metadata from the source object.
   2003     dst_url: Destination FileUrl.
   2004     download_file_name: Temporary file name to be used for download.
   2005     command_obj: command object for use in Apply in parallel composite uploads.
   2006     logger: for outputting log messages.
   2007     copy_exception_handler: For handling copy exceptions during Apply.
   2008     api_selector: The Cloud API implementation used.
   2009 
   2010   Returns:
   2011     (bytes_transferred, crc32c)
   2012     bytes_transferred: Number of bytes transferred from server this call.
   2013     crc32c: a crc32c hash value (integer) for the downloaded bytes, or None if
   2014             crc32c hashing wasn't performed.
   2015   """
   2016   components_to_download, component_lengths = _PartitionObject(
   2017       src_url, src_obj_metadata, dst_url, download_file_name)
   2018 
   2019   num_components = len(components_to_download)
   2020   _MaintainSlicedDownloadTrackerFiles(src_obj_metadata, dst_url,
   2021                                       download_file_name, logger,
   2022                                       api_selector, num_components)
   2023 
   2024   # Resize the download file so each child process can seek to its start byte.
   2025   with open(download_file_name, 'ab') as fp:
   2026     fp.truncate(src_obj_metadata.size)
   2027 
   2028   cp_results = command_obj.Apply(
   2029       _PerformSlicedDownloadObjectToFile, components_to_download,
   2030       copy_exception_handler, arg_checker=gslib.command.DummyArgChecker,
   2031       parallel_operations_override=True, should_return_results=True)
   2032 
   2033   if len(cp_results) < num_components:
   2034     raise CommandException(
   2035         'Some components of %s were not downloaded successfully. '
   2036         'Please retry this download.' % dst_url.object_name)
   2037 
   2038   # Crc32c hashes have to be concatenated in the correct order.
   2039   cp_results = sorted(cp_results, key=attrgetter('component_num'))
   2040   crc32c = cp_results[0].crc32c
   2041   if crc32c is not None:
   2042     for i in range(1, num_components):
   2043       crc32c = ConcatCrc32c(crc32c, cp_results[i].crc32c,
   2044                             component_lengths[i])
   2045 
   2046   bytes_transferred = 0
   2047   expect_gzip = (src_obj_metadata.contentEncoding and
   2048                  src_obj_metadata.contentEncoding.lower().endswith('gzip'))
   2049   for cp_result in cp_results:
   2050     bytes_transferred += cp_result.bytes_transferred
   2051     server_gzip = (cp_result.server_encoding and
   2052                    cp_result.server_encoding.lower().endswith('gzip'))
   2053     # If the server gzipped any components on the fly, we will have no chance of
   2054     # properly reconstructing the file.
   2055     if server_gzip and not expect_gzip:
   2056       raise CommandException(
   2057           'Download of %s failed because the server sent back data with an '
   2058           'unexpected encoding.' % dst_url.object_name)
   2059 
   2060   return bytes_transferred, crc32c
   2061 
   2062 
   2063 def _DownloadObjectToFileResumable(src_url, src_obj_metadata, dst_url,
   2064                                    download_file_name, gsutil_api, logger,
   2065                                    digesters, component_num=None, start_byte=0,
   2066                                    end_byte=None):
   2067   """Downloads an object to a local file using the resumable strategy.
   2068 
   2069   Args:
   2070     src_url: Source CloudUrl.
   2071     src_obj_metadata: Metadata from the source object.
   2072     dst_url: Destination FileUrl.
   2073     download_file_name: Temporary file name to be used for download.
   2074     gsutil_api: gsutil Cloud API instance to use for the download.
   2075     logger: for outputting log messages.
   2076     digesters: Digesters corresponding to the hash algorithms that will be used
   2077                for validation.
   2078     component_num: Which component of a sliced download this call is for, or
   2079                    None if this is not a sliced download.
   2080     start_byte: The first byte of a byte range for a sliced download.
   2081     end_byte: The last byte of a byte range for a sliced download.
   2082 
   2083   Returns:
   2084     (bytes_transferred, server_encoding)
   2085     bytes_transferred: Number of bytes transferred from server this call.
   2086     server_encoding: Content-encoding string if it was detected that the server
   2087                      sent encoded bytes during transfer, None otherwise.
   2088   """
   2089   if end_byte is None:
   2090     end_byte = src_obj_metadata.size - 1
   2091   download_size = end_byte - start_byte + 1
   2092 
   2093   is_sliced = component_num is not None
   2094   api_selector = gsutil_api.GetApiSelector(provider=src_url.scheme)
   2095   server_encoding = None
   2096 
   2097   # Used for logging
   2098   download_name = dst_url.object_name
   2099   if is_sliced:
   2100     download_name += ' component %d' % component_num
   2101 
   2102   try:
   2103     fp = open(download_file_name, 'r+b')
   2104     fp.seek(start_byte)
   2105     api_selector = gsutil_api.GetApiSelector(provider=src_url.scheme)
   2106     existing_file_size = GetFileSize(fp)
   2107 
   2108     tracker_file_name, download_start_byte = (
   2109         ReadOrCreateDownloadTrackerFile(src_obj_metadata, dst_url, logger,
   2110                                         api_selector, start_byte,
   2111                                         existing_file_size, component_num))
   2112 
   2113     if download_start_byte < start_byte or download_start_byte > end_byte + 1:
   2114       DeleteTrackerFile(tracker_file_name)
   2115       raise CommandException(
   2116           'Resumable download start point for %s is not in the correct byte '
   2117           'range. Deleting tracker file, so if you re-try this download it '
   2118           'will start from scratch' % download_name)
   2119 
   2120     download_complete = (download_start_byte == start_byte + download_size)
   2121     resuming = (download_start_byte != start_byte) and not download_complete
   2122     if resuming:
   2123       logger.info('Resuming download for %s', download_name)
   2124     elif download_complete:
   2125       logger.info(
   2126           'Download already complete for %s, skipping download but '
   2127           'will run integrity checks.', download_name)
   2128 
   2129     # This is used for resuming downloads, but also for passing the mediaLink
   2130     # and size into the download for new downloads so that we can avoid
   2131     # making an extra HTTP call.
   2132     serialization_data = GetDownloadSerializationData(
   2133         src_obj_metadata, progress=download_start_byte)
   2134 
   2135     if resuming or download_complete:
   2136       # Catch up our digester with the hash data.
   2137       bytes_digested = 0
   2138       total_bytes_to_digest = download_start_byte - start_byte
   2139       hash_callback = ProgressCallbackWithBackoff(
   2140           total_bytes_to_digest,
   2141           FileProgressCallbackHandler(
   2142               ConstructAnnounceText('Hashing',
   2143                                     dst_url.url_string), logger).call)
   2144 
   2145       while bytes_digested < total_bytes_to_digest:
   2146         bytes_to_read = min(DEFAULT_FILE_BUFFER_SIZE,
   2147                             total_bytes_to_digest - bytes_digested)
   2148         data = fp.read(bytes_to_read)
   2149         bytes_digested += bytes_to_read
   2150         for alg_name in digesters:
   2151           digesters[alg_name].update(data)
   2152         hash_callback.Progress(len(data))
   2153 
   2154     elif not is_sliced:
   2155       # Delete file contents and start entire object download from scratch.
   2156       fp.truncate(0)
   2157       existing_file_size = 0
   2158 
   2159     progress_callback = FileProgressCallbackHandler(
   2160         ConstructAnnounceText('Downloading', dst_url.url_string), logger,
   2161         start_byte, download_size).call
   2162 
   2163     if global_copy_helper_opts.test_callback_file:
   2164       with open(global_copy_helper_opts.test_callback_file, 'rb') as test_fp:
   2165         progress_callback = pickle.loads(test_fp.read()).call
   2166 
   2167     if is_sliced and src_obj_metadata.size >= ResumableThreshold():
   2168       fp = SlicedDownloadFileWrapper(fp, tracker_file_name, src_obj_metadata,
   2169                                      start_byte, end_byte)
   2170 
   2171     # TODO: With gzip encoding (which may occur on-the-fly and not be part of
   2172     # the object's metadata), when we request a range to resume, it's possible
   2173     # that the server will just resend the entire object, which means our
   2174     # caught-up hash will be incorrect.  We recalculate the hash on
   2175     # the local file in the case of a failed gzip hash anyway, but it would
   2176     # be better if we actively detected this case.
   2177     if not download_complete:
   2178       fp.seek(download_start_byte)
   2179       server_encoding = gsutil_api.GetObjectMedia(
   2180           src_url.bucket_name, src_url.object_name, fp,
   2181           start_byte=download_start_byte, end_byte=end_byte,
   2182           generation=src_url.generation, object_size=src_obj_metadata.size,
   2183           download_strategy=CloudApi.DownloadStrategy.RESUMABLE,
   2184           provider=src_url.scheme, serialization_data=serialization_data,
   2185           digesters=digesters, progress_callback=progress_callback)
   2186 
   2187   except ResumableDownloadException as e:
   2188     logger.warning('Caught ResumableDownloadException (%s) for download of %s.',
   2189                    e.reason, download_name)
   2190     raise
   2191   finally:
   2192     if fp:
   2193       fp.close()
   2194 
   2195   bytes_transferred = end_byte - download_start_byte + 1
   2196   return bytes_transferred, server_encoding
   2197 
   2198 
   2199 def _DownloadObjectToFileNonResumable(src_url, src_obj_metadata, dst_url,
   2200                                       download_file_name, gsutil_api, logger,
   2201                                       digesters):
   2202   """Downloads an object to a local file using the non-resumable strategy.
   2203 
   2204   Args:
   2205     src_url: Source CloudUrl.
   2206     src_obj_metadata: Metadata from the source object.
   2207     dst_url: Destination FileUrl.
   2208     download_file_name: Temporary file name to be used for download.
   2209     gsutil_api: gsutil Cloud API instance to use for the download.
   2210     logger: for outputting log messages.
   2211     digesters: Digesters corresponding to the hash algorithms that will be used
   2212                for validation.
   2213   Returns:
   2214     (bytes_transferred, server_encoding)
   2215     bytes_transferred: Number of bytes transferred from server this call.
   2216     server_encoding: Content-encoding string if it was detected that the server
   2217                      sent encoded bytes during transfer, None otherwise.
   2218   """
   2219   try:
   2220     fp = open(download_file_name, 'w')
   2221 
   2222     # This is used to pass the mediaLink and the size into the download so that
   2223     # we can avoid making an extra HTTP call.
   2224     serialization_data = GetDownloadSerializationData(src_obj_metadata)
   2225 
   2226     progress_callback = FileProgressCallbackHandler(
   2227         ConstructAnnounceText('Downloading', dst_url.url_string), logger).call
   2228 
   2229     if global_copy_helper_opts.test_callback_file:
   2230       with open(global_copy_helper_opts.test_callback_file, 'rb') as test_fp:
   2231         progress_callback = pickle.loads(test_fp.read()).call
   2232 
   2233     server_encoding = gsutil_api.GetObjectMedia(
   2234         src_url.bucket_name, src_url.object_name, fp,
   2235         generation=src_url.generation, object_size=src_obj_metadata.size,
   2236         download_strategy=CloudApi.DownloadStrategy.ONE_SHOT,
   2237         provider=src_url.scheme, serialization_data=serialization_data,
   2238         digesters=digesters, progress_callback=progress_callback)
   2239   finally:
   2240     if fp:
   2241       fp.close()
   2242 
   2243   return src_obj_metadata.size, server_encoding
   2244 
   2245 
   2246 def _DownloadObjectToFile(src_url, src_obj_metadata, dst_url,
   2247                           gsutil_api, logger, command_obj,
   2248                           copy_exception_handler, allow_splitting=True):
   2249   """Downloads an object to a local file.
   2250 
   2251   Args:
   2252     src_url: Source CloudUrl.
   2253     src_obj_metadata: Metadata from the source object.
   2254     dst_url: Destination FileUrl.
   2255     gsutil_api: gsutil Cloud API instance to use for the download.
   2256     logger: for outputting log messages.
   2257     command_obj: command object for use in Apply in sliced downloads.
   2258     copy_exception_handler: For handling copy exceptions during Apply.
   2259     allow_splitting: Whether or not to allow sliced download.
   2260   Returns:
   2261     (elapsed_time, bytes_transferred, dst_url, md5), where time elapsed
   2262     excludes initial GET.
   2263 
   2264   Raises:
   2265     FileConcurrencySkipError: if this download is already in progress.
   2266     CommandException: if other errors encountered.
   2267   """
   2268   global open_files_map, open_files_lock
   2269   if dst_url.object_name.endswith(dst_url.delim):
   2270     logger.warn('\n'.join(textwrap.wrap(
   2271         'Skipping attempt to download to filename ending with slash (%s). This '
   2272         'typically happens when using gsutil to download from a subdirectory '
   2273         'created by the Cloud Console (https://cloud.google.com/console)'
   2274         % dst_url.object_name)))
   2275     return (0, 0, dst_url, '')
   2276 
   2277   api_selector = gsutil_api.GetApiSelector(provider=src_url.scheme)
   2278   download_strategy = _SelectDownloadStrategy(dst_url)
   2279   sliced_download = _ShouldDoSlicedDownload(
   2280       download_strategy, src_obj_metadata, allow_splitting, logger)
   2281 
   2282   download_file_name, need_to_unzip = _GetDownloadFile(
   2283       dst_url, src_obj_metadata, logger)
   2284 
   2285   # Ensure another process/thread is not already writing to this file.
   2286   with open_files_lock:
   2287     if open_files_map.get(download_file_name, False):
   2288       raise FileConcurrencySkipError
   2289     open_files_map[download_file_name] = True
   2290 
   2291   # Set up hash digesters.
   2292   consider_md5 = src_obj_metadata.md5Hash and not sliced_download
   2293   hash_algs = GetDownloadHashAlgs(logger, consider_md5=consider_md5,
   2294                                   consider_crc32c=src_obj_metadata.crc32c)
   2295   digesters = dict((alg, hash_algs[alg]()) for alg in hash_algs or {})
   2296 
   2297   # Tracks whether the server used a gzip encoding.
   2298   server_encoding = None
   2299   download_complete = (src_obj_metadata.size == 0)
   2300   bytes_transferred = 0
   2301 
   2302   start_time = time.time()
   2303   if not download_complete:
   2304     if sliced_download:
   2305       (bytes_transferred, crc32c) = (
   2306           _DoSlicedDownload(src_url, src_obj_metadata, dst_url,
   2307                             download_file_name, command_obj, logger,
   2308                             copy_exception_handler, api_selector))
   2309       if 'crc32c' in digesters:
   2310         digesters['crc32c'].crcValue = crc32c
   2311     elif download_strategy is CloudApi.DownloadStrategy.ONE_SHOT:
   2312       (bytes_transferred, server_encoding) = (
   2313           _DownloadObjectToFileNonResumable(src_url, src_obj_metadata, dst_url,
   2314                                             download_file_name, gsutil_api,
   2315                                             logger, digesters))
   2316     elif download_strategy is CloudApi.DownloadStrategy.RESUMABLE:
   2317       (bytes_transferred, server_encoding) = (
   2318           _DownloadObjectToFileResumable(src_url, src_obj_metadata, dst_url,
   2319                                          download_file_name, gsutil_api, logger,
   2320                                          digesters))
   2321     else:
   2322       raise CommandException('Invalid download strategy %s chosen for'
   2323                              'file %s' % (download_strategy,
   2324                                           download_file_name))
   2325   end_time = time.time()
   2326 
   2327   server_gzip = server_encoding and server_encoding.lower().endswith('gzip')
   2328   local_md5 = _ValidateAndCompleteDownload(
   2329       logger, src_url, src_obj_metadata, dst_url, need_to_unzip, server_gzip,
   2330       digesters, hash_algs, download_file_name, api_selector, bytes_transferred)
   2331 
   2332   with open_files_lock:
   2333     open_files_map.delete(download_file_name)
   2334 
   2335   return (end_time - start_time, bytes_transferred, dst_url, local_md5)
   2336 
   2337 
   2338 def _GetDownloadTempZipFileName(dst_url):
   2339   """Returns temporary file name for a temporarily compressed download."""
   2340   return '%s_.gztmp' % dst_url.object_name
   2341 
   2342 
   2343 def _GetDownloadTempFileName(dst_url):
   2344   """Returns temporary download file name for uncompressed downloads."""
   2345   return '%s_.gstmp' % dst_url.object_name
   2346 
   2347 
   2348 def _ValidateAndCompleteDownload(logger, src_url, src_obj_metadata, dst_url,
   2349                                  need_to_unzip, server_gzip, digesters,
   2350                                  hash_algs, download_file_name,
   2351                                  api_selector, bytes_transferred):
   2352   """Validates and performs necessary operations on a downloaded file.
   2353 
   2354   Validates the integrity of the downloaded file using hash_algs. If the file
   2355   was compressed (temporarily), the file will be decompressed. Then, if the
   2356   integrity of the file was successfully validated, the file will be moved
   2357   from its temporary download location to its permanent location on disk.
   2358 
   2359   Args:
   2360     logger: For outputting log messages.
   2361     src_url: StorageUrl for the source object.
   2362     src_obj_metadata: Metadata for the source object, potentially containing
   2363                       hash values.
   2364     dst_url: StorageUrl describing the destination file.
   2365     need_to_unzip: If true, a temporary zip file was used and must be
   2366                    uncompressed as part of validation.
   2367     server_gzip: If true, the server gzipped the bytes (regardless of whether
   2368                  the object metadata claimed it was gzipped).
   2369     digesters: dict of {string, hash digester} that contains up-to-date digests
   2370                computed during the download. If a digester for a particular
   2371                algorithm is None, an up-to-date digest is not available and the
   2372                hash must be recomputed from the local file.
   2373     hash_algs: dict of {string, hash algorithm} that can be used if digesters
   2374                don't have up-to-date digests.
   2375     download_file_name: Temporary file name that was used for download.
   2376     api_selector: The Cloud API implementation used (used tracker file naming).
   2377     bytes_transferred: Number of bytes downloaded (used for logging).
   2378 
   2379   Returns:
   2380     An MD5 of the local file, if one was calculated as part of the integrity
   2381     check.
   2382   """
   2383   final_file_name = dst_url.object_name
   2384   file_name = download_file_name
   2385   digesters_succeeded = True
   2386 
   2387   for alg in digesters:
   2388     # If we get a digester with a None algorithm, the underlying
   2389     # implementation failed to calculate a digest, so we will need to
   2390     # calculate one from scratch.
   2391     if not digesters[alg]:
   2392       digesters_succeeded = False
   2393       break
   2394 
   2395   if digesters_succeeded:
   2396     local_hashes = _CreateDigestsFromDigesters(digesters)
   2397   else:
   2398     local_hashes = _CreateDigestsFromLocalFile(
   2399         logger, hash_algs, file_name, final_file_name, src_obj_metadata)
   2400 
   2401   digest_verified = True
   2402   hash_invalid_exception = None
   2403   try:
   2404     _CheckHashes(logger, src_url, src_obj_metadata, final_file_name,
   2405                  local_hashes)
   2406     DeleteDownloadTrackerFiles(dst_url, api_selector)
   2407   except HashMismatchException, e:
   2408     # If an non-gzipped object gets sent with gzip content encoding, the hash
   2409     # we calculate will match the gzipped bytes, not the original object. Thus,
   2410     # we'll need to calculate and check it after unzipping.
   2411     if server_gzip:
   2412       logger.debug(
   2413           'Hash did not match but server gzipped the content, will '
   2414           'recalculate.')
   2415       digest_verified = False
   2416     elif api_selector == ApiSelector.XML:
   2417       logger.debug(
   2418           'Hash did not match but server may have gzipped the content, will '
   2419           'recalculate.')
   2420       # Save off the exception in case this isn't a gzipped file.
   2421       hash_invalid_exception = e
   2422       digest_verified = False
   2423     else:
   2424       DeleteDownloadTrackerFiles(dst_url, api_selector)
   2425       if _RENAME_ON_HASH_MISMATCH:
   2426         os.rename(file_name,
   2427                   final_file_name + _RENAME_ON_HASH_MISMATCH_SUFFIX)
   2428       else:
   2429         os.unlink(file_name)
   2430       raise
   2431 
   2432   if need_to_unzip or server_gzip:
   2433     # Log that we're uncompressing if the file is big enough that
   2434     # decompressing would make it look like the transfer "stalled" at the end.
   2435     if bytes_transferred > TEN_MIB:
   2436       logger.info(
   2437           'Uncompressing temporarily gzipped file to %s...', final_file_name)
   2438 
   2439     gzip_fp = None
   2440     try:
   2441       # Downloaded temporarily gzipped file, unzip to file without '_.gztmp'
   2442       # suffix.
   2443       gzip_fp = gzip.open(file_name, 'rb')
   2444       with open(final_file_name, 'wb') as f_out:
   2445         data = gzip_fp.read(GZIP_CHUNK_SIZE)
   2446         while data:
   2447           f_out.write(data)
   2448           data = gzip_fp.read(GZIP_CHUNK_SIZE)
   2449     except IOError, e:
   2450       # In the XML case where we don't know if the file was gzipped, raise
   2451       # the original hash exception if we find that it wasn't.
   2452       if 'Not a gzipped file' in str(e) and hash_invalid_exception:
   2453         # Linter improperly thinks we're raising None despite the above check.
   2454         # pylint: disable=raising-bad-type
   2455         raise hash_invalid_exception
   2456     finally:
   2457       if gzip_fp:
   2458         gzip_fp.close()
   2459 
   2460     os.unlink(file_name)
   2461     file_name = final_file_name
   2462 
   2463   if not digest_verified:
   2464     try:
   2465       # Recalculate hashes on the unzipped local file.
   2466       local_hashes = _CreateDigestsFromLocalFile(
   2467           logger, hash_algs, file_name, final_file_name, src_obj_metadata)
   2468       _CheckHashes(logger, src_url, src_obj_metadata, final_file_name,
   2469                    local_hashes)
   2470       DeleteDownloadTrackerFiles(dst_url, api_selector)
   2471     except HashMismatchException:
   2472       DeleteDownloadTrackerFiles(dst_url, api_selector)
   2473       if _RENAME_ON_HASH_MISMATCH:
   2474         os.rename(file_name,
   2475                   file_name + _RENAME_ON_HASH_MISMATCH_SUFFIX)
   2476       else:
   2477         os.unlink(file_name)
   2478       raise
   2479 
   2480   if file_name != final_file_name:
   2481     # Data is still in a temporary file, so move it to a permanent location.
   2482     if os.path.exists(final_file_name):
   2483       os.unlink(final_file_name)
   2484     os.rename(file_name,
   2485               final_file_name)
   2486 
   2487   if 'md5' in local_hashes:
   2488     return local_hashes['md5']
   2489 
   2490 
   2491 def _CopyFileToFile(src_url, dst_url):
   2492   """Copies a local file to a local file.
   2493 
   2494   Args:
   2495     src_url: Source FileUrl.
   2496     dst_url: Destination FileUrl.
   2497   Returns:
   2498     (elapsed_time, bytes_transferred, dst_url, md5=None).
   2499 
   2500   Raises:
   2501     CommandException: if errors encountered.
   2502   """
   2503   src_fp = GetStreamFromFileUrl(src_url)
   2504   dir_name = os.path.dirname(dst_url.object_name)
   2505   if dir_name and not os.path.exists(dir_name):
   2506     os.makedirs(dir_name)
   2507   dst_fp = open(dst_url.object_name, 'wb')
   2508   start_time = time.time()
   2509   shutil.copyfileobj(src_fp, dst_fp)
   2510   end_time = time.time()
   2511   return (end_time - start_time, os.path.getsize(dst_url.object_name),
   2512           dst_url, None)
   2513 
   2514 
   2515 def _DummyTrackerCallback(_):
   2516   pass
   2517 
   2518 
   2519 # pylint: disable=undefined-variable
   2520 def _CopyObjToObjDaisyChainMode(src_url, src_obj_metadata, dst_url,
   2521                                 dst_obj_metadata, preconditions, gsutil_api,
   2522                                 logger):
   2523   """Copies from src_url to dst_url in "daisy chain" mode.
   2524 
   2525   See -D OPTION documentation about what daisy chain mode is.
   2526 
   2527   Args:
   2528     src_url: Source CloudUrl
   2529     src_obj_metadata: Metadata from source object
   2530     dst_url: Destination CloudUrl
   2531     dst_obj_metadata: Object-specific metadata that should be overidden during
   2532                       the copy.
   2533     preconditions: Preconditions to use for the copy.
   2534     gsutil_api: gsutil Cloud API to use for the copy.
   2535     logger: For outputting log messages.
   2536 
   2537   Returns:
   2538     (elapsed_time, bytes_transferred, dst_url with generation,
   2539     md5 hash of destination) excluding overhead like initial GET.
   2540 
   2541   Raises:
   2542     CommandException: if errors encountered.
   2543   """
   2544   # We don't attempt to preserve ACLs across providers because
   2545   # GCS and S3 support different ACLs and disjoint principals.
   2546   if (global_copy_helper_opts.preserve_acl
   2547       and src_url.scheme != dst_url.scheme):
   2548     raise NotImplementedError(
   2549         'Cross-provider cp -p not supported')
   2550   if not global_copy_helper_opts.preserve_acl:
   2551     dst_obj_metadata.acl = []
   2552 
   2553   # Don't use callbacks for downloads on the daisy chain wrapper because
   2554   # upload callbacks will output progress, but respect test hooks if present.
   2555   progress_callback = None
   2556   if global_copy_helper_opts.test_callback_file:
   2557     with open(global_copy_helper_opts.test_callback_file, 'rb') as test_fp:
   2558       progress_callback = pickle.loads(test_fp.read()).call
   2559 
   2560   start_time = time.time()
   2561   upload_fp = DaisyChainWrapper(src_url, src_obj_metadata.size, gsutil_api,
   2562                                 progress_callback=progress_callback)
   2563   uploaded_object = None
   2564   if src_obj_metadata.size == 0:
   2565     # Resumable uploads of size 0 are not supported.
   2566     uploaded_object = gsutil_api.UploadObject(
   2567         upload_fp, object_metadata=dst_obj_metadata,
   2568         canned_acl=global_copy_helper_opts.canned_acl,
   2569         preconditions=preconditions, provider=dst_url.scheme,
   2570         fields=UPLOAD_RETURN_FIELDS, size=src_obj_metadata.size)
   2571   else:
   2572     # TODO: Support process-break resumes. This will resume across connection
   2573     # breaks and server errors, but the tracker callback is a no-op so this
   2574     # won't resume across gsutil runs.
   2575     # TODO: Test retries via test_callback_file.
   2576     uploaded_object = gsutil_api.UploadObjectResumable(
   2577         upload_fp, object_metadata=dst_obj_metadata,
   2578         canned_acl=global_copy_helper_opts.canned_acl,
   2579         preconditions=preconditions, provider=dst_url.scheme,
   2580         fields=UPLOAD_RETURN_FIELDS, size=src_obj_metadata.size,
   2581         progress_callback=FileProgressCallbackHandler(
   2582             ConstructAnnounceText('Uploading', dst_url.url_string),
   2583             logger).call,
   2584         tracker_callback=_DummyTrackerCallback)
   2585   end_time = time.time()
   2586 
   2587   try:
   2588     _CheckCloudHashes(logger, src_url, dst_url, src_obj_metadata,
   2589                       uploaded_object)
   2590   except HashMismatchException:
   2591     if _RENAME_ON_HASH_MISMATCH:
   2592       corrupted_obj_metadata = apitools_messages.Object(
   2593           name=dst_obj_metadata.name,
   2594           bucket=dst_obj_metadata.bucket,
   2595           etag=uploaded_object.etag)
   2596       dst_obj_metadata.name = (dst_url.object_name +
   2597                                _RENAME_ON_HASH_MISMATCH_SUFFIX)
   2598       gsutil_api.CopyObject(corrupted_obj_metadata,
   2599                             dst_obj_metadata, provider=dst_url.scheme)
   2600     # If the digest doesn't match, delete the object.
   2601     gsutil_api.DeleteObject(dst_url.bucket_name, dst_url.object_name,
   2602                             generation=uploaded_object.generation,
   2603                             provider=dst_url.scheme)
   2604     raise
   2605 
   2606   result_url = dst_url.Clone()
   2607   result_url.generation = GenerationFromUrlAndString(
   2608       result_url, uploaded_object.generation)
   2609 
   2610   return (end_time - start_time, src_obj_metadata.size, result_url,
   2611           uploaded_object.md5Hash)
   2612 
   2613 
   2614 # pylint: disable=undefined-variable
   2615 # pylint: disable=too-many-statements
   2616 def PerformCopy(logger, src_url, dst_url, gsutil_api, command_obj,
   2617                 copy_exception_handler, allow_splitting=True,
   2618                 headers=None, manifest=None, gzip_exts=None):
   2619   """Performs copy from src_url to dst_url, handling various special cases.
   2620 
   2621   Args:
   2622     logger: for outputting log messages.
   2623     src_url: Source StorageUrl.
   2624     dst_url: Destination StorageUrl.
   2625     gsutil_api: gsutil Cloud API instance to use for the copy.
   2626     command_obj: command object for use in Apply in parallel composite uploads
   2627         and sliced object downloads.
   2628     copy_exception_handler: for handling copy exceptions during Apply.
   2629     allow_splitting: Whether to allow the file to be split into component
   2630                      pieces for an parallel composite upload or download.
   2631     headers: optional headers to use for the copy operation.
   2632     manifest: optional manifest for tracking copy operations.
   2633     gzip_exts: List of file extensions to gzip for uploads, if any.
   2634 
   2635   Returns:
   2636     (elapsed_time, bytes_transferred, version-specific dst_url) excluding
   2637     overhead like initial GET.
   2638 
   2639   Raises:
   2640     ItemExistsError: if no clobber flag is specified and the destination
   2641         object already exists.
   2642     SkipUnsupportedObjectError: if skip_unsupported_objects flag is specified
   2643         and the source is an unsupported type.
   2644     CommandException: if other errors encountered.
   2645   """
   2646   if headers:
   2647     dst_obj_headers = headers.copy()
   2648   else:
   2649     dst_obj_headers = {}
   2650 
   2651   # Create a metadata instance for each destination object so metadata
   2652   # such as content-type can be applied per-object.
   2653   # Initialize metadata from any headers passed in via -h.
   2654   dst_obj_metadata = ObjectMetadataFromHeaders(dst_obj_headers)
   2655 
   2656   if dst_url.IsCloudUrl() and dst_url.scheme == 'gs':
   2657     preconditions = PreconditionsFromHeaders(dst_obj_headers)
   2658   else:
   2659     preconditions = Preconditions()
   2660 
   2661   src_obj_metadata = None
   2662   src_obj_filestream = None
   2663   if src_url.IsCloudUrl():
   2664     src_obj_fields = None
   2665     if dst_url.IsCloudUrl():
   2666       # For cloud or daisy chain copy, we need every copyable field.
   2667       # If we're not modifying or overriding any of the fields, we can get
   2668       # away without retrieving the object metadata because the copy
   2669       # operation can succeed with just the destination bucket and object
   2670       # name.  But if we are sending any metadata, the JSON API will expect a
   2671       # complete object resource.  Since we want metadata like the object size
   2672       # for our own tracking, we just get all of the metadata here.
   2673       src_obj_fields = ['cacheControl', 'componentCount',
   2674                         'contentDisposition', 'contentEncoding',
   2675                         'contentLanguage', 'contentType', 'crc32c',
   2676                         'etag', 'generation', 'md5Hash', 'mediaLink',
   2677                         'metadata', 'metageneration', 'size']
   2678       # We only need the ACL if we're going to preserve it.
   2679       if global_copy_helper_opts.preserve_acl:
   2680         src_obj_fields.append('acl')
   2681       if (src_url.scheme == dst_url.scheme
   2682           and not global_copy_helper_opts.daisy_chain):
   2683         copy_in_the_cloud = True
   2684       else:
   2685         copy_in_the_cloud = False
   2686     else:
   2687       # Just get the fields needed to validate the download.
   2688       src_obj_fields = ['crc32c', 'contentEncoding', 'contentType', 'etag',
   2689                         'mediaLink', 'md5Hash', 'size', 'generation']
   2690 
   2691     if (src_url.scheme == 's3' and
   2692         global_copy_helper_opts.skip_unsupported_objects):
   2693       src_obj_fields.append('storageClass')
   2694 
   2695     try:
   2696       src_generation = GenerationFromUrlAndString(src_url, src_url.generation)
   2697       src_obj_metadata = gsutil_api.GetObjectMetadata(
   2698           src_url.bucket_name, src_url.object_name,
   2699           generation=src_generation, provider=src_url.scheme,
   2700           fields=src_obj_fields)
   2701     except NotFoundException:
   2702       raise CommandException(
   2703           'NotFoundException: Could not retrieve source object %s.' %
   2704           src_url.url_string)
   2705     if (src_url.scheme == 's3' and
   2706         global_copy_helper_opts.skip_unsupported_objects and
   2707         src_obj_metadata.storageClass == 'GLACIER'):
   2708       raise SkipGlacierError()
   2709 
   2710     src_obj_size = src_obj_metadata.size
   2711     dst_obj_metadata.contentType = src_obj_metadata.contentType
   2712     if global_copy_helper_opts.preserve_acl:
   2713       dst_obj_metadata.acl = src_obj_metadata.acl
   2714       # Special case for S3-to-S3 copy URLs using
   2715       # global_copy_helper_opts.preserve_acl.
   2716       # dst_url will be verified in _CopyObjToObjDaisyChainMode if it
   2717       # is not s3 (and thus differs from src_url).
   2718       if src_url.scheme == 's3':
   2719         acl_text = S3MarkerAclFromObjectMetadata(src_obj_metadata)
   2720         if acl_text:
   2721           AddS3MarkerAclToObjectMetadata(dst_obj_metadata, acl_text)
   2722   else:
   2723     try:
   2724       src_obj_filestream = GetStreamFromFileUrl(src_url)
   2725     except Exception, e:  # pylint: disable=broad-except
   2726       if command_obj.continue_on_error:
   2727         message = 'Error copying %s: %s' % (src_url, str(e))
   2728         command_obj.op_failure_count += 1
   2729         logger.error(message)
   2730         return
   2731       else:
   2732         raise CommandException('Error opening file "%s": %s.' % (src_url,
   2733                                                                  e.message))
   2734     if src_url.IsStream():
   2735       src_obj_size = None
   2736     else:
   2737       src_obj_size = os.path.getsize(src_url.object_name)
   2738 
   2739   if global_copy_helper_opts.use_manifest:
   2740     # Set the source size in the manifest.
   2741     manifest.Set(src_url.url_string, 'size', src_obj_size)
   2742 
   2743   if (dst_url.scheme == 's3' and src_obj_size > S3_MAX_UPLOAD_SIZE
   2744       and src_url != 's3'):
   2745     raise CommandException(
   2746         '"%s" exceeds the maximum gsutil-supported size for an S3 upload. S3 '
   2747         'objects greater than %s in size require multipart uploads, which '
   2748         'gsutil does not support.' % (src_url,
   2749                                       MakeHumanReadable(S3_MAX_UPLOAD_SIZE)))
   2750 
   2751   # On Windows, stdin is opened as text mode instead of binary which causes
   2752   # problems when piping a binary file, so this switches it to binary mode.
   2753   if IS_WINDOWS and src_url.IsFileUrl() and src_url.IsStream():
   2754     msvcrt.setmode(GetStreamFromFileUrl(src_url).fileno(), os.O_BINARY)
   2755 
   2756   if global_copy_helper_opts.no_clobber:
   2757     # There are two checks to prevent clobbering:
   2758     # 1) The first check is to see if the URL
   2759     #    already exists at the destination and prevent the upload/download
   2760     #    from happening. This is done by the exists() call.
   2761     # 2) The second check is only relevant if we are writing to gs. We can
   2762     #    enforce that the server only writes the object if it doesn't exist
   2763     #    by specifying the header below. This check only happens at the
   2764     #    server after the complete file has been uploaded. We specify this
   2765     #    header to prevent a race condition where a destination file may
   2766     #    be created after the first check and before the file is fully
   2767     #    uploaded.
   2768     # In order to save on unnecessary uploads/downloads we perform both
   2769     # checks. However, this may come at the cost of additional HTTP calls.
   2770     if preconditions.gen_match:
   2771       raise ArgumentException('Specifying x-goog-if-generation-match is '
   2772                               'not supported with cp -n')
   2773     else:
   2774       preconditions.gen_match = 0
   2775     if dst_url.IsFileUrl() and os.path.exists(dst_url.object_name):
   2776       # The local file may be a partial. Check the file sizes.
   2777       if src_obj_size == os.path.getsize(dst_url.object_name):
   2778         raise ItemExistsError()
   2779     elif dst_url.IsCloudUrl():
   2780       try:
   2781         dst_object = gsutil_api.GetObjectMetadata(
   2782             dst_url.bucket_name, dst_url.object_name, provider=dst_url.scheme)
   2783       except NotFoundException:
   2784         dst_object = None
   2785       if dst_object:
   2786         raise ItemExistsError()
   2787 
   2788   if dst_url.IsCloudUrl():
   2789     # Cloud storage API gets object and bucket name from metadata.
   2790     dst_obj_metadata.name = dst_url.object_name
   2791     dst_obj_metadata.bucket = dst_url.bucket_name
   2792     if src_url.IsCloudUrl():
   2793       # Preserve relevant metadata from the source object if it's not already
   2794       # provided from the headers.
   2795       CopyObjectMetadata(src_obj_metadata, dst_obj_metadata, override=False)
   2796       src_obj_metadata.name = src_url.object_name
   2797       src_obj_metadata.bucket = src_url.bucket_name
   2798     else:
   2799       _SetContentTypeFromFile(src_url, dst_obj_metadata)
   2800   else:
   2801     # Files don't have Cloud API metadata.
   2802     dst_obj_metadata = None
   2803 
   2804   _LogCopyOperation(logger, src_url, dst_url, dst_obj_metadata)
   2805 
   2806   if src_url.IsCloudUrl():
   2807     if dst_url.IsFileUrl():
   2808       return _DownloadObjectToFile(src_url, src_obj_metadata, dst_url,
   2809                                    gsutil_api, logger, command_obj,
   2810                                    copy_exception_handler,
   2811                                    allow_splitting=allow_splitting)
   2812     elif copy_in_the_cloud:
   2813       return _CopyObjToObjInTheCloud(src_url, src_obj_metadata, dst_url,
   2814                                      dst_obj_metadata, preconditions,
   2815                                      gsutil_api, logger)
   2816     else:
   2817       return _CopyObjToObjDaisyChainMode(src_url, src_obj_metadata,
   2818                                          dst_url, dst_obj_metadata,
   2819                                          preconditions, gsutil_api, logger)
   2820   else:  # src_url.IsFileUrl()
   2821     if dst_url.IsCloudUrl():
   2822       return _UploadFileToObject(
   2823           src_url, src_obj_filestream, src_obj_size, dst_url,
   2824           dst_obj_metadata, preconditions, gsutil_api, logger, command_obj,
   2825           copy_exception_handler, gzip_exts=gzip_exts,
   2826           allow_splitting=allow_splitting)
   2827     else:  # dst_url.IsFileUrl()
   2828       return _CopyFileToFile(src_url, dst_url)
   2829 
   2830 
   2831 class Manifest(object):
   2832   """Stores the manifest items for the CpCommand class."""
   2833 
   2834   def __init__(self, path):
   2835     # self.items contains a dictionary of rows
   2836     self.items = {}
   2837     self.manifest_filter = {}
   2838     self.lock = CreateLock()
   2839 
   2840     self.manifest_path = os.path.expanduser(path)
   2841     self._ParseManifest()
   2842     self._CreateManifestFile()
   2843 
   2844   def _ParseManifest(self):
   2845     """Load and parse a manifest file.
   2846 
   2847     This information will be used to skip any files that have a skip or OK
   2848     status.
   2849     """
   2850     try:
   2851       if os.path.exists(self.manifest_path):
   2852         with open(self.manifest_path, 'rb') as f:
   2853           first_row = True
   2854           reader = csv.reader(f)
   2855           for row in reader:
   2856             if first_row:
   2857               try:
   2858                 source_index = row.index('Source')
   2859                 result_index = row.index('Result')
   2860               except ValueError:
   2861                 # No header and thus not a valid manifest file.
   2862                 raise CommandException(
   2863                     'Missing headers in manifest file: %s' % self.manifest_path)
   2864             first_row = False
   2865             source = row[source_index]
   2866             result = row[result_index]
   2867             if result in ['OK', 'skip']:
   2868               # We're always guaranteed to take the last result of a specific
   2869               # source url.
   2870               self.manifest_filter[source] = result
   2871     except IOError:
   2872       raise CommandException('Could not parse %s' % self.manifest_path)
   2873 
   2874   def WasSuccessful(self, src):
   2875     """Returns whether the specified src url was marked as successful."""
   2876     return src in self.manifest_filter
   2877 
   2878   def _CreateManifestFile(self):
   2879     """Opens the manifest file and assigns it to the file pointer."""
   2880     try:
   2881       if ((not os.path.exists(self.manifest_path))
   2882           or (os.stat(self.manifest_path).st_size == 0)):
   2883         # Add headers to the new file.
   2884         with open(self.manifest_path, 'wb', 1) as f:
   2885           writer = csv.writer(f)
   2886           writer.writerow(['Source',
   2887                            'Destination',
   2888                            'Start',
   2889                            'End',
   2890                            'Md5',
   2891                            'UploadId',
   2892                            'Source Size',
   2893                            'Bytes Transferred',
   2894                            'Result',
   2895                            'Description'])
   2896     except IOError:
   2897       raise CommandException('Could not create manifest file.')
   2898 
   2899   def Set(self, url, key, value):
   2900     if value is None:
   2901       # In case we don't have any information to set we bail out here.
   2902       # This is so that we don't clobber existing information.
   2903       # To zero information pass '' instead of None.
   2904       return
   2905     if url in self.items:
   2906       self.items[url][key] = value
   2907     else:
   2908       self.items[url] = {key: value}
   2909 
   2910   def Initialize(self, source_url, destination_url):
   2911     # Always use the source_url as the key for the item. This is unique.
   2912     self.Set(source_url, 'source_uri', source_url)
   2913     self.Set(source_url, 'destination_uri', destination_url)
   2914     self.Set(source_url, 'start_time', datetime.datetime.utcnow())
   2915 
   2916   def SetResult(self, source_url, bytes_transferred, result,
   2917                 description=''):
   2918     self.Set(source_url, 'bytes', bytes_transferred)
   2919     self.Set(source_url, 'result', result)
   2920     self.Set(source_url, 'description', description)
   2921     self.Set(source_url, 'end_time', datetime.datetime.utcnow())
   2922     self._WriteRowToManifestFile(source_url)
   2923     self._RemoveItemFromManifest(source_url)
   2924 
   2925   def _WriteRowToManifestFile(self, url):
   2926     """Writes a manifest entry to the manifest file for the url argument."""
   2927     row_item = self.items[url]
   2928     data = [
   2929         str(row_item['source_uri'].encode(UTF8)),
   2930         str(row_item['destination_uri'].encode(UTF8)),
   2931         '%sZ' % row_item['start_time'].isoformat(),
   2932         '%sZ' % row_item['end_time'].isoformat(),
   2933         row_item['md5'] if 'md5' in row_item else '',
   2934         row_item['upload_id'] if 'upload_id' in row_item else '',
   2935         str(row_item['size']) if 'size' in row_item else '',
   2936         str(row_item['bytes']) if 'bytes' in row_item else '',
   2937         row_item['result'],
   2938         row_item['description'].encode(UTF8)]
   2939 
   2940     # Aquire a lock to prevent multiple threads writing to the same file at
   2941     # the same time. This would cause a garbled mess in the manifest file.
   2942     with self.lock:
   2943       with open(self.manifest_path, 'a', 1) as f:  # 1 == line buffered
   2944         writer = csv.writer(f)
   2945         writer.writerow(data)
   2946 
   2947   def _RemoveItemFromManifest(self, url):
   2948     # Remove the item from the dictionary since we're done with it and
   2949     # we don't want the dictionary to grow too large in memory for no good
   2950     # reason.
   2951     del self.items[url]
   2952 
   2953 
   2954 class ItemExistsError(Exception):
   2955   """Exception class for objects that are skipped because they already exist."""
   2956   pass
   2957 
   2958 
   2959 class SkipUnsupportedObjectError(Exception):
   2960   """Exception for objects skipped because they are an unsupported type."""
   2961 
   2962   def __init__(self):
   2963     super(SkipUnsupportedObjectError, self).__init__()
   2964     self.unsupported_type = 'Unknown'
   2965 
   2966 
   2967 class SkipGlacierError(SkipUnsupportedObjectError):
   2968   """Exception for objects skipped because they are an unsupported type."""
   2969 
   2970   def __init__(self):
   2971     super(SkipGlacierError, self).__init__()
   2972     self.unsupported_type = 'GLACIER'
   2973 
   2974 
   2975 def GetPathBeforeFinalDir(url):
   2976   """Returns the path section before the final directory component of the URL.
   2977 
   2978   This handles cases for file system directories, bucket, and bucket
   2979   subdirectories. Example: for gs://bucket/dir/ we'll return 'gs://bucket',
   2980   and for file://dir we'll return file://
   2981 
   2982   Args:
   2983     url: StorageUrl representing a filesystem directory, cloud bucket or
   2984          bucket subdir.
   2985 
   2986   Returns:
   2987     String name of above-described path, sans final path separator.
   2988   """
   2989   sep = url.delim
   2990   if url.IsFileUrl():
   2991     past_scheme = url.url_string[len('file://'):]
   2992     if past_scheme.find(sep) == -1:
   2993       return 'file://'
   2994     else:
   2995       return 'file://%s' % past_scheme.rstrip(sep).rpartition(sep)[0]
   2996   if url.IsBucket():
   2997     return '%s://' % url.scheme
   2998   # Else it names a bucket subdir.
   2999   return url.url_string.rstrip(sep).rpartition(sep)[0]
   3000 
   3001 
   3002 def _GetPartitionInfo(file_size, max_components, default_component_size):
   3003   """Gets info about a file partition for parallel file/object transfers.
   3004 
   3005   Args:
   3006     file_size: The number of bytes in the file to be partitioned.
   3007     max_components: The maximum number of components that can be composed.
   3008     default_component_size: The size of a component, assuming that
   3009                             max_components is infinite.
   3010   Returns:
   3011     The number of components in the partitioned file, and the size of each
   3012     component (except the last, which will have a different size iff
   3013     file_size != 0 (mod num_components)).
   3014   """
   3015   # num_components = ceil(file_size / default_component_size)
   3016   num_components = DivideAndCeil(file_size, default_component_size)
   3017 
   3018   # num_components must be in the range [2, max_components]
   3019   num_components = max(min(num_components, max_components), 2)
   3020 
   3021   # component_size = ceil(file_size / num_components)
   3022   component_size = DivideAndCeil(file_size, num_components)
   3023   return (num_components, component_size)
   3024 
   3025 
   3026 def _DeleteTempComponentObjectFn(cls, url_to_delete, thread_state=None):
   3027   """Wrapper func to be used with command.Apply to delete temporary objects."""
   3028   gsutil_api = GetCloudApiInstance(cls, thread_state)
   3029   try:
   3030     gsutil_api.DeleteObject(
   3031         url_to_delete.bucket_name, url_to_delete.object_name,
   3032         generation=url_to_delete.generation, provider=url_to_delete.scheme)
   3033   except NotFoundException:
   3034     # The temporary object could already be gone if a retry was
   3035     # issued at a lower layer but the original request succeeded.
   3036     # Barring other errors, the top-level command should still report success,
   3037     # so don't raise here.
   3038     pass
   3039 
   3040 
   3041 def _ParseParallelUploadTrackerFile(tracker_file, tracker_file_lock):
   3042   """Parse the tracker file from the last parallel composite upload attempt.
   3043 
   3044   If it exists, the tracker file is of the format described in
   3045   _CreateParallelUploadTrackerFile. If the file doesn't exist or cannot be
   3046   read, then the upload will start from the beginning.
   3047 
   3048   Args:
   3049     tracker_file: The name of the file to parse.
   3050     tracker_file_lock: Lock protecting access to the tracker file.
   3051 
   3052   Returns:
   3053     random_prefix: A randomly-generated prefix to the name of the
   3054                    temporary components.
   3055     existing_objects: A list of ObjectFromTracker objects representing
   3056                       the set of files that have already been uploaded.
   3057   """
   3058 
   3059   def GenerateRandomPrefix():
   3060     return str(random.randint(1, (10 ** 10) - 1))
   3061 
   3062   existing_objects = []
   3063   try:
   3064     with tracker_file_lock:
   3065       with open(tracker_file, 'r') as fp:
   3066         lines = fp.readlines()
   3067         lines = [line.strip() for line in lines]
   3068         if not lines:
   3069           print('Parallel upload tracker file (%s) was invalid. '
   3070                 'Restarting upload from scratch.' % tracker_file)
   3071           lines = [GenerateRandomPrefix()]
   3072 
   3073   except IOError as e:
   3074     # We can't read the tracker file, so generate a new random prefix.
   3075     lines = [GenerateRandomPrefix()]
   3076 
   3077     # Ignore non-existent file (happens first time an upload
   3078     # is attempted on a file), but warn user for other errors.
   3079     if e.errno != errno.ENOENT:
   3080       # Will restart because we failed to read in the file.
   3081       print('Couldn\'t read parallel upload tracker file (%s): %s. '
   3082             'Restarting upload from scratch.' % (tracker_file, e.strerror))
   3083 
   3084   # The first line contains the randomly-generated prefix.
   3085   random_prefix = lines[0]
   3086 
   3087   # The remaining lines were written in pairs to describe a single component
   3088   # in the form:
   3089   #   object_name (without random prefix)
   3090   #   generation
   3091   # Newlines are used as the delimiter because only newlines and carriage
   3092   # returns are invalid characters in object names, and users can specify
   3093   # a custom prefix in the config file.
   3094   i = 1
   3095   while i < len(lines):
   3096     (name, generation) = (lines[i], lines[i+1])
   3097     if not generation:
   3098       # Cover the '' case.
   3099       generation = None
   3100     existing_objects.append(ObjectFromTracker(name, generation))
   3101     i += 2
   3102   return (random_prefix, existing_objects)
   3103 
   3104 
   3105 def _AppendComponentTrackerToParallelUploadTrackerFile(tracker_file, component,
   3106                                                        tracker_file_lock):
   3107   """Appends info about the uploaded component to an existing tracker file.
   3108 
   3109   Follows the format described in _CreateParallelUploadTrackerFile.
   3110 
   3111   Args:
   3112     tracker_file: Tracker file to append to.
   3113     component: Component that was uploaded.
   3114     tracker_file_lock: Thread and process-safe Lock for the tracker file.
   3115   """
   3116   lines = _GetParallelUploadTrackerFileLinesForComponents([component])
   3117   lines = [line + '\n' for line in lines]
   3118   with tracker_file_lock:
   3119     with open(tracker_file, 'a') as f:
   3120       f.writelines(lines)
   3121 
   3122 
   3123 def _CreateParallelUploadTrackerFile(tracker_file, random_prefix, components,
   3124                                      tracker_file_lock):
   3125   """Writes information about components that were successfully uploaded.
   3126 
   3127   This way the upload can be resumed at a later date. The tracker file has
   3128   the format:
   3129     random_prefix
   3130     temp_object_1_name
   3131     temp_object_1_generation
   3132     .
   3133     .
   3134     .
   3135     temp_object_N_name
   3136     temp_object_N_generation
   3137     where N is the number of components that have been successfully uploaded.
   3138 
   3139   Args:
   3140     tracker_file: The name of the parallel upload tracker file.
   3141     random_prefix: The randomly-generated prefix that was used for
   3142                    for uploading any existing components.
   3143     components: A list of ObjectFromTracker objects that were uploaded.
   3144     tracker_file_lock: The lock protecting access to the tracker file.
   3145   """
   3146   lines = [random_prefix]
   3147   lines += _GetParallelUploadTrackerFileLinesForComponents(components)
   3148   lines = [line + '\n' for line in lines]
   3149   try:
   3150     with tracker_file_lock:
   3151       open(tracker_file, 'w').close()  # Clear the file.
   3152       with open(tracker_file, 'w') as f:
   3153         f.writelines(lines)
   3154   except IOError as e:
   3155     RaiseUnwritableTrackerFileException(tracker_file, e.strerror)
   3156 
   3157 
   3158 def _GetParallelUploadTrackerFileLinesForComponents(components):
   3159   """Return a list of the lines for use in a parallel upload tracker file.
   3160 
   3161   The lines represent the given components, using the format as described in
   3162   _CreateParallelUploadTrackerFile.
   3163 
   3164   Args:
   3165     components: A list of ObjectFromTracker objects that were uploaded.
   3166 
   3167   Returns:
   3168     Lines describing components with their generation for outputting to the
   3169     tracker file.
   3170   """
   3171   lines = []
   3172   for component in components:
   3173     generation = None
   3174     generation = component.generation
   3175     if not generation:
   3176       generation = ''
   3177     lines += [component.object_name, str(generation)]
   3178   return lines
   3179 
   3180 
   3181 def FilterExistingComponents(dst_args, existing_components, bucket_url,
   3182                              gsutil_api):
   3183   """Determines course of action for component objects.
   3184 
   3185   Given the list of all target objects based on partitioning the file and
   3186   the list of objects that have already been uploaded successfully,
   3187   this function determines which objects should be uploaded, which
   3188   existing components are still valid, and which existing components should
   3189   be deleted.
   3190 
   3191   Args:
   3192     dst_args: The map of file_name -> PerformParallelUploadFileToObjectArgs
   3193               calculated by partitioning the file.
   3194     existing_components: A list of ObjectFromTracker objects that have been
   3195                          uploaded in the past.
   3196     bucket_url: CloudUrl of the bucket in which the components exist.
   3197     gsutil_api: gsutil Cloud API instance to use for retrieving object metadata.
   3198 
   3199   Returns:
   3200     components_to_upload: List of components that need to be uploaded.
   3201     uploaded_components: List of components that have already been
   3202                          uploaded and are still valid.
   3203     existing_objects_to_delete: List of components that have already
   3204                                 been uploaded, but are no longer valid
   3205                                 and are in a versioned bucket, and
   3206                                 therefore should be deleted.
   3207   """
   3208   components_to_upload = []
   3209   existing_component_names = [component.object_name
   3210                               for component in existing_components]
   3211   for component_name in dst_args:
   3212     if component_name not in existing_component_names:
   3213       components_to_upload.append(dst_args[component_name])
   3214 
   3215   objects_already_chosen = []
   3216 
   3217   # Don't reuse any temporary components whose MD5 doesn't match the current
   3218   # MD5 of the corresponding part of the file. If the bucket is versioned,
   3219   # also make sure that we delete the existing temporary version.
   3220   existing_objects_to_delete = []
   3221   uploaded_components = []
   3222   for tracker_object in existing_components:
   3223     if (tracker_object.object_name not in dst_args.keys()
   3224         or tracker_object.object_name in objects_already_chosen):
   3225       # This could happen if the component size has changed. This also serves
   3226       # to handle object names that get duplicated in the tracker file due
   3227       # to people doing things they shouldn't (e.g., overwriting an existing
   3228       # temporary component in a versioned bucket).
   3229 
   3230       url = bucket_url.Clone()
   3231       url.object_name = tracker_object.object_name
   3232       url.generation = tracker_object.generation
   3233       existing_objects_to_delete.append(url)
   3234       continue
   3235 
   3236     dst_arg = dst_args[tracker_object.object_name]
   3237     file_part = FilePart(dst_arg.filename, dst_arg.file_start,
   3238                          dst_arg.file_length)
   3239     # TODO: calculate MD5's in parallel when possible.
   3240     content_md5 = CalculateB64EncodedMd5FromContents(file_part)
   3241 
   3242     try:
   3243       # Get the MD5 of the currently-existing component.
   3244       dst_url = dst_arg.dst_url
   3245       dst_metadata = gsutil_api.GetObjectMetadata(
   3246           dst_url.bucket_name, dst_url.object_name,
   3247           generation=dst_url.generation, provider=dst_url.scheme,
   3248           fields=['md5Hash', 'etag'])
   3249       cloud_md5 = dst_metadata.md5Hash
   3250     except Exception:  # pylint: disable=broad-except
   3251       # We don't actually care what went wrong - we couldn't retrieve the
   3252       # object to check the MD5, so just upload it again.
   3253       cloud_md5 = None
   3254 
   3255     if cloud_md5 != content_md5:
   3256       components_to_upload.append(dst_arg)
   3257       objects_already_chosen.append(tracker_object.object_name)
   3258       if tracker_object.generation:
   3259         # If the old object doesn't have a generation (i.e., it isn't in a
   3260         # versioned bucket), then we will just overwrite it anyway.
   3261         invalid_component_with_generation = dst_arg.dst_url.Clone()
   3262         invalid_component_with_generation.generation = tracker_object.generation
   3263         existing_objects_to_delete.append(invalid_component_with_generation)
   3264     else:
   3265       url = dst_arg.dst_url.Clone()
   3266       url.generation = tracker_object.generation
   3267       uploaded_components.append(url)
   3268       objects_already_chosen.append(tracker_object.object_name)
   3269 
   3270   if uploaded_components:
   3271     logging.info('Found %d existing temporary components to reuse.',
   3272                  len(uploaded_components))
   3273 
   3274   return (components_to_upload, uploaded_components,
   3275           existing_objects_to_delete)
   3276