Home | History | Annotate | Download | only in pipeline
      1 #!/usr/bin/env python
      2 #
      3 # Copyright 2010 Google Inc.
      4 #
      5 # Licensed under the Apache License, Version 2.0 (the "License");
      6 # you may not use this file except in compliance with the License.
      7 # You may obtain a copy of the License at
      8 #
      9 #     http://www.apache.org/licenses/LICENSE-2.0
     10 #
     11 # Unless required by applicable law or agreed to in writing, software
     12 # distributed under the License is distributed on an "AS IS" BASIS,
     13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     14 # See the License for the specific language governing permissions and
     15 # limitations under the License.
     16 
     17 """Common Pipelines for easy reuse."""
     18 
     19 import cgi
     20 import logging
     21 import random
     22 
     23 from google.appengine.api import mail
     24 from google.appengine.api import taskqueue
     25 
     26 import pipeline
     27 
     28 
     29 class Return(pipeline.Pipeline):
     30   """Causes calling generator to have the supplied default output value.
     31 
     32   Only works when yielded last!
     33   """
     34 
     35   def run(self, return_value=None):
     36     return return_value
     37 
     38 
     39 class Ignore(pipeline.Pipeline):
     40   """Mark the supplied parameters as unused outputs of sibling pipelines."""
     41 
     42   def run(self, *args):
     43     pass
     44 
     45 
     46 class Dict(pipeline.Pipeline):
     47   """Returns a dictionary with the supplied keyword arguments."""
     48 
     49   def run(self, **kwargs):
     50     return dict(**kwargs)
     51 
     52 
     53 class List(pipeline.Pipeline):
     54   """Returns a list with the supplied positional arguments."""
     55 
     56   def run(self, *args):
     57     return list(args)
     58 
     59 
     60 class AbortIfTrue(pipeline.Pipeline):
     61   """Aborts the entire pipeline if the supplied argument is True."""
     62 
     63   def run(self, value, message=''):
     64     if value:
     65       raise pipeline.Abort(message)
     66 
     67 
     68 class All(pipeline.Pipeline):
     69   """Returns True if all of the values are True.
     70 
     71   Returns False if there are no values present.
     72   """
     73 
     74   def run(self, *args):
     75     if len(args) == 0:
     76       return False
     77     for value in args:
     78       if not value:
     79         return False
     80     return True
     81 
     82 
     83 class Any(pipeline.Pipeline):
     84   """Returns True if any of the values are True."""
     85 
     86   def run(self, *args):
     87     for value in args:
     88       if value:
     89         return True
     90     return False
     91 
     92 
     93 class Complement(pipeline.Pipeline):
     94   """Returns the boolean complement of the values."""
     95 
     96   def run(self, *args):
     97     if len(args) == 1:
     98       return not args[0]
     99     else:
    100       return [not value for value in args]
    101 
    102 
    103 class Max(pipeline.Pipeline):
    104   """Returns the max value."""
    105 
    106   def __init__(self, *args):
    107     if len(args) == 0:
    108       raise TypeError('max expected at least 1 argument, got 0')
    109     pipeline.Pipeline.__init__(self, *args)
    110 
    111   def run(self, *args):
    112     return max(args)
    113 
    114 
    115 class Min(pipeline.Pipeline):
    116   """Returns the min value."""
    117 
    118   def __init__(self, *args):
    119     if len(args) == 0:
    120       raise TypeError('min expected at least 1 argument, got 0')
    121     pipeline.Pipeline.__init__(self, *args)
    122 
    123   def run(self, *args):
    124     return min(args)
    125 
    126 
    127 class Sum(pipeline.Pipeline):
    128   """Returns the sum of all values."""
    129 
    130   def __init__(self, *args):
    131     if len(args) == 0:
    132       raise TypeError('sum expected at least 1 argument, got 0')
    133     pipeline.Pipeline.__init__(self, *args)
    134 
    135   def run(self, *args):
    136     return sum(args)
    137 
    138 
    139 class Multiply(pipeline.Pipeline):
    140   """Returns all values multiplied together."""
    141 
    142   def __init__(self, *args):
    143     if len(args) == 0:
    144       raise TypeError('multiply expected at least 1 argument, got 0')
    145     pipeline.Pipeline.__init__(self, *args)
    146 
    147   def run(self, *args):
    148     total = 1
    149     for value in args:
    150       total *= value
    151     return total
    152 
    153 
    154 class Negate(pipeline.Pipeline):
    155   """Returns each value supplied multiplied by -1."""
    156 
    157   def __init__(self, *args):
    158     if len(args) == 0:
    159       raise TypeError('negate expected at least 1 argument, got 0')
    160     pipeline.Pipeline.__init__(self, *args)
    161 
    162   def run(self, *args):
    163     if len(args) == 1:
    164       return -1 * args[0]
    165     else:
    166       return [-1 * x for x in args]
    167 
    168 
    169 class Extend(pipeline.Pipeline):
    170   """Combine together lists and tuples into a single list.
    171 
    172   Args:
    173     *args: One or more lists or tuples.
    174 
    175   Returns:
    176     A single list of all supplied lists merged together in order. Length of
    177     the output list is the sum of the lengths of all input lists.
    178   """
    179 
    180   def run(self, *args):
    181     combined = []
    182     for value in args:
    183       combined.extend(value)
    184     return combined
    185 
    186 
    187 class Append(pipeline.Pipeline):
    188   """Combine together values into a list.
    189 
    190   Args:
    191     *args: One or more values.
    192 
    193   Returns:
    194     A single list of all values appended to the same list. Length of the
    195     output list matches the length of the input list.
    196   """
    197 
    198   def run(self, *args):
    199     combined = []
    200     for value in args:
    201       combined.append(value)
    202     return combined
    203 
    204 
    205 class Concat(pipeline.Pipeline):
    206   """Concatenates strings together using a join character.
    207 
    208   Args:
    209     *args: One or more strings.
    210     separator: Keyword argument only; the string to use to join the args.
    211 
    212   Returns:
    213     The joined string.
    214   """
    215 
    216   def run(self, *args, **kwargs):
    217     separator = kwargs.get('separator', '')
    218     return separator.join(args)
    219 
    220 
    221 class Union(pipeline.Pipeline):
    222   """Like Extend, but the resulting list has all unique elements."""
    223 
    224   def run(self, *args):
    225     combined = set()
    226     for value in args:
    227       combined.update(value)
    228     return list(combined)
    229 
    230 
    231 class Intersection(pipeline.Pipeline):
    232   """Returns only those items belonging to all of the supplied lists.
    233 
    234   Each argument must be a list. No individual items are permitted.
    235   """
    236 
    237   def run(self, *args):
    238     if not args:
    239       return []
    240     result = set(args[0])
    241     for value in args[1:]:
    242       result.intersection_update(set(value))
    243     return list(result)
    244 
    245 
    246 class Uniquify(pipeline.Pipeline):
    247   """Returns a list of unique items from the list of items supplied."""
    248 
    249   def run(self, *args):
    250     return list(set(args))
    251 
    252 
    253 class Format(pipeline.Pipeline):
    254   """Formats a string with formatting arguments."""
    255 
    256   @classmethod
    257   def dict(cls, message, **format_dict):
    258     """Formats a dictionary.
    259 
    260     Args:
    261       message: The format string.
    262       **format_dict: Keyword arguments of format parameters to use for
    263         formatting the string.
    264 
    265     Returns:
    266       The formatted string.
    267     """
    268     return cls('dict', message, format_dict)
    269 
    270   @classmethod
    271   def tuple(cls, message, *params):
    272     """Formats a tuple.
    273 
    274     Args:
    275       message: The format string.
    276       *params: The formatting positional parameters.
    277 
    278     Returns:
    279       The formatted string.
    280     """
    281     return cls('tuple', message, *params)
    282 
    283   def run(self, format_type, message, *params):
    284     if format_type == 'dict':
    285       return message % params[0]
    286     elif format_type == 'tuple':
    287       return message % params
    288     else:
    289       raise pipeline.Abort('Invalid format type: %s' % format_type)
    290 
    291 
    292 class Log(pipeline.Pipeline):
    293   """Logs a message, just like the Python logging module."""
    294 
    295   # TODO: Hack the call stack of the logging message to use the file and line
    296   # context from when it was first scheduled, not when it actually ran.
    297 
    298   _log_method = logging.log
    299 
    300   @classmethod
    301   def log(cls, *args, **kwargs):
    302     return Log(*args, **kwargs)
    303 
    304   @classmethod
    305   def debug(cls, *args, **kwargs):
    306     return Log(logging.DEBUG, *args, **kwargs)
    307 
    308   @classmethod
    309   def info(cls, *args, **kwargs):
    310     return Log(logging.INFO, *args, **kwargs)
    311 
    312   @classmethod
    313   def warning(cls, *args, **kwargs):
    314     return Log(logging.WARNING, *args, **kwargs)
    315 
    316   @classmethod
    317   def error(cls, *args, **kwargs):
    318     return Log(logging.ERROR, *args, **kwargs)
    319 
    320   @classmethod
    321   def critical(cls, *args, **kwargs):
    322     return Log(logging.CRITICAL, *args, **kwargs)
    323 
    324   def run(self, level, message, *args):
    325     Log._log_method.im_func(level, message, *args)
    326 
    327 
    328 class Delay(pipeline.Pipeline):
    329   """Waits N seconds before completion.
    330 
    331   Args:
    332     seconds: Keyword argument only. The number of seconds to wait. Will be
    333       rounded to the nearest whole second.
    334 
    335   Returns:
    336     How long this delay waited.
    337   """
    338 
    339   async = True
    340 
    341   def __init__(self, *args, **kwargs):
    342     if len(args) != 0 or len(kwargs) != 1 or kwargs.keys()[0] != 'seconds':
    343       raise TypeError('Delay takes one keyword parameter, "seconds".')
    344     pipeline.Pipeline.__init__(self, *args, **kwargs)
    345 
    346   def run(self, seconds=None):
    347     task = self.get_callback_task(
    348         countdown=seconds,
    349         name='ae-pipeline-delay-' + self.pipeline_id)
    350     try:
    351       task.add(self.queue_name)
    352     except (taskqueue.TombstonedTaskError, taskqueue.TaskAlreadyExistsError):
    353       pass
    354 
    355   def run_test(self, seconds=None):
    356     logging.debug('Delay pipeline pretending to sleep %0.2f seconds', seconds)
    357     self.complete(seconds)
    358 
    359   def callback(self):
    360     self.complete(self.kwargs['seconds'])
    361 
    362 
    363 class EmailToContinue(pipeline.Pipeline):
    364   """Emails someone asking if the pipeline should continue.
    365 
    366   When the user clicks "Approve", the pipeline will return True. When the
    367   user clicks "Disapprove", the pipeline will return False.
    368 
    369   Supply normal mail.EmailMessage parameters, plus two additional parameters:
    370 
    371     approve_html: HTML to show to the user after clicking approve.
    372     disapprove_html: HTML to show to the user after clicking disapprove.
    373 
    374   Additionally, the 'body' and 'html' keyword arguments are treated as Python
    375   dictionary templates with the keywords 'approval_url' and 'disapprove_url',
    376   which let you place those links in your email however you want (as long
    377   as clicking the links results in a GET request). The approve/disapprove URLs
    378   are relative paths (e.g., '/relative/foo/bar'), so you must connect them to
    379   whatever hostname you actually want users to access the callback on with an
    380   absolute URL.
    381 
    382   A random token is used to secure the asynchronous action.
    383   """
    384 
    385   async = True
    386   public_callbacks = True
    387 
    388   _email_message = mail.EmailMessage
    389 
    390   def __init__(self, **kwargs):
    391     if 'random_token' not in kwargs:
    392       kwargs['random_token'] = '%x' % random.randint(0, 2**64)
    393     if 'approve_html' not in kwargs:
    394       kwargs['approve_html'] = '<h1>Approved!</h1>'
    395     if 'disapprove_html' not in kwargs:
    396       kwargs['disapprove_html'] = '<h1>Not Approved!</h1>'
    397     pipeline.Pipeline.__init__(self, **kwargs)
    398 
    399   def run(self, **kwargs):
    400     random_token = kwargs.pop('random_token')
    401     kwargs.pop('approve_html', '')
    402     kwargs.pop('disapprove_html', '')
    403 
    404     approve_url = self.get_callback_url(
    405         random_token=random_token, choice='approve')
    406     disapprove_url = self.get_callback_url(
    407         random_token=random_token, choice='disapprove')
    408 
    409     mail_args = kwargs.copy()
    410     mail_args['body'] = mail_args['body'] % {
    411         'approve_url': approve_url,
    412         'disapprove_url': disapprove_url,
    413     }
    414     if 'html' in mail_args:
    415       mail_args['html'] = mail_args['html'] % {
    416         'approve_url': cgi.escape(approve_url),
    417         'disapprove_url': cgi.escape(disapprove_url),
    418       }
    419     EmailToContinue._email_message.im_func(**mail_args).send()
    420 
    421   def run_test(self, **kwargs):
    422     self.run(**kwargs)
    423     self.complete(True)
    424 
    425   def callback(self, random_token=None, choice=None):
    426     if random_token != self.kwargs['random_token']:
    427       return (403, 'text/html', '<h1>Invalid security token.</h1>')
    428 
    429     if choice == 'approve':
    430       self.complete(True)
    431       return (200, 'text/html', self.kwargs['approve_html'])
    432     elif choice == 'disapprove':
    433       self.complete(False)
    434       return (200, 'text/html', self.kwargs['disapprove_html'])
    435     else:
    436       return (400, 'text/html', '<h1>Invalid "choice" value.</h1>')
    437