#!/usr/bin/env python
#
# Copyright 2010 Google Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""API for controlling MapReduce execution outside of MapReduce framework."""


__all__ = ["start_map"]

# pylint: disable=g-bad-name
# pylint: disable=protected-access


import logging

from google.appengine.ext import db
from mapreduce import handlers
from mapreduce import model
from mapreduce import parameters
from mapreduce import util
from mapreduce.api import map_job


def start_map(name,
              handler_spec,
              reader_spec,
              mapper_parameters,
              shard_count=None,
              output_writer_spec=None,
              mapreduce_parameters=None,
              base_path=None,
              queue_name=None,
              eta=None,
              countdown=None,
              hooks_class_name=None,
              _app=None,
              in_xg_transaction=False):
  """Start a new, mapper-only mapreduce.

  Deprecated! Use map_job.start instead.

  If a value can be specified both as an explicit argument and in a
  parameters dictionary, the value from the explicit argument wins.

  Args:
    name: mapreduce name. Used only for display purposes.
    handler_spec: fully qualified name of the mapper handler function/class
      to call.
    reader_spec: fully qualified name of the mapper input reader to use.
    mapper_parameters: dictionary of parameters to pass to the mapper. These
      are mapper-specific and also used for reader/writer initialization.
      Should have the format {"input_reader": {}, "output_writer": {}}. The
      old, deprecated style does not use sub-dictionaries.
    shard_count: number of shards to create.
    output_writer_spec: fully qualified name of the output writer to use,
      if any.
    mapreduce_parameters: dictionary of mapreduce parameters relevant to the
      whole job.
    base_path: base path of the mapreduce library handler specified in
      app.yaml. "/mapreduce" by default.
    queue_name: taskqueue queue name to be used for mapreduce tasks.
      See util.get_queue_name.
    eta: absolute time when the MR should execute. May not be specified
      if 'countdown' is also supplied. This may be timezone-aware or
      timezone-naive.
    countdown: time in seconds into the future that this MR should execute.
      Defaults to zero.
    hooks_class_name: fully qualified name of a hooks.Hooks subclass.
    in_xg_transaction: controls what transaction scope to use to start this
      MR job (see the example below). If True, there must be an already
      opened cross-group transaction scope; the MR will use one entity group
      from it. If False, the MR will create an independent transaction to
      start the job regardless of any existing transaction scopes.

  Returns:
    mapreduce id as string.
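
  Example:
    A minimal, illustrative sketch of a call; the handler and entity-kind
    names ("myapp.handlers.word_count_map", "myapp.models.Document") are
    hypothetical placeholders rather than part of this library:

      mapreduce_id = start_map(
          name="word-count",
          handler_spec="myapp.handlers.word_count_map",
          reader_spec="mapreduce.input_readers.DatastoreInputReader",
          mapper_parameters={
              "input_reader": {"entity_kind": "myapp.models.Document"},
          },
          shard_count=8)

    To start the job from inside an already open cross-group transaction
    (again only a sketch), wrap the call in the standard db transaction
    helpers and pass in_xg_transaction=True:

      xg_opts = db.create_transaction_options(xg=True)
      mapreduce_id = db.run_in_transaction_options(
          xg_opts,
          start_map,
          name="word-count",
          handler_spec="myapp.handlers.word_count_map",
          reader_spec="mapreduce.input_readers.DatastoreInputReader",
          mapper_parameters={
              "input_reader": {"entity_kind": "myapp.models.Document"},
          },
          in_xg_transaction=True)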
  """
  if shard_count is None:
    shard_count = parameters.config.SHARD_COUNT

  if mapper_parameters:
    mapper_parameters = dict(mapper_parameters)

  # Make sure this old API fills all parameters with default values.
  mr_params = map_job.JobConfig._get_default_mr_params()
  if mapreduce_parameters:
    mr_params.update(mapreduce_parameters)

  # Override default values if the user specified them as arguments.
  if base_path:
    mr_params["base_path"] = base_path
  mr_params["queue_name"] = util.get_queue_name(queue_name)

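  # Bundle the handler, reader, mapper parameters, shard count and optional
  # output writer into a single MapperSpec for the job-start handler below.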
  mapper_spec = model.MapperSpec(handler_spec,
                                 reader_spec,
                                 mapper_parameters,
                                 shard_count,
                                 output_writer_spec=output_writer_spec)

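  # Warn (but do not fail) if cross-group transaction semantics were
  # requested without an open transaction.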
  if in_xg_transaction and not db.is_in_transaction():
    logging.warning("Expects an open xg transaction to start mapreduce "
                    "when in_xg_transaction is True.")

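  # Hand the assembled spec and parameters to the framework's shared
  # job-start path, which returns the new mapreduce id as a string.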
  return handlers.StartJobHandler._start_map(
      name,
      mapper_spec,
      mr_params,
      # TODO(user): Now that "queue_name" is part of mr_params, remove all
      # the other ways to get queue_name after one release.
      queue_name=mr_params["queue_name"],
      eta=eta,
      countdown=countdown,
      hooks_class_name=hooks_class_name,
      _app=_app,
      in_xg_transaction=in_xg_transaction)