Home | History | Annotate | Download | only in transport
      1 /*
      2  *
      3  * Copyright 2017 gRPC authors.
      4  *
      5  * Licensed under the Apache License, Version 2.0 (the "License");
      6  * you may not use this file except in compliance with the License.
      7  * You may obtain a copy of the License at
      8  *
      9  *     http://www.apache.org/licenses/LICENSE-2.0
     10  *
     11  * Unless required by applicable law or agreed to in writing, software
     12  * distributed under the License is distributed on an "AS IS" BASIS,
     13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     14  * See the License for the specific language governing permissions and
     15  * limitations under the License.
     16  *
     17  */
     18 
     19 package transport
     20 
     21 import (
     22 	"sync"
     23 	"time"
     24 )
     25 
     26 const (
     27 	// bdpLimit is the maximum value the flow control windows
     28 	// will be increased to.
     29 	bdpLimit = (1 << 20) * 4
     30 	// alpha is a constant factor used to keep a moving average
     31 	// of RTTs.
     32 	alpha = 0.9
     33 	// If the current bdp sample is greater than or equal to
     34 	// our beta * our estimated bdp and the current bandwidth
     35 	// sample is the maximum bandwidth observed so far, we
     36 	// increase our bbp estimate by a factor of gamma.
     37 	beta = 0.66
     38 	// To put our bdp to be smaller than or equal to twice the real BDP,
     39 	// we should multiply our current sample with 4/3, however to round things out
     40 	// we use 2 as the multiplication factor.
     41 	gamma = 2
     42 )
     43 
     44 // Adding arbitrary data to ping so that its ack can be identified.
     45 // Easter-egg: what does the ping message say?
     46 var bdpPing = &ping{data: [8]byte{2, 4, 16, 16, 9, 14, 7, 7}}
     47 
// bdpEstimator holds the state used to estimate the bandwidth-delay product
// (bdp) of a connection. A measurement cycle is started by add, timestamped
// by timesnap, and completed by calculate, which may grow the estimate and
// report it through updateFlowControl.
type bdpEstimator struct {
	// sentAt is the time when the ping was sent. It is reset to the zero
	// time by add at the start of a cycle, set by timesnap (without
	// holding mu), and read by calculate.
	sentAt time.Time

	// mu is held by add and calculate while they access the fields below.
	mu sync.Mutex
	// bdp is the current bdp estimate. It never exceeds bdpLimit once
	// updated by calculate.
	bdp uint32
	// sample is the number of bytes received in one measurement cycle.
	sample uint32
	// bwMax is the maximum bandwidth noted so far (bytes/sec).
	bwMax float64
	// isSent tracks whether a measurement cycle is in progress: add sets it
	// when it starts a new cycle and calculate clears it when the cycle ends.
	isSent bool
	// Callback to update the window sizes. calculate invokes it with the
	// new bdp estimate after releasing mu.
	updateFlowControl func(n uint32)
	// sampleCount is the number of samples taken so far; add increments it
	// each time a new measurement cycle begins.
	sampleCount uint64
	// round trip time (seconds), maintained by calculate as a running
	// average of the measured rtt samples.
	rtt float64
}
     68 
     69 // timesnap registers the time bdp ping was sent out so that
     70 // network rtt can be calculated when its ack is received.
     71 // It is called (by controller) when the bdpPing is
     72 // being written on the wire.
     73 func (b *bdpEstimator) timesnap(d [8]byte) {
     74 	if bdpPing.data != d {
     75 		return
     76 	}
     77 	b.sentAt = time.Now()
     78 }
     79 
     80 // add adds bytes to the current sample for calculating bdp.
     81 // It returns true only if a ping must be sent. This can be used
     82 // by the caller (handleData) to make decision about batching
     83 // a window update with it.
     84 func (b *bdpEstimator) add(n uint32) bool {
     85 	b.mu.Lock()
     86 	defer b.mu.Unlock()
     87 	if b.bdp == bdpLimit {
     88 		return false
     89 	}
     90 	if !b.isSent {
     91 		b.isSent = true
     92 		b.sample = n
     93 		b.sentAt = time.Time{}
     94 		b.sampleCount++
     95 		return true
     96 	}
     97 	b.sample += n
     98 	return false
     99 }
    100 
    101 // calculate is called when an ack for a bdp ping is received.
    102 // Here we calculate the current bdp and bandwidth sample and
    103 // decide if the flow control windows should go up.
    104 func (b *bdpEstimator) calculate(d [8]byte) {
    105 	// Check if the ping acked for was the bdp ping.
    106 	if bdpPing.data != d {
    107 		return
    108 	}
    109 	b.mu.Lock()
    110 	rttSample := time.Since(b.sentAt).Seconds()
    111 	if b.sampleCount < 10 {
    112 		// Bootstrap rtt with an average of first 10 rtt samples.
    113 		b.rtt += (rttSample - b.rtt) / float64(b.sampleCount)
    114 	} else {
    115 		// Heed to the recent past more.
    116 		b.rtt += (rttSample - b.rtt) * float64(alpha)
    117 	}
    118 	b.isSent = false
    119 	// The number of bytes accumulated so far in the sample is smaller
    120 	// than or equal to 1.5 times the real BDP on a saturated connection.
    121 	bwCurrent := float64(b.sample) / (b.rtt * float64(1.5))
    122 	if bwCurrent > b.bwMax {
    123 		b.bwMax = bwCurrent
    124 	}
    125 	// If the current sample (which is smaller than or equal to the 1.5 times the real BDP) is
    126 	// greater than or equal to 2/3rd our perceived bdp AND this is the maximum bandwidth seen so far, we
    127 	// should update our perception of the network BDP.
    128 	if float64(b.sample) >= beta*float64(b.bdp) && bwCurrent == b.bwMax && b.bdp != bdpLimit {
    129 		sampleFloat := float64(b.sample)
    130 		b.bdp = uint32(gamma * sampleFloat)
    131 		if b.bdp > bdpLimit {
    132 			b.bdp = bdpLimit
    133 		}
    134 		bdp := b.bdp
    135 		b.mu.Unlock()
    136 		b.updateFlowControl(bdp)
    137 		return
    138 	}
    139 	b.mu.Unlock()
    140 }
    141