1 /* 2 * 3 * Copyright 2017 gRPC authors. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 * 17 */ 18 19 package grpc 20 21 import ( 22 "fmt" 23 "strings" 24 25 "google.golang.org/grpc/grpclog" 26 "google.golang.org/grpc/resolver" 27 ) 28 29 // ccResolverWrapper is a wrapper on top of cc for resolvers. 30 // It implements resolver.ClientConnection interface. 31 type ccResolverWrapper struct { 32 cc *ClientConn 33 resolver resolver.Resolver 34 addrCh chan []resolver.Address 35 scCh chan string 36 done chan struct{} 37 } 38 39 // split2 returns the values from strings.SplitN(s, sep, 2). 40 // If sep is not found, it returns ("", s, false) instead. 41 func split2(s, sep string) (string, string, bool) { 42 spl := strings.SplitN(s, sep, 2) 43 if len(spl) < 2 { 44 return "", "", false 45 } 46 return spl[0], spl[1], true 47 } 48 49 // parseTarget splits target into a struct containing scheme, authority and 50 // endpoint. 51 // 52 // If target is not a valid scheme://authority/endpoint, it returns {Endpoint: 53 // target}. 54 func parseTarget(target string) (ret resolver.Target) { 55 var ok bool 56 ret.Scheme, ret.Endpoint, ok = split2(target, "://") 57 if !ok { 58 return resolver.Target{Endpoint: target} 59 } 60 ret.Authority, ret.Endpoint, ok = split2(ret.Endpoint, "/") 61 if !ok { 62 return resolver.Target{Endpoint: target} 63 } 64 return ret 65 } 66 67 // newCCResolverWrapper parses cc.target for scheme and gets the resolver 68 // builder for this scheme and builds the resolver. The monitoring goroutine 69 // for it is not started yet and can be created by calling start(). 70 // 71 // If withResolverBuilder dial option is set, the specified resolver will be 72 // used instead. 73 func newCCResolverWrapper(cc *ClientConn) (*ccResolverWrapper, error) { 74 rb := cc.dopts.resolverBuilder 75 if rb == nil { 76 return nil, fmt.Errorf("could not get resolver for scheme: %q", cc.parsedTarget.Scheme) 77 } 78 79 ccr := &ccResolverWrapper{ 80 cc: cc, 81 addrCh: make(chan []resolver.Address, 1), 82 scCh: make(chan string, 1), 83 done: make(chan struct{}), 84 } 85 86 var err error 87 ccr.resolver, err = rb.Build(cc.parsedTarget, ccr, resolver.BuildOption{DisableServiceConfig: cc.dopts.disableServiceConfig}) 88 if err != nil { 89 return nil, err 90 } 91 return ccr, nil 92 } 93 94 func (ccr *ccResolverWrapper) start() { 95 go ccr.watcher() 96 } 97 98 // watcher processes address updates and service config updates sequentially. 99 // Otherwise, we need to resolve possible races between address and service 100 // config (e.g. they specify different balancer types). 101 func (ccr *ccResolverWrapper) watcher() { 102 for { 103 select { 104 case <-ccr.done: 105 return 106 default: 107 } 108 109 select { 110 case addrs := <-ccr.addrCh: 111 select { 112 case <-ccr.done: 113 return 114 default: 115 } 116 grpclog.Infof("ccResolverWrapper: sending new addresses to cc: %v", addrs) 117 ccr.cc.handleResolvedAddrs(addrs, nil) 118 case sc := <-ccr.scCh: 119 select { 120 case <-ccr.done: 121 return 122 default: 123 } 124 grpclog.Infof("ccResolverWrapper: got new service config: %v", sc) 125 ccr.cc.handleServiceConfig(sc) 126 case <-ccr.done: 127 return 128 } 129 } 130 } 131 132 func (ccr *ccResolverWrapper) resolveNow(o resolver.ResolveNowOption) { 133 ccr.resolver.ResolveNow(o) 134 } 135 136 func (ccr *ccResolverWrapper) close() { 137 ccr.resolver.Close() 138 close(ccr.done) 139 } 140 141 // NewAddress is called by the resolver implemenetion to send addresses to gRPC. 142 func (ccr *ccResolverWrapper) NewAddress(addrs []resolver.Address) { 143 select { 144 case <-ccr.addrCh: 145 default: 146 } 147 ccr.addrCh <- addrs 148 } 149 150 // NewServiceConfig is called by the resolver implemenetion to send service 151 // configs to gRPC. 152 func (ccr *ccResolverWrapper) NewServiceConfig(sc string) { 153 select { 154 case <-ccr.scCh: 155 default: 156 } 157 ccr.scCh <- sc 158 } 159