1 /* 2 * 3 * Copyright 2016 gRPC authors. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 * 17 */ 18 19 package grpc 20 21 import ( 22 "fmt" 23 "net" 24 "sync" 25 26 "golang.org/x/net/context" 27 "google.golang.org/grpc/codes" 28 "google.golang.org/grpc/credentials" 29 "google.golang.org/grpc/grpclog" 30 "google.golang.org/grpc/naming" 31 "google.golang.org/grpc/status" 32 ) 33 34 // Address represents a server the client connects to. 35 // 36 // Deprecated: please use package balancer. 37 type Address struct { 38 // Addr is the server address on which a connection will be established. 39 Addr string 40 // Metadata is the information associated with Addr, which may be used 41 // to make load balancing decision. 42 Metadata interface{} 43 } 44 45 // BalancerConfig specifies the configurations for Balancer. 46 // 47 // Deprecated: please use package balancer. 48 type BalancerConfig struct { 49 // DialCreds is the transport credential the Balancer implementation can 50 // use to dial to a remote load balancer server. The Balancer implementations 51 // can ignore this if it does not need to talk to another party securely. 52 DialCreds credentials.TransportCredentials 53 // Dialer is the custom dialer the Balancer implementation can use to dial 54 // to a remote load balancer server. The Balancer implementations 55 // can ignore this if it doesn't need to talk to remote balancer. 56 Dialer func(context.Context, string) (net.Conn, error) 57 } 58 59 // BalancerGetOptions configures a Get call. 60 // 61 // Deprecated: please use package balancer. 62 type BalancerGetOptions struct { 63 // BlockingWait specifies whether Get should block when there is no 64 // connected address. 65 BlockingWait bool 66 } 67 68 // Balancer chooses network addresses for RPCs. 69 // 70 // Deprecated: please use package balancer. 71 type Balancer interface { 72 // Start does the initialization work to bootstrap a Balancer. For example, 73 // this function may start the name resolution and watch the updates. It will 74 // be called when dialing. 75 Start(target string, config BalancerConfig) error 76 // Up informs the Balancer that gRPC has a connection to the server at 77 // addr. It returns down which is called once the connection to addr gets 78 // lost or closed. 79 // TODO: It is not clear how to construct and take advantage of the meaningful error 80 // parameter for down. Need realistic demands to guide. 81 Up(addr Address) (down func(error)) 82 // Get gets the address of a server for the RPC corresponding to ctx. 83 // i) If it returns a connected address, gRPC internals issues the RPC on the 84 // connection to this address; 85 // ii) If it returns an address on which the connection is under construction 86 // (initiated by Notify(...)) but not connected, gRPC internals 87 // * fails RPC if the RPC is fail-fast and connection is in the TransientFailure or 88 // Shutdown state; 89 // or 90 // * issues RPC on the connection otherwise. 91 // iii) If it returns an address on which the connection does not exist, gRPC 92 // internals treats it as an error and will fail the corresponding RPC. 93 // 94 // Therefore, the following is the recommended rule when writing a custom Balancer. 95 // If opts.BlockingWait is true, it should return a connected address or 96 // block if there is no connected address. It should respect the timeout or 97 // cancellation of ctx when blocking. If opts.BlockingWait is false (for fail-fast 98 // RPCs), it should return an address it has notified via Notify(...) immediately 99 // instead of blocking. 100 // 101 // The function returns put which is called once the rpc has completed or failed. 102 // put can collect and report RPC stats to a remote load balancer. 103 // 104 // This function should only return the errors Balancer cannot recover by itself. 105 // gRPC internals will fail the RPC if an error is returned. 106 Get(ctx context.Context, opts BalancerGetOptions) (addr Address, put func(), err error) 107 // Notify returns a channel that is used by gRPC internals to watch the addresses 108 // gRPC needs to connect. The addresses might be from a name resolver or remote 109 // load balancer. gRPC internals will compare it with the existing connected 110 // addresses. If the address Balancer notified is not in the existing connected 111 // addresses, gRPC starts to connect the address. If an address in the existing 112 // connected addresses is not in the notification list, the corresponding connection 113 // is shutdown gracefully. Otherwise, there are no operations to take. Note that 114 // the Address slice must be the full list of the Addresses which should be connected. 115 // It is NOT delta. 116 Notify() <-chan []Address 117 // Close shuts down the balancer. 118 Close() error 119 } 120 121 // downErr implements net.Error. It is constructed by gRPC internals and passed to the down 122 // call of Balancer. 123 type downErr struct { 124 timeout bool 125 temporary bool 126 desc string 127 } 128 129 func (e downErr) Error() string { return e.desc } 130 func (e downErr) Timeout() bool { return e.timeout } 131 func (e downErr) Temporary() bool { return e.temporary } 132 133 func downErrorf(timeout, temporary bool, format string, a ...interface{}) downErr { 134 return downErr{ 135 timeout: timeout, 136 temporary: temporary, 137 desc: fmt.Sprintf(format, a...), 138 } 139 } 140 141 // RoundRobin returns a Balancer that selects addresses round-robin. It uses r to watch 142 // the name resolution updates and updates the addresses available correspondingly. 143 // 144 // Deprecated: please use package balancer/roundrobin. 145 func RoundRobin(r naming.Resolver) Balancer { 146 return &roundRobin{r: r} 147 } 148 149 type addrInfo struct { 150 addr Address 151 connected bool 152 } 153 154 type roundRobin struct { 155 r naming.Resolver 156 w naming.Watcher 157 addrs []*addrInfo // all the addresses the client should potentially connect 158 mu sync.Mutex 159 addrCh chan []Address // the channel to notify gRPC internals the list of addresses the client should connect to. 160 next int // index of the next address to return for Get() 161 waitCh chan struct{} // the channel to block when there is no connected address available 162 done bool // The Balancer is closed. 163 } 164 165 func (rr *roundRobin) watchAddrUpdates() error { 166 updates, err := rr.w.Next() 167 if err != nil { 168 grpclog.Warningf("grpc: the naming watcher stops working due to %v.", err) 169 return err 170 } 171 rr.mu.Lock() 172 defer rr.mu.Unlock() 173 for _, update := range updates { 174 addr := Address{ 175 Addr: update.Addr, 176 Metadata: update.Metadata, 177 } 178 switch update.Op { 179 case naming.Add: 180 var exist bool 181 for _, v := range rr.addrs { 182 if addr == v.addr { 183 exist = true 184 grpclog.Infoln("grpc: The name resolver wanted to add an existing address: ", addr) 185 break 186 } 187 } 188 if exist { 189 continue 190 } 191 rr.addrs = append(rr.addrs, &addrInfo{addr: addr}) 192 case naming.Delete: 193 for i, v := range rr.addrs { 194 if addr == v.addr { 195 copy(rr.addrs[i:], rr.addrs[i+1:]) 196 rr.addrs = rr.addrs[:len(rr.addrs)-1] 197 break 198 } 199 } 200 default: 201 grpclog.Errorln("Unknown update.Op ", update.Op) 202 } 203 } 204 // Make a copy of rr.addrs and write it onto rr.addrCh so that gRPC internals gets notified. 205 open := make([]Address, len(rr.addrs)) 206 for i, v := range rr.addrs { 207 open[i] = v.addr 208 } 209 if rr.done { 210 return ErrClientConnClosing 211 } 212 select { 213 case <-rr.addrCh: 214 default: 215 } 216 rr.addrCh <- open 217 return nil 218 } 219 220 func (rr *roundRobin) Start(target string, config BalancerConfig) error { 221 rr.mu.Lock() 222 defer rr.mu.Unlock() 223 if rr.done { 224 return ErrClientConnClosing 225 } 226 if rr.r == nil { 227 // If there is no name resolver installed, it is not needed to 228 // do name resolution. In this case, target is added into rr.addrs 229 // as the only address available and rr.addrCh stays nil. 230 rr.addrs = append(rr.addrs, &addrInfo{addr: Address{Addr: target}}) 231 return nil 232 } 233 w, err := rr.r.Resolve(target) 234 if err != nil { 235 return err 236 } 237 rr.w = w 238 rr.addrCh = make(chan []Address, 1) 239 go func() { 240 for { 241 if err := rr.watchAddrUpdates(); err != nil { 242 return 243 } 244 } 245 }() 246 return nil 247 } 248 249 // Up sets the connected state of addr and sends notification if there are pending 250 // Get() calls. 251 func (rr *roundRobin) Up(addr Address) func(error) { 252 rr.mu.Lock() 253 defer rr.mu.Unlock() 254 var cnt int 255 for _, a := range rr.addrs { 256 if a.addr == addr { 257 if a.connected { 258 return nil 259 } 260 a.connected = true 261 } 262 if a.connected { 263 cnt++ 264 } 265 } 266 // addr is only one which is connected. Notify the Get() callers who are blocking. 267 if cnt == 1 && rr.waitCh != nil { 268 close(rr.waitCh) 269 rr.waitCh = nil 270 } 271 return func(err error) { 272 rr.down(addr, err) 273 } 274 } 275 276 // down unsets the connected state of addr. 277 func (rr *roundRobin) down(addr Address, err error) { 278 rr.mu.Lock() 279 defer rr.mu.Unlock() 280 for _, a := range rr.addrs { 281 if addr == a.addr { 282 a.connected = false 283 break 284 } 285 } 286 } 287 288 // Get returns the next addr in the rotation. 289 func (rr *roundRobin) Get(ctx context.Context, opts BalancerGetOptions) (addr Address, put func(), err error) { 290 var ch chan struct{} 291 rr.mu.Lock() 292 if rr.done { 293 rr.mu.Unlock() 294 err = ErrClientConnClosing 295 return 296 } 297 298 if len(rr.addrs) > 0 { 299 if rr.next >= len(rr.addrs) { 300 rr.next = 0 301 } 302 next := rr.next 303 for { 304 a := rr.addrs[next] 305 next = (next + 1) % len(rr.addrs) 306 if a.connected { 307 addr = a.addr 308 rr.next = next 309 rr.mu.Unlock() 310 return 311 } 312 if next == rr.next { 313 // Has iterated all the possible address but none is connected. 314 break 315 } 316 } 317 } 318 if !opts.BlockingWait { 319 if len(rr.addrs) == 0 { 320 rr.mu.Unlock() 321 err = status.Errorf(codes.Unavailable, "there is no address available") 322 return 323 } 324 // Returns the next addr on rr.addrs for failfast RPCs. 325 addr = rr.addrs[rr.next].addr 326 rr.next++ 327 rr.mu.Unlock() 328 return 329 } 330 // Wait on rr.waitCh for non-failfast RPCs. 331 if rr.waitCh == nil { 332 ch = make(chan struct{}) 333 rr.waitCh = ch 334 } else { 335 ch = rr.waitCh 336 } 337 rr.mu.Unlock() 338 for { 339 select { 340 case <-ctx.Done(): 341 err = ctx.Err() 342 return 343 case <-ch: 344 rr.mu.Lock() 345 if rr.done { 346 rr.mu.Unlock() 347 err = ErrClientConnClosing 348 return 349 } 350 351 if len(rr.addrs) > 0 { 352 if rr.next >= len(rr.addrs) { 353 rr.next = 0 354 } 355 next := rr.next 356 for { 357 a := rr.addrs[next] 358 next = (next + 1) % len(rr.addrs) 359 if a.connected { 360 addr = a.addr 361 rr.next = next 362 rr.mu.Unlock() 363 return 364 } 365 if next == rr.next { 366 // Has iterated all the possible address but none is connected. 367 break 368 } 369 } 370 } 371 // The newly added addr got removed by Down() again. 372 if rr.waitCh == nil { 373 ch = make(chan struct{}) 374 rr.waitCh = ch 375 } else { 376 ch = rr.waitCh 377 } 378 rr.mu.Unlock() 379 } 380 } 381 } 382 383 func (rr *roundRobin) Notify() <-chan []Address { 384 return rr.addrCh 385 } 386 387 func (rr *roundRobin) Close() error { 388 rr.mu.Lock() 389 defer rr.mu.Unlock() 390 if rr.done { 391 return errBalancerClosed 392 } 393 rr.done = true 394 if rr.w != nil { 395 rr.w.Close() 396 } 397 if rr.waitCh != nil { 398 close(rr.waitCh) 399 rr.waitCh = nil 400 } 401 if rr.addrCh != nil { 402 close(rr.addrCh) 403 } 404 return nil 405 } 406 407 // pickFirst is used to test multi-addresses in one addrConn in which all addresses share the same addrConn. 408 // It is a wrapper around roundRobin balancer. The logic of all methods works fine because balancer.Get() 409 // returns the only address Up by resetTransport(). 410 type pickFirst struct { 411 *roundRobin 412 } 413 414 func pickFirstBalancerV1(r naming.Resolver) Balancer { 415 return &pickFirst{&roundRobin{r: r}} 416 } 417