Home | History | Annotate | Download | only in grpc
      1 /*
      2  *
      3  * Copyright 2017 gRPC authors.
      4  *
      5  * Licensed under the Apache License, Version 2.0 (the "License");
      6  * you may not use this file except in compliance with the License.
      7  * You may obtain a copy of the License at
      8  *
      9  *     http://www.apache.org/licenses/LICENSE-2.0
     10  *
     11  * Unless required by applicable law or agreed to in writing, software
     12  * distributed under the License is distributed on an "AS IS" BASIS,
     13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     14  * See the License for the specific language governing permissions and
     15  * limitations under the License.
     16  *
     17  */
     18 
     19 package grpc
     20 
     21 import (
     22 	"strings"
     23 	"sync"
     24 
     25 	"golang.org/x/net/context"
     26 	"google.golang.org/grpc/balancer"
     27 	"google.golang.org/grpc/codes"
     28 	"google.golang.org/grpc/connectivity"
     29 	"google.golang.org/grpc/grpclog"
     30 	"google.golang.org/grpc/resolver"
     31 	"google.golang.org/grpc/status"
     32 )
     33 
     34 type balancerWrapperBuilder struct {
     35 	b Balancer // The v1 balancer.
     36 }
     37 
     38 func (bwb *balancerWrapperBuilder) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer {
     39 	targetAddr := cc.Target()
     40 	targetSplitted := strings.Split(targetAddr, ":///")
     41 	if len(targetSplitted) >= 2 {
     42 		targetAddr = targetSplitted[1]
     43 	}
     44 
     45 	bwb.b.Start(targetAddr, BalancerConfig{
     46 		DialCreds: opts.DialCreds,
     47 		Dialer:    opts.Dialer,
     48 	})
     49 	_, pickfirst := bwb.b.(*pickFirst)
     50 	bw := &balancerWrapper{
     51 		balancer:   bwb.b,
     52 		pickfirst:  pickfirst,
     53 		cc:         cc,
     54 		targetAddr: targetAddr,
     55 		startCh:    make(chan struct{}),
     56 		conns:      make(map[resolver.Address]balancer.SubConn),
     57 		connSt:     make(map[balancer.SubConn]*scState),
     58 		csEvltr:    &balancer.ConnectivityStateEvaluator{},
     59 		state:      connectivity.Idle,
     60 	}
     61 	cc.UpdateBalancerState(connectivity.Idle, bw)
     62 	go bw.lbWatcher()
     63 	return bw
     64 }
     65 
     66 func (bwb *balancerWrapperBuilder) Name() string {
     67 	return "wrapper"
     68 }
     69 
     70 type scState struct {
     71 	addr Address // The v1 address type.
     72 	s    connectivity.State
     73 	down func(error)
     74 }
     75 
     76 type balancerWrapper struct {
     77 	balancer  Balancer // The v1 balancer.
     78 	pickfirst bool
     79 
     80 	cc         balancer.ClientConn
     81 	targetAddr string // Target without the scheme.
     82 
     83 	mu     sync.Mutex
     84 	conns  map[resolver.Address]balancer.SubConn
     85 	connSt map[balancer.SubConn]*scState
     86 	// This channel is closed when handling the first resolver result.
     87 	// lbWatcher blocks until this is closed, to avoid race between
     88 	// - NewSubConn is created, cc wants to notify balancer of state changes;
     89 	// - Build hasn't return, cc doesn't have access to balancer.
     90 	startCh chan struct{}
     91 
     92 	// To aggregate the connectivity state.
     93 	csEvltr *balancer.ConnectivityStateEvaluator
     94 	state   connectivity.State
     95 }
     96 
     97 // lbWatcher watches the Notify channel of the balancer and manages
     98 // connections accordingly.
     99 func (bw *balancerWrapper) lbWatcher() {
    100 	<-bw.startCh
    101 	notifyCh := bw.balancer.Notify()
    102 	if notifyCh == nil {
    103 		// There's no resolver in the balancer. Connect directly.
    104 		a := resolver.Address{
    105 			Addr: bw.targetAddr,
    106 			Type: resolver.Backend,
    107 		}
    108 		sc, err := bw.cc.NewSubConn([]resolver.Address{a}, balancer.NewSubConnOptions{})
    109 		if err != nil {
    110 			grpclog.Warningf("Error creating connection to %v. Err: %v", a, err)
    111 		} else {
    112 			bw.mu.Lock()
    113 			bw.conns[a] = sc
    114 			bw.connSt[sc] = &scState{
    115 				addr: Address{Addr: bw.targetAddr},
    116 				s:    connectivity.Idle,
    117 			}
    118 			bw.mu.Unlock()
    119 			sc.Connect()
    120 		}
    121 		return
    122 	}
    123 
    124 	for addrs := range notifyCh {
    125 		grpclog.Infof("balancerWrapper: got update addr from Notify: %v\n", addrs)
    126 		if bw.pickfirst {
    127 			var (
    128 				oldA  resolver.Address
    129 				oldSC balancer.SubConn
    130 			)
    131 			bw.mu.Lock()
    132 			for oldA, oldSC = range bw.conns {
    133 				break
    134 			}
    135 			bw.mu.Unlock()
    136 			if len(addrs) <= 0 {
    137 				if oldSC != nil {
    138 					// Teardown old sc.
    139 					bw.mu.Lock()
    140 					delete(bw.conns, oldA)
    141 					delete(bw.connSt, oldSC)
    142 					bw.mu.Unlock()
    143 					bw.cc.RemoveSubConn(oldSC)
    144 				}
    145 				continue
    146 			}
    147 
    148 			var newAddrs []resolver.Address
    149 			for _, a := range addrs {
    150 				newAddr := resolver.Address{
    151 					Addr:       a.Addr,
    152 					Type:       resolver.Backend, // All addresses from balancer are all backends.
    153 					ServerName: "",
    154 					Metadata:   a.Metadata,
    155 				}
    156 				newAddrs = append(newAddrs, newAddr)
    157 			}
    158 			if oldSC == nil {
    159 				// Create new sc.
    160 				sc, err := bw.cc.NewSubConn(newAddrs, balancer.NewSubConnOptions{})
    161 				if err != nil {
    162 					grpclog.Warningf("Error creating connection to %v. Err: %v", newAddrs, err)
    163 				} else {
    164 					bw.mu.Lock()
    165 					// For pickfirst, there should be only one SubConn, so the
    166 					// address doesn't matter. All states updating (up and down)
    167 					// and picking should all happen on that only SubConn.
    168 					bw.conns[resolver.Address{}] = sc
    169 					bw.connSt[sc] = &scState{
    170 						addr: addrs[0], // Use the first address.
    171 						s:    connectivity.Idle,
    172 					}
    173 					bw.mu.Unlock()
    174 					sc.Connect()
    175 				}
    176 			} else {
    177 				bw.mu.Lock()
    178 				bw.connSt[oldSC].addr = addrs[0]
    179 				bw.mu.Unlock()
    180 				oldSC.UpdateAddresses(newAddrs)
    181 			}
    182 		} else {
    183 			var (
    184 				add []resolver.Address // Addresses need to setup connections.
    185 				del []balancer.SubConn // Connections need to tear down.
    186 			)
    187 			resAddrs := make(map[resolver.Address]Address)
    188 			for _, a := range addrs {
    189 				resAddrs[resolver.Address{
    190 					Addr:       a.Addr,
    191 					Type:       resolver.Backend, // All addresses from balancer are all backends.
    192 					ServerName: "",
    193 					Metadata:   a.Metadata,
    194 				}] = a
    195 			}
    196 			bw.mu.Lock()
    197 			for a := range resAddrs {
    198 				if _, ok := bw.conns[a]; !ok {
    199 					add = append(add, a)
    200 				}
    201 			}
    202 			for a, c := range bw.conns {
    203 				if _, ok := resAddrs[a]; !ok {
    204 					del = append(del, c)
    205 					delete(bw.conns, a)
    206 					// Keep the state of this sc in bw.connSt until its state becomes Shutdown.
    207 				}
    208 			}
    209 			bw.mu.Unlock()
    210 			for _, a := range add {
    211 				sc, err := bw.cc.NewSubConn([]resolver.Address{a}, balancer.NewSubConnOptions{})
    212 				if err != nil {
    213 					grpclog.Warningf("Error creating connection to %v. Err: %v", a, err)
    214 				} else {
    215 					bw.mu.Lock()
    216 					bw.conns[a] = sc
    217 					bw.connSt[sc] = &scState{
    218 						addr: resAddrs[a],
    219 						s:    connectivity.Idle,
    220 					}
    221 					bw.mu.Unlock()
    222 					sc.Connect()
    223 				}
    224 			}
    225 			for _, c := range del {
    226 				bw.cc.RemoveSubConn(c)
    227 			}
    228 		}
    229 	}
    230 }
    231 
    232 func (bw *balancerWrapper) HandleSubConnStateChange(sc balancer.SubConn, s connectivity.State) {
    233 	bw.mu.Lock()
    234 	defer bw.mu.Unlock()
    235 	scSt, ok := bw.connSt[sc]
    236 	if !ok {
    237 		return
    238 	}
    239 	if s == connectivity.Idle {
    240 		sc.Connect()
    241 	}
    242 	oldS := scSt.s
    243 	scSt.s = s
    244 	if oldS != connectivity.Ready && s == connectivity.Ready {
    245 		scSt.down = bw.balancer.Up(scSt.addr)
    246 	} else if oldS == connectivity.Ready && s != connectivity.Ready {
    247 		if scSt.down != nil {
    248 			scSt.down(errConnClosing)
    249 		}
    250 	}
    251 	sa := bw.csEvltr.RecordTransition(oldS, s)
    252 	if bw.state != sa {
    253 		bw.state = sa
    254 	}
    255 	bw.cc.UpdateBalancerState(bw.state, bw)
    256 	if s == connectivity.Shutdown {
    257 		// Remove state for this sc.
    258 		delete(bw.connSt, sc)
    259 	}
    260 }
    261 
    262 func (bw *balancerWrapper) HandleResolvedAddrs([]resolver.Address, error) {
    263 	bw.mu.Lock()
    264 	defer bw.mu.Unlock()
    265 	select {
    266 	case <-bw.startCh:
    267 	default:
    268 		close(bw.startCh)
    269 	}
    270 	// There should be a resolver inside the balancer.
    271 	// All updates here, if any, are ignored.
    272 }
    273 
    274 func (bw *balancerWrapper) Close() {
    275 	bw.mu.Lock()
    276 	defer bw.mu.Unlock()
    277 	select {
    278 	case <-bw.startCh:
    279 	default:
    280 		close(bw.startCh)
    281 	}
    282 	bw.balancer.Close()
    283 }
    284 
    285 // The picker is the balancerWrapper itself.
    286 // Pick should never return ErrNoSubConnAvailable.
    287 // It either blocks or returns error, consistent with v1 balancer Get().
    288 func (bw *balancerWrapper) Pick(ctx context.Context, opts balancer.PickOptions) (balancer.SubConn, func(balancer.DoneInfo), error) {
    289 	failfast := true // Default failfast is true.
    290 	if ss, ok := rpcInfoFromContext(ctx); ok {
    291 		failfast = ss.failfast
    292 	}
    293 	a, p, err := bw.balancer.Get(ctx, BalancerGetOptions{BlockingWait: !failfast})
    294 	if err != nil {
    295 		return nil, nil, err
    296 	}
    297 	var done func(balancer.DoneInfo)
    298 	if p != nil {
    299 		done = func(i balancer.DoneInfo) { p() }
    300 	}
    301 	var sc balancer.SubConn
    302 	bw.mu.Lock()
    303 	defer bw.mu.Unlock()
    304 	if bw.pickfirst {
    305 		// Get the first sc in conns.
    306 		for _, sc = range bw.conns {
    307 			break
    308 		}
    309 	} else {
    310 		var ok bool
    311 		sc, ok = bw.conns[resolver.Address{
    312 			Addr:       a.Addr,
    313 			Type:       resolver.Backend,
    314 			ServerName: "",
    315 			Metadata:   a.Metadata,
    316 		}]
    317 		if !ok && failfast {
    318 			return nil, nil, status.Errorf(codes.Unavailable, "there is no connection available")
    319 		}
    320 		if s, ok := bw.connSt[sc]; failfast && (!ok || s.s != connectivity.Ready) {
    321 			// If the returned sc is not ready and RPC is failfast,
    322 			// return error, and this RPC will fail.
    323 			return nil, nil, status.Errorf(codes.Unavailable, "there is no connection available")
    324 		}
    325 	}
    326 
    327 	return sc, done, nil
    328 }
    329