/*
 *
 * Copyright 2014 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

package grpc

import (
	"errors"
	"fmt"
	"math"
	"net"
	"reflect"
	"strings"
	"sync"
	"time"

	"golang.org/x/net/context"
	"golang.org/x/net/trace"
	"google.golang.org/grpc/balancer"
	_ "google.golang.org/grpc/balancer/roundrobin" // To register roundrobin.
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/connectivity"
	"google.golang.org/grpc/credentials"
	"google.golang.org/grpc/grpclog"
	"google.golang.org/grpc/internal"
	"google.golang.org/grpc/internal/backoff"
	"google.golang.org/grpc/internal/channelz"
	"google.golang.org/grpc/keepalive"
	"google.golang.org/grpc/resolver"
	_ "google.golang.org/grpc/resolver/dns"         // To register dns resolver.
	_ "google.golang.org/grpc/resolver/passthrough" // To register passthrough resolver.
	"google.golang.org/grpc/stats"
	"google.golang.org/grpc/status"
	"google.golang.org/grpc/transport"
)

const (
	// minConnectTimeout is the minimum time to give a connection attempt to
	// complete. Code should read it through getMinConnectTimeout so tests can
	// substitute a different value.
	minConnectTimeout = 20 * time.Second
	// grpclbName is the balancer name switched to when the resolver returns
	// addresses of type resolver.GRPCLB.
	// It must match grpclbName in grpclb/grpclb.go.
	grpclbName = "grpclb"
)

var (
	// ErrClientConnClosing indicates that the operation is illegal because
	// the ClientConn is closing. newAddrConn, for example, returns it once the
	// ClientConn has been closed.
	//
	// Deprecated: this error should not be relied upon by users; use the status
	// code of Canceled instead.
	ErrClientConnClosing = status.Error(codes.Canceled, "grpc: the client connection is closing")
	// errConnDrain indicates that the connection starts to be drained and does not accept any new RPCs.
	errConnDrain = errors.New("grpc: the connection is drained")
	// errConnClosing indicates that the connection is closing; addrConn.connect
	// returns it when the addrConn is already in connectivity.Shutdown.
	errConnClosing = errors.New("grpc: the connection is closing")
	// errConnUnavailable indicates that the connection is unavailable.
	errConnUnavailable = errors.New("grpc: the connection is unavailable")
	// errBalancerClosed indicates that the balancer is closed.
	errBalancerClosed = errors.New("grpc: balancer is closed")
	// getMinConnectTimeout returns minConnectTimeout. It is a variable rather
	// than a function declaration so that tests can swap in their own accessor
	// and read/update the timeout atomically.
	getMinConnectTimeout = func() time.Duration {
		return minConnectTimeout
	}
)

// The following errors are returned from Dial and DialContext. The first three
// are the security-configuration errors that DialContext checks up front.
// NOTE(review): errNetworkIO is not returned by the DialContext code in this
// file section; confirm its users before relying on this grouping.
var (
	// errNoTransportSecurity indicates that there is no transport security
	// being set for ClientConn. Users should either set one or explicitly
	// call WithInsecure DialOption to disable security.
	errNoTransportSecurity = errors.New("grpc: no transport security set (use grpc.WithInsecure() explicitly or set credentials)")
	// errTransportCredentialsMissing indicates that users want to transmit security
	// information (e.g., oauth2 token) which requires secure connection on an insecure
	// connection.
	errTransportCredentialsMissing = errors.New("grpc: the credentials require transport level security (use grpc.WithTransportCredentials() to set)")
	// errCredentialsConflict indicates that grpc.WithTransportCredentials()
	// and grpc.WithInsecure() are both called for a connection.
	errCredentialsConflict = errors.New("grpc: transport credentials are set for an insecure connection (grpc.WithTransportCredentials() and grpc.WithInsecure() are both called)")
	// errNetworkIO indicates that the connection is down due to some network I/O error.
	errNetworkIO = errors.New("grpc: failed with network I/O error")
)

// dialOptions configure a Dial call. dialOptions are set by the DialOption
// values passed to Dial.
//
// Where each field comes from:
//   - unaryInt, streamInt: WithUnaryInterceptor / WithStreamInterceptor.
//   - cp, dc: WithCompressor / WithDecompressor (deprecated compression hooks).
//   - bs: reconnect backoff strategy from withBackoff / WithBackoffConfig.
//     DialContext defaults it to backoff.Exponential with
//     DefaultBackoffConfig.MaxDelay when it is nil.
//   - block: WithBlock. DialContext then waits for connectivity.Ready.
//   - insecure: WithInsecure. Without it DialContext requires transport
//     credentials.
//   - timeout: WithTimeout. DialContext wraps its ctx with this timeout when
//     it is > 0.
//   - scChan: WithServiceConfig. DialContext reads it once, then scWatcher
//     keeps reading it.
//   - copts: transport.ConnectOptions, filled in by the buffer-size,
//     window-size, credential, dialer, stats, user-agent, keepalive and
//     authority options.
//   - callOptions: WithDefaultCallOptions. Values are appended, never replaced.
//   - waitForHandshake, channelzParentID, disableServiceConfig: set by the
//     With* option of the same name.
type dialOptions struct {
	unaryInt    UnaryClientInterceptor
	streamInt   StreamClientInterceptor
	cp          Compressor
	dc          Decompressor
	bs          backoff.Strategy
	block       bool
	insecure    bool
	timeout     time.Duration
	scChan      <-chan ServiceConfig
	copts       transport.ConnectOptions
	callOptions []CallOption
	// This is used by v1 balancer dial option WithBalancer to support v1
	// balancer, and also by WithBalancerName dial option.
	balancerBuilder balancer.Builder
	// This is to support grpclb.
	resolverBuilder      resolver.Builder
	waitForHandshake     bool
	channelzParentID     int64
	disableServiceConfig bool
}

// Default client-side message size limits: 4 * 1024 * 1024 for received
// messages and math.MaxInt32 for sent messages.
const (
	defaultClientMaxReceiveMessageSize = 1024 * 1024 * 4
	defaultClientMaxSendMessageSize    = math.MaxInt32
)

// RegisterChannelz turns on channelz service by calling channelz.TurnOn.
// DialContext and newAddrConn only register channels and subchannels while
// channelz.IsOn() reports true.
// This is an EXPERIMENTAL API.
func RegisterChannelz() {
	channelz.TurnOn()
}

// DialOption configures how we set up the connection. Each option mutates the
// dialOptions of the ClientConn being built; DialContext applies them in the
// order they are passed.
type DialOption func(*dialOptions)

// WithWaitForHandshake blocks until the initial settings frame is received from the
// server before assigning RPCs to the connection. It sets
// dialOptions.waitForHandshake.
// Experimental API.
func WithWaitForHandshake() DialOption {
	return func(o *dialOptions) {
		o.waitForHandshake = true
	}
}

// WithWriteBufferSize lets you set the size of the write buffer; this determines how much
// data can be batched before doing a write on the wire.
146 func WithWriteBufferSize(s int) DialOption { 147 return func(o *dialOptions) { 148 o.copts.WriteBufferSize = s 149 } 150 } 151 152 // WithReadBufferSize lets you set the size of read buffer, this determines how much data can be read at most 153 // for each read syscall. 154 func WithReadBufferSize(s int) DialOption { 155 return func(o *dialOptions) { 156 o.copts.ReadBufferSize = s 157 } 158 } 159 160 // WithInitialWindowSize returns a DialOption which sets the value for initial window size on a stream. 161 // The lower bound for window size is 64K and any value smaller than that will be ignored. 162 func WithInitialWindowSize(s int32) DialOption { 163 return func(o *dialOptions) { 164 o.copts.InitialWindowSize = s 165 } 166 } 167 168 // WithInitialConnWindowSize returns a DialOption which sets the value for initial window size on a connection. 169 // The lower bound for window size is 64K and any value smaller than that will be ignored. 170 func WithInitialConnWindowSize(s int32) DialOption { 171 return func(o *dialOptions) { 172 o.copts.InitialConnWindowSize = s 173 } 174 } 175 176 // WithMaxMsgSize returns a DialOption which sets the maximum message size the client can receive. 177 // 178 // Deprecated: use WithDefaultCallOptions(MaxCallRecvMsgSize(s)) instead. 179 func WithMaxMsgSize(s int) DialOption { 180 return WithDefaultCallOptions(MaxCallRecvMsgSize(s)) 181 } 182 183 // WithDefaultCallOptions returns a DialOption which sets the default CallOptions for calls over the connection. 184 func WithDefaultCallOptions(cos ...CallOption) DialOption { 185 return func(o *dialOptions) { 186 o.callOptions = append(o.callOptions, cos...) 187 } 188 } 189 190 // WithCodec returns a DialOption which sets a codec for message marshaling and unmarshaling. 191 // 192 // Deprecated: use WithDefaultCallOptions(CallCustomCodec(c)) instead. 
193 func WithCodec(c Codec) DialOption { 194 return WithDefaultCallOptions(CallCustomCodec(c)) 195 } 196 197 // WithCompressor returns a DialOption which sets a Compressor to use for 198 // message compression. It has lower priority than the compressor set by 199 // the UseCompressor CallOption. 200 // 201 // Deprecated: use UseCompressor instead. 202 func WithCompressor(cp Compressor) DialOption { 203 return func(o *dialOptions) { 204 o.cp = cp 205 } 206 } 207 208 // WithDecompressor returns a DialOption which sets a Decompressor to use for 209 // incoming message decompression. If incoming response messages are encoded 210 // using the decompressor's Type(), it will be used. Otherwise, the message 211 // encoding will be used to look up the compressor registered via 212 // encoding.RegisterCompressor, which will then be used to decompress the 213 // message. If no compressor is registered for the encoding, an Unimplemented 214 // status error will be returned. 215 // 216 // Deprecated: use encoding.RegisterCompressor instead. 217 func WithDecompressor(dc Decompressor) DialOption { 218 return func(o *dialOptions) { 219 o.dc = dc 220 } 221 } 222 223 // WithBalancer returns a DialOption which sets a load balancer with the v1 API. 224 // Name resolver will be ignored if this DialOption is specified. 225 // 226 // Deprecated: use the new balancer APIs in balancer package and WithBalancerName. 227 func WithBalancer(b Balancer) DialOption { 228 return func(o *dialOptions) { 229 o.balancerBuilder = &balancerWrapperBuilder{ 230 b: b, 231 } 232 } 233 } 234 235 // WithBalancerName sets the balancer that the ClientConn will be initialized 236 // with. Balancer registered with balancerName will be used. This function 237 // panics if no balancer was registered by balancerName. 238 // 239 // The balancer cannot be overridden by balancer option specified by service 240 // config. 241 // 242 // This is an EXPERIMENTAL API. 
243 func WithBalancerName(balancerName string) DialOption { 244 builder := balancer.Get(balancerName) 245 if builder == nil { 246 panic(fmt.Sprintf("grpc.WithBalancerName: no balancer is registered for name %v", balancerName)) 247 } 248 return func(o *dialOptions) { 249 o.balancerBuilder = builder 250 } 251 } 252 253 // withResolverBuilder is only for grpclb. 254 func withResolverBuilder(b resolver.Builder) DialOption { 255 return func(o *dialOptions) { 256 o.resolverBuilder = b 257 } 258 } 259 260 // WithServiceConfig returns a DialOption which has a channel to read the service configuration. 261 // 262 // Deprecated: service config should be received through name resolver, as specified here. 263 // https://github.com/grpc/grpc/blob/master/doc/service_config.md 264 func WithServiceConfig(c <-chan ServiceConfig) DialOption { 265 return func(o *dialOptions) { 266 o.scChan = c 267 } 268 } 269 270 // WithBackoffMaxDelay configures the dialer to use the provided maximum delay 271 // when backing off after failed connection attempts. 272 func WithBackoffMaxDelay(md time.Duration) DialOption { 273 return WithBackoffConfig(BackoffConfig{MaxDelay: md}) 274 } 275 276 // WithBackoffConfig configures the dialer to use the provided backoff 277 // parameters after connection failures. 278 // 279 // Use WithBackoffMaxDelay until more parameters on BackoffConfig are opened up 280 // for use. 281 func WithBackoffConfig(b BackoffConfig) DialOption { 282 283 return withBackoff(backoff.Exponential{ 284 MaxDelay: b.MaxDelay, 285 }) 286 } 287 288 // withBackoff sets the backoff strategy used for connectRetryNum after a 289 // failed connection attempt. 290 // 291 // This can be exported if arbitrary backoff strategies are allowed by gRPC. 292 func withBackoff(bs backoff.Strategy) DialOption { 293 return func(o *dialOptions) { 294 o.bs = bs 295 } 296 } 297 298 // WithBlock returns a DialOption which makes caller of Dial blocks until the underlying 299 // connection is up. 
Without this, Dial returns immediately and connecting the server 300 // happens in background. 301 func WithBlock() DialOption { 302 return func(o *dialOptions) { 303 o.block = true 304 } 305 } 306 307 // WithInsecure returns a DialOption which disables transport security for this ClientConn. 308 // Note that transport security is required unless WithInsecure is set. 309 func WithInsecure() DialOption { 310 return func(o *dialOptions) { 311 o.insecure = true 312 } 313 } 314 315 // WithTransportCredentials returns a DialOption which configures a 316 // connection level security credentials (e.g., TLS/SSL). 317 func WithTransportCredentials(creds credentials.TransportCredentials) DialOption { 318 return func(o *dialOptions) { 319 o.copts.TransportCredentials = creds 320 } 321 } 322 323 // WithPerRPCCredentials returns a DialOption which sets 324 // credentials and places auth state on each outbound RPC. 325 func WithPerRPCCredentials(creds credentials.PerRPCCredentials) DialOption { 326 return func(o *dialOptions) { 327 o.copts.PerRPCCredentials = append(o.copts.PerRPCCredentials, creds) 328 } 329 } 330 331 // WithTimeout returns a DialOption that configures a timeout for dialing a ClientConn 332 // initially. This is valid if and only if WithBlock() is present. 333 // 334 // Deprecated: use DialContext and context.WithTimeout instead. 335 func WithTimeout(d time.Duration) DialOption { 336 return func(o *dialOptions) { 337 o.timeout = d 338 } 339 } 340 341 func withContextDialer(f func(context.Context, string) (net.Conn, error)) DialOption { 342 return func(o *dialOptions) { 343 o.copts.Dialer = f 344 } 345 } 346 347 func init() { 348 internal.WithContextDialer = withContextDialer 349 internal.WithResolverBuilder = withResolverBuilder 350 } 351 352 // WithDialer returns a DialOption that specifies a function to use for dialing network addresses. 
353 // If FailOnNonTempDialError() is set to true, and an error is returned by f, gRPC checks the error's 354 // Temporary() method to decide if it should try to reconnect to the network address. 355 func WithDialer(f func(string, time.Duration) (net.Conn, error)) DialOption { 356 return withContextDialer( 357 func(ctx context.Context, addr string) (net.Conn, error) { 358 if deadline, ok := ctx.Deadline(); ok { 359 return f(addr, deadline.Sub(time.Now())) 360 } 361 return f(addr, 0) 362 }) 363 } 364 365 // WithStatsHandler returns a DialOption that specifies the stats handler 366 // for all the RPCs and underlying network connections in this ClientConn. 367 func WithStatsHandler(h stats.Handler) DialOption { 368 return func(o *dialOptions) { 369 o.copts.StatsHandler = h 370 } 371 } 372 373 // FailOnNonTempDialError returns a DialOption that specifies if gRPC fails on non-temporary dial errors. 374 // If f is true, and dialer returns a non-temporary error, gRPC will fail the connection to the network 375 // address and won't try to reconnect. 376 // The default value of FailOnNonTempDialError is false. 377 // This is an EXPERIMENTAL API. 378 func FailOnNonTempDialError(f bool) DialOption { 379 return func(o *dialOptions) { 380 o.copts.FailOnNonTempDialError = f 381 } 382 } 383 384 // WithUserAgent returns a DialOption that specifies a user agent string for all the RPCs. 385 func WithUserAgent(s string) DialOption { 386 return func(o *dialOptions) { 387 o.copts.UserAgent = s 388 } 389 } 390 391 // WithKeepaliveParams returns a DialOption that specifies keepalive parameters for the client transport. 392 func WithKeepaliveParams(kp keepalive.ClientParameters) DialOption { 393 return func(o *dialOptions) { 394 o.copts.KeepaliveParams = kp 395 } 396 } 397 398 // WithUnaryInterceptor returns a DialOption that specifies the interceptor for unary RPCs. 
399 func WithUnaryInterceptor(f UnaryClientInterceptor) DialOption { 400 return func(o *dialOptions) { 401 o.unaryInt = f 402 } 403 } 404 405 // WithStreamInterceptor returns a DialOption that specifies the interceptor for streaming RPCs. 406 func WithStreamInterceptor(f StreamClientInterceptor) DialOption { 407 return func(o *dialOptions) { 408 o.streamInt = f 409 } 410 } 411 412 // WithAuthority returns a DialOption that specifies the value to be used as 413 // the :authority pseudo-header. This value only works with WithInsecure and 414 // has no effect if TransportCredentials are present. 415 func WithAuthority(a string) DialOption { 416 return func(o *dialOptions) { 417 o.copts.Authority = a 418 } 419 } 420 421 // WithChannelzParentID returns a DialOption that specifies the channelz ID of current ClientConn's 422 // parent. This function is used in nested channel creation (e.g. grpclb dial). 423 func WithChannelzParentID(id int64) DialOption { 424 return func(o *dialOptions) { 425 o.channelzParentID = id 426 } 427 } 428 429 // WithDisableServiceConfig returns a DialOption that causes grpc to ignore any 430 // service config provided by the resolver and provides a hint to the resolver 431 // to not fetch service configs. 432 func WithDisableServiceConfig() DialOption { 433 return func(o *dialOptions) { 434 o.disableServiceConfig = true 435 } 436 } 437 438 // Dial creates a client connection to the given target. 439 func Dial(target string, opts ...DialOption) (*ClientConn, error) { 440 return DialContext(context.Background(), target, opts...) 441 } 442 443 // DialContext creates a client connection to the given target. By default, it's 444 // a non-blocking dial (the function won't wait for connections to be 445 // established, and connecting happens in the background). To make it a blocking 446 // dial, use WithBlock() dial option. 447 // 448 // In the non-blocking case, the ctx does not act against the connection. It 449 // only controls the setup steps. 
450 // 451 // In the blocking case, ctx can be used to cancel or expire the pending 452 // connection. Once this function returns, the cancellation and expiration of 453 // ctx will be noop. Users should call ClientConn.Close to terminate all the 454 // pending operations after this function returns. 455 // 456 // The target name syntax is defined in 457 // https://github.com/grpc/grpc/blob/master/doc/naming.md. 458 // e.g. to use dns resolver, a "dns:///" prefix should be applied to the target. 459 func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *ClientConn, err error) { 460 cc := &ClientConn{ 461 target: target, 462 csMgr: &connectivityStateManager{}, 463 conns: make(map[*addrConn]struct{}), 464 465 blockingpicker: newPickerWrapper(), 466 } 467 cc.ctx, cc.cancel = context.WithCancel(context.Background()) 468 469 for _, opt := range opts { 470 opt(&cc.dopts) 471 } 472 473 if channelz.IsOn() { 474 if cc.dopts.channelzParentID != 0 { 475 cc.channelzID = channelz.RegisterChannel(cc, cc.dopts.channelzParentID, target) 476 } else { 477 cc.channelzID = channelz.RegisterChannel(cc, 0, target) 478 } 479 } 480 481 if !cc.dopts.insecure { 482 if cc.dopts.copts.TransportCredentials == nil { 483 return nil, errNoTransportSecurity 484 } 485 } else { 486 if cc.dopts.copts.TransportCredentials != nil { 487 return nil, errCredentialsConflict 488 } 489 for _, cd := range cc.dopts.copts.PerRPCCredentials { 490 if cd.RequireTransportSecurity() { 491 return nil, errTransportCredentialsMissing 492 } 493 } 494 } 495 496 cc.mkp = cc.dopts.copts.KeepaliveParams 497 498 if cc.dopts.copts.Dialer == nil { 499 cc.dopts.copts.Dialer = newProxyDialer( 500 func(ctx context.Context, addr string) (net.Conn, error) { 501 network, addr := parseDialTarget(addr) 502 return dialContext(ctx, network, addr) 503 }, 504 ) 505 } 506 507 if cc.dopts.copts.UserAgent != "" { 508 cc.dopts.copts.UserAgent += " " + grpcUA 509 } else { 510 cc.dopts.copts.UserAgent = grpcUA 511 } 512 
513 if cc.dopts.timeout > 0 { 514 var cancel context.CancelFunc 515 ctx, cancel = context.WithTimeout(ctx, cc.dopts.timeout) 516 defer cancel() 517 } 518 519 defer func() { 520 select { 521 case <-ctx.Done(): 522 conn, err = nil, ctx.Err() 523 default: 524 } 525 526 if err != nil { 527 cc.Close() 528 } 529 }() 530 531 scSet := false 532 if cc.dopts.scChan != nil { 533 // Try to get an initial service config. 534 select { 535 case sc, ok := <-cc.dopts.scChan: 536 if ok { 537 cc.sc = sc 538 scSet = true 539 } 540 default: 541 } 542 } 543 if cc.dopts.bs == nil { 544 cc.dopts.bs = backoff.Exponential{ 545 MaxDelay: DefaultBackoffConfig.MaxDelay, 546 } 547 } 548 if cc.dopts.resolverBuilder == nil { 549 // Only try to parse target when resolver builder is not already set. 550 cc.parsedTarget = parseTarget(cc.target) 551 grpclog.Infof("parsed scheme: %q", cc.parsedTarget.Scheme) 552 cc.dopts.resolverBuilder = resolver.Get(cc.parsedTarget.Scheme) 553 if cc.dopts.resolverBuilder == nil { 554 // If resolver builder is still nil, the parse target's scheme is 555 // not registered. Fallback to default resolver and set Endpoint to 556 // the original unparsed target. 557 grpclog.Infof("scheme %q not registered, fallback to default scheme", cc.parsedTarget.Scheme) 558 cc.parsedTarget = resolver.Target{ 559 Scheme: resolver.GetDefaultScheme(), 560 Endpoint: target, 561 } 562 cc.dopts.resolverBuilder = resolver.Get(cc.parsedTarget.Scheme) 563 } 564 } else { 565 cc.parsedTarget = resolver.Target{Endpoint: target} 566 } 567 creds := cc.dopts.copts.TransportCredentials 568 if creds != nil && creds.Info().ServerName != "" { 569 cc.authority = creds.Info().ServerName 570 } else if cc.dopts.insecure && cc.dopts.copts.Authority != "" { 571 cc.authority = cc.dopts.copts.Authority 572 } else { 573 // Use endpoint from "scheme://authority/endpoint" as the default 574 // authority for ClientConn. 
575 cc.authority = cc.parsedTarget.Endpoint 576 } 577 578 if cc.dopts.scChan != nil && !scSet { 579 // Blocking wait for the initial service config. 580 select { 581 case sc, ok := <-cc.dopts.scChan: 582 if ok { 583 cc.sc = sc 584 } 585 case <-ctx.Done(): 586 return nil, ctx.Err() 587 } 588 } 589 if cc.dopts.scChan != nil { 590 go cc.scWatcher() 591 } 592 593 var credsClone credentials.TransportCredentials 594 if creds := cc.dopts.copts.TransportCredentials; creds != nil { 595 credsClone = creds.Clone() 596 } 597 cc.balancerBuildOpts = balancer.BuildOptions{ 598 DialCreds: credsClone, 599 Dialer: cc.dopts.copts.Dialer, 600 ChannelzParentID: cc.channelzID, 601 } 602 603 // Build the resolver. 604 cc.resolverWrapper, err = newCCResolverWrapper(cc) 605 if err != nil { 606 return nil, fmt.Errorf("failed to build resolver: %v", err) 607 } 608 // Start the resolver wrapper goroutine after resolverWrapper is created. 609 // 610 // If the goroutine is started before resolverWrapper is ready, the 611 // following may happen: The goroutine sends updates to cc. cc forwards 612 // those to balancer. Balancer creates new addrConn. addrConn fails to 613 // connect, and calls resolveNow(). resolveNow() tries to use the non-ready 614 // resolverWrapper. 615 cc.resolverWrapper.start() 616 617 // A blocking dial blocks until the clientConn is ready. 618 if cc.dopts.block { 619 for { 620 s := cc.GetState() 621 if s == connectivity.Ready { 622 break 623 } 624 if !cc.WaitForStateChange(ctx, s) { 625 // ctx got timeout or canceled. 626 return nil, ctx.Err() 627 } 628 } 629 } 630 631 return cc, nil 632 } 633 634 // connectivityStateManager keeps the connectivity.State of ClientConn. 635 // This struct will eventually be exported so the balancers can access it. 636 type connectivityStateManager struct { 637 mu sync.Mutex 638 state connectivity.State 639 notifyChan chan struct{} 640 } 641 642 // updateState updates the connectivity.State of ClientConn. 
643 // If there's a change it notifies goroutines waiting on state change to 644 // happen. 645 func (csm *connectivityStateManager) updateState(state connectivity.State) { 646 csm.mu.Lock() 647 defer csm.mu.Unlock() 648 if csm.state == connectivity.Shutdown { 649 return 650 } 651 if csm.state == state { 652 return 653 } 654 csm.state = state 655 if csm.notifyChan != nil { 656 // There are other goroutines waiting on this channel. 657 close(csm.notifyChan) 658 csm.notifyChan = nil 659 } 660 } 661 662 func (csm *connectivityStateManager) getState() connectivity.State { 663 csm.mu.Lock() 664 defer csm.mu.Unlock() 665 return csm.state 666 } 667 668 func (csm *connectivityStateManager) getNotifyChan() <-chan struct{} { 669 csm.mu.Lock() 670 defer csm.mu.Unlock() 671 if csm.notifyChan == nil { 672 csm.notifyChan = make(chan struct{}) 673 } 674 return csm.notifyChan 675 } 676 677 // ClientConn represents a client connection to an RPC server. 678 type ClientConn struct { 679 ctx context.Context 680 cancel context.CancelFunc 681 682 target string 683 parsedTarget resolver.Target 684 authority string 685 dopts dialOptions 686 csMgr *connectivityStateManager 687 688 balancerBuildOpts balancer.BuildOptions 689 resolverWrapper *ccResolverWrapper 690 blockingpicker *pickerWrapper 691 692 mu sync.RWMutex 693 sc ServiceConfig 694 scRaw string 695 conns map[*addrConn]struct{} 696 // Keepalive parameter can be updated if a GoAway is received. 697 mkp keepalive.ClientParameters 698 curBalancerName string 699 preBalancerName string // previous balancer name. 700 curAddresses []resolver.Address 701 balancerWrapper *ccBalancerWrapper 702 703 channelzID int64 // channelz unique identification number 704 czmu sync.RWMutex 705 callsStarted int64 706 callsSucceeded int64 707 callsFailed int64 708 lastCallStartedTime time.Time 709 } 710 711 // WaitForStateChange waits until the connectivity.State of ClientConn changes from sourceState or 712 // ctx expires. 
A true value is returned in former case and false in latter. 713 // This is an EXPERIMENTAL API. 714 func (cc *ClientConn) WaitForStateChange(ctx context.Context, sourceState connectivity.State) bool { 715 ch := cc.csMgr.getNotifyChan() 716 if cc.csMgr.getState() != sourceState { 717 return true 718 } 719 select { 720 case <-ctx.Done(): 721 return false 722 case <-ch: 723 return true 724 } 725 } 726 727 // GetState returns the connectivity.State of ClientConn. 728 // This is an EXPERIMENTAL API. 729 func (cc *ClientConn) GetState() connectivity.State { 730 return cc.csMgr.getState() 731 } 732 733 func (cc *ClientConn) scWatcher() { 734 for { 735 select { 736 case sc, ok := <-cc.dopts.scChan: 737 if !ok { 738 return 739 } 740 cc.mu.Lock() 741 // TODO: load balance policy runtime change is ignored. 742 // We may revist this decision in the future. 743 cc.sc = sc 744 cc.scRaw = "" 745 cc.mu.Unlock() 746 case <-cc.ctx.Done(): 747 return 748 } 749 } 750 } 751 752 func (cc *ClientConn) handleResolvedAddrs(addrs []resolver.Address, err error) { 753 cc.mu.Lock() 754 defer cc.mu.Unlock() 755 if cc.conns == nil { 756 // cc was closed. 757 return 758 } 759 760 if reflect.DeepEqual(cc.curAddresses, addrs) { 761 return 762 } 763 764 cc.curAddresses = addrs 765 766 if cc.dopts.balancerBuilder == nil { 767 // Only look at balancer types and switch balancer if balancer dial 768 // option is not set. 769 var isGRPCLB bool 770 for _, a := range addrs { 771 if a.Type == resolver.GRPCLB { 772 isGRPCLB = true 773 break 774 } 775 } 776 var newBalancerName string 777 if isGRPCLB { 778 newBalancerName = grpclbName 779 } else { 780 // Address list doesn't contain grpclb address. Try to pick a 781 // non-grpclb balancer. 782 newBalancerName = cc.curBalancerName 783 // If current balancer is grpclb, switch to the previous one. 
784 if newBalancerName == grpclbName { 785 newBalancerName = cc.preBalancerName 786 } 787 // The following could be true in two cases: 788 // - the first time handling resolved addresses 789 // (curBalancerName="") 790 // - the first time handling non-grpclb addresses 791 // (curBalancerName="grpclb", preBalancerName="") 792 if newBalancerName == "" { 793 newBalancerName = PickFirstBalancerName 794 } 795 } 796 cc.switchBalancer(newBalancerName) 797 } else if cc.balancerWrapper == nil { 798 // Balancer dial option was set, and this is the first time handling 799 // resolved addresses. Build a balancer with dopts.balancerBuilder. 800 cc.balancerWrapper = newCCBalancerWrapper(cc, cc.dopts.balancerBuilder, cc.balancerBuildOpts) 801 } 802 803 cc.balancerWrapper.handleResolvedAddrs(addrs, nil) 804 } 805 806 // switchBalancer starts the switching from current balancer to the balancer 807 // with the given name. 808 // 809 // It will NOT send the current address list to the new balancer. If needed, 810 // caller of this function should send address list to the new balancer after 811 // this function returns. 812 // 813 // Caller must hold cc.mu. 814 func (cc *ClientConn) switchBalancer(name string) { 815 if cc.conns == nil { 816 return 817 } 818 819 if strings.ToLower(cc.curBalancerName) == strings.ToLower(name) { 820 return 821 } 822 823 grpclog.Infof("ClientConn switching balancer to %q", name) 824 if cc.dopts.balancerBuilder != nil { 825 grpclog.Infoln("ignoring balancer switching: Balancer DialOption used instead") 826 return 827 } 828 // TODO(bar switching) change this to two steps: drain and close. 829 // Keep track of sc in wrapper. 830 if cc.balancerWrapper != nil { 831 cc.balancerWrapper.close() 832 } 833 // Clear all stickiness state. 
834 cc.blockingpicker.clearStickinessState() 835 836 builder := balancer.Get(name) 837 if builder == nil { 838 grpclog.Infof("failed to get balancer builder for: %v, using pick_first instead", name) 839 builder = newPickfirstBuilder() 840 } 841 cc.preBalancerName = cc.curBalancerName 842 cc.curBalancerName = builder.Name() 843 cc.balancerWrapper = newCCBalancerWrapper(cc, builder, cc.balancerBuildOpts) 844 } 845 846 func (cc *ClientConn) handleSubConnStateChange(sc balancer.SubConn, s connectivity.State) { 847 cc.mu.Lock() 848 if cc.conns == nil { 849 cc.mu.Unlock() 850 return 851 } 852 // TODO(bar switching) send updates to all balancer wrappers when balancer 853 // gracefully switching is supported. 854 cc.balancerWrapper.handleSubConnStateChange(sc, s) 855 cc.mu.Unlock() 856 } 857 858 // newAddrConn creates an addrConn for addrs and adds it to cc.conns. 859 // 860 // Caller needs to make sure len(addrs) > 0. 861 func (cc *ClientConn) newAddrConn(addrs []resolver.Address) (*addrConn, error) { 862 ac := &addrConn{ 863 cc: cc, 864 addrs: addrs, 865 dopts: cc.dopts, 866 } 867 ac.ctx, ac.cancel = context.WithCancel(cc.ctx) 868 // Track ac in cc. This needs to be done before any getTransport(...) is called. 869 cc.mu.Lock() 870 if cc.conns == nil { 871 cc.mu.Unlock() 872 return nil, ErrClientConnClosing 873 } 874 if channelz.IsOn() { 875 ac.channelzID = channelz.RegisterSubChannel(ac, cc.channelzID, "") 876 } 877 cc.conns[ac] = struct{}{} 878 cc.mu.Unlock() 879 return ac, nil 880 } 881 882 // removeAddrConn removes the addrConn in the subConn from clientConn. 883 // It also tears down the ac with the given error. 884 func (cc *ClientConn) removeAddrConn(ac *addrConn, err error) { 885 cc.mu.Lock() 886 if cc.conns == nil { 887 cc.mu.Unlock() 888 return 889 } 890 delete(cc.conns, ac) 891 cc.mu.Unlock() 892 ac.tearDown(err) 893 } 894 895 // ChannelzMetric returns ChannelInternalMetric of current ClientConn. 896 // This is an EXPERIMENTAL API. 
897 func (cc *ClientConn) ChannelzMetric() *channelz.ChannelInternalMetric { 898 state := cc.GetState() 899 cc.czmu.RLock() 900 defer cc.czmu.RUnlock() 901 return &channelz.ChannelInternalMetric{ 902 State: state, 903 Target: cc.target, 904 CallsStarted: cc.callsStarted, 905 CallsSucceeded: cc.callsSucceeded, 906 CallsFailed: cc.callsFailed, 907 LastCallStartedTimestamp: cc.lastCallStartedTime, 908 } 909 } 910 911 func (cc *ClientConn) incrCallsStarted() { 912 cc.czmu.Lock() 913 cc.callsStarted++ 914 // TODO(yuxuanli): will make this a time.Time pointer improve performance? 915 cc.lastCallStartedTime = time.Now() 916 cc.czmu.Unlock() 917 } 918 919 func (cc *ClientConn) incrCallsSucceeded() { 920 cc.czmu.Lock() 921 cc.callsSucceeded++ 922 cc.czmu.Unlock() 923 } 924 925 func (cc *ClientConn) incrCallsFailed() { 926 cc.czmu.Lock() 927 cc.callsFailed++ 928 cc.czmu.Unlock() 929 } 930 931 // connect starts to creating transport and also starts the transport monitor 932 // goroutine for this ac. 933 // It does nothing if the ac is not IDLE. 934 // TODO(bar) Move this to the addrConn section. 935 // This was part of resetAddrConn, keep it here to make the diff look clean. 936 func (ac *addrConn) connect() error { 937 ac.mu.Lock() 938 if ac.state == connectivity.Shutdown { 939 ac.mu.Unlock() 940 return errConnClosing 941 } 942 if ac.state != connectivity.Idle { 943 ac.mu.Unlock() 944 return nil 945 } 946 ac.state = connectivity.Connecting 947 ac.cc.handleSubConnStateChange(ac.acbw, ac.state) 948 ac.mu.Unlock() 949 950 // Start a goroutine connecting to the server asynchronously. 951 go func() { 952 if err := ac.resetTransport(); err != nil { 953 grpclog.Warningf("Failed to dial %s: %v; please retry.", ac.addrs[0].Addr, err) 954 if err != errConnClosing { 955 // Keep this ac in cc.conns, to get the reason it's torn down. 
956 ac.tearDown(err) 957 } 958 return 959 } 960 ac.transportMonitor() 961 }() 962 return nil 963 } 964 965 // tryUpdateAddrs tries to update ac.addrs with the new addresses list. 966 // 967 // It checks whether current connected address of ac is in the new addrs list. 968 // - If true, it updates ac.addrs and returns true. The ac will keep using 969 // the existing connection. 970 // - If false, it does nothing and returns false. 971 func (ac *addrConn) tryUpdateAddrs(addrs []resolver.Address) bool { 972 ac.mu.Lock() 973 defer ac.mu.Unlock() 974 grpclog.Infof("addrConn: tryUpdateAddrs curAddr: %v, addrs: %v", ac.curAddr, addrs) 975 if ac.state == connectivity.Shutdown { 976 ac.addrs = addrs 977 return true 978 } 979 980 var curAddrFound bool 981 for _, a := range addrs { 982 if reflect.DeepEqual(ac.curAddr, a) { 983 curAddrFound = true 984 break 985 } 986 } 987 grpclog.Infof("addrConn: tryUpdateAddrs curAddrFound: %v", curAddrFound) 988 if curAddrFound { 989 ac.addrs = addrs 990 ac.reconnectIdx = 0 // Start reconnecting from beginning in the new list. 991 } 992 993 return curAddrFound 994 } 995 996 // GetMethodConfig gets the method config of the input method. 997 // If there's an exact match for input method (i.e. /service/method), we return 998 // the corresponding MethodConfig. 999 // If there isn't an exact match for the input method, we look for the default config 1000 // under the service (i.e /service/). If there is a default MethodConfig for 1001 // the service, we return it. 1002 // Otherwise, we return an empty MethodConfig. 1003 func (cc *ClientConn) GetMethodConfig(method string) MethodConfig { 1004 // TODO: Avoid the locking here. 
1005 cc.mu.RLock() 1006 defer cc.mu.RUnlock() 1007 m, ok := cc.sc.Methods[method] 1008 if !ok { 1009 i := strings.LastIndex(method, "/") 1010 m = cc.sc.Methods[method[:i+1]] 1011 } 1012 return m 1013 } 1014 1015 func (cc *ClientConn) getTransport(ctx context.Context, failfast bool) (transport.ClientTransport, func(balancer.DoneInfo), error) { 1016 t, done, err := cc.blockingpicker.pick(ctx, failfast, balancer.PickOptions{}) 1017 if err != nil { 1018 return nil, nil, toRPCErr(err) 1019 } 1020 return t, done, nil 1021 } 1022 1023 // handleServiceConfig parses the service config string in JSON format to Go native 1024 // struct ServiceConfig, and store both the struct and the JSON string in ClientConn. 1025 func (cc *ClientConn) handleServiceConfig(js string) error { 1026 if cc.dopts.disableServiceConfig { 1027 return nil 1028 } 1029 sc, err := parseServiceConfig(js) 1030 if err != nil { 1031 return err 1032 } 1033 cc.mu.Lock() 1034 cc.scRaw = js 1035 cc.sc = sc 1036 if sc.LB != nil && *sc.LB != grpclbName { // "grpclb" is not a valid balancer option in service config. 1037 if cc.curBalancerName == grpclbName { 1038 // If current balancer is grpclb, there's at least one grpclb 1039 // balancer address in the resolved list. Don't switch the balancer, 1040 // but change the previous balancer name, so if a new resolved 1041 // address list doesn't contain grpclb address, balancer will be 1042 // switched to *sc.LB. 
1043 cc.preBalancerName = *sc.LB 1044 } else { 1045 cc.switchBalancer(*sc.LB) 1046 cc.balancerWrapper.handleResolvedAddrs(cc.curAddresses, nil) 1047 } 1048 } 1049 1050 if envConfigStickinessOn { 1051 var newStickinessMDKey string 1052 if sc.stickinessMetadataKey != nil && *sc.stickinessMetadataKey != "" { 1053 newStickinessMDKey = *sc.stickinessMetadataKey 1054 } 1055 // newStickinessMDKey is "" if one of the following happens: 1056 // - stickinessMetadataKey is set to "" 1057 // - stickinessMetadataKey field doesn't exist in service config 1058 cc.blockingpicker.updateStickinessMDKey(strings.ToLower(newStickinessMDKey)) 1059 } 1060 1061 cc.mu.Unlock() 1062 return nil 1063 } 1064 1065 func (cc *ClientConn) resolveNow(o resolver.ResolveNowOption) { 1066 cc.mu.RLock() 1067 r := cc.resolverWrapper 1068 cc.mu.RUnlock() 1069 if r == nil { 1070 return 1071 } 1072 go r.resolveNow(o) 1073 } 1074 1075 // Close tears down the ClientConn and all underlying connections. 1076 func (cc *ClientConn) Close() error { 1077 defer cc.cancel() 1078 1079 cc.mu.Lock() 1080 if cc.conns == nil { 1081 cc.mu.Unlock() 1082 return ErrClientConnClosing 1083 } 1084 conns := cc.conns 1085 cc.conns = nil 1086 cc.csMgr.updateState(connectivity.Shutdown) 1087 1088 rWrapper := cc.resolverWrapper 1089 cc.resolverWrapper = nil 1090 bWrapper := cc.balancerWrapper 1091 cc.balancerWrapper = nil 1092 cc.mu.Unlock() 1093 1094 cc.blockingpicker.close() 1095 1096 if rWrapper != nil { 1097 rWrapper.close() 1098 } 1099 if bWrapper != nil { 1100 bWrapper.close() 1101 } 1102 1103 for ac := range conns { 1104 ac.tearDown(ErrClientConnClosing) 1105 } 1106 if channelz.IsOn() { 1107 channelz.RemoveEntry(cc.channelzID) 1108 } 1109 return nil 1110 } 1111 1112 // addrConn is a network connection to a given address. 
1113 type addrConn struct { 1114 ctx context.Context 1115 cancel context.CancelFunc 1116 1117 cc *ClientConn 1118 addrs []resolver.Address 1119 dopts dialOptions 1120 events trace.EventLog 1121 acbw balancer.SubConn 1122 1123 mu sync.Mutex 1124 curAddr resolver.Address 1125 reconnectIdx int // The index in addrs list to start reconnecting from. 1126 state connectivity.State 1127 // ready is closed and becomes nil when a new transport is up or failed 1128 // due to timeout. 1129 ready chan struct{} 1130 transport transport.ClientTransport 1131 1132 // The reason this addrConn is torn down. 1133 tearDownErr error 1134 1135 connectRetryNum int 1136 // backoffDeadline is the time until which resetTransport needs to 1137 // wait before increasing connectRetryNum count. 1138 backoffDeadline time.Time 1139 // connectDeadline is the time by which all connection 1140 // negotiations must complete. 1141 connectDeadline time.Time 1142 1143 channelzID int64 // channelz unique identification number 1144 czmu sync.RWMutex 1145 callsStarted int64 1146 callsSucceeded int64 1147 callsFailed int64 1148 lastCallStartedTime time.Time 1149 } 1150 1151 // adjustParams updates parameters used to create transports upon 1152 // receiving a GoAway. 1153 func (ac *addrConn) adjustParams(r transport.GoAwayReason) { 1154 switch r { 1155 case transport.GoAwayTooManyPings: 1156 v := 2 * ac.dopts.copts.KeepaliveParams.Time 1157 ac.cc.mu.Lock() 1158 if v > ac.cc.mkp.Time { 1159 ac.cc.mkp.Time = v 1160 } 1161 ac.cc.mu.Unlock() 1162 } 1163 } 1164 1165 // printf records an event in ac's event log, unless ac has been closed. 1166 // REQUIRES ac.mu is held. 1167 func (ac *addrConn) printf(format string, a ...interface{}) { 1168 if ac.events != nil { 1169 ac.events.Printf(format, a...) 1170 } 1171 } 1172 1173 // errorf records an error in ac's event log, unless ac has been closed. 1174 // REQUIRES ac.mu is held. 
func (ac *addrConn) errorf(format string, a ...interface{}) {
	if ac.events != nil {
		ac.events.Errorf(format, a...)
	}
}

// resetTransport recreates a transport to the address for ac. The old
// transport will close itself on error or when the clientconn is closed.
// The created transport must receive initial settings frame from the server.
// In case that doesn't happen, transportMonitor will kill the newly created
// transport after connectDeadline has expired.
// In case there was an error on the transport before the settings frame was
// received, resetTransport resumes connecting to backends after the one that
// was previously connected to. In case end of the list is reached, resetTransport
// backs off until the original deadline.
// If the DialOption WithWaitForHandshake was set, resetTransport returns
// successfully only after server settings are received.
//
// It loops until createTransport reports a connection (returns nil) or an
// error (returned as-is; errConnClosing when ac has been shut down).
//
// TODO(bar) make sure all state transitions are valid.
func (ac *addrConn) resetTransport() error {
	ac.mu.Lock()
	if ac.state == connectivity.Shutdown {
		ac.mu.Unlock()
		return errConnClosing
	}
	// Wake any goroutine blocked in wait() on the old ready channel so it
	// re-evaluates ac.state.
	if ac.ready != nil {
		close(ac.ready)
		ac.ready = nil
	}
	// Drop the old transport; createTransport publishes the new one.
	ac.transport = nil
	// Where to resume in the address list (createTransport sets this to the
	// entry after the last backend it connected to).
	ridx := ac.reconnectIdx
	ac.mu.Unlock()
	// Pick up the ClientConn-wide keepalive parameters, which adjustParams
	// may have raised after a GoAway.
	ac.cc.mu.RLock()
	ac.dopts.copts.KeepaliveParams = ac.cc.mkp
	ac.cc.mu.RUnlock()
	var backoffDeadline, connectDeadline time.Time
	for connectRetryNum := 0; ; connectRetryNum++ {
		ac.mu.Lock()
		if ac.backoffDeadline.IsZero() {
			// This means either a successful HTTP2 connection was established
			// or this is the first time this addrConn is trying to establish a
			// connection.
			backoffFor := ac.dopts.bs.Backoff(connectRetryNum) // time.Duration.
			// This will be the duration that dial gets to finish.
			dialDuration := getMinConnectTimeout()
			if backoffFor > dialDuration {
				// Give dial more time as we keep failing to connect.
				dialDuration = backoffFor
			}
			start := time.Now()
			backoffDeadline = start.Add(backoffFor)
			connectDeadline = start.Add(dialDuration)
			ridx = 0 // Start connecting from the beginning.
		} else {
			// Continue trying to connect with the same deadlines.
			// createTransport stashes these when it connected but the server
			// preface had not arrived yet. They are consumed (reset to zero)
			// here so they are used only once. Note connectRetryNum is
			// overwritten here and then incremented again by the loop header
			// on the next iteration.
			connectRetryNum = ac.connectRetryNum
			backoffDeadline = ac.backoffDeadline
			connectDeadline = ac.connectDeadline
			ac.backoffDeadline = time.Time{}
			ac.connectDeadline = time.Time{}
			ac.connectRetryNum = 0
		}
		if ac.state == connectivity.Shutdown {
			ac.mu.Unlock()
			return errConnClosing
		}
		ac.printf("connecting")
		if ac.state != connectivity.Connecting {
			ac.state = connectivity.Connecting
			ac.cc.handleSubConnStateChange(ac.acbw, ac.state)
		}
		// copy ac.addrs in case of race
		// (tryUpdateAddrs may replace ac.addrs while createTransport runs
		// without holding ac.mu).
		addrsIter := make([]resolver.Address, len(ac.addrs))
		copy(addrsIter, ac.addrs)
		copts := ac.dopts.copts
		ac.mu.Unlock()
		connected, err := ac.createTransport(connectRetryNum, ridx, backoffDeadline, connectDeadline, addrsIter, copts)
		if err != nil {
			return err
		}
		if connected {
			return nil
		}
		// (false, nil): every address failed and the backoff elapsed; retry.
	}
}

// createTransport creates a connection to one of the backends in addrs.
// It returns true if a connection was established.
//
// Addresses are tried starting at index ridx. If ac is shut down meanwhile,
// it returns errConnClosing. If no address connects, ac moves to
// TransientFailure and a re-resolve is requested. It then blocks until
// backoffDeadline (returning false, nil) or until ac.ctx is done (returning
// false and the context's error).
1263 func (ac *addrConn) createTransport(connectRetryNum, ridx int, backoffDeadline, connectDeadline time.Time, addrs []resolver.Address, copts transport.ConnectOptions) (bool, error) { 1264 for i := ridx; i < len(addrs); i++ { 1265 addr := addrs[i] 1266 target := transport.TargetInfo{ 1267 Addr: addr.Addr, 1268 Metadata: addr.Metadata, 1269 Authority: ac.cc.authority, 1270 } 1271 done := make(chan struct{}) 1272 onPrefaceReceipt := func() { 1273 ac.mu.Lock() 1274 close(done) 1275 if !ac.backoffDeadline.IsZero() { 1276 // If we haven't already started reconnecting to 1277 // other backends. 1278 // Note, this can happen when writer notices an error 1279 // and triggers resetTransport while at the same time 1280 // reader receives the preface and invokes this closure. 1281 ac.backoffDeadline = time.Time{} 1282 ac.connectDeadline = time.Time{} 1283 ac.connectRetryNum = 0 1284 } 1285 ac.mu.Unlock() 1286 } 1287 // Do not cancel in the success path because of 1288 // this issue in Go1.6: https://github.com/golang/go/issues/15078. 1289 connectCtx, cancel := context.WithDeadline(ac.ctx, connectDeadline) 1290 if channelz.IsOn() { 1291 copts.ChannelzParentID = ac.channelzID 1292 } 1293 newTr, err := transport.NewClientTransport(connectCtx, ac.cc.ctx, target, copts, onPrefaceReceipt) 1294 if err != nil { 1295 cancel() 1296 ac.cc.blockingpicker.updateConnectionError(err) 1297 ac.mu.Lock() 1298 if ac.state == connectivity.Shutdown { 1299 // ac.tearDown(...) has been invoked. 1300 ac.mu.Unlock() 1301 return false, errConnClosing 1302 } 1303 ac.mu.Unlock() 1304 grpclog.Warningf("grpc: addrConn.createTransport failed to connect to %v. Err :%v. Reconnecting...", addr, err) 1305 continue 1306 } 1307 if ac.dopts.waitForHandshake { 1308 select { 1309 case <-done: 1310 case <-connectCtx.Done(): 1311 // Didn't receive server preface, must kill this new transport now. 
1312 grpclog.Warningf("grpc: addrConn.createTransport failed to receive server preface before deadline.") 1313 newTr.Close() 1314 break 1315 case <-ac.ctx.Done(): 1316 } 1317 } 1318 ac.mu.Lock() 1319 if ac.state == connectivity.Shutdown { 1320 ac.mu.Unlock() 1321 // ac.tearDonn(...) has been invoked. 1322 newTr.Close() 1323 return false, errConnClosing 1324 } 1325 ac.printf("ready") 1326 ac.state = connectivity.Ready 1327 ac.cc.handleSubConnStateChange(ac.acbw, ac.state) 1328 ac.transport = newTr 1329 ac.curAddr = addr 1330 if ac.ready != nil { 1331 close(ac.ready) 1332 ac.ready = nil 1333 } 1334 select { 1335 case <-done: 1336 // If the server has responded back with preface already, 1337 // don't set the reconnect parameters. 1338 default: 1339 ac.connectRetryNum = connectRetryNum 1340 ac.backoffDeadline = backoffDeadline 1341 ac.connectDeadline = connectDeadline 1342 ac.reconnectIdx = i + 1 // Start reconnecting from the next backend in the list. 1343 } 1344 ac.mu.Unlock() 1345 return true, nil 1346 } 1347 ac.mu.Lock() 1348 if ac.state == connectivity.Shutdown { 1349 ac.mu.Unlock() 1350 return false, errConnClosing 1351 } 1352 ac.state = connectivity.TransientFailure 1353 ac.cc.handleSubConnStateChange(ac.acbw, ac.state) 1354 ac.cc.resolveNow(resolver.ResolveNowOption{}) 1355 if ac.ready != nil { 1356 close(ac.ready) 1357 ac.ready = nil 1358 } 1359 ac.mu.Unlock() 1360 timer := time.NewTimer(backoffDeadline.Sub(time.Now())) 1361 select { 1362 case <-timer.C: 1363 case <-ac.ctx.Done(): 1364 timer.Stop() 1365 return false, ac.ctx.Err() 1366 } 1367 return false, nil 1368 } 1369 1370 // Run in a goroutine to track the error in transport and create the 1371 // new transport if an error happens. It returns when the channel is closing. 
func (ac *addrConn) transportMonitor() {
	for {
		var timer *time.Timer
		// cdeadline stays nil (never fires in the select) unless a
		// connectDeadline was stashed, i.e. the server preface was still
		// outstanding when the transport was published.
		var cdeadline <-chan time.Time
		ac.mu.Lock()
		t := ac.transport
		if !ac.connectDeadline.IsZero() {
			timer = time.NewTimer(ac.connectDeadline.Sub(time.Now()))
			cdeadline = timer.C
		}
		ac.mu.Unlock()
		// Block until we receive a goaway or an error occurs.
		select {
		case <-t.GoAway():
			done := t.Error()
			cleanup := t.Close
			// Since this transport will be orphaned (won't have a transportMonitor)
			// we need to launch a goroutine to keep track of clientConn.Close()
			// happening since it might not be noticed by any other goroutine for a while.
			go func() {
				<-done
				cleanup()
			}()
		case <-t.Error():
			// In case this is triggered because clientConn.Close()
			// was called, we want to immediately close the transport
			// since no other goroutine might notice it for a while.
			t.Close()
		case <-cdeadline:
			ac.mu.Lock()
			// This implies that client received server preface.
			// (onPrefaceReceipt zeroes backoffDeadline and connectDeadline,
			// so the next iteration will not arm a timer.)
			if ac.backoffDeadline.IsZero() {
				ac.mu.Unlock()
				continue
			}
			ac.mu.Unlock()
			// The timer has already fired; nothing to stop below.
			timer = nil
			// No server preface received until deadline.
			// Kill the connection.
			grpclog.Warningf("grpc: addrConn.transportMonitor didn't get server preface after waiting. Closing the new transport now.")
			t.Close()
		}
		if timer != nil {
			timer.Stop()
		}
		// If a GoAway happened, regardless of error, adjust our keepalive
		// parameters as appropriate.
		select {
		case <-t.GoAway():
			ac.adjustParams(t.GetGoAwayReason())
		default:
		}
		ac.mu.Lock()
		if ac.state == connectivity.Shutdown {
			ac.mu.Unlock()
			return
		}
		// Set connectivity state to TransientFailure before calling
		// resetTransport. Transition READY->CONNECTING is not valid.
		ac.state = connectivity.TransientFailure
		ac.cc.handleSubConnStateChange(ac.acbw, ac.state)
		ac.cc.resolveNow(resolver.ResolveNowOption{})
		ac.curAddr = resolver.Address{}
		ac.mu.Unlock()
		if err := ac.resetTransport(); err != nil {
			ac.mu.Lock()
			ac.printf("transport exiting: %v", err)
			ac.mu.Unlock()
			grpclog.Warningf("grpc: addrConn.transportMonitor exits due to: %v", err)
			if err != errConnClosing {
				// Keep this ac in cc.conns, to get the reason it's torn down.
				ac.tearDown(err)
			}
			return
		}
	}
}

// wait blocks until i) the new transport is up or ii) ctx is done or iii) ac is closed or
// iv) transport is in connectivity.TransientFailure and there is a balancer/failfast is true.
//
// Returns:
//   - the transport when ac is Ready;
//   - toRPCErr(ctx.Err()) when ctx ends first;
//   - ac.tearDownErr on Shutdown for failfast or balancer-less callers,
//     errConnClosing on Shutdown otherwise;
//   - errConnUnavailable on TransientFailure when failfast or hasBalancer.
func (ac *addrConn) wait(ctx context.Context, hasBalancer, failfast bool) (transport.ClientTransport, error) {
	for {
		ac.mu.Lock()
		switch {
		case ac.state == connectivity.Shutdown:
			if failfast || !hasBalancer {
				// RPC is failfast or balancer is nil. This RPC should fail with ac.tearDownErr.
				err := ac.tearDownErr
				ac.mu.Unlock()
				return nil, err
			}
			ac.mu.Unlock()
			return nil, errConnClosing
		case ac.state == connectivity.Ready:
			ct := ac.transport
			ac.mu.Unlock()
			return ct, nil
		case ac.state == connectivity.TransientFailure:
			if failfast || hasBalancer {
				ac.mu.Unlock()
				return nil, errConnUnavailable
			}
		}
		// Share a single ready channel among waiters. It is closed (and reset
		// to nil) by resetTransport, createTransport and tearDown, after which
		// the state is re-checked.
		ready := ac.ready
		if ready == nil {
			ready = make(chan struct{})
			ac.ready = ready
		}
		ac.mu.Unlock()
		select {
		case <-ctx.Done():
			return nil, toRPCErr(ctx.Err())
		// Wait until the new transport is ready or failed.
		case <-ready:
		}
	}
}

// getReadyTransport returns the transport if ac's state is READY.
// Otherwise it returns nil, false.
// If ac's state is IDLE, it will trigger ac to connect.
1493 func (ac *addrConn) getReadyTransport() (transport.ClientTransport, bool) { 1494 ac.mu.Lock() 1495 if ac.state == connectivity.Ready { 1496 t := ac.transport 1497 ac.mu.Unlock() 1498 return t, true 1499 } 1500 var idle bool 1501 if ac.state == connectivity.Idle { 1502 idle = true 1503 } 1504 ac.mu.Unlock() 1505 // Trigger idle ac to connect. 1506 if idle { 1507 ac.connect() 1508 } 1509 return nil, false 1510 } 1511 1512 // tearDown starts to tear down the addrConn. 1513 // TODO(zhaoq): Make this synchronous to avoid unbounded memory consumption in 1514 // some edge cases (e.g., the caller opens and closes many addrConn's in a 1515 // tight loop. 1516 // tearDown doesn't remove ac from ac.cc.conns. 1517 func (ac *addrConn) tearDown(err error) { 1518 ac.cancel() 1519 ac.mu.Lock() 1520 defer ac.mu.Unlock() 1521 if ac.state == connectivity.Shutdown { 1522 return 1523 } 1524 ac.curAddr = resolver.Address{} 1525 if err == errConnDrain && ac.transport != nil { 1526 // GracefulClose(...) may be executed multiple times when 1527 // i) receiving multiple GoAway frames from the server; or 1528 // ii) there are concurrent name resolver/Balancer triggered 1529 // address removal and GoAway. 
1530 ac.transport.GracefulClose() 1531 } 1532 ac.state = connectivity.Shutdown 1533 ac.tearDownErr = err 1534 ac.cc.handleSubConnStateChange(ac.acbw, ac.state) 1535 if ac.events != nil { 1536 ac.events.Finish() 1537 ac.events = nil 1538 } 1539 if ac.ready != nil { 1540 close(ac.ready) 1541 ac.ready = nil 1542 } 1543 if channelz.IsOn() { 1544 channelz.RemoveEntry(ac.channelzID) 1545 } 1546 } 1547 1548 func (ac *addrConn) getState() connectivity.State { 1549 ac.mu.Lock() 1550 defer ac.mu.Unlock() 1551 return ac.state 1552 } 1553 1554 func (ac *addrConn) getCurAddr() (ret resolver.Address) { 1555 ac.mu.Lock() 1556 ret = ac.curAddr 1557 ac.mu.Unlock() 1558 return 1559 } 1560 1561 func (ac *addrConn) ChannelzMetric() *channelz.ChannelInternalMetric { 1562 ac.mu.Lock() 1563 addr := ac.curAddr.Addr 1564 ac.mu.Unlock() 1565 state := ac.getState() 1566 ac.czmu.RLock() 1567 defer ac.czmu.RUnlock() 1568 return &channelz.ChannelInternalMetric{ 1569 State: state, 1570 Target: addr, 1571 CallsStarted: ac.callsStarted, 1572 CallsSucceeded: ac.callsSucceeded, 1573 CallsFailed: ac.callsFailed, 1574 LastCallStartedTimestamp: ac.lastCallStartedTime, 1575 } 1576 } 1577 1578 func (ac *addrConn) incrCallsStarted() { 1579 ac.czmu.Lock() 1580 ac.callsStarted++ 1581 ac.lastCallStartedTime = time.Now() 1582 ac.czmu.Unlock() 1583 } 1584 1585 func (ac *addrConn) incrCallsSucceeded() { 1586 ac.czmu.Lock() 1587 ac.callsSucceeded++ 1588 ac.czmu.Unlock() 1589 } 1590 1591 func (ac *addrConn) incrCallsFailed() { 1592 ac.czmu.Lock() 1593 ac.callsFailed++ 1594 ac.czmu.Unlock() 1595 } 1596 1597 // ErrClientConnTimeout indicates that the ClientConn cannot establish the 1598 // underlying connections within the specified timeout. 1599 // 1600 // Deprecated: This error is never returned by grpc and should not be 1601 // referenced by users. 1602 var ErrClientConnTimeout = errors.New("grpc: timed out when dialing") 1603