Home | History | Annotate | Download | only in cloudstorage
      1 # Copyright 2012 Google Inc. All Rights Reserved.
      2 #
      3 # Licensed under the Apache License, Version 2.0 (the "License");
      4 # you may not use this file except in compliance with the License.
      5 # You may obtain a copy of the License at
      6 #
      7 #    http://www.apache.org/licenses/LICENSE-2.0
      8 #
      9 # Unless required by applicable law or agreed to in writing,
     10 # software distributed under the License is distributed on an
     11 # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
     12 # either express or implied. See the License for the specific
     13 # language governing permissions and limitations under the License.
     14 
     15 """File Interface for Google Cloud Storage."""
     16 
     17 
     18 
     19 from __future__ import with_statement
     20 
     21 
     22 
     23 __all__ = ['delete',
     24            'listbucket',
     25            'open',
     26            'stat',
     27           ]
     28 
     29 import logging
     30 import StringIO
     31 import urllib
     32 import xml.etree.cElementTree as ET
     33 from . import api_utils
     34 from . import common
     35 from . import errors
     36 from . import storage_api
     37 
     38 
     39 
     40 def open(filename,
     41          mode='r',
     42          content_type=None,
     43          options=None,
     44          read_buffer_size=storage_api.ReadBuffer.DEFAULT_BUFFER_SIZE,
     45          retry_params=None,
     46          _account_id=None):
     47   """Opens a Google Cloud Storage file and returns it as a File-like object.
     48 
     49   Args:
     50     filename: A Google Cloud Storage filename of form '/bucket/filename'.
     51     mode: 'r' for reading mode. 'w' for writing mode.
     52       In reading mode, the file must exist. In writing mode, a file will
     53       be created or be overrode.
     54     content_type: The MIME type of the file. str. Only valid in writing mode.
     55     options: A str->basestring dict to specify additional headers to pass to
     56       GCS e.g. {'x-goog-acl': 'private', 'x-goog-meta-foo': 'foo'}.
     57       Supported options are x-goog-acl, x-goog-meta-, cache-control,
     58       content-disposition, and content-encoding.
     59       Only valid in writing mode.
     60       See https://developers.google.com/storage/docs/reference-headers
     61       for details.
     62     read_buffer_size: The buffer size for read. Read keeps a buffer
     63       and prefetches another one. To minimize blocking for large files,
     64       always read by buffer size. To minimize number of RPC requests for
     65       small files, set a large buffer size. Max is 30MB.
     66     retry_params: An instance of api_utils.RetryParams for subsequent calls
     67       to GCS from this file handle. If None, the default one is used.
     68     _account_id: Internal-use only.
     69 
     70   Returns:
     71     A reading or writing buffer that supports File-like interface. Buffer
     72     must be closed after operations are done.
     73 
     74   Raises:
     75     errors.AuthorizationError: if authorization failed.
     76     errors.NotFoundError: if an object that's expected to exist doesn't.
     77     ValueError: invalid open mode or if content_type or options are specified
     78       in reading mode.
     79   """
     80   common.validate_file_path(filename)
     81   api = storage_api._get_storage_api(retry_params=retry_params,
     82                                      account_id=_account_id)
     83   filename = api_utils._quote_filename(filename)
     84 
     85   if mode == 'w':
     86     common.validate_options(options)
     87     return storage_api.StreamingBuffer(api, filename, content_type, options)
     88   elif mode == 'r':
     89     if content_type or options:
     90       raise ValueError('Options and content_type can only be specified '
     91                        'for writing mode.')
     92     return storage_api.ReadBuffer(api,
     93                                   filename,
     94                                   buffer_size=read_buffer_size)
     95   else:
     96     raise ValueError('Invalid mode %s.' % mode)
     97 
     98 
     99 def delete(filename, retry_params=None, _account_id=None):
    100   """Delete a Google Cloud Storage file.
    101 
    102   Args:
    103     filename: A Google Cloud Storage filename of form '/bucket/filename'.
    104     retry_params: An api_utils.RetryParams for this call to GCS. If None,
    105       the default one is used.
    106     _account_id: Internal-use only.
    107 
    108   Raises:
    109     errors.NotFoundError: if the file doesn't exist prior to deletion.
    110   """
    111   api = storage_api._get_storage_api(retry_params=retry_params,
    112                                      account_id=_account_id)
    113   common.validate_file_path(filename)
    114   filename = api_utils._quote_filename(filename)
    115   status, resp_headers, _ = api.delete_object(filename)
    116   errors.check_status(status, [204], filename, resp_headers=resp_headers)
    117 
    118 
    119 def stat(filename, retry_params=None, _account_id=None):
    120   """Get GCSFileStat of a Google Cloud storage file.
    121 
    122   Args:
    123     filename: A Google Cloud Storage filename of form '/bucket/filename'.
    124     retry_params: An api_utils.RetryParams for this call to GCS. If None,
    125       the default one is used.
    126     _account_id: Internal-use only.
    127 
    128   Returns:
    129     a GCSFileStat object containing info about this file.
    130 
    131   Raises:
    132     errors.AuthorizationError: if authorization failed.
    133     errors.NotFoundError: if an object that's expected to exist doesn't.
    134   """
    135   common.validate_file_path(filename)
    136   api = storage_api._get_storage_api(retry_params=retry_params,
    137                                      account_id=_account_id)
    138   status, headers, _ = api.head_object(api_utils._quote_filename(filename))
    139   errors.check_status(status, [200], filename, resp_headers=headers)
    140   file_stat = common.GCSFileStat(
    141       filename=filename,
    142       st_size=headers.get('content-length'),
    143       st_ctime=common.http_time_to_posix(headers.get('last-modified')),
    144       etag=headers.get('etag'),
    145       content_type=headers.get('content-type'),
    146       metadata=common.get_metadata(headers))
    147 
    148   return file_stat
    149 
    150 
    151 def _copy2(src, dst, metadata=None, retry_params=None):
    152   """Copy the file content from src to dst.
    153 
    154   Internal use only!
    155 
    156   Args:
    157     src: /bucket/filename
    158     dst: /bucket/filename
    159     metadata: a dict of metadata for this copy. If None, old metadata is copied.
    160       For example, {'x-goog-meta-foo': 'bar'}.
    161     retry_params: An api_utils.RetryParams for this call to GCS. If None,
    162       the default one is used.
    163 
    164   Raises:
    165     errors.AuthorizationError: if authorization failed.
    166     errors.NotFoundError: if an object that's expected to exist doesn't.
    167   """
    168   common.validate_file_path(src)
    169   common.validate_file_path(dst)
    170 
    171   if metadata is None:
    172     metadata = {}
    173     copy_meta = 'COPY'
    174   else:
    175     copy_meta = 'REPLACE'
    176   metadata.update({'x-goog-copy-source': src,
    177                    'x-goog-metadata-directive': copy_meta})
    178 
    179   api = storage_api._get_storage_api(retry_params=retry_params)
    180   status, resp_headers, _ = api.put_object(
    181       api_utils._quote_filename(dst), headers=metadata)
    182   errors.check_status(status, [200], src, metadata, resp_headers)
    183 
    184 
    185 def listbucket(path_prefix, marker=None, prefix=None, max_keys=None,
    186                delimiter=None, retry_params=None, _account_id=None):
    187   """Returns a GCSFileStat iterator over a bucket.
    188 
    189   Optional arguments can limit the result to a subset of files under bucket.
    190 
    191   This function has two modes:
    192   1. List bucket mode: Lists all files in the bucket without any concept of
    193      hierarchy. GCS doesn't have real directory hierarchies.
    194   2. Directory emulation mode: If you specify the 'delimiter' argument,
    195      it is used as a path separator to emulate a hierarchy of directories.
    196      In this mode, the "path_prefix" argument should end in the delimiter
    197      specified (thus designates a logical directory). The logical directory's
    198      contents, both files and subdirectories, are listed. The names of
    199      subdirectories returned will end with the delimiter. So listbucket
    200      can be called with the subdirectory name to list the subdirectory's
    201      contents.
    202 
    203   Args:
    204     path_prefix: A Google Cloud Storage path of format "/bucket" or
    205       "/bucket/prefix". Only objects whose fullpath starts with the
    206       path_prefix will be returned.
    207     marker: Another path prefix. Only objects whose fullpath starts
    208       lexicographically after marker will be returned (exclusive).
    209     prefix: Deprecated. Use path_prefix.
    210     max_keys: The limit on the number of objects to return. int.
    211       For best performance, specify max_keys only if you know how many objects
    212       you want. Otherwise, this method requests large batches and handles
    213       pagination for you.
    214     delimiter: Use to turn on directory mode. str of one or multiple chars
    215       that your bucket uses as its directory separator.
    216     retry_params: An api_utils.RetryParams for this call to GCS. If None,
    217       the default one is used.
    218     _account_id: Internal-use only.
    219 
    220   Examples:
    221     For files "/bucket/a",
    222               "/bucket/bar/1"
    223               "/bucket/foo",
    224               "/bucket/foo/1", "/bucket/foo/2/1", "/bucket/foo/3/1",
    225 
    226     Regular mode:
    227     listbucket("/bucket/f", marker="/bucket/foo/1")
    228     will match "/bucket/foo/2/1", "/bucket/foo/3/1".
    229 
    230     Directory mode:
    231     listbucket("/bucket/", delimiter="/")
    232     will match "/bucket/a, "/bucket/bar/" "/bucket/foo", "/bucket/foo/".
    233     listbucket("/bucket/foo/", delimiter="/")
    234     will match "/bucket/foo/1", "/bucket/foo/2/", "/bucket/foo/3/"
    235 
    236   Returns:
    237     Regular mode:
    238     A GCSFileStat iterator over matched files ordered by filename.
    239     The iterator returns GCSFileStat objects. filename, etag, st_size,
    240     st_ctime, and is_dir are set.
    241 
    242     Directory emulation mode:
    243     A GCSFileStat iterator over matched files and directories ordered by
    244     name. The iterator returns GCSFileStat objects. For directories,
    245     only the filename and is_dir fields are set.
    246 
    247     The last name yielded can be used as next call's marker.
    248   """
    249   if prefix:
    250     common.validate_bucket_path(path_prefix)
    251     bucket = path_prefix
    252   else:
    253     bucket, prefix = common._process_path_prefix(path_prefix)
    254 
    255   if marker and marker.startswith(bucket):
    256     marker = marker[len(bucket) + 1:]
    257 
    258   api = storage_api._get_storage_api(retry_params=retry_params,
    259                                      account_id=_account_id)
    260   options = {}
    261   if marker:
    262     options['marker'] = marker
    263   if max_keys:
    264     options['max-keys'] = max_keys
    265   if prefix:
    266     options['prefix'] = prefix
    267   if delimiter:
    268     options['delimiter'] = delimiter
    269 
    270   return _Bucket(api, bucket, options)
    271 
    272 
    273 class _Bucket(object):
    274   """A wrapper for a GCS bucket as the return value of listbucket."""
    275 
    276   def __init__(self, api, path, options):
    277     """Initialize.
    278 
    279     Args:
    280       api: storage_api instance.
    281       path: bucket path of form '/bucket'.
    282       options: a dict of listbucket options. Please see listbucket doc.
    283     """
    284     self._init(api, path, options)
    285 
    286   def _init(self, api, path, options):
    287     self._api = api
    288     self._path = path
    289     self._options = options.copy()
    290     self._get_bucket_fut = self._api.get_bucket_async(
    291         self._path + '?' + urllib.urlencode(self._options))
    292     self._last_yield = None
    293     self._new_max_keys = self._options.get('max-keys')
    294 
    295   def __getstate__(self):
    296     options = self._options
    297     if self._last_yield:
    298       options['marker'] = self._last_yield.filename[len(self._path) + 1:]
    299     if self._new_max_keys is not None:
    300       options['max-keys'] = self._new_max_keys
    301     return {'api': self._api,
    302             'path': self._path,
    303             'options': options}
    304 
    305   def __setstate__(self, state):
    306     self._init(state['api'], state['path'], state['options'])
    307 
    308   def __iter__(self):
    309     """Iter over the bucket.
    310 
    311     Yields:
    312       GCSFileStat: a GCSFileStat for an object in the bucket.
    313         They are ordered by GCSFileStat.filename.
    314     """
    315     total = 0
    316     max_keys = self._options.get('max-keys')
    317 
    318     while self._get_bucket_fut:
    319       status, resp_headers, content = self._get_bucket_fut.get_result()
    320       errors.check_status(status, [200], self._path, resp_headers=resp_headers,
    321                           extras=self._options)
    322 
    323       if self._should_get_another_batch(content):
    324         self._get_bucket_fut = self._api.get_bucket_async(
    325             self._path + '?' + urllib.urlencode(self._options))
    326       else:
    327         self._get_bucket_fut = None
    328 
    329       root = ET.fromstring(content)
    330       dirs = self._next_dir_gen(root)
    331       files = self._next_file_gen(root)
    332       next_file = files.next()
    333       next_dir = dirs.next()
    334 
    335       while ((max_keys is None or total < max_keys) and
    336              not (next_file is None and next_dir is None)):
    337         total += 1
    338         if next_file is None:
    339           self._last_yield = next_dir
    340           next_dir = dirs.next()
    341         elif next_dir is None:
    342           self._last_yield = next_file
    343           next_file = files.next()
    344         elif next_dir < next_file:
    345           self._last_yield = next_dir
    346           next_dir = dirs.next()
    347         elif next_file < next_dir:
    348           self._last_yield = next_file
    349           next_file = files.next()
    350         else:
    351           logging.error(
    352               'Should never reach. next file is %r. next dir is %r.',
    353               next_file, next_dir)
    354         if self._new_max_keys:
    355           self._new_max_keys -= 1
    356         yield self._last_yield
    357 
    358   def _next_file_gen(self, root):
    359     """Generator for next file element in the document.
    360 
    361     Args:
    362       root: root element of the XML tree.
    363 
    364     Yields:
    365       GCSFileStat for the next file.
    366     """
    367     for e in root.getiterator(common._T_CONTENTS):
    368       st_ctime, size, etag, key = None, None, None, None
    369       for child in e.getiterator('*'):
    370         if child.tag == common._T_LAST_MODIFIED:
    371           st_ctime = common.dt_str_to_posix(child.text)
    372         elif child.tag == common._T_ETAG:
    373           etag = child.text
    374         elif child.tag == common._T_SIZE:
    375           size = child.text
    376         elif child.tag == common._T_KEY:
    377           key = child.text
    378       yield common.GCSFileStat(self._path + '/' + key,
    379                                size, etag, st_ctime)
    380       e.clear()
    381     yield None
    382 
    383   def _next_dir_gen(self, root):
    384     """Generator for next directory element in the document.
    385 
    386     Args:
    387       root: root element in the XML tree.
    388 
    389     Yields:
    390       GCSFileStat for the next directory.
    391     """
    392     for e in root.getiterator(common._T_COMMON_PREFIXES):
    393       yield common.GCSFileStat(
    394           self._path + '/' + e.find(common._T_PREFIX).text,
    395           st_size=None, etag=None, st_ctime=None, is_dir=True)
    396       e.clear()
    397     yield None
    398 
    399   def _should_get_another_batch(self, content):
    400     """Whether to issue another GET bucket call.
    401 
    402     Args:
    403       content: response XML.
    404 
    405     Returns:
    406       True if should, also update self._options for the next request.
    407       False otherwise.
    408     """
    409     if ('max-keys' in self._options and
    410         self._options['max-keys'] <= common._MAX_GET_BUCKET_RESULT):
    411       return False
    412 
    413     elements = self._find_elements(
    414         content, set([common._T_IS_TRUNCATED,
    415                       common._T_NEXT_MARKER]))
    416     if elements.get(common._T_IS_TRUNCATED, 'false').lower() != 'true':
    417       return False
    418 
    419     next_marker = elements.get(common._T_NEXT_MARKER)
    420     if next_marker is None:
    421       self._options.pop('marker', None)
    422       return False
    423     self._options['marker'] = next_marker
    424     return True
    425 
    426   def _find_elements(self, result, elements):
    427     """Find interesting elements from XML.
    428 
    429     This function tries to only look for specified elements
    430     without parsing the entire XML. The specified elements is better
    431     located near the beginning.
    432 
    433     Args:
    434       result: response XML.
    435       elements: a set of interesting element tags.
    436 
    437     Returns:
    438       A dict from element tag to element value.
    439     """
    440     element_mapping = {}
    441     result = StringIO.StringIO(result)
    442     for _, e in ET.iterparse(result, events=('end',)):
    443       if not elements:
    444         break
    445       if e.tag in elements:
    446         element_mapping[e.tag] = e.text
    447         elements.remove(e.tag)
    448     return element_mapping
    449