      1 # Copyright 2012 Google Inc. All Rights Reserved.
      2 #
      3 # Licensed under the Apache License, Version 2.0 (the "License");
      4 # you may not use this file except in compliance with the License.
      5 # You may obtain a copy of the License at
      6 #
      7 #    http://www.apache.org/licenses/LICENSE-2.0
      8 #
      9 # Unless required by applicable law or agreed to in writing,
     10 # software distributed under the License is distributed on an
     11 # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
     12 # either express or implied. See the License for the specific
     13 # language governing permissions and limitations under the License.
     14 
     15 """Python wrappers for the Google Storage RESTful API."""
     16 
     17 
     18 
     19 
     20 
     21 __all__ = ['ReadBuffer',
     22            'StreamingBuffer',
     23           ]
     24 
     25 import collections
     26 import logging
     27 import os
     28 import urlparse
     29 
     30 from . import api_utils
     31 from . import common
     32 from . import errors
     33 from . import rest_api
     34 
     35 try:
     36   from google.appengine.api import urlfetch
     37   from google.appengine.ext import ndb
     38 except ImportError:
     39   from google.appengine.api import urlfetch
     40   from google.appengine.ext import ndb
     41 
     42 
     43 
     44 def _get_storage_api(retry_params, account_id=None):
     45   """Returns storage_api instance for API methods.
     46 
     47   Args:
     48     retry_params: An instance of api_utils.RetryParams. If None, the
     49       thread's default will be used.
     50     account_id: Internal-use only.
     51 
     52   Returns:
     53     A storage_api instance to handle urlfetch work to GCS.
     54     On the dev appserver, this instance talks to a local stub by default,
     55     unless common.ACCESS_TOKEN is set; in that case the token is used to
     56     talk to the real GCS.
     57   """
     58 
     59 
     60   api = _StorageApi(_StorageApi.full_control_scope,
     61                     service_account_id=account_id,
     62                     retry_params=retry_params)
     63   if common.local_run() and not common.get_access_token():
     64     api.api_url = common.local_api_url()
     65   if common.get_access_token():
     66     api.token = common.get_access_token()
     67   return api
     68 
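
# Editor's illustration (not part of the original module): a minimal usage
# sketch of _get_storage_api(). The bucket/object path is hypothetical, and the
# blocking head_object() twin only exists because rest_api.add_sync_methods()
# is applied to _StorageApi further down in this file.
#
#   api = _get_storage_api(retry_params=api_utils.RetryParams())
#   status, headers, _ = api.head_object('/my-bucket/my-file')
#   file_size = long(headers['content-length'])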
     69 
     70 class _StorageApi(rest_api._RestApi):
     71   """A simple wrapper for the Google Storage RESTful API.
     72 
     73   WARNING: Do NOT directly use this api. It's an implementation detail
     74   and is subject to change at any release.
     75 
     76   All async methods have similar args and returns.
     77 
     78   Args:
     79     path: The path to the Google Storage object or bucket, e.g.
     80       '/mybucket/myfile' or '/mybucket'.
     81     **kwd: Options for urlfetch. e.g.
     82       headers={'content-type': 'text/plain'}, payload='blah'.
     83 
     84   Returns:
     85     An ndb Future. When fulfilled, future.get_result() should return
     86     a tuple of (status, headers, content) that represents an HTTP response
     87     from the Google Cloud Storage XML API.
     88   """
     89 
     90   api_url = 'https://storage.googleapis.com'
     91   read_only_scope = 'https://www.googleapis.com/auth/devstorage.read_only'
     92   read_write_scope = 'https://www.googleapis.com/auth/devstorage.read_write'
     93   full_control_scope = 'https://www.googleapis.com/auth/devstorage.full_control'
     94 
     95   def __getstate__(self):
     96     """Store state as part of serialization/pickling.
     97 
     98     Returns:
     99       A tuple (of dictionaries) with the state of this object
    100     """
    101     return (super(_StorageApi, self).__getstate__(), {'api_url': self.api_url})
    102 
    103   def __setstate__(self, state):
    104     """Restore state as part of deserialization/unpickling.
    105 
    106     Args:
    107       state: the tuple from a __getstate__ call
    108     """
    109     superstate, localstate = state
    110     super(_StorageApi, self).__setstate__(superstate)
    111     self.api_url = localstate['api_url']
    112 
    113   @api_utils._eager_tasklet
    114   @ndb.tasklet
    115   def do_request_async(self, url, method='GET', headers=None, payload=None,
    116                        deadline=None, callback=None):
    117     """Inherit docs.
    118 
    119     This method translates urlfetch exceptions to more service specific ones.
    120     """
    121     if headers is None:
    122       headers = {}
    123     if 'x-goog-api-version' not in headers:
    124       headers['x-goog-api-version'] = '2'
    125     headers['accept-encoding'] = 'gzip, *'
    126     try:
    127       resp_tuple = yield super(_StorageApi, self).do_request_async(
    128           url, method=method, headers=headers, payload=payload,
    129           deadline=deadline, callback=callback)
    130     except urlfetch.DownloadError as e:
    131       raise errors.TimeoutError(
    132           'Request to Google Cloud Storage timed out.', e)
    133 
    134     raise ndb.Return(resp_tuple)
    135 
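
  # Editor's illustration (hedged sketch, not part of the original class): how a
  # wrapped request behaves. The URL is hypothetical; a successful call resolves
  # to a (status, headers, content) tuple, while a urlfetch.DownloadError
  # surfaces as errors.TimeoutError instead.
  #
  #   fut = api.do_request_async('https://storage.googleapis.com/my-bucket/my-file',
  #                              method='GET', headers={'Range': 'bytes=0-9'})
  #   status, headers, content = fut.get_result()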
    136 
    137   def post_object_async(self, path, **kwds):
    138     """POST to an object."""
    139     return self.do_request_async(self.api_url + path, 'POST', **kwds)
    140 
    141   def put_object_async(self, path, **kwds):
    142     """PUT an object."""
    143     return self.do_request_async(self.api_url + path, 'PUT', **kwds)
    144 
    145   def get_object_async(self, path, **kwds):
    146     """GET an object.
    147 
    148     Note: No payload argument is supported.
    149     """
    150     return self.do_request_async(self.api_url + path, 'GET', **kwds)
    151 
    152   def delete_object_async(self, path, **kwds):
    153     """DELETE an object.
    154 
    155     Note: No payload argument is supported.
    156     """
    157     return self.do_request_async(self.api_url + path, 'DELETE', **kwds)
    158 
    159   def head_object_async(self, path, **kwds):
    160     """HEAD an object.
    161 
    162     Depending on request headers, HEAD returns various object properties,
    163     e.g. Content-Length, Last-Modified, and ETag.
    164 
    165     Note: No payload argument is supported.
    166     """
    167     return self.do_request_async(self.api_url + path, 'HEAD', **kwds)
    168 
    169   def get_bucket_async(self, path, **kwds):
    170     """GET a bucket."""
    171     return self.do_request_async(self.api_url + path, 'GET', **kwds)
    172 
    173 
    174 _StorageApi = rest_api.add_sync_methods(_StorageApi)
    175 
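
# Editor's note: add_sync_methods() above generates a blocking twin for every
# *_async method, so both calling styles below are available. A hedged sketch
# with a hypothetical path:
#
#   status, headers, content = api.get_object('/my-bucket/my-file')
#   future = api.get_object_async('/my-bucket/my-file')
#   status, headers, content = future.get_result()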
    176 
    177 class ReadBuffer(object):
    178   """A class for reading Google storage files."""
    179 
    180   DEFAULT_BUFFER_SIZE = 1024 * 1024
    181   MAX_REQUEST_SIZE = 30 * DEFAULT_BUFFER_SIZE
    182 
    183   def __init__(self,
    184                api,
    185                path,
    186                buffer_size=DEFAULT_BUFFER_SIZE,
    187                max_request_size=MAX_REQUEST_SIZE):
    188     """Constructor.
    189 
    190     Args:
    191       api: A StorageApi instance.
    192       path: Quoted/escaped path to the object, e.g. /mybucket/myfile
    193       buffer_size: buffer size. The ReadBuffer keeps one buffer, but a
    194         pending future may hold a second one. This size must not exceed
    195         max_request_size.
    196       max_request_size: Max bytes to request in one urlfetch.
    197     """
    198     self._api = api
    199     self._path = path
    200     self.name = api_utils._unquote_filename(path)
    201     self.closed = False
    202 
    203     assert buffer_size <= max_request_size
    204     self._buffer_size = buffer_size
    205     self._max_request_size = max_request_size
    206     self._offset = 0
    207     self._buffer = _Buffer()
    208     self._etag = None
    209 
    210     self._request_next_buffer()
    211 
    212     status, headers, _ = self._api.head_object(path)
    213     errors.check_status(status, [200], path, resp_headers=headers)
    214     self._file_size = long(headers['content-length'])
    215     self._check_etag(headers.get('etag'))
    216     if self._file_size == 0:
    217       self._buffer_future = None
    218 
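
  # Editor's illustration (hedged usage sketch; the path is hypothetical):
  #
  #   api = _get_storage_api(retry_params=None)
  #   with ReadBuffer(api, '/my-bucket/my-file') as gcs_file:
  #     first_line = gcs_file.readline()
  #     rest = gcs_file.read()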
    219   def __getstate__(self):
    220     """Store state as part of serialization/pickling.
    221 
    222     The contents of the read buffer are not stored, only the current offset for
    223     data read by the client. A new read buffer is established at unpickling.
    224     The head information for the object (file size and etag) is stored to
    225     reduce startup time and to ensure the file has not changed.
    226 
    227     Returns:
    228       A dictionary with the state of this object
    229     """
    230     return {'api': self._api,
    231             'path': self._path,
    232             'buffer_size': self._buffer_size,
    233             'request_size': self._max_request_size,
    234             'etag': self._etag,
    235             'size': self._file_size,
    236             'offset': self._offset,
    237             'closed': self.closed}
    238 
    239   def __setstate__(self, state):
    240     """Restore state as part of deserialization/unpickling.
    241 
    242     Args:
    243       state: the dictionary from a __getstate__ call
    244 
    245     Along with restoring the state, pre-fetch the next read buffer.
    246     """
    247     self._api = state['api']
    248     self._path = state['path']
    249     self.name = api_utils._unquote_filename(self._path)
    250     self._buffer_size = state['buffer_size']
    251     self._max_request_size = state['request_size']
    252     self._etag = state['etag']
    253     self._file_size = state['size']
    254     self._offset = state['offset']
    255     self._buffer = _Buffer()
    256     self.closed = state['closed']
    257     self._buffer_future = None
    258     if self._remaining() and not self.closed:
    259       self._request_next_buffer()
    260 
    261   def __iter__(self):
    262     """Iterator interface.
    263 
    264     Note the ReadBuffer container itself is the iterator. As PEP 234
    265     puts it, iteration is destructive: it consumes all the values, and
    266     a second iterator cannot easily be created to iterate independently
    267     over the same values. You could open the file a second time, or
    268     seek() back to the beginning.
    269 
    270     Returns:
    271       Self.
    272     """
    273     return self
    274 
    275   def next(self):
    276     line = self.readline()
    277     if not line:
    278       raise StopIteration()
    279     return line
    280 
    281   def readline(self, size=-1):
    282     """Read one line delimited by '\n' from the file.
    283 
    284     A trailing newline character is kept in the string. It may be absent when a
    285     file ends with an incomplete line. If the size argument is non-negative,
    286     it specifies the maximum string size (counting the newline) to return.
    287     A negative size is the same as unspecified. Empty string is returned
    288     only when EOF is encountered immediately.
    289 
    290     Args:
    291       size: Maximum number of bytes to read. If not specified, readline stops
    292         only on '\n' or EOF.
    293 
    294     Returns:
    295       The data read as a string.
    296 
    297     Raises:
    298       IOError: When this buffer is closed.
    299     """
    300     self._check_open()
    301     if size == 0 or not self._remaining():
    302       return ''
    303 
    304     data_list = []
    305     newline_offset = self._buffer.find_newline(size)
    306     while newline_offset < 0:
    307       data = self._buffer.read(size)
    308       size -= len(data)
    309       self._offset += len(data)
    310       data_list.append(data)
    311       if size == 0 or not self._remaining():
    312         return ''.join(data_list)
    313       self._buffer.reset(self._buffer_future.get_result())
    314       self._request_next_buffer()
    315       newline_offset = self._buffer.find_newline(size)
    316 
    317     data = self._buffer.read_to_offset(newline_offset + 1)
    318     self._offset += len(data)
    319     data_list.append(data)
    320 
    321     return ''.join(data_list)
    322 
    323   def read(self, size=-1):
    324     """Read data from RAW file.
    325 
    326     Args:
    327       size: Number of bytes to read as integer. Actual number of bytes
    328         read is always equal to size unless EOF is reached. If size is
    329         negative or unspecified, read the entire file.
    330 
    331     Returns:
    332       data read as str.
    333 
    334     Raises:
    335       IOError: When this buffer is closed.
    336     """
    337     self._check_open()
    338     if not self._remaining():
    339       return ''
    340 
    341     data_list = []
    342     while True:
    343       remaining = self._buffer.remaining()
    344       if size >= 0 and size < remaining:
    345         data_list.append(self._buffer.read(size))
    346         self._offset += size
    347         break
    348       else:
    349         size -= remaining
    350         self._offset += remaining
    351         data_list.append(self._buffer.read())
    352 
    353         if self._buffer_future is None:
    354           if size < 0 or size >= self._remaining():
    355             needs = self._remaining()
    356           else:
    357             needs = size
    358           data_list.extend(self._get_segments(self._offset, needs))
    359           self._offset += needs
    360           break
    361 
    362         if self._buffer_future:
    363           self._buffer.reset(self._buffer_future.get_result())
    364           self._buffer_future = None
    365 
    366     if self._buffer_future is None:
    367       self._request_next_buffer()
    368     return ''.join(data_list)
    369 
    370   def _remaining(self):
    371     return self._file_size - self._offset
    372 
    373   def _request_next_buffer(self):
    374     """Request next buffer.
    375 
    376     Requires that self._offset and self._buffer are in a consistent state.
    377     """
    378     self._buffer_future = None
    379     next_offset = self._offset + self._buffer.remaining()
    380     if not hasattr(self, '_file_size') or next_offset != self._file_size:
    381       self._buffer_future = self._get_segment(next_offset,
    382                                               self._buffer_size)
    383 
    384   def _get_segments(self, start, request_size):
    385     """Get segments of the file from Google Storage as a list.
    386 
    387     A large request is broken into segments to avoid hitting urlfetch
    388     response size limit. Each segment is returned from a separate urlfetch.
    389 
    390     Args:
    391       start: start offset to request. Inclusive. Has to be within the
    392         range of the file.
    393       request_size: number of bytes to request.
    394 
    395     Returns:
    396       A list of file segments in order
    397     """
    398     if not request_size:
    399       return []
    400 
    401     end = start + request_size
    402     futures = []
    403 
    404     while request_size > self._max_request_size:
    405       futures.append(self._get_segment(start, self._max_request_size))
    406       request_size -= self._max_request_size
    407       start += self._max_request_size
    408     if start < end:
    409       futures.append(self._get_segment(start, end-start))
    410     return [fut.get_result() for fut in futures]
    411 
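
  # Editor's illustration of the splitting rule above: with the default
  # MAX_REQUEST_SIZE of 30 MB, a 70 MB read is fetched as 30 MB + 30 MB + 10 MB,
  # one urlfetch per segment. A hedged sketch of the arithmetic:
  #
  #   sizes = []
  #   request_size = 70 * 1024 * 1024
  #   while request_size > ReadBuffer.MAX_REQUEST_SIZE:
  #     sizes.append(ReadBuffer.MAX_REQUEST_SIZE)
  #     request_size -= ReadBuffer.MAX_REQUEST_SIZE
  #   if request_size:
  #     sizes.append(request_size)      # sizes is now [30 MB, 30 MB, 10 MB]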
    412   @ndb.tasklet
    413   def _get_segment(self, start, request_size):
    414     """Get a segment of the file from Google Storage.
    415 
    416     Args:
    417       start: start offset of the segment. Inclusive. Has to be within the
    418         range of the file.
    419       request_size: number of bytes to request. Has to be small enough
    420         for a single urlfetch request. May go over the logical range of the
    421         file.
    422 
    423     Yields:
    424       a segment [start, start + request_size) of the file.
    425 
    426     Raises:
    427       ValueError: if the file has changed while reading.
    428     """
    429     end = start + request_size - 1
    430     content_range = '%d-%d' % (start, end)
    431     headers = {'Range': 'bytes=' + content_range}
    432     status, resp_headers, content = yield self._api.get_object_async(
    433         self._path, headers=headers)
    434     errors.check_status(status, [200, 206], self._path, headers, resp_headers)
    435     self._check_etag(resp_headers.get('etag'))
    436     raise ndb.Return(content)
    437 
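
  # Editor's note on the Range header built above: both ends are inclusive, so a
  # 100-byte segment starting at offset 0 sends 'Range: bytes=0-99' and accepts
  # either a 200 or a 206 response.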
    438   def _check_etag(self, etag):
    439     """Check if etag is the same across requests to GCS.
    440 
    441     If self._etag is None, set it. If etag is set, check that the new
    442     etag equals the old one.
    443 
    444     In the __init__ method, we fire one HEAD and one GET request using
    445     ndb tasklets. Whichever returns first sets the initial value.
    446 
    447     Args:
    448       etag: etag from a GCS HTTP response. None if etag is not part of the
    449         response header. It could be None, for example, for a GCS
    450         composite file.
    451 
    452     Raises:
    453       ValueError: if two etags are not equal.
    454     """
    455     if etag is None:
    456       return
    457     elif self._etag is None:
    458       self._etag = etag
    459     elif self._etag != etag:
    460       raise ValueError('File on GCS has changed while reading.')
    461 
    462   def close(self):
    463     self.closed = True
    464     self._buffer = None
    465     self._buffer_future = None
    466 
    467   def __enter__(self):
    468     return self
    469 
    470   def __exit__(self, atype, value, traceback):
    471     self.close()
    472     return False
    473 
    474   def seek(self, offset, whence=os.SEEK_SET):
    475     """Set the file's current offset.
    476 
    477     Note if the new offset is out of bounds, it is adjusted to either 0 or EOF.
    478 
    479     Args:
    480       offset: seek offset as number.
    481       whence: seek mode. Supported modes are os.SEEK_SET (absolute seek),
    482         os.SEEK_CUR (seek relative to the current position), and os.SEEK_END
    483         (seek relative to the end, offset should be negative).
    484 
    485     Raises:
    486       IOError: When this buffer is closed.
    487       ValueError: When whence is invalid.
    488     """
    489     self._check_open()
    490 
    491     self._buffer.reset()
    492     self._buffer_future = None
    493 
    494     if whence == os.SEEK_SET:
    495       self._offset = offset
    496     elif whence == os.SEEK_CUR:
    497       self._offset += offset
    498     elif whence == os.SEEK_END:
    499       self._offset = self._file_size + offset
    500     else:
    501       raise ValueError('Whence mode %s is invalid.' % str(whence))
    502 
    503     self._offset = min(self._offset, self._file_size)
    504     self._offset = max(self._offset, 0)
    505     if self._remaining():
    506       self._request_next_buffer()
    507 
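
  # Editor's illustration (hedged sketch): reading the last 10 bytes of an open
  # ReadBuffer by seeking relative to the end of the file.
  #
  #   gcs_file.seek(-10, os.SEEK_END)
  #   tail = gcs_file.read()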
    508   def tell(self):
    509     """Tell the file's current offset.
    510 
    511     Returns:
    512       current offset in reading this file.
    513 
    514     Raises:
    515       IOError: When this buffer is closed.
    516     """
    517     self._check_open()
    518     return self._offset
    519 
    520   def _check_open(self):
    521     if self.closed:
    522       raise IOError('Buffer is closed.')
    523 
    524   def seekable(self):
    525     return True
    526 
    527   def readable(self):
    528     return True
    529 
    530   def writable(self):
    531     return False
    532 
    533 
    534 class _Buffer(object):
    535   """In memory buffer."""
    536 
    537   def __init__(self):
    538     self.reset()
    539 
    540   def reset(self, content='', offset=0):
    541     self._buffer = content
    542     self._offset = offset
    543 
    544   def read(self, size=-1):
    545     """Returns bytes from self._buffer and updates the related offset.
    546 
    547     Args:
    548       size: number of bytes to read starting from current offset.
    549         Read the entire buffer if negative.
    550 
    551     Returns:
    552       Requested bytes from buffer.
    553     """
    554     if size < 0:
    555       offset = len(self._buffer)
    556     else:
    557       offset = self._offset + size
    558     return self.read_to_offset(offset)
    559 
    560   def read_to_offset(self, offset):
    561     """Returns bytes from self._buffer and updates the related offset.
    562 
    563     Args:
    564       offset: read from current offset to this offset, exclusive.
    565 
    566     Returns:
    567       Requested bytes from buffer.
    568     """
    569     assert offset >= self._offset
    570     result = self._buffer[self._offset: offset]
    571     self._offset += len(result)
    572     return result
    573 
    574   def remaining(self):
    575     return len(self._buffer) - self._offset
    576 
    577   def find_newline(self, size=-1):
    578     """Search for newline char in buffer starting from current offset.
    579 
    580     Args:
    581       size: number of bytes to search. -1 means all.
    582 
    583     Returns:
    584       offset of newline char in buffer. -1 if it doesn't exist.
    585     """
    586     if size < 0:
    587       return self._buffer.find('\n', self._offset)
    588     return self._buffer.find('\n', self._offset, self._offset + size)
    589 
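
# Editor's illustration of the _Buffer mechanics used by ReadBuffer.readline():
# find_newline() locates '\n' relative to the current offset, and
# read_to_offset() consumes up to an absolute offset. A hedged sketch:
#
#   buf = _Buffer()
#   buf.reset('abc\ndef')
#   newline = buf.find_newline()              # 3
#   line = buf.read_to_offset(newline + 1)    # 'abc\n'
#   left = buf.remaining()                    # 3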
    590 
    591 class StreamingBuffer(object):
    592   """A class for creating large objects using the 'resumable' API.
    593 
    594   The API is a subset of the Python writable stream API sufficient to
    595   support writing zip files using the zipfile module.
    596 
    597   The exact sequence of calls and use of headers is documented at
    598   https://developers.google.com/storage/docs/developer-guide#unknownresumables
    599   """
    600 
    601   _blocksize = 256 * 1024
    602 
    603   _maxrequestsize = 16 * _blocksize
    604 
    605   def __init__(self,
    606                api,
    607                path,
    608                content_type=None,
    609                gcs_headers=None):
    610     """Constructor.
    611 
    612     Args:
    613       api: A StorageApi instance.
    614       path: Quoted/escaped path to the object, e.g. /mybucket/myfile
    615       content_type: Optional content type. If not specified, the choice
    616         is delegated to Google Cloud Storage.
    617       gcs_headers: additional gs headers as a str->str dict, e.g.
    618         {'x-goog-acl': 'private', 'x-goog-meta-foo': 'foo'}.
    619     """
    620     assert self._maxrequestsize > self._blocksize
    621     assert self._maxrequestsize % self._blocksize == 0
    622 
    623     self._api = api
    624     self._path = path
    625     self.name = api_utils._unquote_filename(path)
    626     self.closed = False
    627 
    628     self._buffer = collections.deque()
    629     self._buffered = 0
    630     self._written = 0
    631     self._offset = 0
    632 
    633     headers = {'x-goog-resumable': 'start'}
    634     if content_type:
    635       headers['content-type'] = content_type
    636     if gcs_headers:
    637       headers.update(gcs_headers)
    638     status, resp_headers, _ = self._api.post_object(path, headers=headers)
    639     errors.check_status(status, [201], path, headers, resp_headers)
    640     loc = resp_headers.get('location')
    641     if not loc:
    642       raise IOError('No location header found in 201 response')
    643     parsed = urlparse.urlparse(loc)
    644     self._path_with_token = '%s?%s' % (self._path, parsed.query)
    645 
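
  # Editor's illustration (hedged usage sketch; path, content type and headers
  # are hypothetical):
  #
  #   api = _get_storage_api(retry_params=None)
  #   with StreamingBuffer(api, '/my-bucket/my-file',
  #                        content_type='text/plain',
  #                        gcs_headers={'x-goog-acl': 'private'}) as gcs_file:
  #     gcs_file.write('hello ')
  #     gcs_file.write('world\n')
  #   # close() (via the with-block) finalizes the upload; the object is then
  #   # readable.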
    646   def __getstate__(self):
    647     """Store state as part of serialization/pickling.
    648 
    649     The contents of the write buffer are stored. Writes to the underlying
    650     storage are required to be on block boundaries (_blocksize) except for the
    651     last write. In the worst case the pickled version of this object may be
    652     slightly larger than the blocksize.
    653 
    654     Returns:
    655       A dictionary with the state of this object
    656 
    657     """
    658     return {'api': self._api,
    659             'path': self._path,
    660             'path_token': self._path_with_token,
    661             'buffer': self._buffer,
    662             'buffered': self._buffered,
    663             'written': self._written,
    664             'offset': self._offset,
    665             'closed': self.closed}
    666 
    667   def __setstate__(self, state):
    668     """Restore state as part of deserialization/unpickling.
    669 
    670     Args:
    671       state: the dictionary from a __getstate__ call
    672     """
    673     self._api = state['api']
    674     self._path_with_token = state['path_token']
    675     self._buffer = state['buffer']
    676     self._buffered = state['buffered']
    677     self._written = state['written']
    678     self._offset = state['offset']
    679     self.closed = state['closed']
    680     self._path = state['path']
    681     self.name = api_utils._unquote_filename(self._path)
    682 
    683   def write(self, data):
    684     """Write some bytes.
    685 
    686     Args:
    687       data: data to write. str.
    688 
    689     Raises:
    690       TypeError: if data is not of type str.
    691     """
    692     self._check_open()
    693     if not isinstance(data, str):
    694       raise TypeError('Expected str but got %s.' % type(data))
    695     if not data:
    696       return
    697     self._buffer.append(data)
    698     self._buffered += len(data)
    699     self._offset += len(data)
    700     if self._buffered >= self._blocksize:
    701       self._flush()
    702 
    703   def flush(self):
    704     """Dummy API.
    705 
    706     This API is provided because the zipfile module uses it.  It is a
    707     no-op because Google Storage *requires* that all writes except for
    708     the final one are multiples of 256K bytes aligned on 256K-byte
    709     boundaries.
    710     """
    711     self._check_open()
    712 
    713   def tell(self):
    714     """Return the total number of bytes passed to write() so far.
    715 
    716     (There is no seek() method.)
    717     """
    718     return self._offset
    719 
    720   def close(self):
    721     """Flush the buffer and finalize the file.
    722 
    723     When this returns the new file is available for reading.
    724     """
    725     if not self.closed:
    726       self.closed = True
    727       self._flush(finish=True)
    728       self._buffer = None
    729 
    730   def __enter__(self):
    731     return self
    732 
    733   def __exit__(self, atype, value, traceback):
    734     self.close()
    735     return False
    736 
    737   def _flush(self, finish=False):
    738     """Internal API to flush.
    739 
    740     This is called only when the total amount of buffered data is at
    741     least self._blocksize, or to flush the final (incomplete) block of
    742     the file with finish=True.
    743     """
    744     flush_len = 0 if finish else self._blocksize
    745 
    746     while self._buffered >= flush_len:
    747       buffer = []
    748       buffered = 0
    749 
    750       while self._buffer:
    751         buf = self._buffer.popleft()
    752         size = len(buf)
    753         self._buffered -= size
    754         buffer.append(buf)
    755         buffered += size
    756         if buffered >= self._maxrequestsize:
    757           break
    758 
    759       if buffered > self._maxrequestsize:
    760         excess = buffered - self._maxrequestsize
    761       elif finish:
    762         excess = 0
    763       else:
    764         excess = buffered % self._blocksize
    765 
    766       if excess:
    767         over = buffer.pop()
    768         size = len(over)
    769         assert size >= excess
    770         buffered -= size
    771         head, tail = over[:-excess], over[-excess:]
    772         self._buffer.appendleft(tail)
    773         self._buffered += len(tail)
    774         if head:
    775           buffer.append(head)
    776           buffered += len(head)
    777 
    778       data = ''.join(buffer)
    779       file_len = '*'
    780       if finish and not self._buffered:
    781         file_len = self._written + len(data)
    782       self._send_data(data, self._written, file_len)
    783       self._written += len(data)
    784       if file_len != '*':
    785         break
    786 
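
  # Editor's note on the block arithmetic above (hedged example): with
  # _blocksize = 256 KB, flushing 700 KB of buffered data without finish=True
  # uploads 512 KB (two whole blocks) and carries the remaining 188 KB over,
  # because every non-final PUT must be a multiple of the block size.
  #
  #   buffered = 700 * 1024
  #   excess = buffered % StreamingBuffer._blocksize    # 188 KB stays buffered
  #   sent = buffered - excess                          # 512 KB goes in the PUT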
    787   def _send_data(self, data, start_offset, file_len):
    788     """Send the block to the storage service.
    789 
    790     This is a utility method that does not modify self.
    791 
    792     Args:
    793       data: data to send in str.
    794       start_offset: start offset of the data in relation to the file.
    795       file_len: an int if this is the last data to append to the file.
    796         Otherwise '*'.
    797     """
    798     headers = {}
    799     end_offset = start_offset + len(data) - 1
    800 
    801     if data:
    802       headers['content-range'] = ('bytes %d-%d/%s' %
    803                                   (start_offset, end_offset, file_len))
    804     else:
    805       headers['content-range'] = ('bytes */%s' % file_len)
    806 
    807     status, response_headers, _ = self._api.put_object(
    808         self._path_with_token, payload=data, headers=headers)
    809     if file_len == '*':
    810       expected = 308
    811     else:
    812       expected = 200
    813     errors.check_status(status, [expected], self._path, headers,
    814                         response_headers,
    815                         {'upload_path': self._path_with_token})
    816 
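
  # Editor's note on the content-range values sent above (hedged examples):
  # appending the first 262144 bytes of a file whose length is still unknown
  # sends 'content-range: bytes 0-262143/*' and expects HTTP 308, while the
  # final chunk of a 300000-byte file sends
  # 'content-range: bytes 262144-299999/300000' and expects HTTP 200.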
    817   def _get_offset_from_gcs(self):
    818     """Get the last offset that has been written to GCS.
    819 
    820     This is a utility method that does not modify self.
    821 
    822     Returns:
    823       an int of the last offset written to GCS by this upload, inclusive.
    824       -1 means nothing has been written.
    825     """
    826     headers = {'content-range': 'bytes */*'}
    827     status, response_headers, _ = self._api.put_object(
    828         self._path_with_token, headers=headers)
    829     errors.check_status(status, [308], self._path, headers,
    830                         response_headers,
    831                         {'upload_path': self._path_with_token})
    832     val = response_headers.get('range')
    833     if val is None:
    834       return -1
    835     _, offset = val.rsplit('-', 1)
    836     return int(offset)
    837 
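
  # Editor's note (hedged example): the 'bytes */*' probe above returns HTTP 308
  # with a header such as 'range: bytes=0-524287', from which the code recovers
  # 524287 as the last byte GCS has persisted so far.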
    838   def _force_close(self, file_length=None):
    839     """Close this buffer on file_length.
    840 
    841     Finalize this upload immediately on file_length.
    842     Contents that are still in memory will not be uploaded.
    843 
    844     This is a utility method that does not modify self.
    845 
    846     Args:
    847       file_length: file length. Must match what has been uploaded. If None,
    848         it will be queried from GCS.
    849     """
    850     if file_length is None:
    851       file_length = self._get_offset_from_gcs() + 1
    852     self._send_data('', 0, file_length)
    853 
    854   def _check_open(self):
    855     if self.closed:
    856       raise IOError('Buffer is closed.')
    857 
    858   def seekable(self):
    859     return False
    860 
    861   def readable(self):
    862     return False
    863 
    864   def writable(self):
    865     return True
    866