Home | History | Annotate | Download | only in pipeline
      1 #!/usr/bin/env python
      2 #
      3 # Copyright 2010 Google Inc.
      4 #
      5 # Licensed under the Apache License, Version 2.0 (the "License");
      6 # you may not use this file except in compliance with the License.
      7 # You may obtain a copy of the License at
      8 #
      9 #     http://www.apache.org/licenses/LICENSE-2.0
     10 #
     11 # Unless required by applicable law or agreed to in writing, software
     12 # distributed under the License is distributed on an "AS IS" BASIS,
     13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     14 # See the License for the specific language governing permissions and
     15 # limitations under the License.
     16 
     17 """Google App Engine Pipeline API for complex, asynchronous workflows."""
     18 
     19 __all__ = [
     20     # Public API.
     21     'Error', 'PipelineSetupError', 'PipelineExistsError',
     22     'PipelineRuntimeError', 'SlotNotFilledError', 'SlotNotDeclaredError',
     23     'UnexpectedPipelineError', 'PipelineStatusError', 'Slot', 'Pipeline',
     24     'PipelineFuture', 'After', 'InOrder', 'Retry', 'Abort', 'get_status_tree',
     25     'get_pipeline_names', 'get_root_list', 'create_handlers_map',
     26     'set_enforce_auth',
     27 ]
     28 
     29 import datetime
     30 import hashlib
     31 import itertools
     32 import logging
     33 import os
     34 import posixpath
     35 import pprint
     36 import re
     37 import sys
     38 import threading
     39 import time
     40 import urllib
     41 import uuid
     42 
     43 from google.appengine.api import mail
     44 from google.appengine.api import app_identity
     45 from google.appengine.api import users
     46 from google.appengine.api import taskqueue
     47 from google.appengine.ext import blobstore
     48 from google.appengine.ext import db
     49 from google.appengine.ext import webapp
     50 
     51 # pylint: disable=g-import-not-at-top
     52 # TODO(user): Cleanup imports if/when cloudstorage becomes part of runtime.
     53 try:
     54   # Check if the full cloudstorage package exists. The stub part is in runtime.
     55   import cloudstorage
     56   if hasattr(cloudstorage, "_STUB"):
     57     cloudstorage = None
     58 except ImportError:
     59   pass  # CloudStorage library not available
     60 
     61 try:
     62   import json
     63 except ImportError:
     64   import simplejson as json
     65 
     66 # Relative imports
     67 import models
     68 import status_ui
     69 import util as mr_util
     70 
     71 # pylint: disable=g-bad-name
     72 # pylint: disable=protected-access
     73 
     74 # For convenience
     75 _BarrierIndex = models._BarrierIndex
     76 _BarrierRecord = models._BarrierRecord
     77 _PipelineRecord = models._PipelineRecord
     78 _SlotRecord = models._SlotRecord
     79 _StatusRecord = models._StatusRecord
     80 
     81 
     82 # Overall TODOs:
     83 # - Add a human readable name for start()
     84 
     85 # Potential TODOs:
     86 # - Add support for ANY N barriers.
     87 # - Allow Pipelines to declare they are "short" and optimize the evaluate()
     88 #   function to run as many of them in quick succession.
     89 # - Add support in all Pipelines for hold/release where up-stream
     90 #   barriers will fire but do nothing because the Pipeline is not ready.
     91 
     92 ################################################################################
     93 
     94 
     95 class Error(Exception):
     96   """Base class for exceptions in this module."""
     97 
     98 
     99 class PipelineSetupError(Error):
    100   """Base class for exceptions that happen before Pipeline execution."""
    101 
    102 
    103 class PipelineExistsError(PipelineSetupError):
    104   """A new Pipeline with an assigned idempotence_key cannot be overwritten."""
    105 
    106 
    107 class PipelineRuntimeError(Error):
    108   """Base class for exceptions that happen during Pipeline execution."""
    109 
    110 
    111 class SlotNotFilledError(PipelineRuntimeError):
    112   """A slot that should have been filled already was not yet filled."""
    113 
    114 
    115 class SlotNotDeclaredError(PipelineRuntimeError):
    116   """A slot that was filled or passed along was not previously declared."""
    117 
    118 
    119 class UnexpectedPipelineError(PipelineRuntimeError):
    120   """An assertion failed, potentially leaving the pipeline unable to proceed."""
    121 
    122 
    123 class PipelineUserError(Error):
    124   """Exceptions raised indirectly by developers to cause certain behaviors."""
    125 
    126 
    127 class Retry(PipelineUserError):
    128   """The currently running pipeline should be retried at a later time."""
    129 
    130 
    131 class Abort(PipelineUserError):
    132   """The currently running pipeline should be aborted up to the root."""
    133 
    134 
    135 class PipelineStatusError(Error):
    136   """Exceptions raised when trying to collect pipeline status."""
    137 
    138 
    139 class _CallbackTaskError(Error):
    140   """A callback task was unable to execute properly for some reason."""
    141 
    142 
    143 ################################################################################
    144 
    145 _MAX_BARRIERS_TO_NOTIFY = 10
    146 
    147 _MAX_ABORTS_TO_BEGIN = 10
    148 
    149 _TEST_MODE = False
    150 
    151 _TEST_ROOT_PIPELINE_KEY = None
    152 
    153 _DEFAULT_BACKOFF_SECONDS = 15
    154 
    155 _DEFAULT_BACKOFF_FACTOR = 2
    156 
    157 _DEFAULT_MAX_ATTEMPTS = 3
    158 
    159 _RETRY_WIGGLE_TIMEDELTA = datetime.timedelta(seconds=20)
    160 
    161 _DEBUG = False
    162 
    163 _MAX_JSON_SIZE = 900000
    164 
    165 _ENFORCE_AUTH = True
    166 
    167 _MAX_CALLBACK_TASK_RETRIES = 5
    168 
    169 ################################################################################
    170 
    171 
    172 class Slot(object):
    173   """An output that is filled by a Pipeline as it executes."""
    174 
    175   def __init__(self, name=None, slot_key=None, strict=False):
    176     """Initializer.
    177 
    178     Args:
    179       name: The name of this slot.
    180       slot_key: The db.Key for this slot's _SlotRecord if it's already been
    181         allocated by an up-stream pipeline.
    182       strict: If this Slot was created as an output of a strictly defined
    183         pipeline.
    184     """
    185     if name is None:
    186       raise UnexpectedPipelineError('Slot with key "%s" missing a name.' %
    187                                     slot_key)
    188     if slot_key is None:
    189       slot_key = db.Key.from_path(_SlotRecord.kind(), uuid.uuid4().hex)
    190       self._exists = _TEST_MODE
    191     else:
    192       self._exists = True
    193     self._touched = False
    194     self._strict = strict
    195     self.name = name
    196     self.key = slot_key
    197     self.filled = False
    198     self._filler_pipeline_key = None
    199     self._fill_datetime = None
    200     self._value = None
    201 
    202   @property
    203   def value(self):
    204     """Returns the current value of this slot.
    205 
    206     Returns:
    207       The value of the slot (a serializable Python type).
    208 
    209     Raises:
    210       SlotNotFilledError if the value hasn't been filled yet.
    211     """
    212     if not self.filled:
    213       raise SlotNotFilledError('Slot with name "%s", key "%s" not yet filled.'
    214                                % (self.name, self.key))
    215     return self._value
    216 
    217   @property
    218   def filler(self):
    219     """Returns the pipeline ID that filled this slot's value.
    220 
    221     Returns:
    222       A string that is the pipeline ID.
    223 
    224     Raises:
    225       SlotNotFilledError if the value hasn't been filled yet.
    226     """
    227     if not self.filled:
    228       raise SlotNotFilledError('Slot with name "%s", key "%s" not yet filled.'
    229                                % (self.name, self.key))
    230     return self._filler_pipeline_key.name()
    231 
    232   @property
    233   def fill_datetime(self):
    234     """Returns when the slot was filled.
    235 
    236     Returns:
    237       A datetime.datetime.
    238 
    239     Raises:
    240       SlotNotFilledError if the value hasn't been filled yet.
    241     """
    242     if not self.filled:
    243       raise SlotNotFilledError('Slot with name "%s", key "%s" not yet filled.'
    244                                % (self.name, self.key))
    245     return self._fill_datetime
    246 
    247   def _set_value(self, slot_record):
    248     """Sets the value of this slot based on its corresponding _SlotRecord.
    249 
    250     Does nothing if the slot has not yet been filled.
    251 
    252     Args:
    253       slot_record: The _SlotRecord containing this Slot's value.
    254     """
    255     if slot_record.status == _SlotRecord.FILLED:
    256       self.filled = True
    257       self._filler_pipeline_key = _SlotRecord.filler.get_value_for_datastore(
    258           slot_record)
    259       self._fill_datetime = slot_record.fill_time
    260       self._value = slot_record.value
    261 
    262   def _set_value_test(self, filler_pipeline_key, value):
    263     """Sets the value of this slot for use in testing.
    264 
    265     Args:
    266       filler_pipeline_key: The db.Key of the _PipelineRecord that filled
    267         this slot.
    268       value: The serializable value set for this slot.
    269     """
    270     self.filled = True
    271     self._filler_pipeline_key = filler_pipeline_key
    272     self._fill_datetime = datetime.datetime.utcnow()
    273     # Convert to JSON and back again, to simulate the behavior of production.
    274     self._value = json.loads(json.dumps(
    275         value, cls=mr_util.JsonEncoder), cls=mr_util.JsonDecoder)
    276 
    277   def __repr__(self):
    278     """Returns a string representation of this slot."""
    279     if self.filled:
    280       return repr(self._value)
    281     else:
    282       return 'Slot(name="%s", slot_key="%s")' % (self.name, self.key)
    283 
    284 
    285 class PipelineFuture(object):
    286   """A future for accessing the outputs of a Pipeline."""
    287 
    288   # NOTE: Do not, ever, add a names() method to this class. Callers cannot do
    289   # introspection on their context of being called. Even though the runtime
    290   # environment of the Pipeline can allow for that to happen, such behavior
    291   # would prevent synchronous simulation and verification, whic is an
    292   # unacceptable tradeoff.
    293 
    294   def __init__(self, output_names, force_strict=False):
    295     """Initializer.
    296 
    297     Args:
    298       output_names: The list of require output names that will be strictly
    299         enforced by this class.
    300       force_strict: If True, force this future to be in strict mode.
    301     """
    302     self._after_all_pipelines = set()
    303     self._output_dict = {
    304       'default': Slot(name='default'),
    305     }
    306 
    307     self._strict = len(output_names) > 0 or force_strict
    308     if self._strict:
    309       for name in output_names:
    310         if name in self._output_dict:
    311           raise UnexpectedPipelineError('Output name reserved: "%s"' % name)
    312         self._output_dict[name] = Slot(name=name, strict=True)
    313 
    314   def _inherit_outputs(self,
    315                        pipeline_name,
    316                        already_defined,
    317                        resolve_outputs=False):
    318     """Inherits outputs from a calling Pipeline.
    319 
    320     Args:
    321       pipeline_name: The Pipeline class name (used for debugging).
    322       already_defined: Maps output name to stringified db.Key (of _SlotRecords)
    323         of any exiting output slots to be inherited by this future.
    324       resolve_outputs: When True, this method will dereference all output slots
    325         before returning back to the caller, making those output slots' values
    326         available.
    327 
    328     Raises:
    329       UnexpectedPipelineError when resolve_outputs is True and any of the output
    330       slots could not be retrived from the Datastore.
    331     """
    332     for name, slot_key in already_defined.iteritems():
    333       if not isinstance(slot_key, db.Key):
    334         slot_key = db.Key(slot_key)
    335 
    336       slot = self._output_dict.get(name)
    337       if slot is None:
    338         if self._strict:
    339           raise UnexpectedPipelineError(
    340               'Inherited output named "%s" must be filled but '
    341               'not declared for pipeline class "%s"' % (name, pipeline_name))
    342         else:
    343           self._output_dict[name] = Slot(name=name, slot_key=slot_key)
    344       else:
    345         slot.key = slot_key
    346         slot._exists = True
    347 
    348     if resolve_outputs:
    349       slot_key_dict = dict((s.key, s) for s in self._output_dict.itervalues())
    350       all_slots = db.get(slot_key_dict.keys())
    351       for slot, slot_record in zip(slot_key_dict.itervalues(), all_slots):
    352         if slot_record is None:
    353           raise UnexpectedPipelineError(
    354               'Inherited output named "%s" for pipeline class "%s" is '
    355               'missing its Slot in the datastore: "%s"' %
    356               (slot.name, pipeline_name, slot.key))
    357         slot = slot_key_dict[slot_record.key()]
    358         slot._set_value(slot_record)
    359 
    360   def __getattr__(self, name):
    361     """Provides an output Slot instance with the given name if allowed."""
    362     if name not in self._output_dict:
    363       if self._strict:
    364         raise SlotNotDeclaredError('Undeclared output with name "%s"' % name)
    365       self._output_dict[name] = Slot(name=name)
    366     slot = self._output_dict[name]
    367     return slot
    368 
    369 
    370 class _PipelineMeta(type):
    371   """Meta-class for recording all Pipelines that have been defined."""
    372 
    373   # List of all Pipeline classes that have been seen.
    374   _all_classes = []
    375 
    376   def __new__(meta, name, bases, cls_dict):
    377     """Initializes the class path of a Pipeline and saves it."""
    378     cls = type.__new__(meta, name, bases, cls_dict)
    379     meta._all_classes.append(cls)
    380     return cls
    381 
    382 
    383 class ClassProperty(object):
    384   """Descriptor that lets us have read-only class properties."""
    385 
    386   def __init__(self, method):
    387     self.method = method
    388 
    389   def __get__(self, cls, obj):
    390     return self.method(obj)
    391 
    392 
    393 class Pipeline(object):
    394   """A Pipeline function-object that performs operations and has a life cycle.
    395 
    396   Class properties (to be overridden by sub-classes):
    397     async: When True, this Pipeline will execute asynchronously and fill the
    398       default output slot itself using the complete() method.
    399     output_names: List of named outputs (in addition to the default slot) that
    400       this Pipeline must output to (no more, no less).
    401     public_callbacks: If the callback URLs generated for this class should be
    402       accessible by all external requests regardless of login or task queue.
    403     admin_callbacks: If the callback URLs generated for this class should be
    404       accessible by the task queue ane externally by users logged in as admins.
    405     class_path: String identifier for this Pipeline, which is derived from
    406       its path in the global system modules dictionary.
    407 
    408   Modifiable instance properties:
    409     backoff_seconds: How many seconds to use as the constant factor in
    410       exponential backoff; may be changed by the user.
    411     backoff_factor: Base factor to use for exponential backoff. The formula
    412       followed is (backoff_seconds * backoff_factor^current_attempt).
    413     max_attempts: Maximum number of retry attempts to make before failing
    414       completely and aborting the entire pipeline up to the root.
    415     target: The application version to use for processing this Pipeline. This
    416       can be set to the name of a backend to direct Pipelines to run there.
    417 
    418   Instance properties:
    419     pipeline_id: The ID of this pipeline.
    420     root_pipeline_id: The ID of the root of this pipeline.
    421     queue_name: The queue this pipeline runs on or None if unknown.
    422     current_attempt: The current attempt being tried for this pipeline.
    423   """
    424 
    425   __metaclass__ = _PipelineMeta
    426 
    427   # To be set by sub-classes
    428   async = False
    429   output_names = []
    430   public_callbacks = False
    431   admin_callbacks = False
    432 
    433   # Internal only.
    434   _class_path = None  # Set for each class
    435   _send_mail = mail.send_mail_to_admins  # For testing
    436 
    437   # callback_xg_transaction: Determines whether callbacks are processed within
    438   # a single entity-group transaction (False), a cross-entity-group
    439   # transaction (True), or no transaction (None, default). It is generally
    440   # unsafe for a callback to modify pipeline state outside of a transaction, in
    441   # particular any pre-initialized state from the pipeline record, such as the
    442   # outputs. If a transaction is used, the callback method must operate within
    443   # the datastore's transaction time limits.
    444   # TODO(user): Make non-internal once other API calls are considered for
    445   # transaction support.
    446   _callback_xg_transaction = None
    447 
    448   def __init__(self, *args, **kwargs):
    449     """Initializer.
    450 
    451     Args:
    452       *args: The positional arguments for this function-object.
    453       **kwargs: The keyword arguments for this function-object.
    454     """
    455     self.args = args
    456     self.kwargs = kwargs
    457     self.outputs = None
    458     self.backoff_seconds = _DEFAULT_BACKOFF_SECONDS
    459     self.backoff_factor = _DEFAULT_BACKOFF_FACTOR
    460     self.max_attempts = _DEFAULT_MAX_ATTEMPTS
    461     self.target = None
    462     self.task_retry = False
    463     self._current_attempt = 0
    464     self._root_pipeline_key = None
    465     self._pipeline_key = None
    466     self._context = None
    467     self._result_status = None
    468     self._set_class_path()
    469     # Introspectively set the target so pipelines stick to the version it
    470     # started.
    471     self.target = mr_util._get_task_target()
    472 
    473     if _TEST_MODE:
    474       self._context = _PipelineContext('', 'default', '')
    475       self._root_pipeline_key = _TEST_ROOT_PIPELINE_KEY
    476       self._pipeline_key = db.Key.from_path(
    477           _PipelineRecord.kind(), uuid.uuid4().hex)
    478       self.outputs = PipelineFuture(self.output_names)
    479       self._context.evaluate_test(self)
    480 
    481   @property
    482   def pipeline_id(self):
    483     """Returns the ID of this Pipeline as a string or None if unknown."""
    484     if self._pipeline_key is None:
    485       return None
    486     return self._pipeline_key.name()
    487 
    488   @property
    489   def root_pipeline_id(self):
    490     """Returns root pipeline ID as a websafe string or None if unknown."""
    491     if self._root_pipeline_key is None:
    492       return None
    493     return self._root_pipeline_key.name()
    494 
    495   @property
    496   def is_root(self):
    497     """Returns True if this pipeline is a root pipeline, False otherwise."""
    498     return self._root_pipeline_key == self._pipeline_key
    499 
    500   @property
    501   def queue_name(self):
    502     """Returns the queue name this Pipeline runs on or None if unknown."""
    503     if self._context:
    504       return self._context.queue_name
    505     return None
    506 
    507   @property
    508   def base_path(self):
    509     """Returns the base path for Pipeline URL handlers or None if unknown."""
    510     if self._context:
    511       return self._context.base_path
    512     return None
    513 
    514   @property
    515   def has_finalized(self):
    516     """Returns True if this pipeline has completed and finalized."""
    517     return self._result_status == _PipelineRecord.DONE
    518 
    519   @property
    520   def was_aborted(self):
    521     """Returns True if this pipeline was aborted."""
    522     return self._result_status == _PipelineRecord.ABORTED
    523 
    524   @property
    525   def current_attempt(self):
    526     """Returns the current attempt at running this pipeline, starting at 1."""
    527     return self._current_attempt + 1
    528 
    529   @property
    530   def test_mode(self):
    531     """Returns True if the pipeline is running in test mode."""
    532     return _TEST_MODE
    533 
    534   @ClassProperty
    535   def class_path(cls):
    536     """Returns the unique string identifier for this Pipeline class.
    537 
    538     Refers to how to find the Pipeline in the global modules dictionary.
    539     """
    540     cls._set_class_path()
    541     return cls._class_path
    542 
    543   @classmethod
    544   def from_id(cls, pipeline_id, resolve_outputs=True, _pipeline_record=None):
    545     """Returns an instance corresponding to an existing Pipeline.
    546 
    547     The returned object will have the same properties a Pipeline does while
    548     it's running synchronously (e.g., like what it's first allocated), allowing
    549     callers to inspect caller arguments, outputs, fill slots, complete the
    550     pipeline, abort, retry, etc.
    551 
    552     Args:
    553       pipeline_id: The ID of this pipeline (a string).
    554       resolve_outputs: When True, dereference the outputs of this Pipeline
    555         so their values can be accessed by the caller.
    556       _pipeline_record: Internal-only. The _PipelineRecord instance to use
    557         to instantiate this instance instead of fetching it from
    558         the datastore.
    559 
    560     Returns:
    561       Pipeline sub-class instances or None if it could not be found.
    562     """
    563     pipeline_record = _pipeline_record
    564 
    565     # Support pipeline IDs and idempotence_keys that are not unicode.
    566     if not isinstance(pipeline_id, unicode):
    567       try:
    568         pipeline_id = pipeline_id.encode('utf-8')
    569       except UnicodeDecodeError:
    570         pipeline_id = hashlib.sha1(pipeline_id).hexdigest()
    571 
    572     pipeline_key = db.Key.from_path(_PipelineRecord.kind(), pipeline_id)
    573 
    574     if pipeline_record is None:
    575       pipeline_record = db.get(pipeline_key)
    576     if pipeline_record is None:
    577       return None
    578 
    579     try:
    580       pipeline_func_class = mr_util.for_name(pipeline_record.class_path)
    581     except ImportError, e:
    582       logging.warning('Tried to find Pipeline %s#%s, but class could '
    583                       'not be found. Using default Pipeline class instead.',
    584                       pipeline_record.class_path, pipeline_id)
    585       pipeline_func_class = cls
    586 
    587     params = pipeline_record.params
    588     arg_list, kwarg_dict = _dereference_args(
    589         pipeline_record.class_path, params['args'], params['kwargs'])
    590     outputs = PipelineFuture(pipeline_func_class.output_names)
    591     outputs._inherit_outputs(
    592         pipeline_record.class_path,
    593         params['output_slots'],
    594         resolve_outputs=resolve_outputs)
    595 
    596     stage = pipeline_func_class(*arg_list, **kwarg_dict)
    597     stage.backoff_seconds = params['backoff_seconds']
    598     stage.backoff_factor = params['backoff_factor']
    599     stage.max_attempts = params['max_attempts']
    600     stage.task_retry = params['task_retry']
    601     stage.target = params.get('target')  # May not be defined for old Pipelines
    602     stage._current_attempt = pipeline_record.current_attempt
    603     stage._set_values_internal(
    604         _PipelineContext('', params['queue_name'], params['base_path']),
    605         pipeline_key,
    606         _PipelineRecord.root_pipeline.get_value_for_datastore(pipeline_record),
    607         outputs,
    608         pipeline_record.status)
    609     return stage
    610 
    611   # Methods that can be invoked on a Pipeline instance by anyone with a
    612   # valid object (e.g., directly instantiated, retrieve via from_id).
    613   def start(self,
    614             idempotence_key='',
    615             queue_name='default',
    616             base_path='/_ah/pipeline',
    617             return_task=False,
    618             countdown=None,
    619             eta=None):
    620     """Starts a new instance of this pipeline.
    621 
    622     Args:
    623       idempotence_key: The ID to use for this Pipeline and throughout its
    624         asynchronous workflow to ensure the operations are idempotent. If
    625         empty a starting key will be automatically assigned.
    626       queue_name: What queue this Pipeline's workflow should execute on.
    627       base_path: The relative URL path to where the Pipeline API is
    628         mounted for access by the taskqueue API or external requests.
    629       return_task: When True, a task to start this pipeline will be returned
    630         instead of submitted, allowing the caller to start off this pipeline
    631         as part of a separate transaction (potentially leaving this newly
    632         allocated pipeline's datastore entities in place if that separate
    633         transaction fails for any reason).
    634       countdown: Time in seconds into the future that this Task should execute.
    635         Defaults to zero.
    636       eta: A datetime.datetime specifying the absolute time at which the task
    637         should be executed. Must not be specified if 'countdown' is specified.
    638         This may be timezone-aware or timezone-naive. If None, defaults to now.
    639         For pull tasks, no worker will be able to lease this task before the
    640         time indicated by eta.
    641 
    642     Returns:
    643       A taskqueue.Task instance if return_task was True. This task will *not*
    644       have a name, thus to ensure reliable execution of your pipeline you
    645       should add() this task as part of a separate Datastore transaction.
    646 
    647     Raises:
    648       PipelineExistsError if the pipeline with the given idempotence key exists.
    649       PipelineSetupError if the pipeline could not start for any other reason.
    650     """
    651     if not idempotence_key:
    652       idempotence_key = uuid.uuid4().hex
    653     elif not isinstance(idempotence_key, unicode):
    654       try:
    655         idempotence_key.encode('utf-8')
    656       except UnicodeDecodeError:
    657         idempotence_key = hashlib.sha1(idempotence_key).hexdigest()
    658 
    659     pipeline_key = db.Key.from_path(_PipelineRecord.kind(), idempotence_key)
    660     context = _PipelineContext('', queue_name, base_path)
    661     future = PipelineFuture(self.output_names, force_strict=True)
    662     try:
    663       self._set_values_internal(
    664           context, pipeline_key, pipeline_key, future, _PipelineRecord.WAITING)
    665       return context.start(
    666           self, return_task=return_task, countdown=countdown, eta=eta)
    667     except Error:
    668       # Pass through exceptions that originate in this module.
    669       raise
    670     except Exception, e:
    671       # Re-type any exceptions that were raised in dependent methods.
    672       raise PipelineSetupError('Error starting %s#%s: %s' % (
    673           self, idempotence_key, str(e)))
    674 
    675   def start_test(self, idempotence_key=None, base_path='', **kwargs):
    676     """Starts this pipeline in test fashion.
    677 
    678     Args:
    679       idempotence_key: Dummy idempotence_key to use for this root pipeline.
    680       base_path: Dummy base URL path to use for this root pipeline.
    681       kwargs: Ignored keyword arguments usually passed to start().
    682     """
    683     if not idempotence_key:
    684       idempotence_key = uuid.uuid4().hex
    685     pipeline_key = db.Key.from_path(_PipelineRecord.kind(), idempotence_key)
    686     context = _PipelineContext('', 'default', base_path)
    687     future = PipelineFuture(self.output_names, force_strict=True)
    688     self._set_values_internal(
    689         context, pipeline_key, pipeline_key, future, _PipelineRecord.WAITING)
    690     context.start_test(self)
    691 
    692   # Pipeline control methods.
    693   def retry(self, retry_message=''):
    694     """Forces a currently running asynchronous pipeline to retry.
    695 
    696     Note this may not be called by synchronous or generator pipelines. Those
    697     must instead raise the 'Retry' exception during execution.
    698 
    699     Args:
    700       retry_message: Optional message explaining why the retry happened.
    701 
    702     Returns:
    703       True if the Pipeline should be retried, False if it cannot be cancelled
    704       mid-flight for some reason.
    705     """
    706     if not self.async:
    707       raise UnexpectedPipelineError(
    708           'May only call retry() method for asynchronous pipelines.')
    709     if self.try_cancel():
    710       self._context.transition_retry(self._pipeline_key, retry_message)
    711       return True
    712     else:
    713       return False
    714 
    715   def abort(self, abort_message=''):
    716     """Mark the entire pipeline up to the root as aborted.
    717 
    718     Note this should only be called from *outside* the context of a running
    719     pipeline. Synchronous and generator pipelines should raise the 'Abort'
    720     exception to cause this behavior during execution.
    721 
    722     Args:
    723       abort_message: Optional message explaining why the abort happened.
    724 
    725     Returns:
    726       True if the abort signal was sent successfully; False if the pipeline
    727       could not be aborted for any reason.
    728     """
    729     # TODO: Use thread-local variable to enforce that this is not called
    730     # while a pipeline is executing in the current thread.
    731     if (self.async and self._root_pipeline_key == self._pipeline_key and
    732         not self.try_cancel()):
    733       # Handle the special case where the root pipeline is async and thus
    734       # cannot be aborted outright.
    735       return False
    736     else:
    737       return self._context.begin_abort(
    738           self._root_pipeline_key, abort_message=abort_message)
    739 
    740   # Methods used by the Pipeline as it runs.
    741   def fill(self, name_or_slot, value):
    742     """Fills an output slot required by this Pipeline.
    743 
    744     Args:
    745       name_or_slot: The name of the slot (a string) or Slot record to fill.
    746       value: The serializable value to assign to this slot.
    747 
    748     Raises:
    749       UnexpectedPipelineError if the Slot no longer exists. SlotNotDeclaredError
    750       if trying to output to a slot that was not declared ahead of time.
    751     """
    752     if isinstance(name_or_slot, basestring):
    753       slot = getattr(self.outputs, name_or_slot)
    754     elif isinstance(name_or_slot, Slot):
    755       slot = name_or_slot
    756     else:
    757       raise UnexpectedPipelineError(
    758           'Could not fill invalid output name: %r' % name_or_slot)
    759 
    760     if not slot._exists:
    761       raise SlotNotDeclaredError(
    762           'Cannot fill output with name "%s" that was just '
    763           'declared within the Pipeline context.' % slot.name)
    764 
    765     self._context.fill_slot(self._pipeline_key, slot, value)
    766 
    767   def set_status(self, message=None, console_url=None, status_links=None):
    768     """Sets the current status of this pipeline.
    769 
    770     This method is purposefully non-transactional. Updates are written to the
    771     datastore immediately and overwrite all existing statuses.
    772 
    773     Args:
    774       message: (optional) Overall status message.
    775       console_url: (optional) Relative URL to use for the "console" of this
    776         pipeline that displays current progress. When None, no console will
    777         be displayed.
    778       status_links: (optional) Dictionary of readable link names to relative
    779         URLs that should be associated with this pipeline as it runs. These links
    780         provide convenient access to other dashboards, consoles, etc associated
    781         with the pipeline.
    782 
    783     Raises:
    784       PipelineRuntimeError if the status could not be set for any reason.
    785     """
    786     if _TEST_MODE:
    787       logging.info(
    788           'New status for %s#%s: message=%r, console_url=%r, status_links=%r',
    789           self, self.pipeline_id, message, console_url, status_links)
    790       return
    791 
    792     status_key = db.Key.from_path(_StatusRecord.kind(), self.pipeline_id)
    793     root_pipeline_key = db.Key.from_path(
    794         _PipelineRecord.kind(), self.root_pipeline_id)
    795     status_record = _StatusRecord(
    796         key=status_key, root_pipeline=root_pipeline_key)
    797 
    798     try:
    799       if message:
    800         status_record.message = message
    801       if console_url:
    802         status_record.console_url = console_url
    803       if status_links:
    804         # Alphabeticalize the list.
    805         status_record.link_names = sorted(
    806             db.Text(s) for s in status_links.iterkeys())
    807         status_record.link_urls = [
    808             db.Text(status_links[name]) for name in status_record.link_names]
    809 
    810       status_record.status_time = datetime.datetime.utcnow()
    811 
    812       status_record.put()
    813     except Exception, e:
    814       raise PipelineRuntimeError('Could not set status for %s#%s: %s' %
    815           (self, self.pipeline_id, str(e)))
    816 
    817   def complete(self, default_output=None):
    818     """Marks this asynchronous Pipeline as complete.
    819 
    820     Args:
    821       default_output: What value the 'default' output slot should be assigned.
    822 
    823     Raises:
    824       UnexpectedPipelineError if the slot no longer exists or this method was
    825       called for a pipeline that is not async.
    826     """
    827     # TODO: Enforce that all outputs expected by this async pipeline were
    828     # filled before this complete() function was called. May required all
    829     # async functions to declare their outputs upfront.
    830     if not self.async:
    831       raise UnexpectedPipelineError(
    832           'May only call complete() method for asynchronous pipelines.')
    833     self._context.fill_slot(
    834         self._pipeline_key, self.outputs.default, default_output)
    835 
    836   def get_callback_url(self, **kwargs):
    837     """Returns a relative URL for invoking this Pipeline's callback method.
    838 
    839     Args:
    840       kwargs: Dictionary mapping keyword argument names to single values that
    841         should be passed to the callback when it is invoked.
    842 
    843     Raises:
    844       UnexpectedPipelineError if this is invoked on pipeline that is not async.
    845     """
    846     # TODO: Support positional parameters.
    847     if not self.async:
    848       raise UnexpectedPipelineError(
    849           'May only call get_callback_url() method for asynchronous pipelines.')
    850     kwargs['pipeline_id'] = self._pipeline_key.name()
    851     params = urllib.urlencode(sorted(kwargs.items()))
    852     return '%s/callback?%s' % (self.base_path, params)
    853 
    854   def get_callback_task(self, *args, **kwargs):
    855     """Returns a task for calling back this Pipeline.
    856 
    857     Args:
    858       params: Keyword argument containing a dictionary of key/value pairs
    859         that will be passed to the callback when it is executed.
    860       args, kwargs: Passed to the taskqueue.Task constructor. Use these
    861         arguments to set the task name (for idempotence), etc.
    862 
    863     Returns:
    864       A taskqueue.Task instance that must be enqueued by the caller.
    865     """
    866     if not self.async:
    867       raise UnexpectedPipelineError(
    868           'May only call get_callback_task() method for asynchronous pipelines.')
    869 
    870     params = kwargs.get('params', {})
    871     kwargs['params'] = params
    872     params['pipeline_id'] = self._pipeline_key.name()
    873     kwargs['url'] = self.base_path + '/callback'
    874     kwargs['method'] = 'POST'
    875     return taskqueue.Task(*args, **kwargs)
    876 
    877   def send_result_email(self, sender=None):
    878     """Sends an email to admins indicating this Pipeline has completed.
    879 
    880     For developer convenience. Automatically called from finalized for root
    881     Pipelines that do not override the default action.
    882 
    883     Args:
    884       sender: (optional) Override the sender's email address.
    885     """
    886     status = 'successful'
    887     if self.was_aborted:
    888       status = 'aborted'
    889 
    890     app_id = os.environ['APPLICATION_ID']
    891     shard_index = app_id.find('~')
    892     if shard_index != -1:
    893       app_id = app_id[shard_index+1:]
    894 
    895     param_dict = {
    896         'status': status,
    897         'app_id': app_id,
    898         'class_path': self._class_path,
    899         'pipeline_id': self.root_pipeline_id,
    900         'base_path': '%s.appspot.com%s' % (app_id, self.base_path),
    901     }
    902     subject = (
    903         'Pipeline %(status)s: App "%(app_id)s", %(class_path)s'
    904         '#%(pipeline_id)s' % param_dict)
    905     body = """View the pipeline results here:
    906 
    907 http://%(base_path)s/status?root=%(pipeline_id)s
    908 
    909 Thanks,
    910 
    911 The Pipeline API
    912 """ % param_dict
    913 
    914     html = """<html><body>
    915 <p>View the pipeline results here:</p>
    916 
    917 <p><a href="http://%(base_path)s/status?root=%(pipeline_id)s"
    918 >http://%(base_path)s/status?root=%(pipeline_id)s</a></p>
    919 
    920 <p>
    921 Thanks,
    922 <br>
    923 The Pipeline API
    924 </p>
    925 </body></html>
    926 """ % param_dict
    927 
    928     if sender is None:
    929       sender = '%s@%s.appspotmail.com' % (app_id, app_id)
    930     try:
    931       self._send_mail(sender, subject, body, html=html)
    932     except (mail.InvalidSenderError, mail.InvalidEmailError):
    933       logging.warning('Could not send result email for '
    934                       'root pipeline ID "%s" from sender "%s"',
    935                       self.root_pipeline_id, sender)
    936 
    937   def cleanup(self):
    938     """Clean up this Pipeline and all Datastore records used for coordination.
    939 
    940     Only works when called on a root pipeline. Child pipelines will ignore
    941     calls to this method.
    942 
    943     After this method is called, Pipeline.from_id() and related status
    944     methods will return inconsistent or missing results. This method is
    945     fire-and-forget and asynchronous.
    946     """
    947     if self._root_pipeline_key is None:
    948       raise UnexpectedPipelineError(
    949           'Could not cleanup Pipeline with unknown root pipeline ID.')
    950     if not self.is_root:
    951       return
    952     task = taskqueue.Task(
    953         params=dict(root_pipeline_key=self._root_pipeline_key),
    954         url=self.base_path + '/cleanup',
    955         headers={'X-Ae-Pipeline-Key': self._root_pipeline_key})
    956     taskqueue.Queue(self.queue_name).add(task)
    957 
    958   def with_params(self, **kwargs):
    959     """Modify various execution parameters of a Pipeline before it runs.
    960 
    961     This method has no effect in test mode.
    962 
    963     Args:
    964       kwargs: Attributes to modify on this Pipeline instance before it has
    965         been executed.
    966 
    967     Returns:
    968       This Pipeline instance, for easy chaining.
    969     """
    970     if _TEST_MODE:
    971       logging.info(
    972           'Setting runtime parameters for %s#%s: %r',
    973           self, self.pipeline_id, kwargs)
    974       return self
    975 
    976     if self.pipeline_id is not None:
    977       raise UnexpectedPipelineError(
    978           'May only call with_params() on a Pipeline that has not yet '
    979           'been scheduled for execution.')
    980 
    981     ALLOWED = ('backoff_seconds', 'backoff_factor', 'max_attempts', 'target')
    982     for name, value in kwargs.iteritems():
    983       if name not in ALLOWED:
    984         raise TypeError('Unexpected keyword: %s=%r' % (name, value))
    985       setattr(self, name, value)
    986     return self
    987 
    988   # Methods implemented by developers for lifecycle management. These
    989   # must be idempotent under all circumstances.
    990   def run(self, *args, **kwargs):
    991     """Runs this Pipeline."""
    992     raise NotImplementedError('Must implement "run" in Pipeline sub-class.')
    993 
    994   def run_test(self, *args, **kwargs):
    995     """Runs this Pipeline in test mode."""
    996     raise NotImplementedError(
    997         'Must implement "run_test" in Pipeline sub-class.')
    998 
    999   def finalized(self):
   1000     """Finalizes this Pipeline after execution if it's a generator.
   1001 
   1002     Default action as the root pipeline is to email the admins with the status.
   1003     Implementors be sure to call 'was_aborted' to find out if the finalization
   1004     that you're handling is for a success or error case.
   1005     """
   1006     if self.pipeline_id == self.root_pipeline_id:
   1007       self.send_result_email()
   1008 
   1009   def finalized_test(self, *args, **kwargs):
   1010     """Finalized this Pipeline in test mode."""
   1011     raise NotImplementedError(
   1012         'Must implement "finalized_test" in Pipeline sub-class.')
   1013 
   1014   def callback(self, **kwargs):
   1015     """This Pipeline received an asynchronous callback request."""
   1016     raise NotImplementedError(
   1017         'Must implement "callback" in Pipeline sub-class.')
   1018 
   1019   def try_cancel(self):
   1020     """This pipeline has been cancelled.
   1021 
   1022     Called when a pipeline is interrupted part-way through due to some kind
   1023     of failure (an abort of the whole pipeline to the root or a forced retry on
   1024     this child pipeline).
   1025 
   1026     Returns:
   1027       True to indicate that cancellation was successful and this pipeline may
   1028       go in the retry or aborted state; False to indicate that this pipeline
   1029       cannot be canceled right now and must remain as-is.
   1030     """
   1031     return False
   1032 
   1033   # Internal methods.
   1034   @classmethod
   1035   def _set_class_path(cls, module_dict=sys.modules):
   1036     """Sets the absolute path to this class as a string.
   1037 
   1038     Used by the Pipeline API to reconstruct the Pipeline sub-class object
   1039     at execution time instead of passing around a serialized function.
   1040 
   1041     Args:
   1042       module_dict: Used for testing.
   1043     """
   1044     # Do not traverse the class hierarchy fetching the class path attribute.
   1045     found = cls.__dict__.get('_class_path')
   1046     if found is not None:
   1047       return
   1048 
   1049     # Do not set the _class_path for the base-class, otherwise all children's
   1050     # lookups for _class_path will fall through and return 'Pipeline' above.
   1051     # This situation can happen if users call the generic Pipeline.from_id
   1052     # to get the result of a Pipeline without knowing its specific class.
   1053     if cls is Pipeline:
   1054       return
   1055 
   1056     class_path = '%s.%s' % (cls.__module__, cls.__name__)
   1057     # When a WSGI handler is invoked as an entry point, any Pipeline class
   1058     # defined in the same file as the handler will get __module__ set to
   1059     # __main__. Thus we need to find out its real fully qualified path.
   1060     if cls.__module__ == '__main__':
   1061       for name, module in module_dict.items():
   1062         if name == '__main__':
   1063           continue
   1064         found = getattr(module, cls.__name__, None)
   1065         if found is cls:
   1066           class_path = '%s.%s' % (name, cls.__name__)
   1067           break
   1068     cls._class_path = class_path
   1069 
   1070   def _set_values_internal(self,
   1071                            context,
   1072                            pipeline_key,
   1073                            root_pipeline_key,
   1074                            outputs,
   1075                            result_status):
   1076     """Sets the user-visible values provided as an API by this class.
   1077 
   1078     Args:
   1079       context: The _PipelineContext used for this Pipeline.
   1080       pipeline_key: The db.Key of this pipeline.
   1081       root_pipeline_key: The db.Key of the root pipeline.
   1082       outputs: The PipelineFuture for this pipeline.
   1083       result_status: The result status of this pipeline.
   1084     """
   1085     self._context = context
   1086     self._pipeline_key = pipeline_key
   1087     self._root_pipeline_key = root_pipeline_key
   1088     self._result_status = result_status
   1089     self.outputs = outputs
   1090 
   1091   def _callback_internal(self, kwargs):
   1092     """Used to execute callbacks on asynchronous pipelines."""
   1093     logging.debug('Callback %s(*%s, **%s)#%s with params: %r',
   1094                   self._class_path, _short_repr(self.args),
   1095                   _short_repr(self.kwargs), self._pipeline_key.name(), kwargs)
   1096     return self.callback(**kwargs)
   1097 
   1098   def _run_internal(self,
   1099                     context,
   1100                     pipeline_key,
   1101                     root_pipeline_key,
   1102                     caller_output):
   1103     """Used by the Pipeline evaluator to execute this Pipeline."""
   1104     self._set_values_internal(
   1105         context, pipeline_key, root_pipeline_key, caller_output,
   1106         _PipelineRecord.RUN)
   1107     logging.debug('Running %s(*%s, **%s)#%s',
   1108                   self._class_path, _short_repr(self.args),
   1109                   _short_repr(self.kwargs), self._pipeline_key.name())
   1110     return self.run(*self.args, **self.kwargs)
   1111 
   1112   def _finalized_internal(self,
   1113                           context,
   1114                           pipeline_key,
   1115                           root_pipeline_key,
   1116                           caller_output,
   1117                           aborted):
   1118     """Used by the Pipeline evaluator to finalize this Pipeline."""
   1119     result_status = _PipelineRecord.RUN
   1120     if aborted:
   1121       result_status = _PipelineRecord.ABORTED
   1122 
   1123     self._set_values_internal(
   1124         context, pipeline_key, root_pipeline_key, caller_output, result_status)
   1125     logging.debug('Finalizing %s(*%r, **%r)#%s',
   1126                   self._class_path, _short_repr(self.args),
   1127                   _short_repr(self.kwargs), self._pipeline_key.name())
   1128     try:
   1129       self.finalized()
   1130     except NotImplementedError:
   1131       pass
   1132 
   1133   def __repr__(self):
   1134     """Returns a string representation of this Pipeline."""
   1135     return '%s(*%s, **%s)' % (
   1136         self._class_path, _short_repr(self.args), _short_repr(self.kwargs))
   1137 
   1138 
   1139 # TODO: Change InOrder and After to use a common thread-local list of
   1140 # execution modifications to apply to the current evaluating pipeline.
   1141 
   1142 class After(object):
   1143   """Causes all contained Pipelines to run after the given ones complete.
   1144 
   1145   Must be used in a 'with' block.
   1146   """
   1147 
   1148   _local = threading.local()
   1149 
   1150   def __init__(self, *futures):
   1151     """Initializer.
   1152 
   1153     Args:
   1154       *futures: PipelineFutures that all subsequent pipelines should follow.
   1155         May be empty, in which case this statement does nothing.
   1156     """
   1157     for f in futures:
   1158       if not isinstance(f, PipelineFuture):
   1159         raise TypeError('May only pass PipelineFuture instances to After(). %r',
   1160                         type(f))
   1161     self._futures = set(futures)
   1162 
   1163   def __enter__(self):
   1164     """When entering a 'with' block."""
   1165     After._thread_init()
   1166     After._local._after_all_futures.extend(self._futures)
   1167 
   1168   def __exit__(self, type, value, trace):
   1169     """When exiting a 'with' block."""
   1170     for future in self._futures:
   1171       After._local._after_all_futures.remove(future)
   1172     return False
   1173 
   1174   @classmethod
   1175   def _thread_init(cls):
   1176     """Ensure thread local is initialized."""
   1177     if not hasattr(cls._local, '_after_all_futures'):
   1178       cls._local._after_all_futures = []
   1179 
   1180 
   1181 class InOrder(object):
   1182   """Causes all contained Pipelines to run in order.
   1183 
   1184   Must be used in a 'with' block.
   1185   """
   1186 
   1187   _local = threading.local()
   1188 
   1189   @classmethod
   1190   def _add_future(cls, future):
   1191     """Adds a future to the list of in-order futures thus far.
   1192 
   1193     Args:
   1194       future: The future to add to the list.
   1195     """
   1196     if cls._local._activated:
   1197       cls._local._in_order_futures.add(future)
   1198 
   1199   def __init__(self):
   1200     """Initializer."""
   1201 
   1202   def __enter__(self):
   1203     """When entering a 'with' block."""
   1204     InOrder._thread_init()
   1205     if InOrder._local._activated:
   1206       raise UnexpectedPipelineError('Already in an InOrder "with" block.')
   1207     InOrder._local._activated = True
   1208     InOrder._local._in_order_futures.clear()
   1209 
   1210   def __exit__(self, type, value, trace):
   1211     """When exiting a 'with' block."""
   1212     InOrder._local._activated = False
   1213     InOrder._local._in_order_futures.clear()
   1214     return False
   1215 
   1216   @classmethod
   1217   def _thread_init(cls):
   1218     """Ensure thread local is initialized."""
   1219     if not hasattr(cls._local, '_in_order_futures'):
   1220       cls._local._in_order_futures = set()
   1221       cls._local._activated = False
   1222 
   1223 
   1224 ################################################################################
   1225 
   1226 def _short_repr(obj):
   1227   """Helper function returns a truncated repr() of an object."""
   1228   stringified = pprint.saferepr(obj)
   1229   if len(stringified) > 200:
   1230     return '%s... (%d bytes)' % (stringified[:200], len(stringified))
   1231   return stringified
   1232 
   1233 
   1234 def _write_json_blob(encoded_value, pipeline_id=None):
   1235   """Writes a JSON encoded value to a Cloud Storage File.
   1236   
   1237   This function will store the blob in a GCS file in the default bucket under
   1238   the appengine_pipeline directory. Optionally using another directory level
   1239   specified by pipeline_id
   1240   Args:
   1241     encoded_value: The encoded JSON string.
   1242     pipeline_id: A pipeline id to segment files in Cloud Storage, if none, 
   1243       the file will be created under appengine_pipeline
   1244 
   1245   Returns:
   1246     The blobstore.BlobKey for the file that was created.
   1247   """
   1248   
   1249   default_bucket = app_identity.get_default_gcs_bucket_name()
   1250   path_components = ['/', default_bucket, "appengine_pipeline"]
   1251   if pipeline_id:
   1252     path_components.append(pipeline_id)
   1253   path_components.append(uuid.uuid4().hex)
   1254   # Use posixpath to get a / even if we're running on windows somehow
   1255   file_name = posixpath.join(*path_components)
   1256   with cloudstorage.open(file_name, 'w', content_type='application/json') as f:
   1257     for start_index in xrange(0, len(encoded_value), _MAX_JSON_SIZE):
   1258       end_index = start_index + _MAX_JSON_SIZE
   1259       f.write(encoded_value[start_index:end_index])
   1260 
   1261   key_str = blobstore.create_gs_key("/gs" + file_name)
   1262   logging.debug("Created blob for filename = %s gs_key = %s", file_name, key_str)
   1263   return blobstore.BlobKey(key_str)
   1264 
   1265 
   1266 def _dereference_args(pipeline_name, args, kwargs):
   1267   """Dereference a Pipeline's arguments that are slots, validating them.
   1268 
   1269   Each argument value passed in is assumed to be a dictionary with the format:
   1270     {'type': 'value', 'value': 'serializable'}  # A resolved value.
   1271     {'type': 'slot', 'slot_key': 'str() on a db.Key'}  # A pending Slot.
   1272 
   1273   Args:
   1274     pipeline_name: The name of the pipeline class; used for debugging.
   1275     args: Iterable of positional arguments.
   1276     kwargs: Dictionary of keyword arguments.
   1277 
   1278   Returns:
   1279     Tuple (args, kwargs) where:
   1280       Args: A list of positional arguments values that are all dereferenced.
   1281       Kwargs: A list of keyword arguments values that are all dereferenced.
   1282 
   1283   Raises:
   1284     SlotNotFilledError if any of the supplied 'slot_key' records are not
   1285     present in the Datastore or have not yet been filled.
   1286     UnexpectedPipelineError if an unknown parameter type was passed.
   1287   """
   1288   lookup_slots = set()
   1289   for arg in itertools.chain(args, kwargs.itervalues()):
   1290     if arg['type'] == 'slot':
   1291       lookup_slots.add(db.Key(arg['slot_key']))
   1292 
   1293   slot_dict = {}
   1294   for key, slot_record in zip(lookup_slots, db.get(lookup_slots)):
   1295     if slot_record is None or slot_record.status != _SlotRecord.FILLED:
   1296       raise SlotNotFilledError(
   1297           'Slot "%s" missing its value. From %s(*args=%s, **kwargs=%s)' %
   1298           (key, pipeline_name, _short_repr(args), _short_repr(kwargs)))
   1299     slot_dict[key] = slot_record.value
   1300 
   1301   arg_list = []
   1302   for current_arg in args:
   1303     if current_arg['type'] == 'slot':
   1304       arg_list.append(slot_dict[db.Key(current_arg['slot_key'])])
   1305     elif current_arg['type'] == 'value':
   1306       arg_list.append(current_arg['value'])
   1307     else:
   1308       raise UnexpectedPipelineError('Unknown parameter type: %r' % current_arg)
   1309 
   1310   kwarg_dict = {}
   1311   for key, current_arg in kwargs.iteritems():
   1312     if current_arg['type'] == 'slot':
   1313       kwarg_dict[key] = slot_dict[db.Key(current_arg['slot_key'])]
   1314     elif current_arg['type'] == 'value':
   1315       kwarg_dict[key] = current_arg['value']
   1316     else:
   1317       raise UnexpectedPipelineError('Unknown parameter type: %r' % current_arg)
   1318 
   1319   return (arg_list, kwarg_dict)
   1320 
   1321 
   1322 def _generate_args(pipeline, future, queue_name, base_path):
   1323   """Generate the params used to describe a Pipeline's depedencies.
   1324 
   1325   The arguments passed to this method may be normal values, Slot instances
   1326   (for named outputs), or PipelineFuture instances (for referring to the
   1327   default output slot).
   1328 
   1329   Args:
   1330     pipeline: The Pipeline instance to generate args for.
   1331     future: The PipelineFuture for the Pipeline these arguments correspond to.
   1332     queue_name: The queue to run the pipeline on.
   1333     base_path: Relative URL for pipeline URL handlers.
   1334 
   1335   Returns:
   1336     Tuple (dependent_slots, output_slot_keys, params_text, params_blob) where:
   1337       dependent_slots: List of db.Key instances of _SlotRecords on which
   1338         this pipeline will need to block before execution (passed to
   1339         create a _BarrierRecord for running the pipeline).
   1340       output_slot_keys: List of db.Key instances of _SlotRecords that will
   1341         be filled by this pipeline during its execution (passed to create
   1342         a _BarrierRecord for finalizing the pipeline).
   1343       params_text: JSON dictionary of pipeline parameters to be serialized and
   1344         saved in a corresponding _PipelineRecord. Will be None if the params are
   1345         too big and must be saved in a blob instead.
   1346       params_blob: JSON dictionary of pipeline parameters to be serialized and
   1347         saved in a Blob file, and then attached to a _PipelineRecord. Will be
   1348         None if the params data size was small enough to fit in the entity.
   1349   """
   1350   params = {
   1351       'args': [],
   1352       'kwargs': {},
   1353       'after_all': [],
   1354       'output_slots': {},
   1355       'class_path': pipeline._class_path,
   1356       'queue_name': queue_name,
   1357       'base_path': base_path,
   1358       'backoff_seconds': pipeline.backoff_seconds,
   1359       'backoff_factor': pipeline.backoff_factor,
   1360       'max_attempts': pipeline.max_attempts,
   1361       'task_retry': pipeline.task_retry,
   1362       'target': pipeline.target,
   1363   }
   1364   dependent_slots = set()
   1365 
   1366   arg_list = params['args']
   1367   for current_arg in pipeline.args:
   1368     if isinstance(current_arg, PipelineFuture):
   1369       current_arg = current_arg.default
   1370     if isinstance(current_arg, Slot):
   1371       arg_list.append({'type': 'slot', 'slot_key': str(current_arg.key)})
   1372       dependent_slots.add(current_arg.key)
   1373     else:
   1374       arg_list.append({'type': 'value', 'value': current_arg})
   1375 
   1376   kwarg_dict = params['kwargs']
   1377   for name, current_arg in pipeline.kwargs.iteritems():
   1378     if isinstance(current_arg, PipelineFuture):
   1379       current_arg = current_arg.default
   1380     if isinstance(current_arg, Slot):
   1381       kwarg_dict[name] = {'type': 'slot', 'slot_key': str(current_arg.key)}
   1382       dependent_slots.add(current_arg.key)
   1383     else:
   1384       kwarg_dict[name] = {'type': 'value', 'value': current_arg}
   1385 
   1386   after_all = params['after_all']
   1387   for other_future in future._after_all_pipelines:
   1388     slot_key = other_future._output_dict['default'].key
   1389     after_all.append(str(slot_key))
   1390     dependent_slots.add(slot_key)
   1391 
   1392   output_slots = params['output_slots']
   1393   output_slot_keys = set()
   1394   for name, slot in future._output_dict.iteritems():
   1395     output_slot_keys.add(slot.key)
   1396     output_slots[name] = str(slot.key)
   1397 
   1398   params_encoded = json.dumps(params, cls=mr_util.JsonEncoder)
   1399   params_text = None
   1400   params_blob = None
   1401   if len(params_encoded) > _MAX_JSON_SIZE:
   1402     params_blob = _write_json_blob(params_encoded, pipeline.pipeline_id)
   1403   else:
   1404     params_text = params_encoded
   1405 
   1406   return dependent_slots, output_slot_keys, params_text, params_blob
   1407 
   1408 
   1409 class _PipelineContext(object):
   1410   """Internal API for interacting with Pipeline state."""
   1411 
   1412   _gettime = datetime.datetime.utcnow
   1413 
   1414   def __init__(self,
   1415                task_name,
   1416                queue_name,
   1417                base_path):
   1418     """Initializer.
   1419 
   1420     Args:
   1421       task_name: The name of the currently running task or empty if there
   1422         is no task running.
   1423       queue_name: The queue this pipeline should run on (may not be the
   1424         current queue this request is on).
   1425       base_path: Relative URL for the pipeline's handlers.
   1426     """
   1427     self.task_name = task_name
   1428     self.queue_name = queue_name
   1429     self.base_path = base_path
   1430     self.barrier_handler_path = '%s/output' % base_path
   1431     self.pipeline_handler_path = '%s/run' % base_path
   1432     self.finalized_handler_path = '%s/finalized' % base_path
   1433     self.fanout_handler_path = '%s/fanout' % base_path
   1434     self.abort_handler_path = '%s/abort' % base_path
   1435     self.fanout_abort_handler_path = '%s/fanout_abort' % base_path
   1436     self.session_filled_output_names = set()
   1437 
   1438   @classmethod
   1439   def from_environ(cls, environ=os.environ):
   1440     """Constructs a _PipelineContext from the task queue environment."""
   1441     base_path, unused = (environ['PATH_INFO'].rsplit('/', 1) + [''])[:2]
   1442     return cls(
   1443         environ['HTTP_X_APPENGINE_TASKNAME'],
   1444         environ['HTTP_X_APPENGINE_QUEUENAME'],
   1445         base_path)
   1446 
   1447   def fill_slot(self, filler_pipeline_key, slot, value):
   1448     """Fills a slot, enqueueing a task to trigger pending barriers.
   1449 
   1450     Args:
   1451       filler_pipeline_key: db.Key or stringified key of the _PipelineRecord
   1452         that filled this slot.
   1453       slot: The Slot instance to fill.
   1454       value: The serializable value to assign.
   1455 
   1456     Raises:
   1457       UnexpectedPipelineError if the _SlotRecord for the 'slot' could not
   1458       be found in the Datastore.
   1459     """
   1460     if not isinstance(filler_pipeline_key, db.Key):
   1461       filler_pipeline_key = db.Key(filler_pipeline_key)
   1462 
   1463     if _TEST_MODE:
   1464       slot._set_value_test(filler_pipeline_key, value)
   1465     else:
   1466       encoded_value = json.dumps(value,
   1467                                        sort_keys=True,
   1468                                        cls=mr_util.JsonEncoder)
   1469       value_text = None
   1470       value_blob = None
   1471       if len(encoded_value) <= _MAX_JSON_SIZE:
   1472         value_text = db.Text(encoded_value)
   1473       else:
   1474         # The encoded value is too big. Save it as a blob.
   1475         value_blob = _write_json_blob(encoded_value, filler_pipeline_key.name())
   1476 
   1477       def txn():
   1478         slot_record = db.get(slot.key)
   1479         if slot_record is None:
   1480           raise UnexpectedPipelineError(
   1481               'Tried to fill missing slot "%s" '
   1482               'by pipeline ID "%s" with value: %r'
   1483               % (slot.key, filler_pipeline_key.name(), value))
   1484         # NOTE: Always take the override value here. If down-stream pipelines
   1485         # need a consitent view of all up-stream outputs (meaning, all of the
   1486         # outputs came from the same retry attempt of the upstream pipeline),
   1487         # the down-stream pipeline must also wait for the 'default' output
   1488         # of these up-stream pipelines.
   1489         slot_record.filler = filler_pipeline_key
   1490         slot_record.value_text = value_text
   1491         slot_record.value_blob = value_blob
   1492         slot_record.status = _SlotRecord.FILLED
   1493         slot_record.fill_time = self._gettime()
   1494         slot_record.put()
   1495         task = taskqueue.Task(
   1496             url=self.barrier_handler_path,
   1497             params=dict(
   1498                 slot_key=slot.key,
   1499                 use_barrier_indexes=True),
   1500             headers={'X-Ae-Slot-Key': slot.key,
   1501                      'X-Ae-Filler-Pipeline-Key': filler_pipeline_key})
   1502         task.add(queue_name=self.queue_name, transactional=True)
   1503       db.run_in_transaction_options(
   1504           db.create_transaction_options(propagation=db.ALLOWED), txn)
   1505 
   1506     self.session_filled_output_names.add(slot.name)
   1507 
   1508   def notify_barriers(self,
   1509                       slot_key,
   1510                       cursor,
   1511                       use_barrier_indexes,
   1512                       max_to_notify=_MAX_BARRIERS_TO_NOTIFY):
   1513     """Searches for barriers affected by a slot and triggers completed ones.
   1514 
   1515     Args:
   1516       slot_key: db.Key or stringified key of the _SlotRecord that was filled.
   1517       cursor: Stringified Datastore cursor where the notification query
   1518         should pick up.
   1519       use_barrier_indexes: When True, use _BarrierIndex records to determine
   1520         which _Barriers to trigger by having this _SlotRecord filled. When
   1521         False, use the old method that queries for _BarrierRecords by
   1522         the blocking_slots parameter.
   1523       max_to_notify: Used for testing.
   1524 
   1525     Raises:
   1526       PipelineStatusError: If any of the barriers are in a bad state.
   1527     """
   1528     if not isinstance(slot_key, db.Key):
   1529       slot_key = db.Key(slot_key)
   1530     logging.debug('Notifying slot %r', slot_key)
   1531 
   1532     if use_barrier_indexes:
   1533       # Please see models.py:_BarrierIndex to understand how _BarrierIndex
   1534       # entities relate to _BarrierRecord entities.
   1535       query = (
   1536           _BarrierIndex.all(cursor=cursor, keys_only=True)
   1537           .ancestor(slot_key))
   1538       barrier_index_list = query.fetch(max_to_notify)
   1539       barrier_key_list = [
   1540           _BarrierIndex.to_barrier_key(key) for key in barrier_index_list]
   1541 
   1542       # If there are task and pipeline kickoff retries it's possible for a
   1543       # _BarrierIndex to exist for a _BarrierRecord that was not successfully
   1544       # written. It's safe to ignore this because the original task that wrote
   1545       # the _BarrierIndex and _BarrierRecord would not have made progress to
   1546       # kick off a real pipeline or child pipeline unless all of the writes for
   1547       # these dependent entities went through. We assume that the instigator
   1548       # retried from scratch and somehwere there exists a good _BarrierIndex and
   1549       # corresponding _BarrierRecord that tries to accomplish the same thing.
   1550       barriers = db.get(barrier_key_list)
   1551       results = []
   1552       for barrier_key, barrier in zip(barrier_key_list, barriers):
   1553         if barrier is None:
   1554           logging.debug('Ignoring that Barrier "%r" is missing, '
   1555                         'relies on Slot "%r"', barrier_key, slot_key)
   1556         else:
   1557           results.append(barrier)
   1558     else:
   1559       # TODO(user): Delete this backwards compatible codepath and
   1560       # make use_barrier_indexes the assumed default in all cases.
   1561       query = (
   1562           _BarrierRecord.all(cursor=cursor)
   1563           .filter('blocking_slots =', slot_key))
   1564       results = query.fetch(max_to_notify)
   1565 
   1566     # Fetch all blocking _SlotRecords for any potentially triggered barriers.
   1567     blocking_slot_keys = []
   1568     for barrier in results:
   1569       blocking_slot_keys.extend(barrier.blocking_slots)
   1570 
   1571     blocking_slot_dict = {}
   1572     for slot_record in db.get(blocking_slot_keys):
   1573       if slot_record is None:
   1574         continue
   1575       blocking_slot_dict[slot_record.key()] = slot_record
   1576 
   1577     task_list = []
   1578     updated_barriers = []
   1579     for barrier in results:
   1580       ready_slots = []
   1581       for blocking_slot_key in barrier.blocking_slots:
   1582         slot_record = blocking_slot_dict.get(blocking_slot_key)
   1583         if slot_record is None:
   1584           raise UnexpectedPipelineError(
   1585               'Barrier "%r" relies on Slot "%r" which is missing.' %
   1586               (barrier.key(), blocking_slot_key))
   1587         if slot_record.status == _SlotRecord.FILLED:
   1588           ready_slots.append(blocking_slot_key)
   1589 
   1590       # When all of the blocking_slots have been filled, consider the barrier
   1591       # ready to trigger. We'll trigger it regardless of the current
   1592       # _BarrierRecord status, since there could be task queue failures at any
   1593       # point in this flow; this rolls forward the state and de-dupes using
   1594       # the task name tombstones.
   1595       pending_slots = set(barrier.blocking_slots) - set(ready_slots)
   1596       if not pending_slots:
   1597         if barrier.status != _BarrierRecord.FIRED:
   1598           barrier.status = _BarrierRecord.FIRED
   1599           barrier.trigger_time = self._gettime()
   1600           updated_barriers.append(barrier)
   1601 
   1602         purpose = barrier.key().name()
   1603         if purpose == _BarrierRecord.START:
   1604           path = self.pipeline_handler_path
   1605           countdown = None
   1606         else:
   1607           path = self.finalized_handler_path
   1608           # NOTE: Wait one second before finalization to prevent
   1609           # contention on the _PipelineRecord entity.
   1610           countdown = 1
   1611         pipeline_key = _BarrierRecord.target.get_value_for_datastore(barrier)
   1612         logging.debug('Firing barrier %r', barrier.key())
   1613         task_list.append(taskqueue.Task(
   1614             url=path,
   1615             countdown=countdown,
   1616             name='ae-barrier-fire-%s-%s' % (pipeline_key.name(), purpose),
   1617             params=dict(pipeline_key=pipeline_key, purpose=purpose),
   1618             headers={'X-Ae-Pipeline-Key': pipeline_key}))
   1619       else:
   1620         logging.debug('Not firing barrier %r, Waiting for slots: %r',
   1621                       barrier.key(), pending_slots)
   1622 
   1623     # Blindly overwrite _BarrierRecords that have an updated status. This is
   1624     # acceptable because by this point all finalization barriers for
   1625     # generator children should have already had their final outputs assigned.
   1626     if updated_barriers:
   1627       db.put(updated_barriers)
   1628 
   1629     # Task continuation with sequence number to prevent fork-bombs.
   1630     if len(results) == max_to_notify:
   1631       the_match = re.match('(.*)-ae-barrier-notify-([0-9]+)', self.task_name)
   1632       if the_match:
   1633         prefix = the_match.group(1)
   1634         end = int(the_match.group(2)) + 1
   1635       else:
   1636         prefix = self.task_name
   1637         end = 0
   1638       task_list.append(taskqueue.Task(
   1639           name='%s-ae-barrier-notify-%d' % (prefix, end),
   1640           url=self.barrier_handler_path,
   1641           params=dict(
   1642               slot_key=slot_key,
   1643               cursor=query.cursor(),
   1644               use_barrier_indexes=use_barrier_indexes)))
   1645 
   1646     if task_list:
   1647       try:
   1648         taskqueue.Queue(self.queue_name).add(task_list)
   1649       except (taskqueue.TombstonedTaskError, taskqueue.TaskAlreadyExistsError):
   1650         pass
   1651 
   1652   def begin_abort(self, root_pipeline_key, abort_message):
   1653     """Kicks off the abort process for a root pipeline and all its children.
   1654 
   1655     Args:
   1656       root_pipeline_key: db.Key of the root pipeline to abort.
   1657       abort_message: Message explaining why the abort happened, only saved
   1658           into the root pipeline.
   1659 
   1660     Returns:
   1661       True if the abort signal was sent successfully; False otherwise.
   1662     """
   1663     def txn():
   1664       pipeline_record = db.get(root_pipeline_key)
   1665       if pipeline_record is None:
   1666         logging.warning(
   1667             'Tried to abort root pipeline ID "%s" but it does not exist.',
   1668             root_pipeline_key.name())
   1669         raise db.Rollback()
   1670       if pipeline_record.status == _PipelineRecord.ABORTED:
   1671         logging.warning(
   1672             'Tried to abort root pipeline ID "%s"; already in state: %s',
   1673             root_pipeline_key.name(), pipeline_record.status)
   1674         raise db.Rollback()
   1675       if pipeline_record.abort_requested:
   1676         logging.warning(
   1677             'Tried to abort root pipeline ID "%s"; abort signal already sent.',
   1678             root_pipeline_key.name())
   1679         raise db.Rollback()
   1680 
   1681       pipeline_record.abort_requested = True
   1682       pipeline_record.abort_message = abort_message
   1683       pipeline_record.put()
   1684 
   1685       task = taskqueue.Task(
   1686           url=self.fanout_abort_handler_path,
   1687           params=dict(root_pipeline_key=root_pipeline_key))
   1688       task.add(queue_name=self.queue_name, transactional=True)
   1689       return True
   1690 
   1691     return db.run_in_transaction(txn)
   1692 
   1693   def continue_abort(self,
   1694                      root_pipeline_key,
   1695                      cursor=None,
   1696                      max_to_notify=_MAX_ABORTS_TO_BEGIN):
   1697     """Sends the abort signal to all children for a root pipeline.
   1698 
   1699     Args:
   1700       root_pipeline_key: db.Key of the root pipeline to abort.
   1701       cursor: The query cursor for enumerating _PipelineRecords when inserting
   1702         tasks to cause child pipelines to terminate.
   1703       max_to_notify: Used for testing.
   1704     """
   1705     if not isinstance(root_pipeline_key, db.Key):
   1706       root_pipeline_key = db.Key(root_pipeline_key)
   1707     # NOTE: The results of this query may include _PipelineRecord instances
   1708     # that are not actually "reachable", meaning you cannot get to them by
   1709     # starting at the root pipeline and following "fanned_out" onward. This
   1710     # is acceptable because even these defunct _PipelineRecords will properly
   1711     # set their status to ABORTED when the signal comes, regardless of any
   1712     # other status they may have had.
   1713     #
   1714     # The only gotcha here is if a Pipeline's finalize method somehow modifies
   1715     # its inputs (like deleting an input file). In the case there are
   1716     # unreachable child pipelines, it will appear as if two finalize methods
   1717     # have been called instead of just one. The saving grace here is that
   1718     # finalize must be idempotent, so this *should* be harmless.
   1719     query = (
   1720         _PipelineRecord.all(cursor=cursor)
   1721         .filter('root_pipeline =', root_pipeline_key))
   1722     results = query.fetch(max_to_notify)
   1723 
   1724     task_list = []
   1725     for pipeline_record in results:
   1726       if pipeline_record.status not in (
   1727           _PipelineRecord.RUN, _PipelineRecord.WAITING):
   1728         continue
   1729 
   1730       pipeline_key = pipeline_record.key()
   1731       task_list.append(taskqueue.Task(
   1732           name='%s-%s-abort' % (self.task_name, pipeline_key.name()),
   1733           url=self.abort_handler_path,
   1734           params=dict(pipeline_key=pipeline_key, purpose=_BarrierRecord.ABORT),
   1735           headers={'X-Ae-Pipeline-Key': pipeline_key}))
   1736 
   1737     # Task continuation with sequence number to prevent fork-bombs.
   1738     if len(results) == max_to_notify:
   1739       the_match = re.match('(.*)-([0-9]+)', self.task_name)
   1740       if the_match:
   1741         prefix = the_match.group(1)
   1742         end = int(the_match.group(2)) + 1
   1743       else:
   1744         prefix = self.task_name
   1745         end = 0
   1746       task_list.append(taskqueue.Task(
   1747           name='%s-%d' % (prefix, end),
   1748           url=self.fanout_abort_handler_path,
   1749           params=dict(root_pipeline_key=root_pipeline_key,
   1750                       cursor=query.cursor())))
   1751 
   1752     if task_list:
   1753       try:
   1754         taskqueue.Queue(self.queue_name).add(task_list)
   1755       except (taskqueue.TombstonedTaskError, taskqueue.TaskAlreadyExistsError):
   1756         pass
   1757 
   1758   def start(self, pipeline, return_task=True, countdown=None, eta=None):
   1759     """Starts a pipeline.
   1760 
   1761     Args:
   1762       pipeline: Pipeline instance to run.
   1763       return_task: When True, do not submit the task to start the pipeline
   1764         but instead return it for someone else to enqueue.
   1765       countdown: Time in seconds into the future that this Task should execute.
   1766         Defaults to zero.
   1767       eta: A datetime.datetime specifying the absolute time at which the task
   1768         should be executed. Must not be specified if 'countdown' is specified.
   1769         This may be timezone-aware or timezone-naive. If None, defaults to now.
   1770         For pull tasks, no worker will be able to lease this task before the
   1771         time indicated by eta.
   1772 
   1773     Returns:
   1774       The task to start this pipeline if return_task was True.
   1775 
   1776     Raises:
   1777       PipelineExistsError if the pipeline with the given ID already exists.
   1778     """
   1779     # Adjust all pipeline output keys for this Pipeline to be children of
   1780     # the _PipelineRecord, that way we can write them all and submit in a
   1781     # single transaction.
   1782     for name, slot in pipeline.outputs._output_dict.iteritems():
   1783       slot.key = db.Key.from_path(
   1784           *slot.key.to_path(), **dict(parent=pipeline._pipeline_key))
   1785 
   1786     _, output_slots, params_text, params_blob = _generate_args(
   1787         pipeline, pipeline.outputs, self.queue_name, self.base_path)
   1788 
   1789     @db.transactional(propagation=db.INDEPENDENT)
   1790     def txn():
   1791       pipeline_record = db.get(pipeline._pipeline_key)
   1792       if pipeline_record is not None:
   1793         raise PipelineExistsError(
   1794             'Pipeline with idempotence key "%s" already exists; params=%s' %
   1795             (pipeline._pipeline_key.name(),
   1796              _short_repr(pipeline_record.params)))
   1797 
   1798       entities_to_put = []
   1799       for name, slot in pipeline.outputs._output_dict.iteritems():
   1800         entities_to_put.append(_SlotRecord(
   1801             key=slot.key,
   1802             root_pipeline=pipeline._pipeline_key))
   1803 
   1804       entities_to_put.append(_PipelineRecord(
   1805           key=pipeline._pipeline_key,
   1806           root_pipeline=pipeline._pipeline_key,
   1807           is_root_pipeline=True,
   1808           # Bug in DB means we need to use the storage name here,
   1809           # not the local property name.
   1810           params=params_text,
   1811           params_blob=params_blob,
   1812           start_time=self._gettime(),
   1813           class_path=pipeline._class_path,
   1814           max_attempts=pipeline.max_attempts))
   1815 
   1816       entities_to_put.extend(_PipelineContext._create_barrier_entities(
   1817           pipeline._pipeline_key,
   1818           pipeline._pipeline_key,
   1819           _BarrierRecord.FINALIZE,
   1820           output_slots))
   1821 
   1822       db.put(entities_to_put)
   1823 
   1824       task = taskqueue.Task(
   1825           url=self.pipeline_handler_path,
   1826           params=dict(pipeline_key=pipeline._pipeline_key),
   1827           headers={'X-Ae-Pipeline-Key': pipeline._pipeline_key},
   1828           target=pipeline.target,
   1829           countdown=countdown,
   1830           eta=eta)
   1831       if return_task:
   1832         return task
   1833       task.add(queue_name=self.queue_name, transactional=True)
   1834 
   1835     task = txn()
   1836     # Immediately mark the output slots as existing so they can be filled
   1837     # by asynchronous pipelines or used in test mode.
   1838     for output_slot in pipeline.outputs._output_dict.itervalues():
   1839       output_slot._exists = True
   1840     return task
   1841 
   1842   def start_test(self, pipeline):
   1843     """Starts a pipeline in the test mode.
   1844 
   1845     Args:
   1846       pipeline: The Pipeline instance to test.
   1847     """
   1848     global _TEST_MODE, _TEST_ROOT_PIPELINE_KEY
   1849     self.start(pipeline, return_task=True)
   1850     _TEST_MODE = True
   1851     _TEST_ROOT_PIPELINE_KEY = pipeline._pipeline_key
   1852     try:
   1853       self.evaluate_test(pipeline, root=True)
   1854     finally:
   1855       _TEST_MODE = False
   1856 
   1857   def evaluate_test(self, stage, root=False):
   1858     """Recursively evaluates the given pipeline in test mode.
   1859 
   1860     Args:
   1861       stage: The Pipeline instance to run at this stage in the flow.
   1862       root: True if the supplied stage is the root of the pipeline.
   1863     """
   1864     args_adjusted = []
   1865     for arg in stage.args:
   1866       if isinstance(arg, PipelineFuture):
   1867         arg = arg.default
   1868       if isinstance(arg, Slot):
   1869         value = arg.value
   1870         arg._touched = True
   1871       else:
   1872         value = arg
   1873       args_adjusted.append(value)
   1874 
   1875     kwargs_adjusted = {}
   1876     for name, arg in stage.kwargs.iteritems():
   1877       if isinstance(arg, PipelineFuture):
   1878         arg = arg.default
   1879       if isinstance(arg, Slot):
   1880         value = arg.value
   1881         arg._touched = True
   1882       else:
   1883         value = arg
   1884       kwargs_adjusted[name] = value
   1885 
   1886     stage.args, stage.kwargs = args_adjusted, kwargs_adjusted
   1887     pipeline_generator = mr_util.is_generator_function(stage.run)
   1888     logging.debug('Running %s(*%s, **%s)', stage._class_path,
   1889                   _short_repr(stage.args), _short_repr(stage.kwargs))
   1890 
   1891     if stage.async:
   1892       stage.run_test(*stage.args, **stage.kwargs)
   1893     elif pipeline_generator:
   1894       all_output_slots = set()
   1895       try:
   1896         pipeline_iter = stage.run_test(*stage.args, **stage.kwargs)
   1897       except NotImplementedError:
   1898         pipeline_iter = stage.run(*stage.args, **stage.kwargs)
   1899 
   1900       all_substages = set()
   1901       next_value = None
   1902       last_sub_stage = None
   1903       while True:
   1904         try:
   1905           yielded = pipeline_iter.send(next_value)
   1906         except StopIteration:
   1907           break
   1908 
   1909         if isinstance(yielded, Pipeline):
   1910           if yielded in all_substages:
   1911             raise UnexpectedPipelineError(
   1912                 'Already yielded pipeline object %r' % yielded)
   1913           else:
   1914             all_substages.add(yielded)
   1915 
   1916           last_sub_stage = yielded
   1917           next_value = yielded.outputs
   1918           all_output_slots.update(next_value._output_dict.itervalues())
   1919         else:
   1920           raise UnexpectedPipelineError(
   1921               'Yielded a disallowed value: %r' % yielded)
   1922 
   1923       if last_sub_stage:
   1924         # Generator's outputs inherited from last running sub-stage.
   1925         # If the generator changes its mind and doesn't yield anything, this
   1926         # may not happen at all. Missing outputs will be caught when they
   1927         # are passed to the stage as inputs, or verified from the outside by
   1928         # the test runner.
   1929         for slot_name, slot in last_sub_stage.outputs._output_dict.iteritems():
   1930           stage.outputs._output_dict[slot_name] = slot
   1931           # Any inherited slots won't be checked for declaration.
   1932           all_output_slots.remove(slot)
   1933       else:
   1934         # Generator yielded no children, so treat it as a sync function.
   1935         stage.outputs.default._set_value_test(stage._pipeline_key, None)
   1936 
   1937       # Enforce the policy of requiring all undeclared output slots from
   1938       # child pipelines to be consumed by their parent generator.
   1939       for slot in all_output_slots:
   1940         if slot.name == 'default':
   1941           continue
   1942         if slot.filled and not slot._strict and not slot._touched:
   1943           raise SlotNotDeclaredError(
   1944               'Undeclared output "%s"; all dynamic outputs from child '
   1945               'pipelines must be consumed.' % slot.name)
   1946     else:
   1947       try:
   1948         result = stage.run_test(*stage.args, **stage.kwargs)
   1949       except NotImplementedError:
   1950         result = stage.run(*stage.args, **stage.kwargs)
   1951       stage.outputs.default._set_value_test(stage._pipeline_key, result)
   1952 
   1953     # Enforce strict output usage at the top level.
   1954     if root:
   1955       found_outputs = set()
   1956       for slot in stage.outputs._output_dict.itervalues():
   1957         if slot.filled:
   1958           found_outputs.add(slot.name)
   1959         if slot.name == 'default':
   1960           continue
   1961         if slot.name not in stage.output_names:
   1962           raise SlotNotDeclaredError(
   1963               'Undeclared output from root pipeline "%s"' % slot.name)
   1964 
   1965       missing_outputs = set(stage.output_names) - found_outputs
   1966       if missing_outputs:
   1967         raise SlotNotFilledError(
   1968             'Outputs %r were never filled.' % missing_outputs)
   1969 
   1970     logging.debug('Finalizing %s(*%s, **%s)', stage._class_path,
   1971                   _short_repr(stage.args), _short_repr(stage.kwargs))
   1972     ran = False
   1973     try:
   1974       stage.finalized_test()
   1975       ran = True
   1976     except NotImplementedError:
   1977       pass
   1978     if not ran:
   1979       try:
   1980         stage.finalized()
   1981       except NotImplementedError:
   1982         pass
   1983 
   1984   def evaluate(self, pipeline_key, purpose=None, attempt=0):
   1985     """Evaluates the given Pipeline and enqueues sub-stages for execution.
   1986 
   1987     Args:
   1988       pipeline_key: The db.Key or stringified key of the _PipelineRecord to run.
   1989       purpose: Why evaluate was called ('start', 'finalize', or 'abort').
   1990       attempt: The attempt number that should be tried.
   1991     """
   1992     After._thread_init()
   1993     InOrder._thread_init()
   1994     InOrder._local._activated = False
   1995 
   1996     if not isinstance(pipeline_key, db.Key):
   1997       pipeline_key = db.Key(pipeline_key)
   1998     pipeline_record = db.get(pipeline_key)
   1999     if pipeline_record is None:
   2000       logging.error('Pipeline ID "%s" does not exist.', pipeline_key.name())
   2001       return
   2002     if pipeline_record.status not in (
   2003         _PipelineRecord.WAITING, _PipelineRecord.RUN):
   2004       logging.error('Pipeline ID "%s" in bad state for purpose "%s": "%s"',
   2005                     pipeline_key.name(), purpose or _BarrierRecord.START,
   2006                     pipeline_record.status)
   2007       return
   2008 
   2009     params = pipeline_record.params
   2010     root_pipeline_key = \
   2011         _PipelineRecord.root_pipeline.get_value_for_datastore(pipeline_record)
   2012     default_slot_key = db.Key(params['output_slots']['default'])
   2013 
   2014     default_slot_record, root_pipeline_record = db.get([
   2015         default_slot_key, root_pipeline_key])
   2016     if default_slot_record is None:
   2017       logging.error('Pipeline ID "%s" default slot "%s" does not exist.',
   2018                     pipeline_key.name(), default_slot_key)
   2019       return
   2020     if root_pipeline_record is None:
   2021       logging.error('Pipeline ID "%s" root pipeline ID "%s" is missing.',
   2022                     pipeline_key.name(), root_pipeline_key.name())
   2023       return
   2024 
   2025     # Always finalize if we're aborting so pipelines have a chance to cleanup
   2026     # before they terminate. Pipelines must access 'was_aborted' to find
   2027     # out how their finalization should work.
   2028     abort_signal = (
   2029         purpose == _BarrierRecord.ABORT or
   2030         root_pipeline_record.abort_requested == True)
   2031     finalize_signal = (
   2032         (default_slot_record.status == _SlotRecord.FILLED and
   2033          purpose == _BarrierRecord.FINALIZE) or abort_signal)
   2034 
   2035     try:
   2036       pipeline_func_class = mr_util.for_name(pipeline_record.class_path)
   2037     except ImportError, e:
   2038       # This means something is wrong with the deployed code. Rely on the
   2039       # taskqueue system to do retries.
   2040       retry_message = '%s: %s' % (e.__class__.__name__, str(e))
   2041       logging.exception(
   2042           'Could not locate %s#%s. %s',
   2043           pipeline_record.class_path, pipeline_key.name(), retry_message)
   2044       raise
   2045 
   2046     try:
   2047       pipeline_func = pipeline_func_class.from_id(
   2048           pipeline_key.name(),
   2049           resolve_outputs=finalize_signal,
   2050           _pipeline_record=pipeline_record)
   2051     except SlotNotFilledError, e:
   2052       logging.exception(
   2053           'Could not resolve arguments for %s#%s. Most likely this means there '
   2054           'is a bug in the Pipeline runtime or some intermediate data has been '
   2055           'deleted from the Datastore. Giving up.',
   2056           pipeline_record.class_path, pipeline_key.name())
   2057       self.transition_aborted(pipeline_key)
   2058       return
   2059     except Exception, e:
   2060       retry_message = '%s: %s' % (e.__class__.__name__, str(e))
   2061       logging.exception(
   2062           'Instantiating %s#%s raised exception. %s',
   2063           pipeline_record.class_path, pipeline_key.name(), retry_message)
   2064       self.transition_retry(pipeline_key, retry_message)
   2065       if pipeline_record.params['task_retry']:
   2066         raise
   2067       else:
   2068         return
   2069     else:
   2070       pipeline_generator = mr_util.is_generator_function(
   2071           pipeline_func_class.run)
   2072       caller_output = pipeline_func.outputs
   2073 
   2074     if (abort_signal and pipeline_func.async and
   2075         pipeline_record.status == _PipelineRecord.RUN
   2076         and not pipeline_func.try_cancel()):
   2077       logging.warning(
   2078           'Could not cancel and abort mid-flight async pipeline: %r#%s',
   2079           pipeline_func, pipeline_key.name())
   2080       return
   2081 
   2082     if finalize_signal:
   2083       try:
   2084         pipeline_func._finalized_internal(
   2085               self, pipeline_key, root_pipeline_key,
   2086               caller_output, abort_signal)
   2087       except Exception, e:
   2088         # This means something is wrong with the deployed finalization code.
   2089         # Rely on the taskqueue system to do retries.
   2090         retry_message = '%s: %s' % (e.__class__.__name__, str(e))
   2091         logging.exception('Finalizing %r#%s raised exception. %s',
   2092                           pipeline_func, pipeline_key.name(), retry_message)
   2093         raise
   2094       else:
   2095         if not abort_signal:
   2096           self.transition_complete(pipeline_key)
   2097           return
   2098 
   2099     if abort_signal:
   2100       logging.debug('Marking as aborted %s#%s', pipeline_func,
   2101                     pipeline_key.name())
   2102       self.transition_aborted(pipeline_key)
   2103       return
   2104 
   2105     if pipeline_record.current_attempt != attempt:
   2106       logging.error(
   2107           'Received evaluation task for pipeline ID "%s" attempt %d but '
   2108           'current pending attempt is %d', pipeline_key.name(), attempt,
   2109           pipeline_record.current_attempt)
   2110       return
   2111 
   2112     if pipeline_record.current_attempt >= pipeline_record.max_attempts:
   2113       logging.error(
   2114           'Received evaluation task for pipeline ID "%s" on attempt %d '
   2115           'but that exceeds max attempts %d', pipeline_key.name(), attempt,
   2116           pipeline_record.max_attempts)
   2117       return
   2118 
   2119     if pipeline_record.next_retry_time is not None:
   2120       retry_time = pipeline_record.next_retry_time - _RETRY_WIGGLE_TIMEDELTA
   2121       if self._gettime() <= retry_time:
   2122         detail_message = (
   2123             'Received evaluation task for pipeline ID "%s" on attempt %d, '
   2124             'which will not be ready until: %s' % (pipeline_key.name(),
   2125             pipeline_record.current_attempt, pipeline_record.next_retry_time))
   2126         logging.warning(detail_message)
   2127         raise UnexpectedPipelineError(detail_message)
   2128 
   2129     if pipeline_record.status == _PipelineRecord.RUN and pipeline_generator:
   2130       if (default_slot_record.status == _SlotRecord.WAITING and
   2131           not pipeline_record.fanned_out):
   2132         # This properly handles the yield-less generator case when the
   2133         # RUN state transition worked properly but outputting to the default
   2134         # slot failed.
   2135         self.fill_slot(pipeline_key, caller_output.default, None)
   2136       return
   2137 
   2138     if (pipeline_record.status == _PipelineRecord.WAITING and
   2139         pipeline_func.async):
   2140       self.transition_run(pipeline_key)
   2141 
   2142     try:
   2143       result = pipeline_func._run_internal(
   2144           self, pipeline_key, root_pipeline_key, caller_output)
   2145     except Exception, e:
   2146       if self.handle_run_exception(pipeline_key, pipeline_func, e):
   2147         raise
   2148       else:
   2149         return
   2150 
   2151     if pipeline_func.async:
   2152       return
   2153 
   2154     if not pipeline_generator:
   2155       # Catch any exceptions that are thrown when the pipeline's return
   2156       # value is being serialized. This ensures that serialization errors
   2157       # will cause normal abort/retry behavior.
   2158       try:
   2159         self.fill_slot(pipeline_key, caller_output.default, result)
   2160       except Exception, e:
   2161         retry_message = 'Bad return value. %s: %s' % (
   2162             e.__class__.__name__, str(e))
   2163         logging.exception(
   2164             'Generator %r#%s caused exception while serializing return '
   2165             'value %r. %s', pipeline_func, pipeline_key.name(), result,
   2166             retry_message)
   2167         self.transition_retry(pipeline_key, retry_message)
   2168         if pipeline_func.task_retry:
   2169           raise
   2170         else:
   2171           return
   2172 
   2173       expected_outputs = set(caller_output._output_dict.iterkeys())
   2174       found_outputs = self.session_filled_output_names
   2175       if expected_outputs != found_outputs:
   2176         exception = SlotNotFilledError(
   2177             'Outputs %r for pipeline ID "%s" were never filled by "%s".' % (
   2178             expected_outputs - found_outputs,
   2179             pipeline_key.name(), pipeline_func._class_path))
   2180         if self.handle_run_exception(pipeline_key, pipeline_func, exception):
   2181           raise exception
   2182       return
   2183 
   2184     pipeline_iter = result
   2185     next_value = None
   2186     last_sub_stage = None
   2187     sub_stage = None
   2188     sub_stage_dict = {}
   2189     sub_stage_ordering = []
   2190 
   2191     while True:
   2192       try:
   2193         yielded = pipeline_iter.send(next_value)
   2194       except StopIteration:
   2195         break
   2196       except Exception, e:
   2197         if self.handle_run_exception(pipeline_key, pipeline_func, e):
   2198           raise
   2199         else:
   2200           return
   2201 
   2202       if isinstance(yielded, Pipeline):
   2203         if yielded in sub_stage_dict:
   2204           raise UnexpectedPipelineError(
   2205               'Already yielded pipeline object %r with pipeline ID %s' %
   2206               (yielded, yielded.pipeline_id))
   2207 
   2208         last_sub_stage = yielded
   2209         next_value = PipelineFuture(yielded.output_names)
   2210         next_value._after_all_pipelines.update(After._local._after_all_futures)
   2211         next_value._after_all_pipelines.update(InOrder._local._in_order_futures)
   2212         sub_stage_dict[yielded] = next_value
   2213         sub_stage_ordering.append(yielded)
   2214         InOrder._add_future(next_value)
   2215 
   2216         # To aid local testing, the task_retry flag (which instructs the
   2217         # evaluator to raise all exceptions back up to the task queue) is
   2218         # inherited by all children from the root down.
   2219         yielded.task_retry = pipeline_func.task_retry
   2220       else:
   2221         raise UnexpectedPipelineError(
   2222             'Yielded a disallowed value: %r' % yielded)
   2223 
   2224     if last_sub_stage:
   2225       # Final yielded stage inherits outputs from calling pipeline that were not
   2226       # already filled during the generator's execution.
   2227       inherited_outputs = params['output_slots']
   2228       for slot_name in self.session_filled_output_names:
   2229         del inherited_outputs[slot_name]
   2230       sub_stage_dict[last_sub_stage]._inherit_outputs(
   2231           pipeline_record.class_path, inherited_outputs)
   2232     else:
   2233       # Here the generator has yielded nothing, and thus acts as a synchronous
   2234       # function. We can skip the rest of the generator steps completely and
   2235       # fill the default output slot to cause finalizing.
   2236       expected_outputs = set(caller_output._output_dict.iterkeys())
   2237       expected_outputs.remove('default')
   2238       found_outputs = self.session_filled_output_names
   2239       if expected_outputs != found_outputs:
   2240         exception = SlotNotFilledError(
   2241             'Outputs %r for pipeline ID "%s" were never filled by "%s".' % (
   2242             expected_outputs - found_outputs,
   2243             pipeline_key.name(), pipeline_func._class_path))
   2244         if self.handle_run_exception(pipeline_key, pipeline_func, exception):
   2245           raise exception
   2246       else:
   2247         self.fill_slot(pipeline_key, caller_output.default, None)
   2248         self.transition_run(pipeline_key)
   2249       return
   2250 
   2251     # Allocate any SlotRecords that do not yet exist.
   2252     entities_to_put = []
   2253     for future in sub_stage_dict.itervalues():
   2254       for slot in future._output_dict.itervalues():
   2255         if not slot._exists:
   2256           entities_to_put.append(_SlotRecord(
   2257               key=slot.key, root_pipeline=root_pipeline_key))
   2258 
   2259     # Allocate PipelineRecords and BarrierRecords for generator-run Pipelines.
   2260     pipelines_to_run = set()
   2261     all_children_keys = []
   2262     all_output_slots = set()
   2263     for sub_stage in sub_stage_ordering:
   2264       future = sub_stage_dict[sub_stage]
   2265 
   2266       # Catch any exceptions that are thrown when the pipeline's parameters
   2267       # are being serialized. This ensures that serialization errors will
   2268       # cause normal retry/abort behavior.
   2269       try:
   2270         dependent_slots, output_slots, params_text, params_blob = \
   2271             _generate_args(sub_stage, future, self.queue_name, self.base_path)
   2272       except Exception, e:
   2273         retry_message = 'Bad child arguments. %s: %s' % (
   2274             e.__class__.__name__, str(e))
   2275         logging.exception(
   2276             'Generator %r#%s caused exception while serializing args for '
   2277             'child pipeline %r. %s', pipeline_func, pipeline_key.name(),
   2278             sub_stage, retry_message)
   2279         self.transition_retry(pipeline_key, retry_message)
   2280         if pipeline_func.task_retry:
   2281           raise
   2282         else:
   2283           return
   2284 
   2285       child_pipeline_key = db.Key.from_path(
   2286           _PipelineRecord.kind(), uuid.uuid4().hex)
   2287       all_output_slots.update(output_slots)
   2288       all_children_keys.append(child_pipeline_key)
   2289 
   2290       child_pipeline = _PipelineRecord(
   2291           key=child_pipeline_key,
   2292           root_pipeline=root_pipeline_key,
   2293           # Bug in DB means we need to use the storage name here,
   2294           # not the local property name.
   2295           params=params_text,
   2296           params_blob=params_blob,
   2297           class_path=sub_stage._class_path,
   2298           max_attempts=sub_stage.max_attempts)
   2299       entities_to_put.append(child_pipeline)
   2300 
   2301       if not dependent_slots:
   2302         # This child pipeline will run immediately.
   2303         pipelines_to_run.add(child_pipeline_key)
   2304         child_pipeline.start_time = self._gettime()
   2305       else:
   2306         entities_to_put.extend(_PipelineContext._create_barrier_entities(
   2307             root_pipeline_key,
   2308             child_pipeline_key,
   2309             _BarrierRecord.START,
   2310             dependent_slots))
   2311 
   2312       entities_to_put.extend(_PipelineContext._create_barrier_entities(
   2313           root_pipeline_key,
   2314           child_pipeline_key,
   2315           _BarrierRecord.FINALIZE,
   2316           output_slots))
   2317 
   2318     # This generator pipeline's finalization barrier must include all of the
   2319     # outputs of any child pipelines that it runs. This ensures the finalized
   2320     # calls will not happen until all child pipelines have completed.
   2321     #
   2322     # The transition_run() call below will update the FINALIZE _BarrierRecord
   2323     # for this generator pipeline to include all of these child outputs in
   2324     # its list of blocking_slots. That update is done transactionally to
   2325     # make sure the _BarrierRecord only lists the slots that matter.
   2326     #
   2327     # However, the notify_barriers() method doesn't find _BarrierRecords
   2328     # through the blocking_slots field. It finds them through _BarrierIndexes
   2329     # entities. Thus, before we update the FINALIZE _BarrierRecord in
   2330     # transition_run(), we need to write _BarrierIndexes for all child outputs.
   2331     barrier_entities = _PipelineContext._create_barrier_entities(
   2332         root_pipeline_key,
   2333         pipeline_key,
   2334         _BarrierRecord.FINALIZE,
   2335         all_output_slots)
   2336     # Ignore the first element which is the _BarrierRecord. That entity must
   2337     # have already been created and put in the datastore for the parent
   2338     # pipeline before this code generated child pipelines.
   2339     barrier_indexes = barrier_entities[1:]
   2340     entities_to_put.extend(barrier_indexes)
   2341 
   2342     db.put(entities_to_put)
   2343 
   2344     self.transition_run(pipeline_key,
   2345                         blocking_slot_keys=all_output_slots,
   2346                         fanned_out_pipelines=all_children_keys,
   2347                         pipelines_to_run=pipelines_to_run)
   2348 
   2349   @staticmethod
   2350   def _create_barrier_entities(root_pipeline_key,
   2351                                child_pipeline_key,
   2352                                purpose,
   2353                                blocking_slot_keys):
   2354     """Creates all of the entities required for a _BarrierRecord.
   2355 
   2356     Args:
   2357       root_pipeline_key: The root pipeline this is part of.
   2358       child_pipeline_key: The pipeline this barrier is for.
   2359       purpose: _BarrierRecord.START or _BarrierRecord.FINALIZE.
   2360       blocking_slot_keys: Set of db.Keys corresponding to _SlotRecords that
   2361         this barrier should wait on before firing.
   2362 
   2363     Returns:
   2364       List of entities, starting with the _BarrierRecord entity, followed by
   2365       _BarrierIndexes used for firing when _SlotRecords are filled in the same
   2366       order as the blocking_slot_keys list provided. All of these entities
   2367       should be put in the Datastore to ensure the barrier fires properly.
   2368     """
   2369     result = []
   2370 
   2371     blocking_slot_keys = list(blocking_slot_keys)
   2372 
   2373     barrier = _BarrierRecord(
   2374         parent=child_pipeline_key,
   2375         key_name=purpose,
   2376         target=child_pipeline_key,
   2377         root_pipeline=root_pipeline_key,
   2378         blocking_slots=blocking_slot_keys)
   2379 
   2380     result.append(barrier)
   2381 
   2382     for slot_key in blocking_slot_keys:
   2383       barrier_index_path = []
   2384       barrier_index_path.extend(slot_key.to_path())
   2385       barrier_index_path.extend(child_pipeline_key.to_path())
   2386       barrier_index_path.extend([_BarrierIndex.kind(), purpose])
   2387       barrier_index_key = db.Key.from_path(*barrier_index_path)
   2388       barrier_index = _BarrierIndex(
   2389           key=barrier_index_key,
   2390           root_pipeline=root_pipeline_key)
   2391       result.append(barrier_index)
   2392 
   2393     return result
   2394 
   2395   def handle_run_exception(self, pipeline_key, pipeline_func, e):
   2396     """Handles an exception raised by a Pipeline's user code.
   2397 
   2398     Args:
   2399       pipeline_key: The pipeline that raised the error.
   2400       pipeline_func: The class path name of the Pipeline that was running.
   2401       e: The exception that was raised.
   2402 
   2403     Returns:
   2404       True if the exception should be re-raised up through the calling stack
   2405       by the caller of this method.
   2406     """
   2407     if isinstance(e, Retry):
   2408       retry_message = str(e)
   2409       logging.warning('User forced retry for pipeline ID "%s" of %r: %s',
   2410                       pipeline_key.name(), pipeline_func, retry_message)
   2411       self.transition_retry(pipeline_key, retry_message)
   2412     elif isinstance(e, Abort):
   2413       abort_message = str(e)
   2414       logging.warning('User forced abort for pipeline ID "%s" of %r: %s',
   2415                       pipeline_key.name(), pipeline_func, abort_message)
   2416       pipeline_func.abort(abort_message)
   2417     else:
   2418       retry_message = '%s: %s' % (e.__class__.__name__, str(e))
   2419       logging.exception('Generator %r#%s raised exception. %s',
   2420                         pipeline_func, pipeline_key.name(), retry_message)
   2421       self.transition_retry(pipeline_key, retry_message)
   2422 
   2423     return pipeline_func.task_retry
   2424 
   2425   def transition_run(self,
   2426                      pipeline_key,
   2427                      blocking_slot_keys=None,
   2428                      fanned_out_pipelines=None,
   2429                      pipelines_to_run=None):
   2430     """Marks an asynchronous or generator pipeline as running.
   2431 
   2432     Does nothing if the pipeline is no longer in a runnable state.
   2433 
   2434     Args:
   2435       pipeline_key: The db.Key of the _PipelineRecord to update.
   2436       blocking_slot_keys: List of db.Key instances that this pipeline's
   2437         finalization barrier should wait on in addition to the existing one.
   2438         This is used to update the barrier to include all child outputs. When
   2439         None, the barrier will not be updated.
   2440       fanned_out_pipelines: List of db.Key instances of _PipelineRecords that
   2441         were fanned out by this generator pipeline. This is distinct from the
   2442         'pipelines_to_run' list because not all of the pipelines listed here
   2443         will be immediately ready to execute. When None, then this generator
   2444         yielded no children.
   2445       pipelines_to_run: List of db.Key instances of _PipelineRecords that should
   2446         be kicked off (fan-out) transactionally as part of this transition.
   2447         When None, no child pipelines will run. All db.Keys in this list must
   2448         also be present in the fanned_out_pipelines list.
   2449 
   2450     Raises:
   2451       UnexpectedPipelineError if blocking_slot_keys was not empty and the
   2452       _BarrierRecord has gone missing.
   2453     """
   2454     def txn():
   2455       pipeline_record = db.get(pipeline_key)
   2456       if pipeline_record is None:
   2457         logging.warning('Pipeline ID "%s" cannot be marked as run. '
   2458                         'Does not exist.', pipeline_key.name())
   2459         raise db.Rollback()
   2460       if pipeline_record.status != _PipelineRecord.WAITING:
   2461         logging.warning('Pipeline ID "%s" in bad state to be marked as run: %s',
   2462                         pipeline_key.name(), pipeline_record.status)
   2463         raise db.Rollback()
   2464 
   2465       pipeline_record.status = _PipelineRecord.RUN
   2466 
   2467       if fanned_out_pipelines:
   2468         # NOTE: We must model the pipeline relationship in a top-down manner,
   2469         # meaning each pipeline must point forward to the pipelines that it
   2470         # fanned out to. The reason is race conditions. If evaluate()
   2471         # dies early, it may create many unused _PipelineRecord and _SlotRecord
   2472         # instances that never progress. The only way we know which of these
   2473         # are valid is by traversing the graph from the root, where the
   2474         # fanned_out property refers to those pipelines that were run using a
   2475         # transactional task.
   2476         child_pipeline_list = list(fanned_out_pipelines)
   2477         pipeline_record.fanned_out = child_pipeline_list
   2478 
   2479         if pipelines_to_run:
   2480           child_indexes = [
   2481               child_pipeline_list.index(p) for p in pipelines_to_run]
   2482           child_indexes.sort()
   2483           task = taskqueue.Task(
   2484               url=self.fanout_handler_path,
   2485               params=dict(parent_key=str(pipeline_key),
   2486                           child_indexes=child_indexes))
   2487           task.add(queue_name=self.queue_name, transactional=True)
   2488 
   2489       pipeline_record.put()
   2490 
   2491       if blocking_slot_keys:
   2492         # NOTE: Always update a generator pipeline's finalization barrier to
   2493         # include all of the outputs of any pipelines that it runs, to ensure
   2494         # that finalized calls will not happen until all child pipelines have
   2495         # completed. This must happen transactionally with the enqueue of
   2496         # the fan-out kickoff task above to ensure the child output slots and
   2497         # the barrier blocking slots are the same.
   2498         barrier_key = db.Key.from_path(
   2499             _BarrierRecord.kind(), _BarrierRecord.FINALIZE,
   2500             parent=pipeline_key)
   2501         finalize_barrier = db.get(barrier_key)
   2502         if finalize_barrier is None:
   2503           raise UnexpectedPipelineError(
   2504               'Pipeline ID "%s" cannot update finalize barrier. '
   2505               'Does not exist.' % pipeline_key.name())
   2506         else:
   2507           finalize_barrier.blocking_slots = list(
   2508               blocking_slot_keys.union(set(finalize_barrier.blocking_slots)))
   2509           finalize_barrier.put()
   2510 
   2511     db.run_in_transaction(txn)
   2512 
   2513   def transition_complete(self, pipeline_key):
   2514     """Marks the given pipeline as complete.
   2515 
   2516     Does nothing if the pipeline is no longer in a state that can be completed.
   2517 
   2518     Args:
   2519       pipeline_key: db.Key of the _PipelineRecord that has completed.
   2520     """
   2521     def txn():
   2522       pipeline_record = db.get(pipeline_key)
   2523       if pipeline_record is None:
   2524         logging.warning(
   2525             'Tried to mark pipeline ID "%s" as complete but it does not exist.',
   2526             pipeline_key.name())
   2527         raise db.Rollback()
   2528       if pipeline_record.status not in (
   2529           _PipelineRecord.WAITING, _PipelineRecord.RUN):
   2530         logging.warning(
   2531             'Tried to mark pipeline ID "%s" as complete, found bad state: %s',
   2532             pipeline_key.name(), pipeline_record.status)
   2533         raise db.Rollback()
   2534 
   2535       pipeline_record.status = _PipelineRecord.DONE
   2536       pipeline_record.finalized_time = self._gettime()
   2537       pipeline_record.put()
   2538 
   2539     db.run_in_transaction(txn)
   2540 
   2541   def transition_retry(self, pipeline_key, retry_message):
   2542     """Marks the given pipeline as requiring another retry.
   2543 
   2544     Does nothing if all attempts have been exceeded.
   2545 
   2546     Args:
   2547       pipeline_key: db.Key of the _PipelineRecord that needs to be retried.
   2548       retry_message: User-supplied message indicating the reason for the retry.
   2549     """
   2550     def txn():
   2551       pipeline_record = db.get(pipeline_key)
   2552       if pipeline_record is None:
   2553         logging.warning(
   2554             'Tried to retry pipeline ID "%s" but it does not exist.',
   2555             pipeline_key.name())
   2556         raise db.Rollback()
   2557       if pipeline_record.status not in (
   2558           _PipelineRecord.WAITING, _PipelineRecord.RUN):
   2559         logging.warning(
   2560             'Tried to retry pipeline ID "%s", found bad state: %s',
   2561             pipeline_key.name(), pipeline_record.status)
   2562         raise db.Rollback()
   2563 
   2564       params = pipeline_record.params
   2565       offset_seconds = (
   2566           params['backoff_seconds'] *
   2567           (params['backoff_factor'] ** pipeline_record.current_attempt))
   2568       pipeline_record.next_retry_time = (
   2569           self._gettime() + datetime.timedelta(seconds=offset_seconds))
   2570       pipeline_record.current_attempt += 1
   2571       pipeline_record.retry_message = retry_message
   2572       pipeline_record.status = _PipelineRecord.WAITING
   2573 
   2574       if pipeline_record.current_attempt >= pipeline_record.max_attempts:
   2575         root_pipeline_key = (
   2576             _PipelineRecord.root_pipeline.get_value_for_datastore(
   2577                 pipeline_record))
   2578         logging.warning(
   2579             'Giving up on pipeline ID "%s" after %d attempt(s); causing abort '
   2580             'all the way to the root pipeline ID "%s"', pipeline_key.name(),
   2581             pipeline_record.current_attempt, root_pipeline_key.name())
   2582         # NOTE: We do *not* set the status to aborted here to ensure that
   2583         # this pipeline will be finalized before it has been marked as aborted.
   2584         pipeline_record.abort_message = (
   2585             'Aborting after %d attempts' % pipeline_record.current_attempt)
   2586         task = taskqueue.Task(
   2587             url=self.fanout_abort_handler_path,
   2588             params=dict(root_pipeline_key=root_pipeline_key))
   2589         task.add(queue_name=self.queue_name, transactional=True)
   2590       else:
   2591         task = taskqueue.Task(
   2592             url=self.pipeline_handler_path,
   2593             eta=pipeline_record.next_retry_time,
   2594             params=dict(pipeline_key=pipeline_key,
   2595                         purpose=_BarrierRecord.START,
   2596                         attempt=pipeline_record.current_attempt),
   2597             headers={'X-Ae-Pipeline-Key': pipeline_key})
   2598         task.add(queue_name=self.queue_name, transactional=True)
   2599 
   2600       pipeline_record.put()
   2601 
   2602     db.run_in_transaction(txn)
   2603 
   2604   def transition_aborted(self, pipeline_key):
   2605     """Makes the given pipeline as having aborted.
   2606 
   2607     Does nothing if the pipeline is in a bad state.
   2608 
   2609     Args:
   2610       pipeline_key: db.Key of the _PipelineRecord that needs to be retried.
   2611     """
   2612     def txn():
   2613       pipeline_record = db.get(pipeline_key)
   2614       if pipeline_record is None:
   2615         logging.warning(
   2616             'Tried to abort pipeline ID "%s" but it does not exist.',
   2617             pipeline_key.name())
   2618         raise db.Rollback()
   2619       if pipeline_record.status not in (
   2620           _PipelineRecord.WAITING, _PipelineRecord.RUN):
   2621         logging.warning(
   2622             'Tried to abort pipeline ID "%s", found bad state: %s',
   2623             pipeline_key.name(), pipeline_record.status)
   2624         raise db.Rollback()
   2625 
   2626       pipeline_record.status = _PipelineRecord.ABORTED
   2627       pipeline_record.finalized_time = self._gettime()
   2628       pipeline_record.put()
   2629 
   2630     db.run_in_transaction(txn)
   2631 
   2632 ################################################################################
   2633 
   2634 
   2635 class _BarrierHandler(webapp.RequestHandler):
   2636   """Request handler for triggering barriers."""
   2637 
   2638   def post(self):
   2639     if 'HTTP_X_APPENGINE_TASKNAME' not in self.request.environ:
   2640       self.response.set_status(403)
   2641       return
   2642 
   2643     context = _PipelineContext.from_environ(self.request.environ)
   2644     context.notify_barriers(
   2645         self.request.get('slot_key'),
   2646         self.request.get('cursor'),
   2647         use_barrier_indexes=self.request.get('use_barrier_indexes') == 'True')
   2648 
   2649 
   2650 class _PipelineHandler(webapp.RequestHandler):
   2651   """Request handler for running pipelines."""
   2652 
   2653   def post(self):
   2654     if 'HTTP_X_APPENGINE_TASKNAME' not in self.request.environ:
   2655       self.response.set_status(403)
   2656       return
   2657 
   2658     context = _PipelineContext.from_environ(self.request.environ)
   2659     context.evaluate(self.request.get('pipeline_key'),
   2660                      purpose=self.request.get('purpose'),
   2661                      attempt=int(self.request.get('attempt', '0')))
   2662 
   2663 
   2664 class _FanoutAbortHandler(webapp.RequestHandler):
   2665   """Request handler for fanning out abort notifications."""
   2666 
   2667   def post(self):
   2668     if 'HTTP_X_APPENGINE_TASKNAME' not in self.request.environ:
   2669       self.response.set_status(403)
   2670       return
   2671 
   2672     context = _PipelineContext.from_environ(self.request.environ)
   2673     context.continue_abort(
   2674         self.request.get('root_pipeline_key'),
   2675         self.request.get('cursor'))
   2676 
   2677 
   2678 class _FanoutHandler(webapp.RequestHandler):
   2679   """Request handler for fanning out pipeline children."""
   2680 
   2681   def post(self):
   2682     if 'HTTP_X_APPENGINE_TASKNAME' not in self.request.environ:
   2683       self.response.set_status(403)
   2684       return
   2685 
   2686     context = _PipelineContext.from_environ(self.request.environ)
   2687 
   2688     # Set of stringified db.Keys of children to run.
   2689     all_pipeline_keys = set()
   2690 
   2691     # For backwards compatibility with the old style of fan-out requests.
   2692     all_pipeline_keys.update(self.request.get_all('pipeline_key'))
   2693 
   2694     # Fetch the child pipelines from the parent. This works around the 10KB
   2695     # task payload limit. This get() is consistent-on-read and the fan-out
   2696     # task is enqueued in the transaction that updates the parent, so the
   2697     # fanned_out property is consistent here.
   2698     parent_key = self.request.get('parent_key')
   2699     child_indexes = [int(x) for x in self.request.get_all('child_indexes')]
   2700     if parent_key:
   2701       parent_key = db.Key(parent_key)
   2702       parent = db.get(parent_key)
   2703       for index in child_indexes:
   2704         all_pipeline_keys.add(str(parent.fanned_out[index]))
   2705 
   2706     all_tasks = []
   2707     all_pipelines = db.get([db.Key(pipeline_key) for pipeline_key in all_pipeline_keys])
   2708     for child_pipeline in all_pipelines:
   2709       if child_pipeline is None:
   2710         continue
   2711       pipeline_key = str(child_pipeline.key())
   2712       all_tasks.append(taskqueue.Task(
   2713           url=context.pipeline_handler_path,
   2714           params=dict(pipeline_key=pipeline_key),
   2715           target=child_pipeline.params['target'],
   2716           headers={'X-Ae-Pipeline-Key': pipeline_key},
   2717           name='ae-pipeline-fan-out-' + child_pipeline.key().name()))
   2718 
   2719     batch_size = 100  # Limit of taskqueue API bulk add.
   2720     for i in xrange(0, len(all_tasks), batch_size):
   2721       batch = all_tasks[i:i+batch_size]
   2722       try:
   2723         taskqueue.Queue(context.queue_name).add(batch)
   2724       except (taskqueue.TombstonedTaskError, taskqueue.TaskAlreadyExistsError):
   2725         pass
   2726 
   2727 
   2728 class _CleanupHandler(webapp.RequestHandler):
   2729   """Request handler for cleaning up a Pipeline."""
   2730 
   2731   def post(self):
   2732     if 'HTTP_X_APPENGINE_TASKNAME' not in self.request.environ:
   2733       self.response.set_status(403)
   2734       return
   2735 
   2736     root_pipeline_key = db.Key(self.request.get('root_pipeline_key'))
   2737     logging.debug('Cleaning up root_pipeline_key=%r', root_pipeline_key)
   2738 
   2739     # TODO(user): Accumulate all BlobKeys from _PipelineRecord and
   2740     # _SlotRecord entities and delete them.
   2741     pipeline_keys = (
   2742         _PipelineRecord.all(keys_only=True)
   2743         .filter('root_pipeline =', root_pipeline_key))
   2744     db.delete(pipeline_keys)
   2745     slot_keys = (
   2746         _SlotRecord.all(keys_only=True)
   2747         .filter('root_pipeline =', root_pipeline_key))
   2748     db.delete(slot_keys)
   2749     barrier_keys = (
   2750         _BarrierRecord.all(keys_only=True)
   2751         .filter('root_pipeline =', root_pipeline_key))
   2752     db.delete(barrier_keys)
   2753     status_keys = (
   2754         _StatusRecord.all(keys_only=True)
   2755         .filter('root_pipeline =', root_pipeline_key))
   2756     db.delete(status_keys)
   2757     barrier_index_keys = (
   2758         _BarrierIndex.all(keys_only=True)
   2759         .filter('root_pipeline =', root_pipeline_key))
   2760     db.delete(barrier_index_keys)
   2761 
   2762 
   2763 class _CallbackHandler(webapp.RequestHandler):
   2764   """Receives asynchronous callback requests from humans or tasks."""
   2765 
   2766   def post(self):
   2767     self.get()
   2768 
   2769   def get(self):
   2770     try:
   2771       self.run_callback()
   2772     except _CallbackTaskError, e:
   2773       logging.error(str(e))
   2774       if 'HTTP_X_APPENGINE_TASKRETRYCOUNT' in self.request.environ:
   2775         # Silently give up on tasks that have retried many times. This
   2776         # probably means that the target pipeline has been deleted, so there's
   2777         # no reason to keep trying this task forever.
   2778         retry_count = int(
   2779             self.request.environ.get('HTTP_X_APPENGINE_TASKRETRYCOUNT'))
   2780         if retry_count > _MAX_CALLBACK_TASK_RETRIES:
   2781           logging.error('Giving up on task after %d retries',
   2782                         _MAX_CALLBACK_TASK_RETRIES)
   2783           return
   2784 
   2785       # NOTE: The undescriptive error code 400 are present to address security
   2786       # risks of giving external users access to cause PipelineRecord lookups
   2787       # and execution.
   2788       self.response.set_status(400)
   2789 
   2790   def run_callback(self):
   2791     """Runs the callback for the pipeline specified in the request.
   2792 
   2793     Raises:
   2794       _CallbackTaskError if something was wrong with the request parameters.
   2795     """
   2796     pipeline_id = self.request.get('pipeline_id')
   2797     if not pipeline_id:
   2798       raise _CallbackTaskError('"pipeline_id" parameter missing.')
   2799 
   2800     pipeline_key = db.Key.from_path(_PipelineRecord.kind(), pipeline_id)
   2801     pipeline_record = db.get(pipeline_key)
   2802     if pipeline_record is None:
   2803       raise _CallbackTaskError(
   2804           'Pipeline ID "%s" for callback does not exist.' % pipeline_id)
   2805 
   2806     params = pipeline_record.params
   2807     real_class_path = params['class_path']
   2808     try:
   2809       pipeline_func_class = mr_util.for_name(real_class_path)
   2810     except ImportError, e:
   2811       raise _CallbackTaskError(
   2812           'Cannot load class named "%s" for pipeline ID "%s".'
   2813           % (real_class_path, pipeline_id))
   2814 
   2815     if 'HTTP_X_APPENGINE_TASKNAME' not in self.request.environ:
   2816       if pipeline_func_class.public_callbacks:
   2817         pass
   2818       elif pipeline_func_class.admin_callbacks:
   2819         if not users.is_current_user_admin():
   2820           raise _CallbackTaskError(
   2821               'Unauthorized callback for admin-only pipeline ID "%s"'
   2822               % pipeline_id)
   2823       else:
   2824         raise _CallbackTaskError(
   2825             'External callback for internal-only pipeline ID "%s"'
   2826             % pipeline_id)
   2827 
   2828     kwargs = {}
   2829     for key in self.request.arguments():
   2830       if key != 'pipeline_id':
   2831         kwargs[str(key)] = self.request.get(key)
   2832 
   2833     def perform_callback():
   2834       stage = pipeline_func_class.from_id(pipeline_id)
   2835       if stage is None:
   2836         raise _CallbackTaskError(
   2837             'Pipeline ID "%s" deleted during callback' % pipeline_id)
   2838       return stage._callback_internal(kwargs)
   2839 
   2840     # callback_xg_transaction is a 3-valued setting (None=no trans,
   2841     # False=1-eg-trans, True=xg-trans)
   2842     if pipeline_func_class._callback_xg_transaction is not None:
   2843       transaction_options = db.create_transaction_options(
   2844           xg=pipeline_func_class._callback_xg_transaction)
   2845       callback_result = db.run_in_transaction_options(transaction_options,
   2846                                                       perform_callback)
   2847     else:
   2848       callback_result = perform_callback()
   2849 
   2850     if callback_result is not None:
   2851       status_code, content_type, content = callback_result
   2852       self.response.set_status(status_code)
   2853       self.response.headers['Content-Type'] = content_type
   2854       self.response.out.write(content)
   2855 
   2856 
   2857 ################################################################################
   2858 
   2859 def _get_timestamp_ms(when):
   2860   """Converts a datetime.datetime to integer milliseconds since the epoch.
   2861 
   2862   Requires special handling to preserve microseconds.
   2863 
   2864   Args:
   2865     when: A datetime.datetime instance.
   2866 
   2867   Returns:
   2868     Integer time since the epoch in milliseconds. If the supplied 'when' is
   2869     None, the return value will be None.
   2870   """
   2871   if when is None:
   2872     return None
   2873   ms_since_epoch = float(time.mktime(when.utctimetuple()) * 1000.0)
   2874   ms_since_epoch += when.microsecond / 1000.0
   2875   return int(ms_since_epoch)
   2876 
   2877 
   2878 def _get_internal_status(pipeline_key=None,
   2879                          pipeline_dict=None,
   2880                          slot_dict=None,
   2881                          barrier_dict=None,
   2882                          status_dict=None):
   2883   """Gets the UI dictionary of a pipeline from a set of status dictionaries.
   2884 
   2885   Args:
   2886     pipeline_key: The key of the pipeline to lookup.
   2887     pipeline_dict: Dictionary mapping pipeline db.Key to _PipelineRecord.
   2888       Default is an empty dictionary.
   2889     slot_dict: Dictionary mapping slot db.Key to _SlotRecord.
   2890       Default is an empty dictionary.
   2891     barrier_dict: Dictionary mapping barrier db.Key to _BarrierRecord.
   2892       Default is an empty dictionary.
   2893     status_dict: Dictionary mapping status record db.Key to _StatusRecord.
   2894       Default is an empty dictionary.
   2895 
   2896   Returns:
   2897     Dictionary with the keys:
   2898       classPath: The pipeline function being run.
   2899       args: List of positional argument slot dictionaries.
   2900       kwargs: Dictionary of keyword argument slot dictionaries.
   2901       outputs: Dictionary of output slot dictionaries.
   2902       children: List of child pipeline IDs.
   2903       queueName: Queue on which this pipeline is running.
   2904       afterSlotKeys: List of Slot Ids after which this pipeline runs.
   2905       currentAttempt: Number of the current attempt, starting at 1.
   2906       maxAttempts: Maximum number of attempts before aborting.
   2907       backoffSeconds: Constant factor for backoff before retrying.
   2908       backoffFactor: Exponential factor for backoff before retrying.
   2909       status: Current status of the pipeline.
   2910       startTimeMs: When this pipeline ran or will run due to retries, if present.
   2911       endTimeMs: When this pipeline finalized, if present.
   2912       lastRetryMessage: Why the pipeline failed during the last retry, if there
   2913         was a failure; may be empty.
   2914       abortMessage: For root pipelines, why the pipeline was aborted if it was
   2915         aborted; may be empty.
   2916 
   2917     Dictionary will contain these keys if explicit status is set:
   2918       statusTimeMs: When the status was set as milliseconds since the epoch.
   2919       statusMessage: Status message, if present.
   2920       statusConsoleUrl: The relative URL for the console of this pipeline.
   2921       statusLinks: Dictionary mapping human-readable names to relative URLs
   2922         for related URLs to this pipeline.
   2923 
   2924   Raises:
   2925     PipelineStatusError if any input is bad.
   2926   """
   2927   if pipeline_dict is None:
   2928     pipeline_dict = {}
   2929   if slot_dict is None:
   2930     slot_dict = {}
   2931   if barrier_dict is None:
   2932     barrier_dict = {}
   2933   if status_dict is None:
   2934     status_dict = {}
   2935 
   2936   pipeline_record = pipeline_dict.get(pipeline_key)
   2937   if pipeline_record is None:
   2938     raise PipelineStatusError(
   2939         'Could not find pipeline ID "%s"' % pipeline_key.name())
   2940 
   2941   params = pipeline_record.params
   2942   root_pipeline_key = \
   2943       _PipelineRecord.root_pipeline.get_value_for_datastore(pipeline_record)
   2944   default_slot_key = db.Key(params['output_slots']['default'])
   2945   start_barrier_key = db.Key.from_path(
   2946       _BarrierRecord.kind(), _BarrierRecord.START, parent=pipeline_key)
   2947   finalize_barrier_key = db.Key.from_path(
   2948       _BarrierRecord.kind(), _BarrierRecord.FINALIZE, parent=pipeline_key)
   2949   status_record_key = db.Key.from_path(
   2950       _StatusRecord.kind(), pipeline_key.name())
   2951 
   2952   start_barrier = barrier_dict.get(start_barrier_key)
   2953   finalize_barrier = barrier_dict.get(finalize_barrier_key)
   2954   default_slot = slot_dict.get(default_slot_key)
   2955   status_record = status_dict.get(status_record_key)
   2956   if finalize_barrier is None:
   2957     raise PipelineStatusError(
   2958         'Finalization barrier missing for pipeline ID "%s"' %
   2959         pipeline_key.name())
   2960   if default_slot is None:
   2961     raise PipelineStatusError(
   2962         'Default output slot with key=%s missing for pipeline ID "%s"' % (
   2963         default_slot_key, pipeline_key.name()))
   2964 
   2965   output = {
   2966     'classPath': pipeline_record.class_path,
   2967     'args': list(params['args']),
   2968     'kwargs': params['kwargs'].copy(),
   2969     'outputs': params['output_slots'].copy(),
   2970     'children': [key.name() for key in pipeline_record.fanned_out],
   2971     'queueName': params['queue_name'],
   2972     'afterSlotKeys': [str(key) for key in params['after_all']],
   2973     'currentAttempt': pipeline_record.current_attempt + 1,
   2974     'maxAttempts': pipeline_record.max_attempts,
   2975     'backoffSeconds': pipeline_record.params['backoff_seconds'],
   2976     'backoffFactor': pipeline_record.params['backoff_factor'],
   2977   }
   2978 
   2979   # TODO(user): Truncate args, kwargs, and outputs to < 1MB each so we
   2980   # can reasonably return the whole tree of pipelines and their outputs.
   2981   # Coerce each value to a string to truncate if necessary. For now if the
   2982   # params are too big it will just cause the whole status page to break.
   2983 
   2984   # Fix the key names in parameters to match JavaScript style.
   2985   for value_dict in itertools.chain(
   2986       output['args'], output['kwargs'].itervalues()):
   2987     if 'slot_key' in value_dict:
   2988       value_dict['slotKey'] = value_dict.pop('slot_key')
   2989 
   2990   # Figure out the pipeline's status.
   2991   if pipeline_record.status in (_PipelineRecord.WAITING, _PipelineRecord.RUN):
   2992     if default_slot.status == _SlotRecord.FILLED:
   2993       status = 'finalizing'
   2994     elif (pipeline_record.status == _PipelineRecord.WAITING and
   2995           pipeline_record.next_retry_time is not None):
   2996       status = 'retry'
   2997     elif start_barrier and start_barrier.status == _BarrierRecord.WAITING:
   2998       # start_barrier will be missing for root pipelines
   2999       status = 'waiting'
   3000     else:
   3001       status = 'run'
   3002   elif pipeline_record.status == _PipelineRecord.DONE:
   3003     status = 'done'
   3004   elif pipeline_record.status == _PipelineRecord.ABORTED:
   3005     status = 'aborted'
   3006 
   3007   output['status'] = status
   3008 
   3009   if status_record:
   3010     output['statusTimeMs'] = _get_timestamp_ms(status_record.status_time)
   3011     if status_record.message:
   3012       output['statusMessage'] = status_record.message
   3013     if status_record.console_url:
   3014       output['statusConsoleUrl'] = status_record.console_url
   3015     if status_record.link_names:
   3016       output['statusLinks'] = dict(
   3017           zip(status_record.link_names, status_record.link_urls))
   3018 
   3019   # Populate status-depenedent fields.
   3020   if status in ('run', 'finalizing', 'done', 'retry'):
   3021     if pipeline_record.next_retry_time is not None:
   3022       output['startTimeMs'] = _get_timestamp_ms(pipeline_record.next_retry_time)
   3023     elif start_barrier:
   3024       # start_barrier will be missing for root pipelines
   3025       output['startTimeMs'] = _get_timestamp_ms(start_barrier.trigger_time)
   3026     elif pipeline_record.start_time:
   3027       # Assume this pipeline ran immediately upon spawning with no
   3028       # start barrier or it's the root pipeline.
   3029       output['startTimeMs'] = _get_timestamp_ms(pipeline_record.start_time)
   3030 
   3031   if status in ('finalizing',):
   3032     output['endTimeMs'] = _get_timestamp_ms(default_slot.fill_time)
   3033 
   3034   if status in ('done',):
   3035     output['endTimeMs'] = _get_timestamp_ms(pipeline_record.finalized_time)
   3036 
   3037   if pipeline_record.next_retry_time is not None:
   3038     output['lastRetryMessage'] = pipeline_record.retry_message
   3039 
   3040   if pipeline_record.abort_message:
   3041     output['abortMessage'] = pipeline_record.abort_message
   3042 
   3043   return output
   3044 
   3045 
   3046 def _get_internal_slot(slot_key=None,
   3047                        filler_pipeline_key=None,
   3048                        slot_dict=None):
   3049   """Gets information about a _SlotRecord for display in UI.
   3050 
   3051   Args:
   3052     slot_key: The db.Key of the slot to fetch.
   3053     filler_pipeline_key: In the case the slot has not yet been filled, assume
   3054       that the given db.Key (for a _PipelineRecord) will be the filler of
   3055       the slot in the future.
   3056     slot_dict: The slot JSON dictionary.
   3057 
   3058   Returns:
   3059     Dictionary with the keys:
   3060       status: Slot status: 'filled' or 'waiting'
   3061       fillTimeMs: Time in milliseconds since the epoch of when it was filled.
   3062       value: The current value of the slot, which is a slot's JSON dictionary.
   3063       fillerPipelineId: The pipeline ID of what stage has or should fill
   3064         this slot.
   3065 
   3066   Raises:
   3067     PipelineStatusError if any input is bad.
   3068   """
   3069   if slot_dict is None:
   3070     slot_dict = {}
   3071 
   3072   slot_record = slot_dict.get(slot_key)
   3073   if slot_record is None:
   3074     raise PipelineStatusError(
   3075         'Could not find data for output slot key "%s".' % slot_key)
   3076 
   3077   output = {}
   3078   if slot_record.status == _SlotRecord.FILLED:
   3079     output['status'] = 'filled'
   3080     output['fillTimeMs'] = _get_timestamp_ms(slot_record.fill_time)
   3081     output['value'] = slot_record.value
   3082     filler_pipeline_key = (
   3083         _SlotRecord.filler.get_value_for_datastore(slot_record))
   3084   else:
   3085     output['status'] = 'waiting'
   3086 
   3087   if filler_pipeline_key:
   3088     output['fillerPipelineId'] = filler_pipeline_key.name()
   3089 
   3090   return output
   3091 
   3092 
   3093 def get_status_tree(root_pipeline_id):
   3094   """Gets the full status tree of a pipeline.
   3095 
   3096   Args:
   3097     root_pipeline_id: The pipeline ID to get status for.
   3098 
   3099   Returns:
   3100     Dictionary with the keys:
   3101       rootPipelineId: The ID of the root pipeline.
   3102       slots: Mapping of slot IDs to result of from _get_internal_slot.
   3103       pipelines: Mapping of pipeline IDs to result of _get_internal_status.
   3104 
   3105   Raises:
   3106     PipelineStatusError if any input is bad.
   3107   """
   3108   root_pipeline_key = db.Key.from_path(_PipelineRecord.kind(), root_pipeline_id)
   3109   root_pipeline_record = db.get(root_pipeline_key)
   3110   if root_pipeline_record is None:
   3111     raise PipelineStatusError(
   3112         'Could not find pipeline ID "%s"' % root_pipeline_id)
   3113 
   3114   # If the supplied root_pipeline_id is not actually the root pipeline that's
   3115   # okay. We'll find the real root and override the value they passed in.
   3116   actual_root_key = _PipelineRecord.root_pipeline.get_value_for_datastore(
   3117       root_pipeline_record)
   3118   if actual_root_key != root_pipeline_key:
   3119     root_pipeline_key = actual_root_key
   3120     root_pipeline_id = root_pipeline_key.id_or_name()
   3121     root_pipeline_record = db.get(root_pipeline_key)
   3122     if not root_pipeline_record:
   3123       raise PipelineStatusError(
   3124           'Could not find pipeline ID "%s"' % root_pipeline_id)
   3125 
   3126   # Run all queries asynchronously.
   3127   queries = {}
   3128   for model in (_PipelineRecord, _SlotRecord, _BarrierRecord, _StatusRecord):
   3129     queries[model] = model.all().filter(
   3130         'root_pipeline =', root_pipeline_key).run(batch_size=1000)
   3131 
   3132   found_pipeline_dict = dict(
   3133       (stage.key(), stage) for stage in queries[_PipelineRecord])
   3134   found_slot_dict = dict(
   3135       (slot.key(), slot) for slot in queries[_SlotRecord])
   3136   found_barrier_dict = dict(
   3137       (barrier.key(), barrier) for barrier in queries[_BarrierRecord])
   3138   found_status_dict = dict(
   3139       (status.key(), status) for status in queries[_StatusRecord])
   3140 
   3141   # Breadth-first traversal of _PipelineRecord instances by following
   3142   # _PipelineRecord.fanned_out property values.
   3143   valid_pipeline_keys = set([root_pipeline_key])
   3144   slot_filler_dict = {}  # slot_key to pipeline_key
   3145   expand_stack = [root_pipeline_record]
   3146   while expand_stack:
   3147     old_stack = expand_stack
   3148     expand_stack = []
   3149     for pipeline_record in old_stack:
   3150       for child_pipeline_key in pipeline_record.fanned_out:
   3151         # This will let us prune off those pipelines which were allocated in
   3152         # the Datastore but were never run due to mid-flight task failures.
   3153         child_pipeline_record = found_pipeline_dict.get(child_pipeline_key)
   3154         if child_pipeline_record is None:
   3155           raise PipelineStatusError(
   3156               'Pipeline ID "%s" points to child ID "%s" which does not exist.'
   3157               % (pipeline_record.key().name(), child_pipeline_key.name()))
   3158         expand_stack.append(child_pipeline_record)
   3159         valid_pipeline_keys.add(child_pipeline_key)
   3160 
   3161         # Figure out the deepest pipeline that's responsible for outputting to
   3162         # a particular _SlotRecord, so we can report which pipeline *should*
   3163         # be the filler.
   3164         child_outputs = child_pipeline_record.params['output_slots']
   3165         for output_slot_key in child_outputs.itervalues():
   3166           slot_filler_dict[db.Key(output_slot_key)] = child_pipeline_key
   3167 
   3168   output = {
   3169     'rootPipelineId': root_pipeline_id,
   3170     'slots': {},
   3171     'pipelines': {},
   3172   }
   3173 
   3174   for pipeline_key in found_pipeline_dict.keys():
   3175     if pipeline_key not in valid_pipeline_keys:
   3176       continue
   3177     output['pipelines'][pipeline_key.name()] = _get_internal_status(
   3178         pipeline_key=pipeline_key,
   3179         pipeline_dict=found_pipeline_dict,
   3180         slot_dict=found_slot_dict,
   3181         barrier_dict=found_barrier_dict,
   3182         status_dict=found_status_dict)
   3183 
   3184   for slot_key, filler_pipeline_key in slot_filler_dict.iteritems():
   3185     output['slots'][str(slot_key)] = _get_internal_slot(
   3186         slot_key=slot_key,
   3187         filler_pipeline_key=filler_pipeline_key,
   3188         slot_dict=found_slot_dict)
   3189 
   3190   return output
   3191 
   3192 
   3193 def get_pipeline_names():
   3194   """Returns the class paths of all Pipelines defined in alphabetical order."""
   3195   class_path_set = set()
   3196   for cls in _PipelineMeta._all_classes:
   3197       if cls.class_path is not None:
   3198         class_path_set.add(cls.class_path)
   3199   return sorted(class_path_set)
   3200 
   3201 
   3202 def get_root_list(class_path=None, cursor=None, count=50):
   3203   """Gets a list root Pipelines.
   3204 
   3205   Args:
   3206     class_path: Optional. If supplied, only return root Pipelines with the
   3207       given class_path. By default all root pipelines are returned.
   3208     cursor: Optional. When supplied, the cursor returned from the last call to
   3209       get_root_list which indicates where to pick up.
   3210     count: How many pipeline returns to return.
   3211 
   3212   Returns:
   3213     Dictionary with the keys:
   3214       pipelines: The list of Pipeline records in the same format as
   3215         returned by get_status_tree, but with only the roots listed.
   3216       cursor: Cursor to pass back to this function to resume the query. Will
   3217         only be present if there is another page of results.
   3218 
   3219   Raises:
   3220     PipelineStatusError if any input is bad.
   3221   """
   3222   query = _PipelineRecord.all(cursor=cursor)
   3223   if class_path:
   3224     query.filter('class_path =', class_path)
   3225   query.filter('is_root_pipeline =', True)
   3226   query.order('-start_time')
   3227 
   3228   root_list = query.fetch(count)
   3229 
   3230   fetch_list = []
   3231   for pipeline_record in root_list:
   3232     fetch_list.append(db.Key(pipeline_record.params['output_slots']['default']))
   3233     fetch_list.append(db.Key.from_path(
   3234         _BarrierRecord.kind(), _BarrierRecord.FINALIZE,
   3235         parent=pipeline_record.key()))
   3236     fetch_list.append(db.Key.from_path(
   3237         _StatusRecord.kind(), pipeline_record.key().name()))
   3238 
   3239   pipeline_dict = dict((stage.key(), stage) for stage in root_list)
   3240   slot_dict = {}
   3241   barrier_dict = {}
   3242   status_dict = {}
   3243   for entity in db.get(fetch_list):
   3244     if isinstance(entity, _BarrierRecord):
   3245       barrier_dict[entity.key()] = entity
   3246     elif isinstance(entity, _SlotRecord):
   3247       slot_dict[entity.key()] = entity
   3248     elif isinstance(entity, _StatusRecord):
   3249       status_dict[entity.key()] = entity
   3250 
   3251   results = []
   3252   for pipeline_record in root_list:
   3253     try:
   3254       output = _get_internal_status(
   3255           pipeline_record.key(),
   3256           pipeline_dict=pipeline_dict,
   3257           slot_dict=slot_dict,
   3258           barrier_dict=barrier_dict,
   3259           status_dict=status_dict)
   3260       output['pipelineId'] = pipeline_record.key().name()
   3261       results.append(output)
   3262     except PipelineStatusError, e:
   3263       output = {'status': e.message}
   3264       output['classPath'] = ''
   3265       output['pipelineId'] = pipeline_record.key().name()
   3266       results.append(output)
   3267 
   3268   result_dict = {}
   3269   cursor = query.cursor()
   3270   query.with_cursor(cursor)
   3271   if query.get(keys_only=True):
   3272     result_dict.update(cursor=cursor)
   3273   result_dict.update(pipelines=results)
   3274   return result_dict
   3275 
   3276 ################################################################################
   3277 
   3278 def set_enforce_auth(new_status):
   3279   """Sets whether Pipeline API handlers rely on app.yaml for access control.
   3280 
   3281   Args:
   3282     new_status: If True, then the Pipeline API will enforce its own
   3283       access control on status and static file handlers. If False, then
   3284       it will assume app.yaml is doing the enforcement.
   3285   """
   3286   global _ENFORCE_AUTH
   3287   _ENFORCE_AUTH = new_status
   3288 
   3289 
   3290 def create_handlers_map(prefix='.*'):
   3291   """Create new handlers map.
   3292 
   3293   Args:
   3294     prefix: url prefix to use.
   3295 
   3296   Returns:
   3297     list of (regexp, handler) pairs for WSGIApplication constructor.
   3298   """
   3299   return [
   3300       (prefix + '/output', _BarrierHandler),
   3301       (prefix + '/run', _PipelineHandler),
   3302       (prefix + '/finalized', _PipelineHandler),
   3303       (prefix + '/cleanup', _CleanupHandler),
   3304       (prefix + '/abort', _PipelineHandler),
   3305       (prefix + '/fanout', _FanoutHandler),
   3306       (prefix + '/fanout_abort', _FanoutAbortHandler),
   3307       (prefix + '/callback', _CallbackHandler),
   3308       (prefix + '/rpc/tree', status_ui._TreeStatusHandler),
   3309       (prefix + '/rpc/class_paths', status_ui._ClassPathListHandler),
   3310       (prefix + '/rpc/list', status_ui._RootListHandler),
   3311       (prefix + '(/.+)', status_ui._StatusUiHandler),
   3312       ]
   3313