# Copyright 2012 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
# either express or implied. See the License for the specific
# language governing permissions and limitations under the License.

"""Python wrappers for the Google Storage RESTful API."""


__all__ = ['ReadBuffer',
           'StreamingBuffer',
          ]

import collections
import os
import urlparse

from . import api_utils
from . import common
from . import errors
from . import rest_api

from google.appengine.api import urlfetch
from google.appengine.ext import ndb


def _get_storage_api(retry_params, account_id=None):
  """Returns a storage_api instance for API methods.

  Args:
    retry_params: An instance of api_utils.RetryParams. If None,
      the thread's default will be used.
    account_id: Internal-use only.

  Returns:
    A storage_api instance to handle urlfetch work to GCS.
    On the dev appserver, this instance will by default talk to a local
    stub unless common.ACCESS_TOKEN is set. That token will be used to
    talk to the real GCS.
  """
  api = _StorageApi(_StorageApi.full_control_scope,
                    service_account_id=account_id,
                    retry_params=retry_params)
  if common.local_run() and not common.get_access_token():
    api.api_url = common.local_api_url()
  if common.get_access_token():
    api.token = common.get_access_token()
  return api
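
# A minimal usage sketch (hypothetical caller code; the RetryParams value
# shown is illustrative, not prescribed by this module):
#
#   retry_params = api_utils.RetryParams(max_retries=3)
#   api = _get_storage_api(retry_params)
#   status, headers, content = api.head_object('/mybucket/myfile')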


class _StorageApi(rest_api._RestApi):
  """A simple wrapper for the Google Storage RESTful API.

  WARNING: Do NOT use this api directly. It is an implementation detail
  and is subject to change at any release.

  All async methods have similar args and returns.

  Args:
    path: The path to the Google Storage object or bucket, e.g.
      '/mybucket/myfile' or '/mybucket'.
    **kwd: Options for urlfetch, e.g.
      headers={'content-type': 'text/plain'}, payload='blah'.

  Returns:
    An ndb Future. When fulfilled, future.get_result() returns
    a tuple of (status, headers, content) that represents an HTTP response
    from the Google Cloud Storage XML API.
  """

  api_url = 'https://storage.googleapis.com'
  read_only_scope = 'https://www.googleapis.com/auth/devstorage.read_only'
  read_write_scope = 'https://www.googleapis.com/auth/devstorage.read_write'
  full_control_scope = 'https://www.googleapis.com/auth/devstorage.full_control'

  def __getstate__(self):
    """Store state as part of serialization/pickling.

    Returns:
      A tuple (of dictionaries) with the state of this object.
    """
    return (super(_StorageApi, self).__getstate__(), {'api_url': self.api_url})

  def __setstate__(self, state):
    """Restore state as part of deserialization/unpickling.

    Args:
      state: the tuple from a __getstate__ call.
    """
    superstate, localstate = state
    super(_StorageApi, self).__setstate__(superstate)
    self.api_url = localstate['api_url']
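
  # Pickling sketch (illustrative): api_url travels with the pickled state,
  # so an unpickled instance keeps talking to the same endpoint (e.g. a
  # dev appserver stub).
  #
  #   import pickle
  #   api2 = pickle.loads(pickle.dumps(api))
  #   assert api2.api_url == api.api_url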

  @api_utils._eager_tasklet
  @ndb.tasklet
  def do_request_async(self, url, method='GET', headers=None, payload=None,
                       deadline=None, callback=None):
    """Inherit docs.

    This method translates urlfetch exceptions to more service-specific ones.
    """
    if headers is None:
      headers = {}
    if 'x-goog-api-version' not in headers:
      headers['x-goog-api-version'] = '2'
    headers['accept-encoding'] = 'gzip, *'
    try:
      resp_tuple = yield super(_StorageApi, self).do_request_async(
          url, method=method, headers=headers, payload=payload,
          deadline=deadline, callback=callback)
    except urlfetch.DownloadError as e:
      raise errors.TimeoutError(
          'Request to Google Cloud Storage timed out.', e)

    raise ndb.Return(resp_tuple)
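
  # Illustrative use of the returned future (hypothetical caller code;
  # normal callers should go through the public cloudstorage API instead):
  #
  #   fut = api.do_request_async(api.api_url + '/mybucket/myfile')
  #   status, headers, content = fut.get_result()
  #   if status == 200:
  #     process(content)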

  def post_object_async(self, path, **kwds):
    """POST to an object."""
    return self.do_request_async(self.api_url + path, 'POST', **kwds)

  def put_object_async(self, path, **kwds):
    """PUT an object."""
    return self.do_request_async(self.api_url + path, 'PUT', **kwds)

  def get_object_async(self, path, **kwds):
    """GET an object.

    Note: No payload argument is supported.
    """
    return self.do_request_async(self.api_url + path, 'GET', **kwds)

  def delete_object_async(self, path, **kwds):
    """DELETE an object.

    Note: No payload argument is supported.
    """
    return self.do_request_async(self.api_url + path, 'DELETE', **kwds)

  def head_object_async(self, path, **kwds):
    """HEAD an object.

    Depending on request headers, HEAD returns various object properties,
    e.g. Content-Length, Last-Modified, and ETag.

    Note: No payload argument is supported.
    """
    return self.do_request_async(self.api_url + path, 'HEAD', **kwds)

  def get_bucket_async(self, path, **kwds):
    """GET a bucket."""
    return self.do_request_async(self.api_url + path, 'GET', **kwds)


_StorageApi = rest_api.add_sync_methods(_StorageApi)
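
# add_sync_methods generates a blocking counterpart for each *_async method,
# so, for example, head_object(...) behaves like
# head_object_async(...).get_result(). (A sketch of the generated behavior;
# see rest_api.add_sync_methods for the actual mechanism.)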


class ReadBuffer(object):
  """A class for reading Google storage files."""

  DEFAULT_BUFFER_SIZE = 1024 * 1024
  MAX_REQUEST_SIZE = 30 * DEFAULT_BUFFER_SIZE

  def __init__(self,
               api,
               path,
               buffer_size=DEFAULT_BUFFER_SIZE,
               max_request_size=MAX_REQUEST_SIZE):
    """Constructor.

    Args:
      api: A StorageApi instance.
      path: Quoted/escaped path to the object, e.g. /mybucket/myfile.
      buffer_size: buffer size. The ReadBuffer keeps one buffer, but there
        may be a pending future that holds a second buffer. This size must
        be less than or equal to max_request_size.
      max_request_size: Max bytes to request in one urlfetch.
    """
    self._api = api
    self._path = path
    self.name = api_utils._unquote_filename(path)
    self.closed = False

    assert buffer_size <= max_request_size
    self._buffer_size = buffer_size
    self._max_request_size = max_request_size
    self._offset = 0
    self._buffer = _Buffer()
    self._etag = None

    get_future = self._get_segment(0, self._buffer_size, check_response=False)

    status, headers, content = self._api.head_object(path)
    errors.check_status(status, [200], path, resp_headers=headers, body=content)
    self._file_size = long(common.get_stored_content_length(headers))
    self._check_etag(headers.get('etag'))

    self._buffer_future = None

    if self._file_size != 0:
      content, check_response_closure = get_future.get_result()
      check_response_closure()
      self._buffer.reset(content)
      self._request_next_buffer()
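
  # Usage sketch (the public API normally constructs this via
  # cloudstorage.open; shown with the internal classes for illustration):
  #
  #   api = _get_storage_api(retry_params=None)
  #   buf = ReadBuffer(api, '/mybucket/myfile')
  #   first_kb = buf.read(1024)
  #   buf.close()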

  def __getstate__(self):
    """Store state as part of serialization/pickling.

    The contents of the read buffer are not stored, only the current offset
    for data read by the client. A new read buffer is established at
    unpickling. The head information for the object (file size and etag) is
    stored to reduce startup cost and to ensure the file has not changed.

    Returns:
      A dictionary with the state of this object.
    """
    return {'api': self._api,
            'path': self._path,
            'buffer_size': self._buffer_size,
            'request_size': self._max_request_size,
            'etag': self._etag,
            'size': self._file_size,
            'offset': self._offset,
            'closed': self.closed}

  def __setstate__(self, state):
    """Restore state as part of deserialization/unpickling.

    Along with restoring the state, pre-fetch the next read buffer.

    Args:
      state: the dictionary from a __getstate__ call.
    """
    self._api = state['api']
    self._path = state['path']
    self.name = api_utils._unquote_filename(self._path)
    self._buffer_size = state['buffer_size']
    self._max_request_size = state['request_size']
    self._etag = state['etag']
    self._file_size = state['size']
    self._offset = state['offset']
    self._buffer = _Buffer()
    self.closed = state['closed']
    self._buffer_future = None
    if self._remaining() and not self.closed:
      self._request_next_buffer()

  def __iter__(self):
    """Iterator interface.

    Note the ReadBuffer container itself is the iterator. Quoting PEP 234,
    it is 'destructive: they consume all the values and a second iterator
    cannot easily be created that iterates independently over the same
    values. You could open the file for the second time, or seek() to the
    beginning.'

    Returns:
      Self.
    """
    return self

  def next(self):
    line = self.readline()
    if not line:
      raise StopIteration()
    return line
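
  # Iteration sketch (hypothetical caller code): ReadBuffer is its own
  # iterator, so a second independent pass requires a seek(0) or a new
  # instance.
  #
  #   for line in buf:
  #     handle(line)
  #   buf.seek(0)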

  def readline(self, size=-1):
    r"""Read one line delimited by '\n' from the file.

    A trailing newline character is kept in the string. It may be absent
    when a file ends with an incomplete line. If the size argument is
    non-negative, it specifies the maximum string size (counting the
    newline) to return. A negative size is the same as unspecified. An
    empty string is returned only when EOF is encountered immediately.

    Args:
      size: Maximum number of bytes to read. If not specified, readline
        stops only on '\n' or EOF.

    Returns:
      The data read as a string.

    Raises:
      IOError: When this buffer is closed.
    """
    self._check_open()
    if size == 0 or not self._remaining():
      return ''

    data_list = []
    newline_offset = self._buffer.find_newline(size)
    while newline_offset < 0:
      data = self._buffer.read(size)
      size -= len(data)
      self._offset += len(data)
      data_list.append(data)
      if size == 0 or not self._remaining():
        return ''.join(data_list)
      self._buffer.reset(self._buffer_future.get_result())
      self._request_next_buffer()
      newline_offset = self._buffer.find_newline(size)

    data = self._buffer.read_to_offset(newline_offset + 1)
    self._offset += len(data)
    data_list.append(data)

    return ''.join(data_list)

  def read(self, size=-1):
    """Read data from the RAW file.

    Args:
      size: Number of bytes to read as an integer. The actual number of
        bytes read is always equal to size unless EOF is reached. If size
        is negative or unspecified, read the entire file.

    Returns:
      The data read, as a str.

    Raises:
      IOError: When this buffer is closed.
    """
    self._check_open()
    if not self._remaining():
      return ''

    data_list = []
    while True:
      remaining = self._buffer.remaining()
      if size >= 0 and size < remaining:
        data_list.append(self._buffer.read(size))
        self._offset += size
        break
      else:
        size -= remaining
        self._offset += remaining
        data_list.append(self._buffer.read())

        if self._buffer_future is None:
          if size < 0 or size >= self._remaining():
            needs = self._remaining()
          else:
            needs = size
          data_list.extend(self._get_segments(self._offset, needs))
          self._offset += needs
          break

        if self._buffer_future:
          self._buffer.reset(self._buffer_future.get_result())
          self._buffer_future = None

    if self._buffer_future is None:
      self._request_next_buffer()
    return ''.join(data_list)

  def _remaining(self):
    return self._file_size - self._offset

  def _request_next_buffer(self):
    """Request the next buffer.

    Requires that self._offset and self._buffer are in a consistent state.
    """
    self._buffer_future = None
    next_offset = self._offset + self._buffer.remaining()
    if next_offset != self._file_size:
      self._buffer_future = self._get_segment(next_offset,
                                              self._buffer_size)

  def _get_segments(self, start, request_size):
    """Get segments of the file from Google Storage as a list.

    A large request is broken into segments to avoid hitting the urlfetch
    response size limit. Each segment is returned from a separate urlfetch.

    Args:
      start: start offset to request. Inclusive. Must be within the
        range of the file.
      request_size: number of bytes to request.

    Returns:
      A list of file segments, in order.
    """
    if not request_size:
      return []

    end = start + request_size
    futures = []

    while request_size > self._max_request_size:
      futures.append(self._get_segment(start, self._max_request_size))
      request_size -= self._max_request_size
      start += self._max_request_size
    if start < end:
      futures.append(self._get_segment(start, end - start))
    return [fut.get_result() for fut in futures]

  @ndb.tasklet
  def _get_segment(self, start, request_size, check_response=True):
    """Get a segment of the file from Google Storage.

    Args:
      start: start offset of the segment. Inclusive. Must be within the
        range of the file.
      request_size: number of bytes to request. Must be small enough
        for a single urlfetch request. May go over the logical range of
        the file.
      check_response: True to check the validity of the GCS response
        automatically before the future returns. False otherwise. See the
        Yields section.

    Yields:
      If check_response is True, the segment [start, start + request_size)
      of the file.
      Otherwise, a tuple. The first element is the unverified file segment.
      The second element is a closure that checks the response. The caller
      should invoke the closure before consuming the file segment.

    Raises:
      ValueError: if the file has changed while reading.
    """
    end = start + request_size - 1
    content_range = '%d-%d' % (start, end)
    headers = {'Range': 'bytes=' + content_range}
    status, resp_headers, content = yield self._api.get_object_async(
        self._path, headers=headers)
    def _checker():
      errors.check_status(status, [200, 206], self._path, headers,
                          resp_headers, body=content)
      self._check_etag(resp_headers.get('etag'))
    if check_response:
      _checker()
      raise ndb.Return(content)
    raise ndb.Return(content, _checker)

  def _check_etag(self, etag):
    """Check that the etag is the same across requests to GCS.

    If self._etag is None, set it. Otherwise check that the new etag
    equals the old one.

    In the __init__ method, we fire one HEAD and one GET request
    concurrently using ndb tasklets. Whichever returns first sets the
    initial value.

    Args:
      etag: etag from a GCS HTTP response. None if etag is not part of the
        response headers. It could be None, for example, in the case of a
        GCS composite file.

    Raises:
      ValueError: if the two etags are not equal.
    """
    if etag is None:
      return
    elif self._etag is None:
      self._etag = etag
    elif self._etag != etag:
      raise ValueError('File on GCS has changed while reading.')

  def close(self):
    self.closed = True
    self._buffer = None
    self._buffer_future = None

  def __enter__(self):
    return self

  def __exit__(self, atype, value, traceback):
    self.close()
    return False

  def seek(self, offset, whence=os.SEEK_SET):
    """Set the file's current offset.

    Note that if the new offset is out of bounds, it is adjusted to either
    0 or EOF.

    Args:
      offset: seek offset as a number.
      whence: seek mode. Supported modes are os.SEEK_SET (absolute seek),
        os.SEEK_CUR (seek relative to the current position), and os.SEEK_END
        (seek relative to the end, offset should be negative).

    Raises:
      IOError: When this buffer is closed.
      ValueError: When whence is invalid.
    """
    self._check_open()

    self._buffer.reset()
    self._buffer_future = None

    if whence == os.SEEK_SET:
      self._offset = offset
    elif whence == os.SEEK_CUR:
      self._offset += offset
    elif whence == os.SEEK_END:
      self._offset = self._file_size + offset
    else:
      raise ValueError('Whence mode %s is invalid.' % str(whence))

    self._offset = min(self._offset, self._file_size)
    self._offset = max(self._offset, 0)
    if self._remaining():
      self._request_next_buffer()
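
  # Seek sketch (hypothetical caller code): offsets are clamped to
  # [0, file_size], so seeking past EOF simply positions at EOF.
  #
  #   buf.seek(-10, os.SEEK_END)  # position at the last 10 bytes
  #   tail = buf.read()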

  def tell(self):
    """Tell the file's current offset.

    Returns:
      The current offset in reading this file.

    Raises:
      IOError: When this buffer is closed.
    """
    self._check_open()
    return self._offset

  def _check_open(self):
    if self.closed:
      raise IOError('Buffer is closed.')

  def seekable(self):
    return True

  def readable(self):
    return True

  def writable(self):
    return False


class _Buffer(object):
  """In-memory buffer."""

  def __init__(self):
    self.reset()

  def reset(self, content='', offset=0):
    self._buffer = content
    self._offset = offset

  def read(self, size=-1):
    """Returns bytes from self._buffer and updates related offsets.

    Args:
      size: number of bytes to read starting from the current offset.
        Read the entire buffer if negative.

    Returns:
      The requested bytes from the buffer.
    """
    if size < 0:
      offset = len(self._buffer)
    else:
      offset = self._offset + size
    return self.read_to_offset(offset)

  def read_to_offset(self, offset):
    """Returns bytes from self._buffer and updates related offsets.

    Args:
      offset: read from the current offset to this offset, exclusive.

    Returns:
      The requested bytes from the buffer.
    """
    assert offset >= self._offset
    result = self._buffer[self._offset: offset]
    self._offset += len(result)
    return result

  def remaining(self):
    return len(self._buffer) - self._offset

  def find_newline(self, size=-1):
    """Search for a newline char in the buffer, starting from the current offset.

    Args:
      size: number of bytes to search. -1 means all.

    Returns:
      The offset of the newline char in the buffer. -1 if it does not exist.
    """
    if size < 0:
      return self._buffer.find('\n', self._offset)
    return self._buffer.find('\n', self._offset, self._offset + size)
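
# Behavior sketch for _Buffer (illustrative values):
#
#   b = _Buffer()
#   b.reset('ab\ncd')
#   b.find_newline()     # -> 2
#   b.read_to_offset(3)  # -> 'ab\n'; the internal offset is now 3
#   b.remaining()        # -> 2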


class StreamingBuffer(object):
  """A class for creating large objects using the 'resumable' API.

  The API is a subset of the Python writable stream API sufficient to
  support writing zip files using the zipfile module.

  The exact sequence of calls and use of headers is documented at
  https://developers.google.com/storage/docs/developer-guide#unknownresumables
  """

  _blocksize = 256 * 1024

  _flushsize = 8 * _blocksize

  _maxrequestsize = 9 * 4 * _blocksize
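
  # Usage sketch with the zipfile module (hypothetical caller code; the
  # public API normally wraps this class via cloudstorage.open):
  #
  #   import zipfile
  #   api = _get_storage_api(retry_params=None)
  #   f = StreamingBuffer(api, '/mybucket/archive.zip',
  #                       content_type='application/zip')
  #   z = zipfile.ZipFile(f, 'w')
  #   z.writestr('hello.txt', 'hello world')
  #   z.close()
  #   f.close()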

  def __init__(self,
               api,
               path,
               content_type=None,
               gcs_headers=None):
    """Constructor.

    Args:
      api: A StorageApi instance.
      path: Quoted/escaped path to the object, e.g. /mybucket/myfile.
      content_type: Optional content-type. If not specified, the default
        is determined by Google Cloud Storage.
      gcs_headers: additional gs headers as a str->str dict, e.g.
        {'x-goog-acl': 'private', 'x-goog-meta-foo': 'foo'}.

    Raises:
      IOError: When this location cannot be found.
    """
    assert self._maxrequestsize > self._blocksize
    assert self._maxrequestsize % self._blocksize == 0
    assert self._maxrequestsize >= self._flushsize

    self._api = api
    self._path = path

    self.name = api_utils._unquote_filename(path)
    self.closed = False

    self._buffer = collections.deque()
    self._buffered = 0
    self._written = 0
    self._offset = 0

    headers = {'x-goog-resumable': 'start'}
    if content_type:
      headers['content-type'] = content_type
    if gcs_headers:
      headers.update(gcs_headers)
    status, resp_headers, content = self._api.post_object(path, headers=headers)
    errors.check_status(status, [201], path, headers, resp_headers,
                        body=content)
    loc = resp_headers.get('location')
    if not loc:
      raise IOError('No location header found in 201 response')
    parsed = urlparse.urlparse(loc)
    self._path_with_token = '%s?%s' % (self._path, parsed.query)

  def __getstate__(self):
    """Store state as part of serialization/pickling.

    The contents of the write buffer are stored. Writes to the underlying
    storage are required to be on block boundaries (_blocksize) except for
    the last write. In the worst case the pickled version of this object
    may be slightly larger than the blocksize.

    Returns:
      A dictionary with the state of this object.
    """
    return {'api': self._api,
            'path': self._path,
            'path_token': self._path_with_token,
            'buffer': self._buffer,
            'buffered': self._buffered,
            'written': self._written,
            'offset': self._offset,
            'closed': self.closed}

  def __setstate__(self, state):
    """Restore state as part of deserialization/unpickling.

    Args:
      state: the dictionary from a __getstate__ call.
    """
    self._api = state['api']
    self._path_with_token = state['path_token']
    self._buffer = state['buffer']
    self._buffered = state['buffered']
    self._written = state['written']
    self._offset = state['offset']
    self.closed = state['closed']
    self._path = state['path']
    self.name = api_utils._unquote_filename(self._path)

  def write(self, data):
    """Write some bytes.

    Args:
      data: data to write. str.

    Raises:
      TypeError: if data is not of type str.
    """
    self._check_open()
    if not isinstance(data, str):
      raise TypeError('Expected str but got %s.' % type(data))
    if not data:
      return
    self._buffer.append(data)
    self._buffered += len(data)
    self._offset += len(data)
    if self._buffered >= self._flushsize:
      self._flush()

  def flush(self):
    """Flush as much as possible to GCS.

    GCS *requires* that all writes except for the final one align on
    256KB boundaries, so the internal buffer may still have < 256KB left
    after a flush.
    """
    self._check_open()
    self._flush(finish=False)
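
  # Alignment sketch: with the default _blocksize of 256KB, buffering
  # 300KB and then calling flush() uploads exactly 256KB and retains 44KB,
  # since every non-final PUT must end on a 256KB boundary.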

  def tell(self):
    """Return the total number of bytes passed to write() so far.

    (There is no seek() method.)
    """
    return self._offset

  def close(self):
    """Flush the buffer and finalize the file.

    When this returns, the new file is available for reading.
    """
    if not self.closed:
      self.closed = True
      self._flush(finish=True)
      self._buffer = None

  def __enter__(self):
    return self

  def __exit__(self, atype, value, traceback):
    self.close()
    return False

  def _flush(self, finish=False):
    """Internal API to flush.

    The buffer is flushed to GCS only when the total amount of buffered
    data is at least self._blocksize, or to flush the final (incomplete)
    block of the file with finish=True.
    """
    while ((finish and self._buffered >= 0) or
           (not finish and self._buffered >= self._blocksize)):
      tmp_buffer = []
      tmp_buffer_len = 0

      excess = 0
      # Collect buffered chunks until we either reach the max request size
      # or can no longer assemble another complete block from what remains.
      while self._buffer:
        buf = self._buffer.popleft()
        size = len(buf)
        self._buffered -= size
        tmp_buffer.append(buf)
        tmp_buffer_len += size
        if tmp_buffer_len >= self._maxrequestsize:
          excess = tmp_buffer_len - self._maxrequestsize
          break
        if not finish and (
            tmp_buffer_len % self._blocksize + self._buffered <
            self._blocksize):
          excess = tmp_buffer_len % self._blocksize
          break

      if excess:
        # Push the portion that spills past the block/request boundary back
        # onto the buffer; only aligned data is sent now.
        over = tmp_buffer.pop()
        size = len(over)
        assert size >= excess
        tmp_buffer_len -= size
        head, tail = over[:-excess], over[-excess:]
        self._buffer.appendleft(tail)
        self._buffered += len(tail)
        if head:
          tmp_buffer.append(head)
          tmp_buffer_len += len(head)

      data = ''.join(tmp_buffer)
      file_len = '*'
      if finish and not self._buffered:
        file_len = self._written + len(data)
      self._send_data(data, self._written, file_len)
      self._written += len(data)
      if file_len != '*':
        break

  def _send_data(self, data, start_offset, file_len):
    """Send the block to the storage service.

    This is a utility method that does not modify self.

    Args:
      data: data to send, as a str.
      start_offset: start offset of the data in relation to the file.
      file_len: an int if this is the last data to append to the file.
        Otherwise '*'.
    """
    headers = {}
    end_offset = start_offset + len(data) - 1

    if data:
      headers['content-range'] = ('bytes %d-%d/%s' %
                                  (start_offset, end_offset, file_len))
    else:
      headers['content-range'] = ('bytes */%s' % file_len)

    status, response_headers, content = self._api.put_object(
        self._path_with_token, payload=data, headers=headers)
    if file_len == '*':
      expected = 308
    else:
      expected = 200
    errors.check_status(status, [expected], self._path, headers,
                        response_headers, content,
                        {'upload_path': self._path_with_token})
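
  # Content-Range sketch (illustrative values): an intermediate 256KB block
  # starting at offset 0 is sent as 'bytes 0-262143/*' and expects a 308;
  # the finalizing request for a 1000-byte file sends e.g.
  # 'bytes 512-999/1000' (or 'bytes */1000' if empty) and expects a 200.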

  def _get_offset_from_gcs(self):
    """Get the last offset that has been written to GCS.

    This is a utility method that does not modify self.

    Returns:
      An int of the last offset written to GCS by this upload, inclusive.
      -1 means nothing has been written.
    """
    headers = {'content-range': 'bytes */*'}
    status, response_headers, content = self._api.put_object(
        self._path_with_token, headers=headers)
    errors.check_status(status, [308], self._path, headers,
                        response_headers, content,
                        {'upload_path': self._path_with_token})
    val = response_headers.get('range')
    if val is None:
      return -1
    _, offset = val.rsplit('-', 1)
    return int(offset)
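
  # Range-parsing sketch (illustrative header value): a response header of
  # 'range: bytes=0-42' yields 42, meaning 43 bytes have been persisted.
  #
  #   'bytes=0-42'.rsplit('-', 1)  # -> ['bytes=0', '42']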

  def _force_close(self, file_length=None):
    """Close this buffer at file_length.

    Finalize this upload immediately at file_length. Contents that are
    still in memory will not be uploaded.

    This is a utility method that does not modify self.

    Args:
      file_length: file length. Must match what has been uploaded. If None,
        it will be queried from GCS.
    """
    if file_length is None:
      file_length = self._get_offset_from_gcs() + 1
    self._send_data('', 0, file_length)

  def _check_open(self):
    if self.closed:
      raise IOError('Buffer is closed.')

  def seekable(self):
    return False

  def readable(self):
    return False

  def writable(self):
    return True
    888