Home | History | Annotate | Download | only in py
      1 #!/usr/bin/env python
      2 """Upload and download support for apitools."""
      3 from __future__ import print_function
      5 import email.generator as email_generator
      6 import email.mime.multipart as mime_multipart
      7 import email.mime.nonmultipart as mime_nonmultipart
      8 import io
      9 import json
     10 import mimetypes
     11 import os
     12 import threading
     14 import six
     15 from six.moves import http_client
     17 from apitools.base.py import buffered_stream
     18 from apitools.base.py import exceptions
     19 from apitools.base.py import http_wrapper
     20 from apitools.base.py import stream_slice
     21 from apitools.base.py import util
# Names exported by a star-import of this module.
__all__ = [
    'Download',
    'Upload',
    'RESUMABLE_UPLOAD',
    'SIMPLE_UPLOAD',
    'DownloadProgressPrinter',
    'DownloadCompletePrinter',
    'UploadProgressPrinter',
    'UploadCompletePrinter',
]

# Upload strategy identifiers. These are the only two values that the
# Upload.strategy setter accepts.
SIMPLE_UPLOAD = 'simple'
RESUMABLE_UPLOAD = 'resumable'
     39 def DownloadProgressPrinter(response, unused_download):
     40     """Print download progress based on response."""
     41     if 'content-range' in response.info:
     42         print('Received %s' % response.info['content-range'])
     43     else:
     44         print('Received %d bytes' % response.length)
     47 def DownloadCompletePrinter(unused_response, unused_download):
     48     """Print information about a completed download."""
     49     print('Download complete')
     52 def UploadProgressPrinter(response, unused_upload):
     53     """Print upload progress based on response."""
     54     print('Sent %s' % response.info['range'])
     57 def UploadCompletePrinter(unused_response, unused_upload):
     58     """Print information about a completed upload."""
     59     print('Upload complete')
     62 class _Transfer(object):
     64     """Generic bits common to Uploads and Downloads."""
     66     def __init__(self, stream, close_stream=False, chunksize=None,
     67                  auto_transfer=True, http=None, num_retries=5):
     68         self.__bytes_http = None
     69         self.__close_stream = close_stream
     70         self.__http = http
     71         self.__stream = stream
     72         self.__url = None
     74         self.__num_retries = 5
     75         # Let the @property do validation
     76         self.num_retries = num_retries
     78         self.retry_func = (
     79             http_wrapper.HandleExceptionsAndRebuildHttpConnections)
     80         self.auto_transfer = auto_transfer
     81         self.chunksize = chunksize or 1048576
     83     def __repr__(self):
     84         return str(self)
     86     @property
     87     def close_stream(self):
     88         return self.__close_stream
     90     @property
     91     def http(self):
     92         return self.__http
     94     @property
     95     def bytes_http(self):
     96         return self.__bytes_http or self.http
     98     @bytes_http.setter
     99     def bytes_http(self, value):
    100         self.__bytes_http = value
    102     @property
    103     def num_retries(self):
    104         return self.__num_retries
    106     @num_retries.setter
    107     def num_retries(self, value):
    108         util.Typecheck(value, six.integer_types)
    109         if value < 0:
    110             raise exceptions.InvalidDataError(
    111                 'Cannot have negative value for num_retries')
    112         self.__num_retries = value
    114     @property
    115     def stream(self):
    116         return self.__stream
    118     @property
    119     def url(self):
    120         return self.__url
    122     def _Initialize(self, http, url):
    123         """Initialize this download by setting self.http and self.url.
    125         We want the user to be able to override self.http by having set
    126         the value in the constructor; in that case, we ignore the provided
    127         http.
    129         Args:
    130           http: An httplib2.Http instance or None.
    131           url: The url for this transfer.
    133         Returns:
    134           None. Initializes self.
    135         """
    136         self.EnsureUninitialized()
    137         if self.http is None:
    138             self.__http = http or http_wrapper.GetHttp()
    139         self.__url = url
    141     @property
    142     def initialized(self):
    143         return self.url is not None and self.http is not None
    145     @property
    146     def _type_name(self):
    147         return type(self).__name__
    149     def EnsureInitialized(self):
    150         if not self.initialized:
    151             raise exceptions.TransferInvalidError(
    152                 'Cannot use uninitialized %s', self._type_name)
    154     def EnsureUninitialized(self):
    155         if self.initialized:
    156             raise exceptions.TransferInvalidError(
    157                 'Cannot re-initialize %s', self._type_name)
    159     def __del__(self):
    160         if self.__close_stream:
    161             self.__stream.close()
    163     def _ExecuteCallback(self, callback, response):
    164         # TODO(craigcitro): Push these into a queue.
    165         if callback is not None:
    166             threading.Thread(target=callback, args=(response, self)).start()
    169 class Download(_Transfer):
    171     """Data for a single download.
    173     Public attributes:
    174       chunksize: default chunksize to use for transfers.
    175     """
    176     _ACCEPTABLE_STATUSES = set((
    177         http_client.OK,
    178         http_client.NO_CONTENT,
    179         http_client.PARTIAL_CONTENT,
    180         http_client.REQUESTED_RANGE_NOT_SATISFIABLE,
    181     ))
    183         'auto_transfer', 'progress', 'total_size', 'url'))
    def __init__(self, stream, progress_callback=None, finish_callback=None,
                 **kwds):
        """Create a download that writes into stream.

        Args:
          stream: Destination stream for downloaded bytes.
          progress_callback: Default per-chunk callback for streaming.
          finish_callback: Default completion callback for streaming.
          **kwds: ``total_size`` is consumed here; everything else is
              forwarded to the _Transfer constructor.
        """
        known_size = kwds.pop('total_size', None)
        super(Download, self).__init__(stream, **kwds)

        self.progress_callback = progress_callback
        self.finish_callback = finish_callback

        self.__encoding = None
        self.__total_size = known_size
        self.__progress = 0
        self.__initial_response = None
    197     @property
    198     def progress(self):
    199         return self.__progress
    201     @property
    202     def encoding(self):
    203         return self.__encoding
    @classmethod
    def FromFile(cls, filename, overwrite=False, auto_transfer=True, **kwds):
        """Create a new download object that writes to the named file.

        Args:
          filename: Destination path; ``~`` is expanded.
          overwrite: If False, refuse to replace an existing file.
          auto_transfer: Forwarded to the constructor.
          **kwds: Forwarded to the constructor.

        Raises:
          exceptions.InvalidUserInputError: if the file exists and
              overwrite is not set.
        """
        target = os.path.expanduser(filename)
        if not overwrite and os.path.exists(target):
            raise exceptions.InvalidUserInputError(
                'File %s exists and overwrite not specified' % target)
        # The download owns this handle, hence close_stream=True.
        handle = open(target, 'wb')
        return cls(handle, close_stream=True, auto_transfer=auto_transfer,
                   **kwds)
    @classmethod
    def FromStream(cls, stream, auto_transfer=True, total_size=None, **kwds):
        """Create a new Download object that writes to an existing stream.

        Args:
          stream: Destination stream; it is not closed by the download
              unless close_stream is passed in kwds.
          auto_transfer: Forwarded to the constructor.
          total_size: Forwarded to the constructor.
          **kwds: Forwarded to the constructor.
        """
        # kwds is this call's own dict, so extending it is safe.
        kwds.update(auto_transfer=auto_transfer, total_size=total_size)
        return cls(stream, **kwds)
    @classmethod
    def FromData(cls, stream, json_data, http=None, auto_transfer=None,
                 **kwds):
        """Rebuild a Download from a stream and serialized JSON state.

        Args:
          stream: Destination stream for the download.
          json_data: JSON text holding every key listed in
              _REQUIRED_SERIALIZATION_KEYS.
          http: Http object passed on to _Initialize.
          auto_transfer: When not None, overrides the serialized value.
          **kwds: Forwarded to FromStream.

        Returns:
          An initialized Download.

        Raises:
          exceptions.InvalidDataError: if required keys are missing.
        """
        state = json.loads(json_data)
        absent = cls._REQUIRED_SERIALIZATION_KEYS - set(state.keys())
        if absent:
            raise exceptions.InvalidDataError(
                'Invalid serialization data, missing keys: %s' % (
                    ', '.join(absent)))
        download = cls.FromStream(stream, **kwds)
        # An explicit argument wins over the serialized setting.
        if auto_transfer is None:
            auto_transfer = state['auto_transfer']
        download.auto_transfer = auto_transfer
        # Restore private counters through their name-mangled attributes.
        setattr(download, '_Download__progress', state['progress'])
        setattr(download, '_Download__total_size', state['total_size'])
        download._Initialize(  # pylint: disable=protected-access
            http, state['url'])
        return download
    242     @property
    243     def serialization_data(self):
    244         self.EnsureInitialized()
    245         return {
    246             'auto_transfer': self.auto_transfer,
    247             'progress': self.progress,
    248             'total_size': self.total_size,
    249             'url': self.url,
    250         }
    252     @property
    253     def total_size(self):
    254         return self.__total_size
    256     def __str__(self):
    257         if not self.initialized:
    258             return 'Download (uninitialized)'
    259         else:
    260             return 'Download with %d/%s bytes transferred from url %s' % (
    261                 self.progress, self.total_size, self.url)
    263     def ConfigureRequest(self, http_request, url_builder):
    264         url_builder.query_params['alt'] = 'media'
    265         # TODO(craigcitro): We need to send range requests because by
    266         # default httplib2 stores entire reponses in memory. Override
    267         # httplib2's download method (as gsutil does) so that this is not
    268         # necessary.
    269         http_request.headers['Range'] = 'bytes=0-%d' % (self.chunksize - 1,)
    271     def __SetTotal(self, info):
    272         if 'content-range' in info:
    273             _, _, total = info['content-range'].rpartition('/')
    274             if total != '*':
    275                 self.__total_size = int(total)
    276         # Note "total_size is None" means we don't know it; if no size
    277         # info was returned on our initial range request, that means we
    278         # have a 0-byte file. (That last statement has been verified
    279         # empirically, but is not clearly documented anywhere.)
    280         if self.total_size is None:
    281             self.__total_size = 0
    def InitializeDownload(self, http_request, http=None, client=None):
        """Initialize this download by making a request.

        When auto_transfer is set, the first chunk is requested here, the
        response is kept as the initial response for StreamMedia, and the
        remaining bytes are then streamed via StreamInChunks.

        Args:
          http_request: The HttpRequest to use to initialize this download.
          http: The httplib2.Http instance for this request.
          client: If provided, let this client process the final URL before
              sending any additional requests. If client is provided and
              http is not, client.http will be used instead.

        Raises:
          exceptions.TransferInvalidError: if already initialized.
          exceptions.UserError: if neither http nor client is given.
          exceptions.HttpError: if the initial request returns a status
              outside _ACCEPTABLE_STATUSES.
        """
        self.EnsureUninitialized()
        if http is None and client is None:
            raise exceptions.UserError('Must provide client or http.')
        http = http or client.http
        if client is not None:
            http_request.url = client.FinalizeTransferUrl(http_request.url)
        url = http_request.url
        if self.auto_transfer:
            # Fetch the first chunk now; its headers tell us the total size.
            end_byte = self.__ComputeEndByte(0)
            self.__SetRangeHeader(http_request, 0, end_byte)
            response = http_wrapper.MakeRequest(
                self.bytes_http or http, http_request)
            if response.status_code not in self._ACCEPTABLE_STATUSES:
                raise exceptions.HttpError.FromResponse(response)
            # Saved so StreamMedia can consume it instead of re-fetching.
            self.__initial_response = response
            self.__SetTotal(response.info)
            # Prefer the server-reported location for follow-up requests.
            url = response.info.get('content-location', response.request_url)
        if client is not None:
            url = client.FinalizeTransferUrl(url)
        self._Initialize(http, url)
        # Unless the user has requested otherwise, we want to just
        # go ahead and pump the bytes now.
        if self.auto_transfer:
            self.StreamInChunks()
    318     def __NormalizeStartEnd(self, start, end=None):
    319         if end is not None:
    320             if start < 0:
    321                 raise exceptions.TransferInvalidError(
    322                     'Cannot have end index with negative start index')
    323             elif start >= self.total_size:
    324                 raise exceptions.TransferInvalidError(
    325                     'Cannot have start index greater than total size')
    326             end = min(end, self.total_size - 1)
    327             if end < start:
    328                 raise exceptions.TransferInvalidError(
    329                     'Range requested with end[%s] < start[%s]' % (end, start))
    330             return start, end
    331         else:
    332             if start < 0:
    333                 start = max(0, start + self.total_size)
    334             return start, self.total_size - 1
    336     def __SetRangeHeader(self, request, start, end=None):
    337         if start < 0:
    338             request.headers['range'] = 'bytes=%d' % start
    339         elif end is None:
    340             request.headers['range'] = 'bytes=%d-' % start
    341         else:
    342             request.headers['range'] = 'bytes=%d-%d' % (start, end)
    344     def __ComputeEndByte(self, start, end=None, use_chunks=True):
    345         """Compute the last byte to fetch for this request.
    347         This is all based on the HTTP spec for Range and
    348         Content-Range.
    350         Note that this is potentially confusing in several ways:
    351           * the value for the last byte is 0-based, eg "fetch 10 bytes
    352             from the beginning" would return 9 here.
    353           * if we have no information about size, and don't want to
    354             use the chunksize, we'll return None.
    355         See the tests for more examples.
    357         Args:
    358           start: byte to start at.
    359           end: (int or None, default: None) Suggested last byte.
    360           use_chunks: (bool, default: True) If False, ignore self.chunksize.
    362         Returns:
    363           Last byte to use in a Range header, or None.
    365         """
    366         end_byte = end
    368         if start < 0 and not self.total_size:
    369             return end_byte
    371         if use_chunks:
    372             alternate = start + self.chunksize - 1
    373             if end_byte is not None:
    374                 end_byte = min(end_byte, alternate)
    375             else:
    376                 end_byte = alternate
    378         if self.total_size:
    379             alternate = self.total_size - 1
    380             if end_byte is not None:
    381                 end_byte = min(end_byte, alternate)
    382             else:
    383                 end_byte = alternate
    385         return end_byte
    def __GetChunk(self, start, end, additional_headers=None):
        """Retrieve a chunk, and return the full response.

        Args:
          start: First byte of the chunk.
          end: Last byte of the chunk, or None.
          additional_headers: Optional headers merged into the request.

        Raises:
          exceptions.TransferInvalidError: if the download is not
              initialized.
        """
        self.EnsureInitialized()
        chunk_request = http_wrapper.Request(url=self.url)
        self.__SetRangeHeader(chunk_request, start, end=end)
        if additional_headers is not None:
            chunk_request.headers.update(additional_headers)
        response = http_wrapper.MakeRequest(
            self.bytes_http, chunk_request, retry_func=self.retry_func,
            retries=self.num_retries)
        return response
    398     def __ProcessResponse(self, response):
    399         """Process response (by updating self and writing to self.stream)."""
    400         if response.status_code not in self._ACCEPTABLE_STATUSES:
    401             # We distinguish errors that mean we made a mistake in setting
    402             # up the transfer versus something we should attempt again.
    403             if response.status_code in (http_client.FORBIDDEN,
    404                                         http_client.NOT_FOUND):
    405                 raise exceptions.HttpError.FromResponse(response)
    406             else:
    407                 raise exceptions.TransferRetryError(response.content)
    408         if response.status_code in (http_client.OK,
    409                                     http_client.PARTIAL_CONTENT):
    410             self.stream.write(response.content)
    411             self.__progress += response.length
    412             if response.info and 'content-encoding' in response.info:
    413                 # TODO(craigcitro): Handle the case where this changes over a
    414                 # download.
    415                 self.__encoding = response.info['content-encoding']
    416         elif response.status_code == http_client.NO_CONTENT:
    417             # It's important to write something to the stream for the case
    418             # of a 0-byte download to a file, as otherwise python won't
    419             # create the file.
    420             self.stream.write('')
    421         return response
    def GetRange(self, start, end=None, additional_headers=None,
                 use_chunks=True):
        """Retrieve a given byte range from this download, inclusive.

        Range must be of one of these three forms:
        * 0 <= start, end = None: Fetch from start to the end of the file.
        * 0 <= start <= end: Fetch the bytes from start to end.
        * start < 0, end = None: Fetch the last -start bytes of the file.

        (These variations correspond to those described in the HTTP 1.1
        protocol for range headers in RFC 2616, sec. 14.35.1.)

        Args:
          start: (int) Where to start fetching bytes. (See above.)
          end: (int, optional) Where to stop fetching bytes. (See above.)
          additional_headers: (mapping, optional) Any additional headers
              to pass with the request; merged into the request headers.
          use_chunks: (bool, default: True) If False, ignore self.chunksize
              and fetch this range in a single request.

        Returns:
          None. Streams bytes into self.stream.

        Raises:
          exceptions.TransferInvalidError: if the download is not
              initialized or the range is invalid for the total size.
          exceptions.TransferRetryError: if a response carries zero bytes.
        """
        self.EnsureInitialized()
        # Until the total size is known the range cannot be normalized;
        # in that case the first response's headers supply the size.
        progress_end_normalized = False
        if self.total_size is not None:
            progress, end_byte = self.__NormalizeStartEnd(start, end)
            progress_end_normalized = True
        else:
            progress = start
            end_byte = end
        while (not progress_end_normalized or end_byte is None or
               progress <= end_byte):
            # NOTE(review): end_byte is overwritten with the end of the
            # current chunk here, and the loop condition compares against
            # that same variable. Confirm that ranges longer than one
            # chunk are fetched in full when use_chunks is True.
            end_byte = self.__ComputeEndByte(progress, end=end_byte,
                                             use_chunks=use_chunks)
            response = self.__GetChunk(progress, end_byte,
                                       additional_headers=additional_headers)
            if not progress_end_normalized:
                self.__SetTotal(response.info)
                progress, end_byte = self.__NormalizeStartEnd(start, end)
                progress_end_normalized = True
            response = self.__ProcessResponse(response)
            progress += response.length
            # Checked after processing, so a zero-length response has
            # already been passed to __ProcessResponse when this raises.
            if response.length == 0:
                raise exceptions.TransferRetryError(
                    'Zero bytes unexpectedly returned in download response')
    470     def StreamInChunks(self, callback=None, finish_callback=None,
    471                        additional_headers=None):
    472         """Stream the entire download in chunks."""
    473         self.StreamMedia(callback=callback, finish_callback=finish_callback,
    474                          additional_headers=additional_headers,
    475                          use_chunks=True)
    def StreamMedia(self, callback=None, finish_callback=None,
                    additional_headers=None, use_chunks=True):
        """Stream the entire download.

        Args:
          callback: (default: None) Callback to call as each chunk is
              completed. Falls back to self.progress_callback.
          finish_callback: (default: None) Callback to call when the
              download is complete. Falls back to self.finish_callback.
          additional_headers: (default: None) Additional headers to
              include in fetching bytes.
          use_chunks: (bool, default: True) If False, ignore self.chunksize
              and stream this download in a single request.

        Returns:
            None. Streams bytes into self.stream.

        Raises:
          exceptions.TransferInvalidError: if the download is not
              initialized.
        """
        callback = callback or self.progress_callback
        finish_callback = finish_callback or self.finish_callback

        self.EnsureInitialized()
        while True:
            # Consume the response saved by InitializeDownload first, so
            # the first chunk is not requested twice.
            if self.__initial_response is not None:
                response = self.__initial_response
                self.__initial_response = None
            else:
                end_byte = self.__ComputeEndByte(self.progress,
                                                 use_chunks=use_chunks)
                response = self.__GetChunk(
                    self.progress, end_byte,
                    additional_headers=additional_headers)
            if self.total_size is None:
                self.__SetTotal(response.info)
            response = self.__ProcessResponse(response)
            self._ExecuteCallback(callback, response)
            # Stop on a 200 response or once progress has reached the
            # recorded total size.
            if (response.status_code == http_client.OK or
                    self.progress >= self.total_size):
                break
        self._ExecuteCallback(finish_callback, response)
    518 class Upload(_Transfer):
    520     """Data for a single Upload.
    522     Fields:
    523       stream: The stream to upload.
    524       mime_type: MIME type of the upload.
    525       total_size: (optional) Total upload size for the stream.
    526       close_stream: (default: False) Whether or not we should close the
    527           stream when finished with the upload.
    528       auto_transfer: (default: True) If True, stream all bytes as soon as
    529           the upload is created.
    530     """
    532         'auto_transfer', 'mime_type', 'total_size', 'url'))
    def __init__(self, stream, mime_type, total_size=None, http=None,
                 close_stream=False, chunksize=None, auto_transfer=True,
                 progress_callback=None, finish_callback=None,
                 **kwds):
        """Create an upload that reads from stream.

        Args:
          stream: Source stream for the upload.
          mime_type: MIME type of the uploaded data.
          total_size: Optional total size of the stream.
          http: Optional http object, forwarded to _Transfer.
          close_stream: Forwarded to _Transfer.
          chunksize: Forwarded to _Transfer.
          auto_transfer: Forwarded to _Transfer.
          progress_callback: Stored on self.progress_callback.
          finish_callback: Stored on self.finish_callback.
          **kwds: Forwarded to _Transfer.
        """
        super(Upload, self).__init__(
            stream, http=http, close_stream=close_stream,
            chunksize=chunksize, auto_transfer=auto_transfer, **kwds)

        self.__total_size = None
        self.__strategy = None
        self.__server_chunk_granularity = None
        self.__progress = 0
        self.__mime_type = mime_type
        self.__final_response = None
        self.__complete = False

        self.progress_callback = progress_callback
        self.finish_callback = finish_callback
        # Assigned through the property so its initialization check runs.
        self.total_size = total_size
    553     @property
    554     def progress(self):
    555         return self.__progress
    @classmethod
    def FromFile(cls, filename, mime_type=None, auto_transfer=True, **kwds):
        """Create a new Upload object that reads from the named file.

        Args:
          filename: Source path; ``~`` is expanded.
          mime_type: MIME type; guessed from the path when falsy.
          auto_transfer: Forwarded to the constructor.
          **kwds: Forwarded to the constructor.

        Raises:
          exceptions.NotFoundError: if the file does not exist.
          exceptions.InvalidUserInputError: if no MIME type was given
              and none could be guessed.
        """
        source = os.path.expanduser(filename)
        if not os.path.exists(source):
            raise exceptions.NotFoundError('Could not find file %s' % source)
        if not mime_type:
            mime_type = mimetypes.guess_type(source)[0]
            if mime_type is None:
                raise exceptions.InvalidUserInputError(
                    'Could not guess mime type for %s' % source)
        byte_count = os.path.getsize(source)
        # The upload owns this handle, hence close_stream=True.
        handle = open(source, 'rb')
        return cls(handle, mime_type, total_size=byte_count,
                   close_stream=True, auto_transfer=auto_transfer, **kwds)
    @classmethod
    def FromStream(cls, stream, mime_type, total_size=None, auto_transfer=True,
                   **kwds):
        """Create a new Upload object that reads from an existing stream.

        The stream is not closed by the upload (close_stream=False).

        Args:
          stream: Source stream.
          mime_type: MIME type of the data; required.
          total_size: Optional total size of the stream.
          auto_transfer: Forwarded to the constructor.
          **kwds: Forwarded to the constructor.

        Raises:
          exceptions.InvalidUserInputError: if mime_type is None.
        """
        if mime_type is not None:
            return cls(stream, mime_type, total_size=total_size,
                       close_stream=False, auto_transfer=auto_transfer,
                       **kwds)
        raise exceptions.InvalidUserInputError(
            'No mime_type specified for stream')
    @classmethod
    def FromData(cls, stream, json_data, http, auto_transfer=None, **kwds):
        """Create a new Upload of stream from serialized json_data and http.

        The upload is restored as a resumable upload, its state is
        refreshed, and streaming starts at once when auto_transfer
        resolves to a true value.

        Args:
          stream: Source stream; an io.IOBase stream must be seekable.
          json_data: JSON text holding every key listed in
              _REQUIRED_SERIALIZATION_KEYS.
          http: Http object passed on to _Initialize.
          auto_transfer: When not None, overrides the serialized value.
          **kwds: Forwarded to FromStream; must not contain total_size.

        Raises:
          exceptions.InvalidDataError: if required keys are missing.
          exceptions.InvalidUserInputError: if total_size is overridden
              or the stream is a non-seekable io.IOBase.
        """
        state = json.loads(json_data)
        absent = cls._REQUIRED_SERIALIZATION_KEYS - set(state.keys())
        if absent:
            raise exceptions.InvalidDataError(
                'Invalid serialization data, missing keys: %s' % (
                    ', '.join(absent)))
        if 'total_size' in kwds:
            raise exceptions.InvalidUserInputError(
                'Cannot override total_size on serialized Upload')
        upload = cls.FromStream(stream, state['mime_type'],
                                total_size=state.get('total_size'), **kwds)
        if isinstance(stream, io.IOBase) and not stream.seekable():
            raise exceptions.InvalidUserInputError(
                'Cannot restart resumable upload on non-seekable stream')
        # An explicit argument wins over the serialized setting.
        if auto_transfer is None:
            auto_transfer = state['auto_transfer']
        upload.auto_transfer = auto_transfer
        upload.strategy = RESUMABLE_UPLOAD
        upload._Initialize(  # pylint: disable=protected-access
            http, state['url'])
        upload.RefreshResumableUploadState()
        upload.EnsureInitialized()
        if upload.auto_transfer:
            upload.StreamInChunks()
        return upload
    @property
    def serialization_data(self):
        """Dict snapshot of this upload's resumable state.

        Raises:
          exceptions.TransferInvalidError: if the upload is not
              initialized.
          exceptions.InvalidDataError: if the strategy is not resumable.
        """
        self.EnsureInitialized()
        if self.strategy != RESUMABLE_UPLOAD:
            raise exceptions.InvalidDataError(
                'Serialization only supported for resumable uploads')
        return dict(
            auto_transfer=self.auto_transfer,
            mime_type=self.mime_type,
            total_size=self.total_size,
            url=self.url,
        )
    625     @property
    626     def complete(self):
    627         return self.__complete
    629     @property
    630     def mime_type(self):
    631         return self.__mime_type
    633     def __str__(self):
    634         if not self.initialized:
    635             return 'Upload (uninitialized)'
    636         else:
    637             return 'Upload with %d/%s bytes transferred for url %s' % (
    638                 self.progress, self.total_size or '???', self.url)
    640     @property
    641     def strategy(self):
    642         return self.__strategy
    644     @strategy.setter
    645     def strategy(self, value):
    646         if value not in (SIMPLE_UPLOAD, RESUMABLE_UPLOAD):
    647             raise exceptions.UserError((
    648                 'Invalid value "%s" for upload strategy, must be one of '
    649                 '"simple" or "resumable".') % value)
    650         self.__strategy = value
    652     @property
    653     def total_size(self):
    654         return self.__total_size
    656     @total_size.setter
    657     def total_size(self, value):
    658         self.EnsureUninitialized()
    659         self.__total_size = value
    def __SetDefaultUploadStrategy(self, upload_config, http_request):
        """Determine and set the default upload strategy for this upload.

        We generally prefer simple or multipart, unless we're forced to
        use resumable. This happens when any of (1) the upload is too
        large, (2) the simple endpoint doesn't support multipart requests
        and we have metadata, or (3) there is no simple upload endpoint.

        A strategy that has already been set is left untouched, and a
        config without a resumable path always selects simple.

        Args:
          upload_config: Configuration for the upload endpoint.
          http_request: The associated http request.

        Returns:
          None.
        """
        if upload_config.resumable_path is None:
            self.strategy = SIMPLE_UPLOAD
        if self.strategy is not None:
            return

        # Evaluate every reason that would force a resumable upload.
        too_large = (self.total_size is not None and
                     self.total_size > _RESUMABLE_UPLOAD_THRESHOLD)
        metadata_unsupported = (http_request.body and
                                not upload_config.simple_multipart)
        no_simple_endpoint = not upload_config.simple_path
        forced = (too_large, metadata_unsupported, no_simple_endpoint)

        if any(forced):
            self.strategy = RESUMABLE_UPLOAD
        else:
            self.strategy = SIMPLE_UPLOAD
    def ConfigureRequest(self, upload_config, http_request, url_builder):
        """Configure the request and url for this upload.

        Validates size and MIME type against upload_config, picks a
        strategy if none is set, then fills in the url path, the
        ``uploadType`` query parameter and the request itself.

        Args:
          upload_config: Configuration for the upload endpoint.
          http_request: The request to configure.
          url_builder: Builder whose relative_path and query_params are
              updated.

        Raises:
          exceptions.InvalidUserInputError: if the upload exceeds
              max_size or its MIME type is not accepted.
        """
        # Validate total_size vs. max_size
        size_limit = upload_config.max_size
        if self.total_size and size_limit and self.total_size > size_limit:
            raise exceptions.InvalidUserInputError(
                'Upload too big: %s larger than max size %s' % (
                    self.total_size, upload_config.max_size))
        # Validate mime type
        if not util.AcceptableMimeType(upload_config.accept, self.mime_type):
            raise exceptions.InvalidUserInputError(
                'MIME type %s does not match any accepted MIME ranges %s' % (
                    self.mime_type, upload_config.accept))

        self.__SetDefaultUploadStrategy(upload_config, http_request)
        if self.strategy != SIMPLE_UPLOAD:
            url_builder.relative_path = upload_config.resumable_path
            url_builder.query_params['uploadType'] = 'resumable'
            self.__ConfigureResumableRequest(http_request)
            return

        url_builder.relative_path = upload_config.simple_path
        if http_request.body:
            # An existing body is sent as one part of a multipart request.
            url_builder.query_params['uploadType'] = 'multipart'
            self.__ConfigureMultipartRequest(http_request)
        else:
            url_builder.query_params['uploadType'] = 'media'
            self.__ConfigureMediaRequest(http_request)
    718     def __ConfigureMediaRequest(self, http_request):
    719         """Configure http_request as a simple request for this upload."""
    720         http_request.headers['content-type'] = self.mime_type
    721         http_request.body = self.stream.read()
    722         http_request.loggable_body = '<media body>'
    def __ConfigureMultipartRequest(self, http_request):
        """Make http_request a multipart/related upload of body + media.

        The request's existing body becomes the first part (typed by the
        request's current content-type header) and this upload's stream
        becomes the second part. The request body, content-type header
        and loggable body are all rewritten in place.

        Args:
          http_request: Request to mutate in place.
        """
        container = mime_multipart.MIMEMultipart('related')
        # The container must not emit its own headers into the body.
        setattr(container, '_write_headers', lambda self: None)

        # First part: the body already present on the request.
        body_part = mime_nonmultipart.MIMENonMultipart(
            *http_request.headers['content-type'].split('/'))
        body_part.set_payload(http_request.body)
        container.attach(body_part)

        # Second part: the media itself.
        media_part = mime_nonmultipart.MIMENonMultipart(
            *self.mime_type.split('/'))
        media_part['Content-Transfer-Encoding'] = 'binary'
        media_part.set_payload(self.stream.read())
        container.attach(media_part)

        # NOTE: `email.message.Message.as_string` is avoided because it
        #       prepends `> ` to `From ` lines; an explicit Generator with
        #       mangle_from_=False is used instead.
        # NOTE: six.StringIO() (not io.StringIO()) is required since the
        #       `email` library uses cStringIO in Py2 and io.StringIO in Py3.
        buf = six.StringIO()
        email_generator.Generator(buf, mangle_from_=False).flatten(
            container, unixfrom=False)
        http_request.body = buf.getvalue()

        boundary = container.get_boundary()
        http_request.headers['content-type'] = (
            'multipart/related; boundary=%r' % boundary)

        # Build the loggable body: same as the real body, but with the
        # media part's payload replaced by a placeholder.
        pieces = http_request.body.split(boundary)
        media_headers = pieces[-2].partition('\n\n')[0]
        pieces[-2] = media_headers + '\n\n' + '<media body>\n\n--'
        http_request.loggable_body = boundary.join(pieces)
    762     def __ConfigureResumableRequest(self, http_request):
    763         http_request.headers['X-Upload-Content-Type'] = self.mime_type
    764         if self.total_size is not None:
    765             http_request.headers[
    766                 'X-Upload-Content-Length'] = str(self.total_size)
    def RefreshResumableUploadState(self):
        """Ask the server how far this resumable upload has progressed.

        Sends a bodyless PUT with `Content-Range: bytes */*` to the upload
        url and updates local progress (and the stream position) from the
        reply. Does nothing for non-resumable uploads.

        Returns:
          None. When the server reports the upload as finished, its
          response is cached on the instance so the streaming methods can
          return it later.

        Raises:
          exceptions.HttpError: if the server answers with anything other
              than OK, CREATED or RESUME_INCOMPLETE.
        """
        if self.strategy != RESUMABLE_UPLOAD:
            return
        self.EnsureInitialized()

        status_query = http_wrapper.Request(
            url=self.url, http_method='PUT',
            headers={'Content-Range': 'bytes */*'})
        status_reply = http_wrapper.MakeRequest(
            self.http, status_query, redirections=0,
            retries=self.num_retries)
        range_header = self._GetRangeHeaderFromResponse(status_reply)

        finished_codes = (http_client.OK, http_client.CREATED)
        if status_reply.status_code in finished_codes:
            self.__complete = True
            self.__progress = self.total_size
            self.stream.seek(self.progress)
            # A finished upload's status reply carries the metadata that
            # was originally requested; keep it for StreamInChunks.
            self.__final_response = status_reply
            return

        if status_reply.status_code != http_wrapper.RESUME_INCOMPLETE:
            raise exceptions.HttpError.FromResponse(status_reply)

        # Still in progress: no range header is treated as zero progress.
        if range_header is None:
            self.__progress = 0
        else:
            self.__progress = self.__GetLastByte(range_header) + 1
        self.stream.seek(self.progress)
    802     def _GetRangeHeaderFromResponse(self, response):
    803         return response.info.get('Range', response.info.get('range'))
    def InitializeUpload(self, http_request, http=None, client=None):
        """Start this upload on the server using http_request.

        For non-resumable strategies this only validates arguments. For a
        resumable upload it issues the initiating request, records the
        upload url from the `location` response header and, when
        auto_transfer is set, immediately streams the media in chunks.

        Args:
          http_request: The initiating request.
          http: Transport to use; falls back to client.http.
          client: Optional client; when given, its FinalizeTransferUrl is
              applied to both the request url and the returned upload url.

        Returns:
          The result of StreamInChunks() when auto_transfer is set,
          otherwise None.

        Raises:
          exceptions.UserError: if no strategy has been configured, or
              neither http nor client was supplied.
          exceptions.HttpError: if the initiating request does not
              return OK.
        """
        if self.strategy is None:
            raise exceptions.UserError(
                'No upload strategy set; did you call ConfigureRequest?')
        if http is None and client is None:
            raise exceptions.UserError('Must provide client or http.')
        if self.strategy != RESUMABLE_UPLOAD:
            return

        transport = http or client.http
        has_client = client is not None
        if has_client:
            http_request.url = client.FinalizeTransferUrl(http_request.url)
        self.EnsureUninitialized()

        init_response = http_wrapper.MakeRequest(
            transport, http_request, retries=self.num_retries)
        if init_response.status_code != http_client.OK:
            raise exceptions.HttpError.FromResponse(init_response)

        response_headers = init_response.info
        self.__server_chunk_granularity = response_headers.get(
            'X-Goog-Upload-Chunk-Granularity')
        upload_url = response_headers['location']
        if has_client:
            upload_url = client.FinalizeTransferUrl(upload_url)
        self._Initialize(transport, upload_url)

        # Unless the user asked otherwise, start pumping bytes right away.
        if self.auto_transfer:
            return self.StreamInChunks()
    835     def __GetLastByte(self, range_header):
    836         _, _, end = range_header.partition('-')
    837         # TODO(craigcitro): Validate start == 0?
    838         return int(end)
    840     def __ValidateChunksize(self, chunksize=None):
    841         if self.__server_chunk_granularity is None:
    842             return
    843         chunksize = chunksize or self.chunksize
    844         if chunksize % self.__server_chunk_granularity:
    845             raise exceptions.ConfigurationValueError(
    846                 'Server requires chunksize to be a multiple of %d',
    847                 self.__server_chunk_granularity)
    def __StreamMedia(self, callback=None, finish_callback=None,
                      additional_headers=None, use_chunks=True):
        """Helper function for StreamMedia / StreamInChunks.

        Repeatedly sends media from the current stream position until the
        server reports the upload as complete.

        Args:
          callback: Progress callback; defaults to self.progress_callback.
          finish_callback: Completion callback; defaults to
              self.finish_callback.
          additional_headers: Dict of headers to add to each upload request.
          use_chunks: If True send chunk by chunk (after validating the
              chunk size); otherwise send the rest of the media in a
              single request.

        Returns:
          The final response: either the one that completed the upload, or
          the cached response of an upload that was already complete.

        Raises:
          exceptions.InvalidUserInputError: if this is not a resumable
              upload.
          exceptions.CommunicationError: if the server acknowledged fewer
              bytes than were sent.
          exceptions.TransferInvalidError: if the upload completed while
              the stream still has unread bytes.
        """
        if self.strategy != RESUMABLE_UPLOAD:
            raise exceptions.InvalidUserInputError(
                'Cannot stream non-resumable upload')
        callback = callback or self.progress_callback
        finish_callback = finish_callback or self.finish_callback
        # final_response is set if we resumed an already-completed upload.
        response = self.__final_response
        send_func = self.__SendChunk if use_chunks else self.__SendMediaBody
        if use_chunks:
            self.__ValidateChunksize(self.chunksize)
        self.EnsureInitialized()
        while not self.complete:
            response = send_func(self.stream.tell(),
                                 additional_headers=additional_headers)
            if response.status_code in (http_client.OK, http_client.CREATED):
                self.__complete = True
                break
            # Use the shared helper so the header is found under either
            # spelling ('Range' or 'range'), like everywhere else in
            # this class.
            self.__progress = self.__GetLastByte(
                self._GetRangeHeaderFromResponse(response))
            if self.progress + 1 != self.stream.tell():
                # TODO(craigcitro): Add a better way to recover here.
                raise exceptions.CommunicationError(
                    'Failed to transfer all bytes in chunk, upload paused at '
                    'byte %d' % self.progress)
            self._ExecuteCallback(callback, response)
        if self.__complete and hasattr(self.stream, 'seek'):
            # Sanity check: a completed upload must have consumed the whole
            # stream. Measure the end, then restore the position.
            current_pos = self.stream.tell()
            self.stream.seek(0, os.SEEK_END)
            end_pos = self.stream.tell()
            self.stream.seek(current_pos)
            if current_pos != end_pos:
                raise exceptions.TransferInvalidError(
                    'Upload complete with %s additional bytes left in stream' %
                    (int(end_pos) - int(current_pos)))
        self._ExecuteCallback(finish_callback, response)
        return response
    888     def StreamMedia(self, callback=None, finish_callback=None,
    889                     additional_headers=None):
    890         """Send this resumable upload in a single request.
    892         Args:
    893           callback: Progress callback function with inputs
    894               (http_wrapper.Response, transfer.Upload)
    895           finish_callback: Final callback function with inputs
    896               (http_wrapper.Response, transfer.Upload)
    897           additional_headers: Dict of headers to include with the upload
    898               http_wrapper.Request.
    900         Returns:
    901           http_wrapper.Response of final response.
    902         """
    903         return self.__StreamMedia(
    904             callback=callback, finish_callback=finish_callback,
    905             additional_headers=additional_headers, use_chunks=False)
    907     def StreamInChunks(self, callback=None, finish_callback=None,
    908                        additional_headers=None):
    909         """Send this (resumable) upload in chunks."""
    910         return self.__StreamMedia(
    911             callback=callback, finish_callback=finish_callback,
    912             additional_headers=additional_headers)
    def __SendMediaRequest(self, request, end):
        """Request helper function for SendMediaBody & SendChunk.

        Args:
          request: The prepared upload request to send.
          end: Stream offset one past the last byte included in request.

        Returns:
          The server's response (OK, CREATED or RESUME_INCOMPLETE).

        Raises:
          exceptions.HttpError: if the server answers with any other
              status; local upload state is refreshed from the server
              before raising.
        """
        response = http_wrapper.MakeRequest(
            self.bytes_http, request, retry_func=self.retry_func,
            retries=self.num_retries)
        if response.status_code not in (http_client.OK, http_client.CREATED,
                                        http_wrapper.RESUME_INCOMPLETE):
            # We want to reset our state to wherever the server left us
            # before this failed request, and then raise.
            self.RefreshResumableUploadState()
            raise exceptions.HttpError.FromResponse(response)
        if response.status_code == http_wrapper.RESUME_INCOMPLETE:
            last_byte = self.__GetLastByte(
                self._GetRangeHeaderFromResponse(response))
            if last_byte + 1 != end:
                # The server kept only part of what was sent. Resume from
                # the first byte it does NOT have; seeking to last_byte
                # itself would re-send a byte the server already stored.
                self.stream.seek(last_byte + 1)
        return response
    def __SendMediaBody(self, start, additional_headers=None):
        """Send everything from `start` to the end of the media at once.

        Args:
          start: Stream offset of the first byte to send.
          additional_headers: Optional dict of headers merged into the
              request after the standard ones.

        Returns:
          The response from the upload request.

        Raises:
          exceptions.TransferInvalidError: if the total size is unknown.
        """
        self.EnsureInitialized()
        if self.total_size is None:
            raise exceptions.TransferInvalidError(
                'Total size must be known for SendMediaBody')

        bytes_remaining = self.total_size - start
        media_slice = stream_slice.StreamSlice(self.stream, bytes_remaining)
        put_request = http_wrapper.Request(
            url=self.url, http_method='PUT', body=media_slice)
        put_request.headers['Content-Type'] = self.mime_type

        if start != self.total_size:
            content_range = 'bytes %s-%s/%s' % (
                start, self.total_size - 1, self.total_size)
        else:
            # Nothing left to send: this request only finalizes the upload.
            content_range = 'bytes */%s' % self.total_size
        put_request.headers['Content-Range'] = content_range

        if additional_headers:
            put_request.headers.update(additional_headers)

        return self.__SendMediaRequest(put_request, self.total_size)
    def __SendChunk(self, start, additional_headers=None):
        """Send one chunk of the media, beginning at offset `start`.

        Args:
          start: Stream offset of the first byte of the chunk.
          additional_headers: Optional dict of headers merged into the
              request after the standard ones.

        Returns:
          The response from the upload request.
        """
        self.EnsureInitialized()
        # Captured before total_size may be filled in below; it decides
        # whether the body is hidden from logs.
        size_unknown_on_entry = self.total_size is None
        if size_unknown_on_entry:
            # Streaming upload: buffer a chunk so that reaching the end of
            # the stream can be detected.
            buffered = buffered_stream.BufferedStream(
                self.stream, start, self.chunksize)
            end = buffered.stream_end_position
            if buffered.stream_exhausted:
                self.__total_size = end
            # TODO: The chunk is read fully into memory (a string, not a
            # stream) to work around
            # https://code.google.com/p/httplib2/issues/detail?id=176 which can
            # cause httplib2 to skip bytes on 401's for file objects.
            # Rework this solution to be more general.
            chunk_body = buffered.read(self.chunksize)
        else:
            end = min(start + self.chunksize, self.total_size)
            chunk_body = stream_slice.StreamSlice(self.stream, end - start)

        # TODO(craigcitro): Think about clearer errors on "no data in
        # stream".
        put_request = http_wrapper.Request(
            url=self.url, http_method='PUT', body=chunk_body)
        put_request.headers['Content-Type'] = self.mime_type
        if size_unknown_on_entry:
            # Disable logging of streaming body.
            # TODO: Remove this flag and rework as part of a larger logs
            # refactor.
            put_request.loggable_body = '<media body>'

        # total_size is re-read here on purpose: it may have just been
        # set above when the stream was exhausted.
        if self.total_size is None:
            # Streaming upload whose total size is still unknown.
            content_range = 'bytes %s-%s/*' % (start, end - 1)
        elif end == start:
            # Nothing left to send: this request only finalizes the upload.
            content_range = 'bytes */%s' % self.total_size
        else:
            # Ordinary chunk of an upload with a known total size.
            content_range = 'bytes %s-%s/%s' % (
                start, end - 1, self.total_size)
        put_request.headers['Content-Range'] = content_range

        if additional_headers:
            put_request.headers.update(additional_headers)

        return self.__SendMediaRequest(put_request, end)