1 /* 2 * 3 * Copyright 2017 gRPC authors. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 * 17 */ 18 19 package grpc 20 21 import ( 22 "io" 23 "sync" 24 "sync/atomic" 25 26 "golang.org/x/net/context" 27 "google.golang.org/grpc/balancer" 28 "google.golang.org/grpc/codes" 29 "google.golang.org/grpc/grpclog" 30 "google.golang.org/grpc/internal/channelz" 31 "google.golang.org/grpc/metadata" 32 "google.golang.org/grpc/resolver" 33 "google.golang.org/grpc/status" 34 "google.golang.org/grpc/transport" 35 ) 36 37 // pickerWrapper is a wrapper of balancer.Picker. It blocks on certain pick 38 // actions and unblock when there's a picker update. 39 type pickerWrapper struct { 40 mu sync.Mutex 41 done bool 42 blockingCh chan struct{} 43 picker balancer.Picker 44 45 // The latest connection happened. 46 connErrMu sync.Mutex 47 connErr error 48 49 stickinessMDKey atomic.Value 50 stickiness *stickyStore 51 } 52 53 func newPickerWrapper() *pickerWrapper { 54 bp := &pickerWrapper{ 55 blockingCh: make(chan struct{}), 56 stickiness: newStickyStore(), 57 } 58 return bp 59 } 60 61 func (bp *pickerWrapper) updateConnectionError(err error) { 62 bp.connErrMu.Lock() 63 bp.connErr = err 64 bp.connErrMu.Unlock() 65 } 66 67 func (bp *pickerWrapper) connectionError() error { 68 bp.connErrMu.Lock() 69 err := bp.connErr 70 bp.connErrMu.Unlock() 71 return err 72 } 73 74 func (bp *pickerWrapper) updateStickinessMDKey(newKey string) { 75 // No need to check ok because mdKey == "" if ok == false. 76 if oldKey, _ := bp.stickinessMDKey.Load().(string); oldKey != newKey { 77 bp.stickinessMDKey.Store(newKey) 78 bp.stickiness.reset(newKey) 79 } 80 } 81 82 func (bp *pickerWrapper) getStickinessMDKey() string { 83 // No need to check ok because mdKey == "" if ok == false. 84 mdKey, _ := bp.stickinessMDKey.Load().(string) 85 return mdKey 86 } 87 88 func (bp *pickerWrapper) clearStickinessState() { 89 if oldKey := bp.getStickinessMDKey(); oldKey != "" { 90 // There's no need to reset store if mdKey was "". 91 bp.stickiness.reset(oldKey) 92 } 93 } 94 95 // updatePicker is called by UpdateBalancerState. It unblocks all blocked pick. 96 func (bp *pickerWrapper) updatePicker(p balancer.Picker) { 97 bp.mu.Lock() 98 if bp.done { 99 bp.mu.Unlock() 100 return 101 } 102 bp.picker = p 103 // bp.blockingCh should never be nil. 104 close(bp.blockingCh) 105 bp.blockingCh = make(chan struct{}) 106 bp.mu.Unlock() 107 } 108 109 func doneChannelzWrapper(acw *acBalancerWrapper, done func(balancer.DoneInfo)) func(balancer.DoneInfo) { 110 acw.mu.Lock() 111 ac := acw.ac 112 acw.mu.Unlock() 113 ac.incrCallsStarted() 114 return func(b balancer.DoneInfo) { 115 if b.Err != nil && b.Err != io.EOF { 116 ac.incrCallsFailed() 117 } else { 118 ac.incrCallsSucceeded() 119 } 120 if done != nil { 121 done(b) 122 } 123 } 124 } 125 126 // pick returns the transport that will be used for the RPC. 127 // It may block in the following cases: 128 // - there's no picker 129 // - the current picker returns ErrNoSubConnAvailable 130 // - the current picker returns other errors and failfast is false. 131 // - the subConn returned by the current picker is not READY 132 // When one of these situations happens, pick blocks until the picker gets updated. 133 func (bp *pickerWrapper) pick(ctx context.Context, failfast bool, opts balancer.PickOptions) (transport.ClientTransport, func(balancer.DoneInfo), error) { 134 135 mdKey := bp.getStickinessMDKey() 136 stickyKey, isSticky := stickyKeyFromContext(ctx, mdKey) 137 138 // Potential race here: if stickinessMDKey is updated after the above two 139 // lines, and this pick is a sticky pick, the following put could add an 140 // entry to sticky store with an outdated sticky key. 141 // 142 // The solution: keep the current md key in sticky store, and at the 143 // beginning of each get/put, check the mdkey against store.curMDKey. 144 // - Cons: one more string comparing for each get/put. 145 // - Pros: the string matching happens inside get/put, so the overhead for 146 // non-sticky RPCs will be minimal. 147 148 if isSticky { 149 if t, ok := bp.stickiness.get(mdKey, stickyKey); ok { 150 // Done function returned is always nil. 151 return t, nil, nil 152 } 153 } 154 155 var ( 156 p balancer.Picker 157 ch chan struct{} 158 ) 159 160 for { 161 bp.mu.Lock() 162 if bp.done { 163 bp.mu.Unlock() 164 return nil, nil, ErrClientConnClosing 165 } 166 167 if bp.picker == nil { 168 ch = bp.blockingCh 169 } 170 if ch == bp.blockingCh { 171 // This could happen when either: 172 // - bp.picker is nil (the previous if condition), or 173 // - has called pick on the current picker. 174 bp.mu.Unlock() 175 select { 176 case <-ctx.Done(): 177 return nil, nil, ctx.Err() 178 case <-ch: 179 } 180 continue 181 } 182 183 ch = bp.blockingCh 184 p = bp.picker 185 bp.mu.Unlock() 186 187 subConn, done, err := p.Pick(ctx, opts) 188 189 if err != nil { 190 switch err { 191 case balancer.ErrNoSubConnAvailable: 192 continue 193 case balancer.ErrTransientFailure: 194 if !failfast { 195 continue 196 } 197 return nil, nil, status.Errorf(codes.Unavailable, "%v, latest connection error: %v", err, bp.connectionError()) 198 default: 199 // err is some other error. 200 return nil, nil, toRPCErr(err) 201 } 202 } 203 204 acw, ok := subConn.(*acBalancerWrapper) 205 if !ok { 206 grpclog.Infof("subconn returned from pick is not *acBalancerWrapper") 207 continue 208 } 209 if t, ok := acw.getAddrConn().getReadyTransport(); ok { 210 if isSticky { 211 bp.stickiness.put(mdKey, stickyKey, acw) 212 } 213 if channelz.IsOn() { 214 return t, doneChannelzWrapper(acw, done), nil 215 } 216 return t, done, nil 217 } 218 grpclog.Infof("blockingPicker: the picked transport is not ready, loop back to repick") 219 // If ok == false, ac.state is not READY. 220 // A valid picker always returns READY subConn. This means the state of ac 221 // just changed, and picker will be updated shortly. 222 // continue back to the beginning of the for loop to repick. 223 } 224 } 225 226 func (bp *pickerWrapper) close() { 227 bp.mu.Lock() 228 defer bp.mu.Unlock() 229 if bp.done { 230 return 231 } 232 bp.done = true 233 close(bp.blockingCh) 234 } 235 236 const stickinessKeyCountLimit = 1000 237 238 type stickyStoreEntry struct { 239 acw *acBalancerWrapper 240 addr resolver.Address 241 } 242 243 type stickyStore struct { 244 mu sync.Mutex 245 // curMDKey is check before every get/put to avoid races. The operation will 246 // abort immediately when the given mdKey is different from the curMDKey. 247 curMDKey string 248 store *linkedMap 249 } 250 251 func newStickyStore() *stickyStore { 252 return &stickyStore{ 253 store: newLinkedMap(), 254 } 255 } 256 257 // reset clears the map in stickyStore, and set the currentMDKey to newMDKey. 258 func (ss *stickyStore) reset(newMDKey string) { 259 ss.mu.Lock() 260 ss.curMDKey = newMDKey 261 ss.store.clear() 262 ss.mu.Unlock() 263 } 264 265 // stickyKey is the key to look up in store. mdKey will be checked against 266 // curMDKey to avoid races. 267 func (ss *stickyStore) put(mdKey, stickyKey string, acw *acBalancerWrapper) { 268 ss.mu.Lock() 269 defer ss.mu.Unlock() 270 if mdKey != ss.curMDKey { 271 return 272 } 273 // TODO(stickiness): limit the total number of entries. 274 ss.store.put(stickyKey, &stickyStoreEntry{ 275 acw: acw, 276 addr: acw.getAddrConn().getCurAddr(), 277 }) 278 if ss.store.len() > stickinessKeyCountLimit { 279 ss.store.removeOldest() 280 } 281 } 282 283 // stickyKey is the key to look up in store. mdKey will be checked against 284 // curMDKey to avoid races. 285 func (ss *stickyStore) get(mdKey, stickyKey string) (transport.ClientTransport, bool) { 286 ss.mu.Lock() 287 defer ss.mu.Unlock() 288 if mdKey != ss.curMDKey { 289 return nil, false 290 } 291 entry, ok := ss.store.get(stickyKey) 292 if !ok { 293 return nil, false 294 } 295 ac := entry.acw.getAddrConn() 296 if ac.getCurAddr() != entry.addr { 297 ss.store.remove(stickyKey) 298 return nil, false 299 } 300 t, ok := ac.getReadyTransport() 301 if !ok { 302 ss.store.remove(stickyKey) 303 return nil, false 304 } 305 return t, true 306 } 307 308 // Get one value from metadata in ctx with key stickinessMDKey. 309 // 310 // It returns "", false if stickinessMDKey is an empty string. 311 func stickyKeyFromContext(ctx context.Context, stickinessMDKey string) (string, bool) { 312 if stickinessMDKey == "" { 313 return "", false 314 } 315 316 md, added, ok := metadata.FromOutgoingContextRaw(ctx) 317 if !ok { 318 return "", false 319 } 320 321 if vv, ok := md[stickinessMDKey]; ok { 322 if len(vv) > 0 { 323 return vv[0], true 324 } 325 } 326 327 for _, ss := range added { 328 for i := 0; i < len(ss)-1; i += 2 { 329 if ss[i] == stickinessMDKey { 330 return ss[i+1], true 331 } 332 } 333 } 334 335 return "", false 336 } 337