Home | History | Annotate | Download | only in balancer
      1 /*
      2  *
      3  * Copyright 2017 gRPC authors.
      4  *
      5  * Licensed under the Apache License, Version 2.0 (the "License");
      6  * you may not use this file except in compliance with the License.
      7  * You may obtain a copy of the License at
      8  *
      9  *     http://www.apache.org/licenses/LICENSE-2.0
     10  *
     11  * Unless required by applicable law or agreed to in writing, software
     12  * distributed under the License is distributed on an "AS IS" BASIS,
     13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     14  * See the License for the specific language governing permissions and
     15  * limitations under the License.
     16  *
     17  */
     18 
     19 // Package balancer defines APIs for load balancing in gRPC.
     20 // All APIs in this package are experimental.
     21 package balancer
     22 
     23 import (
     24 	"errors"
     25 	"net"
     26 	"strings"
     27 
     28 	"golang.org/x/net/context"
     29 	"google.golang.org/grpc/connectivity"
     30 	"google.golang.org/grpc/credentials"
     31 	"google.golang.org/grpc/resolver"
     32 )
     33 
     34 var (
     35 	// m is a map from name to balancer builder.
     36 	m = make(map[string]Builder)
     37 )
     38 
     39 // Register registers the balancer builder to the balancer map. b.Name
     40 // (lowercased) will be used as the name registered with this builder.
     41 //
     42 // NOTE: this function must only be called during initialization time (i.e. in
     43 // an init() function), and is not thread-safe. If multiple Balancers are
     44 // registered with the same name, the one registered last will take effect.
     45 func Register(b Builder) {
     46 	m[strings.ToLower(b.Name())] = b
     47 }
     48 
     49 // Get returns the resolver builder registered with the given name.
     50 // Note that the compare is done in a case-insenstive fashion.
     51 // If no builder is register with the name, nil will be returned.
     52 func Get(name string) Builder {
     53 	if b, ok := m[strings.ToLower(name)]; ok {
     54 		return b
     55 	}
     56 	return nil
     57 }
     58 
// SubConn represents a gRPC sub connection.
// Each sub connection contains a list of addresses. gRPC will
// try to connect to them (in sequence), and stop trying the
// remainder once one connection is successful.
//
// The reconnect backoff will be applied on the list, not a single address.
// For example, try_on_all_addresses -> backoff -> try_on_all_addresses.
//
// All SubConns start in IDLE, and will not try to connect. To trigger
// the connecting, Balancers must call Connect.
// When the connection encounters an error, it will reconnect immediately.
// When the connection becomes IDLE, it will not reconnect unless Connect is
// called.
//
// This interface is to be implemented by gRPC. Users should not need a
// brand new implementation of this interface. For situations like
// testing, the new implementation should embed this interface. This allows
// gRPC to add new methods to this interface.
type SubConn interface {
	// UpdateAddresses updates the addresses used in this SubConn.
	// gRPC checks if the currently-connected address is still in the new list.
	// If it's in the list, the connection will be kept.
	// If it's not in the list, the connection will be gracefully closed, and
	// a new connection will be created.
	//
	// This will trigger a state transition for the SubConn.
	UpdateAddresses([]resolver.Address)
	// Connect starts the connecting for this SubConn.
	Connect()
}
     89 
// NewSubConnOptions contains options to create a new SubConn.
// It currently declares no fields.
type NewSubConnOptions struct{}
     92 
// ClientConn represents a gRPC ClientConn.
//
// This interface is to be implemented by gRPC. Users should not need a
// brand new implementation of this interface. For situations like
// testing, the new implementation should embed this interface. This allows
// gRPC to add new methods to this interface.
type ClientConn interface {
	// NewSubConn is called by the balancer to create a new SubConn.
	// It doesn't block and wait for the connections to be established.
	// Behaviors of the SubConn can be controlled by options.
	NewSubConn([]resolver.Address, NewSubConnOptions) (SubConn, error)
	// RemoveSubConn removes the SubConn from ClientConn.
	// The SubConn will be shut down.
	RemoveSubConn(SubConn)

	// UpdateBalancerState is called by the balancer to notify gRPC that some
	// internal state in the balancer has changed.
	//
	// gRPC will update the connectivity state of the ClientConn, and will call pick
	// on the new picker to pick a new SubConn.
	UpdateBalancerState(s connectivity.State, p Picker)

	// ResolveNow is called by the balancer to notify gRPC to do a name resolving.
	ResolveNow(resolver.ResolveNowOption)

	// Target returns the dial target for this ClientConn.
	Target() string
}
    121 
// BuildOptions contains additional information for Build.
type BuildOptions struct {
	// DialCreds is the transport credential the Balancer implementation can
	// use to dial to a remote load balancer server. The Balancer implementation
	// can ignore this if it does not need to talk to another party securely.
	DialCreds credentials.TransportCredentials
	// Dialer is the custom dialer the Balancer implementation can use to dial
	// to a remote load balancer server. The Balancer implementation
	// can ignore this if it doesn't need to talk to a remote balancer.
	Dialer func(context.Context, string) (net.Conn, error)
	// ChannelzParentID is the entity parent's channelz unique identification number.
	ChannelzParentID int64
}
    135 
// Builder creates a balancer.
//
// Builders are stored by Register and looked up by Get under the lowercased
// value returned by Name.
type Builder interface {
	// Build creates a new balancer with the ClientConn.
	Build(cc ClientConn, opts BuildOptions) Balancer
	// Name returns the name of balancers built by this builder.
	// It will be used to pick balancers (for example in service config).
	Name() string
}
    144 
// PickOptions contains additional information for the Pick operation.
// It currently declares no fields.
type PickOptions struct{}
    147 
// DoneInfo contains additional information for done, the callback that
// Picker.Pick may return alongside a SubConn.
type DoneInfo struct {
	// Err is the rpc error the RPC finished with. It could be nil.
	Err error
	// BytesSent indicates if any bytes have been sent to the server.
	BytesSent bool
	// BytesReceived indicates if any bytes have been received from the server.
	BytesReceived bool
}
    157 
// Sentinel errors a Picker can return from Pick.
var (
	// ErrNoSubConnAvailable indicates no SubConn is available for pick().
	// gRPC will block the RPC until a new picker is available via UpdateBalancerState().
	ErrNoSubConnAvailable = errors.New("no SubConn is available")
	// ErrTransientFailure indicates all SubConns are in TransientFailure.
	// WaitForReady RPCs will block, non-WaitForReady RPCs will fail.
	ErrTransientFailure = errors.New("all SubConns are in TransientFailure")
)
    166 
// Picker is used by gRPC to pick a SubConn to send an RPC.
// Balancer is expected to generate a new picker from its snapshot every time its
// internal state has changed.
//
// The pickers used by gRPC can be updated by ClientConn.UpdateBalancerState().
type Picker interface {
	// Pick returns the SubConn to be used to send the RPC.
	// The returned SubConn must be one returned by NewSubConn().
	//
	// This function is expected to return:
	// - a SubConn that is known to be READY;
	// - ErrNoSubConnAvailable if no SubConn is available, but progress is being
	//   made (for example, some SubConn is in CONNECTING mode);
	// - other errors if no active connecting is happening (for example, all SubConns
	//   are in TRANSIENT_FAILURE mode).
	//
	// If a SubConn is returned:
	// - If it is READY, gRPC will send the RPC on it;
	// - If it is not ready, or becomes not ready after it's returned, gRPC will block
	//   until UpdateBalancerState() is called and will call pick on the new picker.
	//
	// If the returned error is not nil:
	// - If the error is ErrNoSubConnAvailable, gRPC will block until UpdateBalancerState()
	//   is called to pick again;
	// - If the error is ErrTransientFailure:
	//   - If the RPC is wait-for-ready, gRPC will block until UpdateBalancerState()
	//     is called to pick again;
	//   - Otherwise, RPC will fail with unavailable error.
	// - Else (error is other non-nil error):
	//   - The RPC will fail with unavailable error.
	//
	// The returned done() function will be called once the rpc has finished, with the
	// final status of that RPC.
	// done may be nil if balancer doesn't care about the RPC status.
	Pick(ctx context.Context, opts PickOptions) (conn SubConn, done func(DoneInfo), err error)
}
    202 
// Balancer takes input from gRPC, manages SubConns, and collects and aggregates
// the connectivity states.
//
// It also generates and updates the Picker used by gRPC to pick SubConns for RPCs.
//
// HandleSubConnStateChange, HandleResolvedAddrs and Close are guaranteed
// to be called synchronously from the same goroutine.
// There's no guarantee on picker.Pick, it may be called anytime.
type Balancer interface {
	// HandleSubConnStateChange is called by gRPC when the connectivity state
	// of sc has changed.
	// Balancer is expected to aggregate all the state of SubConn and report
	// that back to gRPC.
	// Balancer should also generate and update Pickers when its internal state has
	// been changed by the new state.
	HandleSubConnStateChange(sc SubConn, state connectivity.State)
	// HandleResolvedAddrs is called by gRPC to send updated resolved addresses to
	// balancers.
	// Balancer can create new SubConns or remove SubConns with the addresses.
	// An empty address slice and a non-nil error will be passed if the resolver returns
	// a non-nil error to gRPC.
	HandleResolvedAddrs([]resolver.Address, error)
	// Close closes the balancer. The balancer is not required to call
	// ClientConn.RemoveSubConn for its existing SubConns.
	Close()
}
    229 
// ConnectivityStateEvaluator takes the connectivity states of multiple SubConns
// and returns one aggregated connectivity state.
//
// The counters below are updated by RecordTransition. The zero value has all
// counters at zero.
//
// It's not thread safe.
type ConnectivityStateEvaluator struct {
	numReady            uint64 // Number of addrConns in ready state.
	numConnecting       uint64 // Number of addrConns in connecting state.
	numTransientFailure uint64 // Number of addrConns in transientFailure.
}
    239 
    240 // RecordTransition records state change happening in subConn and based on that
    241 // it evaluates what aggregated state should be.
    242 //
    243 //  - If at least one SubConn in Ready, the aggregated state is Ready;
    244 //  - Else if at least one SubConn in Connecting, the aggregated state is Connecting;
    245 //  - Else the aggregated state is TransientFailure.
    246 //
    247 // Idle and Shutdown are not considered.
    248 func (cse *ConnectivityStateEvaluator) RecordTransition(oldState, newState connectivity.State) connectivity.State {
    249 	// Update counters.
    250 	for idx, state := range []connectivity.State{oldState, newState} {
    251 		updateVal := 2*uint64(idx) - 1 // -1 for oldState and +1 for new.
    252 		switch state {
    253 		case connectivity.Ready:
    254 			cse.numReady += updateVal
    255 		case connectivity.Connecting:
    256 			cse.numConnecting += updateVal
    257 		case connectivity.TransientFailure:
    258 			cse.numTransientFailure += updateVal
    259 		}
    260 	}
    261 
    262 	// Evaluate.
    263 	if cse.numReady > 0 {
    264 		return connectivity.Ready
    265 	}
    266 	if cse.numConnecting > 0 {
    267 		return connectivity.Connecting
    268 	}
    269 	return connectivity.TransientFailure
    270 }
    271