1 /* 2 * 3 * Copyright 2017 gRPC authors. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 * 17 */ 18 19 package base 20 21 import ( 22 "golang.org/x/net/context" 23 "google.golang.org/grpc/balancer" 24 "google.golang.org/grpc/connectivity" 25 "google.golang.org/grpc/grpclog" 26 "google.golang.org/grpc/resolver" 27 ) 28 29 type baseBuilder struct { 30 name string 31 pickerBuilder PickerBuilder 32 } 33 34 func (bb *baseBuilder) Build(cc balancer.ClientConn, opt balancer.BuildOptions) balancer.Balancer { 35 return &baseBalancer{ 36 cc: cc, 37 pickerBuilder: bb.pickerBuilder, 38 39 subConns: make(map[resolver.Address]balancer.SubConn), 40 scStates: make(map[balancer.SubConn]connectivity.State), 41 csEvltr: &connectivityStateEvaluator{}, 42 // Initialize picker to a picker that always return 43 // ErrNoSubConnAvailable, because when state of a SubConn changes, we 44 // may call UpdateBalancerState with this picker. 45 picker: NewErrPicker(balancer.ErrNoSubConnAvailable), 46 } 47 } 48 49 func (bb *baseBuilder) Name() string { 50 return bb.name 51 } 52 53 type baseBalancer struct { 54 cc balancer.ClientConn 55 pickerBuilder PickerBuilder 56 57 csEvltr *connectivityStateEvaluator 58 state connectivity.State 59 60 subConns map[resolver.Address]balancer.SubConn 61 scStates map[balancer.SubConn]connectivity.State 62 picker balancer.Picker 63 } 64 65 func (b *baseBalancer) HandleResolvedAddrs(addrs []resolver.Address, err error) { 66 if err != nil { 67 grpclog.Infof("base.baseBalancer: HandleResolvedAddrs called with error %v", err) 68 return 69 } 70 grpclog.Infoln("base.baseBalancer: got new resolved addresses: ", addrs) 71 // addrsSet is the set converted from addrs, it's used for quick lookup of an address. 72 addrsSet := make(map[resolver.Address]struct{}) 73 for _, a := range addrs { 74 addrsSet[a] = struct{}{} 75 if _, ok := b.subConns[a]; !ok { 76 // a is a new address (not existing in b.subConns). 77 sc, err := b.cc.NewSubConn([]resolver.Address{a}, balancer.NewSubConnOptions{}) 78 if err != nil { 79 grpclog.Warningf("base.baseBalancer: failed to create new SubConn: %v", err) 80 continue 81 } 82 b.subConns[a] = sc 83 b.scStates[sc] = connectivity.Idle 84 sc.Connect() 85 } 86 } 87 for a, sc := range b.subConns { 88 // a was removed by resolver. 89 if _, ok := addrsSet[a]; !ok { 90 b.cc.RemoveSubConn(sc) 91 delete(b.subConns, a) 92 // Keep the state of this sc in b.scStates until sc's state becomes Shutdown. 93 // The entry will be deleted in HandleSubConnStateChange. 94 } 95 } 96 } 97 98 // regeneratePicker takes a snapshot of the balancer, and generates a picker 99 // from it. The picker is 100 // - errPicker with ErrTransientFailure if the balancer is in TransientFailure, 101 // - built by the pickerBuilder with all READY SubConns otherwise. 102 func (b *baseBalancer) regeneratePicker() { 103 if b.state == connectivity.TransientFailure { 104 b.picker = NewErrPicker(balancer.ErrTransientFailure) 105 return 106 } 107 readySCs := make(map[resolver.Address]balancer.SubConn) 108 109 // Filter out all ready SCs from full subConn map. 110 for addr, sc := range b.subConns { 111 if st, ok := b.scStates[sc]; ok && st == connectivity.Ready { 112 readySCs[addr] = sc 113 } 114 } 115 b.picker = b.pickerBuilder.Build(readySCs) 116 } 117 118 func (b *baseBalancer) HandleSubConnStateChange(sc balancer.SubConn, s connectivity.State) { 119 grpclog.Infof("base.baseBalancer: handle SubConn state change: %p, %v", sc, s) 120 oldS, ok := b.scStates[sc] 121 if !ok { 122 grpclog.Infof("base.baseBalancer: got state changes for an unknown SubConn: %p, %v", sc, s) 123 return 124 } 125 b.scStates[sc] = s 126 switch s { 127 case connectivity.Idle: 128 sc.Connect() 129 case connectivity.Shutdown: 130 // When an address was removed by resolver, b called RemoveSubConn but 131 // kept the sc's state in scStates. Remove state for this sc here. 132 delete(b.scStates, sc) 133 } 134 135 oldAggrState := b.state 136 b.state = b.csEvltr.recordTransition(oldS, s) 137 138 // Regenerate picker when one of the following happens: 139 // - this sc became ready from not-ready 140 // - this sc became not-ready from ready 141 // - the aggregated state of balancer became TransientFailure from non-TransientFailure 142 // - the aggregated state of balancer became non-TransientFailure from TransientFailure 143 if (s == connectivity.Ready) != (oldS == connectivity.Ready) || 144 (b.state == connectivity.TransientFailure) != (oldAggrState == connectivity.TransientFailure) { 145 b.regeneratePicker() 146 } 147 148 b.cc.UpdateBalancerState(b.state, b.picker) 149 } 150 151 // Close is a nop because base balancer doesn't have internal state to clean up, 152 // and it doesn't need to call RemoveSubConn for the SubConns. 153 func (b *baseBalancer) Close() { 154 } 155 156 // NewErrPicker returns a picker that always returns err on Pick(). 157 func NewErrPicker(err error) balancer.Picker { 158 return &errPicker{err: err} 159 } 160 161 type errPicker struct { 162 err error // Pick() always returns this err. 163 } 164 165 func (p *errPicker) Pick(ctx context.Context, opts balancer.PickOptions) (balancer.SubConn, func(balancer.DoneInfo), error) { 166 return nil, nil, p.err 167 } 168 169 // connectivityStateEvaluator gets updated by addrConns when their 170 // states transition, based on which it evaluates the state of 171 // ClientConn. 172 type connectivityStateEvaluator struct { 173 numReady uint64 // Number of addrConns in ready state. 174 numConnecting uint64 // Number of addrConns in connecting state. 175 numTransientFailure uint64 // Number of addrConns in transientFailure. 176 } 177 178 // recordTransition records state change happening in every subConn and based on 179 // that it evaluates what aggregated state should be. 180 // It can only transition between Ready, Connecting and TransientFailure. Other states, 181 // Idle and Shutdown are transitioned into by ClientConn; in the beginning of the connection 182 // before any subConn is created ClientConn is in idle state. In the end when ClientConn 183 // closes it is in Shutdown state. 184 // 185 // recordTransition should only be called synchronously from the same goroutine. 186 func (cse *connectivityStateEvaluator) recordTransition(oldState, newState connectivity.State) connectivity.State { 187 // Update counters. 188 for idx, state := range []connectivity.State{oldState, newState} { 189 updateVal := 2*uint64(idx) - 1 // -1 for oldState and +1 for new. 190 switch state { 191 case connectivity.Ready: 192 cse.numReady += updateVal 193 case connectivity.Connecting: 194 cse.numConnecting += updateVal 195 case connectivity.TransientFailure: 196 cse.numTransientFailure += updateVal 197 } 198 } 199 200 // Evaluate. 201 if cse.numReady > 0 { 202 return connectivity.Ready 203 } 204 if cse.numConnecting > 0 { 205 return connectivity.Connecting 206 } 207 return connectivity.TransientFailure 208 } 209