Home | History | Annotate | Download | only in gslib
      1 # -*- coding: utf-8 -*-
      2 # Copyright 2012 Google Inc. All Rights Reserved.
      3 #
      4 # Licensed under the Apache License, Version 2.0 (the "License");
      5 # you may not use this file except in compliance with the License.
      6 # You may obtain a copy of the License at
      7 #
      8 #     http://www.apache.org/licenses/LICENSE-2.0
      9 #
     10 # Unless required by applicable law or agreed to in writing, software
     11 # distributed under the License is distributed on an "AS IS" BASIS,
     12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     13 # See the License for the specific language governing permissions and
     14 # limitations under the License.
     15 """Name expansion iterator and result classes.
     16 
     17 Name expansion support for the various ways gsutil lets users refer to
     18 collections of data (via explicit wildcarding as well as directory,
     19 bucket, and bucket subdir implicit wildcarding). This class encapsulates
     20 the various rules for determining how these expansions are done.
     21 """
     22 
     23 # Disable warnings for NameExpansionIteratorQueue functions; they implement
     24 # an interface which does not follow lint guidelines.
     25 # pylint: disable=invalid-name
     26 
     27 from __future__ import absolute_import
     28 
     29 import os
     30 import sys
     31 
     32 import gslib
     33 from gslib.exception import CommandException
     34 from gslib.plurality_checkable_iterator import PluralityCheckableIterator
     35 import gslib.wildcard_iterator
     36 from gslib.wildcard_iterator import StorageUrlFromString
     37 
     38 
     39 class NameExpansionResult(object):
     40   """Holds one fully expanded result from iterating over NameExpansionIterator.
     41 
     42   The member data in this class need to be pickleable because
     43   NameExpansionResult instances are passed through Multiprocessing.Queue. In
     44   particular, don't include any boto state like StorageUri, since that pulls
     45   in a big tree of objects, some of which aren't pickleable (and even if
     46   they were, pickling/unpickling such a large object tree would result in
     47   significant overhead).
     48 
     49   The state held in this object is needed for handling the various naming cases
     50   (e.g., copying from a single source URL to a directory generates different
     51   dest URL names than copying multiple URLs to a directory, to be consistent
     52   with naming rules used by the Unix cp command). For more details see comments
     53   in _NameExpansionIterator.
     54   """
     55 
     56   def __init__(self, source_storage_url, is_multi_source_request,
     57                names_container, expanded_storage_url):
     58     """Instantiates a result from name expansion.
     59 
     60     Args:
     61       source_storage_url: StorageUrl that was being expanded.
     62       is_multi_source_request: bool indicator whether src_url_str expanded to
     63           more than one BucketListingRef.
     64       names_container: Bool indicator whether src_url names a container.
     65       expanded_storage_url: StorageUrl that was expanded.
     66     """
     67     self.source_storage_url = source_storage_url
     68     self.is_multi_source_request = is_multi_source_request
     69     self.names_container = names_container
     70     self.expanded_storage_url = expanded_storage_url
     71 
     72   def __repr__(self):
     73     return '%s' % self.expanded_storage_url
     74 
     75 
     76 class _NameExpansionIterator(object):
     77   """Class that iterates over all source URLs passed to the iterator.
     78 
     79   See details in __iter__ function doc.
     80   """
     81 
     82   def __init__(self, command_name, debug, logger, gsutil_api, url_strs,
     83                recursion_requested, all_versions=False,
     84                cmd_supports_recursion=True, project_id=None,
     85                continue_on_error=False):
     86     """Creates a NameExpansionIterator.
     87 
     88     Args:
     89       command_name: name of command being run.
     90       debug: Debug level to pass to underlying iterators (range 0..3).
     91       logger: logging.Logger object.
     92       gsutil_api: Cloud storage interface.  Settable for testing/mocking.
     93       url_strs: PluralityCheckableIterator of URL strings needing expansion.
     94       recursion_requested: True if -r specified on command-line.  If so,
     95           listings will be flattened so mapped-to results contain objects
     96           spanning subdirectories.
     97       all_versions: Bool indicating whether to iterate over all object versions.
     98       cmd_supports_recursion: Bool indicating whether this command supports a
     99           '-r' flag. Useful for printing helpful error messages.
    100       project_id: Project id to use for bucket retrieval.
    101       continue_on_error: If true, yield no-match exceptions encountered during
    102                          iteration instead of raising them.
    103 
    104     Examples of _NameExpansionIterator with recursion_requested=True:
    105       - Calling with one of the url_strs being 'gs://bucket' will enumerate all
    106         top-level objects, as will 'gs://bucket/' and 'gs://bucket/*'.
    107       - 'gs://bucket/**' will enumerate all objects in the bucket.
    108       - 'gs://bucket/abc' will enumerate either the single object abc or, if
    109          abc is a subdirectory, all objects under abc and any of its
    110          subdirectories.
    111       - 'gs://bucket/abc/**' will enumerate all objects under abc or any of its
    112         subdirectories.
    113       - 'file:///tmp' will enumerate all files under /tmp, as will
    114         'file:///tmp/*'
    115       - 'file:///tmp/**' will enumerate all files under /tmp or any of its
    116         subdirectories.
    117 
    118     Example if recursion_requested=False:
    119       calling with gs://bucket/abc/* lists matching objects
    120       or subdirs, but not sub-subdirs or objects beneath subdirs.
    121 
    122     Note: In step-by-step comments below we give examples assuming there's a
    123     gs://bucket with object paths:
    124       abcd/o1.txt
    125       abcd/o2.txt
    126       xyz/o1.txt
    127       xyz/o2.txt
    128     and a directory file://dir with file paths:
    129       dir/a.txt
    130       dir/b.txt
    131       dir/c/
    132     """
    133     self.command_name = command_name
    134     self.debug = debug
    135     self.logger = logger
    136     self.gsutil_api = gsutil_api
    137     self.url_strs = url_strs
    138     self.recursion_requested = recursion_requested
    139     self.all_versions = all_versions
    140     # Check self.url_strs.HasPlurality() at start because its value can change
    141     # if url_strs is itself an iterator.
    142     self.url_strs.has_plurality = self.url_strs.HasPlurality()
    143     self.cmd_supports_recursion = cmd_supports_recursion
    144     self.project_id = project_id
    145     self.continue_on_error = continue_on_error
    146 
    147     # Map holding wildcard strings to use for flat vs subdir-by-subdir listings.
    148     # (A flat listing means show all objects expanded all the way down.)
    149     self._flatness_wildcard = {True: '**', False: '*'}
    150 
    151   def __iter__(self):
    152     """Iterates over all source URLs passed to the iterator.
    153 
    154     For each src url, expands wildcards, object-less bucket names,
    155     subdir bucket names, and directory names, and generates a flat listing of
    156     all the matching objects/files.
    157 
    158     You should instantiate this object using the static factory function
    159     NameExpansionIterator, because consumers of this iterator need the
    160     PluralityCheckableIterator wrapper built by that function.
    161 
    162     Yields:
    163       gslib.name_expansion.NameExpansionResult.
    164 
    165     Raises:
    166       CommandException: if errors encountered.
    167     """
    168     for url_str in self.url_strs:
    169       storage_url = StorageUrlFromString(url_str)
    170 
    171       if storage_url.IsFileUrl() and storage_url.IsStream():
    172         if self.url_strs.has_plurality:
    173           raise CommandException('Multiple URL strings are not supported '
    174                                  'with streaming ("-") URLs.')
    175         yield NameExpansionResult(storage_url, False, False, storage_url)
    176         continue
    177 
    178       # Step 1: Expand any explicitly specified wildcards. The output from this
    179       # step is an iterator of BucketListingRef.
    180       # Starting with gs://buck*/abc* this step would expand to gs://bucket/abcd
    181 
    182       src_names_bucket = False
    183       if (storage_url.IsCloudUrl() and storage_url.IsBucket()
    184           and not self.recursion_requested):
    185         # UNIX commands like rm and cp will omit directory references.
    186         # If url_str refers only to buckets and we are not recursing,
    187         # then produce references of type BUCKET, because they are guaranteed
    188         # to pass through Step 2 and be omitted in Step 3.
    189         post_step1_iter = PluralityCheckableIterator(
    190             self.WildcardIterator(url_str).IterBuckets(
    191                 bucket_fields=['id']))
    192       else:
    193         # Get a list of objects and prefixes, expanding the top level for
    194         # any listed buckets.  If our source is a bucket, however, we need
    195         # to treat all of the top level expansions as names_container=True.
    196         post_step1_iter = PluralityCheckableIterator(
    197             self.WildcardIterator(url_str).IterAll(
    198                 bucket_listing_fields=['name'],
    199                 expand_top_level_buckets=True))
    200         if storage_url.IsCloudUrl() and storage_url.IsBucket():
    201           src_names_bucket = True
    202 
    203       # Step 2: Expand bucket subdirs. The output from this
    204       # step is an iterator of (names_container, BucketListingRef).
    205       # Starting with gs://bucket/abcd this step would expand to:
    206       #   iter([(True, abcd/o1.txt), (True, abcd/o2.txt)]).
    207       subdir_exp_wildcard = self._flatness_wildcard[self.recursion_requested]
    208       if self.recursion_requested:
    209         post_step2_iter = _ImplicitBucketSubdirIterator(
    210             self, post_step1_iter, subdir_exp_wildcard)
    211       else:
    212         post_step2_iter = _NonContainerTuplifyIterator(post_step1_iter)
    213       post_step2_iter = PluralityCheckableIterator(post_step2_iter)
    214 
    215       # Because we actually perform and check object listings here, this will
    216       # raise if url_args includes a non-existent object.  However,
    217       # plurality_checkable_iterator will buffer the exception for us, not
    218       # raising it until the iterator is actually asked to yield the first
    219       # result.
    220       if post_step2_iter.IsEmpty():
    221         if self.continue_on_error:
    222           try:
    223             raise CommandException('No URLs matched: %s' % url_str)
    224           except CommandException, e:
    225             # Yield a specialized tuple of (exception, stack_trace) to
    226             # the wrapping PluralityCheckableIterator.
    227             yield (e, sys.exc_info()[2])
    228         else:
    229           raise CommandException('No URLs matched: %s' % url_str)
    230 
    231       # Step 3. Omit any directories, buckets, or bucket subdirectories for
    232       # non-recursive expansions.
    233       post_step3_iter = PluralityCheckableIterator(_OmitNonRecursiveIterator(
    234           post_step2_iter, self.recursion_requested, self.command_name,
    235           self.cmd_supports_recursion, self.logger))
    236 
    237       src_url_expands_to_multi = post_step3_iter.HasPlurality()
    238       is_multi_source_request = (self.url_strs.has_plurality
    239                                  or src_url_expands_to_multi)
    240 
    241       # Step 4. Expand directories and buckets. This step yields the iterated
    242       # values. Starting with gs://bucket this step would expand to:
    243       #  [abcd/o1.txt, abcd/o2.txt, xyz/o1.txt, xyz/o2.txt]
    244       # Starting with file://dir this step would expand to:
    245       #  [dir/a.txt, dir/b.txt, dir/c/]
    246       for (names_container, blr) in post_step3_iter:
    247         src_names_container = src_names_bucket or names_container
    248 
    249         if blr.IsObject():
    250           yield NameExpansionResult(
    251               storage_url, is_multi_source_request, src_names_container,
    252               blr.storage_url)
    253         else:
    254           # Use implicit wildcarding to do the enumeration.
    255           # At this point we are guaranteed that:
    256           # - Recursion has been requested because non-object entries are
    257           #   filtered in step 3 otherwise.
    258           # - This is a prefix or bucket subdirectory because only
    259           #   non-recursive iterations product bucket references.
    260           expanded_url = StorageUrlFromString(blr.url_string)
    261           if expanded_url.IsFileUrl():
    262             # Convert dir to implicit recursive wildcard.
    263             url_to_iterate = '%s%s%s' % (blr, os.sep, subdir_exp_wildcard)
    264           else:
    265             # Convert subdir to implicit recursive wildcard.
    266             url_to_iterate = expanded_url.CreatePrefixUrl(
    267                 wildcard_suffix=subdir_exp_wildcard)
    268 
    269           wc_iter = PluralityCheckableIterator(
    270               self.WildcardIterator(url_to_iterate).IterObjects(
    271                   bucket_listing_fields=['name']))
    272           src_url_expands_to_multi = (src_url_expands_to_multi
    273                                       or wc_iter.HasPlurality())
    274           is_multi_source_request = (self.url_strs.has_plurality
    275                                      or src_url_expands_to_multi)
    276           # This will be a flattened listing of all underlying objects in the
    277           # subdir.
    278           for blr in wc_iter:
    279             yield NameExpansionResult(
    280                 storage_url, is_multi_source_request, True, blr.storage_url)
    281 
    282   def WildcardIterator(self, url_string):
    283     """Helper to instantiate gslib.WildcardIterator.
    284 
    285     Args are same as gslib.WildcardIterator interface, but this method fills
    286     in most of the values from instance state.
    287 
    288     Args:
    289       url_string: URL string naming wildcard objects to iterate.
    290 
    291     Returns:
    292       Wildcard iterator over URL string.
    293     """
    294     return gslib.wildcard_iterator.CreateWildcardIterator(
    295         url_string, self.gsutil_api, debug=self.debug,
    296         all_versions=self.all_versions,
    297         project_id=self.project_id)
    298 
    299 
    300 def NameExpansionIterator(command_name, debug, logger, gsutil_api, url_strs,
    301                           recursion_requested, all_versions=False,
    302                           cmd_supports_recursion=True, project_id=None,
    303                           continue_on_error=False):
    304   """Static factory function for instantiating _NameExpansionIterator.
    305 
    306   This wraps the resulting iterator in a PluralityCheckableIterator and checks
    307   that it is non-empty. Also, allows url_strs to be either an array or an
    308   iterator.
    309 
    310   Args:
    311     command_name: name of command being run.
    312     debug: Debug level to pass to underlying iterators (range 0..3).
    313     logger: logging.Logger object.
    314     gsutil_api: Cloud storage interface.  Settable for testing/mocking.
    315     url_strs: Iterable URL strings needing expansion.
    316     recursion_requested: True if -r specified on command-line.  If so,
    317         listings will be flattened so mapped-to results contain objects
    318         spanning subdirectories.
    319     all_versions: Bool indicating whether to iterate over all object versions.
    320     cmd_supports_recursion: Bool indicating whether this command supports a '-r'
    321         flag. Useful for printing helpful error messages.
    322     project_id: Project id to use for the current command.
    323     continue_on_error: If true, yield no-match exceptions encountered during
    324                        iteration instead of raising them.
    325 
    326   Raises:
    327     CommandException if underlying iterator is empty.
    328 
    329   Returns:
    330     Name expansion iterator instance.
    331 
    332   For example semantics, see comments in NameExpansionIterator.__init__.
    333   """
    334   url_strs = PluralityCheckableIterator(url_strs)
    335   name_expansion_iterator = _NameExpansionIterator(
    336       command_name, debug, logger, gsutil_api, url_strs, recursion_requested,
    337       all_versions=all_versions, cmd_supports_recursion=cmd_supports_recursion,
    338       project_id=project_id, continue_on_error=continue_on_error)
    339   name_expansion_iterator = PluralityCheckableIterator(name_expansion_iterator)
    340   if name_expansion_iterator.IsEmpty():
    341     raise CommandException('No URLs matched')
    342   return name_expansion_iterator
    343 
    344 
    345 class NameExpansionIteratorQueue(object):
    346   """Wrapper around NameExpansionIterator with Multiprocessing.Queue interface.
    347 
    348   Only a blocking get() function can be called, and the block and timeout
    349   params on that function are ignored. All other class functions raise
    350   NotImplementedError.
    351 
    352   This class is thread safe.
    353   """
    354 
    355   def __init__(self, name_expansion_iterator, final_value):
    356     self.name_expansion_iterator = name_expansion_iterator
    357     self.final_value = final_value
    358     self.lock = gslib.util.manager.Lock()
    359 
    360   def qsize(self):
    361     raise NotImplementedError(
    362         'NameExpansionIteratorQueue.qsize() not implemented')
    363 
    364   def empty(self):
    365     raise NotImplementedError(
    366         'NameExpansionIteratorQueue.empty() not implemented')
    367 
    368   def full(self):
    369     raise NotImplementedError(
    370         'NameExpansionIteratorQueue.full() not implemented')
    371 
    372   # pylint: disable=unused-argument
    373   def put(self, obj=None, block=None, timeout=None):
    374     raise NotImplementedError(
    375         'NameExpansionIteratorQueue.put() not implemented')
    376 
    377   def put_nowait(self, obj):
    378     raise NotImplementedError(
    379         'NameExpansionIteratorQueue.put_nowait() not implemented')
    380 
    381   # pylint: disable=unused-argument
    382   def get(self, block=None, timeout=None):
    383     self.lock.acquire()
    384     try:
    385       if self.name_expansion_iterator.IsEmpty():
    386         return self.final_value
    387       return self.name_expansion_iterator.next()
    388     finally:
    389       self.lock.release()
    390 
    391   def get_nowait(self):
    392     raise NotImplementedError(
    393         'NameExpansionIteratorQueue.get_nowait() not implemented')
    394 
    395   def get_no_wait(self):
    396     raise NotImplementedError(
    397         'NameExpansionIteratorQueue.get_no_wait() not implemented')
    398 
    399   def close(self):
    400     raise NotImplementedError(
    401         'NameExpansionIteratorQueue.close() not implemented')
    402 
    403   def join_thread(self):
    404     raise NotImplementedError(
    405         'NameExpansionIteratorQueue.join_thread() not implemented')
    406 
    407   def cancel_join_thread(self):
    408     raise NotImplementedError(
    409         'NameExpansionIteratorQueue.cancel_join_thread() not implemented')
    410 
    411 
    412 class _NonContainerTuplifyIterator(object):
    413   """Iterator that produces the tuple (False, blr) for each iterated value.
    414 
    415   Used for cases where blr_iter iterates over a set of
    416   BucketListingRefs known not to name containers.
    417   """
    418 
    419   def __init__(self, blr_iter):
    420     """Instantiates iterator.
    421 
    422     Args:
    423       blr_iter: iterator of BucketListingRef.
    424     """
    425     self.blr_iter = blr_iter
    426 
    427   def __iter__(self):
    428     for blr in self.blr_iter:
    429       yield (False, blr)
    430 
    431 
    432 class _OmitNonRecursiveIterator(object):
    433   """Iterator wrapper for that omits certain values for non-recursive requests.
    434 
    435   This iterates over tuples of (names_container, BucketListingReference) and
    436   omits directories, prefixes, and buckets from non-recurisve requests
    437   so that we can properly calculate whether the source URL expands to multiple
    438   URLs.
    439 
    440   For example, if we have a bucket containing two objects: bucket/foo and
    441   bucket/foo/bar and we do a non-recursive iteration, only bucket/foo will be
    442   yielded.
    443   """
    444 
    445   def __init__(self, tuple_iter, recursion_requested, command_name,
    446                cmd_supports_recursion, logger):
    447     """Instanties the iterator.
    448 
    449     Args:
    450       tuple_iter: Iterator over names_container, BucketListingReference
    451                   from step 2 in the NameExpansionIterator
    452       recursion_requested: If false, omit buckets, dirs, and subdirs
    453       command_name: Command name for user messages
    454       cmd_supports_recursion: Command recursion support for user messages
    455       logger: Log object for user messages
    456     """
    457     self.tuple_iter = tuple_iter
    458     self.recursion_requested = recursion_requested
    459     self.command_name = command_name
    460     self.cmd_supports_recursion = cmd_supports_recursion
    461     self.logger = logger
    462 
    463   def __iter__(self):
    464     for (names_container, blr) in self.tuple_iter:
    465       if not self.recursion_requested and not blr.IsObject():
    466         # At this point we either have a bucket or a prefix,
    467         # so if recursion is not requested, we're going to omit it.
    468         expanded_url = StorageUrlFromString(blr.url_string)
    469         if expanded_url.IsFileUrl():
    470           desc = 'directory'
    471         else:
    472           desc = blr.type_name
    473         if self.cmd_supports_recursion:
    474           self.logger.info(
    475               'Omitting %s "%s". (Did you mean to do %s -r?)',
    476               desc, blr.url_string, self.command_name)
    477         else:
    478           self.logger.info('Omitting %s "%s".', desc, blr.url_string)
    479       else:
    480         yield (names_container, blr)
    481 
    482 
    483 class _ImplicitBucketSubdirIterator(object):
    484   """Iterator wrapper that performs implicit bucket subdir expansion.
    485 
    486   Each iteration yields tuple (names_container, expanded BucketListingRefs)
    487     where names_container is true if URL names a directory, bucket,
    488     or bucket subdir.
    489 
    490   For example, iterating over [BucketListingRef("gs://abc")] would expand to:
    491     [BucketListingRef("gs://abc/o1"), BucketListingRef("gs://abc/o2")]
    492   if those subdir objects exist, and [BucketListingRef("gs://abc") otherwise.
    493   """
    494 
    495   def __init__(self, name_exp_instance, blr_iter, subdir_exp_wildcard):
    496     """Instantiates the iterator.
    497 
    498     Args:
    499       name_exp_instance: calling instance of NameExpansion class.
    500       blr_iter: iterator over BucketListingRef prefixes and objects.
    501       subdir_exp_wildcard: wildcard for expanding subdirectories;
    502           expected values are ** if the mapped-to results should contain
    503           objects spanning subdirectories, or * if only one level should
    504           be listed.
    505     """
    506     self.blr_iter = blr_iter
    507     self.name_exp_instance = name_exp_instance
    508     self.subdir_exp_wildcard = subdir_exp_wildcard
    509 
    510   def __iter__(self):
    511     for blr in self.blr_iter:
    512       if blr.IsPrefix():
    513         # This is a bucket subdirectory, list objects according to the wildcard.
    514         prefix_url = StorageUrlFromString(blr.url_string).CreatePrefixUrl(
    515             wildcard_suffix=self.subdir_exp_wildcard)
    516         implicit_subdir_iterator = PluralityCheckableIterator(
    517             self.name_exp_instance.WildcardIterator(
    518                 prefix_url).IterAll(bucket_listing_fields=['name']))
    519         if not implicit_subdir_iterator.IsEmpty():
    520           for exp_blr in implicit_subdir_iterator:
    521             yield (True, exp_blr)
    522         else:
    523           # Prefix that contains no objects, for example in the $folder$ case
    524           # or an empty filesystem directory.
    525           yield (False, blr)
    526       elif blr.IsObject():
    527         yield (False, blr)
    528       else:
    529         raise CommandException(
    530             '_ImplicitBucketSubdirIterator got a bucket reference %s' % blr)
    531