1 /* 2 * 3 * Copyright 2014 gRPC authors. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 * 17 */ 18 19 package transport 20 21 import ( 22 "fmt" 23 "math" 24 "sync" 25 "sync/atomic" 26 "time" 27 ) 28 29 const ( 30 // The default value of flow control window size in HTTP2 spec. 31 defaultWindowSize = 65535 32 // The initial window size for flow control. 33 initialWindowSize = defaultWindowSize // for an RPC 34 infinity = time.Duration(math.MaxInt64) 35 defaultClientKeepaliveTime = infinity 36 defaultClientKeepaliveTimeout = 20 * time.Second 37 defaultMaxStreamsClient = 100 38 defaultMaxConnectionIdle = infinity 39 defaultMaxConnectionAge = infinity 40 defaultMaxConnectionAgeGrace = infinity 41 defaultServerKeepaliveTime = 2 * time.Hour 42 defaultServerKeepaliveTimeout = 20 * time.Second 43 defaultKeepalivePolicyMinTime = 5 * time.Minute 44 // max window limit set by HTTP2 Specs. 45 maxWindowSize = math.MaxInt32 46 // defaultWriteQuota is the default value for number of data 47 // bytes that each stream can schedule before some of it being 48 // flushed out. 49 defaultWriteQuota = 64 * 1024 50 ) 51 52 // writeQuota is a soft limit on the amount of data a stream can 53 // schedule before some of it is written out. 54 type writeQuota struct { 55 quota int32 56 // get waits on read from when quota goes less than or equal to zero. 57 // replenish writes on it when quota goes positive again. 58 ch chan struct{} 59 // done is triggered in error case. 60 done <-chan struct{} 61 // replenish is called by loopyWriter to give quota back to. 62 // It is implemented as a field so that it can be updated 63 // by tests. 64 replenish func(n int) 65 } 66 67 func newWriteQuota(sz int32, done <-chan struct{}) *writeQuota { 68 w := &writeQuota{ 69 quota: sz, 70 ch: make(chan struct{}, 1), 71 done: done, 72 } 73 w.replenish = w.realReplenish 74 return w 75 } 76 77 func (w *writeQuota) get(sz int32) error { 78 for { 79 if atomic.LoadInt32(&w.quota) > 0 { 80 atomic.AddInt32(&w.quota, -sz) 81 return nil 82 } 83 select { 84 case <-w.ch: 85 continue 86 case <-w.done: 87 return errStreamDone 88 } 89 } 90 } 91 92 func (w *writeQuota) realReplenish(n int) { 93 sz := int32(n) 94 a := atomic.AddInt32(&w.quota, sz) 95 b := a - sz 96 if b <= 0 && a > 0 { 97 select { 98 case w.ch <- struct{}{}: 99 default: 100 } 101 } 102 } 103 104 type trInFlow struct { 105 limit uint32 106 unacked uint32 107 effectiveWindowSize uint32 108 } 109 110 func (f *trInFlow) newLimit(n uint32) uint32 { 111 d := n - f.limit 112 f.limit = n 113 f.updateEffectiveWindowSize() 114 return d 115 } 116 117 func (f *trInFlow) onData(n uint32) uint32 { 118 f.unacked += n 119 if f.unacked >= f.limit/4 { 120 w := f.unacked 121 f.unacked = 0 122 f.updateEffectiveWindowSize() 123 return w 124 } 125 f.updateEffectiveWindowSize() 126 return 0 127 } 128 129 func (f *trInFlow) reset() uint32 { 130 w := f.unacked 131 f.unacked = 0 132 f.updateEffectiveWindowSize() 133 return w 134 } 135 136 func (f *trInFlow) updateEffectiveWindowSize() { 137 atomic.StoreUint32(&f.effectiveWindowSize, f.limit-f.unacked) 138 } 139 140 func (f *trInFlow) getSize() uint32 { 141 return atomic.LoadUint32(&f.effectiveWindowSize) 142 } 143 144 // TODO(mmukhi): Simplify this code. 145 // inFlow deals with inbound flow control 146 type inFlow struct { 147 mu sync.Mutex 148 // The inbound flow control limit for pending data. 149 limit uint32 150 // pendingData is the overall data which have been received but not been 151 // consumed by applications. 152 pendingData uint32 153 // The amount of data the application has consumed but grpc has not sent 154 // window update for them. Used to reduce window update frequency. 155 pendingUpdate uint32 156 // delta is the extra window update given by receiver when an application 157 // is reading data bigger in size than the inFlow limit. 158 delta uint32 159 } 160 161 // newLimit updates the inflow window to a new value n. 162 // It assumes that n is always greater than the old limit. 163 func (f *inFlow) newLimit(n uint32) uint32 { 164 f.mu.Lock() 165 d := n - f.limit 166 f.limit = n 167 f.mu.Unlock() 168 return d 169 } 170 171 func (f *inFlow) maybeAdjust(n uint32) uint32 { 172 if n > uint32(math.MaxInt32) { 173 n = uint32(math.MaxInt32) 174 } 175 f.mu.Lock() 176 // estSenderQuota is the receiver's view of the maximum number of bytes the sender 177 // can send without a window update. 178 estSenderQuota := int32(f.limit - (f.pendingData + f.pendingUpdate)) 179 // estUntransmittedData is the maximum number of bytes the sends might not have put 180 // on the wire yet. A value of 0 or less means that we have already received all or 181 // more bytes than the application is requesting to read. 182 estUntransmittedData := int32(n - f.pendingData) // Casting into int32 since it could be negative. 183 // This implies that unless we send a window update, the sender won't be able to send all the bytes 184 // for this message. Therefore we must send an update over the limit since there's an active read 185 // request from the application. 186 if estUntransmittedData > estSenderQuota { 187 // Sender's window shouldn't go more than 2^31 - 1 as specified in the HTTP spec. 188 if f.limit+n > maxWindowSize { 189 f.delta = maxWindowSize - f.limit 190 } else { 191 // Send a window update for the whole message and not just the difference between 192 // estUntransmittedData and estSenderQuota. This will be helpful in case the message 193 // is padded; We will fallback on the current available window(at least a 1/4th of the limit). 194 f.delta = n 195 } 196 f.mu.Unlock() 197 return f.delta 198 } 199 f.mu.Unlock() 200 return 0 201 } 202 203 // onData is invoked when some data frame is received. It updates pendingData. 204 func (f *inFlow) onData(n uint32) error { 205 f.mu.Lock() 206 f.pendingData += n 207 if f.pendingData+f.pendingUpdate > f.limit+f.delta { 208 limit := f.limit 209 rcvd := f.pendingData + f.pendingUpdate 210 f.mu.Unlock() 211 return fmt.Errorf("received %d-bytes data exceeding the limit %d bytes", rcvd, limit) 212 } 213 f.mu.Unlock() 214 return nil 215 } 216 217 // onRead is invoked when the application reads the data. It returns the window size 218 // to be sent to the peer. 219 func (f *inFlow) onRead(n uint32) uint32 { 220 f.mu.Lock() 221 if f.pendingData == 0 { 222 f.mu.Unlock() 223 return 0 224 } 225 f.pendingData -= n 226 if n > f.delta { 227 n -= f.delta 228 f.delta = 0 229 } else { 230 f.delta -= n 231 n = 0 232 } 233 f.pendingUpdate += n 234 if f.pendingUpdate >= f.limit/4 { 235 wu := f.pendingUpdate 236 f.pendingUpdate = 0 237 f.mu.Unlock() 238 return wu 239 } 240 f.mu.Unlock() 241 return 0 242 } 243