Home | History | Annotate | Download | only in quotas
      1 #!/usr/bin/env python
      2 #
      3 # Copyright 2010 Google Inc.
      4 #
      5 # Licensed under the Apache License, Version 2.0 (the "License");
      6 # you may not use this file except in compliance with the License.
      7 # You may obtain a copy of the License at
      8 #
      9 #     http://www.apache.org/licenses/LICENSE-2.0
     10 #
     11 # Unless required by applicable law or agreed to in writing, software
     12 # distributed under the License is distributed on an "AS IS" BASIS,
     13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     14 # See the License for the specific language governing permissions and
     15 # limitations under the License.
     16 #
     17 
     18 """Quota service definition and implementation.
     19 
     20 Contains message and service definitions for a simple quota service.  The
     21 service maintains a set of quotas for users that can be deducted from in
     22 a single transaction.  The requests to do this can be configured so that if
     23 one quota check fails, none of the quota changes will take effect.
     24 
     25 The service is configured using a QuotaConfig object and can be passed an
     26 existing quota state (useful for if the service quits unexpectedly and is
     27 being restored from checkpoint).  For this reason it is necessary to use
     28 a factory instead of the default constructor.  For example:
     29 
     30   quota_config = QuotaConfig(
     31       buckets = [ QuotaBucket('DISK', 1000000),
     32                   QuotaBucket('EMAILS', 100, refresh_every=24 * 60 * 60),
     33                 ])
     34   quota_state = {}
     35   quota_service = QuotaService.new_factory(quota_config, quota_state)
     36 
     37 Every on-going request to the quota service shares the same configuration and
     38 state objects.
     39 
     40 Individual quota buckets can be specified to refresh to their original amounts
     41 at regular intervals.  These intervals are specified in seconds.  The example
     42 above specifies that the email quota is refreshed to 100 emails every day.
     43 
     44 It is up to the client using the quota service to respond correctly to the
     45 response of the quota service.  It does not try to raise an exception on
     46 dential.
     47 """
     48 
     49 import threading
     50 import time
     51 
     52 from protorpc import messages
     53 from protorpc import remote
     54 from protorpc import util
     55 
     56 
     57 class QuotaCheck(messages.Message):
     58   """Result of checking quota of a single bucket.
     59 
     60   Fields:
     61     name: Name of quota bucket to check.
     62     tokens: Number of tokens to check for quota or deduct.  A negative value
     63       can be used to credit quota buckets.
     64     mode: Quota check-mode.  See Mode enumeration class for more details.
     65   """
     66 
     67   class Mode(messages.Enum):
     68     """Mode for individual bucket quota check.
     69 
     70     Values:
     71       ALL: All tokens must be available for consumption or else quota check
     72         fails and all deductions/credits are ignored.
     73       SOME: At least some tokens must be available for consumption.  This check
     74         will only fail if the remaining tokens in the bucket are already at
     75         zero.
     76       CHECK_ALL: All tokens must be available in bucket or else quota check
     77         fails and all other deductions/credits are ignored.  This will not cause
     78         a deduction to occur for the indicated bucket.
     79       CHECK_ALL: At least some tokens must be available in bucket or else quota
     80         check fails and all other deductions/credits are ignored.  This will
     81         not cause a deduction to occur for the indicated bucket.
     82     """
     83     ALL = 1
     84     SOME = 2
     85     CHECK_ALL = 3
     86     CHECK_SOME = 4
     87 
     88   name = messages.StringField(1, required=True)
     89   tokens = messages.IntegerField(2, required=True)
     90   mode = messages.EnumField(Mode, 3, default=Mode.ALL)
     91 
     92 
     93 class QuotaRequest(messages.Message):
     94   """A request to check or deduct tokens from a users bucket.
     95 
     96   Fields:
     97     user: User to check or deduct quota for.
     98     quotas: Quotas to check or deduct against.
     99   """
    100 
    101   user = messages.StringField(1, required=True)
    102   quotas = messages.MessageField(QuotaCheck, 2, repeated=True)
    103   
    104 
    105 class CheckResult(messages.Message):
    106   """Quota check results.
    107 
    108   Fields:
    109     status: Status of quota check for bucket.  See Status enum for details.
    110     available: Number of actual tokens available or consumed.  Will be
    111       less than the number of requested tokens when bucket has fewer
    112       tokens than requested.
    113   """
    114 
    115   class Status(messages.Enum):
    116     """Status of check result.
    117 
    118     Values:
    119       OK: All requested tokens are available or were deducted.
    120       SOME: Some requested tokens are available or were deducted.  This will
    121         cause any deductions to fail if the request mode is ALL or CHECK_ALL.
    122       NONE: No tokens were available.  Quota check is considered to have failed.
    123     """
    124     OK = 1
    125     SOME = 2
    126     NONE = 3
    127 
    128   status = messages.EnumField(Status, 1, required=True)
    129   available = messages.IntegerField(2, required=True)
    130 
    131 
    132 class QuotaResponse(messages.Message):
    133   """ Response to QuotaRequest.
    134 
    135   Fields:
    136     all_status: Overall status of quota request.  If no quota tokens were
    137       available at all, this will be NONE.  If some tokens were available, even
    138       if some buckets had no tokens, this will be SOME.  If all tokens were
    139       available this will be OK.
    140     denied: If true, it means that some required quota check has failed.  Any
    141       deductions in the request will be ignored, even if those individual
    142       buckets had adequate tokens.
    143     results: Specific results of quota check for each requested bucket.  The
    144       names are not included as they will have a one to one correspondence with
    145       buckets indicated in the request.
    146   """
    147 
    148   all_status = messages.EnumField(CheckResult.Status, 1, required=True)
    149   denied = messages.BooleanField(2, required=True)
    150   results = messages.MessageField(CheckResult, 3, repeated=True)
    151 
    152 
    153 class QuotaConfig(messages.Message):
    154   """Quota configuration.
    155 
    156   Structure used for configuring quota server.  This message is not used
    157   directly in the service definition, but is used to configure the
    158   implementation.
    159 
    160   Fields:
    161     buckets: Individual bucket configurations.  Bucket configurations are
    162       specified per server and are configured for any user that is requested.
    163   """
    164 
    165   class Bucket(messages.Message):
    166     """Individual bucket configuration.
    167 
    168     Fields:
    169       name: Bucket name.
    170       initial_tokens: Number of tokens initially configured for this bucket.
    171       refresh_every: Number of seconds after which initial tokens are restored.
    172         If this value is None, tokens are never restored once used, unless
    173         credited by the application.
    174     """
    175 
    176     name = messages.StringField(1, required=True)
    177     initial_tokens = messages.IntegerField(2, required=True)
    178     refresh_every = messages.IntegerField(4)
    179 
    180   buckets = messages.MessageField(Bucket, 1, repeated=True)
    181 
    182 
    183 class QuotaStateRequest(messages.Message):
    184   """Request state of all quota buckets for a single user.
    185 
    186   Used for determining how many tokens remain in all the users quota buckets.
    187 
    188   Fields:
    189     user: The user to get buckets for.
    190   """
    191 
    192   user = messages.StringField(1, required=True)
    193 
    194 
    195 class BucketState(messages.Message):
    196   """State of an individual quota bucket.
    197 
    198   Fields:
    199     name: Name of bucket.
    200     remaining_tokens: Number of tokens that remain in bucket.
    201   """
    202 
    203   name = messages.StringField(1, required=True)
    204   remaining_tokens = messages.IntegerField(2, required=True)
    205 
    206 
    207 class QuotaStateResponse(messages.Message):
    208   """Response to QuotaStateRequest containing set of bucket states for user."""
    209 
    210   bucket_states = messages.MessageField(BucketState, 1, repeated=True)
    211 
    212 
    213 class QuotaState(object):
    214   """Quota state class, used by implementation of service.
    215 
    216   This class is responsible for managing all the bucket states for a user.
    217   Quota checks and deductions must be done in the context of a transaction.  If
    218   a transaction fails, it can be rolled back so that the values of the
    219   individual buckets are preserved, even if previous checks and deductions
    220   succeeded.
    221   """
    222 
    223   @util.positional(3)
    224   def __init__(self, state, buckets):
    225     """Constructor.
    226 
    227     Args:
    228       state: A dictionary that is used to contain the state, mapping buckets to
    229         tuples (remaining_tokens, next_refresh):
    230           remaining_tokens: Number of tokens remaining in the bucket.
    231           next_refresh: Time when bucket needs to be refilled with initial
    232             tokens.
    233       buckets: A dictionary that maps buckets to BucketConfig objects.
    234     """
    235     self.__state = state
    236     self.__buckets = buckets
    237 
    238     self.__lock = threading.Lock()  # Used at transaction commit time.
    239     self.__transaction = threading.local()
    240     self.__transaction.changes = None  # Dictionary bucket -> token deduction.
    241                                        # Can be negative indicating credit.
    242     self.__transaction.time = None     # Time at which transaction began.
    243 
    244   def in_transaction(self):
    245     return self.__transaction.changes is not None
    246 
    247   def begin_transaction(self):
    248     """Begin quota transaction."""
    249     assert not self.in_transaction()
    250     self.__transaction.changes = {}
    251     self.__transaction.time = int(time.time())
    252     self.__lock.acquire()
    253 
    254   def commit_transaction(self):
    255     """Commit deductions of quota transaction."""
    256     assert self.in_transaction()
    257     for name, change in self.__transaction.changes.iteritems():
    258       remaining_tokens, next_refresh = self.__state[name]
    259       new_tokens = max(0, remaining_tokens + change)
    260       self.__state[name] = new_tokens, next_refresh
    261     self.__transaction.changes = None
    262     self.__lock.release()
    263 
    264   def abort_transaction(self):
    265     """Roll back transaction ignoring quota changes."""
    266     assert self.in_transaction()
    267     self.__transaction.changes = None
    268     self.__lock.release()
    269 
    270   def get_remaining_tokens(self, name):
    271     """Get remaining tokens for a bucket.
    272 
    273     This function must be called within a transaction.
    274 
    275     Args:
    276       name: Bucket name.
    277 
    278     Returns:
    279       Integer of remaining tokens in users quota bucket.
    280     """
    281     assert self.in_transaction()
    282     changes = self.__transaction.changes.get(name, 0)
    283     remaining_tokens, next_refresh = self.__state.get(name, (None, None))
    284     if remaining_tokens is not None and (
    285       next_refresh is None or
    286       next_refresh >= self.__transaction.time):
    287       return remaining_tokens + changes
    288 
    289     bucket = self.__buckets.get(name, None)
    290     if bucket is None:
    291       return None
    292 
    293     if bucket.refresh_every:
    294       next_refresh = self.__transaction.time + bucket.refresh_every
    295     else:
    296       next_refresh = None
    297     self.__state[name] = bucket.initial_tokens, next_refresh
    298     return bucket.initial_tokens + changes
    299 
    300   def check_quota(self, name, tokens):
    301     """Check to determine if there are enough quotas in a bucket.
    302 
    303     Args:
    304       name: Name of bucket to check.
    305       tokens: Number of tokens to check for availability.  Can be negative.
    306 
    307     Returns:
    308       The count of requested tokens or if insufficient, the number of tokens
    309       available.
    310     """
    311     assert self.in_transaction()
    312     assert name not in self.__transaction.changes
    313     remaining_tokens = self.get_remaining_tokens(name)
    314     if remaining_tokens is None:
    315       return None
    316     return min(tokens, remaining_tokens)
    317 
    318   def deduct_quota(self, name, tokens):
    319     """Add a quota deduction to the transaction.
    320 
    321     Args:
    322       name: Name of bucket to deduct from.
    323       tokens: Number of tokens to request.
    324 
    325     Returns:
    326       The count of requested tokens or if insufficient, the number of tokens
    327       available that will be deducted upon transaction commit.
    328     """
    329     available_tokens = self.check_quota(name, tokens)
    330     if available_tokens is None:
    331       return None
    332     diff = max(0, tokens - available_tokens)
    333     self.__transaction.changes[name] = -(tokens - diff)
    334     return available_tokens
    335 
    336 
    337 class QuotaService(remote.Service):
    338   """Quota service."""
    339 
    340   __state_lock = threading.Lock()
    341 
    342   def __init__(self, config, states):
    343     """Constructor.
    344 
    345     NOTE: This constructor requires parameters which means a factory function
    346     must be used for instantiating the QuotaService.
    347 
    348     Args:
    349       config: An instance of QuotaConfig.
    350       states: Dictionary mapping user -> QuotaState objects.
    351     """
    352     self.__states = states
    353     self.__config = config
    354     self.__buckets = {}
    355     for bucket in self.__config.buckets:
    356       self.__buckets[bucket.name] = bucket
    357 
    358   def __get_state(self, user):
    359     """Get the state of a user.
    360 
    361     If no user state exists, this function will create one and store
    362     it for access later.
    363 
    364     user: User string to get quota state for.
    365     """
    366     state = self.__states.get(user, None)
    367     if state is None:
    368       state = QuotaState({}, self.__buckets)
    369       # TODO: Potentially problematic bottleneck.
    370       self.__state_lock.acquire()
    371       try:
    372         self.__states[user] = state
    373       finally:
    374         self.__state_lock.release()
    375     return state
    376 
    377   @remote.method(QuotaRequest, QuotaResponse)
    378   def check_quota(self, request):
    379     """Perform a quota check for a user."""
    380     state = self.__get_state(request.user)
    381 
    382     response = QuotaResponse(all_status=CheckResult.Status.OK)
    383     response.denied = False
    384 
    385     state.begin_transaction()
    386     try:
    387       for quota in request.quotas:
    388         if quota.mode in (QuotaCheck.Mode.CHECK_ALL,
    389                           QuotaCheck.Mode.CHECK_SOME):
    390           func = state.check_quota
    391         else:
    392           func = state.deduct_quota
    393 
    394         available = func(quota.name, quota.tokens)
    395         if available is None:
    396           raise remote.ApplicationError(
    397             'Unknown quota %s requested' % quota.name)
    398 
    399         result = CheckResult(available=available)
    400         response.results.append(result)
    401         if available == quota.tokens:
    402           result.status = CheckResult.Status.OK
    403           if response.all_status == CheckResult.Status.NONE:
    404             result.status = CheckResult.Status.SOME
    405         elif available == 0:
    406           result.status = CheckResult.Status.NONE
    407           if response.all_status == CheckResult.Status.OK:
    408             response.all_status = CheckResult.Status.NONE
    409           response.denied = True
    410         else:
    411           result.status = CheckResult.Status.SOME
    412           response.all_status = CheckResult.Status.SOME
    413           if quota.mode in (QuotaCheck.Mode.ALL, QuotaCheck.Mode.CHECK_ALL):
    414             response.denied = True
    415 
    416       if response.denied:
    417         state.abort_transaction()
    418       else:
    419         state.commit_transaction()
    420     except:
    421       state.abort_transaction()
    422       raise
    423     return response
    424 
    425   @remote.method(QuotaStateRequest, QuotaStateResponse)
    426   def get_quota_state(self, request):
    427     """Get current state of users quota buckets."""
    428     state = self.__get_state(request.user)
    429 
    430     state.begin_transaction()
    431 
    432     try:
    433       response = QuotaStateResponse()
    434       for name in sorted(self.__buckets.keys()):
    435         bucket_state = BucketState(
    436           name=name,
    437           remaining_tokens=state.get_remaining_tokens(name))
    438         response.bucket_states.append(bucket_state)
    439       return response
    440     finally:
    441       state.abort_transaction()
    442