1 // Copyright 2016 Google Inc. All rights reserved. 2 // 3 // Licensed under the Apache License, Version 2.0 (the "License"); 4 // you may not use this file except in compliance with the License. 5 // You may obtain a copy of the License at 6 // 7 // http://www.apache.org/licenses/LICENSE-2.0 8 // 9 // Unless required by applicable law or agreed to in writing, software 10 // distributed under the License is distributed on an "AS IS" BASIS, 11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 // See the License for the specific language governing permissions and 13 // limitations under the License. 14 15 package zip 16 17 import ( 18 "fmt" 19 "runtime" 20 ) 21 22 type RateLimit struct { 23 requests chan request 24 completions chan int64 25 26 stop chan struct{} 27 } 28 29 type request struct { 30 size int64 31 serviced chan struct{} 32 } 33 34 // NewRateLimit starts a new rate limiter that permits the usage of up to <capacity> at once, 35 // except when no capacity is in use, in which case the first caller is always permitted 36 func NewRateLimit(capacity int64) *RateLimit { 37 ret := &RateLimit{ 38 requests: make(chan request), 39 completions: make(chan int64), 40 41 stop: make(chan struct{}), 42 } 43 44 go ret.monitorChannels(capacity) 45 46 return ret 47 } 48 49 // RequestExecution blocks until another execution of size <size> can be allowed to run. 50 func (r *RateLimit) Request(size int64) { 51 request := request{ 52 size: size, 53 serviced: make(chan struct{}, 1), 54 } 55 56 // wait for the request to be received 57 r.requests <- request 58 59 // wait for the request to be accepted 60 <-request.serviced 61 } 62 63 // Finish declares the completion of an execution of size <size> 64 func (r *RateLimit) Finish(size int64) { 65 r.completions <- size 66 } 67 68 // Stop the background goroutine 69 func (r *RateLimit) Stop() { 70 close(r.stop) 71 } 72 73 // monitorChannels processes incoming requests from channels 74 func (r *RateLimit) monitorChannels(capacity int64) { 75 var usedCapacity int64 76 var currentRequest *request 77 78 for { 79 var requests chan request 80 if currentRequest == nil { 81 // If we don't already have a queued request, then we should check for a new request 82 requests = r.requests 83 } 84 85 select { 86 case newRequest := <-requests: 87 currentRequest = &newRequest 88 case amountCompleted := <-r.completions: 89 usedCapacity -= amountCompleted 90 91 if usedCapacity < 0 { 92 panic(fmt.Sprintf("usedCapacity < 0: %v (decreased by %v)", usedCapacity, amountCompleted)) 93 } 94 case <-r.stop: 95 return 96 } 97 98 if currentRequest != nil { 99 accepted := false 100 if usedCapacity == 0 { 101 accepted = true 102 } else { 103 if capacity >= usedCapacity+currentRequest.size { 104 accepted = true 105 } 106 } 107 if accepted { 108 usedCapacity += currentRequest.size 109 currentRequest.serviced <- struct{}{} 110 currentRequest = nil 111 } 112 } 113 } 114 } 115 116 // A CPURateLimiter limits the number of active calls based on CPU requirements 117 type CPURateLimiter struct { 118 impl *RateLimit 119 } 120 121 func NewCPURateLimiter(capacity int64) *CPURateLimiter { 122 if capacity <= 0 { 123 capacity = int64(runtime.NumCPU()) 124 } 125 impl := NewRateLimit(capacity) 126 return &CPURateLimiter{impl: impl} 127 } 128 129 func (e CPURateLimiter) Request() { 130 e.impl.Request(1) 131 } 132 133 func (e CPURateLimiter) Finish() { 134 e.impl.Finish(1) 135 } 136 137 func (e CPURateLimiter) Stop() { 138 e.impl.Stop() 139 } 140 141 // A MemoryRateLimiter limits the number of active calls based on Memory requirements 142 type MemoryRateLimiter struct { 143 *RateLimit 144 } 145 146 func NewMemoryRateLimiter(capacity int64) *MemoryRateLimiter { 147 if capacity <= 0 { 148 capacity = 512 * 1024 * 1024 // 512MB 149 } 150 impl := NewRateLimit(capacity) 151 return &MemoryRateLimiter{RateLimit: impl} 152 } 153