# Source listing: cloudstorage (Home | History | Annotate | Download | only in cloudstorage)
      1 # Copyright 2013 Google Inc. All Rights Reserved.
      2 #
      3 # Licensed under the Apache License, Version 2.0 (the "License");
      4 # you may not use this file except in compliance with the License.
      5 # You may obtain a copy of the License at
      6 #
      7 #    http://www.apache.org/licenses/LICENSE-2.0
      8 #
      9 # Unless required by applicable law or agreed to in writing,
     10 # software distributed under the License is distributed on an
     11 # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
     12 # either express or implied. See the License for the specific
     13 # language governing permissions and limitations under the License.
     14 
     15 """Util functions and classes for cloudstorage_api."""
     16 
     17 
     18 
# Names exported on star-import; the module's intended public API.
__all__ = ['set_default_retry_params',
           'RetryParams',
          ]
     22 
     23 import copy
     24 import httplib
     25 import logging
     26 import math
     27 import os
     28 import threading
     29 import time
     30 import urllib
     31 
     32 
     33 try:
     34   from google.appengine.api import urlfetch
     35   from google.appengine.datastore import datastore_rpc
     36   from google.appengine.ext.ndb import eventloop
     37   from google.appengine.ext.ndb import utils
     38   from google.appengine import runtime
     39   from google.appengine.runtime import apiproxy_errors
     40 except ImportError:
     41   from google.appengine.api import urlfetch
     42   from google.appengine.datastore import datastore_rpc
     43   from google.appengine import runtime
     44   from google.appengine.runtime import apiproxy_errors
     45   from google.appengine.ext.ndb import eventloop
     46   from google.appengine.ext.ndb import utils
     47 
     48 
# Exception types treated as transient by _retry_fetch and therefore retried.
_RETRIABLE_EXCEPTIONS = (urlfetch.DownloadError,
                         apiproxy_errors.Error)

# Per-thread storage for the default RetryParams installed by
# set_default_retry_params() and read back by _get_default_retry_params().
_thread_local_settings = threading.local()
_thread_local_settings.default_retry_params = None
     54 
     55 
     56 def set_default_retry_params(retry_params):
     57   """Set a default RetryParams for current thread current request."""
     58   _thread_local_settings.default_retry_params = copy.copy(retry_params)
     59 
     60 
     61 def _get_default_retry_params():
     62   """Get default RetryParams for current request and current thread.
     63 
     64   Returns:
     65     A new instance of the default RetryParams.
     66   """
     67   default = getattr(_thread_local_settings, 'default_retry_params', None)
     68   if default is None or not default.belong_to_current_request():
     69     return RetryParams()
     70   else:
     71     return copy.copy(default)
     72 
     73 
     74 def _quote_filename(filename):
     75   """Quotes filename to use as a valid URI path.
     76 
     77   Args:
     78     filename: user provided filename. /bucket/filename.
     79 
     80   Returns:
     81     The filename properly quoted to use as URI's path component.
     82   """
     83   return urllib.quote(filename)
     84 
     85 
     86 def _unquote_filename(filename):
     87   """Unquotes a valid URI path back to its filename.
     88 
     89   This is the opposite of _quote_filename.
     90 
     91   Args:
     92     filename: a quoted filename. /bucket/some%20filename.
     93 
     94   Returns:
     95     The filename unquoted.
     96   """
     97   return urllib.unquote(filename)
     98 
     99 
    100 def _should_retry(resp):
    101   """Given a urlfetch response, decide whether to retry that request."""
    102   return (resp.status_code == httplib.REQUEST_TIMEOUT or
    103           (resp.status_code >= 500 and
    104            resp.status_code < 600))
    105 
    106 
    107 class RetryParams(object):
    108   """Retry configuration parameters."""
    109 
    110   @datastore_rpc._positional(1)
    111   def __init__(self,
    112                backoff_factor=2.0,
    113                initial_delay=0.1,
    114                max_delay=10.0,
    115                min_retries=2,
    116                max_retries=5,
    117                max_retry_period=30.0,
    118                urlfetch_timeout=None,
    119                save_access_token=False):
    120     """Init.
    121 
    122     This object is unique per request per thread.
    123 
    124     Library will retry according to this setting when App Engine Server
    125     can't call urlfetch, urlfetch timed out, or urlfetch got a 408 or
    126     500-600 response.
    127 
    128     Args:
    129       backoff_factor: exponential backoff multiplier.
    130       initial_delay: seconds to delay for the first retry.
    131       max_delay: max seconds to delay for every retry.
    132       min_retries: min number of times to retry. This value is automatically
    133         capped by max_retries.
    134       max_retries: max number of times to retry. Set this to 0 for no retry.
    135       max_retry_period: max total seconds spent on retry. Retry stops when
    136         this period passed AND min_retries has been attempted.
    137       urlfetch_timeout: timeout for urlfetch in seconds. Could be None,
    138         in which case the value will be chosen by urlfetch module.
    139       save_access_token: persist access token to datastore to avoid
    140         excessive usage of GetAccessToken API. Usually the token is cached
    141         in process and in memcache. In some cases, memcache isn't very
    142         reliable.
    143     """
    144     self.backoff_factor = self._check('backoff_factor', backoff_factor)
    145     self.initial_delay = self._check('initial_delay', initial_delay)
    146     self.max_delay = self._check('max_delay', max_delay)
    147     self.max_retry_period = self._check('max_retry_period', max_retry_period)
    148     self.max_retries = self._check('max_retries', max_retries, True, int)
    149     self.min_retries = self._check('min_retries', min_retries, True, int)
    150     if self.min_retries > self.max_retries:
    151       self.min_retries = self.max_retries
    152 
    153     self.urlfetch_timeout = None
    154     if urlfetch_timeout is not None:
    155       self.urlfetch_timeout = self._check('urlfetch_timeout', urlfetch_timeout)
    156     self.save_access_token = self._check('save_access_token', save_access_token,
    157                                          True, bool)
    158 
    159     self._request_id = os.getenv('REQUEST_LOG_ID')
    160 
    161   def __eq__(self, other):
    162     if not isinstance(other, self.__class__):
    163       return False
    164     return self.__dict__ == other.__dict__
    165 
    166   def __ne__(self, other):
    167     return not self.__eq__(other)
    168 
    169   @classmethod
    170   def _check(cls, name, val, can_be_zero=False, val_type=float):
    171     """Check init arguments.
    172 
    173     Args:
    174       name: name of the argument. For logging purpose.
    175       val: value. Value has to be non negative number.
    176       can_be_zero: whether value can be zero.
    177       val_type: Python type of the value.
    178 
    179     Returns:
    180       The value.
    181 
    182     Raises:
    183       ValueError: when invalid value is passed in.
    184       TypeError: when invalid value type is passed in.
    185     """
    186     valid_types = [val_type]
    187     if val_type is float:
    188       valid_types.append(int)
    189 
    190     if type(val) not in valid_types:
    191       raise TypeError(
    192           'Expect type %s for parameter %s' % (val_type.__name__, name))
    193     if val < 0:
    194       raise ValueError(
    195           'Value for parameter %s has to be greater than 0' % name)
    196     if not can_be_zero and val == 0:
    197       raise ValueError(
    198           'Value for parameter %s can not be 0' % name)
    199     return val
    200 
    201   def belong_to_current_request(self):
    202     return os.getenv('REQUEST_LOG_ID') == self._request_id
    203 
    204   def delay(self, n, start_time):
    205     """Calculate delay before the next retry.
    206 
    207     Args:
    208       n: the number of current attempt. The first attempt should be 1.
    209       start_time: the time when retry started in unix time.
    210 
    211     Returns:
    212       Number of seconds to wait before next retry. -1 if retry should give up.
    213     """
    214     if (n > self.max_retries or
    215         (n > self.min_retries and
    216          time.time() - start_time > self.max_retry_period)):
    217       return -1
    218     return min(
    219         math.pow(self.backoff_factor, n-1) * self.initial_delay,
    220         self.max_delay)
    221 
    222 
def _retry_fetch(url, retry_params, **kwds):
  """A blocking fetch function similar to urlfetch.fetch.

  This function should be used when a urlfetch has timed out or the response
  shows http request timeout. This function will put current thread to
  sleep between retry backoffs.

  Args:
    url: url to fetch.
    retry_params: an instance of RetryParams.
    **kwds: keyword arguments for urlfetch. If deadline is specified in kwds,
      it precedes the one in RetryParams. If none is specified, it's up to
      urlfetch to use its own default.

  Returns:
    A urlfetch response from the last retry. None if no retry was attempted.

  Raises:
    Whatever exception encountered during the last retry.
  """
  n = 1
  start_time = time.time()
  delay = retry_params.delay(n, start_time)
  if delay <= 0:
    # Retrying is disabled or the budget is already exhausted: return None.
    return

  logging.info('Will retry request to %s.', url)
  while delay > 0:
    resp = None
    try:
      logging.info('Retry in %s seconds.', delay)
      time.sleep(delay)
      resp = urlfetch.fetch(url, **kwds)
    except runtime.DeadlineExceededError:
      # The overall request deadline is about to pass; further retries are
      # pointless, so propagate immediately.
      logging.info(
          'Urlfetch retry %s will exceed request deadline '
          'after %s seconds total', n, time.time() - start_time)
      raise
    except _RETRIABLE_EXCEPTIONS, e:
      # Transient urlfetch/apiproxy failure: fall through and maybe retry.
      # Python 2: `e` stays bound after the except block and is logged
      # below whenever resp is None.
      pass

    n += 1
    delay = retry_params.delay(n, start_time)
    if resp and not _should_retry(resp):
      # Got a definitive (non-retriable) response; stop retrying.
      break
    elif resp:
      logging.info(
          'Got status %s from GCS.', resp.status_code)
    else:
      logging.info(
          'Got exception "%r" while contacting GCS.', e)

  if resp:
    # NOTE: this may still carry a retriable status if the retry budget ran
    # out; the caller is expected to inspect the response.
    return resp

  logging.info('Urlfetch failed after %s retries and %s seconds in total.',
               n - 1, time.time() - start_time)
  # Bare `raise` re-raises the exception caught in the last loop iteration
  # (resp is falsy here only when an exception was caught).
  raise
    281 
    282 
    283 def _run_until_rpc():
    284   """Eagerly evaluate tasklets until it is blocking on some RPC.
    285 
    286   Usually ndb eventloop el isn't run until some code calls future.get_result().
    287 
    288   When an async tasklet is called, the tasklet wrapper evaluates the tasklet
    289   code into a generator, enqueues a callback _help_tasklet_along onto
    290   the el.current queue, and returns a future.
    291 
    292   _help_tasklet_along, when called by the el, will
    293   get one yielded value from the generator. If the value if another future,
    294   set up a callback _on_future_complete to invoke _help_tasklet_along
    295   when the dependent future fulfills. If the value if a RPC, set up a
    296   callback _on_rpc_complete to invoke _help_tasklet_along when the RPC fulfills.
    297   Thus _help_tasklet_along drills down
    298   the chain of futures until some future is blocked by RPC. El runs
    299   all callbacks and constantly check pending RPC status.
    300   """
    301   el = eventloop.get_event_loop()
    302   while el.current:
    303     el.run0()
    304 
    305 
    306 def _eager_tasklet(tasklet):
    307   """Decorator to turn tasklet to run eagerly."""
    308 
    309   @utils.wrapping(tasklet)
    310   def eager_wrapper(*args, **kwds):
    311     fut = tasklet(*args, **kwds)
    312     _run_until_rpc()
    313     return fut
    314 
    315   return eager_wrapper
    316