Home | History | Annotate | Download | only in base
      1 /*
      2  *
      3  * Copyright 2017 gRPC authors.
      4  *
      5  * Licensed under the Apache License, Version 2.0 (the "License");
      6  * you may not use this file except in compliance with the License.
      7  * You may obtain a copy of the License at
      8  *
      9  *     http://www.apache.org/licenses/LICENSE-2.0
     10  *
     11  * Unless required by applicable law or agreed to in writing, software
     12  * distributed under the License is distributed on an "AS IS" BASIS,
     13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     14  * See the License for the specific language governing permissions and
     15  * limitations under the License.
     16  *
     17  */
     18 
     19 package base
     20 
     21 import (
     22 	"golang.org/x/net/context"
     23 	"google.golang.org/grpc/balancer"
     24 	"google.golang.org/grpc/connectivity"
     25 	"google.golang.org/grpc/grpclog"
     26 	"google.golang.org/grpc/resolver"
     27 )
     28 
     29 type baseBuilder struct {
     30 	name          string
     31 	pickerBuilder PickerBuilder
     32 }
     33 
     34 func (bb *baseBuilder) Build(cc balancer.ClientConn, opt balancer.BuildOptions) balancer.Balancer {
     35 	return &baseBalancer{
     36 		cc:            cc,
     37 		pickerBuilder: bb.pickerBuilder,
     38 
     39 		subConns: make(map[resolver.Address]balancer.SubConn),
     40 		scStates: make(map[balancer.SubConn]connectivity.State),
     41 		csEvltr:  &connectivityStateEvaluator{},
     42 		// Initialize picker to a picker that always return
     43 		// ErrNoSubConnAvailable, because when state of a SubConn changes, we
     44 		// may call UpdateBalancerState with this picker.
     45 		picker: NewErrPicker(balancer.ErrNoSubConnAvailable),
     46 	}
     47 }
     48 
     49 func (bb *baseBuilder) Name() string {
     50 	return bb.name
     51 }
     52 
     53 type baseBalancer struct {
     54 	cc            balancer.ClientConn
     55 	pickerBuilder PickerBuilder
     56 
     57 	csEvltr *connectivityStateEvaluator
     58 	state   connectivity.State
     59 
     60 	subConns map[resolver.Address]balancer.SubConn
     61 	scStates map[balancer.SubConn]connectivity.State
     62 	picker   balancer.Picker
     63 }
     64 
     65 func (b *baseBalancer) HandleResolvedAddrs(addrs []resolver.Address, err error) {
     66 	if err != nil {
     67 		grpclog.Infof("base.baseBalancer: HandleResolvedAddrs called with error %v", err)
     68 		return
     69 	}
     70 	grpclog.Infoln("base.baseBalancer: got new resolved addresses: ", addrs)
     71 	// addrsSet is the set converted from addrs, it's used for quick lookup of an address.
     72 	addrsSet := make(map[resolver.Address]struct{})
     73 	for _, a := range addrs {
     74 		addrsSet[a] = struct{}{}
     75 		if _, ok := b.subConns[a]; !ok {
     76 			// a is a new address (not existing in b.subConns).
     77 			sc, err := b.cc.NewSubConn([]resolver.Address{a}, balancer.NewSubConnOptions{})
     78 			if err != nil {
     79 				grpclog.Warningf("base.baseBalancer: failed to create new SubConn: %v", err)
     80 				continue
     81 			}
     82 			b.subConns[a] = sc
     83 			b.scStates[sc] = connectivity.Idle
     84 			sc.Connect()
     85 		}
     86 	}
     87 	for a, sc := range b.subConns {
     88 		// a was removed by resolver.
     89 		if _, ok := addrsSet[a]; !ok {
     90 			b.cc.RemoveSubConn(sc)
     91 			delete(b.subConns, a)
     92 			// Keep the state of this sc in b.scStates until sc's state becomes Shutdown.
     93 			// The entry will be deleted in HandleSubConnStateChange.
     94 		}
     95 	}
     96 }
     97 
     98 // regeneratePicker takes a snapshot of the balancer, and generates a picker
     99 // from it. The picker is
    100 //  - errPicker with ErrTransientFailure if the balancer is in TransientFailure,
    101 //  - built by the pickerBuilder with all READY SubConns otherwise.
    102 func (b *baseBalancer) regeneratePicker() {
    103 	if b.state == connectivity.TransientFailure {
    104 		b.picker = NewErrPicker(balancer.ErrTransientFailure)
    105 		return
    106 	}
    107 	readySCs := make(map[resolver.Address]balancer.SubConn)
    108 
    109 	// Filter out all ready SCs from full subConn map.
    110 	for addr, sc := range b.subConns {
    111 		if st, ok := b.scStates[sc]; ok && st == connectivity.Ready {
    112 			readySCs[addr] = sc
    113 		}
    114 	}
    115 	b.picker = b.pickerBuilder.Build(readySCs)
    116 }
    117 
    118 func (b *baseBalancer) HandleSubConnStateChange(sc balancer.SubConn, s connectivity.State) {
    119 	grpclog.Infof("base.baseBalancer: handle SubConn state change: %p, %v", sc, s)
    120 	oldS, ok := b.scStates[sc]
    121 	if !ok {
    122 		grpclog.Infof("base.baseBalancer: got state changes for an unknown SubConn: %p, %v", sc, s)
    123 		return
    124 	}
    125 	b.scStates[sc] = s
    126 	switch s {
    127 	case connectivity.Idle:
    128 		sc.Connect()
    129 	case connectivity.Shutdown:
    130 		// When an address was removed by resolver, b called RemoveSubConn but
    131 		// kept the sc's state in scStates. Remove state for this sc here.
    132 		delete(b.scStates, sc)
    133 	}
    134 
    135 	oldAggrState := b.state
    136 	b.state = b.csEvltr.recordTransition(oldS, s)
    137 
    138 	// Regenerate picker when one of the following happens:
    139 	//  - this sc became ready from not-ready
    140 	//  - this sc became not-ready from ready
    141 	//  - the aggregated state of balancer became TransientFailure from non-TransientFailure
    142 	//  - the aggregated state of balancer became non-TransientFailure from TransientFailure
    143 	if (s == connectivity.Ready) != (oldS == connectivity.Ready) ||
    144 		(b.state == connectivity.TransientFailure) != (oldAggrState == connectivity.TransientFailure) {
    145 		b.regeneratePicker()
    146 	}
    147 
    148 	b.cc.UpdateBalancerState(b.state, b.picker)
    149 }
    150 
    151 // Close is a nop because base balancer doesn't have internal state to clean up,
    152 // and it doesn't need to call RemoveSubConn for the SubConns.
    153 func (b *baseBalancer) Close() {
    154 }
    155 
    156 // NewErrPicker returns a picker that always returns err on Pick().
    157 func NewErrPicker(err error) balancer.Picker {
    158 	return &errPicker{err: err}
    159 }
    160 
    161 type errPicker struct {
    162 	err error // Pick() always returns this err.
    163 }
    164 
    165 func (p *errPicker) Pick(ctx context.Context, opts balancer.PickOptions) (balancer.SubConn, func(balancer.DoneInfo), error) {
    166 	return nil, nil, p.err
    167 }
    168 
    169 // connectivityStateEvaluator gets updated by addrConns when their
    170 // states transition, based on which it evaluates the state of
    171 // ClientConn.
    172 type connectivityStateEvaluator struct {
    173 	numReady            uint64 // Number of addrConns in ready state.
    174 	numConnecting       uint64 // Number of addrConns in connecting state.
    175 	numTransientFailure uint64 // Number of addrConns in transientFailure.
    176 }
    177 
    178 // recordTransition records state change happening in every subConn and based on
    179 // that it evaluates what aggregated state should be.
    180 // It can only transition between Ready, Connecting and TransientFailure. Other states,
    181 // Idle and Shutdown are transitioned into by ClientConn; in the beginning of the connection
    182 // before any subConn is created ClientConn is in idle state. In the end when ClientConn
    183 // closes it is in Shutdown state.
    184 //
    185 // recordTransition should only be called synchronously from the same goroutine.
    186 func (cse *connectivityStateEvaluator) recordTransition(oldState, newState connectivity.State) connectivity.State {
    187 	// Update counters.
    188 	for idx, state := range []connectivity.State{oldState, newState} {
    189 		updateVal := 2*uint64(idx) - 1 // -1 for oldState and +1 for new.
    190 		switch state {
    191 		case connectivity.Ready:
    192 			cse.numReady += updateVal
    193 		case connectivity.Connecting:
    194 			cse.numConnecting += updateVal
    195 		case connectivity.TransientFailure:
    196 			cse.numTransientFailure += updateVal
    197 		}
    198 	}
    199 
    200 	// Evaluate.
    201 	if cse.numReady > 0 {
    202 		return connectivity.Ready
    203 	}
    204 	if cse.numConnecting > 0 {
    205 		return connectivity.Connecting
    206 	}
    207 	return connectivity.TransientFailure
    208 }
    209