1 /* 2 * 3 * Copyright 2017 gRPC authors. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 * 17 */ 18 19 package grpc 20 21 import ( 22 "fmt" 23 "sync" 24 25 "google.golang.org/grpc/balancer" 26 "google.golang.org/grpc/connectivity" 27 "google.golang.org/grpc/grpclog" 28 "google.golang.org/grpc/resolver" 29 ) 30 31 // scStateUpdate contains the subConn and the new state it changed to. 32 type scStateUpdate struct { 33 sc balancer.SubConn 34 state connectivity.State 35 } 36 37 // scStateUpdateBuffer is an unbounded channel for scStateChangeTuple. 38 // TODO make a general purpose buffer that uses interface{}. 39 type scStateUpdateBuffer struct { 40 c chan *scStateUpdate 41 mu sync.Mutex 42 backlog []*scStateUpdate 43 } 44 45 func newSCStateUpdateBuffer() *scStateUpdateBuffer { 46 return &scStateUpdateBuffer{ 47 c: make(chan *scStateUpdate, 1), 48 } 49 } 50 51 func (b *scStateUpdateBuffer) put(t *scStateUpdate) { 52 b.mu.Lock() 53 defer b.mu.Unlock() 54 if len(b.backlog) == 0 { 55 select { 56 case b.c <- t: 57 return 58 default: 59 } 60 } 61 b.backlog = append(b.backlog, t) 62 } 63 64 func (b *scStateUpdateBuffer) load() { 65 b.mu.Lock() 66 defer b.mu.Unlock() 67 if len(b.backlog) > 0 { 68 select { 69 case b.c <- b.backlog[0]: 70 b.backlog[0] = nil 71 b.backlog = b.backlog[1:] 72 default: 73 } 74 } 75 } 76 77 // get returns the channel that the scStateUpdate will be sent to. 78 // 79 // Upon receiving, the caller should call load to send another 80 // scStateChangeTuple onto the channel if there is any. 81 func (b *scStateUpdateBuffer) get() <-chan *scStateUpdate { 82 return b.c 83 } 84 85 // resolverUpdate contains the new resolved addresses or error if there's 86 // any. 87 type resolverUpdate struct { 88 addrs []resolver.Address 89 err error 90 } 91 92 // ccBalancerWrapper is a wrapper on top of cc for balancers. 93 // It implements balancer.ClientConn interface. 94 type ccBalancerWrapper struct { 95 cc *ClientConn 96 balancer balancer.Balancer 97 stateChangeQueue *scStateUpdateBuffer 98 resolverUpdateCh chan *resolverUpdate 99 done chan struct{} 100 101 mu sync.Mutex 102 subConns map[*acBalancerWrapper]struct{} 103 } 104 105 func newCCBalancerWrapper(cc *ClientConn, b balancer.Builder, bopts balancer.BuildOptions) *ccBalancerWrapper { 106 ccb := &ccBalancerWrapper{ 107 cc: cc, 108 stateChangeQueue: newSCStateUpdateBuffer(), 109 resolverUpdateCh: make(chan *resolverUpdate, 1), 110 done: make(chan struct{}), 111 subConns: make(map[*acBalancerWrapper]struct{}), 112 } 113 go ccb.watcher() 114 ccb.balancer = b.Build(ccb, bopts) 115 return ccb 116 } 117 118 // watcher balancer functions sequentially, so the balancer can be implemented 119 // lock-free. 120 func (ccb *ccBalancerWrapper) watcher() { 121 for { 122 select { 123 case t := <-ccb.stateChangeQueue.get(): 124 ccb.stateChangeQueue.load() 125 select { 126 case <-ccb.done: 127 ccb.balancer.Close() 128 return 129 default: 130 } 131 ccb.balancer.HandleSubConnStateChange(t.sc, t.state) 132 case t := <-ccb.resolverUpdateCh: 133 select { 134 case <-ccb.done: 135 ccb.balancer.Close() 136 return 137 default: 138 } 139 ccb.balancer.HandleResolvedAddrs(t.addrs, t.err) 140 case <-ccb.done: 141 } 142 143 select { 144 case <-ccb.done: 145 ccb.balancer.Close() 146 ccb.mu.Lock() 147 scs := ccb.subConns 148 ccb.subConns = nil 149 ccb.mu.Unlock() 150 for acbw := range scs { 151 ccb.cc.removeAddrConn(acbw.getAddrConn(), errConnDrain) 152 } 153 return 154 default: 155 } 156 } 157 } 158 159 func (ccb *ccBalancerWrapper) close() { 160 close(ccb.done) 161 } 162 163 func (ccb *ccBalancerWrapper) handleSubConnStateChange(sc balancer.SubConn, s connectivity.State) { 164 // When updating addresses for a SubConn, if the address in use is not in 165 // the new addresses, the old ac will be tearDown() and a new ac will be 166 // created. tearDown() generates a state change with Shutdown state, we 167 // don't want the balancer to receive this state change. So before 168 // tearDown() on the old ac, ac.acbw (acWrapper) will be set to nil, and 169 // this function will be called with (nil, Shutdown). We don't need to call 170 // balancer method in this case. 171 if sc == nil { 172 return 173 } 174 ccb.stateChangeQueue.put(&scStateUpdate{ 175 sc: sc, 176 state: s, 177 }) 178 } 179 180 func (ccb *ccBalancerWrapper) handleResolvedAddrs(addrs []resolver.Address, err error) { 181 select { 182 case <-ccb.resolverUpdateCh: 183 default: 184 } 185 ccb.resolverUpdateCh <- &resolverUpdate{ 186 addrs: addrs, 187 err: err, 188 } 189 } 190 191 func (ccb *ccBalancerWrapper) NewSubConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) { 192 if len(addrs) <= 0 { 193 return nil, fmt.Errorf("grpc: cannot create SubConn with empty address list") 194 } 195 ccb.mu.Lock() 196 defer ccb.mu.Unlock() 197 if ccb.subConns == nil { 198 return nil, fmt.Errorf("grpc: ClientConn balancer wrapper was closed") 199 } 200 ac, err := ccb.cc.newAddrConn(addrs) 201 if err != nil { 202 return nil, err 203 } 204 acbw := &acBalancerWrapper{ac: ac} 205 acbw.ac.mu.Lock() 206 ac.acbw = acbw 207 acbw.ac.mu.Unlock() 208 ccb.subConns[acbw] = struct{}{} 209 return acbw, nil 210 } 211 212 func (ccb *ccBalancerWrapper) RemoveSubConn(sc balancer.SubConn) { 213 acbw, ok := sc.(*acBalancerWrapper) 214 if !ok { 215 return 216 } 217 ccb.mu.Lock() 218 defer ccb.mu.Unlock() 219 if ccb.subConns == nil { 220 return 221 } 222 delete(ccb.subConns, acbw) 223 ccb.cc.removeAddrConn(acbw.getAddrConn(), errConnDrain) 224 } 225 226 func (ccb *ccBalancerWrapper) UpdateBalancerState(s connectivity.State, p balancer.Picker) { 227 ccb.mu.Lock() 228 defer ccb.mu.Unlock() 229 if ccb.subConns == nil { 230 return 231 } 232 ccb.cc.csMgr.updateState(s) 233 ccb.cc.blockingpicker.updatePicker(p) 234 } 235 236 func (ccb *ccBalancerWrapper) ResolveNow(o resolver.ResolveNowOption) { 237 ccb.cc.resolveNow(o) 238 } 239 240 func (ccb *ccBalancerWrapper) Target() string { 241 return ccb.cc.target 242 } 243 244 // acBalancerWrapper is a wrapper on top of ac for balancers. 245 // It implements balancer.SubConn interface. 246 type acBalancerWrapper struct { 247 mu sync.Mutex 248 ac *addrConn 249 } 250 251 func (acbw *acBalancerWrapper) UpdateAddresses(addrs []resolver.Address) { 252 acbw.mu.Lock() 253 defer acbw.mu.Unlock() 254 if len(addrs) <= 0 { 255 acbw.ac.tearDown(errConnDrain) 256 return 257 } 258 if !acbw.ac.tryUpdateAddrs(addrs) { 259 cc := acbw.ac.cc 260 acbw.ac.mu.Lock() 261 // Set old ac.acbw to nil so the Shutdown state update will be ignored 262 // by balancer. 263 // 264 // TODO(bar) the state transition could be wrong when tearDown() old ac 265 // and creating new ac, fix the transition. 266 acbw.ac.acbw = nil 267 acbw.ac.mu.Unlock() 268 acState := acbw.ac.getState() 269 acbw.ac.tearDown(errConnDrain) 270 271 if acState == connectivity.Shutdown { 272 return 273 } 274 275 ac, err := cc.newAddrConn(addrs) 276 if err != nil { 277 grpclog.Warningf("acBalancerWrapper: UpdateAddresses: failed to newAddrConn: %v", err) 278 return 279 } 280 acbw.ac = ac 281 ac.mu.Lock() 282 ac.acbw = acbw 283 ac.mu.Unlock() 284 if acState != connectivity.Idle { 285 ac.connect() 286 } 287 } 288 } 289 290 func (acbw *acBalancerWrapper) Connect() { 291 acbw.mu.Lock() 292 defer acbw.mu.Unlock() 293 acbw.ac.connect() 294 } 295 296 func (acbw *acBalancerWrapper) getAddrConn() *addrConn { 297 acbw.mu.Lock() 298 defer acbw.mu.Unlock() 299 return acbw.ac 300 } 301