Home | History | Annotate | Download | only in mapreduce
      1 #!/usr/bin/env python
      2 """Map job execution context."""
      3 
      4 import logging
      5 
      6 # pylint: disable=invalid-name
      7 
      8 
      9 class JobContext(object):
     10   """Context for map job."""
     11 
     12   def __init__(self, job_config):
     13     """Init.
     14 
     15     Read only properties:
     16       job_config: map_job.JobConfig for the job.
     17 
     18     Args:
     19       job_config: map_job.JobConfig.
     20     """
     21     self.job_config = job_config
     22 
     23 
     24 class ShardContext(object):
     25   """Context for a shard."""
     26 
     27   def __init__(self, job_context, shard_state):
     28     """Init.
     29 
     30     The signature of __init__ is subject to change.
     31 
     32     Read only properties:
     33       job_context: JobContext object.
     34       id: str. of format job_id-shard_number.
     35       number: int. shard number. 0 indexed.
     36       attempt: int. The current attempt at executing this shard.
     37         Starting at 1.
     38 
     39     Args:
     40       job_context: map_job.JobConfig.
     41       shard_state: model.ShardState.
     42     """
     43     self.job_context = job_context
     44     self.id = shard_state.shard_id
     45     self.number = shard_state.shard_number
     46     self.attempt = shard_state.retries + 1
     47     self._state = shard_state
     48 
     49   # TODO(user): standardize and document what format counter_name should take.
     50   def incr(self, counter_name, delta=1):
     51     """Changes counter by delta.
     52 
     53     Args:
     54       counter_name: the name of the counter to change. str.
     55       delta: int.
     56     """
     57     self._state.counters_map.increment(counter_name, delta)
     58 
     59   def counter(self, counter_name, default=0):
     60     """Get the current counter value.
     61 
     62     Args:
     63       counter_name: name of the counter in string.
     64       default: default value in int if one doesn't exist.
     65 
     66     Returns:
     67       Current value of the counter.
     68     """
     69     return self._state.counters_map.get(counter_name, default)
     70 
     71 
     72 class SliceContext(object):
     73   """Context for map job."""
     74 
     75   def __init__(self, shard_context, shard_state, tstate):
     76     """Init.
     77 
     78     The signature of __init__ is subject to change.
     79 
     80     Read only properties:
     81       job_context: JobContext object.
     82       shard_context: ShardContext object.
     83       number: int. slice number. 0 indexed.
     84       attempt: int. The current attempt at executing this slice.
     85         starting at 1.
     86 
     87     Args:
     88       shard_context: map_job.JobConfig.
     89       shard_state: model.ShardState.
     90       tstate: model.TransientShardstate.
     91     """
     92     self._tstate = tstate
     93     self.job_context = shard_context.job_context
     94     self.shard_context = shard_context
     95     self.number = shard_state.slice_id
     96     self.attempt = shard_state.slice_retries + 1
     97 
     98   def incr(self, counter_name, delta=1):
     99     """See shard_context.count."""
    100     self.shard_context.incr(counter_name, delta)
    101 
    102   def counter(self, counter_name, default=0):
    103     """See shard_context.count."""
    104     return self.shard_context.counter(counter_name, default)
    105 
    106   def emit(self, value):
    107     """Emits a value to output writer.
    108 
    109     Args:
    110       value: a value of type expected by the output writer.
    111     """
    112     if not self._tstate.output_writer:
    113       logging.error("emit is called, but no output writer is set.")
    114       return
    115     self._tstate.output_writer.write(value)
    116