Home | History | Annotate | Download | only in cloudstorage
      1 # Copyright 2012 Google Inc. All Rights Reserved.
      2 #
      3 # Licensed under the Apache License, Version 2.0 (the "License");
      4 # you may not use this file except in compliance with the License.
      5 # You may obtain a copy of the License at
      6 #
      7 #    http://www.apache.org/licenses/LICENSE-2.0
      8 #
      9 # Unless required by applicable law or agreed to in writing,
     10 # software distributed under the License is distributed on an
     11 # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
     12 # either express or implied. See the License for the specific
     13 # language governing permissions and limitations under the License.
     14 
     15 """File Interface for Google Cloud Storage."""
     16 
     17 
     18 
     19 from __future__ import with_statement
     20 
     21 
     22 
     23 __all__ = ['delete',
     24            'listbucket',
     25            'open',
     26            'stat',
     27           ]
     28 
     29 import logging
     30 import StringIO
     31 import urllib
     32 import xml.etree.cElementTree as ET
     33 from . import api_utils
     34 from . import common
     35 from . import errors
     36 from . import storage_api
     37 
     38 
     39 
     40 def open(filename,
     41          mode='r',
     42          content_type=None,
     43          options=None,
     44          read_buffer_size=storage_api.ReadBuffer.DEFAULT_BUFFER_SIZE,
     45          retry_params=None,
     46          _account_id=None):
     47   """Opens a Google Cloud Storage file and returns it as a File-like object.
     48 
     49   Args:
     50     filename: A Google Cloud Storage filename of form '/bucket/filename'.
     51     mode: 'r' for reading mode. 'w' for writing mode.
     52       In reading mode, the file must exist. In writing mode, a file will
     53       be created or be overrode.
     54     content_type: The MIME type of the file. str. Only valid in writing mode.
     55     options: A str->basestring dict to specify additional headers to pass to
     56       GCS e.g. {'x-goog-acl': 'private', 'x-goog-meta-foo': 'foo'}.
     57       Supported options are x-goog-acl, x-goog-meta-, cache-control,
     58       content-disposition, and content-encoding.
     59       Only valid in writing mode.
     60       See https://developers.google.com/storage/docs/reference-headers
     61       for details.
     62     read_buffer_size: The buffer size for read. Read keeps a buffer
     63       and prefetches another one. To minimize blocking for large files,
     64       always read by buffer size. To minimize number of RPC requests for
     65       small files, set a large buffer size. Max is 30MB.
     66     retry_params: An instance of api_utils.RetryParams for subsequent calls
     67       to GCS from this file handle. If None, the default one is used.
     68     _account_id: Internal-use only.
     69 
     70   Returns:
     71     A reading or writing buffer that supports File-like interface. Buffer
     72     must be closed after operations are done.
     73 
     74   Raises:
     75     errors.AuthorizationError: if authorization failed.
     76     errors.NotFoundError: if an object that's expected to exist doesn't.
     77     ValueError: invalid open mode or if content_type or options are specified
     78       in reading mode.
     79   """
     80   common.validate_file_path(filename)
     81   api = storage_api._get_storage_api(retry_params=retry_params,
     82                                      account_id=_account_id)
     83   filename = api_utils._quote_filename(filename)
     84 
     85   if mode == 'w':
     86     common.validate_options(options)
     87     return storage_api.StreamingBuffer(api, filename, content_type, options)
     88   elif mode == 'r':
     89     if content_type or options:
     90       raise ValueError('Options and content_type can only be specified '
     91                        'for writing mode.')
     92     return storage_api.ReadBuffer(api,
     93                                   filename,
     94                                   buffer_size=read_buffer_size)
     95   else:
     96     raise ValueError('Invalid mode %s.' % mode)
     97 
     98 
     99 def delete(filename, retry_params=None, _account_id=None):
    100   """Delete a Google Cloud Storage file.
    101 
    102   Args:
    103     filename: A Google Cloud Storage filename of form '/bucket/filename'.
    104     retry_params: An api_utils.RetryParams for this call to GCS. If None,
    105       the default one is used.
    106     _account_id: Internal-use only.
    107 
    108   Raises:
    109     errors.NotFoundError: if the file doesn't exist prior to deletion.
    110   """
    111   api = storage_api._get_storage_api(retry_params=retry_params,
    112                                      account_id=_account_id)
    113   common.validate_file_path(filename)
    114   filename = api_utils._quote_filename(filename)
    115   status, resp_headers, content = api.delete_object(filename)
    116   errors.check_status(status, [204], filename, resp_headers=resp_headers,
    117                       body=content)
    118 
    119 
    120 def stat(filename, retry_params=None, _account_id=None):
    121   """Get GCSFileStat of a Google Cloud storage file.
    122 
    123   Args:
    124     filename: A Google Cloud Storage filename of form '/bucket/filename'.
    125     retry_params: An api_utils.RetryParams for this call to GCS. If None,
    126       the default one is used.
    127     _account_id: Internal-use only.
    128 
    129   Returns:
    130     a GCSFileStat object containing info about this file.
    131 
    132   Raises:
    133     errors.AuthorizationError: if authorization failed.
    134     errors.NotFoundError: if an object that's expected to exist doesn't.
    135   """
    136   common.validate_file_path(filename)
    137   api = storage_api._get_storage_api(retry_params=retry_params,
    138                                      account_id=_account_id)
    139   status, headers, content = api.head_object(
    140       api_utils._quote_filename(filename))
    141   errors.check_status(status, [200], filename, resp_headers=headers,
    142                       body=content)
    143   file_stat = common.GCSFileStat(
    144       filename=filename,
    145       st_size=common.get_stored_content_length(headers),
    146       st_ctime=common.http_time_to_posix(headers.get('last-modified')),
    147       etag=headers.get('etag'),
    148       content_type=headers.get('content-type'),
    149       metadata=common.get_metadata(headers))
    150 
    151   return file_stat
    152 
    153 
    154 def _copy2(src, dst, metadata=None, retry_params=None):
    155   """Copy the file content from src to dst.
    156 
    157   Internal use only!
    158 
    159   Args:
    160     src: /bucket/filename
    161     dst: /bucket/filename
    162     metadata: a dict of metadata for this copy. If None, old metadata is copied.
    163       For example, {'x-goog-meta-foo': 'bar'}.
    164     retry_params: An api_utils.RetryParams for this call to GCS. If None,
    165       the default one is used.
    166 
    167   Raises:
    168     errors.AuthorizationError: if authorization failed.
    169     errors.NotFoundError: if an object that's expected to exist doesn't.
    170   """
    171   common.validate_file_path(src)
    172   common.validate_file_path(dst)
    173 
    174   if metadata is None:
    175     metadata = {}
    176     copy_meta = 'COPY'
    177   else:
    178     copy_meta = 'REPLACE'
    179   metadata.update({'x-goog-copy-source': src,
    180                    'x-goog-metadata-directive': copy_meta})
    181 
    182   api = storage_api._get_storage_api(retry_params=retry_params)
    183   status, resp_headers, content = api.put_object(
    184       api_utils._quote_filename(dst), headers=metadata)
    185   errors.check_status(status, [200], src, metadata, resp_headers, body=content)
    186 
    187 
    188 def listbucket(path_prefix, marker=None, prefix=None, max_keys=None,
    189                delimiter=None, retry_params=None, _account_id=None):
    190   """Returns a GCSFileStat iterator over a bucket.
    191 
    192   Optional arguments can limit the result to a subset of files under bucket.
    193 
    194   This function has two modes:
    195   1. List bucket mode: Lists all files in the bucket without any concept of
    196      hierarchy. GCS doesn't have real directory hierarchies.
    197   2. Directory emulation mode: If you specify the 'delimiter' argument,
    198      it is used as a path separator to emulate a hierarchy of directories.
    199      In this mode, the "path_prefix" argument should end in the delimiter
    200      specified (thus designates a logical directory). The logical directory's
    201      contents, both files and subdirectories, are listed. The names of
    202      subdirectories returned will end with the delimiter. So listbucket
    203      can be called with the subdirectory name to list the subdirectory's
    204      contents.
    205 
    206   Args:
    207     path_prefix: A Google Cloud Storage path of format "/bucket" or
    208       "/bucket/prefix". Only objects whose fullpath starts with the
    209       path_prefix will be returned.
    210     marker: Another path prefix. Only objects whose fullpath starts
    211       lexicographically after marker will be returned (exclusive).
    212     prefix: Deprecated. Use path_prefix.
    213     max_keys: The limit on the number of objects to return. int.
    214       For best performance, specify max_keys only if you know how many objects
    215       you want. Otherwise, this method requests large batches and handles
    216       pagination for you.
    217     delimiter: Use to turn on directory mode. str of one or multiple chars
    218       that your bucket uses as its directory separator.
    219     retry_params: An api_utils.RetryParams for this call to GCS. If None,
    220       the default one is used.
    221     _account_id: Internal-use only.
    222 
    223   Examples:
    224     For files "/bucket/a",
    225               "/bucket/bar/1"
    226               "/bucket/foo",
    227               "/bucket/foo/1", "/bucket/foo/2/1", "/bucket/foo/3/1",
    228 
    229     Regular mode:
    230     listbucket("/bucket/f", marker="/bucket/foo/1")
    231     will match "/bucket/foo/2/1", "/bucket/foo/3/1".
    232 
    233     Directory mode:
    234     listbucket("/bucket/", delimiter="/")
    235     will match "/bucket/a, "/bucket/bar/" "/bucket/foo", "/bucket/foo/".
    236     listbucket("/bucket/foo/", delimiter="/")
    237     will match "/bucket/foo/1", "/bucket/foo/2/", "/bucket/foo/3/"
    238 
    239   Returns:
    240     Regular mode:
    241     A GCSFileStat iterator over matched files ordered by filename.
    242     The iterator returns GCSFileStat objects. filename, etag, st_size,
    243     st_ctime, and is_dir are set.
    244 
    245     Directory emulation mode:
    246     A GCSFileStat iterator over matched files and directories ordered by
    247     name. The iterator returns GCSFileStat objects. For directories,
    248     only the filename and is_dir fields are set.
    249 
    250     The last name yielded can be used as next call's marker.
    251   """
    252   if prefix:
    253     common.validate_bucket_path(path_prefix)
    254     bucket = path_prefix
    255   else:
    256     bucket, prefix = common._process_path_prefix(path_prefix)
    257 
    258   if marker and marker.startswith(bucket):
    259     marker = marker[len(bucket) + 1:]
    260 
    261   api = storage_api._get_storage_api(retry_params=retry_params,
    262                                      account_id=_account_id)
    263   options = {}
    264   if marker:
    265     options['marker'] = marker
    266   if max_keys:
    267     options['max-keys'] = max_keys
    268   if prefix:
    269     options['prefix'] = prefix
    270   if delimiter:
    271     options['delimiter'] = delimiter
    272 
    273   return _Bucket(api, bucket, options)
    274 
    275 
class _Bucket(object):
  """A wrapper for a GCS bucket as the return value of listbucket.

  Iterable and picklable: pickling mid-iteration captures the last yielded
  name as the next request's marker, so iteration resumes where it left off.
  """

  def __init__(self, api, path, options):
    """Initialize.

    Args:
      api: storage_api instance.
      path: bucket path of form '/bucket'.
      options: a dict of listbucket options. Please see listbucket doc.
    """
    self._init(api, path, options)

  def _init(self, api, path, options):
    # Shared by __init__ and __setstate__ so unpickling rebuilds the same
    # state, including a fresh in-flight GET bucket request.
    self._api = api
    self._path = path
    self._options = options.copy()
    # Fire the first list request asynchronously; __iter__ consumes it.
    self._get_bucket_fut = self._api.get_bucket_async(
        self._path + '?' + urllib.urlencode(self._options))
    # Last GCSFileStat yielded; becomes the resume marker in __getstate__.
    self._last_yield = None
    # Countdown of results still allowed after a pickle/unpickle cycle
    # (None when no 'max-keys' option was given).
    self._new_max_keys = self._options.get('max-keys')

  def __getstate__(self):
    # Serialize enough to resume: same options, but with the marker advanced
    # past the last yielded name (stored relative to the bucket path) and
    # max-keys reduced by what was already yielded.
    options = self._options
    if self._last_yield:
      options['marker'] = self._last_yield.filename[len(self._path) + 1:]
    if self._new_max_keys is not None:
      options['max-keys'] = self._new_max_keys
    return {'api': self._api,
            'path': self._path,
            'options': options}

  def __setstate__(self, state):
    self._init(state['api'], state['path'], state['options'])

  def __iter__(self):
    """Iter over the bucket.

    Yields:
      GCSFileStat: a GCSFileStat for an object in the bucket.
        They are ordered by GCSFileStat.filename.
    """
    total = 0
    max_keys = self._options.get('max-keys')

    while self._get_bucket_fut:
      status, resp_headers, content = self._get_bucket_fut.get_result()
      errors.check_status(status, [200], self._path, resp_headers=resp_headers,
                          body=content, extras=self._options)

      # Pipeline: if more results are expected, request the next batch now
      # so it downloads while we parse and yield the current one.
      if self._should_get_another_batch(content):
        self._get_bucket_fut = self._api.get_bucket_async(
            self._path + '?' + urllib.urlencode(self._options))
      else:
        self._get_bucket_fut = None

      root = ET.fromstring(content)
      dirs = self._next_dir_gen(root)
      files = self._next_file_gen(root)
      # Both generators yield a trailing None sentinel, so .next() never
      # raises StopIteration inside the merge loop below.
      next_file = files.next()
      next_dir = dirs.next()

      # Two-way merge of the (already sorted) file and directory streams,
      # yielding in overall lexicographic order until both are exhausted
      # or max-keys is reached.
      while ((max_keys is None or total < max_keys) and
             not (next_file is None and next_dir is None)):
        total += 1
        if next_file is None:
          self._last_yield = next_dir
          next_dir = dirs.next()
        elif next_dir is None:
          self._last_yield = next_file
          next_file = files.next()
        elif next_dir < next_file:
          self._last_yield = next_dir
          next_dir = dirs.next()
        elif next_file < next_dir:
          self._last_yield = next_file
          next_file = files.next()
        else:
          # A name can't be both a file and a directory in one listing.
          logging.error(
              'Should never reach. next file is %r. next dir is %r.',
              next_file, next_dir)
        if self._new_max_keys:
          self._new_max_keys -= 1
        yield self._last_yield

  def _next_file_gen(self, root):
    """Generator for next file element in the document.

    Args:
      root: root element of the XML tree.

    Yields:
      GCSFileStat for the next file, then a final None sentinel.
    """
    for e in root.getiterator(common._T_CONTENTS):
      st_ctime, size, etag, key = None, None, None, None
      for child in e.getiterator('*'):
        if child.tag == common._T_LAST_MODIFIED:
          st_ctime = common.dt_str_to_posix(child.text)
        elif child.tag == common._T_ETAG:
          etag = child.text
        elif child.tag == common._T_SIZE:
          size = child.text
        elif child.tag == common._T_KEY:
          key = child.text
      yield common.GCSFileStat(self._path + '/' + key,
                               size, etag, st_ctime)
      # Free the element's children to keep memory flat on large listings.
      e.clear()
    yield None

  def _next_dir_gen(self, root):
    """Generator for next directory element in the document.

    Args:
      root: root element in the XML tree.

    Yields:
      GCSFileStat for the next directory (is_dir=True), then a final
      None sentinel.
    """
    for e in root.getiterator(common._T_COMMON_PREFIXES):
      yield common.GCSFileStat(
          self._path + '/' + e.find(common._T_PREFIX).text,
          st_size=None, etag=None, st_ctime=None, is_dir=True)
      e.clear()
    yield None

  def _should_get_another_batch(self, content):
    """Whether to issue another GET bucket call.

    Args:
      content: response XML.

    Returns:
      True if should, also update self._options for the next request.
      False otherwise.
    """
    # If the caller's max-keys fits in a single server batch, one request
    # was enough regardless of truncation.
    if ('max-keys' in self._options and
        self._options['max-keys'] <= common._MAX_GET_BUCKET_RESULT):
      return False

    elements = self._find_elements(
        content, set([common._T_IS_TRUNCATED,
                      common._T_NEXT_MARKER]))
    if elements.get(common._T_IS_TRUNCATED, 'false').lower() != 'true':
      return False

    next_marker = elements.get(common._T_NEXT_MARKER)
    if next_marker is None:
      self._options.pop('marker', None)
      return False
    self._options['marker'] = next_marker
    return True

  def _find_elements(self, result, elements):
    """Find interesting elements from XML.

    This function tries to only look for specified elements
    without parsing the entire XML. The specified elements is better
    located near the beginning.

    Args:
      result: response XML.
      elements: a set of interesting element tags. NOTE: consumed in place;
        found tags are removed so the scan can stop early.

    Returns:
      A dict from element tag to element value.
    """
    element_mapping = {}
    result = StringIO.StringIO(result)
    # Incremental parse: stop as soon as every requested tag has been seen.
    for _, e in ET.iterparse(result, events=('end',)):
      if not elements:
        break
      if e.tag in elements:
        element_mapping[e.tag] = e.text
        elements.remove(e.tag)
    return element_mapping
    452