#!/usr/bin/env python
"""Parameters to control Mapreduce."""

__all__ = ["CONFIG_NAMESPACE",
           "config"]

import pickle


# To break circular dependencies, among other reasons.
# pylint: disable=g-import-not-at-top


# For mapreduce on the Python 2.5 runtime, this import will fail.
# TODO(user): Remove all pipeline import protections once the Python 2.5
# MR runtime is defunct.
try:
  from pipeline import util as pipeline_util
except ImportError:
  pipeline_util = None

from google.appengine.api import lib_config

CONFIG_NAMESPACE = "mapreduce"


# pylint: disable=protected-access
# pylint: disable=invalid-name


class _JobConfigMeta(type):
  """Metaclass that controls class creation."""

  _OPTIONS = "_options"
  _REQUIRED = "_required"

  def __new__(mcs, classname, bases, class_dict):
    """Creates a _Config class and modifies its class dict.

    Args:
      classname: name of the class.
      bases: a list of base classes.
      class_dict: original class dict.

    Returns:
      A new _Config class. The modified class has two extra fields:
      _options, a dict mapping each option name to its _Option object, and
      _required, a set of required option names. (See the illustrative
      example after the _Config class below.)
    """
    options = {}
    required = set()
    for name, option in class_dict.iteritems():
      if isinstance(option, _Option):
        options[name] = option
        if option.required:
          required.add(name)

    for name in options:
      class_dict.pop(name)
    class_dict[mcs._OPTIONS] = options
    class_dict[mcs._REQUIRED] = required
    cls = type.__new__(mcs, classname, bases, class_dict)

    # Handle inheritance of _Config.
    if object not in bases:
      parent_options = {}
      # Update options from the root down.
      for c in reversed(cls.__mro__):
        if mcs._OPTIONS in c.__dict__:
          # Children override parent.
          parent_options.update(c.__dict__[mcs._OPTIONS])
        if mcs._REQUIRED in c.__dict__:
          required.update(c.__dict__[mcs._REQUIRED])
      for k, v in parent_options.iteritems():
        if k not in options:
          options[k] = v
    return cls


class _Option(object):
  """An option for _Config."""

  def __init__(self, kind, required=False, default_factory=None,
               can_be_none=False):
    """Init.

    Args:
      kind: type of the option.
      required: whether the user is required to supply a value.
      default_factory: a factory that, when called, returns the default value.
      can_be_none: whether the value can be None.

    Raises:
      ValueError: if the arguments aren't compatible.
    """
    if required and default_factory is not None:
      raise ValueError("default_factory may not be set when the option "
                       "is required.")
    self.kind = kind
    self.required = required
    self.default_factory = default_factory
    self.can_be_none = can_be_none

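# Illustrative use (hypothetical options, not part of this module): options
# are declared as class attributes on a _Config subclass. A required option
# may not also carry a default_factory:
#
#   job_name = _Option(basestring, required=True)
#   shard_count = _Option(int, default_factory=lambda: 8)
#   _Option(int, required=True, default_factory=lambda: 8)  # ValueError.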

class _Config(object):
  """Root class for all per-job configuration."""

  __metaclass__ = _JobConfigMeta

  def __init__(self, _lenient=False, **kwds):
    """Init.

    Args:
      _lenient: When true, no option is required.
      **kwds: keyword arguments for options and their values.
    """
    self._verify_keys(kwds, _lenient)
    self._set_values(kwds, _lenient)

  def _verify_keys(self, kwds, _lenient):
    keys = set()
    for k in kwds:
      if k not in self._options:
        raise ValueError("Option %s is not supported." % k)
      keys.add(k)
    if not _lenient:
      missing = self._required - keys
      if missing:
        raise ValueError("Options %s are required." % tuple(missing))

  def _set_values(self, kwds, _lenient):
    for k, option in self._options.iteritems():
      v = kwds.get(k)
      if v is None and option.default_factory:
        v = option.default_factory()
      setattr(self, k, v)
      if _lenient:
        continue
      if v is None and option.can_be_none:
        continue
      if isinstance(v, type) and not issubclass(v, option.kind):
        raise TypeError(
            "Expected a subclass of %r for option %s. Got %r" % (
                option.kind, k, v))
      if not isinstance(v, type) and not isinstance(v, option.kind):
        raise TypeError("Expected type %r for option %s. Got %r" % (
            option.kind, k, v))

  def __eq__(self, other):
    if not isinstance(other, self.__class__):
      return False
    return other.__dict__ == self.__dict__

  def __repr__(self):
    return str(self.__dict__)

  def to_json(self):
    return {"config": pickle.dumps(self)}

  @classmethod
  def from_json(cls, json):
    return pickle.loads(json["config"])

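# Illustrative example (hypothetical config class, not part of this module):
# _JobConfigMeta gathers the _Option attributes into _options and _required,
# so a concrete config validates its constructor arguments and round-trips
# through to_json/from_json:
#
#   class _WordCountConfig(_Config):
#     job_name = _Option(basestring, required=True)
#     shard_count = _Option(int, default_factory=lambda: 8)
#     queue_name = _Option(basestring, can_be_none=True)
#
#   c = _WordCountConfig(job_name="wc")
#   c.shard_count                 # 8, supplied by default_factory.
#   _WordCountConfig()            # ValueError: job_name is required.
#   _WordCountConfig(job_name=1)  # TypeError: expected basestring.
#   assert _WordCountConfig.from_json(c.to_json()) == c
#
# A further subclass of _WordCountConfig would inherit these options through
# the MRO walk in _JobConfigMeta, with the child's definitions winning.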

# TODO(user): Make more of these private.
class _ConfigDefaults(object):
  """Default configs.

  Do not change parameters whose names begin with _.

  SHARD_MAX_ATTEMPTS: Max attempts to execute a shard before giving up.

  TASK_MAX_ATTEMPTS: Max attempts to execute a task before dropping it. A
    task is any taskqueue task created by the MR framework. A task is
    dropped when its X-AppEngine-TaskExecutionCount is greater than this
    number. Dropping a task aborts the entire MR job. (See the sketch after
    this class.)

  TASK_MAX_DATA_PROCESSING_ATTEMPTS:
    Max times to execute a task when previous task attempts failed during
    the data processing stage. An MR work task has three major stages:
    initial setup, data processing, and final checkpoint. The setup stage
    should be allowed more retries than the data processing stage: setup
    failures are caused by unavailable GAE services, while data processing
    failures are mostly due to user functions erroring out on certain
    input data. Thus, set TASK_MAX_ATTEMPTS higher than this parameter.

  QUEUE_NAME: Default queue for MR.

  SHARD_COUNT: Default shard count.

  PROCESSING_RATE_PER_SEC: Default rate of processed entities per second.

  BASE_PATH: Base path of the mapreduce and pipeline handlers.
  """

  SHARD_MAX_ATTEMPTS = 4

  # An arbitrarily large number.
  TASK_MAX_ATTEMPTS = 31

  TASK_MAX_DATA_PROCESSING_ATTEMPTS = 11

  QUEUE_NAME = "default"

  SHARD_COUNT = 8

  # Maximum number of mapper calls per second.
  # This parameter is useful in testing to force short slices.
  # Maybe make this a private constant instead.
  # If people want to rate-limit their jobs, they can reduce the shard count.
  PROCESSING_RATE_PER_SEC = 1000000

  # This path will be changed by the build process when this is part of the
  # SDK.
  BASE_PATH = "/mapreduce"

  # TODO(user): find a proper value for this.
  # The amount of time to spend scanning in one slice. A new slice will be
  # scheduled as soon as the current one has run this long.
  _SLICE_DURATION_SEC = 15

  # Delay between consecutive controller callback invocations.
  _CONTROLLER_PERIOD_SEC = 2


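# Illustrative sketch (assumed handler code, not this module's API): the
# framework can enforce TASK_MAX_ATTEMPTS by reading the execution-count
# header that the taskqueue service sets on every task attempt:
#
#   attempts = int(
#       self.request.headers.get("X-AppEngine-TaskExecutionCount", "0"))
#   if attempts > config.TASK_MAX_ATTEMPTS:
#     drop_task_and_abort_job()  # Hypothetical helper.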
# TODO(user): change this name to app_config.
config = lib_config.register(CONFIG_NAMESPACE, _ConfigDefaults.__dict__)

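# Illustrative override (standard lib_config convention; the values are
# assumptions): an application changes any _ConfigDefaults value by defining
# "mapreduce"-prefixed attributes in its appengine_config.py:
#
#   # appengine_config.py
#   mapreduce_QUEUE_NAME = "mapreduce-queue"
#   mapreduce_SHARD_COUNT = 32
#
# config.QUEUE_NAME then resolves to "mapreduce-queue"; names that are not
# overridden keep the defaults from _ConfigDefaults.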

# The following are constants that depend on the value of config.
# They are constants because config is completely initialized on the first
# request to an instance and will never change until a new version is
# deployed.
_DEFAULT_PIPELINE_BASE_PATH = config.BASE_PATH + "/pipeline"
# See b/11341023 for context.
_GCS_URLFETCH_TIMEOUT_SEC = 30
# If a lock has been held longer than this value, mapreduce will start to use
# the logs API to check whether the request has ended.
_LEASE_DURATION_SEC = config._SLICE_DURATION_SEC * 1.1
# On rare occasions, the Logs API misses log entries. Thus,
# if a lock has been held longer than this timeout, mapreduce assumes the
# request holding the lock has died, regardless of the Logs API.
# 10 minutes is the taskqueue task timeout on a frontend instance.
_MAX_LEASE_DURATION_SEC = max(10 * 60 + 30, config._SLICE_DURATION_SEC * 1.5)
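# With the default _SLICE_DURATION_SEC of 15, _LEASE_DURATION_SEC works out
# to 16.5 seconds and _MAX_LEASE_DURATION_SEC to 630 seconds, i.e. the
# 10-minute frontend task timeout dominates.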