1 /* 2 * 3 * Copyright 2017 gRPC authors. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 * 17 */ 18 19 package grpc 20 21 import ( 22 "strings" 23 "sync" 24 25 "golang.org/x/net/context" 26 "google.golang.org/grpc/balancer" 27 "google.golang.org/grpc/codes" 28 "google.golang.org/grpc/connectivity" 29 "google.golang.org/grpc/grpclog" 30 "google.golang.org/grpc/resolver" 31 "google.golang.org/grpc/status" 32 ) 33 34 type balancerWrapperBuilder struct { 35 b Balancer // The v1 balancer. 36 } 37 38 func (bwb *balancerWrapperBuilder) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer { 39 targetAddr := cc.Target() 40 targetSplitted := strings.Split(targetAddr, ":///") 41 if len(targetSplitted) >= 2 { 42 targetAddr = targetSplitted[1] 43 } 44 45 bwb.b.Start(targetAddr, BalancerConfig{ 46 DialCreds: opts.DialCreds, 47 Dialer: opts.Dialer, 48 }) 49 _, pickfirst := bwb.b.(*pickFirst) 50 bw := &balancerWrapper{ 51 balancer: bwb.b, 52 pickfirst: pickfirst, 53 cc: cc, 54 targetAddr: targetAddr, 55 startCh: make(chan struct{}), 56 conns: make(map[resolver.Address]balancer.SubConn), 57 connSt: make(map[balancer.SubConn]*scState), 58 csEvltr: &balancer.ConnectivityStateEvaluator{}, 59 state: connectivity.Idle, 60 } 61 cc.UpdateBalancerState(connectivity.Idle, bw) 62 go bw.lbWatcher() 63 return bw 64 } 65 66 func (bwb *balancerWrapperBuilder) Name() string { 67 return "wrapper" 68 } 69 70 type scState struct { 71 addr Address // The v1 address type. 72 s connectivity.State 73 down func(error) 74 } 75 76 type balancerWrapper struct { 77 balancer Balancer // The v1 balancer. 78 pickfirst bool 79 80 cc balancer.ClientConn 81 targetAddr string // Target without the scheme. 82 83 mu sync.Mutex 84 conns map[resolver.Address]balancer.SubConn 85 connSt map[balancer.SubConn]*scState 86 // This channel is closed when handling the first resolver result. 87 // lbWatcher blocks until this is closed, to avoid race between 88 // - NewSubConn is created, cc wants to notify balancer of state changes; 89 // - Build hasn't return, cc doesn't have access to balancer. 90 startCh chan struct{} 91 92 // To aggregate the connectivity state. 93 csEvltr *balancer.ConnectivityStateEvaluator 94 state connectivity.State 95 } 96 97 // lbWatcher watches the Notify channel of the balancer and manages 98 // connections accordingly. 99 func (bw *balancerWrapper) lbWatcher() { 100 <-bw.startCh 101 notifyCh := bw.balancer.Notify() 102 if notifyCh == nil { 103 // There's no resolver in the balancer. Connect directly. 104 a := resolver.Address{ 105 Addr: bw.targetAddr, 106 Type: resolver.Backend, 107 } 108 sc, err := bw.cc.NewSubConn([]resolver.Address{a}, balancer.NewSubConnOptions{}) 109 if err != nil { 110 grpclog.Warningf("Error creating connection to %v. Err: %v", a, err) 111 } else { 112 bw.mu.Lock() 113 bw.conns[a] = sc 114 bw.connSt[sc] = &scState{ 115 addr: Address{Addr: bw.targetAddr}, 116 s: connectivity.Idle, 117 } 118 bw.mu.Unlock() 119 sc.Connect() 120 } 121 return 122 } 123 124 for addrs := range notifyCh { 125 grpclog.Infof("balancerWrapper: got update addr from Notify: %v\n", addrs) 126 if bw.pickfirst { 127 var ( 128 oldA resolver.Address 129 oldSC balancer.SubConn 130 ) 131 bw.mu.Lock() 132 for oldA, oldSC = range bw.conns { 133 break 134 } 135 bw.mu.Unlock() 136 if len(addrs) <= 0 { 137 if oldSC != nil { 138 // Teardown old sc. 139 bw.mu.Lock() 140 delete(bw.conns, oldA) 141 delete(bw.connSt, oldSC) 142 bw.mu.Unlock() 143 bw.cc.RemoveSubConn(oldSC) 144 } 145 continue 146 } 147 148 var newAddrs []resolver.Address 149 for _, a := range addrs { 150 newAddr := resolver.Address{ 151 Addr: a.Addr, 152 Type: resolver.Backend, // All addresses from balancer are all backends. 153 ServerName: "", 154 Metadata: a.Metadata, 155 } 156 newAddrs = append(newAddrs, newAddr) 157 } 158 if oldSC == nil { 159 // Create new sc. 160 sc, err := bw.cc.NewSubConn(newAddrs, balancer.NewSubConnOptions{}) 161 if err != nil { 162 grpclog.Warningf("Error creating connection to %v. Err: %v", newAddrs, err) 163 } else { 164 bw.mu.Lock() 165 // For pickfirst, there should be only one SubConn, so the 166 // address doesn't matter. All states updating (up and down) 167 // and picking should all happen on that only SubConn. 168 bw.conns[resolver.Address{}] = sc 169 bw.connSt[sc] = &scState{ 170 addr: addrs[0], // Use the first address. 171 s: connectivity.Idle, 172 } 173 bw.mu.Unlock() 174 sc.Connect() 175 } 176 } else { 177 bw.mu.Lock() 178 bw.connSt[oldSC].addr = addrs[0] 179 bw.mu.Unlock() 180 oldSC.UpdateAddresses(newAddrs) 181 } 182 } else { 183 var ( 184 add []resolver.Address // Addresses need to setup connections. 185 del []balancer.SubConn // Connections need to tear down. 186 ) 187 resAddrs := make(map[resolver.Address]Address) 188 for _, a := range addrs { 189 resAddrs[resolver.Address{ 190 Addr: a.Addr, 191 Type: resolver.Backend, // All addresses from balancer are all backends. 192 ServerName: "", 193 Metadata: a.Metadata, 194 }] = a 195 } 196 bw.mu.Lock() 197 for a := range resAddrs { 198 if _, ok := bw.conns[a]; !ok { 199 add = append(add, a) 200 } 201 } 202 for a, c := range bw.conns { 203 if _, ok := resAddrs[a]; !ok { 204 del = append(del, c) 205 delete(bw.conns, a) 206 // Keep the state of this sc in bw.connSt until its state becomes Shutdown. 207 } 208 } 209 bw.mu.Unlock() 210 for _, a := range add { 211 sc, err := bw.cc.NewSubConn([]resolver.Address{a}, balancer.NewSubConnOptions{}) 212 if err != nil { 213 grpclog.Warningf("Error creating connection to %v. Err: %v", a, err) 214 } else { 215 bw.mu.Lock() 216 bw.conns[a] = sc 217 bw.connSt[sc] = &scState{ 218 addr: resAddrs[a], 219 s: connectivity.Idle, 220 } 221 bw.mu.Unlock() 222 sc.Connect() 223 } 224 } 225 for _, c := range del { 226 bw.cc.RemoveSubConn(c) 227 } 228 } 229 } 230 } 231 232 func (bw *balancerWrapper) HandleSubConnStateChange(sc balancer.SubConn, s connectivity.State) { 233 bw.mu.Lock() 234 defer bw.mu.Unlock() 235 scSt, ok := bw.connSt[sc] 236 if !ok { 237 return 238 } 239 if s == connectivity.Idle { 240 sc.Connect() 241 } 242 oldS := scSt.s 243 scSt.s = s 244 if oldS != connectivity.Ready && s == connectivity.Ready { 245 scSt.down = bw.balancer.Up(scSt.addr) 246 } else if oldS == connectivity.Ready && s != connectivity.Ready { 247 if scSt.down != nil { 248 scSt.down(errConnClosing) 249 } 250 } 251 sa := bw.csEvltr.RecordTransition(oldS, s) 252 if bw.state != sa { 253 bw.state = sa 254 } 255 bw.cc.UpdateBalancerState(bw.state, bw) 256 if s == connectivity.Shutdown { 257 // Remove state for this sc. 258 delete(bw.connSt, sc) 259 } 260 } 261 262 func (bw *balancerWrapper) HandleResolvedAddrs([]resolver.Address, error) { 263 bw.mu.Lock() 264 defer bw.mu.Unlock() 265 select { 266 case <-bw.startCh: 267 default: 268 close(bw.startCh) 269 } 270 // There should be a resolver inside the balancer. 271 // All updates here, if any, are ignored. 272 } 273 274 func (bw *balancerWrapper) Close() { 275 bw.mu.Lock() 276 defer bw.mu.Unlock() 277 select { 278 case <-bw.startCh: 279 default: 280 close(bw.startCh) 281 } 282 bw.balancer.Close() 283 } 284 285 // The picker is the balancerWrapper itself. 286 // Pick should never return ErrNoSubConnAvailable. 287 // It either blocks or returns error, consistent with v1 balancer Get(). 288 func (bw *balancerWrapper) Pick(ctx context.Context, opts balancer.PickOptions) (balancer.SubConn, func(balancer.DoneInfo), error) { 289 failfast := true // Default failfast is true. 290 if ss, ok := rpcInfoFromContext(ctx); ok { 291 failfast = ss.failfast 292 } 293 a, p, err := bw.balancer.Get(ctx, BalancerGetOptions{BlockingWait: !failfast}) 294 if err != nil { 295 return nil, nil, err 296 } 297 var done func(balancer.DoneInfo) 298 if p != nil { 299 done = func(i balancer.DoneInfo) { p() } 300 } 301 var sc balancer.SubConn 302 bw.mu.Lock() 303 defer bw.mu.Unlock() 304 if bw.pickfirst { 305 // Get the first sc in conns. 306 for _, sc = range bw.conns { 307 break 308 } 309 } else { 310 var ok bool 311 sc, ok = bw.conns[resolver.Address{ 312 Addr: a.Addr, 313 Type: resolver.Backend, 314 ServerName: "", 315 Metadata: a.Metadata, 316 }] 317 if !ok && failfast { 318 return nil, nil, status.Errorf(codes.Unavailable, "there is no connection available") 319 } 320 if s, ok := bw.connSt[sc]; failfast && (!ok || s.s != connectivity.Ready) { 321 // If the returned sc is not ready and RPC is failfast, 322 // return error, and this RPC will fail. 323 return nil, nil, status.Errorf(codes.Unavailable, "there is no connection available") 324 } 325 } 326 327 return sc, done, nil 328 } 329