Home | History | Annotate | Download | only in mapreduce
      1 #!/usr/bin/env python
      2 """A class representing entity property range."""
      3 
      4 
      5 
      6 # pylint: disable=g-bad-name
      7 # pylint: disable=g-import-not-at-top
      8 
      9 import datetime
     10 
     11 from google.appengine.ext import ndb
     12 
     13 from google.appengine.ext import db
     14 from mapreduce import errors
     15 from mapreduce import util
     16 
     17 __all__ = [
     18     "should_shard_by_property_range",
     19     "PropertyRange"]
     20 
     21 
     22 def should_shard_by_property_range(filters):
     23   """Returns whether these filters suggests sharding by property range.
     24 
     25   Args:
     26     filters: user supplied filters. Each filter should be a list or tuple of
     27       format (<property_name_as_str>, <query_operator_as_str>,
     28       <value_of_certain_type>). Value type is up to the property's type.
     29 
     30   Returns:
     31     True if these filters suggests sharding by property range. False
     32   Otherwise.
     33   """
     34   if not filters:
     35     return False
     36 
     37   for f in filters:
     38     if f[1] != "=":
     39       return True
     40   return False
     41 
     42 
     43 class PropertyRange(object):
     44   """A class that represents a range on a db.Model's property.
     45 
     46   It supports splitting the range into n shards and generating a query that
     47   returns entities within that range.
     48   """
     49 
     50   def __init__(self,
     51                filters,
     52                model_class_path):
     53     """Init.
     54 
     55     Args:
     56       filters: user supplied filters. Each filter should be a list or tuple of
     57         format (<property_name_as_str>, <query_operator_as_str>,
     58         <value_of_certain_type>). Value type should satisfy the property's type.
     59       model_class_path: full path to the model class in str.
     60     """
     61     self.filters = filters
     62     self.model_class_path = model_class_path
     63     self.model_class = util.for_name(self.model_class_path)
     64     self.prop, self.start, self.end = self._get_range_from_filters(
     65         self.filters, self.model_class)
     66 
     67   @classmethod
     68   def _get_range_from_filters(cls, filters, model_class):
     69     """Get property range from filters user provided.
     70 
     71     This method also validates there is one and only one closed range on a
     72     single property.
     73 
     74     Args:
     75       filters: user supplied filters. Each filter should be a list or tuple of
     76         format (<property_name_as_str>, <query_operator_as_str>,
     77         <value_of_certain_type>). Value type should satisfy the property's type.
     78       model_class: the model class for the entity type to apply filters on.
     79 
     80     Returns:
     81       a tuple of (property, start_filter, end_filter). property is the model's
     82     field that the range is about. start_filter and end_filter define the
     83     start and the end of the range. (None, None, None) if no range is found.
     84 
     85     Raises:
     86       BadReaderParamsError: if any filter is invalid in any way.
     87     """
     88     if not filters:
     89       return None, None, None
     90 
     91     range_property = None
     92     start_val = None
     93     end_val = None
     94     start_filter = None
     95     end_filter = None
     96     for f in filters:
     97       prop, op, val = f
     98 
     99       if op in [">", ">=", "<", "<="]:
    100         if range_property and range_property != prop:
    101           raise errors.BadReaderParamsError(
    102               "Range on only one property is supported.")
    103         range_property = prop
    104 
    105         if val is None:
    106           raise errors.BadReaderParamsError(
    107               "Range can't be None in filter %s", f)
    108 
    109         if op in [">", ">="]:
    110           if start_val is not None:
    111             raise errors.BadReaderParamsError(
    112                 "Operation %s is specified more than once.", op)
    113           start_val = val
    114           start_filter = f
    115         else:
    116           if end_val is not None:
    117             raise errors.BadReaderParamsError(
    118                 "Operation %s is specified more than once.", op)
    119           end_val = val
    120           end_filter = f
    121       elif op != "=":
    122         raise errors.BadReaderParamsError(
    123             "Only < <= > >= = are supported as operation. Got %s", op)
    124 
    125     if not range_property:
    126       return None, None, None
    127 
    128     if start_val is None or end_val is None:
    129       raise errors.BadReaderParamsError(
    130           "Filter should contains a complete range on property %s",
    131           range_property)
    132     if issubclass(model_class, db.Model):
    133       property_obj = model_class.properties()[range_property]
    134     else:
    135       property_obj = (
    136           model_class._properties[  # pylint: disable=protected-access
    137               range_property])
    138     supported_properties = (
    139         _DISCRETE_PROPERTY_SPLIT_FUNCTIONS.keys() +
    140         _CONTINUOUS_PROPERTY_SPLIT_FUNCTIONS.keys())
    141     if not isinstance(property_obj, tuple(supported_properties)):
    142       raise errors.BadReaderParamsError(
    143           "Filtered property %s is not supported by sharding.", range_property)
    144     if not start_val < end_val:
    145       raise errors.BadReaderParamsError(
    146           "Start value %s should be smaller than end value %s",
    147           start_val, end_val)
    148 
    149     return property_obj, start_filter, end_filter
    150 
    151   def split(self, n):
    152     """Evenly split this range into contiguous, non overlapping subranges.
    153 
    154     Args:
    155       n: number of splits.
    156 
    157     Returns:
    158       a list of contiguous, non overlapping sub PropertyRanges. Maybe less than
    159     n when not enough subranges.
    160     """
    161     new_range_filters = []
    162     name = self.start[0]
    163     prop_cls = self.prop.__class__
    164     if prop_cls in _DISCRETE_PROPERTY_SPLIT_FUNCTIONS:
    165       splitpoints = _DISCRETE_PROPERTY_SPLIT_FUNCTIONS[prop_cls](
    166           self.start[2], self.end[2], n,
    167           self.start[1] == ">=", self.end[1] == "<=")
    168       start_filter = (name, ">=", splitpoints[0])
    169       for p in splitpoints[1:]:
    170         end_filter = (name, "<", p)
    171         new_range_filters.append([start_filter, end_filter])
    172         start_filter = (name, ">=", p)
    173     else:
    174       splitpoints = _CONTINUOUS_PROPERTY_SPLIT_FUNCTIONS[prop_cls](
    175           self.start[2], self.end[2], n)
    176       start_filter = self.start
    177       for p in splitpoints:
    178         end_filter = (name, "<", p)
    179         new_range_filters.append([start_filter, end_filter])
    180         start_filter = (name, ">=", p)
    181       new_range_filters.append([start_filter, self.end])
    182 
    183     for f in new_range_filters:
    184       f.extend(self._equality_filters)
    185 
    186     return [self.__class__(f, self.model_class_path) for f in new_range_filters]
    187 
    188   def make_query(self, ns):
    189     """Make a query of entities within this range.
    190 
    191     Query options are not supported. They should be specified when the query
    192     is run.
    193 
    194     Args:
    195       ns: namespace of this query.
    196 
    197     Returns:
    198       a db.Query or ndb.Query, depends on the model class's type.
    199     """
    200     if issubclass(self.model_class, db.Model):
    201       query = db.Query(self.model_class, namespace=ns)
    202       for f in self.filters:
    203         query.filter("%s %s" % (f[0], f[1]), f[2])
    204     else:
    205       query = self.model_class.query(namespace=ns)
    206       for f in self.filters:
    207         query = query.filter(ndb.FilterNode(*f))
    208     return query
    209 
    210   @property
    211   def _equality_filters(self):
    212     return [f for f in self.filters if f[1] == "="]
    213 
    214   def to_json(self):
    215     return {"filters": self.filters,
    216             "model_class_path": self.model_class_path}
    217 
    218   @classmethod
    219   def from_json(cls, json):
    220     return cls(json["filters"], json["model_class_path"])
    221 
    222 
    223 def _split_datetime_property(start, end, n, include_start, include_end):
    224   # datastore stored datetime precision is microsecond.
    225   if not include_start:
    226     start += datetime.timedelta(microseconds=1)
    227   if include_end:
    228     end += datetime.timedelta(microseconds=1)
    229   delta = end - start
    230   stride = delta // n
    231   if stride <= datetime.timedelta():
    232     raise ValueError("Range too small to split: start %r end %r", start, end)
    233   splitpoints = [start]
    234   previous = start
    235   for _ in range(n-1):
    236     point = previous + stride
    237     if point == previous or point > end:
    238       continue
    239     previous = point
    240     splitpoints.append(point)
    241   if end not in splitpoints:
    242     splitpoints.append(end)
    243   return splitpoints
    244 
    245 
    246 def _split_float_property(start, end, n):
    247   delta = float(end - start)
    248   stride = delta / n
    249   if stride <= 0:
    250     raise ValueError("Range too small to split: start %r end %r", start, end)
    251   splitpoints = []
    252   for i in range(1, n):
    253     splitpoints.append(start + i * stride)
    254   return splitpoints
    255 
    256 
    257 def _split_integer_property(start, end, n, include_start, include_end):
    258   if not include_start:
    259     start += 1
    260   if include_end:
    261     end += 1
    262   delta = float(end - start)
    263   stride = delta / n
    264   if stride <= 0:
    265     raise ValueError("Range too small to split: start %r end %r", start, end)
    266   splitpoints = [start]
    267   previous = start
    268   for i in range(1, n):
    269     point = start + int(round(i * stride))
    270     if point == previous or point > end:
    271       continue
    272     previous = point
    273     splitpoints.append(point)
    274   if end not in splitpoints:
    275     splitpoints.append(end)
    276   return splitpoints
    277 
    278 
    279 def _split_string_property(start, end, n, include_start, include_end):
    280   try:
    281     start = start.encode("ascii")
    282     end = end.encode("ascii")
    283   except UnicodeEncodeError, e:
    284     raise ValueError("Only ascii str is supported.", e)
    285 
    286   return _split_byte_string_property(start, end, n, include_start, include_end)
    287 
    288 
    289 # The alphabet splitting supports.
    290 _ALPHABET = "".join(chr(i) for i in range(128))
    291 # String length determines how many unique strings we can choose from.
    292 # We can't split into more shards than this: len(_ALPHABET)^_STRING_LENGTH
    293 _STRING_LENGTH = 4
    294 
    295 
    296 def _split_byte_string_property(start, end, n, include_start, include_end):
    297   # Get prefix, suffix, and the real start/end to split on.
    298   i = 0
    299   for i, (s, e) in enumerate(zip(start, end)):
    300     if s != e:
    301       break
    302   common_prefix = start[:i]
    303   start_suffix = start[i+_STRING_LENGTH:]
    304   end_suffix = end[i+_STRING_LENGTH:]
    305   start = start[i:i+_STRING_LENGTH]
    306   end = end[i:i+_STRING_LENGTH]
    307 
    308   # Convert str to ord.
    309   weights = _get_weights(_STRING_LENGTH)
    310   start_ord = _str_to_ord(start, weights)
    311   if not include_start:
    312     start_ord += 1
    313   end_ord = _str_to_ord(end, weights)
    314   if include_end:
    315     end_ord += 1
    316 
    317   # Do split.
    318   stride = (end_ord - start_ord) / float(n)
    319   if stride <= 0:
    320     raise ValueError("Range too small to split: start %s end %s", start, end)
    321   splitpoints = [_ord_to_str(start_ord, weights)]
    322   previous = start_ord
    323   for i in range(1, n):
    324     point = start_ord + int(round(stride * i))
    325     if point == previous or point > end_ord:
    326       continue
    327     previous = point
    328     splitpoints.append(_ord_to_str(point, weights))
    329   end_str = _ord_to_str(end_ord, weights)
    330   if end_str not in splitpoints:
    331     splitpoints.append(end_str)
    332 
    333   # Append suffix.
    334   splitpoints[0] += start_suffix
    335   splitpoints[-1] += end_suffix
    336 
    337   return [common_prefix + point for point in splitpoints]
    338 
    339 
    340 def _get_weights(max_length):
    341   """Get weights for each offset in str of certain max length.
    342 
    343   Args:
    344     max_length: max length of the strings.
    345 
    346   Returns:
    347     A list of ints as weights.
    348 
    349   Example:
    350     If max_length is 2 and alphabet is "ab", then we have order "", "a", "aa",
    351   "ab", "b", "ba", "bb". So the weight for the first char is 3.
    352   """
    353   weights = [1]
    354   for i in range(1, max_length):
    355     weights.append(weights[i-1] * len(_ALPHABET) + 1)
    356   weights.reverse()
    357   return weights
    358 
    359 
    360 def _str_to_ord(content, weights):
    361   """Converts a string to its lexicographical order.
    362 
    363   Args:
    364     content: the string to convert. Of type str.
    365     weights: weights from _get_weights.
    366 
    367   Returns:
    368     an int or long that represents the order of this string. "" has order 0.
    369   """
    370   ordinal = 0
    371   for i, c in enumerate(content):
    372     ordinal += weights[i] * _ALPHABET.index(c) + 1
    373   return ordinal
    374 
    375 
    376 def _ord_to_str(ordinal, weights):
    377   """Reverse function of _str_to_ord."""
    378   chars = []
    379   for weight in weights:
    380     if ordinal == 0:
    381       return "".join(chars)
    382     ordinal -= 1
    383     index, ordinal = divmod(ordinal, weight)
    384     chars.append(_ALPHABET[index])
    385   return "".join(chars)
    386 
    387 
    388 # discrete property split functions all have the same interface.
    389 # They take start, end, shard_number n, include_start, include_end.
    390 # They return at most n+1 points, forming n ranges.
    391 # Each range should be include_start, exclude_end.
    392 _DISCRETE_PROPERTY_SPLIT_FUNCTIONS = {
    393     db.DateTimeProperty: _split_datetime_property,
    394     db.IntegerProperty: _split_integer_property,
    395     db.StringProperty: _split_string_property,
    396     db.ByteStringProperty: _split_byte_string_property,
    397     # ndb.
    398     ndb.DateTimeProperty: _split_datetime_property,
    399     ndb.IntegerProperty: _split_integer_property,
    400     ndb.StringProperty: _split_string_property,
    401     ndb.BlobProperty: _split_byte_string_property
    402 }
    403 
    404 _CONTINUOUS_PROPERTY_SPLIT_FUNCTIONS = {
    405     db.FloatProperty: _split_float_property,
    406     # ndb.
    407     ndb.FloatProperty: _split_float_property,
    408 }
    409