#!/usr/bin/env python
# Copyright 2011 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Pipelines for mapreduce library."""

from __future__ import with_statement



__all__ = [
    "CleanupPipeline",
    "MapPipeline",
    "MapperPipeline",
    "MapreducePipeline",
    "ReducePipeline",
    "ShufflePipeline",
    ]


import pipeline
from pipeline import common as pipeline_common
from google.appengine.api import app_identity
from mapreduce import errors
from mapreduce import input_readers
from mapreduce import mapper_pipeline
from mapreduce import model
from mapreduce import output_writers
from mapreduce import pipeline_base
from mapreduce import shuffler
from mapreduce import util

# pylint: disable=g-bad-name
# pylint: disable=protected-access

# MapperPipeline lives in its own module only to break a dependency cycle with
# shuffler.py; re-export it here so callers can keep importing it from this
# module.
MapperPipeline = mapper_pipeline.MapperPipeline

ShufflePipeline = shuffler.ShufflePipeline

CleanupPipeline = shuffler._GCSCleanupPipeline

# For backward compatibility.
_ReducerReader = input_readers._ReducerReader


class MapPipeline(pipeline_base._OutputSlotsMixin,
                  pipeline_base.PipelineBase):
  """Runs the map stage of MapReduce.

  Iterates over the input reader and writes the data out in the key/value
  format that the shuffler consumes (a map handler sketch follows this
  docstring).

  Args:
    job_name: mapreduce job name as string.
    mapper_spec: specification of the map handler function as string.
    input_reader_spec: input reader specification as string.
    params: mapper and input reader parameters as dict.
    shards: number of shards to start as int.

  Returns:
    list of filenames written to by this mapper, one for each shard.
  """
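
  # A minimal sketch of a map handler this stage could run (hypothetical
  # function, not part of this module; assumes each input item exposes a
  # `text` attribute). Handlers used through MapreducePipeline receive one
  # item from the input reader and yield (key, value) string pairs, which
  # this stage writes out for the shuffler:
  #
  #   def word_count_map(item):
  #     for word in item.text.split():
  #       yield (word, "1")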

  def run(self,
          job_name,
          mapper_spec,
          input_reader_spec,
          params,
          shards=None):
    new_params = dict(params or {})
    # Although we are using all the default settings (and the inherited
    # bucket_name), we still need to define an output_writer dict in order to
    # pass validation.
    new_params.update({"output_writer": {}})
    yield MapperPipeline(
        job_name + "-map",
        mapper_spec,
        input_reader_spec,
        output_writer_spec=(output_writers.__name__ +
                            "._GoogleCloudStorageKeyValueOutputWriter"),
        params=new_params,
        shards=shards)


class ReducePipeline(pipeline_base._OutputSlotsMixin,
                     pipeline_base.PipelineBase):
  """Runs the reduce stage of MapReduce.

  Merge-reads the input files and runs the reducer function on them.

  Args:
    job_name: mapreduce job name as string.
    reducer_spec: specification of the reduce function as string.
    output_writer_spec: specification of the output writer to use with the
      reduce function.
    params: mapper parameters to use as dict.
    bucket_name: The name of the Google Cloud Storage bucket.
    filenames: list of filenames to reduce.
    combiner_spec: Optional. Specification of a combine function. If not
      supplied, no combine step will take place. The combine function takes a
      key, a list of values and a list of previously combined results. It
      yields combined values that might be processed by another combiner call,
      but will eventually end up in the reducer. The combiner output key is
      assumed to be the same as the input key (a combiner sketch follows this
      docstring).
    shards: Optional. Number of output shards. Defaults to the number of
      input files.

  Returns:
    filenames from output writer.
  """
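
  # A minimal sketch of a combine function matching the combiner_spec contract
  # described above (hypothetical function, not part of this module; assumes
  # the values are integer-like strings):
  #
  #   def sum_combiner(key, values, previously_combined_values):
  #     # Yield one partial sum; its key is assumed to equal `key`.
  #     yield str(sum(int(v) for v in values) +
  #               sum(int(v) for v in previously_combined_values))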

  output_names = mapper_pipeline.MapperPipeline.output_names

  def run(self,
          job_name,
          reducer_spec,
          output_writer_spec,
          params,
          bucket_name,
          filenames,
          combiner_spec=None,
          shards=None):
    filenames_only = (
        util.strip_prefix_from_items("/%s/" % bucket_name, filenames))
    new_params = dict(params or {})
    new_params.update({
        "input_reader": {
            "bucket_name": bucket_name,
            "objects": filenames_only,
        }})
    if combiner_spec:
      new_params.update({
          "combiner_spec": combiner_spec,
          })

    # TODO(user): Test this
    if shards is None:
      shards = len(filenames)

    yield mapper_pipeline.MapperPipeline(
        job_name + "-reduce",
        reducer_spec,
        __name__ + "._ReducerReader",
        output_writer_spec,
        new_params,
        shards=shards)


class MapreducePipeline(pipeline_base._OutputSlotsMixin,
                        pipeline_base.PipelineBase):
  """Pipeline to execute MapReduce jobs.

  The Shuffle stage uses Google Cloud Storage (GCS). For newly created
  projects GCS is activated automatically; for existing projects, follow
  these instructions to activate GCS:
  https://cloud.google.com/storage/docs/signup#activate

  A usage sketch follows this docstring.

  Args:
    job_name: job name as string.
    mapper_spec: specification of the mapper to use.
    reducer_spec: specification of the reducer to use.
    input_reader_spec: specification of the input reader to read data from.
    output_writer_spec: specification of the output writer to save reduce
      output to.
    mapper_params: parameters to use for the mapper phase.
    reducer_params: parameters to use for the reduce phase.
    shards: number of shards to use as int.
    combiner_spec: Optional. Specification of a combine function. If not
      supplied, no combine step will take place. The combine function takes a
      key, a list of values and a list of previously combined results. It
      yields combined values that might be processed by another combiner call,
      but will eventually end up in the reducer. The combiner output key is
      assumed to be the same as the input key.

  Returns:
    result_status: one of model.MapreduceState._RESULTS. Check this to see
      whether the job succeeded.
    default: a list of filenames if the mapreduce succeeded and produced
      output files; an empty list otherwise.
  """
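
  # A minimal usage sketch. The handler names, entity kind, bucket name and
  # reader/writer spec strings below are illustrative assumptions; substitute
  # ones that exist in your application and in your copy of the library. The
  # map handler would look like the sketch in MapPipeline above; the reduce
  # handler receives a key plus the list of its values and yields output for
  # the output writer:
  #
  #   def word_count_reduce(key, values):    # hypothetical reduce handler
  #     yield "%s: %d\n" % (key, len(values))
  #
  #   pipe = MapreducePipeline(
  #       "word_count",
  #       "main.word_count_map",
  #       "main.word_count_reduce",
  #       "mapreduce.input_readers.DatastoreInputReader",
  #       "mapreduce.output_writers._GoogleCloudStorageOutputWriter",
  #       mapper_params={
  #           "entity_kind": "main.TextDoc",   # hypothetical datastore model
  #           "bucket_name": "my-bucket",      # used by the map/shuffle stages
  #       },
  #       reducer_params={
  #           "output_writer": {"bucket_name": "my-bucket"},
  #       },
  #       shards=16)
  #   pipe.start()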

  output_names = mapper_pipeline.MapperPipeline.output_names

  def run(self,
          job_name,
          mapper_spec,
          reducer_spec,
          input_reader_spec,
          output_writer_spec=None,
          mapper_params=None,
          reducer_params=None,
          shards=None,
          combiner_spec=None):
    # If no bucket_name was provided in mapper_params, fall back to the app's
    # default GCS bucket.
    if mapper_params is None:
      # mapper_params defaults to None; treat that like an empty dict so the
      # default-bucket lookup below still applies.
      mapper_params = {}
    if mapper_params.get("bucket_name") is None:
      try:
        mapper_params["bucket_name"] = (
            app_identity.get_default_gcs_bucket_name())
      except Exception as e:
        raise errors.Error("Unable to get the GCS default bucket name. "
                           "Check to see that GCS is properly activated. "
                           + str(e))
    if mapper_params["bucket_name"] is None:
      raise errors.Error("There is no GCS default bucket name. "
                         "Check to see that GCS is properly activated.")
    # TODO(user): Check that the bucket is indeed writable.

    map_pipeline = yield MapPipeline(job_name,
                                     mapper_spec,
                                     input_reader_spec,
                                     params=mapper_params,
                                     shards=shards)
    shuffler_pipeline = yield ShufflePipeline(
        job_name, mapper_params, map_pipeline)
    reducer_pipeline = yield ReducePipeline(
        job_name,
        reducer_spec,
        output_writer_spec,
        reducer_params,
        mapper_params["bucket_name"],
        shuffler_pipeline,
        combiner_spec=combiner_spec)
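    # Once the reduce stage finishes, delete the intermediate GCS files
    # produced by the map and shuffle stages.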
    with pipeline.After(reducer_pipeline):
      all_temp_files = yield pipeline_common.Extend(
          map_pipeline, shuffler_pipeline)
      yield CleanupPipeline(all_temp_files)

    yield _ReturnPipeline(map_pipeline.result_status,
                          reducer_pipeline.result_status,
                          reducer_pipeline.counters,
                          reducer_pipeline.job_id,
                          reducer_pipeline)


class _ReturnPipeline(pipeline_base._OutputSlotsMixin,
                      pipeline_base.PipelineBase):
  """Returns Mapreduce result.

  Fills outputs for MapreducePipeline. See MapreducePipeline.
  """

  output_names = mapper_pipeline.MapperPipeline.output_names

  def run(self,
          map_result_status,
          reduce_result_status,
          reduce_counters,
          job_id,
          reduce_outputs):

    if (map_result_status == model.MapreduceState.RESULT_ABORTED or
        reduce_result_status == model.MapreduceState.RESULT_ABORTED):
      result_status = model.MapreduceState.RESULT_ABORTED
    elif (map_result_status == model.MapreduceState.RESULT_FAILED or
          reduce_result_status == model.MapreduceState.RESULT_FAILED):
      result_status = model.MapreduceState.RESULT_FAILED
    else:
      result_status = model.MapreduceState.RESULT_SUCCESS

    self.fill(self.outputs.result_status, result_status)
    self.fill(self.outputs.counters, reduce_counters)
    self.fill(self.outputs.job_id, job_id)
    if result_status == model.MapreduceState.RESULT_SUCCESS:
      yield pipeline_common.Return(reduce_outputs)
    else:
      yield pipeline_common.Return([])
    278