#!/usr/bin/env python
#
# Copyright 2010 Google Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""Quota service definition and implementation.

Contains message and service definitions for a simple quota service.  The
service maintains a set of quotas for users that can be deducted from in
a single transaction.  The requests to do this can be configured so that if
one quota check fails, none of the quota changes will take effect.

The service is configured using a QuotaConfig object and can be passed an
existing quota state (useful for if the service quits unexpectedly and is
being restored from checkpoint).  For this reason it is necessary to use
a factory instead of the default constructor.  For example:

  quota_config = QuotaConfig(
      buckets=[
          QuotaConfig.Bucket(name='DISK', initial_tokens=1000000),
          QuotaConfig.Bucket(name='EMAILS',
                             initial_tokens=100,
                             refresh_every=24 * 60 * 60),
      ])
  quota_state = {}
  quota_service = QuotaService.new_factory(quota_config, quota_state)

Every on-going request to the quota service shares the same configuration and
state objects.

Individual quota buckets can be specified to refresh to their original amounts
at regular intervals.  These intervals are specified in seconds.  The example
above specifies that the email quota is refreshed to 100 emails every day.

It is up to the client using the quota service to respond correctly to the
response of the quota service.  It does not try to raise an exception on
denial.
"""

import threading
import time

from protorpc import messages
from protorpc import remote
from protorpc import util


class QuotaCheck(messages.Message):
    """A single bucket check (or deduction) within a quota request.

    Fields:
      name: Name of quota bucket to check.
      tokens: Number of tokens to check for quota or deduct.  A negative
        value can be used to credit quota buckets.
      mode: Quota check-mode.  See Mode enumeration class for more details.
    """

    class Mode(messages.Enum):
        """Mode for individual bucket quota check.

        Values:
          ALL: All tokens must be available for consumption or else quota
            check fails and all deductions/credits are ignored.
          SOME: At least some tokens must be available for consumption.
            This check will only fail if the remaining tokens in the bucket
            are already at zero.
          CHECK_ALL: All tokens must be available in bucket or else quota
            check fails and all other deductions/credits are ignored.  This
            will not cause a deduction to occur for the indicated bucket.
          CHECK_SOME: At least some tokens must be available in bucket or
            else quota check fails and all other deductions/credits are
            ignored.  This will not cause a deduction to occur for the
            indicated bucket.
        """
        ALL = 1
        SOME = 2
        CHECK_ALL = 3
        CHECK_SOME = 4

    name = messages.StringField(1, required=True)
    tokens = messages.IntegerField(2, required=True)
    mode = messages.EnumField(Mode, 3, default=Mode.ALL)


class QuotaRequest(messages.Message):
    """A request to check or deduct tokens from a users bucket.

    Fields:
      user: User to check or deduct quota for.
      quotas: Quotas to check or deduct against.
    """

    user = messages.StringField(1, required=True)
    quotas = messages.MessageField(QuotaCheck, 2, repeated=True)


class CheckResult(messages.Message):
    """Quota check results.

    Fields:
      status: Status of quota check for bucket.  See Status enum for
        details.
      available: Number of actual tokens available or consumed.  Will be
        less than the number of requested tokens when bucket has fewer
        tokens than requested.
    """

    class Status(messages.Enum):
        """Status of check result.

        Values:
          OK: All requested tokens are available or were deducted.
          SOME: Some requested tokens are available or were deducted.  This
            will cause any deductions to fail if the request mode is ALL or
            CHECK_ALL.
          NONE: No tokens were available.  Quota check is considered to have
            failed.
        """
        OK = 1
        SOME = 2
        NONE = 3

    status = messages.EnumField(Status, 1, required=True)
    available = messages.IntegerField(2, required=True)


class QuotaResponse(messages.Message):
    """Response to QuotaRequest.

    Fields:
      all_status: Overall status of quota request.  If no quota tokens were
        available at all, this will be NONE.  If some tokens were available,
        even if some buckets had no tokens, this will be SOME.  If all
        tokens were available this will be OK.
      denied: If true, it means that some required quota check has failed.
        Any deductions in the request will be ignored, even if those
        individual buckets had adequate tokens.
      results: Specific results of quota check for each requested bucket.
        The names are not included as they will have a one to one
        correspondence with buckets indicated in the request.
    """

    all_status = messages.EnumField(CheckResult.Status, 1, required=True)
    denied = messages.BooleanField(2, required=True)
    results = messages.MessageField(CheckResult, 3, repeated=True)


class QuotaConfig(messages.Message):
    """Quota configuration.

    Structure used for configuring quota server.  This message is not used
    directly in the service definition, but is used to configure the
    implementation.

    Fields:
      buckets: Individual bucket configurations.  Bucket configurations are
        specified per server and are configured for any user that is
        requested.
    """

    class Bucket(messages.Message):
        """Individual bucket configuration.

        Fields:
          name: Bucket name.
          initial_tokens: Number of tokens initially configured for this
            bucket.
          refresh_every: Number of seconds after which initial tokens are
            restored.  If this value is None, tokens are never restored once
            used, unless credited by the application.
        """

        name = messages.StringField(1, required=True)
        initial_tokens = messages.IntegerField(2, required=True)
        refresh_every = messages.IntegerField(4)

    buckets = messages.MessageField(Bucket, 1, repeated=True)


class QuotaStateRequest(messages.Message):
    """Request state of all quota buckets for a single user.

    Used for determining how many tokens remain in all the users quota
    buckets.

    Fields:
      user: The user to get buckets for.
    """

    user = messages.StringField(1, required=True)


class BucketState(messages.Message):
    """State of an individual quota bucket.

    Fields:
      name: Name of bucket.
      remaining_tokens: Number of tokens that remain in bucket.
    """

    name = messages.StringField(1, required=True)
    remaining_tokens = messages.IntegerField(2, required=True)


class QuotaStateResponse(messages.Message):
    """Response to QuotaStateRequest with the set of bucket states for user."""

    bucket_states = messages.MessageField(BucketState, 1, repeated=True)


class QuotaState(object):
    """Quota state class, used by implementation of service.

    This class is responsible for managing all the bucket states for a user.
    Quota checks and deductions must be done in the context of a
    transaction.  If a transaction fails, it can be rolled back so that the
    values of the individual buckets are preserved, even if previous checks
    and deductions succeeded.

    begin_transaction acquires a per-instance lock that is held until the
    matching commit_transaction or abort_transaction call, so transactions
    against the same QuotaState are serialized.  The pending changes are
    kept in thread-local storage.
    """

    @util.positional(3)
    def __init__(self, state, buckets):
        """Constructor.

        Args:
          state: A dictionary that is used to contain the state, mapping
            bucket names to tuples (remaining_tokens, next_refresh):
              remaining_tokens: Number of tokens remaining in the bucket.
              next_refresh: Time when bucket needs to be refilled with
                initial tokens, or None if it never refreshes.
          buckets: A dictionary that maps bucket names to QuotaConfig.Bucket
            objects.
        """
        self.__state = state
        self.__buckets = buckets

        # Held from begin_transaction until commit/abort.
        self.__lock = threading.Lock()

        # Per-thread transaction data:
        #   changes: Dictionary bucket name -> token change to apply on
        #     commit (negative is a deduction, positive is a credit), or
        #     None/unset when the thread has no open transaction.
        #   time: Time at which the transaction began.
        # Attributes of a threading.local exist only in the thread that set
        # them, so they are read with a default (see in_transaction) rather
        # than initialized here for the constructing thread only.
        self.__transaction = threading.local()

    def in_transaction(self):
        """Return True if the calling thread has an open transaction."""
        return getattr(self.__transaction, 'changes', None) is not None

    def begin_transaction(self):
        """Begin quota transaction.

        Blocks until no other transaction is open on this QuotaState.
        """
        assert not self.in_transaction()
        # Take the lock before marking the transaction open so that an
        # interrupted acquire cannot leave this thread "in transaction"
        # without actually holding the lock.
        self.__lock.acquire()
        self.__transaction.changes = {}
        self.__transaction.time = int(time.time())

    def commit_transaction(self):
        """Commit deductions of quota transaction.

        Bucket values are clamped so they never drop below zero.
        """
        assert self.in_transaction()
        for name, change in self.__transaction.changes.items():
            remaining_tokens, next_refresh = self.__state[name]
            new_tokens = max(0, remaining_tokens + change)
            self.__state[name] = new_tokens, next_refresh
        self.__transaction.changes = None
        self.__lock.release()

    def abort_transaction(self):
        """Roll back transaction ignoring quota changes."""
        assert self.in_transaction()
        self.__transaction.changes = None
        self.__lock.release()

    def get_remaining_tokens(self, name):
        """Get remaining tokens for a bucket.

        This function must be called within a transaction.

        If the bucket has no stored state yet, or its refresh time has
        passed, the stored state is (re)initialized from the bucket
        configuration.  That write goes directly to the state dictionary
        and is not undone by abort_transaction.

        Args:
          name: Bucket name.

        Returns:
          Integer of remaining tokens in users quota bucket, including any
          change already pending in the current transaction.  None if the
          bucket has no usable state and is not configured.
        """
        assert self.in_transaction()
        changes = self.__transaction.changes.get(name, 0)
        remaining_tokens, next_refresh = self.__state.get(name, (None, None))
        if remaining_tokens is not None and (
                next_refresh is None or
                next_refresh >= self.__transaction.time):
            # Stored state exists and is not yet due for a refresh.
            return remaining_tokens + changes

        bucket = self.__buckets.get(name, None)
        if bucket is None:
            return None

        if bucket.refresh_every:
            next_refresh = self.__transaction.time + bucket.refresh_every
        else:
            next_refresh = None
        self.__state[name] = bucket.initial_tokens, next_refresh
        return bucket.initial_tokens + changes

    def check_quota(self, name, tokens):
        """Check to determine if there are enough quotas in a bucket.

        A bucket may only be checked or deducted once per transaction.

        Args:
          name: Name of bucket to check.
          tokens: Number of tokens to check for availability.  Can be
            negative.

        Returns:
          The count of requested tokens or if insufficient, the number of
          tokens available.  None if the bucket is unknown.
        """
        assert self.in_transaction()
        assert name not in self.__transaction.changes
        remaining_tokens = self.get_remaining_tokens(name)
        if remaining_tokens is None:
            return None
        return min(tokens, remaining_tokens)

    def deduct_quota(self, name, tokens):
        """Add a quota deduction to the transaction.

        Args:
          name: Name of bucket to deduct from.
          tokens: Number of tokens to request.  A negative value records a
            credit.

        Returns:
          The count of requested tokens or if insufficient, the number of
          tokens available that will be deducted upon transaction commit.
          None if the bucket is unknown.
        """
        available_tokens = self.check_quota(name, tokens)
        if available_tokens is None:
            return None
        # Only what is actually available is deducted at commit time.
        self.__transaction.changes[name] = -available_tokens
        return available_tokens


def _overall_status(results):
    """Summarize per-bucket CheckResults into one overall status.

    Args:
      results: Sequence of CheckResult messages with status set.

    Returns:
      OK if every result is OK (or there are no results), NONE if every
      result is NONE, otherwise SOME.
    """
    if all(result.status == CheckResult.Status.OK for result in results):
        return CheckResult.Status.OK
    if all(result.status == CheckResult.Status.NONE for result in results):
        return CheckResult.Status.NONE
    return CheckResult.Status.SOME


class QuotaService(remote.Service):
    """Quota service."""

    # Guards creation of per-user QuotaState entries.  Defined on the class,
    # so it is shared by every service instance.
    __state_lock = threading.Lock()

    def __init__(self, config, states):
        """Constructor.

        NOTE: This constructor requires parameters which means a factory
        function must be used for instantiating the QuotaService.

        Args:
          config: An instance of QuotaConfig.
          states: Dictionary mapping user -> QuotaState objects.
        """
        self.__states = states
        self.__config = config
        self.__buckets = {}
        for bucket in self.__config.buckets:
            self.__buckets[bucket.name] = bucket

    def __get_state(self, user):
        """Get the state of a user.

        If no user state exists, this function will create one and store
        it for access later.

        Args:
          user: User string to get quota state for.

        Returns:
          The QuotaState stored for the user.
        """
        state = self.__states.get(user, None)
        if state is None:
            # TODO: Potentially problematic bottleneck.
            self.__state_lock.acquire()
            try:
                # Look again now that the lock is held: another request may
                # have created the state in the meantime, and replacing it
                # would leave two QuotaState objects for the same user.
                state = self.__states.get(user, None)
                if state is None:
                    state = QuotaState({}, self.__buckets)
                    self.__states[user] = state
            finally:
                self.__state_lock.release()
        return state

    @remote.method(QuotaRequest, QuotaResponse)
    def check_quota(self, request):
        """Perform a quota check for a user.

        All requested checks run in one transaction.  The transaction is
        committed only when no check denies the request; otherwise every
        deduction and credit in the request is discarded.

        Args:
          request: QuotaRequest naming the user and the buckets to check.

        Returns:
          QuotaResponse with one CheckResult per requested quota, in request
          order.

        Raises:
          remote.ApplicationError: If a requested bucket is not configured.
        """
        state = self.__get_state(request.user)

        response = QuotaResponse(all_status=CheckResult.Status.OK)
        response.denied = False

        check_only_modes = (QuotaCheck.Mode.CHECK_ALL,
                            QuotaCheck.Mode.CHECK_SOME)
        all_or_nothing_modes = (QuotaCheck.Mode.ALL,
                                QuotaCheck.Mode.CHECK_ALL)

        state.begin_transaction()
        try:
            for quota in request.quotas:
                if quota.mode in check_only_modes:
                    func = state.check_quota
                else:
                    func = state.deduct_quota

                available = func(quota.name, quota.tokens)
                if available is None:
                    raise remote.ApplicationError(
                        'Unknown quota %s requested' % quota.name)

                if available == quota.tokens:
                    status = CheckResult.Status.OK
                elif available == 0:
                    # An empty bucket denies the request in every mode.
                    status = CheckResult.Status.NONE
                    response.denied = True
                else:
                    status = CheckResult.Status.SOME
                    if quota.mode in all_or_nothing_modes:
                        response.denied = True

                response.results.append(
                    CheckResult(status=status, available=available))

            response.all_status = _overall_status(response.results)

            if response.denied:
                state.abort_transaction()
            else:
                state.commit_transaction()
        except:
            # Deliberately catches everything: the state lock taken by
            # begin_transaction must be released before re-raising.
            state.abort_transaction()
            raise
        return response

    @remote.method(QuotaStateRequest, QuotaStateResponse)
    def get_quota_state(self, request):
        """Get current state of users quota buckets.

        Args:
          request: QuotaStateRequest naming the user.

        Returns:
          QuotaStateResponse with one BucketState per configured bucket,
          sorted by bucket name.
        """
        state = self.__get_state(request.user)

        state.begin_transaction()

        try:
            response = QuotaStateResponse()
            for name in sorted(self.__buckets):
                bucket_state = BucketState(
                    name=name,
                    remaining_tokens=state.get_remaining_tokens(name))
                response.bucket_states.append(bucket_state)
            return response
        finally:
            # Nothing is deducted here; the transaction only exists so the
            # buckets can be read, so it is always rolled back.
            state.abort_transaction()