Home | History | Annotate | Download | only in grpc
      1 /*
      2  *
      3  * Copyright 2014 gRPC authors.
      4  *
      5  * Licensed under the Apache License, Version 2.0 (the "License");
      6  * you may not use this file except in compliance with the License.
      7  * You may obtain a copy of the License at
      8  *
      9  *     http://www.apache.org/licenses/LICENSE-2.0
     10  *
     11  * Unless required by applicable law or agreed to in writing, software
     12  * distributed under the License is distributed on an "AS IS" BASIS,
     13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     14  * See the License for the specific language governing permissions and
     15  * limitations under the License.
     16  *
     17  */
     18 
     19 package grpc
     20 
     21 import (
     22 	"errors"
     23 	"fmt"
     24 	"math"
     25 	"net"
     26 	"reflect"
     27 	"strings"
     28 	"sync"
     29 	"time"
     30 
     31 	"golang.org/x/net/context"
     32 	"golang.org/x/net/trace"
     33 	"google.golang.org/grpc/balancer"
     34 	_ "google.golang.org/grpc/balancer/roundrobin" // To register roundrobin.
     35 	"google.golang.org/grpc/codes"
     36 	"google.golang.org/grpc/connectivity"
     37 	"google.golang.org/grpc/credentials"
     38 	"google.golang.org/grpc/grpclog"
     39 	"google.golang.org/grpc/internal"
     40 	"google.golang.org/grpc/internal/backoff"
     41 	"google.golang.org/grpc/internal/channelz"
     42 	"google.golang.org/grpc/keepalive"
     43 	"google.golang.org/grpc/resolver"
     44 	_ "google.golang.org/grpc/resolver/dns"         // To register dns resolver.
     45 	_ "google.golang.org/grpc/resolver/passthrough" // To register passthrough resolver.
     46 	"google.golang.org/grpc/stats"
     47 	"google.golang.org/grpc/status"
     48 	"google.golang.org/grpc/transport"
     49 )
     50 
const (
	// minConnectTimeout is the minimum time to give a connection to complete.
	// It is exposed to the rest of the package through getMinConnectTimeout.
	minConnectTimeout = 20 * time.Second
	// grpclbName is the balancer name chosen when a resolved address list
	// contains an address of type resolver.GRPCLB.
	// It must match grpclbName in grpclb/grpclb.go.
	grpclbName = "grpclb"
)
     57 
var (
	// ErrClientConnClosing indicates that the operation is illegal because
	// the ClientConn is closing. It is a status error carrying codes.Canceled.
	//
	// Deprecated: this error should not be relied upon by users; use the status
	// code of Canceled instead.
	ErrClientConnClosing = status.Error(codes.Canceled, "grpc: the client connection is closing")
	// errConnDrain indicates that the connection starts to be drained and does not accept any new RPCs.
	errConnDrain = errors.New("grpc: the connection is drained")
	// errConnClosing indicates that the connection is closing.
	errConnClosing = errors.New("grpc: the connection is closing")
	// errConnUnavailable indicates that the connection is unavailable.
	errConnUnavailable = errors.New("grpc: the connection is unavailable")
	// errBalancerClosed indicates that the balancer is closed.
	errBalancerClosed = errors.New("grpc: balancer is closed")
	// getMinConnectTimeout returns minConnectTimeout. We use an accessor
	// (a replaceable package-level func value) so that minConnectTimeout can
	// be atomically read and updated while testing.
	getMinConnectTimeout = func() time.Duration {
		return minConnectTimeout
	}
)
     79 
// The following errors are returned from Dial and DialContext.
var (
	// errNoTransportSecurity indicates that there is no transport security
	// being set for ClientConn. Users should either set one or explicitly
	// call WithInsecure DialOption to disable security.
	errNoTransportSecurity = errors.New("grpc: no transport security set (use grpc.WithInsecure() explicitly or set credentials)")
	// errTransportCredentialsMissing indicates that users want to transmit security
	// information (e.g., oauth2 token) which requires secure connection on an insecure
	// connection. DialContext returns it when WithInsecure is set and any
	// per-RPC credential reports RequireTransportSecurity().
	errTransportCredentialsMissing = errors.New("grpc: the credentials require transport level security (use grpc.WithTransportCredentials() to set)")
	// errCredentialsConflict indicates that grpc.WithTransportCredentials()
	// and grpc.WithInsecure() are both called for a connection.
	errCredentialsConflict = errors.New("grpc: transport credentials are set for an insecure connection (grpc.WithTransportCredentials() and grpc.WithInsecure() are both called)")
	// errNetworkIO indicates that the connection is down due to some network I/O error.
	errNetworkIO = errors.New("grpc: failed with network I/O error")
)
     96 
// dialOptions configure a Dial call. dialOptions are set by the DialOption
// values passed to Dial.
type dialOptions struct {
	// Client interceptors, set by WithUnaryInterceptor / WithStreamInterceptor.
	unaryInt    UnaryClientInterceptor
	streamInt   StreamClientInterceptor
	// Compressor / Decompressor set by the deprecated WithCompressor and
	// WithDecompressor options.
	cp          Compressor
	dc          Decompressor
	// Backoff strategy; DialContext falls back to backoff.Exponential with
	// DefaultBackoffConfig.MaxDelay when this is nil.
	bs          backoff.Strategy
	// block makes DialContext wait until the ClientConn reaches Ready.
	block       bool
	// insecure is set by WithInsecure and disables the transport-security
	// requirement checked in DialContext.
	insecure    bool
	// timeout, when > 0, bounds DialContext via context.WithTimeout.
	timeout     time.Duration
	// scChan delivers service configs supplied through WithServiceConfig.
	scChan      <-chan ServiceConfig
	// copts holds the transport-level connect options (credentials, dialer,
	// buffer and window sizes, user agent, keepalive, stats handler, ...).
	copts       transport.ConnectOptions
	// callOptions accumulates the defaults added by WithDefaultCallOptions.
	callOptions []CallOption
	// This is used by v1 balancer dial option WithBalancer to support v1
	// balancer, and also by WithBalancerName dial option.
	balancerBuilder balancer.Builder
	// This is to support grpclb.
	resolverBuilder      resolver.Builder
	// Set by WithWaitForHandshake.
	waitForHandshake     bool
	// channelz ID of the parent channel, set by WithChannelzParentID.
	channelzParentID     int64
	// Set by WithDisableServiceConfig.
	disableServiceConfig bool
}
    120 
// Default client-side message size limits.
// NOTE(review): presumably expressed in bytes (4 MiB receive, MaxInt32 send)
// and applied when no per-call limit is configured; the code that consumes
// these constants is not in this part of the file, so confirm there.
const (
	defaultClientMaxReceiveMessageSize = 1024 * 1024 * 4
	defaultClientMaxSendMessageSize    = math.MaxInt32
)
    125 
// RegisterChannelz turns on channelz service by calling channelz.TurnOn.
// DialContext only registers a ClientConn with channelz when channelz.IsOn()
// reports true.
// This is an EXPERIMENTAL API.
func RegisterChannelz() {
	channelz.TurnOn()
}
    131 
// DialOption configures how we set up the connection. Each DialOption is a
// function that mutates the dialOptions of the ClientConn being created;
// DialContext applies them in the order they are passed.
type DialOption func(*dialOptions)
    134 
    135 // WithWaitForHandshake blocks until the initial settings frame is received from the
    136 // server before assigning RPCs to the connection.
    137 // Experimental API.
    138 func WithWaitForHandshake() DialOption {
    139 	return func(o *dialOptions) {
    140 		o.waitForHandshake = true
    141 	}
    142 }
    143 
    144 // WithWriteBufferSize lets you set the size of write buffer, this determines how much data can be batched
    145 // before doing a write on the wire.
    146 func WithWriteBufferSize(s int) DialOption {
    147 	return func(o *dialOptions) {
    148 		o.copts.WriteBufferSize = s
    149 	}
    150 }
    151 
    152 // WithReadBufferSize lets you set the size of read buffer, this determines how much data can be read at most
    153 // for each read syscall.
    154 func WithReadBufferSize(s int) DialOption {
    155 	return func(o *dialOptions) {
    156 		o.copts.ReadBufferSize = s
    157 	}
    158 }
    159 
    160 // WithInitialWindowSize returns a DialOption which sets the value for initial window size on a stream.
    161 // The lower bound for window size is 64K and any value smaller than that will be ignored.
    162 func WithInitialWindowSize(s int32) DialOption {
    163 	return func(o *dialOptions) {
    164 		o.copts.InitialWindowSize = s
    165 	}
    166 }
    167 
    168 // WithInitialConnWindowSize returns a DialOption which sets the value for initial window size on a connection.
    169 // The lower bound for window size is 64K and any value smaller than that will be ignored.
    170 func WithInitialConnWindowSize(s int32) DialOption {
    171 	return func(o *dialOptions) {
    172 		o.copts.InitialConnWindowSize = s
    173 	}
    174 }
    175 
    176 // WithMaxMsgSize returns a DialOption which sets the maximum message size the client can receive.
    177 //
    178 // Deprecated: use WithDefaultCallOptions(MaxCallRecvMsgSize(s)) instead.
    179 func WithMaxMsgSize(s int) DialOption {
    180 	return WithDefaultCallOptions(MaxCallRecvMsgSize(s))
    181 }
    182 
    183 // WithDefaultCallOptions returns a DialOption which sets the default CallOptions for calls over the connection.
    184 func WithDefaultCallOptions(cos ...CallOption) DialOption {
    185 	return func(o *dialOptions) {
    186 		o.callOptions = append(o.callOptions, cos...)
    187 	}
    188 }
    189 
    190 // WithCodec returns a DialOption which sets a codec for message marshaling and unmarshaling.
    191 //
    192 // Deprecated: use WithDefaultCallOptions(CallCustomCodec(c)) instead.
    193 func WithCodec(c Codec) DialOption {
    194 	return WithDefaultCallOptions(CallCustomCodec(c))
    195 }
    196 
    197 // WithCompressor returns a DialOption which sets a Compressor to use for
    198 // message compression. It has lower priority than the compressor set by
    199 // the UseCompressor CallOption.
    200 //
    201 // Deprecated: use UseCompressor instead.
    202 func WithCompressor(cp Compressor) DialOption {
    203 	return func(o *dialOptions) {
    204 		o.cp = cp
    205 	}
    206 }
    207 
    208 // WithDecompressor returns a DialOption which sets a Decompressor to use for
    209 // incoming message decompression.  If incoming response messages are encoded
    210 // using the decompressor's Type(), it will be used.  Otherwise, the message
    211 // encoding will be used to look up the compressor registered via
    212 // encoding.RegisterCompressor, which will then be used to decompress the
    213 // message.  If no compressor is registered for the encoding, an Unimplemented
    214 // status error will be returned.
    215 //
    216 // Deprecated: use encoding.RegisterCompressor instead.
    217 func WithDecompressor(dc Decompressor) DialOption {
    218 	return func(o *dialOptions) {
    219 		o.dc = dc
    220 	}
    221 }
    222 
    223 // WithBalancer returns a DialOption which sets a load balancer with the v1 API.
    224 // Name resolver will be ignored if this DialOption is specified.
    225 //
    226 // Deprecated: use the new balancer APIs in balancer package and WithBalancerName.
    227 func WithBalancer(b Balancer) DialOption {
    228 	return func(o *dialOptions) {
    229 		o.balancerBuilder = &balancerWrapperBuilder{
    230 			b: b,
    231 		}
    232 	}
    233 }
    234 
    235 // WithBalancerName sets the balancer that the ClientConn will be initialized
    236 // with. Balancer registered with balancerName will be used. This function
    237 // panics if no balancer was registered by balancerName.
    238 //
    239 // The balancer cannot be overridden by balancer option specified by service
    240 // config.
    241 //
    242 // This is an EXPERIMENTAL API.
    243 func WithBalancerName(balancerName string) DialOption {
    244 	builder := balancer.Get(balancerName)
    245 	if builder == nil {
    246 		panic(fmt.Sprintf("grpc.WithBalancerName: no balancer is registered for name %v", balancerName))
    247 	}
    248 	return func(o *dialOptions) {
    249 		o.balancerBuilder = builder
    250 	}
    251 }
    252 
    253 // withResolverBuilder is only for grpclb.
    254 func withResolverBuilder(b resolver.Builder) DialOption {
    255 	return func(o *dialOptions) {
    256 		o.resolverBuilder = b
    257 	}
    258 }
    259 
    260 // WithServiceConfig returns a DialOption which has a channel to read the service configuration.
    261 //
    262 // Deprecated: service config should be received through name resolver, as specified here.
    263 // https://github.com/grpc/grpc/blob/master/doc/service_config.md
    264 func WithServiceConfig(c <-chan ServiceConfig) DialOption {
    265 	return func(o *dialOptions) {
    266 		o.scChan = c
    267 	}
    268 }
    269 
    270 // WithBackoffMaxDelay configures the dialer to use the provided maximum delay
    271 // when backing off after failed connection attempts.
    272 func WithBackoffMaxDelay(md time.Duration) DialOption {
    273 	return WithBackoffConfig(BackoffConfig{MaxDelay: md})
    274 }
    275 
    276 // WithBackoffConfig configures the dialer to use the provided backoff
    277 // parameters after connection failures.
    278 //
    279 // Use WithBackoffMaxDelay until more parameters on BackoffConfig are opened up
    280 // for use.
    281 func WithBackoffConfig(b BackoffConfig) DialOption {
    282 
    283 	return withBackoff(backoff.Exponential{
    284 		MaxDelay: b.MaxDelay,
    285 	})
    286 }
    287 
    288 // withBackoff sets the backoff strategy used for connectRetryNum after a
    289 // failed connection attempt.
    290 //
    291 // This can be exported if arbitrary backoff strategies are allowed by gRPC.
    292 func withBackoff(bs backoff.Strategy) DialOption {
    293 	return func(o *dialOptions) {
    294 		o.bs = bs
    295 	}
    296 }
    297 
    298 // WithBlock returns a DialOption which makes caller of Dial blocks until the underlying
    299 // connection is up. Without this, Dial returns immediately and connecting the server
    300 // happens in background.
    301 func WithBlock() DialOption {
    302 	return func(o *dialOptions) {
    303 		o.block = true
    304 	}
    305 }
    306 
    307 // WithInsecure returns a DialOption which disables transport security for this ClientConn.
    308 // Note that transport security is required unless WithInsecure is set.
    309 func WithInsecure() DialOption {
    310 	return func(o *dialOptions) {
    311 		o.insecure = true
    312 	}
    313 }
    314 
    315 // WithTransportCredentials returns a DialOption which configures a
    316 // connection level security credentials (e.g., TLS/SSL).
    317 func WithTransportCredentials(creds credentials.TransportCredentials) DialOption {
    318 	return func(o *dialOptions) {
    319 		o.copts.TransportCredentials = creds
    320 	}
    321 }
    322 
    323 // WithPerRPCCredentials returns a DialOption which sets
    324 // credentials and places auth state on each outbound RPC.
    325 func WithPerRPCCredentials(creds credentials.PerRPCCredentials) DialOption {
    326 	return func(o *dialOptions) {
    327 		o.copts.PerRPCCredentials = append(o.copts.PerRPCCredentials, creds)
    328 	}
    329 }
    330 
    331 // WithTimeout returns a DialOption that configures a timeout for dialing a ClientConn
    332 // initially. This is valid if and only if WithBlock() is present.
    333 //
    334 // Deprecated: use DialContext and context.WithTimeout instead.
    335 func WithTimeout(d time.Duration) DialOption {
    336 	return func(o *dialOptions) {
    337 		o.timeout = d
    338 	}
    339 }
    340 
    341 func withContextDialer(f func(context.Context, string) (net.Conn, error)) DialOption {
    342 	return func(o *dialOptions) {
    343 		o.copts.Dialer = f
    344 	}
    345 }
    346 
    347 func init() {
    348 	internal.WithContextDialer = withContextDialer
    349 	internal.WithResolverBuilder = withResolverBuilder
    350 }
    351 
    352 // WithDialer returns a DialOption that specifies a function to use for dialing network addresses.
    353 // If FailOnNonTempDialError() is set to true, and an error is returned by f, gRPC checks the error's
    354 // Temporary() method to decide if it should try to reconnect to the network address.
    355 func WithDialer(f func(string, time.Duration) (net.Conn, error)) DialOption {
    356 	return withContextDialer(
    357 		func(ctx context.Context, addr string) (net.Conn, error) {
    358 			if deadline, ok := ctx.Deadline(); ok {
    359 				return f(addr, deadline.Sub(time.Now()))
    360 			}
    361 			return f(addr, 0)
    362 		})
    363 }
    364 
    365 // WithStatsHandler returns a DialOption that specifies the stats handler
    366 // for all the RPCs and underlying network connections in this ClientConn.
    367 func WithStatsHandler(h stats.Handler) DialOption {
    368 	return func(o *dialOptions) {
    369 		o.copts.StatsHandler = h
    370 	}
    371 }
    372 
    373 // FailOnNonTempDialError returns a DialOption that specifies if gRPC fails on non-temporary dial errors.
    374 // If f is true, and dialer returns a non-temporary error, gRPC will fail the connection to the network
    375 // address and won't try to reconnect.
    376 // The default value of FailOnNonTempDialError is false.
    377 // This is an EXPERIMENTAL API.
    378 func FailOnNonTempDialError(f bool) DialOption {
    379 	return func(o *dialOptions) {
    380 		o.copts.FailOnNonTempDialError = f
    381 	}
    382 }
    383 
    384 // WithUserAgent returns a DialOption that specifies a user agent string for all the RPCs.
    385 func WithUserAgent(s string) DialOption {
    386 	return func(o *dialOptions) {
    387 		o.copts.UserAgent = s
    388 	}
    389 }
    390 
    391 // WithKeepaliveParams returns a DialOption that specifies keepalive parameters for the client transport.
    392 func WithKeepaliveParams(kp keepalive.ClientParameters) DialOption {
    393 	return func(o *dialOptions) {
    394 		o.copts.KeepaliveParams = kp
    395 	}
    396 }
    397 
    398 // WithUnaryInterceptor returns a DialOption that specifies the interceptor for unary RPCs.
    399 func WithUnaryInterceptor(f UnaryClientInterceptor) DialOption {
    400 	return func(o *dialOptions) {
    401 		o.unaryInt = f
    402 	}
    403 }
    404 
    405 // WithStreamInterceptor returns a DialOption that specifies the interceptor for streaming RPCs.
    406 func WithStreamInterceptor(f StreamClientInterceptor) DialOption {
    407 	return func(o *dialOptions) {
    408 		o.streamInt = f
    409 	}
    410 }
    411 
    412 // WithAuthority returns a DialOption that specifies the value to be used as
    413 // the :authority pseudo-header. This value only works with WithInsecure and
    414 // has no effect if TransportCredentials are present.
    415 func WithAuthority(a string) DialOption {
    416 	return func(o *dialOptions) {
    417 		o.copts.Authority = a
    418 	}
    419 }
    420 
    421 // WithChannelzParentID returns a DialOption that specifies the channelz ID of current ClientConn's
    422 // parent. This function is used in nested channel creation (e.g. grpclb dial).
    423 func WithChannelzParentID(id int64) DialOption {
    424 	return func(o *dialOptions) {
    425 		o.channelzParentID = id
    426 	}
    427 }
    428 
    429 // WithDisableServiceConfig returns a DialOption that causes grpc to ignore any
    430 // service config provided by the resolver and provides a hint to the resolver
    431 // to not fetch service configs.
    432 func WithDisableServiceConfig() DialOption {
    433 	return func(o *dialOptions) {
    434 		o.disableServiceConfig = true
    435 	}
    436 }
    437 
    438 // Dial creates a client connection to the given target.
    439 func Dial(target string, opts ...DialOption) (*ClientConn, error) {
    440 	return DialContext(context.Background(), target, opts...)
    441 }
    442 
    443 // DialContext creates a client connection to the given target. By default, it's
    444 // a non-blocking dial (the function won't wait for connections to be
    445 // established, and connecting happens in the background). To make it a blocking
    446 // dial, use WithBlock() dial option.
    447 //
    448 // In the non-blocking case, the ctx does not act against the connection. It
    449 // only controls the setup steps.
    450 //
    451 // In the blocking case, ctx can be used to cancel or expire the pending
    452 // connection. Once this function returns, the cancellation and expiration of
    453 // ctx will be noop. Users should call ClientConn.Close to terminate all the
    454 // pending operations after this function returns.
    455 //
    456 // The target name syntax is defined in
    457 // https://github.com/grpc/grpc/blob/master/doc/naming.md.
    458 // e.g. to use dns resolver, a "dns:///" prefix should be applied to the target.
    459 func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *ClientConn, err error) {
    460 	cc := &ClientConn{
    461 		target: target,
    462 		csMgr:  &connectivityStateManager{},
    463 		conns:  make(map[*addrConn]struct{}),
    464 
    465 		blockingpicker: newPickerWrapper(),
    466 	}
    467 	cc.ctx, cc.cancel = context.WithCancel(context.Background())
    468 
    469 	for _, opt := range opts {
    470 		opt(&cc.dopts)
    471 	}
    472 
    473 	if channelz.IsOn() {
    474 		if cc.dopts.channelzParentID != 0 {
    475 			cc.channelzID = channelz.RegisterChannel(cc, cc.dopts.channelzParentID, target)
    476 		} else {
    477 			cc.channelzID = channelz.RegisterChannel(cc, 0, target)
    478 		}
    479 	}
    480 
    481 	if !cc.dopts.insecure {
    482 		if cc.dopts.copts.TransportCredentials == nil {
    483 			return nil, errNoTransportSecurity
    484 		}
    485 	} else {
    486 		if cc.dopts.copts.TransportCredentials != nil {
    487 			return nil, errCredentialsConflict
    488 		}
    489 		for _, cd := range cc.dopts.copts.PerRPCCredentials {
    490 			if cd.RequireTransportSecurity() {
    491 				return nil, errTransportCredentialsMissing
    492 			}
    493 		}
    494 	}
    495 
    496 	cc.mkp = cc.dopts.copts.KeepaliveParams
    497 
    498 	if cc.dopts.copts.Dialer == nil {
    499 		cc.dopts.copts.Dialer = newProxyDialer(
    500 			func(ctx context.Context, addr string) (net.Conn, error) {
    501 				network, addr := parseDialTarget(addr)
    502 				return dialContext(ctx, network, addr)
    503 			},
    504 		)
    505 	}
    506 
    507 	if cc.dopts.copts.UserAgent != "" {
    508 		cc.dopts.copts.UserAgent += " " + grpcUA
    509 	} else {
    510 		cc.dopts.copts.UserAgent = grpcUA
    511 	}
    512 
    513 	if cc.dopts.timeout > 0 {
    514 		var cancel context.CancelFunc
    515 		ctx, cancel = context.WithTimeout(ctx, cc.dopts.timeout)
    516 		defer cancel()
    517 	}
    518 
    519 	defer func() {
    520 		select {
    521 		case <-ctx.Done():
    522 			conn, err = nil, ctx.Err()
    523 		default:
    524 		}
    525 
    526 		if err != nil {
    527 			cc.Close()
    528 		}
    529 	}()
    530 
    531 	scSet := false
    532 	if cc.dopts.scChan != nil {
    533 		// Try to get an initial service config.
    534 		select {
    535 		case sc, ok := <-cc.dopts.scChan:
    536 			if ok {
    537 				cc.sc = sc
    538 				scSet = true
    539 			}
    540 		default:
    541 		}
    542 	}
    543 	if cc.dopts.bs == nil {
    544 		cc.dopts.bs = backoff.Exponential{
    545 			MaxDelay: DefaultBackoffConfig.MaxDelay,
    546 		}
    547 	}
    548 	if cc.dopts.resolverBuilder == nil {
    549 		// Only try to parse target when resolver builder is not already set.
    550 		cc.parsedTarget = parseTarget(cc.target)
    551 		grpclog.Infof("parsed scheme: %q", cc.parsedTarget.Scheme)
    552 		cc.dopts.resolverBuilder = resolver.Get(cc.parsedTarget.Scheme)
    553 		if cc.dopts.resolverBuilder == nil {
    554 			// If resolver builder is still nil, the parse target's scheme is
    555 			// not registered. Fallback to default resolver and set Endpoint to
    556 			// the original unparsed target.
    557 			grpclog.Infof("scheme %q not registered, fallback to default scheme", cc.parsedTarget.Scheme)
    558 			cc.parsedTarget = resolver.Target{
    559 				Scheme:   resolver.GetDefaultScheme(),
    560 				Endpoint: target,
    561 			}
    562 			cc.dopts.resolverBuilder = resolver.Get(cc.parsedTarget.Scheme)
    563 		}
    564 	} else {
    565 		cc.parsedTarget = resolver.Target{Endpoint: target}
    566 	}
    567 	creds := cc.dopts.copts.TransportCredentials
    568 	if creds != nil && creds.Info().ServerName != "" {
    569 		cc.authority = creds.Info().ServerName
    570 	} else if cc.dopts.insecure && cc.dopts.copts.Authority != "" {
    571 		cc.authority = cc.dopts.copts.Authority
    572 	} else {
    573 		// Use endpoint from "scheme://authority/endpoint" as the default
    574 		// authority for ClientConn.
    575 		cc.authority = cc.parsedTarget.Endpoint
    576 	}
    577 
    578 	if cc.dopts.scChan != nil && !scSet {
    579 		// Blocking wait for the initial service config.
    580 		select {
    581 		case sc, ok := <-cc.dopts.scChan:
    582 			if ok {
    583 				cc.sc = sc
    584 			}
    585 		case <-ctx.Done():
    586 			return nil, ctx.Err()
    587 		}
    588 	}
    589 	if cc.dopts.scChan != nil {
    590 		go cc.scWatcher()
    591 	}
    592 
    593 	var credsClone credentials.TransportCredentials
    594 	if creds := cc.dopts.copts.TransportCredentials; creds != nil {
    595 		credsClone = creds.Clone()
    596 	}
    597 	cc.balancerBuildOpts = balancer.BuildOptions{
    598 		DialCreds:        credsClone,
    599 		Dialer:           cc.dopts.copts.Dialer,
    600 		ChannelzParentID: cc.channelzID,
    601 	}
    602 
    603 	// Build the resolver.
    604 	cc.resolverWrapper, err = newCCResolverWrapper(cc)
    605 	if err != nil {
    606 		return nil, fmt.Errorf("failed to build resolver: %v", err)
    607 	}
    608 	// Start the resolver wrapper goroutine after resolverWrapper is created.
    609 	//
    610 	// If the goroutine is started before resolverWrapper is ready, the
    611 	// following may happen: The goroutine sends updates to cc. cc forwards
    612 	// those to balancer. Balancer creates new addrConn. addrConn fails to
    613 	// connect, and calls resolveNow(). resolveNow() tries to use the non-ready
    614 	// resolverWrapper.
    615 	cc.resolverWrapper.start()
    616 
    617 	// A blocking dial blocks until the clientConn is ready.
    618 	if cc.dopts.block {
    619 		for {
    620 			s := cc.GetState()
    621 			if s == connectivity.Ready {
    622 				break
    623 			}
    624 			if !cc.WaitForStateChange(ctx, s) {
    625 				// ctx got timeout or canceled.
    626 				return nil, ctx.Err()
    627 			}
    628 		}
    629 	}
    630 
    631 	return cc, nil
    632 }
    633 
// connectivityStateManager keeps the connectivity.State of ClientConn.
// This struct will eventually be exported so the balancers can access it.
type connectivityStateManager struct {
	// mu guards state and notifyChan.
	mu         sync.Mutex
	// state is the current connectivity state; once it is Shutdown,
	// updateState no longer changes it.
	state      connectivity.State
	// notifyChan is lazily created by getNotifyChan and closed (then reset
	// to nil) by updateState on the next state change.
	notifyChan chan struct{}
}
    641 
    642 // updateState updates the connectivity.State of ClientConn.
    643 // If there's a change it notifies goroutines waiting on state change to
    644 // happen.
    645 func (csm *connectivityStateManager) updateState(state connectivity.State) {
    646 	csm.mu.Lock()
    647 	defer csm.mu.Unlock()
    648 	if csm.state == connectivity.Shutdown {
    649 		return
    650 	}
    651 	if csm.state == state {
    652 		return
    653 	}
    654 	csm.state = state
    655 	if csm.notifyChan != nil {
    656 		// There are other goroutines waiting on this channel.
    657 		close(csm.notifyChan)
    658 		csm.notifyChan = nil
    659 	}
    660 }
    661 
    662 func (csm *connectivityStateManager) getState() connectivity.State {
    663 	csm.mu.Lock()
    664 	defer csm.mu.Unlock()
    665 	return csm.state
    666 }
    667 
    668 func (csm *connectivityStateManager) getNotifyChan() <-chan struct{} {
    669 	csm.mu.Lock()
    670 	defer csm.mu.Unlock()
    671 	if csm.notifyChan == nil {
    672 		csm.notifyChan = make(chan struct{})
    673 	}
    674 	return csm.notifyChan
    675 }
    676 
// ClientConn represents a client connection to an RPC server.
type ClientConn struct {
	// ctx is created by DialContext from context.Background(); background
	// work such as scWatcher stops when it is done. cancel cancels ctx.
	ctx    context.Context
	cancel context.CancelFunc

	// target is the string passed to Dial; parsedTarget is its parsed form
	// (or the default-scheme fallback) computed in DialContext.
	target       string
	parsedTarget resolver.Target
	// authority is chosen in DialContext from the transport credentials'
	// server name, the WithAuthority value, or the target endpoint.
	authority    string
	dopts        dialOptions
	csMgr        *connectivityStateManager

	balancerBuildOpts balancer.BuildOptions
	resolverWrapper   *ccResolverWrapper
	blockingpicker    *pickerWrapper

	// mu is held while reading or writing the fields below, e.g. by
	// scWatcher and handleResolvedAddrs.
	mu    sync.RWMutex
	sc    ServiceConfig
	scRaw string
	// conns being nil means the ClientConn was closed.
	conns map[*addrConn]struct{}
	// Keepalive parameter can be updated if a GoAway is received.
	mkp             keepalive.ClientParameters
	curBalancerName string
	preBalancerName string // previous balancer name.
	// curAddresses is the last address list accepted by
	// handleResolvedAddrs; an identical update is ignored.
	curAddresses    []resolver.Address
	balancerWrapper *ccBalancerWrapper

	channelzID          int64 // channelz unique identification number
	// NOTE(review): czmu presumably guards the call counters below; the code
	// that uses it is outside this part of the file.
	czmu                sync.RWMutex
	callsStarted        int64
	callsSucceeded      int64
	callsFailed         int64
	lastCallStartedTime time.Time
}
    710 
    711 // WaitForStateChange waits until the connectivity.State of ClientConn changes from sourceState or
    712 // ctx expires. A true value is returned in former case and false in latter.
    713 // This is an EXPERIMENTAL API.
    714 func (cc *ClientConn) WaitForStateChange(ctx context.Context, sourceState connectivity.State) bool {
    715 	ch := cc.csMgr.getNotifyChan()
    716 	if cc.csMgr.getState() != sourceState {
    717 		return true
    718 	}
    719 	select {
    720 	case <-ctx.Done():
    721 		return false
    722 	case <-ch:
    723 		return true
    724 	}
    725 }
    726 
    727 // GetState returns the connectivity.State of ClientConn.
    728 // This is an EXPERIMENTAL API.
    729 func (cc *ClientConn) GetState() connectivity.State {
    730 	return cc.csMgr.getState()
    731 }
    732 
    733 func (cc *ClientConn) scWatcher() {
    734 	for {
    735 		select {
    736 		case sc, ok := <-cc.dopts.scChan:
    737 			if !ok {
    738 				return
    739 			}
    740 			cc.mu.Lock()
    741 			// TODO: load balance policy runtime change is ignored.
    742 			// We may revist this decision in the future.
    743 			cc.sc = sc
    744 			cc.scRaw = ""
    745 			cc.mu.Unlock()
    746 		case <-cc.ctx.Done():
    747 			return
    748 		}
    749 	}
    750 }
    751 
    752 func (cc *ClientConn) handleResolvedAddrs(addrs []resolver.Address, err error) {
    753 	cc.mu.Lock()
    754 	defer cc.mu.Unlock()
    755 	if cc.conns == nil {
    756 		// cc was closed.
    757 		return
    758 	}
    759 
    760 	if reflect.DeepEqual(cc.curAddresses, addrs) {
    761 		return
    762 	}
    763 
    764 	cc.curAddresses = addrs
    765 
    766 	if cc.dopts.balancerBuilder == nil {
    767 		// Only look at balancer types and switch balancer if balancer dial
    768 		// option is not set.
    769 		var isGRPCLB bool
    770 		for _, a := range addrs {
    771 			if a.Type == resolver.GRPCLB {
    772 				isGRPCLB = true
    773 				break
    774 			}
    775 		}
    776 		var newBalancerName string
    777 		if isGRPCLB {
    778 			newBalancerName = grpclbName
    779 		} else {
    780 			// Address list doesn't contain grpclb address. Try to pick a
    781 			// non-grpclb balancer.
    782 			newBalancerName = cc.curBalancerName
    783 			// If current balancer is grpclb, switch to the previous one.
    784 			if newBalancerName == grpclbName {
    785 				newBalancerName = cc.preBalancerName
    786 			}
    787 			// The following could be true in two cases:
    788 			// - the first time handling resolved addresses
    789 			//   (curBalancerName="")
    790 			// - the first time handling non-grpclb addresses
    791 			//   (curBalancerName="grpclb", preBalancerName="")
    792 			if newBalancerName == "" {
    793 				newBalancerName = PickFirstBalancerName
    794 			}
    795 		}
    796 		cc.switchBalancer(newBalancerName)
    797 	} else if cc.balancerWrapper == nil {
    798 		// Balancer dial option was set, and this is the first time handling
    799 		// resolved addresses. Build a balancer with dopts.balancerBuilder.
    800 		cc.balancerWrapper = newCCBalancerWrapper(cc, cc.dopts.balancerBuilder, cc.balancerBuildOpts)
    801 	}
    802 
    803 	cc.balancerWrapper.handleResolvedAddrs(addrs, nil)
    804 }
    805 
    806 // switchBalancer starts the switching from current balancer to the balancer
    807 // with the given name.
    808 //
    809 // It will NOT send the current address list to the new balancer. If needed,
    810 // caller of this function should send address list to the new balancer after
    811 // this function returns.
    812 //
    813 // Caller must hold cc.mu.
    814 func (cc *ClientConn) switchBalancer(name string) {
    815 	if cc.conns == nil {
    816 		return
    817 	}
    818 
    819 	if strings.ToLower(cc.curBalancerName) == strings.ToLower(name) {
    820 		return
    821 	}
    822 
    823 	grpclog.Infof("ClientConn switching balancer to %q", name)
    824 	if cc.dopts.balancerBuilder != nil {
    825 		grpclog.Infoln("ignoring balancer switching: Balancer DialOption used instead")
    826 		return
    827 	}
    828 	// TODO(bar switching) change this to two steps: drain and close.
    829 	// Keep track of sc in wrapper.
    830 	if cc.balancerWrapper != nil {
    831 		cc.balancerWrapper.close()
    832 	}
    833 	// Clear all stickiness state.
    834 	cc.blockingpicker.clearStickinessState()
    835 
    836 	builder := balancer.Get(name)
    837 	if builder == nil {
    838 		grpclog.Infof("failed to get balancer builder for: %v, using pick_first instead", name)
    839 		builder = newPickfirstBuilder()
    840 	}
    841 	cc.preBalancerName = cc.curBalancerName
    842 	cc.curBalancerName = builder.Name()
    843 	cc.balancerWrapper = newCCBalancerWrapper(cc, builder, cc.balancerBuildOpts)
    844 }
    845 
    846 func (cc *ClientConn) handleSubConnStateChange(sc balancer.SubConn, s connectivity.State) {
    847 	cc.mu.Lock()
    848 	if cc.conns == nil {
    849 		cc.mu.Unlock()
    850 		return
    851 	}
    852 	// TODO(bar switching) send updates to all balancer wrappers when balancer
    853 	// gracefully switching is supported.
    854 	cc.balancerWrapper.handleSubConnStateChange(sc, s)
    855 	cc.mu.Unlock()
    856 }
    857 
    858 // newAddrConn creates an addrConn for addrs and adds it to cc.conns.
    859 //
    860 // Caller needs to make sure len(addrs) > 0.
    861 func (cc *ClientConn) newAddrConn(addrs []resolver.Address) (*addrConn, error) {
    862 	ac := &addrConn{
    863 		cc:    cc,
    864 		addrs: addrs,
    865 		dopts: cc.dopts,
    866 	}
    867 	ac.ctx, ac.cancel = context.WithCancel(cc.ctx)
    868 	// Track ac in cc. This needs to be done before any getTransport(...) is called.
    869 	cc.mu.Lock()
    870 	if cc.conns == nil {
    871 		cc.mu.Unlock()
    872 		return nil, ErrClientConnClosing
    873 	}
    874 	if channelz.IsOn() {
    875 		ac.channelzID = channelz.RegisterSubChannel(ac, cc.channelzID, "")
    876 	}
    877 	cc.conns[ac] = struct{}{}
    878 	cc.mu.Unlock()
    879 	return ac, nil
    880 }
    881 
    882 // removeAddrConn removes the addrConn in the subConn from clientConn.
    883 // It also tears down the ac with the given error.
    884 func (cc *ClientConn) removeAddrConn(ac *addrConn, err error) {
    885 	cc.mu.Lock()
    886 	if cc.conns == nil {
    887 		cc.mu.Unlock()
    888 		return
    889 	}
    890 	delete(cc.conns, ac)
    891 	cc.mu.Unlock()
    892 	ac.tearDown(err)
    893 }
    894 
    895 // ChannelzMetric returns ChannelInternalMetric of current ClientConn.
    896 // This is an EXPERIMENTAL API.
    897 func (cc *ClientConn) ChannelzMetric() *channelz.ChannelInternalMetric {
    898 	state := cc.GetState()
    899 	cc.czmu.RLock()
    900 	defer cc.czmu.RUnlock()
    901 	return &channelz.ChannelInternalMetric{
    902 		State:                    state,
    903 		Target:                   cc.target,
    904 		CallsStarted:             cc.callsStarted,
    905 		CallsSucceeded:           cc.callsSucceeded,
    906 		CallsFailed:              cc.callsFailed,
    907 		LastCallStartedTimestamp: cc.lastCallStartedTime,
    908 	}
    909 }
    910 
    911 func (cc *ClientConn) incrCallsStarted() {
    912 	cc.czmu.Lock()
    913 	cc.callsStarted++
    914 	// TODO(yuxuanli): will make this a time.Time pointer improve performance?
    915 	cc.lastCallStartedTime = time.Now()
    916 	cc.czmu.Unlock()
    917 }
    918 
    919 func (cc *ClientConn) incrCallsSucceeded() {
    920 	cc.czmu.Lock()
    921 	cc.callsSucceeded++
    922 	cc.czmu.Unlock()
    923 }
    924 
    925 func (cc *ClientConn) incrCallsFailed() {
    926 	cc.czmu.Lock()
    927 	cc.callsFailed++
    928 	cc.czmu.Unlock()
    929 }
    930 
    931 // connect starts to creating transport and also starts the transport monitor
    932 // goroutine for this ac.
    933 // It does nothing if the ac is not IDLE.
    934 // TODO(bar) Move this to the addrConn section.
    935 // This was part of resetAddrConn, keep it here to make the diff look clean.
    936 func (ac *addrConn) connect() error {
    937 	ac.mu.Lock()
    938 	if ac.state == connectivity.Shutdown {
    939 		ac.mu.Unlock()
    940 		return errConnClosing
    941 	}
    942 	if ac.state != connectivity.Idle {
    943 		ac.mu.Unlock()
    944 		return nil
    945 	}
    946 	ac.state = connectivity.Connecting
    947 	ac.cc.handleSubConnStateChange(ac.acbw, ac.state)
    948 	ac.mu.Unlock()
    949 
    950 	// Start a goroutine connecting to the server asynchronously.
    951 	go func() {
    952 		if err := ac.resetTransport(); err != nil {
    953 			grpclog.Warningf("Failed to dial %s: %v; please retry.", ac.addrs[0].Addr, err)
    954 			if err != errConnClosing {
    955 				// Keep this ac in cc.conns, to get the reason it's torn down.
    956 				ac.tearDown(err)
    957 			}
    958 			return
    959 		}
    960 		ac.transportMonitor()
    961 	}()
    962 	return nil
    963 }
    964 
    965 // tryUpdateAddrs tries to update ac.addrs with the new addresses list.
    966 //
    967 // It checks whether current connected address of ac is in the new addrs list.
    968 //  - If true, it updates ac.addrs and returns true. The ac will keep using
    969 //    the existing connection.
    970 //  - If false, it does nothing and returns false.
    971 func (ac *addrConn) tryUpdateAddrs(addrs []resolver.Address) bool {
    972 	ac.mu.Lock()
    973 	defer ac.mu.Unlock()
    974 	grpclog.Infof("addrConn: tryUpdateAddrs curAddr: %v, addrs: %v", ac.curAddr, addrs)
    975 	if ac.state == connectivity.Shutdown {
    976 		ac.addrs = addrs
    977 		return true
    978 	}
    979 
    980 	var curAddrFound bool
    981 	for _, a := range addrs {
    982 		if reflect.DeepEqual(ac.curAddr, a) {
    983 			curAddrFound = true
    984 			break
    985 		}
    986 	}
    987 	grpclog.Infof("addrConn: tryUpdateAddrs curAddrFound: %v", curAddrFound)
    988 	if curAddrFound {
    989 		ac.addrs = addrs
    990 		ac.reconnectIdx = 0 // Start reconnecting from beginning in the new list.
    991 	}
    992 
    993 	return curAddrFound
    994 }
    995 
    996 // GetMethodConfig gets the method config of the input method.
    997 // If there's an exact match for input method (i.e. /service/method), we return
    998 // the corresponding MethodConfig.
    999 // If there isn't an exact match for the input method, we look for the default config
   1000 // under the service (i.e /service/). If there is a default MethodConfig for
   1001 // the service, we return it.
   1002 // Otherwise, we return an empty MethodConfig.
   1003 func (cc *ClientConn) GetMethodConfig(method string) MethodConfig {
   1004 	// TODO: Avoid the locking here.
   1005 	cc.mu.RLock()
   1006 	defer cc.mu.RUnlock()
   1007 	m, ok := cc.sc.Methods[method]
   1008 	if !ok {
   1009 		i := strings.LastIndex(method, "/")
   1010 		m = cc.sc.Methods[method[:i+1]]
   1011 	}
   1012 	return m
   1013 }
   1014 
   1015 func (cc *ClientConn) getTransport(ctx context.Context, failfast bool) (transport.ClientTransport, func(balancer.DoneInfo), error) {
   1016 	t, done, err := cc.blockingpicker.pick(ctx, failfast, balancer.PickOptions{})
   1017 	if err != nil {
   1018 		return nil, nil, toRPCErr(err)
   1019 	}
   1020 	return t, done, nil
   1021 }
   1022 
   1023 // handleServiceConfig parses the service config string in JSON format to Go native
   1024 // struct ServiceConfig, and store both the struct and the JSON string in ClientConn.
   1025 func (cc *ClientConn) handleServiceConfig(js string) error {
   1026 	if cc.dopts.disableServiceConfig {
   1027 		return nil
   1028 	}
   1029 	sc, err := parseServiceConfig(js)
   1030 	if err != nil {
   1031 		return err
   1032 	}
   1033 	cc.mu.Lock()
   1034 	cc.scRaw = js
   1035 	cc.sc = sc
   1036 	if sc.LB != nil && *sc.LB != grpclbName { // "grpclb" is not a valid balancer option in service config.
   1037 		if cc.curBalancerName == grpclbName {
   1038 			// If current balancer is grpclb, there's at least one grpclb
   1039 			// balancer address in the resolved list. Don't switch the balancer,
   1040 			// but change the previous balancer name, so if a new resolved
   1041 			// address list doesn't contain grpclb address, balancer will be
   1042 			// switched to *sc.LB.
   1043 			cc.preBalancerName = *sc.LB
   1044 		} else {
   1045 			cc.switchBalancer(*sc.LB)
   1046 			cc.balancerWrapper.handleResolvedAddrs(cc.curAddresses, nil)
   1047 		}
   1048 	}
   1049 
   1050 	if envConfigStickinessOn {
   1051 		var newStickinessMDKey string
   1052 		if sc.stickinessMetadataKey != nil && *sc.stickinessMetadataKey != "" {
   1053 			newStickinessMDKey = *sc.stickinessMetadataKey
   1054 		}
   1055 		// newStickinessMDKey is "" if one of the following happens:
   1056 		// - stickinessMetadataKey is set to ""
   1057 		// - stickinessMetadataKey field doesn't exist in service config
   1058 		cc.blockingpicker.updateStickinessMDKey(strings.ToLower(newStickinessMDKey))
   1059 	}
   1060 
   1061 	cc.mu.Unlock()
   1062 	return nil
   1063 }
   1064 
   1065 func (cc *ClientConn) resolveNow(o resolver.ResolveNowOption) {
   1066 	cc.mu.RLock()
   1067 	r := cc.resolverWrapper
   1068 	cc.mu.RUnlock()
   1069 	if r == nil {
   1070 		return
   1071 	}
   1072 	go r.resolveNow(o)
   1073 }
   1074 
   1075 // Close tears down the ClientConn and all underlying connections.
   1076 func (cc *ClientConn) Close() error {
   1077 	defer cc.cancel()
   1078 
   1079 	cc.mu.Lock()
   1080 	if cc.conns == nil {
   1081 		cc.mu.Unlock()
   1082 		return ErrClientConnClosing
   1083 	}
   1084 	conns := cc.conns
   1085 	cc.conns = nil
   1086 	cc.csMgr.updateState(connectivity.Shutdown)
   1087 
   1088 	rWrapper := cc.resolverWrapper
   1089 	cc.resolverWrapper = nil
   1090 	bWrapper := cc.balancerWrapper
   1091 	cc.balancerWrapper = nil
   1092 	cc.mu.Unlock()
   1093 
   1094 	cc.blockingpicker.close()
   1095 
   1096 	if rWrapper != nil {
   1097 		rWrapper.close()
   1098 	}
   1099 	if bWrapper != nil {
   1100 		bWrapper.close()
   1101 	}
   1102 
   1103 	for ac := range conns {
   1104 		ac.tearDown(ErrClientConnClosing)
   1105 	}
   1106 	if channelz.IsOn() {
   1107 		channelz.RemoveEntry(cc.channelzID)
   1108 	}
   1109 	return nil
   1110 }
   1111 
// addrConn is a network connection to a given address.
//
// Field overview, as used by the methods in this file:
//   - ctx/cancel: context derived from the owning ClientConn's context in
//     newAddrConn; cancel is invoked by tearDown.
//   - cc: the ClientConn this addrConn belongs to.
//   - addrs: the candidate backend addresses; replaced by tryUpdateAddrs.
//   - dopts: a copy of the ClientConn's dial options taken at creation.
//   - events: optional trace event log; set to nil by tearDown.
//   - acbw: the balancer.SubConn reported in connectivity state updates.
//   - mu: held by the methods in this file while they read or write curAddr,
//     reconnectIdx, state, ready, transport, tearDownErr and the
//     reconnect bookkeeping fields.
//   - czmu: held while reading or writing the channelz call counters.
type addrConn struct {
	ctx    context.Context
	cancel context.CancelFunc

	cc     *ClientConn
	addrs  []resolver.Address
	dopts  dialOptions
	events trace.EventLog
	acbw   balancer.SubConn

	mu           sync.Mutex
	curAddr      resolver.Address
	reconnectIdx int // The index in addrs list to start reconnecting from.
	state        connectivity.State
	// ready is closed and becomes nil when a new transport is up or failed
	// due to timeout.
	ready     chan struct{}
	transport transport.ClientTransport

	// The reason this addrConn is torn down.
	tearDownErr error

	connectRetryNum int
	// backoffDeadline is the time until which resetTransport needs to
	// wait before increasing connectRetryNum count.
	backoffDeadline time.Time
	// connectDeadline is the time by which all connection
	// negotiations must complete.
	connectDeadline time.Time

	channelzID          int64 // channelz unique identification number
	czmu                sync.RWMutex
	callsStarted        int64
	callsSucceeded      int64
	callsFailed         int64
	lastCallStartedTime time.Time
}
   1150 
   1151 // adjustParams updates parameters used to create transports upon
   1152 // receiving a GoAway.
   1153 func (ac *addrConn) adjustParams(r transport.GoAwayReason) {
   1154 	switch r {
   1155 	case transport.GoAwayTooManyPings:
   1156 		v := 2 * ac.dopts.copts.KeepaliveParams.Time
   1157 		ac.cc.mu.Lock()
   1158 		if v > ac.cc.mkp.Time {
   1159 			ac.cc.mkp.Time = v
   1160 		}
   1161 		ac.cc.mu.Unlock()
   1162 	}
   1163 }
   1164 
   1165 // printf records an event in ac's event log, unless ac has been closed.
   1166 // REQUIRES ac.mu is held.
   1167 func (ac *addrConn) printf(format string, a ...interface{}) {
   1168 	if ac.events != nil {
   1169 		ac.events.Printf(format, a...)
   1170 	}
   1171 }
   1172 
   1173 // errorf records an error in ac's event log, unless ac has been closed.
   1174 // REQUIRES ac.mu is held.
   1175 func (ac *addrConn) errorf(format string, a ...interface{}) {
   1176 	if ac.events != nil {
   1177 		ac.events.Errorf(format, a...)
   1178 	}
   1179 }
   1180 
// resetTransport recreates a transport to the address for ac.  The old
// transport will close itself on error or when the clientconn is closed.
// The created transport must receive initial settings frame from the server.
// In case that doesn't happen, transportMonitor will kill the newly created
// transport after connectDeadline has expired.
// In case there was an error on the transport before the settings frame was
// received, resetTransport resumes connecting to backends after the one that
// was previously connected to. In case end of the list is reached, resetTransport
// backs off until the original deadline.
// If the DialOption WithWaitForHandshake was set, resetTransport returns
// successfully only after server settings are received.
//
// It returns nil once createTransport reports a connection, errConnClosing
// if ac is found in the Shutdown state, or whatever error createTransport
// returns.
//
// TODO(bar) make sure all state transitions are valid.
func (ac *addrConn) resetTransport() error {
	ac.mu.Lock()
	if ac.state == connectivity.Shutdown {
		ac.mu.Unlock()
		return errConnClosing
	}
	// Wake up anyone blocked in wait(); they will re-check the state.
	if ac.ready != nil {
		close(ac.ready)
		ac.ready = nil
	}
	ac.transport = nil
	ridx := ac.reconnectIdx
	ac.mu.Unlock()
	// Pick up the ClientConn's current keepalive parameters, which
	// adjustParams may have raised after a GoAway.
	ac.cc.mu.RLock()
	ac.dopts.copts.KeepaliveParams = ac.cc.mkp
	ac.cc.mu.RUnlock()
	var backoffDeadline, connectDeadline time.Time
	// Each iteration is one pass over the address list (one createTransport
	// call); the loop only exits by returning.
	for connectRetryNum := 0; ; connectRetryNum++ {
		ac.mu.Lock()
		if ac.backoffDeadline.IsZero() {
			// This means either a successful HTTP2 connection was established
			// or this is the first time this addrConn is trying to establish a
			// connection.
			backoffFor := ac.dopts.bs.Backoff(connectRetryNum) // time.Duration.
			// This will be the duration that dial gets to finish.
			dialDuration := getMinConnectTimeout()
			if backoffFor > dialDuration {
				// Give dial more time as we keep failing to connect.
				dialDuration = backoffFor
			}
			start := time.Now()
			backoffDeadline = start.Add(backoffFor)
			connectDeadline = start.Add(dialDuration)
			ridx = 0 // Start connecting from the beginning.
		} else {
			// Continue trying to connect with the same deadlines.
			connectRetryNum = ac.connectRetryNum
			backoffDeadline = ac.backoffDeadline
			connectDeadline = ac.connectDeadline
			// The saved values are consumed here; clear them so the next
			// iteration computes fresh deadlines.
			ac.backoffDeadline = time.Time{}
			ac.connectDeadline = time.Time{}
			ac.connectRetryNum = 0
		}
		if ac.state == connectivity.Shutdown {
			ac.mu.Unlock()
			return errConnClosing
		}
		ac.printf("connecting")
		if ac.state != connectivity.Connecting {
			ac.state = connectivity.Connecting
			ac.cc.handleSubConnStateChange(ac.acbw, ac.state)
		}
		// copy ac.addrs in case of race
		addrsIter := make([]resolver.Address, len(ac.addrs))
		copy(addrsIter, ac.addrs)
		copts := ac.dopts.copts
		ac.mu.Unlock()
		connected, err := ac.createTransport(connectRetryNum, ridx, backoffDeadline, connectDeadline, addrsIter, copts)
		if err != nil {
			return err
		}
		if connected {
			return nil
		}
	}
}
   1260 
   1261 // createTransport creates a connection to one of the backends in addrs.
   1262 // It returns true if a connection was established.
   1263 func (ac *addrConn) createTransport(connectRetryNum, ridx int, backoffDeadline, connectDeadline time.Time, addrs []resolver.Address, copts transport.ConnectOptions) (bool, error) {
   1264 	for i := ridx; i < len(addrs); i++ {
   1265 		addr := addrs[i]
   1266 		target := transport.TargetInfo{
   1267 			Addr:      addr.Addr,
   1268 			Metadata:  addr.Metadata,
   1269 			Authority: ac.cc.authority,
   1270 		}
   1271 		done := make(chan struct{})
   1272 		onPrefaceReceipt := func() {
   1273 			ac.mu.Lock()
   1274 			close(done)
   1275 			if !ac.backoffDeadline.IsZero() {
   1276 				// If we haven't already started reconnecting to
   1277 				// other backends.
   1278 				// Note, this can happen when writer notices an error
   1279 				// and triggers resetTransport while at the same time
   1280 				// reader receives the preface and invokes this closure.
   1281 				ac.backoffDeadline = time.Time{}
   1282 				ac.connectDeadline = time.Time{}
   1283 				ac.connectRetryNum = 0
   1284 			}
   1285 			ac.mu.Unlock()
   1286 		}
   1287 		// Do not cancel in the success path because of
   1288 		// this issue in Go1.6: https://github.com/golang/go/issues/15078.
   1289 		connectCtx, cancel := context.WithDeadline(ac.ctx, connectDeadline)
   1290 		if channelz.IsOn() {
   1291 			copts.ChannelzParentID = ac.channelzID
   1292 		}
   1293 		newTr, err := transport.NewClientTransport(connectCtx, ac.cc.ctx, target, copts, onPrefaceReceipt)
   1294 		if err != nil {
   1295 			cancel()
   1296 			ac.cc.blockingpicker.updateConnectionError(err)
   1297 			ac.mu.Lock()
   1298 			if ac.state == connectivity.Shutdown {
   1299 				// ac.tearDown(...) has been invoked.
   1300 				ac.mu.Unlock()
   1301 				return false, errConnClosing
   1302 			}
   1303 			ac.mu.Unlock()
   1304 			grpclog.Warningf("grpc: addrConn.createTransport failed to connect to %v. Err :%v. Reconnecting...", addr, err)
   1305 			continue
   1306 		}
   1307 		if ac.dopts.waitForHandshake {
   1308 			select {
   1309 			case <-done:
   1310 			case <-connectCtx.Done():
   1311 				// Didn't receive server preface, must kill this new transport now.
   1312 				grpclog.Warningf("grpc: addrConn.createTransport failed to receive server preface before deadline.")
   1313 				newTr.Close()
   1314 				break
   1315 			case <-ac.ctx.Done():
   1316 			}
   1317 		}
   1318 		ac.mu.Lock()
   1319 		if ac.state == connectivity.Shutdown {
   1320 			ac.mu.Unlock()
   1321 			// ac.tearDonn(...) has been invoked.
   1322 			newTr.Close()
   1323 			return false, errConnClosing
   1324 		}
   1325 		ac.printf("ready")
   1326 		ac.state = connectivity.Ready
   1327 		ac.cc.handleSubConnStateChange(ac.acbw, ac.state)
   1328 		ac.transport = newTr
   1329 		ac.curAddr = addr
   1330 		if ac.ready != nil {
   1331 			close(ac.ready)
   1332 			ac.ready = nil
   1333 		}
   1334 		select {
   1335 		case <-done:
   1336 			// If the server has responded back with preface already,
   1337 			// don't set the reconnect parameters.
   1338 		default:
   1339 			ac.connectRetryNum = connectRetryNum
   1340 			ac.backoffDeadline = backoffDeadline
   1341 			ac.connectDeadline = connectDeadline
   1342 			ac.reconnectIdx = i + 1 // Start reconnecting from the next backend in the list.
   1343 		}
   1344 		ac.mu.Unlock()
   1345 		return true, nil
   1346 	}
   1347 	ac.mu.Lock()
   1348 	if ac.state == connectivity.Shutdown {
   1349 		ac.mu.Unlock()
   1350 		return false, errConnClosing
   1351 	}
   1352 	ac.state = connectivity.TransientFailure
   1353 	ac.cc.handleSubConnStateChange(ac.acbw, ac.state)
   1354 	ac.cc.resolveNow(resolver.ResolveNowOption{})
   1355 	if ac.ready != nil {
   1356 		close(ac.ready)
   1357 		ac.ready = nil
   1358 	}
   1359 	ac.mu.Unlock()
   1360 	timer := time.NewTimer(backoffDeadline.Sub(time.Now()))
   1361 	select {
   1362 	case <-timer.C:
   1363 	case <-ac.ctx.Done():
   1364 		timer.Stop()
   1365 		return false, ac.ctx.Err()
   1366 	}
   1367 	return false, nil
   1368 }
   1369 
// transportMonitor runs in a goroutine to track the error in transport and
// create the new transport if an error happens. It returns when ac is found
// shut down or when resetTransport fails.
func (ac *addrConn) transportMonitor() {
	for {
		var timer *time.Timer
		var cdeadline <-chan time.Time
		ac.mu.Lock()
		t := ac.transport
		// A non-zero connectDeadline means the preface deadline is still
		// being tracked for this transport; arm a timer for it. Otherwise
		// cdeadline stays nil and that select case never fires.
		if !ac.connectDeadline.IsZero() {
			timer = time.NewTimer(ac.connectDeadline.Sub(time.Now()))
			cdeadline = timer.C
		}
		ac.mu.Unlock()
		// Block until we receive a goaway or an error occurs.
		select {
		case <-t.GoAway():
			done := t.Error()
			cleanup := t.Close
			// Since this transport will be orphaned (won't have a transportMonitor)
			// we need to launch a goroutine to keep track of clientConn.Close()
			// happening since it might not be noticed by any other goroutine for a while.
			go func() {
				<-done
				cleanup()
			}()
		case <-t.Error():
			// In case this is triggered because clientConn.Close()
			// was called, we want to immediately close the transport
			// since no other goroutine might notice it for a while.
			t.Close()
		case <-cdeadline:
			ac.mu.Lock()
			// This implies that client received server preface.
			if ac.backoffDeadline.IsZero() {
				ac.mu.Unlock()
				continue
			}
			ac.mu.Unlock()
			// The timer has already fired; nothing left to stop below.
			timer = nil
			// No server preface received until deadline.
			// Kill the connection.
			grpclog.Warningf("grpc: addrConn.transportMonitor didn't get server preface after waiting. Closing the new transport now.")
			t.Close()
		}
		if timer != nil {
			timer.Stop()
		}
		// If a GoAway happened, regardless of error, adjust our keepalive
		// parameters as appropriate.
		select {
		case <-t.GoAway():
			ac.adjustParams(t.GetGoAwayReason())
		default:
		}
		ac.mu.Lock()
		if ac.state == connectivity.Shutdown {
			ac.mu.Unlock()
			return
		}
		// Set connectivity state to TransientFailure before calling
		// resetTransport. Transition READY->CONNECTING is not valid.
		ac.state = connectivity.TransientFailure
		ac.cc.handleSubConnStateChange(ac.acbw, ac.state)
		ac.cc.resolveNow(resolver.ResolveNowOption{})
		ac.curAddr = resolver.Address{}
		ac.mu.Unlock()
		if err := ac.resetTransport(); err != nil {
			ac.mu.Lock()
			ac.printf("transport exiting: %v", err)
			ac.mu.Unlock()
			grpclog.Warningf("grpc: addrConn.transportMonitor exits due to: %v", err)
			if err != errConnClosing {
				// Keep this ac in cc.conns, to get the reason it's torn down.
				ac.tearDown(err)
			}
			return
		}
	}
}
   1449 
   1450 // wait blocks until i) the new transport is up or ii) ctx is done or iii) ac is closed or
   1451 // iv) transport is in connectivity.TransientFailure and there is a balancer/failfast is true.
   1452 func (ac *addrConn) wait(ctx context.Context, hasBalancer, failfast bool) (transport.ClientTransport, error) {
   1453 	for {
   1454 		ac.mu.Lock()
   1455 		switch {
   1456 		case ac.state == connectivity.Shutdown:
   1457 			if failfast || !hasBalancer {
   1458 				// RPC is failfast or balancer is nil. This RPC should fail with ac.tearDownErr.
   1459 				err := ac.tearDownErr
   1460 				ac.mu.Unlock()
   1461 				return nil, err
   1462 			}
   1463 			ac.mu.Unlock()
   1464 			return nil, errConnClosing
   1465 		case ac.state == connectivity.Ready:
   1466 			ct := ac.transport
   1467 			ac.mu.Unlock()
   1468 			return ct, nil
   1469 		case ac.state == connectivity.TransientFailure:
   1470 			if failfast || hasBalancer {
   1471 				ac.mu.Unlock()
   1472 				return nil, errConnUnavailable
   1473 			}
   1474 		}
   1475 		ready := ac.ready
   1476 		if ready == nil {
   1477 			ready = make(chan struct{})
   1478 			ac.ready = ready
   1479 		}
   1480 		ac.mu.Unlock()
   1481 		select {
   1482 		case <-ctx.Done():
   1483 			return nil, toRPCErr(ctx.Err())
   1484 		// Wait until the new transport is ready or failed.
   1485 		case <-ready:
   1486 		}
   1487 	}
   1488 }
   1489 
   1490 // getReadyTransport returns the transport if ac's state is READY.
   1491 // Otherwise it returns nil, false.
   1492 // If ac's state is IDLE, it will trigger ac to connect.
   1493 func (ac *addrConn) getReadyTransport() (transport.ClientTransport, bool) {
   1494 	ac.mu.Lock()
   1495 	if ac.state == connectivity.Ready {
   1496 		t := ac.transport
   1497 		ac.mu.Unlock()
   1498 		return t, true
   1499 	}
   1500 	var idle bool
   1501 	if ac.state == connectivity.Idle {
   1502 		idle = true
   1503 	}
   1504 	ac.mu.Unlock()
   1505 	// Trigger idle ac to connect.
   1506 	if idle {
   1507 		ac.connect()
   1508 	}
   1509 	return nil, false
   1510 }
   1511 
// tearDown starts to tear down the addrConn: it cancels ac's context, moves ac
// to the Shutdown state, records err as the teardown reason and reports the
// state change. Calling it on an ac that is already shut down only cancels
// the context again.
// TODO(zhaoq): Make this synchronous to avoid unbounded memory consumption in
// some edge cases (e.g., the caller opens and closes many addrConn's in a
// tight loop.
// tearDown doesn't remove ac from ac.cc.conns.
func (ac *addrConn) tearDown(err error) {
	// Cancel first, before taking ac.mu.
	ac.cancel()
	ac.mu.Lock()
	defer ac.mu.Unlock()
	if ac.state == connectivity.Shutdown {
		return
	}
	ac.curAddr = resolver.Address{}
	if err == errConnDrain && ac.transport != nil {
		// GracefulClose(...) may be executed multiple times when
		// i) receiving multiple GoAway frames from the server; or
		// ii) there are concurrent name resolver/Balancer triggered
		// address removal and GoAway.
		ac.transport.GracefulClose()
	}
	ac.state = connectivity.Shutdown
	ac.tearDownErr = err
	ac.cc.handleSubConnStateChange(ac.acbw, ac.state)
	// Finish and drop the event log; printf/errorf become no-ops afterwards.
	if ac.events != nil {
		ac.events.Finish()
		ac.events = nil
	}
	// Wake up anyone blocked in wait() so they observe the Shutdown state.
	if ac.ready != nil {
		close(ac.ready)
		ac.ready = nil
	}
	if channelz.IsOn() {
		channelz.RemoveEntry(ac.channelzID)
	}
}
   1547 
   1548 func (ac *addrConn) getState() connectivity.State {
   1549 	ac.mu.Lock()
   1550 	defer ac.mu.Unlock()
   1551 	return ac.state
   1552 }
   1553 
   1554 func (ac *addrConn) getCurAddr() (ret resolver.Address) {
   1555 	ac.mu.Lock()
   1556 	ret = ac.curAddr
   1557 	ac.mu.Unlock()
   1558 	return
   1559 }
   1560 
   1561 func (ac *addrConn) ChannelzMetric() *channelz.ChannelInternalMetric {
   1562 	ac.mu.Lock()
   1563 	addr := ac.curAddr.Addr
   1564 	ac.mu.Unlock()
   1565 	state := ac.getState()
   1566 	ac.czmu.RLock()
   1567 	defer ac.czmu.RUnlock()
   1568 	return &channelz.ChannelInternalMetric{
   1569 		State:                    state,
   1570 		Target:                   addr,
   1571 		CallsStarted:             ac.callsStarted,
   1572 		CallsSucceeded:           ac.callsSucceeded,
   1573 		CallsFailed:              ac.callsFailed,
   1574 		LastCallStartedTimestamp: ac.lastCallStartedTime,
   1575 	}
   1576 }
   1577 
   1578 func (ac *addrConn) incrCallsStarted() {
   1579 	ac.czmu.Lock()
   1580 	ac.callsStarted++
   1581 	ac.lastCallStartedTime = time.Now()
   1582 	ac.czmu.Unlock()
   1583 }
   1584 
   1585 func (ac *addrConn) incrCallsSucceeded() {
   1586 	ac.czmu.Lock()
   1587 	ac.callsSucceeded++
   1588 	ac.czmu.Unlock()
   1589 }
   1590 
   1591 func (ac *addrConn) incrCallsFailed() {
   1592 	ac.czmu.Lock()
   1593 	ac.callsFailed++
   1594 	ac.czmu.Unlock()
   1595 }
   1596 
// ErrClientConnTimeout indicates that the ClientConn cannot establish the
// underlying connections within the specified timeout.
//
// Deprecated: This error is never returned by grpc and should not be
// referenced by users.
var ErrClientConnTimeout = errors.New("grpc: timed out when dialing")
   1603