Home | History | Annotate | Download | only in grpc
      1 /*
      2  *
      3  * Copyright 2017 gRPC authors.
      4  *
      5  * Licensed under the Apache License, Version 2.0 (the "License");
      6  * you may not use this file except in compliance with the License.
      7  * You may obtain a copy of the License at
      8  *
      9  *     http://www.apache.org/licenses/LICENSE-2.0
     10  *
     11  * Unless required by applicable law or agreed to in writing, software
     12  * distributed under the License is distributed on an "AS IS" BASIS,
     13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     14  * See the License for the specific language governing permissions and
     15  * limitations under the License.
     16  *
     17  */
     18 
     19 package grpc
     20 
import (
	"io"
	"strings"
	"sync"
	"sync/atomic"

	"golang.org/x/net/context"
	"google.golang.org/grpc/balancer"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/grpclog"
	"google.golang.org/grpc/internal/channelz"
	"google.golang.org/grpc/metadata"
	"google.golang.org/grpc/resolver"
	"google.golang.org/grpc/status"
	"google.golang.org/grpc/transport"
)
     36 
     37 // pickerWrapper is a wrapper of balancer.Picker. It blocks on certain pick
     38 // actions and unblock when there's a picker update.
     39 type pickerWrapper struct {
     40 	mu         sync.Mutex
     41 	done       bool
     42 	blockingCh chan struct{}
     43 	picker     balancer.Picker
     44 
     45 	// The latest connection happened.
     46 	connErrMu sync.Mutex
     47 	connErr   error
     48 
     49 	stickinessMDKey atomic.Value
     50 	stickiness      *stickyStore
     51 }
     52 
     53 func newPickerWrapper() *pickerWrapper {
     54 	bp := &pickerWrapper{
     55 		blockingCh: make(chan struct{}),
     56 		stickiness: newStickyStore(),
     57 	}
     58 	return bp
     59 }
     60 
     61 func (bp *pickerWrapper) updateConnectionError(err error) {
     62 	bp.connErrMu.Lock()
     63 	bp.connErr = err
     64 	bp.connErrMu.Unlock()
     65 }
     66 
     67 func (bp *pickerWrapper) connectionError() error {
     68 	bp.connErrMu.Lock()
     69 	err := bp.connErr
     70 	bp.connErrMu.Unlock()
     71 	return err
     72 }
     73 
     74 func (bp *pickerWrapper) updateStickinessMDKey(newKey string) {
     75 	// No need to check ok because mdKey == "" if ok == false.
     76 	if oldKey, _ := bp.stickinessMDKey.Load().(string); oldKey != newKey {
     77 		bp.stickinessMDKey.Store(newKey)
     78 		bp.stickiness.reset(newKey)
     79 	}
     80 }
     81 
     82 func (bp *pickerWrapper) getStickinessMDKey() string {
     83 	// No need to check ok because mdKey == "" if ok == false.
     84 	mdKey, _ := bp.stickinessMDKey.Load().(string)
     85 	return mdKey
     86 }
     87 
     88 func (bp *pickerWrapper) clearStickinessState() {
     89 	if oldKey := bp.getStickinessMDKey(); oldKey != "" {
     90 		// There's no need to reset store if mdKey was "".
     91 		bp.stickiness.reset(oldKey)
     92 	}
     93 }
     94 
     95 // updatePicker is called by UpdateBalancerState. It unblocks all blocked pick.
     96 func (bp *pickerWrapper) updatePicker(p balancer.Picker) {
     97 	bp.mu.Lock()
     98 	if bp.done {
     99 		bp.mu.Unlock()
    100 		return
    101 	}
    102 	bp.picker = p
    103 	// bp.blockingCh should never be nil.
    104 	close(bp.blockingCh)
    105 	bp.blockingCh = make(chan struct{})
    106 	bp.mu.Unlock()
    107 }
    108 
    109 func doneChannelzWrapper(acw *acBalancerWrapper, done func(balancer.DoneInfo)) func(balancer.DoneInfo) {
    110 	acw.mu.Lock()
    111 	ac := acw.ac
    112 	acw.mu.Unlock()
    113 	ac.incrCallsStarted()
    114 	return func(b balancer.DoneInfo) {
    115 		if b.Err != nil && b.Err != io.EOF {
    116 			ac.incrCallsFailed()
    117 		} else {
    118 			ac.incrCallsSucceeded()
    119 		}
    120 		if done != nil {
    121 			done(b)
    122 		}
    123 	}
    124 }
    125 
    126 // pick returns the transport that will be used for the RPC.
    127 // It may block in the following cases:
    128 // - there's no picker
    129 // - the current picker returns ErrNoSubConnAvailable
    130 // - the current picker returns other errors and failfast is false.
    131 // - the subConn returned by the current picker is not READY
    132 // When one of these situations happens, pick blocks until the picker gets updated.
    133 func (bp *pickerWrapper) pick(ctx context.Context, failfast bool, opts balancer.PickOptions) (transport.ClientTransport, func(balancer.DoneInfo), error) {
    134 
    135 	mdKey := bp.getStickinessMDKey()
    136 	stickyKey, isSticky := stickyKeyFromContext(ctx, mdKey)
    137 
    138 	// Potential race here: if stickinessMDKey is updated after the above two
    139 	// lines, and this pick is a sticky pick, the following put could add an
    140 	// entry to sticky store with an outdated sticky key.
    141 	//
    142 	// The solution: keep the current md key in sticky store, and at the
    143 	// beginning of each get/put, check the mdkey against store.curMDKey.
    144 	//  - Cons: one more string comparing for each get/put.
    145 	//  - Pros: the string matching happens inside get/put, so the overhead for
    146 	//  non-sticky RPCs will be minimal.
    147 
    148 	if isSticky {
    149 		if t, ok := bp.stickiness.get(mdKey, stickyKey); ok {
    150 			// Done function returned is always nil.
    151 			return t, nil, nil
    152 		}
    153 	}
    154 
    155 	var (
    156 		p  balancer.Picker
    157 		ch chan struct{}
    158 	)
    159 
    160 	for {
    161 		bp.mu.Lock()
    162 		if bp.done {
    163 			bp.mu.Unlock()
    164 			return nil, nil, ErrClientConnClosing
    165 		}
    166 
    167 		if bp.picker == nil {
    168 			ch = bp.blockingCh
    169 		}
    170 		if ch == bp.blockingCh {
    171 			// This could happen when either:
    172 			// - bp.picker is nil (the previous if condition), or
    173 			// - has called pick on the current picker.
    174 			bp.mu.Unlock()
    175 			select {
    176 			case <-ctx.Done():
    177 				return nil, nil, ctx.Err()
    178 			case <-ch:
    179 			}
    180 			continue
    181 		}
    182 
    183 		ch = bp.blockingCh
    184 		p = bp.picker
    185 		bp.mu.Unlock()
    186 
    187 		subConn, done, err := p.Pick(ctx, opts)
    188 
    189 		if err != nil {
    190 			switch err {
    191 			case balancer.ErrNoSubConnAvailable:
    192 				continue
    193 			case balancer.ErrTransientFailure:
    194 				if !failfast {
    195 					continue
    196 				}
    197 				return nil, nil, status.Errorf(codes.Unavailable, "%v, latest connection error: %v", err, bp.connectionError())
    198 			default:
    199 				// err is some other error.
    200 				return nil, nil, toRPCErr(err)
    201 			}
    202 		}
    203 
    204 		acw, ok := subConn.(*acBalancerWrapper)
    205 		if !ok {
    206 			grpclog.Infof("subconn returned from pick is not *acBalancerWrapper")
    207 			continue
    208 		}
    209 		if t, ok := acw.getAddrConn().getReadyTransport(); ok {
    210 			if isSticky {
    211 				bp.stickiness.put(mdKey, stickyKey, acw)
    212 			}
    213 			if channelz.IsOn() {
    214 				return t, doneChannelzWrapper(acw, done), nil
    215 			}
    216 			return t, done, nil
    217 		}
    218 		grpclog.Infof("blockingPicker: the picked transport is not ready, loop back to repick")
    219 		// If ok == false, ac.state is not READY.
    220 		// A valid picker always returns READY subConn. This means the state of ac
    221 		// just changed, and picker will be updated shortly.
    222 		// continue back to the beginning of the for loop to repick.
    223 	}
    224 }
    225 
    226 func (bp *pickerWrapper) close() {
    227 	bp.mu.Lock()
    228 	defer bp.mu.Unlock()
    229 	if bp.done {
    230 		return
    231 	}
    232 	bp.done = true
    233 	close(bp.blockingCh)
    234 }
    235 
    236 const stickinessKeyCountLimit = 1000
    237 
    238 type stickyStoreEntry struct {
    239 	acw  *acBalancerWrapper
    240 	addr resolver.Address
    241 }
    242 
    243 type stickyStore struct {
    244 	mu sync.Mutex
    245 	// curMDKey is check before every get/put to avoid races. The operation will
    246 	// abort immediately when the given mdKey is different from the curMDKey.
    247 	curMDKey string
    248 	store    *linkedMap
    249 }
    250 
    251 func newStickyStore() *stickyStore {
    252 	return &stickyStore{
    253 		store: newLinkedMap(),
    254 	}
    255 }
    256 
    257 // reset clears the map in stickyStore, and set the currentMDKey to newMDKey.
    258 func (ss *stickyStore) reset(newMDKey string) {
    259 	ss.mu.Lock()
    260 	ss.curMDKey = newMDKey
    261 	ss.store.clear()
    262 	ss.mu.Unlock()
    263 }
    264 
    265 // stickyKey is the key to look up in store. mdKey will be checked against
    266 // curMDKey to avoid races.
    267 func (ss *stickyStore) put(mdKey, stickyKey string, acw *acBalancerWrapper) {
    268 	ss.mu.Lock()
    269 	defer ss.mu.Unlock()
    270 	if mdKey != ss.curMDKey {
    271 		return
    272 	}
    273 	// TODO(stickiness): limit the total number of entries.
    274 	ss.store.put(stickyKey, &stickyStoreEntry{
    275 		acw:  acw,
    276 		addr: acw.getAddrConn().getCurAddr(),
    277 	})
    278 	if ss.store.len() > stickinessKeyCountLimit {
    279 		ss.store.removeOldest()
    280 	}
    281 }
    282 
    283 // stickyKey is the key to look up in store. mdKey will be checked against
    284 // curMDKey to avoid races.
    285 func (ss *stickyStore) get(mdKey, stickyKey string) (transport.ClientTransport, bool) {
    286 	ss.mu.Lock()
    287 	defer ss.mu.Unlock()
    288 	if mdKey != ss.curMDKey {
    289 		return nil, false
    290 	}
    291 	entry, ok := ss.store.get(stickyKey)
    292 	if !ok {
    293 		return nil, false
    294 	}
    295 	ac := entry.acw.getAddrConn()
    296 	if ac.getCurAddr() != entry.addr {
    297 		ss.store.remove(stickyKey)
    298 		return nil, false
    299 	}
    300 	t, ok := ac.getReadyTransport()
    301 	if !ok {
    302 		ss.store.remove(stickyKey)
    303 		return nil, false
    304 	}
    305 	return t, true
    306 }
    307 
    308 // Get one value from metadata in ctx with key stickinessMDKey.
    309 //
    310 // It returns "", false if stickinessMDKey is an empty string.
    311 func stickyKeyFromContext(ctx context.Context, stickinessMDKey string) (string, bool) {
    312 	if stickinessMDKey == "" {
    313 		return "", false
    314 	}
    315 
    316 	md, added, ok := metadata.FromOutgoingContextRaw(ctx)
    317 	if !ok {
    318 		return "", false
    319 	}
    320 
    321 	if vv, ok := md[stickinessMDKey]; ok {
    322 		if len(vv) > 0 {
    323 			return vv[0], true
    324 		}
    325 	}
    326 
    327 	for _, ss := range added {
    328 		for i := 0; i < len(ss)-1; i += 2 {
    329 			if ss[i] == stickinessMDKey {
    330 				return ss[i+1], true
    331 			}
    332 		}
    333 	}
    334 
    335 	return "", false
    336 }
    337