Home | History | Annotate | Download | only in grpc
      1 /*
      2  *
      3  * Copyright 2017 gRPC authors.
      4  *
      5  * Licensed under the Apache License, Version 2.0 (the "License");
      6  * you may not use this file except in compliance with the License.
      7  * You may obtain a copy of the License at
      8  *
      9  *     http://www.apache.org/licenses/LICENSE-2.0
     10  *
     11  * Unless required by applicable law or agreed to in writing, software
     12  * distributed under the License is distributed on an "AS IS" BASIS,
     13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     14  * See the License for the specific language governing permissions and
     15  * limitations under the License.
     16  *
     17  */
     18 
     19 package grpc
     20 
     21 import (
     22 	"fmt"
     23 	"strings"
     24 
     25 	"google.golang.org/grpc/grpclog"
     26 	"google.golang.org/grpc/resolver"
     27 )
     28 
     29 // ccResolverWrapper is a wrapper on top of cc for resolvers.
     30 // It implements resolver.ClientConnection interface.
     31 type ccResolverWrapper struct {
     32 	cc       *ClientConn
     33 	resolver resolver.Resolver
     34 	addrCh   chan []resolver.Address
     35 	scCh     chan string
     36 	done     chan struct{}
     37 }
     38 
     39 // split2 returns the values from strings.SplitN(s, sep, 2).
     40 // If sep is not found, it returns ("", s, false) instead.
     41 func split2(s, sep string) (string, string, bool) {
     42 	spl := strings.SplitN(s, sep, 2)
     43 	if len(spl) < 2 {
     44 		return "", "", false
     45 	}
     46 	return spl[0], spl[1], true
     47 }
     48 
     49 // parseTarget splits target into a struct containing scheme, authority and
     50 // endpoint.
     51 //
     52 // If target is not a valid scheme://authority/endpoint, it returns {Endpoint:
     53 // target}.
     54 func parseTarget(target string) (ret resolver.Target) {
     55 	var ok bool
     56 	ret.Scheme, ret.Endpoint, ok = split2(target, "://")
     57 	if !ok {
     58 		return resolver.Target{Endpoint: target}
     59 	}
     60 	ret.Authority, ret.Endpoint, ok = split2(ret.Endpoint, "/")
     61 	if !ok {
     62 		return resolver.Target{Endpoint: target}
     63 	}
     64 	return ret
     65 }
     66 
     67 // newCCResolverWrapper parses cc.target for scheme and gets the resolver
     68 // builder for this scheme and builds the resolver. The monitoring goroutine
     69 // for it is not started yet and can be created by calling start().
     70 //
     71 // If withResolverBuilder dial option is set, the specified resolver will be
     72 // used instead.
     73 func newCCResolverWrapper(cc *ClientConn) (*ccResolverWrapper, error) {
     74 	rb := cc.dopts.resolverBuilder
     75 	if rb == nil {
     76 		return nil, fmt.Errorf("could not get resolver for scheme: %q", cc.parsedTarget.Scheme)
     77 	}
     78 
     79 	ccr := &ccResolverWrapper{
     80 		cc:     cc,
     81 		addrCh: make(chan []resolver.Address, 1),
     82 		scCh:   make(chan string, 1),
     83 		done:   make(chan struct{}),
     84 	}
     85 
     86 	var err error
     87 	ccr.resolver, err = rb.Build(cc.parsedTarget, ccr, resolver.BuildOption{DisableServiceConfig: cc.dopts.disableServiceConfig})
     88 	if err != nil {
     89 		return nil, err
     90 	}
     91 	return ccr, nil
     92 }
     93 
     94 func (ccr *ccResolverWrapper) start() {
     95 	go ccr.watcher()
     96 }
     97 
     98 // watcher processes address updates and service config updates sequentially.
     99 // Otherwise, we need to resolve possible races between address and service
    100 // config (e.g. they specify different balancer types).
    101 func (ccr *ccResolverWrapper) watcher() {
    102 	for {
    103 		select {
    104 		case <-ccr.done:
    105 			return
    106 		default:
    107 		}
    108 
    109 		select {
    110 		case addrs := <-ccr.addrCh:
    111 			select {
    112 			case <-ccr.done:
    113 				return
    114 			default:
    115 			}
    116 			grpclog.Infof("ccResolverWrapper: sending new addresses to cc: %v", addrs)
    117 			ccr.cc.handleResolvedAddrs(addrs, nil)
    118 		case sc := <-ccr.scCh:
    119 			select {
    120 			case <-ccr.done:
    121 				return
    122 			default:
    123 			}
    124 			grpclog.Infof("ccResolverWrapper: got new service config: %v", sc)
    125 			ccr.cc.handleServiceConfig(sc)
    126 		case <-ccr.done:
    127 			return
    128 		}
    129 	}
    130 }
    131 
    132 func (ccr *ccResolverWrapper) resolveNow(o resolver.ResolveNowOption) {
    133 	ccr.resolver.ResolveNow(o)
    134 }
    135 
    136 func (ccr *ccResolverWrapper) close() {
    137 	ccr.resolver.Close()
    138 	close(ccr.done)
    139 }
    140 
    141 // NewAddress is called by the resolver implemenetion to send addresses to gRPC.
    142 func (ccr *ccResolverWrapper) NewAddress(addrs []resolver.Address) {
    143 	select {
    144 	case <-ccr.addrCh:
    145 	default:
    146 	}
    147 	ccr.addrCh <- addrs
    148 }
    149 
    150 // NewServiceConfig is called by the resolver implemenetion to send service
    151 // configs to gRPC.
    152 func (ccr *ccResolverWrapper) NewServiceConfig(sc string) {
    153 	select {
    154 	case <-ccr.scCh:
    155 	default:
    156 	}
    157 	ccr.scCh <- sc
    158 }
    159