Home | History | Annotate | Download | only in grpc
      1 /*
      2  *
      3  * Copyright 2017 gRPC authors.
      4  *
      5  * Licensed under the Apache License, Version 2.0 (the "License");
      6  * you may not use this file except in compliance with the License.
      7  * You may obtain a copy of the License at
      8  *
      9  *     http://www.apache.org/licenses/LICENSE-2.0
     10  *
     11  * Unless required by applicable law or agreed to in writing, software
     12  * distributed under the License is distributed on an "AS IS" BASIS,
     13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     14  * See the License for the specific language governing permissions and
     15  * limitations under the License.
     16  *
     17  */
     18 
     19 package grpc
     20 
     21 import (
     22 	"fmt"
     23 	"sync"
     24 
     25 	"google.golang.org/grpc/balancer"
     26 	"google.golang.org/grpc/connectivity"
     27 	"google.golang.org/grpc/grpclog"
     28 	"google.golang.org/grpc/resolver"
     29 )
     30 
     31 // scStateUpdate contains the subConn and the new state it changed to.
     32 type scStateUpdate struct {
     33 	sc    balancer.SubConn
     34 	state connectivity.State
     35 }
     36 
     37 // scStateUpdateBuffer is an unbounded channel for scStateChangeTuple.
     38 // TODO make a general purpose buffer that uses interface{}.
     39 type scStateUpdateBuffer struct {
     40 	c       chan *scStateUpdate
     41 	mu      sync.Mutex
     42 	backlog []*scStateUpdate
     43 }
     44 
     45 func newSCStateUpdateBuffer() *scStateUpdateBuffer {
     46 	return &scStateUpdateBuffer{
     47 		c: make(chan *scStateUpdate, 1),
     48 	}
     49 }
     50 
     51 func (b *scStateUpdateBuffer) put(t *scStateUpdate) {
     52 	b.mu.Lock()
     53 	defer b.mu.Unlock()
     54 	if len(b.backlog) == 0 {
     55 		select {
     56 		case b.c <- t:
     57 			return
     58 		default:
     59 		}
     60 	}
     61 	b.backlog = append(b.backlog, t)
     62 }
     63 
     64 func (b *scStateUpdateBuffer) load() {
     65 	b.mu.Lock()
     66 	defer b.mu.Unlock()
     67 	if len(b.backlog) > 0 {
     68 		select {
     69 		case b.c <- b.backlog[0]:
     70 			b.backlog[0] = nil
     71 			b.backlog = b.backlog[1:]
     72 		default:
     73 		}
     74 	}
     75 }
     76 
     77 // get returns the channel that the scStateUpdate will be sent to.
     78 //
     79 // Upon receiving, the caller should call load to send another
     80 // scStateChangeTuple onto the channel if there is any.
     81 func (b *scStateUpdateBuffer) get() <-chan *scStateUpdate {
     82 	return b.c
     83 }
     84 
     85 // resolverUpdate contains the new resolved addresses or error if there's
     86 // any.
     87 type resolverUpdate struct {
     88 	addrs []resolver.Address
     89 	err   error
     90 }
     91 
     92 // ccBalancerWrapper is a wrapper on top of cc for balancers.
     93 // It implements balancer.ClientConn interface.
     94 type ccBalancerWrapper struct {
     95 	cc               *ClientConn
     96 	balancer         balancer.Balancer
     97 	stateChangeQueue *scStateUpdateBuffer
     98 	resolverUpdateCh chan *resolverUpdate
     99 	done             chan struct{}
    100 
    101 	mu       sync.Mutex
    102 	subConns map[*acBalancerWrapper]struct{}
    103 }
    104 
    105 func newCCBalancerWrapper(cc *ClientConn, b balancer.Builder, bopts balancer.BuildOptions) *ccBalancerWrapper {
    106 	ccb := &ccBalancerWrapper{
    107 		cc:               cc,
    108 		stateChangeQueue: newSCStateUpdateBuffer(),
    109 		resolverUpdateCh: make(chan *resolverUpdate, 1),
    110 		done:             make(chan struct{}),
    111 		subConns:         make(map[*acBalancerWrapper]struct{}),
    112 	}
    113 	go ccb.watcher()
    114 	ccb.balancer = b.Build(ccb, bopts)
    115 	return ccb
    116 }
    117 
    118 // watcher balancer functions sequentially, so the balancer can be implemented
    119 // lock-free.
    120 func (ccb *ccBalancerWrapper) watcher() {
    121 	for {
    122 		select {
    123 		case t := <-ccb.stateChangeQueue.get():
    124 			ccb.stateChangeQueue.load()
    125 			select {
    126 			case <-ccb.done:
    127 				ccb.balancer.Close()
    128 				return
    129 			default:
    130 			}
    131 			ccb.balancer.HandleSubConnStateChange(t.sc, t.state)
    132 		case t := <-ccb.resolverUpdateCh:
    133 			select {
    134 			case <-ccb.done:
    135 				ccb.balancer.Close()
    136 				return
    137 			default:
    138 			}
    139 			ccb.balancer.HandleResolvedAddrs(t.addrs, t.err)
    140 		case <-ccb.done:
    141 		}
    142 
    143 		select {
    144 		case <-ccb.done:
    145 			ccb.balancer.Close()
    146 			ccb.mu.Lock()
    147 			scs := ccb.subConns
    148 			ccb.subConns = nil
    149 			ccb.mu.Unlock()
    150 			for acbw := range scs {
    151 				ccb.cc.removeAddrConn(acbw.getAddrConn(), errConnDrain)
    152 			}
    153 			return
    154 		default:
    155 		}
    156 	}
    157 }
    158 
    159 func (ccb *ccBalancerWrapper) close() {
    160 	close(ccb.done)
    161 }
    162 
    163 func (ccb *ccBalancerWrapper) handleSubConnStateChange(sc balancer.SubConn, s connectivity.State) {
    164 	// When updating addresses for a SubConn, if the address in use is not in
    165 	// the new addresses, the old ac will be tearDown() and a new ac will be
    166 	// created. tearDown() generates a state change with Shutdown state, we
    167 	// don't want the balancer to receive this state change. So before
    168 	// tearDown() on the old ac, ac.acbw (acWrapper) will be set to nil, and
    169 	// this function will be called with (nil, Shutdown). We don't need to call
    170 	// balancer method in this case.
    171 	if sc == nil {
    172 		return
    173 	}
    174 	ccb.stateChangeQueue.put(&scStateUpdate{
    175 		sc:    sc,
    176 		state: s,
    177 	})
    178 }
    179 
    180 func (ccb *ccBalancerWrapper) handleResolvedAddrs(addrs []resolver.Address, err error) {
    181 	select {
    182 	case <-ccb.resolverUpdateCh:
    183 	default:
    184 	}
    185 	ccb.resolverUpdateCh <- &resolverUpdate{
    186 		addrs: addrs,
    187 		err:   err,
    188 	}
    189 }
    190 
    191 func (ccb *ccBalancerWrapper) NewSubConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) {
    192 	if len(addrs) <= 0 {
    193 		return nil, fmt.Errorf("grpc: cannot create SubConn with empty address list")
    194 	}
    195 	ccb.mu.Lock()
    196 	defer ccb.mu.Unlock()
    197 	if ccb.subConns == nil {
    198 		return nil, fmt.Errorf("grpc: ClientConn balancer wrapper was closed")
    199 	}
    200 	ac, err := ccb.cc.newAddrConn(addrs)
    201 	if err != nil {
    202 		return nil, err
    203 	}
    204 	acbw := &acBalancerWrapper{ac: ac}
    205 	acbw.ac.mu.Lock()
    206 	ac.acbw = acbw
    207 	acbw.ac.mu.Unlock()
    208 	ccb.subConns[acbw] = struct{}{}
    209 	return acbw, nil
    210 }
    211 
    212 func (ccb *ccBalancerWrapper) RemoveSubConn(sc balancer.SubConn) {
    213 	acbw, ok := sc.(*acBalancerWrapper)
    214 	if !ok {
    215 		return
    216 	}
    217 	ccb.mu.Lock()
    218 	defer ccb.mu.Unlock()
    219 	if ccb.subConns == nil {
    220 		return
    221 	}
    222 	delete(ccb.subConns, acbw)
    223 	ccb.cc.removeAddrConn(acbw.getAddrConn(), errConnDrain)
    224 }
    225 
    226 func (ccb *ccBalancerWrapper) UpdateBalancerState(s connectivity.State, p balancer.Picker) {
    227 	ccb.mu.Lock()
    228 	defer ccb.mu.Unlock()
    229 	if ccb.subConns == nil {
    230 		return
    231 	}
    232 	ccb.cc.csMgr.updateState(s)
    233 	ccb.cc.blockingpicker.updatePicker(p)
    234 }
    235 
    236 func (ccb *ccBalancerWrapper) ResolveNow(o resolver.ResolveNowOption) {
    237 	ccb.cc.resolveNow(o)
    238 }
    239 
    240 func (ccb *ccBalancerWrapper) Target() string {
    241 	return ccb.cc.target
    242 }
    243 
    244 // acBalancerWrapper is a wrapper on top of ac for balancers.
    245 // It implements balancer.SubConn interface.
    246 type acBalancerWrapper struct {
    247 	mu sync.Mutex
    248 	ac *addrConn
    249 }
    250 
    251 func (acbw *acBalancerWrapper) UpdateAddresses(addrs []resolver.Address) {
    252 	acbw.mu.Lock()
    253 	defer acbw.mu.Unlock()
    254 	if len(addrs) <= 0 {
    255 		acbw.ac.tearDown(errConnDrain)
    256 		return
    257 	}
    258 	if !acbw.ac.tryUpdateAddrs(addrs) {
    259 		cc := acbw.ac.cc
    260 		acbw.ac.mu.Lock()
    261 		// Set old ac.acbw to nil so the Shutdown state update will be ignored
    262 		// by balancer.
    263 		//
    264 		// TODO(bar) the state transition could be wrong when tearDown() old ac
    265 		// and creating new ac, fix the transition.
    266 		acbw.ac.acbw = nil
    267 		acbw.ac.mu.Unlock()
    268 		acState := acbw.ac.getState()
    269 		acbw.ac.tearDown(errConnDrain)
    270 
    271 		if acState == connectivity.Shutdown {
    272 			return
    273 		}
    274 
    275 		ac, err := cc.newAddrConn(addrs)
    276 		if err != nil {
    277 			grpclog.Warningf("acBalancerWrapper: UpdateAddresses: failed to newAddrConn: %v", err)
    278 			return
    279 		}
    280 		acbw.ac = ac
    281 		ac.mu.Lock()
    282 		ac.acbw = acbw
    283 		ac.mu.Unlock()
    284 		if acState != connectivity.Idle {
    285 			ac.connect()
    286 		}
    287 	}
    288 }
    289 
    290 func (acbw *acBalancerWrapper) Connect() {
    291 	acbw.mu.Lock()
    292 	defer acbw.mu.Unlock()
    293 	acbw.ac.connect()
    294 }
    295 
    296 func (acbw *acBalancerWrapper) getAddrConn() *addrConn {
    297 	acbw.mu.Lock()
    298 	defer acbw.mu.Unlock()
    299 	return acbw.ac
    300 }
    301