Home | History | Annotate | Download | only in zip
      1 // Copyright 2016 Google Inc. All rights reserved.
      2 //
      3 // Licensed under the Apache License, Version 2.0 (the "License");
      4 // you may not use this file except in compliance with the License.
      5 // You may obtain a copy of the License at
      6 //
      7 //     http://www.apache.org/licenses/LICENSE-2.0
      8 //
      9 // Unless required by applicable law or agreed to in writing, software
     10 // distributed under the License is distributed on an "AS IS" BASIS,
     11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     12 // See the License for the specific language governing permissions and
     13 // limitations under the License.
     14 
     15 package zip
     16 
     17 import (
     18 	"fmt"
     19 	"runtime"
     20 )
     21 
     22 type RateLimit struct {
     23 	requests    chan request
     24 	completions chan int64
     25 
     26 	stop chan struct{}
     27 }
     28 
     29 type request struct {
     30 	size     int64
     31 	serviced chan struct{}
     32 }
     33 
     34 // NewRateLimit starts a new rate limiter that permits the usage of up to <capacity> at once,
     35 // except when no capacity is in use, in which case the first caller is always permitted
     36 func NewRateLimit(capacity int64) *RateLimit {
     37 	ret := &RateLimit{
     38 		requests:    make(chan request),
     39 		completions: make(chan int64),
     40 
     41 		stop: make(chan struct{}),
     42 	}
     43 
     44 	go ret.monitorChannels(capacity)
     45 
     46 	return ret
     47 }
     48 
     49 // RequestExecution blocks until another execution of size <size> can be allowed to run.
     50 func (r *RateLimit) Request(size int64) {
     51 	request := request{
     52 		size:     size,
     53 		serviced: make(chan struct{}, 1),
     54 	}
     55 
     56 	// wait for the request to be received
     57 	r.requests <- request
     58 
     59 	// wait for the request to be accepted
     60 	<-request.serviced
     61 }
     62 
     63 // Finish declares the completion of an execution of size <size>
     64 func (r *RateLimit) Finish(size int64) {
     65 	r.completions <- size
     66 }
     67 
     68 // Stop the background goroutine
     69 func (r *RateLimit) Stop() {
     70 	close(r.stop)
     71 }
     72 
     73 // monitorChannels processes incoming requests from channels
     74 func (r *RateLimit) monitorChannels(capacity int64) {
     75 	var usedCapacity int64
     76 	var currentRequest *request
     77 
     78 	for {
     79 		var requests chan request
     80 		if currentRequest == nil {
     81 			// If we don't already have a queued request, then we should check for a new request
     82 			requests = r.requests
     83 		}
     84 
     85 		select {
     86 		case newRequest := <-requests:
     87 			currentRequest = &newRequest
     88 		case amountCompleted := <-r.completions:
     89 			usedCapacity -= amountCompleted
     90 
     91 			if usedCapacity < 0 {
     92 				panic(fmt.Sprintf("usedCapacity < 0: %v (decreased by %v)", usedCapacity, amountCompleted))
     93 			}
     94 		case <-r.stop:
     95 			return
     96 		}
     97 
     98 		if currentRequest != nil {
     99 			accepted := false
    100 			if usedCapacity == 0 {
    101 				accepted = true
    102 			} else {
    103 				if capacity >= usedCapacity+currentRequest.size {
    104 					accepted = true
    105 				}
    106 			}
    107 			if accepted {
    108 				usedCapacity += currentRequest.size
    109 				currentRequest.serviced <- struct{}{}
    110 				currentRequest = nil
    111 			}
    112 		}
    113 	}
    114 }
    115 
    116 // A CPURateLimiter limits the number of active calls based on CPU requirements
    117 type CPURateLimiter struct {
    118 	impl *RateLimit
    119 }
    120 
    121 func NewCPURateLimiter(capacity int64) *CPURateLimiter {
    122 	if capacity <= 0 {
    123 		capacity = int64(runtime.NumCPU())
    124 	}
    125 	impl := NewRateLimit(capacity)
    126 	return &CPURateLimiter{impl: impl}
    127 }
    128 
    129 func (e CPURateLimiter) Request() {
    130 	e.impl.Request(1)
    131 }
    132 
    133 func (e CPURateLimiter) Finish() {
    134 	e.impl.Finish(1)
    135 }
    136 
    137 func (e CPURateLimiter) Stop() {
    138 	e.impl.Stop()
    139 }
    140 
    141 // A MemoryRateLimiter limits the number of active calls based on Memory requirements
    142 type MemoryRateLimiter struct {
    143 	*RateLimit
    144 }
    145 
    146 func NewMemoryRateLimiter(capacity int64) *MemoryRateLimiter {
    147 	if capacity <= 0 {
    148 		capacity = 512 * 1024 * 1024 // 512MB
    149 	}
    150 	impl := NewRateLimit(capacity)
    151 	return &MemoryRateLimiter{RateLimit: impl}
    152 }
    153