Home | History | Annotate | Download | only in mapreduce
      1 #!/usr/bin/env python
      2 # Copyright 2010 Google Inc. All Rights Reserved.
      3 #
      4 # Licensed under the Apache License, Version 2.0 (the "License");
      5 # you may not use this file except in compliance with the License.
      6 # You may obtain a copy of the License at
      7 #
      8 #     http://www.apache.org/licenses/LICENSE-2.0
      9 #
     10 # Unless required by applicable law or agreed to in writing, software
     11 # distributed under the License is distributed on an "AS IS" BASIS,
     12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     13 # See the License for the specific language governing permissions and
     14 # limitations under the License.
     15 
     16 """Defines input readers for MapReduce."""
     17 
     18 
     19 
     20 __all__ = [
     21     "AbstractDatastoreInputReader",
     22     "ALLOW_CHECKPOINT",
     23     "BadReaderParamsError",
     24     "BlobstoreLineInputReader",
     25     "BlobstoreZipInputReader",
     26     "BlobstoreZipLineInputReader",
     27     "COUNTER_IO_READ_BYTES",
     28     "COUNTER_IO_READ_MSEC",
     29     "DatastoreEntityInputReader",
     30     "DatastoreInputReader",
     31     "DatastoreKeyInputReader",
     32     "GoogleCloudStorageInputReader",
     33     "GoogleCloudStorageRecordInputReader",
     34     "RandomStringInputReader",
     35     "RawDatastoreInputReader",
     36     "Error",
     37     "InputReader",
     38     "LogInputReader",
     39     "NamespaceInputReader",
     40     ]
     41 
     42 # pylint: disable=g-bad-name
     43 # pylint: disable=protected-access
     44 
     45 import base64
     46 import copy
     47 import logging
     48 import pickle
     49 import random
     50 import string
     51 import StringIO
     52 import time
     53 import zipfile
     54 
     55 from google.net.proto import ProtocolBuffer
     56 from google.appengine.ext import ndb
     57 
     58 from google.appengine.api import datastore
     59 from google.appengine.api import logservice
     60 from google.appengine.api.logservice import log_service_pb
     61 from google.appengine.ext import blobstore
     62 from google.appengine.ext import db
     63 from google.appengine.ext import key_range
     64 from google.appengine.ext.db import metadata
     65 from mapreduce import context
     66 from mapreduce import datastore_range_iterators as db_iters
     67 from mapreduce import errors
     68 from mapreduce import json_util
     69 from mapreduce import key_ranges
     70 from mapreduce import kv_pb
     71 from mapreduce import model
     72 from mapreduce import namespace_range
     73 from mapreduce import operation
     74 from mapreduce import property_range
     75 from mapreduce import records
     76 from mapreduce import util
     77 
     78 # pylint: disable=g-import-not-at-top
     79 # TODO(user): Cleanup imports if/when cloudstorage becomes part of runtime.
     80 try:
     81   # Check if the full cloudstorage package exists. The stub part is in runtime.
     82   cloudstorage = None
     83   import cloudstorage
     84   if hasattr(cloudstorage, "_STUB"):
     85     cloudstorage = None
     86 except ImportError:
     87   pass  # CloudStorage library not available
     88 
     89 # Attempt to load cloudstorage from the bundle (availble in some tests)
     90 if cloudstorage is None:
     91   try:
     92     import cloudstorage
     93   except ImportError:
     94     pass  # CloudStorage library really not available
     95 
     96 
     97 # Classes moved to errors module. Copied here for compatibility.
     98 Error = errors.Error
     99 BadReaderParamsError = errors.BadReaderParamsError
    100 
    101 
    102 # Counter name for number of bytes read.
    103 COUNTER_IO_READ_BYTES = "io-read-bytes"
    104 
    105 # Counter name for milliseconds spent reading data.
    106 COUNTER_IO_READ_MSEC = "io-read-msec"
    107 
    108 # Special value that can be yielded by InputReaders if they want to give the
    109 # framework an opportunity to save the state of the mapreduce without having
    110 # to yield an actual value to the handler.
    111 ALLOW_CHECKPOINT = object()
    112 
    113 
    114 class InputReader(json_util.JsonMixin):
    115   """Abstract base class for input readers.
    116 
    117   InputReaders have the following properties:
    118    * They are created by using the split_input method to generate a set of
    119      InputReaders from a MapperSpec.
    120    * They generate inputs to the mapper via the iterator interface.
    121    * After creation, they can be serialized and resumed using the JsonMixin
    122      interface.
    123    * They are cast to string for a user-readable description; it may be
    124      valuable to implement __str__.
    125   """
    126 
    127   # When expand_parameters is False, then value yielded by reader is passed
    128   # to handler as is. If it's true, then *value is passed, expanding arguments
    129   # and letting handler be a multi-parameter function.
    130   expand_parameters = False
    131 
    132   # Mapreduce parameters.
    133   _APP_PARAM = "_app"
    134   NAMESPACE_PARAM = "namespace"
    135   NAMESPACES_PARAM = "namespaces"  # Obsolete.
    136 
    137   def __iter__(self):
    138     return self
    139 
    140   def next(self):
    141     """Returns the next input from this input reader as a key, value pair.
    142 
    143     Returns:
    144       The next input from this input reader.
    145     """
    146     raise NotImplementedError("next() not implemented in %s" % self.__class__)
    147 
    148   @classmethod
    149   def from_json(cls, input_shard_state):
    150     """Creates an instance of the InputReader for the given input shard state.
    151 
    152     Args:
    153       input_shard_state: The InputReader state as a dict-like object.
    154 
    155     Returns:
    156       An instance of the InputReader configured using the values of json.
    157     """
    158     raise NotImplementedError("from_json() not implemented in %s" % cls)
    159 
    160   def to_json(self):
    161     """Returns an input shard state for the remaining inputs.
    162 
    163     Returns:
    164       A json-izable version of the remaining InputReader.
    165     """
    166     raise NotImplementedError("to_json() not implemented in %s" %
    167                               self.__class__)
    168 
    169   @classmethod
    170   def split_input(cls, mapper_spec):
    171     """Returns a list of input readers.
    172 
    173     This method creates a list of input readers, each for one shard.
    174     It attempts to split inputs among readers evenly.
    175 
    176     Args:
    177       mapper_spec: model.MapperSpec specifies the inputs and additional
    178         parameters to define the behavior of input readers.
    179 
    180     Returns:
    181       A list of InputReaders. None or [] when no input data can be found.
    182     """
    183     raise NotImplementedError("split_input() not implemented in %s" % cls)
    184 
    185   @classmethod
    186   def validate(cls, mapper_spec):
    187     """Validates mapper spec and all mapper parameters.
    188 
    189     Input reader parameters are expected to be passed as "input_reader"
    190     subdictionary in mapper_spec.params.
    191 
    192     Pre 1.6.4 API mixes input reader parameters with all other parameters. Thus
    193     to be compatible, input reader check mapper_spec.params as well and
    194     issue a warning if "input_reader" subdicationary is not present.
    195 
    196     Args:
    197       mapper_spec: The MapperSpec for this InputReader.
    198 
    199     Raises:
    200       BadReaderParamsError: required parameters are missing or invalid.
    201     """
    202     if mapper_spec.input_reader_class() != cls:
    203       raise BadReaderParamsError("Input reader class mismatch")
    204 
    205 
    206 def _get_params(mapper_spec, allowed_keys=None, allow_old=True):
    207   """Obtain input reader parameters.
    208 
    209   Utility function for input readers implementation. Fetches parameters
    210   from mapreduce specification giving appropriate usage warnings.
    211 
    212   Args:
    213     mapper_spec: The MapperSpec for the job
    214     allowed_keys: set of all allowed keys in parameters as strings. If it is not
    215       None, then parameters are expected to be in a separate "input_reader"
    216       subdictionary of mapper_spec parameters.
    217     allow_old: Allow parameters to exist outside of the input_reader
    218       subdictionary for compatability.
    219 
    220   Returns:
    221     mapper parameters as dict
    222 
    223   Raises:
    224     BadReaderParamsError: if parameters are invalid/missing or not allowed.
    225   """
    226   if "input_reader" not in mapper_spec.params:
    227     message = ("Input reader's parameters should be specified in "
    228                "input_reader subdictionary.")
    229     if not allow_old or allowed_keys:
    230       raise errors.BadReaderParamsError(message)
    231     params = mapper_spec.params
    232     params = dict((str(n), v) for n, v in params.iteritems())
    233   else:
    234     if not isinstance(mapper_spec.params.get("input_reader"), dict):
    235       raise errors.BadReaderParamsError(
    236           "Input reader parameters should be a dictionary")
    237     params = mapper_spec.params.get("input_reader")
    238     params = dict((str(n), v) for n, v in params.iteritems())
    239     if allowed_keys:
    240       params_diff = set(params.keys()) - allowed_keys
    241       if params_diff:
    242         raise errors.BadReaderParamsError(
    243             "Invalid input_reader parameters: %s" % ",".join(params_diff))
    244   return params
    245 
    246 
    247 class AbstractDatastoreInputReader(InputReader):
    248   """Abstract class for datastore input readers."""
    249 
    250   # Number of entities to fetch at once while doing scanning.
    251   _BATCH_SIZE = 50
    252 
    253   # Maximum number of shards we'll create.
    254   _MAX_SHARD_COUNT = 256
    255 
    256   # Factor for additional ranges to split when using inequality filters.
    257   _OVERSPLIT_FACTOR = 1
    258 
    259   # The maximum number of namespaces that will be sharded by datastore key
    260   # before switching to a strategy where sharding is done lexographically by
    261   # namespace.
    262   MAX_NAMESPACES_FOR_KEY_SHARD = 10
    263 
    264   # reader parameters.
    265   ENTITY_KIND_PARAM = "entity_kind"
    266   KEYS_ONLY_PARAM = "keys_only"
    267   BATCH_SIZE_PARAM = "batch_size"
    268   KEY_RANGE_PARAM = "key_range"
    269   FILTERS_PARAM = "filters"
    270   OVERSPLIT_FACTOR_PARAM = "oversplit_factor"
    271 
    272   _KEY_RANGE_ITER_CLS = db_iters.AbstractKeyRangeIterator
    273 
    274   def __init__(self, iterator):
    275     """Create new DatastoreInputReader object.
    276 
    277     This is internal constructor. Use split_input to create readers instead.
    278 
    279     Args:
    280       iterator: an iterator that generates objects for this input reader.
    281     """
    282     self._iter = iterator
    283 
    284   def __iter__(self):
    285     """Yields whatever internal iterator yields."""
    286     for o in self._iter:
    287       yield o
    288 
    289   def __str__(self):
    290     """Returns the string representation of this InputReader."""
    291     return repr(self._iter)
    292 
    293   def to_json(self):
    294     """Serializes input reader to json compatible format.
    295 
    296     Returns:
    297       all the data in json-compatible map.
    298     """
    299     return self._iter.to_json()
    300 
    301   @classmethod
    302   def from_json(cls, json):
    303     """Create new DatastoreInputReader from json, encoded by to_json.
    304 
    305     Args:
    306       json: json representation of DatastoreInputReader.
    307 
    308     Returns:
    309       an instance of DatastoreInputReader with all data deserialized from json.
    310     """
    311     return cls(db_iters.RangeIteratorFactory.from_json(json))
    312 
    313   @classmethod
    314   def _get_query_spec(cls, mapper_spec):
    315     """Construct a model.QuerySpec from model.MapperSpec."""
    316     params = _get_params(mapper_spec)
    317     entity_kind = params[cls.ENTITY_KIND_PARAM]
    318     filters = params.get(cls.FILTERS_PARAM)
    319     app = params.get(cls._APP_PARAM)
    320     ns = params.get(cls.NAMESPACE_PARAM)
    321 
    322     return model.QuerySpec(
    323         entity_kind=cls._get_raw_entity_kind(entity_kind),
    324         keys_only=bool(params.get(cls.KEYS_ONLY_PARAM, False)),
    325         filters=filters,
    326         batch_size=int(params.get(cls.BATCH_SIZE_PARAM, cls._BATCH_SIZE)),
    327         oversplit_factor=int(params.get(cls.OVERSPLIT_FACTOR_PARAM,
    328                                         cls._OVERSPLIT_FACTOR)),
    329         model_class_path=entity_kind,
    330         app=app,
    331         ns=ns)
    332 
    333   @classmethod
    334   def split_input(cls, mapper_spec):
    335     """Inherit doc."""
    336     shard_count = mapper_spec.shard_count
    337     query_spec = cls._get_query_spec(mapper_spec)
    338 
    339     namespaces = None
    340     if query_spec.ns is not None:
    341       k_ranges = cls._to_key_ranges_by_shard(
    342           query_spec.app, [query_spec.ns], shard_count, query_spec)
    343     else:
    344       ns_keys = namespace_range.get_namespace_keys(
    345           query_spec.app, cls.MAX_NAMESPACES_FOR_KEY_SHARD+1)
    346       # No namespace means the app may have some data but those data are not
    347       # visible yet. Just return.
    348       if not ns_keys:
    349         return
    350       # If the number of ns is small, we shard each ns by key and assign each
    351       # shard a piece of a ns.
    352       elif len(ns_keys) <= cls.MAX_NAMESPACES_FOR_KEY_SHARD:
    353         namespaces = [ns_key.name() or "" for ns_key in ns_keys]
    354         k_ranges = cls._to_key_ranges_by_shard(
    355             query_spec.app, namespaces, shard_count, query_spec)
    356       # When number of ns is large, we can only split lexicographically by ns.
    357       else:
    358         ns_ranges = namespace_range.NamespaceRange.split(n=shard_count,
    359                                                          contiguous=False,
    360                                                          can_query=lambda: True,
    361                                                          _app=query_spec.app)
    362         k_ranges = [key_ranges.KeyRangesFactory.create_from_ns_range(ns_range)
    363                     for ns_range in ns_ranges]
    364 
    365     iters = [db_iters.RangeIteratorFactory.create_key_ranges_iterator(
    366         r, query_spec, cls._KEY_RANGE_ITER_CLS) for r in k_ranges]
    367 
    368     return [cls(i) for i in iters]
    369 
    370   @classmethod
    371   def _to_key_ranges_by_shard(cls, app, namespaces, shard_count, query_spec):
    372     """Get a list of key_ranges.KeyRanges objects, one for each shard.
    373 
    374     This method uses scatter index to split each namespace into pieces
    375     and assign those pieces to shards.
    376 
    377     Args:
    378       app: app_id in str.
    379       namespaces: a list of namespaces in str.
    380       shard_count: number of shards to split.
    381       query_spec: model.QuerySpec.
    382 
    383     Returns:
    384       a list of key_ranges.KeyRanges objects.
    385     """
    386     key_ranges_by_ns = []
    387     # Split each ns into n splits. If a ns doesn't have enough scatter to
    388     # split into n, the last few splits are None.
    389     for namespace in namespaces:
    390       ranges = cls._split_ns_by_scatter(
    391           shard_count,
    392           namespace,
    393           query_spec.entity_kind,
    394           query_spec.filters,
    395           app)
    396       # The nth split of each ns will be assigned to the nth shard.
    397       # Shuffle so that None are not all by the end.
    398       random.shuffle(ranges)
    399       key_ranges_by_ns.append(ranges)
    400 
    401     # KeyRanges from different namespaces might be very different in size.
    402     # Use round robin to make sure each shard can have at most one split
    403     # or a None from a ns.
    404     ranges_by_shard = [[] for _ in range(shard_count)]
    405     for ranges in key_ranges_by_ns:
    406       for i, k_range in enumerate(ranges):
    407         if k_range:
    408           ranges_by_shard[i].append(k_range)
    409 
    410     key_ranges_by_shard = []
    411     for ranges in ranges_by_shard:
    412       if ranges:
    413         key_ranges_by_shard.append(key_ranges.KeyRangesFactory.create_from_list(
    414             ranges))
    415     return key_ranges_by_shard
    416 
    417   @classmethod
    418   def _split_ns_by_scatter(cls,
    419                            shard_count,
    420                            namespace,
    421                            raw_entity_kind,
    422                            filters,
    423                            app):
    424     """Split a namespace by scatter index into key_range.KeyRange.
    425 
    426     TODO(user): Power this with key_range.KeyRange.compute_split_points.
    427 
    428     Args:
    429       shard_count: number of shards.
    430       namespace: namespace name to split. str.
    431       raw_entity_kind: low level datastore API entity kind.
    432       app: app id in str.
    433 
    434     Returns:
    435       A list of key_range.KeyRange objects. If there are not enough entities to
    436     splits into requested shards, the returned list will contain KeyRanges
    437     ordered lexicographically with any Nones appearing at the end.
    438     """
    439     if shard_count == 1:
    440       # With one shard we don't need to calculate any split points at all.
    441       return [key_range.KeyRange(namespace=namespace, _app=app)]
    442 
    443     ds_query = datastore.Query(kind=raw_entity_kind,
    444                                namespace=namespace,
    445                                _app=app,
    446                                keys_only=True)
    447     ds_query.Order("__scatter__")
    448     oversampling_factor = 32
    449     random_keys = None
    450     if filters:
    451       ds_query_with_filters = copy.copy(ds_query)
    452       for (key, op, value) in filters:
    453         ds_query_with_filters.update({'%s %s' % (key, op): value})
    454         try:
    455           random_keys = ds_query_with_filters.Get(shard_count *
    456                                                   oversampling_factor)
    457         except db.NeedIndexError, why:
    458           logging.warning('Need to add an index for optimal mapreduce-input'
    459                           ' splitting:\n%s' % why)
    460           # We'll try again without the filter.  We hope the filter
    461           # will filter keys uniformly across the key-name space!
    462 
    463     if not random_keys:
    464       random_keys = ds_query.Get(shard_count * oversampling_factor)
    465 
    466     if not random_keys:
    467       # There are no entities with scatter property. We have no idea
    468       # how to split.
    469       return ([key_range.KeyRange(namespace=namespace, _app=app)] +
    470               [None] * (shard_count - 1))
    471 
    472     random_keys.sort()
    473 
    474     if len(random_keys) >= shard_count:
    475       # We've got a lot of scatter values. Sample them down.
    476       random_keys = cls._choose_split_points(random_keys, shard_count)
    477 
    478     k_ranges = []
    479 
    480     k_ranges.append(key_range.KeyRange(
    481         key_start=None,
    482         key_end=random_keys[0],
    483         direction=key_range.KeyRange.ASC,
    484         include_start=False,
    485         include_end=False,
    486         namespace=namespace,
    487         _app=app))
    488 
    489     for i in range(0, len(random_keys) - 1):
    490       k_ranges.append(key_range.KeyRange(
    491           key_start=random_keys[i],
    492           key_end=random_keys[i+1],
    493           direction=key_range.KeyRange.ASC,
    494           include_start=True,
    495           include_end=False,
    496           namespace=namespace,
    497           _app=app))
    498 
    499     k_ranges.append(key_range.KeyRange(
    500         key_start=random_keys[-1],
    501         key_end=None,
    502         direction=key_range.KeyRange.ASC,
    503         include_start=True,
    504         include_end=False,
    505         namespace=namespace,
    506         _app=app))
    507 
    508     if len(k_ranges) < shard_count:
    509       # We need to have as many shards as it was requested. Add some Nones.
    510       k_ranges += [None] * (shard_count - len(k_ranges))
    511     return k_ranges
    512 
    513   @classmethod
    514   def _choose_split_points(cls, sorted_keys, shard_count):
    515     """Returns the best split points given a random set of datastore.Keys."""
    516     assert len(sorted_keys) >= shard_count
    517     index_stride = len(sorted_keys) / float(shard_count)
    518     return [sorted_keys[int(round(index_stride * i))]
    519             for i in range(1, shard_count)]
    520 
    521   @classmethod
    522   def validate(cls, mapper_spec):
    523     """Inherit docs."""
    524     params = _get_params(mapper_spec)
    525     if cls.ENTITY_KIND_PARAM not in params:
    526       raise BadReaderParamsError("Missing input reader parameter 'entity_kind'")
    527     if cls.BATCH_SIZE_PARAM in params:
    528       try:
    529         batch_size = int(params[cls.BATCH_SIZE_PARAM])
    530         if batch_size < 1:
    531           raise BadReaderParamsError("Bad batch size: %s" % batch_size)
    532       except ValueError, e:
    533         raise BadReaderParamsError("Bad batch size: %s" % e)
    534     if cls.OVERSPLIT_FACTOR_PARAM in params:
    535       try:
    536         oversplit_factor = int(params[cls.OVERSPLIT_FACTOR_PARAM])
    537         if oversplit_factor < 1:
    538           raise BadReaderParamsError("Bad oversplit factor:"
    539                                      " %s" % oversplit_factor)
    540       except ValueError, e:
    541         raise BadReaderParamsError("Bad oversplit factor: %s" % e)
    542     try:
    543       bool(params.get(cls.KEYS_ONLY_PARAM, False))
    544     except:
    545       raise BadReaderParamsError("keys_only expects a boolean value but got %s",
    546                                  params[cls.KEYS_ONLY_PARAM])
    547     if cls.NAMESPACE_PARAM in params:
    548       if not isinstance(params[cls.NAMESPACE_PARAM],
    549                         (str, unicode, type(None))):
    550         raise BadReaderParamsError(
    551             "Expected a single namespace string")
    552     if cls.NAMESPACES_PARAM in params:
    553       raise BadReaderParamsError("Multiple namespaces are no longer supported")
    554     if cls.FILTERS_PARAM in params:
    555       filters = params[cls.FILTERS_PARAM]
    556       if not isinstance(filters, list):
    557         raise BadReaderParamsError("Expected list for filters parameter")
    558       for f in filters:
    559         if not isinstance(f, (tuple, list)):
    560           raise BadReaderParamsError("Filter should be a tuple or list: %s", f)
    561         if len(f) != 3:
    562           raise BadReaderParamsError("Filter should be a 3-tuple: %s", f)
    563         prop, op, _ = f
    564         if not isinstance(prop, basestring):
    565           raise BadReaderParamsError("Property should be string: %s", prop)
    566         if not isinstance(op, basestring):
    567           raise BadReaderParamsError("Operator should be string: %s", op)
    568 
    569   @classmethod
    570   def _get_raw_entity_kind(cls, entity_kind_or_model_classpath):
    571     """Returns the entity kind to use with low level datastore calls.
    572 
    573     Args:
    574       entity_kind_or_model_classpath: user specified entity kind or model
    575         classpath.
    576 
    577     Returns:
    578       the entity kind in str to use with low level datastore calls.
    579     """
    580     return entity_kind_or_model_classpath
    581 
    582 
    583 class RawDatastoreInputReader(AbstractDatastoreInputReader):
    584   """Iterates over an entity kind and yields datastore.Entity."""
    585 
    586   _KEY_RANGE_ITER_CLS = db_iters.KeyRangeEntityIterator
    587 
    588   @classmethod
    589   def validate(cls, mapper_spec):
    590     """Inherit docs."""
    591     super(RawDatastoreInputReader, cls).validate(mapper_spec)
    592     params = _get_params(mapper_spec)
    593     entity_kind = params[cls.ENTITY_KIND_PARAM]
    594     if "." in entity_kind:
    595       logging.warning(
    596           ". detected in entity kind %s specified for reader %s."
    597           "Assuming entity kind contains the dot.",
    598           entity_kind, cls.__name__)
    599     if cls.FILTERS_PARAM in params:
    600       filters = params[cls.FILTERS_PARAM]
    601       for f in filters:
    602         if f[1] != "=":
    603           raise BadReaderParamsError(
    604               "Only equality filters are supported: %s", f)
    605 
    606 
    607 class DatastoreInputReader(AbstractDatastoreInputReader):
    608   """Iterates over a Model and yields model instances.
    609 
    610   Supports both db.model and ndb.model.
    611   """
    612 
    613   _KEY_RANGE_ITER_CLS = db_iters.KeyRangeModelIterator
    614 
    615   @classmethod
    616   def _get_raw_entity_kind(cls, model_classpath):
    617     entity_type = util.for_name(model_classpath)
    618     if isinstance(entity_type, db.Model):
    619       return entity_type.kind()
    620     elif isinstance(entity_type, (ndb.Model, ndb.MetaModel)):
    621       # pylint: disable=protected-access
    622       return entity_type._get_kind()
    623     else:
    624       return util.get_short_name(model_classpath)
    625 
    626   @classmethod
    627   def validate(cls, mapper_spec):
    628     """Inherit docs."""
    629     super(DatastoreInputReader, cls).validate(mapper_spec)
    630     params = _get_params(mapper_spec)
    631     entity_kind = params[cls.ENTITY_KIND_PARAM]
    632     # Fail fast if Model cannot be located.
    633     try:
    634       model_class = util.for_name(entity_kind)
    635     except ImportError, e:
    636       raise BadReaderParamsError("Bad entity kind: %s" % e)
    637     if cls.FILTERS_PARAM in params:
    638       filters = params[cls.FILTERS_PARAM]
    639       if issubclass(model_class, db.Model):
    640         cls._validate_filters(filters, model_class)
    641       else:
    642         cls._validate_filters_ndb(filters, model_class)
    643       property_range.PropertyRange(filters, entity_kind)
    644 
    645   @classmethod
    646   def _validate_filters(cls, filters, model_class):
    647     """Validate user supplied filters.
    648 
    649     Validate filters are on existing properties and filter values
    650     have valid semantics.
    651 
    652     Args:
    653       filters: user supplied filters. Each filter should be a list or tuple of
    654         format (<property_name_as_str>, <query_operator_as_str>,
    655         <value_of_certain_type>). Value type is up to the property's type.
    656       model_class: the db.Model class for the entity type to apply filters on.
    657 
    658     Raises:
    659       BadReaderParamsError: if any filter is invalid in any way.
    660     """
    661     if not filters:
    662       return
    663 
    664     properties = model_class.properties()
    665 
    666     for f in filters:
    667       prop, _, val = f
    668       if prop not in properties:
    669         raise errors.BadReaderParamsError(
    670             "Property %s is not defined for entity type %s",
    671             prop, model_class.kind())
    672 
    673       # Validate the value of each filter. We need to know filters have
    674       # valid value to carry out splits.
    675       try:
    676         properties[prop].validate(val)
    677       except db.BadValueError, e:
    678         raise errors.BadReaderParamsError(e)
    679 
    680   @classmethod
    681   # pylint: disable=protected-access
    682   def _validate_filters_ndb(cls, filters, model_class):
    683     """Validate ndb.Model filters."""
    684     if not filters:
    685       return
    686 
    687     properties = model_class._properties
    688 
    689 
    690     for idx, f in enumerate(filters):
    691       prop, ineq, val = f
    692       if prop not in properties:
    693         raise errors.BadReaderParamsError(
    694             "Property %s is not defined for entity type %s",
    695             prop, model_class._get_kind())
    696 
    697       # Attempt to cast the value to a KeyProperty if appropriate.
    698       # This enables filtering against keys.
    699       try:
    700         if (isinstance(val, basestring) and
    701             isinstance(properties[prop],
    702               (ndb.KeyProperty, ndb.ComputedProperty))):
    703           val = ndb.Key(urlsafe=val)
    704           filters[idx] = [prop, ineq, val]
    705       except:
    706         pass
    707 
    708       # Validate the value of each filter. We need to know filters have
    709       # valid value to carry out splits.
    710       try:
    711         properties[prop]._do_validate(val)
    712       except db.BadValueError, e:
    713         raise errors.BadReaderParamsError(e)
    714 
    715   @classmethod
    716   def split_input(cls, mapper_spec):
    717     """Inherit docs."""
    718     shard_count = mapper_spec.shard_count
    719     query_spec = cls._get_query_spec(mapper_spec)
    720 
    721     if not property_range.should_shard_by_property_range(query_spec.filters):
    722       return super(DatastoreInputReader, cls).split_input(mapper_spec)
    723 
    724     # Artificially increase the number of shards to get a more even split.
    725     # For example, if we are creating 7 shards for one week of data based on a
    726     # Day property and the data points tend to be clumped on certain days (say,
    727     # Monday and Wednesday), instead of assigning each shard a single day of
    728     # the week, we will split each day into "oversplit_factor" pieces, and
    729     # assign each shard "oversplit_factor" pieces with "1 / oversplit_factor"
    730     # the work, so that the data from Monday and Wednesday is more evenly
    731     # spread across all shards.
    732     oversplit_factor = query_spec.oversplit_factor
    733     oversplit_shard_count = oversplit_factor * shard_count
    734     p_range = property_range.PropertyRange(query_spec.filters,
    735                                            query_spec.model_class_path)
    736     p_ranges = p_range.split(oversplit_shard_count)
    737 
    738     # User specified a namespace.
    739     if query_spec.ns is not None:
    740       ns_range = namespace_range.NamespaceRange(
    741           namespace_start=query_spec.ns,
    742           namespace_end=query_spec.ns,
    743           _app=query_spec.app)
    744       ns_ranges = [copy.copy(ns_range) for _ in p_ranges]
    745     else:
    746       ns_keys = namespace_range.get_namespace_keys(
    747           query_spec.app, cls.MAX_NAMESPACES_FOR_KEY_SHARD+1)
    748       if not ns_keys:
    749         return
    750       # User doesn't specify ns but the number of ns is small.
    751       # We still split by property range.
    752       if len(ns_keys) <= cls.MAX_NAMESPACES_FOR_KEY_SHARD:
    753         ns_ranges = [namespace_range.NamespaceRange(_app=query_spec.app)
    754                      for _ in p_ranges]
    755       # Lots of namespaces. Split by ns.
    756       else:
    757         ns_ranges = namespace_range.NamespaceRange.split(n=oversplit_shard_count,
    758                                                          contiguous=False,
    759                                                          can_query=lambda: True,
    760                                                          _app=query_spec.app)
    761         p_ranges = [copy.copy(p_range) for _ in ns_ranges]
    762 
    763     assert len(p_ranges) == len(ns_ranges)
    764 
    765     iters = [
    766         db_iters.RangeIteratorFactory.create_property_range_iterator(
    767             p, ns, query_spec) for p, ns in zip(p_ranges, ns_ranges)]
    768 
    769     # Reduce the number of ranges back down to the shard count.
    770     # It's possible that we didn't split into enough shards even
    771     # after oversplitting, in which case we don't need to do anything.
    772     if len(iters) > shard_count:
    773       # We cycle through the iterators and chain them together, e.g.
    774       # if we look at the indices chained together, we get:
    775       # Shard #0 gets 0, num_shards, 2 * num_shards, ...
    776       # Shard #1 gets 1, num_shards + 1, 2 * num_shards + 1, ...
    777       # Shard #2 gets 2, num_shards + 2, 2 * num_shards + 2, ...
    778       # and so on. This should split fairly evenly.
    779       iters = [
    780         db_iters.RangeIteratorFactory.create_multi_property_range_iterator(
    781           [iters[i] for i in xrange(start_index, len(iters), shard_count)]
    782         ) for start_index in xrange(shard_count)
    783       ]
    784 
    785     return [cls(i) for i in iters]
    786 
    787 
    788 class DatastoreKeyInputReader(RawDatastoreInputReader):
    789   """Iterate over an entity kind and yields datastore.Key."""
    790 
    791   _KEY_RANGE_ITER_CLS = db_iters.KeyRangeKeyIterator
    792 
    793 
    794 # For backward compatibility.
    795 DatastoreEntityInputReader = RawDatastoreInputReader
    796 
    797 
    798 # TODO(user): Remove this after the only dependency GroomerMarkReader is
    799 class _OldAbstractDatastoreInputReader(InputReader):
    800   """Abstract base class for classes that iterate over datastore entities.
    801 
    802   Concrete subclasses must implement _iter_key_range(self, k_range). See the
    803   docstring for that method for details.
    804   """
    805 
    806   # Number of entities to fetch at once while doing scanning.
    807   _BATCH_SIZE = 50
    808 
    809   # Maximum number of shards we'll create.
    810   _MAX_SHARD_COUNT = 256
    811 
    812   # __scatter__ oversampling factor
    813   _OVERSAMPLING_FACTOR = 32
    814 
    815   # The maximum number of namespaces that will be sharded by datastore key
    816   # before switching to a strategy where sharding is done lexographically by
    817   # namespace.
    818   MAX_NAMESPACES_FOR_KEY_SHARD = 10
    819 
    820   # Mapreduce parameters.
    821   ENTITY_KIND_PARAM = "entity_kind"
    822   KEYS_ONLY_PARAM = "keys_only"
    823   BATCH_SIZE_PARAM = "batch_size"
    824   KEY_RANGE_PARAM = "key_range"
    825   NAMESPACE_RANGE_PARAM = "namespace_range"
    826   CURRENT_KEY_RANGE_PARAM = "current_key_range"
    827   FILTERS_PARAM = "filters"
    828 
    829   # TODO(user): Add support for arbitrary queries. It's not possible to
    830   # support them without cursors since right now you can't even serialize query
    831   # definition.
    832   # pylint: disable=redefined-outer-name
    833   def __init__(self,
    834                entity_kind,
    835                key_ranges=None,
    836                ns_range=None,
    837                batch_size=_BATCH_SIZE,
    838                current_key_range=None,
    839                filters=None):
    840     """Create new AbstractDatastoreInputReader object.
    841 
    842     This is internal constructor. Use split_query in a concrete class instead.
    843 
    844     Args:
    845       entity_kind: entity kind as string.
    846       key_ranges: a sequence of key_range.KeyRange instances to process. Only
    847           one of key_ranges or ns_range can be non-None.
    848       ns_range: a namespace_range.NamespaceRange to process. Only one of
    849           key_ranges or ns_range can be non-None.
    850       batch_size: size of read batch as int.
    851       current_key_range: the current key_range.KeyRange being processed.
    852       filters: optional list of filters to apply to the query. Each filter is
    853         a tuple: (<property_name_as_str>, <query_operation_as_str>, <value>).
    854         User filters are applied first.
    855     """
    856     assert key_ranges is not None or ns_range is not None, (
    857         "must specify one of 'key_ranges' or 'ns_range'")
    858     assert key_ranges is None or ns_range is None, (
    859         "can't specify both 'key_ranges ' and 'ns_range'")
    860 
    861     self._entity_kind = entity_kind
    862     # Reverse the KeyRanges so they can be processed in order as a stack of
    863     # work items.
    864     self._key_ranges = key_ranges and list(reversed(key_ranges))
    865 
    866     self._ns_range = ns_range
    867     self._batch_size = int(batch_size)
    868     self._current_key_range = current_key_range
    869     self._filters = filters
    870 
    871   @classmethod
    872   def _get_raw_entity_kind(cls, entity_kind):
    873     if "." in entity_kind:
    874       logging.warning(
    875           ". detected in entity kind %s specified for reader %s."
    876           "Assuming entity kind contains the dot.",
    877           entity_kind, cls.__name__)
    878     return entity_kind
    879 
    880   def __iter__(self):
    881     """Iterates over the given KeyRanges or NamespaceRange.
    882 
    883     This method iterates over the given KeyRanges or NamespaceRange and sets
    884     the self._current_key_range to the KeyRange currently being processed. It
    885     then delegates to the _iter_key_range method to yield that actual
    886     results.
    887 
    888     Yields:
    889       Forwards the objects yielded by the subclasses concrete _iter_key_range()
    890       method. The caller must consume the result yielded because self.to_json()
    891       will not include it.
    892     """
    893     if self._key_ranges is not None:
    894       for o in self._iter_key_ranges():
    895         yield o
    896     elif self._ns_range is not None:
    897       for o in self._iter_ns_range():
    898         yield o
    899     else:
    900       assert False, "self._key_ranges and self._ns_range are both None"
    901 
    902   def _iter_key_ranges(self):
    903     """Iterates over self._key_ranges, delegating to self._iter_key_range()."""
    904     while True:
    905       if self._current_key_range is None:
    906         if self._key_ranges:
    907           self._current_key_range = self._key_ranges.pop()
    908           # The most recently popped key_range may be None, so continue here
    909           # to find the next keyrange that's valid.
    910           continue
    911         else:
    912           break
    913 
    914       for key, o in self._iter_key_range(
    915           copy.deepcopy(self._current_key_range)):
    916         # The caller must consume yielded values so advancing the KeyRange
    917         # before yielding is safe.
    918         self._current_key_range.advance(key)
    919         yield o
    920       self._current_key_range = None
    921 
    922   def _iter_ns_range(self):
    923     """Iterates over self._ns_range, delegating to self._iter_key_range()."""
    924     while True:
    925       if self._current_key_range is None:
    926         query = self._ns_range.make_datastore_query()
    927         namespace_result = query.Get(1)
    928         if not namespace_result:
    929           break
    930 
    931         namespace = namespace_result[0].name() or ""
    932         self._current_key_range = key_range.KeyRange(
    933             namespace=namespace, _app=self._ns_range.app)
    934         yield ALLOW_CHECKPOINT
    935 
    936       for key, o in self._iter_key_range(
    937           copy.deepcopy(self._current_key_range)):
    938         # The caller must consume yielded values so advancing the KeyRange
    939         # before yielding is safe.
    940         self._current_key_range.advance(key)
    941         yield o
    942 
    943       if (self._ns_range.is_single_namespace or
    944           self._current_key_range.namespace == self._ns_range.namespace_end):
    945         break
    946       self._ns_range = self._ns_range.with_start_after(
    947           self._current_key_range.namespace)
    948       self._current_key_range = None
    949 
    950   def _iter_key_range(self, k_range):
    951     """Yields a db.Key and the value that should be yielded by self.__iter__().
    952 
    953     Args:
    954       k_range: The key_range.KeyRange to iterate over.
    955 
    956     Yields:
    957       A 2-tuple containing the last db.Key processed and the value that should
    958       be yielded by __iter__. The returned db.Key will be used to determine the
    959       InputReader's current position in self._current_key_range.
    960     """
    961     raise NotImplementedError("_iter_key_range() not implemented in %s" %
    962                               self.__class__)
    963 
    964   def __str__(self):
    965     """Returns the string representation of this InputReader."""
    966     if self._ns_range is None:
    967       return repr(self._key_ranges)
    968     else:
    969       return repr(self._ns_range)
    970 
    971   @classmethod
    972   def _choose_split_points(cls, sorted_keys, shard_count):
    973     """Returns the best split points given a random set of db.Keys."""
    974     assert len(sorted_keys) >= shard_count
    975     index_stride = len(sorted_keys) / float(shard_count)
    976     return [sorted_keys[int(round(index_stride * i))]
    977             for i in range(1, shard_count)]
    978 
    979   # TODO(user): use query splitting functionality when it becomes available
    980   # instead.
    981   @classmethod
    982   def _split_input_from_namespace(cls, app, namespace, entity_kind,
    983                                   shard_count):
    984     """Helper for _split_input_from_params.
    985 
    986     If there are not enough Entities to make all of the given shards, the
    987     returned list of KeyRanges will include Nones. The returned list will
    988     contain KeyRanges ordered lexographically with any Nones appearing at the
    989     end.
    990 
    991     Args:
    992       app: the app.
    993       namespace: the namespace.
    994       entity_kind: entity kind as string.
    995       shard_count: the number of shards.
    996 
    997     Returns:
    998       KeyRange objects.
    999     """
   1000 
   1001     raw_entity_kind = cls._get_raw_entity_kind(entity_kind)
   1002     if shard_count == 1:
   1003       # With one shard we don't need to calculate any splitpoints at all.
   1004       return [key_range.KeyRange(namespace=namespace, _app=app)]
   1005 
   1006     ds_query = datastore.Query(kind=raw_entity_kind,
   1007                                namespace=namespace,
   1008                                _app=app,
   1009                                keys_only=True)
   1010     ds_query.Order("__scatter__")
   1011     random_keys = ds_query.Get(shard_count * cls._OVERSAMPLING_FACTOR)
   1012 
   1013     if not random_keys:
   1014       # There are no entities with scatter property. We have no idea
   1015       # how to split.
   1016       return ([key_range.KeyRange(namespace=namespace, _app=app)] +
   1017               [None] * (shard_count - 1))
   1018 
   1019     random_keys.sort()
   1020 
   1021     if len(random_keys) >= shard_count:
   1022       # We've got a lot of scatter values. Sample them down.
   1023       random_keys = cls._choose_split_points(random_keys, shard_count)
   1024 
   1025     # pylint: disable=redefined-outer-name
   1026     key_ranges = []
   1027 
   1028     key_ranges.append(key_range.KeyRange(
   1029         key_start=None,
   1030         key_end=random_keys[0],
   1031         direction=key_range.KeyRange.ASC,
   1032         include_start=False,
   1033         include_end=False,
   1034         namespace=namespace,
   1035         _app=app))
   1036 
   1037     for i in range(0, len(random_keys) - 1):
   1038       key_ranges.append(key_range.KeyRange(
   1039           key_start=random_keys[i],
   1040           key_end=random_keys[i+1],
   1041           direction=key_range.KeyRange.ASC,
   1042           include_start=True,
   1043           include_end=False,
   1044           namespace=namespace,
   1045           _app=app))
   1046 
   1047     key_ranges.append(key_range.KeyRange(
   1048         key_start=random_keys[-1],
   1049         key_end=None,
   1050         direction=key_range.KeyRange.ASC,
   1051         include_start=True,
   1052         include_end=False,
   1053         namespace=namespace,
   1054         _app=app))
   1055 
   1056     if len(key_ranges) < shard_count:
   1057       # We need to have as many shards as it was requested. Add some Nones.
   1058       key_ranges += [None] * (shard_count - len(key_ranges))
   1059 
   1060     return key_ranges
   1061 
   1062   @classmethod
   1063   def _split_input_from_params(cls, app, namespaces, entity_kind_name,
   1064                                params, shard_count):
   1065     """Return input reader objects. Helper for split_input."""
   1066     # pylint: disable=redefined-outer-name
   1067     key_ranges = []  # KeyRanges for all namespaces
   1068     for namespace in namespaces:
   1069       key_ranges.extend(
   1070           cls._split_input_from_namespace(app,
   1071                                           namespace,
   1072                                           entity_kind_name,
   1073                                           shard_count))
   1074 
   1075     # Divide the KeyRanges into shard_count shards. The KeyRanges for different
   1076     # namespaces might be very different in size so the assignment of KeyRanges
   1077     # to shards is done round-robin.
   1078     shared_ranges = [[] for _ in range(shard_count)]
   1079     for i, k_range in enumerate(key_ranges):
   1080       shared_ranges[i % shard_count].append(k_range)
   1081     batch_size = int(params.get(cls.BATCH_SIZE_PARAM, cls._BATCH_SIZE))
   1082 
   1083     return [cls(entity_kind_name,
   1084                 key_ranges=key_ranges,
   1085                 ns_range=None,
   1086                 batch_size=batch_size)
   1087             for key_ranges in shared_ranges if key_ranges]
   1088 
   1089   @classmethod
   1090   def validate(cls, mapper_spec):
   1091     """Validates mapper spec and all mapper parameters.
   1092 
   1093     Args:
   1094       mapper_spec: The MapperSpec for this InputReader.
   1095 
   1096     Raises:
   1097       BadReaderParamsError: required parameters are missing or invalid.
   1098     """
   1099     if mapper_spec.input_reader_class() != cls:
   1100       raise BadReaderParamsError("Input reader class mismatch")
   1101     params = _get_params(mapper_spec)
   1102     if cls.ENTITY_KIND_PARAM not in params:
   1103       raise BadReaderParamsError("Missing mapper parameter 'entity_kind'")
   1104     if cls.BATCH_SIZE_PARAM in params:
   1105       try:
   1106         batch_size = int(params[cls.BATCH_SIZE_PARAM])
   1107         if batch_size < 1:
   1108           raise BadReaderParamsError("Bad batch size: %s" % batch_size)
   1109       except ValueError, e:
   1110         raise BadReaderParamsError("Bad batch size: %s" % e)
   1111     if cls.NAMESPACE_PARAM in params:
   1112       if not isinstance(params[cls.NAMESPACE_PARAM],
   1113                         (str, unicode, type(None))):
   1114         raise BadReaderParamsError(
   1115             "Expected a single namespace string")
   1116     if cls.NAMESPACES_PARAM in params:
   1117       raise BadReaderParamsError("Multiple namespaces are no longer supported")
   1118     if cls.FILTERS_PARAM in params:
   1119       filters = params[cls.FILTERS_PARAM]
   1120       if not isinstance(filters, list):
   1121         raise BadReaderParamsError("Expected list for filters parameter")
   1122       for f in filters:
   1123         if not isinstance(f, (tuple, list)):
   1124           raise BadReaderParamsError("Filter should be a tuple or list: %s", f)
   1125         if len(f) != 3:
   1126           raise BadReaderParamsError("Filter should be a 3-tuple: %s", f)
   1127         if not isinstance(f[0], basestring):
   1128           raise BadReaderParamsError("First element should be string: %s", f)
   1129         if f[1] != "=":
   1130           raise BadReaderParamsError(
   1131               "Only equality filters are supported: %s", f)
   1132 
   1133   @classmethod
   1134   def split_input(cls, mapper_spec):
   1135     """Splits query into shards without fetching query results.
   1136 
   1137     Tries as best as it can to split the whole query result set into equal
   1138     shards. Due to difficulty of making the perfect split, resulting shards'
   1139     sizes might differ significantly from each other.
   1140 
   1141     Args:
   1142       mapper_spec: MapperSpec with params containing 'entity_kind'.
   1143         May have 'namespace' in the params as a string containing a single
   1144         namespace. If specified then the input reader will only yield values
   1145         in the given namespace. If 'namespace' is not given then values from
   1146         all namespaces will be yielded. May also have 'batch_size' in the params
   1147         to specify the number of entities to process in each batch.
   1148 
   1149     Returns:
   1150       A list of InputReader objects. If the query results are empty then the
   1151       empty list will be returned. Otherwise, the list will always have a length
   1152       equal to number_of_shards but may be padded with Nones if there are too
   1153       few results for effective sharding.
   1154     """
   1155     params = _get_params(mapper_spec)
   1156     entity_kind_name = params[cls.ENTITY_KIND_PARAM]
   1157     batch_size = int(params.get(cls.BATCH_SIZE_PARAM, cls._BATCH_SIZE))
   1158     shard_count = mapper_spec.shard_count
   1159     namespace = params.get(cls.NAMESPACE_PARAM)
   1160     app = params.get(cls._APP_PARAM)
   1161     filters = params.get(cls.FILTERS_PARAM)
   1162 
   1163     if namespace is None:
   1164       # It is difficult to efficiently shard large numbers of namespaces because
   1165       # there can be an arbitrary number of them. So the strategy is:
   1166       # 1. if there are a small number of namespaces in the datastore then
   1167       #    generate one KeyRange per namespace per shard and assign each shard a
   1168       #    KeyRange for every namespace. This should lead to nearly perfect
   1169       #    sharding.
   1170       # 2. if there are a large number of namespaces in the datastore then
   1171       #    generate one NamespaceRange per worker. This can lead to very bad
   1172       #    sharding because namespaces can contain very different numbers of
   1173       #    entities and each NamespaceRange may contain very different numbers
   1174       #    of namespaces.
   1175       namespace_query = datastore.Query("__namespace__",
   1176                                         keys_only=True,
   1177                                         _app=app)
   1178       namespace_keys = namespace_query.Get(
   1179           limit=cls.MAX_NAMESPACES_FOR_KEY_SHARD+1)
   1180 
   1181       if len(namespace_keys) > cls.MAX_NAMESPACES_FOR_KEY_SHARD:
   1182         ns_ranges = namespace_range.NamespaceRange.split(n=shard_count,
   1183                                                          contiguous=True,
   1184                                                          _app=app)
   1185         return [cls(entity_kind_name,
   1186                     key_ranges=None,
   1187                     ns_range=ns_range,
   1188                     batch_size=batch_size,
   1189                     filters=filters)
   1190                 for ns_range in ns_ranges]
   1191       elif not namespace_keys:
   1192         return [cls(entity_kind_name,
   1193                     key_ranges=None,
   1194                     ns_range=namespace_range.NamespaceRange(_app=app),
   1195                     batch_size=shard_count,
   1196                     filters=filters)]
   1197       else:
   1198         namespaces = [namespace_key.name() or ""
   1199                       for namespace_key in namespace_keys]
   1200     else:
   1201       namespaces = [namespace]
   1202 
   1203     readers = cls._split_input_from_params(
   1204         app, namespaces, entity_kind_name, params, shard_count)
   1205     if filters:
   1206       for reader in readers:
   1207         reader._filters = filters
   1208     return readers
   1209 
   1210   def to_json(self):
   1211     """Serializes all the data in this query range into json form.
   1212 
   1213     Returns:
   1214       all the data in json-compatible map.
   1215     """
   1216     if self._key_ranges is None:
   1217       key_ranges_json = None
   1218     else:
   1219       key_ranges_json = []
   1220       for k in self._key_ranges:
   1221         if k:
   1222           key_ranges_json.append(k.to_json())
   1223         else:
   1224           key_ranges_json.append(None)
   1225 
   1226     if self._ns_range is None:
   1227       namespace_range_json = None
   1228     else:
   1229       namespace_range_json = self._ns_range.to_json_object()
   1230 
   1231     if self._current_key_range is None:
   1232       current_key_range_json = None
   1233     else:
   1234       current_key_range_json = self._current_key_range.to_json()
   1235 
   1236     json_dict = {self.KEY_RANGE_PARAM: key_ranges_json,
   1237                  self.NAMESPACE_RANGE_PARAM: namespace_range_json,
   1238                  self.CURRENT_KEY_RANGE_PARAM: current_key_range_json,
   1239                  self.ENTITY_KIND_PARAM: self._entity_kind,
   1240                  self.BATCH_SIZE_PARAM: self._batch_size,
   1241                  self.FILTERS_PARAM: self._filters}
   1242     return json_dict
   1243 
   1244   @classmethod
   1245   def from_json(cls, json):
   1246     """Create new DatastoreInputReader from the json, encoded by to_json.
   1247 
   1248     Args:
   1249       json: json map representation of DatastoreInputReader.
   1250 
   1251     Returns:
   1252       an instance of DatastoreInputReader with all data deserialized from json.
   1253     """
   1254     if json[cls.KEY_RANGE_PARAM] is None:
   1255       # pylint: disable=redefined-outer-name
   1256       key_ranges = None
   1257     else:
   1258       key_ranges = []
   1259       for k in json[cls.KEY_RANGE_PARAM]:
   1260         if k:
   1261           key_ranges.append(key_range.KeyRange.from_json(k))
   1262         else:
   1263           key_ranges.append(None)
   1264 
   1265     if json[cls.NAMESPACE_RANGE_PARAM] is None:
   1266       ns_range = None
   1267     else:
   1268       ns_range = namespace_range.NamespaceRange.from_json_object(
   1269           json[cls.NAMESPACE_RANGE_PARAM])
   1270 
   1271     if json[cls.CURRENT_KEY_RANGE_PARAM] is None:
   1272       current_key_range = None
   1273     else:
   1274       current_key_range = key_range.KeyRange.from_json(
   1275           json[cls.CURRENT_KEY_RANGE_PARAM])
   1276 
   1277     return cls(
   1278         json[cls.ENTITY_KIND_PARAM],
   1279         key_ranges,
   1280         ns_range,
   1281         json[cls.BATCH_SIZE_PARAM],
   1282         current_key_range,
   1283         filters=json.get(cls.FILTERS_PARAM))
   1284 
   1285 
   1286 class BlobstoreLineInputReader(InputReader):
   1287   """Input reader for a newline delimited blob in Blobstore."""
   1288 
   1289   # TODO(user): Should we set this based on MAX_BLOB_FETCH_SIZE?
   1290   _BLOB_BUFFER_SIZE = 64000
   1291 
   1292   # Maximum number of shards to allow.
   1293   _MAX_SHARD_COUNT = 256
   1294 
   1295   # Maximum number of blobs to allow.
   1296   _MAX_BLOB_KEYS_COUNT = 246
   1297 
   1298   # Mapreduce parameters.
   1299   BLOB_KEYS_PARAM = "blob_keys"
   1300 
   1301   # Serialization parmaeters.
   1302   INITIAL_POSITION_PARAM = "initial_position"
   1303   END_POSITION_PARAM = "end_position"
   1304   BLOB_KEY_PARAM = "blob_key"
   1305 
   1306   def __init__(self, blob_key, start_position, end_position):
   1307     """Initializes this instance with the given blob key and character range.
   1308 
   1309     This BlobstoreInputReader will read from the first record starting after
   1310     strictly after start_position until the first record ending at or after
   1311     end_position (exclusive). As an exception, if start_position is 0, then
   1312     this InputReader starts reading at the first record.
   1313 
   1314     Args:
   1315       blob_key: the BlobKey that this input reader is processing.
   1316       start_position: the position to start reading at.
   1317       end_position: a position in the last record to read.
   1318     """
   1319     self._blob_key = blob_key
   1320     self._blob_reader = blobstore.BlobReader(blob_key,
   1321                                              self._BLOB_BUFFER_SIZE,
   1322                                              start_position)
   1323     self._end_position = end_position
   1324     self._has_iterated = False
   1325     self._read_before_start = bool(start_position)
   1326 
   1327   def next(self):
   1328     """Returns the next input from as an (offset, line) tuple."""
   1329     self._has_iterated = True
   1330 
   1331     if self._read_before_start:
   1332       self._blob_reader.readline()
   1333       self._read_before_start = False
   1334     start_position = self._blob_reader.tell()
   1335 
   1336     if start_position > self._end_position:
   1337       raise StopIteration()
   1338 
   1339     line = self._blob_reader.readline()
   1340 
   1341     if not line:
   1342       raise StopIteration()
   1343 
   1344     return start_position, line.rstrip("\n")
   1345 
   1346   def to_json(self):
   1347     """Returns an json-compatible input shard spec for remaining inputs."""
   1348     new_pos = self._blob_reader.tell()
   1349     if self._has_iterated:
   1350       new_pos -= 1
   1351     return {self.BLOB_KEY_PARAM: self._blob_key,
   1352             self.INITIAL_POSITION_PARAM: new_pos,
   1353             self.END_POSITION_PARAM: self._end_position}
   1354 
   1355   def __str__(self):
   1356     """Returns the string representation of this BlobstoreLineInputReader."""
   1357     return "blobstore.BlobKey(%r):[%d, %d]" % (
   1358         self._blob_key, self._blob_reader.tell(), self._end_position)
   1359 
   1360   @classmethod
   1361   def from_json(cls, json):
   1362     """Instantiates an instance of this InputReader for the given shard spec."""
   1363     return cls(json[cls.BLOB_KEY_PARAM],
   1364                json[cls.INITIAL_POSITION_PARAM],
   1365                json[cls.END_POSITION_PARAM])
   1366 
   1367   @classmethod
   1368   def validate(cls, mapper_spec):
   1369     """Validates mapper spec and all mapper parameters.
   1370 
   1371     Args:
   1372       mapper_spec: The MapperSpec for this InputReader.
   1373 
   1374     Raises:
   1375       BadReaderParamsError: required parameters are missing or invalid.
   1376     """
   1377     if mapper_spec.input_reader_class() != cls:
   1378       raise BadReaderParamsError("Mapper input reader class mismatch")
   1379     params = _get_params(mapper_spec)
   1380     if cls.BLOB_KEYS_PARAM not in params:
   1381       raise BadReaderParamsError("Must specify 'blob_keys' for mapper input")
   1382     blob_keys = params[cls.BLOB_KEYS_PARAM]
   1383     if isinstance(blob_keys, basestring):
   1384       # This is a mechanism to allow multiple blob keys (which do not contain
   1385       # commas) in a single string. It may go away.
   1386       blob_keys = blob_keys.split(",")
   1387     if len(blob_keys) > cls._MAX_BLOB_KEYS_COUNT:
   1388       raise BadReaderParamsError("Too many 'blob_keys' for mapper input")
   1389     if not blob_keys:
   1390       raise BadReaderParamsError("No 'blob_keys' specified for mapper input")
   1391     for blob_key in blob_keys:
   1392       blob_info = blobstore.BlobInfo.get(blobstore.BlobKey(blob_key))
   1393       if not blob_info:
   1394         raise BadReaderParamsError("Could not find blobinfo for key %s" %
   1395                                    blob_key)
   1396 
   1397   @classmethod
   1398   def split_input(cls, mapper_spec):
   1399     """Returns a list of shard_count input_spec_shards for input_spec.
   1400 
   1401     Args:
   1402       mapper_spec: The mapper specification to split from. Must contain
   1403           'blob_keys' parameter with one or more blob keys.
   1404 
   1405     Returns:
   1406       A list of BlobstoreInputReaders corresponding to the specified shards.
   1407     """
   1408     params = _get_params(mapper_spec)
   1409     blob_keys = params[cls.BLOB_KEYS_PARAM]
   1410     if isinstance(blob_keys, basestring):
   1411       # This is a mechanism to allow multiple blob keys (which do not contain
   1412       # commas) in a single string. It may go away.
   1413       blob_keys = blob_keys.split(",")
   1414 
   1415     blob_sizes = {}
   1416     for blob_key in blob_keys:
   1417       blob_info = blobstore.BlobInfo.get(blobstore.BlobKey(blob_key))
   1418       blob_sizes[blob_key] = blob_info.size
   1419 
   1420     shard_count = min(cls._MAX_SHARD_COUNT, mapper_spec.shard_count)
   1421     shards_per_blob = shard_count // len(blob_keys)
   1422     if shards_per_blob == 0:
   1423       shards_per_blob = 1
   1424 
   1425     chunks = []
   1426     for blob_key, blob_size in blob_sizes.items():
   1427       blob_chunk_size = blob_size // shards_per_blob
   1428       for i in xrange(shards_per_blob - 1):
   1429         chunks.append(BlobstoreLineInputReader.from_json(
   1430             {cls.BLOB_KEY_PARAM: blob_key,
   1431              cls.INITIAL_POSITION_PARAM: blob_chunk_size * i,
   1432              cls.END_POSITION_PARAM: blob_chunk_size * (i + 1)}))
   1433       chunks.append(BlobstoreLineInputReader.from_json(
   1434           {cls.BLOB_KEY_PARAM: blob_key,
   1435            cls.INITIAL_POSITION_PARAM: blob_chunk_size * (shards_per_blob - 1),
   1436            cls.END_POSITION_PARAM: blob_size}))
   1437     return chunks
   1438 
   1439 
   1440 class BlobstoreZipInputReader(InputReader):
   1441   """Input reader for files from a zip archive stored in the Blobstore.
   1442 
   1443   Each instance of the reader will read the TOC, from the end of the zip file,
   1444   and then only the contained files which it is responsible for.
   1445   """
   1446 
   1447   # Maximum number of shards to allow.
   1448   _MAX_SHARD_COUNT = 256
   1449 
   1450   # Mapreduce parameters.
   1451   BLOB_KEY_PARAM = "blob_key"
   1452   START_INDEX_PARAM = "start_index"
   1453   END_INDEX_PARAM = "end_index"
   1454 
   1455   def __init__(self, blob_key, start_index, end_index,
   1456                _reader=blobstore.BlobReader):
   1457     """Initializes this instance with the given blob key and file range.
   1458 
   1459     This BlobstoreZipInputReader will read from the file with index start_index
   1460     up to but not including the file with index end_index.
   1461 
   1462     Args:
   1463       blob_key: the BlobKey that this input reader is processing.
   1464       start_index: the index of the first file to read.
   1465       end_index: the index of the first file that will not be read.
   1466       _reader: a callable that returns a file-like object for reading blobs.
   1467           Used for dependency injection.
   1468     """
   1469     self._blob_key = blob_key
   1470     self._start_index = start_index
   1471     self._end_index = end_index
   1472     self._reader = _reader
   1473     self._zip = None
   1474     self._entries = None
   1475 
   1476   def next(self):
   1477     """Returns the next input from this input reader as (ZipInfo, opener) tuple.
   1478 
   1479     Returns:
   1480       The next input from this input reader, in the form of a 2-tuple.
   1481       The first element of the tuple is a zipfile.ZipInfo object.
   1482       The second element of the tuple is a zero-argument function that, when
   1483       called, returns the complete body of the file.
   1484     """
   1485     if not self._zip:
   1486       self._zip = zipfile.ZipFile(self._reader(self._blob_key))
   1487       # Get a list of entries, reversed so we can pop entries off in order
   1488       self._entries = self._zip.infolist()[self._start_index:self._end_index]
   1489       self._entries.reverse()
   1490     if not self._entries:
   1491       raise StopIteration()
   1492     entry = self._entries.pop()
   1493     self._start_index += 1
   1494     return (entry, lambda: self._read(entry))
   1495 
   1496   def _read(self, entry):
   1497     """Read entry content.
   1498 
   1499     Args:
   1500       entry: zip file entry as zipfile.ZipInfo.
   1501     Returns:
   1502       Entry content as string.
   1503     """
   1504     start_time = time.time()
   1505     content = self._zip.read(entry.filename)
   1506 
   1507     ctx = context.get()
   1508     if ctx:
   1509       operation.counters.Increment(COUNTER_IO_READ_BYTES, len(content))(ctx)
   1510       operation.counters.Increment(
   1511           COUNTER_IO_READ_MSEC, int((time.time() - start_time) * 1000))(ctx)
   1512 
   1513     return content
   1514 
   1515   @classmethod
   1516   def from_json(cls, json):
   1517     """Creates an instance of the InputReader for the given input shard state.
   1518 
   1519     Args:
   1520       json: The InputReader state as a dict-like object.
   1521 
   1522     Returns:
   1523       An instance of the InputReader configured using the values of json.
   1524     """
   1525     return cls(json[cls.BLOB_KEY_PARAM],
   1526                json[cls.START_INDEX_PARAM],
   1527                json[cls.END_INDEX_PARAM])
   1528 
   1529   def to_json(self):
   1530     """Returns an input shard state for the remaining inputs.
   1531 
   1532     Returns:
   1533       A json-izable version of the remaining InputReader.
   1534     """
   1535     return {self.BLOB_KEY_PARAM: self._blob_key,
   1536             self.START_INDEX_PARAM: self._start_index,
   1537             self.END_INDEX_PARAM: self._end_index}
   1538 
   1539   def __str__(self):
   1540     """Returns the string representation of this BlobstoreZipInputReader."""
   1541     return "blobstore.BlobKey(%r):[%d, %d]" % (
   1542         self._blob_key, self._start_index, self._end_index)
   1543 
   1544   @classmethod
   1545   def validate(cls, mapper_spec):
   1546     """Validates mapper spec and all mapper parameters.
   1547 
   1548     Args:
   1549       mapper_spec: The MapperSpec for this InputReader.
   1550 
   1551     Raises:
   1552       BadReaderParamsError: required parameters are missing or invalid.
   1553     """
   1554     if mapper_spec.input_reader_class() != cls:
   1555       raise BadReaderParamsError("Mapper input reader class mismatch")
   1556     params = _get_params(mapper_spec)
   1557     if cls.BLOB_KEY_PARAM not in params:
   1558       raise BadReaderParamsError("Must specify 'blob_key' for mapper input")
   1559     blob_key = params[cls.BLOB_KEY_PARAM]
   1560     blob_info = blobstore.BlobInfo.get(blobstore.BlobKey(blob_key))
   1561     if not blob_info:
   1562       raise BadReaderParamsError("Could not find blobinfo for key %s" %
   1563                                  blob_key)
   1564 
   1565   @classmethod
   1566   def split_input(cls, mapper_spec, _reader=blobstore.BlobReader):
   1567     """Returns a list of input shard states for the input spec.
   1568 
   1569     Args:
   1570       mapper_spec: The MapperSpec for this InputReader. Must contain
   1571           'blob_key' parameter with one blob key.
   1572       _reader: a callable that returns a file-like object for reading blobs.
   1573           Used for dependency injection.
   1574 
   1575     Returns:
   1576       A list of InputReaders spanning files within the zip.
   1577     """
   1578     params = _get_params(mapper_spec)
   1579     blob_key = params[cls.BLOB_KEY_PARAM]
   1580     zip_input = zipfile.ZipFile(_reader(blob_key))
   1581     zfiles = zip_input.infolist()
   1582     total_size = sum(x.file_size for x in zfiles)
   1583     num_shards = min(mapper_spec.shard_count, cls._MAX_SHARD_COUNT)
   1584     size_per_shard = total_size // num_shards
   1585 
   1586     # Break the list of files into sublists, each of approximately
   1587     # size_per_shard bytes.
   1588     shard_start_indexes = [0]
   1589     current_shard_size = 0
   1590     for i, fileinfo in enumerate(zfiles):
   1591       current_shard_size += fileinfo.file_size
   1592       if current_shard_size >= size_per_shard:
   1593         shard_start_indexes.append(i + 1)
   1594         current_shard_size = 0
   1595 
   1596     if shard_start_indexes[-1] != len(zfiles):
   1597       shard_start_indexes.append(len(zfiles))
   1598 
   1599     return [cls(blob_key, start_index, end_index, _reader)
   1600             for start_index, end_index
   1601             in zip(shard_start_indexes, shard_start_indexes[1:])]
   1602 
   1603 
   1604 class BlobstoreZipLineInputReader(InputReader):
   1605   """Input reader for newline delimited files in zip archives from Blobstore.
   1606 
   1607   This has the same external interface as the BlobstoreLineInputReader, in that
   1608   it takes a list of blobs as its input and yields lines to the reader.
   1609   However the blobs themselves are expected to be zip archives of line delimited
   1610   files instead of the files themselves.
   1611 
   1612   This is useful as many line delimited files gain greatly from compression.
   1613   """
   1614 
   1615   # Maximum number of shards to allow.
   1616   _MAX_SHARD_COUNT = 256
   1617 
   1618   # Maximum number of blobs to allow.
   1619   _MAX_BLOB_KEYS_COUNT = 246
   1620 
   1621   # Mapreduce parameters.
   1622   BLOB_KEYS_PARAM = "blob_keys"
   1623 
   1624   # Serialization parameters.
   1625   BLOB_KEY_PARAM = "blob_key"
   1626   START_FILE_INDEX_PARAM = "start_file_index"
   1627   END_FILE_INDEX_PARAM = "end_file_index"
   1628   OFFSET_PARAM = "offset"
   1629 
   1630   def __init__(self, blob_key, start_file_index, end_file_index, offset,
   1631                _reader=blobstore.BlobReader):
   1632     """Initializes this instance with the given blob key and file range.
   1633 
   1634     This BlobstoreZipLineInputReader will read from the file with index
   1635     start_file_index up to but not including the file with index end_file_index.
   1636     It will return lines starting at offset within file[start_file_index]
   1637 
   1638     Args:
   1639       blob_key: the BlobKey that this input reader is processing.
   1640       start_file_index: the index of the first file to read within the zip.
   1641       end_file_index: the index of the first file that will not be read.
   1642       offset: the byte offset within blob_key.zip[start_file_index] to start
   1643         reading. The reader will continue to the end of the file.
   1644       _reader: a callable that returns a file-like object for reading blobs.
   1645           Used for dependency injection.
   1646     """
   1647     self._blob_key = blob_key
   1648     self._start_file_index = start_file_index
   1649     self._end_file_index = end_file_index
   1650     self._initial_offset = offset
   1651     self._reader = _reader
   1652     self._zip = None
   1653     self._entries = None
   1654     self._filestream = None
   1655 
   1656   @classmethod
   1657   def validate(cls, mapper_spec):
   1658     """Validates mapper spec and all mapper parameters.
   1659 
   1660     Args:
   1661       mapper_spec: The MapperSpec for this InputReader.
   1662 
   1663     Raises:
   1664       BadReaderParamsError: required parameters are missing or invalid.
   1665     """
   1666     if mapper_spec.input_reader_class() != cls:
   1667       raise BadReaderParamsError("Mapper input reader class mismatch")
   1668     params = _get_params(mapper_spec)
   1669     if cls.BLOB_KEYS_PARAM not in params:
   1670       raise BadReaderParamsError("Must specify 'blob_keys' for mapper input")
   1671 
   1672     blob_keys = params[cls.BLOB_KEYS_PARAM]
   1673     if isinstance(blob_keys, basestring):
   1674       # This is a mechanism to allow multiple blob keys (which do not contain
   1675       # commas) in a single string. It may go away.
   1676       blob_keys = blob_keys.split(",")
   1677     if len(blob_keys) > cls._MAX_BLOB_KEYS_COUNT:
   1678       raise BadReaderParamsError("Too many 'blob_keys' for mapper input")
   1679     if not blob_keys:
   1680       raise BadReaderParamsError("No 'blob_keys' specified for mapper input")
   1681     for blob_key in blob_keys:
   1682       blob_info = blobstore.BlobInfo.get(blobstore.BlobKey(blob_key))
   1683       if not blob_info:
   1684         raise BadReaderParamsError("Could not find blobinfo for key %s" %
   1685                                    blob_key)
   1686 
   1687   @classmethod
   1688   def split_input(cls, mapper_spec, _reader=blobstore.BlobReader):
   1689     """Returns a list of input readers for the input spec.
   1690 
   1691     Args:
   1692       mapper_spec: The MapperSpec for this InputReader. Must contain
   1693           'blob_keys' parameter with one or more blob keys.
   1694       _reader: a callable that returns a file-like object for reading blobs.
   1695           Used for dependency injection.
   1696 
   1697     Returns:
   1698       A list of InputReaders spanning the subfiles within the blobs.
   1699       There will be at least one reader per blob, but it will otherwise
   1700       attempt to keep the expanded size even.
   1701     """
   1702     params = _get_params(mapper_spec)
   1703     blob_keys = params[cls.BLOB_KEYS_PARAM]
   1704     if isinstance(blob_keys, basestring):
   1705       # This is a mechanism to allow multiple blob keys (which do not contain
   1706       # commas) in a single string. It may go away.
   1707       blob_keys = blob_keys.split(",")
   1708 
   1709     blob_files = {}
   1710     total_size = 0
   1711     for blob_key in blob_keys:
   1712       zip_input = zipfile.ZipFile(_reader(blob_key))
   1713       blob_files[blob_key] = zip_input.infolist()
   1714       total_size += sum(x.file_size for x in blob_files[blob_key])
   1715 
   1716     shard_count = min(cls._MAX_SHARD_COUNT, mapper_spec.shard_count)
   1717 
   1718     # We can break on both blob key and file-within-zip boundaries.
   1719     # A shard will span at minimum a single blob key, but may only
   1720     # handle a few files within a blob.
   1721 
   1722     size_per_shard = total_size // shard_count
   1723 
   1724     readers = []
   1725     for blob_key in blob_keys:
   1726       bfiles = blob_files[blob_key]
   1727       current_shard_size = 0
   1728       start_file_index = 0
   1729       next_file_index = 0
   1730       for fileinfo in bfiles:
   1731         next_file_index += 1
   1732         current_shard_size += fileinfo.file_size
   1733         if current_shard_size >= size_per_shard:
   1734           readers.append(cls(blob_key, start_file_index, next_file_index, 0,
   1735                              _reader))
   1736           current_shard_size = 0
   1737           start_file_index = next_file_index
   1738       if current_shard_size != 0:
   1739         readers.append(cls(blob_key, start_file_index, next_file_index, 0,
   1740                            _reader))
   1741 
   1742     return readers
   1743 
   1744   def next(self):
   1745     """Returns the next line from this input reader as (lineinfo, line) tuple.
   1746 
   1747     Returns:
   1748       The next input from this input reader, in the form of a 2-tuple.
   1749       The first element of the tuple describes the source, it is itself
   1750         a tuple (blobkey, filenumber, byteoffset).
   1751       The second element of the tuple is the line found at that offset.
   1752     """
   1753     if not self._filestream:
   1754       if not self._zip:
   1755         self._zip = zipfile.ZipFile(self._reader(self._blob_key))
   1756         # Get a list of entries, reversed so we can pop entries off in order
   1757         self._entries = self._zip.infolist()[self._start_file_index:
   1758                                              self._end_file_index]
   1759         self._entries.reverse()
   1760       if not self._entries:
   1761         raise StopIteration()
   1762       entry = self._entries.pop()
   1763       value = self._zip.read(entry.filename)
   1764       self._filestream = StringIO.StringIO(value)
   1765       if self._initial_offset:
   1766         self._filestream.seek(self._initial_offset)
   1767         self._filestream.readline()
   1768 
   1769     start_position = self._filestream.tell()
   1770     line = self._filestream.readline()
   1771 
   1772     if not line:
   1773       # Done with this file in the zip. Move on to the next file.
   1774       self._filestream.close()
   1775       self._filestream = None
   1776       self._start_file_index += 1
   1777       self._initial_offset = 0
   1778       return self.next()
   1779 
   1780     return ((self._blob_key, self._start_file_index, start_position),
   1781             line.rstrip("\n"))
   1782 
   1783   def _next_offset(self):
   1784     """Return the offset of the next line to read."""
   1785     if self._filestream:
   1786       offset = self._filestream.tell()
   1787       if offset:
   1788         offset -= 1
   1789     else:
   1790       offset = self._initial_offset
   1791 
   1792     return offset
   1793 
   1794   def to_json(self):
   1795     """Returns an input shard state for the remaining inputs.
   1796 
   1797     Returns:
   1798       A json-izable version of the remaining InputReader.
   1799     """
   1800 
   1801     return {self.BLOB_KEY_PARAM: self._blob_key,
   1802             self.START_FILE_INDEX_PARAM: self._start_file_index,
   1803             self.END_FILE_INDEX_PARAM: self._end_file_index,
   1804             self.OFFSET_PARAM: self._next_offset()}
   1805 
   1806   @classmethod
   1807   def from_json(cls, json, _reader=blobstore.BlobReader):
   1808     """Creates an instance of the InputReader for the given input shard state.
   1809 
   1810     Args:
   1811       json: The InputReader state as a dict-like object.
   1812       _reader: For dependency injection.
   1813 
   1814     Returns:
   1815       An instance of the InputReader configured using the values of json.
   1816     """
   1817     return cls(json[cls.BLOB_KEY_PARAM],
   1818                json[cls.START_FILE_INDEX_PARAM],
   1819                json[cls.END_FILE_INDEX_PARAM],
   1820                json[cls.OFFSET_PARAM],
   1821                _reader)
   1822 
   1823   def __str__(self):
   1824     """Returns the string representation of this reader.
   1825 
   1826     Returns:
   1827       string blobkey:[start file num, end file num]:current offset.
   1828     """
   1829     return "blobstore.BlobKey(%r):[%d, %d]:%d" % (
   1830         self._blob_key, self._start_file_index, self._end_file_index,
   1831         self._next_offset())
   1832 
   1833 
   1834 class RandomStringInputReader(InputReader):
   1835   """RandomStringInputReader generates random strings as output.
   1836 
   1837   Primary usage is to populate output with testing entries.
   1838   """
   1839 
   1840   # Total number of entries this reader should generate.
   1841   COUNT = "count"
   1842   # Length of the generated strings.
   1843   STRING_LENGTH = "string_length"
   1844 
   1845   DEFAULT_STRING_LENGTH = 10
   1846 
   1847   def __init__(self, count, string_length):
   1848     """Initialize input reader.
   1849 
   1850     Args:
   1851       count: number of entries this shard should generate.
   1852       string_length: the length of generated random strings.
   1853     """
   1854     self._count = count
   1855     self._string_length = string_length
   1856 
   1857   def __iter__(self):
   1858     ctx = context.get()
   1859 
   1860     while self._count:
   1861       self._count -= 1
   1862       start_time = time.time()
   1863       content = "".join(random.choice(string.ascii_lowercase)
   1864                         for _ in range(self._string_length))
   1865       if ctx:
   1866         operation.counters.Increment(
   1867             COUNTER_IO_READ_MSEC, int((time.time() - start_time) * 1000))(ctx)
   1868         operation.counters.Increment(COUNTER_IO_READ_BYTES, len(content))(ctx)
   1869       yield content
   1870 
   1871   @classmethod
   1872   def split_input(cls, mapper_spec):
   1873     params = _get_params(mapper_spec)
   1874     count = params[cls.COUNT]
   1875     string_length = cls.DEFAULT_STRING_LENGTH
   1876     if cls.STRING_LENGTH in params:
   1877       string_length = params[cls.STRING_LENGTH]
   1878 
   1879     shard_count = mapper_spec.shard_count
   1880     count_per_shard = count // shard_count
   1881 
   1882     mr_input_readers = [
   1883         cls(count_per_shard, string_length) for _ in range(shard_count)]
   1884 
   1885     left = count - count_per_shard*shard_count
   1886     if left > 0:
   1887       mr_input_readers.append(cls(left, string_length))
   1888 
   1889     return mr_input_readers
   1890 
   1891   @classmethod
   1892   def validate(cls, mapper_spec):
   1893     if mapper_spec.input_reader_class() != cls:
   1894       raise BadReaderParamsError("Mapper input reader class mismatch")
   1895 
   1896     params = _get_params(mapper_spec)
   1897     if cls.COUNT not in params:
   1898       raise BadReaderParamsError("Must specify %s" % cls.COUNT)
   1899     if not isinstance(params[cls.COUNT], int):
   1900       raise BadReaderParamsError("%s should be an int but is %s" %
   1901                                  (cls.COUNT, type(params[cls.COUNT])))
   1902     if params[cls.COUNT] <= 0:
   1903       raise BadReaderParamsError("%s should be a positive int")
   1904     if cls.STRING_LENGTH in params and not (
   1905         isinstance(params[cls.STRING_LENGTH], int) and
   1906         params[cls.STRING_LENGTH] > 0):
   1907       raise BadReaderParamsError("%s should be a positive int but is %s" %
   1908                                  (cls.STRING_LENGTH, params[cls.STRING_LENGTH]))
   1909     if (not isinstance(mapper_spec.shard_count, int) or
   1910         mapper_spec.shard_count <= 0):
   1911       raise BadReaderParamsError(
   1912           "shard_count should be a positive int but is %s" %
   1913           mapper_spec.shard_count)
   1914 
   1915   @classmethod
   1916   def from_json(cls, json):
   1917     return cls(json[cls.COUNT], json[cls.STRING_LENGTH])
   1918 
   1919   def to_json(self):
   1920     return {self.COUNT: self._count, self.STRING_LENGTH: self._string_length}
   1921 
   1922 
   1923 # TODO(user): This reader always produces only one shard, because
   1924 # namespace entities use the mix of ids/names, and KeyRange-based splitting
   1925 # doesn't work satisfactory in this case.
   1926 # It's possible to implement specific splitting functionality for the reader
   1927 # instead of reusing generic one. Meanwhile 1 shard is enough for our
   1928 # applications.
   1929 class NamespaceInputReader(InputReader):
   1930   """An input reader to iterate over namespaces.
   1931 
   1932   This reader yields namespace names as string.
   1933   It will always produce only one shard.
   1934   """
   1935 
   1936   NAMESPACE_RANGE_PARAM = "namespace_range"
   1937   BATCH_SIZE_PARAM = "batch_size"
   1938   _BATCH_SIZE = 10
   1939 
   1940   def __init__(self, ns_range, batch_size=_BATCH_SIZE):
   1941     self.ns_range = ns_range
   1942     self._batch_size = batch_size
   1943 
   1944   def to_json(self):
   1945     """Serializes all the data in this query range into json form.
   1946 
   1947     Returns:
   1948       all the data in json-compatible map.
   1949     """
   1950     return {self.NAMESPACE_RANGE_PARAM: self.ns_range.to_json_object(),
   1951             self.BATCH_SIZE_PARAM: self._batch_size}
   1952 
   1953   @classmethod
   1954   def from_json(cls, json):
   1955     """Create new DatastoreInputReader from the json, encoded by to_json.
   1956 
   1957     Args:
   1958       json: json map representation of DatastoreInputReader.
   1959 
   1960     Returns:
   1961       an instance of DatastoreInputReader with all data deserialized from json.
   1962     """
   1963     return cls(
   1964         namespace_range.NamespaceRange.from_json_object(
   1965             json[cls.NAMESPACE_RANGE_PARAM]),
   1966         json[cls.BATCH_SIZE_PARAM])
   1967 
   1968   @classmethod
   1969   def validate(cls, mapper_spec):
   1970     """Validates mapper spec.
   1971 
   1972     Args:
   1973       mapper_spec: The MapperSpec for this InputReader.
   1974 
   1975     Raises:
   1976       BadReaderParamsError: required parameters are missing or invalid.
   1977     """
   1978     if mapper_spec.input_reader_class() != cls:
   1979       raise BadReaderParamsError("Input reader class mismatch")
   1980     params = _get_params(mapper_spec)
   1981     if cls.BATCH_SIZE_PARAM in params:
   1982       try:
   1983         batch_size = int(params[cls.BATCH_SIZE_PARAM])
   1984         if batch_size < 1:
   1985           raise BadReaderParamsError("Bad batch size: %s" % batch_size)
   1986       except ValueError, e:
   1987         raise BadReaderParamsError("Bad batch size: %s" % e)
   1988 
   1989   @classmethod
   1990   def split_input(cls, mapper_spec):
   1991     """Returns a list of input readers for the input spec.
   1992 
   1993     Args:
   1994       mapper_spec: The MapperSpec for this InputReader.
   1995 
   1996     Returns:
   1997       A list of InputReaders.
   1998     """
   1999     batch_size = int(_get_params(mapper_spec).get(
   2000         cls.BATCH_SIZE_PARAM, cls._BATCH_SIZE))
   2001     shard_count = mapper_spec.shard_count
   2002     namespace_ranges = namespace_range.NamespaceRange.split(shard_count,
   2003                                                             contiguous=True)
   2004     return [NamespaceInputReader(ns_range, batch_size)
   2005             for ns_range in namespace_ranges]
   2006 
   2007   def __iter__(self):
   2008     while True:
   2009       keys = self.ns_range.make_datastore_query().Get(limit=self._batch_size)
   2010       if not keys:
   2011         break
   2012 
   2013       for key in keys:
   2014         namespace = metadata.Namespace.key_to_namespace(key)
   2015         self.ns_range = self.ns_range.with_start_after(namespace)
   2016         yield namespace
   2017 
   2018   def __str__(self):
   2019     return repr(self.ns_range)
   2020 
   2021 
   2022 class LogInputReader(InputReader):
   2023   """Input reader for a time range of logs via the Logs Reader API.
   2024 
   2025   The number of input shards may be specified by the SHARDS_PARAM mapper
   2026   parameter.  A starting and ending time (in seconds since the Unix epoch) are
   2027   required to generate time ranges over which to shard the input.
   2028   """
   2029   # Parameters directly mapping to those available via logservice.fetch().
   2030   START_TIME_PARAM = "start_time"
   2031   END_TIME_PARAM = "end_time"
   2032   MINIMUM_LOG_LEVEL_PARAM = "minimum_log_level"
   2033   INCLUDE_INCOMPLETE_PARAM = "include_incomplete"
   2034   INCLUDE_APP_LOGS_PARAM = "include_app_logs"
   2035   VERSION_IDS_PARAM = "version_ids"
   2036   MODULE_VERSIONS_PARAM = "module_versions"
   2037 
   2038   # Semi-hidden parameters used only internally or for privileged applications.
   2039   _OFFSET_PARAM = "offset"
   2040   _PROTOTYPE_REQUEST_PARAM = "prototype_request"
   2041 
   2042   _PARAMS = frozenset([START_TIME_PARAM, END_TIME_PARAM, _OFFSET_PARAM,
   2043                        MINIMUM_LOG_LEVEL_PARAM, INCLUDE_INCOMPLETE_PARAM,
   2044                        INCLUDE_APP_LOGS_PARAM, VERSION_IDS_PARAM,
   2045                        MODULE_VERSIONS_PARAM, _PROTOTYPE_REQUEST_PARAM])
   2046   _KWARGS = frozenset([_OFFSET_PARAM, _PROTOTYPE_REQUEST_PARAM])
   2047 
   2048   def __init__(self,
   2049                start_time=None,
   2050                end_time=None,
   2051                minimum_log_level=None,
   2052                include_incomplete=False,
   2053                include_app_logs=False,
   2054                version_ids=None,
   2055                module_versions=None,
   2056                **kwargs):
   2057     """Constructor.
   2058 
   2059     Args:
   2060       start_time: The earliest request completion or last-update time of logs
   2061         that should be mapped over, in seconds since the Unix epoch.
   2062       end_time: The latest request completion or last-update time that logs
   2063         should be mapped over, in seconds since the Unix epoch.
   2064       minimum_log_level: An application log level which serves as a filter on
   2065         the requests mapped over--requests with no application log at or above
   2066         the specified level will be omitted, even if include_app_logs is False.
   2067       include_incomplete: Whether or not to include requests that have started
   2068         but not yet finished, as a boolean.  Defaults to False.
   2069       include_app_logs: Whether or not to include application level logs in the
   2070         mapped logs, as a boolean.  Defaults to False.
   2071       version_ids: A list of version ids whose logs should be read. This can not
   2072         be used with module_versions
   2073       module_versions: A list of tuples containing a module and version id
   2074         whose logs should be read. This can not be used with version_ids
   2075       **kwargs: A dictionary of keywords associated with this input reader.
   2076     """
   2077     InputReader.__init__(self)  # pylint: disable=non-parent-init-called
   2078 
   2079     # The rule for __params is that its contents will always be suitable as
   2080     # input to logservice.fetch().
   2081     self.__params = dict(kwargs)
   2082 
   2083     if start_time is not None:
   2084       self.__params[self.START_TIME_PARAM] = start_time
   2085     if end_time is not None:
   2086       self.__params[self.END_TIME_PARAM] = end_time
   2087     if minimum_log_level is not None:
   2088       self.__params[self.MINIMUM_LOG_LEVEL_PARAM] = minimum_log_level
   2089     if include_incomplete is not None:
   2090       self.__params[self.INCLUDE_INCOMPLETE_PARAM] = include_incomplete
   2091     if include_app_logs is not None:
   2092       self.__params[self.INCLUDE_APP_LOGS_PARAM] = include_app_logs
   2093     if version_ids:
   2094       self.__params[self.VERSION_IDS_PARAM] = version_ids
   2095     if module_versions:
   2096       self.__params[self.MODULE_VERSIONS_PARAM] = module_versions
   2097 
   2098     # Any submitted prototype_request will be in encoded form.
   2099     if self._PROTOTYPE_REQUEST_PARAM in self.__params:
   2100       prototype_request = log_service_pb.LogReadRequest(
   2101           self.__params[self._PROTOTYPE_REQUEST_PARAM])
   2102       self.__params[self._PROTOTYPE_REQUEST_PARAM] = prototype_request
   2103 
   2104   def __iter__(self):
   2105     """Iterates over logs in a given range of time.
   2106 
   2107     Yields:
   2108       A RequestLog containing all the information for a single request.
   2109     """
   2110     for log in logservice.fetch(**self.__params):
   2111       self.__params[self._OFFSET_PARAM] = log.offset
   2112       yield log
   2113 
   2114   @classmethod
   2115   def from_json(cls, json):
   2116     """Creates an instance of the InputReader for the given input shard's state.
   2117 
   2118     Args:
   2119       json: The InputReader state as a dict-like object.
   2120 
   2121     Returns:
   2122       An instance of the InputReader configured using the given JSON parameters.
   2123     """
   2124     # Strip out unrecognized parameters, as introduced by b/5960884.
   2125     params = dict((str(k), v) for k, v in json.iteritems()
   2126                   if k in cls._PARAMS)
   2127 
   2128     # This is not symmetric with to_json() wrt. PROTOTYPE_REQUEST_PARAM because
   2129     # the constructor parameters need to be JSON-encodable, so the decoding
   2130     # needs to happen there anyways.
   2131     if cls._OFFSET_PARAM in params:
   2132       params[cls._OFFSET_PARAM] = base64.b64decode(params[cls._OFFSET_PARAM])
   2133     return cls(**params)
   2134 
   2135   def to_json(self):
   2136     """Returns an input shard state for the remaining inputs.
   2137 
   2138     Returns:
   2139       A JSON serializable version of the remaining input to read.
   2140     """
   2141 
   2142     params = dict(self.__params)  # Shallow copy.
   2143     if self._PROTOTYPE_REQUEST_PARAM in params:
   2144       prototype_request = params[self._PROTOTYPE_REQUEST_PARAM]
   2145       params[self._PROTOTYPE_REQUEST_PARAM] = prototype_request.Encode()
   2146     if self._OFFSET_PARAM in params:
   2147       params[self._OFFSET_PARAM] = base64.b64encode(params[self._OFFSET_PARAM])
   2148     return params
   2149 
   2150   @classmethod
   2151   def split_input(cls, mapper_spec):
   2152     """Returns a list of input readers for the given input specification.
   2153 
   2154     Args:
   2155       mapper_spec: The MapperSpec for this InputReader.
   2156 
   2157     Returns:
   2158       A list of InputReaders.
   2159     """
   2160     params = _get_params(mapper_spec)
   2161     shard_count = mapper_spec.shard_count
   2162 
   2163     # Pick out the overall start and end times and time step per shard.
   2164     start_time = params[cls.START_TIME_PARAM]
   2165     end_time = params[cls.END_TIME_PARAM]
   2166     seconds_per_shard = (end_time - start_time) / shard_count
   2167 
   2168     # Create a LogInputReader for each shard, modulating the params as we go.
   2169     shards = []
   2170     for _ in xrange(shard_count - 1):
   2171       params[cls.END_TIME_PARAM] = (params[cls.START_TIME_PARAM] +
   2172                                     seconds_per_shard)
   2173       shards.append(LogInputReader(**params))
   2174       params[cls.START_TIME_PARAM] = params[cls.END_TIME_PARAM]
   2175 
   2176     # Create a final shard to complete the time range.
   2177     params[cls.END_TIME_PARAM] = end_time
   2178     return shards + [LogInputReader(**params)]
   2179 
   2180   @classmethod
   2181   def validate(cls, mapper_spec):
   2182     """Validates the mapper's specification and all necessary parameters.
   2183 
   2184     Args:
   2185       mapper_spec: The MapperSpec to be used with this InputReader.
   2186 
   2187     Raises:
   2188       BadReaderParamsError: If the user fails to specify both a starting time
   2189         and an ending time, or if the starting time is later than the ending
   2190         time.
   2191     """
   2192     if mapper_spec.input_reader_class() != cls:
   2193       raise errors.BadReaderParamsError("Input reader class mismatch")
   2194 
   2195     params = _get_params(mapper_spec, allowed_keys=cls._PARAMS)
   2196     if (cls.VERSION_IDS_PARAM not in params and
   2197         cls.MODULE_VERSIONS_PARAM not in params):
   2198       raise errors.BadReaderParamsError("Must specify a list of version ids or "
   2199                                         "module/version ids for mapper input")
   2200     if (cls.VERSION_IDS_PARAM in params and
   2201         cls.MODULE_VERSIONS_PARAM in params):
   2202       raise errors.BadReaderParamsError("Can not supply both version ids or "
   2203                                         "module/version ids. Use only one.")
   2204     if (cls.START_TIME_PARAM not in params or
   2205         params[cls.START_TIME_PARAM] is None):
   2206       raise errors.BadReaderParamsError("Must specify a starting time for "
   2207                                         "mapper input")
   2208     if cls.END_TIME_PARAM not in params or params[cls.END_TIME_PARAM] is None:
   2209       params[cls.END_TIME_PARAM] = time.time()
   2210 
   2211     if params[cls.START_TIME_PARAM] >= params[cls.END_TIME_PARAM]:
   2212       raise errors.BadReaderParamsError("The starting time cannot be later "
   2213                                         "than or the same as the ending time.")
   2214 
   2215     if cls._PROTOTYPE_REQUEST_PARAM in params:
   2216       try:
   2217         params[cls._PROTOTYPE_REQUEST_PARAM] = log_service_pb.LogReadRequest(
   2218             params[cls._PROTOTYPE_REQUEST_PARAM])
   2219       except (TypeError, ProtocolBuffer.ProtocolBufferDecodeError):
   2220         raise errors.BadReaderParamsError("The prototype request must be "
   2221                                           "parseable as a LogReadRequest.")
   2222 
   2223     # Pass the parameters to logservice.fetch() to verify any underlying
   2224     # constraints on types or values.  This only constructs an iterator, it
   2225     # doesn't trigger any requests for actual log records.
   2226     try:
   2227       logservice.fetch(**params)
   2228     except logservice.InvalidArgumentError, e:
   2229       raise errors.BadReaderParamsError("One or more parameters are not valid "
   2230                                         "inputs to logservice.fetch(): %s" % e)
   2231 
   2232   def __str__(self):
   2233     """Returns the string representation of this LogInputReader."""
   2234     params = []
   2235     for key in sorted(self.__params.keys()):
   2236       value = self.__params[key]
   2237       if key is self._PROTOTYPE_REQUEST_PARAM:
   2238         params.append("%s='%s'" % (key, value))
   2239       elif key is self._OFFSET_PARAM:
   2240         params.append("%s='%s'" % (key, value))
   2241       else:
   2242         params.append("%s=%s" % (key, value))
   2243 
   2244     return "LogInputReader(%s)" % ", ".join(params)
   2245 
   2246 
   2247 class _GoogleCloudStorageInputReader(InputReader):
   2248   """Input reader from Google Cloud Storage using the cloudstorage library.
   2249 
   2250   This class is expected to be subclassed with a reader that understands
   2251   user-level records.
   2252 
   2253   Required configuration in the mapper_spec.input_reader dictionary.
   2254     BUCKET_NAME_PARAM: name of the bucket to use (with no extra delimiters or
   2255       suffixed such as directories.
   2256     OBJECT_NAMES_PARAM: a list of object names or prefixes. All objects must be
   2257       in the BUCKET_NAME_PARAM bucket. If the name ends with a * it will be
   2258       treated as prefix and all objects with matching names will be read.
   2259       Entries should not start with a slash unless that is part of the object's
   2260       name. An example list could be:
   2261       ["my-1st-input-file", "directory/my-2nd-file", "some/other/dir/input-*"]
   2262       To retrieve all files "*" will match every object in the bucket. If a file
   2263       is listed twice or is covered by multiple prefixes it will be read twice,
   2264       there is no deduplication.
   2265 
   2266   Optional configuration in the mapper_sec.input_reader dictionary.
   2267     BUFFER_SIZE_PARAM: the size of the read buffer for each file handle.
   2268     DELIMITER_PARAM: if specified, turn on the shallow splitting mode.
   2269       The delimiter is used as a path separator to designate directory
   2270       hierarchy. Matching of prefixes from OBJECT_NAME_PARAM
   2271       will stop at the first directory instead of matching
   2272       all files under the directory. This allows MR to process bucket with
   2273       hundreds of thousands of files.
   2274     FAIL_ON_MISSING_INPUT: if specified and True, the MR will fail if any of
   2275       the input files are missing. Missing files will be skipped otherwise.
   2276   """
   2277 
   2278   # Supported parameters
   2279   BUCKET_NAME_PARAM = "bucket_name"
   2280   OBJECT_NAMES_PARAM = "objects"
   2281   BUFFER_SIZE_PARAM = "buffer_size"
   2282   DELIMITER_PARAM = "delimiter"
   2283   FAIL_ON_MISSING_INPUT = "fail_on_missing_input"
   2284 
   2285   # Internal parameters
   2286   _ACCOUNT_ID_PARAM = "account_id"
   2287 
   2288   # Other internal configuration constants
   2289   _JSON_PICKLE = "pickle"
   2290   _JSON_FAIL_ON_MISSING_INPUT = "fail_on_missing_input"
   2291   _STRING_MAX_FILES_LISTED = 10  # Max files shown in the str representation
   2292 
   2293   # Input reader can also take in start and end filenames and do
   2294   # listbucket. This saves space but has two cons.
   2295   # 1. Files to read are less well defined: files can be added or removed over
   2296   #    the lifetime of the MR job.
   2297   # 2. A shard has to process files from a contiguous namespace.
   2298   #    May introduce staggering shard.
   2299   def __init__(self, filenames, index=0, buffer_size=None, _account_id=None,
   2300                delimiter=None):
   2301     """Initialize a GoogleCloudStorageInputReader instance.
   2302 
   2303     Args:
   2304       filenames: A list of Google Cloud Storage filenames of the form
   2305         '/bucket/objectname'.
   2306       index: Index of the next filename to read.
   2307       buffer_size: The size of the read buffer, None to use default.
   2308       _account_id: Internal use only. See cloudstorage documentation.
   2309       delimiter: Delimiter used as path separator. See class doc for details.
   2310     """
   2311     self._filenames = filenames
   2312     self._index = index
   2313     self._buffer_size = buffer_size
   2314     self._account_id = _account_id
   2315     self._delimiter = delimiter
   2316     self._bucket = None
   2317     self._bucket_iter = None
   2318 
   2319     # True iff we should fail on missing input (see class doc above). Set to
   2320     # None in constructor and overwritten in split_input and from_json.
   2321     # fail_on_missing_input is not parameter of the constructor to avoid
   2322     # breaking classes inheriting from _GoogleCloudStorageInputReader and
   2323     # overriding the constructor.
   2324     self._fail_on_missing_input = None
   2325 
   2326   def _next_file(self):
   2327     """Find next filename.
   2328 
   2329     self._filenames may need to be expanded via listbucket.
   2330 
   2331     Returns:
   2332       None if no more file is left. Filename otherwise.
   2333     """
   2334     while True:
   2335       if self._bucket_iter:
   2336         try:
   2337           return self._bucket_iter.next().filename
   2338         except StopIteration:
   2339           self._bucket_iter = None
   2340           self._bucket = None
   2341       if self._index >= len(self._filenames):
   2342         return
   2343       filename = self._filenames[self._index]
   2344       self._index += 1
   2345       if self._delimiter is None or not filename.endswith(self._delimiter):
   2346         return filename
   2347       self._bucket = cloudstorage.listbucket(filename,
   2348                                              delimiter=self._delimiter)
   2349       self._bucket_iter = iter(self._bucket)
   2350 
   2351   @classmethod
   2352   def get_params(cls, mapper_spec, allowed_keys=None, allow_old=True):
   2353     params = _get_params(mapper_spec, allowed_keys, allow_old)
   2354     # Use the bucket_name defined in mapper_spec params if one was not defined
   2355     # specifically in the input_reader params.
   2356     if (mapper_spec.params.get(cls.BUCKET_NAME_PARAM) is not None and
   2357         params.get(cls.BUCKET_NAME_PARAM) is None):
   2358       params[cls.BUCKET_NAME_PARAM] = mapper_spec.params[cls.BUCKET_NAME_PARAM]
   2359     return params
   2360 
   2361   @classmethod
   2362   def validate(cls, mapper_spec):
   2363     """Validate mapper specification.
   2364 
   2365     Args:
   2366       mapper_spec: an instance of model.MapperSpec
   2367 
   2368     Raises:
   2369       BadReaderParamsError: if the specification is invalid for any reason such
   2370         as missing the bucket name or providing an invalid bucket name.
   2371     """
   2372     reader_spec = cls.get_params(mapper_spec, allow_old=False)
   2373 
   2374     # Bucket Name is required
   2375     if cls.BUCKET_NAME_PARAM not in reader_spec:
   2376       raise errors.BadReaderParamsError(
   2377           "%s is required for Google Cloud Storage" %
   2378           cls.BUCKET_NAME_PARAM)
   2379     try:
   2380       cloudstorage.validate_bucket_name(
   2381           reader_spec[cls.BUCKET_NAME_PARAM])
   2382     except ValueError, error:
   2383       raise errors.BadReaderParamsError("Bad bucket name, %s" % (error))
   2384 
   2385     # Object Name(s) are required
   2386     if cls.OBJECT_NAMES_PARAM not in reader_spec:
   2387       raise errors.BadReaderParamsError(
   2388           "%s is required for Google Cloud Storage" %
   2389           cls.OBJECT_NAMES_PARAM)
   2390     filenames = reader_spec[cls.OBJECT_NAMES_PARAM]
   2391     if not isinstance(filenames, list):
   2392       raise errors.BadReaderParamsError(
   2393           "Object name list is not a list but a %s" %
   2394           filenames.__class__.__name__)
   2395     for filename in filenames:
   2396       if not isinstance(filename, basestring):
   2397         raise errors.BadReaderParamsError(
   2398             "Object name is not a string but a %s" %
   2399             filename.__class__.__name__)
   2400     if cls.DELIMITER_PARAM in reader_spec:
   2401       delimiter = reader_spec[cls.DELIMITER_PARAM]
   2402       if not isinstance(delimiter, basestring):
   2403         raise errors.BadReaderParamsError(
   2404             "%s is not a string but a %s" %
   2405             (cls.DELIMITER_PARAM, type(delimiter)))
   2406 
   2407   @classmethod
   2408   def split_input(cls, mapper_spec):
   2409     """Returns a list of input readers.
   2410 
   2411     An equal number of input files are assigned to each shard (+/- 1). If there
   2412     are fewer files than shards, fewer than the requested number of shards will
   2413     be used. Input files are currently never split (although for some formats
   2414     could be and may be split in a future implementation).
   2415 
   2416     Args:
   2417       mapper_spec: an instance of model.MapperSpec.
   2418 
   2419     Returns:
   2420       A list of InputReaders. None when no input data can be found.
   2421     """
   2422     reader_spec = cls.get_params(mapper_spec, allow_old=False)
   2423     bucket = reader_spec[cls.BUCKET_NAME_PARAM]
   2424     filenames = reader_spec[cls.OBJECT_NAMES_PARAM]
   2425     delimiter = reader_spec.get(cls.DELIMITER_PARAM)
   2426     account_id = reader_spec.get(cls._ACCOUNT_ID_PARAM)
   2427     buffer_size = reader_spec.get(cls.BUFFER_SIZE_PARAM)
   2428     fail_on_missing_input = reader_spec.get(cls.FAIL_ON_MISSING_INPUT)
   2429 
   2430     # Gather the complete list of files (expanding wildcards)
   2431     all_filenames = []
   2432     for filename in filenames:
   2433       if filename.endswith("*"):
   2434         all_filenames.extend(
   2435             [file_stat.filename for file_stat in cloudstorage.listbucket(
   2436                 "/" + bucket + "/" + filename[:-1], delimiter=delimiter,
   2437                 _account_id=account_id)])
   2438       else:
   2439         all_filenames.append("/%s/%s" % (bucket, filename))
   2440 
   2441     # Split into shards
   2442     readers = []
   2443     for shard in range(0, mapper_spec.shard_count):
   2444       shard_filenames = all_filenames[shard::mapper_spec.shard_count]
   2445       if shard_filenames:
   2446         reader = cls(
   2447             shard_filenames, buffer_size=buffer_size, _account_id=account_id,
   2448             delimiter=delimiter)
   2449         reader._fail_on_missing_input = fail_on_missing_input
   2450         readers.append(reader)
   2451     return readers
   2452 
   2453   @classmethod
   2454   def from_json(cls, state):
   2455     obj = pickle.loads(state[cls._JSON_PICKLE])
   2456     # fail_on_missing_input might not be set - default to False.
   2457     obj._fail_on_missing_input = state.get(
   2458         cls._JSON_FAIL_ON_MISSING_INPUT, False)
   2459     if obj._bucket:
   2460       obj._bucket_iter = iter(obj._bucket)
   2461     return obj
   2462 
   2463   def to_json(self):
   2464     before_iter = self._bucket_iter
   2465     self._bucket_iter = None
   2466     try:
   2467       return {
   2468           self._JSON_PICKLE: pickle.dumps(self),
   2469           # self._fail_on_missing_input gets pickled but we save it separately
   2470           # and override it in from_json to deal with version flipping.
   2471           self._JSON_FAIL_ON_MISSING_INPUT:
   2472               getattr(self, "_fail_on_missing_input", False)
   2473       }
   2474       return {self._JSON_PICKLE: pickle.dumps(self)}
   2475     finally:
   2476       self._bucket_itr = before_iter
   2477 
   2478   def next(self):
   2479     """Returns the next input from this input reader, a block of bytes.
   2480 
   2481     Non existent files will be logged and skipped. The file might have been
   2482     removed after input splitting.
   2483 
   2484     Returns:
   2485       The next input from this input reader in the form of a cloudstorage
   2486       ReadBuffer that supports a File-like interface (read, readline, seek,
   2487       tell, and close). An error may be raised if the file can not be opened.
   2488 
   2489     Raises:
   2490       StopIteration: The list of files has been exhausted.
   2491     """
   2492     options = {}
   2493     if self._buffer_size:
   2494       options["read_buffer_size"] = self._buffer_size
   2495     if self._account_id:
   2496       options["_account_id"] = self._account_id
   2497     while True:
   2498       filename = self._next_file()
   2499       if filename is None:
   2500         raise StopIteration()
   2501       try:
   2502         start_time = time.time()
   2503         handle = cloudstorage.open(filename, **options)
   2504 
   2505         ctx = context.get()
   2506         if ctx:
   2507           operation.counters.Increment(
   2508               COUNTER_IO_READ_MSEC, int((time.time() - start_time) * 1000))(ctx)
   2509 
   2510         return handle
   2511       except cloudstorage.NotFoundError:
   2512         # Fail the job if we're strict on missing input.
   2513         if getattr(self, "_fail_on_missing_input", False):
   2514           raise errors.FailJobError(
   2515               "File missing in GCS, aborting: %s" % filename)
   2516         # Move on otherwise.
   2517         logging.warning("File %s may have been removed. Skipping file.",
   2518                         filename)
   2519 
   2520   def __str__(self):
   2521     # Only show a limited number of files individually for readability
   2522     num_files = len(self._filenames)
   2523     if num_files > self._STRING_MAX_FILES_LISTED:
   2524       names = "%s...%s + %d not shown" % (
   2525           ",".join(self._filenames[0:self._STRING_MAX_FILES_LISTED-1]),
   2526           self._filenames[-1],
   2527           num_files - self._STRING_MAX_FILES_LISTED)
   2528     else:
   2529       names = ",".join(self._filenames)
   2530 
   2531     if self._index > num_files:
   2532       status = "EOF"
   2533     else:
   2534       status = "Next %s (%d of %d)" % (
   2535           self._filenames[self._index],
   2536           self._index + 1,  # +1 for human 1-indexing
   2537           num_files)
   2538     return "CloudStorage [%s, %s]" % (status, names)
   2539 
   2540 
   2541 GoogleCloudStorageInputReader = _GoogleCloudStorageInputReader
   2542 
   2543 
   2544 class _GoogleCloudStorageRecordInputReader(_GoogleCloudStorageInputReader):
   2545   """Read data from a Google Cloud Storage file using LevelDB format.
   2546 
   2547   See the _GoogleCloudStorageOutputWriter for additional configuration options.
   2548   """
   2549 
   2550   def __getstate__(self):
   2551     result = self.__dict__.copy()
   2552     # record reader may not exist if reader has not been used
   2553     if "_record_reader" in result:
   2554       # RecordsReader has no buffering, it can safely be reconstructed after
   2555       # deserialization
   2556       result.pop("_record_reader")
   2557     return result
   2558 
   2559   def next(self):
   2560     """Returns the next input from this input reader, a record.
   2561 
   2562     Returns:
   2563       The next input from this input reader in the form of a record read from
   2564       an LevelDB file.
   2565 
   2566     Raises:
   2567       StopIteration: The ordered set records has been exhausted.
   2568     """
   2569     while True:
   2570       if not hasattr(self, "_cur_handle") or self._cur_handle is None:
   2571         # If there are no more files, StopIteration is raised here
   2572         self._cur_handle = super(_GoogleCloudStorageRecordInputReader,
   2573                                  self).next()
   2574       if not hasattr(self, "_record_reader") or self._record_reader is None:
   2575         self._record_reader = records.RecordsReader(self._cur_handle)
   2576 
   2577       try:
   2578         start_time = time.time()
   2579         content = self._record_reader.read()
   2580 
   2581         ctx = context.get()
   2582         if ctx:
   2583           operation.counters.Increment(COUNTER_IO_READ_BYTES, len(content))(ctx)
   2584           operation.counters.Increment(
   2585               COUNTER_IO_READ_MSEC, int((time.time() - start_time) * 1000))(ctx)
   2586         return content
   2587 
   2588       except EOFError:
   2589         self._cur_handle = None
   2590         self._record_reader = None
   2591 
   2592 
   2593 GoogleCloudStorageRecordInputReader = _GoogleCloudStorageRecordInputReader
   2594 
   2595 
   2596 class _ReducerReader(_GoogleCloudStorageRecordInputReader):
   2597   """Reader to read KeyValues records from GCS."""
   2598 
   2599   expand_parameters = True
   2600 
   2601   def __init__(self, filenames, index=0, buffer_size=None, _account_id=None,
   2602                delimiter=None):
   2603     super(_ReducerReader, self).__init__(filenames, index, buffer_size,
   2604                                          _account_id, delimiter)
   2605     self.current_key = None
   2606     self.current_values = None
   2607 
   2608   def __iter__(self):
   2609     ctx = context.get()
   2610     combiner = None
   2611 
   2612     if ctx:
   2613       combiner_spec = ctx.mapreduce_spec.mapper.params.get("combiner_spec")
   2614       if combiner_spec:
   2615         combiner = util.handler_for_name(combiner_spec)
   2616 
   2617     try:
   2618       while True:
   2619         binary_record = super(_ReducerReader, self).next()
   2620         proto = kv_pb.KeyValues()
   2621         proto.ParseFromString(binary_record)
   2622 
   2623         to_yield = None
   2624         if self.current_key is not None and self.current_key != proto.key():
   2625           to_yield = (self.current_key, self.current_values)
   2626           self.current_key = None
   2627           self.current_values = None
   2628 
   2629         if self.current_key is None:
   2630           self.current_key = proto.key()
   2631           self.current_values = []
   2632 
   2633         if combiner:
   2634           combiner_result = combiner(
   2635               self.current_key, proto.value_list(), self.current_values)
   2636 
   2637           if not util.is_generator(combiner_result):
   2638             raise errors.BadCombinerOutputError(
   2639                 "Combiner %s should yield values instead of returning them "
   2640                 "(%s)" % (combiner, combiner_result))
   2641 
   2642           self.current_values = []
   2643           for value in combiner_result:
   2644             if isinstance(value, operation.Operation):
   2645               value(ctx)
   2646             else:
   2647               # With combiner the current values always come from the combiner.
   2648               self.current_values.append(value)
   2649 
   2650           # Check-point after each combiner call is run only when there's
   2651           # nothing that needs to be yielded below. Otherwise allowing a
   2652           # check-point here would cause the current to_yield data to be lost.
   2653           if not to_yield:
   2654             yield ALLOW_CHECKPOINT
   2655         else:
   2656           # Without combiner we just accumulate values.
   2657           self.current_values.extend(proto.value_list())
   2658 
   2659         if to_yield:
   2660           yield to_yield
   2661           # Check-point after each key is yielded.
   2662           yield ALLOW_CHECKPOINT
   2663     except StopIteration:
   2664       pass
   2665 
   2666     # There may be some accumulated values left at the end of an input file
   2667     # so be sure to yield those too.
   2668     if self.current_key is not None:
   2669       to_yield = (self.current_key, self.current_values)
   2670       self.current_key = None
   2671       self.current_values = None
   2672       yield to_yield
   2673 
   2674   @staticmethod
   2675   def encode_data(data):
   2676     """Encodes the given data, which may have include raw bytes.
   2677 
   2678     Works around limitations in JSON encoding, which cannot handle raw bytes.
   2679 
   2680     Args:
   2681       data: the data to encode.
   2682 
   2683     Returns:
   2684       The data encoded.
   2685     """
   2686     return base64.b64encode(pickle.dumps(data))
   2687 
   2688   @staticmethod
   2689   def decode_data(data):
   2690     """Decodes data encoded with the encode_data function."""
   2691     return pickle.loads(base64.b64decode(data))
   2692 
   2693   def to_json(self):
   2694     """Returns an input shard state for the remaining inputs.
   2695 
   2696     Returns:
   2697       A json-izable version of the remaining InputReader.
   2698     """
   2699     result = super(_ReducerReader, self).to_json()
   2700     result["current_key"] = self.encode_data(self.current_key)
   2701     result["current_values"] = self.encode_data(self.current_values)
   2702     return result
   2703 
   2704   @classmethod
   2705   def from_json(cls, json):
   2706     """Creates an instance of the InputReader for the given input shard state.
   2707 
   2708     Args:
   2709       json: The InputReader state as a dict-like object.
   2710 
   2711     Returns:
   2712       An instance of the InputReader configured using the values of json.
   2713     """
   2714     result = super(_ReducerReader, cls).from_json(json)
   2715     result.current_key = _ReducerReader.decode_data(json["current_key"])
   2716     result.current_values = _ReducerReader.decode_data(json["current_values"])
   2717     return result
   2718