Home | History | Annotate | Download | only in transport
      1 /*
      2  *
      3  * Copyright 2014 gRPC authors.
      4  *
      5  * Licensed under the Apache License, Version 2.0 (the "License");
      6  * you may not use this file except in compliance with the License.
      7  * You may obtain a copy of the License at
      8  *
      9  *     http://www.apache.org/licenses/LICENSE-2.0
     10  *
     11  * Unless required by applicable law or agreed to in writing, software
     12  * distributed under the License is distributed on an "AS IS" BASIS,
     13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     14  * See the License for the specific language governing permissions and
     15  * limitations under the License.
     16  *
     17  */
     18 
     19 // Package transport defines and implements message oriented communication
     20 // channel to complete various transactions (e.g., an RPC).  It is meant for
     21 // grpc-internal usage and is not intended to be imported directly by users.
     22 package transport // externally used as import "google.golang.org/grpc/transport"
     23 
     24 import (
     25 	"errors"
     26 	"fmt"
     27 	"io"
     28 	"net"
     29 	"sync"
     30 	"sync/atomic"
     31 
     32 	"golang.org/x/net/context"
     33 	"google.golang.org/grpc/codes"
     34 	"google.golang.org/grpc/credentials"
     35 	"google.golang.org/grpc/keepalive"
     36 	"google.golang.org/grpc/metadata"
     37 	"google.golang.org/grpc/stats"
     38 	"google.golang.org/grpc/status"
     39 	"google.golang.org/grpc/tap"
     40 )
     41 
     42 // recvMsg represents the received msg from the transport. All transport
     43 // protocol specific info has been removed.
     44 type recvMsg struct {
     45 	data []byte
     46 	// nil: received some data
     47 	// io.EOF: stream is completed. data is nil.
     48 	// other non-nil error: transport failure. data is nil.
     49 	err error
     50 }
     51 
     52 // recvBuffer is an unbounded channel of recvMsg structs.
     53 // Note recvBuffer differs from controlBuffer only in that recvBuffer
     54 // holds a channel of only recvMsg structs instead of objects implementing "item" interface.
     55 // recvBuffer is written to much more often than
     56 // controlBuffer and using strict recvMsg structs helps avoid allocation in "recvBuffer.put"
     57 type recvBuffer struct {
     58 	c       chan recvMsg
     59 	mu      sync.Mutex
     60 	backlog []recvMsg
     61 	err     error
     62 }
     63 
     64 func newRecvBuffer() *recvBuffer {
     65 	b := &recvBuffer{
     66 		c: make(chan recvMsg, 1),
     67 	}
     68 	return b
     69 }
     70 
     71 func (b *recvBuffer) put(r recvMsg) {
     72 	b.mu.Lock()
     73 	if b.err != nil {
     74 		b.mu.Unlock()
     75 		// An error had occurred earlier, don't accept more
     76 		// data or errors.
     77 		return
     78 	}
     79 	b.err = r.err
     80 	if len(b.backlog) == 0 {
     81 		select {
     82 		case b.c <- r:
     83 			b.mu.Unlock()
     84 			return
     85 		default:
     86 		}
     87 	}
     88 	b.backlog = append(b.backlog, r)
     89 	b.mu.Unlock()
     90 }
     91 
     92 func (b *recvBuffer) load() {
     93 	b.mu.Lock()
     94 	if len(b.backlog) > 0 {
     95 		select {
     96 		case b.c <- b.backlog[0]:
     97 			b.backlog[0] = recvMsg{}
     98 			b.backlog = b.backlog[1:]
     99 		default:
    100 		}
    101 	}
    102 	b.mu.Unlock()
    103 }
    104 
    105 // get returns the channel that receives a recvMsg in the buffer.
    106 //
    107 // Upon receipt of a recvMsg, the caller should call load to send another
    108 // recvMsg onto the channel if there is any.
    109 func (b *recvBuffer) get() <-chan recvMsg {
    110 	return b.c
    111 }
    112 
// recvBufferReader implements the io.Reader interface on top of a
// recvBuffer, so that buffered messages can be consumed as a byte stream.
type recvBufferReader struct {
	ctx     context.Context
	ctxDone <-chan struct{} // cache of ctx.Done() (for performance).
	recv    *recvBuffer
	last    []byte // Stores the remaining data in the previous calls.
	err     error  // first error returned by Read; returned again by every later Read.
}
    123 
    124 // Read reads the next len(p) bytes from last. If last is drained, it tries to
    125 // read additional data from recv. It blocks if there no additional data available
    126 // in recv. If Read returns any non-nil error, it will continue to return that error.
    127 func (r *recvBufferReader) Read(p []byte) (n int, err error) {
    128 	if r.err != nil {
    129 		return 0, r.err
    130 	}
    131 	n, r.err = r.read(p)
    132 	return n, r.err
    133 }
    134 
    135 func (r *recvBufferReader) read(p []byte) (n int, err error) {
    136 	if r.last != nil && len(r.last) > 0 {
    137 		// Read remaining data left in last call.
    138 		copied := copy(p, r.last)
    139 		r.last = r.last[copied:]
    140 		return copied, nil
    141 	}
    142 	select {
    143 	case <-r.ctxDone:
    144 		return 0, ContextErr(r.ctx.Err())
    145 	case m := <-r.recv.get():
    146 		r.recv.load()
    147 		if m.err != nil {
    148 			return 0, m.err
    149 		}
    150 		copied := copy(p, m.data)
    151 		r.last = m.data[copied:]
    152 		return copied, nil
    153 	}
    154 }
    155 
    156 type streamState uint32
    157 
    158 const (
    159 	streamActive    streamState = iota
    160 	streamWriteDone             // EndStream sent
    161 	streamReadDone              // EndStream received
    162 	streamDone                  // the entire stream is finished.
    163 )
    164 
// Stream represents an RPC in the transport layer. The same type is used
// on the client and the server side; several fields are meaningful on only
// one of the two, as noted on each field.
type Stream struct {
	id           uint32
	st           ServerTransport    // nil for client side Stream
	ctx          context.Context    // the associated context of the stream
	cancel       context.CancelFunc // always nil for client side Stream
	done         chan struct{}      // closed at the end of stream to unblock writers. On the client side.
	ctxDone      <-chan struct{}    // same as done chan but for server side. Cache of ctx.Done() (for performance)
	method       string             // the associated RPC method of the stream
	recvCompress string
	sendCompress string
	buf          *recvBuffer
	trReader     io.Reader
	fc           *inFlow
	recvQuota    uint32
	wq           *writeQuota

	// Callback to state application's intentions to read data. This
	// is used to adjust flow control, if needed.
	requestRead func(int)

	headerChan chan struct{} // closed to indicate the end of header metadata.
	headerDone uint32        // set when headerChan is closed. Used to avoid closing headerChan multiple times.

	// hdrMu protects header and trailer metadata on the server-side.
	hdrMu   sync.Mutex
	header  metadata.MD // the received header metadata.
	trailer metadata.MD // the key-value map of trailer metadata.

	// On the server-side, headerSent is atomically set to 1 when the headers are sent out.
	headerSent uint32

	// state is the current streamState; it is accessed atomically through
	// swapState, compareAndSwapState and getState.
	state streamState

	// On client-side it is the status error received from the server.
	// On server-side it is unused.
	status *status.Status

	bytesReceived uint32 // indicates whether any bytes have been received on this stream
	unprocessed   uint32 // set if the server sends a refused stream or GOAWAY including this stream

	// contentSubtype is the content-subtype for requests.
	// this must be lowercase or the behavior is undefined.
	contentSubtype string
}
    210 
    211 // isHeaderSent is only valid on the server-side.
    212 func (s *Stream) isHeaderSent() bool {
    213 	return atomic.LoadUint32(&s.headerSent) == 1
    214 }
    215 
    216 // updateHeaderSent updates headerSent and returns true
    217 // if it was alreay set. It is valid only on server-side.
    218 func (s *Stream) updateHeaderSent() bool {
    219 	return atomic.SwapUint32(&s.headerSent, 1) == 1
    220 }
    221 
    222 func (s *Stream) swapState(st streamState) streamState {
    223 	return streamState(atomic.SwapUint32((*uint32)(&s.state), uint32(st)))
    224 }
    225 
    226 func (s *Stream) compareAndSwapState(oldState, newState streamState) bool {
    227 	return atomic.CompareAndSwapUint32((*uint32)(&s.state), uint32(oldState), uint32(newState))
    228 }
    229 
    230 func (s *Stream) getState() streamState {
    231 	return streamState(atomic.LoadUint32((*uint32)(&s.state)))
    232 }
    233 
    234 func (s *Stream) waitOnHeader() error {
    235 	if s.headerChan == nil {
    236 		// On the server headerChan is always nil since a stream originates
    237 		// only after having received headers.
    238 		return nil
    239 	}
    240 	select {
    241 	case <-s.ctx.Done():
    242 		return ContextErr(s.ctx.Err())
    243 	case <-s.headerChan:
    244 		return nil
    245 	}
    246 }
    247 
    248 // RecvCompress returns the compression algorithm applied to the inbound
    249 // message. It is empty string if there is no compression applied.
    250 func (s *Stream) RecvCompress() string {
    251 	if err := s.waitOnHeader(); err != nil {
    252 		return ""
    253 	}
    254 	return s.recvCompress
    255 }
    256 
// SetSendCompress records str as the compression algorithm for the stream.
// It only stores the value in sendCompress; no synchronization is
// performed here.
func (s *Stream) SetSendCompress(str string) {
	s.sendCompress = str
}
    261 
// Done returns a channel which is closed when the stream receives the
// final status from the server.
func (s *Stream) Done() <-chan struct{} {
	return s.done
}
    267 
    268 // Header acquires the key-value pairs of header metadata once it
    269 // is available. It blocks until i) the metadata is ready or ii) there is no
    270 // header metadata or iii) the stream is canceled/expired.
    271 func (s *Stream) Header() (metadata.MD, error) {
    272 	err := s.waitOnHeader()
    273 	// Even if the stream is closed, header is returned if available.
    274 	select {
    275 	case <-s.headerChan:
    276 		if s.header == nil {
    277 			return nil, nil
    278 		}
    279 		return s.header.Copy(), nil
    280 	default:
    281 	}
    282 	return nil, err
    283 }
    284 
    285 // Trailer returns the cached trailer metedata. Note that if it is not called
    286 // after the entire stream is done, it could return an empty MD. Client
    287 // side only.
    288 // It can be safely read only after stream has ended that is either read
    289 // or write have returned io.EOF.
    290 func (s *Stream) Trailer() metadata.MD {
    291 	c := s.trailer.Copy()
    292 	return c
    293 }
    294 
// ServerTransport returns the underlying ServerTransport for the stream.
// The client side stream always returns nil, because st is nil there.
func (s *Stream) ServerTransport() ServerTransport {
	return s.st
}
    300 
// ContentSubtype returns the content-subtype for a request. For example, a
// content-subtype of "proto" will result in a content-type of
// "application/grpc+proto". This will always be lowercase. See
// https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#requests for
// more details.
func (s *Stream) ContentSubtype() string {
	return s.contentSubtype
}
    309 
// Context returns the context associated with the stream.
func (s *Stream) Context() context.Context {
	return s.ctx
}
    314 
// Method returns the RPC method associated with the stream.
func (s *Stream) Method() string {
	return s.method
}
    319 
// Status returns the status received from the server.
// Status can be read safely only after the stream has ended,
// that is, read or write has returned io.EOF.
// The field is returned as stored, without any synchronization.
func (s *Stream) Status() *status.Status {
	return s.status
}
    326 
    327 // SetHeader sets the header metadata. This can be called multiple times.
    328 // Server side only.
    329 // This should not be called in parallel to other data writes.
    330 func (s *Stream) SetHeader(md metadata.MD) error {
    331 	if md.Len() == 0 {
    332 		return nil
    333 	}
    334 	if s.isHeaderSent() || s.getState() == streamDone {
    335 		return ErrIllegalHeaderWrite
    336 	}
    337 	s.hdrMu.Lock()
    338 	s.header = metadata.Join(s.header, md)
    339 	s.hdrMu.Unlock()
    340 	return nil
    341 }
    342 
    343 // SendHeader sends the given header metadata. The given metadata is
    344 // combined with any metadata set by previous calls to SetHeader and
    345 // then written to the transport stream.
    346 func (s *Stream) SendHeader(md metadata.MD) error {
    347 	t := s.ServerTransport()
    348 	return t.WriteHeader(s, md)
    349 }
    350 
    351 // SetTrailer sets the trailer metadata which will be sent with the RPC status
    352 // by the server. This can be called multiple times. Server side only.
    353 // This should not be called parallel to other data writes.
    354 func (s *Stream) SetTrailer(md metadata.MD) error {
    355 	if md.Len() == 0 {
    356 		return nil
    357 	}
    358 	if s.getState() == streamDone {
    359 		return ErrIllegalHeaderWrite
    360 	}
    361 	s.hdrMu.Lock()
    362 	s.trailer = metadata.Join(s.trailer, md)
    363 	s.hdrMu.Unlock()
    364 	return nil
    365 }
    366 
// write hands m to the stream's receive buffer (buf), from where it is
// later picked up by readers of that buffer.
func (s *Stream) write(m recvMsg) {
	s.buf.put(m)
}
    370 
    371 // Read reads all p bytes from the wire for this stream.
    372 func (s *Stream) Read(p []byte) (n int, err error) {
    373 	// Don't request a read if there was an error earlier
    374 	if er := s.trReader.(*transportReader).er; er != nil {
    375 		return 0, er
    376 	}
    377 	s.requestRead(len(p))
    378 	return io.ReadFull(s.trReader, p)
    379 }
    380 
    381 // tranportReader reads all the data available for this Stream from the transport and
    382 // passes them into the decoder, which converts them into a gRPC message stream.
    383 // The error is io.EOF when the stream is done or another non-nil error if
    384 // the stream broke.
    385 type transportReader struct {
    386 	reader io.Reader
    387 	// The handler to control the window update procedure for both this
    388 	// particular stream and the associated transport.
    389 	windowHandler func(int)
    390 	er            error
    391 }
    392 
    393 func (t *transportReader) Read(p []byte) (n int, err error) {
    394 	n, err = t.reader.Read(p)
    395 	if err != nil {
    396 		t.er = err
    397 		return
    398 	}
    399 	t.windowHandler(n)
    400 	return
    401 }
    402 
    403 // BytesReceived indicates whether any bytes have been received on this stream.
    404 func (s *Stream) BytesReceived() bool {
    405 	return atomic.LoadUint32(&s.bytesReceived) == 1
    406 }
    407 
    408 // Unprocessed indicates whether the server did not process this stream --
    409 // i.e. it sent a refused stream or GOAWAY including this stream ID.
    410 func (s *Stream) Unprocessed() bool {
    411 	return atomic.LoadUint32(&s.unprocessed) == 1
    412 }
    413 
// GoString is implemented by Stream so context.String() won't
// race when printing %#v. The result contains only the stream's pointer
// value and its method name.
func (s *Stream) GoString() string {
	return fmt.Sprintf("<stream: %p, %v>", s, s.method)
}
    419 
    420 // state of transport
    421 type transportState int
    422 
    423 const (
    424 	reachable transportState = iota
    425 	closing
    426 	draining
    427 )
    428 
// ServerConfig consists of all the configurations to establish a server transport.
//
// NOTE(review): the fields carry no individual documentation here. The
// window-size, buffer-size and ChannelzParentID fields presumably mirror
// the identically named fields of ConnectOptions -- confirm against the
// server transport implementation.
type ServerConfig struct {
	MaxStreams            uint32
	AuthInfo              credentials.AuthInfo
	InTapHandle           tap.ServerInHandle
	StatsHandler          stats.Handler
	KeepaliveParams       keepalive.ServerParameters
	KeepalivePolicy       keepalive.EnforcementPolicy
	InitialWindowSize     int32
	InitialConnWindowSize int32
	WriteBufferSize       int
	ReadBufferSize        int
	ChannelzParentID      int64
}
    443 
// NewServerTransport creates a ServerTransport with conn or non-nil error
// if it fails. It delegates to newHTTP2Server; the protocol argument is
// not used by this function.
func NewServerTransport(protocol string, conn net.Conn, config *ServerConfig) (ServerTransport, error) {
	return newHTTP2Server(conn, config)
}
    449 
// ConnectOptions covers all relevant options for communicating with the server.
type ConnectOptions struct {
	// UserAgent is the application user agent.
	UserAgent string
	// Authority is the :authority pseudo-header to use. This field has no effect if
	// TransportCredentials is set.
	Authority string
	// Dialer specifies how to dial a network address.
	Dialer func(context.Context, string) (net.Conn, error)
	// FailOnNonTempDialError specifies if gRPC fails on non-temporary dial errors.
	FailOnNonTempDialError bool
	// PerRPCCredentials stores the PerRPCCredentials required to issue RPCs.
	PerRPCCredentials []credentials.PerRPCCredentials
	// TransportCredentials stores the Authenticator required to set up a client connection.
	TransportCredentials credentials.TransportCredentials
	// KeepaliveParams stores the keepalive parameters.
	KeepaliveParams keepalive.ClientParameters
	// StatsHandler stores the handler for stats.
	StatsHandler stats.Handler
	// InitialWindowSize sets the initial window size for a stream.
	InitialWindowSize int32
	// InitialConnWindowSize sets the initial window size for a connection.
	InitialConnWindowSize int32
	// WriteBufferSize sets the size of the write buffer, which in turn determines how much data can be batched before it's written on the wire.
	WriteBufferSize int
	// ReadBufferSize sets the size of the read buffer, which in turn determines how much data can be read at most for one read syscall.
	ReadBufferSize int
	// ChannelzParentID sets the id of the addrConn which initiated the creation of this client transport.
	ChannelzParentID int64
}
    480 
// TargetInfo contains the information of the target such as network address and metadata.
type TargetInfo struct {
	Addr      string      // network address of the target
	Metadata  interface{} // opaque metadata attached to the target; its concrete type is not fixed here
	Authority string
}
    487 
// NewClientTransport establishes the transport with the required ConnectOptions
// and returns it to the caller. It forwards all of its arguments unchanged
// to newHTTP2Client.
func NewClientTransport(connectCtx, ctx context.Context, target TargetInfo, opts ConnectOptions, onSuccess func()) (ClientTransport, error) {
	return newHTTP2Client(connectCtx, ctx, target, opts, onSuccess)
}
    493 
// Options provides additional hints and information for message
// transmission. It is passed to the Write methods of ClientTransport and
// ServerTransport.
type Options struct {
	// Last indicates whether this write is the last piece for
	// this stream.
	Last bool

	// Delay is a hint to the transport implementation for whether
	// the data could be buffered for a batching write. The
	// transport implementation may ignore the hint.
	Delay bool
}
    506 
// CallHdr carries the information of a particular RPC. It is the argument
// from which ClientTransport.NewStream creates a Stream.
type CallHdr struct {
	// Host specifies the peer's host.
	Host string

	// Method specifies the operation to perform.
	Method string

	// SendCompress specifies the compression algorithm applied on
	// outbound messages.
	SendCompress string

	// Creds specifies credentials.PerRPCCredentials for a call.
	Creds credentials.PerRPCCredentials

	// Flush indicates whether a new stream command should be sent
	// to the peer without waiting for the first data. This is
	// only a hint.
	// If it's true, the transport may modify the flush decision
	// for performance purposes.
	// If it's false, new stream will never be flushed.
	Flush bool

	// ContentSubtype specifies the content-subtype for a request. For example, a
	// content-subtype of "proto" will result in a content-type of
	// "application/grpc+proto". The value of ContentSubtype must be all
	// lowercase, otherwise the behavior is undefined. See
	// https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#requests
	// for more details.
	ContentSubtype string
}
    538 
// ClientTransport is the common interface for all gRPC client-side transport
// implementations.
type ClientTransport interface {
	// Close tears down this transport. Once it returns, the transport
	// should not be accessed any more. The caller must make sure this
	// is called only once.
	Close() error

	// GracefulClose starts to tear down the transport. It stops accepting
	// new RPCs and waits for the completion of the pending RPCs.
	GracefulClose() error

	// Write sends the data for the given stream. A nil stream indicates
	// the write is to be performed on the transport as a whole.
	Write(s *Stream, hdr []byte, data []byte, opts *Options) error

	// NewStream creates a Stream for an RPC.
	NewStream(ctx context.Context, callHdr *CallHdr) (*Stream, error)

	// CloseStream clears the footprint of a stream when the stream is
	// not needed any more. The err indicates the error incurred when
	// CloseStream is called. Must be called when a stream is finished
	// unless the associated transport is closing.
	CloseStream(stream *Stream, err error)

	// Error returns a channel that is closed when some I/O error
	// happens. Typically the caller should have a goroutine to monitor
	// this in order to take action (e.g., close the current transport
	// and create a new one) in error case. It should not return nil
	// once the transport is initiated.
	Error() <-chan struct{}

	// GoAway returns a channel that is closed when ClientTransport
	// receives the draining signal from the server (e.g., GOAWAY frame in
	// HTTP/2).
	GoAway() <-chan struct{}

	// GetGoAwayReason returns the reason why the GoAway frame was received.
	GetGoAwayReason() GoAwayReason

	// IncrMsgSent increments the number of messages sent through this transport.
	IncrMsgSent()

	// IncrMsgRecv increments the number of messages received through this transport.
	IncrMsgRecv()
}
    585 
// ServerTransport is the common interface for all gRPC server-side transport
// implementations.
//
// Methods may be called concurrently from multiple goroutines, but
// Write methods for a given Stream will be called serially.
type ServerTransport interface {
	// HandleStreams receives incoming streams using the given handler.
	HandleStreams(func(*Stream), func(context.Context, string) context.Context)

	// WriteHeader sends the header metadata for the given stream.
	// WriteHeader may not be called on all streams.
	WriteHeader(s *Stream, md metadata.MD) error

	// Write sends the data for the given stream.
	// Write may not be called on all streams.
	Write(s *Stream, hdr []byte, data []byte, opts *Options) error

	// WriteStatus sends the status of a stream to the client. WriteStatus is
	// the final call made on a stream and always occurs.
	WriteStatus(s *Stream, st *status.Status) error

	// Close tears down the transport. Once it is called, the transport
	// should not be accessed any more. All the pending streams and their
	// handlers will be terminated asynchronously.
	Close() error

	// RemoteAddr returns the remote network address.
	RemoteAddr() net.Addr

	// Drain notifies the client that this ServerTransport has stopped
	// accepting new RPCs.
	Drain()

	// IncrMsgSent increments the number of messages sent through this transport.
	IncrMsgSent()

	// IncrMsgRecv increments the number of messages received through this transport.
	IncrMsgRecv()
}
    624 
    625 // streamErrorf creates an StreamError with the specified error code and description.
    626 func streamErrorf(c codes.Code, format string, a ...interface{}) StreamError {
    627 	return StreamError{
    628 		Code: c,
    629 		Desc: fmt.Sprintf(format, a...),
    630 	}
    631 }
    632 
    633 // connectionErrorf creates an ConnectionError with the specified error description.
    634 func connectionErrorf(temp bool, e error, format string, a ...interface{}) ConnectionError {
    635 	return ConnectionError{
    636 		Desc: fmt.Sprintf(format, a...),
    637 		temp: temp,
    638 		err:  e,
    639 	}
    640 }
    641 
    642 // ConnectionError is an error that results in the termination of the
    643 // entire connection and the retry of all the active streams.
    644 type ConnectionError struct {
    645 	Desc string
    646 	temp bool
    647 	err  error
    648 }
    649 
    650 func (e ConnectionError) Error() string {
    651 	return fmt.Sprintf("connection error: desc = %q", e.Desc)
    652 }
    653 
    654 // Temporary indicates if this connection error is temporary or fatal.
    655 func (e ConnectionError) Temporary() bool {
    656 	return e.temp
    657 }
    658 
    659 // Origin returns the original error of this connection error.
    660 func (e ConnectionError) Origin() error {
    661 	// Never return nil error here.
    662 	// If the original error is nil, return itself.
    663 	if e.err == nil {
    664 		return e
    665 	}
    666 	return e.err
    667 }
    668 
var (
	// ErrConnClosing indicates that the transport is closing. It is a
	// ConnectionError marked temporary with no original error.
	ErrConnClosing = connectionErrorf(true, nil, "transport is closing")
	// errStreamDrain indicates that the stream is rejected because the
	// connection is draining. This could be caused by goaway or balancer
	// removing the address.
	errStreamDrain = streamErrorf(codes.Unavailable, "the connection is draining")
	// errStreamDone is returned from write at the client side to indicate
	// an error to the application layer.
	errStreamDone = errors.New("the stream is done")
	// statusGoAway indicates that the server sent a GOAWAY that included this
	// stream's ID in unprocessed RPCs.
	statusGoAway = status.New(codes.Unavailable, "the stream is rejected because server is draining the connection")
)
    683 
    684 // TODO: See if we can replace StreamError with status package errors.
    685 
// StreamError is an error that only affects one stream within a connection.
type StreamError struct {
	Code codes.Code // status code describing the failure
	Desc string     // human-readable description
}

// Error implements the error interface, rendering the code with %s and the
// description as a quoted string.
func (e StreamError) Error() string {
	return fmt.Sprintf("stream error: code = %s desc = %q", e.Code, e.Desc)
}
    695 
    696 // GoAwayReason contains the reason for the GoAway frame received.
    697 type GoAwayReason uint8
    698 
    699 const (
    700 	// GoAwayInvalid indicates that no GoAway frame is received.
    701 	GoAwayInvalid GoAwayReason = 0
    702 	// GoAwayNoReason is the default value when GoAway frame is received.
    703 	GoAwayNoReason GoAwayReason = 1
    704 	// GoAwayTooManyPings indicates that a GoAway frame with
    705 	// ErrCodeEnhanceYourCalm was received and that the debug data said
    706 	// "too_many_pings".
    707 	GoAwayTooManyPings GoAwayReason = 2
    708 )
    709