Home | History | Annotate | Download | only in cloudstorage
      1 # Copyright 2013 Google Inc. All Rights Reserved.
      2 #
      3 # Licensed under the Apache License, Version 2.0 (the "License");
      4 # you may not use this file except in compliance with the License.
      5 # You may obtain a copy of the License at
      6 #
      7 #    http://www.apache.org/licenses/LICENSE-2.0
      8 #
      9 # Unless required by applicable law or agreed to in writing,
     10 # software distributed under the License is distributed on an
     11 # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
     12 # either express or implied. See the License for the specific
     13 # language governing permissions and limitations under the License.
     14 
     15 """Util functions and classes for cloudstorage_api."""
     16 
     17 
     18 
# Public names exported by a star-import of this module.
__all__ = ['set_default_retry_params',
           'RetryParams',
          ]
     22 
     23 import copy
     24 import httplib
     25 import logging
     26 import math
     27 import os
     28 import threading
     29 import time
     30 import urllib
     31 
     32 
     33 try:
     34   from google.appengine.api import app_identity
     35   from google.appengine.api import urlfetch
     36   from google.appengine.datastore import datastore_rpc
     37   from google.appengine.ext import ndb
     38   from google.appengine.ext.ndb import eventloop
     39   from google.appengine.ext.ndb import tasklets
     40   from google.appengine.ext.ndb import utils
     41   from google.appengine import runtime
     42   from google.appengine.runtime import apiproxy_errors
     43 except ImportError:
     44   from google.appengine.api import app_identity
     45   from google.appengine.api import urlfetch
     46   from google.appengine.datastore import datastore_rpc
     47   from google.appengine import runtime
     48   from google.appengine.runtime import apiproxy_errors
     49   from google.appengine.ext import ndb
     50   from google.appengine.ext.ndb import eventloop
     51   from google.appengine.ext.ndb import tasklets
     52   from google.appengine.ext.ndb import utils
     53 
     54 
# Exception classes that _RetryWrapper treats as retriable unless the caller
# supplies its own tuple.
_RETRIABLE_EXCEPTIONS = (urlfetch.DownloadError,
                         apiproxy_errors.Error,
                         app_identity.InternalError,
                         app_identity.BackendDeadlineExceeded)

# Per-thread holder for the default RetryParams.
_thread_local_settings = threading.local()
# NOTE: attributes of a threading.local are per-thread, so this assignment
# only initialises the importing thread. _get_default_retry_params() reads the
# attribute with getattr(..., None) so other threads see the same "unset"
# state.
_thread_local_settings.default_retry_params = None
     62 
     63 
     64 def set_default_retry_params(retry_params):
     65   """Set a default RetryParams for current thread current request."""
     66   _thread_local_settings.default_retry_params = copy.copy(retry_params)
     67 
     68 
     69 def _get_default_retry_params():
     70   """Get default RetryParams for current request and current thread.
     71 
     72   Returns:
     73     A new instance of the default RetryParams.
     74   """
     75   default = getattr(_thread_local_settings, 'default_retry_params', None)
     76   if default is None or not default.belong_to_current_request():
     77     return RetryParams()
     78   else:
     79     return copy.copy(default)
     80 
     81 
     82 def _quote_filename(filename):
     83   """Quotes filename to use as a valid URI path.
     84 
     85   Args:
     86     filename: user provided filename. /bucket/filename.
     87 
     88   Returns:
     89     The filename properly quoted to use as URI's path component.
     90   """
     91   return urllib.quote(filename)
     92 
     93 
     94 def _unquote_filename(filename):
     95   """Unquotes a valid URI path back to its filename.
     96 
     97   This is the opposite of _quote_filename.
     98 
     99   Args:
    100     filename: a quoted filename. /bucket/some%20filename.
    101 
    102   Returns:
    103     The filename unquoted.
    104   """
    105   return urllib.unquote(filename)
    106 
    107 
    108 def _should_retry(resp):
    109   """Given a urlfetch response, decide whether to retry that request."""
    110   return (resp.status_code == httplib.REQUEST_TIMEOUT or
    111           (resp.status_code >= 500 and
    112            resp.status_code < 600))
    113 
    114 
    115 class _RetryWrapper(object):
    116   """A wrapper that wraps retry logic around any tasklet."""
    117 
    118   def __init__(self,
    119                retry_params,
    120                retriable_exceptions=_RETRIABLE_EXCEPTIONS,
    121                should_retry=lambda r: False):
    122     """Init.
    123 
    124     Args:
    125       retry_params: an RetryParams instance.
    126       retriable_exceptions: a list of exception classes that are retriable.
    127       should_retry: a function that takes a result from the tasklet and returns
    128         a boolean. True if the result should be retried.
    129     """
    130     self.retry_params = retry_params
    131     self.retriable_exceptions = retriable_exceptions
    132     self.should_retry = should_retry
    133 
    134   @ndb.tasklet
    135   def run(self, tasklet, **kwds):
    136     """Run a tasklet with retry.
    137 
    138     The retry should be transparent to the caller: if no results
    139     are successful, the exception or result from the last retry is returned
    140     to the caller.
    141 
    142     Args:
    143       tasklet: the tasklet to run.
    144       **kwds: keywords arguments to run the tasklet.
    145 
    146     Raises:
    147       The exception from running the tasklet.
    148 
    149     Returns:
    150       The result from running the tasklet.
    151     """
    152     start_time = time.time()
    153     n = 1
    154 
    155     while True:
    156       e = None
    157       result = None
    158       got_result = False
    159 
    160       try:
    161         result = yield tasklet(**kwds)
    162         got_result = True
    163         if not self.should_retry(result):
    164           raise ndb.Return(result)
    165       except runtime.DeadlineExceededError:
    166         logging.debug(
    167             'Tasklet has exceeded request deadline after %s seconds total',
    168             time.time() - start_time)
    169         raise
    170       except self.retriable_exceptions, e:
    171         pass
    172 
    173       if n == 1:
    174         logging.debug('Tasklet is %r', tasklet)
    175 
    176       delay = self.retry_params.delay(n, start_time)
    177 
    178       if delay <= 0:
    179         logging.debug(
    180             'Tasklet failed after %s attempts and %s seconds in total',
    181             n, time.time() - start_time)
    182         if got_result:
    183           raise ndb.Return(result)
    184         elif e is not None:
    185           raise e
    186         else:
    187           assert False, 'Should never reach here.'
    188 
    189       if got_result:
    190         logging.debug(
    191             'Got result %r from tasklet.', result)
    192       else:
    193         logging.debug(
    194             'Got exception "%r" from tasklet.', e)
    195       logging.debug('Retry in %s seconds.', delay)
    196       n += 1
    197       yield tasklets.sleep(delay)
    198 
    199 
    200 class RetryParams(object):
    201   """Retry configuration parameters."""
    202 
    203   _DEFAULT_USER_AGENT = 'App Engine Python GCS Client'
    204 
    205   @datastore_rpc._positional(1)
    206   def __init__(self,
    207                backoff_factor=2.0,
    208                initial_delay=0.1,
    209                max_delay=10.0,
    210                min_retries=3,
    211                max_retries=6,
    212                max_retry_period=30.0,
    213                urlfetch_timeout=None,
    214                save_access_token=False,
    215                _user_agent=None):
    216     """Init.
    217 
    218     This object is unique per request per thread.
    219 
    220     Library will retry according to this setting when App Engine Server
    221     can't call urlfetch, urlfetch timed out, or urlfetch got a 408 or
    222     500-600 response.
    223 
    224     Args:
    225       backoff_factor: exponential backoff multiplier.
    226       initial_delay: seconds to delay for the first retry.
    227       max_delay: max seconds to delay for every retry.
    228       min_retries: min number of times to retry. This value is automatically
    229         capped by max_retries.
    230       max_retries: max number of times to retry. Set this to 0 for no retry.
    231       max_retry_period: max total seconds spent on retry. Retry stops when
    232         this period passed AND min_retries has been attempted.
    233       urlfetch_timeout: timeout for urlfetch in seconds. Could be None,
    234         in which case the value will be chosen by urlfetch module.
    235       save_access_token: persist access token to datastore to avoid
    236         excessive usage of GetAccessToken API. Usually the token is cached
    237         in process and in memcache. In some cases, memcache isn't very
    238         reliable.
    239       _user_agent: The user agent string that you want to use in your requests.
    240     """
    241     self.backoff_factor = self._check('backoff_factor', backoff_factor)
    242     self.initial_delay = self._check('initial_delay', initial_delay)
    243     self.max_delay = self._check('max_delay', max_delay)
    244     self.max_retry_period = self._check('max_retry_period', max_retry_period)
    245     self.max_retries = self._check('max_retries', max_retries, True, int)
    246     self.min_retries = self._check('min_retries', min_retries, True, int)
    247     if self.min_retries > self.max_retries:
    248       self.min_retries = self.max_retries
    249 
    250     self.urlfetch_timeout = None
    251     if urlfetch_timeout is not None:
    252       self.urlfetch_timeout = self._check('urlfetch_timeout', urlfetch_timeout)
    253     self.save_access_token = self._check('save_access_token', save_access_token,
    254                                          True, bool)
    255     self._user_agent = _user_agent or self._DEFAULT_USER_AGENT
    256 
    257     self._request_id = os.getenv('REQUEST_LOG_ID')
    258 
    259   def __eq__(self, other):
    260     if not isinstance(other, self.__class__):
    261       return False
    262     return self.__dict__ == other.__dict__
    263 
    264   def __ne__(self, other):
    265     return not self.__eq__(other)
    266 
    267   @classmethod
    268   def _check(cls, name, val, can_be_zero=False, val_type=float):
    269     """Check init arguments.
    270 
    271     Args:
    272       name: name of the argument. For logging purpose.
    273       val: value. Value has to be non negative number.
    274       can_be_zero: whether value can be zero.
    275       val_type: Python type of the value.
    276 
    277     Returns:
    278       The value.
    279 
    280     Raises:
    281       ValueError: when invalid value is passed in.
    282       TypeError: when invalid value type is passed in.
    283     """
    284     valid_types = [val_type]
    285     if val_type is float:
    286       valid_types.append(int)
    287 
    288     if type(val) not in valid_types:
    289       raise TypeError(
    290           'Expect type %s for parameter %s' % (val_type.__name__, name))
    291     if val < 0:
    292       raise ValueError(
    293           'Value for parameter %s has to be greater than 0' % name)
    294     if not can_be_zero and val == 0:
    295       raise ValueError(
    296           'Value for parameter %s can not be 0' % name)
    297     return val
    298 
    299   def belong_to_current_request(self):
    300     return os.getenv('REQUEST_LOG_ID') == self._request_id
    301 
    302   def delay(self, n, start_time):
    303     """Calculate delay before the next retry.
    304 
    305     Args:
    306       n: the number of current attempt. The first attempt should be 1.
    307       start_time: the time when retry started in unix time.
    308 
    309     Returns:
    310       Number of seconds to wait before next retry. -1 if retry should give up.
    311     """
    312     if (n > self.max_retries or
    313         (n > self.min_retries and
    314          time.time() - start_time > self.max_retry_period)):
    315       return -1
    316     return min(
    317         math.pow(self.backoff_factor, n-1) * self.initial_delay,
    318         self.max_delay)
    319 
    320 
    321 def _run_until_rpc():
    322   """Eagerly evaluate tasklets until it is blocking on some RPC.
    323 
    324   Usually ndb eventloop el isn't run until some code calls future.get_result().
    325 
    326   When an async tasklet is called, the tasklet wrapper evaluates the tasklet
    327   code into a generator, enqueues a callback _help_tasklet_along onto
    328   the el.current queue, and returns a future.
    329 
    330   _help_tasklet_along, when called by the el, will
    331   get one yielded value from the generator. If the value if another future,
    332   set up a callback _on_future_complete to invoke _help_tasklet_along
    333   when the dependent future fulfills. If the value if a RPC, set up a
    334   callback _on_rpc_complete to invoke _help_tasklet_along when the RPC fulfills.
    335   Thus _help_tasklet_along drills down
    336   the chain of futures until some future is blocked by RPC. El runs
    337   all callbacks and constantly check pending RPC status.
    338   """
    339   el = eventloop.get_event_loop()
    340   while el.current:
    341     el.run0()
    342 
    343 
    344 def _eager_tasklet(tasklet):
    345   """Decorator to turn tasklet to run eagerly."""
    346 
    347   @utils.wrapping(tasklet)
    348   def eager_wrapper(*args, **kwds):
    349     fut = tasklet(*args, **kwds)
    350     _run_until_rpc()
    351     return fut
    352 
    353   return eager_wrapper
    354