Home | History | Annotate | Download | only in transport
      1 /*
      2  *
      3  * Copyright 2014 gRPC authors.
      4  *
      5  * Licensed under the Apache License, Version 2.0 (the "License");
      6  * you may not use this file except in compliance with the License.
      7  * You may obtain a copy of the License at
      8  *
      9  *     http://www.apache.org/licenses/LICENSE-2.0
     10  *
     11  * Unless required by applicable law or agreed to in writing, software
     12  * distributed under the License is distributed on an "AS IS" BASIS,
     13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     14  * See the License for the specific language governing permissions and
     15  * limitations under the License.
     16  *
     17  */
     18 
     19 package transport
     20 
     21 import (
     22 	"fmt"
     23 	"math"
     24 	"sync"
     25 	"sync/atomic"
     26 	"time"
     27 )
     28 
     29 const (
     30 	// The default value of flow control window size in HTTP2 spec.
     31 	defaultWindowSize = 65535
     32 	// The initial window size for flow control.
     33 	initialWindowSize             = defaultWindowSize // for an RPC
     34 	infinity                      = time.Duration(math.MaxInt64)
     35 	defaultClientKeepaliveTime    = infinity
     36 	defaultClientKeepaliveTimeout = 20 * time.Second
     37 	defaultMaxStreamsClient       = 100
     38 	defaultMaxConnectionIdle      = infinity
     39 	defaultMaxConnectionAge       = infinity
     40 	defaultMaxConnectionAgeGrace  = infinity
     41 	defaultServerKeepaliveTime    = 2 * time.Hour
     42 	defaultServerKeepaliveTimeout = 20 * time.Second
     43 	defaultKeepalivePolicyMinTime = 5 * time.Minute
     44 	// max window limit set by HTTP2 Specs.
     45 	maxWindowSize = math.MaxInt32
     46 	// defaultWriteQuota is the default value for number of data
     47 	// bytes that each stream can schedule before some of it being
     48 	// flushed out.
     49 	defaultWriteQuota = 64 * 1024
     50 )
     51 
     52 // writeQuota is a soft limit on the amount of data a stream can
     53 // schedule before some of it is written out.
     54 type writeQuota struct {
     55 	quota int32
     56 	// get waits on read from when quota goes less than or equal to zero.
     57 	// replenish writes on it when quota goes positive again.
     58 	ch chan struct{}
     59 	// done is triggered in error case.
     60 	done <-chan struct{}
     61 	// replenish is called by loopyWriter to give quota back to.
     62 	// It is implemented as a field so that it can be updated
     63 	// by tests.
     64 	replenish func(n int)
     65 }
     66 
     67 func newWriteQuota(sz int32, done <-chan struct{}) *writeQuota {
     68 	w := &writeQuota{
     69 		quota: sz,
     70 		ch:    make(chan struct{}, 1),
     71 		done:  done,
     72 	}
     73 	w.replenish = w.realReplenish
     74 	return w
     75 }
     76 
     77 func (w *writeQuota) get(sz int32) error {
     78 	for {
     79 		if atomic.LoadInt32(&w.quota) > 0 {
     80 			atomic.AddInt32(&w.quota, -sz)
     81 			return nil
     82 		}
     83 		select {
     84 		case <-w.ch:
     85 			continue
     86 		case <-w.done:
     87 			return errStreamDone
     88 		}
     89 	}
     90 }
     91 
     92 func (w *writeQuota) realReplenish(n int) {
     93 	sz := int32(n)
     94 	a := atomic.AddInt32(&w.quota, sz)
     95 	b := a - sz
     96 	if b <= 0 && a > 0 {
     97 		select {
     98 		case w.ch <- struct{}{}:
     99 		default:
    100 		}
    101 	}
    102 }
    103 
    104 type trInFlow struct {
    105 	limit               uint32
    106 	unacked             uint32
    107 	effectiveWindowSize uint32
    108 }
    109 
    110 func (f *trInFlow) newLimit(n uint32) uint32 {
    111 	d := n - f.limit
    112 	f.limit = n
    113 	f.updateEffectiveWindowSize()
    114 	return d
    115 }
    116 
    117 func (f *trInFlow) onData(n uint32) uint32 {
    118 	f.unacked += n
    119 	if f.unacked >= f.limit/4 {
    120 		w := f.unacked
    121 		f.unacked = 0
    122 		f.updateEffectiveWindowSize()
    123 		return w
    124 	}
    125 	f.updateEffectiveWindowSize()
    126 	return 0
    127 }
    128 
    129 func (f *trInFlow) reset() uint32 {
    130 	w := f.unacked
    131 	f.unacked = 0
    132 	f.updateEffectiveWindowSize()
    133 	return w
    134 }
    135 
    136 func (f *trInFlow) updateEffectiveWindowSize() {
    137 	atomic.StoreUint32(&f.effectiveWindowSize, f.limit-f.unacked)
    138 }
    139 
    140 func (f *trInFlow) getSize() uint32 {
    141 	return atomic.LoadUint32(&f.effectiveWindowSize)
    142 }
    143 
    144 // TODO(mmukhi): Simplify this code.
    145 // inFlow deals with inbound flow control
    146 type inFlow struct {
    147 	mu sync.Mutex
    148 	// The inbound flow control limit for pending data.
    149 	limit uint32
    150 	// pendingData is the overall data which have been received but not been
    151 	// consumed by applications.
    152 	pendingData uint32
    153 	// The amount of data the application has consumed but grpc has not sent
    154 	// window update for them. Used to reduce window update frequency.
    155 	pendingUpdate uint32
    156 	// delta is the extra window update given by receiver when an application
    157 	// is reading data bigger in size than the inFlow limit.
    158 	delta uint32
    159 }
    160 
    161 // newLimit updates the inflow window to a new value n.
    162 // It assumes that n is always greater than the old limit.
    163 func (f *inFlow) newLimit(n uint32) uint32 {
    164 	f.mu.Lock()
    165 	d := n - f.limit
    166 	f.limit = n
    167 	f.mu.Unlock()
    168 	return d
    169 }
    170 
    171 func (f *inFlow) maybeAdjust(n uint32) uint32 {
    172 	if n > uint32(math.MaxInt32) {
    173 		n = uint32(math.MaxInt32)
    174 	}
    175 	f.mu.Lock()
    176 	// estSenderQuota is the receiver's view of the maximum number of bytes the sender
    177 	// can send without a window update.
    178 	estSenderQuota := int32(f.limit - (f.pendingData + f.pendingUpdate))
    179 	// estUntransmittedData is the maximum number of bytes the sends might not have put
    180 	// on the wire yet. A value of 0 or less means that we have already received all or
    181 	// more bytes than the application is requesting to read.
    182 	estUntransmittedData := int32(n - f.pendingData) // Casting into int32 since it could be negative.
    183 	// This implies that unless we send a window update, the sender won't be able to send all the bytes
    184 	// for this message. Therefore we must send an update over the limit since there's an active read
    185 	// request from the application.
    186 	if estUntransmittedData > estSenderQuota {
    187 		// Sender's window shouldn't go more than 2^31 - 1 as specified in the HTTP spec.
    188 		if f.limit+n > maxWindowSize {
    189 			f.delta = maxWindowSize - f.limit
    190 		} else {
    191 			// Send a window update for the whole message and not just the difference between
    192 			// estUntransmittedData and estSenderQuota. This will be helpful in case the message
    193 			// is padded; We will fallback on the current available window(at least a 1/4th of the limit).
    194 			f.delta = n
    195 		}
    196 		f.mu.Unlock()
    197 		return f.delta
    198 	}
    199 	f.mu.Unlock()
    200 	return 0
    201 }
    202 
    203 // onData is invoked when some data frame is received. It updates pendingData.
    204 func (f *inFlow) onData(n uint32) error {
    205 	f.mu.Lock()
    206 	f.pendingData += n
    207 	if f.pendingData+f.pendingUpdate > f.limit+f.delta {
    208 		limit := f.limit
    209 		rcvd := f.pendingData + f.pendingUpdate
    210 		f.mu.Unlock()
    211 		return fmt.Errorf("received %d-bytes data exceeding the limit %d bytes", rcvd, limit)
    212 	}
    213 	f.mu.Unlock()
    214 	return nil
    215 }
    216 
    217 // onRead is invoked when the application reads the data. It returns the window size
    218 // to be sent to the peer.
    219 func (f *inFlow) onRead(n uint32) uint32 {
    220 	f.mu.Lock()
    221 	if f.pendingData == 0 {
    222 		f.mu.Unlock()
    223 		return 0
    224 	}
    225 	f.pendingData -= n
    226 	if n > f.delta {
    227 		n -= f.delta
    228 		f.delta = 0
    229 	} else {
    230 		f.delta -= n
    231 		n = 0
    232 	}
    233 	f.pendingUpdate += n
    234 	if f.pendingUpdate >= f.limit/4 {
    235 		wu := f.pendingUpdate
    236 		f.pendingUpdate = 0
    237 		f.mu.Unlock()
    238 		return wu
    239 	}
    240 	f.mu.Unlock()
    241 	return 0
    242 }
    243