Home | History | Annotate | Download | only in emr
      1 # Copyright (c) 2010 Spotify AB
      2 # Copyright (c) 2010-2011 Yelp
      3 #
      4 # Permission is hereby granted, free of charge, to any person obtaining a
      5 # copy of this software and associated documentation files (the
      6 # "Software"), to deal in the Software without restriction, including
      7 # without limitation the rights to use, copy, modify, merge, publish, dis-
      8 # tribute, sublicense, and/or sell copies of the Software, and to permit
      9 # persons to whom the Software is furnished to do so, subject to the fol-
     10 # lowing conditions:
     11 #
     12 # The above copyright notice and this permission notice shall be included
     13 # in all copies or substantial portions of the Software.
     14 #
     15 # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
     16 # OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABIL-
     17 # ITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
     18 # SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
     19 # WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
     20 # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
     21 # IN THE SOFTWARE.
     22 
     23 from boto.compat import six
     24 
     25 
     26 class Step(object):
     27     """
     28     Jobflow Step base class
     29     """
     30     def jar(self):
     31         """
     32         :rtype: str
     33         :return: URI to the jar
     34         """
     35         raise NotImplemented()
     36 
     37     def args(self):
     38         """
     39         :rtype: list(str)
     40         :return: List of arguments for the step
     41         """
     42         raise NotImplemented()
     43 
     44     def main_class(self):
     45         """
     46         :rtype: str
     47         :return: The main class name
     48         """
     49         raise NotImplemented()
     50 
     51 
     52 class JarStep(Step):
     53     """
     54     Custom jar step
     55     """
     56     def __init__(self, name, jar, main_class=None,
     57                  action_on_failure='TERMINATE_JOB_FLOW', step_args=None):
     58         """
     59         A elastic mapreduce step that executes a jar
     60 
     61         :type name: str
     62         :param name: The name of the step
     63         :type jar: str
     64         :param jar: S3 URI to the Jar file
     65         :type main_class: str
     66         :param main_class: The class to execute in the jar
     67         :type action_on_failure: str
     68         :param action_on_failure: An action, defined in the EMR docs to
     69             take on failure.
     70         :type step_args: list(str)
     71         :param step_args: A list of arguments to pass to the step
     72         """
     73         self.name = name
     74         self._jar = jar
     75         self._main_class = main_class
     76         self.action_on_failure = action_on_failure
     77 
     78         if isinstance(step_args, six.string_types):
     79             step_args = [step_args]
     80 
     81         self.step_args = step_args
     82 
     83     def jar(self):
     84         return self._jar
     85 
     86     def args(self):
     87         args = []
     88 
     89         if self.step_args:
     90             args.extend(self.step_args)
     91 
     92         return args
     93 
     94     def main_class(self):
     95         return self._main_class
     96 
     97 
     98 class StreamingStep(Step):
     99     """
    100     Hadoop streaming step
    101     """
    102     def __init__(self, name, mapper, reducer=None, combiner=None,
    103                  action_on_failure='TERMINATE_JOB_FLOW',
    104                  cache_files=None, cache_archives=None,
    105                  step_args=None, input=None, output=None,
    106                  jar='/home/hadoop/contrib/streaming/hadoop-streaming.jar'):
    107         """
    108         A hadoop streaming elastic mapreduce step
    109 
    110         :type name: str
    111         :param name: The name of the step
    112         :type mapper: str
    113         :param mapper: The mapper URI
    114         :type reducer: str
    115         :param reducer: The reducer URI
    116         :type combiner: str
    117         :param combiner: The combiner URI. Only works for Hadoop 0.20
    118             and later!
    119         :type action_on_failure: str
    120         :param action_on_failure: An action, defined in the EMR docs to
    121             take on failure.
    122         :type cache_files: list(str)
    123         :param cache_files: A list of cache files to be bundled with the job
    124         :type cache_archives: list(str)
    125         :param cache_archives: A list of jar archives to be bundled with
    126             the job
    127         :type step_args: list(str)
    128         :param step_args: A list of arguments to pass to the step
    129         :type input: str or a list of str
    130         :param input: The input uri
    131         :type output: str
    132         :param output: The output uri
    133         :type jar: str
    134         :param jar: The hadoop streaming jar. This can be either a local
    135             path on the master node, or an s3:// URI.
    136         """
    137         self.name = name
    138         self.mapper = mapper
    139         self.reducer = reducer
    140         self.combiner = combiner
    141         self.action_on_failure = action_on_failure
    142         self.cache_files = cache_files
    143         self.cache_archives = cache_archives
    144         self.input = input
    145         self.output = output
    146         self._jar = jar
    147 
    148         if isinstance(step_args, six.string_types):
    149             step_args = [step_args]
    150 
    151         self.step_args = step_args
    152 
    153     def jar(self):
    154         return self._jar
    155 
    156     def main_class(self):
    157         return None
    158 
    159     def args(self):
    160         args = []
    161 
    162         # put extra args BEFORE -mapper and -reducer so that e.g. -libjar
    163         # will work
    164         if self.step_args:
    165             args.extend(self.step_args)
    166 
    167         args.extend(['-mapper', self.mapper])
    168 
    169         if self.combiner:
    170             args.extend(['-combiner', self.combiner])
    171 
    172         if self.reducer:
    173             args.extend(['-reducer', self.reducer])
    174         else:
    175             args.extend(['-jobconf', 'mapred.reduce.tasks=0'])
    176 
    177         if self.input:
    178             if isinstance(self.input, list):
    179                 for input in self.input:
    180                     args.extend(('-input', input))
    181             else:
    182                 args.extend(('-input', self.input))
    183         if self.output:
    184             args.extend(('-output', self.output))
    185 
    186         if self.cache_files:
    187             for cache_file in self.cache_files:
    188                 args.extend(('-cacheFile', cache_file))
    189 
    190         if self.cache_archives:
    191             for cache_archive in self.cache_archives:
    192                 args.extend(('-cacheArchive', cache_archive))
    193 
    194         return args
    195 
    196     def __repr__(self):
    197         return '%s.%s(name=%r, mapper=%r, reducer=%r, action_on_failure=%r, cache_files=%r, cache_archives=%r, step_args=%r, input=%r, output=%r, jar=%r)' % (
    198             self.__class__.__module__, self.__class__.__name__,
    199             self.name, self.mapper, self.reducer, self.action_on_failure,
    200             self.cache_files, self.cache_archives, self.step_args,
    201             self.input, self.output, self._jar)
    202 
    203 
    204 class ScriptRunnerStep(JarStep):
    205 
    206     ScriptRunnerJar = 's3n://us-east-1.elasticmapreduce/libs/script-runner/script-runner.jar'
    207 
    208     def __init__(self, name, **kw):
    209         super(ScriptRunnerStep, self).__init__(name, self.ScriptRunnerJar, **kw)
    210 
    211 
    212 class PigBase(ScriptRunnerStep):
    213 
    214     BaseArgs = ['s3n://us-east-1.elasticmapreduce/libs/pig/pig-script',
    215                 '--base-path', 's3n://us-east-1.elasticmapreduce/libs/pig/']
    216 
    217 
    218 class InstallPigStep(PigBase):
    219     """
    220     Install pig on emr step
    221     """
    222 
    223     InstallPigName = 'Install Pig'
    224 
    225     def __init__(self, pig_versions='latest'):
    226         step_args = []
    227         step_args.extend(self.BaseArgs)
    228         step_args.extend(['--install-pig'])
    229         step_args.extend(['--pig-versions', pig_versions])
    230         super(InstallPigStep, self).__init__(self.InstallPigName, step_args=step_args)
    231 
    232 
    233 class PigStep(PigBase):
    234     """
    235     Pig script step
    236     """
    237 
    238     def __init__(self, name, pig_file, pig_versions='latest', pig_args=[]):
    239         step_args = []
    240         step_args.extend(self.BaseArgs)
    241         step_args.extend(['--pig-versions', pig_versions])
    242         step_args.extend(['--run-pig-script', '--args', '-f', pig_file])
    243         step_args.extend(pig_args)
    244         super(PigStep, self).__init__(name, step_args=step_args)
    245 
    246 
    247 class HiveBase(ScriptRunnerStep):
    248 
    249     BaseArgs = ['s3n://us-east-1.elasticmapreduce/libs/hive/hive-script',
    250                 '--base-path', 's3n://us-east-1.elasticmapreduce/libs/hive/']
    251 
    252 
    253 class InstallHiveStep(HiveBase):
    254     """
    255     Install Hive on EMR step
    256     """
    257     InstallHiveName = 'Install Hive'
    258 
    259     def __init__(self, hive_versions='latest', hive_site=None):
    260         step_args = []
    261         step_args.extend(self.BaseArgs)
    262         step_args.extend(['--install-hive'])
    263         step_args.extend(['--hive-versions', hive_versions])
    264         if hive_site is not None:
    265             step_args.extend(['--hive-site=%s' % hive_site])
    266         super(InstallHiveStep, self).__init__(self.InstallHiveName,
    267                                   step_args=step_args)
    268 
    269 
    270 class HiveStep(HiveBase):
    271     """
    272     Hive script step
    273     """
    274 
    275     def __init__(self, name, hive_file, hive_versions='latest',
    276                  hive_args=None):
    277         step_args = []
    278         step_args.extend(self.BaseArgs)
    279         step_args.extend(['--hive-versions', hive_versions])
    280         step_args.extend(['--run-hive-script', '--args', '-f', hive_file])
    281         if hive_args is not None:
    282             step_args.extend(hive_args)
    283         super(HiveStep, self).__init__(name, step_args=step_args)
    284